Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename continuous aggregate policy API (+restructure) #2368

Merged
merged 2 commits into from Sep 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions sql/CMakeLists.txt
Expand Up @@ -51,6 +51,9 @@ set(SOURCE_FILES
restoring.sql
timescaledb_fdw.sql
remote_txn.sql
job_api.sql
policy_api.sql
policy_internal.sql
)

# These files should be pre-pended to update scripts so that they are
Expand Down
93 changes: 0 additions & 93 deletions sql/bgw_scheduler.sql
Expand Up @@ -20,96 +20,3 @@ LANGUAGE C VOLATILE;
INSERT INTO _timescaledb_config.bgw_job (id, application_name, schedule_interval, max_runtime, max_retries, retry_period, proc_schema, proc_name, owner, scheduled) VALUES
(1, 'Telemetry Reporter [1]', INTERVAL '24h', INTERVAL '100s', -1, INTERVAL '1h', '_timescaledb_internal', 'policy_telemetry', CURRENT_ROLE, true)
ON CONFLICT (id) DO NOTHING;

CREATE OR REPLACE FUNCTION add_job(
proc REGPROC,
schedule_interval INTERVAL,
config JSONB DEFAULT NULL,
initial_start TIMESTAMPTZ DEFAULT NULL,
scheduled BOOL DEFAULT true
) RETURNS INTEGER AS '@MODULE_PATHNAME@', 'ts_job_add' LANGUAGE C VOLATILE;

CREATE OR REPLACE FUNCTION delete_job(job_id INTEGER) RETURNS VOID AS '@MODULE_PATHNAME@', 'ts_job_delete' LANGUAGE C VOLATILE STRICT;
CREATE OR REPLACE PROCEDURE run_job(job_id INTEGER) AS '@MODULE_PATHNAME@', 'ts_job_run' LANGUAGE C;

-- Add a retention policy to a hypertable or continuous aggregate.
-- The retention_window (typically an INTERVAL) determines the
-- window beyond which data is dropped at the time
-- of execution of the policy (e.g., '1 week'). Note that the retention
-- window will always align with chunk boundaries, thus the window
-- might be larger than the given one, but never smaller. In other
-- words, some data beyond the retention window
-- might be kept, but data within the window will never be deleted.
CREATE OR REPLACE FUNCTION add_retention_policy(
hypertable REGCLASS,
retention_window "any",
if_not_exists BOOL = false
)
RETURNS INTEGER AS '@MODULE_PATHNAME@', 'ts_policy_retention_add'
LANGUAGE C VOLATILE STRICT;

CREATE OR REPLACE FUNCTION remove_retention_policy(hypertable REGCLASS, if_exists BOOL = false) RETURNS VOID
AS '@MODULE_PATHNAME@', 'ts_policy_retention_remove'
LANGUAGE C VOLATILE STRICT;

CREATE OR REPLACE PROCEDURE _timescaledb_internal.policy_retention(job_id INTEGER, config JSONB)
AS '@MODULE_PATHNAME@', 'ts_policy_retention_proc'
LANGUAGE C;

/* reorder policy */
CREATE OR REPLACE FUNCTION add_reorder_policy(hypertable REGCLASS, index_name NAME, if_not_exists BOOL = false) RETURNS INTEGER
AS '@MODULE_PATHNAME@', 'ts_policy_reorder_add'
LANGUAGE C VOLATILE STRICT;

CREATE OR REPLACE FUNCTION remove_reorder_policy(hypertable REGCLASS, if_exists BOOL = false) RETURNS VOID
AS '@MODULE_PATHNAME@', 'ts_policy_reorder_remove'
LANGUAGE C VOLATILE STRICT;

CREATE OR REPLACE PROCEDURE _timescaledb_internal.policy_reorder(job_id INTEGER, config JSONB)
AS '@MODULE_PATHNAME@', 'ts_policy_reorder_proc'
LANGUAGE C;

/* compression policy */
CREATE OR REPLACE FUNCTION add_compression_policy(hypertable REGCLASS, older_than "any", if_not_exists BOOL = false)
RETURNS INTEGER
AS '@MODULE_PATHNAME@', 'ts_policy_compression_add'
LANGUAGE C VOLATILE STRICT;

