Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[kie-issues-509] Handle SLA timers during process instance migration #2323

Merged
merged 3 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -401,9 +401,9 @@ public void removeEventListeners() {
}

@Override
protected void triggerCompleted(String type, boolean remove) {
if (this.slaCompliance == org.kie.api.runtime.process.ProcessInstance.SLA_PENDING) {
if (System.currentTimeMillis() > slaDueDate.getTime()) {
protected void triggerCompleted(String type, boolean remove) {
if (this.slaCompliance == org.kie.api.runtime.process.ProcessInstance.SLA_PENDING) {
if (System.currentTimeMillis() > slaDueDate.getTime()) {
// completion of the node instance is after expected SLA due date, mark it accordingly
this.slaCompliance = org.kie.api.runtime.process.ProcessInstance.SLA_VIOLATED;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,21 @@

package org.jbpm.runtime.manager.impl.migration;

import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
Expand Down Expand Up @@ -142,12 +148,18 @@ public MigrationReport migrate(Map<String, String> nodeMapping) {
boolean migrateExecutorJobs = ((SimpleRuntimeEnvironment)currentManager.getEnvironment()).getEnvironmentTemplate().get("ExecutorService") != null;
validate(migrateExecutorJobs);
Map<Long, List<TimerInstance>> timerMigrated = null;
Map<Long, List<TimerInstance>> stateBasedTimer = null;
Map<Long, List<TimerInstance>> slaTimerMigrated = null;
Map<Long, List<TimerInstance>> humanTaskSuspended = null;
try {

// collect and cancel any active timers before migration
timerMigrated = cancelActiveTimersBeforeMigration(currentManager);
timerMigrated = cancelActiveTimersBeforeMigration(currentManager, TimerNodeInstance.class, active -> asList(active.getTimerId()));
stateBasedTimer = cancelActiveTimersBeforeMigration(currentManager, StateBasedNodeInstance.class, active -> active.getTimerInstances());
slaTimerMigrated = cancelActiveTimersBeforeMigration(currentManager, org.jbpm.workflow.instance.impl.NodeInstanceImpl.class, active -> asList(active.getSlaTimerId()));
humanTaskSuspended = cancelActiveTimersBeforeMigration(currentManager, HumanTaskNodeInstance.class, active -> asList((active.getSuspendUntilTimerId())));

// start transaction to secure consistency of the migration
// start transaction to secure consistency of the migration
txm = TransactionManagerFactory.get().newTransactionManager(currentManager.getEnvironment().getEnvironment());
transactionOwner = txm.begin();

Expand Down Expand Up @@ -254,9 +266,14 @@ public MigrationReport migrate(Map<String, String> nodeMapping) {
tobe = toBeManager.getEnvironment().getKieBase().newKieSession();
upgradeProcessInstance(current, tobe, migrationSpec.getProcessInstanceId(), migrationSpec.getToProcessId(), nodeMapping, em, toBeManager.getIdentifier());

if (!timerMigrated.isEmpty()) {
rescheduleTimersAfterMigration(toBeManager, timerMigrated);
}

// reschedule timers
rescheduleTimersAfterMigration(toBeManager, TimerNodeInstance.class, timerMigrated, (active, timers) -> active.internalSetTimerId(toSingletonTimerId(timers)));
rescheduleTimersAfterMigration(toBeManager, StateBasedNodeInstance.class, stateBasedTimer, (active, timers) -> active.internalSetTimerInstances(timers.stream().map(TimerInstance::getId).collect(Collectors.toList())));
rescheduleTimersAfterMigration(toBeManager, org.jbpm.workflow.instance.impl.NodeInstanceImpl.class, slaTimerMigrated, (active, timers) -> active.internalSetSlaTimerId(toSingletonTimerId(timers)));
rescheduleTimersAfterMigration(toBeManager, HumanTaskNodeInstance.class, humanTaskSuspended, (active, timers) -> active.setSuspendUntilTimerId(toSingletonTimerId(timers)));


em.flush();
} finally {
em.clear();
Expand All @@ -270,10 +287,12 @@ public MigrationReport migrate(Map<String, String> nodeMapping) {
} catch (Throwable e) {
txm.rollback(transactionOwner);
logger.error("Unexpected error during migration", e);
// put back timers (if there are any) in case of rollback
if (timerMigrated != null && !timerMigrated.isEmpty()) {
rescheduleTimersAfterMigration(currentManager, timerMigrated);
}

rescheduleTimersAfterMigration(currentManager, TimerNodeInstance.class, timerMigrated, (active, timers) -> active.internalSetTimerId(toSingletonTimerId(timers)));
rescheduleTimersAfterMigration(currentManager, StateBasedNodeInstance.class, stateBasedTimer, (active, timers) -> active.internalSetTimerInstances(timers.stream().map(TimerInstance::getId).collect(Collectors.toList())));
rescheduleTimersAfterMigration(currentManager, org.jbpm.workflow.instance.impl.NodeInstanceImpl.class, slaTimerMigrated, (active, timers) -> active.internalSetSlaTimerId(toSingletonTimerId(timers)));
rescheduleTimersAfterMigration(currentManager, HumanTaskNodeInstance.class, humanTaskSuspended, (active, timers) -> active.setSuspendUntilTimerId(toSingletonTimerId(timers)));

report.addEntry(Type.ERROR, "Migration of process instance (" + migrationSpec.getProcessInstanceId() + ") failed due to " + e.getMessage());

} finally {
Expand All @@ -297,6 +316,10 @@ public MigrationReport migrate(Map<String, String> nodeMapping) {

return report;
}

private Long toSingletonTimerId(List<TimerInstance> timerInstances) {
return (timerInstances.isEmpty()) ? -1 : timerInstances.get(0).getId();
}

private void validate(boolean migrateExecutorJobs) {
if (migrationSpec == null) {
Expand Down Expand Up @@ -356,7 +379,7 @@ private void validate(boolean migrateExecutorJobs) {
if (migrateExecutorJobs) {
List<Long> executorJobs = (List<Long>) em.createQuery("select id FROM RequestInfo ri WHERE ri.processInstanceId = :processInstanceId and ri.status in (:statuses)")
.setParameter("processInstanceId", migrationSpec.getProcessInstanceId())
.setParameter("statuses", Arrays.asList(STATUS.QUEUED, STATUS.RETRYING, STATUS.RUNNING))
.setParameter("statuses", asList(STATUS.QUEUED, STATUS.RETRYING, STATUS.RUNNING))
.getResultList();

if (!executorJobs.isEmpty()) {
Expand Down Expand Up @@ -569,7 +592,7 @@ protected TimerManager getTimerManager(KieSession ksession) {
return ((InternalProcessRuntime) ((StatefulKnowledgeSessionImpl) internal).getProcessRuntime()).getTimerManager();
}

protected Map<Long, List<TimerInstance>> cancelActiveTimersBeforeMigration(RuntimeManager manager) {
protected <T extends NodeInstance> Map<Long, List<TimerInstance>> cancelActiveTimersBeforeMigration(RuntimeManager manager, Class<T> type, Function<T, List<Long>> getTimerInstances ) {
RuntimeEngine engineBefore = manager.getRuntimeEngine(ProcessInstanceIdContext.get(migrationSpec.getProcessInstanceId()));
try {
Map<Long, List<TimerInstance>> timerMigrated = engineBefore.getKieSession().execute(new ExecutableCommand<Map<Long, List<TimerInstance>>>() {
Expand All @@ -589,27 +612,31 @@ public Map<Long, List<TimerInstance>> execute(Context context) {
Collection<org.jbpm.workflow.instance.NodeInstance> activeInstances = processInstance.getNodeInstances(true);

for (org.jbpm.workflow.instance.NodeInstance active : activeInstances) {
if (active instanceof TimerNodeInstance) {
TimerInstance timerInstance = timerManager.getTimerMap().get(((TimerNodeInstance) active).getTimerId());

timerManager.cancelTimer(processInstance.getId(), timerInstance.getId());
result.put(active.getId(), Arrays.asList(timerInstance));
} else if (active instanceof StateBasedNodeInstance) {
List<Long> timers = ((StateBasedNodeInstance) active).getTimerInstances();

if (timers != null && !timers.isEmpty()) {
List<TimerInstance> collected = new ArrayList<>();
for (Long timerId : timers) {
TimerInstance timerInstance = timerManager.getTimerMap().get(timerId);
if (timerInstance==null) {
report.addEntry(Type.WARN, "Could not find timer instance with id "+timerId+" to cancel.");
continue;
}
timerManager.cancelTimer(processInstance.getId(), timerInstance.getId());
collected.add(timerInstance);
}
result.put(active.getId(), collected);
if (!type.isAssignableFrom(active.getClass())) {
continue;
}

List<Long> timers = getTimerInstances.apply(type.cast(active));
if (timers == null) {
continue;
}

List<TimerInstance> collected = new ArrayList<>();
for (Long timerId : timers) {
if (timerId == -1) {
continue;
}

TimerInstance timerInstance = timerManager.getTimerMap().get(timerId);
if (timerInstance == null) {
report.addEntry(Type.WARN, "Could not find timer instance with id " + timerId + " to cancel.");
continue;
}
timerManager.cancelTimer(processInstance.getId(), timerInstance.getId());
collected.add(timerInstance);
}
if (!collected.isEmpty()) {
result.put(active.getId(), collected);
}
}

Expand All @@ -623,12 +650,16 @@ public Map<Long, List<TimerInstance>> execute(Context context) {
}
}

protected void rescheduleTimersAfterMigration(RuntimeManager manager, Map<Long, List<TimerInstance>> timerMigrated) {
protected <T extends NodeInstance> void rescheduleTimersAfterMigration(RuntimeManager manager, Class<T> nodeType, Map<Long, List<TimerInstance>> timersToMigrate, BiConsumer<T, List<TimerInstance>> timerMigrated) {
if(timersToMigrate.isEmpty()) {
return;
}

RuntimeEngine engine = manager.getRuntimeEngine(ProcessInstanceIdContext.get(migrationSpec.getProcessInstanceId()));
try {
engine.getKieSession().execute(new ExecutableCommand<Void>() {

private static final long serialVersionUID = 7144657913971146080L;
private static final long serialVersionUID = 7144357923971146089L;

@Override
public Void execute(Context context) {
Expand All @@ -637,35 +668,25 @@ public Void execute(Context context) {

WorkflowProcessInstanceImpl processInstance = (WorkflowProcessInstanceImpl) kieSession.getProcessInstance(migrationSpec.getProcessInstanceId());

for (Entry<Long, List<TimerInstance>> entry : timerMigrated.entrySet()) {
for (Entry<Long, List<TimerInstance>> entry : timersToMigrate.entrySet()) {

org.jbpm.workflow.instance.NodeInstance active = processInstance.getNodeInstance(entry.getKey(), true);
if (active instanceof TimerNodeInstance) {
TimerInstance timerInstance = entry.getValue().get(0);

if(!nodeType.isAssignableFrom(active.getClass())) {
continue;
}

List<TimerInstance> timerInstances = entry.getValue();
for (TimerInstance timerInstance : timerInstances) {
long delay = timerInstance.getDelay() - (System.currentTimeMillis() - timerInstance.getActivated().getTime());
timerInstance.setDelay(delay);

updateBasedOnTrigger(timerInstance);

timerManager.registerTimer(timerInstance, processInstance);
((TimerNodeInstance) active).internalSetTimerId(timerInstance.getId());
} else if (active instanceof StateBasedNodeInstance) {

List<TimerInstance> timerInstances = entry.getValue();
List<Long> timers = new ArrayList<>();
for (TimerInstance timerInstance : timerInstances) {
long delay = timerInstance.getDelay() - (System.currentTimeMillis() - timerInstance.getActivated().getTime());
timerInstance.setDelay(delay);

updateBasedOnTrigger(timerInstance);

timerManager.registerTimer(timerInstance, processInstance);
timers.add(timerInstance.getId());
}
((StateBasedNodeInstance) active).internalSetTimerInstances(timers);

timerManager.registerTimer(timerInstance, processInstance);
}

timerMigrated.accept(nodeType.cast(active), timerInstances);

}

return null;
Expand Down Expand Up @@ -697,4 +718,4 @@ protected void updateBasedOnTrigger(TimerInstance timerInstance) {
}
}

}
}