Skip to content

Commit

Permalink
replication: implement an instance id filter for relay
Browse files Browse the repository at this point in the history
Add a filter for relay to skip rows coming from unwanted instances.
A list of instance ids whose rows replica doesn't want to fetch is encoded
together with SUBSCRIBE request after a freshly introduced flag IPROTO_ID_FILTER.

Filtering rows is needed to prevent an instance from fetching its own
rows from a remote master, which is useful on initial configuration and
harmful on resubscribe.

Prerequisite #4739, #3294

@TarantoolBot document

Title: document new binary protocol key and subscribe request changes

Add key `IPROTO_ID_FILTER = 0x51` to the internals reference.
This is an optional key used in SUBSCRIBE request followed by an array
of ids of instances whose rows won't be relayed to the replica.

SUBSCRIBE request is supplemented with an optional field of the
following structure:
```
+====================+
|      ID_FILTER     |
|   0x51 : ID LIST   |
| MP_INT : MP_ARRRAY |
|                    |
+====================+
```
The field is encoded only when the id list is not empty.

(cherry picked from commit 45de990)
  • Loading branch information
sergepetrenko authored and kyukhin committed Mar 2, 2020
1 parent 0882f94 commit c8423f7
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 20 deletions.
2 changes: 1 addition & 1 deletion src/box/applier.cc
Expand Up @@ -783,7 +783,7 @@ applier_subscribe(struct applier *applier)
vclock_create(&vclock);
vclock_copy(&vclock, &replicaset.vclock);
xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
&vclock);
&vclock, 0);
coio_write_xrow(coio, &row);

/* Read SUBSCRIBE response */
Expand Down
6 changes: 4 additions & 2 deletions src/box/box.cc
Expand Up @@ -1564,8 +1564,10 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
struct vclock replica_clock;
uint32_t replica_version_id;
vclock_create(&replica_clock);
uint32_t id_filter;
xrow_decode_subscribe_xc(header, &replicaset_uuid, &replica_uuid,
&replica_clock, &replica_version_id);
&replica_clock, &replica_version_id,
&id_filter);

/* Forbid connection to itself */
if (tt_uuid_is_equal(&replica_uuid, &INSTANCE_UUID))
Expand Down Expand Up @@ -1639,7 +1641,7 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
* indefinitely).
*/
relay_subscribe(replica, io->fd, header->sync, &replica_clock,
replica_version_id);
replica_version_id, id_filter);
}

void
Expand Down
2 changes: 2 additions & 0 deletions src/box/iproto_constants.h
Expand Up @@ -120,6 +120,8 @@ enum iproto_key {
* }
*/
IPROTO_SQL_INFO = 0x42,
/* Leave a gap between SQL keys and additional request keys */
IPROTO_ID_FILTER = 0x51,
IPROTO_KEY_MAX
};

Expand Down
15 changes: 14 additions & 1 deletion src/box/relay.cc
Expand Up @@ -109,6 +109,13 @@ struct relay {
struct vclock recv_vclock;
/** Replicatoin slave version. */
uint32_t version_id;
/**
* A filter of replica ids whose rows should be ignored.
* Each set filter bit corresponds to a replica id whose
* rows shouldn't be relayed. The list of ids to ignore
* is passed by the replica on subscribe.
*/
uint32_t id_filter;
/**
* Local vclock at the moment of subscribe, used to check
* dataset on the other side and send missing data rows if any.
Expand Down Expand Up @@ -647,7 +654,8 @@ relay_subscribe_f(va_list ap)
/** Replication acceptor fiber handler. */
void
relay_subscribe(struct replica *replica, int fd, uint64_t sync,
struct vclock *replica_clock, uint32_t replica_version_id)
struct vclock *replica_clock, uint32_t replica_version_id,
uint32_t replica_id_filter)
{
assert(replica->id != REPLICA_ID_NIL);
struct relay *relay = replica->relay;
Expand Down Expand Up @@ -676,6 +684,8 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
vclock_copy(&relay->tx.vclock, replica_clock);
relay->version_id = replica_version_id;

relay->id_filter = replica_id_filter;

int rc = cord_costart(&relay->cord, "subscribe",
relay_subscribe_f, relay);
if (rc == 0)
Expand Down Expand Up @@ -727,6 +737,9 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
packet->group_id = GROUP_DEFAULT;
packet->bodycnt = 0;
}
/* Check if the rows from the instance are filtered. */
if ((1 << packet->replica_id & relay->id_filter) != 0)
return;
/*
* We're feeding a WAL, thus responding to FINAL JOIN or SUBSCRIBE
* request. If this is FINAL JOIN (i.e. relay->replica is NULL),
Expand Down
3 changes: 2 additions & 1 deletion src/box/relay.h
Expand Up @@ -124,6 +124,7 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
*/
void
relay_subscribe(struct replica *replica, int fd, uint64_t sync,
struct vclock *replica_vclock, uint32_t replica_version_id);
struct vclock *replica_vclock, uint32_t replica_version_id,
uint32_t replica_id_filter);

