Skip to content

Commit

Permalink
vinyl: pass read view to vy_index_get explicitly
Browse files Browse the repository at this point in the history
Currently, if tx is not NULL, tx->read_view is used, otherwise the
global read view is used. For the sake of #2534 (read view for read only
autocommit statements), we will need to pass an arbitrary read view to
this function. So let's add the corresponding argument. This also allows
us to drop env from the argument list.

While we are at it, let's also inline vy_index_full_by_stmt() as it is a
trivial wrapper around vy_index_get().

Needed for #2534
  • Loading branch information
locker committed Dec 14, 2017
1 parent ee24730 commit e0de0e6
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 59 deletions.
90 changes: 31 additions & 59 deletions src/box/vinyl.c
Expand Up @@ -1227,9 +1227,9 @@ vy_is_committed(struct vy_env *env, struct space *space)

/**
* Get a vinyl tuple from the index by the key.
* @param env Vinyl environment.
* @param tx Current transaction.
* @param index Index in which search.
* @param tx Current transaction.
* @param rv Read view.
* @param key Key statement.
* @param[out] result The found tuple is stored here. Must be
* unreferenced after usage.
Expand All @@ -1238,7 +1238,8 @@ vy_is_committed(struct vy_env *env, struct space *space)
* @param -1 Memory error or read error.
*/
static inline int
vy_index_get(struct vy_env *env, struct vy_tx *tx, struct vy_index *index,
vy_index_get(struct vy_index *index, struct vy_tx *tx,
const struct vy_read_view **rv,
struct tuple *key, struct tuple **result)
{
/*
Expand All @@ -1247,15 +1248,8 @@ vy_index_get(struct vy_env *env, struct vy_tx *tx, struct vy_index *index,
*/
assert(tx == NULL || tx->state == VINYL_TX_READY);

const struct vy_read_view **p_read_view;
if (tx != NULL) {
p_read_view = (const struct vy_read_view **) &tx->read_view;
} else {
p_read_view = &env->xm->p_global_read_view;
}

struct vy_read_iterator itr;
vy_read_iterator_open(&itr, index, tx, ITER_EQ, key, p_read_view);
vy_read_iterator_open(&itr, index, tx, ITER_EQ, key, rv);
int rc = vy_read_iterator_next(&itr, result);
if (*result != NULL)
tuple_ref(*result);
Expand Down Expand Up @@ -1286,7 +1280,7 @@ vy_check_dup_key(struct vy_env *env, struct vy_tx *tx, struct space *space,
*/
if (env->status != VINYL_ONLINE)
return 0;
if (vy_index_get(env, tx, index, key, &found))
if (vy_index_get(index, tx, vy_tx_read_view(tx), key, &found))
return -1;

if (found) {
Expand Down Expand Up @@ -1384,6 +1378,7 @@ static inline int
vy_replace_one(struct vy_env *env, struct vy_tx *tx, struct space *space,
struct request *request, struct txn_stmt *stmt)
{
(void)env;
assert(tx != NULL && tx->state == VINYL_TX_READY);
struct vy_index *pk = vy_index(space->index[0]);
assert(pk->id == 0);
Expand All @@ -1399,7 +1394,8 @@ vy_replace_one(struct vy_env *env, struct vy_tx *tx, struct space *space,
* old tuple to pass it to the trigger.
*/
if (stmt != NULL && !rlist_empty(&space->on_replace)) {
if (vy_index_get(env, tx, pk, new_tuple, &stmt->old_tuple) != 0)
if (vy_index_get(pk, tx, vy_tx_read_view(tx),
new_tuple, &stmt->old_tuple) != 0)
goto error_unref;
}
if (vy_tx_set(tx, pk, new_tuple))
Expand Down Expand Up @@ -1454,7 +1450,8 @@ vy_replace_impl(struct vy_env *env, struct vy_tx *tx, struct space *space,
return -1;

/* Get full tuple from the primary index. */
if (vy_index_get(env, tx, pk, new_stmt, &old_stmt) != 0)
if (vy_index_get(pk, tx, vy_tx_read_view(tx),
new_stmt, &old_stmt) != 0)
goto error;

if (old_stmt == NULL) {
Expand Down Expand Up @@ -1552,41 +1549,13 @@ vy_unique_key_validate(struct vy_index *index, const char *key,
return key_validate_parts(index->cmp_def, key, part_count, false);
}

/**
* Get a tuple from the primary index by the partial tuple from
* the secondary index.
* @param env Vinyl environment.
* @param tx Current transaction.
* @param index Secondary index.
* @param partial Partial tuple from the secondary \p index.
* @param[out] full The full tuple is stored here. Must be
* unreferenced after usage.
*
* @retval 0 Success.
* @retval -1 Memory error.
*/
static inline int
vy_index_full_by_stmt(struct vy_env *env, struct vy_tx *tx,
struct vy_index *index,
struct tuple *partial, struct tuple **full)
{
assert(index->id > 0);
/*
* Fetch the primary key from the secondary index tuple.
*/
struct vy_index *pk = index->pk;
assert(pk != NULL);
/* Fetch the tuple from the primary index. */
return vy_index_get(env, tx, pk, partial, full);
}

/**
* Find a tuple in the primary index by the key of the specified
* index.
* @param env Vinyl environment.
* @param tx Current transaction.
* @param index Index for which the key is specified. Can be
* both primary and secondary.
* @param tx Current transaction.
* @param rv Read view.
* @param key_raw MessagePack'ed data, the array without a
* header.
* @param part_count Count of parts in the key.
Expand All @@ -1597,25 +1566,26 @@ vy_index_full_by_stmt(struct vy_env *env, struct vy_tx *tx,
* @retval -1 Memory error.
*/
static inline int
vy_index_full_by_key(struct vy_env *env, struct vy_tx *tx,
struct vy_index *index, const char *key_raw,
uint32_t part_count, struct tuple **result)
vy_index_full_by_key(struct vy_index *index, struct vy_tx *tx,
const struct vy_read_view **rv,
const char *key_raw, uint32_t part_count,
struct tuple **result)
{
int rc;
struct tuple *key = vy_stmt_new_select(index->env->key_format,
key_raw, part_count);
if (key == NULL)
return -1;
struct tuple *found;
rc = vy_index_get(env, tx, index, key, &found);
rc = vy_index_get(index, tx, rv, key, &found);
tuple_unref(key);
if (rc != 0)
return -1;
if (index->id == 0 || found == NULL) {
*result = found;
return 0;
}
rc = vy_index_full_by_stmt(env, tx, index, found, result);
rc = vy_index_get(index->pk, tx, rv, found, result);
tuple_unref(found);
return rc;
}
Expand Down Expand Up @@ -1703,8 +1673,8 @@ vy_delete(struct vy_env *env, struct vy_tx *tx, struct txn_stmt *stmt,
* and pass them to indexes for deletion.
*/
if (has_secondary || !rlist_empty(&space->on_replace)) {
if (vy_index_full_by_key(env, tx, index, key, part_count,
&stmt->old_tuple))
if (vy_index_full_by_key(index, tx, vy_tx_read_view(tx),
key, part_count, &stmt->old_tuple) != 0)
return -1;
if (stmt->old_tuple == NULL)
return 0;
Expand Down Expand Up @@ -1786,8 +1756,8 @@ vy_update(struct vy_env *env, struct vy_tx *tx, struct txn_stmt *stmt,
if (vy_unique_key_validate(index, key, part_count))
return -1;

if (vy_index_full_by_key(env, tx, index, key, part_count,
&stmt->old_tuple))
if (vy_index_full_by_key(index, tx, vy_tx_read_view(tx),
key, part_count, &stmt->old_tuple) != 0)
return -1;
/* Nothing to update. */
if (stmt->old_tuple == NULL)
Expand Down Expand Up @@ -2061,7 +2031,8 @@ vy_upsert(struct vy_env *env, struct vy_tx *tx, struct txn_stmt *stmt,
pk->key_def, pk->env->key_format);
if (key == NULL)
return -1;
int rc = vy_index_get(env, tx, pk, key, &stmt->old_tuple);
int rc = vy_index_get(pk, tx, vy_tx_read_view(tx),
key, &stmt->old_tuple);
tuple_unref(key);
if (rc != 0)
return -1;
Expand Down Expand Up @@ -3813,8 +3784,9 @@ vinyl_iterator_next(struct iterator *base, struct tuple **ret)

if (it->index->id > 0) {
/* Get the full tuple from the primary index. */
if (vy_index_full_by_stmt(it->env, it->tx, it->index,
tuple, &tuple) != 0)
if (vy_index_get(it->index->pk, it->tx,
vy_tx_read_view(it->tx),
tuple, &tuple) != 0)
goto fail;
} else {
tuple_ref(tuple);
Expand Down Expand Up @@ -3903,11 +3875,11 @@ vinyl_index_get(struct index *base, const char *key,
struct vy_index *index = vy_index(base);
struct vy_env *env = vy_env(base->engine);
struct vy_tx *tx = in_txn() ? in_txn()->engine_tx : NULL;
assert(tx == NULL || tx->state == VINYL_TX_READY);
const struct vy_read_view **rv = (tx != NULL ? vy_tx_read_view(tx) :
&env->xm->p_global_read_view);

struct tuple *tuple;
if (vy_index_full_by_key(env, tx, index,
key, part_count, &tuple) != 0)
if (vy_index_full_by_key(index, tx, rv, key, part_count, &tuple) != 0)
return -1;

if (tuple != NULL) {
Expand Down
6 changes: 6 additions & 0 deletions src/box/vy_tx.h
Expand Up @@ -182,6 +182,12 @@ struct vy_tx {
struct rlist on_destroy;
};

static inline const struct vy_read_view **
vy_tx_read_view(struct vy_tx *tx)
{
return (const struct vy_read_view **)&tx->read_view;
}

/** Transaction manager object. */
struct tx_manager {
/**
Expand Down

0 comments on commit e0de0e6

Please sign in to comment.