Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[14.0.x] ISPN-15758 Publisher should fetch data from remote nodes #12062

Merged
merged 2 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ public static boolean containsAny(long bitSet, long testBitSet) {
return (bitSet & testBitSet) != 0;
}

/**
 * Checks that the two bit sets have no bits in common.
 *
 * @param bitSet the bit set to inspect.
 * @param testBitSet the bits to look for.
 * @return {@code true} when none of the bits in {@code testBitSet} is present in {@code bitSet}.
 */
public static boolean noneOf(long bitSet, long testBitSet) {
   return (testBitSet & bitSet) == 0L;
}

/**
 * Counts how many bits are set in the given bit set.
 *
 * @param bitSet the bit set to inspect.
 * @return the number of one-bits in {@code bitSet}.
 */
public static int bitSetSize(long bitSet) {
   // Population count via the JDK intrinsic.
   final int setBits = Long.bitCount(bitSet);
   return setBits;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ private <I, R> void startKeyPublisher(boolean parallelPublisher, IntSet segments
}

if (localKeys != null) {
DeliveryGuarantee guarantee = deliveryToUse(null, deliveryGuarantee);
DeliveryGuarantee guarantee = deliveryToUse(null, deliveryGuarantee, explicitFlags);
CompletionStage<PublisherResult<R>> localStage = composedType.localInvocation(parallelPublisher, null,
localKeys, null, explicitFlags, guarantee, transformer, finalizer);

Expand Down Expand Up @@ -331,7 +331,7 @@ private <I, R> void startSegmentPublisher(boolean parallelPublisher, IntSet segm
FlowableProcessor<R> flowableProcessor) {
LocalizedCacheTopology topology = distributionManager.getCacheTopology();
Address localAddress = topology.getLocalAddress();
Map<Address, IntSet> targets = determineSegmentTargets(topology, segments, localAddress);
Map<Address, IntSet> targets = determineSegmentTargets(topology, segments, localAddress, explicitFlags);

int targetSize = targets.size();

Expand Down Expand Up @@ -376,7 +376,7 @@ private <I, R> void startSegmentPublisher(boolean parallelPublisher, IntSet segm
}

if (localSegments != null) {
DeliveryGuarantee guarantee = deliveryToUse(null, deliveryGuarantee);
DeliveryGuarantee guarantee = deliveryToUse(null, deliveryGuarantee, explicitFlags);
CompletionStage<PublisherResult<R>> localStage = composedType.localInvocation(parallelPublisher, localSegments,
null, keysToExcludeByAddress.get(localAddress), explicitFlags, guarantee, transformer, finalizer);

Expand Down Expand Up @@ -668,8 +668,8 @@ protected PublisherResult<R> addException(Address sender, Exception exception) {
}
}

private Map<Address, IntSet> determineSegmentTargets(LocalizedCacheTopology topology, IntSet segments, Address localAddress) {
if ((sharedStore || replicatedCache) && !writeBehindShared) {
private Map<Address, IntSet> determineSegmentTargets(LocalizedCacheTopology topology, IntSet segments, Address localAddress, long explicitFlags) {
if (skipRemoteInvocation(explicitFlags)) {
// A shared store without write behind will have all values available on all nodes, so just do local lookup
var map = new HashMap<Address, IntSet>();
map.put(localAddress, segments == null ? IntSets.immutableRangeSet(maxSegment) : segments);
Expand Down Expand Up @@ -1013,7 +1013,7 @@ private <E> Flowable<E> getValuesFlowable(BiFunction<InnerPublisherSubscription.
int currentTopology = topology.getTopologyId();
this.currentTopology = currentTopology;
Address localAddress = rpcManager.getAddress();
Map<Address, IntSet> targets = determineSegmentTargets(topology, segmentsToComplete, localAddress);
Map<Address, IntSet> targets = determineSegmentTargets(topology, segmentsToComplete, localAddress, publisher.explicitFlags);
if (previousTopology != -1 && previousTopology == currentTopology || targets.isEmpty()) {
int nextTopology = currentTopology + 1;
if (log.isTraceEnabled()) {
Expand Down Expand Up @@ -1113,7 +1113,7 @@ CompletionStage<PublisherResponse> sendInitialCommand(Address target, IntSet seg
target, segments);
}
boolean local = target == rpcManager.getAddress();
InitialPublisherCommand cmd = publisher.buildInitialCommand(target, requestId, segments, excludedKeys, batchSize,
InitialPublisherCommand cmd = publisher.buildInitialCommand(local ? null : target, requestId, segments, excludedKeys, batchSize,
local && useContext.getAndSet(false));
if (cmd == null) {
return CompletableFuture.completedFuture(PublisherResponse.emptyResponse(segments, null));
Expand Down Expand Up @@ -1333,7 +1333,7 @@ InitialPublisherCommand buildInitialCommand(Address target, String requestId, In
functionToUse = transformer;
}

DeliveryGuarantee guarantee = deliveryToUse(target, deliveryGuarantee);
DeliveryGuarantee guarantee = deliveryToUse(target, deliveryGuarantee, explicitFlags);

return commandsFactory.buildInitialPublisherCommand(requestId, guarantee,
batchSize, segments, keysToUse, excludedKeys, explicitFlags, composedType.isEntry(), shouldTrackKeys,
Expand Down Expand Up @@ -1368,16 +1368,32 @@ InitialPublisherCommand buildInitialCommand(Address target, String requestId, In
functionToUse = transformer;
}

DeliveryGuarantee guarantee = deliveryToUse(target, deliveryGuarantee);
DeliveryGuarantee guarantee = deliveryToUse(target, deliveryGuarantee, explicitFlags);

return commandsFactory.buildInitialPublisherCommand(requestId, guarantee,
batchSize, segments, null, excludedKeys, explicitFlags, composedType.isEntry(), shouldTrackKeys, functionToUse);
}
}

private DeliveryGuarantee deliveryToUse(Address target, DeliveryGuarantee desiredGuarantee) {
private DeliveryGuarantee deliveryToUse(Address target, DeliveryGuarantee desiredGuarantee, long explicitFlags) {
// When the target is the local node and we have a shared store that doesn't have write behind we don't
// need any special delivery guarantee as our store will hold all entries
return target == null && ((sharedStore || replicatedCache) && !writeBehindShared) ? DeliveryGuarantee.AT_MOST_ONCE : desiredGuarantee;
return target == null && skipRemoteInvocation(explicitFlags) ? DeliveryGuarantee.AT_MOST_ONCE : desiredGuarantee;
}


/**
 * Returns {@code true} if the publisher can compute its result without contacting the other nodes.
 * <p>
 * A replicated cache holds every entry on every node, and a shared store also holds all data locally,
 * so no RPC is needed. However, when {@link org.infinispan.context.Flag#SKIP_CACHE_LOAD} is set the
 * shared store may not be read, so the shared-store shortcut does not apply.
 *
 * @param explicitFlags The flags used by the command.
 * @return {@code true} if it can perform everything locally.
 */
private boolean skipRemoteInvocation(long explicitFlags) {
   // Write-behind may still have un-flushed data sitting in remote nodes' queues,
   // so the local view of a shared store cannot be trusted to be complete.
   if (writeBehindShared) {
      return false;
   }
   if (replicatedCache) {
      return true;
   }
   // The shared store only covers all data when the command is allowed to read from it.
   return sharedStore && EnumUtil.noneOf(explicitFlags, FlagBitSets.SKIP_CACHE_LOAD);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
import static org.testng.AssertJUnit.assertNull;
import static org.testng.AssertJUnit.assertTrue;

import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.stream.IntStream;

import org.infinispan.Cache;
import org.infinispan.configuration.cache.CacheMode;
Expand Down Expand Up @@ -119,7 +123,43 @@ public void testSize() {
assertStoreStatInvocationEquals(cache0, "size", 1);
}

private void assertStoreStatInvocationEquals(Cache<?, ?> cache, String invocationName, int invocationCount) {
public void testIterationWithSkipCacheLoad(Method method) {
   // Populate the cache with 10 entries.
   for (int i = 0; i < 10; ++i) {
      cache(0).put(TestingUtil.k(method, i), TestingUtil.v(method, i));
   }

   // Evict the last 5 entries from every node, leaving them only in the stores.
   for (int i = 5; i < 10; ++i) {
      for (var c : caches()) {
         c.evict(TestingUtil.k(method, i));
      }
   }

   // Every store must still hold all 10 entries after the evictions.
   for (var store : TestingUtil.cachestores(caches())) {
      assertEquals(10, store.size());
   }

   // With SKIP_CACHE_LOAD only the 5 in-memory entries are visible; without it, all 10 are.
   assertIterationData(method, this.<String, String>cache(0).getAdvancedCache().withFlags(Flag.SKIP_CACHE_LOAD), 5);
   assertIterationData(method, cache(0), 10);
}

/**
 * Asserts that {@code size()}, {@code entrySet()}, {@code keySet()} and {@code values()} of the given
 * cache all expose exactly the first {@code expectedSize} generated key/value pairs.
 */
private void assertIterationData(Method method, Cache<String, String> cache, int expectedSize) {
   assertEquals(expectedSize, cache.size());

   // entrySet() must contain exactly the expected key/value pairs.
   var entries = new HashMap<>();
   cache.entrySet().forEach(e -> entries.put(e.getKey(), e.getValue()));
   assertEquals("Wrong size: " + entries, expectedSize, entries.size());
   for (int i = 0; i < expectedSize; ++i) {
      assertEquals("Wrong key " + i, TestingUtil.v(method, i), entries.get(TestingUtil.k(method, i)));
   }

   // keySet() must contain exactly the expected keys.
   var keys = new HashSet<>(cache.keySet());
   assertEquals("Wrong size: " + keys, expectedSize, keys.size());
   for (int i = 0; i < expectedSize; ++i) {
      assertTrue("Wrong key " + i, keys.contains(TestingUtil.k(method, i)));
   }

   // values() must contain exactly the expected values.
   var values = new HashSet<>(cache.values());
   assertEquals("Wrong size: " + values, expectedSize, values.size());
   for (int i = 0; i < expectedSize; ++i) {
      assertTrue("Wrong value " + i, values.contains(TestingUtil.v(method, i)));
   }
}

/**
 * Asserts that the cache's first (dummy) store recorded exactly {@code invocationCount}
 * invocations of the operation named {@code invocationName}.
 */
private void assertStoreStatInvocationEquals(Cache<String, String> cache, String invocationName, int invocationCount) {
   DummyInMemoryStore store = TestingUtil.getFirstStore(cache);
   int actual = store.stats().get(invocationName).intValue();
   assertEquals(invocationCount, actual);
}
Expand Down