Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
ISPN-4621 MassIndexer does not work on replicated cache (with local index)
  • Loading branch information
Gustavo Fernandes authored and anistor committed Jan 15, 2015
1 parent bbf1419 commit 63539d6
Show file tree
Hide file tree
Showing 23 changed files with 991 additions and 209 deletions.
Expand Up @@ -55,7 +55,8 @@
import org.infinispan.query.impl.externalizers.LuceneTermQueryExternalizer; import org.infinispan.query.impl.externalizers.LuceneTermQueryExternalizer;
import org.infinispan.query.impl.externalizers.LuceneTopDocsExternalizer; import org.infinispan.query.impl.externalizers.LuceneTopDocsExternalizer;
import org.infinispan.query.impl.externalizers.LuceneTopFieldDocsExternalizer; import org.infinispan.query.impl.externalizers.LuceneTopFieldDocsExternalizer;
import org.infinispan.query.impl.massindex.MapReduceMassIndexer; import org.infinispan.query.impl.massindex.DistributedExecutorMassIndexer;
import org.infinispan.query.impl.massindex.IndexWorker;
import org.infinispan.query.logging.Log; import org.infinispan.query.logging.Log;
import org.infinispan.query.spi.ProgrammaticSearchMappingProvider; import org.infinispan.query.spi.ProgrammaticSearchMappingProvider;
import org.infinispan.transaction.LockingMode; import org.infinispan.transaction.LockingMode;
Expand Down Expand Up @@ -224,7 +225,7 @@ private void registerQueryMBeans(AdvancedCache cache,
.toManageableComponentMetadata(); .toManageableComponentMetadata();
try { try {
// TODO: MassIndexer should be some kind of query cache component? // TODO: MassIndexer should be some kind of query cache component?
MapReduceMassIndexer maxIndexer = new MapReduceMassIndexer(cache, sf); DistributedExecutorMassIndexer maxIndexer = new DistributedExecutorMassIndexer(cache, sf);
ResourceDMBean mbean = new ResourceDMBean(maxIndexer, massIndexerCompMetadata); ResourceDMBean mbean = new ResourceDMBean(maxIndexer, massIndexerCompMetadata);
ObjectName massIndexerObjName = new ObjectName(jmxDomain + ":" ObjectName massIndexerObjName = new ObjectName(jmxDomain + ":"
+ queryGroupName + ",component=" + massIndexerCompMetadata.getJmxObjectName()); + queryGroupName + ",component=" + massIndexerCompMetadata.getJmxObjectName());
Expand Down Expand Up @@ -343,6 +344,7 @@ public void cacheManagerStarting(GlobalComponentRegistry gcr, GlobalConfiguratio
externalizerMap.put(ExternalizerIds.LUCENE_SCORE_DOC, new LuceneScoreDocExternalizer()); externalizerMap.put(ExternalizerIds.LUCENE_SCORE_DOC, new LuceneScoreDocExternalizer());
externalizerMap.put(ExternalizerIds.LUCENE_TOPFIELDDOCS, new LuceneTopFieldDocsExternalizer()); externalizerMap.put(ExternalizerIds.LUCENE_TOPFIELDDOCS, new LuceneTopFieldDocsExternalizer());
externalizerMap.put(ExternalizerIds.LUCENE_QUERY_MATCH_ALL, new LuceneMatchAllQueryExternalizer()); externalizerMap.put(ExternalizerIds.LUCENE_QUERY_MATCH_ALL, new LuceneMatchAllQueryExternalizer());
externalizerMap.put(ExternalizerIds.INDEX_WORKER, new IndexWorker.Externalizer());
} }


} }
Expand Up @@ -20,7 +20,7 @@
import org.infinispan.query.dsl.embedded.LuceneQuery; import org.infinispan.query.dsl.embedded.LuceneQuery;
import org.infinispan.query.dsl.embedded.impl.EmbeddedLuceneQueryFactory; import org.infinispan.query.dsl.embedded.impl.EmbeddedLuceneQueryFactory;
import org.infinispan.query.dsl.embedded.impl.QueryCache; import org.infinispan.query.dsl.embedded.impl.QueryCache;
import org.infinispan.query.impl.massindex.MapReduceMassIndexer; import org.infinispan.query.impl.massindex.DistributedExecutorMassIndexer;
import org.infinispan.query.spi.SearchManagerImplementor; import org.infinispan.query.spi.SearchManagerImplementor;


