Skip to content

Commit

Permalink
ISPN-5531 java.lang.UnsupportedOperationException during remove (using
Browse files Browse the repository at this point in the history
RemoteCacheManager)

* Batching event manager clears events on exception
* Distributed executor service now returns SuspectException
  • Loading branch information
wburns authored and danberindei committed Jun 11, 2015
1 parent 0928db2 commit 399c790
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 16 deletions.
Expand Up @@ -11,9 +11,16 @@
*/
public class NoOpFuture<E> implements NotifyingNotifiableFuture<E> {
private final E returnValue;
private final Throwable t;

public NoOpFuture(E returnValue) {
this.returnValue = returnValue;
this.t = null;
}

public NoOpFuture(Throwable t) {
this.returnValue = null;
this.t = t;
}

@Override
Expand All @@ -33,12 +40,15 @@ public boolean isDone() {

@Override
public E get() throws InterruptedException, ExecutionException {
if (t != null) {
throw new ExecutionException(t);
}
return returnValue;
}

@Override
public E get(long l, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
return returnValue;
return get();
}

@Override
Expand Down
Expand Up @@ -21,6 +21,7 @@
import org.infinispan.commons.util.InfinispanCollections;
import org.infinispan.commons.util.Util;
import org.infinispan.commons.util.concurrent.FutureListener;
import org.infinispan.commons.util.concurrent.NoOpFuture;
import org.infinispan.commons.util.concurrent.NotifyingFuture;
import org.infinispan.commons.util.concurrent.NotifyingFutureImpl;
import org.infinispan.configuration.cache.Configuration;
Expand All @@ -36,6 +37,7 @@
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.TopologyAwareAddress;
import org.infinispan.remoting.transport.jgroups.SuspectException;
import org.infinispan.security.AuthorizationManager;
import org.infinispan.security.AuthorizationPermission;
import org.infinispan.util.TimeService;
Expand Down Expand Up @@ -411,8 +413,8 @@ public <T> NotifyingFuture<T> submit(Address target, DistributedTask<T> task) {
throw new NullPointerException();
List<Address> members = getMembers();
if (!members.contains(target)) {
throw new IllegalArgumentException("Target node " + target
+ " is not a cluster member, members are " + members);
return new NoOpFuture<>(new SuspectException("Target node " + target
+ " is not a cluster member, members are " + members));
}
Address me = getAddress();
DistributedExecuteCommand<T> c = null;
Expand Down
Expand Up @@ -301,9 +301,19 @@ public void notifyCacheEntryCreated(K key, V value, boolean pre,
EventImpl<K, V> e = EventImpl.createEvent(cache, CACHE_ENTRY_CREATED);
configureEvent(e, key, value, pre, ctx, command, null, null);
boolean isLocalNodePrimaryOwner = clusteringDependentLogic.localNodeIsPrimaryOwner(key);
for (CacheEntryListenerInvocation<K, V> listener : cacheEntryCreatedListeners) listener.invoke(e, isLocalNodePrimaryOwner);
if (!ctx.isInTxScope()) {
eventManager.sendEvents();
boolean sendEvents = !ctx.isInTxScope();
try {
for (CacheEntryListenerInvocation<K, V> listener : cacheEntryCreatedListeners) {
listener.invoke(e, isLocalNodePrimaryOwner);
}
if (sendEvents) {
eventManager.sendEvents();
sendEvents = false;
}
} finally {
if (sendEvents) {
eventManager.dropEvents();
}
}
}
}
Expand All @@ -315,9 +325,19 @@ public void notifyCacheEntryModified(K key, V value, V previousValue, Metadata p
EventImpl<K, V> e = EventImpl.createEvent(cache, CACHE_ENTRY_MODIFIED);
configureEvent(e, key, value, pre, ctx, command, previousValue, previousMetadata);
boolean isLocalNodePrimaryOwner = clusteringDependentLogic.localNodeIsPrimaryOwner(key);
for (CacheEntryListenerInvocation<K, V> listener : cacheEntryModifiedListeners) listener.invoke(e, isLocalNodePrimaryOwner);
if (!ctx.isInTxScope()) {
eventManager.sendEvents();
boolean sendEvents = !ctx.isInTxScope();
try {
for (CacheEntryListenerInvocation<K, V> listener : cacheEntryModifiedListeners) {
listener.invoke(e, isLocalNodePrimaryOwner);
}
if (sendEvents) {
eventManager.sendEvents();
sendEvents = false;
}
} finally {
if (sendEvents) {
eventManager.dropEvents();
}
}
}
}
Expand All @@ -330,9 +350,19 @@ public void notifyCacheEntryRemoved(K key, V previousValue, Metadata previousMet
configureEvent(e, key, null, pre, ctx, command, previousValue, previousMetadata);
setTx(ctx, e);
boolean isLocalNodePrimaryOwner = clusteringDependentLogic.localNodeIsPrimaryOwner(key);
for (CacheEntryListenerInvocation<K, V> listener : cacheEntryRemovedListeners) listener.invoke(e, isLocalNodePrimaryOwner);
if (!ctx.isInTxScope()) {
eventManager.sendEvents();
boolean sendEvents = !ctx.isInTxScope();
try {
for (CacheEntryListenerInvocation<K, V> listener : cacheEntryRemovedListeners) {
listener.invoke(e, isLocalNodePrimaryOwner);
}
if (sendEvents) {
eventManager.sendEvents();
sendEvents = false;
}
} finally {
if (sendEvents) {
eventManager.dropEvents();
}
}
}
}
Expand Down
Expand Up @@ -99,7 +99,7 @@ public void sendToTargets(DistributedExecutorService service) {
} else {
service.submit(entry.getKey(), new MultiClusterEventCallable<>(value.events));
}
} else if (value.events.size() == 1){
} else if (value.events.size() == 1) {
Entry<UUID, Collection<ClusterEvent<K, V>>> entryValue = value.events.entrySet().iterator().next();
if (value.sync) {
completion.submit(entry.getKey(), new ClusterEventCallable<K, V>(entryValue.getKey(), entryValue.getValue()));
Expand Down
Expand Up @@ -4,6 +4,7 @@
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.jgroups.SuspectException;
import org.infinispan.test.MultipleCacheManagersTest;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
Expand All @@ -23,6 +24,9 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import static org.testng.AssertJUnit.assertTrue;
import static org.testng.AssertJUnit.fail;

/**
* Test for verifying that the DistributedExecutors also work on the Local Cache.
*
Expand Down Expand Up @@ -382,8 +386,8 @@ public void testBasicTargetCallableWithNullTarget() {
des.submit((Address) null, new SimpleCallable());
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void testBasicTargetCallableWithIllegalTarget() {
@Test
public void testBasicTargetCallableWithIllegalTarget() throws InterruptedException, ExecutionException {
Cache<Object, Object> cache1 = getCache();

DistributedExecutorService des = createDES(cache1);
Expand All @@ -393,7 +397,16 @@ public int compareTo(Address o) {
return -1;
}
};
des.submit(fakeAddress, new SimpleCallable());
Future<?> future = des.submit(fakeAddress, new SimpleCallable());
try {
future.get();
fail("Test should have thrown an execution exception!");
} catch (ExecutionException e) {
Throwable t = e.getCause();
if (!(t instanceof SuspectException)) {
throw e;
}
}
}

public void testBasicDistributedCallableWitkKeys() throws Exception {
Expand Down

0 comments on commit 399c790

Please sign in to comment.