Skip to content

Commit

Permalink
Populate CAgg bucket catalog table for all CAggs
Browse files Browse the repository at this point in the history
This changes the behavior of the CAgg catalog tables. From now on, all
CAggs that use a time_bucket function create an entry in the catalog
table continuous_aggs_bucket_function. In addition, the duplicate
bucket_width attribute is removed from the catalog table continuous_agg.
  • Loading branch information
jnidzwetzki committed Mar 13, 2024
1 parent 8433e3b commit 8dcb6ee
Show file tree
Hide file tree
Showing 35 changed files with 710 additions and 373 deletions.
1 change: 0 additions & 1 deletion sql/pre_install/tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,6 @@ CREATE TABLE _timescaledb_catalog.continuous_agg (
user_view_name name NOT NULL,
partial_view_schema name NOT NULL,
partial_view_name name NOT NULL,
bucket_width bigint NOT NULL,
direct_view_schema name NOT NULL,
direct_view_name name NOT NULL,
materialized_only bool NOT NULL DEFAULT FALSE,
Expand Down
146 changes: 146 additions & 0 deletions sql/updates/latest-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,149 @@ BEGIN
END LOOP;
END
$$;

--
-- Rebuild the catalog table `_timescaledb_catalog.continuous_agg`
--

-- (1) Create missing entries in _timescaledb_catalog.continuous_aggs_bucket_function
CREATE OR REPLACE FUNCTION _timescaledb_functions.cagg_get_bucket_function(
mat_hypertable_id INTEGER
) RETURNS regprocedure AS '@MODULE_PATHNAME@', 'ts_continuous_agg_get_bucket_function' LANGUAGE C STRICT VOLATILE;

-- Make sure function points to the new version of TSDB
CREATE OR REPLACE FUNCTION _timescaledb_functions.to_interval(unixtime_us BIGINT) RETURNS INTERVAL
AS '@MODULE_PATHNAME@', 'ts_pg_unix_microseconds_to_interval' LANGUAGE C IMMUTABLE STRICT PARALLEL SAFE;

-- We need to create entries in continuous_aggs_bucket_function for all CAggs that were treated so far
-- as fixed indicated by a bucket_width != -1
INSERT INTO _timescaledb_catalog.continuous_aggs_bucket_function
SELECT
mat_hypertable_id,
_timescaledb_functions.cagg_get_bucket_function(mat_hypertable_id),
-- Intervals needs to be converted into the proper interval format
-- Function name could be prefixed with 'public.'. Therefore LIKE instead of starts_with is used
CASE WHEN _timescaledb_functions.cagg_get_bucket_function(mat_hypertable_id)::text LIKE '%time_bucket(interval,%' THEN
_timescaledb_functions.to_interval(bucket_width)::text
ELSE
bucket_width::text
END,
NULL, -- bucket_origin
NULL, -- bucket_offset
NULL, -- bucket_timezone
true -- bucket_fixed_width
FROM _timescaledb_catalog.continuous_agg WHERE bucket_width != -1;

ALTER EXTENSION timescaledb DROP FUNCTION _timescaledb_functions.cagg_get_bucket_function(INTEGER);
DROP FUNCTION IF EXISTS _timescaledb_functions.cagg_get_bucket_function(INTEGER);

-- (2) Rebuild catalog table
DROP VIEW IF EXISTS timescaledb_experimental.policies;
DROP VIEW IF EXISTS timescaledb_information.hypertables;
DROP VIEW IF EXISTS timescaledb_information.continuous_aggregates;

DROP PROCEDURE IF EXISTS @extschema@.cagg_migrate (REGCLASS, BOOLEAN, BOOLEAN);
DROP FUNCTION IF EXISTS _timescaledb_internal.cagg_migrate_pre_validation (TEXT, TEXT, TEXT);
DROP FUNCTION IF EXISTS _timescaledb_functions.cagg_migrate_pre_validation (TEXT, TEXT, TEXT);
DROP PROCEDURE IF EXISTS _timescaledb_internal.cagg_migrate_create_plan (_timescaledb_catalog.continuous_agg, TEXT, BOOLEAN, BOOLEAN);
DROP PROCEDURE IF EXISTS _timescaledb_functions.cagg_migrate_create_plan (_timescaledb_catalog.continuous_agg, TEXT, BOOLEAN, BOOLEAN);

DROP FUNCTION IF EXISTS _timescaledb_functions.cagg_migrate_plan_exists (INTEGER);
DROP PROCEDURE IF EXISTS _timescaledb_internal.cagg_migrate_execute_plan (_timescaledb_catalog.continuous_agg);
DROP PROCEDURE IF EXISTS _timescaledb_functions.cagg_migrate_execute_plan (_timescaledb_catalog.continuous_agg);
DROP PROCEDURE IF EXISTS _timescaledb_internal.cagg_migrate_execute_create_new_cagg (_timescaledb_catalog.continuous_agg, _timescaledb_catalog.continuous_agg_migrate_plan_step);
DROP PROCEDURE IF EXISTS _timescaledb_functions.cagg_migrate_execute_create_new_cagg (_timescaledb_catalog.continuous_agg, _timescaledb_catalog.continuous_agg_migrate_plan_step);
DROP PROCEDURE IF EXISTS _timescaledb_internal.cagg_migrate_execute_disable_policies (_timescaledb_catalog.continuous_agg, _timescaledb_catalog.continuous_agg_migrate_plan_step);
DROP PROCEDURE IF EXISTS _timescaledb_functions.cagg_migrate_execute_disable_policies (_timescaledb_catalog.continuous_agg, _timescaledb_catalog.continuous_agg_migrate_plan_step);
DROP PROCEDURE IF EXISTS _timescaledb_internal.cagg_migrate_execute_enable_policies (_timescaledb_catalog.continuous_agg, _timescaledb_catalog.continuous_agg_migrate_plan_step);
DROP PROCEDURE IF EXISTS _timescaledb_functions.cagg_migrate_execute_enable_policies (_timescaledb_catalog.continuous_agg, _timescaledb_catalog.continuous_agg_migrate_plan_step);
DROP PROCEDURE IF EXISTS _timescaledb_internal.cagg_migrate_execute_copy_policies (_timescaledb_catalog.continuous_agg, _timescaledb_catalog.continuous_agg_migrate_plan_step);
DROP PROCEDURE IF EXISTS _timescaledb_functions.cagg_migrate_execute_copy_policies (_timescaledb_catalog.continuous_agg, _timescaledb_catalog.continuous_agg_migrate_plan_step);
DROP PROCEDURE IF EXISTS _timescaledb_internal.cagg_migrate_execute_refresh_new_cagg (_timescaledb_catalog.continuous_agg, _timescaledb_catalog.continuous_agg_migrate_plan_step);
DROP PROCEDURE IF EXISTS _timescaledb_functions.cagg_migrate_execute_refresh_new_cagg (_timescaledb_catalog.continuous_agg, _timescaledb_catalog.continuous_agg_migrate_plan_step);
DROP PROCEDURE IF EXISTS _timescaledb_internal.cagg_migrate_execute_copy_data (_timescaledb_catalog.continuous_agg, _timescaledb_catalog.continuous_agg_migrate_plan_step);
DROP PROCEDURE IF EXISTS _timescaledb_functions.cagg_migrate_execute_copy_data (_timescaledb_catalog.continuous_agg, _timescaledb_catalog.continuous_agg_migrate_plan_step);
DROP PROCEDURE IF EXISTS _timescaledb_internal.cagg_migrate_execute_override_cagg (_timescaledb_catalog.continuous_agg, _timescaledb_catalog.continuous_agg_migrate_plan_step);
DROP PROCEDURE IF EXISTS _timescaledb_functions.cagg_migrate_execute_override_cagg (_timescaledb_catalog.continuous_agg, _timescaledb_catalog.continuous_agg_migrate_plan_step);
DROP PROCEDURE IF EXISTS _timescaledb_internal.cagg_migrate_execute_drop_old_cagg (_timescaledb_catalog.continuous_agg, _timescaledb_catalog.continuous_agg_migrate_plan_step);
DROP PROCEDURE IF EXISTS _timescaledb_functions.cagg_migrate_execute_drop_old_cagg (_timescaledb_catalog.continuous_agg, _timescaledb_catalog.continuous_agg_migrate_plan_step);

ALTER TABLE _timescaledb_catalog.continuous_aggs_materialization_invalidation_log
DROP CONSTRAINT continuous_aggs_materialization_invalid_materialization_id_fkey;

ALTER TABLE _timescaledb_catalog.continuous_aggs_watermark
DROP CONSTRAINT continuous_aggs_watermark_mat_hypertable_id_fkey;

ALTER EXTENSION timescaledb
DROP TABLE _timescaledb_catalog.continuous_agg;

CREATE TABLE _timescaledb_catalog._tmp_continuous_agg AS
SELECT
mat_hypertable_id,
raw_hypertable_id,
parent_mat_hypertable_id,
user_view_schema,
user_view_name,
partial_view_schema,
partial_view_name,
direct_view_schema,
direct_view_name,
materialized_only,
finalized
FROM
_timescaledb_catalog.continuous_agg
ORDER BY
mat_hypertable_id;

DROP TABLE _timescaledb_catalog.continuous_agg;

CREATE TABLE _timescaledb_catalog.continuous_agg (
mat_hypertable_id integer NOT NULL,
raw_hypertable_id integer NOT NULL,
parent_mat_hypertable_id integer,
user_view_schema name NOT NULL,
user_view_name name NOT NULL,
partial_view_schema name NOT NULL,
partial_view_name name NOT NULL,
direct_view_schema name NOT NULL,
direct_view_name name NOT NULL,
materialized_only bool NOT NULL DEFAULT FALSE,
finalized bool NOT NULL DEFAULT TRUE,
-- table constraints
CONSTRAINT continuous_agg_pkey PRIMARY KEY (mat_hypertable_id),
CONSTRAINT continuous_agg_partial_view_schema_partial_view_name_key UNIQUE (partial_view_schema, partial_view_name),
CONSTRAINT continuous_agg_user_view_schema_user_view_name_key UNIQUE (user_view_schema, user_view_name),
CONSTRAINT continuous_agg_mat_hypertable_id_fkey
FOREIGN KEY (mat_hypertable_id) REFERENCES _timescaledb_catalog.hypertable (id) ON DELETE CASCADE,
CONSTRAINT continuous_agg_raw_hypertable_id_fkey
FOREIGN KEY (raw_hypertable_id) REFERENCES _timescaledb_catalog.hypertable (id) ON DELETE CASCADE,
CONSTRAINT continuous_agg_parent_mat_hypertable_id_fkey
FOREIGN KEY (parent_mat_hypertable_id)
REFERENCES _timescaledb_catalog.continuous_agg (mat_hypertable_id) ON DELETE CASCADE
);

INSERT INTO _timescaledb_catalog.continuous_agg
SELECT * FROM _timescaledb_catalog._tmp_continuous_agg;
DROP TABLE _timescaledb_catalog._tmp_continuous_agg;

CREATE INDEX continuous_agg_raw_hypertable_id_idx ON _timescaledb_catalog.continuous_agg (raw_hypertable_id);

SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.continuous_agg', '');

GRANT SELECT ON TABLE _timescaledb_catalog.continuous_agg TO PUBLIC;

ALTER TABLE _timescaledb_catalog.continuous_aggs_materialization_invalidation_log
ADD CONSTRAINT continuous_aggs_materialization_invalid_materialization_id_fkey
FOREIGN KEY (materialization_id)
REFERENCES _timescaledb_catalog.continuous_agg(mat_hypertable_id) ON DELETE CASCADE;

ALTER TABLE _timescaledb_catalog.continuous_aggs_watermark
ADD CONSTRAINT continuous_aggs_watermark_mat_hypertable_id_fkey
FOREIGN KEY (mat_hypertable_id)
REFERENCES _timescaledb_catalog.continuous_agg (mat_hypertable_id) ON DELETE CASCADE;

ANALYZE _timescaledb_catalog.continuous_agg;

--
-- END Rebuild the catalog table `_timescaledb_catalog.continuous_agg`
--
136 changes: 136 additions & 0 deletions sql/updates/reverse-dev.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,141 @@
DROP FUNCTION IF EXISTS _timescaledb_functions.remove_dropped_chunk_metadata(INTEGER);


--
-- Rebuild the catalog table `_timescaledb_catalog.continuous_agg`
--
DROP VIEW IF EXISTS timescaledb_experimental.policies;
DROP VIEW IF EXISTS timescaledb_information.hypertables;
DROP VIEW IF EXISTS timescaledb_information.continuous_aggregates;

DROP PROCEDURE IF EXISTS @extschema@.cagg_migrate (REGCLASS, BOOLEAN, BOOLEAN);
DROP FUNCTION IF EXISTS _timescaledb_internal.cagg_migrate_pre_validation (TEXT, TEXT, TEXT);
DROP FUNCTION IF EXISTS _timescaledb_functions.cagg_migrate_pre_validation (TEXT, TEXT, TEXT);
DROP PROCEDURE IF EXISTS _timescaledb_internal.cagg_migrate_create_plan (_timescaledb_catalog.continuous_agg, TEXT, BOOLEAN, BOOLEAN);
DROP PROCEDURE IF EXISTS _timescaledb_functions.cagg_migrate_create_plan (_timescaledb_catalog.continuous_agg, TEXT, BOOLEAN, BOOLEAN);

DROP FUNCTION IF EXISTS _timescaledb_functions.cagg_migrate_plan_exists (INTEGER);
DROP PROCEDURE IF EXISTS _timescaledb_internal.cagg_migrate_execute_plan (_timescaledb_catalog.continuous_agg);
DROP PROCEDURE IF EXISTS _timescaledb_functions.cagg_migrate_execute_plan (_timescaledb_catalog.continuous_agg);
DROP PROCEDURE IF EXISTS _timescaledb_internal.cagg_migrate_execute_create_new_cagg (_timescaledb_catalog.continuous_agg, _timescaledb_catalog.continuous_agg_migrate_plan_step);
DROP PROCEDURE IF EXISTS _timescaledb_functions.cagg_migrate_execute_create_new_cagg (_timescaledb_catalog.continuous_agg, _timescaledb_catalog.continuous_agg_migrate_plan_step);
DROP PROCEDURE IF EXISTS _timescaledb_internal.cagg_migrate_execute_disable_policies (_timescaledb_catalog.continuous_agg, _timescaledb_catalog.continuous_agg_migrate_plan_step);
DROP PROCEDURE IF EXISTS _timescaledb_functions.cagg_migrate_execute_disable_policies (_timescaledb_catalog.continuous_agg, _timescaledb_catalog.continuous_agg_migrate_plan_step);
DROP PROCEDURE IF EXISTS _timescaledb_internal.cagg_migrate_execute_enable_policies (_timescaledb_catalog.continuous_agg, _timescaledb_catalog.continuous_agg_migrate_plan_step);
DROP PROCEDURE IF EXISTS _timescaledb_functions.cagg_migrate_execute_enable_policies (_timescaledb_catalog.continuous_agg, _timescaledb_catalog.continuous_agg_migrate_plan_step);
DROP PROCEDURE IF EXISTS _timescaledb_internal.cagg_migrate_execute_copy_policies (_timescaledb_catalog.continuous_agg, _timescaledb_catalog.continuous_agg_migrate_plan_step);
DROP PROCEDURE IF EXISTS _timescaledb_functions.cagg_migrate_execute_copy_policies (_timescaledb_catalog.continuous_agg, _timescaledb_catalog.continuous_agg_migrate_plan_step);
DROP PROCEDURE IF EXISTS _timescaledb_internal.cagg_migrate_execute_refresh_new_cagg (_timescaledb_catalog.continuous_agg, _timescaledb_catalog.continuous_agg_migrate_plan_step);
DROP PROCEDURE IF EXISTS _timescaledb_functions.cagg_migrate_execute_refresh_new_cagg (_timescaledb_catalog.continuous_agg, _timescaledb_catalog.continuous_agg_migrate_plan_step);
DROP PROCEDURE IF EXISTS _timescaledb_internal.cagg_migrate_execute_copy_data (_timescaledb_catalog.continuous_agg, _timescaledb_catalog.continuous_agg_migrate_plan_step);
DROP PROCEDURE IF EXISTS _timescaledb_functions.cagg_migrate_execute_copy_data (_timescaledb_catalog.continuous_agg, _timescaledb_catalog.continuous_agg_migrate_plan_step);
DROP PROCEDURE IF EXISTS _timescaledb_internal.cagg_migrate_execute_override_cagg (_timescaledb_catalog.continuous_agg, _timescaledb_catalog.continuous_agg_migrate_plan_step);
DROP PROCEDURE IF EXISTS _timescaledb_functions.cagg_migrate_execute_override_cagg (_timescaledb_catalog.continuous_agg, _timescaledb_catalog.continuous_agg_migrate_plan_step);
DROP PROCEDURE IF EXISTS _timescaledb_internal.cagg_migrate_execute_drop_old_cagg (_timescaledb_catalog.continuous_agg, _timescaledb_catalog.continuous_agg_migrate_plan_step);
DROP PROCEDURE IF EXISTS _timescaledb_functions.cagg_migrate_execute_drop_old_cagg (_timescaledb_catalog.continuous_agg, _timescaledb_catalog.continuous_agg_migrate_plan_step);

ALTER TABLE _timescaledb_catalog.continuous_aggs_materialization_invalidation_log
DROP CONSTRAINT continuous_aggs_materialization_invalid_materialization_id_fkey;

ALTER TABLE _timescaledb_catalog.continuous_aggs_watermark
DROP CONSTRAINT continuous_aggs_watermark_mat_hypertable_id_fkey;

ALTER EXTENSION timescaledb
DROP TABLE _timescaledb_catalog.continuous_agg;

CREATE TABLE _timescaledb_catalog._tmp_continuous_agg AS
SELECT
mat_hypertable_id,
raw_hypertable_id,
parent_mat_hypertable_id,
user_view_schema,
user_view_name,
partial_view_schema,
partial_view_name,
-1::bigint as bucket_width, -- -1 means variable width. Will be modified soon if not variable.
direct_view_schema,
direct_view_name,
materialized_only,
finalized
FROM
_timescaledb_catalog.continuous_agg
ORDER BY
mat_hypertable_id;

-- Migrate CAggs with fixed bucket on interval back
WITH fixed_buckets AS (
SELECT * FROM _timescaledb_catalog.continuous_aggs_bucket_function WHERE bucket_fixed_width = true AND bucket_func::text LIKE '%time_bucket(interval%'
)
UPDATE _timescaledb_catalog._tmp_continuous_agg cagg
SET bucket_width = _timescaledb_functions.interval_to_usec(fb.bucket_width::interval)
FROM fixed_buckets fb
WHERE cagg.mat_hypertable_id = fb.mat_hypertable_id;

-- Migrate CAggs with fixed bucket on integer back
WITH fixed_buckets AS (
SELECT * FROM _timescaledb_catalog.continuous_aggs_bucket_function WHERE bucket_fixed_width = true AND bucket_func::text NOT LIKE '%time_bucket(interval%'
)
UPDATE _timescaledb_catalog._tmp_continuous_agg cagg
SET bucket_width = fb.bucket_width::bigint
FROM fixed_buckets fb
WHERE cagg.mat_hypertable_id = fb.mat_hypertable_id;

DELETE FROM _timescaledb_catalog.continuous_aggs_bucket_function WHERE bucket_fixed_width = true;

DROP TABLE _timescaledb_catalog.continuous_agg;

CREATE TABLE _timescaledb_catalog.continuous_agg (
mat_hypertable_id integer NOT NULL,
raw_hypertable_id integer NOT NULL,
parent_mat_hypertable_id integer,
user_view_schema name NOT NULL,
user_view_name name NOT NULL,
partial_view_schema name NOT NULL,
partial_view_name name NOT NULL,
bucket_width bigint NOT NULL,
direct_view_schema name NOT NULL,
direct_view_name name NOT NULL,
materialized_only bool NOT NULL DEFAULT FALSE,
finalized bool NOT NULL DEFAULT TRUE,
-- table constraints
CONSTRAINT continuous_agg_pkey PRIMARY KEY (mat_hypertable_id),
CONSTRAINT continuous_agg_partial_view_schema_partial_view_name_key UNIQUE (partial_view_schema, partial_view_name),
CONSTRAINT continuous_agg_user_view_schema_user_view_name_key UNIQUE (user_view_schema, user_view_name),
CONSTRAINT continuous_agg_mat_hypertable_id_fkey
FOREIGN KEY (mat_hypertable_id) REFERENCES _timescaledb_catalog.hypertable (id) ON DELETE CASCADE,
CONSTRAINT continuous_agg_raw_hypertable_id_fkey
FOREIGN KEY (raw_hypertable_id) REFERENCES _timescaledb_catalog.hypertable (id) ON DELETE CASCADE,
CONSTRAINT continuous_agg_parent_mat_hypertable_id_fkey
FOREIGN KEY (parent_mat_hypertable_id)
REFERENCES _timescaledb_catalog.continuous_agg (mat_hypertable_id) ON DELETE CASCADE
);

INSERT INTO _timescaledb_catalog.continuous_agg
SELECT * FROM _timescaledb_catalog._tmp_continuous_agg;
DROP TABLE _timescaledb_catalog._tmp_continuous_agg;

CREATE INDEX continuous_agg_raw_hypertable_id_idx ON _timescaledb_catalog.continuous_agg (raw_hypertable_id);

SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.continuous_agg', '');

GRANT SELECT ON TABLE _timescaledb_catalog.continuous_agg TO PUBLIC;

ALTER TABLE _timescaledb_catalog.continuous_aggs_materialization_invalidation_log
ADD CONSTRAINT continuous_aggs_materialization_invalid_materialization_id_fkey
FOREIGN KEY (materialization_id)
REFERENCES _timescaledb_catalog.continuous_agg(mat_hypertable_id) ON DELETE CASCADE;

ALTER TABLE _timescaledb_catalog.continuous_aggs_watermark
ADD CONSTRAINT continuous_aggs_watermark_mat_hypertable_id_fkey
FOREIGN KEY (mat_hypertable_id)
REFERENCES _timescaledb_catalog.continuous_agg (mat_hypertable_id) ON DELETE CASCADE;

ANALYZE _timescaledb_catalog.continuous_agg;

--
-- END Rebuild the catalog table `_timescaledb_catalog.continuous_agg`
--

--
-- Rebuild the catalog table `_timescaledb_catalog.continuous_aggs_bucket_function`
--
Expand Down
7 changes: 6 additions & 1 deletion src/func_cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -563,10 +563,15 @@ initialize_func_info()
ObjectIdGetDatum(namespaceoid));

