Skip to content

Commit

Permalink
Give ShotoverProcess powerful event assertions by parsing JSON events
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Dec 13, 2022
1 parent 8db5a4c commit 7c820f6
Show file tree
Hide file tree
Showing 15 changed files with 783 additions and 181 deletions.
21 changes: 20 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Expand Up @@ -2,6 +2,7 @@
members = [
"shotover-proxy",
"test-helpers",
"tokio-bin-process",
]

# https://deterministic.space/high-performance-rust.html
Expand Down
11 changes: 6 additions & 5 deletions shotover-proxy/examples/cassandra_bench.rs
@@ -1,6 +1,6 @@
use clap::Parser;
use test_helpers::docker_compose::DockerCompose;
use test_helpers::shotover_process::ShotoverProcess;
use test_helpers::shotover_process::shotover_from_topology_file;

/// e.g.
/// cargo run --release --example cassandra_bench -- --config-dir example-configs/cassandra-passthrough -r 1000
Expand All @@ -16,7 +16,8 @@ pub struct Args {
pub rate: u64,
}

fn main() {
#[tokio::main]
async fn main() {
if env!("PROFILE") != "release" {
println!("Need to run with --release flag");
return;
Expand All @@ -33,15 +34,15 @@ fn main() {
let _compose = DockerCompose::new(&format!("{}/docker-compose.yaml", args.config_dir));

// Uses ShotoverProcess instead of ShotoverManager for a more accurate benchmark
let shotover_manager =
ShotoverProcess::from_topology_file(&format!("{}/topology.yaml", args.config_dir));
let shotover =
shotover_from_topology_file(&format!("{}/topology.yaml", args.config_dir)).await;

println!("Benching Shotover ...");
bench_read(&latte, "localhost:9043", "localhost:9042");
println!("Benching Direct Cassandra ...");
bench_read(&latte, "localhost:9043", "localhost:9043");

shotover_manager.shutdown_and_assert_success();
shotover.shutdown_and_then_consume_events(&[]).await;
}

println!("Direct Cassandra (A) vs Shotover (B)");
Expand Down
14 changes: 11 additions & 3 deletions shotover-proxy/src/runner.rs
Expand Up @@ -120,10 +120,15 @@ impl Runner {
pub fn run_block(self) -> Result<()> {
let (trigger_shutdown_tx, trigger_shutdown_rx) = watch::channel(false);

// We need to block on this part to ensure that we immediately register these signals.
// Otherwise if we included signal creation in the below spawned task we would be at the mercy of whenever tokio decides to start running the task.
let (mut interrupt, mut terminate) = self.runtime_handle.block_on(async {
(
signal(SignalKind::interrupt()).unwrap(),
signal(SignalKind::terminate()).unwrap(),
)
});
self.runtime_handle.spawn(async move {
let mut interrupt = signal(SignalKind::interrupt()).unwrap();
let mut terminate = signal(SignalKind::terminate()).unwrap();

tokio::select! {
_ = interrupt.recv() => {
info!("received SIGINT");
Expand Down Expand Up @@ -223,6 +228,9 @@ impl TracingState {
handle
}
};
if let LogFormat::Json = format {
crate::tracing_panic_handler::setup();
}

// When in json mode we need to process panics as events instead of printing directly to stdout.
// This is so that:
Expand Down
7 changes: 5 additions & 2 deletions shotover-proxy/tests/cassandra_int_tests/mod.rs
Expand Up @@ -18,6 +18,7 @@ use metrics_util::debugging::DebuggingRecorder;
use rstest::rstest;
use serial_test::serial;
use test_helpers::docker_compose::DockerCompose;
use test_helpers::shotover_process::shotover_from_topology_file;
use tokio::time::{sleep, timeout, Duration};

mod batch_statements;
Expand Down Expand Up @@ -64,12 +65,14 @@ where
async fn passthrough_standard(#[case] driver: CassandraDriver) {
let _compose = DockerCompose::new("example-configs/cassandra-passthrough/docker-compose.yaml");

let _shotover_manager =
ShotoverManager::from_topology_file("example-configs/cassandra-passthrough/topology.yaml");
let shotover =
shotover_from_topology_file("example-configs/cassandra-passthrough/topology.yaml").await;

let connection = || CassandraConnection::new("127.0.0.1", 9042, driver);

standard_test_suite(&connection, driver).await;

shotover.shutdown_and_then_consume_events(&[]).await;
}

#[cfg(feature = "alpha-transforms")]
Expand Down
132 changes: 87 additions & 45 deletions shotover-proxy/tests/runner/runner_int_tests.rs
@@ -1,8 +1,10 @@
use crate::helpers::ShotoverManager;
use serial_test::serial;
use std::any::Any;

use crate::helpers::ShotoverManager;
use test_helpers::shotover_process::ShotoverProcess;
use test_helpers::shotover_process::{
shotover_from_topology_file, shotover_from_topology_file_fail_to_startup, Count, EventMatcher,
Level,
};

#[tokio::test(flavor = "multi_thread")]
#[serial]
Expand All @@ -28,68 +30,108 @@ fn test_runtime_create() {
assert!(shotover_manager.runtime.is_some());
}

#[test]
#[tokio::test]
#[serial]
fn test_early_shutdown_cassandra_source() {
std::mem::drop(ShotoverManager::from_topology_file(
"example-configs/null-cassandra/topology.yaml",
));
async fn test_early_shutdown_cassandra_source() {
shotover_from_topology_file("example-configs/null-cassandra/topology.yaml")
.await
.shutdown_and_then_consume_events(&[])
.await;
}

#[test]
#[tokio::test]
#[serial]
fn test_shotover_responds_sigterm() {
let shotover_process =
ShotoverProcess::from_topology_file("example-configs/null-redis/topology.yaml");
shotover_process.signal(nix::sys::signal::Signal::SIGTERM);
async fn test_shotover_responds_sigterm() {
// Ensure it isnt reliant on timing
for _ in 0..1000 {
let shotover_process =
shotover_from_topology_file("example-configs/null-redis/topology.yaml").await;
shotover_process.signal(nix::sys::signal::Signal::SIGTERM);

let wait_output = shotover_process.wait();
assert_eq!(wait_output.exit_code, 0);
if !wait_output.stdout.contains("received SIGTERM") {
panic!(
"stdout does not contain 'received SIGTERM'. Instead was: {}",
wait_output.stdout
let events = shotover_process.consume_remaining_events(&[]).await;
events.contains(
&EventMatcher::new()
.with_level(Level::Info)
.with_target("shotover_proxy::runner")
.with_message("received SIGTERM"),
);
}
}

#[test]
#[tokio::test]
#[serial]
fn test_shotover_responds_sigint() {
async fn test_shotover_responds_sigint() {
let shotover_process =
ShotoverProcess::from_topology_file("example-configs/null-redis/topology.yaml");
shotover_from_topology_file("example-configs/null-redis/topology.yaml").await;
shotover_process.signal(nix::sys::signal::Signal::SIGINT);

let wait_output = shotover_process.wait();
assert_eq!(wait_output.exit_code, 0);
if !wait_output.stdout.contains("received SIGINT") {
panic!(
"stdout does not contain 'received SIGINT'. Instead was: {}",
wait_output.stdout
);
}
let events = shotover_process.consume_remaining_events(&[]).await;
events.contains(
&EventMatcher::new()
.with_level(Level::Info)
.with_target("shotover_proxy::runner")
.with_message("received SIGINT"),
);
}

#[test]
#[should_panic]
#[tokio::test]
#[serial]
fn test_shotover_shutdown_when_invalid_topology_non_terminating_last() {
let _shotover_manager =
ShotoverManager::from_topology_file("tests/test-configs/invalid_non_terminating_last.yaml");
async fn test_shotover_shutdown_when_invalid_topology_non_terminating_last() {
shotover_from_topology_file_fail_to_startup(
"tests/test-configs/invalid_non_terminating_last.yaml",
&[],
)
.await;
}

#[test]
#[should_panic]
#[tokio::test]
#[serial]
fn test_shotover_shutdown_when_invalid_topology_terminating_not_last() {
let _shotover_manager =
ShotoverManager::from_topology_file("tests/test-configs/invalid_terminating_not_last.yaml");
async fn test_shotover_shutdown_when_invalid_topology_terminating_not_last() {
shotover_from_topology_file_fail_to_startup(
"tests/test-configs/invalid_terminating_not_last.yaml",
&[],
)
.await;
}

#[test]
#[should_panic]
#[tokio::test]
#[serial]
fn test_shotover_shutdown_when_topology_invalid_topology_subchains() {
let _shotover_manager =
ShotoverManager::from_topology_file("tests/test-configs/invalid_subchains.yaml");
async fn test_shotover_shutdown_when_topology_invalid_topology_subchains() {
shotover_from_topology_file_fail_to_startup(
"tests/test-configs/invalid_subchains.yaml",
&[
EventMatcher::new().with_level(Level::Error)
.with_target("shotover_proxy::runner")
.with_message(r#"Topology errors
a_first_chain:
Terminating transform "Null" is not last in chain. Terminating transform must be last in chain.
Terminating transform "Null" is not last in chain. Terminating transform must be last in chain.
Non-terminating transform "DebugPrinter" is last in chain. Last transform must be terminating.
b_second_chain:
ConsistentScatter:
a_chain_1:
Terminating transform "Null" is not last in chain. Terminating transform must be last in chain.
Non-terminating transform "DebugPrinter" is last in chain. Last transform must be terminating.
b_chain_2:
Terminating transform "Null" is not last in chain. Terminating transform must be last in chain.
c_chain_3:
ConsistentScatter:
sub_chain_2:
Terminating transform "Null" is not last in chain. Terminating transform must be last in chain.
"#),
EventMatcher::new().with_level(Level::Warn)
.with_target("shotover_proxy::transforms::distributed::consistent_scatter")
.with_message("Using this transform is considered unstable - Does not work with REDIS pipelines")
.with_count(Count::Times(2)),
// TODO: Investigate these
EventMatcher::new().with_level(Level::Error)
.with_message("failed response Couldn't send message to wrapped chain SendError(BufferedChainMessages { local_addr: 127.0.0.1:10000, messages: [], return_chan: Some(Sender { inner: Some(Inner { state: State { is_complete: false, is_closed: false, is_rx_task_set: false, is_tx_task_set: false } }) }) })")
.with_count(Count::Any),
EventMatcher::new().with_level(Level::Error)
.with_target("shotover_proxy::transforms::distributed::consistent_scatter")
.with_message("failed response channel closed")
.with_count(Count::Any),
],
)
.await;
}
3 changes: 2 additions & 1 deletion test-helpers/Cargo.toml
Expand Up @@ -11,7 +11,8 @@ license = "Apache-2.0"
tracing = "0.1.15"
subprocess = "0.2.7"
anyhow = "1.0.42"
nix = "0.26.0"
rcgen = "0.10.0"
serde_yaml = "0.9.0"
regex = "1.7.0"
tokio = { version = "1.21.1", features = ["full", "macros"] }
tokio-bin-process = { path = "../tokio-bin-process" }
5 changes: 5 additions & 0 deletions test-helpers/src/lib.rs
@@ -1,3 +1,8 @@
#![allow(clippy::derive_partial_eq_without_eq)]
// Accidentally printing would break json log output
#![deny(clippy::print_stdout)]
#![deny(clippy::print_stderr)]

pub mod cert;
pub mod docker_compose;
pub mod lazy;
Expand Down

0 comments on commit 7c820f6

Please sign in to comment.