Skip to content

Commit

Permalink
cassandra_int_tests: replace ShotoverManager with ShotoverProcess (#976)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Jan 23, 2023
1 parent 6c62d79 commit a81399b
Showing 1 changed file with 128 additions and 41 deletions.
169 changes: 128 additions & 41 deletions shotover-proxy/tests/cassandra_int_tests/mod.rs
@@ -1,24 +1,22 @@
use crate::helpers::ShotoverManager;
#[cfg(feature = "cassandra-cpp-driver-tests")]
use cassandra_protocol::frame::message_error::{ErrorBody, ErrorType};
use cdrs_tokio::frame::events::{
SchemaChange, SchemaChangeOptions, SchemaChangeTarget, SchemaChangeType, ServerEvent,
};
#[cfg(feature = "cassandra-cpp-driver-tests")]
use futures::future::join_all;
use futures::Future;
use metrics_util::debugging::DebuggingRecorder;
use rstest::rstest;
use serial_test::serial;
use test_helpers::connection::cassandra::{assert_query_result, ResultValue};
#[cfg(feature = "cassandra-cpp-driver-tests")]
use test_helpers::connection::cassandra::{run_query, CassandraDriver::Datastax};
use test_helpers::connection::cassandra::CassandraDriver::Datastax;
use test_helpers::connection::cassandra::{
CassandraConnection, CassandraDriver, CassandraDriver::CdrsTokio, CassandraDriver::Scylla,
assert_query_result, run_query, CassandraConnection, CassandraDriver,
CassandraDriver::CdrsTokio, CassandraDriver::Scylla, ResultValue,
};
use test_helpers::connection::redis_connection;
use test_helpers::docker_compose::DockerCompose;
use test_helpers::shotover_process::shotover_from_topology_file;
use test_helpers::shotover_process::{shotover_from_topology_file, Count, EventMatcher, Level};
use tokio::time::{timeout, Duration};

mod batch_statements;
Expand Down Expand Up @@ -85,16 +83,17 @@ async fn passthrough_standard(#[case] driver: CassandraDriver) {
async fn passthrough_encode(#[case] driver: CassandraDriver) {
let _compose = DockerCompose::new("example-configs/cassandra-passthrough/docker-compose.yaml");

let _shotover_manager = ShotoverManager::from_topology_file(
"example-configs/cassandra-passthrough/topology-encode.yaml",
);
let shotover =
shotover_from_topology_file("example-configs/cassandra-passthrough/topology-encode.yaml")
.await;

let connection = || CassandraConnection::new("127.0.0.1", 9042, driver);

standard_test_suite(&connection, driver).await;

shotover.shutdown_and_then_consume_events(&[]).await;
}

#[cfg(feature = "cassandra-cpp-driver-tests")]
#[rstest]
#[case::scylla(Scylla)]
//#[case::cdrs(CdrsTokio)] // TODO
Expand All @@ -105,8 +104,7 @@ async fn source_tls_and_single_tls(#[case] driver: CassandraDriver) {
test_helpers::cert::generate_cassandra_test_certs();
let _compose = DockerCompose::new("example-configs/cassandra-tls/docker-compose.yaml");

let _shotover_manager =
ShotoverManager::from_topology_file("example-configs/cassandra-tls/topology.yaml");
let shotover = shotover_from_topology_file("example-configs/cassandra-tls/topology.yaml").await;

let ca_cert = "example-configs/docker-images/cassandra-tls-4.0.6/certs/localhost_CA.crt";

Expand All @@ -125,6 +123,24 @@ async fn source_tls_and_single_tls(#[case] driver: CassandraDriver) {
let connection = || CassandraConnection::new_tls("127.0.0.1", 9043, ca_cert, driver);

standard_test_suite(&connection, driver).await;

if let CassandraDriver::Scylla | CassandraDriver::CdrsTokio = driver {
shotover.shutdown_and_then_consume_events(&[]).await;
} else {
shotover
.shutdown_and_then_consume_events(&[EventMatcher::new()
.with_level(Level::Error)
.with_target("shotover_proxy::server")
.with_message(
r#"connection was unexpectedly terminated
Caused by:
0: Failed to accept TLS connection
1: unexpected EOF"#,
)
.with_count(Count::Any)])
.await;
}
}

#[rstest]
Expand All @@ -137,9 +153,10 @@ async fn cluster_single_rack_v3(#[case] driver: CassandraDriver) {
let _compose = DockerCompose::new("example-configs/cassandra-cluster-v3/docker-compose.yaml");

{
let _shotover_manager = ShotoverManager::from_topology_file(
let shotover = shotover_from_topology_file(
"example-configs/cassandra-cluster-v3/topology-dummy-peers.yaml",
);
)
.await;

let connection = || async {
let mut connection = CassandraConnection::new("127.0.0.1", 9042, driver).await;
Expand All @@ -155,6 +172,8 @@ async fn cluster_single_rack_v3(#[case] driver: CassandraDriver) {

//Check for bugs in cross connection state
native_types::test(&connection().await).await;

shotover.shutdown_and_then_consume_events(&[]).await;
}

cluster::single_rack_v3::test_topology_task(None).await;
Expand All @@ -178,9 +197,8 @@ async fn cluster_single_rack_v4(#[case] driver: CassandraDriver) {
connection
};
{
let _shotover_manager = ShotoverManager::from_topology_file(
"example-configs/cassandra-cluster-v4/topology-encode.yaml",
);
let shotover =
shotover_from_topology_file("example-configs/cassandra-cluster-v4/topology.yaml").await;

standard_test_suite(&connection, driver).await;
cluster::single_rack_v4::test(&connection().await, driver).await;
Expand All @@ -195,14 +213,57 @@ async fn cluster_single_rack_v4(#[case] driver: CassandraDriver) {
native_types::test(&connection2).await;

cluster::single_rack_v4::test_node_going_down(&compose, driver).await;

shotover
.shutdown_and_then_consume_events(&[
EventMatcher::new()
.with_level(Level::Error)
.with_target("shotover_proxy::server")
.with_message(
r#"connection was unexpectedly terminated
Caused by:
0: chain failed to send and/or receive messages
1: CassandraSinkCluster transform failed
2: Failed to create new connection
3: destination 172.16.1.3:9044 did not respond to connection attempt within 3s"#,
)
.with_count(Count::Any),
EventMatcher::new()
.with_level(Level::Error)
.with_target("shotover_proxy::server")
.with_message(
r#"connection was unexpectedly terminated
Caused by:
0: chain failed to send and/or receive messages
1: CassandraSinkCluster transform failed
2: system.local returned unexpected cassandra operation: Error(ErrorBody { message: "Internal shotover error: Broken pipe (os error 32)", ty: Server })"#,
)
.with_count(Count::Any),
EventMatcher::new()
.with_level(Level::Warn)
.with_target("shotover_proxy::transforms::cassandra::sink_cluster")
.with_message(
r#"A successful connection to a control node was made but attempts to connect to these nodes failed first:
* 172.16.1.3:9044:
- Failed to create new connection
- destination 172.16.1.3:9044 did not respond to connection attempt within 3s"#,
)
.with_count(Count::Any),
])
.await;
}

{
let _shotover_manager = ShotoverManager::from_topology_file(
let shotover = shotover_from_topology_file(
"example-configs/cassandra-cluster-v4/topology-dummy-peers.yaml",
);
)
.await;

cluster::single_rack_v4::test_dummy_peers(&connection().await, driver).await;

shotover.shutdown_and_then_consume_events(&[]).await;
}

cluster::single_rack_v4::test_topology_task(None, Some(9044)).await;
Expand Down Expand Up @@ -247,7 +308,6 @@ async fn cluster_multi_rack(#[case] driver: CassandraDriver) {
}

#[cfg(feature = "alpha-transforms")]
#[cfg(feature = "cassandra-cpp-driver-tests")]
#[rstest]
#[case::scylla(Scylla)]
//#[case::cdrs(CdrsTokio)] // TODO
Expand All @@ -260,9 +320,9 @@ async fn source_tls_and_cluster_tls(#[case] driver: CassandraDriver) {

let _compose = DockerCompose::new("example-configs/cassandra-cluster-tls/docker-compose.yaml");
{
let _shotover_manager = ShotoverManager::from_topology_file(
"example-configs/cassandra-cluster-tls/topology.yaml",
);
let shotover =
shotover_from_topology_file("example-configs/cassandra-cluster-tls/topology.yaml")
.await;

{
// Run a quick test straight to Cassandra to check our assumptions that Shotover and Cassandra TLS are behaving exactly the same
Expand All @@ -287,6 +347,24 @@ async fn source_tls_and_cluster_tls(#[case] driver: CassandraDriver) {

standard_test_suite(&connection, driver).await;
cluster::single_rack_v4::test(&connection().await, driver).await;

if let CassandraDriver::Scylla | CassandraDriver::CdrsTokio = driver {
shotover.shutdown_and_then_consume_events(&[]).await;
} else {
shotover
.shutdown_and_then_consume_events(&[EventMatcher::new()
.with_level(Level::Error)
.with_target("shotover_proxy::server")
.with_message(
r#"connection was unexpectedly terminated
Caused by:
0: Failed to accept TLS connection
1: unexpected EOF"#,
)
.with_count(Count::Any)])
.await;
}
}

cluster::single_rack_v4::test_topology_task(Some(ca_cert), None).await;
Expand Down Expand Up @@ -316,7 +394,7 @@ async fn cassandra_redis_cache(#[case] driver: CassandraDriver) {
table::test(&connection).await;
udt::test(&connection).await;
functions::test(&connection).await;
// collections::test // TODO: for some this test case fails here
// collections::test // TODO: for some reason this test case fails here
prepared_statements_simple::test(&connection, connection_creator).await;
batch_statements::test(&connection).await;
cache::test(&connection, &mut redis_connection, &snapshotter).await;
Expand All @@ -333,15 +411,16 @@ async fn protect_transform_local(#[case] driver: CassandraDriver) {
let _compose =
DockerCompose::new("example-configs/cassandra-protect-local/docker-compose.yaml");

let _shotover_manager = ShotoverManager::from_topology_file(
"example-configs/cassandra-protect-local/topology.yaml",
);
let shotover =
shotover_from_topology_file("example-configs/cassandra-protect-local/topology.yaml").await;

let shotover_connection = || CassandraConnection::new("127.0.0.1", 9042, driver);
let direct_connection = CassandraConnection::new("127.0.0.1", 9043, driver).await;

standard_test_suite(shotover_connection, driver).await;
protect::test(&shotover_connection().await, &direct_connection).await;

shotover.shutdown_and_then_consume_events(&[]).await;
}

#[cfg(feature = "alpha-transforms")]
Expand All @@ -355,14 +434,16 @@ async fn protect_transform_aws(#[case] driver: CassandraDriver) {
let _compose = DockerCompose::new("example-configs/cassandra-protect-aws/docker-compose.yaml");
let _compose_aws = DockerCompose::new_moto();

let _shotover_manager =
ShotoverManager::from_topology_file("example-configs/cassandra-protect-aws/topology.yaml");
let shotover =
shotover_from_topology_file("example-configs/cassandra-protect-aws/topology.yaml").await;

let shotover_connection = || CassandraConnection::new("127.0.0.1", 9042, driver);
let direct_connection = CassandraConnection::new("127.0.0.1", 9043, driver).await;

standard_test_suite(shotover_connection, driver).await;
protect::test(&shotover_connection().await, &direct_connection).await;

shotover.shutdown_and_then_consume_events(&[]).await;
}

#[rstest]
Expand All @@ -376,9 +457,9 @@ async fn peers_rewrite_v4(#[case] driver: CassandraDriver) {
"tests/test-configs/cassandra-peers-rewrite/docker-compose-4.0-cassandra.yaml",
);

let _shotover_manager = ShotoverManager::from_topology_file(
"tests/test-configs/cassandra-peers-rewrite/topology.yaml",
);
let shotover =
shotover_from_topology_file("tests/test-configs/cassandra-peers-rewrite/topology.yaml")
.await;

let normal_connection = CassandraConnection::new("127.0.0.1", 9043, driver).await;
let rewrite_port_connection = CassandraConnection::new("127.0.0.1", 9044, driver).await;
Expand Down Expand Up @@ -457,9 +538,10 @@ async fn peers_rewrite_v4(#[case] driver: CassandraDriver) {
.await;
assert_eq!(result[0][5], ResultValue::Int(9044));
}

shotover.shutdown_and_then_consume_events(&[]).await;
}

#[cfg(feature = "cassandra-cpp-driver-tests")]
#[rstest]
//#[case::cdrs(CdrsTokio)] // Disabled due to intermittent failure that only occurs on v3
#[case::scylla(Scylla)]
Expand All @@ -471,9 +553,9 @@ async fn peers_rewrite_v3(#[case] driver: CassandraDriver) {
"tests/test-configs/cassandra-peers-rewrite/docker-compose-3.11-cassandra.yaml",
);

let _shotover_manager = ShotoverManager::from_topology_file(
"tests/test-configs/cassandra-peers-rewrite/topology.yaml",
);
let shotover =
shotover_from_topology_file("tests/test-configs/cassandra-peers-rewrite/topology.yaml")
.await;

let connection = CassandraConnection::new("127.0.0.1", 9044, driver).await;
// run some basic tests to confirm it works as normal
Expand All @@ -490,9 +572,10 @@ async fn peers_rewrite_v3(#[case] driver: CassandraDriver) {
message: "unconfigured table peers_v2".into()
})
);

shotover.shutdown_and_then_consume_events(&[]).await;
}

#[cfg(feature = "cassandra-cpp-driver-tests")]
#[rstest]
//#[case::cdrs(CdrsTokio)] // TODO: cdrs-tokio seems to be sending extra messages triggering the rate limiter
#[case::scylla(Scylla)]
Expand All @@ -503,8 +586,8 @@ async fn request_throttling(#[case] driver: CassandraDriver) {
let _docker_compose =
DockerCompose::new("example-configs/cassandra-passthrough/docker-compose.yaml");

let _shotover_manager =
ShotoverManager::from_topology_file("tests/test-configs/cassandra-request-throttling.yaml");
let shotover =
shotover_from_topology_file("tests/test-configs/cassandra-request-throttling.yaml").await;

let connection = CassandraConnection::new("127.0.0.1", 9042, driver).await;
std::thread::sleep(std::time::Duration::from_secs(1)); // sleep to reset the window and not trigger the rate limiter with client's startup reqeusts
Expand Down Expand Up @@ -591,6 +674,8 @@ async fn request_throttling(#[case] driver: CassandraDriver) {
tokio::time::sleep(std::time::Duration::from_secs(1)).await; // sleep to reset the window

batch_statements::test(&connection).await;

shotover.shutdown_and_then_consume_events(&[]).await;
}

#[rstest]
Expand All @@ -601,8 +686,8 @@ async fn events_keyspace(#[case] driver: CassandraDriver) {
let _docker_compose =
DockerCompose::new("example-configs/cassandra-passthrough/docker-compose.yaml");

let _shotover_manager =
ShotoverManager::from_topology_file("example-configs/cassandra-passthrough/topology.yaml");
let shotover =
shotover_from_topology_file("example-configs/cassandra-passthrough/topology.yaml").await;

let connection = CassandraConnection::new("127.0.0.1", 9042, driver).await;

Expand All @@ -624,4 +709,6 @@ async fn events_keyspace(#[case] driver: CassandraDriver) {
options: SchemaChangeOptions::Keyspace("test_events_ks".to_string())
})
);

shotover.shutdown_and_then_consume_events(&[]).await;
}

0 comments on commit a81399b

Please sign in to comment.