From 79dcd0227d932e9393150ae8af966a8cdac2e3b2 Mon Sep 17 00:00:00 2001 From: fjtirado Date: Thu, 16 Oct 2025 21:25:09 +0200 Subject: [PATCH 1/2] [Fix #727] Implementing retries Signed-off-by: fjtirado --- .../serverlessworkflow/impl/TaskContext.java | 20 +++- .../impl/WorkflowUtils.java | 36 ++++++ .../impl/executors/AbstractTaskExecutor.java | 13 ++- .../impl/executors/TryExecutor.java | 107 +++++++++++++++--- .../retry/AbstractRetryIntervalFunction.java | 69 +++++++++++ .../retry/ConstantRetryIntervalFunction.java | 34 ++++++ .../executors/retry/DefaultRetryExecutor.java | 62 ++++++++++ .../ExponentialRetryIntervalFunction.java | 34 ++++++ .../retry/LinearRetryIntervalFunction.java | 34 ++++++ .../impl/executors/retry/RetryExecutor.java | 26 +++++ .../retry/RetryIntervalFunction.java | 29 +++++ .../ce/AbstractLifeCyclePublisher.java | 21 ++++ .../impl/lifecycle/ce/TaskRetriedCEData.java | 21 ++++ .../impl/test/RetryTest.java | 85 ++++++++++++++ .../impl/test/TraceExecutionListener.java | 26 ++++- .../try-catch-retry-inline.yaml | 26 +++++ .../try-catch-retry-reusable.yaml | 30 +++++ 17 files changed, 649 insertions(+), 24 deletions(-) create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/AbstractRetryIntervalFunction.java create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/ConstantRetryIntervalFunction.java create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/DefaultRetryExecutor.java create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/ExponentialRetryIntervalFunction.java create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/LinearRetryIntervalFunction.java create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/RetryExecutor.java create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/RetryIntervalFunction.java create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskRetriedCEData.java create mode 100644 impl/test/src/test/java/io/serverlessworkflow/impl/test/RetryTest.java create mode 100644 impl/test/src/test/resources/workflows-samples/try-catch-retry-inline.yaml create mode 100644 impl/test/src/test/resources/workflows-samples/try-catch-retry-reusable.yaml diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java b/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java index 9634c5f7..8ef68513 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java @@ -37,7 +37,7 @@ public class TaskContext implements TaskContextData { private WorkflowModel rawOutput; private Instant completedAt; private TransitionInfo transition; - private boolean completed; + private short retryAttempt; public TaskContext( WorkflowModel input, @@ -67,6 +67,7 @@ private TaskContext( this.input = input; this.output = output; this.rawOutput = rawOutput; + this.retryAttempt = parentContext.map(TaskContext::retryAttempt).orElse((short) 0); this.contextVariables = parentContext.map(p -> new HashMap<>(p.contextVariables)).orElseGet(HashMap::new); } @@ -110,7 +111,6 @@ public WorkflowModel rawOutput() { public TaskContext output(WorkflowModel output) { this.output = output; - this.completed = true; return this; } @@ -162,7 +162,19 @@ public TaskContext transition(TransitionInfo transition) { } public boolean isCompleted() { - return completed; + return completedAt != null; + } + + public short retryAttempt() { + return retryAttempt; + } + + public void retryAttempt(short retryAttempt) { + this.retryAttempt = retryAttempt; + } + + public boolean isRetrying() { + return retryAttempt > 0; } @Override @@ -175,6 +187,8 @@ public String toString() { + taskName + ", completedAt=" + completedAt + + ", retryAttempt=" + + retryAttempt + "]"; } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java index 8cf35e31..45d25ede 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java @@ -15,10 +15,12 @@ */ package io.serverlessworkflow.impl; +import io.serverlessworkflow.api.types.DurationInline; import io.serverlessworkflow.api.types.ExportAs; import io.serverlessworkflow.api.types.InputFrom; import io.serverlessworkflow.api.types.OutputAs; import io.serverlessworkflow.api.types.SchemaUnion; +import io.serverlessworkflow.api.types.TimeoutAfter; import io.serverlessworkflow.api.types.UriTemplate; import io.serverlessworkflow.impl.expressions.ExpressionDescriptor; import io.serverlessworkflow.impl.expressions.ExpressionUtils; @@ -26,6 +28,7 @@ import io.serverlessworkflow.impl.schema.SchemaValidator; import io.serverlessworkflow.impl.schema.SchemaValidatorFactory; import java.net.URI; +import java.time.Duration; import java.util.Map; import java.util.Optional; import org.slf4j.Logger; @@ -153,4 +156,37 @@ public static void safeClose(AutoCloseable closeable) { } } } + + public static boolean whenExceptTest( + Optional whenFilter, + Optional exceptFilter, + WorkflowContext workflow, + TaskContext taskContext, + WorkflowModel model) { + return whenFilter.map(w -> w.test(workflow, taskContext, model)).orElse(true) + && exceptFilter.map(w -> !w.test(workflow, taskContext, model)).orElse(true); + } + + public static WorkflowValueResolver fromTimeoutAfter( + WorkflowApplication application, TimeoutAfter timeout) { + if (timeout.getDurationExpression() != null) { + return (w, f, t) -> + Duration.parse( + application + .expressionFactory() + .resolveString(ExpressionDescriptor.from(timeout.getDurationExpression())) + .apply(w, f, t)); + } else if (timeout.getDurationInline() != null) { + DurationInline inlineDuration = timeout.getDurationInline(); + return (w, t, f) -> + Duration.ofDays(inlineDuration.getDays()) + .plus( + Duration.ofHours(inlineDuration.getHours()) + .plus(Duration.ofMinutes(inlineDuration.getMinutes())) + .plus(Duration.ofSeconds(inlineDuration.getSeconds())) + .plus(Duration.ofMillis(inlineDuration.getMilliseconds()))); + } else { + return (w, t, f) -> Duration.ZERO; + } + } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java index 5c80adc8..30956970 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java @@ -38,6 +38,7 @@ import io.serverlessworkflow.impl.lifecycle.TaskCancelledEvent; import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent; import io.serverlessworkflow.impl.lifecycle.TaskFailedEvent; +import io.serverlessworkflow.impl.lifecycle.TaskRetriedEvent; import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent; import io.serverlessworkflow.impl.resources.ResourceLoader; import io.serverlessworkflow.impl.schema.SchemaValidator; @@ -204,9 +205,15 @@ public CompletableFuture apply( .thenCompose(workflowContext.instance()::suspendedCheck) .thenApply( t -> { - publishEvent( - workflowContext, - l -> l.onTaskStarted(new TaskStartedEvent(workflowContext, taskContext))); + if (t.isRetrying()) { + publishEvent( + workflowContext, + l -> l.onTaskRetried(new TaskRetriedEvent(workflowContext, taskContext))); + } else { + publishEvent( + workflowContext, + l -> l.onTaskStarted(new TaskStartedEvent(workflowContext, taskContext))); + } inputSchemaValidator.ifPresent(s -> s.validate(t.rawInput())); inputProcessor.ifPresent( p -> taskContext.input(p.apply(workflowContext, t, t.rawInput()))); diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TryExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TryExecutor.java index d7d7cf0b..a071d996 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TryExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TryExecutor.java @@ -17,6 +17,9 @@ import io.serverlessworkflow.api.types.CatchErrors; import io.serverlessworkflow.api.types.ErrorFilter; +import io.serverlessworkflow.api.types.Retry; +import io.serverlessworkflow.api.types.RetryBackoff; +import io.serverlessworkflow.api.types.RetryPolicy; import io.serverlessworkflow.api.types.TaskItem; import io.serverlessworkflow.api.types.TryTask; import io.serverlessworkflow.api.types.TryTaskCatch; @@ -29,6 +32,12 @@ import io.serverlessworkflow.impl.WorkflowMutablePosition; import io.serverlessworkflow.impl.WorkflowPredicate; import io.serverlessworkflow.impl.WorkflowUtils; +import io.serverlessworkflow.impl.executors.retry.ConstantRetryIntervalFunction; +import io.serverlessworkflow.impl.executors.retry.DefaultRetryExecutor; +import io.serverlessworkflow.impl.executors.retry.ExponentialRetryIntervalFunction; +import io.serverlessworkflow.impl.executors.retry.LinearRetryIntervalFunction; +import io.serverlessworkflow.impl.executors.retry.RetryExecutor; +import io.serverlessworkflow.impl.executors.retry.RetryIntervalFunction; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -42,6 +51,7 @@ public class TryExecutor extends RegularTaskExecutor { private final Optional> errorFilter; private final TaskExecutor taskExecutor; private final Optional> catchTaskExecutor; + private final Optional retryIntervalExecutor; public static class TryExecutorBuilder extends RegularTaskExecutorBuilder { @@ -50,6 +60,7 @@ public static class TryExecutorBuilder extends RegularTaskExecutorBuilder> errorFilter; private final TaskExecutor taskExecutor; private final Optional> catchTaskExecutor; + private final Optional retryIntervalExecutor; protected TryExecutorBuilder( WorkflowMutablePosition position, TryTask task, WorkflowDefinition definition) { @@ -60,13 +71,63 @@ protected TryExecutorBuilder( this.exceptFilter = WorkflowUtils.optionalPredicate(application, catchInfo.getExceptWhen()); this.taskExecutor = TaskExecutorHelper.createExecutorList(position, task.getTry(), definition); - List catchTask = task.getCatch().getDo(); - this.catchTaskExecutor = - catchTask != null && !catchTask.isEmpty() - ? Optional.of( - TaskExecutorHelper.createExecutorList( - position, task.getCatch().getDo(), definition)) - : Optional.empty(); + TryTaskCatch catchTask = task.getCatch(); + if (catchTask != null) { + List catchTaskDo = catchTask.getDo(); + + this.catchTaskExecutor = + catchTaskDo != null && !catchTaskDo.isEmpty() + ? Optional.of( + TaskExecutorHelper.createExecutorList(position, catchTaskDo, definition)) + : Optional.empty(); + + Retry retry = catchTask.getRetry(); + this.retryIntervalExecutor = retry != null ? buildRetryInterval(retry) : Optional.empty(); + } else { + this.catchTaskExecutor = Optional.empty(); + this.retryIntervalExecutor = Optional.empty(); + } + } + + private Optional buildRetryInterval(Retry retry) { + RetryPolicy retryPolicy = null; + if (retry.getRetryPolicyDefinition() != null) { + retryPolicy = retry.getRetryPolicyDefinition(); + } else if (retry.getRetryPolicyReference() != null) { + retryPolicy = + workflow + .getUse() + .getRetries() + .getAdditionalProperties() + .get(retry.getRetryPolicyReference()); + if (retryPolicy == null) { + throw new IllegalStateException("Retry policy " + retryPolicy + " was not found"); + } + } + return retryPolicy != null ? Optional.of(buildRetryExecutor(retryPolicy)) : Optional.empty(); + } + + protected RetryExecutor buildRetryExecutor(RetryPolicy retryPolicy) { + return new DefaultRetryExecutor( + retryPolicy.getLimit().getAttempt().getCount(), + buildIntervalFunction(retryPolicy), + WorkflowUtils.optionalPredicate(application, retryPolicy.getWhen()), + WorkflowUtils.optionalPredicate(application, retryPolicy.getExceptWhen())); + } + + private RetryIntervalFunction buildIntervalFunction(RetryPolicy retryPolicy) { + RetryBackoff backoff = retryPolicy.getBackoff(); + if (backoff.getConstantBackoff() != null) { + return new ConstantRetryIntervalFunction( + application, retryPolicy.getDelay(), retryPolicy.getJitter()); + } else if (backoff.getLinearBackoff() != null) { + return new LinearRetryIntervalFunction( + application, retryPolicy.getDelay(), retryPolicy.getJitter()); + } else if (backoff.getExponentialBackOff() != null) { + return new ExponentialRetryIntervalFunction( + application, retryPolicy.getDelay(), retryPolicy.getJitter()); + } + throw new IllegalStateException("A backoff strategy should be set"); } @Override @@ -82,13 +143,19 @@ protected TryExecutor(TryExecutorBuilder builder) { this.exceptFilter = builder.exceptFilter; this.taskExecutor = builder.taskExecutor; this.catchTaskExecutor = builder.catchTaskExecutor; + this.retryIntervalExecutor = builder.retryIntervalExecutor; } @Override protected CompletableFuture internalExecute( WorkflowContext workflow, TaskContext taskContext) { + return doIt(workflow, taskContext, taskContext.input()); + } + + private CompletableFuture doIt( + WorkflowContext workflow, TaskContext taskContext, WorkflowModel model) { return TaskExecutorHelper.processTaskList( - taskExecutor, workflow, Optional.of(taskContext), taskContext.input()) + taskExecutor, workflow, Optional.of(taskContext), model) .exceptionallyCompose(e -> handleException(e, workflow, taskContext)); } @@ -99,17 +166,27 @@ private CompletableFuture handleException( } if (e instanceof WorkflowException) { WorkflowException exception = (WorkflowException) e; + CompletableFuture completable = + CompletableFuture.completedFuture(taskContext.rawOutput()); if (errorFilter.map(f -> f.test(exception.getWorkflowError())).orElse(true) - && whenFilter.map(w -> w.test(workflow, taskContext, taskContext.input())).orElse(true) - && exceptFilter - .map(w -> !w.test(workflow, taskContext, taskContext.input())) - .orElse(true)) { + && WorkflowUtils.whenExceptTest( + whenFilter, exceptFilter, workflow, taskContext, taskContext.rawOutput())) { if (catchTaskExecutor.isPresent()) { - return TaskExecutorHelper.processTaskList( - catchTaskExecutor.get(), workflow, Optional.of(taskContext), taskContext.input()); + completable = + completable.thenCompose( + model -> + TaskExecutorHelper.processTaskList( + catchTaskExecutor.get(), workflow, Optional.of(taskContext), model)); + } + if (retryIntervalExecutor.isPresent()) { + completable = + completable + .thenCompose( + model -> retryIntervalExecutor.get().retry(workflow, taskContext, model)) + .thenCompose(model -> doIt(workflow, taskContext, model)); } } - return CompletableFuture.completedFuture(taskContext.rawOutput()); + return completable; } else { if (e instanceof RuntimeException) { throw (RuntimeException) e; diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/AbstractRetryIntervalFunction.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/AbstractRetryIntervalFunction.java new file mode 100644 index 00000000..8779f444 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/AbstractRetryIntervalFunction.java @@ -0,0 +1,69 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.retry; + +import io.serverlessworkflow.api.types.RetryPolicyJitter; +import io.serverlessworkflow.api.types.TimeoutAfter; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowUtils; +import io.serverlessworkflow.impl.WorkflowValueResolver; +import java.time.Duration; +import java.util.Optional; + +public abstract class AbstractRetryIntervalFunction implements RetryIntervalFunction { + + private final Optional> minJitteringResolver; + private final Optional> maxJitteringResolver; + private final WorkflowValueResolver delayResolver; + + public AbstractRetryIntervalFunction( + WorkflowApplication appl, TimeoutAfter delay, RetryPolicyJitter jitter) { + if (jitter != null) { + minJitteringResolver = Optional.of(WorkflowUtils.fromTimeoutAfter(appl, jitter.getFrom())); + maxJitteringResolver = Optional.of(WorkflowUtils.fromTimeoutAfter(appl, jitter.getTo())); + } else { + minJitteringResolver = Optional.empty(); + maxJitteringResolver = Optional.empty(); + } + delayResolver = WorkflowUtils.fromTimeoutAfter(appl, delay); + } + + @Override + public Duration apply( + WorkflowContext workflowContext, + TaskContext taskContext, + WorkflowModel model, + short numAttempts) { + Duration delay = delayResolver.apply(workflowContext, taskContext, model); + Duration minJittering = + minJitteringResolver + .map(min -> min.apply(workflowContext, taskContext, model)) + .orElse(Duration.ZERO); + Duration maxJittering = + maxJitteringResolver + .map(max -> max.apply(workflowContext, taskContext, model)) + .orElse(Duration.ZERO); + return calcDelay(delay, numAttempts) + .plus( + Duration.ofMillis( + (long) (minJittering.toMillis() + Math.random() * maxJittering.toMillis()))); + } + + protected abstract Duration calcDelay(Duration delay, short numAttempts); +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/ConstantRetryIntervalFunction.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/ConstantRetryIntervalFunction.java new file mode 100644 index 00000000..e7746fbb --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/ConstantRetryIntervalFunction.java @@ -0,0 +1,34 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.retry; + +import io.serverlessworkflow.api.types.RetryPolicyJitter; +import io.serverlessworkflow.api.types.TimeoutAfter; +import io.serverlessworkflow.impl.WorkflowApplication; +import java.time.Duration; + +public class ConstantRetryIntervalFunction extends AbstractRetryIntervalFunction { + + public ConstantRetryIntervalFunction( + WorkflowApplication application, TimeoutAfter delay, RetryPolicyJitter jitter) { + super(application, delay, jitter); + } + + @Override + protected Duration calcDelay(Duration delay, short numAttempts) { + return delay; + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/DefaultRetryExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/DefaultRetryExecutor.java new file mode 100644 index 00000000..fcfb83c8 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/DefaultRetryExecutor.java @@ -0,0 +1,62 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.retry; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowPredicate; +import io.serverlessworkflow.impl.WorkflowUtils; +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +public class DefaultRetryExecutor implements RetryExecutor { + + private final int maxAttempts; + private final RetryIntervalFunction intervalFunction; + private final Optional whenFilter; + private final Optional exceptFilter; + + public DefaultRetryExecutor( + int maxAttempts, + RetryIntervalFunction intervalFunction, + Optional whenFilter, + Optional exceptFilter) { + this.maxAttempts = maxAttempts; + this.intervalFunction = intervalFunction; + this.whenFilter = whenFilter; + this.exceptFilter = exceptFilter; + } + + @Override + public CompletableFuture retry( + WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel model) { + CompletableFuture completable = new CompletableFuture<>(); + short numAttempts = taskContext.retryAttempt(); + if (numAttempts++ < maxAttempts + && WorkflowUtils.whenExceptTest( + whenFilter, exceptFilter, workflowContext, taskContext, model)) { + taskContext.retryAttempt(numAttempts); + Duration delay = intervalFunction.apply(workflowContext, taskContext, model, numAttempts); + completable.completeOnTimeout(model, delay.toMillis(), TimeUnit.MILLISECONDS); + } else { + completable.complete(model); + } + return completable; + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/ExponentialRetryIntervalFunction.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/ExponentialRetryIntervalFunction.java new file mode 100644 index 00000000..fff1d8c8 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/ExponentialRetryIntervalFunction.java @@ -0,0 +1,34 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.retry; + +import io.serverlessworkflow.api.types.RetryPolicyJitter; +import io.serverlessworkflow.api.types.TimeoutAfter; +import io.serverlessworkflow.impl.WorkflowApplication; +import java.time.Duration; + +public class ExponentialRetryIntervalFunction extends AbstractRetryIntervalFunction { + + public ExponentialRetryIntervalFunction( + WorkflowApplication appl, TimeoutAfter delay, RetryPolicyJitter jitter) { + super(appl, delay, jitter); + } + + @Override + protected Duration calcDelay(Duration delay, short numAttempts) { + return delay.multipliedBy(1 << numAttempts); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/LinearRetryIntervalFunction.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/LinearRetryIntervalFunction.java new file mode 100644 index 00000000..f2713cf4 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/LinearRetryIntervalFunction.java @@ -0,0 +1,34 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.retry; + +import io.serverlessworkflow.api.types.RetryPolicyJitter; +import io.serverlessworkflow.api.types.TimeoutAfter; +import io.serverlessworkflow.impl.WorkflowApplication; +import java.time.Duration; + +public class LinearRetryIntervalFunction extends AbstractRetryIntervalFunction { + + public LinearRetryIntervalFunction( + WorkflowApplication appl, TimeoutAfter delay, RetryPolicyJitter jitter) { + super(appl, delay, jitter); + } + + @Override + protected Duration calcDelay(Duration delay, short numAttempts) { + return delay.multipliedBy(numAttempts); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/RetryExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/RetryExecutor.java new file mode 100644 index 00000000..ff3bd7d3 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/RetryExecutor.java @@ -0,0 +1,26 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.retry; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModel; +import java.util.concurrent.CompletableFuture; + +public interface RetryExecutor { + CompletableFuture retry( + WorkflowContext worfklowContext, TaskContext taskContext, WorkflowModel model); +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/RetryIntervalFunction.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/RetryIntervalFunction.java new file mode 100644 index 00000000..b0e06d92 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/RetryIntervalFunction.java @@ -0,0 +1,29 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.retry; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModel; +import java.time.Duration; + +public interface RetryIntervalFunction { + Duration apply( + WorkflowContext workflowContext, + TaskContext taskContext, + WorkflowModel model, + short numAttempts); +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java index 374c6751..1d80665d 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java @@ -31,6 +31,7 @@ import io.serverlessworkflow.impl.lifecycle.TaskEvent; import io.serverlessworkflow.impl.lifecycle.TaskFailedEvent; import io.serverlessworkflow.impl.lifecycle.TaskResumedEvent; +import io.serverlessworkflow.impl.lifecycle.TaskRetriedEvent; import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent; import io.serverlessworkflow.impl.lifecycle.TaskSuspendedEvent; import io.serverlessworkflow.impl.lifecycle.WorkflowCancelledEvent; @@ -54,6 +55,7 @@ public abstract class AbstractLifeCyclePublisher implements WorkflowExecutionLis private static final String TASK_RESUMED = "io.serverlessworkflow.task.resumed.v1"; private static final String TASK_FAULTED = "io.serverlessworkflow.task.faulted.v1"; private static final String TASK_CANCELLED = "io.serverlessworkflow.task.cancelled.v1"; + private static final String TASK_RETRIED = "io.serverlessworkflow.task.retried.v1"; private static final String WORKFLOW_STARTED = "io.serverlessworkflow.workflow.started.v1"; private static final String WORKFLOW_COMPLETED = "io.serverlessworkflow.workflow.completed.v1"; @@ -70,6 +72,7 @@ public static Collection getLifeCycleTypes() { TASK_RESUMED, TASK_FAULTED, TASK_CANCELLED, + TASK_RETRIED, WORKFLOW_STARTED, WORKFLOW_COMPLETED, WORKFLOW_SUSPENDED, @@ -92,6 +95,20 @@ public void onTaskStarted(TaskStartedEvent event) { .build()); } + @Override + public void onTaskRetried(TaskRetriedEvent event) { + publish( + event, + ev -> + builder() + .withData( + cloudEventData( + new TaskRetriedCEData(id(ev), pos(ev), ref(ev), ev.eventDate()), + this::convert)) + .withType(TASK_STARTED) + .build()); + } + @Override public void onTaskCompleted(TaskCompletedEvent event) { publish( @@ -278,6 +295,10 @@ protected byte[] convert(WorkflowResumedCEData data) { return convertToBytes(data); } + protected byte[] convert(TaskRetriedCEData data) { + return convertToBytes(data); + } + protected byte[] convert(WorkflowCancelledCEData data) { return convertToBytes(data); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskRetriedCEData.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskRetriedCEData.java new file mode 100644 index 00000000..230d161e --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskRetriedCEData.java @@ -0,0 +1,21 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle.ce; + +import java.time.OffsetDateTime; + +public record TaskRetriedCEData( + String workflow, String task, WorkflowDefinitionCEData definition, OffsetDateTime retriedAt) {} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/RetryTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/RetryTest.java new file mode 100644 index 00000000..4de09443 --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/RetryTest.java @@ -0,0 +1,85 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test; + +import static io.serverlessworkflow.api.WorkflowReader.readWorkflowFromClasspath; + +import com.fasterxml.jackson.databind.JsonNode; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.jackson.JsonUtils; +import java.io.IOException; +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +public class RetryTest { + + private static WorkflowApplication app; + private MockWebServer apiServer; + + @BeforeAll + static void init() { + app = WorkflowApplication.builder().withListener(new TraceExecutionListener()).build(); + } + + @AfterAll + static void cleanup() { + app.close(); + } + + @BeforeEach + void setUp() throws IOException { + apiServer = new MockWebServer(); + apiServer.start(9797); + } + + @AfterEach + void tearDown() throws IOException { + apiServer.shutdown(); + } + + @ParameterizedTest + @ValueSource( + strings = { + "workflows-samples/try-catch-retry-inline.yaml", + "workflows-samples/try-catch-retry-reusable.yaml" + }) + void testRetry(String path) throws IOException { + final JsonNode result = JsonUtils.mapper().createObjectNode().put("name", "Javierito"); + apiServer.enqueue(new MockResponse().setResponseCode(503)); + apiServer.enqueue(new MockResponse().setResponseCode(503)); + apiServer.enqueue( + new MockResponse() + .setResponseCode(200) + .setHeader("Content-Type", "application/json") + .setBody(JsonUtils.mapper().writeValueAsString(result))); + CompletableFuture future = + app.workflowDefinition(readWorkflowFromClasspath(path)).instance(Map.of()).start(); + Awaitility.await() + .atMost(Duration.ofSeconds(6)) + .until(() -> future.join().as(JsonNode.class).orElseThrow().equals(result)); + } +} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/TraceExecutionListener.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/TraceExecutionListener.java index 40e24cfb..679e8d56 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/TraceExecutionListener.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/TraceExecutionListener.java @@ -69,7 +69,14 @@ public void onWorkflowCompleted(WorkflowCompletedEvent ev) { ev.eventDate()); } - public void onWorkflowFailed(WorkflowFailedEvent ev) {} + public void onWorkflowFailed(WorkflowFailedEvent ev) { + logger.info( + "Workflow definition {} with id {} failed at {}", + ev.workflowContext().definition().workflow().getDocument().getName(), + ev.workflowContext().instanceData().id(), + ev.eventDate(), + ev.cause()); + } public void onWorkflowCancelled(WorkflowCancelledEvent ev) {} @@ -89,7 +96,14 @@ public void onTaskCompleted(TaskCompletedEvent ev) { ev.taskContext().output().asJavaObject()); } - public void onTaskFailed(TaskFailedEvent ev) {} + public void onTaskFailed(TaskFailedEvent ev) { + logger.info( + "Task {} failed at {}", + ev.taskContext().taskName(), + ev.eventDate(), + ev.taskContext().output().asJavaObject(), + ev.cause()); + } public void onTaskCancelled(TaskCancelledEvent ev) {} @@ -97,5 +111,11 @@ public void onTaskSuspended(TaskSuspendedEvent ev) {} public void onTaskResumed(TaskResumedEvent ev) {} - public void onTaskRetried(TaskRetriedEvent ev) {} + public void onTaskRetried(TaskRetriedEvent ev) { + logger.info( + "Task {} retried at {}, position {}", + ev.taskContext().taskName(), + ev.eventDate(), + ev.taskContext().position()); + } } diff --git a/impl/test/src/test/resources/workflows-samples/try-catch-retry-inline.yaml b/impl/test/src/test/resources/workflows-samples/try-catch-retry-inline.yaml new file mode 100644 index 00000000..98ab01cd --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/try-catch-retry-inline.yaml @@ -0,0 +1,26 @@ +document: + dsl: '1.0.0' + namespace: default + name: try-catch-retry + version: '0.1.0' +do: + - tryGetPet: + try: + - getPet: + call: http + with: + method: get + endpoint: http://localhost:9797 + catch: + errors: + with: + type: https://serverlessworkflow.io/spec/1.0.0/errors/communication + status: 503 + retry: + delay: + milliseconds: 10 + backoff: + exponential: {} + limit: + attempt: + count: 5 \ No newline at end of file diff --git a/impl/test/src/test/resources/workflows-samples/try-catch-retry-reusable.yaml b/impl/test/src/test/resources/workflows-samples/try-catch-retry-reusable.yaml new file mode 100644 index 00000000..62c28aa9 --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/try-catch-retry-reusable.yaml @@ -0,0 +1,30 @@ +document: + dsl: '1.0.0' + namespace: default + name: try-catch-retry + version: '0.1.0' +use: + retries: + default: + delay: + milliseconds: 10 + backoff: + exponential: {} + limit: + attempt: + count: 5 +do: + - tryGetPet: + try: + - getPet: + call: http + with: + method: get + endpoint: https://petstore.swagger.io/v2/pet/{petId} + catch: + errors: + with: + type: https://serverlessworkflow.io/spec/1.0.0/errors/communication + status: 503 + retry: default + \ No newline at end of file From 34fd929e8604459c2a2951acc40e62702f5f73d5 Mon Sep 17 00:00:00 2001 From: fjtirado Date: Fri, 17 Oct 2025 17:42:18 +0200 Subject: [PATCH 2/2] [Fix #727] Adding persistence support Signed-off-by: fjtirado --- .../impl/WorkflowUtils.java | 17 ++++--- .../impl/persistence/CompletedTaskInfo.java | 27 +++++++++++ .../PersistenceInstanceWriter.java | 2 + .../impl/persistence/PersistenceTaskInfo.java | 10 +--- .../impl/persistence/RetriedTaskInfo.java | 18 +++++++ .../WorkflowPersistenceInstance.java | 18 ++++--- .../WorkflowPersistenceListener.java | 6 +++ .../bigmap/BigMapInstanceWriter.java | 13 +++++ .../bigmap/BytesMapInstanceReader.java | 45 +++++++++++++----- .../bigmap/BytesMapInstanceWriter.java | 15 +++++- .../persistence/bigmap/MarshallingUtils.java | 1 + .../impl/persistence/bigmap/TaskStatus.java | 21 ++++++++ impl/test/db-samples/running_v1.db | Bin 0 -> 16384 bytes impl/test/db-samples/suspended_v1.db | Bin 0 -> 12288 bytes .../impl/test/DBGenerator.java | 4 +- .../impl/test/MvStorePersistenceTest.java | 14 +++++- .../impl/test/RetryTest.java | 6 +-- .../test/TaskCounterPerInstanceListener.java | 14 ++++++ .../try-catch-retry-inline.yaml | 5 +- .../try-catch-retry-reusable.yaml | 4 +- 20 files changed, 194 insertions(+), 46 deletions(-) create mode 100644 impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/CompletedTaskInfo.java create mode 100644 impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/RetriedTaskInfo.java create mode 100644 impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/TaskStatus.java create mode 100644 impl/test/db-samples/running_v1.db create mode 100644 impl/test/db-samples/suspended_v1.db diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java index 45d25ede..76bcf23d 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java @@ -170,12 +170,17 @@ public static boolean whenExceptTest( public static WorkflowValueResolver fromTimeoutAfter( WorkflowApplication application, TimeoutAfter timeout) { if (timeout.getDurationExpression() != null) { - return (w, f, t) -> - Duration.parse( - application - .expressionFactory() - .resolveString(ExpressionDescriptor.from(timeout.getDurationExpression())) - .apply(w, f, t)); + if (ExpressionUtils.isExpr(timeout.getDurationExpression())) { + return (w, f, t) -> + Duration.parse( + application + .expressionFactory() + .resolveString(ExpressionDescriptor.from(timeout.getDurationExpression())) + .apply(w, f, t)); + } else { + Duration duration = Duration.parse(timeout.getDurationExpression()); + return (w, f, t) -> duration; + } } else if (timeout.getDurationInline() != null) { DurationInline inlineDuration = timeout.getDurationInline(); return (w, t, f) -> diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/CompletedTaskInfo.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/CompletedTaskInfo.java new file mode 100644 index 00000000..c08ef7ec --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/CompletedTaskInfo.java @@ -0,0 +1,27 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence; + +import io.serverlessworkflow.impl.WorkflowModel; +import java.time.Instant; + +public record CompletedTaskInfo( + Instant instant, + WorkflowModel model, + WorkflowModel context, + Boolean isEndNode, + String nextPosition) + implements PersistenceTaskInfo {} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceWriter.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceWriter.java index ae75ef38..f6b07548 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceWriter.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceWriter.java @@ -32,6 +32,8 @@ public interface PersistenceInstanceWriter extends AutoCloseable { void resumed(WorkflowContextData workflowContext); + void taskRetried(WorkflowContextData workflowContext, TaskContextData taskContext); + void taskStarted(WorkflowContextData workflowContext, TaskContextData taskContext); void taskCompleted(WorkflowContextData workflowContext, TaskContextData taskContext); diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceTaskInfo.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceTaskInfo.java index 2e6c688b..93b6526d 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceTaskInfo.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceTaskInfo.java @@ -15,12 +15,4 @@ */ package io.serverlessworkflow.impl.persistence; -import io.serverlessworkflow.impl.WorkflowModel; -import java.time.Instant; - -public record PersistenceTaskInfo( - Instant instant, - WorkflowModel model, - WorkflowModel context, - Boolean isEndNode, - String nextPosition) {} +public interface PersistenceTaskInfo {} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/RetriedTaskInfo.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/RetriedTaskInfo.java new file mode 100644 index 00000000..192fdd62 --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/RetriedTaskInfo.java @@ -0,0 +1,18 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence; + +public record RetriedTaskInfo(short retryAttempt) implements PersistenceTaskInfo {} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java index 335b87d0..773f606c 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java @@ -47,16 +47,20 @@ public CompletableFuture start() { @Override public void restoreContext(WorkflowContext workflow, TaskContext context) { PersistenceTaskInfo taskInfo = info.tasks().remove(context.position().jsonPointer()); - if (taskInfo != null) { - context.output(taskInfo.model()); - context.completedAt(taskInfo.instant()); + if (taskInfo instanceof CompletedTaskInfo completedTaskInfo) { + context.output(completedTaskInfo.model()); + context.completedAt(completedTaskInfo.instant()); context.transition( new TransitionInfo( - taskInfo.nextPosition() == null + completedTaskInfo.nextPosition() == null ? null - : workflow.definition().taskExecutor(taskInfo.nextPosition()), - taskInfo.isEndNode())); - workflow.context(taskInfo.context()); + : workflow.definition().taskExecutor(completedTaskInfo.nextPosition()), + completedTaskInfo.isEndNode())); + workflow.context(completedTaskInfo.context()); + } else if (taskInfo instanceof RetriedTaskInfo retriedTaskInfo) { + if (context.retryAttempt() == 0) { + context.retryAttempt(retriedTaskInfo.retryAttempt()); + } } } } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceListener.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceListener.java index d03da00d..958036fc 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceListener.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceListener.java @@ -18,6 +18,7 @@ import static io.serverlessworkflow.impl.WorkflowUtils.safeClose; import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent; +import io.serverlessworkflow.impl.lifecycle.TaskRetriedEvent; import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent; import io.serverlessworkflow.impl.lifecycle.WorkflowCancelledEvent; import io.serverlessworkflow.impl.lifecycle.WorkflowCompletedEvent; @@ -75,6 +76,11 @@ public void onTaskCompleted(TaskCompletedEvent ev) { persistenceWriter.taskCompleted(ev.workflowContext(), ev.taskContext()); } + @Override + public void onTaskRetried(TaskRetriedEvent ev) { + persistenceWriter.taskRetried(ev.workflowContext(), ev.taskContext()); + } + public void close() { safeClose(persistenceWriter); } diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceWriter.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceWriter.java index 77f96e24..8d305264 100644 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceWriter.java +++ b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceWriter.java @@ -68,6 +68,16 @@ public void aborted(WorkflowContextData workflowContext) { @Override public void taskStarted(WorkflowContextData workflowContext, TaskContextData taskContext) {} + @Override + public void taskRetried(WorkflowContextData workflowContext, TaskContextData taskContext) { + doTransaction( + t -> + t.tasks(key(workflowContext)) + .put( + taskContext.position().jsonPointer(), + marshallTaskRetried(workflowContext, (TaskContext) taskContext))); + } + @Override public void taskCompleted(WorkflowContextData workflowContext, TaskContextData taskContext) { doTransaction( @@ -108,5 +118,8 @@ protected void removeProcessInstance(WorkflowContextData workflowContext) { protected abstract T marshallTaskCompleted( WorkflowContextData workflowContext, TaskContext taskContext); + protected abstract T marshallTaskRetried( + WorkflowContextData workflowContext, TaskContext taskContext); + protected abstract S marshallStatus(WorkflowStatus status); } diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceReader.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceReader.java index 8b90ae18..ce979708 100644 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceReader.java +++ b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceReader.java @@ -19,7 +19,9 @@ import io.serverlessworkflow.impl.WorkflowStatus; import io.serverlessworkflow.impl.marshaller.WorkflowBufferFactory; import io.serverlessworkflow.impl.marshaller.WorkflowInputBuffer; +import io.serverlessworkflow.impl.persistence.CompletedTaskInfo; import io.serverlessworkflow.impl.persistence.PersistenceTaskInfo; +import io.serverlessworkflow.impl.persistence.RetriedTaskInfo; import java.io.ByteArrayInputStream; import java.time.Instant; @@ -36,21 +38,42 @@ public BytesMapInstanceReader( @Override protected PersistenceTaskInfo unmarshallTaskInfo(byte[] taskData) { try (WorkflowInputBuffer buffer = factory.input(new ByteArrayInputStream(taskData))) { - buffer.readByte(); // version byte not used at the moment - Instant date = buffer.readInstant(); - WorkflowModel model = (WorkflowModel) buffer.readObject(); - WorkflowModel context = (WorkflowModel) buffer.readObject(); - Boolean isEndNode = null; - String nextPosition = null; - isEndNode = buffer.readBoolean(); - boolean hasNext = buffer.readBoolean(); - if (hasNext) { - nextPosition = buffer.readString(); + byte version = buffer.readByte(); + switch (version) { + case MarshallingUtils.VERSION_0: + default: + return readVersion0(buffer); + case MarshallingUtils.VERSION_1: + return readVersion1(buffer); } - return new PersistenceTaskInfo(date, model, context, isEndNode, nextPosition); } } + private PersistenceTaskInfo readVersion1(WorkflowInputBuffer buffer) { + TaskStatus taskStatus = buffer.readEnum(TaskStatus.class); + switch (taskStatus) { + case COMPLETED: + default: + return readVersion0(buffer); + case RETRIED: + return new RetriedTaskInfo(buffer.readShort()); + } + } + + private PersistenceTaskInfo readVersion0(WorkflowInputBuffer buffer) { + Instant date = buffer.readInstant(); + WorkflowModel model = (WorkflowModel) buffer.readObject(); + WorkflowModel context = (WorkflowModel) buffer.readObject(); + Boolean isEndNode = null; + String nextPosition = null; + isEndNode = buffer.readBoolean(); + boolean hasNext = buffer.readBoolean(); + if (hasNext) { + nextPosition = buffer.readString(); + } + return new CompletedTaskInfo(date, model, context, isEndNode, nextPosition); + } + @Override protected PersistenceInstanceInfo unmarshallInstanceInfo(byte[] instanceData) { try (WorkflowInputBuffer buffer = factory.input(new ByteArrayInputStream(instanceData))) { diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceWriter.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceWriter.java index 320d7eff..279bc093 100644 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceWriter.java +++ b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceWriter.java @@ -40,7 +40,8 @@ public BytesMapInstanceWriter( protected byte[] marshallTaskCompleted(WorkflowContextData contextData, TaskContext taskContext) { ByteArrayOutputStream bytes = new ByteArrayOutputStream(); try (WorkflowOutputBuffer writer = factory.output(bytes)) { - writer.writeByte(MarshallingUtils.VERSION_0); + writer.writeByte(MarshallingUtils.VERSION_1); + writer.writeEnum(TaskStatus.COMPLETED); writer.writeInstant(taskContext.completedAt()); writeModel(writer, taskContext.output()); writeModel(writer, contextData.context()); @@ -82,4 +83,16 @@ protected byte[] marshallInstance(WorkflowInstanceData instance) { protected void writeModel(WorkflowOutputBuffer writer, WorkflowModel model) { writer.writeObject(model); } + + @Override + protected byte[] marshallTaskRetried( + WorkflowContextData workflowContext, TaskContext taskContext) { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + try (WorkflowOutputBuffer writer = factory.output(bytes)) { + writer.writeByte(MarshallingUtils.VERSION_1); + writer.writeEnum(TaskStatus.RETRIED); + writer.writeShort(taskContext.retryAttempt()); + } + return bytes.toByteArray(); + } } diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/MarshallingUtils.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/MarshallingUtils.java index 2fea0224..6c38d901 100644 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/MarshallingUtils.java +++ b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/MarshallingUtils.java @@ -20,4 +20,5 @@ class MarshallingUtils { private MarshallingUtils() {} public static final byte VERSION_0 = 0; + public static final byte VERSION_1 = 1; } diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/TaskStatus.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/TaskStatus.java new file mode 100644 index 00000000..5db1a57c --- /dev/null +++ b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/TaskStatus.java @@ -0,0 +1,21 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence.bigmap; + +enum TaskStatus { + COMPLETED, + RETRIED +} diff --git a/impl/test/db-samples/running_v1.db b/impl/test/db-samples/running_v1.db new file mode 100644 index 0000000000000000000000000000000000000000..fa4c2bebeb8825fe11610f648fc7c28337226283 GIT binary patch literal 16384 zcmeI3O>f&a7{^JcaoeoxF6*Ve3`Pg+MOG+Ewk%%StZ`bb3F) z8?PwPPq1OYKEd{7b{O_CcGz*)X{S+?;~)is1Z`Giz@q~ny*zyTkreQMKt5vB=_N{x zcH{3P^9Ui1$RyXY&%7ua_+gLGUg!)|Hs+f7&LELm3}wc8Eid4nC)W^MKmZ5;0U!Vb zfB+Bx0zd!=00AHX1c1OLC-4aL|0N$GXdVy%0zd!=00AHX1b_e#00KY&2mpb%Cm@E? zbi_O-k*P^1d|&3Q;f(nq^CdBM!kSK|lxBh18bFBTLDSLkSTaheGm}{!t0H464~yLa zS_$LW_~=5fVu!&G=rdHQple>x8iWHLT@zj}7*G*9D%*F5)E&=qtupEALwB!imF(~F zWGcJ!iM+eTh50(Kz3yzjxleirQs_;tdEt5v7t@??@dLby(R;Uv_c+{r+9A(K_u*Gh zX?KfIfA4W;tG$74;)iy4hJSs3>wVOSm7B}V5KCm9KUdjkkf`TwJf0-(SoLM%9&j!R;&tjRyTGd|OKp1^s+TpDWo+O*9K#LIWt=wI~YiQNX z8JF7Bw`pM0hE1Cv7sa`!Q>$y9kMgSZ`sS83Y9iBa#?#!4p_y#?=k~3Zxvu9@*ONHc zGOcS>=jpsoT+b!-IL$Rrh0JeS-)Y)c+iKtSY`Lr~m%U!D^<_)nc9E5xuNr40=WID6 zXFOp?^YorA<*fdmo*y?c#XfG}zzpWsD}P@>tLUm3dhW*kYOyX&r>vo+X_Tgky|r?| zMxKz1TN)9*xDAA8EpD6UtttBzLZ6z2hVLV^{j0I&IW88SdH(TtW6kyFR%syx0zd!= z00AHX1b_e#00KbZ4GEmo-^YS_xz9c#eLtivaiNlWS=d@xKRu!UX{rCsQGi1Kho$~+ zS^dwc5T*Y2qcB7J(QO9y(PYFZFF`74AQk9HT0_YG6u&w((lt$|x$JM~<1QU9)kV3B3 zLC&MU?;HKyDgPlq|MrIe637MwfB+Bx0zd!=00AHX1c1QhCU8=JABhTje9k`NM*j!@ Hng0J5x`dh5 literal 0 HcmV?d00001 diff --git a/impl/test/db-samples/suspended_v1.db b/impl/test/db-samples/suspended_v1.db new file mode 100644 index 0000000000000000000000000000000000000000..1acb73558fb43fecccaaf451ce0c4a91c4fa7954 GIT binary patch literal 12288 zcmeI2&2G~`5P)~n|5D1Oy}$)UAXUO~*N$r^dnsxYsGz?jRNTLQ;QH{ z_%UaS*Qr%3Hu7GLI5loGMBHJD=8ZGscUrDA$5GBb(eZZEQF$tZVe!o<&c7*f{tz#u#Fy%WgwDy5kE|t1C;Qo?iRRz ztyj`u@CoHmCW9s|r_k^ktT^dgHK$?wo+09<*|N<}Pbv{}QyrSS{j7L>mxcShder3) zo-kiOJ<6VU*3ZnX+8*~+i7A%2UcyDE^9HZrF^q2ACDwDVva@Zuq_Rm~xVBSWb=_CH zYh(w_;-~5Ew0?h_zm1jxVM-orVj-9EK*a4vC=Se^(+$mzsPoX=V}4tT$b8n<8;Mab zkU2il@*BpPaUJkpT`ZEtx~PZ_SFIpwu5jI%!OJ}!DIM_;qp8)cjqUYvwOmS~UorUH zaW-Ndo~7A6)(dzXD3LvxK^Wc9d02}+3&WB~(CSc5MvhDjVWJS4s=Tahmp4mEe)!kO z_fb59#&nsPb}I9!bW*vLN;j3>!z2;&V%|B$WLu7>Ua#jA<&~Or@90$7u5-Iw<-c3+dliVL4r87?_LauME$k|4T1k0=wA8dAx)@?C45}MwiFX zIGWHU%FLiXo_szunN-<*wbWN`UwPP?PCqZC$cyB3Xcv5TI&}O(;dJPpS&LR7iS`bm zg`c{X9wW5=L2G^AO74Fk^yNK5NZY@D)>@n9CQ}Up2mk>f00e*l5C8%|00{gy0{@=B Y&rZ~XKKml6=RN&r_*&1@gT future = app.workflowDefinition(readWorkflowFromClasspath(path)).instance(Map.of()).start(); Awaitility.await() - .atMost(Duration.ofSeconds(6)) + .atMost(Duration.ofSeconds(1)) .until(() -> future.join().as(JsonNode.class).orElseThrow().equals(result)); } } diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/TaskCounterPerInstanceListener.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/TaskCounterPerInstanceListener.java index 19fe6217..bfd44f2c 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/TaskCounterPerInstanceListener.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/TaskCounterPerInstanceListener.java @@ -16,6 +16,7 @@ package io.serverlessworkflow.impl.test; import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent; +import io.serverlessworkflow.impl.lifecycle.TaskRetriedEvent; import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent; import io.serverlessworkflow.impl.lifecycle.WorkflowEvent; import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener; @@ -27,6 +28,7 @@ public class TaskCounterPerInstanceListener implements WorkflowExecutionListener public static class TaskCounter { private int started; private int completed; + private int retried; public void incStarted() { started++; @@ -36,6 +38,10 @@ public void incCompleted() { completed++; } + public void incRetried() { + retried++; + } + public int started() { return started; } @@ -43,6 +49,10 @@ public int started() { public int completed() { return completed; } + + public int retried() { + return retried; + } } private Map taskCounter = new ConcurrentHashMap<>(); @@ -62,4 +72,8 @@ public TaskCounter taskCounter(String instanceId) { public void onTaskCompleted(TaskCompletedEvent ev) { taskCounter(ev).incCompleted(); } + + public void onTaskRetried(TaskRetriedEvent ev) { + taskCounter(ev).incRetried(); + } } diff --git a/impl/test/src/test/resources/workflows-samples/try-catch-retry-inline.yaml b/impl/test/src/test/resources/workflows-samples/try-catch-retry-inline.yaml index 98ab01cd..cdf9aa76 100644 --- a/impl/test/src/test/resources/workflows-samples/try-catch-retry-inline.yaml +++ b/impl/test/src/test/resources/workflows-samples/try-catch-retry-inline.yaml @@ -15,10 +15,9 @@ do: errors: with: type: https://serverlessworkflow.io/spec/1.0.0/errors/communication - status: 503 + status: 404 retry: - delay: - milliseconds: 10 + delay: "PT0.01S" backoff: exponential: {} limit: diff --git a/impl/test/src/test/resources/workflows-samples/try-catch-retry-reusable.yaml b/impl/test/src/test/resources/workflows-samples/try-catch-retry-reusable.yaml index 62c28aa9..5c743847 100644 --- a/impl/test/src/test/resources/workflows-samples/try-catch-retry-reusable.yaml +++ b/impl/test/src/test/resources/workflows-samples/try-catch-retry-reusable.yaml @@ -20,11 +20,11 @@ do: call: http with: method: get - endpoint: https://petstore.swagger.io/v2/pet/{petId} + endpoint: http://localhost:9797 catch: errors: with: type: https://serverlessworkflow.io/spec/1.0.0/errors/communication - status: 503 + status: 404 retry: default \ No newline at end of file