Skip to content

Commit

Permalink
ISPN-1839 - L1 invalidation fails when some of the requestors have le…
Browse files Browse the repository at this point in the history
…ft the cluster

I've added an overload for RpcManager.invokeRemotelyInFuture that ignores
leavers and I've changed L1ManagerImpl to use it when invalidating.
  • Loading branch information
danberindei authored and galderz committed Feb 3, 2012
1 parent 15849eb commit 1253fa0
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 49 deletions.
92 changes: 47 additions & 45 deletions core/src/main/java/org/infinispan/distribution/L1ManagerImpl.java
Expand Up @@ -40,31 +40,33 @@
import java.util.concurrent.ConcurrentMap;

public class L1ManagerImpl implements L1Manager {

private static final Log log = LogFactory.getLog(L1ManagerImpl.class);
private final boolean trace = log.isTraceEnabled();

private RpcManager rpcManager;
private CommandsFactory commandsFactory;
private int threshold;

private final ConcurrentMap<Object, Collection<Address>> requestors;

public L1ManagerImpl() {
requestors = new ConcurrentHashMap<Object, Collection<Address>>();

private static final Log log = LogFactory.getLog(L1ManagerImpl.class);
private final boolean trace = log.isTraceEnabled();

private RpcManager rpcManager;
private CommandsFactory commandsFactory;
private int threshold;
private long rpcTimeout;

private final ConcurrentMap<Object, Collection<Address>> requestors;

public L1ManagerImpl() {
requestors = new ConcurrentHashMap<Object, Collection<Address>>();
}

@Inject
public void init(Configuration configuration, RpcManager rpcManager, CommandsFactory commandsFactory) {
this.rpcManager = rpcManager;
this.commandsFactory = commandsFactory;
this.threshold = configuration.getL1InvalidationThreshold();
this.rpcManager = rpcManager;
this.commandsFactory = commandsFactory;
this.threshold = configuration.getL1InvalidationThreshold();
this.rpcTimeout = configuration.getSyncReplTimeout();
}

public void addRequestor(Object key, Address origin) {
//we do a plain get first as that's likely to be enough
Collection<Address> as = requestors.get(key);

if (as == null) {
// only if needed we create a new HashSet, but make sure we don't replace another one being created
as = new ConcurrentHashSet<Address>();
Expand All @@ -78,54 +80,54 @@ public void addRequestor(Object key, Address origin) {
as.add(origin);
}
}

public NotifyingNotifiableFuture<Object> flushCache(Collection<Object> keys, Object retval, Address origin) {
if (trace) log.tracef("Invalidating L1 caches for keys %s", keys);

NotifyingNotifiableFuture<Object> future = new AggregatingNotifyingFutureImpl(retval, 2);

Collection<Address> invalidationAddresses = buildInvalidationAddressList(keys, origin);

int nodes = invalidationAddresses.size();

if (nodes > 0) {
// No need to invalidate at all if there is no one to invalidate!
boolean multicast = isUseMulticast(nodes);

if (trace) log.tracef("There are %s nodes involved in invalidation. Threshold is: %s; using multicast: %s", nodes, threshold, multicast);

if (trace)
log.tracef("There are %s nodes involved in invalidation. Threshold is: %s; using multicast: %s", nodes, threshold, multicast);

if (multicast) {
if (trace) log.tracef("Invalidating keys %s via multicast", keys);
InvalidateCommand ic = commandsFactory.buildInvalidateFromL1Command(origin, false, keys);
rpcManager.broadcastRpcCommandInFuture(ic, future);
if (trace) log.tracef("Invalidating keys %s via multicast", keys);
InvalidateCommand ic = commandsFactory.buildInvalidateFromL1Command(origin, false, keys);
rpcManager.broadcastRpcCommandInFuture(ic, future);
} else {
InvalidateCommand ic = commandsFactory.buildInvalidateFromL1Command(origin, false, keys);
InvalidateCommand ic = commandsFactory.buildInvalidateFromL1Command(origin, false, keys);

// Ask the caches who have requested from us to remove
if (trace) log.tracef("Keys %s needs invalidation on %s", keys, invalidationAddresses);
rpcManager.invokeRemotelyInFuture(invalidationAddresses, ic, future);
rpcManager.invokeRemotelyInFuture(invalidationAddresses, ic, true, future, rpcTimeout, true);
return future;
}
} else
if (trace) log.trace("No L1 caches to invalidate");
}
} else if (trace) log.trace("No L1 caches to invalidate");
return future;
}

private Collection<Address> buildInvalidationAddressList(Collection<Object> keys, Address origin) {
Collection<Address> addresses = new HashSet<Address>(2);
for (Object key : keys) {
Collection<Address> as = requestors.remove(key);
if (as != null) {
addresses.addAll(as);
Collection<Address> addresses = new HashSet<Address>(2);

for (Object key : keys) {
Collection<Address> as = requestors.remove(key);
if (as != null) {
addresses.addAll(as);
if (origin != null && as.contains(origin)) addRequestor(key, origin);
}
}
if (origin != null)
addresses.remove(origin);
return addresses;
}
if (origin != null)
addresses.remove(origin);
return addresses;
}

private boolean isUseMulticast(int nodes) {
// User has requested unicast only
if (threshold == -1) return false;
Expand Down
16 changes: 16 additions & 0 deletions core/src/main/java/org/infinispan/remoting/rpc/RpcManager.java
Expand Up @@ -182,6 +182,22 @@ public interface RpcManager {
*/
void invokeRemotelyInFuture(final Collection<Address> recipients, final ReplicableCommand rpc, final boolean usePriorityQueue, final NotifyingNotifiableFuture<Object> future, final long timeout);

/**
* The same as {@link #invokeRemotelyInFuture(java.util.Collection, org.infinispan.commands.ReplicableCommand,
* boolean, org.infinispan.util.concurrent.NotifyingNotifiableFuture, long)} except that you can specify a response mode.
*
* @param recipients recipients to invoke remote call on
* @param rpc command to execute remotely
* @param usePriorityQueue if true, a priority queue is used
* @param future the future which will be passed back to the user
* @param timeout after which to give up (in millis)
* @param ignoreLeavers if {@code true}, recipients that leave or have already left the cluster are ignored
* if {@code false}, a {@code SuspectException} is thrown when a leave is detected
*/
void invokeRemotelyInFuture(Collection<Address> recipients, ReplicableCommand rpc,
boolean usePriorityQueue, NotifyingNotifiableFuture<Object> future,
long timeout, boolean ignoreLeavers);

/**
* @return a reference to the underlying transport.
*/
Expand Down
19 changes: 16 additions & 3 deletions core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java
Expand Up @@ -202,6 +202,11 @@ public final Map<Address, Response> invokeRemotely(Collection<Address> recipient
}

public final Map<Address, Response> invokeRemotely(Collection<Address> recipients, ReplicableCommand rpc, boolean sync, boolean usePriorityQueue, long timeout) throws RpcException {
ResponseMode responseMode = getResponseMode(sync);
return invokeRemotely(recipients, rpc, sync, usePriorityQueue, timeout, responseMode);
}

private Map<Address, Response> invokeRemotely(Collection<Address> recipients, ReplicableCommand rpc, boolean sync, boolean usePriorityQueue, long timeout, ResponseMode responseMode) {
if (trace) log.tracef("%s broadcasting call %s to recipient list %s", t.getAddress(), rpc, recipients);

if (useReplicationQueue(sync)) {
Expand All @@ -211,7 +216,7 @@ public final Map<Address, Response> invokeRemotely(Collection<Address> recipient
if (!(rpc instanceof CacheRpcCommand)) {
rpc = cf.buildSingleRpcCommand(rpc);
}
Map<Address, Response> rsps = invokeRemotely(recipients, rpc, getResponseMode(sync), timeout, usePriorityQueue);
Map<Address, Response> rsps = invokeRemotely(recipients, rpc, responseMode, timeout, usePriorityQueue);
if (trace) log.tracef("Response(s) to %s is %s", rpc, rsps);
if (sync) checkResponses(rsps);
return rsps;
Expand All @@ -227,13 +232,21 @@ public final void invokeRemotelyInFuture(final Collection<Address> recipients, f
}

public final void invokeRemotelyInFuture(final Collection<Address> recipients, final ReplicableCommand rpc, final boolean usePriorityQueue, final NotifyingNotifiableFuture<Object> l, final long timeout) {
invokeRemotelyInFuture(recipients, rpc, usePriorityQueue, l, timeout, false);
}

@Override
public void invokeRemotelyInFuture(final Collection<Address> recipients, final ReplicableCommand rpc,
final boolean usePriorityQueue, final NotifyingNotifiableFuture<Object> l,
final long timeout, final boolean ignoreLeavers) {
if (trace) log.tracef("%s invoking in future call %s to recipient list %s", t.getAddress(), rpc, recipients);
final ResponseMode responseMode = ignoreLeavers ? ResponseMode.SYNCHRONOUS_IGNORE_LEAVERS : ResponseMode.SYNCHRONOUS;
final CountDownLatch futureSet = new CountDownLatch(1);
Callable<Object> c = new Callable<Object>() {
public Object call() throws Exception {
Object result = null;
try {
result = invokeRemotely(recipients, rpc, true, usePriorityQueue, timeout);
result = invokeRemotely(recipients, rpc, true, usePriorityQueue, timeout, responseMode);
} finally {
try {
futureSet.await();
Expand All @@ -247,7 +260,7 @@ public Object call() throws Exception {
}
};
l.setNetworkFuture(asyncExecutor.submit(c));
futureSet.countDown();
futureSet.countDown();
}

public Transport getTransport() {
Expand Down
Expand Up @@ -235,6 +235,13 @@ public void invokeRemotelyInFuture(Collection<Address> recipients, ReplicableCom
realOne.invokeRemotelyInFuture(recipients, rpc, usePriorityQueue, future, timeout);
}

@Override
public void invokeRemotelyInFuture(Collection<Address> recipients, ReplicableCommand rpc, boolean usePriorityQueue, NotifyingNotifiableFuture<Object> future, long timeout, boolean ignoreLeavers) {
log.trace("ControlledRpcManager.invokeRemotelyInFuture4");
aboutToInvokeRpc(rpc);
realOne.invokeRemotelyInFuture(recipients, rpc, usePriorityQueue, future, timeout, ignoreLeavers);
}

public Transport getTransport() {
return realOne.getTransport();
}
Expand Down
Expand Up @@ -173,6 +173,13 @@ public void invokeRemotelyInFuture(Collection<Address> recipients, ReplicableCom
realOne.invokeRemotelyInFuture(recipients, rpc, usePriorityQueue, future, timeout);
}

@Override
public void invokeRemotelyInFuture(Collection<Address> recipients, ReplicableCommand rpc, boolean usePriorityQueue, NotifyingNotifiableFuture<Object> future, long timeout, boolean ignoreLeavers) {
log.trace("ControlledRpcManager.invokeRemotelyInFuture4");
waitFirst(rpc);
realOne.invokeRemotelyInFuture(recipients, rpc, usePriorityQueue, future, timeout, ignoreLeavers);
}

public Transport getTransport() {
return realOne.getTransport();
}
Expand Down
Expand Up @@ -85,7 +85,7 @@ abstract class HotRodMultiNodeTest extends MultipleCacheManagersTest {
server.stop
server.getCacheManager.stop
TestingUtil.blockUntilViewsReceived(
50000, true, manager(0), manager(1))
50000, false, manager(0), manager(1))
}

@AfterClass(alwaysRun = true)
Expand Down

0 comments on commit 1253fa0

Please sign in to comment.