Skip to content

Commit

Permalink
txm: postpone read trackers creation to the on_yield
Browse files Browse the repository at this point in the history
Read trackers must be stored in the story of a corresponding tuple
but are used only if the reader is interrupted by another txn.
So, if the reading txn doesn't yield, there is no need for a read tracker.
This patch postpones tracker creation, which may require extra
bookkeeping, to the on_yield trigger.

Needed for #6209
  • Loading branch information
Egor Elchinov committed Sep 17, 2021
1 parent 1619d80 commit 14f6e1f
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 28 deletions.
117 changes: 107 additions & 10 deletions src/box/memtx_tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,18 @@ struct full_scan_item {
struct txn *txn;
};

/**
 * A structure storing arguments for creation of the `tx_read_tracker`.
 * Fields are the arguments for delayed @a memtx_tx_track_read_slow call.
 * The transaction itself is not stored here: it is passed in when the
 * pending proxies are flushed.
 * @sa tx_read_tracker
 * @sa memtx_tx_track_read_slow
 */
struct read_tracker_proxy {
/** Link in the tx manager's list of pending proxies. */
struct rlist in_read_tracker_proxies;
/** Space argument for the delayed memtx_tx_track_read_slow call. */
struct space *space;
/** Tuple argument for the delayed memtx_tx_track_read_slow call. */
struct tuple *tuple;
};

/**
* Helper structure for searching for point_hole_item in the hash table,
* @sa point_hole_item_pool.
Expand Down Expand Up @@ -213,6 +225,8 @@ struct tx_manager
struct rlist all_txs;
/** Accumulated number of GC steps that should be done. */
size_t must_do_gc_steps;
/** List of read_tracker_proxy items used to delay tracker creation. */
struct rlist read_tracker_proxy_list;
};

enum {
Expand Down Expand Up @@ -257,6 +271,7 @@ memtx_tx_manager_init()
rlist_create(&txm.all_txs);
txm.traverse_all_stories = &txm.all_stories;
txm.must_do_gc_steps = 0;
rlist_create(&txm.read_tracker_proxy_list);
}

void
Expand Down Expand Up @@ -923,6 +938,43 @@ memtx_tx_story_gc()
txm.must_do_gc_steps = 0;
}

/*
 * Forward declaration: the function is defined further down in this file,
 * but memtx_tx_flush_proxies() needs to call it before that point.
 */
static int
memtx_tx_track_read_slow(struct txn *txn, struct space *space,
struct tuple *tuple);

/**
* Create trackers from their proxies for @a txn
*/
static int
memtx_tx_flush_proxies(struct txn *txn)
{
struct read_tracker_proxy *read_proxy, *tmp_read_proxy;
rlist_foreach_entry_safe(read_proxy, &txm.read_tracker_proxy_list,
in_read_tracker_proxies, tmp_read_proxy) {
if (memtx_tx_track_read_slow(txn, read_proxy->space,
read_proxy->tuple) != 0)
return -1;
rlist_del(&read_proxy->in_read_tracker_proxies);
}

return 0;
}

/**
 * Procedure to be called on txn yield to fulfill all the delayed operations.
 *
 * Does nothing when the MVCC engine is disabled. Otherwise creates the read
 * trackers that were postponed as proxies.
 *
 * @param txn the yielding transaction, must not be NULL when MVCC is on.
 * @retval 0 success (or MVCC engine is off).
 * @retval -1 flushing of the postponed read trackers failed.
 */
int
memtx_tx_on_yield(struct txn *txn)
{
	/* Nothing is postponed unless the MVCC engine is in use. */
	if (!memtx_tx_manager_use_mvcc_engine)
		return 0;

	assert(txn != NULL);
	int rc = memtx_tx_flush_proxies(txn);
	return rc != 0 ? -1 : 0;
}

