diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c index 2b37d41e01ec..2c3cfa9bcfa5 100644 --- a/src/lib/swim/swim.c +++ b/src/lib/swim/swim.c @@ -213,6 +213,31 @@ swim_uuid_hash(const struct tt_uuid *uuid) return mh_strn_hash((const char *) uuid, UUID_LEN); } +/** + * Compare two incarnation values and collect their diff into + * @a diff out parameter. The difference is used to fire triggers. + */ +static inline int +swim_incarnation_diff(const struct swim_incarnation *l, + const struct swim_incarnation *r, + enum swim_ev_mask *diff) +{ + if (l->version == r->version) { + *diff = 0; + return 0; + } + *diff = SWIM_EV_NEW_VERSION; + return l->version < r->version ? -1 : 1; +} + +int +swim_incarnation_cmp(const struct swim_incarnation *l, + const struct swim_incarnation *r) +{ + enum swim_ev_mask unused; + return swim_incarnation_diff(l, r, &unused); +} + /** * A cluster member description. This structure describes the * last known state of an instance. This state is updated @@ -353,11 +378,11 @@ struct swim_member { * Failure detection component */ /** - * A monotonically growing number to refute old member's + * A monotonically growing value to refute old member's * state, characterized by a triplet * {incarnation, status, address}. */ - uint64_t incarnation; + struct swim_incarnation incarnation; /** * How many recent pings did not receive an ack while the * member was in the current status. When this number @@ -631,7 +656,7 @@ swim_has_pending_events(struct swim *swim) static inline void swim_update_member_inc_status(struct swim *swim, struct swim_member *member, enum swim_member_status new_status, - uint64_t incarnation) + const struct swim_incarnation *incarnation) { /* * Source of truth about self is this instance and it is @@ -639,16 +664,17 @@ swim_update_member_inc_status(struct swim *swim, struct swim_member *member, * separately. */ assert(member != swim->self); - if (member->incarnation < incarnation) { - enum swim_ev_mask events = SWIM_EV_NEW_INCARNATION; + enum swim_ev_mask events; + int cmp = swim_incarnation_diff(&member->incarnation, incarnation, + &events); + if (cmp < 0) { if (new_status != member->status) { events |= SWIM_EV_NEW_STATUS; member->status = new_status; } - member->incarnation = incarnation; + member->incarnation = *incarnation; swim_on_member_update(swim, member, events); - } else if (member->incarnation == incarnation && - member->status < new_status) { + } else if (cmp == 0 && member->status < new_status) { member->status = new_status; swim_on_member_update(swim, member, SWIM_EV_NEW_STATUS); } @@ -760,7 +786,8 @@ swim_member_delete(struct swim_member *member) /** Create a new member. It is not registered anywhere here. */ static struct swim_member * swim_member_new(const struct sockaddr_in *addr, const struct tt_uuid *uuid, - enum swim_member_status status, uint64_t incarnation) + enum swim_member_status status, + const struct swim_incarnation *incarnation) { struct swim_member *member = (struct swim_member *) calloc(1, sizeof(*member)); @@ -776,7 +803,7 @@ swim_member_new(const struct sockaddr_in *addr, const struct tt_uuid *uuid, rlist_create(&member->in_round_queue); /* Failure detection component. */ - member->incarnation = incarnation; + member->incarnation = *incarnation; heap_node_create(&member->in_wait_ack_heap); swim_task_create(&member->ack_task, NULL, NULL, "ack"); swim_task_create(&member->ping_task, swim_ping_task_complete, NULL, @@ -835,7 +862,8 @@ swim_find_member(struct swim *swim, const struct tt_uuid *uuid) static struct swim_member * swim_new_member(struct swim *swim, const struct sockaddr_in *addr, const struct tt_uuid *uuid, enum swim_member_status status, - uint64_t incarnation, const char *payload, int payload_size) + const struct swim_incarnation *incarnation, const char *payload, + int payload_size) { int new_bsize = sizeof(swim->shuffled[0]) * (mh_size(swim->members) + 1); @@ -962,7 +990,7 @@ swim_encode_member(struct swim_packet *packet, struct swim_member *m, if (pos == NULL) return -1; swim_passport_bin_fill(passport, &m->addr, &m->uuid, m->status, - m->incarnation, encode_payload); + &m->incarnation, encode_payload); memcpy(pos, passport, sizeof(*passport)); if (encode_payload) { pos += sizeof(*passport); @@ -1044,7 +1072,7 @@ swim_encode_failure_detection(struct swim *swim, struct swim_packet *packet, if (pos == NULL) return 0; swim_fd_header_bin_create(&fd_header_bin, type, - swim->self->incarnation); + &swim->self->incarnation); memcpy(pos, &fd_header_bin, size); return 1; } @@ -1412,14 +1440,15 @@ swim_update_member(struct swim *swim, const struct swim_member_def *def, struct swim_member *member) { assert(member != swim->self); - assert(def->incarnation >= member->incarnation); + int cmp = swim_incarnation_cmp(&def->incarnation, &member->incarnation); + assert(cmp >= 0); /* * Payload update rules are simple: it can be updated * either if the new payload has a bigger incarnation, or * the same incarnation, but local payload is outdated. */ bool update_payload = false; - if (def->incarnation > member->incarnation) { + if (cmp > 0) { if (! swim_inaddr_eq(&def->addr, &member->addr)) swim_update_member_addr(swim, member, &def->addr); if (def->payload_size >= 0) @@ -1436,7 +1465,7 @@ swim_update_member(struct swim *swim, const struct swim_member_def *def, diag_log(); } swim_update_member_inc_status(swim, member, def->status, - def->incarnation); + &def->incarnation); } /** @@ -1475,14 +1504,17 @@ swim_upsert_member(struct swim *swim, const struct swim_member_def *def, goto skip; } *result = swim_new_member(swim, &def->addr, &def->uuid, - def->status, def->incarnation, + def->status, &def->incarnation, def->payload, def->payload_size); return *result != NULL ? 0 : -1; } *result = member; struct swim_member *self = swim->self; + enum swim_ev_mask diff; + int cmp = swim_incarnation_diff(&def->incarnation, &member->incarnation, + &diff); if (member != self) { - if (def->incarnation < member->incarnation) + if (cmp < 0) goto skip; swim_update_member(swim, def, member); return 0; @@ -1491,22 +1523,21 @@ swim_upsert_member(struct swim *swim, const struct swim_member_def *def, * It is possible that other instances know a bigger * incarnation of this instance - such thing happens when * the instance restarts and loses its local incarnation - * number. It will be restored by receiving dissemination + * value. It will be restored by receiving dissemination * and anti-entropy messages about self. */ - if (self->incarnation < def->incarnation) { + if (cmp > 0) { self->incarnation = def->incarnation; - swim_on_member_update(swim, self, SWIM_EV_NEW_INCARNATION); + swim_on_member_update(swim, self, diff); } - if (def->status != MEMBER_ALIVE && - def->incarnation == self->incarnation) { + if (def->status != MEMBER_ALIVE && cmp == 0) { /* * In the cluster a gossip exists that this * instance is not alive. Refute this information * with a bigger incarnation. */ - self->incarnation++; - swim_on_member_update(swim, self, SWIM_EV_NEW_INCARNATION); + self->incarnation.version++; + swim_on_member_update(swim, self, SWIM_EV_NEW_VERSION); } return 0; skip: @@ -1595,7 +1626,7 @@ swim_process_failure_detection(struct swim *swim, const char **pos, * case - this message was received from the member * directly, and evidently it is alive. */ - if (def.incarnation == member->incarnation && + if (swim_incarnation_cmp(&def.incarnation, &member->incarnation) == 0 && member->status != MEMBER_ALIVE) { member->status = MEMBER_ALIVE; swim_on_member_update(swim, member, SWIM_EV_NEW_STATUS); @@ -1646,19 +1677,28 @@ swim_process_quit(struct swim *swim, const char **pos, const char *end, uint32_t size; if (swim_decode_map(pos, end, &size, prefix, "root") != 0) return -1; - if (size != 1) { - diag_set(SwimError, "%s map of size 1 is expected", prefix); + if (size != SWIM_INCARNATION_BIN_SIZE) { + diag_set(SwimError, "%s map of size %d is expected", + prefix, SWIM_INCARNATION_BIN_SIZE); return -1; } - uint64_t tmp; - if (swim_decode_uint(pos, end, &tmp, prefix, "a key") != 0) - return -1; - if (tmp != SWIM_QUIT_INCARNATION) { - diag_set(SwimError, "%s a key should be incarnation", prefix); - return -1; + struct swim_incarnation incarnation; + swim_incarnation_create(&incarnation, 0); + for (uint32_t i = 0; i < size; ++i) { + uint64_t tmp; + if (swim_decode_uint(pos, end, &tmp, prefix, "a key") != 0) + return -1; + switch (tmp) { + case SWIM_QUIT_VERSION: + if (swim_decode_uint(pos, end, &incarnation.version, + prefix, "version") != 0) + return -1; + break; + default: + diag_set(SwimError, "%s unknown key", prefix); + return -1; + } } - if (swim_decode_uint(pos, end, &tmp, prefix, "incarnation") != 0) - return -1; struct swim_member *m = swim_find_member(swim, uuid); if (m == NULL) return 0; @@ -1666,11 +1706,16 @@ swim_process_quit(struct swim *swim, const char **pos, const char *end, * Check for 'self' in case this instance took UUID of a * quited instance. */ + enum swim_ev_mask diff; if (m != swim->self) { - swim_update_member_inc_status(swim, m, MEMBER_LEFT, tmp); - } else if (tmp >= m->incarnation) { - m->incarnation = tmp + 1; - swim_on_member_update(swim, m, SWIM_EV_NEW_INCARNATION); + swim_update_member_inc_status(swim, m, MEMBER_LEFT, + &incarnation); + } else if (swim_incarnation_diff(&incarnation, &m->incarnation, + &diff) >= 0) { + m->incarnation = incarnation; + ++m->incarnation.version; + diff |= SWIM_EV_NEW_VERSION; + swim_on_member_update(swim, m, diff); } return 0; } @@ -1895,8 +1940,10 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, "a first config", prefix); return -1; } + struct swim_incarnation incarnation; + swim_incarnation_create(&incarnation, 0); swim->self = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE, - 0, NULL, 0); + &incarnation, NULL, 0); if (swim->self == NULL) return -1; } else if (uuid == NULL || tt_uuid_is_nil(uuid)) { @@ -1908,7 +1955,9 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, return -1; } new_self = swim_new_member(swim, &swim->self->addr, uuid, - MEMBER_ALIVE, 0, swim->self->payload, + MEMBER_ALIVE, + &swim->self->incarnation, + swim->self->payload, swim->self->payload_size); if (new_self == NULL) return -1; @@ -1959,9 +2008,8 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, swim->self = new_self; } if (! swim_inaddr_eq(&addr, &swim->self->addr)) { - swim->self->incarnation++; - swim_on_member_update(swim, swim->self, - SWIM_EV_NEW_INCARNATION); + swim->self->incarnation.version++; + swim_on_member_update(swim, swim->self, SWIM_EV_NEW_VERSION); swim_update_member_addr(swim, swim->self, &addr); } if (gc_mode != SWIM_GC_DEFAULT) @@ -1994,8 +2042,8 @@ swim_set_payload(struct swim *swim, const char *payload, int payload_size) struct swim_member *self = swim->self; if (swim_update_member_payload(swim, self, payload, payload_size) != 0) return -1; - self->incarnation++; - swim_on_member_update(swim, self, SWIM_EV_NEW_INCARNATION); + self->incarnation.version++; + swim_on_member_update(swim, self, SWIM_EV_NEW_VERSION); return 0; } @@ -2013,7 +2061,9 @@ swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid) return -1; struct swim_member *member = swim_find_member(swim, uuid); if (member == NULL) { - member = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE, 0, + struct swim_incarnation inc; + swim_incarnation_create(&inc, 0); + member = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE, &inc, NULL, -1); return member == NULL ? -1 : 0; } @@ -2173,7 +2223,7 @@ swim_encode_quit(struct swim *swim, struct swim_packet *packet) char *pos = swim_packet_alloc(packet, sizeof(bin)); if (pos == NULL) return 0; - swim_quit_bin_create(&bin, swim->self->incarnation); + swim_quit_bin_create(&bin, &swim->self->incarnation); memcpy(pos, &bin, sizeof(bin)); return 1; } @@ -2265,7 +2315,7 @@ swim_member_uuid(const struct swim_member *member) return &member->uuid; } -uint64_t +struct swim_incarnation swim_member_incarnation(const struct swim_member *member) { return member->incarnation; diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h index a42ace7c649b..b8e44515ed37 100644 --- a/src/lib/swim/swim.h +++ b/src/lib/swim/swim.h @@ -234,7 +234,7 @@ const struct tt_uuid * swim_member_uuid(const struct swim_member *member); /** Member's incarnation. */ -uint64_t +struct swim_incarnation swim_member_incarnation(const struct swim_member *member); /** Member's payload. */ @@ -279,6 +279,11 @@ enum swim_ev_mask { SWIM_EV_NEW = 0b00000001, SWIM_EV_NEW_STATUS = 0b00000010, SWIM_EV_NEW_URI = 0b00000100, + SWIM_EV_NEW_VERSION = 0b00001000, + /* + * Shortcut to check for update of any part of + * incarnation. + */ SWIM_EV_NEW_INCARNATION = 0b00001000, SWIM_EV_NEW_PAYLOAD = 0b00010000, /* Shortcut to check for any update. */ diff --git a/src/lib/swim/swim_constants.h b/src/lib/swim/swim_constants.h index 4f8404ce381e..f105040c6541 100644 --- a/src/lib/swim/swim_constants.h +++ b/src/lib/swim/swim_constants.h @@ -55,4 +55,36 @@ enum swim_member_status { extern const char *swim_member_status_strs[]; +/** + * A monotonically growing value to refute false gossips and + * update member attributes on remote instances. Any piece of + * information is labeled with an incarnation value. Information + * labeled with a newer (bigger) incarnation is considered more + * actual. + */ +struct swim_incarnation { + /** + * Version is a volatile part of incarnation. It is + * managed by SWIM fully internally. + */ + uint64_t version; +}; + +/** Create a new incarnation value. */ +static inline void +swim_incarnation_create(struct swim_incarnation *i, uint64_t version) +{ + i->version = version; +} + +/** + * Compare two incarnation values. + * @retval =0 l == r. + * @retval <0 l < r. + * @retval >0 l > r. + */ +int +swim_incarnation_cmp(const struct swim_incarnation *l, + const struct swim_incarnation *r); + #endif /* TARANTOOL_SWIM_CONSTANTS_H_INCLUDED */ diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c index 938631e4919e..31c931b98899 100644 --- a/src/lib/swim/swim_proto.c +++ b/src/lib/swim/swim_proto.c @@ -155,6 +155,29 @@ swim_decode_uuid(struct tt_uuid *uuid, const char **pos, const char *end, return 0; } +/** + * Create incarnation binary MessagePack structure. It expects + * parent structure specific keys for incarnation parts. + */ +static inline void +swim_incarnation_bin_create(struct swim_incarnation_bin *bin, + uint8_t version_key) +{ + bin->k_version = version_key; + bin->m_version = 0xcf; +} + +/** + * Fill a created incarnation binary structure with an incarnation + * value. + */ +static inline void +swim_incarnation_bin_fill(struct swim_incarnation_bin *bin, + const struct swim_incarnation *incarnation) +{ + bin->v_version = mp_bswap_u64(incarnation->version); +} + /** * Check if @a addr is not empty, i.e. not nullified. Set an error * in the diagnostics area in case of emptiness. @@ -247,9 +270,9 @@ swim_decode_member_key(enum swim_member_key key, const char **pos, "member uuid") != 0) return -1; break; - case SWIM_MEMBER_INCARNATION: - if (swim_decode_uint(pos, end, &def->incarnation, prefix, - "member incarnation") != 0) + case SWIM_MEMBER_VERSION: + if (swim_decode_uint(pos, end, &def->incarnation.version, + prefix, "member version") != 0) return -1; break; case SWIM_MEMBER_PAYLOAD: @@ -308,17 +331,19 @@ swim_src_uuid_bin_create(struct swim_src_uuid_bin *header, void swim_fd_header_bin_create(struct swim_fd_header_bin *header, - enum swim_fd_msg_type type, uint64_t incarnation) + enum swim_fd_msg_type type, + const struct swim_incarnation *incarnation) { header->k_header = SWIM_FAILURE_DETECTION; - header->m_header = 0x82; + int map_size = 1 + SWIM_INCARNATION_BIN_SIZE; + assert(mp_sizeof_map(map_size) == 1); + header->m_header = 0x80 | map_size; header->k_type = SWIM_FD_MSG_TYPE; header->v_type = type; - header->k_incarnation = SWIM_FD_INCARNATION; - header->m_incarnation = 0xcf; - header->v_incarnation = mp_bswap_u64(incarnation); + swim_incarnation_bin_create(&header->incarnation, SWIM_FD_VERSION); + swim_incarnation_bin_fill(&header->incarnation, incarnation); } int @@ -331,9 +356,10 @@ swim_failure_detection_def_decode(struct swim_failure_detection_def *def, return -1; memset(def, 0, sizeof(*def)); def->type = swim_fd_msg_type_MAX; - if (size != 2) { - diag_set(SwimError, "%s root map should have two keys - "\ - "message type and incarnation", prefix); + if (size != 1 + SWIM_INCARNATION_BIN_SIZE) { + diag_set(SwimError, "%s root map should have %d keys - "\ + "message type and version", prefix, + 1 + SWIM_INCARNATION_BIN_SIZE); return -1; } for (int i = 0; i < (int) size; ++i) { @@ -352,9 +378,10 @@ swim_failure_detection_def_decode(struct swim_failure_detection_def *def, } def->type = key; break; - case SWIM_FD_INCARNATION: - if (swim_decode_uint(pos, end, &def->incarnation, - prefix, "incarnation") != 0) + case SWIM_FD_VERSION: + if (swim_decode_uint(pos, end, + &def->incarnation.version, prefix, + "version") != 0) return -1; break; default: @@ -401,24 +428,26 @@ swim_passport_bin_create(struct swim_passport_bin *passport) passport->k_uuid = SWIM_MEMBER_UUID; passport->m_uuid = 0xc4; passport->m_uuid_len = UUID_LEN; - passport->k_incarnation = SWIM_MEMBER_INCARNATION; - passport->m_incarnation = 0xcf; + swim_incarnation_bin_create(&passport->incarnation, + SWIM_MEMBER_VERSION); } void swim_passport_bin_fill(struct swim_passport_bin *passport, const struct sockaddr_in *addr, const struct tt_uuid *uuid, - enum swim_member_status status, uint64_t incarnation, + enum swim_member_status status, + const struct swim_incarnation *incarnation, bool encode_payload) { - int map_size = 3 + SWIM_INADDR_BIN_SIZE + encode_payload; + int map_size = 2 + SWIM_INCARNATION_BIN_SIZE + SWIM_INADDR_BIN_SIZE + + encode_payload; assert(mp_sizeof_map(map_size) == 1); passport->m_header = 0x80 | map_size; passport->v_status = status; swim_inaddr_bin_fill(&passport->addr, addr); memcpy(passport->v_uuid, uuid, UUID_LEN); - passport->v_incarnation = mp_bswap_u64(incarnation); + swim_incarnation_bin_fill(&passport->incarnation, incarnation); } void @@ -556,13 +585,14 @@ swim_meta_def_decode(struct swim_meta_def *def, const char **pos, } void -swim_quit_bin_create(struct swim_quit_bin *header, uint64_t incarnation) +swim_quit_bin_create(struct swim_quit_bin *header, + const struct swim_incarnation *incarnation) { header->k_quit = SWIM_QUIT; - header->m_quit = 0x81; - header->k_incarnation = SWIM_QUIT_INCARNATION; - header->m_incarnation = 0xcf; - header->v_incarnation = mp_bswap_u64(incarnation); + assert(mp_sizeof_map(SWIM_INCARNATION_BIN_SIZE) == 1); + header->m_quit = 0x80 | SWIM_INCARNATION_BIN_SIZE; + swim_incarnation_bin_create(&header->incarnation, SWIM_QUIT_VERSION); + swim_incarnation_bin_fill(&header->incarnation, incarnation); } void diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h index 482d79fb19cf..ee3bc03faa94 100644 --- a/src/lib/swim/swim_proto.h +++ b/src/lib/swim/swim_proto.h @@ -72,7 +72,7 @@ enum { * | | * | SWIM_FAILURE_DETECTION: { | * | SWIM_FD_MSG_TYPE: uint, enum swim_fd_msg_type, | - * | SWIM_FD_INCARNATION: uint | + * | SWIM_FD_VERSION: uint | * | }, | * | | * | OR/AND | @@ -83,7 +83,7 @@ enum { * | SWIM_MEMBER_ADDRESS: uint, ip, | * | SWIM_MEMBER_PORT: uint, port, | * | SWIM_MEMBER_UUID: 16 byte UUID, | - * | SWIM_MEMBER_INCARNATION: uint, | + * | SWIM_MEMBER_VERSION: uint, | * | SWIM_MEMBER_PAYLOAD: bin | * | }, | * | ... | @@ -97,7 +97,7 @@ enum { * | SWIM_MEMBER_ADDRESS: uint, ip, | * | SWIM_MEMBER_PORT: uint, port, | * | SWIM_MEMBER_UUID: 16 byte UUID, | - * | SWIM_MEMBER_INCARNATION: uint, | + * | SWIM_MEMBER_VERSION: uint, | * | SWIM_MEMBER_PAYLOAD: bin | * | }, | * | ... | @@ -106,12 +106,33 @@ enum { * | OR/AND | * | | * | SWIM_QUIT: { | - * | SWIM_QUIT_INCARNATION: uint | + * | SWIM_QUIT_VERSION: uint | * | } | * | } | * +-------------------------------------------------------------+ */ +enum { + /** + * Number of keys in the incarnation binary structure. + * Structures storing an incarnation should use this size + * so as to correctly encode MessagePack map header. + */ + SWIM_INCARNATION_BIN_SIZE = 1, +}; + +/** + * Prepared binary MessagePack representation of an incarnation + * value. It expects its owner is a map. + */ +struct PACKED swim_incarnation_bin { + /** mp_encode_uint(version key) */ + uint8_t k_version; + /** mp_encode_uint(64bit version) */ + uint8_t m_version; + uint64_t v_version; +}; + /** * SWIM member attributes from anti-entropy and dissemination * messages. @@ -119,7 +140,7 @@ enum { struct swim_member_def { struct tt_uuid uuid; struct sockaddr_in addr; - uint64_t incarnation; + struct swim_incarnation incarnation; enum swim_member_status status; const char *payload; int payload_size; @@ -188,7 +209,7 @@ enum swim_fd_key { * it was considered dead, but ping/ack with greater * incarnation was received from it. */ - SWIM_FD_INCARNATION, + SWIM_FD_VERSION, }; /** Failure detection message type. */ @@ -212,24 +233,22 @@ struct PACKED swim_fd_header_bin { /** mp_encode_uint(enum swim_fd_msg_type) */ uint8_t v_type; - /** mp_encode_uint(SWIM_FD_INCARNATION) */ - uint8_t k_incarnation; - /** mp_encode_uint(64bit incarnation) */ - uint8_t m_incarnation; - uint64_t v_incarnation; + /** SWIM_FD_VERSION */ + struct swim_incarnation_bin incarnation; }; /** Initialize failure detection section. */ void swim_fd_header_bin_create(struct swim_fd_header_bin *header, - enum swim_fd_msg_type type, uint64_t incarnation); + enum swim_fd_msg_type type, + const struct swim_incarnation *incarnation); /** A decoded failure detection message. */ struct swim_failure_detection_def { /** Type of the message. */ enum swim_fd_msg_type type; /** Incarnation of the sender. */ - uint64_t incarnation; + struct swim_incarnation incarnation; }; /** @@ -290,7 +309,7 @@ enum swim_member_key { SWIM_MEMBER_ADDRESS, SWIM_MEMBER_PORT, SWIM_MEMBER_UUID, - SWIM_MEMBER_INCARNATION, + SWIM_MEMBER_VERSION, SWIM_MEMBER_PAYLOAD, swim_member_key_MAX, }; @@ -339,11 +358,8 @@ struct PACKED swim_passport_bin { uint8_t m_uuid_len; uint8_t v_uuid[UUID_LEN]; - /** mp_encode_uint(SWIM_MEMBER_INCARNATION) */ - uint8_t k_incarnation; - /** mp_encode_uint(64bit incarnation) */ - uint8_t m_incarnation; - uint64_t v_incarnation; + /** SWIM_MEMBER_VERSION */ + struct swim_incarnation_bin incarnation; }; /** @@ -384,7 +400,8 @@ void swim_passport_bin_fill(struct swim_passport_bin *passport, const struct sockaddr_in *addr, const struct tt_uuid *uuid, - enum swim_member_status status, uint64_t incarnation, + enum swim_member_status status, + const struct swim_incarnation *incarnation, bool encode_payload); /** }}} Anti-entropy component */ @@ -562,7 +579,7 @@ swim_route_bin_create(struct swim_route_bin *route, enum swim_quit_key { /** Incarnation to ignore old quit messages. */ - SWIM_QUIT_INCARNATION = 0, + SWIM_QUIT_VERSION = 0, }; /** Quit section. Describes voluntary quit from the cluster. */ @@ -572,16 +589,14 @@ struct PACKED swim_quit_bin { /** mp_encode_map(1) */ uint8_t m_quit; - /** mp_encode_uint(SWIM_QUIT_INCARNATION) */ - uint8_t k_incarnation; - /** mp_encode_uint(64bit incarnation) */ - uint8_t m_incarnation; - uint64_t v_incarnation; + /** SWIM_QUIT_VERSION */ + struct swim_incarnation_bin incarnation; }; /** Initialize quit section. */ void -swim_quit_bin_create(struct swim_quit_bin *header, uint64_t incarnation); +swim_quit_bin_create(struct swim_quit_bin *header, + const struct swim_incarnation *incarnation); /** * Helpers to decode some values - map, array, etc with diff --git a/src/lua/swim.lua b/src/lua/swim.lua index 4f91ac2334da..0686590cbe49 100644 --- a/src/lua/swim.lua +++ b/src/lua/swim.lua @@ -28,12 +28,17 @@ ffi.cdef[[ SWIM_EV_NEW = 0b00000001, SWIM_EV_NEW_STATUS = 0b00000010, SWIM_EV_NEW_URI = 0b00000100, + SWIM_EV_NEW_VERSION = 0b00001000, SWIM_EV_NEW_INCARNATION = 0b00001000, SWIM_EV_NEW_PAYLOAD = 0b00010000, SWIM_EV_UPDATE = 0b00011110, SWIM_EV_DROP = 0b00100000, }; + struct swim_incarnation { + uint64_t version; + }; + bool swim_is_configured(const struct swim *swim); @@ -92,7 +97,7 @@ ffi.cdef[[ const struct tt_uuid * swim_member_uuid(const struct swim_member *member); - uint64_t + struct swim_incarnation swim_member_incarnation(const struct swim_member *member); const char * @@ -124,6 +129,22 @@ local swim_member_status_strs = { [capi.MEMBER_LEFT] = 'left' } +local swim_incarnation_mt = { + __eq = function(l, r) + return l.version == r.version + end, + __lt = function(l, r) + return l.version < r.version + end, + __le = function(l, r) + return l.version <= r.version + end, + __tostring = function(i) + return string.format('cdata {version = %s}', i.version) + end, +} +ffi.metatype(ffi.typeof('struct swim_incarnation'), swim_incarnation_mt) + -- -- Check if @a value is something that can be passed as a -- URI parameter. Note, it does not validate URI, because it is @@ -370,7 +391,7 @@ local function swim_member_payload(m) -- payload. For example, via ACK messages. local key1 = capi.swim_member_incarnation(ptr) local key2 = capi.swim_member_is_payload_up_to_date(ptr) - if key1 == m.p_key1 and key2 == m.p_key2 then + if m.p_key1 and key1 == m.p_key1 and key2 == m.p_key2 then return m.p end local cdata, size = swim_member_payload_raw(ptr) @@ -726,6 +747,9 @@ local swim_member_event_index = { is_new_incarnation = function(self) return bit.band(self[1], capi.SWIM_EV_NEW_INCARNATION) ~= 0 end, + is_new_version = function(self) + return bit.band(self[1], capi.SWIM_EV_NEW_VERSION) ~= 0 + end, is_new_payload = function(self) return bit.band(self[1], capi.SWIM_EV_NEW_PAYLOAD) ~= 0 end, diff --git a/test/swim/swim.result b/test/swim/swim.result index cceee259509c..e3b89f809454 100644 --- a/test/swim/swim.result +++ b/test/swim/swim.result @@ -307,7 +307,7 @@ old_self --- - uri: 127.0.0.1: status: left - incarnation: 1 + incarnation: cdata {version = 1ULL} uuid: 00000000-0000-1000-8000-000000000001 payload_size: 0 ... @@ -391,7 +391,7 @@ s --- - uri: 127.0.0.1: status: alive - incarnation: 1 + incarnation: cdata {version = 1ULL} uuid: 00000000-0000-1000-8000-000000000001 payload_size: 0 ... @@ -413,7 +413,7 @@ s:uri() ... s:incarnation() --- -- 1 +- cdata {version = 1ULL} ... s:payload_cdata() --- @@ -490,7 +490,7 @@ s1:member_by_uuid(s:uuid()) --- - uri: 127.0.0.1: status: alive - incarnation: 1 + incarnation: cdata {version = 1ULL} uuid: 00000000-0000-1000-8000-000000000001 payload_size: 0 ... @@ -721,7 +721,7 @@ s1_view:payload() ... s1_view:incarnation() --- -- 1 +- cdata {version = 1ULL} ... s1:set_payload('payload') --- @@ -732,7 +732,7 @@ while s1_view:payload() ~= 'payload' do fiber.sleep(0.01) end ... s1_view:incarnation() --- -- 2 +- cdata {version = 2ULL} ... s1:set_payload('payload2') --- @@ -743,7 +743,7 @@ while s1_view:payload() ~= 'payload2' do fiber.sleep(0.01) end ... s1_view:incarnation() --- -- 3 +- cdata {version = 3ULL} ... s1:delete() --- @@ -778,7 +778,7 @@ iterate() - - - 00000000-0000-1000-8000-000000000001 - uri: 127.0.0.1: status: alive - incarnation: 1 + incarnation: cdata {version = 1ULL} uuid: 00000000-0000-1000-8000-000000000001 payload_size: 0 ... @@ -791,13 +791,13 @@ iterate() - - - 00000000-0000-1000-8000-000000000002 - uri: 127.0.0.1: status: alive - incarnation: 0 + incarnation: cdata {version = 0ULL} uuid: 00000000-0000-1000-8000-000000000002 payload_size: 0 - - 00000000-0000-1000-8000-000000000001 - uri: 127.0.0.1: status: alive - incarnation: 1 + incarnation: cdata {version = 1ULL} uuid: 00000000-0000-1000-8000-000000000001 payload_size: 0 ... @@ -810,19 +810,19 @@ iterate() - - - 00000000-0000-1000-8000-000000000001 - uri: 127.0.0.1: status: alive - incarnation: 1 + incarnation: cdata {version = 1ULL} uuid: 00000000-0000-1000-8000-000000000001 payload_size: 0 - - 00000000-0000-1000-8000-000000000003 - uri: 127.0.0.1: status: alive - incarnation: 0 + incarnation: cdata {version = 0ULL} uuid: 00000000-0000-1000-8000-000000000003 payload_size: 0 - - 00000000-0000-1000-8000-000000000002 - uri: 127.0.0.1: status: alive - incarnation: 0 + incarnation: cdata {version = 0ULL} uuid: 00000000-0000-1000-8000-000000000002 payload_size: 0 ... @@ -906,7 +906,7 @@ s1_view:payload() ... s1_view:incarnation() --- -- 3 +- cdata {version = 3ULL} ... s1:cfg({heartbeat_rate = 0.01}) --- @@ -932,7 +932,7 @@ p ... s1_view:incarnation() --- -- 3 +- cdata {version = 3ULL} ... s1:delete() --- @@ -964,7 +964,7 @@ s2 --- - uri: 127.0.0.1: status: alive - incarnation: 0 + incarnation: cdata {version = 0ULL} uuid: 00000000-0000-1000-8000-000000000002 payload_size: 0 ... @@ -1210,7 +1210,7 @@ s2:member_by_uuid(s1:self():uuid()) --- - uri: 127.0.0.1: status: alive - incarnation: 1 + incarnation: cdata {version = 1ULL} uuid: 00000000-0000-1000-8000-000000000001 payload_size: 0 ... @@ -1287,13 +1287,14 @@ m_list --- - - uri: 127.0.0.1: status: alive - incarnation: 1 + incarnation: cdata {version = 1ULL} uuid: 00000000-0000-1000-8000-000000000001 payload_size: 0 ... e_list --- -- - is_update: true +- - is_new_version: true + is_update: true is_new_payload: true is_new_uri: true is_new: true @@ -1328,7 +1329,7 @@ m_list --- - - uri: 127.0.0.1: status: alive - incarnation: 1 + incarnation: cdata {version = 1ULL} uuid: 00000000-0000-1000-8000-000000000002 payload_size: 0 ... @@ -1370,17 +1371,17 @@ m_list --- - - uri: 127.0.0.1: status: alive - incarnation: 1 + incarnation: cdata {version = 1ULL} uuid: 00000000-0000-1000-8000-000000000002 payload_size: 0 - uri: 127.0.0.1: status: alive - incarnation: 2 + incarnation: cdata {version = 2ULL} uuid: 00000000-0000-1000-8000-000000000001 payload_size: 8 - uri: 127.0.0.1: status: alive - incarnation: 2 + incarnation: cdata {version = 2ULL} uuid: 00000000-0000-1000-8000-000000000001 payload_size: 8 ... @@ -1389,12 +1390,14 @@ e_list - - is_new_payload: true is_new: true is_update: true - - is_new_payload: true - is_update: true + - is_new_version: true + is_new_payload: true is_new_incarnation: true - - is_new_payload: true is_update: true + - is_new_version: true + is_new_payload: true is_new_incarnation: true + is_update: true ... ctx_list --- @@ -1427,12 +1430,12 @@ m_list --- - - uri: 127.0.0.1: status: left - incarnation: 1 + incarnation: cdata {version = 1ULL} uuid: 00000000-0000-1000-8000-000000000002 payload_size: 0 - uri: 127.0.0.1: status: left - incarnation: 1 + incarnation: cdata {version = 1ULL} uuid: 00000000-0000-1000-8000-000000000002 payload_size: 0 ... diff --git a/test/unit/swim.c b/test/unit/swim.c index 0977e0969476..63f816d4a299 100644 --- a/test/unit/swim.c +++ b/test/unit/swim.c @@ -397,8 +397,8 @@ swim_test_refute(void) "new incarnation has reached S1 with a next round message"); swim_cluster_restart_node(cluster, 1); - is(swim_cluster_member_incarnation(cluster, 1, 1), 0, - "after restart S2's incarnation is 0 again"); + is(swim_cluster_member_incarnation(cluster, 1, 1).version, 0, + "after restart S2's incarnation is default again"); is(swim_cluster_wait_incarnation(cluster, 1, 1, 1, 1), 0, "S2 learned its old bigger incarnation 1 from S0"); @@ -698,8 +698,8 @@ swim_test_payload_basic(void) is(swim_cluster_member_set_payload(cluster, 0, s0_payload, s0_payload_size), 0, "payload is set"); - is(swim_cluster_member_incarnation(cluster, 0, 0), 1, - "incarnation is incremeted on each payload update"); + is(swim_cluster_member_incarnation(cluster, 0, 0).version, 1, + "version is incremented on each payload update"); const char *tmp = swim_cluster_member_payload(cluster, 0, 0, &size); ok(size == s0_payload_size && memcmp(s0_payload, tmp, size) == 0, "payload is successfully obtained back"); @@ -712,8 +712,8 @@ swim_test_payload_basic(void) is(swim_cluster_member_set_payload(cluster, 0, s0_payload, s0_payload_size), 0, "payload is changed"); - is(swim_cluster_member_incarnation(cluster, 0, 0), 2, - "incarnation is incremeted on each payload update"); + is(swim_cluster_member_incarnation(cluster, 0, 0).version, 2, + "version is incremented on each payload update"); is(swim_cluster_wait_payload_everywhere(cluster, 0, s0_payload, s0_payload_size, cluster_size), 0, "second payload is disseminated"); @@ -759,9 +759,9 @@ swim_test_payload_refutation(void) * The test checks the following case. Assume there are 3 * nodes: S1, S2, S3. They all know each other. S1 sets * new payload, S2 and S3 knows that. They all see that S1 - * has incarnation 1 and payload P1. + * has version 1 and payload P1. * - * Now S1 changes payload to P2. Its incarnation becomes + * Now S1 changes payload to P2. Its version becomes * 2. During next entire round its round messages are * lost, however ACKs work ok. */ @@ -774,9 +774,9 @@ swim_test_payload_refutation(void) swim_run_for(3); swim_cluster_drop_components(cluster, 0, NULL, 0); - is(swim_cluster_member_incarnation(cluster, 1, 0), 2, - "S2 sees new incarnation of S1"); - is(swim_cluster_member_incarnation(cluster, 2, 0), 2, + is(swim_cluster_member_incarnation(cluster, 1, 0).version, 2, + "S2 sees new version of S1"); + is(swim_cluster_member_incarnation(cluster, 2, 0).version, 2, "S3 does the same"); const char *tmp = swim_cluster_member_payload(cluster, 1, 0, &size); @@ -794,7 +794,7 @@ swim_test_payload_refutation(void) /* * Now S1's payload TTD is 0, but via ACKs S1 sent its new - * incarnation to S2 and S3. Despite that they should + * version to S2 and S3. Despite that they should * apply new S1's payload via anti-entropy. Next lines * test that: * @@ -821,15 +821,15 @@ swim_test_payload_refutation(void) ok(size == s0_new_payload_size && memcmp(tmp, s0_new_payload, size) == 0, "S2 learned S1's payload via anti-entropy"); - is(swim_cluster_member_incarnation(cluster, 1, 0), 2, - "incarnation still is the same"); + is(swim_cluster_member_incarnation(cluster, 1, 0).version, 2, + "version still is the same"); tmp = swim_cluster_member_payload(cluster, 2, 0, &size); ok(size == s0_old_payload_size && memcmp(tmp, s0_old_payload, size) == 0, "S3 was blocked and does not know anything"); - is(swim_cluster_member_incarnation(cluster, 2, 0), 2, - "incarnation still is the same"); + is(swim_cluster_member_incarnation(cluster, 2, 0).version, 2, + "version still is the same"); /* S1 will not participate in the tests further. */ swim_cluster_set_drop(cluster, 0, 100); @@ -855,7 +855,7 @@ swim_test_payload_refutation(void) /* * Now check the case (3) - S3 accepts new S1's payload - * from S2. Even knowing the same S1's incarnation. + * from S2. Even knowing the same S1's version. */ swim_cluster_set_drop(cluster, 1, 0); swim_cluster_set_drop_out(cluster, 2, 100); @@ -998,7 +998,7 @@ swim_cluster_delete_f(va_list ap) static void swim_test_triggers(void) { - swim_start_test(22); + swim_start_test(23); struct swim_cluster *cluster = swim_cluster_new(2); swim_cluster_set_ack_timeout(cluster, 1); struct trigger_ctx tctx, tctx2; @@ -1034,8 +1034,8 @@ swim_test_triggers(void) swim_cluster_run_triggers(cluster); is(tctx.counter, 3, "self payload is updated"); is(tctx.ctx.member, swim_self(s1), "self is set as a member"); - is(tctx.ctx.events, SWIM_EV_NEW_PAYLOAD | SWIM_EV_NEW_INCARNATION, - "both incarnation and payload events are presented"); + is(tctx.ctx.events, SWIM_EV_NEW_PAYLOAD | SWIM_EV_NEW_VERSION, + "both version and payload events are presented"); swim_cluster_set_drop(cluster, 1, 100); fail_if(swim_cluster_wait_status(cluster, 0, 1, @@ -1089,7 +1089,7 @@ swim_test_triggers(void) if (tctx.ctx.member != NULL) swim_member_unref(tctx.ctx.member); - /* Check that recfg fires incarnation update trigger. */ + /* Check that recfg fires version update trigger. */ s1 = swim_new(); struct tt_uuid uuid = uuid_nil; uuid.time_low = 1; @@ -1100,8 +1100,10 @@ swim_test_triggers(void) fail_if(swim_cfg(s1, "127.0.0.1:2", -1, -1, -1, NULL) != 0); while (tctx.ctx.events == 0) fiber_sleep(0); - is(tctx.ctx.events, SWIM_EV_NEW_URI | SWIM_EV_NEW_INCARNATION, - "local URI update warns about incarnation update"); + is(tctx.ctx.events, SWIM_EV_NEW_URI | SWIM_EV_NEW_VERSION, + "local URI update warns about version update"); + ok((tctx.ctx.events & SWIM_EV_NEW_INCARNATION) != 0, + "version is a part of incarnation, so the latter is updated too"); swim_delete(s1); if (tctx.ctx.member != NULL) diff --git a/test/unit/swim.result b/test/unit/swim.result index 2968a2da7099..8d653477b64d 100644 --- a/test/unit/swim.result +++ b/test/unit/swim.result @@ -88,7 +88,7 @@ ok 7 - subtests 1..4 ok 1 - S2 increments its own incarnation to refute its suspicion ok 2 - new incarnation has reached S1 with a next round message - ok 3 - after restart S2's incarnation is 0 again + ok 3 - after restart S2's incarnation is default again ok 4 - S2 learned its old bigger incarnation 1 from S0 ok 8 - subtests *** swim_test_refute: done *** @@ -158,25 +158,25 @@ ok 15 - subtests ok 2 - can not set too big payload ok 3 - diag says too big ok 4 - payload is set - ok 5 - incarnation is incremeted on each payload update + ok 5 - version is incremented on each payload update ok 6 - payload is successfully obtained back ok 7 - payload is disseminated ok 8 - payload is changed - ok 9 - incarnation is incremeted on each payload update + ok 9 - version is incremented on each payload update ok 10 - second payload is disseminated ok 11 - third payload is disseminated via anti-entropy ok 16 - subtests *** swim_test_payload_basic: done *** *** swim_test_payload_refutation *** 1..11 - ok 1 - S2 sees new incarnation of S1 + ok 1 - S2 sees new version of S1 ok 2 - S3 does the same ok 3 - but S2 does not known the new payload ok 4 - as well as S3 ok 5 - S2 learned S1's payload via anti-entropy - ok 6 - incarnation still is the same + ok 6 - version still is the same ok 7 - S3 was blocked and does not know anything - ok 8 - incarnation still is the same + ok 8 - version still is the same ok 9 - S2 keeps the same new S1's payload, S3 did not rewrite it ok 10 - S3 still does not know anything ok 11 - S3 learns S1's payload from S2 @@ -201,7 +201,7 @@ ok 19 - subtests ok 20 - subtests *** swim_test_slow_net: done *** *** swim_test_triggers *** - 1..22 + 1..23 ok 1 - trigger is fired ok 2 - is not deleted ok 3 - ctx.member is set @@ -211,7 +211,7 @@ ok 20 - subtests ok 7 - mask says that ok 8 - self payload is updated ok 9 - self is set as a member - ok 10 - both incarnation and payload events are presented + ok 10 - both version and payload events are presented ok 11 - suspicion fired a trigger ok 12 - status suspected ok 13 - death fired a trigger @@ -224,7 +224,8 @@ ok 20 - subtests ok 20 - non-yielding still is not ok 21 - trigger is not deleted until all currently sleeping triggers are finished # now all the triggers are done and deleted - ok 22 - local URI update warns about incarnation update + ok 22 - local URI update warns about version update + ok 23 - version is a part of incarnation, so the latter is updated too ok 21 - subtests *** swim_test_triggers: done *** *** main_f: done *** diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c index 463c6239013f..72149b3536d8 100644 --- a/test/unit/swim_test_utils.c +++ b/test/unit/swim_test_utils.c @@ -348,14 +348,17 @@ swim_cluster_member_status(struct swim_cluster *cluster, int node_id, return swim_member_status(m); } -uint64_t +struct swim_incarnation swim_cluster_member_incarnation(struct swim_cluster *cluster, int node_id, int member_id) { const struct swim_member *m = swim_cluster_member_view(cluster, node_id, member_id); - if (m == NULL) - return UINT64_MAX; + if (m == NULL) { + struct swim_incarnation inc; + swim_incarnation_create(&inc, UINT64_MAX); + return inc; + } return swim_member_incarnation(m); } @@ -713,7 +716,7 @@ struct swim_member_template { * to @a incarnation. */ bool need_check_incarnation; - uint64_t incarnation; + struct swim_incarnation incarnation; /** * True, if the payload should be checked to be equal to * @a payload of size @a payload_size. @@ -751,10 +754,10 @@ swim_member_template_set_status(struct swim_member_template *t, */ static inline void swim_member_template_set_incarnation(struct swim_member_template *t, - uint64_t incarnation) + uint64_t version) { t->need_check_incarnation = true; - t->incarnation = incarnation; + swim_incarnation_create(&t->incarnation, version); } /** @@ -778,7 +781,7 @@ swim_loop_check_member(struct swim_cluster *cluster, void *data) const struct swim_member *m = swim_cluster_member_view(cluster, t->node_id, t->member_id); enum swim_member_status status; - uint64_t incarnation; + struct swim_incarnation incarnation; const char *payload; int payload_size; if (m != NULL) { @@ -787,13 +790,14 @@ swim_loop_check_member(struct swim_cluster *cluster, void *data) payload = swim_member_payload(m, &payload_size); } else { status = swim_member_status_MAX; - incarnation = 0; + swim_incarnation_create(&incarnation, 0); payload = NULL; payload_size = 0; } if (t->need_check_status && status != t->status) return false; - if (t->need_check_incarnation && incarnation != t->incarnation) + if (t->need_check_incarnation && + swim_incarnation_cmp(&incarnation, &t->incarnation) != 0) return false; if (t->need_check_payload && (payload_size != t->payload_size || @@ -848,12 +852,12 @@ swim_cluster_wait_status(struct swim_cluster *cluster, int node_id, int swim_cluster_wait_incarnation(struct swim_cluster *cluster, int node_id, - int member_id, uint64_t incarnation, + int member_id, uint64_t version, double timeout) { struct swim_member_template t; swim_member_template_create(&t, node_id, member_id); - swim_member_template_set_incarnation(&t, incarnation); + swim_member_template_set_incarnation(&t, version); return swim_wait_timeout(timeout, cluster, swim_loop_check_member, &t); } diff --git a/test/unit/swim_test_utils.h b/test/unit/swim_test_utils.h index fde84e39b709..587118c60e55 100644 --- a/test/unit/swim_test_utils.h +++ b/test/unit/swim_test_utils.h @@ -156,7 +156,7 @@ enum swim_member_status swim_cluster_member_status(struct swim_cluster *cluster, int node_id, int member_id); -uint64_t +struct swim_incarnation swim_cluster_member_incarnation(struct swim_cluster *cluster, int node_id, int member_id); @@ -218,13 +218,13 @@ swim_cluster_wait_status_everywhere(struct swim_cluster *cluster, int member_id, double timeout); /** - * Wait until a member with id @a member_id is seen with @a + * Wait until a member with id @a member_id is seen with needed * incarnation in the membership table of a member with id @a * node_id. At most @a timeout seconds. */ int swim_cluster_wait_incarnation(struct swim_cluster *cluster, int node_id, - int member_id, uint64_t incarnation, + int member_id, uint64_t version, double timeout); /**