Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add missing_ok check to CAgg catalog find function #6640

Merged
merged 1 commit into from
Feb 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 8 additions & 1 deletion src/ts_catalog/continuous_agg.c
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ ts_continuous_aggs_find_by_raw_table_id(int32 raw_hypertable_id)

/* Find a continuous aggregate by the materialized hypertable id */
ContinuousAgg *
ts_continuous_agg_find_by_mat_hypertable_id(int32 mat_hypertable_id)
ts_continuous_agg_find_by_mat_hypertable_id(int32 mat_hypertable_id, bool missing_ok)
{
ContinuousAgg *ca = NULL;
ScanIterator iterator =
Expand All @@ -619,6 +619,13 @@ ts_continuous_agg_find_by_mat_hypertable_id(int32 mat_hypertable_id)
}
ts_scan_iterator_close(&iterator);

if (ca == NULL && !missing_ok)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid materialized hypertable ID: %d", mat_hypertable_id)));
}

return ca;
}

Expand Down
2 changes: 1 addition & 1 deletion src/ts_catalog/continuous_agg.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ extern TSDLLEXPORT Oid ts_cagg_permissions_check(Oid cagg_oid, Oid userid);

extern TSDLLEXPORT CaggsInfo ts_continuous_agg_get_all_caggs_info(int32 raw_hypertable_id);
extern TSDLLEXPORT ContinuousAgg *
ts_continuous_agg_find_by_mat_hypertable_id(int32 mat_hypertable_id);
ts_continuous_agg_find_by_mat_hypertable_id(int32 mat_hypertable_id, bool missing_ok);

extern TSDLLEXPORT void ts_materialization_invalidation_log_delete_inner(int32 mat_hypertable_id);

Expand Down
21 changes: 3 additions & 18 deletions src/ts_catalog/continuous_aggs_watermark.c
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,7 @@ ts_continuous_agg_watermark(PG_FUNCTION_ARGS)
MemoryContextDelete(cagg_watermark_cache->mctx);
}

cagg = ts_continuous_agg_find_by_mat_hypertable_id(mat_hypertable_id);

if (NULL == cagg)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid materialized hypertable ID: %d", mat_hypertable_id)));
cagg = ts_continuous_agg_find_by_mat_hypertable_id(mat_hypertable_id, false);

/*
* Preemptive permission check to ensure the function complains about lack
Expand Down Expand Up @@ -264,12 +259,7 @@ ts_continuous_agg_watermark_materialized(PG_FUNCTION_ARGS)
Hypertable *ht;
int64 watermark;

cagg = ts_continuous_agg_find_by_mat_hypertable_id(mat_hypertable_id);

if (NULL == cagg)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid materialized hypertable ID: %d", mat_hypertable_id)));
cagg = ts_continuous_agg_find_by_mat_hypertable_id(mat_hypertable_id, false);

/*
* Preemptive permission check to ensure the function complains about lack
Expand Down Expand Up @@ -411,12 +401,7 @@ TSDLLEXPORT void
ts_cagg_watermark_update(Hypertable *mat_ht, int64 watermark, bool watermark_isnull,
bool force_update)
{
ContinuousAgg *cagg = ts_continuous_agg_find_by_mat_hypertable_id(mat_ht->fd.id);

if (NULL == cagg)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid materialized hypertable ID: %d", mat_ht->fd.id)));
ContinuousAgg *cagg = ts_continuous_agg_find_by_mat_hypertable_id(mat_ht->fd.id, false);

/* If we have a real-time CAgg, it uses a watermark function. So, we have to invalidate the rel
* cache to force a replanning of prepared statements. See cagg_watermark_update_internal for
Expand Down
4 changes: 2 additions & 2 deletions tsl/src/bgw_policy/job.c
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ policy_retention_read_and_validate_config(Jsonb *config, PolicyRetentionData *po
/* We need to do a reverse lookup here since the given hypertable might be
a materialized hypertable, and thus need to call drop_chunks on the
continuous aggregate instead. */
cagg = ts_continuous_agg_find_by_mat_hypertable_id(hypertable->fd.id);
cagg = ts_continuous_agg_find_by_mat_hypertable_id(hypertable->fd.id, true);
if (cagg)
{
object_relid = ts_get_relation_relid(NameStr(cagg->data.user_view_schema),
Expand Down Expand Up @@ -420,7 +420,7 @@ policy_refresh_cagg_read_and_validate_config(Jsonb *config, PolicyContinuousAggD
policy_data->refresh_window.type = dim_type;
policy_data->refresh_window.start = refresh_start;
policy_data->refresh_window.end = refresh_end;
policy_data->cagg = ts_continuous_agg_find_by_mat_hypertable_id(materialization_id);
policy_data->cagg = ts_continuous_agg_find_by_mat_hypertable_id(materialization_id, false);
policy_data->start_is_null = start_isnull;
policy_data->end_is_null = end_isnull;
}
Expand Down
2 changes: 1 addition & 1 deletion tsl/src/compression/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ get_hypertable_or_cagg_name(Hypertable *ht, Name objname)
namestrcpy(objname, NameStr(ht->fd.table_name));
else if (status == HypertableIsMaterialization)
{
ContinuousAgg *cagg = ts_continuous_agg_find_by_mat_hypertable_id(ht->fd.id);
ContinuousAgg *cagg = ts_continuous_agg_find_by_mat_hypertable_id(ht->fd.id, false);
namestrcpy(objname, NameStr(cagg->data.user_view_name));
}
else
Expand Down
3 changes: 2 additions & 1 deletion tsl/src/continuous_aggs/common.c
Original file line number Diff line number Diff line change
Expand Up @@ -798,7 +798,8 @@ cagg_validate_query(const Query *query, const bool finalized, const char *cagg_s
if (status == HypertableIsMaterialization ||
status == HypertableIsMaterializationAndRaw)
{
const ContinuousAgg *cagg = ts_continuous_agg_find_by_mat_hypertable_id(ht->fd.id);
const ContinuousAgg *cagg =
ts_continuous_agg_find_by_mat_hypertable_id(ht->fd.id, false);
Assert(cagg != NULL);

ts_cache_release(hcache);
Expand Down
2 changes: 1 addition & 1 deletion tsl/src/continuous_aggs/refresh.c
Original file line number Diff line number Diff line change
Expand Up @@ -848,7 +848,7 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,
/* Commit and Start a new transaction */
SPI_commit_and_chain();

cagg = ts_continuous_agg_find_by_mat_hypertable_id(mat_id);
cagg = ts_continuous_agg_find_by_mat_hypertable_id(mat_id, false);

if (!process_cagg_invalidations_and_refresh(cagg, &refresh_window, callctx, INVALID_CHUNK_ID))
emit_up_to_date_notice(cagg, callctx);
Expand Down