Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid having to cast time arg for cagg policy #2387

Merged
merged 1 commit into from Sep 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
72 changes: 42 additions & 30 deletions src/time_utils.c
Expand Up @@ -41,6 +41,45 @@ subtract_interval_from_now(Oid timetype, const Interval *interval)
return res;
}

Datum
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is worth do document this function as it is part of internal API, so it is clear what to expect from it and how to use it. Currently I cannot guess from the function name what to expect from it.

ts_time_datum_convert_arg(Datum arg, Oid *argtype, Oid timetype)
{
Oid type = *argtype;

if (!OidIsValid(type) || type == UNKNOWNOID)
{
Oid infuncid = InvalidOid;
Oid typeioparam;

type = timetype;
getTypeInputInfo(type, &infuncid, &typeioparam);

switch (get_func_nargs(infuncid))
{
case 1:
/* Functions that take one input argument, e.g., the Date function */
arg = OidFunctionCall1(infuncid, arg);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Am I correct that this is a reuse of variable arg, which first contains the original argument, and then assigned the converted argument? I suggest to use different variable and actually it will contain different object to my understanding.

break;
case 3:
/* Timestamp functions take three input arguments */
arg = OidFunctionCall3(infuncid,
arg,
ObjectIdGetDatum(InvalidOid),
Int32GetDatum(-1));
break;
default:
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid time argument"),
errhint("Time argument requires an explicit cast.")));
Comment on lines +71 to +74
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to test this case? Codecov complains that it's not covered.

Copy link
Contributor Author

@pmwkaa pmwkaa Sep 18, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think most of those cases already covered by continuous agg refresh function tests, which are reusing the same functionality.

Copy link
Contributor

@k-rus k-rus Sep 18, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess Codecov would not complain if it is covered ;) I searched that such error is never expected in our tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this case is actually possible right now, it was made more like a precaution in case new time functions will arrive in future versions I believe

}

*argtype = type;
}

return arg;
}

/*
* Get the internal time value from a pseudo-type function argument.
*
Expand Down Expand Up @@ -75,36 +114,9 @@ subtract_interval_from_now(Oid timetype, const Interval *interval)
int64
ts_time_value_from_arg(Datum arg, Oid argtype, Oid timetype)
{
if (!OidIsValid(argtype) || argtype == UNKNOWNOID)
{
/* No explicit cast was done by the user. Try to convert the argument
* to the time type used by the continuous aggregate. */
Oid infuncid = InvalidOid;
Oid typeioparam;

argtype = timetype;
getTypeInputInfo(argtype, &infuncid, &typeioparam);

switch (get_func_nargs(infuncid))
{
case 1:
/* Functions that take one input argument, e.g., the Date function */
arg = OidFunctionCall1(infuncid, arg);
break;
case 3:
/* Timestamp functions take three input arguments */
arg = OidFunctionCall3(infuncid,
arg,
ObjectIdGetDatum(InvalidOid),
Int32GetDatum(-1));
break;
default:
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid time argument"),
errhint("Time argument requires an explicit cast.")));
}
}
/* If no explicit cast was done by the user, try to convert the argument
* to the time type used by the continuous aggregate. */
arg = ts_time_datum_convert_arg(arg, &argtype, timetype);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment here, since arg will contain different object after the call. To avoid code changes, the original variable can be called arg_in.


if (argtype == INTERVALOID)
{
Expand Down
1 change: 1 addition & 0 deletions src/time_utils.h
Expand Up @@ -68,6 +68,7 @@
TS_TIME_IS_NOEND(timeval, type))

extern TSDLLEXPORT int64 ts_time_value_from_arg(Datum arg, Oid argtype, Oid timetype);
extern TSDLLEXPORT Datum ts_time_datum_convert_arg(Datum arg, Oid *argtype, Oid timetype);
extern TSDLLEXPORT Datum ts_time_datum_get_min(Oid timetype);
extern TSDLLEXPORT Datum ts_time_datum_get_max(Oid timetype);
extern TSDLLEXPORT Datum ts_time_datum_get_end(Oid timetype);
Expand Down
103 changes: 59 additions & 44 deletions tsl/src/bgw_policy/continuous_aggregate_api.c
Expand Up @@ -6,16 +6,18 @@

