diff --git a/Cargo.toml b/Cargo.toml index 19e2f52..1d27c40 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,4 +19,6 @@ env_logger = "0.11" rdkafka = { version = "0.36", features = ["cmake-build"] } rusty_ulid = "2.0" tokio = { version = "1", features = ["rt", "macros"]} -ctor = { version = "0.2"} \ No newline at end of file +ctor = { version = "0.2" } +serial_test = { version = "3.1" } +reqwest = { version = "0.12" } \ No newline at end of file diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 42e3fbf..7447f67 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -2,40 +2,20 @@ //! # Common For tests //! -//#![allow(dead_code)] - -use rdkafka::client::ClientContext; use rdkafka::config::ClientConfig; -use rdkafka::consumer::ConsumerContext; -use rdkafka::error::KafkaResult; use rdkafka::message::ToBytes; use rdkafka::producer::{FutureProducer, FutureRecord}; -use rdkafka::statistics::Statistics; -use rdkafka::TopicPartitionList; use std::collections::HashMap; -use std::env; - use std::time::Duration; /// Taken from https://github.com/fede1024/rust-rdkafka/blob/master/tests/utils.rs with some slight modifications and updates /// credit to rdkafka -pub fn get_bootstrap_server() -> String { - env::var("KAFKA_HOST").unwrap_or_else(|_| "localhost:9092".to_owned()) -} - -pub struct ProducerTestContext { - _some_data: i64, -} - -impl ClientContext for ProducerTestContext { - fn stats(&self, _: Statistics) {} // Don't print stats -} - /// Produce the specified count of messages to the topic and partition specified. A map /// of (partition, offset) -> message id will be returned. It panics if any error is encountered /// while populating the topic. pub async fn populate_topic( + bootstrap_server: &str, topic_name: &str, count: i32, value_fn: &P, @@ -49,16 +29,14 @@ where J: ToBytes, Q: ToBytes, { - let prod_context = ProducerTestContext { _some_data: 1234 }; - // Produce some messages let producer = &ClientConfig::new() - .set("bootstrap.servers", get_bootstrap_server().as_str()) + .set("bootstrap.servers", bootstrap_server) .set("statistics.interval.ms", "500") .set("api.version.request", "true") //.set("debug", "all") .set("message.timeout.ms", "10000") - .create_with_context::>(prod_context) + .create::>() .expect("Producer creation error"); let futures = (0..count) @@ -105,24 +83,6 @@ pub fn random_topic_name() -> String { rusty_ulid::generate_ulid_string() } -pub struct ConsumerTestContext { - pub _n: i64, // Add data for memory access validation -} - -impl ClientContext for ConsumerTestContext { - // Access stats - fn stats(&self, stats: Statistics) { - let stats_str = format!("{:?}", stats); - log::info!("Stats received: {} bytes", stats_str.len()); - } -} - -impl ConsumerContext for ConsumerTestContext { - fn commit_callback(&self, result: KafkaResult<()>, _offsets: &TopicPartitionList) { - log::info!("Committing offsets: {:?}", result); - } -} - #[cfg(test)] #[ctor::ctor] fn init() { diff --git a/tests/create_topic.rs b/tests/create_topic.rs deleted file mode 100644 index 5ae4fcf..0000000 --- a/tests/create_topic.rs +++ /dev/null @@ -1,27 +0,0 @@ -mod common; - -#[cfg(test)] -mod test { - use crate::common::*; - use log::info; - use testcontainers_redpanda_rs::*; - - #[tokio::test] - async fn should_start_redpanda_server_crate_topic_send_messages_to_partition() { - let container = Redpanda::latest(); - - let server_node = container.start().await; - let bootstrap_servers = format!("localhost:{}", server_node.get_host_port_ipv4(REDPANDA_PORT).await); - - // if topic has only one partition this part is optional - // it will be automatically created when client connects - let test_topic_name = &random_topic_name(); - server_node.exec(Redpanda::cmd_create_topic(test_topic_name, 3)).await; - - info!("bootstrap servers: {}", bootstrap_servers); - std::env::set_var("KAFKA_HOST", &bootstrap_servers); - - log::info!("populating topic: [{}] ...", test_topic_name); - populate_topic(&test_topic_name, 10, &value_fn, &key_fn, Some(2), None).await; - } -} diff --git a/tests/redpanda.rs b/tests/redpanda.rs deleted file mode 100644 index 76194c0..0000000 --- a/tests/redpanda.rs +++ /dev/null @@ -1,29 +0,0 @@ -mod common; -#[cfg(test)] -mod test { - use crate::common::*; - use log::info; - use testcontainers_redpanda_rs::*; - - #[tokio::test] - async fn should_start_redpanda_server_send_messages() { - let container = Redpanda::latest(); - - let server_node = container.start().await; - - let bootstrap_servers = format!( - "{}:{}", - "localhost", - server_node.get_host_port_ipv4(REDPANDA_PORT).await - ); - - info!("bootstrap servers: {}", bootstrap_servers); - std::env::set_var("KAFKA_HOST", &bootstrap_servers); - - assert!(bootstrap_servers.len() > 10); - let test_topic_name = random_topic_name(); - - log::info!("populating topic: [{}] ...", test_topic_name); - populate_topic(&test_topic_name, 10, &value_fn, &key_fn, Some(0), None).await; - } -} diff --git a/tests/verification.rs b/tests/verification.rs new file mode 100644 index 0000000..116787c --- /dev/null +++ b/tests/verification.rs @@ -0,0 +1,80 @@ +mod common; +#[cfg(test)] +mod test { + use crate::common::*; + use testcontainers_redpanda_rs::*; + + #[tokio::test] + #[serial_test::serial] + async fn should_start_redpanda_server_send_messages() { + let container = Redpanda::latest(); + + let instance = container.start().await; + let bootstrap_servers = format!("localhost:{}", instance.get_host_port_ipv4(REDPANDA_PORT).await); + log::info!("bootstrap servers: {}", bootstrap_servers); + + let test_topic_name = random_topic_name(); + log::info!("populating topic: [{}] ...", test_topic_name); + populate_topic(&bootstrap_servers, &test_topic_name, 10, &value_fn, &key_fn, None, None).await; + } + + #[tokio::test] + #[serial_test::serial] + async fn should_start_redpanda_server_crate_topic_send_messages_to_partition() { + let container = Redpanda::latest(); + + let instance = container.start().await; + let bootstrap_servers = format!("localhost:{}", instance.get_host_port_ipv4(REDPANDA_PORT).await); + + // if topic has only one partition this part is optional + // it will be automatically created when client connects + let test_topic_name = &random_topic_name(); + log::info!("creating topic: [{}] ...", test_topic_name); + instance.exec(Redpanda::cmd_create_topic(test_topic_name, 3)).await; + + log::info!("bootstrap servers: {}", bootstrap_servers); + + log::info!("populating topic: [{}] ...", test_topic_name); + populate_topic( + &bootstrap_servers, + &test_topic_name, + 10, + &value_fn, + &key_fn, + Some(2), + None, + ) + .await; + } + + #[tokio::test] + #[serial_test::serial] + async fn should_expose_admin_api() { + let container = Redpanda::latest(); + + let instance = container.start().await; + let address_admin_api = format!("http://localhost:{}/v1", instance.get_host_port_ipv4(ADMIN_PORT).await); + + let response = reqwest::get(address_admin_api).await.expect("admin http response"); + + assert_eq!(200, response.status().as_u16()); + } + + #[tokio::test] + #[serial_test::serial] + async fn should_expose_schema_registry_api() { + let container = Redpanda::latest(); + + let instance = container.start().await; + let address_schema_registry = format!( + "http://localhost:{}/v1", + instance.get_host_port_ipv4(SCHEMA_REGISTRY_PORT).await + ); + + let response = reqwest::get(address_schema_registry) + .await + .expect("admin http response"); + + assert_eq!(200, response.status().as_u16()); + } +}