#33 change job to use dynamic configuration of partition plan

mincong-h committed Jun 12, 2016
1 parent e733a7f commit 430ea9e

Showing 8 changed files with 121 additions and 63 deletions.
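In short, this commit drops the hard-coded <plan> in mass-index.xml and lets the partition mappers compute the plan at runtime, so each partition receives its own entityType property. The sketch below is a minimal, hypothetical illustration of that pattern using only the standard JSR-352 partition API; it is not the commit's code (the commit's mappers override the PartitionPlanImpl getters rather than calling its setters, but the effect is the same), and the class and entity names are made up.

// Minimal sketch: build the partition plan at runtime instead of hard-coding it
// in the job XML. Hypothetical names; the real mappers derive the entity types
// from the "rootEntities" job parameter.
import java.util.Properties;

import javax.batch.api.partition.PartitionMapper;
import javax.batch.api.partition.PartitionPlan;
import javax.batch.api.partition.PartitionPlanImpl;
import javax.inject.Named;

@Named
public class DynamicPartitionMapperSketch implements PartitionMapper {

    @Override
    public PartitionPlan mapPartitions() throws Exception {

        String[] entityTypes = { "com.example.Foo", "com.example.Bar" };

        Properties[] props = new Properties[entityTypes.length];
        for (int i = 0; i < entityTypes.length; i++) {
            props[i] = new Properties();
            // Each partition receives its own "entityType" property, which the
            // reader and processor pick up via @Inject @BatchProperty.
            props[i].setProperty("entityType", entityTypes[i]);
        }

        PartitionPlan plan = new PartitionPlanImpl();
        plan.setPartitions(entityTypes.length);
        plan.setThreads(Math.min(entityTypes.length, 2));
        plan.setPartitionProperties(props);
        return plan;
    }
}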
@@ -2,6 +2,7 @@

import java.io.Serializable;

import javax.batch.api.BatchProperty;
import javax.batch.api.chunk.ItemReader;
import javax.inject.Inject;
import javax.inject.Named;
@@ -24,6 +25,9 @@
@Named
public class BatchItemReader implements ItemReader {

@Inject @BatchProperty
private String entityType;

@Inject
private IndexingContext indexingContext;

@@ -61,7 +65,7 @@ public void close() throws Exception {
*/
@Override
public void open(Serializable checkpoint) throws Exception {
System.out.println("BatchItemReader#open(...)");
System.out.printf("BatchItemReader#open(%s)...%n", entityType);
}

/**
@@ -72,6 +76,8 @@ public void open(Serializable checkpoint) throws Exception {
*/
@Override
public Object readItem() throws Exception {
return indexingContext.poll();
// TODO: change it to a generic type
// return indexingContext.poll(entityClazz);
return indexingContext.poll(Class.forName(entityType));
}
}
@@ -1,8 +1,6 @@
package io.github.mincongh.batch;

import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

import javax.batch.api.BatchProperty;
import javax.batch.api.partition.PartitionMapper;
@@ -14,7 +12,8 @@
@Named
public class EntityPartitionMapper implements PartitionMapper {

@Inject @BatchProperty private String rootEntitiesStr;
@Inject @BatchProperty(name = "rootEntities")
private String rootEntitiesStr;

@Override
public PartitionPlan mapPartitions() throws Exception {
@@ -50,16 +49,18 @@ public Properties[] getPartitionProperties() {
/**
* Parse a set of entities in string into a set of entity-types.
*
* @param raw a set of entities concatenated in string, with "[]" on both
* ends, separated by ",".
* @param raw a set of entities concatenated in string, separated by ","
* and surrounded by "[]", e.g. "[com.xx.foo, com.xx.bar]".
* @return a set of entity-types
* @throws NullPointerException thrown if the entity-token is not found.
*/
private String[] parse(String raw) throws NullPointerException {

if (raw == null) {
throw new NullPointerException("Not any target entity to index");
}
return raw.substring(1, raw.length() - 1).split(", ");
String[] rootEntities = raw
.substring(1, raw.length() - 1)
.split(", ");
return rootEntities;
}
}
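A side note on parse(): the "rootEntities" job parameter is produced with Set#toString() (see the massIndex() change further down), which is why the method strips the surrounding brackets and splits on ", ". The following standalone illustration is not part of the commit; it only replays that logic with entity names from this project.

import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

public class ParseIllustration {
    public static void main(String[] args) {
        Set<String> rootEntities = new LinkedHashSet<>(Arrays.asList(
                "io.github.mincongh.entity.Address",
                "io.github.mincongh.entity.Stock"));
        // Set#toString() -> "[io.github.mincongh.entity.Address, io.github.mincongh.entity.Stock]"
        String raw = rootEntities.toString();
        // same logic as EntityPartitionMapper#parse(String)
        String[] types = raw.substring(1, raw.length() - 1).split(", ");
        System.out.println(Arrays.toString(types));
        // prints: [io.github.mincongh.entity.Address, io.github.mincongh.entity.Stock]
    }
}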
@@ -16,8 +16,6 @@
import org.hibernate.Session;
import org.hibernate.criterion.Projections;

import io.github.mincongh.entity.Address;

/**
* Read identifiers of entities via entity manager. The result is going to be
* stored in {@code IndexingContext}, then be used for Lucene document
@@ -31,7 +29,7 @@ public class IdProducerBatchlet implements Batchlet {
@Inject @BatchProperty private int arrayCapacity;
@Inject @BatchProperty private int fetchSize;
@Inject @BatchProperty private int maxResults;
@Inject @BatchProperty private String entityType;
@Inject @BatchProperty(name = "entityType") private String entityTypeStr;

@Inject
private JobContext jobContext;
@@ -51,44 +49,47 @@ public class IdProducerBatchlet implements Batchlet {
@Override
public String process() throws Exception {

// get entity class type
Class<?> entityClazz = Class.forName(entityTypeStr);

// unwrap session from entity manager
session = em.unwrap(Session.class);

// get the total number of ids
long rowCount = (long) session
.createCriteria(Class.forName(entityType))
.createCriteria(entityClazz)
.setProjection(Projections.rowCount())
.setCacheable(false)
.uniqueResult();
System.out.printf("entityType = %s (%d rows).%n", entityType, rowCount);
System.out.printf("entityType = %s (%d rows).%n", entityTypeStr, rowCount);
jobContext.setTransientUserData(rowCount);

// load ids and store in scrollable results
ScrollableResults scrollableIds = session
.createCriteria(Address.class)
.createCriteria(entityClazz)
.setCacheable(false)
.setFetchSize(fetchSize)
.setProjection(Projections.id())
.setMaxResults(maxResults)
.scroll(ScrollMode.FORWARD_ONLY);

Serializable[] ids = new Serializable[arrayCapacity];
Serializable[] entityIDs = new Serializable[arrayCapacity];
long row = 0;
int i = 0;
try {
// create a (K, V) pair in the hash map embedded in
// the indexing context
indexingContext.createQueue(entityClazz);

while (scrollableIds.next() && row < rowCount) {
Serializable id = (Serializable) scrollableIds.get(0);
ids[i++] = id;
entityIDs[i++] = id;
if (i == arrayCapacity) {
/*
for (Serializable _id : ids) {
System.out.printf("%5d ", _id);
}
System.out.printf("%n");
*/
indexingContext.add(ids);
// add the entityIDs array into the indexing context
// (mapped to the key K=entityClazz)
indexingContext.add(entityIDs, entityClazz);
// reset id array and index
ids = new Serializable[arrayCapacity];
entityIDs = new Serializable[arrayCapacity];
i = 0;
}
row++;
@@ -1,6 +1,7 @@
package io.github.mincongh.batch;

import java.io.Serializable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

import javax.inject.Named;
@@ -12,24 +13,37 @@
@Singleton
public class IndexingContext {

private ConcurrentLinkedQueue<Serializable[]> idChunkQueue;
private ConcurrentHashMap<Class<?>, ConcurrentLinkedQueue<Serializable[]>> idQueues;

private IndexShardingStrategy indexShardingStrategy;

public IndexingContext() {
this.idChunkQueue = new ConcurrentLinkedQueue<>();
public void add(Serializable[] clazzIDs, Class<?> clazz) {
idQueues.get(clazz).add(clazzIDs);
}

public Serializable[] poll(Class<?> clazz) {
return idQueues.get(clazz).poll();
}

public int sizeOf(Class<?> clazz) {
return idQueues.get(clazz).size();
}

public void createQueue(Class<?> clazz) {
idQueues.put(clazz, new ConcurrentLinkedQueue<>());
}

public void add(Serializable[] idArray) {
idChunkQueue.add(idArray);
public IndexingContext() {
this.idQueues = new ConcurrentHashMap<>();
}

public Serializable[] poll() {
return idChunkQueue.poll();
public ConcurrentHashMap<Class<?>, ConcurrentLinkedQueue<Serializable[]>> getIdQueues() {
return idQueues;
}

public int size() {
return idChunkQueue.size();
// I don't think we need this method.
public void setIdQueues(ConcurrentHashMap<Class<?>, ConcurrentLinkedQueue<Serializable[]>> idQueues) {
this.idQueues = idQueues;
}

public IndexShardingStrategy getIndexShardingStrategy() {
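Putting the new per-class queues together: IdProducerBatchlet calls createQueue(...) once per entity class and add(...) for each chunk of identifiers, while BatchItemReader polls from the queue of its own entityType. The usage sketch below is an illustration only, assuming the IndexingContext shown above and the project's Address entity are on the classpath.

import java.io.Serializable;

public class IndexingContextUsageSketch {

    public static void main(String[] args) throws Exception {
        IndexingContext ctx = new IndexingContext();
        Class<?> addressClass = Class.forName("io.github.mincongh.entity.Address");

        // producer side (IdProducerBatchlet): one queue per entity class
        ctx.createQueue(addressClass);
        ctx.add(new Serializable[] { 1, 2, 3 }, addressClass);

        // consumer side (BatchItemReader): poll the queue of this partition's entity type
        Serializable[] chunk = ctx.poll(addressClass);
        System.out.printf("%d ids polled, %d chunks left%n", chunk.length, ctx.sizeOf(addressClass));
    }
}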
@@ -1,6 +1,8 @@
package io.github.mincongh.batch;

import java.util.LinkedList;
import java.util.Properties;
import java.util.Queue;

import javax.batch.api.BatchProperty;
import javax.batch.api.partition.PartitionMapper;
@@ -36,37 +38,74 @@ public class LucenePartitionMapper implements PartitionMapper {

@Inject @BatchProperty private int partitionCapacity;
@Inject @BatchProperty private int threads;
@Inject @BatchProperty(name="rootEntities") private String rootEntitiesStr;

@Override
public PartitionPlan mapPartitions() throws Exception {

String[] rootEntities = parse(rootEntitiesStr);
Queue<String> classQueue = new LinkedList<>();

int queueSize = indexingContext.size();
int partitions = Math.max(queueSize / partitionCapacity, 1); // minimum 1 partition
int partSum = 0;
for (String rootEntity: rootEntities) {
int queueSize = indexingContext.sizeOf(Class.forName(rootEntity));
// TODO: handle the case where queueSize is 0
int partCount = queueSize / partitionCapacity;
if (queueSize % partitionCapacity != 0) {
partCount++;
}
partSum += partCount;
// enqueue the entity type into classQueue once per
// partition
for (int i = 0; i < partCount; i++) {
classQueue.add(rootEntity);
}
}
final int partCountFinal = partSum;

return new PartitionPlanImpl() {

@Override
public int getPartitions() {
System.out.printf("#mapPartitions(): %d partitions.%n", partitions);
return partitions;
System.out.printf("#mapPartitions(): %d partitions.%n", partCountFinal);
return partCountFinal;
}

@Override
public int getThreads() {
System.out.printf("#getThreads(): %d threads.%n", Math.min(partitions, threads));
return Math.min(partitions, threads);
System.out.printf("#getThreads(): %d threads.%n", Math.min(partCountFinal, threads));
return Math.min(partCountFinal, threads);
}

@Override
public Properties[] getPartitionProperties() {

Properties[] props = new Properties[getPartitions()];
// for (int i = 0; i < getPartitions(); i++) {
// props[i] = new Properties();
// props[i].setProperty("start", String.valueOf(i * 10 + 1));
// props[i].setProperty("end", String.valueOf((i + 1) * 10));
// }
for (int i = 0; i < props.length; i++) {
String entityType = classQueue.poll();
props[i] = new Properties();
props[i].setProperty("entityType", entityType);
}
return props;
}
};
}

/**
* Parse a set of entities in string into a set of entity-types.
*
* @param raw a set of entities concatenated in string, separated by ","
* and surrounded by "[]", e.g. "[com.xx.foo, com.xx.bar]".
* @return a set of entity-types
* @throws NullPointerException thrown if the entity-token is not found.
*/
private String[] parse(String raw) throws NullPointerException {
if (raw == null) {
throw new NullPointerException("Not any target entity to index");
}
String[] rootEntities = raw
.substring(1, raw.length() - 1)
.split(", ");
return rootEntities;
}
}
@@ -27,7 +27,8 @@ public class PurgeDecider implements Decider {
* @param executions step executions.
*/
@Override
public String decide(StepExecution[] executions) throws Exception {
System.out.printf("PurgeDecider#decide: purgeAtStart=%s.%n", purgeAtStart);
return String.valueOf(purgeAtStart);
}
}
@@ -82,7 +82,7 @@ public void massIndex() {
jobParams.setProperty("purgeAtStart", String.valueOf(true));
jobParams.setProperty("optimizeAfterPurge", String.valueOf(true));
jobParams.setProperty("optimizeAtEnd", String.valueOf(true));
jobParams.setProperty("rootEntitiesStr", getRootEntities().toString());
jobParams.setProperty("rootEntities", getRootEntities().toString());
Long executionId = jobOperator.start("mass-index", jobParams);

// calculate the performance
28 changes: 12 additions & 16 deletions us-address/src/main/resources/META-INF/batch-jobs/mass-index.xml
@@ -3,10 +3,7 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/jobXML_1_0.xsd"
version="1.0">
<!--
<step id="loadId" next="purgeDecision">
-->
<step id="loadId" next="optimizeAfterIndex">
<batchlet ref="idProducerBatchlet">
<properties>
<!-- fetchSize - The number of rows returned in one chunk in criteria.
@@ -33,19 +30,9 @@
-->
<mapper ref="entityPartitionMapper">
<properties>
<property name="rootEntitiesStr" value="#{jobParameters['rootEntitiesStr']}"/>
<property name="rootEntities" value="#{jobParameters['rootEntities']}"/>
</properties>
</mapper>
<!--
<plan partitions="2">
<properties partition="0">
<property name="entityType" value="io.github.mincongh.entity.Address"/>
</properties>
<properties partition="1">
<property name="entityType" value="io.github.mincongh.entity.Stock"/>
</properties>
</plan>
-->
</partition>
</step>
<decision id="purgeDecision" ref="purgeDecider">
@@ -70,15 +57,24 @@
</step>
<step id="produceLuceneDoc" next="afterIndexDecision">
<chunk item-count="3">
<reader ref="batchItemReader"/>
<processor ref="batchItemProcessor"/>
<reader ref="batchItemReader">
<properties>
<property name="entityType" value="#{partitionPlan['entityType']}"/>
</properties>
</reader>
<processor ref="batchItemProcessor">
<properties>
<property name="entityType" value="#{partitionPlan['entityType']}"/>
</properties>
</processor>
<writer ref="batchItemWriter"/>
</chunk>
<partition>
<mapper ref="lucenePartitionMapper">
<properties>
<property name="partitionCapacity" value="#{jobParameters['partitionCapacity']}?:1;"/>
<property name="threads" value="#{jobParameters['threads']}?:2;"/>
<property name="rootEntities" value="#{jobParameters['rootEntities']}"/>
</properties>
</mapper>
<collector ref="lucenePartitionCollector"/>
