Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re-resolve hostnames as fallback when control connection is broken #770

Merged
merged 10 commits into from Aug 28, 2023
57 changes: 43 additions & 14 deletions scylla/src/transport/cluster.rs
Expand Up @@ -17,7 +17,7 @@ use arc_swap::ArcSwap;
use futures::future::join_all;
use futures::{future::RemoteHandle, FutureExt};
use itertools::Itertools;
use scylla_cql::errors::BadQuery;
use scylla_cql::errors::{BadQuery, NewSessionError};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
Expand All @@ -26,14 +26,7 @@ use tracing::instrument::WithSubscriber;
use tracing::{debug, warn};
use uuid::Uuid;

use super::node::NodeAddr;

#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ContactPoint {
pub address: SocketAddr,
pub datacenter: Option<String>,
}
use super::node::{KnownNode, NodeAddr};

use super::locator::ReplicaLocator;
use super::partitioner::calculate_token_for_partition_key;
Expand Down Expand Up @@ -118,6 +111,9 @@ struct ClusterWorker {
// Channel used to receive server events
server_events_channel: tokio::sync::mpsc::Receiver<Event>,

// Channel used to receive signals that control connection is broken
control_connection_repair_channel: tokio::sync::broadcast::Receiver<()>,

// Keyspace sent in "USE <keyspace name>" when opening each connection
used_keyspace: Option<VerifiedKeyspaceName>,

Expand All @@ -143,26 +139,30 @@ struct UseKeyspaceRequest {

impl Cluster {
pub(crate) async fn new(
initial_peers: Vec<ContactPoint>,
known_nodes: Vec<KnownNode>,
pool_config: PoolConfig,
keyspaces_to_fetch: Vec<String>,
fetch_schema_metadata: bool,
host_filter: Option<Arc<dyn HostFilter>>,
cluster_metadata_refresh_interval: Duration,
) -> Result<Cluster, QueryError> {
) -> Result<Cluster, NewSessionError> {
let (refresh_sender, refresh_receiver) = tokio::sync::mpsc::channel(32);
let (use_keyspace_sender, use_keyspace_receiver) = tokio::sync::mpsc::channel(32);
let (server_events_sender, server_events_receiver) = tokio::sync::mpsc::channel(32);
let (control_connection_repair_sender, control_connection_repair_receiver) =
tokio::sync::broadcast::channel(32);

let mut metadata_reader = MetadataReader::new(
initial_peers,
known_nodes,
control_connection_repair_sender,
pool_config.connection_config.clone(),
pool_config.keepalive_interval,
server_events_sender,
keyspaces_to_fetch,
fetch_schema_metadata,
&host_filter,
);
)
.await?;

let metadata = metadata_reader.read_metadata(true).await?;
let cluster_data = ClusterData::new(
Expand All @@ -185,6 +185,7 @@ impl Cluster {

refresh_channel: refresh_receiver,
server_events_channel: server_events_receiver,
control_connection_repair_channel: control_connection_repair_receiver,

use_keyspace_channel: use_keyspace_receiver,
used_keyspace: None,
Expand Down Expand Up @@ -479,14 +480,20 @@ impl ClusterWorker {
pub(crate) async fn work(mut self) {
use tokio::time::Instant;

let control_connection_repair_duration = Duration::from_secs(1); // Attempt control connection repair every second
let mut last_refresh_time = Instant::now();
let mut control_connection_works = true;

loop {
let mut cur_request: Option<RefreshRequest> = None;

// Wait until it's time for the next refresh
let sleep_until: Instant = last_refresh_time
.checked_add(self.cluster_metadata_refresh_interval)
.checked_add(if control_connection_works {
self.cluster_metadata_refresh_interval
} else {
control_connection_repair_duration
})
.unwrap_or_else(Instant::now);

let sleep_future = tokio::time::sleep_until(sleep_until);
Expand Down Expand Up @@ -537,13 +544,35 @@ impl ClusterWorker {

continue; // Don't go to refreshing, wait for the next event
}
recv_res = self.control_connection_repair_channel.recv() => {
match recv_res {
Ok(()) => {
// The control connection was broken. Acknowledge that and start attempting to reconnect.
// The first reconnect attempt will be immediate (by attempting metadata refresh below),
// and if it does not succeed, then `control_connection_works` will be set to `false`,
// so subsequent attempts will be issued every second.
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
// This is very unlikely; we would have to have a lot of concurrent
// control connections opened and broken at the same time.
// The best we can do is ignoring this.
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
// If control_connection_repair_channel was closed then MetadataReader was dropped,
// we can stop working.
return;
}
}
}
}

// Perform the refresh
debug!("Requesting topology refresh");
last_refresh_time = Instant::now();
let refresh_res = self.perform_refresh().await;

control_connection_works = refresh_res.is_ok();

// Send refresh result if there was a request
if let Some(request) = cur_request {
// We can ignore sending error - if no one waits for the response we can drop it
Expand Down
12 changes: 6 additions & 6 deletions scylla/src/transport/connection.rs
Expand Up @@ -1764,8 +1764,8 @@ mod tests {

use super::ConnectionConfig;
use crate::query::Query;
use crate::transport::cluster::ContactPoint;
use crate::transport::connection::open_connection;
use crate::transport::node::ResolvedContactPoint;
use crate::transport::topology::UntranslatedEndpoint;
use crate::utils::test_utils::unique_keyspace_name;
use crate::{IntoTypedRows, SessionBuilder};
Expand Down Expand Up @@ -1801,7 +1801,7 @@ mod tests {
let addr: SocketAddr = resolve_hostname(&uri).await;

let (connection, _) = super::open_connection(
UntranslatedEndpoint::ContactPoint(ContactPoint {
UntranslatedEndpoint::ContactPoint(ResolvedContactPoint {
address: addr,
datacenter: None,
}),
Expand Down Expand Up @@ -1919,7 +1919,7 @@ mod tests {

let subtest = |enable_coalescing: bool, ks: String| async move {
let (connection, _) = super::open_connection(
UntranslatedEndpoint::ContactPoint(ContactPoint {
UntranslatedEndpoint::ContactPoint(ResolvedContactPoint {
address: addr,
datacenter: None,
}),
Expand Down Expand Up @@ -2049,15 +2049,15 @@ mod tests {

// We must interrupt the driver's full connection opening, because our proxy does not interact further after Startup.
let (startup_without_lwt_optimisation, _shard) = select! {
_ = open_connection(UntranslatedEndpoint::ContactPoint(ContactPoint{address: proxy_addr, datacenter: None}), None, config.clone()) => unreachable!(),
_ = open_connection(UntranslatedEndpoint::ContactPoint(ResolvedContactPoint{address: proxy_addr, datacenter: None}), None, config.clone()) => unreachable!(),
startup = startup_rx.recv() => startup.unwrap(),
};

proxy.running_nodes[0]
.change_request_rules(Some(make_rules(options_with_lwt_optimisation_support)));

let (startup_with_lwt_optimisation, _shard) = select! {
_ = open_connection(UntranslatedEndpoint::ContactPoint(ContactPoint{address: proxy_addr, datacenter: None}), None, config.clone()) => unreachable!(),
_ = open_connection(UntranslatedEndpoint::ContactPoint(ResolvedContactPoint{address: proxy_addr, datacenter: None}), None, config.clone()) => unreachable!(),
startup = startup_rx.recv() => startup.unwrap(),
};

Expand Down Expand Up @@ -2112,7 +2112,7 @@ mod tests {

// Setup connection normally, without obstruction
let (conn, mut error_receiver) = open_connection(
UntranslatedEndpoint::ContactPoint(ContactPoint {
UntranslatedEndpoint::ContactPoint(ResolvedContactPoint {
address: proxy_addr,
datacenter: None,
}),
Expand Down
23 changes: 17 additions & 6 deletions scylla/src/transport/connection_pool.rs
Expand Up @@ -9,10 +9,10 @@ use crate::transport::{
};

#[cfg(feature = "cloud")]
use super::session::resolve_hostname;
use super::node::resolve_hostname;

#[cfg(feature = "cloud")]
use super::cluster::ContactPoint;
use super::node::ResolvedContactPoint;
use super::topology::{PeerEndpoint, UntranslatedEndpoint};
use super::NodeAddr;

Expand All @@ -25,7 +25,8 @@ use std::num::NonZeroUsize;
use std::pin::Pin;
use std::sync::{Arc, RwLock, Weak};
use std::time::Duration;
use tokio::sync::{mpsc, Notify};

use tokio::sync::{broadcast, mpsc, Notify};
use tracing::instrument::WithSubscriber;
use tracing::{debug, trace, warn};

Expand Down Expand Up @@ -169,14 +170,15 @@ impl NodeConnectionPool {
endpoint: UntranslatedEndpoint,
#[allow(unused_mut)] mut pool_config: PoolConfig, // `mut` needed only with "cloud" feature
current_keyspace: Option<VerifiedKeyspaceName>,
pool_empty_notifier: broadcast::Sender<()>,
) -> Self {
let (use_keyspace_request_sender, use_keyspace_request_receiver) = mpsc::channel(1);
let pool_updated_notify = Arc::new(Notify::new());

#[cfg(feature = "cloud")]
if pool_config.connection_config.cloud_config.is_some() {
let (host_id, address, dc) = match endpoint {
UntranslatedEndpoint::ContactPoint(ContactPoint {
UntranslatedEndpoint::ContactPoint(ResolvedContactPoint {
address,
ref datacenter,
}) => (None, address, datacenter.as_deref()), // FIXME: Pass DC in ContactPoint
Expand Down Expand Up @@ -205,6 +207,7 @@ impl NodeConnectionPool {
pool_config,
current_keyspace,
pool_updated_notify.clone(),
pool_empty_notifier,
);

let conns = refiller.get_shared_connections();
Expand Down Expand Up @@ -472,6 +475,9 @@ struct PoolRefiller {

// Signaled when the connection pool is updated
pool_updated_notify: Arc<Notify>,

// Signaled when the connection pool becomes empty
pool_empty_notifier: broadcast::Sender<()>,
}

#[derive(Debug)]
Expand All @@ -486,6 +492,7 @@ impl PoolRefiller {
pool_config: PoolConfig,
current_keyspace: Option<VerifiedKeyspaceName>,
pool_updated_notify: Arc<Notify>,
pool_empty_notifier: broadcast::Sender<()>,
) -> Self {
// At the beginning, we assume the node does not have any shards
// and assume that the node is a Cassandra node
Expand Down Expand Up @@ -513,6 +520,7 @@ impl PoolRefiller {
current_keyspace,

pool_updated_notify,
pool_empty_notifier,
}
}

Expand Down Expand Up @@ -1037,6 +1045,9 @@ impl PoolRefiller {
self.conns[shard_id].len(),
self.active_connection_count(),
);
if !self.has_connections() {
let _ = self.pool_empty_notifier.send(());
}
self.update_shared_conns(Some(last_error));
return;
}
Expand Down Expand Up @@ -1248,8 +1259,8 @@ async fn open_connection_to_shard_aware_port(
mod tests {
use super::open_connection_to_shard_aware_port;
use crate::routing::{ShardCount, Sharder};
use crate::transport::cluster::ContactPoint;
use crate::transport::connection::ConnectionConfig;
use crate::transport::node::ResolvedContactPoint;
use crate::transport::topology::UntranslatedEndpoint;
use std::net::{SocketAddr, ToSocketAddrs};

Expand Down Expand Up @@ -1286,7 +1297,7 @@ mod tests {

for _ in 0..connections_number {
conns.push(open_connection_to_shard_aware_port(
UntranslatedEndpoint::ContactPoint(ContactPoint {
UntranslatedEndpoint::ContactPoint(ResolvedContactPoint {
address: connect_address,
datacenter: None,
}),
Expand Down
2 changes: 1 addition & 1 deletion scylla/src/transport/mod.rs
Expand Up @@ -35,4 +35,4 @@ mod cql_types_test;
mod cql_value_test;

pub use cluster::ClusterData;
pub use node::{Node, NodeAddr, NodeRef};
pub use node::{KnownNode, Node, NodeAddr, NodeRef};