Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package io.temporal.internal.sync;

import static io.temporal.internal.sync.WorkflowInternal.unwrap;
import static io.temporal.serviceclient.CheckedExceptionWrapper.wrap;

import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -47,8 +46,6 @@
import io.temporal.workflow.DynamicWorkflow;
import io.temporal.workflow.Functions;
import io.temporal.workflow.Functions.Func;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInfo;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collections;
Expand Down Expand Up @@ -316,54 +313,17 @@ public void init(WorkflowOutboundCallsInterceptor outboundCalls) {

@Override
public WorkflowOutput execute(WorkflowInput input) {
WorkflowInfo info = Workflow.getInfo();
try {
Object result = workflowMethod.invoke(workflow, input.getArguments());
return new WorkflowOutput(result);
} catch (IllegalAccessException e) {
throw new Error(mapToWorkflowExecutionException(e, dataConverter));
throw wrap(e);
} catch (InvocationTargetException e) {
Throwable target = e.getTargetException();
if (target instanceof DestroyWorkflowThreadError) {
throw (DestroyWorkflowThreadError) target;
}
Throwable exception = unwrap(target);

WorkflowImplementationOptions options = implementationOptions.get(info.getWorkflowType());
Class<? extends Throwable>[] failTypes = options.getFailWorkflowExceptionTypes();
if (exception instanceof TemporalFailure) {
logWorkflowExecutionException(info, exception);
throw mapToWorkflowExecutionException(exception, dataConverter);
}
for (Class<? extends Throwable> failType : failTypes) {
if (failType.isAssignableFrom(exception.getClass())) {
// fail workflow
if (log.isErrorEnabled()) {
boolean cancelRequested =
WorkflowInternal.getRootWorkflowContext().getContext().isCancelRequested();
if (!cancelRequested || !FailureConverter.isCanceledCause(exception)) {
logWorkflowExecutionException(info, exception);
}
}
throw mapToWorkflowExecutionException(exception, dataConverter);
}
}
throw wrap(exception);
throw wrap(target);
}
}

private void logWorkflowExecutionException(WorkflowInfo info, Throwable exception) {
log.error(
"Workflow execution failure "
+ "WorkflowId="
+ info.getWorkflowId()
+ ", RunId="
+ info.getRunId()
+ ", WorkflowType="
+ info.getWorkflowType(),
exception);
}

protected void newInstance() {
Func<?> factory = workflowImplementationFactories.get(workflowImplementationClass);
if (factory != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,9 @@ public void start(HistoryEvent event, ReplayWorkflowContext context) {
result,
lastFailure);

workflowProc = new WorkflowExecuteRunnable(syncContext, workflow, startEvent);
workflowProc =
new WorkflowExecuteRunnable(
syncContext, workflow, startEvent, workflowImplementationOptions);
// The following order is ensured by this code and DeterministicRunner implementation:
// 1. workflow.initialize
// 2. signal handler (if signalWithStart was called)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,43 @@

package io.temporal.internal.sync;

import static io.temporal.internal.sync.WorkflowInternal.unwrap;
import static io.temporal.serviceclient.CheckedExceptionWrapper.wrap;

import io.temporal.api.common.v1.Payloads;
import io.temporal.api.failure.v1.Failure;
import io.temporal.api.history.v1.WorkflowExecutionStartedEventAttributes;
import io.temporal.common.converter.DataConverter;
import io.temporal.common.interceptors.Header;
import io.temporal.failure.FailureConverter;
import io.temporal.failure.TemporalFailure;
import io.temporal.internal.worker.WorkflowExecutionException;
import io.temporal.worker.WorkflowImplementationOptions;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInfo;
import java.util.Objects;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class WorkflowExecuteRunnable implements Runnable {

private static final Logger log = LoggerFactory.getLogger(WorkflowExecuteRunnable.class);

private final SyncWorkflowContext context;
private final SyncWorkflowDefinition workflow;
private final WorkflowExecutionStartedEventAttributes attributes;
private final WorkflowImplementationOptions implementationOptions;

private Optional<Payloads> output = Optional.empty();
private boolean done;

public WorkflowExecuteRunnable(
SyncWorkflowContext context,
SyncWorkflowDefinition workflow,
WorkflowExecutionStartedEventAttributes attributes) {
WorkflowExecutionStartedEventAttributes attributes,
WorkflowImplementationOptions options) {
this.implementationOptions = options;
Objects.requireNonNull(context);
Objects.requireNonNull(workflow);
Objects.requireNonNull(attributes);
Expand All @@ -51,6 +70,32 @@ public void run() {
Optional<Payloads> input =
attributes.hasInput() ? Optional.of(attributes.getInput()) : Optional.empty();
output = workflow.execute(new Header(attributes.getHeader()), input);
} catch (Throwable e) {
if (e instanceof DestroyWorkflowThreadError) {
throw (DestroyWorkflowThreadError) e;
}
Throwable exception = unwrap(e);

Class<? extends Throwable>[] failTypes =
implementationOptions.getFailWorkflowExceptionTypes();
if (exception instanceof TemporalFailure) {
logWorkflowExecutionException(Workflow.getInfo(), exception);
throw mapToWorkflowExecutionException(exception, context.getDataConverter());
}
for (Class<? extends Throwable> failType : failTypes) {
if (failType.isAssignableFrom(exception.getClass())) {
// fail workflow
if (log.isErrorEnabled()) {
boolean cancelRequested =
WorkflowInternal.getRootWorkflowContext().getContext().isCancelRequested();
if (!cancelRequested || !FailureConverter.isCanceledCause(exception)) {
logWorkflowExecutionException(Workflow.getInfo(), exception);
}
}
throw mapToWorkflowExecutionException(exception, context.getDataConverter());
}
}
throw wrap(exception);
} finally {
done = true;
}
Expand All @@ -75,4 +120,29 @@ public void handleSignal(String signalName, Optional<Payloads> input, long event
public Optional<Payloads> handleQuery(String type, Optional<Payloads> args) {
return context.handleQuery(type, args);
}

private void logWorkflowExecutionException(WorkflowInfo info, Throwable exception) {
log.error(
"Workflow execution failure "
+ "WorkflowId="
+ info.getWorkflowId()
+ ", RunId="
+ info.getRunId()
+ ", WorkflowType="
+ info.getWorkflowType(),
exception);
}

static WorkflowExecutionException mapToWorkflowExecutionException(
Throwable exception, DataConverter dataConverter) {
Throwable e = exception;
while (e != null) {
if (e instanceof TemporalFailure) {
((TemporalFailure) e).setDataConverter(dataConverter);
}
e = e.getCause();
}
Failure failure = FailureConverter.exceptionToFailure(exception);
return new WorkflowExecutionException(failure);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@
package io.temporal.workflow;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import io.temporal.activity.Activity;
import io.temporal.activity.ActivityOptions;
import io.temporal.activity.DynamicActivity;
import io.temporal.activity.LocalActivityOptions;
import io.temporal.client.WorkflowFailedException;
import io.temporal.client.WorkflowOptions;
import io.temporal.client.WorkflowStub;
import io.temporal.common.converter.EncodedValues;
import io.temporal.failure.ApplicationFailure;
import io.temporal.testing.TestWorkflowEnvironment;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import java.time.Duration;
Expand Down Expand Up @@ -64,6 +67,10 @@ public Object execute(EncodedValues args) {
+ "-"
+ signals.get(signals.size() - 1));
String arg0 = args.get(0, String.class);
Boolean fail = args.get(1, Boolean.class);
if (fail != null && fail) {
throw ApplicationFailure.newFailure("Simulated failure", "simulated");
}
ActivityStub activity =
Workflow.newUntypedActivityStub(
ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(10)).build());
Expand Down Expand Up @@ -119,4 +126,21 @@ public void testDynamicWorkflowFactory() {
String result = workflow.getResult(String.class);
assertEquals("activityType2-activityType1-startArg0-workflowFoo", result);
}

@Test(expected = WorkflowFailedException.class)
public void testDynamicWorkflowFailure() {
TestWorkflowEnvironment testEnvironment = testWorkflowRule.getTestEnvironment();
testEnvironment
.getWorkerFactory()
.getWorker(testWorkflowRule.getTaskQueue())
.registerWorkflowImplementationTypes(DynamicWorkflowImpl.class);
testEnvironment.start();

WorkflowOptions workflowOptions =
WorkflowOptions.newBuilder().setTaskQueue(testWorkflowRule.getTaskQueue()).build();
WorkflowStub workflow =
testWorkflowRule.getWorkflowClient().newUntypedWorkflowStub("workflowFoo", workflowOptions);
workflow.start("startArg0", true /* fail */);
workflow.getResult(String.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ public void testLongLocalActivityWorkflowTaskHeartbeatFailure() {
.getWorkflowClient()
.newWorkflowStub(TestWorkflows.TestWorkflowReturnString.class, options);
String result = workflowStub.execute();
//Shouldn't this workflow never successfully finish, because local activity suppose the fail the hearbeat every single time?
// Shouldn't this workflow never successfully finish, because local activity suppose the fail
// the hearbeat every single time?
Assert.assertEquals("sleepActivity123", result);
Assert.assertEquals(activitiesImpl.toString(), REPLAY_COUNT, activitiesImpl.invocations.size());
}
Expand Down