#include <postgres.h>
#include <miscadmin.h>
#include <parser/parse_coerce.h>

#include <jsonb_utils.h>
#include <miscadmin.h>
#include <utils/builtins.h>
#include "bgw_policy/continuous_aggregate_api.h"
#include "bgw_policy/job.h"
#include "bgw/job.h"
#include "continuous_agg.h"
#include "continuous_aggs/materialize.h"
#include "dimension.h"
#include "hypertable_cache.h"
#include "time_utils.h"
#include "policy_utils.h"

#define POLICY_REFRESH_CAGG_PROC_NAME "policy_refresh_continuous_aggregate"
Expand Down Expand Up @@ -143,26 +145,35 @@ json_add_dim_interval_value(JsonbParseState *parse_state, const char *json_label
}
}

static void
check_valid_interval(Oid dim_type, Oid interval_type, const char *str_msg)
static Datum
convert_interval_arg(Oid dim_type, Datum interval, Oid *interval_type, const char *str_msg)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this conversion useful for other policies as well (e.g., retention)? I figured this would be in time_utils or policy_utils for use by all policies.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I thought maybe to do that in another PR related to the retention policy.

{
if (IS_INTEGER_TYPE(dim_type))
{
if (interval_type != dim_type)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid parameter value for %s", str_msg),
errhint("Use time interval of type %s with the continuous aggregate.",
format_type_be(dim_type))));
}
else if (IS_TIMESTAMP_TYPE(dim_type) && (interval_type != INTERVALOID))
Oid convert_to = dim_type;

if (*interval_type != convert_to)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid parameter value for %s", str_msg),
errhint("Use time interval with a continuous aggregate using timestamp-based time "
"bucket.")));
if (IS_TIMESTAMP_TYPE(dim_type))
convert_to = INTERVALOID;

if (!can_coerce_type(1, interval_type, &convert_to, COERCION_IMPLICIT))
{
if (IS_INTEGER_TYPE(dim_type))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid parameter value for %s", str_msg),
errhint("Use time interval of type %s with the continuous aggregate.",
format_type_be(dim_type))));
else if (IS_TIMESTAMP_TYPE(dim_type))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid parameter value for %s", str_msg),
errhint("Use time interval with a continuous aggregate using "
"timestamp-based time "
"bucket.")));
}
}

return ts_time_datum_convert_arg(interval, interval_type, convert_to);
}

Datum
Expand All @@ -182,40 +193,49 @@ policy_refresh_cagg_add(PG_FUNCTION_ARGS)
Oid cagg_oid, owner_id;
List *jobs;
bool if_not_exists, start_isnull, end_isnull;
if (PG_ARGISNULL(3))
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("cannot use NULL schedule interval")));
}

/* Verify that the owner can create a background worker */
cagg_oid = PG_GETARG_OID(0);
start_interval = PG_GETARG_DATUM(1);
end_interval = PG_GETARG_DATUM(2);
start_isnull = PG_ARGISNULL(1);
end_isnull = PG_ARGISNULL(2);
start_interval_type = get_fn_expr_argtype(fcinfo->flinfo, 1);
end_interval_type = get_fn_expr_argtype(fcinfo->flinfo, 2);
refresh_interval = *PG_GETARG_INTERVAL_P(3);
if_not_exists = PG_GETARG_BOOL(4);
owner_id = ts_cagg_permissions_check(cagg_oid, GetUserId());
ts_bgw_job_validate_job_owner(owner_id);

cagg = ts_continuous_agg_find_by_relid(cagg_oid);
if (!cagg)
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("\"%s\" is not a continuous aggregate", get_rel_name(cagg_oid))));
}

mat_htid = cagg->data.mat_hypertable_id;
/* Verify that the owner can create a background worker */
owner_id = ts_cagg_permissions_check(cagg_oid, GetUserId());
ts_bgw_job_validate_job_owner(owner_id);

hcache = ts_hypertable_cache_pin();
mat_htid = cagg->data.mat_hypertable_id;
mat_ht = ts_hypertable_cache_get_entry_by_id(hcache, mat_htid);
dim = hyperspace_get_open_dimension(mat_ht->space, 0);
dim_type = ts_dimension_get_partition_type(dim);
ts_cache_release(hcache);

