Skip to content

Commit

Permalink
ISPN-6586 Run ReadOnlyKeyCommand and ReadOnlyManyCommand on owner
Browse files Browse the repository at this point in the history
* Also make sure that errors from functional commands are thrown as
  CacheException (if the exception is thrown on local node)
  • Loading branch information
rvansa authored and galderz committed Sep 26, 2016
1 parent 5bcf052 commit 811b51e
Show file tree
Hide file tree
Showing 31 changed files with 932 additions and 282 deletions.
Expand Up @@ -34,4 +34,8 @@ public CacheException(String msg) {
public CacheException(String msg, Throwable cause) { public CacheException(String msg, Throwable cause) {
super(msg, cause); super(msg, cause);
} }

public CacheException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
} }
Expand Up @@ -9,10 +9,13 @@
import static org.infinispan.commons.marshall.MarshallableFunctions.SetValueMetasIfPresentReturnPrevOrNull; import static org.infinispan.commons.marshall.MarshallableFunctions.SetValueMetasIfPresentReturnPrevOrNull;
import static org.infinispan.commons.marshall.MarshallableFunctions.SetValueMetasReturnPrevOrNull; import static org.infinispan.commons.marshall.MarshallableFunctions.SetValueMetasReturnPrevOrNull;
import static org.infinispan.commons.marshall.MarshallableFunctions.SetValueMetasReturnView; import static org.infinispan.commons.marshall.MarshallableFunctions.SetValueMetasReturnView;
import static org.infinispan.commons.marshall.MarshallableFunctions.identity;
import static org.infinispan.commons.marshall.MarshallableFunctions.removeConsumer; import static org.infinispan.commons.marshall.MarshallableFunctions.removeConsumer;
import static org.infinispan.commons.marshall.MarshallableFunctions.removeIfValueEqualsReturnBoolean; import static org.infinispan.commons.marshall.MarshallableFunctions.removeIfValueEqualsReturnBoolean;
import static org.infinispan.commons.marshall.MarshallableFunctions.removeReturnBoolean; import static org.infinispan.commons.marshall.MarshallableFunctions.removeReturnBoolean;
import static org.infinispan.commons.marshall.MarshallableFunctions.removeReturnPrevOrNull; import static org.infinispan.commons.marshall.MarshallableFunctions.removeReturnPrevOrNull;
import static org.infinispan.commons.marshall.MarshallableFunctions.returnReadOnlyFindIsPresent;
import static org.infinispan.commons.marshall.MarshallableFunctions.returnReadOnlyFindOrNull;
import static org.infinispan.commons.marshall.MarshallableFunctions.returnReadWriteFind; import static org.infinispan.commons.marshall.MarshallableFunctions.returnReadWriteFind;
import static org.infinispan.commons.marshall.MarshallableFunctions.returnReadWriteGet; import static org.infinispan.commons.marshall.MarshallableFunctions.returnReadWriteGet;
import static org.infinispan.commons.marshall.MarshallableFunctions.returnReadWriteView; import static org.infinispan.commons.marshall.MarshallableFunctions.returnReadWriteView;
Expand Down Expand Up @@ -58,6 +61,9 @@ public class MarshallableFunctionExternalizers {
private static final int RETURN_READ_WRITE_FIND = 12 | VALUE_MATCH_ALWAYS; private static final int RETURN_READ_WRITE_FIND = 12 | VALUE_MATCH_ALWAYS;
private static final int RETURN_READ_WRITE_GET = 13 | VALUE_MATCH_ALWAYS; private static final int RETURN_READ_WRITE_GET = 13 | VALUE_MATCH_ALWAYS;
private static final int RETURN_READ_WRITE_VIEW = 14 | VALUE_MATCH_ALWAYS; private static final int RETURN_READ_WRITE_VIEW = 14 | VALUE_MATCH_ALWAYS;
private static final int RETURN_READ_ONLY_FIND_OR_NULL = 15 | VALUE_MATCH_ALWAYS;
private static final int RETURN_READ_ONLY_FIND_IS_PRESENT = 16 | VALUE_MATCH_ALWAYS;
private static final int IDENTITY = 17 | VALUE_MATCH_ALWAYS;


public static final class ConstantLambdaExternalizer implements LambdaExternalizer<Object> { public static final class ConstantLambdaExternalizer implements LambdaExternalizer<Object> {
private final IdentityIntMap<Class<?>> numbers = new IdentityIntMap<>(16); private final IdentityIntMap<Class<?>> numbers = new IdentityIntMap<>(16);
Expand All @@ -77,6 +83,9 @@ public ConstantLambdaExternalizer() {
numbers.put(returnReadWriteFind().getClass(), RETURN_READ_WRITE_FIND); numbers.put(returnReadWriteFind().getClass(), RETURN_READ_WRITE_FIND);
numbers.put(returnReadWriteGet().getClass(), RETURN_READ_WRITE_GET); numbers.put(returnReadWriteGet().getClass(), RETURN_READ_WRITE_GET);
numbers.put(returnReadWriteView().getClass(), RETURN_READ_WRITE_VIEW); numbers.put(returnReadWriteView().getClass(), RETURN_READ_WRITE_VIEW);
numbers.put(returnReadOnlyFindOrNull().getClass(), RETURN_READ_ONLY_FIND_OR_NULL);
numbers.put(returnReadOnlyFindIsPresent().getClass(), RETURN_READ_ONLY_FIND_IS_PRESENT);
numbers.put(identity().getClass(), IDENTITY);
} }


@Override @Override
Expand Down Expand Up @@ -106,7 +115,10 @@ public Set<Class<?>> getTypeClasses() {
removeConsumer().getClass(), removeConsumer().getClass(),
returnReadWriteFind().getClass(), returnReadWriteFind().getClass(),
returnReadWriteGet().getClass(), returnReadWriteGet().getClass(),
returnReadWriteView().getClass() returnReadWriteView().getClass(),
returnReadOnlyFindOrNull().getClass(),
returnReadOnlyFindIsPresent().getClass(),
identity().getClass()
); );
} }


