Skip to content

Commit

Permalink
Predicate locking in rum index
Browse files Browse the repository at this point in the history
  • Loading branch information
shubhambaraiss committed Aug 8, 2017
1 parent 10e2ea6 commit e28e31d
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 25 deletions.
14 changes: 14 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ endif
wal-check: temp-install
$(prove_check)

check: isolationcheck

submake-isolation:
$(MAKE) -C $(top_builddir)/src/test/isolation all
submake-rum:
$(MAKE) -C $(top_builddir)/contrib/rum


all: rum--1.1.sql

#9.6 requires 1.1 file but 10.0 could live with 1.0 + 1.0-1.1 files
Expand All @@ -48,3 +56,9 @@ install: installincludes

installincludes:
$(INSTALL_DATA) $(addprefix $(srcdir)/, $(INCLUDES)) '$(DESTDIR)$(includedir_server)/'

ISOLATIONCHECKS= predicate-rum predicate-rum-2
isolationcheck: | submake-isolation submake-rum temp-install
$(pg_isolation_regress_check) \
--temp-config $(top_srcdir)/contrib/rum/logical.conf \
$(ISOLATIONCHECKS)
2 changes: 2 additions & 0 deletions logical.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
wal_level = logical
max_replication_slots = 4
13 changes: 13 additions & 0 deletions src/rumbtree.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

#include "access/generic_xlog.h"
#include "miscadmin.h"
#include "storage/predicate.h"

#include "rum.h"

