Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ jobs:
services:
postgres:
image: postgres
options: --name pg-container
env:
POSTGRES_USER: sa
POSTGRES_PASSWORD: veryStrong123
Expand Down
2 changes: 1 addition & 1 deletion RUN_AND_BUILD.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ mvn versions:set -DnewVersion=1.6.0-SNAPSHOT -DgenerateBackupPoms=false

## postgres

docker run --name pg-container -e POSTGRES_USER=sa -e POSTGRES_PASSWORD=veryStrong123 -p 5432:5432 -d postgres
docker run -e POSTGRES_USER=sa -e POSTGRES_PASSWORD=veryStrong123 -p 5432:5432 -d postgres

## azure-sql-edge

Expand Down
38 changes: 29 additions & 9 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>org.sterl.spring</groupId>
<artifactId>spring-persistent-tasks-root</artifactId>
<version>1.7.1-SNAPSHOT</version>
<version>2.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand All @@ -25,6 +25,21 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>

<dependency>
<groupId>com.querydsl</groupId>
<artifactId>querydsl-apt</artifactId>
<version>${querydsl.version}</version>
<classifier>jakarta</classifier>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.querydsl</groupId>
<artifactId>querydsl-jpa</artifactId>
<version>${querydsl.version}</version>
<classifier>jakarta</classifier>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
Expand All @@ -38,6 +53,12 @@
<artifactId>liquibase-core</artifactId>
</dependency>

<dependency>
<groupId>com.github.f4b6a3</groupId>
<artifactId>uuid-creator</artifactId>
<version>6.1.1</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
Expand All @@ -52,11 +73,6 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.liquibase</groupId>
<artifactId>liquibase-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
Expand Down Expand Up @@ -186,12 +202,16 @@
</customTypeMappings>
<classes>
<class>org.springframework.data.web.PagedModel</class>
<class>
org.sterl.spring.persistent_tasks.scheduler.entity.SchedulerEntity</class>
<class>org.sterl.spring.persistent_tasks.scheduler.entity.SchedulerEntity</class>
</classes>
<classPatterns>
<pattern>org.sterl.spring.persistent_tasks.api.**</pattern>
<pattern>org.sterl.spring.persistent_tasks.api.*</pattern>
</classPatterns>
<excludeClasses>
<class>java.lang.Class</class>
<class>org.sterl.spring.persistent_tasks.api.QTriggerKey</class>
<class>java.io.Serializable</class>
</excludeClasses>
<outputFile>../ui/src/server-api.d.ts</outputFile>
<outputKind>module</outputKind>
</configuration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,24 @@
import java.util.List;
import java.util.Optional;

import org.apache.commons.lang3.StringUtils;
import org.springframework.context.event.EventListener;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.data.domain.Sort.Direction;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.sterl.spring.persistent_tasks.api.AddTriggerRequest;
import org.sterl.spring.persistent_tasks.api.TriggerKey;
import org.sterl.spring.persistent_tasks.api.TriggerRequest;
import org.sterl.spring.persistent_tasks.api.TriggerSearch;
import org.sterl.spring.persistent_tasks.api.event.TriggerTaskCommand;
import org.sterl.spring.persistent_tasks.history.HistoryService;
import org.sterl.spring.persistent_tasks.history.model.TriggerHistoryLastStateEntity;
import org.sterl.spring.persistent_tasks.history.model.CompletedTriggerEntity;
import org.sterl.spring.persistent_tasks.scheduler.SchedulerService;
import org.sterl.spring.persistent_tasks.shared.model.TriggerData;
import org.sterl.spring.persistent_tasks.shared.model.TriggerEntity;
import org.sterl.spring.persistent_tasks.trigger.TriggerService;
import org.sterl.spring.persistent_tasks.trigger.model.TriggerEntity;
import org.sterl.spring.persistent_tasks.trigger.model.RunningTriggerEntity;

import lombok.RequiredArgsConstructor;