CREATE OR REPLACE FUNCTION remove_compression_policy(hypertable REGCLASS, if_exists BOOL = false) RETURNS BOOL
AS '@MODULE_PATHNAME@', 'ts_policy_compression_remove'
LANGUAGE C VOLATILE STRICT;

CREATE OR REPLACE PROCEDURE _timescaledb_internal.policy_compression(job_id INTEGER, config JSONB)
AS '@MODULE_PATHNAME@', 'ts_policy_compression_proc'
LANGUAGE C;

/* continuous aggregates policy */
CREATE OR REPLACE FUNCTION add_refresh_continuous_aggregate_policy(continuous_aggregate REGCLASS, start_interval "any", end_interval "any" , schedule_interval INTERVAL, if_not_exists BOOL = false)
RETURNS INTEGER
AS '@MODULE_PATHNAME@', 'ts_policy_refresh_cagg_add'
LANGUAGE C VOLATILE ;

CREATE OR REPLACE FUNCTION remove_refresh_continuous_aggregate_policy(continuous_aggregate REGCLASS, if_not_exists BOOL = false)
RETURNS VOID
AS '@MODULE_PATHNAME@', 'ts_policy_refresh_cagg_remove'
LANGUAGE C VOLATILE STRICT;

CREATE OR REPLACE PROCEDURE _timescaledb_internal.policy_refresh_continuous_aggregate(job_id INTEGER, config JSONB)
AS '@MODULE_PATHNAME@', 'ts_policy_refresh_cagg_proc'
LANGUAGE C;

-- Returns the updated job schedule values
CREATE OR REPLACE FUNCTION alter_job(
job_id INTEGER,
schedule_interval INTERVAL = NULL,
max_runtime INTERVAL = NULL,
max_retries INTEGER = NULL,
retry_period INTERVAL = NULL,
scheduled BOOL = NULL,
config JSONB = NULL,
next_start TIMESTAMPTZ = NULL,
if_exists BOOL = FALSE
)
RETURNS TABLE (job_id INTEGER, schedule_interval INTERVAL, max_runtime INTERVAL, max_retries INTEGER, retry_period INTERVAL, scheduled BOOL, config JSONB, next_start TIMESTAMPTZ)
AS '@MODULE_PATHNAME@', 'ts_job_alter'
LANGUAGE C VOLATILE;
30 changes: 30 additions & 0 deletions sql/job_api.sql
@@ -0,0 +1,30 @@
-- This file and its contents are licensed under the Apache License 2.0.
-- Please see the included NOTICE for copyright information and
-- LICENSE-APACHE for a copy of the license.

CREATE OR REPLACE FUNCTION add_job(
proc REGPROC,
schedule_interval INTERVAL,
config JSONB DEFAULT NULL,
initial_start TIMESTAMPTZ DEFAULT NULL,
scheduled BOOL DEFAULT true
) RETURNS INTEGER AS '@MODULE_PATHNAME@', 'ts_job_add' LANGUAGE C VOLATILE;

CREATE OR REPLACE FUNCTION delete_job(job_id INTEGER) RETURNS VOID AS '@MODULE_PATHNAME@', 'ts_job_delete' LANGUAGE C VOLATILE STRICT;
CREATE OR REPLACE PROCEDURE run_job(job_id INTEGER) AS '@MODULE_PATHNAME@', 'ts_job_run' LANGUAGE C;

-- Returns the updated job schedule values
CREATE OR REPLACE FUNCTION alter_job(
job_id INTEGER,
schedule_interval INTERVAL = NULL,
max_runtime INTERVAL = NULL,
max_retries INTEGER = NULL,
retry_period INTERVAL = NULL,
scheduled BOOL = NULL,
config JSONB = NULL,
next_start TIMESTAMPTZ = NULL,
if_exists BOOL = FALSE
)
RETURNS TABLE (job_id INTEGER, schedule_interval INTERVAL, max_runtime INTERVAL, max_retries INTEGER, retry_period INTERVAL, scheduled BOOL, config JSONB, next_start TIMESTAMPTZ)
AS '@MODULE_PATHNAME@', 'ts_job_alter'
LANGUAGE C VOLATILE;
53 changes: 53 additions & 0 deletions sql/policy_api.sql
@@ -0,0 +1,53 @@
-- This file and its contents are licensed under the Apache License 2.0.
-- Please see the included NOTICE for copyright information and
-- LICENSE-APACHE for a copy of the license.

