@@ -0,0 +1,151 @@
/*
* To change this template, choose Tools | Templates and open the template in
* the editor.
*/
package com.salaboy.jbpm5.dev.guide;

import static org.junit.Assert.assertEquals;

import java.util.HashMap;

import org.drools.KnowledgeBase;
import org.drools.KnowledgeBaseFactory;
import org.drools.WorkingMemory;
import org.drools.builder.KnowledgeBuilder;
import org.drools.builder.KnowledgeBuilderError;
import org.drools.builder.KnowledgeBuilderErrors;
import org.drools.builder.KnowledgeBuilderFactory;
import org.drools.builder.ResourceType;
import org.drools.event.RuleFlowGroupActivatedEvent;
import org.drools.event.RuleFlowGroupDeactivatedEvent;
import org.drools.impl.StatefulKnowledgeSessionImpl;
import org.drools.io.impl.ClassPathResource;
import org.drools.logger.KnowledgeRuntimeLoggerFactory;
import org.drools.runtime.StatefulKnowledgeSession;
import org.drools.runtime.process.ProcessInstance;
import org.drools.runtime.process.WorkflowProcessInstance;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import com.salaboy.jbpm5.dev.guide.commands.CheckInCommand;
import com.salaboy.jbpm5.dev.guide.executor.Executor;
import com.salaboy.jbpm5.dev.guide.executor.ExecutorFactoryBean;
import com.salaboy.jbpm5.dev.guide.executor.ExecutorImpl;
import com.salaboy.jbpm5.dev.guide.workitems.AbstractAsyncWorkItemHandler;

/**
*
* @author salaboy
*/
public class DontWaitCompletionAsyncTaskSimpleTest {

protected Executor executor;
protected StatefulKnowledgeSession session;

public DontWaitCompletionAsyncTaskSimpleTest() {
}

@Before
public void setUp() throws Exception {
initializeExecutionEnvironment();
initializeSession();
}

@After
public void tearDown(){
stopExecutionEnvironment();
}

protected void initializeExecutionEnvironment() throws Exception {
ExecutorFactoryBean factoryBean = new ExecutorFactoryBean();
executor = factoryBean.getObject();
CheckInCommand.reset();
}

protected void stopExecutionEnvironment() {
executor.destroy();
}

private void initializeSession() {
KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();

kbuilder.add(new ClassPathResource("DeferExecutionScenarioV1-data.bpmn"), ResourceType.BPMN2);
if (kbuilder.hasErrors()) {
KnowledgeBuilderErrors errors = kbuilder.getErrors();

for (KnowledgeBuilderError error : errors) {
System.out.println(">>> Error:" + error.getMessage());

}
throw new IllegalStateException(">>> Knowledge couldn't be parsed! ");
}

KnowledgeBase kbase = KnowledgeBaseFactory.newKnowledgeBase();

kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());

session = kbase.newStatefulKnowledgeSession();
KnowledgeRuntimeLoggerFactory.newConsoleLogger(session);

((StatefulKnowledgeSessionImpl)session).session.addEventListener(new org.drools.event.AgendaEventListener() {
public void activationCreated(org.drools.event.ActivationCreatedEvent event, WorkingMemory workingMemory) { }
public void activationCancelled(org.drools.event.ActivationCancelledEvent event, WorkingMemory workingMemory) { }
public void beforeActivationFired(org.drools.event.BeforeActivationFiredEvent event, WorkingMemory workingMemory) { }
public void afterActivationFired(org.drools.event.AfterActivationFiredEvent event, WorkingMemory workingMemory) { }
public void agendaGroupPopped(org.drools.event.AgendaGroupPoppedEvent event, WorkingMemory workingMemory) { }
public void agendaGroupPushed(org.drools.event.AgendaGroupPushedEvent event, WorkingMemory workingMemory) { }
public void beforeRuleFlowGroupActivated(RuleFlowGroupActivatedEvent event, WorkingMemory workingMemory) { }
public void afterRuleFlowGroupActivated(RuleFlowGroupActivatedEvent event, WorkingMemory workingMemory) {
workingMemory.fireAllRules();
}
public void beforeRuleFlowGroupDeactivated(RuleFlowGroupDeactivatedEvent event, WorkingMemory workingMemory) { }
public void afterRuleFlowGroupDeactivated(RuleFlowGroupDeactivatedEvent event, WorkingMemory workingMemory) { }
});


}