/** /**
Expand Down Expand Up @@ -121,7 +121,7 @@ public SearchIntegrator getSearchFactory() {
@Override @Override
public MassIndexer getMassIndexer() { public MassIndexer getMassIndexer() {
// TODO: Should a new instance be created every time? // TODO: Should a new instance be created every time?
return new MapReduceMassIndexer(cache, searchFactory); return new DistributedExecutorMassIndexer(cache, searchFactory);
} }


@Override @Override
Expand Down
Expand Up @@ -36,4 +36,7 @@ public interface ExternalizerIds {
Integer FILTER_RESULT = 1611; Integer FILTER_RESULT = 1611;


Integer LUCENE_QUERY_MATCH_ALL = 1612; Integer LUCENE_QUERY_MATCH_ALL = 1612;

Integer INDEX_WORKER = 1613;

} }
@@ -0,0 +1,95 @@
package org.infinispan.query.impl.massindex;

import org.hibernate.search.engine.spi.EntityIndexBinding;
import org.hibernate.search.indexes.spi.IndexManager;
import org.hibernate.search.spi.SearchIntegrator;
import org.infinispan.AdvancedCache;
import org.infinispan.distexec.DefaultExecutorService;
import org.infinispan.distexec.DistributedExecutorService;
import org.infinispan.distexec.DistributedTask;
import org.infinispan.distexec.DistributedTaskBuilder;
import org.infinispan.query.MassIndexer;
import org.infinispan.query.indexmanager.InfinispanIndexManager;
import org.infinispan.query.logging.Log;
import org.infinispan.util.logging.LogFactory;

import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * {@link MassIndexer} that rebuilds the indexes by submitting one {@link IndexWorker} per
 * (indexed type, index manager) pair through a {@link DistributedExecutorService}.
 * <p>
 * Index managers that are instances of {@link InfinispanIndexManager} are treated as shared:
 * the type is purged and flushed by this class, around the distributed work. For every other
 * index manager the worker itself is asked to purge and flush (its {@code flush} argument is
 * {@code true}).
 *
 * @author gustavonalle
 * @since 7.1
 */
public class DistributedExecutorMassIndexer implements MassIndexer {

   private static final Log LOG = LogFactory.getLog(DistributedExecutorMassIndexer.class, Log.class);

   private final AdvancedCache cache;
   private final SearchIntegrator searchIntegrator;
   private final IndexUpdater indexUpdater;

   /**
    * @param cache            cache whose entries are re-indexed and on which tasks are executed
    * @param searchIntegrator source of the indexed types and their index bindings
    */
   public DistributedExecutorMassIndexer(AdvancedCache cache, SearchIntegrator searchIntegrator) {
      this.cache = cache;
      this.searchIntegrator = searchIntegrator;
      this.indexUpdater = new IndexUpdater(cache);
   }

