forked from timescale/timescaledb
-
Notifications
You must be signed in to change notification settings - Fork 1
/
invalidation_threshold.c
333 lines (296 loc) · 11.4 KB
/
invalidation_threshold.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
/*
* This file and its contents are licensed under the Timescale License.
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/
#include <postgres.h>
#include <access/htup_details.h>
#include <access/htup.h>
#include <access/xact.h>
#include <nodes/memnodes.h>
#include <storage/lockdefs.h>
#include <storage/lmgr.h>
#include <utils/builtins.h>
#include <utils/memutils.h>
#include <utils/snapmgr.h>
#include "ts_catalog/catalog.h"
#include <scanner.h>
#include <scan_iterator.h>
#include <compat/compat.h>
#include <time_utils.h>
#include <time_bucket.h>
#include "debug_point.h"
#include "ts_catalog/continuous_agg.h"
#include "continuous_aggs/materialize.h"
#include "invalidation_threshold.h"
/*
* Invalidation threshold.
*
* The invalidation threshold acts as a dampener on a hypertable to make sure
* that invalidations written during inserts won't cause too much write
* amplification in "hot" regions---typically the "head" of the table. The
* presumption is that most inserts happen at recent time intervals, and those
* intervals will be invalid until writes move out of them. Therefore, it
* isn't worth writing invalidations in that region since it is presumed
* out-of-date anyway. Further, although it is possible to refresh a
* continuous aggregate in those "hot" regions, it will lead to partially
* filled buckets. Thus, refreshing those intervals is discouraged since the
* aggregate will be immediately out-of-date until the buckets are filled. The
* invalidation threshold is, in other words, used as a marker that lags
* behind the head of the hypertable, where invalidations are written before
* the threshold but not after it.
*
* The invalidation threshold is moved forward (and only forward) by refreshes
* on continuous aggregates when it covers a window that stretches beyond the
* current threshold. The invalidation threshold needs to be moved in its own
* transaction, with exclusive access, before the refresh starts to
* materialize data. This is to avoid losing any invalidations that occur
* between the start of the transaction that moves the threshold and its end
* (when the new threshold becomes visible).
*
*  _____________________________________________
* |_______________________________________|_____| recent data
*                                         ^
* invalidations written here              |  no invalidations
*                                         |
*                              invalidation threshold
*
* Transactions that use an isolation level stronger than READ COMMITTED will
* not be able to "see" changes to the invalidation threshold that may have
* been made while they were running. Therefore, they always create records
* in the hypertable invalidation log. See the cache_inval_entry_write()
* implementation in tsl/src/continuous_aggs/insert.c
*/
/*
 * Context handed to invalidation_threshold_scan_update() through the
 * scanner's data pointer by invalidation_threshold_set_or_get().
 */
typedef struct InvalidationThresholdData
{
	/* continuous aggregate whose raw hypertable's threshold is updated */
	const ContinuousAgg *cagg;
	/* refresh window the candidate threshold is computed from */
	const InternalTimeRange *refresh_window;
	/* out: threshold in effect after the scan (new or existing value) */
	int64 computed_invalidation_threshold;
} InvalidationThresholdData;
/*
 * Scanner callback that moves the invalidation threshold forward.
 *
 * Invoked for the (tuple-locked) invalidation threshold row of the continuous
 * aggregate's raw hypertable. A candidate threshold is computed from the
 * refresh window with invalidation_threshold_compute(). It is written back to
 * the catalog only if it is strictly greater than the stored watermark, so the
 * threshold never moves backwards.
 *
 * On return, invthresh->computed_invalidation_threshold holds the threshold in
 * effect after this call: the newly written value, or the existing watermark
 * if it was not advanced.
 *
 * Returns SCAN_RESCAN when the tuple lock reports TM_Updated, so the scan is
 * retried against the latest tuple version. Returns SCAN_CONTINUE otherwise.
 * Any other lock failure raises an ERROR.
 */
