Skip to content

Commit

Permalink
Limit number of materializations per cagg refresh
Browse files Browse the repository at this point in the history
When there are many small (e.g., single timestamp) invalidations that
cannot be merged despite expanding invalidations to full buckets
(e.g., invalidations are spread across every second bucket in the
worst case), it might no longer be beneficial to materialize every
invalidation separately.

Instead, this change adds a threshold for the number of invalidations
used by the refresh (currently 10 by default) above which
invalidations are merged into one range based on the lowest and
greatest invalidated time value.

The limit can be controlled by an anonymous session variable for
debugging and tweaking purposes. It might be considered for promotion
to an official GUC in the future.

Fixes #2867
  • Loading branch information
erimatnor committed Feb 19, 2021
1 parent ec87062 commit 0fcdf01
Show file tree
Hide file tree
Showing 4 changed files with 216 additions and 3 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Expand Up @@ -8,6 +8,7 @@ accidentally triggering the load of a previous DB version.**

**Minor features**
* #2736 Support adding columns to hypertables with compression enabled
* #2926 Optimize cagg refresh for small invalidations

**Bugfixes**
* #2883 Fix join qual propagation for nested joins
Expand All @@ -16,6 +17,8 @@ accidentally triggering the load of a previous DB version.**

**Thanks**
* @zeeshanshabbir93 for reporting an issue with joins
* @Antiarchitect for reporting the issue with slow refreshes of
continuous aggregates.

## 1.7.5 (2021-02-12)

Expand Down
121 changes: 119 additions & 2 deletions tsl/src/continuous_aggs/refresh.c
Expand Up @@ -9,6 +9,7 @@
#include <utils/fmgrprotos.h>
#include <utils/snapmgr.h>
#include <utils/guc.h>
#include <utils/builtins.h>
#include <access/xact.h>
#include <storage/lmgr.h>
#include <miscadmin.h>
Expand Down Expand Up @@ -287,15 +288,100 @@ log_refresh_window(int elevel, const ContinuousAgg *cagg, const InternalTimeRang
DatumGetCString(OidFunctionCall1(outfuncid, end_ts)));
}

/*
* Get the limit on number of invalidation-based refreshes we allow per
* refresh call. If this limit is exceeded, fall back to a single refresh that
* covers the range decided by the min and max invalidated time.
*
* Use a session variable for debugging and testing. In other words, this
* purposefully not a user-visible GUC. Might be promoted to official GUC in
* the future.
*/
static long
materialization_per_refresh_window(void)
{
#define DEFAULT_MATERIALIZATIONS_PER_REFRESH_WINDOW 10
#define MATERIALIZATIONS_PER_REFRESH_WINDOW_OPT_NAME \
"timescaledb.materializations_per_refresh_window"

const char *max_materializations_setting =
GetConfigOption(MATERIALIZATIONS_PER_REFRESH_WINDOW_OPT_NAME, true, false);
long max_materializations = DEFAULT_MATERIALIZATIONS_PER_REFRESH_WINDOW;

if (max_materializations_setting)
{
char *endptr = NULL;

/* Not using pg_strtol here since we don't want to throw error in case
* of parsing issue */
max_materializations = strtol(max_materializations_setting, &endptr, 10);

/* Accept trailing whitespaces */
while (*endptr == ' ')
endptr++;

if (*endptr != '\0')
{
ereport(WARNING,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid value for session variable \"%s\"",
MATERIALIZATIONS_PER_REFRESH_WINDOW_OPT_NAME),
errdetail("Expected an integer but current value is \"%s\".",
max_materializations_setting)));
max_materializations = DEFAULT_MATERIALIZATIONS_PER_REFRESH_WINDOW;
}
}

return max_materializations;
}

