Skip to content

Commit

Permalink
Merge adjacent invalidations during refresh
Browse files Browse the repository at this point in the history
Since invalidations are inclusive at both ends, adjacent invalidations
can be merged (e.g., [1, 2] followed by [3, 4] can become [1, 4]).
However, adjacency wasn't accounted for when merging
invalidations, which meant that a refresh could leave more
invalidations in the log than strictly necessary. Note that this
didn't otherwise affect the correctness of a refresh.
  • Loading branch information
erimatnor committed Sep 25, 2020
1 parent 5102c3e commit a7d135c
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 10 deletions.
38 changes: 28 additions & 10 deletions tsl/src/continuous_aggs/invalidation.c
Expand Up @@ -521,6 +521,23 @@ invalidation_entry_set_from_cagg_invalidation(Invalidation *entry, const TupleIn
Form_continuous_aggs_materialization_invalidation_log);
}

/*
* Check if two invalidations can be merged into one.
*
* Since invalidations are inclusive in both ends, two adjacent invalidations
* can be merged.
*/
static bool
invalidations_can_be_merged(const Invalidation *a, const Invalidation *b)
{
/* To account for adjacency, expand one window 1 step in each
* direction. This makes adjacent invalidations overlapping. */
int64 a_start = int64_saturating_sub(a->lowest_modified_value, 1);
int64 a_end = int64_saturating_add(a->greatest_modified_value, 1);

return a_end >= b->lowest_modified_value && a_start <= b->greatest_modified_value;
}

/*
* Try to merge two invalidations into one.
*
Expand All @@ -535,10 +552,11 @@ invalidation_entry_set_from_cagg_invalidation(Invalidation *entry, const TupleIn
* |-------------|
* |++++++++|
*
* The closest non-overlapping case is:
* The closest non-overlapping case is (note that adjacent invalidations can
* be merged since they are inclusive in both ends):
*
* |--|
* |++++++++|
* |++++++++|
*
*/
static bool
Expand All @@ -548,11 +566,9 @@ invalidation_entry_try_merge(Invalidation *entry, const Invalidation *newentry)
return false;

/* Quick exit if no overlap */
if (entry->greatest_modified_value < newentry->lowest_modified_value)
{
Assert(entry->lowest_modified_value <= newentry->lowest_modified_value);
if (!invalidations_can_be_merged(entry, newentry))
return false;
}

/* Check if the new entry expands beyond the old one (first case above) */
if (entry->greatest_modified_value < newentry->greatest_modified_value)
{
Expand Down Expand Up @@ -765,16 +781,18 @@ cut_cagg_invalidation_and_compute_remainder(const CaggInvalidationState *state,
* Clear all cagg invalidations that match a refresh window.
*
* This function clears all invalidations in the cagg invalidation log that
* matches a window. Note that the refresh currently doesn't make use of the
* invalidations to optimize the materialization.
* matches a window, and, optionally, adds the invalidation segments covered
* by the window to the invalidation store in the passed in state. These
* remaining segments are regions that require materialization.
*
* An invalidation entry that gets processed is either completely enclosed
* (covered) by the refresh window, or it partially overlaps. In the former
* case, the invalidation entry is removed and for the latter case it is
* cut. Thus, an entry can either disappear, reduce in size, or be cut in two.
*
* Note that the refresh window is inclusive at the start and exclusive at the
* end.
* end. This function also assumes that invalidations are scanned in order of
* lowest_modified_value.
*/
static void
clear_cagg_invalidations_for_refresh(const CaggInvalidationState *state,
Expand Down Expand Up @@ -808,7 +826,7 @@ clear_cagg_invalidations_for_refresh(const CaggInvalidationState *state,
{
/*
* The previous and current invalidation were merged into
* one entry (i.e., they overlapped or were adjecent).
* one entry (i.e., they overlapped or were adjacent).
*/
ts_catalog_delete_tid_only(state->cagg_log_rel, &logentry.tid);
}
Expand Down
20 changes: 20 additions & 0 deletions tsl/test/expected/continuous_aggs_invalidation.out
Expand Up @@ -1163,3 +1163,23 @@ ORDER BY 1,2;
10 | 1 | 10
(1 row)

-- Test that adjacent invalidations are merged
INSERT INTO conditions VALUES(1, 1, 1.0), (2, 1, 2.0);
INSERT INTO conditions VALUES(3, 1, 1.0);
INSERT INTO conditions VALUES(4, 1, 1.0);
INSERT INTO conditions VALUES(6, 1, 1.0);
CALL refresh_continuous_aggregate('cond_1', 10, NULL);
NOTICE: continuous aggregate "cond_1" is already up-to-date
SELECT materialization_id AS cagg_id,
lowest_modified_value AS start,
greatest_modified_value AS end
FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log
WHERE materialization_id = :cond_1_id
ORDER BY 1,2,3;
cagg_id | start | end
---------+-------+---------------------
6 | 1 | 4
6 | 6 | 6
6 | 110 | 9223372036854775807
(3 rows)

14 changes: 14 additions & 0 deletions tsl/test/sql/continuous_aggs_invalidation.sql
Expand Up @@ -687,3 +687,17 @@ CALL refresh_continuous_aggregate('cond_1', NULL, NULL);
-- Aggregate now up-to-date with the source hypertable
SELECT * FROM cond_1
ORDER BY 1,2;

-- Test that adjacent invalidations are merged
INSERT INTO conditions VALUES(1, 1, 1.0), (2, 1, 2.0);
INSERT INTO conditions VALUES(3, 1, 1.0);
INSERT INTO conditions VALUES(4, 1, 1.0);
INSERT INTO conditions VALUES(6, 1, 1.0);

CALL refresh_continuous_aggregate('cond_1', 10, NULL);
SELECT materialization_id AS cagg_id,
lowest_modified_value AS start,
greatest_modified_value AS end
FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log
WHERE materialization_id = :cond_1_id
ORDER BY 1,2,3;

0 comments on commit a7d135c

Please sign in to comment.