From bca00aa2716d1199163967e6017c348228d1feda Mon Sep 17 00:00:00 2001 From: gayyappan Date: Wed, 1 Jul 2020 18:10:49 -0400 Subject: [PATCH] Fix concurrent tuple deletes for continuous aggregates When we have multiple continuous aggregates defined on the same hypertable, they could try to delete the hypertable invalidation logs concurrently. Resolve this by serializing invalidation log deletes by raw hypertable id. Fixes #1940 --- tsl/src/continuous_aggs/materialize.c | 82 ++++++++++++++++++- .../expected/continuous_aggs_multi.out | 52 +++++++++++- .../specs/continuous_aggs_multi.spec | 18 ++++ 3 files changed, 150 insertions(+), 2 deletions(-) diff --git a/tsl/src/continuous_aggs/materialize.c b/tsl/src/continuous_aggs/materialize.c index c28be242578..de4b2474c33 100644 --- a/tsl/src/continuous_aggs/materialize.c +++ b/tsl/src/continuous_aggs/materialize.c @@ -78,6 +78,7 @@ static int64 get_materialization_end_point_for_table(int32 raw_hypertable_id, int64 completed_threshold, bool *materializing_new_range, bool *truncated_materialization, bool verbose); +static void lock_invalidation_threshold_hypertable_row(int32 raw_hypertable_id); static void drain_invalidation_log(int32 raw_hypertable_id, List **invalidations_out); static void insert_materialization_invalidation_logs(List *caggs, List *invalidations, Relation rel); @@ -197,8 +198,21 @@ continuous_agg_materialize(int32 materialization_id, ContinuousAggMatOptions *op catalog = ts_catalog_get(); - /* copy over all the materializations from the raw hypertable to all the continuous aggs */ + /* copy over all the materializations from the raw hypertable to all the continuous aggs + */ caggs = ts_continuous_aggs_find_by_raw_table_id(cagg_data.raw_hypertable_id); + if (list_length(caggs) > 1) + { + /* REFRESH cagg_1; REFRESH cagg_2 can try to read+delete the + * invalidation logs for the same hypertable. + * If both REFRESH jobs try to delete the + * same set of rows, we run into "tuple concurrently deleted error". 
+ * (github issue 1940) + * Prevent this by serializing on the raw hypertable row + */ + + lock_invalidation_threshold_hypertable_row(cagg_data.raw_hypertable_id); + } drain_invalidation_log(cagg_data.raw_hypertable_id, &invalidations); materialization_invalidation_log_table_relation = table_open(catalog_get_table_id(catalog, CONTINUOUS_AGGS_MATERIALIZATION_INVALIDATION_LOG), @@ -660,6 +674,66 @@ scan_take_invalidation_tuple(TupleInfo *ti, void *data) return SCAN_CONTINUE; } +static ScanTupleResult +invalidation_threshold_htid_found(TupleInfo *tinfo, void *data) +{ + if (tinfo->lockresult != TM_Ok) + { + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("unexpected lockresult %d in invalidation_threshold_htid_found", + tinfo->lockresult))); + } + return SCAN_DONE; +} + +/* Lock the row corresponding to the hypertable id in the + * continuous_aggs_invalidation_threshold table with a LockTupleExclusive + * tuple lock; block until the lock is acquired. + */ +static void +lock_invalidation_threshold_hypertable_row(int32 raw_hypertable_id) +{ + ScanTupLock scantuplock = { + .waitpolicy = LockWaitBlock, + .lockmode = LockTupleExclusive, + }; + Relation invalidation_threshold_table_relation; + Catalog *catalog = ts_catalog_get(); + ScanKeyData scankey[1]; + + ScanKeyInit(&scankey[0], + Anum_continuous_aggs_invalidation_threshold_pkey_hypertable_id, + BTEqualStrategyNumber, + F_INT4EQ, + Int32GetDatum(raw_hypertable_id)); + + /* lock table in AccessShare mode and the row with LockTupleExclusive */ + ScannerCtx scanctx = { .table = catalog_get_table_id(catalog, + CONTINUOUS_AGGS_INVALIDATION_THRESHOLD), + .index = catalog_get_index(catalog, + CONTINUOUS_AGGS_INVALIDATION_THRESHOLD, + CONTINUOUS_AGGS_INVALIDATION_THRESHOLD_PKEY), + .nkeys = 1, + .scankey = scankey, + .limit = 1, + .tuple_found = invalidation_threshold_htid_found, + .lockmode = AccessShareLock, + .scandirection = ForwardScanDirection, + .result_mctx = CurrentMemoryContext, + .tuplock = &scantuplock }; + /* Acquire lock explicitly. 
+ * the catalog scanning code releases it at the end of the call + */ + invalidation_threshold_table_relation = + table_open(catalog_get_table_id(catalog, CONTINUOUS_AGGS_INVALIDATION_THRESHOLD), + AccessShareLock); + ts_scanner_scan(&scanctx); + table_close(invalidation_threshold_table_relation, NoLock); /* release lock at end + of transaction */ +} + +/* Read invalidation log table, copy entries and delete them from this table */ static void drain_invalidation_log(int32 raw_hypertable_id, List **invalidations_out) { @@ -793,6 +867,12 @@ materialization_invalidation_log_get_range(int32 materialization_id, Oid type, i return invalidation_range; } +/* We already locked the materialization table in + * ShareRowExclusiveLock mode for the session in the first txn. + * This implies that 2 concurrent refreshes on the same cont aggregate + * i.e. REFRESH cagg_1; REFRESH cagg_1 + * are serialized. So 2 concurrent refreshes on cagg_1 will not get here. + */ static void materialization_invalidation_log_delete_or_cut(int32 cagg_id, InternalTimeRange invalidation_range, int64 completed_threshold) diff --git a/tsl/test/isolation/expected/continuous_aggs_multi.out b/tsl/test/isolation/expected/continuous_aggs_multi.out index 399fe9f68e9..846e8735fa1 100644 --- a/tsl/test/isolation/expected/continuous_aggs_multi.out +++ b/tsl/test/isolation/expected/continuous_aggs_multi.out @@ -1,4 +1,4 @@ -Parsed test spec with 9 sessions +Parsed test spec with 11 sessions starting permutation: Setup2 LockCompleted LockMat1 Refresh1 Refresh2 UnlockCompleted UnlockMat1 step Setup2: @@ -130,3 +130,53 @@ bkt maxl 0 4 40 1000 + +starting permutation: Setup2 Refresh1 Refresh2 Refresh1_sel Refresh2_sel U1 U2 LInvRow Refresh1 Refresh2 UnlockInvRow Refresh1_sel Refresh2_sel +step Setup2: + CREATE VIEW continuous_view_1( bkt, cnt) + WITH ( timescaledb.continuous, timescaledb.refresh_lag = '-5', timescaledb.refresh_interval='72 hours', timescaledb.materialized_only = true) + AS SELECT time_bucket('5', time), 
COUNT(val) + FROM ts_continuous_test + GROUP BY 1; + CREATE VIEW continuous_view_2(bkt, maxl) + WITH ( timescaledb.continuous,timescaledb.refresh_lag='-10', timescaledb.refresh_interval='72 hours', timescaledb.materialized_only = true) + AS SELECT time_bucket('5', time), max(val) + FROM ts_continuous_test + GROUP BY 1; + CREATE FUNCTION lock_mattable( name text) RETURNS void AS $$ + BEGIN EXECUTE format( 'lock table %s', name); + END; $$ LANGUAGE plpgsql; + +R1: LOG: materializing continuous aggregate public.continuous_view_1: nothing to invalidate, new range up to 30 +step Refresh1: REFRESH MATERIALIZED VIEW continuous_view_1; +R2: LOG: materializing continuous aggregate public.continuous_view_2: nothing to invalidate, new range up to 30 +step Refresh2: REFRESH MATERIALIZED VIEW continuous_view_2; +step Refresh1_sel: select * from continuous_view_1 where bkt = 0 or bkt > 30 +bkt cnt + +0 5 +step Refresh2_sel: select * from continuous_view_2 where bkt = 0 or bkt > 30 order by bkt; +bkt maxl + +0 4 +step U1: update ts_continuous_test SET val = 5555 where time < 10; +step U2: update ts_continuous_test SET val = 5 where time > 15 and time < 25; +step LInvRow: BEGIN; update _timescaledb_catalog.continuous_aggs_invalidation_threshold set watermark = 20 where hypertable_id in ( select raw_hypertable_id from _timescaledb_catalog.continuous_agg where user_view_name like 'continuous_view_1' ); + +step Refresh1: REFRESH MATERIALIZED VIEW continuous_view_1; +step Refresh2: REFRESH MATERIALIZED VIEW continuous_view_2; +step UnlockInvRow: ROLLBACK; +R1: LOG: new materialization range not found for public.ts_continuous_test (time column time): not enough new data past completion threshold of 30 as of 29 +R1: LOG: materializing continuous aggregate public.continuous_view_1: processing invalidations, no new range +step Refresh1: <... 
completed> +R2: LOG: new materialization range not found for public.ts_continuous_test (time column time): not enough new data past completion threshold of 30 as of 29 +R2: LOG: materializing continuous aggregate public.continuous_view_2: processing invalidations, no new range +step Refresh2: <... completed> +step Refresh1_sel: select * from continuous_view_1 where bkt = 0 or bkt > 30 +bkt cnt + +0 5 +step Refresh2_sel: select * from continuous_view_2 where bkt = 0 or bkt > 30 order by bkt; +bkt maxl + +0 5555 diff --git a/tsl/test/isolation/specs/continuous_aggs_multi.spec b/tsl/test/isolation/specs/continuous_aggs_multi.spec index e2620ac8e38..b26b474791a 100644 --- a/tsl/test/isolation/specs/continuous_aggs_multi.spec +++ b/tsl/test/isolation/specs/continuous_aggs_multi.spec @@ -68,6 +68,19 @@ step "UnlockMat1" { ROLLBACK; } #alter the refresh_lag for continuous_view_1 session "CVddl" step "AlterLag1" { alter view continuous_view_1 set (timescaledb.refresh_lag = 10); } + +#update the hypertable +session "Upd" +step "U1" { update ts_continuous_test SET val = 5555 where time < 10; } +step "U2" { update ts_continuous_test SET val = 5 where time > 15 and time < 25; } + +#simulate an update to the invalidation threshold table that would lock the hypertable row +#this would block refresh that needs to get a row lock for the hypertable +session "LInv" +step "LInvRow" { BEGIN; update _timescaledb_catalog.continuous_aggs_invalidation_threshold set watermark = 20 where hypertable_id in ( select raw_hypertable_id from _timescaledb_catalog.continuous_agg where user_view_name like 'continuous_view_1' ); +} +step "UnlockInvRow" { ROLLBACK; } + #refresh1, refresh2 can run concurrently permutation "Setup2" "LockCompleted" "LockMat1" "Refresh1" "Refresh2" "UnlockCompleted" "UnlockMat1" @@ -75,6 +88,11 @@ permutation "Setup2" "LockCompleted" "LockMat1" "Refresh1" "Refresh2" "UnlockCom #refresh1 and refresh2 run concurrently and see the correct invalidation #test1 - both see the 
same invalidation permutation "Setup2" "Refresh1" "Refresh2" "LockCompleted" "LockMat1" "I1" "Refresh1" "Refresh2" "UnlockCompleted" "UnlockMat1" "Refresh1_sel" "Refresh2_sel" + ##test2 - continuous_view_2 should see results from insert but not the other one. ## Refresh2 will complete first due to LockMat1 and write the invalidation logs out. permutation "Setup2" "AlterLag1" "Refresh1" "Refresh2" "Refresh1_sel" "Refresh2_sel" "LockCompleted" "LockMat1" "I2" "Refresh1" "Refresh2" "UnlockCompleted" "UnlockMat1" "Refresh1_sel" "Refresh2_sel" + +#test3 - both see the updates i.e. the invalidations +##Refresh1 and Refresh2 are blocked by LInvRow; when that is unlocked (UnlockInvRow), they should complete serially +permutation "Setup2" "Refresh1" "Refresh2" "Refresh1_sel" "Refresh2_sel" "U1" "U2" "LInvRow" "Refresh1" "Refresh2" "UnlockInvRow" "Refresh1_sel" "Refresh2_sel"