Skip to content

Commit

Permalink
share prepared metadata between connections (#841)
Browse files Browse the repository at this point in the history
  • Loading branch information
conorbros committed Oct 13, 2022
1 parent b8feea7 commit 17ab717
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 11 deletions.
3 changes: 2 additions & 1 deletion shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ impl CassandraSinkCluster {
match self
.pool
.replica_node(execute, &metadata.version, &mut self.rng)
.await
{
Ok(Some(replica_node)) => {
replica_node
Expand Down Expand Up @@ -464,7 +465,7 @@ impl CassandraSinkCluster {

for response in responses.iter_mut() {
if let Some((id, metadata)) = get_prepared_result_message(response) {
self.pool.add_prepared_result(id, metadata);
self.pool.add_prepared_result(id, metadata).await;
}
}

Expand Down
35 changes: 25 additions & 10 deletions shotover-proxy/src/transforms/cassandra/sink_cluster/node_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use cassandra_protocol::token::Murmur3Token;
use cassandra_protocol::types::CBytesShort;
use rand::prelude::*;
use std::collections::HashMap;
use tokio::sync::watch;
use std::sync::Arc;
use tokio::sync::{watch, RwLock};

pub enum GetReplicaErr {
NoMetadata,
Expand All @@ -18,17 +19,27 @@ pub enum GetReplicaErr {

#[derive(Debug)]
pub struct NodePool {
prepared_metadata: HashMap<CBytesShort, PreparedMetadata>,
prepared_metadata: Arc<RwLock<HashMap<CBytesShort, PreparedMetadata>>>,
token_map: TokenMap,
nodes: Vec<CassandraNode>,
}

impl Clone for NodePool {
fn clone(&self) -> Self {
Self {
prepared_metadata: self.prepared_metadata.clone(),
token_map: TokenMap::new(&[]),
nodes: vec![],
}
}
}

impl NodePool {
pub fn new(nodes: Vec<CassandraNode>) -> Self {
Self {
token_map: TokenMap::new(nodes.as_slice()),
nodes,
prepared_metadata: HashMap::new(),
prepared_metadata: Arc::new(RwLock::new(HashMap::new())),
}
}

Expand All @@ -54,8 +65,9 @@ impl NodePool {
self.token_map = TokenMap::new(self.nodes.as_slice());
}

pub fn add_prepared_result(&mut self, id: CBytesShort, metadata: PreparedMetadata) {
self.prepared_metadata.insert(id, metadata);
pub async fn add_prepared_result(&mut self, id: CBytesShort, metadata: PreparedMetadata) {
let mut write_lock = self.prepared_metadata.write().await;
write_lock.insert(id, metadata);
}

pub fn get_random_node_in_dc_rack(
Expand All @@ -71,16 +83,19 @@ impl NodePool {
}

/// Get a token routed replica node for the supplied execute message (if exists)
pub fn replica_node(
pub async fn replica_node(
&mut self,
execute: &BodyReqExecuteOwned,
version: &Version,
rng: &mut SmallRng,
) -> Result<Option<&mut CassandraNode>, GetReplicaErr> {
let metadata = self
.prepared_metadata
.get(&execute.id)
.ok_or(GetReplicaErr::NoMetadata)?;
let metadata = {
let read_lock = self.prepared_metadata.read().await;
read_lock
.get(&execute.id)
.ok_or(GetReplicaErr::NoMetadata)?
.clone()
};

let routing_key = calculate_routing_key(
&metadata.pk_indexes,
Expand Down

0 comments on commit 17ab717

Please sign in to comment.