   /**
    * Submits the indexing tasks for every indexed type, waits for all of them and finally
    * flushes the types that use a shared index.
    */
   @Override
   @SuppressWarnings("unchecked")
   public void start() {
      DistributedExecutorService executor = new DefaultExecutorService(cache);
      ArrayList<Future<Void>> futures = new ArrayList<>();
      Deque<Class<?>> toFlush = new LinkedList<>();
      boolean replicated = cache.getCacheConfiguration().clustering().cacheMode().isReplicated();

      for (Class<?> indexedType : searchIntegrator.getIndexedTypes()) {
         EntityIndexBinding indexBinding = searchIntegrator.getIndexBinding(indexedType);
         // Purge (and schedule the flush of) a type at most once. Tasks are submitted
         // asynchronously inside the loop below, so purging the type again for a later index
         // manager could remove documents already written by a task that is still running.
         boolean purged = false;
         for (IndexManager indexManager : indexBinding.getIndexManagers()) {
            String indexName = indexManager.getIndexName();
            boolean shared = isShared(indexManager);
            IndexWorker indexWork;
            if (shared) {
               if (!purged) {
                  indexUpdater.purge(indexedType);
                  toFlush.add(indexedType);
                  purged = true;
               }
               // Purge/flush are handled here, so the worker must not repeat them.
               indexWork = new IndexWorker(indexedType, indexName, false);
            } else {
               indexWork = new IndexWorker(indexedType, indexName, true);
            }
            // NOTE(review): a timeout of 0 presumably means "no timeout" - confirm against
            // the DistributedTaskBuilder documentation.
            DistributedTaskBuilder builder = executor.createDistributedTaskBuilder(indexWork).timeout(0, TimeUnit.NANOSECONDS);
            DistributedTask task = builder.build();
            if (replicated && shared) {
               // Replicated cache with a shared index: a single submission is used instead of
               // submitEverywhere.
               futures.add(executor.submit(task));
            } else {
               futures.addAll(executor.submitEverywhere(task));
            }
         }
      }
      waitForAll(futures);

      for (Class<?> type : toFlush) {
         indexUpdater.flush(type);
      }
   }

   /**
    * Blocks until every future has completed. Task failures are logged, not rethrown; an
    * interruption restores the thread's interrupt status and moves on to the next future.
    */
   private void waitForAll(ArrayList<Future<Void>> futures) {
      for (Future<Void> future : futures) {
         try {
            future.get();
         } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
         } catch (ExecutionException e) {
            LOG.errorExecutingMassIndexer(e);
         }
      }
   }

   /** An index manager is considered shared when it is an {@link InfinispanIndexManager}. */
   private boolean isShared(IndexManager indexManager) {
      return indexManager instanceof InfinispanIndexManager;
   }

}
@@ -0,0 +1,86 @@
package org.infinispan.query.impl.massindex;

import org.hibernate.search.backend.UpdateLuceneWork;
import org.hibernate.search.backend.impl.batch.DefaultBatchBackend;
import org.hibernate.search.bridge.spi.ConversionContext;
import org.hibernate.search.bridge.util.impl.ContextualExceptionBridgeHelper;
import org.hibernate.search.engine.spi.DocumentBuilderIndexedEntity;
import org.hibernate.search.engine.spi.EntityIndexBinding;
import org.hibernate.search.spi.DefaultInstanceInitializer;
import org.hibernate.search.indexes.spi.IndexManager;
import org.hibernate.search.spi.SearchIntegrator;
import org.infinispan.Cache;
import org.infinispan.commons.util.Util;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.query.backend.KeyTransformationHandler;
import org.infinispan.query.backend.QueryInterceptor;
import org.infinispan.query.impl.ComponentRegistryUtils;
import org.infinispan.query.logging.Log;
import org.infinispan.util.logging.LogFactory;

/**
 * Applies batch operations (purge, update, flush) to the indexes of a cache, using the
 * {@link QueryInterceptor} of that cache and a {@link DefaultBatchBackend}.
 *
 * @author gustavonalle
 * @since 7.1
 */
public class IndexUpdater {

   private static final Log LOG = LogFactory.getLog(IndexUpdater.class, Log.class);

   private final SearchIntegrator searchIntegrator;
   private final KeyTransformationHandler keyTransformationHandler;
   private final DefaultBatchBackend defaultBatchBackend;
   private final QueryInterceptor queryInterceptor;

   /**
    * Wires the updater to the query components registered for the given cache.
    *
    * @param cache cache whose query interceptor and component registry are used
    */
   public IndexUpdater(Cache<?, ?> cache) {
      QueryInterceptor interceptor = ComponentRegistryUtils.getQueryInterceptor(cache);
      this.queryInterceptor = interceptor;
      this.searchIntegrator = interceptor.getSearchFactory();
      this.keyTransformationHandler = interceptor.getKeyTransformationHandler();
      ComponentRegistry registry = cache.getAdvancedCache().getComponentRegistry();
      DefaultMassIndexerProgressMonitor progressMonitor =
            new DefaultMassIndexerProgressMonitor(registry.getTimeService());
      this.defaultBatchBackend = new DefaultBatchBackend(searchIntegrator, progressMonitor);
   }

