Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle dropping chunks with continuous aggregates #2331

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
137 changes: 42 additions & 95 deletions src/chunk.c
Expand Up @@ -3089,94 +3089,6 @@ ts_chunk_drop_preserve_catalog_row(const Chunk *chunk, DropBehavior behavior, in
ts_chunk_drop_internal(chunk, behavior, log_level, true);
}

static void
ts_chunk_drop_process_invalidations(Oid hypertable_relid, int64 older_than, int64 newer_than,
Chunk *chunks, int num_chunks)
{
Dimension *time_dimension;
int64 ignore_invalidation_older_than;
int64 minimum_invalidation_time;
int64 lowest_completion_time;
List *continuous_aggs;
ListCell *lc;
Cache *hcache;
Hypertable *ht;
int i;
FormData_continuous_agg cagg;

ht = ts_hypertable_cache_get_cache_and_entry(hypertable_relid, CACHE_FLAG_NONE, &hcache);
time_dimension = hyperspace_get_open_dimension(ht->space, 0);

ignore_invalidation_older_than =
ts_continuous_aggs_max_ignore_invalidation_older_than(ht->fd.id, &cagg);
minimum_invalidation_time =
ts_continuous_aggs_get_minimum_invalidation_time(ts_get_now_internal(time_dimension),
ignore_invalidation_older_than);

/* minimum_invalidation_time is inclusive; older_than_time is exclusive */
if (minimum_invalidation_time < older_than)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("older_than must be greater than the "
"timescaledb.ignore_invalidation_older_than "
"parameter of %s.%s",
cagg.user_view_schema.data,
cagg.user_view_name.data)));

/* error for now, maybe better as a warning and ignoring the chunks? */
/* We cannot move a completion threshold up transactionally without taking locks
* that would block the system. So, just bail. The completion threshold
* should be much higher than this anyway */
lowest_completion_time = ts_continuous_aggs_min_completed_threshold(ht->fd.id, &cagg);
if (lowest_completion_time < older_than)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("the continuous aggregate %s.%s is too far behind",
cagg.user_view_schema.data,
cagg.user_view_name.data)));

/* Lock all chunks in Exclusive mode, blocking everything but selects on the table. We have
* to block all modifications so that we can't get new invalidation entries. This makes sure
* that all future modifying txns on this data region will have a now() that higher than
* ours and thus will not invalidate. Otherwise, we could have an old txn with a now() in
* the past that all of a sudden decides to to insert data right after we
* process_invalidations. */
for (i = 0; i < num_chunks; i++)
{
LockRelationOid(chunks[i].table_id, ExclusiveLock);
}

continuous_aggs = ts_continuous_aggs_find_by_raw_table_id(ht->fd.id);
foreach (lc, continuous_aggs)
{
ContinuousAgg *ca = lfirst(lc);
ContinuousAggMatOptions mat_options = {
.verbose = true,
.within_single_transaction = true,
.process_only_invalidation = true,
.invalidate_prior_to_time = older_than,
};
bool finished_all_invalidation = false;

/* This will loop until all invalidations are done, each iteration of the loop will do
* max_interval_per_job's worth of data. We don't want to ignore max_interval_per_job
* here to avoid large sorts. */
while (!finished_all_invalidation)
{
elog(NOTICE,
"making sure all invalidations for %s.%s have been processed prior to "
"dropping "
"chunks",
NameStr(ca->data.user_view_schema),
NameStr(ca->data.user_view_name));
finished_all_invalidation =
ts_cm_functions->continuous_agg_materialize(ca->data.mat_hypertable_id,
&mat_options);
}
}
ts_cache_release(hcache);
}

