Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove unused refresh arguments from invalidation #2441

Merged
merged 1 commit into from Sep 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
36 changes: 11 additions & 25 deletions tsl/src/continuous_aggs/invalidation.c
Expand Up @@ -564,9 +564,8 @@ invalidation_entry_try_merge(Invalidation *entry, const Invalidation *newentry)
}

static void
cut_and_insert_new_cagg_invalidation(const CaggInvalidationState *state,
const InternalTimeRange *refresh_window,
const Invalidation *entry, int32 cagg_hyper_id)
cut_and_insert_new_cagg_invalidation(const CaggInvalidationState *state, const Invalidation *entry,
int32 cagg_hyper_id)
{
CatalogSecurityContext sec_ctx;
TupleDesc tupdesc = RelationGetDescr(state->cagg_log_rel);
Expand All @@ -586,11 +585,7 @@ cut_and_insert_new_cagg_invalidation(const CaggInvalidationState *state,
/*
* Process invalidations in the hypertable invalidation log.
*
* Copy and delete all entries from the hypertable invalidation log. For the
* continuous aggregate that is getting refreshed, we also match the
* invalidation against the refresh window and perform additional processing
* (cutting or deleting and merging); work that we'd otherwise have to do
* later in the cagg invalidation log.
* Copy and delete all entries from the hypertable invalidation log.
*
* Note that each entry gets one copy per continuous aggregate in the cagg
* invalidation log (unless it was merged or matched the refresh
Expand All @@ -601,8 +596,7 @@ cut_and_insert_new_cagg_invalidation(const CaggInvalidationState *state,
* invalidation log.
*/
static void
move_invalidations_from_hyper_to_cagg_log(const CaggInvalidationState *state,
const InternalTimeRange *refresh_window)
move_invalidations_from_hyper_to_cagg_log(const CaggInvalidationState *state)
{
int32 hyper_id = state->cagg.data.raw_hypertable_id;
List *cagg_ids = get_cagg_ids(hyper_id);
Expand Down Expand Up @@ -652,10 +646,7 @@ move_invalidations_from_hyper_to_cagg_log(const CaggInvalidationState *state,
}
else if (!invalidation_entry_try_merge(&mergedentry, &logentry))
{
cut_and_insert_new_cagg_invalidation(state,
refresh_window,
&mergedentry,
cagg_hyper_id);
cut_and_insert_new_cagg_invalidation(state, &mergedentry, cagg_hyper_id);
mergedentry = logentry;
}

Expand All @@ -679,10 +670,7 @@ move_invalidations_from_hyper_to_cagg_log(const CaggInvalidationState *state,

/* Handle the last merged invalidation */
if (IS_VALID_INVALIDATION(&mergedentry))
cut_and_insert_new_cagg_invalidation(state,
refresh_window,
&mergedentry,
cagg_hyper_id);
cut_and_insert_new_cagg_invalidation(state, &mergedentry, cagg_hyper_id);
}
}

Expand Down Expand Up @@ -851,8 +839,7 @@ clear_cagg_invalidations_for_refresh(const CaggInvalidationState *state,
}

static void
invalidation_state_init(CaggInvalidationState *state, const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window)
invalidation_state_init(CaggInvalidationState *state, const ContinuousAgg *cagg)
{
state->cagg = *cagg;
state->cagg_log_rel = open_invalidation_log(LOG_CAGG, RowExclusiveLock);
Expand All @@ -871,13 +858,12 @@ invalidation_state_cleanup(const CaggInvalidationState *state)
}

void
invalidation_process_hypertable_log(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window)
invalidation_process_hypertable_log(const ContinuousAgg *cagg)
{
CaggInvalidationState state;

invalidation_state_init(&state, cagg, refresh_window);
move_invalidations_from_hyper_to_cagg_log(&state, refresh_window);
invalidation_state_init(&state, cagg);
move_invalidations_from_hyper_to_cagg_log(&state);
invalidation_state_cleanup(&state);
}

Expand All @@ -887,7 +873,7 @@ invalidation_process_cagg_log(const ContinuousAgg *cagg, const InternalTimeRange
CaggInvalidationState state;
InvalidationStore *store = NULL;

invalidation_state_init(&state, cagg, refresh_window);
invalidation_state_init(&state, cagg);
state.invalidations = tuplestore_begin_heap(false, false, work_mem);
clear_cagg_invalidations_for_refresh(&state, refresh_window);

Expand Down
3 changes: 1 addition & 2 deletions tsl/src/continuous_aggs/invalidation.h
Expand Up @@ -41,8 +41,7 @@ extern void invalidation_hyper_log_add_entry(int32 hyper_id, int64 modtime, int6
extern void invalidation_add_entry(const Hypertable *ht, int64 start, int64 end);
extern void invalidation_entry_set_from_hyper_invalidation(Invalidation *entry, const TupleInfo *ti,
int32 hyper_id);
extern void invalidation_process_hypertable_log(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window);
extern void invalidation_process_hypertable_log(const ContinuousAgg *cagg);
extern InvalidationStore *invalidation_process_cagg_log(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window);
extern void invalidation_store_free(InvalidationStore *store);
Expand Down
6 changes: 3 additions & 3 deletions tsl/src/continuous_aggs/refresh.c
Expand Up @@ -433,7 +433,7 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,
}

/* Process invalidations in the hypertable invalidation log */
invalidation_process_hypertable_log(cagg, &refresh_window);
invalidation_process_hypertable_log(cagg);

/* Start a new transaction. Note that this invalidates previous memory
* allocations (and locks). */
Expand Down Expand Up @@ -481,8 +481,8 @@ continuous_agg_refresh_all(const Hypertable *ht, int64 start, int64 end)
invalidation_threshold_set_or_get(ht->fd.id, refresh_window.end);

/* It is enough to process the hypertable invalidation log once,
* so do it only for the first continuous aggregate */
invalidation_process_hypertable_log(linitial(caggs), &refresh_window);
* so do it only for the first continuous aggregate. */
invalidation_process_hypertable_log(linitial(caggs));
/* Must make invalidation processing visible */
CommandCounterIncrement();

Expand Down