Skip to content

Commit 2993a75

Browse files
lockerrtsisyk
authored andcommitted
vinyl: rework replication
Currently, on initial join we send the current vinyl state. To do that, we open a read iterator over a space's primary index and send statements returned by it. Such an approach has a number of inherent problems: - An open read iterator blocks compaction, which is unacceptable for such a long operation as join. To avoid blocking compaction, we open the iterator in the dirty mode, i.e. it skims over the tops. This, however, introduces a different kind of problem: this makes the threshold between initial and final join phases hazy - statements sent on final join may or may not be among those sent during the initial join, and there's no efficient way to differentiate between them w/o sending extra information. - The replica expects LSNs to be growing monotonically. This constraint is imposed by the lsregion allocator used for storing statements in memory, but read iterator returns statements ordered by key, not by LSN. Currently, replica simply crashes if statements happen to be sent in an order different from chronological, which renders vinyl replication unusable. In the scope of the current model, we can't fix this by assigning fake LSNs to statements received on initial join, because there's no strict LSN threshold between initial and final join phases (see the previous paragraph). - In the initial join phase, replica is only aware of spaces that were created before the last snapshot, while vinyl sends statements from spaces that exist now. As a result, if a space was created after the most recent snapshot, the replica won't be able to receive its tuples and fail. To address the above-mentioned problems, we make vinyl initial join send the latest snapshot, just like in case of memtx. We implement this by loading the vinyl state from the last snapshot of the metadata log and sending statements of all runs from the snapshot as is (including deletes and updates), to be applied by the replica. To make lsregion at the receiving end happy, we assign fake monotonically growing LSNs to statements received on initial join. This is OK, because any LSN from final join > max real LSN from initial join max real LSN from initial join >= max fake LSN hence any LSN from final join > any fake LSN from initial join Besides fixing vinyl replication, this patch also enables the replication test suite for the vinyl engine (except for hot_standby) and makes engine/replica_join cover the following test cases: - secondary indexes - delete and update statements - keys added in an order different from LSN - recreate space after checkpoint Closes #1911 Closes #2001
1 parent ac87c66 commit 2993a75

File tree

11 files changed

+416
-235
lines changed

11 files changed

+416
-235
lines changed

