Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding "priority ordering" feature to allow users specifying the order of precedence in a queue #183

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,17 @@ During execution, the scheduler regularly updates a heartbeat-time for the task-
When a dead execution is found, the `Task`is consulted to see what should be done. A dead `RecurringTask` is typically rescheduled to `now()`.


### Priority

When creating a task, a `priority` can be specified. An executor will always run the tasks with the highest priority first even if the `execution_time` is greater than other tasks.
Priority can be specified through task

```java
scheduler.schedule(onetimeTask.instanceBuilder("1").setPriority(100), Instant.now());
scheduler.schedule(onetimeTask.instanceBuilder("2").setPriority(200), Instant.now());
```


### Things to note / gotchas

* There are no guarantees that all instants in a schedule for a `RecurringTask` will be executed. The `Schedule` is consulted after the previous task-execution finishes, and the closest time in the future will be selected for next execution-time. A new type of task may be added in the future to provide such functionality.
Expand All @@ -304,6 +315,10 @@ When a dead execution is found, the `Task`is consulted to see what should be don

See [releases](https://github.com/kagkarlsson/db-scheduler/releases) for release-notes.


**Upgrading to 10.x**
* Add column `priority` to the database schema. See table definitions for [postgresql](db-scheduler/src/test/resources/postgresql_tables.sql), [oracle](https://github.com/kagkarlsson/db-scheduler/src/test/resources/oracle_tables.sql) or [mysql](https://github.com/kagkarlsson/db-scheduler/src/test/resources/mysql_tables.sql). `null` is handled as 0, so no need to update existing records.

**Upgrading to 8.x**
* Custom Schedules must implement a method `boolean isDeterministic()` to indicate whether they will always produce the same instants or not.

Expand Down
1 change: 1 addition & 0 deletions db-scheduler-boot-starter/src/test/resources/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ create table if not exists scheduled_tasks (
consecutive_failures INT,
last_heartbeat TIMESTAMP WITH TIME ZONE,
version BIGINT,
priority INT,
PRIMARY KEY (task_name, task_instance)
);
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,15 @@ public boolean createIfNotExists(Execution execution) {
}

jdbcRunner.execute(
"insert into " + tableName + "(task_name, task_instance, task_data, execution_time, picked, version) values(?, ?, ?, ?, ?, ?)",
"insert into " + tableName + "(task_name, task_instance, task_data, execution_time, picked, version, priority) values(?, ?, ?, ?, ?, ?, ?)",
(PreparedStatement p) -> {
p.setString(1, execution.taskInstance.getTaskName());
p.setString(2, execution.taskInstance.getId());
p.setObject(3, serializer.serialize(execution.taskInstance.getData()));
jdbcCustomization.setInstant(p, 4, execution.executionTime);
p.setBoolean(5, false);
p.setLong(6, 1L);
p.setInt(7, execution.taskInstance.getPriority());
});
return true;

Expand Down Expand Up @@ -137,7 +138,7 @@ public List<Execution> getDue(Instant now, int limit) {
final UnresolvedFilter unresolvedFilter = new UnresolvedFilter(taskResolver.getUnresolved());
final String explicitLimit = jdbcCustomization.supportsExplicitQueryLimitPart() ? jdbcCustomization.getQueryLimitPart(limit) : "";
return jdbcRunner.query(
"select * from " + tableName + " where picked = ? and execution_time <= ? " + unresolvedFilter.andCondition() + " order by execution_time asc" + explicitLimit,
"select * from " + tableName + " where picked = ? and execution_time <= ? " + unresolvedFilter.andCondition() + " order by priority desc, execution_time asc" + explicitLimit,
(PreparedStatement p) -> {
int index = 1;
p.setBoolean(index++, false);
Expand Down Expand Up @@ -191,7 +192,8 @@ private boolean rescheduleInternal(Execution execution, Instant nextExecutionTim
"version = version + 1 " +
"where task_name = ? " +
"and task_instance = ? " +
"and version = ?",
"and version = ? " +
"and priority = ? ",
ps -> {
int index = 1;
ps.setBoolean(index++, false);
Expand All @@ -208,6 +210,7 @@ private boolean rescheduleInternal(Execution execution, Instant nextExecutionTim
ps.setString(index++, execution.taskInstance.getTaskName());
ps.setString(index++, execution.taskInstance.getId());
ps.setLong(index++, execution.version);
ps.setLong(index++, execution.taskInstance.getPriority());
});

if (updated != 1) {
Expand All @@ -224,7 +227,8 @@ public Optional<Execution> pick(Execution e, Instant timePicked) {
"where picked = ? " +
"and task_name = ? " +
"and task_instance = ? " +
"and version = ?",
"and version = ? "+
"and priority = ? ",
ps -> {
ps.setBoolean(1, true);
ps.setString(2, truncate(schedulerSchedulerName.getName(), 50));
Expand All @@ -233,6 +237,7 @@ public Optional<Execution> pick(Execution e, Instant timePicked) {
ps.setString(5, e.taskInstance.getTaskName());
ps.setString(6, e.taskInstance.getId());
ps.setLong(7, e.version);
ps.setInt(8, e.taskInstance.getPriority());
});

if (updated == 0) {
Expand Down Expand Up @@ -400,9 +405,11 @@ public Void map(ResultSet rs) throws SQLException {
int consecutiveFailures = rs.getInt("consecutive_failures"); // null-value is returned as 0 which is the preferred default
Instant lastHeartbeat = jdbcCustomization.getInstant(rs,"last_heartbeat");
long version = rs.getLong("version");
int priority = rs.getInt("priority");

Supplier dataSupplier = memoize(() -> serializer.deserialize(task.get().getDataClass(), data));
this.consumer.accept(new Execution(executionTime, new TaskInstance(taskName, instanceId, dataSupplier), picked, pickedBy, lastSuccess, lastFailure, consecutiveFailures, lastHeartbeat, version));
this.consumer.accept(new Execution(executionTime, new TaskInstance.Builder<>(taskName, instanceId).setDataSupplier(dataSupplier).setPriority(priority).build(),
picked, pickedBy, lastSuccess, lastFailure, consecutiveFailures, lastHeartbeat, version, priority));
}

return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@ public final class Execution {
public final long version;
public final Instant lastFailure;
public final Instant lastSuccess;
private final int priority;

public Execution(Instant executionTime, TaskInstance taskInstance) {
this(executionTime, taskInstance, false, null, null, null, 0, null, 1L);
this(executionTime, taskInstance, false, null, null, null, 0, null, 1L,taskInstance.getPriority());
}

public Execution(Instant executionTime, TaskInstance taskInstance, boolean picked, String pickedBy,
Instant lastSuccess, Instant lastFailure, int consecutiveFailures, Instant lastHeartbeat, long version) {
Instant lastSuccess, Instant lastFailure, int consecutiveFailures, Instant lastHeartbeat, long version, int priority) {
this.executionTime = executionTime;
this.taskInstance = taskInstance;
this.picked = picked;
Expand All @@ -45,6 +46,7 @@ public Execution(Instant executionTime, TaskInstance taskInstance, boolean picke
this.consecutiveFailures = consecutiveFailures;
this.lastHeartbeat = lastHeartbeat;
this.version = version;
this.priority = priority;
}

public Instant getExecutionTime() {
Expand Down Expand Up @@ -80,6 +82,7 @@ public String toString() {
", picked=" + picked +
", pickedBy=" + pickedBy +
", lastHeartbeat=" + lastHeartbeat +
", version=" + version;
", version=" + version +
", priority=" + priority;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public static ExecutionComplete failure(Execution execution, Instant timeStarted
*/
public static ExecutionComplete simulatedSuccess(Instant timeDone) {
TaskInstance nonExistingTaskInstance = new TaskInstance("non-existing-task", "non-existing-id");
Execution nonExistingExecution = new Execution(timeDone, nonExistingTaskInstance, false, "simulated-picked-by", timeDone, null, 0, null, 1);
Execution nonExistingExecution = new Execution(timeDone, nonExistingTaskInstance, false, "simulated-picked-by", timeDone, null, 0, null, 1, 0);
return new ExecutionComplete(nonExistingExecution, timeDone.minus(Duration.ofSeconds(1)), timeDone, Result.OK, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,15 @@ public Class<T> getDataClass() {
}

public TaskInstance<T> instance(String id) {
return new TaskInstance<>(this.name, id);
return instanceBuilder(id).build();
}

public TaskInstance<T> instance(String id, T data) {
return new TaskInstance<>(this.name, id, data);
return instanceBuilder(id).setData(data).build();
}

public TaskInstance.Builder<T> instanceBuilder(String id) {
return new TaskInstance.Builder<>(this.name, id);
}

public abstract CompletionHandler<T> execute(TaskInstance<T> taskInstance, ExecutionContext executionContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@

public final class TaskInstance<T> implements TaskInstanceId {

private static final int DEFAULT_PRIORITY = 0;

private final String taskName;
private final String id;
private final Supplier<T> dataSupplier;
private final int priority;

public TaskInstance(String taskName, String id) {
this(taskName, id, (T) null);
Expand All @@ -32,9 +35,14 @@ public TaskInstance(String taskName, String id, T data) {
}

public TaskInstance(String taskName, String id, Supplier<T> dataSupplier) {
this(taskName, id, dataSupplier, DEFAULT_PRIORITY);
}

public TaskInstance(String taskName, String id, Supplier<T> dataSupplier, int priority) {
this.taskName = taskName;
this.id = id;
this.dataSupplier = dataSupplier;
this.priority = priority;
}

public String getTaskAndInstance() {
Expand All @@ -54,14 +62,24 @@ public T getData() {
return dataSupplier.get();
}

public int getPriority() {
return priority;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

TaskInstance<?> that = (TaskInstance<?>) o;

if (!taskName.equals(that.taskName)) return false;
if (!taskName.equals(that.taskName)) {
return false;
}
return id.equals(that.id);
}

Expand All @@ -76,7 +94,40 @@ public int hashCode() {
public String toString() {
return "TaskInstance: " +
"task=" + taskName +
", id=" + id;
", id=" + id +
", priority=" + priority;
}

public static class Builder<T> {

private final String taskName;
private final String id;
private Supplier<T> dataSupplier = () -> (T) null;
private int priority = DEFAULT_PRIORITY;

public Builder(String taskName, String id) {
this.id = id;
this.taskName = taskName;
}

public Builder<T> setDataSupplier(Supplier<T> dataSupplier) {
this.dataSupplier = dataSupplier;
return this;
}

public Builder<T> setData(T data) {
this.dataSupplier = () -> (T) data;
;
return this;
}

public Builder<T> setPriority(int priority) {
this.priority = priority;
return this;
}

public TaskInstance<T> build() {
return new TaskInstance<>(taskName, id, dataSupplier, priority);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package com.github.kagkarlsson.scheduler.functional;

import com.github.kagkarlsson.scheduler.EmbeddedPostgresqlExtension;
import com.github.kagkarlsson.scheduler.Scheduler;
import com.github.kagkarlsson.scheduler.SchedulerName;
import com.github.kagkarlsson.scheduler.StopSchedulerExtension;
import com.github.kagkarlsson.scheduler.TestTasks;
import com.github.kagkarlsson.scheduler.helper.TestableRegistry;
import com.github.kagkarlsson.scheduler.task.helper.OneTimeTask;
import com.github.kagkarlsson.scheduler.testhelper.SettableClock;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.extension.RegisterExtension;


public class PriorityExecutionTest {

private SettableClock clock;

@RegisterExtension
public EmbeddedPostgresqlExtension postgres = new EmbeddedPostgresqlExtension();
@RegisterExtension
public StopSchedulerExtension stopScheduler = new StopSchedulerExtension();

@BeforeEach
public void setUp() {
clock = new SettableClock();
}

@RepeatedTest(10)
public void test_immediate_execution() {
Assertions.assertTimeoutPreemptively(Duration.ofSeconds(100), () -> {

String[] sequence = new String[]{"priority-3", "priority-2", "priority-1", "priority-0"};

AtomicInteger index = new AtomicInteger();
OneTimeTask<Void> task = TestTasks.oneTime("onetime-a", Void.class, (taskInstance, executionContext) -> {
// check that the ordering is always correct
Assertions.assertEquals(sequence[index.getAndIncrement()], taskInstance.getId());
});
TestableRegistry.Condition completedCondition = TestableRegistry.Conditions.completed(1);
TestableRegistry.Condition executeDueCondition = TestableRegistry.Conditions.ranExecuteDue(1);

TestableRegistry registry =
TestableRegistry.create().waitConditions(executeDueCondition, completedCondition).build();

Scheduler scheduler = Scheduler.create(postgres.getDataSource(), task)
.pollingInterval(Duration.ofMinutes(1))
.enableImmediateExecution()
.schedulerName(new SchedulerName.Fixed("test"))
.statsRegistry(registry)
// 1 thread to force being sequential
.threads(1)
.build();
stopScheduler.register(scheduler);

// no matter when they are scheduled, the highest priority should always be executed first
scheduler.schedule(task.instanceBuilder(sequence[3]).setPriority(-1).build(),
clock.now().minus(3, ChronoUnit.MINUTES));
scheduler.schedule(task.instanceBuilder(sequence[1]).setPriority(1).build(),
clock.now().minus(2, ChronoUnit.MINUTES));
scheduler.schedule(task.instanceBuilder(sequence[0]).setPriority(2).build(),
clock.now().minus(1, ChronoUnit.MINUTES));
scheduler.schedule(task.instanceBuilder(sequence[2]).setPriority(0).build(), clock.now());

scheduler.start();
executeDueCondition.waitFor();
completedCondition.waitFor();
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ create table custom_tablename (
last_failure timestamp with time zone,
last_heartbeat timestamp with time zone,
version BIGINT not null,
priority INT not null,
PRIMARY KEY (task_name, task_instance)
)
)
1 change: 1 addition & 0 deletions db-scheduler/src/test/resources/hsql_tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ create table scheduled_tasks (
consecutive_failures INT,
last_heartbeat TIMESTAMP WITH TIME ZONE,
version BIGINT,
priority INT,
PRIMARY KEY (task_name, task_instance)
)
3 changes: 2 additions & 1 deletion db-scheduler/src/test/resources/mssql_tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ create table scheduled_tasks (
consecutive_failures INT,
last_heartbeat datetimeoffset ,
[version] BIGINT not null,
priority INT,
PRIMARY KEY (task_name, task_instance)
)
)
1 change: 1 addition & 0 deletions db-scheduler/src/test/resources/mysql_tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ create table test.scheduled_tasks (
consecutive_failures INT,
last_heartbeat timestamp(6) null,
version BIGINT not null,
priority INT,
PRIMARY KEY (task_name, task_instance)
)
1 change: 1 addition & 0 deletions db-scheduler/src/test/resources/oracle_tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ create table scheduled_tasks
consecutive_failures NUMBER(19, 0),
last_heartbeat TIMESTAMP(6),
version NUMBER(19, 0),
priority NUMBER(19, 0),
PRIMARY KEY (task_name, task_instance)
)
3 changes: 2 additions & 1 deletion db-scheduler/src/test/resources/postgresql_tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ create table scheduled_tasks (
consecutive_failures INT,
last_heartbeat timestamp with time zone,
version BIGINT not null,
priority INT,
PRIMARY KEY (task_name, task_instance)
)
)
Loading