/
swim.c
2335 lines (2234 loc) · 70.4 KB
/
swim.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* 1. Redistributions of source code must retain the above
* copyright notice, this list of conditions and the
* following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials
* provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
* <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
* BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
* THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
#include "swim.h"
#include "swim_io.h"
#include "swim_proto.h"
#include "swim_ev.h"
#include "uri/uri.h"
#include "fiber.h"
#include "msgpuck.h"
#include "assoc.h"
#include "sio.h"
#include "trigger.h"
#define HEAP_FORWARD_DECLARATION
#include "salad/heap.h"
/**
* SWIM - Scalable Weakly-consistent Infection-style Process Group
* Membership Protocol. It consists of 2 components: events
* dissemination and failure detection, and stores in memory a
* table of known remote hosts - members. Also some SWIM
* implementations have an additional component: anti-entropy -
* periodical broadcast of a random subset of the member table.
*
* Each SWIM component is different in both protocol payload and
* goals, and could even use different messages to send data. But
* SWIM describes piggybacking of messages: a ping message can
* piggyback a dissemination's one.
*
* SWIM has a main operating cycle during which it randomly
* chooses members from a member table and sends to them events +
* ping. Replies are processed out of the main cycle,
* asynchronously.
*
* When a member unacknowledged too many pings, its status is
* changed to 'suspected'. The SWIM paper describes suspicion
* subcomponent as a protection against false-positive detection
* of alive members as dead. It happens when a member is
* overloaded and responds to pings too slow, or when the network
* is in trouble and packets can not go through some channels.
* When a member is suspected, another instance pings it
* indirectly via other members. It sends a fixed number of pings
* to the suspected one in parallel via additional hops selected
* randomly among other members.
*
* Random selection in all the components provides even network
* load of ~1 message on each member per one protocol step
* regardless of the cluster size. Without randomness each member
* would receive a network load of N messages in each protocol
* step, where N is the cluster size.
*
* To speed up propagation of new information by means of a few
* random messages SWIM proposes a kind of fairness: when
* selecting a next random member to ping, the protocol prefers
* LRU members. In code it would be too complicated, so
* Tarantool's implementation is slightly different, easier:
*
* Tarantool splits protocol operation into rounds. At the
* beginning of a round all members are randomly reordered and
* linked into a list. At each round step a member is popped from
* the list head, a message is sent to it, and then it waits for
* the next round. In such implementation all random selection of
* the original SWIM is executed once per round. The round is
* 'planned', actually. A list is used instead of an array since
* new members can be added to its tail without realloc, and dead
* members can be removed easily as well.
*
* Also Tarantool implements the third SWIM component -
* anti-entropy. Why is it needed and even vital? Consider the
* example: two SWIM nodes, both are alive. Nothing happens, so
* the events list is empty, only pings are being sent
* periodically. Then a third node appears. It knows about one of
* the existing nodes. How can it learn about the rest? Sure,
* its known counterpart can try to notify its peer, but it is
* UDP, so this event can be lost. Anti-entropy is an extra simple
* component, it just piggybacks random part of the member table
* with each regular message. In the example above the new node
* will learn about the third one via anti-entropy messages from
* the second one sooner or later.
*
* Surprisingly, original SWIM does not describe any addressing,
* how to uniquely identify a member. IP/port fallaciously could
* be considered as a good unique identifier, but some arguments
* below demolish this belief:
*
* - if instances work in separate containers, they can have
* the same IP/port inside a container NATed to a unique
* IP/port outside the container;
*
* - IP/port are likely to change during instance lifecycle.
* Once IP/port are changed, a ghost of the old member's
* configuration still lives for a while until it is
* suspected, dead and GC-ed. Taking into account that ACK
* timeout can be tens of seconds, 'Dead Souls' can exist
* unpleasantly long.
*
* Tarantool SWIM implementation uses UUIDs as unique identifiers.
* UUID is much more unlikely to change than IP/port. But even if
* that happens, dissemination component for a while gossips the
* new UUID together with the old one.
*
* SWIM implementation is split into 3 parts: protocol logic,
* transport level, protocol structure.
*
* - protocol logic consists of how to react on various
* events, failure detection pings/acks, how often to send
* messages, handles the logic of the three components
* (failure detection, anti-entropy, dissemination);
*
* - transport level handles routing, transport headers,
* packet forwarding;
*
* - protocol structure describes how packet looks in
* MessagePack, how sections and headers follow each other.
*/
/** Protocol tuning constants. Time units are seconds. */
enum {
	/**
	 * How often to send membership messages and pings in
	 * seconds. Nothing special in this concrete default
	 * value.
	 */
	HEARTBEAT_RATE_DEFAULT = 1,
	/**
	 * If a ping was sent, it is considered lost after this
	 * time (seconds) without an ack. Nothing special in this
	 * value.
	 */
	ACK_TIMEOUT_DEFAULT = 30,
	/**
	 * If an alive member has not been responding to pings
	 * this number of times, it is suspected to be dead. To
	 * confirm the death it should fail more pings.
	 */
	NO_ACKS_TO_SUSPECT = 2,
	/**
	 * If a suspected member has not been responding to pings
	 * this number of times, it is considered dead. According
	 * to the SWIM paper, for a member it is sufficient to
	 * miss one direct ping, and an arbitrary but fixed number
	 * of simultaneous indirect pings, to be considered dead.
	 * Seems too little, so here it is bigger.
	 */
	NO_ACKS_TO_DEAD = 3,
	/**
	 * If a member is confirmed to be dead, it is removed from
	 * the member table after at least this number of
	 * unacknowledged pings. According to the SWIM paper, a
	 * dead member is deleted immediately. But we keep it for
	 * a while to 1) maybe refute its dead status,
	 * 2) disseminate the status via dissemination and
	 * anti-entropy components.
	 */
	NO_ACKS_TO_GC = 2,
	/**
	 * Number of pings sent indirectly to a member via other
	 * members when it did not answer on a regular ping. The
	 * messages are sent in parallel and via different
	 * members.
	 */
	INDIRECT_PING_COUNT = 2,
};
/**
 * Return a random number within given boundaries.
 *
 * Instead of blindly calculating a modulo, scale the random
 * number down the given boundaries to preserve the original
 * distribution. The result belongs to range [start, end].
 */
