Skip to content

Commit

Permalink
Avoid having to cast time arg for cagg policy
Browse files Browse the repository at this point in the history
This patch does a minor refactoring and adds a way to guess
interval argument type based on used cagg

Issue: #2286
  • Loading branch information
pmwkaa committed Sep 21, 2020
1 parent e1a00eb commit c15d8be
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 85 deletions.
72 changes: 42 additions & 30 deletions src/time_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,45 @@ subtract_interval_from_now(Oid timetype, const Interval *interval)
return res;
}

Datum
ts_time_datum_convert_arg(Datum arg, Oid *argtype, Oid timetype)
{
Oid type = *argtype;

if (!OidIsValid(type) || type == UNKNOWNOID)
{
Oid infuncid = InvalidOid;
Oid typeioparam;

type = timetype;
getTypeInputInfo(type, &infuncid, &typeioparam);

switch (get_func_nargs(infuncid))
{
case 1:
/* Functions that take one input argument, e.g., the Date function */
arg = OidFunctionCall1(infuncid, arg);
break;
case 3:
/* Timestamp functions take three input arguments */
arg = OidFunctionCall3(infuncid,
arg,
ObjectIdGetDatum(InvalidOid),
Int32GetDatum(-1));
break;
default:
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid time argument"),
errhint("Time argument requires an explicit cast.")));
}

*argtype = type;
}

return arg;
}

/*
* Get the internal time value from a pseudo-type function argument.
*
Expand Down Expand Up @@ -75,36 +114,9 @@ subtract_interval_from_now(Oid timetype, const Interval *interval)
int64
ts_time_value_from_arg(Datum arg, Oid argtype, Oid timetype)
{
if (!OidIsValid(argtype) || argtype == UNKNOWNOID)
{
/* No explicit cast was done by the user. Try to convert the argument
* to the time type used by the continuous aggregate. */
Oid infuncid = InvalidOid;
Oid typeioparam;

argtype = timetype;
getTypeInputInfo(argtype, &infuncid, &typeioparam);

switch (get_func_nargs(infuncid))
{
case 1:
/* Functions that take one input argument, e.g., the Date function */
arg = OidFunctionCall1(infuncid, arg);
break;
case 3:
/* Timestamp functions take three input arguments */
arg = OidFunctionCall3(infuncid,
arg,
ObjectIdGetDatum(InvalidOid),
Int32GetDatum(-1));
break;
default:
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid time argument"),
errhint("Time argument requires an explicit cast.")));
}
}
/* If no explicit cast was done by the user, try to convert the argument
* to the time type used by the continuous aggregate. */
arg = ts_time_datum_convert_arg(arg, &argtype, timetype);

if (argtype == INTERVALOID)
{
Expand Down
1 change: 1 addition & 0 deletions src/time_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
TS_TIME_IS_NOEND(timeval, type))

extern TSDLLEXPORT int64 ts_time_value_from_arg(Datum arg, Oid argtype, Oid timetype);
extern TSDLLEXPORT Datum ts_time_datum_convert_arg(Datum arg, Oid *argtype, Oid timetype);
extern TSDLLEXPORT Datum ts_time_datum_get_min(Oid timetype);
extern TSDLLEXPORT Datum ts_time_datum_get_max(Oid timetype);
extern TSDLLEXPORT Datum ts_time_datum_get_end(Oid timetype);
Expand Down
103 changes: 59 additions & 44 deletions tsl/src/bgw_policy/continuous_aggregate_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,18 @@

#include <postgres.h>
#include <miscadmin.h>
#include <parser/parse_coerce.h>

#include <jsonb_utils.h>
#include <miscadmin.h>
#include <utils/builtins.h>
#include "bgw_policy/continuous_aggregate_api.h"
#include "bgw_policy/job.h"
#include "bgw/job.h"
#include "continuous_agg.h"
#include "continuous_aggs/materialize.h"
#include "dimension.h"
#include "hypertable_cache.h"
#include "time_utils.h"
#include "policy_utils.h"

