Skip to content

Commit

Permalink
review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Oct 31, 2022
1 parent ce6ddb0 commit 158e859
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 22 deletions.
29 changes: 10 additions & 19 deletions shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs
Expand Up @@ -223,15 +223,10 @@ impl CassandraSinkCluster {
.iter()
.any(|x| x.address == address && x.is_up)
{
let addresses: Vec<_> = self
.pool
.get_shuffled_nodes_in_dc_rack(
&self.local_shotover_node.rack,
&mut self.rng,
)
.iter()
.map(|node| node.address)
.collect();
let addresses = self.pool.get_shuffled_addresses_in_dc_rack(
&self.local_shotover_node.rack,
&mut self.rng,
);
self.create_control_connection(&addresses).await.map_err(|e|
e.context("Failed to recreate control connection after control connection node went down")
)?;
Expand Down Expand Up @@ -270,11 +265,10 @@ impl CassandraSinkCluster {
}
points
} else {
self.pool
.get_shuffled_nodes_in_dc_rack(&self.local_shotover_node.rack, &mut self.rng)
.iter()
.map(|node| node.address)
.collect()
self.pool.get_shuffled_addresses_in_dc_rack(
&self.local_shotover_node.rack,
&mut self.rng,
)
};

self.create_control_connection(&points)
Expand Down Expand Up @@ -526,12 +520,9 @@ impl CassandraSinkCluster {
// If we have to populate the local_nodes at this point then that means the control connection
// may not have been made against a node in the configured data_center/rack.
// Therefore we need to recreate the control connection to ensure that it is in the configured data_center/rack.
let addresses: Vec<_> = self
let addresses = self
.pool
.get_shuffled_nodes_in_dc_rack(&self.local_shotover_node.rack, &mut self.rng)
.iter()
.map(|node| node.address)
.collect();
.get_shuffled_addresses_in_dc_rack(&self.local_shotover_node.rack, &mut self.rng);
self.create_control_connection(&addresses)
.await
.map_err(|e| e.context("Failed to recreate control connection when initial connection was possibly against the wrong node"))?;
Expand Down
Expand Up @@ -8,8 +8,8 @@ use cassandra_protocol::frame::Version;
use cassandra_protocol::token::Murmur3Token;
use cassandra_protocol::types::CBytesShort;
use rand::prelude::*;
use std::collections::HashMap;
use std::sync::Arc;
use std::{collections::HashMap, net::SocketAddr};
use tokio::sync::{watch, RwLock};

pub enum GetReplicaErr {
Expand Down Expand Up @@ -73,15 +73,16 @@ impl NodePool {
write_lock.insert(id, metadata);
}

pub fn get_shuffled_nodes_in_dc_rack(
pub fn get_shuffled_addresses_in_dc_rack(
&mut self,
rack: &str,
rng: &mut SmallRng,
) -> Vec<&mut CassandraNode> {
) -> Vec<SocketAddr> {
let mut nodes: Vec<_> = self
.nodes
.iter_mut()
.filter(|node| node.is_up && node.rack == *rack)
.map(|node| node.address)
.collect();

nodes.shuffle(rng);
Expand Down

0 comments on commit 158e859

Please sign in to comment.