@Test
public void executorCheckInTestFinishes() throws InterruptedException {
HashMap<String, Object> input = new HashMap<String, Object>();

String patientName = "John Doe";
input.put("bedrequest_patientname", patientName);

AbstractAsyncWorkItemHandler asyncHandler = new AbstractAsyncWorkItemHandler(executor, null);
session.getWorkItemManager().registerWorkItemHandler("Async Work", asyncHandler);

WorkflowProcessInstance pI = (WorkflowProcessInstance) session.startProcess("PatientDeferredCheckIn", input);

assertEquals(ProcessInstance.STATE_COMPLETED, pI.getState());

assertEquals(0, CheckInCommand.getCheckInCount());

Thread.sleep(((ExecutorImpl) executor).getWaitTime() + 1000);

assertEquals(1, CheckInCommand.getCheckInCount());
}

@Test
public void executorCheckInTestStoppedBefore() throws InterruptedException {
HashMap<String, Object> input = new HashMap<String, Object>();

String patientName = "John Doe";
input.put("bedrequest_patientname", patientName);

AbstractAsyncWorkItemHandler asyncHandler = new AbstractAsyncWorkItemHandler(executor, null);
session.getWorkItemManager().registerWorkItemHandler("Async Work", asyncHandler);

WorkflowProcessInstance pI = (WorkflowProcessInstance) session.startProcess("PatientDeferredCheckIn", input);

assertEquals(ProcessInstance.STATE_COMPLETED, pI.getState());

assertEquals(0, CheckInCommand.getCheckInCount());

Thread.sleep(((ExecutorImpl) executor).getWaitTime() - 1000);

assertEquals(0, CheckInCommand.getCheckInCount());
}

}
@@ -0,0 +1,208 @@
/*
* To change this template, choose Tools | Templates and open the template in
* the editor.
*/
package com.salaboy.jbpm5.dev.guide;

import static org.junit.Assert.assertEquals;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

import org.drools.KnowledgeBase;
import org.drools.KnowledgeBaseFactory;
import org.drools.WorkingMemory;
import org.drools.builder.KnowledgeBuilder;
import org.drools.builder.KnowledgeBuilderError;
import org.drools.builder.KnowledgeBuilderErrors;
import org.drools.builder.KnowledgeBuilderFactory;
import org.drools.builder.ResourceType;
import org.drools.event.RuleFlowGroupActivatedEvent;
import org.drools.event.RuleFlowGroupDeactivatedEvent;
import org.drools.impl.StatefulKnowledgeSessionImpl;
import org.drools.io.impl.ClassPathResource;
import org.drools.logger.KnowledgeRuntimeLoggerFactory;
import org.drools.runtime.StatefulKnowledgeSession;
import org.drools.runtime.process.ProcessInstance;
import org.drools.runtime.process.WorkflowProcessInstance;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import com.salaboy.jbpm5.dev.guide.commands.CheckInCommand;
import com.salaboy.jbpm5.dev.guide.executor.CommandContext;
import com.salaboy.jbpm5.dev.guide.executor.CommandDoneHandler;
import com.salaboy.jbpm5.dev.guide.executor.Executor;
import com.salaboy.jbpm5.dev.guide.executor.ExecutorFactoryBean;
import com.salaboy.jbpm5.dev.guide.executor.ExecutorImpl;
import com.salaboy.jbpm5.dev.guide.executor.ExecutorListenerImpl;
import com.salaboy.jbpm5.dev.guide.workitems.AbstractAsyncWorkItemHandler;

