Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1462,8 +1462,25 @@ private static void continueAsNewWorkflow(
if (d.hasFailure()) {
a.setFailure(d.getFailure());
}
// For both search attributes and memo below: a present-but-empty command field is
// preserved as an explicit empty override; only an absent command field falls through
// to inheriting the live workflow state.
if (d.hasSearchAttributes()) {
a.setSearchAttributes(d.getSearchAttributes());
} else {
SearchAttributes currentSearchAttributes =
ctx.getWorkflowMutableState().getCurrentSearchAttributes();
if (currentSearchAttributes.getIndexedFieldsCount() > 0) {
a.setSearchAttributes(currentSearchAttributes);
}
}
if (d.hasMemo()) {
a.setMemo(d.getMemo());
} else {
Memo currentMemo = ctx.getWorkflowMutableState().getCurrentMemo();
if (currentMemo.getFieldsCount() > 0) {
a.setMemo(currentMemo);
}
}
a.setNewExecutionRunId(UUID.randomUUID().toString());
HistoryEvent event =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import io.grpc.Deadline;
import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes;
import io.temporal.api.common.v1.Callback;
import io.temporal.api.common.v1.Memo;
import io.temporal.api.common.v1.Payload;
import io.temporal.api.common.v1.Payloads;
import io.temporal.api.common.v1.SearchAttributes;
import io.temporal.api.enums.v1.EventType;
import io.temporal.api.enums.v1.SignalExternalWorkflowExecutionFailedCause;
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
Expand Down Expand Up @@ -32,6 +34,15 @@ interface TestWorkflowMutableState {

StartWorkflowExecutionRequest getStartRequest();

/** Returns the current memo, reflecting any upserts since workflow start. */
Memo getCurrentMemo();

/**
* Returns the current search attributes from the visibility store, reflecting any upserts since
* workflow start.
*/
SearchAttributes getCurrentSearchAttributes();

void startWorkflowTask(
PollWorkflowTaskQueueResponse.Builder task, PollWorkflowTaskQueueRequest pollRequest);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3271,10 +3271,18 @@ public DescribeWorkflowExecutionResponse describeWorkflowExecution() {
}
}

private Memo getCurrentMemo() {
@Override
public Memo getCurrentMemo() {
return Memo.newBuilder().putAllFields(currentMemo).build();
}

@Override
public SearchAttributes getCurrentSearchAttributes() {
SearchAttributes searchAttributes =
visibilityStore.getSearchAttributesForExecution(executionId);
return searchAttributes == null ? SearchAttributes.getDefaultInstance() : searchAttributes;
}

private DescribeWorkflowExecutionResponse describeWorkflowExecutionInsideLock() {
WorkflowExecutionConfig.Builder executionConfig =
WorkflowExecutionConfig.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1757,6 +1757,9 @@ public String continueAsNew(
if (ea.hasSearchAttributes()) {
startRequestBuilder.setSearchAttributes(ea.getSearchAttributes());
}
if (ea.hasMemo()) {
startRequestBuilder.setMemo(ea.getMemo());
}
StartWorkflowExecutionRequest startRequest = startRequestBuilder.build();
lock.lock();
Optional<Failure> lastFail =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,27 @@
package io.temporal.testserver.functional;

import com.google.common.collect.ImmutableMap;
import io.temporal.api.common.v1.Payload;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.enums.v1.EventType;
import io.temporal.api.history.v1.HistoryEvent;
import io.temporal.api.history.v1.WorkflowExecutionStartedEventAttributes;
import io.temporal.client.WorkflowOptions;
import io.temporal.client.WorkflowStub;
import io.temporal.common.SearchAttributeKey;
import io.temporal.common.SearchAttributes;
import io.temporal.common.WorkflowExecutionHistory;
import io.temporal.common.converter.DefaultDataConverter;
import io.temporal.common.interceptors.*;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.testserver.functional.common.TestWorkflows;
import io.temporal.worker.WorkerFactoryOptions;
import io.temporal.workflow.ContinueAsNewOptions;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.time.Duration;
import java.util.Collections;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -24,7 +35,13 @@ public class ContinueAsNewTest {
WorkerFactoryOptions.newBuilder()
.setWorkerInterceptors(new StripsTqFromCanInterceptor())
.build())
.setWorkflowTypes(TestWorkflow.class)
.setWorkflowTypes(
TestWorkflow.class,
OverridingWorkflow.class,
UpsertingSearchAttributesWorkflow.class,
UpsertingMemoWorkflow.class,
ClearingMemoWorkflow.class,
RemovingMemoWorkflow.class)
.build();

@Test
Expand Down Expand Up @@ -54,6 +71,173 @@ public void repeatedFailure() {
.isEmpty());
}

private static final SearchAttributeKey<String> CUSTOM_KEYWORD =
SearchAttributeKey.forKeyword("CustomKeywordField");

@Test
public void inheritsMemoAndSearchAttributesAcrossContinueAsNew() {
WorkflowOptions options =
WorkflowOptions.newBuilder()
.setWorkflowTaskTimeout(Duration.ofSeconds(1))
.setTaskQueue(testWorkflowRule.getTaskQueue())
.setMemo(ImmutableMap.of("memoKey", "memoValue"))
.setTypedSearchAttributes(
SearchAttributes.newBuilder().set(CUSTOM_KEYWORD, "initialSA").build())
.build();

TestWorkflows.WorkflowTakesBool workflowStub =
testWorkflowRule
.getWorkflowClient()
.newWorkflowStub(TestWorkflows.WorkflowTakesBool.class, options);
workflowStub.execute(true);

WorkflowExecutionStartedEventAttributes started =
getContinuedRunStartedAttributes(workflowStub);

Assert.assertTrue("Memo should be inherited by the continued run", started.hasMemo());
Assert.assertEquals("memoValue", decodeString(started.getMemo().getFieldsOrThrow("memoKey")));

Assert.assertTrue(
"Search attributes should be inherited by the continued run",
started.hasSearchAttributes());
Assert.assertEquals(
"initialSA",
decodeString(started.getSearchAttributes().getIndexedFieldsOrThrow("CustomKeywordField")));
}

@Test
public void overridesMemoAndSearchAttributesOnContinueAsNew() {
WorkflowOptions options =
WorkflowOptions.newBuilder()
.setWorkflowTaskTimeout(Duration.ofSeconds(1))
.setTaskQueue(testWorkflowRule.getTaskQueue())
.setMemo(ImmutableMap.of("memoKey", "originalMemo"))
.setTypedSearchAttributes(
SearchAttributes.newBuilder().set(CUSTOM_KEYWORD, "originalSA").build())
.build();

OverridingWorkflowInterface workflowStub =
testWorkflowRule
.getWorkflowClient()
.newWorkflowStub(OverridingWorkflowInterface.class, options);
workflowStub.execute(true);

WorkflowExecutionStartedEventAttributes started =
getContinuedRunStartedAttributes(workflowStub);

Assert.assertEquals(
"overriddenMemo", decodeString(started.getMemo().getFieldsOrThrow("memoKey")));

Assert.assertEquals(
"overriddenSA",
decodeString(started.getSearchAttributes().getIndexedFieldsOrThrow("CustomKeywordField")));
}

@Test
public void inheritsUpsertedMemoAcrossContinueAsNew() {
WorkflowOptions options =
WorkflowOptions.newBuilder()
.setWorkflowTaskTimeout(Duration.ofSeconds(1))
.setTaskQueue(testWorkflowRule.getTaskQueue())
.setMemo(ImmutableMap.of("memoKey", "originalMemo"))
.build();

UpsertingMemoWorkflowInterface workflowStub =
testWorkflowRule
.getWorkflowClient()
.newWorkflowStub(UpsertingMemoWorkflowInterface.class, options);
workflowStub.execute(true);

WorkflowExecutionStartedEventAttributes started =
getContinuedRunStartedAttributes(workflowStub);

Assert.assertEquals(
"upsertedMemo", decodeString(started.getMemo().getFieldsOrThrow("memoKey")));
}

@Test
public void inheritsUpsertedSearchAttributesAcrossContinueAsNewWhenCommandOmitsThem() {
WorkflowOptions options =
WorkflowOptions.newBuilder()
.setWorkflowTaskTimeout(Duration.ofSeconds(1))
.setTaskQueue(testWorkflowRule.getTaskQueue())
.setTypedSearchAttributes(
SearchAttributes.newBuilder().set(CUSTOM_KEYWORD, "originalSA").build())
.build();

UpsertingSearchAttributesWorkflowInterface workflowStub =
testWorkflowRule
.getWorkflowClient()
.newWorkflowStub(UpsertingSearchAttributesWorkflowInterface.class, options);
workflowStub.execute(true);

WorkflowExecutionStartedEventAttributes started =
getContinuedRunStartedAttributes(workflowStub);

Assert.assertEquals(
"upsertedSA",
decodeString(started.getSearchAttributes().getIndexedFieldsOrThrow("CustomKeywordField")));
}

@Test
public void doesNotReinheritRemovedMemoAcrossContinueAsNew() {
WorkflowOptions options =
WorkflowOptions.newBuilder()
.setWorkflowTaskTimeout(Duration.ofSeconds(1))
.setTaskQueue(testWorkflowRule.getTaskQueue())
.setMemo(ImmutableMap.of("memoKey", "originalMemo"))
.build();

RemovingMemoWorkflowInterface workflowStub =
testWorkflowRule
.getWorkflowClient()
.newWorkflowStub(RemovingMemoWorkflowInterface.class, options);
workflowStub.execute(true);

WorkflowExecutionStartedEventAttributes started =
getContinuedRunStartedAttributes(workflowStub);
Assert.assertFalse(started.getMemo().containsFields("memoKey"));
}

@Test
public void overridesMemoWithEmptyMemoOnContinueAsNew() {
WorkflowOptions options =
WorkflowOptions.newBuilder()
.setWorkflowTaskTimeout(Duration.ofSeconds(1))
.setTaskQueue(testWorkflowRule.getTaskQueue())
.setMemo(ImmutableMap.of("memoKey", "originalMemo"))
.build();

ClearingMemoWorkflowInterface workflowStub =
testWorkflowRule
.getWorkflowClient()
.newWorkflowStub(ClearingMemoWorkflowInterface.class, options);
workflowStub.execute(true);

WorkflowExecutionStartedEventAttributes started =
getContinuedRunStartedAttributes(workflowStub);
Assert.assertTrue("Empty memo should be an explicit override", started.hasMemo());
Assert.assertEquals(0, started.getMemo().getFieldsCount());
}

private WorkflowExecutionStartedEventAttributes getContinuedRunStartedAttributes(
Object workflowStub) {
WorkflowExecution execution = WorkflowStub.fromTyped(workflowStub).getExecution();
HistoryEvent firstEvent =
testWorkflowRule.getExecutionHistory(execution.getWorkflowId()).getEvents().get(0);
Assert.assertEquals(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, firstEvent.getEventType());
WorkflowExecutionStartedEventAttributes started =
firstEvent.getWorkflowExecutionStartedEventAttributes();
Assert.assertFalse(
"Inspected event must belong to the continued run",
started.getContinuedExecutionRunId().isEmpty());
return started;
}

private static String decodeString(Payload payload) {
return DefaultDataConverter.STANDARD_INSTANCE.fromPayload(payload, String.class, String.class);
}

public static class TestWorkflow implements TestWorkflows.WorkflowTakesBool {
@Override
public void execute(boolean doContinue) {
Expand All @@ -63,6 +247,99 @@ public void execute(boolean doContinue) {
}
}

@WorkflowInterface
public interface OverridingWorkflowInterface {
@WorkflowMethod
void execute(boolean doContinue);
}

public static class OverridingWorkflow implements OverridingWorkflowInterface {
@Override
public void execute(boolean doContinue) {
if (doContinue) {
Workflow.continueAsNew(
ContinueAsNewOptions.newBuilder()
.setMemo(ImmutableMap.of("memoKey", "overriddenMemo"))
.setTypedSearchAttributes(
SearchAttributes.newBuilder().set(CUSTOM_KEYWORD, "overriddenSA").build())
.build(),
false);
}
}
}

@WorkflowInterface
public interface UpsertingSearchAttributesWorkflowInterface {
@WorkflowMethod
void execute(boolean doContinue);
}

public static class UpsertingSearchAttributesWorkflow
implements UpsertingSearchAttributesWorkflowInterface {
@Override
public void execute(boolean doContinue) {
if (doContinue) {
Workflow.upsertTypedSearchAttributes(CUSTOM_KEYWORD.valueSet("upsertedSA"));
// Empty typed search attributes are not encoded into the command, so this exercises
// server-side inheritance after the upsert.
Workflow.continueAsNew(
ContinueAsNewOptions.newBuilder()
.setTypedSearchAttributes(SearchAttributes.EMPTY)
.build(),
false);
}
}
}

@WorkflowInterface
public interface UpsertingMemoWorkflowInterface {
@WorkflowMethod
void execute(boolean doContinue);
}

public static class UpsertingMemoWorkflow implements UpsertingMemoWorkflowInterface {
@Override
public void execute(boolean doContinue) {
if (doContinue) {
Workflow.upsertMemo(ImmutableMap.of("memoKey", "upsertedMemo"));
Workflow.continueAsNew(false);
}
}
}

@WorkflowInterface
public interface ClearingMemoWorkflowInterface {
@WorkflowMethod
void execute(boolean doContinue);
}

public static class ClearingMemoWorkflow implements ClearingMemoWorkflowInterface {
@Override
public void execute(boolean doContinue) {
if (doContinue) {
Workflow.continueAsNew(
ContinueAsNewOptions.newBuilder().setMemo(Collections.emptyMap()).build(), false);
}
}
}

@WorkflowInterface
public interface RemovingMemoWorkflowInterface {
@WorkflowMethod
void execute(boolean doContinue);
}

public static class RemovingMemoWorkflow implements RemovingMemoWorkflowInterface {
@Override
public void execute(boolean doContinue) {
if (doContinue) {
// ImmutableMap does not allow null values, and null removes the memo field.
Workflow.upsertMemo(Collections.singletonMap("memoKey", null));
Workflow.continueAsNew(false);
}
}
}

// Verify that we can strip the TQ name and test server continues onto same TQ
private static class StripsTqFromCanInterceptor extends WorkerInterceptorBase {
@Override
Expand Down