Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix timestamp out of range #4804

Merged
merged 1 commit into from Jan 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -7,6 +7,7 @@ accidentally triggering the load of a previous DB version.**
## Unreleased

**Bugfixes**
* #4804 Skip bucketing when start or end of refresh job is null
* #4926 Fix corruption when inserting into compressed chunks

## 2.9.2 (2023-01-26)
Expand Down
3 changes: 2 additions & 1 deletion src/time_bucket.c
Expand Up @@ -466,7 +466,8 @@ ts_date_offset_bucket(PG_FUNCTION_ARGS)

/* when working with time_buckets stored in our catalog, we may not know ahead of time which
* bucketing function to use, this function dynamically dispatches to the correct time_bucket_<foo>
* based on an inputted timestamp_type*/
* based on an inputted timestamp_type
*/
TSDLLEXPORT int64
ts_time_bucket_by_type(int64 interval, int64 timestamp, Oid timestamp_type)
{
Expand Down
16 changes: 8 additions & 8 deletions tsl/src/bgw_policy/continuous_aggregate_api.c
Expand Up @@ -112,22 +112,22 @@ get_time_from_config(const Dimension *dim, const Jsonb *config, const char *json
}

int64
policy_refresh_cagg_get_refresh_start(const Dimension *dim, const Jsonb *config)
policy_refresh_cagg_get_refresh_start(const Dimension *dim, const Jsonb *config, bool *start_isnull)
{
bool start_isnull;
int64 res = get_time_from_config(dim, config, POL_REFRESH_CONF_KEY_START_OFFSET, &start_isnull);
int64 res = get_time_from_config(dim, config, POL_REFRESH_CONF_KEY_START_OFFSET, start_isnull);

/* interpret NULL as min value for that type */
if (start_isnull)
if (*start_isnull)
return ts_time_get_min(ts_dimension_get_partition_type(dim));
return res;
}

int64
policy_refresh_cagg_get_refresh_end(const Dimension *dim, const Jsonb *config)
policy_refresh_cagg_get_refresh_end(const Dimension *dim, const Jsonb *config, bool *end_isnull)
{
bool end_isnull;
int64 res = get_time_from_config(dim, config, POL_REFRESH_CONF_KEY_END_OFFSET, &end_isnull);
if (end_isnull)
int64 res = get_time_from_config(dim, config, POL_REFRESH_CONF_KEY_END_OFFSET, end_isnull);

if (*end_isnull)
return ts_time_get_end_or_max(ts_dimension_get_partition_type(dim));
return res;
}
Expand Down
6 changes: 4 additions & 2 deletions tsl/src/bgw_policy/continuous_aggregate_api.h
Expand Up @@ -18,8 +18,10 @@ extern Datum policy_refresh_cagg_check(PG_FUNCTION_ARGS);
extern Datum policy_refresh_cagg_remove(PG_FUNCTION_ARGS);

int32 policy_continuous_aggregate_get_mat_hypertable_id(const Jsonb *config);
int64 policy_refresh_cagg_get_refresh_start(const Dimension *dim, const Jsonb *config);
int64 policy_refresh_cagg_get_refresh_end(const Dimension *dim, const Jsonb *config);
int64 policy_refresh_cagg_get_refresh_start(const Dimension *dim, const Jsonb *config,
bool *start_isnull);
int64 policy_refresh_cagg_get_refresh_end(const Dimension *dim, const Jsonb *config,
bool *end_isnull);
bool policy_refresh_cagg_refresh_start_lt(int32 materialization_id, Oid cmp_type,
Datum cmp_interval);
bool policy_refresh_cagg_exists(int32 materialization_id);
Expand Down
11 changes: 8 additions & 3 deletions tsl/src/bgw_policy/job.c
Expand Up @@ -336,7 +336,9 @@ policy_refresh_cagg_execute(int32 job_id, Jsonb *config)
policy_refresh_cagg_read_and_validate_config(config, &policy_data);
continuous_agg_refresh_internal(policy_data.cagg,
&policy_data.refresh_window,
CAGG_REFRESH_POLICY);
CAGG_REFRESH_POLICY,
policy_data.start_is_null,
policy_data.end_is_null);

