Skip to content

Commit

Permalink
WT-7784 Enable RTS to use checkpoint snapshot on timestamp tables (#6752)
Browse files Browse the repository at this point in the history

Usually, for timestamped tables, the commit/durable timestamp is always
greater than the stable timestamp, so the checkpoint snapshot is not
needed to abort unstable updates. Allowing rollback to stable (RTS) to
use the checkpoint snapshot for these tables shouldn't be a problem: it
only changes the outcome when a commit/durable timestamp violates the
condition that it must be greater than the stable timestamp.
  • Loading branch information
kommiharibabu committed Jul 7, 2021
1 parent 3846a7b commit f29e427
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 32 deletions.
34 changes: 13 additions & 21 deletions src/txn/txn_rollback_to_stable.c
Expand Up @@ -8,9 +8,8 @@

#include "wt_internal.h"

#define WT_CHECK_RECOVERY_FLAG_TS_TXNID(session, txnid, durablets) \
((durablets) == WT_TS_NONE && F_ISSET(S2C(session), WT_CONN_RECOVERING) && \
(txnid) >= S2C(session)->recovery_ckpt_snap_min)
#define WT_CHECK_RECOVERY_FLAG_TXNID(session, txnid) \
(F_ISSET(S2C(session), WT_CONN_RECOVERING) && (txnid) >= S2C(session)->recovery_ckpt_snap_min)

/* Enable rollback to stable verbose messaging during recovery. */
#define WT_VERB_RECOVERY_RTS(session) \
Expand Down Expand Up @@ -414,10 +413,8 @@ __rollback_ondisk_fixup_key(WT_SESSION_IMPL *session, WT_REF *ref, WT_PAGE *page
*/
/* Retrieve the time window from the history cursor. */
__wt_hs_upd_time_window(hs_cursor, &hs_tw);
if (!replace &&
(hs_stop_durable_ts != WT_TS_NONE ||
!__rollback_check_if_txnid_non_committed(session, hs_tw->stop_txn)) &&
(hs_stop_durable_ts <= rollback_timestamp)) {
if (!replace && !__rollback_check_if_txnid_non_committed(session, hs_tw->stop_txn) &&
hs_stop_durable_ts <= rollback_timestamp) {
__wt_verbose(session, WT_VERB_RECOVERY_RTS(session),
"history store update valid with stop timestamp: %s, stable timestamp: %s, txnid: "
"%" PRIu64 " and type: %" PRIu8,
Expand All @@ -430,9 +427,8 @@ __rollback_ondisk_fixup_key(WT_SESSION_IMPL *session, WT_REF *ref, WT_PAGE *page
* Stop processing when we find a stable update according to the given timestamp and
* transaction id.
*/
if ((hs_durable_ts != WT_TS_NONE ||
!__rollback_check_if_txnid_non_committed(session, hs_tw->start_txn)) &&
(hs_durable_ts <= rollback_timestamp)) {
if (!__rollback_check_if_txnid_non_committed(session, hs_tw->start_txn) &&
hs_durable_ts <= rollback_timestamp) {
__wt_verbose(session, WT_VERB_RECOVERY_RTS(session),
"history store update valid with start timestamp: %s, durable timestamp: %s, stop "
"timestamp: %s, stable timestamp: %s, txnid: %" PRIu64 " and type: %" PRIu8,
Expand Down Expand Up @@ -639,9 +635,8 @@ __rollback_abort_ondisk_kv(WT_SESSION_IMPL *session, WT_REF *ref, WT_COL *cip, W
WT_STAT_CONN_DATA_INCR(session, txn_rts_sweep_hs_keys);
} else
return (0);
} else if (((vpack->tw.durable_start_ts > rollback_timestamp) ||
(vpack->tw.durable_start_ts == WT_TS_NONE &&
__rollback_check_if_txnid_non_committed(session, vpack->tw.start_txn))) ||
} else if (vpack->tw.durable_start_ts > rollback_timestamp ||
__rollback_check_if_txnid_non_committed(session, vpack->tw.start_txn) ||
(!WT_TIME_WINDOW_HAS_STOP(&vpack->tw) && prepared)) {
__wt_verbose(session, WT_VERB_RECOVERY_RTS(session),
"on-disk update aborted with start durable timestamp: %s, commit timestamp: %s, "
Expand All @@ -661,10 +656,8 @@ __rollback_abort_ondisk_kv(WT_SESSION_IMPL *session, WT_REF *ref, WT_COL *cip, W
WT_STAT_CONN_DATA_INCR(session, txn_rts_keys_removed);
}
} else if (WT_TIME_WINDOW_HAS_STOP(&vpack->tw) &&
(((vpack->tw.durable_stop_ts > rollback_timestamp) ||
(vpack->tw.durable_stop_ts == WT_TS_NONE &&
__rollback_check_if_txnid_non_committed(session, vpack->tw.stop_txn))) ||
prepared)) {
(vpack->tw.durable_stop_ts > rollback_timestamp ||
__rollback_check_if_txnid_non_committed(session, vpack->tw.stop_txn) || prepared)) {
/*
* Clear the remove operation from the key by inserting the original on-disk value as a
* standard update.
Expand Down Expand Up @@ -1062,14 +1055,14 @@ __rollback_page_needs_abort(
prepared = vpack.ta.prepare;
newest_txn = vpack.ta.newest_txn;
result = (durable_ts > rollback_timestamp) || prepared ||
WT_CHECK_RECOVERY_FLAG_TS_TXNID(session, newest_txn, durable_ts);
WT_CHECK_RECOVERY_FLAG_TXNID(session, newest_txn);
} else if (addr != NULL) {
tag = "address";
durable_ts = __rollback_get_ref_max_durable_timestamp(session, &addr->ta);
prepared = addr->ta.prepare;
newest_txn = addr->ta.newest_txn;
result = (durable_ts > rollback_timestamp) || prepared ||
WT_CHECK_RECOVERY_FLAG_TS_TXNID(session, newest_txn, durable_ts);
WT_CHECK_RECOVERY_FLAG_TXNID(session, newest_txn);
}

__wt_verbose(session, WT_VERB_RECOVERY_RTS(session),
Expand Down Expand Up @@ -1479,8 +1472,7 @@ __rollback_to_stable_btree_apply(
WT_RET_NOTFOUND_OK(ret);
}
max_durable_ts = WT_MAX(newest_start_durable_ts, newest_stop_durable_ts);
has_txn_updates_gt_than_ckpt_snap =
WT_CHECK_RECOVERY_FLAG_TS_TXNID(session, rollback_txnid, max_durable_ts);
has_txn_updates_gt_than_ckpt_snap = WT_CHECK_RECOVERY_FLAG_TXNID(session, rollback_txnid);

/* Increment the inconsistent checkpoint stats counter. */
if (has_txn_updates_gt_than_ckpt_snap)
Expand Down
86 changes: 75 additions & 11 deletions test/suite/test_checkpoint_snapshot02.py
Expand Up @@ -51,19 +51,25 @@ def conn_config(self):
config = 'cache_size=10MB,statistics=(all),statistics_log=(json,on_close,wait=1),log=(enabled=true),timing_stress_for_test=[checkpoint_slow]'
return config

def large_updates(self, uri, value, ds, nrows):
def large_updates(self, uri, value, ds, nrows, commit_ts):
# Update a large number of records.
session = self.session
cursor = session.open_cursor(uri)
for i in range(0, nrows):
session.begin_transaction()
cursor[ds.key(i)] = value
session.commit_transaction()
if commit_ts == 0:
session.commit_transaction()
else:
session.commit_transaction('commit_timestamp=' + timestamp_str(commit_ts))
cursor.close()

def check(self, check_value, uri, nrows):
def check(self, check_value, uri, nrows, read_ts):
session = self.session
session.begin_transaction()
if read_ts == 0:
session.begin_transaction()
else:
session.begin_transaction('read_timestamp=' + timestamp_str(read_ts))
cursor = session.open_cursor(uri)
count = 0
for k, v in cursor:
Expand All @@ -77,14 +83,68 @@ def test_checkpoint_snapshot(self):
ds = SimpleDataSet(self, self.uri, 0, key_format="S", value_format="S",config='log=(enabled=false)')
ds.populate()
valuea = "aaaaa" * 100
valueb = "bbbbb" * 100
valuec = "ccccc" * 100
valued = "ddddd" * 100

cursor = self.session.open_cursor(self.uri)
self.large_updates(self.uri, valuea, ds, self.nrows)
self.large_updates(self.uri, valuea, ds, self.nrows, 0)
self.check(valuea, self.uri, self.nrows, 0)

self.check(valuea, self.uri, self.nrows)
session1 = self.conn.open_session()
session1.begin_transaction()
cursor1 = session1.open_cursor(self.uri)

for i in range(self.nrows, self.nrows*2):
cursor1.set_key(ds.key(i))
cursor1.set_value(valuea)
self.assertEqual(cursor1.insert(), 0)

# Create a checkpoint thread
done = threading.Event()
ckpt = checkpoint_thread(self.conn, done)
try:
ckpt.start()
# Sleep for some time so that the checkpoint starts before the last transaction is committed.
time.sleep(2)
session1.commit_transaction()

finally:
done.set()
ckpt.join()

# Simulate a crash by copying to a new directory (RESTART).
copy_wiredtiger_home(self, ".", "RESTART")

# Open the new directory.
self.conn = self.setUpConnectionOpen("RESTART")
self.session = self.setUpSessionOpen(self.conn)

# Check the table contains the last checkpointed value.
self.check(valuea, self.uri, self.nrows, 0)

stat_cursor = self.session.open_cursor('statistics:', None, None)
inconsistent_ckpt = stat_cursor[stat.conn.txn_rts_inconsistent_ckpt][2]
keys_removed = stat_cursor[stat.conn.txn_rts_keys_removed][2]
keys_restored = stat_cursor[stat.conn.txn_rts_keys_restored][2]
pages_visited = stat_cursor[stat.conn.txn_rts_pages_visited][2]
upd_aborted = stat_cursor[stat.conn.txn_rts_upd_aborted][2]
stat_cursor.close()

self.assertGreater(inconsistent_ckpt, 0)
self.assertEqual(upd_aborted, 0)
self.assertGreaterEqual(keys_removed, 0)
self.assertEqual(keys_restored, 0)
self.assertGreaterEqual(pages_visited, 0)

def test_checkpoint_snapshot_with_timestamp(self):

ds = SimpleDataSet(self, self.uri, 0, key_format="S", value_format="S",config='log=(enabled=false)')
ds.populate()
valuea = "aaaaa" * 100

# Pin oldest and stable timestamps to 10.
self.conn.set_timestamp('oldest_timestamp=' + timestamp_str(10) +
',stable_timestamp=' + timestamp_str(10))

self.large_updates(self.uri, valuea, ds, self.nrows, 20)
self.check(valuea, self.uri, self.nrows, 20)

session1 = self.conn.open_session()
session1.begin_transaction()
Expand All @@ -94,6 +154,10 @@ def test_checkpoint_snapshot(self):
cursor1.set_key(ds.key(i))
cursor1.set_value(valuea)
self.assertEqual(cursor1.insert(), 0)
session1.timestamp_transaction('commit_timestamp=' + timestamp_str(30))

# Set stable timestamp to 40
self.conn.set_timestamp('stable_timestamp=' + timestamp_str(40))

# Create a checkpoint thread
done = threading.Event()
Expand All @@ -116,7 +180,7 @@ def test_checkpoint_snapshot(self):
self.session = self.setUpSessionOpen(self.conn)

# Check the table contains the last checkpointed value.
self.check(valuea, self.uri, self.nrows)
self.check(valuea, self.uri, self.nrows, 30)

stat_cursor = self.session.open_cursor('statistics:', None, None)
inconsistent_ckpt = stat_cursor[stat.conn.txn_rts_inconsistent_ckpt][2]
Expand Down

0 comments on commit f29e427

Please sign in to comment.