Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
ISPN-6372 Remove deprecatd ReplicableCommand.get/SetParameters
  • Loading branch information
pruivo authored and wburns committed Mar 15, 2016
1 parent caaede3 commit 7fa26ce
Show file tree
Hide file tree
Showing 10 changed files with 50 additions and 99 deletions.
Expand Up @@ -103,11 +103,10 @@ public void inject(EmbeddedCacheManager cacheManager, GlobalComponentRegistry re
* *
* *
* @param id id of the command * @param id id of the command
* @param parameters parameters to set
* @param type type of the command * @param type type of the command
* @return a replicable command * @return a replicable command
*/ */
public ReplicableCommand fromStream(byte id, Object[] parameters, byte type) { public ReplicableCommand fromStream(byte id, byte type) {
ReplicableCommand command; ReplicableCommand command;
if (type == 0) { if (type == 0) {
switch (id) { switch (id) {
Expand Down Expand Up @@ -186,7 +185,7 @@ public ReplicableCommand fromStream(byte id, Object[] parameters, byte type) {
} else { } else {
ModuleCommandFactory mcf = commandFactories.get(id); ModuleCommandFactory mcf = commandFactories.get(id);
if (mcf != null) if (mcf != null)
return mcf.fromStream(id, parameters); return mcf.fromStream(id);
else else
throw new CacheException("Unknown command id " + id + "!"); throw new CacheException("Unknown command id " + id + "!");
} }
Expand All @@ -197,12 +196,11 @@ public ReplicableCommand fromStream(byte id, Object[] parameters, byte type) {
* Resolve an {@link CacheRpcCommand} from the stream. * Resolve an {@link CacheRpcCommand} from the stream.
* *
* @param id id of the command * @param id id of the command
* @param parameters parameters to be set
* @param type type of command (whether internal or user defined) * @param type type of command (whether internal or user defined)
* @param cacheName cache name at which this command is directed * @param cacheName cache name at which this command is directed
* @return an instance of {@link CacheRpcCommand} * @return an instance of {@link CacheRpcCommand}
*/ */
public CacheRpcCommand fromStream(byte id, Object[] parameters, byte type, String cacheName) { public CacheRpcCommand fromStream(byte id, byte type, String cacheName) {
CacheRpcCommand command; CacheRpcCommand command;
if (type == 0) { if (type == 0) {
switch (id) { switch (id) {
Expand Down Expand Up @@ -314,7 +312,7 @@ public CacheRpcCommand fromStream(byte id, Object[] parameters, byte type, Strin
} else { } else {
ModuleCommandFactory mcf = commandFactories.get(id); ModuleCommandFactory mcf = commandFactories.get(id);
if (mcf != null) if (mcf != null)
return mcf.fromStream(id, parameters, cacheName); return mcf.fromStream(id, cacheName);
else else
throw new CacheException("Unknown command id " + id + "!"); throw new CacheException("Unknown command id " + id + "!");
} }
Expand Down
34 changes: 2 additions & 32 deletions core/src/main/java/org/infinispan/commands/ReplicableCommand.java
@@ -1,6 +1,5 @@
package org.infinispan.commands; package org.infinispan.commands;


import org.infinispan.commons.util.Util;
import org.infinispan.context.InvocationContext; import org.infinispan.context.InvocationContext;


import java.io.IOException; import java.io.IOException;
Expand Down Expand Up @@ -34,31 +33,6 @@ public interface ReplicableCommand {
*/ */
byte getCommandId(); byte getCommandId();


/**
* Used by marshallers to stream this command across a network
*
* @return an object array of arguments, compatible with pre-2.2.0 MethodCall args.
* @deprecated will be replaced by {@link #writeTo(ObjectOutput)}. Note: don't implement both since they are used
* during the transition period.
*/
@Deprecated
default Object[] getParameters() {
return Util.EMPTY_OBJECT_ARRAY;
}

/**
* Used by the {@link CommandsFactory} to create a command from raw data read off a stream.
*
* @param commandId command id to set. This is usually unused but *could* be used in the event of a command having
* multiple IDs, such as {@link org.infinispan.commands.write.PutKeyValueCommand}.
* @param parameters object array of args
* @deprecated will be replaced by {@link #readFrom(ObjectInput)}. Note: don't implement both since they are used
* during the transition period.
*/
@Deprecated
default void setParameters(int commandId, Object[] parameters) {
}

/** /**
* If true, a return value will be provided when performed remotely. Otherwise, a remote {@link org.infinispan.remoting.responses.ResponseGenerator} * If true, a return value will be provided when performed remotely. Otherwise, a remote {@link org.infinispan.remoting.responses.ResponseGenerator}
* may choose to simply return null to save on marshalling costs. * may choose to simply return null to save on marshalling costs.
Expand All @@ -84,9 +58,7 @@ default void setParameters(int commandId, Object[] parameters) {
* @param output the stream. * @param output the stream.
* @throws IOException if an error occurred during the I/O. * @throws IOException if an error occurred during the I/O.
*/ */
default void writeTo(ObjectOutput output) throws IOException { void writeTo(ObjectOutput output) throws IOException;
//no-op by default
}


/** /**
* Reads this instance from the stream written by {@link #writeTo(ObjectOutput)}. * Reads this instance from the stream written by {@link #writeTo(ObjectOutput)}.
Expand All @@ -95,7 +67,5 @@ default void writeTo(ObjectOutput output) throws IOException {
* @throws IOException if an error occurred during the I/O. * @throws IOException if an error occurred during the I/O.
* @throws ClassNotFoundException if it tries to load an undefined class. * @throws ClassNotFoundException if it tries to load an undefined class.
*/ */
default void readFrom(ObjectInput input) throws IOException, ClassNotFoundException { void readFrom(ObjectInput input) throws IOException, ClassNotFoundException;
//no-op by default
}
} }
Expand Up @@ -32,23 +32,19 @@ public interface ModuleCommandFactory {
Map<Byte, Class<? extends ReplicableCommand>> getModuleCommands(); Map<Byte, Class<? extends ReplicableCommand>> getModuleCommands();


/** /**
* Construct and initialize a {@link ReplicableCommand} based on the command * Construct and initialize a {@link ReplicableCommand} based on the command id.
* id and argument array passed in.
* *
* @param commandId command id to construct * @param commandId command id to construct
* @param args array of arguments with which to initialize the ReplicableCommand
* @return a ReplicableCommand * @return a ReplicableCommand
*/ */
ReplicableCommand fromStream(byte commandId, Object[] args); ReplicableCommand fromStream(byte commandId);


/** /**
* Construct and initialize a {@link CacheRpcCommand} based on the command * Construct and initialize a {@link CacheRpcCommand} based on the command id.
* id and argument array passed in.
* *
* @param commandId command id to construct * @param commandId command id to construct
* @param args array of arguments with which to initialize the {@link CacheRpcCommand}
* @param cacheName cache name at which command to be created is directed * @param cacheName cache name at which command to be created is directed
* @return a {@link CacheRpcCommand} * @return a {@link CacheRpcCommand}
*/ */
CacheRpcCommand fromStream(byte commandId, Object[] args, String cacheName); CacheRpcCommand fromStream(byte commandId, String cacheName);
} }
18 changes: 18 additions & 0 deletions core/src/main/java/org/infinispan/commands/write/ClearCommand.java
Expand Up @@ -131,4 +131,22 @@ public boolean readsExistingValues() {
return false; return false;
} }


@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof ClearCommand)) return false;

ClearCommand that = (ClearCommand) o;

if (getTopologyId() != that.getTopologyId()) return false;
if (flags != null ? !flags.equals(that.flags) : that.flags != null) return false;
return true;
}

@Override
public int hashCode() {
int result = getTopologyId();
result = 31 * result + (flags != null ? flags.hashCode() : 0);
return result;
}
} }
Expand Up @@ -167,7 +167,7 @@ public CacheRpcCommand readObject(ObjectInput input) throws IOException, ClassNo


CacheRpcCommand cacheRpcCommand; CacheRpcCommand cacheRpcCommand;
try { try {
cacheRpcCommand = cmdExt.fromStream(methodId, cmdExt.readLegacyParameters(paramsInput), type, cacheName); cacheRpcCommand = cmdExt.fromStream(methodId, type, cacheName);
cmdExt.readCommandParameters(paramsInput, cacheRpcCommand); cmdExt.readCommandParameters(paramsInput, cacheRpcCommand);
} finally { } finally {
if (firsTime) { if (firsTime) {
Expand Down
Expand Up @@ -16,7 +16,6 @@
import org.infinispan.commands.remote.CacheRpcCommand; import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.commands.remote.GetKeysInGroupCommand; import org.infinispan.commands.remote.GetKeysInGroupCommand;
import org.infinispan.commands.write.*; import org.infinispan.commands.write.*;
import org.infinispan.commons.io.UnsignedNumeric;
import org.infinispan.factories.GlobalComponentRegistry; import org.infinispan.factories.GlobalComponentRegistry;
import org.infinispan.commons.marshall.AbstractExternalizer; import org.infinispan.commons.marshall.AbstractExternalizer;
import org.infinispan.commons.util.Util; import org.infinispan.commons.util.Util;
Expand Down Expand Up @@ -57,13 +56,6 @@ protected void writeCommandParameters(ObjectOutput output, ReplicableCommand com
DeltaAwareObjectOutput deltaAwareObjectOutput = output instanceof DeltaAwareObjectOutput ? DeltaAwareObjectOutput deltaAwareObjectOutput = output instanceof DeltaAwareObjectOutput ?
(DeltaAwareObjectOutput) output : (DeltaAwareObjectOutput) output :
new DeltaAwareObjectOutput(output); new DeltaAwareObjectOutput(output);
Object[] args = command.getParameters();
int numArgs = (args == null ? 0 : args.length);

UnsignedNumeric.writeUnsignedInt(output, numArgs);
for (int i = 0; i < numArgs; i++) {
deltaAwareObjectOutput.writeObject(args[i]);
}
command.writeTo(deltaAwareObjectOutput); command.writeTo(deltaAwareObjectOutput);
if (command instanceof TopologyAffectedCommand) { if (command instanceof TopologyAffectedCommand) {
output.writeInt(((TopologyAffectedCommand) command).getTopologyId()); output.writeInt(((TopologyAffectedCommand) command).getTopologyId());
Expand Down Expand Up @@ -93,20 +85,7 @@ public ReplicableCommand readObject(ObjectInput input) throws IOException, Class
protected ReplicableCommand readCommandHeader(ObjectInput input) throws IOException, ClassNotFoundException { protected ReplicableCommand readCommandHeader(ObjectInput input) throws IOException, ClassNotFoundException {
byte type = input.readByte(); byte type = input.readByte();
short methodId = input.readShort(); short methodId = input.readShort();
return cmdFactory.fromStream((byte) methodId, readLegacyParameters(input), type); return cmdFactory.fromStream((byte) methodId, type);
}

protected Object[] readLegacyParameters(ObjectInput input) throws IOException, ClassNotFoundException {
int numArgs = UnsignedNumeric.readUnsignedInt(input);
Object[] args = null;
if (numArgs > 0) {
args = new Object[numArgs];
// For DeltaAware instances, nothing special to be done here.
// Do not merge here since the cache contents are required.
// Instead, merge in PutKeyValueCommand.perform
for (int i = 0; i < numArgs; i++) args[i] = input.readObject();
}
return args;
} }


void readCommandParameters(ObjectInput input, ReplicableCommand command) throws IOException, ClassNotFoundException { void readCommandParameters(ObjectInput input, ReplicableCommand command) throws IOException, ClassNotFoundException {
Expand All @@ -116,8 +95,8 @@ void readCommandParameters(ObjectInput input, ReplicableCommand command) throws
} }
} }


protected CacheRpcCommand fromStream(byte id, Object[] parameters, byte type, String cacheName) { protected CacheRpcCommand fromStream(byte id, byte type, String cacheName) {
return cmdFactory.fromStream(id, parameters, type, cacheName); return cmdFactory.fromStream(id, type, cacheName);
} }


@Override @Override
Expand Down
Expand Up @@ -219,10 +219,7 @@ public void testReplicableCommandsMarshalling() throws Exception {
// SizeCommand does not have an empty constructor, so doesn't look to be one that is marshallable. // SizeCommand does not have an empty constructor, so doesn't look to be one that is marshallable.


GetKeyValueCommand c4 = new GetKeyValueCommand("key", Collections.<Flag>emptySet()); GetKeyValueCommand c4 = new GetKeyValueCommand("key", Collections.<Flag>emptySet());
byte[] bytes = marshaller.objectToByteBuffer(c4); marshallAndAssertEquality(c4);
GetKeyValueCommand rc4 = (GetKeyValueCommand) marshaller.objectFromByteBuffer(bytes);
assert rc4.getCommandId() == c4.getCommandId() : "Writen[" + c4.getCommandId() + "] and read[" + rc4.getCommandId() + "] objects should be the same";
assert Arrays.equals(rc4.getParameters(), c4.getParameters()) : "Writen[" + c4.getParameters() + "] and read[" + rc4.getParameters() + "] objects should be the same";


PutKeyValueCommand c5 = new PutKeyValueCommand("k", "v", false, null, PutKeyValueCommand c5 = new PutKeyValueCommand("k", "v", false, null,
new EmbeddedMetadata.Builder().build(), Collections.<Flag>emptySet(), AnyEquivalence.getInstance(), CommandInvocationId.generateId(null)); new EmbeddedMetadata.Builder().build(), Collections.<Flag>emptySet(), AnyEquivalence.getInstance(), CommandInvocationId.generateId(null));
Expand All @@ -234,26 +231,17 @@ public void testReplicableCommandsMarshalling() throws Exception {
// EvictCommand does not have an empty constructor, so doesn't look to be one that is marshallable. // EvictCommand does not have an empty constructor, so doesn't look to be one that is marshallable.


InvalidateCommand c7 = new InvalidateCommand(null, null, CommandInvocationId.generateId(null), "key1", "key2"); InvalidateCommand c7 = new InvalidateCommand(null, null, CommandInvocationId.generateId(null), "key1", "key2");
bytes = marshaller.objectToByteBuffer(c7); marshallAndAssertEquality(c7);
InvalidateCommand rc7 = (InvalidateCommand) marshaller.objectFromByteBuffer(bytes);
assert rc7.getCommandId() == c7.getCommandId() : "Writen[" + c7.getCommandId() + "] and read[" + rc7.getCommandId() + "] objects should be the same";
assert Arrays.equals(rc7.getParameters(), c7.getParameters()) : "Writen[" + c7.getParameters() + "] and read[" + rc7.getParameters() + "] objects should be the same";


InvalidateCommand c71 = new InvalidateL1Command(null, null, null, Collections.<Flag>emptySet(), CommandInvocationId.generateId(null), "key1", "key2"); InvalidateCommand c71 = new InvalidateL1Command(null, null, null, null, CommandInvocationId.generateId(null), "key1", "key2");
bytes = marshaller.objectToByteBuffer(c71); marshallAndAssertEquality(c71);
InvalidateCommand rc71 = (InvalidateCommand) marshaller.objectFromByteBuffer(bytes);
assert rc71.getCommandId() == c71.getCommandId() : "Writen[" + c71.getCommandId() + "] and read[" + rc71.getCommandId() + "] objects should be the same";
assert Arrays.equals(rc71.getParameters(), c71.getParameters()) : "Writen[" + c71.getParameters() + "] and read[" + rc71.getParameters() + "] objects should be the same";


ReplaceCommand c8 = new ReplaceCommand("key", "oldvalue", "newvalue", ReplaceCommand c8 = new ReplaceCommand("key", "oldvalue", "newvalue",
null, new EmbeddedMetadata.Builder().build(), Collections.EMPTY_SET, AnyEquivalence.getInstance(), CommandInvocationId.generateId(null)); null, new EmbeddedMetadata.Builder().build(), Collections.EMPTY_SET, AnyEquivalence.getInstance(), CommandInvocationId.generateId(null));
marshallAndAssertEquality(c8); marshallAndAssertEquality(c8);


ClearCommand c9 = new ClearCommand(); ClearCommand c9 = new ClearCommand();
bytes = marshaller.objectToByteBuffer(c9); marshallAndAssertEquality(c9);
ClearCommand rc9 = (ClearCommand) marshaller.objectFromByteBuffer(bytes);
assert rc9.getCommandId() == c9.getCommandId() : "Writen[" + c9.getCommandId() + "] and read[" + rc9.getCommandId() + "] objects should be the same";
assert Arrays.equals(rc9.getParameters(), c9.getParameters()) : "Writen[" + c9.getParameters() + "] and read[" + rc9.getParameters() + "] objects should be the same";


Map<Integer, GlobalTransaction> m1 = new HashMap<Integer, GlobalTransaction>(); Map<Integer, GlobalTransaction> m1 = new HashMap<Integer, GlobalTransaction>();
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
Expand Down Expand Up @@ -575,6 +563,14 @@ protected void marshallAndAssertEquality(Object writeObj) throws Exception {
assert readObj.equals(writeObj) : "Writen[" + writeObj + "] and read[" + readObj + "] objects should be the same"; assert readObj.equals(writeObj) : "Writen[" + writeObj + "] and read[" + readObj + "] objects should be the same";
} }


protected void marshallAndAssertEquality(ReplicableCommand writeObj) throws Exception {
byte[] bytes = marshaller.objectToByteBuffer(writeObj);
log.debugf("Payload size for object=%s : %s", writeObj, bytes.length);
ReplicableCommand readObj = (ReplicableCommand) marshaller.objectFromByteBuffer(bytes);
assert readObj.getCommandId() == writeObj.getCommandId() : "Writen[" + writeObj.getCommandId() + "] and read[" + readObj.getCommandId() + "] objects should be the same";
assert readObj.equals(writeObj) : "Writen[" + writeObj + "] and read[" + readObj + "] objects should be the same";
}

protected void marshallAndAssertArrayEquality(Object[] writeObj) throws Exception { protected void marshallAndAssertArrayEquality(Object[] writeObj) throws Exception {
byte[] bytes = marshaller.objectToByteBuffer(writeObj); byte[] bytes = marshaller.objectToByteBuffer(writeObj);
log.debugf("Payload size for object=%s : %s", Arrays.toString(writeObj), bytes.length); log.debugf("Payload size for object=%s : %s", Arrays.toString(writeObj), bytes.length);
Expand Down
Expand Up @@ -50,9 +50,7 @@ public void testInvokeAndExceptionWhileUnmarshalling() throws Exception {
PutKeyValueCommand putCommand = new PutKeyValueCommand(); PutKeyValueCommand putCommand = new PutKeyValueCommand();
putCommand.setKey(key); putCommand.setKey(key);
putCommand.setValue(value); putCommand.setValue(value);
SingleRpcCommand rpcCommand = new SingleRpcCommand("replSync"); SingleRpcCommand rpcCommand = new SingleRpcCommand("replSync", putCommand);
Object[] params = new Object[]{putCommand};
rpcCommand.setParameters(SingleRpcCommand.COMMAND_ID, params);
when(mockMarshaller1.objectToBuffer(anyObject())).thenReturn(originalMarshaller1.objectToBuffer(rpcCommand)); when(mockMarshaller1.objectToBuffer(anyObject())).thenReturn(originalMarshaller1.objectToBuffer(rpcCommand));
when(mockMarshaller.objectFromBuffer((byte[]) anyObject(), anyInt(), anyInt())).thenThrow(new EOFException()); when(mockMarshaller.objectFromBuffer((byte[]) anyObject(), anyInt(), anyInt())).thenThrow(new EOFException());
dispatcher1.setRequestMarshaller(mockMarshaller1); dispatcher1.setRequestMarshaller(mockMarshaller1);
Expand Down
Expand Up @@ -4,7 +4,6 @@
import org.infinispan.commands.ReplicableCommand; import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.module.ExtendedModuleCommandFactory; import org.infinispan.commands.module.ExtendedModuleCommandFactory;
import org.infinispan.commands.module.ModuleCommandExtensions; import org.infinispan.commands.module.ModuleCommandExtensions;
import org.infinispan.commands.module.ModuleCommandFactory;
import org.infinispan.commands.module.ModuleCommandInitializer; import org.infinispan.commands.module.ModuleCommandInitializer;
import org.infinispan.commands.remote.CacheRpcCommand; import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.factories.annotations.Inject; import org.infinispan.factories.annotations.Inject;
Expand All @@ -31,7 +30,7 @@ public Map<Byte, Class<? extends ReplicableCommand>> getModuleCommands() {
} }


@Override @Override
public ReplicableCommand fromStream(byte commandId, Object[] args) { public ReplicableCommand fromStream(byte commandId) {
ReplicableCommand c; ReplicableCommand c;
switch (commandId) { switch (commandId) {
case CustomReplicableCommand.COMMAND_ID: case CustomReplicableCommand.COMMAND_ID:
Expand All @@ -40,12 +39,11 @@ public ReplicableCommand fromStream(byte commandId, Object[] args) {
default: default:
throw new IllegalArgumentException("Not registered to handle command id " + commandId); throw new IllegalArgumentException("Not registered to handle command id " + commandId);
} }
c.setParameters(commandId, args);
return c; return c;
} }


@Override @Override
public CacheRpcCommand fromStream(byte commandId, Object[] args, String cacheName) { public CacheRpcCommand fromStream(byte commandId, String cacheName) {
CacheRpcCommand c; CacheRpcCommand c;
switch (commandId) { switch (commandId) {
case CustomCacheRpcCommand.COMMAND_ID: case CustomCacheRpcCommand.COMMAND_ID:
Expand All @@ -57,7 +55,6 @@ public CacheRpcCommand fromStream(byte commandId, Object[] args, String cacheNam
default: default:
throw new IllegalArgumentException("Not registered to handle command id " + commandId); throw new IllegalArgumentException("Not registered to handle command id " + commandId);
} }
c.setParameters(commandId, args);
return c; return c;
} }
}; };
Expand Down
Expand Up @@ -28,14 +28,14 @@ public Map<Byte, Class<? extends ReplicableCommand>> getModuleCommands() {
} }


@Override @Override
public ReplicableCommand fromStream(byte commandId, Object[] args) { public ReplicableCommand fromStream(byte commandId) {
// Should not be called while this factory only // Should not be called while this factory only
// provides cache specific replicable commands. // provides cache specific replicable commands.
return null; return null;
} }


@Override @Override
public CacheRpcCommand fromStream(byte commandId, Object[] args, String cacheName) { public CacheRpcCommand fromStream(byte commandId, String cacheName) {
CacheRpcCommand c; CacheRpcCommand c;
switch (commandId) { switch (commandId) {
case ClusteredQueryCommand.COMMAND_ID: case ClusteredQueryCommand.COMMAND_ID:
Expand All @@ -50,7 +50,6 @@ public CacheRpcCommand fromStream(byte commandId, Object[] args, String cacheNam
default: default:
throw new IllegalArgumentException("Not registered to handle command id " + commandId); throw new IllegalArgumentException("Not registered to handle command id " + commandId);
} }
c.setParameters(commandId, args);
return c; return c;
} }


Expand Down

0 comments on commit 7fa26ce

Please sign in to comment.