diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowError.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowError.java index a23c51dd..bb44f3e6 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowError.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowError.java @@ -56,6 +56,10 @@ public static Builder runtime(TaskContext context, Exception ex) { return runtime(Errors.RUNTIME.status(), context, ex); } + public static Builder timeout() { + return error(Errors.TIMEOUT.toString(), Errors.TIMEOUT.status()); + } + public static class Builder { private final String type; diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java index 30956970..055cbba6 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java @@ -24,17 +24,23 @@ import io.serverlessworkflow.api.types.Input; import io.serverlessworkflow.api.types.Output; import io.serverlessworkflow.api.types.TaskBase; +import io.serverlessworkflow.api.types.TaskTimeout; +import io.serverlessworkflow.api.types.Timeout; import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowError; +import io.serverlessworkflow.impl.WorkflowException; import io.serverlessworkflow.impl.WorkflowFilter; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.WorkflowMutablePosition; import io.serverlessworkflow.impl.WorkflowPosition; import io.serverlessworkflow.impl.WorkflowPredicate; import io.serverlessworkflow.impl.WorkflowStatus; +import io.serverlessworkflow.impl.WorkflowUtils; +import io.serverlessworkflow.impl.WorkflowValueResolver; import io.serverlessworkflow.impl.lifecycle.TaskCancelledEvent; import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent; import io.serverlessworkflow.impl.lifecycle.TaskFailedEvent; @@ -42,13 +48,16 @@ import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent; import io.serverlessworkflow.impl.resources.ResourceLoader; import io.serverlessworkflow.impl.schema.SchemaValidator; +import java.time.Duration; import java.time.Instant; import java.util.Iterator; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; public abstract class AbstractTaskExecutor implements TaskExecutor { @@ -62,6 +71,7 @@ public abstract class AbstractTaskExecutor implements TaskEx private final Optional outputSchemaValidator; private final Optional contextSchemaValidator; private final Optional ifFilter; + private final Optional> timeout; public abstract static class AbstractTaskExecutorBuilder< T extends TaskBase, V extends AbstractTaskExecutor> @@ -80,6 +90,7 @@ public abstract static class AbstractTaskExecutorBuilder< protected final Workflow workflow; protected final ResourceLoader resourceLoader; private final WorkflowDefinition definition; + private final Optional> timeout; private V instance; @@ -113,6 +124,28 @@ protected AbstractTaskExecutorBuilder( getSchemaValidator(application.validatorFactory(), resourceLoader, export.getSchema()); } this.ifFilter = application.expressionFactory().buildIfFilter(task); + this.timeout = getTaskTimeout(); + } + + private Optional> getTaskTimeout() { + TaskTimeout timeout = task.getTimeout(); + if (timeout == null) { + return Optional.empty(); + } + Timeout timeoutDef = timeout.getTaskTimeoutDefinition(); + if (timeoutDef == null && timeout.getTaskTimeoutReference() != null) { + timeoutDef = + Objects.requireNonNull( + Objects.requireNonNull( + workflow.getUse().getTimeouts(), + "Timeout reference " + + timeout.getTaskTimeoutReference() + + " specified, but use timeout was not defined") + .getAdditionalProperties() + .get(timeout.getTaskTimeoutReference()), + "Timeout reference " + timeout.getTaskTimeoutReference() + "cannot be found"); + } + return Optional.of(WorkflowUtils.fromTimeoutAfter(application, timeoutDef.getAfter())); } protected final TransitionInfoBuilder next( @@ -171,6 +204,7 @@ protected AbstractTaskExecutor(AbstractTaskExecutorBuilder builder) { this.outputSchemaValidator = builder.outputSchemaValidator; this.contextSchemaValidator = builder.contextSchemaValidator; this.ifFilter = builder.ifFilter; + this.timeout = builder.timeout; } protected final CompletableFuture executeNext( @@ -200,7 +234,7 @@ public CompletableFuture apply( } else if (taskContext.isCompleted()) { return executeNext(completable, workflowContext); } else if (ifFilter.map(f -> f.test(workflowContext, taskContext, input)).orElse(true)) { - return executeNext( + completable = completable .thenCompose(workflowContext.instance()::suspendedCheck) .thenApply( @@ -247,8 +281,26 @@ public CompletableFuture apply( l.onTaskCompleted( new TaskCompletedEvent(workflowContext, taskContext))); return t; - }), - workflowContext); + }); + if (timeout.isPresent()) { + completable = + completable + .orTimeout( + timeout + .map(t -> t.apply(workflowContext, taskContext, input)) + .orElseThrow() + .toMillis(), + TimeUnit.MILLISECONDS) + .exceptionallyCompose( + e -> + CompletableFuture.failedFuture( + new WorkflowException( + WorkflowError.timeout() + .instance(taskContext.position().jsonPointer()) + .build(), + e))); + } + return executeNext(completable, workflowContext); } else { taskContext.transition(getSkipTransition()); return executeNext(completable, workflowContext); diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/RetryTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/RetryTimeoutTest.java similarity index 87% rename from impl/test/src/test/java/io/serverlessworkflow/impl/test/RetryTest.java rename to impl/test/src/test/java/io/serverlessworkflow/impl/test/RetryTimeoutTest.java index 76846272..586cc018 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/RetryTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/RetryTimeoutTest.java @@ -16,7 +16,9 @@ package io.serverlessworkflow.impl.test; import static io.serverlessworkflow.api.WorkflowReader.readWorkflowFromClasspath; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.assertThat; import com.fasterxml.jackson.databind.JsonNode; import io.serverlessworkflow.impl.WorkflowApplication; @@ -38,7 +40,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -public class RetryTest { +public class RetryTimeoutTest { private static WorkflowApplication app; private MockWebServer apiServer; @@ -106,4 +108,17 @@ void testRetryEnd() throws IOException { .join()) .hasCauseInstanceOf(WorkflowException.class); } + + @Test + void testTimeout() throws IOException { + Map result = + app.workflowDefinition( + readWorkflowFromClasspath("workflows-samples/listen-to-one-timeout.yaml")) + .instance(Map.of("delay", 0.01f)) + .start() + .join() + .asMap() + .orElseThrow(); + assertThat(result.get("message")).isEqualTo("Viva er Beti Balompie"); + } } diff --git a/impl/test/src/test/resources/workflows-samples/listen-to-one-timeout.yaml b/impl/test/src/test/resources/workflows-samples/listen-to-one-timeout.yaml new file mode 100644 index 00000000..2f3845a9 --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/listen-to-one-timeout.yaml @@ -0,0 +1,26 @@ +document: + dsl: '1.0.0-alpha5' + namespace: test + name: listen-to-one-timeout + version: '0.1.0' +do: + - tryListen: + try: + - waitingNotForever: + listen: + to: + one: + with: + type: neven-happening-event + timeout: + after: ${"PT\(.delay)S"} + catch: + errors: + with: + type: https://serverlessworkflow.io/spec/1.0.0/errors/timeout + status: 408 + do: + - setMessage: + set: + message: Viva er Beti Balompie + \ No newline at end of file