Skip to content

Commit

Permalink
Merge dee84ea into 08dc14c
Browse files Browse the repository at this point in the history
  • Loading branch information
jas34 committed Jul 11, 2021
2 parents 08dc14c + dee84ea commit 20ff924
Show file tree
Hide file tree
Showing 14 changed files with 260 additions and 37 deletions.
11 changes: 8 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,15 @@ embedded in it. It runs as an extension module of conductor.

[![Build Status](https://travis-ci.com/jas34/scheduledwf.svg?token=k2Upd1dy5qz2VgAZDCYD&branch=master)]()
[![GitHub Release](https://img.shields.io/github/release/jas34/scheduledwf.svg?style=flat)](https://github.com/jas34/scheduledwf/releases)
[![Maven Central](https://img.shields.io/maven-central/v/io.github.jas34/scheduledwf-parent.svg?label=Maven%20Central)](https://search.maven.org/search?q=g:%22io.github.jas34%22%20AND%20a:%22scheduledwf-parent%22)
[![Maven Central](https://img.shields.io/maven-central/v/io.github.jas34/scheduledwf-parent.svg?label=Maven%20Central)](https://search.maven.org/search?q=g:%22io.github.jas34%22%20AND%20a:%22scheduledwf-parent%22)
[![Coverage Status](https://coveralls.io/repos/github/jas34/scheduledwf/badge.svg?branch=master)](https://coveralls.io/github/jas34/scheduledwf?branch=master)

## Builds
| Module | Build |
|:------:|:-------------------------------------------------------------------------------------------------------------:|
| scheduledwf-server | [![Maven Central](https://img.shields.io/maven-central/v/io.github.jas34/scheduledwf-server.svg?label=Maven%20Central)](https://search.maven.org/search?q=g:%22io.github.jas34%22%20AND%20a:%22scheduledwf-server%22) |
| scheduledwf-module | [![Maven Central](https://img.shields.io/maven-central/v/io.github.jas34/scheduledwf-module.svg?label=Maven%20Central)](https://search.maven.org/search?q=g:%22io.github.jas34%22%20AND%20a:%22scheduledwf-module%22)

### Build
_scheduledwf-server_: [![Maven Central](https://img.shields.io/maven-central/v/io.github.jas34/scheduledwf-server.svg?label=Maven%20Central)](https://search.maven.org/search?q=g:%22io.github.jas34%22%20AND%20a:%22scheduledwf-server%22)

### Motivation
- In digital space there are many use cases that are solved by running schedulers. Some of the common cases are:
Expand Down
14 changes: 13 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>io.github.jas34</groupId>
Expand Down Expand Up @@ -224,6 +225,17 @@
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<version>0.7.9</version>
<configuration>
<excludes>
<exclude>**/MySQLPersistenceModule.class</exclude>
<exclude>**/dao/mysql/**</exclude>
<exclude>**/config/**</exclude>
<exclude>**/metadata/**</exclude>
<exclude>**/run/**</exclude>
<exclude>**/utils/**</exclude>
<exclude>**/resources/**</exclude>
</excludes>
</configuration>
<executions>
<execution>
<id>prepare-agent</id>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import io.github.jas34.scheduledwf.run.Status;

/**
* TODO: to implement auto-recovery in case registry and scheduler goes out of sync from each other.
*
* @author Jasbir Singh
*/
Expand Down Expand Up @@ -157,6 +156,7 @@ List<SchedulingResult> scheduleApplicableWorkflows() {
scheduledWorkFlow.setManagerRefId(managerInfo.getId());
scheduledWorkFlow.setCreateTime(System.currentTimeMillis());
scheduledWorkFlow.setCreatedBy(operatedBy());
scheduledWorkFlow.setReSchedulingEnabled(true);

if (!processRegistry.addProcess(scheduledWorkFlow) && !isJunitRun) {
return;
Expand Down Expand Up @@ -202,7 +202,7 @@ List<ShutdownResult> manageShutDownProcesses() {
logger.debug("No running process found for shutdown with managerRef={}, with names={}",
managerInfo.getId(), names);

cleanUpMetaDataIfApplicable(tobeShutDownScheduleWfDefsOptional.get());
// cleanUpMetaDataIfApplicable(tobeShutDownScheduleWfDefsOptional.get());
return null;
}

Expand All @@ -225,12 +225,12 @@ List<ShutdownResult> manageShutDownProcesses() {
doIndexing(shutdownProcess);
shutdownResults.add(result);
});
cleanUpMetaDataIfApplicable(tobeShutDownScheduleWfDefsOptional.get());
// cleanUpMetaDataIfApplicable(tobeShutDownScheduleWfDefsOptional.get());
return shutdownResults;
}

private void cleanUpMetaDataIfApplicable(List<ScheduleWfDef> deletaleDefinitions) {
List<String> deletableScheduleWfs = deletaleDefinitions.stream()
private void cleanUpMetaDataIfApplicable(List<ScheduleWfDef> deletableDefinitions) {
List<String> deletableScheduleWfs = deletableDefinitions.stream()
.filter(scheduleWfDef -> ScheduleWfDef.Status.DELETE == scheduleWfDef.getStatus())
.map(ScheduleWfDef::getWfName).collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(deletableScheduleWfs)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void setCronExpression(String cronExpression) {
}

public enum Status {
RUN, SHUTDOWN, DELETE;
RUN, SHUTDOWN, DELETE
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public class ScheduledWorkFlow extends Auditable implements Serializable {

private Throwable schedulingException;

private boolean reSchedulingEnabled;

public String getId() {
return id;
}
Expand Down Expand Up @@ -126,6 +128,14 @@ public void setSchedulingException(Throwable schedulingException) {
this.schedulingException = schedulingException;
}

public boolean isReSchedulingEnabled() {
return reSchedulingEnabled;
}

public void setReSchedulingEnabled(boolean reSchedulingEnabled) {
this.reSchedulingEnabled = reSchedulingEnabled;
}

public enum State {
INITIALIZED, SCHEDULING_FAILED, RUNNING, SHUTDOWN, SHUTDOWN_FAILED;
}
Expand All @@ -151,10 +161,19 @@ public ScheduledWorkFlow cloneWithoutProcessRef() {

@Override
public String toString() {
return "ScheduledWorkFlow{" + "id='" + id + '\'' + ", name='" + name + '\'' + ", nodeAddress='"
+ nodeAddress + '\'' + ", wfName='" + wfName + '\'' + ", wfVersion=" + wfVersion
+ ", wfInput=" + wfInput + ", cronExpression='" + cronExpression + '\'' + ", state=" + state
+ ", scheduledProcess=" + scheduledProcess + ", managerRefId='" + managerRefId + '\''
+ ", schedulingException=" + schedulingException + "} " + super.toString();
return "ScheduledWorkFlow{" +
"id='" + id + '\'' +
", name='" + name + '\'' +
", nodeAddress='" + nodeAddress + '\'' +
", wfName='" + wfName + '\'' +
", wfVersion=" + wfVersion +
", wfInput=" + wfInput +
", cronExpression='" + cronExpression + '\'' +
", state=" + state +
", scheduledProcess=" + scheduledProcess +
", managerRefId='" + managerRefId + '\'' +
", schedulingException=" + schedulingException +
", reSchedulingEnabled=" + reSchedulingEnabled +
"} " + super.toString();
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package io.github.jas34.scheduledwf.scheduler;

import com.coreoz.wisp.Job;

/**
* @author Jasbir Singh
*/
public class CronBasedScheduledProcess<Job> extends ScheduledProcess {
public class CronBasedScheduledProcess extends ScheduledProcess<Job> {

private Job job;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.github.jas34.scheduledwf.scheduler;

import java.time.Duration;
import java.util.Objects;
import java.util.Optional;

import javax.annotation.PreDestroy;
Expand All @@ -22,7 +23,7 @@
*/
@Singleton
public class CronBasedWorkflowScheduler
implements WorkflowScheduler<CronBasedScheduledProcess<Job>>, SchedulerStats {
implements WorkflowScheduler<CronBasedScheduledProcess>, SchedulerStats {
private final Logger logger = LoggerFactory.getLogger(CronBasedWorkflowScheduler.class);

private Scheduler scheduler;
Expand All @@ -35,19 +36,24 @@ public CronBasedWorkflowScheduler(Scheduler scheduler, ScheduledTaskProvider tas
}

@Override
public CronBasedScheduledProcess<Job> schedule(ScheduledWorkFlow scheduledWorkFlow) {
public CronBasedScheduledProcess schedule(ScheduledWorkFlow scheduledWorkFlow) {
Schedule schedule = CronSchedule.parseQuartzCron(scheduledWorkFlow.getCronExpression());
if (scheduledWorkFlow.isReSchedulingEnabled()
&& Objects.nonNull(scheduledWorkFlow.getScheduledProcess())
&& Objects.nonNull(resolveJobFromName(scheduledWorkFlow.getWfName()))) {
shutdown((CronBasedScheduledProcess) scheduledWorkFlow.getScheduledProcess());
}
Job scheduledJob = scheduler.schedule(scheduledWorkFlow.getName(),
taskProvider.getTask(scheduledWorkFlow, this), schedule);
logger.info("scheduledWorkFlow scheduled with jobName={}", scheduledJob.name());
return new CronBasedScheduledProcess<>(scheduledJob);
return new CronBasedScheduledProcess(scheduledJob);
}

@Override
public Void shutdown(CronBasedScheduledProcess<Job> scheduledProcess) {
public Void shutdown(CronBasedScheduledProcess scheduledProcess) {
String jobName = scheduledProcess.getJobReference().name();
scheduler.cancel(jobName);
logger.info("scheduledWorkFlow with jobName={} cancelled", jobName);
scheduler.cancel(jobName)
.thenRun(() -> logger.info("scheduledWorkFlow with jobName={} cancelled.", jobName));
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
/**
* @author Jasbir Singh
*/
public abstract class ScheduledProcess implements JobReferer {
public abstract class ScheduledProcess<T> implements JobReferer {
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ public void registerScheduleWorkflowDef(ScheduleWfDef def) {

Optional<ScheduleWfDef> scheduleWfDef =
scheduleWorkflowMetadataDao.getScheduledWorkflowDef(def.getWfName());
if (scheduleWfDef.isPresent()) {
if (scheduleWfDef.isPresent() && ScheduleWfDef.Status.RUN == scheduleWfDef.get().getStatus()) {
throw new ApplicationException(ApplicationException.Code.INVALID_INPUT,
"ScheduleWfDef already present. Cannot accept register.");
"ScheduleWfDef already running. Cannot accept register. First SHUTDOWN or DELETE scheduler.");
}
assertCronExpressionIsValid(def.getCronExpression());
scheduleWorkflowMetadataDao.saveScheduleWorkflow(def);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ public void init() {
@Test
public void test_scheduler() {
ScheduledWorkFlow scheduledWorkFlow =
createScheduledWorkFlow(managerInfo, TEST_WF_NAME, ScheduledWorkFlow.State.INITIALIZED);
CronBasedScheduledProcess<Job> scheduledProcess =
createScheduledWorkFlow(managerInfo, TEST_WF_NAME + "-1", ScheduledWorkFlow.State.INITIALIZED);
CronBasedScheduledProcess scheduledProcess =
cronBasedWorkflowScheduler.schedule(scheduledWorkFlow);
Job job = scheduledProcess.getJobReference();

Expand All @@ -59,4 +59,46 @@ public void test_scheduler() {
cronBasedWorkflowScheduler.shutdown(scheduledProcess);
assertEquals(resolveNextExecutionTime(scheduledProcess.getJobReference()), -1L);
}

@Test
public void test_reScheduling() {
ScheduledWorkFlow scheduledWorkFlow =
createScheduledWorkFlow(managerInfo, TEST_WF_NAME + "-2", ScheduledWorkFlow.State.INITIALIZED);
scheduledWorkFlow.setReSchedulingEnabled(true);
scheduledWorkFlow.setScheduledProcess(null);
scheduledWorkFlow.setWfName(TEST_WF_NAME + "-2");
CronBasedScheduledProcess scheduledProcess =
cronBasedWorkflowScheduler.schedule(scheduledWorkFlow);
Job job = scheduledProcess.getJobReference();

assertNotNull(scheduler.findJob(scheduledWorkFlow.getName()));
assertEquals(scheduledWorkFlow.getName(), job.name());

// lets wait for job to run and then check executionsCount. It should be greater than 0
sleepUninterruptibly(3000, TimeUnit.MILLISECONDS);
assertTrue(0 < job.executionsCount());

//override definition for rescheduling.
scheduledWorkFlow.setScheduledProcess(scheduledProcess);
scheduledWorkFlow.setCronExpression("0/2 1/1 * 1/1 * ? *");
System.out.println("Going to schedule updated scheduler definition");

scheduledProcess = cronBasedWorkflowScheduler.schedule(scheduledWorkFlow);
sleepUninterruptibly(5000, TimeUnit.MILLISECONDS);
assertTrue(0 < scheduledProcess.getJobReference().executionsCount());
}

@Test
public void test_Job_Cacel_Without_Wait_To_Complete() {
ScheduledWorkFlow scheduledWorkFlow =
createScheduledWorkFlow(managerInfo, TEST_WF_NAME + "-3-sleep", ScheduledWorkFlow.State.INITIALIZED);
scheduledWorkFlow.setReSchedulingEnabled(true);
scheduledWorkFlow.setScheduledProcess(null);
scheduledWorkFlow.setWfName(TEST_WF_NAME + "-3-sleep");
CronBasedScheduledProcess scheduledProcess =
cronBasedWorkflowScheduler.schedule(scheduledWorkFlow);
sleepUninterruptibly(5000, TimeUnit.MILLISECONDS);
cronBasedWorkflowScheduler.shutdown(scheduledProcess);
assertEquals(resolveNextExecutionTime(scheduledProcess.getJobReference()), -1L);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ public TestScheduledTaskProvider(IndexExecutionDataCallback callback) {

@Override
public Runnable getTask(ScheduledWorkFlow scheduledWorkFlow, SchedulerStats schedulerStats) {
if (scheduledWorkFlow.getWfName().endsWith("sleep")) {
return new TestTaskWithSleep(prepareScheduledTaskDef(scheduledWorkFlow), callback);
}
return new TestTask(prepareScheduledTaskDef(scheduledWorkFlow), callback);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.github.jas34.scheduledwf.scheduler;

import java.util.concurrent.TimeUnit;

import io.github.jas34.scheduledwf.metadata.ScheduledTaskDef;

import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;

/**
* @author Jasbir Singh
*/
public class TestTaskWithSleep extends TestTask {
public TestTaskWithSleep(ScheduledTaskDef taskDef, IndexExecutionDataCallback indexExecutionDataCallback) {
super(taskDef, indexExecutionDataCallback);
}

@Override
public void run() {
System.out.println("Going to sleep for 3 sec");
sleepUninterruptibly(2000, TimeUnit.MILLISECONDS);
System.out.println("Resuming to work from sleep.");
super.run();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;

/**
* Populates metadata definitions {@link ScheduleWfDef} of objects.
*
* @author Jasbir Singh
*/
@Api(value = "/scheduling/metadata", produces = MediaType.APPLICATION_JSON,
consumes = MediaType.APPLICATION_JSON, tags = "Scheduled Wofkflow Metadata Management")
consumes = MediaType.APPLICATION_JSON, tags = "Scheduled Worfkflow Metadata Management")
@Path("/scheduling/metadata")
@Produces({MediaType.APPLICATION_JSON})
@Consumes({MediaType.APPLICATION_JSON})
Expand All @@ -48,8 +49,10 @@ public void create(ScheduleWfDef scheduleWfDef) {

@PUT
@Path("/scheduleWf/{name}")
@ApiOperation("Update status of a schedule workflow definition")
public void update(@PathParam("name") String name, @QueryParam("status") ScheduleWfDef.Status status) {
@ApiOperation("Update status of a schedule workflow definition.")
public void update(@ApiParam(value = "Scheduled workflow name.") @PathParam("name") String name,
@ApiParam(
value = "status=SHUTDOWN/DELETE are equal. Either of them can be used.") @QueryParam("status") ScheduleWfDef.Status status) {
metadataService.updateScheduledWorkflowDef(name, status);
}

Expand All @@ -66,11 +69,4 @@ public ScheduleWfDef get(@PathParam("name") String name) {
public List<ScheduleWfDef> getAll() {
return metadataService.getScheduleWorkflowDefs();
}

// @DELETE
// @Path("/scheduleWf/{name}")
// @ApiOperation("Removes schedule workflow definition")
// public void unregisterDef(@PathParam("name") String name) {
// metadataService.unregisterScheduleWorkflowDef(name);
// }
}

0 comments on commit 20ff924

Please sign in to comment.