Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use data inheritance from VirtualTupleTableSlot in compressed batch #6615

Merged
merged 1 commit into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
22 changes: 4 additions & 18 deletions tsl/src/nodes/decompress_chunk/batch_array.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,7 @@ batch_array_destroy(BatchArray *array)
for (int i = 0; i < array->n_batch_states; i++)
{
DecompressBatchState *batch_state = batch_array_get_at(array, i);
Assert(batch_state != NULL);

if (batch_state->compressed_slot != NULL)
ExecDropSingleTupleTableSlot(batch_state->compressed_slot);

if (batch_state->decompressed_scan_slot != NULL)
ExecDropSingleTupleTableSlot(batch_state->decompressed_scan_slot);
compressed_batch_destroy(batch_state);
}

pfree(array->batch_states);
Expand Down Expand Up @@ -86,16 +80,7 @@ batch_array_clear_at(BatchArray *array, int batch_index)
DecompressBatchState *batch_state = batch_array_get_at(array, batch_index);

/* Reset batch state */
batch_state->total_batch_rows = 0;
batch_state->next_batch_row = 0;
batch_state->vector_qual_result = NULL;

if (batch_state->per_batch_context != NULL)
{
ExecClearTuple(batch_state->compressed_slot);
ExecClearTuple(batch_state->decompressed_scan_slot);
MemoryContextReset(batch_state->per_batch_context);
}
compressed_batch_discard_tuples(batch_state);

array->unused_batch_states = bms_add_member(array->unused_batch_states, batch_index);
}
Expand Down Expand Up @@ -124,7 +109,8 @@ batch_array_get_unused_slot(BatchArray *array)

Assert(next_unused_batch >= 0);
Assert(next_unused_batch < array->n_batch_states);
Assert(TupIsNull(batch_array_get_at(array, next_unused_batch)->decompressed_scan_slot));

Assert(TupIsNull(compressed_batch_current_tuple(batch_array_get_at(array, next_unused_batch))));

array->unused_batch_states = bms_del_member(array->unused_batch_states, next_unused_batch);

Expand Down
8 changes: 4 additions & 4 deletions tsl/src/nodes/decompress_chunk/batch_queue_fifo.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ batch_queue_fifo_free(BatchQueue *bq)
static inline bool
batch_queue_fifo_needs_next_batch(BatchQueue *bq)
{
return TupIsNull(batch_array_get_at(&bq->batch_array, 0)->decompressed_scan_slot);
return TupIsNull(compressed_batch_current_tuple(batch_array_get_at(&bq->batch_array, 0)));
}

static inline void
batch_queue_fifo_pop(BatchQueue *bq, DecompressContext *dcontext)
{
DecompressBatchState *batch_state = batch_array_get_at(&bq->batch_array, 0);
if (TupIsNull(batch_state->decompressed_scan_slot))
if (TupIsNull(compressed_batch_current_tuple(batch_state)))
{
/* Allow this function to be called on the initial empty queue. */
return;
Expand All @@ -40,7 +40,7 @@ batch_queue_fifo_push_batch(BatchQueue *bq, DecompressContext *dcontext,
{
BatchArray *batch_array = &bq->batch_array;
DecompressBatchState *batch_state = batch_array_get_at(batch_array, 0);
Assert(TupIsNull(batch_array_get_at(batch_array, 0)->decompressed_scan_slot));
Assert(TupIsNull(compressed_batch_current_tuple(batch_array_get_at(batch_array, 0))));
compressed_batch_set_compressed_tuple(dcontext, batch_state, compressed_slot);
compressed_batch_advance(dcontext, batch_state);
}
Expand All @@ -54,7 +54,7 @@ batch_queue_fifo_reset(BatchQueue *bq)
static inline TupleTableSlot *
batch_queue_fifo_top_tuple(BatchQueue *bq)
{
return batch_array_get_at(&bq->batch_array, 0)->decompressed_scan_slot;
return compressed_batch_current_tuple(batch_array_get_at(&bq->batch_array, 0));
}

static const struct BatchQueueFunctions BatchQueueFunctionsFifo = {
Expand Down
23 changes: 13 additions & 10 deletions tsl/src/nodes/decompress_chunk/batch_queue_heap.c
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,8 @@ batch_queue_heap_pop(BatchQueue *bq, DecompressContext *dcontext)

compressed_batch_advance(dcontext, top_batch);

if (TupIsNull(top_batch->decompressed_scan_slot))
TupleTableSlot *top_tuple = compressed_batch_current_tuple(top_batch);
if (TupIsNull(top_tuple))
{
/* Batch is exhausted, recycle batch_state */
(void) binaryheap_remove_first(queue->merge_heap);
Expand All @@ -205,11 +206,11 @@ batch_queue_heap_pop(BatchQueue *bq, DecompressContext *dcontext)
/*
* We're working with virtual tuple slots so no need for slot_getattr().
*/
Assert(TTS_IS_VIRTUAL(top_batch->decompressed_scan_slot));
Assert(TTS_IS_VIRTUAL(top_tuple));
queue->heap_entries[top_batch_index * queue->nkeys + key].value =
top_batch->decompressed_scan_slot->tts_values[attr];
top_tuple->tts_values[attr];
queue->heap_entries[top_batch_index * queue->nkeys + key].null =
top_batch->decompressed_scan_slot->tts_isnull[attr];
top_tuple->tts_isnull[attr];
}

/* Place this batch on the heap according to its new decompressed tuple. */
Expand Down Expand Up @@ -287,7 +288,8 @@ batch_queue_heap_push_batch(BatchQueue *_queue, DecompressContext *dcontext,
queue->last_batch_first_tuple_slot->tts_isnull[attr];
}

if (TupIsNull(batch_state->decompressed_scan_slot))
TupleTableSlot *current_tuple = compressed_batch_current_tuple(batch_state);
if (TupIsNull(current_tuple))
{
/* Might happen if there are no tuples in the batch that pass the quals. */
batch_array_clear_at(batch_array, new_batch_index);
Expand All @@ -305,11 +307,11 @@ batch_queue_heap_push_batch(BatchQueue *_queue, DecompressContext *dcontext,
/*
* We're working with virtual tuple slots so no need for slot_getattr().
*/
Assert(TTS_IS_VIRTUAL(batch_state->decompressed_scan_slot));
Assert(TTS_IS_VIRTUAL(current_tuple));
queue->heap_entries[new_batch_index * queue->nkeys + key].value =
batch_state->decompressed_scan_slot->tts_values[attr];
current_tuple->tts_values[attr];
queue->heap_entries[new_batch_index * queue->nkeys + key].null =
batch_state->decompressed_scan_slot->tts_isnull[attr];
current_tuple->tts_isnull[attr];
}

/*
Expand All @@ -331,8 +333,9 @@ batch_queue_heap_top_tuple(BatchQueue *bq)

const int top_batch_index = DatumGetInt32(binaryheap_first(bqh->merge_heap));
DecompressBatchState *top_batch = batch_array_get_at(batch_array, top_batch_index);
Assert(!TupIsNull(top_batch->decompressed_scan_slot));
return top_batch->decompressed_scan_slot;
TupleTableSlot *top_tuple = compressed_batch_current_tuple(top_batch);
Assert(!TupIsNull(top_tuple));
return top_tuple;
}

static void
Expand Down