Skip to content

Commit

Permalink
Add deparser for CreateTrigStmt
Browse files Browse the repository at this point in the history
Add deparser for CreateTrigStmt to use for distributed hypertables.

Fixes #3825
  • Loading branch information
Markos Fountoulakis authored and mfundul committed Nov 25, 2021
1 parent da8ce2e commit 58ad8ba
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 25 deletions.
45 changes: 20 additions & 25 deletions tsl/src/continuous_aggs/create.c
Expand Up @@ -75,6 +75,7 @@
#include "refresh.h"
#include "remote/dist_commands.h"
#include "hypertable_data_node.h"
#include "deparse.h"

#define FINALFN "finalize_agg"
#define PARTIALFN "partialize_agg"
Expand Down Expand Up @@ -344,27 +345,28 @@ check_trigger_exists_hypertable(Oid relid, char *trigname)

/* add continuous agg invalidation trigger to hypertable
* relid - oid of hypertable
* trigarg - argument to pass to trigger (the hypertable id from timescaledb catalog as a string)
* hypertableid - argument to pass to trigger (the hypertable id from timescaledb catalog)
*/
static void
cagg_add_trigger_hypertable(Oid relid, char *trigarg)
cagg_add_trigger_hypertable(Oid relid, int32 hypertable_id)
{
char hypertable_id_str[12];
ObjectAddress objaddr;
char *relname = get_rel_name(relid);
Oid schemaid = get_rel_namespace(relid);
char *schema = get_namespace_name(schemaid);
Cache *hcache;
Hypertable *ht;

CreateTrigStmt stmt = {
CreateTrigStmt stmt_template = {
.type = T_CreateTrigStmt,
.row = true,
.timing = TRIGGER_TYPE_AFTER,
.trigname = CAGGINVAL_TRIGGER_NAME,
.relation = makeRangeVar(schema, relname, -1),
.funcname =
list_make2(makeString(INTERNAL_SCHEMA_NAME), makeString(CAGG_INVALIDATION_TRIGGER)),
.args = list_make1(makeString(trigarg)),
.args = NIL, /* to be filled in later */
.events = TRIGGER_TYPE_INSERT | TRIGGER_TYPE_UPDATE | TRIGGER_TYPE_DELETE,
};
if (check_trigger_exists_hypertable(relid, CAGGINVAL_TRIGGER_NAME))
Expand All @@ -383,18 +385,15 @@ cagg_add_trigger_hypertable(Oid relid, char *trigarg)
foreach (cell, ht->data_nodes)
{
HypertableDataNode *node = lfirst(cell);
StringInfo command = makeStringInfo();
appendStringInfo(command,
"CREATE TRIGGER %s AFTER INSERT OR UPDATE OR DELETE ON %s.%s FOR EACH "
"ROW EXECUTE FUNCTION %s.%s(%d, %d)",
quote_identifier(CAGGINVAL_TRIGGER_NAME),
quote_identifier(NameStr(ht->fd.schema_name)),
quote_identifier(NameStr(ht->fd.table_name)),
quote_identifier(INTERNAL_SCHEMA_NAME),
quote_identifier(CAGG_INVALIDATION_TRIGGER),
node->fd.node_hypertable_id, /* distributed member hypertable ID */
node->fd.hypertable_id /* Access Node hypertable ID */);
cmd_descr_data[i].sql = command->data;
char node_hypertable_id_str[12];
CreateTrigStmt remote_stmt = stmt_template;

pg_ltoa(node->fd.node_hypertable_id, node_hypertable_id_str);
pg_ltoa(node->fd.hypertable_id, hypertable_id_str);

remote_stmt.args =
list_make2(makeString(node_hypertable_id_str), makeString(hypertable_id_str));
cmd_descr_data[i].sql = deparse_create_trigger(&remote_stmt);
cmd_descr_data[i].params = NULL;
cmd_descriptors = lappend(cmd_descriptors, &cmd_descr_data[i++]);
}
Expand All @@ -410,7 +409,10 @@ cagg_add_trigger_hypertable(Oid relid, char *trigarg)
* triggers.
*/
}
objaddr = ts_hypertable_create_trigger(ht, &stmt, NULL);
CreateTrigStmt local_stmt = stmt_template;
pg_ltoa(hypertable_id, hypertable_id_str);
local_stmt.args = list_make1(makeString(hypertable_id_str));
objaddr = ts_hypertable_create_trigger(ht, &local_stmt, NULL);
if (!OidIsValid(objaddr.objectId))
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
Expand Down Expand Up @@ -1796,8 +1798,6 @@ cagg_create(const CreateTableAsStmt *create_stmt, ViewStmt *stmt, Query *panquer
Oid nspid;
RangeVar *part_rel = NULL, *mat_rel = NULL, *dum_rel = NULL;
int32 materialize_hypertable_id;
char trigarg[NAMEDATALEN];
int ret;
bool materialized_only =
DatumGetBool(with_clause_options[ContinuousViewOptionMaterializedOnly].parsed);

Expand Down Expand Up @@ -1878,12 +1878,7 @@ cagg_create(const CreateTableAsStmt *create_stmt, ViewStmt *stmt, Query *panquer
dum_rel->relname);

/* Step 5 create trigger on raw hypertable -specified in the user view query*/
ret = snprintf(trigarg, NAMEDATALEN, "%d", origquery_ht->htid);
if (ret < 0 || ret >= NAMEDATALEN)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("bad argument to continuous aggregate trigger")));
cagg_add_trigger_hypertable(origquery_ht->htoid, trigarg);
cagg_add_trigger_hypertable(origquery_ht->htoid, origquery_ht->htid);

