Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[kie-roadmap-119] Scheduled jobs do not keep configured execution time #2292

Merged
merged 3 commits into from
Aug 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
65 changes: 65 additions & 0 deletions jbpm-services/jbpm-executor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,28 @@
<artifactId>hornetq-jms-server</artifactId>
<scope>test</scope>
</dependency>

<!-- byteman deps for testing -->
<dependency>
<groupId>org.jboss.byteman</groupId>
<artifactId>byteman</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jboss.byteman</groupId>
<artifactId>byteman-submit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jboss.byteman</groupId>
<artifactId>byteman-install</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jboss.byteman</groupId>
<artifactId>byteman-bmunit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand All @@ -219,6 +241,49 @@
</testResources>

<plugins>
<plugin>
<!-- We need to fork always as we have tests which are configured by system properties when a JVM starts -->
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<reuseForks>false</reuseForks>
<!-- Properties for Byteman which allows it to run on IBM Java as well -->
<argLine>
-javaagent:${project.build.directory}/agents/byteman.jar=port:9091,boot:${project.build.directory}/agents/byteman.jar
-Dfile.encoding=${project.build.sourceEncoding}
${jacoco.agent.line}
</argLine>
<systemPropertyVariables>
<org.jboss.byteman.contrib.bmunit.agent.inhibit>true</org.jboss.byteman.contrib.bmunit.agent.inhibit>
<org.jboss.byteman.allow.config.update>true</org.jboss.byteman.allow.config.update>
</systemPropertyVariables>
</configuration>
</plugin>

<!-- Byteman jar is copied to target/agents directory so Java is able to bootstrap it -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-agent</id>
<phase>process-test-classes</phase>
<goals>
<goal>copy</goal>
</goals>
<configuration>
<artifactItems>
<artifactItem>
<groupId>org.jboss.byteman</groupId>
<artifactId>byteman</artifactId>
<outputDirectory>${project.build.directory}/agents</outputDirectory>
<destFileName>byteman.jar</destFileName>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>

<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@
* <li>DateFormat - date format for further date related params - if not given yyyy-MM-dd is used (pattern of SimpleDateFormat class)</li>
* <li>EmfName - name of entity manager factory to be used for queries (valid persistence unit name)</li>
* <li>SingleRun - indicates if execution should be single run only (true|false)</li>
* <li>RepeatMode - defines how the next execution time is calculated
* <ul>
* <li>FIXED - Scheduled time + NextRun parameter value</li>
* <li>INTERVAL (or not present) - End of execution + NextRun parameter value (default)</li>
* </ul>
* </li>
* <li>NextRun - provides next execution time (valid time expression e.g. 1d, 5h, etc)</li>
* <li>OlderThan - indicates what logs should be deleted - older than given date</li>
* <li>OlderThanPeriod - indicated what logs should be deleted older than given time expression (valid time expression e.g. 1d, 5h, etc)</li>
Expand Down Expand Up @@ -125,15 +131,6 @@ public ExecutionResults execute(CommandContext ctx) throws Exception {
emfName = "org.jbpm.domain";
}
nextScheduleTimeAdd = TimeUnit.DAYS.toMillis(1);
String singleRun = (String) ctx.getData("SingleRun");
if ("true".equalsIgnoreCase(singleRun)) {
// disable rescheduling
nextScheduleTimeAdd = -1;
}
String nextRun = (String) ctx.getData("NextRun");
if (nextRun != null) {
nextScheduleTimeAdd = DateTimeUtils.parseDateAsDuration(nextRun);
}

// get hold of persistence and create instance of audit service
EntityManagerFactory emf = EntityManagerFactoryManager.get().getOrCreate(emfName);
Expand Down Expand Up @@ -268,11 +265,40 @@ public ExecutionResults execute(CommandContext ctx) throws Exception {
executionResults.setData("ProcessInstanceLogRemoved", piLogsRemoved);
}
}
calculateNextRun(ctx);
executionResults.setData("BAMLogRemoved", 0L);
return executionResults;
}

private static boolean mightBeMore (int deleted, int recordPerTransaction) {
return recordPerTransaction > 0 && deleted >= recordPerTransaction;
}

private void calculateNextRun(CommandContext ctx) {
String singleRun = (String) ctx.getData("SingleRun");
if ("true".equalsIgnoreCase(singleRun)) {
// disable rescheduling
nextScheduleTimeAdd = -1;
return;
}

String nextRun = (String) ctx.getData("NextRun");
if (nextRun != null) {
// if calculated next execution time is negative, schedule next run for immediate execution
nextScheduleTimeAdd = Math.max(DateTimeUtils.parseDateAsDuration(nextRun) - calculateExecutionTimeInMillis(ctx), 100);
}
}

