Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ISPN-7172 Total order caches can hang during join #4663

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
* Executes tasks in the given executor, but never has more than {@code maxConcurrentTasks} tasks running at the same
* time.
*
* A task can finish running without allowing another task to run in its stead, with {@link #executeAsync(Supplier)}.
* A new task will only start after the {@code CompletableFuture} returned by the task has completed.
* <p>A task can finish running without allowing another task to run in its stead, with {@link #executeAsync(Supplier)}.
* A new task will only start after the {@code CompletableFuture} returned by the task has completed.</p>
*
* <p><em>Blocking mode.</em> If the executor is a {@link WithinThreadExecutor}, tasks will run in the thread that
* submitted them. If there are no available permits, the caller thread will block until a permit becomes available.</p>
*
* @author Dan Berindei
* @since 9.0
Expand Down Expand Up @@ -76,6 +79,7 @@ public void execute(Runnable command) {
log.debug("Exception in blocking task", e);
} finally {
addPermit();
tryExecute();
}
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,15 +130,17 @@ public CacheTopology join(String cacheName, CacheJoinInfo joinInfo, CacheTopolog
// For Total Order caches, we must not move the topology updates to another thread
ExecutorService topologyUpdatesExecutor = joinInfo.isTotalOrder() ? withinThreadExecutor : asyncTransportExecutor;
LocalCacheStatus cacheStatus = new LocalCacheStatus(cacheName, joinInfo, stm, phm, topologyUpdatesExecutor);
runningCaches.put(cacheName, cacheStatus);

long timeout = joinInfo.getTimeout();
long endTime = timeService.expectedEndTime(timeout, TimeUnit.MILLISECONDS);
// Pretend the join is using up a thread from the topology updates executor.
// This ensures that the initial topology and the GET_CACHE_LISTENERS request will happen on this thread,
// and other topology updates are only handled after we call backgroundTaskFinished(null)
// and other topology updates are only handled after we complete joinFuture.
CompletableFuture<Void> joinFuture = new CompletableFuture<>();
cacheStatus.getTopologyUpdatesExecutor().executeAsync(() -> joinFuture);

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this change really needed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if it's strictly necessary for correctness at this point or it just makes tests more predictable, but I think it's safer to implement it as promised in the comments above.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see how initializing the runningCaches before or after the sending the joinFuture to the executor could affect anything...
My point is: I'm not seeing any difference in the logic executed. So, what side effect am I missing?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Submitting joinFuture will prevent the executor from running any other task until joinFuture is completed. If the cache exists in runningCaches and it's LimitedExecutor has a free spot, it will process topology updates, and those have a chance of doing "stuff" before we have properly joined.
I think the initial stuff I was worried about was just blocking for a new view, which could overwhelm the OOB thread pool given enough caches. In this bug, the topology update was exposing the bug in LimitedExecutor itself.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense.

runningCaches.put(cacheName, cacheStatus);

long timeout = joinInfo.getTimeout();
long endTime = timeService.expectedEndTime(timeout, TimeUnit.MILLISECONDS);
try {
while (true) {
int viewId = transport.getViewId();
Expand Down
101 changes: 71 additions & 30 deletions core/src/test/java/org/infinispan/executors/LimitedExecutorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import static org.testng.AssertJUnit.assertFalse;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;

Expand Down Expand Up @@ -33,12 +34,12 @@ public void stopExecutors() {
executor.shutdownNow();
}

public void testConcurrency1WithinThread() throws Exception {
public void testBasicWithinThread() throws Exception {
LimitedExecutor limitedExecutor = new LimitedExecutor(NAME, new WithinThreadExecutor(), 1);

CompletableFuture<String> cf = new CompletableFuture<>();
limitedExecutor.execute(() -> cf.complete("bla"));
assertEquals("bla", cf.get());
limitedExecutor.execute(() -> cf.complete("value"));
assertEquals("value", cf.getNow("task did not run synchronously"));
}

/**
Expand All @@ -47,10 +48,10 @@ public void testConcurrency1WithinThread() throws Exception {
public void testConcurrencyLimit() throws Exception {
eventuallyEquals(0, executor::getActiveCount);
LimitedExecutor limitedExecutor = new LimitedExecutor(NAME, executor, 1);
CompletableFuture<String> blocker1 = new CompletableFuture<>();
CompletableFuture<String> blocker2 = new CompletableFuture<>();

CompletableFuture<String> blocker1 = new CompletableFuture<>();
CompletableFuture<String> cf1 = new CompletableFuture<>();

limitedExecutor.execute(() -> {
try {
cf1.complete(blocker1.get(10, SECONDS));
Expand All @@ -59,54 +60,94 @@ public void testConcurrencyLimit() throws Exception {
}
});

CompletableFuture<String> cf2 = new CompletableFuture<>();
limitedExecutor.execute(() -> {
try {
cf2.complete(cf1.getNow("task 2 ran too early") + " " + blocker2.get(10, SECONDS));
} catch (Exception e) {
cf2.completeExceptionally(e);
}
});

assertFalse(cf1.isDone());
assertFalse(cf2.isDone());

blocker1.complete("value1");
blocker2.complete("value2");
assertEquals("value1", cf1.get(10, SECONDS));
assertEquals("value1 value2", cf2.get(10, SECONDS));
eventuallyEquals(0, executor::getActiveCount);
verifyTaskIsBlocked(limitedExecutor, blocker1, cf1);
}

/**
* Test that an async task ({@code executeAsync()}) will block another task from running
* until its {@code CompletableFuture} is completed.
*/
public void testAsyncTasks() throws Exception {
public void testConcurrencyLimitExecuteAsync() throws Exception {
eventuallyEquals(0, executor::getActiveCount);
LimitedExecutor limitedExecutor = new LimitedExecutor(NAME, executor, 1);

CompletableFuture<String> blocker1 = new CompletableFuture<>();
CompletableFuture<String> cf1 = new CompletableFuture<>();

limitedExecutor.executeAsync(() -> blocker1.thenAccept(cf1::complete));

verifyTaskIsBlocked(limitedExecutor, blocker1, cf1);
}

/**
* Test that no more than 1 task runs at a time when using a {@link WithinThreadExecutor}.
*/
public void testConcurrencyLimitWithinThread() throws Exception {
LimitedExecutor limitedExecutor = new LimitedExecutor(NAME, new WithinThreadExecutor(), 1);

CompletableFuture<String> blocker1 = new CompletableFuture<>();
CompletableFuture<String> blocker2 = new CompletableFuture<>();
CompletableFuture<String> cf1 = new CompletableFuture<>();

// execute() will block
Future<?> fork1 = fork(() -> {
limitedExecutor.execute(() -> {
blocker2.complete("blocking");
try {
cf1.complete(blocker1.get(10, SECONDS));
} catch (Exception e) {
cf1.completeExceptionally(e);
}
});
});
assertEquals("blocking", blocker2.get(10, SECONDS));

verifyTaskIsBlocked(limitedExecutor, blocker1, cf1);
fork1.get(10, SECONDS);
}

/**
* Test that an async task ({@code executeAsync()}) will block another task from running
* until its {@code CompletableFuture} is completed, when using a {@link WithinThreadExecutor}.
*/
public void testConcurrencyLimitExecuteAsyncWithinThread() throws Exception {
LimitedExecutor limitedExecutor = new LimitedExecutor(NAME, new WithinThreadExecutor(), 1);

CompletableFuture<String> blocker1 = new CompletableFuture<>();
CompletableFuture<String> cf1 = new CompletableFuture<>();

// executeAsync() will not block
limitedExecutor.executeAsync(() -> blocker1.thenAccept(cf1::complete));

verifyTaskIsBlocked(limitedExecutor, blocker1, cf1);
}

private void verifyTaskIsBlocked(LimitedExecutor limitedExecutor, CompletableFuture<String> blocker1,
CompletableFuture<String> cf1) throws Exception {
CompletableFuture<String> blocker2 = new CompletableFuture<>();
CompletableFuture<String> cf2 = new CompletableFuture<>();
limitedExecutor.execute(() -> {
try {
cf2.complete(cf1.getNow("task 2 ran too early") + " " + blocker2.get(10, SECONDS));
} catch (Exception e) {
cf2.completeExceptionally(e);
}

// execute() may block
Future<?> fork2 = fork(() -> {
limitedExecutor.execute(() -> {
try {
cf2.complete(cf1.getNow("task 2 ran too early") + " " + blocker2.get(10, SECONDS));
} catch (Exception e) {
cf2.completeExceptionally(e);
}
});
});

assertFalse(cf1.isDone());
assertFalse(cf2.isDone());

blocker1.complete("value1");
blocker2.complete("value2");
assertEquals("value1", cf1.get(10, SECONDS));
assertFalse(cf2.isDone());

blocker2.complete("value2");
assertEquals("value1 value2", cf2.get(10, SECONDS));
fork2.get(10, SECONDS);
eventuallyEquals(0, executor::getActiveCount);
}
}