Skip to content

Commit

Permalink
Add invalidations for incomplete aggregates
Browse files Browse the repository at this point in the history
As part of the 2.0 continous aggregate changes, we are removing the
continuous_aggs_completed_threshold table.  However, this may result
in currently running aggregates being considered complete even if
their completed threshold hadn't reached the invalidation threshold.
This change fixes this by adding an entry to the invalidation log
for any such aggregates.

Fixes #2314
  • Loading branch information
Brian Rowe committed Sep 18, 2020
1 parent 5179447 commit bb63ea0
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 1 deletion.
2 changes: 1 addition & 1 deletion scripts/test_updates.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ do
docker rmi -f ${UPDATE_TO_IMAGE}:${UPDATE_TO_TAG}
;;
d)
echo "Keeping temporary directory"
echo "Keeping temporary directory ${TEST_TMPDIR}"
KEEP_TEMP_DIRS=true
TEST_UPDATE_FROM_TAGS_EXTRA_ARGS="-d"
;;
Expand Down
5 changes: 5 additions & 0 deletions sql/updates/latest-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,11 @@ SELECT pg_catalog.pg_extension_config_dump('_timescaledb_config.bgw_job', 'WHERE
GRANT SELECT ON _timescaledb_config.bgw_job TO PUBLIC;
GRANT SELECT ON _timescaledb_config.bgw_job_id_seq TO PUBLIC;

-- set up future invalidation for current caggs
INSERT INTO _timescaledb_catalog.continuous_aggs_materialization_invalidation_log
SELECT materialization_id, BIGINT '-9223372036854775808', watermark, 9223372036854775807
FROM _timescaledb_catalog.continuous_aggs_completed_threshold;

-- drop completed_threshold table, which is no longer used
ALTER EXTENSION timescaledb DROP TABLE _timescaledb_catalog.continuous_aggs_completed_threshold;
DROP TABLE IF EXISTS _timescaledb_catalog.continuous_aggs_completed_threshold;
Expand Down
4 changes: 4 additions & 0 deletions test/sql/updates/post.continuous_aggs.v2.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,7 @@ SELECT * FROM cagg.realtime_mat ORDER BY bucket, location;
SELECT view_name, refresh_interval, materialized_only, materialization_hypertable FROM timescaledb_information.continuous_aggregates ORDER BY view_name::text;

SELECT maxtemp FROM mat_ignoreinval ORDER BY 1;

SELECT count(*) FROM mat_inval;
REFRESH MATERIALIZED VIEW mat_inval;
SELECT count(*) FROM mat_inval;
48 changes: 48 additions & 0 deletions test/sql/updates/setup.continuous_aggs.v2.sql
Original file line number Diff line number Diff line change
Expand Up @@ -263,3 +263,51 @@ BEGIN
END $$;

REFRESH MATERIALIZED VIEW mat_ignoreinval;

-- test proper invalidations after update
CREATE TABLE inval_test (time TIMESTAMPTZ, location TEXT, temperature DOUBLE PRECISION);
SELECT create_hypertable('inval_test', 'time', chunk_time_interval => INTERVAL '1 hour');

INSERT INTO inval_test
SELECT tm.tval, loc.lval, temp
FROM (SELECT generate_series(now() - '4 hours'::interval, now(), '30 seconds') AS tval) tm
CROSS JOIN (select 'loc_' || generate_series(1,3,1) AS lval) loc,
generate_series(50.0, 70.0, 1443) temp;

DO LANGUAGE PLPGSQL $$
DECLARE
ts_version TEXT;
BEGIN
SELECT extversion INTO ts_version FROM pg_extension WHERE extname = 'timescaledb';

IF ts_version < '2.0.0' THEN
CREATE VIEW mat_inval
WITH ( timescaledb.continuous, timescaledb.materialized_only=true,
timescaledb.refresh_lag='-2 hour',
timescaledb.ignore_invalidation_older_than = '24 hour',
timescaledb.refresh_interval='20 minutes' )
AS
SELECT time_bucket('10 minute', time) as bucket, location, min(temperature) as min_temp,
max(temperature) as max_temp, avg(temperature) as avg_temp
FROM inval_test
GROUP BY bucket, location;
ELSE
CREATE MATERIALIZED VIEW mat_inval
WITH ( timescaledb.continuous, timescaledb.materialized_only=true )
AS
SELECT time_bucket('10 minute', time) as bucket, location, min(temperature) as min_temp,
max(temperature) as max_temp, avg(temperature) as avg_temp
FROM inval_test
GROUP BY bucket, location WITH NO DATA;

PERFORM add_continuous_aggregate_policy('mat_inval', '1 day'::interval, '-2 hours'::interval, '20 minutes');
END IF;
END $$;

REFRESH MATERIALIZED VIEW mat_inval;

INSERT INTO inval_test
SELECT tm.tval, loc.lval, temp
FROM (SELECT generate_series(now() + '30 seconds'::interval, now() + '4 hours'::interval, '30 seconds') AS tval) tm
CROSS JOIN (select 'loc_' || generate_series(1,3,1) AS lval) loc,
generate_series(70.0, 90.0, 1440) temp;

0 comments on commit bb63ea0

Please sign in to comment.