Skip to content

Commit

Permalink
Kafka 0.9.0.1 for JDK 7
Browse files Browse the repository at this point in the history
  • Loading branch information
wojtek-szymanski committed Mar 20, 2016
1 parent c44506b commit 74129bd
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 13 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

<groupId>com.github.charithe</groupId>
<artifactId>kafka-junit</artifactId>
<version>2.1-SNAPSHOT</version>
<version>1.9.1-SNAPSHOT</version>

<name>kafka-junit</name>
<description>JUnit rule to spin-up a Kafka broker during tests</description>
Expand All @@ -42,7 +42,7 @@


<properties>
<java.version>1.8</java.version>
<java.version>1.7</java.version>
<kafka.version>0.9.0.1</kafka.version>
</properties>

Expand Down
27 changes: 16 additions & 11 deletions src/main/java/com/github/charithe/kafka/KafkaJunitRule.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -272,17 +273,21 @@ public <T> List<T> readMessages(final KafkaConsumer<T, T> consumer, final String
ExecutorService singleThread = Executors.newSingleThreadExecutor();
try {
consumer.subscribe(Lists.newArrayList(topic));
Future<List<T>> future = singleThread.submit(() -> {
List<T> messages = new ArrayList<>(expectedMessages);
while (messages.size() < expectedMessages) {
ConsumerRecords<T, T> records = consumer.poll(POLL_TIMEOUT_MS);
for (ConsumerRecord<T, T> rec : records) {
LOGGER.debug("Received message: {} -> {}", rec.key(), rec.value());
messages.add(rec.value());
}
}
return messages;
}
Future<List<T>> future = singleThread.submit(
new Callable<List<T>>() {
@Override
public List<T> call() throws Exception {
List<T> messages = new ArrayList<>(expectedMessages);
while (messages.size() < expectedMessages) {
ConsumerRecords<T, T> records = consumer.poll(POLL_TIMEOUT_MS);
for (ConsumerRecord<T, T> rec : records) {
LOGGER.debug("Received message: {} -> {}", rec.key(), rec.value());
messages.add(rec.value());
}
}
return messages;
}
}
);

return future.get(timeoutSeconds, SECONDS);
Expand Down

0 comments on commit 74129bd

Please sign in to comment.