Skip to content

Commit

Permalink
Use NULL in CAgg bucket function catalog table
Browse files Browse the repository at this point in the history
Historically, we have used an empty string for undefined values in the
catalog table continuous_aggs_bucket_function. Since #6624, the optional
arguments can be NULL. This patch cleans up the empty strings and
changes the logic to work with NULL values.
  • Loading branch information
jnidzwetzki committed Feb 23, 2024
1 parent 17d424f commit fdf3aa3
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 50 deletions.
5 changes: 5 additions & 0 deletions sql/updates/latest-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,8 @@ DROP FUNCTION IF EXISTS _timescaledb_functions.cagg_get_bucket_function(INTEGER)
UPDATE _timescaledb_catalog.continuous_aggs_bucket_function
SET bucket_origin = bucket_origin::timestamp::timestamptz::text
WHERE length(bucket_origin) > 1;

-- Historically, we have used empty strings for undefined bucket_origin and timezone
-- attributes. This is now replaced by proper NULL values. We use TRIM() to ensure we handle empty string well.
UPDATE _timescaledb_catalog.continuous_aggs_bucket_function SET bucket_origin = NULL WHERE TRIM(bucket_origin) = '';
UPDATE _timescaledb_catalog.continuous_aggs_bucket_function SET bucket_timezone = NULL WHERE TRIM(bucket_timezone) = '';
3 changes: 3 additions & 0 deletions sql/updates/reverse-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ DROP FUNCTION IF EXISTS _timescaledb_functions.remove_dropped_chunk_metadata(INT
--
-- Rebuild the catalog table `_timescaledb_catalog.continuous_aggs_bucket_function`
--
UPDATE _timescaledb_catalog.continuous_aggs_bucket_function SET bucket_origin = '' WHERE bucket_origin IS NULL;
UPDATE _timescaledb_catalog.continuous_aggs_bucket_function SET bucket_timezone = '' WHERE bucket_timezone IS NULL;

CREATE TABLE _timescaledb_catalog._tmp_continuous_aggs_bucket_function AS
SELECT
mat_hypertable_id,
Expand Down
56 changes: 27 additions & 29 deletions src/ts_catalog/continuous_agg.c
Original file line number Diff line number Diff line change
Expand Up @@ -413,12 +413,10 @@ continuous_agg_fill_bucket_function(int32 mat_hypertable_id, ContinuousAggsBucke
init_scan_cagg_bucket_function_by_mat_hypertable_id(&iterator, mat_hypertable_id);
ts_scanner_foreach(&iterator)
{
const char *bucket_width_str;
const char *origin_str;
Datum values[Natts_continuous_aggs_bucket_function];
bool isnull[Natts_continuous_aggs_bucket_function];

bool should_free;

HeapTuple tuple = ts_scan_iterator_fetch_heap_tuple(&iterator, false, &should_free);

/*
Expand All @@ -427,41 +425,49 @@ continuous_agg_fill_bucket_function(int32 mat_hypertable_id, ContinuousAggsBucke
*/
heap_deform_tuple(tuple, ts_scan_iterator_tupledesc(&iterator), values, isnull);

/* Bucket function */
Assert(!isnull[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_function)]);
bf->bucket_function = DatumGetObjectId(
values[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_function)]);

Assert(OidIsValid(bf->bucket_function));

/*
* So far bucket_width is stored as TEXT for flexibility, but it's type
* most likely is going to change to Interval when the variable-sized
* buckets feature will stabilize.
* bucket_width
*
* The value is stored as TEXT since we have to store the interval value of time
* buckets and also the number value of integer based buckets.
*/
Assert(!isnull[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_bucket_width)]);
bucket_width_str = TextDatumGetCString(
const char *bucket_width_str = TextDatumGetCString(
values[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_bucket_width)]);
Assert(strlen(bucket_width_str) > 0);
bf->bucket_width = DatumGetIntervalP(
DirectFunctionCall3(interval_in, CStringGetDatum(bucket_width_str), InvalidOid, -1));

