From 618e878b7d56e8683334dacf760772531e5633f4 Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Tue, 21 May 2024 14:24:41 -0500 Subject: [PATCH] Required wait update stage and polling improvements (#251) Fixes #198 Fixes #199 Fixes #231 --- .../Interceptors/StartWorkflowUpdateInput.cs | 4 +- .../Client/TemporalClient.Workflow.cs | 48 ++++--- src/Temporalio/Client/WorkflowHandle.cs | 52 +++---- src/Temporalio/Client/WorkflowUpdateHandle.cs | 98 ++++++------- .../Client/WorkflowUpdateOptions.cs | 27 ++-- src/Temporalio/Client/WorkflowUpdateStage.cs | 34 +++++ .../Client/WorkflowUpdateStartOptions.cs | 37 +++++ .../Worker/WorkflowWorkerTests.cs | 129 +++++++++++++++--- 8 files changed, 309 insertions(+), 120 deletions(-) create mode 100644 src/Temporalio/Client/WorkflowUpdateStage.cs create mode 100644 src/Temporalio/Client/WorkflowUpdateStartOptions.cs diff --git a/src/Temporalio/Client/Interceptors/StartWorkflowUpdateInput.cs b/src/Temporalio/Client/Interceptors/StartWorkflowUpdateInput.cs index d21c2f16..5f3a3bb0 100644 --- a/src/Temporalio/Client/Interceptors/StartWorkflowUpdateInput.cs +++ b/src/Temporalio/Client/Interceptors/StartWorkflowUpdateInput.cs @@ -11,7 +11,7 @@ namespace Temporalio.Client.Interceptors /// Workflow first execution run ID if any. /// Update name. /// Update arguments. - /// Options if any. + /// Options. /// Headers if any. These will be encoded using the codec before sent /// to the server. /// @@ -24,6 +24,6 @@ public record StartWorkflowUpdateInput( string? FirstExecutionRunId, string Update, IReadOnlyCollection Args, - WorkflowUpdateOptions? Options, + WorkflowUpdateStartOptions Options, IDictionary? Headers); } \ No newline at end of file diff --git a/src/Temporalio/Client/TemporalClient.Workflow.cs b/src/Temporalio/Client/TemporalClient.Workflow.cs index 03c3dc82..0eed44dd 100644 --- a/src/Temporalio/Client/TemporalClient.Workflow.cs +++ b/src/Temporalio/Client/TemporalClient.Workflow.cs @@ -229,6 +229,15 @@ await Client.Options.DataConverter.ToPayloadsAsync( public async override Task> StartWorkflowUpdateAsync( StartWorkflowUpdateInput input) { + if (input.Options.WaitForStage == WorkflowUpdateStage.None) + { + throw new ArgumentException("WaitForStage is required to start workflow update"); + } + else if (input.Options.WaitForStage == WorkflowUpdateStage.Admitted) + { + throw new ArgumentException( + "Admitted is not an allowed wait stage to start workflow update"); + } // Build request var req = new UpdateWorkflowExecutionRequest() { @@ -242,23 +251,17 @@ public async override Task> StartWorkflowUpdateAsy { Meta = new() { - UpdateId = input.Options?.UpdateID ?? Guid.NewGuid().ToString(), + UpdateId = input.Options.Id ?? Guid.NewGuid().ToString(), Identity = Client.Connection.Options.Identity, }, Input = new() { Name = input.Update }, }, WaitPolicy = new() { - // Default is Accepted, but may be overridden later - LifecycleStage = UpdateWorkflowExecutionLifecycleStage.Accepted, + LifecycleStage = (UpdateWorkflowExecutionLifecycleStage)input.Options.WaitForStage, }, FirstExecutionRunId = input.FirstExecutionRunId ?? string.Empty, }; - if (input.Options is { } options && - options.WaitForStage != UpdateWorkflowExecutionLifecycleStage.Unspecified) - { - req.WaitPolicy.LifecycleStage = options.WaitForStage; - } if (input.Args.Count > 0) { req.Request.Input.Args = new Payloads(); @@ -280,15 +283,28 @@ await Client.Options.DataConverter.ToPayloadsAsync( } } } - // Invoke - var resp = await Client.Connection.WorkflowService.UpdateWorkflowExecutionAsync( - req, DefaultRetryOptions(input.Options?.Rpc)).ConfigureAwait(false); - // Build handle for result - return new(Client, req.Request.Meta.UpdateId, input.Id, input.RunId) + + // Continually try to start until the user-asked stage is reached or the stage is + // accepted + UpdateWorkflowExecutionResponse resp; + do { - // Put outcome on the handle (may be null) - KnownOutcome = resp.Outcome, - }; + resp = await Client.Connection.WorkflowService.UpdateWorkflowExecutionAsync( + req, DefaultRetryOptions(input.Options.Rpc)).ConfigureAwait(false); + } + while (resp.Stage < req.WaitPolicy.LifecycleStage && + resp.Stage < UpdateWorkflowExecutionLifecycleStage.Accepted); + + // If the requested stage is completed, wait for result, but discard the update + // exception, that will come when _they_ call get result + var handle = new WorkflowUpdateHandle( + Client, req.Request.Meta.UpdateId, input.Id, input.RunId) + { KnownOutcome = resp.Outcome }; + if (input.Options.WaitForStage == WorkflowUpdateStage.Completed) + { + await handle.PollUntilOutcomeAsync(input.Options.Rpc).ConfigureAwait(false); + } + return handle; } /// diff --git a/src/Temporalio/Client/WorkflowHandle.cs b/src/Temporalio/Client/WorkflowHandle.cs index 4ada53d6..db43d30d 100644 --- a/src/Temporalio/Client/WorkflowHandle.cs +++ b/src/Temporalio/Client/WorkflowHandle.cs @@ -307,11 +307,11 @@ public Task QueryAsync( /// /// Workflow class type. /// Invocation of workflow update method. - /// Extra options. + /// Update options. Currently WaitForStage is required. /// Workflow update handle. /// WARNING: Workflow update is experimental and APIs may change. public Task StartUpdateAsync( - Expression> updateCall, WorkflowUpdateOptions? options = null) + Expression> updateCall, WorkflowUpdateStartOptions options) { var (method, args) = ExpressionUtil.ExtractCall(updateCall); return StartUpdateAsync( @@ -326,12 +326,12 @@ public Task StartUpdateAsync( /// Workflow class type. /// Update result type. /// Invocation of workflow update method. - /// Extra options. + /// Update options. Currently WaitForStage is required. /// Workflow update handle. /// WARNING: Workflow update is experimental and APIs may change. public Task> StartUpdateAsync( Expression>> updateCall, - WorkflowUpdateOptions? options = null) + WorkflowUpdateStartOptions options) { var (method, args) = ExpressionUtil.ExtractCall(updateCall); return StartUpdateAsync( @@ -345,11 +345,11 @@ public Task> StartUpdateAsync /// Name of the update. /// Arguments for the update. - /// Extra options. + /// Update options. Currently WaitForStage is required. /// Workflow update handle. /// WARNING: Workflow update is experimental and APIs may change. public Task StartUpdateAsync( - string update, IReadOnlyCollection args, WorkflowUpdateOptions? options = null) => + string update, IReadOnlyCollection args, WorkflowUpdateStartOptions options) => StartUpdateAsync(update, args, options).ContinueWith( t => t.Result, TaskScheduler.Current); @@ -359,11 +359,11 @@ public Task StartUpdateAsync( /// Update result type. /// Name of the update. /// Arguments for the update. - /// Extra options. + /// Update options. Currently WaitForStage is required. /// Workflow update handle. /// WARNING: Workflow update is experimental and APIs may change. public Task> StartUpdateAsync( - string update, IReadOnlyCollection args, WorkflowUpdateOptions? options = null) => + string update, IReadOnlyCollection args, WorkflowUpdateStartOptions options) => Client.OutboundInterceptor.StartWorkflowUpdateAsync(new( Id: Id, RunId: RunId, @@ -375,7 +375,7 @@ public Task> StartUpdateAsync /// /// Start an update and wait for it to complete. This is a shortcut for - /// + /// /// + /// . /// @@ -398,7 +398,7 @@ public async Task ExecuteUpdateAsync( /// /// Start an update and wait for it to complete. This is a shortcut for - /// + /// /// + /// . /// @@ -421,7 +421,7 @@ public async Task ExecuteUpdateAsync( /// /// Start an update and wait for it to complete. This is a shortcut for - /// + /// /// + /// . /// @@ -444,7 +444,7 @@ public async Task ExecuteUpdateAsync( /// /// Start an update and wait for it to complete. This is a shortcut for - /// + /// /// + /// . /// @@ -603,14 +603,14 @@ private async IAsyncEnumerable FetchHistoryEventsInternalAsync( /// /// Options to use as base. /// New options. - private protected static WorkflowUpdateOptions UpdateOptionsWithDefaultsForExecute( - WorkflowUpdateOptions? options) - { - var newOptions = options == null ? new() : (WorkflowUpdateOptions)options.Clone(); - // Force override the wait for stage to completed - newOptions.WaitForStage = UpdateWorkflowExecutionLifecycleStage.Completed; - return newOptions; - } + private protected static WorkflowUpdateStartOptions UpdateOptionsWithDefaultsForExecute( + WorkflowUpdateOptions? options) => + (WorkflowUpdateStartOptions)new WorkflowUpdateStartOptions() + { + Id = options?.Id, + Rpc = options?.Rpc, + WaitForStage = WorkflowUpdateStage.Completed, + }.Clone(); } /// @@ -678,11 +678,11 @@ public Task QueryAsync( /// Start a workflow update via a call to a WorkflowUpdate attributed method. /// /// Invocation of workflow update method. - /// Extra options. + /// Update options. Currently WaitForStage is required. /// Workflow update handle. /// WARNING: Workflow update is experimental and APIs may change. public Task StartUpdateAsync( - Expression> updateCall, WorkflowUpdateOptions? options = null) => + Expression> updateCall, WorkflowUpdateStartOptions options) => StartUpdateAsync(updateCall, options); /// @@ -690,17 +690,17 @@ public Task StartUpdateAsync( /// /// Update result type. /// Invocation of workflow update method. - /// Extra options. + /// Update options. Currently WaitForStage is required. /// Workflow update handle. /// WARNING: Workflow update is experimental and APIs may change. public Task> StartUpdateAsync( Expression>> updateCall, - WorkflowUpdateOptions? options = null) => + WorkflowUpdateStartOptions options) => StartUpdateAsync(updateCall, options); /// /// Start an update and wait for it to complete. This is a shortcut for - /// + /// /// + /// . /// @@ -721,7 +721,7 @@ public async Task ExecuteUpdateAsync( /// /// Start an update and wait for it to complete. This is a shortcut for - /// + /// /// + /// . /// diff --git a/src/Temporalio/Client/WorkflowUpdateHandle.cs b/src/Temporalio/Client/WorkflowUpdateHandle.cs index c8d7e00b..bac124c9 100644 --- a/src/Temporalio/Client/WorkflowUpdateHandle.cs +++ b/src/Temporalio/Client/WorkflowUpdateHandle.cs @@ -23,9 +23,9 @@ public record WorkflowUpdateHandle( string? WorkflowRunId = null) { /// - /// Gets the known outcome. + /// Gets or sets the known outcome. /// - internal Outcome? KnownOutcome { private get; init; } + internal Outcome? KnownOutcome { get; set; } /// /// Wait for an update result disregarding any return value. @@ -45,55 +45,15 @@ public Task GetResultAsync(RpcOptions? rpcOptions = null) => /// WARNING: Workflow update is experimental and APIs may change. public virtual async Task GetResultAsync(RpcOptions? rpcOptions = null) { - // If there is not a known outcome, we must poll for one. We intentionally do not - // memoize poll result, so each call to this function when a known outcome is not - // present results in a poll call. - var outcome = KnownOutcome; - if (outcome == null) - { - // No known outcome means poll - var req = new PollWorkflowExecutionUpdateRequest() - { - Namespace = Client.Options.Namespace, - UpdateRef = new() - { - WorkflowExecution = new() - { - WorkflowId = WorkflowId, - RunId = WorkflowRunId ?? string.Empty, - }, - UpdateId = Id, - }, - Identity = Client.Connection.Options.Identity, - WaitPolicy = new() { LifecycleStage = UpdateWorkflowExecutionLifecycleStage.Completed }, - }; - // Continually retry to poll while we either get empty response or while we get a gRPC - // deadline exceeded but our cancellation token isn't complete. - while (outcome == null) - { - try - { - var resp = await Client.Connection.WorkflowService.PollWorkflowExecutionUpdateAsync( - req, rpcOptions).ConfigureAwait(false); - outcome = resp.Outcome; - } - catch (RpcException e) when ( - e.Code == RpcException.StatusCode.DeadlineExceeded && - rpcOptions?.CancellationToken?.IsCancellationRequested != true) - { - // Do nothing, our cancellation token wasn't done, continue - // TODO(cretz): Remove when server stops using gRPC status to signal not done yet - } - } - } + await PollUntilOutcomeAsync(rpcOptions).ConfigureAwait(false); // Convert outcome to result - if (outcome.Failure is { } failure) + if (KnownOutcome!.Failure is { } failure) { throw new WorkflowUpdateFailedException( await Client.Options.DataConverter.ToExceptionAsync(failure).ConfigureAwait(false)); } - else if (outcome.Success is { } success) + else if (KnownOutcome.Success is { } success) { // Ignore return if they didn't want it if (typeof(TResult) == typeof(ValueTuple)) @@ -103,7 +63,53 @@ public virtual async Task GetResultAsync(RpcOptions? rpcOption return await Client.Options.DataConverter.ToSingleValueAsync( success.Payloads_).ConfigureAwait(false); } - throw new InvalidOperationException($"Unrecognized outcome case: {outcome.ValueCase}"); + throw new InvalidOperationException($"Unrecognized outcome case: {KnownOutcome.ValueCase}"); + } + + /// + /// Poll until a memoized outcome is set on this handle. + /// + /// RPC options. + /// A representing the result of the asynchronous operation. + internal async Task PollUntilOutcomeAsync(RpcOptions? rpcOptions = null) + { + // If there is not a known outcome, we must poll for one. We intentionally do not lock + // while obtaining the outcome. In the case of concurrent get-result calls, they will + // poll independently and first one can set the known outcome (setting properties in + // .NET is thread safe). + if (KnownOutcome != null) + { + return; + } + // No known outcome means poll + var req = new PollWorkflowExecutionUpdateRequest() + { + Namespace = Client.Options.Namespace, + UpdateRef = new() + { + WorkflowExecution = new() + { + WorkflowId = WorkflowId, + RunId = WorkflowRunId ?? string.Empty, + }, + UpdateId = Id, + }, + Identity = Client.Connection.Options.Identity, + WaitPolicy = new() { LifecycleStage = UpdateWorkflowExecutionLifecycleStage.Completed }, + }; + // Continually retry to poll while we get an empty response + while (KnownOutcome == null) + { + var resp = await Client.Connection.WorkflowService.PollWorkflowExecutionUpdateAsync( + req, rpcOptions).ConfigureAwait(false); +#pragma warning disable CA1508 + // .NET incorrectly assumes KnownOutcome cannot be null here because they assume a + // single thread. We accept there is technically a race condition here since this is + // not an atomic CAS operation, but outcome is the same server side for the same + // update. + KnownOutcome ??= resp.Outcome; +#pragma warning restore CA1508 + } } } diff --git a/src/Temporalio/Client/WorkflowUpdateOptions.cs b/src/Temporalio/Client/WorkflowUpdateOptions.cs index f3bb793a..c5854491 100644 --- a/src/Temporalio/Client/WorkflowUpdateOptions.cs +++ b/src/Temporalio/Client/WorkflowUpdateOptions.cs @@ -1,30 +1,37 @@ using System; -using Temporalio.Api.Enums.V1; namespace Temporalio.Client { /// - /// Options for starting an update on a . + /// Options for executing an update on a . /// /// WARNING: Workflow update is experimental and APIs may change. public class WorkflowUpdateOptions : ICloneable { /// - /// Gets or sets the unique identifier for the update. This is optional and is defaulted to - /// a GUID if not set. This must be unique within the scope of a workflow execution (i.e. - /// namespace + workflow ID + run ID). + /// Initializes a new instance of the class. /// - public string? UpdateID { get; set; } + public WorkflowUpdateOptions() + { + } /// - /// Gets or sets RPC options for starting the workflow. + /// Initializes a new instance of the class. /// - public RpcOptions? Rpc { get; set; } + /// Update ID. + public WorkflowUpdateOptions(string id) => Id = id; /// - /// Gets or sets the stage to wait for on start. Internal only. + /// Gets or sets the unique update identifier. If not set, this is defaulted to a GUID. + /// This must be unique within the scope of a workflow execution (i.e. namespace + + /// workflow ID + run ID). /// - internal UpdateWorkflowExecutionLifecycleStage WaitForStage { get; set; } + public string? Id { get; set; } + + /// + /// Gets or sets RPC options for starting the workflow. + /// + public RpcOptions? Rpc { get; set; } /// /// Create a shallow copy of these options. diff --git a/src/Temporalio/Client/WorkflowUpdateStage.cs b/src/Temporalio/Client/WorkflowUpdateStage.cs new file mode 100644 index 00000000..8c8dcc99 --- /dev/null +++ b/src/Temporalio/Client/WorkflowUpdateStage.cs @@ -0,0 +1,34 @@ +using Temporalio.Api.Enums.V1; + +namespace Temporalio.Client +{ + /// + /// Stage that an update can reach. This is used when starting an update to set the stage to + /// wait for before returning. + /// + public enum WorkflowUpdateStage + { + /// + /// Unset stage. This is an invalid value on start. + /// + None = UpdateWorkflowExecutionLifecycleStage.Unspecified, + + /// + /// Admitted stage. This stage is reached when the server receives the update to process. + /// This is currently an invalid value on start. + /// + Admitted = UpdateWorkflowExecutionLifecycleStage.Admitted, + + /// + /// Accepted stage. This stage is reached when a workflow has received the update and either + /// accepted (i.e. it has passed validation) or rejected it. + /// + Accepted = UpdateWorkflowExecutionLifecycleStage.Accepted, + + /// + /// Completed stage. This stage is reached when a workflow has completed processing the + /// update with either a success or failure. + /// + Completed = UpdateWorkflowExecutionLifecycleStage.Completed, + } +} \ No newline at end of file diff --git a/src/Temporalio/Client/WorkflowUpdateStartOptions.cs b/src/Temporalio/Client/WorkflowUpdateStartOptions.cs new file mode 100644 index 00000000..d7d9546f --- /dev/null +++ b/src/Temporalio/Client/WorkflowUpdateStartOptions.cs @@ -0,0 +1,37 @@ +namespace Temporalio.Client +{ + /// + /// Options for starting an update on a . + /// + /// WARNING: Workflow update is experimental and APIs may change. + public class WorkflowUpdateStartOptions : WorkflowUpdateOptions + { + /// + /// Initializes a new instance of the class. + /// + public WorkflowUpdateStartOptions() + { + } + + /// + /// Initializes a new instance of the class. + /// + /// Stage to wait for. + public WorkflowUpdateStartOptions(WorkflowUpdateStage waitForStage) => + WaitForStage = waitForStage; + + /// + /// Initializes a new instance of the class. + /// + /// Update ID. + /// Stage to wait for. + public WorkflowUpdateStartOptions(string id, WorkflowUpdateStage waitForStage) + : base(id) => WaitForStage = waitForStage; + + /// + /// Gets or sets the stage to wait for on start. This is required and cannot be set to + /// None or Admitted at this time. + /// + public WorkflowUpdateStage WaitForStage { get; set; } + } +} \ No newline at end of file diff --git a/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs b/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs index 18c66cdb..9d409e45 100644 --- a/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs +++ b/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs @@ -3678,31 +3678,42 @@ await ExecuteWorkerAsync(async worker => // Make all possible overload calls via start then get response await (await ((WorkflowHandle)handle).StartUpdateAsync( - (UpdateWorkflow wf) => wf.DoUpdateNoParamNoResponseAsync())).GetResultAsync(); + (UpdateWorkflow wf) => wf.DoUpdateNoParamNoResponseAsync(), + new(WorkflowUpdateStage.Accepted))).GetResultAsync(); Assert.Equal( $"no-param-response: {handle.Id}", await (await ((WorkflowHandle)handle).StartUpdateAsync( - (UpdateWorkflow wf) => wf.DoUpdateNoParamResponseAsync())).GetResultAsync()); + (UpdateWorkflow wf) => wf.DoUpdateNoParamResponseAsync(), + new(WorkflowUpdateStage.Accepted))).GetResultAsync()); await (await ((WorkflowHandle)handle).StartUpdateAsync( - (UpdateWorkflow wf) => wf.DoUpdateOneParamNoResponseAsync("some-param"))).GetResultAsync(); + (UpdateWorkflow wf) => wf.DoUpdateOneParamNoResponseAsync("some-param"), + new(WorkflowUpdateStage.Accepted))).GetResultAsync(); await (await ((WorkflowHandle)handle).StartUpdateAsync( - (UpdateWorkflow wf) => wf.DoUpdateOneParamNoResponseAsync("some-param"))).GetResultAsync(); + (UpdateWorkflow wf) => wf.DoUpdateOneParamNoResponseAsync("some-param"), + new(WorkflowUpdateStage.Accepted))).GetResultAsync(); Assert.Equal( "one-param-response: some-param", await (await ((WorkflowHandle)handle).StartUpdateAsync( - (UpdateWorkflow wf) => wf.DoUpdateOneParamResponseAsync("some-param"))).GetResultAsync()); + (UpdateWorkflow wf) => wf.DoUpdateOneParamResponseAsync("some-param"), + new(WorkflowUpdateStage.Accepted))).GetResultAsync()); await (await handle.StartUpdateAsync( - "some-update-name", new[] { "some-param" })).GetResultAsync(); + "some-update-name", + new[] { "some-param" }, + new(WorkflowUpdateStage.Accepted))).GetResultAsync(); Assert.Equal( "one-param-response: some-param", await (await handle.StartUpdateAsync( - "DoUpdateOneParamResponse", new[] { "some-param" })).GetResultAsync()); + "DoUpdateOneParamResponse", + new[] { "some-param" }, + new(WorkflowUpdateStage.Accepted))).GetResultAsync()); await (await handle.StartUpdateAsync( - wf => wf.DoUpdateNoParamNoResponseAsync())).GetResultAsync(); + wf => wf.DoUpdateNoParamNoResponseAsync(), + new(WorkflowUpdateStage.Accepted))).GetResultAsync(); Assert.Equal( $"no-param-response: {handle.Id}", await (await handle.StartUpdateAsync( - wf => wf.DoUpdateNoParamResponseAsync())).GetResultAsync()); + wf => wf.DoUpdateNoParamResponseAsync(), + new(WorkflowUpdateStage.Accepted))).GetResultAsync()); // Make all possible overload calls via execute await ((WorkflowHandle)handle).ExecuteUpdateAsync( @@ -3734,15 +3745,18 @@ await handle.ExecuteUpdateAsync( // Make updates, then get handles manually, then get response await handle.GetUpdateHandle((await handle.StartUpdateAsync( - wf => wf.DoUpdateNoParamNoResponseAsync())).Id).GetResultAsync(); + wf => wf.DoUpdateNoParamNoResponseAsync(), + new(WorkflowUpdateStage.Accepted))).Id).GetResultAsync(); Assert.Equal( $"no-param-response: {handle.Id}", await handle.GetUpdateHandle((await handle.StartUpdateAsync( - wf => wf.DoUpdateNoParamResponseAsync())).Id).GetResultAsync()); + wf => wf.DoUpdateNoParamResponseAsync(), + new(WorkflowUpdateStage.Accepted))).Id).GetResultAsync()); Assert.Equal( $"no-param-response: {handle.Id}", await handle.GetUpdateHandle((await handle.StartUpdateAsync( - wf => wf.DoUpdateNoParamResponseAsync())).Id).GetResultAsync()); + wf => wf.DoUpdateNoParamResponseAsync(), + new(WorkflowUpdateStage.Accepted))).Id).GetResultAsync()); }); } @@ -3811,7 +3825,8 @@ await ExecuteWorkerAsync(async worker => // Update invalid operation after accepted - fails workflow task await handle.StartUpdateAsync( - wf => wf.DoUpdateOneParamNoResponseAsync("update-invalid-operation-new-task")); + wf => wf.DoUpdateOneParamNoResponseAsync("update-invalid-operation-new-task"), + new(WorkflowUpdateStage.Accepted)); await AssertTaskFailureContainsEventuallyAsync(handle, "Intentional update invalid operation"); // Terminate the handle so it doesn't keep failing await handle.TerminateAsync(); @@ -3833,7 +3848,8 @@ await AssertTaskFailureContainsEventuallyAsync( (UpdateWorkflow wf) => wf.RunAsync(), new(id: $"workflow-{Guid.NewGuid()}", taskQueue: worker.Options.TaskQueue!)); await handle.StartUpdateAsync( - wf => wf.DoUpdateOneParamNoResponseAsync("update-continue-as-new")); + wf => wf.DoUpdateOneParamNoResponseAsync("update-continue-as-new"), + new(WorkflowUpdateStage.Accepted)); await AssertTaskFailureContainsEventuallyAsync(handle, "Continue as new"); await handle.TerminateAsync(); @@ -3865,11 +3881,9 @@ await ExecuteWorkerAsync(async worker => // Run an update with an ID var result1 = await handle.ExecuteUpdateAsync( - wf => wf.DoUpdateOneParamResponseAsync("first-param"), - new() { UpdateID = "my-update-id" }); + wf => wf.DoUpdateOneParamResponseAsync("first-param"), new(id: "my-update-id")); var result2 = await handle.ExecuteUpdateAsync( - wf => wf.DoUpdateOneParamResponseAsync("second-param"), - new() { UpdateID = "my-update-id" }); + wf => wf.DoUpdateOneParamResponseAsync("second-param"), new(id: "my-update-id")); // Confirm that the first result is the same as the second without running (i.e. doesn't // return second-param) Assert.Equal("one-param-response: first-param", result1); @@ -3886,7 +3900,8 @@ await ExecuteWorkerAsync(async worker => var handle = await Env.Client.StartWorkflowAsync( (UpdateWorkflow wf) => wf.RunAsync(), new(id: $"workflow-{Guid.NewGuid()}", taskQueue: worker.Options.TaskQueue!)); - var updateHandle = await handle.StartUpdateAsync(wf => wf.DoUpdateLongWaitAsync()); + var updateHandle = await handle.StartUpdateAsync( + wf => wf.DoUpdateLongWaitAsync(), new(WorkflowUpdateStage.Accepted)); // Ask for the result but only for 1 second using var tokenSource = new CancellationTokenSource(); tokenSource.CancelAfter(TimeSpan.FromSeconds(1)); @@ -3980,6 +3995,80 @@ await ExecuteWorkerAsync(async worker => }); } + [Workflow] + public class ImmediatelyCompleteUpdateAndWorkflow + { + [WorkflowRun] + public async Task RunAsync() => "workflow-done"; + + [WorkflowQuery] + public string GotUpdate { get; set; } = "no"; + + [WorkflowUpdate] + public async Task UpdateAsync() + { + GotUpdate = "yes"; + return "update-done"; + } + } + + [Fact] + public async Task ExecuteWorkflowAsync_Updates_BeforeWorkflowStart() + { + // In order to confirm that all started workflows get updates before the + // workflow completes, this test will start a workflow and start an + // update. Only then will it start the worker to process both in the + // task. The workflow and update should both succeed properly. This also + // invokes a query to confirm update mutation. We do this with the cache + // off to confirm replay behavior. + + // Start workflow + var taskQueue = $"tq-{Guid.NewGuid()}"; + var handle = await Client.StartWorkflowAsync( + (ImmediatelyCompleteUpdateAndWorkflow wf) => wf.RunAsync(), + new(id: $"wf-{Guid.NewGuid()}", taskQueue)); + + // Execute update in the background + var updateId = $"upd-{Guid.NewGuid()}"; + var updateTask = Task.Run(() => + handle.ExecuteUpdateAsync(wf => wf.UpdateAsync(), new(updateId))); + + // Wait until server says it is admitted + await AssertMore.EventuallyAsync(async () => + { + try + { + var resp = await Client.Connection.WorkflowService.PollWorkflowExecutionUpdateAsync(new() + { + Identity = Client.Connection.Options.Identity, + Namespace = Client.Options.Namespace, + UpdateRef = new() + { + UpdateId = updateId, + WorkflowExecution = new() { WorkflowId = handle.Id }, + }, + }); + Assert.Equal(UpdateWorkflowExecutionLifecycleStage.Admitted, resp.Stage); + } + catch (Exception e) + { + // Throw XUnit exception so it doesn't fail this eventually call + throw new Xunit.Sdk.XunitException("Failed polling", e); + } + }); + + // Start no-cache worker on the task queue + await ExecuteWorkerAsync( + async worker => + { + // Confirm things completed as expected + Assert.Equal("workflow-done", await handle.GetResultAsync()); + Assert.Equal("update-done", await updateTask); + Assert.Equal("yes", await handle.QueryAsync(wf => wf.GotUpdate)); + }, + new(taskQueue) { MaxCachedWorkflows = 0 }); + } + [Workflow] public class CurrentBuildIdWorkflow { @@ -4145,7 +4234,7 @@ Task AssertScenario( // Don't care about handle, we'll re-attach later await handle.StartUpdateAsync( wf => wf.UpdateAsync(updateScenarioNotNull), - new() { UpdateID = "my-update-1" }); + new(id: "my-update-1", waitForStage: WorkflowUpdateStage.Accepted)); } // Expect a task or exception fail