Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix: make add_continuous_aggregate_policy() work with variable-sized buckets #4040

Merged
merged 1 commit into from Feb 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
34 changes: 33 additions & 1 deletion tsl/src/bgw_policy/continuous_aggregate_api.c
Expand Up @@ -445,7 +445,39 @@ validate_window_size(const ContinuousAgg *cagg, const CaggPolicyConfig *config)
else
end_offset = interval_to_int64(config->offset_end.value, config->offset_end.type);

bucket_width = ts_continuous_agg_bucket_width(cagg);
if (ts_continuous_agg_bucket_width_variable(cagg))
{
/*
* There are several cases of variable-sized buckets:
* 1. Monthly buckets
* 2. Buckets with timezones
* 3. Cases 1 and 2 at the same time
*
* For months we simply take 31 days as the worst case scenario and
* multiply this number by the number of months in the bucket. This
* reduces the task to days/hours/minutes scenario.
*
* Days/hours/minutes case is handled the same way as for fixed-sized
* buckets. The refresh window at least two buckets in size is adequate
* for such corner cases as DST.
*/

/* bucket_function should always be specified for variable-sized buckets */
Assert(cagg->bucket_function != NULL);
/* ... and bucket_function->bucket_width too */
Assert(cagg->bucket_function->bucket_width != NULL);

/* Make a temporary copy of bucket_width */
Interval interval = *cagg->bucket_function->bucket_width;
interval.day += 31 * interval.month;
interval.month = 0;
bucket_width = ts_interval_value_to_internal(IntervalPGetDatum(&interval), INTERVALOID);
}
else
{
bucket_width = ts_continuous_agg_bucket_width(cagg);
}

if (ts_time_saturating_add(end_offset, bucket_width * 2, INT8OID) > start_offset)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
Expand Down
63 changes: 63 additions & 0 deletions tsl/test/expected/cagg_monthly.out
Expand Up @@ -1250,3 +1250,66 @@ ORDER by month, city;
Moscow | 2021-06-01 | 22 | 34
(1 row)

-- Make sure add_continuous_aggregate_policy() works
CREATE TABLE conditions_policy(
day DATE NOT NULL,
city text NOT NULL,
temperature INT NOT NULL);
SELECT create_hypertable(
'conditions_policy', 'day',
chunk_time_interval => INTERVAL '1 day'
);
create_hypertable
---------------------------------
(21,public,conditions_policy,t)
(1 row)

INSERT INTO conditions_policy (day, city, temperature) VALUES
('2021-06-14', 'Moscow', 26),
('2021-06-15', 'Moscow', 22),
('2021-06-16', 'Moscow', 24),
('2021-06-17', 'Moscow', 24),
('2021-06-18', 'Moscow', 27),
('2021-06-19', 'Moscow', 28),
('2021-06-20', 'Moscow', 30),
('2021-06-21', 'Moscow', 31),
('2021-06-22', 'Moscow', 34),
('2021-06-23', 'Moscow', 34),
('2021-06-24', 'Moscow', 34),
('2021-06-25', 'Moscow', 32),
('2021-06-26', 'Moscow', 32),
('2021-06-27', 'Moscow', 31);
CREATE MATERIALIZED VIEW conditions_summary_policy
WITH (timescaledb.continuous, timescaledb.materialized_only=true) AS
SELECT city,
timescaledb_experimental.time_bucket_ng('1 month', day) AS bucket,
MIN(temperature),
MAX(temperature)
FROM conditions_policy
GROUP BY city, bucket;
NOTICE: refreshing continuous aggregate "conditions_summary_policy"
SELECT * FROM conditions_summary_policy;
city | bucket | min | max
--------+------------+-----+-----
Moscow | 06-01-2021 | 22 | 34
(1 row)

\set ON_ERROR_STOP 0
-- Check for "policy refresh window too small" error
SELECT add_continuous_aggregate_policy('conditions_summary_policy',
-- Historically, 1 month is just a synonym to 30 days here.
-- See interval_to_int64() and interval_to_int128().
start_offset => INTERVAL '2 months',
end_offset => INTERVAL '1 day',
schedule_interval => INTERVAL '1 hour');
ERROR: policy refresh window too small
\set ON_ERROR_STOP 1
SELECT add_continuous_aggregate_policy('conditions_summary_policy',
start_offset => INTERVAL '65 days',
end_offset => INTERVAL '1 day',
schedule_interval => INTERVAL '1 hour');
add_continuous_aggregate_policy
---------------------------------
1000
(1 row)

