Skip to content

Commit

Permalink
Fix double publish of child table's data.
Browse files Browse the repository at this point in the history
We publish the child table's data twice for a publication that has both
child and parent tables and is published with publish_via_partition_root
as true. This happens because subscribers will initiate synchronization
using both parent and child tables, since it gets both as separate tables
in the initial table list.

Ensure that pg_publication_tables returns only parent tables in such
cases.

Author: Hou Zhijie
Reviewed-by: Greg Nancarrow, Amit Langote, Vignesh C, Amit Kapila
Backpatch-through: 13
Discussion: https://postgr.es/m/OS0PR01MB57167F45D481F78CDC5986F794B99@OS0PR01MB5716.jpnprd01.prod.outlook.com
  • Loading branch information
Amit Kapila committed Dec 9, 2021
1 parent a96a1d6 commit 3f06c00
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 4 deletions.
52 changes: 52 additions & 0 deletions src/backend/catalog/pg_publication.c
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,45 @@ is_publishable_class(Oid relid, Form_pg_class reltuple)
relid >= FirstNormalObjectId;
}

/*
* Filter out the partitions whose parent tables were also specified in
* the publication.
*/
static List *
filter_partitions(List *relids)
{
List *result = NIL;
ListCell *lc;
ListCell *lc2;

foreach(lc, relids)
{
bool skip = false;
List *ancestors = NIL;
Oid relid = lfirst_oid(lc);

if (get_rel_relispartition(relid))
ancestors = get_partition_ancestors(relid);

foreach(lc2, ancestors)
{
Oid ancestor = lfirst_oid(lc2);

/* Check if the parent table exists in the published table list. */
if (list_member_oid(relids, ancestor))
{
skip = true;
break;
}
}

if (!skip)
result = lappend_oid(result, relid);
}

return result;
}

/*
* Another variant of this, taking a Relation.
*/
Expand Down Expand Up @@ -557,10 +596,23 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
if (publication->alltables)
tables = GetAllTablesPublicationRelations(publication->pubviaroot);
else
{
tables = GetPublicationRelations(publication->oid,
publication->pubviaroot ?
PUBLICATION_PART_ROOT :
PUBLICATION_PART_LEAF);

/*
* If the publication publishes partition changes via their
* respective root partitioned tables, we must exclude partitions
* in favor of including the root partitioned tables. Otherwise,
* the function could return both the child and parent tables
* which could cause data of the child table to be
* double-published on the subscriber side.
*/
if (publication->pubviaroot)
tables = filter_partitions(tables);
}
funcctx->user_fctx = (void *) tables;

MemoryContextSwitchTo(oldcontext);
Expand Down
9 changes: 9 additions & 0 deletions src/test/regress/expected/publication.out
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,15 @@ HINT: To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.
ALTER PUBLICATION testpub_forparted DROP TABLE testpub_parted;
-- works again, because update is no longer replicated
UPDATE testpub_parted2 SET a = 2;
-- publication includes both the parent table and the child table
ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted, testpub_parted2;
-- only parent is listed as being in publication, not the partition
SELECT * FROM pg_publication_tables;
pubname | schemaname | tablename
-------------------+------------+----------------
testpub_forparted | public | testpub_parted
(1 row)

DROP TABLE testpub_parted1, testpub_parted2;
DROP PUBLICATION testpub_forparted, testpub_forparted1;
-- Test cache invalidation FOR ALL TABLES publication
Expand Down
4 changes: 4 additions & 0 deletions src/test/regress/sql/publication.sql
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ UPDATE testpub_parted2 SET a = 2;
ALTER PUBLICATION testpub_forparted DROP TABLE testpub_parted;
-- works again, because update is no longer replicated
UPDATE testpub_parted2 SET a = 2;
-- publication includes both the parent table and the child table
ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted, testpub_parted2;
-- only parent is listed as being in publication, not the partition
SELECT * FROM pg_publication_tables;
DROP TABLE testpub_parted1, testpub_parted2;
DROP PUBLICATION testpub_forparted, testpub_forparted1;

Expand Down
18 changes: 14 additions & 4 deletions src/test/subscription/t/013_partition.pl
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use warnings;
use PostgresNode;
use TestLib;
use Test::More tests => 62;
use Test::More tests => 63;

# setup

Expand Down Expand Up @@ -409,11 +409,16 @@ BEGIN
$node_publisher->safe_psql('postgres',
"ALTER PUBLICATION pub_all SET (publish_via_partition_root = true)");
# Note: tab3_1's parent is not in the publication, in which case its
# changes are published using own identity.
# changes are published using own identity. For tab2, even though both parent
# and child tables are present but changes will be replicated via the parent's
# identity and only once.
$node_publisher->safe_psql('postgres',
"CREATE PUBLICATION pub_viaroot FOR TABLE tab2, tab3_1 WITH (publish_via_partition_root = true)"
"CREATE PUBLICATION pub_viaroot FOR TABLE tab2, tab2_1, tab3_1 WITH (publish_via_partition_root = true)"
);

# prepare data for the initial sync
$node_publisher->safe_psql('postgres', "INSERT INTO tab2 VALUES (1)");

# subscriber 1
$node_subscriber1->safe_psql('postgres', "DROP SUBSCRIPTION sub1");
$node_subscriber1->safe_psql('postgres',
Expand Down Expand Up @@ -465,12 +470,17 @@ BEGIN
$node_subscriber2->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";

# check that data is synced correctly
$result = $node_subscriber1->safe_psql('postgres',
"SELECT c, a FROM tab2");
is( $result, qq(sub1_tab2|1), 'initial data synced for pub_viaroot');

# insert
$node_publisher->safe_psql('postgres', "INSERT INTO tab1 VALUES (1), (0)");
$node_publisher->safe_psql('postgres', "INSERT INTO tab1_1 (a) VALUES (3)");
$node_publisher->safe_psql('postgres', "INSERT INTO tab1_2 VALUES (5)");
$node_publisher->safe_psql('postgres',
"INSERT INTO tab2 VALUES (1), (0), (3), (5)");
"INSERT INTO tab2 VALUES (0), (3), (5)");
$node_publisher->safe_psql('postgres',
"INSERT INTO tab3 VALUES (1), (0), (3), (5)");

Expand Down

0 comments on commit 3f06c00

Please sign in to comment.