Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timezone support to time_bucket #4642

Merged
merged 1 commit into from Aug 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -11,6 +11,7 @@ accidentally triggering the load of a previous DB version.**
* #4393 Support intervals with day component when constifying now()
* #4397 Support intervals with month component when constifying now()
* #4641 Allow bucketing by month in time_bucket
* #4642 Add timezone support to time_bucket

**Bugfixes**
* #4416 Handle TRUNCATE TABLE on chunks
Expand Down
4 changes: 4 additions & 0 deletions sql/time_bucket.sql
Expand Up @@ -23,6 +23,10 @@ CREATE OR REPLACE FUNCTION @extschema@.time_bucket(bucket_width INTERVAL, ts TIM
CREATE OR REPLACE FUNCTION @extschema@.time_bucket(bucket_width INTERVAL, ts DATE, origin DATE) RETURNS DATE
AS '@MODULE_PATHNAME@', 'ts_date_bucket' LANGUAGE C IMMUTABLE PARALLEL SAFE STRICT;

-- bucketing with timezone
CREATE OR REPLACE FUNCTION @extschema@.time_bucket(bucket_width INTERVAL, ts TIMESTAMPTZ, timezone TEXT, origin TIMESTAMPTZ DEFAULT NULL, "offset" INTERVAL DEFAULT NULL) RETURNS TIMESTAMPTZ
AS '@MODULE_PATHNAME@', 'ts_timestamptz_timezone_bucket' LANGUAGE C IMMUTABLE PARALLEL SAFE;

-- bucketing of int
CREATE OR REPLACE FUNCTION @extschema@.time_bucket(bucket_width SMALLINT, ts SMALLINT) RETURNS SMALLINT
AS '@MODULE_PATHNAME@', 'ts_int16_bucket' LANGUAGE C IMMUTABLE PARALLEL SAFE STRICT;
Expand Down
4 changes: 3 additions & 1 deletion sql/updates/reverse-dev.sql
Expand Up @@ -154,7 +154,6 @@ GRANT SELECT ON _timescaledb_catalog.chunk TO PUBLIC;

-- end recreate _timescaledb_catalog.chunk table --


ALTER TABLE _timescaledb_internal.bgw_job_stat
DROP CONSTRAINT bgw_job_stat_job_id_fkey;
ALTER TABLE _timescaledb_internal.bgw_policy_chunk_stats
Expand Down Expand Up @@ -246,3 +245,6 @@ CREATE FUNCTION @extschema@.alter_job(
RETURNS TABLE (job_id INTEGER, schedule_interval INTERVAL, max_runtime INTERVAL, max_retries INTEGER, retry_period INTERVAL, scheduled BOOL, config JSONB, next_start TIMESTAMPTZ)
AS '@MODULE_PATHNAME@', 'ts_job_alter'
LANGUAGE C VOLATILE;

DROP FUNCTION @extschema@.time_bucket(INTERVAL, TIMESTAMPTZ, TEXT, TIMESTAMPTZ, INTERVAL);

5 changes: 5 additions & 0 deletions src/compat/compat.h
Expand Up @@ -355,6 +355,11 @@ get_reindex_options(ReindexStmt *stmt)
#define list_make5_int(x1, x2, x3, x4, x5) lappend_int(list_make4_int(x1, x2, x3, x4), x5)
#endif

/*
* define lfifth macro for convenience
*/
#define lfifth(l) lfirst(list_nth_cell(l, 4))

/* PG13 removes the natts parameter from map_variable_attnos */
#if PG13_LT
#define map_variable_attnos_compat(node, varno, sublevels_up, map, natts, rowtype, found_wholerow) \
Expand Down
27 changes: 27 additions & 0 deletions src/func_cache.c
Expand Up @@ -20,6 +20,7 @@
#include <utils/selfuncs.h>
#include <utils/syscache.h>

#include "compat/compat.h"
#include "utils.h"
#include "cache.h"
#include "func_cache.h"
Expand Down Expand Up @@ -108,6 +109,22 @@ time_bucket_sort_transform(FuncExpr *func)
return do_sort_transform(func);
}

/*
* time_bucket with timezone will always have 5 args. For the sort
* optimization to apply all args need to be Const except timestamp.
*/
static Expr *
time_bucket_tz_sort_transform(FuncExpr *func)
{
Assert(list_length(func->args) == 5);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like the tests are missing.


if (!IsA(linitial((func)->args), Const) || !IsA(lthird(func->args), Const) ||
!IsA(lfourth(func->args), Const) || !IsA(lfifth(func->args), Const))
return (Expr *) func;

return do_sort_transform(func);
}

/* For time_bucket this estimate currently works by seeing how many possible
* buckets there will be if the data spans the entire hypertable. Note that
* this is an overestimate.
Expand Down Expand Up @@ -295,6 +312,16 @@ static FuncInfo funcinfo[] = {
.group_estimate = time_bucket_group_estimate,
.sort_transform = time_bucket_sort_transform,
},
{
.origin = ORIGIN_TIMESCALE,
.is_bucketing_func = true,
.allowed_in_cagg_definition = true,
.funcname = "time_bucket",
.nargs = 5,
.arg_types = { INTERVALOID, TIMESTAMPTZOID, TEXTOID, TIMESTAMPTZOID, INTERVALOID },
.group_estimate = time_bucket_group_estimate,
.sort_transform = time_bucket_tz_sort_transform,
},
{
.origin = ORIGIN_TIMESCALE_EXPERIMENTAL,
.is_bucketing_func = true,
Expand Down
7 changes: 5 additions & 2 deletions src/planner/expand_hypertable.c
Expand Up @@ -434,6 +434,9 @@ transform_time_bucket_comparison(PlannerInfo *root, OpExpr *op)
{
Interval *interval = DatumGetIntervalP(width->constvalue);

/*
* Optimization can't be applied when interval has month component.
*/
if (interval->month != 0)
return op;

Expand Down Expand Up @@ -466,7 +469,7 @@ transform_time_bucket_comparison(PlannerInfo *root, OpExpr *op)
Assert(width->consttype == INTERVALOID);

/*
* intervals with month component are not supported by time_bucket
* Optimization can't be applied when interval has month component.
*/
if (interval->month != 0)
return op;
Expand Down Expand Up @@ -513,7 +516,7 @@ transform_time_bucket_comparison(PlannerInfo *root, OpExpr *op)
Assert(width->consttype == INTERVALOID);

/*
* intervals with month component are not supported by time_bucket
* Optimization can't be applied when interval has month component.
*/
if (interval->month != 0)
return op;
Expand Down
58 changes: 58 additions & 0 deletions src/time_bucket.c
Expand Up @@ -281,6 +281,64 @@ ts_timestamptz_bucket(PG_FUNCTION_ARGS)
}
}

