From 2ad4b7ab47e1e76b5052311afa67a9be4b107571 Mon Sep 17 00:00:00 2001 From: Vera Kobylchak Date: Wed, 18 Aug 2021 14:53:10 -0700 Subject: [PATCH 1/6] Added getMemo to Workflow --- .../replay/ReplayWorkflowContext.java | 8 ++ .../replay/ReplayWorkflowContextImpl.java | 6 ++ .../internal/replay/WorkflowContext.java | 4 + .../internal/sync/WorkflowInternal.java | 5 + .../java/io/temporal/workflow/Workflow.java | 5 + .../internal/testing/WorkflowTestingTest.java | 2 +- .../java/io/temporal/workflow/MemoTest.java | 91 ++++++++++++------- .../sync/DummySyncWorkflowContext.java | 6 ++ 8 files changed, 95 insertions(+), 32 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java index 587fd33566..9f0e96311c 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java @@ -22,6 +22,7 @@ import com.uber.m3.tally.Scope; import io.temporal.api.command.v1.ContinueAsNewWorkflowExecutionCommandAttributes; import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes; +import io.temporal.api.common.v1.Payload; import io.temporal.api.common.v1.Payloads; import io.temporal.api.common.v1.SearchAttributes; import io.temporal.api.common.v1.WorkflowExecution; @@ -94,6 +95,13 @@ public interface ReplayWorkflowContext extends ReplayAware { Duration getWorkflowTaskTimeout(); + /** + * Used to retrieve the memo. + * + * @return Memo object + */ + Payload getMemo(String key); + /** * Used to retrieve search attributes. * diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java index 690d6c4433..62c9cc54aa 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java @@ -25,6 +25,7 @@ import io.temporal.api.command.v1.ScheduleActivityTaskCommandAttributes; import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes; import io.temporal.api.command.v1.StartTimerCommandAttributes; +import io.temporal.api.common.v1.Payload; import io.temporal.api.common.v1.Payloads; import io.temporal.api.common.v1.SearchAttributes; import io.temporal.api.common.v1.WorkflowExecution; @@ -191,6 +192,11 @@ public long getWorkflowExecutionExpirationTimestampMillis() { return workflowContext.getWorkflowExecutionExpirationTimestampMillis(); } + @Override + public Payload getMemo(String key) { + return workflowContext.getMemo(key); + } + @Override public SearchAttributes getSearchAttributes() { return workflowContext.getSearchAttributes(); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/WorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/WorkflowContext.java index 7613233fc8..91fbf37c89 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/WorkflowContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/WorkflowContext.java @@ -149,6 +149,10 @@ public Map getHeader() { return startedAttributes.getHeader().getFieldsMap(); } + public Payload getMemo(String key) { + return startedAttributes.getMemo().getFieldsMap().get(key); + } + SearchAttributes getSearchAttributes() { return searchAttributes == null || searchAttributes.getIndexedFieldsCount() == 0 ? null diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java index 6dd5bf87b5..d34a3eecf0 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java @@ -25,6 +25,7 @@ import com.uber.m3.tally.Scope; import io.temporal.activity.ActivityOptions; import io.temporal.activity.LocalActivityOptions; +import io.temporal.api.common.v1.Payload; import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.common.RetryOptions; import io.temporal.common.converter.DataConverter; @@ -428,6 +429,10 @@ public static WorkflowInfo getWorkflowInfo() { return new WorkflowInfoImpl(getRootWorkflowContext().getContext()); } + public static Payload getMemo(String key) { + return getRootWorkflowContext().getContext().getMemo(key); + } + public static R retry( RetryOptions options, Optional expiration, Functions.Func fn) { return WorkflowRetryerInternal.validateOptionsAndRetry(options, expiration, fn); diff --git a/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java b/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java index 0a4cf87d4f..4f6ed19ece 100644 --- a/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java +++ b/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java @@ -22,6 +22,7 @@ import com.uber.m3.tally.Scope; import io.temporal.activity.ActivityOptions; import io.temporal.activity.LocalActivityOptions; +import io.temporal.api.common.v1.Payload; import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.common.RetryOptions; import io.temporal.failure.ActivityFailure; @@ -615,6 +616,10 @@ public static WorkflowInfo getInfo() { return WorkflowInternal.getWorkflowInfo(); } + public static Payload getMemo(String key) { + return WorkflowInternal.getMemo(key); + } + /** * Wraps the Runnable method argument in a {@link CancellationScope}. The {@link * CancellationScope#run()} calls {@link Runnable#run()} on the wrapped Runnable. The returned diff --git a/temporal-sdk/src/test/java/io/temporal/internal/testing/WorkflowTestingTest.java b/temporal-sdk/src/test/java/io/temporal/internal/testing/WorkflowTestingTest.java index 3b57cb47da..536aa5e039 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/testing/WorkflowTestingTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/testing/WorkflowTestingTest.java @@ -204,7 +204,7 @@ public void testActivitySimulatedTimeout() { TestWorkflow1 workflow = client.newWorkflowStub(TestWorkflow1.class, options); try { workflow.execute("input1"); - fail("unreacheable"); + fail("unreachable"); } catch (WorkflowException e) { assertTrue(e.getCause() instanceof ActivityFailure); TimeoutFailure te = (TimeoutFailure) e.getCause().getCause(); diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/MemoTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/MemoTest.java index 98902a3ccc..5c5f888f4e 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/MemoTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/MemoTest.java @@ -19,8 +19,12 @@ package io.temporal.workflow; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + import com.google.protobuf.ByteString; import com.uber.m3.tally.NoopScope; +import com.uber.m3.util.ImmutableMap; import io.temporal.api.common.v1.Memo; import io.temporal.api.common.v1.Payload; import io.temporal.api.common.v1.WorkflowExecution; @@ -34,50 +38,75 @@ import io.temporal.testing.internal.SDKTestWorkflowRule; import io.temporal.workflow.shared.TestMultiArgWorkflowFunctions.TestMultiArgWorkflowImpl; import io.temporal.workflow.shared.TestMultiArgWorkflowFunctions.TestNoArgsWorkflowFunc; -import java.util.HashMap; +import io.temporal.workflow.shared.TestWorkflows.NoArgsWorkflow; import java.util.Map; -import org.junit.Assert; import org.junit.Rule; import org.junit.Test; public class MemoTest { + private static final String MEMO_KEY = "testKey"; + private static final String MEMO_VALUE = "testValue"; + private static final Map MEMO = ImmutableMap.of(MEMO_KEY, MEMO_VALUE); + @Rule public SDKTestWorkflowRule testWorkflowRule = - SDKTestWorkflowRule.newBuilder().setWorkflowTypes(TestMultiArgWorkflowImpl.class).build(); + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(TestMultiArgWorkflowImpl.class, WorkflowImpl.class) + .build(); @Test public void testMemo() { - if (testWorkflowRule.getTestEnvironment() != null) { - String testMemoKey = "testKey"; - String testMemoValue = "testValue"; - Map memo = new HashMap(); - memo.put(testMemoKey, testMemoValue); + if (testWorkflowRule.getTestEnvironment() == null) { + return; + } + + WorkflowOptions workflowOptions = + SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()) + .toBuilder() + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .setMemo(MEMO) + .build(); + + TestNoArgsWorkflowFunc stubF = + testWorkflowRule + .getWorkflowClient() + .newWorkflowStub(TestNoArgsWorkflowFunc.class, workflowOptions); + WorkflowExecution executionF = WorkflowClient.start(stubF::func); - WorkflowOptions workflowOptions = - SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()) - .toBuilder() - .setMemo(memo) - .build(); - TestNoArgsWorkflowFunc stubF = - testWorkflowRule - .getWorkflowClient() - .newWorkflowStub(TestNoArgsWorkflowFunc.class, workflowOptions); - WorkflowExecution executionF = WorkflowClient.start(stubF::func); + GetWorkflowExecutionHistoryResponse historyResp = + WorkflowExecutionUtils.getHistoryPage( + testWorkflowRule.getTestEnvironment().getWorkflowService(), + SDKTestWorkflowRule.NAMESPACE, + executionF, + ByteString.EMPTY, + new NoopScope()); + HistoryEvent startEvent = historyResp.getHistory().getEvents(0); + Memo memoFromEvent = startEvent.getWorkflowExecutionStartedEventAttributes().getMemo(); + Payload memoBytes = memoFromEvent.getFieldsMap().get(MEMO_KEY); + String memoRetrieved = + GsonJsonPayloadConverter.getInstance().fromData(memoBytes, String.class, String.class); + assertEquals(MEMO_VALUE, memoRetrieved); + } + + @Test + public void testMemoInWorkflow() { + WorkflowOptions workflowOptions = + SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()) + .toBuilder() + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .setMemo(MEMO) + .build(); + + NoArgsWorkflow workflow = + testWorkflowRule.getWorkflowClient().newWorkflowStub(NoArgsWorkflow.class, workflowOptions); + workflow.execute(); + } - GetWorkflowExecutionHistoryResponse historyResp = - WorkflowExecutionUtils.getHistoryPage( - testWorkflowRule.getTestEnvironment().getWorkflowService(), - SDKTestWorkflowRule.NAMESPACE, - executionF, - ByteString.EMPTY, - new NoopScope()); - HistoryEvent startEvent = historyResp.getHistory().getEvents(0); - Memo memoFromEvent = startEvent.getWorkflowExecutionStartedEventAttributes().getMemo(); - Payload memoBytes = memoFromEvent.getFieldsMap().get(testMemoKey); - String memoRetrieved = - GsonJsonPayloadConverter.getInstance().fromData(memoBytes, String.class, String.class); - Assert.assertEquals(testMemoValue, memoRetrieved); + public static class WorkflowImpl implements NoArgsWorkflow { + @Override + public void execute() { + assertNotNull(Workflow.getMemo(MEMO_KEY)); } } } diff --git a/temporal-testing/src/main/java/io/temporal/internal/sync/DummySyncWorkflowContext.java b/temporal-testing/src/main/java/io/temporal/internal/sync/DummySyncWorkflowContext.java index 5e0c7d1daa..2a56ad2ca5 100644 --- a/temporal-testing/src/main/java/io/temporal/internal/sync/DummySyncWorkflowContext.java +++ b/temporal-testing/src/main/java/io/temporal/internal/sync/DummySyncWorkflowContext.java @@ -23,6 +23,7 @@ import com.uber.m3.tally.Scope; import io.temporal.api.command.v1.ContinueAsNewWorkflowExecutionCommandAttributes; import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes; +import io.temporal.api.common.v1.Payload; import io.temporal.api.common.v1.Payloads; import io.temporal.api.common.v1.SearchAttributes; import io.temporal.api.common.v1.WorkflowExecution; @@ -141,6 +142,11 @@ public Duration getWorkflowTaskTimeout() { throw new UnsupportedOperationException("not implemented"); } + @Override + public Payload getMemo(String key) { + throw new UnsupportedOperationException("not implemented"); + } + @Override public SearchAttributes getSearchAttributes() { throw new UnsupportedOperationException("not implemented"); From e5d3824ab198f5708a97bc2d552091a4da218b2d Mon Sep 17 00:00:00 2001 From: Vera Kobylchak Date: Thu, 19 Aug 2021 12:21:26 -0700 Subject: [PATCH 2/6] Memo getter returns object --- .../internal/sync/WorkflowInternal.java | 5 +++-- .../java/io/temporal/workflow/Workflow.java | 9 +++++--- .../java/io/temporal/workflow/MemoTest.java | 21 ++++++++++++++++--- 3 files changed, 27 insertions(+), 8 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java index d34a3eecf0..4d8fdcabdb 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java @@ -429,8 +429,9 @@ public static WorkflowInfo getWorkflowInfo() { return new WorkflowInfoImpl(getRootWorkflowContext().getContext()); } - public static Payload getMemo(String key) { - return getRootWorkflowContext().getContext().getMemo(key); + public static T getMemo(String key, Class valueClass, Type valueType) { + Payload memo = getRootWorkflowContext().getContext().getMemo(key); + return DataConverter.getDefaultInstance().fromPayload(memo, valueClass, valueType); } public static R retry( diff --git a/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java b/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java index 4f6ed19ece..d861453fe2 100644 --- a/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java +++ b/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java @@ -22,7 +22,6 @@ import com.uber.m3.tally.Scope; import io.temporal.activity.ActivityOptions; import io.temporal.activity.LocalActivityOptions; -import io.temporal.api.common.v1.Payload; import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.common.RetryOptions; import io.temporal.failure.ActivityFailure; @@ -616,8 +615,12 @@ public static WorkflowInfo getInfo() { return WorkflowInternal.getWorkflowInfo(); } - public static Payload getMemo(String key) { - return WorkflowInternal.getMemo(key); + public static Object getMemo(String key, Type valueType) { + return getMemo(key, valueType.getClass(), valueType); + } + + public static T getMemo(String key, Class valueClass, Type valueType) { + return WorkflowInternal.getMemo(key, valueClass, valueType); } /** diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/MemoTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/MemoTest.java index 5c5f888f4e..6d347f866e 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/MemoTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/MemoTest.java @@ -20,7 +20,7 @@ package io.temporal.workflow; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import com.google.protobuf.ByteString; import com.uber.m3.tally.NoopScope; @@ -39,6 +39,7 @@ import io.temporal.workflow.shared.TestMultiArgWorkflowFunctions.TestMultiArgWorkflowImpl; import io.temporal.workflow.shared.TestMultiArgWorkflowFunctions.TestNoArgsWorkflowFunc; import io.temporal.workflow.shared.TestWorkflows.NoArgsWorkflow; +import java.util.HashMap; import java.util.Map; import org.junit.Rule; import org.junit.Test; @@ -47,7 +48,18 @@ public class MemoTest { private static final String MEMO_KEY = "testKey"; private static final String MEMO_VALUE = "testValue"; - private static final Map MEMO = ImmutableMap.of(MEMO_KEY, MEMO_VALUE); + private static final String MEMO_KEY_2 = "testKey2"; + private static final Integer MEMO_VALUE_2 = 1; + private static final Map MEMO = + ImmutableMap.of( + MEMO_KEY, + MEMO_VALUE, + MEMO_KEY_2, + new HashMap() { + { + put(MEMO_KEY_2, MEMO_VALUE_2); + } + }); @Rule public SDKTestWorkflowRule testWorkflowRule = @@ -106,7 +118,10 @@ public void testMemoInWorkflow() { public static class WorkflowImpl implements NoArgsWorkflow { @Override public void execute() { - assertNotNull(Workflow.getMemo(MEMO_KEY)); + assertEquals(MEMO_VALUE, Workflow.getMemo(MEMO_KEY, String.class)); + Map result = Workflow.getMemo(MEMO_KEY_2, Map.class, HashMap.class); + assertTrue(result instanceof HashMap); + assertEquals(MEMO_VALUE_2, result.get(MEMO_KEY_2)); } } } From c2f3e5254c36899b2a93e227bedd3ae5ffe598b4 Mon Sep 17 00:00:00 2001 From: Vera Kobylchak Date: Thu, 19 Aug 2021 13:26:22 -0700 Subject: [PATCH 3/6] Mixed up class and type --- .../src/main/java/io/temporal/workflow/Workflow.java | 4 ++-- .../src/test/java/io/temporal/workflow/MemoTest.java | 5 ++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java b/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java index d861453fe2..13cdb57951 100644 --- a/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java +++ b/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java @@ -615,8 +615,8 @@ public static WorkflowInfo getInfo() { return WorkflowInternal.getWorkflowInfo(); } - public static Object getMemo(String key, Type valueType) { - return getMemo(key, valueType.getClass(), valueType); + public static Object getMemo(String key, Class valueClass) { + return getMemo(key, valueClass, valueClass.getGenericSuperclass()); } public static T getMemo(String key, Class valueClass, Type valueType) { diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/MemoTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/MemoTest.java index 6d347f866e..c1c58879e6 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/MemoTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/MemoTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import com.google.common.reflect.TypeToken; import com.google.protobuf.ByteString; import com.uber.m3.tally.NoopScope; import com.uber.m3.util.ImmutableMap; @@ -119,7 +120,9 @@ public static class WorkflowImpl implements NoArgsWorkflow { @Override public void execute() { assertEquals(MEMO_VALUE, Workflow.getMemo(MEMO_KEY, String.class)); - Map result = Workflow.getMemo(MEMO_KEY_2, Map.class, HashMap.class); + Map result = + Workflow.getMemo( + MEMO_KEY_2, Map.class, new TypeToken>() {}.getType()); assertTrue(result instanceof HashMap); assertEquals(MEMO_VALUE_2, result.get(MEMO_KEY_2)); } From 444cb78fb00348adcbce4ff8510fd35bc71a68e9 Mon Sep 17 00:00:00 2001 From: Vera Kobylchak Date: Thu, 19 Aug 2021 12:21:26 -0700 Subject: [PATCH 4/6] Memo getter returns object --- .../internal/sync/WorkflowInternal.java | 5 ++- .../java/io/temporal/workflow/Workflow.java | 45 ++++++++++--------- .../java/io/temporal/workflow/MemoTest.java | 24 ++++++++-- 3 files changed, 47 insertions(+), 27 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java index d34a3eecf0..4d8fdcabdb 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java @@ -429,8 +429,9 @@ public static WorkflowInfo getWorkflowInfo() { return new WorkflowInfoImpl(getRootWorkflowContext().getContext()); } - public static Payload getMemo(String key) { - return getRootWorkflowContext().getContext().getMemo(key); + public static T getMemo(String key, Class valueClass, Type valueType) { + Payload memo = getRootWorkflowContext().getContext().getMemo(key); + return DataConverter.getDefaultInstance().fromPayload(memo, valueClass, valueType); } public static R retry( diff --git a/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java b/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java index 4f6ed19ece..c594b09f5e 100644 --- a/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java +++ b/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java @@ -22,7 +22,6 @@ import com.uber.m3.tally.Scope; import io.temporal.activity.ActivityOptions; import io.temporal.activity.LocalActivityOptions; -import io.temporal.api.common.v1.Payload; import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.common.RetryOptions; import io.temporal.failure.ActivityFailure; @@ -335,7 +334,7 @@ *
    *
  • Do not use any mutable global variables because multiple instances of workflows are * executed in parallel. - *
  • Do not call any non deterministic functions like non seeded random or {@link + *
  • Do not call any non-deterministic functions like non seeded random or {@link * UUID#randomUUID()} directly form the workflow code. Always do this in activities. *
  • Don’t perform any IO or service calls as they are not usually deterministic. Use activities * for this. @@ -616,8 +615,12 @@ public static WorkflowInfo getInfo() { return WorkflowInternal.getWorkflowInfo(); } - public static Payload getMemo(String key) { - return WorkflowInternal.getMemo(key); + public static Object getMemo(String key, Class valueClass) { + return getMemo(key, valueClass, valueClass.getGenericSuperclass()); + } + + public static T getMemo(String key, Class valueClass, Type valueType) { + return WorkflowInternal.getMemo(key, valueClass, valueType); } /** @@ -630,12 +633,11 @@ public static Payload getMemo(String key) { * canceled and a {@link Promise} that contains their result will throw {@link CanceledFailure} * when {@link Promise#get()} is called. * - *

    The new cancellation scope is linked to the parent one (available as {@link - * CancellationScope#current()}. If the parent one is canceled then all the children scopes are - * canceled automatically. The main workflow function (annotated with @{@link WorkflowMethod} is - * wrapped within a root cancellation scope which gets canceled when a workflow is canceled - * through the Temporal CancelWorkflowExecution API. To perform cleanup operations that require - * blocking after the current scope is canceled use a scope created through {@link + *

    The new cancellation scope {@link CancellationScope#current()} is linked to the parent one. + * If the parent one is canceled then all the children scopes are wrapped within a root + * cancellation scope which gets canceled when a workflow is canceled through the Temporal + * CancelWorkflowExecution API. To perform cleanup operations that require blocking after the + * current scope is canceled use a scope created through {@link * #newDetachedCancellationScope(Runnable)}. * *

    Example of running activities in parallel and cancelling them after a specified timeout. @@ -692,10 +694,10 @@ public static CancellationScope newCancellationScope(Functions.Proc1 * try { @@ -845,8 +847,7 @@ public static R retry( /** * Invokes function retrying in case of failures according to retry options. Synchronous variant. - * Use {@link Async#retry(RetryOptions, Optional, Functions.Func)} (RetryOptions, Optional, Func)} - * for asynchronous functions. + * Use {@link Async#retry(RetryOptions, Optional, Functions.Func)} for asynchronous functions. * * @param options retry options that specify retry policy * @param expiration if specified stop retrying after this interval @@ -900,7 +901,7 @@ public static RuntimeException wrap(Exception e) { * Replay safe way to generate UUID. * *

    Must be used instead of {@link UUID#randomUUID()} which relies on a random generator, thus - * leads to non deterministic code which is prohibited inside a workflow. + * leads to non-deterministic code which is prohibited inside a workflow. */ public static UUID randomUUID() { return WorkflowInternal.randomUUID(); @@ -950,7 +951,7 @@ public static boolean isReplaying() { * * * On replay the provided function is not executed, the random will always be 0, and the workflow - * could takes a different path breaking the determinism. + * could take a different path breaking the determinism. * *

    Here is the correct way to use sideEffect: * @@ -1004,7 +1005,7 @@ public static R sideEffect(Class resultClass, Func func) { * * * On replay the provided function is not executed, the random will always be 0, and the workflow - * could takes a different path breaking the determinism. + * could take a different path breaking the determinism. * *

    Here is the correct way to use sideEffect: * @@ -1057,7 +1058,7 @@ public static R sideEffect(Class resultClass, Type resultType, Func fu * if call to updated with stored and a new value as arguments returns true. It is not called * for the first value. * @param resultClass class of the side effect - * @param func function that produces a value. This function can contain non deterministic code. + * @param func function that produces a value. This function can contain non-deterministic code. * @see #sideEffect(Class, Functions.Func) */ public static R mutableSideEffect( @@ -1092,7 +1093,7 @@ public static R mutableSideEffect( * for the first value. * @param resultClass class of the side effect * @param resultType type of the side effect. Differs from resultClass for generic types. - * @param func function that produces a value. This function can contain non deterministic code. + * @param func function that produces a value. This function can contain non-deterministic code. * @see #sideEffect(Class, Functions.Func) */ public static R mutableSideEffect( @@ -1193,7 +1194,7 @@ public static int getVersion(String changeId, int minSupported, int maxSupported /** * Get scope for reporting business metrics in workflow logic. This should be used instead of - * creating new metrics scopes as it is able to dedup metrics during replay. + * creating new metrics scopes as it is able to dedupe metrics during replay. * *

    The original metrics scope is set through {@link WorkerOptions} when a worker starts up. */ diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/MemoTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/MemoTest.java index 5c5f888f4e..c1c58879e6 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/MemoTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/MemoTest.java @@ -20,8 +20,9 @@ package io.temporal.workflow; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import com.google.common.reflect.TypeToken; import com.google.protobuf.ByteString; import com.uber.m3.tally.NoopScope; import com.uber.m3.util.ImmutableMap; @@ -39,6 +40,7 @@ import io.temporal.workflow.shared.TestMultiArgWorkflowFunctions.TestMultiArgWorkflowImpl; import io.temporal.workflow.shared.TestMultiArgWorkflowFunctions.TestNoArgsWorkflowFunc; import io.temporal.workflow.shared.TestWorkflows.NoArgsWorkflow; +import java.util.HashMap; import java.util.Map; import org.junit.Rule; import org.junit.Test; @@ -47,7 +49,18 @@ public class MemoTest { private static final String MEMO_KEY = "testKey"; private static final String MEMO_VALUE = "testValue"; - private static final Map MEMO = ImmutableMap.of(MEMO_KEY, MEMO_VALUE); + private static final String MEMO_KEY_2 = "testKey2"; + private static final Integer MEMO_VALUE_2 = 1; + private static final Map MEMO = + ImmutableMap.of( + MEMO_KEY, + MEMO_VALUE, + MEMO_KEY_2, + new HashMap() { + { + put(MEMO_KEY_2, MEMO_VALUE_2); + } + }); @Rule public SDKTestWorkflowRule testWorkflowRule = @@ -106,7 +119,12 @@ public void testMemoInWorkflow() { public static class WorkflowImpl implements NoArgsWorkflow { @Override public void execute() { - assertNotNull(Workflow.getMemo(MEMO_KEY)); + assertEquals(MEMO_VALUE, Workflow.getMemo(MEMO_KEY, String.class)); + Map result = + Workflow.getMemo( + MEMO_KEY_2, Map.class, new TypeToken>() {}.getType()); + assertTrue(result instanceof HashMap); + assertEquals(MEMO_VALUE_2, result.get(MEMO_KEY_2)); } } } From 11e3afc2ad95e5bd4e03ea0391984193d185fc47 Mon Sep 17 00:00:00 2001 From: Vera Kobylchak Date: Tue, 24 Aug 2021 23:33:40 -0700 Subject: [PATCH 5/6] Adde javadocs --- .../internal/sync/WorkflowInternal.java | 2 +- .../java/io/temporal/workflow/Workflow.java | 26 ++++++++++++++++--- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java index 4d8fdcabdb..3ad3d11e2c 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java @@ -431,7 +431,7 @@ public static WorkflowInfo getWorkflowInfo() { public static T getMemo(String key, Class valueClass, Type valueType) { Payload memo = getRootWorkflowContext().getContext().getMemo(key); - return DataConverter.getDefaultInstance().fromPayload(memo, valueClass, valueType); + return getDataConverter().fromPayload(memo, valueClass, valueType); } public static R retry( diff --git a/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java b/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java index c594b09f5e..a4dd506609 100644 --- a/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java +++ b/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java @@ -24,6 +24,7 @@ import io.temporal.activity.LocalActivityOptions; import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.common.RetryOptions; +import io.temporal.common.converter.DataConverter; import io.temporal.failure.ActivityFailure; import io.temporal.failure.CanceledFailure; import io.temporal.failure.ChildWorkflowFailure; @@ -615,12 +616,31 @@ public static WorkflowInfo getInfo() { return WorkflowInternal.getWorkflowInfo(); } + /** + * Extract deserialized Memo associated with given key + * + * @param key memo key + * @param valueClass Java class to deserialize into + * @return deserialized Memo + */ public static Object getMemo(String key, Class valueClass) { - return getMemo(key, valueClass, valueClass.getGenericSuperclass()); + return getMemo(key, valueClass, valueClass); } - public static T getMemo(String key, Class valueClass, Type valueType) { - return WorkflowInternal.getMemo(key, valueClass, valueType); + /** + * Extract Memo associated with the given key and deserialized into an object of generic type as + * is done here: {@link DataConverter#fromPayloads(int, java.util.Optional, java.lang.Class, + * java.lang.reflect.Type)} Ex: To deserialize into HashMap + * Workflow.getMemo(key, Map.class, new TypeToken>() {}.getType()); + * + * + * @param key memo key + * @param valueClass Java class to deserialize into + * @param genericType type parameter for the generic class + * @return deserialized Memo + */ + public static T getMemo(String key, Class valueClass, Type genericType) { + return WorkflowInternal.getMemo(key, valueClass, genericType); } /** From 50906b0d0635347a68d0fc8dfff8a6f1867794e3 Mon Sep 17 00:00:00 2001 From: Vera Kobylchak Date: Thu, 26 Aug 2021 13:03:29 -0700 Subject: [PATCH 6/6] Minor changes --- .../temporal/internal/replay/ReplayWorkflowContext.java | 5 ----- .../main/java/io/temporal/internal/worker/Poller.java | 2 +- .../src/test/java/io/temporal/workflow/MemoTest.java | 2 +- .../java/io/temporal/serviceclient/RpcRetryOptions.java | 9 +++------ 4 files changed, 5 insertions(+), 13 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java index 9f0e96311c..1cf2d15a0a 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java @@ -95,11 +95,6 @@ public interface ReplayWorkflowContext extends ReplayAware { Duration getWorkflowTaskTimeout(); - /** - * Used to retrieve the memo. - * - * @return Memo object - */ Payload getMemo(String key); /** diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/Poller.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/Poller.java index 1f4a103cad..ee96a8ae52 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/Poller.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/Poller.java @@ -22,9 +22,9 @@ import com.uber.m3.tally.Scope; import io.grpc.Status; import io.grpc.StatusRuntimeException; +import io.temporal.internal.BackoffThrottler; import io.temporal.internal.common.InternalUtils; import io.temporal.internal.metrics.MetricsType; -import io.temporal.internal.BackoffThrottler; import java.util.Objects; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CountDownLatch; diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/MemoTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/MemoTest.java index c1c58879e6..a5be2570a1 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/MemoTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/MemoTest.java @@ -22,10 +22,10 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import com.google.common.collect.ImmutableMap; import com.google.common.reflect.TypeToken; import com.google.protobuf.ByteString; import com.uber.m3.tally.NoopScope; -import com.uber.m3.util.ImmutableMap; import io.temporal.api.common.v1.Memo; import io.temporal.api.common.v1.Payload; import io.temporal.api.common.v1.WorkflowExecution; diff --git a/temporal-serviceclient/src/main/java/io/temporal/serviceclient/RpcRetryOptions.java b/temporal-serviceclient/src/main/java/io/temporal/serviceclient/RpcRetryOptions.java index 3c307bcc0b..9cea612885 100644 --- a/temporal-serviceclient/src/main/java/io/temporal/serviceclient/RpcRetryOptions.java +++ b/temporal-serviceclient/src/main/java/io/temporal/serviceclient/RpcRetryOptions.java @@ -233,16 +233,13 @@ public RpcRetryOptions validateBuildWithDefaults() { backoff = DefaultServiceOperationRpcRetryOptions.BACKOFF; } if (initialInterval == null || initialInterval.isZero() || initialInterval.isNegative()) { - initialInterval = - DefaultServiceOperationRpcRetryOptions.INITIAL_INTERVAL; + initialInterval = DefaultServiceOperationRpcRetryOptions.INITIAL_INTERVAL; } if (expiration == null || expiration.isZero() || expiration.isNegative()) { - expiration = - DefaultServiceOperationRpcRetryOptions.EXPIRATION_INTERVAL; + expiration = DefaultServiceOperationRpcRetryOptions.EXPIRATION_INTERVAL; } if (maximumInterval == null || maximumInterval.isZero() || maximumInterval.isNegative()) { - maximumInterval = - DefaultServiceOperationRpcRetryOptions.MAXIMUM_INTERVAL; + maximumInterval = DefaultServiceOperationRpcRetryOptions.MAXIMUM_INTERVAL; } if (doNotRetry == null || doNotRetry.size() == 0) { doNotRetry = DefaultServiceOperationRpcRetryOptions.INSTANCE.doNotRetry;