/**
*
* @author salaboy
*/
public class WaitCompletionAsyncTaskSimpleTest {

protected Executor executor;
protected ExecutorListenerImpl listener;
protected StatefulKnowledgeSession session;

public WaitCompletionAsyncTaskSimpleTest() {
}

@Before
public void setUp() throws Exception {
initializeExecutionEnvironment();
initializeSession();
}

@After
public void tearDown(){
stopExecutionEnvironment();
}

protected void initializeExecutionEnvironment() throws Exception {
CheckInCommand.reset();
ExecutorFactoryBean factoryBean = new ExecutorFactoryBean();
factoryBean.setWaitTime(5000);
executor = factoryBean.getObject();
listener = new ExecutorListenerImpl();
listener.setEntityManager(factoryBean.createEntityManager());
}

protected void stopExecutionEnvironment() {
executor.destroy();
}

private void initializeSession() {
KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();

kbuilder.add(new ClassPathResource("DeferExecutionScenarioV2-data.bpmn"), ResourceType.BPMN2);
if (kbuilder.hasErrors()) {
KnowledgeBuilderErrors errors = kbuilder.getErrors();

for (KnowledgeBuilderError error : errors) {
System.out.println(">>> Error:" + error.getMessage());

}
throw new IllegalStateException(">>> Knowledge couldn't be parsed! ");
}

KnowledgeBase kbase = KnowledgeBaseFactory.newKnowledgeBase();

kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());

session = kbase.newStatefulKnowledgeSession();
KnowledgeRuntimeLoggerFactory.newConsoleLogger(session);

((StatefulKnowledgeSessionImpl)session).session.addEventListener(new org.drools.event.AgendaEventListener() {
public void activationCreated(org.drools.event.ActivationCreatedEvent event, WorkingMemory workingMemory) { }
public void activationCancelled(org.drools.event.ActivationCancelledEvent event, WorkingMemory workingMemory) { }
public void beforeActivationFired(org.drools.event.BeforeActivationFiredEvent event, WorkingMemory workingMemory) { }
public void afterActivationFired(org.drools.event.AfterActivationFiredEvent event, WorkingMemory workingMemory) { }
public void agendaGroupPopped(org.drools.event.AgendaGroupPoppedEvent event, WorkingMemory workingMemory) { }
public void agendaGroupPushed(org.drools.event.AgendaGroupPushedEvent event, WorkingMemory workingMemory) { }
public void beforeRuleFlowGroupActivated(RuleFlowGroupActivatedEvent event, WorkingMemory workingMemory) { }
public void afterRuleFlowGroupActivated(RuleFlowGroupActivatedEvent event, WorkingMemory workingMemory) {
workingMemory.fireAllRules();
}
public void beforeRuleFlowGroupDeactivated(RuleFlowGroupDeactivatedEvent event, WorkingMemory workingMemory) { }
public void afterRuleFlowGroupDeactivated(RuleFlowGroupDeactivatedEvent event, WorkingMemory workingMemory) { }
});


}

@Test
public void executorCheckInTestFinishesWithoutHandler() throws InterruptedException {
HashMap<String, Object> input = new HashMap<String, Object>();

String patientName = "John Doe";
input.put("bedrequest_patientname", patientName);

listener.setHandler(new CommandDoneHandler() {
public void onCommandDone(CommandContext ctx) {
//do nothing
System.out.println("I'm not completing the workItem");
}
});

AbstractAsyncWorkItemHandler asyncHandler = new AbstractAsyncWorkItemHandler(executor, listener);
session.getWorkItemManager().registerWorkItemHandler("Async Work", asyncHandler);

WorkflowProcessInstance pI = (WorkflowProcessInstance) session.startProcess("PatientDeferredCheckIn", input);

assertEquals(ProcessInstance.STATE_ACTIVE, pI.getState());

assertEquals(0, CheckInCommand.getCheckInCount());

Thread.sleep(((ExecutorImpl) executor).getWaitTime() + 1000);

assertEquals(1, CheckInCommand.getCheckInCount());
}

