Minor cleanup of the BRIN parallel build code

Commit b437571 added support for parallel builds for BRIN indexes,
using code similar to BTREE parallel builds, and also a new tuplesort
variant. This commit simplifies the new code in two ways:

* The "spool" grouping tuplesort and the heap/index is not necessary.
  The heap/index are available as separate arguments, causing confusion.
  So remove the spool, and use the tuplesort directly.

* The new tuplesort variant does not need the heap/index at all, as it
  sorts simply by the range block number without accessing the tuple
  data. So simplify its signature too (see the sketch just below).
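
To make the two changes concrete, here is a condensed before/after sketch
of the participant call site, assembled from the brin.c diff below
(variable names abbreviated; surrounding setup and error handling elided):

    /* before: the heap/index were carried in a BrinSpool, but the sort
     * never looked at them */
    spool->sortstate = tuplesort_begin_index_brin(spool->heap, spool->index,
                                                  sortmem, coordinate,
                                                  TUPLESORT_NONE);

    /* after: BRIN tuples sort by block number alone, so the tuplesort
     * needs no relations and can live directly in the build state */
    state->bs_sortstate = tuplesort_begin_index_brin(sortmem, coordinate,
                                                     TUPLESORT_NONE);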

Initial report and patch by Ranier Vilela, further cleanup by me.

Author: Ranier Vilela
Discussion: https://postgr.es/m/CAEudQAqD7f2i4iyEaAz-5o-bf6zXVX-AkNUBm-YjUXEemaEh6A%40mail.gmail.com
tvondra committed Dec 30, 2023
1 parent 5632d6e commit 6c63bcb
Showing 4 changed files with 39 additions and 90 deletions.
98 changes: 36 additions & 62 deletions src/backend/access/brin/brin.c
@@ -50,16 +50,6 @@
#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xB000000000000004)
#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xB000000000000005)

/*
* Status record for spooling/sorting phase.
*/
typedef struct BrinSpool
{
Tuplesortstate *sortstate; /* state data for tuplesort.c */
Relation heap;
Relation index;
} BrinSpool;

/*
* Status for index builds performed in parallel. This is allocated in a
* dynamic shared memory segment.
@@ -183,7 +173,13 @@ typedef struct BrinBuildState
*/
BrinLeader *bs_leader;
int bs_worker_id;
BrinSpool *bs_spool;

/*
* The sortstate is used by workers (including the leader). It has to be
* part of the build state, because that's the only thing passed to the
* build callback etc.
*/
Tuplesortstate *bs_sortstate;
} BrinBuildState;

/*
@@ -231,12 +227,11 @@ static void brin_fill_empty_ranges(BrinBuildState *state,
/* parallel index builds */
static void _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
bool isconcurrent, int request);
static void _brin_end_parallel(BrinLeader *btleader, BrinBuildState *state);
static void _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state);
static Size _brin_parallel_estimate_shared(Relation heap, Snapshot snapshot);
static void _brin_leader_participate_as_worker(BrinBuildState *buildstate,
Relation heap, Relation index);
static void _brin_parallel_scan_and_build(BrinBuildState *buildstate,
BrinSpool *brinspool,
BrinShared *brinshared,
Sharedsort *sharedsort,
Relation heap, Relation index,
@@ -1143,10 +1138,6 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
state = initialize_brin_buildstate(index, revmap, pagesPerRange,
RelationGetNumberOfBlocks(heap));

state->bs_spool = (BrinSpool *) palloc0(sizeof(BrinSpool));
state->bs_spool->heap = heap;
state->bs_spool->index = index;

/*
* Attempt to launch parallel worker scan when required
*
@@ -1160,11 +1151,13 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
indexInfo->ii_ParallelWorkers);

/*
* Now scan the relation. No syncscan allowed here because we want the
* heap blocks in physical order.
*
* If parallel build requested and at least one worker process was
* successfully launched, set up coordination state
* successfully launched, set up coordination state, wait for workers to
* complete. Then read all tuples from the shared tuplesort and insert
* them into the index.
*
* In serial mode, simply scan the table and build the index one index
* tuple at a time.
*/
if (state->bs_leader)
{
@@ -1176,9 +1169,8 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
state->bs_leader->nparticipanttuplesorts;
coordinate->sharedsort = state->bs_leader->sharedsort;


/*
* Begin serial/leader tuplesort.
* Begin leader tuplesort.
*
* In cases where parallelism is involved, the leader receives the
* same share of maintenance_work_mem as a serial sort (it is
@@ -1199,19 +1191,18 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
* INDEX operation, regardless of the use of parallelism or any other
* factor.
*/
state->bs_spool->sortstate =
tuplesort_begin_index_brin(heap, index,
maintenance_work_mem, coordinate,
state->bs_sortstate =
tuplesort_begin_index_brin(maintenance_work_mem, coordinate,
TUPLESORT_NONE);

/*
* In parallel mode, wait for workers to complete, and then read all
* tuples from the shared tuplesort and insert them into the index.
*/
_brin_end_parallel(state->bs_leader, state);
}
else /* no parallel index build */
{
/*
* Now scan the relation. No syncscan allowed here because we want
* the heap blocks in physical order.
*/
reltuples = table_index_build_scan(heap, index, indexInfo, false, true,
brinbuildCallback, (void *) state, NULL);

@@ -1671,7 +1662,7 @@ initialize_brin_buildstate(Relation idxRel, BrinRevmap *revmap,
state->bs_dtuple = brin_new_memtuple(state->bs_bdesc);
state->bs_leader = NULL;
state->bs_worker_id = 0;
state->bs_spool = NULL;
state->bs_sortstate = NULL;
state->bs_context = CurrentMemoryContext;
state->bs_emptyTuple = NULL;
state->bs_emptyTupleLen = 0;
@@ -2002,7 +1993,7 @@ form_and_spill_tuple(BrinBuildState *state)
state->bs_dtuple, &size);

/* write the BRIN tuple to the tuplesort */
tuplesort_putbrintuple(state->bs_spool->sortstate, tup, size);
tuplesort_putbrintuple(state->bs_sortstate, tup, size);

state->bs_numtuples++;

@@ -2522,7 +2513,6 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
Size tuplen;
BrinShared *brinshared = brinleader->brinshared;
BlockNumber prevblkno = InvalidBlockNumber;
BrinSpool *spool;
MemoryContext rangeCxt,
oldCxt;

@@ -2541,8 +2531,7 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
state->bs_numtuples = brinshared->indtuples;

/* do the actual sort in the leader */
spool = state->bs_spool;
tuplesort_performsort(spool->sortstate);
tuplesort_performsort(state->bs_sortstate);

/*
* Initialize BrinMemTuple we'll use to union summaries from workers (in
@@ -2568,7 +2557,7 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
* That probably gives us an index that is cheaper to scan, thanks to
* mostly getting data from the same index page as before.
*/
while ((btup = tuplesort_getbrintuple(spool->sortstate, &tuplen, true)) != NULL)
while ((btup = tuplesort_getbrintuple(state->bs_sortstate, &tuplen, true)) != NULL)
{
/* Ranges should be multiples of pages_per_range for the index. */
Assert(btup->bt_blkno % brinshared->pagesPerRange == 0);
@@ -2640,7 +2629,7 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
prevblkno = btup->bt_blkno;
}

tuplesort_end(spool->sortstate);
tuplesort_end(state->bs_sortstate);

/* Fill the BRIN tuple for the last page range with data. */
if (prevblkno != InvalidBlockNumber)
@@ -2704,11 +2693,6 @@ _brin_leader_participate_as_worker(BrinBuildState *buildstate, Relation heap, Re
BrinLeader *brinleader = buildstate->bs_leader;
int sortmem;

/* Allocate memory and initialize private spool */
buildstate->bs_spool = (BrinSpool *) palloc0(sizeof(BrinSpool));
buildstate->bs_spool->heap = buildstate->bs_spool->heap;
buildstate->bs_spool->index = buildstate->bs_spool->index;

/*
* Might as well use reliable figure when doling out maintenance_work_mem
* (when requested number of workers were not launched, this will be
@@ -2717,27 +2701,25 @@
sortmem = maintenance_work_mem / brinleader->nparticipanttuplesorts;

/* Perform work common to all participants */
_brin_parallel_scan_and_build(buildstate, buildstate->bs_spool, brinleader->brinshared,
_brin_parallel_scan_and_build(buildstate, brinleader->brinshared,
brinleader->sharedsort, heap, index, sortmem, true);
}

/*
* Perform a worker's portion of a parallel sort.
*
* This generates a tuplesort for passed btspool, and a second tuplesort
* state if a second btspool is need (i.e. for unique index builds). All
* other spool fields should already be set when this is called.
* This generates a tuplesort for the worker portion of the table.
*
* sortmem is the amount of working memory to use within each worker,
* expressed in KBs.
*
* When this returns, workers are done, and need only release resources.
*/
static void
_brin_parallel_scan_and_build(BrinBuildState *state, BrinSpool *brinspool,
_brin_parallel_scan_and_build(BrinBuildState *state,
BrinShared *brinshared, Sharedsort *sharedsort,
Relation heap, Relation index, int sortmem,
bool progress)
Relation heap, Relation index,
int sortmem, bool progress)
{
SortCoordinate coordinate;
TableScanDesc scan;
@@ -2751,10 +2733,8 @@ _brin_parallel_scan_and_build(BrinBuildState *state, BrinSpool *brinspool,
coordinate->sharedsort = sharedsort;

/* Begin "partial" tuplesort */
brinspool->sortstate = tuplesort_begin_index_brin(brinspool->heap,
brinspool->index,
sortmem, coordinate,
TUPLESORT_NONE);
state->bs_sortstate = tuplesort_begin_index_brin(sortmem, coordinate,
TUPLESORT_NONE);

/* Join parallel scan */
indexInfo = BuildIndexInfo(index);
@@ -2770,7 +2750,7 @@ _brin_parallel_scan_and_build(BrinBuildState *state, BrinSpool *brinspool,
form_and_spill_tuple(state);

/* sort the BRIN ranges built by this worker */
tuplesort_performsort(brinspool->sortstate);
tuplesort_performsort(state->bs_sortstate);

state->bs_reltuples += reltuples;

@@ -2786,7 +2766,7 @@ _brin_parallel_scan_and_build(BrinBuildState *state, BrinSpool *brinspool,
/* Notify leader */
ConditionVariableSignal(&brinshared->workersdonecv);

tuplesort_end(brinspool->sortstate);
tuplesort_end(state->bs_sortstate);
}

/*
@@ -2844,11 +2824,6 @@ _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc)
brinshared->pagesPerRange,
InvalidBlockNumber);

/* Initialize worker's own spool */
buildstate->bs_spool = (BrinSpool *) palloc0(sizeof(BrinSpool));
buildstate->bs_spool->heap = heapRel;
buildstate->bs_spool->index = indexRel;

/* Look up shared state private to tuplesort.c */
sharedsort = shm_toc_lookup(toc, PARALLEL_KEY_TUPLESORT, false);
tuplesort_attach_shared(sharedsort, seg);
@@ -2863,8 +2838,7 @@
*/
sortmem = maintenance_work_mem / brinshared->scantuplesortstates;

_brin_parallel_scan_and_build(buildstate, buildstate->bs_spool,
brinshared, sharedsort,
_brin_parallel_scan_and_build(buildstate, brinshared, sharedsort,
heapRel, indexRel, sortmem, false);

/* Report WAL/buffer usage during parallel execution */
26 changes: 2 additions & 24 deletions src/backend/utils/sort/tuplesortvariants.c
@@ -137,16 +137,6 @@ typedef struct
uint32 max_buckets;
} TuplesortIndexHashArg;

/*
* Data struture pointed by "TuplesortPublic.arg" for the index_brin subcase.
*/
typedef struct
{
TuplesortIndexArg index;

/* XXX do we need something here? */
} TuplesortIndexBrinArg;

/*
* Data struture pointed by "TuplesortPublic.arg" for the Datum case.
* Set by tuplesort_begin_datum and used only by the DatumTuple routines.
@@ -562,20 +552,13 @@ tuplesort_begin_index_gist(Relation heapRel,
}

Tuplesortstate *
tuplesort_begin_index_brin(Relation heapRel,
Relation indexRel,
int workMem,
tuplesort_begin_index_brin(int workMem,
SortCoordinate coordinate,
int sortopt)
{
Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
sortopt);
TuplesortPublic *base = TuplesortstateGetPublic(state);
MemoryContext oldcontext;
TuplesortIndexBrinArg *arg;

oldcontext = MemoryContextSwitchTo(base->maincontext);
arg = (TuplesortIndexBrinArg *) palloc(sizeof(TuplesortIndexBrinArg));

#ifdef TRACE_SORT
if (trace_sort)
@@ -592,12 +575,7 @@ tuplesort_begin_index_brin(Relation heapRel,
base->writetup = writetup_index_brin;
base->readtup = readtup_index_brin;
base->haveDatum1 = true;
base->arg = arg;

arg->index.heapRel = heapRel;
arg->index.indexRel = indexRel;

MemoryContextSwitchTo(oldcontext);
base->arg = NULL;

return state;
}
4 changes: 1 addition & 3 deletions src/include/utils/tuplesort.h
@@ -430,9 +430,7 @@ extern Tuplesortstate *tuplesort_begin_index_gist(Relation heapRel,
Relation indexRel,
int workMem, SortCoordinate coordinate,
int sortopt);
extern Tuplesortstate *tuplesort_begin_index_brin(Relation heapRel,
Relation indexRel,
int workMem, SortCoordinate coordinate,
extern Tuplesortstate *tuplesort_begin_index_brin(int workMem, SortCoordinate coordinate,
int sortopt);
extern Tuplesortstate *tuplesort_begin_datum(Oid datumType,
Oid sortOperator, Oid sortCollation,
1 change: 0 additions & 1 deletion src/tools/pgindent/typedefs.list
@@ -307,7 +307,6 @@ BrinRevmap
BrinShared
BrinSortTuple
BrinSpecialSpace
BrinSpool
BrinStatsData
BrinTuple
BrinValues
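For orientation, here is a condensed sketch of the tuplesort lifecycle as
the simplified brin.c drives it, assembled from the hunks above (scan
setup, progress reporting, and error handling elided; not a runnable
excerpt):

    /* each participant -- workers and the leader -- sorts its ranges */
    state->bs_sortstate = tuplesort_begin_index_brin(sortmem, coordinate,
                                                     TUPLESORT_NONE);
    tuplesort_putbrintuple(state->bs_sortstate, tup, size); /* per range */
    tuplesort_performsort(state->bs_sortstate);
    tuplesort_end(state->bs_sortstate);

    /* the leader then reads the merged stream in block-number order and
     * inserts the tuples, unioning summaries that share a bt_blkno */
    while ((btup = tuplesort_getbrintuple(state->bs_sortstate, &tuplen,
                                          true)) != NULL)
        /* union into state->bs_dtuple, insert at range boundaries */ ;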
