Skip to content

Commit

Permalink
Add plan-time chunk exclusion for real-time CAggs
Browse files Browse the repository at this point in the history
The watermark function for CAggs is declared as STABLE since the value
of the function changes after every CAgg refresh. The function
volatility prevents the planner from replacing the function invocation
with a constant value and executing plan time chunk exclusion. This
leads to high planning times on hypertables with many chunks.

This PR replaces the function invocation with a constant value to allow
plan time exclusion of chunks. We perform the replacement at plan time
instead of changing the function volatility to IMMUTABLE, because we
want to control the constification. Only queries that access the
underlying hypertable in a query (i.e., no queries like SELECT
cagg_watermark(...) without any FROM condition) are rewritten. This is
done to make sure that the query is properly invalidated when the
underlying table changes (e.g., the watermark is updated) and the query
is replanned on the subsequent execution.

Fixes: timescale#6105, timescale#6321
Co-authored-by: Fabrizio de Royes Mello <fabriziomello@gmail.com>
  • Loading branch information
jnidzwetzki and fabriziomello committed Jan 24, 2024
1 parent 942f1fb commit 9904e09
Show file tree
Hide file tree
Showing 48 changed files with 9,357 additions and 3,959 deletions.
2 changes: 2 additions & 0 deletions .unreleased/enhancement_6325
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Implements: #6325 Add plan-time chunk exclusion for real-time CAggs
Thanks: @raymalt and @martinhale for reporting very slow query plans on realtime CAggs queries
7 changes: 7 additions & 0 deletions src/cross_module_fn.c
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,12 @@ ts_tsl_loaded(PG_FUNCTION_ARGS)
PG_RETURN_BOOL(ts_cm_functions != &ts_cm_functions_default);
}

static void
preprocess_query_tsl_default_fn_community(Query *parse)
{
/* No op in community licensed code */
}

/*
* Define cross-module functions' default values:
* If the submodule isn't activated, using one of the cm functions will throw an
Expand Down Expand Up @@ -369,6 +375,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
.chunk_create_empty_table = error_no_default_fn_pg_community,
.recompress_chunk_segmentwise = error_no_default_fn_pg_community,
.get_compressed_chunk_index_for_recompression = error_no_default_fn_pg_community,
.preprocess_query_tsl = preprocess_query_tsl_default_fn_community,
};

TSDLLEXPORT CrossModuleFunctions *ts_cm_functions = &ts_cm_functions_default;
1 change: 1 addition & 0 deletions src/cross_module_fn.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ typedef struct CrossModuleFunctions
PGFunction chunk_unfreeze_chunk;
PGFunction recompress_chunk_segmentwise;
PGFunction get_compressed_chunk_index_for_recompression;
void (*preprocess_query_tsl)(Query *parse);
} CrossModuleFunctions;

extern TSDLLEXPORT CrossModuleFunctions *ts_cm_functions;
Expand Down
12 changes: 12 additions & 0 deletions src/guc.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ bool ts_guc_enable_constraint_exclusion = true;
bool ts_guc_enable_qual_propagation = true;
bool ts_guc_enable_cagg_reorder_groupby = true;
bool ts_guc_enable_now_constify = true;
TSDLLEXPORT bool ts_guc_enable_cagg_watermark_constify = true;
bool ts_guc_enable_osm_reads = true;
TSDLLEXPORT bool ts_guc_enable_dml_decompression = true;
TSDLLEXPORT bool ts_guc_enable_transparent_decompression = true;
Expand Down Expand Up @@ -408,6 +409,17 @@ _guc_init(void)
NULL,
NULL);

DefineCustomBoolVariable("timescaledb.enable_cagg_watermark_constify",
"Enable cagg watermark constify",
"Enable constifying cagg watermark for real-time caggs",
&ts_guc_enable_cagg_watermark_constify,
true,
PGC_USERSET,
0,
NULL,
NULL,
NULL);

DefineCustomBoolVariable("timescaledb.enable_tiered_reads",
"Enable tiered data reads",
"Enable reading of tiered data by including a foreign table "
Expand Down
1 change: 1 addition & 0 deletions src/guc.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ extern bool ts_guc_enable_runtime_exclusion;
extern bool ts_guc_enable_constraint_exclusion;
extern bool ts_guc_enable_cagg_reorder_groupby;
extern bool ts_guc_enable_now_constify;
extern TSDLLEXPORT bool ts_guc_enable_cagg_watermark_constify;
extern bool ts_guc_enable_osm_reads;
extern TSDLLEXPORT bool ts_guc_enable_dml_decompression;
extern TSDLLEXPORT bool ts_guc_enable_transparent_decompression;
Expand Down
3 changes: 3 additions & 0 deletions src/planner/planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,9 @@ timescaledb_planner(Query *parse, const char *query_string, int cursor_opts,
* Preprocess the hypertables in the query and warm up the caches.
*/
preprocess_query((Node *) parse, &context);

