Skip to content

Commit

Permalink
code and doc cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
milenkovicm committed Apr 30, 2024
1 parent d966806 commit 6305727
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 64 deletions.
13 changes: 7 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@

Unofficial testcontainer for [Redpanda](https://redpanda.com).

> [!NOTE]
>
> - version 0.2.x supports `testcontainer` 0.16
> - version 0.1.x supports `testcontainer` 0.15
Note:

- version `0.2.x` supports `testcontainer` `0.16`
- version `0.1.x` supports `testcontainer` `0.15`

Add dependency:

Expand Down Expand Up @@ -38,5 +38,6 @@ async fn main() {
}
```

> [!WARNING]
> It will use default kafka ports and only one test can at any time on given host.
Limitations:

- It will use default kafka ports and only single test can run on given host.
58 changes: 2 additions & 56 deletions tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@
//! # Common For tests
//!

#![allow(dead_code)]
//#![allow(dead_code)]

use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use rdkafka::client::ClientContext;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::ConsumerContext;
Expand All @@ -21,18 +20,6 @@ use std::time::Duration;
/// Taken from https://github.com/fede1024/rust-rdkafka/blob/master/tests/utils.rs with some slight modifications and updates
/// credit to rdkafka

pub fn rand_test_topic() -> String {
format!("__test_{}", random_topic_name())
}

pub fn rand_test_group() -> String {
format!("__test_{}", random_topic_name())
}

pub fn rand_test_transactional_id() -> String {
format!("__test_{}", random_topic_name())
}

pub fn get_bootstrap_server() -> String {
env::var("KAFKA_HOST").unwrap_or_else(|_| "localhost:9092".to_owned())
}
Expand All @@ -45,21 +32,6 @@ impl ClientContext for ProducerTestContext {
fn stats(&self, _: Statistics) {} // Don't print stats
}

pub async fn create_topic(name: &str, partitions: i32) {
let client: AdminClient<_> = consumer_config("create_topic", None)
.into_iter()
.collect::<ClientConfig>()
.create()
.unwrap();
client
.create_topics(
&[NewTopic::new(name, partitions, TopicReplication::Fixed(1))],
&AdminOptions::new(),
)
.await
.unwrap();
}

/// Produce the specified count of messages to the topic and partition specified. A map
/// of (partition, offset) -> message id will be returned. It panics if any error is encountered
/// while populating the topic.
Expand All @@ -84,7 +56,7 @@ where
.set("bootstrap.servers", get_bootstrap_server().as_str())
.set("statistics.interval.ms", "500")
.set("api.version.request", "true")
.set("debug", "all")
//.set("debug", "all")
.set("message.timeout.ms", "10000")
.create_with_context::<ProducerTestContext, FutureProducer<_>>(prod_context)
.expect("Producer creation error");
Expand Down Expand Up @@ -151,32 +123,6 @@ impl ConsumerContext for ConsumerTestContext {
}
}

pub fn consumer_config<'a>(
group_id: &'a str,
config_overrides: Option<HashMap<&'a str, &'a str>>,
) -> HashMap<String, String> {
let mut config: HashMap<String, String> = HashMap::new();

config.insert("group.id".into(), group_id.into());
config.insert("client.id".into(), "datafusion-streams".into());
config.insert("bootstrap.servers".into(), get_bootstrap_server());
config.insert("enable.partition.eof".into(), "true".into());
config.insert("session.timeout.ms".into(), "6000".into());
config.insert("enable.auto.commit".into(), "true".into());
config.insert("statistics.interval.ms".into(), "500".into());
config.insert("api.version.request".into(), "true".into());
config.insert("debug".into(), "all".into());
config.insert("auto.offset.reset".into(), "earliest".into());

if let Some(overrides) = config_overrides {
for (key, value) in overrides {
config.insert(key.into(), value.into());
}
}

config
}

#[cfg(test)]
#[ctor::ctor]
fn init() {
Expand Down
2 changes: 1 addition & 1 deletion tests/create_topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ mod test {

// if topic has only one partition this part is optional
// it will be automatically created when client connects
let test_topic_name = "test_topic";
let test_topic_name = &random_topic_name();
server_node.exec(Redpanda::cmd_create_topic(test_topic_name, 3)).await;

info!("bootstrap servers: {}", bootstrap_servers);
Expand Down
2 changes: 1 addition & 1 deletion tests/redpanda.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ mod test {
std::env::set_var("KAFKA_HOST", &bootstrap_servers);

assert!(bootstrap_servers.len() > 10);
let test_topic_name = "test_topic";
let test_topic_name = random_topic_name();

log::info!("populating topic: [{}] ...", test_topic_name);
populate_topic(&test_topic_name, 10, &value_fn, &key_fn, Some(0), None).await;
Expand Down

0 comments on commit 6305727

Please sign in to comment.