Skip to content

Commit

Permalink
vinyl: force read view in iterator in autocommit mode
Browse files Browse the repository at this point in the history
Every iteration over a secondary index tracks a point in the transaction
manager (due to lookup in the primary index). As a result, if the user
calls 'select' or 'pairs' over a huge data set, it will consume a lot of
memory due to these tracked points, even if the user doesn't use
transactions.

To mitigate this, let's send all read-only transactions to a read view
immediately so that tracking is disabled completely during iteration.
Note that with this patch, select() called outside a transaction doesn't
populate the cache any more, but this seems to be OK as caching large
select() requests results in cache thrashing.

Closes #2534
  • Loading branch information
locker committed Dec 19, 2017
1 parent 3c2a6d3 commit 24ff747
Show file tree
Hide file tree
Showing 14 changed files with 270 additions and 151 deletions.
123 changes: 74 additions & 49 deletions src/box/vinyl.c
Expand Up @@ -232,21 +232,25 @@ struct vinyl_iterator {
struct vy_env *env;
/** Vinyl index this iterator is for. */
struct vy_index *index;
/** Active transaction or NULL if autocommit. */
struct vy_tx *tx;
/**
* Points either to tx_autocommit for autocommit mode
* or to a multi-statement transaction active when the
* iterator was created.
* Read view to use for the iteration. Points either to
* tx->read_view or to rv_autocommit if autocommit.
*/
struct vy_tx *tx;
const struct vy_read_view **rv;
/** Search key. */
struct tuple *key;
/** Vinyl read iterator. */
struct vy_read_iterator iterator;
/**
* Built-in transaction created when iterator is opened
* in autocommit mode.
* If an iterator is open in autocommit mode, we create
* a read view for it immediately so as not to waste
* memory on tracking reads for conflict resolution.
* The following member points to the read view used by
* the iterator.
*/
struct vy_tx tx_autocommit;
struct vy_read_view *rv_autocommit;
/** Trigger invoked when tx ends to close the iterator. */
struct trigger on_tx_destroy;
};
Expand Down Expand Up @@ -2811,6 +2815,16 @@ vinyl_engine_end_recovery(struct engine *engine)
vy_gc(e, e->recovery, VY_GC_INCOMPLETE, INT64_MAX);
vy_recovery_delete(e->recovery);
e->recovery = NULL;
/*
* During WAL recovery we skip WAL rows that have
* already been dumped to disk so the LSN last seen
* by the transaction manager after WAL recovery is
* complete may be less than the newest LSN actually
* stored on disk. Update the LSN to make sure that
* the iterator sees all data when open in a read
* view.
*/
e->xm->lsn = vclock_sum(e->recovery_vclock);
e->recovery_vclock = NULL;
e->status = VINYL_ONLINE;
vy_quota_set_limit(&e->quota, e->memory);
Expand Down Expand Up @@ -3716,15 +3730,6 @@ vy_squash_schedule(struct vy_index *index, struct tuple *stmt, void *arg)

/* {{{ Cursor */

static void
vinyl_iterator_on_tx_destroy(struct trigger *trigger, void *event)
{
(void)event;
struct vinyl_iterator *it = container_of(trigger,
struct vinyl_iterator, on_tx_destroy);
it->tx = NULL;
}

static int
vinyl_iterator_last(struct iterator *base, struct tuple **ret)
{
Expand All @@ -3741,18 +3746,11 @@ vinyl_iterator_close(struct vinyl_iterator *it)
it->index = NULL;
tuple_unref(it->key);
it->key = NULL;
if (it->tx == &it->tx_autocommit) {
/*
* Rollback the automatic transaction.
* Use vy_tx_destroy() so as not to spoil
* the statistics of rollbacks issued by
* user transactions.
*/
vy_tx_destroy(it->tx);
} else {
trigger_clear(&it->on_tx_destroy);
}
it->tx = NULL;
it->rv = NULL;
trigger_clear(&it->on_tx_destroy);
if (it->rv_autocommit != NULL)
tx_manager_destroy_read_view(it->env->xm, it->rv_autocommit);
it->base.next = vinyl_iterator_last;
}

Expand All @@ -3763,12 +3761,8 @@ vinyl_iterator_next(struct iterator *base, struct tuple **ret)
struct vinyl_iterator *it = (struct vinyl_iterator *)base;
struct tuple *tuple;

