diff --git a/scylla/src/transport/cluster.rs b/scylla/src/transport/cluster.rs
index 3bf80f794..55dddd04b 100644
--- a/scylla/src/transport/cluster.rs
+++ b/scylla/src/transport/cluster.rs
@@ -111,6 +111,9 @@ struct ClusterWorker {
     // Channel used to receive server events
     server_events_channel: tokio::sync::mpsc::Receiver<Event>,
 
+    // Channel used to receive signals that control connection is broken
+    control_connection_repair_channel: tokio::sync::broadcast::Receiver<()>,
+
     // Keyspace send in "USE <keyspace_name>" when opening each connection
     used_keyspace: Option<VerifiedKeyspaceName>,
 
@@ -147,9 +150,12 @@ impl Cluster {
         let (refresh_sender, refresh_receiver) = tokio::sync::mpsc::channel(32);
         let (use_keyspace_sender, use_keyspace_receiver) = tokio::sync::mpsc::channel(32);
         let (server_events_sender, server_events_receiver) = tokio::sync::mpsc::channel(32);
+        let (control_connection_repair_sender, control_connection_repair_receiver) =
+            tokio::sync::broadcast::channel(32);
 
         let mut metadata_reader = MetadataReader::new(
             known_nodes,
+            control_connection_repair_sender,
             initial_peers,
             pool_config.connection_config.clone(),
             pool_config.keepalive_interval,
@@ -180,6 +186,7 @@ impl Cluster {
 
             refresh_channel: refresh_receiver,
             server_events_channel: server_events_receiver,
+            control_connection_repair_channel: control_connection_repair_receiver,
 
             use_keyspace_channel: use_keyspace_receiver,
             used_keyspace: None,
@@ -538,6 +545,26 @@ impl ClusterWorker {
 
                     continue; // Don't go to refreshing, wait for the next event
                 }
+                recv_res = self.control_connection_repair_channel.recv() => {
+                    match recv_res {
+                        Ok(()) => {
+                            // The control connection was broken. Acknowledge that and start attempting to reconnect.
+                            // The first reconnect attempt will be immediate (by attempting metadata refresh below),
+                            // and if it does not succeed, then `control_connection_works` will be set to `false`,
+                            // so subsequent attempts will be issued every second.
+                        }
+                        Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
+                            // This is very unlikely; we would have to have a lot of concurrent
+                            // control connections opened and broken at the same time.
+                            // The best we can do is ignoring this.
+                        }
+                        Err(tokio::sync::broadcast::error::RecvError::Closed) => {
+                            // If control_connection_repair_channel was closed then MetadataReader was dropped,
+                            // we can stop working.
+                            return;
+                        }
+                    }
+                }
             }
 
             // Perform the refresh
diff --git a/scylla/src/transport/connection_pool.rs b/scylla/src/transport/connection_pool.rs
index 265155c74..f26ea36ac 100644
--- a/scylla/src/transport/connection_pool.rs
+++ b/scylla/src/transport/connection_pool.rs
@@ -25,7 +25,8 @@
 use std::num::NonZeroUsize;
 use std::pin::Pin;
 use std::sync::{Arc, RwLock, Weak};
 use std::time::Duration;
-use tokio::sync::{mpsc, Notify};
+
+use tokio::sync::{broadcast, mpsc, Notify};
 use tracing::instrument::WithSubscriber;
 use tracing::{debug, trace, warn};
@@ -169,6 +170,7 @@ impl NodeConnectionPool {
         endpoint: UntranslatedEndpoint,
         #[allow(unused_mut)] mut pool_config: PoolConfig, // `mut` needed only with "cloud" feature
         current_keyspace: Option<VerifiedKeyspaceName>,
+        pool_empty_notifier: broadcast::Sender<()>,
     ) -> Self {
         let (use_keyspace_request_sender, use_keyspace_request_receiver) = mpsc::channel(1);
         let pool_updated_notify = Arc::new(Notify::new());
@@ -205,6 +207,7 @@ impl NodeConnectionPool {
             pool_config,
             current_keyspace,
             pool_updated_notify.clone(),
+            pool_empty_notifier,
         );
 
         let conns = refiller.get_shared_connections();
@@ -472,6 +475,9 @@ struct PoolRefiller {
 
     // Signaled when the connection pool is updated
     pool_updated_notify: Arc<Notify>,
+
+    // Signaled when the connection pool becomes empty
+    pool_empty_notifier: broadcast::Sender<()>,
 }
 
 #[derive(Debug)]
@@ -486,6 +492,7 @@ impl PoolRefiller {
         pool_config: PoolConfig,
         current_keyspace: Option<VerifiedKeyspaceName>,
         pool_updated_notify: Arc<Notify>,
+        pool_empty_notifier: broadcast::Sender<()>,
     ) -> Self {
         // At the beginning, we assume the node does not have any shards
         // and assume that the node is a Cassandra node
@@ -513,6 +520,7 @@ impl PoolRefiller {
 
             current_keyspace,
             pool_updated_notify,
+            pool_empty_notifier,
         }
     }
 
@@ -1037,6 +1045,9 @@ impl PoolRefiller {
                 self.conns[shard_id].len(),
                 self.active_connection_count(),
             );
+            if !self.has_connections() {
+                let _ = self.pool_empty_notifier.send(());
+            }
             self.update_shared_conns(Some(last_error));
             return;
         }
diff --git a/scylla/src/transport/node.rs b/scylla/src/transport/node.rs
index c77ee6db7..97b267946 100644
--- a/scylla/src/transport/node.rs
+++ b/scylla/src/transport/node.rs
@@ -103,8 +103,16 @@ impl Node {
         let address = peer.address;
         let datacenter = peer.datacenter.clone();
         let rack = peer.rack.clone();
+
+        // We aren't interested in the fact that the pool becomes empty, so we immediately drop the receiving part.
+        let (pool_empty_notifier, _) = tokio::sync::broadcast::channel(1);
         let pool = enabled.then(|| {
-            NodeConnectionPool::new(UntranslatedEndpoint::Peer(peer), pool_config, keyspace_name)
+            NodeConnectionPool::new(
+                UntranslatedEndpoint::Peer(peer),
+                pool_config,
+                keyspace_name,
+                pool_empty_notifier,
+            )
         });
 
         Node {
diff --git a/scylla/src/transport/topology.rs b/scylla/src/transport/topology.rs
index b9c146c22..9b40befd5 100644
--- a/scylla/src/transport/topology.rs
+++ b/scylla/src/transport/topology.rs
@@ -27,7 +27,7 @@
 use std::str::FromStr;
 use std::sync::Arc;
 use std::time::{Duration, Instant};
 use strum_macros::EnumString;
-use tokio::sync::mpsc;
+use tokio::sync::{broadcast, mpsc};
 use tracing::{debug, error, trace, warn};
 use uuid::Uuid;
@@ -50,6 +50,10 @@ pub(crate) struct MetadataReader {
     // When no known peer is reachable, initial known nodes are resolved once again as a fallback
     // and establishing control connection to them is attempted.
     initial_known_nodes: Vec<KnownNode>,
+
+    // When a control connection breaks, the PoolRefiller of its pool uses the requester
+    // to signal ClusterWorker that an immediate metadata refresh is advisable.
+    control_connection_repair_requester: broadcast::Sender<()>,
 }
 
 /// Describes all metadata retrieved from the cluster
@@ -398,6 +402,7 @@ impl MetadataReader {
     #[allow(clippy::too_many_arguments)]
     pub(crate) fn new(
         initial_known_nodes: Vec<KnownNode>,
+        control_connection_repair_requester: broadcast::Sender<()>,
         initially_known_peers: Vec<ResolvedContactPoint>,
         mut connection_config: ConnectionConfig,
         keepalive_interval: Option<Duration>,
@@ -422,6 +427,7 @@ impl MetadataReader {
             control_connection_endpoint.clone(),
             connection_config.clone(),
             keepalive_interval,
+            control_connection_repair_requester.clone(),
         );
 
         MetadataReader {
@@ -437,6 +443,7 @@ impl MetadataReader {
             fetch_schema,
             host_filter: host_filter.clone(),
             initial_known_nodes,
+            control_connection_repair_requester,
         }
     }
 
@@ -547,6 +554,7 @@ impl MetadataReader {
                 self.control_connection_endpoint.clone(),
                 self.connection_config.clone(),
                 self.keepalive_interval,
+                self.control_connection_repair_requester.clone(),
             );
 
             debug!(
@@ -646,6 +654,7 @@ impl MetadataReader {
                 self.control_connection_endpoint.clone(),
                 self.connection_config.clone(),
                 self.keepalive_interval,
+                self.control_connection_repair_requester.clone(),
             );
         }
     }
@@ -656,6 +665,7 @@ impl MetadataReader {
         endpoint: UntranslatedEndpoint,
         connection_config: ConnectionConfig,
         keepalive_interval: Option<Duration>,
+        refresh_requester: broadcast::Sender<()>,
     ) -> NodeConnectionPool {
         let pool_config = PoolConfig {
             connection_config,
@@ -669,7 +679,7 @@ impl MetadataReader {
             can_use_shard_aware_port: false,
         };
 
-        NodeConnectionPool::new(endpoint, pool_config, None)
+        NodeConnectionPool::new(endpoint, pool_config, None, refresh_requester)
     }
 }
 
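For readers who want the signalling pattern in isolation, below is a minimal, self-contained sketch (not the driver's actual code) of the broadcast-based repair mechanism this diff threads through `Cluster`, `NodeConnectionPool`, `PoolRefiller`, and `MetadataReader`. The names `worker_loop` and `refresh_metadata`, the 60-second period, and the channel capacity are illustrative assumptions; only the `tokio::sync::broadcast` API and the `Ok`/`Lagged`/`Closed` handling mirror what the diff adds.

```rust
use tokio::sync::broadcast;
use tokio::time::{sleep, Duration};

// Hypothetical stand-in for the driver's metadata refresh, which also reopens
// the control connection when it is broken.
async fn refresh_metadata() {
    println!("refreshing metadata / reopening control connection");
}

// Worker side (analogous to ClusterWorker::work): wait for either the periodic
// timer or a "control connection pool became empty" signal, then refresh.
async fn worker_loop(mut repair_rx: broadcast::Receiver<()>) {
    loop {
        tokio::select! {
            _ = sleep(Duration::from_secs(60)) => {
                // Regular, timer-driven refresh.
            }
            recv_res = repair_rx.recv() => {
                match recv_res {
                    // The control connection broke: refresh (and reconnect) immediately.
                    Ok(()) => {}
                    // Some signals were missed; a single refresh still repairs the connection.
                    Err(broadcast::error::RecvError::Lagged(_)) => {}
                    // Every sender was dropped: nothing left to watch, stop the worker.
                    Err(broadcast::error::RecvError::Closed) => return,
                }
            }
        }
        refresh_metadata().await;
    }
}

#[tokio::main]
async fn main() {
    let (repair_tx, repair_rx) = broadcast::channel::<()>(32);
    let worker = tokio::spawn(worker_loop(repair_rx));

    // Pool side (analogous to PoolRefiller): report that the pool has no live
    // connections; ignore the error returned when nobody is listening.
    let _ = repair_tx.send(());

    // Dropping the last sender closes the channel, which lets the worker exit.
    drop(repair_tx);
    worker.await.unwrap();
}
```

Ignoring the `send` result is the same choice the diff makes in `Node::new`, where the receiving half of the channel is dropped immediately because per-node pools have no one interested in the "pool became empty" signal; only the control connection's pool is wired to a live `ClusterWorker` receiver.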