68 changes: 68 additions & 0 deletions tsl/test/expected/cagg_with_timezone.out
Expand Up @@ -838,3 +838,71 @@ NOTICE: drop cascades to 3 other objects
DROP DATABASE :DATA_NODE_1;
DROP DATABASE :DATA_NODE_2;
DROP DATABASE :DATA_NODE_3;
-- Make sure add_continuous_aggregate_policy() works
CREATE TABLE conditions_policy(
day TIMESTAMPTZ NOT NULL,
city text NOT NULL,
temperature INT NOT NULL);
SELECT create_hypertable(
'conditions_policy', 'day',
chunk_time_interval => INTERVAL '1 day'
);
create_hypertable
---------------------------------
(14,public,conditions_policy,t)
(1 row)

INSERT INTO conditions_policy (day, city, temperature) VALUES
('2021-06-14 00:00:00 MSK', 'Moscow', 26),
('2021-06-14 10:00:00 MSK', 'Moscow', 22),
('2021-06-14 20:00:00 MSK', 'Moscow', 24),
('2021-06-15 00:00:00 MSK', 'Moscow', 24),
('2021-06-15 10:00:00 MSK', 'Moscow', 27),
('2021-06-15 20:00:00 MSK', 'Moscow', 28),
('2021-06-16 00:00:00 MSK', 'Moscow', 30),
('2021-06-16 10:00:00 MSK', 'Moscow', 31),
('2021-06-16 20:00:00 MSK', 'Moscow', 34),
('2021-06-17 00:00:00 MSK', 'Moscow', 34),
('2021-06-17 10:00:00 MSK', 'Moscow', 34),
('2021-06-17 20:00:00 MSK', 'Moscow', 32),
('2021-06-18 00:00:00 MSK', 'Moscow', 32),
('2021-06-18 10:00:00 MSK', 'Moscow', 31),
('2021-06-18 20:00:00 MSK', 'Moscow', 26);
CREATE MATERIALIZED VIEW conditions_summary_policy
WITH (timescaledb.continuous, timescaledb.materialized_only=true) AS
SELECT city,
timescaledb_experimental.time_bucket_ng('1 day', day, 'Europe/Moscow') AS bucket,
MIN(temperature),
MAX(temperature)
FROM conditions_policy
GROUP BY city, bucket;
NOTICE: refreshing continuous aggregate "conditions_summary_policy"
SELECT city, to_char(bucket at time zone 'MSK', 'YYYY-MM-DD HH24:MI:SS') as month, min, max
FROM conditions_summary_policy
ORDER by month, city;
city | month | min | max
--------+---------------------+-----+-----
Moscow | 2021-06-14 00:00:00 | 22 | 26
Moscow | 2021-06-15 00:00:00 | 24 | 28
Moscow | 2021-06-16 00:00:00 | 30 | 34
Moscow | 2021-06-17 00:00:00 | 32 | 34
Moscow | 2021-06-18 00:00:00 | 26 | 32
(5 rows)

\set ON_ERROR_STOP 0
-- Check for "policy refresh window too small" error
SELECT add_continuous_aggregate_policy('conditions_summary_policy',
start_offset => INTERVAL '2 days 23 hours',
end_offset => INTERVAL '1 day',
schedule_interval => INTERVAL '1 hour');
ERROR: policy refresh window too small
\set ON_ERROR_STOP 1
SELECT add_continuous_aggregate_policy('conditions_summary_policy',
start_offset => INTERVAL '3 days',
end_offset => INTERVAL '1 day',
schedule_interval => INTERVAL '1 hour');
add_continuous_aggregate_policy
---------------------------------
1000
(1 row)

