Skip to content

Commit

Permalink
Fix accessor function cagg_get_bucket_function
Browse files Browse the repository at this point in the history
In #6624 we refactored time_bucket catalog table to be more generic and
introduced the `cagg_get_bucket_function` to inspect the query tree of
a given Continuous Aggregate and return the time_bucket function oid.

The problem with the implementation is we traverse the whole query tree
looking for `FuncExpr` and in certain cases we can have two different
`time_bucket` function definition but what matters is the correct and
valid `time_bucket` function that is part of the `Query->groupClause`.

Fixed it by inspecting only the `Query->groupClause` items looking for
a valid time bucket `FuncExpr` and return it `Oid`.
  • Loading branch information
fabriziomello committed May 17, 2024
1 parent c717792 commit 54830d1
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 73 deletions.
95 changes: 34 additions & 61 deletions tsl/src/continuous_aggs/repair.c
Original file line number Diff line number Diff line change
Expand Up @@ -251,61 +251,6 @@ tsl_cagg_try_repair(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}

typedef struct
{
/* Input parameter */
int32 mat_hypertable_id;

/* Output parameter */
Oid bucket_fuction;
} CaggQueryWalkerContext;

/* Process the CAgg query and find all used (usually one) time_bucket functions. It returns
* InvalidOid is no or more than one bucket function is found. */
static bool
cagg_query_walker(Node *node, CaggQueryWalkerContext *context)
{
if (node == NULL)
return false;

if (IsA(node, FuncExpr))
{
FuncExpr *func_expr = castNode(FuncExpr, node);

/* Is the used function a bucket function?
* We can not call ts_func_cache_get_bucketing_func at this point, since
*/
FuncInfo *func_info = ts_func_cache_get_bucketing_func(func_expr->funcid);
if (func_info != NULL)
{
/* First bucket function found */
if (!OidIsValid(context->bucket_fuction))
{
context->bucket_fuction = func_expr->funcid;
}
else
{
/* Got multiple bucket functions. Should never happen because this is checked during
* CAgg query validation.
*/
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("found multiple time_bucket functions in CAgg definition for "
"mat_ht_id: %d",
context->mat_hypertable_id)));
pg_unreachable();
}
}
}
else if (IsA(node, Query))
{
Query *query = castNode(Query, node);
return query_tree_walker(query, cagg_query_walker, context, 0);
}

return expression_tree_walker(node, cagg_query_walker, context);
}

/* Get the Oid of the direct view of the CAgg. We cannot use the TimescaleDB internal
* functions such as ts_continuous_agg_find_by_mat_hypertable_id() at this point since this
* function can be called during an extension upgrade and ts_catalog_get() does not work.
Expand Down Expand Up @@ -398,6 +343,7 @@ Datum
continuous_agg_get_bucket_function(PG_FUNCTION_ARGS)
{
const int32 mat_hypertable_id = PG_GETARG_INT32(0);
Oid funcid = InvalidOid;

/* Get the user view query of the user defined CAGG. */
Oid direct_view_oid = get_direct_view_oid(mat_hypertable_id);
Expand All @@ -410,12 +356,39 @@ continuous_agg_get_bucket_function(PG_FUNCTION_ARGS)
Assert(direct_query != NULL);
Assert(direct_query->commandType == CMD_SELECT);

/* Process the query and collect function information */
CaggQueryWalkerContext context = { 0 };
context.mat_hypertable_id = mat_hypertable_id;
context.bucket_fuction = InvalidOid;
ListCell *l;
bool found = false;
foreach (l, direct_query->groupClause)
{
SortGroupClause *sgc = lfirst_node(SortGroupClause, l);
TargetEntry *tle = get_sortgroupclause_tle(sgc, direct_query->targetList);

if (IsA(tle->expr, FuncExpr))
{
FuncExpr *fe = ((FuncExpr *) tle->expr);

/* Filter any non bucketing functions */
FuncInfo *finfo = ts_func_cache_get_bucketing_func(fe->funcid);
if (finfo == NULL)
continue;

Assert(finfo->is_bucketing_func);

cagg_query_walker((Node *) direct_query, &context);
funcid = fe->funcid;
found = true;
break;
}
}