static inline int
swim_scaled_rand(int start, int end)
{
	assert(end >= start);
	/*
	 * RAND_MAX is likely to be INT_MAX - hardly SWIM will
	 * ever be used in such a huge cluster.
	 */
	assert(end - start < RAND_MAX);
	unsigned range = (unsigned) (end - start) + 1;
	/*
	 * Compute the bucket width in unsigned arithmetic:
	 * RAND_MAX / range + 1 overflows a signed int (UB) when
	 * range == 1 and RAND_MAX == INT_MAX, and range 1 does
	 * happen - the shuffle calls this with (0, 0) on its
	 * first iteration.
	 */
	unsigned bucket = (unsigned) RAND_MAX / range + 1;
	/*
	 * Offset by @a start so the documented [start, end]
	 * contract also holds for non-zero lower bounds. Current
	 * callers pass start == 0, so their behavior is
	 * unchanged.
	 */
	return start + (int) ((unsigned) rand() / bucket);
}
/** Hash a UUID so it can serve as a member table key. */
static inline uint32_t
swim_uuid_hash(const struct tt_uuid *uuid)
{
	const char *raw = (const char *) uuid;
	return mh_strn_hash(raw, UUID_LEN);
}
/**
 * Compare two incarnation values and collect their diff into
 * @a diff out parameter. The difference is used to fire triggers.
 */
static inline int
swim_incarnation_diff(const struct swim_incarnation *l,
		      const struct swim_incarnation *r,
		      enum swim_ev_mask *diff)
{
	if (l->version != r->version) {
		*diff = SWIM_EV_NEW_VERSION;
		return l->version < r->version ? -1 : 1;
	}
	*diff = 0;
	return 0;
}
int
swim_incarnation_cmp(const struct swim_incarnation *l,
const struct swim_incarnation *r)
{
enum swim_ev_mask unused;
return swim_incarnation_diff(l, r, &unused);
}
/**
 * A cluster member description. This structure describes the
 * last known state of an instance. This state is updated
 * periodically via UDP according to SWIM protocol rules.
 */