TS_FUNCTION_INFO_V1(ts_timestamptz_timezone_bucket);

/*
* time_bucket(bucket_width INTERVAL, ts TIMESTAMPTZ, timezone TEXT, origin TIMESTAMPTZ DEFAULT
* NULL, "offset" INTERVAL DEFAULT NULL) RETURNS TIMESTAMPTZ
*/
TSDLLEXPORT Datum
ts_timestamptz_timezone_bucket(PG_FUNCTION_ARGS)
{
Datum period = PG_GETARG_DATUM(0);
Datum timestamp = PG_GETARG_DATUM(1);
Datum tzname = PG_GETARG_DATUM(2);

/*
* When called from SQL we will always have 5 args because default values
* will be filled in for missing arguments. When called from C with
* DirectFunctionCall number of arguments might be less than 5.
*/
bool have_origin = PG_NARGS() > 3 && !PG_ARGISNULL(3);
bool have_offset = PG_NARGS() > 4 && !PG_ARGISNULL(4);

/*
* We need to check for NULL arguments here because the function cannot be
* defined STRICT due to the optional arguments.
*/
if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2))
PG_RETURN_NULL();

/* Convert to local timestamp according to timezone */
timestamp = DirectFunctionCall2(timestamptz_zone, tzname, timestamp);
if (have_offset)
{
/* Apply offset. */
timestamp = DirectFunctionCall2(timestamp_mi_interval, timestamp, PG_GETARG_DATUM(4));
svenklemm marked this conversation as resolved.
Show resolved Hide resolved
}