src/box/box.cc

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -284,11 +284,6 @@ wal_stream_create(struct wal_stream *ctx, size_t wal_max_rows)
284284
static void
285285
apply_initial_join_row(struct xstream *stream, struct xrow_header *row)
286286
{
287-
if (row->type != IPROTO_INSERT) {
288-
tnt_raise(ClientError, ER_UNKNOWN_REQUEST_TYPE,
289-
(uint32_t) row->type);
290-
}
291-
292287
(void) stream;
293288
struct request *request;
294289
request = region_alloc_object_xc(&fiber()->gc, struct request);

src/box/memtx_space.cc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,10 @@ dup_replace_mode(uint32_t op)
249249
void
250250
MemtxSpace::applyInitialJoinRow(struct space *space, struct request *request)
251251
{
252-
assert(request->type == IPROTO_INSERT);
252+
if (request->type != IPROTO_INSERT) {
253+
tnt_raise(ClientError, ER_UNKNOWN_REQUEST_TYPE,
254+
(uint32_t) request->type);
255+
}
253256
request->header->replica_id = 0;
254257
struct txn *txn = txn_begin_stmt(space);
255258
try {

src/box/vinyl.c

Lines changed: 121 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -7493,6 +7493,18 @@ vy_page_delete(struct vy_page *page)
74937493
free(page);
74947494
}
74957495

7496+
static int
7497+
vy_page_xrow(struct vy_page *page, uint32_t stmt_no,
7498+
struct xrow_header *xrow)
7499+
{
7500+
assert(stmt_no < page->count);
7501+
const char *data = page->data + page->row_index[stmt_no];
7502+
const char *data_end = stmt_no + 1 < page->count ?
7503+
page->data + page->row_index[stmt_no + 1] :
7504+
page->data + page->unpacked_size;
7505+
return xrow_header_decode(xrow, &data, data_end);
7506+
}
7507+
74967508
/**
74977509
* Read raw stmt data from the page
74987510
* @param page Page.
@@ -7509,13 +7521,8 @@ vy_page_stmt(struct vy_page *page, uint32_t stmt_no,
75097521
struct tuple_format *format, struct tuple_format *upsert_format,
75107522
struct key_def *key_def)
75117523
{
7512-
assert(stmt_no < page->count);
7513-
const char *data = page->data + page->row_index[stmt_no];
7514-
const char *data_end = stmt_no + 1 < page->count ?
7515-
page->data + page->row_index[stmt_no + 1] :
7516-
page->data + page->unpacked_size;
75177524
struct xrow_header xrow;
7518-
if (xrow_header_decode(&xrow, &data, data_end) != 0)
7525+
if (vy_page_xrow(page, stmt_no, &xrow) != 0)
75197526
return NULL;
75207527
struct tuple_format *format_to_use = (xrow.type == IPROTO_UPSERT)
75217528
? upsert_format : format;
@@ -10019,31 +10026,119 @@ vy_read_iterator_close(struct vy_read_iterator *itr)
1001910026

1002010027
/** {{{ Replication */
1002110028

10022-
int
10023-
vy_index_send(struct vy_index *index, vy_send_row_f sendrow, void *ctx)
10029+
/** Argument passed to vy_join_cb(). */
10030+
struct vy_join_arg {
10031+
/** Vinyl environment. */
10032+
struct vy_env *env;
10033+
/** Function for sending a statement. */
10034+
vy_send_row_f send_row;
10035+
/** Argument passed to send_row(). */
10036+
void *send_row_arg;
10037+
/** ID of the space currently being relayed. */
10038+
uint32_t space_id;
10039+
/** Ordinal number of the index. */
10040+
uint32_t index_id;
10041+
/** Path to the index directory. */
10042+
char *index_path;
10043+
/**
10044+
* LSN to assign to the next statement.
10045+
*
10046+
* We can't use original statements' LSNs, because we
10047+
* send statements not in the chronological order while
10048+
* the receiving end expects LSNs to grow monotonically
10049+
* due to the design of the lsregion allocator, which is
10050+
* used for storing statements in memory.
10051+
*/
10052+
int64_t lsn;
10053+
};
10054+
10055+
/** Relay callback, passed to xctl_relay(). */
10056+
static int
10057+
vy_join_cb(const struct xctl_record *record, void *cb_arg)
1002410058
{
10025-
struct vy_env *env = index->env;
10026-
static const int64_t vlsn = INT64_MAX;
10059+
struct vy_join_arg *arg = cb_arg;
1002710060
int rc = 0;
1002810061

10029-
struct vy_read_iterator ri;
10030-
struct tuple *stmt;
10031-
struct tuple *key = vy_stmt_new_select(env->key_format, NULL, 0);
10032-
if (key == NULL)
10062+
if (record->type == XCTL_CREATE_VY_INDEX) {
10063+
arg->space_id = record->space_id;
10064+
arg->index_id = record->iid;
10065+
vy_index_snprint_path(arg->index_path, PATH_MAX,
10066+
arg->env->conf->path,
10067+
arg->space_id, arg->index_id);
10068+
}
10069+
10070+
/*
10071+
* We are only interested in the primary index.
10072+
* Secondary keys will be rebuilt on the destination.
10073+
*/
10074+
if (arg->index_id != 0)
10075+
goto out;
10076+
10077+
/*
10078+
* We only send statements, not metadata, because the
10079+
* latter is a replica's private business.
10080+
*/
10081+
if (record->type != XCTL_INSERT_VY_RUN)
10082+
goto out;
10083+
10084+
rc = -1;
10085+
10086+
/* Load the run. */
10087+
struct vy_run *run = vy_run_new(record->vy_run_id);
10088+
if (run == NULL)
10089+
goto out;
10090+
if (vy_run_recover(run, arg->index_path) != 0)
10091+
goto out_free_run;
10092+
10093+
ZSTD_DStream *zdctx = vy_env_get_zdctx(arg->env);
10094+
if (zdctx == NULL)
10095+
goto out_free_run;
10096+
10097+
/* Send the run's statements to the replica. */
10098+
for (uint32_t page_no = 0; page_no < run->info.count; page_no++) {
10099+
struct vy_page_info *pi = vy_run_page_info(run, page_no);
10100+
struct vy_page *page = vy_page_new(pi);
10101+
if (page == NULL)
10102+
goto out_free_run;
10103+
if (vy_page_read(page, pi, run->fd, zdctx) != 0)
10104+
goto out_free_page;
10105+
for (uint32_t stmt_no = 0; stmt_no < pi->count; stmt_no++) {
10106+
struct xrow_header xrow;
10107+
if (vy_page_xrow(page, stmt_no, &xrow) != 0)
10108+
goto out_free_page;
10109+
xrow.lsn = arg->lsn++;
10110+
if (arg->send_row(&xrow, arg->send_row_arg) != 0)
10111+
goto out_free_page;
10112+
}
10113+
vy_page_delete(page);
10114+
continue;
10115+
out_free_page:
10116+
vy_page_delete(page);
10117+
goto out_free_run;
10118+
}
10119+
rc = 0; /* success */
10120+
10121+
out_free_run:
10122+
vy_run_delete(run);
10123+
out:
10124+
return rc;
10125+
}
10126+
10127+
int
10128+
vy_join(struct vy_env *env, vy_send_row_f send_row, void *send_row_arg)
10129+
{
10130+
struct vy_join_arg arg = {
10131+
.env = env,
10132+
.send_row = send_row,
10133+
.send_row_arg = send_row_arg,
10134+
};
10135+
arg.index_path = malloc(PATH_MAX);
10136+
if (arg.index_path == NULL) {
10137+
diag_set(OutOfMemory, PATH_MAX, "malloc", "path");
1003310138
return -1;
10034-
vy_read_iterator_open(&ri, index, NULL, ITER_GT, key, &vlsn, true);
10035-
rc = vy_read_iterator_next(&ri, &stmt);
10036-
for (; rc == 0 && stmt; rc = vy_read_iterator_next(&ri, &stmt)) {
10037-
uint32_t mp_size;
10038-
const char *mp_data;
10039-
mp_data = tuple_data_range(stmt, &mp_size);
10040-
int64_t lsn = vy_stmt_lsn(stmt);
10041-
rc = sendrow(ctx, mp_data, mp_size, lsn);
10042-
if (rc != 0)
10043-
break;
1004410139
}
10045-
vy_read_iterator_close(&ri);
10046-
tuple_unref(key);
10140+
int rc = xctl_relay(vy_join_cb, &arg);
10141+
free(arg.index_path);
1004710142
return rc;
1004810143
}
1004910144

src/box/vinyl.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ struct vclock;
5050
struct request;
5151
struct space;
5252
struct txn_stmt;
53+
struct xrow_header;
5354
enum iterator_type;
5455

5556
/*
@@ -297,9 +298,10 @@ vy_cursor_next(struct vy_cursor *cursor, struct tuple **result);
297298
*/
298299

299300
typedef int
300-
(*vy_send_row_f)(void *, const char *tuple, uint32_t tuple_size, int64_t lsn);
301+
(*vy_send_row_f)(struct xrow_header *row, void *arg);
302+
301303
int
302-
vy_index_send(struct vy_index *index, vy_send_row_f sendrow, void *ctx);
304+
vy_join(struct vy_env *env, vy_send_row_f send_row, void *send_row_arg);
303305

304306
#ifdef __cplusplus
305307
}

src/box/vinyl_engine.cc

Lines changed: 5 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -156,74 +156,23 @@ VinylEngine::buildSecondaryKey(struct space *old_space,
156156
*/
157157
}
158158

159-
struct vinyl_send_row_arg {
160-
struct xstream *stream;
161-
uint32_t space_id;
162-
};
163-
164159
static int
165-
vinyl_send_row(void *arg, const char *tuple, uint32_t tuple_size, int64_t lsn)
160+
vinyl_send_row(struct xrow_header *row, void *arg)
166161
{
167-
struct xstream *stream = ((struct vinyl_send_row_arg *) arg)->stream;
168-
uint32_t space_id = ((struct vinyl_send_row_arg *) arg)->space_id;
169-
170-
struct request_replace_body body;
171-
memset(&body, 0, sizeof(body));
172-
body.m_body = 0x82; /* map of two elements. */
173-
body.k_space_id = IPROTO_SPACE_ID;
174-
body.m_space_id = 0xce; /* uint32 */
175-
body.v_space_id = mp_bswap_u32(space_id);
176-
body.k_tuple = IPROTO_TUPLE;
177-
struct xrow_header row;
178-
memset(&row, 0, sizeof(row));
179-
row.type = IPROTO_INSERT;
180-
row.replica_id = 0;
181-
row.lsn = lsn;
182-
row.bodycnt = 2;
183-
row.body[0].iov_base = &body;
184-
row.body[0].iov_len = sizeof(body);
185-
row.body[1].iov_base = (char *) tuple;
186-
row.body[1].iov_len = tuple_size;
162+
struct xstream *stream = (struct xstream *) arg;
187163
try {
188-
xstream_write(stream, &row);
164+
xstream_write(stream, row);
189165
} catch (Exception *e) {
190166
return -1;
191167
}
192168
return 0;
193169
}
194170

195-
struct join_send_space_arg {
196-
struct vy_env *env;
197-
struct xstream *stream;
198-
};
199-
200-
static void
201-
join_send_space(struct space *sp, void *data)
202-
{
203-
struct xstream *stream = ((struct join_send_space_arg *) data)->stream;
204-
if (space_is_temporary(sp))
205-
return;
206-
if (!space_is_vinyl(sp))
207-
return;
208-
VinylIndex *pk = (VinylIndex *) space_index(sp, 0);
209-
if (!pk)
210-
return;
211-
212-
/* send database */
213-
struct vinyl_send_row_arg arg = { stream, sp->def.id };
214-
if (vy_index_send(pk->db, vinyl_send_row, &arg) != 0)
215-
diag_raise();
216-
}
217-
218-
/**
219-
* Relay all data currently stored in Vinyl engine
220-
* to the replica.
221-
*/
222171
void
223172
VinylEngine::join(struct xstream *stream)
224173
{
225-
struct join_send_space_arg arg = { env, stream };
226-
space_foreach(join_send_space, &arg);
174+
if (vy_join(env, vinyl_send_row, stream) != 0)
175+
diag_raise();
227176
}
228177

229178
void

src/box/vinyl_space.cc

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,23 +46,41 @@ VinylSpace::VinylSpace(Engine *e)
4646
void
4747
VinylSpace::applyInitialJoinRow(struct space *space, struct request *request)
4848
{
49-
assert(request->type == IPROTO_INSERT);
5049
assert(request->header != NULL);
5150
struct vy_env *env = ((VinylEngine *)space->handler->engine)->env;
5251

53-
/* Check the tuple fields. */
54-
if (tuple_validate_raw(space->format, request->tuple))
55-
diag_raise();
56-
5752
struct vy_tx *tx = vy_begin(env);
5853
if (tx == NULL)
5954
diag_raise();
6055

6156
int64_t signature = request->header->lsn;
6257

63-
if (vy_replace(tx, NULL, space, request))
58+
struct txn_stmt stmt;
59+
memset(&stmt, 0, sizeof(stmt));
60+
61+
int rc;
62+
switch (request->type) {
63+
case IPROTO_REPLACE:
64+
rc = vy_replace(tx, &stmt, space, request);
65+
break;
66+
case IPROTO_UPSERT:
67+
rc = vy_upsert(tx, &stmt, space, request);
68+
break;
69+
case IPROTO_DELETE:
70+
rc = vy_delete(tx, &stmt, space, request);
71+
break;
72+
default:
73+
tnt_raise(ClientError, ER_UNKNOWN_REQUEST_TYPE,
74+
(uint32_t) request->type);
75+
}
76+
if (rc != 0)
6477
diag_raise();
6578

79+
if (stmt.old_tuple)
80+
tuple_unref(stmt.old_tuple);
81+
if (stmt.new_tuple)
82+
tuple_unref(stmt.new_tuple);
83+
6684
if (vy_prepare(env, tx)) {
6785
vy_rollback(env, tx);
6886
diag_raise();

0 commit comments

Comments
 (0)