return true;
}
Expand All @@ -349,6 +351,7 @@ policy_refresh_cagg_read_and_validate_config(Jsonb *config, PolicyContinuousAggD
const Dimension *open_dim;
Oid dim_type;
int64 refresh_start, refresh_end;
bool start_isnull, end_isnull;

materialization_id = policy_continuous_aggregate_get_mat_hypertable_id(config);
mat_ht = ts_hypertable_get_by_id(materialization_id);
Expand All @@ -361,8 +364,8 @@ policy_refresh_cagg_read_and_validate_config(Jsonb *config, PolicyContinuousAggD

open_dim = get_open_dimension_for_hypertable(mat_ht);
dim_type = ts_dimension_get_partition_type(open_dim);
refresh_start = policy_refresh_cagg_get_refresh_start(open_dim, config);
refresh_end = policy_refresh_cagg_get_refresh_end(open_dim, config);
refresh_start = policy_refresh_cagg_get_refresh_start(open_dim, config, &start_isnull);
refresh_end = policy_refresh_cagg_get_refresh_end(open_dim, config, &end_isnull);

if (refresh_start >= refresh_end)
ereport(ERROR,
Expand All @@ -379,6 +382,8 @@ policy_refresh_cagg_read_and_validate_config(Jsonb *config, PolicyContinuousAggD
policy_data->refresh_window.start = refresh_start;
policy_data->refresh_window.end = refresh_end;
policy_data->cagg = ts_continuous_agg_find_by_mat_hypertable_id(materialization_id);
policy_data->start_is_null = start_isnull;
policy_data->end_is_null = end_isnull;
}
}

Expand Down
1 change: 1 addition & 0 deletions tsl/src/bgw_policy/job.h
Expand Up @@ -35,6 +35,7 @@ typedef struct PolicyContinuousAggData
{
InternalTimeRange refresh_window;
ContinuousAgg *cagg;
bool start_is_null, end_is_null;
} PolicyContinuousAggData;

typedef struct PolicyCompressionData
Expand Down
2 changes: 1 addition & 1 deletion tsl/src/continuous_aggs/create.c
Expand Up @@ -2791,7 +2791,7 @@ tsl_process_continuous_agg_viewstmt(Node *node, const char *query_string, void *
ts_time_get_min(refresh_window.type);
refresh_window.end = ts_time_get_noend_or_max(refresh_window.type);

continuous_agg_refresh_internal(cagg, &refresh_window, CAGG_REFRESH_CREATION);
continuous_agg_refresh_internal(cagg, &refresh_window, CAGG_REFRESH_CREATION, true, true);
}
return DDL_DONE;
}
Expand Down
37 changes: 23 additions & 14 deletions tsl/src/continuous_aggs/refresh.c
Expand Up @@ -573,7 +573,11 @@ continuous_agg_refresh(PG_FUNCTION_ARGS)
else
refresh_window.end = ts_time_get_noend_or_max(refresh_window.type);

continuous_agg_refresh_internal(cagg, &refresh_window, CAGG_REFRESH_WINDOW);
continuous_agg_refresh_internal(cagg,
&refresh_window,
CAGG_REFRESH_WINDOW,
PG_ARGISNULL(1),
PG_ARGISNULL(2));

PG_RETURN_VOID();
}
Expand Down Expand Up @@ -698,11 +702,12 @@ process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg,
void
continuous_agg_refresh_internal(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window_arg,
const CaggRefreshCallContext callctx)
const CaggRefreshCallContext callctx, const bool start_isnull,
const bool end_isnull)
{
Catalog *catalog = ts_catalog_get();
int32 mat_id = cagg->data.mat_hypertable_id;
InternalTimeRange refresh_window;
InternalTimeRange refresh_window = *refresh_window_arg;
int64 computed_invalidation_threshold;
int64 invalidation_threshold;
bool is_raw_ht_distributed;
Expand Down Expand Up @@ -732,18 +737,22 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,
Hypertable *ht = cagg_get_hypertable_or_fail(cagg->data.raw_hypertable_id);
is_raw_ht_distributed = hypertable_is_distributed(ht);

if (ts_continuous_agg_bucket_width_variable(cagg))
/* No bucketing when open ended */
if (!(start_isnull && end_isnull))
{
refresh_window = *refresh_window_arg;
ts_compute_inscribed_bucketed_refresh_window_variable(&refresh_window.start,
&refresh_window.end,
cagg->bucket_function);
}
else
{
refresh_window =
compute_inscribed_bucketed_refresh_window(refresh_window_arg,
ts_continuous_agg_bucket_width(cagg));
if (ts_continuous_agg_bucket_width_variable(cagg))
{
refresh_window = *refresh_window_arg;
ts_compute_inscribed_bucketed_refresh_window_variable(&refresh_window.start,
&refresh_window.end,
cagg->bucket_function);
}
else
{
refresh_window =
compute_inscribed_bucketed_refresh_window(refresh_window_arg,
ts_continuous_agg_bucket_width(cagg));
}
}

if (refresh_window.start >= refresh_window.end)
Expand Down
3 changes: 2 additions & 1 deletion tsl/src/continuous_aggs/refresh.h
Expand Up @@ -28,6 +28,7 @@ extern void continuous_agg_calculate_merged_refresh_window(
InternalTimeRange *merged_refresh_window);
extern void continuous_agg_refresh_internal(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window,
const CaggRefreshCallContext callctx);
const CaggRefreshCallContext callctx,
const bool start_isnull, const bool end_isnull);