static void
lock_referenced_tables(Oid table_relid)
{
Expand Down Expand Up @@ -3278,13 +3190,44 @@ ts_chunk_do_drop_chunks(Hypertable *ht, int64 older_than, int64 newer_than, int3
DEBUG_WAITPOINT("drop_chunks_chunks_found");

if (has_continuous_aggs)
ts_chunk_drop_process_invalidations(ht->main_table_relid,
older_than,
newer_than,
chunks,
num_chunks);
{
int i;

for (; i < num_chunks; i++)
/* Exclusively lock all chunks, and also refresh and invalidate the
* continuous aggregates in the regions covered by the chunks. Locking
* prevents further modification of the dropped region during this
* transaction, which allows moving the invalidation threshold without
* having to worry about new invalidations while refreshing. */
for (i = 0; i < num_chunks; i++)
{
int64 start = chunk_primary_dimension_start(&chunks[i]);
int64 end = chunk_primary_dimension_end(&chunks[i]);

LockRelationOid(chunks[i].table_id, ExclusiveLock);
erimatnor marked this conversation as resolved.
Show resolved Hide resolved

Assert(hyperspace_get_open_dimension(ht->space, 0)->fd.id ==
chunks[i].cube->slices[0]->fd.dimension_id);

/* Refresh all continuous aggregates on the hypertable in the
* region covered by the dropped chunk. Note that we cannot assume
* that all dropped chunks exists in one single contiguous region,
* so we refresh all chunk regions individually rather than the
* region defined by the min and max chunk. An optimization is to
k-rus marked this conversation as resolved.
Show resolved Hide resolved
* merge adjecent regions into a larger one for a larger
* refresh. However, such merging needs to account for
* multi-dimensional tables where some chunks have the same
* primary dimension ranges. */
ts_cm_functions->continuous_agg_refresh_all(ht, start, end);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this is kept to be in consistent with previous implementation, but the intention here was not to do it if I am not mistaken?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kept for now, and we might remove later.


/* Invalidate the dropped region to indicate that it was
* modified. The invalidation will allow the refresh command on a
* continuous aggregate to see that this region was dropped and
* and will therefore be able to refresh accordingly.*/
ts_cm_functions->continuous_agg_invalidate(ht, start, end);
}
}

for (i = 0; i < num_chunks; i++)
{
char *chunk_name;
ListCell *lc;
Expand Down Expand Up @@ -3513,10 +3456,14 @@ ts_chunk_drop_chunks(PG_FUNCTION_ARGS)
* CASCADE (we don't support it), so we replace the hint with a more
* accurate hint for our situation. */
ErrorData *edata;

MemoryContextSwitchTo(oldcontext);
edata = CopyErrorData();
FlushErrorState();
edata->hint = pstrdup("Use DROP ... to drop the dependent objects.");

if (edata->sqlerrcode == ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST)
edata->hint = pstrdup("Use DROP ... to drop the dependent objects.");

ts_cache_release(hcache);
ReThrowError(edata);
}
Expand Down
4 changes: 4 additions & 0 deletions src/chunk.h
Expand Up @@ -168,4 +168,8 @@ extern List *ts_chunk_data_nodes_copy(const Chunk *chunk);
CurrentMemoryContext, \
fail_if_not_found)

#define chunk_primary_dimension_start(chunk) (chunk)->cube->slices[0]->fd.range_start

#define chunk_primary_dimension_end(chunk) (chunk)->cube->slices[0]->fd.range_end

#endif /* TIMESCALEDB_CHUNK_H */
28 changes: 0 additions & 28 deletions src/continuous_agg.c
Expand Up @@ -359,34 +359,6 @@ ts_continuous_aggs_max_ignore_invalidation_older_than(int32 raw_hypertable_id,
return ignore_invalidation_older_than;
}

TSDLLEXPORT int64
ts_continuous_aggs_min_completed_threshold(int32 raw_hypertable_id, FormData_continuous_agg *entry)
{
ScanIterator iterator =
ts_scan_iterator_create(CONTINUOUS_AGG, AccessShareLock, CurrentMemoryContext);
int64 min_threshold = PG_INT64_MAX;

init_scan_by_raw_hypertable_id(&iterator, raw_hypertable_id);
ts_scanner_foreach(&iterator)
{
bool should_free;
HeapTuple tuple = ts_scan_iterator_fetch_heap_tuple(&iterator, false, &should_free);
FormData_continuous_agg *data = (FormData_continuous_agg *) GETSTRUCT(tuple);
int64 completed_threshold =
ts_continuous_agg_get_completed_threshold(data->mat_hypertable_id);

if (min_threshold > completed_threshold)
min_threshold = completed_threshold;
if (entry != NULL)
memcpy(entry, data, sizeof(*entry));

if (should_free)
heap_freetuple(tuple);
}

return min_threshold;
}

/* returns the inclusive value for the minimum time to invalidate */
TSDLLEXPORT int64
ts_continuous_aggs_get_minimum_invalidation_time(int64 modification_time,
Expand Down
2 changes: 0 additions & 2 deletions src/continuous_agg.h
Expand Up @@ -68,8 +68,6 @@ ts_continuous_agg_hypertable_status(int32 hypertable_id);
extern TSDLLEXPORT List *ts_continuous_aggs_find_by_raw_table_id(int32 raw_hypertable_id);
extern TSDLLEXPORT int64 ts_continuous_aggs_max_ignore_invalidation_older_than(
int32 raw_hypertable_id, FormData_continuous_agg *entry);
TSDLLEXPORT int64 ts_continuous_aggs_min_completed_threshold(int32 raw_hypertable_id,
FormData_continuous_agg *entry);
extern TSDLLEXPORT int64 ts_continuous_aggs_get_minimum_invalidation_time(
int64 modification_time, int64 ignore_invalidation_older_than);
TSDLLEXPORT
Expand Down
5 changes: 3 additions & 2 deletions src/cross_module_fn.c
Expand Up @@ -261,7 +261,7 @@ continuous_agg_update_options_default(ContinuousAgg *cagg, WithClauseResult *wit
}

static void
continuous_agg_invalidate_default(const Hypertable *ht, int64 start, int64 end)
continuous_agg_invalidate_or_refresh_all_default(const Hypertable *ht, int64 start, int64 end)
{
error_no_default_fn_community();
pg_unreachable();
Expand Down Expand Up @@ -374,9 +374,10 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
.process_cagg_viewstmt = process_cagg_viewstmt_default,
.continuous_agg_invalidation_trigger = error_no_default_fn_pg_community,
.continuous_agg_refresh = error_no_default_fn_pg_community,
.continuous_agg_refresh_all = continuous_agg_invalidate_or_refresh_all_default,
.continuous_agg_invalidate = continuous_agg_invalidate_or_refresh_all_default,
.continuous_agg_update_options = continuous_agg_update_options_default,
.continuous_agg_materialize = cagg_materialize_default_fn,
.continuous_agg_invalidate = continuous_agg_invalidate_default,

/* compression */
.compressed_data_send = error_no_default_fn_pg_community,
Expand Down
1 change: 1 addition & 0 deletions src/cross_module_fn.h
Expand Up @@ -94,6 +94,7 @@ typedef struct CrossModuleFunctions
WithClauseResult *with_clause_options);
PGFunction continuous_agg_invalidation_trigger;
PGFunction continuous_agg_refresh;
void (*continuous_agg_refresh_all)(const Hypertable *ht, int64 start, int64 end);
void (*continuous_agg_invalidate)(const Hypertable *ht, int64 start, int64 end);
void (*continuous_agg_update_options)(ContinuousAgg *cagg,
WithClauseResult *with_clause_options);
Expand Down
27 changes: 26 additions & 1 deletion src/process_utility.c
Expand Up @@ -56,6 +56,7 @@
#include "dimension_vector.h"
#include "indexing.h"
#include "scan_iterator.h"
#include "time_utils.h"
#include "trigger.h"
#include "utils.h"
#include "with_clause_parser.h"
Expand Down Expand Up @@ -791,7 +792,9 @@ process_truncate(ProcessUtilityArgs *args)

if (agg_status == HypertableIsRawTable)
/* The truncation invalidates all associated continuous aggregates */
ts_cm_functions->continuous_agg_invalidate(ht, PG_INT64_MIN, PG_INT64_MAX);
ts_cm_functions->continuous_agg_invalidate(ht,
TS_TIME_NOBEGIN,
TS_TIME_NOEND);

if (!relation_should_recurse(rv))
ereport(ERROR,
Expand Down Expand Up @@ -874,6 +877,7 @@ static void
process_drop_chunk(ProcessUtilityArgs *args, DropStmt *stmt)
{
ListCell *lc;
Cache *hcache = ts_hypertable_cache_pin();

foreach (lc, stmt->objects)
{
Expand All @@ -887,8 +891,11 @@ process_drop_chunk(ProcessUtilityArgs *args, DropStmt *stmt)

relid = RangeVarGetRelid(relation, NoLock, true);
chunk = ts_chunk_get_by_relid(relid, false);

if (chunk != NULL)
{
Hypertable *ht;

if (ts_chunk_contains_compressed_data(chunk))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
Expand All @@ -906,8 +913,26 @@ process_drop_chunk(ProcessUtilityArgs *args, DropStmt *stmt)
if (compressed_chunk != NULL)
ts_chunk_drop(compressed_chunk, stmt->behavior, DEBUG1);
}

ht = ts_hypertable_cache_get_entry(hcache, chunk->hypertable_relid, CACHE_FLAG_NONE);

Assert(ht != NULL);

/* If the hypertable has continuous aggregates, then invalidate
* the dropped region. */
if (ts_continuous_agg_hypertable_status(ht->fd.id) == HypertableIsRawTable)
{
int64 start = chunk_primary_dimension_start(chunk);
int64 end = chunk_primary_dimension_end(chunk);

Assert(hyperspace_get_open_dimension(ht->space, 0)->fd.id ==
chunk->cube->slices[0]->fd.dimension_id);
ts_cm_functions->continuous_agg_invalidate(ht, start, end);
}
}
}

ts_cache_release(hcache);
}

/*
Expand Down