diff --git a/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs b/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs
index 8b25df02a..8241f9c28 100644
--- a/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs
+++ b/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs
@@ -13,6 +13,7 @@ use tracing::{debug, info, warn};
use super::super::{ensure_validation, publisher::Table, Error};
use super::ReplicationSlot;
+use crate::backend::replication::logical::subscriber::omni_ownership::OmniOwnership;
use crate::backend::replication::logical::subscriber::stream::StreamSubscriber;
use crate::backend::replication::publisher::progress::Progress;
use crate::backend::replication::publisher::Lsn;
@@ -196,6 +197,7 @@ impl Publisher {
self.create_slots(source).await?;
}
+ let n_sources = source.shards().len();
for (number, _) in source.shards().iter().enumerate() {
// Use table offsets from data sync
// or from loading them above.
@@ -204,7 +206,10 @@ impl Publisher {
.get(&number)
.ok_or(Error::NoReplicationTables(number))?;
// Handles the logical replication stream messages.
- let mut stream = StreamSubscriber::new(dest, tables);
+ // Each subscriber owns a partition of destination shards for omni-table DML
+ // (dest_shard % n_sources == source_shard), preventing cross-subscriber deadlocks.
+ let mut stream =
+ StreamSubscriber::new(dest, tables, OmniOwnership::new(number, n_sources));
// Take ownership of the slot for replication.
let mut slot = self
diff --git a/pgdog/src/backend/replication/logical/subscriber/mod.rs b/pgdog/src/backend/replication/logical/subscriber/mod.rs
index 9df30191b..27d1853b9 100644
--- a/pgdog/src/backend/replication/logical/subscriber/mod.rs
+++ b/pgdog/src/backend/replication/logical/subscriber/mod.rs
@@ -1,5 +1,6 @@
pub mod context;
pub mod copy;
+pub mod omni_ownership;
pub mod parallel_connection;
pub mod stream;
diff --git a/pgdog/src/backend/replication/logical/subscriber/omni_ownership.rs b/pgdog/src/backend/replication/logical/subscriber/omni_ownership.rs
new file mode 100644
index 000000000..1aaa3a410
--- /dev/null
+++ b/pgdog/src/backend/replication/logical/subscriber/omni_ownership.rs
@@ -0,0 +1,120 @@
+/// Controls which destination shards a subscriber writes to for omni (unsharded) tables.
+///
+/// Partitions destinations via `dest_shard % n_sources == source_shard` so that each
+/// subscriber owns a disjoint subset, preventing cross-subscriber row-lock deadlocks.
+#[derive(Debug, Clone, Copy)]
+pub struct OmniOwnership {
+    /// Index of the source shard this subscriber replicates from.
+    source_shard: usize,
+    /// Total number of source shards participating in replication.
+    n_sources: usize,
+}
+
+impl OmniOwnership {
+    /// Creates the ownership partition for subscriber `source_shard` out of `n_sources`.
+    pub fn new(source_shard: usize, n_sources: usize) -> Self {
+        Self {
+            source_shard,
+            n_sources,
+        }
+    }
+
+    /// Returns true if this subscriber should write omni-table DML to `dest_shard`.
+    ///
+    /// With zero or one source there is no other subscriber to conflict with, so
+    /// this subscriber owns every destination; the early return also avoids a
+    /// modulo-by-zero panic when `n_sources` is 0.
+    pub fn owns(&self, dest_shard: usize) -> bool {
+        if self.n_sources <= 1 {
+            return true;
+        }
+        dest_shard % self.n_sources == self.source_shard
+    }
+}
+
+impl Default for OmniOwnership {
+    /// Single-source default: owns every destination shard.
+    fn default() -> Self {
+        Self::new(0, 1)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn omni_single_source_owns_all_dests() {
+        let p = OmniOwnership::new(0, 1);
+        assert!(p.owns(0));
+        assert!(p.owns(1));
+        assert!(p.owns(2));
+        assert!(p.owns(7));
+    }
+
+    #[test]
+    fn omni_zero_sources_owns_all_dests() {
+        let p = OmniOwnership::new(0, 0);
+        assert!(p.owns(0));
+        assert!(p.owns(1));
+        assert!(p.owns(3));
+    }
+
+    #[test]
+    fn omni_equal_sources_and_dests() {
+        // n_sources == n_dests == 3: strict 1:1, each source owns only its own index.
+        assert!(OmniOwnership::new(0, 3).owns(0));
+        assert!(OmniOwnership::new(1, 3).owns(1));
+        assert!(OmniOwnership::new(2, 3).owns(2));
+
+        assert!(!OmniOwnership::new(1, 3).owns(0));
+        assert!(!OmniOwnership::new(2, 3).owns(0));
+        assert!(!OmniOwnership::new(0, 3).owns(1));
+        assert!(!OmniOwnership::new(2, 3).owns(1));
+        assert!(!OmniOwnership::new(0, 3).owns(2));
+        assert!(!OmniOwnership::new(1, 3).owns(2));
+    }
+
+    #[test]
+    fn omni_fewer_sources_than_dests() {
+        // n_sources=2, n_dests=4: sub-0 owns even dests, sub-1 owns odd dests.
+        let p0 = OmniOwnership::new(0, 2);
+        assert!(p0.owns(0));
+        assert!(p0.owns(2));
+        assert!(!p0.owns(1));
+        assert!(!p0.owns(3));
+
+        let p1 = OmniOwnership::new(1, 2);
+        assert!(p1.owns(1));
+        assert!(p1.owns(3));
+        assert!(!p1.owns(0));
+        assert!(!p1.owns(2));
+    }
+
+    #[test]
+    fn omni_more_sources_than_dests_all_dests_covered() {
+        // n_sources=5, n_dests=3: subs 0-2 each own their matching dest exclusively.
+        assert!(OmniOwnership::new(0, 5).owns(0));
+        assert!(OmniOwnership::new(1, 5).owns(1));
+        assert!(OmniOwnership::new(2, 5).owns(2));
+
+        assert!(!OmniOwnership::new(1, 5).owns(0));
+        assert!(!OmniOwnership::new(2, 5).owns(0));
+        assert!(!OmniOwnership::new(0, 5).owns(1));
+        assert!(!OmniOwnership::new(2, 5).owns(1));
+        assert!(!OmniOwnership::new(0, 5).owns(2));
+        assert!(!OmniOwnership::new(1, 5).owns(2));
+    }
+
+    #[test]
+    fn omni_more_sources_than_dests_excess_sources_idle() {
+        // n_sources=5, n_dests=3: subs 3 and 4 own no destinations.
+        assert!(!OmniOwnership::new(3, 5).owns(0));
+        assert!(!OmniOwnership::new(3, 5).owns(1));
+        assert!(!OmniOwnership::new(3, 5).owns(2));
+
+        assert!(!OmniOwnership::new(4, 5).owns(0));
+        assert!(!OmniOwnership::new(4, 5).owns(1));
+        assert!(!OmniOwnership::new(4, 5).owns(2));
+    }
+}
diff --git a/pgdog/src/backend/replication/logical/subscriber/stream.rs b/pgdog/src/backend/replication/logical/subscriber/stream.rs
index 4d4052701..834ebf378 100644
--- a/pgdog/src/backend/replication/logical/subscriber/stream.rs
+++ b/pgdog/src/backend/replication/logical/subscriber/stream.rs
@@ -18,6 +18,7 @@ use super::super::publisher::{tables_missing_unique_index, NonIdentityColumnsPre
use super::super::{
ensure_validation, publisher::Table, Error, TableValidationError, TableValidationErrorKind,
};
+use super::omni_ownership::OmniOwnership;
use super::StreamContext;
use crate::net::messages::replication::logical::tuple_data::{Identifier, TupleData};
use crate::net::messages::replication::logical::update::Update as XLogUpdate;
@@ -128,10 +129,13 @@ pub struct StreamSubscriber {
// Missed rows.
missed_rows: MissedRows,
+
+ // Determines which destination shards this subscriber owns for omni tables.
+ partition: OmniOwnership,
}
impl StreamSubscriber {
- pub fn new(cluster: &Cluster, tables: &[Table]) -> Self {
+ pub fn new(cluster: &Cluster, tables: &[Table], partition: OmniOwnership) -> Self {
let cluster = cluster.logical_stream();
Self {
cluster,
@@ -158,6 +162,7 @@ impl StreamSubscriber {
in_transaction: false,
missed_rows: MissedRows::default(),
keys: HashMap::default(),
+ partition,
}
}
@@ -223,6 +228,8 @@ impl StreamSubscriber {
// Dispatch a pre-built bind to the matching shard(s).
async fn send(&mut self, val: &Shard, bind: &Bind) -> Result<(), Error> {
+ // Locals avoid borrowing self inside the iter_mut closure.
+ let partition = self.partition;
let mut conns: Vec<_> = self
.connections
.iter_mut()
@@ -230,7 +237,7 @@ impl StreamSubscriber {
.filter(|(shard, _)| match val {
Shard::Direct(direct) => *shard == *direct,
Shard::Multi(multi) => multi.contains(shard),
- _ => true,
+ Shard::All => partition.owns(*shard),
})
.map(|(_, server)| server)
.collect();
@@ -995,7 +1002,7 @@ mod tests {
fn make_subscriber() -> StreamSubscriber {
let cluster = Cluster::new_test(&config());
- StreamSubscriber::new(&cluster, &[])
+ StreamSubscriber::new(&cluster, &[], OmniOwnership::default())
}
#[test]
diff --git a/pgdog/src/backend/replication/logical/subscriber/tests.rs b/pgdog/src/backend/replication/logical/subscriber/tests.rs
index c70a371ca..eb5e0b45a 100644
--- a/pgdog/src/backend/replication/logical/subscriber/tests.rs
+++ b/pgdog/src/backend/replication/logical/subscriber/tests.rs
@@ -30,6 +30,7 @@ use crate::{
},
};
+use super::omni_ownership::OmniOwnership;
use super::stream::StreamSubscriber;
fn random_id() -> String {
@@ -254,23 +255,26 @@ fn x_update(u: XLogUpdate) -> CopyData {
fn make_subscriber() -> StreamSubscriber {
let cluster = Cluster::new_test(&config());
let tables = vec![make_sharded_table(), make_sharded_test_b_table()];
- StreamSubscriber::new(&cluster, &tables)
+ StreamSubscriber::new(&cluster, &tables, OmniOwnership::default())
}
fn make_subscriber_with_tables(tables: Vec<Table>) -> StreamSubscriber {
let cluster = Cluster::new_test(&config());
- StreamSubscriber::new(&cluster, &tables)
+ StreamSubscriber::new(&cluster, &tables, OmniOwnership::default())
}
-fn make_subscriber_with_tables_two_databases(tables: Vec<Table>) -> StreamSubscriber {
+fn make_subscriber_with_tables_two_databases(
+    tables: Vec<Table>,
+ partition: OmniOwnership,
+) -> StreamSubscriber {
let cluster = Cluster::new_test_two_databases(&config());
- StreamSubscriber::new(&cluster, &tables)
+ StreamSubscriber::new(&cluster, &tables, partition)
}
fn make_subscriber_single_shard() -> StreamSubscriber {
let cluster = Cluster::new_test_single_shard(&config());
let tables = vec![make_sharded_table(), make_sharded_test_b_table()];
- StreamSubscriber::new(&cluster, &tables)
+ StreamSubscriber::new(&cluster, &tables, OmniOwnership::default())
}
/// Count rows matching the given `WHERE` predicate using a separate connection.
@@ -582,7 +586,7 @@ async fn partition_leaves_share_destination() {
leaf_b.table.parent_name = "sharded".to_string();
let cluster = Cluster::new_test_single_shard(&config());
- let mut sub = StreamSubscriber::new(&cluster, &[leaf_a, leaf_b]);
+ let mut sub = StreamSubscriber::new(&cluster, &[leaf_a, leaf_b], OmniOwnership::default());
let mut verify = test_server().await;
sub.connect().await.unwrap();
@@ -1523,7 +1527,11 @@ fn omni_insert_copy_data(oid: Oid, a: &str, b: &str) -> CopyData {
#[tokio::test]
async fn full_identity_nothing_rejected() {
let cluster = Cluster::new_test_single_shard(&config());
- let mut sub = StreamSubscriber::new(&cluster, &[make_replica_identity_nothing_table()]);
+ let mut sub = StreamSubscriber::new(
+ &cluster,
+ &[make_replica_identity_nothing_table()],
+ OmniOwnership::default(),
+ );
sub.connect().await.unwrap();
let oid = Oid(16390);
@@ -1557,7 +1565,11 @@ async fn full_identity_nothing_rejected() {
#[tokio::test]
async fn full_identity_omni_no_unique_index_rejected() {
let cluster = Cluster::new_test_single_shard(&config());
- let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_omni_table()]);
+ let mut sub = StreamSubscriber::new(
+ &cluster,
+ &[make_full_identity_omni_table()],
+ OmniOwnership::default(),
+ );
// Enforce precondition: the table must exist but have no qualifying unique index.
// A stale unique index from a prior run would make tables_missing_unique_index() return empty,
@@ -1596,7 +1608,11 @@ async fn full_identity_omni_no_unique_index_rejected() {
#[tokio::test]
async fn full_identity_insert_sharded() {
let cluster = Cluster::new_test_single_shard(&config());
- let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_sharded_table()]);
+ let mut sub = StreamSubscriber::new(
+ &cluster,
+ &[make_full_identity_sharded_table()],
+ OmniOwnership::default(),
+ );
let mut verify = test_server().await;
sub.connect().await.unwrap();
@@ -1622,7 +1638,11 @@ async fn full_identity_insert_sharded() {
#[tokio::test]
async fn full_identity_update_fast_path() {
let cluster = Cluster::new_test_single_shard(&config());
- let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_sharded_table()]);
+ let mut sub = StreamSubscriber::new(
+ &cluster,
+ &[make_full_identity_sharded_table()],
+ OmniOwnership::default(),
+ );
let mut verify = test_server().await;
sub.connect().await.unwrap();
@@ -1677,7 +1697,11 @@ async fn full_identity_update_fast_path() {
#[tokio::test]
async fn full_identity_update_slow_path() {
let cluster = Cluster::new_test_single_shard(&config());
- let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_sharded_table()]);
+ let mut sub = StreamSubscriber::new(
+ &cluster,
+ &[make_full_identity_sharded_table()],
+ OmniOwnership::default(),
+ );
let mut verify = test_server().await;
sub.connect().await.unwrap();
@@ -1737,7 +1761,11 @@ async fn full_identity_update_slow_path() {
#[tokio::test]
async fn full_identity_update_slow_path_realistic_old_tuple() {
let cluster = Cluster::new_test_single_shard(&config());
- let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_sharded_table()]);
+ let mut sub = StreamSubscriber::new(
+ &cluster,
+ &[make_full_identity_sharded_table()],
+ OmniOwnership::default(),
+ );
let mut verify = test_server().await;
sub.connect().await.unwrap();
@@ -1794,7 +1822,11 @@ async fn full_identity_update_slow_path_realistic_old_tuple() {
#[tokio::test]
async fn full_identity_update_all_toasted_is_noop() {
let cluster = Cluster::new_test_single_shard(&config());
- let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_sharded_table()]);
+ let mut sub = StreamSubscriber::new(
+ &cluster,
+ &[make_full_identity_sharded_table()],
+ OmniOwnership::default(),
+ );
let mut verify = test_server().await;
sub.connect().await.unwrap();
@@ -1836,7 +1868,11 @@ async fn full_identity_update_all_toasted_is_noop() {
#[tokio::test]
async fn full_identity_delete() {
let cluster = Cluster::new_test_single_shard(&config());
- let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_sharded_table()]);
+ let mut sub = StreamSubscriber::new(
+ &cluster,
+ &[make_full_identity_sharded_table()],
+ OmniOwnership::default(),
+ );
let mut verify = test_server().await;
sub.connect().await.unwrap();
@@ -1874,7 +1910,11 @@ async fn full_identity_delete() {
#[tokio::test]
async fn full_identity_insert_omni_dedup() {
let cluster = Cluster::new_test_single_shard(&config());
- let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_omni_dedup_table()]);
+ let mut sub = StreamSubscriber::new(
+ &cluster,
+ &[make_full_identity_omni_dedup_table()],
+ OmniOwnership::default(),
+ );
let mut verify = test_server().await;
// Ensure destination table exists with unique index before relation() runs.
@@ -1933,7 +1973,11 @@ async fn full_identity_insert_omni_dedup() {
#[tokio::test]
async fn full_identity_update_duplicate_rows() {
let cluster = Cluster::new_test_single_shard(&config());
- let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_dup_rows_table()]);
+ let mut sub = StreamSubscriber::new(
+ &cluster,
+ &[make_full_identity_dup_rows_table()],
+ OmniOwnership::default(),
+ );
let mut verify = test_server().await;
ensure_table(&mut verify, "public.full_dup_rows").await;
@@ -1999,7 +2043,11 @@ async fn full_identity_update_duplicate_rows() {
#[tokio::test]
async fn full_identity_delete_duplicate_rows() {
let cluster = Cluster::new_test_single_shard(&config());
- let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_dup_rows_table()]);
+ let mut sub = StreamSubscriber::new(
+ &cluster,
+ &[make_full_identity_dup_rows_table()],
+ OmniOwnership::default(),
+ );
let mut verify = test_server().await;
ensure_table(&mut verify, "public.full_dup_rows").await;
@@ -2066,7 +2114,11 @@ async fn full_identity_delete_duplicate_rows() {
#[tokio::test]
async fn full_identity_update_matches_null_column() {
let cluster = Cluster::new_test_single_shard(&config());
- let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_dup_rows_table()]);
+ let mut sub = StreamSubscriber::new(
+ &cluster,
+ &[make_full_identity_dup_rows_table()],
+ OmniOwnership::default(),
+ );
let mut verify = test_server().await;
// full_dup_rows has no NOT NULL on value — we can seed a NULL row.
@@ -2127,7 +2179,11 @@ async fn full_identity_update_matches_null_column() {
#[tokio::test]
async fn full_identity_delete_matches_null_column() {
let cluster = Cluster::new_test_single_shard(&config());
- let mut sub = StreamSubscriber::new(&cluster, &[make_full_identity_dup_rows_table()]);
+ let mut sub = StreamSubscriber::new(
+ &cluster,
+ &[make_full_identity_dup_rows_table()],
+ OmniOwnership::default(),
+ );
let mut verify = test_server().await;
ensure_table(&mut verify, "public.full_dup_rows").await;
@@ -2302,7 +2358,13 @@ async fn cross_subscriber_omni_deadlock_two_databases() {
let id1 = id1.clone();
let round_barrier = Arc::clone(&round_barrier);
tokio::spawn(async move {
- let mut sub = make_subscriber_with_tables_two_databases(vec![make_settings_table()]);
+ // Each subscriber owns a disjoint subset of destination shards:
+ // sub-0 → dest-0, sub-1 → dest-1 (dest_shard % 2 == sub_idx).
+ // This is the destination-partitioned apply fix for the cross-subscriber deadlock.
+ let mut sub = make_subscriber_with_tables_two_databases(
+ vec![make_settings_table()],
+ OmniOwnership::new(sub_idx, 2),
+ );
sub.connect().await.unwrap();
// Distinct LSN ranges so neither subscriber's LSN gating skips the other's events.