cluster: attempt recreate control conn immediately
Until now, if all nodes changed their IPs at once, we would discover
them only when the next metadata fetch was issued, which could happen
as late as 60 seconds later (if the previous fetch had succeeded).
This commit introduces immediate signalling that the control connection
got broken, so that the ClusterWorker instantly begins its phase of
reconnection attempts issued every second.

In manual tests, this proved to be very robust: immediately after the
control connection is lost, the reconnection-attempt phase begins. As
soon as any node (either known initially or learned from recently
fetched metadata) becomes reachable, a control connection is opened and
metadata is fetched successfully, so the whole cluster becomes
discoverable again.
wprzytula committed Jul 31, 2023
1 parent f0debd7 commit 914dbd9
Showing 4 changed files with 63 additions and 4 deletions.
26 changes: 26 additions & 0 deletions scylla/src/transport/cluster.rs
@@ -110,6 +110,9 @@ struct ClusterWorker
// Channel used to receive server events
server_events_channel: tokio::sync::mpsc::Receiver<Event>,

// Channel used to receive signals that the control connection is broken
control_connection_repair_channel: tokio::sync::broadcast::Receiver<()>,

// Keyspace sent in "USE <keyspace name>" when opening each connection
used_keyspace: Option<VerifiedKeyspaceName>,

@@ -141,9 +144,12 @@ impl Cluster {
let (refresh_sender, refresh_receiver) = tokio::sync::mpsc::channel(32);
let (use_keyspace_sender, use_keyspace_receiver) = tokio::sync::mpsc::channel(32);
let (server_events_sender, server_events_receiver) = tokio::sync::mpsc::channel(32);
let (control_connection_repair_sender, control_connection_repair_receiver) =
tokio::sync::broadcast::channel(32);

let mut metadata_reader = MetadataReader::new(
known_nodes,
control_connection_repair_sender,
initial_peers,
pool_config.connection_config.clone(),
pool_config.keepalive_interval,
@@ -174,6 +180,7 @@ impl Cluster {

refresh_channel: refresh_receiver,
server_events_channel: server_events_receiver,
control_connection_repair_channel: control_connection_repair_receiver,

use_keyspace_channel: use_keyspace_receiver,
used_keyspace: None,
@@ -542,6 +549,25 @@ impl ClusterWorker {

continue; // Don't go to refreshing, wait for the next event
}
recv_res = self.control_connection_repair_channel.recv() => {
match recv_res {
Ok(()) => {
// The control connection was broken. Acknowledge that and start attempting to reconnect.
// The first reconnect attempt will be immediate (by attempting metadata refresh below),
// and if it does not succeed, then `control_connection_works` will be set to `false`,
// so subsequent attempts will be issued every second.
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
// This is very unlikely; we would have to have a lot of concurrent
// control connections opened and broken at the same time.
// The best we can do is to ignore this.
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
// If control_connection_repair_channel was closed, then MetadataReader was dropped
// and we can stop working.
}
}
}
}

// Perform the refresh
20 changes: 19 additions & 1 deletion scylla/src/transport/connection_pool.rs
@@ -25,7 +25,8 @@ use std::num::NonZeroUsize;
use std::pin::Pin;
use std::sync::{Arc, RwLock, Weak};
use std::time::Duration;
use tokio::sync::{mpsc, Notify};

use tokio::sync::{broadcast, mpsc, Notify};
use tracing::instrument::WithSubscriber;
use tracing::{debug, trace, warn};

@@ -169,6 +170,7 @@ impl NodeConnectionPool {
endpoint: UntranslatedEndpoint,
#[allow(unused_mut)] mut pool_config: PoolConfig, // `mut` needed only with "cloud" feature
current_keyspace: Option<VerifiedKeyspaceName>,
refresh_requester: Option<broadcast::Sender<()>>,
) -> Self {
let (use_keyspace_request_sender, use_keyspace_request_receiver) = mpsc::channel(1);
let pool_updated_notify = Arc::new(Notify::new());
@@ -205,6 +207,7 @@
pool_config,
current_keyspace,
pool_updated_notify.clone(),
refresh_requester,
);

let conns = refiller.get_shared_connections();
@@ -472,6 +475,12 @@ struct PoolRefiller {

// Signaled when the connection pool is updated
pool_updated_notify: Arc<Notify>,

// Enables requesting metadata refresh from the ClusterWorker.
// This is useful when the control connection breaks: an empty control connection pool
// immediately triggers reestablishment of the control connection. Without this,
// reestablishment would occur only after the metadata refresh timeout (60 s by default) elapses.
control_connection_repair_requester: Option<broadcast::Sender<()>>,
}

#[derive(Debug)]
@@ -486,6 +495,7 @@ impl PoolRefiller {
pool_config: PoolConfig,
current_keyspace: Option<VerifiedKeyspaceName>,
pool_updated_notify: Arc<Notify>,
control_connection_repair_requester: Option<broadcast::Sender<()>>,
) -> Self {
// At the beginning, we assume the node does not have any shards
// and assume that the node is a Cassandra node
@@ -513,6 +523,7 @@
current_keyspace,

pool_updated_notify,
control_connection_repair_requester,
}
}

@@ -1037,6 +1048,13 @@ impl PoolRefiller {
self.conns[shard_id].len(),
self.active_connection_count(),
);
if !self.has_connections() {
// The last connection got closed, so if this is a control connection,
// then request a metadata refresh to trigger the control connection reestablishment.
if let Some(requester) = self.control_connection_repair_requester.as_ref() {
let _ = requester.send(());
}
}
self.update_shared_conns(Some(last_error));
return;
}
7 changes: 6 additions & 1 deletion scylla/src/transport/node.rs
@@ -104,7 +104,12 @@ impl Node {
let datacenter = peer.datacenter.clone();
let rack = peer.rack.clone();
let pool = enabled.then(|| {
NodeConnectionPool::new(UntranslatedEndpoint::Peer(peer), pool_config, keyspace_name)
NodeConnectionPool::new(
UntranslatedEndpoint::Peer(peer),
pool_config,
keyspace_name,
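// No repair requester here: only the control connection's pool (see topology.rs) signals breakage.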
None,
)
});

Node {
14 changes: 12 additions & 2 deletions scylla/src/transport/topology.rs
@@ -27,7 +27,7 @@ use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use strum_macros::EnumString;
use tokio::sync::mpsc;
use tokio::sync::{broadcast, mpsc};
use tracing::{debug, error, trace, warn};
use uuid::Uuid;

@@ -50,6 +50,10 @@ pub(crate) struct MetadataReader {
// When no known peer is reachable, initial known nodes are resolved once again as a fallback
// and establishing control connection to them is attempted.
initial_known_nodes: Vec<KnownNode>,

// When a control connection breaks, the PoolRefiller of its pool uses the requester
// to signal ClusterWorker that an immediate metadata refresh is advisable.
control_connection_repair_requester: broadcast::Sender<()>,
}

/// Describes all metadata retrieved from the cluster
@@ -386,6 +390,7 @@ impl MetadataReader {
#[allow(clippy::too_many_arguments)]
pub fn new(
initial_known_nodes: Vec<KnownNode>,
control_connection_repair_requester: broadcast::Sender<()>,
initially_known_peers: Vec<ResolvedContactPoint>,
mut connection_config: ConnectionConfig,
keepalive_interval: Option<Duration>,
@@ -410,6 +415,7 @@
control_connection_endpoint.clone(),
connection_config.clone(),
keepalive_interval,
control_connection_repair_requester.clone(),
);

MetadataReader {
@@ -425,6 +431,7 @@
fetch_schema,
host_filter: host_filter.clone(),
initial_known_nodes,
control_connection_repair_requester,
}
}

@@ -530,6 +537,7 @@ impl MetadataReader {
self.control_connection_endpoint.clone(),
self.connection_config.clone(),
self.keepalive_interval,
self.control_connection_repair_requester.clone(),
);

debug!(
@@ -629,6 +637,7 @@ impl MetadataReader {
self.control_connection_endpoint.clone(),
self.connection_config.clone(),
self.keepalive_interval,
self.control_connection_repair_requester.clone(),
);
}
}
@@ -639,6 +648,7 @@ impl MetadataReader {
endpoint: UntranslatedEndpoint,
connection_config: ConnectionConfig,
keepalive_interval: Option<Duration>,
refresh_requester: broadcast::Sender<()>,
) -> NodeConnectionPool {
let pool_config = PoolConfig {
connection_config,
@@ -652,7 +662,7 @@
can_use_shard_aware_port: false,
};

NodeConnectionPool::new(endpoint, pool_config, None)
NodeConnectionPool::new(endpoint, pool_config, None, Some(refresh_requester))
}
}