Expand Down Expand Up @@ -137,6 +149,9 @@ public Object readObject(ObjectInput input) throws IOException {
case RETURN_READ_WRITE_FIND: return returnReadWriteFind(); case RETURN_READ_WRITE_FIND: return returnReadWriteFind();
case RETURN_READ_WRITE_GET: return returnReadWriteGet(); case RETURN_READ_WRITE_GET: return returnReadWriteGet();
case RETURN_READ_WRITE_VIEW: return returnReadWriteView(); case RETURN_READ_WRITE_VIEW: return returnReadWriteView();
case RETURN_READ_ONLY_FIND_OR_NULL: return returnReadOnlyFindOrNull();
case RETURN_READ_ONLY_FIND_IS_PRESENT: return returnReadOnlyFindIsPresent();
case IDENTITY: return identity();
default: default:
throw new IllegalStateException("Unknown lambda ID: " + id); throw new IllegalStateException("Unknown lambda ID: " + id);
} }
Expand Down
Expand Up @@ -6,6 +6,7 @@
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;


import org.infinispan.commons.api.functional.EntryView.ReadEntryView;
import org.infinispan.commons.api.functional.EntryView.ReadWriteEntryView; import org.infinispan.commons.api.functional.EntryView.ReadWriteEntryView;
import org.infinispan.commons.api.functional.EntryView.WriteEntryView; import org.infinispan.commons.api.functional.EntryView.WriteEntryView;
import org.infinispan.commons.api.functional.MetaParam; import org.infinispan.commons.api.functional.MetaParam;
Expand Down Expand Up @@ -100,6 +101,18 @@ public static <K, V> Function<ReadWriteEntryView<K, V>, ReadWriteEntryView<K, V>
return ReturnReadWriteView.getInstance(); return ReturnReadWriteView.getInstance();
} }


public static <K, V> Function<ReadEntryView<K, V>, V> returnReadOnlyFindOrNull() {
return ReturnReadOnlyFindOrNull.getInstance();
}

