Skip to content

Commit

Permalink
ISPN-11280 ConcurrentModificationException in ConditionFuture
Browse files Browse the repository at this point in the history
  • Loading branch information
danberindei authored and pruivo committed Feb 14, 2020
1 parent b853345 commit 35d5dec
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 19 deletions.
@@ -1,7 +1,10 @@
package org.infinispan.util.concurrent;

import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
Expand All @@ -21,7 +24,7 @@
* @author Dan Berindei
*/
public class ConditionFuture<T> {
private final Map<Predicate<T>, Data> futures = new IdentityHashMap<>();
private final Map<Predicate<T>, Data> futures = Collections.synchronizedMap(new IdentityHashMap<>());
private final ScheduledExecutorService timeoutExecutor;
private volatile T lastValue;
private volatile boolean running = true;
Expand Down Expand Up @@ -96,35 +99,58 @@ public void updateAsync(T value, Executor executor) {

try {
executor.execute(() -> checkConditions(value));
} catch (Exception e) {
for (Data data : futures.values()) {
data.cancelFuture.cancel(false);
data.completeExceptionally(e);
} catch (Throwable t) {
List<CompletableFuture<?>> completed;
synchronized (futures) {
completed = new ArrayList<>(futures.size());
for (Data data : futures.values()) {
data.cancelFuture.cancel(false);
completed.add(data);
}
}
for (CompletableFuture<?> future : completed) {
future.completeExceptionally(t);
}
}
}

private void checkConditions(T value) {
for (Iterator<Map.Entry<Predicate<T>, Data>> iterator = futures.entrySet().iterator(); iterator.hasNext(); ) {
Map.Entry<Predicate<T>, Data> e = iterator.next();
if (e.getKey().test(value)) {
Data data = e.getValue();
data.cancelFuture.cancel(false);
data.complete(null);
iterator.remove();
List<Data> completed;
synchronized (futures) {
completed = new ArrayList<>(futures.size());
for (Iterator<Map.Entry<Predicate<T>, Data>> iterator = futures.entrySet().iterator(); iterator.hasNext(); ) {
Map.Entry<Predicate<T>, Data> e = iterator.next();
if (e.getKey().test(value)) {
Data data = e.getValue();
data.cancelFuture.cancel(false);
completed.add(data);
iterator.remove();
}
}
}
for (Data data : completed) {
data.complete(null);
}
}

public void stop() {
running = false;
lastValue = null;

List<CompletableFuture<?>> completed;
synchronized (futures) {
completed = new ArrayList<>(futures.size());
for (Data data : futures.values()) {
data.cancelFuture.cancel(false);
completed.add(data);
}
futures.clear();
}

IllegalLifecycleStateException exception = new IllegalLifecycleStateException();
for (Data data : futures.values()) {
data.cancelFuture.cancel(false);
data.completeExceptionally(exception);
for (CompletableFuture<?> future : completed) {
future.completeExceptionally(exception);
}
futures.clear();
}

private static class Data extends CompletableFuture<Void> {
Expand Down
@@ -1,12 +1,14 @@
package org.infinispan.util.concurrent;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.infinispan.util.concurrent.CompletionStages.isCompletedSuccessfully;
import static org.testng.AssertJUnit.assertFalse;
import static org.testng.AssertJUnit.assertTrue;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.infinispan.test.AbstractInfinispanTest;
import org.testng.annotations.AfterClass;
Expand All @@ -24,7 +26,7 @@ public void tearDown() {

public void testBeforeFirstUpdate() {
ConditionFuture<Integer> conditionFuture = new ConditionFuture<>(timeoutExecutor);
CompletionStage<Void> stage = conditionFuture.newConditionStage(i -> i > 0, 1, TimeUnit.SECONDS);
CompletionStage<Void> stage = conditionFuture.newConditionStage(i -> i > 0, 10, SECONDS);
assertFalse(stage.toCompletableFuture().isDone());

conditionFuture.update(1);
Expand All @@ -34,7 +36,36 @@ public void testBeforeFirstUpdate() {
public void testAlreadyCompleted() {
ConditionFuture<Integer> conditionFuture = new ConditionFuture<>(timeoutExecutor);
conditionFuture.update(1);
CompletionStage<Void> stage = conditionFuture.newConditionStage(i -> i > 0, 1, TimeUnit.SECONDS);
CompletionStage<Void> stage = conditionFuture.newConditionStage(i -> i > 0, 10, SECONDS);
assertTrue(stage.toCompletableFuture().isDone());
}

public void testConcurrentModification() {
ConditionFuture<Integer> conditionFuture = new ConditionFuture<>(timeoutExecutor);
CompletionStage<Void> stage11 = conditionFuture.newConditionStage(i -> i > 0, 10, SECONDS);
CompletionStage<Void> stage12 = conditionFuture.newConditionStage(i -> i > 0, 10, SECONDS);

// Block the completion of stage11
CompletableFuture<Void> updateReleased = new CompletableFuture<>();
stage11.thenRun(updateReleased::join);
stage12.thenRun(updateReleased::join);

// Update the condition future, triggering the completion of stage1x
conditionFuture.updateAsync(1, testExecutor());
eventually(() -> isCompletedSuccessfully(stage11) || isCompletedSuccessfully(stage12));

// Add 2 new condition stages while the update is blocked, to increment modCount by 2
CompletionStage<Void> stage21 = conditionFuture.newConditionStage(i -> i > 1, 10, SECONDS);
CompletionStage<Void> stage22 = conditionFuture.newConditionStage(i -> i > 1, 10, SECONDS);

// Unblock the condition future update
updateReleased.complete(null);
CompletionStages.join(stage11);
CompletionStages.join(stage12);

// Update again to complete stage21 and stage22
conditionFuture.update(2);
CompletionStages.join(stage21);
CompletionStages.join(stage22);
}
}

0 comments on commit 35d5dec

Please sign in to comment.