Assert(
!isnull[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_bucket_origin)]);
origin_str = TextDatumGetCString(
values[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_bucket_origin)]);
if (strlen(origin_str) == 0)
TIMESTAMP_NOBEGIN(bf->bucket_origin);
else
/* Bucket origin */
if (!isnull[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_bucket_origin)])
{
const char *origin_str = TextDatumGetCString(values[AttrNumberGetAttrOffset(
Anum_continuous_aggs_bucket_function_bucket_origin)]);
bf->bucket_origin = DatumGetTimestamp(DirectFunctionCall3(timestamptz_in,
CStringGetDatum(origin_str),
ObjectIdGetDatum(InvalidOid),
Int32GetDatum(-1)));
}
else
{
TIMESTAMP_NOBEGIN(bf->bucket_origin);
}

Assert(
!isnull[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_bucket_timezone)]);
bf->timezone = TextDatumGetCString(
values[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_bucket_timezone)]);
/* Bucket timezone */
if (!isnull[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_bucket_timezone)])
{
bf->timezone = TextDatumGetCString(values[AttrNumberGetAttrOffset(
Anum_continuous_aggs_bucket_function_bucket_timezone)]);
}

/* Bucket fixed width */
Assert(!isnull[AttrNumberGetAttrOffset(
Anum_continuous_aggs_bucket_function_bucket_fixed_width)]);
bf->bucket_fixed_interval = DatumGetBool(values[AttrNumberGetAttrOffset(
Expand Down Expand Up @@ -1351,16 +1357,13 @@ ts_continuous_agg_bucket_width(const ContinuousAgg *agg)
static Datum
generic_time_bucket(const ContinuousAggsBucketFunction *bf, Datum timestamp)
{
/* bf->timezone can't be NULL. If timezone is not specified, "" is stored */
Assert(bf->timezone != NULL);

FuncInfo *func_info = ts_func_cache_get_bucketing_func(bf->bucket_function);
Ensure(func_info != NULL, "unable to get bucket function for Oid %d", bf->bucket_function);
bool is_experimental = func_info->origin == ORIGIN_TIMESCALE_EXPERIMENTAL;

if (!is_experimental)
{
if (strlen(bf->timezone) > 0)
if (bf->timezone != NULL)
{
if (TIMESTAMP_NOT_FINITE(bf->bucket_origin))
{
Expand Down Expand Up @@ -1399,7 +1402,7 @@ generic_time_bucket(const ContinuousAggsBucketFunction *bf, Datum timestamp)
}
else
{
if (strlen(bf->timezone) > 0)
if (bf->timezone != NULL)
{
if (TIMESTAMP_NOT_FINITE(bf->bucket_origin))
{
Expand Down Expand Up @@ -1449,12 +1452,7 @@ static Datum
generic_add_interval(const ContinuousAggsBucketFunction *bf, Datum timestamp)
{
Datum tzname = 0;
bool has_timezone;

/* bf->timezone can't be NULL. If timezone is not specified, "" is stored */
Assert(bf->timezone != NULL);

has_timezone = (strlen(bf->timezone) > 0);
bool has_timezone = (bf->timezone != NULL);

if (has_timezone)
{
Expand Down
2 changes: 1 addition & 1 deletion src/ts_catalog/continuous_agg.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ typedef struct ContinuousAggsBucketFunction
TimestampTz bucket_origin;

/* `bucket_offset` argument of the function. */
Interval *bucket_offest;
Interval *bucket_offset;

/* `timezone` argument of the function provided by the user. */
char *timezone;
Expand Down
64 changes: 49 additions & 15 deletions tsl/src/continuous_aggs/create.c
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,8 @@ create_cagg_catalog_entry(int32 matht_id, int32 rawht_id, const char *user_schem
*/
static void
create_bucket_function_catalog_entry(int32 matht_id, Oid bucket_function, const char *bucket_width,
const char *origin, const char *offset, const char *timezone,
const bool bucket_fixed_width)
const char *bucket_origin, const char *bucket_offset,
const char *bucket_timezone, const bool bucket_fixed_width)
{
Catalog *catalog = ts_catalog_get();
Relation rel;
Expand All @@ -203,18 +203,53 @@ create_bucket_function_catalog_entry(int32 matht_id, Oid bucket_function, const
desc = RelationGetDescr(rel);

memset(values, 0, sizeof(values));

/* Hypertable ID */
values[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_mat_hypertable_id)] =
matht_id;

/* Bucket function */
values[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_function)] =
ObjectIdGetDatum(bucket_function);

/* Bucket width */
values[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_bucket_width)] =
CStringGetTextDatum(bucket_width);
values[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_bucket_origin)] =
CStringGetTextDatum(origin);
values[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_bucket_offset)] =
CStringGetTextDatum(offset);
values[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_bucket_timezone)] =
CStringGetTextDatum(timezone ? timezone : "");

/* Bucket origin */
if (bucket_origin != NULL)
{
values[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_bucket_origin)] =
CStringGetTextDatum(bucket_origin);
}
else
{
nulls[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_bucket_origin)] = true;
}

/* Bucket offset */
if (bucket_offset != NULL)
{
values[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_bucket_offset)] =
CStringGetTextDatum(bucket_offset);
}
else
{
nulls[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_bucket_offset)] = true;
}

/* Bucket timezone */
if (bucket_timezone != NULL)
{
values[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_bucket_timezone)] =
CStringGetTextDatum(bucket_timezone);
}
else
{
nulls[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_bucket_timezone)] = true;
}

/* Bucket fixed width */
values[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_bucket_fixed_width)] =
BoolGetDatum(bucket_fixed_width);

