Skip to content

Commit

Permalink
[JBPM-10048] Row was updated or deleted by another transaction (#2130)
Browse files Browse the repository at this point in the history
  • Loading branch information
elguardian committed Jun 21, 2022
1 parent 1388d24 commit 4e4b229
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ public interface ProcessPersistenceContext

PersistentProcessInstance findProcessInstanceInfo(Long processId);

default void evict(PersistentProcessInstance processInstanceInfo) {

}

void remove(PersistentProcessInstance processInstanceInfo);

List<Long> getProcessInstancesWaitingForEvent(String type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ public void remove(PersistentProcessInstance processInstanceInfo) {
}
}

@Override
public void evict(PersistentProcessInstance processInstanceInfo) {
getEntityManager().detach(processInstanceInfo);
}

/**
* This method is used by the {@link JPASignalManager} in order to load {@link ProcessInstance} instances
* into the {@link ProcessInstanceManager} cache so that they can then be signalled.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ public ProcessInstance getProcessInstance(long id, boolean readOnly) {
org.jbpm.process.instance.ProcessInstance processInstance = null;
processInstance = (org.jbpm.process.instance.ProcessInstance) this.processInstances.get(id);
if (processInstance != null) {

if (((WorkflowProcessInstanceImpl) processInstance).isPersisted() && !readOnly) {
ProcessPersistenceContextManager ppcm
= (ProcessPersistenceContextManager) this.kruntime.getEnvironment().get( EnvironmentName.PERSISTENCE_CONTEXT_MANAGER );
Expand All @@ -181,7 +182,6 @@ public ProcessInstance getProcessInstance(long id, boolean readOnly) {


EventManagerProvider.getInstance().get().update(getInstanceViewFor(processInstance));

}
return processInstance;
}
Expand Down Expand Up @@ -222,6 +222,7 @@ public ProcessInstance getProcessInstance(long id, boolean readOnly) {

if (readOnly) {
internalRemoveProcessInstance(processInstance);
context.evict(processInstanceInfo);
}
return processInstance;
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,7 @@ public ProcessInstance getProcessInstance(InternalKnowledgeRuntime kruntime,
}
context.close();
} catch ( IOException e ) {
e.printStackTrace();
throw new IllegalArgumentException( "IOException while loading process instance: " + e.getMessage(),
e );
throw new IllegalArgumentException( "IOException while loading process instance: " + e.getMessage(), e);
}
}
((WorkflowProcessInstanceImpl) processInstance).internalSetStartDate(this.startDate);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,19 @@

package org.jbpm.persistence.session;

import static org.jbpm.test.persistence.util.PersistenceUtil.*;
import static org.junit.Assert.*;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import javax.naming.InitialContext;
import javax.transaction.UserTransaction;
Expand All @@ -44,30 +51,39 @@
import org.kie.internal.persistence.jpa.JPAKnowledgeService;
import org.kie.internal.runtime.StatefulKnowledgeSession;

import static org.jbpm.test.persistence.util.PersistenceUtil.JBPM_PERSISTENCE_UNIT_NAME;
import static org.jbpm.test.persistence.util.PersistenceUtil.cleanUp;
import static org.jbpm.test.persistence.util.PersistenceUtil.createEnvironment;
import static org.jbpm.test.persistence.util.PersistenceUtil.setupWithPoolingDataSource;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;

/**
* This test looks at the behavior of the {@link JPAProcessInstanceManager}
* This test looks at the behavior of the {@link JPAProcessInstanceManager}
* with regards to created (but not started) process instances
* and whether the process instances are available or not after creation.
*/
@RunWith(Parameterized.class)
public class GetProcessInstancesTest extends AbstractBaseTest {

private HashMap<String, Object> context;

private Environment env;
private KieBase kbase;
private long sessionId;
public GetProcessInstancesTest(boolean locking) {
this.useLocking = locking;

public GetProcessInstancesTest(boolean locking) {
this.useLocking = locking;
}

@Parameters
public static Collection<Object[]> persistence() {
Object[][] data = new Object[][] { { false }, { true } };
return Arrays.asList(data);
};

@Before
public void setUp() throws Exception {
context = setupWithPoolingDataSource(JBPM_PERSISTENCE_UNIT_NAME);
Expand Down Expand Up @@ -107,45 +123,45 @@ public void create2ProcessInstances() throws Exception {
public void create2ProcessInstancesInsideTransaction() throws Exception {
long[] processId = new long[2];

UserTransaction ut = (UserTransaction) new InitialContext().lookup( "java:comp/UserTransaction" );
UserTransaction ut = (UserTransaction) new InitialContext().lookup("java:comp/UserTransaction");
ut.begin();

StatefulKnowledgeSession ksession = reloadKnowledgeSession();
processId[0] = ksession.createProcessInstance("org.jbpm.processinstance.helloworld", null).getId();
processId[1] = ksession.createProcessInstance("org.jbpm.processinstance.helloworld", null).getId();
assertEquals(2, ksession.getProcessInstances().size());

// process instance manager cache flushed on tx
ut.commit();
assertEquals(0, ksession.getProcessInstances().size());

ksession = reloadKnowledgeSession(ksession);
assertEquals(0, ksession.getProcessInstances().size());
ksession.dispose();

assertProcessInstancesExist(processId);
}

@Test
public void noProcessInstancesLeftAfterRollback() throws Exception {
long[] notProcess = new long[2];

UserTransaction ut = (UserTransaction) new InitialContext().lookup( "java:comp/UserTransaction" );
UserTransaction ut = (UserTransaction) new InitialContext().lookup("java:comp/UserTransaction");
ut.begin();

StatefulKnowledgeSession ksession = reloadKnowledgeSession();
notProcess[0] = ksession.createProcessInstance("org.jbpm.processinstance.helloworld", null).getId();
notProcess[1] = ksession.createProcessInstance("org.jbpm.processinstance.helloworld", null).getId();
assertEquals(2, ksession.getProcessInstances().size());

ut.rollback();
// Validate that proc inst mgr cache is also flushed on rollback
assertEquals(0, ksession.getProcessInstances().size());

ksession = reloadKnowledgeSession(ksession);
assertEquals(0, ksession.getProcessInstances().size());
ksession.dispose();

assertProcessInstancesNotExist(notProcess);
}

Expand All @@ -154,43 +170,139 @@ public void noProcessInstancesLeftWithPreTxKSessionAndRollback() throws Exceptio
long[] notProcess = new long[4];

StatefulKnowledgeSession ksession = reloadKnowledgeSession();
UserTransaction ut = (UserTransaction) new InitialContext().lookup( "java:comp/UserTransaction" );

UserTransaction ut = (UserTransaction) new InitialContext().lookup("java:comp/UserTransaction");
ut.begin();

notProcess[0] = ksession.createProcessInstance("org.jbpm.processinstance.helloworld", null).getId();
notProcess[1] = ksession.createProcessInstance("org.jbpm.processinstance.helloworld", null).getId();

ut.rollback();
// Validate that proc inst mgr cache is also flushed on rollback
assertEquals(0, ksession.getProcessInstances().size());

ksession = reloadKnowledgeSession(ksession);
assertEquals(0, ksession.getProcessInstances().size());
ksession.dispose();

assertProcessInstancesNotExist(notProcess);
}

@Test
public void createProcessInstanceAndGetStartDate() throws Exception {
StatefulKnowledgeSession ksession = reloadKnowledgeSession();
long processId= ksession.createProcessInstance("org.jbpm.processinstance.helloworld", null).getId();
long processId = ksession.createProcessInstance("org.jbpm.processinstance.helloworld", null).getId();
assertEquals(0, ksession.getProcessInstances().size());
RuleFlowProcessInstance processInstance = (RuleFlowProcessInstance) ksession.getProcessInstance(processId);

RuleFlowProcessInstance processInstance = (RuleFlowProcessInstance) ksession.getProcessInstance(processId);
assertNotNull("Process " + processId + " exist!", processInstance);
assertNotNull("Process start at " + processInstance.getStartDate(), processInstance.getStartDate());

ksession.dispose();


}

@Test
public void processInstanceWriteAfterReadonly1() throws Exception {
internalProcessInstanceWriteAfterReadonly(false); // checks OptimisticLockException "Row was updated or deleted by another transaction"
}

@Test
public void processInstanceWriteAfterReadonly2() throws Exception {
internalProcessInstanceWriteAfterReadonly(true); // checks variable
}

private void internalProcessInstanceWriteAfterReadonly(boolean checkVariable) throws Exception {
long[] processId = new long[1];

StatefulKnowledgeSession ksession = reloadKnowledgeSession();
processId[0] = ksession.createProcessInstance("org.jbpm.processinstance.helloworld", null).getId();
ksession.dispose();

assertProcessInstancesExist(processId);

String testVarName = "testVar" + processId;
String testVarValue = UUID.randomUUID().toString();

Lock lock = new ReentrantLock();
AtomicBoolean thread1Waiting = new AtomicBoolean(true);
AtomicBoolean thread2Waiting = new AtomicBoolean(true);
Condition thread1Ready = lock.newCondition();
Condition thread2Ready = lock.newCondition();
Callable<Void> thread1Task = () -> {
lock.lock();
try {
// synchronization with thread2Task
while (thread1Waiting.get()) {
thread2Ready.await();
}

UserTransaction ut = (UserTransaction) new InitialContext().lookup("java:comp/UserTransaction");
ut.begin();

// getting process instance in write-mode and setting up variable
StatefulKnowledgeSession ksession1 = JPAKnowledgeService.newStatefulKnowledgeSession(kbase, null, createEnvironment(context));
RuleFlowProcessInstance processInstance = (RuleFlowProcessInstance) ksession1.getProcessInstance(processId[0], false); // write-mode access
assertNotNull(processInstance);
processInstance.setVariable(testVarName, testVarValue);

ut.commit();
ksession1.dispose();

thread2Waiting.set(false);
thread1Ready.signal();
} finally {
lock.unlock();
}
return null;
};
Callable<Void> thread2Task = () -> {
lock.lock();
try {
UserTransaction ut = (UserTransaction) new InitialContext().lookup("java:comp/UserTransaction");
ut.begin();

// firstly we getting process instance in readonly mode
StatefulKnowledgeSession ksession2 = JPAKnowledgeService.newStatefulKnowledgeSession(kbase, null, createEnvironment(context));
RuleFlowProcessInstance processInstance = (RuleFlowProcessInstance) ksession2.getProcessInstance(processId[0], true); // readonly access
assertNotNull(processInstance);
assertNull(processInstance.getVariable(testVarName));

// waiting thread1Task work
thread1Waiting.set(false);
thread2Ready.signal();
while (thread2Waiting.get()) {
thread1Ready.await();
}

// now we read process instance in write-mode
processInstance = (RuleFlowProcessInstance) ksession2.getProcessInstance(processId[0], false); // write-mode access
assertNotNull(processInstance);
if (checkVariable)
assertEquals(testVarValue, processInstance.getVariable(testVarName)); // it doesn't returns expected value while we are using the old ProcessInstanceInfo version

ut.commit(); // raises OptimisticLockException "Row was updated or deleted by another transaction" prior we have fix it
ksession2.dispose();
} finally {
lock.unlock();
}
return null;
};
ExecutorService executor = Executors.newFixedThreadPool(2);
try {
Future<Void> future1 = executor.submit(thread1Task);
Future<Void> future2 = executor.submit(thread2Task);
future1.get(1000, TimeUnit.MILLISECONDS);
future2.get(1000, TimeUnit.MILLISECONDS);
} finally {
executor.shutdown();
}
}


/**
* Helper functions
*/

private void assertProcessInstancesExist(long[] processId) {
StatefulKnowledgeSession ksession = reloadKnowledgeSession();

Expand All @@ -214,7 +326,7 @@ private KieBase createBase() {

return kbuilder.newKieBase();
}

private StatefulKnowledgeSession reloadKnowledgeSession() {
return JPAKnowledgeService.loadStatefulKnowledgeSession(sessionId, kbase, null, env);
}
Expand Down

0 comments on commit 4e4b229

Please sign in to comment.