diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java b/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java index 9634c5f7..8ef68513 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java @@ -37,7 +37,7 @@ public class TaskContext implements TaskContextData { private WorkflowModel rawOutput; private Instant completedAt; private TransitionInfo transition; - private boolean completed; + private short retryAttempt; public TaskContext( WorkflowModel input, @@ -67,6 +67,7 @@ private TaskContext( this.input = input; this.output = output; this.rawOutput = rawOutput; + this.retryAttempt = parentContext.map(TaskContext::retryAttempt).orElse((short) 0); this.contextVariables = parentContext.map(p -> new HashMap<>(p.contextVariables)).orElseGet(HashMap::new); } @@ -110,7 +111,6 @@ public WorkflowModel rawOutput() { public TaskContext output(WorkflowModel output) { this.output = output; - this.completed = true; return this; } @@ -162,7 +162,19 @@ public TaskContext transition(TransitionInfo transition) { } public boolean isCompleted() { - return completed; + return completedAt != null; + } + + public short retryAttempt() { + return retryAttempt; + } + + public void retryAttempt(short retryAttempt) { + this.retryAttempt = retryAttempt; + } + + public boolean isRetrying() { + return retryAttempt > 0; } @Override @@ -175,6 +187,8 @@ public String toString() { + taskName + ", completedAt=" + completedAt + + ", retryAttempt=" + + retryAttempt + "]"; } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java index 8cf35e31..76bcf23d 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java @@ -15,10 +15,12 @@ */ package io.serverlessworkflow.impl; +import io.serverlessworkflow.api.types.DurationInline; import io.serverlessworkflow.api.types.ExportAs; import io.serverlessworkflow.api.types.InputFrom; import io.serverlessworkflow.api.types.OutputAs; import io.serverlessworkflow.api.types.SchemaUnion; +import io.serverlessworkflow.api.types.TimeoutAfter; import io.serverlessworkflow.api.types.UriTemplate; import io.serverlessworkflow.impl.expressions.ExpressionDescriptor; import io.serverlessworkflow.impl.expressions.ExpressionUtils; @@ -26,6 +28,7 @@ import io.serverlessworkflow.impl.schema.SchemaValidator; import io.serverlessworkflow.impl.schema.SchemaValidatorFactory; import java.net.URI; +import java.time.Duration; import java.util.Map; import java.util.Optional; import org.slf4j.Logger; @@ -153,4 +156,42 @@ public static void safeClose(AutoCloseable closeable) { } } } + + public static boolean whenExceptTest( + Optional whenFilter, + Optional exceptFilter, + WorkflowContext workflow, + TaskContext taskContext, + WorkflowModel model) { + return whenFilter.map(w -> w.test(workflow, taskContext, model)).orElse(true) + && exceptFilter.map(w -> !w.test(workflow, taskContext, model)).orElse(true); + } + + public static WorkflowValueResolver fromTimeoutAfter( + WorkflowApplication application, TimeoutAfter timeout) { + if (timeout.getDurationExpression() != null) { + if (ExpressionUtils.isExpr(timeout.getDurationExpression())) { + return (w, f, t) -> + Duration.parse( + application + .expressionFactory() + .resolveString(ExpressionDescriptor.from(timeout.getDurationExpression())) + .apply(w, f, t)); + } else { + Duration duration = Duration.parse(timeout.getDurationExpression()); + return (w, f, t) -> duration; + } + } else if (timeout.getDurationInline() != null) { + DurationInline inlineDuration = timeout.getDurationInline(); + return (w, t, f) -> + Duration.ofDays(inlineDuration.getDays()) + .plus( + Duration.ofHours(inlineDuration.getHours()) + .plus(Duration.ofMinutes(inlineDuration.getMinutes())) + .plus(Duration.ofSeconds(inlineDuration.getSeconds())) + .plus(Duration.ofMillis(inlineDuration.getMilliseconds()))); + } else { + return (w, t, f) -> Duration.ZERO; + } + } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java index 5c80adc8..30956970 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java @@ -38,6 +38,7 @@ import io.serverlessworkflow.impl.lifecycle.TaskCancelledEvent; import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent; import io.serverlessworkflow.impl.lifecycle.TaskFailedEvent; +import io.serverlessworkflow.impl.lifecycle.TaskRetriedEvent; import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent; import io.serverlessworkflow.impl.resources.ResourceLoader; import io.serverlessworkflow.impl.schema.SchemaValidator; @@ -204,9 +205,15 @@ public CompletableFuture apply( .thenCompose(workflowContext.instance()::suspendedCheck) .thenApply( t -> { - publishEvent( - workflowContext, - l -> l.onTaskStarted(new TaskStartedEvent(workflowContext, taskContext))); + if (t.isRetrying()) { + publishEvent( + workflowContext, + l -> l.onTaskRetried(new TaskRetriedEvent(workflowContext, taskContext))); + } else { + publishEvent( + workflowContext, + l -> l.onTaskStarted(new TaskStartedEvent(workflowContext, taskContext))); + } inputSchemaValidator.ifPresent(s -> s.validate(t.rawInput())); inputProcessor.ifPresent( p -> taskContext.input(p.apply(workflowContext, t, t.rawInput()))); diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TryExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TryExecutor.java index d7d7cf0b..a071d996 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TryExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TryExecutor.java @@ -17,6 +17,9 @@ import io.serverlessworkflow.api.types.CatchErrors; import io.serverlessworkflow.api.types.ErrorFilter; +import io.serverlessworkflow.api.types.Retry; +import io.serverlessworkflow.api.types.RetryBackoff; +import io.serverlessworkflow.api.types.RetryPolicy; import io.serverlessworkflow.api.types.TaskItem; import io.serverlessworkflow.api.types.TryTask; import io.serverlessworkflow.api.types.TryTaskCatch; @@ -29,6 +32,12 @@ import io.serverlessworkflow.impl.WorkflowMutablePosition; import io.serverlessworkflow.impl.WorkflowPredicate; import io.serverlessworkflow.impl.WorkflowUtils; +import io.serverlessworkflow.impl.executors.retry.ConstantRetryIntervalFunction; +import io.serverlessworkflow.impl.executors.retry.DefaultRetryExecutor; +import io.serverlessworkflow.impl.executors.retry.ExponentialRetryIntervalFunction; +import io.serverlessworkflow.impl.executors.retry.LinearRetryIntervalFunction; +import io.serverlessworkflow.impl.executors.retry.RetryExecutor; +import io.serverlessworkflow.impl.executors.retry.RetryIntervalFunction; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -42,6 +51,7 @@ public class TryExecutor extends RegularTaskExecutor { private final Optional> errorFilter; private final TaskExecutor taskExecutor; private final Optional> catchTaskExecutor; + private final Optional retryIntervalExecutor; public static class TryExecutorBuilder extends RegularTaskExecutorBuilder { @@ -50,6 +60,7 @@ public static class TryExecutorBuilder extends RegularTaskExecutorBuilder> errorFilter; private final TaskExecutor taskExecutor; private final Optional> catchTaskExecutor; + private final Optional retryIntervalExecutor; protected TryExecutorBuilder( WorkflowMutablePosition position, TryTask task, WorkflowDefinition definition) { @@ -60,13 +71,63 @@ protected TryExecutorBuilder( this.exceptFilter = WorkflowUtils.optionalPredicate(application, catchInfo.getExceptWhen()); this.taskExecutor = TaskExecutorHelper.createExecutorList(position, task.getTry(), definition); - List catchTask = task.getCatch().getDo(); - this.catchTaskExecutor = - catchTask != null && !catchTask.isEmpty() - ? Optional.of( - TaskExecutorHelper.createExecutorList( - position, task.getCatch().getDo(), definition)) - : Optional.empty(); + TryTaskCatch catchTask = task.getCatch(); + if (catchTask != null) { + List catchTaskDo = catchTask.getDo(); + + this.catchTaskExecutor = + catchTaskDo != null && !catchTaskDo.isEmpty() + ? Optional.of( + TaskExecutorHelper.createExecutorList(position, catchTaskDo, definition)) + : Optional.empty(); + + Retry retry = catchTask.getRetry(); + this.retryIntervalExecutor = retry != null ? buildRetryInterval(retry) : Optional.empty(); + } else { + this.catchTaskExecutor = Optional.empty(); + this.retryIntervalExecutor = Optional.empty(); + } + } + + private Optional buildRetryInterval(Retry retry) { + RetryPolicy retryPolicy = null; + if (retry.getRetryPolicyDefinition() != null) { + retryPolicy = retry.getRetryPolicyDefinition(); + } else if (retry.getRetryPolicyReference() != null) { + retryPolicy = + workflow + .getUse() + .getRetries() + .getAdditionalProperties() + .get(retry.getRetryPolicyReference()); + if (retryPolicy == null) { + throw new IllegalStateException("Retry policy " + retryPolicy + " was not found"); + } + } + return retryPolicy != null ? Optional.of(buildRetryExecutor(retryPolicy)) : Optional.empty(); + } + + protected RetryExecutor buildRetryExecutor(RetryPolicy retryPolicy) { + return new DefaultRetryExecutor( + retryPolicy.getLimit().getAttempt().getCount(), + buildIntervalFunction(retryPolicy), + WorkflowUtils.optionalPredicate(application, retryPolicy.getWhen()), + WorkflowUtils.optionalPredicate(application, retryPolicy.getExceptWhen())); + } + + private RetryIntervalFunction buildIntervalFunction(RetryPolicy retryPolicy) { + RetryBackoff backoff = retryPolicy.getBackoff(); + if (backoff.getConstantBackoff() != null) { + return new ConstantRetryIntervalFunction( + application, retryPolicy.getDelay(), retryPolicy.getJitter()); + } else if (backoff.getLinearBackoff() != null) { + return new LinearRetryIntervalFunction( + application, retryPolicy.getDelay(), retryPolicy.getJitter()); + } else if (backoff.getExponentialBackOff() != null) { + return new ExponentialRetryIntervalFunction( + application, retryPolicy.getDelay(), retryPolicy.getJitter()); + } + throw new IllegalStateException("A backoff strategy should be set"); } @Override @@ -82,13 +143,19 @@ protected TryExecutor(TryExecutorBuilder builder) { this.exceptFilter = builder.exceptFilter; this.taskExecutor = builder.taskExecutor; this.catchTaskExecutor = builder.catchTaskExecutor; + this.retryIntervalExecutor = builder.retryIntervalExecutor; } @Override protected CompletableFuture internalExecute( WorkflowContext workflow, TaskContext taskContext) { + return doIt(workflow, taskContext, taskContext.input()); + } + + private CompletableFuture doIt( + WorkflowContext workflow, TaskContext taskContext, WorkflowModel model) { return TaskExecutorHelper.processTaskList( - taskExecutor, workflow, Optional.of(taskContext), taskContext.input()) + taskExecutor, workflow, Optional.of(taskContext), model) .exceptionallyCompose(e -> handleException(e, workflow, taskContext)); } @@ -99,17 +166,27 @@ private CompletableFuture handleException( } if (e instanceof WorkflowException) { WorkflowException exception = (WorkflowException) e; + CompletableFuture completable = + CompletableFuture.completedFuture(taskContext.rawOutput()); if (errorFilter.map(f -> f.test(exception.getWorkflowError())).orElse(true) - && whenFilter.map(w -> w.test(workflow, taskContext, taskContext.input())).orElse(true) - && exceptFilter - .map(w -> !w.test(workflow, taskContext, taskContext.input())) - .orElse(true)) { + && WorkflowUtils.whenExceptTest( + whenFilter, exceptFilter, workflow, taskContext, taskContext.rawOutput())) { if (catchTaskExecutor.isPresent()) { - return TaskExecutorHelper.processTaskList( - catchTaskExecutor.get(), workflow, Optional.of(taskContext), taskContext.input()); + completable = + completable.thenCompose( + model -> + TaskExecutorHelper.processTaskList( + catchTaskExecutor.get(), workflow, Optional.of(taskContext), model)); + } + if (retryIntervalExecutor.isPresent()) { + completable = + completable + .thenCompose( + model -> retryIntervalExecutor.get().retry(workflow, taskContext, model)) + .thenCompose(model -> doIt(workflow, taskContext, model)); } } - return CompletableFuture.completedFuture(taskContext.rawOutput()); + return completable; } else { if (e instanceof RuntimeException) { throw (RuntimeException) e; diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/AbstractRetryIntervalFunction.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/AbstractRetryIntervalFunction.java new file mode 100644 index 00000000..8779f444 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/AbstractRetryIntervalFunction.java @@ -0,0 +1,69 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.retry; + +import io.serverlessworkflow.api.types.RetryPolicyJitter; +import io.serverlessworkflow.api.types.TimeoutAfter; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowUtils; +import io.serverlessworkflow.impl.WorkflowValueResolver; +import java.time.Duration; +import java.util.Optional; + +public abstract class AbstractRetryIntervalFunction implements RetryIntervalFunction { + + private final Optional> minJitteringResolver; + private final Optional> maxJitteringResolver; + private final WorkflowValueResolver delayResolver; + + public AbstractRetryIntervalFunction( + WorkflowApplication appl, TimeoutAfter delay, RetryPolicyJitter jitter) { + if (jitter != null) { + minJitteringResolver = Optional.of(WorkflowUtils.fromTimeoutAfter(appl, jitter.getFrom())); + maxJitteringResolver = Optional.of(WorkflowUtils.fromTimeoutAfter(appl, jitter.getTo())); + } else { + minJitteringResolver = Optional.empty(); + maxJitteringResolver = Optional.empty(); + } + delayResolver = WorkflowUtils.fromTimeoutAfter(appl, delay); + } + + @Override + public Duration apply( + WorkflowContext workflowContext, + TaskContext taskContext, + WorkflowModel model, + short numAttempts) { + Duration delay = delayResolver.apply(workflowContext, taskContext, model); + Duration minJittering = + minJitteringResolver + .map(min -> min.apply(workflowContext, taskContext, model)) + .orElse(Duration.ZERO); + Duration maxJittering = + maxJitteringResolver + .map(max -> max.apply(workflowContext, taskContext, model)) + .orElse(Duration.ZERO); + return calcDelay(delay, numAttempts) + .plus( + Duration.ofMillis( + (long) (minJittering.toMillis() + Math.random() * maxJittering.toMillis()))); + } + + protected abstract Duration calcDelay(Duration delay, short numAttempts); +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/ConstantRetryIntervalFunction.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/ConstantRetryIntervalFunction.java new file mode 100644 index 00000000..e7746fbb --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/ConstantRetryIntervalFunction.java @@ -0,0 +1,34 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.retry; + +import io.serverlessworkflow.api.types.RetryPolicyJitter; +import io.serverlessworkflow.api.types.TimeoutAfter; +import io.serverlessworkflow.impl.WorkflowApplication; +import java.time.Duration; + +public class ConstantRetryIntervalFunction extends AbstractRetryIntervalFunction { + + public ConstantRetryIntervalFunction( + WorkflowApplication application, TimeoutAfter delay, RetryPolicyJitter jitter) { + super(application, delay, jitter); + } + + @Override + protected Duration calcDelay(Duration delay, short numAttempts) { + return delay; + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/DefaultRetryExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/DefaultRetryExecutor.java new file mode 100644 index 00000000..fcfb83c8 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/DefaultRetryExecutor.java @@ -0,0 +1,62 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.retry; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowPredicate; +import io.serverlessworkflow.impl.WorkflowUtils; +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +public class DefaultRetryExecutor implements RetryExecutor { + + private final int maxAttempts; + private final RetryIntervalFunction intervalFunction; + private final Optional whenFilter; + private final Optional exceptFilter; + + public DefaultRetryExecutor( + int maxAttempts, + RetryIntervalFunction intervalFunction, + Optional whenFilter, + Optional exceptFilter) { + this.maxAttempts = maxAttempts; + this.intervalFunction = intervalFunction; + this.whenFilter = whenFilter; + this.exceptFilter = exceptFilter; + } + + @Override + public CompletableFuture retry( + WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel model) { + CompletableFuture completable = new CompletableFuture<>(); + short numAttempts = taskContext.retryAttempt(); + if (numAttempts++ < maxAttempts + && WorkflowUtils.whenExceptTest( + whenFilter, exceptFilter, workflowContext, taskContext, model)) { + taskContext.retryAttempt(numAttempts); + Duration delay = intervalFunction.apply(workflowContext, taskContext, model, numAttempts); + completable.completeOnTimeout(model, delay.toMillis(), TimeUnit.MILLISECONDS); + } else { + completable.complete(model); + } + return completable; + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/ExponentialRetryIntervalFunction.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/ExponentialRetryIntervalFunction.java new file mode 100644 index 00000000..fff1d8c8 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/ExponentialRetryIntervalFunction.java @@ -0,0 +1,34 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.retry; + +import io.serverlessworkflow.api.types.RetryPolicyJitter; +import io.serverlessworkflow.api.types.TimeoutAfter; +import io.serverlessworkflow.impl.WorkflowApplication; +import java.time.Duration; + +public class ExponentialRetryIntervalFunction extends AbstractRetryIntervalFunction { + + public ExponentialRetryIntervalFunction( + WorkflowApplication appl, TimeoutAfter delay, RetryPolicyJitter jitter) { + super(appl, delay, jitter); + } + + @Override + protected Duration calcDelay(Duration delay, short numAttempts) { + return delay.multipliedBy(1 << numAttempts); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/LinearRetryIntervalFunction.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/LinearRetryIntervalFunction.java new file mode 100644 index 00000000..f2713cf4 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/LinearRetryIntervalFunction.java @@ -0,0 +1,34 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.retry; + +import io.serverlessworkflow.api.types.RetryPolicyJitter; +import io.serverlessworkflow.api.types.TimeoutAfter; +import io.serverlessworkflow.impl.WorkflowApplication; +import java.time.Duration; + +public class LinearRetryIntervalFunction extends AbstractRetryIntervalFunction { + + public LinearRetryIntervalFunction( + WorkflowApplication appl, TimeoutAfter delay, RetryPolicyJitter jitter) { + super(appl, delay, jitter); + } + + @Override + protected Duration calcDelay(Duration delay, short numAttempts) { + return delay.multipliedBy(numAttempts); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/RetryExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/RetryExecutor.java new file mode 100644 index 00000000..ff3bd7d3 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/RetryExecutor.java @@ -0,0 +1,26 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.retry; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModel; +import java.util.concurrent.CompletableFuture; + +public interface RetryExecutor { + CompletableFuture retry( + WorkflowContext worfklowContext, TaskContext taskContext, WorkflowModel model); +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/RetryIntervalFunction.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/RetryIntervalFunction.java new file mode 100644 index 00000000..b0e06d92 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/RetryIntervalFunction.java @@ -0,0 +1,29 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.retry; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModel; +import java.time.Duration; + +public interface RetryIntervalFunction { + Duration apply( + WorkflowContext workflowContext, + TaskContext taskContext, + WorkflowModel model, + short numAttempts); +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java index 374c6751..1d80665d 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java @@ -31,6 +31,7 @@ import io.serverlessworkflow.impl.lifecycle.TaskEvent; import io.serverlessworkflow.impl.lifecycle.TaskFailedEvent; import io.serverlessworkflow.impl.lifecycle.TaskResumedEvent; +import io.serverlessworkflow.impl.lifecycle.TaskRetriedEvent; import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent; import io.serverlessworkflow.impl.lifecycle.TaskSuspendedEvent; import io.serverlessworkflow.impl.lifecycle.WorkflowCancelledEvent; @@ -54,6 +55,7 @@ public abstract class AbstractLifeCyclePublisher implements WorkflowExecutionLis private static final String TASK_RESUMED = "io.serverlessworkflow.task.resumed.v1"; private static final String TASK_FAULTED = "io.serverlessworkflow.task.faulted.v1"; private static final String TASK_CANCELLED = "io.serverlessworkflow.task.cancelled.v1"; + private static final String TASK_RETRIED = "io.serverlessworkflow.task.retried.v1"; private static final String WORKFLOW_STARTED = "io.serverlessworkflow.workflow.started.v1"; private static final String WORKFLOW_COMPLETED = "io.serverlessworkflow.workflow.completed.v1"; @@ -70,6 +72,7 @@ public static Collection getLifeCycleTypes() { TASK_RESUMED, TASK_FAULTED, TASK_CANCELLED, + TASK_RETRIED, WORKFLOW_STARTED, WORKFLOW_COMPLETED, WORKFLOW_SUSPENDED, @@ -92,6 +95,20 @@ public void onTaskStarted(TaskStartedEvent event) { .build()); } + @Override + public void onTaskRetried(TaskRetriedEvent event) { + publish( + event, + ev -> + builder() + .withData( + cloudEventData( + new TaskRetriedCEData(id(ev), pos(ev), ref(ev), ev.eventDate()), + this::convert)) + .withType(TASK_STARTED) + .build()); + } + @Override public void onTaskCompleted(TaskCompletedEvent event) { publish( @@ -278,6 +295,10 @@ protected byte[] convert(WorkflowResumedCEData data) { return convertToBytes(data); } + protected byte[] convert(TaskRetriedCEData data) { + return convertToBytes(data); + } + protected byte[] convert(WorkflowCancelledCEData data) { return convertToBytes(data); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskRetriedCEData.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskRetriedCEData.java new file mode 100644 index 00000000..230d161e --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskRetriedCEData.java @@ -0,0 +1,21 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle.ce; + +import java.time.OffsetDateTime; + +public record TaskRetriedCEData( + String workflow, String task, WorkflowDefinitionCEData definition, OffsetDateTime retriedAt) {} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/CompletedTaskInfo.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/CompletedTaskInfo.java new file mode 100644 index 00000000..c08ef7ec --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/CompletedTaskInfo.java @@ -0,0 +1,27 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence; + +import io.serverlessworkflow.impl.WorkflowModel; +import java.time.Instant; + +public record CompletedTaskInfo( + Instant instant, + WorkflowModel model, + WorkflowModel context, + Boolean isEndNode, + String nextPosition) + implements PersistenceTaskInfo {} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceWriter.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceWriter.java index ae75ef38..f6b07548 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceWriter.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceWriter.java @@ -32,6 +32,8 @@ public interface PersistenceInstanceWriter extends AutoCloseable { void resumed(WorkflowContextData workflowContext); + void taskRetried(WorkflowContextData workflowContext, TaskContextData taskContext); + void taskStarted(WorkflowContextData workflowContext, TaskContextData taskContext); void taskCompleted(WorkflowContextData workflowContext, TaskContextData taskContext); diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceTaskInfo.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceTaskInfo.java index 2e6c688b..93b6526d 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceTaskInfo.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceTaskInfo.java @@ -15,12 +15,4 @@ */ package io.serverlessworkflow.impl.persistence; -import io.serverlessworkflow.impl.WorkflowModel; -import java.time.Instant; - -public record PersistenceTaskInfo( - Instant instant, - WorkflowModel model, - WorkflowModel context, - Boolean isEndNode, - String nextPosition) {} +public interface PersistenceTaskInfo {} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/RetriedTaskInfo.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/RetriedTaskInfo.java new file mode 100644 index 00000000..192fdd62 --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/RetriedTaskInfo.java @@ -0,0 +1,18 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence; + +public record RetriedTaskInfo(short retryAttempt) implements PersistenceTaskInfo {} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java index 335b87d0..773f606c 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java @@ -47,16 +47,20 @@ public CompletableFuture start() { @Override public void restoreContext(WorkflowContext workflow, TaskContext context) { PersistenceTaskInfo taskInfo = info.tasks().remove(context.position().jsonPointer()); - if (taskInfo != null) { - context.output(taskInfo.model()); - context.completedAt(taskInfo.instant()); + if (taskInfo instanceof CompletedTaskInfo completedTaskInfo) { + context.output(completedTaskInfo.model()); + context.completedAt(completedTaskInfo.instant()); context.transition( new TransitionInfo( - taskInfo.nextPosition() == null + completedTaskInfo.nextPosition() == null ? null - : workflow.definition().taskExecutor(taskInfo.nextPosition()), - taskInfo.isEndNode())); - workflow.context(taskInfo.context()); + : workflow.definition().taskExecutor(completedTaskInfo.nextPosition()), + completedTaskInfo.isEndNode())); + workflow.context(completedTaskInfo.context()); + } else if (taskInfo instanceof RetriedTaskInfo retriedTaskInfo) { + if (context.retryAttempt() == 0) { + context.retryAttempt(retriedTaskInfo.retryAttempt()); + } } } } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceListener.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceListener.java index d03da00d..958036fc 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceListener.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceListener.java @@ -18,6 +18,7 @@ import static io.serverlessworkflow.impl.WorkflowUtils.safeClose; import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent; +import io.serverlessworkflow.impl.lifecycle.TaskRetriedEvent; import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent; import io.serverlessworkflow.impl.lifecycle.WorkflowCancelledEvent; import io.serverlessworkflow.impl.lifecycle.WorkflowCompletedEvent; @@ -75,6 +76,11 @@ public void onTaskCompleted(TaskCompletedEvent ev) { persistenceWriter.taskCompleted(ev.workflowContext(), ev.taskContext()); } + @Override + public void onTaskRetried(TaskRetriedEvent ev) { + persistenceWriter.taskRetried(ev.workflowContext(), ev.taskContext()); + } + public void close() { safeClose(persistenceWriter); } diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceWriter.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceWriter.java index 77f96e24..8d305264 100644 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceWriter.java +++ b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceWriter.java @@ -68,6 +68,16 @@ public void aborted(WorkflowContextData workflowContext) { @Override public void taskStarted(WorkflowContextData workflowContext, TaskContextData taskContext) {} + @Override + public void taskRetried(WorkflowContextData workflowContext, TaskContextData taskContext) { + doTransaction( + t -> + t.tasks(key(workflowContext)) + .put( + taskContext.position().jsonPointer(), + marshallTaskRetried(workflowContext, (TaskContext) taskContext))); + } + @Override public void taskCompleted(WorkflowContextData workflowContext, TaskContextData taskContext) { doTransaction( @@ -108,5 +118,8 @@ protected void removeProcessInstance(WorkflowContextData workflowContext) { protected abstract T marshallTaskCompleted( WorkflowContextData workflowContext, TaskContext taskContext); + protected abstract T marshallTaskRetried( + WorkflowContextData workflowContext, TaskContext taskContext); + protected abstract S marshallStatus(WorkflowStatus status); } diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceReader.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceReader.java index 8b90ae18..ce979708 100644 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceReader.java +++ b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceReader.java @@ -19,7 +19,9 @@ import io.serverlessworkflow.impl.WorkflowStatus; import io.serverlessworkflow.impl.marshaller.WorkflowBufferFactory; import io.serverlessworkflow.impl.marshaller.WorkflowInputBuffer; +import io.serverlessworkflow.impl.persistence.CompletedTaskInfo; import io.serverlessworkflow.impl.persistence.PersistenceTaskInfo; +import io.serverlessworkflow.impl.persistence.RetriedTaskInfo; import java.io.ByteArrayInputStream; import java.time.Instant; @@ -36,21 +38,42 @@ public BytesMapInstanceReader( @Override protected PersistenceTaskInfo unmarshallTaskInfo(byte[] taskData) { try (WorkflowInputBuffer buffer = factory.input(new ByteArrayInputStream(taskData))) { - buffer.readByte(); // version byte not used at the moment - Instant date = buffer.readInstant(); - WorkflowModel model = (WorkflowModel) buffer.readObject(); - WorkflowModel context = (WorkflowModel) buffer.readObject(); - Boolean isEndNode = null; - String nextPosition = null; - isEndNode = buffer.readBoolean(); - boolean hasNext = buffer.readBoolean(); - if (hasNext) { - nextPosition = buffer.readString(); + byte version = buffer.readByte(); + switch (version) { + case MarshallingUtils.VERSION_0: + default: + return readVersion0(buffer); + case MarshallingUtils.VERSION_1: + return readVersion1(buffer); } - return new PersistenceTaskInfo(date, model, context, isEndNode, nextPosition); } } + private PersistenceTaskInfo readVersion1(WorkflowInputBuffer buffer) { + TaskStatus taskStatus = buffer.readEnum(TaskStatus.class); + switch (taskStatus) { + case COMPLETED: + default: + return readVersion0(buffer); + case RETRIED: + return new RetriedTaskInfo(buffer.readShort()); + } + } + + private PersistenceTaskInfo readVersion0(WorkflowInputBuffer buffer) { + Instant date = buffer.readInstant(); + WorkflowModel model = (WorkflowModel) buffer.readObject(); + WorkflowModel context = (WorkflowModel) buffer.readObject(); + Boolean isEndNode = null; + String nextPosition = null; + isEndNode = buffer.readBoolean(); + boolean hasNext = buffer.readBoolean(); + if (hasNext) { + nextPosition = buffer.readString(); + } + return new CompletedTaskInfo(date, model, context, isEndNode, nextPosition); + } + @Override protected PersistenceInstanceInfo unmarshallInstanceInfo(byte[] instanceData) { try (WorkflowInputBuffer buffer = factory.input(new ByteArrayInputStream(instanceData))) { diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceWriter.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceWriter.java index 320d7eff..279bc093 100644 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceWriter.java +++ b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceWriter.java @@ -40,7 +40,8 @@ public BytesMapInstanceWriter( protected byte[] marshallTaskCompleted(WorkflowContextData contextData, TaskContext taskContext) { ByteArrayOutputStream bytes = new ByteArrayOutputStream(); try (WorkflowOutputBuffer writer = factory.output(bytes)) { - writer.writeByte(MarshallingUtils.VERSION_0); + writer.writeByte(MarshallingUtils.VERSION_1); + writer.writeEnum(TaskStatus.COMPLETED); writer.writeInstant(taskContext.completedAt()); writeModel(writer, taskContext.output()); writeModel(writer, contextData.context()); @@ -82,4 +83,16 @@ protected byte[] marshallInstance(WorkflowInstanceData instance) { protected void writeModel(WorkflowOutputBuffer writer, WorkflowModel model) { writer.writeObject(model); } + + @Override + protected byte[] marshallTaskRetried( + WorkflowContextData workflowContext, TaskContext taskContext) { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + try (WorkflowOutputBuffer writer = factory.output(bytes)) { + writer.writeByte(MarshallingUtils.VERSION_1); + writer.writeEnum(TaskStatus.RETRIED); + writer.writeShort(taskContext.retryAttempt()); + } + return bytes.toByteArray(); + } } diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/MarshallingUtils.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/MarshallingUtils.java index 2fea0224..6c38d901 100644 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/MarshallingUtils.java +++ b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/MarshallingUtils.java @@ -20,4 +20,5 @@ class MarshallingUtils { private MarshallingUtils() {} public static final byte VERSION_0 = 0; + public static final byte VERSION_1 = 1; } diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/TaskStatus.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/TaskStatus.java new file mode 100644 index 00000000..5db1a57c --- /dev/null +++ b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/TaskStatus.java @@ -0,0 +1,21 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence.bigmap; + +enum TaskStatus { + COMPLETED, + RETRIED +} diff --git a/impl/test/db-samples/running_v1.db b/impl/test/db-samples/running_v1.db new file mode 100644 index 00000000..fa4c2beb Binary files /dev/null and b/impl/test/db-samples/running_v1.db differ diff --git a/impl/test/db-samples/suspended_v1.db b/impl/test/db-samples/suspended_v1.db new file mode 100644 index 00000000..1acb7355 Binary files /dev/null and b/impl/test/db-samples/suspended_v1.db differ diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java index 1d4f41cf..706f5277 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java @@ -32,8 +32,8 @@ public class DBGenerator { public static void main(String[] args) throws IOException { - runInstance("db-samples/running.db", false); - runInstance("db-samples/suspended.db", true); + runInstance("db-samples/running_v1.db", false); + runInstance("db-samples/suspended_v1.db", true); } private static void runInstance(String dbName, boolean suspend) throws IOException { diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java index bd591e3f..d814a25f 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java @@ -71,15 +71,25 @@ void testWaitingInstance() throws IOException { } @Test - void testRestoreWaitingInstance() throws IOException { + void testRestoreWaitingInstanceV0() throws IOException { runIt("db-samples/running.db", WorkflowStatus.WAITING); } @Test - void testRestoreSuspendedInstance() throws IOException { + void testRestoreSuspendedInstanceV0() throws IOException { runIt("db-samples/suspended.db", WorkflowStatus.SUSPENDED); } + @Test + void testRestoreWaitingInstanceV1() throws IOException { + runIt("db-samples/running_v1.db", WorkflowStatus.WAITING); + } + + @Test + void testRestoreSuspendedInstanceV1() throws IOException { + runIt("db-samples/suspended_v1.db", WorkflowStatus.SUSPENDED); + } + private void runIt(String dbName, WorkflowStatus expectedStatus) throws IOException { TaskCounterPerInstanceListener taskCounter = new TaskCounterPerInstanceListener(); try (PersistenceInstanceHandlers handlers = diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/RetryTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/RetryTest.java new file mode 100644 index 00000000..a963c9f8 --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/RetryTest.java @@ -0,0 +1,85 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test; + +import static io.serverlessworkflow.api.WorkflowReader.readWorkflowFromClasspath; + +import com.fasterxml.jackson.databind.JsonNode; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.jackson.JsonUtils; +import java.io.IOException; +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +public class RetryTest { + + private static WorkflowApplication app; + private MockWebServer apiServer; + + @BeforeAll + static void init() { + app = WorkflowApplication.builder().withListener(new TraceExecutionListener()).build(); + } + + @AfterAll + static void cleanup() { + app.close(); + } + + @BeforeEach + void setUp() throws IOException { + apiServer = new MockWebServer(); + apiServer.start(9797); + } + + @AfterEach + void tearDown() throws IOException { + apiServer.shutdown(); + } + + @ParameterizedTest + @ValueSource( + strings = { + "workflows-samples/try-catch-retry-inline.yaml", + "workflows-samples/try-catch-retry-reusable.yaml" + }) + void testRetry(String path) throws IOException { + final JsonNode result = JsonUtils.mapper().createObjectNode().put("name", "Javierito"); + apiServer.enqueue(new MockResponse().setResponseCode(404)); + apiServer.enqueue(new MockResponse().setResponseCode(404)); + apiServer.enqueue( + new MockResponse() + .setResponseCode(200) + .setHeader("Content-Type", "application/json") + .setBody(JsonUtils.mapper().writeValueAsString(result))); + CompletableFuture future = + app.workflowDefinition(readWorkflowFromClasspath(path)).instance(Map.of()).start(); + Awaitility.await() + .atMost(Duration.ofSeconds(1)) + .until(() -> future.join().as(JsonNode.class).orElseThrow().equals(result)); + } +} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/TaskCounterPerInstanceListener.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/TaskCounterPerInstanceListener.java index 19fe6217..bfd44f2c 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/TaskCounterPerInstanceListener.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/TaskCounterPerInstanceListener.java @@ -16,6 +16,7 @@ package io.serverlessworkflow.impl.test; import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent; +import io.serverlessworkflow.impl.lifecycle.TaskRetriedEvent; import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent; import io.serverlessworkflow.impl.lifecycle.WorkflowEvent; import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener; @@ -27,6 +28,7 @@ public class TaskCounterPerInstanceListener implements WorkflowExecutionListener public static class TaskCounter { private int started; private int completed; + private int retried; public void incStarted() { started++; @@ -36,6 +38,10 @@ public void incCompleted() { completed++; } + public void incRetried() { + retried++; + } + public int started() { return started; } @@ -43,6 +49,10 @@ public int started() { public int completed() { return completed; } + + public int retried() { + return retried; + } } private Map taskCounter = new ConcurrentHashMap<>(); @@ -62,4 +72,8 @@ public TaskCounter taskCounter(String instanceId) { public void onTaskCompleted(TaskCompletedEvent ev) { taskCounter(ev).incCompleted(); } + + public void onTaskRetried(TaskRetriedEvent ev) { + taskCounter(ev).incRetried(); + } } diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/TraceExecutionListener.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/TraceExecutionListener.java index 40e24cfb..679e8d56 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/TraceExecutionListener.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/TraceExecutionListener.java @@ -69,7 +69,14 @@ public void onWorkflowCompleted(WorkflowCompletedEvent ev) { ev.eventDate()); } - public void onWorkflowFailed(WorkflowFailedEvent ev) {} + public void onWorkflowFailed(WorkflowFailedEvent ev) { + logger.info( + "Workflow definition {} with id {} failed at {}", + ev.workflowContext().definition().workflow().getDocument().getName(), + ev.workflowContext().instanceData().id(), + ev.eventDate(), + ev.cause()); + } public void onWorkflowCancelled(WorkflowCancelledEvent ev) {} @@ -89,7 +96,14 @@ public void onTaskCompleted(TaskCompletedEvent ev) { ev.taskContext().output().asJavaObject()); } - public void onTaskFailed(TaskFailedEvent ev) {} + public void onTaskFailed(TaskFailedEvent ev) { + logger.info( + "Task {} failed at {}", + ev.taskContext().taskName(), + ev.eventDate(), + ev.taskContext().output().asJavaObject(), + ev.cause()); + } public void onTaskCancelled(TaskCancelledEvent ev) {} @@ -97,5 +111,11 @@ public void onTaskSuspended(TaskSuspendedEvent ev) {} public void onTaskResumed(TaskResumedEvent ev) {} - public void onTaskRetried(TaskRetriedEvent ev) {} + public void onTaskRetried(TaskRetriedEvent ev) { + logger.info( + "Task {} retried at {}, position {}", + ev.taskContext().taskName(), + ev.eventDate(), + ev.taskContext().position()); + } } diff --git a/impl/test/src/test/resources/workflows-samples/try-catch-retry-inline.yaml b/impl/test/src/test/resources/workflows-samples/try-catch-retry-inline.yaml new file mode 100644 index 00000000..cdf9aa76 --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/try-catch-retry-inline.yaml @@ -0,0 +1,25 @@ +document: + dsl: '1.0.0' + namespace: default + name: try-catch-retry + version: '0.1.0' +do: + - tryGetPet: + try: + - getPet: + call: http + with: + method: get + endpoint: http://localhost:9797 + catch: + errors: + with: + type: https://serverlessworkflow.io/spec/1.0.0/errors/communication + status: 404 + retry: + delay: "PT0.01S" + backoff: + exponential: {} + limit: + attempt: + count: 5 \ No newline at end of file diff --git a/impl/test/src/test/resources/workflows-samples/try-catch-retry-reusable.yaml b/impl/test/src/test/resources/workflows-samples/try-catch-retry-reusable.yaml new file mode 100644 index 00000000..5c743847 --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/try-catch-retry-reusable.yaml @@ -0,0 +1,30 @@ +document: + dsl: '1.0.0' + namespace: default + name: try-catch-retry + version: '0.1.0' +use: + retries: + default: + delay: + milliseconds: 10 + backoff: + exponential: {} + limit: + attempt: + count: 5 +do: + - tryGetPet: + try: + - getPet: + call: http + with: + method: get + endpoint: http://localhost:9797 + catch: + errors: + with: + type: https://serverlessworkflow.io/spec/1.0.0/errors/communication + status: 404 + retry: default + \ No newline at end of file