if (have_origin)
{
Datum origin = DirectFunctionCall2(timestamptz_zone, tzname, PG_GETARG_DATUM(3));
timestamp = DirectFunctionCall3(ts_timestamp_bucket, period, timestamp, origin);
}
else
{
timestamp = DirectFunctionCall2(ts_timestamp_bucket, period, timestamp);
}

if (have_offset)
{
/* Remove offset. */
timestamp = DirectFunctionCall2(timestamp_pl_interval, timestamp, PG_GETARG_DATUM(4));
}

/* Convert back to timezone */
timestamp = DirectFunctionCall2(timestamp_zone, tzname, timestamp);

PG_RETURN_DATUM(timestamp);
}

static inline void
check_period_is_daily(int64 period)
{
Expand Down
1 change: 1 addition & 0 deletions src/time_bucket.h
Expand Up @@ -17,6 +17,7 @@ extern TSDLLEXPORT Datum ts_int64_bucket(PG_FUNCTION_ARGS);
extern TSDLLEXPORT Datum ts_date_bucket(PG_FUNCTION_ARGS);
extern TSDLLEXPORT Datum ts_timestamp_bucket(PG_FUNCTION_ARGS);
extern TSDLLEXPORT Datum ts_timestamptz_bucket(PG_FUNCTION_ARGS);
extern TSDLLEXPORT Datum ts_timestamptz_timezone_bucket(PG_FUNCTION_ARGS);
extern TSDLLEXPORT int64 ts_time_bucket_by_type(int64 interval, int64 timestamp, Oid type);
extern TSDLLEXPORT Datum ts_time_bucket_ng_date(PG_FUNCTION_ARGS);
extern TSDLLEXPORT Datum ts_time_bucket_ng_timestamp(PG_FUNCTION_ARGS);
Expand Down
94 changes: 68 additions & 26 deletions src/ts_catalog/continuous_agg.c
Expand Up @@ -1541,46 +1541,88 @@ ts_continuous_agg_bucket_width(const ContinuousAgg *agg)
* a common procedure used by ts_compute_* below.
*/
static Datum
generic_time_bucket_ng(const ContinuousAggsBucketFunction *bf, Datum timestamp)
generic_time_bucket(const ContinuousAggsBucketFunction *bf, Datum timestamp)
{
/* bf->timezone can't be NULL. If timezone is not specified, "" is stored */
Assert(bf->timezone != NULL);

if (strlen(bf->timezone) > 0)
if (!bf->experimental)
{
if (strlen(bf->timezone) > 0)
{
if (TIMESTAMP_NOT_FINITE(bf->origin))
{
/* using default origin */
return DirectFunctionCall3(ts_timestamptz_timezone_bucket,
IntervalPGetDatum(bf->bucket_width),
timestamp,
CStringGetTextDatum(bf->timezone));
}
else
{
/* custom origin specified */
return DirectFunctionCall4(ts_timestamptz_timezone_bucket,
IntervalPGetDatum(bf->bucket_width),
timestamp,
CStringGetTextDatum(bf->timezone),
TimestampTzGetDatum((TimestampTz) bf->origin));
}
}

if (TIMESTAMP_NOT_FINITE(bf->origin))
{
/* using default origin */
return DirectFunctionCall3(ts_time_bucket_ng_timezone,
return DirectFunctionCall2(ts_timestamp_bucket,
IntervalPGetDatum(bf->bucket_width),
timestamp,
CStringGetTextDatum(bf->timezone));
timestamp);
}
else
{
/* custom origin specified */
return DirectFunctionCall4(ts_time_bucket_ng_timezone_origin,
return DirectFunctionCall3(ts_timestamp_bucket,
IntervalPGetDatum(bf->bucket_width),
timestamp,
TimestampTzGetDatum((TimestampTz) bf->origin),
CStringGetTextDatum(bf->timezone));
TimestampGetDatum(bf->origin));
}
}

if (TIMESTAMP_NOT_FINITE(bf->origin))
{
/* using default origin */
return DirectFunctionCall2(ts_time_bucket_ng_timestamp,
IntervalPGetDatum(bf->bucket_width),
timestamp);
}
else
{
/* custom origin specified */
return DirectFunctionCall3(ts_time_bucket_ng_timestamp,
IntervalPGetDatum(bf->bucket_width),
timestamp,
TimestampGetDatum(bf->origin));
if (strlen(bf->timezone) > 0)
{
if (TIMESTAMP_NOT_FINITE(bf->origin))
{
/* using default origin */
return DirectFunctionCall3(ts_time_bucket_ng_timezone,
IntervalPGetDatum(bf->bucket_width),
timestamp,
CStringGetTextDatum(bf->timezone));
}
else
{
/* custom origin specified */
return DirectFunctionCall4(ts_time_bucket_ng_timezone_origin,
IntervalPGetDatum(bf->bucket_width),
timestamp,
TimestampTzGetDatum((TimestampTz) bf->origin),
CStringGetTextDatum(bf->timezone));
}
}

if (TIMESTAMP_NOT_FINITE(bf->origin))
{
/* using default origin */
return DirectFunctionCall2(ts_time_bucket_ng_timestamp,
IntervalPGetDatum(bf->bucket_width),
timestamp);
}
else
{
/* custom origin specified */
return DirectFunctionCall3(ts_time_bucket_ng_timestamp,
IntervalPGetDatum(bf->bucket_width),
timestamp,
TimestampGetDatum(bf->origin));
}
}
}

Expand Down Expand Up @@ -1650,8 +1692,8 @@ ts_compute_inscribed_bucketed_refresh_window_variable(int64 *start, int64 *end,
start_old = ts_internal_to_time_value(*start, TIMESTAMPOID);
end_old = ts_internal_to_time_value(*end, TIMESTAMPOID);

start_new = generic_time_bucket_ng(bf, start_old);
end_new = generic_time_bucket_ng(bf, end_old);
start_new = generic_time_bucket(bf, start_old);
end_new = generic_time_bucket(bf, end_old);

if (DatumGetTimestamp(start_new) != DatumGetTimestamp(start_old))
{
Expand Down Expand Up @@ -1684,8 +1726,8 @@ ts_compute_circumscribed_bucketed_refresh_window_variable(int64 *start, int64 *e
*/
start_old = ts_internal_to_time_value(*start, TIMESTAMPOID);
end_old = ts_internal_to_time_value(*end, TIMESTAMPOID);
start_new = generic_time_bucket_ng(bf, start_old);
end_new = generic_time_bucket_ng(bf, end_old);
start_new = generic_time_bucket(bf, start_old);
end_new = generic_time_bucket(bf, end_old);

if (DatumGetTimestamp(end_new) != DatumGetTimestamp(end_old))
{
Expand Down Expand Up @@ -1716,7 +1758,7 @@ ts_compute_beginning_of_the_next_bucket_variable(int64 timeval,
*/
val_old = ts_internal_to_time_value(timeval, TIMESTAMPOID);

val_new = generic_time_bucket_ng(bf, val_old);
val_new = generic_time_bucket(bf, val_old);
val_new = generic_add_interval(bf, val_new);
return ts_time_value_to_internal(val_new, TIMESTAMPOID);
}