struct swim_member {
	/**
	 * Member status. Since the communication goes via UDP,
	 * actual status can be different, as well as different on
	 * other SWIM nodes. But SWIM guarantees that each member
	 * will learn a real status of an instance sometime.
	 */
	enum swim_member_status status;
	/**
	 * Address of the instance to which to send UDP packets.
	 */
	struct sockaddr_in addr;
	/**
	 * A unique identifier of the member. Is used as a key in
	 * the members table.
	 */
	struct tt_uuid uuid;
	/**
	 * Cached hash of the uuid for the member table lookups.
	 */
	uint32_t hash;
	/**
	 * Position in a queue of members in the current round.
	 * Empty when the member is not scheduled for this round.
	 */
	struct rlist in_round_queue;
	/**
	 * Reference counter. Used by public API to prevent the
	 * member deletion after it is obtained by UUID or from an
	 * iterator.
	 */
	int refs;
	/**
	 * True, if the member was dropped from the member table.
	 * At the same time it still can be not deleted, if users
	 * of the public API referenced the member. Dropped member
	 * is not valid anymore and should be dereferenced.
	 */
	bool is_dropped;
	/**
	 *
	 *               Dissemination component
	 *
	 * Dissemination component sends events. Event is a
	 * notification about some member state update. The member
	 * maintains a different event type for each significant
	 * attribute - status, incarnation, etc not to send entire
	 * member state each time any member attribute changes.
	 *
	 * According to SWIM, an event should be sent to all
	 * members at least once - for that a TTD
	 * (time-to-disseminate) counter is maintained for each
	 * independent event type.
	 *
	 * When a member state changes, the TTD is reset to the
	 * cluster size. It is then decremented after each send.
	 * This guarantees that each member state change is sent
	 * to each SWIM member at least once. If a new event of
	 * the same type is generated before a round is finished,
	 * the current event object is updated in place with reset
	 * of the TTD.
	 *
	 * To conclude, TTD works in two ways: to see which
	 * specific member attribute needs dissemination and to
	 * track how many cluster members still need to learn
	 * about the change from this instance.
	 */
	/**
	 * General TTD reset each time when any visible member
	 * attribute is updated. It is always bigger or equal than
	 * any other TTDs. In addition it helps to keep a dead
	 * member not dropped until the TTD gets zero so as to
	 * allow other members to learn the dead status.
	 */
	int status_ttd;
	/**
	 * Arbitrary user data, disseminated on each change.
	 * Heap-allocated, owned by the member; NULL when
	 * payload_size == 0.
	 */
	char *payload;
	/** Payload size, in bytes. */
	uint16_t payload_size;
	/**
	 * True, if the payload is thought to be of the most
	 * actual version. In such a case it can be disseminated
	 * further. Otherwise @a payload is suspected to be
	 * outdated and can be updated in two cases only:
	 *
	 * 1) when it is received with a bigger incarnation from
	 *    anywhere;
	 *
	 * 2) when it is received with the same incarnation, but
	 *    local payload is outdated.
	 *
	 * A payload can become outdated, if anyhow a new
	 * incarnation of the member has been learned, but not a
	 * new payload. For example, a message with new payload
	 * could be lost, and at the same time this instance
	 * responds to a ping with newly incarnated ack. The ack
	 * receiver will learn the new incarnation, but not the
	 * new payload.
	 *
	 * In this case it can't be said exactly whether the
	 * member has updated payload, or another attribute. The
	 * only way here is to wait until the most actual payload
	 * will be received from another instance. Note, that such
	 * an instance always exists - the payload originator
	 * instance.
	 */
	bool is_payload_up_to_date;
	/**
	 * TTD of payload. At most this number of times payload is
	 * sent as a part of dissemination component. Reset on
	 * each payload update.
	 */
	int payload_ttd;
	/**
	 * All created events are put into a queue sorted by event
	 * time.
	 */
	struct rlist in_dissemination_queue;
	/**
	 * Each time a member is updated, or created, or dropped,
	 * it is added to an event queue. Members from this queue
	 * are dispatched into user defined triggers.
	 */
	struct stailq_entry in_event_queue;
	/**
	 * Mask of events happened with this member since a
	 * previous trigger invocation. Once the events are
	 * delivered into a trigger, the mask is nullified and
	 * starts collecting new events. A zero mask also means
	 * the member is not in the event queue.
	 */
	enum swim_ev_mask events;
	/**
	 *
	 *            Failure detection component
	 */
	/**
	 * A monotonically growing value to refute old member's
	 * state, characterized by a triplet
	 * {incarnation, status, address}.
	 */
	struct swim_incarnation incarnation;
	/**
	 * How many recent pings did not receive an ack while the
	 * member was in the current status. When this number
	 * reaches a configured threshold the instance is marked
	 * as dead. After a few more unacknowledged it is removed
	 * from the member table. This counter is reset on each
	 * acknowledged ping, status or incarnation change.
	 */
	int unacknowledged_pings;
	/**
	 * A deadline (absolute time, seconds) when we stop
	 * expecting a response to the ping and account it as
	 * unacknowledged.
	 */
	double ping_deadline;
	/**
	 * Position in a queue of members waiting for an ack.
	 * A member is added to the queue when we send a ping
	 * message to it.
	 */
	struct heap_node in_wait_ack_heap;
	/** Ready at hand regular ACK task. */
	struct swim_task ack_task;
	/** Ready at hand regular PING task. */
	struct swim_task ping_task;
};
/*
 * Instantiate a hash table mapping member UUID to
 * struct swim_member * from the mhash template. The key carries
 * a precomputed hash so a UUID is hashed once per lookup.
 */
