Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

control connection creation error handling #887

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
87 changes: 65 additions & 22 deletions shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs
Expand Up @@ -223,11 +223,13 @@ impl CassandraSinkCluster {
.iter()
.any(|x| x.address == address && x.is_up)
{
let address = self
.pool
.get_round_robin_node_in_dc_rack(&self.local_shotover_node.rack)
.address;
self.create_control_connection(address).await?;
let addresses = self.pool.get_shuffled_addresses_in_dc_rack(
&self.local_shotover_node.rack,
&mut self.rng,
);
self.create_control_connection(&addresses).await.map_err(|e|
e.context("Failed to recreate control connection after control connection node went down")
)?;
}
}
}
Expand Down Expand Up @@ -256,18 +258,22 @@ impl CassandraSinkCluster {
// Create the initial connection.
// Messages will be sent through this connection until we have extracted the handshake.
if self.control_connection.is_none() {
let random_point = if self.pool.nodes().iter().all(|x| !x.is_up) {
tokio::net::lookup_host(self.contact_points.choose(&mut self.rng).unwrap())
.await?
.next()
.unwrap()
let points = if self.pool.nodes().iter().all(|x| !x.is_up) {
let mut points = Vec::with_capacity(self.contact_points.len());
for point in &self.contact_points {
points.push(tokio::net::lookup_host(point).await?.next().unwrap());
}
points
} else {
self.pool
.get_round_robin_node_in_dc_rack(&self.local_shotover_node.rack)
.address
self.pool.get_shuffled_addresses_in_dc_rack(
&self.local_shotover_node.rack,
&mut self.rng,
)
};

self.create_control_connection(random_point).await?;
self.create_control_connection(&points)
.await
.map_err(|e| e.context("Failed to create initial control connection"))?;
}

if !self.init_handshake_complete {
Expand Down Expand Up @@ -514,11 +520,12 @@ impl CassandraSinkCluster {
// If we have to populate the local_nodes at this point then that means the control connection
// may not have been made against a node in the configured data_center/rack.
// Therefore we need to recreate the control connection to ensure that it is in the configured data_center/rack.
let address = self
let addresses = self
.pool
.get_round_robin_node_in_dc_rack(&self.local_shotover_node.rack)
.address;
self.create_control_connection(address).await?;
.get_shuffled_addresses_in_dc_rack(&self.local_shotover_node.rack, &mut self.rng);
self.create_control_connection(&addresses)
.await
.map_err(|e| e.context("Failed to recreate control connection when initial connection was possibly against the wrong node"))?;
}
tracing::info!(
"Control connection finalized against node at: {:?}",
Expand All @@ -528,11 +535,47 @@ impl CassandraSinkCluster {
Ok(())
}

async fn create_control_connection(&mut self, address: SocketAddr) -> Result<()> {
self.control_connection = Some(self.connection_factory.new_connection(address).await?);
self.control_connection_address = Some(address);
async fn create_control_connection(&mut self, addresses: &[SocketAddr]) -> Result<()> {
struct AddressError {
address: SocketAddr,
error: anyhow::Error,
}
fn bullet_list_of_node_failures(errors: &[AddressError]) -> String {
let mut node_errors = String::new();
for AddressError { error, address } in errors {
node_errors.push_str(&format!("\n* {address:?}:"));
for sub_error in error.chain() {
node_errors.push_str(&format!("\n - {sub_error}"));
}
}
node_errors
}

Ok(())
let mut errors = vec![];
for address in addresses {
match self.connection_factory.new_connection(address).await {
Ok(connection) => {
self.control_connection = Some(connection);
self.control_connection_address = Some(*address);
if !errors.is_empty() {
let node_errors = bullet_list_of_node_failures(&errors);
tracing::warn!("A successful connection to a control node was made but attempts to connect to these nodes failed first:{node_errors}");
}
return Ok(());
}
Err(error) => {
errors.push(AddressError {
error,
address: *address,
});
}
}
}

let node_errors = bullet_list_of_node_failures(&errors);
Err(anyhow!(
"Attempted to create a control connection against every node in the rack and all attempts failed:{node_errors}"
))
}

fn get_rewrite_table(&self, request: &mut Message, index: usize) -> Option<TableToRewrite> {
Expand Down
Expand Up @@ -8,8 +8,8 @@ use cassandra_protocol::frame::Version;
use cassandra_protocol::token::Murmur3Token;
use cassandra_protocol::types::CBytesShort;
use rand::prelude::*;
use std::collections::HashMap;
use std::sync::Arc;
use std::{collections::HashMap, net::SocketAddr};
use tokio::sync::{watch, RwLock};

pub enum GetReplicaErr {
Expand Down Expand Up @@ -73,6 +73,22 @@ impl NodePool {
write_lock.insert(id, metadata);
}

pub fn get_shuffled_addresses_in_dc_rack(
&mut self,
conorbros marked this conversation as resolved.
Show resolved Hide resolved
rack: &str,
rng: &mut SmallRng,
) -> Vec<SocketAddr> {
let mut nodes: Vec<_> = self
.nodes
.iter_mut()
.filter(|node| node.is_up && node.rack == *rack)
.map(|node| node.address)
.collect();

nodes.shuffle(rng);
nodes
}

pub fn get_round_robin_node_in_dc_rack(&mut self, rack: &str) -> &mut CassandraNode {
let up_indexes: Vec<usize> = self
.nodes
Expand Down