Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[kie-roadmap-119] Scheduled jobs do not keep configured execution time #2292

Merged
merged 3 commits into from
Aug 1, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
65 changes: 65 additions & 0 deletions jbpm-services/jbpm-executor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,28 @@
<artifactId>hornetq-jms-server</artifactId>
<scope>test</scope>
</dependency>

<!-- byteman deps for testing -->
<dependency>
<groupId>org.jboss.byteman</groupId>
<artifactId>byteman</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jboss.byteman</groupId>
<artifactId>byteman-submit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jboss.byteman</groupId>
<artifactId>byteman-install</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jboss.byteman</groupId>
<artifactId>byteman-bmunit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand All @@ -219,6 +241,49 @@
</testResources>

<plugins>
<plugin>
<!-- We need to fork always as we have tests which are configured by system properties when a JVM starts -->
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<reuseForks>false</reuseForks>
<!-- Properties for Byteman which allows it to run on IBM Java as well -->
<argLine>
-javaagent:${project.build.directory}/agents/byteman.jar=port:9091,boot:${project.build.directory}/agents/byteman.jar
-Dfile.encoding=${project.build.sourceEncoding}
${jacoco.agent.line}
</argLine>
<systemPropertyVariables>
<org.jboss.byteman.contrib.bmunit.agent.inhibit>true</org.jboss.byteman.contrib.bmunit.agent.inhibit>
<org.jboss.byteman.allow.config.update>true</org.jboss.byteman.allow.config.update>
</systemPropertyVariables>
</configuration>
</plugin>

<!-- Byteman jar is copied to target/agents directory so Java is able to bootstrap it -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-agent</id>
<phase>process-test-classes</phase>
<goals>
<goal>copy</goal>
</goals>
<configuration>
<artifactItems>
<artifactItem>
<groupId>org.jboss.byteman</groupId>
<artifactId>byteman</artifactId>
<outputDirectory>${project.build.directory}/agents</outputDirectory>
<destFileName>byteman.jar</destFileName>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>

<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@
* <li>DateFormat - date format for further date related params - if not given yyyy-MM-dd is used (pattern of SimpleDateFormat class)</li>
* <li>EmfName - name of entity manager factory to be used for queries (valid persistence unit name)</li>
* <li>SingleRun - indicates if execution should be single run only (true|false)</li>
* <li>RepeatMode - defines how the next execution time is calculated
* <ul>
* <li>FIXED - Scheduled time + NextRun parameter value</li>
* <li>INTERVAL - End of execution + NextRun parameter value (default)</li>
martinweiler marked this conversation as resolved.
Show resolved Hide resolved
* </ul>
* </li>
* <li>NextRun - provides next execution time (valid time expression e.g. 1d, 5h, etc)</li>
* <li>OlderThan - indicates what logs should be deleted - older than given date</li>
* <li>OlderThanPeriod - indicated what logs should be deleted older than given time expression (valid time expression e.g. 1d, 5h, etc)</li>
Expand Down Expand Up @@ -125,15 +131,6 @@ public ExecutionResults execute(CommandContext ctx) throws Exception {
emfName = "org.jbpm.domain";
}
nextScheduleTimeAdd = TimeUnit.DAYS.toMillis(1);
String singleRun = (String) ctx.getData("SingleRun");
if ("true".equalsIgnoreCase(singleRun)) {
// disable rescheduling
nextScheduleTimeAdd = -1;
}
String nextRun = (String) ctx.getData("NextRun");
if (nextRun != null) {
nextScheduleTimeAdd = DateTimeUtils.parseDateAsDuration(nextRun);
}

// get hold of persistence and create instance of audit service
EntityManagerFactory emf = EntityManagerFactoryManager.get().getOrCreate(emfName);
Expand Down Expand Up @@ -268,11 +265,40 @@ public ExecutionResults execute(CommandContext ctx) throws Exception {
executionResults.setData("ProcessInstanceLogRemoved", piLogsRemoved);
}
}
calculateNextRun(ctx);
executionResults.setData("BAMLogRemoved", 0L);
return executionResults;
}

private static boolean mightBeMore (int deleted, int recordPerTransaction) {
return recordPerTransaction > 0 && deleted >= recordPerTransaction;
}

