Skip to content

Commit

Permalink
WT-3034 Add support for named snapshots including updates. (#3161)
Browse files Browse the repository at this point in the history
This supports a model where one session performs updates in a transaction after creating a named snapshot and other sessions can use that snapshot and read the updates.  In other words, they see exactly what the updating session sees.
  • Loading branch information
michaelcahill committed Dec 5, 2016
1 parent 2573977 commit 2f18a85
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 9 deletions.
4 changes: 4 additions & 0 deletions dist/api_data.py
Expand Up @@ -1119,6 +1119,10 @@ def __cmp__(self, other):
Config('to', '', r'''
drop all snapshots up to and including the specified name'''),
]),
Config('include_updates', 'false', r'''
make updates from the current transaction visible to users of the
named snapshot. Transactions started with such a named snapshot are
restricted to being read-only''', type='boolean'),
Config('name', '', r'''specify a name for the snapshot'''),
]),

Expand Down
5 changes: 3 additions & 2 deletions src/config/config_def.c
Expand Up @@ -371,6 +371,7 @@ static const WT_CONFIG_CHECK confchk_WT_SESSION_snapshot[] = {
{ "drop", "category",
NULL, NULL,
confchk_WT_SESSION_snapshot_drop_subconfigs, 4 },
{ "include_updates", "boolean", NULL, NULL, NULL, 0 },
{ "name", "string", NULL, NULL, NULL, 0 },
{ NULL, NULL, NULL, NULL, NULL, 0 }
};
Expand Down Expand Up @@ -1142,8 +1143,8 @@ static const WT_CONFIG_ENTRY config_entries[] = {
confchk_WT_SESSION_salvage, 1
},
{ "WT_SESSION.snapshot",
"drop=(all=false,before=,names=,to=),name=",
confchk_WT_SESSION_snapshot, 2
"drop=(all=false,before=,names=,to=),include_updates=false,name=",
confchk_WT_SESSION_snapshot, 3
},
{ "WT_SESSION.strerror",
"",
Expand Down
2 changes: 1 addition & 1 deletion src/include/txn.h
Expand Up @@ -62,7 +62,7 @@ struct __wt_named_snapshot {

TAILQ_ENTRY(__wt_named_snapshot) q;

uint64_t pinned_id, snap_min, snap_max;
uint64_t id, pinned_id, snap_min, snap_max;
uint64_t *snapshot;
uint32_t snapshot_count;
};
Expand Down
4 changes: 4 additions & 0 deletions src/include/wiredtiger.in
Expand Up @@ -1641,6 +1641,10 @@ struct __wt_session {
* including the specified name., a string; default empty.}
* @config{
* ),,}
* @config{include_updates, make updates from the current transaction
* visible to users of the named snapshot. Transactions started with
* such a named snapshot are restricted to being read-only., a boolean
* flag; default \c false.}
* @config{name, specify a name for the snapshot., a string; default
* empty.}
* @configend
Expand Down
31 changes: 27 additions & 4 deletions src/txn/txn_nsnap.c
Expand Up @@ -152,26 +152,45 @@ __wt_txn_named_snapshot_begin(WT_SESSION_IMPL *session, const char *cfg[])
const char *txn_cfg[] =
{ WT_CONFIG_BASE(session, WT_SESSION_begin_transaction),
"isolation=snapshot", NULL };
bool started_txn;
bool include_updates, started_txn;

started_txn = false;
nsnap_new = NULL;
txn_global = &S2C(session)->txn_global;
txn = &session->txn;

WT_RET(__wt_config_gets_def(session, cfg, "include_updates", 0, &cval));
include_updates = cval.val != 0;

WT_RET(__wt_config_gets_def(session, cfg, "name", 0, &cval));
WT_ASSERT(session, cval.len != 0);

if (!F_ISSET(txn, WT_TXN_RUNNING)) {
if (include_updates)
WT_RET_MSG(session, EINVAL, "A transaction must be "
"running to include updates in a named snapshot");

WT_RET(__wt_txn_begin(session, txn_cfg));
started_txn = true;
}
F_SET(txn, WT_TXN_READONLY);
if (!include_updates)
F_SET(txn, WT_TXN_READONLY);

