Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.uber.m3.tally.Scope;
import io.temporal.api.command.v1.ContinueAsNewWorkflowExecutionCommandAttributes;
import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes;
import io.temporal.api.common.v1.Payload;
import io.temporal.api.common.v1.Payloads;
import io.temporal.api.common.v1.SearchAttributes;
import io.temporal.api.common.v1.WorkflowExecution;
Expand Down Expand Up @@ -94,6 +95,8 @@ public interface ReplayWorkflowContext extends ReplayAware {

Duration getWorkflowTaskTimeout();

Payload getMemo(String key);

/**
* Used to retrieve search attributes.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.temporal.api.command.v1.ScheduleActivityTaskCommandAttributes;
import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes;
import io.temporal.api.command.v1.StartTimerCommandAttributes;
import io.temporal.api.common.v1.Payload;
import io.temporal.api.common.v1.Payloads;
import io.temporal.api.common.v1.SearchAttributes;
import io.temporal.api.common.v1.WorkflowExecution;
Expand Down Expand Up @@ -191,6 +192,11 @@ public long getWorkflowExecutionExpirationTimestampMillis() {
return workflowContext.getWorkflowExecutionExpirationTimestampMillis();
}

@Override
public Payload getMemo(String key) {
return workflowContext.getMemo(key);
}

@Override
public SearchAttributes getSearchAttributes() {
return workflowContext.getSearchAttributes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ public Map<String, Payload> getHeader() {
return startedAttributes.getHeader().getFieldsMap();
}

public Payload getMemo(String key) {
return startedAttributes.getMemo().getFieldsMap().get(key);
}

SearchAttributes getSearchAttributes() {
return searchAttributes == null || searchAttributes.getIndexedFieldsCount() == 0
? null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.uber.m3.tally.Scope;
import io.temporal.activity.ActivityOptions;
import io.temporal.activity.LocalActivityOptions;
import io.temporal.api.common.v1.Payload;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.common.RetryOptions;
import io.temporal.common.converter.DataConverter;
Expand Down Expand Up @@ -428,6 +429,11 @@ public static WorkflowInfo getWorkflowInfo() {
return new WorkflowInfoImpl(getRootWorkflowContext().getContext());
}

public static <T> T getMemo(String key, Class<T> valueClass, Type valueType) {
Payload memo = getRootWorkflowContext().getContext().getMemo(key);
return getDataConverter().fromPayload(memo, valueClass, valueType);
}

public static <R> R retry(
RetryOptions options, Optional<Duration> expiration, Functions.Func<R> fn) {
return WorkflowRetryerInternal.validateOptionsAndRetry(options, expiration, fn);
Expand Down
64 changes: 45 additions & 19 deletions temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.temporal.activity.LocalActivityOptions;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.common.RetryOptions;
import io.temporal.common.converter.DataConverter;
import io.temporal.failure.ActivityFailure;
import io.temporal.failure.CanceledFailure;
import io.temporal.failure.ChildWorkflowFailure;
Expand Down Expand Up @@ -334,7 +335,7 @@
* <ul>
* <li>Do not use any mutable global variables because multiple instances of workflows are
* executed in parallel.
* <li>Do not call any non deterministic functions like non seeded random or {@link
* <li>Do not call any non-deterministic functions like non seeded random or {@link
* UUID#randomUUID()} directly form the workflow code. Always do this in activities.
* <li>Don’t perform any IO or service calls as they are not usually deterministic. Use activities
* for this.
Expand Down Expand Up @@ -615,6 +616,33 @@ public static WorkflowInfo getInfo() {
return WorkflowInternal.getWorkflowInfo();
}

/**
* Extract deserialized Memo associated with given key
*
* @param key memo key
* @param valueClass Java class to deserialize into
* @return deserialized Memo
*/
public static <T> Object getMemo(String key, Class<T> valueClass) {
return getMemo(key, valueClass, valueClass);
}

/**
* Extract Memo associated with the given key and deserialized into an object of generic type as
* is done here: {@link DataConverter#fromPayloads(int, java.util.Optional, java.lang.Class,
* java.lang.reflect.Type)} Ex: To deserialize into HashMap<String, Integer> <code>
* Workflow.getMemo(key, Map.class, new TypeToken<HashMap<String, Integer>>() {}.getType());
* </code>
*
* @param key memo key
* @param valueClass Java class to deserialize into
* @param genericType type parameter for the generic class
* @return deserialized Memo
*/
public static <T> T getMemo(String key, Class<T> valueClass, Type genericType) {
return WorkflowInternal.getMemo(key, valueClass, genericType);
}

/**
* Wraps the Runnable method argument in a {@link CancellationScope}. The {@link
* CancellationScope#run()} calls {@link Runnable#run()} on the wrapped Runnable. The returned
Expand All @@ -625,12 +653,11 @@ public static WorkflowInfo getInfo() {
* canceled and a {@link Promise} that contains their result will throw {@link CanceledFailure}
* when {@link Promise#get()} is called.
*
* <p>The new cancellation scope is linked to the parent one (available as {@link
* CancellationScope#current()}. If the parent one is canceled then all the children scopes are
* canceled automatically. The main workflow function (annotated with @{@link WorkflowMethod} is
* wrapped within a root cancellation scope which gets canceled when a workflow is canceled
* through the Temporal CancelWorkflowExecution API. To perform cleanup operations that require
* blocking after the current scope is canceled use a scope created through {@link
* <p>The new cancellation scope {@link CancellationScope#current()} is linked to the parent one.
* If the parent one is canceled then all the children scopes are wrapped within a root
* cancellation scope which gets canceled when a workflow is canceled through the Temporal
* CancelWorkflowExecution API. To perform cleanup operations that require blocking after the
* current scope is canceled use a scope created through {@link
* #newDetachedCancellationScope(Runnable)}.
*
* <p>Example of running activities in parallel and cancelling them after a specified timeout.
Expand Down Expand Up @@ -687,10 +714,10 @@ public static CancellationScope newCancellationScope(Functions.Proc1<Cancellatio
}

/**
* Creates a CancellationScope that is not linked to a parent scope. {@link
* CancellationScope#run()} must be called to execute the code the scope wraps. The detached scope
* is needed to execute cleanup code after a workflow is canceled which cancels the root scope
* that wraps the @WorkflowMethod invocation. Here is an example usage:
* Creates a CancellationScope {@link CancellationScope#run()} that is not linked to a parent
* scope must be called to execute the code the scope wraps. The detached scope is needed to
* execute cleanup code after a workflow is canceled which cancels the root scope that wraps
* the @WorkflowMethod invocation. Here is an example usage:
*
* <pre><code>
* try {
Expand Down Expand Up @@ -840,8 +867,7 @@ public static <R> R retry(

/**
* Invokes function retrying in case of failures according to retry options. Synchronous variant.
* Use {@link Async#retry(RetryOptions, Optional, Functions.Func)} (RetryOptions, Optional, Func)}
* for asynchronous functions.
* Use {@link Async#retry(RetryOptions, Optional, Functions.Func)} for asynchronous functions.
*
* @param options retry options that specify retry policy
* @param expiration if specified stop retrying after this interval
Expand Down Expand Up @@ -895,7 +921,7 @@ public static RuntimeException wrap(Exception e) {
* Replay safe way to generate UUID.
*
* <p>Must be used instead of {@link UUID#randomUUID()} which relies on a random generator, thus
* leads to non deterministic code which is prohibited inside a workflow.
* leads to non-deterministic code which is prohibited inside a workflow.
*/
public static UUID randomUUID() {
return WorkflowInternal.randomUUID();
Expand Down Expand Up @@ -945,7 +971,7 @@ public static boolean isReplaying() {
* </code></pre>
*
* On replay the provided function is not executed, the random will always be 0, and the workflow
* could takes a different path breaking the determinism.
* could take a different path breaking the determinism.
*
* <p>Here is the correct way to use sideEffect:
*
Expand Down Expand Up @@ -999,7 +1025,7 @@ public static <R> R sideEffect(Class<R> resultClass, Func<R> func) {
* </code></pre>
*
* On replay the provided function is not executed, the random will always be 0, and the workflow
* could takes a different path breaking the determinism.
* could take a different path breaking the determinism.
*
* <p>Here is the correct way to use sideEffect:
*
Expand Down Expand Up @@ -1052,7 +1078,7 @@ public static <R> R sideEffect(Class<R> resultClass, Type resultType, Func<R> fu
* if call to updated with stored and a new value as arguments returns true. It is not called
* for the first value.
* @param resultClass class of the side effect
* @param func function that produces a value. This function can contain non deterministic code.
* @param func function that produces a value. This function can contain non-deterministic code.
* @see #sideEffect(Class, Functions.Func)
*/
public static <R> R mutableSideEffect(
Expand Down Expand Up @@ -1087,7 +1113,7 @@ public static <R> R mutableSideEffect(
* for the first value.
* @param resultClass class of the side effect
* @param resultType type of the side effect. Differs from resultClass for generic types.
* @param func function that produces a value. This function can contain non deterministic code.
* @param func function that produces a value. This function can contain non-deterministic code.
* @see #sideEffect(Class, Functions.Func)
*/
public static <R> R mutableSideEffect(
Expand Down Expand Up @@ -1188,7 +1214,7 @@ public static int getVersion(String changeId, int minSupported, int maxSupported

/**
* Get scope for reporting business metrics in workflow logic. This should be used instead of
* creating new metrics scopes as it is able to dedup metrics during replay.
* creating new metrics scopes as it is able to dedupe metrics during replay.
*
* <p>The original metrics scope is set through {@link WorkerOptions} when a worker starts up.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ public void testActivitySimulatedTimeout() {
TestWorkflow1 workflow = client.newWorkflowStub(TestWorkflow1.class, options);
try {
workflow.execute("input1");
fail("unreacheable");
fail("unreachable");
} catch (WorkflowException e) {
assertTrue(e.getCause() instanceof ActivityFailure);
TimeoutFailure te = (TimeoutFailure) e.getCause().getCause();
Expand Down
107 changes: 77 additions & 30 deletions temporal-sdk/src/test/java/io/temporal/workflow/MemoTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@

package io.temporal.workflow;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import com.google.common.collect.ImmutableMap;
import com.google.common.reflect.TypeToken;
import com.google.protobuf.ByteString;
import com.uber.m3.tally.NoopScope;
import io.temporal.api.common.v1.Memo;
Expand All @@ -34,50 +39,92 @@
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.shared.TestMultiArgWorkflowFunctions.TestMultiArgWorkflowImpl;
import io.temporal.workflow.shared.TestMultiArgWorkflowFunctions.TestNoArgsWorkflowFunc;
import io.temporal.workflow.shared.TestWorkflows.NoArgsWorkflow;
import java.util.HashMap;
import java.util.Map;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

public class MemoTest {

private static final String MEMO_KEY = "testKey";
private static final String MEMO_VALUE = "testValue";
private static final String MEMO_KEY_2 = "testKey2";
private static final Integer MEMO_VALUE_2 = 1;
private static final Map<String, Object> MEMO =
ImmutableMap.of(
MEMO_KEY,
MEMO_VALUE,
MEMO_KEY_2,
new HashMap<String, Integer>() {
{
put(MEMO_KEY_2, MEMO_VALUE_2);
}
});

@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder().setWorkflowTypes(TestMultiArgWorkflowImpl.class).build();
SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(TestMultiArgWorkflowImpl.class, WorkflowImpl.class)
.build();

@Test
public void testMemo() {
if (testWorkflowRule.getTestEnvironment() != null) {
String testMemoKey = "testKey";
String testMemoValue = "testValue";
Map<String, Object> memo = new HashMap<String, Object>();
memo.put(testMemoKey, testMemoValue);
if (testWorkflowRule.getTestEnvironment() == null) {
return;
}

WorkflowOptions workflowOptions =
SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue())
.toBuilder()
.setTaskQueue(testWorkflowRule.getTaskQueue())
.setMemo(MEMO)
.build();

TestNoArgsWorkflowFunc stubF =
testWorkflowRule
.getWorkflowClient()
.newWorkflowStub(TestNoArgsWorkflowFunc.class, workflowOptions);
WorkflowExecution executionF = WorkflowClient.start(stubF::func);

WorkflowOptions workflowOptions =
SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue())
.toBuilder()
.setMemo(memo)
.build();
TestNoArgsWorkflowFunc stubF =
testWorkflowRule
.getWorkflowClient()
.newWorkflowStub(TestNoArgsWorkflowFunc.class, workflowOptions);
WorkflowExecution executionF = WorkflowClient.start(stubF::func);
GetWorkflowExecutionHistoryResponse historyResp =
WorkflowExecutionUtils.getHistoryPage(
testWorkflowRule.getTestEnvironment().getWorkflowService(),
SDKTestWorkflowRule.NAMESPACE,
executionF,
ByteString.EMPTY,
new NoopScope());
HistoryEvent startEvent = historyResp.getHistory().getEvents(0);
Memo memoFromEvent = startEvent.getWorkflowExecutionStartedEventAttributes().getMemo();
Payload memoBytes = memoFromEvent.getFieldsMap().get(MEMO_KEY);
String memoRetrieved =
GsonJsonPayloadConverter.getInstance().fromData(memoBytes, String.class, String.class);
assertEquals(MEMO_VALUE, memoRetrieved);
}

@Test
public void testMemoInWorkflow() {
WorkflowOptions workflowOptions =
SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue())
.toBuilder()
.setTaskQueue(testWorkflowRule.getTaskQueue())
.setMemo(MEMO)
.build();

NoArgsWorkflow workflow =
testWorkflowRule.getWorkflowClient().newWorkflowStub(NoArgsWorkflow.class, workflowOptions);
workflow.execute();
}

GetWorkflowExecutionHistoryResponse historyResp =
WorkflowExecutionUtils.getHistoryPage(
testWorkflowRule.getTestEnvironment().getWorkflowService(),
SDKTestWorkflowRule.NAMESPACE,
executionF,
ByteString.EMPTY,
new NoopScope());
HistoryEvent startEvent = historyResp.getHistory().getEvents(0);
Memo memoFromEvent = startEvent.getWorkflowExecutionStartedEventAttributes().getMemo();
Payload memoBytes = memoFromEvent.getFieldsMap().get(testMemoKey);
String memoRetrieved =
GsonJsonPayloadConverter.getInstance().fromData(memoBytes, String.class, String.class);
Assert.assertEquals(testMemoValue, memoRetrieved);
public static class WorkflowImpl implements NoArgsWorkflow {
@Override
public void execute() {
assertEquals(MEMO_VALUE, Workflow.getMemo(MEMO_KEY, String.class));
Map result =
Workflow.getMemo(
MEMO_KEY_2, Map.class, new TypeToken<HashMap<String, Integer>>() {}.getType());
assertTrue(result instanceof HashMap);
assertEquals(MEMO_VALUE_2, result.get(MEMO_KEY_2));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.uber.m3.tally.Scope;
import io.temporal.api.command.v1.ContinueAsNewWorkflowExecutionCommandAttributes;
import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes;
import io.temporal.api.common.v1.Payload;
import io.temporal.api.common.v1.Payloads;
import io.temporal.api.common.v1.SearchAttributes;
import io.temporal.api.common.v1.WorkflowExecution;
Expand Down Expand Up @@ -141,6 +142,11 @@ public Duration getWorkflowTaskTimeout() {
throw new UnsupportedOperationException("not implemented");
}

@Override
public Payload getMemo(String key) {
throw new UnsupportedOperationException("not implemented");
}

@Override
public SearchAttributes getSearchAttributes() {
throw new UnsupportedOperationException("not implemented");
Expand Down