Skip to content

Commit

Permalink
WT-4122 Add test and fix for MongoDB's downgrade usage. (#4131)
Browse files Browse the repository at this point in the history
* Force log archive on downgrade changes.
  • Loading branch information
sueloverso authored and michaelcahill committed Jun 14, 2018
1 parent 11c788f commit 58713fe
Show file tree
Hide file tree
Showing 11 changed files with 195 additions and 69 deletions.
2 changes: 1 addition & 1 deletion dist/stat_data.py
Expand Up @@ -370,7 +370,7 @@ def __init__(self, name, desc, flags=''):
LogStat('log_compress_write_fails', 'log records not compressed'),
LogStat('log_compress_writes', 'log records compressed'),
LogStat('log_flush', 'log flush operations'),
LogStat('log_force_ckpt_sleep', 'force checkpoint calls slept'),
LogStat('log_force_archive_sleep', 'force archive time sleeping (usecs)'),
LogStat('log_force_write', 'log force write operations'),
LogStat('log_force_write_skip', 'log force write operations skipped'),
LogStat('log_max_filesize', 'maximum log file size', 'no_clear,no_scale,size'),
Expand Down
36 changes: 21 additions & 15 deletions src/conn/conn_log.c
Expand Up @@ -40,24 +40,25 @@ __logmgr_sync_cfg(WT_SESSION_IMPL *session, const char **cfg)
}

