Skip to content

Commit

Permalink
Fail replication worker on DDL command that rewrites table using vola…
Browse files Browse the repository at this point in the history
…tile functions, such as ALTER TABLE tab ADD COLUMN col DEFAULT volatile_expr. This is to avoid data mismatch compared to the publisher. We can potentially unblock this type of command when table rewrite is supported in logical replication.
  • Loading branch information
Zheng (Zane) Li committed Apr 29, 2022
1 parent 06549e4 commit 1e6115c
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 0 deletions.
44 changes: 44 additions & 0 deletions src/backend/replication/logical/worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@
#include "nodes/makefuncs.h"
#include "optimizer/optimizer.h"
#include "parser/analyze.h"
#include "parser/parse_expr.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
Expand Down Expand Up @@ -2548,6 +2549,8 @@ apply_execute_sql_command(const char *cmdstr, const char *role, const char *sear

commandTag = CreateCommandTag((Node *)command);

/* The following DDL commands need special handling */

/*
* Remember the schemaname and relname if the cmd is going to create a table
* because we will need them for some post-processing after we
Expand Down Expand Up @@ -2595,6 +2598,47 @@ apply_execute_sql_command(const char *cmdstr, const char *role, const char *sear
sstmt->intoClause->skipData = true;
}
}
/*
* ALTER TABLE ADD COLUMN col DEFAULT volatile_expr is not supported.
* Until we support logical replication of table rewrite, see ATRewriteTables()
* for details on table rewrite.
*/
else if (commandTag == CMDTAG_ALTER_TABLE)
{
AlterTableStmt *atstmt = (AlterTableStmt *) command->stmt;
ListCell *lc;

foreach(lc, atstmt->cmds)
{
AlterTableCmd *cmd = lfirst_node(AlterTableCmd, lc);

if (cmd->subtype == AT_AddColumn)
{
ColumnDef *colDef;
ListCell *c;

colDef = castNode(ColumnDef, cmd->def);
foreach(c, colDef->constraints)
{
Constraint *con = lfirst_node(Constraint, c);

if (con->contype == CONSTR_DEFAULT)
{
Node *expr;
ParseState *pstate = make_parsestate(NULL);

expr = transformExpr(pstate, copyObject(con->raw_expr), EXPR_KIND_COLUMN_DEFAULT);
if (contain_volatile_functions(expr))
{
elog(ERROR,
"Do not support replication of DDL statement that rewrites table using volatile functions: %s",
cmdstr);
}
}
}
}
}
}

/*
* Set up a snapshot if parse analysis/planning will need one.
Expand Down
22 changes: 22 additions & 0 deletions src/test/subscription/t/030_rep_ddls.pl
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,29 @@ BEGIN
$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM s1.proc_table where c3 = 22;");
is($result, qq(1), 'DDLs in procedure are replicated');

# Test Alter table alter column type stmt is replicated
$node_publisher->safe_psql('postgres', "ALTER TABLE test_rep ALTER COLUMN name type TEXT;");

$node_publisher->wait_for_catchup('mysub');

$result = $node_subscriber->safe_psql('postgres', "SELECT data_type FROM information_schema.columns WHERE table_name = 'test_rep' and column_name = 'name';");
is($result, qq(text), 'Alter table column type stmt is replicated');

# Test Alter table add column default 0.01 is replicated
$node_publisher->safe_psql('postgres', "ALTER TABLE test_rep ADD COLUMN non_volatile double precision DEFAULT 0.01;");

$node_publisher->wait_for_catchup('mysub');

$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from test_rep where non_volatile = 0.01;");
is($result, qq(2), 'Alter table add column default 0.01 is replicated');

#TODO TEST certain DDLs are not replicated
# Test DDL statement that rewrites table with volatile functions are not replicated
$node_publisher->safe_psql('postgres', "ALTER TABLE test_rep ADD COLUMN volatile double precision DEFAULT 3 * random();");
$result = $node_publisher->safe_psql('postgres', "SELECT count(*) FROM information_schema.columns WHERE table_name = 'test_rep' and column_name = 'volatile';");
is($result, qq(1), 'Alter table add column default random() is executed on the publisher DB.');

$result = $node_subscriber->wait_for_log("Do not support replication of DDL statement that rewrites table using volatile functions", $result);

pass "DDL replication tests passed!";

Expand Down

0 comments on commit 1e6115c

Please sign in to comment.