Fix concurrent tuple deletes for continuous aggregates
When multiple continuous aggregates are defined on the same
hypertable, concurrent refreshes can try to delete the same
hypertable invalidation log entries and fail with a "tuple
concurrently deleted" error. Resolve this by serializing
invalidation log deletes on the raw hypertable id.

Fixes #1940
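
A note on the mechanism: the tuple lock the fix takes in C is roughly the SQL-level equivalent of SELECT ... FOR UPDATE on the threshold row. A minimal sketch, assuming a raw hypertable with id 1 (the catalog table and column are the real ones used by the fix; the id literal is illustrative):

BEGIN;
-- LockTupleExclusive corresponds to the row lock taken by FOR UPDATE;
-- a concurrent REFRESH on the same raw hypertable blocks here
SELECT hypertable_id
FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
WHERE hypertable_id = 1
FOR UPDATE;
-- ... read and delete hypertable invalidation log entries safely ...
COMMIT;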
gayyappan committed Jul 3, 2020
1 parent d649d49 commit bca00aa
Showing 3 changed files with 150 additions and 2 deletions.
82 changes: 81 additions & 1 deletion tsl/src/continuous_aggs/materialize.c
@@ -78,6 +78,7 @@ static int64 get_materialization_end_point_for_table(int32 raw_hypertable_id,
int64 completed_threshold,
bool *materializing_new_range,
bool *truncated_materialization, bool verbose);
static void lock_invalidation_threshold_hypertable_row(int32 raw_hypertable_id);
static void drain_invalidation_log(int32 raw_hypertable_id, List **invalidations_out);
static void insert_materialization_invalidation_logs(List *caggs, List *invalidations,
Relation rel);
@@ -197,8 +198,21 @@ continuous_agg_materialize(int32 materialization_id, ContinuousAggMatOptions *op

catalog = ts_catalog_get();

/* copy over all the materializations from the raw hypertable to all the continuous aggs */
caggs = ts_continuous_aggs_find_by_raw_table_id(cagg_data.raw_hypertable_id);
if (list_length(caggs) > 1)
{
/* REFRESH cagg_1 and REFRESH cagg_2 can try to read and delete the
* invalidation logs for the same hypertable. If both REFRESH jobs try
* to delete the same set of rows, we run into a "tuple concurrently
* deleted" error (github issue #1940).
* Prevent this by serializing on the raw hypertable row.
*/

lock_invalidation_threshold_hypertable_row(cagg_data.raw_hypertable_id);
}
drain_invalidation_log(cagg_data.raw_hypertable_id, &invalidations);
materialization_invalidation_log_table_relation =
table_open(catalog_get_table_id(catalog, CONTINUOUS_AGGS_MATERIALIZATION_INVALIDATION_LOG),
@@ -660,6 +674,66 @@ scan_take_invalidation_tuple(TupleInfo *ti, void *data)
return SCAN_CONTINUE;
}

/* scanner callback: the row was found; verify the tuple lock was acquired */
static ScanTupleResult
invalidation_threshold_htid_found(TupleInfo *tinfo, void *data)
{
if (tinfo->lockresult != TM_Ok)
{
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("unexpected lockresult %d in invalidation_threshold_htid_found",
tinfo->lockresult)));
}
return SCAN_DONE;
}

/* Lock the row corresponding to the hypertable id in the
* continuous_aggs_invalidation_threshold table with an exclusive tuple
* lock (LockTupleExclusive), blocking until the lock is acquired.
*/
static void
lock_invalidation_threshold_hypertable_row(int32 raw_hypertable_id)
{
ScanTupLock scantuplock = {
.waitpolicy = LockWaitBlock,
.lockmode = LockTupleExclusive,
};
Relation invalidation_threshold_table_relation;
Catalog *catalog = ts_catalog_get();
ScanKeyData scankey[1];

ScanKeyInit(&scankey[0],
Anum_continuous_aggs_invalidation_threshold_pkey_hypertable_id,
BTEqualStrategyNumber,
F_INT4EQ,
Int32GetDatum(raw_hypertable_id));

/* lock the table in AccessShare mode and the matching row with an exclusive tuple lock */
ScannerCtx scanctx = { .table = catalog_get_table_id(catalog,
CONTINUOUS_AGGS_INVALIDATION_THRESHOLD),
.index = catalog_get_index(catalog,
CONTINUOUS_AGGS_INVALIDATION_THRESHOLD,
CONTINUOUS_AGGS_INVALIDATION_THRESHOLD_PKEY),
.nkeys = 1,
.scankey = scankey,
.limit = 1,
.tuple_found = invalidation_threshold_htid_found,
.lockmode = AccessShareLock,
.scandirection = ForwardScanDirection,
.result_mctx = CurrentMemoryContext,
.tuplock = &scantuplock };
/* Acquire the table lock explicitly; the catalog scanning code
* releases its table lock at the end of the call.
*/
invalidation_threshold_table_relation =
table_open(catalog_get_table_id(catalog, CONTINUOUS_AGGS_INVALIDATION_THRESHOLD),
AccessShareLock);
ts_scanner_scan(&scanctx);
table_close(invalidation_threshold_table_relation, NoLock); /* lock released at end of transaction */
}

/* Read invalidation log table, copy entries and delete them from this table */
static void
drain_invalidation_log(int32 raw_hypertable_id, List **invalidations_out)
{
@@ -793,6 +867,12 @@ materialization_invalidation_log_get_range(int32 materialization_id, Oid type, i
return invalidation_range;
}

/* We already locked the materialization table in ShareRowExclusiveLock
* mode for the session in the first txn. This implies that two concurrent
* refreshes of the same continuous aggregate, i.e.
* REFRESH cagg_1; REFRESH cagg_1;
* are serialized, so they will never get here concurrently.
*/
static void
materialization_invalidation_log_delete_or_cut(int32 cagg_id, InternalTimeRange invalidation_range,
int64 completed_threshold)
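
While one REFRESH holds the threshold row lock, a concurrent REFRESH on the same raw hypertable blocks rather than erroring. A sketch of how one might observe this from a third session via the standard PostgreSQL pg_locks view (the query shape is illustrative):

-- any session blocked behind the threshold row lock shows up as not granted
SELECT locktype, relation::regclass AS relation, mode, granted
FROM pg_locks
WHERE NOT granted;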
52 changes: 51 additions & 1 deletion tsl/test/isolation/expected/continuous_aggs_multi.out
@@ -1,4 +1,4 @@
Parsed test spec with 9 sessions
Parsed test spec with 11 sessions

starting permutation: Setup2 LockCompleted LockMat1 Refresh1 Refresh2 UnlockCompleted UnlockMat1
step Setup2:
@@ -130,3 +130,53 @@ bkt maxl

0 4
40 1000

starting permutation: Setup2 Refresh1 Refresh2 Refresh1_sel Refresh2_sel U1 U2 LInvRow Refresh1 Refresh2 UnlockInvRow Refresh1_sel Refresh2_sel
step Setup2:
CREATE VIEW continuous_view_1( bkt, cnt)
WITH ( timescaledb.continuous, timescaledb.refresh_lag = '-5', timescaledb.refresh_interval='72 hours', timescaledb.materialized_only = true)
AS SELECT time_bucket('5', time), COUNT(val)
FROM ts_continuous_test
GROUP BY 1;
CREATE VIEW continuous_view_2(bkt, maxl)
WITH ( timescaledb.continuous,timescaledb.refresh_lag='-10', timescaledb.refresh_interval='72 hours', timescaledb.materialized_only = true)
AS SELECT time_bucket('5', time), max(val)
FROM ts_continuous_test
GROUP BY 1;
CREATE FUNCTION lock_mattable( name text) RETURNS void AS $$
BEGIN EXECUTE format( 'lock table %s', name);
END; $$ LANGUAGE plpgsql;

R1: LOG: materializing continuous aggregate public.continuous_view_1: nothing to invalidate, new range up to 30
step Refresh1: REFRESH MATERIALIZED VIEW continuous_view_1;
R2: LOG: materializing continuous aggregate public.continuous_view_2: nothing to invalidate, new range up to 30
step Refresh2: REFRESH MATERIALIZED VIEW continuous_view_2;
step Refresh1_sel: select * from continuous_view_1 where bkt = 0 or bkt > 30
bkt cnt

0 5
step Refresh2_sel: select * from continuous_view_2 where bkt = 0 or bkt > 30 order by bkt;
bkt maxl

0 4
step U1: update ts_continuous_test SET val = 5555 where time < 10;
step U2: update ts_continuous_test SET val = 5 where time > 15 and time < 25;
step LInvRow: BEGIN; update _timescaledb_catalog.continuous_aggs_invalidation_threshold set watermark = 20 where hypertable_id in ( select raw_hypertable_id from _timescaledb_catalog.continuous_agg where user_view_name like 'continuous_view_1' );

step Refresh1: REFRESH MATERIALIZED VIEW continuous_view_1; <waiting ...>
step Refresh2: REFRESH MATERIALIZED VIEW continuous_view_2; <waiting ...>
step UnlockInvRow: ROLLBACK;
R1: LOG: new materialization range not found for public.ts_continuous_test (time column time): not enough new data past completion threshold of 30 as of 29
R1: LOG: materializing continuous aggregate public.continuous_view_1: processing invalidations, no new range
step Refresh1: <... completed>
R2: LOG: new materialization range not found for public.ts_continuous_test (time column time): not enough new data past completion threshold of 30 as of 29
R2: LOG: materializing continuous aggregate public.continuous_view_2: processing invalidations, no new range
step Refresh2: <... completed>
step Refresh1_sel: select * from continuous_view_1 where bkt = 0 or bkt > 30
bkt cnt

0 5
step Refresh2_sel: select * from continuous_view_2 where bkt = 0 or bkt > 30 order by bkt;
bkt maxl

0 5555
18 changes: 18 additions & 0 deletions tsl/test/isolation/specs/continuous_aggs_multi.spec
@@ -68,13 +68,31 @@ step "UnlockMat1" { ROLLBACK; }
#alter the refresh_lag for continuous_view_1
session "CVddl"
step "AlterLag1" { alter view continuous_view_1 set (timescaledb.refresh_lag = 10); }

#update the hypertable
session "Upd"
step "U1" { update ts_continuous_test SET val = 5555 where time < 10; }
step "U2" { update ts_continuous_test SET val = 5 where time > 15 and time < 25; }

#simulate an update to the invalidation threshold table that locks the hypertable's row
#this blocks a refresh that needs to take a row lock for the hypertable
session "LInv"
step "LInvRow" { BEGIN; update _timescaledb_catalog.continuous_aggs_invalidation_threshold set watermark = 20 where hypertable_id in ( select raw_hypertable_id from _timescaledb_catalog.continuous_agg where user_view_name like 'continuous_view_1' );
}
step "UnlockInvRow" { ROLLBACK; }


#refresh1, refresh2 can run concurrently
permutation "Setup2" "LockCompleted" "LockMat1" "Refresh1" "Refresh2" "UnlockCompleted" "UnlockMat1"

#refresh1 and refresh2 run concurrently and see the correct invalidation
#test1 - both see the same invalidation
permutation "Setup2" "Refresh1" "Refresh2" "LockCompleted" "LockMat1" "I1" "Refresh1" "Refresh2" "UnlockCompleted" "UnlockMat1" "Refresh1_sel" "Refresh2_sel"

##test2 - continuous_view_2 should see results from the insert but continuous_view_1 should not.
## Refresh2 will complete first due to LockMat1 and write the invalidation logs out.
permutation "Setup2" "AlterLag1" "Refresh1" "Refresh2" "Refresh1_sel" "Refresh2_sel" "LockCompleted" "LockMat1" "I2" "Refresh1" "Refresh2" "UnlockCompleted" "UnlockMat1" "Refresh1_sel" "Refresh2_sel"

#test3 - both see the updates i.e. the invalidations
##Refresh1 and Refresh2 are blocked by LInvRow; when that row lock is released, they should complete serially
permutation "Setup2" "Refresh1" "Refresh2" "Refresh1_sel" "Refresh2_sel" "U1" "U2" "LInvRow" "Refresh1" "Refresh2" "UnlockInvRow" "Refresh1_sel" "Refresh2_sel"
