Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop chunks from materialized hypertables #1666

Merged
merged 1 commit into from Feb 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Expand Up @@ -9,9 +9,10 @@ accidentally triggering the load of a previous DB version.**
**Major Features**

**Minor Features**
* #1666 Support drop_chunks API for continuous aggregates

**Bugfixes**
* #1648 Drop chunks for materialized hypertable
* #1648 Drop chunks from materialized hypertable
* #1665 Add ignore_invalidation_older_than to timescaledb_information.continuous_aggregates view
* #1668 Cannot add dimension if hypertable has empty chunks
* #1674 Fix time_bucket_gapfill's interaction with GROUP BY
Expand Down
48 changes: 40 additions & 8 deletions src/chunk.c
Expand Up @@ -2392,11 +2392,14 @@ ts_chunk_drop_process_materialization(Oid hypertable_relid,
ts_cache_release(hcache);
}

/* Continuous agg materialization hypertables can be dropped
* only if a user explicitly specifies the table name
*/
List *
ts_chunk_do_drop_chunks(Oid table_relid, Datum older_than_datum, Datum newer_than_datum,
Oid older_than_type, Oid newer_than_type, bool cascade,
CascadeToMaterializationOption cascades_to_materializations,
int32 log_level)
int32 log_level, bool user_supplied_table_name)
erimatnor marked this conversation as resolved.
Show resolved Hide resolved
{
uint64 i = 0;
uint64 num_chunks = 0;
Expand All @@ -2413,9 +2416,18 @@ ts_chunk_do_drop_chunks(Oid table_relid, Datum older_than_datum, Datum newer_tha
switch (ts_continuous_agg_hypertable_status(hypertable_id))
{
case HypertableIsMaterialization:
if (user_supplied_table_name == false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just nit, I guess those two cases can be combined together to avoid having error message duplication, like this:

case HypertableIsMaterialization:
case HypertableIsMaterializationAndRaw: 
    if (user_supplied_table_name == false)
        elog(ERROR, "cannot drop chunks on a continuous aggregate materialization table");
    has_continuous_aggs = (status == HypertableIsMaterializationAndRaw);
    break;

{
elog(ERROR, "cannot drop chunks on a continuous aggregate materialization table");
}
has_continuous_aggs = false;
break;
case HypertableIsMaterializationAndRaw:
elog(ERROR, "cannot drop_chunks on a continuous aggregate materialization table");
pg_unreachable();
if (user_supplied_table_name == false)
{
elog(ERROR, "cannot drop chunks on a continuous aggregate materialization table");
}
has_continuous_aggs = true;
break;
case HypertableIsRawTable:
if (cascades_to_materializations == CASCADE_TO_MATERIALIZATION_UNKNOWN)
Expand Down Expand Up @@ -2485,7 +2497,8 @@ ts_chunk_do_drop_chunks(Oid table_relid, Datum older_than_datum, Datum newer_tha
older_than_type,
newer_than_type,
cascade,
log_level);
log_level,
user_supplied_table_name);
}
return dropped_chunk_names;
}
Expand Down Expand Up @@ -2550,6 +2563,7 @@ ts_chunk_drop_chunks(PG_FUNCTION_ARGS)
bool cascade, verbose;
CascadeToMaterializationOption cascades_to_materializations;
int elevel;
bool user_supplied_table_name = true;

/*
* When past the first call of the SRF, dropping has already been completed,
Expand Down Expand Up @@ -2585,10 +2599,27 @@ ts_chunk_drop_chunks(PG_FUNCTION_ARGS)
if (table_name != NULL)
{
if (ht_oids == NIL)
erimatnor marked this conversation as resolved.
Show resolved Hide resolved
ereport(ERROR,
(errcode(ERRCODE_TS_HYPERTABLE_NOT_EXIST),
errmsg("hypertable \"%s\" does not exist", NameStr(*table_name))));
{
ContinuousAgg *ca = NULL;
ca = ts_continuous_agg_find_userview_name(schema_name ? NameStr(*schema_name) : NULL,
NameStr(*table_name));
if (ca == NULL)
ereport(ERROR,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just noticed that sometimes we are using elog(ERROR) and sometimes ereport(ERROR) here, maybe we should stick to one of them, unless there is a good reason for it

(errcode(ERRCODE_TS_HYPERTABLE_NOT_EXIST),
errmsg("\"%s\" is not a hypertable or a continuous aggregate view",
NameStr(*table_name)),
errhint("It is only possible to drop chunks from a hypertable or "
"continuous aggregate view")));
else
{
int32 matid = ca->data.mat_hypertable_id;
Hypertable *mat_ht = ts_hypertable_get_by_id(matid);
ht_oids = lappend_oid(ht_oids, mat_ht->main_table_relid);
}
}
}
else
user_supplied_table_name = false;

/* Initial multi function call setup */
funcctx = SRF_FIRSTCALL_INIT();
Expand Down Expand Up @@ -2657,7 +2688,8 @@ ts_chunk_drop_chunks(PG_FUNCTION_ARGS)
newer_than_type,
cascade,
cascades_to_materializations,
elevel);
elevel,
user_supplied_table_name);
dc_names = list_concat(dc_names, dc_temp);

