Skip to content

Commit

Permalink
update tests to cover HTTP APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
milenkovicm committed Apr 30, 2024
1 parent 6305727 commit b73e349
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 100 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,6 @@ env_logger = "0.11"
rdkafka = { version = "0.36", features = ["cmake-build"] }
rusty_ulid = "2.0"
tokio = { version = "1", features = ["rt", "macros"]}
ctor = { version = "0.2"}
ctor = { version = "0.2" }
serial_test = { version = "3.1" }
reqwest = { version = "0.12" }
46 changes: 3 additions & 43 deletions tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,20 @@
//! # Common For tests
//!

//#![allow(dead_code)]

use rdkafka::client::ClientContext;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::ConsumerContext;
use rdkafka::error::KafkaResult;
use rdkafka::message::ToBytes;
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::statistics::Statistics;
use rdkafka::TopicPartitionList;
use std::collections::HashMap;
use std::env;

use std::time::Duration;

/// Taken from https://github.com/fede1024/rust-rdkafka/blob/master/tests/utils.rs with some slight modifications and updates
/// credit to rdkafka

pub fn get_bootstrap_server() -> String {
env::var("KAFKA_HOST").unwrap_or_else(|_| "localhost:9092".to_owned())
}

pub struct ProducerTestContext {
_some_data: i64,
}

impl ClientContext for ProducerTestContext {
fn stats(&self, _: Statistics) {} // Don't print stats
}

/// Produce the specified count of messages to the topic and partition specified. A map
/// of (partition, offset) -> message id will be returned. It panics if any error is encountered
/// while populating the topic.
pub async fn populate_topic<P, K, J, Q>(
bootstrap_server: &str,
topic_name: &str,
count: i32,
value_fn: &P,
Expand All @@ -49,16 +29,14 @@ where
J: ToBytes,
Q: ToBytes,
{
let prod_context = ProducerTestContext { _some_data: 1234 };

// Produce some messages
let producer = &ClientConfig::new()
.set("bootstrap.servers", get_bootstrap_server().as_str())
.set("bootstrap.servers", bootstrap_server)
.set("statistics.interval.ms", "500")
.set("api.version.request", "true")
//.set("debug", "all")
.set("message.timeout.ms", "10000")
.create_with_context::<ProducerTestContext, FutureProducer<_>>(prod_context)
.create::<FutureProducer<_>>()
.expect("Producer creation error");

let futures = (0..count)
Expand Down Expand Up @@ -105,24 +83,6 @@ pub fn random_topic_name() -> String {
rusty_ulid::generate_ulid_string()
}

pub struct ConsumerTestContext {
pub _n: i64, // Add data for memory access validation
}

impl ClientContext for ConsumerTestContext {
// Access stats
fn stats(&self, stats: Statistics) {
let stats_str = format!("{:?}", stats);
log::info!("Stats received: {} bytes", stats_str.len());
}
}

impl ConsumerContext for ConsumerTestContext {
fn commit_callback(&self, result: KafkaResult<()>, _offsets: &TopicPartitionList) {
log::info!("Committing offsets: {:?}", result);
}
}

#[cfg(test)]
#[ctor::ctor]
fn init() {
Expand Down
27 changes: 0 additions & 27 deletions tests/create_topic.rs

This file was deleted.

29 changes: 0 additions & 29 deletions tests/redpanda.rs

This file was deleted.

80 changes: 80 additions & 0 deletions tests/verification.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
mod common;
#[cfg(test)]
mod test {
use crate::common::*;
use testcontainers_redpanda_rs::*;

#[tokio::test]
#[serial_test::serial]
async fn should_start_redpanda_server_send_messages() {
let container = Redpanda::latest();

let instance = container.start().await;
let bootstrap_servers = format!("localhost:{}", instance.get_host_port_ipv4(REDPANDA_PORT).await);
log::info!("bootstrap servers: {}", bootstrap_servers);

let test_topic_name = random_topic_name();
log::info!("populating topic: [{}] ...", test_topic_name);
populate_topic(&bootstrap_servers, &test_topic_name, 10, &value_fn, &key_fn, None, None).await;
}

#[tokio::test]
#[serial_test::serial]
async fn should_start_redpanda_server_crate_topic_send_messages_to_partition() {
let container = Redpanda::latest();

let instance = container.start().await;
let bootstrap_servers = format!("localhost:{}", instance.get_host_port_ipv4(REDPANDA_PORT).await);

// if topic has only one partition this part is optional
// it will be automatically created when client connects
let test_topic_name = &random_topic_name();
log::info!("creating topic: [{}] ...", test_topic_name);
instance.exec(Redpanda::cmd_create_topic(test_topic_name, 3)).await;

log::info!("bootstrap servers: {}", bootstrap_servers);

log::info!("populating topic: [{}] ...", test_topic_name);
populate_topic(
&bootstrap_servers,
&test_topic_name,
10,
&value_fn,
&key_fn,
Some(2),
None,
)
.await;
}

#[tokio::test]
#[serial_test::serial]
async fn should_expose_admin_api() {
let container = Redpanda::latest();

let instance = container.start().await;
let address_admin_api = format!("http://localhost:{}/v1", instance.get_host_port_ipv4(ADMIN_PORT).await);

let response = reqwest::get(address_admin_api).await.expect("admin http response");

assert_eq!(200, response.status().as_u16());
}

#[tokio::test]
#[serial_test::serial]
async fn should_expose_schema_registry_api() {
let container = Redpanda::latest();

let instance = container.start().await;
let address_schema_registry = format!(
"http://localhost:{}/v1",
instance.get_host_port_ipv4(SCHEMA_REGISTRY_PORT).await
);

let response = reqwest::get(address_schema_registry)
.await
.expect("admin http response");

assert_eq!(200, response.status().as_u16());
}
}

0 comments on commit b73e349

Please sign in to comment.