From 6f2834e19612e6ba77d218d0a3eb54165fe13157 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Fri, 17 Mar 2023 14:46:56 +1100 Subject: [PATCH] Add benchmarks for kafka (#1085) --- shotover-proxy/examples/kafka_bench.rs | 27 +++++++++++++++++ shotover-proxy/examples/kafka_flamegraph.rs | 32 ++++++++++++++++++++ test-helpers/src/kafka_producer_perf_test.rs | 19 ++++++++++++ test-helpers/src/latte.rs | 14 ++------- test-helpers/src/lib.rs | 12 ++++++++ 5 files changed, 92 insertions(+), 12 deletions(-) create mode 100644 shotover-proxy/examples/kafka_bench.rs create mode 100644 shotover-proxy/examples/kafka_flamegraph.rs create mode 100644 test-helpers/src/kafka_producer_perf_test.rs diff --git a/shotover-proxy/examples/kafka_bench.rs b/shotover-proxy/examples/kafka_bench.rs new file mode 100644 index 000000000..99e2f56d1 --- /dev/null +++ b/shotover-proxy/examples/kafka_bench.rs @@ -0,0 +1,27 @@ +use test_helpers::docker_compose::DockerCompose; +use test_helpers::kafka_producer_perf_test::run_producer_bench; +use test_helpers::shotover_process::ShotoverProcessBuilder; + +#[tokio::main] +async fn main() { + test_helpers::bench::init(); + + let config_dir = "tests/test-configs/kafka/passthrough"; + { + let _compose = DockerCompose::new(&format!("{}/docker-compose.yaml", config_dir)); + let shotover = + ShotoverProcessBuilder::new_with_topology(&format!("{}/topology.yaml", config_dir)) + .start() + .await; + + println!("Benching Shotover ..."); + run_producer_bench("[localhost]:9192"); + + shotover.shutdown_and_then_consume_events(&[]).await; + } + + // restart the docker container to avoid running out of disk space + let _compose = DockerCompose::new(&format!("{}/docker-compose.yaml", config_dir)); + println!("\nBenching Direct Kafka ..."); + run_producer_bench("[localhost]:9092"); +} diff --git a/shotover-proxy/examples/kafka_flamegraph.rs b/shotover-proxy/examples/kafka_flamegraph.rs new file mode 100644 index 000000000..76c2b6a1b --- /dev/null +++ b/shotover-proxy/examples/kafka_flamegraph.rs @@ -0,0 +1,32 @@ +use test_helpers::docker_compose::DockerCompose; +use test_helpers::flamegraph::Perf; +use test_helpers::kafka_producer_perf_test::run_producer_bench; +use test_helpers::shotover_process::ShotoverProcessBuilder; + +// To get useful results you will need to modify the Cargo.toml like: +// [profile.release] +// #lto = "fat" +// codegen-units = 1 +// debug = true + +#[tokio::main] +async fn main() { + test_helpers::bench::init(); + let config_dir = "tests/test-configs/kafka/passthrough"; + { + let _compose = DockerCompose::new(&format!("{}/docker-compose.yaml", config_dir)); + + let shotover = + ShotoverProcessBuilder::new_with_topology(&format!("{}/topology.yaml", config_dir)) + .start() + .await; + + let perf = Perf::new(shotover.child.as_ref().unwrap().id().unwrap()); + + println!("Benching Shotover ..."); + run_producer_bench("[localhost]:9192"); + + shotover.shutdown_and_then_consume_events(&[]).await; + perf.flamegraph(); + } +} diff --git a/test-helpers/src/kafka_producer_perf_test.rs b/test-helpers/src/kafka_producer_perf_test.rs new file mode 100644 index 000000000..1ff87fc3d --- /dev/null +++ b/test-helpers/src/kafka_producer_perf_test.rs @@ -0,0 +1,19 @@ +use crate::run_command_to_stdout; + +pub fn run_producer_bench(address_bench: &str) { + run_command_to_stdout( + "kafka-producer-perf-test.sh", + &[ + "--producer-props", + &format!("bootstrap.servers={address_bench}"), + "--record-size", + "1000", + "--throughput", + "-1", + "--num-records", + "5000000", + "--topic", + "foo", + ], + ); +} diff --git a/test-helpers/src/latte.rs b/test-helpers/src/latte.rs index c6179724f..1f8a1226e 100644 --- a/test-helpers/src/latte.rs +++ b/test-helpers/src/latte.rs @@ -1,3 +1,5 @@ +use crate::run_command_to_stdout; + // TODO: Shelling out directly like this is just for experimenting. // Either: // * get access to latte as a crate @@ -85,15 +87,3 @@ impl Latte { run_command_to_stdout("latte", &["show", file_b, "-b", file_a]); } } - -/// unlike crate::docker_compose::run_command stdout of the command is sent to the stdout of the application -fn run_command_to_stdout(command: &str, args: &[&str]) { - assert!( - std::process::Command::new(command) - .args(args) - .status() - .unwrap() - .success(), - "Failed to run: {command} {args:?}" - ); -} diff --git a/test-helpers/src/lib.rs b/test-helpers/src/lib.rs index e7db9dc29..5d7180b9e 100644 --- a/test-helpers/src/lib.rs +++ b/test-helpers/src/lib.rs @@ -3,6 +3,7 @@ pub mod cert; pub mod connection; pub mod docker_compose; pub mod flamegraph; +pub mod kafka_producer_perf_test; pub mod latte; pub mod lazy; pub mod metrics; @@ -26,3 +27,14 @@ pub fn try_wait_for_socket_to_open(address: &str, port: u16) -> Result<()> { } Ok(()) } + +fn run_command_to_stdout(command: &str, args: &[&str]) { + assert!( + std::process::Command::new(command) + .args(args) + .status() + .unwrap() + .success(), + "Failed to run: {command} {args:?}" + ); +}