Permalink
Browse files

ISPN-2757 ReplaceCommand in REPL mode cluster does not perform any mo…

…difications if the entry was not yet transfered during rehashing

* ReplicationInterceptor needs to be smart an if the command is conditional but the affected value is not yet available locally because state transfer did not have the chance to fetch it yet then we should fetch it now.
* The whole logic where the coordinator is the main lock owner has to be replaced to use the first topology member rather than the coordinator (the coordinator might not even run the cache in question)
* ClusteringDependentLogic needs to be split in separate implementations for each cache mode
  • Loading branch information...
1 parent 9e52b8f commit cc1ac8e73d9b96b8d386fb193641919779999ca6 @anistor anistor committed with Mircea Markus Feb 1, 2013
@@ -168,7 +168,7 @@ public boolean isSuccessful() {
@Override
public boolean isConditional() {
- return true;
+ return !ignorePreviousValue;
}
public long getLifespanMillis() {
@@ -81,8 +81,12 @@
Class<?> componentImpl;
if (componentType.equals(ClusteringDependentLogic.class)) {
CacheMode cacheMode = configuration.clustering().cacheMode();
- if (cacheMode.isReplicated() || !cacheMode.isClustered() || cacheMode.isInvalidation()) {
- return componentType.cast(new ClusteringDependentLogic.AllNodesLogic());
+ if (!cacheMode.isClustered()) {
+ return componentType.cast(new ClusteringDependentLogic.LocalLogic());
+ } else if (cacheMode.isInvalidation()) {
+ return componentType.cast(new ClusteringDependentLogic.InvalidationLogic());
+ } else if (cacheMode.isReplicated()) {
+ return componentType.cast(new ClusteringDependentLogic.ReplicationLogic());
} else {
return componentType.cast(new ClusteringDependentLogic.DistributionLogic());
}
@@ -23,28 +23,28 @@
package org.infinispan.interceptors;
import org.infinispan.commands.CommandsFactory;
+import org.infinispan.commands.FlagAffectedCommand;
import org.infinispan.commands.control.LockControlCommand;
import org.infinispan.commands.read.AbstractDataCommand;
+import org.infinispan.commands.read.GetCacheEntryCommand;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.commands.remote.ClusteredGetCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.tx.RollbackCommand;
-import org.infinispan.commands.write.ClearCommand;
-import org.infinispan.commands.write.PutKeyValueCommand;
-import org.infinispan.commands.write.PutMapCommand;
-import org.infinispan.commands.write.RemoveCommand;
-import org.infinispan.commands.write.ReplaceCommand;
-import org.infinispan.commands.write.WriteCommand;
+import org.infinispan.commands.write.*;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.Configurations;
+import org.infinispan.container.DataContainer;
+import org.infinispan.container.EntryFactory;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.entries.InternalCacheValue;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.LocalTxInvocationContext;
import org.infinispan.context.impl.TxInvocationContext;
+import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.interceptors.base.BaseRpcInterceptor;
@@ -58,13 +58,11 @@
import org.infinispan.statetransfer.StateTransferManager;
import org.infinispan.transaction.LockingMode;
import org.infinispan.transaction.xa.GlobalTransaction;
+import org.infinispan.util.concurrent.locks.LockManager;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.TimeoutException;
/**
@@ -75,10 +73,14 @@
*/
public class ReplicationInterceptor extends BaseRpcInterceptor {
- protected CommandsFactory cf;
+ private CommandsFactory cf;
+ private EntryFactory entryFactory;
+ private LockManager lockManager;
+ private DataContainer dataContainer;
private StateTransferManager stateTransferManager;
+ private boolean needReliableReturnValues;
private boolean isPessimisticCache;
private static final Log log = LogFactory.getLog(ReplicationInterceptor.class);
@@ -90,14 +92,18 @@ protected Log getLog() {
}
@Inject
- public void init(CommandsFactory cf, StateTransferManager stateTransferManager) {
+ public void init(CommandsFactory cf, EntryFactory entryFactory, DataContainer dataContainer, LockManager lockManager, StateTransferManager stateTransferManager) {
this.cf = cf;
+ this.entryFactory = entryFactory;
+ this.dataContainer = dataContainer;
+ this.lockManager = lockManager;
this.stateTransferManager = stateTransferManager;
}
@Start
public void start() {
isPessimisticCache = cacheConfiguration.transaction().lockingMode() == LockingMode.PESSIMISTIC;
+ needReliableReturnValues = !cacheConfiguration.unsafe().unreliableReturnValues();
}
@Override
@@ -149,7 +155,10 @@ public Object visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand
// the entry is mapped to the local node.
if (returnValue == null && ctx.isOriginLocal()) {
if (needsRemoteGet(ctx, command)) {
- returnValue = remoteGet(ctx, command, false);
+ returnValue = remoteGet(ctx, command.getKey(), command, false);
+ }
+ if (returnValue == null) {
+ returnValue = localGet(ctx, command.getKey(), false, command);
}
}
return returnValue;
@@ -172,15 +181,43 @@ public Object visitLockControlCommand(TxInvocationContext ctx, LockControlComman
}
private boolean needsRemoteGet(InvocationContext ctx, AbstractDataCommand command) {
- Object key = command.getKey();
- final CacheEntry entry;
- return !command.hasFlag(Flag.CACHE_MODE_LOCAL)
- && !command.hasFlag(Flag.SKIP_REMOTE_LOOKUP) //todo [anistor] do we need this? it should normally be used only in distributed mode, never in replicated mode
- && !stateTransferManager.getCacheTopology().getReadConsistentHash().isKeyLocalToNode(rpcManager.getAddress(), key)
- && ((entry = ctx.lookupEntry(key)) == null || entry.isNull() || entry.isLockPlaceholder());
+ if (command.hasFlag(Flag.CACHE_MODE_LOCAL)
+ || command.hasFlag(Flag.SKIP_REMOTE_LOOKUP) //todo [anistor] clarify usage of this flag in REPL mode
+ || command.hasFlag(Flag.IGNORE_RETURN_VALUES)) {
+ return false;
+ }
+ boolean shouldFetchFromRemote = false;
+ CacheEntry entry = ctx.lookupEntry(command.getKey());
+ if (entry == null || entry.isNull() || entry.isLockPlaceholder()) {
+ Object key = command.getKey();
+ ConsistentHash ch = stateTransferManager.getCacheTopology().getReadConsistentHash();
+ shouldFetchFromRemote = ctx.isOriginLocal() && !ch.isKeyLocalToNode(rpcManager.getAddress(), key) && !dataContainer.containsKey(key);
+ if (!shouldFetchFromRemote) {
+ log.tracef("Not doing a remote get for key %s since entry is mapped to current node (%s). Owners are %s", key, rpcManager.getAddress(), ch.locateOwners(key));
+ }
+ }
+ return shouldFetchFromRemote;
+ }
+
+ private boolean isNeedReliableReturnValues(FlagAffectedCommand command) {
+ return !command.hasFlag(Flag.SKIP_REMOTE_LOOKUP)
+ && !command.hasFlag(Flag.IGNORE_RETURN_VALUES) && needReliableReturnValues;
+ }
+
+ /**
+ * For conditional operations (replace, remove, put if absent) Used only for optimistic transactional caches, to solve the following situation:
+ * <pre>
+ * - node A (owner, tx originator) does a successful replace
+ * - the actual value changes
+ * - tx commits. The value is applied on A (the check was performed at operation time) but is not applied on
+ * B (check is performed at commit time).
+ * In such situations (optimistic caches) the remote conditional command should not re-check the old value.
+ * </pre>
+ */
+ private boolean ignorePreviousValueOnBackup(WriteCommand command, InvocationContext ctx) {
+ return ctx.isOriginLocal() && command.isSuccessful();
}
- //todo [anistor] need to revise these methods
/**
* This method retrieves an entry from a remote cache.
* <p/>
@@ -190,12 +227,12 @@ private boolean needsRemoteGet(InvocationContext ctx, AbstractDataCommand comman
*
*
* @param ctx invocation context
+ * @param key
* @param command
* @return value of a remote get, or null
* @throws Throwable if there are problems
*/
- private Object remoteGet(InvocationContext ctx, AbstractDataCommand command, boolean isWrite) throws Throwable {
- Object key = command.getKey();
+ private Object remoteGet(InvocationContext ctx, Object key, FlagAffectedCommand command, boolean isWrite) throws Throwable {
if (trace) {
log.tracef("Key %s is not yet available on %s, so we may need to look elsewhere", key, rpcManager.getAddress());
}
@@ -211,16 +248,28 @@ private Object remoteGet(InvocationContext ctx, AbstractDataCommand command, boo
((TxInvocationContext) ctx).addAffectedKey(key);
}
- return ice != null ? ice.getValue() : null;
+ if (ice != null) {
+ if (!ctx.replaceValue(key, ice.getValue())) {
+ if (isWrite) {
+ lockAndWrap(ctx, key, ice, command);
+ } else {
+ ctx.putLookedUpEntry(key, ice);
+ }
+ }
+ return ice.getValue();
+ }
+ return null;
+ }
+
+ protected Address getPrimaryOwner() {
+ return stateTransferManager.getCacheTopology().getReadConsistentHash().getMembers().get(0);
}
private InternalCacheEntry retrieveFromRemoteSource(Object key, InvocationContext ctx, boolean acquireRemoteLock, Set<Flag> flags) {
GlobalTransaction gtx = acquireRemoteLock ? ((TxInvocationContext)ctx).getGlobalTransaction() : null;
ClusteredGetCommand get = cf.buildClusteredGetCommand(key, flags, acquireRemoteLock, gtx);
- List<Address> targets = new ArrayList<Address>(stateTransferManager.getCacheTopology().getReadConsistentHash().locateOwners(key));
- // if any of the recipients has left the cluster since the command was issued, just don't wait for its response
- targets.retainAll(rpcManager.getTransport().getMembers());
+ List<Address> targets = Collections.singletonList(getPrimaryOwner());
ResponseFilter filter = new ClusteredGetResponseValidityFilter(targets, rpcManager.getAddress());
Map<Address, Response> responses = rpcManager.invokeRemotely(targets, get, ResponseMode.WAIT_FOR_VALID_RESPONSE,
cacheConfiguration.clustering().sync().replTimeout(), true, filter);
@@ -237,36 +286,77 @@ private InternalCacheEntry retrieveFromRemoteSource(Object key, InvocationContex
return null;
}
+ private Object localGet(InvocationContext ctx, Object key, boolean isWrite, FlagAffectedCommand command) throws Throwable {
+ InternalCacheEntry ice = dataContainer.get(key);
+ if (ice != null) {
+ if (!ctx.replaceValue(key, ice.getValue())) {
+ if (isWrite)
+ lockAndWrap(ctx, key, ice, command);
+ else
+ ctx.putLookedUpEntry(key, ice);
+ }
+ return command instanceof GetCacheEntryCommand ? ice : ice.getValue();
+ }
+ return null;
+ }
+
+ private void lockAndWrap(InvocationContext ctx, Object key, InternalCacheEntry ice, FlagAffectedCommand command) throws InterruptedException {
+ if (isPessimisticCache && rpcManager.getAddress().equals(getPrimaryOwner())) {
+ boolean skipLocking = hasSkipLocking(command);
+ long lockTimeout = getLockAcquisitionTimeout(command, skipLocking);
+ lockManager.acquireLock(ctx, key, lockTimeout, skipLocking);
+ }
+ entryFactory.wrapEntryForPut(ctx, key, ice, false, command);
+ }
+
@Override
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
- return handleCrudMethod(ctx, command);
+ return handleCrudMethod(ctx, command, !ctx.isOriginLocal());
}
@Override
public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable {
- return handleCrudMethod(ctx, command);
+ return handleCrudMethod(ctx, command, true);
}
@Override
public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable {
- return handleCrudMethod(ctx, command);
+ try {
+ return handleCrudMethod(ctx, command, !ctx.isOriginLocal());
+ } finally {
+ if (ignorePreviousValueOnBackup(command, ctx)) {
+ // the command that will execute remotely must ignore previous values
+ command.setIgnorePreviousValue(true);
+ }
+ }
}
@Override
public Object visitClearCommand(InvocationContext ctx, ClearCommand command) throws Throwable {
- return handleCrudMethod(ctx, command);
+ return handleCrudMethod(ctx, command, true);
}
@Override
public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable {
- return handleCrudMethod(ctx, command);
+ try {
+ return handleCrudMethod(ctx, command, !ctx.isOriginLocal());
+ } finally {
+ if (ignorePreviousValueOnBackup(command, ctx)) {
+ // the command that will execute remotely must ignore previous values
+ command.setIgnorePreviousValue(true);
+ }
+ }
}
/**
* If we are within one transaction we won't do any replication as replication would only be performed at commit
* time. If the operation didn't originate locally we won't do any replication either.
*/
- private Object handleCrudMethod(final InvocationContext ctx, final WriteCommand command) throws Throwable {
+ private Object handleCrudMethod(InvocationContext ctx, WriteCommand command, boolean skipRemoteGet) throws Throwable {
+ if (!skipRemoteGet) {
+ remoteGetBeforeWrite(ctx, command);
+ }
+
// FIRST pass this call up the chain. Only if it succeeds (no exceptions) locally do we attempt to replicate.
final Object returnValue = invokeNextInterceptor(ctx, command);
if (!isLocalModeForced(command) && command.isSuccessful() && ctx.isOriginLocal() && !ctx.isInTxScope()) {
@@ -275,4 +365,18 @@ private Object handleCrudMethod(final InvocationContext ctx, final WriteCommand
return returnValue;
}
+ private void remoteGetBeforeWrite(InvocationContext ctx, WriteCommand command) throws Throwable {
+ if (command instanceof AbstractDataCommand && (isNeedReliableReturnValues(command) || command.isConditional())) {
+ AbstractDataCommand singleKeyCommand = (AbstractDataCommand) command;
+
+ Object returnValue = null;
+ // get it remotely if we do not have it yet
+ if (needsRemoteGet(ctx, singleKeyCommand)) {
+ returnValue = remoteGet(ctx, singleKeyCommand.getKey(), singleKeyCommand, true);
+ }
+ if (returnValue == null) {
+ localGet(ctx, singleKeyCommand.getKey(), true, command);
+ }
+ }
+ }
}
@@ -54,10 +54,11 @@ protected void broadcastPrepare(TxInvocationContext context, PrepareCommand comm
// is then stored in the transactional context to be used during the commit phase.
// However if the current node is already the coordinator, then we fall back to "normal" ReplicationInterceptor
// logic for this step.
- if (!rpcManager.getTransport().isCoordinator()) {
+ Address primaryOwner = getPrimaryOwner();
+ if (!primaryOwner.equals(rpcManager.getAddress())) {
setVersionsSeenOnPrepareCommand((VersionedPrepareCommand) command, context);
Map<Address, Response> resps = rpcManager.invokeRemotely(null, command, true, true);
- Response r = resps.get(rpcManager.getTransport().getCoordinator()); // We only really care about the coordinator's response.
+ Response r = resps.get(primaryOwner); // We only really care about the coordinator's response.
readVersionsFromResponse(r, context.getCacheTransaction());
} else {
super.broadcastPrepare(context, command);
@@ -76,7 +76,7 @@ protected Log getLog() {
}
@Inject
- private void injectConfiguration(Configuration configuration) {
+ public void injectConfiguration(Configuration configuration) {
this.cacheConfiguration = configuration;
}
Oops, something went wrong.

0 comments on commit cc1ac8e

Please sign in to comment.