Skip to content

Commit

Permalink
Block mutating workflow state in a read only context (#1821)
Browse files Browse the repository at this point in the history
Block mutating workflow state in a read only context
  • Loading branch information
Quinn-With-Two-Ns committed Jul 21, 2023
1 parent c5cf7bb commit 16755a1
Show file tree
Hide file tree
Showing 16 changed files with 513 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.temporal.common.MethodRetry;
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
import io.temporal.workflow.ActivityStub;
import io.temporal.workflow.Functions;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.HashMap;
Expand All @@ -36,26 +37,30 @@ public class ActivityInvocationHandler extends ActivityInvocationHandlerBase {
private final ActivityOptions options;
private final Map<String, ActivityOptions> activityMethodOptions;
private final WorkflowOutboundCallsInterceptor activityExecutor;
private final Functions.Proc assertReadOnly;

@VisibleForTesting
public static InvocationHandler newInstance(
Class<?> activityInterface,
ActivityOptions options,
Map<String, ActivityOptions> methodOptions,
WorkflowOutboundCallsInterceptor activityExecutor) {
WorkflowOutboundCallsInterceptor activityExecutor,
Functions.Proc assertReadOnly) {
return new ActivityInvocationHandler(
activityInterface, activityExecutor, options, methodOptions);
activityInterface, activityExecutor, options, methodOptions, assertReadOnly);
}

private ActivityInvocationHandler(
Class<?> activityInterface,
WorkflowOutboundCallsInterceptor activityExecutor,
ActivityOptions options,
Map<String, ActivityOptions> methodOptions) {
Map<String, ActivityOptions> methodOptions,
Functions.Proc assertReadOnly) {
super(activityInterface);
this.options = options;
this.activityMethodOptions = (methodOptions == null) ? new HashMap<>() : methodOptions;
this.activityExecutor = activityExecutor;
this.assertReadOnly = assertReadOnly;
}

@Override
Expand All @@ -73,7 +78,7 @@ protected Function<Object[], Object> getActivityFunc(
+ activityName
+ " activity. Please set at least one of the above through the ActivityStub or WorkflowImplementationOptions.");
}
ActivityStub stub = ActivityStubImpl.newInstance(merged, activityExecutor);
ActivityStub stub = ActivityStubImpl.newInstance(merged, activityExecutor, assertReadOnly);
function =
(a) -> stub.execute(activityName, method.getReturnType(), method.getGenericReturnType(), a);
return function;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,37 @@
import io.temporal.common.interceptors.Header;
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
import io.temporal.workflow.ActivityStub;
import io.temporal.workflow.Functions;
import io.temporal.workflow.Promise;
import java.lang.reflect.Type;

