diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c index 2c3cfa9bcfa5..bb9e9f519b58 100644 --- a/src/lib/swim/swim.c +++ b/src/lib/swim/swim.c @@ -222,11 +222,21 @@ swim_incarnation_diff(const struct swim_incarnation *l, const struct swim_incarnation *r, enum swim_ev_mask *diff) { - if (l->version == r->version) { + if (l->generation == r->generation && + l->version == r->version) { *diff = 0; return 0; } *diff = SWIM_EV_NEW_VERSION; + if (l->generation < r->generation) { + *diff |= SWIM_EV_NEW_GENERATION; + return -1; + } + if (l->generation > r->generation) { + *diff |= SWIM_EV_NEW_GENERATION; + return 1; + } + assert(l->version != r->version); return l->version < r->version ? -1 : 1; } @@ -483,6 +493,15 @@ struct swim { struct ev_timer wait_ack_tick; /** GC state saying how to remove dead members. */ enum swim_gc_mode gc_mode; + /** + * Generation of that instance is set when the latter is + * created. It is actual only until the instance is + * configured. After that the instance can learn a bigger + * own generation from other members. Despite meaning + * in fact a wrong usage of SWIM generations, it is still + * possible. + */ + uint64_t initial_generation; /** * * Dissemination component @@ -1683,12 +1702,17 @@ swim_process_quit(struct swim *swim, const char **pos, const char *end, return -1; } struct swim_incarnation incarnation; - swim_incarnation_create(&incarnation, 0); + swim_incarnation_create(&incarnation, 0, 0); for (uint32_t i = 0; i < size; ++i) { uint64_t tmp; if (swim_decode_uint(pos, end, &tmp, prefix, "a key") != 0) return -1; switch (tmp) { + case SWIM_QUIT_GENERATION: + if (swim_decode_uint(pos, end, &incarnation.generation, + prefix, "generation") != 0) + return -1; + break; case SWIM_QUIT_VERSION: if (swim_decode_uint(pos, end, &incarnation.version, prefix, "version") != 0) @@ -1829,13 +1853,14 @@ swim_event_handler_f(va_list va) struct swim * -swim_new(void) +swim_new(uint64_t generation) { struct swim *swim = (struct swim *) calloc(1, sizeof(*swim)); if (swim == NULL) { diag_set(OutOfMemory, sizeof(*swim), "calloc", "swim"); return NULL; } + swim->initial_generation = generation; swim->members = mh_swim_table_new(); if (swim->members == NULL) { free(swim); @@ -1941,7 +1966,8 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, return -1; } struct swim_incarnation incarnation; - swim_incarnation_create(&incarnation, 0); + swim_incarnation_create(&incarnation, swim->initial_generation, + 0); swim->self = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE, &incarnation, NULL, 0); if (swim->self == NULL) @@ -2062,7 +2088,7 @@ swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid) struct swim_member *member = swim_find_member(swim, uuid); if (member == NULL) { struct swim_incarnation inc; - swim_incarnation_create(&inc, 0); + swim_incarnation_create(&inc, 0, 0); member = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE, &inc, NULL, -1); return member == NULL ? -1 : 0; diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h index b8e44515ed37..4565eb976519 100644 --- a/src/lib/swim/swim.h +++ b/src/lib/swim/swim.h @@ -68,7 +68,7 @@ enum swim_gc_mode { * yields. */ struct swim * -swim_new(void); +swim_new(uint64_t generation); /** Check if a swim instance is configured. */ bool @@ -279,16 +279,17 @@ enum swim_ev_mask { SWIM_EV_NEW = 0b00000001, SWIM_EV_NEW_STATUS = 0b00000010, SWIM_EV_NEW_URI = 0b00000100, - SWIM_EV_NEW_VERSION = 0b00001000, + SWIM_EV_NEW_GENERATION = 0b00001000, + SWIM_EV_NEW_VERSION = 0b00010000, /* * Shortcut to check for update of any part of * incarnation. */ - SWIM_EV_NEW_INCARNATION = 0b00001000, - SWIM_EV_NEW_PAYLOAD = 0b00010000, + SWIM_EV_NEW_INCARNATION = 0b00011000, + SWIM_EV_NEW_PAYLOAD = 0b00100000, /* Shortcut to check for any update. */ - SWIM_EV_UPDATE = 0b00011110, - SWIM_EV_DROP = 0b00100000, + SWIM_EV_UPDATE = 0b00111110, + SWIM_EV_DROP = 0b01000000, }; /** On member event trigger context. */ diff --git a/src/lib/swim/swim_constants.h b/src/lib/swim/swim_constants.h index f105040c6541..cee109c8ccba 100644 --- a/src/lib/swim/swim_constants.h +++ b/src/lib/swim/swim_constants.h @@ -63,6 +63,12 @@ extern const char *swim_member_status_strs[]; * actual. */ struct swim_incarnation { + /** + * Generation is a persistent part of incarnation. It is + * set by a user on SWIM start, and normally is not + * changed during instance lifetime. + */ + uint64_t generation; /** * Version is a volatile part of incarnation. It is * managed by SWIM fully internally. @@ -72,8 +78,10 @@ struct swim_incarnation { /** Create a new incarnation value. */ static inline void -swim_incarnation_create(struct swim_incarnation *i, uint64_t version) +swim_incarnation_create(struct swim_incarnation *i, uint64_t generation, + uint64_t version) { + i->generation = generation; i->version = version; } diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c index 31c931b98899..615bbf685ecb 100644 --- a/src/lib/swim/swim_proto.c +++ b/src/lib/swim/swim_proto.c @@ -161,8 +161,10 @@ swim_decode_uuid(struct tt_uuid *uuid, const char **pos, const char *end, */ static inline void swim_incarnation_bin_create(struct swim_incarnation_bin *bin, - uint8_t version_key) + uint8_t generation_key, uint8_t version_key) { + bin->k_generation = generation_key; + bin->m_generation = 0xcf; bin->k_version = version_key; bin->m_version = 0xcf; } @@ -175,6 +177,7 @@ static inline void swim_incarnation_bin_fill(struct swim_incarnation_bin *bin, const struct swim_incarnation *incarnation) { + bin->v_generation = mp_bswap_u64(incarnation->generation); bin->v_version = mp_bswap_u64(incarnation->version); } @@ -270,6 +273,11 @@ swim_decode_member_key(enum swim_member_key key, const char **pos, "member uuid") != 0) return -1; break; + case SWIM_MEMBER_GENERATION: + if (swim_decode_uint(pos, end, &def->incarnation.generation, + prefix, "member generation") != 0) + return -1; + break; case SWIM_MEMBER_VERSION: if (swim_decode_uint(pos, end, &def->incarnation.version, prefix, "member version") != 0) @@ -342,7 +350,8 @@ swim_fd_header_bin_create(struct swim_fd_header_bin *header, header->k_type = SWIM_FD_MSG_TYPE; header->v_type = type; - swim_incarnation_bin_create(&header->incarnation, SWIM_FD_VERSION); + swim_incarnation_bin_create(&header->incarnation, SWIM_FD_GENERATION, + SWIM_FD_VERSION); swim_incarnation_bin_fill(&header->incarnation, incarnation); } @@ -378,6 +387,12 @@ swim_failure_detection_def_decode(struct swim_failure_detection_def *def, } def->type = key; break; + case SWIM_FD_GENERATION: + if (swim_decode_uint(pos, end, + &def->incarnation.generation, + prefix, "generation") != 0) + return -1; + break; case SWIM_FD_VERSION: if (swim_decode_uint(pos, end, &def->incarnation.version, prefix, @@ -429,6 +444,7 @@ swim_passport_bin_create(struct swim_passport_bin *passport) passport->m_uuid = 0xc4; passport->m_uuid_len = UUID_LEN; swim_incarnation_bin_create(&passport->incarnation, + SWIM_MEMBER_GENERATION, SWIM_MEMBER_VERSION); } @@ -591,7 +607,8 @@ swim_quit_bin_create(struct swim_quit_bin *header, header->k_quit = SWIM_QUIT; assert(mp_sizeof_map(SWIM_INCARNATION_BIN_SIZE) == 1); header->m_quit = 0x80 | SWIM_INCARNATION_BIN_SIZE; - swim_incarnation_bin_create(&header->incarnation, SWIM_QUIT_VERSION); + swim_incarnation_bin_create(&header->incarnation, SWIM_QUIT_GENERATION, + SWIM_QUIT_VERSION); swim_incarnation_bin_fill(&header->incarnation, incarnation); } diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h index ee3bc03faa94..595606499d79 100644 --- a/src/lib/swim/swim_proto.h +++ b/src/lib/swim/swim_proto.h @@ -72,6 +72,7 @@ enum { * | | * | SWIM_FAILURE_DETECTION: { | * | SWIM_FD_MSG_TYPE: uint, enum swim_fd_msg_type, | + * | SWIM_FD_GENERATION: uint, | * | SWIM_FD_VERSION: uint | * | }, | * | | @@ -83,6 +84,7 @@ enum { * | SWIM_MEMBER_ADDRESS: uint, ip, | * | SWIM_MEMBER_PORT: uint, port, | * | SWIM_MEMBER_UUID: 16 byte UUID, | + * | SWIM_MEMBER_GENERATION: uint, | * | SWIM_MEMBER_VERSION: uint, | * | SWIM_MEMBER_PAYLOAD: bin | * | }, | @@ -97,6 +99,7 @@ enum { * | SWIM_MEMBER_ADDRESS: uint, ip, | * | SWIM_MEMBER_PORT: uint, port, | * | SWIM_MEMBER_UUID: 16 byte UUID, | + * | SWIM_MEMBER_GENERATION: uint, | * | SWIM_MEMBER_VERSION: uint, | * | SWIM_MEMBER_PAYLOAD: bin | * | }, | @@ -106,6 +109,7 @@ enum { * | OR/AND | * | | * | SWIM_QUIT: { | + * | SWIM_QUIT_GENERATION: uint, | * | SWIM_QUIT_VERSION: uint | * | } | * | } | @@ -118,7 +122,7 @@ enum { * Structures storing an incarnation should use this size * so as to correctly encode MessagePack map header. */ - SWIM_INCARNATION_BIN_SIZE = 1, + SWIM_INCARNATION_BIN_SIZE = 2, }; /** @@ -126,6 +130,12 @@ enum { * value. It expects its owner is a map. */ struct PACKED swim_incarnation_bin { + /** mp_encode_uint(generation key) */ + uint8_t k_generation; + /** mp_encode_uint(64bit generation) */ + uint8_t m_generation; + uint64_t v_generation; + /** mp_encode_uint(version key) */ uint8_t k_version; /** mp_encode_uint(64bit version) */ @@ -209,6 +219,7 @@ enum swim_fd_key { * it was considered dead, but ping/ack with greater * incarnation was received from it. */ + SWIM_FD_GENERATION, SWIM_FD_VERSION, }; @@ -225,7 +236,7 @@ extern const char *swim_fd_msg_type_strs[]; struct PACKED swim_fd_header_bin { /** mp_encode_uint(SWIM_FAILURE_DETECTION) */ uint8_t k_header; - /** mp_encode_map(2) */ + /** mp_encode_map(3) */ uint8_t m_header; /** mp_encode_uint(SWIM_FD_MSG_TYPE) */ @@ -233,7 +244,7 @@ struct PACKED swim_fd_header_bin { /** mp_encode_uint(enum swim_fd_msg_type) */ uint8_t v_type; - /** SWIM_FD_VERSION */ + /** SWIM_FD_GENERATION, SWIM_FD_VERSION */ struct swim_incarnation_bin incarnation; }; @@ -309,6 +320,7 @@ enum swim_member_key { SWIM_MEMBER_ADDRESS, SWIM_MEMBER_PORT, SWIM_MEMBER_UUID, + SWIM_MEMBER_GENERATION, SWIM_MEMBER_VERSION, SWIM_MEMBER_PAYLOAD, swim_member_key_MAX, @@ -340,7 +352,7 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header, * date and TTD is > 0. */ struct PACKED swim_passport_bin { - /** mp_encode_map(5 or 6) */ + /** mp_encode_map(6 or 7) */ uint8_t m_header; /** mp_encode_uint(SWIM_MEMBER_STATUS) */ @@ -358,7 +370,7 @@ struct PACKED swim_passport_bin { uint8_t m_uuid_len; uint8_t v_uuid[UUID_LEN]; - /** SWIM_MEMBER_VERSION */ + /** SWIM_MEMBER_GENERATION, SWIM_MEMBER_VERSION */ struct swim_incarnation_bin incarnation; }; @@ -579,17 +591,18 @@ swim_route_bin_create(struct swim_route_bin *route, enum swim_quit_key { /** Incarnation to ignore old quit messages. */ - SWIM_QUIT_VERSION = 0, + SWIM_QUIT_GENERATION = 0, + SWIM_QUIT_VERSION, }; /** Quit section. Describes voluntary quit from the cluster. */ struct PACKED swim_quit_bin { /** mp_encode_uint(SWIM_QUIT) */ uint8_t k_quit; - /** mp_encode_map(1) */ + /** mp_encode_map(2) */ uint8_t m_quit; - /** SWIM_QUIT_VERSION */ + /** SWIM_QUIT_GENERATION, SWIM_QUIT_VERSION */ struct swim_incarnation_bin incarnation; }; diff --git a/src/lua/swim.c b/src/lua/swim.c index c3a0a9911659..26646f41f042 100644 --- a/src/lua/swim.c +++ b/src/lua/swim.c @@ -67,7 +67,8 @@ lua_swim_on_member_event(struct lua_State *L) static int lua_swim_new(struct lua_State *L) { - struct swim *s = swim_new(); + uint64_t generation = luaL_checkuint64(L, 1); + struct swim *s = swim_new(generation); *(struct swim **) luaL_pushcdata(L, ctid_swim_ptr) = s; if (s != NULL) return 1; diff --git a/src/lua/swim.lua b/src/lua/swim.lua index 0686590cbe49..b6d826ca0e1c 100644 --- a/src/lua/swim.lua +++ b/src/lua/swim.lua @@ -28,14 +28,16 @@ ffi.cdef[[ SWIM_EV_NEW = 0b00000001, SWIM_EV_NEW_STATUS = 0b00000010, SWIM_EV_NEW_URI = 0b00000100, - SWIM_EV_NEW_VERSION = 0b00001000, - SWIM_EV_NEW_INCARNATION = 0b00001000, - SWIM_EV_NEW_PAYLOAD = 0b00010000, - SWIM_EV_UPDATE = 0b00011110, - SWIM_EV_DROP = 0b00100000, + SWIM_EV_NEW_GENERATION = 0b00001000, + SWIM_EV_NEW_VERSION = 0b00010000, + SWIM_EV_NEW_INCARNATION = 0b00011000, + SWIM_EV_NEW_PAYLOAD = 0b00100000, + SWIM_EV_UPDATE = 0b00111110, + SWIM_EV_DROP = 0b01000000, }; struct swim_incarnation { + uint64_t generation; uint64_t version; }; @@ -131,16 +133,19 @@ local swim_member_status_strs = { local swim_incarnation_mt = { __eq = function(l, r) - return l.version == r.version + return l.version == r.version and l.generation == r.generation end, __lt = function(l, r) - return l.version < r.version + return l.generation < r.generation or + l.generation == r.generation and l.version < r.version end, __le = function(l, r) - return l.version <= r.version + return l.generation < r.generation or + l.generation == r.generation and l.version <= r.version end, __tostring = function(i) - return string.format('cdata {version = %s}', i.version) + return string.format('cdata {generation = %s, version = %s}', + i.generation, i.version) end, } ffi.metatype(ffi.typeof('struct swim_incarnation'), swim_incarnation_mt) @@ -747,6 +752,9 @@ local swim_member_event_index = { is_new_incarnation = function(self) return bit.band(self[1], capi.SWIM_EV_NEW_INCARNATION) ~= 0 end, + is_new_generation = function(self) + return bit.band(self[1], capi.SWIM_EV_NEW_GENERATION) ~= 0 + end, is_new_version = function(self) return bit.band(self[1], capi.SWIM_EV_NEW_VERSION) ~= 0 end, @@ -938,7 +946,19 @@ local cache_table_mt = { __mode = 'v' } -- provided. -- local function swim_new(cfg) - local ptr = internal.swim_new() + local generation = 0 + if cfg and type(cfg) == 'table' and cfg.generation then + generation = cfg.generation + if type(generation) ~= 'number' or generation < 0 or + math.floor(generation) ~= generation then + return error('swim.new: generation should be non-negative integer') + end + cfg = table.copy(cfg) + -- Nullify in order to do not raise errors in the + -- following swim:cfg() about unknown parameters. + cfg.generation = nil + end + local ptr = internal.swim_new(generation) if ptr == nil then return nil, box.error.last() end diff --git a/test/swim/swim.result b/test/swim/swim.result index e3b89f809454..05a108614855 100644 --- a/test/swim/swim.result +++ b/test/swim/swim.result @@ -307,7 +307,7 @@ old_self --- - uri: 127.0.0.1: status: left - incarnation: cdata {version = 1ULL} + incarnation: cdata {generation = 0ULL, version = 1ULL} uuid: 00000000-0000-1000-8000-000000000001 payload_size: 0 ... @@ -391,7 +391,7 @@ s --- - uri: 127.0.0.1: status: alive - incarnation: cdata {version = 1ULL} + incarnation: cdata {generation = 0ULL, version = 1ULL} uuid: 00000000-0000-1000-8000-000000000001 payload_size: 0 ... @@ -413,7 +413,7 @@ s:uri() ... s:incarnation() --- -- cdata {version = 1ULL} +- cdata {generation = 0ULL, version = 1ULL} ... s:payload_cdata() --- @@ -490,7 +490,7 @@ s1:member_by_uuid(s:uuid()) --- - uri: 127.0.0.1: status: alive - incarnation: cdata {version = 1ULL} + incarnation: cdata {generation = 0ULL, version = 1ULL} uuid: 00000000-0000-1000-8000-000000000001 payload_size: 0 ... @@ -721,7 +721,7 @@ s1_view:payload() ... s1_view:incarnation() --- -- cdata {version = 1ULL} +- cdata {generation = 0ULL, version = 1ULL} ... s1:set_payload('payload') --- @@ -732,7 +732,7 @@ while s1_view:payload() ~= 'payload' do fiber.sleep(0.01) end ... s1_view:incarnation() --- -- cdata {version = 2ULL} +- cdata {generation = 0ULL, version = 2ULL} ... s1:set_payload('payload2') --- @@ -743,7 +743,7 @@ while s1_view:payload() ~= 'payload2' do fiber.sleep(0.01) end ... s1_view:incarnation() --- -- cdata {version = 3ULL} +- cdata {generation = 0ULL, version = 3ULL} ... s1:delete() --- @@ -778,7 +778,7 @@ iterate() - - - 00000000-0000-1000-8000-000000000001 - uri: 127.0.0.1: status: alive - incarnation: cdata {version = 1ULL} + incarnation: cdata {generation = 0ULL, version = 1ULL} uuid: 00000000-0000-1000-8000-000000000001 payload_size: 0 ... @@ -791,13 +791,13 @@ iterate() - - - 00000000-0000-1000-8000-000000000002 - uri: 127.0.0.1: status: alive - incarnation: cdata {version = 0ULL} + incarnation: cdata {generation = 0ULL, version = 0ULL} uuid: 00000000-0000-1000-8000-000000000002 payload_size: 0 - - 00000000-0000-1000-8000-000000000001 - uri: 127.0.0.1: status: alive - incarnation: cdata {version = 1ULL} + incarnation: cdata {generation = 0ULL, version = 1ULL} uuid: 00000000-0000-1000-8000-000000000001 payload_size: 0 ... @@ -810,19 +810,19 @@ iterate() - - - 00000000-0000-1000-8000-000000000001 - uri: 127.0.0.1: status: alive - incarnation: cdata {version = 1ULL} + incarnation: cdata {generation = 0ULL, version = 1ULL} uuid: 00000000-0000-1000-8000-000000000001 payload_size: 0 - - 00000000-0000-1000-8000-000000000003 - uri: 127.0.0.1: status: alive - incarnation: cdata {version = 0ULL} + incarnation: cdata {generation = 0ULL, version = 0ULL} uuid: 00000000-0000-1000-8000-000000000003 payload_size: 0 - - 00000000-0000-1000-8000-000000000002 - uri: 127.0.0.1: status: alive - incarnation: cdata {version = 0ULL} + incarnation: cdata {generation = 0ULL, version = 0ULL} uuid: 00000000-0000-1000-8000-000000000002 payload_size: 0 ... @@ -906,7 +906,7 @@ s1_view:payload() ... s1_view:incarnation() --- -- cdata {version = 3ULL} +- cdata {generation = 0ULL, version = 3ULL} ... s1:cfg({heartbeat_rate = 0.01}) --- @@ -932,7 +932,7 @@ p ... s1_view:incarnation() --- -- cdata {version = 3ULL} +- cdata {generation = 0ULL, version = 3ULL} ... s1:delete() --- @@ -964,7 +964,7 @@ s2 --- - uri: 127.0.0.1: status: alive - incarnation: cdata {version = 0ULL} + incarnation: cdata {generation = 0ULL, version = 0ULL} uuid: 00000000-0000-1000-8000-000000000002 payload_size: 0 ... @@ -1210,7 +1210,7 @@ s2:member_by_uuid(s1:self():uuid()) --- - uri: 127.0.0.1: status: alive - incarnation: cdata {version = 1ULL} + incarnation: cdata {generation = 0ULL, version = 1ULL} uuid: 00000000-0000-1000-8000-000000000001 payload_size: 0 ... @@ -1287,7 +1287,7 @@ m_list --- - - uri: 127.0.0.1: status: alive - incarnation: cdata {version = 1ULL} + incarnation: cdata {generation = 0ULL, version = 1ULL} uuid: 00000000-0000-1000-8000-000000000001 payload_size: 0 ... @@ -1329,7 +1329,7 @@ m_list --- - - uri: 127.0.0.1: status: alive - incarnation: cdata {version = 1ULL} + incarnation: cdata {generation = 0ULL, version = 1ULL} uuid: 00000000-0000-1000-8000-000000000002 payload_size: 0 ... @@ -1371,17 +1371,17 @@ m_list --- - - uri: 127.0.0.1: status: alive - incarnation: cdata {version = 1ULL} + incarnation: cdata {generation = 0ULL, version = 1ULL} uuid: 00000000-0000-1000-8000-000000000002 payload_size: 0 - uri: 127.0.0.1: status: alive - incarnation: cdata {version = 2ULL} + incarnation: cdata {generation = 0ULL, version = 2ULL} uuid: 00000000-0000-1000-8000-000000000001 payload_size: 8 - uri: 127.0.0.1: status: alive - incarnation: cdata {version = 2ULL} + incarnation: cdata {generation = 0ULL, version = 2ULL} uuid: 00000000-0000-1000-8000-000000000001 payload_size: 8 ... @@ -1390,12 +1390,12 @@ e_list - - is_new_payload: true is_new: true is_update: true - - is_new_version: true - is_new_payload: true + - is_new_payload: true + is_new_version: true is_new_incarnation: true is_update: true - - is_new_version: true - is_new_payload: true + - is_new_payload: true + is_new_version: true is_new_incarnation: true is_update: true ... @@ -1430,12 +1430,12 @@ m_list --- - - uri: 127.0.0.1: status: left - incarnation: cdata {version = 1ULL} + incarnation: cdata {generation = 0ULL, version = 1ULL} uuid: 00000000-0000-1000-8000-000000000002 payload_size: 0 - uri: 127.0.0.1: status: left - incarnation: cdata {version = 1ULL} + incarnation: cdata {generation = 0ULL, version = 1ULL} uuid: 00000000-0000-1000-8000-000000000002 payload_size: 0 ... @@ -1487,6 +1487,93 @@ ctx_list s1:delete() --- ... +-- +-- gh-4280: 'generation' counter to detect restarts and refute +-- information left from previous lifes of a SWIM instance. +-- +s1 = swim.new({uuid = uuid(1), uri = 0, heartbeat_rate = 0.01, generation = 0}) +--- +... +s2 = swim.new({uuid = uuid(2), uri = 0, heartbeat_rate = 0.01}) +--- +... +s2:add_member({uuid = uuid(1), uri = s1:self():uri()}) +--- +- true +... +s1_view = s2:member_by_uuid(uuid(1)) +--- +... +s1:set_payload('payload 1') +--- +- true +... +while not s1_view:payload() do fiber.sleep(0.1) end +--- +... +s1_view:payload() +--- +- payload 1 +... +s1:self():incarnation() +--- +- cdata {generation = 0ULL, version = 2ULL} +... +-- Now S2 knows S1's payload as 'payload 1'. +new_gen_ev = nil +--- +... +_ = s2:on_member_event(function(m, e) if e:is_new_generation() then new_gen_ev = e end end) +--- +... +s1:delete() +--- +... +s1 = swim.new({uuid = uuid(1), uri = 0, heartbeat_rate = 0.01, generation = 1}) +--- +... +s1:add_member({uuid = uuid(2), uri = s2:self():uri()}) +--- +- true +... +s1:set_payload('payload 2') +--- +- true +... +-- Without the new generation S2 would believe that S1's payload +-- is still 'payload 1'. Because 'payload 1' and 'payload 2' were +-- disseminated with the same version. +s1:self():incarnation() +--- +- cdata {generation = 1ULL, version = 2ULL} +... +while s1_view:payload() ~= 'payload 2' do fiber.sleep(0.1) end +--- +... +s1_view:payload() +--- +- payload 2 +... +new_gen_ev +--- +- is_new_version: true + is_new_generation: true + is_update: true + is_new_payload: true + is_new_uri: true + is_new_incarnation: true +... +-- Generation is static parameter. +s1:cfg({generation = 5}) +--- +- error: 'swim:cfg: unknown option generation' +... +s1:delete() +--- +... +s2:delete() +--- +... test_run:cmd("clear filter") --- - true diff --git a/test/swim/swim.test.lua b/test/swim/swim.test.lua index 576219b4d7d6..b4e4adafcf43 100644 --- a/test/swim/swim.test.lua +++ b/test/swim/swim.test.lua @@ -499,4 +499,40 @@ ctx_list s1:delete() +-- +-- gh-4280: 'generation' counter to detect restarts and refute +-- information left from previous lifes of a SWIM instance. +-- +s1 = swim.new({uuid = uuid(1), uri = 0, heartbeat_rate = 0.01, generation = 0}) +s2 = swim.new({uuid = uuid(2), uri = 0, heartbeat_rate = 0.01}) +s2:add_member({uuid = uuid(1), uri = s1:self():uri()}) +s1_view = s2:member_by_uuid(uuid(1)) +s1:set_payload('payload 1') +while not s1_view:payload() do fiber.sleep(0.1) end +s1_view:payload() +s1:self():incarnation() + +-- Now S2 knows S1's payload as 'payload 1'. + +new_gen_ev = nil +_ = s2:on_member_event(function(m, e) if e:is_new_generation() then new_gen_ev = e end end) +s1:delete() +s1 = swim.new({uuid = uuid(1), uri = 0, heartbeat_rate = 0.01, generation = 1}) +s1:add_member({uuid = uuid(2), uri = s2:self():uri()}) +s1:set_payload('payload 2') +-- Without the new generation S2 would believe that S1's payload +-- is still 'payload 1'. Because 'payload 1' and 'payload 2' were +-- disseminated with the same version. +s1:self():incarnation() + +while s1_view:payload() ~= 'payload 2' do fiber.sleep(0.1) end +s1_view:payload() +new_gen_ev + +-- Generation is static parameter. +s1:cfg({generation = 5}) + +s1:delete() +s2:delete() + test_run:cmd("clear filter") diff --git a/test/unit/swim.c b/test/unit/swim.c index 63f816d4a299..3486d3f73fee 100644 --- a/test/unit/swim.c +++ b/test/unit/swim.c @@ -133,7 +133,7 @@ swim_test_cfg(void) { swim_start_test(16); - struct swim *s = swim_new(); + struct swim *s = swim_new(0); assert(s != NULL); is(swim_cfg(s, NULL, -1, -1, -1, NULL), -1, "first cfg failed - no URI"); ok(swim_error_check_match("mandatory"), "diag says 'mandatory'"); @@ -149,7 +149,7 @@ swim_test_cfg(void) is(strcmp(self_uri, uri), 0, "URI is unchanged after recfg with NULL "\ "URI"); - struct swim *s2 = swim_new(); + struct swim *s2 = swim_new(0); assert(s2 != NULL); const char *bad_uri1 = "127.1.1.1.1.1.1:1"; const char *bad_uri2 = "google.com:1"; @@ -379,7 +379,7 @@ swim_test_probe(void) static void swim_test_refute(void) { - swim_start_test(4); + swim_start_test(6); struct swim_cluster *cluster = swim_cluster_new(2); swim_cluster_set_ack_timeout(cluster, 2); @@ -391,16 +391,21 @@ swim_test_refute(void) fail_if(swim_cluster_wait_status(cluster, 0, 1, MEMBER_SUSPECTED, 4) != 0); swim_cluster_set_drop(cluster, 1, 0); - is(swim_cluster_wait_incarnation(cluster, 1, 1, 1, 1), 0, + is(swim_cluster_wait_incarnation(cluster, 1, 1, 0, 1, 1), 0, "S2 increments its own incarnation to refute its suspicion"); - is(swim_cluster_wait_incarnation(cluster, 0, 1, 1, 1), 0, + is(swim_cluster_wait_incarnation(cluster, 0, 1, 0, 1, 1), 0, "new incarnation has reached S1 with a next round message"); swim_cluster_restart_node(cluster, 1); - is(swim_cluster_member_incarnation(cluster, 1, 1).version, 0, - "after restart S2's incarnation is default again"); - is(swim_cluster_wait_incarnation(cluster, 1, 1, 1, 1), 0, - "S2 learned its old bigger incarnation 1 from S0"); + struct swim_incarnation inc = + swim_cluster_member_incarnation(cluster, 1, 1); + is(inc.version, 0, "after restart S2's version is 0 again"); + is(inc.generation, 1, "but generation is new"); + + is(swim_cluster_wait_incarnation(cluster, 0, 1, 1, 0, 1), 0, + "S2 disseminates new incarnation, S1 learns it"); + is(swim_cluster_member_status(cluster, 0, 1), MEMBER_ALIVE, + "and considers S2 alive"); swim_cluster_delete(cluster); swim_finish_test(); @@ -527,7 +532,7 @@ swim_test_quit(void) * old LEFT status. */ swim_cluster_restart_node(cluster, 0); - is(swim_cluster_wait_incarnation(cluster, 0, 0, 1, 2), 0, + is(swim_cluster_wait_incarnation(cluster, 0, 0, 1, 0, 2), 0, "quited member S1 has returned and refuted the old status"); fail_if(swim_cluster_wait_fullmesh(cluster, 2) != 0); /* @@ -555,9 +560,14 @@ swim_test_quit(void) is(swim_cluster_member_status(cluster, 2, 0), swim_member_status_MAX, "S3 did not add S1 back when received its 'quit'"); - /* Now allow S2 to get the 'self-quit' message. */ + /* + * Now allow S2 to get the 'self-quit' message. Note, + * together with 'quit' it receives new generation, which + * belonged to S1 before. Of course, it is a bug, but in + * a user application - UUIDs are messed. + */ swim_cluster_unblock_io(cluster, 1); - is(swim_cluster_wait_incarnation(cluster, 1, 1, 2, 0), 0, + is(swim_cluster_wait_incarnation(cluster, 1, 1, 1, 1, 0), 0, "S2 finally got 'quit' message from S1, but with its 'own' UUID - "\ "refute it") swim_cluster_delete(cluster); @@ -1090,7 +1100,7 @@ swim_test_triggers(void) swim_member_unref(tctx.ctx.member); /* Check that recfg fires version update trigger. */ - s1 = swim_new(); + s1 = swim_new(0); struct tt_uuid uuid = uuid_nil; uuid.time_low = 1; fail_if(swim_cfg(s1, "127.0.0.1:1", -1, -1, -1, &uuid) != 0); @@ -1113,10 +1123,39 @@ swim_test_triggers(void) swim_finish_test(); } +static void +swim_test_generation(void) +{ + swim_start_test(3); + + struct swim_cluster *cluster = swim_cluster_new(2); + swim_cluster_interconnect(cluster, 0, 1); + + const char *p1 = "payload 1"; + int p1_size = strlen(p1); + swim_cluster_member_set_payload(cluster, 0, p1, p1_size); + is(swim_cluster_wait_payload_everywhere(cluster, 0, p1, p1_size, 1), 0, + "S1 disseminated its payload to S2"); + + swim_cluster_restart_node(cluster, 0); + const char *p2 = "payload 2"; + int p2_size = strlen(p2); + swim_cluster_member_set_payload(cluster, 0, p2, p2_size); + is(swim_cluster_wait_payload_everywhere(cluster, 0, p2, p2_size, 2), 0, + "S1 restarted and set another payload. Without generation it could "\ + "lead to never disseminated new payload."); + is(swim_cluster_member_incarnation(cluster, 1, 0).generation, 1, + "S2 sees new generation of S1"); + + swim_cluster_delete(cluster); + + swim_finish_test(); +} + static int main_f(va_list ap) { - swim_start_test(21); + swim_start_test(22); (void) ap; swim_test_ev_init(); @@ -1143,6 +1182,7 @@ main_f(va_list ap) swim_test_encryption(); swim_test_slow_net(); swim_test_triggers(); + swim_test_generation(); swim_test_transport_free(); swim_test_ev_free(); diff --git a/test/unit/swim.result b/test/unit/swim.result index 8d653477b64d..04a2778e69b0 100644 --- a/test/unit/swim.result +++ b/test/unit/swim.result @@ -1,5 +1,5 @@ *** main_f *** -1..21 +1..22 *** swim_test_one_link *** 1..6 ok 1 - no rounds - no fullmesh @@ -85,11 +85,13 @@ ok 6 - subtests ok 7 - subtests *** swim_test_probe: done *** *** swim_test_refute *** - 1..4 + 1..6 ok 1 - S2 increments its own incarnation to refute its suspicion ok 2 - new incarnation has reached S1 with a next round message - ok 3 - after restart S2's incarnation is default again - ok 4 - S2 learned its old bigger incarnation 1 from S0 + ok 3 - after restart S2's version is 0 again + ok 4 - but generation is new + ok 5 - S2 disseminates new incarnation, S1 learns it + ok 6 - and considers S2 alive ok 8 - subtests *** swim_test_refute: done *** *** swim_test_basic_gossip *** @@ -228,4 +230,11 @@ ok 20 - subtests ok 23 - version is a part of incarnation, so the latter is updated too ok 21 - subtests *** swim_test_triggers: done *** + *** swim_test_generation *** + 1..3 + ok 1 - S1 disseminated its payload to S2 + ok 2 - S1 restarted and set another payload. Without generation it could lead to never disseminated new payload. + ok 3 - S2 sees new generation of S1 +ok 22 - subtests + *** swim_test_generation: done *** *** main_f: done *** diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c index 72149b3536d8..c56af22338b4 100644 --- a/test/unit/swim_test_utils.c +++ b/test/unit/swim_test_utils.c @@ -157,6 +157,8 @@ struct swim_node { * that instance. */ struct tt_uuid uuid; + /** Generation to increment on restart. */ + uint64_t generation; /** * Filter to drop packets with a certain probability * from/to a specified direction. @@ -212,7 +214,8 @@ swim_test_event_cb(struct trigger *trigger, void *event) static inline void swim_node_create(struct swim_node *n, int id) { - n->swim = swim_new(); + n->generation = 0; + n->swim = swim_new(0); assert(n->swim != NULL); struct trigger *t = (struct trigger *) malloc(sizeof(*t)); trigger_create(t, swim_test_event_cb, NULL, (trigger_f0) free); @@ -356,7 +359,7 @@ swim_cluster_member_incarnation(struct swim_cluster *cluster, int node_id, swim_cluster_member_view(cluster, node_id, member_id); if (m == NULL) { struct swim_incarnation inc; - swim_incarnation_create(&inc, UINT64_MAX); + swim_incarnation_create(&inc, UINT64_MAX, UINT64_MAX); return inc; } return swim_member_incarnation(m); @@ -405,7 +408,7 @@ swim_cluster_restart_node(struct swim_cluster *cluster, int i) &n->uuid)); swim_delete(s); } - s = swim_new(); + s = swim_new(++n->generation); assert(s != NULL); int rc = swim_cfg(s, uri, -1, cluster->ack_timeout, cluster->gc_mode, &n->uuid); @@ -754,10 +757,10 @@ swim_member_template_set_status(struct swim_member_template *t, */ static inline void swim_member_template_set_incarnation(struct swim_member_template *t, - uint64_t version) + uint64_t generation, uint64_t version) { t->need_check_incarnation = true; - swim_incarnation_create(&t->incarnation, version); + swim_incarnation_create(&t->incarnation, generation, version); } /** @@ -790,7 +793,7 @@ swim_loop_check_member(struct swim_cluster *cluster, void *data) payload = swim_member_payload(m, &payload_size); } else { status = swim_member_status_MAX; - swim_incarnation_create(&incarnation, 0); + swim_incarnation_create(&incarnation, 0, 0); payload = NULL; payload_size = 0; } @@ -852,12 +855,12 @@ swim_cluster_wait_status(struct swim_cluster *cluster, int node_id, int swim_cluster_wait_incarnation(struct swim_cluster *cluster, int node_id, - int member_id, uint64_t version, - double timeout) + int member_id, uint64_t generation, + uint64_t version, double timeout) { struct swim_member_template t; swim_member_template_create(&t, node_id, member_id); - swim_member_template_set_incarnation(&t, version); + swim_member_template_set_incarnation(&t, generation, version); return swim_wait_timeout(timeout, cluster, swim_loop_check_member, &t); } diff --git a/test/unit/swim_test_utils.h b/test/unit/swim_test_utils.h index 587118c60e55..7064aa67d56d 100644 --- a/test/unit/swim_test_utils.h +++ b/test/unit/swim_test_utils.h @@ -224,8 +224,8 @@ swim_cluster_wait_status_everywhere(struct swim_cluster *cluster, int member_id, */ int swim_cluster_wait_incarnation(struct swim_cluster *cluster, int node_id, - int member_id, uint64_t version, - double timeout); + int member_id, uint64_t generation, + uint64_t version, double timeout); /** * Wait until a member with id @a member_id is seen with