MemoryContextSwitchTo(oldcontext);
Expand Down
2 changes: 1 addition & 1 deletion src/chunk.h
Expand Up @@ -139,7 +139,7 @@ extern TSDLLEXPORT List *
ts_chunk_do_drop_chunks(Oid table_relid, Datum older_than_datum, Datum newer_than_datum,
Oid older_than_type, Oid newer_than_type, bool cascade,
CascadeToMaterializationOption cascades_to_materializations,
int32 log_level);
int32 log_level, bool user_supplied_table_name);
extern TSDLLEXPORT Chunk *
ts_chunk_get_chunks_in_time_range(Oid table_relid, Datum older_than_datum, Datum newer_than_datum,
Oid older_than_type, Oid newer_than_type, char *caller_name,
Expand Down
83 changes: 83 additions & 0 deletions src/continuous_agg.c
Expand Up @@ -401,6 +401,41 @@ ts_continuous_agg_find_by_view_name(const char *schema, const char *name)
return ca;
}

ContinuousAgg *
ts_continuous_agg_find_userview_name(const char *schema, const char *name)
{
ScanIterator iterator =
ts_scan_iterator_create(CONTINUOUS_AGG, AccessShareLock, CurrentMemoryContext);
ContinuousAgg *ca = NULL;
int count = 0;
const char *chkschema = schema;

ts_scanner_foreach(&iterator)
{
ContinuousAggViewType vtyp;
FormData_continuous_agg *data =
(FormData_continuous_agg *) GETSTRUCT(ts_scan_iterator_tuple(&iterator));
if (schema == NULL)
{
/* only user visible views will be returned */
Oid relid = RelnameGetRelid(NameStr(data->user_view_name));
if (relid == InvalidOid)
continue;
chkschema = NameStr(data->user_view_schema);
}

vtyp = ts_continuous_agg_view_type(data, chkschema, name);
if (vtyp == ContinuousAggUserView)
{
ca = palloc0(sizeof(*ca));
continuous_agg_init(ca, data);
count++;
}
}
Assert(count <= 1);
return ca;
}

ContinuousAgg *
ts_continuous_agg_find_by_job_id(int32 job_id)
{
Expand Down Expand Up @@ -783,3 +818,51 @@ ts_continuous_agg_get_user_view_oid(ContinuousAgg *agg)
elog(ERROR, "could not find user view for continuous agg");
return view_relid;
}

static int32
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add newline

