Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ISPN-6927 Remove acquireRemoteLock and gtx from ClusteredGetCommand #4494

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,13 @@ public interface CommandsFactory {
* @param flagsBitSet Command flags provided by cache
* @return a ClusteredGetCommand
*/
ClusteredGetCommand buildClusteredGetCommand(Object key, long flagsBitSet, boolean acquireRemoteLock, GlobalTransaction gtx);
ClusteredGetCommand buildClusteredGetCommand(Object key, long flagsBitSet);

@Deprecated
default ClusteredGetCommand buildClusteredGetCommand(Object key, long flagsBitSet, boolean acquireRemoteLock, GlobalTransaction gtx) {
if (acquireRemoteLock) throw new UnsupportedOperationException("acquireRemoteLock is not supported, use Flag.FORCE_WRITE_LOCK");
return buildClusteredGetCommand(key, flagsBitSet);
}

/**
* Builds a ClusteredGetAllCommand, which is a remote lookup command
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,9 +326,8 @@ public SingleRpcCommand buildSingleRpcCommand(ReplicableCommand call) {
}

@Override
public ClusteredGetCommand buildClusteredGetCommand(Object key, long flagsBitSet, boolean acquireRemoteLock, GlobalTransaction gtx) {
return new ClusteredGetCommand(key, cacheName, flagsBitSet, acquireRemoteLock, gtx,
configuration.dataContainer().keyEquivalence());
public ClusteredGetCommand buildClusteredGetCommand(Object key, long flagsBitSet) {
return new ClusteredGetCommand(key, cacheName, flagsBitSet, configuration.dataContainer().keyEquivalence());
}

/**
Expand Down Expand Up @@ -403,8 +402,8 @@ public void initializeReplicableCommand(ReplicableCommand c, boolean isRemote) {
case ClusteredGetCommand.COMMAND_ID:
ClusteredGetCommand clusteredGetCommand = (ClusteredGetCommand) c;
clusteredGetCommand.initialize(icf, this, entryFactory,
interceptorChain, txTable,
configuration.dataContainer().keyEquivalence());
interceptorChain,
configuration.dataContainer().keyEquivalence());
break;
case LockControlCommand.COMMAND_ID:
LockControlCommand lcc = (LockControlCommand) c;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import java.io.ObjectOutput;

import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.control.LockControlCommand;
import org.infinispan.commands.read.GetCacheEntryCommand;
import org.infinispan.commons.equivalence.Equivalence;
import org.infinispan.commons.util.EnumUtil;
Expand All @@ -18,7 +17,6 @@
import org.infinispan.context.InvocationContext;
import org.infinispan.context.InvocationContextFactory;
import org.infinispan.interceptors.InterceptorChain;
import org.infinispan.transaction.impl.TransactionTable;
import org.infinispan.transaction.xa.GlobalTransaction;
import org.infinispan.util.ByteString;
import org.infinispan.util.logging.Log;
Expand All @@ -43,10 +41,7 @@ public class ClusteredGetCommand extends LocalFlagAffectedRpcCommand {
private InvocationContextFactory icf;
private CommandsFactory commandsFactory;
private InterceptorChain invoker;
private boolean acquireRemoteLock;
private GlobalTransaction gtx;

private TransactionTable txTable;
private InternalEntryFactory entryFactory;
private Equivalence keyEquivalence;
//only used by extended statistics. this boolean is local.
Expand All @@ -60,25 +55,19 @@ public ClusteredGetCommand(ByteString cacheName) {
super(cacheName, EnumUtil.EMPTY_BIT_SET);
}

public ClusteredGetCommand(Object key, ByteString cacheName, long flags,
boolean acquireRemoteLock, GlobalTransaction gtx, Equivalence keyEquivalence) {
public ClusteredGetCommand(Object key, ByteString cacheName, long flags, Equivalence keyEquivalence) {
super(cacheName, flags);
this.key = key;
this.acquireRemoteLock = acquireRemoteLock;
this.gtx = gtx;
this.keyEquivalence = keyEquivalence;
this.isWrite = false;
if (acquireRemoteLock && (gtx == null))
throw new IllegalArgumentException("Cannot have null tx if we need to acquire locks");
}

public void initialize(InvocationContextFactory icf, CommandsFactory commandsFactory, InternalEntryFactory entryFactory,
InterceptorChain interceptorChain, TransactionTable txTable,
InterceptorChain interceptorChain,
Equivalence keyEquivalence) {
this.icf = icf;
this.commandsFactory = commandsFactory;
this.invoker = interceptorChain;
this.txTable = txTable;
this.entryFactory = entryFactory;
this.keyEquivalence = keyEquivalence;
}
Expand All @@ -91,7 +80,6 @@ public void initialize(InvocationContextFactory icf, CommandsFactory commandsFac
*/
@Override
public InternalCacheValue perform(InvocationContext context) throws Throwable {
acquireLocksIfNeeded();
// make sure the get command doesn't perform a remote call
// as our caller is already calling the ClusteredGetCommand on all the relevant nodes
long flagBitSet = EnumUtil.bitSetOf(Flag.SKIP_REMOTE_LOOKUP, Flag.CACHE_MODE_LOCAL);
Expand All @@ -113,16 +101,9 @@ public InternalCacheValue perform(InvocationContext context) throws Throwable {
}
}

@Deprecated
public GlobalTransaction getGlobalTransaction() {
return gtx;
}

private void acquireLocksIfNeeded() throws Throwable {
if (acquireRemoteLock) {
LockControlCommand lockControlCommand = commandsFactory.buildLockControlCommand(key, getFlagsBitSet(), gtx);
lockControlCommand.init(invoker, icf, txTable);
lockControlCommand.perform(null);
}
return null;
}

@Override
Expand All @@ -134,20 +115,12 @@ public byte getCommandId() {
public void writeTo(ObjectOutput output) throws IOException {
output.writeObject(key);
output.writeLong(Flag.copyWithoutRemotableFlags(getFlagsBitSet()));
output.writeBoolean(acquireRemoteLock);
if (acquireRemoteLock) {
output.writeObject(gtx);
}
}

@Override
public void readFrom(ObjectInput input) throws IOException, ClassNotFoundException {
key = input.readObject();
setFlagsBitSet(input.readLong());
acquireRemoteLock = input.readBoolean();
if (acquireRemoteLock) {
gtx = (GlobalTransaction) input.readObject();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,15 +140,14 @@ public final CompletableFuture<Void> visitClearCommand(InvocationContext ctx, Cl
return ctx.continueInvocation();
}

protected final CompletableFuture<InternalCacheEntry> retrieveFromProperSource(Object key,
InvocationContext ctx, boolean acquireRemoteLock, FlagAffectedCommand command, boolean isWrite)
protected final CompletableFuture<InternalCacheEntry> retrieveFromProperSource(
Object key, FlagAffectedCommand command, boolean isWrite)
throws Exception {
return doRetrieveFromProperSource(key, null, -1, ctx, command, acquireRemoteLock, isWrite);
return doRetrieveFromProperSource(key, null, -1, command, isWrite);
}

private CompletableFuture<InternalCacheEntry> doRetrieveFromProperSource(Object key, InternalCacheEntry value,
int lastTopologyId, InvocationContext ctx, FlagAffectedCommand command, boolean acquireRemoteLock,
boolean isWrite) {
int lastTopologyId, FlagAffectedCommand command, boolean isWrite) {
if (value != null)
return CompletableFuture.completedFuture(value);

Expand Down Expand Up @@ -203,17 +202,14 @@ private CompletableFuture<InternalCacheEntry> doRetrieveFromProperSource(Object
}
return CompletableFutures.completedNull();
}

GlobalTransaction gtx = ctx.isInTxScope() ? ((TxInvocationContext) ctx).getGlobalTransaction() : null;
ClusteredGetCommand getCommand =
cf.buildClusteredGetCommand(key, command.getFlagsBitSet(), acquireRemoteLock, gtx);

ClusteredGetCommand getCommand = cf.buildClusteredGetCommand(key, command.getFlagsBitSet());
getCommand.setWrite(isWrite);

RpcOptionsBuilder rpcOptionsBuilder =
rpcManager.getRpcOptionsBuilder(ResponseMode.WAIT_FOR_VALID_RESPONSE, DeliverOrder.NONE);
return invokeClusterGetCommandRemotely(targets, rpcOptionsBuilder, getCommand, key).thenCompose(
newValue -> doRetrieveFromProperSource(key, newValue, newTopologyId, ctx, command, acquireRemoteLock,
isWrite));
newValue -> doRetrieveFromProperSource(key, newValue, newTopologyId, command, isWrite));
}

private CompletableFuture<InternalCacheEntry> invokeClusterGetCommandRemotely(List<Address> targets,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ private <T extends AbstractDataCommand & RemoteFetchingCommand> CompletableFutur
if (trace)
log.tracef("Doing a remote get for key %s", key);
CompletableFuture<InternalCacheEntry> remoteFuture =
retrieveFromProperSource(key, ctx, false, command, false);
retrieveFromProperSource(key, command, false);
return remoteFuture.thenCompose(remoteEntry -> {
command.setRemotelyFetchedValue(remoteEntry);
handleRemoteEntry(ctx, key, remoteEntry);
Expand Down Expand Up @@ -255,7 +255,7 @@ public CompletableFuture<Void> visitReadOnlyKeyCommand(InvocationContext ctx, Re
if (readNeedsRemoteValue(ctx, command)) {
if (trace)
log.tracef("Doing a remote get for key %s", key);
remoteFuture = retrieveFromProperSource(key, ctx, false, command, false);
remoteFuture = retrieveFromProperSource(key, command, false);
} else {
remoteFuture = CompletableFutures.completedNull();
}
Expand Down Expand Up @@ -724,7 +724,7 @@ protected CompletableFuture<Void> remoteGetBeforeWrite(InvocationContext ctx, Wr
throw new OutdatedTopologyException("Cache topology changed while the command was executing: expected " +
cmdTopology + ", got " + currentTopologyId);
}
remoteFuture = retrieveFromProperSource(key, ctx, false, command, false);
remoteFuture = retrieveFromProperSource(key, command, false);
return remoteFuture.thenCompose(remoteEntry -> {
handleRemoteEntry(ctx, key, remoteEntry);
return ctx.continueInvocation();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ protected CompletableFuture<Void> remoteGetBeforeWrite(InvocationContext ctx, Wr
protected InternalCacheEntry remoteGet(InvocationContext ctx, Object key, boolean isWrite,
FlagAffectedCommand command) throws Throwable {
// attempt a remote lookup
InternalCacheEntry ice = retrieveFromProperSource(key, ctx, false, command, isWrite).get();
InternalCacheEntry ice = retrieveFromProperSource(key, command, isWrite).get();

if (ice != null) {
if (useClusteredWriteSkewCheck && ctx.isInTxScope()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.infinispan.commons.util.EnumUtil;
import org.infinispan.configuration.cache.ClusterLoaderConfiguration;
import org.infinispan.container.entries.InternalCacheValue;
import org.infinispan.context.Flag;
import org.infinispan.lifecycle.ComponentStatus;
import org.infinispan.marshall.core.MarshalledEntry;
import org.infinispan.persistence.spi.CacheLoader;
Expand Down Expand Up @@ -60,8 +61,8 @@ public MarshalledEntry load(Object key) throws PersistenceException {
if (!isCacheReady()) return null;

ClusteredGetCommand clusteredGetCommand = new ClusteredGetCommand(
key, cacheName, EnumUtil.EMPTY_BIT_SET, false, null,
cache.getCacheConfiguration().dataContainer().keyEquivalence());
key, cacheName, EnumUtil.bitSetOf(Flag.SKIP_OWNERSHIP_CHECK),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this intentional?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like a change that got in from the scattered cache (there I use SKIP_OWNERSHIP_CHECK more extensively as it has better meaning than just CACHE_MODE_LOCAL). It is unintentional within a scope of this PR.

cache.getCacheConfiguration().dataContainer().keyEquivalence());

Collection<Response> responses = doRemoteCall(clusteredGetCommand);
if (responses.isEmpty()) return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ public void testImmutableResponseMarshalling() throws Exception {
public void testReplicableCommandsMarshalling() throws Exception {
ByteString cacheName = ByteString.fromString(EmbeddedCacheManager.DEFAULT_CACHE_NAME);
ClusteredGetCommand c2 = new ClusteredGetCommand("key", cacheName,
EnumUtil.EMPTY_BIT_SET, false, null, AnyEquivalence.getInstance());
EnumUtil.EMPTY_BIT_SET, AnyEquivalence.getInstance());
marshallAndAssertEquality(c2);

// SizeCommand does not have an empty constructor, so doesn't look to be one that is marshallable.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,10 @@ public void setUp() throws Throwable {

//populate commands
blockingCacheRpcCommand = new StreamRequestCommand<>(cacheName);
nonBlockingCacheRpcCommand = new ClusteredGetCommand("key", cacheName, EnumUtil.EMPTY_BIT_SET, false, null, null);
nonBlockingCacheRpcCommand = new ClusteredGetCommand("key", cacheName, EnumUtil.EMPTY_BIT_SET, null);
blockingNonCacheRpcCommand = new CacheTopologyControlCommand(null, CacheTopologyControlCommand.Type.POLICY_GET_STATUS, null, 0);
//the GetKeyValueCommand is not replicated, but I only need a command that returns false in canBlock()
nonBlockingNonCacheRpcCommand = new ClusteredGetCommand("key", cacheName, EnumUtil.EMPTY_BIT_SET, false, null, AnyEquivalence.STRING);
nonBlockingNonCacheRpcCommand = new ClusteredGetCommand("key", cacheName, EnumUtil.EMPTY_BIT_SET, AnyEquivalence.STRING);
blockingSingleRpcCommand = new SingleRpcCommand(cacheName, blockingReplicableCommand);
nonBlockingSingleRpcCommand = new SingleRpcCommand(cacheName, nonBlockingReplicableCommand);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,8 @@ public SingleRpcCommand buildSingleRpcCommand(ReplicableCommand call) {
}

@Override
public ClusteredGetCommand buildClusteredGetCommand(Object key, long flagsBitSet, boolean acquireRemoteLock, GlobalTransaction gtx) {
return actual.buildClusteredGetCommand(key, flagsBitSet, acquireRemoteLock, gtx);
public ClusteredGetCommand buildClusteredGetCommand(Object key, long flagsBitSet) {
return actual.buildClusteredGetCommand(key, flagsBitSet);
}

@Override
Expand Down