#define mh_name _swim_table
struct mh_swim_table_key {
	uint32_t hash;
	const struct tt_uuid *uuid;
};
#define mh_key_t struct mh_swim_table_key
#define mh_node_t struct swim_member *
#define mh_arg_t void *
#define mh_hash(a, arg) ((*a)->hash)
#define mh_hash_key(a, arg) (a.hash)
#define mh_cmp(a, b, arg) (tt_uuid_compare(&(*a)->uuid, &(*b)->uuid))
#define mh_cmp_key(a, b, arg) (tt_uuid_compare(a.uuid, &(*b)->uuid))
#define MH_SOURCE 1
#include "salad/mhash.h"
/*
 * Instantiate a binary heap of members ordered by ping deadline,
 * the soonest deadline on top.
 */
#define HEAP_NAME wait_ack_heap
#define HEAP_LESS(h, a, b) ((a)->ping_deadline < (b)->ping_deadline)
#define heap_value_t struct swim_member
#define heap_value_attr in_wait_ack_heap
#include "salad/heap.h"
/**
 * SWIM instance. Stores configuration, manages periodical tasks,
 * rounds. Each member has an object of this type on its host,
 * while on others it is represented as a struct swim_member
 * object.
 */
struct swim {
	/**
	 * Global hash of all known members of the cluster. Hash
	 * key is UUID, value is a struct member, describing a
	 * remote instance. Discovered members live here until
	 * they are detected as dead - in such a case they are
	 * removed from the hash after a while.
	 */
	struct mh_swim_table_t *members;
	/**
	 * This node. Is used to not send messages to self, it's
	 * meaningless. Also to refute false gossips about self
	 * status. Always present in @a members as well.
	 */
	struct swim_member *self;
	/**
	 * Scheduler of output requests, receiver of incoming
	 * ones.
	 */
	struct swim_scheduler scheduler;
	/**
	 * An offset of this instance in the hash table. Is used
	 * to iterate in the hash table starting from this swim
	 * instance. Such iteration is unstable between yields
	 * (i.e. member positions may change when the table is
	 * resized after an incoming event), but still is useful
	 * for a fast non-yielding scan of the member table
	 * starting from this instance.
	 */
	mh_int_t iterator;
	/**
	 *
	 *            Failure detection component
	 */
	/**
	 * A heap of members waiting for an ACK. A member is added
	 * to the queue when a ping is sent, and is removed from
	 * the queue when an ACK is received or a timeout expires.
	 * The heap is sorted by ping deadline in ascending order
	 * (bottom is farther in the future, top is closer to now
	 * or is in the past).
	 */
	heap_t wait_ack_heap;
	/** Generator of ack checking events. */
	struct ev_timer wait_ack_tick;
	/** GC state saying how to remove dead members. */
	enum swim_gc_mode gc_mode;
	/**
	 *
	 *            Dissemination component
	 */
	/**
	 * Queue of all members which have dissemination
	 * information. A member is added to the queue whenever
	 * any of its attributes changes, and stays in the queue
	 * as long as the event TTD is non-zero.
	 */
	struct rlist dissemination_queue;
	/**
	 * Queue of updated, new, and dropped members to deliver
	 * the events to triggers. Dropped members are also kept
	 * here until they are handled by a trigger. Each queued
	 * member holds one extra reference.
	 */
	struct stailq event_queue;
	/**
	 * List of triggers to call on each new, dropped, and
	 * updated member.
	 */
	struct rlist on_member_event;
	/**
	 * Members to which a message should be sent next during
	 * this round.
	 */
	struct rlist round_queue;
	/** Generator of round step events. */
	struct ev_timer round_tick;
	/**
	 * True if a packet in the round step task is still valid
	 * and can be resent on a next round step.
	 */
	bool is_round_packet_valid;
	/**
	 * Preallocated buffer to store shuffled members here at
	 * the beginning of each round. Grown on each new member
	 * registration, never shrunk.
	 */
	struct swim_member **shuffled;
	/**
	 * Fiber to serve member event triggers. This task is
	 * being done in a separate fiber, because user triggers
	 * can yield and libev callbacks, processing member
	 * events, are not allowed to yield.
	 */
	struct fiber *event_handler;
	/**
	 * Single round step task. It is impossible to have
	 * multiple round steps in the same SWIM instance at the
	 * same time, so it is single and preallocated per SWIM
	 * instance. Note, that the task's packet once built at
	 * the beginning of a round is reused during the round
	 * without rebuilding on each step. But packet rebuild can
	 * be triggered by any update of any member.
	 *
	 * Keep this structure at the bottom - it is huge and
	 * should not split other attributes into different cache
	 * lines.
	 */
	struct swim_task round_step_task;
};
/**
 * Drop the cached round message on any member change so it gets
 * rebuilt lazily. An immediate rebuild would be wasteful: several
 * invalidations can happen in a row (multiple attribute updates,
 * several new members), and the packet may currently be queued
 * for sending inside the scheduler.
 */