#define POLICY_REFRESH_CAGG_PROC_NAME "policy_refresh_continuous_aggregate"
Expand Down Expand Up @@ -143,26 +145,35 @@ json_add_dim_interval_value(JsonbParseState *parse_state, const char *json_label
}
}

static void
check_valid_interval(Oid dim_type, Oid interval_type, const char *str_msg)
static Datum
convert_interval_arg(Oid dim_type, Datum interval, Oid *interval_type, const char *str_msg)
{
if (IS_INTEGER_TYPE(dim_type))
{
if (interval_type != dim_type)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid parameter value for %s", str_msg),
errhint("Use time interval of type %s with the continuous aggregate.",
format_type_be(dim_type))));
}
else if (IS_TIMESTAMP_TYPE(dim_type) && (interval_type != INTERVALOID))
Oid convert_to = dim_type;

if (*interval_type != convert_to)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid parameter value for %s", str_msg),
errhint("Use time interval with a continuous aggregate using timestamp-based time "
"bucket.")));
if (IS_TIMESTAMP_TYPE(dim_type))
convert_to = INTERVALOID;

if (!can_coerce_type(1, interval_type, &convert_to, COERCION_IMPLICIT))
{
if (IS_INTEGER_TYPE(dim_type))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid parameter value for %s", str_msg),
errhint("Use time interval of type %s with the continuous aggregate.",
format_type_be(dim_type))));
else if (IS_TIMESTAMP_TYPE(dim_type))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid parameter value for %s", str_msg),
errhint("Use time interval with a continuous aggregate using "
"timestamp-based time "
"bucket.")));
}
}

return ts_time_datum_convert_arg(interval, interval_type, convert_to);
}

Datum
Expand All @@ -182,40 +193,49 @@ policy_refresh_cagg_add(PG_FUNCTION_ARGS)
Oid cagg_oid, owner_id;
List *jobs;
bool if_not_exists, start_isnull, end_isnull;
if (PG_ARGISNULL(3))
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("cannot use NULL schedule interval")));
}

/* Verify that the owner can create a background worker */
cagg_oid = PG_GETARG_OID(0);
start_interval = PG_GETARG_DATUM(1);
end_interval = PG_GETARG_DATUM(2);
start_isnull = PG_ARGISNULL(1);
end_isnull = PG_ARGISNULL(2);
start_interval_type = get_fn_expr_argtype(fcinfo->flinfo, 1);
end_interval_type = get_fn_expr_argtype(fcinfo->flinfo, 2);
refresh_interval = *PG_GETARG_INTERVAL_P(3);
if_not_exists = PG_GETARG_BOOL(4);
owner_id = ts_cagg_permissions_check(cagg_oid, GetUserId());
ts_bgw_job_validate_job_owner(owner_id);

cagg = ts_continuous_agg_find_by_relid(cagg_oid);
if (!cagg)
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("\"%s\" is not a continuous aggregate", get_rel_name(cagg_oid))));
}

mat_htid = cagg->data.mat_hypertable_id;
/* Verify that the owner can create a background worker */
owner_id = ts_cagg_permissions_check(cagg_oid, GetUserId());
ts_bgw_job_validate_job_owner(owner_id);

hcache = ts_hypertable_cache_pin();
mat_htid = cagg->data.mat_hypertable_id;
mat_ht = ts_hypertable_cache_get_entry_by_id(hcache, mat_htid);
dim = hyperspace_get_open_dimension(mat_ht->space, 0);
dim_type = ts_dimension_get_partition_type(dim);
ts_cache_release(hcache);

/* Try to convert the argument to the time type used by the
* continuous aggregate */
start_interval = PG_GETARG_DATUM(1);
end_interval = PG_GETARG_DATUM(2);
start_isnull = PG_ARGISNULL(1);
end_isnull = PG_ARGISNULL(2);
start_interval_type = get_fn_expr_argtype(fcinfo->flinfo, 1);
end_interval_type = get_fn_expr_argtype(fcinfo->flinfo, 2);

