Skip to content

Commit

Permalink
Remove completed threshold
Browse files Browse the repository at this point in the history
The completed threshold in the TimescaleDB catalog is no longer used
by the refactored continuous aggregates, so it is removed.
  • Loading branch information
erimatnor committed Sep 14, 2020
1 parent 9e31889 commit ab124e2
Show file tree
Hide file tree
Showing 11 changed files with 45 additions and 187 deletions.
32 changes: 12 additions & 20 deletions sql/pre_install/tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -272,14 +272,6 @@ CREATE TABLE IF NOT EXISTS _timescaledb_catalog.continuous_aggs_invalidation_thr
);
SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.continuous_aggs_invalidation_threshold', '');

CREATE TABLE IF NOT EXISTS _timescaledb_catalog.continuous_aggs_completed_threshold(
materialization_id INTEGER PRIMARY KEY
REFERENCES _timescaledb_catalog.continuous_agg(mat_hypertable_id)
ON DELETE CASCADE,
watermark BIGINT NOT NULL --exclusive (everything up to but not including watermark is done)
);
SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.continuous_aggs_completed_threshold', '');

-- this does not have an FK on the materialization table since INSERTs to this
-- table are performance critical
CREATE TABLE IF NOT EXISTS _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log(
Expand Down Expand Up @@ -311,22 +303,22 @@ CREATE INDEX continuous_aggs_materialization_invalidation_log_idx
* the algorithms. This table is NOT dumped.
*/
CREATE TABLE IF NOT EXISTS _timescaledb_catalog.compression_algorithm(
id SMALLINT PRIMARY KEY,
version SMALLINT NOT NULL,
name NAME NOT NULL,
description TEXT
id SMALLINT PRIMARY KEY,
version SMALLINT NOT NULL,
name NAME NOT NULL,
description TEXT
);


CREATE TABLE IF NOT EXISTS _timescaledb_catalog.hypertable_compression (
hypertable_id INTEGER REFERENCES _timescaledb_catalog.hypertable(id) ON DELETE CASCADE,
attname NAME NOT NULL,
compression_algorithm_id SMALLINT REFERENCES _timescaledb_catalog.compression_algorithm(id),
segmentby_column_index SMALLINT ,
orderby_column_index SMALLINT,
orderby_asc BOOLEAN,
orderby_nullsfirst BOOLEAN,
PRIMARY KEY (hypertable_id, attname),
hypertable_id INTEGER REFERENCES _timescaledb_catalog.hypertable(id) ON DELETE CASCADE,
attname NAME NOT NULL,
compression_algorithm_id SMALLINT REFERENCES _timescaledb_catalog.compression_algorithm(id),
segmentby_column_index SMALLINT ,
orderby_column_index SMALLINT,
orderby_asc BOOLEAN,
orderby_nullsfirst BOOLEAN,
PRIMARY KEY (hypertable_id, attname),
UNIQUE (hypertable_id, segmentby_column_index),
UNIQUE (hypertable_id, orderby_column_index)
);
Expand Down
3 changes: 1 addition & 2 deletions sql/updates/latest-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ GRANT SELECT ON _timescaledb_config.bgw_job_id_seq TO PUBLIC;
-- rebuild continuous aggregate table
CREATE TABLE _timescaledb_catalog.continuous_agg_tmp AS SELECT * FROM _timescaledb_catalog.continuous_agg;

ALTER TABLE _timescaledb_catalog.continuous_aggs_completed_threshold DROP CONSTRAINT continuous_aggs_completed_threshold_materialization_id_fkey;
DROP TABLE IF EXISTS _timescaledb_catalog.continuous_aggs_completed_threshold;
ALTER TABLE _timescaledb_catalog.continuous_aggs_materialization_invalidation_log DROP CONSTRAINT continuous_aggs_materialization_invalid_materialization_id_fkey;

ALTER EXTENSION timescaledb DROP TABLE _timescaledb_catalog.continuous_agg;
Expand Down Expand Up @@ -339,5 +339,4 @@ GRANT SELECT ON _timescaledb_catalog.continuous_agg TO PUBLIC;
INSERT INTO _timescaledb_catalog.continuous_agg SELECT mat_hypertable_id,raw_hypertable_id,user_view_schema,user_view_name,partial_view_schema,partial_view_name,bucket_width,refresh_lag,direct_view_schema,direct_view_name,max_interval_per_job,ignore_invalidation_older_than,materialized_only FROM _timescaledb_catalog.continuous_agg_tmp;
DROP TABLE _timescaledb_catalog.continuous_agg_tmp;

ALTER TABLE _timescaledb_catalog.continuous_aggs_completed_threshold ADD CONSTRAINT continuous_aggs_completed_threshold_materialization_id_fkey FOREIGN KEY(materialization_id) REFERENCES _timescaledb_catalog.continuous_agg(mat_hypertable_id);
ALTER TABLE _timescaledb_catalog.continuous_aggs_materialization_invalidation_log ADD CONSTRAINT continuous_aggs_materialization_invalid_materialization_id_fkey FOREIGN KEY(materialization_id) REFERENCES _timescaledb_catalog.continuous_agg(mat_hypertable_id);
61 changes: 24 additions & 37 deletions sql/views.sql
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,6 @@ CREATE OR REPLACE VIEW timescaledb_information.continuous_aggregates as

CREATE OR REPLACE VIEW timescaledb_information.continuous_aggregate_stats as
SELECT format('%1$I.%2$I', cagg.user_view_schema, cagg.user_view_name)::regclass as view_name,
CASE _timescaledb_internal.get_time_type(cagg.raw_hypertable_id)
WHEN 'TIMESTAMP'::regtype
THEN _timescaledb_internal.to_timestamp_without_timezone(ct.watermark)::TEXT
WHEN 'TIMESTAMPTZ'::regtype
THEN _timescaledb_internal.to_timestamp(ct.watermark)::TEXT
WHEN 'DATE'::regtype
THEN _timescaledb_internal.to_date(ct.watermark)::TEXT
ELSE ct.watermark::TEXT
END AS completed_threshold,
CASE _timescaledb_internal.get_time_type(cagg.raw_hypertable_id)
WHEN 'TIMESTAMP'::regtype
THEN _timescaledb_internal.to_timestamp_without_timezone(it.watermark)::TEXT
Expand Down Expand Up @@ -120,9 +111,7 @@ CREATE OR REPLACE VIEW timescaledb_information.continuous_aggregate_stats as
LEFT JOIN _timescaledb_internal.bgw_job_stat as bgw_job_stat
ON ( bgw_job.id = bgw_job_stat.job_id )
LEFT JOIN _timescaledb_catalog.continuous_aggs_invalidation_threshold as it
ON ( cagg.raw_hypertable_id = it.hypertable_id)
LEFT JOIN _timescaledb_catalog.continuous_aggs_completed_threshold as ct
ON ( cagg.mat_hypertable_id = ct.materialization_id);
ON ( cagg.raw_hypertable_id = it.hypertable_id);

CREATE OR REPLACE VIEW timescaledb_information.data_node AS
SELECT s.node_name, s.owner, s.options
Expand All @@ -136,7 +125,7 @@ CREATE OR REPLACE VIEW timescaledb_information.data_node AS
-- tables.
CREATE OR REPLACE VIEW timescaledb_information.chunks
AS
SELECT
SELECT
hypertable_schema, hypertable_name,
schema_name as chunk_schema , chunk_name ,
primary_dimension, primary_dimension_type,
Expand Down Expand Up @@ -189,24 +178,24 @@ SELECT
END as is_compressed,
pgtab.spcname as chunk_table_space,
chdn.node_list
FROM _timescaledb_catalog.chunk srcch
FROM _timescaledb_catalog.chunk srcch
INNER JOIN _timescaledb_catalog.hypertable ht ON ht.id = srcch.hypertable_id
INNER JOIN _timescaledb_catalog.chunk_constraint chcons ON srcch.id = chcons.chunk_id
INNER JOIN _timescaledb_catalog.dimension dim ON srcch.hypertable_id = dim.hypertable_id
INNER JOIN _timescaledb_catalog.dimension_slice dimsl ON dim.id = dimsl.dimension_id
and chcons.dimension_slice_id = dimsl.id
INNER JOIN
( SELECT relname, reltablespace, nspname as schema_name
( SELECT relname, reltablespace, nspname as schema_name
FROM pg_class , pg_namespace WHERE
pg_class.relnamespace = pg_namespace.oid) cl
pg_class.relnamespace = pg_namespace.oid) cl
ON srcch.table_name = cl.relname and srcch.schema_name = cl.schema_name
LEFT OUTER JOIN pg_tablespace pgtab ON pgtab.oid = reltablespace
left outer join (
SELECT chunk_id, array_agg(node_name ORDER BY node_name) as node_list
FROM _timescaledb_catalog.chunk_data_node
GROUP BY chunk_id) chdn
ON srcch.id = chdn.chunk_id
WHERE srcch.dropped is false
ON srcch.id = chdn.chunk_id
WHERE srcch.dropped is false
and ht.compressed = false ) finalq
WHERE chunk_dimension_num = 1
;
Expand All @@ -216,34 +205,34 @@ WHERE chunk_dimension_num = 1
-- as expected.
CREATE OR REPLACE VIEW timescaledb_information.dimensions
AS
SELECT
SELECT
ht.schema_name as hypertable_schema,
ht.table_name as hypertable_name,
rank() over(partition by hypertable_id order by dim.id) as dimension_number,
dim.column_name,
dim.column_type,
CASE WHEN dim.interval_length is NULL
THEN 'Space'
ELSE 'Time'
CASE WHEN dim.interval_length is NULL
THEN 'Space'
ELSE 'Time'
END as dimension_type,
CASE WHEN dim.interval_length is NOT NULL THEN
CASE
WHEN dim.column_type = 'TIMESTAMP'::regtype OR
dim.column_type = 'TIMESTAMPTZ'::regtype OR
dim.column_type = 'DATE'::regtype
THEN _timescaledb_internal.to_interval(dim.interval_length)
ELSE NULL
CASE
WHEN dim.column_type = 'TIMESTAMP'::regtype OR
dim.column_type = 'TIMESTAMPTZ'::regtype OR
dim.column_type = 'DATE'::regtype
THEN _timescaledb_internal.to_interval(dim.interval_length)
ELSE NULL
END
END as time_interval,
END as time_interval,
CASE WHEN dim.interval_length is NOT NULL THEN
CASE
WHEN dim.column_type = 'TIMESTAMP'::regtype OR
dim.column_type = 'TIMESTAMPTZ'::regtype OR
dim.column_type = 'DATE'::regtype
CASE
WHEN dim.column_type = 'TIMESTAMP'::regtype OR
dim.column_type = 'TIMESTAMPTZ'::regtype OR
dim.column_type = 'DATE'::regtype
THEN NULL
ELSE dim.interval_length
ELSE dim.interval_length
END
END as integer_interval,
END as integer_interval,
dim.integer_now_func,
dim.num_slices as num_partitions
FROM _timescaledb_catalog.hypertable ht, _timescaledb_catalog.dimension dim
Expand Down Expand Up @@ -275,5 +264,3 @@ ORDER BY

