hibernate#34 use IndexingContext for entity row count
mincong-h committed Jun 13, 2016
1 parent 430ea9e commit 9820a46
Showing 3 changed files with 29 additions and 9 deletions.
@@ -62,7 +62,7 @@ public String process() throws Exception {
.setCacheable(false)
.uniqueResult();
System.out.printf("entityType = %s (%d rows).%n", entityTypeStr, rowCount);
jobContext.setTransientUserData(rowCount);
indexingContext.addEntityCount(rowCount);

// load ids and store in scrollable results
ScrollableResults scrollableIds = session
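For context, the changed lines above are the tail of a row-count query in the id-producing batchlet: instead of stashing the count in the job context's transient user data, the batchlet now reports it to the shared IndexingContext. A plausible shape of the full statement, sketched under the assumption that a Criteria row-count projection is used (session, entityClazz, entityTypeStr and indexingContext come from the surrounding class and are not shown in this hunk):

import org.hibernate.criterion.Projections;

// Sketch only, not the committed code: count the rows of the target entity
// and report the count to the shared IndexingContext.
long rowCount = (long) session
        .createCriteria(entityClazz)
        .setProjection(Projections.rowCount())
        .setCacheable(false)
        .uniqueResult();
System.out.printf("entityType = %s (%d rows).%n", entityTypeStr, rowCount);
indexingContext.addEntityCount(rowCount);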
@@ -9,13 +9,25 @@

import org.hibernate.search.store.IndexShardingStrategy;

/**
* Specific indexing context for mass indexer. Several attributes are used:
* <p>
* <ul>
* <li>entityCount: the total number of entities to be indexed in the job. The
* number is accumulated across the partitioned step "loadId". Each
* IdProducerBatchlet (partition) counts the entities of its own target entity
* type, then calls #addEntityCount(long) to add its count to those of the
* other partition(s).</li>
* </ul>
* @author Mincong HUANG
*/
@Named
@Singleton
public class IndexingContext {

private ConcurrentHashMap<Class<?>, ConcurrentLinkedQueue<Serializable[]>> idQueues;

private IndexShardingStrategy indexShardingStrategy;
private long entityCount = 0;

public void add(Serializable[] clazzIDs, Class<?> clazz) {
idQueues.get(clazz).add(clazzIDs);
@@ -53,4 +65,12 @@ public IndexShardingStrategy getIndexShardingStrategy() {
public void setIndexShardingStrategy(IndexShardingStrategy indexShardingStrategy) {
this.indexShardingStrategy = indexShardingStrategy;
}

public void addEntityCount(long entityCount) {
this.entityCount += entityCount;
}

public long getEntityCount() {
return entityCount;
}
}
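One caveat worth noting: IndexingContext is a @Singleton and addEntityCount(long) is called from the partitions of the "loadId" step, which may run concurrently, yet += on a plain long field is not atomic. A minimal thread-safe variant (an illustration only, not part of this commit) could back the counter with an AtomicLong:

import java.util.concurrent.atomic.AtomicLong;

// Illustration only: same accessors as above, backed by an AtomicLong so that
// concurrent increments from several partitions cannot lose updates.
private final AtomicLong entityCount = new AtomicLong(0);

public void addEntityCount(long entityCount) {
    this.entityCount.addAndGet(entityCount);
}

public long getEntityCount() {
    return entityCount.get();
}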
@@ -5,20 +5,21 @@
import javax.batch.api.BatchProperty;
import javax.batch.api.partition.PartitionAnalyzer;
import javax.batch.runtime.BatchStatus;
import javax.batch.runtime.context.JobContext;
import javax.inject.Inject;
import javax.inject.Named;

@Named
public class LucenePartitionAnalyzer implements PartitionAnalyzer {

@Inject
private JobContext jobContext;
private int workCount = 0;
private float percentage = 0;
private IndexingContext indexingContext;

@Inject @BatchProperty
private int maxResults;

private int workCount = 0;
private float percentage = 0;

/**
* Analyze data obtained from different partition plans via partition data
* collectors. The current analysis summarizes their progress:
@@ -37,8 +38,8 @@ public class LucenePartitionAnalyzer implements PartitionAnalyzer {
@Override
public void analyzeCollectorData(Serializable fromCollector) throws Exception {

long rowCount = (long) jobContext.getTransientUserData();
int entitiesLoaded = Math.min((int) rowCount, maxResults);
long entityCount = indexingContext.getEntityCount();
int entitiesLoaded = Math.min((int) entityCount, maxResults);

workCount += (int) fromCollector;
if (entitiesLoaded != 0) {
@@ -53,5 +54,4 @@ public void analyzeStatus(BatchStatus batchStatus, String exitStatus)
throws Exception {
System.out.println("#analyzeStatus(...) called.");
}

}
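The collapsed lines of analyzeCollectorData(Serializable) presumably turn workCount and entitiesLoaded into a progress percentage; a sketch of that branch under this assumption (field names taken from the visible diff, exact formatting assumed):

// Hypothetical completion of the visible if-branch: derive and print progress.
if (entitiesLoaded != 0) {
    percentage = workCount * 100f / entitiesLoaded;
    System.out.printf("%d works processed (%.1f%%).%n", workCount, percentage);
}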