/**
* Check if a @a story is visible for transaction @a txn. Return visible tuple
* to @a visible_tuple (can be set to NULL).
Expand Down Expand Up @@ -2015,6 +2067,12 @@ memtx_tx_on_index_delete(struct index *index)
void
memtx_tx_on_space_delete(struct space *space)
{
struct read_tracker_proxy *read_proxy, *tmp_read_proxy;
rlist_foreach_entry_safe(read_proxy, &txm.read_tracker_proxy_list,
in_read_tracker_proxies, tmp_read_proxy) {
if (read_proxy->space == space)
rlist_del(&read_proxy->in_read_tracker_proxies);
}
while (!rlist_empty(&space->memtx_stories)) {
struct memtx_story *story
= rlist_first_entry(&space->memtx_stories,
Expand Down Expand Up @@ -2113,6 +2171,46 @@ memtx_tx_track_read_story(struct txn *txn, struct space *space,
return memtx_tx_track_read_story_slow(txn, story, index_mask);
}

/**
 * Create a read tracker right away: record that @a txn has read @a tuple
 * of @a space.
 *
 * A dirty tuple's story is looked up with memtx_tx_story_get(); for a
 * tuple that is not dirty a new story is created first.
 *
 * @retval 0 success.
 * @retval -1 story creation or read tracking failed.
 */
static int
memtx_tx_track_read_slow(struct txn *txn, struct space *space,
			 struct tuple *tuple)
{
	struct memtx_story *read_story;
	if (!tuple->is_dirty) {
		read_story = memtx_tx_story_new(space, tuple);
		if (read_story == NULL)
			return -1;
	} else {
		read_story = memtx_tx_story_get(tuple);
	}

	/*
	 * TODO: add a flag for the case where a simple tracker allocation
	 * is necessary.
	 */
	return memtx_tx_track_read_story(txn, space, read_story, UINT64_MAX);
}

/**
 * Allocate new read_tracker_proxy item for @a txn on its region and link it
 * into the tx manager's list of pending proxies.
 *
 * The proxy only remembers @a space and @a tuple; the tuple is not
 * referenced here.
 *
 * @param txn transaction whose region provides the memory.
 * @param space space the tuple was read from.
 * @param tuple tuple that was read.
 * @retval 0 success.
 * @retval -1 memory allocation failed, diag is set.
 */
static int
read_tracker_proxy_new(struct txn *txn, struct space *space,
		       struct tuple *tuple)
{
	/*
	 * Plain region_alloc() gives no alignment guarantee, while the
	 * proxy holds pointers and a list link. Allocate it as an object
	 * so that the memory is aligned for the structure.
	 */
	int size;
	struct read_tracker_proxy *proxy =
		region_alloc_object(&txn->region, typeof(*proxy), &size);
	if (proxy == NULL) {
		diag_set(OutOfMemory, size, "tx region",
			 "read_tracker_proxy");
		return -1;
	}

	proxy->space = space;
	proxy->tuple = tuple;
	rlist_add(&txm.read_tracker_proxy_list,
		  &proxy->in_read_tracker_proxies);

	return 0;
}

int
memtx_tx_track_read(struct txn *txn, struct space *space, struct tuple *tuple)
{
Expand All @@ -2125,17 +2223,14 @@ memtx_tx_track_read(struct txn *txn, struct space *space, struct tuple *tuple)
if (space->def->opts.is_ephemeral)
return 0;

struct memtx_story *story = NULL;
if (tuple->is_dirty) {
story = memtx_tx_story_get(tuple);
} else {
story = memtx_tx_story_new(space, tuple);
if (story == NULL)
return -1;
}
/*
* Workaround for the case of detached txn
* where the on_yield trigger won't be useful.
*/
if (in_txn() != txn)
return memtx_tx_track_read(txn, space, tuple);

// TODO: to add flag for the case where simple tracker allcation is necessary
return memtx_tx_track_read_story(txn, space, story, UINT64_MAX);
return read_tracker_proxy_new(txn, space, tuple);
}

/**
Expand Down Expand Up @@ -2417,6 +2512,8 @@ memtx_tx_clean_txn(struct txn *txn)
in_full_scan_list);
memtx_tx_full_scan_item_delete(item);
}
/* Refresh tracker list for the next txn. */
rlist_del(&txm.read_tracker_proxy_list);
}

static uint32_t
Expand Down
7 changes: 7 additions & 0 deletions src/box/memtx_tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,12 @@ memtx_tx_manager_init();
void
memtx_tx_manager_free();

/**
 * Fulfill delayed operations at the moment of yield (used by the on_yield
 * trigger).
 * @param txn the yielding transaction.
 * @retval 0 success.
 * @retval -1 a delayed operation failed.
 */
int
memtx_tx_on_yield(struct txn *txn);