   /**
    * Logs and flushes the batch backend for the given entity type.
    *
    * @param entityType indexed type to flush
    */
   public void flush(Class<?> entityType) {
      LOG.flushingIndex(entityType.getName());
      defaultBatchBackend.flush(Util.<Class<?>>asSet(entityType));
   }

   /**
    * Logs and purges the index of the given entity type through the query interceptor.
    *
    * @param entityType indexed type to purge
    */
   public void purge(Class<?> entityType) {
      LOG.purgingIndex(entityType.getName());
      queryInterceptor.purgeIndex(entityType);
   }

   /**
    * Enqueues an update for the given entry, but only when the index selected for the entry
    * is the one named {@code indexName}. Null values and values of non-indexed types are
    * silently skipped.
    *
    * @param key       cache key, converted to its string form to serve as document id
    * @param value     cache value to index; may be {@code null}
    * @param indexName name of the index this call is allowed to write to
    */
   public void updateIndex(Object key, Object value, String indexName) {
      if (value == null) {
         return;
      }
      Class valueType = value.getClass();
      EntityIndexBinding binding = searchIntegrator.getIndexBinding(valueType);
      if (binding == null) {
         // values of types that are not indexed may show up here; nothing to do for them
         return;
      }
      ConversionContext conversionContext = new ContextualExceptionBridgeHelper();
      DocumentBuilderIndexedEntity documentBuilder = binding.getDocumentBuilder();
      final String documentId = keyTransformationHandler.keyToString(key);
      UpdateLuceneWork updateWork = documentBuilder.createUpdateWork(
            valueType,
            value,
            documentId,
            documentId,
            DefaultInstanceInitializer.DEFAULT_INITIALIZER,
            conversionContext
      );
      try {
         IndexManager target = binding.getSelectionStrategy()
               .getIndexManagerForAddition(valueType, documentId, documentId, updateWork.getDocument());
         String targetName = target.getIndexName();
         if (targetName.equals(indexName)) {
            defaultBatchBackend.enqueueAsyncWork(updateWork);
         }
      } catch (InterruptedException e) {
         // keep the interrupt visible to the caller's thread
         Thread.currentThread().interrupt();
      }
   }

}
@@ -0,0 +1,127 @@
package org.infinispan.query.impl.massindex;

import org.infinispan.Cache;
import org.infinispan.commons.marshall.AbstractExternalizer;
import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.commons.util.Util;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.context.Flag;
import org.infinispan.distexec.DistributedCallable;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.filter.AcceptAllKeyValueFilter;
import org.infinispan.filter.KeyValueFilter;
import org.infinispan.interceptors.locking.ClusteringDependentLogic;
import org.infinispan.iteration.impl.EntryRetriever;
import org.infinispan.marshall.core.MarshalledValue;
import org.infinispan.metadata.Metadata;
import org.infinispan.query.impl.externalizers.ExternalizerIds;

import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Set;

/**
 * Base class for mass indexer tasks: a distributed callable that iterates over the local
 * cache entries and feeds the values of one entity type into one named index.
 *
 * @author gustavonalle
 * @since 7.1
 */
public class IndexWorker implements DistributedCallable<Void, Void, Void> {

   protected Cache<?, ?> cache;
   protected final Class<?> entity;
   protected final String indexName;
   // When true, this worker purges the entity's index before and flushes it after indexing.
   private final boolean flush;
   protected IndexUpdater indexUpdater;

   private ClusteringDependentLogic clusteringDependentLogic;
   private EntryRetriever entryRetriever;

   /**
    * @param entity    entity type whose instances are indexed by this worker
    * @param indexName name of the index the worker writes to
    * @param flush     whether the worker itself purges before and flushes after indexing
    */
   public IndexWorker(Class<?> entity, String indexName, boolean flush) {
      this.entity = entity;
      this.indexName = indexName;
      this.flush = flush;
   }