find_raw_hypertable_for_materialization(int32 mat_hypertable_id)
{
short count = 0;
erimatnor marked this conversation as resolved.
Show resolved Hide resolved
int32 htid = INVALID_HYPERTABLE_ID;
ScanIterator iterator =
ts_scan_iterator_create(CONTINUOUS_AGG, RowExclusiveLock, CurrentMemoryContext);

init_scan_by_mat_hypertable_id(&iterator, mat_hypertable_id);
ts_scanner_foreach(&iterator)
{
TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
HeapTuple tuple = ti->tuple;
Form_continuous_agg form = (Form_continuous_agg) GETSTRUCT(tuple);
htid = form->raw_hypertable_id;
count++;
}
Assert(count <= 1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe just do Assert(htid == INVALID_HYPERTABLE_ID) instead of count when assigning htid. In case if Assert is disabled, we might get compiler warning saying that count is set but not used.

ts_scan_iterator_close(&iterator);
return htid;
}

/* Continuous aggregate materialization hypertables inherit integer_now func
* from the raw hypertable (unless it was explictly reset for cont. aggregate.
* Walk the materialzation hyperatable ->raw hypertable tree till
* we find a hypertable that has integer_now_func set.
*/
TSDLLEXPORT Dimension *
ts_continous_agg_find_integer_now_func_by_materialization_id(int32 mat_htid)
{
int32 raw_htid = mat_htid;
Dimension *par_dim = NULL;
while (raw_htid != INVALID_HYPERTABLE_ID)
{
Hypertable *raw_ht = ts_hypertable_get_by_id(raw_htid);
Dimension *open_dim = hyperspace_get_open_dimension(raw_ht->space, 0);
if (strlen(NameStr(open_dim->fd.integer_now_func)) != 0 &&
strlen(NameStr(open_dim->fd.integer_now_func_schema)) != 0)
{
par_dim = open_dim;
break;
}
mat_htid = raw_htid;
raw_htid = find_raw_hypertable_for_materialization(mat_htid);
}
return par_dim;
}
3 changes: 3 additions & 0 deletions src/continuous_agg.h
Expand Up @@ -90,5 +90,8 @@ extern void ts_continuous_agg_rename_view(char *old_schema, char *name, char *ne
extern TSDLLEXPORT int32 ts_number_of_continuous_aggs(void);

extern Oid ts_continuous_agg_get_user_view_oid(ContinuousAgg *agg);
extern TSDLLEXPORT Dimension *
ts_continous_agg_find_integer_now_func_by_materialization_id(int32 mat_htid);
extern ContinuousAgg *ts_continuous_agg_find_userview_name(const char *schema, const char *name);

#endif /* TIMESCALEDB_CONTINUOUS_AGG_H */
3 changes: 2 additions & 1 deletion src/cross_module_fn.c
Expand Up @@ -347,7 +347,8 @@ static void
continuous_agg_drop_chunks_by_chunk_id_default(int32 raw_hypertable_id, Chunk **chunks,
Size num_chunks, Datum older_than_datum,
Datum newer_than_datum, Oid older_than_type,
Oid newer_than_type, bool cascade, int32 log_level)
Oid newer_than_type, bool cascade, int32 log_level,
bool user_supplied_table_name)
{
error_no_default_fn_community();
}
Expand Down
3 changes: 2 additions & 1 deletion src/cross_module_fn.h
Expand Up @@ -71,7 +71,8 @@ typedef struct CrossModuleFunctions

Datum older_than_datum, Datum newer_than_datum,
Oid older_than_type, Oid newer_than_type,
bool cascade, int32 log_level);
bool cascade, int32 log_level,
bool user_supplied_table_name);
PGFunction continuous_agg_trigfn;
void (*continuous_agg_update_options)(ContinuousAgg *cagg,
WithClauseResult *with_clause_options);
Expand Down
12 changes: 9 additions & 3 deletions src/hypertable.c
Expand Up @@ -1987,7 +1987,14 @@ hypertable_tuple_match_name(TupleInfo *ti, void *data)
if (!OidIsValid(schema_oid))
return SCAN_CONTINUE;

relid = get_relname_relid(NameStr(fd.table_name), schema_oid);
if (accum->schema_name == NULL)
{
/* only user visible tables will be returned */
relid = RelnameGetRelid(NameStr(fd.table_name));
}
else
relid = get_relname_relid(NameStr(fd.table_name), schema_oid);

if (!OidIsValid(relid))
return SCAN_CONTINUE;

Expand All @@ -2000,13 +2007,12 @@ hypertable_tuple_match_name(TupleInfo *ti, void *data)
NameGetDatum(accum->table_name),
NameGetDatum(&fd.table_name)))))
accum->ht_oids = lappend_oid(accum->ht_oids, relid);

return SCAN_CONTINUE;
}