static ScanTupleResult
invalidation_threshold_scan_update(TupleInfo *ti, void *const data)
{
	DEBUG_WAITPOINT("invalidation_threshold_scan_update_enter");

	InvalidationThresholdData *invthresh = (InvalidationThresholdData *) data;

	/* If the tuple was modified concurrently, retry the operation */
	if (ti->lockresult == TM_Updated)
		return SCAN_RESCAN;

	if (ti->lockresult != TM_Ok)
	{
		elog(ERROR,
			 "unable to lock invalidation threshold tuple for hypertable %d (lock result %d)",
			 invthresh->cagg->data.raw_hypertable_id,
			 ti->lockresult);
		pg_unreachable();
	}

	bool isnull;
	Datum datum =
		slot_getattr(ti->slot, Anum_continuous_aggs_invalidation_threshold_watermark, &isnull);

	/*
	 * NULL should never happen because the threshold row is always
	 * initialized with a non-NULL watermark (the MIN value of the partition
	 * type, or -infinity for variable-sized buckets).
	 */
	Ensure(!isnull,
		   "invalidation threshold for hypertable %d is null",
		   invthresh->cagg->data.raw_hypertable_id);

	int64 current_invalidation_threshold = DatumGetInt64(datum);

	/*
	 * Compute the new invalidation threshold. When the refresh window is
	 * unbounded, this computation caps the threshold at the end of the last
	 * bucket that holds data in the underlying hypertable.
	 */
	invthresh->computed_invalidation_threshold =
		invalidation_threshold_compute(invthresh->cagg, invthresh->refresh_window);

	if (invthresh->computed_invalidation_threshold > current_invalidation_threshold)
	{
		/*
		 * The tuple being modified belongs to the invalidation threshold
		 * catalog table, so size the per-attribute arrays by that table's
		 * attribute count, not the continuous_agg table's.
		 */
		bool nulls[Natts_continuous_aggs_invalidation_threshold];
		Datum values[Natts_continuous_aggs_invalidation_threshold];
		bool do_replace[Natts_continuous_aggs_invalidation_threshold] = { false };
		bool should_free;
		HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
		TupleDesc tupdesc = ts_scanner_get_tupledesc(ti);
		HeapTuple new_tuple;

		/* heap_deform_tuple()/heap_modify_tuple() touch tupdesc->natts entries */
		Assert(tupdesc->natts == Natts_continuous_aggs_invalidation_threshold);

		heap_deform_tuple(tuple, tupdesc, values, nulls);

		/* Replace only the watermark column; keep hypertable_id as-is */
		do_replace[AttrNumberGetAttrOffset(Anum_continuous_aggs_invalidation_threshold_watermark)] =
			true;
		values[AttrNumberGetAttrOffset(Anum_continuous_aggs_invalidation_threshold_watermark)] =
			Int64GetDatum(invthresh->computed_invalidation_threshold);

		new_tuple = heap_modify_tuple(tuple, tupdesc, values, nulls, do_replace);
		ts_catalog_update(ti->scanrel, new_tuple);

		heap_freetuple(new_tuple);
		if (should_free)
			heap_freetuple(tuple);
	}
	else
	{
		elog(DEBUG1,
			 "hypertable %d existing watermark >= new invalidation threshold " INT64_FORMAT
			 " " INT64_FORMAT,
			 invthresh->cagg->data.raw_hypertable_id,
			 current_invalidation_threshold,
			 invthresh->computed_invalidation_threshold);
		/* Report the threshold that is actually in effect */
		invthresh->computed_invalidation_threshold = current_invalidation_threshold;
	}

	return SCAN_CONTINUE;
}
/*
* Set a new invalidation threshold.
*
* The threshold is only updated if the new threshold is greater than the old
* one.
*
* On success, the new threshold is returned, otherwise the existing threshold
* is returned instead.
*/
int64
invalidation_threshold_set_or_get(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window)
{
bool found = false;
ScanKeyData scankey[1];
Catalog *catalog = ts_catalog_get();
ScanTupLock scantuplock = {
.waitpolicy = LockWaitBlock,
.lockmode = LockTupleExclusive,
};
InvalidationThresholdData updatectx = {
.cagg = cagg,
.refresh_window = refresh_window,
};
ScannerCtx scanctx = {
.table = catalog_get_table_id(catalog, CONTINUOUS_AGGS_INVALIDATION_THRESHOLD),
.index =
catalog_get_index(catalog, CONTINUOUS_AGGS_INVALIDATION_THRESHOLD, BGW_JOB_PKEY_IDX),
.nkeys = 1,
.scankey = scankey,
.data = &updatectx,
.limit = 1,
.tuple_found = invalidation_threshold_scan_update,
.lockmode = RowExclusiveLock,
.scandirection = ForwardScanDirection,
.result_mctx = CurrentMemoryContext,
.tuplock = &scantuplock,
.flags = SCANNER_F_KEEPLOCK,
};
ScanKeyInit(&scankey[0],
Anum_continuous_aggs_invalidation_threshold_hypertable_id,
BTEqualStrategyNumber,
F_INT4EQ,
Int32GetDatum(cagg->data.raw_hypertable_id));
found = ts_scanner_scan_one(&scanctx, false, "invalidation threshold");
Ensure(found,
"invalidation threshold for hypertable %d not found",
cagg->data.raw_hypertable_id);
return updatectx.computed_invalidation_threshold;
}
/*
* Compute a new invalidation threshold.
*
* The new invalidation threshold returned is the end of the given refresh
* window, unless it ends at "infinity" in which case the threshold is capped
* at the end of the last bucket materialized.
*/
int64
invalidation_threshold_compute(const ContinuousAgg *cagg, const InternalTimeRange *refresh_window)
{
bool max_refresh = false;
Hypertable *ht = ts_hypertable_get_by_id(cagg->data.raw_hypertable_id);
if (IS_TIMESTAMP_TYPE(refresh_window->type))
max_refresh = TS_TIME_IS_END(refresh_window->end, refresh_window->type) ||
TS_TIME_IS_NOEND(refresh_window->end, refresh_window->type);
else
max_refresh = TS_TIME_IS_MAX(refresh_window->end, refresh_window->type);
if (max_refresh)
{
bool isnull;
int64 maxval = ts_hypertable_get_open_dim_max_value(ht, 0, &isnull);
if (isnull)
{
/* No data in hypertable */
if (ts_continuous_agg_bucket_width_variable(cagg))
{
/*
* To determine inscribed/circumscribed refresh window for variable-sized
* buckets we should be able to calculate time_bucket(window.begin) and
* time_bucket(window.end). This, however, is not possible in general case.
* As an example, the minimum date is 4714-11-24 BC, which is before any
* reasonable default `origin` value. Thus for variable-sized buckets
* instead of minimum date we use -infinity since time_bucket(-infinity)
* is well-defined as -infinity.
*
* For more details see:
* - ts_compute_inscribed_bucketed_refresh_window_variable()
* - ts_compute_circumscribed_bucketed_refresh_window_variable()
*/
return ts_time_get_nobegin(refresh_window->type);
}
else
{
/* For fixed-sized buckets return min (start of time) */
return ts_time_get_min(refresh_window->type);
}
}
else
{
if (ts_continuous_agg_bucket_width_variable(cagg))
{
return ts_compute_beginning_of_the_next_bucket_variable(maxval,
cagg->bucket_function);
}
int64 bucket_width = ts_continuous_agg_bucket_width(cagg);
int64 bucket_start = ts_time_bucket_by_type(bucket_width, maxval, refresh_window->type);
/* Add one bucket to get to the end of the last bucket */
return ts_time_saturating_add(bucket_start, bucket_width, refresh_window->type);
}
}
return refresh_window->end;
}
/*
* Initialize the invalidation threshold.
*
* The initial value of the invalidation threshold should be the MIN
* value for the Continuous Aggregate partition type.
*/
void
invalidation_threshold_initialize(const ContinuousAgg *cagg)
{
bool found = false;
ScanKeyData scankey[1];
Catalog *catalog = ts_catalog_get();
ScannerCtx scanctx = {
.table = catalog_get_table_id(catalog, CONTINUOUS_AGGS_INVALIDATION_THRESHOLD),
.index =
catalog_get_index(catalog, CONTINUOUS_AGGS_INVALIDATION_THRESHOLD, BGW_JOB_PKEY_IDX),
.nkeys = 1,
.scankey = scankey,
.limit = 1,
.lockmode = ShareUpdateExclusiveLock,
.scandirection = ForwardScanDirection,
.result_mctx = CurrentMemoryContext,
.flags = SCANNER_F_KEEPLOCK,
};
ScanKeyInit(&scankey[0],
Anum_continuous_aggs_invalidation_threshold_hypertable_id,
BTEqualStrategyNumber,
F_INT4EQ,
Int32GetDatum(cagg->data.raw_hypertable_id));
found = ts_scanner_scan_one(&scanctx, false, "invalidation threshold");
if (!found)
{
Relation rel =
table_open(catalog_get_table_id(catalog, CONTINUOUS_AGGS_INVALIDATION_THRESHOLD),
ShareUpdateExclusiveLock);
TupleDesc desc = RelationGetDescr(rel);
Datum values[Natts_continuous_aggs_invalidation_threshold];
bool nulls[Natts_continuous_aggs_invalidation_threshold] = { false };
CatalogSecurityContext sec_ctx;
/* get the MIN value for the partition type */
int64 min_value = ts_continuous_agg_bucket_width_variable(cagg) ?
ts_time_get_nobegin(cagg->partition_type) :
ts_time_get_min(cagg->partition_type);
values[AttrNumberGetAttrOffset(Anum_continuous_aggs_invalidation_threshold_hypertable_id)] =
Int32GetDatum(cagg->data.raw_hypertable_id);
values[AttrNumberGetAttrOffset(Anum_continuous_aggs_invalidation_threshold_watermark)] =
Int64GetDatum(min_value);
ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
ts_catalog_insert_values(rel, desc, values, nulls);
ts_catalog_restore_user(&sec_ctx);
table_close(rel, NoLock);
}
}