Skip to content

Commit

Permalink
Don't decompress on compressed INSERT unless necessary
Browse files Browse the repository at this point in the history
Previously, for INSERTs into compressed chunks with unique constraints,
we would decompress any batch that could contain a tuple matching
according to the constraints. This patch skips the decompression
if the batch does not contain any actually matching tuples. This patch
adds the optimization for INSERT with unique constraints.
Similar optimizations for UPDATE and DELETE will be added in follow-up
patches.
  • Loading branch information
svenklemm committed Jul 3, 2024
1 parent 473f0c9 commit 731c800
Show file tree
Hide file tree
Showing 12 changed files with 352 additions and 28 deletions.
1 change: 1 addition & 0 deletions .unreleased/pr_7075
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #7075 Reduce decompression on compressed INSERT
160 changes: 158 additions & 2 deletions tsl/src/compression/compression.c
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ static int create_segment_filter_scankey(Relation in_rel, char *segment_filter_c
Bitmapset **null_columns, Datum value, bool is_null_check,
bool is_array_op);
static void create_per_compressed_column(RowDecompressor *decompressor);
static ScanKeyData *build_scankeys_for_uncompressed(Oid ht_relid, CompressionSettings *settings,
Relation out_rel, Bitmapset *key_columns,
TupleTableSlot *slot, int *num_scankeys);
static bool batch_matches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num_scankeys);

/********************
** compress_chunk **
Expand Down Expand Up @@ -1443,6 +1447,14 @@ build_decompressor(Relation in_rel, Relation out_rel)
return decompressor;
}

/*
 * Put the decompressor back into its "nothing decompressed yet" state.
 *
 * Clearing unprocessed_tuples makes the next decompress_batch() call do real
 * work again instead of returning the cached row count of the previous batch.
 *
 * NOTE(review): the batches/tuples counters are zeroed here as well, so any
 * caller that reads them after a reset sees 0 -- confirm that callers collect
 * their statistics before resetting.
 */
void
row_decompressor_reset(RowDecompressor *decompressor)
{
	/* Decompression statistics. */
	decompressor->tuples_decompressed = 0;
	decompressor->batches_decompressed = 0;

	/* No decompressed rows are pending from an earlier decompress_batch(). */
	decompressor->unprocessed_tuples = 0;
}

