Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions integration/pgdog.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ reload_schema_on_ddl = false
#idle_healthcheck_delay = 50000000
unique_id_function = "standard"
auth_type = "scram"
log_dedup_window = 30_000
log_dedup_threshold = 1
# log_dedup_window = 30_000
# log_dedup_threshold = 1

[memory]
net_buffer = 8096
Expand Down
6 changes: 6 additions & 0 deletions pgdog/src/backend/pool/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ pub struct ClusterConfig<'a> {
pub resharding_copy_retry_max_attempts: usize,
pub resharding_copy_retry_min_delay: u64,
pub regex_parser_limit: usize,
pub pub_sub_enabled: bool,
}

impl<'a> ClusterConfig<'a> {
Expand Down Expand Up @@ -220,6 +221,7 @@ impl<'a> ClusterConfig<'a> {
resharding_copy_retry_max_attempts: general.resharding_copy_retry_max_attempts,
resharding_copy_retry_min_delay: general.resharding_copy_retry_min_delay,
regex_parser_limit: general.regex_parser_limit,
pub_sub_enabled: general.pub_sub_enabled(),
}
}
}
Expand Down Expand Up @@ -260,6 +262,7 @@ impl Cluster {
resharding_copy_retry_max_attempts,
resharding_copy_retry_min_delay,
regex_parser_limit,
pub_sub_enabled,
} = config;

let identifier = Arc::new(DatabaseUser {
Expand All @@ -281,6 +284,7 @@ impl Cluster {
rw_split,
identifier: identifier.clone(),
lsn_check_interval,
pub_sub_enabled,
})
})
.collect(),
Expand Down Expand Up @@ -736,6 +740,7 @@ mod test {
rw_split: ReadWriteSplit::IncludePrimary,
identifier: identifier.clone(),
lsn_check_interval: Duration::MAX,
pub_sub_enabled: false,
})
})
.collect::<Vec<_>>();
Expand Down Expand Up @@ -823,6 +828,7 @@ mod test {
rw_split: ReadWriteSplit::default(),
identifier: identifier.clone(),
lsn_check_interval: Duration::default(),
pub_sub_enabled: false,
})],
prepared_statements: config.config.general.prepared_statements,
dry_run: config.config.general.dry_run,
Expand Down
59 changes: 43 additions & 16 deletions pgdog/src/backend/pool/shard/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! A shard is a collection of replicas and an optional primary.

use arc_swap::ArcSwap;
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -11,7 +12,7 @@ use crate::backend::databases::User;
use crate::backend::pool::lb::ban::Ban;
use crate::backend::PubSubListener;
use crate::backend::Schema;
use crate::config::{config, LoadBalancingStrategy, ReadWriteSplit, Role};
use crate::config::{LoadBalancingStrategy, ReadWriteSplit, Role};
use crate::net::messages::BackendKeyData;
use crate::net::{NotificationResponse, Parameters};

Expand All @@ -38,6 +39,8 @@ pub(super) struct ShardConfig<'a> {
pub(super) identifier: Arc<User>,
/// LSN check interval
pub(super) lsn_check_interval: Duration,
/// Pub/sub enabled
pub(super) pub_sub_enabled: bool,
}