Expand Down Expand Up @@ -784,28 +819,27 @@ cagg_create(const CreateTableAsStmt *create_stmt, ViewStmt *stmt, Query *panquer

if (bucket_info->bucket_width == BUCKET_WIDTH_VARIABLE)
{
const char *bucket_width;
const char *origin = "";
const char *offset = "";
const char *bucket_origin = NULL;
const char *bucket_offset = NULL;

/*
* Variable-sized buckets work only with intervals.
*/
Assert(bucket_info->interval != NULL);
bucket_width = DatumGetCString(
const char *bucket_width = DatumGetCString(
DirectFunctionCall1(interval_out, IntervalPGetDatum(bucket_info->interval)));

if (!TIMESTAMP_NOT_FINITE(bucket_info->origin))
{
origin = DatumGetCString(
bucket_origin = DatumGetCString(
DirectFunctionCall1(timestamptz_out, TimestampTzGetDatum(bucket_info->origin)));
}

create_bucket_function_catalog_entry(materialize_hypertable_id,
bucket_info->bucket_func->funcid,
bucket_width,
origin,
offset,
bucket_origin,
bucket_offset,
bucket_info->timezone,
bucket_info->bucket_width != BUCKET_WIDTH_VARIABLE);
}
Expand Down
10 changes: 6 additions & 4 deletions tsl/test/expected/exp_cagg_monthly.out
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,16 @@ WHERE mat_hypertable_id = :cagg_id;
-1
(1 row)

SELECT bucket_func, bucket_width, bucket_origin, bucket_timezone, bucket_fixed_width
\pset null <NULL>
SELECT *
FROM _timescaledb_catalog.continuous_aggs_bucket_function
WHERE mat_hypertable_id = :cagg_id;
bucket_func | bucket_width | bucket_origin | bucket_timezone | bucket_fixed_width
--------------------------------------------------------+--------------+---------------+-----------------+--------------------
timescaledb_experimental.time_bucket_ng(interval,date) | @ 1 mon | | | f
mat_hypertable_id | bucket_func | bucket_width | bucket_origin | bucket_offset | bucket_timezone | bucket_fixed_width
-------------------+--------------------------------------------------------+--------------+---------------+---------------+-----------------+--------------------
2 | timescaledb_experimental.time_bucket_ng(interval,date) | @ 1 mon | <NULL> | <NULL> | <NULL> | f
(1 row)

\pset null ""
-- Check that the saved invalidation threshold is -infinity
SELECT _timescaledb_functions.to_timestamp(watermark)
FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
Expand Down
4 changes: 3 additions & 1 deletion tsl/test/sql/exp_cagg_monthly.sql
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,11 @@ SELECT bucket_width
FROM _timescaledb_catalog.continuous_agg
WHERE mat_hypertable_id = :cagg_id;

SELECT bucket_func, bucket_width, bucket_origin, bucket_timezone, bucket_fixed_width
\pset null <NULL>
SELECT *
FROM _timescaledb_catalog.continuous_aggs_bucket_function
WHERE mat_hypertable_id = :cagg_id;
\pset null ""

-- Check that the saved invalidation threshold is -infinity
SELECT _timescaledb_functions.to_timestamp(watermark)
Expand Down

0 comments on commit fdf3aa3

Please sign in to comment.