diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index 8460d0f4c5f37..271fc9d3f3455 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -219,7 +219,7 @@ public void setup(TestInfo testInfo) { } @AfterEach - public void detectLeaks() { + public void detectLeaks() throws InterruptedException { // Assert no thread leakage of Kafka producer. TestUtils.assertNoLeakedThreadsWithNameAndDaemonStatus(NETWORK_THREAD_PREFIX, Boolean.TRUE); } diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java index 0f93d953be51e..ad7ff8dca0513 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java @@ -73,7 +73,6 @@ import java.util.function.Supplier; import java.util.regex.Matcher; import java.util.regex.Pattern; -import java.util.stream.Collectors; import static java.util.Arrays.asList; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -170,12 +169,9 @@ public static MetadataSnapshot metadataSnapshotWith(final int nodes, final Map threads = Thread.getAllStackTraces().keySet().stream() - .filter(t -> t.isDaemon() == isDaemon && t.isAlive() && t.getName().startsWith(threadName)) - .collect(Collectors.toList()); - int threadCount = threads.size(); - assertEquals(0, threadCount); + public static void assertNoLeakedThreadsWithNameAndDaemonStatus(String threadName, boolean isDaemon) throws InterruptedException { + waitForCondition(() -> Thread.getAllStackTraces().keySet().stream() + .noneMatch(t -> t.isDaemon() == isDaemon && t.isAlive() && t.getName().startsWith(threadName)), String.format("Thread with prefix:[%s] is alive", threadName)); } /** diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/RemoteIndexCacheTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/RemoteIndexCacheTest.java index b1480d3677552..038b5f2d55064 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/RemoteIndexCacheTest.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/RemoteIndexCacheTest.java @@ -127,7 +127,7 @@ public void setup() throws IOException, RemoteStorageException { } @AfterEach - public void cleanup() { + public void cleanup() throws InterruptedException { reset(rsm); // the files created for the test will be deleted automatically on thread exit since we use temp dir Utils.closeQuietly(cache, "RemoteIndexCache created for unit test"); @@ -332,13 +332,13 @@ public void testCacheEntryIsDeletedOnRemoval() throws IOException, InterruptedEx verify(cacheEntry.txnIndex()).renameTo(any(File.class)); // verify no index files on disk - assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isPresent(), + TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isEmpty(), "Offset index file should not be present on disk at " + tpDir.toPath()); - assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent(), + TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isEmpty(), "Txn index file should not be present on disk at " + tpDir.toPath()); - assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent(), + TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isEmpty(), "Time index file should not be present on disk at " + tpDir.toPath()); - assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isPresent(), + TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isEmpty(), "Index file marked for deletion should not be present on disk at " + tpDir.toPath()); }