From ac97c564e2b96d7dffda66801f1b497f51c45cbb Mon Sep 17 00:00:00 2001 From: Jan Nidzwetzki Date: Tue, 19 Dec 2023 13:59:33 +0100 Subject: [PATCH] Fix lost concurrent CAgg updates When two CAggs on the same hypertable are refreshed at the same time, a race condition on the invalidation threshold table could occur. So far, the table has been locked with a non-self-conflicting lock. Therefore, both scanners ran at the same time, but only one was able to lock the threshold value with a proper tuple lock. The other scanner ignored this failing lock and just returned. Therefore, the field computed_invalidation_threshold was never populated and still contained 0. So, invalidation_threshold_set_or_get returned a refresh end interval of 0. As a consequence, the `if (refresh_window.start >= refresh_window.end)` branch in continuous_agg_refresh_internal could be taken and we returned from the refresh without doing any work. This patch adds proper error reporting and also implements some retry logic to avoid these problems. A self-conflicting lock is not used due to the problems discussed in #5809. 
--- .unreleased/fix_6443 | 1 + src/scanner.c | 25 +++-- src/scanner.h | 3 +- .../continuous_aggs/invalidation_threshold.c | 16 +++- .../expected/cagg_concurrent_invalidation.out | 38 ++++++++ tsl/test/isolation/specs/CMakeLists.txt | 11 ++- .../specs/cagg_concurrent_invalidation.spec | 93 +++++++++++++++++++ 7 files changed, 175 insertions(+), 12 deletions(-) create mode 100644 .unreleased/fix_6443 create mode 100644 tsl/test/isolation/expected/cagg_concurrent_invalidation.out create mode 100644 tsl/test/isolation/specs/cagg_concurrent_invalidation.spec diff --git a/.unreleased/fix_6443 b/.unreleased/fix_6443 new file mode 100644 index 00000000000..16953c141d9 --- /dev/null +++ b/.unreleased/fix_6443 @@ -0,0 +1 @@ +Fixes: #6443 Fix lost concurrent CAgg updates diff --git a/src/scanner.c b/src/scanner.c index ccb1f6aba5e..86252ad3c99 100644 --- a/src/scanner.c +++ b/src/scanner.c @@ -461,15 +461,26 @@ ts_scanner_scan(ScannerCtx *ctx) for (ts_scanner_start_scan(ctx); (tinfo = ts_scanner_next(ctx));) { - /* Call tuple_found handler. Abort the scan if the handler wants us to */ - if (ctx->tuple_found != NULL && ctx->tuple_found(tinfo, ctx->data) == SCAN_DONE) + if (ctx->tuple_found != NULL) { - if (!(ctx->flags & SCANNER_F_NOEND)) - ts_scanner_end_scan(ctx); + ScanTupleResult scan_result = ctx->tuple_found(tinfo, ctx->data); - if (!(ctx->flags & SCANNER_F_NOEND_AND_NOCLOSE)) - ts_scanner_close(ctx); - break; + /* Call tuple_found handler. 
Abort the scan if the handler wants us to */ + if (scan_result == SCAN_DONE) + { + if (!(ctx->flags & SCANNER_F_NOEND)) + ts_scanner_end_scan(ctx); + + if (!(ctx->flags & SCANNER_F_NOEND_AND_NOCLOSE)) + ts_scanner_close(ctx); + break; + } + else if (scan_result == SCAN_RESCAN) + { + ctx->internal.tinfo.count = 0; + ts_scanner_rescan(ctx, NULL); + continue; + } } } diff --git a/src/scanner.h b/src/scanner.h index c8314988b72..a7eed007361 100644 --- a/src/scanner.h +++ b/src/scanner.h @@ -49,7 +49,8 @@ typedef struct TupleInfo typedef enum ScanTupleResult { SCAN_DONE, - SCAN_CONTINUE + SCAN_CONTINUE, + SCAN_RESCAN } ScanTupleResult; typedef enum ScanFilterResult diff --git a/tsl/src/continuous_aggs/invalidation_threshold.c b/tsl/src/continuous_aggs/invalidation_threshold.c index d61b227a00b..414d9b44c33 100644 --- a/tsl/src/continuous_aggs/invalidation_threshold.c +++ b/tsl/src/continuous_aggs/invalidation_threshold.c @@ -21,6 +21,7 @@ #include #include +#include "debug_point.h" #include "ts_catalog/continuous_agg.h" #include "continuous_aggs/materialize.h" #include "invalidation_threshold.h" @@ -74,10 +75,23 @@ typedef struct InvalidationThresholdData static ScanTupleResult invalidation_threshold_scan_update(TupleInfo *ti, void *const data) { + DEBUG_WAITPOINT("invalidation_threshold_scan_update_enter"); + InvalidationThresholdData *invthresh = (InvalidationThresholdData *) data; + /* If the tuple was modified concurrently, retry the operation */ + if (ti->lockresult == TM_Updated) + return SCAN_RESCAN; + if (ti->lockresult != TM_Ok) - return SCAN_CONTINUE; + { + elog(ERROR, + "unable to lock invalidation threshold tuple for hypertable %d (lock result %d)", + invthresh->cagg->data.raw_hypertable_id, + ti->lockresult); + + pg_unreachable(); + } bool isnull; Datum datum = diff --git a/tsl/test/isolation/expected/cagg_concurrent_invalidation.out b/tsl/test/isolation/expected/cagg_concurrent_invalidation.out new file mode 100644 index 00000000000..efccf2ebcd1 --- 
/dev/null +++ b/tsl/test/isolation/expected/cagg_concurrent_invalidation.out @@ -0,0 +1,38 @@ +Parsed test spec with 3 sessions + +starting permutation: s3_lock_invalidation s1_run_update s2_run_update s3_release_invalidation s3_check_watermarks +step s3_lock_invalidation: + SELECT debug_waitpoint_enable('invalidation_threshold_scan_update_enter'); + +debug_waitpoint_enable +---------------------- + +(1 row) + +step s1_run_update: + CALL refresh_continuous_aggregate('cagg_1', '2020-01-01 00:00:00', '2021-01-01 00:00:00'); + +step s2_run_update: + CALL refresh_continuous_aggregate('cagg_2', '2020-01-01 00:00:00', '2021-01-01 00:00:00'); + +step s3_release_invalidation: + SELECT debug_waitpoint_release('invalidation_threshold_scan_update_enter'); + +debug_waitpoint_release +----------------------- + +(1 row) + +step s1_run_update: <... completed> +step s2_run_update: <... completed> +step s3_check_watermarks: + SELECT _timescaledb_functions.to_timestamp(watermark) + FROM _timescaledb_catalog.continuous_aggs_watermark + ORDER BY mat_hypertable_id; + +to_timestamp +---------------------------- +Wed Jan 01 16:00:00 2020 PST +Wed Jan 01 16:00:00 2020 PST +(2 rows) + diff --git a/tsl/test/isolation/specs/CMakeLists.txt b/tsl/test/isolation/specs/CMakeLists.txt index 19213938314..643e5d3057f 100644 --- a/tsl/test/isolation/specs/CMakeLists.txt +++ b/tsl/test/isolation/specs/CMakeLists.txt @@ -23,9 +23,14 @@ endif() if(CMAKE_BUILD_TYPE MATCHES Debug) list(APPEND TEST_TEMPLATES_MODULE ${TEST_TEMPLATES_MODULE_DEBUG}) - list(APPEND TEST_FILES compression_chunk_race.spec compression_freeze.spec - compression_merge_race.spec - decompression_chunk_and_parallel_query_wo_idx.spec) + list( + APPEND + TEST_FILES + cagg_concurrent_invalidation.spec + compression_chunk_race.spec + compression_freeze.spec + compression_merge_race.spec + decompression_chunk_and_parallel_query_wo_idx.spec) if(PG_VERSION VERSION_GREATER_EQUAL "14.0") list(APPEND TEST_FILES freeze_chunk.spec 
compression_dml_iso.spec) endif() diff --git a/tsl/test/isolation/specs/cagg_concurrent_invalidation.spec b/tsl/test/isolation/specs/cagg_concurrent_invalidation.spec new file mode 100644 index 00000000000..5861ef9f410 --- /dev/null +++ b/tsl/test/isolation/specs/cagg_concurrent_invalidation.spec @@ -0,0 +1,93 @@ +# This file and its contents are licensed under the Timescale License. +# Please see the included NOTICE for copyright information and +# LICENSE-TIMESCALE for a copy of the license. + + +# +# Test concurrent CAgg refreshes and invalidation threshold updates. This isolation test +# checks that we don't skip CAgg updates when two sessions are trying to modify the +# invalidation threshold at the same time. +# +setup +{ + CREATE TABLE temperature ( + time timestamptz NOT NULL, + value float + ); + + SELECT create_hypertable('temperature', 'time'); + + INSERT INTO temperature + SELECT time, ceil(random() * 100)::int + FROM generate_series('2000-01-01 0:00:00+0'::timestamptz, + '2000-01-01 23:59:59+0','1m') time; + + CREATE MATERIALIZED VIEW cagg_1 + WITH (timescaledb.continuous) AS + SELECT time_bucket('4 hour', time), avg(value) + FROM temperature + GROUP BY 1 ORDER BY 1 + WITH NO DATA; + + CREATE MATERIALIZED VIEW cagg_2 + WITH (timescaledb.continuous) AS + SELECT time_bucket('4 hour', time), avg(value) + FROM temperature + GROUP BY 1 ORDER BY 1 + WITH NO DATA; +} + +# Refresh CAGGs in separate transactions +setup +{ + CALL refresh_continuous_aggregate('cagg_1', NULL, NULL); +} + +setup +{ + CALL refresh_continuous_aggregate('cagg_2', NULL, NULL); +} + +# Add new data to hypertable. This time in the year 2020 instead of 2000 as we +# did for the setup of the CAgg. 
+setup +{ + INSERT INTO temperature + SELECT time, ceil(random() * 100)::int + FROM generate_series('2020-01-01 0:00:00+0'::timestamptz, + '2020-01-01 23:59:59+0','1m') time; +} + +teardown { + DROP TABLE temperature CASCADE; +} + +session "S1" +step "s1_run_update" { + CALL refresh_continuous_aggregate('cagg_1', '2020-01-01 00:00:00', '2021-01-01 00:00:00'); +} + +session "S2" +step "s2_run_update" { + CALL refresh_continuous_aggregate('cagg_2', '2020-01-01 00:00:00', '2021-01-01 00:00:00'); +} + +session "S3" +step "s3_lock_invalidation" { + SELECT debug_waitpoint_enable('invalidation_threshold_scan_update_enter'); +} + +step "s3_release_invalidation" { + SELECT debug_waitpoint_release('invalidation_threshold_scan_update_enter'); +} + +# Check that both CAggs have a watermark in 2020 after the updates are executed. +# mat_hypertable_id is not included in the query to make the test independent of the +# actual hypertable ids. +step "s3_check_watermarks" { + SELECT _timescaledb_functions.to_timestamp(watermark) + FROM _timescaledb_catalog.continuous_aggs_watermark + ORDER BY mat_hypertable_id; +} + +permutation "s3_lock_invalidation" "s1_run_update" "s2_run_update" "s3_release_invalidation" "s3_check_watermarks"("s1_run_update", "s2_run_update")