Skip to content

Commit

Permalink
Fix for ben-manes#1065: Make EventDispatcher an instance field
Browse files Browse the repository at this point in the history
This sidesteps an edge-case wherein a JCache listener in a primary cache
mutates a secondary Caffeine cache. While this is typically not a problem,
if the thread that mutates the primary cache is from a ForkJoinPool, the
EventDispatcher can deadlock as the calling thread may be re-used. Under
this condition, the secondary cache mutation is effectively waiting on its
own Future and will deadlock.
  • Loading branch information
Taylor Jones committed Jul 11, 2023
1 parent 3dfba16 commit 17105d5
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
*/
public final class EventDispatcher<K, V> {
static final Logger logger = System.getLogger(EventDispatcher.class.getName());
static final ThreadLocal<List<CompletableFuture<Void>>> pending =
final ThreadLocal<List<CompletableFuture<Void>>> pending =
ThreadLocal.withInitial(ArrayList::new);

final ConcurrentMap<Registration<K, V>, ConcurrentMap<K, CompletableFuture<Void>>> dispatchQueues;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ public final class EventDispatcherTest {
@BeforeMethod
public void beforeMethod() throws Exception {
MockitoAnnotations.openMocks(this).close();
EventDispatcher.pending.remove();
}

@AfterTest
Expand Down Expand Up @@ -136,7 +135,7 @@ public void publishCreated() {

dispatcher.publishCreated(cache, 1, 2);
verify(createdListener, times(4)).onCreated(any());
assertThat(EventDispatcher.pending.get()).hasSize(2);
assertThat(dispatcher.pending.get()).hasSize(2);
assertThat(dispatcher.dispatchQueues.values().stream()
.flatMap(queue -> queue.entrySet().stream())).isEmpty();
}
Expand All @@ -148,7 +147,7 @@ public void publishUpdated() {

dispatcher.publishUpdated(cache, 1, 2, 3);
verify(updatedListener, times(4)).onUpdated(any());
assertThat(EventDispatcher.pending.get()).hasSize(2);
assertThat(dispatcher.pending.get()).hasSize(2);
assertThat(dispatcher.dispatchQueues.values().stream()
.flatMap(queue -> queue.entrySet().stream())).isEmpty();
}
Expand All @@ -160,7 +159,7 @@ public void publishRemoved() {

dispatcher.publishRemoved(cache, 1, 2);
verify(removedListener, times(4)).onRemoved(any());
assertThat(EventDispatcher.pending.get()).hasSize(2);
assertThat(dispatcher.pending.get()).hasSize(2);
assertThat(dispatcher.dispatchQueues.values().stream()
.flatMap(queue -> queue.entrySet().stream())).isEmpty();
}
Expand All @@ -172,7 +171,7 @@ public void publishExpired() {

dispatcher.publishExpired(cache, 1, 2);
verify(expiredListener, times(4)).onExpired(any());
assertThat(EventDispatcher.pending.get()).hasSize(2);
assertThat(dispatcher.pending.get()).hasSize(2);
assertThat(dispatcher.dispatchQueues.values().stream()
.flatMap(queue -> queue.entrySet().stream())).isEmpty();
}
Expand Down Expand Up @@ -305,30 +304,30 @@ public void parallel_listeners() {

@Test
public void awaitSynchronous() {
EventDispatcher.pending.get().add(CompletableFuture.completedFuture(null));
var dispatcher = new EventDispatcher<Integer, Integer>(Runnable::run);
dispatcher.pending.get().add(CompletableFuture.completedFuture(null));
dispatcher.awaitSynchronous();
assertThat(EventDispatcher.pending.get()).isEmpty();
assertThat(dispatcher.pending.get()).isEmpty();
}

@Test
public void awaitSynchronous_failure() {
var dispatcher = new EventDispatcher<Integer, Integer>(Runnable::run);
var future = new CompletableFuture<Void>();
future.completeExceptionally(new RuntimeException());
EventDispatcher.pending.get().add(future);
dispatcher.pending.get().add(future);

var dispatcher = new EventDispatcher<Integer, Integer>(Runnable::run);
dispatcher.awaitSynchronous();
assertThat(EventDispatcher.pending.get()).isEmpty();
assertThat(dispatcher.pending.get()).isEmpty();
}

@Test
public void ignoreSynchronous() {
EventDispatcher.pending.get().add(CompletableFuture.completedFuture(null));
var dispatcher = new EventDispatcher<Integer, Integer>(Runnable::run);
dispatcher.pending.get().add(CompletableFuture.completedFuture(null));

dispatcher.ignoreSynchronous();
assertThat(EventDispatcher.pending.get()).isEmpty();
assertThat(dispatcher.pending.get()).isEmpty();
}

/**
Expand Down

0 comments on commit 17105d5

Please sign in to comment.