Skip to content

Commit

Permalink
WT-6961 RTS no stable timestamp (#6158)
Browse files Browse the repository at this point in the history
Allow RTS to operate when a stable timestamp is not set. 
Skip RTS for empty tables, but only during recovery and shutdown.
  • Loading branch information
raviprakashgiri29 committed Nov 30, 2020
1 parent 5a37d21 commit d6a0e16
Show file tree
Hide file tree
Showing 10 changed files with 84 additions and 46 deletions.
7 changes: 6 additions & 1 deletion src/block/block_open.c
Expand Up @@ -391,7 +391,12 @@ __desc_read(WT_SESSION_IMPL *session, uint32_t allocsize, WT_BLOCK *block)
*/
if (F_ISSET(session, WT_SESSION_IMPORT_REPAIR))
goto err;
WT_ERR_MSG(session, WT_ERROR, "%s does not appear to be a WiredTiger file", block->name);

if (F_ISSET(session, WT_SESSION_ROLLBACK_TO_STABLE))
ret = ENOENT;
else
WT_ERR_MSG(
session, WT_ERROR, "%s does not appear to be a WiredTiger file", block->name);
}

if (desc->majorv > WT_BLOCK_MAJOR_VERSION ||
Expand Down
13 changes: 6 additions & 7 deletions src/txn/txn_recover.c
Expand Up @@ -931,13 +931,9 @@ __wt_txn_recover(WT_SESSION_IMPL *session, const char *cfg[])
* Perform rollback to stable only when the following conditions are met.
* 1. The connection is not read-only. A read-only connection expects that there shouldn't be
* any changes that need to be done on the database other than reading.
* 2. A valid recovery timestamp. The recovery timestamp is the stable timestamp retrieved
* from the metadata checkpoint information to indicate the stable timestamp when the
* checkpoint happened. Any updates newer than this timestamp must be rolled back.
* 3. The history store file was found in the metadata.
* 2. The history store file was found in the metadata.
*/
if (hs_exists && !F_ISSET(conn, WT_CONN_READONLY) &&
conn->txn_global.recovery_timestamp != WT_TS_NONE) {
if (hs_exists && !F_ISSET(conn, WT_CONN_READONLY)) {
/* Start the eviction threads for rollback to stable if not already started. */
if (!eviction_started) {
WT_ERR(__wt_evict_create(session));
Expand All @@ -964,7 +960,10 @@ __wt_txn_recover(WT_SESSION_IMPL *session, const char *cfg[])
* stable.
*/
conn->txn_global.stable_timestamp = conn->txn_global.recovery_timestamp;
conn->txn_global.has_stable_timestamp = true;
conn->txn_global.has_stable_timestamp = false;

if (conn->txn_global.recovery_timestamp != WT_TS_NONE)
conn->txn_global.has_stable_timestamp = true;

__wt_verbose(session, WT_VERB_RTS,
"Performing recovery rollback_to_stable with stable timestamp: %s and oldest timestamp: "
Expand Down
62 changes: 44 additions & 18 deletions src/txn/txn_rollback_to_stable.c
Expand Up @@ -978,17 +978,9 @@ __rollback_to_stable_btree(WT_SESSION_IMPL *session, wt_timestamp_t rollback_tim
static int
__rollback_to_stable_check(WT_SESSION_IMPL *session)
{
WT_CONNECTION_IMPL *conn;
WT_DECL_RET;
WT_TXN_GLOBAL *txn_global;
bool txn_active;

conn = S2C(session);
txn_global = &conn->txn_global;

if (!txn_global->has_stable_timestamp)
WT_RET_MSG(session, EINVAL, "rollback_to_stable requires a stable timestamp");

/*
* Help the user comply with the requirement that there are no concurrent operations. Protect
* against spurious conflicts with the sweep server: we exclude it from running concurrent with
Expand Down Expand Up @@ -1160,17 +1152,19 @@ static int
__rollback_to_stable_btree_apply(WT_SESSION_IMPL *session)
{
WT_CONFIG ckptconf;
WT_CONFIG_ITEM cval, durableval, key;
WT_CONFIG_ITEM cval, value, key;
WT_CURSOR *cursor;
WT_DECL_RET;
WT_TXN_GLOBAL *txn_global;
wt_timestamp_t max_durable_ts, newest_start_durable_ts, newest_stop_durable_ts,
rollback_timestamp;
size_t addr_size;
char ts_string[2][WT_TS_INT_STRING_SIZE];
const char *config, *uri;
bool durable_ts_found, prepared_updates;

txn_global = &S2C(session)->txn_global;
addr_size = 0;

/*
* Copy the stable timestamp, otherwise we'd need to lock it each time it's accessed. Even
Expand Down Expand Up @@ -1204,32 +1198,64 @@ __rollback_to_stable_btree_apply(WT_SESSION_IMPL *session)
WT_ERR(__wt_config_getones(session, config, "checkpoint", &cval));
__wt_config_subinit(session, &ckptconf, &cval);
for (; __wt_config_next(&ckptconf, &key, &cval) == 0;) {
ret = __wt_config_subgets(session, &cval, "newest_start_durable_ts", &durableval);
ret = __wt_config_subgets(session, &cval, "newest_start_durable_ts", &value);
if (ret == 0) {
newest_start_durable_ts =
WT_MAX(newest_start_durable_ts, (wt_timestamp_t)durableval.val);
WT_MAX(newest_start_durable_ts, (wt_timestamp_t)value.val);
durable_ts_found = true;
}
WT_ERR_NOTFOUND_OK(ret, false);
ret = __wt_config_subgets(session, &cval, "newest_stop_durable_ts", &durableval);
ret = __wt_config_subgets(session, &cval, "newest_stop_durable_ts", &value);
if (ret == 0) {
newest_stop_durable_ts =
WT_MAX(newest_stop_durable_ts, (wt_timestamp_t)durableval.val);
newest_stop_durable_ts = WT_MAX(newest_stop_durable_ts, (wt_timestamp_t)value.val);
durable_ts_found = true;
}
WT_ERR_NOTFOUND_OK(ret, false);
ret = __wt_config_subgets(session, &cval, "prepare", &durableval);
ret = __wt_config_subgets(session, &cval, "prepare", &value);
if (ret == 0) {
if (durableval.val)
if (value.val)
prepared_updates = true;
}
WT_ERR_NOTFOUND_OK(ret, false);
ret = __wt_config_subgets(session, &cval, "addr", &value);
if (ret == 0)
addr_size = value.len;
WT_ERR_NOTFOUND_OK(ret, false);
}
max_durable_ts = WT_MAX(newest_start_durable_ts, newest_stop_durable_ts);

/*
* During recovery and shutdown, rollback to stable skips a table if either of the following
* conditions holds.
* 1. Empty table.
* 2. Table has timestamped updates without a stable timestamp.
*/
if ((F_ISSET(S2C(session), WT_CONN_RECOVERING) ||
F_ISSET(S2C(session), WT_CONN_CLOSING_TIMESTAMP)) &&
(addr_size == 0 ||
(txn_global->stable_timestamp == WT_TS_NONE && max_durable_ts != WT_TS_NONE))) {
__wt_verbose(session, WT_VERB_RTS, "Skip rollback to stable on file %s because %s", uri,
addr_size == 0 ? "its checkpoint address length is 0" :
"it has timestamped updates and the stable timestamp is 0");
continue;
}

/* Set this flag to return error instead of panic if file is corrupted. */
F_SET(session, WT_SESSION_QUIET_CORRUPT_FILE);
ret = __wt_session_get_dhandle(session, uri, NULL, NULL, 0);
/* Ignore performing rollback to stable on files that don't exist. */
if (ret == ENOENT)
F_CLR(session, WT_SESSION_QUIET_CORRUPT_FILE);

/*
* Skip rollback to stable on files that do not exist and on files in which corruption is
* detected.
*/
if ((ret == ENOENT) ||
(ret == WT_ERROR && F_ISSET(S2C(session), WT_CONN_DATA_CORRUPTION))) {
__wt_verbose(session, WT_VERB_RTS,
"Ignore performing rollback to stable on %s because the file %s", uri,
ret == ENOENT ? "does not exist" : "is corrupted.");
continue;
}
WT_ERR(ret);

/*
Expand Down
8 changes: 6 additions & 2 deletions test/suite/test_prepare_hs04.py
Expand Up @@ -165,8 +165,12 @@ def prepare_updates(self, ds):
# After simulating a crash, search for the keys inserted.

txn_config = 'read_timestamp=' + timestamp_str(5) + ',ignore_prepare=false'
# Search keys with timestamp 5, ignore_prepare=false and expect the cursor value to be commit_value.
self.search_keys_timestamp_and_ignore(ds, txn_config, commit_value)
if self.commit == True:
# Search keys with timestamp 5, ignore_prepare=false and expect the cursor search to return WT_NOTFOUND.
self.search_keys_timestamp_and_ignore(ds, txn_config, None)
else:
# Search keys with timestamp 5, ignore_prepare=false and expect the cursor value to be commit_value.
self.search_keys_timestamp_and_ignore(ds, txn_config, commit_value)

txn_config = 'read_timestamp=' + timestamp_str(20) + ',ignore_prepare=true'
# Search keys with timestamp 20, ignore_prepare=true and expect the cursor search to return WT_NOTFOUND.
Expand Down
10 changes: 0 additions & 10 deletions test/suite/test_rollback_to_stable05.py
Expand Up @@ -76,10 +76,6 @@ def test_rollback_to_stable(self):
self, uri_2, 0, key_format="i", value_format="S", config='log=(enabled=false)')
ds_2.populate()

# Pin oldest and stable to timestamp 1.
self.conn.set_timestamp('oldest_timestamp=' + timestamp_str(1) +
',stable_timestamp=' + timestamp_str(1))

valuea = "aaaaa" * 100
valueb = "bbbbb" * 100
valuec = "ccccc" * 100
Expand Down Expand Up @@ -113,12 +109,6 @@ def test_rollback_to_stable(self):
self.large_updates(uri_2, valued, ds_2, nrows, 0)
self.check(valued, uri_2, nrows, 0)

# Pin stable to timestamp 20 if prepare otherwise 10.
if self.prepare:
self.conn.set_timestamp('stable_timestamp=' + timestamp_str(20))
else:
self.conn.set_timestamp('stable_timestamp=' + timestamp_str(10))

# Checkpoint to ensure that all the data is flushed.
if not self.in_memory:
self.session.checkpoint()
Expand Down
5 changes: 4 additions & 1 deletion test/suite/test_schema08.py
Expand Up @@ -140,10 +140,13 @@ def run_recovery(self, uri, suburi):
# Make an initial copy as well as a copy for each LSN we save.
# Truncate the log to the appropriate offset as we make each copy.
olddir = "."
errfile="errfile.txt"
for lsn in self.lsns:
newdir = self.backup_pfx + str(lsn)
outfile = newdir + '.txt'
self.runWt(['-R', '-h', newdir, 'list', '-v'], outfilename=outfile)
self.runWt(['-R', '-h', newdir, 'list', '-v'], errfilename=errfile, outfilename=outfile)
if os.path.isfile(errfile) and os.path.getsize(errfile) > 0:
self.check_file_contains(errfile,'No such file or directory')

# Test that creating and dropping tables does not write individual
# log records.
Expand Down
9 changes: 4 additions & 5 deletions test/suite/test_txn16.py
Expand Up @@ -92,11 +92,10 @@ def run_toggle(self, homedir):
cur_logs = fnmatch.filter(os.listdir(homedir), "*gerLog*")
scur = set(cur_logs)
sorig = set(orig_logs)
# There should never be overlap with the log files that
# were there originally. Mostly this checks that after
# opening with logging disabled and then re-enabled, we
# don't see log file 1.
self.assertEqual(scur.isdisjoint(sorig), True)
# Overlap with the log files that were there originally
# is now expected, because some pages are rolled back
# as part of RTS.
self.assertEqual(scur.isdisjoint(sorig), False)
if loop > 1:
# We should be creating the same log files each time.
for l in cur_logs:
Expand Down
4 changes: 4 additions & 0 deletions test/suite/test_txn22.py
Expand Up @@ -170,5 +170,9 @@ def test_corrupt_meta(self):
lambda: self.reopen_conn(salvagedir, salvage_config),
'/.*/')

# The test may output the following error message while opening a file that
# does not exist. Ignore that.
self.ignoreStderrPatternIfExists('No such file or directory')

if __name__ == '__main__':
wttest.run()
8 changes: 6 additions & 2 deletions test/suite/test_verify.py
Expand Up @@ -182,7 +182,9 @@ def test_verify_process_truncated(self):
f.truncate(0)
self.runWt(["verify", "table:" + self.tablename],
errfilename="verifyerr.out", failure=True)
self.check_non_empty_file("verifyerr.out")
# The test may output the following error message while opening a file that
# does not exist. Ignore that.
self.ignoreStderrPatternIfExists('No such file or directory')

def test_verify_process_zero_length(self):
"""
Expand All @@ -195,7 +197,9 @@ def test_verify_process_zero_length(self):
f.truncate(0)
self.runWt(["verify", "table:" + self.tablename],
errfilename="verifyerr.out", failure=True)
self.check_non_empty_file("verifyerr.out")
# The test may output the following error message while opening a file that
# does not exist. Ignore that.
self.ignoreStderrPatternIfExists('No such file or directory')

if __name__ == '__main__':
wttest.run()
4 changes: 4 additions & 0 deletions test/suite/wttest.py
Expand Up @@ -527,6 +527,10 @@ def ignoreStdoutPatternIfExists(self, pat):
if self.captureout.hasUnexpectedOutput(self):
self.captureout.checkAdditionalPattern(self, pat)

def ignoreStderrPatternIfExists(self, pat):
    """
    Stderr counterpart of ignoreStdoutPatternIfExists.

    If the captured stderr reports unexpected output, pass `pat` to its
    checkAdditionalPattern() check; otherwise do nothing. Always returns
    None.
    """
    # Nothing unexpected was captured on stderr, so there is nothing to check.
    if not self.captureerr.hasUnexpectedOutput(self):
        return
    self.captureerr.checkAdditionalPattern(self, pat)

def assertRaisesWithMessage(self, exceptionType, expr, message):
"""
Like TestCase.assertRaises(), but also checks to see
Expand Down

0 comments on commit d6a0e16

Please sign in to comment.