/* Try to convert the argument to the time type used by the
* continuous aggregate */
start_interval = PG_GETARG_DATUM(1);
end_interval = PG_GETARG_DATUM(2);
start_isnull = PG_ARGISNULL(1);
end_isnull = PG_ARGISNULL(2);
start_interval_type = get_fn_expr_argtype(fcinfo->flinfo, 1);
end_interval_type = get_fn_expr_argtype(fcinfo->flinfo, 2);

if (!start_isnull)
start_interval =
convert_interval_arg(dim_type, start_interval, &start_interval_type, "start_interval");

if (!end_isnull)
end_interval =
convert_interval_arg(dim_type, end_interval, &end_interval_type, "end_interval");

if (PG_ARGISNULL(3))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("cannot use NULL schedule interval")));
refresh_interval = *PG_GETARG_INTERVAL_P(3);
if_not_exists = PG_GETARG_BOOL(4);

/* Make sure there is only 1 refresh policy on the cagg */
jobs = ts_bgw_job_find_by_proc_and_hypertable_id(POLICY_REFRESH_CAGG_PROC_NAME,
INTERNAL_SCHEMA_NAME,
Expand Down Expand Up @@ -265,11 +285,6 @@ policy_refresh_cagg_add(PG_FUNCTION_ARGS)
namestrcpy(&proc_schema, INTERNAL_SCHEMA_NAME);
namestrcpy(&owner, GetUserNameFromId(owner_id, false));

if (!start_isnull)
check_valid_interval(dim_type, start_interval_type, "start_interval");
if (!end_isnull)
check_valid_interval(dim_type, end_interval_type, "end_interval");

JsonbParseState *parse_state = NULL;
pushJsonbValue(&parse_state, WJB_BEGIN_OBJECT, NULL);
ts_jsonb_add_int32(parse_state, CONFIG_KEY_MAT_HYPERTABLE_ID, mat_htid);
Expand Down
27 changes: 22 additions & 5 deletions tsl/test/expected/continuous_aggs_policy.out
Expand Up @@ -133,10 +133,10 @@ CREATE MATERIALIZED VIEW max_mat_view_date
FROM continuous_agg_max_mat_date
GROUP BY 1 WITH NO DATA;
\set ON_ERROR_STOP 0
SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 days'::interval, 10 , '1 day'::interval);
SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 days', 10, '1 day'::interval);
ERROR: invalid parameter value for end_interval
\set ON_ERROR_STOP 1
SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 day'::interval, '1 day'::interval , '1 day'::interval) as job_id \gset
SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 days', '1 day', '1 day'::interval) as job_id \gset
SELECT config FROM _timescaledb_config.bgw_job
WHERE id = :job_id;
config
Expand All @@ -161,7 +161,7 @@ CREATE MATERIALIZED VIEW max_mat_view_timestamp
AS SELECT time_bucket('7 days', time)
FROM continuous_agg_timestamp
GROUP BY 1 WITH NO DATA;
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '10 day'::interval, '1 h'::interval , '1 h'::interval) as job_id \gset
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '10 day', '1 h'::interval , '1 h'::interval) as job_id \gset
CALL run_job(:job_id);
SELECT config FROM _timescaledb_config.bgw_job
WHERE id = :job_id;
Expand All @@ -182,8 +182,10 @@ SELECT config FROM _timescaledb_config.bgw_job where id = :job_id;
(1 row)

\set ON_ERROR_STOP 0
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '10 day'::interval, '1 day'::interval, '1h'::interval, if_not_exists=>true);
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '10 day', '1 day', '1h'::interval, if_not_exists=>true);
ERROR: could not find start_interval in config for job
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', 'xyz', '1 day', '1h'::interval, if_not_exists=>true);
ERROR: invalid input syntax for type interval: "xyz"
\set ON_ERROR_STOP 1
DROP MATERIALIZED VIEW max_mat_view_timestamp;
--smallint table
Expand Down Expand Up @@ -226,8 +228,24 @@ SELECT * FROM mat_smallint;
10 | 1
(2 rows)