/* Save a copy of the transaction's snapshot. */
WT_ERR(__wt_calloc_one(session, &nsnap_new));
nsnap = nsnap_new;
WT_ERR(__wt_strndup(session, cval.str, cval.len, &nsnap->name));

/*
* To include updates from a writing transaction, make sure a
* transaction ID has been allocated.
*/
if (include_updates) {
WT_ERR(__wt_txn_id_check(session));
WT_ASSERT(session, txn->id != WT_TXN_NONE);
nsnap->id = txn->id;
} else
nsnap->id = WT_TXN_NONE;
nsnap->pinned_id = WT_SESSION_TXN_STATE(session)->pinned_id;
nsnap->snap_min = txn->snap_min;
nsnap->snap_max = txn->snap_max;
Expand Down Expand Up @@ -209,8 +228,7 @@ err: if (started_txn) {
WT_TRET(__wt_txn_rollback(session, NULL));
WT_DIAGNOSTIC_YIELD;
WT_ASSERT(session, !__wt_txn_visible_all(session, pinned_id));
} else if (ret == 0)
F_SET(txn, WT_TXN_NAMED_SNAPSHOT);
}

if (nsnap_new != NULL)
__nsnap_destroy(session, nsnap_new);
Expand Down Expand Up @@ -303,6 +321,11 @@ __wt_txn_named_snapshot_get(WT_SESSION_IMPL *session, WT_CONFIG_ITEM *nameval)
memcpy(txn->snapshot, nsnap->snapshot,
nsnap->snapshot_count *
sizeof(*nsnap->snapshot));
if (nsnap->id != WT_TXN_NONE) {
WT_ASSERT(session, txn->id == WT_TXN_NONE);
txn->id = nsnap->id;
F_SET(txn, WT_TXN_READONLY);
}
F_SET(txn, WT_TXN_HAS_SNAPSHOT);
break;
}
Expand Down
37 changes: 35 additions & 2 deletions test/suite/test_nsnap04.py
Expand Up @@ -38,14 +38,18 @@ class test_nsnap04(wttest.WiredTigerTestCase, suite_subprocess):
uri = 'table:' + tablename
nrows_per_itr = 10

def check_named_snapshot(self, snapshot, expected):
def check_named_snapshot(self, snapshot, expected, skip_snapshot=False):
new_session = self.conn.open_session()
c = new_session.open_cursor(self.uri)
new_session.begin_transaction("snapshot=" + str(snapshot))
if skip_snapshot:
new_session.begin_transaction()
else:
new_session.begin_transaction("snapshot=" + str(snapshot))
count = 0
for row in c:
count += 1
new_session.commit_transaction()
new_session.close()
# print "Checking snapshot %d, expect %d, found %d" % (snapshot, expected, count)
self.assertEqual(count, expected)

Expand Down Expand Up @@ -80,5 +84,34 @@ def test_named_snapshots(self):
self.session.snapshot("name=0")
self.check_named_snapshot(0, 2 * self.nrows_per_itr)

def test_include_updates(self):
# Populate a table
end = start = 0
SimpleDataSet(self, self.uri, 0, key_format='i').populate()

snapshots = []
c = self.session.open_cursor(self.uri)
for i in xrange(self.nrows_per_itr):
c[i] = "some value"

self.session.begin_transaction("isolation=snapshot")
count = 0
for row in c:
count += 1
self.session.snapshot("name=0,include_updates=true")

self.check_named_snapshot(0, self.nrows_per_itr)

# Insert some more content using the active session.
for i in xrange(self.nrows_per_itr):
c[self.nrows_per_itr + i] = "some value"

self.check_named_snapshot(0, 2 * self.nrows_per_itr)
# Ensure transactions not tracking the snapshot don't see the updates
self.check_named_snapshot(0, self.nrows_per_itr, skip_snapshot=True)
self.session.commit_transaction()
# Ensure content is visible to non-snapshot transactions after commit
self.check_named_snapshot(0, 2 * self.nrows_per_itr, skip_snapshot=True)

if __name__ == '__main__':
wttest.run()

0 comments on commit 2f18a85

Please sign in to comment.