diff --git a/.unreleased/fix_6858 b/.unreleased/fix_6858 new file mode 100644 index 00000000000..df8794453d0 --- /dev/null +++ b/.unreleased/fix_6858 @@ -0,0 +1,2 @@ +Fixes: #6858 Before update trigger not working correctly +Thanks: @edgarzamora for reporting issue with update triggers diff --git a/src/nodes/hypertable_modify.c b/src/nodes/hypertable_modify.c index 7247b226034..d286a0797e7 100644 --- a/src/nodes/hypertable_modify.c +++ b/src/nodes/hypertable_modify.c @@ -706,18 +706,22 @@ ExecModifyTable(CustomScanState *cs_node, PlanState *pstate) */ if ((operation == CMD_DELETE || operation == CMD_UPDATE) && !ht_state->comp_chunks_processed) { - if (ts_cm_functions->decompress_target_segments) + /* Modify snapshot only if something got decompressed */ + if (ts_cm_functions->decompress_target_segments && + ts_cm_functions->decompress_target_segments(ht_state)) { - ts_cm_functions->decompress_target_segments(ht_state); ht_state->comp_chunks_processed = true; /* * save snapshot set during ExecutorStart(), since this is the same * snapshot used to SeqScan of uncompressed chunks */ ht_state->snapshot = estate->es_snapshot; - /* use current transaction snapshot */ - estate->es_snapshot = GetTransactionSnapshot(); + CommandCounterIncrement(); + /* use a static copy of current transaction snapshot + * this needs to be a copy so we don't read trigger updates + */ + estate->es_snapshot = RegisterSnapshot(GetTransactionSnapshot()); /* mark rows visible */ estate->es_output_cid = GetCurrentCommandId(true); @@ -994,6 +998,7 @@ ExecModifyTable(CustomScanState *cs_node, PlanState *pstate) if (ht_state->comp_chunks_processed) { + UnregisterSnapshot(estate->es_snapshot); estate->es_snapshot = ht_state->snapshot; ht_state->comp_chunks_processed = false; } diff --git a/tsl/src/compression/compression.c b/tsl/src/compression/compression.c index 0fdd6226902..271d2bdf396 100644 --- a/tsl/src/compression/compression.c +++ b/tsl/src/compression/compression.c @@ -2879,8 +2879,11 @@ report_error(TM_Result result) * 4.insert decompressed rows to uncompressed chunk * * Return value: - * if all 4 steps defined above pass set chunk_status_changed to true and return true - * if step 4 fails return false. Step 3 will fail if there are conflicting concurrent operations on + * return true if any tuples are decompressed or decompression of the same data happened + * in a concurrent operation. This is important for snapshot management in order to + * see the uncompressed data in this transaction. + * if all 4 steps defined above pass set chunk_status_changed to true + * Step 3 will fail if there are conflicting concurrent operations on * same chunk. 
*/ static bool @@ -2891,6 +2894,7 @@ decompress_batches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num TupleTableSlot *slot = table_slot_create(decompressor->in_rel, NULL); TableScanDesc scan = table_beginscan(decompressor->in_rel, snapshot, num_scankeys, scankeys); + bool data_decompressed = false; int num_scanned_rows = 0; int num_filtered_rows = 0; @@ -2946,6 +2950,16 @@ decompress_batches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num &tmfd, false); + /* skip reporting error if isolation level is < Repeatable Read + * since somebody decompressed the data concurrently, we need to take + * that data into account as well when in Read Committed level + */ + if (result == TM_Deleted && !IsolationUsesXactSnapshot()) + { + data_decompressed = true; + continue; + } + if (result != TM_Ok) { table_endscan(scan); @@ -2953,6 +2967,7 @@ decompress_batches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num } row_decompressor_decompress_row_to_table(decompressor); *chunk_status_changed = true; + data_decompressed = true; } if (scankeys) pfree(scankeys); @@ -2968,7 +2983,7 @@ decompress_batches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num num_filtered_rows); } - return true; + return data_decompressed; } /* @@ -3037,6 +3052,7 @@ decompress_batches_using_index(RowDecompressor *decompressor, Relation index_rel Snapshot snapshot = GetTransactionSnapshot(); int num_segmentby_filtered_rows = 0; int num_heap_filtered_rows = 0; + bool data_decompressed = false; IndexScanDesc scan = index_beginscan(decompressor->in_rel, index_rel, snapshot, num_index_scankeys, 0); @@ -3123,9 +3139,15 @@ decompress_batches_using_index(RowDecompressor *decompressor, Relation index_rel &tmfd, false); - /* skip reporting error if isolation level is < Repeatable Read */ + /* skip reporting error if isolation level is < Repeatable Read + * since somebody decompressed the data concurrently, we need to take + * that data into account as well when in Read Committed level + */ if (result == TM_Deleted && !IsolationUsesXactSnapshot()) + { + data_decompressed = true; continue; + } if (result != TM_Ok) { @@ -3135,6 +3157,7 @@ decompress_batches_using_index(RowDecompressor *decompressor, Relation index_rel } row_decompressor_decompress_row_to_table(decompressor); *chunk_status_changed = true; + data_decompressed = true; } if (ts_guc_debug_compression_path_info) @@ -3148,8 +3171,8 @@ decompress_batches_using_index(RowDecompressor *decompressor, Relation index_rel ExecDropSingleTupleTableSlot(slot); index_endscan(scan); - CommandCounterIncrement(); - return true; + + return data_decompressed; } /* @@ -3159,8 +3182,10 @@ decompress_batches_using_index(RowDecompressor *decompressor, Relation index_rel * 2. Build scan keys for SEGMENT BY columns. * 3. Move scanned rows to staging area. * 4. Update catalog table to change status of moved chunk. + * + * Returns true if it decompresses any data. 
*/ -static void +static bool decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chunk, List *predicates, EState *estate) { @@ -3178,6 +3203,7 @@ decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chu BatchFilter *filter; bool chunk_status_changed = false; + bool data_decompressed = false; ScanKeyData *scankeys = NULL; Bitmapset *null_columns = NULL; int num_scankeys = 0; @@ -3208,26 +3234,26 @@ decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chu { index_scankeys = build_index_scankeys(matching_index_rel, index_filters, &num_index_scankeys); - decompress_batches_using_index(&decompressor, - matching_index_rel, - index_scankeys, - num_index_scankeys, - scankeys, - num_scankeys, - null_columns, - is_null, - &chunk_status_changed); + data_decompressed = decompress_batches_using_index(&decompressor, + matching_index_rel, + index_scankeys, + num_index_scankeys, + scankeys, + num_scankeys, + null_columns, + is_null, + &chunk_status_changed); /* close the selected index */ index_close(matching_index_rel, AccessShareLock); } else { - decompress_batches(&decompressor, - scankeys, - num_scankeys, - null_columns, - is_null, - &chunk_status_changed); + data_decompressed = decompress_batches(&decompressor, + scankeys, + num_scankeys, + null_columns, + is_null, + &chunk_status_changed); } write_logical_replication_msg_decompression_end(); @@ -3255,6 +3281,8 @@ decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chu } ht_state->batches_decompressed += decompressor.batches_decompressed; ht_state->tuples_decompressed += decompressor.tuples_decompressed; + + return data_decompressed; } /* @@ -3267,6 +3295,8 @@ struct decompress_chunk_context { List *relids; HypertableModifyState *ht_state; + /* indicates decompression actually occurred */ + bool batches_decompressed; }; static bool decompress_chunk_walker(PlanState *ps, struct decompress_chunk_context *ctx); @@ -3283,7 +3313,8 @@ decompress_target_segments(HypertableModifyState *ht_state) }; Assert(ctx.relids); - return decompress_chunk_walker(&ps->ps, &ctx); + decompress_chunk_walker(&ps->ps, &ctx); + return ctx.batches_decompressed; } static bool @@ -3292,6 +3323,7 @@ decompress_chunk_walker(PlanState *ps, struct decompress_chunk_context *ctx) RangeTblEntry *rte = NULL; bool needs_decompression = false; bool should_rescan = false; + bool batches_decompressed = false; List *predicates = NIL; Chunk *current_chunk; if (ps == NULL) @@ -3349,10 +3381,11 @@ decompress_chunk_walker(PlanState *ps, struct decompress_chunk_context *ctx) errmsg("UPDATE/DELETE is disabled on compressed chunks"), errhint("Set timescaledb.enable_dml_decompression to TRUE."))); - decompress_batches_for_update_delete(ctx->ht_state, - current_chunk, - predicates, - ps->state); + batches_decompressed = decompress_batches_for_update_delete(ctx->ht_state, + current_chunk, + predicates, + ps->state); + ctx->batches_decompressed |= batches_decompressed; /* This is a workaround specifically for bitmap heap scans: * during node initialization, initialize the scan state with the active snapshot diff --git a/tsl/test/expected/compression_update_delete.out b/tsl/test/expected/compression_update_delete.out index 2c33af9ca88..a8aae54fac4 100644 --- a/tsl/test/expected/compression_update_delete.out +++ b/tsl/test/expected/compression_update_delete.out @@ -3242,3 +3242,69 @@ BEGIN; :EXPLAIN DELETE FROM test_pushdown WHERE device IN ('a',current_query()); Rows Removed by Filter: 2 (10 rows) +-- 
github issue #6858 +-- check update triggers work correctly both on uncompressed and compressed chunks +CREATE TABLE update_trigger_test ( + "entity_id" "uuid" NOT NULL, + "effective_date_time" timestamp with time zone NOT NULL, + "measurement" numeric NOT NULL, + "modified_at" timestamp with time zone DEFAULT "now"() NOT NULL +); +SELECT create_hypertable('update_trigger_test', 'effective_date_time'); + create_hypertable +----------------------------------- + (41,public,update_trigger_test,t) +(1 row) + +CREATE OR REPLACE FUNCTION update_modified_at_test() +RETURNS TRIGGER +LANGUAGE PLPGSQL AS $$ +BEGIN + NEW.modified_at = NOW(); + RETURN NEW; +END; $$; +CREATE TRIGGER update_trigger_test__before_update_sync_modified_at +BEFORE UPDATE ON update_trigger_test +FOR EACH ROW +EXECUTE PROCEDURE update_modified_at_test(); +INSERT INTO update_trigger_test +SELECT 'f2ca7073-1395-5770-8378-7d0339804580', '2024-04-16 04:50:00+02', +1100.00, '2024-04-23 11:56:38.494095+02' FROM generate_series(1,2500,1) c; +VACUUM FULL update_trigger_test; +BEGIN; +UPDATE update_trigger_test SET measurement = measurement + 2 +WHERE update_trigger_test.effective_date_time >= '2020-01-01T00:00:00'::timestamp AT TIME ZONE 'UTC'; +ROLLBACK; +-- try with default compression +ALTER TABLE update_trigger_test SET (timescaledb.compress); +WARNING: there was some uncertainty picking the default segment by for the hypertable: You do not have any indexes on columns that can be used for segment_by and thus we are not using segment_by for compression. Please make sure you are not missing any indexes +NOTICE: default segment by for hypertable "update_trigger_test" is set to "" +NOTICE: default order by for hypertable "update_trigger_test" is set to "effective_date_time DESC" +SELECT compress_chunk(show_chunks('update_trigger_test')); + compress_chunk +------------------------------------------ + _timescaledb_internal._hyper_41_81_chunk +(1 row) + +BEGIN; +UPDATE update_trigger_test SET measurement = measurement + 2 +WHERE update_trigger_test.effective_date_time >= '2020-01-01T00:00:00'::timestamp AT TIME ZONE 'UTC'; +ROLLBACK; +-- lets try with segmentby +SELECT decompress_chunk(show_chunks('update_trigger_test')); + decompress_chunk +------------------------------------------ + _timescaledb_internal._hyper_41_81_chunk +(1 row) + +ALTER TABLE update_trigger_test SET (timescaledb.compress, timescaledb.compress_segmentby='entity_id'); +SELECT compress_chunk(show_chunks('update_trigger_test')); + compress_chunk +------------------------------------------ + _timescaledb_internal._hyper_41_81_chunk +(1 row) + +BEGIN; +UPDATE update_trigger_test SET measurement = measurement + 2 +WHERE update_trigger_test.effective_date_time >= '2020-01-01T00:00:00'::timestamp AT TIME ZONE 'UTC'; +ROLLBACK; diff --git a/tsl/test/isolation/expected/compression_dml_iso.out b/tsl/test/isolation/expected/compression_dml_iso.out index 4313b22e5f4..030c9525aa8 100644 --- a/tsl/test/isolation/expected/compression_dml_iso.out +++ b/tsl/test/isolation/expected/compression_dml_iso.out @@ -153,6 +153,164 @@ time|device|location|value (0 rows) +starting permutation: NOS CA1 CAc SH I1 Ic SH UPD1 UPDc SH DEL1 DELc SH UPD1 UPDc SH +step NOS: + ALTER TABLE ts_device_table set(timescaledb.compress, timescaledb.compress_orderby='time'); + +step CA1: + BEGIN; + SELECT + CASE WHEN compress_chunk(ch) IS NOT NULL THEN true ELSE false END AS compress + FROM show_chunks('ts_device_table') AS ch + ORDER BY ch::text; + +compress +-------- +t +t +t +(3 rows) + +step CAc: COMMIT; 
+step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table'); +total_chunks|number_compressed_chunks +------------+------------------------ + 3| 3 +(1 row) + +step I1: BEGIN; INSERT INTO ts_device_table VALUES (1, 1, 100, 100); +step Ic: COMMIT; +step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table'); +total_chunks|number_compressed_chunks +------------+------------------------ + 3| 3 +(1 row) + +step UPD1: BEGIN; UPDATE ts_device_table SET value = 4 WHERE location = 200; +step UPDc: COMMIT; +step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table'); +total_chunks|number_compressed_chunks +------------+------------------------ + 3| 3 +(1 row) + +step DEL1: BEGIN; DELETE from ts_device_table WHERE location = 200; +step DELc: COMMIT; +step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table'); +total_chunks|number_compressed_chunks +------------+------------------------ + 3| 3 +(1 row) + +step UPD1: BEGIN; UPDATE ts_device_table SET value = 4 WHERE location = 200; +step UPDc: COMMIT; +step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table'); +total_chunks|number_compressed_chunks +------------+------------------------ + 3| 3 +(1 row) + + +starting permutation: NOS IN1 INc CA1 CAc SH SS DEL1 UPD1 DELc UPDc SH SS +step NOS: + ALTER TABLE ts_device_table set(timescaledb.compress, timescaledb.compress_orderby='time'); + +step IN1: BEGIN; INSERT INTO ts_device_table VALUES (1, 1, 200, 100); +step INc: COMMIT; +step CA1: + BEGIN; + SELECT + CASE WHEN compress_chunk(ch) IS NOT NULL THEN true ELSE false END AS compress + FROM show_chunks('ts_device_table') AS ch + ORDER BY ch::text; + +compress +-------- +t +t +t +(3 rows) + +step CAc: COMMIT; +step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table'); +total_chunks|number_compressed_chunks +------------+------------------------ + 3| 3 +(1 row) + +step SS: SELECT * FROM ts_device_table WHERE location = 200; +time|device|location|value +----+------+--------+----- + 1| 1| 200| 100 +(1 row) + +step DEL1: BEGIN; DELETE from ts_device_table WHERE location = 200; +step UPD1: BEGIN; UPDATE ts_device_table SET value = 4 WHERE location = 200; +step DELc: COMMIT; +step UPD1: <... 
completed> +step UPDc: COMMIT; +step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table'); +total_chunks|number_compressed_chunks +------------+------------------------ + 3| 3 +(1 row) + +step SS: SELECT * FROM ts_device_table WHERE location = 200; +time|device|location|value +----+------+--------+----- +(0 rows) + + +starting permutation: NOS IN1 INc CA1 CAc SH SS UPD1 DEL1 UPDc DELc SH SS +step NOS: + ALTER TABLE ts_device_table set(timescaledb.compress, timescaledb.compress_orderby='time'); + +step IN1: BEGIN; INSERT INTO ts_device_table VALUES (1, 1, 200, 100); +step INc: COMMIT; +step CA1: + BEGIN; + SELECT + CASE WHEN compress_chunk(ch) IS NOT NULL THEN true ELSE false END AS compress + FROM show_chunks('ts_device_table') AS ch + ORDER BY ch::text; + +compress +-------- +t +t +t +(3 rows) + +step CAc: COMMIT; +step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table'); +total_chunks|number_compressed_chunks +------------+------------------------ + 3| 3 +(1 row) + +step SS: SELECT * FROM ts_device_table WHERE location = 200; +time|device|location|value +----+------+--------+----- + 1| 1| 200| 100 +(1 row) + +step UPD1: BEGIN; UPDATE ts_device_table SET value = 4 WHERE location = 200; +step DEL1: BEGIN; DELETE from ts_device_table WHERE location = 200; +step UPDc: COMMIT; +step DEL1: <... completed> +step DELc: COMMIT; +step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table'); +total_chunks|number_compressed_chunks +------------+------------------------ + 3| 3 +(1 row) + +step SS: SELECT * FROM ts_device_table WHERE location = 200; +time|device|location|value +----+------+--------+----- +(0 rows) + + starting permutation: IN1 INc CA1 CAc SH SS DEL1 UPDrr DELc UPDc SH SS step IN1: BEGIN; INSERT INTO ts_device_table VALUES (1, 1, 200, 100); step INc: COMMIT; diff --git a/tsl/test/isolation/specs/compression_dml_iso.spec b/tsl/test/isolation/specs/compression_dml_iso.spec index 0c7184b2df0..49a4acc1ff1 100644 --- a/tsl/test/isolation/specs/compression_dml_iso.spec +++ b/tsl/test/isolation/specs/compression_dml_iso.spec @@ -65,6 +65,10 @@ step "CA1" { FROM show_chunks('ts_device_table') AS ch ORDER BY ch::text; } +step "NOS" +{ + ALTER TABLE ts_device_table set(timescaledb.compress, timescaledb.compress_orderby='time'); +} step "CAc" { COMMIT; } # Test concurrent update/delete operations @@ -72,6 +76,11 @@ permutation "CA1" "CAc" "SH" "I1" "Ic" "SH" "UPD1" "UPDc" "SH" "DEL1" "DELc" "SH permutation "IN1" "INc" "CA1" "CAc" "SH" "SS" "DEL1" "UPD1" "DELc" "UPDc" "SH" "SS" permutation "IN1" "INc" "CA1" "CAc" "SH" "SS" "UPD1" "DEL1" "UPDc" "DELc" "SH" "SS" +# Test same operations with no segmentby columns (no index scanning) +permutation "NOS" "CA1" "CAc" "SH" "I1" "Ic" "SH" "UPD1" "UPDc" "SH" "DEL1" "DELc" "SH" "UPD1" "UPDc" "SH" +permutation "NOS" "IN1" "INc" "CA1" "CAc" "SH" "SS" "DEL1" "UPD1" "DELc" "UPDc" "SH" "SS" +permutation "NOS" "IN1" "INc" "CA1" "CAc" "SH" "SS" "UPD1" "DEL1" "UPDc" "DELc" "SH" "SS" + #Test interaction with upper isolation levels permutation "IN1" "INc" "CA1" "CAc" "SH" "SS" "DEL1" "UPDrr" "DELc" "UPDc" "SH" "SS" permutation "IN1" "INc" "CA1" "CAc" "SH" "SS" "UPD1" "DELrr" "UPDc" "DELc" "SH" "SS" diff --git a/tsl/test/sql/compression_update_delete.sql b/tsl/test/sql/compression_update_delete.sql index 2ce2caa6be9..25cc67b7904 100644 --- a/tsl/test/sql/compression_update_delete.sql +++ 
b/tsl/test/sql/compression_update_delete.sql @@ -1544,3 +1544,56 @@ BEGIN; :EXPLAIN DELETE FROM test_pushdown WHERE time IN ('2020-01-01','2020-01-0 BEGIN; :EXPLAIN DELETE FROM test_pushdown WHERE device = current_query(); ROLLBACK; BEGIN; :EXPLAIN DELETE FROM test_pushdown WHERE device IN ('a',current_query()); ROLLBACK; +-- github issue #6858 +-- check update triggers work correctly both on uncompressed and compressed chunks +CREATE TABLE update_trigger_test ( + "entity_id" "uuid" NOT NULL, + "effective_date_time" timestamp with time zone NOT NULL, + "measurement" numeric NOT NULL, + "modified_at" timestamp with time zone DEFAULT "now"() NOT NULL +); + +SELECT create_hypertable('update_trigger_test', 'effective_date_time'); + +CREATE OR REPLACE FUNCTION update_modified_at_test() +RETURNS TRIGGER +LANGUAGE PLPGSQL AS $$ +BEGIN + NEW.modified_at = NOW(); + RETURN NEW; +END; $$; + +CREATE TRIGGER update_trigger_test__before_update_sync_modified_at +BEFORE UPDATE ON update_trigger_test +FOR EACH ROW +EXECUTE PROCEDURE update_modified_at_test(); + +INSERT INTO update_trigger_test +SELECT 'f2ca7073-1395-5770-8378-7d0339804580', '2024-04-16 04:50:00+02', +1100.00, '2024-04-23 11:56:38.494095+02' FROM generate_series(1,2500,1) c; + +VACUUM FULL update_trigger_test; + +BEGIN; +UPDATE update_trigger_test SET measurement = measurement + 2 +WHERE update_trigger_test.effective_date_time >= '2020-01-01T00:00:00'::timestamp AT TIME ZONE 'UTC'; +ROLLBACK; + +-- try with default compression +ALTER TABLE update_trigger_test SET (timescaledb.compress); +SELECT compress_chunk(show_chunks('update_trigger_test')); + +BEGIN; +UPDATE update_trigger_test SET measurement = measurement + 2 +WHERE update_trigger_test.effective_date_time >= '2020-01-01T00:00:00'::timestamp AT TIME ZONE 'UTC'; +ROLLBACK; + +-- lets try with segmentby +SELECT decompress_chunk(show_chunks('update_trigger_test')); +ALTER TABLE update_trigger_test SET (timescaledb.compress, timescaledb.compress_segmentby='entity_id'); +SELECT compress_chunk(show_chunks('update_trigger_test')); + +BEGIN; +UPDATE update_trigger_test SET measurement = measurement + 2 +WHERE update_trigger_test.effective_date_time >= '2020-01-01T00:00:00'::timestamp AT TIME ZONE 'UTC'; +ROLLBACK;
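
For reference, here is a minimal sketch of the snapshot bookkeeping pattern that the hypertable_modify.c change above relies on. This is not TimescaleDB source: the helper names swap_in_registered_snapshot and restore_saved_snapshot are illustrative only, but the calls used (CommandCounterIncrement, GetTransactionSnapshot, RegisterSnapshot, UnregisterSnapshot, GetCurrentCommandId) are the standard PostgreSQL APIs the patch builds on.

/*
 * Sketch only: swap the executor snapshot for a registered copy of the
 * current transaction snapshot while compressed batches are decompressed,
 * then restore the original snapshot afterwards.
 */
#include "postgres.h"
#include "access/xact.h"
#include "nodes/execnodes.h"
#include "utils/snapmgr.h"

static Snapshot
swap_in_registered_snapshot(EState *estate)
{
	/*
	 * Remember the snapshot installed by ExecutorStart() so the caller can
	 * restore it once decompression is done.
	 */
	Snapshot	saved = estate->es_snapshot;

	/* Make rows decompressed earlier in this command visible. */
	CommandCounterIncrement();

	/*
	 * Install a registered snapshot: RegisterSnapshot() pins it (copying it
	 * when given the static transaction snapshot), so rows written later by
	 * BEFORE UPDATE triggers are not read back during the remaining scan.
	 */
	estate->es_snapshot = RegisterSnapshot(GetTransactionSnapshot());

	/* Mark rows written by this command as visible, as in the patch above. */
	estate->es_output_cid = GetCurrentCommandId(true);

	return saved;
}

static void
restore_saved_snapshot(EState *estate, Snapshot saved)
{
	/* Every RegisterSnapshot() needs a matching UnregisterSnapshot(). */
	UnregisterSnapshot(estate->es_snapshot);
	estate->es_snapshot = saved;
}

This pairing is why the patch adds the UnregisterSnapshot() call at the end of ExecModifyTable(): the registered copy is only valid while registered, and the snapshot saved from ExecutorStart() is put back once the compressed chunks have been processed.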