#endif /* TARANTOOL_REPLICATION_RELAY_H_INCLUDED */
38 changes: 35 additions & 3 deletions src/box/xrow.c
Expand Up @@ -1152,7 +1152,7 @@ int
xrow_encode_subscribe(struct xrow_header *row,
const struct tt_uuid *replicaset_uuid,
const struct tt_uuid *instance_uuid,
const struct vclock *vclock)
const struct vclock *vclock, uint32_t id_filter)
{
memset(row, 0, sizeof(*row));
size_t size = XROW_BODY_LEN_MAX + mp_sizeof_vclock(vclock);
Expand All @@ -1162,7 +1162,8 @@ xrow_encode_subscribe(struct xrow_header *row,
return -1;
}
char *data = buf;
data = mp_encode_map(data, 4);
int filter_size = __builtin_popcount(id_filter);
data = mp_encode_map(data, filter_size != 0 ? 5 : 4);
data = mp_encode_uint(data, IPROTO_CLUSTER_UUID);
data = xrow_encode_uuid(data, replicaset_uuid);
data = mp_encode_uint(data, IPROTO_INSTANCE_UUID);
Expand All @@ -1171,6 +1172,17 @@ xrow_encode_subscribe(struct xrow_header *row,
data = mp_encode_vclock(data, vclock);
data = mp_encode_uint(data, IPROTO_SERVER_VERSION);
data = mp_encode_uint(data, tarantool_version_id());
if (filter_size != 0) {
data = mp_encode_uint(data, IPROTO_ID_FILTER);
data = mp_encode_array(data, filter_size);
struct bit_iterator it;
bit_iterator_init(&it, &id_filter, sizeof(id_filter),
true);
for (size_t id = bit_iterator_next(&it); id < VCLOCK_MAX;
id = bit_iterator_next(&it)) {
data = mp_encode_uint(data, id);
}
}
assert(data <= buf + size);
row->body[0].iov_base = buf;
row->body[0].iov_len = (data - buf);
Expand All @@ -1182,7 +1194,7 @@ xrow_encode_subscribe(struct xrow_header *row,
int
xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
struct tt_uuid *instance_uuid, struct vclock *vclock,
uint32_t *version_id)
uint32_t *version_id, uint32_t *id_filter)
{
if (row->bodycnt == 0) {
diag_set(ClientError, ER_INVALID_MSGPACK, "request body");
Expand All @@ -1198,6 +1210,8 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
return -1;
}

if (id_filter)
*id_filter = 0;
d = data;
uint32_t map_size = mp_decode_map(&d);
for (uint32_t i = 0; i < map_size; i++) {
Expand Down Expand Up @@ -1245,6 +1259,24 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
}
*version_id = mp_decode_uint(&d);
break;
case IPROTO_ID_FILTER:
if (id_filter == NULL)
goto skip;
if (mp_typeof(*d) != MP_ARRAY) {
id_filter_decode_err: xrow_on_decode_err(data, end, ER_INVALID_MSGPACK,
"invalid ID_FILTER");
return -1;
}
uint32_t len = mp_decode_array(&d);
for (uint32_t i = 0; i < len; ++i) {
if (mp_typeof(*d) != MP_UINT)
goto id_filter_decode_err;
uint64_t val = mp_decode_uint(&d);
if (val >= VCLOCK_MAX)
goto id_filter_decode_err;
*id_filter |= 1 << val;
}
break;
default: skip:
mp_next(&d); /* value */
}
Expand Down
30 changes: 18 additions & 12 deletions src/box/xrow.h
Expand Up @@ -48,7 +48,7 @@ enum {
XROW_BODY_IOVMAX = 2,
XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX,
XROW_HEADER_LEN_MAX = 52,
XROW_BODY_LEN_MAX = 128,
XROW_BODY_LEN_MAX = 256,
IPROTO_HEADER_LEN = 28,
/** 7 = sizeof(iproto_body_bin). */
IPROTO_SELECT_HEADER_LEN = IPROTO_HEADER_LEN + 7,
Expand Down Expand Up @@ -307,6 +307,8 @@ xrow_encode_vote(struct xrow_header *row);
* @param replicaset_uuid Replica set uuid.
* @param instance_uuid Instance uuid.
* @param vclock Replication clock.
* @param id_filter A List of replica ids to skip rows from
* when feeding a replica.
*
* @retval 0 Success.
* @retval -1 Memory error.
Expand All @@ -315,7 +317,7 @@ int
xrow_encode_subscribe(struct xrow_header *row,
const struct tt_uuid *replicaset_uuid,
const struct tt_uuid *instance_uuid,
const struct vclock *vclock);
const struct vclock *vclock, uint32_t id_filter);

/**
* Decode SUBSCRIBE command.
Expand All @@ -324,14 +326,16 @@ xrow_encode_subscribe(struct xrow_header *row,
* @param[out] instance_uuid.
* @param[out] vclock.
* @param[out] version_id.
* @param[out] id_filter A list of ids to skip rows from when
* feeding a replica.
*
* @retval 0 Success.
* @retval -1 Memory or format error.
*/
int
xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
struct tt_uuid *instance_uuid, struct vclock *vclock,
uint32_t *version_id);
uint32_t *version_id, uint32_t *id_filter);