54 changes: 54 additions & 0 deletions tsl/test/sql/cagg_monthly.sql
Expand Up @@ -555,3 +555,57 @@ CALL refresh_continuous_aggregate('conditions_summary_empty', '2021-06-01', '202
SELECT city, to_char(bucket, 'YYYY-MM-DD') AS month, min, max
FROM conditions_summary_empty
ORDER by month, city;

-- Make sure add_continuous_aggregate_policy() works

CREATE TABLE conditions_policy(
day DATE NOT NULL,
city text NOT NULL,
temperature INT NOT NULL);

SELECT create_hypertable(
'conditions_policy', 'day',
chunk_time_interval => INTERVAL '1 day'
);

INSERT INTO conditions_policy (day, city, temperature) VALUES
('2021-06-14', 'Moscow', 26),
('2021-06-15', 'Moscow', 22),
('2021-06-16', 'Moscow', 24),
('2021-06-17', 'Moscow', 24),
('2021-06-18', 'Moscow', 27),
('2021-06-19', 'Moscow', 28),
('2021-06-20', 'Moscow', 30),
('2021-06-21', 'Moscow', 31),
('2021-06-22', 'Moscow', 34),
('2021-06-23', 'Moscow', 34),
('2021-06-24', 'Moscow', 34),
('2021-06-25', 'Moscow', 32),
('2021-06-26', 'Moscow', 32),
('2021-06-27', 'Moscow', 31);

CREATE MATERIALIZED VIEW conditions_summary_policy
WITH (timescaledb.continuous, timescaledb.materialized_only=true) AS
SELECT city,
timescaledb_experimental.time_bucket_ng('1 month', day) AS bucket,
MIN(temperature),
MAX(temperature)
FROM conditions_policy
GROUP BY city, bucket;

SELECT * FROM conditions_summary_policy;

\set ON_ERROR_STOP 0
-- Check for "policy refresh window too small" error
SELECT add_continuous_aggregate_policy('conditions_summary_policy',
-- Historically, 1 month is just a synonym to 30 days here.
-- See interval_to_int64() and interval_to_int128().
start_offset => INTERVAL '2 months',
end_offset => INTERVAL '1 day',
schedule_interval => INTERVAL '1 hour');
\set ON_ERROR_STOP 1

SELECT add_continuous_aggregate_policy('conditions_summary_policy',
start_offset => INTERVAL '65 days',
end_offset => INTERVAL '1 day',
schedule_interval => INTERVAL '1 hour');
55 changes: 55 additions & 0 deletions tsl/test/sql/cagg_with_timezone.sql
Expand Up @@ -545,3 +545,58 @@ DROP TABLE conditions_dist CASCADE;
DROP DATABASE :DATA_NODE_1;
DROP DATABASE :DATA_NODE_2;
DROP DATABASE :DATA_NODE_3;

-- Make sure add_continuous_aggregate_policy() works

CREATE TABLE conditions_policy(
day TIMESTAMPTZ NOT NULL,
city text NOT NULL,
temperature INT NOT NULL);

SELECT create_hypertable(
'conditions_policy', 'day',
chunk_time_interval => INTERVAL '1 day'
);

INSERT INTO conditions_policy (day, city, temperature) VALUES
('2021-06-14 00:00:00 MSK', 'Moscow', 26),
('2021-06-14 10:00:00 MSK', 'Moscow', 22),
('2021-06-14 20:00:00 MSK', 'Moscow', 24),
('2021-06-15 00:00:00 MSK', 'Moscow', 24),
('2021-06-15 10:00:00 MSK', 'Moscow', 27),
('2021-06-15 20:00:00 MSK', 'Moscow', 28),
('2021-06-16 00:00:00 MSK', 'Moscow', 30),
('2021-06-16 10:00:00 MSK', 'Moscow', 31),
('2021-06-16 20:00:00 MSK', 'Moscow', 34),
('2021-06-17 00:00:00 MSK', 'Moscow', 34),
('2021-06-17 10:00:00 MSK', 'Moscow', 34),
('2021-06-17 20:00:00 MSK', 'Moscow', 32),
('2021-06-18 00:00:00 MSK', 'Moscow', 32),
('2021-06-18 10:00:00 MSK', 'Moscow', 31),
('2021-06-18 20:00:00 MSK', 'Moscow', 26);

CREATE MATERIALIZED VIEW conditions_summary_policy
WITH (timescaledb.continuous, timescaledb.materialized_only=true) AS
SELECT city,
timescaledb_experimental.time_bucket_ng('1 day', day, 'Europe/Moscow') AS bucket,
MIN(temperature),
MAX(temperature)
FROM conditions_policy
GROUP BY city, bucket;

SELECT city, to_char(bucket at time zone 'MSK', 'YYYY-MM-DD HH24:MI:SS') as month, min, max
FROM conditions_summary_policy
ORDER by month, city;

\set ON_ERROR_STOP 0
-- Check for "policy refresh window too small" error
SELECT add_continuous_aggregate_policy('conditions_summary_policy',
start_offset => INTERVAL '2 days 23 hours',
end_offset => INTERVAL '1 day',
schedule_interval => INTERVAL '1 hour');
\set ON_ERROR_STOP 1

SELECT add_continuous_aggregate_policy('conditions_summary_policy',
start_offset => INTERVAL '3 days',
end_offset => INTERVAL '1 day',
schedule_interval => INTERVAL '1 hour');