Merge branch 'main' into document_error_handling
rukai committed Oct 25, 2022
2 parents a8b86db + f6de0ca commit 724f787
Showing 56 changed files with 423 additions and 136 deletions.
6 files renamed without changes.
docs/src/examples/cassandra-cluster-shotover-sidecar.md (4 changes: 2 additions & 2 deletions)
@@ -12,10 +12,10 @@ In this example, we will be connecting to a Cassandra cluster that has the follo

### Rewriting the peer ports

Shotover will be deployed as a sidecar to each node in the Cassandra cluster, listening on `9043`. Use the following [docker-compose.yml](https://raw.githubusercontent.com/shotover/shotover-proxy/cassandra-docs/shotover-proxy/example-configs-docker/cassandra-peers-rewrite/docker-compose.yml) to run the Cassandra cluster and Shotover sidecars. In this example we want to ensure that all our traffic to Cassandra goes through Shotover.
Shotover will be deployed as a sidecar to each node in the Cassandra cluster, listening on `9043`. Use the following [docker-compose.yaml](https://raw.githubusercontent.com/shotover/shotover-proxy/cassandra-docs/shotover-proxy/example-configs-docker/cassandra-peers-rewrite/docker-compose.yaml) to run the Cassandra cluster and Shotover sidecars. In this example we want to ensure that all our traffic to Cassandra goes through Shotover.

```console
curl -L https://raw.githubusercontent.com/shotover/shotover-proxy/main/shotover-proxy/example-configs-docker/cassandra-peers-rewrite/docker-compose.yml --output docker-compose.yml
curl -L https://raw.githubusercontent.com/shotover/shotover-proxy/main/shotover-proxy/example-configs-docker/cassandra-peers-rewrite/docker-compose.yaml --output docker-compose.yaml
```

Below we can see an example of a Cassandra node and its Shotover sidecar. Notice that they are running on the same network address (`172.16.1.2`) and that the present directory is being mounted to allow Shotover to access the config and topology files.
docs/src/examples/redis-clustering-aware.md (4 changes: 2 additions & 2 deletions)
@@ -13,10 +13,10 @@ In this example, we will be connecting to a Redis cluster that has the following
* `172.16.1.6:6379`
* `172.16.1.7:6379`

Shotover will be deployed as a sidecar to each node in the Redis cluster, listening on `6380`. Use the following [docker-compose.yml](https://github.com/shotover/shotover-proxy/blob/main/shotover-proxy/example-configs-docker/redis-cluster-ports-rewrite/docker-compose.yml) to run the Redis cluster and Shotover sidecars.
Shotover will be deployed as a sidecar to each node in the Redis cluster, listening on `6380`. Use the following [docker-compose.yaml](https://github.com/shotover/shotover-proxy/blob/main/shotover-proxy/example-configs-docker/redis-cluster-ports-rewrite/docker-compose.yaml) to run the Redis cluster and Shotover sidecars.

```console
curl -L https://raw.githubusercontent.com/shotover/shotover-proxy/main/shotover-proxy/example-configs-docker/redis-cluster-ports-rewrite/docker-compose.yml --output docker-compose.yml
curl -L https://raw.githubusercontent.com/shotover/shotover-proxy/main/shotover-proxy/example-configs-docker/redis-cluster-ports-rewrite/docker-compose.yaml --output docker-compose.yaml
```

Below we can see an example of a Redis node and its Shotover sidecar. Notice they are running on the same network address (`172.16.1.2`) and the present directory is being mounted to allow Shotover to access the config and topology files.
docs/src/examples/redis-clustering-unaware.md (10 changes: 5 additions & 5 deletions)
@@ -6,14 +6,14 @@ The following guide shows you how to configure Shotover Proxy to support transpa

First you need to set up a Redis cluster for Shotover to connect to.

The easiest way to do this is with this example [docker-compose.yml](https://github.com/shotover/shotover-proxy/blob/main/shotover-proxy/example-configs-docker/redis-cluster-hiding/docker-compose.yml).
You should first inspect the `docker-compose.yml` to understand what the cluster looks like and how it's exposed to the network.
The easiest way to do this is with this example [docker-compose.yaml](https://github.com/shotover/shotover-proxy/blob/main/shotover-proxy/example-configs-docker/redis-cluster-hiding/docker-compose.yaml).
You should first inspect the `docker-compose.yaml` to understand what the cluster looks like and how it's exposed to the network.

Then run:

```bash
curl -L https://raw.githubusercontent.com/shotover/shotover-proxy/main/shotover-proxy/example-configs/redis-cluster-hiding/docker-compose.yml --output docker-compose.yml
docker-compose -f docker-compose.yml up
curl -L https://raw.githubusercontent.com/shotover/shotover-proxy/main/shotover-proxy/example-configs/redis-cluster-hiding/docker-compose.yaml --output docker-compose.yaml
docker-compose -f docker-compose.yaml up
```

When you are finished with the containers, <kbd>ctrl</kbd> + <kbd>c</kbd> will shut them down.
@@ -29,7 +29,7 @@ Modify your `topology.yaml` file like this:
{{#include ../../../shotover-proxy/example-configs-docker/redis-cluster-hiding/topology.yaml}}
```

If you didn't use the standard `docker-compose.yml` setup, then you will need to change `first_contact_points` to point to the Redis instances you used.
If you didn't use the standard `docker-compose.yaml` setup, then you will need to change `first_contact_points` to point to the Redis instances you used.

## Testing

shotover-proxy/benches/benches/cassandra.rs (16 changes: 8 additions & 8 deletions)
@@ -28,7 +28,7 @@ fn cassandra(c: &mut Criterion) {
let resources = new_lazy_shared(|| {
BenchResources::new(
"example-configs/cassandra-protect-local/topology.yaml",
"example-configs/cassandra-protect-local/docker-compose.yml",
"example-configs/cassandra-protect-local/docker-compose.yaml",
)
});
for query in &queries {
@@ -53,7 +53,7 @@ fn cassandra(c: &mut Criterion) {
let resources = new_lazy_shared(|| {
BenchResources::new(
"example-configs/cassandra-redis-cache/topology.yaml",
"example-configs/cassandra-redis-cache/docker-compose.yml",
"example-configs/cassandra-redis-cache/docker-compose.yaml",
)
});
// Benches the case where the message does not meet the criteria for caching
@@ -79,7 +79,7 @@ fn cassandra(c: &mut Criterion) {
let resources = new_lazy_shared(|| {
BenchResources::new(
"example-configs/cassandra-passthrough/topology.yaml",
"example-configs/cassandra-passthrough/docker-compose.yml",
"example-configs/cassandra-passthrough/docker-compose.yaml",
)
});
for query in &queries {
@@ -105,7 +105,7 @@ fn cassandra(c: &mut Criterion) {
let resources = new_lazy_shared(|| {
BenchResources::new(
"tests/test-configs/cassandra-passthrough-parse-request/topology.yaml",
"tests/test-configs/cassandra-passthrough-parse-request/docker-compose.yml",
"tests/test-configs/cassandra-passthrough-parse-request/docker-compose.yaml",
)
});
for query in &queries {
@@ -131,7 +131,7 @@ fn cassandra(c: &mut Criterion) {
let resources = new_lazy_shared(|| {
BenchResources::new(
"tests/test-configs/cassandra-passthrough-parse-response/topology.yaml",
"tests/test-configs/cassandra-passthrough-parse-response/docker-compose.yml",
"tests/test-configs/cassandra-passthrough-parse-response/docker-compose.yaml",
)
});
for query in &queries {
@@ -156,7 +156,7 @@ fn cassandra(c: &mut Criterion) {
let resources = new_lazy_shared(|| {
BenchResources::new_tls(
"example-configs/cassandra-tls/topology.yaml",
"example-configs/cassandra-tls/docker-compose.yml",
"example-configs/cassandra-tls/docker-compose.yaml",
)
});
for query in &queries {
@@ -188,7 +188,7 @@ fn cassandra(c: &mut Criterion) {
let resources = new_lazy_shared(|| {
let resources = BenchResources::new(
"example-configs/cassandra-protect-local/topology.yaml",
"example-configs/cassandra-protect-local/docker-compose.yml",
"example-configs/cassandra-protect-local/docker-compose.yaml",
);

resources
@@ -235,7 +235,7 @@ fn cassandra(c: &mut Criterion) {
let resources = new_lazy_shared(|| {
BenchResources::new(
"example-configs/cassandra-request-throttling/topology.yaml",
"example-configs/cassandra-request-throttling/docker-compose.yml",
"example-configs/cassandra-request-throttling/docker-compose.yaml",
)
});
for query in &queries {
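Every change in this bench file is the same one-line rename of the compose file passed to `BenchResources::new`. For orientation, a criterion bench in this style might be structured roughly as below; this is a sketch only, and `new_lazy_shared`, `BenchResources`, and the query are simplified stand-ins for the helpers used in this crate.

```rust
use criterion::{criterion_group, criterion_main, Criterion};
use std::{cell::RefCell, rc::Rc};

// Stand-in for the bench helper that starts docker compose and Shotover.
struct BenchResources;

impl BenchResources {
    fn new(_topology: &str, _compose: &str) -> Self {
        BenchResources
    }
    fn run_query(&self, _query: &str) {}
}

// Simplified: the real helper shares one lazily created resource set per group.
fn new_lazy_shared(init: impl FnOnce() -> BenchResources) -> Rc<RefCell<Option<BenchResources>>> {
    Rc::new(RefCell::new(Some(init())))
}

fn cassandra(c: &mut Criterion) {
    let mut group = c.benchmark_group("cassandra");
    let resources = new_lazy_shared(|| {
        BenchResources::new(
            "example-configs/cassandra-passthrough/topology.yaml",
            "example-configs/cassandra-passthrough/docker-compose.yaml",
        )
    });
    group.bench_function("passthrough_simple_select", |b| {
        b.iter(|| {
            if let Some(resources) = resources.borrow().as_ref() {
                // Each iteration reuses the already-running cluster and proxy.
                resources.run_query("SELECT * FROM system.local;");
            }
        })
    });
    group.finish();
}

criterion_group!(benches, cassandra);
criterion_main!(benches);
```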
shotover-proxy/benches/benches/redis.rs (10 changes: 5 additions & 5 deletions)
@@ -31,7 +31,7 @@ fn redis(c: &mut Criterion) {
let resources = new_lazy_shared(|| {
BenchResources::new(
"example-configs/redis-multi/topology.yaml",
"example-configs/redis-multi/docker-compose.yml",
"example-configs/redis-multi/docker-compose.yaml",
)
});
for query in &queries {
@@ -54,7 +54,7 @@ fn redis(c: &mut Criterion) {
let resources = new_lazy_shared(|| {
BenchResources::new(
"example-configs/redis-cluster-hiding/topology.yaml",
"example-configs/redis-cluster-hiding/docker-compose.yml",
"example-configs/redis-cluster-hiding/docker-compose.yaml",
)
});
for query in &queries {
@@ -77,7 +77,7 @@ fn redis(c: &mut Criterion) {
let resources = new_lazy_shared(|| {
BenchResources::new(
"example-configs/redis-passthrough/topology.yaml",
"example-configs/redis-passthrough/docker-compose.yml",
"example-configs/redis-passthrough/docker-compose.yaml",
)
});
for query in &queries {
@@ -105,7 +105,7 @@ fn redis(c: &mut Criterion) {
));
BenchResources::new(
"example-configs/redis-tls/topology.yaml",
"example-configs/redis-tls/docker-compose.yml",
"example-configs/redis-tls/docker-compose.yaml",
)
},
move |b, state| {
@@ -127,7 +127,7 @@ fn redis(c: &mut Criterion) {
));
BenchResources::new(
"example-configs/redis-cluster-tls/topology.yaml",
"example-configs/redis-cluster-tls/docker-compose.yml",
"example-configs/redis-cluster-tls/docker-compose.yaml",
)
},
move |b, state| {
shotover-proxy/examples/cassandra_bench.rs (2 changes: 1 addition & 1 deletion)
@@ -30,7 +30,7 @@ fn main() {

let latte = Latte::new(args.rate);
{
let _compose = DockerCompose::new(&format!("{}/docker-compose.yml", args.config_dir));
let _compose = DockerCompose::new(&format!("{}/docker-compose.yaml", args.config_dir));

// Uses ShotoverProcess instead of ShotoverManager for a more accurate benchmark
let shotover_manager = ShotoverProcess::new(&format!("{}/topology.yaml", args.config_dir));
shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs (36 changes: 26 additions & 10 deletions)
@@ -214,6 +214,22 @@ impl CassandraSinkCluster {
async fn send_message(&mut self, mut messages: Messages) -> ChainResponse {
if self.nodes_rx.has_changed()? {
self.pool.update_nodes(&mut self.nodes_rx);

// recreate the control connection if it is down
if let Some(address) = self.control_connection_address {
if !self
.pool
.nodes()
.iter()
.any(|x| x.address == address && x.is_up)
{
let address = self
.pool
.get_round_robin_node_in_dc_rack(&self.local_shotover_node.rack)
.address;
self.create_control_connection(address).await?;
}
}
}

let tables_to_rewrite: Vec<TableToRewrite> = messages
@@ -251,9 +267,7 @@ impl CassandraSinkCluster {
.address
};

self.control_connection =
Some(self.connection_factory.new_connection(random_point).await?);
self.control_connection_address = Some(random_point);
self.create_control_connection(random_point).await?;
}

if !self.init_handshake_complete {
@@ -500,16 +514,11 @@ impl CassandraSinkCluster {
// If we have to populate the local_nodes at this point then that means the control connection
// may not have been made against a node in the configured data_center/rack.
// Therefore we need to recreate the control connection to ensure that it is in the configured data_center/rack.
let random_address = self
let address = self
.pool
.get_round_robin_node_in_dc_rack(&self.local_shotover_node.rack)
.address;
self.control_connection = Some(
self.connection_factory
.new_connection(random_address)
.await?,
);
self.control_connection_address = Some(random_address);
self.create_control_connection(address).await?;
}
tracing::info!(
"Control connection finalized against node at: {:?}",
@@ -519,6 +528,13 @@ impl CassandraSinkCluster {
Ok(())
}

async fn create_control_connection(&mut self, address: SocketAddr) -> Result<()> {
self.control_connection = Some(self.connection_factory.new_connection(address).await?);
self.control_connection_address = Some(address);

Ok(())
}

fn get_rewrite_table(&self, request: &mut Message, index: usize) -> Option<TableToRewrite> {
if let Some(Frame::Cassandra(cassandra)) = request.frame() {
// No need to handle Batch as selects can only occur on Query
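The hunks above introduce a single `create_control_connection` helper and a check that rebuilds the control connection when the node backing it is no longer marked up. A minimal sketch of that pattern, assuming simplified stand-in types (`Node`, `Connection`, `connect`, and an `anyhow::Result` alias) rather than the actual Shotover definitions:

```rust
use std::net::SocketAddr;

use anyhow::Result;

// Simplified stand-ins for illustration; the real types live in the
// sink_cluster module and carry considerably more state.
struct Node {
    address: SocketAddr,
    is_up: bool,
}

struct Connection; // placeholder for an established CQL connection

async fn connect(_address: SocketAddr) -> Result<Connection> {
    Ok(Connection) // stand-in for ConnectionFactory::new_connection
}

struct ControlConnectionState {
    control_connection: Option<Connection>,
    control_connection_address: Option<SocketAddr>,
}

impl ControlConnectionState {
    // Mirrors the new check: if the node backing the control connection is no
    // longer up, pick a replacement (round-robin within the local rack in the
    // real code) and rebuild the connection through one shared helper.
    async fn ensure_control_connection(&mut self, nodes: &[Node]) -> Result<()> {
        if let Some(address) = self.control_connection_address {
            if !nodes.iter().any(|n| n.address == address && n.is_up) {
                if let Some(replacement) = nodes.iter().find(|n| n.is_up) {
                    self.create_control_connection(replacement.address).await?;
                }
            }
        }
        Ok(())
    }

    // Both the connection and its recorded address are set in one place,
    // matching the create_control_connection helper added in this commit.
    async fn create_control_connection(&mut self, address: SocketAddr) -> Result<()> {
        self.control_connection = Some(connect(address).await?);
        self.control_connection_address = Some(address);
        Ok(())
    }
}
```

The point of the helper is that the connection and its recorded address can no longer drift apart, since the two fields that previously had to be updated together at each call site are now set in one place.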
@@ -30,7 +30,10 @@ fn serialize_routing_key_with_indexes(
0 => None,
1 => values
.get(pk_indexes[0] as usize)
.map(|value| value.serialize_to_vec(version)),
.and_then(|value| match value {
Value::Some(value) => Some(value.serialize_to_vec(version)),
_ => None,
}),
_ => {
let mut buf = vec![];
if pk_indexes
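The change above makes the single-column branch produce a routing key only when the value is concretely present. A minimal sketch of that rule, using a simplified stand-in `Value` enum instead of the cassandra-protocol types used by the real function (the composite multi-column branch is omitted):

```rust
// Simplified stand-in for the protocol-level Value; the real function works
// with cassandra-protocol types and also handles composite partition keys.
enum Value {
    Some(Vec<u8>), // a concrete, already-serialized value
    Null,
    NotSet,
}

// For a single-column partition key, only a concrete value can form a routing
// key; Null or NotSet now yields None instead of a meaningless token.
fn single_column_routing_key(values: &[Value], pk_index: usize) -> Option<Vec<u8>> {
    values.get(pk_index).and_then(|value| match value {
        Value::Some(bytes) => Some(bytes.clone()),
        _ => None,
    })
}

fn main() {
    let values = vec![Value::Some(b"pk1".to_vec()), Value::Null, Value::NotSet];
    assert_eq!(single_column_routing_key(&values, 0), Some(b"pk1".to_vec()));
    assert_eq!(single_column_routing_key(&values, 1), None);
    assert_eq!(single_column_routing_key(&values, 2), None);
}
```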
shotover-proxy/tests/cassandra_int_tests/cluster_single_rack_v4.rs (35 changes: 16 additions & 19 deletions)
@@ -255,18 +255,13 @@ pub async fn test_node_going_down(
shotover_manager: ShotoverManager,
driver: CassandraDriver,
) {
{
let mut connection_shotover = CassandraConnection::new("127.0.0.1", 9042, driver).await;
connection_shotover
.enable_schema_awaiter("172.16.1.2:9044", None)
.await;
// Use Replication 2 in case it ends up on the node that we kill
run_query(&connection_shotover, "CREATE KEYSPACE cluster_single_rack_node_going_down WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 2 };").await;
run_query(&connection_shotover, "CREATE TABLE cluster_single_rack_node_going_down.test_table (pk varchar PRIMARY KEY, col1 int, col2 boolean);").await;
connection_shotover.await_schema_agreement().await;

// TODO: hold onto this connection and use it to test how we handle preexisting connections before the node is stopped
}
let mut connection_shotover = CassandraConnection::new("127.0.0.1", 9042, driver).await;
connection_shotover
.enable_schema_awaiter("172.16.1.2:9044", None)
.await;
// Use Replication 2 in case it ends up on the node that we kill
run_query(&connection_shotover, "CREATE KEYSPACE cluster_single_rack_node_going_down WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 2 };").await;
run_query(&connection_shotover, "CREATE TABLE cluster_single_rack_node_going_down.test_table (pk varchar PRIMARY KEY, col1 int, col2 boolean);").await;

{
let event_connection_direct =
@@ -320,14 +315,20 @@
.await
.expect_err("CassandraSinkCluster must filter out this event");

// test that shotover handles preexisting connections after node goes down
// TODO: test_connection_handles_node_down(&old_connection).await;
let new_connection = CassandraConnection::new("127.0.0.1", 9042, driver).await;

// setup data to read
run_query(&new_connection, "INSERT INTO cluster_single_rack_node_going_down.test_table (pk, col1, col2) VALUES ('pk1', 42, true);").await;
run_query(&new_connection, "INSERT INTO cluster_single_rack_node_going_down.test_table (pk, col1, col2) VALUES ('pk2', 413, false);").await;

// test that shotover handles new connections after node goes down
let new_connection = CassandraConnection::new("127.0.0.1", 9042, driver).await;
test_connection_handles_node_down(&new_connection).await;

// test that shotover handles preexisting connections after node goes down
test_connection_handles_node_down(&connection_shotover).await;
}

std::mem::drop(connection_shotover);
// Purposefully dispose of these as we left the underlying cassandra cluster in a non-recoverable state
std::mem::drop(shotover_manager);
std::mem::drop(compose);
@@ -337,10 +338,6 @@ async fn test_connection_handles_node_down(connection: &CassandraConnection) {
// test a query that hits the control node and performs rewriting
test_rewrite_system_local(connection).await;

// test queries that get routed across all nodes and performs reading and writing
run_query(connection, "INSERT INTO cluster_single_rack_node_going_down.test_table (pk, col1, col2) VALUES ('pk1', 42, true);").await;
run_query(connection, "INSERT INTO cluster_single_rack_node_going_down.test_table (pk, col1, col2) VALUES ('pk2', 413, false);").await;

// run this a few times to make sure we aren't getting lucky with the routing
for _ in 0..10 {
assert_query_result(
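The reworked test keeps the connection opened before the outage alive and asserts it still works afterwards, alongside a freshly opened connection. An outline of that flow is sketched below, with stub helpers standing in for the real integration-test utilities; the queries, addresses, and container name are illustrative only.

```rust
// Outline of the reworked test, with stub helpers standing in for the real
// integration-test utilities.
struct CassandraConnection;

impl CassandraConnection {
    async fn new(_host: &str, _port: u16) -> Self {
        CassandraConnection
    }
    async fn run(&self, _query: &str) {
        // The real helper executes the query through Shotover and panics on error.
    }
}

async fn kill_node(_container: &str) {
    // The real test stops a Cassandra container out from under the cluster.
}

async fn assert_queries_still_work(connection: &CassandraConnection) {
    // The real test also re-checks system.local / system.peers rewriting here.
    connection
        .run("SELECT pk, col1, col2 FROM ks.test_table WHERE pk = 'pk1';")
        .await;
}

async fn node_going_down_outline() {
    // A connection opened *before* the outage; the commit keeps it alive and
    // asserts it still works after the node is killed.
    let preexisting = CassandraConnection::new("127.0.0.1", 9042).await;
    preexisting
        .run("CREATE KEYSPACE ks WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': 2 };")
        .await;

    kill_node("cassandra-two").await;

    // A brand-new connection must route around the dead node...
    let fresh = CassandraConnection::new("127.0.0.1", 9042).await;
    assert_queries_still_work(&fresh).await;

    // ...and so must the connection that existed before the node went down.
    assert_queries_still_work(&preexisting).await;
}
```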