@Test
public void executorCheckInTestFinishesWithHandler() throws InterruptedException {
HashMap<String, Object> input = new HashMap<String, Object>();

String patientName = "John Doe";
input.put("bedrequest_patientname", patientName);

listener.setHandler(new CommandDoneHandler() {
public void onCommandDone(CommandContext ctx) {
Map<String, Object> results = new HashMap<String, Object>();
for (Map.Entry<String, Serializable> entry : ctx.getData().entrySet()) {
results.put(entry.getKey(), entry.getValue());
}
String sWorkItemId = (String) ctx.getData("_workItemId");
session.getWorkItemManager().completeWorkItem(Long.valueOf(sWorkItemId), results);
}
});

AbstractAsyncWorkItemHandler asyncHandler = new AbstractAsyncWorkItemHandler(executor, listener);
session.getWorkItemManager().registerWorkItemHandler("Async Work", asyncHandler);

WorkflowProcessInstance pI = (WorkflowProcessInstance) session.startProcess("PatientDeferredCheckIn", input);

assertEquals(ProcessInstance.STATE_ACTIVE, pI.getState());

assertEquals(0, CheckInCommand.getCheckInCount());

Thread.sleep(((ExecutorImpl) executor).getWaitTime() * 2);

assertEquals(1, CheckInCommand.getCheckInCount());

assertEquals(ProcessInstance.STATE_COMPLETED, pI.getState());
}

@Test
public void executorCheckInTestStoppedBefore() throws InterruptedException {
HashMap<String, Object> input = new HashMap<String, Object>();

String patientName = "John Doe";
input.put("bedrequest_patientname", patientName);

listener.setHandler(new CommandDoneHandler() {
public void onCommandDone(CommandContext ctx) {
//do nothing
System.out.println("I'm not completing the workItem");
}
});

AbstractAsyncWorkItemHandler asyncHandler = new AbstractAsyncWorkItemHandler(executor, listener);
session.getWorkItemManager().registerWorkItemHandler("Async Work", asyncHandler);

WorkflowProcessInstance pI = (WorkflowProcessInstance) session.startProcess("PatientDeferredCheckIn", input);

assertEquals(ProcessInstance.STATE_ACTIVE, pI.getState());

assertEquals(0, CheckInCommand.getCheckInCount());

Thread.sleep(((ExecutorImpl) executor).getWaitTime() - 1000);

assertEquals(0, CheckInCommand.getCheckInCount());
}

}
@@ -65,5 +65,11 @@
<version>1.3.163</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>3.1.0.RELEASE</version>
</dependency>
</dependencies>
</project>
@@ -9,5 +9,6 @@
* @author salaboy
*/
public interface Command {
public void setContext(CommandContext ctx);
public void execute();
}
@@ -0,0 +1,36 @@
package com.salaboy.jbpm5.dev.guide.executor;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class CommandContext implements Serializable {

private static final long serialVersionUID = -1440017934399413860L;

private Map<String, Serializable> data = new HashMap<String, Serializable>();

public CommandContext() {
}

public void setData(Map<String, Serializable> data) {
this.data = data;
}

public Map<String, Serializable> getData() {
return data;
}

public Serializable getData(String key) {
return data.get(key);
}

public void setData(String key, Serializable value) {
data.put(key, value);
}

public Set<String> keySet() {
return data.keySet();
}
}
@@ -0,0 +1,6 @@
package com.salaboy.jbpm5.dev.guide.executor;

public interface CommandDoneHandler {

void onCommandDone(CommandContext ctx);
}
@@ -1,12 +1,12 @@

package com.salaboy.jbpm5.dev.guide.executor;

import org.drools.command.impl.GenericCommand;

/**
*
* @author salaboy
*/
public interface Executor extends Service{
public void schedule(String requestName);
public void schedule(String requestName, String key, CommandContext ctx);
public void unschedule(String key);
}
@@ -0,0 +1,116 @@
package com.salaboy.jbpm5.dev.guide.executor;

import java.util.HashMap;
import java.util.Map;

import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;

import org.springframework.beans.factory.FactoryBean;

