Skip to content

Commit

Permalink
Remove completed threshold
Browse files Browse the repository at this point in the history
The completed threshold in the TimescaleDB catalog is no longer used
by the refactored continuous aggregates, so it is removed.

Fixes #2178
  • Loading branch information
erimatnor committed Sep 15, 2020
1 parent a4e34fb commit 6b6a425
Show file tree
Hide file tree
Showing 9 changed files with 12 additions and 137 deletions.
8 changes: 0 additions & 8 deletions sql/pre_install/tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -272,14 +272,6 @@ CREATE TABLE IF NOT EXISTS _timescaledb_catalog.continuous_aggs_invalidation_thr
);
SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.continuous_aggs_invalidation_threshold', '');

CREATE TABLE IF NOT EXISTS _timescaledb_catalog.continuous_aggs_completed_threshold(
materialization_id INTEGER PRIMARY KEY
REFERENCES _timescaledb_catalog.continuous_agg(mat_hypertable_id)
ON DELETE CASCADE,
watermark BIGINT NOT NULL --exclusive (everything up to but not including watermark is done)
);
SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.continuous_aggs_completed_threshold', '');

-- this does not have an FK on the materialization table since INSERTs to this
-- table are performance critical
CREATE TABLE IF NOT EXISTS _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log(
Expand Down
6 changes: 4 additions & 2 deletions sql/updates/latest-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -303,10 +303,13 @@ SELECT pg_catalog.pg_extension_config_dump('_timescaledb_config.bgw_job', 'WHERE
GRANT SELECT ON _timescaledb_config.bgw_job TO PUBLIC;
GRANT SELECT ON _timescaledb_config.bgw_job_id_seq TO PUBLIC;

-- drop completed_threshold table, which is no longer used
ALTER EXTENSION timescaledb DROP TABLE _timescaledb_catalog.continuous_aggs_completed_threshold;
DROP TABLE IF EXISTS _timescaledb_catalog.continuous_aggs_completed_threshold;

-- rebuild continuous aggregate table
CREATE TABLE _timescaledb_catalog.continuous_agg_tmp AS SELECT * FROM _timescaledb_catalog.continuous_agg;

ALTER TABLE _timescaledb_catalog.continuous_aggs_completed_threshold DROP CONSTRAINT continuous_aggs_completed_threshold_materialization_id_fkey;
ALTER TABLE _timescaledb_catalog.continuous_aggs_materialization_invalidation_log DROP CONSTRAINT continuous_aggs_materialization_invalid_materialization_id_fkey;

ALTER EXTENSION timescaledb DROP TABLE _timescaledb_catalog.continuous_agg;
Expand Down Expand Up @@ -339,5 +342,4 @@ GRANT SELECT ON _timescaledb_catalog.continuous_agg TO PUBLIC;
INSERT INTO _timescaledb_catalog.continuous_agg SELECT mat_hypertable_id,raw_hypertable_id,user_view_schema,user_view_name,partial_view_schema,partial_view_name,bucket_width,refresh_lag,direct_view_schema,direct_view_name,max_interval_per_job,ignore_invalidation_older_than,materialized_only FROM _timescaledb_catalog.continuous_agg_tmp;
DROP TABLE _timescaledb_catalog.continuous_agg_tmp;

ALTER TABLE _timescaledb_catalog.continuous_aggs_completed_threshold ADD CONSTRAINT continuous_aggs_completed_threshold_materialization_id_fkey FOREIGN KEY(materialization_id) REFERENCES _timescaledb_catalog.continuous_agg(mat_hypertable_id);
ALTER TABLE _timescaledb_catalog.continuous_aggs_materialization_invalidation_log ADD CONSTRAINT continuous_aggs_materialization_invalid_materialization_id_fkey FOREIGN KEY(materialization_id) REFERENCES _timescaledb_catalog.continuous_agg(mat_hypertable_id);
11 changes: 0 additions & 11 deletions src/catalog.c
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,6 @@ static const TableInfoDef catalog_table_names[_MAX_CATALOG_TABLES + 1] = {
.schema_name = CATALOG_SCHEMA_NAME,
.table_name = CONTINUOUS_AGG_TABLE_NAME,
},
[CONTINUOUS_AGGS_COMPLETED_THRESHOLD] = {
.schema_name = CATALOG_SCHEMA_NAME,
.table_name = CONTINUOUS_AGGS_COMPLETED_THRESHOLD_TABLE_NAME,
},
[CONTINUOUS_AGGS_HYPERTABLE_INVALIDATION_LOG] = {
.schema_name = CATALOG_SCHEMA_NAME,
.table_name = CONTINUOUS_AGGS_HYPERTABLE_INVALIDATION_LOG_TABLE_NAME,
Expand Down Expand Up @@ -213,12 +209,6 @@ static const TableIndexDef catalog_table_index_definitions[_MAX_CATALOG_TABLES]
[CONTINUOUS_AGG_RAW_HYPERTABLE_ID_IDX] = "continuous_agg_raw_hypertable_id_idx"
},
},
[CONTINUOUS_AGGS_COMPLETED_THRESHOLD] = {
.length = _MAX_CONTINUOUS_AGGS_COMPLETED_THRESHOLD_INDEX,
.names = (char *[]) {
[CONTINUOUS_AGGS_COMPLETED_THRESHOLD_PKEY] = "continuous_aggs_completed_threshold_pkey",
},
},
[CONTINUOUS_AGGS_HYPERTABLE_INVALIDATION_LOG] = {
.length = _MAX_CONTINUOUS_AGGS_HYPERTABLE_INVALIDATION_LOG_INDEX,
.names = (char *[]) {
Expand Down Expand Up @@ -270,7 +260,6 @@ static const char *catalog_table_serial_id_names[_MAX_CATALOG_TABLES] = {
[TABLESPACE] = CATALOG_SCHEMA_NAME ".tablespace_id_seq",
[BGW_JOB] = CONFIG_SCHEMA_NAME ".bgw_job_id_seq",
[BGW_JOB_STAT] = NULL,
[CONTINUOUS_AGGS_COMPLETED_THRESHOLD] = NULL,
[CONTINUOUS_AGGS_HYPERTABLE_INVALIDATION_LOG] = NULL,
[CONTINUOUS_AGGS_INVALIDATION_THRESHOLD] = NULL,
[CONTINUOUS_AGGS_MATERIALIZATION_INVALIDATION_LOG] = NULL,
Expand Down
35 changes: 0 additions & 35 deletions src/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ typedef enum CatalogTable
METADATA,
BGW_POLICY_CHUNK_STATS,
CONTINUOUS_AGG,
CONTINUOUS_AGGS_COMPLETED_THRESHOLD,
CONTINUOUS_AGGS_HYPERTABLE_INVALIDATION_LOG,
CONTINUOUS_AGGS_INVALIDATION_THRESHOLD,
CONTINUOUS_AGGS_MATERIALIZATION_INVALIDATION_LOG,
Expand Down Expand Up @@ -934,40 +933,6 @@ typedef enum Anum_continuous_agg_raw_hypertable_id_idx
#define Natts_continuous_agg_raw_hypertable_id_idx \
(_Anum_continuous_agg_raw_hypertable_id_idx_max - 1)

/****** CONTINUOUS_AGGS_COMPLETED_THRESHOLD_TABLE definitions*/
#define CONTINUOUS_AGGS_COMPLETED_THRESHOLD_TABLE_NAME "continuous_aggs_completed_threshold"
typedef enum Anum_continuous_aggs_completed_threshold
{
Anum_continuous_aggs_completed_threshold_materialization_id = 1,
Anum_continuous_aggs_completed_threshold_watermark,
_Anum_continuous_aggs_completed_threshold_max,
} Anum_continuous_aggs_completed_threshold;

#define Natts_continuous_aggs_completed_threshold \
(_Anum_continuous_aggs_completed_threshold_max - 1)

typedef struct FormData_continuous_aggs_completed_threshold
{
int32 materialization_id;
int64 watermark;
} FormData_continuous_aggs_completed_threshold;

typedef FormData_continuous_aggs_completed_threshold *Form_continuous_aggs_completed_threshold;

enum
{
CONTINUOUS_AGGS_COMPLETED_THRESHOLD_PKEY = 0,
_MAX_CONTINUOUS_AGGS_COMPLETED_THRESHOLD_INDEX,
};
typedef enum Anum_continuous_aggs_completed_threshold_pkey
{
Anum_continuous_aggs_completed_threshold_pkey_materialization_id = 1,
_Anum_continuous_aggs_completed_threshold_pkey_max,
} Anum_continuous_aggs_completed_threshold_pkey;

#define Natts_continuous_aggs_completed_threshold_pkey \
(_Anum_continuous_aggs_completed_threshold_pkey_max - 1)

/****** CONTINUOUS_AGGS_HYPERTABLE_INVALIDATION_LOG_TABLE definitions*/
#define CONTINUOUS_AGGS_HYPERTABLE_INVALIDATION_LOG_TABLE_NAME \
"continuous_aggs_hypertable_invalidation_log"
Expand Down
60 changes: 0 additions & 60 deletions src/continuous_agg.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,20 +86,6 @@ init_scan_by_raw_hypertable_id(ScanIterator *iterator, const int32 raw_hypertabl
Int32GetDatum(raw_hypertable_id));
}

static void
init_completed_threshold_scan_by_mat_id(ScanIterator *iterator, const int32 mat_hypertable_id)
{
iterator->ctx.index = catalog_get_index(ts_catalog_get(),
CONTINUOUS_AGGS_COMPLETED_THRESHOLD,
CONTINUOUS_AGGS_COMPLETED_THRESHOLD_PKEY);

ts_scan_iterator_scan_key_init(iterator,
Anum_continuous_aggs_completed_threshold_pkey_materialization_id,
BTEqualStrategyNumber,
F_INT4EQ,
Int32GetDatum(mat_hypertable_id));
}

static void
init_invalidation_threshold_scan_by_hypertable_id(ScanIterator *iterator,
const int32 raw_hypertable_id)
Expand Down Expand Up @@ -159,48 +145,6 @@ number_of_continuous_aggs_attached(int32 raw_hypertable_id)
return count;
}

TSDLLEXPORT
int64
ts_continuous_agg_get_completed_threshold(int32 materialization_id)
{
ScanIterator iterator = ts_scan_iterator_create(CONTINUOUS_AGGS_COMPLETED_THRESHOLD,
AccessShareLock,
CurrentMemoryContext);
int64 threshold = PG_INT64_MIN;
int count = 0;

init_completed_threshold_scan_by_mat_id(&iterator, materialization_id);
ts_scanner_foreach(&iterator)
{
bool isnull;
Datum datum = slot_getattr(ts_scan_iterator_slot(&iterator),
Anum_continuous_aggs_completed_threshold_watermark,
&isnull);

Assert(!isnull);
threshold = DatumGetInt64(datum);
count++;
}
Assert(count <= 1);
return threshold;
}

static void
completed_threshold_delete(int32 materialization_id)
{
ScanIterator iterator = ts_scan_iterator_create(CONTINUOUS_AGGS_COMPLETED_THRESHOLD,
RowExclusiveLock,
CurrentMemoryContext);

init_completed_threshold_scan_by_mat_id(&iterator, materialization_id);

ts_scanner_foreach(&iterator)
{
TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
ts_catalog_delete_tid(ti->scanrel, ts_scanner_get_tuple_tid(ti));
}
}

static void
invalidation_threshold_delete(int32 raw_hypertable_id)
{
Expand Down Expand Up @@ -531,8 +475,6 @@ drop_continuous_agg(ContinuousAgg *agg, bool drop_user_view)
if (!raw_hypertable_has_other_caggs)
LockRelationOid(catalog_get_table_id(catalog, CONTINUOUS_AGGS_HYPERTABLE_INVALIDATION_LOG),
RowExclusiveLock);
LockRelationOid(catalog_get_table_id(catalog, CONTINUOUS_AGGS_COMPLETED_THRESHOLD),
RowExclusiveLock);
if (!raw_hypertable_has_other_caggs)
LockRelationOid(catalog_get_table_id(catalog, CONTINUOUS_AGGS_INVALIDATION_THRESHOLD),
RowExclusiveLock);
Expand Down Expand Up @@ -589,8 +531,6 @@ drop_continuous_agg(ContinuousAgg *agg, bool drop_user_view)
if (!raw_hypertable_has_other_caggs)
hypertable_invalidation_log_delete(form->raw_hypertable_id);

completed_threshold_delete(form->mat_hypertable_id);

if (!raw_hypertable_has_other_caggs)
invalidation_threshold_delete(form->raw_hypertable_id);
materialization_invalidation_log_delete(form->mat_hypertable_id);
Expand Down
1 change: 0 additions & 1 deletion src/continuous_agg.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ ts_continuous_agg_find_by_mat_hypertable_id(int32 mat_hypertable_id);
extern TSDLLEXPORT ContinuousAggHypertableStatus
ts_continuous_agg_hypertable_status(int32 hypertable_id);
extern TSDLLEXPORT List *ts_continuous_aggs_find_by_raw_table_id(int32 raw_hypertable_id);
extern TSDLLEXPORT int64 ts_continuous_agg_get_completed_threshold(int32 materialization_id);
extern TSDLLEXPORT ContinuousAgg *ts_continuous_agg_find_by_view_name(const char *schema,
const char *name,
ContinuousAggViewType type);
Expand Down
3 changes: 1 addition & 2 deletions test/expected/drop_rename_hypertable.out
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,6 @@ SELECT * FROM _timescaledb_catalog.hypertable;
_timescaledb_catalog | compression_algorithm | table | super_user
_timescaledb_catalog | compression_chunk_size | table | super_user
_timescaledb_catalog | continuous_agg | table | super_user
_timescaledb_catalog | continuous_aggs_completed_threshold | table | super_user
_timescaledb_catalog | continuous_aggs_hypertable_invalidation_log | table | super_user
_timescaledb_catalog | continuous_aggs_invalidation_threshold | table | super_user
_timescaledb_catalog | continuous_aggs_materialization_invalidation_log | table | super_user
Expand All @@ -213,7 +212,7 @@ SELECT * FROM _timescaledb_catalog.hypertable;
_timescaledb_catalog | metadata | table | super_user
_timescaledb_catalog | remote_txn | table | super_user
_timescaledb_catalog | tablespace | table | super_user
(19 rows)
(18 rows)

\dt "_timescaledb_internal".*
List of relations
Expand Down
14 changes: 4 additions & 10 deletions tsl/test/isolation/expected/continuous_aggs_multi.out
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Parsed test spec with 10 sessions
Parsed test spec with 9 sessions

starting permutation: Setup2 LockCompleted LockMat1 Refresh1 Refresh2 UnlockCompleted UnlockMat1
starting permutation: Setup2 LockMat1 Refresh1 Refresh2 UnlockMat1
step Setup2:
CREATE MATERIALIZED VIEW continuous_view_1( bkt, cnt)
WITH ( timescaledb.continuous, timescaledb.materialized_only = true)
Expand All @@ -16,19 +16,17 @@ step Setup2:
BEGIN EXECUTE format( 'lock table %s', name);
END; $$ LANGUAGE plpgsql;

step LockCompleted: BEGIN; LOCK TABLE _timescaledb_catalog.continuous_aggs_completed_threshold IN SHARE MODE;
step LockMat1: BEGIN; select lock_mattable(materialization_hypertable::text) from timescaledb_information.continuous_aggregates where view_name::text like 'continuous_view_1';

lock_mattable


step Refresh1: CALL refresh_continuous_aggregate('continuous_view_1', NULL, 30); <waiting ...>
step Refresh2: CALL refresh_continuous_aggregate('continuous_view_2', NULL, NULL);
step UnlockCompleted: ROLLBACK;
step UnlockMat1: ROLLBACK;
step Refresh1: <... completed>

starting permutation: Setup2 Refresh1 Refresh2 LockCompleted LockMat1 I1 Refresh1 Refresh2 UnlockCompleted UnlockMat1 Refresh1_sel Refresh2_sel
starting permutation: Setup2 Refresh1 Refresh2 LockMat1 I1 Refresh1 Refresh2 UnlockMat1 Refresh1_sel Refresh2_sel
step Setup2:
CREATE MATERIALIZED VIEW continuous_view_1( bkt, cnt)
WITH ( timescaledb.continuous, timescaledb.materialized_only = true)
Expand All @@ -46,7 +44,6 @@ step Setup2:

step Refresh1: CALL refresh_continuous_aggregate('continuous_view_1', NULL, 30);
step Refresh2: CALL refresh_continuous_aggregate('continuous_view_2', NULL, NULL);
step LockCompleted: BEGIN; LOCK TABLE _timescaledb_catalog.continuous_aggs_completed_threshold IN SHARE MODE;
step LockMat1: BEGIN; select lock_mattable(materialization_hypertable::text) from timescaledb_information.continuous_aggregates where view_name::text like 'continuous_view_1';

lock_mattable
Expand All @@ -55,7 +52,6 @@ lock_mattable
step I1: INSERT INTO ts_continuous_test SELECT 0, i*10 FROM (SELECT generate_series(0, 10) AS i) AS i;
step Refresh1: CALL refresh_continuous_aggregate('continuous_view_1', NULL, 30); <waiting ...>
step Refresh2: CALL refresh_continuous_aggregate('continuous_view_2', NULL, NULL);
step UnlockCompleted: ROLLBACK;
step UnlockMat1: ROLLBACK;
step Refresh1: <... completed>
step Refresh1_sel: select * from continuous_view_1 where bkt = 0 or bkt > 30
Expand All @@ -67,7 +63,7 @@ bkt maxl

0 100

starting permutation: Setup2 Refresh1 Refresh2 Refresh1_sel Refresh2_sel LockCompleted LockMat1 I2 Refresh1 Refresh2 UnlockCompleted UnlockMat1 Refresh1_sel Refresh2_sel
starting permutation: Setup2 Refresh1 Refresh2 Refresh1_sel Refresh2_sel LockMat1 I2 Refresh1 Refresh2 UnlockMat1 Refresh1_sel Refresh2_sel
step Setup2:
CREATE MATERIALIZED VIEW continuous_view_1( bkt, cnt)
WITH ( timescaledb.continuous, timescaledb.materialized_only = true)
Expand All @@ -93,7 +89,6 @@ step Refresh2_sel: select * from continuous_view_2 where bkt = 0 or bkt > 30 ord
bkt maxl

0 4
step LockCompleted: BEGIN; LOCK TABLE _timescaledb_catalog.continuous_aggs_completed_threshold IN SHARE MODE;
step LockMat1: BEGIN; select lock_mattable(materialization_hypertable::text) from timescaledb_information.continuous_aggregates where view_name::text like 'continuous_view_1';

lock_mattable
Expand All @@ -102,7 +97,6 @@ lock_mattable
step I2: INSERT INTO ts_continuous_test SELECT 40, 1000 ;
step Refresh1: CALL refresh_continuous_aggregate('continuous_view_1', NULL, 30); <waiting ...>
step Refresh2: CALL refresh_continuous_aggregate('continuous_view_2', NULL, NULL);
step UnlockCompleted: ROLLBACK;
step UnlockMat1: ROLLBACK;
R1: NOTICE: continuous aggregate "continuous_view_1" is already up-to-date
step Refresh1: <... completed>
Expand Down
11 changes: 3 additions & 8 deletions tsl/test/isolation/specs/continuous_aggs_multi.spec
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,6 @@ step "Refresh2" { CALL refresh_continuous_aggregate('continuous_view_2', NULL, N
session "R2_sel"
step "Refresh2_sel" { select * from continuous_view_2 where bkt = 0 or bkt > 30 order by bkt; }

#the completed threshold will block the REFRESH from writing
session "LC"
step "LockCompleted" { BEGIN; LOCK TABLE _timescaledb_catalog.continuous_aggs_completed_threshold IN SHARE MODE; }
step "UnlockCompleted" { ROLLBACK; }

#locking the materialized table will block refresh1
session "LM1"
step "LockMat1" { BEGIN; select lock_mattable(materialization_hypertable::text) from timescaledb_information.continuous_aggregates where view_name::text like 'continuous_view_1';
Expand All @@ -82,15 +77,15 @@ step "UnlockInvRow" { ROLLBACK; }


#refresh1, refresh2 can run concurrently
permutation "Setup2" "LockCompleted" "LockMat1" "Refresh1" "Refresh2" "UnlockCompleted" "UnlockMat1"
permutation "Setup2" "LockMat1" "Refresh1" "Refresh2" "UnlockMat1"

#refresh1 and refresh2 run concurrently and see the correct invalidation
#test1 - both see the same invalidation
permutation "Setup2" "Refresh1" "Refresh2" "LockCompleted" "LockMat1" "I1" "Refresh1" "Refresh2" "UnlockCompleted" "UnlockMat1" "Refresh1_sel" "Refresh2_sel"
permutation "Setup2" "Refresh1" "Refresh2" "LockMat1" "I1" "Refresh1" "Refresh2" "UnlockMat1" "Refresh1_sel" "Refresh2_sel"

##test2 - continuous_view_2 should see results from insert but not the other one.
## Refresh2 will complete first due to LockMat1 and write the invalidation logs out.
permutation "Setup2" "Refresh1" "Refresh2" "Refresh1_sel" "Refresh2_sel" "LockCompleted" "LockMat1" "I2" "Refresh1" "Refresh2" "UnlockCompleted" "UnlockMat1" "Refresh1_sel" "Refresh2_sel"
permutation "Setup2" "Refresh1" "Refresh2" "Refresh1_sel" "Refresh2_sel" "LockMat1" "I2" "Refresh1" "Refresh2" "UnlockMat1" "Refresh1_sel" "Refresh2_sel"

#test3 - both see the updates i.e. the invalidations
##Refresh1 and Refresh2 are blocked by LockInvRow, when that is unlocked, they should complete serially
Expand Down

0 comments on commit 6b6a425

Please sign in to comment.