Skip to content

Commit

Permalink
swim: introduce incarnation.generation
Browse files Browse the repository at this point in the history
SWIM uses incarnation to refute old information, but it is not
enough when restarts are possible. If an instance restarts, its
incarnation is reset to 0. After several local and fast updates
it gets N. But it is possible, that other instances also know
incarnation of this instance as N, from its previous life, but
with different information. They will never take new version of
data, because their current version is also considered actual.

As a result, incarnation is not enough. There was a necessity to
create a persistent part of incarnation. This patch introduces it
and calls 'generation'. As an additional profit, generation
allows to react on instance restart in user defined triggers.

Closes #4280

@TarantoolBot document
Title: SWIM generation

Incarnation now is a two-part value {generation, version}.

Version is exactly the same that is called 'incarnation' in the
original SWIM paper, and before this patch. It is a volatile
automatically managed number to refute false gossips and update
information on remote nodes.

Generation is a new persistent part of incarnation allowing users
to refute old pieces of information left from previous lifes of an
instance. It is a static attribute set when a SWIM instance is
created, and can't be changed without restarting the instance.

A one could think of incarnation as 128 bit unsigned integer,
where upper 64 bits are static and persistent, while lower 64 bits
are volatile.

Generation not only helps with overriding old information, but
also can be used to detect restarts in user defined triggers,
because it can be updated only when a SWIM instance is recreated.

How to set generation:
```Lua
swim = require('swim')
s = swim.new({generation = <value>})
```
Generation can't be set in `swim:cfg`. If it is omitted, then 0
is used by default. But be careful - if the instance is started
not a first time, it is safer to use a new generation. Ideally it
should be persisted somehow: in a file, in a space, in a global
service. Each next `swim.new()` should take incremented value of
generation.

How is incarnation update changed:
```Lua
swim = require('swim')
s = swim.new()
s:on_member_event(function(m, e)
    if e:is_new_incarnation() then
        if e:is_new_generation() then
            -- Process restart.
        end
        if e:is_new_version() then
            -- Process version update. It means
            -- the member is somehow changed.
        end
    end
end)
```

Note, `is_new_incarnation` is now a shortcut for checking update
of generation, or version, or both.

Method `member:incarnation()` is changed. Now it returns cdata
object with attributes `version` and `generation`. Usage:
```Lua
incarnation = member:incarnation()
tarantool> incarnation.version
---
- 15
...
tarantool> incarnation.generation
---
- 2
...
```

These objects can be compared using comparison operators:
```Lua
member1:incarnation() < member2:incarnation
member1:incarnation() >= member2:incarnation()
-- Any operator works: ==, <, >, <=, >=, ~=.
```

Being printed, incarnation shows a string with both generation
and incarnation.

Binary protocol is updated. Now Protocol Logic section looks like
this:

```
+-------------------Protocol logic section--------------------+
| map {                                                       |
|     0 = SWIM_SRC_UUID: 16 byte UUID,                        |
|                                                             |
|                 AND                                         |
|                                                             |
|     2 = SWIM_FAILURE_DETECTION: map {                       |
|         0 = SWIM_FD_MSG_TYPE: uint, enum swim_fd_msg_type,  |
|         1 = SWIM_FD_GENERATION: uint,                       |
|         2 = SWIM_FD_VERSION: uint                           |
|     },                                                      |
|                                                             |
|               OR/AND                                        |
|                                                             |
|     3 = SWIM_DISSEMINATION: array [                         |
|         map {                                               |
|             0 = SWIM_MEMBER_STATUS: uint,                   |
|                                     enum member_status,     |
|             1 = SWIM_MEMBER_ADDRESS: uint, ip,              |
|             2 = SWIM_MEMBER_PORT: uint, port,               |
|             3 = SWIM_MEMBER_UUID: 16 byte UUID,             |
|             4 = SWIM_MEMBER_GENERATION: uint,               |
|             5 = SWIM_MEMBER_VERSION: uint,                  |
|             6 = SWIM_MEMBER_PAYLOAD: bin                    |
|         },                                                  |
|         ...                                                 |
|     ],                                                      |
|                                                             |
|               OR/AND                                        |
|                                                             |
|     1 = SWIM_ANTI_ENTROPY: array [                          |
|         map {                                               |
|             0 = SWIM_MEMBER_STATUS: uint,                   |
|                                     enum member_status,     |
|             1 = SWIM_MEMBER_ADDRESS: uint, ip,              |
|             2 = SWIM_MEMBER_PORT: uint, port,               |
|             3 = SWIM_MEMBER_UUID: 16 byte UUID,             |
|             4 = SWIM_MEMBER_GENERATION: uint,               |
|             5 = SWIM_MEMBER_VERSION: uint,                  |
|             6 = SWIM_MEMBER_PAYLOAD: bin                    |
|         },                                                  |
|         ...                                                 |
|     ],                                                      |
|                                                             |
|               OR/AND                                        |
|                                                             |
|     4 = SWIM_QUIT: map {                                    |
|         0 = SWIM_QUIT_GENERATION: uint,                     |
|         1 = SWIM_QUIT_VERSION: uint                         |
|     }                                                       |
| }                                                           |
+-------------------------------------------------------------+
```

