Skip to content

Commit

Permalink
Move int time_bucket functions with offset to C
Browse files Browse the repository at this point in the history
  • Loading branch information
svenklemm committed Nov 12, 2018
1 parent 92d4dbd commit f27c0a3
Show file tree
Hide file tree
Showing 11 changed files with 153 additions and 72 deletions.
29 changes: 9 additions & 20 deletions sql/time_bucket.sql
Expand Up @@ -24,15 +24,22 @@ CREATE OR REPLACE FUNCTION time_bucket(bucket_width INTERVAL, ts TIMESTAMPTZ, or
CREATE OR REPLACE FUNCTION time_bucket(bucket_width INTERVAL, ts DATE, origin DATE) RETURNS DATE
AS '@MODULE_PATHNAME@', 'ts_date_bucket' LANGUAGE C IMMUTABLE PARALLEL SAFE STRICT;

-- bucketing of int
CREATE OR REPLACE FUNCTION time_bucket(bucket_width SMALLINT, ts SMALLINT) RETURNS SMALLINT
AS '@MODULE_PATHNAME@', 'ts_int16_bucket' LANGUAGE C IMMUTABLE PARALLEL SAFE STRICT;

CREATE OR REPLACE FUNCTION time_bucket(bucket_width INT, ts INT) RETURNS INT
AS '@MODULE_PATHNAME@', 'ts_int32_bucket' LANGUAGE C IMMUTABLE PARALLEL SAFE STRICT;

CREATE OR REPLACE FUNCTION time_bucket(bucket_width BIGINT, ts BIGINT) RETURNS BIGINT
AS '@MODULE_PATHNAME@', 'ts_int64_bucket' LANGUAGE C IMMUTABLE PARALLEL SAFE STRICT;

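-- Bucketing of integer values with an offset. These overloads bind to the same
-- C functions as the two-argument variants above; the C implementation treats
-- a missing third argument as an offset of 0.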
CREATE OR REPLACE FUNCTION time_bucket(bucket_width SMALLINT, ts SMALLINT, "offset" SMALLINT) RETURNS SMALLINT
AS '@MODULE_PATHNAME@', 'ts_int16_bucket' LANGUAGE C IMMUTABLE PARALLEL SAFE STRICT;
CREATE OR REPLACE FUNCTION time_bucket(bucket_width INT, ts INT, "offset" INT) RETURNS INT
AS '@MODULE_PATHNAME@', 'ts_int32_bucket' LANGUAGE C IMMUTABLE PARALLEL SAFE STRICT;
CREATE OR REPLACE FUNCTION time_bucket(bucket_width BIGINT, ts BIGINT, "offset" BIGINT) RETURNS BIGINT
AS '@MODULE_PATHNAME@', 'ts_int64_bucket' LANGUAGE C IMMUTABLE PARALLEL SAFE STRICT;

-- If an interval is given as the third argument, the bucket alignment is offset by the interval.
CREATE OR REPLACE FUNCTION time_bucket(bucket_width INTERVAL, ts TIMESTAMP, "offset" INTERVAL)
RETURNS TIMESTAMP LANGUAGE SQL IMMUTABLE PARALLEL SAFE STRICT AS
Expand All @@ -52,21 +59,3 @@ $BODY$
SELECT (@extschema@.time_bucket(bucket_width, ts-"offset")+"offset")::date;
$BODY$;

CREATE OR REPLACE FUNCTION time_bucket(bucket_width SMALLINT, ts SMALLINT, "offset" SMALLINT)
RETURNS SMALLINT LANGUAGE SQL IMMUTABLE PARALLEL SAFE STRICT AS
$BODY$
SELECT @extschema@.time_bucket(bucket_width, ts-"offset")+"offset";
$BODY$;

CREATE OR REPLACE FUNCTION time_bucket(bucket_width INT, ts INT, "offset" INT)
RETURNS INT LANGUAGE SQL IMMUTABLE PARALLEL SAFE STRICT AS
$BODY$
SELECT @extschema@.time_bucket(bucket_width, ts-"offset")+"offset";
$BODY$;

CREATE OR REPLACE FUNCTION time_bucket(bucket_width BIGINT, ts BIGINT, "offset" BIGINT)
RETURNS BIGINT LANGUAGE SQL IMMUTABLE PARALLEL SAFE STRICT AS
$BODY$
SELECT @extschema@.time_bucket(bucket_width, ts-"offset")+"offset";
$BODY$;

42 changes: 42 additions & 0 deletions src/plan_add_hashagg.c
Expand Up @@ -74,41 +74,83 @@ static CustomEstimateForFunctionInfo custom_estimate_func_info[] =
.arg_types = {INTERVALOID, TIMESTAMPOID},
.custom_group_estimate_func = custom_group_estimate_time_bucket
},
{
.extension_function = true,
.function_name = "time_bucket",
.nargs = 3,
.arg_types = {INTERVALOID, TIMESTAMPOID, TIMESTAMPOID},
.custom_group_estimate_func = custom_group_estimate_time_bucket
},
{
.extension_function = true,
.function_name = "time_bucket",
.nargs = 2,
.arg_types = {INTERVALOID, TIMESTAMPTZOID},
.custom_group_estimate_func = custom_group_estimate_time_bucket,
},
{
.extension_function = true,
.function_name = "time_bucket",
.nargs = 3,
.arg_types = {INTERVALOID, TIMESTAMPTZOID, TIMESTAMPTZOID},
.custom_group_estimate_func = custom_group_estimate_time_bucket,
},
{
.extension_function = true,
.function_name = "time_bucket",
.nargs = 2,
.arg_types = {INTERVALOID, DATEOID},
.custom_group_estimate_func = custom_group_estimate_time_bucket,
},
{
.extension_function = true,
.function_name = "time_bucket",
.nargs = 3,
.arg_types = {INTERVALOID, DATEOID, DATEOID},
.custom_group_estimate_func = custom_group_estimate_time_bucket,
},
{
.extension_function = true,
.function_name = "time_bucket",
.nargs = 2,
.arg_types = {INT2OID, INT2OID},
.custom_group_estimate_func = custom_group_estimate_time_bucket,
},
{
.extension_function = true,
.function_name = "time_bucket",
.nargs = 3,
.arg_types = {INT2OID, INT2OID, INT2OID},
.custom_group_estimate_func = custom_group_estimate_time_bucket,
},
{
.extension_function = true,
.function_name = "time_bucket",
.nargs = 2,
.arg_types = {INT4OID, INT4OID},
.custom_group_estimate_func = custom_group_estimate_time_bucket,
},
{
.extension_function = true,
.function_name = "time_bucket",
.nargs = 3,
.arg_types = {INT4OID, INT4OID, INT4OID},
.custom_group_estimate_func = custom_group_estimate_time_bucket,
},
{
.extension_function = true,
.function_name = "time_bucket",
.nargs = 2,
.arg_types = {INT8OID, INT8OID},
.custom_group_estimate_func = custom_group_estimate_time_bucket,
},
{
.extension_function = true,
.function_name = "time_bucket",
.nargs = 3,
.arg_types = {INT8OID, INT8OID, INT8OID},
.custom_group_estimate_func = custom_group_estimate_time_bucket,
},
{
.function_name = "date_trunc",
.nargs = 2,
Expand Down
14 changes: 13 additions & 1 deletion src/sort_transform.c
Expand Up @@ -48,6 +48,13 @@ transform_date_trunc(FuncExpr *func)
return (Expr *) copyObject(second);
}

/*
* Check that time_bucket period is Const and if an offset is supplied
* that it is Const aswell
*/
#define time_bucket_has_const_period_and_offset(func) \
(IsA(linitial((func)->args), Const) && (list_length((func)->args) == 2 || IsA(lthird((func)->args), Const)))

static Expr *
transform_time_bucket(FuncExpr *func)
{
Expand All @@ -59,7 +66,12 @@ transform_time_bucket(FuncExpr *func)
*/
Expr *second;

if (list_length(func->args) != 2 || !IsA(linitial(func->args), Const))
Assert(list_length(func->args) >= 2);

/*
* If period and offset are not constants we must not do the optimization
*/
if (!time_bucket_has_const_period_and_offset(func))
return (Expr *) func;

second = sort_transform_expr(lsecond(func->args));
Expand Down
53 changes: 38 additions & 15 deletions src/time_bucket.c
Expand Up @@ -16,24 +16,38 @@
#include <utils/fmgrprotos.h>
#endif

#define TIME_BUCKET(period, timestamp, min, result) \
#define TIME_BUCKET(period, timestamp, offset, min, max, result) \
do \
{ \
if (period <= 0) \
ereport(ERROR, \
(errcode(ERRCODE_INVALID_PARAMETER_VALUE), \
errmsg("period must be greater then 0"))); \
*(result) = (timestamp / period) * period; \
if (timestamp < 0) \
if (timestamp % period) \
{ \
if (*(result) < min + period) \
ereport(ERROR, \
(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE), \
errmsg("timestamp out of range"))); \
else \
*(result) = *(result) - period; \
} \
if (offset != 0) \
{ \
/* We need to ensure that the timestamp is in range _after_ the */ \
/* offset is applied: when the offset is positive we need to make */ \
/* sure the resultant time is at least min, and when negative that */ \
/* it is no greater than max. */ \
offset = offset % period; \
if ((offset > 0 && timestamp < min + offset) || \
(offset < 0 && timestamp > max + offset)) \
ereport(ERROR, \
(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE), \
errmsg("timestamp out of range"))); \
timestamp -= offset; \
} \
result = (timestamp / period) * period; \
if (timestamp < 0 && timestamp % period) \
{ \
if (result < min + period) \
ereport(ERROR, \
(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE), \
errmsg("timestamp out of range"))); \
else \
result = result - period; \
} \
result += offset; \
} while (0)


Expand All @@ -42,8 +56,11 @@ Datum
ts_int16_bucket(PG_FUNCTION_ARGS)
{
int16 result;
int16 period = PG_GETARG_INT16(0);
int16 timestamp = PG_GETARG_INT16(1);
int16 offset = PG_NARGS() > 2 ? PG_GETARG_INT16(2) : 0;

TIME_BUCKET(PG_GETARG_INT16(0), PG_GETARG_INT16(1), PG_INT16_MIN, &result);
TIME_BUCKET(period, timestamp, offset, PG_INT16_MIN, PG_INT16_MAX, result);

PG_RETURN_INT16(result);
}
Expand All @@ -53,8 +70,11 @@ Datum
ts_int32_bucket(PG_FUNCTION_ARGS)
{
int32 result;
int32 period = PG_GETARG_INT32(0);
int32 timestamp = PG_GETARG_INT32(1);
int32 offset = PG_NARGS() > 2 ? PG_GETARG_INT32(2) : 0;

TIME_BUCKET(PG_GETARG_INT32(0), PG_GETARG_INT32(1), PG_INT32_MIN, &result);
TIME_BUCKET(period, timestamp, offset, PG_INT32_MIN, PG_INT32_MAX, result);

PG_RETURN_INT32(result);
}
Expand All @@ -64,8 +84,11 @@ Datum
ts_int64_bucket(PG_FUNCTION_ARGS)
{
int64 result;
int64 period = PG_GETARG_INT64(0);
int64 timestamp = PG_GETARG_INT64(1);
int64 offset = PG_NARGS() > 2 ? PG_GETARG_INT64(2) : 0;

TIME_BUCKET(PG_GETARG_INT64(0), PG_GETARG_INT64(1), PG_INT64_MIN, &result);
TIME_BUCKET(period, timestamp, offset, PG_INT64_MIN, PG_INT64_MAX, result);

PG_RETURN_INT64(result);
}
Expand Down
16 changes: 8 additions & 8 deletions test/expected/plan_hashagg_optimized-10.out
Expand Up @@ -205,13 +205,13 @@ ORDER BY MetricMinuteTs DESC;
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Sort
Sort Key: ((time_bucket('60'::bigint, (hyper.time_int - '10'::bigint)) + '10'::bigint)) DESC
Sort Key: (time_bucket('60'::bigint, hyper.time_int, '10'::bigint)) DESC
-> Finalize HashAggregate
Group Key: ((time_bucket('60'::bigint, (hyper.time_int - '10'::bigint)) + '10'::bigint))
Group Key: (time_bucket('60'::bigint, hyper.time_int, '10'::bigint))
-> Gather
Workers Planned: 2
-> Partial HashAggregate
Group Key: (time_bucket('60'::bigint, (hyper.time_int - '10'::bigint)) + '10'::bigint)
Group Key: time_bucket('60'::bigint, hyper.time_int, '10'::bigint)
-> Result
-> Append
-> Parallel Seq Scan on hyper
Expand Down Expand Up @@ -326,13 +326,13 @@ ORDER BY MetricMinuteTs DESC, metric.id;
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Sort
Sort Key: ((time_bucket('3600'::bigint, (hyper.time_int - '10'::bigint)) + '10'::bigint)) DESC, metric.id
Sort Key: (time_bucket('3600'::bigint, hyper.time_int, '10'::bigint)) DESC, metric.id
-> Finalize HashAggregate
Group Key: ((time_bucket('3600'::bigint, (hyper.time_int - '10'::bigint)) + '10'::bigint)), metric.id
Group Key: (time_bucket('3600'::bigint, hyper.time_int, '10'::bigint)), metric.id
-> Gather
Workers Planned: 2
-> Partial HashAggregate
Group Key: (time_bucket('3600'::bigint, (hyper.time_int - '10'::bigint)) + '10'::bigint), metric.id
Group Key: time_bucket('3600'::bigint, hyper.time_int, '10'::bigint), metric.id
-> Hash Join
Hash Cond: (hyper.metricid = metric.id)
-> Append
Expand All @@ -354,9 +354,9 @@ ORDER BY MetricMinuteTs DESC, metric.id;
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------
GroupAggregate
Group Key: ((time_bucket('3600'::bigint, (regular.time_int - '10'::bigint)) + '10'::bigint)), metric.id
Group Key: (time_bucket('3600'::bigint, regular.time_int, '10'::bigint)), metric.id
-> Sort
Sort Key: ((time_bucket('3600'::bigint, (regular.time_int - '10'::bigint)) + '10'::bigint)) DESC, metric.id
Sort Key: (time_bucket('3600'::bigint, regular.time_int, '10'::bigint)) DESC, metric.id
-> Nested Loop
Join Filter: (regular.metricid = metric.id)
-> Seq Scan on regular
Expand Down
12 changes: 6 additions & 6 deletions test/expected/plan_hashagg_optimized-9.6.out
Expand Up @@ -177,9 +177,9 @@ ORDER BY MetricMinuteTs DESC;
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Sort
Sort Key: ((time_bucket('60'::bigint, (hyper.time_int - '10'::bigint)) + '10'::bigint)) DESC
Sort Key: (time_bucket('60'::bigint, hyper.time_int, '10'::bigint)) DESC
-> HashAggregate
Group Key: (time_bucket('60'::bigint, (hyper.time_int - '10'::bigint)) + '10'::bigint)
Group Key: time_bucket('60'::bigint, hyper.time_int, '10'::bigint)
-> Result
-> Append
-> Seq Scan on hyper
Expand Down Expand Up @@ -278,9 +278,9 @@ ORDER BY MetricMinuteTs DESC, metric.id;
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Sort
Sort Key: ((time_bucket('3600'::bigint, (hyper.time_int - '10'::bigint)) + '10'::bigint)) DESC, metric.id
Sort Key: (time_bucket('3600'::bigint, hyper.time_int, '10'::bigint)) DESC, metric.id
-> HashAggregate
Group Key: (time_bucket('3600'::bigint, (hyper.time_int - '10'::bigint)) + '10'::bigint), metric.id
Group Key: time_bucket('3600'::bigint, hyper.time_int, '10'::bigint), metric.id
-> Hash Join
Hash Cond: (hyper.metricid = metric.id)
-> Append
Expand All @@ -302,9 +302,9 @@ ORDER BY MetricMinuteTs DESC, metric.id;
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------
GroupAggregate
Group Key: ((time_bucket('3600'::bigint, (regular.time_int - '10'::bigint)) + '10'::bigint)), metric.id
Group Key: (time_bucket('3600'::bigint, regular.time_int, '10'::bigint)), metric.id
-> Sort
Sort Key: ((time_bucket('3600'::bigint, (regular.time_int - '10'::bigint)) + '10'::bigint)) DESC, metric.id
Sort Key: (time_bucket('3600'::bigint, regular.time_int, '10'::bigint)) DESC, metric.id
-> Nested Loop
Join Filter: (regular.metricid = metric.id)
-> Seq Scan on regular
Expand Down
4 changes: 2 additions & 2 deletions test/expected/sql_query_results_optimized.out
Expand Up @@ -405,10 +405,10 @@ FROM hyper_1_int GROUP BY t ORDER BY t DESC limit 2;
----------------------------------------------------------------------------------------------
Limit
-> GroupAggregate
Group Key: ((time_bucket(10, (hyper_1_int."time" - 2)) + 2))
Group Key: (time_bucket(10, hyper_1_int."time", 2))
-> Result
-> Merge Append
Sort Key: ((time_bucket(10, (hyper_1_int."time" - 2)) + 2)) DESC
Sort Key: (time_bucket(10, hyper_1_int."time", 2)) DESC
-> Index Scan using time_plain_int on hyper_1_int
-> Index Scan using _hyper_3_3_chunk_time_plain_int on _hyper_3_3_chunk
-> Index Scan using _hyper_3_4_chunk_time_plain_int on _hyper_3_4_chunk
Expand Down
8 changes: 4 additions & 4 deletions test/expected/sql_query_results_unoptimized.out
Expand Up @@ -413,13 +413,13 @@ FROM hyper_1_int GROUP BY t ORDER BY t DESC limit 2;

EXPLAIN (costs off) SELECT time_bucket(10, time, 2) t, avg(series_0), min(series_1), avg(series_2)
FROM hyper_1_int GROUP BY t ORDER BY t DESC limit 2;
QUERY PLAN
--------------------------------------------------------------------------------
QUERY PLAN
-----------------------------------------------------------------------
Limit
-> GroupAggregate
Group Key: ((time_bucket(10, (hyper_1_int."time" - 2)) + 2))
Group Key: (time_bucket(10, hyper_1_int."time", 2))
-> Sort
Sort Key: ((time_bucket(10, (hyper_1_int."time" - 2)) + 2)) DESC
Sort Key: (time_bucket(10, hyper_1_int."time", 2)) DESC
-> Result
-> Append
-> Seq Scan on hyper_1_int
Expand Down
8 changes: 4 additions & 4 deletions test/expected/sql_query_results_x_diff.out
Expand Up @@ -264,20 +264,20 @@
< QUERY PLAN
< ----------------------------------------------------------------------------------------------
---
> QUERY PLAN
> --------------------------------------------------------------------------------
> QUERY PLAN
> -----------------------------------------------------------------------
409,416c421,429
< -> Result
< -> Merge Append
< Sort Key: ((time_bucket(10, (hyper_1_int."time" - 2)) + 2)) DESC
< Sort Key: (time_bucket(10, hyper_1_int."time", 2)) DESC
< -> Index Scan using time_plain_int on hyper_1_int
< -> Index Scan using _hyper_3_3_chunk_time_plain_int on _hyper_3_3_chunk
< -> Index Scan using _hyper_3_4_chunk_time_plain_int on _hyper_3_4_chunk
< -> Index Scan using _hyper_3_5_chunk_time_plain_int on _hyper_3_5_chunk
< (10 rows)
---
> -> Sort
> Sort Key: ((time_bucket(10, (hyper_1_int."time" - 2)) + 2)) DESC
> Sort Key: (time_bucket(10, hyper_1_int."time", 2)) DESC
> -> Result
> -> Append
> -> Seq Scan on hyper_1_int
Expand Down

0 comments on commit f27c0a3

Please sign in to comment.