Skip to content

Commit

Permalink
swim: make incarnation struct
Browse files Browse the repository at this point in the history
Traditional SWIM describes a member's version as an incarnation -
a volatile, monotonically growing number used to refute false gossip.
But that is not enough in the real world, because it is also necessary
to detect restarts and to refute information from previous lives of an
instance.

Incarnation is going to be a two-part value with a persistent upper
part and a volatile lower part. This patch prepares for that by turning
incarnation into a struct instead of a plain number.

The volatile part is called 'version'.

Part of #4280
  • Loading branch information
Gerold103 committed Jun 23, 2019
1 parent 88a2329 commit 3aecf9f
Show file tree
Hide file tree
Showing 11 changed files with 344 additions and 178 deletions.
150 changes: 100 additions & 50 deletions src/lib/swim/swim.c
Expand Up @@ -213,6 +213,31 @@ swim_uuid_hash(const struct tt_uuid *uuid)
return mh_strn_hash((const char *) uuid, UUID_LEN);
}

/**
 * Order two incarnations and report which of their parts differ.
 *
 * @param l Left-hand incarnation.
 * @param r Right-hand incarnation.
 * @param[out] diff Set to SWIM_EV_NEW_VERSION when the versions
 *        are not equal, and to 0 when they are. Callers use the
 *        mask to decide which triggers to fire.
 *
 * @retval <0 @a l is older than @a r.
 * @retval  0 The incarnations are equal.
 * @retval >0 @a l is newer than @a r.
 */
static inline int
swim_incarnation_diff(const struct swim_incarnation *l,
		      const struct swim_incarnation *r,
		      enum swim_ev_mask *diff)
{
	/* Handle the unequal case first; equality falls through. */
	if (l->version != r->version) {
		*diff = SWIM_EV_NEW_VERSION;
		if (l->version < r->version)
			return -1;
		return 1;
	}
	*diff = 0;
	return 0;
}

int
swim_incarnation_cmp(const struct swim_incarnation *l,
const struct swim_incarnation *r)
{
enum swim_ev_mask unused;
return swim_incarnation_diff(l, r, &unused);
}