/// Connection pools for a single database shard.
Expand Down Expand Up @@ -109,7 +112,7 @@ impl Shard {
&self,
channel: &str,
) -> Result<broadcast::Receiver<NotificationResponse>, Error> {
if let Some(ref listener) = self.pub_sub {
if let Some(listener) = self.pub_sub.load_full().deref() {
listener.listen(channel).await
} else {
Err(Error::PubSubDisabled)
Expand All @@ -118,7 +121,7 @@ impl Shard {

/// Notify channel with optional payload (payload can be empty string).
pub async fn notify(&self, channel: &str, payload: &str) -> Result<(), Error> {
if let Some(ref listener) = self.pub_sub {
if let Some(listener) = self.pub_sub.load_full().deref() {
listener.notify(channel, payload).await
} else {
Err(Error::PubSubDisabled)
Expand Down Expand Up @@ -148,9 +151,7 @@ impl Shard {
pub fn launch(&self) {
self.lb.launch();
ShardMonitor::run(self);
if let Some(ref listener) = self.pub_sub {
listener.launch();
}
self.init_pub_sub();
}

/// Returns true if the shard has a primary database.
Expand Down Expand Up @@ -208,9 +209,7 @@ impl Shard {
/// Shutdown every pool and maintenance task in this shard.
pub fn shutdown(&self) {
self.comms.shutdown.notify_waiters();
if let Some(ref listener) = self.pub_sub {
listener.shutdown();
}
self.shutdown_pub_sub();
self.lb.shutdown();
}

Expand Down Expand Up @@ -241,6 +240,34 @@ impl Shard {
pub async fn params(&self, request: &Request) -> Result<&Parameters, Error> {
self.lb.params(request).await
}

/// (Re)initialize the pub/sub listener.
pub(crate) fn init_pub_sub(&self) {
if self.inner.pub_sub_enabled {
// Create new listener.
// This is useful if we promoted a primary
// from a replica.
let primary = self.lb.primary().cloned();
let pub_sub = primary.as_ref().map(PubSubListener::new);

// Launch the new listener first!
if let Some(ref pub_sub) = pub_sub {
pub_sub.launch();
}

// Shutdown the old listener.
if let Some(pub_sub) = self.inner.pub_sub.swap(Arc::new(pub_sub)).deref() {
pub_sub.shutdown();
}
}
}

/// Shutdown pub/sub listener.
fn shutdown_pub_sub(&self) {
if let Some(pub_sub) = self.inner.pub_sub.swap(Arc::new(None)).deref() {
pub_sub.shutdown();
}
}
}

impl Deref for Shard {
Expand All @@ -258,9 +285,10 @@ pub struct ShardInner {
number: usize,
lb: LoadBalancer,
comms: Arc<ShardComms>,
pub_sub: Option<PubSubListener>,
pub_sub: Arc<ArcSwap<Option<PubSubListener>>>,
identifier: Arc<User>,
schema: Arc<OnceCell<Schema>>,
pub_sub_enabled: bool,
}

impl ShardInner {
Expand All @@ -273,26 +301,23 @@ impl ShardInner {
rw_split,
identifier,
lsn_check_interval,
pub_sub_enabled,
} = shard;
let primary = primary.as_ref().map(Pool::new);
let lb = LoadBalancer::new(&primary, replicas, lb_strategy, rw_split);
let comms = Arc::new(ShardComms {
shutdown: Notify::new(),
lsn_check_interval,
});
let pub_sub = if config().pub_sub_enabled() {
primary.as_ref().map(PubSubListener::new)
} else {
None
};

Self {
number,
lb,
comms,
pub_sub,
pub_sub: Arc::new(ArcSwap::new(Arc::new(None))),
identifier,
schema: Arc::new(OnceCell::new()),
pub_sub_enabled,
}
}
}
Expand Down Expand Up @@ -330,6 +355,7 @@ mod test {
database: "pgdog".into(),
}),
lsn_check_interval: Duration::MAX,
pub_sub_enabled: false,
});
shard.launch();

Expand Down Expand Up @@ -368,6 +394,7 @@ mod test {
database: "pgdog".into(),
}),
lsn_check_interval: Duration::MAX,
pub_sub_enabled: false,
});
shard.launch();
let mut ids = BTreeSet::new();
Expand Down
8 changes: 7 additions & 1 deletion pgdog/src/backend/pool/shard/role_detector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ impl RoleDetector {
/// Detect role change in the shard.
pub(super) fn changed(&mut self) -> bool {
if self.enabled() {
self.shard.redetect_roles()
let changed = self.shard.redetect_roles();
if changed {
// Re-initialize pub/sub channel.
self.shard.init_pub_sub();
}
changed
} else {
false
}
Expand Down Expand Up @@ -78,6 +83,7 @@ mod test {
database: "pgdog".into(),
}),
lsn_check_interval: Duration::MAX,
pub_sub_enabled: false,
})
}

Expand Down
Loading