if (!found)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("time_bucket function not found in CAgg definition for "
"mat_ht_id: %d",
mat_hypertable_id)));
pg_unreachable();
}

PG_RETURN_DATUM(ObjectIdGetDatum(context.bucket_fuction));
PG_RETURN_DATUM(ObjectIdGetDatum(funcid));
}
60 changes: 54 additions & 6 deletions tsl/test/expected/cagg_utils.out
Original file line number Diff line number Diff line change
Expand Up @@ -332,11 +332,10 @@ CREATE MATERIALIZED VIEW integer_ht_cagg
GROUP BY time_bucket(1, a), a;
NOTICE: continuous aggregate "integer_ht_cagg" is already up-to-date
--- Get the bucket Oids
SELECT user_view_name,
cagg_get_bucket_function(mat_hypertable_id)
FROM _timescaledb_catalog.continuous_agg
WHERE user_view_name in('temperature_4h', 'temperature_tz_4h', 'temperature_tz_4h_ts', 'integer_ht_cagg')
ORDER BY user_view_name;
SELECT user_view_name, cagg_get_bucket_function(mat_hypertable_id)
FROM _timescaledb_catalog.continuous_agg
WHERE user_view_name IN ('temperature_4h', 'temperature_tz_4h', 'temperature_tz_4h_ts', 'integer_ht_cagg')
ORDER BY user_view_name;
user_view_name | cagg_get_bucket_function
----------------------+---------------------------------------------------------------------------------------
integer_ht_cagg | time_bucket(integer,integer)
Expand All @@ -345,7 +344,56 @@ SELECT user_view_name,
temperature_tz_4h_ts | time_bucket(interval,timestamp with time zone,text,timestamp with time zone,interval)
(4 rows)

--- Cleanup
-- Valid multiple time_bucket usage on view definition
CREATE MATERIALIZED VIEW temperature_tz_4h_2
WITH (timescaledb.continuous) AS
SELECT (time_bucket('4 hour', time) at time zone 'utc')::date, avg(value)
FROM timestamptz_ht
GROUP BY time_bucket('4 hour', time)
ORDER BY 1
WITH NO DATA;
SELECT user_view_name, cagg_get_bucket_function(mat_hypertable_id)
FROM _timescaledb_catalog.continuous_agg
WHERE user_view_name = 'temperature_tz_4h_2'
ORDER BY user_view_name;
user_view_name | cagg_get_bucket_function
---------------------+------------------------------------------------
temperature_tz_4h_2 | time_bucket(interval,timestamp with time zone)
(1 row)

-- Corrupt the direct view definition
\c :TEST_DBNAME :ROLE_SUPERUSER
SELECT direct_view_schema, direct_view_name
FROM _timescaledb_catalog.continuous_agg
WHERE user_view_name = 'temperature_tz_4h_2' \gset
CREATE OR REPLACE VIEW :direct_view_schema.:direct_view_name AS
SELECT NULL::date AS timezone, NULL::FLOAT8 AS avg;
\set ON_ERROR_STOP 0
-- Should error because there's no time_bucket function on the view definition
SELECT user_view_name, cagg_get_bucket_function(mat_hypertable_id)
FROM _timescaledb_catalog.continuous_agg
WHERE user_view_name = 'temperature_tz_4h_2'
ORDER BY user_view_name;
ERROR: time_bucket function not found in CAgg definition for mat_ht_id: 11
\set ON_ERROR_STOP 1
-- Group by another function to make sure it will be ignored
CREATE FUNCTION skip() RETURNS INTEGER AS $$ SELECT 1; $$ IMMUTABLE LANGUAGE SQL;
CREATE MATERIALIZED VIEW temperature_tz_4h_3
WITH (timescaledb.continuous) AS
SELECT skip(), time_bucket('4 hour', time), avg(value)
FROM timestamptz_ht
GROUP BY 1, 2
ORDER BY 1
WITH NO DATA;
SELECT user_view_name, cagg_get_bucket_function(mat_hypertable_id)
FROM _timescaledb_catalog.continuous_agg
WHERE user_view_name = 'temperature_tz_4h_3'
ORDER BY user_view_name;
user_view_name | cagg_get_bucket_function
---------------------+------------------------------------------------
temperature_tz_4h_3 | time_bucket(interval,timestamp with time zone)
(1 row)

