From 9f9172d84aacc0a53ff916178e5bdba0384e3324 Mon Sep 17 00:00:00 2001 From: Paul Sterl Date: Wed, 1 Jan 2025 21:24:13 +0100 Subject: [PATCH 1/7] moved ping of scheduler --- .../spring/persistent_tasks/scheduler/SchedulerService.java | 6 ++++-- .../persistent_tasks/scheduler/entity/SchedulerEntity.java | 5 +++++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/SchedulerService.java b/core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/SchedulerService.java index d3582742a..a67456484 100644 --- a/core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/SchedulerService.java +++ b/core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/SchedulerService.java @@ -129,16 +129,18 @@ public Future runOrQueue( if (taskExecutor.getFreeThreads() > 0) { trigger = triggerService.markTriggersAsRunning(trigger, name); + pingRegistry().addRunning(1); } else { log.debug("Currently not enough free thread available {} of {} in use. Task {} queued.", taskExecutor.getFreeThreads(), taskExecutor.getMaxThreads(), trigger.getKey()); } return trigger; }); - Future result = CompletableFuture.completedFuture(runningTrigger.getKey()); + Future result; if (runningTrigger.isRunning()) { result = taskExecutor.submit(runningTrigger); - pingRegistry(); + } else { + result = CompletableFuture.completedFuture(runningTrigger.getKey()); } return result; } diff --git a/core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/entity/SchedulerEntity.java b/core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/entity/SchedulerEntity.java index dd4043780..00f517d7b 100644 --- a/core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/entity/SchedulerEntity.java +++ b/core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/entity/SchedulerEntity.java @@ -43,4 +43,9 @@ public SchedulerEntity(String name) { super(); this.id = name; } + + public SchedulerEntity addRunning(int count) { + this.runnungTasks += count; + return this; + } } From be7150fcab2e09e7ca6ee94bf040f53f5b9959a7 Mon Sep 17 00:00:00 2001 From: Paul Sterl Date: Wed, 1 Jan 2025 21:45:52 +0100 Subject: [PATCH 2/7] less transaction needed --- .../scheduler/SchedulerService.java | 19 +++++++++++-------- .../component/TaskExecutorComponent.java | 2 ++ 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/SchedulerService.java b/core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/SchedulerService.java index a67456484..195153cd1 100644 --- a/core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/SchedulerService.java +++ b/core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/SchedulerService.java @@ -103,16 +103,19 @@ public List> triggerNextTasks() { */ @NonNull public List> triggerNextTasks(OffsetDateTime timeDue) { - List triggers; if (taskExecutor.getFreeThreads() > 0) { - triggers = triggerService.lockNextTrigger( - name, taskExecutor.getFreeThreads(), timeDue); + final var result = trx.execute(t -> { + var triggers = triggerService.lockNextTrigger(name, + taskExecutor.getFreeThreads(), timeDue); + pingRegistry().addRunning(triggers.size()); + return triggers; + }); + + return taskExecutor.submit(result); } else { - triggers = Collections.emptyList(); + pingRegistry(); + return Collections.emptyList(); } - var result = taskExecutor.submit(triggers); - pingRegistry(); - return result; } /** @@ -122,7 +125,7 @@ public List> triggerNextTasks(OffsetDateTime timeDue) { */ public Future runOrQueue( AddTriggerRequest triggerRequest) { - var runningTrigger = trx.execute(t -> { + final var runningTrigger = trx.execute(t -> { var trigger = triggerService.queue(triggerRequest); // exit now if this trigger is for the future ... if (trigger.shouldRunInFuture()) return trigger; diff --git a/core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/component/TaskExecutorComponent.java b/core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/component/TaskExecutorComponent.java index bffa7f8e6..3665058ba 100644 --- a/core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/component/TaskExecutorComponent.java +++ b/core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/component/TaskExecutorComponent.java @@ -47,6 +47,8 @@ public TaskExecutorComponent(TriggerService triggerService, int maxThreads) { @NonNull public List> submit(List trigger) { + if (trigger == null || trigger.isEmpty()) return Collections.emptyList(); + final List> result = new ArrayList<>(trigger.size()); for (TriggerEntity triggerEntity : trigger) { result.add(submit(triggerEntity)); From b6f254fabd044b98c517211453dc013c2ece161a Mon Sep 17 00:00:00 2001 From: Paul Sterl Date: Thu, 2 Jan 2025 17:07:02 +0100 Subject: [PATCH 3/7] added transaction support for tasks --- .../persistent_tasks/api/RetryStrategy.java | 105 +++++++++++++++--- .../persistent_tasks/api/SpringBeanTask.java | 15 +++ .../component/EditTriggerComponent.java | 6 +- .../component/RunTriggerComponent.java | 56 ++++++++-- .../trigger/component/StateSerializer.java | 2 +- ...eteEvent.java => TriggerSuccessEvent.java} | 2 +- .../trigger/model/TriggerEntity.java | 10 ++ .../trigger/TriggerServiceTest.java | 30 +++++ ui/src/server-api.d.ts | 7 ++ 9 files changed, 201 insertions(+), 32 deletions(-) rename core/src/main/java/org/sterl/spring/persistent_tasks/trigger/event/{TriggerCompleteEvent.java => TriggerSuccessEvent.java} (58%) diff --git a/core/src/main/java/org/sterl/spring/persistent_tasks/api/RetryStrategy.java b/core/src/main/java/org/sterl/spring/persistent_tasks/api/RetryStrategy.java index 71f753ff5..24b93680c 100644 --- a/core/src/main/java/org/sterl/spring/persistent_tasks/api/RetryStrategy.java +++ b/core/src/main/java/org/sterl/spring/persistent_tasks/api/RetryStrategy.java @@ -1,45 +1,114 @@ package org.sterl.spring.persistent_tasks.api; import java.time.OffsetDateTime; +import java.time.temporal.ChronoUnit; +import java.time.temporal.TemporalUnit; import org.springframework.lang.Nullable; +import lombok.RequiredArgsConstructor; + @FunctionalInterface public interface RetryStrategy { RetryStrategy NO_RETRY = (c, e) -> false; /** * One initial execution and after that we will try it 3 more times. Overall 4 executions. */ - RetryStrategy THREE_RETRIES = (c, e) -> c < 4; + RetryStrategy THREE_RETRIES = new LinearRetryStrategy(4, ChronoUnit.MINUTES, 1); /** * One initial execution and after that we will try it 3 more times. Overall 4 executions. */ - RetryStrategy THREE_RETRIES_IMMEDIATELY = new RetryStrategy() { + RetryStrategy THREE_RETRIES_IMMEDIATELY = new MultiplicativeRetryStrategy(4, ChronoUnit.MILLIS, 0); + + + /** + * Determines whether a retry should be attempted based on the current + * execution count and the provided exception. + * + * @param executionCount The number of attempts already made. + * @param error The exception that triggered the retry. + * @return {@code true} if the current execution count is less than + * the maximum execution count; {@code false} otherwise. + */ + boolean shouldRetry(int executionCount, @Nullable Exception error); + + /** + * Calculates the time of the next retry attempt based on the current + * execution count and the provided exception. + * + * @param executionCount The number of attempts already made. + * @param exception The exception that triggered the retry. + * @return The {@link OffsetDateTime} representing the time of the next retry attempt. + */ + default OffsetDateTime retryAt(int executionCount, @Nullable Exception exception) { + return OffsetDateTime.now().plusMinutes(executionCount); + } + + + // Default implementations + /** + * A retry strategy that determines the next retry time by adding a fixed + * offset and the execution count to the current time in the specified temporal unit. + * + *

This strategy can be used to create retry intervals that increase linearly + * with the number of attempts, providing a predictable delay pattern.

+ * + *

Example: + * If {@code offset = 5}, {@code unit = ChronoUnit.SECONDS}, and + * {@code executionCount = 3}, the next retry will be scheduled after + * {@code 5 + 3 = 8 seconds} from the current time.

+ * + *

Note: The retry attempts will stop once the maximum execution count + * ({@code maxExecutionCount}) is reached.

+ * + * @author Your Name + */ + @RequiredArgsConstructor + public static class LinearRetryStrategy implements RetryStrategy { + private final int maxExecutionCount; + private final TemporalUnit unit; + private final int offset; + @Override public boolean shouldRetry(int executionCount, Exception error) { - return executionCount < 4; + return maxExecutionCount > executionCount; } @Override public OffsetDateTime retryAt(int executionCount, Exception error) { - return OffsetDateTime.now(); + return OffsetDateTime.now().plus(offset + executionCount, unit); } - }; - + } + /** - * Check if a retry should be done. + * A retry strategy that determines the next retry time by multiplying + * the execution count by a scaling factor and adding the result to the + * current time in the specified temporal unit. + * + *

This strategy can be used to create retry intervals that increase + * multiplicatively with the number of attempts, providing a way to progressively + * delay retries.

* - * @param executionCount 0 based counter how often the execution was tried - * @param error the exception, null on a timeout - */ - boolean shouldRetry(int executionCount, @Nullable Exception error); - - /** - * By default a linear retry strategy, adding one minute for each failed try. + *

Example: + * If {@code scalingFactor = 2}, {@code unit = ChronoUnit.SECONDS}, and + * {@code executionCount = 3}, the next retry will be scheduled after + * {@code 2 * 3 = 6 seconds} from the current time.

* - * @param executionCount 0 based counter how often the execution was tried - * @param error the exception, null on a timeout + *

Note: The retry attempts will stop once the maximum execution count + * ({@code maxExecutionCount}) is reached.

*/ - default OffsetDateTime retryAt(int executionCount, @Nullable Exception error) { - return OffsetDateTime.now().plusMinutes(1 + executionCount); + @RequiredArgsConstructor + public static class MultiplicativeRetryStrategy implements RetryStrategy { + private final int maxExecutionCount; + private final TemporalUnit unit; + private final int scalingFactor; + + @Override + public boolean shouldRetry(int executionCount, Exception error) { + return maxExecutionCount > executionCount; + } + @Override + public OffsetDateTime retryAt(int executionCount, Exception error) { + return OffsetDateTime.now().plus(scalingFactor * executionCount, unit); + } } } diff --git a/core/src/main/java/org/sterl/spring/persistent_tasks/api/SpringBeanTask.java b/core/src/main/java/org/sterl/spring/persistent_tasks/api/SpringBeanTask.java index 295b97157..83f302e4a 100644 --- a/core/src/main/java/org/sterl/spring/persistent_tasks/api/SpringBeanTask.java +++ b/core/src/main/java/org/sterl/spring/persistent_tasks/api/SpringBeanTask.java @@ -11,4 +11,19 @@ public interface SpringBeanTask extends Consumer { default RetryStrategy retryStrategy() { return RetryStrategy.THREE_RETRIES; } + + /** + * Whether the task is transaction or not. If true the execution + * is wrapped into the default transaction template together with the state update + * and the following events: + *
    + *
  1. org.sterl.spring.persistent_tasks.trigger.event.TriggerRunningEvent
  2. + *
  3. org.sterl.spring.persistent_tasks.trigger.event.TriggerSuccessEvent
  4. + *
  5. org.sterl.spring.persistent_tasks.trigger.event.TriggerFailedEvent
  6. + *
+ * @return {@code true} if the task is transactional; {@code false} otherwise. + */ + default boolean isTransactional() { + return false; + } } diff --git a/core/src/main/java/org/sterl/spring/persistent_tasks/trigger/component/EditTriggerComponent.java b/core/src/main/java/org/sterl/spring/persistent_tasks/trigger/component/EditTriggerComponent.java index cf924e359..3c5da4ff7 100644 --- a/core/src/main/java/org/sterl/spring/persistent_tasks/trigger/component/EditTriggerComponent.java +++ b/core/src/main/java/org/sterl/spring/persistent_tasks/trigger/component/EditTriggerComponent.java @@ -19,7 +19,7 @@ import org.sterl.spring.persistent_tasks.shared.model.TriggerStatus; import org.sterl.spring.persistent_tasks.trigger.event.TriggerAddedEvent; import org.sterl.spring.persistent_tasks.trigger.event.TriggerCanceledEvent; -import org.sterl.spring.persistent_tasks.trigger.event.TriggerCompleteEvent; +import org.sterl.spring.persistent_tasks.trigger.event.TriggerSuccessEvent; import org.sterl.spring.persistent_tasks.trigger.event.TriggerFailedEvent; import org.sterl.spring.persistent_tasks.trigger.model.TriggerEntity; import org.sterl.spring.persistent_tasks.trigger.repository.TriggerRepository; @@ -55,8 +55,8 @@ public Optional completeTaskWithStatus(TriggerKey key, Exception result.ifPresent(t -> { t.complete(e); - if (t.getData().getStatus() != TriggerStatus.FAILED) { - publisher.publishEvent(new TriggerCompleteEvent(t)); + if (t.getData().getStatus() == TriggerStatus.SUCCESS) { + publisher.publishEvent(new TriggerSuccessEvent(t)); log.debug("Setting {} to status={} {}", key, t.getData().getStatus(), e == null ? "" : "error=" + e.getClass().getSimpleName()); } else { diff --git a/core/src/main/java/org/sterl/spring/persistent_tasks/trigger/component/RunTriggerComponent.java b/core/src/main/java/org/sterl/spring/persistent_tasks/trigger/component/RunTriggerComponent.java index 6786186ad..de15e5308 100644 --- a/core/src/main/java/org/sterl/spring/persistent_tasks/trigger/component/RunTriggerComponent.java +++ b/core/src/main/java/org/sterl/spring/persistent_tasks/trigger/component/RunTriggerComponent.java @@ -3,10 +3,12 @@ import java.io.Serializable; import java.time.OffsetDateTime; import java.util.Optional; +import java.util.concurrent.Callable; import org.springframework.context.ApplicationEventPublisher; import org.springframework.lang.Nullable; import org.springframework.stereotype.Component; +import org.springframework.transaction.support.TransactionTemplate; import org.sterl.spring.persistent_tasks.api.Task; import org.sterl.spring.persistent_tasks.task.TaskService; import org.sterl.spring.persistent_tasks.trigger.event.TriggerRunningEvent; @@ -23,36 +25,72 @@ public class RunTriggerComponent { private final TaskService taskService; private final EditTriggerComponent editTrigger; private final ApplicationEventPublisher eventPublisher; + private final TransactionTemplate trx; private final StateSerializer serializer = new StateSerializer(); /** * Will execute the given {@link TriggerEntity} and handle any errors etc. */ - public Optional execute(@Nullable TriggerEntity trigger) { + public Optional execute(TriggerEntity trigger) { if (trigger == null) { return Optional.empty(); } - Task task = null; + final var taskAndState = getTastAndState(trigger); + // something went really wrong this trigger is crap + if (taskAndState == null) return Optional.of(trigger); + try { - task = taskService.assertIsKnown(trigger.newTaskId()); + Optional result; + if (taskAndState.isTransactional()) { + result = trx.execute(t -> taskAndState.call()); + } else { + result = taskAndState.call(); + } + return result; + } catch (Exception e) { + return handleTaskException(taskAndState, e); + } + } + @Nullable + private TaskAndState getTastAndState(TriggerEntity trigger) { + try { + var task = taskService.assertIsKnown(trigger.newTaskId()); + var state = serializer.deserialize(trigger.getData().getState()); + return new TaskAndState(task, state, trigger); + } catch (Exception e) { + // this trigger is somehow crap, no retry and done. + handleTaskException(new TaskAndState(null, null, trigger), e); + return null; + } + } + @RequiredArgsConstructor + private class TaskAndState implements Callable> { + final Task task; + final Serializable state; + final TriggerEntity trigger; + + boolean isTransactional() { + return task.isTransactional(); + } + public Optional call() { eventPublisher.publishEvent(new TriggerRunningEvent(trigger)); - task.accept(serializer.deserialize(trigger.getData().getState())); + task.accept(state); var result = editTrigger.completeTaskWithSuccess(trigger.getKey()); editTrigger.deleteTrigger(trigger); return result; - } catch (Exception e) { - return handleTaskException(trigger, task, e); + } } - private Optional handleTaskException(TriggerEntity trigger, - @Nullable Task task, + private Optional handleTaskException(TaskAndState taskAndState, @Nullable Exception e) { + var trigger = taskAndState.trigger; + var task = taskAndState.task; var result = editTrigger.completeTaskWithStatus(trigger.getKey(), e); if (task != null && @@ -73,7 +111,7 @@ private Optional handleTaskException(TriggerEntity trigger, } } else { log.error("{} failed, no more retries! {}", trigger.getKey(), - e == null ? "No exception given." : e.getMessage()); + e == null ? "No exception given." : e.getMessage(), e); editTrigger.deleteTrigger(trigger); } diff --git a/core/src/main/java/org/sterl/spring/persistent_tasks/trigger/component/StateSerializer.java b/core/src/main/java/org/sterl/spring/persistent_tasks/trigger/component/StateSerializer.java index 995e5c81d..5fd1ba668 100644 --- a/core/src/main/java/org/sterl/spring/persistent_tasks/trigger/component/StateSerializer.java +++ b/core/src/main/java/org/sterl/spring/persistent_tasks/trigger/component/StateSerializer.java @@ -33,7 +33,7 @@ public Serializable deserialize(byte[] bytes) { try (ObjectInput in = new ObjectInputStream(bis)) { return (Serializable)in.readObject(); } catch (Exception ex) { - throw new RuntimeException(ex); + throw new RuntimeException("Failed to deserialize state of length " + bytes.length, ex); } } } diff --git a/core/src/main/java/org/sterl/spring/persistent_tasks/trigger/event/TriggerCompleteEvent.java b/core/src/main/java/org/sterl/spring/persistent_tasks/trigger/event/TriggerSuccessEvent.java similarity index 58% rename from core/src/main/java/org/sterl/spring/persistent_tasks/trigger/event/TriggerCompleteEvent.java rename to core/src/main/java/org/sterl/spring/persistent_tasks/trigger/event/TriggerSuccessEvent.java index a687c8237..3fdb52fa5 100644 --- a/core/src/main/java/org/sterl/spring/persistent_tasks/trigger/event/TriggerCompleteEvent.java +++ b/core/src/main/java/org/sterl/spring/persistent_tasks/trigger/event/TriggerSuccessEvent.java @@ -2,6 +2,6 @@ import org.sterl.spring.persistent_tasks.trigger.model.TriggerEntity; -public record TriggerCompleteEvent(TriggerEntity trigger) implements TriggerLifeCycleEvent { +public record TriggerSuccessEvent(TriggerEntity trigger) implements TriggerLifeCycleEvent { } diff --git a/core/src/main/java/org/sterl/spring/persistent_tasks/trigger/model/TriggerEntity.java b/core/src/main/java/org/sterl/spring/persistent_tasks/trigger/model/TriggerEntity.java index 87899e0b9..a9879cce6 100644 --- a/core/src/main/java/org/sterl/spring/persistent_tasks/trigger/model/TriggerEntity.java +++ b/core/src/main/java/org/sterl/spring/persistent_tasks/trigger/model/TriggerEntity.java @@ -85,10 +85,15 @@ public TriggerEntity runOn(String runningOn) { return this; } + /** + * @param e Sets either {@link TriggerStatus#SUCCESS} or {@link TriggerStatus#FAILED} + * based if the {@link Exception} is null or not. + */ public TriggerEntity complete(Exception e) { data.setStatus(TriggerStatus.SUCCESS); data.setEnd(OffsetDateTime.now()); data.updateRunningDuration(); + if (e != null) { data.setStatus(TriggerStatus.FAILED); data.setExceptionName(e.getClass().getName()); @@ -103,4 +108,9 @@ public TriggerEntity runAt(OffsetDateTime runAt) { data.setRunAt(runAt); return this; } + + public TriggerEntity withState(byte[] state) { + this.data.setState(state); + return this; + } } diff --git a/core/src/test/java/org/sterl/spring/persistent_tasks/trigger/TriggerServiceTest.java b/core/src/test/java/org/sterl/spring/persistent_tasks/trigger/TriggerServiceTest.java index 8bca11900..b8f756996 100644 --- a/core/src/test/java/org/sterl/spring/persistent_tasks/trigger/TriggerServiceTest.java +++ b/core/src/test/java/org/sterl/spring/persistent_tasks/trigger/TriggerServiceTest.java @@ -353,4 +353,34 @@ void testRescheduleAbandonedTasks() { assertThat(rescheduledTasks).hasSize(1); assertThat(rescheduledTasks.get(0).getKey()).isEqualTo(t1.getKey()); } + + @Test + void testUnknownTriggersNoRetry() { + // GIVEN + var t = triggerRepository.save(new TriggerEntity(new TriggerKey("fooTask-unknown"))); + + // WHEN + runNextTrigger(); + + // WHEN + var triggerData = persistentTaskService.getLastTriggerData(t.getKey()).get(); + assertThat(triggerData.getStatus()).isEqualTo(TriggerStatus.FAILED); + assertThat(triggerData.getExceptionName()).isEqualTo(IllegalStateException.class.getName()); + } + + @Test + void testBadStateNoRetry() { + var t = triggerRepository.save(new TriggerEntity( + new TriggerKey("slowTask") + ).withState(new byte[] {12, 54}) + ); + + // WHEN + runNextTrigger(); + + // WHEN + var triggerData = persistentTaskService.getLastTriggerData(t.getKey()).get(); + assertThat(triggerData.getStatus()).isEqualTo(TriggerStatus.FAILED); + assertThat(triggerData.getExceptionName()).isEqualTo(RuntimeException.class.getName()); + } } diff --git a/ui/src/server-api.d.ts b/ui/src/server-api.d.ts index 0122ba761..499c781e1 100644 --- a/ui/src/server-api.d.ts +++ b/ui/src/server-api.d.ts @@ -37,7 +37,14 @@ export interface HistoryOverview { export interface RetryStrategy { } +export interface LinearRetryStrategy extends RetryStrategy { +} + +export interface MultiplicativeRetryStrategy extends RetryStrategy { +} + export interface SpringBeanTask extends Consumer { + transactional: boolean; } export interface Task extends SpringBeanTask { From 7ac1eb7cbf816b6412a682e344b01f42e0fb00ed Mon Sep 17 00:00:00 2001 From: Paul Sterl Date: Thu, 2 Jan 2025 17:09:50 +0100 Subject: [PATCH 4/7] fixed PMD issue --- .../sterl/spring/persistent_tasks/api/RetryStrategy.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/sterl/spring/persistent_tasks/api/RetryStrategy.java b/core/src/main/java/org/sterl/spring/persistent_tasks/api/RetryStrategy.java index 24b93680c..b052d0c37 100644 --- a/core/src/main/java/org/sterl/spring/persistent_tasks/api/RetryStrategy.java +++ b/core/src/main/java/org/sterl/spring/persistent_tasks/api/RetryStrategy.java @@ -64,7 +64,7 @@ default OffsetDateTime retryAt(int executionCount, @Nullable Exception exception * @author Your Name */ @RequiredArgsConstructor - public static class LinearRetryStrategy implements RetryStrategy { + public class LinearRetryStrategy implements RetryStrategy { private final int maxExecutionCount; private final TemporalUnit unit; private final int offset; @@ -78,7 +78,7 @@ public OffsetDateTime retryAt(int executionCount, Exception error) { return OffsetDateTime.now().plus(offset + executionCount, unit); } } - + /** * A retry strategy that determines the next retry time by multiplying * the execution count by a scaling factor and adding the result to the @@ -97,7 +97,7 @@ public OffsetDateTime retryAt(int executionCount, Exception error) { * ({@code maxExecutionCount}) is reached.

*/ @RequiredArgsConstructor - public static class MultiplicativeRetryStrategy implements RetryStrategy { + public class MultiplicativeRetryStrategy implements RetryStrategy { private final int maxExecutionCount; private final TemporalUnit unit; private final int scalingFactor; From abb46d995c02f6dae0f2fdde7f5cee6e95a655e6 Mon Sep 17 00:00:00 2001 From: Paul Sterl Date: Thu, 2 Jan 2025 17:36:17 +0100 Subject: [PATCH 5/7] added test to count the transactions --- .../task/model/RegisteredTask.java | 4 ++ .../persistent_tasks/AbstractSpringTest.java | 10 +++- .../persistent_tasks/HibernateAsserts.java | 48 +++++++++++++++++ .../SchedulerServiceTransactionTest.java | 53 ++++++++++++++++--- .../EditSchedulerStatusComponentTest.java | 2 +- 5 files changed, 108 insertions(+), 9 deletions(-) create mode 100644 core/src/test/java/org/sterl/spring/persistent_tasks/HibernateAsserts.java rename core/src/test/java/org/sterl/spring/{sample_app => persistent_tasks/scheduler}/component/EditSchedulerStatusComponentTest.java (92%) diff --git a/core/src/main/java/org/sterl/spring/persistent_tasks/task/model/RegisteredTask.java b/core/src/main/java/org/sterl/spring/persistent_tasks/task/model/RegisteredTask.java index bd689a8b3..e7245a629 100644 --- a/core/src/main/java/org/sterl/spring/persistent_tasks/task/model/RegisteredTask.java +++ b/core/src/main/java/org/sterl/spring/persistent_tasks/task/model/RegisteredTask.java @@ -38,4 +38,8 @@ public void accept(T state) { public RetryStrategy retryStrategy() { return this.fun.retryStrategy(); } + @Override + public boolean isTransactional() { + return this.fun.isTransactional(); + } } diff --git a/core/src/test/java/org/sterl/spring/persistent_tasks/AbstractSpringTest.java b/core/src/test/java/org/sterl/spring/persistent_tasks/AbstractSpringTest.java index db53e9dba..be4943d0b 100644 --- a/core/src/test/java/org/sterl/spring/persistent_tasks/AbstractSpringTest.java +++ b/core/src/test/java/org/sterl/spring/persistent_tasks/AbstractSpringTest.java @@ -15,7 +15,6 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.stereotype.Component; -import org.springframework.test.context.ActiveProfiles; import org.springframework.transaction.support.TransactionTemplate; import org.sterl.spring.persistent_tasks.api.SpringBeanTask; import org.sterl.spring.persistent_tasks.api.TaskId; @@ -30,6 +29,7 @@ import org.sterl.spring.sample_app.SampleApp; import org.sterl.test.AsyncAsserts; +import jakarta.persistence.EntityManager; import lombok.RequiredArgsConstructor; import uk.co.jemos.podam.api.PodamFactory; import uk.co.jemos.podam.api.PodamFactoryImpl; @@ -63,6 +63,8 @@ public class AbstractSpringTest { protected TransactionTemplate trx; @Autowired protected AsyncAsserts asserts; + @Autowired + protected HibernateAsserts hibernateAsserts; protected final PodamFactory pm = new PodamFactoryImpl(); @@ -72,6 +74,11 @@ public static class TaskConfig { AsyncAsserts asserts() { return new AsyncAsserts(); } + + @Bean + HibernateAsserts hibernateAsserts(EntityManager entityManager) { + return new HibernateAsserts(entityManager); + } @Primary @Bean("schedulerA") @@ -147,6 +154,7 @@ protected Optional runNextTrigger() { @BeforeEach public void beforeEach() throws Exception { + hibernateAsserts.reset(); triggerService.deleteAll(); historyService.deleteAll(); asserts.clear(); diff --git a/core/src/test/java/org/sterl/spring/persistent_tasks/HibernateAsserts.java b/core/src/test/java/org/sterl/spring/persistent_tasks/HibernateAsserts.java new file mode 100644 index 000000000..78696eec1 --- /dev/null +++ b/core/src/test/java/org/sterl/spring/persistent_tasks/HibernateAsserts.java @@ -0,0 +1,48 @@ +package org.sterl.spring.persistent_tasks; + +import static org.junit.jupiter.api.Assertions.fail; + +import org.hibernate.Session; +import org.hibernate.SessionFactory; +import org.hibernate.stat.Statistics; + +import jakarta.persistence.EntityManager; + +public class HibernateAsserts { + private final Statistics statistics; + + public HibernateAsserts(EntityManager entityManager) { + try (Session session = entityManager.unwrap(org.hibernate.Session.class)) { + @SuppressWarnings("resource") + SessionFactory factory = session.getSessionFactory(); + factory.getStatistics().setStatisticsEnabled(true); + statistics = factory.getStatistics(); + } + } + + public HibernateAsserts assertTrxCount(int expected) { + long value = statistics.getTransactionCount(); + if (value != expected) { + logSummary(); + fail("Expected " + expected + " TransactionCount, but found " + value); + } + return this; + } + + public HibernateAsserts assertInsertCount(int expected) { + long value = statistics.getEntityInsertCount(); + if (value != expected) { + logSummary(); + fail("Expected " + expected + " EntityInsertCount, but found " + value); + } + return this; + } + + public void reset() { + statistics.clear(); + } + + public void logSummary() { + statistics.logSummary(); + } +} diff --git a/core/src/test/java/org/sterl/spring/persistent_tasks/scheduler/SchedulerServiceTransactionTest.java b/core/src/test/java/org/sterl/spring/persistent_tasks/scheduler/SchedulerServiceTransactionTest.java index 530ba93a9..1aa74d67f 100644 --- a/core/src/test/java/org/sterl/spring/persistent_tasks/scheduler/SchedulerServiceTransactionTest.java +++ b/core/src/test/java/org/sterl/spring/persistent_tasks/scheduler/SchedulerServiceTransactionTest.java @@ -23,17 +23,14 @@ class SchedulerServiceTransactionTest extends AbstractSpringTest { private SchedulerService subject; - @Autowired private AtomicBoolean sendError; + private static AtomicBoolean sendError = new AtomicBoolean(false); + private static AtomicBoolean inTrx = new AtomicBoolean(false); @Autowired private PersonRepository personRepository; @Configuration static class Config { @Bean - AtomicBoolean sendError() { - return new AtomicBoolean(false); - } - @Bean - SpringBeanTask savePerson(PersonRepository personRepository, AtomicBoolean sendError) { + SpringBeanTask savePerson(PersonRepository personRepository) { return new SpringBeanTask<>() { @Transactional @Override @@ -46,6 +43,10 @@ public void accept(String name) { public RetryStrategy retryStrategy() { return RetryStrategy.THREE_RETRIES_IMMEDIATELY; } + @Override + public boolean isTransactional() { + return inTrx.get(); + } }; } } @@ -56,17 +57,55 @@ public void beforeEach() throws Exception { subject = schedulerService; personRepository.deleteAllInBatch(); sendError.set(false); + inTrx.set(false); } - + @Test void testSaveEntity() throws Exception { // GIVEN final var trigger = TaskTriggerBuilder.newTrigger("savePerson").state("Paul").build(); // WHEN + hibernateAsserts.reset(); subject.runOrQueue(trigger).get(); // THEN + // AND one the service, one the event and one more status update, + // one more to save the trigger + hibernateAsserts.assertTrxCount(4); + assertThat(personRepository.count()).isOne(); + } + + @Test + void testSaveTransactions() throws Exception { + // GIVEN + final var request = TaskTriggerBuilder.newTrigger("savePerson").state("Paul").build(); + var trigger = triggerService.queue(request); + + // WHEN + hibernateAsserts.reset(); + triggerService.run(trigger); + + // THEN + // AND one the service, one the event and one more status update + hibernateAsserts.assertTrxCount(3); + assertThat(personRepository.count()).isOne(); + } + + + @Test + void testTrxCountTriggerService() throws Exception { + // GIVEN + final var request = TaskTriggerBuilder.newTrigger("savePerson").state("Paul").build(); + var trigger = triggerService.queue(request); + inTrx.set(true); + + // WHEN + hibernateAsserts.reset(); + triggerService.run(trigger); + + // THEN + hibernateAsserts.assertTrxCount(1); assertThat(personRepository.count()).isOne(); } diff --git a/core/src/test/java/org/sterl/spring/sample_app/component/EditSchedulerStatusComponentTest.java b/core/src/test/java/org/sterl/spring/persistent_tasks/scheduler/component/EditSchedulerStatusComponentTest.java similarity index 92% rename from core/src/test/java/org/sterl/spring/sample_app/component/EditSchedulerStatusComponentTest.java rename to core/src/test/java/org/sterl/spring/persistent_tasks/scheduler/component/EditSchedulerStatusComponentTest.java index 1bf694881..0179f5719 100644 --- a/core/src/test/java/org/sterl/spring/sample_app/component/EditSchedulerStatusComponentTest.java +++ b/core/src/test/java/org/sterl/spring/persistent_tasks/scheduler/component/EditSchedulerStatusComponentTest.java @@ -1,4 +1,4 @@ -package org.sterl.spring.sample_app.component; +package org.sterl.spring.persistent_tasks.scheduler.component; import static org.assertj.core.api.Assertions.assertThat; From 75f49ea94c7e8171f98cb6ae55f517c752901bca Mon Sep 17 00:00:00 2001 From: Paul Sterl Date: Thu, 2 Jan 2025 17:43:16 +0100 Subject: [PATCH 6/7] extended javadoc --- README.md | 14 +++++++++++++- .../scheduler/SchedulerServiceTransactionTest.java | 2 ++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 02513840f..eda1a7592 100644 --- a/README.md +++ b/README.md @@ -66,13 +66,25 @@ public class BuildVehicleTask implements SpringBeanTask { private final VehicleRepository vehicleRepository; - @Transactional(timeout = 5) @Override public void accept(Vehicle vehicle) { // do stuff // save vehicleRepository.save(vehicle); } + // OPTIONAL + @Override + public RetryStrategy retryStrategy() { + // run 5 times, multiply the execution count with 4, add the result in HOURS to now. + return new MultiplicativeRetryStrategy(5, ChronoUnit.HOURS, 4) + } + // OPTIONAL + // if the task in accept requires a DB transaction, join them together with the framework + // if true the TransactionTemplate is used. Set here any timeouts. + @Override + public boolean isTransactional() { + return true; + } } ``` diff --git a/core/src/test/java/org/sterl/spring/persistent_tasks/scheduler/SchedulerServiceTransactionTest.java b/core/src/test/java/org/sterl/spring/persistent_tasks/scheduler/SchedulerServiceTransactionTest.java index 1aa74d67f..fcf90639e 100644 --- a/core/src/test/java/org/sterl/spring/persistent_tasks/scheduler/SchedulerServiceTransactionTest.java +++ b/core/src/test/java/org/sterl/spring/persistent_tasks/scheduler/SchedulerServiceTransactionTest.java @@ -2,6 +2,7 @@ import static org.assertj.core.api.Assertions.assertThat; +import java.time.temporal.ChronoUnit; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; @@ -13,6 +14,7 @@ import org.springframework.transaction.annotation.Transactional; import org.sterl.spring.persistent_tasks.AbstractSpringTest; import org.sterl.spring.persistent_tasks.api.RetryStrategy; +import org.sterl.spring.persistent_tasks.api.RetryStrategy.MultiplicativeRetryStrategy; import org.sterl.spring.persistent_tasks.api.SpringBeanTask; import org.sterl.spring.persistent_tasks.api.TaskId.TaskTriggerBuilder; import org.sterl.spring.persistent_tasks.api.TriggerKey; From 07165b1dd90c74f499331826ba09938d04c48f6a Mon Sep 17 00:00:00 2001 From: Paul Sterl Date: Thu, 2 Jan 2025 17:53:24 +0100 Subject: [PATCH 7/7] extended transaction management doc --- README.md | 21 ++++++++++++++++--- .../persistent_tasks/api/RetryStrategy.java | 4 ++-- .../persistent_tasks/AbstractSpringTest.java | 9 ++++++++ .../HibernateAsserts.java | 2 +- 4 files changed, 30 insertions(+), 6 deletions(-) rename core/src/test/java/org/sterl/{spring/persistent_tasks => test}/HibernateAsserts.java (96%) diff --git a/README.md b/README.md index eda1a7592..61fd3012b 100644 --- a/README.md +++ b/README.md @@ -88,14 +88,29 @@ public class BuildVehicleTask implements SpringBeanTask { } ``` +Consider setting a timeout to the `TransactionTemplate`: + +```java +@Bean +TransactionTemplate transactionTemplate(PlatformTransactionManager transactionManager) { + TransactionTemplate template = new TransactionTemplate(transactionManager); + template.setTimeout(10); + return template; +} +``` + ### As a closure -Note: this example has no aspects as above the spring _@Transactional_ +Simple task will use defaults: + +- Not a transactional task, e.g. HTTP calls +- 4 executions, one regular and 3 retries, linear +- using minutes with an offset of 1 which is added to now ```java @Bean -SpringBeanTask task1(VehicleRepository vehicleRepository) { - return v -> vehicleRepository.save(v); +SpringBeanTask task1(VehicleHttpConnector vehicleHttpConnector) { + return v -> vehicleHttpConnector.send(v); } ``` diff --git a/core/src/main/java/org/sterl/spring/persistent_tasks/api/RetryStrategy.java b/core/src/main/java/org/sterl/spring/persistent_tasks/api/RetryStrategy.java index b052d0c37..e0bdd3fb3 100644 --- a/core/src/main/java/org/sterl/spring/persistent_tasks/api/RetryStrategy.java +++ b/core/src/main/java/org/sterl/spring/persistent_tasks/api/RetryStrategy.java @@ -64,7 +64,7 @@ default OffsetDateTime retryAt(int executionCount, @Nullable Exception exception * @author Your Name */ @RequiredArgsConstructor - public class LinearRetryStrategy implements RetryStrategy { + class LinearRetryStrategy implements RetryStrategy { private final int maxExecutionCount; private final TemporalUnit unit; private final int offset; @@ -97,7 +97,7 @@ public OffsetDateTime retryAt(int executionCount, Exception error) { * ({@code maxExecutionCount}) is reached.

*/ @RequiredArgsConstructor - public class MultiplicativeRetryStrategy implements RetryStrategy { + class MultiplicativeRetryStrategy implements RetryStrategy { private final int maxExecutionCount; private final TemporalUnit unit; private final int scalingFactor; diff --git a/core/src/test/java/org/sterl/spring/persistent_tasks/AbstractSpringTest.java b/core/src/test/java/org/sterl/spring/persistent_tasks/AbstractSpringTest.java index be4943d0b..1606ed4ea 100644 --- a/core/src/test/java/org/sterl/spring/persistent_tasks/AbstractSpringTest.java +++ b/core/src/test/java/org/sterl/spring/persistent_tasks/AbstractSpringTest.java @@ -15,6 +15,7 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.stereotype.Component; +import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.support.TransactionTemplate; import org.sterl.spring.persistent_tasks.api.SpringBeanTask; import org.sterl.spring.persistent_tasks.api.TaskId; @@ -28,6 +29,7 @@ import org.sterl.spring.persistent_tasks.trigger.model.TriggerEntity; import org.sterl.spring.sample_app.SampleApp; import org.sterl.test.AsyncAsserts; +import org.sterl.test.HibernateAsserts; import jakarta.persistence.EntityManager; import lombok.RequiredArgsConstructor; @@ -75,6 +77,13 @@ AsyncAsserts asserts() { return new AsyncAsserts(); } + @Bean + TransactionTemplate transactionTemplate(PlatformTransactionManager transactionManager) { + TransactionTemplate template = new TransactionTemplate(transactionManager); + template.setTimeout(10); + return template; + } + @Bean HibernateAsserts hibernateAsserts(EntityManager entityManager) { return new HibernateAsserts(entityManager); diff --git a/core/src/test/java/org/sterl/spring/persistent_tasks/HibernateAsserts.java b/core/src/test/java/org/sterl/test/HibernateAsserts.java similarity index 96% rename from core/src/test/java/org/sterl/spring/persistent_tasks/HibernateAsserts.java rename to core/src/test/java/org/sterl/test/HibernateAsserts.java index 78696eec1..593757ee6 100644 --- a/core/src/test/java/org/sterl/spring/persistent_tasks/HibernateAsserts.java +++ b/core/src/test/java/org/sterl/test/HibernateAsserts.java @@ -1,4 +1,4 @@ -package org.sterl.spring.persistent_tasks; +package org.sterl.test; import static org.junit.jupiter.api.Assertions.fail;