Skip to content

Commit

Permalink
Add plan-time chunk exclusion for real-time CAggs
Browse files Browse the repository at this point in the history
  • Loading branch information
jnidzwetzki committed Nov 21, 2023
1 parent a3aec8c commit a067ac9
Show file tree
Hide file tree
Showing 9 changed files with 1,068 additions and 497 deletions.
1 change: 1 addition & 0 deletions .unreleased/enhancement_6325
@@ -0,0 +1 @@
Implements: #6325 Add plan-time chunk exclusion for real-time CAggs
2 changes: 1 addition & 1 deletion sql/util_time.sql
Expand Up @@ -39,7 +39,7 @@ CREATE OR REPLACE FUNCTION _timescaledb_functions.time_to_internal(time_val ANYE
RETURNS BIGINT AS '@MODULE_PATHNAME@', 'ts_time_to_internal' LANGUAGE C VOLATILE STRICT;

CREATE OR REPLACE FUNCTION _timescaledb_functions.cagg_watermark(hypertable_id INTEGER)
RETURNS INT8 AS '@MODULE_PATHNAME@', 'ts_continuous_agg_watermark' LANGUAGE C STABLE STRICT PARALLEL RESTRICTED;
RETURNS INT8 AS '@MODULE_PATHNAME@', 'ts_continuous_agg_watermark' LANGUAGE C IMMUTABLE STRICT PARALLEL RESTRICTED;

CREATE OR REPLACE FUNCTION _timescaledb_functions.cagg_watermark_materialized(hypertable_id INTEGER)
RETURNS INT8 AS '@MODULE_PATHNAME@', 'ts_continuous_agg_watermark_materialized' LANGUAGE C STABLE STRICT PARALLEL SAFE;
Expand Down
28 changes: 26 additions & 2 deletions src/ts_catalog/continuous_aggs_watermark.c
Expand Up @@ -13,6 +13,7 @@
#include <fmgr.h>
#include <miscadmin.h>
#include <utils/acl.h>
#include <utils/inval.h>
#include <utils/snapmgr.h>

#include "ts_catalog/continuous_agg.h"
Expand Down Expand Up @@ -357,7 +358,8 @@ cagg_watermark_update_scan_internal(TupleInfo *ti, void *data)
}

static void
cagg_watermark_update_internal(int32 mat_hypertable_id, int64 new_watermark, bool force_update)
cagg_watermark_update_internal(int32 mat_hypertable_id, Oid ht_relid, int64 new_watermark,
bool force_update, bool invalidate_rel_cache)
{
bool watermark_updated;
ScanKeyData scankey[1];
Expand All @@ -384,6 +386,19 @@ cagg_watermark_update_internal(int32 mat_hypertable_id, int64 new_watermark, boo
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("watermark not defined for continuous aggregate: %d", mat_hypertable_id)));
}

/*
* We claim that '_timescaledb_functions.cagg_watermark' is IMMUTABLE. However, that is not
* entirely true but needed for constification of the function value and plan-time chunk
* exclusion. The value of the cagg_watermark function changes as soon as we change the
* watermark (so the function value is STABLE, but for this volatility classification, no plan
* time evaluation and chunk exclusion can be performed).
*
* Send an invalidation for the hypertable to invalidate prepared statements on this
* table and force a re-planning using the new watermark.
*/
if (invalidate_rel_cache)
CacheInvalidateRelcacheByRelid(ht_relid);
}