public static <K, V> Function<ReadEntryView<K, V>, Boolean> returnReadOnlyFindIsPresent() {
return ReturnReadOnlyFindIsPresent.getInstance();
}

public static <T> Function<T, T> identity() {
return Identity.getInstance();
}

private static abstract class AbstractSetValueReturnPrevOrNull<K, V> private static abstract class AbstractSetValueReturnPrevOrNull<K, V>
implements BiFunction<V, ReadWriteEntryView<K, V>, V> { implements BiFunction<V, ReadWriteEntryView<K, V>, V> {
final MetaParam.Writable[] metas; final MetaParam.Writable[] metas;
Expand Down Expand Up @@ -529,6 +542,47 @@ private static <K, V> Function<ReadWriteEntryView<K, V>, ReadWriteEntryView<K, V
} }
} }


private static final class ReturnReadOnlyFindOrNull<K, V>
implements Function<ReadEntryView<K, V>, V> {
@Override
public V apply(ReadEntryView<K, V> ro) {
return ro.find().orElse(null);
}

private static final ReturnReadOnlyFindOrNull INSTANCE = new ReturnReadOnlyFindOrNull<>();

private static <K, V> Function<ReadEntryView<K, V>, V> getInstance() {
return INSTANCE;
}
}

private static final class ReturnReadOnlyFindIsPresent<K, V>
implements Function<ReadEntryView<K, V>, Boolean> {
@Override
public Boolean apply(ReadEntryView<K, V> ro) {
return ro.find().isPresent();
}

private static final ReturnReadOnlyFindIsPresent INSTANCE = new ReturnReadOnlyFindIsPresent<>();

private static <K, V> Function<ReadEntryView<K, V>, Boolean> getInstance() {
return INSTANCE;
}
}

private static final class Identity<T> implements Function<T, T> {
@Override
public T apply(T o) {
return o;
}

private static final Identity INSTANCE = new Identity<>();

private static <T> Function<T, T> getInstance() {
return INSTANCE;
}
}

private MarshallableFunctions() { private MarshallableFunctions() {
// No-op, holds static variables // No-op, holds static variables
} }
Expand Down
Expand Up @@ -3,6 +3,8 @@
import java.util.Map; import java.util.Map;


import org.infinispan.commands.control.LockControlCommand; import org.infinispan.commands.control.LockControlCommand;
import org.infinispan.commands.functional.ReadOnlyKeyCommand;
import org.infinispan.commands.functional.ReadOnlyManyCommand;
import org.infinispan.commands.functional.ReadWriteKeyCommand; import org.infinispan.commands.functional.ReadWriteKeyCommand;
import org.infinispan.commands.functional.ReadWriteKeyValueCommand; import org.infinispan.commands.functional.ReadWriteKeyValueCommand;
import org.infinispan.commands.functional.ReadWriteManyCommand; import org.infinispan.commands.functional.ReadWriteManyCommand;
Expand Down Expand Up @@ -177,6 +179,12 @@ public ReplicableCommand fromStream(byte id, byte type) {
case ReplicableCommandManagerFunction.COMMAND_ID: case ReplicableCommandManagerFunction.COMMAND_ID:
command = new ReplicableCommandManagerFunction(); command = new ReplicableCommandManagerFunction();
break; break;
case ReadOnlyKeyCommand.COMMAND_ID:
command = new ReadOnlyKeyCommand();
break;
case ReadOnlyManyCommand.COMMAND_ID:
command = new ReadOnlyManyCommand<>();
break;
default: default:
throw new CacheException("Unknown command id " + id + "!"); throw new CacheException("Unknown command id " + id + "!");
} }
Expand Down
Expand Up @@ -18,6 +18,7 @@


