From 4b3fdeaa1a257be70918e6222f64445befd9cadf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabr=C3=ADzio=20de=20Royes=20Mello?= Date: Wed, 26 Oct 2022 16:44:59 -0300 Subject: [PATCH] Refactor Continuous Aggregate catalog code Get rid of `GETSTRUCT` to fill the form data and use `heap_deform_tuple` instead. This is necessary specially when you have variable lenght fields and/or fields that accept NULL values. This refactoring will be specially usefull in a following PR for Nested Continuous Aggregates where we'll add a new metadata to the catalog that can accept NULL values. Also refactor the rename view and schema code paths improving the readability and maintainability. --- src/ts_catalog/continuous_agg.c | 408 ++++++++++++++++++++------------ src/ts_catalog/continuous_agg.h | 9 +- 2 files changed, 264 insertions(+), 153 deletions(-) diff --git a/src/ts_catalog/continuous_agg.c b/src/ts_catalog/continuous_agg.c index 84238c4ef8b..34ef9f95c1c 100644 --- a/src/ts_catalog/continuous_agg.c +++ b/src/ts_catalog/continuous_agg.c @@ -275,6 +275,92 @@ ts_materialization_invalidation_log_delete(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } +static HeapTuple +continuous_agg_formdata_make_tuple(const FormData_continuous_agg *fd, TupleDesc desc) +{ + Datum values[Natts_continuous_agg]; + bool nulls[Natts_continuous_agg] = { false }; + + memset(values, 0, sizeof(Datum) * Natts_continuous_agg); + + values[AttrNumberGetAttrOffset(Anum_continuous_agg_mat_hypertable_id)] = + Int32GetDatum(fd->mat_hypertable_id); + values[AttrNumberGetAttrOffset(Anum_continuous_agg_raw_hypertable_id)] = + Int32GetDatum(fd->raw_hypertable_id); + + values[AttrNumberGetAttrOffset(Anum_continuous_agg_user_view_schema)] = + NameGetDatum(&fd->user_view_schema); + values[AttrNumberGetAttrOffset(Anum_continuous_agg_user_view_name)] = + NameGetDatum(&fd->user_view_name); + + values[AttrNumberGetAttrOffset(Anum_continuous_agg_partial_view_schema)] = + NameGetDatum(&fd->partial_view_schema); + values[AttrNumberGetAttrOffset(Anum_continuous_agg_partial_view_name)] = + NameGetDatum(&fd->partial_view_name); + + values[AttrNumberGetAttrOffset(Anum_continuous_agg_bucket_width)] = + Int64GetDatum(fd->bucket_width); + + values[AttrNumberGetAttrOffset(Anum_continuous_agg_direct_view_schema)] = + NameGetDatum(&fd->direct_view_schema); + values[AttrNumberGetAttrOffset(Anum_continuous_agg_direct_view_name)] = + NameGetDatum(&fd->direct_view_name); + + values[AttrNumberGetAttrOffset(Anum_continuous_agg_materialize_only)] = + BoolGetDatum(fd->materialized_only); + values[AttrNumberGetAttrOffset(Anum_continuous_agg_finalized)] = BoolGetDatum(fd->finalized); + + return heap_form_tuple(desc, values, nulls); +} + +static void +continuous_agg_formdata_fill(FormData_continuous_agg *fd, const TupleInfo *ti) +{ + bool should_free; + HeapTuple tuple; + Datum values[Natts_continuous_agg]; + bool nulls[Natts_continuous_agg] = { false }; + + tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free); + heap_deform_tuple(tuple, ts_scanner_get_tupledesc(ti), values, nulls); + + fd->mat_hypertable_id = + DatumGetInt32(values[AttrNumberGetAttrOffset(Anum_continuous_agg_mat_hypertable_id)]); + fd->raw_hypertable_id = + DatumGetInt32(values[AttrNumberGetAttrOffset(Anum_continuous_agg_raw_hypertable_id)]); + + memcpy(&fd->user_view_schema, + DatumGetName(values[AttrNumberGetAttrOffset(Anum_continuous_agg_user_view_schema)]), + NAMEDATALEN); + memcpy(&fd->user_view_name, + DatumGetName(values[AttrNumberGetAttrOffset(Anum_continuous_agg_user_view_name)]), + NAMEDATALEN); + + memcpy(&fd->partial_view_schema, + DatumGetName(values[AttrNumberGetAttrOffset(Anum_continuous_agg_partial_view_schema)]), + NAMEDATALEN); + memcpy(&fd->partial_view_name, + DatumGetName(values[AttrNumberGetAttrOffset(Anum_continuous_agg_partial_view_name)]), + NAMEDATALEN); + + fd->bucket_width = + DatumGetInt64(values[AttrNumberGetAttrOffset(Anum_continuous_agg_bucket_width)]); + + memcpy(&fd->direct_view_schema, + DatumGetName(values[AttrNumberGetAttrOffset(Anum_continuous_agg_direct_view_schema)]), + NAMEDATALEN); + memcpy(&fd->direct_view_name, + DatumGetName(values[AttrNumberGetAttrOffset(Anum_continuous_agg_direct_view_name)]), + NAMEDATALEN); + + fd->materialized_only = + DatumGetBool(values[AttrNumberGetAttrOffset(Anum_continuous_agg_materialize_only)]); + fd->finalized = DatumGetBool(values[AttrNumberGetAttrOffset(Anum_continuous_agg_finalized)]); + + if (should_free) + heap_freetuple(tuple); +} + static void continuous_agg_fill_bucket_function(int32 mat_hypertable_id, ContinuousAggsBucketFunction *bf) { @@ -637,18 +723,16 @@ ts_continuous_agg_hypertable_status(int32 hypertable_id) ts_scanner_foreach(&iterator) { - bool should_free; - HeapTuple tuple = ts_scan_iterator_fetch_heap_tuple(&iterator, false, &should_free); - FormData_continuous_agg *data = (FormData_continuous_agg *) GETSTRUCT(tuple); + FormData_continuous_agg data; + TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator); - if (data->raw_hypertable_id == hypertable_id) + continuous_agg_formdata_fill(&data, ti); + + if (data.raw_hypertable_id == hypertable_id) status |= HypertableIsRawTable; - if (data->mat_hypertable_id == hypertable_id) + if (data.mat_hypertable_id == hypertable_id) status |= HypertableIsMaterialization; - if (should_free) - heap_freetuple(tuple); - if (status == HypertableIsMaterializationAndRaw) { ts_scan_iterator_close(&iterator); @@ -670,19 +754,17 @@ ts_continuous_aggs_find_by_raw_table_id(int32 raw_hypertable_id) ts_scanner_foreach(&iterator) { ContinuousAgg *ca; - bool should_free; - HeapTuple tuple = ts_scan_iterator_fetch_heap_tuple(&iterator, false, &should_free); - Form_continuous_agg data = (Form_continuous_agg) GETSTRUCT(tuple); + FormData_continuous_agg data; MemoryContext oldmctx; + TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator); + + continuous_agg_formdata_fill(&data, ti); oldmctx = MemoryContextSwitchTo(ts_scan_iterator_get_result_memory_context(&iterator)); ca = palloc0(sizeof(*ca)); - continuous_agg_init(ca, data); + continuous_agg_init(ca, &data); continuous_aggs = lappend(continuous_aggs, ca); MemoryContextSwitchTo(oldmctx); - - if (should_free) - heap_freetuple(tuple); } return continuous_aggs; @@ -699,28 +781,27 @@ ts_continuous_agg_find_by_mat_hypertable_id(int32 mat_hypertable_id) init_scan_by_mat_hypertable_id(&iterator, mat_hypertable_id); ts_scanner_foreach(&iterator) { - bool should_free; - HeapTuple tuple = ts_scan_iterator_fetch_heap_tuple(&iterator, false, &should_free); - Form_continuous_agg form = (Form_continuous_agg) GETSTRUCT(tuple); + TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator); + FormData_continuous_agg form; + + continuous_agg_formdata_fill(&form, ti); /* Note that this scan can only match at most once, so we assert on * `ca` here. */ Assert(ca == NULL); ca = ts_scan_iterator_alloc_result(&iterator, sizeof(*ca)); - continuous_agg_init(ca, form); + continuous_agg_init(ca, &form); Assert(ca && ca->data.mat_hypertable_id == mat_hypertable_id); - - if (should_free) - heap_freetuple(tuple); } ts_scan_iterator_close(&iterator); + return ca; } static bool -continuous_agg_fill_form_data(const char *schema, const char *name, ContinuousAggViewType type, - FormData_continuous_agg *fd) +continuous_agg_find_by_name(const char *schema, const char *name, ContinuousAggViewType type, + FormData_continuous_agg *fd) { ScanIterator iterator; AttrNumber view_name_attrnum = 0; @@ -766,22 +847,20 @@ continuous_agg_fill_form_data(const char *schema, const char *name, ContinuousAg ts_scanner_foreach(&iterator) { - bool should_free; - HeapTuple tuple = ts_scan_iterator_fetch_heap_tuple(&iterator, false, &should_free); - FormData_continuous_agg *data = (FormData_continuous_agg *) GETSTRUCT(tuple); ContinuousAggViewType vtype = type; + TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator); + FormData_continuous_agg data; + + continuous_agg_formdata_fill(&data, ti); if (vtype == ContinuousAggAnyView) - vtype = ts_continuous_agg_view_type(data, schema, name); + vtype = ts_continuous_agg_view_type(&data, schema, name); if (vtype != ContinuousAggAnyView) { - memcpy(fd, data, sizeof(*fd)); + memcpy(fd, &data, sizeof(*fd)); count++; } - - if (should_free) - heap_freetuple(tuple); } Assert(count <= 1); @@ -796,7 +875,7 @@ ts_continuous_agg_find_by_view_name(const char *schema, const char *name, FormData_continuous_agg fd; ContinuousAgg *ca; - if (!continuous_agg_fill_form_data(schema, name, type, &fd)) + if (!continuous_agg_find_by_name(schema, name, type, &fd)) return NULL; ca = palloc0(sizeof(ContinuousAgg)); @@ -988,33 +1067,30 @@ drop_continuous_agg(FormData_continuous_agg *cadata, bool drop_user_view) ts_scanner_foreach(&iterator) { TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator); - bool should_free; - HeapTuple tuple = ts_scan_iterator_fetch_heap_tuple(&iterator, false, &should_free); - Form_continuous_agg form = (Form_continuous_agg) GETSTRUCT(tuple); + FormData_continuous_agg form; + + continuous_agg_formdata_fill(&form, ti); ts_catalog_delete_tid(ti->scanrel, ts_scanner_get_tuple_tid(ti)); /* Delete all related rows */ if (!raw_hypertable_has_other_caggs) { - hypertable_invalidation_log_delete(form->raw_hypertable_id); + hypertable_invalidation_log_delete(form.raw_hypertable_id); if (ts_cm_functions->remote_invalidation_log_delete) - ts_cm_functions->remote_invalidation_log_delete(form->raw_hypertable_id, + ts_cm_functions->remote_invalidation_log_delete(form.raw_hypertable_id, HypertableIsRawTable); } - ts_materialization_invalidation_log_delete_inner(form->mat_hypertable_id); + ts_materialization_invalidation_log_delete_inner(form.mat_hypertable_id); if (ts_cm_functions->remote_invalidation_log_delete) - ts_cm_functions->remote_invalidation_log_delete(form->mat_hypertable_id, + ts_cm_functions->remote_invalidation_log_delete(form.mat_hypertable_id, HypertableIsMaterialization); if (!raw_hypertable_has_other_caggs) { - invalidation_threshold_delete(form->raw_hypertable_id); + invalidation_threshold_delete(form.raw_hypertable_id); } - - if (should_free) - heap_freetuple(tuple); } if (cadata->bucket_width == BUCKET_WIDTH_VARIABLE) @@ -1063,21 +1139,19 @@ ts_continuous_agg_drop_hypertable_callback(int32 hypertable_id) ts_scanner_foreach(&iterator) { - bool should_free; - HeapTuple tuple = ts_scan_iterator_fetch_heap_tuple(&iterator, false, &should_free); - FormData_continuous_agg *data = (FormData_continuous_agg *) GETSTRUCT(tuple); + FormData_continuous_agg data; + TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator); - if (data->raw_hypertable_id == hypertable_id) - drop_continuous_agg(data, true); + continuous_agg_formdata_fill(&data, ti); - if (data->mat_hypertable_id == hypertable_id) + if (data.raw_hypertable_id == hypertable_id) + drop_continuous_agg(&data, true); + + if (data.mat_hypertable_id == hypertable_id) ereport(ERROR, (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST), errmsg("cannot drop the materialized table because it is required by a " "continuous aggregate"))); - - if (should_free) - heap_freetuple(tuple); } } @@ -1127,7 +1201,7 @@ bool ts_continuous_agg_drop(const char *view_schema, const char *view_name) { FormData_continuous_agg fd; - bool found = continuous_agg_fill_form_data(view_schema, view_name, ContinuousAggAnyView, &fd); + bool found = continuous_agg_find_by_name(view_schema, view_name, ContinuousAggAnyView, &fd); if (found) continuous_agg_drop_view_callback(&fd, view_schema, view_name); @@ -1169,124 +1243,160 @@ ts_continuous_agg_view_type(FormData_continuous_agg *data, const char *schema, c return ContinuousAggAnyView; } -static FormData_continuous_agg * -ensure_new_tuple(HeapTuple old_tuple, HeapTuple *new_tuple) +typedef struct CaggRenameCtx { - if (*new_tuple == NULL) - *new_tuple = heap_copytuple(old_tuple); - - return (FormData_continuous_agg *) GETSTRUCT(*new_tuple); -} + const char *old_schema; + const char *old_name; + const char *new_schema; + const char *new_name; + ObjectType *object_type; + void (*process_rename)(FormData_continuous_agg *form, bool *do_update, void *data); +} CaggRenameCtx; -void -ts_continuous_agg_rename_schema_name(char *old_schema, char *new_schema) +static void +continuous_agg_rename_process_rename_schema(FormData_continuous_agg *form, bool *do_update, + void *data) { - ScanIterator iterator = - ts_scan_iterator_create(CONTINUOUS_AGG, RowExclusiveLock, CurrentMemoryContext); + CaggRenameCtx *ctx = (CaggRenameCtx *) data; - ts_scanner_foreach(&iterator) + if (ts_continuous_agg_is_user_view_schema(form, ctx->old_schema)) { - TupleInfo *tinfo = ts_scan_iterator_tuple_info(&iterator); - bool should_free; - HeapTuple tuple = ts_scan_iterator_fetch_heap_tuple(&iterator, false, &should_free); - FormData_continuous_agg *data = (FormData_continuous_agg *) GETSTRUCT(tuple); - HeapTuple new_tuple = NULL; + namestrcpy(&form->user_view_schema, ctx->new_schema); + *do_update = true; + } - if (ts_continuous_agg_is_user_view_schema(data, old_schema)) - { - FormData_continuous_agg *new_data = ensure_new_tuple(tuple, &new_tuple); - namestrcpy(&new_data->user_view_schema, new_schema); - } + if (ts_continuous_agg_is_partial_view_schema(form, ctx->old_schema)) + { + namestrcpy(&form->partial_view_schema, ctx->new_schema); + *do_update = true; + } + + if (ts_continuous_agg_is_direct_view_schema(form, ctx->old_schema)) + { + namestrcpy(&form->direct_view_schema, ctx->new_schema); + *do_update = true; + } +} - if (ts_continuous_agg_is_partial_view_schema(data, old_schema)) +static void +continuous_agg_rename_process_rename_view(FormData_continuous_agg *form, bool *do_update, + void *data) +{ + CaggRenameCtx *ctx = (CaggRenameCtx *) data; + ContinuousAggViewType vtyp; + + vtyp = ts_continuous_agg_view_type(form, ctx->old_schema, ctx->old_name); + + switch (vtyp) + { + case ContinuousAggUserView: { - FormData_continuous_agg *new_data = ensure_new_tuple(tuple, &new_tuple); - namestrcpy(&new_data->partial_view_schema, new_schema); + if (*ctx->object_type == OBJECT_VIEW) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot alter continuous aggregate using ALTER VIEW"), + errhint("Use ALTER MATERIALIZED VIEW to alter a continuous aggregate."))); + + Assert(*ctx->object_type == OBJECT_MATVIEW); + *ctx->object_type = OBJECT_VIEW; + + namestrcpy(&form->user_view_schema, ctx->new_schema); + namestrcpy(&form->user_view_name, ctx->new_name); + *do_update = true; + break; } - - if (ts_continuous_agg_is_direct_view_schema(data, old_schema)) + case ContinuousAggPartialView: { - FormData_continuous_agg *new_data = ensure_new_tuple(tuple, &new_tuple); - namestrcpy(&new_data->direct_view_schema, new_schema); + namestrcpy(&form->partial_view_schema, ctx->new_schema); + namestrcpy(&form->partial_view_name, ctx->new_name); + *do_update = true; + break; } - - if (new_tuple != NULL) + case ContinuousAggDirectView: { - ts_catalog_update(tinfo->scanrel, new_tuple); - heap_freetuple(new_tuple); + namestrcpy(&form->direct_view_schema, ctx->new_schema); + namestrcpy(&form->direct_view_name, ctx->new_name); + *do_update = true; + break; } - - if (should_free) - heap_freetuple(tuple); + default: + break; } } -extern void -ts_continuous_agg_rename_view(const char *old_schema, const char *name, const char *new_schema, - const char *new_name, ObjectType *object_type) +static ScanTupleResult +continuous_agg_rename(TupleInfo *ti, void *data) { - ScanIterator iterator = - ts_scan_iterator_create(CONTINUOUS_AGG, RowExclusiveLock, CurrentMemoryContext); + CaggRenameCtx *ctx = (CaggRenameCtx *) data; + FormData_continuous_agg form; + bool do_update = false; + CatalogSecurityContext sec_ctx; - Assert(object_type); + continuous_agg_formdata_fill(&form, ti); - ts_scanner_foreach(&iterator) + ctx->process_rename(&form, &do_update, (void *) ctx); + + if (do_update) { - TupleInfo *tinfo = ts_scan_iterator_tuple_info(&iterator); - bool should_free; - HeapTuple tuple = ts_scan_iterator_fetch_heap_tuple(&iterator, false, &should_free); - FormData_continuous_agg *data = (FormData_continuous_agg *) GETSTRUCT(tuple); - HeapTuple new_tuple = NULL; - ContinuousAggViewType vtyp = ts_continuous_agg_view_type(data, old_schema, name); + HeapTuple new_tuple = + continuous_agg_formdata_make_tuple(&form, ts_scanner_get_tupledesc(ti)); + ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx); + ts_catalog_update_tid(ti->scanrel, ts_scanner_get_tuple_tid(ti), new_tuple); + ts_catalog_restore_user(&sec_ctx); + heap_freetuple(new_tuple); + } - switch (vtyp) - { - case ContinuousAggUserView: - { - FormData_continuous_agg *new_data; - - if (*object_type == OBJECT_VIEW) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot alter continuous aggregate using ALTER VIEW"), - errhint( - "Use ALTER MATERIALIZED VIEW to alter a continuous aggregate."))); - - Assert(*object_type == OBJECT_MATVIEW); - *object_type = OBJECT_VIEW; - - new_data = ensure_new_tuple(tuple, &new_tuple); - namestrcpy(&new_data->user_view_schema, new_schema); - namestrcpy(&new_data->user_view_name, new_name); - break; - } - case ContinuousAggPartialView: - { - FormData_continuous_agg *new_data = ensure_new_tuple(tuple, &new_tuple); - namestrcpy(&new_data->partial_view_schema, new_schema); - namestrcpy(&new_data->partial_view_name, new_name); - break; - } - case ContinuousAggDirectView: - { - FormData_continuous_agg *new_data = ensure_new_tuple(tuple, &new_tuple); - namestrcpy(&new_data->direct_view_schema, new_schema); - namestrcpy(&new_data->direct_view_name, new_name); - break; - } - default: - break; - } + return SCAN_CONTINUE; +} - if (new_tuple != NULL) - { - ts_catalog_update(tinfo->scanrel, new_tuple); - heap_freetuple(new_tuple); - } +void +ts_continuous_agg_rename_schema_name(const char *old_schema, const char *new_schema) +{ + CaggRenameCtx cagg_rename_ctx = { + .old_schema = old_schema, + .new_schema = new_schema, + .process_rename = continuous_agg_rename_process_rename_schema, + }; + + Catalog *catalog = ts_catalog_get(); + + ScannerCtx scanctx = { + .table = catalog_get_table_id(catalog, CONTINUOUS_AGG), + .index = InvalidOid, + .tuple_found = continuous_agg_rename, + .data = &cagg_rename_ctx, + .lockmode = RowExclusiveLock, + .scandirection = ForwardScanDirection, + }; + + ts_scanner_scan(&scanctx); +} - if (should_free) - heap_freetuple(tuple); - } +void +ts_continuous_agg_rename_view(const char *old_schema, const char *old_name, const char *new_schema, + const char *new_name, ObjectType *object_type) +{ + CaggRenameCtx cagg_rename_ctx = { + .old_schema = old_schema, + .old_name = old_name, + .new_schema = new_schema, + .new_name = new_name, + .object_type = object_type, + .process_rename = continuous_agg_rename_process_rename_view, + }; + + Catalog *catalog = ts_catalog_get(); + + ScannerCtx scanctx = { + .table = catalog_get_table_id(catalog, CONTINUOUS_AGG), + .index = InvalidOid, + .tuple_found = continuous_agg_rename, + .data = &cagg_rename_ctx, + .lockmode = RowExclusiveLock, + .scandirection = ForwardScanDirection, + }; + + ts_scanner_scan(&scanctx); } TSDLLEXPORT int32 diff --git a/src/ts_catalog/continuous_agg.h b/src/ts_catalog/continuous_agg.h index 376193a98d9..5bf84d48b20 100644 --- a/src/ts_catalog/continuous_agg.h +++ b/src/ts_catalog/continuous_agg.h @@ -188,10 +188,11 @@ extern void ts_continuous_agg_drop_hypertable_callback(int32 hypertable_id); extern TSDLLEXPORT ContinuousAggViewType ts_continuous_agg_view_type(FormData_continuous_agg *data, const char *schema, const char *name); -extern void ts_continuous_agg_rename_schema_name(char *old_schema, char *new_schema); -extern void ts_continuous_agg_rename_view(const char *old_schema, const char *name, - const char *new_schema, const char *new_name, - ObjectType *object_type); +extern TSDLLEXPORT void ts_continuous_agg_rename_schema_name(const char *old_schema, + const char *new_schema); +extern TSDLLEXPORT void ts_continuous_agg_rename_view(const char *old_schema, const char *old_name, + const char *new_schema, const char *new_name, + ObjectType *object_type); extern TSDLLEXPORT int32 ts_number_of_continuous_aggs(void);