Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use tracing::{debug, info, warn};
use super::super::{ensure_validation, publisher::Table, Error};
use super::ReplicationSlot;

use crate::backend::replication::logical::subscriber::omni_ownership::OmniOwnership;
use crate::backend::replication::logical::subscriber::stream::StreamSubscriber;
use crate::backend::replication::publisher::progress::Progress;
use crate::backend::replication::publisher::Lsn;
Expand Down Expand Up @@ -196,6 +197,7 @@ impl Publisher {
self.create_slots(source).await?;
}

let n_sources = source.shards().len();
for (number, _) in source.shards().iter().enumerate() {
// Use table offsets from data sync
// or from loading them above.
Expand All @@ -204,7 +206,10 @@ impl Publisher {
.get(&number)
.ok_or(Error::NoReplicationTables(number))?;
// Handles the logical replication stream messages.
let mut stream = StreamSubscriber::new(dest, tables);
// Each subscriber owns a partition of destination shards for omni-table DML
// (dest_shard % n_sources == source_shard), preventing cross-subscriber deadlocks.
let mut stream =
StreamSubscriber::new(dest, tables, OmniOwnership::new(number, n_sources));

// Take ownership of the slot for replication.
let mut slot = self
Expand Down
1 change: 1 addition & 0 deletions pgdog/src/backend/replication/logical/subscriber/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod context;
pub mod copy;
pub mod omni_ownership;
pub mod parallel_connection;
pub mod stream;

Expand Down
112 changes: 112 additions & 0 deletions pgdog/src/backend/replication/logical/subscriber/omni_ownership.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/// Controls which destination shards a subscriber writes to for omni (unsharded) tables.
///
/// Partitions destinations via `dest_shard % n_sources == source_shard` so that each
/// subscriber owns a disjoint subset, preventing cross-subscriber row-lock deadlocks.
#[derive(Debug, Clone, Copy)]
pub struct OmniOwnership {
source_shard: usize,
n_sources: usize,
}

impl OmniOwnership {
pub fn new(source_shard: usize, n_sources: usize) -> Self {
Self {
source_shard,
n_sources,
}
}

/// Returns true if this subscriber should write omni-table DML to `dest_shard`.
pub fn owns(&self, dest_shard: usize) -> bool {
if self.n_sources <= 1 {
return true;
}
dest_shard % self.n_sources == self.source_shard
}
}

impl Default for OmniOwnership {
fn default() -> Self {
Self::new(0, 1)
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn omni_single_source_owns_all_dests() {
let p = OmniOwnership::new(0, 1);
assert!(p.owns(0));
assert!(p.owns(1));
assert!(p.owns(2));
assert!(p.owns(7));
}

#[test]
fn omni_zero_sources_owns_all_dests() {
let p = OmniOwnership::new(0, 0);
assert!(p.owns(0));
assert!(p.owns(1));
assert!(p.owns(3));
}

#[test]
fn omni_equal_sources_and_dests() {
// n_sources == n_dests == 3: strict 1:1, each source owns only its own index.
assert!(OmniOwnership::new(0, 3).owns(0));
assert!(OmniOwnership::new(1, 3).owns(1));
assert!(OmniOwnership::new(2, 3).owns(2));

assert!(!OmniOwnership::new(1, 3).owns(0));
assert!(!OmniOwnership::new(2, 3).owns(0));
assert!(!OmniOwnership::new(0, 3).owns(1));
assert!(!OmniOwnership::new(2, 3).owns(1));
assert!(!OmniOwnership::new(0, 3).owns(2));
assert!(!OmniOwnership::new(1, 3).owns(2));
}

#[test]
fn omni_fewer_sources_than_dests() {
// n_sources=2, n_dests=4: sub-0 owns even dests, sub-1 owns odd dests.
let p0 = OmniOwnership::new(0, 2);
assert!(p0.owns(0));
assert!(p0.owns(2));
assert!(!p0.owns(1));
assert!(!p0.owns(3));

let p1 = OmniOwnership::new(1, 2);
assert!(p1.owns(1));
assert!(p1.owns(3));
assert!(!p1.owns(0));
assert!(!p1.owns(2));
}

#[test]
fn omni_more_sources_than_dests_all_dests_covered() {
// n_sources=5, n_dests=3: subs 0-2 each own their matching dest exclusively.
assert!(OmniOwnership::new(0, 5).owns(0));
assert!(OmniOwnership::new(1, 5).owns(1));
assert!(OmniOwnership::new(2, 5).owns(2));

assert!(!OmniOwnership::new(1, 5).owns(0));
assert!(!OmniOwnership::new(2, 5).owns(0));
assert!(!OmniOwnership::new(0, 5).owns(1));
assert!(!OmniOwnership::new(2, 5).owns(1));
assert!(!OmniOwnership::new(0, 5).owns(2));
assert!(!OmniOwnership::new(1, 5).owns(2));
}

#[test]
fn omni_more_sources_than_dests_excess_sources_idle() {
// n_sources=5, n_dests=3: subs 3 and 4 own no destinations.
assert!(!OmniOwnership::new(3, 5).owns(0));
assert!(!OmniOwnership::new(3, 5).owns(1));
assert!(!OmniOwnership::new(3, 5).owns(2));

assert!(!OmniOwnership::new(4, 5).owns(0));
assert!(!OmniOwnership::new(4, 5).owns(1));
assert!(!OmniOwnership::new(4, 5).owns(2));
}
}
13 changes: 10 additions & 3 deletions pgdog/src/backend/replication/logical/subscriber/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use super::super::publisher::{tables_missing_unique_index, NonIdentityColumnsPre
use super::super::{
ensure_validation, publisher::Table, Error, TableValidationError, TableValidationErrorKind,
};
use super::omni_ownership::OmniOwnership;
use super::StreamContext;
use crate::net::messages::replication::logical::tuple_data::{Identifier, TupleData};
use crate::net::messages::replication::logical::update::Update as XLogUpdate;
Expand Down Expand Up @@ -128,10 +129,13 @@ pub struct StreamSubscriber {

// Missed rows.
missed_rows: MissedRows,

// Determines which destination shards this subscriber owns for omni tables.
partition: OmniOwnership,
}

impl StreamSubscriber {
pub fn new(cluster: &Cluster, tables: &[Table]) -> Self {
pub fn new(cluster: &Cluster, tables: &[Table], partition: OmniOwnership) -> Self {
let cluster = cluster.logical_stream();
Self {
cluster,
Expand All @@ -158,6 +162,7 @@ impl StreamSubscriber {
in_transaction: false,
missed_rows: MissedRows::default(),
keys: HashMap::default(),
partition,
}
}

Expand Down Expand Up @@ -223,14 +228,16 @@ impl StreamSubscriber {

// Dispatch a pre-built bind to the matching shard(s).
async fn send(&mut self, val: &Shard, bind: &Bind) -> Result<(), Error> {
// Locals avoid borrowing self inside the iter_mut closure.
let partition = self.partition;
let mut conns: Vec<_> = self
.connections
.iter_mut()
.enumerate()
.filter(|(shard, _)| match val {
Shard::Direct(direct) => *shard == *direct,
Shard::Multi(multi) => multi.contains(shard),
_ => true,
Shard::All => partition.owns(*shard),
})
.map(|(_, server)| server)
.collect();
Expand Down Expand Up @@ -995,7 +1002,7 @@ mod tests {

fn make_subscriber() -> StreamSubscriber {
let cluster = Cluster::new_test(&config());
StreamSubscriber::new(&cluster, &[])
StreamSubscriber::new(&cluster, &[], OmniOwnership::default())
}

#[test]
Expand Down
Loading
Loading