Skip to content

Commit

Permalink
store tokens as murmur3 instead of string (#801)
Browse files Browse the repository at this point in the history
  • Loading branch information
conorbros committed Sep 14, 2022
1 parent b3df35d commit 43865b7
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 12 deletions.
3 changes: 2 additions & 1 deletion shotover-proxy/src/transforms/cassandra/sink_cluster/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::message::{Message, Messages};
use crate::tls::TlsConnector;
use crate::transforms::cassandra::connection::CassandraConnection;
use anyhow::Result;
use cassandra_protocol::token::Murmur3Token;
use std::net::SocketAddr;
use tokio::net::ToSocketAddrs;
use tokio::sync::{mpsc, oneshot};
Expand All @@ -11,7 +12,7 @@ use tokio::sync::{mpsc, oneshot};
pub struct CassandraNode {
pub address: SocketAddr,
pub rack: String,
pub _tokens: Vec<String>,
pub tokens: Vec<Murmur3Token>,
pub outbound: Option<CassandraConnection>,
}

Expand Down
17 changes: 9 additions & 8 deletions shotover-proxy/src/transforms/cassandra/sink_cluster/topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::frame::{CassandraFrame, CassandraOperation, CassandraResult, Frame};
use crate::message::{Message, MessageValue};
use crate::transforms::cassandra::connection::CassandraConnection;
use anyhow::{anyhow, Result};
use cassandra_protocol::token::Murmur3Token;
use cassandra_protocol::{frame::Version, query::QueryParams};
use std::net::SocketAddr;
use std::sync::Arc;
Expand Down Expand Up @@ -119,11 +120,11 @@ mod system_local {

let tokens = if let Some(MessageValue::List(mut list)) = row.pop() {
list.drain(..)
.map::<Result<String>, _>(|x| match x {
MessageValue::Varchar(a) => Ok(a),
.map::<Result<Murmur3Token>, _>(|x| match x {
MessageValue::Varchar(a) => Ok(a.try_into()?),
_ => Err(anyhow!("tokens value not a varchar")),
})
.collect::<Result<Vec<String>>>()?
.collect::<Result<Vec<Murmur3Token>>>()?
} else {
return Err(anyhow!("tokens not a list"));
};
Expand All @@ -137,7 +138,7 @@ mod system_local {
Ok(CassandraNode {
address,
rack,
_tokens: tokens,
tokens,
outbound: None,
})
})
Expand Down Expand Up @@ -238,11 +239,11 @@ mod system_peers {

let tokens = if let Some(MessageValue::List(list)) = row.pop() {
list.into_iter()
.map::<Result<String>, _>(|x| match x {
MessageValue::Varchar(a) => Ok(a),
.map::<Result<Murmur3Token>, _>(|x| match x {
MessageValue::Varchar(a) => Ok(a.try_into()?),
_ => Err(anyhow!("tokens value not a varchar")),
})
.collect::<Result<Vec<String>>>()?
.collect::<Result<Vec<Murmur3Token>>>()?
} else {
return Err(anyhow!("tokens not a list"));
};
Expand Down Expand Up @@ -273,7 +274,7 @@ mod system_peers {
Ok(CassandraNode {
address: SocketAddr::new(ip, port.try_into()?),
rack,
_tokens: tokens,
tokens,
outbound: None,
})
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,6 @@ pub async fn test_topology_task(ca_path: Option<&str>) {
.expect("Node did not contain a unique expected rack");
possible_racks.remove(rack_index);

assert_eq!(node._tokens.len(), 128);
assert_eq!(node.tokens.len(), 128);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,6 @@ pub async fn test_topology_task(ca_path: Option<&str>) {
possible_addresses.remove(address_index);

assert_eq!(node.rack, "rack1");
assert_eq!(node._tokens.len(), 128);
assert_eq!(node.tokens.len(), 128);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,6 @@ pub async fn test_topology_task(ca_path: Option<&str>, cassandra_port: Option<u3
possible_addresses.remove(address_index);

assert_eq!(node.rack, "rack1");
assert_eq!(node._tokens.len(), 128);
assert_eq!(node.tokens.len(), 128);
}
}

0 comments on commit 43865b7

Please sign in to comment.