return;
}
Expand Down
85 changes: 85 additions & 0 deletions tsl/src/deparse.c
Expand Up @@ -1003,3 +1003,88 @@ deparse_grant_revoke_on_database(Node *node, const char *dbname)

return command->data;
}

/* Deparse user-defined trigger */
const char *
deparse_create_trigger(CreateTrigStmt *stmt)
{
ListCell *lc;
bool found_event = false;
bool found_first_arg = false;

/*
CREATE [ CONSTRAINT ] TRIGGER name { BEFORE | AFTER | INSTEAD OF } { event [ OR ... ] }
ON table_name
[ FROM referenced_table_name ]
[ NOT DEFERRABLE | [ DEFERRABLE ] [ INITIALLY IMMEDIATE | INITIALLY DEFERRED ] ]
[ REFERENCING { { OLD | NEW } TABLE [ AS ] transition_relation_name } [ ... ] ]
[ FOR [ EACH ] { ROW | STATEMENT } ]
[ WHEN ( condition ) ]
EXECUTE { FUNCTION | PROCEDURE } function_name ( arguments )
*/
if (stmt->isconstraint)
elog(ERROR, "deparsing constraint triggers is not supported");

StringInfo command = makeStringInfo();
appendStringInfo(command, "CREATE TRIGGER %s ", quote_identifier(stmt->trigname));

if (TRIGGER_FOR_BEFORE(stmt->timing))
appendStringInfoString(command, "BEFORE");
else if (TRIGGER_FOR_AFTER(stmt->timing))
appendStringInfoString(command, "AFTER");
else if (TRIGGER_FOR_INSTEAD(stmt->timing))
appendStringInfoString(command, "INSTEAD OF");
else
elog(ERROR, "unexpected timing value: %d", stmt->timing);

if (TRIGGER_FOR_INSERT(stmt->events))
{
appendStringInfoString(command, " INSERT");
found_event = true;
}
if (TRIGGER_FOR_DELETE(stmt->events))
{
if (found_event)
appendStringInfoString(command, " OR");
appendStringInfoString(command, " DELETE");
found_event = true;
}
if (TRIGGER_FOR_UPDATE(stmt->events))
{
if (found_event)
appendStringInfoString(command, " OR");
appendStringInfoString(command, " UPDATE");
found_event = true;
}
if (TRIGGER_FOR_TRUNCATE(stmt->events))
{
if (found_event)
appendStringInfoString(command, " OR");
appendStringInfoString(command, " TRUNCATE");
}
appendStringInfo(command,
" ON %s.%s",
quote_identifier(stmt->relation->schemaname),
quote_identifier(stmt->relation->relname));

if (stmt->row)
appendStringInfoString(command, " FOR EACH ROW");
else
appendStringInfoString(command, " FOR EACH STATEMENT");

if (stmt->whenClause)
elog(ERROR, "deparsing trigger WHEN clause is not supported");

appendStringInfo(command, " EXECUTE FUNCTION %s(", NameListToQuotedString(stmt->funcname));
foreach (lc, stmt->args)
{
if (found_first_arg)
appendStringInfoString(command, ", ");
else
found_first_arg = true;
appendStringInfoString(command, strVal(lfirst(lc)));
}
appendStringInfoString(command, ")");

return command->data;
}
1 change: 1 addition & 0 deletions tsl/src/deparse.h
Expand Up @@ -50,5 +50,6 @@ DeparsedHypertableCommands *deparse_get_distributed_hypertable_create_command(Hy
const char *deparse_func_call(FunctionCallInfo finfo);
const char *deparse_oid_function_call_coll(Oid funcid, Oid collation, unsigned int num_args, ...);
const char *deparse_grant_revoke_on_database(Node *node, const char *dbname);
const char *deparse_create_trigger(CreateTrigStmt *stmt);

#endif

0 comments on commit 58ad8ba

Please sign in to comment.