if (ts_guc_enable_optimizations)
ts_cm_functions->preprocess_query_tsl(parse);
}

if (prev_planner_hook != NULL)
Expand Down
48 changes: 39 additions & 9 deletions src/ts_catalog/continuous_aggs_watermark.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
#include <fmgr.h>
#include <miscadmin.h>
#include <utils/acl.h>
#include <utils/inval.h>
#include <utils/snapmgr.h>

#include "debug_point.h"
#include "ts_catalog/continuous_agg.h"
#include "ts_catalog/continuous_aggs_watermark.h"
#include "hypertable.h"
Expand Down Expand Up @@ -80,8 +82,8 @@ cagg_watermark_init_scan_by_mat_hypertable_id(ScanIterator *iterator, const int3
Int32GetDatum(mat_hypertable_id));
}

static int64
cagg_watermark_get(Hypertable *mat_ht)
int64
ts_cagg_watermark_get(int32 hypertable_id)
{
PG_USED_FOR_ASSERTS_ONLY short count = 0;
Datum watermark = (Datum) 0;
Expand All @@ -99,7 +101,7 @@ cagg_watermark_get(Hypertable *mat_ht)
iterator.ctx.snapshot = GetTransactionSnapshot();
Assert(iterator.ctx.snapshot != NULL);

cagg_watermark_init_scan_by_mat_hypertable_id(&iterator, mat_ht->fd.id);
cagg_watermark_init_scan_by_mat_hypertable_id(&iterator, hypertable_id);

ts_scanner_foreach(&iterator)
{
Expand All @@ -114,13 +116,13 @@ cagg_watermark_get(Hypertable *mat_ht)
if (value_isnull)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("watermark not defined for continuous aggregate: %d", mat_ht->fd.id)));
errmsg("watermark not defined for continuous aggregate: %d", hypertable_id)));

/* Log the read watermark, needed for MVCC tap tests */
ereport(DEBUG5,
(errcode(ERRCODE_SUCCESSFUL_COMPLETION),
errmsg("watermark for continuous aggregate, '%d' is: " INT64_FORMAT,
mat_ht->fd.id,
hypertable_id,
DatumGetInt64(watermark))));

return DatumGetInt64(watermark);
Expand Down Expand Up @@ -152,7 +154,7 @@ cagg_watermark_create(const ContinuousAgg *cagg, MemoryContext top_mctx)
cagg->data.mat_hypertable_id)));

/* Get the stored watermark */
watermark->value = cagg_watermark_get(ht);
watermark->value = ts_cagg_watermark_get(cagg->data.mat_hypertable_id);

return watermark;
}
Expand Down Expand Up @@ -322,6 +324,8 @@ typedef struct WatermarkUpdate
{
int64 watermark;
bool force_update;
bool invalidate_rel_cache;
Oid ht_relid;
} WatermarkUpdate;

static ScanTupleResult
Expand All @@ -339,6 +343,19 @@ cagg_watermark_update_scan_internal(TupleInfo *ti, void *data)
form->watermark = watermark_update->watermark;
ts_catalog_update(ti->scanrel, new_tuple);
heap_freetuple(new_tuple);

/*
* During query planning, the values of the watermark function are constified using the
* constify_cagg_watermark() function. However, this function's value changes when we update
* the Cagg (the volatility of the function is STABLE not IMMUTABLE). To ensure that caches,
* such as the query plan cache, are properly evicted, we send an invalidation message for
* the hypertable.
*/
if (watermark_update->invalidate_rel_cache)
{
DEBUG_WAITPOINT("cagg_watermark_update_internal_before_refresh");
CacheInvalidateRelcacheByRelid(watermark_update->ht_relid);
}
}
else
{
Expand All @@ -357,11 +374,15 @@ cagg_watermark_update_scan_internal(TupleInfo *ti, void *data)
}

