diff --git a/tsl/src/compression/compression.c b/tsl/src/compression/compression.c index 86e09ac00fb..920bbbb276b 100644 --- a/tsl/src/compression/compression.c +++ b/tsl/src/compression/compression.c @@ -1652,10 +1652,10 @@ decompress_batch(RowDecompressor *decompressor) /* Normal compressed column. */ Datum compressed_datum = PointerGetDatum( - detoaster_detoast_attr((struct varlena *) DatumGetPointer( - decompressor->compressed_datums[input_column]), - &decompressor->detoaster, - CurrentMemoryContext)); + detoaster_detoast_attr_copy((struct varlena *) DatumGetPointer( + decompressor->compressed_datums[input_column]), + &decompressor->detoaster, + CurrentMemoryContext)); CompressedDataHeader *header = get_compressed_data_header(compressed_datum); column_info->iterator = definitions[header->compression_algorithm] diff --git a/tsl/src/nodes/decompress_chunk/compressed_batch.c b/tsl/src/nodes/decompress_chunk/compressed_batch.c index 0eb9be2e035..c25c156a1fd 100644 --- a/tsl/src/nodes/decompress_chunk/compressed_batch.c +++ b/tsl/src/nodes/decompress_chunk/compressed_batch.c @@ -156,7 +156,8 @@ get_max_text_datum_size(ArrowArray *text_array) } static void -decompress_column(DecompressContext *dcontext, DecompressBatchState *batch_state, int i) +decompress_column(DecompressContext *dcontext, DecompressBatchState *batch_state, + TupleTableSlot *compressed_slot, int i) { CompressionColumnDescription *column_description = &dcontext->template_columns[i]; CompressedColumnValues *column_values = &batch_state->compressed_columns[i]; @@ -168,9 +169,7 @@ decompress_column(DecompressContext *dcontext, DecompressBatchState *batch_state Assert(value_bytes != 0); bool isnull; - Datum value = slot_getattr(batch_state->compressed_slot, - column_description->compressed_scan_attno, - &isnull); + Datum value = slot_getattr(compressed_slot, column_description->compressed_scan_attno, &isnull); if (isnull) { @@ -188,9 +187,9 @@ decompress_column(DecompressContext *dcontext, DecompressBatchState *batch_state } /* Detoast the compressed datum. */ - value = PointerGetDatum(detoaster_detoast_attr((struct varlena *) DatumGetPointer(value), - &dcontext->detoaster, - batch_state->per_batch_context)); + value = PointerGetDatum(detoaster_detoast_attr_copy((struct varlena *) DatumGetPointer(value), + &dcontext->detoaster, + batch_state->per_batch_context)); /* Decompress the entire batch if it is supported. */ CompressedDataHeader *header = (CompressedDataHeader *) value; @@ -330,8 +329,8 @@ translate_bitmap_from_dictionary(const ArrowArray *arrow, const uint64 *dict_res } static void -compute_plain_qual(DecompressContext *dcontext, DecompressBatchState *batch_state, Node *qual, - uint64 *restrict result) +compute_plain_qual(DecompressContext *dcontext, DecompressBatchState *batch_state, + TupleTableSlot *compressed_slot, Node *qual, uint64 *restrict result) { /* * Some predicates can be evaluated to a Const at run time. @@ -423,7 +422,7 @@ compute_plain_qual(DecompressContext *dcontext, DecompressBatchState *batch_stat * skip decompressing some columns if the entire batch doesn't pass * the quals. */ - decompress_column(dcontext, batch_state, column_index); + decompress_column(dcontext, batch_state, compressed_slot, column_index); Assert(column_values->decompression_type != DT_Invalid); } @@ -566,16 +565,16 @@ compute_plain_qual(DecompressContext *dcontext, DecompressBatchState *batch_stat } static void compute_one_qual(DecompressContext *dcontext, DecompressBatchState *batch_state, - Node *qual, uint64 *restrict result); + TupleTableSlot *compressed_slot, Node *qual, uint64 *restrict result); static void compute_qual_conjunction(DecompressContext *dcontext, DecompressBatchState *batch_state, - List *quals, uint64 *restrict result) + TupleTableSlot *compressed_slot, List *quals, uint64 *restrict result) { ListCell *lc; foreach (lc, quals) { - compute_one_qual(dcontext, batch_state, lfirst(lc), result); + compute_one_qual(dcontext, batch_state, compressed_slot, lfirst(lc), result); if (get_vector_qual_summary(result, batch_state->total_batch_rows) == NoRowsPass) { /* @@ -589,7 +588,7 @@ compute_qual_conjunction(DecompressContext *dcontext, DecompressBatchState *batc static void compute_qual_disjunction(DecompressContext *dcontext, DecompressBatchState *batch_state, - List *quals, uint64 *restrict result) + TupleTableSlot *compressed_slot, List *quals, uint64 *restrict result) { const size_t n_rows = batch_state->total_batch_rows; const size_t n_result_words = (n_rows + 63) / 64; @@ -608,7 +607,7 @@ compute_qual_disjunction(DecompressContext *dcontext, DecompressBatchState *batc { one_qual_result[i] = (uint64) -1; } - compute_one_qual(dcontext, batch_state, lfirst(lc), one_qual_result); + compute_one_qual(dcontext, batch_state, compressed_slot, lfirst(lc), one_qual_result); for (size_t i = 0; i < n_result_words; i++) { or_result[i] |= one_qual_result[i]; @@ -631,19 +630,19 @@ compute_qual_disjunction(DecompressContext *dcontext, DecompressBatchState *batc } static void -compute_one_qual(DecompressContext *dcontext, DecompressBatchState *batch_state, Node *qual, - uint64 *restrict result) +compute_one_qual(DecompressContext *dcontext, DecompressBatchState *batch_state, + TupleTableSlot *compressed_slot, Node *qual, uint64 *restrict result) { if (!IsA(qual, BoolExpr)) { - compute_plain_qual(dcontext, batch_state, qual, result); + compute_plain_qual(dcontext, batch_state, compressed_slot, qual, result); return; } BoolExpr *boolexpr = castNode(BoolExpr, qual); if (boolexpr->boolop == AND_EXPR) { - compute_qual_conjunction(dcontext, batch_state, boolexpr->args, result); + compute_qual_conjunction(dcontext, batch_state, compressed_slot, boolexpr->args, result); return; } @@ -652,7 +651,7 @@ compute_one_qual(DecompressContext *dcontext, DecompressBatchState *batch_state, * NOT and consider it non-vectorizable at planning time. So only OR is left. */ Ensure(boolexpr->boolop == OR_EXPR, "expected OR"); - compute_qual_disjunction(dcontext, batch_state, boolexpr->args, result); + compute_qual_disjunction(dcontext, batch_state, compressed_slot, boolexpr->args, result); } /* @@ -661,7 +660,8 @@ compute_one_qual(DecompressContext *dcontext, DecompressBatchState *batch_state, * optimizations. */ static VectorQualSummary -compute_vector_quals(DecompressContext *dcontext, DecompressBatchState *batch_state) +compute_vector_quals(DecompressContext *dcontext, DecompressBatchState *batch_state, + TupleTableSlot *compressed_slot) { /* * Allocate the bitmap that will hold the vectorized qual results. We will @@ -688,6 +688,7 @@ compute_vector_quals(DecompressContext *dcontext, DecompressBatchState *batch_st */ compute_qual_conjunction(dcontext, batch_state, + compressed_slot, dcontext->vectorized_quals_constified, batch_state->vector_qual_result); @@ -709,7 +710,6 @@ compressed_batch_discard_tuples(DecompressBatchState *batch_state) if (batch_state->per_batch_context != NULL) { - ExecClearTuple(batch_state->compressed_slot); ExecClearTuple(&batch_state->decompressed_scan_slot_data.base); MemoryContextReset(batch_state->per_batch_context); } @@ -720,7 +720,6 @@ compressed_batch_discard_tuples(DecompressBatchState *batch_state) */ Assert(IsA(&batch_state->decompressed_scan_slot_data, Invalid)); Assert(batch_state->decompressed_scan_slot_data.base.tts_ops == NULL); - Assert(batch_state->compressed_slot == NULL); } } @@ -730,24 +729,12 @@ compressed_batch_discard_tuples(DecompressBatchState *batch_state) * relatively expensive. */ static void -compressed_batch_lazy_init(DecompressContext *dcontext, DecompressBatchState *batch_state, - TupleTableSlot *compressed_slot) +compressed_batch_lazy_init(DecompressContext *dcontext, DecompressBatchState *batch_state) { /* Init memory context */ batch_state->per_batch_context = create_per_batch_mctx(dcontext); Assert(batch_state->per_batch_context != NULL); - Assert(batch_state->compressed_slot == NULL); - - /* Create a non ref-counted copy of the compressed tuple descriptor */ - if (dcontext->compressed_slot_tdesc == NULL) - dcontext->compressed_slot_tdesc = - CreateTupleDescCopyConstr(compressed_slot->tts_tupleDescriptor); - Assert(dcontext->compressed_slot_tdesc->tdrefcount == -1); - - batch_state->compressed_slot = - MakeSingleTupleTableSlot(dcontext->compressed_slot_tdesc, compressed_slot->tts_ops); - /* Get a reference to the output TupleTableSlot */ TupleTableSlot *decompressed_slot = dcontext->decompressed_slot; @@ -771,11 +758,19 @@ compressed_batch_lazy_init(DecompressContext *dcontext, DecompressBatchState *ba slot->tts_mcxt = CurrentMemoryContext; slot->tts_nvalid = 0; - slot->tts_values = palloc(MAXALIGN(slot->tts_tupleDescriptor->natts * sizeof(Datum)) + - MAXALIGN(slot->tts_tupleDescriptor->natts * sizeof(bool))); + slot->tts_values = palloc0(MAXALIGN(slot->tts_tupleDescriptor->natts * sizeof(Datum)) + + MAXALIGN(slot->tts_tupleDescriptor->natts * sizeof(bool))); slot->tts_isnull = (bool *) ((char *) slot->tts_values) + MAXALIGN(slot->tts_tupleDescriptor->natts * sizeof(Datum)); + /* + * Have to initially set nulls to true, because this is the uncompressed chunk + * tuple, and some of its columns might be not even decompressed. The tuple + * slot functions will get confused by them, because they expect a non-null + * value for attributes not marked as null. + */ + memset(slot->tts_isnull, true, slot->tts_tupleDescriptor->natts * sizeof(bool)); + /* * DecompressChunk produces virtual tuple slots. */ @@ -788,7 +783,8 @@ compressed_batch_lazy_init(DecompressContext *dcontext, DecompressBatchState *ba */ void compressed_batch_set_compressed_tuple(DecompressContext *dcontext, - DecompressBatchState *batch_state, TupleTableSlot *subslot) + DecompressBatchState *batch_state, + TupleTableSlot *compressed_slot) { Assert(TupIsNull(compressed_batch_current_tuple(batch_state))); @@ -798,23 +794,10 @@ compressed_batch_set_compressed_tuple(DecompressContext *dcontext, */ if (batch_state->per_batch_context == NULL) { - compressed_batch_lazy_init(dcontext, batch_state, subslot); - } - else - { - Assert(batch_state->compressed_slot != NULL); + compressed_batch_lazy_init(dcontext, batch_state); } - - /* Ensure that all fields are empty. Calling ExecClearTuple is not enough - * because some attributes might not be populated (e.g., due to a dropped - * column) and these attributes need to be set to null. */ TupleTableSlot *decompressed_tuple = compressed_batch_current_tuple(batch_state); Assert(decompressed_tuple != NULL); - ExecStoreAllNullTuple(decompressed_tuple); - ExecClearTuple(decompressed_tuple); - - ExecCopySlot(batch_state->compressed_slot, subslot); - Assert(!TupIsNull(batch_state->compressed_slot)); batch_state->total_batch_rows = 0; batch_state->next_batch_row = 0; @@ -849,15 +832,43 @@ compressed_batch_set_compressed_tuple(DecompressContext *dcontext, */ AttrNumber attr = AttrNumberGetAttrOffset(column_description->output_attno); decompressed_tuple->tts_values[attr] = - slot_getattr(batch_state->compressed_slot, + slot_getattr(compressed_slot, column_description->compressed_scan_attno, &decompressed_tuple->tts_isnull[attr]); + + /* + * Note that if it's not a by-value type, we should copy it into + * the slot context. + */ + if (!column_description->by_value && + DatumGetPointer(decompressed_tuple->tts_values[attr]) != NULL) + { + if (column_description->value_bytes < 0) + { + /* This is a varlena type. */ + decompressed_tuple->tts_values[attr] = PointerGetDatum( + detoaster_detoast_attr_copy((struct varlena *) + decompressed_tuple->tts_values[attr], + &dcontext->detoaster, + batch_state->per_batch_context)); + } + else + { + /* This is a fixed-length by-reference type. */ + void *tmp = MemoryContextAlloc(batch_state->per_batch_context, + column_description->value_bytes); + memcpy(tmp, + DatumGetPointer(decompressed_tuple->tts_values[attr]), + column_description->value_bytes); + decompressed_tuple->tts_values[attr] = PointerGetDatum(tmp); + } + } break; } case COUNT_COLUMN: { bool isnull; - Datum value = slot_getattr(batch_state->compressed_slot, + Datum value = slot_getattr(compressed_slot, column_description->compressed_scan_attno, &isnull); /* count column should never be NULL */ @@ -885,9 +896,10 @@ compressed_batch_set_compressed_tuple(DecompressContext *dcontext, } } - VectorQualSummary vector_qual_summary = dcontext->vectorized_quals_constified != NIL ? - compute_vector_quals(dcontext, batch_state) : - AllRowsPass; + VectorQualSummary vector_qual_summary = + dcontext->vectorized_quals_constified != NIL ? + compute_vector_quals(dcontext, batch_state, compressed_slot) : + AllRowsPass; if (vector_qual_summary == NoRowsPass && !dcontext->batch_sorted_merge) { /* @@ -917,7 +929,7 @@ compressed_batch_set_compressed_tuple(DecompressContext *dcontext, CompressedColumnValues *column_values = &batch_state->compressed_columns[i]; if (column_values->decompression_type == DT_Invalid) { - decompress_column(dcontext, batch_state, i); + decompress_column(dcontext, batch_state, compressed_slot, i); Assert(column_values->decompression_type != DT_Invalid); } } @@ -1225,16 +1237,14 @@ compressed_batch_destroy(DecompressBatchState *batch_state) batch_state->per_batch_context = NULL; } - if (batch_state->compressed_slot != NULL) + if (batch_state->decompressed_scan_slot_data.base.tts_values != NULL) { /* * Can be separately NULL in the current simplified prototype for * vectorized aggregation, but ideally it should change together with * per-batch context. */ - ExecDropSingleTupleTableSlot(batch_state->compressed_slot); - batch_state->compressed_slot = NULL; - pfree(batch_state->decompressed_scan_slot_data.base.tts_values); + batch_state->decompressed_scan_slot_data.base.tts_values = NULL; } } diff --git a/tsl/src/nodes/decompress_chunk/compressed_batch.h b/tsl/src/nodes/decompress_chunk/compressed_batch.h index bbde12a7119..486f3e9c637 100644 --- a/tsl/src/nodes/decompress_chunk/compressed_batch.h +++ b/tsl/src/nodes/decompress_chunk/compressed_batch.h @@ -82,12 +82,6 @@ typedef struct DecompressBatchState */ VirtualTupleTableSlot decompressed_scan_slot_data; - /* - * Compressed target slot. We have to keep a local copy when doing batch - * sorted merge, because the segmentby column values might reference the - * original tuple, and a batch outlives its source tuple. - */ - TupleTableSlot *compressed_slot; uint16 total_batch_rows; uint16 next_batch_row; MemoryContext per_batch_context; @@ -104,7 +98,7 @@ typedef struct DecompressBatchState extern void compressed_batch_set_compressed_tuple(DecompressContext *dcontext, DecompressBatchState *batch_state, - TupleTableSlot *subslot); + TupleTableSlot *compressed_slot); extern void compressed_batch_advance(DecompressContext *dcontext, DecompressBatchState *batch_state); diff --git a/tsl/src/nodes/decompress_chunk/decompress_context.h b/tsl/src/nodes/decompress_chunk/decompress_context.h index 7670163b0e2..0d084ace1ff 100644 --- a/tsl/src/nodes/decompress_chunk/decompress_context.h +++ b/tsl/src/nodes/decompress_chunk/decompress_context.h @@ -27,7 +27,8 @@ typedef struct CompressionColumnDescription { CompressionColumnType type; Oid typid; - int value_bytes; + int16 value_bytes; + bool by_value; /* * Attno of the decompressed column in the output of DecompressChunk node. @@ -63,21 +64,6 @@ typedef struct DecompressContext TupleTableSlot *decompressed_slot; - /* - * Make non-refcounted copies of the tupdesc for reuse across all batch states - * and avoid spending CPU in ResourceOwner when creating a big number of table - * slots. This happens because each new slot pins its tuple descriptor using - * PinTupleDesc, and for reference-counting tuples this involves adding a new - * reference to ResourceOwner, which is not very efficient for a large number of - * references. - * - * We don't have to do this for the decompressed slot tuple descriptor, - * because there we use custom tuple slot (de)initialization functions, which - * don't use reference counting and just use a raw pointer to the tuple - * descriptor. - */ - TupleDesc compressed_slot_tdesc; - PlanState *ps; /* Set for filtering and instrumentation */ Detoaster detoaster; diff --git a/tsl/src/nodes/decompress_chunk/detoaster.c b/tsl/src/nodes/decompress_chunk/detoaster.c index b04d3369937..981998ce362 100644 --- a/tsl/src/nodes/decompress_chunk/detoaster.c +++ b/tsl/src/nodes/decompress_chunk/detoaster.c @@ -217,7 +217,7 @@ ts_fetch_toast(Detoaster *detoaster, struct varatt_external *toast_pointer, stru /* * The memory context is used to store intermediate data, and is supposed to - * live over the calls to detoaster_detoast_attr(). + * live over the calls to detoaster_detoast_attr_copy(). * That function itself can be called in a short-lived memory context. */ void @@ -338,15 +338,25 @@ ts_toast_decompress_datum(struct varlena *attr) /* * Modification of Postgres' detoast_attr() where we use the stateful Detoaster - * and skip some cases that don't occur for the toasted compressed data. + * and skip some cases that don't occur for the toasted compressed data. Even if + * the data is inline and no detoasting is needed, copies it into the destination + * memory context. */ struct varlena * -detoaster_detoast_attr(struct varlena *attr, Detoaster *detoaster, MemoryContext dest_mctx) +detoaster_detoast_attr_copy(struct varlena *attr, Detoaster *detoaster, MemoryContext dest_mctx) { if (!VARATT_IS_EXTENDED(attr)) { - /* Nothing to do here. */ - return attr; + /* + * This case is unlikely because the compressed data is almost always + * toasted and not inline, but we still have to copy the data into the + * destination memory context. The source compressed tuple may have + * independent unknown lifetime. + */ + Size len = VARSIZE(attr); + struct varlena *result = (struct varlena *) MemoryContextAlloc(dest_mctx, len); + memcpy(result, attr, len); + return result; } if (VARATT_IS_EXTERNAL_ONDISK(attr)) diff --git a/tsl/src/nodes/decompress_chunk/detoaster.h b/tsl/src/nodes/decompress_chunk/detoaster.h index b27ff6ffa5f..90c78d54590 100644 --- a/tsl/src/nodes/decompress_chunk/detoaster.h +++ b/tsl/src/nodes/decompress_chunk/detoaster.h @@ -27,5 +27,5 @@ typedef struct Detoaster void detoaster_init(Detoaster *detoaster, MemoryContext mctx); void detoaster_close(Detoaster *detoaster); -struct varlena *detoaster_detoast_attr(struct varlena *attr, Detoaster *detoaster, - MemoryContext dest_mctx); +struct varlena *detoaster_detoast_attr_copy(struct varlena *attr, Detoaster *detoaster, + MemoryContext dest_mctx); diff --git a/tsl/src/nodes/decompress_chunk/exec.c b/tsl/src/nodes/decompress_chunk/exec.c index 238b52f927b..e9bebc5bb06 100644 --- a/tsl/src/nodes/decompress_chunk/exec.c +++ b/tsl/src/nodes/decompress_chunk/exec.c @@ -305,7 +305,7 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags) TupleDescAttr(desc, AttrNumberGetAttrOffset(column.output_attno)); column.typid = attribute->atttypid; - column.value_bytes = get_typlen(column.typid); + get_typlenbyval(column.typid, &column.value_bytes, &column.by_value); } if (list_nth_int(chunk_state->is_segmentby_column, compressed_index)) @@ -543,11 +543,10 @@ perform_vectorized_sum_int4(DecompressChunkState *chunk_state, Aggref *aggref) /* We have at least one value */ decompressed_scan_slot->tts_isnull[0] = false; - CompressedDataHeader *header = - (CompressedDataHeader *) detoaster_detoast_attr((struct varlena *) DatumGetPointer( - value), - &dcontext->detoaster, - CurrentMemoryContext); + CompressedDataHeader *header = (CompressedDataHeader *) + detoaster_detoast_attr_copy((struct varlena *) DatumGetPointer(value), + &dcontext->detoaster, + CurrentMemoryContext); ArrowArray *arrow = NULL;