Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ISPN-8078 Support functional commands in scattered mode #5316

Merged
merged 2 commits into from Sep 28, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -0,0 +1,54 @@
package org.infinispan.commons.util;

import java.util.EnumSet;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;

public class ArrayCollector implements java.util.stream.Collector<Object, ArrayCollector, ArrayCollector>, Supplier<ArrayCollector> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As this is an utility class now and not a private class, could be nice to test it unitary and document it uses ?

private final Object[] array;
private int pos = 0;

public ArrayCollector(Object[] array) {
this.array = array;
}

public void add(Object item) {
array[pos] = item;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

calling this method too much could lead into ArrayIndexOutOfBounds.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not much else I could do if the array is not sized properly.

++pos;
}

@Override
public Supplier<ArrayCollector> supplier() {
return this;
}

@Override
public ArrayCollector get() {
return this;
}

@Override
public BiConsumer<ArrayCollector, Object> accumulator() {
return ArrayCollector::add;
}

@Override
public BinaryOperator<ArrayCollector> combiner() {
return (a1, a2) -> {
throw new UnsupportedOperationException("The stream is not supposed to be parallel");
};
}

@Override
public Function<ArrayCollector, ArrayCollector> finisher() {
return Function.identity();
}

@Override
public Set<Characteristics> characteristics() {
return EnumSet.of(Characteristics.IDENTITY_FINISH);
}
}
Expand Up @@ -7,12 +7,14 @@
import java.io.ObjectOutput;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