TSDLLEXPORT void
Expand All @@ -392,13 +407,22 @@ ts_cagg_watermark_update(Hypertable *mat_ht, int64 watermark, bool watermark_isn
{
ContinuousAgg *cagg = ts_continuous_agg_find_by_mat_hypertable_id(mat_ht->fd.id);

/* If we have a real-time CAgg, it uses a watermark function. So, we have to invalidate the rel
* cache to force a replanning of prepared statements. See cagg_watermark_update_internal for
* more information. */
bool invalidate_rel_cache = !cagg->data.materialized_only;

if (NULL == cagg)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid materialized hypertable ID: %d", mat_ht->fd.id)));

watermark = cagg_compute_watermark(cagg, watermark, watermark_isnull);
cagg_watermark_update_internal(mat_ht->fd.id, watermark, force_update);
cagg_watermark_update_internal(mat_ht->fd.id,
mat_ht->main_table_relid,
watermark,
force_update,
invalidate_rel_cache);

return;
}
Expand Down
610 changes: 238 additions & 372 deletions tsl/test/expected/cagg_query-15.out

Large diffs are not rendered by default.

67 changes: 30 additions & 37 deletions tsl/test/expected/cagg_union_view-15.out
Expand Up @@ -354,23 +354,22 @@ SELECT _timescaledb_functions.cagg_watermark(:boundary_view_id);

-- first UNION child should have no rows because no materialization has happened yet and 2nd child should have 4 rows
:PREFIX SELECT * FROM boundary_view;
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------
HashAggregate (actual rows=4 loops=1)
Group Key: time_bucket(10, boundary_test."time")
Group Key: time_bucket(10, _hyper_5_5_chunk."time")
Batches: 1
-> Result (actual rows=4 loops=1)
-> Custom Scan (ChunkAppend) on boundary_test (actual rows=4 loops=1)
Chunks excluded during startup: 0
-> Append (actual rows=4 loops=1)
-> Index Scan Backward using _hyper_5_5_chunk_boundary_test_time_idx on _hyper_5_5_chunk (actual rows=1 loops=1)
Index Cond: ("time" >= COALESCE((_timescaledb_functions.cagg_watermark(6))::integer, '-2147483648'::integer))
Index Cond: ("time" >= '-2147483648'::integer)
-> Index Scan Backward using _hyper_5_6_chunk_boundary_test_time_idx on _hyper_5_6_chunk (actual rows=1 loops=1)
Index Cond: ("time" >= COALESCE((_timescaledb_functions.cagg_watermark(6))::integer, '-2147483648'::integer))
Index Cond: ("time" >= '-2147483648'::integer)
-> Index Scan Backward using _hyper_5_7_chunk_boundary_test_time_idx on _hyper_5_7_chunk (actual rows=1 loops=1)
Index Cond: ("time" >= COALESCE((_timescaledb_functions.cagg_watermark(6))::integer, '-2147483648'::integer))
Index Cond: ("time" >= '-2147483648'::integer)
-> Index Scan Backward using _hyper_5_8_chunk_boundary_test_time_idx on _hyper_5_8_chunk (actual rows=1 loops=1)
Index Cond: ("time" >= COALESCE((_timescaledb_functions.cagg_watermark(6))::integer, '-2147483648'::integer))
(14 rows)
Index Cond: ("time" >= '-2147483648'::integer)
(13 rows)

-- result should have 4 rows
SELECT * FROM boundary_view ORDER BY time_bucket;
Expand All @@ -394,24 +393,21 @@ SELECT _timescaledb_functions.cagg_watermark(:boundary_view_id);

-- both sides of the UNION should return 2 rows
:PREFIX SELECT * FROM boundary_view;
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------
Append (actual rows=4 loops=1)
-> Custom Scan (ChunkAppend) on _materialized_hypertable_6 (actual rows=2 loops=1)
Chunks excluded during startup: 0
-> Index Scan using _hyper_6_9_chunk__materialized_hypertable_6_time_bucket_idx on _hyper_6_9_chunk (actual rows=2 loops=1)
Index Cond: (time_bucket < COALESCE((_timescaledb_functions.cagg_watermark(6))::integer, '-2147483648'::integer))
-> Index Scan using _hyper_6_9_chunk__materialized_hypertable_6_time_bucket_idx on _hyper_6_9_chunk (actual rows=2 loops=1)
Index Cond: (time_bucket < 30)
-> HashAggregate (actual rows=2 loops=1)
Group Key: time_bucket(10, boundary_test."time")
Group Key: time_bucket(10, _hyper_5_7_chunk."time")
Batches: 1
-> Result (actual rows=2 loops=1)
-> Custom Scan (ChunkAppend) on boundary_test (actual rows=2 loops=1)
Chunks excluded during startup: 2
-> Append (actual rows=2 loops=1)
-> Index Scan Backward using _hyper_5_7_chunk_boundary_test_time_idx on _hyper_5_7_chunk (actual rows=1 loops=1)
Index Cond: ("time" >= COALESCE((_timescaledb_functions.cagg_watermark(6))::integer, '-2147483648'::integer))
Index Cond: ("time" >= 30)
-> Index Scan Backward using _hyper_5_8_chunk_boundary_test_time_idx on _hyper_5_8_chunk (actual rows=1 loops=1)
Index Cond: ("time" >= COALESCE((_timescaledb_functions.cagg_watermark(6))::integer, '-2147483648'::integer))
(15 rows)
Index Cond: ("time" >= 30)
(12 rows)

-- result should have 4 rows
SELECT * FROM boundary_view ORDER BY time_bucket;
Expand Down Expand Up @@ -581,35 +577,32 @@ ORDER by 1;

-- plan output
:PREFIX SELECT * FROM mat_m1 ORDER BY 1;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------
Sort (actual rows=3 loops=1)
Sort Key: _materialized_hypertable_9.time_bucket
Sort Key: _hyper_9_15_chunk.time_bucket
Sort Method: quicksort
-> Append (actual rows=3 loops=1)
-> Custom Scan (ChunkAppend) on _materialized_hypertable_9 (actual rows=1 loops=1)
Chunks excluded during startup: 0
-> Index Scan using _hyper_9_15_chunk__materialized_hypertable_9_time_bucket_idx on _hyper_9_15_chunk (actual rows=1 loops=1)
Index Cond: (time_bucket < COALESCE((_timescaledb_functions.cagg_watermark(9))::integer, '-2147483648'::integer))
-> Index Scan using _hyper_9_15_chunk__materialized_hypertable_9_time_bucket_idx on _hyper_9_15_chunk (actual rows=1 loops=1)
Index Cond: (time_bucket < 25)
-> HashAggregate (actual rows=2 loops=1)
Group Key: time_bucket(5, ht_intdata.a)
Filter: ((sum(ht_intdata.c) > 50) AND ((avg(ht_intdata.b))::integer > 12))
Group Key: time_bucket(5, _hyper_7_11_chunk.a)
Filter: ((sum(_hyper_7_11_chunk.c) > 50) AND ((avg(_hyper_7_11_chunk.b))::integer > 12))
Batches: 1
Rows Removed by Filter: 1
-> Result (actual rows=6 loops=1)
-> Custom Scan (ChunkAppend) on ht_intdata (actual rows=6 loops=1)
Chunks excluded during startup: 1
-> Append (actual rows=6 loops=1)
-> Index Scan Backward using _hyper_7_11_chunk_ht_intdata_a_idx on _hyper_7_11_chunk (actual rows=2 loops=1)
Index Cond: (a >= COALESCE((_timescaledb_functions.cagg_watermark(9))::integer, '-2147483648'::integer))
Index Cond: (a >= 25)
Filter: ((b < 16) AND (c > 20))
-> Index Scan Backward using _hyper_7_13_chunk_ht_intdata_a_idx on _hyper_7_13_chunk (actual rows=3 loops=1)
Index Cond: (a >= COALESCE((_timescaledb_functions.cagg_watermark(9))::integer, '-2147483648'::integer))
Index Cond: (a >= 25)
Filter: ((b < 16) AND (c > 20))
-> Index Scan Backward using _hyper_7_14_chunk_ht_intdata_a_idx on _hyper_7_14_chunk (actual rows=1 loops=1)
Index Cond: (a >= COALESCE((_timescaledb_functions.cagg_watermark(9))::integer, '-2147483648'::integer))
Index Cond: (a >= 25)
Filter: ((b < 16) AND (c > 20))
Rows Removed by Filter: 2
(26 rows)
(23 rows)

-- Test caggs with different time types
CREATE TABLE smallint_table (time smallint, value int);
Expand Down

0 comments on commit a067ac9

Please sign in to comment.