GRANT USAGE ON SCHEMA timescaledb_information TO PUBLIC;
GRANT SELECT ON ALL TABLES IN SCHEMA timescaledb_information TO PUBLIC;


11 changes: 0 additions & 11 deletions src/catalog.c
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,6 @@ static const TableInfoDef catalog_table_names[_MAX_CATALOG_TABLES + 1] = {
.schema_name = CATALOG_SCHEMA_NAME,
.table_name = CONTINUOUS_AGG_TABLE_NAME,
},
[CONTINUOUS_AGGS_COMPLETED_THRESHOLD] = {
.schema_name = CATALOG_SCHEMA_NAME,
.table_name = CONTINUOUS_AGGS_COMPLETED_THRESHOLD_TABLE_NAME,
},
[CONTINUOUS_AGGS_HYPERTABLE_INVALIDATION_LOG] = {
.schema_name = CATALOG_SCHEMA_NAME,
.table_name = CONTINUOUS_AGGS_HYPERTABLE_INVALIDATION_LOG_TABLE_NAME,
Expand Down Expand Up @@ -213,12 +209,6 @@ static const TableIndexDef catalog_table_index_definitions[_MAX_CATALOG_TABLES]
[CONTINUOUS_AGG_RAW_HYPERTABLE_ID_IDX] = "continuous_agg_raw_hypertable_id_idx"
},
},
[CONTINUOUS_AGGS_COMPLETED_THRESHOLD] = {
.length = _MAX_CONTINUOUS_AGGS_COMPLETED_THRESHOLD_INDEX,
.names = (char *[]) {
[CONTINUOUS_AGGS_COMPLETED_THRESHOLD_PKEY] = "continuous_aggs_completed_threshold_pkey",
},
},
[CONTINUOUS_AGGS_HYPERTABLE_INVALIDATION_LOG] = {
.length = _MAX_CONTINUOUS_AGGS_HYPERTABLE_INVALIDATION_LOG_INDEX,
.names = (char *[]) {
Expand Down Expand Up @@ -270,7 +260,6 @@ static const char *catalog_table_serial_id_names[_MAX_CATALOG_TABLES] = {
[TABLESPACE] = CATALOG_SCHEMA_NAME ".tablespace_id_seq",
[BGW_JOB] = CONFIG_SCHEMA_NAME ".bgw_job_id_seq",
[BGW_JOB_STAT] = NULL,
[CONTINUOUS_AGGS_COMPLETED_THRESHOLD] = NULL,
[CONTINUOUS_AGGS_HYPERTABLE_INVALIDATION_LOG] = NULL,
[CONTINUOUS_AGGS_INVALIDATION_THRESHOLD] = NULL,
[CONTINUOUS_AGGS_MATERIALIZATION_INVALIDATION_LOG] = NULL,
Expand Down
35 changes: 0 additions & 35 deletions src/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ typedef enum CatalogTable
METADATA,
BGW_POLICY_CHUNK_STATS,
CONTINUOUS_AGG,
CONTINUOUS_AGGS_COMPLETED_THRESHOLD,
CONTINUOUS_AGGS_HYPERTABLE_INVALIDATION_LOG,
CONTINUOUS_AGGS_INVALIDATION_THRESHOLD,
CONTINUOUS_AGGS_MATERIALIZATION_INVALIDATION_LOG,
Expand Down Expand Up @@ -934,40 +933,6 @@ typedef enum Anum_continuous_agg_raw_hypertable_id_idx
#define Natts_continuous_agg_raw_hypertable_id_idx \
(_Anum_continuous_agg_raw_hypertable_id_idx_max - 1)

/****** CONTINUOUS_AGGS_COMPLETED_THRESHOLD_TABLE definitions*/
#define CONTINUOUS_AGGS_COMPLETED_THRESHOLD_TABLE_NAME "continuous_aggs_completed_threshold"
typedef enum Anum_continuous_aggs_completed_threshold
{
Anum_continuous_aggs_completed_threshold_materialization_id = 1,
Anum_continuous_aggs_completed_threshold_watermark,
_Anum_continuous_aggs_completed_threshold_max,
} Anum_continuous_aggs_completed_threshold;

#define Natts_continuous_aggs_completed_threshold \
(_Anum_continuous_aggs_completed_threshold_max - 1)

typedef struct FormData_continuous_aggs_completed_threshold
{
int32 materialization_id;
int64 watermark;
} FormData_continuous_aggs_completed_threshold;

typedef FormData_continuous_aggs_completed_threshold *Form_continuous_aggs_completed_threshold;

enum
{
CONTINUOUS_AGGS_COMPLETED_THRESHOLD_PKEY = 0,
_MAX_CONTINUOUS_AGGS_COMPLETED_THRESHOLD_INDEX,
};
typedef enum Anum_continuous_aggs_completed_threshold_pkey
{
Anum_continuous_aggs_completed_threshold_pkey_materialization_id = 1,
_Anum_continuous_aggs_completed_threshold_pkey_max,
} Anum_continuous_aggs_completed_threshold_pkey;

#define Natts_continuous_aggs_completed_threshold_pkey \
(_Anum_continuous_aggs_completed_threshold_pkey_max - 1)

/****** CONTINUOUS_AGGS_HYPERTABLE_INVALIDATION_LOG_TABLE definitions*/
#define CONTINUOUS_AGGS_HYPERTABLE_INVALIDATION_LOG_TABLE_NAME \
"continuous_aggs_hypertable_invalidation_log"
Expand Down
60 changes: 0 additions & 60 deletions src/continuous_agg.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,20 +98,6 @@ init_scan_by_raw_hypertable_id(ScanIterator *iterator, const int32 raw_hypertabl
Int32GetDatum(raw_hypertable_id));
}

static void
init_completed_threshold_scan_by_mat_id(ScanIterator *iterator, const int32 mat_hypertable_id)
{
iterator->ctx.index = catalog_get_index(ts_catalog_get(),
CONTINUOUS_AGGS_COMPLETED_THRESHOLD,
CONTINUOUS_AGGS_COMPLETED_THRESHOLD_PKEY);

ts_scan_iterator_scan_key_init(iterator,
Anum_continuous_aggs_completed_threshold_pkey_materialization_id,
BTEqualStrategyNumber,
F_INT4EQ,
Int32GetDatum(mat_hypertable_id));
}

static void
init_invalidation_threshold_scan_by_hypertable_id(ScanIterator *iterator,
const int32 raw_hypertable_id)
Expand Down Expand Up @@ -171,48 +157,6 @@ number_of_continuous_aggs_attached(int32 raw_hypertable_id)
return count;
}

TSDLLEXPORT
int64
ts_continuous_agg_get_completed_threshold(int32 materialization_id)
{
ScanIterator iterator = ts_scan_iterator_create(CONTINUOUS_AGGS_COMPLETED_THRESHOLD,
AccessShareLock,
CurrentMemoryContext);
int64 threshold = PG_INT64_MIN;
int count = 0;

init_completed_threshold_scan_by_mat_id(&iterator, materialization_id);
ts_scanner_foreach(&iterator)
{
bool isnull;
Datum datum = slot_getattr(ts_scan_iterator_slot(&iterator),
Anum_continuous_aggs_completed_threshold_watermark,
&isnull);

Assert(!isnull);
threshold = DatumGetInt64(datum);
count++;
}
Assert(count <= 1);
return threshold;
}

static void
completed_threshold_delete(int32 materialization_id)
{
ScanIterator iterator = ts_scan_iterator_create(CONTINUOUS_AGGS_COMPLETED_THRESHOLD,
RowExclusiveLock,
CurrentMemoryContext);

init_completed_threshold_scan_by_mat_id(&iterator, materialization_id);

ts_scanner_foreach(&iterator)
{
TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
ts_catalog_delete_tid(ti->scanrel, ts_scanner_get_tuple_tid(ti));
}
}

static void
invalidation_threshold_delete(int32 raw_hypertable_id)
{
Expand Down Expand Up @@ -540,8 +484,6 @@ drop_continuous_agg(ContinuousAgg *agg, bool drop_user_view)
if (!raw_hypertable_has_other_caggs)
LockRelationOid(catalog_get_table_id(catalog, CONTINUOUS_AGGS_HYPERTABLE_INVALIDATION_LOG),
RowExclusiveLock);
LockRelationOid(catalog_get_table_id(catalog, CONTINUOUS_AGGS_COMPLETED_THRESHOLD),
RowExclusiveLock);
if (!raw_hypertable_has_other_caggs)
LockRelationOid(catalog_get_table_id(catalog, CONTINUOUS_AGGS_INVALIDATION_THRESHOLD),
RowExclusiveLock);
Expand Down Expand Up @@ -598,8 +540,6 @@ drop_continuous_agg(ContinuousAgg *agg, bool drop_user_view)
if (!raw_hypertable_has_other_caggs)
hypertable_invalidation_log_delete(form->raw_hypertable_id);

completed_threshold_delete(form->mat_hypertable_id);

if (!raw_hypertable_has_other_caggs)
invalidation_threshold_delete(form->raw_hypertable_id);
materialization_invalidation_log_delete(form->mat_hypertable_id);
Expand Down
1 change: 0 additions & 1 deletion src/continuous_agg.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ extern TSDLLEXPORT ContinuousAggHypertableStatus
ts_continuous_agg_hypertable_status(int32 hypertable_id);
extern TSDLLEXPORT List *ts_continuous_aggs_find_by_raw_table_id(int32 raw_hypertable_id);
TSDLLEXPORT
int64 ts_continuous_agg_get_completed_threshold(int32 materialization_id);
extern TSDLLEXPORT ContinuousAgg *ts_continuous_agg_find_by_view_name(const char *schema,
const char *name,
ContinuousAggViewType type);
Expand Down
3 changes: 1 addition & 2 deletions test/expected/drop_rename_hypertable.out
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,6 @@ SELECT * FROM _timescaledb_catalog.hypertable;
_timescaledb_catalog | compression_algorithm | table | super_user
_timescaledb_catalog | compression_chunk_size | table | super_user
_timescaledb_catalog | continuous_agg | table | super_user
_timescaledb_catalog | continuous_aggs_completed_threshold | table | super_user
_timescaledb_catalog | continuous_aggs_hypertable_invalidation_log | table | super_user
_timescaledb_catalog | continuous_aggs_invalidation_threshold | table | super_user
_timescaledb_catalog | continuous_aggs_materialization_invalidation_log | table | super_user
Expand All @@ -213,7 +212,7 @@ SELECT * FROM _timescaledb_catalog.hypertable;
_timescaledb_catalog | metadata | table | super_user
_timescaledb_catalog | remote_txn | table | super_user
_timescaledb_catalog | tablespace | table | super_user
(19 rows)
(18 rows)

\dt "_timescaledb_internal".*
List of relations
Expand Down
1 change: 0 additions & 1 deletion tsl/test/expected/continuous_aggs_usage.out
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ view_definition | SELECT time_bucket('@ 1 hour'::interval, device_re
SELECT * FROM timescaledb_information.continuous_aggregate_stats;
-[ RECORD 1 ]----------+-----------------------------
view_name | device_summary
completed_threshold |
invalidation_threshold | Sun Dec 30 22:00:00 2018 PST
job_id |
last_run_started_at |
Expand Down

0 comments on commit ab124e2

Please sign in to comment.