Skip to content

Commit

Permalink
Apply serialization context to Dynamic Workflows (#1992)
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns committed Feb 16, 2024
1 parent 4da4591 commit f4a572a
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,17 @@ final class DynamicSyncWorkflowDefinition implements SyncWorkflowDefinition {

private final Functions.Func<? extends DynamicWorkflow> factory;
private final WorkerInterceptor[] workerInterceptors;
private final DataConverter dataConverter;
// don't pass it down to other classes, it's a "cached" instance for internal usage only
private final DataConverter dataConverterWithWorkflowContext;
private WorkflowInboundCallsInterceptor workflowInvoker;

public DynamicSyncWorkflowDefinition(
Functions.Func<? extends DynamicWorkflow> factory,
WorkerInterceptor[] workerInterceptors,
DataConverter dataConverter) {
DataConverter dataConverterWithWorkflowContext) {
this.factory = factory;
this.workerInterceptors = workerInterceptors;
this.dataConverter = dataConverter;
this.dataConverterWithWorkflowContext = dataConverterWithWorkflowContext;
}

@Override
Expand All @@ -61,11 +62,11 @@ public void initialize() {

@Override
public Optional<Payloads> execute(Header header, Optional<Payloads> input) {
Values args = new EncodedValues(input, dataConverter);
Values args = new EncodedValues(input, dataConverterWithWorkflowContext);
WorkflowInboundCallsInterceptor.WorkflowOutput result =
workflowInvoker.execute(
new WorkflowInboundCallsInterceptor.WorkflowInput(header, new Object[] {args}));
return dataConverter.toPayloads(result.getResult());
return dataConverterWithWorkflowContext.toPayloads(result.getResult());
}

class RootWorkflowInboundCallsInterceptor extends BaseRootWorkflowInboundCallsInterceptor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,10 @@ private SyncWorkflowDefinition getWorkflowDefinition(
if (factory == null) {
if (dynamicWorkflowImplementationFactory != null) {
return new DynamicSyncWorkflowDefinition(
dynamicWorkflowImplementationFactory, workerInterceptors, dataConverter);
dynamicWorkflowImplementationFactory,
workerInterceptors,
dataConverter.withContext(
new WorkflowSerializationContext(namespace, workflowExecution.getWorkflowId())));
}
// throw Error to abort the workflow task, not fail the workflow
throw new Error(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@
import io.temporal.payload.context.SerializationContext;
import io.temporal.testing.internal.SDKTestOptions;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.ChildWorkflowOptions;
import io.temporal.workflow.ContinueAsNewOptions;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.*;
import io.temporal.workflow.shared.TestWorkflowWithCronScheduleImpl;
import io.temporal.workflow.shared.TestWorkflows;
import java.io.IOException;
Expand Down Expand Up @@ -79,10 +77,13 @@ public class WorkflowIdSignedPayloadsTest {
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(
SimpleWorkflowWithAnActivity.class, TestWorkflowWithCronScheduleImpl.class)
SimpleWorkflowWithAnActivity.class,
TestWorkflowWithCronScheduleImpl.class,
DynamicWorkflowImpl.class)
.setWorkflowClientOptions(
WorkflowClientOptions.newBuilder().setDataConverter(codecDataConverter).build())
.setActivityImplementations(heartbeatingActivity, manualCompletionActivity)
.setActivityImplementations(
heartbeatingActivity, manualCompletionActivity, new DynamicActivityImpl())
.build();

@Rule public TestName testName = new TestName();
Expand Down Expand Up @@ -162,6 +163,16 @@ public void testSimpleCronWorkflow() {
TestWorkflowWithCronScheduleImpl.lastFail.get().getMessage().contains("simulated error"));
}

@Test
public void testDynamicWorkflow() {
WorkflowOptions options =
SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue());
WorkflowStub workflow =
testWorkflowRule.getWorkflowClient().newUntypedWorkflowStub("workflowFoo", options);
workflow.start("World");
assertEquals("Hello World", workflow.getResult(String.class));
}

@ActivityInterface
public interface SimpleActivity {
@ActivityMethod(name = "simple")
Expand Down Expand Up @@ -258,6 +269,24 @@ public String execute(String input) {
}
}

public static class DynamicWorkflowImpl implements DynamicWorkflow {
@Override
public Object execute(EncodedValues args) {
String input = args.get(0, String.class);
ActivityStub activity =
Workflow.newUntypedActivityStub(
ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(10)).build());
return activity.execute("dynamicActivity", String.class, input);
}
}

public static class DynamicActivityImpl implements DynamicActivity {
@Override
public Object execute(EncodedValues args) {
return "Hello " + args.get(0, String.class);
}
}

private static class PayloadEncoderWithWorkflowIdSignature implements PayloadCodec {
private final ByteString METADATA_ENCODING = ByteString.copyFromUtf8("signed");

Expand Down

0 comments on commit f4a572a

Please sign in to comment.