-- Add a retention policy to a hypertable or continuous aggregate.
-- The retention_window (typically an INTERVAL) determines the
-- window beyond which data is dropped at the time
-- of execution of the policy (e.g., '1 week'). Note that the retention
-- window will always align with chunk boundaries, thus the window
-- might be larger than the given one, but never smaller. In other
-- words, some data beyond the retention window
-- might be kept, but data within the window will never be deleted.
CREATE OR REPLACE FUNCTION add_retention_policy(
hypertable REGCLASS,
retention_window "any",
if_not_exists BOOL = false
)
RETURNS INTEGER AS '@MODULE_PATHNAME@', 'ts_policy_retention_add'
LANGUAGE C VOLATILE STRICT;

CREATE OR REPLACE FUNCTION remove_retention_policy(hypertable REGCLASS, if_exists BOOL = false) RETURNS VOID
AS '@MODULE_PATHNAME@', 'ts_policy_retention_remove'
LANGUAGE C VOLATILE STRICT;

/* reorder policy */
CREATE OR REPLACE FUNCTION add_reorder_policy(hypertable REGCLASS, index_name NAME, if_not_exists BOOL = false) RETURNS INTEGER
AS '@MODULE_PATHNAME@', 'ts_policy_reorder_add'
LANGUAGE C VOLATILE STRICT;

CREATE OR REPLACE FUNCTION remove_reorder_policy(hypertable REGCLASS, if_exists BOOL = false) RETURNS VOID
AS '@MODULE_PATHNAME@', 'ts_policy_reorder_remove'
LANGUAGE C VOLATILE STRICT;

/* compression policy */
CREATE OR REPLACE FUNCTION add_compression_policy(hypertable REGCLASS, older_than "any", if_not_exists BOOL = false)
RETURNS INTEGER
AS '@MODULE_PATHNAME@', 'ts_policy_compression_add'
LANGUAGE C VOLATILE STRICT;

CREATE OR REPLACE FUNCTION remove_compression_policy(hypertable REGCLASS, if_exists BOOL = false) RETURNS BOOL
AS '@MODULE_PATHNAME@', 'ts_policy_compression_remove'
LANGUAGE C VOLATILE STRICT;

/* continuous aggregates policy */
CREATE OR REPLACE FUNCTION add_continuous_aggregate_policy(continuous_aggregate REGCLASS, start_interval "any", end_interval "any", schedule_interval INTERVAL, if_not_exists BOOL = false)
RETURNS INTEGER
AS '@MODULE_PATHNAME@', 'ts_policy_refresh_cagg_add'
LANGUAGE C VOLATILE;

CREATE OR REPLACE FUNCTION remove_continuous_aggregate_policy(continuous_aggregate REGCLASS, if_not_exists BOOL = false)
RETURNS VOID
AS '@MODULE_PATHNAME@', 'ts_policy_refresh_cagg_remove'
LANGUAGE C VOLATILE STRICT;
19 changes: 19 additions & 0 deletions sql/policy_internal.sql
@@ -0,0 +1,19 @@
-- This file and its contents are licensed under the Apache License 2.0.
-- Please see the included NOTICE for copyright information and
-- LICENSE-APACHE for a copy of the license.

CREATE OR REPLACE PROCEDURE _timescaledb_internal.policy_retention(job_id INTEGER, config JSONB)
AS '@MODULE_PATHNAME@', 'ts_policy_retention_proc'
LANGUAGE C;

CREATE OR REPLACE PROCEDURE _timescaledb_internal.policy_reorder(job_id INTEGER, config JSONB)
AS '@MODULE_PATHNAME@', 'ts_policy_reorder_proc'
LANGUAGE C;

CREATE OR REPLACE PROCEDURE _timescaledb_internal.policy_compression(job_id INTEGER, config JSONB)
AS '@MODULE_PATHNAME@', 'ts_policy_compression_proc'
LANGUAGE C;

