diff --git a/jbpm-flow/src/main/java/org/jbpm/process/instance/ProcessInstanceManager.java b/jbpm-flow/src/main/java/org/jbpm/process/instance/ProcessInstanceManager.java index 1d67279128..29ba6d7f28 100644 --- a/jbpm-flow/src/main/java/org/jbpm/process/instance/ProcessInstanceManager.java +++ b/jbpm-flow/src/main/java/org/jbpm/process/instance/ProcessInstanceManager.java @@ -35,5 +35,7 @@ public interface ProcessInstanceManager { void internalRemoveProcessInstance(ProcessInstance processInstance); void clearProcessInstances(); + + void clearProcessInstancesState(); } diff --git a/jbpm-flow/src/main/java/org/jbpm/process/instance/ProcessRuntimeImpl.java b/jbpm-flow/src/main/java/org/jbpm/process/instance/ProcessRuntimeImpl.java index 39ea3cbe65..1b5db61d8e 100644 --- a/jbpm-flow/src/main/java/org/jbpm/process/instance/ProcessRuntimeImpl.java +++ b/jbpm-flow/src/main/java/org/jbpm/process/instance/ProcessRuntimeImpl.java @@ -401,4 +401,9 @@ public void clearProcessInstances() { this.processInstanceManager.clearProcessInstances(); } + public void clearProcessInstancesState() { + this.processInstanceManager.clearProcessInstancesState(); + + } + } diff --git a/jbpm-flow/src/main/java/org/jbpm/process/instance/impl/DefaultProcessInstanceManager.java b/jbpm-flow/src/main/java/org/jbpm/process/instance/impl/DefaultProcessInstanceManager.java index 6d955bfe8e..5a24990788 100644 --- a/jbpm-flow/src/main/java/org/jbpm/process/instance/impl/DefaultProcessInstanceManager.java +++ b/jbpm-flow/src/main/java/org/jbpm/process/instance/impl/DefaultProcessInstanceManager.java @@ -58,4 +58,8 @@ public void internalRemoveProcessInstance(ProcessInstance processInstance) { public void clearProcessInstances() { processInstances.clear(); } + + public void clearProcessInstancesState() { + + } } diff --git a/jbpm-persistence-jpa/src/main/java/org/jbpm/persistence/processinstance/JPAProcessInstanceManager.java b/jbpm-persistence-jpa/src/main/java/org/jbpm/persistence/processinstance/JPAProcessInstanceManager.java index e6882cc94b..3152e6a5a6 100644 --- a/jbpm-persistence-jpa/src/main/java/org/jbpm/persistence/processinstance/JPAProcessInstanceManager.java +++ b/jbpm-persistence-jpa/src/main/java/org/jbpm/persistence/processinstance/JPAProcessInstanceManager.java @@ -4,17 +4,25 @@ import java.util.Collection; import java.util.Collections; import java.util.ConcurrentModificationException; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.drools.common.InternalKnowledgeRuntime; import org.drools.definition.process.Process; +import org.drools.process.instance.WorkItem; import org.drools.runtime.EnvironmentName; import org.drools.runtime.process.ProcessInstance; +import org.drools.runtime.process.WorkflowProcessInstance; import org.jbpm.persistence.ProcessPersistenceContext; import org.jbpm.persistence.ProcessPersistenceContextManager; +import org.jbpm.process.instance.InternalProcessRuntime; import org.jbpm.process.instance.ProcessInstanceManager; import org.jbpm.process.instance.impl.ProcessInstanceImpl; +import org.jbpm.process.instance.timer.TimerManager; +import org.jbpm.workflow.instance.NodeInstance; +import org.jbpm.workflow.instance.node.StateBasedNodeInstance; +import org.jbpm.workflow.instance.node.WorkItemNodeInstance; /** * This is an implementation of the {@link ProcessInstanceManager} that uses JPA. @@ -122,4 +130,26 @@ public void clearProcessInstances() { } } + public void clearProcessInstancesState() { + // at this point only timers are considered as state that needs to be cleared + TimerManager timerManager = ((InternalProcessRuntime)kruntime.getProcessRuntime()).getTimerManager(); + + for (ProcessInstance processInstance: new ArrayList(processInstances.values())) { + WorkflowProcessInstance pi = ((WorkflowProcessInstance) processInstance); + + + for (org.drools.runtime.process.NodeInstance nodeInstance : pi.getNodeInstances()) { + if (nodeInstance instanceof StateBasedNodeInstance) { + List timerIds = ((StateBasedNodeInstance) nodeInstance).getTimerInstances(); + if (timerIds != null) { + for (Long id: timerIds) { + timerManager.cancelTimer(id); + } + } + } + } + + } + } + } diff --git a/jbpm-test/src/test/java/org/jbpm/persistence/SerializedTimerRollbackTest.java b/jbpm-test/src/test/java/org/jbpm/persistence/SerializedTimerRollbackTest.java new file mode 100644 index 0000000000..db5608381f --- /dev/null +++ b/jbpm-test/src/test/java/org/jbpm/persistence/SerializedTimerRollbackTest.java @@ -0,0 +1,247 @@ +package org.jbpm.persistence; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.sql.Blob; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.persistence.EntityManagerFactory; +import javax.persistence.Persistence; +import javax.transaction.TransactionManager; + +import org.drools.KnowledgeBase; +import org.drools.KnowledgeBaseFactory; +import org.drools.SystemEventListenerFactory; +import org.drools.builder.KnowledgeBuilder; +import org.drools.builder.KnowledgeBuilderError; +import org.drools.builder.KnowledgeBuilderFactory; +import org.drools.builder.ResourceType; +import org.drools.common.InternalKnowledgeRuntime; +import org.drools.io.ResourceFactory; +import org.drools.marshalling.impl.MarshallingConfigurationImpl; +import org.drools.marshalling.impl.ProtobufMarshaller; +import org.drools.persistence.jpa.JPAKnowledgeService; +import org.drools.runtime.Environment; +import org.drools.runtime.EnvironmentName; +import org.drools.runtime.StatefulKnowledgeSession; +import org.drools.runtime.process.ProcessInstance; +import org.jbpm.process.instance.InternalProcessRuntime; +import org.jbpm.process.instance.timer.TimerInstance; +import org.jbpm.process.instance.timer.TimerManager; +import org.jbpm.process.workitem.wsht.LocalHTWorkItemHandler; +import org.jbpm.task.Group; +import org.jbpm.task.User; +import org.jbpm.task.service.TaskService; +import org.jbpm.task.service.local.LocalTaskService; +import org.jbpm.task.utils.OnErrorAction; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import bitronix.tm.TransactionManagerServices; +import bitronix.tm.resource.jdbc.PoolingDataSource; + + +public class SerializedTimerRollbackTest { + + private PoolingDataSource ds; + + @Before + public void setup() { + ds = new PoolingDataSource(); + ds.setUniqueName( "jdbc/jbpm-ds" ); + ds.setClassName( "org.h2.jdbcx.JdbcDataSource" ); + ds.setMaxPoolSize( 3 ); + ds.setAllowLocalTransactions( true ); + ds.getDriverProperties().put( "user", "sa" ); + ds.getDriverProperties().put( "password", "sasa" ); + ds.getDriverProperties().put( "URL", "jdbc:h2:mem:mydb" ); + ds.init(); + } + + @After + public void tearDown() { + ds.close(); + } + + @Test + public void testSerizliableTestsWithExternalRollback() { + try { + + Environment env = KnowledgeBaseFactory.newEnvironment(); + EntityManagerFactory emf = Persistence.createEntityManagerFactory("org.jbpm.persistence.jpa"); + TransactionManager tm = TransactionManagerServices.getTransactionManager(); + System.out.println("Created JPA EntityManager"); + + env.set( EnvironmentName.ENTITY_MANAGER_FACTORY, emf ); + env.set( EnvironmentName.TRANSACTION_MANAGER, TransactionManagerServices.getTransactionManager() ); + TaskService taskService = new org.jbpm.task.service.TaskService(emf, SystemEventListenerFactory.getSystemEventListener()); + Map users = new HashMap(); + users.put("Administrator", new User("Administrator")); + users.put("john", new User("john")); + Map groups = new HashMap(); + taskService.addUsersAndGroups(users, groups); + org.jbpm.task.TaskService humanTaskClient = new LocalTaskService(taskService);; + + System.out.println("Task service created"); + + KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder(); + kbuilder.add(ResourceFactory.newClassPathResource("HumanTaskWithBoundaryTimer.bpmn"),ResourceType.BPMN2); + if (kbuilder.getErrors()!=null){ + for(KnowledgeBuilderError error: kbuilder.getErrors()){ + System.err.println(error.toString()); + } + } + System.out.println("BPMN process knowledge acquired"); + + KnowledgeBase kbase = kbuilder.newKnowledgeBase(); + StatefulKnowledgeSession sesion = JPAKnowledgeService.newStatefulKnowledgeSession( kbase, null, env ); + System.out.println("Created knowledge session"); + + LocalHTWorkItemHandler localHTWorkItemHandler = new LocalHTWorkItemHandler(humanTaskClient, sesion, OnErrorAction.RETHROW); + localHTWorkItemHandler.connect(); + sesion.getWorkItemManager().registerWorkItemHandler("Human Task", localHTWorkItemHandler); + System.out.println("Attached human task work item handler"); + List committedProcessInstanceIds = new ArrayList(); + for(int i=0;i<10;i++){ + tm.begin(); + Map params = new HashMap(); + params.put("test", "john"); + System.out.println("Creating process instance: "+ i); + ProcessInstance pi = sesion.startProcess("PROCESS_1", params); + if (i%2 == 0) { + committedProcessInstanceIds.add(pi.getId()); + tm.commit(); + } else { + tm.rollback(); + } + } + + Connection c = ds.getConnection(); + Statement st = c.createStatement(); + ResultSet rs = st.executeQuery("select rulesbytearray from sessioninfo"); + rs.next(); + Blob b = rs.getBlob("rulesbytearray"); + assertNotNull(b); + + KnowledgeBuilder builder = KnowledgeBuilderFactory.newKnowledgeBuilder(); + ProtobufMarshaller marshaller = new ProtobufMarshaller(builder.newKnowledgeBase(),new MarshallingConfigurationImpl()); + StatefulKnowledgeSession session = marshaller.unmarshall(b.getBinaryStream()); + assertNotNull(session); + + TimerManager timerManager = ((InternalProcessRuntime)((InternalKnowledgeRuntime)session).getProcessRuntime()).getTimerManager(); + assertNotNull(timerManager); + + Collection timers = timerManager.getTimers(); + assertNotNull(timers); + assertEquals(5, timers.size()); + + for (TimerInstance timerInstance : timers) { + assertTrue(committedProcessInstanceIds.contains(timerInstance.getProcessInstanceId())); + } + + } catch (Exception e){ + e.printStackTrace(); + fail("Exception thrown"); + } + } + + @Test + public void testSerizliableTestsWithEngineRollback() { + try { + + Environment env = KnowledgeBaseFactory.newEnvironment(); + EntityManagerFactory emf = Persistence.createEntityManagerFactory("org.jbpm.persistence.jpa"); + System.out.println("Created JPA EntityManager"); + + env.set( EnvironmentName.ENTITY_MANAGER_FACTORY, emf ); + env.set( EnvironmentName.TRANSACTION_MANAGER, TransactionManagerServices.getTransactionManager() ); + TaskService taskService = new org.jbpm.task.service.TaskService(emf, SystemEventListenerFactory.getSystemEventListener()); + Map users = new HashMap(); + users.put("Administrator", new User("Administrator")); + users.put("john", new User("john")); + Map groups = new HashMap(); + taskService.addUsersAndGroups(users, groups); + org.jbpm.task.TaskService humanTaskClient = new LocalTaskService(taskService);; + + System.out.println("Task service created"); + + KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder(); + kbuilder.add(ResourceFactory.newClassPathResource("HumanTaskWithBoundaryTimer.bpmn"),ResourceType.BPMN2); + if (kbuilder.getErrors()!=null){ + for(KnowledgeBuilderError error: kbuilder.getErrors()){ + System.err.println(error.toString()); + } + } + System.out.println("BPMN process knowledge acquired"); + + KnowledgeBase kbase = kbuilder.newKnowledgeBase(); + StatefulKnowledgeSession sesion = JPAKnowledgeService.newStatefulKnowledgeSession( kbase, null, env ); + System.out.println("Created knowledge session"); + + LocalHTWorkItemHandler localHTWorkItemHandler = new LocalHTWorkItemHandler(humanTaskClient, sesion, OnErrorAction.RETHROW); + localHTWorkItemHandler.connect(); + sesion.getWorkItemManager().registerWorkItemHandler("Human Task", localHTWorkItemHandler); + System.out.println("Attached human task work item handler"); + List committedProcessInstanceIds = new ArrayList(); + for(int i=0;i<10;i++){ + if (i%2 == 0) { + Map params = new HashMap(); + params.put("test", "john"); + System.out.println("Creating process instance: "+ i); + ProcessInstance pi = sesion.startProcess("PROCESS_1", params); + + committedProcessInstanceIds.add(pi.getId()); + + } else { + try { + Map params = new HashMap(); + params.put("test", "test"); + System.out.println("Creating process instance: "+ i); + ProcessInstance pi = sesion.startProcess("PROCESS_1", params); + } catch (Exception e) { + System.out.println("Process rolled back"); + } + } + } + + Connection c = ds.getConnection(); + Statement st = c.createStatement(); + ResultSet rs = st.executeQuery("select rulesbytearray from sessioninfo"); + rs.next(); + Blob b = rs.getBlob("rulesbytearray"); + assertNotNull(b); + + KnowledgeBuilder builder = KnowledgeBuilderFactory.newKnowledgeBuilder(); + ProtobufMarshaller marshaller = new ProtobufMarshaller(builder.newKnowledgeBase(),new MarshallingConfigurationImpl()); + StatefulKnowledgeSession session = marshaller.unmarshall(b.getBinaryStream()); + assertNotNull(session); + + TimerManager timerManager = ((InternalProcessRuntime)((InternalKnowledgeRuntime)session).getProcessRuntime()).getTimerManager(); + assertNotNull(timerManager); + + Collection timers = timerManager.getTimers(); + assertNotNull(timers); + assertEquals(5, timers.size()); + + for (TimerInstance timerInstance : timers) { + assertTrue(committedProcessInstanceIds.contains(timerInstance.getProcessInstanceId())); + } + + } catch (Exception e){ + e.printStackTrace(); + fail("Exception thrown"); + } + } + +} \ No newline at end of file diff --git a/jbpm-test/src/test/resources/HumanTaskWithBoundaryTimer.bpmn b/jbpm-test/src/test/resources/HumanTaskWithBoundaryTimer.bpmn new file mode 100644 index 0000000000..aac4859d15 --- /dev/null +++ b/jbpm-test/src/test/resources/HumanTaskWithBoundaryTimer.bpmn @@ -0,0 +1,179 @@ + + + + + + + + + + + + + + + + + + + + + + + + + _4_CommentInput + _4_SkippableInput + _4_LocaleInput + _4_TaskNameInput + _4_GroupIdInput + _4_PriorityInput + + + + + + _4_CommentInput + + + _4_CommentInput + + + + _4_SkippableInput + + false + _4_SkippableInput + + + + _4_LocaleInput + + en-UK + _4_LocaleInput + + + + _4_TaskNameInput + + + _4_TaskNameInput + + + + _4_GroupIdInput + + + _4_GroupIdInput + + + + _4_PriorityInput + + + _4_PriorityInput + + + + + #{test} + + + + + + + + + 30d + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file