#endif /* TIMESCALEDB_TSL_CONTINUOUS_AGGS_REFRESH_H */
21 changes: 10 additions & 11 deletions tsl/test/expected/cagg_invalidation.out
Expand Up @@ -559,17 +559,16 @@ SELECT * FROM cagg_invals;
-- Test max refresh window
CALL refresh_continuous_aggregate('cond_10', NULL, NULL);
SELECT * FROM cagg_invals;
cagg_id | start | end
---------+----------------------+----------------------
3 | -9223372036854775808 | -9223372036854775801
3 | 110 | 9223372036854775807
4 | -9223372036854775808 | 19
4 | 0 | 59
4 | 20 | 59
4 | 60 | 9223372036854775807
5 | -9223372036854775808 | -1
5 | 30 | 9223372036854775807
(8 rows)
cagg_id | start | end
---------+----------------------+---------------------
3 | 110 | 9223372036854775807
4 | -9223372036854775808 | 19
4 | 0 | 59
4 | 20 | 59
4 | 60 | 9223372036854775807
5 | -9223372036854775808 | -1
5 | 30 | 9223372036854775807
(7 rows)

SELECT * FROM hyper_invals;
hyper_id | start | end
Expand Down
23 changes: 11 additions & 12 deletions tsl/test/expected/cagg_invalidation_dist_ht-12.out
Expand Up @@ -619,18 +619,17 @@ SELECT * FROM cagg_invals;
-- Test max refresh window
CALL refresh_continuous_aggregate('cond_10', NULL, NULL);
SELECT * FROM cagg_invals;
cagg_id | start | end
---------+----------------------+----------------------
3 | -9223372036854775808 | -9223372036854775801
3 | 110 | 9223372036854775807
4 | -9223372036854775808 | 19
4 | 0 | 59
4 | 20 | 39
4 | 20 | 59
4 | 60 | 9223372036854775807
5 | -9223372036854775808 | -1
5 | 30 | 9223372036854775807
(9 rows)
cagg_id | start | end
---------+----------------------+---------------------
3 | 110 | 9223372036854775807
4 | -9223372036854775808 | 19
4 | 0 | 59
4 | 20 | 39
4 | 20 | 59
4 | 60 | 9223372036854775807
5 | -9223372036854775808 | -1
5 | 30 | 9223372036854775807
(8 rows)

SELECT * FROM hyper_invals;
hyper_id | start | end
Expand Down
23 changes: 11 additions & 12 deletions tsl/test/expected/cagg_invalidation_dist_ht-13.out
Expand Up @@ -619,18 +619,17 @@ SELECT * FROM cagg_invals;
-- Test max refresh window
CALL refresh_continuous_aggregate('cond_10', NULL, NULL);
SELECT * FROM cagg_invals;
cagg_id | start | end
---------+----------------------+----------------------
3 | -9223372036854775808 | -9223372036854775801
3 | 110 | 9223372036854775807
4 | -9223372036854775808 | 19
4 | 0 | 59
4 | 20 | 39
4 | 20 | 59
4 | 60 | 9223372036854775807
5 | -9223372036854775808 | -1
5 | 30 | 9223372036854775807
(9 rows)
cagg_id | start | end
---------+----------------------+---------------------
3 | 110 | 9223372036854775807
4 | -9223372036854775808 | 19
4 | 0 | 59
4 | 20 | 39
4 | 20 | 59
4 | 60 | 9223372036854775807
5 | -9223372036854775808 | -1
5 | 30 | 9223372036854775807
(8 rows)

SELECT * FROM hyper_invals;
hyper_id | start | end
Expand Down
23 changes: 11 additions & 12 deletions tsl/test/expected/cagg_invalidation_dist_ht-14.out
Expand Up @@ -619,18 +619,17 @@ SELECT * FROM cagg_invals;
-- Test max refresh window
CALL refresh_continuous_aggregate('cond_10', NULL, NULL);
SELECT * FROM cagg_invals;
cagg_id | start | end
---------+----------------------+----------------------
3 | -9223372036854775808 | -9223372036854775801
3 | 110 | 9223372036854775807
4 | -9223372036854775808 | 19
4 | 0 | 59
4 | 20 | 39
4 | 20 | 59
4 | 60 | 9223372036854775807
5 | -9223372036854775808 | -1
5 | 30 | 9223372036854775807
(9 rows)
cagg_id | start | end
---------+----------------------+---------------------
3 | 110 | 9223372036854775807
4 | -9223372036854775808 | 19
4 | 0 | 59
4 | 20 | 39
4 | 20 | 59
4 | 60 | 9223372036854775807
5 | -9223372036854775808 | -1
5 | 30 | 9223372036854775807
(8 rows)

