Skip to content

Commit

Permalink
WT-5612 History store cleanup for non-timestamped tables (#5276)
Browse files Browse the repository at this point in the history
There is no need for any history store values for non-timestamped
tables when there are no old running transactions in parallel as
the newer transactions can see the newer data that is visible in
the data store. So clean the history store for the non-timestamped
tables whenever rollback to stable is called.
  • Loading branch information
kommiharibabu committed Feb 21, 2020
1 parent 9c4d211 commit b803a87
Show file tree
Hide file tree
Showing 5 changed files with 260 additions and 17 deletions.
2 changes: 0 additions & 2 deletions src/include/connection.h
Expand Up @@ -398,8 +398,6 @@ struct __wt_connection_impl {
uint64_t sweep_interval; /* Handle sweep interval */
uint64_t sweep_handles_min; /* Handle sweep minimum open */

/* Set of btree IDs not being rolled back */
uint8_t *stable_rollback_bitstring;
uint32_t stable_rollback_maxfile;

/* Locked: collator list */
Expand Down
124 changes: 114 additions & 10 deletions src/txn/txn_rollback_to_stable.c
Expand Up @@ -196,7 +196,13 @@ __rollback_row_ondisk_fixup_key(WT_SESSION_IMPL *session, WT_PAGE *page, WT_ROW
if (cmp != 0)
break;

/* Set this comparison as exact match of the search for later use. */
/*
* As part of the history store search, we never get an exact match based on our search
* criteria as we always search for a maximum record for that key. Make sure that we set the
* comparison result as an exact match to remove this key as part of rollback to stable. In
* case if we don't mark the comparison result as same, later the __wt_row_modify function
* will not properly remove the update from history store.
*/
cbt->compare = 0;

/* Get current value and convert to full update if it is a modify. */
Expand All @@ -215,6 +221,12 @@ __rollback_row_ondisk_fixup_key(WT_SESSION_IMPL *session, WT_PAGE *page, WT_ROW
}

WT_ERR(__wt_upd_alloc_tombstone(session, &hs_upd));

/*
* Any history store updates don't use transactions as those updates should be immediately
* visible and doesn't follow the transaction semantics. Due to this reason, the history
* store updates are directly modified using the low level api instead of cursor api.
*/
WT_WITH_BTREE(session, cbt->btree,
ret = __wt_row_modify(cbt, &hs_cursor->key, NULL, hs_upd, WT_UPDATE_INVALID, true));
WT_ERR(ret);
Expand Down Expand Up @@ -432,14 +444,28 @@ __rollback_abort_row_reconciled_page(
if ((mod = page->modify) == NULL)
return (0);

if (mod->rec_result == WT_PM_REC_REPLACE)
if (mod->rec_result == WT_PM_REC_REPLACE) {
WT_RET(__rollback_abort_row_reconciled_page_internal(session, mod->u1.r.disk_image,
mod->u1.r.replace.addr, mod->u1.r.replace.size, rollback_timestamp));
else if (mod->rec_result == WT_PM_REC_MULTIBLOCK) {

/*
* As this page has newer aborts that are aborted, make sure to mark the page as dirty to
* let the reconciliation happens again on the page. Otherwise, the eviction may pick the
* already reconciled page to write to disk with newer updates.
*/
__wt_page_only_modify_set(session, page);
} else if (mod->rec_result == WT_PM_REC_MULTIBLOCK) {
for (multi = mod->mod_multi, multi_entry = 0; multi_entry < mod->mod_multi_entries;
++multi, ++multi_entry)
WT_RET(__rollback_abort_row_reconciled_page_internal(
session, multi->disk_image, multi->addr.addr, multi->addr.size, rollback_timestamp));

/*
* As this page has newer aborts that are aborted, make sure to mark the page as dirty to
* let the reconciliation happens again on the page. Otherwise, the eviction may pick the
* already reconciled page to write to disk with newer updates.
*/
__wt_page_only_modify_set(session, page);
}

return (0);
Expand Down Expand Up @@ -661,14 +687,11 @@ __rollback_to_stable_btree(WT_SESSION_IMPL *session, wt_timestamp_t rollback_tim
* inconsistent.
*/
if (__wt_btree_immediately_durable(session)) {
/*
* Add the btree ID to the bitstring, so we can exclude any history store entries for this
* btree.
*/
if (btree->id >= conn->stable_rollback_maxfile)
WT_PANIC_RET(session, EINVAL, "btree file ID %" PRIu32 " larger than max %" PRIu32,
btree->id, conn->stable_rollback_maxfile);
__bit_set(conn->stable_rollback_bitstring, btree->id);
__wt_verbose(session, WT_VERB_RTS,
"%s: Immediately durable btree skipped for rollback to stable", btree->dhandle->name);
return (0);
}

Expand Down Expand Up @@ -726,6 +749,82 @@ __rollback_to_stable_check(WT_SESSION_IMPL *session)
return (ret);
}

/*
* __rollback_to_stable_btree_hs_cleanup --
* Wipe all history store updates for the btree (non-timestamped tables)
*/
static int
__rollback_to_stable_btree_hs_cleanup(WT_SESSION_IMPL *session, uint32_t btree_id)
{
WT_CURSOR *hs_cursor;
WT_CURSOR_BTREE *cbt;
WT_DECL_ITEM(hs_key);
WT_DECL_RET;
WT_ITEM key;
WT_TIME_PAIR hs_start, hs_stop;
WT_UPDATE *hs_upd;
uint32_t hs_btree_id, session_flags;
int exact;

hs_cursor = NULL;
WT_CLEAR(key);
hs_upd = NULL;
session_flags = 0;

WT_ERR(__wt_scr_alloc(session, 0, &hs_key));

/* Open a history store table cursor. */
WT_ERR(__wt_hs_cursor(session, &session_flags));
hs_cursor = session->hs_cursor;
cbt = (WT_CURSOR_BTREE *)hs_cursor;

/* Walk the history store for the given btree. */
hs_cursor->set_key(hs_cursor, btree_id, &key, WT_TS_NONE, WT_TXN_NONE, WT_TS_NONE, WT_TXN_NONE);
ret = hs_cursor->search_near(hs_cursor, &exact);

/*
* The search should always end up pointing either to the start of the required btree or end of
* previous btree. Move the cursor based on the result.
*/
WT_ASSERT(session, exact != 0);
if (ret == 0 && exact < 0)
ret = hs_cursor->next(hs_cursor);

for (; ret == 0; ret = hs_cursor->next(hs_cursor)) {
WT_ERR(hs_cursor->get_key(hs_cursor, &hs_btree_id, hs_key, &hs_start.timestamp,
&hs_start.txnid, &hs_stop.timestamp, &hs_stop.txnid));

/* Stop crossing into the next btree boundary. */
if (btree_id != hs_btree_id)
break;

/* Set this comparison as exact match of the search for later use. */
cbt->compare = 0;

WT_ERR(__wt_upd_alloc_tombstone(session, &hs_upd));

/*
* Any history store updates don't use transactions as those updates should be immediately
* visible and doesn't follow the transaction semantics. Due to this reason, the history
* store updates are directly modified using the low level api instead of cursor api.
*/
WT_WITH_BTREE(session, cbt->btree,
ret = __wt_row_modify(cbt, &hs_cursor->key, NULL, hs_upd, WT_UPDATE_INVALID, true));
WT_ERR(ret);
WT_STAT_CONN_INCR(session, txn_rts_hs_removed);
hs_upd = NULL;
}
WT_ERR_NOTFOUND_OK(ret);

err:
__wt_scr_free(session, &hs_key);
__wt_free(session, hs_upd);
if (hs_cursor != NULL)
WT_TRET(__wt_hs_cursor_close(session, session_flags));

return (ret);
}

/*
* __rollback_to_stable_btree_apply --
* Perform rollback to stable to all files listed in the metadata, apart from the metadata and
Expand Down Expand Up @@ -795,6 +894,13 @@ __rollback_to_stable_btree_apply(WT_SESSION_IMPL *session)
} else
__wt_verbose(session, WT_VERB_RTS, "%s: file skipped", uri);

/* Cleanup any history store entries for this non-timestamped table. */
if (newest_durable_ts == WT_TS_NONE && !F_ISSET(S2C(session), WT_CONN_IN_MEMORY)) {
__wt_verbose(
session, WT_VERB_RTS, "%s: non-timestamped file history store cleanup", uri);
WT_TRET(__rollback_to_stable_btree_hs_cleanup(session, S2BT(session)->id));
}

WT_TRET(__wt_session_release_dhandle(session));
WT_ERR(ret);
}
Expand Down Expand Up @@ -839,12 +945,10 @@ __rollback_to_stable(WT_SESSION_IMPL *session, const char *cfg[])
* current value is already in use, and hence we need to add one here.
*/
conn->stable_rollback_maxfile = conn->next_file_id + 1;
WT_ERR(__bit_alloc(session, conn->stable_rollback_maxfile, &conn->stable_rollback_bitstring));
WT_WITH_SCHEMA_LOCK(session, ret = __rollback_to_stable_btree_apply(session));

err:
F_CLR(conn, WT_CONN_EVICTION_NO_HS);
__wt_free(session, conn->stable_rollback_bitstring);
return (ret);
}

Expand Down
22 changes: 18 additions & 4 deletions test/suite/test_rollback_to_stable01.py
Expand Up @@ -45,7 +45,11 @@ def large_updates(self, uri, value, ds, nrows, commit_ts):
for i in range(0, nrows):
session.begin_transaction()
cursor[ds.key(i)] = value
session.commit_transaction('commit_timestamp=' + timestamp_str(commit_ts))
if commit_ts == 0:
session.commit_transaction()
else:
session.commit_transaction('commit_timestamp=' + timestamp_str(commit_ts))

cursor.close()

def large_modifies(self, uri, value, ds, location, nbytes, nrows, commit_ts):
Expand All @@ -57,7 +61,11 @@ def large_modifies(self, uri, value, ds, location, nbytes, nrows, commit_ts):
cursor.set_key(i)
mods = [wiredtiger.Modify(value, location, nbytes)]
self.assertEqual(cursor.modify(mods), 0)
session.commit_transaction('commit_timestamp=' + timestamp_str(commit_ts))

if commit_ts == 0:
session.commit_transaction()
else:
session.commit_transaction('commit_timestamp=' + timestamp_str(commit_ts))
cursor.close()

def large_removes(self, uri, ds, nrows, commit_ts):
Expand All @@ -68,12 +76,18 @@ def large_removes(self, uri, ds, nrows, commit_ts):
session.begin_transaction()
cursor.set_key(i)
cursor.remove()
session.commit_transaction('commit_timestamp=' + timestamp_str(commit_ts))
if commit_ts == 0:
session.commit_transaction()
else:
session.commit_transaction('commit_timestamp=' + timestamp_str(commit_ts))
cursor.close()

def check(self, check_value, uri, nrows, read_ts):
session = self.session
session.begin_transaction('read_timestamp=' + timestamp_str(read_ts))
if read_ts == 0:
session.begin_transaction()
else:
session.begin_transaction('read_timestamp=' + timestamp_str(read_ts))
cursor = session.open_cursor(uri)
count = 0
for k, v in cursor:
Expand Down
2 changes: 1 addition & 1 deletion test/suite/test_rollback_to_stable02.py
Expand Up @@ -47,7 +47,7 @@ def test_rollback_to_stable(self):
nrows = 10000

# Create a table without logging.
uri = "table:rollback_to_stable01"
uri = "table:rollback_to_stable02"
ds = SimpleDataSet(
self, uri, 0, key_format="i", value_format="S", config='log=(enabled=false)')
ds.populate()
Expand Down
127 changes: 127 additions & 0 deletions test/suite/test_rollback_to_stable05.py
@@ -0,0 +1,127 @@
#!/usr/bin/env python
#
# Public Domain 2014-2020 MongoDB, Inc.
# Public Domain 2008-2014 WiredTiger, Inc.
#
# This is free and unencumbered software released into the public domain.
#
# Anyone is free to copy, modify, publish, use, compile, sell, or
# distribute this software, either in source code form or as a compiled
# binary, for any purpose, commercial or non-commercial, and by any
# means.
#
# In jurisdictions that recognize copyright laws, the author or authors
# of this software dedicate any and all copyright interest in the
# software to the public domain. We make this dedication for the benefit
# of the public at large and to the detriment of our heirs and
# successors. We intend this dedication to be an overt act of
# relinquishment in perpetuity of all present and future rights to this
# software under copyright law.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.

import time
from helper import copy_wiredtiger_home
import unittest, wiredtiger, wttest
from wtdataset import SimpleDataSet
from wiredtiger import stat
from test_rollback_to_stable01 import test_rollback_to_stable_base

def timestamp_str(t):
return '%x' % t

# test_rollback_to_stable05.py
# Test that rollback to stable cleans history store for non-timestamp tables.
class test_rollback_to_stable05(test_rollback_to_stable_base):
# Force a small cache.
conn_config = 'cache_size=50MB,log=(enabled),statistics=(all)'
session_config = 'isolation=snapshot'

def test_rollback_to_stable(self):
nrows = 5000

# Create two tables without logging.
uri_1 = "table:rollback_to_stable05_1"
ds_1 = SimpleDataSet(
self, uri_1, 0, key_format="i", value_format="S", config='log=(enabled=false)')
ds_1.populate()

uri_2 = "table:rollback_to_stable05_2"
ds_2 = SimpleDataSet(
self, uri_2, 0, key_format="i", value_format="S", config='log=(enabled=false)')
ds_2.populate()

# Pin oldest and stable to timestamp 1.
self.conn.set_timestamp('oldest_timestamp=' + timestamp_str(1) +
',stable_timestamp=' + timestamp_str(1))

valuea = "aaaaa" * 100
valueb = "bbbbb" * 100
valuec = "ccccc" * 100
valued = "ddddd" * 100
self.large_updates(uri_1, valuea, ds_1, nrows, 0)
self.check(valuea, uri_1, nrows, 0)

self.large_updates(uri_2, valuea, ds_2, nrows, 0)
self.check(valuea, uri_2, nrows, 0)

# Start a long running transaction and keep it open.
session_2 = self.conn.open_session()
session_2.begin_transaction('isolation=snapshot')

self.large_updates(uri_1, valueb, ds_1, nrows, 0)
self.check(valueb, uri_1, nrows, 0)

self.large_updates(uri_1, valuec, ds_1, nrows, 0)
self.check(valuec, uri_1, nrows, 0)

self.large_updates(uri_1, valued, ds_1, nrows, 0)
self.check(valued, uri_1, nrows, 0)

# Add updates to the another table.
self.large_updates(uri_2, valueb, ds_2, nrows, 0)
self.check(valueb, uri_2, nrows, 0)

self.large_updates(uri_2, valuec, ds_2, nrows, 0)
self.check(valuec, uri_2, nrows, 0)

self.large_updates(uri_2, valued, ds_2, nrows, 0)
self.check(valued, uri_2, nrows, 0)

# Pin stable to timestamp 10.
self.conn.set_timestamp('stable_timestamp=' + timestamp_str(10))

# Checkpoint to ensure that all the data is flushed.
self.session.checkpoint()

# Clear all running transactions before rollback to stable.
session_2.commit_transaction()
session_2.close()

self.conn.rollback_to_stable()
self.check(valued, uri_1, nrows, 0)
self.check(valued, uri_2, nrows, 0)

stat_cursor = self.session.open_cursor('statistics:', None, None)
calls = stat_cursor[stat.conn.txn_rts][2]
upd_aborted = (stat_cursor[stat.conn.txn_rts_upd_aborted][2] +
stat_cursor[stat.conn.txn_rts_hs_removed][2])
keys_removed = stat_cursor[stat.conn.txn_rts_keys_removed][2]
keys_restored = stat_cursor[stat.conn.txn_rts_keys_restored][2]
pages_visited = stat_cursor[stat.conn.txn_rts_pages_visited][2]
stat_cursor.close()

self.assertEqual(calls, 1)
self.assertEqual(keys_removed, 0)
self.assertEqual(keys_restored, 0)
self.assertEqual(pages_visited, 0)
self.assertGreaterEqual(upd_aborted, nrows * 3 * 2)

if __name__ == '__main__':
wttest.run()

0 comments on commit b803a87

Please sign in to comment.