diff --git a/src/time_utils.c b/src/time_utils.c index c504f6cecc5..07273d9901f 100644 --- a/src/time_utils.c +++ b/src/time_utils.c @@ -41,6 +41,45 @@ subtract_interval_from_now(Oid timetype, const Interval *interval) return res; } +Datum +ts_time_datum_convert_arg(Datum arg, Oid *argtype, Oid timetype) +{ + Oid type = *argtype; + + if (!OidIsValid(type) || type == UNKNOWNOID) + { + Oid infuncid = InvalidOid; + Oid typeioparam; + + type = timetype; + getTypeInputInfo(type, &infuncid, &typeioparam); + + switch (get_func_nargs(infuncid)) + { + case 1: + /* Functions that take one input argument, e.g., the Date function */ + arg = OidFunctionCall1(infuncid, arg); + break; + case 3: + /* Timestamp functions take three input arguments */ + arg = OidFunctionCall3(infuncid, + arg, + ObjectIdGetDatum(InvalidOid), + Int32GetDatum(-1)); + break; + default: + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid time argument"), + errhint("Time argument requires an explicit cast."))); + } + + *argtype = type; + } + + return arg; +} + /* * Get the internal time value from a pseudo-type function argument. * @@ -75,36 +114,9 @@ subtract_interval_from_now(Oid timetype, const Interval *interval) int64 ts_time_value_from_arg(Datum arg, Oid argtype, Oid timetype) { - if (!OidIsValid(argtype) || argtype == UNKNOWNOID) - { - /* No explicit cast was done by the user. Try to convert the argument - * to the time type used by the continuous aggregate. */ - Oid infuncid = InvalidOid; - Oid typeioparam; - - argtype = timetype; - getTypeInputInfo(argtype, &infuncid, &typeioparam); - - switch (get_func_nargs(infuncid)) - { - case 1: - /* Functions that take one input argument, e.g., the Date function */ - arg = OidFunctionCall1(infuncid, arg); - break; - case 3: - /* Timestamp functions take three input arguments */ - arg = OidFunctionCall3(infuncid, - arg, - ObjectIdGetDatum(InvalidOid), - Int32GetDatum(-1)); - break; - default: - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("invalid time argument"), - errhint("Time argument requires an explicit cast."))); - } - } + /* If no explicit cast was done by the user, try to convert the argument + * to the time type used by the continuous aggregate. */ + arg = ts_time_datum_convert_arg(arg, &argtype, timetype); if (argtype == INTERVALOID) { diff --git a/src/time_utils.h b/src/time_utils.h index 82d8b3682d8..dd35a20dd00 100644 --- a/src/time_utils.h +++ b/src/time_utils.h @@ -68,6 +68,7 @@ TS_TIME_IS_NOEND(timeval, type)) extern TSDLLEXPORT int64 ts_time_value_from_arg(Datum arg, Oid argtype, Oid timetype); +extern TSDLLEXPORT Datum ts_time_datum_convert_arg(Datum arg, Oid *argtype, Oid timetype); extern TSDLLEXPORT Datum ts_time_datum_get_min(Oid timetype); extern TSDLLEXPORT Datum ts_time_datum_get_max(Oid timetype); extern TSDLLEXPORT Datum ts_time_datum_get_end(Oid timetype); diff --git a/tsl/src/bgw_policy/continuous_aggregate_api.c b/tsl/src/bgw_policy/continuous_aggregate_api.c index ff126f6ffe1..e9d815b259e 100644 --- a/tsl/src/bgw_policy/continuous_aggregate_api.c +++ b/tsl/src/bgw_policy/continuous_aggregate_api.c @@ -6,16 +6,18 @@ #include #include +#include #include -#include #include #include "bgw_policy/continuous_aggregate_api.h" #include "bgw_policy/job.h" #include "bgw/job.h" #include "continuous_agg.h" +#include "continuous_aggs/materialize.h" #include "dimension.h" #include "hypertable_cache.h" +#include "time_utils.h" #include "policy_utils.h" #define POLICY_REFRESH_CAGG_PROC_NAME "policy_refresh_continuous_aggregate" @@ -143,26 +145,35 @@ json_add_dim_interval_value(JsonbParseState *parse_state, const char *json_label } } -static void -check_valid_interval(Oid dim_type, Oid interval_type, const char *str_msg) +static Datum +convert_interval_arg(Oid dim_type, Datum interval, Oid *interval_type, const char *str_msg) { - if (IS_INTEGER_TYPE(dim_type)) - { - if (interval_type != dim_type) - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("invalid parameter value for %s", str_msg), - errhint("Use time interval of type %s with the continuous aggregate.", - format_type_be(dim_type)))); - } - else if (IS_TIMESTAMP_TYPE(dim_type) && (interval_type != INTERVALOID)) + Oid convert_to = dim_type; + + if (*interval_type != convert_to) { - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("invalid parameter value for %s", str_msg), - errhint("Use time interval with a continuous aggregate using timestamp-based time " - "bucket."))); + if (IS_TIMESTAMP_TYPE(dim_type)) + convert_to = INTERVALOID; + + if (!can_coerce_type(1, interval_type, &convert_to, COERCION_IMPLICIT)) + { + if (IS_INTEGER_TYPE(dim_type)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid parameter value for %s", str_msg), + errhint("Use time interval of type %s with the continuous aggregate.", + format_type_be(dim_type)))); + else if (IS_TIMESTAMP_TYPE(dim_type)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid parameter value for %s", str_msg), + errhint("Use time interval with a continuous aggregate using " + "timestamp-based time " + "bucket."))); + } } + + return ts_time_datum_convert_arg(interval, interval_type, convert_to); } Datum @@ -182,40 +193,49 @@ policy_refresh_cagg_add(PG_FUNCTION_ARGS) Oid cagg_oid, owner_id; List *jobs; bool if_not_exists, start_isnull, end_isnull; - if (PG_ARGISNULL(3)) - { - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("cannot use NULL schedule interval"))); - } + + /* Verify that the owner can create a background worker */ cagg_oid = PG_GETARG_OID(0); - start_interval = PG_GETARG_DATUM(1); - end_interval = PG_GETARG_DATUM(2); - start_isnull = PG_ARGISNULL(1); - end_isnull = PG_ARGISNULL(2); - start_interval_type = get_fn_expr_argtype(fcinfo->flinfo, 1); - end_interval_type = get_fn_expr_argtype(fcinfo->flinfo, 2); - refresh_interval = *PG_GETARG_INTERVAL_P(3); - if_not_exists = PG_GETARG_BOOL(4); + owner_id = ts_cagg_permissions_check(cagg_oid, GetUserId()); + ts_bgw_job_validate_job_owner(owner_id); + cagg = ts_continuous_agg_find_by_relid(cagg_oid); if (!cagg) - { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("\"%s\" is not a continuous aggregate", get_rel_name(cagg_oid)))); - } - - mat_htid = cagg->data.mat_hypertable_id; - /* Verify that the owner can create a background worker */ - owner_id = ts_cagg_permissions_check(cagg_oid, GetUserId()); - ts_bgw_job_validate_job_owner(owner_id); hcache = ts_hypertable_cache_pin(); + mat_htid = cagg->data.mat_hypertable_id; mat_ht = ts_hypertable_cache_get_entry_by_id(hcache, mat_htid); dim = hyperspace_get_open_dimension(mat_ht->space, 0); dim_type = ts_dimension_get_partition_type(dim); ts_cache_release(hcache); + /* Try to convert the argument to the time type used by the + * continuous aggregate */ + start_interval = PG_GETARG_DATUM(1); + end_interval = PG_GETARG_DATUM(2); + start_isnull = PG_ARGISNULL(1); + end_isnull = PG_ARGISNULL(2); + start_interval_type = get_fn_expr_argtype(fcinfo->flinfo, 1); + end_interval_type = get_fn_expr_argtype(fcinfo->flinfo, 2); + + if (!start_isnull) + start_interval = + convert_interval_arg(dim_type, start_interval, &start_interval_type, "start_interval"); + + if (!end_isnull) + end_interval = + convert_interval_arg(dim_type, end_interval, &end_interval_type, "end_interval"); + + if (PG_ARGISNULL(3)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot use NULL schedule interval"))); + refresh_interval = *PG_GETARG_INTERVAL_P(3); + if_not_exists = PG_GETARG_BOOL(4); + /* Make sure there is only 1 refresh policy on the cagg */ jobs = ts_bgw_job_find_by_proc_and_hypertable_id(POLICY_REFRESH_CAGG_PROC_NAME, INTERNAL_SCHEMA_NAME, @@ -265,11 +285,6 @@ policy_refresh_cagg_add(PG_FUNCTION_ARGS) namestrcpy(&proc_schema, INTERNAL_SCHEMA_NAME); namestrcpy(&owner, GetUserNameFromId(owner_id, false)); - if (!start_isnull) - check_valid_interval(dim_type, start_interval_type, "start_interval"); - if (!end_isnull) - check_valid_interval(dim_type, end_interval_type, "end_interval"); - JsonbParseState *parse_state = NULL; pushJsonbValue(&parse_state, WJB_BEGIN_OBJECT, NULL); ts_jsonb_add_int32(parse_state, CONFIG_KEY_MAT_HYPERTABLE_ID, mat_htid); diff --git a/tsl/test/expected/continuous_aggs_policy.out b/tsl/test/expected/continuous_aggs_policy.out index 718abf4dd2e..ec54988d526 100644 --- a/tsl/test/expected/continuous_aggs_policy.out +++ b/tsl/test/expected/continuous_aggs_policy.out @@ -133,10 +133,10 @@ CREATE MATERIALIZED VIEW max_mat_view_date FROM continuous_agg_max_mat_date GROUP BY 1 WITH NO DATA; \set ON_ERROR_STOP 0 -SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 days'::interval, 10 , '1 day'::interval); +SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 days', 10, '1 day'::interval); ERROR: invalid parameter value for end_interval \set ON_ERROR_STOP 1 -SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 day'::interval, '1 day'::interval , '1 day'::interval) as job_id \gset +SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 days', '1 day', '1 day'::interval) as job_id \gset SELECT config FROM _timescaledb_config.bgw_job WHERE id = :job_id; config @@ -161,7 +161,7 @@ CREATE MATERIALIZED VIEW max_mat_view_timestamp AS SELECT time_bucket('7 days', time) FROM continuous_agg_timestamp GROUP BY 1 WITH NO DATA; -SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '10 day'::interval, '1 h'::interval , '1 h'::interval) as job_id \gset +SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '10 day', '1 h'::interval , '1 h'::interval) as job_id \gset CALL run_job(:job_id); SELECT config FROM _timescaledb_config.bgw_job WHERE id = :job_id; @@ -182,8 +182,10 @@ SELECT config FROM _timescaledb_config.bgw_job where id = :job_id; (1 row) \set ON_ERROR_STOP 0 -SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '10 day'::interval, '1 day'::interval, '1h'::interval, if_not_exists=>true); +SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '10 day', '1 day', '1h'::interval, if_not_exists=>true); ERROR: could not find start_interval in config for job +SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', 'xyz', '1 day', '1h'::interval, if_not_exists=>true); +ERROR: invalid input syntax for type interval: "xyz" \set ON_ERROR_STOP 1 DROP MATERIALIZED VIEW max_mat_view_timestamp; --smallint table @@ -226,8 +228,24 @@ SELECT * FROM mat_smallint; 10 | 1 (2 rows) +\set ON_ERROR_STOP 0 +SELECT add_continuous_aggregate_policy('mat_smallint', 15::smallint, 10::smallint, '1h'::interval, if_not_exists=>true); +WARNING: could not add refresh policy due to existing policy on continuous aggregate with different arguments + add_continuous_aggregate_policy +--------------------------------- + -1 +(1 row) + +\set ON_ERROR_STOP 1 +-- end of coverage tests +-- tests for interval argument convertions +-- \set ON_ERROR_STOP 0 SELECT add_continuous_aggregate_policy('mat_smallint', 15, 10, '1h'::interval, if_not_exists=>true); +ERROR: invalid parameter value for start_interval +SELECT add_continuous_aggregate_policy('mat_smallint', '15', 10, '1h'::interval, if_not_exists=>true); +ERROR: invalid parameter value for end_interval +SELECT add_continuous_aggregate_policy('mat_smallint', '15', '10', '1h'::interval, if_not_exists=>true); WARNING: could not add refresh policy due to existing policy on continuous aggregate with different arguments add_continuous_aggregate_policy --------------------------------- @@ -237,4 +255,3 @@ WARNING: could not add refresh policy due to existing policy on continuous aggr \set ON_ERROR_STOP 1 DROP MATERIALIZED VIEW mat_smallint; NOTICE: drop cascades to table _timescaledb_internal._hyper_8_7_chunk --- end of coverage tests diff --git a/tsl/test/sql/continuous_aggs_policy.sql b/tsl/test/sql/continuous_aggs_policy.sql index ff2c013bd0a..b37781b2ad7 100644 --- a/tsl/test/sql/continuous_aggs_policy.sql +++ b/tsl/test/sql/continuous_aggs_policy.sql @@ -86,9 +86,9 @@ CREATE MATERIALIZED VIEW max_mat_view_date GROUP BY 1 WITH NO DATA; \set ON_ERROR_STOP 0 -SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 days'::interval, 10 , '1 day'::interval); +SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 days', 10, '1 day'::interval); \set ON_ERROR_STOP 1 -SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 day'::interval, '1 day'::interval , '1 day'::interval) as job_id \gset +SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 days', '1 day', '1 day'::interval) as job_id \gset SELECT config FROM _timescaledb_config.bgw_job WHERE id = :job_id; @@ -106,7 +106,7 @@ CREATE MATERIALIZED VIEW max_mat_view_timestamp FROM continuous_agg_timestamp GROUP BY 1 WITH NO DATA; -SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '10 day'::interval, '1 h'::interval , '1 h'::interval) as job_id \gset +SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '10 day', '1 h'::interval , '1 h'::interval) as job_id \gset CALL run_job(:job_id); SELECT config FROM _timescaledb_config.bgw_job @@ -120,7 +120,8 @@ WHERE id = :job_id; SET ROLE :ROLE_DEFAULT_PERM_USER; SELECT config FROM _timescaledb_config.bgw_job where id = :job_id; \set ON_ERROR_STOP 0 -SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '10 day'::interval, '1 day'::interval, '1h'::interval, if_not_exists=>true); +SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '10 day', '1 day', '1h'::interval, if_not_exists=>true); +SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', 'xyz', '1 day', '1h'::interval, if_not_exists=>true); \set ON_ERROR_STOP 1 DROP MATERIALIZED VIEW max_mat_view_timestamp; @@ -149,8 +150,17 @@ CALL run_job(:job_id); SELECT * FROM mat_smallint; \set ON_ERROR_STOP 0 -SELECT add_continuous_aggregate_policy('mat_smallint', 15, 10, '1h'::interval, if_not_exists=>true); +SELECT add_continuous_aggregate_policy('mat_smallint', 15::smallint, 10::smallint, '1h'::interval, if_not_exists=>true); \set ON_ERROR_STOP 1 -DROP MATERIALIZED VIEW mat_smallint; -- end of coverage tests + +-- tests for interval argument convertions +-- +\set ON_ERROR_STOP 0 +SELECT add_continuous_aggregate_policy('mat_smallint', 15, 10, '1h'::interval, if_not_exists=>true); +SELECT add_continuous_aggregate_policy('mat_smallint', '15', 10, '1h'::interval, if_not_exists=>true); +SELECT add_continuous_aggregate_policy('mat_smallint', '15', '10', '1h'::interval, if_not_exists=>true); +\set ON_ERROR_STOP 1 + +DROP MATERIALIZED VIEW mat_smallint;