static inline void
swim_cached_round_msg_invalidate(struct swim *swim)
{
	swim->is_round_packet_valid = false;
}
/**
 * Register the member as an ACK waiter, unless it already waits
 * for one. Sets the deadline and (re)arms the ack timer.
 */
static void
swim_wait_ack(struct swim *swim, struct swim_member *member,
	      bool was_ping_indirect)
{
	if (!heap_node_is_stray(&member->in_wait_ack_heap))
		return;
	/*
	 * Direct ping is two trips: PING + ACK. Indirect ping is
	 * four trips: PING, FORWARD PING, ACK, FORWARD ACK -
	 * hence the doubled timeout for indirects.
	 */
	double timeout = swim->wait_ack_tick.repeat;
	if (was_ping_indirect)
		timeout += timeout;
	member->ping_deadline = swim_time() + timeout;
	wait_ack_heap_insert(&swim->wait_ack_heap, member);
	swim_ev_timer_again(swim_loop(), &swim->wait_ack_tick);
}
/**
* On literally any update of a member it is added to a queue of
* members to disseminate updates. Regardless of other TTDs, each
* update also resets status TTD. Status TTD is always greater
* than any other event-related TTD, so it's sufficient to look at
* it alone to see that a member needs information dissemination.
* The status change itself occupies only 2 bytes in a packet, so
* it is cheap to send it on any update, while does reduce
* entropy.
*/
static inline void
swim_register_event(struct swim *swim, struct swim_member *member)
{
if (rlist_empty(&member->in_dissemination_queue)) {
rlist_add_tail_entry(&swim->dissemination_queue, member,
in_dissemination_queue);
}
member->status_ttd = mh_size(swim->members);
swim_cached_round_msg_invalidate(swim);
}
/**
* Make all needed actions to process a member's update like a
* change of its status, or incarnation, or both.
*/
static void
swim_on_member_update(struct swim *swim, struct swim_member *member,
enum swim_ev_mask events)
{
member->unacknowledged_pings = 0;
swim_register_event(swim, member);
/*
* Member event should be delivered to triggers only if
* there is at least one trigger.
*/
if (! rlist_empty(&swim->on_member_event)) {
/*
* Member is referenced and added to a queue only
* once. That moment can be detected when a first
* event happens.
*/
if (member->events == 0 && events != 0) {
swim_member_ref(member);
stailq_add_tail_entry(&swim->event_queue, member,
in_event_queue);
fiber_wakeup(swim->event_handler);
}
member->events |= events;
}
}
/** Expose the member-event trigger list to the public API. */
struct rlist *
swim_trigger_list_on_member_event(struct swim *swim)
{
	struct rlist *triggers = &swim->on_member_event;
	return triggers;
}
/** True when queued member events still await trigger delivery. */
bool
swim_has_pending_events(struct swim *swim)
{
	bool is_queue_empty = stailq_empty(&swim->event_queue);
	return !is_queue_empty;
}
/**
 * Update status and incarnation of the member if needed. Statuses
 * are compared as a compound key: {incarnation, status}. So @a
 * new_status can override an old one only if its incarnation is
 * greater, or the same, but its status is "bigger". Statuses are
 * compared by their identifier, so "alive" < "dead". This
 * protects from the case when a member is detected as dead on one
 * instance, but overridden by another instance with the same
 * incarnation's "alive" message.
 */