/*
* Used for drop_chunks. Either name can be NULL, which indicates matching on
* all possible names.
* all possible names visible in search path.
*/
List *
ts_hypertable_get_all_by_name(Name schema_name, Name table_name, MemoryContext mctx)
Expand Down
41 changes: 32 additions & 9 deletions src/interval.c
Expand Up @@ -95,7 +95,7 @@ ts_interval_from_sql_input(Oid relid, Datum interval, Oid interval_type, const c
{
Hypertable *hypertable;
Cache *hcache;
FormData_ts_interval *invl = palloc0(sizeof(FormData_ts_interval));
FormData_ts_interval *invl;
Oid partitioning_type;
Dimension *open_dim;

Expand All @@ -109,8 +109,33 @@ ts_interval_from_sql_input(Oid relid, Datum interval, Oid interval_type, const c
elog(ERROR, "internal error: no open dimension found while parsing interval");

partitioning_type = ts_dimension_get_partition_type(open_dim);
if (IS_INTEGER_TYPE(partitioning_type))
{
if (strlen(NameStr(open_dim->fd.integer_now_func)) == 0 ||
strlen(NameStr(open_dim->fd.integer_now_func_schema)) == 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest a different error code, since this one is reserved for invalid parameter values when calling (user-facing) SQL functions. For instance, user calls SELECT set_port(port => 134000); then the value for port is invalid (outside the port range).

Also, in my understanding, having a NULL integer now func is valid.

I suggest INTERNAL_ERROR or just a simple elog.

Copy link
Contributor Author

@gayyappan gayyappan Feb 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this code path is exercised by add drop_chunks policy and compress_chunks policy. So this is a user facing error.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My point is that integer_now_func and integer_now_func_schema aren't, AFAIK, set as part of those functions, like in my set_port example. Further, the error is not about a function parameter but about some values of internal metadata being NULL.

So, I am not sure it makes sense to raise ERRCODE_INVALID_PARAMETER_VALUE. The error should tell you that the system is in the wrong state for the given operation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are errors for the "older_than" parameter for the policy. There is a specific action needed on the user's part to fix this issue. older_than parameter on integer-time based hypertables doesn't make sense without specifying a now() function. INTERNAL_ERROR to me typically signifies something unexpected happening in the code which is probably a bug.

errmsg("integer_now_func not set on hypertable \"%s\"", get_rel_name(relid))));
}
invl = ts_interval_from_sql_input_internal(open_dim,
interval,
interval_type,
parameter_name,
caller_name);
ts_cache_release(hcache);
return invl;
}

/* use this variant only if the open_dim needs to be
* inferred for the hypertable. This is the case for continuous aggr
* related materialization hypertables
*/
TSDLLEXPORT FormData_ts_interval *
ts_interval_from_sql_input_internal(Dimension *open_dim, Datum interval, Oid interval_type,
const char *parameter_name, const char *caller_name)
{
FormData_ts_interval *invl = palloc0(sizeof(FormData_ts_interval));
Oid partitioning_type = ts_dimension_get_partition_type(open_dim);
switch (interval_type)
{
case INTERVALOID:
Expand All @@ -134,12 +159,6 @@ ts_interval_from_sql_input(Oid relid, Datum interval, Oid interval_type, const c
errhint("integer-based time duration cannot be used with hypertables with "
"a timestamp-based time dimensions")));

if (strlen(NameStr(open_dim->fd.integer_now_func)) == 0 ||
strlen(NameStr(open_dim->fd.integer_now_func_schema)) == 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("integer_now_func not set on hypertable %s", get_rel_name(relid))));

invl->is_time_interval = false;
invl->integer_interval = ts_time_value_to_internal(interval, interval_type);

Expand Down Expand Up @@ -277,11 +296,15 @@ ts_interval_from_now_func_get_datum(int64 interval, Oid time_dim_type, Oid now_f
(errcode(ERRCODE_INTERVAL_FIELD_OVERFLOW), errmsg("ts_interval overflow")));
return Int32GetDatum(res);
case INT8OID:
res = DatumGetInt64(now) - interval;
if (res > DatumGetInt64(now))
{
bool overflow = pg_sub_s64_overflow(DatumGetInt64(now), interval, &res);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need to put blank line after a variable definition?

if (overflow)
{
ereport(ERROR,
(errcode(ERRCODE_INTERVAL_FIELD_OVERFLOW), errmsg("ts_interval overflow")));
}
return Int64GetDatum(res);
}
default:
pg_unreachable();
}
Expand Down
4 changes: 3 additions & 1 deletion src/interval.h
Expand Up @@ -21,5 +21,7 @@ TSDLLEXPORT bool ts_interval_equal(FormData_ts_interval *invl1, FormData_ts_inte
TSDLLEXPORT void ts_interval_now_func_validate(Oid now_func_oid, Oid open_dim_type);
TSDLLEXPORT Datum ts_interval_subtract_from_now(FormData_ts_interval *invl, Dimension *open_dim);
TSDLLEXPORT int64 ts_get_now_internal(Dimension *open_dim);

TSDLLEXPORT FormData_ts_interval *
ts_interval_from_sql_input_internal(Dimension *open_dim, Datum interval, Oid interval_type,
const char *parameter_name, const char *caller_name);
#endif /* TIMESCALEDB_INTERVAL */