Skip to content

Commit

Permalink
CassandraSinkCluster fix routing for system keyspaces (#799)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Sep 13, 2022
1 parent 2c2b6b6 commit 14ea9e9
Showing 1 changed file with 12 additions and 5 deletions.
17 changes: 12 additions & 5 deletions shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ pub struct CassandraSinkCluster {
local_table: FQName,
peers_table: FQName,
peers_v2_table: FQName,
system_keyspaces: [Identifier; 3],
local_shotover_node: ShotoverNode,
/// A local clone of topology_task_nodes.
/// Internally stores connections to the nodes.
Expand Down Expand Up @@ -126,6 +127,7 @@ impl Clone for CassandraSinkCluster {
local_table: self.local_table.clone(),
peers_table: self.peers_table.clone(),
peers_v2_table: self.peers_v2_table.clone(),
system_keyspaces: self.system_keyspaces.clone(),
local_shotover_node: self.local_shotover_node.clone(),
local_nodes: vec![],
topology_task_nodes: self.topology_task_nodes.clone(),
Expand Down Expand Up @@ -171,6 +173,11 @@ impl CassandraSinkCluster {
local_table: FQName::new("system", "local"),
peers_table: FQName::new("system", "peers"),
peers_v2_table: FQName::new("system", "peers_v2"),
system_keyspaces: [
Identifier::parse("system"),
Identifier::parse("system_schema"),
Identifier::parse("system_distributed"),
],
local_shotover_node,
local_nodes: vec![],
topology_task_nodes: nodes_shared,
Expand Down Expand Up @@ -266,7 +273,7 @@ impl CassandraSinkCluster {
// system.local and system.peers must be routed to the same node otherwise the system.local node will be amongst the system.peers nodes and a node will be missing
// DDL statements and system.local must be routed through the same connection, so that schema_version changes appear immediately in system.local
|| is_ddl_statement(&mut message)
|| self.is_system_local_or_peers(&mut message)
|| self.is_system_query(&mut message)
{
self.init_handshake_connection.as_mut().unwrap()
} else if is_use_statement(&mut message) {
Expand Down Expand Up @@ -715,13 +722,13 @@ impl CassandraSinkCluster {
}

// TODO: handle use statement state
fn is_system_local_or_peers(&self, request: &mut Message) -> bool {
fn is_system_query(&self, request: &mut Message) -> bool {
if let Some(Frame::Cassandra(frame)) = request.frame() {
if let CassandraOperation::Query { query, .. } = &mut frame.operation {
if let CassandraStatement::Select(select) = query.as_ref() {
return self.local_table == select.table_name
|| self.peers_table == select.table_name
|| self.peers_v2_table == select.table_name;
if let Some(keyspace) = &select.table_name.keyspace {
return self.system_keyspaces.contains(keyspace);
}
}
}
}
Expand Down

0 comments on commit 14ea9e9

Please sign in to comment.