public class ExecutorFactoryBean implements FactoryBean<Executor> {

private int waitTime = 5000;

private final ExecutorImpl executor = new ExecutorImpl();

private String username;
private String driverClass;
private String dialectClass;
private String password;
private String url;

private EntityManager em = null;

public String getUsername() {
return username;
}

public void setUsername(String username) {
this.username = username;
}

public String getDriverClass() {
return driverClass;
}

public void setDriverClass(String driverClass) {
this.driverClass = driverClass;
}

public String getDialectClass() {
return dialectClass;
}

public void setDialectClass(String dialectClass) {
this.dialectClass = dialectClass;
}

public String getPassword() {
return password;
}

public void setPassword(String password) {
this.password = password;
}

public String getUrl() {
return url;
}

public void setUrl(String url) {
this.url = url;
}

public EntityManager createEntityManager() {
if (em == null) {
Map<String, String> connectionProperties = new HashMap<String, String>();
if (driverClass != null) {
connectionProperties.put("hibernate.connection.driver_class", driverClass);
}
if (dialectClass != null) {
connectionProperties.put("hibernate.dialect", dialectClass);
}
if (username != null) {
connectionProperties.put("hibernate.connection.username", username);
}
if (password != null) {
connectionProperties.put("hibernate.connection.password", password);
}
if (url != null) {
connectionProperties.put("hibernate.connection.url", url);
}
EntityManagerFactory emf = Persistence.createEntityManagerFactory(
"org.jbpm.executor", connectionProperties);
em = emf.createEntityManager();
}
return em;
}

public ExecutorFactoryBean() {
}

public int getWaitTime() {
return waitTime;
}

public void setWaitTime(int waitTime) {
this.waitTime = waitTime;
}

public Executor getObject() throws Exception {
executor.setWaitTime(waitTime);
executor.setEntityManager(createEntityManager());
executor.init();
return executor;
}

public Class<?> getObjectType() {
return Executor.class;
}

public boolean isSingleton() {
return true;
}
}
@@ -4,49 +4,77 @@
*/
package com.salaboy.jbpm5.dev.guide.executor;

import com.salaboy.jbpm5.dev.guide.executor.entities.RequestInfo;
import com.salaboy.jbpm5.dev.guide.executor.entities.STATUS;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;

import com.salaboy.jbpm5.dev.guide.executor.entities.RequestInfo;
import com.salaboy.jbpm5.dev.guide.executor.entities.STATUS;

