From 25da77e6f7991f06b3d41022f65ab72ba7161778 Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Mon, 13 Sep 2021 19:20:52 -0700 Subject: [PATCH 1/3] Added testDynamicWorkflowFailure unit test --- .../workflow/DynamicWorkflowTest.java | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/DynamicWorkflowTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/DynamicWorkflowTest.java index 05d7dd7e1d..fc36ae5589 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/DynamicWorkflowTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/DynamicWorkflowTest.java @@ -20,14 +20,17 @@ package io.temporal.workflow; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; import io.temporal.activity.Activity; import io.temporal.activity.ActivityOptions; import io.temporal.activity.DynamicActivity; import io.temporal.activity.LocalActivityOptions; +import io.temporal.client.WorkflowFailedException; import io.temporal.client.WorkflowOptions; import io.temporal.client.WorkflowStub; import io.temporal.common.converter.EncodedValues; +import io.temporal.failure.ApplicationFailure; import io.temporal.testing.TestWorkflowEnvironment; import io.temporal.testing.internal.SDKTestWorkflowRule; import java.time.Duration; @@ -64,6 +67,10 @@ public Object execute(EncodedValues args) { + "-" + signals.get(signals.size() - 1)); String arg0 = args.get(0, String.class); + Boolean fail = args.get(1, Boolean.class); + if (fail != null && fail) { + throw ApplicationFailure.newFailure("Simulated failure", "simulated"); + } ActivityStub activity = Workflow.newUntypedActivityStub( ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(10)).build()); @@ -119,4 +126,26 @@ public void testDynamicWorkflowFactory() { String result = workflow.getResult(String.class); assertEquals("activityType2-activityType1-startArg0-workflowFoo", result); } + + @Test + public void testDynamicWorkflowFailure() { + TestWorkflowEnvironment testEnvironment = testWorkflowRule.getTestEnvironment(); + testEnvironment + .getWorkerFactory() + .getWorker(testWorkflowRule.getTaskQueue()) + .registerWorkflowImplementationTypes(DynamicWorkflowImpl.class); + testEnvironment.start(); + + WorkflowOptions workflowOptions = + WorkflowOptions.newBuilder().setTaskQueue(testWorkflowRule.getTaskQueue()).build(); + WorkflowStub workflow = + testWorkflowRule.getWorkflowClient().newUntypedWorkflowStub("workflowFoo", workflowOptions); + workflow.start("startArg0", true /* fail */); + try { + workflow.getResult(String.class); + fail("failure expected"); + } catch (WorkflowFailedException e) { + // expected + } + } } From 989f3612e79cb670991fd5a563e440b11e872e81 Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Mon, 13 Sep 2021 19:43:03 -0700 Subject: [PATCH 2/3] moved workflow error handling logic to WorkflowExecuteRunnable --- .../POJOWorkflowImplementationFactory.java | 44 +----------- .../temporal/internal/sync/SyncWorkflow.java | 4 +- .../sync/WorkflowExecuteRunnable.java | 72 ++++++++++++++++++- 3 files changed, 76 insertions(+), 44 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/POJOWorkflowImplementationFactory.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/POJOWorkflowImplementationFactory.java index b5f3c12697..8fb268cabb 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/POJOWorkflowImplementationFactory.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/POJOWorkflowImplementationFactory.java @@ -19,7 +19,6 @@ package io.temporal.internal.sync; -import static io.temporal.internal.sync.WorkflowInternal.unwrap; import static io.temporal.serviceclient.CheckedExceptionWrapper.wrap; import com.google.common.base.Preconditions; @@ -47,8 +46,6 @@ import io.temporal.workflow.DynamicWorkflow; import io.temporal.workflow.Functions; import io.temporal.workflow.Functions.Func; -import io.temporal.workflow.Workflow; -import io.temporal.workflow.WorkflowInfo; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.Collections; @@ -316,54 +313,17 @@ public void init(WorkflowOutboundCallsInterceptor outboundCalls) { @Override public WorkflowOutput execute(WorkflowInput input) { - WorkflowInfo info = Workflow.getInfo(); try { Object result = workflowMethod.invoke(workflow, input.getArguments()); return new WorkflowOutput(result); } catch (IllegalAccessException e) { - throw new Error(mapToWorkflowExecutionException(e, dataConverter)); + throw wrap(e); } catch (InvocationTargetException e) { Throwable target = e.getTargetException(); - if (target instanceof DestroyWorkflowThreadError) { - throw (DestroyWorkflowThreadError) target; - } - Throwable exception = unwrap(target); - - WorkflowImplementationOptions options = implementationOptions.get(info.getWorkflowType()); - Class[] failTypes = options.getFailWorkflowExceptionTypes(); - if (exception instanceof TemporalFailure) { - logWorkflowExecutionException(info, exception); - throw mapToWorkflowExecutionException(exception, dataConverter); - } - for (Class failType : failTypes) { - if (failType.isAssignableFrom(exception.getClass())) { - // fail workflow - if (log.isErrorEnabled()) { - boolean cancelRequested = - WorkflowInternal.getRootWorkflowContext().getContext().isCancelRequested(); - if (!cancelRequested || !FailureConverter.isCanceledCause(exception)) { - logWorkflowExecutionException(info, exception); - } - } - throw mapToWorkflowExecutionException(exception, dataConverter); - } - } - throw wrap(exception); + throw wrap(target); } } - private void logWorkflowExecutionException(WorkflowInfo info, Throwable exception) { - log.error( - "Workflow execution failure " - + "WorkflowId=" - + info.getWorkflowId() - + ", RunId=" - + info.getRunId() - + ", WorkflowType=" - + info.getWorkflowType(), - exception); - } - protected void newInstance() { Func factory = workflowImplementationFactories.get(workflowImplementationClass); if (factory != null) { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java index 60538964ae..f7765cfe4f 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java @@ -116,7 +116,9 @@ public void start(HistoryEvent event, ReplayWorkflowContext context) { result, lastFailure); - workflowProc = new WorkflowExecuteRunnable(syncContext, workflow, startEvent); + workflowProc = + new WorkflowExecuteRunnable( + syncContext, workflow, startEvent, workflowImplementationOptions); // The following order is ensured by this code and DeterministicRunner implementation: // 1. workflow.initialize // 2. signal handler (if signalWithStart was called) diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowExecuteRunnable.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowExecuteRunnable.java index 402c4df683..80da68c62e 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowExecuteRunnable.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowExecuteRunnable.java @@ -19,16 +19,33 @@ package io.temporal.internal.sync; +import static io.temporal.internal.sync.WorkflowInternal.unwrap; +import static io.temporal.serviceclient.CheckedExceptionWrapper.wrap; + import io.temporal.api.common.v1.Payloads; +import io.temporal.api.failure.v1.Failure; import io.temporal.api.history.v1.WorkflowExecutionStartedEventAttributes; +import io.temporal.common.converter.DataConverter; import io.temporal.common.interceptors.Header; +import io.temporal.failure.FailureConverter; +import io.temporal.failure.TemporalFailure; +import io.temporal.internal.worker.WorkflowExecutionException; +import io.temporal.worker.WorkflowImplementationOptions; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.WorkflowInfo; import java.util.Objects; import java.util.Optional; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; class WorkflowExecuteRunnable implements Runnable { + + private static final Logger log = LoggerFactory.getLogger(WorkflowExecuteRunnable.class); + private final SyncWorkflowContext context; private final SyncWorkflowDefinition workflow; private final WorkflowExecutionStartedEventAttributes attributes; + private final WorkflowImplementationOptions implementationOptions; private Optional output = Optional.empty(); private boolean done; @@ -36,7 +53,9 @@ class WorkflowExecuteRunnable implements Runnable { public WorkflowExecuteRunnable( SyncWorkflowContext context, SyncWorkflowDefinition workflow, - WorkflowExecutionStartedEventAttributes attributes) { + WorkflowExecutionStartedEventAttributes attributes, + WorkflowImplementationOptions options) { + this.implementationOptions = options; Objects.requireNonNull(context); Objects.requireNonNull(workflow); Objects.requireNonNull(attributes); @@ -51,6 +70,32 @@ public void run() { Optional input = attributes.hasInput() ? Optional.of(attributes.getInput()) : Optional.empty(); output = workflow.execute(new Header(attributes.getHeader()), input); + } catch (Throwable e) { + if (e instanceof DestroyWorkflowThreadError) { + throw (DestroyWorkflowThreadError) e; + } + Throwable exception = unwrap(e); + + Class[] failTypes = + implementationOptions.getFailWorkflowExceptionTypes(); + if (exception instanceof TemporalFailure) { + logWorkflowExecutionException(Workflow.getInfo(), exception); + throw mapToWorkflowExecutionException(exception, context.getDataConverter()); + } + for (Class failType : failTypes) { + if (failType.isAssignableFrom(exception.getClass())) { + // fail workflow + if (log.isErrorEnabled()) { + boolean cancelRequested = + WorkflowInternal.getRootWorkflowContext().getContext().isCancelRequested(); + if (!cancelRequested || !FailureConverter.isCanceledCause(exception)) { + logWorkflowExecutionException(Workflow.getInfo(), exception); + } + } + throw mapToWorkflowExecutionException(exception, context.getDataConverter()); + } + } + throw wrap(exception); } finally { done = true; } @@ -75,4 +120,29 @@ public void handleSignal(String signalName, Optional input, long event public Optional handleQuery(String type, Optional args) { return context.handleQuery(type, args); } + + private void logWorkflowExecutionException(WorkflowInfo info, Throwable exception) { + log.error( + "Workflow execution failure " + + "WorkflowId=" + + info.getWorkflowId() + + ", RunId=" + + info.getRunId() + + ", WorkflowType=" + + info.getWorkflowType(), + exception); + } + + static WorkflowExecutionException mapToWorkflowExecutionException( + Throwable exception, DataConverter dataConverter) { + Throwable e = exception; + while (e != null) { + if (e instanceof TemporalFailure) { + ((TemporalFailure) e).setDataConverter(dataConverter); + } + e = e.getCause(); + } + Failure failure = FailureConverter.exceptionToFailure(exception); + return new WorkflowExecutionException(failure); + } } From 22cb5efdaf31a9e563783d53a32edee01381dc85 Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Thu, 16 Sep 2021 11:10:41 -0700 Subject: [PATCH 3/3] Changed unit test to use expected exception --- .../java/io/temporal/workflow/DynamicWorkflowTest.java | 9 ++------- ...ongLocalActivityWorkflowTaskHeartbeatFailureTest.java | 3 ++- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/DynamicWorkflowTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/DynamicWorkflowTest.java index fc36ae5589..9bcd88f51b 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/DynamicWorkflowTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/DynamicWorkflowTest.java @@ -127,7 +127,7 @@ public void testDynamicWorkflowFactory() { assertEquals("activityType2-activityType1-startArg0-workflowFoo", result); } - @Test + @Test(expected = WorkflowFailedException.class) public void testDynamicWorkflowFailure() { TestWorkflowEnvironment testEnvironment = testWorkflowRule.getTestEnvironment(); testEnvironment @@ -141,11 +141,6 @@ public void testDynamicWorkflowFailure() { WorkflowStub workflow = testWorkflowRule.getWorkflowClient().newUntypedWorkflowStub("workflowFoo", workflowOptions); workflow.start("startArg0", true /* fail */); - try { - workflow.getResult(String.class); - fail("failure expected"); - } catch (WorkflowFailedException e) { - // expected - } + workflow.getResult(String.class); } } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/LongLocalActivityWorkflowTaskHeartbeatFailureTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/LongLocalActivityWorkflowTaskHeartbeatFailureTest.java index e917f6c2bd..2b03487ed9 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/LongLocalActivityWorkflowTaskHeartbeatFailureTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/LongLocalActivityWorkflowTaskHeartbeatFailureTest.java @@ -69,7 +69,8 @@ public void testLongLocalActivityWorkflowTaskHeartbeatFailure() { .getWorkflowClient() .newWorkflowStub(TestWorkflows.TestWorkflowReturnString.class, options); String result = workflowStub.execute(); - //Shouldn't this workflow never successfully finish, because local activity suppose the fail the hearbeat every single time? + // Shouldn't this workflow never successfully finish, because local activity suppose the fail + // the hearbeat every single time? Assert.assertEquals("sleepActivity123", result); Assert.assertEquals(activitiesImpl.toString(), REPLAY_COUNT, activitiesImpl.invocations.size()); }