Fix update triggers issue caused by decompression
Update triggers were broken due to snapshot manipulation and the use of
dynamic snapshots for scanning tuples. Using a static snapshot guarantees
we cannot see tuples that should be invisible during an update trigger.
antekresic committed May 27, 2024
1 parent 180c7be commit 156e1cc
Showing 7 changed files with 357 additions and 31 deletions.
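
The hypertable_modify.c hunks below implement the fix by swapping the executor snapshot for a registered, static copy of the transaction snapshot while the decompressed tuples are scanned, and restoring the original snapshot afterwards. The following is a hedged illustration of that pattern only, not the actual TimescaleDB code: the helper names and the standalone saved_executor_snapshot variable are invented for the sketch (the real patch keeps the saved snapshot in HypertableModifyState), and it assumes only stock PostgreSQL snapshot-manager and transaction APIs.

#include "postgres.h"
#include "access/xact.h"     /* CommandCounterIncrement, GetCurrentCommandId */
#include "nodes/execnodes.h" /* EState */
#include "utils/snapmgr.h"   /* GetTransactionSnapshot, RegisterSnapshot, UnregisterSnapshot */

/* Illustrative only: the patch stores this in HypertableModifyState instead. */
static Snapshot saved_executor_snapshot = NULL;

/*
 * Called once after compressed batches have been decompressed into the
 * uncompressed chunk: make those rows visible to the current command and
 * pin a static snapshot for the remainder of the scan.
 */
static void
switch_to_static_snapshot(EState *estate)
{
	/* remember the snapshot installed during ExecutorStart() */
	saved_executor_snapshot = estate->es_snapshot;

	/* make the freshly decompressed tuples visible to this command */
	CommandCounterIncrement();

	/*
	 * RegisterSnapshot() hands back a registered copy of the transaction
	 * snapshot, so later scans cannot pick up rows produced while BEFORE
	 * UPDATE triggers run.
	 */
	estate->es_snapshot = RegisterSnapshot(GetTransactionSnapshot());
	estate->es_output_cid = GetCurrentCommandId(true);
}

/* Called once the modify node is done with the decompressed chunks. */
static void
restore_executor_snapshot(EState *estate)
{
	UnregisterSnapshot(estate->es_snapshot);
	estate->es_snapshot = saved_executor_snapshot;
}

The unregister/restore step mirrors the second ExecModifyTable() hunk below, which puts the saved snapshot back once the compressed chunks have been processed.
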
2 changes: 2 additions & 0 deletions .unreleased/fix_6858
@@ -0,0 +1,2 @@
Fixes: #6858 Before update trigger not working correctly
Thanks: @edgarzamora for reporting issue with update triggers
13 changes: 9 additions & 4 deletions src/nodes/hypertable_modify.c
@@ -706,18 +706,22 @@ ExecModifyTable(CustomScanState *cs_node, PlanState *pstate)
*/
if ((operation == CMD_DELETE || operation == CMD_UPDATE) && !ht_state->comp_chunks_processed)
{
if (ts_cm_functions->decompress_target_segments)
/* Modify snapshot only if something got decompressed */
if (ts_cm_functions->decompress_target_segments &&
ts_cm_functions->decompress_target_segments(ht_state))
{
ts_cm_functions->decompress_target_segments(ht_state);
ht_state->comp_chunks_processed = true;
/*
* save snapshot set during ExecutorStart(), since this is the same
* snapshot used to SeqScan of uncompressed chunks
*/
ht_state->snapshot = estate->es_snapshot;
/* use current transaction snapshot */
estate->es_snapshot = GetTransactionSnapshot();

CommandCounterIncrement();
/* use a static copy of current transaction snapshot
* this needs to be a copy so we don't read trigger updates
*/
estate->es_snapshot = RegisterSnapshot(GetTransactionSnapshot());
/* mark rows visible */
estate->es_output_cid = GetCurrentCommandId(true);

@@ -994,6 +998,7 @@ ExecModifyTable(CustomScanState *cs_node, PlanState *pstate)

if (ht_state->comp_chunks_processed)
{
UnregisterSnapshot(estate->es_snapshot);
estate->es_snapshot = ht_state->snapshot;
ht_state->comp_chunks_processed = false;
}
87 changes: 60 additions & 27 deletions tsl/src/compression/compression.c
@@ -2879,8 +2879,11 @@ report_error(TM_Result result)
* 4.insert decompressed rows to uncompressed chunk
*
* Return value:
* if all 4 steps defined above pass set chunk_status_changed to true and return true
* if step 4 fails return false. Step 3 will fail if there are conflicting concurrent operations on
* return true if any tuples are decompressed or decompression of the same data happened
* in a concurrent operation. This is important for snapshot management in order to
* see the uncompressed data in this transaction.
* if all 4 steps defined above pass set chunk_status_changed to true
* Step 3 will fail if there are conflicting concurrent operations on
* same chunk.
*/
static bool
@@ -2891,6 +2894,7 @@ decompress_batches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num

TupleTableSlot *slot = table_slot_create(decompressor->in_rel, NULL);
TableScanDesc scan = table_beginscan(decompressor->in_rel, snapshot, num_scankeys, scankeys);
bool data_decompressed = false;
int num_scanned_rows = 0;
int num_filtered_rows = 0;

@@ -2946,13 +2950,24 @@ decompress_batches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num
&tmfd,
false);

/* skip reporting error if isolation level is < Repeatable Read
* since somebody decompressed the data concurrently, we need to take
* that data into account as well when in Read Committed level
*/
if (result == TM_Deleted && !IsolationUsesXactSnapshot())
{
data_decompressed = true;
continue;
}

if (result != TM_Ok)
{
table_endscan(scan);
report_error(result);
}
row_decompressor_decompress_row_to_table(decompressor);
*chunk_status_changed = true;
data_decompressed = true;
}
if (scankeys)
pfree(scankeys);
@@ -2968,7 +2983,7 @@ decompress_batches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num
num_filtered_rows);
}

return true;
return data_decompressed;
}

/*
@@ -3037,6 +3052,7 @@ decompress_batches_using_index(RowDecompressor *decompressor, Relation index_rel
Snapshot snapshot = GetTransactionSnapshot();
int num_segmentby_filtered_rows = 0;
int num_heap_filtered_rows = 0;
bool data_decompressed = false;

IndexScanDesc scan =
index_beginscan(decompressor->in_rel, index_rel, snapshot, num_index_scankeys, 0);
@@ -3123,9 +3139,15 @@ decompress_batches_using_index(RowDecompressor *decompressor, Relation index_rel
&tmfd,
false);

/* skip reporting error if isolation level is < Repeatable Read */
/* skip reporting error if isolation level is < Repeatable Read
* since somebody decompressed the data concurrently, we need to take
* that data into account as well when in Read Committed level
*/
if (result == TM_Deleted && !IsolationUsesXactSnapshot())
{
data_decompressed = true;
continue;
}

if (result != TM_Ok)
{
@@ -3135,6 +3157,7 @@ decompress_batches_using_index(RowDecompressor *decompressor, Relation index_rel
}
row_decompressor_decompress_row_to_table(decompressor);
*chunk_status_changed = true;
data_decompressed = true;
}

if (ts_guc_debug_compression_path_info)
@@ -3148,8 +3171,8 @@ decompress_batches_using_index(RowDecompressor *decompressor, Relation index_rel

ExecDropSingleTupleTableSlot(slot);
index_endscan(scan);
CommandCounterIncrement();
return true;

return data_decompressed;
}

/*
@@ -3159,8 +3182,10 @@ decompress_batches_using_index(RowDecompressor *decompressor, Relation index_rel
* 2. Build scan keys for SEGMENT BY columns.
* 3. Move scanned rows to staging area.
* 4. Update catalog table to change status of moved chunk.
*
* Returns true if it decompresses any data.
*/
static void
static bool
decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chunk,
List *predicates, EState *estate)
{
@@ -3178,6 +3203,7 @@ decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chu
BatchFilter *filter;

bool chunk_status_changed = false;
bool data_decompressed = false;
ScanKeyData *scankeys = NULL;
Bitmapset *null_columns = NULL;
int num_scankeys = 0;
@@ -3208,26 +3234,26 @@ decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chu
{
index_scankeys =
build_index_scankeys(matching_index_rel, index_filters, &num_index_scankeys);
decompress_batches_using_index(&decompressor,
matching_index_rel,
index_scankeys,
num_index_scankeys,
scankeys,
num_scankeys,
null_columns,
is_null,
&chunk_status_changed);
data_decompressed = decompress_batches_using_index(&decompressor,
matching_index_rel,
index_scankeys,
num_index_scankeys,
scankeys,
num_scankeys,
null_columns,
is_null,
&chunk_status_changed);
/* close the selected index */
index_close(matching_index_rel, AccessShareLock);
}
else
{
decompress_batches(&decompressor,
scankeys,
num_scankeys,
null_columns,
is_null,
&chunk_status_changed);
data_decompressed = decompress_batches(&decompressor,
scankeys,
num_scankeys,
null_columns,
is_null,
&chunk_status_changed);
}
write_logical_replication_msg_decompression_end();

@@ -3255,6 +3281,8 @@ decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chu
}
ht_state->batches_decompressed += decompressor.batches_decompressed;
ht_state->tuples_decompressed += decompressor.tuples_decompressed;

return data_decompressed;
}

/*
@@ -3267,6 +3295,8 @@ struct decompress_chunk_context
{
List *relids;
HypertableModifyState *ht_state;
/* indicates decompression actually occurred */
bool batches_decompressed;
};

static bool decompress_chunk_walker(PlanState *ps, struct decompress_chunk_context *ctx);
@@ -3283,7 +3313,8 @@ decompress_target_segments(HypertableModifyState *ht_state)
};
Assert(ctx.relids);

return decompress_chunk_walker(&ps->ps, &ctx);
decompress_chunk_walker(&ps->ps, &ctx);
return ctx.batches_decompressed;
}

static bool
@@ -3292,6 +3323,7 @@ decompress_chunk_walker(PlanState *ps, struct decompress_chunk_context *ctx)
RangeTblEntry *rte = NULL;
bool needs_decompression = false;
bool should_rescan = false;
bool batches_decompressed = false;
List *predicates = NIL;
Chunk *current_chunk;
if (ps == NULL)
@@ -3349,10 +3381,11 @@ decompress_chunk_walker(PlanState *ps, struct decompress_chunk_context *ctx)
errmsg("UPDATE/DELETE is disabled on compressed chunks"),
errhint("Set timescaledb.enable_dml_decompression to TRUE.")));

decompress_batches_for_update_delete(ctx->ht_state,
current_chunk,
predicates,
ps->state);
batches_decompressed = decompress_batches_for_update_delete(ctx->ht_state,
current_chunk,
predicates,
ps->state);
ctx->batches_decompressed |= batches_decompressed;

/* This is a workaround specifically for bitmap heap scans:
* during node initialization, initialize the scan state with the active snapshot
66 changes: 66 additions & 0 deletions tsl/test/expected/compression_update_delete.out
@@ -3242,3 +3242,69 @@ BEGIN; :EXPLAIN DELETE FROM test_pushdown WHERE device IN ('a',current_query());
Rows Removed by Filter: 2
(10 rows)

-- github issue #6858
-- check update triggers work correctly both on uncompressed and compressed chunks
CREATE TABLE update_trigger_test (
"entity_id" "uuid" NOT NULL,
"effective_date_time" timestamp with time zone NOT NULL,
"measurement" numeric NOT NULL,
"modified_at" timestamp with time zone DEFAULT "now"() NOT NULL
);
SELECT create_hypertable('update_trigger_test', 'effective_date_time');
create_hypertable
-----------------------------------
(41,public,update_trigger_test,t)
(1 row)

CREATE OR REPLACE FUNCTION update_modified_at_test()
RETURNS TRIGGER
LANGUAGE PLPGSQL AS $$
BEGIN
NEW.modified_at = NOW();
RETURN NEW;
END; $$;
CREATE TRIGGER update_trigger_test__before_update_sync_modified_at
BEFORE UPDATE ON update_trigger_test
FOR EACH ROW
EXECUTE PROCEDURE update_modified_at_test();
INSERT INTO update_trigger_test
SELECT 'f2ca7073-1395-5770-8378-7d0339804580', '2024-04-16 04:50:00+02',
1100.00, '2024-04-23 11:56:38.494095+02' FROM generate_series(1,2500,1) c;
VACUUM FULL update_trigger_test;
BEGIN;
UPDATE update_trigger_test SET measurement = measurement + 2
WHERE update_trigger_test.effective_date_time >= '2020-01-01T00:00:00'::timestamp AT TIME ZONE 'UTC';
ROLLBACK;
-- try with default compression
ALTER TABLE update_trigger_test SET (timescaledb.compress);
WARNING: there was some uncertainty picking the default segment by for the hypertable: You do not have any indexes on columns that can be used for segment_by and thus we are not using segment_by for compression. Please make sure you are not missing any indexes
NOTICE: default segment by for hypertable "update_trigger_test" is set to ""
NOTICE: default order by for hypertable "update_trigger_test" is set to "effective_date_time DESC"
SELECT compress_chunk(show_chunks('update_trigger_test'));
compress_chunk
------------------------------------------
_timescaledb_internal._hyper_41_81_chunk
(1 row)

BEGIN;
UPDATE update_trigger_test SET measurement = measurement + 2
WHERE update_trigger_test.effective_date_time >= '2020-01-01T00:00:00'::timestamp AT TIME ZONE 'UTC';
ROLLBACK;
-- lets try with segmentby
SELECT decompress_chunk(show_chunks('update_trigger_test'));
decompress_chunk
------------------------------------------
_timescaledb_internal._hyper_41_81_chunk
(1 row)

ALTER TABLE update_trigger_test SET (timescaledb.compress, timescaledb.compress_segmentby='entity_id');
SELECT compress_chunk(show_chunks('update_trigger_test'));
compress_chunk
------------------------------------------
_timescaledb_internal._hyper_41_81_chunk
(1 row)

BEGIN;
UPDATE update_trigger_test SET measurement = measurement + 2
WHERE update_trigger_test.effective_date_time >= '2020-01-01T00:00:00'::timestamp AT TIME ZONE 'UTC';
ROLLBACK;