Simplify control flow in decompress_chunk_exec
No functional changes, mostly just reshuffles the code to prepare for
batch decompression.

Also removes unneeded repeated column value stores and ExecStoreTuple,
to save 3-5% execution time on some queries.
akuzm committed Mar 22, 2023
1 parent 63b416b commit 5c07a57
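
The 3-5% saving comes from no longer clearing and re-storing the output slot for every row. A minimal, self-contained sketch of the before/after pattern (plain C with simplified stand-in types; this is not the PostgreSQL slot API, just the shape of the change):

    #include <stdbool.h>

    /* Simplified stand-in for a virtual tuple slot: parallel value/null arrays. */
    typedef struct MiniSlot
    {
        long values[16];
        bool isnull[16];
        bool empty;
    } MiniSlot;

    /* Before: each row re-stores every column and re-marks the slot as full. */
    static void
    emit_row_before(MiniSlot *slot, long segmentby_value, long data_value)
    {
        slot->values[0] = segmentby_value; /* re-stored for every row */
        slot->isnull[0] = false;
        slot->values[1] = data_value;
        slot->isnull[1] = false;
        slot->empty = false;               /* like ExecStoreVirtualTuple per row */
    }

    /* After: batch-constant columns are written once per batch, and each row
     * only overwrites the columns that actually change, in-place. */
    static void
    begin_batch(MiniSlot *slot, long segmentby_value)
    {
        slot->values[0] = segmentby_value; /* constant for the whole batch */
        slot->isnull[0] = false;
    }

    static void
    emit_row_after(MiniSlot *slot, long data_value)
    {
        slot->values[1] = data_value;      /* in-place update */
        slot->isnull[1] = false;
        if (slot->empty)
            slot->empty = false;           /* only needed after the slot was cleared */
    }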
Showing 1 changed file with 126 additions and 99 deletions.
tsl/src/nodes/decompress_chunk/exec.c (126 additions, 99 deletions)
@@ -79,7 +79,8 @@ typedef struct DecompressChunkState
     int hypertable_id;
     Oid chunk_relid;
     List *hypertable_compression_info;
-    int counter;
+    int total_batch_rows;
+    int current_batch_row;
     MemoryContext per_batch_context;
 } DecompressChunkState;
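
Replacing the single decrementing counter with an absolute position makes "end of batch" an explicit comparison and keeps the row index available for the planned batch decompression. A sketch of the bookkeeping (field names taken from the patch, the struct around them simplified):

    #include <stdbool.h>

    typedef struct
    {
        int total_batch_rows;  /* set once per batch, from the count column */
        int current_batch_row; /* advances by one per emitted row */
    } BatchPosition;

    /* The old "state->counter-- > 0" check becomes: */
    static inline bool
    batch_done(const BatchPosition *pos)
    {
        return pos->current_batch_row >= pos->total_batch_rows;
    }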

@@ -301,14 +302,18 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags)
 }
 
 static void
-initialize_batch(DecompressChunkState *state, TupleTableSlot *slot)
+initialize_batch(DecompressChunkState *state, TupleTableSlot *compressed_slot,
+                 TupleTableSlot *decompressed_slot)
 {
     Datum value;
     bool isnull;
     int i;
     MemoryContext old_context = MemoryContextSwitchTo(state->per_batch_context);
     MemoryContextReset(state->per_batch_context);
 
+    state->total_batch_rows = 0;
+    state->current_batch_row = 0;
+
     for (i = 0; i < state->num_columns; i++)
     {
         DecompressChunkColumnState *column = &state->columns[i];
@@ -317,35 +322,60 @@ initialize_batch(DecompressChunkState *state, TupleTableSlot *slot)
         {
             case COMPRESSED_COLUMN:
             {
-                value = slot_getattr(slot, column->compressed_scan_attno, &isnull);
-                if (!isnull)
-                {
-                    CompressedDataHeader *header = (CompressedDataHeader *) PG_DETOAST_DATUM(value);
+                column->compressed.iterator = NULL;
 
-                    column->compressed.iterator =
-                        tsl_get_decompression_iterator_init(header->compression_algorithm,
-                                                            state->reverse)(PointerGetDatum(header),
-                                                                            column->typid);
+                value = slot_getattr(compressed_slot, column->compressed_scan_attno, &isnull);
+
+                if (isnull)
+                {
+                    /*
+                     * The column will have a default value for the entire batch,
+                     * set it now.
+                     */
+                    AttrNumber attr = AttrNumberGetAttrOffset(column->output_attno);
+                    decompressed_slot->tts_values[attr] =
+                        getmissingattr(decompressed_slot->tts_tupleDescriptor,
+                                       attr + 1,
+                                       &decompressed_slot->tts_isnull[attr]);
+                    break;
                 }
-                else
-                    column->compressed.iterator = NULL;
+
+                CompressedDataHeader *header = (CompressedDataHeader *) PG_DETOAST_DATUM(value);
+
+                column->compressed.iterator =
+                    tsl_get_decompression_iterator_init(header->compression_algorithm,
+                                                        state->reverse)(PointerGetDatum(header),
+                                                                        column->typid);
 
                 break;
             }
             case SEGMENTBY_COLUMN:
-                value = slot_getattr(slot, column->compressed_scan_attno, &isnull);
-                if (!isnull)
-                    column->segmentby.value = value;
-                else
-                    column->segmentby.value = (Datum) 0;
-
-                column->segmentby.isnull = isnull;
+            {
+                /*
+                 * A segmentby column is not going to change during one batch,
+                 * and our output tuples are read-only, so it's enough to only
+                 * save it once per batch, which we do here.
+                 */
+                AttrNumber attr = AttrNumberGetAttrOffset(column->output_attno);
+                decompressed_slot->tts_values[attr] =
+                    slot_getattr(compressed_slot,
+                                 column->compressed_scan_attno,
+                                 &decompressed_slot->tts_isnull[attr]);
                 break;
+            }
             case COUNT_COLUMN:
-                value = slot_getattr(slot, column->compressed_scan_attno, &isnull);
-                state->counter = DatumGetInt32(value);
+                value = slot_getattr(compressed_slot, column->compressed_scan_attno, &isnull);
                 /* count column should never be NULL */
                 Assert(!isnull);
+                int count_value = DatumGetInt32(value);
+                if (count_value <= 0)
+                {
+                    ereport(ERROR,
+                            (errmsg("the compressed data is corrupt: got a segment with length %d",
+                                    count_value)));
+                }
+                Assert(state->total_batch_rows == 0);
+                state->total_batch_rows = count_value;
                 break;
             case SEQUENCE_NUM_COLUMN:
                 /*
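
The per-batch setup above does three things: it resets the row counters, validates the count column, and pre-fills the output slot for columns that are constant across the batch (segmentby values and batch-wide defaults). A condensed sketch of that flow, with illustrative stand-in types rather than the real DecompressChunkState:

    #include <stdio.h>
    #include <stdlib.h>

    typedef struct
    {
        int total_batch_rows;
        int current_batch_row;
        long out_values[8]; /* stand-in for the output slot's tts_values array */
    } MiniBatchState;

    static void
    mini_initialize_batch(MiniBatchState *state, int count_value, long segmentby_value)
    {
        state->total_batch_rows = 0;
        state->current_batch_row = 0;

        /* Count column: written by compression, must be at least 1;
         * anything else means the compressed data is corrupt. */
        if (count_value <= 0)
        {
            fprintf(stderr,
                    "the compressed data is corrupt: got a segment with length %d\n",
                    count_value);
            exit(EXIT_FAILURE);
        }
        state->total_batch_rows = count_value;

        /* Segmentby column: constant within the batch, so store it once here
         * instead of copying it into the output slot for every row. */
        state->out_values[0] = segmentby_value;
    }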
Expand All @@ -370,12 +400,12 @@ decompress_chunk_exec(CustomScanState *node)

while (true)
{
TupleTableSlot *slot = decompress_chunk_create_tuple(state);
TupleTableSlot *decompressed_slot = decompress_chunk_create_tuple(state);

if (TupIsNull(slot))
if (TupIsNull(decompressed_slot))
return NULL;

econtext->ecxt_scantuple = slot;
econtext->ecxt_scantuple = decompressed_slot;

/* Reset expression memory context to clean out any cruft from
* previous tuple. */
@@ -384,12 +414,12 @@
         if (node->ss.ps.qual && !ExecQual(node->ss.ps.qual, econtext))
         {
             InstrCountFiltered1(node, 1);
-            ExecClearTuple(slot);
+            ExecClearTuple(decompressed_slot);
             continue;
         }
 
         if (!node->ss.ps.ps_ProjInfo)
-            return slot;
+            return decompressed_slot;
 
         return ExecProject(node->ss.ps.ps_ProjInfo);
     }
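
For context, the loop above follows the usual executor protocol for a scan node: pull candidate rows, drop those that fail the qual, then return the row either directly or through the projection. A schematic version (the function pointers stand in for the executor machinery; none of these names are from the patch):

    #include <stdbool.h>
    #include <stddef.h>

    typedef struct Row Row; /* opaque stand-in for a TupleTableSlot */

    static Row *
    scan_next(Row *(*next_row)(void), bool (*qual)(const Row *),
              Row *(*project)(Row *), long *n_filtered)
    {
        for (;;)
        {
            Row *row = next_row();
            if (row == NULL)
                return NULL;   /* the scan is exhausted */

            if (qual != NULL && !qual(row))
            {
                ++*n_filtered; /* like InstrCountFiltered1() */
                continue;      /* fetch the next candidate row */
            }

            return project != NULL ? project(row) : row;
        }
    }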
@@ -415,102 +445,99 @@ decompress_chunk_end(CustomScanState *node)
 static TupleTableSlot *
 decompress_chunk_create_tuple(DecompressChunkState *state)
 {
-    TupleTableSlot *slot = state->csstate.ss.ss_ScanTupleSlot;
-    bool batch_done = false;
-    int i;
+    TupleTableSlot *decompressed_slot = state->csstate.ss.ss_ScanTupleSlot;
 
     while (true)
     {
+        if (state->initialized && state->current_batch_row >= state->total_batch_rows)
+        {
+            /*
+             * Reached end of batch. Check that the columns that we're decompressing
+             * row-by-row have also ended.
+             */
+            state->initialized = false;
+            for (int i = 0; i < state->num_columns; i++)
+            {
+                DecompressChunkColumnState *column = &state->columns[i];
+                if (column->type == COMPRESSED_COLUMN && column->compressed.iterator)
+                {
+                    DecompressResult result =
+                        column->compressed.iterator->try_next(column->compressed.iterator);
+                    if (!result.is_done)
+                    {
+                        elog(ERROR, "compressed column out of sync with batch counter");
+                    }
+                }
+            }
+        }
+
         if (!state->initialized)
         {
-            TupleTableSlot *subslot = ExecProcNode(linitial(state->csstate.custom_ps));
+            ExecClearTuple(decompressed_slot);
+
+            /*
+             * Reset expression memory context to clean out any cruft from
+             * previous batch. Our batches are 1000 rows max, and this memory
+             * context is used by ExecProject and ExecQual, which shouldn't
+             * leak too much. So we only do this per batch and not per tuple to
+             * save some CPU.
+             */
+            ExprContext *econtext = state->csstate.ss.ps.ps_ExprContext;
+            ResetExprContext(econtext);
+
+            TupleTableSlot *compressed_slot = ExecProcNode(linitial(state->csstate.custom_ps));
 
-            if (TupIsNull(subslot))
+            if (TupIsNull(compressed_slot))
                 return NULL;
 
-            batch_done = false;
-            initialize_batch(state, subslot);
+            initialize_batch(state, compressed_slot, decompressed_slot);
         }
 
-        ExecClearTuple(slot);
+        Assert(state->initialized);
+        Assert(state->total_batch_rows > 0);
+        Assert(state->current_batch_row < state->total_batch_rows);
 
-        for (i = 0; i < state->num_columns; i++)
+        for (int i = 0; i < state->num_columns; i++)
         {
             DecompressChunkColumnState *column = &state->columns[i];
-            switch (column->type)
+            if (column->type != COMPRESSED_COLUMN)
             {
-                case COUNT_COLUMN:
-                    if (state->counter <= 0)
-                        /*
-                         * we continue checking other columns even if counter
-                         * reaches zero to sanity check all columns are in sync
-                         * and agree about batch end
-                         */
-                        batch_done = true;
-                    else
-                        state->counter--;
-                    break;
-                case COMPRESSED_COLUMN:
-                {
-                    AttrNumber attr = AttrNumberGetAttrOffset(column->output_attno);
-
-                    if (!column->compressed.iterator)
-                    {
-                        slot->tts_values[attr] = getmissingattr(slot->tts_tupleDescriptor,
-                                                                attr + 1,
-                                                                &slot->tts_isnull[attr]);
-                    }
-                    else
-                    {
-                        DecompressResult result;
-                        result = column->compressed.iterator->try_next(column->compressed.iterator);
-
-                        if (result.is_done)
-                        {
-                            batch_done = true;
-                            continue;
-                        }
-                        else if (batch_done)
-                        {
-                            /*
-                             * since the count column is the first column batch_done
-                             * might be true if compressed column is out of sync with
-                             * the batch counter.
-                             */
-                            elog(ERROR, "compressed column out of sync with batch counter");
-                        }
-
-                        slot->tts_values[attr] = result.val;
-                        slot->tts_isnull[attr] = result.is_null;
-                    }
-
-                    break;
-                }
-                case SEGMENTBY_COLUMN:
-                {
-                    AttrNumber attr = AttrNumberGetAttrOffset(column->output_attno);
-
-                    slot->tts_values[attr] = column->segmentby.value;
-                    slot->tts_isnull[attr] = column->segmentby.isnull;
-                    break;
-                }
-                case SEQUENCE_NUM_COLUMN:
-                    /*
-                     * nothing to do here for sequence number
-                     * we only needed this for sorting in node below
-                     */
-                    break;
+                continue;
             }
+
+            const AttrNumber attr = AttrNumberGetAttrOffset(column->output_attno);
+
+            if (column->compressed.iterator != NULL)
+            {
+                DecompressResult result =
+                    column->compressed.iterator->try_next(column->compressed.iterator);
+
+                if (result.is_done)
+                {
+                    elog(ERROR, "compressed column out of sync with batch counter");
+                }
+
+                decompressed_slot->tts_isnull[attr] = result.is_null;
+                decompressed_slot->tts_values[attr] = result.val;
+            }
         }
 
-        if (batch_done)
+        /*
+         * It's a virtual tuple slot, so no point in clearing/storing it
+         * per each row, we can just update the values in-place. This saves
+         * some CPU. We have to store it after ExecQual returns false (the tuple
+         * didn't pass the filter), or after a new batch. The standard protocol
+         * is to clear and set the tuple slot for each row, but our output tuple
+         * slots are read-only, and the memory is owned by this node, so it is
+         * safe to violate this protocol.
+         */
+        Assert(TTS_IS_VIRTUAL(decompressed_slot));
+        if (TTS_EMPTY(decompressed_slot))
         {
-            state->initialized = false;
-            continue;
+            ExecStoreVirtualTuple(decompressed_slot);
        }
 
-        ExecStoreVirtualTuple(slot);
-
-        return slot;
+        state->current_batch_row++;
+        return decompressed_slot;
     }
 }
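
The new control flow trusts the count column for the batch length, so the cross-check at the top of the loop verifies that every row-by-row iterator is exhausted exactly when the counter says the batch ends. A reduced model of that check (MiniIterator is illustrative, not the real DecompressionIterator):

    #include <stdbool.h>
    #include <stdio.h>
    #include <stdlib.h>

    typedef struct
    {
        int remaining; /* rows left in this column's compressed data */
    } MiniIterator;

    /* Returns true while the iterator still produces rows (the inverse of
     * result.is_done in the real code). */
    static bool
    mini_try_next(MiniIterator *it)
    {
        return it->remaining-- > 0;
    }

    /* Called once current_batch_row has reached total_batch_rows: every
     * column iterator must agree that the batch is over. */
    static void
    check_columns_agree_on_batch_end(MiniIterator *iterators, int n)
    {
        for (int i = 0; i < n; i++)
        {
            if (mini_try_next(&iterators[i]))
            {
                fprintf(stderr, "compressed column out of sync with batch counter\n");
                exit(EXIT_FAILURE);
            }
        }
    }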
