From 1253fa07d20562ea3aef4d2f4d367d0e80e338d1 Mon Sep 17 00:00:00 2001 From: Dan Berindei Date: Fri, 3 Feb 2012 01:11:40 +0200 Subject: [PATCH] ISPN-1839 - L1 invalidation fails when some of the requestors have left the cluster I've added an overload for RpcManager.invokeRemotelyInFuture that ignores leavers and I've changed L1ManagerImpl to use it when invalidating. --- .../distribution/L1ManagerImpl.java | 92 ++++++++++--------- .../infinispan/remoting/rpc/RpcManager.java | 16 ++++ .../remoting/rpc/RpcManagerImpl.java | 19 +++- .../SingleRpcOnPessimisticLockingTest.java | 7 ++ .../tx/dld/ControlledRpcManager.java | 7 ++ .../server/hotrod/HotRodMultiNodeTest.scala | 2 +- 6 files changed, 94 insertions(+), 49 deletions(-) diff --git a/core/src/main/java/org/infinispan/distribution/L1ManagerImpl.java b/core/src/main/java/org/infinispan/distribution/L1ManagerImpl.java index a3778664fa28..6f12b4f64143 100644 --- a/core/src/main/java/org/infinispan/distribution/L1ManagerImpl.java +++ b/core/src/main/java/org/infinispan/distribution/L1ManagerImpl.java @@ -40,31 +40,33 @@ import java.util.concurrent.ConcurrentMap; public class L1ManagerImpl implements L1Manager { - - private static final Log log = LogFactory.getLog(L1ManagerImpl.class); - private final boolean trace = log.isTraceEnabled(); - - private RpcManager rpcManager; - private CommandsFactory commandsFactory; - private int threshold; - - private final ConcurrentMap> requestors; - - public L1ManagerImpl() { - requestors = new ConcurrentHashMap>(); + + private static final Log log = LogFactory.getLog(L1ManagerImpl.class); + private final boolean trace = log.isTraceEnabled(); + + private RpcManager rpcManager; + private CommandsFactory commandsFactory; + private int threshold; + private long rpcTimeout; + + private final ConcurrentMap> requestors; + + public L1ManagerImpl() { + requestors = new ConcurrentHashMap>(); } - + @Inject public void init(Configuration configuration, RpcManager rpcManager, CommandsFactory commandsFactory) { - this.rpcManager = rpcManager; - this.commandsFactory = commandsFactory; - this.threshold = configuration.getL1InvalidationThreshold(); + this.rpcManager = rpcManager; + this.commandsFactory = commandsFactory; + this.threshold = configuration.getL1InvalidationThreshold(); + this.rpcTimeout = configuration.getSyncReplTimeout(); } - + public void addRequestor(Object key, Address origin) { //we do a plain get first as that's likely to be enough Collection
as = requestors.get(key); - + if (as == null) { // only if needed we create a new HashSet, but make sure we don't replace another one being created as = new ConcurrentHashSet
(); @@ -78,54 +80,54 @@ public void addRequestor(Object key, Address origin) { as.add(origin); } } - + public NotifyingNotifiableFuture flushCache(Collection keys, Object retval, Address origin) { if (trace) log.tracef("Invalidating L1 caches for keys %s", keys); - + NotifyingNotifiableFuture future = new AggregatingNotifyingFutureImpl(retval, 2); - + Collection
invalidationAddresses = buildInvalidationAddressList(keys, origin); - + int nodes = invalidationAddresses.size(); if (nodes > 0) { // No need to invalidate at all if there is no one to invalidate! boolean multicast = isUseMulticast(nodes); - if (trace) log.tracef("There are %s nodes involved in invalidation. Threshold is: %s; using multicast: %s", nodes, threshold, multicast); - + if (trace) + log.tracef("There are %s nodes involved in invalidation. Threshold is: %s; using multicast: %s", nodes, threshold, multicast); + if (multicast) { - if (trace) log.tracef("Invalidating keys %s via multicast", keys); - InvalidateCommand ic = commandsFactory.buildInvalidateFromL1Command(origin, false, keys); - rpcManager.broadcastRpcCommandInFuture(ic, future); + if (trace) log.tracef("Invalidating keys %s via multicast", keys); + InvalidateCommand ic = commandsFactory.buildInvalidateFromL1Command(origin, false, keys); + rpcManager.broadcastRpcCommandInFuture(ic, future); } else { - InvalidateCommand ic = commandsFactory.buildInvalidateFromL1Command(origin, false, keys); - + InvalidateCommand ic = commandsFactory.buildInvalidateFromL1Command(origin, false, keys); + // Ask the caches who have requested from us to remove if (trace) log.tracef("Keys %s needs invalidation on %s", keys, invalidationAddresses); - rpcManager.invokeRemotelyInFuture(invalidationAddresses, ic, future); + rpcManager.invokeRemotelyInFuture(invalidationAddresses, ic, true, future, rpcTimeout, true); return future; - } - } else - if (trace) log.trace("No L1 caches to invalidate"); + } + } else if (trace) log.trace("No L1 caches to invalidate"); return future; } - + private Collection
buildInvalidationAddressList(Collection keys, Address origin) { - Collection
addresses = new HashSet
(2); - - for (Object key : keys) { - Collection
as = requestors.remove(key); - if (as != null) { - addresses.addAll(as); + Collection
addresses = new HashSet
(2); + + for (Object key : keys) { + Collection
as = requestors.remove(key); + if (as != null) { + addresses.addAll(as); if (origin != null && as.contains(origin)) addRequestor(key, origin); } - } - if (origin != null) - addresses.remove(origin); - return addresses; + } + if (origin != null) + addresses.remove(origin); + return addresses; } - + private boolean isUseMulticast(int nodes) { // User has requested unicast only if (threshold == -1) return false; diff --git a/core/src/main/java/org/infinispan/remoting/rpc/RpcManager.java b/core/src/main/java/org/infinispan/remoting/rpc/RpcManager.java index 5f88e200695a..6f58f23723cc 100644 --- a/core/src/main/java/org/infinispan/remoting/rpc/RpcManager.java +++ b/core/src/main/java/org/infinispan/remoting/rpc/RpcManager.java @@ -182,6 +182,22 @@ public interface RpcManager { */ void invokeRemotelyInFuture(final Collection
recipients, final ReplicableCommand rpc, final boolean usePriorityQueue, final NotifyingNotifiableFuture future, final long timeout); + /** + * The same as {@link #invokeRemotelyInFuture(java.util.Collection, org.infinispan.commands.ReplicableCommand, + * boolean, org.infinispan.util.concurrent.NotifyingNotifiableFuture, long)} except that you can specify a response mode. + * + * @param recipients recipients to invoke remote call on + * @param rpc command to execute remotely + * @param usePriorityQueue if true, a priority queue is used + * @param future the future which will be passed back to the user + * @param timeout after which to give up (in millis) + * @param ignoreLeavers if {@code true}, recipients that leave or have already left the cluster are ignored + * if {@code false}, a {@code SuspectException} is thrown when a leave is detected + */ + void invokeRemotelyInFuture(Collection
recipients, ReplicableCommand rpc, + boolean usePriorityQueue, NotifyingNotifiableFuture future, + long timeout, boolean ignoreLeavers); + /** * @return a reference to the underlying transport. */ diff --git a/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java b/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java index fe1e03216cd2..4cf6289e5c39 100644 --- a/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java +++ b/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java @@ -202,6 +202,11 @@ public final Map invokeRemotely(Collection
recipient } public final Map invokeRemotely(Collection
recipients, ReplicableCommand rpc, boolean sync, boolean usePriorityQueue, long timeout) throws RpcException { + ResponseMode responseMode = getResponseMode(sync); + return invokeRemotely(recipients, rpc, sync, usePriorityQueue, timeout, responseMode); + } + + private Map invokeRemotely(Collection
recipients, ReplicableCommand rpc, boolean sync, boolean usePriorityQueue, long timeout, ResponseMode responseMode) { if (trace) log.tracef("%s broadcasting call %s to recipient list %s", t.getAddress(), rpc, recipients); if (useReplicationQueue(sync)) { @@ -211,7 +216,7 @@ public final Map invokeRemotely(Collection
recipient if (!(rpc instanceof CacheRpcCommand)) { rpc = cf.buildSingleRpcCommand(rpc); } - Map rsps = invokeRemotely(recipients, rpc, getResponseMode(sync), timeout, usePriorityQueue); + Map rsps = invokeRemotely(recipients, rpc, responseMode, timeout, usePriorityQueue); if (trace) log.tracef("Response(s) to %s is %s", rpc, rsps); if (sync) checkResponses(rsps); return rsps; @@ -227,13 +232,21 @@ public final void invokeRemotelyInFuture(final Collection
recipients, f } public final void invokeRemotelyInFuture(final Collection
recipients, final ReplicableCommand rpc, final boolean usePriorityQueue, final NotifyingNotifiableFuture l, final long timeout) { + invokeRemotelyInFuture(recipients, rpc, usePriorityQueue, l, timeout, false); + } + + @Override + public void invokeRemotelyInFuture(final Collection
recipients, final ReplicableCommand rpc, + final boolean usePriorityQueue, final NotifyingNotifiableFuture l, + final long timeout, final boolean ignoreLeavers) { if (trace) log.tracef("%s invoking in future call %s to recipient list %s", t.getAddress(), rpc, recipients); + final ResponseMode responseMode = ignoreLeavers ? ResponseMode.SYNCHRONOUS_IGNORE_LEAVERS : ResponseMode.SYNCHRONOUS; final CountDownLatch futureSet = new CountDownLatch(1); Callable c = new Callable() { public Object call() throws Exception { Object result = null; try { - result = invokeRemotely(recipients, rpc, true, usePriorityQueue, timeout); + result = invokeRemotely(recipients, rpc, true, usePriorityQueue, timeout, responseMode); } finally { try { futureSet.await(); @@ -247,7 +260,7 @@ public Object call() throws Exception { } }; l.setNetworkFuture(asyncExecutor.submit(c)); - futureSet.countDown(); + futureSet.countDown(); } public Transport getTransport() { diff --git a/core/src/test/java/org/infinispan/lock/singlelock/SingleRpcOnPessimisticLockingTest.java b/core/src/test/java/org/infinispan/lock/singlelock/SingleRpcOnPessimisticLockingTest.java index a25e9be751b3..6bb3fc28f413 100644 --- a/core/src/test/java/org/infinispan/lock/singlelock/SingleRpcOnPessimisticLockingTest.java +++ b/core/src/test/java/org/infinispan/lock/singlelock/SingleRpcOnPessimisticLockingTest.java @@ -235,6 +235,13 @@ public void invokeRemotelyInFuture(Collection
recipients, ReplicableCom realOne.invokeRemotelyInFuture(recipients, rpc, usePriorityQueue, future, timeout); } + @Override + public void invokeRemotelyInFuture(Collection
recipients, ReplicableCommand rpc, boolean usePriorityQueue, NotifyingNotifiableFuture future, long timeout, boolean ignoreLeavers) { + log.trace("ControlledRpcManager.invokeRemotelyInFuture4"); + aboutToInvokeRpc(rpc); + realOne.invokeRemotelyInFuture(recipients, rpc, usePriorityQueue, future, timeout, ignoreLeavers); + } + public Transport getTransport() { return realOne.getTransport(); } diff --git a/core/src/test/java/org/infinispan/tx/dld/ControlledRpcManager.java b/core/src/test/java/org/infinispan/tx/dld/ControlledRpcManager.java index bc492aa1118e..43a7f7ccf08b 100644 --- a/core/src/test/java/org/infinispan/tx/dld/ControlledRpcManager.java +++ b/core/src/test/java/org/infinispan/tx/dld/ControlledRpcManager.java @@ -173,6 +173,13 @@ public void invokeRemotelyInFuture(Collection
recipients, ReplicableCom realOne.invokeRemotelyInFuture(recipients, rpc, usePriorityQueue, future, timeout); } + @Override + public void invokeRemotelyInFuture(Collection
recipients, ReplicableCommand rpc, boolean usePriorityQueue, NotifyingNotifiableFuture future, long timeout, boolean ignoreLeavers) { + log.trace("ControlledRpcManager.invokeRemotelyInFuture4"); + waitFirst(rpc); + realOne.invokeRemotelyInFuture(recipients, rpc, usePriorityQueue, future, timeout, ignoreLeavers); + } + public Transport getTransport() { return realOne.getTransport(); } diff --git a/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodMultiNodeTest.scala b/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodMultiNodeTest.scala index 0e895425af25..b0018849bb0d 100644 --- a/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodMultiNodeTest.scala +++ b/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodMultiNodeTest.scala @@ -85,7 +85,7 @@ abstract class HotRodMultiNodeTest extends MultipleCacheManagersTest { server.stop server.getCacheManager.stop TestingUtil.blockUntilViewsReceived( - 50000, true, manager(0), manager(1)) + 50000, false, manager(0), manager(1)) } @AfterClass(alwaysRun = true)