Note - SWIM_FD_INCARNATION, SWIM_MEMBER_INCARNATION, and
SWIM_QUIT_INCARNATION disappeared. Incarnation is sent now in two
parts: version and generation.

SWIM_MEMBER_PAYLOAD got a new value.

This changes are legal because 1) the SWIM is not released yet,
so it is mutable, 2) I wanted to emphasize that 'generation' is
first/upper part of incarnation, 'version' is second/lower part.
  • Loading branch information
Gerold103 committed Jun 23, 2019
1 parent 3aecf9f commit 44b9a60
Show file tree
Hide file tree
Showing 13 changed files with 352 additions and 91 deletions.
36 changes: 31 additions & 5 deletions src/lib/swim/swim.c
Expand Up @@ -222,11 +222,21 @@ swim_incarnation_diff(const struct swim_incarnation *l,
const struct swim_incarnation *r,
enum swim_ev_mask *diff)
{
if (l->version == r->version) {
if (l->generation == r->generation &&
l->version == r->version) {
*diff = 0;
return 0;
}
*diff = SWIM_EV_NEW_VERSION;
if (l->generation < r->generation) {
*diff |= SWIM_EV_NEW_GENERATION;
return -1;
}
if (l->generation > r->generation) {
*diff |= SWIM_EV_NEW_GENERATION;
return 1;
}
assert(l->version != r->version);
return l->version < r->version ? -1 : 1;
}

Expand Down Expand Up @@ -483,6 +493,15 @@ struct swim {
struct ev_timer wait_ack_tick;
/** GC state saying how to remove dead members. */
enum swim_gc_mode gc_mode;
/**
* Generation of that instance is set when the latter is
* created. It is actual only until the instance is
* configured. After that the instance can learn a bigger
* own generation from other members. Despite meaning
* in fact a wrong usage of SWIM generations, it is still
* possible.
*/
uint64_t initial_generation;
/**
*
* Dissemination component
Expand Down Expand Up @@ -1683,12 +1702,17 @@ swim_process_quit(struct swim *swim, const char **pos, const char *end,
return -1;
}
struct swim_incarnation incarnation;
swim_incarnation_create(&incarnation, 0);
swim_incarnation_create(&incarnation, 0, 0);
for (uint32_t i = 0; i < size; ++i) {
uint64_t tmp;
if (swim_decode_uint(pos, end, &tmp, prefix, "a key") != 0)
return -1;
switch (tmp) {
case SWIM_QUIT_GENERATION:
if (swim_decode_uint(pos, end, &incarnation.generation,
prefix, "generation") != 0)
return -1;
break;
case SWIM_QUIT_VERSION:
if (swim_decode_uint(pos, end, &incarnation.version,
prefix, "version") != 0)
Expand Down Expand Up @@ -1829,13 +1853,14 @@ swim_event_handler_f(va_list va)


struct swim *
swim_new(void)
swim_new(uint64_t generation)
{
struct swim *swim = (struct swim *) calloc(1, sizeof(*swim));
if (swim == NULL) {
diag_set(OutOfMemory, sizeof(*swim), "calloc", "swim");
return NULL;
}
swim->initial_generation = generation;
swim->members = mh_swim_table_new();
if (swim->members == NULL) {
free(swim);
Expand Down Expand Up @@ -1941,7 +1966,8 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
return -1;
}
struct swim_incarnation incarnation;
swim_incarnation_create(&incarnation, 0);
swim_incarnation_create(&incarnation, swim->initial_generation,
0);
swim->self = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE,
&incarnation, NULL, 0);
if (swim->self == NULL)
Expand Down Expand Up @@ -2062,7 +2088,7 @@ swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid)
struct swim_member *member = swim_find_member(swim, uuid);
if (member == NULL) {
struct swim_incarnation inc;
swim_incarnation_create(&inc, 0);
swim_incarnation_create(&inc, 0, 0);
member = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE, &inc,
NULL, -1);
return member == NULL ? -1 : 0;
Expand Down
13 changes: 7 additions & 6 deletions src/lib/swim/swim.h
Expand Up @@ -68,7 +68,7 @@ enum swim_gc_mode {
* yields.
*/
struct swim *
swim_new(void);
swim_new(uint64_t generation);

/** Check if a swim instance is configured. */
bool
Expand Down Expand Up @@ -279,16 +279,17 @@ enum swim_ev_mask {
SWIM_EV_NEW = 0b00000001,
SWIM_EV_NEW_STATUS = 0b00000010,
SWIM_EV_NEW_URI = 0b00000100,
SWIM_EV_NEW_VERSION = 0b00001000,
SWIM_EV_NEW_GENERATION = 0b00001000,
SWIM_EV_NEW_VERSION = 0b00010000,
/*
* Shortcut to check for update of any part of
* incarnation.
*/
SWIM_EV_NEW_INCARNATION = 0b00001000,
SWIM_EV_NEW_PAYLOAD = 0b00010000,
SWIM_EV_NEW_INCARNATION = 0b00011000,
SWIM_EV_NEW_PAYLOAD = 0b00100000,
/* Shortcut to check for any update. */
SWIM_EV_UPDATE = 0b00011110,
SWIM_EV_DROP = 0b00100000,
SWIM_EV_UPDATE = 0b00111110,
SWIM_EV_DROP = 0b01000000,
};

/** On member event trigger context. */
Expand Down
10 changes: 9 additions & 1 deletion src/lib/swim/swim_constants.h
Expand Up @@ -63,6 +63,12 @@ extern const char *swim_member_status_strs[];
* actual.
*/
struct swim_incarnation {
/**
* Generation is a persistent part of incarnation. It is
* set by a user on SWIM start, and normally is not
* changed during instance lifetime.
*/
uint64_t generation;
/**
* Version is a volatile part of incarnation. It is
* managed by SWIM fully internally.
Expand All @@ -72,8 +78,10 @@ struct swim_incarnation {

/** Create a new incarnation value. */
static inline void
swim_incarnation_create(struct swim_incarnation *i, uint64_t version)
swim_incarnation_create(struct swim_incarnation *i, uint64_t generation,
uint64_t version)
{
i->generation = generation;
i->version = version;
}

Expand Down
23 changes: 20 additions & 3 deletions src/lib/swim/swim_proto.c
Expand Up @@ -161,8 +161,10 @@ swim_decode_uuid(struct tt_uuid *uuid, const char **pos, const char *end,
*/
static inline void
swim_incarnation_bin_create(struct swim_incarnation_bin *bin,
uint8_t version_key)
uint8_t generation_key, uint8_t version_key)
{
bin->k_generation = generation_key;
bin->m_generation = 0xcf;
bin->k_version = version_key;
bin->m_version = 0xcf;
}
Expand All @@ -175,6 +177,7 @@ static inline void
swim_incarnation_bin_fill(struct swim_incarnation_bin *bin,
const struct swim_incarnation *incarnation)
{
bin->v_generation = mp_bswap_u64(incarnation->generation);
bin->v_version = mp_bswap_u64(incarnation->version);
}

Expand Down Expand Up @@ -270,6 +273,11 @@ swim_decode_member_key(enum swim_member_key key, const char **pos,
"member uuid") != 0)
return -1;
break;
case SWIM_MEMBER_GENERATION:
if (swim_decode_uint(pos, end, &def->incarnation.generation,
prefix, "member generation") != 0)
return -1;
break;
case SWIM_MEMBER_VERSION:
if (swim_decode_uint(pos, end, &def->incarnation.version,
prefix, "member version") != 0)
Expand Down Expand Up @@ -342,7 +350,8 @@ swim_fd_header_bin_create(struct swim_fd_header_bin *header,
header->k_type = SWIM_FD_MSG_TYPE;
header->v_type = type;

swim_incarnation_bin_create(&header->incarnation, SWIM_FD_VERSION);
swim_incarnation_bin_create(&header->incarnation, SWIM_FD_GENERATION,
SWIM_FD_VERSION);
swim_incarnation_bin_fill(&header->incarnation, incarnation);
}

Expand Down Expand Up @@ -378,6 +387,12 @@ swim_failure_detection_def_decode(struct swim_failure_detection_def *def,
}
def->type = key;
break;
case SWIM_FD_GENERATION:
if (swim_decode_uint(pos, end,
&def->incarnation.generation,
prefix, "generation") != 0)
return -1;
break;
case SWIM_FD_VERSION:
if (swim_decode_uint(pos, end,
&def->incarnation.version, prefix,
Expand Down Expand Up @@ -429,6 +444,7 @@ swim_passport_bin_create(struct swim_passport_bin *passport)
passport->m_uuid = 0xc4;
passport->m_uuid_len = UUID_LEN;
swim_incarnation_bin_create(&passport->incarnation,
SWIM_MEMBER_GENERATION,
SWIM_MEMBER_VERSION);
}

Expand Down Expand Up @@ -591,7 +607,8 @@ swim_quit_bin_create(struct swim_quit_bin *header,
header->k_quit = SWIM_QUIT;
assert(mp_sizeof_map(SWIM_INCARNATION_BIN_SIZE) == 1);
header->m_quit = 0x80 | SWIM_INCARNATION_BIN_SIZE;
swim_incarnation_bin_create(&header->incarnation, SWIM_QUIT_VERSION);
swim_incarnation_bin_create(&header->incarnation, SWIM_QUIT_GENERATION,
SWIM_QUIT_VERSION);
swim_incarnation_bin_fill(&header->incarnation, incarnation);
}

Expand Down
29 changes: 21 additions & 8 deletions src/lib/swim/swim_proto.h
Expand Up @@ -72,6 +72,7 @@ enum {
* | |
* | SWIM_FAILURE_DETECTION: { |
* | SWIM_FD_MSG_TYPE: uint, enum swim_fd_msg_type, |
* | SWIM_FD_GENERATION: uint, |
* | SWIM_FD_VERSION: uint |
* | }, |
* | |
Expand All @@ -83,6 +84,7 @@ enum {
* | SWIM_MEMBER_ADDRESS: uint, ip, |
* | SWIM_MEMBER_PORT: uint, port, |
* | SWIM_MEMBER_UUID: 16 byte UUID, |
* | SWIM_MEMBER_GENERATION: uint, |
* | SWIM_MEMBER_VERSION: uint, |
* | SWIM_MEMBER_PAYLOAD: bin |
* | }, |
Expand All @@ -97,6 +99,7 @@ enum {
* | SWIM_MEMBER_ADDRESS: uint, ip, |
* | SWIM_MEMBER_PORT: uint, port, |
* | SWIM_MEMBER_UUID: 16 byte UUID, |
* | SWIM_MEMBER_GENERATION: uint, |
* | SWIM_MEMBER_VERSION: uint, |
* | SWIM_MEMBER_PAYLOAD: bin |
* | }, |
Expand All @@ -106,6 +109,7 @@ enum {
* | OR/AND |
* | |
* | SWIM_QUIT: { |
* | SWIM_QUIT_GENERATION: uint, |
* | SWIM_QUIT_VERSION: uint |
* | } |
* | } |
Expand All @@ -118,14 +122,20 @@ enum {
* Structures storing an incarnation should use this size
* so as to correctly encode MessagePack map header.
*/
SWIM_INCARNATION_BIN_SIZE = 1,
SWIM_INCARNATION_BIN_SIZE = 2,
};

/**
* Prepared binary MessagePack representation of an incarnation
* value. It expects its owner is a map.
*/
struct PACKED swim_incarnation_bin {
/** mp_encode_uint(generation key) */
uint8_t k_generation;
/** mp_encode_uint(64bit generation) */
uint8_t m_generation;
uint64_t v_generation;

/** mp_encode_uint(version key) */
uint8_t k_version;
/** mp_encode_uint(64bit version) */
Expand Down Expand Up @@ -209,6 +219,7 @@ enum swim_fd_key {
* it was considered dead, but ping/ack with greater
* incarnation was received from it.
*/
SWIM_FD_GENERATION,
SWIM_FD_VERSION,
};

Expand All @@ -225,15 +236,15 @@ extern const char *swim_fd_msg_type_strs[];
struct PACKED swim_fd_header_bin {
/** mp_encode_uint(SWIM_FAILURE_DETECTION) */
uint8_t k_header;
/** mp_encode_map(2) */
/** mp_encode_map(3) */
uint8_t m_header;

/** mp_encode_uint(SWIM_FD_MSG_TYPE) */
uint8_t k_type;
/** mp_encode_uint(enum swim_fd_msg_type) */
uint8_t v_type;

/** SWIM_FD_VERSION */
/** SWIM_FD_GENERATION, SWIM_FD_VERSION */
struct swim_incarnation_bin incarnation;
};

Expand Down Expand Up @@ -309,6 +320,7 @@ enum swim_member_key {
SWIM_MEMBER_ADDRESS,
SWIM_MEMBER_PORT,
SWIM_MEMBER_UUID,
SWIM_MEMBER_GENERATION,
SWIM_MEMBER_VERSION,
SWIM_MEMBER_PAYLOAD,
swim_member_key_MAX,
Expand Down Expand Up @@ -340,7 +352,7 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header,
* date and TTD is > 0.
*/
struct PACKED swim_passport_bin {
/** mp_encode_map(5 or 6) */
/** mp_encode_map(6 or 7) */
uint8_t m_header;

/** mp_encode_uint(SWIM_MEMBER_STATUS) */
Expand All @@ -358,7 +370,7 @@ struct PACKED swim_passport_bin {
uint8_t m_uuid_len;
uint8_t v_uuid[UUID_LEN];

/** SWIM_MEMBER_VERSION */
/** SWIM_MEMBER_GENERATION, SWIM_MEMBER_VERSION */
struct swim_incarnation_bin incarnation;
};

Expand Down Expand Up @@ -579,17 +591,18 @@ swim_route_bin_create(struct swim_route_bin *route,

enum swim_quit_key {
/** Incarnation to ignore old quit messages. */
SWIM_QUIT_VERSION = 0,
SWIM_QUIT_GENERATION = 0,
SWIM_QUIT_VERSION,
};

/** Quit section. Describes voluntary quit from the cluster. */
struct PACKED swim_quit_bin {
/** mp_encode_uint(SWIM_QUIT) */
uint8_t k_quit;
/** mp_encode_map(1) */
/** mp_encode_map(2) */
uint8_t m_quit;

/** SWIM_QUIT_VERSION */
/** SWIM_QUIT_GENERATION, SWIM_QUIT_VERSION */
struct swim_incarnation_bin incarnation;
};

Expand Down
3 changes: 2 additions & 1 deletion src/lua/swim.c
Expand Up @@ -67,7 +67,8 @@ lua_swim_on_member_event(struct lua_State *L)
static int
lua_swim_new(struct lua_State *L)
{
struct swim *s = swim_new();
uint64_t generation = luaL_checkuint64(L, 1);
struct swim *s = swim_new(generation);
*(struct swim **) luaL_pushcdata(L, ctid_swim_ptr) = s;
if (s != NULL)
return 1;
Expand Down

0 comments on commit 44b9a60

Please sign in to comment.