-
Notifications
You must be signed in to change notification settings - Fork 848
/
continuous_aggs_watermark.c
444 lines (379 loc) · 14.2 KB
/
continuous_aggs_watermark.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
/*
* This file and its contents are licensed under the Apache License 2.0.
* Please see the included NOTICE for copyright information and
* LICENSE-APACHE for a copy of the license.
*/
/*
* This file handles continuous aggs watermark functions.
*/
#include <postgres.h>
#include <access/xact.h>
#include <fmgr.h>
#include <miscadmin.h>
#include <utils/acl.h>
#include <utils/inval.h>
#include <utils/snapmgr.h>
#include "ts_catalog/continuous_agg.h"
#include "ts_catalog/continuous_aggs_watermark.h"
#include "hypertable.h"
/*
 * Cache entry holding the watermark of one continuous aggregate. See
 * cagg_watermark_cache for how the entry is used and invalidated.
 */
typedef struct ContinuousAggregateWatermark
{
/* ID of the materialized hypertable the cached value belongs to */
int32 mat_hypertable_id;
/* Memory context the entry itself is allocated in */
MemoryContext mctx;
/* Reset callback registered on mctx (see cagg_watermark_reset) */
MemoryContextCallback cb;
/* Command ID that was current when the entry was created */
CommandId cid;
/* The cached watermark */
int64 value;
} ContinuousAggregateWatermark;
/*
 * Cache the watermark in the current transaction for better performance
 * (by avoiding repeated max bucket calculations). The watermark will be
 * reset at the end of the transaction, when the watermark function's input
 * argument (materialized hypertable ID) changes, or when a new command is
 * executed.
 *
 * One could potentially create a hashtable of watermarks keyed on materialized
 * hypertable ID, but this is left as a future optimization since it doesn't
 * seem to be a common case that multiple continuous aggregates exist in the
 * same query. Besides, the planner can constify calls to the watermark
 * function during planning since the function is STABLE. Therefore, this is
 * only a fallback if the planner needs to constify it many times (e.g., if
 * used as an index condition on many chunks).
 *
 * NULL means that no watermark is currently cached.
 */
static ContinuousAggregateWatermark *cagg_watermark_cache = NULL;
/*
 * Callback handler to reset the watermark after the transaction ends. This is
 * triggered by the deletion of the associated memory context.
 *
 * The handler is registered as the reset callback of the memory context that
 * holds the cache entry, so the global cache pointer is cleared whenever that
 * context goes away and never points at freed memory.
 *
 * The callback argument is unused.
 */
static void
cagg_watermark_reset(void *arg)
{
cagg_watermark_cache = NULL;
}
/*
 * Check whether a cached watermark can be reused.
 *
 * A cache entry is usable only if it exists, was created for the requested
 * materialized hypertable, and was created under the command ID that is
 * current right now (i.e., within the same command execution).
 */
static bool
cagg_watermark_is_valid(const ContinuousAggregateWatermark *cagg_watermark, int32 mat_hypertable_id)
{
	/* Nothing cached at all */
	if (cagg_watermark == NULL)
		return false;

	/* Cached for a different continuous aggregate */
	if (cagg_watermark->mat_hypertable_id != mat_hypertable_id)
		return false;

	/* Only valid while the command that created the entry is still running */
	return cagg_watermark->cid == GetCurrentCommandId(false);
}
/*
 * Prepare a scan iterator for a primary key lookup on the continuous
 * aggregate watermark catalog table.
 *
 * Sets the iterator's index to the table's primary key index and installs an
 * equality scan key on the materialized hypertable ID column.
 */
static void
cagg_watermark_init_scan_by_mat_hypertable_id(ScanIterator *iterator, const int32 mat_hypertable_id)
{
	Catalog *catalog = ts_catalog_get();
	const Datum id_datum = Int32GetDatum(mat_hypertable_id);

	iterator->ctx.index =
		catalog_get_index(catalog, CONTINUOUS_AGGS_WATERMARK, CONTINUOUS_AGGS_WATERMARK_PKEY);

	ts_scan_iterator_scan_key_init(iterator,
								   Anum_continuous_aggs_watermark_pkey_mat_hypertable_id,
								   BTEqualStrategyNumber,
								   F_INT4EQ,
								   id_datum);
}
/*
 * Read the stored watermark of a continuous aggregate from the watermark
 * catalog table.
 *
 * mat_ht is the materialized hypertable of the continuous aggregate; its ID
 * is used as the lookup key.
 *
 * Returns the stored watermark. Raises an error if no non-NULL watermark was
 * found for the hypertable.
 */