/**
* Transaction providing DDL changes is disallowed to yield after
* modifications of internal caches (i.e. after ALTER operation finishes).
Expand Down Expand Up @@ -290,6 +296,7 @@ memtx_tx_tuple_clarify_slow(struct txn *txn, struct space *space,

/**
* Record in TX manager that a transaction @txn has read a @tuple in @space.
* If @a txn yields, then @a tuple must be referenced until that moment.
* @return 0 on success, -1 on memory error.
*/
int
Expand Down
36 changes: 18 additions & 18 deletions src/box/txn.c
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,8 @@ txn_begin(void)
memtx_tx_register_tx(txn);
txn->fiber = NULL;
fiber_set_txn(fiber(), txn);
/* fiber_on_yield is initialized by engine on demand */
trigger_create(&txn->fiber_on_yield, txn_on_yield, NULL, NULL);
trigger_add(&fiber()->on_yield, &txn->fiber_on_yield);
trigger_create(&txn->fiber_on_stop, txn_on_stop, NULL, NULL);
trigger_add(&fiber()->on_stop, &txn->fiber_on_stop);
/*
Expand Down Expand Up @@ -757,8 +758,7 @@ txn_prepare(struct txn *txn)
assert(rlist_empty(&txn->conflicted_by_list));

trigger_clear(&txn->fiber_on_stop);
if (!txn_has_flag(txn, TXN_CAN_YIELD))
trigger_clear(&txn->fiber_on_yield);
trigger_clear(&txn->fiber_on_yield);

txn->start_tm = ev_monotonic_now(loop());
txn->status = TXN_PREPARED;
Expand Down Expand Up @@ -986,8 +986,7 @@ txn_rollback(struct txn *txn)
assert(txn->signature != TXN_SIGNATURE_UNKNOWN);
txn->status = TXN_ABORTED;
trigger_clear(&txn->fiber_on_stop);
if (!txn_has_flag(txn, TXN_CAN_YIELD))
trigger_clear(&txn->fiber_on_yield);
trigger_clear(&txn->fiber_on_yield);
txn_complete_fail(txn);
fiber_set_txn(fiber(), NULL);
}
Expand Down Expand Up @@ -1016,14 +1015,11 @@ txn_can_yield(struct txn *txn, bool set)
{
assert(txn == in_txn());
bool could = txn_has_flag(txn, TXN_CAN_YIELD);
if (set && !could) {
if (set)
txn_set_flags(txn, TXN_CAN_YIELD);
trigger_clear(&txn->fiber_on_yield);
} else if (!set && could) {
else
txn_clear_flags(txn, TXN_CAN_YIELD);
trigger_create(&txn->fiber_on_yield, txn_on_yield, NULL, NULL);
trigger_add(&fiber()->on_yield, &txn->fiber_on_yield);
}

return could;
}

Expand Down Expand Up @@ -1255,9 +1251,13 @@ txn_on_yield(struct trigger *trigger, void *event)
(void) event;
struct txn *txn = in_txn();
assert(txn != NULL);
assert(!txn_has_flag(txn, TXN_CAN_YIELD));
txn_rollback_to_svp(txn, NULL);
txn_set_flags(txn, TXN_IS_ABORTED_BY_YIELD);
if (!txn_has_flag(txn, TXN_CAN_YIELD)) {
txn_rollback_to_svp(txn, NULL);
txn_set_flags(txn, TXN_IS_ABORTED_BY_YIELD);
}
if (memtx_tx_on_yield(txn) != 0)
return -1;

return 0;
}

Expand All @@ -1267,10 +1267,8 @@ txn_detach(void)
struct txn *txn = in_txn();
if (txn == NULL)
return NULL;
if (!txn_has_flag(txn, TXN_CAN_YIELD)) {
txn_on_yield(NULL, NULL);
trigger_clear(&txn->fiber_on_yield);
}
txn_on_yield(NULL, NULL);
trigger_clear(&txn->fiber_on_yield);
trigger_clear(&txn->fiber_on_stop);
fiber_set_txn(fiber(), NULL);
return txn;
Expand All @@ -1282,4 +1280,6 @@ txn_attach(struct txn *txn)
assert(txn != NULL);
assert(!in_txn());
fiber_set_txn(fiber(), txn);
trigger_add(&fiber()->on_yield, &txn->fiber_on_yield);
trigger_add(&fiber()->on_stop, &txn->fiber_on_stop);
}

0 comments on commit 14f6e1f

Please sign in to comment.