private void calculateNextRun(CommandContext ctx) {
String singleRun = (String) ctx.getData("SingleRun");
if ("true".equalsIgnoreCase(singleRun)) {
// disable rescheduling
nextScheduleTimeAdd = -1;
return;
}

String nextRun = (String) ctx.getData("NextRun");
if (nextRun != null) {
nextScheduleTimeAdd = DateTimeUtils.parseDateAsDuration(nextRun) - calculateExecutionTimeInMillis(ctx);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suppose that the subtraction is negative (because the cleanup operation lasts too long or the interval is very short): in that case, the nextScheduleTimeAdd is negative, and therefore getScheduleTime will return null, without scheduling any other time.
I think this edge scenario could be handled as well, wdyt? (also a test with more delay to verify that the cleanup execution is not stopped).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gmunozfe thanks for the input, that makes sense. I added the check that in case there is an overlap (previous execution took longer than nextRun value), the next execution is scheduled immediately. Also added a new test case for this scenario.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@martinweiler thanks, this looks great!

}
}

private long calculateExecutionTimeInMillis(CommandContext ctx) {
long executionTimeInMillis = 0;
String repeatMode = (String) ctx.getData("RepeatMode");
if( "fixed".equalsIgnoreCase(repeatMode) ) {
// return diff between scheduled time and actual time
Date scheduledExecutionTime = (Date) ctx.getData("scheduledExecutionTime");
executionTimeInMillis = Instant.now().minus(scheduledExecutionTime.toInstant().toEpochMilli(), ChronoUnit.MILLIS).toEpochMilli();
logger.debug("Calculated execution time {}ms, based on scheduled execution time {}", executionTimeInMillis, scheduledExecutionTime);
} else if( "interval".equalsIgnoreCase(repeatMode) ) {
// no calculation required
}
martinweiler marked this conversation as resolved.
Show resolved Hide resolved
return executionTimeInMillis;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,10 @@ public void executeGivenJob(RequestInfo request) {
}
// add class loader so internally classes can be created with valid (kjar) deployment
ctx.setData("ClassLoader", cl);


// add scheduled execution time
ctx.setData("scheduledExecutionTime", request.getTime());

cmd = classCacheManager.findCommand(request.getCommandName(), cl);
// increment execution counter directly to cover both success and failure paths
request.setExecutions(request.getExecutions() + 1);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/*
* Copyright 2023 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.jbpm.executor;

import java.util.Arrays;
import java.util.List;
import java.util.UUID;

import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;

import org.jboss.byteman.contrib.bmunit.BMScript;
import org.jboss.byteman.contrib.bmunit.BMUnitConfig;
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
import org.jbpm.executor.impl.ExecutorServiceImpl;
import org.jbpm.executor.test.CountDownAsyncJobListener;
import org.jbpm.test.util.ExecutorTestUtil;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

import org.kie.api.executor.CommandContext;
import org.kie.api.executor.ExecutorService;
import org.kie.api.executor.RequestInfo;
import org.kie.api.executor.STATUS;
import org.kie.api.runtime.query.QueryContext;
import org.kie.test.util.db.PoolingDataSourceWrapper;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

@RunWith(BMUnitRunner.class)
@BMUnitConfig(loadDirectory = "target/test-classes")
public class LogCleanupCommandTest {

private PoolingDataSourceWrapper pds;

protected ExecutorService executorService;
protected EntityManagerFactory emf = null;

@Before
public void setUp() {
pds = ExecutorTestUtil.setupPoolingDataSource();
emf = Persistence.createEntityManagerFactory("org.jbpm.executor");

executorService = ExecutorServiceFactory.newExecutorService(emf);

executorService.init();
executorService.setThreadPoolSize(1);
executorService.setInterval(3);
}

@After
public void tearDown() {
executorService.clearAllErrors();
executorService.clearAllRequests();

executorService.destroy();
if (emf != null) {
emf.close();
}
pds.close();
}

protected CountDownAsyncJobListener configureListener(int threads) {
CountDownAsyncJobListener countDownListener = new CountDownAsyncJobListener(threads);
((ExecutorServiceImpl) executorService).addAsyncJobListener(countDownListener);

return countDownListener;
}


@Test(timeout=10000)
@BMScript(value = "byteman-scripts/simulateSlowLogCleanupCommand.btm")
public void logCleanupSingleRunTest() throws InterruptedException {
CountDownAsyncJobListener countDownListener = configureListener(1);

CommandContext ctxCMD = new CommandContext();
ctxCMD.setData("businessKey", UUID.randomUUID().toString());
ctxCMD.setData("SingleRun", "true");
ctxCMD.setData("EmfName", "org.jbpm.executor");
ctxCMD.setData("SkipProcessLog", "true");
ctxCMD.setData("SkipTaskLog", "true");
executorService.scheduleRequest("org.jbpm.executor.commands.LogCleanupCommand", ctxCMD);

countDownListener.waitTillCompleted();

List<RequestInfo> rescheduled = executorService.getRequestsByBusinessKey((String)ctxCMD.getData("businessKey"), Arrays.asList(STATUS.QUEUED), new QueryContext());
assertEquals(0, rescheduled.size());

List<RequestInfo> inErrorRequests = executorService.getInErrorRequests(new QueryContext());
assertEquals(0, inErrorRequests.size());
List<RequestInfo> queuedRequests = executorService.getQueuedRequests(new QueryContext());
assertEquals(0, queuedRequests.size());
List<RequestInfo> executedRequests = executorService.getCompletedRequests(new QueryContext());
assertEquals(1, executedRequests.size());
}

@Test(timeout=10000)
@BMScript(value = "byteman-scripts/simulateSlowLogCleanupCommand.btm")
public void logCleanupNextRunIntervalTest() throws InterruptedException {
CountDownAsyncJobListener countDownListener = configureListener(1);

CommandContext ctxCMD = new CommandContext();
ctxCMD.setData("businessKey", UUID.randomUUID().toString());
ctxCMD.setData("NextRun", "10s");
ctxCMD.setData("EmfName", "org.jbpm.executor");
ctxCMD.setData("SkipProcessLog", "true");
ctxCMD.setData("SkipTaskLog", "true");
executorService.scheduleRequest("org.jbpm.executor.commands.LogCleanupCommand", ctxCMD);

countDownListener.waitTillCompleted();

List<RequestInfo> rescheduled = executorService.getRequestsByBusinessKey((String)ctxCMD.getData("businessKey"), Arrays.asList(STATUS.QUEUED), new QueryContext());
assertEquals(1, rescheduled.size());

List<RequestInfo> inErrorRequests = executorService.getInErrorRequests(new QueryContext());
assertEquals(0, inErrorRequests.size());
List<RequestInfo> queuedRequests = executorService.getQueuedRequests(new QueryContext());
assertEquals(1, queuedRequests.size());
List<RequestInfo> executedRequests = executorService.getCompletedRequests(new QueryContext());
assertEquals(1, executedRequests.size());

executorService.cancelRequest(queuedRequests.get(0).getId());

long firstExecution = executedRequests.get(0).getTime().getTime();
long nextExecution = queuedRequests.get(0).getTime().getTime();

// time difference between first and second should be at least 11 seconds (10s next interval + artificial slowdown from byteman)
long diff = nextExecution - firstExecution;
assertTrue(diff > 11000);
}

@Test(timeout=10000)
@BMScript(value = "byteman-scripts/simulateSlowLogCleanupCommand.btm")
public void logCleanupNextRunFixedTest() throws InterruptedException {
CountDownAsyncJobListener countDownListener = configureListener(1);

CommandContext ctxCMD = new CommandContext();
ctxCMD.setData("businessKey", UUID.randomUUID().toString());
ctxCMD.setData("NextRun", "10s");
ctxCMD.setData("EmfName", "org.jbpm.executor");
ctxCMD.setData("SkipProcessLog", "true");
ctxCMD.setData("SkipTaskLog", "true");
ctxCMD.setData("RepeatMode", "fixed");
executorService.scheduleRequest("org.jbpm.executor.commands.LogCleanupCommand", ctxCMD);

countDownListener.waitTillCompleted();

List<RequestInfo> rescheduled = executorService.getRequestsByBusinessKey((String)ctxCMD.getData("businessKey"), Arrays.asList(STATUS.QUEUED), new QueryContext());
assertEquals(1, rescheduled.size());

List<RequestInfo> inErrorRequests = executorService.getInErrorRequests(new QueryContext());
assertEquals(0, inErrorRequests.size());
List<RequestInfo> queuedRequests = executorService.getQueuedRequests(new QueryContext());
assertEquals(1, queuedRequests.size());
List<RequestInfo> executedRequests = executorService.getCompletedRequests(new QueryContext());
assertEquals(1, executedRequests.size());

executorService.cancelRequest(queuedRequests.get(0).getId());

long firstExecution = executedRequests.get(0).getTime().getTime();
long nextExecution = queuedRequests.get(0).getTime().getTime();

// time difference between first and second should be less than 11 seconds (10s next interval, regardless of artifial slowdown by byteman)
long diff = nextExecution - firstExecution;
assertTrue(diff < 11000);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
########################################################################
#
# Rule to slow down the execution of JPAAuditLogService.doDelete
#

RULE JPAAuditLogService.doDelete sleep
CLASS org.jbpm.process.audit.JPAAuditLogService
METHOD doDelete
AT ENTRY
IF TRUE
DO debug("Pausing JPAAuditLogService.doDelete for 500ms");
Thread.sleep(500);
return 1
ENDRULE