/*
* This file and its contents are licensed under the Timescale License.
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/
#include <postgres.h>
#include <access/xact.h>
#include <common/int128.h>
#include <miscadmin.h>
#include <parser/parse_coerce.h>
#include <utils/acl.h>
#include <jsonb_utils.h>
#include <utils/builtins.h>
#include "bgw_policy/continuous_aggregate_api.h"
#include "bgw_policy/job.h"
#include "bgw/job.h"
#include "continuous_agg.h"
#include "continuous_aggs/materialize.h"
#include "dimension.h"
#include "hypertable_cache.h"
#include "time_utils.h"
#include "policy_utils.h"
#include "time_utils.h"
#define POLICY_REFRESH_CAGG_PROC_NAME "policy_refresh_continuous_aggregate"
#define CONFIG_KEY_MAT_HYPERTABLE_ID "mat_hypertable_id"
#define CONFIG_KEY_START_OFFSET "start_offset"
#define CONFIG_KEY_END_OFFSET "end_offset"
/* Default max runtime for continuous aggregate jobs is unlimited for now */
#define DEFAULT_MAX_RUNTIME \
DatumGetIntervalP(DirectFunctionCall3(interval_in, CStringGetDatum("0"), InvalidOid, -1))
/* Infinite number of retries for continuous aggregate jobs */
#define DEFAULT_MAX_RETRIES (-1)
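/*
 * Sketch (illustrative values only) of the JSONB config that
 * policy_refresh_cagg_add() below builds and that the policy job consumes;
 * key order and value formatting are determined by the JSONB code:
 *
 *   {"end_offset": "01:00:00", "start_offset": "1 day", "mat_hypertable_id": 2}
 *
 * For integer-partitioned continuous aggregates the offsets are stored as
 * int64 values rather than intervals.
 */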
int32
policy_continuous_aggregate_get_mat_hypertable_id(const Jsonb *config)
{
bool found;
int32 mat_hypertable_id =
ts_jsonb_get_int32_field(config, CONFIG_KEY_MAT_HYPERTABLE_ID, &found);
if (!found)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("could not find \"%s\" in config for job", CONFIG_KEY_MAT_HYPERTABLE_ID)));
return mat_hypertable_id;
}
static int64
get_time_from_interval(const Dimension *dim, Datum interval, Oid type)
{
Oid partitioning_type = ts_dimension_get_partition_type(dim);
if (IS_INTEGER_TYPE(type))
{
Oid now_func = ts_get_integer_now_func(dim);
int64 value = ts_interval_value_to_internal(interval, type);
Assert(now_func);
return ts_subtract_integer_from_now_saturating(now_func, value, partitioning_type);
}
else if (type == INTERVALOID)
{
Datum res = subtract_interval_from_now(DatumGetIntervalP(interval), partitioning_type);
return ts_time_value_to_internal(res, partitioning_type);
}
else
elog(ERROR, "unsupported offset type for continuous aggregate policy");
pg_unreachable();
return 0;
}
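/*
 * Illustrative example: for a TIMESTAMPTZ-partitioned dimension and an
 * INTERVAL of '1 day', get_time_from_interval() returns the internal
 * (microsecond) representation of now() - '1 day'. For an integer-partitioned
 * dimension it instead subtracts the offset from the value returned by the
 * dimension's integer-now function, saturating at the type's bounds.
 */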
static int64
get_time_from_config(const Dimension *dim, const Jsonb *config, const char *json_label,
bool *isnull)
{
Oid partitioning_type = ts_dimension_get_partition_type(dim);
*isnull = false;
if (IS_INTEGER_TYPE(partitioning_type))
{
bool found;
int64 interval_val = ts_jsonb_get_int64_field(config, json_label, &found);
if (!found)
{
*isnull = true;
return 0;
}
return get_time_from_interval(dim, Int64GetDatum(interval_val), INT8OID);
}
else
{
Interval *interval_val = ts_jsonb_get_interval_field(config, json_label);
if (!interval_val)
{
*isnull = true;
return 0;
}
return get_time_from_interval(dim, IntervalPGetDatum(interval_val), INTERVALOID);
}
}
int64
policy_refresh_cagg_get_refresh_start(const Dimension *dim, const Jsonb *config)
{
bool start_isnull;
int64 res = get_time_from_config(dim, config, CONFIG_KEY_START_OFFSET, &start_isnull);
/* interpret NULL as min value for that type */
if (start_isnull)
return ts_time_get_min(ts_dimension_get_partition_type(dim));
return res;
}
int64
policy_refresh_cagg_get_refresh_end(const Dimension *dim, const Jsonb *config)
{
bool end_isnull;
int64 res = get_time_from_config(dim, config, CONFIG_KEY_END_OFFSET, &end_isnull);
if (end_isnull)
return ts_time_get_end_or_max(ts_dimension_get_partition_type(dim));
return res;
}
Datum
policy_refresh_cagg_proc(PG_FUNCTION_ARGS)
{
if (PG_NARGS() != 2 || PG_ARGISNULL(0) || PG_ARGISNULL(1))
PG_RETURN_VOID();
TS_PREVENT_FUNC_IF_READ_ONLY();
policy_refresh_cagg_execute(PG_GETARG_INT32(0), PG_GETARG_JSONB_P(1));
PG_RETURN_VOID();
}
static Oid
ts_cagg_permissions_check(Oid cagg_oid, Oid userid)
{
Oid ownerid = ts_rel_get_owner(cagg_oid);
if (!has_privs_of_role(userid, ownerid))
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be owner of continuous aggregate \"%s\"", get_rel_name(cagg_oid))));
return ownerid;
}
static void
json_add_dim_interval_value(JsonbParseState *parse_state, const char *json_label, Oid dim_type,
Datum value)
{
switch (dim_type)
{
case INTERVALOID:
ts_jsonb_add_interval(parse_state, json_label, DatumGetIntervalP(value));
break;
case INT2OID:
ts_jsonb_add_int64(parse_state, json_label, DatumGetInt16(value));
break;
case INT4OID:
ts_jsonb_add_int64(parse_state, json_label, DatumGetInt32(value));
break;
case INT8OID:
ts_jsonb_add_int64(parse_state, json_label, DatumGetInt64(value));
break;
default:
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("unsupported interval argument type, expected type : %s",
format_type_be(dim_type))));
}
}
static Datum
convert_interval_arg(Oid dim_type, Datum interval, Oid *interval_type, const char *str_msg)
{
Oid convert_to = dim_type;
Datum converted;
if (IS_TIMESTAMP_TYPE(dim_type))
convert_to = INTERVALOID;
if (*interval_type != convert_to)
{
if (!can_coerce_type(1, interval_type, &convert_to, COERCION_IMPLICIT))
{
if (IS_INTEGER_TYPE(dim_type))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid parameter value for %s", str_msg),
errhint("Use time interval of type %s with the continuous aggregate.",
format_type_be(dim_type))));
else if (IS_TIMESTAMP_TYPE(dim_type))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid parameter value for %s", str_msg),
errhint("Use time interval with a continuous aggregate using "
"timestamp-based time "
"bucket.")));
}
}
converted = ts_time_datum_convert_arg(interval, interval_type, convert_to);
	/* For integer types, first convert all values to int64 to get a common
	 * type. Then check the valid time range against the partition/dimension
	 * type. */
switch (*interval_type)
{
case INT2OID:
converted = Int64GetDatum((int64) DatumGetInt16(converted));
break;
case INT4OID:
converted = Int64GetDatum((int64) DatumGetInt32(converted));
break;
case INT8OID:
break;
case INTERVALOID:
/* For timestamp types, we only support Interval, so nothing further
* to do. */
return converted;
default:
pg_unreachable();
break;
}
/* Cap at min and max */
	if (DatumGetInt64(converted) < ts_time_get_min(dim_type))
		converted = Int64GetDatum(ts_time_get_min(dim_type));
	else if (DatumGetInt64(converted) > ts_time_get_max(dim_type))
		converted = Int64GetDatum(ts_time_get_max(dim_type));
/* Convert to the desired integer type */
switch (dim_type)
{
case INT2OID:
converted = Int16GetDatum((int16) DatumGetInt64(converted));
break;
case INT4OID:
converted = Int32GetDatum((int32) DatumGetInt64(converted));
break;
case INT8OID:
/* Already int64, so nothing to do. */
break;
default:
pg_unreachable();
break;
}
*interval_type = dim_type;
return converted;
}
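/*
 * Illustrative example: for an INT4-partitioned continuous aggregate, a
 * SMALLINT offset of 10 is widened to int64, capped to the int4 time range,
 * and returned as an int32 Datum with *interval_type set to INT4OID. For
 * timestamp-partitioned aggregates only INTERVAL arguments are accepted.
 */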
typedef struct CaggPolicyOffset
{
Datum value;
Oid type;
bool isnull;
const char *name;
} CaggPolicyOffset;
typedef struct CaggPolicyConfig
{
Oid partition_type;
CaggPolicyOffset offset_start;
CaggPolicyOffset offset_end;
} CaggPolicyConfig;
/*
* Convert an interval to a 128 integer value.
*
* Based on PostgreSQL's interval_cmp_value().
*/
static inline INT128
interval_to_int128(const Interval *interval)
{
INT128 span;
int64 dayfraction;
int64 days;
/*
* Separate time field into days and dayfraction, then add the month and
* day fields to the days part. We cannot overflow int64 days here.
*/
dayfraction = interval->time % USECS_PER_DAY;
days = interval->time / USECS_PER_DAY;
days += interval->month * INT64CONST(30);
days += interval->day;
/* Widen dayfraction to 128 bits */
span = int64_to_int128(dayfraction);
/* Scale up days to microseconds, forming a 128-bit product */
int128_add_int64_mul_int64(&span, days, USECS_PER_DAY);
return span;
}
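/*
 * Worked example (illustrative): an interval of 1 month 2 days 03:00:00 has
 * time = 3 hours of microseconds, month = 1, and day = 2, so dayfraction is
 * the 3-hour remainder and days becomes 0 + 1 * 30 + 2 = 32; the result is
 * 32 * USECS_PER_DAY plus the dayfraction, widened to 128 bits so the
 * multiplication cannot overflow.
 */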
static int64
interval_to_int64(Datum interval, Oid type)
{
switch (type)
{
case INT2OID:
return DatumGetInt16(interval);
case INT4OID:
return DatumGetInt32(interval);
case INT8OID:
return DatumGetInt64(interval);
case INTERVALOID:
{
const int64 max = ts_time_get_max(TIMESTAMPTZOID);
const int64 min = ts_time_get_min(TIMESTAMPTZOID);
INT128 bigres = interval_to_int128(DatumGetIntervalP(interval));
if (int128_compare(bigres, int64_to_int128(max)) >= 0)
return max;
else if (int128_compare(bigres, int64_to_int128(min)) <= 0)
return min;
else
return int128_to_int64(bigres);
}
default:
break;
}
pg_unreachable();
return 0;
}
static const char *
two_buckets_to_str(const ContinuousAgg *cagg)
{
Oid bucket_type;
Oid outfuncid;
int64 two_buckets;
Datum min_range;
bool isvarlena;
if (IS_TIMESTAMP_TYPE(cagg->partition_type))
bucket_type = INTERVALOID;
else
bucket_type = cagg->partition_type;
two_buckets = ts_time_saturating_add(cagg->data.bucket_width,
cagg->data.bucket_width,
cagg->partition_type);
min_range = ts_internal_to_interval_value(two_buckets, bucket_type);
getTypeOutputInfo(bucket_type, &outfuncid, &isvarlena);
Assert(!isvarlena);
return DatumGetCString(OidFunctionCall1(outfuncid, min_range));
}
/*
* Enforce that a policy has a refresh window of at least two buckets to
* ensure we materialize at least one bucket each run.
*
 * Why two buckets? Note that the policy probably won't execute at a time
* that exactly aligns with a bucket boundary, so a window of one bucket
* might not cover a full bucket that we want to materialize:
*
* Refresh window: [-----)
* Materialized buckets: |-----|-----|-----|
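 *
 * For example (illustrative): with a bucket width of 1 hour, a policy with a
 * start offset of 4 hours and an end offset of 2 hours leaves a 2-hour window
 * and passes this check, whereas offsets of 2 hours and 1 hour would be
 * rejected.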
*/
static void
validate_window_size(const ContinuousAgg *cagg, const CaggPolicyConfig *config)
{
int64 start_offset;
int64 end_offset;
if (config->offset_start.isnull)
start_offset = ts_time_get_max(cagg->partition_type);
else
start_offset = interval_to_int64(config->offset_start.value, config->offset_start.type);
if (config->offset_end.isnull)
end_offset = ts_time_get_min(cagg->partition_type);
else
end_offset = interval_to_int64(config->offset_end.value, config->offset_end.type);
if (ts_time_saturating_add(end_offset, cagg->data.bucket_width * 2, INT8OID) > start_offset)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("policy refresh window too small"),
errdetail("The start and end offsets must cover at least"
" two buckets in the valid time range of type \"%s\".",
format_type_be(cagg->partition_type)),
errhint("Use a start and end offset that specifies"
" a window of at least %s.",
two_buckets_to_str(cagg))));
}
static void
parse_offset_arg(const ContinuousAgg *cagg, const FunctionCallInfo fcinfo, CaggPolicyOffset *offset,
int argnum)
{
offset->isnull = PG_ARGISNULL(argnum);
if (!offset->isnull)
{
Oid type = get_fn_expr_argtype(fcinfo->flinfo, argnum);
Datum arg = PG_GETARG_DATUM(argnum);
offset->value = convert_interval_arg(cagg->partition_type, arg, &type, offset->name);
offset->type = type;
}
}
static void
parse_cagg_policy_config(const ContinuousAgg *cagg, const FunctionCallInfo fcinfo,
CaggPolicyConfig *config)
{
MemSet(config, 0, sizeof(CaggPolicyConfig));
config->partition_type = cagg->partition_type;
	/* This might seem backwards, but since we are dealing with offsets, start
	 * actually translates to max and end to min for the maximum window. */
config->offset_start.value = ts_time_datum_get_max(config->partition_type);
config->offset_end.value = ts_time_datum_get_min(config->partition_type);
config->offset_start.type = config->offset_end.type =
IS_TIMESTAMP_TYPE(cagg->partition_type) ? INTERVALOID : cagg->partition_type;
config->offset_start.name = CONFIG_KEY_START_OFFSET;
config->offset_end.name = CONFIG_KEY_END_OFFSET;
parse_offset_arg(cagg, fcinfo, &config->offset_start, 1);
parse_offset_arg(cagg, fcinfo, &config->offset_end, 2);
Assert(config->offset_start.type == config->offset_end.type);
validate_window_size(cagg, config);
}
Datum
policy_refresh_cagg_add(PG_FUNCTION_ARGS)
{
NameData application_name;
NameData proc_name, proc_schema, owner;
ContinuousAgg *cagg;
CaggPolicyConfig policyconf;
int32 job_id;
Interval refresh_interval;
Oid cagg_oid, owner_id;
List *jobs;
JsonbParseState *parse_state = NULL;
bool if_not_exists;
/* Verify that the owner can create a background worker */
cagg_oid = PG_GETARG_OID(0);
owner_id = ts_cagg_permissions_check(cagg_oid, GetUserId());
ts_bgw_job_validate_job_owner(owner_id);
cagg = ts_continuous_agg_find_by_relid(cagg_oid);
if (!cagg)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("\"%s\" is not a continuous aggregate", get_rel_name(cagg_oid))));
parse_cagg_policy_config(cagg, fcinfo, &policyconf);
if (PG_ARGISNULL(3))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("cannot use NULL schedule interval")));
refresh_interval = *PG_GETARG_INTERVAL_P(3);
if_not_exists = PG_GETARG_BOOL(4);
	/* Make sure there is only one refresh policy on the cagg */
jobs = ts_bgw_job_find_by_proc_and_hypertable_id(POLICY_REFRESH_CAGG_PROC_NAME,
INTERNAL_SCHEMA_NAME,
cagg->data.mat_hypertable_id);
if (jobs != NIL)
{
Assert(list_length(jobs) == 1);
if (!if_not_exists)
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("continuous aggregate policy already exists for \"%s\"",
get_rel_name(cagg_oid)),
errdetail("Only one continuous aggregate policy can be created per continuous "
"aggregate and a policy with job id %d already exists for \"%s\".",
((BgwJob *) linitial(jobs))->fd.id,
get_rel_name(cagg_oid))));
BgwJob *existing = linitial(jobs);
if (policy_config_check_hypertable_lag_equality(existing->fd.config,
CONFIG_KEY_START_OFFSET,
cagg->partition_type,
policyconf.offset_start.type,
policyconf.offset_start.value) &&
policy_config_check_hypertable_lag_equality(existing->fd.config,
CONFIG_KEY_END_OFFSET,
cagg->partition_type,
policyconf.offset_end.type,
policyconf.offset_end.value))
{
/* If all arguments are the same, do nothing */
ereport(NOTICE,
(errmsg("continuous aggregate policy already exists for \"%s\", "
"skipping",
get_rel_name(cagg_oid))));
PG_RETURN_INT32(-1);
}
else
{
ereport(WARNING,
(errmsg("continuous aggregate policy already exists for \"%s\"",
get_rel_name(cagg_oid)),
errdetail("A policy already exists with different arguments."),
errhint("Remove the existing policy before adding a new one.")));
PG_RETURN_INT32(-1);
}
}
/* Next, insert a new job into jobs table */
namestrcpy(&application_name, "Refresh Continuous Aggregate Policy");
namestrcpy(&proc_name, POLICY_REFRESH_CAGG_PROC_NAME);
namestrcpy(&proc_schema, INTERNAL_SCHEMA_NAME);
namestrcpy(&owner, GetUserNameFromId(owner_id, false));
pushJsonbValue(&parse_state, WJB_BEGIN_OBJECT, NULL);
ts_jsonb_add_int32(parse_state, CONFIG_KEY_MAT_HYPERTABLE_ID, cagg->data.mat_hypertable_id);
if (!policyconf.offset_start.isnull)
json_add_dim_interval_value(parse_state,
CONFIG_KEY_START_OFFSET,
policyconf.offset_start.type,
policyconf.offset_start.value);
else
ts_jsonb_add_null(parse_state, CONFIG_KEY_START_OFFSET);
if (!policyconf.offset_end.isnull)
json_add_dim_interval_value(parse_state,
CONFIG_KEY_END_OFFSET,
policyconf.offset_end.type,
policyconf.offset_end.value);
else
ts_jsonb_add_null(parse_state, CONFIG_KEY_END_OFFSET);
JsonbValue *result = pushJsonbValue(&parse_state, WJB_END_OBJECT, NULL);
Jsonb *config = JsonbValueToJsonb(result);
job_id = ts_bgw_job_insert_relation(&application_name,
&refresh_interval,
DEFAULT_MAX_RUNTIME,
DEFAULT_MAX_RETRIES,
&refresh_interval,
&proc_schema,
&proc_name,
&owner,
true,
cagg->data.mat_hypertable_id,
config);
PG_RETURN_INT32(job_id);
}
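/*
 * Note: this function is assumed to back the SQL-level
 * add_continuous_aggregate_policy() API; a typical call (illustrative):
 *
 *   SELECT add_continuous_aggregate_policy('conditions_hourly',
 *       start_offset => INTERVAL '1 day',
 *       end_offset => INTERVAL '1 hour',
 *       schedule_interval => INTERVAL '1 hour');
 */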
Datum
policy_refresh_cagg_remove(PG_FUNCTION_ARGS)
{
Oid cagg_oid = PG_GETARG_OID(0);
bool if_exists = PG_GETARG_BOOL(1);
int32 mat_htid;
ContinuousAgg *cagg = ts_continuous_agg_find_by_relid(cagg_oid);
if (!cagg)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("\"%s\" is not a continuous aggregate", get_rel_name(cagg_oid))));
ts_cagg_permissions_check(cagg_oid, GetUserId());
mat_htid = cagg->data.mat_hypertable_id;
List *jobs = ts_bgw_job_find_by_proc_and_hypertable_id(POLICY_REFRESH_CAGG_PROC_NAME,
INTERNAL_SCHEMA_NAME,
mat_htid);
if (jobs == NIL)
{
if (!if_exists)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
(errmsg("continuous aggregate policy not found for \"%s\"",
get_rel_name(cagg_oid)))));
else
{
ereport(NOTICE,
(errmsg("continuous aggregate policy not found for \"%s\", skipping",
get_rel_name(cagg_oid))));
PG_RETURN_VOID();
}
}
Assert(list_length(jobs) == 1);
BgwJob *job = linitial(jobs);
ts_bgw_job_delete_by_id(job->fd.id);
PG_RETURN_VOID();
}
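/*
 * Note: assumed to back the SQL-level remove_continuous_aggregate_policy()
 * API, e.g. (illustrative):
 *
 *   SELECT remove_continuous_aggregate_policy('conditions_hourly', if_exists => true);
 */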