static int64
cagg_watermark_get(Hypertable *mat_ht)
{
	PG_USED_FOR_ASSERTS_ONLY short num_rows = 0;
	bool isnull = true;
	Datum datum = (Datum) 0;
	int64 result;
	ScanIterator it =
		ts_scan_iterator_create(CONTINUOUS_AGGS_WATERMARK, AccessShareLock, CurrentMemoryContext);

	/*
	 * Read the watermark with the transaction snapshot instead of the
	 * scanner's default (SnapshotSelf). This guarantees that the watermark
	 * and the materialized part of the CAGG are seen in a matching state.
	 */
	it.ctx.snapshot = GetTransactionSnapshot();
	Assert(it.ctx.snapshot != NULL);

	cagg_watermark_init_scan_by_mat_hypertable_id(&it, mat_ht->fd.id);

	ts_scanner_foreach(&it)
	{
		TupleTableSlot *slot = ts_scan_iterator_slot(&it);

		datum = slot_getattr(slot, Anum_continuous_aggs_watermark_watermark, &isnull);
		num_rows++;
	}

	/* The lookup is by primary key, so at most one row is expected */
	Assert(num_rows <= 1);
	ts_scan_iterator_close(&it);

	/* isnull stays true when no row was found at all */
	if (isnull)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("watermark not defined for continuous aggregate: %d", mat_ht->fd.id)));

	result = DatumGetInt64(datum);

	/* Log the read watermark, needed for MVCC tap tests */
	ereport(DEBUG5,
			(errcode(ERRCODE_SUCCESSFUL_COMPLETION),
			 errmsg("watermark for continuous aggregate, '%d' is: " INT64_FORMAT,
					mat_ht->fd.id,
					result)));

	return result;
}
/*
 * Build a new watermark cache entry for the given continuous aggregate.
 *
 * The entry is allocated in a dedicated memory context created as a child of
 * top_mctx. cagg_watermark_reset() is registered as reset callback on that
 * context. The watermark value itself is read from the catalog with
 * cagg_watermark_get().
 *
 * Raises an error if the materialization hypertable of the continuous
 * aggregate cannot be found.
 */
static ContinuousAggregateWatermark *
cagg_watermark_create(const ContinuousAgg *cagg, MemoryContext top_mctx)
{
	const int32 mat_ht_id = cagg->data.mat_hypertable_id;
	MemoryContext entry_mctx;
	ContinuousAggregateWatermark *entry;
	Hypertable *mat_ht;

	entry_mctx = AllocSetContextCreate(top_mctx,
									   "ContinuousAggregateWatermark function",
									   ALLOCSET_DEFAULT_SIZES);

	entry = MemoryContextAllocZero(entry_mctx, sizeof(ContinuousAggregateWatermark));
	entry->mctx = entry_mctx;
	entry->mat_hypertable_id = mat_ht_id;
	entry->cid = GetCurrentCommandId(false);

	/* Have the callback run when the entry's memory context goes away */
	entry->cb.func = cagg_watermark_reset;
	MemoryContextRegisterResetCallback(entry_mctx, &entry->cb);

	/* Hypertable associated to the Continuous Aggregate */
	mat_ht = ts_hypertable_get_by_id(mat_ht_id);

	if (mat_ht == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("invalid materialization hypertable ID: %d", mat_ht_id)));

	/* Get the stored watermark */
	entry->value = cagg_watermark_get(mat_ht);

	return entry;
}
TS_FUNCTION_INFO_V1(ts_continuous_agg_watermark);

/*
 * Get the watermark for a real-time aggregation query on a continuous
 * aggregate.
 *
 * The watermark determines where the materialization ends for a continuous
 * aggregate. It is used by real-time aggregation as the threshold between the
 * materialized data and real-time data in the UNION query.
 *
 * The watermark is stored into `_timescaledb_catalog.continuous_aggs_watermark`
 * catalog table by the `refresh_continuous_aggregate` procedure. It is defined
 * as the end of the last (highest) bucket in the materialized hypertable of a
 * continuous aggregate.
 *
 * The materialized hypertable ID is given as input argument. The result is
 * cached so that repeated calls for the same hypertable within one command do
 * not have to read the catalog again.
 */