private long calculateExecutionTimeInMillis(CommandContext ctx) {
long executionTimeInMillis = 0;
String repeatMode = (String) ctx.getData("RepeatMode");
if( "fixed".equalsIgnoreCase(repeatMode) ) {
// return diff between scheduled time and actual time
Date scheduledExecutionTime = (Date) ctx.getData("scheduledExecutionTime");
executionTimeInMillis = Instant.now().minus(scheduledExecutionTime.toInstant().toEpochMilli(), ChronoUnit.MILLIS).toEpochMilli();
logger.debug("Calculated execution time {}ms, based on scheduled execution time {}", executionTimeInMillis, scheduledExecutionTime);
}
// no calculation required for interval (or empty) mode
return executionTimeInMillis;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,10 @@ public void executeGivenJob(RequestInfo request) {
}
// add class loader so internally classes can be created with valid (kjar) deployment
ctx.setData("ClassLoader", cl);


// add scheduled execution time
ctx.setData("scheduledExecutionTime", request.getTime());

cmd = classCacheManager.findCommand(request.getCommandName(), cl);
// increment execution counter directly to cover both success and failure paths
request.setExecutions(request.getExecutions() + 1);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
/*
* Copyright 2023 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.jbpm.executor;

import java.util.Arrays;
import java.util.List;
import java.util.UUID;

import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;

import org.jboss.byteman.contrib.bmunit.BMScript;
import org.jboss.byteman.contrib.bmunit.BMUnitConfig;
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
import org.jbpm.executor.impl.ExecutorServiceImpl;
import org.jbpm.executor.test.CountDownAsyncJobListener;
import org.jbpm.test.util.ExecutorTestUtil;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

import org.kie.api.executor.CommandContext;
import org.kie.api.executor.ExecutorService;
import org.kie.api.executor.RequestInfo;
import org.kie.api.executor.STATUS;
import org.kie.api.runtime.query.QueryContext;
import org.kie.test.util.db.PoolingDataSourceWrapper;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

@RunWith(BMUnitRunner.class)
@BMUnitConfig(loadDirectory = "target/test-classes")
public class LogCleanupCommandTest {

private PoolingDataSourceWrapper pds;

protected ExecutorService executorService;
protected EntityManagerFactory emf = null;

@Before
public void setUp() {
pds = ExecutorTestUtil.setupPoolingDataSource();
emf = Persistence.createEntityManagerFactory("org.jbpm.executor");

executorService = ExecutorServiceFactory.newExecutorService(emf);

executorService.init();
executorService.setThreadPoolSize(1);
executorService.setInterval(3);
}

@After
public void tearDown() {
executorService.clearAllErrors();
executorService.clearAllRequests();

executorService.destroy();
if (emf != null) {
emf.close();
}
pds.close();
}

protected CountDownAsyncJobListener configureListener(int threads) {
CountDownAsyncJobListener countDownListener = new CountDownAsyncJobListener(threads);
((ExecutorServiceImpl) executorService).addAsyncJobListener(countDownListener);

return countDownListener;
}


@Test(timeout=10000)
@BMScript(value = "byteman-scripts/simulateSlowLogCleanupCommand.btm")
public void logCleanupSingleRunTest() throws InterruptedException {
CountDownAsyncJobListener countDownListener = configureListener(1);

CommandContext ctxCMD = new CommandContext();
ctxCMD.setData("businessKey", UUID.randomUUID().toString());
ctxCMD.setData("SingleRun", "true");
ctxCMD.setData("EmfName", "org.jbpm.executor");
ctxCMD.setData("SkipProcessLog", "true");
ctxCMD.setData("SkipTaskLog", "true");
executorService.scheduleRequest("org.jbpm.executor.commands.LogCleanupCommand", ctxCMD);

countDownListener.waitTillCompleted();

List<RequestInfo> rescheduled = executorService.getRequestsByBusinessKey((String)ctxCMD.getData("businessKey"), Arrays.asList(STATUS.QUEUED), new QueryContext());
assertEquals(0, rescheduled.size());

List<RequestInfo> inErrorRequests = executorService.getInErrorRequests(new QueryContext());
assertEquals(0, inErrorRequests.size());
List<RequestInfo> queuedRequests = executorService.getQueuedRequests(new QueryContext());
assertEquals(0, queuedRequests.size());
List<RequestInfo> executedRequests = executorService.getCompletedRequests(new QueryContext());
assertEquals(1, executedRequests.size());
}

@Test(timeout=10000)
@BMScript(value = "byteman-scripts/simulateSlowLogCleanupCommand.btm")
public void logCleanupNextRunIntervalTest() throws InterruptedException {
CountDownAsyncJobListener countDownListener = configureListener(1);

CommandContext ctxCMD = new CommandContext();
ctxCMD.setData("businessKey", UUID.randomUUID().toString());
ctxCMD.setData("NextRun", "10s");
ctxCMD.setData("EmfName", "org.jbpm.executor");
ctxCMD.setData("SkipProcessLog", "true");
ctxCMD.setData("SkipTaskLog", "true");
executorService.scheduleRequest("org.jbpm.executor.commands.LogCleanupCommand", ctxCMD);

countDownListener.waitTillCompleted();

List<RequestInfo> rescheduled = executorService.getRequestsByBusinessKey((String)ctxCMD.getData("businessKey"), Arrays.asList(STATUS.QUEUED), new QueryContext());
assertEquals(1, rescheduled.size());

List<RequestInfo> inErrorRequests = executorService.getInErrorRequests(new QueryContext());
assertEquals(0, inErrorRequests.size());
List<RequestInfo> queuedRequests = executorService.getQueuedRequests(new QueryContext());
assertEquals(1, queuedRequests.size());
List<RequestInfo> executedRequests = executorService.getCompletedRequests(new QueryContext());
assertEquals(1, executedRequests.size());

executorService.cancelRequest(queuedRequests.get(0).getId());

long firstExecution = executedRequests.get(0).getTime().getTime();
long nextExecution = queuedRequests.get(0).getTime().getTime();

// time difference between first and second should be at least 11 seconds (10s next interval + artificial slowdown from byteman)
long diff = nextExecution - firstExecution;
assertTrue(diff > 11000);
}

@Test(timeout=10000)
@BMScript(value = "byteman-scripts/simulateSlowLogCleanupCommand.btm")
public void logCleanupNextRunFixedTest() throws InterruptedException {
CountDownAsyncJobListener countDownListener = configureListener(1);

CommandContext ctxCMD = new CommandContext();
ctxCMD.setData("businessKey", UUID.randomUUID().toString());
ctxCMD.setData("NextRun", "10s");
ctxCMD.setData("EmfName", "org.jbpm.executor");
ctxCMD.setData("SkipProcessLog", "true");
ctxCMD.setData("SkipTaskLog", "true");
ctxCMD.setData("RepeatMode", "fixed");
executorService.scheduleRequest("org.jbpm.executor.commands.LogCleanupCommand", ctxCMD);

countDownListener.waitTillCompleted();

List<RequestInfo> rescheduled = executorService.getRequestsByBusinessKey((String)ctxCMD.getData("businessKey"), Arrays.asList(STATUS.QUEUED), new QueryContext());
assertEquals(1, rescheduled.size());

List<RequestInfo> inErrorRequests = executorService.getInErrorRequests(new QueryContext());
assertEquals(0, inErrorRequests.size());
List<RequestInfo> queuedRequests = executorService.getQueuedRequests(new QueryContext());
assertEquals(1, queuedRequests.size());
List<RequestInfo> executedRequests = executorService.getCompletedRequests(new QueryContext());
assertEquals(1, executedRequests.size());

executorService.cancelRequest(queuedRequests.get(0).getId());

long firstExecution = executedRequests.get(0).getTime().getTime();
long nextExecution = queuedRequests.get(0).getTime().getTime();

// time difference between first and second should be less than 11 seconds (10s next interval, regardless of artifial slowdown by byteman)
long diff = nextExecution - firstExecution;
assertTrue(diff < 11000);
}

@Test(timeout=10000)
@BMScript(value = "byteman-scripts/simulateSlowLogCleanupCommand.btm")
public void logCleanupNextRunFixedIntervalTooSmallTest() throws InterruptedException {
CountDownAsyncJobListener countDownListener = configureListener(1);
// this delay will be invoked 3 times during the test, resulting in an overall delay of 2s
System.setProperty("byteman.jpaaudit.sleep", "666");

CommandContext ctxCMD = new CommandContext();
ctxCMD.setData("businessKey", UUID.randomUUID().toString());
ctxCMD.setData("NextRun", "1s");
ctxCMD.setData("EmfName", "org.jbpm.executor");
ctxCMD.setData("SkipProcessLog", "true");
ctxCMD.setData("SkipTaskLog", "true");
ctxCMD.setData("RepeatMode", "fixed");
executorService.scheduleRequest("org.jbpm.executor.commands.LogCleanupCommand", ctxCMD);

countDownListener.waitTillCompleted();

List<RequestInfo> rescheduled = executorService.getRequestsByBusinessKey((String)ctxCMD.getData("businessKey"), Arrays.asList(STATUS.QUEUED), new QueryContext());
// check if the job has been rescheduled
assertEquals(1, rescheduled.size());

List<RequestInfo> inErrorRequests = executorService.getInErrorRequests(new QueryContext());
assertEquals(0, inErrorRequests.size());
List<RequestInfo> queuedRequests = executorService.getQueuedRequests(new QueryContext());
assertEquals(1, queuedRequests.size());
List<RequestInfo> executedRequests = executorService.getCompletedRequests(new QueryContext());
assertEquals(1, executedRequests.size());

executorService.cancelRequest(queuedRequests.get(0).getId());

long firstExecution = executedRequests.get(0).getTime().getTime();
long nextExecution = queuedRequests.get(0).getTime().getTime();

// time difference between first and second should be around 2 seconds (2nd job rescheduled immediately)
long diff = nextExecution - firstExecution;
assertTrue(diff >= 2000 && diff < 3000);
System.clearProperty("byteman.jpaaudit.sleep");
}
}