Skip to content

Commit

Permalink
ISPN-9332 REPL local iteration optimization cannot be used when store
Browse files Browse the repository at this point in the history
has write behind

* Also fixed async store segment filtering
* Also fixed clear losing later updates
  • Loading branch information
wburns authored and tristantarrant committed Sep 13, 2018
1 parent f0790f6 commit d372489
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 10 deletions.
Expand Up @@ -37,20 +37,24 @@ public Publisher<K> publishKeys(Predicate<? super K> filter) {
return advancedLoader().publishKeys(filter); return advancedLoader().publishKeys(filter);
} }


Publisher<K> modPublisher = Flowable.fromIterable(modificationMap.entrySet()) Flowable<K> modFlowable = Flowable.fromIterable(modificationMap.entrySet())
// REMOVE we ignore, LIST and CLEAR aren't possible // REMOVE we ignore, LIST and CLEAR aren't possible
.filter(me -> Modification.Type.STORE == me.getValue().getType()) .filter(me -> Modification.Type.STORE == me.getValue().getType())
.map(e -> (K) e.getKey()); .map(e -> (K) e.getKey());


if (filter != null) {
modFlowable = modFlowable.filter(filter::test);
}

if (hadClear.get() == Boolean.TRUE) { if (hadClear.get() == Boolean.TRUE) {
return modPublisher; return modFlowable;
} }
if (filter == null) { if (filter == null) {
filter = k -> !modificationMap.containsKey(k); filter = k -> !modificationMap.containsKey(k);
} else { } else {
filter = filter.and(k -> !modificationMap.containsKey(k)); filter = filter.and(k -> !modificationMap.containsKey(k));
} }
return Flowable.merge(modPublisher, advancedLoader().publishKeys(filter)); return Flowable.merge(modFlowable, advancedLoader().publishKeys(filter));
} }


@Override @Override
Expand All @@ -61,24 +65,29 @@ public Publisher<MarshalledEntry<K, V>> publishEntries(Predicate<? super K> filt
if (modificationMap.isEmpty()) { if (modificationMap.isEmpty()) {
return advancedLoader().publishEntries(filter, fetchValue, fetchMetadata); return advancedLoader().publishEntries(filter, fetchValue, fetchMetadata);
} }
Publisher<MarshalledEntry<K, V>> modPublisher = Flowable.fromIterable(modificationMap.entrySet()) Flowable<MarshalledEntry<K, V>> modFlowable = Flowable.fromIterable(modificationMap.entrySet())
.map(Map.Entry::getValue) .map(Map.Entry::getValue)
// REMOVE we ignore, LIST and CLEAR aren't possible // REMOVE we ignore, LIST and CLEAR aren't possible
.filter(e -> Modification.Type.STORE == e.getType()) .filter(e -> Modification.Type.STORE == e.getType())
.cast(Store.class) .cast(Store.class)
.map(Store::getStoredValue); .map(Store::getStoredValue);


if (filter != null) {
Predicate<? super K> modificationFilter = filter;
modFlowable = modFlowable.filter(e -> modificationFilter.test(e.getKey()));
}

// If we encountered a clear just ignore the actual store // If we encountered a clear just ignore the actual store
if (hadClear.get() == Boolean.TRUE) { if (hadClear.get() == Boolean.TRUE) {
return modPublisher; return modFlowable;
} }
if (filter == null) { if (filter == null) {
filter = k -> !modificationMap.containsKey(k); filter = k -> !modificationMap.containsKey(k);
} else { } else {
// Only use entry if it wasn't in modification map and passes filter // Only use entry if it wasn't in modification map and passes filter
filter = filter.and(k -> !modificationMap.containsKey(k)); filter = filter.and(k -> !modificationMap.containsKey(k));
} }
return Flowable.merge(modPublisher, advancedLoader().publishEntries(filter, fetchValue, fetchMetadata)); return Flowable.merge(modFlowable, advancedLoader().publishEntries(filter, fetchValue, fetchMetadata));
} }