if (it->tx == NULL) {
diag_set(ClientError, ER_CURSOR_NO_TRANSACTION);
goto fail;
}
if (it->tx->state == VINYL_TX_ABORT || it->tx->read_view->is_aborted) {
diag_set(ClientError, ER_READ_VIEW_ABORTED);
if ((*it->rv)->is_aborted ||
(it->tx != NULL && it->tx->state == VINYL_TX_ABORT)) {
goto fail;
}

Expand All @@ -3784,17 +3778,17 @@ vinyl_iterator_next(struct iterator *base, struct tuple **ret)

if (it->index->id > 0) {
/* Get the full tuple from the primary index. */
if (vy_index_get(it->index->pk, it->tx,
vy_tx_read_view(it->tx),
if (vy_index_get(it->index->pk, it->tx, it->rv,
tuple, &tuple) != 0)
goto fail;
} else {
tuple_ref(tuple);
}
*ret = tuple_bless(tuple);
tuple_unref(*ret);
tuple_unref(tuple);
if (*ret == NULL)
goto fail;

return 0;
fail:
vinyl_iterator_close(it);
Expand All @@ -3806,11 +3800,30 @@ vinyl_iterator_free(struct iterator *base)
{
assert(base->free == vinyl_iterator_free);
struct vinyl_iterator *it = (struct vinyl_iterator *)base;
if (base->next != vinyl_iterator_last)
if (it->index != NULL)
vinyl_iterator_close(it);
mempool_free(&it->env->iterator_pool, it);
}

/**
 * Iterator 'next' callback that unconditionally fails.
 *
 * Neither argument is used: the function only sets the
 * ER_CURSOR_NO_TRANSACTION client error in the diagnostics
 * area and returns -1.
 */
static int
vinyl_iterator_no_tx(struct iterator *base, struct tuple **ret)
{
(void)base;
(void)ret;
diag_set(ClientError, ER_CURSOR_NO_TRANSACTION);
return -1;
}

/**
 * Trigger callback stored in vinyl_iterator::on_tx_destroy.
 *
 * Recovers the owning iterator from the embedded trigger, closes
 * it, and makes every subsequent 'next' call fail with
 * ER_CURSOR_NO_TRANSACTION. The event argument is ignored.
 */
static void
vinyl_iterator_on_tx_destroy(struct trigger *trigger, void *event)
{
(void)event;
struct vinyl_iterator *it = container_of(trigger,
struct vinyl_iterator, on_tx_destroy);
vinyl_iterator_close(it);
/*
 * Must come after vinyl_iterator_close(), which installs its
 * own 'next' callback; this assignment overrides it.
 */
it->base.next = vinyl_iterator_no_tx;
}

static struct iterator *
vinyl_index_create_iterator(struct index *base, enum iterator_type type,
const char *key, uint32_t part_count)
Expand All @@ -3821,20 +3834,18 @@ vinyl_index_create_iterator(struct index *base, enum iterator_type type,
if (type > ITER_GT) {
diag_set(UnsupportedIndexFeature, base->def,
"requested iterator type");
return NULL;
goto err;
}

struct vinyl_iterator *it = mempool_alloc(&env->iterator_pool);
if (it == NULL) {
diag_set(OutOfMemory, sizeof(struct vinyl_iterator),
"mempool", "struct vinyl_iterator");
return NULL;
goto err;
}
it->key = vy_stmt_new_select(index->env->key_format, key, part_count);
if (it->key == NULL) {
mempool_free(&env->iterator_pool, it);
return NULL;
}
if (it->key == NULL)
goto err_key;

iterator_create(&it->base, base);
it->base.next = vinyl_iterator_next;
Expand All @@ -3844,25 +3855,39 @@ vinyl_index_create_iterator(struct index *base, enum iterator_type type,
it->index = index;
vy_index_ref(index);

it->rv_autocommit = NULL;
trigger_create(&it->on_tx_destroy,
vinyl_iterator_on_tx_destroy, NULL, NULL);

const struct vy_read_view **rv;
struct vy_tx *tx = in_txn() ? in_txn()->engine_tx : NULL;
assert(tx == NULL || tx->state == VINYL_TX_READY);
if (tx != NULL) {
/*
* Register a trigger that will abort this iterator
* when the transaction ends.
*/
trigger_create(&it->on_tx_destroy,
vinyl_iterator_on_tx_destroy, NULL, NULL);
trigger_add(&tx->on_destroy, &it->on_tx_destroy);
rv = (const struct vy_read_view **)&tx->read_view;
} else {
tx = &it->tx_autocommit;
vy_tx_create(env->xm, tx);
it->rv_autocommit = tx_manager_read_view(env->xm);
if (it->rv_autocommit == NULL)
goto err_rv;
rv = (const struct vy_read_view **)&it->rv_autocommit;
}
it->tx = tx;
it->rv = rv;

vy_read_iterator_open(&it->iterator, index, tx, type, it->key,
(const struct vy_read_view **)&tx->read_view);
vy_read_iterator_open(&it->iterator, index, tx, type, it->key, rv);
return (struct iterator *)it;

err_rv:
vy_index_unref(index);
tuple_unref(it->key);
err_key:
mempool_free(&env->iterator_pool, it);
err:
return NULL;
}

static int
Expand Down
12 changes: 6 additions & 6 deletions src/box/vy_tx.c
Expand Up @@ -133,8 +133,7 @@ tx_manager_delete(struct tx_manager *xm)
free(xm);
}

/** Create or reuse an instance of a read view. */
static struct vy_read_view *
struct vy_read_view *
tx_manager_read_view(struct tx_manager *xm)
{
struct vy_read_view *rv;
Expand Down Expand Up @@ -177,8 +176,7 @@ tx_manager_read_view(struct tx_manager *xm)
return rv;
}

/** Dereference and possibly destroy a read view. */
static void
void
tx_manager_destroy_read_view(struct tx_manager *xm,
const struct vy_read_view *read_view)
{
Expand Down Expand Up @@ -277,7 +275,8 @@ vy_tx_read_set_free_cb(vy_tx_read_set_t *read_set,
return NULL;
}

void
/** Initialize a tx object. */
static void
vy_tx_create(struct tx_manager *xm, struct vy_tx *tx)
{
stailq_create(&tx->log);
Expand All @@ -292,7 +291,8 @@ vy_tx_create(struct tx_manager *xm, struct vy_tx *tx)
rlist_create(&tx->on_destroy);
}

void
/** Destroy a tx object. */
static void
vy_tx_destroy(struct vy_tx *tx)
{
trigger_run(&tx->on_destroy, NULL);
Expand Down
17 changes: 9 additions & 8 deletions src/box/vy_tx.h
Expand Up @@ -257,6 +257,15 @@ tx_manager_new(void);
void
tx_manager_delete(struct tx_manager *xm);

/** Create or reuse an instance of a read view. */
struct vy_read_view *
tx_manager_read_view(struct tx_manager *xm);

/** Dereference and possibly destroy a read view. */
void
tx_manager_destroy_read_view(struct tx_manager *xm,
const struct vy_read_view *read_view);

/*
* Determine the lowest possible vlsn, i.e. the level below
* which the history could be compacted.
Expand All @@ -269,14 +278,6 @@ tx_manager_delete(struct tx_manager *xm);
int64_t
tx_manager_vlsn(struct tx_manager *xm);

/** Initialize a tx object. */
void
vy_tx_create(struct tx_manager *xm, struct vy_tx *tx);

/** Destroy a tx object. */
void
vy_tx_destroy(struct vy_tx *tx);

/** Begin a new transaction. */
struct vy_tx *
vy_tx_begin(struct tx_manager *xm);
Expand Down
16 changes: 10 additions & 6 deletions test/engine/iterator.result
Expand Up @@ -4016,17 +4016,21 @@ r
- [3, 2]
- [3, 3]
...
itr1,itr2,itr3 = s:pairs({2}, {iterator = 'REQ'})
-- In memtx an iterator opened in autocommit mode works in
-- the read-committed isolation level while in vinyl it
-- creates a read view. To make sure the result is the same
-- for both engines, start a transaction.
inspector:cmd("setopt delimiter ';'")
---
- true
...
box.begin()
itr1,itr2,itr3 = s:pairs({2}, {iterator = 'REQ'})
s:replace{2, 4}
---
- [2, 4]
...
r = {}
---
...
for k,v in itr1,itr2,itr3 do table.insert(r, v) end
box.commit()
inspector:cmd("setopt delimiter ''");
---
...
r
Expand Down
8 changes: 8 additions & 0 deletions test/engine/iterator.test.lua
Expand Up @@ -337,10 +337,18 @@ r = {}
for k,v in itr1,itr2,itr3 do table.insert(r, v) end
r

-- In memtx an iterator opened in autocommit mode works in
-- the read-committed isolation level while in vinyl it
-- creates a read view. To make sure the result is the same
-- for both engines, start a transaction.
inspector:cmd("setopt delimiter ';'")
box.begin()
itr1,itr2,itr3 = s:pairs({2}, {iterator = 'REQ'})
s:replace{2, 4}
r = {}
for k,v in itr1,itr2,itr3 do table.insert(r, v) end
box.commit()
inspector:cmd("setopt delimiter ''");
r

r = nil
Expand Down

0 comments on commit 24ff747

Please sign in to comment.