Description
What type of enhancement is this?
Performance
What subsystems and features will be improved?
Continuous aggregate
What does the enhancement do?
I have a hypertable for monitoring sensor data (device_sensor_id, created, value). A cagg aggregates the data into 3-hour buckets, with a null start_offset. A refresh policy runs every 30 minutes and processes up to 100 batches per invocation.
A small percentage of devices send data for the past, sometimes several months old. This invalidates the corresponding data in the cagg. Although only a very small number of devices do this, even a single device sending old data invalidates the entire cagg for months.
The cagg policy is simply unable to keep up: it continuously crunches through just the most recent data and never gets to refresh the older data that arrived. Each policy invocation takes 10-20 minutes and covers ~20 days of data. The next invocation frequently re-processes the same time range.
I suggest improving invalidation tracking for caggs so that invalidations can be tracked per device_sensor_id. This would probably reduce the load of refreshing the cagg by over 1000x.
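To make the expected saving concrete, here is a rough back-of-the-envelope model in Python. It is not how TimescaleDB works internally; it only assumes, as described above, that a late batch invalidates one contiguous time range and that a refresh must re-aggregate every (device_sensor_id, bucket) group in that range. The function names and the device count of 5000 are hypothetical, and overlapping ranges are not merged.

```python
from datetime import datetime, timedelta, timezone

BUCKET = timedelta(hours=3)  # same width as the cagg's time_bucket
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def buckets_in_range(start: datetime, end: datetime) -> int:
    """Count the 3-hour buckets touched by the closed range [start, end]."""
    first = (start - EPOCH) // BUCKET
    last = (end - EPOCH) // BUCKET
    return last - first + 1


def groups_time_only(batches, device_count: int) -> int:
    """Time-only tracking: every device in the invalidated range is recomputed."""
    return sum(buckets_in_range(lo, hi) * device_count for _dev, lo, hi in batches)


def groups_per_device(batches) -> int:
    """Per-device tracking: only the device that wrote late data is recomputed."""
    return sum(buckets_in_range(lo, hi) for _dev, lo, hi in batches)


if __name__ == "__main__":
    # One hypothetical device backfills 90 days of readings in a single batch.
    lo = datetime(2025, 2, 1, tzinfo=timezone.utc)
    hi = lo + timedelta(days=90) - timedelta(seconds=1)
    batches = [(42, lo, hi)]  # (device_sensor_id, oldest row, newest row)

    print(groups_time_only(batches, device_count=5000))
    print(groups_per_device(batches))
```

In this model the ratio between the two numbers equals the number of devices sharing the time range, which is where an estimate in the order of 1000x comes from when thousands of devices report into the same hypertable.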
job_id | 1012
application_name | Refresh Continuous Aggregate Policy [1012]
schedule_interval | 00:30:00
max_runtime | 00:25:00
max_retries | -1
retry_period | 03:00:00
proc_schema | _timescaledb_functions
proc_name | policy_refresh_continuous_aggregate
owner | watchdog2
scheduled | t
fixed_schedule | f
config | {"end_offset": "6 months", "verbose_log": true, "start_offset": null, "buckets_per_batch": 2, "mat_hypertable_id": 5, "max_batches_per_execution": 100}
next_start | 2025-05-15 09:46:15.723041+00
initial_start |
hypertable_schema | _timescaledb_internal
hypertable_name | _materialized_hypertable_5
check_schema | _timescaledb_functions
Table "public.data_rawdata"
Column | Type | Collation | Nullable | Default
------------------+--------------------------+-----------+----------+---------
device_sensor_id | integer | | not null |
value | numeric(19,10) | | not null |
created | timestamp with time zone | | not null |
Indexes:
"data_rawdata_created_idx" btree (created DESC)
"data_rawdata_device_sensor_id_created_idx" UNIQUE, btree (device_sensor_id, created)
select * from timescaledb_information.dimensions where hypertable_name = 'data_rawdata';
-[ RECORD 1 ]-----+-------------------------
hypertable_schema | public
hypertable_name | data_rawdata
dimension_number | 1
column_name | created
column_type | timestamp with time zone
dimension_type | Time
time_interval | 4 days
integer_interval |
integer_now_func |
num_partitions |
\d+ cagg_data_rawdata_threehourly
View "public.cagg_data_rawdata_threehourly"
Column | Type | Collation | Nullable | Default | Storage | Description
------------------+--------------------------+-----------+----------+---------+----------+-------------
device_sensor_id | integer | | | | plain |
created | timestamp with time zone | | | | plain |
value | numeric | | | | main |
pct_agg | uddsketch | | | | extended |
View definition:
SELECT _materialized_hypertable_5.device_sensor_id,
_materialized_hypertable_5.created,
_materialized_hypertable_5.value,
_materialized_hypertable_5.pct_agg
FROM _timescaledb_internal._materialized_hypertable_5
WHERE _materialized_hypertable_5.created < COALESCE(_timescaledb_functions.to_timestamp(_timescaledb_functions.cagg_watermark(5)), '-infinity'::timestamp with time zone)
UNION ALL
SELECT data_rawdata.device_sensor_id,
time_bucket('03:00:00'::interval, data_rawdata.created) AS created,
first(data_rawdata.value, data_rawdata.created) AS value,
percentile_agg(data_rawdata.value::double precision) AS pct_agg
FROM data_rawdata
WHERE data_rawdata.created >= COALESCE(_timescaledb_functions.to_timestamp(_timescaledb_functions.cagg_watermark(5)), '-infinity'::timestamp with time zone)
GROUP BY data_rawdata.device_sensor_id, (time_bucket('03:00:00'::interval, data_rawdata.created));
Implementation challenges
A parameter, perhaps set in the cagg definition, could specify which columns to use for invalidation tracking (here, device_sensor_id). Alternatively, this could be inferred automatically from the cagg definition.