@Override @Override
Expand Down
Expand Up @@ -70,11 +70,12 @@ else if (state.clear)
Map<Object, Modification> flattenModifications(ByRef<Boolean> containsClear) { Map<Object, Modification> flattenModifications(ByRef<Boolean> containsClear) {
Map<Object, Modification> map = new HashMap<>(); Map<Object, Modification> map = new HashMap<>();
for (State state = this; state != null; state = state.next) { for (State state = this; state != null; state = state.next) {
// Make sure to add these before checking clear - as these are write operations done after the clear
state.modifications.forEach(map::putIfAbsent);
if (state.clear) { if (state.clear) {
containsClear.set(Boolean.TRUE); containsClear.set(Boolean.TRUE);
break; break;
} }
state.modifications.forEach(map::putIfAbsent);
} }
return map; return map;
} }
Expand Down
Expand Up @@ -56,6 +56,7 @@
import org.infinispan.commons.util.IntSet; import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets; import org.infinispan.commons.util.IntSets;
import org.infinispan.commons.util.IteratorMapper; import org.infinispan.commons.util.IteratorMapper;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.distribution.DistributionManager; import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.LocalizedCacheTopology; import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.distribution.ch.ConsistentHash; import org.infinispan.distribution.ch.ConsistentHash;
Expand Down Expand Up @@ -93,6 +94,8 @@ public class DistributedCacheStream<Original, R> extends AbstractCacheStream<Ori


private static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass()); private static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass());


private final boolean writeBehind;