Expand Down Expand Up @@ -485,6 +486,14 @@ rumInsertValue(Relation index, RumBtree btree, RumBtreeStack * stack,
btree->fillRoot(btree, stack->buffer, lbuffer, rbuffer,
page, lpage, rpage);

PredicateLockPageSplit(btree->index,
BufferGetBlockNumber(stack->buffer),
BufferGetBlockNumber(lbuffer));

PredicateLockPageSplit(btree->index,
BufferGetBlockNumber(stack->buffer),
BufferGetBlockNumber(rbuffer));

if (btree->rumstate->isBuild)
{
START_CRIT_SECTION();
Expand Down Expand Up @@ -548,6 +557,10 @@ rumInsertValue(Relation index, RumBtree btree, RumBtreeStack * stack,
RumPageGetOpaque(rpage)->leftlink = BufferGetBlockNumber(stack->buffer);
RumPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);

PredicateLockPageSplit(btree->index,
BufferGetBlockNumber(stack->buffer),
BufferGetBlockNumber(rbuffer));

/*
* it's safe because we don't have right-to-left walking
* with locking bth pages except vacuum. But vacuum will
Expand Down
73 changes: 49 additions & 24 deletions src/rumget.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "rumsort.h"

#include "access/relscan.h"
#include "storage/predicate.h"
#include "miscadmin.h"
#include "utils/builtins.h"
#include "utils/datum.h"
Expand All @@ -29,7 +30,7 @@ static bool scanPage(RumState * rumstate, RumScanEntry entry, RumItem *item,
bool equalOk);
static void insertScanItem(RumScanOpaque so, bool recheck);
static int scan_entry_cmp(const void *p1, const void *p2, void *arg);
static void entryGetItem(RumState * rumstate, RumScanEntry entry, bool *nextEntryList);
static void entryGetItem(RumState * rumstate, RumScanEntry entry, bool *nextEntryList, Snapshot snapshot);


static bool
Expand Down Expand Up @@ -182,7 +183,8 @@ moveRightIfItNeeded(RumBtreeData * btree, RumBtreeStack * stack)
*/
static void
scanPostingTree(Relation index, RumScanEntry scanEntry,
BlockNumber rootPostingTree, OffsetNumber attnum, RumState * rumstate)
BlockNumber rootPostingTree, OffsetNumber attnum,
RumState * rumstate, Snapshot snapshot)
{
RumPostingTreeScan *gdi;
Buffer buffer;
Expand All @@ -194,6 +196,9 @@ scanPostingTree(Relation index, RumScanEntry scanEntry,
ForwardScanDirection, attnum, rumstate);

buffer = rumScanBeginPostingTree(gdi, NULL);

PredicateLockPage(index, BufferGetBlockNumber(buffer), snapshot);

IncrBufferRefCount(buffer); /* prevent unpin in freeRumBtreeStack */

freeRumBtreeStack(gdi->stack);
Expand Down Expand Up @@ -232,6 +237,9 @@ scanPostingTree(Relation index, RumScanEntry scanEntry,
break; /* no more pages */

buffer = rumStep(buffer, index, RUM_SHARE, ForwardScanDirection);

PredicateLockPage(index, BufferGetBlockNumber(buffer), snapshot);

}

UnlockReleaseBuffer(buffer);
Expand All @@ -252,7 +260,7 @@ scanPostingTree(Relation index, RumScanEntry scanEntry,
*/
static bool
collectMatchBitmap(RumBtreeData * btree, RumBtreeStack * stack,
RumScanEntry scanEntry)
RumScanEntry scanEntry, Snapshot snapshot)
{
OffsetNumber attnum;
Form_pg_attribute attr;
Expand Down Expand Up @@ -370,7 +378,8 @@ collectMatchBitmap(RumBtreeData * btree, RumBtreeStack * stack,
LockBuffer(stack->buffer, RUM_UNLOCK);

/* Collect all the TIDs in this entry's posting tree */
scanPostingTree(btree->index, scanEntry, rootPostingTree, attnum, rumstate);
scanPostingTree(btree->index, scanEntry, rootPostingTree,
attnum, rumstate, snapshot);

/*
* We lock again the entry page and while it was unlocked insert
Expand Down Expand Up @@ -494,7 +503,7 @@ setListPositionScanEntry(RumState * rumstate, RumScanEntry entry)
* Start* functions setup beginning state of searches: finds correct buffer and pins it.
*/
static void
startScanEntry(RumState * rumstate, RumScanEntry entry)
startScanEntry(RumState * rumstate, RumScanEntry entry, Snapshot snapshot)
{
RumBtreeData btreeEntry;
RumBtreeStack *stackEntry;
Expand Down Expand Up @@ -527,6 +536,8 @@ startScanEntry(RumState * rumstate, RumScanEntry entry)

entry->isFinished = TRUE;

PredicateLockPage(rumstate->index, BufferGetBlockNumber(stackEntry->buffer), snapshot);

if (entry->isPartialMatch ||
(entry->queryCategory == RUM_CAT_EMPTY_QUERY &&
!entry->scanWithAddInfo))
Expand All @@ -539,7 +550,7 @@ startScanEntry(RumState * rumstate, RumScanEntry entry)
* for the entry type.
*/
btreeEntry.findItem(&btreeEntry, stackEntry);
if (collectMatchBitmap(&btreeEntry, stackEntry, entry) == false)
if (collectMatchBitmap(&btreeEntry, stackEntry, entry, snapshot) == false)
{
/*
* RUM tree was seriously restructured, so we will cleanup all
Expand Down Expand Up @@ -598,6 +609,8 @@ startScanEntry(RumState * rumstate, RumScanEntry entry)

entry->gdi = gdi;

PredicateLockPage(rumstate->index, BufferGetBlockNumber(entry->buffer), snapshot);

/*
* We keep buffer pinned because we need to prevent deletion of
* page during scan. See RUM's vacuum implementation. RefCount is
Expand Down Expand Up @@ -707,7 +720,7 @@ startScan(IndexScanDesc scan)
MemoryContextSwitchTo(so->keyCtx);
for (i = 0; i < so->totalentries; i++)
{
startScanEntry(rumstate, so->entries[i]);
startScanEntry(rumstate, so->entries[i], scan->xs_snapshot);
}
MemoryContextSwitchTo(oldCtx);

Expand Down Expand Up @@ -795,7 +808,7 @@ startScan(IndexScanDesc scan)
for (i = 0; i < so->totalentries; i++)
{
if (!so->sortedEntries[i]->isFinished)
entryGetItem(&so->rumstate, so->sortedEntries[i], NULL);
entryGetItem(&so->rumstate, so->sortedEntries[i], NULL, scan->xs_snapshot);
}
qsort_arg(so->sortedEntries, so->totalentries, sizeof(RumScanEntry),
scan_entry_cmp, rumstate);
Expand All @@ -810,7 +823,7 @@ startScan(IndexScanDesc scan)
* to prevent interference with vacuum
*/
static void
entryGetNextItem(RumState * rumstate, RumScanEntry entry)
entryGetNextItem(RumState * rumstate, RumScanEntry entry, Snapshot snapshot)
{
Page page;

Expand All @@ -826,6 +839,8 @@ entryGetNextItem(RumState * rumstate, RumScanEntry entry)
LockBuffer(entry->buffer, RUM_SHARE);
page = BufferGetPage(entry->buffer);

PredicateLockPage(rumstate->index, BufferGetBlockNumber(entry->buffer), snapshot);

if (scanPage(rumstate, entry, &entry->curItem, false))
{
LockBuffer(entry->buffer, RUM_UNLOCK);
Expand Down Expand Up @@ -863,6 +878,8 @@ entryGetNextItem(RumState * rumstate, RumScanEntry entry)
entry->gdi->stack->blkno = BufferGetBlockNumber(entry->buffer);
page = BufferGetPage(entry->buffer);

PredicateLockPage(rumstate->index, BufferGetBlockNumber(entry->buffer), snapshot);

entry->offset = -1;
maxoff = RumPageGetOpaque(page)->maxoff;
entry->nlist = maxoff;
Expand Down Expand Up @@ -911,7 +928,7 @@ entryGetNextItem(RumState * rumstate, RumScanEntry entry)
}

static bool
entryGetNextItemList(RumState * rumstate, RumScanEntry entry)
entryGetNextItemList(RumState * rumstate, RumScanEntry entry, Snapshot snapshot)
{
Page page;
IndexTuple itup;
Expand Down Expand Up @@ -1005,6 +1022,8 @@ entryGetNextItemList(RumState * rumstate, RumScanEntry entry)
entry->buffer = rumScanBeginPostingTree(gdi, NULL);
entry->gdi = gdi;

PredicateLockPage(rumstate->index, BufferGetBlockNumber(entry->buffer), snapshot);

/*
* We keep buffer pinned because we need to prevent deletion of
* page during scan. See RUM's vacuum implementation. RefCount is
Expand Down Expand Up @@ -1068,7 +1087,7 @@ entryGetNextItemList(RumState * rumstate, RumScanEntry entry)
* Item pointers must be returned in ascending order.
*/
static void
entryGetItem(RumState * rumstate, RumScanEntry entry, bool *nextEntryList)
entryGetItem(RumState * rumstate, RumScanEntry entry, bool *nextEntryList, Snapshot snapshot)
{
Assert(!entry->isFinished);

Expand Down Expand Up @@ -1198,7 +1217,7 @@ entryGetItem(RumState * rumstate, RumScanEntry entry, bool *nextEntryList)
else if (entry->stack)
{
entry->offset++;
if (entryGetNextItemList(rumstate, entry) && nextEntryList)
if (entryGetNextItemList(rumstate, entry, snapshot) && nextEntryList)
*nextEntryList = true;
}
else
Expand All @@ -1211,14 +1230,14 @@ entryGetItem(RumState * rumstate, RumScanEntry entry, bool *nextEntryList)
{
do
{
entryGetNextItem(rumstate, entry);
entryGetNextItem(rumstate, entry, snapshot);
} while (entry->isFinished == FALSE &&
entry->reduceResult == TRUE &&
dropItem(entry));
if (entry->stack && entry->isFinished)
{
entry->isFinished = FALSE;
if (entryGetNextItemList(rumstate, entry) && nextEntryList)
if (entryGetNextItemList(rumstate, entry, snapshot) && nextEntryList)
*nextEntryList = true;
}
}
Expand Down Expand Up @@ -1381,7 +1400,7 @@ scanGetItemRegular(IndexScanDesc scan, RumItem *advancePast,
compareCurRumItemScanDirection(rumstate, entry,
&myAdvancePast) <= 0))
{
entryGetItem(rumstate, entry, NULL);
entryGetItem(rumstate, entry, NULL, scan->xs_snapshot);

if (!ItemPointerIsValid(&myAdvancePast.iptr))
break;
Expand Down Expand Up @@ -1612,7 +1631,7 @@ scanPage(RumState * rumstate, RumScanEntry entry, RumItem *item, bool equalOk)
*/

static void
entryFindItem(RumState * rumstate, RumScanEntry entry, RumItem * item)
entryFindItem(RumState * rumstate, RumScanEntry entry, RumItem * item, Snapshot snapshot)
{
if (entry->nlist == 0)
{
Expand Down Expand Up @@ -1656,6 +1675,8 @@ entryFindItem(RumState * rumstate, RumScanEntry entry, RumItem * item)
/* Check rest of page */
LockBuffer(entry->buffer, RUM_SHARE);

PredicateLockPage(rumstate->index, BufferGetBlockNumber(entry->buffer), snapshot);

if (scanPage(rumstate, entry, item, true))
{
LockBuffer(entry->buffer, RUM_UNLOCK);
Expand All @@ -1671,6 +1692,8 @@ entryFindItem(RumState * rumstate, RumScanEntry entry, RumItem * item)
entry->gdi->stack = rumReFindLeafPage(&entry->gdi->btree, entry->gdi->stack);
entry->buffer = entry->gdi->stack->buffer;

PredicateLockPage(rumstate->index, BufferGetBlockNumber(entry->buffer), snapshot);

if (scanPage(rumstate, entry, item, true))
{
LockBuffer(entry->buffer, RUM_UNLOCK);
Expand All @@ -1684,6 +1707,8 @@ entryFindItem(RumState * rumstate, RumScanEntry entry, RumItem * item)
RUM_SHARE, entry->scanDirection);
entry->gdi->stack->buffer = entry->buffer;

PredicateLockPage(rumstate->index, BufferGetBlockNumber(entry->buffer), snapshot);

if (entry->buffer == InvalidBuffer)
{
ItemPointerSetInvalid(&entry->curItem.iptr);
Expand Down Expand Up @@ -1760,7 +1785,7 @@ preConsistentCheck(RumScanOpaque so)
* to i.
*/
static void
entryShift(int i, RumScanOpaque so, bool find)
entryShift(int i, RumScanOpaque so, bool find, Snapshot snapshot)
{
int minIndex = -1,
j;
Expand All @@ -1784,9 +1809,9 @@ entryShift(int i, RumScanOpaque so, bool find)
/* Do shift of required type */
if (find)
entryFindItem(rumstate, so->sortedEntries[minIndex],
&so->sortedEntries[i - 1]->curItem);
&so->sortedEntries[i - 1]->curItem, snapshot);
else if (!so->sortedEntries[minIndex]->isFinished)
entryGetItem(rumstate, so->sortedEntries[minIndex], NULL);
entryGetItem(rumstate, so->sortedEntries[minIndex], NULL, snapshot);

/* Restore order of so->sortedEntries */
while (minIndex > 0 &&
Expand Down Expand Up @@ -1819,7 +1844,7 @@ scanGetItemFast(IndexScanDesc scan, RumItem *advancePast,
if (so->entriesIncrIndex >= 0)
{
for (k = so->entriesIncrIndex; k < so->totalentries; k++)
entryShift(k, so, false);
entryShift(k, so, false, scan->xs_snapshot);
}

for (;;)
Expand Down Expand Up @@ -1855,7 +1880,7 @@ scanGetItemFast(IndexScanDesc scan, RumItem *advancePast,

if (preConsistentResult == false)
{
entryShift(i, so, true);
entryShift(i, so, true, scan->xs_snapshot);
continue;
}

Expand Down Expand Up @@ -1892,7 +1917,7 @@ scanGetItemFast(IndexScanDesc scan, RumItem *advancePast,
{
consistentResult = false;
for (j = k; j < so->totalentries; j++)
entryShift(j, so, false);
entryShift(j, so, false, scan->xs_snapshot);
continue;
}
}
Expand Down Expand Up @@ -1951,7 +1976,7 @@ scanGetItemFull(IndexScanDesc scan, RumItem *advancePast,
*/
entry = so->entries[0];

entryGetItem(&so->rumstate, entry, &nextEntryList);
entryGetItem(&so->rumstate, entry, &nextEntryList, scan->xs_snapshot);
if (entry->isFinished == TRUE)
return false;

Expand Down Expand Up @@ -1982,7 +2007,7 @@ scanGetItemFull(IndexScanDesc scan, RumItem *advancePast,
(!ItemPointerIsValid(&orderEntry->curItem.iptr) ||
compareCurRumItemScanDirection(&so->rumstate, orderEntry,
&entry->curItem) < 0))
entryGetItem(&so->rumstate, orderEntry, NULL);
entryGetItem(&so->rumstate, orderEntry, NULL, scan->xs_snapshot);
}

*item = entry->curItem;
Expand Down
3 changes: 3 additions & 0 deletions src/ruminsert.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "postgres.h"

#include "access/generic_xlog.h"
#include "storage/predicate.h"
#include "catalog/index.h"
#include "miscadmin.h"
#include "utils/memutils.h"
Expand Down Expand Up @@ -421,6 +422,8 @@ rumEntryInsert(RumState * rumstate,
stack = rumFindLeafPage(&btree, NULL);
page = BufferGetPage(stack->buffer);

CheckForSerializableConflictIn(btree.index, NULL, stack->buffer);

if (btree.findItem(&btree, stack))
{
/* found pre-existing entry */
Expand Down
2 changes: 1 addition & 1 deletion src/rumutil.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ rumhandler(PG_FUNCTION_ARGS)
amroutine->amsearchnulls = false;
amroutine->amstorage = true;
amroutine->amclusterable = false;
amroutine->ampredlocks = false;
amroutine->ampredlocks = true;
amroutine->amkeytype = InvalidOid;

amroutine->ambuild = rumbuild;
Expand Down
Loading

0 comments on commit e28e31d

Please sign in to comment.