   /**
    * Captures the cache and looks up the components needed by {@link #call()}.
    */
   @Override
   public void setEnvironment(Cache<Void, Void> cache, Set<Void> inputKeys) {
      this.cache = cache;
      this.indexUpdater = new IndexUpdater(cache);
      ComponentRegistry registry = cache.getAdvancedCache().getComponentRegistry();
      this.entryRetriever = registry.getComponent(EntryRetriever.class);
      this.clusteringDependentLogic = registry.getComponent(ClusteringDependentLogic.class);
   }

   /** Hook executed before iterating: purges the entity's index when {@code flush} is set. */
   protected void preIndex() {
      if (flush) {
         indexUpdater.purge(entity);
      }
   }

   /** Hook executed after iterating: flushes the entity's index when {@code flush} is set. */
   protected void postIndex() {
      if (flush) {
         indexUpdater.flush(entity);
      }
   }

   /**
    * Chooses the entry filter: every entry for a replicated cache, otherwise only entries
    * for which the local node is the primary owner.
    */
   private KeyValueFilter getFilter() {
      if (cache.getCacheConfiguration().clustering().cacheMode().isReplicated()) {
         return AcceptAllKeyValueFilter.getInstance();
      }
      return new PrimaryOwnersKeyValueFilter();
   }

   /** Unwraps a {@link MarshalledValue}; any other value is returned as is. */
   private Object extractValue(Object wrappedValue) {
      return wrappedValue instanceof MarshalledValue
            ? ((MarshalledValue) wrappedValue).get()
            : wrappedValue;
   }

   /**
    * Runs {@link #preIndex()}, indexes every locally retrieved entry whose value is exactly
    * of the {@code entity} class, then runs {@link #postIndex()}.
    *
    * @return always {@code null}
    */
   @Override
   @SuppressWarnings("unchecked")
   public Void call() throws Exception {
      preIndex();
      KeyValueFilter entryFilter = getFilter();
      // NOTE(review): the value type is declared as String, yet values are handled as
      // arbitrary objects below - the declared type looks nominal only; confirm.
      try (CloseableIterator<CacheEntry<Object, String>> entries =
                 entryRetriever.retrieveEntries(entryFilter, null, Util.asSet(Flag.CACHE_MODE_LOCAL), null)) {
         while (entries.hasNext()) {
            CacheEntry<Object, String> entry = entries.next();
            Object value = extractValue(entry.getValue());
            if (value == null || !value.getClass().equals(entity)) {
               continue;
            }
            indexUpdater.updateIndex(entry.getKey(), value, indexName);
         }
      }
      postIndex();
      return null;
   }


   /** Accepts only the entries whose key is primarily owned by the local node. */
   private class PrimaryOwnersKeyValueFilter implements KeyValueFilter {

      @Override
      public boolean accept(Object key, Object value, Metadata metadata) {
         return clusteringDependentLogic.localNodeIsPrimaryOwner(key);
      }
   }

   /**
    * Externalizer writing and reading the three constructor arguments of an
    * {@link IndexWorker}: entity class, index name and flush flag, in that order.
    */
   public static class Externalizer extends AbstractExternalizer<IndexWorker> {

      @Override
      @SuppressWarnings("ALL")
      public Set<Class<? extends IndexWorker>> getTypeClasses() {
         return Util.<Class<? extends IndexWorker>>asSet(IndexWorker.class);
      }

      @Override
      public void writeObject(ObjectOutput output, IndexWorker worker) throws IOException {
         output.writeObject(worker.entity);
         output.writeUTF(worker.indexName);
         output.writeBoolean(worker.flush);
      }

      @Override
      public IndexWorker readObject(ObjectInput input) throws IOException, ClassNotFoundException {
         // Fields must be read in exactly the order writeObject emitted them.
         Class<?> entityType = (Class<?>) input.readObject();
         String targetIndex = input.readUTF();
         boolean flushFlag = input.readBoolean();
         return new IndexWorker(entityType, targetIndex, flushFlag);
      }

      @Override
      public Integer getId() {
         return ExternalizerIds.INDEX_WORKER;
      }
   }

}

0 comments on commit 63539d6

Please sign in to comment.