--- Cleanup
DROP FUNCTION IF EXISTS cagg_get_bucket_function(INTEGER);
\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER
58 changes: 52 additions & 6 deletions tsl/test/sql/cagg_utils.sql
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,59 @@ CREATE MATERIALIZED VIEW integer_ht_cagg
GROUP BY time_bucket(1, a), a;

--- Get the bucket Oids
SELECT user_view_name,
cagg_get_bucket_function(mat_hypertable_id)
FROM _timescaledb_catalog.continuous_agg
WHERE user_view_name in('temperature_4h', 'temperature_tz_4h', 'temperature_tz_4h_ts', 'integer_ht_cagg')
ORDER BY user_view_name;
SELECT user_view_name, cagg_get_bucket_function(mat_hypertable_id)
FROM _timescaledb_catalog.continuous_agg
WHERE user_view_name IN ('temperature_4h', 'temperature_tz_4h', 'temperature_tz_4h_ts', 'integer_ht_cagg')
ORDER BY user_view_name;

-- Valid multiple time_bucket usage on view definition
CREATE MATERIALIZED VIEW temperature_tz_4h_2
WITH (timescaledb.continuous) AS
SELECT (time_bucket('4 hour', time) at time zone 'utc')::date, avg(value)
FROM timestamptz_ht
GROUP BY time_bucket('4 hour', time)
ORDER BY 1
WITH NO DATA;

--- Cleanup
SELECT user_view_name, cagg_get_bucket_function(mat_hypertable_id)
FROM _timescaledb_catalog.continuous_agg
WHERE user_view_name = 'temperature_tz_4h_2'
ORDER BY user_view_name;

-- Corrupt the direct view definition
\c :TEST_DBNAME :ROLE_SUPERUSER

SELECT direct_view_schema, direct_view_name
FROM _timescaledb_catalog.continuous_agg
WHERE user_view_name = 'temperature_tz_4h_2' \gset

CREATE OR REPLACE VIEW :direct_view_schema.:direct_view_name AS
SELECT NULL::date AS timezone, NULL::FLOAT8 AS avg;

\set ON_ERROR_STOP 0
-- Should error because there's no time_bucket function on the view definition
SELECT user_view_name, cagg_get_bucket_function(mat_hypertable_id)
FROM _timescaledb_catalog.continuous_agg
WHERE user_view_name = 'temperature_tz_4h_2'
ORDER BY user_view_name;
\set ON_ERROR_STOP 1

-- Group by another function to make sure it will be ignored
CREATE FUNCTION skip() RETURNS INTEGER AS $$ SELECT 1; $$ IMMUTABLE LANGUAGE SQL;

CREATE MATERIALIZED VIEW temperature_tz_4h_3
WITH (timescaledb.continuous) AS
SELECT skip(), time_bucket('4 hour', time), avg(value)
FROM timestamptz_ht
GROUP BY 1, 2
ORDER BY 1
WITH NO DATA;

SELECT user_view_name, cagg_get_bucket_function(mat_hypertable_id)
FROM _timescaledb_catalog.continuous_agg
WHERE user_view_name = 'temperature_tz_4h_3'
ORDER BY user_view_name;

--- Cleanup
DROP FUNCTION IF EXISTS cagg_get_bucket_function(INTEGER);
\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER

0 comments on commit 54830d1

Please sign in to comment.