void
row_decompressor_close(RowDecompressor *decompressor)
{
Expand Down Expand Up @@ -1580,6 +1592,9 @@ create_per_compressed_column(RowDecompressor *decompressor)
static int
decompress_batch(RowDecompressor *decompressor)
{
if (decompressor->unprocessed_tuples)
return decompressor->unprocessed_tuples;

MemoryContext old_ctx = MemoryContextSwitchTo(decompressor->per_compressed_row_ctx);

/*
Expand Down Expand Up @@ -1705,6 +1720,8 @@ decompress_batch(RowDecompressor *decompressor)
decompressor->batches_decompressed++;
decompressor->tuples_decompressed += n_batch_rows;

decompressor->unprocessed_tuples = n_batch_rows;

return n_batch_rows;
}

Expand Down Expand Up @@ -1769,6 +1786,8 @@ row_decompressor_decompress_row_to_table(RowDecompressor *decompressor)
MemoryContextSwitchTo(old_ctx);
MemoryContextReset(decompressor->per_compressed_row_ctx);

row_decompressor_reset(decompressor);

return n_batch_rows;
}

Expand All @@ -1787,6 +1806,8 @@ row_decompressor_decompress_row_to_tuplesort(RowDecompressor *decompressor,

MemoryContextSwitchTo(old_ctx);
MemoryContextReset(decompressor->per_compressed_row_ctx);

row_decompressor_reset(decompressor);
}

/********************/
Expand Down Expand Up @@ -2453,6 +2474,7 @@ static struct decompress_batches_stats
decompress_batches_indexscan(Relation in_rel, Relation out_rel, Relation index_rel,
Snapshot snapshot, ScanKeyData *index_scankeys, int num_index_scankeys,
ScanKeyData *heap_scankeys, int num_heap_scankeys,
ScanKeyData *mem_scankeys, int num_mem_scankeys,
Bitmapset *null_columns, List *is_nulls)
{
HeapTuple compressed_tuple;
Expand Down Expand Up @@ -2597,8 +2619,8 @@ decompress_batches_indexscan(Relation in_rel, Relation out_rel, Relation index_r
*/
static struct decompress_batches_stats
decompress_batches_seqscan(Relation in_rel, Relation out_rel, Snapshot snapshot,
ScanKeyData *scankeys, int num_scankeys, Bitmapset *null_columns,
List *is_nulls)
ScanKeyData *scankeys, int num_scankeys, ScanKeyData *mem_scankeys,
int num_mem_scankeys, Bitmapset *null_columns, List *is_nulls)
{
RowDecompressor decompressor;
bool decompressor_initialized = false;
Expand Down Expand Up @@ -2662,6 +2684,12 @@ decompress_batches_seqscan(Relation in_rel, Relation out_rel, Snapshot snapshot,
decompressor.compressed_datums,
decompressor.compressed_is_nulls);

if (num_mem_scankeys && !batch_matches(&decompressor, mem_scankeys, num_mem_scankeys))
{
row_decompressor_reset(&decompressor);
continue;
}

write_logical_replication_msg_decompression_start();
result = delete_compressed_tuple(&decompressor, snapshot, compressed_tuple);
/* skip reporting error if isolation level is < Repeatable Read
Expand Down Expand Up @@ -2738,6 +2766,15 @@ decompress_batches_for_insert(const ChunkInsertState *cis, TupleTableSlot *slot)
Bitmapset *null_columns = NULL;
struct decompress_batches_stats stats;

/* the scan keys used for in memory tests of the decompressed tuples */
int num_mem_scankeys;
ScanKeyData *mem_scankeys = build_scankeys_for_uncompressed(cis->hypertable_relid,
settings,
out_rel,
key_columns,
slot,
&num_mem_scankeys);

int num_index_scankeys;
Relation index_rel = NULL;
ScanKeyData *index_scankeys = build_index_scankeys_using_slot(cis->hypertable_relid,
Expand Down Expand Up @@ -2781,6 +2818,8 @@ decompress_batches_for_insert(const ChunkInsertState *cis, TupleTableSlot *slot)
num_index_scankeys,
heap_scankeys,
num_heap_scankeys,
mem_scankeys,
num_mem_scankeys,
NULL, /* no null column check for non-segmentby
columns */
NIL);
Expand All @@ -2803,6 +2842,8 @@ decompress_batches_for_insert(const ChunkInsertState *cis, TupleTableSlot *slot)
GetLatestSnapshot(),
heap_scankeys,
num_heap_scankeys,
mem_scankeys,
num_mem_scankeys,
null_columns,
NIL);
bms_free(key_columns);
Expand Down Expand Up @@ -3414,6 +3455,8 @@ decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chu
num_index_scankeys,
scankeys,
num_scankeys,
NULL,
0,
null_columns,
is_null);
/* close the selected index */
Expand All @@ -3426,6 +3469,8 @@ decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chu
GetTransactionSnapshot(),
scankeys,
num_scankeys,
NULL,
0,
null_columns,
is_null);
}
Expand Down Expand Up @@ -3584,3 +3629,114 @@ decompress_chunk_walker(PlanState *ps, struct decompress_chunk_context *ctx)

return planstate_tree_walker(ps, decompress_chunk_walker, ctx);
}

/*
 * Build scankeys used to test decompressed tuples in memory against the
 * tuple that is about to be inserted.
 *
 * One btree-equality scankey is produced for every column in key_columns that
 * is not a segmentby column. The scankey's attribute number refers to the
 * uncompressed chunk (out_rel), while the comparison value is fetched from
 * slot by looking the column up by name in the hypertable (ht_relid).
 *
 * key_columns holds attribute numbers of the uncompressed chunk, offset by
 * FirstLowInvalidHeapAttributeNumber.
 *
 * Returns a palloc'd array of scankeys and stores the number of initialized
 * entries in *num_scankeys. Returns NULL (and 0 keys) when key_columns is
 * empty; when every key column is a segmentby column the array is allocated
 * but *num_scankeys is 0.
 *
 * Raises an ERROR when the column type has no btree operator family or no
 * usable equality operator.
 */
