Skip to content

Commit

Permalink
ISPN-11541 Add internal metadata to write commands
Browse files Browse the repository at this point in the history
  • Loading branch information
pruivo committed Mar 27, 2020
1 parent eb22486 commit ae96055
Show file tree
Hide file tree
Showing 47 changed files with 616 additions and 145 deletions.
Expand Up @@ -218,6 +218,7 @@ public interface Ids {

int PUBLISHER_RESPONSE = 149;
int CACHE_BI_CONSUMERS = 150;
int PREPARE_RESPONSE = 151;

int COUNTER_CONFIGURATION = 2000; //from counter
int COUNTER_STATE = 2001; //from counter
Expand Down
Expand Up @@ -7,6 +7,7 @@
import org.infinispan.commands.write.ValueMatcher;
import org.infinispan.encoding.DataConversion;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.functional.impl.MetaParamsInternalMetadata;
import org.infinispan.functional.impl.Params;

public abstract class AbstractWriteKeyCommand<K, V> extends AbstractDataWriteCommand implements FunctionalCommand<K, V> {
Expand All @@ -16,6 +17,7 @@ public abstract class AbstractWriteKeyCommand<K, V> extends AbstractDataWriteCom
boolean successful = true;
DataConversion keyDataConversion;
DataConversion valueDataConversion;
MetaParamsInternalMetadata internalMetadata;

public AbstractWriteKeyCommand(Object key, ValueMatcher valueMatcher, int segment,
CommandInvocationId id, Params params,
Expand Down Expand Up @@ -84,4 +86,14 @@ public DataConversion getKeyDataConversion() {
public DataConversion getValueDataConversion() {
return valueDataConversion;
}

@Override
public MetaParamsInternalMetadata getInternalMetadata() {
return internalMetadata;
}

@Override
public void setInternalMetadata(MetaParamsInternalMetadata internalMetadata) {
this.internalMetadata = internalMetadata;
}
}
@@ -1,11 +1,15 @@
package org.infinispan.commands.functional;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.infinispan.commands.CommandInvocationId;
import org.infinispan.commands.write.ValueMatcher;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.encoding.DataConversion;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.functional.impl.MetaParamsInternalMetadata;
import org.infinispan.functional.impl.Params;
import org.infinispan.util.concurrent.locks.RemoteLockCommand;

Expand All @@ -20,6 +24,7 @@ public abstract class AbstractWriteManyCommand<K, V> implements WriteCommand, Fu
long flags;
DataConversion keyDataConversion;
DataConversion valueDataConversion;
Map<Object, MetaParamsInternalMetadata> internalMetadataMap;

protected AbstractWriteManyCommand(CommandInvocationId commandInvocationId,
Params params,
Expand All @@ -30,13 +35,15 @@ protected AbstractWriteManyCommand(CommandInvocationId commandInvocationId,
this.flags = params.toFlagsBitSet();
this.keyDataConversion = keyDataConversion;
this.valueDataConversion = valueDataConversion;
this.internalMetadataMap = new ConcurrentHashMap<>();
}

protected <K, V> AbstractWriteManyCommand(AbstractWriteManyCommand<K, V> command) {
this.commandInvocationId = command.commandInvocationId;
this.topologyId = command.topologyId;
this.params = command.params;
this.flags = command.flags;
this.internalMetadataMap = new ConcurrentHashMap<>(command.internalMetadataMap);
}

protected AbstractWriteManyCommand() {
Expand Down Expand Up @@ -139,4 +146,14 @@ public DataConversion getKeyDataConversion() {
public DataConversion getValueDataConversion() {
return valueDataConversion;
}

@Override
public MetaParamsInternalMetadata getInternalMetadata(Object key) {
return internalMetadataMap.get(key);
}

@Override
public void setInternalMetadata(Object key, MetaParamsInternalMetadata internalMetadata) {
this.internalMetadataMap.put(key, internalMetadata);
}
}
Expand Up @@ -18,6 +18,7 @@
import org.infinispan.encoding.DataConversion;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.functional.EntryView.ReadWriteEntryView;
import org.infinispan.functional.impl.MetaParamsInternalMetadata;
import org.infinispan.functional.impl.Params;

// TODO: the command does not carry previous values to backup, so it can cause
Expand Down Expand Up @@ -63,6 +64,7 @@ public void writeTo(ObjectOutput output) throws IOException {
CommandInvocationId.writeTo(output, commandInvocationId);
DataConversion.writeTo(output, keyDataConversion);
DataConversion.writeTo(output, valueDataConversion);
output.writeObject(internalMetadata);
}

@Override
Expand All @@ -76,6 +78,7 @@ public void readFrom(ObjectInput input) throws IOException, ClassNotFoundExcepti
commandInvocationId = CommandInvocationId.readFrom(input);
keyDataConversion = DataConversion.readFrom(input);
valueDataConversion = DataConversion.readFrom(input);
internalMetadata = (MetaParamsInternalMetadata) input.readObject();
}

@Override
Expand Down
Expand Up @@ -18,6 +18,7 @@
import org.infinispan.encoding.DataConversion;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.functional.EntryView.ReadWriteEntryView;
import org.infinispan.functional.impl.MetaParamsInternalMetadata;
import org.infinispan.functional.impl.Params;
import org.infinispan.metadata.Metadata;

Expand Down Expand Up @@ -71,6 +72,7 @@ public void writeTo(ObjectOutput output) throws IOException {
output.writeObject(prevMetadata);
DataConversion.writeTo(output, keyDataConversion);
DataConversion.writeTo(output, valueDataConversion);
output.writeObject(internalMetadata);
}

@Override
Expand All @@ -87,6 +89,7 @@ public void readFrom(ObjectInput input) throws IOException, ClassNotFoundExcepti
prevMetadata = (Metadata) input.readObject();
keyDataConversion = DataConversion.readFrom(input);
valueDataConversion = DataConversion.readFrom(input);
internalMetadata = (MetaParamsInternalMetadata) input.readObject();
}

@Override
Expand Down
Expand Up @@ -5,6 +5,7 @@
import java.io.ObjectOutput;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

import org.infinispan.commands.CommandInvocationId;
Expand Down Expand Up @@ -62,6 +63,7 @@ public Function<ReadWriteEntryView<K, V>, R> getFunction() {

public void setKeys(Collection<?> keys) {
this.keys = keys;
this.internalMetadataMap.keySet().retainAll(keys);
}

public final ReadWriteManyCommand<K, V, R> withKeys(Collection<?> keys) {
Expand All @@ -85,6 +87,7 @@ public void writeTo(ObjectOutput output) throws IOException {
output.writeLong(flags);
DataConversion.writeTo(output, keyDataConversion);
DataConversion.writeTo(output, valueDataConversion);
MarshallUtil.marshallMap(internalMetadataMap, output);
}

@Override
Expand All @@ -98,6 +101,7 @@ public void readFrom(ObjectInput input) throws IOException, ClassNotFoundExcepti
flags = input.readLong();
keyDataConversion = DataConversion.readFrom(input);
valueDataConversion = DataConversion.readFrom(input);
this.internalMetadataMap = MarshallUtil.unmarshallMap(input, ConcurrentHashMap::new);
}

public boolean isForwarded() {
Expand Down
Expand Up @@ -6,6 +6,7 @@
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

import org.infinispan.commands.CommandInvocationId;
Expand Down Expand Up @@ -68,6 +69,7 @@ public BiFunction<T, ReadWriteEntryView<K, V>, R> getBiFunction() {

public void setArguments(Map<?, ?> arguments) {
this.arguments = arguments;
this.internalMetadataMap.keySet().retainAll(arguments.keySet());
}

public final ReadWriteManyEntriesCommand<K, V, T, R> withArguments(Map<?, ?> entries) {
Expand All @@ -91,6 +93,7 @@ public void writeTo(ObjectOutput output) throws IOException {
output.writeLong(flags);
DataConversion.writeTo(output, keyDataConversion);
DataConversion.writeTo(output, valueDataConversion);
MarshallUtil.marshallMap(internalMetadataMap, output);
}

@Override
Expand All @@ -105,6 +108,7 @@ public void readFrom(ObjectInput input) throws IOException, ClassNotFoundExcepti
flags = input.readLong();
keyDataConversion = DataConversion.readFrom(input);
valueDataConversion = DataConversion.readFrom(input);
this.internalMetadataMap = MarshallUtil.unmarshallMap(input, ConcurrentHashMap::new);
}

public boolean isForwarded() {
Expand Down
Expand Up @@ -16,6 +16,7 @@
import org.infinispan.encoding.DataConversion;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.functional.EntryView.WriteEntryView;
import org.infinispan.functional.impl.MetaParamsInternalMetadata;
import org.infinispan.functional.impl.Params;

public final class WriteOnlyKeyCommand<K, V> extends AbstractWriteKeyCommand<K, V> {
Expand Down Expand Up @@ -61,6 +62,7 @@ public void writeTo(ObjectOutput output) throws IOException {
CommandInvocationId.writeTo(output, commandInvocationId);
DataConversion.writeTo(output, keyDataConversion);
DataConversion.writeTo(output, valueDataConversion);
output.writeObject(internalMetadata);
}

@Override
Expand All @@ -74,6 +76,7 @@ public void readFrom(ObjectInput input) throws IOException, ClassNotFoundExcepti
commandInvocationId = CommandInvocationId.readFrom(input);
keyDataConversion = DataConversion.readFrom(input);
valueDataConversion = DataConversion.readFrom(input);
internalMetadata = (MetaParamsInternalMetadata) input.readObject();
}

@Override
Expand Down
Expand Up @@ -16,6 +16,7 @@
import org.infinispan.encoding.DataConversion;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.functional.EntryView.WriteEntryView;
import org.infinispan.functional.impl.MetaParamsInternalMetadata;
import org.infinispan.functional.impl.Params;

public final class WriteOnlyKeyValueCommand<K, V, T> extends AbstractWriteKeyCommand<K, V> {
Expand Down Expand Up @@ -65,6 +66,7 @@ public void writeTo(ObjectOutput output) throws IOException {
CommandInvocationId.writeTo(output, commandInvocationId);
DataConversion.writeTo(output, keyDataConversion);
DataConversion.writeTo(output, valueDataConversion);
output.writeObject(internalMetadata);
}

@Override
Expand All @@ -79,6 +81,7 @@ public void readFrom(ObjectInput input) throws IOException, ClassNotFoundExcepti
commandInvocationId = CommandInvocationId.readFrom(input);
keyDataConversion = DataConversion.readFrom(input);
valueDataConversion = DataConversion.readFrom(input);
internalMetadata = (MetaParamsInternalMetadata) input.readObject();
}

@Override
Expand Down
Expand Up @@ -5,6 +5,7 @@
import java.io.ObjectOutput;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

import org.infinispan.commands.CommandInvocationId;
Expand Down Expand Up @@ -59,6 +60,7 @@ public Consumer<WriteEntryView<K, V>> getConsumer() {

public void setKeys(Collection<?> keys) {
this.keys = keys;
this.internalMetadataMap.keySet().retainAll(keys);
}

public final WriteOnlyManyCommand<K, V> withKeys(Collection<?> keys) {
Expand All @@ -82,6 +84,7 @@ public void writeTo(ObjectOutput output) throws IOException {
output.writeLong(flags);
DataConversion.writeTo(output, keyDataConversion);
DataConversion.writeTo(output, valueDataConversion);
MarshallUtil.marshallMap(internalMetadataMap, output);
}

@Override
Expand All @@ -95,6 +98,7 @@ public void readFrom(ObjectInput input) throws IOException, ClassNotFoundExcepti
flags = input.readLong();
keyDataConversion = DataConversion.readFrom(input);
valueDataConversion = DataConversion.readFrom(input);
this.internalMetadataMap = MarshallUtil.unmarshallMap(input, ConcurrentHashMap::new);
}

@Override
Expand Down
Expand Up @@ -6,6 +6,7 @@
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

import org.infinispan.commands.CommandInvocationId;
Expand Down Expand Up @@ -64,6 +65,7 @@ public BiConsumer<T, WriteEntryView<K, V>> getBiConsumer() {

public void setArguments(Map<?, ?> arguments) {
this.arguments = arguments;
this.internalMetadataMap.keySet().retainAll(arguments.keySet());
}

public final WriteOnlyManyEntriesCommand<K, V, T> withArguments(Map<?, ?> entries) {
Expand All @@ -87,6 +89,7 @@ public void writeTo(ObjectOutput output) throws IOException {
output.writeLong(flags);
DataConversion.writeTo(output, keyDataConversion);
DataConversion.writeTo(output, valueDataConversion);
MarshallUtil.marshallMap(internalMetadataMap, output);
}

@Override
Expand All @@ -101,6 +104,7 @@ public void readFrom(ObjectInput input) throws IOException, ClassNotFoundExcepti
flags = input.readLong();
keyDataConversion = DataConversion.readFrom(input);
valueDataConversion = DataConversion.readFrom(input);
this.internalMetadataMap = MarshallUtil.unmarshallMap(input, ConcurrentHashMap::new);
}

@Override
Expand Down
Expand Up @@ -10,6 +10,7 @@
import org.infinispan.commands.write.PutMapCommand;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.commons.marshall.MarshallUtil;
import org.infinispan.functional.impl.MetaParamsInternalMetadata;
import org.infinispan.metadata.Metadata;
import org.infinispan.util.ByteString;
import org.infinispan.util.TriangleFunctionsUtil;
Expand All @@ -26,6 +27,7 @@ public class PutMapBackupWriteCommand extends BackupWriteCommand {

private Map<Object, Object> map;
private Metadata metadata;
private Map<Object, MetaParamsInternalMetadata> internalMetadataMap;

//for testing
@SuppressWarnings("unused")
Expand All @@ -47,19 +49,25 @@ public void writeTo(ObjectOutput output) throws IOException {
writeBase(output);
MarshallUtil.marshallMap(map, output);
output.writeObject(metadata);
MarshallUtil.marshallMap(internalMetadataMap, output);
}

@Override
public void readFrom(ObjectInput input) throws IOException, ClassNotFoundException {
readBase(input);
map = MarshallUtil.unmarshallMap(input, HashMap::new);
metadata = (Metadata) input.readObject();
internalMetadataMap = MarshallUtil.unmarshallMap(input, HashMap::new);
}

public void setPutMapCommand(PutMapCommand command, Collection<Object> keys) {
setCommonAttributesFromCommand(command);
this.map = TriangleFunctionsUtil.filterEntries(command.getMap(), keys);
this.metadata = command.getMetadata();
this.internalMetadataMap = new HashMap<>();
for (Object key : map.keySet()) {
internalMetadataMap.put(key, command.getInternalMetadata(key));
}
}

@Override
Expand All @@ -71,13 +79,15 @@ public String toString() {
WriteCommand createWriteCommand() {
PutMapCommand cmd = new PutMapCommand(map, metadata, getFlags(), getCommandInvocationId());
cmd.setForwarded(true);
internalMetadataMap.forEach(cmd::setInternalMetadata);
return cmd;
}

@Override
String toStringFields() {
return super.toStringFields() +
", map=" + map +
", metadata=" + metadata;
", metadata=" + metadata +
", internalMetadata=" + internalMetadataMap;
}
}

0 comments on commit ae96055

Please sign in to comment.