static inline void
swim_update_member_inc_status(struct swim *swim, struct swim_member *member,
			      enum swim_member_status new_status,
			      const struct swim_incarnation *incarnation)
{
	/*
	 * Source of truth about self is this instance and it is
	 * never updated from remote. Refutation is handled
	 * separately.
	 */
	assert(member != swim->self);
	enum swim_ev_mask events;
	int cmp = swim_incarnation_diff(&member->incarnation, incarnation,
					&events);
	if (cmp > 0) {
		/* Stale news - the local incarnation is newer. */
		return;
	}
	if (cmp == 0) {
		/* Same incarnation: only a "bigger" status wins. */
		if (member->status < new_status) {
			member->status = new_status;
			swim_on_member_update(swim, member,
					      SWIM_EV_NEW_STATUS);
		}
		return;
	}
	/* Remote incarnation is newer - accept everything. */
	if (member->status != new_status) {
		events |= SWIM_EV_NEW_STATUS;
		member->status = new_status;
	}
	member->incarnation = *incarnation;
	swim_on_member_update(swim, member, events);
}
int
swim_fd(const struct swim *swim)
{
return swim->scheduler.transport.fd;
}
/**
 * Recover the owning SWIM instance from a scheduler pointer. Task
 * completion callbacks receive only the scheduler, so they use
 * this to get back to the instance.
 */
static inline struct swim *
swim_by_scheduler(struct swim_scheduler *scheduler)
{
	return container_of(scheduler, struct swim, scheduler);
}
/**
 * Replace the member's payload with a private copy of @a payload,
 * reset the payload TTD and register a payload event.
 * @retval 0 Success.
 * @retval -1 Memory error, the old payload is left intact.
 */
static inline int
swim_update_member_payload(struct swim *swim, struct swim_member *member,
			   const char *payload, uint16_t payload_size)
{
	assert(payload_size <= MAX_PAYLOAD_SIZE);
	char *copy = NULL;
	if (payload_size == 0) {
		/* Empty payload needs no buffer at all. */
		free(member->payload);
	} else {
		copy = (char *) realloc(member->payload, payload_size);
		if (copy == NULL) {
			diag_set(OutOfMemory, payload_size, "realloc", "new_payload");
			return -1;
		}
		memcpy(copy, payload, payload_size);
	}
	member->payload = copy;
	member->payload_size = payload_size;
	/* Disseminate the new payload to the whole cluster. */
	member->payload_ttd = mh_size(swim->members);
	member->is_payload_up_to_date = true;
	swim_on_member_update(swim, member, SWIM_EV_NEW_PAYLOAD);
	return 0;
}
/**
 * Ping task completion callback: once a ping has actually been
 * sent, start waiting for an ACK from its target.
 */
static void
swim_ping_task_complete(struct swim_task *task,
			struct swim_scheduler *scheduler, int rc)
{
	if (rc < 0) {
		/* The ping never left - no ACK to expect. */
		return;
	}
	struct swim_member *member =
		container_of(task, struct swim_member, ping_task);
	swim_wait_ack(swim_by_scheduler(scheduler), member, false);
}
/** Take one more public reference to the member. */
void
swim_member_ref(struct swim_member *member)
{
	member->refs++;
}
/** Drop a reference; the last one frees the member's memory. */
void
swim_member_unref(struct swim_member *member)
{
	assert(member->refs > 0);
	if (--member->refs > 0)
		return;
	free(member->payload);
	free(member);
}
/** Whether the member was removed from the member table. */
bool
swim_member_is_dropped(const struct swim_member *member)
{
	return member->is_dropped;
}
/**
 * Free member's resources. The member must already be unlinked
 * from the round, wait-ack and dissemination queues - the asserts
 * below enforce that. The memory itself is released only when the
 * last public reference is gone (see swim_member_unref()).
 */
