Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class TaskContext implements TaskContextData {
private short retryAttempt;
private int iteration;
private AuthorizationDescriptor authorization;
private Optional<Short> tryRetryCount = Optional.empty();

public TaskContext(
WorkflowModel input,
Expand Down Expand Up @@ -69,7 +70,8 @@ private TaskContext(
this.input = input;
this.output = output;
this.rawOutput = rawOutput;
this.retryAttempt = parentContext.map(TaskContext::retryAttempt).orElse((short) 0);
this.retryAttempt =
parentContext.map(ctx -> ctx.tryRetryCount.orElse(ctx.retryAttempt())).orElse((short) 0);
this.contextVariables =
parentContext.map(p -> new HashMap<>(p.contextVariables)).orElseGet(HashMap::new);
}
Expand Down Expand Up @@ -179,6 +181,14 @@ public void retryAttempt(short retryAttempt) {
this.retryAttempt = retryAttempt;
}

public void tryRetryCount(short tryRetryCount) {
this.tryRetryCount = Optional.of(tryRetryCount);
}

public Optional<Short> tryRetryCount() {
return tryRetryCount;
}

public boolean isRetrying() {
return retryAttempt > 0;
}
Expand All @@ -194,16 +204,19 @@ public void iteration(int iteration) {

@Override
public String toString() {
return "TaskContext [position="
+ position
+ ", startedAt="
+ startedAt
+ ", taskName="
+ taskName
+ ", completedAt="
+ completedAt
+ ", retryAttempt="
+ retryAttempt
+ "]";
StringBuilder sb =
new StringBuilder(
"TaskContext [position="
+ position
+ ", startedAt="
+ startedAt
+ ", taskName="
+ taskName
+ ", completedAt="
+ completedAt
+ ", retryAttempt="
+ retryAttempt);
tryRetryCount.ifPresent(s -> sb.append(", tryRetryCount=").append(s));
return sb.append("]").toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ protected CompletableFuture<WorkflowModel> internalExecute(

private CompletableFuture<WorkflowModel> doIt(
WorkflowContext workflow, TaskContext taskContext, WorkflowModel model) {
retryIntervalExecutor.ifPresent(r -> r.init(workflow, taskContext, model));
return TaskExecutorHelper.processTaskList(
taskExecutor, workflow, Optional.of(taskContext), model)
.exceptionallyCompose(e -> handleException(e, workflow, taskContext));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,23 @@ public DefaultRetryExecutor(
@Override
public Optional<CompletableFuture<WorkflowModel>> retry(
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel model) {
short numAttempts = taskContext.retryAttempt();
short numAttempts = taskContext.tryRetryCount().orElseThrow();
if (numAttempts++ < maxAttempts
&& WorkflowUtils.whenExceptTest(
whenFilter, exceptFilter, workflowContext, taskContext, model)) {
taskContext.retryAttempt(numAttempts);
taskContext.tryRetryCount(numAttempts);
Duration delay = intervalFunction.apply(workflowContext, taskContext, model, numAttempts);
CompletableFuture<WorkflowModel> completable = new CompletableFuture<>();
completable.completeOnTimeout(model, delay.toMillis(), TimeUnit.MILLISECONDS);
return Optional.of(completable);
}
return Optional.empty();
}

@Override
public void init(WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel model) {
if (taskContext.tryRetryCount().isEmpty()) {
taskContext.tryRetryCount((short) 0);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import java.util.concurrent.CompletableFuture;

public interface RetryExecutor {

void init(WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel model);

Optional<CompletableFuture<WorkflowModel>> retry(
WorkflowContext worfklowContext, TaskContext taskContext, WorkflowModel model);
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,11 @@ public void onTaskFailed(TaskFailedEvent ev) {
@Override
public void onTaskRetried(TaskRetriedEvent ev) {
logger.info(
"Task {} retried at {}, position {}",
"Task {} retried at {}, position {}, retried attempt {}",
ev.taskContext().taskName(),
ev.eventDate(),
ev.taskContext().position());
ev.taskContext().position(),
ev.taskContext().retryAttempt());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.serverlessworkflow.impl.persistence;

import io.serverlessworkflow.api.types.TryTask;
import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowContext;
import io.serverlessworkflow.impl.WorkflowDefinition;
Expand All @@ -23,6 +24,7 @@
import io.serverlessworkflow.impl.WorkflowMutableInstance;
import io.serverlessworkflow.impl.WorkflowStatus;
import io.serverlessworkflow.impl.executors.TransitionInfo;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

public class WorkflowPersistenceInstance extends WorkflowMutableInstance {
Expand Down Expand Up @@ -82,6 +84,17 @@ public void restoreContext(WorkflowContext workflow, TaskContext context) {
if (context.retryAttempt() == 0) {
context.retryAttempt(retriedTaskInfo.retryAttempt());
}
Optional<TaskContext> searchContext = context.parent();
while (searchContext.isPresent()) {
TaskContext tryContext = searchContext.orElseThrow();
if (tryContext.task() instanceof TryTask) {
if (tryContext.tryRetryCount().isEmpty()) {
tryContext.tryRetryCount(retriedTaskInfo.retryAttempt());
}
break;
}
searchContext = tryContext.parent();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.serverlessworkflow.api.types.TaskBase;
import io.serverlessworkflow.api.types.TryTask;
import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.TaskContextData;
import io.serverlessworkflow.impl.WorkflowApplication;
Expand Down Expand Up @@ -140,10 +142,16 @@ void testWorkflowInstance() throws InterruptedException {
WorkflowContext updateWContext = mock(WorkflowContext.class);
TaskContext updateTContext = mock(TaskContext.class);
when(updateTContext.position()).thenReturn(position1);
TaskContext parentContext = mock(TaskContext.class);
TaskBase taskBase = mock(TryTask.class);
when(parentContext.task()).thenReturn(taskBase);
when(updateTContext.parent()).thenReturn(Optional.of(parentContext));
instance.restoreContext(updateWContext, updateTContext);
ArgumentCaptor<Short> retryAttempt = ArgumentCaptor.forClass(Short.class);
verify(updateTContext).retryAttempt(retryAttempt.capture());
assertThat(retryAttempt.getValue()).isEqualTo(numRetries);
verify(parentContext).tryRetryCount(retryAttempt.capture());
assertThat(retryAttempt.getValue()).isEqualTo(numRetries);
Comment thread
fjtirado marked this conversation as resolved.

// task completed
handlers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,52 +18,70 @@
import static io.serverlessworkflow.api.WorkflowReader.readWorkflowFromClasspath;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertThat;

import com.fasterxml.jackson.databind.JsonNode;
import io.serverlessworkflow.api.types.TryTask;
import io.serverlessworkflow.impl.WorkflowApplication;
import io.serverlessworkflow.impl.WorkflowException;
import io.serverlessworkflow.impl.WorkflowModel;
import io.serverlessworkflow.impl.jackson.JsonUtils;
import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent;
import io.serverlessworkflow.impl.lifecycle.TaskRetriedEvent;
import io.serverlessworkflow.impl.lifecycle.TraceExecutionListener;
import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener;
import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

public class RetryTimeoutTest {

private static WorkflowApplication app;
private WorkflowApplication app;
private RetryListener retryListener;
private MockWebServer apiServer;

@BeforeAll
static void init() {
app = WorkflowApplication.builder().build();
}

@AfterAll
static void cleanup() {
app.close();
}

@BeforeEach
void setUp() throws IOException {
apiServer = new MockWebServer();
apiServer.start(9797);
retryListener = new RetryListener();
app =
WorkflowApplication.builder()
.withListener(retryListener)
.withListener(new TraceExecutionListener())
.build();
}

@AfterEach
void tearDown() throws IOException {
apiServer.shutdown();
app.close();
}

private class RetryListener implements WorkflowExecutionListener {

private Map<String, Short> taskRetried = new ConcurrentHashMap<>();
private Set<Short> contexts = ConcurrentHashMap.newKeySet();

Comment thread
fjtirado marked this conversation as resolved.
public void onTaskRetried(TaskRetriedEvent ev) {
taskRetried.put(ev.taskContext().position().jsonPointer(), ev.taskContext().retryAttempt());
}

public void onTaskCompleted(TaskCompletedEvent ev) {
if (ev.taskContext().task() instanceof TryTask) {
contexts.add(ev.taskContext().retryAttempt());
}
}
}

@ParameterizedTest
Expand All @@ -88,6 +106,37 @@ void testRetry(String path) throws IOException {
Awaitility.await()
.atMost(Duration.ofSeconds(1))
.until(() -> future.join().as(JsonNode.class).orElseThrow().equals(result));
assertThat(retryListener.taskRetried).hasSize(1);
assertThat(retryListener.taskRetried.get("do/0/tryGetPet/do/0/getPet")).isEqualTo((short) 2);
assertThat(retryListener.contexts).containsOnly((short) 0);
}

@Test
void testNestedRetry() throws IOException {
final JsonNode result = JsonUtils.mapper().createObjectNode().put("name", "Javierito");
apiServer.enqueue(new MockResponse().setResponseCode(404));
apiServer.enqueue(new MockResponse().setResponseCode(404));
apiServer.enqueue(new MockResponse().setResponseCode(404));
apiServer.enqueue(new MockResponse().setResponseCode(404));
apiServer.enqueue(new MockResponse().setResponseCode(404));
apiServer.enqueue(new MockResponse().setResponseCode(404));
apiServer.enqueue(new MockResponse().setResponseCode(500));
apiServer.enqueue(
new MockResponse()
.setResponseCode(200)
.setHeader("Content-Type", "application/json")
.setBody(JsonUtils.mapper().writeValueAsString(result)));
CompletableFuture<WorkflowModel> future =
app.workflowDefinition(
readWorkflowFromClasspath("workflows-samples/nested-try-catch-retry-inline.yaml"))
.instance(Map.of("delay", 0.01))
.start();
Awaitility.await()
.atMost(Duration.ofSeconds(1))
.until(() -> future.join().as(JsonNode.class).orElseThrow().equals(result));
assertThat(retryListener.taskRetried).hasSize(2);
assertThat(retryListener.taskRetried.values()).containsExactlyInAnyOrder((short) 5, (short) 2);
assertThat(retryListener.contexts).containsExactlyInAnyOrder((short) 0, (short) 2);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
document:
dsl: '1.0.0'
namespace: test
name: nested-try-catch-retry-inline
version: '0.1.0'
do:
- tryServerError:
try:
- tryCommunication:
try:
- getPet:
call: http
with:
method: get
endpoint: http://localhost:9797
redirect: true
catch:
errors:
with:
type: https://serverlessworkflow.io/spec/1.0.0/errors/communication
status: 404
retry:
delay: ${"PT\(.delay)S"}
backoff:
exponential: {}
limit:
attempt:
count: 5
Comment thread
fjtirado marked this conversation as resolved.
catch:
errors:
with:
type: https://serverlessworkflow.io/spec/1.0.0/errors/communication
retry:
delay: ${"PT\(.delay)S"}
backoff:
exponential: {}
limit:
attempt:
count: 2
Comment thread
fjtirado marked this conversation as resolved.
Loading