Datum
ts_continuous_agg_watermark(PG_FUNCTION_ARGS)
{
	const int32 mat_hypertable_id = PG_GETARG_INT32(0);
	ContinuousAgg *cagg;
	AclResult acl;

	/* Fast path: the cached entry matches this hypertable and command */
	if (cagg_watermark_is_valid(cagg_watermark_cache, mat_hypertable_id))
		PG_RETURN_INT64(cagg_watermark_cache->value);

	/*
	 * A cached entry exists but cannot be reused. Drop its memory context;
	 * the reset callback registered on the context clears the cache pointer.
	 */
	if (cagg_watermark_cache != NULL)
		MemoryContextDelete(cagg_watermark_cache->mctx);

	cagg = ts_continuous_agg_find_by_mat_hypertable_id(mat_hypertable_id);

	if (cagg == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("invalid materialized hypertable ID: %d", mat_hypertable_id)));

	/*
	 * Preemptive permission check to ensure the function complains about lack
	 * of permissions on the cagg rather than the materialized hypertable
	 */
	acl = pg_class_aclcheck(cagg->relid, GetUserId(), ACL_SELECT);
	aclcheck_error(acl, OBJECT_MATVIEW, get_rel_name(cagg->relid));

	/* Cache the freshly read watermark for the rest of the transaction */
	cagg_watermark_cache = cagg_watermark_create(cagg, TopTransactionContext);

	PG_RETURN_INT64(cagg_watermark_cache->value);
}
/*
 * Translate the maximum value found in a materialized hypertable into the
 * watermark of its continuous aggregate.
 *
 * watermark/isnull describe the maximum value of the open dimension. If it is
 * NULL, the minimum value of the partition type is returned. Otherwise the
 * value is advanced by one bucket.
 */
static int64
cagg_compute_watermark(ContinuousAgg *cagg, int64 watermark, bool isnull)
{
	/* No maximum value available: fall back to the minimum of the time type */
	if (isnull)
		return ts_time_get_min(cagg->partition_type);

	/*
	 * The materialized hypertable is already bucketed, which means the
	 * max is the start of the last bucket. Add one bucket to move to the
	 * point where the materialized data ends.
	 */
	if (!ts_continuous_agg_bucket_width_variable(cagg))
		return ts_time_saturating_add(watermark,
									  ts_continuous_agg_bucket_width(cagg),
									  cagg->partition_type);

	/*
	 * Since `value` is already bucketed, `bucketed = true` flag can
	 * be added to ts_compute_beginning_of_the_next_bucket_variable() as
	 * an optimization, if necessary.
	 */
	return ts_compute_beginning_of_the_next_bucket_variable(watermark, cagg->bucket_function);
}
TS_FUNCTION_INFO_V1(ts_continuous_agg_watermark_materialized);

/*
 * Get the materialized watermark for a real-time aggregation query on a
 * continuous aggregate.
 *
 * The difference between this function and `ts_continuous_agg_watermark` is
 * that this one gets the max value of the open dimension of the
 * materialization hypertable instead of the value stored in the catalog
 * table.
 *
 * The materialized hypertable ID is given as input argument. An error is
 * raised if no continuous aggregate or no materialization hypertable exists
 * for that ID, or if the current user lacks SELECT permission on the
 * continuous aggregate.
 */
Datum
ts_continuous_agg_watermark_materialized(PG_FUNCTION_ARGS)
{
	const int32 mat_hypertable_id = PG_GETARG_INT32(0);
	ContinuousAgg *cagg;
	AclResult aclresult;
	bool isnull;
	Hypertable *ht;
	int64 watermark;

	cagg = ts_continuous_agg_find_by_mat_hypertable_id(mat_hypertable_id);

	if (NULL == cagg)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("invalid materialized hypertable ID: %d", mat_hypertable_id)));

	/*
	 * Preemptive permission check to ensure the function complains about lack
	 * of permissions on the cagg rather than the materialized hypertable
	 */
	aclresult = pg_class_aclcheck(cagg->relid, GetUserId(), ACL_SELECT);
	aclcheck_error(aclresult, OBJECT_MATVIEW, get_rel_name(cagg->relid));

	ht = ts_hypertable_get_by_id(cagg->data.mat_hypertable_id);

	/*
	 * Do not hand a missing hypertable to the max-value lookup; report it the
	 * same way the cached watermark path does.
	 */
	if (NULL == ht)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("invalid materialization hypertable ID: %d",
						cagg->data.mat_hypertable_id)));

	/* Max of the open dimension is the start of the last materialized bucket */
	watermark = ts_hypertable_get_open_dim_max_value(ht, 0, &isnull);
	watermark = cagg_compute_watermark(cagg, watermark, isnull);

	PG_RETURN_INT64(watermark);
}
/*
 * Insert a watermark row for a materialized hypertable into the continuous
 * aggregate watermark catalog table.
 *
 * mat_ht: the materialized hypertable the watermark belongs to.
 * watermark: the watermark to store; ignored if watermark_isnull is set.
 * watermark_isnull: if true, the minimum value of the partition type of the
 *     hypertable's first open dimension is stored instead.
 *
 * The row is inserted as the catalog owner.
 */