static inline void
swim_member_delete(struct swim_member *member)
{
	assert(rlist_empty(&member->in_round_queue));
	/* Invalidate for API users still holding a reference. */
	member->is_dropped = true;
	/* Failure detection component. */
	assert(heap_node_is_stray(&member->in_wait_ack_heap));
	swim_task_destroy(&member->ack_task);
	swim_task_destroy(&member->ping_task);
	/* Dissemination component. */
	assert(rlist_empty(&member->in_dissemination_queue));
	/* Drop the reference taken at creation. */
	swim_member_unref(member);
}
/**
 * Allocate and initialize a member object. The member is not
 * registered in any table or queue here; the caller does that.
 * @retval NULL Memory error, diag is set.
 */
static struct swim_member *
swim_member_new(const struct sockaddr_in *addr, const struct tt_uuid *uuid,
		enum swim_member_status status,
		const struct swim_incarnation *incarnation)
{
	struct swim_member *m =
		(struct swim_member *) calloc(1, sizeof(*m));
	if (m == NULL) {
		diag_set(OutOfMemory, sizeof(*m), "calloc", "member");
		return NULL;
	}
	m->refs = 1;
	m->status = status;
	m->addr = *addr;
	m->uuid = *uuid;
	m->hash = swim_uuid_hash(uuid);
	rlist_create(&m->in_round_queue);
	/* Failure detection component. */
	m->incarnation = *incarnation;
	heap_node_create(&m->in_wait_ack_heap);
	swim_task_create(&m->ack_task, NULL, NULL, "ack");
	swim_task_create(&m->ping_task, swim_ping_task_complete, NULL,
			 "ping");
	/* Dissemination component. */
	rlist_create(&m->in_dissemination_queue);
	return m;
}
/**
 * Remove the member from all queues, hashes, destroy it and free
 * the memory (unless public references remain).
 */
static void
swim_delete_member(struct swim *swim, struct swim_member *member)
{
	say_verbose("SWIM %d: member %s is deleted", swim_fd(swim),
		    tt_uuid_str(&member->uuid));
	struct mh_swim_table_key key = {member->hash, &member->uuid};
	mh_int_t rc = mh_swim_table_find(swim->members, key, NULL);
	assert(rc != mh_end(swim->members));
	mh_swim_table_del(swim->members, rc, NULL);
	rlist_del_entry(member, in_round_queue);
	/* Failure detection component. */
	if (! heap_node_is_stray(&member->in_wait_ack_heap))
		wait_ack_heap_delete(&swim->wait_ack_heap, member);
	/* Dissemination component. */
	/*
	 * Fire the drop event first - it re-links the member into
	 * the dissemination queue, so the unlink must come after.
	 */
	swim_on_member_update(swim, member, SWIM_EV_DROP);
	rlist_del_entry(member, in_dissemination_queue);
	swim_member_delete(member);
}
/** Look a member up by UUID; NULL when it is unknown. */
static inline struct swim_member *
swim_find_member(struct swim *swim, const struct tt_uuid *uuid)
{
	struct mh_swim_table_key key = {swim_uuid_hash(uuid), uuid};
	mh_int_t pos = mh_swim_table_find(swim->members, key, NULL);
	if (pos == mh_end(swim->members))
		return NULL;
	return *mh_swim_table_node(swim->members, pos);
}
/**
 * Register a new member with a specified status. It is not added
 * to the round queue here. It waits until the current round is
 * finished, and then is included into a new round. It is done
 * mainly to not add self into the round queue, because self is
 * also created via this function.
 * @retval NULL Memory error, diag is set; nothing is registered.
 */
