Skip to content

Commit

Permalink
Add the "snapshot too old" feature
Browse files Browse the repository at this point in the history
This feature is controlled by a new old_snapshot_threshold GUC.  A
value of -1 disables the feature, and that is the default.  The
value of 0 is intended only for testing.  Above that it is the
age, in minutes, that a snapshot can reach before pruning and vacuum
are allowed to remove dead tuples which the snapshot would
otherwise protect.  The xmin associated with a transaction ID does
still protect dead tuples.  A connection which is using an "old"
snapshot does not get an error unless it accesses a page modified
recently enough that it might not be able to produce accurate
results.

This is similar to the Oracle feature, and we use the same SQLSTATE
and error message for compatibility.
  • Loading branch information
kgrittn committed Apr 8, 2016
1 parent 8b65cf4 commit 848ef42
Show file tree
Hide file tree
Showing 41 changed files with 942 additions and 85 deletions.
3 changes: 2 additions & 1 deletion contrib/bloom/blscan.c
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ blgetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
blkno, RBM_NORMAL, bas);

LockBuffer(buffer, BUFFER_LOCK_SHARE);
page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
page = BufferGetPage(buffer, scan->xs_snapshot, scan->indexRelation,
BGP_TEST_FOR_OLD_SNAPSHOT);

if (!BloomPageIsDeleted(page))
{
Expand Down
50 changes: 50 additions & 0 deletions doc/src/sgml/config.sgml
Original file line number Diff line number Diff line change
Expand Up @@ -2041,6 +2041,42 @@ include_dir 'conf.d'
</para>
</listitem>
</varlistentry>

<varlistentry id="guc-old-snapshot-threshold" xreflabel="old_snapshot_threshold">
<term><varname>old_snapshot_threshold</varname> (<type>integer</type>)
<indexterm>
<primary><varname>old_snapshot_threshold</> configuration parameter</primary>
</indexterm>
</term>
<listitem>
<para>
Sets the minimum time that a snapshot can be used without risk of a
<literal>snapshot too old</> error occurring when using the snapshot.
This parameter can only be set at server start.
</para>

<para>
Beyond the threshold, old data may be vacuumed away. This can help
prevent bloat in the face of snapshots which remain in use for a
long time. To prevent incorrect results due to cleanup of data which
would otherwise be visible to the snapshot, an error is generated
when the snapshot is older than this threshold and the snapshot is
used to read a page which has been modified since the snapshot was
built.
</para>

<para>
A value of <literal>-1</> disables this feature, and is the default.
Useful values for production work probably range from a small number
of hours to a few days. The setting will be coerced to a granularity
of minutes, and small numbers (such as <literal>0</> or
<literal>1min</>) are only allowed because they may sometimes be
useful for testing. While a setting as high as <literal>60d</> is
allowed, please note that in many workloads extreme bloat or
transaction ID wraparound may occur in much shorter time frames.
</para>
</listitem>
</varlistentry>
</variablelist>
</sect2>
</sect1>
Expand Down Expand Up @@ -3051,6 +3087,10 @@ include_dir 'conf.d'
You should also consider setting <varname>hot_standby_feedback</>
on standby server(s) as an alternative to using this parameter.
</para>
<para>
This does not prevent cleanup of dead rows which have reached the age
specified by <varname>old_snapshot_threshold</>.
</para>
</listitem>
</varlistentry>

Expand Down Expand Up @@ -3198,6 +3238,16 @@ include_dir 'conf.d'
until it eventually reaches the primary. Standbys make no use of the
feedback they receive other than to pass it upstream.
</para>
<para>
This setting does not override the behavior of
<varname>old_snapshot_threshold</> on the primary; a snapshot on the
standby which exceeds the primary's age threshold can become invalid,
resulting in cancellation of transactions on the standby. This is
because <varname>old_snapshot_threshold</> is intended to provide an
absolute limit on the time for which dead rows can contribute to bloat,
a limit which would otherwise be violated because of the configuration
of a standby.
</para>
</listitem>
</varlistentry>

Expand Down
19 changes: 11 additions & 8 deletions src/backend/access/brin/brin.c
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ brininsert(Relation idxRel, Datum *values, bool *nulls,
MemoryContext tupcxt = NULL;
MemoryContext oldcxt = NULL;

revmap = brinRevmapInitialize(idxRel, &pagesPerRange);
revmap = brinRevmapInitialize(idxRel, &pagesPerRange, NULL);

for (;;)
{
Expand All @@ -152,7 +152,7 @@ brininsert(Relation idxRel, Datum *values, bool *nulls,
/* normalize the block number to be the first block in the range */
heapBlk = (heapBlk / pagesPerRange) * pagesPerRange;
brtup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off, NULL,
BUFFER_LOCK_SHARE);
BUFFER_LOCK_SHARE, NULL);

/* if range is unsummarized, there's nothing to do */
if (!brtup)
Expand Down Expand Up @@ -285,7 +285,8 @@ brinbeginscan(Relation r, int nkeys, int norderbys)
scan = RelationGetIndexScan(r, nkeys, norderbys);

opaque = (BrinOpaque *) palloc(sizeof(BrinOpaque));
opaque->bo_rmAccess = brinRevmapInitialize(r, &opaque->bo_pagesPerRange);
opaque->bo_rmAccess = brinRevmapInitialize(r, &opaque->bo_pagesPerRange,
scan->xs_snapshot);
opaque->bo_bdesc = brin_build_desc(r);
scan->opaque = opaque;

Expand Down Expand Up @@ -368,7 +369,8 @@ bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
MemoryContextResetAndDeleteChildren(perRangeCxt);

tup = brinGetTupleForHeapBlock(opaque->bo_rmAccess, heapBlk, &buf,
&off, &size, BUFFER_LOCK_SHARE);
&off, &size, BUFFER_LOCK_SHARE,
scan->xs_snapshot);
if (tup)
{
tup = brin_copy_tuple(tup, size);
Expand Down Expand Up @@ -647,7 +649,7 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
/*
* Initialize our state, including the deformed tuple state.
*/
revmap = brinRevmapInitialize(index, &pagesPerRange);
revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
state = initialize_brin_buildstate(index, revmap, pagesPerRange);

/*
Expand Down Expand Up @@ -1045,7 +1047,8 @@ summarize_range(IndexInfo *indexInfo, BrinBuildState *state, Relation heapRel,
* the same.)
*/
phtup = brinGetTupleForHeapBlock(state->bs_rmAccess, heapBlk, &phbuf,
&offset, &phsz, BUFFER_LOCK_SHARE);
&offset, &phsz, BUFFER_LOCK_SHARE,
NULL);
/* the placeholder tuple must exist */
if (phtup == NULL)
elog(ERROR, "missing placeholder tuple");
Expand Down Expand Up @@ -1080,7 +1083,7 @@ brinsummarize(Relation index, Relation heapRel, double *numSummarized,
BlockNumber pagesPerRange;
Buffer buf;

revmap = brinRevmapInitialize(index, &pagesPerRange);
revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);

/*
* Scan the revmap to find unsummarized items.
Expand All @@ -1095,7 +1098,7 @@ brinsummarize(Relation index, Relation heapRel, double *numSummarized,
CHECK_FOR_INTERRUPTS();

tup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off, NULL,
BUFFER_LOCK_SHARE);
BUFFER_LOCK_SHARE, NULL);
if (tup == NULL)
{
/* no revmap entry for this heap range. Summarize it. */
Expand Down
11 changes: 7 additions & 4 deletions src/backend/access/brin/brin_revmap.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ static void revmap_physical_extend(BrinRevmap *revmap);
* brinRevmapTerminate when caller is done with it.
*/
BrinRevmap *
brinRevmapInitialize(Relation idxrel, BlockNumber *pagesPerRange)
brinRevmapInitialize(Relation idxrel, BlockNumber *pagesPerRange,
Snapshot snapshot)
{
BrinRevmap *revmap;
Buffer meta;
Expand All @@ -77,7 +78,7 @@ brinRevmapInitialize(Relation idxrel, BlockNumber *pagesPerRange)

meta = ReadBuffer(idxrel, BRIN_METAPAGE_BLKNO);
LockBuffer(meta, BUFFER_LOCK_SHARE);
page = BufferGetPage(meta, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
page = BufferGetPage(meta, snapshot, idxrel, BGP_TEST_FOR_OLD_SNAPSHOT);
metadata = (BrinMetaPageData *) PageGetContents(page);

revmap = palloc(sizeof(BrinRevmap));
Expand Down Expand Up @@ -187,7 +188,8 @@ brinSetHeapBlockItemptr(Buffer buf, BlockNumber pagesPerRange,
*/
BrinTuple *
brinGetTupleForHeapBlock(BrinRevmap *revmap, BlockNumber heapBlk,
Buffer *buf, OffsetNumber *off, Size *size, int mode)
Buffer *buf, OffsetNumber *off, Size *size, int mode,
Snapshot snapshot)
{
Relation idxRel = revmap->rm_irel;
BlockNumber mapBlk;
Expand Down Expand Up @@ -264,7 +266,8 @@ brinGetTupleForHeapBlock(BrinRevmap *revmap, BlockNumber heapBlk,
*buf = ReadBuffer(idxRel, blk);
}
LockBuffer(*buf, mode);
page = BufferGetPage(*buf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
page = BufferGetPage(*buf, snapshot, idxRel,
BGP_TEST_FOR_OLD_SNAPSHOT);

/* If we land on a revmap page, start over */
if (BRIN_IS_REGULAR_PAGE(page))
Expand Down
9 changes: 5 additions & 4 deletions src/backend/access/gin/ginbtree.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ ginTraverseLock(Buffer buffer, bool searchMode)
* is share-locked, and stack->parent is NULL.
*/
GinBtreeStack *
ginFindLeafPage(GinBtree btree, bool searchMode)
ginFindLeafPage(GinBtree btree, bool searchMode, Snapshot snapshot)
{
GinBtreeStack *stack;

Expand All @@ -89,7 +89,8 @@ ginFindLeafPage(GinBtree btree, bool searchMode)

stack->off = InvalidOffsetNumber;

page = BufferGetPage(stack->buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
page = BufferGetPage(stack->buffer, snapshot, btree->index,
BGP_TEST_FOR_OLD_SNAPSHOT);

access = ginTraverseLock(stack->buffer, searchMode);

Expand All @@ -115,8 +116,8 @@ ginFindLeafPage(GinBtree btree, bool searchMode)

stack->buffer = ginStepRight(stack->buffer, btree->index, access);
stack->blkno = rightlink;
page = BufferGetPage(stack->buffer, NULL, NULL,
BGP_NO_SNAPSHOT_TEST);
page = BufferGetPage(stack->buffer, snapshot, btree->index,
BGP_TEST_FOR_OLD_SNAPSHOT);

if (!searchMode && GinPageIsIncompleteSplit(page))
ginFinishSplit(btree, stack, false, NULL);
Expand Down
7 changes: 4 additions & 3 deletions src/backend/access/gin/gindatapage.c
Original file line number Diff line number Diff line change
Expand Up @@ -1820,7 +1820,7 @@ ginInsertItemPointers(Relation index, BlockNumber rootBlkno,
{
/* search for the leaf page where the first item should go to */
btree.itemptr = insertdata.items[insertdata.curitem];
stack = ginFindLeafPage(&btree, false);
stack = ginFindLeafPage(&btree, false, NULL);

ginInsertValue(&btree, stack, &insertdata, buildStats);
}
Expand All @@ -1830,15 +1830,16 @@ ginInsertItemPointers(Relation index, BlockNumber rootBlkno,
* Starts a new scan on a posting tree.
*/
GinBtreeStack *
ginScanBeginPostingTree(GinBtree btree, Relation index, BlockNumber rootBlkno)
ginScanBeginPostingTree(GinBtree btree, Relation index, BlockNumber rootBlkno,
Snapshot snapshot)
{
GinBtreeStack *stack;

ginPrepareDataScan(btree, index, rootBlkno);

btree->fullScan = TRUE;

stack = ginFindLeafPage(btree, TRUE);
stack = ginFindLeafPage(btree, TRUE, snapshot);

return stack;
}
22 changes: 12 additions & 10 deletions src/backend/access/gin/ginget.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ scanPostingTree(Relation index, GinScanEntry scanEntry,
Page page;

/* Descend to the leftmost leaf page */
stack = ginScanBeginPostingTree(&btree, index, rootPostingTree);
stack = ginScanBeginPostingTree(&btree, index, rootPostingTree, snapshot);
buffer = stack->buffer;
IncrBufferRefCount(buffer); /* prevent unpin in freeGinBtreeStack */

Expand Down Expand Up @@ -146,7 +146,8 @@ collectMatchBitmap(GinBtreeData *btree, GinBtreeStack *stack,
if (moveRightIfItNeeded(btree, stack) == false)
return true;

page = BufferGetPage(stack->buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
page = BufferGetPage(stack->buffer, snapshot, btree->index,
BGP_TEST_FOR_OLD_SNAPSHOT);
itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, stack->off));

/*
Expand Down Expand Up @@ -320,7 +321,7 @@ startScanEntry(GinState *ginstate, GinScanEntry entry, Snapshot snapshot)
ginPrepareEntryScan(&btreeEntry, entry->attnum,
entry->queryKey, entry->queryCategory,
ginstate);
stackEntry = ginFindLeafPage(&btreeEntry, true);
stackEntry = ginFindLeafPage(&btreeEntry, true, snapshot);
page = BufferGetPage(stackEntry->buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
needUnlock = TRUE;

Expand Down Expand Up @@ -385,7 +386,7 @@ startScanEntry(GinState *ginstate, GinScanEntry entry, Snapshot snapshot)
needUnlock = FALSE;

stack = ginScanBeginPostingTree(&entry->btree, ginstate->index,
rootPostingTree);
rootPostingTree, snapshot);
entry->buffer = stack->buffer;

/*
Expand Down Expand Up @@ -627,7 +628,7 @@ entryLoadMoreItems(GinState *ginstate, GinScanEntry entry,
entry->btree.itemptr.ip_posid++;
}
entry->btree.fullScan = false;
stack = ginFindLeafPage(&entry->btree, true);
stack = ginFindLeafPage(&entry->btree, true, snapshot);

/* we don't need the stack, just the buffer. */
entry->buffer = stack->buffer;
Expand Down Expand Up @@ -1335,8 +1336,8 @@ scanGetCandidate(IndexScanDesc scan, pendingPosition *pos)
ItemPointerSetInvalid(&pos->item);
for (;;)
{
page = BufferGetPage(pos->pendingBuffer, NULL,
NULL, BGP_NO_SNAPSHOT_TEST);
page = BufferGetPage(pos->pendingBuffer, scan->xs_snapshot,
scan->indexRelation, BGP_TEST_FOR_OLD_SNAPSHOT);

maxoff = PageGetMaxOffsetNumber(page);
if (pos->firstOffset > maxoff)
Expand Down Expand Up @@ -1516,8 +1517,8 @@ collectMatchesForHeapRow(IndexScanDesc scan, pendingPosition *pos)
memset(datumExtracted + pos->firstOffset - 1, 0,
sizeof(bool) * (pos->lastOffset - pos->firstOffset));

page = BufferGetPage(pos->pendingBuffer, NULL,
NULL, BGP_NO_SNAPSHOT_TEST);
page = BufferGetPage(pos->pendingBuffer, scan->xs_snapshot,
scan->indexRelation, BGP_TEST_FOR_OLD_SNAPSHOT);

for (i = 0; i < so->nkeys; i++)
{
Expand Down Expand Up @@ -1710,7 +1711,8 @@ scanPendingInsert(IndexScanDesc scan, TIDBitmap *tbm, int64 *ntids)
*ntids = 0;

LockBuffer(metabuffer, GIN_SHARE);
page = BufferGetPage(metabuffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
page = BufferGetPage(metabuffer, scan->xs_snapshot, scan->indexRelation,
BGP_TEST_FOR_OLD_SNAPSHOT);
blkno = GinPageGetMeta(page)->head;

/*
Expand Down
2 changes: 1 addition & 1 deletion src/backend/access/gin/gininsert.c
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ ginEntryInsert(GinState *ginstate,

ginPrepareEntryScan(&btree, attnum, key, category, ginstate);

stack = ginFindLeafPage(&btree, false);
stack = ginFindLeafPage(&btree, false, NULL);
page = BufferGetPage(stack->buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);

if (btree.findItem(&btree, stack))
Expand Down
2 changes: 1 addition & 1 deletion src/backend/access/gist/gistget.c
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ gistScanPage(IndexScanDesc scan, GISTSearchItem *pageItem, double *myDistances,
buffer = ReadBuffer(scan->indexRelation, pageItem->blkno);
LockBuffer(buffer, GIST_SHARE);
gistcheckpage(scan->indexRelation, buffer);
page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
page = BufferGetPage(buffer, scan->xs_snapshot, r, BGP_TEST_FOR_OLD_SNAPSHOT);
opaque = GistPageGetOpaque(page);

/*
Expand Down
3 changes: 2 additions & 1 deletion src/backend/access/hash/hash.c
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,8 @@ hashgettuple(IndexScanDesc scan, ScanDirection dir)

buf = so->hashso_curbuf;
Assert(BufferIsValid(buf));
page = BufferGetPage(buf, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
page = BufferGetPage(buf, scan->xs_snapshot, rel,
BGP_TEST_FOR_OLD_SNAPSHOT);
maxoffnum = PageGetMaxOffsetNumber(page);
for (offnum = ItemPointerGetOffsetNumber(current);
offnum <= maxoffnum;
Expand Down
10 changes: 6 additions & 4 deletions src/backend/access/hash/hashsearch.c
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,8 @@ _hash_first(IndexScanDesc scan, ScanDirection dir)

/* Read the metapage */
metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ, LH_META_PAGE);
page = BufferGetPage(metabuf, NULL, NULL,
BGP_NO_SNAPSHOT_TEST);
page = BufferGetPage(metabuf, scan->xs_snapshot, rel,
BGP_TEST_FOR_OLD_SNAPSHOT);
metap = HashPageGetMeta(page);

/*
Expand Down Expand Up @@ -242,8 +242,8 @@ _hash_first(IndexScanDesc scan, ScanDirection dir)

/* Fetch the primary bucket page for the bucket */
buf = _hash_getbuf(rel, blkno, HASH_READ, LH_BUCKET_PAGE);
page = BufferGetPage(buf, NULL, NULL,
BGP_NO_SNAPSHOT_TEST);
page = BufferGetPage(buf, scan->xs_snapshot, rel,
BGP_TEST_FOR_OLD_SNAPSHOT);
opaque = (HashPageOpaque) PageGetSpecialPointer(page);
Assert(opaque->hasho_bucket == bucket);

Expand Down Expand Up @@ -350,6 +350,7 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir)
_hash_readnext(rel, &buf, &page, &opaque);
if (BufferIsValid(buf))
{
TestForOldSnapshot(scan->xs_snapshot, rel, page);
maxoff = PageGetMaxOffsetNumber(page);
offnum = _hash_binsearch(page, so->hashso_sk_hash);
}
Expand Down Expand Up @@ -391,6 +392,7 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir)
_hash_readprev(rel, &buf, &page, &opaque);
if (BufferIsValid(buf))
{
TestForOldSnapshot(scan->xs_snapshot, rel, page);
maxoff = PageGetMaxOffsetNumber(page);
offnum = _hash_binsearch_last(page, so->hashso_sk_hash);
}
Expand Down

0 comments on commit 848ef42

Please sign in to comment.