if (!start_isnull)
start_interval =
convert_interval_arg(dim_type, start_interval, &start_interval_type, "start_interval");

if (!end_isnull)
end_interval =
convert_interval_arg(dim_type, end_interval, &end_interval_type, "end_interval");

if (PG_ARGISNULL(3))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("cannot use NULL schedule interval")));
refresh_interval = *PG_GETARG_INTERVAL_P(3);
if_not_exists = PG_GETARG_BOOL(4);

/* Make sure there is only 1 refresh policy on the cagg */
jobs = ts_bgw_job_find_by_proc_and_hypertable_id(POLICY_REFRESH_CAGG_PROC_NAME,
INTERNAL_SCHEMA_NAME,
Expand Down Expand Up @@ -265,11 +285,6 @@ policy_refresh_cagg_add(PG_FUNCTION_ARGS)
namestrcpy(&proc_schema, INTERNAL_SCHEMA_NAME);
namestrcpy(&owner, GetUserNameFromId(owner_id, false));

if (!start_isnull)
check_valid_interval(dim_type, start_interval_type, "start_interval");
if (!end_isnull)
check_valid_interval(dim_type, end_interval_type, "end_interval");

JsonbParseState *parse_state = NULL;
pushJsonbValue(&parse_state, WJB_BEGIN_OBJECT, NULL);
ts_jsonb_add_int32(parse_state, CONFIG_KEY_MAT_HYPERTABLE_ID, mat_htid);
Expand Down
27 changes: 22 additions & 5 deletions tsl/test/expected/continuous_aggs_policy.out
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,10 @@ CREATE MATERIALIZED VIEW max_mat_view_date
FROM continuous_agg_max_mat_date
GROUP BY 1 WITH NO DATA;
\set ON_ERROR_STOP 0
SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 days'::interval, 10 , '1 day'::interval);
SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 days', 10, '1 day'::interval);
ERROR: invalid parameter value for end_interval
\set ON_ERROR_STOP 1
SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 day'::interval, '1 day'::interval , '1 day'::interval) as job_id \gset
SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 days', '1 day', '1 day'::interval) as job_id \gset
SELECT config FROM _timescaledb_config.bgw_job
WHERE id = :job_id;
config
Expand All @@ -161,7 +161,7 @@ CREATE MATERIALIZED VIEW max_mat_view_timestamp
AS SELECT time_bucket('7 days', time)
FROM continuous_agg_timestamp
GROUP BY 1 WITH NO DATA;
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '10 day'::interval, '1 h'::interval , '1 h'::interval) as job_id \gset
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '10 day', '1 h'::interval , '1 h'::interval) as job_id \gset
CALL run_job(:job_id);
SELECT config FROM _timescaledb_config.bgw_job
WHERE id = :job_id;
Expand All @@ -182,8 +182,10 @@ SELECT config FROM _timescaledb_config.bgw_job where id = :job_id;
(1 row)

\set ON_ERROR_STOP 0
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '10 day'::interval, '1 day'::interval, '1h'::interval, if_not_exists=>true);
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '10 day', '1 day', '1h'::interval, if_not_exists=>true);
ERROR: could not find start_interval in config for job
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', 'xyz', '1 day', '1h'::interval, if_not_exists=>true);
ERROR: invalid input syntax for type interval: "xyz"
\set ON_ERROR_STOP 1
DROP MATERIALIZED VIEW max_mat_view_timestamp;
--smallint table
Expand Down Expand Up @@ -226,8 +228,24 @@ SELECT * FROM mat_smallint;
10 | 1
(2 rows)

\set ON_ERROR_STOP 0
SELECT add_continuous_aggregate_policy('mat_smallint', 15::smallint, 10::smallint, '1h'::interval, if_not_exists=>true);
WARNING: could not add refresh policy due to existing policy on continuous aggregate with different arguments
add_continuous_aggregate_policy
---------------------------------
-1
(1 row)

