txm: postpone tracker creation to the on_yield trigger
Read and gap trackers must be stored in the story of the corresponding
tuple, but they are used only if the reader is interrupted by another
txn. So if the reading txn doesn't yield, there is no need for a read
tracker. This patch postpones tracker creation, which may require
extra bookkeeping, to the on_yield trigger.

Needed for #6209
Egor Elchinov committed Sep 27, 2021
1 parent 390e016 commit b8bf592
Showing 3 changed files with 254 additions and 41 deletions.
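The idea in miniature: a read is recorded cheaply on a pending list, and the heavyweight tracker is built only if the transaction actually yields. Below is a minimal, self-contained sketch of that pattern in plain C, not Tarantool code: it uses malloc and a singly linked list instead of the txn region and rlist, and the names pending_read, remember_read and flush_pending are invented for illustration.

/*
 * A simplified model of the patch: remember read arguments cheaply,
 * create the real trackers only when the transaction yields.
 */
#include <stdio.h>
#include <stdlib.h>

/* A cheap record of a read; the real tracker is built lazily. */
struct pending_read {
	int space_id;
	int tuple_id;
	struct pending_read *next;
};

static struct pending_read *pending_list;

/* Fast path: store the arguments, no heavy tracker allocation yet. */
static int
remember_read(int space_id, int tuple_id)
{
	struct pending_read *p = malloc(sizeof(*p));
	if (p == NULL)
		return -1;
	p->space_id = space_id;
	p->tuple_id = tuple_id;
	p->next = pending_list;
	pending_list = p;
	return 0;
}

/* Slow path: run only when the transaction actually yields. */
static void
flush_pending(void)
{
	while (pending_list != NULL) {
		struct pending_read *p = pending_list;
		pending_list = p->next;
		/* This is where a real read tracker would be created. */
		printf("create tracker: space %d, tuple %d\n",
		       p->space_id, p->tuple_id);
		free(p);
	}
}

int
main(void)
{
	remember_read(512, 1);
	remember_read(512, 2);
	/*
	 * If the txn never yields, flush_pending() never runs and no
	 * trackers are built; on yield the postponed work is done.
	 */
	flush_pending();
	return 0;
}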
241 changes: 223 additions & 18 deletions src/box/memtx_tx.c
@@ -119,6 +119,36 @@ struct full_scan_item {
struct txn *txn;
};

/**
* A structure storing arguments for creation of the `tx_read_tracker`.
* Fields are the arguments for a delayed @a memtx_tx_track_read_slow call.
* @sa tx_read_tracker
* @sa memtx_tx_track_read_slow
*/
struct read_tracker_proxy {
struct rlist in_read_tracker_proxies;
struct space *space;
struct tuple *tuple;
};

/**
* A structure storing arguments for creation of the `gap_item`.
* Fields are the arguments for a delayed @a memtx_tx_track_gap_slow call.
* Only short (<= 16 bytes) keys are supported.
* @sa gap_item
* @sa memtx_tx_track_gap_slow
*/
struct gap_item_proxy {
struct rlist in_gap_item_proxies;
struct space *space;
struct index *index;
struct tuple *successor;
uint32_t part_count;
enum iterator_type type;
uint8_t key_len;
char key[16];
};

/**
* Helper structure for searching for point_hole_item in the hash table,
* @sa point_hole_item_pool.
@@ -213,6 +243,9 @@ struct tx_manager
struct rlist all_txs;
/** Accumulated number of GC steps that should be done. */
size_t must_do_gc_steps;
/** Lists of proxy items used to delay tracker creation. */
struct rlist read_tracker_proxy_list;
struct rlist gap_item_proxy_list;
};

enum {
@@ -257,6 +290,8 @@ memtx_tx_manager_init()
rlist_create(&txm.all_txs);
txm.traverse_all_stories = &txm.all_stories;
txm.must_do_gc_steps = 0;
rlist_create(&txm.read_tracker_proxy_list);
rlist_create(&txm.gap_item_proxy_list);
}

void
@@ -923,6 +958,60 @@ memtx_tx_story_gc()
txm.must_do_gc_steps = 0;
}

static int
memtx_tx_track_read_slow(struct txn *txn, struct space *space,
struct tuple *tuple);

static int
memtx_tx_track_gap_slow(struct txn *txn, struct space *space, struct index *index,
struct tuple *successor, enum iterator_type type,
uint32_t key_len, const char *key, uint32_t part_count);

/**
* Create trackers from their proxies for @a txn.
*/
static int
memtx_tx_flush_proxies(struct txn *txn)
{
struct read_tracker_proxy *read_proxy, *tmp_read_proxy;
rlist_foreach_entry_safe(read_proxy, &txm.read_tracker_proxy_list,
in_read_tracker_proxies, tmp_read_proxy) {
if (memtx_tx_track_read_slow(txn, read_proxy->space,
read_proxy->tuple) != 0)
return -1;
rlist_del(&read_proxy->in_read_tracker_proxies);
}
struct gap_item_proxy *gap_proxy, *tmp_gap_proxy;
rlist_foreach_entry_safe(gap_proxy, &txm.gap_item_proxy_list,
in_gap_item_proxies, tmp_gap_proxy) {
if (memtx_tx_track_gap_slow(txn, gap_proxy->space,
gap_proxy->index,
gap_proxy->successor,
gap_proxy->type, gap_proxy->key_len,
gap_proxy->key,
gap_proxy->part_count) != 0)
return -1;
rlist_del(&gap_proxy->in_gap_item_proxies);
}

return 0;
}

/**
* Procedure to be called on txn yield to fulfill all delayed operations.
*/
int
memtx_tx_on_yield(struct txn *txn)
{
if (memtx_tx_manager_use_mvcc_engine) {
assert(txn != NULL);
if (memtx_tx_flush_proxies(txn) != 0)
return -1;
}

return 0;
}

/**
* Check if a @a story is visible for transaction @a txn. Return visible tuple
* to @a visible_tuple (can be set to NULL).
@@ -1308,7 +1397,7 @@ check_dup_common(struct txn_stmt *stmt, struct tuple *new_tuple,

static struct gap_item *
memtx_tx_gap_item_new(struct txn *txn, enum iterator_type type,
const char *key, uint32_t part_count);
uint32_t key_len, const char *key, uint32_t part_count);

static int
memtx_tx_track_read_story(struct txn *txn, struct space *space,
@@ -2010,6 +2099,12 @@ memtx_tx_on_index_delete(struct index *index)
in_full_scans);
memtx_tx_full_scan_item_delete(item);
}
struct gap_item_proxy *gap_proxy, *tmp_gap_proxy;
rlist_foreach_entry_safe(gap_proxy, &txm.gap_item_proxy_list,
in_gap_item_proxies, tmp_gap_proxy) {
if (gap_proxy->index == index)
rlist_del(&gap_proxy->in_gap_item_proxies);
}
}

void
@@ -2036,6 +2131,18 @@ memtx_tx_on_space_delete(struct space *space)
}
memtx_tx_story_delete(story);
}
struct read_tracker_proxy *read_proxy, *tmp_read_proxy;
rlist_foreach_entry_safe(read_proxy, &txm.read_tracker_proxy_list,
in_read_tracker_proxies, tmp_read_proxy) {
if (read_proxy->space == space)
rlist_del(&read_proxy->in_read_tracker_proxies);
}
struct gap_item_proxy *gap_proxy, *tmp_gap_proxy;
rlist_foreach_entry_safe(gap_proxy, &txm.gap_item_proxy_list,
in_gap_item_proxies, tmp_gap_proxy) {
if (gap_proxy->space == space)
rlist_del(&gap_proxy->in_gap_item_proxies);
}
}

/**
@@ -2113,6 +2220,46 @@ memtx_tx_track_read_story(struct txn *txn, struct space *space,
return memtx_tx_track_read_story_slow(txn, story, index_mask);
}

static int
memtx_tx_track_read_slow(struct txn *txn, struct space *space,
struct tuple *tuple)
{
struct memtx_story *story = NULL;
if (tuple->is_dirty) {
story = memtx_tx_story_get(tuple);
} else {
story = memtx_tx_story_new(space, tuple);
if (story == NULL)
return -1;
}

/* TODO: add a flag for the case where simple tracker allocation is necessary. */
return memtx_tx_track_read_story(txn, space, story, UINT64_MAX);
}

/**
* Allocate new read_tracker_proxy item for @a txn on its region.
*/
static int
read_tracker_proxy_new(struct txn *txn, struct space *space,
struct tuple *tuple)
{
struct read_tracker_proxy *proxy = NULL;
proxy = region_alloc(&txn->region, sizeof(*proxy));
if (proxy == NULL) {
diag_set(OutOfMemory, sizeof(*proxy), "tx region",
"read_tracker_proxy");
return -1;
}

proxy->space = space;
proxy->tuple = tuple;
rlist_add(&txm.read_tracker_proxy_list,
&proxy->in_read_tracker_proxies);

return 0;
}

int
memtx_tx_track_read(struct txn *txn, struct space *space, struct tuple *tuple)
{
@@ -2125,17 +2272,14 @@ memtx_tx_track_read(struct txn *txn, struct space *space, struct tuple *tuple)
if (space->def->opts.is_ephemeral)
return 0;

struct memtx_story *story = NULL;
if (tuple->is_dirty) {
story = memtx_tx_story_get(tuple);
} else {
story = memtx_tx_story_new(space, tuple);
if (story == NULL)
return -1;
}
/*
* Workaround for the case of a detached txn, where the on_yield
* trigger won't be useful.
*/
if (in_txn() != txn)
return memtx_tx_track_read_slow(txn, space, tuple);

/* TODO: add a flag for the case where simple tracker allocation is necessary. */
return memtx_tx_track_read_story(txn, space, story, UINT64_MAX);
return read_tracker_proxy_new(txn, space, tuple);
}

/**
@@ -2276,7 +2420,7 @@ memtx_tx_track_point_slow(struct txn *txn, struct index *index, const char *key)

static struct gap_item *
memtx_tx_gap_item_new(struct txn *txn, enum iterator_type type,
const char *key, uint32_t part_count)
uint32_t key_len, const char *key, uint32_t part_count)
{
struct gap_item *item = (struct gap_item *)
mempool_alloc(&txm.gap_item_mempoool);
@@ -2288,10 +2432,7 @@ memtx_tx_gap_item_new(struct txn *txn, enum iterator_type type,
item->txn = txn;
item->type = type;
item->part_count = part_count;
const char *tmp = key;
for (uint32_t i = 0; i < part_count; i++)
mp_next(&tmp);
item->key_len = tmp - key;
item->key_len = key_len;
if (part_count == 0) {
item->key = NULL;
} else if (item->key_len <= sizeof(item->short_key)) {
@@ -2322,12 +2463,12 @@ memtx_tx_gap_item_new(struct txn *txn, enum iterator_type type,
int
memtx_tx_track_gap_slow(struct txn *txn, struct space *space, struct index *index,
struct tuple *successor, enum iterator_type type,
const char *key, uint32_t part_count)
uint32_t key_len, const char *key, uint32_t part_count)
{
if (txn->status != TXN_INPROGRESS)
return 0;

struct gap_item *item = memtx_tx_gap_item_new(txn, type, key,
struct gap_item *item = memtx_tx_gap_item_new(txn, type, key_len, key,
part_count);
if (item == NULL)
return -1;
@@ -2353,6 +2494,67 @@ memtx_tx_track_gap_slow(struct txn *txn, struct space *space, struct index *index,
return 0;
}

/**
* Allocate new gap_item_proxy item for @a txn on its region.
*/
static int
memtx_tx_gap_item_proxy_new(struct txn *txn, struct space *space,
struct index *index, struct tuple *successor,
enum iterator_type type, uint32_t key_len,
const char *key, uint32_t part_count)
{
assert(key_len <= 16);
struct gap_item_proxy *proxy = NULL;
proxy = region_alloc(&txn->region, sizeof(*proxy));
if (proxy == NULL) {
diag_set(OutOfMemory, sizeof(*proxy), "tx region",
"gap_item_proxy");
return -1;
}

proxy->space = space;
proxy->index = index;
proxy->successor = successor;
proxy->part_count = part_count;
proxy->type = type;
proxy->key_len = (uint8_t) key_len;
memcpy(proxy->key, key, key_len);
rlist_add(&txm.gap_item_proxy_list, &proxy->in_gap_item_proxies);

return 0;
}

/**
* Create either a gap record right away or a proxy that postpones the
* tracker creation, depending on the @a key length and on whether
* @a txn is the current txn.
* @sa memtx_tx_track_gap_slow
*/
int
memtx_tx_track_gap_proxy(struct txn *txn, struct space *space,
struct index *index, struct tuple *successor,
enum iterator_type type, const char *key,
uint32_t part_count)
{
uint32_t key_len = 0;
uint32_t key_max = sizeof(((struct gap_item_proxy *)NULL)->key);
const char *tmp = key;
for (uint32_t i = 0; i < part_count; i++)
mp_next(&tmp);
key_len = tmp - key;

/*
* Workaround for the case of a detached txn, where the on_yield trigger
* won't be useful, and for keys too long to fit into a proxy.
*/
if (in_txn() != txn || key_len > key_max) {
return memtx_tx_track_gap_slow(txn, space, index, successor,
type, key_len, key, part_count);
}

return memtx_tx_gap_item_proxy_new(txn, space, index, successor, type,
key_len, key, part_count);
}

static struct full_scan_item *
memtx_tx_full_scan_item_new(struct txn *txn)
{
Expand Down Expand Up @@ -2417,6 +2619,9 @@ memtx_tx_clean_txn(struct txn *txn)
in_full_scan_list);
memtx_tx_full_scan_item_delete(item);
}
/* Reset the proxy lists; the items themselves live on the txn region. */
rlist_del(&txm.read_tracker_proxy_list);
rlist_del(&txm.gap_item_proxy_list);
}

static uint32_t
18 changes: 13 additions & 5 deletions src/box/memtx_tx.h
@@ -182,6 +182,12 @@ memtx_tx_manager_init();
void
memtx_tx_manager_free();

/**
* Fulfill delayed operations at the yield moment (called by the on_yield trigger).
*/
int
memtx_tx_on_yield(struct txn *txn);

/**
* Transaction providing DDL changes is disallowed to yield after
* modifications of internal caches (i.e. after ALTER operation finishes).
@@ -290,6 +296,7 @@ memtx_tx_tuple_clarify_slow(struct txn *txn, struct space *space,

/**
* Record in TX manager that a transaction @txn has read a @tuple in @space.
* If @a txn yields, then @a tuple must stay referenced until that moment.
* @return 0 on success, -1 on memory error.
*/
int
@@ -326,9 +333,10 @@ memtx_tx_track_point(struct txn *txn, struct space *space,
* Helper of memtx_tx_track_gap.
*/
int
memtx_tx_track_gap_slow(struct txn *txn, struct space *space, struct index *index,
struct tuple *successor, enum iterator_type type,
const char *key, uint32_t part_count);
memtx_tx_track_gap_proxy(struct txn *txn, struct space *space,
struct index *index, struct tuple *successor,
enum iterator_type type, const char *key,
uint32_t part_count);

/**
* Record in TX manager that a transaction @a txn have read nothing
@@ -351,8 +359,8 @@ memtx_tx_track_gap(struct txn *txn, struct space *space, struct index *index,
/* Skip ephemeral spaces. */
if (space == NULL || space->def->id == 0)
return 0;
return memtx_tx_track_gap_slow(txn, space, index, successor,
type, key, part_count);
return memtx_tx_track_gap_proxy(txn, space, index, successor,
type, key, part_count);
}

/**