Skip to content
Browse files

Introduce a common base class for replication and distribution interc…

…eptors for code reuse

Move some methods from DistributionManager to BaseDistributionInterceptor.
  • Loading branch information...
1 parent cc1ac8e commit f0aaac1fbc91fb362bc8be6fec012321a0f1bde4 @anistor anistor committed with Mircea Markus
View
22 core/src/main/java/org/infinispan/distribution/DistributionManager.java
@@ -22,10 +22,6 @@
*/
package org.infinispan.distribution;
-import org.infinispan.commands.FlagAffectedCommand;
-import org.infinispan.container.entries.CacheEntry;
-import org.infinispan.container.entries.InternalCacheEntry;
-import org.infinispan.context.InvocationContext;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
@@ -96,24 +92,6 @@
Set<Address> locateAll(Collection<Object> keys); //todo [anistor] this has to take an additional parameter that specifies if the lookup is for read or write
/**
- * Transforms a cache entry so it is marked for L1 rather than the primary cache data structure. This should be done
- * if it is deemed that the entry is targeted for L1 storage rather than storage in the primary data container.
- *
- * @param entry entry to transform
- */
- void transformForL1(CacheEntry entry);
-
- /**
- * Retrieves a cache entry from a remote source. Would typically involve an RPC call using a {@link org.infinispan.commands.remote.ClusteredGetCommand}
- * and some form of quorum of responses if the responses returned are inconsistent - often the case if there is a
- * rehash in progress, involving nodes that the key maps to.
- *
- * @param key key to look up
- * @return an internal cache entry, or null if it cannot be located
- */
- InternalCacheEntry retrieveFromRemoteSource(Object key, InvocationContext ctx, boolean acquireRemoteLock, FlagAffectedCommand command) throws Exception;
-
- /**
* Retrieves the consistent hash instance currently in use, an instance of the configured ConsistentHash
* class (which defaults to {@link org.infinispan.distribution.ch.DefaultConsistentHash}.
*
View
55 core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
@@ -22,15 +22,6 @@
*/
package org.infinispan.distribution;
-import org.infinispan.commands.CommandsFactory;
-import org.infinispan.commands.FlagAffectedCommand;
-import org.infinispan.commands.remote.ClusteredGetCommand;
-import org.infinispan.configuration.cache.Configuration;
-import org.infinispan.container.entries.CacheEntry;
-import org.infinispan.container.entries.InternalCacheEntry;
-import org.infinispan.container.entries.InternalCacheValue;
-import org.infinispan.context.InvocationContext;
-import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
@@ -38,15 +29,9 @@
import org.infinispan.jmx.annotations.ManagedOperation;
import org.infinispan.jmx.annotations.Parameter;
import org.infinispan.statetransfer.StateTransferManager;
-import org.infinispan.remoting.responses.ClusteredGetResponseValidityFilter;
-import org.infinispan.remoting.responses.Response;
-import org.infinispan.remoting.responses.SuccessfulResponse;
-import org.infinispan.remoting.rpc.ResponseFilter;
-import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.topology.CacheTopology;
-import org.infinispan.transaction.xa.GlobalTransaction;
import org.infinispan.util.Immutables;
import org.infinispan.util.InfinispanCollections;
import org.infinispan.util.logging.Log;
@@ -70,9 +55,7 @@
private static final boolean trace = log.isTraceEnabled();
// Injected components
- private Configuration configuration;
private RpcManager rpcManager;
- private CommandsFactory cf;
private StateTransferManager stateTransferManager;
/**
@@ -82,11 +65,8 @@ public DistributionManagerImpl() {
}
@Inject
- public void init(Configuration configuration, RpcManager rpcManager, CommandsFactory cf,
- StateTransferManager stateTransferManager) {
- this.configuration = configuration;
+ public void init(RpcManager rpcManager, StateTransferManager stateTransferManager) {
this.rpcManager = rpcManager;
- this.cf = cf;
this.stateTransferManager = stateTransferManager;
}
@@ -148,39 +128,6 @@ public Address getPrimaryLocation(Object key) {
}
@Override
- public void transformForL1(CacheEntry entry) {
- if (entry.getLifespan() < 0 || entry.getLifespan() > configuration.clustering().l1().lifespan())
- entry.setLifespan(configuration.clustering().l1().lifespan());
- }
-
- @Override
- public InternalCacheEntry retrieveFromRemoteSource(Object key, InvocationContext ctx, boolean acquireRemoteLock, FlagAffectedCommand command) throws Exception {
- GlobalTransaction gtx = acquireRemoteLock ? ((TxInvocationContext)ctx).getGlobalTransaction() : null;
- ClusteredGetCommand get = cf.buildClusteredGetCommand(key, command.getFlags(), acquireRemoteLock, gtx);
-
- List<Address> targets = new ArrayList<Address>(getReadConsistentHash().locateOwners(key));
- // if any of the recipients has left the cluster since the command was issued, just don't wait for its response
- targets.retainAll(rpcManager.getTransport().getMembers());
- ResponseFilter filter = new ClusteredGetResponseValidityFilter(targets, getAddress());
- Map<Address, Response> responses = rpcManager.invokeRemotely(targets, get, ResponseMode.WAIT_FOR_VALID_RESPONSE,
- configuration.clustering().sync().replTimeout(), true, filter);
-
- if (!responses.isEmpty()) {
- for (Response r : responses.values()) {
- if (r instanceof SuccessfulResponse) {
- InternalCacheValue cacheValue = (InternalCacheValue) ((SuccessfulResponse) r).getResponseValue();
- return cacheValue.toInternalCacheEntry(key);
- }
- }
- }
-
- // TODO If everyone returned null, and the read CH has changed, retry the remote get.
- // Otherwise our get command might be processed by the old owners after they have invalidated their data
- // and we'd return a null even though the key exists on
- return null;
- }
-
- @Override
public ConsistentHash getConsistentHash() {
return getWriteConsistentHash();
}
View
120 core/src/main/java/org/infinispan/interceptors/ClusteringInterceptor.java
@@ -0,0 +1,120 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2013 Red Hat Inc. and/or its affiliates and other
+ * contributors as indicated by the @author tags. All rights reserved.
+ * See the copyright.txt in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.interceptors;
+
+import org.infinispan.commands.CommandsFactory;
+import org.infinispan.commands.FlagAffectedCommand;
+import org.infinispan.commands.read.AbstractDataCommand;
+import org.infinispan.commands.write.WriteCommand;
+import org.infinispan.container.DataContainer;
+import org.infinispan.container.EntryFactory;
+import org.infinispan.container.entries.CacheEntry;
+import org.infinispan.container.entries.InternalCacheEntry;
+import org.infinispan.context.Flag;
+import org.infinispan.context.InvocationContext;
+import org.infinispan.distribution.ch.ConsistentHash;
+import org.infinispan.factories.annotations.Inject;
+import org.infinispan.factories.annotations.Start;
+import org.infinispan.interceptors.base.BaseRpcInterceptor;
+import org.infinispan.statetransfer.StateTransferManager;
+import org.infinispan.util.concurrent.locks.LockManager;
+
+/**
+ * Base class for replication and distribution interceptors.
+ *
+ * @author anistor@redhat.com
+ * @since 5.2
+ */
+public abstract class ClusteringInterceptor extends BaseRpcInterceptor {
+
+ protected CommandsFactory cf;
+ protected EntryFactory entryFactory;
+ protected LockManager lockManager;
+ protected DataContainer dataContainer;
+ protected StateTransferManager stateTransferManager;
+ protected boolean needReliableReturnValues;
+
+ @Inject
+ public void injectDependencies(CommandsFactory cf, EntryFactory entryFactory,
+ LockManager lockManager, DataContainer dataContainer,
+ StateTransferManager stateTransferManager) {
+ this.cf = cf;
+ this.entryFactory = entryFactory;
+ this.lockManager = lockManager;
+ this.dataContainer = dataContainer;
+ this.stateTransferManager = stateTransferManager;
+ }
+
+ @Start
+ public void configure() {
+ needReliableReturnValues = !cacheConfiguration.unsafe().unreliableReturnValues();
+ }
+
+ protected boolean isNeedReliableReturnValues(FlagAffectedCommand command) {
+ return !command.hasFlag(Flag.SKIP_REMOTE_LOOKUP)
+ && !command.hasFlag(Flag.IGNORE_RETURN_VALUES) && needReliableReturnValues;
+ }
+
+ protected boolean needsRemoteGet(InvocationContext ctx, AbstractDataCommand command) {
+ if (command.hasFlag(Flag.CACHE_MODE_LOCAL)
+ || command.hasFlag(Flag.SKIP_REMOTE_LOOKUP)
+ || command.hasFlag(Flag.IGNORE_RETURN_VALUES)) {
+ return false;
+ }
+ boolean shouldFetchFromRemote = false;
+ CacheEntry entry = ctx.lookupEntry(command.getKey());
+ if (entry == null || entry.isNull() || entry.isLockPlaceholder()) {
+ Object key = command.getKey();
+ ConsistentHash ch = stateTransferManager.getCacheTopology().getReadConsistentHash();
+ shouldFetchFromRemote = ctx.isOriginLocal() && !ch.isKeyLocalToNode(rpcManager.getAddress(), key) && !dataContainer.containsKey(key);
+ if (!shouldFetchFromRemote && getLog().isTraceEnabled()) {
+ getLog().tracef("Not doing a remote get for key %s since entry is mapped to current node (%s) or is in L1. Owners are %s", key, rpcManager.getAddress(), ch.locateOwners(key));
+ }
+ }
+ return shouldFetchFromRemote;
+ }
+
+ /**
+ * For conditional operations (replace, remove, put if absent) Used only for optimistic transactional caches, to solve the following situation:
+ * <pre>
+ * - node A (owner, tx originator) does a successful replace
+ * - the actual value changes
+ * - tx commits. The value is applied on A (the check was performed at operation time) but is not applied on
+ * B (check is performed at commit time).
+ * In such situations (optimistic caches) the remote conditional command should not re-check the old value.
+ * </pre>
+ */
+ protected boolean ignorePreviousValueOnBackup(WriteCommand command, InvocationContext ctx) {
+ return ctx.isOriginLocal() && command.isSuccessful();
+ }
+
+ /**
+ * Retrieves a cache entry from a remote source. Would typically involve an RPC call using a {@link org.infinispan.commands.remote.ClusteredGetCommand}
+ * and some form of quorum of responses if the responses returned are inconsistent - often the case if there is a
+ * rehash in progress, involving nodes that the key maps to.
+ *
+ * @param key key to look up
+ * @return an internal cache entry, or null if it cannot be located
+ */
+ protected abstract InternalCacheEntry retrieveFromRemoteSource(Object key, InvocationContext ctx, boolean acquireRemoteLock, FlagAffectedCommand command) throws Exception;
+}
View
76 core/src/main/java/org/infinispan/interceptors/ReplicationInterceptor.java
@@ -22,7 +22,6 @@
*/
package org.infinispan.interceptors;
-import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.FlagAffectedCommand;
import org.infinispan.commands.control.LockControlCommand;
import org.infinispan.commands.read.AbstractDataCommand;
@@ -35,19 +34,12 @@
import org.infinispan.commands.write.*;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.Configurations;
-import org.infinispan.container.DataContainer;
-import org.infinispan.container.EntryFactory;
-import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.entries.InternalCacheValue;
-import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.LocalTxInvocationContext;
import org.infinispan.context.impl.TxInvocationContext;
-import org.infinispan.distribution.ch.ConsistentHash;
-import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
-import org.infinispan.interceptors.base.BaseRpcInterceptor;
import org.infinispan.remoting.responses.ClusteredGetResponseValidityFilter;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.responses.SuccessfulResponse;
@@ -55,10 +47,8 @@
import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.jgroups.SuspectException;
-import org.infinispan.statetransfer.StateTransferManager;
import org.infinispan.transaction.LockingMode;
import org.infinispan.transaction.xa.GlobalTransaction;
-import org.infinispan.util.concurrent.locks.LockManager;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
@@ -71,16 +61,8 @@
* @author Bela Ban
* @since 4.0
*/
-public class ReplicationInterceptor extends BaseRpcInterceptor {
+public class ReplicationInterceptor extends ClusteringInterceptor {
- private CommandsFactory cf;
-
- private EntryFactory entryFactory;
- private LockManager lockManager;
- private DataContainer dataContainer;
- private StateTransferManager stateTransferManager;
-
- private boolean needReliableReturnValues;
private boolean isPessimisticCache;
private static final Log log = LogFactory.getLog(ReplicationInterceptor.class);
@@ -91,19 +73,9 @@ protected Log getLog() {
return log;
}
- @Inject
- public void init(CommandsFactory cf, EntryFactory entryFactory, DataContainer dataContainer, LockManager lockManager, StateTransferManager stateTransferManager) {
- this.cf = cf;
- this.entryFactory = entryFactory;
- this.dataContainer = dataContainer;
- this.lockManager = lockManager;
- this.stateTransferManager = stateTransferManager;
- }
-
@Start
public void start() {
isPessimisticCache = cacheConfiguration.transaction().lockingMode() == LockingMode.PESSIMISTIC;
- needReliableReturnValues = !cacheConfiguration.unsafe().unreliableReturnValues();
}
@Override
@@ -180,44 +152,6 @@ public Object visitLockControlCommand(TxInvocationContext ctx, LockControlComman
return retVal;
}
- private boolean needsRemoteGet(InvocationContext ctx, AbstractDataCommand command) {
- if (command.hasFlag(Flag.CACHE_MODE_LOCAL)
- || command.hasFlag(Flag.SKIP_REMOTE_LOOKUP) //todo [anistor] clarify usage of this flag in REPL mode
- || command.hasFlag(Flag.IGNORE_RETURN_VALUES)) {
- return false;
- }
- boolean shouldFetchFromRemote = false;
- CacheEntry entry = ctx.lookupEntry(command.getKey());
- if (entry == null || entry.isNull() || entry.isLockPlaceholder()) {
- Object key = command.getKey();
- ConsistentHash ch = stateTransferManager.getCacheTopology().getReadConsistentHash();
- shouldFetchFromRemote = ctx.isOriginLocal() && !ch.isKeyLocalToNode(rpcManager.getAddress(), key) && !dataContainer.containsKey(key);
- if (!shouldFetchFromRemote) {
- log.tracef("Not doing a remote get for key %s since entry is mapped to current node (%s). Owners are %s", key, rpcManager.getAddress(), ch.locateOwners(key));
- }
- }
- return shouldFetchFromRemote;
- }
-
- private boolean isNeedReliableReturnValues(FlagAffectedCommand command) {
- return !command.hasFlag(Flag.SKIP_REMOTE_LOOKUP)
- && !command.hasFlag(Flag.IGNORE_RETURN_VALUES) && needReliableReturnValues;
- }
-
- /**
- * For conditional operations (replace, remove, put if absent) Used only for optimistic transactional caches, to solve the following situation:
- * <pre>
- * - node A (owner, tx originator) does a successful replace
- * - the actual value changes
- * - tx commits. The value is applied on A (the check was performed at operation time) but is not applied on
- * B (check is performed at commit time).
- * In such situations (optimistic caches) the remote conditional command should not re-check the old value.
- * </pre>
- */
- private boolean ignorePreviousValueOnBackup(WriteCommand command, InvocationContext ctx) {
- return ctx.isOriginLocal() && command.isSuccessful();
- }
-
/**
* This method retrieves an entry from a remote cache.
* <p/>
@@ -242,7 +176,7 @@ private Object remoteGet(InvocationContext ctx, Object key, FlagAffectedCommand
acquireRemoteLock = isWrite && isPessimisticCache && !txContext.getAffectedKeys().contains(key);
}
// attempt a remote lookup
- InternalCacheEntry ice = retrieveFromRemoteSource(key, ctx, acquireRemoteLock, command.getFlags());
+ InternalCacheEntry ice = retrieveFromRemoteSource(key, ctx, acquireRemoteLock, command);
if (acquireRemoteLock) {
((TxInvocationContext) ctx).addAffectedKey(key);
@@ -265,9 +199,9 @@ protected Address getPrimaryOwner() {
return stateTransferManager.getCacheTopology().getReadConsistentHash().getMembers().get(0);
}
- private InternalCacheEntry retrieveFromRemoteSource(Object key, InvocationContext ctx, boolean acquireRemoteLock, Set<Flag> flags) {
+ protected InternalCacheEntry retrieveFromRemoteSource(Object key, InvocationContext ctx, boolean acquireRemoteLock, FlagAffectedCommand command) {
GlobalTransaction gtx = acquireRemoteLock ? ((TxInvocationContext)ctx).getGlobalTransaction() : null;
- ClusteredGetCommand get = cf.buildClusteredGetCommand(key, flags, acquireRemoteLock, gtx);
+ ClusteredGetCommand get = cf.buildClusteredGetCommand(key, command.getFlags(), acquireRemoteLock, gtx);
List<Address> targets = Collections.singletonList(getPrimaryOwner());
ResponseFilter filter = new ClusteredGetResponseValidityFilter(targets, rpcManager.getAddress());
@@ -289,7 +223,7 @@ private InternalCacheEntry retrieveFromRemoteSource(Object key, InvocationContex
private Object localGet(InvocationContext ctx, Object key, boolean isWrite, FlagAffectedCommand command) throws Throwable {
InternalCacheEntry ice = dataContainer.get(key);
if (ice != null) {
- if (!ctx.replaceValue(key, ice.getValue())) {
+ if (!ctx.replaceValue(key, ice.getValue())) {
if (isWrite)
lockAndWrap(ctx, key, ice, command);
else
View
97 core/src/main/java/org/infinispan/interceptors/distribution/BaseDistributionInterceptor.java
@@ -22,34 +22,32 @@
*/
package org.infinispan.interceptors.distribution;
-import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.FlagAffectedCommand;
-import org.infinispan.commands.read.AbstractDataCommand;
+import org.infinispan.commands.remote.ClusteredGetCommand;
import org.infinispan.commands.write.PutMapCommand;
import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.commands.write.ReplaceCommand;
import org.infinispan.commands.write.WriteCommand;
-import org.infinispan.container.DataContainer;
-import org.infinispan.container.EntryFactory;
-import org.infinispan.container.entries.CacheEntry;
-import org.infinispan.context.Flag;
+import org.infinispan.container.entries.InternalCacheEntry;
+import org.infinispan.container.entries.InternalCacheValue;
import org.infinispan.context.InvocationContext;
-import org.infinispan.distribution.DataLocality;
+import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.factories.annotations.Inject;
-import org.infinispan.factories.annotations.Start;
-import org.infinispan.interceptors.base.BaseRpcInterceptor;
+import org.infinispan.interceptors.ClusteringInterceptor;
import org.infinispan.interceptors.locking.ClusteringDependentLogic;
+import org.infinispan.remoting.responses.ClusteredGetResponseValidityFilter;
+import org.infinispan.remoting.responses.Response;
+import org.infinispan.remoting.responses.SuccessfulResponse;
+import org.infinispan.remoting.rpc.ResponseFilter;
+import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.remoting.transport.Address;
+import org.infinispan.transaction.xa.GlobalTransaction;
import org.infinispan.util.Immutables;
-import org.infinispan.util.concurrent.locks.LockManager;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
/**
* Base class for distribution of entries across a cluster.
@@ -60,14 +58,11 @@
* @author Dan Berindei <dan@infinispan.org>
* @since 4.0
*/
-public abstract class BaseDistributionInterceptor extends BaseRpcInterceptor {
+public abstract class BaseDistributionInterceptor extends ClusteringInterceptor {
+
protected DistributionManager dm;
- protected CommandsFactory cf;
- protected DataContainer dataContainer;
- protected EntryFactory entryFactory;
- protected LockManager lockManager;
+
protected ClusteringDependentLogic cdl;
- private boolean needReliableReturnValues;
private static final Log log = LogFactory.getLog(BaseDistributionInterceptor.class);
@@ -77,42 +72,36 @@ protected Log getLog() {
}
@Inject
- public void injectDependencies(DistributionManager distributionManager,
- CommandsFactory cf, DataContainer dataContainer, EntryFactory entryFactory,
- LockManager lockManager, ClusteringDependentLogic cdl) {
+ public void injectDependencies(DistributionManager distributionManager, ClusteringDependentLogic cdl) {
this.dm = distributionManager;
- this.cf = cf;
- this.dataContainer = dataContainer;
- this.entryFactory = entryFactory;
- this.lockManager = lockManager;
this.cdl = cdl;
}
- @Start
- public void configure() {
- needReliableReturnValues = !cacheConfiguration.unsafe().unreliableReturnValues();
- }
-
- protected boolean needsRemoteGet(InvocationContext ctx, AbstractDataCommand command) {
- boolean shouldFetchFromRemote = false;
- final CacheEntry entry;
- if (!command.hasFlag(Flag.CACHE_MODE_LOCAL)
- && !command.hasFlag(Flag.SKIP_REMOTE_LOOKUP)
- && !command.hasFlag(Flag.IGNORE_RETURN_VALUES)
- && ((entry = ctx.lookupEntry(command.getKey())) == null || entry.isNull() || entry.isLockPlaceholder())) {
- Object key = command.getKey();
- DataLocality locality = dm.getReadConsistentHash().isKeyLocalToNode(rpcManager.getAddress(), key) ? DataLocality.LOCAL : DataLocality.NOT_LOCAL;
- shouldFetchFromRemote = ctx.isOriginLocal() && !locality.isLocal() && !dataContainer.containsKey(key);
- if (!shouldFetchFromRemote) {
- log.tracef("Not doing a remote get for key %s since entry is mapped to current node (%s), or is in L1. Owners are %s", key, rpcManager.getAddress(), dm.locate(key));
+ @Override
+ protected InternalCacheEntry retrieveFromRemoteSource(Object key, InvocationContext ctx, boolean acquireRemoteLock, FlagAffectedCommand command) throws Exception {
+ GlobalTransaction gtx = acquireRemoteLock ? ((TxInvocationContext)ctx).getGlobalTransaction() : null;
+ ClusteredGetCommand get = cf.buildClusteredGetCommand(key, command.getFlags(), acquireRemoteLock, gtx);
+
+ List<Address> targets = new ArrayList<Address>(stateTransferManager.getCacheTopology().getReadConsistentHash().locateOwners(key));
+ // if any of the recipients has left the cluster since the command was issued, just don't wait for its response
+ targets.retainAll(rpcManager.getTransport().getMembers());
+ ResponseFilter filter = new ClusteredGetResponseValidityFilter(targets, rpcManager.getAddress());
+ Map<Address, Response> responses = rpcManager.invokeRemotely(targets, get, ResponseMode.WAIT_FOR_VALID_RESPONSE,
+ cacheConfiguration.clustering().sync().replTimeout(), true, filter);
+
+ if (!responses.isEmpty()) {
+ for (Response r : responses.values()) {
+ if (r instanceof SuccessfulResponse) {
+ InternalCacheValue cacheValue = (InternalCacheValue) ((SuccessfulResponse) r).getResponseValue();
+ return cacheValue.toInternalCacheEntry(key);
+ }
}
}
- return shouldFetchFromRemote;
- }
- protected boolean isNeedReliableReturnValues(FlagAffectedCommand command) {
- return !command.hasFlag(Flag.SKIP_REMOTE_LOOKUP)
- && !command.hasFlag(Flag.IGNORE_RETURN_VALUES) && needReliableReturnValues;
+ // TODO If everyone returned null, and the read CH has changed, retry the remote get.
+ // Otherwise our get command might be processed by the old owners after they have invalidated their data
+ // and we'd return a null even though the key exists on
+ return null;
}
@Override
@@ -157,9 +146,9 @@ protected boolean isSingleOwnerAndLocal(RecipientGenerator recipientGenerator) {
}
class SingleKeyRecipientGenerator implements RecipientGenerator {
- final Object key;
- final Set<Object> keys;
- List<Address> recipients = null;
+ private final Object key;
+ private final Set<Object> keys;
+ private List<Address> recipients = null;
SingleKeyRecipientGenerator(Object key) {
this.key = key;
@@ -180,8 +169,8 @@ protected boolean isSingleOwnerAndLocal(RecipientGenerator recipientGenerator) {
class MultipleKeysRecipientGenerator implements RecipientGenerator {
- final Collection<Object> keys;
- List<Address> recipients = null;
+ private final Collection<Object> keys;
+ private List<Address> recipients = null;
MultipleKeysRecipientGenerator(Collection<Object> keys) {
this.keys = keys;
View
4 .../src/main/java/org/infinispan/interceptors/distribution/NonTxDistributionInterceptor.java
@@ -140,7 +140,7 @@ private Object remoteGetBeforeWrite(InvocationContext ctx, Object key, FlagAffec
if (trace) log.tracef("Doing a remote get for key %s", key);
// attempt a remote lookup
- InternalCacheEntry ice = dm.retrieveFromRemoteSource(key, ctx, false, command);
+ InternalCacheEntry ice = retrieveFromRemoteSource(key, ctx, false, command);
if (ice != null) {
if (!ctx.replaceValue(key, ice.getValue())) {
entryFactory.wrapEntryForPut(ctx, key, ice, false, command);
@@ -190,7 +190,7 @@ protected void handleRemoteWrite(InvocationContext ctx, WriteCommand command, Re
private Object remoteGet(InvocationContext ctx, Object key, GetKeyValueCommand command) throws Throwable {
if (trace) log.tracef("Doing a remote get for key %s", key);
- InternalCacheEntry ice = dm.retrieveFromRemoteSource(key, ctx, false, command);
+ InternalCacheEntry ice = retrieveFromRemoteSource(key, ctx, false, command);
command.setRemotelyFetchedValue(ice);
if (ice != null) {
return ice.getValue();
View
8 core/src/main/java/org/infinispan/interceptors/distribution/TxDistributionInterceptor.java
@@ -117,8 +117,9 @@ public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) t
* In such situations (optimistic caches) the remote conditional command should not re-check the old value.
* </pre>
*/
- private boolean ignorePreviousValueOnBackup(WriteCommand command, InvocationContext ctx) {
- return ctx.isOriginLocal() && command.isSuccessful() && cacheConfiguration.transaction().lockingMode() == LockingMode.OPTIMISTIC && !useClusteredWriteSkewCheck;
+ protected boolean ignorePreviousValueOnBackup(WriteCommand command, InvocationContext ctx) {
+ return super.ignorePreviousValueOnBackup(command, ctx)
+ && cacheConfiguration.transaction().lockingMode() == LockingMode.OPTIMISTIC && !useClusteredWriteSkewCheck;
}
@Start
@@ -176,7 +177,6 @@ public Object visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand
}
}
-
private void lockAndWrap(InvocationContext ctx, Object key, InternalCacheEntry ice, FlagAffectedCommand command) throws InterruptedException {
boolean skipLocking = hasSkipLocking(command);
long lockTimeout = getLockAcquisitionTimeout(command, skipLocking);
@@ -341,7 +341,7 @@ private Object remoteGetAndStoreInL1(InvocationContext ctx, Object key, boolean
acquireRemoteLock = isWrite && isPessimisticCache && !txContext.getAffectedKeys().contains(key);
}
// attempt a remote lookup
- InternalCacheEntry ice = dm.retrieveFromRemoteSource(key, ctx, acquireRemoteLock, command);
+ InternalCacheEntry ice = retrieveFromRemoteSource(key, ctx, acquireRemoteLock, command);
if (acquireRemoteLock) {
((TxInvocationContext) ctx).addAffectedKey(key);
View
4 core/src/main/java/org/infinispan/interceptors/locking/ClusteringDependentLogic.java
@@ -324,7 +324,9 @@ public void commitEntry(CacheEntry entry, EntryVersion newVersion, boolean skipO
boolean isForeignOwned = !skipOwnershipCheck && !localNodeIsOwner(entry.getKey());
if (isForeignOwned && !entry.isRemoved()) {
if (configuration.clustering().l1().enabled()) {
- dm.transformForL1(entry);
+ // transform for L1
+ if (entry.getLifespan() < 0 || entry.getLifespan() > configuration.clustering().l1().lifespan())
+ entry.setLifespan(configuration.clustering().l1().lifespan());
} else {
doCommit = false;
}
View
6 core/src/test/java/org/infinispan/interceptors/ReplicationInterceptorTest.java
@@ -45,8 +45,6 @@
import org.infinispan.topology.CacheTopology;
import org.infinispan.transaction.xa.GlobalTransaction;
import org.infinispan.util.concurrent.locks.LockManager;
-import org.infinispan.util.logging.Log;
-import org.infinispan.util.logging.LogFactory;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testng.annotations.Test;
@@ -67,8 +65,6 @@
@Test(groups = "functional", testName = "interceptors.ReplicationInterceptorTest")
public class ReplicationInterceptorTest {
- private static final Log log = LogFactory.getLog(ReplicationInterceptorTest.class);
-
public void testRemoteGetForGetKeyValueCommand() throws Throwable {
ConfigurationBuilder cb = new ConfigurationBuilder();
cb.clustering().cacheMode(CacheMode.REPL_SYNC);
@@ -114,7 +110,7 @@ public CacheTopology answer(InvocationOnMock invocation) {
return cacheTopology;
}
});
- replInterceptor.init(commandsFactory, entryFactory, dataContainer, lockManager, stateTransferManager);
+ replInterceptor.injectDependencies(commandsFactory, entryFactory, lockManager, dataContainer, stateTransferManager);
RpcManager rpcManager = mock(RpcManager.class);
Transport transport = mock(Transport.class);
when(rpcManager.getAddress()).thenReturn(B);
View
12 core/src/test/java/org/infinispan/statetransfer/BaseDistStateTransferConsistencyTest.java
@@ -238,8 +238,8 @@ protected Object handleDefault(InvocationContext ctx, VisitableCommand cmd) thro
TestingUtil.waitForRehashToComplete(cache(0), cache(2));
// at this point state transfer is fully done
- log.infof("Data container of NodeA has %d keys: %s", dc0.size(), dc0.keySet());
- log.infof("Data container of NodeC has %d keys: %s", dc2.size(), dc2.keySet());
+ log.infof("Data container of NodeA has %d keys: %s", dc0.size(), dc0.entrySet());
+ log.infof("Data container of NodeC has %d keys: %s", dc2.size(), dc2.entrySet());
if (op == Operation.CLEAR || op == Operation.REMOVE) {
// caches should be empty. check that no keys were revived by an inconsistent state transfer
@@ -262,7 +262,8 @@ protected Object handleDefault(InvocationContext ctx, VisitableCommand cmd) thro
assertEquals("Wrong number of owners", ch.locateOwners(i).size(), owners);
// check values were not overwritten with old values carried by state transfer
- assertEquals("after_st_" + i, cache(0).get(i));
+ String expected = "after_st_" + i;
+ assertEquals(expected, cache(0).get(i));
assertEquals("after_st_" + i, cache(2).get(i));
}
} else { // PUT_IF_ABSENT
@@ -278,8 +279,9 @@ protected Object handleDefault(InvocationContext ctx, VisitableCommand cmd) thro
}
assertEquals("Wrong number of owners", ch.locateOwners(i).size(), owners);
- assertEquals("before_st_" + i, cache(0).get(i));
- assertEquals("before_st_" + i, cache(2).get(i));
+ String expected = "before_st_" + i;
+ assertEquals(expected, cache(0).get(i));
+ assertEquals(expected, cache(2).get(i));
}
}
}

0 comments on commit f0aaac1

Please sign in to comment.
Something went wrong with that request. Please try again.