/*
* __logmgr_force_ckpt --
* Force a checkpoint out, waiting for the checkpoint LSN in the log
* is up to the given log number.
* __logmgr_force_archive --
* Force a checkpoint out and then force an archive, waiting for the
* first log to be archived up to the given log number.
*/
static int
__logmgr_force_ckpt(WT_SESSION_IMPL *session, uint32_t lognum)
__logmgr_force_archive(WT_SESSION_IMPL *session, uint32_t lognum)
{
WT_CONNECTION_IMPL *conn;
WT_LOG *log;
WT_SESSION_IMPL *tmp_session;
int yield;
uint64_t sleep_usecs, yield_cnt;

conn = S2C(session);
log = conn->log;
yield = 0;
sleep_usecs = yield_cnt = 0;

WT_RET(__wt_open_internal_session(conn,
"compatibility-reconfig", true, 0, &tmp_session));
while (log->ckpt_lsn.l.file < lognum) {
while (log->first_lsn.l.file < lognum) {
/*
* Force a checkpoint to be written in the new log file and
* force the archiving of all previous log files. We do the
Expand All @@ -69,15 +70,16 @@ __logmgr_force_ckpt(WT_SESSION_IMPL *session, uint32_t lognum)
*/
WT_RET(tmp_session->iface.checkpoint(
&tmp_session->iface, "force=1"));
WT_RET(WT_SESSION_CHECK_PANIC(tmp_session));
/*
* Only sleep in the rare case that we had to come through
* this loop more than once.
* It's reasonable to start the back off prior to trying at all
* because the backoff is very gradual.
*/
if (yield++) {
WT_STAT_CONN_INCR(session, log_force_ckpt_sleep);
__wt_sleep(0, WT_THOUSAND);
}
__wt_spin_backoff(&yield_cnt, &sleep_usecs);
WT_STAT_CONN_INCRV(session,
log_force_archive_sleep, sleep_usecs);

WT_RET(WT_SESSION_CHECK_PANIC(tmp_session));
WT_RET(__wt_log_truncate_files(tmp_session, NULL, true));
}
WT_RET(tmp_session->iface.close(&tmp_session->iface, NULL));
return (0);
Expand Down Expand Up @@ -105,6 +107,10 @@ __logmgr_version(WT_SESSION_IMPL *session, bool reconfig)
* Set the log file format versions based on compatibility versions
* set in the connection. We must set this before we call log_open
* to open or create a log file.
*
* Note: downgrade in this context means the new version is not the
* latest possible version. It does not mean the direction of change
* from the release we may be running currently.
*/
if (conn->compat_major < WT_LOG_V2_MAJOR) {
new_version = 1;
Expand Down Expand Up @@ -167,7 +173,7 @@ __logmgr_version(WT_SESSION_IMPL *session, bool reconfig)
WT_RET(__wt_log_set_version(session, new_version,
first_record, downgrade, reconfig, &lognum));
if (reconfig && FLD_ISSET(conn->log_flags, WT_CONN_LOG_DOWNGRADED))
WT_RET(__logmgr_force_ckpt(session, lognum));
WT_RET(__logmgr_force_archive(session, lognum));
return (0);
}

Expand Down
2 changes: 1 addition & 1 deletion src/include/stat.h
Expand Up @@ -508,7 +508,7 @@ struct __wt_connection_stats {
int64_t lock_txn_global_read_count;
int64_t lock_txn_global_write_count;
int64_t log_slot_switch_busy;
int64_t log_force_ckpt_sleep;
int64_t log_force_archive_sleep;
int64_t log_bytes_payload;
int64_t log_bytes_written;
int64_t log_zero_fills;
Expand Down
4 changes: 2 additions & 2 deletions src/include/wiredtiger.in
Expand Up @@ -5349,8 +5349,8 @@ extern int wiredtiger_extension_terminate(WT_CONNECTION *connection);
#define WT_STAT_CONN_LOCK_TXN_GLOBAL_WRITE_COUNT 1193
/*! log: busy returns attempting to switch slots */
#define WT_STAT_CONN_LOG_SLOT_SWITCH_BUSY 1194
/*! log: force checkpoint calls slept */
#define WT_STAT_CONN_LOG_FORCE_CKPT_SLEEP 1195
/*! log: force archive time sleeping (usecs) */
#define WT_STAT_CONN_LOG_FORCE_ARCHIVE_SLEEP 1195
/*! log: log bytes of payload data */
#define WT_STAT_CONN_LOG_BYTES_PAYLOAD 1196
/*! log: log bytes written */
Expand Down
6 changes: 3 additions & 3 deletions src/log/log.c
Expand Up @@ -975,8 +975,8 @@ __log_open_verify(WT_SESSION_IMPL *session, uint32_t id, WT_FH **fhp,
WT_LOG_VERSION, desc->version);

/*
* We error if the log version is less than the required minimum
* or larger than the required maximum.
* We error if the log version is less than the required minimum or
* larger than the required maximum.
*/
if (conn->req_max_major != WT_CONN_COMPAT_NONE &&
desc->version > conn->log_req_max)
Expand All @@ -997,7 +997,7 @@ __log_open_verify(WT_SESSION_IMPL *session, uint32_t id, WT_FH **fhp,
conn->log_req_min, desc->version);

/*
* Set up the return values if the magic number is valid.
* Set up the return values since the header is valid.
*/
if (versionp != NULL)
*versionp = desc->version;
Expand Down
7 changes: 4 additions & 3 deletions src/support/stat.c
Expand Up @@ -942,7 +942,7 @@ static const char * const __stats_connection_desc[] = {
"lock: txn global read lock acquisitions",
"lock: txn global write lock acquisitions",
"log: busy returns attempting to switch slots",
"log: force checkpoint calls slept",
"log: force archive time sleeping (usecs)",
"log: log bytes of payload data",
"log: log bytes written",
"log: log files manually zero-filled",
Expand Down Expand Up @@ -1339,7 +1339,7 @@ __wt_stat_connection_clear_single(WT_CONNECTION_STATS *stats)
stats->lock_txn_global_read_count = 0;
stats->lock_txn_global_write_count = 0;
stats->log_slot_switch_busy = 0;
stats->log_force_ckpt_sleep = 0;
stats->log_force_archive_sleep = 0;
stats->log_bytes_payload = 0;
stats->log_bytes_written = 0;
stats->log_zero_fills = 0;
Expand Down Expand Up @@ -1807,7 +1807,8 @@ __wt_stat_connection_aggregate(
to->lock_txn_global_write_count +=
WT_STAT_READ(from, lock_txn_global_write_count);
to->log_slot_switch_busy += WT_STAT_READ(from, log_slot_switch_busy);
to->log_force_ckpt_sleep += WT_STAT_READ(from, log_force_ckpt_sleep);
to->log_force_archive_sleep +=
WT_STAT_READ(from, log_force_archive_sleep);
to->log_bytes_payload += WT_STAT_READ(from, log_bytes_payload);
to->log_bytes_written += WT_STAT_READ(from, log_bytes_written);
to->log_zero_fills += WT_STAT_READ(from, log_zero_fills);
Expand Down
7 changes: 7 additions & 0 deletions src/txn/txn_recover.c
Expand Up @@ -704,6 +704,13 @@ done: WT_ERR(__recovery_set_checkpoint_timestamp(&r));
* archiving.
*/
WT_ERR(session->iface.checkpoint(&session->iface, "force=1"));

/*
* If we're downgrading and have newer log files, force an archive,
* no matter what the archive setting is.
*/
if (FLD_ISSET(conn->log_flags, WT_CONN_LOG_FORCE_DOWNGRADE))
WT_ERR(__wt_log_truncate_files(session, NULL, true));
FLD_SET(conn->log_flags, WT_CONN_LOG_RECOVER_DONE);

err: WT_TRET(__recovery_free(&r));
Expand Down
72 changes: 34 additions & 38 deletions test/suite/test_compat01.py
Expand Up @@ -47,28 +47,25 @@ class test_compat01(wttest.WiredTigerTestCase, suite_subprocess):
# Log version 2 introduced that record.
# Log version 3 continues to have that record.
min_logv = 2
latest_logv = 3

# The API uses only the major and minor numbers but accepts with
# and without the patch number. Test both.
start_compat = [
('def', dict(compat1='none', logv1=3)),
('31', dict(compat1="3.1", logv1=3)),
('31_patch', dict(compat1="3.1.0", logv1=3)),
('30', dict(compat1="3.0", logv1=2)),
('30_patch', dict(compat1="3.0.0", logv1=2)),
('26', dict(compat1="2.6", logv1=1)),
('26_patch', dict(compat1="2.6.1", logv1=1)),
('old', dict(compat1="1.8", logv1=1)),
('old_patch', dict(compat1="1.8.1", logv1=1)),
]
restart_compat = [
('def2', dict(compat2='none', logv2=3)),
('31.2', dict(compat2="3.1", logv2=3)),
('31_patch2', dict(compat2="3.1.0", logv2=3)),
('30.2', dict(compat2="3.0", logv2=2)),
('31_2', dict(compat2="3.1", logv2=3)),
('30_2', dict(compat2="3.0", logv2=2)),
('30_patch2', dict(compat2="3.0.0", logv2=2)),
('26.2', dict(compat2="2.6", logv2=1)),
('26_patch2', dict(compat2="2.6.1", logv2=1)),
('26_2', dict(compat2="2.6", logv2=1)),
('old2', dict(compat2="1.8", logv2=1)),
('old_patch2', dict(compat2="1.8.1", logv2=1)),
]
Expand All @@ -91,7 +88,7 @@ def conn_config(self):
self.pr("Conn config:" + log_str + compat_str)
return log_str + compat_str

def check_prev_lsn(self, conn_close, prev_lsn_count):
def check_prev_lsn(self, exists, conn_close):
#
# Run printlog and look for the prev_lsn log record. Verify its
# existence with the passed in expected result. We don't use
Expand All @@ -100,30 +97,30 @@ def check_prev_lsn(self, conn_close, prev_lsn_count):
# the entire file if needed and set a boolean for comparison.
#
self.runWt(['printlog'], outfilename='printlog.out', closeconn=conn_close)
pstr = str(prev_lsn_count)
self.pr("CHECK PREV: Looking for " + pstr + " prev LSN entries")
contains = 0
contains = False
with open('printlog.out') as logfile:
for line in logfile:
if 'optype' in line and 'prev_lsn' in line:
contains += 1
self.assertEqual(prev_lsn_count, contains)
contains = True
break
self.assertEqual(exists, contains)

def check_log(self, reconfig):
orig_logs = fnmatch.filter(os.listdir('.'), "*gerLog*")
compat_str = self.make_compat_str(False)
if self.logv1 >= self.min_logv:
prev_lsn_logs = len(orig_logs)
else:
prev_lsn_logs = 0
pstr = str(prev_lsn_logs)
self.pr("CHECK LOG: Orig " + pstr + " prev LSN log files")

if not reconfig:
#
# Close and open the connection to force recovery and reset the
# compatibility string on startup.
# Close and open the connection to force recovery and log archiving
# even if archive is turned off (in some circumstances). If we are
# downgrading we must archive newer logs. Verify the log files
# have or have not been archived.
#
exist = True
if self.logv1 < self.min_logv:
exist = False
self.check_prev_lsn(exist, True)

self.conn.close()
log_str = 'log=(enabled,file_max=%s,archive=false),' % self.logmax
restart_config = log_str + compat_str
Expand All @@ -134,31 +131,30 @@ def check_log(self, reconfig):
conn = self.wiredtiger_open('.', restart_config)
conn.close()
check_close = False
#
# If the version was upgraded we'll see a new log file containing
# the new log record no matter what the original setting was.
#
if self.logv2 > 1:
prev_lsn_logs += 1
else:
self.pr("Reconfigure: " + compat_str)
self.conn.reconfigure(compat_str)
check_close = True

#
# If we're reconfiguring, we'll see another new log file
# when transitioning between log version numbers. Staying
# at the same version has no effect. We'll only see another
# new log file with the prevlsn if the new version supports it.
# Archiving is turned off explicitly.
#
if self.logv1 != self.logv2 and self.logv2 >= self.min_logv:
prev_lsn_logs += 1
# Check logs. The original logs should have been archived only if
# we downgraded. In all other cases the original logs should be there.
# Downgrade means not running the latest possible log version, not
# the difference between original and current.
cur_logs = fnmatch.filter(os.listdir('.'), "*Log*")
log_present = True
if self.logv1 != self.logv2 and self.logv2 != self.latest_logv:
log_present = False
for o in orig_logs:
self.assertEqual(log_present, o in cur_logs)

# Run printlog and verify the new record does or does not exist.
# Need to check count of log files that should and should not have
# the prev_lsn record based on the count of log files that exist
# before and after. Pass that into this function and check the
# number of prev_lsn records we see.
self.check_prev_lsn(check_close, prev_lsn_logs)
exist = True
if self.logv2 < self.min_logv:
exist = False
self.check_prev_lsn(exist, check_close)

def run_test(self, reconfig):
# If reconfiguring with the empty string there is nothing to do.
Expand Down
6 changes: 3 additions & 3 deletions test/suite/test_compat02.py
Expand Up @@ -49,9 +49,9 @@ class test_compat02(wttest.WiredTigerTestCase, suite_subprocess):
min_logv = 2

# Test detecting a not-yet-existing log version. This should
# hold us for a couple years.
future_logv = 5
future_rel = "5.0"
# hold us for many years.
future_logv = 20
future_rel = "20.0"

# The API uses only the major and minor numbers but accepts with
# and without the patch number. Test one on release and the
Expand Down
6 changes: 3 additions & 3 deletions test/suite/test_compat03.py
Expand Up @@ -49,9 +49,9 @@ class test_compat03(wttest.WiredTigerTestCase, suite_subprocess):
min_logv = 2

# Test detecting a not-yet-existing log version. This should
# hold us for a couple years.
future_logv = 5
future_rel = "5.0"
# hold us for many years.
future_logv = 20
future_rel = "20.0"

# The API uses only the major and minor numbers but accepts with
# and without the patch number. Test one on release and the
Expand Down

0 comments on commit 58713fe

Please sign in to comment.