/**
* A cluster member description. This structure describes the
* last known state of an instance. This state is updated
Expand Down Expand Up @@ -353,11 +378,11 @@ struct swim_member {
* Failure detection component
*/
/**
* A monotonically growing number to refute old member's
* A monotonically growing value to refute old member's
* state, characterized by a triplet
* {incarnation, status, address}.
*/
uint64_t incarnation;
struct swim_incarnation incarnation;
/**
* How many recent pings did not receive an ack while the
* member was in the current status. When this number
Expand Down Expand Up @@ -631,24 +656,25 @@ swim_has_pending_events(struct swim *swim)
static inline void
swim_update_member_inc_status(struct swim *swim, struct swim_member *member,
enum swim_member_status new_status,
uint64_t incarnation)
const struct swim_incarnation *incarnation)
{
/*
* Source of truth about self is this instance and it is
* never updated from remote. Refutation is handled
* separately.
*/
assert(member != swim->self);
if (member->incarnation < incarnation) {
enum swim_ev_mask events = SWIM_EV_NEW_INCARNATION;
enum swim_ev_mask events;
int cmp = swim_incarnation_diff(&member->incarnation, incarnation,
&events);
if (cmp < 0) {
if (new_status != member->status) {
events |= SWIM_EV_NEW_STATUS;
member->status = new_status;
}
member->incarnation = incarnation;
member->incarnation = *incarnation;
swim_on_member_update(swim, member, events);
} else if (member->incarnation == incarnation &&
member->status < new_status) {
} else if (cmp == 0 && member->status < new_status) {
member->status = new_status;
swim_on_member_update(swim, member, SWIM_EV_NEW_STATUS);
}
Expand Down Expand Up @@ -760,7 +786,8 @@ swim_member_delete(struct swim_member *member)
/** Create a new member. It is not registered anywhere here. */
static struct swim_member *
swim_member_new(const struct sockaddr_in *addr, const struct tt_uuid *uuid,
enum swim_member_status status, uint64_t incarnation)
enum swim_member_status status,
const struct swim_incarnation *incarnation)
{
struct swim_member *member =
(struct swim_member *) calloc(1, sizeof(*member));
Expand All @@ -776,7 +803,7 @@ swim_member_new(const struct sockaddr_in *addr, const struct tt_uuid *uuid,
rlist_create(&member->in_round_queue);

/* Failure detection component. */
member->incarnation = incarnation;
member->incarnation = *incarnation;
heap_node_create(&member->in_wait_ack_heap);
swim_task_create(&member->ack_task, NULL, NULL, "ack");
swim_task_create(&member->ping_task, swim_ping_task_complete, NULL,
Expand Down Expand Up @@ -835,7 +862,8 @@ swim_find_member(struct swim *swim, const struct tt_uuid *uuid)
static struct swim_member *
swim_new_member(struct swim *swim, const struct sockaddr_in *addr,
const struct tt_uuid *uuid, enum swim_member_status status,
uint64_t incarnation, const char *payload, int payload_size)
const struct swim_incarnation *incarnation, const char *payload,
int payload_size)
{
int new_bsize = sizeof(swim->shuffled[0]) *
(mh_size(swim->members) + 1);
Expand Down Expand Up @@ -962,7 +990,7 @@ swim_encode_member(struct swim_packet *packet, struct swim_member *m,
if (pos == NULL)
return -1;
swim_passport_bin_fill(passport, &m->addr, &m->uuid, m->status,
m->incarnation, encode_payload);
&m->incarnation, encode_payload);
memcpy(pos, passport, sizeof(*passport));
if (encode_payload) {
pos += sizeof(*passport);
Expand Down Expand Up @@ -1044,7 +1072,7 @@ swim_encode_failure_detection(struct swim *swim, struct swim_packet *packet,
if (pos == NULL)
return 0;
swim_fd_header_bin_create(&fd_header_bin, type,
swim->self->incarnation);
&swim->self->incarnation);
memcpy(pos, &fd_header_bin, size);
return 1;
}
Expand Down Expand Up @@ -1412,14 +1440,15 @@ swim_update_member(struct swim *swim, const struct swim_member_def *def,
struct swim_member *member)
{
assert(member != swim->self);
assert(def->incarnation >= member->incarnation);
int cmp = swim_incarnation_cmp(&def->incarnation, &member->incarnation);
assert(cmp >= 0);
/*
* Payload update rules are simple: it can be updated
* either if the new payload has a bigger incarnation, or
* the same incarnation, but local payload is outdated.
*/
bool update_payload = false;
if (def->incarnation > member->incarnation) {
if (cmp > 0) {
if (! swim_inaddr_eq(&def->addr, &member->addr))
swim_update_member_addr(swim, member, &def->addr);
if (def->payload_size >= 0)
Expand All @@ -1436,7 +1465,7 @@ swim_update_member(struct swim *swim, const struct swim_member_def *def,
diag_log();
}
swim_update_member_inc_status(swim, member, def->status,
def->incarnation);
&def->incarnation);
}

/**
Expand Down Expand Up @@ -1475,14 +1504,17 @@ swim_upsert_member(struct swim *swim, const struct swim_member_def *def,
goto skip;
}
*result = swim_new_member(swim, &def->addr, &def->uuid,
def->status, def->incarnation,
def->status, &def->incarnation,
def->payload, def->payload_size);
return *result != NULL ? 0 : -1;
}
*result = member;
struct swim_member *self = swim->self;
enum swim_ev_mask diff;
int cmp = swim_incarnation_diff(&def->incarnation, &member->incarnation,
&diff);
if (member != self) {
if (def->incarnation < member->incarnation)
if (cmp < 0)
goto skip;
swim_update_member(swim, def, member);
return 0;
Expand All @@ -1491,22 +1523,21 @@ swim_upsert_member(struct swim *swim, const struct swim_member_def *def,
* It is possible that other instances know a bigger
* incarnation of this instance - such thing happens when
* the instance restarts and loses its local incarnation
* number. It will be restored by receiving dissemination
* value. It will be restored by receiving dissemination
* and anti-entropy messages about self.
*/
if (self->incarnation < def->incarnation) {
if (cmp > 0) {
self->incarnation = def->incarnation;
swim_on_member_update(swim, self, SWIM_EV_NEW_INCARNATION);
swim_on_member_update(swim, self, diff);
}
if (def->status != MEMBER_ALIVE &&
def->incarnation == self->incarnation) {
if (def->status != MEMBER_ALIVE && cmp == 0) {
/*
* In the cluster a gossip exists that this
* instance is not alive. Refute this information
* with a bigger incarnation.
*/
self->incarnation++;
swim_on_member_update(swim, self, SWIM_EV_NEW_INCARNATION);
self->incarnation.version++;
swim_on_member_update(swim, self, SWIM_EV_NEW_VERSION);
}
return 0;
skip:
Expand Down Expand Up @@ -1595,7 +1626,7 @@ swim_process_failure_detection(struct swim *swim, const char **pos,
* case - this message was received from the member
* directly, and evidently it is alive.
*/
if (def.incarnation == member->incarnation &&
if (swim_incarnation_cmp(&def.incarnation, &member->incarnation) == 0 &&
member->status != MEMBER_ALIVE) {
member->status = MEMBER_ALIVE;
swim_on_member_update(swim, member, SWIM_EV_NEW_STATUS);
Expand Down Expand Up @@ -1646,31 +1677,45 @@ swim_process_quit(struct swim *swim, const char **pos, const char *end,
uint32_t size;
if (swim_decode_map(pos, end, &size, prefix, "root") != 0)
return -1;
if (size != 1) {
diag_set(SwimError, "%s map of size 1 is expected", prefix);
if (size != SWIM_INCARNATION_BIN_SIZE) {
diag_set(SwimError, "%s map of size %d is expected",
prefix, SWIM_INCARNATION_BIN_SIZE);
return -1;
}
uint64_t tmp;
if (swim_decode_uint(pos, end, &tmp, prefix, "a key") != 0)
return -1;
if (tmp != SWIM_QUIT_INCARNATION) {
diag_set(SwimError, "%s a key should be incarnation", prefix);
return -1;
struct swim_incarnation incarnation;
swim_incarnation_create(&incarnation, 0);
for (uint32_t i = 0; i < size; ++i) {
uint64_t tmp;
if (swim_decode_uint(pos, end, &tmp, prefix, "a key") != 0)
return -1;
switch (tmp) {
case SWIM_QUIT_VERSION:
if (swim_decode_uint(pos, end, &incarnation.version,
prefix, "version") != 0)
return -1;
break;
default:
diag_set(SwimError, "%s unknown key", prefix);
return -1;
}
}
if (swim_decode_uint(pos, end, &tmp, prefix, "incarnation") != 0)
return -1;
struct swim_member *m = swim_find_member(swim, uuid);
if (m == NULL)
return 0;
/*
* Check for 'self' in case this instance took UUID of a
* quited instance.
*/
enum swim_ev_mask diff;
if (m != swim->self) {
swim_update_member_inc_status(swim, m, MEMBER_LEFT, tmp);
} else if (tmp >= m->incarnation) {
m->incarnation = tmp + 1;
swim_on_member_update(swim, m, SWIM_EV_NEW_INCARNATION);
swim_update_member_inc_status(swim, m, MEMBER_LEFT,
&incarnation);
} else if (swim_incarnation_diff(&incarnation, &m->incarnation,
&diff) >= 0) {
m->incarnation = incarnation;
++m->incarnation.version;
diff |= SWIM_EV_NEW_VERSION;
swim_on_member_update(swim, m, diff);
}
return 0;
}
Expand Down Expand Up @@ -1895,8 +1940,10 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
"a first config", prefix);
return -1;
}
struct swim_incarnation incarnation;
swim_incarnation_create(&incarnation, 0);
swim->self = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE,
0, NULL, 0);
&incarnation, NULL, 0);
if (swim->self == NULL)
return -1;
} else if (uuid == NULL || tt_uuid_is_nil(uuid)) {
Expand All @@ -1908,7 +1955,9 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
return -1;
}
new_self = swim_new_member(swim, &swim->self->addr, uuid,
MEMBER_ALIVE, 0, swim->self->payload,
MEMBER_ALIVE,
&swim->self->incarnation,
swim->self->payload,
swim->self->payload_size);
if (new_self == NULL)
return -1;
Expand Down Expand Up @@ -1959,9 +2008,8 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
swim->self = new_self;
}
if (! swim_inaddr_eq(&addr, &swim->self->addr)) {
swim->self->incarnation++;
swim_on_member_update(swim, swim->self,
SWIM_EV_NEW_INCARNATION);
swim->self->incarnation.version++;
swim_on_member_update(swim, swim->self, SWIM_EV_NEW_VERSION);
swim_update_member_addr(swim, swim->self, &addr);
}
if (gc_mode != SWIM_GC_DEFAULT)
Expand Down Expand Up @@ -1994,8 +2042,8 @@ swim_set_payload(struct swim *swim, const char *payload, int payload_size)
struct swim_member *self = swim->self;
if (swim_update_member_payload(swim, self, payload, payload_size) != 0)
return -1;
self->incarnation++;
swim_on_member_update(swim, self, SWIM_EV_NEW_INCARNATION);
self->incarnation.version++;
swim_on_member_update(swim, self, SWIM_EV_NEW_VERSION);
return 0;
}

Expand All @@ -2013,7 +2061,9 @@ swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid)
return -1;
struct swim_member *member = swim_find_member(swim, uuid);
if (member == NULL) {
member = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE, 0,
struct swim_incarnation inc;
swim_incarnation_create(&inc, 0);
member = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE, &inc,
NULL, -1);
return member == NULL ? -1 : 0;
}
Expand Down Expand Up @@ -2173,7 +2223,7 @@ swim_encode_quit(struct swim *swim, struct swim_packet *packet)
char *pos = swim_packet_alloc(packet, sizeof(bin));
if (pos == NULL)
return 0;
swim_quit_bin_create(&bin, swim->self->incarnation);
swim_quit_bin_create(&bin, &swim->self->incarnation);
memcpy(pos, &bin, sizeof(bin));
return 1;
}
Expand Down Expand Up @@ -2265,7 +2315,7 @@ swim_member_uuid(const struct swim_member *member)
return &member->uuid;
}

uint64_t
struct swim_incarnation
swim_member_incarnation(const struct swim_member *member)
{
return member->incarnation;
Expand Down
7 changes: 6 additions & 1 deletion src/lib/swim/swim.h
Expand Up @@ -234,7 +234,7 @@ const struct tt_uuid *
swim_member_uuid(const struct swim_member *member);

/** Member's incarnation. */
uint64_t
struct swim_incarnation
swim_member_incarnation(const struct swim_member *member);

/** Member's payload. */
Expand Down Expand Up @@ -279,6 +279,11 @@ enum swim_ev_mask {
SWIM_EV_NEW = 0b00000001,
SWIM_EV_NEW_STATUS = 0b00000010,
SWIM_EV_NEW_URI = 0b00000100,
SWIM_EV_NEW_VERSION = 0b00001000,
/*
* Shortcut to check for update of any part of
* incarnation.
*/
SWIM_EV_NEW_INCARNATION = 0b00001000,
SWIM_EV_NEW_PAYLOAD = 0b00010000,
/* Shortcut to check for any update. */
Expand Down

0 comments on commit 3aecf9f

Please sign in to comment.