SELECT * FROM hyper_invals;
hyper_id | start | end
Expand Down
23 changes: 11 additions & 12 deletions tsl/test/expected/cagg_invalidation_dist_ht-15.out
Expand Up @@ -619,18 +619,17 @@ SELECT * FROM cagg_invals;
-- Test max refresh window
CALL refresh_continuous_aggregate('cond_10', NULL, NULL);
SELECT * FROM cagg_invals;
cagg_id | start | end
---------+----------------------+----------------------
3 | -9223372036854775808 | -9223372036854775801
3 | 110 | 9223372036854775807
4 | -9223372036854775808 | 19
4 | 0 | 59
4 | 20 | 39
4 | 20 | 59
4 | 60 | 9223372036854775807
5 | -9223372036854775808 | -1
5 | 30 | 9223372036854775807
(9 rows)
cagg_id | start | end
---------+----------------------+---------------------
3 | 110 | 9223372036854775807
4 | -9223372036854775808 | 19
4 | 0 | 59
4 | 20 | 39
4 | 20 | 59
4 | 60 | 9223372036854775807
5 | -9223372036854775808 | -1
5 | 30 | 9223372036854775807
(8 rows)

SELECT * FROM hyper_invals;
hyper_id | start | end
Expand Down
31 changes: 18 additions & 13 deletions tsl/test/expected/exp_cagg_origin.out
Expand Up @@ -910,6 +910,15 @@ SELECT create_hypertable(
(17,public,conditions_timestamptz,t)
(1 row)

-- Add some data to the hypertable and make sure it is visible in the cagg
INSERT INTO conditions_timestamptz(tstamp, city, temperature)
SELECT ts, city,
(CASE WHEN city = 'Moscow' THEN 20000 ELSE 10000 END) +
date_part('day', ts at time zone 'MSK')*100 +
date_part('hour', ts at time zone 'MSK')
FROM
generate_series('2022-01-01 00:00:00 MSK' :: timestamptz, '2022-01-02 00:00:00 MSK' :: timestamptz - interval '1 hour', '1 hour') as ts,
unnest(array['Moscow', 'Berlin']) as city;
\set ON_ERROR_STOP 0
-- For monthly buckets origin should be the first day of the month in given timezone
-- 2020-06-02 00:00:00 MSK == 2020-06-01 21:00:00 UTC
Expand Down Expand Up @@ -941,23 +950,18 @@ SELECT city,
MAX(temperature)
FROM conditions_timestamptz
GROUP BY city, bucket;
NOTICE: continuous aggregate "conditions_summary_timestamptz" is already up-to-date
NOTICE: refreshing continuous aggregate "conditions_summary_timestamptz"
SELECT city, to_char(bucket at time zone 'MSK', 'YYYY-MM-DD HH24:MI:SS') AS b, min, max
FROM conditions_summary_timestamptz
ORDER BY b, city;
city | b | min | max
------+---+-----+-----
(0 rows)
city | b | min | max
--------+---------------------+-------+-------
Berlin | 2022-01-01 00:00:00 | 10100 | 10111
Moscow | 2022-01-01 00:00:00 | 20100 | 20111
Berlin | 2022-01-01 12:00:00 | 10112 | 10123
Moscow | 2022-01-01 12:00:00 | 20112 | 20123
(4 rows)

-- Add some data to the hypertable and make sure it is visible in the cagg
INSERT INTO conditions_timestamptz(tstamp, city, temperature)
SELECT ts, city,
(CASE WHEN city = 'Moscow' THEN 20000 ELSE 10000 END) +
date_part('day', ts at time zone 'MSK')*100 +
date_part('hour', ts at time zone 'MSK')
FROM
generate_series('2022-01-01 00:00:00 MSK' :: timestamptz, '2022-01-02 00:00:00 MSK' :: timestamptz - interval '1 hour', '1 hour') as ts,
unnest(array['Moscow', 'Berlin']) as city;
-- Check the data
SELECT to_char(tstamp at time zone 'MSK', 'YYYY-MM-DD HH24:MI:SS') AS ts, city, temperature FROM conditions_timestamptz
ORDER BY ts, city;
Expand Down Expand Up @@ -1026,6 +1030,7 @@ ORDER BY b, city;

-- Refresh the cagg and make sure that the result of SELECT query didn't change
CALL refresh_continuous_aggregate('conditions_summary_timestamptz', '2022-01-01 00:00:00 MSK', '2022-01-02 00:00:00 MSK');
NOTICE: continuous aggregate "conditions_summary_timestamptz" is already up-to-date
SELECT city, to_char(bucket at time zone 'MSK', 'YYYY-MM-DD HH24:MI:SS') AS b, min, max
FROM conditions_summary_timestamptz
ORDER BY b, city;
Expand Down