From 4f5c52eb09b4423cf4a717f1b09441d0a0427c59 Mon Sep 17 00:00:00 2001 From: Radim Vansa Date: Fri, 21 Jul 2017 15:06:37 +0200 Subject: [PATCH 1/2] ISPN-8078 Refactor helpers out of NonTxDistributionInterceptor * Also move BaseDistributionInterceptor$ArrayIterator -> commons/ArrayCollector --- .../commons/util/ArrayCollector.java | 54 +++ .../BaseDistributionInterceptor.java | 94 +---- .../CountDownCompletableFuture.java | 37 ++ .../MergingCompletableFuture.java | 57 +++ .../NonTxDistributionInterceptor.java | 395 ++---------------- .../distribution/PutMapHelper.java | 76 ++++ .../ReadWriteManyEntriesHelper.java | 73 ++++ .../distribution/ReadWriteManyHelper.java | 71 ++++ .../distribution/WriteManyCommandHelper.java | 37 ++ .../WriteOnlyManyEntriesHelper.java | 73 ++++ .../distribution/WriteOnlyManyHelper.java | 72 ++++ 11 files changed, 585 insertions(+), 454 deletions(-) create mode 100644 commons/src/main/java/org/infinispan/commons/util/ArrayCollector.java create mode 100644 core/src/main/java/org/infinispan/interceptors/distribution/CountDownCompletableFuture.java create mode 100644 core/src/main/java/org/infinispan/interceptors/distribution/MergingCompletableFuture.java create mode 100644 core/src/main/java/org/infinispan/interceptors/distribution/PutMapHelper.java create mode 100644 core/src/main/java/org/infinispan/interceptors/distribution/ReadWriteManyEntriesHelper.java create mode 100644 core/src/main/java/org/infinispan/interceptors/distribution/ReadWriteManyHelper.java create mode 100644 core/src/main/java/org/infinispan/interceptors/distribution/WriteManyCommandHelper.java create mode 100644 core/src/main/java/org/infinispan/interceptors/distribution/WriteOnlyManyEntriesHelper.java create mode 100644 core/src/main/java/org/infinispan/interceptors/distribution/WriteOnlyManyHelper.java diff --git a/commons/src/main/java/org/infinispan/commons/util/ArrayCollector.java b/commons/src/main/java/org/infinispan/commons/util/ArrayCollector.java new file mode 100644 index 000000000000..f60476ce7099 --- /dev/null +++ b/commons/src/main/java/org/infinispan/commons/util/ArrayCollector.java @@ -0,0 +1,54 @@ +package org.infinispan.commons.util; + +import java.util.EnumSet; +import java.util.Set; +import java.util.function.BiConsumer; +import java.util.function.BinaryOperator; +import java.util.function.Function; +import java.util.function.Supplier; + +public class ArrayCollector implements java.util.stream.Collector, Supplier { + private final Object[] array; + private int pos = 0; + + public ArrayCollector(Object[] array) { + this.array = array; + } + + public void add(Object item) { + array[pos] = item; + ++pos; + } + + @Override + public Supplier supplier() { + return this; + } + + @Override + public ArrayCollector get() { + return this; + } + + @Override + public BiConsumer accumulator() { + return ArrayCollector::add; + } + + @Override + public BinaryOperator combiner() { + return (a1, a2) -> { + throw new UnsupportedOperationException("The stream is not supposed to be parallel"); + }; + } + + @Override + public Function finisher() { + return Function.identity(); + } + + @Override + public Set characteristics() { + return EnumSet.of(Characteristics.IDENTITY_FINISH); + } +} diff --git a/core/src/main/java/org/infinispan/interceptors/distribution/BaseDistributionInterceptor.java b/core/src/main/java/org/infinispan/interceptors/distribution/BaseDistributionInterceptor.java index c9034e063339..60a2e4438145 100644 --- a/core/src/main/java/org/infinispan/interceptors/distribution/BaseDistributionInterceptor.java +++ b/core/src/main/java/org/infinispan/interceptors/distribution/BaseDistributionInterceptor.java @@ -10,11 +10,9 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; -import java.util.function.Function; import java.util.function.Supplier; -import java.util.stream.Stream; +import java.util.stream.*; import org.infinispan.commands.FlagAffectedCommand; import org.infinispan.commands.ReplicableCommand; @@ -34,6 +32,7 @@ import org.infinispan.commands.write.DataWriteCommand; import org.infinispan.commands.write.ValueMatcher; import org.infinispan.commons.CacheException; +import org.infinispan.commons.util.ArrayCollector; import org.infinispan.container.entries.CacheEntry; import org.infinispan.container.entries.InternalCacheEntry; import org.infinispan.container.entries.InternalCacheValue; @@ -563,8 +562,8 @@ protected Object handl // On FutureMode.ASYNC, there should be one command per target node going from the top level // to allow retries in StateTransferInterceptor in case of topology change. MergingCompletableFuture allFuture = new MergingCompletableFuture<>( - ctx, requestedKeys.size() + (availableKeys.isEmpty() ? 0 : 1), - new Object[keys.size()], helper::transformResult); + requestedKeys.size() + (availableKeys.isEmpty() ? 0 : 1), + new Object[keys.size()], helper::transformResult); handleLocallyAvailableKeys(ctx, command, availableKeys, allFuture, helper); int pos = availableKeys.size(); @@ -764,86 +763,6 @@ protected Object[] unwrapFunctionalManyResultOnOrigin(InvocationContext ctx, Lis return responseValue instanceof Object[] ? (Object[]) responseValue : null; } - protected static class ArrayIterator { - private final Object[] array; - private int pos = 0; - - public ArrayIterator(Object[] array) { - this.array = array; - } - - public void add(Object item) { - array[pos] = item; - ++pos; - } - - public void combine(ArrayIterator other) { - throw new UnsupportedOperationException("The stream is not supposed to be parallel"); - } - } - - protected static class CountDownCompletableFuture extends CompletableFuture { - protected final InvocationContext ctx; - protected final AtomicInteger counter; - - public CountDownCompletableFuture(InvocationContext ctx, int participants) { - if (trace) log.tracef("Creating shortcut countdown with %d participants", participants); - this.ctx = ctx; - this.counter = new AtomicInteger(participants); - } - - public void countDown() { - if (counter.decrementAndGet() == 0) { - Object result = null; - try { - result = result(); - } catch (Throwable t) { - completeExceptionally(t); - } finally { - // no-op when completed with exception - complete(result); - } - } - } - - public void increment() { - int preValue = counter.getAndIncrement(); - if (preValue == 0) { - throw new IllegalStateException(); - } - } - - protected Object result() { - return null; - } - } - - protected static class MergingCompletableFuture extends CountDownCompletableFuture { - private final Function transform; - protected final T[] results; - protected volatile boolean hasUnsureResponse; - protected volatile boolean lostData; - - public MergingCompletableFuture(InvocationContext ctx, int participants, T[] results, Function transform) { - super(ctx, participants); - // results can be null if the command has flag IGNORE_RETURN_VALUE - this.results = results; - this.transform = transform; - } - - @Override - protected Object result() { - // If we've lost data but did not get any unsure responses we should return limited stream. - // If we've got unsure response but did not lose any data - no problem, there has been another - // response delivering the results. - // Only if those two combine we'll rather throw OTE and retry. - if (hasUnsureResponse && lostData) { - throw OutdatedTopologyException.INSTANCE; - } - return transform == null || results == null ? null : transform.apply(results); - } - } - private Object visitGetCommand(InvocationContext ctx, AbstractDataCommand command) throws Throwable { return ctx.lookupEntry(command.getKey()) == null ? onEntryMiss(ctx, command) : invokeNext(ctx, command); } @@ -987,10 +906,7 @@ public ReplicableCommand copyForRemote(ReadOnlyManyCommand command, List @Override public void applyLocalResult(MergingCompletableFuture allFuture, Object rv) { - Supplier supplier = () -> new ArrayIterator(allFuture.results); - BiConsumer consumer = ArrayIterator::add; - BiConsumer combiner = ArrayIterator::combine; - ((Stream) rv).collect(supplier, consumer, combiner); + ((Stream) rv).collect(new ArrayCollector(allFuture.results)); } @Override diff --git a/core/src/main/java/org/infinispan/interceptors/distribution/CountDownCompletableFuture.java b/core/src/main/java/org/infinispan/interceptors/distribution/CountDownCompletableFuture.java new file mode 100644 index 000000000000..358e3cb96579 --- /dev/null +++ b/core/src/main/java/org/infinispan/interceptors/distribution/CountDownCompletableFuture.java @@ -0,0 +1,37 @@ +package org.infinispan.interceptors.distribution; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; + +class CountDownCompletableFuture extends CompletableFuture { + protected final AtomicInteger counter; + + public CountDownCompletableFuture(int participants) { + this.counter = new AtomicInteger(participants); + } + + public void countDown() { + if (counter.decrementAndGet() == 0) { + Object result = null; + try { + result = result(); + } catch (Throwable t) { + completeExceptionally(t); + } finally { + // no-op when completed with exception + complete(result); + } + } + } + + public void increment() { + int preValue = counter.getAndIncrement(); + if (preValue == 0) { + throw new IllegalStateException(); + } + } + + protected Object result() { + return null; + } +} diff --git a/core/src/main/java/org/infinispan/interceptors/distribution/MergingCompletableFuture.java b/core/src/main/java/org/infinispan/interceptors/distribution/MergingCompletableFuture.java new file mode 100644 index 000000000000..808faaa49456 --- /dev/null +++ b/core/src/main/java/org/infinispan/interceptors/distribution/MergingCompletableFuture.java @@ -0,0 +1,57 @@ +package org.infinispan.interceptors.distribution; + +import java.util.Collection; +import java.util.Iterator; +import java.util.Map; +import java.util.function.BiConsumer; +import java.util.function.Function; + +import org.infinispan.statetransfer.OutdatedTopologyException; + +class MergingCompletableFuture extends CountDownCompletableFuture { + private final Function transform; + protected final T[] results; + protected volatile boolean hasUnsureResponse; + protected volatile boolean lostData; + + MergingCompletableFuture(int participants, T[] results, Function transform) { + super(participants); + // results can be null if the command has flag IGNORE_RETURN_VALUE + this.results = results; + this.transform = transform; + } + + static BiConsumer, Object> moveListItemsToFuture(int myOffset) { + return (f, rv) -> moveListItemsToFuture(rv, f, myOffset); + } + + static void moveListItemsToFuture(Object rv, MergingCompletableFuture f, int myOffset) { + Collection items; + if (rv == null && f.results == null) { + return; + } else if (rv instanceof Map) { + items = ((Map) rv).entrySet(); + } else if (rv instanceof Collection) { + items = (Collection) rv; + } else { + f.completeExceptionally(new IllegalArgumentException("Unexpected result value " + rv)); + return; + } + Iterator it = items.iterator(); + for (int i = 0; it.hasNext(); ++i) { + f.results[myOffset + i] = it.next(); + } + } + + @Override + protected Object result() { + // If we've lost data but did not get any unsure responses we should return limited stream. + // If we've got unsure response but did not lose any data - no problem, there has been another + // response delivering the results. + // Only if those two combine we'll rather throw OTE and retry. + if (hasUnsureResponse && lostData) { + throw OutdatedTopologyException.INSTANCE; + } + return transform == null || results == null ? null : transform.apply(results); + } +} diff --git a/core/src/main/java/org/infinispan/interceptors/distribution/NonTxDistributionInterceptor.java b/core/src/main/java/org/infinispan/interceptors/distribution/NonTxDistributionInterceptor.java index fba7c884d935..71beeaf8a204 100644 --- a/core/src/main/java/org/infinispan/interceptors/distribution/NonTxDistributionInterceptor.java +++ b/core/src/main/java/org/infinispan/interceptors/distribution/NonTxDistributionInterceptor.java @@ -1,12 +1,9 @@ package org.infinispan.interceptors.distribution; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -14,7 +11,6 @@ import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; -import org.infinispan.commands.VisitableCommand; import org.infinispan.commands.functional.ReadWriteKeyCommand; import org.infinispan.commands.functional.ReadWriteKeyValueCommand; import org.infinispan.commands.functional.ReadWriteManyCommand; @@ -36,8 +32,6 @@ import org.infinispan.context.impl.FlagBitSets; import org.infinispan.distribution.LocalizedCacheTopology; import org.infinispan.distribution.ch.ConsistentHash; -import org.infinispan.distribution.util.ReadOnlySegmentAwareCollection; -import org.infinispan.distribution.util.ReadOnlySegmentAwareMap; import org.infinispan.interceptors.InvocationFinallyAction; import org.infinispan.interceptors.InvocationSuccessFunction; import org.infinispan.remoting.responses.SuccessfulResponse; @@ -69,35 +63,11 @@ public class NonTxDistributionInterceptor extends BaseDistributionInterceptor { private static Log log = LogFactory.getLog(NonTxDistributionInterceptor.class); private static final boolean trace = log.isTraceEnabled(); - private final PutMapHelper putMapHelper = new PutMapHelper(); - private final ReadWriteManyHelper readWriteManyHelper = new ReadWriteManyHelper(); - private final ReadWriteManyEntriesHelper readWriteManyEntriesHelper = new ReadWriteManyEntriesHelper(); - private final WriteOnlyManyEntriesHelper writeOnlyManyEntriesHelper = new WriteOnlyManyEntriesHelper(); - private final WriteOnlyManyHelper writeOnlyManyHelper = new WriteOnlyManyHelper(); - - private static BiConsumer, Object> moveListItemsToFuture(int myOffset) { - return (f, rv) -> { - Collection items; - if (rv == null && f.results == null) { - return; - } else if (rv instanceof Map) { - items = ((Map) rv).entrySet(); - } else if (rv instanceof Collection) { - items = (Collection) rv; - } else { - f.completeExceptionally(new IllegalArgumentException("Unexpected result value " + rv)); - return; - } - if (trace) { - log.tracef("Copying %d items %s to results (%s), starting offset %d", items.size(), items, - Arrays.toString(f.results), myOffset); - } - Iterator it = items.iterator(); - for (int i = 0; it.hasNext(); ++i) { - f.results[myOffset + i] = it.next(); - } - }; - } + private final PutMapHelper putMapHelper = new PutMapHelper(this::createRemoteCallback); + private final ReadWriteManyHelper readWriteManyHelper = new ReadWriteManyHelper(this::createRemoteCallback); + private final ReadWriteManyEntriesHelper readWriteManyEntriesHelper = new ReadWriteManyEntriesHelper(this::createRemoteCallback); + private final WriteOnlyManyEntriesHelper writeOnlyManyEntriesHelper = new WriteOnlyManyEntriesHelper(this::createRemoteCallback); + private final WriteOnlyManyHelper writeOnlyManyHelper = new WriteOnlyManyHelper(this::createRemoteCallback); private Map> primaryOwnersOfSegments(ConsistentHash ch) { Map> map = new HashMap<>(ch.getMembers().size()); @@ -205,7 +175,7 @@ private Object handleWriteOnlyManyComm ConsistentHash ch = cacheTopology.getWriteConsistentHash(); if (ctx.isOriginLocal()) { Map> segmentMap = primaryOwnersOfSegments(ch); - CountDownCompletableFuture allFuture = new CountDownCompletableFuture(ctx, segmentMap.size()); + CountDownCompletableFuture allFuture = new CountDownCompletableFuture(segmentMap.size()); // Go through all members, for this node invokeNext (if this node is an owner of some keys), // for the others (that own some keys) issue a remote call. @@ -268,7 +238,7 @@ private Object handleRemoteWriteOnlyManyCommand( } if (helper.shouldRegisterRemoteCallback(command)) { - return invokeNextThenApply(ctx, command, helper); + return invokeNextThenApply(ctx, command, helper.remoteCallback); } else { return invokeNext(ctx, command); } @@ -307,7 +277,7 @@ private Object handleReadWriteManyComm results = new Object[helper.getItems(command).size()]; } MergingCompletableFuture allFuture - = new MergingCompletableFuture<>(ctx, segmentMap.size(), results, helper::transformResult); + = new MergingCompletableFuture<>(segmentMap.size(), results, helper::transformResult); MutableInt offset = new MutableInt(); // Go through all members, for this node invokeNext (if this node is an owner of some keys), @@ -352,7 +322,7 @@ private void handleLocalSegmentsForRea C localCommand = helper.copyForLocal(command, myItems); InvocationFinallyAction handler = - createLocalInvocationHandler(ch, allFuture, segments, helper, moveListItemsToFuture(myOffset)); + createLocalInvocationHandler(ch, allFuture, segments, helper, MergingCompletableFuture.moveListItemsToFuture(myOffset)); if (retrievals == null) { invokeNextAndFinally(ctx, localCommand, handler); } else { @@ -391,7 +361,7 @@ private void handleRemoteSegmentsForReadWriteMany return; } Object responseValue = response.getResponseValue(); - moveListItemsToFuture(myOffset).accept(allFuture, responseValue); + MergingCompletableFuture.moveListItemsToFuture(responseValue, allFuture, myOffset); allFuture.countDown(); } }); @@ -414,7 +384,7 @@ private Object handleRemoteReadWriteManyCommand( } Object result = asyncInvokeNext(ctx, command, delay); if (helper.shouldRegisterRemoteCallback(command)) { - return makeStage(result).thenApply(ctx, command, helper); + return makeStage(result).thenApply(ctx, command, helper.remoteCallback); } else { return result; } @@ -492,333 +462,28 @@ private final static class MutableInt { public int value; } - private abstract class WriteManyCommandHelper - implements InvocationSuccessFunction { - public abstract C copyForLocal(C cmd, Container container); - - public abstract C copyForPrimary(C cmd, ConsistentHash ch, Set segments); - - public abstract C copyForBackup(C cmd, ConsistentHash ch, Set segments); - - public abstract Collection getItems(C cmd); - - public abstract Object item2key(Item item); - - public abstract Container newContainer(); - - public abstract void accumulate(Container container, Item item); - - public abstract int containerSize(Container container); - - public abstract boolean shouldRegisterRemoteCallback(C cmd); - - public abstract Object transformResult(Object[] results); - - @Override - public Object apply(InvocationContext rCtx, VisitableCommand rCommand, Object rv) throws Throwable { - C original = (C) rCommand; - ConsistentHash ch = checkTopologyId(original).getWriteConsistentHash(); - // We have already checked that the command topology is actual, so we can assume that we really are primary owner - Map> backups = backupOwnersOfSegments(ch, ch.getPrimarySegmentsForOwner(rpcManager.getAddress())); - if (backups.isEmpty()) { - return rv; - } - boolean isSync = isSynchronous(original); - CompletableFuture[] futures = isSync ? new CompletableFuture[backups.size()] : null; - int future = 0; - for (Entry> backup : backups.entrySet()) { - C copy = copyForBackup(original, ch, backup.getValue()); - if (isSync) { - futures[future++] = rpcManager.invokeRemotelyAsync(Collections.singleton(backup.getKey()), copy, defaultSyncOptions); - } else { - rpcManager.invokeRemotelyAsync(Collections.singleton(backup.getKey()), copy, defaultAsyncOptions); - } - } - return isSync ? asyncValue(CompletableFuture.allOf(futures).thenApply(nil -> rv)) : rv; - } - } - - private class PutMapHelper extends WriteManyCommandHelper, Entry> { - @Override - public PutMapCommand copyForLocal(PutMapCommand cmd, Map container) { - return new PutMapCommand(cmd).withMap(container); - } - - @Override - public PutMapCommand copyForPrimary(PutMapCommand cmd, ConsistentHash ch, Set segments) { - return new PutMapCommand(cmd).withMap(new ReadOnlySegmentAwareMap<>(cmd.getMap(), ch, segments)); - } - - @Override - public PutMapCommand copyForBackup(PutMapCommand cmd, ConsistentHash ch, Set segments) { - PutMapCommand copy = new PutMapCommand(cmd).withMap(new ReadOnlySegmentAwareMap(cmd.getMap(), ch, segments)); - copy.setForwarded(true); - return copy; - } - - @Override - public Collection> getItems(PutMapCommand cmd) { - return cmd.getMap().entrySet(); - } - - @Override - public Object item2key(Entry entry) { - return entry.getKey(); - } - - @Override - public Map newContainer() { - return new HashMap<>(); - } - - @Override - public void accumulate(Map map, Entry entry) { - map.put(entry.getKey(), entry.getValue()); - } - - @Override - public int containerSize(Map map) { - return map.size(); - } - - @Override - public boolean shouldRegisterRemoteCallback(PutMapCommand cmd) { - return !cmd.isForwarded(); - } - - @Override - public Object transformResult(Object[] results) { - if (results == null) return null; - Map result = new HashMap<>(); - for (Object r : results) { - Map.Entry entry = (Entry) r; - result.put(entry.getKey(), entry.getValue()); + private Object writeManyRemoteCallback(WriteManyCommandHelper helper, InvocationContext ctx, C command, Object rv) { + ConsistentHash ch = checkTopologyId(command).getWriteConsistentHash(); + // We have already checked that the command topology is actual, so we can assume that we really are primary owner + Map> backups = backupOwnersOfSegments(ch, ch.getPrimarySegmentsForOwner(rpcManager.getAddress())); + if (backups.isEmpty()) { + return rv; + } + boolean isSync = isSynchronous(command); + CompletableFuture[] futures = isSync ? new CompletableFuture[backups.size()] : null; + int future = 0; + for (Entry> backup : backups.entrySet()) { + C copy = helper.copyForBackup(command, ch, backup.getValue()); + if (isSync) { + futures[future++] = rpcManager.invokeRemotelyAsync(Collections.singleton(backup.getKey()), copy, defaultSyncOptions); + } else { + rpcManager.invokeRemotelyAsync(Collections.singleton(backup.getKey()), copy, defaultAsyncOptions); } - return result; } + return isSync ? asyncValue(CompletableFuture.allOf(futures).thenApply(nil -> rv)) : rv; } - private class ReadWriteManyEntriesHelper extends WriteManyCommandHelper, Entry> { - @Override - public ReadWriteManyEntriesCommand copyForLocal(ReadWriteManyEntriesCommand cmd, Map entries) { - return new ReadWriteManyEntriesCommand(cmd).withEntries(entries); - } - - @Override - public ReadWriteManyEntriesCommand copyForPrimary(ReadWriteManyEntriesCommand cmd, ConsistentHash ch, Set segments) { - return new ReadWriteManyEntriesCommand(cmd) - .withEntries(new ReadOnlySegmentAwareMap<>(cmd.getEntries(), ch, segments)); - } - - @Override - public ReadWriteManyEntriesCommand copyForBackup(ReadWriteManyEntriesCommand cmd, ConsistentHash ch, Set segments) { - ReadWriteManyEntriesCommand copy = new ReadWriteManyEntriesCommand(cmd) - .withEntries(new ReadOnlySegmentAwareMap(cmd.getEntries(), ch, segments)); - copy.setForwarded(true); - return copy; - } - - @Override - public Collection> getItems(ReadWriteManyEntriesCommand cmd) { - return cmd.getEntries().entrySet(); - } - - @Override - public Object item2key(Entry entry) { - return entry.getKey(); - } - - @Override - public Map newContainer() { - return new HashMap<>(); - } - - @Override - public void accumulate(Map map, Entry entry) { - map.put(entry.getKey(), entry.getValue()); - } - - @Override - public int containerSize(Map map) { - return map.size(); - } - - @Override - public boolean shouldRegisterRemoteCallback(ReadWriteManyEntriesCommand cmd) { - return !cmd.isForwarded(); - } - - @Override - public Object transformResult(Object[] results) { - return results == null ? null : Arrays.asList(results); - } - } - - private class ReadWriteManyHelper extends WriteManyCommandHelper, Object> { - @Override - public ReadWriteManyCommand copyForLocal(ReadWriteManyCommand cmd, Collection keys) { - return new ReadWriteManyCommand(cmd).withKeys(keys); - } - - @Override - public ReadWriteManyCommand copyForPrimary(ReadWriteManyCommand cmd, ConsistentHash ch, Set segments) { - return new ReadWriteManyCommand(cmd).withKeys(new ReadOnlySegmentAwareCollection(cmd.getAffectedKeys(), ch, segments)); - } - - @Override - public ReadWriteManyCommand copyForBackup(ReadWriteManyCommand cmd, ConsistentHash ch, Set segments) { - ReadWriteManyCommand copy = new ReadWriteManyCommand(cmd).withKeys( - new ReadOnlySegmentAwareCollection(cmd.getAffectedKeys(), ch, segments)); - copy.setForwarded(true); - return copy; - } - - @Override - public Collection getItems(ReadWriteManyCommand cmd) { - return cmd.getAffectedKeys(); - } - - @Override - public Object item2key(Object key) { - return key; - } - - @Override - public Collection newContainer() { - return new ArrayList<>(); - } - - @Override - public void accumulate(Collection list, Object key) { - list.add(key); - } - - @Override - public int containerSize(Collection list) { - return list.size(); - } - - @Override - public boolean shouldRegisterRemoteCallback(ReadWriteManyCommand cmd) { - return !cmd.isForwarded(); - } - - @Override - public Object transformResult(Object[] results) { - return results == null ? null : Arrays.asList(results); - } - } - - private class WriteOnlyManyEntriesHelper extends WriteManyCommandHelper, Entry> { - - @Override - public WriteOnlyManyEntriesCommand copyForLocal(WriteOnlyManyEntriesCommand cmd, Map entries) { - return new WriteOnlyManyEntriesCommand(cmd).withEntries(entries); - } - - @Override - public WriteOnlyManyEntriesCommand copyForPrimary(WriteOnlyManyEntriesCommand cmd, ConsistentHash ch, Set segments) { - return new WriteOnlyManyEntriesCommand(cmd) - .withEntries(new ReadOnlySegmentAwareMap<>(cmd.getEntries(), ch, segments)); - } - - @Override - public WriteOnlyManyEntriesCommand copyForBackup(WriteOnlyManyEntriesCommand cmd, ConsistentHash ch, Set segments) { - WriteOnlyManyEntriesCommand copy = new WriteOnlyManyEntriesCommand(cmd) - .withEntries(new ReadOnlySegmentAwareMap(cmd.getEntries(), ch, segments)); - copy.setForwarded(true); - return copy; - } - - @Override - public Collection> getItems(WriteOnlyManyEntriesCommand cmd) { - return cmd.getEntries().entrySet(); - } - - @Override - public Object item2key(Entry entry) { - return entry.getKey(); - } - - @Override - public Map newContainer() { - return new HashMap<>(); - } - - @Override - public void accumulate(Map map, Entry entry) { - map.put(entry.getKey(), entry.getValue()); - } - - @Override - public int containerSize(Map map) { - return map.size(); - } - - @Override - public boolean shouldRegisterRemoteCallback(WriteOnlyManyEntriesCommand cmd) { - return !cmd.isForwarded(); - } - - @Override - public Object transformResult(Object[] results) { - return results == null ? null : Arrays.asList(results); - } - } - - private class WriteOnlyManyHelper extends WriteManyCommandHelper, Object> { - @Override - public WriteOnlyManyCommand copyForLocal(WriteOnlyManyCommand cmd, Collection keys) { - return new WriteOnlyManyCommand(cmd).withKeys(keys); - } - - @Override - public WriteOnlyManyCommand copyForPrimary(WriteOnlyManyCommand cmd, ConsistentHash ch, Set segments) { - return new WriteOnlyManyCommand(cmd) - .withKeys(new ReadOnlySegmentAwareCollection(cmd.getAffectedKeys(), ch, segments)); - } - - @Override - public WriteOnlyManyCommand copyForBackup(WriteOnlyManyCommand cmd, ConsistentHash ch, Set segments) { - WriteOnlyManyCommand copy = new WriteOnlyManyCommand(cmd) - .withKeys(new ReadOnlySegmentAwareCollection(cmd.getAffectedKeys(), ch, segments)); - copy.setForwarded(true); - return copy; - } - - @Override - public Collection getItems(WriteOnlyManyCommand cmd) { - return cmd.getAffectedKeys(); - } - - @Override - public Object item2key(Object key) { - return key; - } - - @Override - public Collection newContainer() { - return new ArrayList<>(); - } - - @Override - public void accumulate(Collection list, Object key) { - list.add(key); - } - - @Override - public int containerSize(Collection list) { - return list.size(); - } - - @Override - public boolean shouldRegisterRemoteCallback(WriteOnlyManyCommand cmd) { - return !cmd.isForwarded(); - } - - @Override - public Object transformResult(Object[] results) { - return results == null ? null : Arrays.asList(results); - } + private InvocationSuccessFunction createRemoteCallback(WriteManyCommandHelper helper) { + return (ctx, command, rv) -> writeManyRemoteCallback(helper, ctx, (C) command, rv); } } diff --git a/core/src/main/java/org/infinispan/interceptors/distribution/PutMapHelper.java b/core/src/main/java/org/infinispan/interceptors/distribution/PutMapHelper.java new file mode 100644 index 000000000000..27e45f599134 --- /dev/null +++ b/core/src/main/java/org/infinispan/interceptors/distribution/PutMapHelper.java @@ -0,0 +1,76 @@ +package org.infinispan.interceptors.distribution; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; + +import org.infinispan.commands.write.PutMapCommand; +import org.infinispan.distribution.ch.ConsistentHash; +import org.infinispan.distribution.util.ReadOnlySegmentAwareMap; +import org.infinispan.interceptors.InvocationSuccessFunction; + +class PutMapHelper extends WriteManyCommandHelper, Map.Entry> { + PutMapHelper(Function, InvocationSuccessFunction> createRemoteCallback) { + super(createRemoteCallback); + } + + @Override + public PutMapCommand copyForLocal(PutMapCommand cmd, Map container) { + return new PutMapCommand(cmd).withMap(container); + } + + @Override + public PutMapCommand copyForPrimary(PutMapCommand cmd, ConsistentHash ch, Set segments) { + return new PutMapCommand(cmd).withMap(new ReadOnlySegmentAwareMap<>(cmd.getMap(), ch, segments)); + } + + @Override + public PutMapCommand copyForBackup(PutMapCommand cmd, ConsistentHash ch, Set segments) { + PutMapCommand copy = new PutMapCommand(cmd).withMap(new ReadOnlySegmentAwareMap(cmd.getMap(), ch, segments)); + copy.setForwarded(true); + return copy; + } + + @Override + public Collection> getItems(PutMapCommand cmd) { + return cmd.getMap().entrySet(); + } + + @Override + public Object item2key(Map.Entry entry) { + return entry.getKey(); + } + + @Override + public Map newContainer() { + return new HashMap<>(); + } + + @Override + public void accumulate(Map map, Map.Entry entry) { + map.put(entry.getKey(), entry.getValue()); + } + + @Override + public int containerSize(Map map) { + return map.size(); + } + + @Override + public boolean shouldRegisterRemoteCallback(PutMapCommand cmd) { + return !cmd.isForwarded(); + } + + @Override + public Object transformResult(Object[] results) { + if (results == null) return null; + Map result = new HashMap<>(); + for (Object r : results) { + Map.Entry entry = (Map.Entry) r; + result.put(entry.getKey(), entry.getValue()); + } + return result; + } +} diff --git a/core/src/main/java/org/infinispan/interceptors/distribution/ReadWriteManyEntriesHelper.java b/core/src/main/java/org/infinispan/interceptors/distribution/ReadWriteManyEntriesHelper.java new file mode 100644 index 000000000000..3bff44e7bcb3 --- /dev/null +++ b/core/src/main/java/org/infinispan/interceptors/distribution/ReadWriteManyEntriesHelper.java @@ -0,0 +1,73 @@ +package org.infinispan.interceptors.distribution; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; + +import org.infinispan.commands.functional.ReadWriteManyEntriesCommand; +import org.infinispan.distribution.ch.ConsistentHash; +import org.infinispan.distribution.util.ReadOnlySegmentAwareMap; +import org.infinispan.interceptors.InvocationSuccessFunction; + +class ReadWriteManyEntriesHelper extends WriteManyCommandHelper, Map.Entry> { + ReadWriteManyEntriesHelper(Function, InvocationSuccessFunction> createRemoteCallback) { + super(createRemoteCallback); + } + + @Override + public ReadWriteManyEntriesCommand copyForLocal(ReadWriteManyEntriesCommand cmd, Map entries) { + return new ReadWriteManyEntriesCommand(cmd).withEntries(entries); + } + + @Override + public ReadWriteManyEntriesCommand copyForPrimary(ReadWriteManyEntriesCommand cmd, ConsistentHash ch, Set segments) { + return new ReadWriteManyEntriesCommand(cmd) + .withEntries(new ReadOnlySegmentAwareMap<>(cmd.getEntries(), ch, segments)); + } + + @Override + public ReadWriteManyEntriesCommand copyForBackup(ReadWriteManyEntriesCommand cmd, ConsistentHash ch, Set segments) { + ReadWriteManyEntriesCommand copy = new ReadWriteManyEntriesCommand(cmd) + .withEntries(new ReadOnlySegmentAwareMap(cmd.getEntries(), ch, segments)); + copy.setForwarded(true); + return copy; + } + + @Override + public Collection> getItems(ReadWriteManyEntriesCommand cmd) { + return cmd.getEntries().entrySet(); + } + + @Override + public Object item2key(Map.Entry entry) { + return entry.getKey(); + } + + @Override + public Map newContainer() { + return new HashMap<>(); + } + + @Override + public void accumulate(Map map, Map.Entry entry) { + map.put(entry.getKey(), entry.getValue()); + } + + @Override + public int containerSize(Map map) { + return map.size(); + } + + @Override + public boolean shouldRegisterRemoteCallback(ReadWriteManyEntriesCommand cmd) { + return !cmd.isForwarded(); + } + + @Override + public Object transformResult(Object[] results) { + return results == null ? null : Arrays.asList(results); + } +} diff --git a/core/src/main/java/org/infinispan/interceptors/distribution/ReadWriteManyHelper.java b/core/src/main/java/org/infinispan/interceptors/distribution/ReadWriteManyHelper.java new file mode 100644 index 000000000000..5aa758c81434 --- /dev/null +++ b/core/src/main/java/org/infinispan/interceptors/distribution/ReadWriteManyHelper.java @@ -0,0 +1,71 @@ +package org.infinispan.interceptors.distribution; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Set; +import java.util.function.Function; + +import org.infinispan.commands.functional.ReadWriteManyCommand; +import org.infinispan.distribution.ch.ConsistentHash; +import org.infinispan.distribution.util.ReadOnlySegmentAwareCollection; +import org.infinispan.interceptors.InvocationSuccessFunction; + +class ReadWriteManyHelper extends WriteManyCommandHelper, Object> { + ReadWriteManyHelper(Function, InvocationSuccessFunction> createRemoteCallback) { + super(createRemoteCallback); + } + + @Override + public ReadWriteManyCommand copyForLocal(ReadWriteManyCommand cmd, Collection keys) { + return new ReadWriteManyCommand(cmd).withKeys(keys); + } + + @Override + public ReadWriteManyCommand copyForPrimary(ReadWriteManyCommand cmd, ConsistentHash ch, Set segments) { + return new ReadWriteManyCommand(cmd).withKeys(new ReadOnlySegmentAwareCollection(cmd.getAffectedKeys(), ch, segments)); + } + + @Override + public ReadWriteManyCommand copyForBackup(ReadWriteManyCommand cmd, ConsistentHash ch, Set segments) { + ReadWriteManyCommand copy = new ReadWriteManyCommand(cmd).withKeys( + new ReadOnlySegmentAwareCollection(cmd.getAffectedKeys(), ch, segments)); + copy.setForwarded(true); + return copy; + } + + @Override + public Collection getItems(ReadWriteManyCommand cmd) { + return cmd.getAffectedKeys(); + } + + @Override + public Object item2key(Object key) { + return key; + } + + @Override + public Collection newContainer() { + return new ArrayList<>(); + } + + @Override + public void accumulate(Collection list, Object key) { + list.add(key); + } + + @Override + public int containerSize(Collection list) { + return list.size(); + } + + @Override + public boolean shouldRegisterRemoteCallback(ReadWriteManyCommand cmd) { + return !cmd.isForwarded(); + } + + @Override + public Object transformResult(Object[] results) { + return results == null ? null : Arrays.asList(results); + } +} diff --git a/core/src/main/java/org/infinispan/interceptors/distribution/WriteManyCommandHelper.java b/core/src/main/java/org/infinispan/interceptors/distribution/WriteManyCommandHelper.java new file mode 100644 index 000000000000..32cf836a3aef --- /dev/null +++ b/core/src/main/java/org/infinispan/interceptors/distribution/WriteManyCommandHelper.java @@ -0,0 +1,37 @@ +package org.infinispan.interceptors.distribution; + +import java.util.Collection; +import java.util.Set; +import java.util.function.Function; + +import org.infinispan.commands.write.WriteCommand; +import org.infinispan.distribution.ch.ConsistentHash; +import org.infinispan.interceptors.InvocationSuccessFunction; + +abstract class WriteManyCommandHelper { + protected final InvocationSuccessFunction remoteCallback; + + protected WriteManyCommandHelper(Function, InvocationSuccessFunction> createRemoteCallback) { + this.remoteCallback = createRemoteCallback.apply(this); + } + + public abstract C copyForLocal(C cmd, Container container); + + public abstract C copyForPrimary(C cmd, ConsistentHash ch, Set segments); + + public abstract C copyForBackup(C cmd, ConsistentHash ch, Set segments); + + public abstract Collection getItems(C cmd); + + public abstract Object item2key(Item item); + + public abstract Container newContainer(); + + public abstract void accumulate(Container container, Item item); + + public abstract int containerSize(Container container); + + public abstract boolean shouldRegisterRemoteCallback(C cmd); + + public abstract Object transformResult(Object[] results); +} diff --git a/core/src/main/java/org/infinispan/interceptors/distribution/WriteOnlyManyEntriesHelper.java b/core/src/main/java/org/infinispan/interceptors/distribution/WriteOnlyManyEntriesHelper.java new file mode 100644 index 000000000000..01a528c726ec --- /dev/null +++ b/core/src/main/java/org/infinispan/interceptors/distribution/WriteOnlyManyEntriesHelper.java @@ -0,0 +1,73 @@ +package org.infinispan.interceptors.distribution; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; + +import org.infinispan.commands.functional.WriteOnlyManyEntriesCommand; +import org.infinispan.distribution.ch.ConsistentHash; +import org.infinispan.distribution.util.ReadOnlySegmentAwareMap; +import org.infinispan.interceptors.InvocationSuccessFunction; + +class WriteOnlyManyEntriesHelper extends WriteManyCommandHelper, Map.Entry> { + WriteOnlyManyEntriesHelper(Function, InvocationSuccessFunction> createRemoteCallback) { + super(createRemoteCallback); + } + + @Override + public WriteOnlyManyEntriesCommand copyForLocal(WriteOnlyManyEntriesCommand cmd, Map entries) { + return new WriteOnlyManyEntriesCommand(cmd).withEntries(entries); + } + + @Override + public WriteOnlyManyEntriesCommand copyForPrimary(WriteOnlyManyEntriesCommand cmd, ConsistentHash ch, Set segments) { + return new WriteOnlyManyEntriesCommand(cmd) + .withEntries(new ReadOnlySegmentAwareMap<>(cmd.getEntries(), ch, segments)); + } + + @Override + public WriteOnlyManyEntriesCommand copyForBackup(WriteOnlyManyEntriesCommand cmd, ConsistentHash ch, Set segments) { + WriteOnlyManyEntriesCommand copy = new WriteOnlyManyEntriesCommand(cmd) + .withEntries(new ReadOnlySegmentAwareMap(cmd.getEntries(), ch, segments)); + copy.setForwarded(true); + return copy; + } + + @Override + public Collection> getItems(WriteOnlyManyEntriesCommand cmd) { + return cmd.getEntries().entrySet(); + } + + @Override + public Object item2key(Map.Entry entry) { + return entry.getKey(); + } + + @Override + public Map newContainer() { + return new HashMap<>(); + } + + @Override + public void accumulate(Map map, Map.Entry entry) { + map.put(entry.getKey(), entry.getValue()); + } + + @Override + public int containerSize(Map map) { + return map.size(); + } + + @Override + public boolean shouldRegisterRemoteCallback(WriteOnlyManyEntriesCommand cmd) { + return !cmd.isForwarded(); + } + + @Override + public Object transformResult(Object[] results) { + return results == null ? null : Arrays.asList(results); + } +} diff --git a/core/src/main/java/org/infinispan/interceptors/distribution/WriteOnlyManyHelper.java b/core/src/main/java/org/infinispan/interceptors/distribution/WriteOnlyManyHelper.java new file mode 100644 index 000000000000..21b5b176915d --- /dev/null +++ b/core/src/main/java/org/infinispan/interceptors/distribution/WriteOnlyManyHelper.java @@ -0,0 +1,72 @@ +package org.infinispan.interceptors.distribution; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Set; +import java.util.function.Function; + +import org.infinispan.commands.functional.WriteOnlyManyCommand; +import org.infinispan.distribution.ch.ConsistentHash; +import org.infinispan.distribution.util.ReadOnlySegmentAwareCollection; +import org.infinispan.interceptors.InvocationSuccessFunction; + +class WriteOnlyManyHelper extends WriteManyCommandHelper, Object> { + WriteOnlyManyHelper(Function, InvocationSuccessFunction> createRemoteCallback) { + super(createRemoteCallback); + } + + @Override + public WriteOnlyManyCommand copyForLocal(WriteOnlyManyCommand cmd, Collection keys) { + return new WriteOnlyManyCommand(cmd).withKeys(keys); + } + + @Override + public WriteOnlyManyCommand copyForPrimary(WriteOnlyManyCommand cmd, ConsistentHash ch, Set segments) { + return new WriteOnlyManyCommand(cmd) + .withKeys(new ReadOnlySegmentAwareCollection(cmd.getAffectedKeys(), ch, segments)); + } + + @Override + public WriteOnlyManyCommand copyForBackup(WriteOnlyManyCommand cmd, ConsistentHash ch, Set segments) { + WriteOnlyManyCommand copy = new WriteOnlyManyCommand(cmd) + .withKeys(new ReadOnlySegmentAwareCollection(cmd.getAffectedKeys(), ch, segments)); + copy.setForwarded(true); + return copy; + } + + @Override + public Collection getItems(WriteOnlyManyCommand cmd) { + return cmd.getAffectedKeys(); + } + + @Override + public Object item2key(Object key) { + return key; + } + + @Override + public Collection newContainer() { + return new ArrayList<>(); + } + + @Override + public void accumulate(Collection list, Object key) { + list.add(key); + } + + @Override + public int containerSize(Collection list) { + return list.size(); + } + + @Override + public boolean shouldRegisterRemoteCallback(WriteOnlyManyCommand cmd) { + return !cmd.isForwarded(); + } + + @Override + public Object transformResult(Object[] results) { + return results == null ? null : Arrays.asList(results); + } +} From 8fc071a411bfa85a84bf62096530f195e7723f5d Mon Sep 17 00:00:00 2001 From: Radim Vansa Date: Fri, 21 Jul 2017 10:58:28 +0200 Subject: [PATCH 2/2] ISPN-8078 Support functional commands in scattered mode * Scattered cache now uses RepeatableReadEntries to support concurrent changes in multi-key operations --- .../ReadWriteManyEntriesCommand.java | 7 +- .../functional/WriteOnlyManyCommand.java | 3 +- .../WriteOnlyManyEntriesCommand.java | 10 +- .../commands/write/PutMapCommand.java | 7 +- .../commands/write/RemoveCommand.java | 3 + .../container/EntryFactoryImpl.java | 5 +- .../container/entries/MVCCEntry.java | 10 + .../container/entries/ReadCommittedEntry.java | 13 +- .../entries/RepeatableReadEntry.java | 12 + .../BaseDistributionInterceptor.java | 3 +- .../MergingCompletableFuture.java | 4 + .../distribution/PutMapHelper.java | 8 +- .../ReadWriteManyEntriesHelper.java | 10 +- .../distribution/ReadWriteManyHelper.java | 5 + .../ScatteredDistributionInterceptor.java | 953 +++++++++++------- .../distribution/WriteManyCommandHelper.java | 2 + .../WriteOnlyManyEntriesHelper.java | 10 +- .../distribution/WriteOnlyManyHelper.java | 5 + .../impl/EntryWrappingInterceptor.java | 19 +- .../RetryingEntryWrappingInterceptor.java | 19 +- .../distribution/BlockingInterceptor.java | 14 +- .../StateTransferOverwritingValueTest.java | 6 +- .../functional/AbstractFunctionalOpTest.java | 27 +- .../functional/AbstractFunctionalTest.java | 16 +- .../functional/FunctionalCachestoreTest.java | 4 +- .../functional/FunctionalInMemoryTest.java | 22 +- .../FunctionalScatteredInMemoryTest.java | 112 ++ .../functional/FunctionalTxInMemoryTest.java | 15 +- .../FunctionalWriteSkewInMemoryTest.java | 2 +- .../stress/ReadOnlyManyCommandStressTest.java | 2 +- .../scattered/ScatteredSyncFuncTest.java | 70 +- 31 files changed, 963 insertions(+), 435 deletions(-) create mode 100644 core/src/test/java/org/infinispan/functional/FunctionalScatteredInMemoryTest.java diff --git a/core/src/main/java/org/infinispan/commands/functional/ReadWriteManyEntriesCommand.java b/core/src/main/java/org/infinispan/commands/functional/ReadWriteManyEntriesCommand.java index 9b1be922cc13..2069f1b8d2e3 100644 --- a/core/src/main/java/org/infinispan/commands/functional/ReadWriteManyEntriesCommand.java +++ b/core/src/main/java/org/infinispan/commands/functional/ReadWriteManyEntriesCommand.java @@ -7,12 +7,14 @@ import java.io.ObjectOutput; import java.util.ArrayList; import java.util.Collection; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.function.BiFunction; import org.infinispan.commands.CommandInvocationId; import org.infinispan.commands.Visitor; +import org.infinispan.commons.marshall.MarshallUtil; import org.infinispan.functional.EntryView.ReadWriteEntryView; import org.infinispan.container.entries.CacheEntry; import org.infinispan.context.InvocationContext; @@ -67,7 +69,7 @@ public byte getCommandId() { @Override public void writeTo(ObjectOutput output) throws IOException { CommandInvocationId.writeTo(output, commandInvocationId); - output.writeObject(entries); + MarshallUtil.marshallMap(entries, output); output.writeObject(f); output.writeBoolean(isForwarded); Params.writeObject(output, params); @@ -78,7 +80,8 @@ public void writeTo(ObjectOutput output) throws IOException { @Override public void readFrom(ObjectInput input) throws IOException, ClassNotFoundException { commandInvocationId = CommandInvocationId.readFrom(input); - entries = (Map) input.readObject(); + // We use LinkedHashMap in order to guarantee the same order of iteration + entries = MarshallUtil.unmarshallMap(input, LinkedHashMap::new); f = (BiFunction, R>) input.readObject(); isForwarded = input.readBoolean(); params = Params.readObject(input); diff --git a/core/src/main/java/org/infinispan/commands/functional/WriteOnlyManyCommand.java b/core/src/main/java/org/infinispan/commands/functional/WriteOnlyManyCommand.java index d8f873543b0b..a5dbca90e4ab 100644 --- a/core/src/main/java/org/infinispan/commands/functional/WriteOnlyManyCommand.java +++ b/core/src/main/java/org/infinispan/commands/functional/WriteOnlyManyCommand.java @@ -93,7 +93,8 @@ public Object perform(InvocationContext ctx) throws Throwable { @Override public boolean isReturnValueExpected() { - return false; + // Scattered cache always needs some response. + return true; } @Override diff --git a/core/src/main/java/org/infinispan/commands/functional/WriteOnlyManyEntriesCommand.java b/core/src/main/java/org/infinispan/commands/functional/WriteOnlyManyEntriesCommand.java index f97513027325..7539c86b19f5 100644 --- a/core/src/main/java/org/infinispan/commands/functional/WriteOnlyManyEntriesCommand.java +++ b/core/src/main/java/org/infinispan/commands/functional/WriteOnlyManyEntriesCommand.java @@ -4,11 +4,13 @@ import java.io.ObjectInput; import java.io.ObjectOutput; import java.util.Collection; +import java.util.LinkedHashMap; import java.util.Map; import java.util.function.BiConsumer; import org.infinispan.commands.CommandInvocationId; import org.infinispan.commands.Visitor; +import org.infinispan.commons.marshall.MarshallUtil; import org.infinispan.functional.EntryView.WriteEntryView; import org.infinispan.container.entries.CacheEntry; import org.infinispan.context.InvocationContext; @@ -58,7 +60,7 @@ public byte getCommandId() { @Override public void writeTo(ObjectOutput output) throws IOException { CommandInvocationId.writeTo(output, commandInvocationId); - output.writeObject(entries); + MarshallUtil.marshallMap(entries, output); output.writeObject(f); output.writeBoolean(isForwarded); Params.writeObject(output, params); @@ -69,7 +71,8 @@ public void writeTo(ObjectOutput output) throws IOException { @Override public void readFrom(ObjectInput input) throws IOException, ClassNotFoundException { commandInvocationId = CommandInvocationId.readFrom(input); - entries = (Map) input.readObject(); + // We use LinkedHashMap in order to guarantee the same order of iteration + entries = MarshallUtil.unmarshallMap(input, LinkedHashMap::new); f = (BiConsumer>) input.readObject(); isForwarded = input.readBoolean(); params = Params.readObject(input); @@ -93,7 +96,8 @@ public Object perform(InvocationContext ctx) throws Throwable { @Override public boolean isReturnValueExpected() { - return false; + // Scattered cache always needs some response. + return true; } @Override diff --git a/core/src/main/java/org/infinispan/commands/write/PutMapCommand.java b/core/src/main/java/org/infinispan/commands/write/PutMapCommand.java index 2eefb8bc4423..fdf493070fa4 100644 --- a/core/src/main/java/org/infinispan/commands/write/PutMapCommand.java +++ b/core/src/main/java/org/infinispan/commands/write/PutMapCommand.java @@ -9,6 +9,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.Map; import java.util.Map.Entry; @@ -16,6 +17,7 @@ import org.infinispan.commands.CommandInvocationId; import org.infinispan.commands.MetadataAwareCommand; import org.infinispan.commands.Visitor; +import org.infinispan.commons.marshall.MarshallUtil; import org.infinispan.container.entries.MVCCEntry; import org.infinispan.context.InvocationContext; import org.infinispan.context.impl.FlagBitSets; @@ -157,7 +159,7 @@ public byte getCommandId() { @Override public void writeTo(ObjectOutput output) throws IOException { - output.writeObject(map); + MarshallUtil.marshallMap(map, output); output.writeObject(metadata); output.writeBoolean(isForwarded); output.writeLong(FlagBitSets.copyWithoutRemotableFlags(getFlagsBitSet())); @@ -166,8 +168,7 @@ public void writeTo(ObjectOutput output) throws IOException { @Override public void readFrom(ObjectInput input) throws IOException, ClassNotFoundException { - //noinspection unchecked - map = (Map) input.readObject(); + map = MarshallUtil.unmarshallMap(input, LinkedHashMap::new); metadata = (Metadata) input.readObject(); isForwarded = input.readBoolean(); setFlagsBitSet(input.readLong()); diff --git a/core/src/main/java/org/infinispan/commands/write/RemoveCommand.java b/core/src/main/java/org/infinispan/commands/write/RemoveCommand.java index 402e81b75fd5..f4c2a3c4beeb 100644 --- a/core/src/main/java/org/infinispan/commands/write/RemoveCommand.java +++ b/core/src/main/java/org/infinispan/commands/write/RemoveCommand.java @@ -235,6 +235,9 @@ protected Object performRemove(MVCCEntry e, Object prevValue, InvocationContext e.setValid(false); e.setChanged(true); e.setValue(null); + if (metadata != null) { + e.setMetadata(metadata); + } if (valueMatcher != ValueMatcher.MATCH_EXPECTED_OR_NEW) { return isConditional() ? true : prevValue; diff --git a/core/src/main/java/org/infinispan/container/EntryFactoryImpl.java b/core/src/main/java/org/infinispan/container/EntryFactoryImpl.java index f3db61c95e6c..efcb2e15f85f 100644 --- a/core/src/main/java/org/infinispan/container/EntryFactoryImpl.java +++ b/core/src/main/java/org/infinispan/container/EntryFactoryImpl.java @@ -52,8 +52,11 @@ public void injectDependencies(DataContainer dataContainer, Configuration config @Start (priority = 8) public void init() { + // Scattered mode needs repeatable-read entries to properly retry half-committed multi-key operations + // (see RetryingEntryWrappingInterceptor for details). useRepeatableRead = configuration.transaction().transactionMode().isTransactional() - && configuration.locking().isolationLevel() == IsolationLevel.REPEATABLE_READ; + && configuration.locking().isolationLevel() == IsolationLevel.REPEATABLE_READ + || configuration.clustering().cacheMode().isScattered(); isL1Enabled = configuration.clustering().l1().enabled(); // Write-skew check implies isolation level = REPEATABLE_READ && locking mode = OPTIMISTIC useVersioning = Configurations.isTxVersioned(configuration); diff --git a/core/src/main/java/org/infinispan/container/entries/MVCCEntry.java b/core/src/main/java/org/infinispan/container/entries/MVCCEntry.java index 34d76ff84c67..d8274df8b15e 100644 --- a/core/src/main/java/org/infinispan/container/entries/MVCCEntry.java +++ b/core/src/main/java/org/infinispan/container/entries/MVCCEntry.java @@ -67,4 +67,14 @@ default void setRead() {} default boolean isRead() { return false; } + + /** + * Mark this context-entry as already committed to the {@link DataContainer}. + */ + default void setCommitted() {} + + /** + * @return True if this context entry has been committed to the {@link DataContainer} + */ + default boolean isCommitted() { return false; } } diff --git a/core/src/main/java/org/infinispan/container/entries/ReadCommittedEntry.java b/core/src/main/java/org/infinispan/container/entries/ReadCommittedEntry.java index 56080839981c..92e08dd27ac2 100644 --- a/core/src/main/java/org/infinispan/container/entries/ReadCommittedEntry.java +++ b/core/src/main/java/org/infinispan/container/entries/ReadCommittedEntry.java @@ -2,6 +2,7 @@ import static org.infinispan.commons.util.Util.toStr; import static org.infinispan.container.entries.ReadCommittedEntry.Flags.CHANGED; +import static org.infinispan.container.entries.ReadCommittedEntry.Flags.COMMITTED; import static org.infinispan.container.entries.ReadCommittedEntry.Flags.CREATED; import static org.infinispan.container.entries.ReadCommittedEntry.Flags.EVICTED; import static org.infinispan.container.entries.ReadCommittedEntry.Flags.EXPIRED; @@ -44,7 +45,7 @@ protected enum Flags { CHANGED(1), CREATED(1 << 1), REMOVED(1 << 2), - // 1 << 3 no longer used (was: VALID) + COMMITTED(1 << 3), EVICTED(1 << 4), EXPIRED(1 << 5), SKIP_LOOKUP(1 << 6), @@ -208,6 +209,16 @@ public boolean isExpired() { return isFlagSet(EXPIRED); } + @Override + public void setCommitted() { + setFlag(COMMITTED); + } + + @Override + public boolean isCommitted() { + return isFlagSet(COMMITTED); + } + @Override public void resetCurrentValue() { // noop, the entry is removed from context diff --git a/core/src/main/java/org/infinispan/container/entries/RepeatableReadEntry.java b/core/src/main/java/org/infinispan/container/entries/RepeatableReadEntry.java index 36e77ead47a5..72344286c9f3 100644 --- a/core/src/main/java/org/infinispan/container/entries/RepeatableReadEntry.java +++ b/core/src/main/java/org/infinispan/container/entries/RepeatableReadEntry.java @@ -18,10 +18,12 @@ public class RepeatableReadEntry extends ReadCommittedEntry { /* Value before the last modification. Serves as the previous value when the operation is retried */ protected Object oldValue; + protected Metadata oldMetadata; public RepeatableReadEntry(Object key, Object value, Metadata metadata) { super(key, value, metadata); this.oldValue = value; + this.oldMetadata = metadata; } @Override @@ -49,11 +51,21 @@ public final Object setValue(Object value) { @Override public void resetCurrentValue() { value = oldValue; + metadata = oldMetadata; } @Override public void updatePreviousValue() { oldValue = value; + oldMetadata = metadata; + } + + public Object getOldValue() { + return oldValue; + } + + public Metadata getOldMetadata() { + return oldMetadata; } public void setRead() { diff --git a/core/src/main/java/org/infinispan/interceptors/distribution/BaseDistributionInterceptor.java b/core/src/main/java/org/infinispan/interceptors/distribution/BaseDistributionInterceptor.java index 60a2e4438145..ccd17dcb441d 100644 --- a/core/src/main/java/org/infinispan/interceptors/distribution/BaseDistributionInterceptor.java +++ b/core/src/main/java/org/infinispan/interceptors/distribution/BaseDistributionInterceptor.java @@ -11,8 +11,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; -import java.util.function.Supplier; -import java.util.stream.*; +import java.util.stream.Stream; import org.infinispan.commands.FlagAffectedCommand; import org.infinispan.commands.ReplicableCommand; diff --git a/core/src/main/java/org/infinispan/interceptors/distribution/MergingCompletableFuture.java b/core/src/main/java/org/infinispan/interceptors/distribution/MergingCompletableFuture.java index 808faaa49456..00caaff17244 100644 --- a/core/src/main/java/org/infinispan/interceptors/distribution/MergingCompletableFuture.java +++ b/core/src/main/java/org/infinispan/interceptors/distribution/MergingCompletableFuture.java @@ -1,5 +1,6 @@ package org.infinispan.interceptors.distribution; +import java.lang.reflect.Array; import java.util.Collection; import java.util.Iterator; import java.util.Map; @@ -33,6 +34,9 @@ static void moveListItemsToFuture(Object rv, MergingCompletableFuture f, items = ((Map) rv).entrySet(); } else if (rv instanceof Collection) { items = (Collection) rv; + } else if (rv != null && rv.getClass().isArray() && !rv.getClass().getComponentType().isPrimitive()) { + System.arraycopy(rv, 0, f.results, myOffset, Array.getLength(rv)); + return; } else { f.completeExceptionally(new IllegalArgumentException("Unexpected result value " + rv)); return; diff --git a/core/src/main/java/org/infinispan/interceptors/distribution/PutMapHelper.java b/core/src/main/java/org/infinispan/interceptors/distribution/PutMapHelper.java index 27e45f599134..0d3030874909 100644 --- a/core/src/main/java/org/infinispan/interceptors/distribution/PutMapHelper.java +++ b/core/src/main/java/org/infinispan/interceptors/distribution/PutMapHelper.java @@ -2,6 +2,7 @@ import java.util.Collection; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; import java.util.function.Function; @@ -45,7 +46,7 @@ public Object item2key(Map.Entry entry) { @Override public Map newContainer() { - return new HashMap<>(); + return new LinkedHashMap<>(); } @Override @@ -58,6 +59,11 @@ public int containerSize(Map map) { return map.size(); } + @Override + public Iterable toKeys(Map map) { + return map.keySet(); + } + @Override public boolean shouldRegisterRemoteCallback(PutMapCommand cmd) { return !cmd.isForwarded(); diff --git a/core/src/main/java/org/infinispan/interceptors/distribution/ReadWriteManyEntriesHelper.java b/core/src/main/java/org/infinispan/interceptors/distribution/ReadWriteManyEntriesHelper.java index 3bff44e7bcb3..83697a45a8a7 100644 --- a/core/src/main/java/org/infinispan/interceptors/distribution/ReadWriteManyEntriesHelper.java +++ b/core/src/main/java/org/infinispan/interceptors/distribution/ReadWriteManyEntriesHelper.java @@ -2,7 +2,7 @@ import java.util.Arrays; import java.util.Collection; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; import java.util.function.Function; @@ -48,7 +48,8 @@ public Object item2key(Map.Entry entry) { @Override public Map newContainer() { - return new HashMap<>(); + // Make sure the iteration in containers is ordered + return new LinkedHashMap<>(); } @Override @@ -61,6 +62,11 @@ public int containerSize(Map map) { return map.size(); } + @Override + public Iterable toKeys(Map map) { + return map.keySet(); + } + @Override public boolean shouldRegisterRemoteCallback(ReadWriteManyEntriesCommand cmd) { return !cmd.isForwarded(); diff --git a/core/src/main/java/org/infinispan/interceptors/distribution/ReadWriteManyHelper.java b/core/src/main/java/org/infinispan/interceptors/distribution/ReadWriteManyHelper.java index 5aa758c81434..e52b66776d53 100644 --- a/core/src/main/java/org/infinispan/interceptors/distribution/ReadWriteManyHelper.java +++ b/core/src/main/java/org/infinispan/interceptors/distribution/ReadWriteManyHelper.java @@ -59,6 +59,11 @@ public int containerSize(Collection list) { return list.size(); } + @Override + public Iterable toKeys(Collection list) { + return list; + } + @Override public boolean shouldRegisterRemoteCallback(ReadWriteManyCommand cmd) { return !cmd.isForwarded(); diff --git a/core/src/main/java/org/infinispan/interceptors/distribution/ScatteredDistributionInterceptor.java b/core/src/main/java/org/infinispan/interceptors/distribution/ScatteredDistributionInterceptor.java index 1acb6b73b965..e06186dccaf4 100644 --- a/core/src/main/java/org/infinispan/interceptors/distribution/ScatteredDistributionInterceptor.java +++ b/core/src/main/java/org/infinispan/interceptors/distribution/ScatteredDistributionInterceptor.java @@ -1,5 +1,7 @@ package org.infinispan.interceptors.distribution; +import static org.infinispan.commands.VisitableCommand.LoadType.DONT_LOAD; + import org.infinispan.commands.AbstractVisitor; import org.infinispan.commands.FlagAffectedCommand; import org.infinispan.commands.MetadataAwareCommand; @@ -22,7 +24,6 @@ import org.infinispan.commands.remote.ClusteredGetAllCommand; import org.infinispan.commands.remote.ClusteredGetCommand; import org.infinispan.commands.remote.GetKeysInGroupCommand; -import org.infinispan.commands.write.ApplyDeltaCommand; import org.infinispan.commands.write.ClearCommand; import org.infinispan.commands.write.ComputeCommand; import org.infinispan.commands.write.ComputeIfAbsentCommand; @@ -32,14 +33,16 @@ import org.infinispan.commands.write.RemoveCommand; import org.infinispan.commands.write.ReplaceCommand; import org.infinispan.commands.write.ValueMatcher; +import org.infinispan.commands.write.WriteCommand; import org.infinispan.commons.CacheException; -import org.infinispan.commons.util.ByRef; +import org.infinispan.commons.util.ArrayCollector; import org.infinispan.container.entries.CacheEntry; import org.infinispan.container.entries.InternalCacheEntry; import org.infinispan.container.entries.InternalCacheValue; -import org.infinispan.container.entries.MVCCEntry; import org.infinispan.container.entries.NullCacheEntry; import org.infinispan.container.entries.RemoteMetadata; +import org.infinispan.container.entries.RepeatableReadEntry; +import org.infinispan.container.entries.metadata.MetadataImmortalCacheEntry; import org.infinispan.container.entries.metadata.MetadataImmortalCacheValue; import org.infinispan.container.versioning.EntryVersion; import org.infinispan.container.versioning.InequalVersionComparisonResult; @@ -53,6 +56,7 @@ import org.infinispan.distribution.group.impl.GroupManager; import org.infinispan.factories.annotations.Inject; import org.infinispan.functional.impl.FunctionalNotifier; +import org.infinispan.interceptors.InvocationFinallyAction; import org.infinispan.interceptors.InvocationSuccessAction; import org.infinispan.interceptors.InvocationSuccessFunction; import org.infinispan.interceptors.impl.ClusteringInterceptor; @@ -77,11 +81,15 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import java.util.stream.Stream; /** * This interceptor mixes several functions: @@ -110,19 +118,10 @@ public class ScatteredDistributionInterceptor extends ClusteringInterceptor { private volatile Address cachedNextMember; private volatile int cachedNextMemberTopology = -1; - private final InvocationSuccessAction dataWriteCommandNoReadHandler = (rCtx, rCommand, rv) -> { - DataWriteCommand dataWriteCommand = (DataWriteCommand) rCommand; - CacheEntry entry = rCtx.lookupEntry(dataWriteCommand.getKey()); - boolean committed = commitSingleEntryIfNewer(entry, rCtx, dataWriteCommand); - if (committed && rCtx.isOriginLocal() && !dataWriteCommand.hasAnyFlag(FlagBitSets.PUT_FOR_STATE_TRANSFER)) { - svm.scheduleKeyInvalidation(dataWriteCommand.getKey(), entry.getMetadata().version(), entry.isRemoved()); - } - }; - private final InvocationSuccessAction putMapCommandHandler = (rCtx, rCommand, rv) -> { PutMapCommand putMapCommand = (PutMapCommand) rCommand; for (Object key : putMapCommand.getAffectedKeys()) { - commitSingleEntryIfNewer(rCtx.lookupEntry(key), rCtx, rCommand); + commitSingleEntryIfNewer((RepeatableReadEntry) rCtx.lookupEntry(key), rCtx, rCommand); // this handler is called only for ST or when isOriginLocal() == false so we don't have to invalidate } }; @@ -135,6 +134,14 @@ public class ScatteredDistributionInterceptor extends ClusteringInterceptor { } }; private InvocationSuccessFunction handleWritePrimaryResponse = this::handleWritePrimaryResponse; + private InvocationSuccessFunction handleWriteManyOnPrimary = this::handleWriteManyOnPrimary; + + private PutMapHelper putMapHelper = new PutMapHelper(helper -> null); + private ReadWriteManyHelper readWriteManyHelper = new ReadWriteManyHelper(helper -> null); + private ReadWriteManyEntriesHelper readWriteManyEntriesHelper = new ReadWriteManyEntriesHelper(helper -> null); + private WriteOnlyManyHelper writeOnlyManyHelper = new WriteOnlyManyHelper(helper -> null); + private WriteOnlyManyEntriesHelper writeOnlyManyEntriesHelper = new WriteOnlyManyEntriesHelper(helper -> null); + @Inject public void injectDependencies(GroupManager groupManager, ScatteredVersionManager svm, TimeService timeService, @@ -149,8 +156,8 @@ public void injectDependencies(GroupManager groupManager, ScatteredVersionManage this.distributionManager = distributionManager; } - private Object handleWriteCommand(InvocationContext ctx, T command) throws Throwable { - CacheEntry cacheEntry = ctx.lookupEntry(command.getKey()); + private Object handleWriteCommand(InvocationContext ctx, DataWriteCommand command) throws Throwable { + RepeatableReadEntry cacheEntry = (RepeatableReadEntry) ctx.lookupEntry(command.getKey()); EntryVersion seenVersion = getVersionOrNull(cacheEntry); LocalizedCacheTopology cacheTopology = checkTopology(command); @@ -160,32 +167,32 @@ private Object handleWriteCo } if (isLocalModeForced(command)) { - CacheEntry contextEntry = cacheEntry; + RepeatableReadEntry contextEntry = cacheEntry; if (cacheEntry == null) { entryFactory.wrapExternalEntry(ctx, command.getKey(), null, false, true); - contextEntry = ctx.lookupEntry(command.getKey()); + contextEntry = (RepeatableReadEntry) ctx.lookupEntry(command.getKey()); } + EntryVersion nextVersion = null; if (command.hasAnyFlag(FlagBitSets.PUT_FOR_STATE_TRANSFER)) { - contextEntry.setMetadata(command.getMetadata()); + // we don't increment versions with state transfer } else if (info.isPrimary()) { - if (cacheTopology.getTopologyId() == 0) { - // this is the singleton topology used for preload - contextEntry.setMetadata(command.getMetadata()); - svm.updatePreloadedEntryVersion(command.getMetadata().version()); + if (cacheTopology.getTopologyId() == 0 && command instanceof MetadataAwareCommand) { + // Preload does not use functional commands which are not metadata-aware + Metadata metadata = ((MetadataAwareCommand) command).getMetadata(); + svm.updatePreloadedEntryVersion(metadata.version()); } else { // let's allow local-mode writes on primary owner, preserving versions - EntryVersion nextVersion = svm.incrementVersion(info.segmentId()); - contextEntry.setMetadata(addVersion(command.getMetadata(), nextVersion)); + nextVersion = svm.incrementVersion(info.segmentId()); } } - return commitSingleEntryOnReturn(ctx, command, contextEntry, contextEntry.getValue(), seenVersion); + return commitSingleEntryOnReturn(ctx, command, contextEntry, nextVersion); } if (ctx.isOriginLocal()) { if (info.isPrimary()) { Object seenValue = cacheEntry.getValue(); return invokeNextThenApply(ctx, command, (rCtx, rCommand, rv) -> - handleWriteOnOriginPrimary(rCtx, (T) rCommand, rv, cacheEntry, seenValue, seenVersion, cacheTopology, info)); + handleWriteOnOriginPrimary(rCtx, (DataWriteCommand) rCommand, rv, cacheEntry, seenValue, seenVersion, cacheTopology, info)); } else { // not primary owner CompletableFuture> rpcFuture = rpcManager.invokeRemotelyAsync(info.writeOwners(), command, defaultSyncOptions); @@ -193,45 +200,36 @@ private Object handleWriteCo } } else { // remote origin if (info.isPrimary()) { - Object seenValue = cacheEntry.getValue(); // TODO [ISPN-3918]: the previous value is unreliable as this could be second invocation return invokeNextThenApply(ctx, command, (rCtx, rCommand, rv) -> { - T cmd = (T) rCommand; + DataWriteCommand cmd = (DataWriteCommand) rCommand; if (!cmd.isSuccessful()) { if (trace) log.tracef("Skipping the replication of the command as it did not succeed on primary owner (%s).", cmd); return rv; } EntryVersion nextVersion = svm.incrementVersion(info.segmentId()); - Metadata metadata = addVersion(cmd.getMetadata(), nextVersion); + Metadata metadata = addVersion(cacheEntry.getMetadata(), nextVersion); cacheEntry.setMetadata(metadata); - cmd.setMetadata(metadata); - if (cmd.loadType() != VisitableCommand.LoadType.DONT_LOAD) { - commitSingleEntryIfNoChange(seenValue, seenVersion, cacheEntry, rCtx, cmd); + if (cmd.loadType() != DONT_LOAD) { + commitSingleEntryIfNoChange(cacheEntry, rCtx, cmd); } else { commitSingleEntryIfNewer(cacheEntry, rCtx, cmd); } - if (cmd.isReturnValueExpected()) { - return new MetadataImmortalCacheValue(rv, metadata); - } else { - // force return value to be sent in the response (the version) - command.setFlagsBitSet(command.getFlagsBitSet() & ~(FlagBitSets.IGNORE_RETURN_VALUES | FlagBitSets.SKIP_REMOTE_LOOKUP)); - return metadata.version(); - } + return cmd.acceptVisitor(ctx, new PrimaryResponseGenerator(cacheEntry, rv)); }); } else { // The origin is primary and we're merely backup saving the data assert cacheEntry == null || command.hasAnyFlag(FlagBitSets.SKIP_OWNERSHIP_CHECK); - CacheEntry contextEntry; + RepeatableReadEntry contextEntry; if (cacheEntry == null) { entryFactory.wrapExternalEntry(ctx, command.getKey(), null, false, true); - contextEntry = ctx.lookupEntry(command.getKey()); + contextEntry = (RepeatableReadEntry) ctx.lookupEntry(command.getKey()); } else { contextEntry = cacheEntry; } - contextEntry.setMetadata(command.getMetadata()); return invokeNextThenApply(ctx, command, (rCtx, rCommand, rv) -> { commitSingleEntryIfNewer(contextEntry, rCtx, rCommand); return null; @@ -240,9 +238,9 @@ private Object handleWriteCo } } - private Object handleWriteOnOriginPrimary(InvocationContext ctx, T command, Object rv, - CacheEntry cacheEntry, Object seenValue, EntryVersion seenVersion, - CacheTopology cacheTopology, DistributionInfo info) { + private Object handleWriteOnOriginPrimary(InvocationContext ctx, DataWriteCommand command, Object rv, + RepeatableReadEntry cacheEntry, Object seenValue, EntryVersion seenVersion, + CacheTopology cacheTopology, DistributionInfo info) { if (!command.isSuccessful()) { if (trace) log.tracef("Skipping the replication of the command as it did not succeed on primary owner (%s).", command); @@ -251,30 +249,33 @@ private Object handleWriteOn // increment the version EntryVersion nextVersion = svm.incrementVersion(info.segmentId()); - Metadata metadata = addVersion(command.getMetadata(), nextVersion); + Metadata metadata = addVersion(cacheEntry.getMetadata(), nextVersion); cacheEntry.setMetadata(metadata); - command.setMetadata(metadata); - boolean committed; - if (command.loadType() != VisitableCommand.LoadType.DONT_LOAD) { - committed = commitSingleEntryIfNoChange(seenValue, seenVersion, cacheEntry, ctx, command); + if (command.loadType() != DONT_LOAD) { + commitSingleEntryIfNoChange(cacheEntry, ctx, command); } else { - committed = commitSingleEntryIfNewer(cacheEntry, ctx, command); + commitSingleEntryIfNewer(cacheEntry, ctx, command); } - command.setValueMatcher(ValueMatcher.MATCH_ALWAYS); // When replicating to backup, we'll add skip ownership check since we're now on primary owner // and we have already committed the entry, reading the return value. If we got OTE from remote // site and the command would be retried, we could fail to do the retry/return wrong value. - // TODO: maybe we should rather create a copy of the command with modifications... - command.addFlags(FlagBitSets.SKIP_OWNERSHIP_CHECK); + WriteCommand backupCommand; + long flags = command.getFlagsBitSet() | FlagBitSets.SKIP_OWNERSHIP_CHECK; + if (cacheEntry.isRemoved()) { + backupCommand = cf.buildRemoveCommand(command.getKey(), null, flags); + ((RemoveCommand) backupCommand).setMetadata(cacheEntry.getMetadata()); + } else { + backupCommand = cf.buildPutKeyValueCommand(command.getKey(), cacheEntry.getValue(), cacheEntry.getMetadata(), flags); + } Address backup = getNextMember(cacheTopology); if (backup != null) { // error responses throw exceptions from JGroupsTransport CompletableFuture> rpcFuture = - rpcManager.invokeRemotelyAsync(Collections.singletonList(backup), command, defaultSyncOptions); + rpcManager.invokeRemotelyAsync(Collections.singletonList(backup), backupCommand, defaultSyncOptions); rpcFuture.thenRun(() -> { - if (committed && !command.hasAnyFlag(FlagBitSets.PUT_FOR_STATE_TRANSFER)) { + if (cacheEntry.isCommitted() && !command.hasAnyFlag(FlagBitSets.PUT_FOR_STATE_TRANSFER)) { svm.scheduleKeyInvalidation(command.getKey(), cacheEntry.getMetadata().version(), cacheEntry.isRemoved()); } }); @@ -315,39 +316,33 @@ private LocalizedCache return cacheTopology; } - private Object commitSingleEntryOnReturn(InvocationContext ctx, DataWriteCommand command, CacheEntry cacheEntry, Object prevValue, EntryVersion prevVersion) { - if (command.loadType() != VisitableCommand.LoadType.DONT_LOAD) { - return invokeNextThenAccept(ctx, command, (rCtx, rCommand, rv) -> { - DataWriteCommand dataWriteCommand = (DataWriteCommand) rCommand; - boolean committed = commitSingleEntryIfNoChange(prevValue, prevVersion, cacheEntry, rCtx, rCommand); - if (committed && rCtx.isOriginLocal() && !dataWriteCommand.hasAnyFlag(FlagBitSets.PUT_FOR_STATE_TRANSFER)) { - svm.scheduleKeyInvalidation(dataWriteCommand.getKey(), cacheEntry.getMetadata().version(), cacheEntry.isRemoved()); - } - }); - } else { - return invokeNextThenAccept(ctx, command, dataWriteCommandNoReadHandler); - } + private Object commitSingleEntryOnReturn(InvocationContext ctx, DataWriteCommand command, RepeatableReadEntry cacheEntry, + EntryVersion nextVersion) { + return invokeNextThenAccept(ctx, command, (rCtx, rCommand, rv) -> { + DataWriteCommand dataWriteCommand = (DataWriteCommand) rCommand; + if (nextVersion != null) { + cacheEntry.setMetadata(addVersion(cacheEntry.getMetadata(), nextVersion)); + } + if (command.loadType() != DONT_LOAD) { + commitSingleEntryIfNoChange(cacheEntry, rCtx, rCommand); + } else { + commitSingleEntryIfNewer(cacheEntry, rCtx, dataWriteCommand); + } + if (cacheEntry.isCommitted() && rCtx.isOriginLocal() && nextVersion != null) { + svm.scheduleKeyInvalidation(dataWriteCommand.getKey(), nextVersion, cacheEntry.isRemoved()); + } + }); } - private boolean commitSingleEntryIfNewer(CacheEntry entry, InvocationContext ctx, VisitableCommand command) { + private void commitSingleEntryIfNewer(RepeatableReadEntry entry, InvocationContext ctx, VisitableCommand command) { if (!entry.isChanged()) { if (trace) { log.tracef("Entry has not changed, not committing"); - return false; } } - // ignore metadata argument and use the one from entry, as e.g. PutMapCommand passes its metadata - // here and we need own metadata for each entry. - // RemoveCommand does not null the entry value - if (entry.isRemoved()) { - entry.setValue(null); - } // We cannot delegate the dataContainer.compute() to entry.commit() as we need to reliably // retrieve previous value and metadata, but the entry API does not provide these. - ByRef previousValue = new ByRef<>(null); - ByRef previousMetadata = new ByRef<>(null); - ByRef.Boolean successful = new ByRef.Boolean(false); dataContainer.compute(entry.getKey(), (key, oldEntry, factory) -> { // newMetadata is null in case of local-mode write Metadata newMetadata = entry.getMetadata(); @@ -361,7 +356,7 @@ private boolean commitSingleEntryIfNewer(CacheEntry entry, InvocationContext ctx if (trace) { log.trace("Committing new entry " + entry); } - successful.set(true); + entry.setCommitted(); return factory.create(entry); } } @@ -370,12 +365,10 @@ private boolean commitSingleEntryIfNewer(CacheEntry entry, InvocationContext ctx if (oldMetadata == null || oldMetadata.version() == null || newMetadata == null || newMetadata.version() == null || (comparisonResult = oldMetadata.version().compareTo(newMetadata.version())) == InequalVersionComparisonResult.BEFORE || (oldMetadata instanceof RemoteMetadata && comparisonResult == InequalVersionComparisonResult.EQUAL)) { - previousValue.set(oldEntry.getValue()); - previousValue.set(oldMetadata); if (trace) { log.tracef("Committing entry %s, replaced %s", entry, oldEntry); } - successful.set(true); + entry.setCommitted(); if (entry.getValue() != null || newMetadata != null) { return factory.create(entry); } else { @@ -389,32 +382,18 @@ private boolean commitSingleEntryIfNewer(CacheEntry entry, InvocationContext ctx } }); - boolean created = entry.isCreated(); - boolean removed = entry.isRemoved(); - boolean expired = false; - if (removed && entry instanceof MVCCEntry) { - expired = ((MVCCEntry) entry).isExpired(); - } - - if (successful.get()) { - NotifyHelper.entryCommitted(cacheNotifier, functionalNotifier, created, removed, expired, - entry, ctx, (FlagAffectedCommand) command, previousValue.get(), previousMetadata.get()); - return true; - } else { - // we skip the notification, and the already executed notification skipped this (intermediate) update - return false; - } + if (entry.isCommitted()) { + NotifyHelper.entryCommitted(cacheNotifier, functionalNotifier, entry.isCreated(), entry.isRemoved(), entry.isExpired(), + entry, ctx, (FlagAffectedCommand) command, entry.getOldValue(), entry.getOldMetadata()); + } // else we skip the notification, and the already executed notification skipped this (intermediate) update } - private boolean commitSingleEntryIfNoChange(Object seenValue, EntryVersion seenVersion, CacheEntry entry, InvocationContext ctx, VisitableCommand command) { + private void commitSingleEntryIfNoChange(RepeatableReadEntry entry, InvocationContext ctx, VisitableCommand command) { if (!entry.isChanged()) { if (trace) { log.tracef("Entry has not changed, not committing"); - return false; } } - // ignore metadata argument and use the one from entry, as e.g. PutMapCommand passes its metadata - // here and we need own metadata for each entry. // RemoveCommand does not null the entry value if (entry.isRemoved()) { entry.setValue(null); @@ -422,14 +401,11 @@ private boolean commitSingleEntryIfNoChange(Object seenValue, EntryVersion seenV // We cannot delegate the dataContainer.compute() to entry.commit() as we need to reliably // retrieve previous value and metadata, but the entry API does not provide these. - ByRef previousValue = new ByRef<>(null); - ByRef previousMetadata = new ByRef<>(null); - ByRef.Boolean successful = new ByRef.Boolean(false); dataContainer.compute(entry.getKey(), (key, oldEntry, factory) -> { // newMetadata is null in case of local-mode write on non-primary owners Metadata newMetadata = entry.getMetadata(); if (oldEntry == null) { - if (seenValue != null) { + if (entry.getOldValue() != null) { if (trace) { log.trace("Non-null value in context, not committing"); } @@ -444,12 +420,14 @@ private boolean commitSingleEntryIfNoChange(Object seenValue, EntryVersion seenV if (trace) { log.trace("Committing new entry " + entry); } - successful.set(true); + entry.setCommitted(); return factory.create(entry); } } Metadata oldMetadata = oldEntry.getMetadata(); EntryVersion oldVersion = oldMetadata == null ? null : oldMetadata.version(); + Metadata seenMetadata = entry.getOldMetadata(); + EntryVersion seenVersion = seenMetadata == null ? null : seenMetadata.version(); if (oldVersion == null) { if (seenVersion != null) { if (trace) { @@ -478,12 +456,10 @@ private boolean commitSingleEntryIfNoChange(Object seenValue, EntryVersion seenV if (oldVersion == null || newMetadata == null || newMetadata.version() == null || (comparisonResult = oldMetadata.version().compareTo(newMetadata.version())) == InequalVersionComparisonResult.BEFORE || (oldMetadata instanceof RemoteMetadata && comparisonResult == InequalVersionComparisonResult.EQUAL)) { - previousValue.set(oldEntry.getValue()); - previousValue.set(oldMetadata); if (trace) { log.tracef("Committing entry %s, replaced %s", entry, oldEntry); } - successful.set(true); + entry.setCommitted(); if (entry.getValue() != null || newMetadata != null) { return factory.create(entry); } else { @@ -497,21 +473,10 @@ private boolean commitSingleEntryIfNoChange(Object seenValue, EntryVersion seenV } }); - boolean created = entry.isCreated(); - boolean removed = entry.isRemoved(); - boolean expired = false; - if (removed && entry instanceof MVCCEntry) { - expired = ((MVCCEntry) entry).isExpired(); - } - - if (successful.get()) { - NotifyHelper.entryCommitted(cacheNotifier, functionalNotifier, created, removed, expired, - entry, ctx, (FlagAffectedCommand) command, previousValue.get(), previousMetadata.get()); - return true; - } else { - // we skip the notification, and the already executed notification skipped this (intermediate) update - return false; - } + if (entry.isCommitted()) { + NotifyHelper.entryCommitted(cacheNotifier, functionalNotifier, entry.isCreated(), entry.isRemoved(), entry.isExpired(), + entry, ctx, (FlagAffectedCommand) command, entry.getOldValue(), entry.getOldMetadata()); + } // else we skip the notification, and the already executed notification skipped this (intermediate) update } private EntryVersion getVersionOrNull(CacheEntry cacheEntry) { @@ -642,197 +607,31 @@ public Object visitComputeCommand(InvocationContext ctx, ComputeCommand command) @Override public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable { - LocalizedCacheTopology cacheTopology = checkTopology(command); - - Map originalMap = command.getMap(); - if (command.hasAnyFlag(FlagBitSets.PUT_FOR_STATE_TRANSFER)) { - extractAndSetMetadata(ctx, command, originalMap); - return invokeNextThenAccept(ctx, command, putMapCommandHandler); - } + if (command.isForwarded() || command.hasAnyFlag(FlagBitSets.PUT_FOR_STATE_TRANSFER)) { + assert command.getMetadata() == null || command.getMetadata().version() == null; - if (ctx.isOriginLocal()) { - return invokeNextThenApply(ctx, command, (returnCtx, returnCommand, rv) -> - handlePutMapOnOrigin(returnCtx, (PutMapCommand) returnCommand, rv, originalMap, cacheTopology)); - } - - // Remote - if (command.isForwarded()) { - // carries entries with version to back them up - extractAndSetMetadata(ctx, command, originalMap); - return invokeNextThenAccept(ctx, command, putMapCommandHandler); - } else { - // this node should be the primary - Map versionMap = new HashMap<>(originalMap.size()); - for (Map.Entry entry : originalMap.entrySet()) { + Map valueMap = new HashMap<>(command.getMap().size()); + for (Map.Entry entry : command.getMap().entrySet()) { Object key = entry.getKey(); CacheEntry cacheEntry = ctx.lookupEntry(key); if (cacheEntry == null) { - throw new IllegalStateException("Not wrapped " + key); + // since this is executed on backup node (or by ST), the entry was not wrapped + entryFactory.wrapExternalEntry(ctx, key, null, false, true); + cacheEntry = ctx.lookupEntry(key); } - EntryVersion version = svm.incrementVersion(keyPartitioner.getSegment(key)); - cacheEntry.setMetadata(addVersion(command.getMetadata(), version)); - versionMap.put(key, new VersionedResult(null, version)); + // TODO: we should set version only after the command has executed but as it won't modify version + // on its own, we can do it right here + InternalCacheValue value = (InternalCacheValue) entry.getValue(); + Metadata entryMetadata = command.getMetadata() == null ? value.getMetadata() + : command.getMetadata().builder().version(value.getMetadata().version()).build(); + cacheEntry.setMetadata(entryMetadata); + valueMap.put(key, value.getValue()); } - // disable ignore return values as this controls isReturnValueExpected with versionMap - command.setFlagsBitSet(command.getFlagsBitSet() & ~FlagBitSets.IGNORE_RETURN_VALUES); - return invokeNextThenApply(ctx, command, (ctx1, command1, rv) -> { - for (Object key : ((PutMapCommand) command1).getAffectedKeys()) { - commitSingleEntryIfNewer(ctx1.lookupEntry(key), ctx1, command1); - } - if (rv instanceof Map) { - Map resultMap = (Map) rv; - for (Map.Entry entry : resultMap.entrySet()) { - versionMap.compute(entry.getKey(), (k, vr) -> new VersionedResult(entry.getValue(), vr.version)); - } - } - return versionMap; - }); - } - } - - private Object handlePutMapOnOrigin(InvocationContext ctx, PutMapCommand command, Object rv, Map originalMap, LocalizedCacheTopology cacheTopology) { - if (!command.isSuccessful()) { - return null; - } - - Map lookedUpEntries = ctx.getLookedUpEntries(); - Map> remoteEntries = new HashMap<>(); - Map localEntries = new HashMap<>(); - for (Map.Entry entry : originalMap.entrySet()) { - Object key = entry.getKey(); - DistributionInfo info = cacheTopology.getDistribution(key); - if (info.isPrimary()) { - CacheEntry ctxEntry = lookedUpEntries.get(key); - if (ctxEntry == null) { - throw new CacheException("Entry not looked up for " + key); - } - EntryVersion version = svm.incrementVersion(info.segmentId()); - Metadata metadata = new EmbeddedMetadata.Builder().version(version).build(); - ctxEntry.setMetadata(metadata); - localEntries.put(key, new MetadataImmortalCacheValue(entry.getValue(), metadata)); - commitSingleEntryIfNewer(ctxEntry, ctx, command); - } else if (info.primary() == null) { - throw new OutdatedTopologyException(cacheTopology.getTopologyId() + 1); - } else { - Map currentEntries = remoteEntries.computeIfAbsent(info.primary(), k -> new HashMap<>()); - currentEntries.put(key, entry.getValue()); - } - } - - PutMapFuture allFuture = new PutMapFuture(command, remoteEntries.size(), (Map) rv); - if (!localEntries.isEmpty()) { - Address backup = getNextMember(cacheTopology); - if (backup != null) { - allFuture.counter++; - // note: we abuse PutMapCommand a bit as we need it to transport versions as well, and it can - // carry only single Metadata instance. - PutMapCommand backupCommand = cf.buildPutMapCommand(localEntries, command.getMetadata(), command.getFlagsBitSet()); - backupCommand.setForwarded(true); - rpcManager.invokeRemotelyAsync(Collections.singleton(backup), backupCommand, defaultSyncOptions).whenComplete((r, t) -> { - if (t != null) { - allFuture.completeExceptionally(t); - } else { - synchronized (allFuture) { - if (--allFuture.counter == 0) { - allFuture.complete(allFuture.map); - } - } - for (Map.Entry entry : localEntries.entrySet()) { - svm.scheduleKeyInvalidation(entry.getKey(), entry.getValue().getMetadata().version(), false); - } - } - }); - } - } - for (Map.Entry> ownerEntry : remoteEntries.entrySet()) { - Address owner = ownerEntry.getKey(); - PutMapCommand toPrimary = cf.buildPutMapCommand(ownerEntry.getValue(), command.getMetadata(), command.getFlagsBitSet()); - CompletableFuture> rpcFuture = rpcManager.invokeRemotelyAsync( - Collections.singletonList(owner), toPrimary, defaultSyncOptions); - rpcFuture.whenComplete((responseMap, t) -> { - if (t != null) { - allFuture.completeExceptionally(t); - return; - } - SuccessfulResponse response = getSuccessfulResponseOrFail(responseMap, allFuture, null); - if (response == null) { - return; - } - Object responseValue = response.getResponseValue(); - if (!(responseValue instanceof Map)) { - allFuture.completeExceptionally(new CacheException("Response from " + owner + ": expected Map but it is " + responseValue).fillInStackTrace()); - return; - } - Map versions = (Map) responseValue; - synchronized (allFuture) { - if (allFuture.isDone()) { - return; - } - for (Map.Entry entry : versions.entrySet()) { - // we will serve as the backup - entryFactory.wrapExternalEntry(ctx, entry.getKey(), null, false, true); - CacheEntry cacheEntry = ctx.lookupEntry(entry.getKey()); - VersionedResult result = entry.getValue(); - if (result.result != null) { - if (allFuture.map == null) { - allFuture.map = new HashMap<>(); - } - allFuture.map.put(entry.getKey(), result.result); - } - Metadata metadata = addVersion(command.getMetadata(), result.version); - cacheEntry.setValue(originalMap.get(entry.getKey())); - cacheEntry.setMetadata(metadata); - // we don't care about setCreated() since backup owner should not fire listeners - cacheEntry.setChanged(true); - boolean committed = commitSingleEntryIfNewer(cacheEntry, ctx, command); - if (committed && !command.hasAnyFlag(FlagBitSets.PUT_FOR_STATE_TRANSFER)) { - svm.scheduleKeyInvalidation(entry.getKey(), result.version, false); - } - } - if (--allFuture.counter == 0) { - allFuture.complete(allFuture.map); - } - } - }); - } - return asyncValue(allFuture); - } - - private static class PutMapFuture extends CompletableFuture> { - private PutMapCommand command; - private int counter; - private Map map; - - public PutMapFuture(PutMapCommand command, int counter, Map map) { - this.command = command; - this.counter = counter; - this.map = map; - } - - @Override - public synchronized boolean completeExceptionally(Throwable ex) { - command.fail(); - return super.completeExceptionally(ex); - } - } - - protected void extractAndSetMetadata(InvocationContext ctx, PutMapCommand command, Map originalMap) { - Map valueMap = new HashMap<>(originalMap.size()); - for (Map.Entry entry : originalMap.entrySet()) { - Object key = entry.getKey(); - CacheEntry cacheEntry = ctx.lookupEntry(key); - if (cacheEntry == null) { - // since this is executed on backup node (or by ST), the entry was not wrapped - entryFactory.wrapExternalEntry(ctx, key, null, false, true); - cacheEntry = ctx.lookupEntry(key); - } - InternalCacheValue value = (InternalCacheValue) entry.getValue(); - Metadata entryMetadata = command.getMetadata() == null ? value.getMetadata() - : command.getMetadata().builder().version(value.getMetadata().version()).build(); - cacheEntry.setMetadata(entryMetadata); - valueMap.put(key, value.getValue()); + command.setMap(valueMap); + return invokeNextThenAccept(ctx, command, putMapCommandHandler); + } else { + return handleWriteManyCommand(ctx, command, putMapHelper); } - command.setMap(valueMap); } @Override @@ -941,59 +740,320 @@ public Object visitClearCommand(InvocationContext ctx, ClearCommand command) thr } } - @Override - public Object visitApplyDeltaCommand(InvocationContext ctx, ApplyDeltaCommand command) throws Throwable { - throw new UnsupportedOperationException(); - } - @Override public Object visitReadOnlyKeyCommand(InvocationContext ctx, ReadOnlyKeyCommand command) throws Throwable { - throw new UnsupportedOperationException(); + Object key = command.getKey(); + CacheEntry entry = ctx.lookupEntry(key); + if (entry != null) { + // the entry is owned locally (it is NullCacheEntry if it was not found), no need to go remote + return invokeNext(ctx, command); + } + if (!ctx.isOriginLocal()) { + return UnsureResponse.INSTANCE; + } + if (isLocalModeForced(command) || command.hasAnyFlag(FlagBitSets.SKIP_REMOTE_LOOKUP)) { + if (ctx.lookupEntry(command.getKey()) == null) { + entryFactory.wrapExternalEntry(ctx, command.getKey(), NullCacheEntry.getInstance(), false, false); + } + return invokeNext(ctx, command); + } + DistributionInfo info = checkTopology(command).getDistribution(command.getKey()); + if (info.primary() == null) { + throw AllOwnersLostException.INSTANCE; + } + CompletableFuture> rpc = rpcManager.invokeRemotelyAsync(Collections.singleton(info.primary()), command, syncIgnoreLeavers); + return asyncValue(rpc.thenApply(responses -> { + Response response = getSingleResponse(responses); + if (response.isSuccessful()) { + return ((SuccessfulResponse) response).getResponseValue(); + } else if (response instanceof UnsureResponse) { + throw OutdatedTopologyException.INSTANCE; + } else if (response instanceof CacheNotFoundResponse) { + throw AllOwnersLostException.INSTANCE; + } else { + throw new IllegalArgumentException("Unexpected response " + response); + } + })); } @Override public Object visitReadOnlyManyCommand(InvocationContext ctx, ReadOnlyManyCommand command) throws Throwable { - throw new UnsupportedOperationException(); + if (command.hasAnyFlag(FlagBitSets.CACHE_MODE_LOCAL | FlagBitSets.SKIP_REMOTE_LOOKUP)) { + return handleLocalOnlyReadManyCommand(ctx, command, command.getKeys()); + } + + LocalizedCacheTopology cacheTopology = checkTopology(command); + if (!ctx.isOriginLocal()) { + return handleRemoteReadManyCommand(ctx, command, command.getKeys()); + } + if (command.getKeys().isEmpty()) { + return Stream.empty(); + } + + ConsistentHash ch = cacheTopology.getReadConsistentHash(); + int estimateForOneNode = 2 * command.getKeys().size() / ch.getMembers().size(); + Function> createList = k -> new ArrayList<>(estimateForOneNode); + Map> requestedKeys = new HashMap<>(); + for (Object key : command.getKeys()) { + DistributionInfo info = cacheTopology.getDistribution(key); + if (info.primary() == null) { + throw AllOwnersLostException.INSTANCE; + } + requestedKeys.computeIfAbsent(info.primary(), createList).add(key); + } + + MergingCompletableFuture allFuture = new MergingCompletableFuture<>( + requestedKeys.size(), new Object[command.getKeys().size()], Arrays::stream); + + int offset = 0; + List localKeys = requestedKeys.get(rpcManager.getAddress()); + if (localKeys != null) { + offset += localKeys.size(); + ReadOnlyManyCommand localCommand = new ReadOnlyManyCommand(command).withKeys(localKeys); + invokeNextAndFinally(ctx, localCommand, (rCtx, rCommand, rv, throwable) -> { + if (throwable != null) { + allFuture.completeExceptionally(throwable); + } else { + try { + ((Stream) rv).collect(new ArrayCollector(allFuture.results)); + allFuture.countDown(); + } catch (Throwable t) { + allFuture.completeExceptionally(t); + } + } + }); + } + + for (Map.Entry> addressKeys : requestedKeys.entrySet()) { + List keysForAddress = addressKeys.getValue(); + ReadOnlyManyCommand remoteCommand = new ReadOnlyManyCommand(command).withKeys(keysForAddress); + Set
target = Collections.singleton(addressKeys.getKey()); + int myOffset = offset; + rpcManager.invokeRemotelyAsync(target, remoteCommand, syncIgnoreLeavers) + .whenComplete((responseMap, throwable) -> { + if (throwable != null) { + allFuture.completeExceptionally(throwable); + return; + } + SuccessfulResponse response = getSuccessfulResponseOrFail(responseMap, allFuture, + rsp -> allFuture.completeExceptionally(rsp instanceof UnsureResponse? + OutdatedTopologyException.INSTANCE : AllOwnersLostException.INSTANCE)); + if (response == null) { + return; + } + try { + Object[] values = (Object[]) response.getResponseValue(); + if (values != null) { + System.arraycopy(values, 0, allFuture.results, myOffset, values.length); + allFuture.countDown(); + } else { + allFuture.completeExceptionally(new IllegalStateException("Unexpected response value " + response.getResponseValue())); + } + } catch (Throwable t) { + allFuture.completeExceptionally(t); + } + }); + offset += keysForAddress.size(); + } + return asyncValue(allFuture); + } + + private Object handleLocalOnlyReadManyCommand(InvocationContext ctx, VisitableCommand command, Collection keys) { + for (Object key : keys) { + if (ctx.lookupEntry(key) == null) { + entryFactory.wrapExternalEntry(ctx, key, NullCacheEntry.getInstance(), true, false); + } + } + return invokeNext(ctx, command); + } + + private Object handleRemoteReadManyCommand( + InvocationContext ctx, C command, Collection keys) { + for (Object key : keys) { + if (ctx.lookupEntry(key) == null) { + return UnsureResponse.INSTANCE; + } + } + return invokeNextThenApply(ctx, command, (rCtx, rCommand, rv) -> ((Stream) rv).toArray()); } @Override public Object visitWriteOnlyKeyCommand(InvocationContext ctx, WriteOnlyKeyCommand command) throws Throwable { - throw new UnsupportedOperationException(); + return handleWriteCommand(ctx, command); } @Override public Object visitReadWriteKeyValueCommand(InvocationContext ctx, ReadWriteKeyValueCommand command) throws Throwable { - throw new UnsupportedOperationException(); + return handleWriteCommand(ctx, command); } @Override public Object visitReadWriteKeyCommand(InvocationContext ctx, ReadWriteKeyCommand command) throws Throwable { - throw new UnsupportedOperationException(); + return handleWriteCommand(ctx, command); + } + + private Object handleWriteManyCommand( + InvocationContext ctx, C command, WriteManyCommandHelper helper) { + if (ctx.isOriginLocal()) { + return handleWriteManyOnOrigin(ctx, command, helper); + } else { + checkTopology(command); + // Functional commands cannot be forwarded, because we use PutMapCommand for backup + // this node should be the primary + assert helper.shouldRegisterRemoteCallback(command); + return invokeNextThenApply(ctx, command, handleWriteManyOnPrimary); + } + } + + private Object handleWriteManyOnOrigin( + InvocationContext ctx, C command, WriteManyCommandHelper helper) { + LocalizedCacheTopology cacheTopology = checkTopology(command); + + Map remoteEntries = new HashMap<>(); + for (Item item : helper.getItems(command)) { + Object key = helper.item2key(item); + DistributionInfo info = cacheTopology.getDistribution(key); + Address primary = info.primary(); + if (primary == null) { + throw AllOwnersLostException.INSTANCE; + } else { + Container currentEntries = remoteEntries.computeIfAbsent(primary, k -> helper.newContainer()); + helper.accumulate(currentEntries, item); + } + } + + Object[] results = command.loadType() == DONT_LOAD ? null : new Object[command.getAffectedKeys().size()]; + MergingCompletableFuture allFuture = new MergingCompletableFuture<>(remoteEntries.size(), results, helper::transformResult); + + int offset = 0; + Container localEntries = remoteEntries.remove(rpcManager.getAddress()); + if (localEntries != null) { + helper.containerSize(localEntries); + C localCommand = helper.copyForLocal(command, localEntries); + LocalWriteManyHandler handler = new LocalWriteManyHandler(allFuture, localCommand.getAffectedKeys(), cacheTopology); + invokeNextAndFinally(ctx, localCommand, handler); + } + + for (Map.Entry ownerEntry : remoteEntries.entrySet()) { + Address owner = ownerEntry.getKey(); + // TODO: copyForLocal just creates the command with given entries, not using the segment-aware map + Container container = ownerEntry.getValue(); + C toPrimary = helper.copyForLocal(command, container); + CompletableFuture> rpcFuture = rpcManager.invokeRemotelyAsync( + Collections.singletonList(owner), toPrimary, defaultSyncOptions); + int myOffset = offset; + offset += helper.containerSize(container); + rpcFuture.whenComplete((responseMap, t) -> { + if (t != null) { + allFuture.completeExceptionally(t); + return; + } + SuccessfulResponse response = getSuccessfulResponseOrFail(responseMap, allFuture, null); + if (response == null) { + return; + } + Object responseValue = response.getResponseValue(); + // Note: we could use PrimaryResponseHandler, but we would have to add the reference to allFuture, offset... + InternalCacheValue[] values; + try { + if (command.loadType() == DONT_LOAD) { + if (!(responseValue instanceof InternalCacheValue[])) { + allFuture.completeExceptionally(new CacheException("Response from " + owner + ": expected InternalCacheValue[] but it is " + responseValue)); + return; + } + values = (InternalCacheValue[]) responseValue; + } else { + if (!(responseValue instanceof Object[]) || (((Object[]) responseValue).length != 2)) { + allFuture.completeExceptionally(new CacheException("Response from " + owner + ": expected Object[2] but it is " + responseValue)); + return; + } + // We use Object[] { InternalCacheValue[], Object[] } structure to get benefit of same-type array marshalling + // TODO optimize returning entry itself + // Note: some interceptors relying on the return value *could* have a problem interpreting this + values = (InternalCacheValue[]) ((Object[]) responseValue)[0]; + MergingCompletableFuture.moveListItemsToFuture(((Object[]) responseValue)[1], allFuture, myOffset); + } + synchronized (allFuture) { + if (allFuture.isDone()) { + return; + } + int i = 0; + for (Object key : helper.toKeys(container)) { + // we will serve as the backup + InternalCacheEntry ice = values[i++].toInternalCacheEntry(key); + entryFactory.wrapExternalEntry(ctx, key, ice, true, true); + RepeatableReadEntry entry = (RepeatableReadEntry) ctx.lookupEntry(key); + // we don't care about setCreated() since backup owner should not fire listeners + entry.setChanged(true); + commitSingleEntryIfNewer(entry, ctx, command); + if (entry.isCommitted() && !command.hasAnyFlag(FlagBitSets.PUT_FOR_STATE_TRANSFER)) { + svm.scheduleKeyInvalidation(entry.getKey(), entry.getMetadata().version(), entry.isRemoved()); + } + } + assert i == values.length; + } + allFuture.countDown(); + } catch (Throwable t2) { + allFuture.completeExceptionally(t2); + } + }); + } + return asyncValue(allFuture); + } + + private Object handleWriteManyOnPrimary(InvocationContext ctx, VisitableCommand command, Object rv) { + WriteCommand cmd = (WriteCommand) command; + int numKeys = cmd.getAffectedKeys().size(); + InternalCacheValue[] values = new InternalCacheValue[numKeys]; + // keys are always iterated in order + int i = 0; + for (Object key : cmd.getAffectedKeys()) { + RepeatableReadEntry entry = (RepeatableReadEntry) ctx.lookupEntry(key); + EntryVersion nextVersion = svm.incrementVersion(keyPartitioner.getSegment(key)); + entry.setMetadata(addVersion(entry.getMetadata(), nextVersion)); + if (cmd.loadType() == DONT_LOAD) { + commitSingleEntryIfNewer(entry, ctx, command); + } else { + commitSingleEntryIfNoChange(entry, ctx, command); + } + values[i++] = new MetadataImmortalCacheValue(entry.getValue(), entry.getMetadata()); + } + // TODO ISPN-7806: This code will cause ClassCastException when putAll is used on cache with querying + // as QueryInterceptor checks return value which is not a Map. As query tests are not executed with + // scattered cache this does not break the testsuite. We are not trying to fix the problem here because + // QueryInterceptor misbehaves with all functional commands anyway, even in distributed cache. + if (cmd.loadType() == DONT_LOAD) { + // Disable ignoring return value in response + cmd.setFlagsBitSet(cmd.getFlagsBitSet() & ~FlagBitSets.IGNORE_RETURN_VALUES); + return values; + } else { + return new Object[] { values, ((List) rv).toArray() }; + } } @Override public Object visitWriteOnlyManyEntriesCommand(InvocationContext ctx, WriteOnlyManyEntriesCommand command) throws Throwable { - throw new UnsupportedOperationException(); + return handleWriteManyCommand(ctx, command, writeOnlyManyEntriesHelper); } @Override public Object visitWriteOnlyKeyValueCommand(InvocationContext ctx, WriteOnlyKeyValueCommand command) throws Throwable { - throw new UnsupportedOperationException(); + return handleWriteCommand(ctx, command); } @Override public Object visitWriteOnlyManyCommand(InvocationContext ctx, WriteOnlyManyCommand command) throws Throwable { - throw new UnsupportedOperationException(); + return handleWriteManyCommand(ctx, command, writeOnlyManyHelper); } @Override public Object visitReadWriteManyCommand(InvocationContext ctx, ReadWriteManyCommand command) throws Throwable { - throw new UnsupportedOperationException(); + return handleWriteManyCommand(ctx, command, readWriteManyHelper); } @Override public Object visitReadWriteManyEntriesCommand(InvocationContext ctx, ReadWriteManyEntriesCommand command) throws Throwable { - throw new UnsupportedOperationException(); + return handleWriteManyCommand(ctx, command, readWriteManyEntriesHelper); } @Override @@ -1027,24 +1087,118 @@ protected Log getLog() { return log; } + protected static class PrimaryResponseGenerator extends AbstractVisitor { + private final CacheEntry cacheEntry; + private final Object returnValue; + + public PrimaryResponseGenerator(CacheEntry cacheEntry, Object rv) { + this.cacheEntry = cacheEntry; + this.returnValue = rv; + } + + private Object handleDataWriteCommand(InvocationContext ctx, DataWriteCommand cmd) { + if (cmd.isReturnValueExpected()) { + return new Object[] { returnValue, cacheEntry.getMetadata().version() }; + } else { + // force return value to be sent in the response (the version) + cmd.setFlagsBitSet(cmd.getFlagsBitSet() & ~(FlagBitSets.IGNORE_RETURN_VALUES | FlagBitSets.SKIP_REMOTE_LOOKUP)); + return cacheEntry.getMetadata().version(); + } + } + + private Object handleValueResponseCommand(InvocationContext ctx, DataWriteCommand cmd) { + return new MetadataImmortalCacheValue(cacheEntry.getValue(), cacheEntry.getMetadata()); + } + + private Object handleFunctionalCommand(InvocationContext ctx, DataWriteCommand cmd) { + return new Object[] { cacheEntry.getValue(), cacheEntry.getMetadata(), returnValue }; + } + + @Override + public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable { + return handleDataWriteCommand(ctx, command); + } + + @Override + public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable { + return handleDataWriteCommand(ctx, command); + } + + @Override + public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable { + return handleDataWriteCommand(ctx, command); + } + + @Override + public Object visitComputeCommand(InvocationContext ctx, ComputeCommand command) throws Throwable { + // This is actually one place where existence of ComputeCommand (as opposed to basing compute() method + // on top of ReadWriteKeyCommand) gives some advantage: we know that stored entry is equal to command's return + // value so we don't have to send it twice. + // TODO: optimize case where new value == return value in RKWC + return handleValueResponseCommand(ctx, command); + } + + @Override + public Object visitComputeIfAbsentCommand(InvocationContext ctx, ComputeIfAbsentCommand command) throws Throwable { + return handleValueResponseCommand(ctx, command); + } + + @Override + public Object visitWriteOnlyKeyCommand(InvocationContext ctx, WriteOnlyKeyCommand command) throws Throwable { + return handleValueResponseCommand(ctx, command); + } + + @Override + public Object visitReadWriteKeyValueCommand(InvocationContext ctx, ReadWriteKeyValueCommand command) throws Throwable { + return handleFunctionalCommand(ctx, command); + } + + @Override + public Object visitReadWriteKeyCommand(InvocationContext ctx, ReadWriteKeyCommand command) throws Throwable { + return handleFunctionalCommand(ctx, command); + } + + @Override + public Object visitWriteOnlyKeyValueCommand(InvocationContext ctx, WriteOnlyKeyValueCommand command) throws Throwable { + return handleValueResponseCommand(ctx, command); + } + + @Override + public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable { + throw new UnsupportedOperationException(); + } + + @Override + public Object visitReadWriteManyCommand(InvocationContext ctx, ReadWriteManyCommand command) throws Throwable { + throw new UnsupportedOperationException(); + } + + @Override + public Object visitReadWriteManyEntriesCommand(InvocationContext ctx, ReadWriteManyEntriesCommand command) throws Throwable { + throw new UnsupportedOperationException(); + } + } + protected class PrimaryResponseHandler extends AbstractVisitor implements InvocationSuccessFunction { private final Object responseValue; - private CacheEntry cacheEntry; private Object returnValue; + private EntryVersion version; public PrimaryResponseHandler(Object responseValue) { this.responseValue = responseValue; } - private Object handleDataWriteCommand(InvocationContext ctx, C command) { - EntryVersion version; + private Object handleDataWriteCommand(InvocationContext ctx, DataWriteCommand command) { if (command.isReturnValueExpected()) { - if (!(responseValue instanceof MetadataImmortalCacheValue)) { - throw new CacheException("Expected MetadataImmortalCacheValue as response but it is " + responseValue); + if (!(responseValue instanceof Object[])) { + throw new CacheException("Expected Object[] { return-value, version } as response but it is " + responseValue); } - MetadataImmortalCacheValue micv = (MetadataImmortalCacheValue) responseValue; - version = micv.getMetadata().version(); - returnValue = micv.getValue(); + Object[] array = (Object[]) this.responseValue; + if (array.length != 2) { + throw new CacheException("Expected Object[] { return-value, version } but it is " + Arrays.toString(array)); + } + version = (EntryVersion) array[1]; + returnValue = array[0]; } else { if (!(responseValue instanceof EntryVersion)) { throw new CacheException("Expected EntryVersion as response but it is " + responseValue); @@ -1052,38 +1206,53 @@ private Object handleDataWri version = (EntryVersion) responseValue; returnValue = null; } - Metadata metadata = addVersion(command.getMetadata(), version); - - // TODO: skip lookup by returning from entry factory directly entryFactory.wrapExternalEntry(ctx, command.getKey(), null, false, true); - cacheEntry = ctx.lookupEntry(command.getKey()); - cacheEntry.setMetadata(metadata); // Primary succeeded, so apply the value locally command.setValueMatcher(ValueMatcher.MATCH_ALWAYS); return invokeNextThenApply(ctx, command, this); } - private Object handleComputeCommand(InvocationContext ctx, C command) throws Throwable { + private Object handleValueResponseCommand(InvocationContext ctx, DataWriteCommand command) throws Throwable { if (!(responseValue instanceof MetadataImmortalCacheValue)) { throw new CacheException("Expected MetadataImmortalCacheValue as response but it is " + responseValue); } MetadataImmortalCacheValue micv = (MetadataImmortalCacheValue) responseValue; InternalCacheEntry ice = micv.toInternalCacheEntry(command.getKey()); - returnValue = ice.getValue(); + returnValue = command.isWriteOnly() ? null : ice.getValue(); + version = ice.getMetadata().version(); - // TODO: skip lookup by returning from entry factory directly entryFactory.wrapExternalEntry(ctx, command.getKey(), ice, true, true); - cacheEntry = ctx.lookupEntry(command.getKey()); - cacheEntry.setChanged(true); + return apply(ctx, command, null); + } + + private Object handleFunctionalCommand(InvocationContext ctx, DataWriteCommand command) throws Throwable { + if (!(responseValue instanceof Object[])) { + throw new CacheException("Expected Object[] { value, metadata, return-value } but it is " + responseValue); + } + Object[] array = (Object[]) responseValue; + if (array.length != 3) { + throw new CacheException("Expected Object[] { value, metadata, return-value } but it is " + Arrays.toString(array)); + } + Metadata metadata = (Metadata) array[1]; + returnValue = array[2]; + version = metadata.version(); + + entryFactory.wrapExternalEntry(ctx, command.getKey(), new MetadataImmortalCacheEntry(command.getKey(), array[0], metadata), true, true); return apply(ctx, command, null); } @Override public Object apply(InvocationContext rCtx, VisitableCommand rCommand, Object rv) throws Throwable { DataWriteCommand cmd = (DataWriteCommand) rCommand; + + RepeatableReadEntry cacheEntry = (RepeatableReadEntry) rCtx.lookupEntry(cmd.getKey()); + Metadata metadata = addVersion(cacheEntry.getMetadata(), version); + cacheEntry.setMetadata(metadata); + cacheEntry.setChanged(true); + // We don't care about the local value, as we use MATCH_ALWAYS on backup - boolean committed = commitSingleEntryIfNewer(cacheEntry, rCtx, cmd); - if (committed && !cmd.hasAnyFlag(FlagBitSets.PUT_FOR_STATE_TRANSFER)) { + commitSingleEntryIfNewer(cacheEntry, rCtx, cmd); + if (cacheEntry.isCommitted() && !cmd.hasAnyFlag(FlagBitSets.PUT_FOR_STATE_TRANSFER)) { svm.scheduleKeyInvalidation(cmd.getKey(), cacheEntry.getMetadata().version(), cacheEntry.isRemoved()); } return returnValue; @@ -1106,12 +1275,12 @@ public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) @Override public Object visitComputeCommand(InvocationContext ctx, ComputeCommand command) throws Throwable { - return handleComputeCommand(ctx, command); + return handleValueResponseCommand(ctx, command); } @Override public Object visitComputeIfAbsentCommand(InvocationContext ctx, ComputeIfAbsentCommand command) throws Throwable { - return handleComputeCommand(ctx, command); + return handleValueResponseCommand(ctx, command); } @Override @@ -1121,12 +1290,12 @@ public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) t @Override public Object visitReadWriteKeyValueCommand(InvocationContext ctx, ReadWriteKeyValueCommand command) throws Throwable { - throw new UnsupportedOperationException(); + return handleFunctionalCommand(ctx, command); } @Override public Object visitReadWriteKeyCommand(InvocationContext ctx, ReadWriteKeyCommand command) throws Throwable { - throw new UnsupportedOperationException(); + return handleFunctionalCommand(ctx, command); } @Override @@ -1138,5 +1307,93 @@ public Object visitReadWriteManyCommand(InvocationContext ctx, ReadWriteManyComm public Object visitReadWriteManyEntriesCommand(InvocationContext ctx, ReadWriteManyEntriesCommand command) throws Throwable { throw new UnsupportedOperationException(); } + + @Override + public Object visitWriteOnlyKeyCommand(InvocationContext ctx, WriteOnlyKeyCommand command) throws Throwable { + return handleValueResponseCommand(ctx, command); + } + + @Override + public Object visitWriteOnlyKeyValueCommand(InvocationContext ctx, WriteOnlyKeyValueCommand command) throws Throwable { + return handleValueResponseCommand(ctx, command); + } + } + + /** + * This class uses synchronized {@link #completeExceptionally(Throwable)} for the same reasons as + * {@link org.infinispan.interceptors.impl.ClusteringInterceptor.ClusteredGetAllFuture} + */ + private static class SyncMergingCompletableFuture extends MergingCompletableFuture { + SyncMergingCompletableFuture(int participants, T[] results, Function transform) { + super(participants, results, transform); + } + + @Override + public synchronized boolean completeExceptionally(Throwable ex) { + return super.completeExceptionally(ex); + } + } + + private class LocalWriteManyHandler implements InvocationFinallyAction { + private final MergingCompletableFuture allFuture; + private final Collection keys; + private final LocalizedCacheTopology cacheTopology; + + private LocalWriteManyHandler(MergingCompletableFuture allFuture, Collection keys, LocalizedCacheTopology cacheTopology) { + this.allFuture = allFuture; + this.keys = keys; + this.cacheTopology = cacheTopology; + } + + @Override + public void accept(InvocationContext ctx, VisitableCommand command, Object rv, Throwable throwable) throws Throwable { + if (throwable != null) { + allFuture.completeExceptionally(throwable); + } else try { + if (allFuture.results != null) { + MergingCompletableFuture.moveListItemsToFuture(rv, allFuture, 0); + } + WriteCommand writeCommand = (WriteCommand) command; + Map backupMap = new HashMap<>(); + synchronized (allFuture) { + if (allFuture.isDone()) { + return; + } + for (Object key : keys) { + DistributionInfo info = cacheTopology.getDistribution(key); + EntryVersion version = svm.incrementVersion(info.segmentId()); + RepeatableReadEntry entry = (RepeatableReadEntry) ctx.lookupEntry(key); + if (entry == null) { + throw new CacheException("Entry not looked up for " + key); + } + Metadata metadata = addVersion(entry.getMetadata(), version); + entry.setMetadata(metadata); + backupMap.put(key, new MetadataImmortalCacheValue(entry.getValue(), metadata)); + if (writeCommand.loadType() == DONT_LOAD) { + commitSingleEntryIfNewer(entry, ctx, command); + } else { + commitSingleEntryIfNoChange(entry, ctx, command); + } + } + } + PutMapCommand backupCommand = cf.buildPutMapCommand(backupMap, null, writeCommand.getFlagsBitSet()); + backupCommand.setForwarded(true); + rpcManager.invokeRemotelyAsync(Collections.singleton(getNextMember(cacheTopology)), backupCommand, defaultSyncOptions) + .whenComplete((responseMap, throwable1) -> { + if (throwable1 != null) { + synchronized (allFuture) { + allFuture.completeExceptionally(throwable1); + } + } else { + allFuture.countDown(); + for (Map.Entry entry : backupCommand.getMap().entrySet()) { + svm.scheduleKeyInvalidation(entry.getKey(), ((InternalCacheValue) entry.getValue()).getMetadata().version(), false); + } + } + }); + } catch (Throwable t) { + allFuture.completeExceptionally(t); + } + } } } diff --git a/core/src/main/java/org/infinispan/interceptors/distribution/WriteManyCommandHelper.java b/core/src/main/java/org/infinispan/interceptors/distribution/WriteManyCommandHelper.java index 32cf836a3aef..536b74ae1d89 100644 --- a/core/src/main/java/org/infinispan/interceptors/distribution/WriteManyCommandHelper.java +++ b/core/src/main/java/org/infinispan/interceptors/distribution/WriteManyCommandHelper.java @@ -31,6 +31,8 @@ protected WriteManyCommandHelper(Function, Invoc public abstract int containerSize(Container container); + public abstract Iterable toKeys(Container container); + public abstract boolean shouldRegisterRemoteCallback(C cmd); public abstract Object transformResult(Object[] results); diff --git a/core/src/main/java/org/infinispan/interceptors/distribution/WriteOnlyManyEntriesHelper.java b/core/src/main/java/org/infinispan/interceptors/distribution/WriteOnlyManyEntriesHelper.java index 01a528c726ec..fd46846b8a77 100644 --- a/core/src/main/java/org/infinispan/interceptors/distribution/WriteOnlyManyEntriesHelper.java +++ b/core/src/main/java/org/infinispan/interceptors/distribution/WriteOnlyManyEntriesHelper.java @@ -2,7 +2,7 @@ import java.util.Arrays; import java.util.Collection; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; import java.util.function.Function; @@ -48,7 +48,8 @@ public Object item2key(Map.Entry entry) { @Override public Map newContainer() { - return new HashMap<>(); + // Make sure the iteration in containers is ordered + return new LinkedHashMap<>(); } @Override @@ -61,6 +62,11 @@ public int containerSize(Map map) { return map.size(); } + @Override + public Iterable toKeys(Map map) { + return map.keySet(); + } + @Override public boolean shouldRegisterRemoteCallback(WriteOnlyManyEntriesCommand cmd) { return !cmd.isForwarded(); diff --git a/core/src/main/java/org/infinispan/interceptors/distribution/WriteOnlyManyHelper.java b/core/src/main/java/org/infinispan/interceptors/distribution/WriteOnlyManyHelper.java index 21b5b176915d..975da68cc5d9 100644 --- a/core/src/main/java/org/infinispan/interceptors/distribution/WriteOnlyManyHelper.java +++ b/core/src/main/java/org/infinispan/interceptors/distribution/WriteOnlyManyHelper.java @@ -60,6 +60,11 @@ public int containerSize(Collection list) { return list.size(); } + @Override + public Iterable toKeys(Collection list) { + return list; + } + @Override public boolean shouldRegisterRemoteCallback(WriteOnlyManyCommand cmd) { return !cmd.isForwarded(); diff --git a/core/src/main/java/org/infinispan/interceptors/impl/EntryWrappingInterceptor.java b/core/src/main/java/org/infinispan/interceptors/impl/EntryWrappingInterceptor.java index d706f6f30592..7d69fb36ced8 100644 --- a/core/src/main/java/org/infinispan/interceptors/impl/EntryWrappingInterceptor.java +++ b/core/src/main/java/org/infinispan/interceptors/impl/EntryWrappingInterceptor.java @@ -168,7 +168,8 @@ public void start() { isSync = cacheConfiguration.clustering().cacheMode().isSynchronous(); // isolation level makes no sense without transactions useRepeatableRead = cacheConfiguration.transaction().transactionMode().isTransactional() - && cacheConfiguration.locking().isolationLevel() == IsolationLevel.REPEATABLE_READ; + && cacheConfiguration.locking().isolationLevel() == IsolationLevel.REPEATABLE_READ + || cacheConfiguration.clustering().cacheMode().isScattered(); isVersioned = Configurations.isTxVersioned(cacheConfiguration); totalOrder = cacheConfiguration.transaction().transactionProtocol().isTotalOrder(); } @@ -228,8 +229,12 @@ public Object visitGetAllCommand(InvocationContext ctx, GetAllCommand command) t if (useRepeatableRead) { for (Object key : getAllCommand.getKeys()) { CacheEntry cacheEntry = rCtx.lookupEntry(key); - if (trace && cacheEntry == null) log.tracef(t, "Missing entry for " + key); - cacheEntry.setSkipLookup(true); + if (cacheEntry == null) { + // Data was lost + if (trace) log.tracef(t, "Missing entry for " + key); + } else { + cacheEntry.setSkipLookup(true); + } } } @@ -329,12 +334,14 @@ private void removeFromContextOnRetry(InvocationContext ctx, Object key) { if (useRepeatableRead) { MVCCEntry entry = (MVCCEntry) ctx.lookupEntry(key); if (trace) { - log.tracef("This is a retry - resetting previous value in entry ", entry); + log.tracef("This is a retry - resetting previous value in entry %s", entry); + } + if (entry != null) { + entry.resetCurrentValue(); } - entry.resetCurrentValue(); } else { if (trace) { - log.tracef("This is a retry - removing looked up entry " + ctx.lookupEntry(key)); + log.tracef("This is a retry - removing looked up entry %s", ctx.lookupEntry(key)); } ctx.removeLookedUpEntry(key); } diff --git a/core/src/main/java/org/infinispan/interceptors/impl/RetryingEntryWrappingInterceptor.java b/core/src/main/java/org/infinispan/interceptors/impl/RetryingEntryWrappingInterceptor.java index b7f81a4e3470..67dc03c0904c 100644 --- a/core/src/main/java/org/infinispan/interceptors/impl/RetryingEntryWrappingInterceptor.java +++ b/core/src/main/java/org/infinispan/interceptors/impl/RetryingEntryWrappingInterceptor.java @@ -3,6 +3,7 @@ import org.infinispan.commands.VisitableCommand; import org.infinispan.commands.write.DataWriteCommand; import org.infinispan.commands.write.WriteCommand; +import org.infinispan.container.entries.MVCCEntry; import org.infinispan.context.InvocationContext; import org.infinispan.interceptors.InvocationExceptionFunction; import org.infinispan.interceptors.distribution.ConcurrentChangeException; @@ -15,6 +16,14 @@ * The commit is executed in {@link ScatteredDistributionInterceptor} * before replicating the change from primary owner. * + * When the {@link ScatteredDistributionInterceptor} throws a {@link ConcurrentChangeException} during single-key + * command processing, we know that the entry has not been committed and can safely remove the whole entry from context + * and retry. + * When the command processes multiple keys, some of the entries might be already committed. Therefore we have to keep + * the original value in a {@link org.infinispan.container.entries.RepeatableReadEntry} and for committed entries we + * only reset the value before retry (we assume that the outcome of an operation is deterministic). The non-committed + * entries are removed and re-wrapped as in the single-key case. + * * @author Radim Vansa <rvansa@redhat.com> */ public class RetryingEntryWrappingInterceptor extends EntryWrappingInterceptor { @@ -52,7 +61,15 @@ private Object handleManyWriteReturn(InvocationContext ctx, VisitableCommand com if (trace) { log.tracef(throwable, "Retrying %s after concurrent change", command); } - ctx.removeLookedUpEntries(((WriteCommand) command).getAffectedKeys()); + // Note: this is similar to what EWI does when RETRY flag is set, but we have to check entry.isCommitted() + for (Object key : ((WriteCommand) command).getAffectedKeys()) { + MVCCEntry entry = (MVCCEntry) ctx.lookupEntry(key); + if (entry.isCommitted()) { + entry.resetCurrentValue(); + } else { + ctx.removeLookedUpEntry(key); + } + } return visitCommand(ctx, command); } else { throw throwable; diff --git a/core/src/test/java/org/infinispan/distribution/BlockingInterceptor.java b/core/src/test/java/org/infinispan/distribution/BlockingInterceptor.java index 41e0990c49ab..eb483619978e 100644 --- a/core/src/test/java/org/infinispan/distribution/BlockingInterceptor.java +++ b/core/src/test/java/org/infinispan/distribution/BlockingInterceptor.java @@ -22,21 +22,25 @@ public class BlockingInterceptor extends DDAsyncInte private static final Log log = LogFactory.getLog(BlockingInterceptor.class); private final CyclicBarrier barrier; - private final Class commandClass; private final boolean blockAfter; private final boolean originLocalOnly; private final AtomicBoolean suspended = new AtomicBoolean(); - private final Predicate acceptCommand; + private final Predicate acceptCommand; public BlockingInterceptor(CyclicBarrier barrier, Class commandClass, boolean blockAfter, boolean originLocalOnly) { - this(barrier, commandClass, blockAfter, originLocalOnly, t -> true); + this(barrier, commandClass, blockAfter, originLocalOnly, t -> t != null && commandClass.equals(t.getClass())); } public BlockingInterceptor(CyclicBarrier barrier, Class commandClass, boolean blockAfter, boolean originLocalOnly, Predicate acceptCommand) { + this(barrier, blockAfter, originLocalOnly, + t -> t != null && commandClass.equals(t.getClass()) && acceptCommand.test(commandClass.cast(t))); + } + + public BlockingInterceptor(CyclicBarrier barrier, boolean blockAfter, boolean originLocalOnly, + Predicate acceptCommand) { this.barrier = barrier; - this.commandClass = commandClass; this.blockAfter = blockAfter; this.originLocalOnly = originLocalOnly; this.acceptCommand = acceptCommand; @@ -51,7 +55,7 @@ private void blockIfNeeded(InvocationContext ctx, VisitableCommand command) thro log.tracef("Suspended, not blocking command %s", command); return; } - if (commandClass.equals(command.getClass()) && (!originLocalOnly || ctx.isOriginLocal()) && acceptCommand.test(commandClass.cast(command))) { + if ((!originLocalOnly || ctx.isOriginLocal()) && acceptCommand.test(command)) { log.tracef("Command blocking %s completion of %s", blockAfter ? "after" : "before", command); // The first arrive and await is to sync with main thread barrier.await(); diff --git a/core/src/test/java/org/infinispan/distribution/rehash/StateTransferOverwritingValueTest.java b/core/src/test/java/org/infinispan/distribution/rehash/StateTransferOverwritingValueTest.java index 263f1cc6b7e4..f63ecbe3b98c 100644 --- a/core/src/test/java/org/infinispan/distribution/rehash/StateTransferOverwritingValueTest.java +++ b/core/src/test/java/org/infinispan/distribution/rehash/StateTransferOverwritingValueTest.java @@ -16,6 +16,8 @@ import org.infinispan.AdvancedCache; import org.infinispan.Cache; +import org.infinispan.commands.write.PutKeyValueCommand; +import org.infinispan.commands.write.RemoveCommand; import org.infinispan.configuration.cache.CacheMode; import org.infinispan.configuration.cache.ConfigurationBuilder; import org.infinispan.distribution.BlockingInterceptor; @@ -146,8 +148,10 @@ private void doTest(final TestWriteOperation op) throws Exception { // Every PutKeyValueCommand will be blocked before committing the entry on cache1 CyclicBarrier beforeCommitCache1Barrier = new CyclicBarrier(2); + // Scattered cache mode uses only PKVC or RemoveCommands for backup BlockingInterceptor blockingInterceptor1 = new BlockingInterceptor<>(beforeCommitCache1Barrier, - op.getCommandClass(), true, false); + true, false, cacheMode.isScattered() ? t -> t instanceof PutKeyValueCommand || t instanceof RemoveCommand + : t -> t.getClass().equals(op.getCommandClass())); Class ewi = cacheMode.isScattered() ? RetryingEntryWrappingInterceptor.class : EntryWrappingInterceptor.class; assertTrue(cache1.getAsyncInterceptorChain().addInterceptorAfter(blockingInterceptor1, ewi)); diff --git a/core/src/test/java/org/infinispan/functional/AbstractFunctionalOpTest.java b/core/src/test/java/org/infinispan/functional/AbstractFunctionalOpTest.java index 5c863d1ca426..139035a09e00 100644 --- a/core/src/test/java/org/infinispan/functional/AbstractFunctionalOpTest.java +++ b/core/src/test/java/org/infinispan/functional/AbstractFunctionalOpTest.java @@ -2,7 +2,9 @@ import java.util.Collections; import java.util.List; +import java.util.NoSuchElementException; import java.util.Objects; +import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; @@ -18,6 +20,7 @@ import org.infinispan.commands.VisitableCommand; import org.infinispan.commands.functional.TxReadOnlyKeyCommand; import org.infinispan.commands.functional.TxReadOnlyManyCommand; +import org.infinispan.commons.CacheException; import org.infinispan.functional.EntryView.ReadEntryView; import org.infinispan.functional.EntryView.WriteEntryView; import org.infinispan.functional.FunctionalMap.ReadWriteMap; @@ -36,6 +39,7 @@ import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import static org.infinispan.test.Exceptions.expectExceptionNonStrict; import static org.testng.Assert.assertEquals; /** @@ -46,10 +50,14 @@ public abstract class AbstractFunctionalOpTest extends AbstractFunctionalTest { static ConcurrentMap, AtomicInteger> invocationCounts = new ConcurrentHashMap<>(); FunctionalMap.ReadOnlyMap ro; + FunctionalMap.ReadOnlyMap sro; FunctionalMap.ReadOnlyMap lro; WriteOnlyMap wo; ReadWriteMap rw; AdvancedCache cache; + WriteOnlyMap swo; + ReadWriteMap srw; + AdvancedCache scatteredCache; WriteOnlyMap lwo; ReadWriteMap lrw; List countingRequestRepositories; @@ -108,10 +116,14 @@ public void resetInvocationCount() { public void createBeforeMethod() throws Throwable { super.createBeforeMethod(); this.ro = ReadOnlyMapImpl.create(fmapD1); + this.sro = ReadOnlyMapImpl.create(fmapS1); this.lro = ReadOnlyMapImpl.create(fmapL1); this.wo = WriteOnlyMapImpl.create(fmapD1); this.rw = ReadWriteMapImpl.create(fmapD1); this.cache = cacheManagers.get(0).getCache(DIST).getAdvancedCache(); + this.swo = WriteOnlyMapImpl.create(fmapS1); + this.srw = ReadWriteMapImpl.create(fmapS1); + this.scatteredCache = cacheManagers.get(0).getCache(SCATTERED).getAdvancedCache(); this.lwo = WriteOnlyMapImpl.create(fmapL1); this.lrw = ReadWriteMapImpl.create(fmapL1); } @@ -120,7 +132,7 @@ public void createBeforeMethod() throws Throwable { protected void createCacheManagers() throws Throwable { super.createCacheManagers(); countingRequestRepositories = cacheManagers.stream().map(cm -> CountingRequestRepository.replaceDispatcher(cm)).collect(Collectors.toList()); - Stream.of(null, DIST, REPL).forEach(name -> caches(name).forEach(c -> { + Stream.of(null, DIST, REPL, SCATTERED).forEach(name -> caches(name).forEach(c -> { c.getAdvancedCache().getAsyncInterceptorChain().addInterceptorBefore(new CommandCachingInterceptor(), CallInterceptor.class); })); } @@ -134,16 +146,16 @@ protected void advanceGenerationsAndAwait(long timeout, TimeUnit timeUnit) throw } } - protected Object getKey(boolean isOwner) { + protected Object getKey(boolean isOwner, String cacheName) { Object key; if (isOwner) { // this is simple: find a key that is local to the originating node - key = getKeyForCache(0, DIST); + key = getKeyForCache(0, cacheName); } else { // this is more complicated: we need a key that is *not* local to the originating node key = IntStream.iterate(0, i -> i + 1) .mapToObj(i -> "key" + i) - .filter(k -> !cache(0, DIST).getAdvancedCache().getDistributionManager().getLocality(k).isLocal()) + .filter(k -> !cache(0, cacheName).getAdvancedCache().getDistributionManager().getLocality(k).isLocal()) .findAny() .get(); } @@ -165,6 +177,13 @@ private static void incrementInvocationCount(Class new AtomicInteger()).incrementAndGet(); } + protected void testReadOnMissingValue(K key, FunctionalMap.ReadOnlyMap ro, ReadMethod method) { + assertEquals(ro.eval(key, view -> view.find().isPresent()).join(), Boolean.FALSE); + expectExceptionNonStrict(CompletionException.class, CacheException.class, NoSuchElementException.class, () -> + method.eval(key, ro, view -> view.get()) + ); + } + enum WriteMethod { WO_EVAL(false, (key, wo, rw, read, write, clazz) -> wo.eval(key, view -> { diff --git a/core/src/test/java/org/infinispan/functional/AbstractFunctionalTest.java b/core/src/test/java/org/infinispan/functional/AbstractFunctionalTest.java index 17c3e5474d90..41163d207e66 100644 --- a/core/src/test/java/org/infinispan/functional/AbstractFunctionalTest.java +++ b/core/src/test/java/org/infinispan/functional/AbstractFunctionalTest.java @@ -11,6 +11,7 @@ abstract class AbstractFunctionalTest extends MultipleCacheManagersTest { static final String DIST = "dist"; static final String REPL = "repl"; + static final String SCATTERED = "scattered"; // Create local caches as default in a cluster of 2 int numNodes = 2; @@ -25,9 +26,13 @@ abstract class AbstractFunctionalTest extends MultipleCacheManagersTest { FunctionalMapImpl fmapD1; FunctionalMapImpl fmapD2; + // TODO: we should not create all those maps in tests where we don't use them FunctionalMapImpl fmapR1; FunctionalMapImpl fmapR2; + FunctionalMapImpl fmapS1; + FunctionalMapImpl fmapS2; + @Override protected void createCacheManagers() throws Throwable { ConfigurationBuilder localBuilder = new ConfigurationBuilder(); @@ -43,8 +48,15 @@ protected void createCacheManagers() throws Throwable { replBuilder.clustering().cacheMode(isSync ? CacheMode.REPL_SYNC : CacheMode.REPL_ASYNC); configureCache(replBuilder); cacheManagers.stream().forEach(cm -> cm.defineConfiguration(REPL, replBuilder.build())); + // Create scattered caches + if (!Boolean.TRUE.equals(transactional)) { + ConfigurationBuilder scatteredBuilder = new ConfigurationBuilder(); + scatteredBuilder.clustering().cacheMode(CacheMode.SCATTERED_SYNC); + configureCache(scatteredBuilder); + cacheManagers.stream().forEach(cm -> cm.defineConfiguration(SCATTERED, scatteredBuilder.build())); + } // Wait for cluster to form - waitForClusterToForm(DIST, REPL); + waitForClusterToForm(DIST, REPL, SCATTERED); } protected void configureCache(ConfigurationBuilder builder) { @@ -94,6 +106,8 @@ private void initMaps() { fmapD2 = FunctionalMapImpl.create(cacheManagers.get(1).getCache(DIST).getAdvancedCache()); fmapR1 = FunctionalMapImpl.create(cacheManagers.get(0).getCache(REPL).getAdvancedCache()); fmapR2 = FunctionalMapImpl.create(cacheManagers.get(1).getCache(REPL).getAdvancedCache()); + fmapS1 = FunctionalMapImpl.create(cacheManagers.get(0).getCache(SCATTERED).getAdvancedCache()); + fmapS2 = FunctionalMapImpl.create(cacheManagers.get(1).getCache(SCATTERED).getAdvancedCache()); } } diff --git a/core/src/test/java/org/infinispan/functional/FunctionalCachestoreTest.java b/core/src/test/java/org/infinispan/functional/FunctionalCachestoreTest.java index d26e12ce9d85..daaab2847f4d 100644 --- a/core/src/test/java/org/infinispan/functional/FunctionalCachestoreTest.java +++ b/core/src/test/java/org/infinispan/functional/FunctionalCachestoreTest.java @@ -34,7 +34,7 @@ protected String parameters() { @Test(dataProvider = "owningModeAndWriteMethod") public void testWriteLoad(boolean isSourceOwner, WriteMethod method) throws Exception { - Object key = getKey(isSourceOwner); + Object key = getKey(isSourceOwner, DIST); List> owners = caches(DIST).stream() .filter(cache -> cache.getAdvancedCache().getDistributionManager().getLocality(key).isLocal()) @@ -103,7 +103,7 @@ public void testWriteLoadLocal(WriteMethod method) { @Test(dataProvider = "owningModeAndReadMethod") public void testReadLoad(boolean isSourceOwner, ReadMethod method) { - Object key = getKey(isSourceOwner); + Object key = getKey(isSourceOwner, DIST); List> owners = caches(DIST).stream() .filter(cache -> cache.getAdvancedCache().getDistributionManager().getLocality(key).isLocal()) .collect(Collectors.toList()); diff --git a/core/src/test/java/org/infinispan/functional/FunctionalInMemoryTest.java b/core/src/test/java/org/infinispan/functional/FunctionalInMemoryTest.java index 16a7cb4f4ab7..a5ac9091d4bf 100644 --- a/core/src/test/java/org/infinispan/functional/FunctionalInMemoryTest.java +++ b/core/src/test/java/org/infinispan/functional/FunctionalInMemoryTest.java @@ -2,7 +2,6 @@ import static org.infinispan.test.Exceptions.assertException; import static org.infinispan.test.Exceptions.assertExceptionNonStrict; -import static org.infinispan.test.Exceptions.expectExceptionNonStrict; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; @@ -18,6 +17,7 @@ import org.infinispan.commons.CacheException; import org.infinispan.functional.impl.ReadOnlyMapImpl; import org.infinispan.remoting.RemoteException; +import org.infinispan.test.TestException; import org.testng.annotations.Test; /** @@ -32,7 +32,7 @@ public FunctionalInMemoryTest() { @Test(dataProvider = "owningModeAndWriteMethod") public void testWriteLoad(boolean isOwner, WriteMethod method) { - Object key = getKey(isOwner); + Object key = getKey(isOwner, DIST); method.eval(key, wo, rw, view -> { assertFalse(view.find().isPresent()); return null; }, @@ -84,7 +84,7 @@ public void testWriteLoadLocal(WriteMethod method) { @Test(dataProvider = "owningModeAndWriteMethod") public void testExceptionPropagation(boolean isOwner, WriteMethod method) { - Object key = getKey(isOwner); + Object key = getKey(isOwner, DIST); try { method.eval(key, wo, rw, view -> null, @@ -114,7 +114,7 @@ public void testExceptionPropagation(boolean isOwner, WriteMethod method) { @Test(dataProvider = "owningModeAndReadWrites") public void testWriteOnMissingValue(boolean isOwner, WriteMethod method) { - Object key = getKey(isOwner); + Object key = getKey(isOwner, DIST); try { method.eval(key, null, rw, view -> view.get(), @@ -134,7 +134,7 @@ public void testWriteOnMissingValue(boolean isOwner, WriteMethod method) { @Test(dataProvider = "owningModeAndReadMethod") public void testReadLoad(boolean isOwner, ReadMethod method) { - Object key = getKey(isOwner); + Object key = getKey(isOwner, DIST); assertTrue(method.eval(key, ro, view -> { assertFalse(view.find().isPresent()); return true; })); @@ -181,21 +181,11 @@ public void testReadLoadLocal(ReadMethod method) { @Test(dataProvider = "owningModeAndReadMethod") public void testReadOnMissingValue(boolean isOwner, ReadMethod method) { - testReadOnMissingValue(getKey(isOwner), ro, method); + testReadOnMissingValue(getKey(isOwner, DIST), ro, method); } @Test(dataProvider = "methods") public void testOnMissingValueLocal(ReadMethod method) { testReadOnMissingValue(0, ReadOnlyMapImpl.create(fmapL1), method); } - - private void testReadOnMissingValue(K key, FunctionalMap.ReadOnlyMap ro, ReadMethod method) { - assertEquals(ro.eval(key, view -> view.find().isPresent()).join(), Boolean.FALSE); - expectExceptionNonStrict(CompletionException.class, CacheException.class, NoSuchElementException.class, () -> - method.eval(key, ro, view -> view.get()) - ); - } - - private static class TestException extends RuntimeException { - } } diff --git a/core/src/test/java/org/infinispan/functional/FunctionalScatteredInMemoryTest.java b/core/src/test/java/org/infinispan/functional/FunctionalScatteredInMemoryTest.java new file mode 100644 index 000000000000..2d9a95d813dd --- /dev/null +++ b/core/src/test/java/org/infinispan/functional/FunctionalScatteredInMemoryTest.java @@ -0,0 +1,112 @@ +package org.infinispan.functional; + +import static org.infinispan.test.Exceptions.assertException; +import static org.infinispan.test.Exceptions.assertExceptionNonStrict; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +import java.util.NoSuchElementException; +import java.util.concurrent.CompletionException; + +import org.infinispan.commons.CacheException; +import org.infinispan.remoting.RemoteException; +import org.infinispan.scattered.Utils; +import org.infinispan.test.TestException; +import org.testng.annotations.Test; + +public class FunctionalScatteredInMemoryTest extends AbstractFunctionalOpTest { + + @Test(dataProvider = "owningModeAndWriteMethod") + public void testWrite(boolean isOwner, WriteMethod method) { + Object key = getKey(isOwner, SCATTERED); + + method.eval(key, swo, srw, + view -> { assertFalse(view.find().isPresent()); return null; }, + (view, nil) -> view.set("value"), getClass()); + + assertInvocations(1); + + caches(SCATTERED).forEach(cache -> assertEquals(cache.get(key), "value", getAddress(cache).toString())); + Utils.assertOwnershipAndNonOwnership(caches(SCATTERED), key); + + method.eval(key, swo, srw, + view -> { + assertTrue(view.find().isPresent()); + assertEquals(view.get(), "value"); + return null; + }, + (view, nil) -> {}, getClass()); + + assertInvocations(2); + } + + @Test(dataProvider = "owningModeAndWriteMethod") + public void testExceptionPropagation(boolean isOwner, WriteMethod method) { + Object key = getKey(isOwner, SCATTERED); + try { + method.eval(key, swo, srw, + view -> null, + (view, nil) -> { + throw new TestException(); + }, getClass()); + fail("Should throw CompletionException:CacheException:[RemoteException:]*TestException"); + } catch (CacheException | CompletionException e) { // catches RemoteExceptions, too + Throwable t = e; + assertException(CompletionException.class, t); + t = t.getCause(); + assertExceptionNonStrict(CacheException.class, t); + while (t.getCause() instanceof RemoteException && t != t.getCause()) { + t = t.getCause(); + } + assertException(TestException.class, t.getCause()); + } + } + + @Test(dataProvider = "owningModeAndReadWrites") + public void testWriteOnMissingValue(boolean isOwner, WriteMethod method) { + Object key = getKey(isOwner, SCATTERED); + try { + method.eval(key, null, srw, + view -> view.get(), + (view, nil) -> {}, getClass()); + fail("Should throw CompletionException:CacheException:[RemoteException:]*NoSuchElementException"); + } catch (CompletionException e) { // catches RemoteExceptions, too + Throwable t = e; + assertException(CompletionException.class, t); + t = t.getCause(); + assertExceptionNonStrict(CacheException.class, t); + while (t.getCause() instanceof RemoteException && t != t.getCause()) { + t = t.getCause(); + } + assertException(NoSuchElementException.class, t.getCause()); + } + } + + @Test(dataProvider = "owningModeAndReadMethod") + public void testReadLoad(boolean isOwner, ReadMethod method) { + Object key = getKey(isOwner, SCATTERED); + + assertTrue(method.eval(key, sro, view -> { assertFalse(view.find().isPresent()); return true; })); + + // we can't add from read-only cache, so we put manually: + cache(0, SCATTERED).put(key, "value"); + + caches(SCATTERED).forEach(cache -> assertEquals(cache.get(key), "value", getAddress(cache).toString())); + Utils.assertOwnershipAndNonOwnership(caches(SCATTERED), key); + + assertEquals(method.eval(key, sro, + view -> { + assertTrue(view.find().isPresent()); + assertEquals(view.get(), "value"); + return "OK"; + }), "OK"); + } + + @Test(dataProvider = "owningModeAndReadMethod") + public void testReadOnMissingValue(boolean isOwner, ReadMethod method) { + testReadOnMissingValue(getKey(isOwner, SCATTERED), sro, method); + } + +} diff --git a/core/src/test/java/org/infinispan/functional/FunctionalTxInMemoryTest.java b/core/src/test/java/org/infinispan/functional/FunctionalTxInMemoryTest.java index a44b7b934754..f6d1054aa777 100644 --- a/core/src/test/java/org/infinispan/functional/FunctionalTxInMemoryTest.java +++ b/core/src/test/java/org/infinispan/functional/FunctionalTxInMemoryTest.java @@ -16,7 +16,6 @@ import org.infinispan.commons.CacheException; import org.infinispan.marshall.core.MarshallableFunctions; -import org.infinispan.functional.impl.ReadOnlyMapImpl; import org.infinispan.test.TestingUtil; import org.infinispan.transaction.LockingMode; import org.infinispan.util.concurrent.IsolationLevel; @@ -81,7 +80,7 @@ public void testReadLoadsLocal(ReadMethod method) throws Exception { @Test(dataProvider = "owningModeAndReadMethod") public void testReadsAfterMods(boolean isOwner, ReadMethod method) throws Exception { - Object KEY = getKey(isOwner); + Object KEY = getKey(isOwner, DIST); cache(0, DIST).put(KEY, "a"); tm.begin(); @@ -94,7 +93,7 @@ public void testReadsAfterMods(boolean isOwner, ReadMethod method) throws Except @Test(dataProvider = "owningModeAndReadWrites") public void testReadWriteAfterMods(boolean isOwner, WriteMethod method) throws Exception { - Object KEY = getKey(isOwner); + Object KEY = getKey(isOwner, DIST); cache(0, DIST).put(KEY, "a"); tm.begin(); @@ -108,7 +107,7 @@ public void testReadWriteAfterMods(boolean isOwner, WriteMethod method) throws E } public void testNonFunctionalReadsAfterMods() throws Exception { - Object KEY = getKey(false); + Object KEY = getKey(false, DIST); cache(0, DIST).put(KEY, "a"); tm.begin(); @@ -151,7 +150,7 @@ public void testNonFunctionalReadsAfterMods() throws Exception { @Test(dataProvider = "owningModeAndReadWrites") public void testWriteModsInTxContext(boolean isOwner, WriteMethod method) throws Exception { - Object KEY = getKey(isOwner); + Object KEY = getKey(isOwner, DIST); cache(0, DIST).put(KEY, "a"); tm.begin(); @@ -180,15 +179,15 @@ private static SerializableFunction @Test(dataProvider = "owningModeAndReadMethod") public void testReadOnMissingValues(boolean isOwner, ReadMethod method) throws Exception { - testReadOnMissingValue(getKeys(isOwner, NUM_KEYS), ro, method); + testReadOnMissingValues(getKeys(isOwner, NUM_KEYS), ro, method); } @Test(dataProvider = "readMethods") public void testReadOnMissingValuesLocal(ReadMethod method) throws Exception { - testReadOnMissingValue(INT_KEYS, ReadOnlyMapImpl.create(fmapL1), method); + testReadOnMissingValues(INT_KEYS, lro, method); } - private void testReadOnMissingValue(K[] keys, FunctionalMap.ReadOnlyMap ro, ReadMethod method) throws Exception { + private void testReadOnMissingValues(K[] keys, FunctionalMap.ReadOnlyMap ro, ReadMethod method) throws Exception { tm.begin(); for (K key : keys) { Assert.assertEquals(ro.eval(key, view -> view.find().isPresent()).join(), Boolean.FALSE); diff --git a/core/src/test/java/org/infinispan/functional/FunctionalWriteSkewInMemoryTest.java b/core/src/test/java/org/infinispan/functional/FunctionalWriteSkewInMemoryTest.java index 8a974fee7c91..a4b9e41e8604 100644 --- a/core/src/test/java/org/infinispan/functional/FunctionalWriteSkewInMemoryTest.java +++ b/core/src/test/java/org/infinispan/functional/FunctionalWriteSkewInMemoryTest.java @@ -46,7 +46,7 @@ public static Object[][] readCombos() { @Test(dataProvider = "readCombos") public void testWriteSkew(boolean isOwner, ReadOp op1, ReadOp op2) throws Throwable { - Object key = getKey(isOwner); + Object key = getKey(isOwner, DIST); cache(0, DIST).put(key, "value0"); tm.begin(); diff --git a/core/src/test/java/org/infinispan/functional/stress/ReadOnlyManyCommandStressTest.java b/core/src/test/java/org/infinispan/functional/stress/ReadOnlyManyCommandStressTest.java index e71a425b2257..f2abe564e955 100644 --- a/core/src/test/java/org/infinispan/functional/stress/ReadOnlyManyCommandStressTest.java +++ b/core/src/test/java/org/infinispan/functional/stress/ReadOnlyManyCommandStressTest.java @@ -16,7 +16,7 @@ import org.testng.annotations.Test; @Test(groups = "stress", testName = "ReadOnlyManyCommandStressTest", timeOut = 15*60*1000) -@InCacheMode(CacheMode.DIST_SYNC) +@InCacheMode(CacheMode.SCATTERED_SYNC) public class ReadOnlyManyCommandStressTest extends GetAllCommandStressTest { @Override diff --git a/core/src/test/java/org/infinispan/scattered/ScatteredSyncFuncTest.java b/core/src/test/java/org/infinispan/scattered/ScatteredSyncFuncTest.java index a73f3385d8ab..afae07d0495a 100644 --- a/core/src/test/java/org/infinispan/scattered/ScatteredSyncFuncTest.java +++ b/core/src/test/java/org/infinispan/scattered/ScatteredSyncFuncTest.java @@ -3,9 +3,6 @@ import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertNotNull; -import static org.testng.AssertJUnit.fail; - -import org.infinispan.commons.CacheException; import org.infinispan.configuration.cache.CacheMode; import org.infinispan.container.entries.InternalCacheEntry; import org.infinispan.distribution.DistSyncFuncTest; @@ -63,8 +60,7 @@ public void testCompute() { // removing from owner assertEquals(null, cache(0, cacheName).compute(otherKey, (k, v) -> "x".equals(v) ? null : "unexpected")); assertLocalValue(0, otherKey, null); - assertLocalValue(1, otherKey, null); - assertNoLocalValue(2, otherKey); + // we don't know which node became backup of the tombstone assertOwnershipAndNonOwnership(otherKey, false); // on tombstone, from owner @@ -119,7 +115,7 @@ public void testComputeIfAbsent() { cache(1, cacheName).put(key, "a"); // from non-owner, non-last writer - cache(2, cacheName).computeIfAbsent(key, k -> "b"); + assertEquals("a", cache(2, cacheName).computeIfAbsent(key, k -> "b")); assertLocalValue(0, key, "a"); assertLocalValue(1, key, "a"); assertNoLocalValue(2, key); @@ -127,7 +123,7 @@ public void testComputeIfAbsent() { assertOwnershipAndNonOwnership(key, false); // from non-owner, last writer - cache(1, cacheName).computeIfAbsent(key, k -> "c"); + assertEquals("a", cache(1, cacheName).computeIfAbsent(key, k -> "c")); assertLocalValue(0, key, "a"); assertLocalValue(1, key, "a"); assertNoLocalValue(2, key); @@ -135,7 +131,7 @@ public void testComputeIfAbsent() { assertOwnershipAndNonOwnership(key, false); // from owner - cache(2, cacheName).computeIfAbsent(key, k -> "d"); + assertEquals("a", cache(2, cacheName).computeIfAbsent(key, k -> "d")); assertLocalValue(0, key, "a"); assertLocalValue(1, key, "a"); assertNoLocalValue(2, key); @@ -178,7 +174,50 @@ public void testComputeIfAbsent() { assertLocalValue(1, otherKey, "x"); assertNoLocalValue(2, otherKey); assertOwnershipAndNonOwnership(key, false); + } + + public void testMerge() { + MagicKey key = new MagicKey(cache(0, cacheName)); + cache(1, cacheName).put(key, "a"); + + // from non-owner and non-last writer + assertEquals("ab", this.cache(2, cacheName).merge(key, "b", (o, n) -> o + n)); + assertLocalValue(0, key, "ab"); + assertLocalValue(1, key, "a"); + assertLocalValue(2, key, "ab"); + assertOwnershipAndNonOwnership(key, false); + + // from owner + assertEquals("abc", this.cache(0, cacheName).merge(key, "c", (o, n) -> o + n)); + assertLocalValue(0, key, "abc"); + // we don't know which node become backup + assertOwnershipAndNonOwnership(key, false); + // removing from non-owner + assertEquals(null, cache(1, cacheName).merge(key, "x", (o, n) -> "abc".equals(o) ? null : "unexpected")); + assertLocalValue(0, key, null); + assertLocalValue(1, key, null); + assertLocalValue(2, key, "ab"); + assertOwnershipAndNonOwnership(key, false); + + MagicKey otherKey = new MagicKey(cache(0, cacheName)); + + // on non-existent value, non-owner - should work as putIfAbsent + assertEquals("x", cache(1, cacheName).merge(otherKey, "x", (o, n) -> "unexpected")); + assertLocalValue(0, otherKey, "x"); + assertLocalValue(1, otherKey, "x"); + assertNoLocalValue(2, otherKey); + assertOwnershipAndNonOwnership(otherKey, false); + + // removing from owner + assertEquals(null, cache(0, cacheName).merge(otherKey, "y", (o, n) -> "x".equals(o) ? null : "unexpected")); + assertLocalValue(0, otherKey, null); + assertOwnershipAndNonOwnership(otherKey, false); + + // on tombstone, from owner + assertEquals("z", cache(0, cacheName).merge(otherKey, "z", (o, n) -> "unexpected")); + assertLocalValue(0, otherKey, "z"); + assertOwnershipAndNonOwnership(otherKey, false); } protected void assertNoLocalValue(int node, MagicKey key) { @@ -191,19 +230,4 @@ protected void assertLocalValue(int node, MagicKey key, String expectedValue) { assertNotNull(ice); assertEquals(expectedValue, ice.getValue()); } - - @Override - public void testMergeFromNonOwner() { - // TODO : Add support for ScatteredCaches in functional commands : https://issues.jboss.org/browse/ISPN-8078 - RuntimeException mergeException = new RuntimeException("hi there"); - - try { - getFirstNonOwner("k1").merge("k1", "ex", (k, v) -> { - throw mergeException; - }); - fail("Exception was not thrown"); - } catch (CacheException ex) { - assertEquals(UnsupportedOperationException.class, ex.getCause().getClass()); - } - } }