Permalink
Browse files

More GIN refactoring.

Split off the portion of ginInsertValue that inserts the tuple to current
level into a separate function, ginPlaceToPage. ginInsertValue's charter
is now to recurse up the tree to insert the downlink, when a page split is
required.

This is in preparation for a patch to change the way incomplete splits are
handled, which will need to do these operations separately. And IMHO makes
the code more readable anyway.
  • Loading branch information...
1 parent 5010126 commit 04eee1fa9ee80dabf7cf4b8b9106897272e9b291 @hlinnaka hlinnaka committed Nov 20, 2013
Showing with 148 additions and 128 deletions.
  1. +148 −128 src/backend/access/gin/ginbtree.c
@@ -280,80 +280,115 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack,
}
/*
- * Insert value (stored in GinBtree) to tree described by stack
- *
- * During an index build, buildStats is non-null and the counters
- * it contains are incremented as needed.
+ * Returns true if the insertion is done, false if the page was split and
+ * downlink insertion is pending.
*
- * NB: the passed-in stack is freed, as though by freeGinBtreeStack.
+ * stack->buffer is locked on entry, and is kept locked.
*/
-void
-ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
+static bool
+ginPlaceToPage(GinBtree btree, BlockNumber rootBlkno, GinBtreeStack *stack,
+ GinStatsData *buildStats)
{
- GinBtreeStack *parent;
- BlockNumber rootBlkno;
- Page page,
- rpage,
- lpage;
+ Page page = BufferGetPage(stack->buffer);
+ XLogRecData *rdata;
+ bool fit;
- /* extract root BlockNumber from stack */
- Assert(stack != NULL);
- parent = stack;
- while (parent->parent)
- parent = parent->parent;
- rootBlkno = parent->blkno;
- Assert(BlockNumberIsValid(rootBlkno));
+ START_CRIT_SECTION();
+ fit = btree->placeToPage(btree, stack->buffer, stack->off, &rdata);
+ if (fit)
+ {
+ MarkBufferDirty(stack->buffer);
- /* this loop crawls up the stack until the insertion is complete */
- for (;;)
+ if (RelationNeedsWAL(btree->index))
+ {
+ XLogRecPtr recptr;
+
+ recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT, rdata);
+ PageSetLSN(page, recptr);
+ }
+
+ END_CRIT_SECTION();
+
+ return true;
+ }
+ else
{
- XLogRecData *rdata;
+ /* Didn't fit, have to split */
+ Buffer rbuffer;
+ Page newlpage;
BlockNumber savedRightLink;
- bool fit;
+ GinBtreeStack *parent;
+ Page lpage,
+ rpage;
+
+ END_CRIT_SECTION();
+
+ rbuffer = GinNewBuffer(btree->index);
- page = BufferGetPage(stack->buffer);
savedRightLink = GinPageGetOpaque(page)->rightlink;
- START_CRIT_SECTION();
- fit = btree->placeToPage(btree, stack->buffer, stack->off, &rdata);
- if (fit)
+ /*
+ * newlpage is a pointer to memory page, it is not associated with
+ * a buffer. stack->buffer is not touched yet.
+ */
+ newlpage = btree->splitPage(btree, stack->buffer, rbuffer, stack->off, &rdata);
+
+ ((ginxlogSplit *) (rdata->data))->rootBlkno = rootBlkno;
+
+ /* During index build, count the newly-split page */
+ if (buildStats)
{
+ if (btree->isData)
+ buildStats->nDataPages++;
+ else
+ buildStats->nEntryPages++;
+ }
+
+ parent = stack->parent;
+
+ if (parent == NULL)
+ {
+ /*
+ * split root, so we need to allocate new left page and place
+ * pointer on root to left and right page
+ */
+ Buffer lbuffer = GinNewBuffer(btree->index);
+
+ ((ginxlogSplit *) (rdata->data))->isRootSplit = TRUE;
+ ((ginxlogSplit *) (rdata->data))->rrlink = InvalidBlockNumber;
+
+ lpage = BufferGetPage(lbuffer);
+ rpage = BufferGetPage(rbuffer);
+
+ GinPageGetOpaque(rpage)->rightlink = InvalidBlockNumber;
+ GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
+ ((ginxlogSplit *) (rdata->data))->lblkno = BufferGetBlockNumber(lbuffer);
+
+ START_CRIT_SECTION();
+
+ GinInitBuffer(stack->buffer, GinPageGetOpaque(newlpage)->flags & ~GIN_LEAF);
+ PageRestoreTempPage(newlpage, lpage);
+ btree->fillRoot(btree, stack->buffer, lbuffer, rbuffer);
+
+ MarkBufferDirty(rbuffer);
+ MarkBufferDirty(lbuffer);
MarkBufferDirty(stack->buffer);
if (RelationNeedsWAL(btree->index))
{
XLogRecPtr recptr;
- recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT, rdata);
+ recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata);
PageSetLSN(page, recptr);
+ PageSetLSN(lpage, recptr);
+ PageSetLSN(rpage, recptr);
}
- LockBuffer(stack->buffer, GIN_UNLOCK);
- END_CRIT_SECTION();
-
- freeGinBtreeStack(stack);
-
- return;
- }
- else
- {
- /* Didn't fit, have to split */
- Buffer rbuffer;
- Page newlpage;
-
+ UnlockReleaseBuffer(rbuffer);
+ UnlockReleaseBuffer(lbuffer);
END_CRIT_SECTION();
- rbuffer = GinNewBuffer(btree->index);
-
- /*
- * newlpage is a pointer to memory page, it is not associated with
- * a buffer. stack->buffer is not touched yet.
- */
- newlpage = btree->splitPage(btree, stack->buffer, rbuffer, stack->off, &rdata);
-
- ((ginxlogSplit *) (rdata->data))->rootBlkno = rootBlkno;
-
- /* During index build, count the newly-split page */
+ /* During index build, count the newly-added root page */
if (buildStats)
{
if (btree->isData)
@@ -362,98 +397,83 @@ ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
buildStats->nEntryPages++;
}
- parent = stack->parent;
-
- if (parent == NULL)
- {
- /*
- * split root, so we need to allocate new left page and place
- * pointer on root to left and right page
- */
- Buffer lbuffer = GinNewBuffer(btree->index);
-
- ((ginxlogSplit *) (rdata->data))->isRootSplit = TRUE;
- ((ginxlogSplit *) (rdata->data))->rrlink = InvalidBlockNumber;
-
- page = BufferGetPage(stack->buffer);
- lpage = BufferGetPage(lbuffer);
- rpage = BufferGetPage(rbuffer);
-
- GinPageGetOpaque(rpage)->rightlink = InvalidBlockNumber;
- GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
- ((ginxlogSplit *) (rdata->data))->lblkno = BufferGetBlockNumber(lbuffer);
-
- START_CRIT_SECTION();
-
- GinInitBuffer(stack->buffer, GinPageGetOpaque(newlpage)->flags & ~GIN_LEAF);
- PageRestoreTempPage(newlpage, lpage);
- btree->fillRoot(btree, stack->buffer, lbuffer, rbuffer);
-
- MarkBufferDirty(rbuffer);
- MarkBufferDirty(lbuffer);
- MarkBufferDirty(stack->buffer);
+ return true;
+ }
+ else
+ {
+ /* split non-root page */
+ ((ginxlogSplit *) (rdata->data))->isRootSplit = FALSE;
+ ((ginxlogSplit *) (rdata->data))->rrlink = savedRightLink;
- if (RelationNeedsWAL(btree->index))
- {
- XLogRecPtr recptr;
+ lpage = BufferGetPage(stack->buffer);
+ rpage = BufferGetPage(rbuffer);
- recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata);
- PageSetLSN(page, recptr);
- PageSetLSN(lpage, recptr);
- PageSetLSN(rpage, recptr);
- }
+ GinPageGetOpaque(rpage)->rightlink = savedRightLink;
+ GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
- UnlockReleaseBuffer(rbuffer);
- UnlockReleaseBuffer(lbuffer);
- LockBuffer(stack->buffer, GIN_UNLOCK);
- END_CRIT_SECTION();
+ START_CRIT_SECTION();
+ PageRestoreTempPage(newlpage, lpage);
- freeGinBtreeStack(stack);
+ MarkBufferDirty(rbuffer);
+ MarkBufferDirty(stack->buffer);
- /* During index build, count the newly-added root page */
- if (buildStats)
- {
- if (btree->isData)
- buildStats->nDataPages++;
- else
- buildStats->nEntryPages++;
- }
+ if (RelationNeedsWAL(btree->index))
+ {
+ XLogRecPtr recptr;
- return;
+ recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata);
+ PageSetLSN(lpage, recptr);
+ PageSetLSN(rpage, recptr);
}
- else
- {
- /* split non-root page */
- ((ginxlogSplit *) (rdata->data))->isRootSplit = FALSE;
- ((ginxlogSplit *) (rdata->data))->rrlink = savedRightLink;
+ UnlockReleaseBuffer(rbuffer);
+ END_CRIT_SECTION();
- lpage = BufferGetPage(stack->buffer);
- rpage = BufferGetPage(rbuffer);
+ return false;
+ }
+ }
+}
- GinPageGetOpaque(rpage)->rightlink = savedRightLink;
- GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
+/*
+ * Insert value (stored in GinBtree) to tree described by stack
+ *
+ * During an index build, buildStats is non-null and the counters
+ * it contains are incremented as needed.
+ *
+ * NB: the passed-in stack is freed, as though by freeGinBtreeStack.
+ */
+void
+ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
+{
+ GinBtreeStack *parent;
+ BlockNumber rootBlkno;
+ Page page;
+
+ /* extract root BlockNumber from stack */
+ Assert(stack != NULL);
+ parent = stack;
+ while (parent->parent)
+ parent = parent->parent;
+ rootBlkno = parent->blkno;
+ Assert(BlockNumberIsValid(rootBlkno));
- START_CRIT_SECTION();
- PageRestoreTempPage(newlpage, lpage);
+ /* this loop crawls up the stack until the insertion is complete */
+ for (;;)
+ {
+ bool done;
- MarkBufferDirty(rbuffer);
- MarkBufferDirty(stack->buffer);
+ done = ginPlaceToPage(btree, rootBlkno, stack, buildStats);
- if (RelationNeedsWAL(btree->index))
- {
- XLogRecPtr recptr;
+ /* just to be extra sure we don't delete anything by accident... */
+ btree->isDelete = FALSE;
- recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata);
- PageSetLSN(lpage, recptr);
- PageSetLSN(rpage, recptr);
- }
- UnlockReleaseBuffer(rbuffer);
- END_CRIT_SECTION();
- }
+ if (done)
+ {
+ LockBuffer(stack->buffer, GIN_UNLOCK);
+ freeGinBtreeStack(stack);
+ break;
}
btree->prepareDownlink(btree, stack->buffer);
- btree->isDelete = FALSE;
/* search parent to lock */
LockBuffer(parent->buffer, GIN_EXCLUSIVE);

0 comments on commit 04eee1f

Please sign in to comment.