\set ON_ERROR_STOP 1
-- end of coverage tests
-- tests for interval argument convertions
--
\set ON_ERROR_STOP 0
SELECT add_continuous_aggregate_policy('mat_smallint', 15, 10, '1h'::interval, if_not_exists=>true);
ERROR: invalid parameter value for start_interval
SELECT add_continuous_aggregate_policy('mat_smallint', '15', 10, '1h'::interval, if_not_exists=>true);
ERROR: invalid parameter value for end_interval
SELECT add_continuous_aggregate_policy('mat_smallint', '15', '10', '1h'::interval, if_not_exists=>true);
WARNING: could not add refresh policy due to existing policy on continuous aggregate with different arguments
add_continuous_aggregate_policy
---------------------------------
Expand All @@ -237,4 +255,3 @@ WARNING: could not add refresh policy due to existing policy on continuous aggr
\set ON_ERROR_STOP 1
DROP MATERIALIZED VIEW mat_smallint;
NOTICE: drop cascades to table _timescaledb_internal._hyper_8_7_chunk
-- end of coverage tests
22 changes: 16 additions & 6 deletions tsl/test/sql/continuous_aggs_policy.sql
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ CREATE MATERIALIZED VIEW max_mat_view_date
GROUP BY 1 WITH NO DATA;

\set ON_ERROR_STOP 0
SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 days'::interval, 10 , '1 day'::interval);
SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 days', 10, '1 day'::interval);
\set ON_ERROR_STOP 1
SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 day'::interval, '1 day'::interval , '1 day'::interval) as job_id \gset
SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 days', '1 day', '1 day'::interval) as job_id \gset
SELECT config FROM _timescaledb_config.bgw_job
WHERE id = :job_id;

Expand All @@ -106,7 +106,7 @@ CREATE MATERIALIZED VIEW max_mat_view_timestamp
FROM continuous_agg_timestamp
GROUP BY 1 WITH NO DATA;

SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '10 day'::interval, '1 h'::interval , '1 h'::interval) as job_id \gset
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '10 day', '1 h'::interval , '1 h'::interval) as job_id \gset
CALL run_job(:job_id);

SELECT config FROM _timescaledb_config.bgw_job
Expand All @@ -120,7 +120,8 @@ WHERE id = :job_id;
SET ROLE :ROLE_DEFAULT_PERM_USER;
SELECT config FROM _timescaledb_config.bgw_job where id = :job_id;
\set ON_ERROR_STOP 0
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '10 day'::interval, '1 day'::interval, '1h'::interval, if_not_exists=>true);
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '10 day', '1 day', '1h'::interval, if_not_exists=>true);
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', 'xyz', '1 day', '1h'::interval, if_not_exists=>true);
\set ON_ERROR_STOP 1

DROP MATERIALIZED VIEW max_mat_view_timestamp;
Expand Down Expand Up @@ -149,8 +150,17 @@ CALL run_job(:job_id);
SELECT * FROM mat_smallint;

\set ON_ERROR_STOP 0
SELECT add_continuous_aggregate_policy('mat_smallint', 15, 10, '1h'::interval, if_not_exists=>true);
SELECT add_continuous_aggregate_policy('mat_smallint', 15::smallint, 10::smallint, '1h'::interval, if_not_exists=>true);
\set ON_ERROR_STOP 1
DROP MATERIALIZED VIEW mat_smallint;

-- end of coverage tests

-- tests for interval argument convertions
--
\set ON_ERROR_STOP 0
SELECT add_continuous_aggregate_policy('mat_smallint', 15, 10, '1h'::interval, if_not_exists=>true);
SELECT add_continuous_aggregate_policy('mat_smallint', '15', 10, '1h'::interval, if_not_exists=>true);
SELECT add_continuous_aggregate_policy('mat_smallint', '15', '10', '1h'::interval, if_not_exists=>true);
\set ON_ERROR_STOP 1

DROP MATERIALIZED VIEW mat_smallint;

0 comments on commit c15d8be

Please sign in to comment.