static ScanKeyData *
build_scankeys_for_uncompressed(Oid ht_relid, CompressionSettings *settings, Relation out_rel,
								Bitmapset *key_columns, TupleTableSlot *slot, int *num_scankeys)
{
	ScanKeyData *scankeys = NULL;
	int key_index = 0;
	TupleDesc out_desc = RelationGetDescr(out_rel);

	if (bms_is_empty(key_columns))
	{
		*num_scankeys = key_index;
		return scankeys;
	}

	/* Upper bound: segmentby columns are skipped below and leave slack. */
	scankeys = palloc(sizeof(ScanKeyData) * bms_num_members(key_columns));

	/*
	 * bms_next_member() returns a negative value once the set is exhausted;
	 * 0 is a valid member, so the loop has to test for >= 0.
	 */
	int i = -1;
	while ((i = bms_next_member(key_columns, i)) >= 0)
	{
		AttrNumber attno = i + FirstLowInvalidHeapAttributeNumber;
		bool isnull;

		/*
		 * slot has the physical layout of the hypertable, so we need to
		 * get the attribute number of the hypertable for the column.
		 */
		char *attname = get_attname(out_rel->rd_id, attno, false);

		/*
		 * We can skip any segmentby columns here since they have already been
		 * checked during batch filtering.
		 */
		if (ts_array_is_member(settings->fd.segmentby, attname))
		{
			continue;
		}

		AttrNumber ht_attno = get_attnum(ht_relid, attname);
		Datum value = slot_getattr(slot, ht_attno, &isnull);

		Oid atttypid = out_desc->attrs[AttrNumberGetAttrOffset(attno)].atttypid;
		TypeCacheEntry *tce = lookup_type_cache(atttypid, TYPECACHE_BTREE_OPFAMILY);

		/*
		 * Should never happen since the column is part of unique constraint
		 * and should therefore have the required opfamily
		 */
		if (!OidIsValid(tce->btree_opf))
			elog(ERROR, "no btree opfamily for type \"%s\"", format_type_be(atttypid));

		Oid opr = get_opfamily_member(tce->btree_opf, atttypid, atttypid, BTEqualStrategyNumber);

		/*
		 * Fall back to btree operator input type when it is binary compatible with
		 * the column type and no operator for column type could be found.
		 */
		if (!OidIsValid(opr) && IsBinaryCoercible(atttypid, tce->btree_opintype))
		{
			opr = get_opfamily_member(tce->btree_opf,
									  tce->btree_opintype,
									  tce->btree_opintype,
									  BTEqualStrategyNumber);
		}

		if (!OidIsValid(opr))
			elog(ERROR, "no operator found for type \"%s\"", format_type_be(atttypid));

		/*
		 * A NULL input value is flagged on the scankey and carries a zero
		 * argument instead of the (undefined) datum.
		 */
		ScanKeyEntryInitialize(&scankeys[key_index++],
							   isnull ? SK_ISNULL | SK_SEARCHNULL : 0,
							   attno,
							   BTEqualStrategyNumber,
							   InvalidOid,
							   out_desc->attrs[AttrNumberGetAttrOffset(attno)].attcollation,
							   get_opcode(opr),
							   isnull ? 0 : value);
	}

	*num_scankeys = key_index;
	return scankeys;
}

/*
 * Check whether the current batch holds at least one tuple that satisfies
 * all of the given scankeys.
 *
 * The batch is decompressed through decompress_batch(); the decompressed rows
 * stay in decompressor->decompressed_slots afterwards. Each row is tested
 * with HeapKeyTest() and the scan stops at the first hit.
 *
 * Returns true on the first matching row, false when no row matches (this
 * includes a batch that yields zero rows).
 */
static bool
batch_matches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num_scankeys)
{
	const int n_rows = decompress_batch(decompressor);
	int idx;

	for (idx = 0; idx < n_rows; idx++)
	{
		TupleTableSlot *slot = decompressor->decompressed_slots[idx];
		HeapTuple candidate = slot->tts_ops->get_heap_tuple(slot);
		bool matched = false;

		/* HeapKeyTest is a result-assigning macro before PG16, a function after. */
#if PG16_LT
		HeapKeyTest(candidate, decompressor->out_desc, num_scankeys, scankeys, matched);
#else
		matched = HeapKeyTest(candidate, decompressor->out_desc, num_scankeys, scankeys);
#endif

		if (matched)
			return true;
	}

	return false;
}
2 changes: 2 additions & 0 deletions tsl/src/compression/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ typedef struct RowDecompressor
int64 tuples_decompressed;

TupleTableSlot **decompressed_slots;
int unprocessed_tuples;

Detoaster detoaster;
} RowDecompressor;
Expand Down Expand Up @@ -368,6 +369,7 @@ extern void segment_info_update(SegmentInfo *segment_info, Datum val, bool is_nu

extern RowDecompressor build_decompressor(Relation in_rel, Relation out_rel);

extern void row_decompressor_reset(RowDecompressor *decompressor);
extern void row_decompressor_close(RowDecompressor *decompressor);
extern enum CompressionAlgorithms compress_get_default_algorithm(Oid typeoid);
/*
Expand Down
1 change: 0 additions & 1 deletion tsl/test/expected/compression.out
Original file line number Diff line number Diff line change
Expand Up @@ -2844,7 +2844,6 @@ SELECT compress_chunk(i, if_not_compressed => true) FROM show_chunks('compressed
SET timescaledb.max_tuples_decompressed_per_dml_transaction = 1;
\set ON_ERROR_STOP 0
COPY compressed_table (time,a,b,c) FROM stdin;
ERROR: tuple decompression limit exceeded by operation
\set ON_ERROR_STOP 1
RESET timescaledb.max_tuples_decompressed_per_dml_transaction;
-- Test decompression with DML which compares int8 to int4
Expand Down
2 changes: 1 addition & 1 deletion tsl/test/expected/compression_conflicts.out
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ BEGIN;
SELECT count(*) FROM ONLY :CHUNK;
count
-------
1001
1
(1 row)

ROLLBACK;
Expand Down
Loading

0 comments on commit 731c800

Please sign in to comment.