Skip to content

Commit

Permalink
#24 add decider and purger batchlet before the doc production chunk
Browse files Browse the repository at this point in the history
Decider and purger batchlet provide a before-production enhancement. User can now have possibility to purge all index before started. However, this commit provides only the working model, method inside the batchlet is not implemented yet.
  • Loading branch information
mincong-h committed Jun 6, 2016
1 parent 55f0137 commit 36440be
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 1 deletion.
@@ -0,0 +1,44 @@
package io.github.mincongh.batch;

import javax.batch.api.BatchProperty;
import javax.batch.api.Decider;
import javax.batch.runtime.StepExecution;
import javax.inject.Inject;
import javax.inject.Named;

/**
* Decider decides the next step-execution before the start of index chunk. If
* user requires a index purge, then the next step should be a purge, else,
* the next step will be directly the index chunk. Index purge use
* IndexPurgerBatchlet.
*
* @author Mincong HUANG
*/
@Named
public class BeforeIndexDecider implements Decider {

@Inject @BatchProperty
private Boolean purgeAtStart;
private final String PURGE_IDX = "purgeIndex";
private final String PRODUCE_DOC = "produceLuceneDoc";

/**
* Decide the next step
*
* @param executions not used for the moment.
*/
@Override
public String decide(StepExecution[] executions) throws Exception {

String nextStep = purgeAtStart ? PURGE_IDX : PRODUCE_DOC;

for (StepExecution se : executions) {
System.out.println(se.getStepName() + " "
+ se.getBatchStatus().toString() + " "
+ se.getExitStatus()
);
}

return nextStep;
}
}
@@ -0,0 +1,22 @@
package io.github.mincongh.batch;

import javax.batch.api.Batchlet;
import javax.batch.runtime.BatchStatus;
import javax.inject.Named;

@Named
public class IndexPurgerBatchlet implements Batchlet {

@Override
public String process() throws Exception {

System.out.println("purging entities ...");

return BatchStatus.COMPLETED.toString();
}

@Override
public void stop() throws Exception {
// TODO Auto-generated method stub
}
}
Expand Up @@ -68,14 +68,19 @@ public BatchSession() {
*/
@Asynchronous
public void massIndex() {

Long start = System.currentTimeMillis();

Properties jobParams = new Properties();
jobParams.setProperty("fetchSize", "200000");
jobParams.setProperty("arrayCapacity", "1000");
jobParams.setProperty("maxResults", "1000000");
jobParams.setProperty("partitionCapacity", "250");
jobParams.setProperty("threads", "4");
jobParams.setProperty("purgeAtStart", String.valueOf(true));
Long executionId = jobOperator.start("mass-index", jobParams);

// calculate the performance
JobExecution execution = jobOperator.getJobExecution(executionId);
int i = 0;
while (!execution.getBatchStatus().equals(BatchStatus.COMPLETED) && i < 200) {
Expand Down
12 changes: 11 additions & 1 deletion us-address/src/main/resources/META-INF/batch-jobs/mass-index.xml
Expand Up @@ -3,7 +3,7 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/jobXML_1_0.xsd"
version="1.0">
<step id="loadId" next="produceLuceneDoc">
<step id="loadId" next="decisionBeforeIndex">
<batchlet ref="idProducerBatchlet">
<properties>
<!-- fetchSize - The number of rows returned in one chunk in criteria.
Expand All @@ -21,6 +21,16 @@
</properties>
</batchlet>
</step>
<decision id="decisionBeforeIndex" ref="beforeIndexDecider">
<properties>
<property name="purgeAtStart" value="#{jobParameters['purgeAtStart']}?:false"/>
</properties>
<next on="purgeIndex" to="purgeIndex"/>
<next on="produceLuceneDoc" to="produceLuceneDoc"/>
</decision>
<step id="purgeIndex" next="produceLuceneDoc">
<batchlet ref="indexPurgerBatchlet"/>
</step>
<step id="produceLuceneDoc">
<chunk item-count="3">
<reader ref="batchItemReader"/>
Expand Down

0 comments on commit 36440be

Please sign in to comment.