Skip to content

Commit

Permalink
Fix L1 invalidation issue
Browse files Browse the repository at this point in the history
  • Loading branch information
maniksurtani committed Dec 1, 2011
1 parent 123c14a commit d7e864e
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 22 deletions.
Expand Up @@ -22,11 +22,6 @@
*/
package org.infinispan.distribution;

import java.util.Collection;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.write.InvalidateCommand;
import org.infinispan.config.Configuration;
Expand All @@ -39,6 +34,11 @@
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

import java.util.Collection;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class L1ManagerImpl implements L1Manager {

private static final Log log = LogFactory.getLog(L1ManagerImpl.class);
Expand Down Expand Up @@ -116,8 +116,10 @@ private Collection<Address> buildInvalidationAddressList(Collection<Object> keys

for (Object key : keys) {
Collection<Address> as = requestors.remove(key);
if (as != null)
if (as != null) {
addresses.addAll(as);
if (origin != null && as.contains(origin)) addRequestor(key, origin);
}
}
if (origin != null)
addresses.remove(origin);
Expand Down
Expand Up @@ -66,6 +66,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;

/**
Expand Down Expand Up @@ -313,24 +314,32 @@ public Object visitCommitCommand(TxInvocationContext ctx, CommitCommand command)

Collection<Address> preparedOn = ((LocalTxInvocationContext) ctx).getRemoteLocksAcquired();

NotifyingNotifiableFuture<Object> f = null;
if (isL1CacheEnabled) {
f = l1Manager.flushCache(ctx.getLockedKeys(), null, null);
}

Future<?> f = flushL1Caches(ctx);
sendCommitCommand(ctx, command, preparedOn);
blockOnL1FutureIfNeeded(f);

if (f != null && configuration.isSyncCommitPhase()) {
try {
f.get();
} catch (Exception e) {
if (log.isInfoEnabled()) log.failedInvalidatingRemoteCache(e);
}
}
} else if (isL1CacheEnabled && !ctx.isOriginLocal() && !ctx.getLockedKeys().isEmpty()) {
// We fall into this block if we are a remote node, happen to be the primary data owner and have locked keys.
// it is still our responsibility to invalidate L1 caches in the cluster.
blockOnL1FutureIfNeeded(flushL1Caches(ctx));
}
return invokeNextInterceptor(ctx, command);
}

private Future<?> flushL1Caches(InvocationContext ctx) {
return isL1CacheEnabled ? l1Manager.flushCache(ctx.getLockedKeys(), null, ctx.getOrigin()) : null;
}

private void blockOnL1FutureIfNeeded(Future<?> f) {
if (f != null && configuration.isSyncCommitPhase()) {
try {
f.get();
} catch (Exception e) {
if (log.isInfoEnabled()) log.failedInvalidatingRemoteCache(e);
}
}
}

private void sendCommitCommand(TxInvocationContext ctx, CommitCommand command, Collection<Address> preparedOn)
throws TimeoutException, InterruptedException {
// we only send the commit command to the nodes that
Expand Down Expand Up @@ -374,16 +383,17 @@ public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand comman
int newCacheViewId = -1;
stateTransferLock.waitForStateTransferToEnd(ctx, command, newCacheViewId);

NotifyingNotifiableFuture<Object> f = null;
if (isL1CacheEnabled && command.isOnePhaseCommit())
f = l1Manager.flushCache(ctx.getLockedKeys(), null, null);
if (command.isOnePhaseCommit()) flushL1Caches(ctx); // if we are one-phase, don't block on this future.

Collection<Address> recipients = dm.getAffectedNodes(ctx.getAffectedKeys());
// this method will return immediately if we're the only member (because exclude_self=true)
rpcManager.invokeRemotely(recipients, command, sync);

((LocalTxInvocationContext) ctx).remoteLocksAcquired(recipients);
if (f != null) f.get();
} else if (isL1CacheEnabled && command.isOnePhaseCommit() && !ctx.isOriginLocal() && !ctx.getLockedKeys().isEmpty()) {
// We fall into this block if we are a remote node, happen to be the primary data owner and have locked keys.
// it is still our responsibility to invalidate L1 caches in the cluster.
flushL1Caches(ctx);
}
return retVal;
}
Expand Down

0 comments on commit d7e864e

Please sign in to comment.