Expand All @@ -40,14 +41,14 @@ public class PersistentTaskService {
private final HistoryService historyService;

/**
* Returns the last known {@link TriggerData} to a given key. First running triggers are checked.
* Returns the last known {@link TriggerEntity} to a given key. First running triggers are checked.
* Maybe out of the history event from a retry execution of the very same id.
*
* @param key the {@link TriggerKey} to look for
* @return the {@link TriggerData} to the {@link TriggerKey}
* @return the {@link TriggerEntity} to the {@link TriggerKey}
*/
public Optional<TriggerData> getLastTriggerData(TriggerKey key) {
final Optional<TriggerEntity> trigger = triggerService.get(key);
public Optional<TriggerEntity> getLastTriggerData(TriggerKey key) {
final Optional<RunningTriggerEntity> trigger = triggerService.get(key);
if (trigger.isEmpty()) {
var history = historyService.findLastKnownStatus(key);
if (history.isPresent()) {
Expand All @@ -59,7 +60,7 @@ public Optional<TriggerData> getLastTriggerData(TriggerKey key) {
}
}

public Optional<TriggerData> getLastDetailData(TriggerKey key) {
public Optional<TriggerEntity> getLastDetailData(TriggerKey key) {
var data = historyService.findAllDetailsForKey(key, Pageable.ofSize(1));
if (data.isEmpty()) {
return Optional.empty();
Expand All @@ -85,14 +86,14 @@ void queue(TriggerTaskCommand<? extends Serializable> event) {
*/
@Transactional(timeout = 10)
@NonNull
public <T extends Serializable> List<TriggerKey> queue(Collection<AddTriggerRequest<T>> triggers) {
public <T extends Serializable> List<TriggerKey> queue(Collection<TriggerRequest<T>> triggers) {
if (triggers == null || triggers.isEmpty()) {
return Collections.emptyList();
}

return triggers.stream() //
.map(t -> triggerService.queue(t)) //
.map(TriggerEntity::getKey) //
.map(RunningTriggerEntity::getKey) //
.toList();
}
/**
Expand All @@ -104,7 +105,7 @@ public <T extends Serializable> List<TriggerKey> queue(Collection<AddTriggerRequ
*/
@Transactional(timeout = 5)
@NonNull
public <T extends Serializable> TriggerKey queue(AddTriggerRequest<T> trigger) {
public <T extends Serializable> TriggerKey queue(TriggerRequest<T> trigger) {
return triggerService.queue(trigger).getKey();
}

Expand All @@ -114,7 +115,7 @@ public <T extends Serializable> TriggerKey queue(AddTriggerRequest<T> trigger) {
* @return the reference to the {@link TriggerKey}
*/
public <T extends Serializable> TriggerKey runOrQueue(
AddTriggerRequest<T> triggerRequest) {
TriggerRequest<T> triggerRequest) {
if (schedulerService.isPresent()) {
schedulerService.get().runOrQueue(triggerRequest);
} else {
Expand All @@ -128,42 +129,41 @@ public <T extends Serializable> TriggerKey runOrQueue(
* Data is limited to overall 300 elements.
*
* @param correlationId the id to search for
* @return the found {@link TriggerData} sorted by create time ASC
* @return the found {@link TriggerEntity} sorted by create time ASC
*/
@Transactional(readOnly = true, timeout = 5)
public List<TriggerData> findAllTriggerByCorrelationId(String correlationId) {
public List<TriggerEntity> findAllTriggerByCorrelationId(String correlationId) {
if (StringUtils.isAllBlank(correlationId)) return Collections.emptyList();

final var search = TriggerSearch.byCorrelationId(correlationId);

var running = triggerService.findTriggerByCorrelationId(correlationId, Pageable.ofSize(100))
.stream().map(TriggerEntity::getData)
final var running = triggerService.searchTriggers(search, PageRequest.of(0, 100, TriggerSearch.DEFAULT_SORT))
.stream().map(RunningTriggerEntity::getData)
.toList();

var done = historyService.findTriggerByCorrelationId(correlationId, Pageable.ofSize(200))
.stream().map(TriggerHistoryLastStateEntity::getData)
final var done = historyService.searchTriggers(search, PageRequest.of(0, 200, TriggerSearch.DEFAULT_SORT))
.stream().map(CompletedTriggerEntity::getData)
.toList();

var result = new ArrayList<TriggerData>(running.size() + done.size());
final var result = new ArrayList<TriggerEntity>(running.size() + done.size());
result.addAll(done);
result.addAll(running);
return result;
}

/**
* Returns the first info to a trigger based on the correlationId.
*
* @param correlationId the id to search for
* @return the found {@link TriggerData}
*/

@Transactional(readOnly = true, timeout = 5)
public Optional<TriggerData> findLastTriggerByCorrelationId(String correlationId) {
final var page = PageRequest.of(0, 1, Sort.by(Direction.DESC, "data.createdTime"));
var result = triggerService.findTriggerByCorrelationId(correlationId, page)
.stream().map(TriggerEntity::getData)
.toList();
public Optional<TriggerEntity> findLastTriggerByCorrelationId(String correlationId) {
final var page = PageRequest.of(0, 1, TriggerSearch.sortByCreatedTime(Direction.DESC));
final var search = TriggerSearch.byCorrelationId(correlationId);

var result = triggerService.searchTriggers(search, page)
.stream().map(RunningTriggerEntity::getData)
.toList();

if (result.isEmpty()) {
result = historyService.findTriggerByCorrelationId(correlationId, page)
.stream().map(TriggerHistoryLastStateEntity::getData)
.toList();
result = historyService.searchTriggers(search, page)
.stream().map(CompletedTriggerEntity::getData)
.toList();
}
return result.isEmpty() ? Optional.empty() : Optional.of(result.getFirst());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public TriggerBuilder<T> newTrigger(T state) {
return new TriggerBuilder<>(this).state(state);
}

public AddTriggerRequest<T> newUniqueTrigger(T state) {
public TriggerRequest<T> newUniqueTrigger(T state) {
return new TriggerBuilder<>(this).state(state).build();
}

Expand All @@ -34,19 +34,24 @@ public static class TriggerBuilder<T extends Serializable> {
private final TaskId<T> taskId;
private String id;
private String correlationId;
private String tag;
private TriggerStatus status = TriggerStatus.WAITING;
private T state;
private OffsetDateTime when = OffsetDateTime.now();
private int priority = AddTriggerRequest.DEFAULT_PRIORITY;
private int priority = TriggerRequest.DEFAULT_PRIORITY;

public static <T extends Serializable> TriggerBuilder<T> newTrigger(String name) {
return new TriggerBuilder<>(new TaskId<T>(name));
}
public static <T extends Serializable> TriggerBuilder<T> newTrigger(String name, T state) {
return new TriggerBuilder<>(new TaskId<T>(name)).state(state);
}
public AddTriggerRequest<T> build() {
public static <T extends Serializable> TriggerBuilder<T> newTrigger(TriggerKey key, T state) {
return new TriggerBuilder<>(new TaskId<T>(key.getTaskName())).id(key.getId()).state(state);
}
public TriggerRequest<T> build() {
var key = TriggerKey.of(id, taskId);
return new AddTriggerRequest<>(key, state, when, priority, correlationId);
return new TriggerRequest<>(key, status, state, when, priority, correlationId, tag);
}
/**
* The ID of this task, same queued ids are replaced.
Expand All @@ -63,6 +68,10 @@ public TriggerBuilder<T> correlationId(String correlationId) {
this.correlationId = correlationId;
return this;
}
public TriggerBuilder<T> tag(String tag) {
this.tag = tag;
return this;
}
public TriggerBuilder<T> state(T state) {
this.state = state;
return this;
Expand All @@ -86,11 +95,24 @@ public TriggerBuilder<T> when(OffsetDateTime when) {
}
public TriggerBuilder<T> runAt(OffsetDateTime when) {
this.when = when;
this.status = TriggerStatus.WAITING;
return this;
}
/**
* synonym for {@link #runAt(OffsetDateTime)}
*/
public TriggerBuilder<T> runAfter(Duration duration) {
runAt(OffsetDateTime.now().plus(duration));
return this;
}
/**
* Creates a trigger which waits for an external signal and
* will run into {@link TriggerStatus#EXPIRED_SIGNAL} if no signal happens.
*/
public TriggerBuilder<T> waitForSignal(OffsetDateTime timeout) {
this.when = timeout;
this.status = TriggerStatus.AWAITING_SIGNAL;
return this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ public class Trigger {
/** the business key which is unique it is combination for triggers but not the history! */
private TriggerKey key;

private String tag;

private String correlationId;

private String runningOn;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import java.io.Serializable;
import java.time.OffsetDateTime;
import java.util.UUID;

import org.springframework.lang.Nullable;

Expand All @@ -29,7 +28,7 @@ public class TriggerKey implements Serializable {
private String taskName;

public static TriggerKey of(@Nullable String id, TaskId<? extends Serializable> taskId) {
return new TriggerKey(id == null ? UUID.randomUUID().toString() : id, taskId.name());
return new TriggerKey(id, taskId.name());
}

public TaskId<Serializable> toTaskId() {
Expand All @@ -40,30 +39,29 @@ public TaskId<Serializable> toTaskId() {
* Builds a trigger for the given persistentTask name
*/
public TriggerKey(String taskName) {
id = UUID.randomUUID().toString();
this.taskName = taskName;
}

/**
* Just triggers the given persistentTask to be executed using <code>null</code> as state.
*/
public <T extends Serializable> AddTriggerRequest<T> newTrigger(TaskId<T> taskId) {
public <T extends Serializable> TriggerRequest<T> newTrigger(TaskId<T> taskId) {
return newTrigger(taskId, null);
}

public <T extends Serializable> AddTriggerRequest<T> newTrigger(TaskId<T> taskId, T state) {
return newTrigger(UUID.randomUUID().toString(), taskId, state);
public <T extends Serializable> TriggerRequest<T> newTrigger(TaskId<T> taskId, T state) {
return newTrigger(null, taskId, state);
}

public <T extends Serializable> AddTriggerRequest<T> newTrigger(String id, TaskId<T> taskId, T state) {
public <T extends Serializable> TriggerRequest<T> newTrigger(String id, TaskId<T> taskId, T state) {
return newTrigger(id, taskId, state, OffsetDateTime.now());
}

public <T extends Serializable> AddTriggerRequest<T> newTrigger(String id, TaskId<T> taskId, T state, OffsetDateTime when) {
public <T extends Serializable> TriggerRequest<T> newTrigger(String id, TaskId<T> taskId, T state, OffsetDateTime when) {
return taskId.newTrigger() //
.id(id) //
.state(state) //
.when(when) //
.build();
.build(); //
}
}
Loading