Skip to content

KAFKA-19299: Fix race condition in RemoteIndexCacheTest #19927

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: trunk
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -219,7 +219,7 @@ public void setup(TestInfo testInfo) {
}

@AfterEach
public void detectLeaks() {
public void detectLeaks() throws InterruptedException {
// Assert no thread leakage of Kafka producer.
TestUtils.assertNoLeakedThreadsWithNameAndDaemonStatus(NETWORK_THREAD_PREFIX, Boolean.TRUE);
}
10 changes: 3 additions & 7 deletions clients/src/test/java/org/apache/kafka/test/TestUtils.java
Original file line number Diff line number Diff line change
@@ -73,7 +73,6 @@
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static java.util.Arrays.asList;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -170,12 +169,9 @@ public static MetadataSnapshot metadataSnapshotWith(final int nodes, final Map<S
*
* @throws AssertionError If any thread with the specified name prefix and daemon status is found and is alive.
*/
public static void assertNoLeakedThreadsWithNameAndDaemonStatus(String threadName, boolean isDaemon) {
List<Thread> threads = Thread.getAllStackTraces().keySet().stream()
.filter(t -> t.isDaemon() == isDaemon && t.isAlive() && t.getName().startsWith(threadName))
.collect(Collectors.toList());
int threadCount = threads.size();
assertEquals(0, threadCount);
public static void assertNoLeakedThreadsWithNameAndDaemonStatus(String threadName, boolean isDaemon) throws InterruptedException {
waitForCondition(() -> Thread.getAllStackTraces().keySet().stream()
.noneMatch(t -> t.isDaemon() == isDaemon && t.isAlive() && t.getName().startsWith(threadName)), String.format("Thread with prefix:[%s] is alive", threadName));
}

/**
Original file line number Diff line number Diff line change
@@ -127,7 +127,7 @@ public void setup() throws IOException, RemoteStorageException {
}

@AfterEach
public void cleanup() {
public void cleanup() throws InterruptedException {
reset(rsm);
// the files created for the test will be deleted automatically on thread exit since we use temp dir
Utils.closeQuietly(cache, "RemoteIndexCache created for unit test");
@@ -332,13 +332,13 @@ public void testCacheEntryIsDeletedOnRemoval() throws IOException, InterruptedEx
verify(cacheEntry.txnIndex()).renameTo(any(File.class));

// verify no index files on disk
assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isPresent(),
TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isEmpty(),
"Offset index file should not be present on disk at " + tpDir.toPath());
assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent(),
TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isEmpty(),
"Txn index file should not be present on disk at " + tpDir.toPath());
assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent(),
TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isEmpty(),
"Time index file should not be present on disk at " + tpDir.toPath());
assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isPresent(),
TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isEmpty(),
"Index file marked for deletion should not be present on disk at " + tpDir.toPath());
}