ISPN-8865 Move AdvancedCacheLoader over to using Publisher instead of process

* Convert LuceneCacheLoader
wburns authored and danberindei committed May 11, 2018
1 parent 30c335b commit a8d5c95
Showing 3 changed files with 28 additions and 44 deletions.
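For context, the hunks below replace the callback-driven process contract with a Reactive Streams Publisher. A minimal before/after sketch of the AdvancedCacheLoader method shape, reconstructed from the signatures in this diff (abbreviated, not the full interface):

// Before: the store drives iteration through a CacheLoaderTask on a caller-supplied Executor
void process(KeyFilter filter, CacheLoaderTask task, Executor executor, boolean fetchValue, boolean fetchMetadata);

// After: the store returns a stream of entries that the caller subscribes to
Publisher<MarshalledEntry<K, V>> publishEntries(Predicate<? super K> filter, boolean fetchValue, boolean fetchMetadata);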
@@ -63,7 +63,7 @@ protected DirectoryLoaderAdaptor(final Directory directory, String indexName, in
* @param entriesCollector loaded entries are collected in this set
* @param maxEntries to limit amount of entries loaded
*/
protected void loadAllEntries(final HashSet<MarshalledEntry> entriesCollector, final int maxEntries, StreamingMarshaller marshaller) {
protected <K, V> void loadAllEntries(final Set<MarshalledEntry<K, V>> entriesCollector, final int maxEntries, StreamingMarshaller marshaller) {
int existingElements = entriesCollector.size();
int toLoadElements = maxEntries - existingElements;
if (toLoadElements <= 0) {
@@ -4,24 +4,25 @@
import java.io.IOException;
import java.util.HashSet;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Predicate;

import org.apache.lucene.store.FSDirectory;
import org.infinispan.commons.configuration.ConfiguredBy;
import org.infinispan.executors.ExecutorAllCompletionService;
import org.infinispan.filter.KeyFilter;
import org.infinispan.lucene.IndexScopedKey;
import org.infinispan.lucene.cacheloader.configuration.LuceneLoaderConfiguration;
import org.infinispan.lucene.logging.Log;
import org.infinispan.marshall.core.MarshalledEntry;
import org.infinispan.persistence.PersistenceUtil;
import org.infinispan.persistence.TaskContextImpl;
import org.infinispan.persistence.spi.AdvancedCacheLoader;
import org.infinispan.persistence.spi.InitializationContext;
import org.infinispan.persistence.spi.PersistenceException;
import org.infinispan.util.logging.LogFactory;
import org.reactivestreams.Publisher;

import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

/**
* A CacheLoader meant to load Lucene index(es) from filesystem based Lucene index(es).
@@ -36,7 +37,7 @@
* @since 5.2
*/
@ConfiguredBy(LuceneLoaderConfiguration.class)
public class LuceneCacheLoader implements AdvancedCacheLoader {
public class LuceneCacheLoader<K, V> implements AdvancedCacheLoader<K, V> {

private static final Log log = LogFactory.getLog(LuceneCacheLoader.class, Log.class);

@@ -91,38 +92,22 @@ public boolean contains(final Object key) {
}

@Override
public void process(final KeyFilter filter, final CacheLoaderTask task, Executor executor, boolean fetchValue, boolean fetchMetadata) {
scanForUnknownDirectories();
ExecutorAllCompletionService eacs = new ExecutorAllCompletionService(executor);

final TaskContextImpl taskContext = new TaskContextImpl();
for (final DirectoryLoaderAdaptor dir : openDirectories.values()) {
eacs.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
try {
final HashSet<MarshalledEntry> allInternalEntries = new HashSet<>();
dir.loadAllEntries(allInternalEntries, Integer.MAX_VALUE, ctx.getMarshaller());
for (MarshalledEntry me : allInternalEntries) {
if (taskContext.isStopped())
break;
if (filter == null || filter.accept(me.getKey())) {
task.processEntry(me, taskContext);
}
}
return null;
}
catch (Exception e) {
log.errorExecutingParallelStoreTask(e);
throw e;
}
}
});
}
eacs.waitUntilAllCompleted();
if (eacs.isExceptionThrown()) {
throw new PersistenceException("Execution exception!", eacs.getFirstException());
}
public Publisher<MarshalledEntry<K, V>> publishEntries(Predicate<? super K> filter, boolean fetchValue, boolean fetchMetadata) {
return Flowable.defer(() -> {
// Make sure that we update directories before we start iterating upon directories
scanForUnknownDirectories();
return Flowable.fromIterable(openDirectories.values());
})
// We parallelize this since the loading below is blocking
.parallel()
.runOn(Schedulers.from(ctx.getExecutor()))
.flatMap(dir -> {
final Set<MarshalledEntry<K, V>> allInternalEntries = new HashSet<>();
dir.loadAllEntries(allInternalEntries, Integer.MAX_VALUE, ctx.getMarshaller());
return Flowable.fromIterable(allInternalEntries);
})
.filter(me -> filter == null || filter.test(me.getKey()))
.sequential();
}

@Override
@@ -132,7 +117,7 @@ public int size() {

/**
* There might be Directories we didn't store yet in the openDirectories Map.
* Make sure they are all initialized before serving methods such as {@link #process(KeyFilter, org.infinispan.persistence.spi.AdvancedCacheLoader.CacheLoaderTask, java.util.concurrent.Executor, boolean, boolean)}
* Make sure they are all initialized before serving methods such as {@link #publishEntries(Predicate, boolean, boolean)}
*/
private void scanForUnknownDirectories() {
File[] filesInRoot = rootDirectory.listFiles();
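With the main class converted, a caller consumes the loader reactively instead of supplying a task and executor. A minimal usage sketch, assuming an initialized LuceneCacheLoader<K, V> instance named loader (hypothetical variable, not part of this commit), using the RxJava 2 Flowable type imported above:

Flowable.fromPublisher(loader.publishEntries(null, true, true))
      .map(MarshalledEntry::getKey)
      .toList()
      .subscribe(keys -> System.out.println("Loaded " + keys.size() + " index keys"));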
@@ -12,7 +12,6 @@
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.container.InternalEntryFactory;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.filter.CollectionKeyFilter;
import org.infinispan.lucene.ChunkCacheKey;
import org.infinispan.lucene.FileCacheKey;
import org.infinispan.lucene.FileListCacheKey;
@@ -97,7 +96,7 @@ public void testLoadAllKeysWithExclusion() throws Exception {
exclusionSet.add(key);
}

keyList = PersistenceUtil.toKeySet(cacheLoader, new CollectionKeyFilter(exclusionSet));
keyList = PersistenceUtil.toKeySet(cacheLoader, k -> !exclusionSet.contains(k));

AssertJUnit.assertEquals((initialCount - fileNamesFromIndexDir.length), keyList.size());

Expand Down Expand Up @@ -252,7 +251,7 @@ public void testLoadAllKeysWithExclusionOfRootKey() throws Exception {
HashSet exclusionSet = new HashSet();
exclusionSet.add(new FileListCacheKey(indexName, segmentId));

keySet = PersistenceUtil.toKeySet(cacheLoader, new CollectionKeyFilter(exclusionSet));
keySet = PersistenceUtil.toKeySet(cacheLoader, k -> !exclusionSet.contains(k));
String[] fileNamesArr = TestHelper.getFileNamesFromDir(rootDir, indexName);
AssertJUnit.assertEquals((initialCount - 1), keySet.size());

Expand All @@ -275,7 +274,7 @@ public void testLoadAllKeysWithChunkExclusion() throws Exception {
Set keyList = PersistenceUtil.toKeySet(cacheLoader, null);
checkIfExists(keyList, exclusionSet, true, false);

keyList = PersistenceUtil.toKeySet(cacheLoader, new CollectionKeyFilter(exclusionSet));
keyList = PersistenceUtil.toKeySet(cacheLoader, k -> !exclusionSet.contains(k));
checkIfExists(keyList, exclusionSet, false, true);
}

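The test hunks above swap CollectionKeyFilter for a plain java.util.function.Predicate, which is the same shape publishEntries now accepts. A sketch, assuming the exclusionSet and cacheLoader from the tests, of applying that exclusion directly against the publisher (hypothetical usage, not part of this commit):

Predicate<Object> notExcluded = k -> !exclusionSet.contains(k);
long remaining = Flowable.fromPublisher(cacheLoader.publishEntries(notExcluded, false, false))
      .count()
      .blockingGet();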