final class ActivityStubImpl extends ActivityStubBase {
protected final ActivityOptions options;
private final WorkflowOutboundCallsInterceptor activityExecutor;
private final Functions.Proc assertReadOnly;

static ActivityStub newInstance(
ActivityOptions options, WorkflowOutboundCallsInterceptor activityExecutor) {
ActivityOptions options,
WorkflowOutboundCallsInterceptor activityExecutor,
Functions.Proc assertReadOnly) {
ActivityOptions validatedOptions =
ActivityOptions.newBuilder(options).validateAndBuildWithDefaults();
return new ActivityStubImpl(validatedOptions, activityExecutor);
return new ActivityStubImpl(validatedOptions, activityExecutor, assertReadOnly);
}

ActivityStubImpl(ActivityOptions options, WorkflowOutboundCallsInterceptor activityExecutor) {
ActivityStubImpl(
ActivityOptions options,
WorkflowOutboundCallsInterceptor activityExecutor,
Functions.Proc assertReadOnly) {
this.options = options;
this.activityExecutor = activityExecutor;
this.assertReadOnly = assertReadOnly;
}

@Override
public <R> Promise<R> executeAsync(
String activityName, Class<R> resultClass, Type resultType, Object... args) {
this.assertReadOnly.apply();
return activityExecutor
.executeActivity(
new WorkflowOutboundCallsInterceptor.ActivityInput<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.temporal.common.metadata.WorkflowMethodType;
import io.temporal.workflow.ChildWorkflowOptions;
import io.temporal.workflow.ChildWorkflowStub;
import io.temporal.workflow.Functions;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.Optional;
Expand All @@ -43,7 +44,8 @@ class ChildWorkflowInvocationHandler implements InvocationHandler {
ChildWorkflowInvocationHandler(
Class<?> workflowInterface,
ChildWorkflowOptions options,
WorkflowOutboundCallsInterceptor outboundCallsInterceptor) {
WorkflowOutboundCallsInterceptor outboundCallsInterceptor,
Functions.Proc1<String> assertReadOnly) {
workflowMetadata = POJOWorkflowInterfaceMetadata.newInstance(workflowInterface);
Optional<POJOWorkflowMethodMetadata> workflowMethodMetadata =
workflowMetadata.getWorkflowMethod();
Expand All @@ -61,7 +63,10 @@ class ChildWorkflowInvocationHandler implements InvocationHandler {
.validateAndBuildWithDefaults();
this.stub =
new ChildWorkflowStubImpl(
workflowMethodMetadata.get().getName(), merged, outboundCallsInterceptor);
workflowMethodMetadata.get().getName(),
merged,
outboundCallsInterceptor,
assertReadOnly);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@ class ChildWorkflowStubImpl implements ChildWorkflowStub {
private final ChildWorkflowOptions options;
private final WorkflowOutboundCallsInterceptor outboundCallsInterceptor;
private final CompletablePromise<WorkflowExecution> execution;
private final Functions.Proc1<String> assertReadOnly;

ChildWorkflowStubImpl(
String workflowType,
ChildWorkflowOptions options,
WorkflowOutboundCallsInterceptor outboundCallsInterceptor) {
WorkflowOutboundCallsInterceptor outboundCallsInterceptor,
Functions.Proc1<String> assertReadOnly) {
this.workflowType = Objects.requireNonNull(workflowType);
this.options = ChildWorkflowOptions.newBuilder(options).validateAndBuildWithDefaults();
this.outboundCallsInterceptor = Objects.requireNonNull(outboundCallsInterceptor);
Expand All @@ -50,6 +52,7 @@ class ChildWorkflowStubImpl implements ChildWorkflowStub {
// The "main" Child Workflow promise is the one returned from the execute method and that
// promise will always be logged if not accessed.
this.execution.handle((ex, failure) -> null);
this.assertReadOnly = assertReadOnly;
}

@Override
Expand Down Expand Up @@ -77,6 +80,7 @@ public <R> R execute(Class<R> resultClass, Object... args) {

@Override
public <R> R execute(Class<R> resultClass, Type resultType, Object... args) {
assertReadOnly.apply("schedule child workflow");
Promise<R> result = executeAsync(resultClass, resultType, args);
if (AsyncInternal.isAsync()) {
AsyncInternal.setAsyncResult(result);
Expand All @@ -99,6 +103,7 @@ public <R> Promise<R> executeAsync(Class<R> resultClass, Object... args) {

@Override
public <R> Promise<R> executeAsync(Class<R> resultClass, Type resultType, Object... args) {
assertReadOnly.apply("schedule child workflow");
ChildWorkflowOutput<R> result =
outboundCallsInterceptor.executeChildWorkflow(
new WorkflowOutboundCallsInterceptor.ChildWorkflowInput<>(
Expand All @@ -115,6 +120,7 @@ public <R> Promise<R> executeAsync(Class<R> resultClass, Type resultType, Object

@Override
public void signal(String signalName, Object... args) {
assertReadOnly.apply("signal workflow");
Promise<Void> signaled =
outboundCallsInterceptor
.signalExternalWorkflow(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.temporal.common.metadata.POJOWorkflowInterfaceMetadata;
import io.temporal.common.metadata.POJOWorkflowMethodMetadata;
import io.temporal.workflow.ExternalWorkflowStub;
import io.temporal.workflow.Functions;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

Expand All @@ -37,9 +38,11 @@ class ExternalWorkflowInvocationHandler implements InvocationHandler {
public ExternalWorkflowInvocationHandler(
Class<?> workflowInterface,
WorkflowExecution execution,
WorkflowOutboundCallsInterceptor workflowOutboundCallsInterceptor) {
workflowMetadata = POJOWorkflowInterfaceMetadata.newInstance(workflowInterface);
stub = new ExternalWorkflowStubImpl(execution, workflowOutboundCallsInterceptor);
WorkflowOutboundCallsInterceptor workflowOutboundCallsInterceptor,
Functions.Proc1<String> assertReadOnly) {
this.workflowMetadata = POJOWorkflowInterfaceMetadata.newInstance(workflowInterface);
this.stub =
new ExternalWorkflowStubImpl(execution, workflowOutboundCallsInterceptor, assertReadOnly);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,23 @@

import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
import io.temporal.workflow.CancelExternalWorkflowException;
import io.temporal.workflow.ExternalWorkflowStub;
import io.temporal.workflow.Promise;
import io.temporal.workflow.SignalExternalWorkflowException;
import io.temporal.workflow.*;
import java.util.Objects;

/** Dynamic implementation of a strongly typed child workflow interface. */
class ExternalWorkflowStubImpl implements ExternalWorkflowStub {

private final WorkflowOutboundCallsInterceptor outboundCallsInterceptor;
private final WorkflowExecution execution;
private Functions.Proc1<String> assertReadOnly;

public ExternalWorkflowStubImpl(
WorkflowExecution execution, WorkflowOutboundCallsInterceptor outboundCallsInterceptor) {
WorkflowExecution execution,
WorkflowOutboundCallsInterceptor outboundCallsInterceptor,
Functions.Proc1<String> assertReadOnly) {
this.outboundCallsInterceptor = Objects.requireNonNull(outboundCallsInterceptor);
this.execution = Objects.requireNonNull(execution);
this.assertReadOnly = assertReadOnly;
}

@Override
Expand All @@ -47,6 +48,7 @@ public WorkflowExecution getExecution() {

@Override
public void signal(String signalName, Object... args) {
assertReadOnly.apply("signal external workflow");
Promise<Void> signaled =
outboundCallsInterceptor
.signalExternalWorkflow(
Expand All @@ -69,6 +71,7 @@ public void signal(String signalName, Object... args) {

@Override
public void cancel() {
assertReadOnly.apply("cancel external workflow");
Promise<Void> cancelRequested =
outboundCallsInterceptor
.cancelWorkflow(new WorkflowOutboundCallsInterceptor.CancelWorkflowInput(execution))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.temporal.common.MethodRetry;
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
import io.temporal.workflow.ActivityStub;
import io.temporal.workflow.Functions;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.HashMap;
Expand All @@ -36,26 +37,30 @@ public class LocalActivityInvocationHandler extends ActivityInvocationHandlerBas
private final LocalActivityOptions options;
private final Map<String, LocalActivityOptions> activityMethodOptions;
private final WorkflowOutboundCallsInterceptor activityExecutor;
private final Functions.Proc assertReadOnly;

@VisibleForTesting
public static InvocationHandler newInstance(
Class<?> activityInterface,
LocalActivityOptions options,
Map<String, LocalActivityOptions> methodOptions,
WorkflowOutboundCallsInterceptor activityExecutor) {
WorkflowOutboundCallsInterceptor activityExecutor,
Functions.Proc assertReadOnly) {
return new LocalActivityInvocationHandler(
activityInterface, activityExecutor, options, methodOptions);
activityInterface, activityExecutor, options, methodOptions, assertReadOnly);
}

private LocalActivityInvocationHandler(
Class<?> activityInterface,
WorkflowOutboundCallsInterceptor activityExecutor,
LocalActivityOptions options,
Map<String, LocalActivityOptions> methodOptions) {
Map<String, LocalActivityOptions> methodOptions,
Functions.Proc assertReadOnly) {
super(activityInterface);
this.options = options;
this.activityMethodOptions = (methodOptions == null) ? new HashMap<>() : methodOptions;
this.activityExecutor = activityExecutor;
this.assertReadOnly = assertReadOnly;
}

@VisibleForTesting
Expand All @@ -68,7 +73,8 @@ public Function<Object[], Object> getActivityFunc(
.mergeActivityOptions(activityMethodOptions.get(activityName))
.setMethodRetry(methodRetry)
.build();
ActivityStub stub = LocalActivityStubImpl.newInstance(mergedOptions, activityExecutor);
ActivityStub stub =
LocalActivityStubImpl.newInstance(mergedOptions, activityExecutor, assertReadOnly);
function =
(a) -> stub.execute(activityName, method.getReturnType(), method.getGenericReturnType(), a);
return function;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,29 +24,37 @@
import io.temporal.common.interceptors.Header;
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
import io.temporal.workflow.ActivityStub;
import io.temporal.workflow.Functions;
import io.temporal.workflow.Promise;
import java.lang.reflect.Type;

class LocalActivityStubImpl extends ActivityStubBase {
protected final LocalActivityOptions options;
private final WorkflowOutboundCallsInterceptor activityExecutor;
private final Functions.Proc assertReadOnly;

static ActivityStub newInstance(
LocalActivityOptions options, WorkflowOutboundCallsInterceptor activityExecutor) {
LocalActivityOptions options,
WorkflowOutboundCallsInterceptor activityExecutor,
Functions.Proc assertReadOnly) {
LocalActivityOptions validatedOptions =
LocalActivityOptions.newBuilder(options).validateAndBuildWithDefaults();
return new LocalActivityStubImpl(validatedOptions, activityExecutor);
return new LocalActivityStubImpl(validatedOptions, activityExecutor, assertReadOnly);
}

private LocalActivityStubImpl(
LocalActivityOptions options, WorkflowOutboundCallsInterceptor activityExecutor) {
LocalActivityOptions options,
WorkflowOutboundCallsInterceptor activityExecutor,
Functions.Proc assertReadOnly) {
this.options = options;
this.activityExecutor = activityExecutor;
this.assertReadOnly = assertReadOnly;
}

@Override
public <R> Promise<R> executeAsync(
String activityName, Class<R> resultClass, Type resultType, Object... args) {
this.assertReadOnly.apply();
return activityExecutor
.executeLocalActivity(
new WorkflowOutboundCallsInterceptor.LocalActivityInput<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,13 @@ public void handleUpdate(
// TODO(https://github.com/temporalio/sdk-java/issues/1748) handleValidateUpdate
// should not just be run
// in a workflow thread
workflowContext.setReadOnly(true);
workflowProc.handleValidateUpdate(updateName, input, eventId);
} catch (Exception e) {
callbacks.reject(this.dataConverter.exceptionToFailure(e));
return;
} finally {
workflowContext.setReadOnly(false);
}
}
callbacks.accept();
Expand Down

0 comments on commit 16755a1

Please sign in to comment.