Skip to content

Commit

Permalink
[JBPM-7414] AsyncMode doesn't wait on Inclusive converging Gateway (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
tkobayas authored and Tihomir Surdilovic committed Jul 6, 2018
1 parent 851ec60 commit 85ec39d
Show file tree
Hide file tree
Showing 3 changed files with 351 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import org.jbpm.process.core.context.variable.VariableScope;
import org.jbpm.process.instance.context.variable.VariableScopeInstance;
import org.jbpm.workflow.core.node.AsyncEventNode;
import org.jbpm.workflow.core.node.Join;
import org.jbpm.workflow.core.node.Split;
import org.jbpm.workflow.instance.impl.NodeInstanceImpl;
Expand Down Expand Up @@ -200,6 +201,9 @@ private boolean checkNodes(Set<Long> vistedNodes, Node startAt, Node currentNode
// for dynamic/ad hoc task there is no node
return false;
}
if (currentNode instanceof AsyncEventNode) {
currentNode = ((AsyncEventNode) currentNode).getActualNode();
}
List<Connection> connections = currentNode.getOutgoingConnections(org.jbpm.workflow.core.Node.CONNECTION_DEFAULT_TYPE);
// special handling for XOR split as it usually is used for arbitrary loops
if (currentNode instanceof Split && ((Split) currentNode).getType() == Split.TYPE_XOR) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import javax.persistence.EntityManagerFactory;
Expand All @@ -49,7 +50,9 @@
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.kie.api.event.process.DefaultProcessEventListener;
import org.kie.api.event.process.ProcessEventListener;
import org.kie.api.event.process.ProcessNodeTriggeredEvent;
import org.kie.api.executor.ExecutorService;
import org.kie.api.executor.RequestInfo;
import org.kie.api.io.ResourceType;
Expand Down Expand Up @@ -1199,8 +1202,77 @@ public List<ProcessEventListener> getProcessEventListeners(RuntimeEngine runtime
assertNull(processInstance);

}



@Test(timeout=10000)
public void testAsyncModeWithInclusiveGateway() throws Exception {
// JBPM-7414
final NodeLeftCountDownProcessEventListener countDownListener = new NodeLeftCountDownProcessEventListener("EndProcess", 1);
final NodeTriggerCountListener triggerListener = new NodeTriggerCountListener("ScriptTask-4");

RuntimeEnvironment environment = RuntimeEnvironmentBuilder.Factory.get().newDefaultBuilder()
.userGroupCallback(userGroupCallback)
.addAsset(ResourceFactory.newClassPathResource("BPMN2-AsyncInclusiveGateway.bpmn2"), ResourceType.BPMN2)
.addEnvironmentEntry("ExecutorService", executorService)
.addEnvironmentEntry("AsyncMode", "true")
.registerableItemsFactory(new DefaultRegisterableItemsFactory() {
@Override
public Map<String, WorkItemHandler> getWorkItemHandlers(RuntimeEngine runtime) {
Map<String, WorkItemHandler> handlers = super.getWorkItemHandlers(runtime);
handlers.put("Rest", new SystemOutWorkItemHandler());
return handlers;
}

@Override
public List<ProcessEventListener> getProcessEventListeners(RuntimeEngine runtime) {
List<ProcessEventListener> listeners = super.getProcessEventListeners(runtime);
listeners.add(countDownListener);
listeners.add(triggerListener);
return listeners;
}
})
.get();

manager = RuntimeManagerFactory.Factory.get().newSingletonRuntimeManager(environment);
assertNotNull(manager);

RuntimeEngine runtime = manager.getRuntimeEngine(EmptyContext.get());
KieSession ksession = runtime.getKieSession();
assertNotNull(ksession);

Map<String, Object> params = new HashMap<String, Object>();
params.put("Var1", "AAA");
ProcessInstance processInstance = ksession.startProcess("TestProject.DemoProcess", params);
assertEquals(ProcessInstance.STATE_ACTIVE, processInstance.getState());
long processInstanceId = processInstance.getId();

countDownListener.waitTillCompleted();

processInstance = runtime.getKieSession().getProcessInstance(processInstanceId);
assertNull(processInstance);

assertEquals(1, triggerListener.getCount().intValue());
}

private static class NodeTriggerCountListener extends DefaultProcessEventListener {
private AtomicInteger count = new AtomicInteger(0);
private String nodeName;

private NodeTriggerCountListener(String nodeName) {
this.nodeName = nodeName;
}

@Override
public void afterNodeTriggered(ProcessNodeTriggeredEvent event) {
if (event.getNodeInstance().getNodeName().equals(nodeName)) {
count.getAndIncrement();
}
}

public AtomicInteger getCount() {
return count;
}
};

private boolean waitForAllJobsToComplete() throws Exception {
int attempts = 10;
do {
Expand Down
Loading

0 comments on commit 85ec39d

Please sign in to comment.