/*
* Execute refreshes based on the processed invalidations.
*
* The given refresh window covers a set of buckets, some of which are
* out-of-date (invalid) and some which are up-to-date (valid). Invalid
* buckets that are adjacent form larger ranges, as shown below.
*
* Refresh window: [-----------------------------------------)
* Invalid ranges: [-----] [-] [--] [-] [---]
* Merged range: [---------------------------)
*
* The maximum number of individual (non-mergable) ranges are
* #buckets_in_window/2 (i.e., every other bucket is invalid).
*
* Since it might not be efficient to materialize a lot buckets separately
* when there are many invalid (non-adjecent) buckets/ranges, we put a limit
* on the number of individual materializations we do. This limit is
* determined by the MATERIALIZATIONS_PER_REFRESH_WINDOW setting.
*
* Thus, if the refresh window covers a large number of buckets, but only a
* few of them are invalid, it is likely beneficial to materialized these
* separately to avoid materializing a lot of buckets that are already
* up-to-date. But if the number of invalid buckets/ranges go above the
* threshold, we materialize all of them in one go using the "merged range",
* as illustrated above.
*/
static void
continuous_agg_refresh_with_window(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window,
const InvalidationStore *invalidations, const int32 chunk_id)
{
CaggRefreshState refresh;
TupleTableSlot *slot;
bool do_merged_refresh = false;
InternalTimeRange merged_refresh_window;
long count = 0;

continuous_agg_refresh_init(&refresh, cagg, refresh_window);

/*
* If there are many individual invalidation ranges to refresh, then
* revert to a merged refresh across the range decided by lowest and
* highest invalidated value.
*/
if (tuplestore_tuple_count(invalidations->tupstore) > materialization_per_refresh_window())
do_merged_refresh = true;

slot = MakeSingleTupleTableSlotCompat(invalidations->tupdesc, &TTSOpsMinimalTuple);

while (tuplestore_gettupleslot(invalidations->tupstore,
Expand Down Expand Up @@ -323,8 +409,39 @@ continuous_agg_refresh_with_window(const ContinuousAgg *cagg,
InternalTimeRange bucketed_refresh_window =
compute_circumscribed_bucketed_refresh_window(&invalidation, cagg->data.bucket_width);

log_refresh_window(DEBUG1, cagg, &bucketed_refresh_window, "invalidation refresh on");
continuous_agg_refresh_execute(&refresh, &bucketed_refresh_window, chunk_id);
if (do_merged_refresh)
{
if (count == 0)
merged_refresh_window = bucketed_refresh_window;
else
{
if (bucketed_refresh_window.start < merged_refresh_window.start)
merged_refresh_window.start = bucketed_refresh_window.start;

if (bucketed_refresh_window.end > merged_refresh_window.end)
merged_refresh_window.end = bucketed_refresh_window.end;
}
}
else
{
log_refresh_window(DEBUG1, cagg, &bucketed_refresh_window, "invalidation refresh on");
continuous_agg_refresh_execute(&refresh, &bucketed_refresh_window, chunk_id);
}

count++;
}

if (do_merged_refresh && count > 0)
{
Assert(merged_refresh_window.type == refresh_window->type);
Assert(merged_refresh_window.start >= refresh_window->start);
Assert(merged_refresh_window.end <= refresh_window->end);

log_refresh_window(DEBUG1,
cagg,
&merged_refresh_window,
psprintf("merged %ld invalidations for refresh on", count));
continuous_agg_refresh_execute(&refresh, &merged_refresh_window, chunk_id);
}

ExecDropSingleTupleTableSlot(slot);
Expand Down
50 changes: 50 additions & 0 deletions tsl/test/expected/continuous_aggs_invalidation.out
Expand Up @@ -1131,3 +1131,53 @@ WHERE cagg_id = :cond_10_id;
3 | 60 | 9223372036854775807
(4 rows)

-- should trigger two individual refreshes
SET client_min_messages TO DEBUG1;
CALL refresh_continuous_aggregate('cond_10', 0, 200);
DEBUG: refreshing continuous aggregate "cond_10" in window [ 0, 200 ]
DEBUG: invalidation refresh on "cond_10" in window [ 0, 30 ]
DEBUG: invalidation refresh on "cond_10" in window [ 40, 50 ]
DEBUG: invalidation refresh on "cond_10" in window [ 60, 200 ]
RESET client_min_messages;
-- Allow at most 5 individual invalidations per refreshe
SET timescaledb.materializations_per_refresh_window=5;
-- Insert into every second bucket
INSERT INTO conditions VALUES (20, 1, 1.0);
INSERT INTO conditions VALUES (40, 1, 1.0);
INSERT INTO conditions VALUES (60, 1, 1.0);
INSERT INTO conditions VALUES (80, 1, 1.0);
INSERT INTO conditions VALUES (100, 1, 1.0);
INSERT INTO conditions VALUES (120, 1, 1.0);
INSERT INTO conditions VALUES (140, 1, 1.0);
SET client_min_messages TO DEBUG1;
CALL refresh_continuous_aggregate('cond_10', 0, 200);
DEBUG: refreshing continuous aggregate "cond_10" in window [ 0, 200 ]
DEBUG: hypertable 1 existing watermark >= new invalidation threshold 200 200
DEBUG: merged 7 invalidations for refresh on "cond_10" in window [ 20, 150 ]
RESET client_min_messages;
\set VERBOSITY default
-- Test acceptable values for materializations per refresh
SET timescaledb.materializations_per_refresh_window=' 5 ';
INSERT INTO conditions VALUES (140, 1, 1.0);
CALL refresh_continuous_aggregate('cond_10', 0, 200);
-- Large value will be treated as LONG_MAX
SET timescaledb.materializations_per_refresh_window=342239897234023842394249234766923492347;
INSERT INTO conditions VALUES (140, 1, 1.0);
CALL refresh_continuous_aggregate('cond_10', 0, 200);
-- Test bad values for materializations per refresh
SET timescaledb.materializations_per_refresh_window='foo';
INSERT INTO conditions VALUES (140, 1, 1.0);
CALL refresh_continuous_aggregate('cond_10', 0, 200);
WARNING: invalid value for session variable "timescaledb.materializations_per_refresh_window"
DETAIL: Expected an integer but current value is "foo".
SET timescaledb.materializations_per_refresh_window='2bar';
INSERT INTO conditions VALUES (140, 1, 1.0);
CALL refresh_continuous_aggregate('cond_10', 0, 200);
WARNING: invalid value for session variable "timescaledb.materializations_per_refresh_window"
DETAIL: Expected an integer but current value is "2bar".
SET timescaledb.materializations_per_refresh_window='-';
INSERT INTO conditions VALUES (140, 1, 1.0);
CALL refresh_continuous_aggregate('cond_10', 0, 200);
WARNING: invalid value for session variable "timescaledb.materializations_per_refresh_window"
DETAIL: Expected an integer but current value is "-".
\set VERBOSITY terse
45 changes: 44 additions & 1 deletion tsl/test/sql/continuous_aggs_invalidation.sql
Expand Up @@ -624,7 +624,6 @@ WHERE cagg_id = :cond_1_id;
-- Test that single timestamp invalidations are expanded to buckets,
-- and adjacent buckets merged.
---------------------------------------------------------------------

-- First clear invalidations in a range:
CALL refresh_continuous_aggregate('cond_10', -20, 60);

Expand All @@ -645,3 +644,47 @@ WHERE user_view_name = 'cond_10' \gset

SELECT * FROM cagg_invals
WHERE cagg_id = :cond_10_id;

-- should trigger two individual refreshes
SET client_min_messages TO DEBUG1;
CALL refresh_continuous_aggregate('cond_10', 0, 200);
RESET client_min_messages;

-- Allow at most 5 individual invalidations per refreshe
SET timescaledb.materializations_per_refresh_window=5;

-- Insert into every second bucket
INSERT INTO conditions VALUES (20, 1, 1.0);
INSERT INTO conditions VALUES (40, 1, 1.0);
INSERT INTO conditions VALUES (60, 1, 1.0);
INSERT INTO conditions VALUES (80, 1, 1.0);
INSERT INTO conditions VALUES (100, 1, 1.0);
INSERT INTO conditions VALUES (120, 1, 1.0);
INSERT INTO conditions VALUES (140, 1, 1.0);

SET client_min_messages TO DEBUG1;
CALL refresh_continuous_aggregate('cond_10', 0, 200);
RESET client_min_messages;

\set VERBOSITY default
-- Test acceptable values for materializations per refresh
SET timescaledb.materializations_per_refresh_window=' 5 ';
INSERT INTO conditions VALUES (140, 1, 1.0);
CALL refresh_continuous_aggregate('cond_10', 0, 200);
-- Large value will be treated as LONG_MAX
SET timescaledb.materializations_per_refresh_window=342239897234023842394249234766923492347;
INSERT INTO conditions VALUES (140, 1, 1.0);
CALL refresh_continuous_aggregate('cond_10', 0, 200);

-- Test bad values for materializations per refresh
SET timescaledb.materializations_per_refresh_window='foo';
INSERT INTO conditions VALUES (140, 1, 1.0);
CALL refresh_continuous_aggregate('cond_10', 0, 200);
SET timescaledb.materializations_per_refresh_window='2bar';
INSERT INTO conditions VALUES (140, 1, 1.0);
CALL refresh_continuous_aggregate('cond_10', 0, 200);

SET timescaledb.materializations_per_refresh_window='-';
INSERT INTO conditions VALUES (140, 1, 1.0);
CALL refresh_continuous_aggregate('cond_10', 0, 200);
\set VERBOSITY terse

0 comments on commit 0fcdf01

Please sign in to comment.