cluster: attempt recreate control conn immediately
Until now, if all nodes changed their IPs at once, the new addresses would
be discovered only upon the next metadata fetch, which might not happen for
another 60 seconds (if the previous fetch had succeeded).
This commit introduces immediate signalling that the control connection has
broken, so that the ClusterWorker instantly enters its every-one-second
reconnection-attempt phase.

In manual tests this proved very robust: immediately after the control
connection is lost, the reconnection-attempt phase begins. As soon as any
node (one known initially or learned from recently fetched metadata) becomes
reachable, a control connection is opened and metadata is fetched
successfully, so the whole cluster becomes discoverable again.
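
The mechanism is a single tokio broadcast channel: the pool side signals when the
control connection's pool goes empty, and the ClusterWorker reacts by refreshing
metadata right away instead of waiting for its periodic timer. A minimal,
self-contained sketch of that signalling pattern (the task bodies and timings are
illustrative, not the driver's actual code):

use std::time::Duration;
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    // One channel: senders live next to the connection pools, the receiver in the worker.
    let (repair_tx, mut repair_rx) = broadcast::channel::<()>(32);

    // Pool side (illustrative): signal once the last connection is gone.
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(100)).await; // pretend the pool just emptied
        let _ = repair_tx.send(()); // ignore the error if nobody is listening
    });

    // Worker side (illustrative): whichever happens first triggers a refresh.
    tokio::select! {
        _ = tokio::time::sleep(Duration::from_secs(60)) => {
            println!("periodic metadata refresh");
        }
        _ = repair_rx.recv() => {
            println!("control connection broken: refresh metadata now");
        }
    }
}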
wprzytula committed Aug 28, 2023
1 parent c721f5a commit eceafc7
Showing 4 changed files with 60 additions and 4 deletions.
27 changes: 27 additions & 0 deletions scylla/src/transport/cluster.rs
@@ -111,6 +111,9 @@ struct ClusterWorker {
// Channel used to receive server events
server_events_channel: tokio::sync::mpsc::Receiver<Event>,

// Channel used to receive signals that control connection is broken
control_connection_repair_channel: tokio::sync::broadcast::Receiver<()>,

// Keyspace sent in "USE <keyspace name>" when opening each connection
used_keyspace: Option<VerifiedKeyspaceName>,

@@ -147,9 +150,12 @@ impl Cluster {
let (refresh_sender, refresh_receiver) = tokio::sync::mpsc::channel(32);
let (use_keyspace_sender, use_keyspace_receiver) = tokio::sync::mpsc::channel(32);
let (server_events_sender, server_events_receiver) = tokio::sync::mpsc::channel(32);
let (control_connection_repair_sender, control_connection_repair_receiver) =
tokio::sync::broadcast::channel(32);

let mut metadata_reader = MetadataReader::new(
known_nodes,
control_connection_repair_sender,
initial_peers,
pool_config.connection_config.clone(),
pool_config.keepalive_interval,
@@ -180,6 +186,7 @@ impl Cluster {

refresh_channel: refresh_receiver,
server_events_channel: server_events_receiver,
control_connection_repair_channel: control_connection_repair_receiver,

use_keyspace_channel: use_keyspace_receiver,
used_keyspace: None,
@@ -538,6 +545,26 @@ impl ClusterWorker {

continue; // Don't go to refreshing, wait for the next event
}
recv_res = self.control_connection_repair_channel.recv() => {
match recv_res {
Ok(()) => {
// The control connection was broken. Acknowledge that and start attempting to reconnect.
// The first reconnect attempt will be immediate (by attempting metadata refresh below),
// and if it does not succeed, then `control_connection_works` will be set to `false`,
// so subsequent attempts will be issued every second.
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
// This is very unlikely; we would have to have a lot of concurrent
// control connections opened and broken at the same time.
// The best we can do is ignoring this.
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
// If control_connection_repair_channel was closed then MetadataReader was dropped,
// we can stop working.
return;
}
}
}
}

// Perform the refresh
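The Lagged arm above can safely be a no-op because the signals carry no payload:
even if some are dropped, at least one buffered wake-up remains, which is all the
worker needs to start reconnecting. A small standalone demonstration of that
broadcast behaviour, with the capacity shrunk to 2 for the example (the driver
uses 32):

use tokio::sync::broadcast::{self, error::RecvError};

#[tokio::main]
async fn main() {
    let (tx, mut rx) = broadcast::channel::<()>(2);

    // Overflow the channel: the oldest notifications are discarded.
    for _ in 0..5 {
        let _ = tx.send(());
    }

    // The receiver first learns that it lagged...
    assert!(matches!(rx.recv().await, Err(RecvError::Lagged(_))));
    // ...and the next recv still delivers a signal, which is all that matters here.
    assert!(matches!(rx.recv().await, Ok(())));
}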
13 changes: 12 additions & 1 deletion scylla/src/transport/connection_pool.rs
@@ -25,7 +25,8 @@ use std::num::NonZeroUsize;
use std::pin::Pin;
use std::sync::{Arc, RwLock, Weak};
use std::time::Duration;
use tokio::sync::{mpsc, Notify};

use tokio::sync::{broadcast, mpsc, Notify};
use tracing::instrument::WithSubscriber;
use tracing::{debug, trace, warn};

@@ -169,6 +170,7 @@ impl NodeConnectionPool {
endpoint: UntranslatedEndpoint,
#[allow(unused_mut)] mut pool_config: PoolConfig, // `mut` needed only with "cloud" feature
current_keyspace: Option<VerifiedKeyspaceName>,
pool_empty_notifier: broadcast::Sender<()>,
) -> Self {
let (use_keyspace_request_sender, use_keyspace_request_receiver) = mpsc::channel(1);
let pool_updated_notify = Arc::new(Notify::new());
@@ -205,6 +207,7 @@
pool_config,
current_keyspace,
pool_updated_notify.clone(),
pool_empty_notifier,
);

let conns = refiller.get_shared_connections();
@@ -472,6 +475,9 @@ struct PoolRefiller {

// Signaled when the connection pool is updated
pool_updated_notify: Arc<Notify>,

// Signaled when the connection pool becomes empty
pool_empty_notifier: broadcast::Sender<()>,
}

#[derive(Debug)]
@@ -486,6 +492,7 @@ impl PoolRefiller {
pool_config: PoolConfig,
current_keyspace: Option<VerifiedKeyspaceName>,
pool_updated_notify: Arc<Notify>,
pool_empty_notifier: broadcast::Sender<()>,
) -> Self {
// At the beginning, we assume the node does not have any shards
// and assume that the node is a Cassandra node
@@ -513,6 +520,7 @@
current_keyspace,

pool_updated_notify,
pool_empty_notifier,
}
}

@@ -1037,6 +1045,9 @@ impl PoolRefiller {
self.conns[shard_id].len(),
self.active_connection_count(),
);
if !self.has_connections() {
let _ = self.pool_empty_notifier.send(());
}
self.update_shared_conns(Some(last_error));
return;
}
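The intent of the new lines is to fire the notifier exactly when the pool
transitions to empty, and to ignore the send result, because a missing receiver
only means nobody cares (as in Node::new below). A hypothetical stand-in for that
bookkeeping; on_connection_removed and its active count are illustrative, not the
actual PoolRefiller API:

use tokio::sync::broadcast;

// `active` is the number of connections still alive after one has just been removed.
fn on_connection_removed(active: usize, pool_empty_notifier: &broadcast::Sender<()>) {
    if active == 0 {
        // The pool just became empty; wake up whoever is interested
        // (for the control connection, that is the ClusterWorker via MetadataReader).
        let _ = pool_empty_notifier.send(());
    }
}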
10 changes: 9 additions & 1 deletion scylla/src/transport/node.rs
@@ -103,8 +103,16 @@ impl Node {
let address = peer.address;
let datacenter = peer.datacenter.clone();
let rack = peer.rack.clone();

// We aren't interested in the fact that the pool becomes empty, so we immediately drop the receiving part.
let (pool_empty_notifier, _) = tokio::sync::broadcast::channel(1);
let pool = enabled.then(|| {
NodeConnectionPool::new(UntranslatedEndpoint::Peer(peer), pool_config, keyspace_name)
NodeConnectionPool::new(
UntranslatedEndpoint::Peer(peer),
pool_config,
keyspace_name,
pool_empty_notifier,
)
});

Node {
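Binding the receiving half to `_` drops it immediately, so for regular nodes the
notifier is effectively free: with no receivers, tokio's broadcast send simply
returns an error, which the sending side already ignores. A tiny sketch of that
behaviour:

use tokio::sync::broadcast;

fn main() {
    // The receiver is dropped on the spot, as in Node::new above.
    let (pool_empty_notifier, _) = broadcast::channel::<()>(1);
    assert_eq!(pool_empty_notifier.receiver_count(), 0);

    // With no receivers, send returns Err and the value is discarded;
    // the refiller ignores it, so nothing extra happens for such nodes.
    assert!(pool_empty_notifier.send(()).is_err());
}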
14 changes: 12 additions & 2 deletions scylla/src/transport/topology.rs
@@ -27,7 +27,7 @@ use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use strum_macros::EnumString;
use tokio::sync::mpsc;
use tokio::sync::{broadcast, mpsc};
use tracing::{debug, error, trace, warn};
use uuid::Uuid;

@@ -50,6 +50,10 @@ pub(crate) struct MetadataReader {
// When no known peer is reachable, initial known nodes are resolved once again as a fallback
// and establishing control connection to them is attempted.
initial_known_nodes: Vec<KnownNode>,

// When a control connection breaks, the PoolRefiller of its pool uses the requester
// to signal ClusterWorker that an immediate metadata refresh is advisable.
control_connection_repair_requester: broadcast::Sender<()>,
}

/// Describes all metadata retrieved from the cluster
@@ -398,6 +402,7 @@ impl MetadataReader {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
initial_known_nodes: Vec<KnownNode>,
control_connection_repair_requester: broadcast::Sender<()>,
initially_known_peers: Vec<ResolvedContactPoint>,
mut connection_config: ConnectionConfig,
keepalive_interval: Option<Duration>,
@@ -422,6 +427,7 @@
control_connection_endpoint.clone(),
connection_config.clone(),
keepalive_interval,
control_connection_repair_requester.clone(),
);

MetadataReader {
@@ -437,6 +443,7 @@
fetch_schema,
host_filter: host_filter.clone(),
initial_known_nodes,
control_connection_repair_requester,
}
}

@@ -547,6 +554,7 @@
self.control_connection_endpoint.clone(),
self.connection_config.clone(),
self.keepalive_interval,
self.control_connection_repair_requester.clone(),
);

debug!(
@@ -646,6 +654,7 @@ impl MetadataReader {
self.control_connection_endpoint.clone(),
self.connection_config.clone(),
self.keepalive_interval,
self.control_connection_repair_requester.clone(),
);
}
}
@@ -656,6 +665,7 @@
endpoint: UntranslatedEndpoint,
connection_config: ConnectionConfig,
keepalive_interval: Option<Duration>,
refresh_requester: broadcast::Sender<()>,
) -> NodeConnectionPool {
let pool_config = PoolConfig {
connection_config,
@@ -669,7 +679,7 @@
can_use_shard_aware_port: false,
};

NodeConnectionPool::new(endpoint, pool_config, None)
NodeConnectionPool::new(endpoint, pool_config, None, refresh_requester)
}
}

