Skip to content

Commit 267ac01

Browse files
committed
move LogCompactionTester to production code
1 parent 1b5f04f commit 267ac01

File tree

4 files changed

+15
-12
lines changed

4 files changed

+15
-12
lines changed

tools/src/test/java/org/apache/kafka/tools/LogCompactionTester.java renamed to tools/src/main/java/org/apache/kafka/tools/LogCompactionTester.java

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@
2929
import org.apache.kafka.common.config.TopicConfig;
3030
import org.apache.kafka.common.serialization.ByteArraySerializer;
3131
import org.apache.kafka.common.serialization.StringDeserializer;
32+
import org.apache.kafka.common.test.TestUtils;
3233
import org.apache.kafka.common.utils.Exit;
3334
import org.apache.kafka.server.util.CommandLineUtils;
34-
import org.apache.kafka.test.TestUtils;
3535

3636
import java.io.BufferedReader;
3737
import java.io.BufferedWriter;
@@ -57,7 +57,6 @@
5757
import joptsimple.OptionParser;
5858
import joptsimple.OptionSet;
5959

60-
import static org.apache.kafka.tools.TestRecordUtils.valuesIterator;
6160

6261
/**
6362
* This is a torture test that runs against an existing broker
@@ -146,23 +145,27 @@ private static void createTopics(String brokerUrl, String[] topics) throws Excep
146145

147146
final List<String> pendingTopics = new ArrayList<>();
148147
TestUtils.waitForCondition(() -> {
149-
Set<String> allTopics = adminClient.listTopics().names().get();
150-
pendingTopics.clear();
151-
pendingTopics.addAll(
152-
Arrays.stream(topics)
153-
.filter(topicName -> !allTopics.contains(topicName))
154-
.toList()
155-
);
156-
return pendingTopics.isEmpty();
148+
try {
149+
Set<String> allTopics = adminClient.listTopics().names().get();
150+
pendingTopics.clear();
151+
pendingTopics.addAll(
152+
Arrays.stream(topics)
153+
.filter(topicName -> !allTopics.contains(topicName))
154+
.toList()
155+
);
156+
return pendingTopics.isEmpty();
157+
} catch (InterruptedException | java.util.concurrent.ExecutionException e) {
158+
throw new RuntimeException(e);
159+
}
157160
}, "timed out waiting for topics: " + pendingTopics);
158161
}
159162
}
160163

161164
private static void validateOutput(File producedDataFile, File consumedDataFile) {
162165
try (BufferedReader producedReader = externalSort(producedDataFile);
163166
BufferedReader consumedReader = externalSort(consumedDataFile)) {
164-
Iterator<TestRecord> produced = valuesIterator(producedReader);
165-
Iterator<TestRecord> consumed = valuesIterator(consumedReader);
167+
Iterator<TestRecord> produced = TestRecordUtils.valuesIterator(producedReader);
168+
Iterator<TestRecord> consumed = TestRecordUtils.valuesIterator(consumedReader);
166169

167170
File producedDedupedFile = new File(producedDataFile.getAbsolutePath() + ".deduped");
168171
File consumedDedupedFile = new File(consumedDataFile.getAbsolutePath() + ".deduped");

0 commit comments

Comments
 (0)