Fix lost concurrent CAgg updates #6443

Merged: 1 commit, merged on Jan 2, 2024
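In brief, as reconstructed from the diff and the isolation test below: concurrent CAgg refreshes race on the shared invalidation threshold tuple of the underlying hypertable. Previously, invalidation_threshold_scan_update() treated a concurrently updated tuple (lockresult == TM_Updated) like any other lock failure and returned SCAN_CONTINUE, silently dropping that session's threshold update. The fix teaches the scanner a new SCAN_RESCAN result: on TM_Updated the scan restarts and the handler reprocesses the current tuple version, while all other lock failures now raise an error.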
1 change: 1 addition & 0 deletions .unreleased/fix_6443
@@ -0,0 +1 @@
Fixes: #6443 Fix lost concurrent CAgg updates
25 changes: 18 additions & 7 deletions src/scanner.c
@@ -461,15 +461,26 @@ ts_scanner_scan(ScannerCtx *ctx)
 
 	for (ts_scanner_start_scan(ctx); (tinfo = ts_scanner_next(ctx));)
 	{
-		/* Call tuple_found handler. Abort the scan if the handler wants us to */
-		if (ctx->tuple_found != NULL && ctx->tuple_found(tinfo, ctx->data) == SCAN_DONE)
+		if (ctx->tuple_found != NULL)
 		{
-			if (!(ctx->flags & SCANNER_F_NOEND))
-				ts_scanner_end_scan(ctx);
+			ScanTupleResult scan_result = ctx->tuple_found(tinfo, ctx->data);
 
-			if (!(ctx->flags & SCANNER_F_NOEND_AND_NOCLOSE))
-				ts_scanner_close(ctx);
-			break;
+			/* Call tuple_found handler. Abort the scan if the handler wants us to */
+			if (scan_result == SCAN_DONE)
+			{
+				if (!(ctx->flags & SCANNER_F_NOEND))
+					ts_scanner_end_scan(ctx);
+
+				if (!(ctx->flags & SCANNER_F_NOEND_AND_NOCLOSE))
+					ts_scanner_close(ctx);
+				break;
+			}
+			else if (scan_result == SCAN_RESCAN)
+			{
+				ctx->internal.tinfo.count = 0;
+				ts_scanner_rescan(ctx, NULL);
+				continue;
+			}
 		}
 	}
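To illustrate the new contract, here is a hypothetical tuple_found handler; example_tuple_found is invented for illustration and is not part of this PR, and it assumes the ScannerCtx was configured to lock returned tuples so that ti->lockresult is meaningful:

static ScanTupleResult
example_tuple_found(TupleInfo *ti, void *data)
{
	/* Lost a race: another transaction updated this tuple after our
	 * snapshot saw it. Ask the scanner to start over so we process the
	 * current tuple version instead of silently dropping our change. */
	if (ti->lockresult == TM_Updated)
		return SCAN_RESCAN;

	/* ... read or modify the locked tuple here ... */

	/* One tuple was enough; end the scan. */
	return SCAN_DONE;
}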
3 changes: 2 additions & 1 deletion src/scanner.h
@@ -49,7 +49,8 @@ typedef struct TupleInfo
 typedef enum ScanTupleResult
 {
 	SCAN_DONE,
-	SCAN_CONTINUE
+	SCAN_CONTINUE,
+	SCAN_RESCAN
 } ScanTupleResult;
 
 typedef enum ScanFilterResult
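For reference, the result values as ts_scanner_scan() now interprets them; the comments below are inferred from the scanner.c hunk above and are not in the header itself:

typedef enum ScanTupleResult
{
	SCAN_DONE,     /* stop; end/close the scan unless the NOEND/NOCLOSE flags are set */
	SCAN_CONTINUE, /* keep iterating over the remaining matching tuples */
	SCAN_RESCAN    /* reset the tuple count and restart the scan from the beginning */
} ScanTupleResult;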
16 changes: 15 additions & 1 deletion tsl/src/continuous_aggs/invalidation_threshold.c
@@ -21,6 +21,7 @@
 #include <time_utils.h>
 #include <time_bucket.h>
 
+#include "debug_point.h"
 #include "ts_catalog/continuous_agg.h"
 #include "continuous_aggs/materialize.h"
 #include "invalidation_threshold.h"
@@ -74,10 +75,23 @@
 static ScanTupleResult
 invalidation_threshold_scan_update(TupleInfo *ti, void *const data)
 {
+	DEBUG_WAITPOINT("invalidation_threshold_scan_update_enter");
+
 	InvalidationThresholdData *invthresh = (InvalidationThresholdData *) data;
 
+	/* If the tuple was modified concurrently, retry the operation */
+	if (ti->lockresult == TM_Updated)
+		return SCAN_RESCAN;
+
 	if (ti->lockresult != TM_Ok)
-		return SCAN_CONTINUE;
+	{
+		elog(ERROR,
+			 "unable to lock invalidation threshold tuple for hypertable %d (lock result %d)",
+			 invthresh->cagg->data.raw_hypertable_id,
+			 ti->lockresult);
+
+		pg_unreachable();
+	}
 
 	bool isnull;
 	Datum datum =
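Two details worth noting here. TM_Updated is the one lock result that warrants a retry: it is what tuple locking reports when a concurrent transaction modified the tuple between the snapshot read and the lock attempt, which is exactly the race this PR fixes; the old return SCAN_CONTINUE dropped the update silently in that case. And the pg_unreachable() after elog(ERROR) never executes, since elog(ERROR) performs a non-local exit; it is there to tell the compiler that this branch does not fall through.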
38 changes: 38 additions & 0 deletions tsl/test/isolation/expected/cagg_concurrent_invalidation.out
@@ -0,0 +1,38 @@
Parsed test spec with 3 sessions

starting permutation: s3_lock_invalidation s1_run_update s2_run_update s3_release_invalidation s3_check_watermarks
step s3_lock_invalidation:
SELECT debug_waitpoint_enable('invalidation_threshold_scan_update_enter');

debug_waitpoint_enable
----------------------

(1 row)

step s1_run_update:
CALL refresh_continuous_aggregate('cagg_1', '2020-01-01 00:00:00', '2021-01-01 00:00:00');
<waiting ...>
step s2_run_update:
CALL refresh_continuous_aggregate('cagg_2', '2020-01-01 00:00:00', '2021-01-01 00:00:00');
<waiting ...>
step s3_release_invalidation:
SELECT debug_waitpoint_release('invalidation_threshold_scan_update_enter');

debug_waitpoint_release
-----------------------

(1 row)

step s1_run_update: <... completed>
step s2_run_update: <... completed>
step s3_check_watermarks:
SELECT _timescaledb_functions.to_timestamp(watermark)
FROM _timescaledb_catalog.continuous_aggs_watermark
ORDER BY mat_hypertable_id;

to_timestamp
----------------------------
Wed Jan 01 16:00:00 2020 PST
Wed Jan 01 16:00:00 2020 PST
(2 rows)
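Both watermarks land on the same 2020 bucket boundary, which is the point of the test: neither session's threshold update was lost, even though both sessions were held at the waitpoint and then raced on the same tuple.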

11 changes: 8 additions & 3 deletions tsl/test/isolation/specs/CMakeLists.txt
@@ -23,9 +23,14 @@ endif()
 
 if(CMAKE_BUILD_TYPE MATCHES Debug)
   list(APPEND TEST_TEMPLATES_MODULE ${TEST_TEMPLATES_MODULE_DEBUG})
-  list(APPEND TEST_FILES compression_chunk_race.spec compression_freeze.spec
-       compression_merge_race.spec
-       decompression_chunk_and_parallel_query_wo_idx.spec)
+  list(
+    APPEND
+    TEST_FILES
+    cagg_concurrent_invalidation.spec
+    compression_chunk_race.spec
+    compression_freeze.spec
+    compression_merge_race.spec
+    decompression_chunk_and_parallel_query_wo_idx.spec)
   if(PG_VERSION VERSION_GREATER_EQUAL "14.0")
     list(APPEND TEST_FILES freeze_chunk.spec compression_dml_iso.spec)
   endif()
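The new spec is registered under the CMAKE_BUILD_TYPE MATCHES Debug branch, presumably because the debug_waitpoint_enable()/debug_waitpoint_release() machinery it relies on is, as far as I know, only compiled into debug builds.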
93 changes: 93 additions & 0 deletions tsl/test/isolation/specs/cagg_concurrent_invalidation.spec
@@ -0,0 +1,93 @@
# This file and its contents are licensed under the Timescale License.
# Please see the included NOTICE for copyright information and
# LICENSE-TIMESCALE for a copy of the license.


#
# Test concurrent CAgg refreshes and invalidation threshold updates. This isolation test
# checks that we don't skip CAgg updates when two sessions are trying to modify the
# invalidation threshold at the same time.
#
setup
{
    CREATE TABLE temperature (
        time timestamptz NOT NULL,
        value float
    );

    SELECT create_hypertable('temperature', 'time');

    INSERT INTO temperature
        SELECT time, ceil(random() * 100)::int
        FROM generate_series('2000-01-01 0:00:00+0'::timestamptz,
                             '2000-01-01 23:59:59+0','1m') time;

    CREATE MATERIALIZED VIEW cagg_1
        WITH (timescaledb.continuous) AS
        SELECT time_bucket('4 hour', time), avg(value)
        FROM temperature
        GROUP BY 1 ORDER BY 1
        WITH NO DATA;

    CREATE MATERIALIZED VIEW cagg_2
        WITH (timescaledb.continuous) AS
        SELECT time_bucket('4 hour', time), avg(value)
        FROM temperature
        GROUP BY 1 ORDER BY 1
        WITH NO DATA;
}

# Refresh the CAggs in separate transactions
setup
{
    CALL refresh_continuous_aggregate('cagg_1', NULL, NULL);
}

setup
{
    CALL refresh_continuous_aggregate('cagg_2', NULL, NULL);
}

# Add new data to the hypertable, this time in the year 2020 rather than in
# 2000 as used when the CAggs were populated above.
setup
{
    INSERT INTO temperature
        SELECT time, ceil(random() * 100)::int
        FROM generate_series('2020-01-01 0:00:00+0'::timestamptz,
                             '2020-01-01 23:59:59+0','1m') time;
}

teardown {
    DROP TABLE temperature CASCADE;
}

session "S1"
step "s1_run_update" {
    CALL refresh_continuous_aggregate('cagg_1', '2020-01-01 00:00:00', '2021-01-01 00:00:00');
}

session "S2"
step "s2_run_update" {
    CALL refresh_continuous_aggregate('cagg_2', '2020-01-01 00:00:00', '2021-01-01 00:00:00');
}

session "S3"
step "s3_lock_invalidation" {
    SELECT debug_waitpoint_enable('invalidation_threshold_scan_update_enter');
}

step "s3_release_invalidation" {
    SELECT debug_waitpoint_release('invalidation_threshold_scan_update_enter');
}

# Check that both CAggs have a watermark in 2020 after the updates are executed.
# mat_hypertable_id is used only for ordering, not in the output, to keep the
# test independent of the actual hypertable ids.
step "s3_check_watermarks" {
    SELECT _timescaledb_functions.to_timestamp(watermark)
    FROM _timescaledb_catalog.continuous_aggs_watermark
    ORDER BY mat_hypertable_id;
}

permutation "s3_lock_invalidation" "s1_run_update" "s2_run_update" "s3_release_invalidation" "s3_check_watermarks"("s1_run_update", "s2_run_update")
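If I read the isolation tester's annotation syntax correctly, the parenthesized list on s3_check_watermarks names the steps it must wait for: the watermark check is only considered complete after both s1_run_update and s2_run_update have finished, so it reliably observes the outcome of both refreshes.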