Skip to content

Commit

Permalink
ISPN-6927 Remove acquireRemoteLock and gtx from ClusteredGetCommand
Browse files Browse the repository at this point in the history
  • Loading branch information
rvansa committed Sep 1, 2016
1 parent 5460d29 commit 3fc6cea
Show file tree
Hide file tree
Showing 10 changed files with 34 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,13 @@ public interface CommandsFactory {
* @param flagsBitSet Command flags provided by cache
* @return a ClusteredGetCommand
*/
ClusteredGetCommand buildClusteredGetCommand(Object key, long flagsBitSet, boolean acquireRemoteLock, GlobalTransaction gtx);
ClusteredGetCommand buildClusteredGetCommand(Object key, long flagsBitSet);

@Deprecated
default ClusteredGetCommand buildClusteredGetCommand(Object key, long flagsBitSet, boolean acquireRemoteLock, GlobalTransaction gtx) {
if (acquireRemoteLock) throw new UnsupportedOperationException("acquireRemoteLock is not supported, use Flag.FORCE_WRITE_LOCK");
return buildClusteredGetCommand(key, flagsBitSet);
}

/**
* Builds a ClusteredGetAllCommand, which is a remote lookup command
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,9 +326,8 @@ public SingleRpcCommand buildSingleRpcCommand(ReplicableCommand call) {
}

@Override
public ClusteredGetCommand buildClusteredGetCommand(Object key, long flagsBitSet, boolean acquireRemoteLock, GlobalTransaction gtx) {
return new ClusteredGetCommand(key, cacheName, flagsBitSet, acquireRemoteLock, gtx,
configuration.dataContainer().keyEquivalence());
public ClusteredGetCommand buildClusteredGetCommand(Object key, long flagsBitSet) {
return new ClusteredGetCommand(key, cacheName, flagsBitSet, configuration.dataContainer().keyEquivalence());
}

/**
Expand Down Expand Up @@ -403,8 +402,8 @@ public void initializeReplicableCommand(ReplicableCommand c, boolean isRemote) {
case ClusteredGetCommand.COMMAND_ID:
ClusteredGetCommand clusteredGetCommand = (ClusteredGetCommand) c;
clusteredGetCommand.initialize(icf, this, entryFactory,
interceptorChain, txTable,
configuration.dataContainer().keyEquivalence());
interceptorChain,
configuration.dataContainer().keyEquivalence());
break;
case LockControlCommand.COMMAND_ID:
LockControlCommand lcc = (LockControlCommand) c;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import java.io.ObjectOutput;

import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.control.LockControlCommand;
import org.infinispan.commands.read.GetCacheEntryCommand;
import org.infinispan.commons.equivalence.Equivalence;
import org.infinispan.commons.util.EnumUtil;
Expand All @@ -18,7 +17,6 @@
import org.infinispan.context.InvocationContext;
import org.infinispan.context.InvocationContextFactory;
import org.infinispan.interceptors.InterceptorChain;
import org.infinispan.transaction.impl.TransactionTable;
import org.infinispan.transaction.xa.GlobalTransaction;
import org.infinispan.util.ByteString;
import org.infinispan.util.logging.Log;
Expand All @@ -43,10 +41,7 @@ public class ClusteredGetCommand extends LocalFlagAffectedRpcCommand {
private InvocationContextFactory icf;
private CommandsFactory commandsFactory;
private InterceptorChain invoker;
private boolean acquireRemoteLock;
private GlobalTransaction gtx;

private TransactionTable txTable;
private InternalEntryFactory entryFactory;
private Equivalence keyEquivalence;
//only used by extended statistics. this boolean is local.
Expand All @@ -60,25 +55,19 @@ public ClusteredGetCommand(ByteString cacheName) {
super(cacheName, EnumUtil.EMPTY_BIT_SET);
}

public ClusteredGetCommand(Object key, ByteString cacheName, long flags,
boolean acquireRemoteLock, GlobalTransaction gtx, Equivalence keyEquivalence) {
public ClusteredGetCommand(Object key, ByteString cacheName, long flags, Equivalence keyEquivalence) {
super(cacheName, flags);
this.key = key;
this.acquireRemoteLock = acquireRemoteLock;
this.gtx = gtx;
this.keyEquivalence = keyEquivalence;
this.isWrite = false;
if (acquireRemoteLock && (gtx == null))
throw new IllegalArgumentException("Cannot have null tx if we need to acquire locks");
}

public void initialize(InvocationContextFactory icf, CommandsFactory commandsFactory, InternalEntryFactory entryFactory,
InterceptorChain interceptorChain, TransactionTable txTable,
InterceptorChain interceptorChain,
Equivalence keyEquivalence) {
this.icf = icf;
this.commandsFactory = commandsFactory;
this.invoker = interceptorChain;
this.txTable = txTable;
this.entryFactory = entryFactory;
this.keyEquivalence = keyEquivalence;
}
Expand All @@ -91,7 +80,6 @@ public void initialize(InvocationContextFactory icf, CommandsFactory commandsFac
*/
@Override
public InternalCacheValue perform(InvocationContext context) throws Throwable {
acquireLocksIfNeeded();
// make sure the get command doesn't perform a remote call
// as our caller is already calling the ClusteredGetCommand on all the relevant nodes
long flagBitSet = EnumUtil.bitSetOf(Flag.SKIP_REMOTE_LOOKUP, Flag.CACHE_MODE_LOCAL);
Expand All @@ -113,16 +101,9 @@ public InternalCacheValue perform(InvocationContext context) throws Throwable {
}
}

@Deprecated
public GlobalTransaction getGlobalTransaction() {
return gtx;
}

private void acquireLocksIfNeeded() throws Throwable {
if (acquireRemoteLock) {
LockControlCommand lockControlCommand = commandsFactory.buildLockControlCommand(key, getFlagsBitSet(), gtx);
lockControlCommand.init(invoker, icf, txTable);
lockControlCommand.perform(null);
}
return null;
}

@Override
Expand All @@ -134,20 +115,12 @@ public byte getCommandId() {
public void writeTo(ObjectOutput output) throws IOException {
output.writeObject(key);
output.writeLong(Flag.copyWithoutRemotableFlags(getFlagsBitSet()));
output.writeBoolean(acquireRemoteLock);
if (acquireRemoteLock) {
output.writeObject(gtx);
}
}

@Override
public void readFrom(ObjectInput input) throws IOException, ClassNotFoundException {
key = input.readObject();
setFlagsBitSet(input.readLong());
acquireRemoteLock = input.readBoolean();
if (acquireRemoteLock) {
gtx = (GlobalTransaction) input.readObject();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,15 +140,14 @@ public final CompletableFuture<Void> visitClearCommand(InvocationContext ctx, Cl
return ctx.continueInvocation();
}

protected final CompletableFuture<InternalCacheEntry> retrieveFromProperSource(Object key,
InvocationContext ctx, boolean acquireRemoteLock, FlagAffectedCommand command, boolean isWrite)
protected final CompletableFuture<InternalCacheEntry> retrieveFromProperSource(
Object key, FlagAffectedCommand command, boolean isWrite)
throws Exception {
return doRetrieveFromProperSource(key, null, -1, ctx, command, acquireRemoteLock, isWrite);
return doRetrieveFromProperSource(key, null, -1, command, isWrite);
}

private CompletableFuture<InternalCacheEntry> doRetrieveFromProperSource(Object key, InternalCacheEntry value,
int lastTopologyId, InvocationContext ctx, FlagAffectedCommand command, boolean acquireRemoteLock,
boolean isWrite) {
int lastTopologyId, FlagAffectedCommand command, boolean isWrite) {
if (value != null)
return CompletableFuture.completedFuture(value);

Expand Down Expand Up @@ -203,17 +202,14 @@ private CompletableFuture<InternalCacheEntry> doRetrieveFromProperSource(Object
}
return CompletableFutures.completedNull();
}

GlobalTransaction gtx = ctx.isInTxScope() ? ((TxInvocationContext) ctx).getGlobalTransaction() : null;
ClusteredGetCommand getCommand =
cf.buildClusteredGetCommand(key, command.getFlagsBitSet(), acquireRemoteLock, gtx);

ClusteredGetCommand getCommand = cf.buildClusteredGetCommand(key, command.getFlagsBitSet());
getCommand.setWrite(isWrite);

RpcOptionsBuilder rpcOptionsBuilder =
rpcManager.getRpcOptionsBuilder(ResponseMode.WAIT_FOR_VALID_RESPONSE, DeliverOrder.NONE);
return invokeClusterGetCommandRemotely(targets, rpcOptionsBuilder, getCommand, key).thenCompose(
newValue -> doRetrieveFromProperSource(key, newValue, newTopologyId, ctx, command, acquireRemoteLock,
isWrite));
newValue -> doRetrieveFromProperSource(key, newValue, newTopologyId, command, isWrite));
}

private CompletableFuture<InternalCacheEntry> invokeClusterGetCommandRemotely(List<Address> targets,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ private <T extends AbstractDataCommand & RemoteFetchingCommand> CompletableFutur
if (trace)
log.tracef("Doing a remote get for key %s", key);
CompletableFuture<InternalCacheEntry> remoteFuture =
retrieveFromProperSource(key, ctx, false, command, false);
retrieveFromProperSource(key, command, false);
return remoteFuture.thenCompose(remoteEntry -> {
command.setRemotelyFetchedValue(remoteEntry);
handleRemoteEntry(ctx, key, remoteEntry);
Expand Down Expand Up @@ -255,7 +255,7 @@ public CompletableFuture<Void> visitReadOnlyKeyCommand(InvocationContext ctx, Re
if (readNeedsRemoteValue(ctx, command)) {
if (trace)
log.tracef("Doing a remote get for key %s", key);
remoteFuture = retrieveFromProperSource(key, ctx, false, command, false);
remoteFuture = retrieveFromProperSource(key, command, false);
} else {
remoteFuture = CompletableFutures.completedNull();
}
Expand Down Expand Up @@ -724,7 +724,7 @@ protected CompletableFuture<Void> remoteGetBeforeWrite(InvocationContext ctx, Wr
throw new OutdatedTopologyException("Cache topology changed while the command was executing: expected " +
cmdTopology + ", got " + currentTopologyId);
}
remoteFuture = retrieveFromProperSource(key, ctx, false, command, false);
remoteFuture = retrieveFromProperSource(key, command, false);
return remoteFuture.thenCompose(remoteEntry -> {
handleRemoteEntry(ctx, key, remoteEntry);
return ctx.continueInvocation();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ protected CompletableFuture<Void> remoteGetBeforeWrite(InvocationContext ctx, Wr
protected InternalCacheEntry remoteGet(InvocationContext ctx, Object key, boolean isWrite,
FlagAffectedCommand command) throws Throwable {
// attempt a remote lookup
InternalCacheEntry ice = retrieveFromProperSource(key, ctx, false, command, isWrite).get();
InternalCacheEntry ice = retrieveFromProperSource(key, command, isWrite).get();

if (ice != null) {
if (useClusteredWriteSkewCheck && ctx.isInTxScope()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.infinispan.commons.util.EnumUtil;
import org.infinispan.configuration.cache.ClusterLoaderConfiguration;
import org.infinispan.container.entries.InternalCacheValue;
import org.infinispan.context.Flag;
import org.infinispan.lifecycle.ComponentStatus;
import org.infinispan.marshall.core.MarshalledEntry;
import org.infinispan.persistence.spi.CacheLoader;
Expand Down Expand Up @@ -60,8 +61,8 @@ public MarshalledEntry load(Object key) throws PersistenceException {
if (!isCacheReady()) return null;

ClusteredGetCommand clusteredGetCommand = new ClusteredGetCommand(
key, cacheName, EnumUtil.EMPTY_BIT_SET, false, null,
cache.getCacheConfiguration().dataContainer().keyEquivalence());
key, cacheName, EnumUtil.bitSetOf(Flag.SKIP_OWNERSHIP_CHECK),
cache.getCacheConfiguration().dataContainer().keyEquivalence());

Collection<Response> responses = doRemoteCall(clusteredGetCommand);
if (responses.isEmpty()) return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ public void testImmutableResponseMarshalling() throws Exception {
public void testReplicableCommandsMarshalling() throws Exception {
ByteString cacheName = ByteString.fromString(EmbeddedCacheManager.DEFAULT_CACHE_NAME);
ClusteredGetCommand c2 = new ClusteredGetCommand("key", cacheName,
EnumUtil.EMPTY_BIT_SET, false, null, AnyEquivalence.getInstance());
EnumUtil.EMPTY_BIT_SET, AnyEquivalence.getInstance());
marshallAndAssertEquality(c2);

// SizeCommand does not have an empty constructor, so doesn't look to be one that is marshallable.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,10 @@ public void setUp() throws Throwable {

//populate commands
blockingCacheRpcCommand = new StreamRequestCommand<>(cacheName);
nonBlockingCacheRpcCommand = new ClusteredGetCommand("key", cacheName, EnumUtil.EMPTY_BIT_SET, false, null, null);
nonBlockingCacheRpcCommand = new ClusteredGetCommand("key", cacheName, EnumUtil.EMPTY_BIT_SET, null);
blockingNonCacheRpcCommand = new CacheTopologyControlCommand(null, CacheTopologyControlCommand.Type.POLICY_GET_STATUS, null, 0);
//the GetKeyValueCommand is not replicated, but I only need a command that returns false in canBlock()
nonBlockingNonCacheRpcCommand = new ClusteredGetCommand("key", cacheName, EnumUtil.EMPTY_BIT_SET, false, null, AnyEquivalence.STRING);
nonBlockingNonCacheRpcCommand = new ClusteredGetCommand("key", cacheName, EnumUtil.EMPTY_BIT_SET, AnyEquivalence.STRING);
blockingSingleRpcCommand = new SingleRpcCommand(cacheName, blockingReplicableCommand);
nonBlockingSingleRpcCommand = new SingleRpcCommand(cacheName, nonBlockingReplicableCommand);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,8 @@ public SingleRpcCommand buildSingleRpcCommand(ReplicableCommand call) {
}

@Override
public ClusteredGetCommand buildClusteredGetCommand(Object key, long flagsBitSet, boolean acquireRemoteLock, GlobalTransaction gtx) {
return actual.buildClusteredGetCommand(key, flagsBitSet, acquireRemoteLock, gtx);
public ClusteredGetCommand buildClusteredGetCommand(Object key, long flagsBitSet) {
return actual.buildClusteredGetCommand(key, flagsBitSet);
}

@Override
Expand Down

0 comments on commit 3fc6cea

Please sign in to comment.