Fix lost concurrent CAgg updates
When two CAggs on the same hypertable are refreshed at the same time, a
race condition on the invalidation threshold table could occur.

So far, the table was locked with a non-self-conflicting lock. Therefore,
both scanners ran at the same time, but only one was able to lock the
threshold value with a proper tuple lock. The other scanner ignored this
failed lock and just returned. As a result, the field
computed_invalidation_threshold was never populated and still contained
0.

So, invalidation_threshold_set_or_get returned a refresh end interval of
0. As a consequence, the `if (refresh_window.start >=
refresh_window.end)` branch in continuous_agg_refresh_internal could be
taken and we returned from the refresh without doing any work.

This patch adds proper error reporting and also changes the table lock
to a self-conflicting lock to avoid these problems.
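
To make the failure mode concrete, here is a hedged sketch of the early
return described above (the quoted branch is from the commit message; the
surrounding names and types are illustrative, not the actual body of
continuous_agg_refresh_internal):

#include <stdint.h>

typedef struct
{
	int64_t start;
	int64_t end;
} Window;

/* Illustrative sketch, assuming the losing scanner bailed out silently:
 * invalidation_threshold_set_or_get() hands back the stored value 0 as
 * the refresh window end, the window collapses, and the refresh exits
 * without materializing anything. */
static void
refresh_sketch(Window refresh_window, int64_t computed_invalidation_threshold)
{
	refresh_window.end = computed_invalidation_threshold; /* still 0 after the lost race */

	if (refresh_window.start >= refresh_window.end)
		return; /* empty window: the concurrent CAgg update is silently lost */

	/* ... normal materialization work would happen here ... */
}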
jnidzwetzki committed Dec 19, 2023
1 parent a1f7d35 commit 21dc711
Showing 5 changed files with 156 additions and 5 deletions.
1 change: 1 addition & 0 deletions .unreleased/fix_6443
@@ -0,0 +1 @@
Fixes: #6443 Fix lost concurrent CAgg updates
18 changes: 16 additions & 2 deletions tsl/src/continuous_aggs/invalidation_threshold.c
@@ -21,6 +21,7 @@
#include <time_utils.h>
#include <time_bucket.h>

#include "debug_point.h"
#include "ts_catalog/continuous_agg.h"
#include "continuous_aggs/materialize.h"
#include "invalidation_threshold.h"
@@ -74,10 +75,23 @@ typedef struct InvalidationThresholdData
static ScanTupleResult
invalidation_threshold_scan_update(TupleInfo *ti, void *const data)
{
DEBUG_WAITPOINT("invalidation_threshold_scan_update_enter");

InvalidationThresholdData *invthresh = (InvalidationThresholdData *) data;

if (ti->lockresult != TM_Ok)
return SCAN_CONTINUE;
{
/* Since we are locking the table with a self-conflicting ShareUpdateExclusiveLock
* during the refresh, we should not run into this during normal operation. However,
* if we are not able to get the tuple lock, we log this problem as an ERROR so the
* refresh gets re-executed by the user or by the retry policy of the BGW.
*/
elog(ERROR,
"unable to lock invalidation threshold tuple for hypertable %d",
invthresh->cagg->data.raw_hypertable_id);

pg_unreachable();

}

bool isnull;
Datum datum =
@@ -169,7 +183,7 @@ invalidation_threshold_set_or_get(const ContinuousAgg *cagg,
.data = &updatectx,
.limit = 1,
.tuple_found = invalidation_threshold_scan_update,
.lockmode = RowExclusiveLock,
.lockmode = ShareUpdateExclusiveLock,
.scandirection = ForwardScanDirection,
.result_mctx = CurrentMemoryContext,
.tuplock = &scantuplock,
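The one-line lock change above is the core of the fix: in PostgreSQL's
lock conflict table, RowExclusiveLock does not conflict with itself, so
two concurrent refreshes could both scan the threshold table at once,
while ShareUpdateExclusiveLock is self-conflicting and serializes them.
A minimal sketch of what the scanner's table lock now buys, assuming the
relation is locked on open (the variable name is an assumption; the real
relid comes from the TimescaleDB catalog):

#include <postgres.h>
#include <storage/lmgr.h>

static void
lock_threshold_table(Oid threshold_table_relid)
{
	/* Self-conflicting: a second backend blocks here until the first
	 * commits, so its scanner always finds the threshold tuple lockable
	 * (TM_Ok) and sees the already-updated value. */
	LockRelationOid(threshold_table_relid, ShareUpdateExclusiveLock);

	/* The old RowExclusiveLock would not conflict with itself, letting
	 * both backends proceed and race on the per-tuple lock instead. */
}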
38 changes: 38 additions & 0 deletions tsl/test/isolation/expected/cagg_concurrent_invalidation.out
@@ -0,0 +1,38 @@
Parsed test spec with 3 sessions

starting permutation: s3_lock_invalidation s1_run_update s2_run_update s3_release_invalidation s3_check_watermarks
step s3_lock_invalidation:
SELECT debug_waitpoint_enable('invalidation_threshold_scan_update_enter');

debug_waitpoint_enable
----------------------

(1 row)

step s1_run_update:
CALL refresh_continuous_aggregate('cagg_1', '2020-01-01 00:00:00', '2021-01-01 00:00:00');
<waiting ...>
step s2_run_update:
CALL refresh_continuous_aggregate('cagg_2', '2020-01-01 00:00:00', '2021-01-01 00:00:00');
<waiting ...>
step s3_release_invalidation:
SELECT debug_waitpoint_release('invalidation_threshold_scan_update_enter');

debug_waitpoint_release
-----------------------

(1 row)

step s1_run_update: <... completed>
step s2_run_update: <... completed>
step s3_check_watermarks:
SELECT _timescaledb_functions.to_timestamp(watermark)
FROM _timescaledb_catalog.continuous_aggs_watermark
ORDER BY mat_hypertable_id;

to_timestamp
----------------------------
Wed Jan 01 16:00:00 2020 PST
Wed Jan 01 16:00:00 2020 PST
(2 rows)
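
Reading the output: both refresh calls show up as <waiting ...> —
presumably s1 is parked on the enabled waitpoint while holding the table
lock, and s2 queues behind the now self-conflicting
ShareUpdateExclusiveLock. Once the waitpoint is released, both complete,
and both watermarks land in 2020; before the fix, the losing refresh
would have returned early and left its watermark behind.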

11 changes: 8 additions & 3 deletions tsl/test/isolation/specs/CMakeLists.txt
@@ -23,9 +23,14 @@ endif()

if(CMAKE_BUILD_TYPE MATCHES Debug)
list(APPEND TEST_TEMPLATES_MODULE ${TEST_TEMPLATES_MODULE_DEBUG})
list(APPEND TEST_FILES compression_chunk_race.spec compression_freeze.spec
compression_merge_race.spec
decompression_chunk_and_parallel_query_wo_idx.spec)
list(
APPEND
TEST_FILES
cagg_concurrent_invalidation.spec
compression_chunk_race.spec
compression_freeze.spec
compression_merge_race.spec
decompression_chunk_and_parallel_query_wo_idx.spec)
if(PG_VERSION VERSION_GREATER_EQUAL "14.0")
list(APPEND TEST_FILES freeze_chunk.spec compression_dml_iso.spec)
endif()
93 changes: 93 additions & 0 deletions tsl/test/isolation/specs/cagg_concurrent_invalidation.spec
@@ -0,0 +1,93 @@
# This file and its contents are licensed under the Timescale License.
# Please see the included NOTICE for copyright information and
# LICENSE-TIMESCALE for a copy of the license.


#
# Test concurrent CAgg refreshes and invalidation threshold updates. This isolation test
# checks that we don't skip CAgg updates when two sessions are trying to modify the
# invalidation threshold at the same time.
#
setup
{
CREATE TABLE temperature (
time timestamptz NOT NULL,
value float
);

SELECT create_hypertable('temperature', 'time');

INSERT INTO temperature
SELECT time, ceil(random() * 100)::int
FROM generate_series('2000-01-01 0:00:00+0'::timestamptz,
'2000-01-01 23:59:59+0','1m') time;

CREATE MATERIALIZED VIEW cagg_1
WITH (timescaledb.continuous) AS
SELECT time_bucket('4 hour', time), avg(value)
FROM temperature
GROUP BY 1 ORDER BY 1
WITH NO DATA;

CREATE MATERIALIZED VIEW cagg_2
WITH (timescaledb.continuous) AS
SELECT time_bucket('4 hour', time), avg(value)
FROM temperature
GROUP BY 1 ORDER BY 1
WITH NO DATA;
}

# Refresh CAGGs in separate transactions
setup
{
CALL refresh_continuous_aggregate('cagg_1', NULL, NULL);
}

setup
{
CALL refresh_continuous_aggregate('cagg_2', NULL, NULL);
}

# Add new data to the hypertable. This time in the year 2020, instead of
# 2000 as used for the setup of the CAggs.
setup
{
INSERT INTO temperature
SELECT time, ceil(random() * 100)::int
FROM generate_series('2020-01-01 0:00:00+0'::timestamptz,
'2020-01-01 23:59:59+0','1m') time;
}

teardown {
DROP TABLE temperature CASCADE;
}

session "S1"
step "s1_run_update" {
CALL refresh_continuous_aggregate('cagg_1', '2020-01-01 00:00:00', '2021-01-01 00:00:00');
}

session "S2"
step "s2_run_update" {
CALL refresh_continuous_aggregate('cagg_2', '2020-01-01 00:00:00', '2021-01-01 00:00:00');
}

session "S3"
step "s3_lock_invalidation" {
SELECT debug_waitpoint_enable('invalidation_threshold_scan_update_enter');
}

step "s3_release_invalidation" {
SELECT debug_waitpoint_release('invalidation_threshold_scan_update_enter');
}

# Check that both CAggs have a watermark in 2020 after the updates are executed.
# mat_hypertable_id is not included in the query to make the test independent of the
# actual hypertable ids.
step "s3_check_watermarks" {
SELECT _timescaledb_functions.to_timestamp(watermark)
FROM _timescaledb_catalog.continuous_aggs_watermark
ORDER BY mat_hypertable_id;
}

permutation "s3_lock_invalidation" "s1_run_update" "s2_run_update" "s3_release_invalidation" "s3_check_watermarks"("s1_run_update", "s2_run_update")
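
The permutation relies on TimescaleDB's debug waitpoints to park both
refreshes on the DEBUG_WAITPOINT added in
invalidation_threshold_scan_update before releasing them together. A
hedged sketch of how such a waitpoint can be built on PostgreSQL advisory
locks (the real implementation lives in TimescaleDB's debug_point.c; the
hash choice and lock tag here are assumptions, not the actual code):

#include <postgres.h>
#include <miscadmin.h>
#include <common/hashfn.h>
#include <storage/lock.h>

/* Sketch: debug_waitpoint_enable() takes an exclusive advisory lock
 * keyed by a hash of the waitpoint name; the DEBUG_WAITPOINT() macro
 * then blocks on a shared lock with the same tag until the test session
 * calls debug_waitpoint_release(). */
static void
debug_waitpoint_sketch(const char *name)
{
	LOCKTAG		tag;
	uint32		name_hash =
		DatumGetUInt32(hash_any((const unsigned char *) name, (int) strlen(name)));

	SET_LOCKTAG_ADVISORY(tag, MyDatabaseId, name_hash, 0, 1);

	/* Blocks while the enabling session holds the exclusive lock. */
	(void) LockAcquire(&tag, ShareLock, false /* sessionLock */, false /* dontWait */);
	LockRelease(&tag, ShareLock, false);
}

With both s1_run_update and s2_run_update parked this way, releasing the
waitpoint lets the two refreshes proceed one after another through the
now self-conflicting table lock, which is what s3_check_watermarks
verifies.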
