Skip to content

Commit

Permalink
Fix system.peers v2 rewriting (#783)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Sep 6, 2022
1 parent 0bbea6d commit 778d7de
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 8 deletions.
14 changes: 12 additions & 2 deletions shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,8 @@ impl CassandraSinkCluster {
let mut data_center_alias = "data_center";
let mut rack_alias = "rack";
let mut host_id_alias = "host_id";
let mut native_address_alias = "native_address";
let mut native_port_alias = "native_port";
let mut preferred_ip_alias = "preferred_ip";
let mut preferred_port_alias = "preferred_port";
let mut rpc_address_alias = "rpc_address";
Expand All @@ -483,6 +485,10 @@ impl CassandraSinkCluster {
rack_alias = alias;
} else if column.name == Identifier::Unquoted("host_id".to_string()) {
host_id_alias = alias;
} else if column.name == Identifier::Unquoted("native_address".to_string()) {
native_address_alias = alias;
} else if column.name == Identifier::Unquoted("native_port".to_string()) {
native_port_alias = alias;
} else if column.name == Identifier::Unquoted("preferred_ip".to_string()) {
preferred_ip_alias = alias;
} else if column.name == Identifier::Unquoted("preferred_port".to_string()) {
Expand Down Expand Up @@ -559,13 +565,17 @@ impl CassandraSinkCluster {
|| colspec.name == rpc_address_alias
{
MessageValue::Null
} else if colspec.name == peer_alias {
} else if colspec.name == native_address_alias {
MessageValue::Inet(shotover_peer.address.ip())
} else if colspec.name == peer_port_alias {
} else if colspec.name == native_port_alias {
MessageValue::Integer(
shotover_peer.address.port() as i64,
IntSize::I32,
)
} else if colspec.name == peer_alias {
MessageValue::Inet(shotover_peer.address.ip())
} else if colspec.name == peer_port_alias {
MessageValue::Integer(7000, IntSize::I32)
} else if colspec.name == release_version_alias {
MessageValue::Varchar(release_version.clone())
} else if colspec.name == tokens_alias {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,11 @@ async fn test_rewrite_system_peers_dummy_peers(connection: &CassandraConnection)
async fn test_rewrite_system_peers_v2_dummy_peers(connection: &CassandraConnection) {
let star_results1 = [
ResultValue::Inet("127.0.0.1".parse().unwrap()),
ResultValue::Int(9042),
ResultValue::Int(7000),
ResultValue::Varchar("dc1".into()),
ResultValue::Uuid("3c3c4e2d-ba74-4f76-b52e-fb5bcee6a9f4".parse().unwrap()),
ResultValue::Inet("255.255.255.255".into()),
ResultValue::Int(-1),
ResultValue::Inet("127.0.0.1".into()),
ResultValue::Int(9042),
ResultValue::Inet("255.255.255.255".into()),
ResultValue::Int(-1),
ResultValue::Varchar("rack1".into()),
Expand All @@ -110,11 +110,11 @@ async fn test_rewrite_system_peers_v2_dummy_peers(connection: &CassandraConnecti
];
let star_results2 = [
ResultValue::Inet("127.0.0.1".parse().unwrap()),
ResultValue::Int(9042),
ResultValue::Int(7000),
ResultValue::Varchar("dc1".into()),
ResultValue::Uuid("fa74d7ec-1223-472b-97de-04a32ccdb70b".parse().unwrap()),
ResultValue::Inet("255.255.255.255".into()),
ResultValue::Int(-1),
ResultValue::Inet("127.0.0.1".parse().unwrap()),
ResultValue::Int(9042),
ResultValue::Inet("255.255.255.255".into()),
ResultValue::Int(-1),
ResultValue::Varchar("rack1".into()),
Expand Down

0 comments on commit 778d7de

Please sign in to comment.