static struct swim_member *
swim_new_member(struct swim *swim, const struct sockaddr_in *addr,
		const struct tt_uuid *uuid, enum swim_member_status status,
		const struct swim_incarnation *incarnation, const char *payload,
		int payload_size)
{
	/* Grow the shuffle buffer for the extra member first. */
	int new_bsize = sizeof(swim->shuffled[0]) *
			(mh_size(swim->members) + 1);
	struct swim_member **new_shuffled =
		(struct swim_member **) realloc(swim->shuffled, new_bsize);
	if (new_shuffled == NULL) {
		diag_set(OutOfMemory, new_bsize, "realloc", "new_shuffled");
		return NULL;
	}
	swim->shuffled = new_shuffled;
	/*
	 * Reserve one more slot to never fail push into the ack
	 * waiters heap.
	 */
	if (wait_ack_heap_reserve(&swim->wait_ack_heap) != 0) {
		diag_set(OutOfMemory, sizeof(struct heap_node), "realloc",
			 "wait_ack_heap");
		return NULL;
	}
	struct swim_member *member =
		swim_member_new(addr, uuid, status, incarnation);
	if (member == NULL)
		return NULL;
	assert(swim_find_member(swim, uuid) == NULL);
	mh_int_t rc = mh_swim_table_put(swim->members,
					(const struct swim_member **) &member,
					NULL, NULL);
	if (rc == mh_end(swim->members)) {
		swim_member_delete(member);
		diag_set(OutOfMemory, sizeof(mh_int_t), "malloc", "node");
		return NULL;
	}
	/* With a second member the protocol has someone to ping. */
	if (mh_size(swim->members) > 1)
		swim_ev_timer_again(swim_loop(), &swim->round_tick);
	/* Dissemination component. */
	swim_on_member_update(swim, member, SWIM_EV_NEW);
	/* payload_size < 0 means "no payload provided". */
	if (payload_size >= 0 &&
	    swim_update_member_payload(swim, member, payload,
				       payload_size) != 0) {
		swim_delete_member(swim, member);
		return NULL;
	}
	say_verbose("SWIM %d: member %s is added, total is %d", swim_fd(swim),
		    tt_uuid_str(&member->uuid), mh_size(swim->members));
	return member;
}
/**
 * Fill the preallocated shuffle buffer with all table members in
 * random order (inside-out Fisher-Yates). Used for forthcoming
 * round planning.
 */
static void
swim_shuffle_members(struct swim *swim)
{
	struct mh_swim_table_t *table = swim->members;
	/*
	 * This shuffling preserves even distribution of a random
	 * sequence. The distribution properties have been
	 * verified by a longevity test.
	 */
	int count = 0;
	mh_int_t node = mh_first(table);
	mh_int_t last = mh_end(table);
	for (; node != last; node = mh_next(table, node), ++count) {
		swim->shuffled[count] = *mh_swim_table_node(table, node);
		int pos = swim_scaled_rand(0, count);
		SWAP(swim->shuffled[count], swim->shuffled[pos]);
	}
}
/**
 * Shuffle members, build randomly ordered queue of addressees. In
 * other words, do all round preparation work.
 */
static void
swim_new_round(struct swim *swim)
{
	int size = mh_size(swim->members);
	if (size == 1) {
		/* Alone in the cluster - nobody to talk to. */
		assert(swim->self != NULL);
		say_verbose("SWIM %d: skip a round - no members",
			    swim_fd(swim));
		return;
	}
	/* -1 for self. */
	say_verbose("SWIM %d: start a new round with %d members", swim_fd(swim),
		    size - 1);
	swim_cached_round_msg_invalidate(swim);
	swim_shuffle_members(swim);
	rlist_create(&swim->round_queue);
	for (int i = 0; i < size; ++i) {
		struct swim_member *m = swim->shuffled[i];
		if (m == swim->self)
			continue;
		rlist_add_entry(&swim->round_queue, m, in_round_queue);
	}
}
/**
* Encode one member into @a packet using @a passport structure.
* Note that this function does not make a decision whether
* payload should be encoded, because its callers have different
* conditions for that. The anti-entropy needs the payload be
* up-to-date. The dissemination component additionally needs
* TTD > 0.
* @retval 0 Success, encoded.
* @retval -1 Not enough memory in the packet.
*/
static int
swim_encode_member(struct swim_packet *packet, struct swim_member *m,
struct swim_passport_bin *passport,
struct swim_member_payload_bin *payload_header,
bool encode_payload)
{
/* The headers should be initialized. */
assert(passport->k_status == SWIM_MEMBER_STATUS);
assert(payload_header->k_payload == SWIM_MEMBER_PAYLOAD);
int size = sizeof(*passport);
encode_payload = encode_payload && m->is_payload_up_to_date;
if (encode_payload)
size += sizeof(*payload_header) + m->payload_size;
char *pos = swim_packet_alloc(packet, size);
if (pos == NULL)
return -1;
swim_passport_bin_fill(passport, &m->addr, &m->uuid, m->status,
&m->incarnation, encode_payload);
memcpy(pos, passport, sizeof(*passport));
if (encode_payload) {
pos += sizeof(*passport);
swim_member_payload_bin_fill(payload_header, m->payload_size);
memcpy(pos, payload_header, sizeof(*payload_header));
pos += sizeof(*payload_header);
memcpy(pos, m->payload, m->payload_size);