\set ON_ERROR_STOP 0
SELECT add_continuous_aggregate_policy('mat_smallint', 15::smallint, 10::smallint, '1h'::interval, if_not_exists=>true);
WARNING: could not add refresh policy due to existing policy on continuous aggregate with different arguments
add_continuous_aggregate_policy
---------------------------------
-1
(1 row)

\set ON_ERROR_STOP 1
-- end of coverage tests
-- tests for interval argument convertions
--
\set ON_ERROR_STOP 0
SELECT add_continuous_aggregate_policy('mat_smallint', 15, 10, '1h'::interval, if_not_exists=>true);
ERROR: invalid parameter value for start_interval
SELECT add_continuous_aggregate_policy('mat_smallint', '15', 10, '1h'::interval, if_not_exists=>true);
ERROR: invalid parameter value for end_interval
SELECT add_continuous_aggregate_policy('mat_smallint', '15', '10', '1h'::interval, if_not_exists=>true);
WARNING: could not add refresh policy due to existing policy on continuous aggregate with different arguments
add_continuous_aggregate_policy
---------------------------------
Expand All @@ -237,4 +255,3 @@ WARNING: could not add refresh policy due to existing policy on continuous aggr
\set ON_ERROR_STOP 1
DROP MATERIALIZED VIEW mat_smallint;
NOTICE: drop cascades to table _timescaledb_internal._hyper_8_7_chunk
-- end of coverage tests
22 changes: 16 additions & 6 deletions tsl/test/sql/continuous_aggs_policy.sql
Expand Up @@ -86,9 +86,9 @@ CREATE MATERIALIZED VIEW max_mat_view_date
GROUP BY 1 WITH NO DATA;

\set ON_ERROR_STOP 0
SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 days'::interval, 10 , '1 day'::interval);
SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 days', 10, '1 day'::interval);
\set ON_ERROR_STOP 1
SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 day'::interval, '1 day'::interval , '1 day'::interval) as job_id \gset
SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 days', '1 day', '1 day'::interval) as job_id \gset
SELECT config FROM _timescaledb_config.bgw_job
WHERE id = :job_id;

Expand All @@ -106,7 +106,7 @@ CREATE MATERIALIZED VIEW max_mat_view_timestamp
FROM continuous_agg_timestamp
GROUP BY 1 WITH NO DATA;

SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '10 day'::interval, '1 h'::interval , '1 h'::interval) as job_id \gset
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '10 day', '1 h'::interval , '1 h'::interval) as job_id \gset
CALL run_job(:job_id);

SELECT config FROM _timescaledb_config.bgw_job
Expand All @@ -120,7 +120,8 @@ WHERE id = :job_id;
SET ROLE :ROLE_DEFAULT_PERM_USER;
SELECT config FROM _timescaledb_config.bgw_job where id = :job_id;
\set ON_ERROR_STOP 0
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '10 day'::interval, '1 day'::interval, '1h'::interval, if_not_exists=>true);
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '10 day', '1 day', '1h'::interval, if_not_exists=>true);
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', 'xyz', '1 day', '1h'::interval, if_not_exists=>true);
\set ON_ERROR_STOP 1

DROP MATERIALIZED VIEW max_mat_view_timestamp;
Expand Down Expand Up @@ -149,8 +150,17 @@ CALL run_job(:job_id);
SELECT * FROM mat_smallint;

\set ON_ERROR_STOP 0
SELECT add_continuous_aggregate_policy('mat_smallint', 15, 10, '1h'::interval, if_not_exists=>true);
SELECT add_continuous_aggregate_policy('mat_smallint', 15::smallint, 10::smallint, '1h'::interval, if_not_exists=>true);
\set ON_ERROR_STOP 1
DROP MATERIALIZED VIEW mat_smallint;

-- end of coverage tests

-- tests for interval argument convertions
--
\set ON_ERROR_STOP 0
SELECT add_continuous_aggregate_policy('mat_smallint', 15, 10, '1h'::interval, if_not_exists=>true);
SELECT add_continuous_aggregate_policy('mat_smallint', '15', 10, '1h'::interval, if_not_exists=>true);
SELECT add_continuous_aggregate_policy('mat_smallint', '15', '10', '1h'::interval, if_not_exists=>true);
\set ON_ERROR_STOP 1

DROP MATERIALIZED VIEW mat_smallint;