diff --git a/commons/src/main/java/org/infinispan/commons/util/concurrent/NoOpFuture.java b/commons/src/main/java/org/infinispan/commons/util/concurrent/NoOpFuture.java index f6d44497acdd..02b01ed48e2f 100644 --- a/commons/src/main/java/org/infinispan/commons/util/concurrent/NoOpFuture.java +++ b/commons/src/main/java/org/infinispan/commons/util/concurrent/NoOpFuture.java @@ -11,9 +11,16 @@ */ public class NoOpFuture implements NotifyingNotifiableFuture { private final E returnValue; + private final Throwable t; public NoOpFuture(E returnValue) { this.returnValue = returnValue; + this.t = null; + } + + public NoOpFuture(Throwable t) { + this.returnValue = null; + this.t = t; } @Override @@ -33,12 +40,15 @@ public boolean isDone() { @Override public E get() throws InterruptedException, ExecutionException { + if (t != null) { + throw new ExecutionException(t); + } return returnValue; } @Override public E get(long l, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException { - return returnValue; + return get(); } @Override diff --git a/core/src/main/java/org/infinispan/distexec/DefaultExecutorService.java b/core/src/main/java/org/infinispan/distexec/DefaultExecutorService.java index 86534b3a9cac..82550e571baf 100644 --- a/core/src/main/java/org/infinispan/distexec/DefaultExecutorService.java +++ b/core/src/main/java/org/infinispan/distexec/DefaultExecutorService.java @@ -21,6 +21,7 @@ import org.infinispan.commons.util.InfinispanCollections; import org.infinispan.commons.util.Util; import org.infinispan.commons.util.concurrent.FutureListener; +import org.infinispan.commons.util.concurrent.NoOpFuture; import org.infinispan.commons.util.concurrent.NotifyingFuture; import org.infinispan.commons.util.concurrent.NotifyingFutureImpl; import org.infinispan.configuration.cache.Configuration; @@ -36,6 +37,7 @@ import org.infinispan.remoting.rpc.RpcManager; import org.infinispan.remoting.transport.Address; import org.infinispan.remoting.transport.TopologyAwareAddress; +import org.infinispan.remoting.transport.jgroups.SuspectException; import org.infinispan.security.AuthorizationManager; import org.infinispan.security.AuthorizationPermission; import org.infinispan.util.TimeService; @@ -411,8 +413,8 @@ public NotifyingFuture submit(Address target, DistributedTask task) { throw new NullPointerException(); List
members = getMembers(); if (!members.contains(target)) { - throw new IllegalArgumentException("Target node " + target - + " is not a cluster member, members are " + members); + return new NoOpFuture<>(new SuspectException("Target node " + target + + " is not a cluster member, members are " + members)); } Address me = getAddress(); DistributedExecuteCommand c = null; diff --git a/core/src/main/java/org/infinispan/notifications/cachelistener/CacheNotifierImpl.java b/core/src/main/java/org/infinispan/notifications/cachelistener/CacheNotifierImpl.java index 14142820d8d5..2bb6c35157c1 100644 --- a/core/src/main/java/org/infinispan/notifications/cachelistener/CacheNotifierImpl.java +++ b/core/src/main/java/org/infinispan/notifications/cachelistener/CacheNotifierImpl.java @@ -301,9 +301,19 @@ public void notifyCacheEntryCreated(K key, V value, boolean pre, EventImpl e = EventImpl.createEvent(cache, CACHE_ENTRY_CREATED); configureEvent(e, key, value, pre, ctx, command, null, null); boolean isLocalNodePrimaryOwner = clusteringDependentLogic.localNodeIsPrimaryOwner(key); - for (CacheEntryListenerInvocation listener : cacheEntryCreatedListeners) listener.invoke(e, isLocalNodePrimaryOwner); - if (!ctx.isInTxScope()) { - eventManager.sendEvents(); + boolean sendEvents = !ctx.isInTxScope(); + try { + for (CacheEntryListenerInvocation listener : cacheEntryCreatedListeners) { + listener.invoke(e, isLocalNodePrimaryOwner); + } + if (sendEvents) { + eventManager.sendEvents(); + sendEvents = false; + } + } finally { + if (sendEvents) { + eventManager.dropEvents(); + } } } } @@ -315,9 +325,19 @@ public void notifyCacheEntryModified(K key, V value, V previousValue, Metadata p EventImpl e = EventImpl.createEvent(cache, CACHE_ENTRY_MODIFIED); configureEvent(e, key, value, pre, ctx, command, previousValue, previousMetadata); boolean isLocalNodePrimaryOwner = clusteringDependentLogic.localNodeIsPrimaryOwner(key); - for (CacheEntryListenerInvocation listener : cacheEntryModifiedListeners) listener.invoke(e, isLocalNodePrimaryOwner); - if (!ctx.isInTxScope()) { - eventManager.sendEvents(); + boolean sendEvents = !ctx.isInTxScope(); + try { + for (CacheEntryListenerInvocation listener : cacheEntryModifiedListeners) { + listener.invoke(e, isLocalNodePrimaryOwner); + } + if (sendEvents) { + eventManager.sendEvents(); + sendEvents = false; + } + } finally { + if (sendEvents) { + eventManager.dropEvents(); + } } } } @@ -330,9 +350,19 @@ public void notifyCacheEntryRemoved(K key, V previousValue, Metadata previousMet configureEvent(e, key, null, pre, ctx, command, previousValue, previousMetadata); setTx(ctx, e); boolean isLocalNodePrimaryOwner = clusteringDependentLogic.localNodeIsPrimaryOwner(key); - for (CacheEntryListenerInvocation listener : cacheEntryRemovedListeners) listener.invoke(e, isLocalNodePrimaryOwner); - if (!ctx.isInTxScope()) { - eventManager.sendEvents(); + boolean sendEvents = !ctx.isInTxScope(); + try { + for (CacheEntryListenerInvocation listener : cacheEntryRemovedListeners) { + listener.invoke(e, isLocalNodePrimaryOwner); + } + if (sendEvents) { + eventManager.sendEvents(); + sendEvents = false; + } + } finally { + if (sendEvents) { + eventManager.dropEvents(); + } } } } diff --git a/core/src/main/java/org/infinispan/notifications/cachelistener/cluster/impl/BatchingClusterEventManagerImpl.java b/core/src/main/java/org/infinispan/notifications/cachelistener/cluster/impl/BatchingClusterEventManagerImpl.java index 86f1223f67b1..fd5f28ffd151 100644 --- a/core/src/main/java/org/infinispan/notifications/cachelistener/cluster/impl/BatchingClusterEventManagerImpl.java +++ b/core/src/main/java/org/infinispan/notifications/cachelistener/cluster/impl/BatchingClusterEventManagerImpl.java @@ -99,7 +99,7 @@ public void sendToTargets(DistributedExecutorService service) { } else { service.submit(entry.getKey(), new MultiClusterEventCallable<>(value.events)); } - } else if (value.events.size() == 1){ + } else if (value.events.size() == 1) { Entry>> entryValue = value.events.entrySet().iterator().next(); if (value.sync) { completion.submit(entry.getKey(), new ClusterEventCallable(entryValue.getKey(), entryValue.getValue())); diff --git a/core/src/test/java/org/infinispan/distexec/LocalDistributedExecutorTest.java b/core/src/test/java/org/infinispan/distexec/LocalDistributedExecutorTest.java index 261c827f4cbe..b457af3b1ed7 100644 --- a/core/src/test/java/org/infinispan/distexec/LocalDistributedExecutorTest.java +++ b/core/src/test/java/org/infinispan/distexec/LocalDistributedExecutorTest.java @@ -4,6 +4,7 @@ import org.infinispan.configuration.cache.CacheMode; import org.infinispan.configuration.cache.ConfigurationBuilder; import org.infinispan.remoting.transport.Address; +import org.infinispan.remoting.transport.jgroups.SuspectException; import org.infinispan.test.MultipleCacheManagersTest; import org.testng.annotations.AfterMethod; import org.testng.annotations.Test; @@ -23,6 +24,9 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import static org.testng.AssertJUnit.assertTrue; +import static org.testng.AssertJUnit.fail; + /** * Test for verifying that the DistributedExecutors also work on the Local Cache. * @@ -382,8 +386,8 @@ public void testBasicTargetCallableWithNullTarget() { des.submit((Address) null, new SimpleCallable()); } - @Test(expectedExceptions = IllegalArgumentException.class) - public void testBasicTargetCallableWithIllegalTarget() { + @Test + public void testBasicTargetCallableWithIllegalTarget() throws InterruptedException, ExecutionException { Cache cache1 = getCache(); DistributedExecutorService des = createDES(cache1); @@ -393,7 +397,16 @@ public int compareTo(Address o) { return -1; } }; - des.submit(fakeAddress, new SimpleCallable()); + Future future = des.submit(fakeAddress, new SimpleCallable()); + try { + future.get(); + fail("Test should have thrown an execution exception!"); + } catch (ExecutionException e) { + Throwable t = e.getCause(); + if (!(t instanceof SuspectException)) { + throw e; + } + } } public void testBasicDistributedCallableWitkKeys() throws Exception {