From d7e864ecaf622955c1419ef906a51ad80a6ae717 Mon Sep 17 00:00:00 2001 From: Manik Surtani Date: Thu, 1 Dec 2011 12:24:00 +0000 Subject: [PATCH] Fix L1 invalidation issue --- .../distribution/L1ManagerImpl.java | 14 ++++--- .../interceptors/DistributionInterceptor.java | 42 ++++++++++++------- 2 files changed, 34 insertions(+), 22 deletions(-) diff --git a/core/src/main/java/org/infinispan/distribution/L1ManagerImpl.java b/core/src/main/java/org/infinispan/distribution/L1ManagerImpl.java index 0a7dc2f165a8..109292f9601a 100644 --- a/core/src/main/java/org/infinispan/distribution/L1ManagerImpl.java +++ b/core/src/main/java/org/infinispan/distribution/L1ManagerImpl.java @@ -22,11 +22,6 @@ */ package org.infinispan.distribution; -import java.util.Collection; -import java.util.HashSet; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - import org.infinispan.commands.CommandsFactory; import org.infinispan.commands.write.InvalidateCommand; import org.infinispan.config.Configuration; @@ -39,6 +34,11 @@ import org.infinispan.util.logging.Log; import org.infinispan.util.logging.LogFactory; +import java.util.Collection; +import java.util.HashSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + public class L1ManagerImpl implements L1Manager { private static final Log log = LogFactory.getLog(L1ManagerImpl.class); @@ -116,8 +116,10 @@ private Collection
buildInvalidationAddressList(Collection keys for (Object key : keys) { Collection
as = requestors.remove(key); - if (as != null) + if (as != null) { addresses.addAll(as); + if (origin != null && as.contains(origin)) addRequestor(key, origin); + } } if (origin != null) addresses.remove(origin); diff --git a/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java b/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java index 3f631591150b..eeb385469122 100644 --- a/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java +++ b/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java @@ -66,6 +66,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Future; import java.util.concurrent.TimeoutException; /** @@ -313,24 +314,32 @@ public Object visitCommitCommand(TxInvocationContext ctx, CommitCommand command) Collection
preparedOn = ((LocalTxInvocationContext) ctx).getRemoteLocksAcquired(); - NotifyingNotifiableFuture f = null; - if (isL1CacheEnabled) { - f = l1Manager.flushCache(ctx.getLockedKeys(), null, null); - } - + Future f = flushL1Caches(ctx); sendCommitCommand(ctx, command, preparedOn); + blockOnL1FutureIfNeeded(f); - if (f != null && configuration.isSyncCommitPhase()) { - try { - f.get(); - } catch (Exception e) { - if (log.isInfoEnabled()) log.failedInvalidatingRemoteCache(e); - } - } + } else if (isL1CacheEnabled && !ctx.isOriginLocal() && !ctx.getLockedKeys().isEmpty()) { + // We fall into this block if we are a remote node, happen to be the primary data owner and have locked keys. + // it is still our responsibility to invalidate L1 caches in the cluster. + blockOnL1FutureIfNeeded(flushL1Caches(ctx)); } return invokeNextInterceptor(ctx, command); } + private Future flushL1Caches(InvocationContext ctx) { + return isL1CacheEnabled ? l1Manager.flushCache(ctx.getLockedKeys(), null, ctx.getOrigin()) : null; + } + + private void blockOnL1FutureIfNeeded(Future f) { + if (f != null && configuration.isSyncCommitPhase()) { + try { + f.get(); + } catch (Exception e) { + if (log.isInfoEnabled()) log.failedInvalidatingRemoteCache(e); + } + } + } + private void sendCommitCommand(TxInvocationContext ctx, CommitCommand command, Collection
preparedOn) throws TimeoutException, InterruptedException { // we only send the commit command to the nodes that @@ -374,16 +383,17 @@ public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand comman int newCacheViewId = -1; stateTransferLock.waitForStateTransferToEnd(ctx, command, newCacheViewId); - NotifyingNotifiableFuture f = null; - if (isL1CacheEnabled && command.isOnePhaseCommit()) - f = l1Manager.flushCache(ctx.getLockedKeys(), null, null); + if (command.isOnePhaseCommit()) flushL1Caches(ctx); // if we are one-phase, don't block on this future. Collection
recipients = dm.getAffectedNodes(ctx.getAffectedKeys()); // this method will return immediately if we're the only member (because exclude_self=true) rpcManager.invokeRemotely(recipients, command, sync); ((LocalTxInvocationContext) ctx).remoteLocksAcquired(recipients); - if (f != null) f.get(); + } else if (isL1CacheEnabled && command.isOnePhaseCommit() && !ctx.isOriginLocal() && !ctx.getLockedKeys().isEmpty()) { + // We fall into this block if we are a remote node, happen to be the primary data owner and have locked keys. + // it is still our responsibility to invalidate L1 caches in the cluster. + flushL1Caches(ctx); } return retVal; }