Skip to content

Commit

Permalink
ISPN-7107 Unroot InvalidateCommand from RemoteCommand
Browse files Browse the repository at this point in the history
  • Loading branch information
rvansa authored and danberindei committed Oct 20, 2016
1 parent 7b994f7 commit dba010d
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -358,11 +358,11 @@ public void initializeReplicableCommand(ReplicableCommand c, boolean isRemote) {
break;
case InvalidateCommand.COMMAND_ID:
InvalidateCommand ic = (InvalidateCommand) c;
ic.init(notifier, configuration);
ic.init(notifier);
break;
case InvalidateL1Command.COMMAND_ID:
InvalidateL1Command ilc = (InvalidateL1Command) c;
ilc.init(configuration, distributionManager, notifier, dataContainer);
ilc.init(distributionManager, notifier, dataContainer);
break;
case PrepareCommand.COMMAND_ID:
case VersionedPrepareCommand.COMMAND_ID:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,18 @@
import java.util.Collection;
import java.util.Set;

import org.infinispan.commands.AbstractFlagAffectedCommand;
import org.infinispan.commands.CommandInvocationId;
import org.infinispan.commands.Visitor;
import org.infinispan.commons.marshall.MarshallUtil;
import org.infinispan.commons.util.CollectionFactory;
import org.infinispan.commons.util.Util;
import org.infinispan.container.entries.MVCCEntry;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.lifecycle.ComponentStatus;
import org.infinispan.metadata.Metadata;
import org.infinispan.notifications.cachelistener.CacheNotifier;
import org.infinispan.util.concurrent.locks.RemoteLockCommand;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

Expand All @@ -28,31 +31,29 @@
* @author Mircea.Markus@jboss.com
* @since 4.0
*/
public class InvalidateCommand extends RemoveCommand {
public class InvalidateCommand extends AbstractFlagAffectedCommand implements WriteCommand, RemoteLockCommand {
public static final int COMMAND_ID = 6;
private static final Log log = LogFactory.getLog(InvalidateCommand.class);
private static final boolean trace = log.isTraceEnabled();
protected Object[] keys;
protected CommandInvocationId commandInvocationId;
protected CacheNotifier notifier;

public InvalidateCommand() {
// The value matcher will always be the same, so we don't need to serialize it like we do for the other commands
this.valueMatcher = ValueMatcher.MATCH_ALWAYS;
}

public InvalidateCommand(CacheNotifier notifier, long flagsBitSet, CommandInvocationId commandInvocationId, Object... keys) {
//valueEquivalence can be null because this command never compares values.
super(null, null, notifier, flagsBitSet, null, commandInvocationId);
this.keys = keys;
this.notifier = notifier;
this.commandInvocationId = commandInvocationId;
setFlagsBitSet(flagsBitSet);
}

public InvalidateCommand(CacheNotifier notifier, long flagsBitSet, Collection<Object> keys, CommandInvocationId commandInvocationId) {
//valueEquivalence can be null because this command never compares values.
super(null, null, notifier, flagsBitSet, null, commandInvocationId);
if (keys == null || keys.isEmpty())
this.keys = Util.EMPTY_OBJECT_ARRAY;
else
this.keys = keys.toArray(new Object[keys.size()]);
this(notifier, flagsBitSet, commandInvocationId, keys == null || keys.isEmpty() ? Util.EMPTY_OBJECT_ARRAY : keys.toArray(new Object[keys.size()]));
}

public void init(CacheNotifier notifier) {
this.notifier = notifier;
}

Expand All @@ -67,26 +68,36 @@ public Object perform(InvocationContext ctx) throws Throwable {
if (trace) {
log.tracef("Invalidating keys %s", toStr(Arrays.asList(keys)));
}
for (Object k : keys) {
invalidate(ctx, k);
for (Object key : keys) {
MVCCEntry e = (MVCCEntry) ctx.lookupEntry(key);
if (e != null) {
notify(ctx, e, true);
e.setChanged(true);
e.setRemoved(true);
e.setCreated(false);
e.setValid(false);
}
}
return null;
}

protected void invalidate(InvocationContext ctx, Object keyToInvalidate) throws Throwable {
key = keyToInvalidate; // so that the superclass can see it
super.perform(ctx);
protected void notify(InvocationContext ctx, MVCCEntry e, boolean pre) {
notifier.notifyCacheEntryInvalidated(e.getKey(), e.getValue(), e.getMetadata(), pre, ctx, this);
}

@Override
public void notify(InvocationContext ctx, Object removedValue, Metadata removedMetadata,
boolean isPre) {
notifier.notifyCacheEntryInvalidated(key, removedValue, removedMetadata, isPre, ctx, this);
public byte getCommandId() {
return COMMAND_ID;
}

@Override
public byte getCommandId() {
return COMMAND_ID;
public boolean isReturnValueExpected() {
return false;
}

@Override
public boolean canBlock() {
return false;
}

@Override
Expand All @@ -100,12 +111,14 @@ public String toString() {
public void writeTo(ObjectOutput output) throws IOException {
CommandInvocationId.writeTo(output, commandInvocationId);
MarshallUtil.marshallArray(keys, output);
output.writeLong(getFlagsBitSet());
}

@Override
public void readFrom(ObjectInput input) throws IOException, ClassNotFoundException {
commandInvocationId = CommandInvocationId.readFrom(input);
keys = MarshallUtil.unmarshallArray(input, Object[]::new);
setFlagsBitSet(input.readLong());
}

@Override
Expand All @@ -114,24 +127,62 @@ public Object acceptVisitor(InvocationContext ctx, Visitor visitor) throws Throw
}

@Override
public Object getKey() {
throw new UnsupportedOperationException("Not supported. Use getKeys() instead.");
public boolean shouldInvoke(InvocationContext ctx) {
return true;
}

public Object[] getKeys() {
return keys;
}

@Override
public boolean isSuccessful() {
return true;
}

@Override
public boolean isConditional() {
return false;
}

@Override
public ValueMatcher getValueMatcher() {
return ValueMatcher.MATCH_ALWAYS;
}

@Override
public void setValueMatcher(ValueMatcher valueMatcher) {
}

@Override
public Set<Object> getAffectedKeys() {
return CollectionFactory.makeSet(keys);
}

@Override
public void updateStatusFromRemoteResponse(Object remoteResponse) {
}

@Override
public Collection<Object> getKeysToLock() {
return Arrays.asList(keys);
}

@Override
public Object getKeyLockOwner() {
return commandInvocationId;
}

@Override
public boolean hasZeroLockAcquisition() {
return hasFlag(Flag.ZERO_LOCK_ACQUISITION_TIMEOUT);
}

@Override
public boolean hasSkipLocking() {
return hasFlag(Flag.SKIP_LOCKING);
}

@Override
public boolean ignoreCommandOnStatus(ComponentStatus status) {
switch (status) {
Expand All @@ -151,23 +202,21 @@ public boolean readsExistingValues() {
}

@Override
public boolean equals(Object o) {
if (!super.equals(o)) {
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
}

InvalidateCommand that = (InvalidateCommand) o;

if (!Arrays.equals(keys, that.keys)) {
if (getClass() != obj.getClass())
return false;
}
return true;
InvalidateCommand that = (InvalidateCommand) obj;
if (!hasSameFlags(that))
return false;
return Arrays.equals(keys, that.keys);
}

@Override
public int hashCode() {
int result = super.hashCode();
result = 31 * result + (keys != null ? Arrays.hashCode(keys) : 0);
return result;
return keys != null ? Arrays.hashCode(keys) : 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.container.DataContainer;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.entries.MVCCEntry;
import org.infinispan.context.InvocationContext;
import org.infinispan.distribution.DataLocality;
import org.infinispan.distribution.DistributionManager;
Expand Down Expand Up @@ -64,8 +65,8 @@ public byte getCommandId() {
return COMMAND_ID;
}

public void init(Configuration config, DistributionManager dm, CacheNotifier n, DataContainer dc) {
super.init(n, config);
public void init( DistributionManager dm, CacheNotifier n, DataContainer dc) {
super.init(n);
this.dm = dm;
this.dataContainer = dc;
}
Expand All @@ -78,10 +79,14 @@ public Object perform(InvocationContext ctx) throws Throwable {
InternalCacheEntry ice = dataContainer.get(k);
if (ice != null) {
DataLocality locality = dm.getLocality(k);

if (!locality.isLocal()) {
if (trace) log.tracef("Invalidating key %s.", k);
invalidate(ctx, k);
MVCCEntry e = (MVCCEntry) ctx.lookupEntry(k);
notify(ctx, e, true);
e.setRemoved(true);
e.setChanged(true);
e.setCreated(false);
e.setValid(false);
} else {
log.tracef("Not invalidating key %s as it is local now", k);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ public final BasicInvocationStage visitInvalidateCommand(InvocationContext ctx,
entryFactory.wrapEntryForWriting(ctx, key, EntryFactory.Wrap.WRAP_NON_NULL, false, true);
}
}
return setSkipRemoteGetsAndInvokeNextForDataCommand(ctx, command, null);
return setSkipRemoteGetsAndInvokeNextForWriteCommand(ctx, command);
}

@Override
Expand Down Expand Up @@ -286,7 +286,7 @@ public BasicInvocationStage visitInvalidateL1Command(InvocationContext ctx, Inva
entryFactory.wrapEntryForWriting(ctx, key, EntryFactory.Wrap.WRAP_NON_NULL, false, true);
if (trace) log.tracef("Entry to be removed: %s", toStr(key));
}
return setSkipRemoteGetsAndInvokeNextForDataCommand(ctx, command, null);
return setSkipRemoteGetsAndInvokeNextForWriteCommand(ctx, command);
}

@Override
Expand Down Expand Up @@ -394,7 +394,7 @@ public BasicInvocationStage visitPutMapCommand(InvocationContext ctx, PutMapComm
entryFactory.wrapEntryForWriting(ctx, key, EntryFactory.Wrap.WRAP_ALL, true, false);
}
}
return setSkipRemoteGetsAndInvokeNextForPutMapCommand(ctx, command);
return setSkipRemoteGetsAndInvokeNextForWriteCommand(ctx, command);
}

@Override
Expand Down Expand Up @@ -489,7 +489,7 @@ public BasicInvocationStage visitWriteOnlyManyEntriesCommand(InvocationContext c
entryFactory.wrapEntryForWriting(ctx, key, EntryFactory.Wrap.WRAP_ALL, true, false);
}
}
return setSkipRemoteGetsAndInvokeNextForPutMapCommand(ctx, command);
return setSkipRemoteGetsAndInvokeNextForWriteCommand(ctx, command);
}

@Override
Expand All @@ -501,7 +501,7 @@ public BasicInvocationStage visitWriteOnlyManyCommand(InvocationContext ctx, Wri
entryFactory.wrapEntryForWriting(ctx, key, EntryFactory.Wrap.WRAP_ALL, true, false);
}
}
return setSkipRemoteGetsAndInvokeNextForPutMapCommand(ctx, command);
return setSkipRemoteGetsAndInvokeNextForWriteCommand(ctx, command);
}

@Override
Expand All @@ -519,7 +519,7 @@ public BasicInvocationStage visitReadWriteManyCommand(InvocationContext ctx, Rea
entryFactory.wrapEntryForWriting(ctx, key, EntryFactory.Wrap.WRAP_ALL, false, false);
}
}
return setSkipRemoteGetsAndInvokeNextForPutMapCommand(ctx, command);
return setSkipRemoteGetsAndInvokeNextForWriteCommand(ctx, command);
}

@Override
Expand All @@ -530,7 +530,7 @@ public BasicInvocationStage visitReadWriteManyEntriesCommand(InvocationContext c
entryFactory.wrapEntryForWriting(ctx, key, EntryFactory.Wrap.WRAP_ALL, false, false);
}
}
return setSkipRemoteGetsAndInvokeNextForPutMapCommand(ctx, command);
return setSkipRemoteGetsAndInvokeNextForWriteCommand(ctx, command);
}

private Flag extractStateTransferFlag(InvocationContext ctx, FlagAffectedCommand command) {
Expand Down Expand Up @@ -620,8 +620,8 @@ private void applyChanges(InvocationContext ctx, FlagAffectedCommand command, Me
/**
* Locks the value for the keys accessed by the command to avoid being override from a remote get.
*/
private BasicInvocationStage setSkipRemoteGetsAndInvokeNextForPutMapCommand(InvocationContext ctx,
WriteCommand command) throws Throwable {
private BasicInvocationStage setSkipRemoteGetsAndInvokeNextForWriteCommand(InvocationContext ctx,
WriteCommand command) throws Throwable {
return invokeNext(ctx, command).thenAccept((rCtx, rCommand, rv) -> {
WriteCommand writeCommand = (WriteCommand) rCommand;
if (!rCtx.isInTxScope()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.infinispan.commands.FlagAffectedCommand;
import org.infinispan.commands.tx.VersionedPrepareCommand;
import org.infinispan.commands.tx.totalorder.TotalOrderPrepareCommand;
import org.infinispan.commands.write.InvalidateCommand;
import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.commons.util.Immutables;
Expand Down Expand Up @@ -137,7 +138,9 @@ protected void notifyCommitEntry(boolean created, boolean removed, boolean expir
boolean isWriteOnly = (command instanceof WriteCommand) && ((WriteCommand) command).isWriteOnly();
if (removed) {
if (command instanceof RemoveCommand) {
((RemoveCommand)command).notify(ctx, previousValue, previousMetadata, false);
((RemoveCommand) command).notify(ctx, previousValue, previousMetadata, false);
} else if (command instanceof InvalidateCommand) {
notifier.notifyCacheEntryInvalidated(entry.getKey(), entry.getValue(), entry.getMetadata(), false, ctx, command);
} else {
if (expired) {
notifier.notifyCacheEntryExpired(entry.getKey(), previousValue, previousMetadata, ctx);
Expand Down

0 comments on commit dba010d

Please sign in to comment.