Skip to content

Commit

Permalink
USE C function for time_bucket() offset
Browse files Browse the repository at this point in the history
Instead of using SQL UDFs to handle the offset parameter,
add the C functions ts_timestamp/timestamptz/date_offset_bucket(),
which handle the offset directly.
  • Loading branch information
Sachin authored and SachinSetiya committed Nov 28, 2022
1 parent 9b3b571 commit 8a2a9b0
Show file tree
Hide file tree
Showing 9 changed files with 179 additions and 37 deletions.
28 changes: 8 additions & 20 deletions sql/time_bucket.sql
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ CREATE OR REPLACE FUNCTION @extschema@.time_bucket(bucket_width INTERVAL, ts TIM
CREATE OR REPLACE FUNCTION @extschema@.time_bucket(bucket_width INTERVAL, ts DATE, origin DATE) RETURNS DATE
AS '@MODULE_PATHNAME@', 'ts_date_bucket' LANGUAGE C IMMUTABLE PARALLEL SAFE STRICT;

-- Bucketing with an offset: the bucket alignment is shifted by "offset".
-- These overloads are backed by C functions (ts_*_offset_bucket) instead
-- of SQL UDF wrappers; STRICT means a NULL argument yields NULL without
-- calling into C.
CREATE OR REPLACE FUNCTION @extschema@.time_bucket(bucket_width INTERVAL, ts TIMESTAMP, "offset" INTERVAL) RETURNS TIMESTAMP
AS '@MODULE_PATHNAME@', 'ts_timestamp_offset_bucket' LANGUAGE C IMMUTABLE PARALLEL SAFE STRICT;
CREATE OR REPLACE FUNCTION @extschema@.time_bucket(bucket_width INTERVAL, ts TIMESTAMPTZ, "offset" INTERVAL) RETURNS TIMESTAMPTZ
AS '@MODULE_PATHNAME@', 'ts_timestamptz_offset_bucket' LANGUAGE C IMMUTABLE PARALLEL SAFE STRICT;
CREATE OR REPLACE FUNCTION @extschema@.time_bucket(bucket_width INTERVAL, ts DATE, "offset" INTERVAL) RETURNS DATE
AS '@MODULE_PATHNAME@', 'ts_date_offset_bucket' LANGUAGE C IMMUTABLE PARALLEL SAFE STRICT;

-- bucketing with timezone
CREATE OR REPLACE FUNCTION @extschema@.time_bucket(bucket_width INTERVAL, ts TIMESTAMPTZ, timezone TEXT, origin TIMESTAMPTZ DEFAULT NULL, "offset" INTERVAL DEFAULT NULL) RETURNS TIMESTAMPTZ
AS '@MODULE_PATHNAME@', 'ts_timestamptz_timezone_bucket' LANGUAGE C IMMUTABLE PARALLEL SAFE;
Expand All @@ -42,23 +50,3 @@ CREATE OR REPLACE FUNCTION @extschema@.time_bucket(bucket_width INT, ts INT, "of
AS '@MODULE_PATHNAME@', 'ts_int32_bucket' LANGUAGE C IMMUTABLE PARALLEL SAFE STRICT;
CREATE OR REPLACE FUNCTION @extschema@.time_bucket(bucket_width BIGINT, ts BIGINT, "offset" BIGINT) RETURNS BIGINT
AS '@MODULE_PATHNAME@', 'ts_int64_bucket' LANGUAGE C IMMUTABLE PARALLEL SAFE STRICT;

-- If an interval is given as the third argument, the bucket alignment is offset by the interval.
-- (These SQL UDF implementations are superseded by the C-backed overloads
-- ts_timestamp/timestamptz/date_offset_bucket declared earlier in this file.)
CREATE OR REPLACE FUNCTION @extschema@.time_bucket(bucket_width INTERVAL, ts TIMESTAMP, "offset" INTERVAL)
RETURNS TIMESTAMP LANGUAGE SQL IMMUTABLE PARALLEL SAFE STRICT AS
$BODY$
SELECT @extschema@.time_bucket(bucket_width, ts OPERATOR(pg_catalog.-) "offset") OPERATOR(pg_catalog.+) "offset";
$BODY$;

CREATE OR REPLACE FUNCTION @extschema@.time_bucket(bucket_width INTERVAL, ts TIMESTAMPTZ, "offset" INTERVAL)
RETURNS TIMESTAMPTZ LANGUAGE SQL IMMUTABLE PARALLEL SAFE STRICT AS
$BODY$
SELECT @extschema@.time_bucket(bucket_width, ts OPERATOR(pg_catalog.-) "offset") OPERATOR(pg_catalog.+) "offset";
$BODY$;

CREATE OR REPLACE FUNCTION @extschema@.time_bucket(bucket_width INTERVAL, ts DATE, "offset" INTERVAL)
RETURNS DATE LANGUAGE SQL IMMUTABLE PARALLEL SAFE STRICT AS
$BODY$
SELECT (@extschema@.time_bucket(bucket_width, ts OPERATOR(pg_catalog.-) "offset") OPERATOR(pg_catalog.+) "offset")::pg_catalog.date;
$BODY$;

30 changes: 30 additions & 0 deletions src/func_cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,16 @@ static FuncInfo funcinfo[] = {
.group_estimate = time_bucket_group_estimate,
.sort_transform = time_bucket_sort_transform,
},
{
/* time_bucket(INTERVAL, TIMESTAMP, INTERVAL) — the offset overload,
 * mapped to ts_timestamp_offset_bucket in sql/time_bucket.sql. */
.origin = ORIGIN_TIMESCALE,
.is_bucketing_func = true,
.allowed_in_cagg_definition = false,
.funcname = "time_bucket",
.nargs = 3,
.arg_types = { INTERVALOID, TIMESTAMPOID, INTERVALOID },
.group_estimate = time_bucket_group_estimate,
.sort_transform = time_bucket_sort_transform,
},
{
.origin = ORIGIN_TIMESCALE,
.is_bucketing_func = true,
Expand All @@ -234,6 +244,16 @@ static FuncInfo funcinfo[] = {
.group_estimate = time_bucket_group_estimate,
.sort_transform = time_bucket_sort_transform,
},
{
/* time_bucket(INTERVAL, TIMESTAMPTZ, INTERVAL) — the offset overload,
 * mapped to ts_timestamptz_offset_bucket in sql/time_bucket.sql. */
.origin = ORIGIN_TIMESCALE,
.is_bucketing_func = true,
.allowed_in_cagg_definition = false,
.funcname = "time_bucket",
.nargs = 3,
.arg_types = { INTERVALOID, TIMESTAMPTZOID, INTERVALOID },
.group_estimate = time_bucket_group_estimate,
.sort_transform = time_bucket_sort_transform,
},
{
.origin = ORIGIN_TIMESCALE,
.is_bucketing_func = true,
Expand All @@ -254,6 +274,16 @@ static FuncInfo funcinfo[] = {
.group_estimate = time_bucket_group_estimate,
.sort_transform = time_bucket_sort_transform,
},
{
/* time_bucket(INTERVAL, DATE, INTERVAL) — the offset overload,
 * mapped to ts_date_offset_bucket in sql/time_bucket.sql. */
.origin = ORIGIN_TIMESCALE,
.is_bucketing_func = true,
.allowed_in_cagg_definition = false,
.funcname = "time_bucket",
.nargs = 3,
.arg_types = { INTERVALOID, DATEOID, INTERVALOID },
.group_estimate = time_bucket_group_estimate,
.sort_transform = time_bucket_sort_transform,
},
{
.origin = ORIGIN_TIMESCALE,
.is_bucketing_func = true,
Expand Down
62 changes: 62 additions & 0 deletions src/time_bucket.c
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,26 @@ ts_timestamp_bucket(PG_FUNCTION_ARGS)
}
}

TS_FUNCTION_INFO_V1(ts_timestamp_offset_bucket);

/*
 * time_bucket(bucket_width INTERVAL, ts TIMESTAMP, "offset" INTERVAL).
 *
 * Buckets by shifting the input back by "offset", bucketing the shifted
 * value with ts_timestamp_bucket, then shifting the result forward again,
 * so bucket boundaries land "offset" past the unshifted boundaries.
 * -infinity/+infinity pass through unchanged. The SQL declaration of this
 * overload is STRICT, so NULL arguments never reach this function.
 */
TSDLLEXPORT Datum
ts_timestamp_offset_bucket(PG_FUNCTION_ARGS)
{
Datum period = PG_GETARG_DATUM(0);
Datum timestamp = PG_GETARG_DATUM(1);

if (TIMESTAMP_NOT_FINITE(DatumGetTimestamp(timestamp)))
PG_RETURN_DATUM(timestamp);

/* Shift the input back by the offset before bucketing. */
timestamp = DirectFunctionCall2(timestamp_mi_interval, timestamp, PG_GETARG_DATUM(2));
timestamp = DirectFunctionCall2(ts_timestamp_bucket, period, timestamp);

/* Shift the bucket start forward again to restore the offset. */
timestamp = DirectFunctionCall2(timestamp_pl_interval, timestamp, PG_GETARG_DATUM(2));
PG_RETURN_DATUM(timestamp);
}

TS_FUNCTION_INFO_V1(ts_timestamptz_bucket);

TSDLLEXPORT Datum
Expand Down Expand Up @@ -281,6 +301,26 @@ ts_timestamptz_bucket(PG_FUNCTION_ARGS)
}
}

TS_FUNCTION_INFO_V1(ts_timestamptz_offset_bucket);

/*
 * time_bucket(bucket_width INTERVAL, ts TIMESTAMPTZ, "offset" INTERVAL).
 *
 * Same shift/bucket/unshift scheme as the TIMESTAMP variant, using the
 * timestamptz interval arithmetic (timestamptz_mi_interval /
 * timestamptz_pl_interval) and ts_timestamptz_bucket for the bucketing.
 * -infinity/+infinity pass through unchanged; the SQL declaration is
 * STRICT, so NULLs never reach here.
 */
TSDLLEXPORT Datum
ts_timestamptz_offset_bucket(PG_FUNCTION_ARGS)
{
Datum period = PG_GETARG_DATUM(0);
Datum timestamp = PG_GETARG_DATUM(1);

if (TIMESTAMP_NOT_FINITE(DatumGetTimestampTz(timestamp)))
PG_RETURN_DATUM(timestamp);

/* Shift the input back by the offset before bucketing. */
timestamp = DirectFunctionCall2(timestamptz_mi_interval, timestamp, PG_GETARG_DATUM(2));
timestamp = DirectFunctionCall2(ts_timestamptz_bucket, period, timestamp);

/* Shift the bucket start forward again to restore the offset. */
timestamp = DirectFunctionCall2(timestamptz_pl_interval, timestamp, PG_GETARG_DATUM(2));
PG_RETURN_DATUM(timestamp);
}

TS_FUNCTION_INFO_V1(ts_timestamptz_timezone_bucket);

/*
Expand Down Expand Up @@ -402,6 +442,28 @@ ts_date_bucket(PG_FUNCTION_ARGS)
}
}

TS_FUNCTION_INFO_V1(ts_date_offset_bucket);

/*
 * time_bucket(bucket_width INTERVAL, ts DATE, "offset" INTERVAL).
 *
 * Same shift/bucket/unshift scheme as the timestamp variants, except that
 * DATE +/- INTERVAL arithmetic yields a TIMESTAMP, so the intermediate
 * result is cast back to DATE with timestamp_date on both sides.
 * -infinity/+infinity dates pass through unchanged; the SQL declaration
 * is STRICT, so NULLs never reach here.
 *
 * NOTE(review): timestamp_date truncates the shifted value before
 * ts_date_bucket runs, so any sub-day component of "offset" is discarded
 * at that step; the removed SQL wrapper bucketed the intermediate
 * timestamp instead. Confirm this is intended for offsets that are not
 * whole days.
 */
TSDLLEXPORT Datum
ts_date_offset_bucket(PG_FUNCTION_ARGS)
{
Datum period = PG_GETARG_DATUM(0);
Datum date = PG_GETARG_DATUM(1);

if (DATE_NOT_FINITE(DatumGetDateADT(date)))
PG_RETURN_DATUM(date);

/* Shift the input back by the offset, then truncate to DATE. */
Datum time = DirectFunctionCall2(date_mi_interval, date, PG_GETARG_DATUM(2));
date = DirectFunctionCall1(timestamp_date, time);
date = DirectFunctionCall2(ts_date_bucket, period, date);

/* Shift forward again and truncate back to DATE. */
time = DirectFunctionCall2(date_pl_interval, date, PG_GETARG_DATUM(2));
date = DirectFunctionCall1(timestamp_date, time);
PG_RETURN_DATUM(date);
}

/* when working with time_buckets stored in our catalog, we may not know ahead of time which
* bucketing function to use, this function dynamically dispatches to the correct time_bucket_<foo>
* based on an inputted timestamp_type*/
Expand Down
4 changes: 2 additions & 2 deletions test/expected/plan_hashagg.out
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,9 @@ ORDER BY MetricMinuteTs DESC;
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Sort
Sort Key: ((time_bucket('@ 1 min'::interval, (_hyper_1_1_chunk."time" - '@ 30 secs'::interval)) + '@ 30 secs'::interval)) DESC
Sort Key: (time_bucket('@ 1 min'::interval, _hyper_1_1_chunk."time", '@ 30 secs'::interval)) DESC
-> HashAggregate
Group Key: (time_bucket('@ 1 min'::interval, (_hyper_1_1_chunk."time" - '@ 30 secs'::interval)) + '@ 30 secs'::interval)
Group Key: time_bucket('@ 1 min'::interval, _hyper_1_1_chunk."time", '@ 30 secs'::interval)
-> Result
-> Seq Scan on _hyper_1_1_chunk
Filter: (("time" >= 'Thu Jan 04 00:00:00 2001'::timestamp without time zone) AND ("time" <= 'Fri Jan 05 01:00:00 2001'::timestamp without time zone))
Expand Down
11 changes: 6 additions & 5 deletions test/expected/query-12.out
Original file line number Diff line number Diff line change
Expand Up @@ -286,14 +286,15 @@ BEGIN;

:PREFIX SELECT time_bucket('1 minute', time, INTERVAL '30 seconds') t, avg(series_0), min(series_1), trunc(avg(series_2)::numeric,5)
FROM hyper_1 GROUP BY t ORDER BY t DESC limit 2;
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN
---------------------------------------------------------------------------------------------------
Limit
-> GroupAggregate
Group Key: ((time_bucket('@ 1 min'::interval, (_hyper_1_1_chunk."time" - '@ 30 secs'::interval)) + '@ 30 secs'::interval))
-> Result
Group Key: (time_bucket('@ 1 min'::interval, hyper_1."time", '@ 30 secs'::interval))
-> Custom Scan (ChunkAppend) on hyper_1
Order: time_bucket('@ 1 min'::interval, hyper_1."time", '@ 30 secs'::interval) DESC
-> Index Scan using _hyper_1_1_chunk_time_plain on _hyper_1_1_chunk
(5 rows)
(6 rows)

:PREFIX SELECT time_bucket('1 minute', time - INTERVAL '30 seconds') t, avg(series_0), min(series_1), trunc(avg(series_2)::numeric,5)
FROM hyper_1 GROUP BY t ORDER BY t DESC limit 2;
Expand Down
11 changes: 6 additions & 5 deletions test/expected/query-13.out
Original file line number Diff line number Diff line change
Expand Up @@ -286,14 +286,15 @@ BEGIN;

:PREFIX SELECT time_bucket('1 minute', time, INTERVAL '30 seconds') t, avg(series_0), min(series_1), trunc(avg(series_2)::numeric,5)
FROM hyper_1 GROUP BY t ORDER BY t DESC limit 2;
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN
---------------------------------------------------------------------------------------------------
Limit
-> GroupAggregate
Group Key: ((time_bucket('@ 1 min'::interval, (_hyper_1_1_chunk."time" - '@ 30 secs'::interval)) + '@ 30 secs'::interval))
-> Result
Group Key: (time_bucket('@ 1 min'::interval, hyper_1."time", '@ 30 secs'::interval))
-> Custom Scan (ChunkAppend) on hyper_1
Order: time_bucket('@ 1 min'::interval, hyper_1."time", '@ 30 secs'::interval) DESC
-> Index Scan using _hyper_1_1_chunk_time_plain on _hyper_1_1_chunk
(5 rows)
(6 rows)

:PREFIX SELECT time_bucket('1 minute', time - INTERVAL '30 seconds') t, avg(series_0), min(series_1), trunc(avg(series_2)::numeric,5)
FROM hyper_1 GROUP BY t ORDER BY t DESC limit 2;
Expand Down
11 changes: 6 additions & 5 deletions test/expected/query-14.out
Original file line number Diff line number Diff line change
Expand Up @@ -286,14 +286,15 @@ BEGIN;

:PREFIX SELECT time_bucket('1 minute', time, INTERVAL '30 seconds') t, avg(series_0), min(series_1), trunc(avg(series_2)::numeric,5)
FROM hyper_1 GROUP BY t ORDER BY t DESC limit 2;
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------
QUERY PLAN
---------------------------------------------------------------------------------------------------
Limit
-> GroupAggregate
Group Key: ((time_bucket('@ 1 min'::interval, (_hyper_1_1_chunk."time" - '@ 30 secs'::interval)) + '@ 30 secs'::interval))
-> Result
Group Key: (time_bucket('@ 1 min'::interval, hyper_1."time", '@ 30 secs'::interval))
-> Custom Scan (ChunkAppend) on hyper_1
Order: time_bucket('@ 1 min'::interval, hyper_1."time", '@ 30 secs'::interval) DESC
-> Index Scan using _hyper_1_1_chunk_time_plain on _hyper_1_1_chunk
(5 rows)
(6 rows)

:PREFIX SELECT time_bucket('1 minute', time - INTERVAL '30 seconds') t, avg(series_0), min(series_1), trunc(avg(series_2)::numeric,5)
FROM hyper_1 GROUP BY t ORDER BY t DESC limit 2;
Expand Down
37 changes: 37 additions & 0 deletions test/expected/timestamp.out
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,43 @@ FROM unnest(ARRAY[
Sun Jan 02 01:08:00 2011 | Sun Jan 02 01:08:00 2011
(4 rows)

--offset with infinity
-- timestamp
SELECT time, time_bucket(INTERVAL '1 week', time, INTERVAL '1 day')
FROM unnest(ARRAY[
timestamp '-Infinity',
timestamp 'Infinity'
]) AS time;
time | time_bucket
-----------+-------------
-infinity | -infinity
infinity | infinity
(2 rows)

-- timestamptz
SELECT time, time_bucket(INTERVAL '1 week', time, INTERVAL '1 day')
FROM unnest(ARRAY[
timestamp with time zone '-Infinity',
timestamp with time zone 'Infinity'
]) AS time;
time | time_bucket
-----------+-------------
-infinity | -infinity
infinity | infinity
(2 rows)

-- Date
SELECT date, time_bucket(INTERVAL '1 week', date, INTERVAL '1 day')
FROM unnest(ARRAY[
date '-Infinity',
date 'Infinity'
]) AS date;
date | time_bucket
-----------+-------------
-infinity | -infinity
infinity | infinity
(2 rows)

--example to align with an origin
SELECT time, time_bucket(INTERVAL '5 minute', time - (TIMESTAMP '2011-01-02 00:02:00' - TIMESTAMP 'epoch')) + (TIMESTAMP '2011-01-02 00:02:00'-TIMESTAMP 'epoch')
FROM unnest(ARRAY[
Expand Down
22 changes: 22 additions & 0 deletions test/sql/timestamp.sql
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,28 @@ FROM unnest(ARRAY[
TIMESTAMP '2011-01-02 01:08:00'
]) AS time;

--offset with infinity
-- timestamp
SELECT time, time_bucket(INTERVAL '1 week', time, INTERVAL '1 day')
FROM unnest(ARRAY[
timestamp '-Infinity',
timestamp 'Infinity'
]) AS time;

-- timestamptz
SELECT time, time_bucket(INTERVAL '1 week', time, INTERVAL '1 day')
FROM unnest(ARRAY[
timestamp with time zone '-Infinity',
timestamp with time zone 'Infinity'
]) AS time;

-- Date
SELECT date, time_bucket(INTERVAL '1 week', date, INTERVAL '1 day')
FROM unnest(ARRAY[
date '-Infinity',
date 'Infinity'
]) AS date;

--example to align with an origin
SELECT time, time_bucket(INTERVAL '5 minute', time - (TIMESTAMP '2011-01-02 00:02:00' - TIMESTAMP 'epoch')) + (TIMESTAMP '2011-01-02 00:02:00'-TIMESTAMP 'epoch')
FROM unnest(ARRAY[
Expand Down

0 comments on commit 8a2a9b0

Please sign in to comment.