/**
* Encode JOIN command.
Expand All @@ -355,7 +359,8 @@ xrow_encode_join(struct xrow_header *row, const struct tt_uuid *instance_uuid);
static inline int
xrow_decode_join(struct xrow_header *row, struct tt_uuid *instance_uuid)
{
return xrow_decode_subscribe(row, NULL, instance_uuid, NULL, NULL);
return xrow_decode_subscribe(row, NULL, instance_uuid, NULL, NULL,
NULL);
}

/**
Expand All @@ -380,7 +385,7 @@ xrow_encode_vclock(struct xrow_header *row, const struct vclock *vclock);
static inline int
xrow_decode_vclock(struct xrow_header *row, struct vclock *vclock)
{
return xrow_decode_subscribe(row, NULL, NULL, vclock, NULL);
return xrow_decode_subscribe(row, NULL, NULL, vclock, NULL, NULL);
}

/**
Expand Down Expand Up @@ -411,7 +416,8 @@ xrow_decode_subscribe_response(struct xrow_header *row,
struct tt_uuid *replicaset_uuid,
struct vclock *vclock)
{
return xrow_decode_subscribe(row, replicaset_uuid, NULL, vclock, NULL);
return xrow_decode_subscribe(row, replicaset_uuid, NULL, vclock, NULL,
NULL);
}

/**
Expand Down Expand Up @@ -774,22 +780,22 @@ static inline void
xrow_encode_subscribe_xc(struct xrow_header *row,
const struct tt_uuid *replicaset_uuid,
const struct tt_uuid *instance_uuid,
const struct vclock *vclock)
const struct vclock *vclock, uint32_t id_filter)
{
if (xrow_encode_subscribe(row, replicaset_uuid, instance_uuid,
vclock) != 0)
if (xrow_encode_subscribe(row, replicaset_uuid, instance_uuid, vclock,
id_filter) != 0)
diag_raise();
}

/** @copydoc xrow_decode_subscribe. */
static inline void
xrow_decode_subscribe_xc(struct xrow_header *row,
struct tt_uuid *replicaset_uuid,
struct tt_uuid *instance_uuid, struct vclock *vclock,
uint32_t *replica_version_id)
struct tt_uuid *instance_uuid, struct vclock *vclock,
uint32_t *replica_version_id, uint32_t *id_filter)
{
if (xrow_decode_subscribe(row, replicaset_uuid, instance_uuid,
vclock, replica_version_id) != 0)
vclock, replica_version_id, id_filter) != 0)
diag_raise();
}

Expand Down

0 comments on commit c8423f7

Please sign in to comment.