Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ public static Builder runtime(TaskContext context, Exception ex) {
return runtime(Errors.RUNTIME.status(), context, ex);
}

public static Builder timeout() {
return error(Errors.TIMEOUT.toString(), Errors.TIMEOUT.status());
}

public static class Builder {

private final String type;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,31 +24,40 @@
import io.serverlessworkflow.api.types.Input;
import io.serverlessworkflow.api.types.Output;
import io.serverlessworkflow.api.types.TaskBase;
import io.serverlessworkflow.api.types.TaskTimeout;
import io.serverlessworkflow.api.types.Timeout;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowApplication;
import io.serverlessworkflow.impl.WorkflowContext;
import io.serverlessworkflow.impl.WorkflowDefinition;
import io.serverlessworkflow.impl.WorkflowError;
import io.serverlessworkflow.impl.WorkflowException;
import io.serverlessworkflow.impl.WorkflowFilter;
import io.serverlessworkflow.impl.WorkflowModel;
import io.serverlessworkflow.impl.WorkflowMutablePosition;
import io.serverlessworkflow.impl.WorkflowPosition;
import io.serverlessworkflow.impl.WorkflowPredicate;
import io.serverlessworkflow.impl.WorkflowStatus;
import io.serverlessworkflow.impl.WorkflowUtils;
import io.serverlessworkflow.impl.WorkflowValueResolver;
import io.serverlessworkflow.impl.lifecycle.TaskCancelledEvent;
import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent;
import io.serverlessworkflow.impl.lifecycle.TaskFailedEvent;
import io.serverlessworkflow.impl.lifecycle.TaskRetriedEvent;
import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent;
import io.serverlessworkflow.impl.resources.ResourceLoader;
import io.serverlessworkflow.impl.schema.SchemaValidator;
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;

public abstract class AbstractTaskExecutor<T extends TaskBase> implements TaskExecutor<T> {

Expand All @@ -62,6 +71,7 @@ public abstract class AbstractTaskExecutor<T extends TaskBase> implements TaskEx
private final Optional<SchemaValidator> outputSchemaValidator;
private final Optional<SchemaValidator> contextSchemaValidator;
private final Optional<WorkflowPredicate> ifFilter;
private final Optional<WorkflowValueResolver<Duration>> timeout;

public abstract static class AbstractTaskExecutorBuilder<
T extends TaskBase, V extends AbstractTaskExecutor<T>>
Expand All @@ -80,6 +90,7 @@ public abstract static class AbstractTaskExecutorBuilder<
protected final Workflow workflow;
protected final ResourceLoader resourceLoader;
private final WorkflowDefinition definition;
private final Optional<WorkflowValueResolver<Duration>> timeout;

private V instance;

Expand Down Expand Up @@ -113,6 +124,28 @@ protected AbstractTaskExecutorBuilder(
getSchemaValidator(application.validatorFactory(), resourceLoader, export.getSchema());
}
this.ifFilter = application.expressionFactory().buildIfFilter(task);
this.timeout = getTaskTimeout();
}

private Optional<WorkflowValueResolver<Duration>> getTaskTimeout() {
TaskTimeout timeout = task.getTimeout();
if (timeout == null) {
return Optional.empty();
}
Timeout timeoutDef = timeout.getTaskTimeoutDefinition();
if (timeoutDef == null && timeout.getTaskTimeoutReference() != null) {
timeoutDef =
Objects.requireNonNull(
Objects.requireNonNull(
workflow.getUse().getTimeouts(),
"Timeout reference "
+ timeout.getTaskTimeoutReference()
+ " specified, but use timeout was not defined")
.getAdditionalProperties()
.get(timeout.getTaskTimeoutReference()),
"Timeout reference " + timeout.getTaskTimeoutReference() + "cannot be found");
}
return Optional.of(WorkflowUtils.fromTimeoutAfter(application, timeoutDef.getAfter()));
}

protected final TransitionInfoBuilder next(
Expand Down Expand Up @@ -171,6 +204,7 @@ protected AbstractTaskExecutor(AbstractTaskExecutorBuilder<T, ?> builder) {
this.outputSchemaValidator = builder.outputSchemaValidator;
this.contextSchemaValidator = builder.contextSchemaValidator;
this.ifFilter = builder.ifFilter;
this.timeout = builder.timeout;
}

protected final CompletableFuture<TaskContext> executeNext(
Expand Down Expand Up @@ -200,7 +234,7 @@ public CompletableFuture<TaskContext> apply(
} else if (taskContext.isCompleted()) {
return executeNext(completable, workflowContext);
} else if (ifFilter.map(f -> f.test(workflowContext, taskContext, input)).orElse(true)) {
return executeNext(
completable =
completable
.thenCompose(workflowContext.instance()::suspendedCheck)
.thenApply(
Expand Down Expand Up @@ -247,8 +281,26 @@ public CompletableFuture<TaskContext> apply(
l.onTaskCompleted(
new TaskCompletedEvent(workflowContext, taskContext)));
return t;
}),
workflowContext);
});
if (timeout.isPresent()) {
completable =
completable
.orTimeout(
timeout
.map(t -> t.apply(workflowContext, taskContext, input))
.orElseThrow()
.toMillis(),
TimeUnit.MILLISECONDS)
.exceptionallyCompose(
e ->
CompletableFuture.failedFuture(
new WorkflowException(
WorkflowError.timeout()
.instance(taskContext.position().jsonPointer())
.build(),
e)));
}
return executeNext(completable, workflowContext);
} else {
taskContext.transition(getSkipTransition());
return executeNext(completable, workflowContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
package io.serverlessworkflow.impl.test;

import static io.serverlessworkflow.api.WorkflowReader.readWorkflowFromClasspath;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertThat;

import com.fasterxml.jackson.databind.JsonNode;
import io.serverlessworkflow.impl.WorkflowApplication;
Expand All @@ -38,7 +40,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

public class RetryTest {
public class RetryTimeoutTest {

private static WorkflowApplication app;
private MockWebServer apiServer;
Expand Down Expand Up @@ -106,4 +108,17 @@ void testRetryEnd() throws IOException {
.join())
.hasCauseInstanceOf(WorkflowException.class);
}

@Test
void testTimeout() throws IOException {
Map<String, Object> result =
app.workflowDefinition(
readWorkflowFromClasspath("workflows-samples/listen-to-one-timeout.yaml"))
.instance(Map.of("delay", 0.01f))
.start()
.join()
.asMap()
.orElseThrow();
assertThat(result.get("message")).isEqualTo("Viva er Beti Balompie");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
document:
dsl: '1.0.0-alpha5'
namespace: test
name: listen-to-one-timeout
version: '0.1.0'
do:
- tryListen:
try:
- waitingNotForever:
listen:
to:
one:
with:
type: neven-happening-event
timeout:
after: ${"PT\(.delay)S"}
catch:
errors:
with:
type: https://serverlessworkflow.io/spec/1.0.0/errors/timeout
status: 408
do:
- setMessage:
set:
message: Viva er Beti Balompie