Refactor tests (#98)
- reuse the same broker across all tests
- randomize topic names to avoid conflicts
- never interact with the broker directly, so that the way the broker is provided can be swapped later (e.g. a move to Testcontainers); see the sketch below
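
Taken together, the bullets describe a single shared base class that owns the broker and exposes it to tests only as plain strings. The real AbstractKafkaTest is not shown in this commit; the following is a minimal sketch of the shape the bullets imply, with the EmbeddedKafkaCluster constructor and createTopic helper assumed for illustration:

package reactor.kafka;

import java.util.UUID;

import reactor.kafka.cluster.EmbeddedKafkaCluster;

public abstract class AbstractKafkaTest {

    // One embedded broker per JVM, shared by every test class (bullet 1).
    // Assumed constructor; also assumes the cluster starts when constructed.
    private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);

    protected final int partitions = 4;              // assumed default partition count
    protected final String topic = createNewTopic(); // fresh, randomly named topic per test instance

    // Tests see the broker only as a String (bullet 3), so replacing the
    // embedded cluster with another broker source later touches only this class.
    public static String bootstrapServers() {
        return CLUSTER.bootstrapServers();
    }

    // Random suffixes keep tests that share one broker from colliding (bullet 2).
    public static String createNewTopic() {
        String name = "test-topic-" + UUID.randomUUID();
        CLUSTER.createTopic(name, 4, (short) 1); // assumed helper
        return name;
    }
}

With that in place, a test body only ever mentions bootstrapServers() and topic names, which is exactly what the diff below converts the tests to.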
bsideup committed Jun 14, 2019
1 parent c593791 commit 339777e
Showing 11 changed files with 153 additions and 206 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -9,3 +9,4 @@ bin/
 .DS_Store
 *.iml
 
+out/
SampleScenariosTest.java
@@ -36,7 +36,6 @@
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,15 +63,8 @@
 public class SampleScenariosTest extends AbstractKafkaTest {
     private static final Logger log = LoggerFactory.getLogger(SampleScenariosTest.class.getName());
 
-    private String bootstrapServers;
     private List<Disposable> disposables = new ArrayList<>();
 
-    @Before
-    public void setUp() throws Exception {
-        super.setUp();
-        bootstrapServers = embeddedKafka.bootstrapServers();
-    }
-
     @After
     public void tearDown() {
         for (Disposable disposable : disposables)
@@ -84,7 +76,7 @@ public void kafkaSink() throws Exception {
         List<Person> expected = new ArrayList<>();
         List<Person> received = new ArrayList<>();
         subscribeToDestTopic("test-group", topic, received);
-        KafkaSink sink = new KafkaSink(bootstrapServers, topic);
+        KafkaSink sink = new KafkaSink(bootstrapServers(), topic);
         sink.source(createTestSource(10, expected));
         sink.runScenario();
         waitForMessages(expected, received);
@@ -95,13 +87,10 @@ public void kafkaSinkChain() throws Exception {
         List<Person> expected = new ArrayList<>();
         List<Person> received1 = new ArrayList<>();
         List<Person> received2 = new ArrayList<>();
-        String topic1 = "testtopic1";
-        String topic2 = "testtopic2";
-        createNewTopic(topic1, partitions);
-        createNewTopic(topic2, partitions);
-        subscribeToDestTopic("test-group", topic1, received1);
+        String topic2 = createNewTopic();
+        subscribeToDestTopic("test-group", topic, received1);
         subscribeToDestTopic("test-group", topic2, received2);
-        KafkaSinkChain sinkChain = new KafkaSinkChain(bootstrapServers, topic1, topic2);
+        KafkaSinkChain sinkChain = new KafkaSinkChain(bootstrapServers(), topic, topic2);
         sinkChain.source(createTestSource(10, expected));
         sinkChain.runScenario();
         waitForMessages(expected, received1);
@@ -115,7 +104,7 @@ public void kafkaSinkChain() throws Exception {
     public void kafkaSource() throws Exception {
         List<Person> expected = new CopyOnWriteArrayList<>();
         List<Person> received = new CopyOnWriteArrayList<>();
-        KafkaSource source = new KafkaSource(bootstrapServers, topic) {
+        KafkaSource source = new KafkaSource(bootstrapServers(), topic) {
             public Mono<Void> storeInDB(Person person) {
                 received.add(person);
                 return Mono.empty();
@@ -134,9 +123,8 @@ public void kafkaTransform() throws Exception {
         List<Person> expected = new ArrayList<>();
         List<Person> received = new ArrayList<>();
         String sourceTopic = topic;
-        String destTopic = "testtopic2";
-        createNewTopic(destTopic, partitions);
-        KafkaTransform flow = new KafkaTransform(bootstrapServers, sourceTopic, destTopic) {
+        String destTopic = createNewTopic();
+        KafkaTransform flow = new KafkaTransform(bootstrapServers(), sourceTopic, destTopic) {
             public ReceiverOptions<Integer, Person> receiverOptions() {
                 return super.receiverOptions().consumerProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
             }
@@ -154,9 +142,8 @@ public void atmostOnce() throws Exception {
         List<Person> expected = new ArrayList<>();
         List<Person> received = new ArrayList<>();
         String sourceTopic = topic;
-        String destTopic = "testtopic2";
-        createNewTopic(destTopic, partitions);
-        AtmostOnce flow = new AtmostOnce(bootstrapServers, sourceTopic, destTopic) {
+        String destTopic = createNewTopic();
+        AtmostOnce flow = new AtmostOnce(bootstrapServers(), sourceTopic, destTopic) {
             public ReceiverOptions<Integer, Person> receiverOptions() {
                 return super.receiverOptions().consumerProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
             }
@@ -174,13 +161,11 @@ public void transactionalSend() throws Exception {
         List<Person> expected1 = new ArrayList<>();
         List<Person> received1 = new ArrayList<>();
         List<Person> received2 = new ArrayList<>();
-        String destTopic1 = "desttopic1";
-        createNewTopic(destTopic1, partitions);
-        String destTopic2 = "desttopic2";
-        createNewTopic(destTopic2, partitions);
+        String destTopic1 = topic;
+        String destTopic2 = createNewTopic();
         subscribeToDestTopic("test-group", destTopic1, received1);
         subscribeToDestTopic("test-group", destTopic2, received2);
-        TransactionalSend sink = new TransactionalSend(bootstrapServers, destTopic1, destTopic2);
+        TransactionalSend sink = new TransactionalSend(bootstrapServers(), destTopic1, destTopic2);
         sink.source(createTestSource(10, expected1));
         for (Person p : expected1)
             p.email(p.firstName().toLowerCase(Locale.ROOT) + "@kafka.io");
@@ -200,9 +185,8 @@ public void exactlyOnce() throws Exception {
         List<Person> expected = new ArrayList<>();
         List<Person> received = new ArrayList<>();
         String sourceTopic = topic;
-        String destTopic = "testtopic2";
-        createNewTopic(destTopic, partitions);
-        KafkaExactlyOnce flow = new KafkaExactlyOnce(bootstrapServers, sourceTopic, destTopic) {
+        String destTopic = createNewTopic();
+        KafkaExactlyOnce flow = new KafkaExactlyOnce(bootstrapServers(), sourceTopic, destTopic) {
             public ReceiverOptions<Integer, Person> receiverOptions() {
                 return super.receiverOptions().consumerProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
             }
@@ -222,10 +206,9 @@ public void exactlyOnceAbort() throws Exception {
         List<Person> received = new ArrayList<>();
         AtomicInteger transformCounter = new AtomicInteger();
         String sourceTopic = topic;
-        String destTopic = "testtopic2";
-        createNewTopic(destTopic, partitions);
+        String destTopic = createNewTopic();
         sendMessages(sourceTopic, count, expected);
-        KafkaExactlyOnce flow = new KafkaExactlyOnce(bootstrapServers, sourceTopic, destTopic) {
+        KafkaExactlyOnce flow = new KafkaExactlyOnce(bootstrapServers(), sourceTopic, destTopic) {
             public ReceiverOptions<Integer, Person> receiverOptions() {
                 return super.receiverOptions()
                         .consumerProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10")
@@ -254,11 +237,9 @@ public void fanOut() throws Exception {
         List<Person> received1 = new ArrayList<>();
         List<Person> received2 = new ArrayList<>();
         String sourceTopic = topic;
-        String destTopic1 = "testtopic1";
-        String destTopic2 = "testtopic2";
-        createNewTopic(destTopic1, partitions);
-        createNewTopic(destTopic2, partitions);
-        FanOut flow = new FanOut(bootstrapServers, sourceTopic, destTopic1, destTopic2) {
+        String destTopic1 = createNewTopic();
+        String destTopic2 = createNewTopic();
+        FanOut flow = new FanOut(bootstrapServers(), sourceTopic, destTopic1, destTopic2) {
             public ReceiverOptions<Integer, Person> receiverOptions() {
                 return super.receiverOptions().consumerProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
             }
@@ -284,7 +265,7 @@ public void partition() throws Exception {
         Map<Integer, List<Person>> partitionMap = new HashMap<>();
         for (int i = 0; i < partitions; i++)
             partitionMap.put(i, new ArrayList<>());
-        PartitionProcessor source = new PartitionProcessor(bootstrapServers, topic) {
+        PartitionProcessor source = new PartitionProcessor(bootstrapServers(), topic) {
             public ReceiverOptions<Integer, Person> receiverOptions() {
                 return super.receiverOptions().consumerProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
             }
@@ -304,7 +285,7 @@ public ReceiverOffset processRecord(TopicPartition topicPartition, ReceiverRecord
     }
 
     private void subscribeToDestTopic(String groupId, String topic, List<Person> received) {
-        KafkaSource source = new KafkaSource(bootstrapServers, topic);
+        KafkaSource source = new KafkaSource(bootstrapServers(), topic);
         subscribeToDestTopic(groupId, topic, source.receiverOptions(), received);
     }

@@ -333,7 +314,7 @@ private CommittableSource createTestSource(int count, List<Person> expected) {
         return new CommittableSource(expected);
     }
     private void sendMessages(String topic, int count, List<Person> expected) throws Exception {
-        KafkaSink sink = new KafkaSink(bootstrapServers, topic);
+        KafkaSink sink = new KafkaSink(bootstrapServers(), topic);
         sink.source(createTestSource(count, expected));
         sink.runScenario();
     }
SampleTest.java
@@ -18,31 +18,19 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 import reactor.kafka.AbstractKafkaTest;
 
 public class SampleTest extends AbstractKafkaTest {
 
-
-    @Before
-    public void setUp() throws Exception {
-        super.setUp();
-    }
-
-    @After
-    public void tearDown() {
-    }
-
     @Test
     public void sampleTest() throws Exception {
         int count = 10;
         CountDownLatch sendLatch = new CountDownLatch(count);
         CountDownLatch receiveLatch = new CountDownLatch(count);
-        SampleConsumer consumer = new SampleConsumer(embeddedKafka.bootstrapServers());
+        SampleConsumer consumer = new SampleConsumer(bootstrapServers());
         consumer.consumeMessages(topic, receiveLatch);
-        SampleProducer producer = new SampleProducer(embeddedKafka.bootstrapServers());
+        SampleProducer producer = new SampleProducer(bootstrapServers());
         producer.sendMessages(topic, count, sendLatch);
         sendLatch.await(10, TimeUnit.SECONDS);
         receiveLatch.await(10, TimeUnit.SECONDS);
ConsumerPerformanceTest.java
@@ -40,8 +40,6 @@ public class ConsumerPerformanceTest extends AbstractKafkaTest {
 
     @Before
     public void setUp() throws Exception {
-        super.setUp();
-
         numMessages = PerfTestUtils.getTestConfig("reactor.kafka.test.numMessages", 5000000);
         messageSize = PerfTestUtils.getTestConfig("reactor.kafka.test.messageSize", 100);
         maxPercentDiff = PerfTestUtils.getTestConfig("reactor.kafka.test.maxPercentDiff", 50);
@@ -51,7 +49,7 @@ public void setUp() throws Exception {
     @Test
     public void performanceRegressionTest() throws Exception {
         ConsumerPerfConfig config = new ConsumerPerfConfig();
-        Map<String, Object> consumerProps = PerfTestUtils.consumerProps(embeddedKafka);
+        Map<String, Object> consumerProps = PerfTestUtils.consumerProps(bootstrapServers());
 
         TestUtils.execute(() -> sendToKafka(numMessages, messageSize), timeoutMs);
 
@@ -64,7 +62,7 @@ public void performanceRegressionTest() throws Exception {
     }
 
     private int sendToKafka(int numRecords, int recordSize) throws InterruptedException {
-        Map<String, Object> props = PerfTestUtils.producerProps(embeddedKafka);
+        Map<String, Object> props = PerfTestUtils.producerProps(bootstrapServers());
         props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
         new ReactiveProducerPerformance(props, topic, numRecords, recordSize, -1, null, 0)
                 .runTest()
EndToEndLatencyTest.java
@@ -33,8 +33,6 @@ public class EndToEndLatencyTest extends AbstractKafkaTest {
 
     @Before
     public void setUp() throws Exception {
-        super.setUp();
-
         numMessages = PerfTestUtils.getTestConfig("reactor.kafka.test.numMessages", 10000);
         messageSize = PerfTestUtils.getTestConfig("reactor.kafka.test.messageSize", 100);
         maxPercentDiff = PerfTestUtils.getTestConfig("reactor.kafka.test.maxPercentDiff", 100);
@@ -43,12 +41,12 @@ public void setUp() throws Exception {
 
     @Test
     public void performanceRegressionTest() throws Exception {
-        Map<String, Object> producerProps = PerfTestUtils.producerProps(embeddedKafka);
-        Map<String, Object> consumerProps = PerfTestUtils.consumerProps(embeddedKafka);
+        Map<String, Object> producerProps = PerfTestUtils.producerProps(bootstrapServers());
+        Map<String, Object> consumerProps = PerfTestUtils.consumerProps(bootstrapServers());
 
-        NonReactiveEndToEndLatency nonReactive = new NonReactiveEndToEndLatency(consumerProps, producerProps, embeddedKafka.bootstrapServers(), topic);
+        NonReactiveEndToEndLatency nonReactive = new NonReactiveEndToEndLatency(consumerProps, producerProps, bootstrapServers(), topic);
         double[] nrLatencies = TestUtils.execute(() -> nonReactive.runTest(numMessages, messageSize, 10000L), timeoutMs);
-        ReactiveEndToEndLatency reactive = new ReactiveEndToEndLatency(consumerProps, producerProps, embeddedKafka.bootstrapServers(), topic);
+        ReactiveEndToEndLatency reactive = new ReactiveEndToEndLatency(consumerProps, producerProps, bootstrapServers(), topic);
         double[] rLatencies = TestUtils.execute(() -> reactive.runTest(numMessages, messageSize, 10000L), timeoutMs);
 
         double r75 = rLatencies[(int) (rLatencies.length * 0.75)];
ProducerPerformanceTest.java
@@ -38,8 +38,6 @@ public class ProducerPerformanceTest extends AbstractKafkaTest {
 
     @Before
     public void setUp() throws Exception {
-        super.setUp();
-
         numMessages = PerfTestUtils.getTestConfig("reactor.kafka.test.numMessages", 5000000);
         messageSize = PerfTestUtils.getTestConfig("reactor.kafka.test.messageSize", 100);
         maxPercentDiff = PerfTestUtils.getTestConfig("reactor.kafka.test.maxPercentDiff", 50);
@@ -64,6 +62,6 @@ public void performanceRegressionTest() throws Exception {
     }
 
     public Map<String, Object> producerProps() {
-        return PerfTestUtils.producerProps(embeddedKafka);
+        return PerfTestUtils.producerProps(bootstrapServers());
     }
 }
PerfTestUtils.java
@@ -23,8 +23,6 @@
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 
-import reactor.kafka.cluster.EmbeddedKafkaCluster;
-
 public class PerfTestUtils {
 
     public static void verifyReactiveThroughput(double reactive, double nonReactive, double maxPercentDiff) {
@@ -39,18 +37,18 @@ public static void verifyReactiveLatency(double reactive, double nonReactive, double maxPercentDiff) {
                 percentDiff <= maxPercentDiff || reactive < 5);
     }
 
-    public static Map<String, Object> producerProps(EmbeddedKafkaCluster embeddedKafka) {
+    public static Map<String, Object> producerProps(String bootstrapServers) {
         Map<String, Object> props = new HashMap<>();
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.bootstrapServers());
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
         props.put(ProducerConfig.CLIENT_ID_CONFIG, "prod-perf");
         props.put(ProducerConfig.ACKS_CONFIG, "all");
         props.put(ProducerConfig.SEND_BUFFER_CONFIG, String.valueOf(1024 * 1024));
         return props;
     }
 
-    public static Map<String, Object> consumerProps(EmbeddedKafkaCluster embeddedKafka) {
+    public static Map<String, Object> consumerProps(String bootstrapServers) {
         Map<String, Object> props = new HashMap<>();
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.bootstrapServers());
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
         props.put(ConsumerConfig.CLIENT_ID_CONFIG, "cons-perf");
         return props;
     }
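
Because every test above now reaches Kafka exclusively through bootstrapServers(), the Testcontainers move mentioned in the commit message could later reduce to re-implementing that one accessor. A hypothetical sketch against Testcontainers' KafkaContainer API (not part of this commit):

import org.testcontainers.containers.KafkaContainer;

public abstract class AbstractKafkaTest {

    // Started once and shared by the whole JVM; Testcontainers' reaper
    // cleans the container up when the test JVM exits.
    private static final KafkaContainer KAFKA = new KafkaContainer();

    static {
        KAFKA.start();
    }

    // Same signature the tests already depend on, new broker underneath.
    public static String bootstrapServers() {
        return KAFKA.getBootstrapServers();
    }
}

None of the test classes touched in this diff would need to change again.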