// This is a hack to allow for cast to work properly, since Java doesn't work as well with nested generics // This is a hack to allow for cast to work properly, since Java doesn't work as well with nested generics
protected static <R> Supplier<CacheStream<R>> supplierStreamCast(Supplier supplier) { protected static <R> Supplier<CacheStream<R>> supplierStreamCast(Supplier supplier) {
return supplier; return supplier;
Expand All @@ -118,6 +121,9 @@ public DistributedCacheStream(Address localAddress, boolean parallel, Distributi
int distributedBatchSize, Executor executor, ComponentRegistry registry, Function<? super Original, ?> toKeyFunction) { int distributedBatchSize, Executor executor, ComponentRegistry registry, Function<? super Original, ?> toKeyFunction) {
super(localAddress, parallel, dm, supplierStreamCast(supplier), csm, includeLoader, distributedBatchSize, super(localAddress, parallel, dm, supplierStreamCast(supplier), csm, includeLoader, distributedBatchSize,
executor, registry, toKeyFunction); executor, registry, toKeyFunction);

Configuration configuration = registry.getComponent(Configuration.class);
writeBehind = configuration.persistence().usingAsyncStore();
} }


/** /**
Expand All @@ -127,6 +133,9 @@ public DistributedCacheStream(Address localAddress, boolean parallel, Distributi
*/ */
protected DistributedCacheStream(AbstractCacheStream other) { protected DistributedCacheStream(AbstractCacheStream other) {
super(other); super(other);

Configuration configuration = registry.getComponent(Configuration.class);
writeBehind = configuration.persistence().usingAsyncStore();
} }


@Override @Override
Expand Down Expand Up @@ -629,10 +638,15 @@ <S> CloseableIterator<S> nonRehashRemoteIterator(ConsistentHash ch, IntSet segme


if (ch.getMembers().contains(localAddress)) { if (ch.getMembers().contains(localAddress)) {
IntSet ownedSegments = IntSets.from(ch.getSegmentsForOwner(localAddress)); IntSet ownedSegments = IntSets.from(ch.getSegmentsForOwner(localAddress));
if (segmentsToFilter == null) { if (writeBehind) {
stayLocal = ownedSegments.size() == ch.getNumSegments(); // When write behind is enabled - we can't do stay local optimization
stayLocal = false;
} else { } else {
stayLocal = ownedSegments.containsAll(segmentsToFilter); if (segmentsToFilter == null) {
stayLocal = ownedSegments.size() == ch.getNumSegments();
} else {
stayLocal = ownedSegments.containsAll(segmentsToFilter);
}
} }


Publisher<S> innerPublisher = localPublisher(segmentsToFilter, ch, Publisher<S> innerPublisher = localPublisher(segmentsToFilter, ch,
Expand Down
@@ -0,0 +1,127 @@
package org.infinispan.persistence.support;

import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertFalse;
import static org.testng.AssertJUnit.assertTrue;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.infinispan.Cache;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.distribution.MagicKey;
import org.infinispan.persistence.dummy.DummyInMemoryStoreConfigurationBuilder;
import org.infinispan.test.MultipleCacheManagersTest;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
* @author wburns
* @since 9.4
*/
@Test(groups = "functional", testName = "persistence.support.ReplAsyncStoreTest")
public class ReplAsyncStoreTest extends MultipleCacheManagersTest {
protected final String CACHE_NAME = "testCache";

private boolean shared;

ReplAsyncStoreTest shared(boolean shared) {
this.shared = shared;
return this;
}

enum Op {
SIZE {
@Override
void perform(MagicKey key, Cache<MagicKey, String> cache) {
assertEquals(1, cache.size());
}
},
KEY_ITERATOR {
@Override
void perform(MagicKey key, Cache<MagicKey, String> cache) {
Iterator iterator = cache.keySet().iterator();
assertTrue(iterator.hasNext());
assertEquals(key, iterator.next());
assertFalse(iterator.hasNext());
}
},
ENTRY_COLLECT {
@Override
void perform(MagicKey key, Cache<MagicKey, String> cache) {
List<Map.Entry<MagicKey, String>> list = cache.entrySet().stream().collect(() -> Collectors.toList());
assertEquals(1, list.size());
assertEquals(key, list.get(0).getKey());
}
}
;

abstract void perform(MagicKey key, Cache<MagicKey, String> cache);
}

@Override
public Object[] factory() {
return new Object[] {
new ReplAsyncStoreTest().shared(false),
new ReplAsyncStoreTest().shared(true),
};
}

@Override
protected String[] parameterNames() {
return new String[] { "shared" };
}

@Override
protected Object[] parameterValues() {
return new Object[] { shared };
}

@Override
protected void createCacheManagers() throws Throwable {
ConfigurationBuilder cfg = getDefaultClusteredCacheConfig(CacheMode.REPL_SYNC, true);

cfg.persistence()
.addStore(DummyInMemoryStoreConfigurationBuilder.class)
.slow(true)
.storeName(shared ? ReplAsyncStoreTest.class.getName() : null)
.shared(shared)
.async().enable();

createClusteredCaches(3, CACHE_NAME, cfg);
waitForClusterToForm(CACHE_NAME);
}

@DataProvider(name = "async-ops")
public Object[][] asyncOperationProvider() {
// Now smash all those ops with a true and false for using primary
return Stream.of(Op.values()).flatMap(op ->
Stream.of(true, false).map(bool -> new Object[] { bool, op }))
.toArray(Object[][]::new);
}

@Test(dataProvider = "async-ops")
public void testOperationAfterInsertAndEvict(boolean primary, Op consumer) {
Cache<MagicKey, String> primaryOwner = cache(0, CACHE_NAME);
MagicKey key = getKeyForCache(primaryOwner);
// The actual store write is delayed 100 ms
primaryOwner.put(key, "some-value");

Cache<MagicKey, String> cacheToUse;

if (primary) {
cacheToUse = primaryOwner;
} else {
cacheToUse = cache(1, CACHE_NAME);
}

// This messes up some things - so evict so it is only in store (which shouldn't be written yet)
cacheToUse.evict(key);

consumer.perform(key, cacheToUse);
}
}

0 comments on commit d372489

Please sign in to comment.