TSDLLEXPORT void
ts_cagg_watermark_insert(Hypertable *mat_ht, int64 watermark, bool watermark_isnull)
{
	Catalog *catalog = ts_catalog_get();
	Relation watermark_rel =
		table_open(catalog_get_table_id(catalog, CONTINUOUS_AGGS_WATERMARK), RowExclusiveLock);
	TupleDesc tupdesc = RelationGetDescr(watermark_rel);
	Datum values[Natts_continuous_aggs_watermark];
	bool nulls[Natts_continuous_aggs_watermark] = { false };
	CatalogSecurityContext sec_ctx;

	/* if trying to insert a NULL watermark then get the MIN value for the time dimension */
	if (watermark_isnull)
	{
		const Dimension *time_dim = hyperspace_get_open_dimension(mat_ht->space, 0);

		if (time_dim == NULL)
			elog(ERROR, "invalid open dimension index %d", 0);

		watermark = ts_time_get_min(ts_dimension_get_partition_type(time_dim));
	}

	/* Both columns of the row are filled in, so no NULLs are stored */
	values[AttrNumberGetAttrOffset(Anum_continuous_aggs_watermark_mat_hypertable_id)] =
		Int32GetDatum(mat_ht->fd.id);
	values[AttrNumberGetAttrOffset(Anum_continuous_aggs_watermark_watermark)] =
		Int64GetDatum(watermark);

	/* Switch to the catalog owner only for the duration of the insert */
	ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
	ts_catalog_insert_values(watermark_rel, tupdesc, values, nulls);
	ts_catalog_restore_user(&sec_ctx);

	/* NoLock: the relation is closed without releasing the lock taken above */
	table_close(watermark_rel, NoLock);
}
/*
 * State passed to the tuple callback that updates a watermark row.
 */
typedef struct WatermarkUpdate
{
/*
 * In: the new watermark to store. Out: set to the already stored watermark
 * if the row was left unchanged.
 */
int64 watermark;
/* Overwrite the stored watermark even if the new one is not greater */
bool force_update;
} WatermarkUpdate;
/*
 * Tuple callback that applies a watermark update to the found catalog row.
 *
 * data points to a WatermarkUpdate. The stored watermark is overwritten if
 * the requested one is greater or if the update is forced. Otherwise the row
 * is left alone and the request is updated to carry the stored watermark.
 *
 * Always returns SCAN_DONE.
 */
static ScanTupleResult
cagg_watermark_update_scan_internal(TupleInfo *ti, void *data)
{
	WatermarkUpdate *request = (WatermarkUpdate *) data;
	bool free_tuple;
	HeapTuple old_tuple = ts_scanner_fetch_heap_tuple(ti, false, &free_tuple);
	Form_continuous_aggs_watermark stored = (Form_continuous_aggs_watermark) GETSTRUCT(old_tuple);
	const bool overwrite = request->force_update || request->watermark > stored->watermark;

	if (!overwrite)
	{
		elog(DEBUG1,
			 "hypertable %d existing watermark >= new watermark " INT64_FORMAT " " INT64_FORMAT,
			 stored->mat_hypertable_id,
			 stored->watermark,
			 request->watermark);

		/* Report the watermark that stays in effect back to the caller */
		request->watermark = stored->watermark;
	}
	else
	{
		/* Modify a copy of the tuple and write the copy back to the catalog */
		HeapTuple updated_tuple = heap_copytuple(old_tuple);
		Form_continuous_aggs_watermark updated =
			(Form_continuous_aggs_watermark) GETSTRUCT(updated_tuple);

		updated->watermark = request->watermark;
		ts_catalog_update(ti->scanrel, updated_tuple);
		heap_freetuple(updated_tuple);
	}

	if (free_tuple)
		heap_freetuple(old_tuple);

	return SCAN_DONE;
}
/*
 * Update the watermark row of a materialized hypertable and optionally
 * invalidate the relation cache entry of the hypertable.
 *
 * mat_hypertable_id: ID of the materialized hypertable (lookup key).
 * ht_relid: relation to invalidate if invalidate_rel_cache is set.
 * new_watermark: the watermark to store.
 * force_update: store the watermark even if it is not greater than the
 *     current one.
 * invalidate_rel_cache: send a relcache invalidation for ht_relid.
 *
 * Raises an error if no watermark row exists for the hypertable.
 */
