Skip to content

Commit

Permalink
Dynamic port (#786)
Browse files Browse the repository at this point in the history
  • Loading branch information
conorbros committed Sep 12, 2022
1 parent 9ff268b commit 9d87c29
Show file tree
Hide file tree
Showing 12 changed files with 322 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ services:
HEAP_NEWSIZE: "48M"
CASSANDRA_ENABLE_SCRIPTED_USER_DEFINED_FUNCTIONS: "true"
CASSANDRA_ENABLE_USER_DEFINED_FUNCTIONS: "true"
CASSANDRA_CQL_PORT_NUMBER: 9044

cassandra-two:
image: *image
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
---
sources:
cassandra_prod:
Cassandra:
listen_addr: "127.0.0.1:9042"
chain_config:
main_chain:
- CassandraSinkCluster:
first_contact_points: ["172.16.1.2:9044", "172.16.1.3:9044"]
local_shotover_host_id: "2dd022d6-2937-4754-89d6-02d2933a8f7a"
shotover_nodes:
- address: "127.0.0.1:9042"
data_center: "dc1"
rack: "rack1"
host_id: "2dd022d6-2937-4754-89d6-02d2933a8f7a"
# These extra nodes dont really make sense, its pointing at the same address as the local shotover node.
# It is however useful for testing the functionality of the system.peers rewriting.
# We can make stronger assertions against the values returned by system.peers with this config because
# more system.peers fields are static due to always being queried against this one shotover instance.
- address: "127.0.0.1:9042"
data_center: "dc1"
rack: "rack1"
host_id: "3c3c4e2d-ba74-4f76-b52e-fb5bcee6a9f4"
- address: "127.0.0.1:9042"
data_center: "dc1"
rack: "rack1"
host_id: "fa74d7ec-1223-472b-97de-04a32ccdb70b"
source_to_chain_mapping:
cassandra_prod: main_chain
18 changes: 18 additions & 0 deletions shotover-proxy/example-configs/cassandra-cluster/topology-v4.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
---
sources:
cassandra_prod:
Cassandra:
listen_addr: "127.0.0.1:9042"
chain_config:
main_chain:
- CassandraSinkCluster:
first_contact_points: ["172.16.1.2:9044", "172.16.1.3:9044"]
local_shotover_host_id: "2dd022d6-2937-4754-89d6-02d2933a8f7a"
shotover_nodes:
- address: "127.0.0.1:9042"
data_center: "dc1"
rack: "rack1"
host_id: "2dd022d6-2937-4754-89d6-02d2933a8f7a"

source_to_chain_mapping:
cassandra_prod: main_chain
8 changes: 8 additions & 0 deletions shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ impl CassandraSinkCluster {
} else {
self.get_random_node_in_dc_rack().address
};

self.init_handshake_connection =
Some(self.connection_factory.new_connection(random_point).await?);
self.init_handshake_address = Some(random_point);
Expand Down Expand Up @@ -619,6 +620,7 @@ impl CassandraSinkCluster {
let mut broadcast_address_alias = "broadcast_address";
let mut listen_address_alias = "listen_address";
let mut host_id_alias = "host_id";
let mut rpc_port_alias = "rpc_port";
for select in &table.selects {
if let SelectElement::Column(column) = select {
if let Some(alias) = &column.alias {
Expand All @@ -638,6 +640,8 @@ impl CassandraSinkCluster {
listen_address_alias = alias;
} else if column.name == Identifier::Unquoted("host_id".to_string()) {
host_id_alias = alias;
} else if column.name == Identifier::Unquoted("rpc_port".to_string()) {
rpc_port_alias = alias
}
}
}
Expand Down Expand Up @@ -688,6 +692,10 @@ impl CassandraSinkCluster {
if let MessageValue::Uuid(host_id) = col {
*host_id = self.local_shotover_node.host_id;
}
} else if col_meta.name == rpc_port_alias {
if let MessageValue::Integer(rpc_port, _) = col {
*rpc_port = self.local_shotover_node.address.port() as i64;
}
}
}
}
Expand Down

0 comments on commit 9d87c29

Please sign in to comment.