Skip to content

Commit

Permalink
Initial kafka integration test (#1013)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Feb 3, 2023
1 parent a6ea8be commit bf89a42
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 3 deletions.
54 changes: 54 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions shotover-proxy/Cargo.toml
Expand Up @@ -93,6 +93,7 @@ nix = "0.26.0"
reqwest = "0.11.6"
cdrs-tokio = { git = "https://github.com/krojew/cdrs-tokio", branch = "8.0-dev" }
rstest = "0.16.0"
rdkafka = { version = "0.29", features = ["cmake-build"] }

[[bench]]
name = "benches"
Expand Down
11 changes: 11 additions & 0 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
@@ -0,0 +1,11 @@
use serial_test::serial;
use test_helpers::docker_compose::DockerCompose;

mod test_cases;

#[tokio::test]
#[serial]
async fn basic() {
let _docker_compose = DockerCompose::new("tests/test-configs/kafka-simple/docker-compose.yaml");
test_cases::basic().await;
}
32 changes: 32 additions & 0 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -0,0 +1,32 @@
use rdkafka::config::ClientConfig;
use rdkafka::message::{Header, OwnedHeaders};
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;

async fn produce(brokers: &str, topic_name: &str) {
let producer: &FutureProducer = &ClientConfig::new()
.set("bootstrap.servers", brokers)
.set("message.timeout.ms", "5000")
.create()
.expect("Producer creation error");

let delivery_status = producer
.send(
FutureRecord::to(topic_name)
.payload("Message")
.key("Key")
.headers(OwnedHeaders::new().insert(Header {
key: "header_key",
value: Some("header_value"),
})),
Duration::from_secs(0),
)
.await
.unwrap();

assert_eq!(delivery_status, (0, 0));
}

pub async fn basic() {
produce("localhost:9092", "foo").await;
}
5 changes: 3 additions & 2 deletions shotover-proxy/tests/lib.rs
@@ -1,6 +1,7 @@
mod cassandra_int_tests;
pub mod codec;
mod examples;
mod kafka_int_tests;
mod redis_int_tests;
pub mod runner;
pub mod transforms;
mod runner;
mod transforms;
19 changes: 19 additions & 0 deletions shotover-proxy/tests/test-configs/kafka-simple/docker-compose.yaml
@@ -0,0 +1,19 @@
version: "3"
services:
kafka:
image: 'bitnami/kafka:3.3.2'
ports:
- '9092:9092'
environment:
- KAFKA_ENABLE_KRAFT=yes
- KAFKA_CFG_PROCESS_ROLES=broker,controller
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_BROKER_ID=1
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@127.0.0.1:9093
- ALLOW_PLAINTEXT_LISTENER=yes
volumes:
- type: tmpfs
target: /bitnami/kafka
1 change: 1 addition & 0 deletions test-helpers/Cargo.toml
Expand Up @@ -33,3 +33,4 @@ tokio-openssl = "0.6.2"
inferno = "0.11.13"
itertools = "0.10.1"
reqwest = "0.11.6"
tracing-subscriber = "0.3.16"
16 changes: 15 additions & 1 deletion test-helpers/src/docker_compose.rs
Expand Up @@ -9,6 +9,15 @@ use std::time::{self, Duration};
use std::{env, path::Path};
use subprocess::{Exec, Redirection};
use tracing::trace;
use tracing_subscriber::fmt::TestWriter;

fn setup_tracing_subscriber_for_test_logic() {
tracing_subscriber::fmt()
.with_writer(TestWriter::new())
.with_env_filter("warn")
.try_init()
.ok();
}

/// Runs a command and returns the output as a string.
///
Expand All @@ -17,7 +26,6 @@ use tracing::trace;
/// # Arguments
/// * `command` - The system command to run
/// * `args` - An array of command line arguments for the command
///
pub fn run_command(command: &str, args: &[&str]) -> Result<String> {
trace!("executing {}", command);
let data = Exec::cmd(command)
Expand Down Expand Up @@ -59,6 +67,8 @@ impl DockerCompose {
/// # Panics
/// * Will panic if docker-compose is not installed
pub fn new(file_path: &str) -> Self {
setup_tracing_subscriber_for_test_logic();

if let Err(ErrorKind::NotFound) = Command::new("docker-compose")
.output()
.map_err(|e| e.kind())
Expand Down Expand Up @@ -164,6 +174,10 @@ impl DockerCompose {
name: "shotover-int-tests/cassandra:3.11.13",
log_regex_to_wait_for: r"Startup complete",
},
Image {
name: "bitnami/kafka:3.3.2",
log_regex_to_wait_for: r"Kafka Server started",
},
];

let services: Vec<Service> = self
Expand Down

0 comments on commit bf89a42

Please sign in to comment.