static void
cagg_watermark_update_internal(int32 mat_hypertable_id, Oid ht_relid, int64 new_watermark,
							   bool force_update, bool invalidate_rel_cache)
{
	ScanKeyData key[1];
	WatermarkUpdate request = { .watermark = new_watermark, .force_update = force_update };
	bool row_found;

	ScanKeyInit(&key[0],
				Anum_continuous_aggs_watermark_mat_hypertable_id,
				BTEqualStrategyNumber,
				F_INT4EQ,
				Int32GetDatum(mat_hypertable_id));

	row_found = ts_catalog_scan_one(CONTINUOUS_AGGS_WATERMARK /*=table*/,
									CONTINUOUS_AGGS_WATERMARK_PKEY /*=indexid*/,
									key /*=scankey*/,
									1 /*=num_keys*/,
									cagg_watermark_update_scan_internal /*=tuple_found*/,
									RowExclusiveLock /*=lockmode*/,
									CONTINUOUS_AGGS_WATERMARK_TABLE_NAME /*=table_name*/,
									&request /*=data*/);

	if (!row_found)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("watermark not defined for continuous aggregate: %d", mat_hypertable_id)));

	if (!invalidate_rel_cache)
		return;

	/*
	 * '_timescaledb_functions.cagg_watermark' is declared IMMUTABLE, although
	 * that is not entirely true. The declaration is needed so that the
	 * function value can be constified and used for plan-time chunk
	 * exclusion. In reality the value changes as soon as the watermark
	 * changes (the function is STABLE, but with that classification no plan
	 * time evaluation and chunk exclusion can be performed).
	 *
	 * Therefore, send an invalidation for the hypertable to invalidate
	 * prepared statements on this table and force a re-planning using the
	 * new watermark.
	 */
	CacheInvalidateRelcacheByRelid(ht_relid);
}
/*
 * Update the stored watermark of the continuous aggregate that owns the given
 * materialized hypertable.
 *
 * mat_ht: the materialized hypertable of the continuous aggregate.
 * watermark, watermark_isnull: the (possibly NULL) maximum value of the
 *     materialized data; it is converted into the actual watermark by
 *     cagg_compute_watermark() before it is stored.
 * force_update: store the watermark even if it is not greater than the
 *     currently stored one.
 *
 * Raises an error if no continuous aggregate exists for the hypertable or if
 * no watermark row is defined for it.
 */
TSDLLEXPORT void
ts_cagg_watermark_update(Hypertable *mat_ht, int64 watermark, bool watermark_isnull,
						 bool force_update)
{
	ContinuousAgg *cagg = ts_continuous_agg_find_by_mat_hypertable_id(mat_ht->fd.id);
	bool invalidate_rel_cache;

	/* Validate the lookup result before any field of the cagg is accessed */
	if (NULL == cagg)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("invalid materialized hypertable ID: %d", mat_ht->fd.id)));

	/* If we have a real-time CAgg, it uses a watermark function. So, we have to invalidate the rel
	 * cache to force a replanning of prepared statements. See cagg_watermark_update_internal for
	 * more information. */
	invalidate_rel_cache = !cagg->data.materialized_only;

	watermark = cagg_compute_watermark(cagg, watermark, watermark_isnull);
	cagg_watermark_update_internal(mat_ht->fd.id,
								   mat_ht->main_table_relid,
								   watermark,
								   force_update,
								   invalidate_rel_cache);
}
/*
 * Delete the watermark row(s) of the given materialized hypertable from the
 * continuous aggregate watermark catalog table.
 *
 * Nothing happens if no row exists for the hypertable ID.
 */
TSDLLEXPORT void
ts_cagg_watermark_delete_by_mat_hypertable_id(int32 mat_hypertable_id)
{
	ScanIterator it =
		ts_scan_iterator_create(CONTINUOUS_AGGS_WATERMARK, RowExclusiveLock, CurrentMemoryContext);

	cagg_watermark_init_scan_by_mat_hypertable_id(&it, mat_hypertable_id);

	ts_scanner_foreach(&it)
	{
		TupleInfo *tinfo = ts_scan_iterator_tuple_info(&it);
		Relation catalog_rel = tinfo->scanrel;

		/* Remove the row the scan is currently positioned on */
		ts_catalog_delete_tid(catalog_rel, ts_scanner_get_tuple_tid(tinfo));
	}

	ts_scan_iterator_close(&it);
}