static void
cagg_watermark_update_internal(int32 mat_hypertable_id, int64 new_watermark, bool force_update)
cagg_watermark_update_internal(int32 mat_hypertable_id, Oid ht_relid, int64 new_watermark,
bool force_update, bool invalidate_rel_cache)
{
bool watermark_updated;
ScanKeyData scankey[1];
WatermarkUpdate data = { .watermark = new_watermark, .force_update = force_update };
WatermarkUpdate data = { .watermark = new_watermark,
.force_update = force_update,
.invalidate_rel_cache = invalidate_rel_cache,
.ht_relid = ht_relid };

ScanKeyInit(&scankey[0],
Anum_continuous_aggs_watermark_mat_hypertable_id,
Expand Down Expand Up @@ -397,8 +418,17 @@ ts_cagg_watermark_update(Hypertable *mat_ht, int64 watermark, bool watermark_isn
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid materialized hypertable ID: %d", mat_ht->fd.id)));

/* If we have a real-time CAgg, it uses a watermark function. So, we have to invalidate the rel
* cache to force a replanning of prepared statements. See cagg_watermark_update_internal for
* more information. */
bool invalidate_rel_cache = !cagg->data.materialized_only;

watermark = cagg_compute_watermark(cagg, watermark, watermark_isnull);
cagg_watermark_update_internal(mat_ht->fd.id, watermark, force_update);
cagg_watermark_update_internal(mat_ht->fd.id,
mat_ht->main_table_relid,
watermark,
force_update,
invalidate_rel_cache);
}

TSDLLEXPORT void
Expand Down
2 changes: 2 additions & 0 deletions src/ts_catalog/continuous_aggs_watermark.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,5 @@ extern TSDLLEXPORT void ts_cagg_watermark_insert(Hypertable *mat_ht, int64 water
bool watermark_isnull);
extern TSDLLEXPORT void ts_cagg_watermark_update(Hypertable *mat_ht, int64 watermark,
bool watermark_isnull, bool force_update);

extern TSDLLEXPORT int64 ts_cagg_watermark_get(int32 hypertable_id);
1 change: 1 addition & 0 deletions tsl/src/continuous_aggs/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ set(SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/invalidation.c
${CMAKE_CURRENT_SOURCE_DIR}/materialize.c
${CMAKE_CURRENT_SOURCE_DIR}/options.c
${CMAKE_CURRENT_SOURCE_DIR}/planner.c
${CMAKE_CURRENT_SOURCE_DIR}/refresh.c
${CMAKE_CURRENT_SOURCE_DIR}/repair.c
${CMAKE_CURRENT_SOURCE_DIR}/utils.c)
Expand Down
26 changes: 18 additions & 8 deletions tsl/src/continuous_aggs/common.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ static void caggtimebucketinfo_init(CAggTimebucketInfo *src, int32 hypertable_id
static void caggtimebucket_validate(CAggTimebucketInfo *tbinfo, List *groupClause, List *targetList,
bool is_cagg_create);
static bool cagg_query_supported(const Query *query, StringInfo hint, StringInfo detail,
bool finalized);
static Oid cagg_get_boundary_converter_funcoid(Oid typoid);
const bool finalized);
static FuncExpr *build_conversion_call(Oid type, FuncExpr *boundary);
static FuncExpr *build_boundary_call(int32 ht_id, Oid type);
static Const *cagg_boundary_make_lower_bound(Oid type);
Expand Down Expand Up @@ -985,7 +984,7 @@ cagg_validate_query(const Query *query, const bool finalized, const char *cagg_s
* Get oid of function to convert from our internal representation
* to postgres representation.
*/
static Oid
Oid
cagg_get_boundary_converter_funcoid(Oid typoid)
{
char *function_name;
Expand Down Expand Up @@ -1079,20 +1078,31 @@ build_conversion_call(Oid type, FuncExpr *boundary)
}

/*
* Build function call that returns boundary for a hypertable
* wrapped in type conversion calls when required.
* Return the Oid of the cagg_watermark function
*/
static FuncExpr *
build_boundary_call(int32 ht_id, Oid type)
Oid
get_watermark_function_oid(void)
{
Oid argtyp[] = { INT4OID };
FuncExpr *boundary;

Oid boundary_func_oid =
LookupFuncName(list_make2(makeString(FUNCTIONS_SCHEMA_NAME), makeString(BOUNDARY_FUNCTION)),
lengthof(argtyp),
argtyp,
false);

return boundary_func_oid;
}

/*
* Build function call that returns boundary for a hypertable
* wrapped in type conversion calls when required.
*/
static FuncExpr *
build_boundary_call(int32 ht_id, Oid type)
{
FuncExpr *boundary;
Oid boundary_func_oid = get_watermark_function_oid();
List *func_args =
list_make1(makeConst(INT4OID, -1, InvalidOid, 4, Int32GetDatum(ht_id), false, true));

Expand Down
2 changes: 2 additions & 0 deletions tsl/src/continuous_aggs/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,5 @@ extern Query *build_union_query(CAggTimebucketInfo *tbinfo, int matpartcolno, Qu
extern void mattablecolumninfo_init(MatTableColumnInfo *matcolinfo, List *grouplist);
extern void mattablecolumninfo_addinternal(MatTableColumnInfo *matcolinfo);
extern bool function_allowed_in_cagg_definition(Oid funcid);
extern Oid get_watermark_function_oid(void);
extern Oid cagg_get_boundary_converter_funcoid(Oid typoid);

0 comments on commit 9904e09

Please sign in to comment.