public final class ReadOnlyKeyCommand<K, V, R> extends AbstractDataCommand implements LocalCommand { public final class ReadOnlyKeyCommand<K, V, R> extends AbstractDataCommand implements LocalCommand {


public static final int COMMAND_ID = 62;
private Function<ReadEntryView<K, V>, R> f; private Function<ReadEntryView<K, V>, R> f;


public ReadOnlyKeyCommand(Object key, Function<ReadEntryView<K, V>, R> f) { public ReadOnlyKeyCommand(Object key, Function<ReadEntryView<K, V>, R> f) {
Expand All @@ -30,30 +31,30 @@ public ReadOnlyKeyCommand() {


@Override @Override
public byte getCommandId() { public byte getCommandId() {
return -1; return COMMAND_ID;
} }


@Override @Override
public void writeTo(ObjectOutput output) throws IOException { public void writeTo(ObjectOutput output) throws IOException {
// Not really replicated output.writeObject(key);
output.writeObject(f);
} }


@Override @Override
public void readFrom(ObjectInput input) throws IOException, ClassNotFoundException { public void readFrom(ObjectInput input) throws IOException, ClassNotFoundException {
// Not really replicated key = input.readObject();
f = (Function<ReadEntryView<K, V>, R>) input.readObject();
} }


// Not really invoked unless in local mode
@Override @Override
public Object perform(InvocationContext ctx) throws Throwable { public Object perform(InvocationContext ctx) throws Throwable {
CacheEntry<K, V> entry = ctx.lookupEntry(key); CacheEntry<K, V> entry = ctx.lookupEntry(key);


// Could be that the key is not local, 'null' is how this is signalled // Could be that the key is not local, 'null' is how this is signalled
// When the entry is local, the entry is NullCacheEntry instead
if (entry == null) return null; if (entry == null) return null;


return perform(entry);
}

public Object perform(CacheEntry<K, V> entry) {
ReadEntryView<K, V> ro = (entry == null || entry.isNull()) ReadEntryView<K, V> ro = (entry == null || entry.isNull())
? EntryViews.noValue((K) key) : EntryViews.readOnly(entry); ? EntryViews.noValue((K) key) : EntryViews.readOnly(entry);
R ret = f.apply(ro); R ret = f.apply(ro);
Expand All @@ -78,6 +79,7 @@ public boolean alwaysReadsExistingValues() {
@Override @Override
public String toString() { public String toString() {
return "ReadOnlyKeyCommand{" + return "ReadOnlyKeyCommand{" +
"key=" + key +
"f=" + f + "f=" + f +
'}'; '}';
} }
Expand Down
Expand Up @@ -5,30 +5,27 @@
import java.io.IOException; import java.io.IOException;
import java.io.ObjectInput; import java.io.ObjectInput;
import java.io.ObjectOutput; import java.io.ObjectOutput;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.function.Function; import java.util.function.Function;


import org.infinispan.commands.AbstractFlagAffectedCommand;
import org.infinispan.commands.LocalCommand; import org.infinispan.commands.LocalCommand;
import org.infinispan.commands.Visitor; import org.infinispan.commands.Visitor;
import org.infinispan.commands.read.AbstractDataCommand;
import org.infinispan.commons.api.functional.EntryView.ReadEntryView; import org.infinispan.commons.api.functional.EntryView.ReadEntryView;
import org.infinispan.commons.marshall.MarshallUtil;
import org.infinispan.container.entries.CacheEntry; import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.context.InvocationContext; import org.infinispan.context.InvocationContext;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.functional.impl.EntryViews; import org.infinispan.functional.impl.EntryViews;
import org.infinispan.lifecycle.ComponentStatus;


public final class ReadOnlyManyCommand<K, V, R> extends AbstractDataCommand implements LocalCommand { public final class ReadOnlyManyCommand<K, V, R> extends AbstractFlagAffectedCommand implements LocalCommand {
public static final int COMMAND_ID = 63;


private Collection<? extends K> keys; private Collection<? extends K> keys;
private Function<ReadEntryView<K, V>, R> f; private Function<ReadEntryView<K, V>, R> f;


private ConsistentHash ch;
// TODO: remotely fetched are because of compatibility - can't we just always return InternalCacheEntry and have
// the unboxing executed as the topmost interceptor?
private Map<Object, InternalCacheEntry> remotelyFetched;

public ReadOnlyManyCommand(Collection<? extends K> keys, Function<ReadEntryView<K, V>, R> f) { public ReadOnlyManyCommand(Collection<? extends K> keys, Function<ReadEntryView<K, V>, R> f) {
this.keys = keys; this.keys = keys;
this.f = f; this.f = f;
Expand All @@ -41,44 +38,47 @@ public Collection<? extends K> getKeys() {
return keys; return keys;
} }


@Override public Function<ReadEntryView<K, V>, R> getFunction() {
public byte getCommandId() { return f;
return -1;
} }


@Override @Override
public void writeTo(ObjectOutput output) throws IOException { public byte getCommandId() {
// Not really replicated return COMMAND_ID;
} }


@Override @Override
public void readFrom(ObjectInput input) throws IOException, ClassNotFoundException { public boolean isReturnValueExpected() {
// Not really replicated return true;
}

public ConsistentHash getConsistentHash() {
return ch;
} }


public void setConsistentHash(ConsistentHash ch) { @Override
this.ch = ch; public boolean canBlock() {
return false;
} }


public Map<Object, InternalCacheEntry> getRemotelyFetched() { @Override
return remotelyFetched; public void writeTo(ObjectOutput output) throws IOException {
MarshallUtil.marshallCollection(keys, output);
output.writeObject(f);
} }


public void setRemotelyFetched(Map<Object, InternalCacheEntry> remotelyFetched) { @Override
this.remotelyFetched = remotelyFetched; public void readFrom(ObjectInput input) throws IOException, ClassNotFoundException {
this.keys = MarshallUtil.unmarshallCollection(input, ArrayList::new);
this.f = (Function<ReadEntryView<K, V>, R>) input.readObject();
} }


@Override @Override
public Object perform(InvocationContext ctx) throws Throwable { public Object perform(InvocationContext ctx) throws Throwable {
return keys.stream().map(k -> { // lazy execution triggers exceptions on unexpected places
ArrayList<R> retvals = new ArrayList<R>(keys.size());
for (K k : keys) {
CacheEntry<K, V> me = lookupCacheEntry(ctx, k); CacheEntry<K, V> me = lookupCacheEntry(ctx, k);
R ret = f.apply(me == null ? EntryViews.noValue(k) : EntryViews.readOnly(me)); R ret = f.apply(me == null || me.isNull() ? EntryViews.noValue(k) : EntryViews.readOnly(me));
return snapshot(ret); retvals.add(snapshot(ret));
}); }
return retvals.stream();
} }


private CacheEntry<K, V> lookupCacheEntry(InvocationContext ctx, Object key) { private CacheEntry<K, V> lookupCacheEntry(InvocationContext ctx, Object key) {
Expand All @@ -91,22 +91,25 @@ public Object acceptVisitor(InvocationContext ctx, Visitor visitor) throws Throw
} }


@Override @Override
public boolean readsExistingValues() { public boolean shouldInvoke(InvocationContext ctx) {
return true; return true;
} }


@Override @Override
public boolean alwaysReadsExistingValues() { public boolean ignoreCommandOnStatus(ComponentStatus status) {
return false; return false;
} }


@Override
public boolean readsExistingValues() {
return true;
}

@Override @Override
public String toString() { public String toString() {
return "ReadOnlyManyCommand{" + return "ReadOnlyManyCommand{" +
"keys=" + keys + "keys=" + keys +
", f=" + f + ", f=" + f +
", ch=" + ch +
", remotelyFetched=" + remotelyFetched +
'}'; '}';
} }
} }
Expand Up @@ -72,8 +72,9 @@ public Object perform(InvocationContext ctx) throws Throwable {
CacheEntry<K, V> cacheEntry = ctx.lookupEntry(entry.getKey()); CacheEntry<K, V> cacheEntry = ctx.lookupEntry(entry.getKey());


// Could be that the key is not local, 'null' is how this is signalled // Could be that the key is not local, 'null' is how this is signalled
if (cacheEntry != null) if (cacheEntry != null) {
f.accept(entry.getValue(), EntryViews.writeOnly(cacheEntry)); f.accept(entry.getValue(), EntryViews.writeOnly(cacheEntry));
}
} }


return null; return null;
Expand Down

0 comments on commit 811b51e

Please sign in to comment.