/**
*
* @author salaboy
*/
public class ExecutorImpl implements Executor {

private static EntityManagerFactory emf;
private Thread running = null;

private int waitTime = 5000;
private EntityManager em;

public ExecutorImpl() {
emf = Persistence.createEntityManagerFactory("org.jbpm.executor");
em = emf.createEntityManager();
}

public int getWaitTime() {
return waitTime;
}

public void init() {

new Thread() {

public void setWaitTime(int waitTime) {
this.waitTime = waitTime;
}

public void setEntityManager(EntityManager em) {
this.em = em;
}

public void init() {
if (this.running != null) {
throw new IllegalArgumentException("Executor already running");
}
this.running = new Thread() {
@Override
public void run() {
while (true) {
try {
System.out.println("Sleeping ...");
Thread.sleep(5000);
Thread.sleep(waitTime);
System.out.println("Waking Up! ...");
try {
List resultList = em.createQuery("Select r from RequestInfo as r where r.status ='QUEUED'").getResultList();
List<?> resultList = em.createQuery("Select r from RequestInfo as r where r.status ='QUEUED'").getResultList();

System.out.println("Number of request pending for execution = "+resultList.size());
if (resultList.size() > 0) {

RequestInfo r = (RequestInfo) resultList.get(0);
System.out.println("Request Status =" +r.getStatus());
Command cmd = (Command) Class.forName(r.getCommandName()).newInstance();
byte[] rData = r.getRequestData();
if (rData != null) {
try {
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(rData));
CommandContext ctx = (CommandContext) in.readObject();
cmd.setContext(ctx);
} catch (IOException e) {
cmd.setContext(null);
}
}
cmd.execute();
em.getTransaction().begin();
r.setStatus(STATUS.DONE);
@@ -64,25 +92,53 @@ public void run() {
}
}
}
}.start();
};
running.start();
}

public void schedule(String requestName) {
public void schedule(String requestName, String key, CommandContext ctx) {
RequestInfo requestInfo = new RequestInfo();
requestInfo.setCommandName(requestName);
requestInfo.setKey("HI");
requestInfo.setKey(key);
requestInfo.setStatus(STATUS.QUEUED);
requestInfo.setMessage("Ready to execute");

if (ctx != null) {
try {
ByteArrayOutputStream bout = new ByteArrayOutputStream();
ObjectOutputStream oout = new ObjectOutputStream(bout);
oout.writeObject(ctx);
requestInfo.setRequestData(bout.toByteArray());
} catch (IOException e) {
requestInfo.setRequestData(null);
}
}

em.getTransaction().begin();
em.persist(requestInfo);
em.getTransaction().commit();



}

public void unschedule(String key) {
String eql = "Select r from RequestInfo as r where r.status ='QUEUED' and key = :key";
List<?> result = em.createQuery(eql).setParameter("key", key).getResultList();
if (result.isEmpty()) {
return;
}
RequestInfo r = (RequestInfo) result.iterator().next();
em.getTransaction().begin();
em.remove(r);
em.getTransaction().commit();
}

public void destroy() {
em.close();
emf.close();
if (running != null && running.isAlive()) {
running.interrupt();
}
}

public void join() throws InterruptedException {
running.join();
}
}
@@ -0,0 +1,8 @@
package com.salaboy.jbpm5.dev.guide.executor;

public interface ExecutorListener {

void setExecutionKey(String executionKey);
void init();
void destroy();
}
@@ -0,0 +1,150 @@
package com.salaboy.jbpm5.dev.guide.executor;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.persistence.EntityManager;

import com.salaboy.jbpm5.dev.guide.executor.entities.RequestInfo;
import com.salaboy.jbpm5.dev.guide.executor.entities.STATUS;

public class ExecutorListenerImpl implements ExecutorListener {

private EntityManager em;
private CommandDoneHandler handler;
private Thread running = null;

private int waitTime = 2000; //default is smaller than ExecutorImpl
private String executionKey;

public ExecutorListenerImpl() {
}

public int getWaitTime() {
return waitTime;
}

public void setWaitTime(int waitTime) {
this.waitTime = waitTime;
}

public String getExecutionKey() {
return executionKey;
}

public void setExecutionKey(String executionKey) {
this.executionKey = executionKey;
}

public void setEntityManager(EntityManager em) {
this.em = em;
}

public void init() {
if (this.running != null) {
throw new IllegalArgumentException("Executor already running");
}
this.running = new Thread() {
@Override
public void run() {
while (true) {
try {
System.out.println("Sleeping ...");
Thread.sleep(waitTime);
System.out.println("Waking Up! ...");
try {
List<?> resultList = em.createQuery("Select r from RequestInfo as r where r.status ='DONE'").getResultList();

System.out.println("Number of request pending for execution = "+resultList.size());
if (resultList.size() > 0) {

RequestInfo r = (RequestInfo) resultList.get(0);
System.out.println("Request Status =" +r.getStatus());
byte[] rData = r.getRequestData();
CommandContext ctx = null;
if (rData != null) {
try {
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(rData));
ctx = (CommandContext) in.readObject();
} catch (IOException e) {
ctx = null;
}
}
handler.onCommandDone(ctx);
em.getTransaction().begin();
r.setStatus(STATUS.NOTIFIED);
em.getTransaction().commit();
}
} catch (ClassNotFoundException ex) {
Logger.getLogger(ExecutorImpl.class.getName()).log(Level.SEVERE, null, ex);
}
} catch (InterruptedException ex) {
Logger.getLogger(ExecutorImpl.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
};
running.start();
}

public void schedule(String requestName, String key, CommandContext ctx) {
RequestInfo requestInfo = new RequestInfo();
requestInfo.setCommandName(requestName);
requestInfo.setKey(key);
requestInfo.setStatus(STATUS.QUEUED);
requestInfo.setMessage("Ready to execute");

if (ctx != null) {
try {
ByteArrayOutputStream bout = new ByteArrayOutputStream();
ObjectOutputStream oout = new ObjectOutputStream(bout);
oout.writeObject(ctx);
requestInfo.setRequestData(bout.toByteArray());
} catch (IOException e) {
requestInfo.setRequestData(null);
}
}

em.getTransaction().begin();
em.persist(requestInfo);
em.getTransaction().commit();
}

public void setHandler(CommandDoneHandler handler) {
this.handler = handler;
}

public CommandDoneHandler getHandler() {
return handler;
}

public void unschedule(String key) {
String eql = "Select r from RequestInfo as r where r.status ='QUEUED' and key = :key";
List<?> result = em.createQuery(eql).setParameter("key", key).getResultList();
if (result.isEmpty()) {
return;
}
RequestInfo r = (RequestInfo) result.iterator().next();
em.getTransaction().begin();
em.remove(r);
em.getTransaction().commit();
}

public void destroy() {
em.close();
if (running != null && running.isAlive()) {
running.interrupt();
}
}

public void join() throws InterruptedException {
running.join();
}

}
@@ -0,0 +1,28 @@
package com.salaboy.jbpm5.dev.guide.executor;

public class ExecutorMain {

public static void main(String[] args) {
String waitTimeString = args.length > 0 ? args[0] : "5000";
int waitTime = 5000;
try {
waitTime = Integer.parseInt(waitTimeString);
} catch (NumberFormatException e) {
waitTime = 5000;
}
final ExecutorImpl executor = new ExecutorImpl();
executor.setWaitTime(waitTime);
executor.init();

try {
executor.join();
} catch (InterruptedException e) {
//do nothing
}
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
executor.destroy();
}
});
}
}
@@ -5,13 +5,17 @@
package com.salaboy.jbpm5.dev.guide.executor.commands;

import com.salaboy.jbpm5.dev.guide.executor.Command;
import com.salaboy.jbpm5.dev.guide.executor.CommandContext;

/**
*
* @author salaboy
*/
public class PrintOutCommand implements Command{

public void setContext(CommandContext ctx) {
}

public void execute() {
System.out.println(">>> Hi This is the first command!");
}
@@ -6,7 +6,6 @@

import java.util.Date;
import javax.persistence.*;
import javax.print.DocFlavor.STRING;

/**
*
@@ -25,6 +24,8 @@ public class RequestInfo {
private String message;
//Business Key for callback
private String key;
@Lob
private byte[] requestData;

public RequestInfo() {
}
@@ -76,6 +77,12 @@ public Date getTime() {
public void setTime(Date time) {
this.time = time;
}



public byte[] getRequestData() {
return requestData;
}

public void setRequestData(byte[] requestData) {
this.requestData = requestData;
}
}
@@ -9,7 +9,7 @@
* @author salaboy
*/
public enum STATUS {
QUEUED, DONE, ERROR;
QUEUED, DONE, NOTIFIED, ERROR;


}
@@ -0,0 +1,3 @@
Manifest-Version: 1.0
Build-Jdk: 1.6.0_26
Main-Class: com.salaboy.jbpm5.dev.guide.executor.ExecutorMain
@@ -5,15 +5,16 @@
package com.salaboy.jbpm5.dev.guide;

import com.salaboy.jbpm5.dev.guide.executor.Executor;
import com.salaboy.jbpm5.dev.guide.executor.ExecutorImpl;
import com.salaboy.jbpm5.dev.guide.executor.ExecutorFactoryBean;

import org.junit.*;

/**
*
* @author salaboy
*/
public class ExecutorSimpleTest {
private Executor executor = new ExecutorImpl();
private Executor executor;
public ExecutorSimpleTest() {
}

@@ -26,20 +27,21 @@ public static void tearDownClass() throws Exception {
}

@Before
public void setUp() {
executor.init();
public void setUp() throws Exception {
ExecutorFactoryBean factoryBean = new ExecutorFactoryBean();
executor = factoryBean.getObject();
}

@After
public void tearDown() {
executor.destroy();
executor = null;
}

@Test
public void executorSimpleTest() throws InterruptedException {


executor.schedule("com.salaboy.jbpm5.dev.guide.executor.commands.PrintOutCommand");
executor.schedule("com.salaboy.jbpm5.dev.guide.executor.commands.PrintOutCommand", "myKey", null);

Thread.sleep(15000);
}
@@ -54,7 +54,7 @@ public void persistenceSimple() {
em.getTransaction().begin();
em.persist(requestInfo);
em.getTransaction().commit();
List resultList = em.createQuery("Select r from RequestInfo as r").getResultList();
List<?> resultList = em.createQuery("Select r from RequestInfo as r").getResultList();
assertEquals(1, resultList.size());
}

@@ -15,6 +15,7 @@
<module>jBPM5-ServiceTasksSimple</module>
<module>jBPM5-SimpleExecutorService</module>
<module>jBPM5-WebServiceTasksSimple</module>
<module>jBPM5-ExecutorTaskSimple</module>
</modules>