From cd3e862b0e47fe80e3f219e3a0d5bc1f7817f941 Mon Sep 17 00:00:00 2001 From: Alexander Kuzmenkov <36882414+akuzm@users.noreply.github.com> Date: Thu, 4 Apr 2024 21:41:54 +0200 Subject: [PATCH 1/8] Don't copy compressed slot to compressed batch struct There is overhead associated with copying the heap tuple and (un)pinning the respective heap buffers, which becomes apparent in vectorized aggregation. --- .../nodes/decompress_chunk/compressed_batch.c | 149 +++++++++++------- .../nodes/decompress_chunk/compressed_batch.h | 6 - .../decompress_chunk/decompress_context.h | 15 -- 3 files changed, 89 insertions(+), 81 deletions(-) diff --git a/tsl/src/nodes/decompress_chunk/compressed_batch.c b/tsl/src/nodes/decompress_chunk/compressed_batch.c index 0eb9be2e035..63f778d8336 100644 --- a/tsl/src/nodes/decompress_chunk/compressed_batch.c +++ b/tsl/src/nodes/decompress_chunk/compressed_batch.c @@ -156,7 +156,8 @@ get_max_text_datum_size(ArrowArray *text_array) } static void -decompress_column(DecompressContext *dcontext, DecompressBatchState *batch_state, int i) +decompress_column(DecompressContext *dcontext, DecompressBatchState *batch_state, int i, + TupleTableSlot *compressed_slot) { CompressionColumnDescription *column_description = &dcontext->template_columns[i]; CompressedColumnValues *column_values = &batch_state->compressed_columns[i]; @@ -168,9 +169,7 @@ decompress_column(DecompressContext *dcontext, DecompressBatchState *batch_state Assert(value_bytes != 0); bool isnull; - Datum value = slot_getattr(batch_state->compressed_slot, - column_description->compressed_scan_attno, - &isnull); + Datum value = slot_getattr(compressed_slot, column_description->compressed_scan_attno, &isnull); if (isnull) { @@ -330,8 +329,8 @@ translate_bitmap_from_dictionary(const ArrowArray *arrow, const uint64 *dict_res } static void -compute_plain_qual(DecompressContext *dcontext, DecompressBatchState *batch_state, Node *qual, - uint64 *restrict result) +compute_plain_qual(DecompressContext *dcontext, DecompressBatchState *batch_state, + TupleTableSlot *compressed_slot, Node *qual, uint64 *restrict result) { /* * Some predicates can be evaluated to a Const at run time. @@ -423,7 +422,7 @@ compute_plain_qual(DecompressContext *dcontext, DecompressBatchState *batch_stat * skip decompressing some columns if the entire batch doesn't pass * the quals. 
*/ - decompress_column(dcontext, batch_state, column_index); + decompress_column(dcontext, batch_state, column_index, compressed_slot); Assert(column_values->decompression_type != DT_Invalid); } @@ -566,16 +565,16 @@ compute_plain_qual(DecompressContext *dcontext, DecompressBatchState *batch_stat } static void compute_one_qual(DecompressContext *dcontext, DecompressBatchState *batch_state, - Node *qual, uint64 *restrict result); + TupleTableSlot *compressed_slot, Node *qual, uint64 *restrict result); static void compute_qual_conjunction(DecompressContext *dcontext, DecompressBatchState *batch_state, - List *quals, uint64 *restrict result) + TupleTableSlot *compressed_slot, List *quals, uint64 *restrict result) { ListCell *lc; foreach (lc, quals) { - compute_one_qual(dcontext, batch_state, lfirst(lc), result); + compute_one_qual(dcontext, batch_state, compressed_slot, lfirst(lc), result); if (get_vector_qual_summary(result, batch_state->total_batch_rows) == NoRowsPass) { /* @@ -589,7 +588,7 @@ compute_qual_conjunction(DecompressContext *dcontext, DecompressBatchState *batc static void compute_qual_disjunction(DecompressContext *dcontext, DecompressBatchState *batch_state, - List *quals, uint64 *restrict result) + TupleTableSlot *compressed_slot, List *quals, uint64 *restrict result) { const size_t n_rows = batch_state->total_batch_rows; const size_t n_result_words = (n_rows + 63) / 64; @@ -608,7 +607,7 @@ compute_qual_disjunction(DecompressContext *dcontext, DecompressBatchState *batc { one_qual_result[i] = (uint64) -1; } - compute_one_qual(dcontext, batch_state, lfirst(lc), one_qual_result); + compute_one_qual(dcontext, batch_state, compressed_slot, lfirst(lc), one_qual_result); for (size_t i = 0; i < n_result_words; i++) { or_result[i] |= one_qual_result[i]; @@ -631,19 +630,19 @@ compute_qual_disjunction(DecompressContext *dcontext, DecompressBatchState *batc } static void -compute_one_qual(DecompressContext *dcontext, DecompressBatchState *batch_state, Node *qual, - uint64 *restrict result) +compute_one_qual(DecompressContext *dcontext, DecompressBatchState *batch_state, + TupleTableSlot *compressed_slot, Node *qual, uint64 *restrict result) { if (!IsA(qual, BoolExpr)) { - compute_plain_qual(dcontext, batch_state, qual, result); + compute_plain_qual(dcontext, batch_state, compressed_slot, qual, result); return; } BoolExpr *boolexpr = castNode(BoolExpr, qual); if (boolexpr->boolop == AND_EXPR) { - compute_qual_conjunction(dcontext, batch_state, boolexpr->args, result); + compute_qual_conjunction(dcontext, batch_state, compressed_slot, boolexpr->args, result); return; } @@ -652,7 +651,7 @@ compute_one_qual(DecompressContext *dcontext, DecompressBatchState *batch_state, * NOT and consider it non-vectorizable at planning time. So only OR is left. */ Ensure(boolexpr->boolop == OR_EXPR, "expected OR"); - compute_qual_disjunction(dcontext, batch_state, boolexpr->args, result); + compute_qual_disjunction(dcontext, batch_state, compressed_slot, boolexpr->args, result); } /* @@ -661,7 +660,8 @@ compute_one_qual(DecompressContext *dcontext, DecompressBatchState *batch_state, * optimizations. */ static VectorQualSummary -compute_vector_quals(DecompressContext *dcontext, DecompressBatchState *batch_state) +compute_vector_quals(DecompressContext *dcontext, DecompressBatchState *batch_state, + TupleTableSlot *compressed_slot) { /* * Allocate the bitmap that will hold the vectorized qual results. 
We will @@ -688,6 +688,7 @@ compute_vector_quals(DecompressContext *dcontext, DecompressBatchState *batch_st */ compute_qual_conjunction(dcontext, batch_state, + compressed_slot, dcontext->vectorized_quals_constified, batch_state->vector_qual_result); @@ -709,7 +710,6 @@ compressed_batch_discard_tuples(DecompressBatchState *batch_state) if (batch_state->per_batch_context != NULL) { - ExecClearTuple(batch_state->compressed_slot); ExecClearTuple(&batch_state->decompressed_scan_slot_data.base); MemoryContextReset(batch_state->per_batch_context); } @@ -720,7 +720,6 @@ compressed_batch_discard_tuples(DecompressBatchState *batch_state) */ Assert(IsA(&batch_state->decompressed_scan_slot_data, Invalid)); Assert(batch_state->decompressed_scan_slot_data.base.tts_ops == NULL); - Assert(batch_state->compressed_slot == NULL); } } @@ -730,24 +729,12 @@ compressed_batch_discard_tuples(DecompressBatchState *batch_state) * relatively expensive. */ static void -compressed_batch_lazy_init(DecompressContext *dcontext, DecompressBatchState *batch_state, - TupleTableSlot *compressed_slot) +compressed_batch_lazy_init(DecompressContext *dcontext, DecompressBatchState *batch_state) { /* Init memory context */ batch_state->per_batch_context = create_per_batch_mctx(dcontext); Assert(batch_state->per_batch_context != NULL); - Assert(batch_state->compressed_slot == NULL); - - /* Create a non ref-counted copy of the compressed tuple descriptor */ - if (dcontext->compressed_slot_tdesc == NULL) - dcontext->compressed_slot_tdesc = - CreateTupleDescCopyConstr(compressed_slot->tts_tupleDescriptor); - Assert(dcontext->compressed_slot_tdesc->tdrefcount == -1); - - batch_state->compressed_slot = - MakeSingleTupleTableSlot(dcontext->compressed_slot_tdesc, compressed_slot->tts_ops); - /* Get a reference to the output TupleTableSlot */ TupleTableSlot *decompressed_slot = dcontext->decompressed_slot; @@ -771,11 +758,19 @@ compressed_batch_lazy_init(DecompressContext *dcontext, DecompressBatchState *ba slot->tts_mcxt = CurrentMemoryContext; slot->tts_nvalid = 0; - slot->tts_values = palloc(MAXALIGN(slot->tts_tupleDescriptor->natts * sizeof(Datum)) + - MAXALIGN(slot->tts_tupleDescriptor->natts * sizeof(bool))); + slot->tts_values = palloc0(MAXALIGN(slot->tts_tupleDescriptor->natts * sizeof(Datum)) + + MAXALIGN(slot->tts_tupleDescriptor->natts * sizeof(bool))); slot->tts_isnull = (bool *) ((char *) slot->tts_values) + MAXALIGN(slot->tts_tupleDescriptor->natts * sizeof(Datum)); + /* + * Have to initially set nulls to true, because this is the uncompressed chunk + * tuple, and some of its columns might be not even decompressed. The tuple + * slot functions will get confused by them, because they expect a non-null + * value for attributes not marked as null. + */ + memset(slot->tts_isnull, true, slot->tts_tupleDescriptor->natts * sizeof(bool)); + /* * DecompressChunk produces virtual tuple slots. 
*/ @@ -788,7 +783,8 @@ compressed_batch_lazy_init(DecompressContext *dcontext, DecompressBatchState *ba */ void compressed_batch_set_compressed_tuple(DecompressContext *dcontext, - DecompressBatchState *batch_state, TupleTableSlot *subslot) + DecompressBatchState *batch_state, + TupleTableSlot *compressed_slot) { Assert(TupIsNull(compressed_batch_current_tuple(batch_state))); @@ -798,23 +794,10 @@ compressed_batch_set_compressed_tuple(DecompressContext *dcontext, */ if (batch_state->per_batch_context == NULL) { - compressed_batch_lazy_init(dcontext, batch_state, subslot); - } - else - { - Assert(batch_state->compressed_slot != NULL); + compressed_batch_lazy_init(dcontext, batch_state); } - - /* Ensure that all fields are empty. Calling ExecClearTuple is not enough - * because some attributes might not be populated (e.g., due to a dropped - * column) and these attributes need to be set to null. */ TupleTableSlot *decompressed_tuple = compressed_batch_current_tuple(batch_state); Assert(decompressed_tuple != NULL); - ExecStoreAllNullTuple(decompressed_tuple); - ExecClearTuple(decompressed_tuple); - - ExecCopySlot(batch_state->compressed_slot, subslot); - Assert(!TupIsNull(batch_state->compressed_slot)); batch_state->total_batch_rows = 0; batch_state->next_batch_row = 0; @@ -849,15 +832,33 @@ compressed_batch_set_compressed_tuple(DecompressContext *dcontext, */ AttrNumber attr = AttrNumberGetAttrOffset(column_description->output_attno); decompressed_tuple->tts_values[attr] = - slot_getattr(batch_state->compressed_slot, + slot_getattr(compressed_slot, column_description->compressed_scan_attno, &decompressed_tuple->tts_isnull[attr]); + + // fprintf(stderr, "segmentby column [%d]: value %p, null %d\n", + // attr, (void*) decompressed_tuple->tts_values[attr], + // decompressed_tuple->tts_isnull[attr]); + + /* + * Note that if it's not a by-value type, we should copy it into + * the slot context. + */ + if (!get_typbyval(column_description->typid) && + DatumGetPointer(decompressed_tuple->tts_values[attr]) != NULL) + { + MemoryContext old = MemoryContextSwitchTo(decompressed_tuple->tts_mcxt); + decompressed_tuple->tts_values[attr] = PointerGetDatum(pg_detoast_datum_copy( + (struct varlena *) decompressed_tuple->tts_values[attr])); + MemoryContextSwitchTo(old); + } + break; } case COUNT_COLUMN: { bool isnull; - Datum value = slot_getattr(batch_state->compressed_slot, + Datum value = slot_getattr(compressed_slot, column_description->compressed_scan_attno, &isnull); /* count column should never be NULL */ @@ -885,9 +886,10 @@ compressed_batch_set_compressed_tuple(DecompressContext *dcontext, } } - VectorQualSummary vector_qual_summary = dcontext->vectorized_quals_constified != NIL ? - compute_vector_quals(dcontext, batch_state) : - AllRowsPass; + VectorQualSummary vector_qual_summary = + dcontext->vectorized_quals_constified != NIL ? 
+ compute_vector_quals(dcontext, batch_state, compressed_slot) : + AllRowsPass; if (vector_qual_summary == NoRowsPass && !dcontext->batch_sorted_merge) { /* @@ -917,7 +919,7 @@ compressed_batch_set_compressed_tuple(DecompressContext *dcontext, CompressedColumnValues *column_values = &batch_state->compressed_columns[i]; if (column_values->decompression_type == DT_Invalid) { - decompress_column(dcontext, batch_state, i); + decompress_column(dcontext, batch_state, i, compressed_slot); Assert(column_values->decompression_type != DT_Invalid); } } @@ -931,6 +933,9 @@ compressed_batch_set_compressed_tuple(DecompressContext *dcontext, batch_state->vector_qual_result = NULL; } } + + /* FIXME this is for testing, better done by the caller. */ + ExecClearTuple(compressed_slot); } static void @@ -960,6 +965,8 @@ make_next_tuple(DecompressBatchState *batch_state, uint16 arrow_row, int num_com Assert(batch_state->total_batch_rows > 0); Assert(batch_state->next_batch_row < batch_state->total_batch_rows); + // fprintf(stderr, "make next tuple [%d]\n", batch_state->next_batch_row); + for (int i = 0; i < num_compressed_columns; i++) { CompressedColumnValues *column_values = &batch_state->compressed_columns[i]; @@ -975,6 +982,10 @@ make_next_tuple(DecompressBatchState *batch_state, uint16 arrow_row, int num_com *column_values->output_isnull = result.is_null; *column_values->output_value = result.val; + + // fprintf(stderr, "iterator column #%d: value %p, null %d\n", + // i, (void*) *column_values->output_value, + // *column_values->output_isnull); } else if (column_values->decompression_type > SIZEOF_DATUM) { @@ -989,6 +1000,10 @@ make_next_tuple(DecompressBatchState *batch_state, uint16 arrow_row, int num_com *column_values->output_value = PointerGetDatum(&src[value_bytes * arrow_row]); *column_values->output_isnull = !arrow_row_is_valid(column_values->buffers[0], arrow_row); + + // fprintf(stderr, "by-ref column #%d: value %p, null %d\n", + // i, (void*) *column_values->output_value, + // *column_values->output_isnull); } else if (column_values->decompression_type > 0) { @@ -1006,12 +1021,20 @@ make_next_tuple(DecompressBatchState *batch_state, uint16 arrow_row, int num_com memcpy(column_values->output_value, &src[value_bytes * arrow_row], SIZEOF_DATUM); *column_values->output_isnull = !arrow_row_is_valid(column_values->buffers[0], arrow_row); + + // fprintf(stderr, "by-val column #%d: value %p, null %d\n", + // i, (void*) *column_values->output_value, + // *column_values->output_isnull); } else if (column_values->decompression_type == DT_ArrowText) { store_text_datum(column_values, arrow_row); *column_values->output_isnull = !arrow_row_is_valid(column_values->buffers[0], arrow_row); + + // fprintf(stderr, "arrow text column #%d: value %p, null %d\n", + // i, (void*) *column_values->output_value, + // *column_values->output_isnull); } else if (column_values->decompression_type == DT_ArrowTextDict) { @@ -1019,11 +1042,19 @@ make_next_tuple(DecompressBatchState *batch_state, uint16 arrow_row, int num_com store_text_datum(column_values, index); *column_values->output_isnull = !arrow_row_is_valid(column_values->buffers[0], arrow_row); + + // fprintf(stderr, "arrow text dict column #%d: value %p, null %d\n", + // i, (void*) *column_values->output_value, + // *column_values->output_isnull); } else { /* A compressed column with default value, do nothing. 
*/ Assert(column_values->decompression_type == DT_Default); + + // fprintf(stderr, "default column #%d: value %p, null %d\n", + // i, (void*) *column_values->output_value, + // *column_values->output_isnull); } } @@ -1225,16 +1256,14 @@ compressed_batch_destroy(DecompressBatchState *batch_state) batch_state->per_batch_context = NULL; } - if (batch_state->compressed_slot != NULL) + if (batch_state->decompressed_scan_slot_data.base.tts_values != NULL) { /* * Can be separately NULL in the current simplified prototype for * vectorized aggregation, but ideally it should change together with * per-batch context. */ - ExecDropSingleTupleTableSlot(batch_state->compressed_slot); - batch_state->compressed_slot = NULL; - pfree(batch_state->decompressed_scan_slot_data.base.tts_values); + batch_state->decompressed_scan_slot_data.base.tts_values = NULL; } } diff --git a/tsl/src/nodes/decompress_chunk/compressed_batch.h b/tsl/src/nodes/decompress_chunk/compressed_batch.h index bbde12a7119..96bd5721d24 100644 --- a/tsl/src/nodes/decompress_chunk/compressed_batch.h +++ b/tsl/src/nodes/decompress_chunk/compressed_batch.h @@ -82,12 +82,6 @@ typedef struct DecompressBatchState */ VirtualTupleTableSlot decompressed_scan_slot_data; - /* - * Compressed target slot. We have to keep a local copy when doing batch - * sorted merge, because the segmentby column values might reference the - * original tuple, and a batch outlives its source tuple. - */ - TupleTableSlot *compressed_slot; uint16 total_batch_rows; uint16 next_batch_row; MemoryContext per_batch_context; diff --git a/tsl/src/nodes/decompress_chunk/decompress_context.h b/tsl/src/nodes/decompress_chunk/decompress_context.h index 7670163b0e2..7d16e5ac069 100644 --- a/tsl/src/nodes/decompress_chunk/decompress_context.h +++ b/tsl/src/nodes/decompress_chunk/decompress_context.h @@ -63,21 +63,6 @@ typedef struct DecompressContext TupleTableSlot *decompressed_slot; - /* - * Make non-refcounted copies of the tupdesc for reuse across all batch states - * and avoid spending CPU in ResourceOwner when creating a big number of table - * slots. This happens because each new slot pins its tuple descriptor using - * PinTupleDesc, and for reference-counting tuples this involves adding a new - * reference to ResourceOwner, which is not very efficient for a large number of - * references. - * - * We don't have to do this for the decompressed slot tuple descriptor, - * because there we use custom tuple slot (de)initialization functions, which - * don't use reference counting and just use a raw pointer to the tuple - * descriptor. 
- */ - TupleDesc compressed_slot_tdesc; - PlanState *ps; /* Set for filtering and instrumentation */ Detoaster detoaster; From 6693d9ff58ce4d1d8b8b6a5349bc4e516c9c2fd2 Mon Sep 17 00:00:00 2001 From: Alexander Kuzmenkov <36882414+akuzm@users.noreply.github.com> Date: Thu, 4 Apr 2024 21:52:36 +0200 Subject: [PATCH 2/8] Always copy out when detoasting as well --- tsl/src/compression/compression.c | 8 ++++---- .../nodes/decompress_chunk/compressed_batch.c | 15 +++++++------- tsl/src/nodes/decompress_chunk/detoaster.c | 20 ++++++++++++++----- tsl/src/nodes/decompress_chunk/detoaster.h | 4 ++-- tsl/src/nodes/decompress_chunk/exec.c | 9 ++++----- 5 files changed, 33 insertions(+), 23 deletions(-) diff --git a/tsl/src/compression/compression.c b/tsl/src/compression/compression.c index 86e09ac00fb..920bbbb276b 100644 --- a/tsl/src/compression/compression.c +++ b/tsl/src/compression/compression.c @@ -1652,10 +1652,10 @@ decompress_batch(RowDecompressor *decompressor) /* Normal compressed column. */ Datum compressed_datum = PointerGetDatum( - detoaster_detoast_attr((struct varlena *) DatumGetPointer( - decompressor->compressed_datums[input_column]), - &decompressor->detoaster, - CurrentMemoryContext)); + detoaster_detoast_attr_copy((struct varlena *) DatumGetPointer( + decompressor->compressed_datums[input_column]), + &decompressor->detoaster, + CurrentMemoryContext)); CompressedDataHeader *header = get_compressed_data_header(compressed_datum); column_info->iterator = definitions[header->compression_algorithm] diff --git a/tsl/src/nodes/decompress_chunk/compressed_batch.c b/tsl/src/nodes/decompress_chunk/compressed_batch.c index 63f778d8336..ecff996dc93 100644 --- a/tsl/src/nodes/decompress_chunk/compressed_batch.c +++ b/tsl/src/nodes/decompress_chunk/compressed_batch.c @@ -187,9 +187,9 @@ decompress_column(DecompressContext *dcontext, DecompressBatchState *batch_state } /* Detoast the compressed datum. */ - value = PointerGetDatum(detoaster_detoast_attr((struct varlena *) DatumGetPointer(value), - &dcontext->detoaster, - batch_state->per_batch_context)); + value = PointerGetDatum(detoaster_detoast_attr_copy((struct varlena *) DatumGetPointer(value), + &dcontext->detoaster, + batch_state->per_batch_context)); /* Decompress the entire batch if it is supported. */ CompressedDataHeader *header = (CompressedDataHeader *) value; @@ -847,10 +847,11 @@ compressed_batch_set_compressed_tuple(DecompressContext *dcontext, if (!get_typbyval(column_description->typid) && DatumGetPointer(decompressed_tuple->tts_values[attr]) != NULL) { - MemoryContext old = MemoryContextSwitchTo(decompressed_tuple->tts_mcxt); - decompressed_tuple->tts_values[attr] = PointerGetDatum(pg_detoast_datum_copy( - (struct varlena *) decompressed_tuple->tts_values[attr])); - MemoryContextSwitchTo(old); + decompressed_tuple->tts_values[attr] = PointerGetDatum( + detoaster_detoast_attr_copy((struct varlena *) + decompressed_tuple->tts_values[attr], + &dcontext->detoaster, + batch_state->per_batch_context)); } break; diff --git a/tsl/src/nodes/decompress_chunk/detoaster.c b/tsl/src/nodes/decompress_chunk/detoaster.c index b04d3369937..981998ce362 100644 --- a/tsl/src/nodes/decompress_chunk/detoaster.c +++ b/tsl/src/nodes/decompress_chunk/detoaster.c @@ -217,7 +217,7 @@ ts_fetch_toast(Detoaster *detoaster, struct varatt_external *toast_pointer, stru /* * The memory context is used to store intermediate data, and is supposed to - * live over the calls to detoaster_detoast_attr(). + * live over the calls to detoaster_detoast_attr_copy(). 
* That function itself can be called in a short-lived memory context. */ void @@ -338,15 +338,25 @@ ts_toast_decompress_datum(struct varlena *attr) /* * Modification of Postgres' detoast_attr() where we use the stateful Detoaster - * and skip some cases that don't occur for the toasted compressed data. + * and skip some cases that don't occur for the toasted compressed data. Even if + * the data is inline and no detoasting is needed, copies it into the destination + * memory context. */ struct varlena * -detoaster_detoast_attr(struct varlena *attr, Detoaster *detoaster, MemoryContext dest_mctx) +detoaster_detoast_attr_copy(struct varlena *attr, Detoaster *detoaster, MemoryContext dest_mctx) { if (!VARATT_IS_EXTENDED(attr)) { - /* Nothing to do here. */ - return attr; + /* + * This case is unlikely because the compressed data is almost always + * toasted and not inline, but we still have to copy the data into the + * destination memory context. The source compressed tuple may have + * independent unknown lifetime. + */ + Size len = VARSIZE(attr); + struct varlena *result = (struct varlena *) MemoryContextAlloc(dest_mctx, len); + memcpy(result, attr, len); + return result; } if (VARATT_IS_EXTERNAL_ONDISK(attr)) diff --git a/tsl/src/nodes/decompress_chunk/detoaster.h b/tsl/src/nodes/decompress_chunk/detoaster.h index b27ff6ffa5f..90c78d54590 100644 --- a/tsl/src/nodes/decompress_chunk/detoaster.h +++ b/tsl/src/nodes/decompress_chunk/detoaster.h @@ -27,5 +27,5 @@ typedef struct Detoaster void detoaster_init(Detoaster *detoaster, MemoryContext mctx); void detoaster_close(Detoaster *detoaster); -struct varlena *detoaster_detoast_attr(struct varlena *attr, Detoaster *detoaster, - MemoryContext dest_mctx); +struct varlena *detoaster_detoast_attr_copy(struct varlena *attr, Detoaster *detoaster, + MemoryContext dest_mctx); diff --git a/tsl/src/nodes/decompress_chunk/exec.c b/tsl/src/nodes/decompress_chunk/exec.c index 238b52f927b..09ffa2ce492 100644 --- a/tsl/src/nodes/decompress_chunk/exec.c +++ b/tsl/src/nodes/decompress_chunk/exec.c @@ -543,11 +543,10 @@ perform_vectorized_sum_int4(DecompressChunkState *chunk_state, Aggref *aggref) /* We have at least one value */ decompressed_scan_slot->tts_isnull[0] = false; - CompressedDataHeader *header = - (CompressedDataHeader *) detoaster_detoast_attr((struct varlena *) DatumGetPointer( - value), - &dcontext->detoaster, - CurrentMemoryContext); + CompressedDataHeader *header = (CompressedDataHeader *) + detoaster_detoast_attr_copy((struct varlena *) DatumGetPointer(value), + &dcontext->detoaster, + CurrentMemoryContext); ArrowArray *arrow = NULL; From 991711a1121b23a10113abb353fc52170cf404ca Mon Sep 17 00:00:00 2001 From: Alexander Kuzmenkov <36882414+akuzm@users.noreply.github.com> Date: Thu, 4 Apr 2024 22:22:51 +0200 Subject: [PATCH 3/8] cache the byvalue flag --- tsl/src/nodes/decompress_chunk/compressed_batch.c | 5 +---- tsl/src/nodes/decompress_chunk/decompress_context.h | 3 ++- tsl/src/nodes/decompress_chunk/exec.c | 2 +- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/tsl/src/nodes/decompress_chunk/compressed_batch.c b/tsl/src/nodes/decompress_chunk/compressed_batch.c index ecff996dc93..5271ff3ceb4 100644 --- a/tsl/src/nodes/decompress_chunk/compressed_batch.c +++ b/tsl/src/nodes/decompress_chunk/compressed_batch.c @@ -844,7 +844,7 @@ compressed_batch_set_compressed_tuple(DecompressContext *dcontext, * Note that if it's not a by-value type, we should copy it into * the slot context. 
*/ - if (!get_typbyval(column_description->typid) && + if (!column_description->by_value && DatumGetPointer(decompressed_tuple->tts_values[attr]) != NULL) { decompressed_tuple->tts_values[attr] = PointerGetDatum( @@ -934,9 +934,6 @@ compressed_batch_set_compressed_tuple(DecompressContext *dcontext, batch_state->vector_qual_result = NULL; } } - - /* FIXME this is for testing, better done by the caller. */ - ExecClearTuple(compressed_slot); } static void diff --git a/tsl/src/nodes/decompress_chunk/decompress_context.h b/tsl/src/nodes/decompress_chunk/decompress_context.h index 7d16e5ac069..0d084ace1ff 100644 --- a/tsl/src/nodes/decompress_chunk/decompress_context.h +++ b/tsl/src/nodes/decompress_chunk/decompress_context.h @@ -27,7 +27,8 @@ typedef struct CompressionColumnDescription { CompressionColumnType type; Oid typid; - int value_bytes; + int16 value_bytes; + bool by_value; /* * Attno of the decompressed column in the output of DecompressChunk node. diff --git a/tsl/src/nodes/decompress_chunk/exec.c b/tsl/src/nodes/decompress_chunk/exec.c index 09ffa2ce492..e9bebc5bb06 100644 --- a/tsl/src/nodes/decompress_chunk/exec.c +++ b/tsl/src/nodes/decompress_chunk/exec.c @@ -305,7 +305,7 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags) TupleDescAttr(desc, AttrNumberGetAttrOffset(column.output_attno)); column.typid = attribute->atttypid; - column.value_bytes = get_typlen(column.typid); + get_typlenbyval(column.typid, &column.value_bytes, &column.by_value); } if (list_nth_int(chunk_state->is_segmentby_column, compressed_index)) From e34ad75933c58ac59e8b8f554a633c607df043d2 Mon Sep 17 00:00:00 2001 From: Alexander Kuzmenkov <36882414+akuzm@users.noreply.github.com> Date: Thu, 4 Apr 2024 22:50:19 +0200 Subject: [PATCH 4/8] fixed-length by-reference types --- .../nodes/decompress_chunk/compressed_batch.c | 25 ++++++++++++++----- 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/tsl/src/nodes/decompress_chunk/compressed_batch.c b/tsl/src/nodes/decompress_chunk/compressed_batch.c index 5271ff3ceb4..2e83bf8b3b1 100644 --- a/tsl/src/nodes/decompress_chunk/compressed_batch.c +++ b/tsl/src/nodes/decompress_chunk/compressed_batch.c @@ -847,13 +847,26 @@ compressed_batch_set_compressed_tuple(DecompressContext *dcontext, if (!column_description->by_value && DatumGetPointer(decompressed_tuple->tts_values[attr]) != NULL) { - decompressed_tuple->tts_values[attr] = PointerGetDatum( - detoaster_detoast_attr_copy((struct varlena *) - decompressed_tuple->tts_values[attr], - &dcontext->detoaster, - batch_state->per_batch_context)); + if (column_description->value_bytes < 0) + { + /* This is a varlena type. */ + decompressed_tuple->tts_values[attr] = PointerGetDatum( + detoaster_detoast_attr_copy((struct varlena *) + decompressed_tuple->tts_values[attr], + &dcontext->detoaster, + batch_state->per_batch_context)); + } + else + { + /* This is a fixed-length by-reference type. */ + void *tmp = MemoryContextAlloc(batch_state->per_batch_context, + column_description->value_bytes); + memcpy(tmp, + DatumGetPointer(decompressed_tuple->tts_values[attr]), + column_description->value_bytes); + decompressed_tuple->tts_values[attr] = PointerGetDatum(tmp); + } } - break; } case COUNT_COLUMN: From fe855596feedf0cbcfdef8f08b05ce3d84d8e479 Mon Sep 17 00:00:00 2001 From: Alexander Kuzmenkov <36882414+akuzm@users.noreply.github.com> Date: Thu, 4 Apr 2024 22:58:06 +0200 Subject: [PATCH 5/8] benchmark no compressed copy (2024-04-04 no. 
5) From 119220cf0aae5e4d59b8b0ea35c78678fdbe3374 Mon Sep 17 00:00:00 2001 From: Alexander Kuzmenkov <36882414+akuzm@users.noreply.github.com> Date: Thu, 4 Apr 2024 23:00:00 +0200 Subject: [PATCH 6/8] cleanup --- tsl/src/nodes/decompress_chunk/compressed_batch.c | 8 ++++---- tsl/src/nodes/decompress_chunk/compressed_batch.h | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/tsl/src/nodes/decompress_chunk/compressed_batch.c b/tsl/src/nodes/decompress_chunk/compressed_batch.c index 2e83bf8b3b1..91cb4ce429e 100644 --- a/tsl/src/nodes/decompress_chunk/compressed_batch.c +++ b/tsl/src/nodes/decompress_chunk/compressed_batch.c @@ -156,8 +156,8 @@ get_max_text_datum_size(ArrowArray *text_array) } static void -decompress_column(DecompressContext *dcontext, DecompressBatchState *batch_state, int i, - TupleTableSlot *compressed_slot) +decompress_column(DecompressContext *dcontext, DecompressBatchState *batch_state, + TupleTableSlot *compressed_slot, int i) { CompressionColumnDescription *column_description = &dcontext->template_columns[i]; CompressedColumnValues *column_values = &batch_state->compressed_columns[i]; @@ -422,7 +422,7 @@ compute_plain_qual(DecompressContext *dcontext, DecompressBatchState *batch_stat * skip decompressing some columns if the entire batch doesn't pass * the quals. */ - decompress_column(dcontext, batch_state, column_index, compressed_slot); + decompress_column(dcontext, batch_state, compressed_slot, column_index); Assert(column_values->decompression_type != DT_Invalid); } @@ -933,7 +933,7 @@ compressed_batch_set_compressed_tuple(DecompressContext *dcontext, CompressedColumnValues *column_values = &batch_state->compressed_columns[i]; if (column_values->decompression_type == DT_Invalid) { - decompress_column(dcontext, batch_state, i, compressed_slot); + decompress_column(dcontext, batch_state, compressed_slot, i); Assert(column_values->decompression_type != DT_Invalid); } } diff --git a/tsl/src/nodes/decompress_chunk/compressed_batch.h b/tsl/src/nodes/decompress_chunk/compressed_batch.h index 96bd5721d24..486f3e9c637 100644 --- a/tsl/src/nodes/decompress_chunk/compressed_batch.h +++ b/tsl/src/nodes/decompress_chunk/compressed_batch.h @@ -98,7 +98,7 @@ typedef struct DecompressBatchState extern void compressed_batch_set_compressed_tuple(DecompressContext *dcontext, DecompressBatchState *batch_state, - TupleTableSlot *subslot); + TupleTableSlot *compressed_slot); extern void compressed_batch_advance(DecompressContext *dcontext, DecompressBatchState *batch_state); From 16e539024c37f59d3d637b2a0c4bb47e8c74ba18 Mon Sep 17 00:00:00 2001 From: Alexander Kuzmenkov <36882414+akuzm@users.noreply.github.com> Date: Fri, 5 Apr 2024 13:45:15 +0200 Subject: [PATCH 7/8] boop the CI From d4716c0074f558aa866881d5591424feee74f050 Mon Sep 17 00:00:00 2001 From: Alexander Kuzmenkov <36882414+akuzm@users.noreply.github.com> Date: Tue, 9 Apr 2024 13:03:18 +0200 Subject: [PATCH 8/8] remove debug prints --- .../nodes/decompress_chunk/compressed_batch.c | 30 ------------------- 1 file changed, 30 deletions(-) diff --git a/tsl/src/nodes/decompress_chunk/compressed_batch.c b/tsl/src/nodes/decompress_chunk/compressed_batch.c index 91cb4ce429e..c25c156a1fd 100644 --- a/tsl/src/nodes/decompress_chunk/compressed_batch.c +++ b/tsl/src/nodes/decompress_chunk/compressed_batch.c @@ -836,10 +836,6 @@ compressed_batch_set_compressed_tuple(DecompressContext *dcontext, column_description->compressed_scan_attno, &decompressed_tuple->tts_isnull[attr]); - // fprintf(stderr, "segmentby column [%d]: 
value %p, null %d\n", - // attr, (void*) decompressed_tuple->tts_values[attr], - // decompressed_tuple->tts_isnull[attr]); - /* * Note that if it's not a by-value type, we should copy it into * the slot context. @@ -976,8 +972,6 @@ make_next_tuple(DecompressBatchState *batch_state, uint16 arrow_row, int num_com Assert(batch_state->total_batch_rows > 0); Assert(batch_state->next_batch_row < batch_state->total_batch_rows); - // fprintf(stderr, "make next tuple [%d]\n", batch_state->next_batch_row); - for (int i = 0; i < num_compressed_columns; i++) { CompressedColumnValues *column_values = &batch_state->compressed_columns[i]; @@ -993,10 +987,6 @@ make_next_tuple(DecompressBatchState *batch_state, uint16 arrow_row, int num_com *column_values->output_isnull = result.is_null; *column_values->output_value = result.val; - - // fprintf(stderr, "iterator column #%d: value %p, null %d\n", - // i, (void*) *column_values->output_value, - // *column_values->output_isnull); } else if (column_values->decompression_type > SIZEOF_DATUM) { @@ -1011,10 +1001,6 @@ make_next_tuple(DecompressBatchState *batch_state, uint16 arrow_row, int num_com *column_values->output_value = PointerGetDatum(&src[value_bytes * arrow_row]); *column_values->output_isnull = !arrow_row_is_valid(column_values->buffers[0], arrow_row); - - // fprintf(stderr, "by-ref column #%d: value %p, null %d\n", - // i, (void*) *column_values->output_value, - // *column_values->output_isnull); } else if (column_values->decompression_type > 0) { @@ -1032,20 +1018,12 @@ make_next_tuple(DecompressBatchState *batch_state, uint16 arrow_row, int num_com memcpy(column_values->output_value, &src[value_bytes * arrow_row], SIZEOF_DATUM); *column_values->output_isnull = !arrow_row_is_valid(column_values->buffers[0], arrow_row); - - // fprintf(stderr, "by-val column #%d: value %p, null %d\n", - // i, (void*) *column_values->output_value, - // *column_values->output_isnull); } else if (column_values->decompression_type == DT_ArrowText) { store_text_datum(column_values, arrow_row); *column_values->output_isnull = !arrow_row_is_valid(column_values->buffers[0], arrow_row); - - // fprintf(stderr, "arrow text column #%d: value %p, null %d\n", - // i, (void*) *column_values->output_value, - // *column_values->output_isnull); } else if (column_values->decompression_type == DT_ArrowTextDict) { @@ -1053,19 +1031,11 @@ make_next_tuple(DecompressBatchState *batch_state, uint16 arrow_row, int num_com store_text_datum(column_values, index); *column_values->output_isnull = !arrow_row_is_valid(column_values->buffers[0], arrow_row); - - // fprintf(stderr, "arrow text dict column #%d: value %p, null %d\n", - // i, (void*) *column_values->output_value, - // *column_values->output_isnull); } else { /* A compressed column with default value, do nothing. */ Assert(column_values->decompression_type == DT_Default); - - // fprintf(stderr, "default column #%d: value %p, null %d\n", - // i, (void*) *column_values->output_value, - // *column_values->output_isnull); } }
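
As a reading aid for the segmentby handling introduced in patches 1, 3 and 4: the batch no longer keeps its own copy of the compressed slot, so any by-reference segmentby value has to be copied out of the source slot into the per-batch memory context before that slot is cleared. Below is a minimal standalone sketch of that copy logic, assuming only ordinary PostgreSQL APIs; the helper name copy_datum_to_context is hypothetical and not part of the patch, and the real code detoasts through the stateful Detoaster (detoaster_detoast_attr_copy) rather than PG_DETOAST_DATUM_COPY.

#include "postgres.h"
#include "fmgr.h"

/*
 * Hypothetical helper (illustration only, not part of the patch): copy a
 * by-reference Datum into dest_mctx so that it remains valid after the
 * source tuple slot is cleared. Assumes no cstring (typlen == -2) columns.
 */
static Datum
copy_datum_to_context(Datum value, bool isnull, bool byval, int16 typlen,
                      MemoryContext dest_mctx)
{
    if (isnull || byval)
    {
        /* Null or by-value: nothing points into the source slot. */
        return value;
    }

    if (typlen == -1)
    {
        /*
         * Varlena: detoast and copy into dest_mctx in one step. The patch
         * itself uses the stateful detoaster_detoast_attr_copy() here.
         */
        MemoryContext old = MemoryContextSwitchTo(dest_mctx);

        value = PointerGetDatum(PG_DETOAST_DATUM_COPY(value));
        MemoryContextSwitchTo(old);
    }
    else
    {
        /* Fixed-length by-reference type: plain byte-wise copy. */
        void *copy = MemoryContextAlloc(dest_mctx, typlen);

        memcpy(copy, DatumGetPointer(value), typlen);
        value = PointerGetDatum(copy);
    }

    return value;
}

The three branches mirror the cases in compressed_batch_set_compressed_tuple() after patch 4: by-value types are safe as-is, varlena types are detoasted and copied, and fixed-length by-reference types are copied byte-wise into the per-batch context.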