From 7960f01c544313a5353c2e3a4936d74d5bc62e1e Mon Sep 17 00:00:00 2001 From: fjtirado Date: Mon, 1 Jun 2026 17:17:32 +0200 Subject: [PATCH] [Fix #1415] Fixing nested try behaviour Signed-off-by: fjtirado --- .../serverlessworkflow/impl/TaskContext.java | 37 ++++++--- .../impl/executors/TryExecutor.java | 1 + .../executors/retry/DefaultRetryExecutor.java | 11 ++- .../impl/executors/retry/RetryExecutor.java | 3 + .../lifecycle/TraceExecutionListener.java | 5 +- .../WorkflowPersistenceInstance.java | 13 ++++ .../test/AbstractHandlerPersistenceTest.java | 8 ++ .../impl/test/RetryTimeoutTest.java | 77 +++++++++++++++---- .../nested-try-catch-retry-inline.yaml | 39 ++++++++++ 9 files changed, 164 insertions(+), 30 deletions(-) create mode 100644 impl/test/src/test/resources/workflows-samples/nested-try-catch-retry-inline.yaml diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java b/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java index 4c256f7e1..279f45337 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java @@ -40,6 +40,7 @@ public class TaskContext implements TaskContextData { private short retryAttempt; private int iteration; private AuthorizationDescriptor authorization; + private Optional tryRetryCount = Optional.empty(); public TaskContext( WorkflowModel input, @@ -69,7 +70,8 @@ private TaskContext( this.input = input; this.output = output; this.rawOutput = rawOutput; - this.retryAttempt = parentContext.map(TaskContext::retryAttempt).orElse((short) 0); + this.retryAttempt = + parentContext.map(ctx -> ctx.tryRetryCount.orElse(ctx.retryAttempt())).orElse((short) 0); this.contextVariables = parentContext.map(p -> new HashMap<>(p.contextVariables)).orElseGet(HashMap::new); } @@ -179,6 +181,14 @@ public void retryAttempt(short retryAttempt) { this.retryAttempt = retryAttempt; } + public void tryRetryCount(short tryRetryCount) { + this.tryRetryCount = Optional.of(tryRetryCount); + } + + public Optional tryRetryCount() { + return tryRetryCount; + } + public boolean isRetrying() { return retryAttempt > 0; } @@ -194,16 +204,19 @@ public void iteration(int iteration) { @Override public String toString() { - return "TaskContext [position=" - + position - + ", startedAt=" - + startedAt - + ", taskName=" - + taskName - + ", completedAt=" - + completedAt - + ", retryAttempt=" - + retryAttempt - + "]"; + StringBuilder sb = + new StringBuilder( + "TaskContext [position=" + + position + + ", startedAt=" + + startedAt + + ", taskName=" + + taskName + + ", completedAt=" + + completedAt + + ", retryAttempt=" + + retryAttempt); + tryRetryCount.ifPresent(s -> sb.append(", tryRetryCount=").append(s)); + return sb.append("]").toString(); } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TryExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TryExecutor.java index 139a83a46..35807e690 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TryExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TryExecutor.java @@ -157,6 +157,7 @@ protected CompletableFuture internalExecute( private CompletableFuture doIt( WorkflowContext workflow, TaskContext taskContext, WorkflowModel model) { + retryIntervalExecutor.ifPresent(r -> r.init(workflow, taskContext, model)); return TaskExecutorHelper.processTaskList( taskExecutor, workflow, Optional.of(taskContext), model) .exceptionallyCompose(e -> handleException(e, workflow, taskContext)); diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/DefaultRetryExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/DefaultRetryExecutor.java index 04f74cacd..f31ae0ab1 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/DefaultRetryExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/DefaultRetryExecutor.java @@ -46,11 +46,11 @@ public DefaultRetryExecutor( @Override public Optional> retry( WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel model) { - short numAttempts = taskContext.retryAttempt(); + short numAttempts = taskContext.tryRetryCount().orElseThrow(); if (numAttempts++ < maxAttempts && WorkflowUtils.whenExceptTest( whenFilter, exceptFilter, workflowContext, taskContext, model)) { - taskContext.retryAttempt(numAttempts); + taskContext.tryRetryCount(numAttempts); Duration delay = intervalFunction.apply(workflowContext, taskContext, model, numAttempts); CompletableFuture completable = new CompletableFuture<>(); completable.completeOnTimeout(model, delay.toMillis(), TimeUnit.MILLISECONDS); @@ -58,4 +58,11 @@ public Optional> retry( } return Optional.empty(); } + + @Override + public void init(WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel model) { + if (taskContext.tryRetryCount().isEmpty()) { + taskContext.tryRetryCount((short) 0); + } + } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/RetryExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/RetryExecutor.java index 007161808..4f9c23d5f 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/RetryExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/RetryExecutor.java @@ -22,6 +22,9 @@ import java.util.concurrent.CompletableFuture; public interface RetryExecutor { + + void init(WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel model); + Optional> retry( WorkflowContext worfklowContext, TaskContext taskContext, WorkflowModel model); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/TraceExecutionListener.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/TraceExecutionListener.java index 762a60586..0793b14c0 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/TraceExecutionListener.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/TraceExecutionListener.java @@ -100,10 +100,11 @@ public void onTaskFailed(TaskFailedEvent ev) { @Override public void onTaskRetried(TaskRetriedEvent ev) { logger.info( - "Task {} retried at {}, position {}", + "Task {} retried at {}, position {}, retried attempt {}", ev.taskContext().taskName(), ev.eventDate(), - ev.taskContext().position()); + ev.taskContext().position(), + ev.taskContext().retryAttempt()); } @Override diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java index dc8737b51..a54ed9b1f 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java @@ -15,6 +15,7 @@ */ package io.serverlessworkflow.impl.persistence; +import io.serverlessworkflow.api.types.TryTask; import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowContext; import io.serverlessworkflow.impl.WorkflowDefinition; @@ -23,6 +24,7 @@ import io.serverlessworkflow.impl.WorkflowMutableInstance; import io.serverlessworkflow.impl.WorkflowStatus; import io.serverlessworkflow.impl.executors.TransitionInfo; +import java.util.Optional; import java.util.concurrent.CompletableFuture; public class WorkflowPersistenceInstance extends WorkflowMutableInstance { @@ -82,6 +84,17 @@ public void restoreContext(WorkflowContext workflow, TaskContext context) { if (context.retryAttempt() == 0) { context.retryAttempt(retriedTaskInfo.retryAttempt()); } + Optional searchContext = context.parent(); + while (searchContext.isPresent()) { + TaskContext tryContext = searchContext.orElseThrow(); + if (tryContext.task() instanceof TryTask) { + if (tryContext.tryRetryCount().isEmpty()) { + tryContext.tryRetryCount(retriedTaskInfo.retryAttempt()); + } + break; + } + searchContext = tryContext.parent(); + } } } } diff --git a/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractHandlerPersistenceTest.java b/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractHandlerPersistenceTest.java index 7e120e735..0314e917f 100644 --- a/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractHandlerPersistenceTest.java +++ b/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractHandlerPersistenceTest.java @@ -21,6 +21,8 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import io.serverlessworkflow.api.types.TaskBase; +import io.serverlessworkflow.api.types.TryTask; import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.TaskContextData; import io.serverlessworkflow.impl.WorkflowApplication; @@ -140,10 +142,16 @@ void testWorkflowInstance() throws InterruptedException { WorkflowContext updateWContext = mock(WorkflowContext.class); TaskContext updateTContext = mock(TaskContext.class); when(updateTContext.position()).thenReturn(position1); + TaskContext parentContext = mock(TaskContext.class); + TaskBase taskBase = mock(TryTask.class); + when(parentContext.task()).thenReturn(taskBase); + when(updateTContext.parent()).thenReturn(Optional.of(parentContext)); instance.restoreContext(updateWContext, updateTContext); ArgumentCaptor retryAttempt = ArgumentCaptor.forClass(Short.class); verify(updateTContext).retryAttempt(retryAttempt.capture()); assertThat(retryAttempt.getValue()).isEqualTo(numRetries); + verify(parentContext).tryRetryCount(retryAttempt.capture()); + assertThat(retryAttempt.getValue()).isEqualTo(numRetries); // task completed handlers diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/RetryTimeoutTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/RetryTimeoutTest.java index 6db7cffab..d298e32c3 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/RetryTimeoutTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/RetryTimeoutTest.java @@ -18,23 +18,27 @@ import static io.serverlessworkflow.api.WorkflowReader.readWorkflowFromClasspath; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.junit.Assert.assertThat; import com.fasterxml.jackson.databind.JsonNode; +import io.serverlessworkflow.api.types.TryTask; import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowException; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.jackson.JsonUtils; +import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent; +import io.serverlessworkflow.impl.lifecycle.TaskRetriedEvent; +import io.serverlessworkflow.impl.lifecycle.TraceExecutionListener; +import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener; import java.io.IOException; import java.time.Duration; import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; import org.awaitility.Awaitility; -import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -42,28 +46,42 @@ public class RetryTimeoutTest { - private static WorkflowApplication app; + private WorkflowApplication app; + private RetryListener retryListener; private MockWebServer apiServer; - @BeforeAll - static void init() { - app = WorkflowApplication.builder().build(); - } - - @AfterAll - static void cleanup() { - app.close(); - } - @BeforeEach void setUp() throws IOException { apiServer = new MockWebServer(); apiServer.start(9797); + retryListener = new RetryListener(); + app = + WorkflowApplication.builder() + .withListener(retryListener) + .withListener(new TraceExecutionListener()) + .build(); } @AfterEach void tearDown() throws IOException { apiServer.shutdown(); + app.close(); + } + + private class RetryListener implements WorkflowExecutionListener { + + private Map taskRetried = new ConcurrentHashMap<>(); + private Set contexts = ConcurrentHashMap.newKeySet(); + + public void onTaskRetried(TaskRetriedEvent ev) { + taskRetried.put(ev.taskContext().position().jsonPointer(), ev.taskContext().retryAttempt()); + } + + public void onTaskCompleted(TaskCompletedEvent ev) { + if (ev.taskContext().task() instanceof TryTask) { + contexts.add(ev.taskContext().retryAttempt()); + } + } } @ParameterizedTest @@ -88,6 +106,37 @@ void testRetry(String path) throws IOException { Awaitility.await() .atMost(Duration.ofSeconds(1)) .until(() -> future.join().as(JsonNode.class).orElseThrow().equals(result)); + assertThat(retryListener.taskRetried).hasSize(1); + assertThat(retryListener.taskRetried.get("do/0/tryGetPet/do/0/getPet")).isEqualTo((short) 2); + assertThat(retryListener.contexts).containsOnly((short) 0); + } + + @Test + void testNestedRetry() throws IOException { + final JsonNode result = JsonUtils.mapper().createObjectNode().put("name", "Javierito"); + apiServer.enqueue(new MockResponse().setResponseCode(404)); + apiServer.enqueue(new MockResponse().setResponseCode(404)); + apiServer.enqueue(new MockResponse().setResponseCode(404)); + apiServer.enqueue(new MockResponse().setResponseCode(404)); + apiServer.enqueue(new MockResponse().setResponseCode(404)); + apiServer.enqueue(new MockResponse().setResponseCode(404)); + apiServer.enqueue(new MockResponse().setResponseCode(500)); + apiServer.enqueue( + new MockResponse() + .setResponseCode(200) + .setHeader("Content-Type", "application/json") + .setBody(JsonUtils.mapper().writeValueAsString(result))); + CompletableFuture future = + app.workflowDefinition( + readWorkflowFromClasspath("workflows-samples/nested-try-catch-retry-inline.yaml")) + .instance(Map.of("delay", 0.01)) + .start(); + Awaitility.await() + .atMost(Duration.ofSeconds(1)) + .until(() -> future.join().as(JsonNode.class).orElseThrow().equals(result)); + assertThat(retryListener.taskRetried).hasSize(2); + assertThat(retryListener.taskRetried.values()).containsExactlyInAnyOrder((short) 5, (short) 2); + assertThat(retryListener.contexts).containsExactlyInAnyOrder((short) 0, (short) 2); } @Test diff --git a/impl/test/src/test/resources/workflows-samples/nested-try-catch-retry-inline.yaml b/impl/test/src/test/resources/workflows-samples/nested-try-catch-retry-inline.yaml new file mode 100644 index 000000000..27261f193 --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/nested-try-catch-retry-inline.yaml @@ -0,0 +1,39 @@ +document: + dsl: '1.0.0' + namespace: test + name: nested-try-catch-retry-inline + version: '0.1.0' +do: + - tryServerError: + try: + - tryCommunication: + try: + - getPet: + call: http + with: + method: get + endpoint: http://localhost:9797 + redirect: true + catch: + errors: + with: + type: https://serverlessworkflow.io/spec/1.0.0/errors/communication + status: 404 + retry: + delay: ${"PT\(.delay)S"} + backoff: + exponential: {} + limit: + attempt: + count: 5 + catch: + errors: + with: + type: https://serverlessworkflow.io/spec/1.0.0/errors/communication + retry: + delay: ${"PT\(.delay)S"} + backoff: + exponential: {} + limit: + attempt: + count: 2 \ No newline at end of file