if (!HeapTupleIsValid(tuple))
elog(ERROR,
{
/* The function cache could be accessed during an extension upgrade. Not all expected
* functions have to exist at this point. */
elog(ts_extension_is_loaded() ? ERROR : NOTICE,
"cache lookup failed for function \"%s\" with %d args",
finfo->funcname,
finfo->nargs);
continue;
}

funcid = proc_get_oid(tuple);

Expand Down
13 changes: 0 additions & 13 deletions src/ts_catalog/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -793,7 +793,6 @@ typedef enum Anum_continuous_agg
Anum_continuous_agg_user_view_name,
Anum_continuous_agg_partial_view_schema,
Anum_continuous_agg_partial_view_name,
Anum_continuous_agg_bucket_width,
Anum_continuous_agg_direct_view_schema,
Anum_continuous_agg_direct_view_name,
Anum_continuous_agg_materialize_only,
Expand All @@ -812,18 +811,6 @@ typedef struct FormData_continuous_agg
NameData user_view_name;
NameData partial_view_schema;
NameData partial_view_name;
/*
* bucket_width is BUCKET_WIDTH_VARIABLE (see continuous_agg.h) for buckets
* with variable size - monthly buckets, buckets with timezone, etc. For such
* buckets the information about the bucketing function is stored in
* _timescaledb_catalog.continuous_aggs_bucket_function.
*
* When possible, don't access bucket_width directly. Use corresponding
* procedures instead, such as:
* - ts_continuous_agg_bucket_width_variable
* - ts_continuous_agg_bucket_width
*/
int64 bucket_width;
NameData direct_view_schema;
NameData direct_view_name;
bool materialized_only;
Expand Down
Loading

0 comments on commit 8dcb6ee

Please sign in to comment.