Skip to content

Commit

Permalink
#13 add AddLuceneWork execution in job "mass-index"
Browse files Browse the repository at this point in the history
The BatchItemWriter can now execute AddLuceneWork, and a simple monitor has been added to monitor the index progress.
The BatchItemProcessor had a problem with the IndexShardingStrategy (issue #17). It is now fixed using a customized context class for transferring values between batch artifacts.
  • Loading branch information
mincong-h committed Jun 4, 2016
1 parent 81891b9 commit 9b8faa8
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 5 deletions.
Expand Up @@ -5,6 +5,7 @@
import java.util.List;

import javax.batch.api.chunk.ItemProcessor;
import javax.inject.Inject;
import javax.inject.Named;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
Expand All @@ -22,8 +23,10 @@
import org.hibernate.search.engine.impl.HibernateSessionLoadingInitializer;
import org.hibernate.search.engine.integration.impl.ExtendedSearchIntegrator;
import org.hibernate.search.engine.spi.DocumentBuilderIndexedEntity;
import org.hibernate.search.engine.spi.EntityIndexBinding;
import org.hibernate.search.hcore.util.impl.ContextHelper;
import org.hibernate.search.spi.InstanceInitializer;
import org.hibernate.search.store.IndexShardingStrategy;

import io.github.mincongh.entity.Address;

Expand All @@ -49,6 +52,10 @@
* <li>{@code sessionInitializer} TODO: don't know what it is.
*
* <li>{@code conversionContext} TODO: don't know what it is.
*
* <li>{@code shardingStrategy} TODO: add description
*
* <li>{@code indexingContext} TODO: add description
* </ul>
*
* @author Mincong HUANG
Expand All @@ -61,8 +68,12 @@ public class BatchItemProcessor implements ItemProcessor {
private Session session;
private ExtendedSearchIntegrator searchIntegrator;
private DocumentBuilderIndexedEntity docBuilder;
private EntityIndexBinding entityIndexBinding;
private InstanceInitializer sessionInitializer;
private ConversionContext conversionContext;
private IndexShardingStrategy shardingStrategy;
@Inject
private IndexingContext indexingContext;

/**
* Process an input item into an output item. Here, the input item is an
Expand Down Expand Up @@ -98,10 +109,12 @@ public Object processItem(Object item) throws Exception {
private <T> List<AddLuceneWork> buildAddLuceneWorks(List<T> entities) {
session = em.unwrap(Session.class);
searchIntegrator = ContextHelper.getSearchintegrator(session);
docBuilder = searchIntegrator
entityIndexBinding = searchIntegrator
.getIndexBindings()
.get(Address.class)
.getDocumentBuilder();
.get(Address.class);
shardingStrategy = entityIndexBinding.getSelectionStrategy();
indexingContext.setIndexShardingStrategy(shardingStrategy);
docBuilder = entityIndexBinding.getDocumentBuilder();
conversionContext = new ContextualExceptionBridgeHelper();
sessionInitializer = new HibernateSessionLoadingInitializer(
(SessionImplementor) session
Expand Down
Expand Up @@ -31,6 +31,7 @@ public class BatchItemReader implements ItemReader {
* The checkpointInfo method returns the current checkpoint data for this
* reader. It is called before a chunk checkpoint is committed.
*
* @return the checkpoint info
* @throws Exception thrown for any errors.
*/
@Override
Expand Down
@@ -1,21 +1,45 @@
package io.github.mincongh.batch;

import java.io.Serializable;
import java.util.LinkedList;
import java.util.List;

import javax.batch.api.chunk.ItemWriter;
import javax.inject.Inject;
import javax.inject.Named;

import org.hibernate.search.backend.AddLuceneWork;
import org.hibernate.search.backend.impl.StreamingOperationExecutor;
import org.hibernate.search.backend.impl.StreamingOperationExecutorSelector;
import org.hibernate.search.batchindexing.MassIndexerProgressMonitor;
import org.hibernate.search.batchindexing.impl.SimpleIndexingProgressMonitor;
import org.hibernate.search.store.IndexShardingStrategy;

/**
* Batch item writer writes a list of items into Lucene documents. Here, items
* mean the entities processed by the item processor. These items will be used
* to create {@code LuceneWork}.
* <p>
* <ul>
* <li>{@code indexingContext} the indexing context, used to share the index
* sharding strategy between the item processor and this writer.
*
* <li>{@code stepContext} the JSR 352 specific step context, used for storing
* transient data during the step execution.
*
* <li>{@code monitor} mass indexer progress monitor helps to follow the mass
* indexing progress and show it in the console.
* </ul>
*
* @author Mincong HUANG
*/
@Named
public class BatchItemWriter implements ItemWriter {

@Inject
private IndexingContext indexingContext;
private MassIndexerProgressMonitor monitor;
private final Boolean forceAsync = false;

/**
* The checkpointInfo method returns the current checkpoint data for this
* writer. It is called before a chunk checkpoint is committed.
Expand Down Expand Up @@ -46,7 +70,7 @@ public void close() throws Exception {
*/
/**
 * Prepare the writer for a (re)start of the step: create the progress
 * monitor used when executing the Lucene works.
 * <p>
 * NOTE(review): the {@code checkpoint} argument is ignored, so a restarted
 * job always begins with a fresh monitor — TODO confirm this is intended.
 *
 * @param checkpoint the last checkpoint committed for this writer, or
 *        {@code null} on a first start (unused here).
 * @throws Exception thrown for any errors.
 */
@Override
public void open(Serializable checkpoint) throws Exception {

// SimpleIndexingProgressMonitor logs indexing progress (entities loaded /
// documents added) to the console during the mass-index job.
monitor = new SimpleIndexingProgressMonitor();
}

/**
Expand All @@ -57,11 +81,27 @@ public void open(Serializable checkpoint) throws Exception {
* @throws Exception thrown for any errors.
*/
/**
 * Write a chunk of items produced by the item processor. Each item is a
 * list of {@code AddLuceneWork}; every work is dispatched to the backend
 * via a streaming operation executor, using the index sharding strategy
 * shared through the injected {@code indexingContext}.
 *
 * @param items the chunk of processed items; may be {@code null}.
 * @throws Exception thrown for any errors.
 */
@Override
@SuppressWarnings("unchecked")
public void writeItems(List<Object> items) throws Exception {
    // Guard first: the original logged the null case but then iterated
    // "items" anyway, which would throw a NullPointerException.
    if (items == null) {
        System.out.printf("#writeItems(...): null.%n");
        return;
    }
    System.out.printf("#writeItems(...): %d lucene work arrays written.%n", items.size());
    IndexShardingStrategy shardingStrategy =
            indexingContext.getIndexShardingStrategy();
    for (Object item : items) {
        // Cast to the List interface rather than LinkedList: the writer only
        // iterates, so it should not depend on the concrete list class used
        // by the processor.
        for (AddLuceneWork addWork : (List<AddLuceneWork>) item) {
            // Select the executor matching this work type, then hand the
            // work to the backend with the shared sharding strategy.
            StreamingOperationExecutor executor = addWork.acceptIndexWorkVisitor(
                    StreamingOperationExecutorSelector.INSTANCE, null);
            executor.performStreamOperation(
                    addWork,
                    shardingStrategy,
                    monitor,
                    forceAsync
            );
        }
    }
}
}

0 comments on commit 9b8faa8

Please sign in to comment.