CREATE OR REPLACE PROCEDURE _timescaledb_internal.policy_refresh_continuous_aggregate(job_id INTEGER, config JSONB)
AS '@MODULE_PATHNAME@', 'ts_policy_refresh_cagg_proc'
LANGUAGE C;
8 changes: 4 additions & 4 deletions test/expected/extension.out
Expand Up @@ -17,13 +17,13 @@ WHERE oid IN (
AND classid = 'pg_catalog.pg_proc'::regclass)
AND pronamespace = 'public'::regnamespace
ORDER BY proname;
proname
--------------------------------------------
proname
------------------------------------
add_compression_policy
add_continuous_aggregate_policy
add_data_node
add_dimension
add_job
add_refresh_continuous_aggregate_policy
add_reorder_policy
add_retention_policy
allow_new_chunks
Expand Down Expand Up @@ -58,7 +58,7 @@ WHERE oid IN (
move_chunk
refresh_continuous_aggregate
remove_compression_policy
remove_refresh_continuous_aggregate_policy
remove_continuous_aggregate_policy
remove_reorder_policy
remove_retention_policy
reorder_chunk
Expand Down
2 changes: 1 addition & 1 deletion test/sql/updates/setup.continuous_aggs.sql
Expand Up @@ -119,7 +119,7 @@ BEGIN
FROM conditions_before
GROUP BY bucket, location
HAVING min(location) >= 'NYC' and avg(temperature) > 2 WITH NO DATA;
PERFORM add_refresh_continuous_aggregate_policy('mat_before', NULL, '-30 days'::interval, '336 h');
PERFORM add_continuous_aggregate_policy('mat_before', NULL, '-30 days'::interval, '336 h');

END IF;
END $$;
Expand Down
4 changes: 2 additions & 2 deletions test/sql/updates/setup.continuous_aggs.v2.sql
Expand Up @@ -120,7 +120,7 @@ BEGIN
FROM conditions_before
GROUP BY bucket, location
HAVING min(location) >= 'NYC' and avg(temperature) > 2 WITH NO DATA;
PERFORM add_refresh_continuous_aggregate_policy('mat_before', NULL, '-30 days'::interval, '336 h');
PERFORM add_continuous_aggregate_policy('mat_before', NULL, '-30 days'::interval, '336 h');
END IF;
END $$;

Expand Down Expand Up @@ -225,7 +225,7 @@ BEGIN
FROM conditions_before
GROUP BY bucket, location
HAVING min(location) >= 'NYC' and avg(temperature) > 2 WITH NO DATA;
PERFORM add_refresh_continuous_aggregate_policy('cagg.realtime_mat', NULL, '-30 days'::interval, '336 h');
PERFORM add_continuous_aggregate_policy('cagg.realtime_mat', NULL, '-30 days'::interval, '336 h');
END IF;
END $$;

Expand Down
8 changes: 4 additions & 4 deletions tsl/test/expected/compression_ddl.out
Expand Up @@ -509,10 +509,10 @@ CREATE MATERIALIZED VIEW test1_cont_view WITH (timescaledb.continuous, timescale
AS SELECT time_bucket('1 hour', "Time"), SUM(i)
FROM test1
GROUP BY 1 WITH NO DATA;
SELECT add_refresh_continuous_aggregate_policy('test1_cont_view', NULL, '1 hour'::interval, '1 day'::interval);
add_refresh_continuous_aggregate_policy
-----------------------------------------
1001
SELECT add_continuous_aggregate_policy('test1_cont_view', NULL, '1 hour'::interval, '1 day'::interval);
add_continuous_aggregate_policy
---------------------------------
1001
(1 row)

REFRESH MATERIALIZED VIEW test1_cont_view;
Expand Down
32 changes: 16 additions & 16 deletions tsl/test/expected/continuous_aggs.out
Expand Up @@ -49,10 +49,10 @@ select a, count(b)
from foo
group by time_bucket(1, a), a WITH NO DATA;
NOTICE: adding index _materialized_hypertable_2_a_time_partition_col_idx ON _timescaledb_internal._materialized_hypertable_2 USING BTREE(a, time_partition_col)
SELECT add_refresh_continuous_aggregate_policy('mat_m1', NULL, 2::integer, '12 h'::interval);
add_refresh_continuous_aggregate_policy
-----------------------------------------
1000
SELECT add_continuous_aggregate_policy('mat_m1', NULL, 2::integer, '12 h'::interval);
add_continuous_aggregate_policy
---------------------------------
1000
(1 row)

SELECT * FROM _timescaledb_config.bgw_job;
Expand Down Expand Up @@ -868,10 +868,10 @@ group by time_bucket('1day', timec), location, humidity, temperature WITH NO DAT
NOTICE: adding index _materialized_hypertable_20_grp_5_5_timec_idx ON _timescaledb_internal._materialized_hypertable_20 USING BTREE(grp_5_5, timec)
NOTICE: adding index _materialized_hypertable_20_grp_6_6_timec_idx ON _timescaledb_internal._materialized_hypertable_20 USING BTREE(grp_6_6, timec)
NOTICE: adding index _materialized_hypertable_20_grp_7_7_timec_idx ON _timescaledb_internal._materialized_hypertable_20 USING BTREE(grp_7_7, timec)
SELECT add_refresh_continuous_aggregate_policy('mat_with_test', NULL, '5 h'::interval, '12 h'::interval);
add_refresh_continuous_aggregate_policy
-----------------------------------------
1001
SELECT add_continuous_aggregate_policy('mat_with_test', NULL, '5 h'::interval, '12 h'::interval);
add_continuous_aggregate_policy
---------------------------------
1001
(1 row)

SELECT alter_job(id, schedule_interval => '1h') FROM _timescaledb_config.bgw_job;
Expand Down Expand Up @@ -974,10 +974,10 @@ as
select time_bucket(100, timec), min(location), sum(temperature),sum(humidity)
from conditions
group by time_bucket(100, timec) WITH NO DATA;
SELECT add_refresh_continuous_aggregate_policy('mat_with_test', NULL, 500::integer, '12 h'::interval);
add_refresh_continuous_aggregate_policy
-----------------------------------------
1002
SELECT add_continuous_aggregate_policy('mat_with_test', NULL, 500::integer, '12 h'::interval);
add_continuous_aggregate_policy
---------------------------------
1002
(1 row)

SELECT alter_job(id, schedule_interval => '2h') FROM _timescaledb_config.bgw_job;
Expand Down Expand Up @@ -1260,10 +1260,10 @@ select * from conditions_grpby_view order by 1, 2;
200 | 75
(4 rows)

CREATE MATERIALIZED VIEW conditions_grpby_view2 with (timescaledb.continuous, timescaledb.refresh_lag = '-200') as
CREATE MATERIALIZED VIEW conditions_grpby_view2 with (timescaledb.continuous, timescaledb.refresh_lag = '-200') as
select time_bucket(100, timec), sum(humidity)
from conditions
group by time_bucket(100, timec), location
group by time_bucket(100, timec), location
having avg(temperature) > 0
WITH NO DATA;
NOTICE: adding index _materialized_hypertable_31_grp_3_3_time_bucket_idx ON _timescaledb_internal._materialized_hypertable_31 USING BTREE(grp_3_3, time_bucket)
Expand Down Expand Up @@ -1345,7 +1345,7 @@ SELECT * FROM mat_test5;
--verify that watermark is limited by max value and not by
-- the current time (now value)--
SET timescaledb.current_timestamp_mock = '2018-05-11';
SELECT view_name, completed_threshold, invalidation_threshold
SELECT view_name, completed_threshold, invalidation_threshold
FROM timescaledb_information.continuous_aggregate_stats
where view_name::text like 'mat_test5';
view_name | completed_threshold | invalidation_threshold
Expand All @@ -1357,7 +1357,7 @@ REFRESH MATERIALIZED VIEW mat_test5;
LOG: new materialization range not found for public.conditions (time column time): not enough new data past completion threshold of Sat Mar 10 16:00:00 2001 PST as of Fri May 11 00:00:00 2018 PDT
LOG: materializing continuous aggregate public.mat_test5: nothing to invalidate, no new range
LOG: materializing continuous aggregate public.mat_test5: no new range to materialize or invalidations found, exiting early
SELECT view_name, completed_threshold, invalidation_threshold
SELECT view_name, completed_threshold, invalidation_threshold
FROM timescaledb_information.continuous_aggregate_stats
where view_name::text like 'mat_test5';
view_name | completed_threshold | invalidation_threshold
Expand Down