Skip to content

Commit

Permalink
Merge pull request #2035 from wiredtiger/col-var-ovfl-track-txn
Browse files Browse the repository at this point in the history
  • Loading branch information
agorrod committed Jul 1, 2015
2 parents 0925658 + 8711c3a commit cd1704d
Show file tree
Hide file tree
Showing 9 changed files with 129 additions and 85 deletions.
12 changes: 12 additions & 0 deletions src/btree/bt_sync.c
Expand Up @@ -189,6 +189,18 @@ __sync_file(WT_SESSION_IMPL *session, int syncop)
__wt_txn_release_snapshot(session);

if (btree->checkpointing) {
/*
* Update the checkpoint generation for this handle so visible
* updates newer than the checkpoint can be evicted.
*
* This has to be published before eviction is enabled again,
* so that eviction knows that the checkpoint has completed.
*/
WT_PUBLISH(btree->checkpoint_gen,
S2C(session)->txn_global.checkpoint_gen);
WT_STAT_FAST_DATA_SET(session,
btree_checkpoint_generation, btree->checkpoint_gen);

/*
* Clear the checkpoint flag and push the change; not required,
* but publishing the change means stalled eviction gets moving
Expand Down
2 changes: 1 addition & 1 deletion src/evict/evict_lru.c
Expand Up @@ -1475,7 +1475,7 @@ __wt_cache_eviction_worker(WT_SESSION_IMPL *session, int busy, int pct_full)
* to make sure there is free space in the cache.
*/
txn_global = &conn->txn_global;
txn_state = &txn_global->states[session->id];
txn_state = WT_SESSION_TXN_STATE(session);
txn_busy = txn_state->id != WT_TXN_NONE ||
session->nhazard > 0 ||
(txn_state->snap_min != WT_TXN_NONE &&
Expand Down
3 changes: 3 additions & 0 deletions src/evict/evict_page.c
Expand Up @@ -59,6 +59,9 @@ __wt_evict(WT_SESSION_IMPL *session, WT_REF *ref, int closing)

conn = S2C(session);

/* Checkpoints should never do eviction. */
WT_ASSERT(session, !WT_SESSION_IS_CHECKPOINT(session));

page = ref->page;
forced_eviction = page->read_gen == WT_READGEN_OLDEST;
inmem_split = 0;
Expand Down
5 changes: 5 additions & 0 deletions src/include/btmem.h
Expand Up @@ -195,6 +195,11 @@ struct __wt_page_modify {
/* The largest update transaction ID (approximate). */
uint64_t update_txn;

#ifdef HAVE_DIAGNOSTIC
/* Check that transaction time moves forward. */
uint64_t last_oldest_id;
#endif

/* Dirty bytes added to the cache. */
size_t bytes_dirty;

Expand Down
5 changes: 4 additions & 1 deletion src/include/txn.h
Expand Up @@ -25,6 +25,9 @@

/*
 * WT_SESSION_TXN_STATE --
 *	Return a pointer to the entry for session "s" in the connection's
 * global transaction state array (txn_global.states), indexed by the
 * session's ID.
 *
 * The argument is expanded twice (once by S2C, once for the ID), so it
 * should not be an expression with side effects.
 */
#define WT_SESSION_TXN_STATE(s) (&S2C(s)->txn_global.states[(s)->id])

/*
 * WT_SESSION_IS_CHECKPOINT --
 *	Evaluate to non-zero when session "s" has a non-zero ID that matches
 * the global checkpoint_id (the checkpoint's session ID).
 *
 * A session ID of zero never matches: txn_global.checkpoint_id is reset to
 * zero when the checkpoint transaction is released, so zero doubles as the
 * "no checkpoint running" value.
 *
 * The argument is expanded three times, so it should not be an expression
 * with side effects.
 */
#define WT_SESSION_IS_CHECKPOINT(s) \
((s)->id != 0 && (s)->id == S2C(s)->txn_global.checkpoint_id)

struct __wt_named_snapshot {
const char *name;

Expand Down Expand Up @@ -64,7 +67,7 @@ struct __wt_txn_global {
*/
volatile uint32_t checkpoint_id; /* Checkpoint's session ID */
volatile uint64_t checkpoint_gen;
volatile uint64_t checkpoint_snap_min;
volatile uint64_t checkpoint_pinned;

/* Named snapshot state. */
WT_RWLOCK *nsnap_rwlock;
Expand Down
30 changes: 13 additions & 17 deletions src/include/txn.i
Expand Up @@ -105,8 +105,8 @@ __wt_txn_oldest_id(WT_SESSION_IMPL *session)
{
WT_BTREE *btree;
WT_TXN_GLOBAL *txn_global;
uint64_t checkpoint_snap_min, oldest_id;
uint32_t checkpoint_id;
uint64_t checkpoint_pinned, oldest_id;
uint32_t checkpoint_gen;

txn_global = &S2C(session)->txn_global;
btree = S2BT_SAFE(session);
Expand All @@ -115,9 +115,9 @@ __wt_txn_oldest_id(WT_SESSION_IMPL *session)
* Take a local copy of these IDs in case they are updated while we are
* checking visibility.
*/
checkpoint_id = txn_global->checkpoint_id;
checkpoint_snap_min = txn_global->checkpoint_snap_min;
oldest_id = txn_global->oldest_id;
WT_ORDERED_READ(oldest_id, txn_global->oldest_id);
WT_ORDERED_READ(checkpoint_gen, txn_global->checkpoint_gen);
WT_ORDERED_READ(checkpoint_pinned, txn_global->checkpoint_pinned);

/*
* Checkpoint transactions often fall behind ordinary application
Expand All @@ -129,17 +129,13 @@ __wt_txn_oldest_id(WT_SESSION_IMPL *session)
* checkpoint, or this handle is up to date with the active checkpoint
* then it's safe to ignore the checkpoint ID in the visibility check.
*/
if (checkpoint_snap_min != WT_TXN_NONE &&
checkpoint_id != session->id && (btree == NULL ||
btree->checkpoint_gen != txn_global->checkpoint_gen) &&
WT_TXNID_LT(checkpoint_snap_min, oldest_id))
/*
* Use the checkpoint ID for the visibility check if it is the
* oldest ID in the system.
*/
oldest_id = checkpoint_snap_min;
if (checkpoint_pinned == WT_TXN_NONE ||
WT_TXNID_LT(oldest_id, checkpoint_pinned) ||
WT_SESSION_IS_CHECKPOINT(session) ||
(btree != NULL && btree->checkpoint_gen == checkpoint_gen))
return (oldest_id);

return (oldest_id);
return (checkpoint_pinned);
}

/*
Expand Down Expand Up @@ -355,7 +351,7 @@ __wt_txn_id_check(WT_SESSION_IMPL *session)
if (!F_ISSET(txn, WT_TXN_HAS_ID)) {
conn = S2C(session);
txn_global = &conn->txn_global;
txn_state = &txn_global->states[session->id];
txn_state = WT_SESSION_TXN_STATE(session);

WT_ASSERT(session, txn_state->id == WT_TXN_NONE);

Expand Down Expand Up @@ -447,7 +443,7 @@ __wt_txn_cursor_op(WT_SESSION_IMPL *session)

txn = &session->txn;
txn_global = &S2C(session)->txn_global;
txn_state = &txn_global->states[session->id];
txn_state = WT_SESSION_TXN_STATE(session);

/*
* If there is no transaction running (so we don't have an ID), and no
Expand Down
21 changes: 19 additions & 2 deletions src/reconcile/rec_write.c
Expand Up @@ -363,6 +363,19 @@ __wt_reconcile(WT_SESSION_IMPL *session,
WT_STAT_FAST_DATA_INCR(session, rec_pages_eviction);
}

#ifdef HAVE_DIAGNOSTIC
{
/*
* Check that transaction time always moves forward for a given page.
* If this check fails, reconciliation can free something that a future
* reconciliation will need.
*/
uint64_t oldest_id = __wt_txn_oldest_id(session);
WT_ASSERT(session, WT_TXNID_LE(mod->last_oldest_id, oldest_id));
mod->last_oldest_id = oldest_id;
}
#endif

/* Record the most recent transaction ID we will *not* write. */
mod->disk_snap_min = session->txn.snap_min;

Expand Down Expand Up @@ -838,6 +851,7 @@ static inline int
__rec_txn_read(WT_SESSION_IMPL *session, WT_RECONCILE *r,
WT_INSERT *ins, WT_ROW *rip, WT_CELL_UNPACK *vpack, WT_UPDATE **updp)
{
WT_DECL_RET;
WT_ITEM ovfl;
WT_PAGE *page;
WT_UPDATE *upd, *upd_list, *upd_ovfl;
Expand Down Expand Up @@ -976,8 +990,11 @@ __rec_txn_read(WT_SESSION_IMPL *session, WT_RECONCILE *r,
*/
if (vpack != NULL && vpack->raw == WT_CELL_VALUE_OVFL_RM &&
!__wt_txn_visible_all(session, min_txn)) {
WT_RET(__wt_ovfl_txnc_search(
page, vpack->data, vpack->size, &ovfl));
if ((ret = __wt_ovfl_txnc_search(
page, vpack->data, vpack->size, &ovfl)) != 0)
WT_PANIC_RET(session, ret,
"cached overflow item discarded early");

/*
* Create an update structure with an impossibly low transaction
* ID and append it to the update list we're about to save.
Expand Down
45 changes: 17 additions & 28 deletions src/txn/txn.c
Expand Up @@ -98,7 +98,6 @@ __wt_txn_release_snapshot(WT_SESSION_IMPL *session)
WT_ASSERT(session,
txn_state->snap_min == WT_TXN_NONE ||
session->txn.isolation == WT_ISO_READ_UNCOMMITTED ||
session->id == S2C(session)->txn_global.checkpoint_id ||
!__wt_txn_visible_all(session, txn_state->snap_min));

txn_state->snap_min = WT_TXN_NONE;
Expand All @@ -118,13 +117,13 @@ __wt_txn_get_snapshot(WT_SESSION_IMPL *session)
WT_TXN_STATE *s, *txn_state;
uint64_t current_id, id;
uint64_t prev_oldest_id, snap_min;
uint32_t ckpt_id, i, n, session_cnt;
uint32_t i, n, session_cnt;
int32_t count;

conn = S2C(session);
txn = &session->txn;
txn_global = &conn->txn_global;
txn_state = &txn_global->states[session->id];
txn_state = WT_SESSION_TXN_STATE(session);

current_id = snap_min = txn_global->current;
prev_oldest_id = txn_global->oldest_id;
Expand Down Expand Up @@ -157,12 +156,7 @@ __wt_txn_get_snapshot(WT_SESSION_IMPL *session)

/* Walk the array of concurrent transactions. */
WT_ORDERED_READ(session_cnt, conn->session_cnt);
ckpt_id = txn_global->checkpoint_id;
for (i = n = 0, s = txn_global->states; i < session_cnt; i++, s++) {
/* Skip the checkpoint transaction; it is never read from. */
if (i == ckpt_id)
continue;

/*
* Build our snapshot of any concurrent transaction IDs.
*
Expand Down Expand Up @@ -221,7 +215,7 @@ __wt_txn_update_oldest(WT_SESSION_IMPL *session, int force)
WT_TXN_GLOBAL *txn_global;
WT_TXN_STATE *s;
uint64_t current_id, id, oldest_id, prev_oldest_id, snap_min;
uint32_t ckpt_id, i, session_cnt;
uint32_t i, session_cnt;
int32_t count;
int last_running_moved;

Expand Down Expand Up @@ -257,12 +251,7 @@ __wt_txn_update_oldest(WT_SESSION_IMPL *session, int force)

/* Walk the array of concurrent transactions. */
WT_ORDERED_READ(session_cnt, conn->session_cnt);
ckpt_id = txn_global->checkpoint_id;
for (i = 0, s = txn_global->states; i < session_cnt; i++, s++) {
/* Skip the checkpoint transaction; it is never read from. */
if (i == ckpt_id)
continue;

/*
* Update the oldest ID.
*
Expand Down Expand Up @@ -310,15 +299,7 @@ __wt_txn_update_oldest(WT_SESSION_IMPL *session, int force)
if (WT_TXNID_LT(prev_oldest_id, oldest_id) &&
WT_ATOMIC_CAS4(txn_global->scan_count, 1, -1)) {
WT_ORDERED_READ(session_cnt, conn->session_cnt);
ckpt_id = txn_global->checkpoint_id;
for (i = 0, s = txn_global->states; i < session_cnt; i++, s++) {
/*
* Skip the checkpoint transaction; it is never read
* from.
*/
if (i == ckpt_id)
continue;

if ((id = s->id) != WT_TXN_NONE &&
WT_TXNID_LT(id, oldest_id))
oldest_id = id;
Expand Down Expand Up @@ -414,10 +395,17 @@ __wt_txn_release(WT_SESSION_IMPL *session)
txn->notify = NULL;

txn_global = &S2C(session)->txn_global;
txn_state = &txn_global->states[session->id];
txn_state = WT_SESSION_TXN_STATE(session);

/* Clear the transaction's ID from the global table. */
if (F_ISSET(txn, WT_TXN_HAS_ID)) {
if (WT_SESSION_IS_CHECKPOINT(session)) {
WT_ASSERT(session, txn_state->id == WT_TXN_NONE);
txn->id = WT_TXN_NONE;

/* Clear the global checkpoint transaction IDs. */
txn_global->checkpoint_id = 0;
txn_global->checkpoint_pinned = WT_TXN_NONE;
} else if (F_ISSET(txn, WT_TXN_HAS_ID)) {
WT_ASSERT(session, txn_state->id != WT_TXN_NONE &&
txn->id != WT_TXN_NONE);
WT_PUBLISH(txn_state->id, WT_TXN_NONE);
Expand Down Expand Up @@ -518,6 +506,7 @@ __wt_txn_commit(WT_SESSION_IMPL *session, const char *cfg[])
*/
__wt_txn_release_snapshot(session);
ret = __wt_txn_log_commit(session, cfg);
WT_ASSERT(session, ret == 0);
}

/*
Expand Down Expand Up @@ -648,19 +637,19 @@ __wt_txn_stats_update(WT_SESSION_IMPL *session)
WT_TXN_GLOBAL *txn_global;
WT_CONNECTION_IMPL *conn;
WT_CONNECTION_STATS *stats;
uint64_t checkpoint_snap_min;
uint64_t checkpoint_pinned;

conn = S2C(session);
txn_global = &conn->txn_global;
stats = &conn->stats;
checkpoint_snap_min = txn_global->checkpoint_snap_min;
checkpoint_pinned = txn_global->checkpoint_pinned;

WT_STAT_SET(stats, txn_pinned_range,
txn_global->current - txn_global->oldest_id);

WT_STAT_SET(stats, txn_pinned_checkpoint_range,
checkpoint_snap_min == WT_TXN_NONE ?
0 : txn_global->current - checkpoint_snap_min);
checkpoint_pinned == WT_TXN_NONE ?
0 : txn_global->current - checkpoint_pinned);
}

/*
Expand Down

0 comments on commit cd1704d

Please sign in to comment.