import org.infinispan.commands.CommandInvocationId;
import org.infinispan.commands.Visitor;
import org.infinispan.commons.marshall.MarshallUtil;
import org.infinispan.functional.EntryView.ReadWriteEntryView;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.context.InvocationContext;
Expand Down Expand Up @@ -67,7 +69,7 @@ public byte getCommandId() {
@Override
public void writeTo(ObjectOutput output) throws IOException {
CommandInvocationId.writeTo(output, commandInvocationId);
output.writeObject(entries);
MarshallUtil.marshallMap(entries, output);
output.writeObject(f);
output.writeBoolean(isForwarded);
Params.writeObject(output, params);
Expand All @@ -78,7 +80,8 @@ public void writeTo(ObjectOutput output) throws IOException {
@Override
public void readFrom(ObjectInput input) throws IOException, ClassNotFoundException {
commandInvocationId = CommandInvocationId.readFrom(input);
entries = (Map<? extends K, ? extends V>) input.readObject();
// We use LinkedHashMap in order to guarantee the same order of iteration
entries = MarshallUtil.unmarshallMap(input, LinkedHashMap::new);
f = (BiFunction<V, ReadWriteEntryView<K, V>, R>) input.readObject();
isForwarded = input.readBoolean();
params = Params.readObject(input);
Expand Down
Expand Up @@ -93,7 +93,8 @@ public Object perform(InvocationContext ctx) throws Throwable {

@Override
public boolean isReturnValueExpected() {
return false;
// Scattered cache always needs some response.
return true;
}

@Override
Expand Down
Expand Up @@ -4,11 +4,13 @@
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

import org.infinispan.commands.CommandInvocationId;
import org.infinispan.commands.Visitor;
import org.infinispan.commons.marshall.MarshallUtil;
import org.infinispan.functional.EntryView.WriteEntryView;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.context.InvocationContext;
Expand Down Expand Up @@ -58,7 +60,7 @@ public byte getCommandId() {
@Override
public void writeTo(ObjectOutput output) throws IOException {
CommandInvocationId.writeTo(output, commandInvocationId);
output.writeObject(entries);
MarshallUtil.marshallMap(entries, output);
output.writeObject(f);
output.writeBoolean(isForwarded);
Params.writeObject(output, params);
Expand All @@ -69,7 +71,8 @@ public void writeTo(ObjectOutput output) throws IOException {
@Override
public void readFrom(ObjectInput input) throws IOException, ClassNotFoundException {
commandInvocationId = CommandInvocationId.readFrom(input);
entries = (Map<? extends K, ? extends V>) input.readObject();
// We use LinkedHashMap in order to guarantee the same order of iteration
entries = MarshallUtil.unmarshallMap(input, LinkedHashMap::new);
f = (BiConsumer<V, WriteEntryView<V>>) input.readObject();
isForwarded = input.readBoolean();
params = Params.readObject(input);
Expand All @@ -93,7 +96,8 @@ public Object perform(InvocationContext ctx) throws Throwable {

@Override
public boolean isReturnValueExpected() {
return false;
// Scattered cache always needs some response.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but for this case, the return is null ... isn't it ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In scattered cache it will return an array of InternalCacheValue (after modification), because originator cannot apply the function locally (it does not have the previous version).

return true;
}

@Override
Expand Down
Expand Up @@ -9,13 +9,15 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;

import org.infinispan.commands.AbstractTopologyAffectedCommand;
import org.infinispan.commands.CommandInvocationId;
import org.infinispan.commands.MetadataAwareCommand;
import org.infinispan.commands.Visitor;
import org.infinispan.commons.marshall.MarshallUtil;
import org.infinispan.container.entries.MVCCEntry;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.FlagBitSets;
Expand Down Expand Up @@ -157,7 +159,7 @@ public byte getCommandId() {

@Override
public void writeTo(ObjectOutput output) throws IOException {
output.writeObject(map);
MarshallUtil.marshallMap(map, output);
output.writeObject(metadata);
output.writeBoolean(isForwarded);
output.writeLong(FlagBitSets.copyWithoutRemotableFlags(getFlagsBitSet()));
Expand All @@ -166,8 +168,7 @@ public void writeTo(ObjectOutput output) throws IOException {

@Override
public void readFrom(ObjectInput input) throws IOException, ClassNotFoundException {
//noinspection unchecked
map = (Map<Object, Object>) input.readObject();
map = MarshallUtil.unmarshallMap(input, LinkedHashMap::new);
metadata = (Metadata) input.readObject();
isForwarded = input.readBoolean();
setFlagsBitSet(input.readLong());
Expand Down
Expand Up @@ -235,6 +235,9 @@ protected Object performRemove(MVCCEntry e, Object prevValue, InvocationContext
e.setValid(false);
e.setChanged(true);
e.setValue(null);
if (metadata != null) {
e.setMetadata(metadata);
}

if (valueMatcher != ValueMatcher.MATCH_EXPECTED_OR_NEW) {
return isConditional() ? true : prevValue;
Expand Down
Expand Up @@ -52,8 +52,11 @@ public void injectDependencies(DataContainer dataContainer, Configuration config

@Start (priority = 8)
public void init() {
// Scattered mode needs repeatable-read entries to properly retry half-committed multi-key operations
// (see RetryingEntryWrappingInterceptor for details).
useRepeatableRead = configuration.transaction().transactionMode().isTransactional()
&& configuration.locking().isolationLevel() == IsolationLevel.REPEATABLE_READ;
&& configuration.locking().isolationLevel() == IsolationLevel.REPEATABLE_READ
|| configuration.clustering().cacheMode().isScattered();
isL1Enabled = configuration.clustering().l1().enabled();
// Write-skew check implies isolation level = REPEATABLE_READ && locking mode = OPTIMISTIC
useVersioning = Configurations.isTxVersioned(configuration);
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/java/org/infinispan/container/entries/MVCCEntry.java
Expand Up @@ -67,4 +67,14 @@ default void setRead() {}
default boolean isRead() {
return false;
}

/**
* Mark this context-entry as already committed to the {@link DataContainer}.
*/
default void setCommitted() {}

/**
* @return True if this context entry has been committed to the {@link DataContainer}
*/
default boolean isCommitted() { return false; }
}
Expand Up @@ -2,6 +2,7 @@

import static org.infinispan.commons.util.Util.toStr;
import static org.infinispan.container.entries.ReadCommittedEntry.Flags.CHANGED;
import static org.infinispan.container.entries.ReadCommittedEntry.Flags.COMMITTED;
import static org.infinispan.container.entries.ReadCommittedEntry.Flags.CREATED;
import static org.infinispan.container.entries.ReadCommittedEntry.Flags.EVICTED;
import static org.infinispan.container.entries.ReadCommittedEntry.Flags.EXPIRED;
Expand Down Expand Up @@ -44,7 +45,7 @@ protected enum Flags {
CHANGED(1),
CREATED(1 << 1),
REMOVED(1 << 2),
// 1 << 3 no longer used (was: VALID)
COMMITTED(1 << 3),
EVICTED(1 << 4),
EXPIRED(1 << 5),
SKIP_LOOKUP(1 << 6),
Expand Down Expand Up @@ -208,6 +209,16 @@ public boolean isExpired() {
return isFlagSet(EXPIRED);
}

@Override
public void setCommitted() {
setFlag(COMMITTED);
}

@Override
public boolean isCommitted() {
return isFlagSet(COMMITTED);
}

@Override
public void resetCurrentValue() {
// noop, the entry is removed from context
Expand Down
Expand Up @@ -18,10 +18,12 @@ public class RepeatableReadEntry extends ReadCommittedEntry {

/* Value before the last modification. Serves as the previous value when the operation is retried */
protected Object oldValue;
protected Metadata oldMetadata;

public RepeatableReadEntry(Object key, Object value, Metadata metadata) {
super(key, value, metadata);
this.oldValue = value;
this.oldMetadata = metadata;
}

@Override
Expand Down Expand Up @@ -49,11 +51,21 @@ public final Object setValue(Object value) {
@Override
public void resetCurrentValue() {
value = oldValue;
metadata = oldMetadata;
}

@Override
public void updatePreviousValue() {
oldValue = value;
oldMetadata = metadata;
}

public Object getOldValue() {
return oldValue;
}

public Metadata getOldMetadata() {
return oldMetadata;
}

public void setRead() {
Expand Down