diff --git a/src/Temporalio/Client/Interceptors/StartWorkflowUpdateInput.cs b/src/Temporalio/Client/Interceptors/StartWorkflowUpdateInput.cs
index d21c2f16..5f3a3bb0 100644
--- a/src/Temporalio/Client/Interceptors/StartWorkflowUpdateInput.cs
+++ b/src/Temporalio/Client/Interceptors/StartWorkflowUpdateInput.cs
@@ -11,7 +11,7 @@ namespace Temporalio.Client.Interceptors
/// Workflow first execution run ID if any.
/// Update name.
/// Update arguments.
- /// Options if any.
+ /// Options.
/// Headers if any. These will be encoded using the codec before sent
/// to the server.
///
@@ -24,6 +24,6 @@ public record StartWorkflowUpdateInput(
string? FirstExecutionRunId,
string Update,
IReadOnlyCollection Args,
- WorkflowUpdateOptions? Options,
+ WorkflowUpdateStartOptions Options,
IDictionary? Headers);
}
\ No newline at end of file
diff --git a/src/Temporalio/Client/TemporalClient.Workflow.cs b/src/Temporalio/Client/TemporalClient.Workflow.cs
index 03c3dc82..0eed44dd 100644
--- a/src/Temporalio/Client/TemporalClient.Workflow.cs
+++ b/src/Temporalio/Client/TemporalClient.Workflow.cs
@@ -229,6 +229,15 @@ await Client.Options.DataConverter.ToPayloadsAsync(
public async override Task> StartWorkflowUpdateAsync(
StartWorkflowUpdateInput input)
{
+ if (input.Options.WaitForStage == WorkflowUpdateStage.None)
+ {
+ throw new ArgumentException("WaitForStage is required to start workflow update");
+ }
+ else if (input.Options.WaitForStage == WorkflowUpdateStage.Admitted)
+ {
+ throw new ArgumentException(
+ "Admitted is not an allowed wait stage to start workflow update");
+ }
// Build request
var req = new UpdateWorkflowExecutionRequest()
{
@@ -242,23 +251,17 @@ public async override Task> StartWorkflowUpdateAsy
{
Meta = new()
{
- UpdateId = input.Options?.UpdateID ?? Guid.NewGuid().ToString(),
+ UpdateId = input.Options.Id ?? Guid.NewGuid().ToString(),
Identity = Client.Connection.Options.Identity,
},
Input = new() { Name = input.Update },
},
WaitPolicy = new()
{
- // Default is Accepted, but may be overridden later
- LifecycleStage = UpdateWorkflowExecutionLifecycleStage.Accepted,
+ LifecycleStage = (UpdateWorkflowExecutionLifecycleStage)input.Options.WaitForStage,
},
FirstExecutionRunId = input.FirstExecutionRunId ?? string.Empty,
};
- if (input.Options is { } options &&
- options.WaitForStage != UpdateWorkflowExecutionLifecycleStage.Unspecified)
- {
- req.WaitPolicy.LifecycleStage = options.WaitForStage;
- }
if (input.Args.Count > 0)
{
req.Request.Input.Args = new Payloads();
@@ -280,15 +283,28 @@ await Client.Options.DataConverter.ToPayloadsAsync(
}
}
}
- // Invoke
- var resp = await Client.Connection.WorkflowService.UpdateWorkflowExecutionAsync(
- req, DefaultRetryOptions(input.Options?.Rpc)).ConfigureAwait(false);
- // Build handle for result
- return new(Client, req.Request.Meta.UpdateId, input.Id, input.RunId)
+
+ // Continually try to start until the user-asked stage is reached or the stage is
+ // accepted
+ UpdateWorkflowExecutionResponse resp;
+ do
{
- // Put outcome on the handle (may be null)
- KnownOutcome = resp.Outcome,
- };
+ resp = await Client.Connection.WorkflowService.UpdateWorkflowExecutionAsync(
+ req, DefaultRetryOptions(input.Options.Rpc)).ConfigureAwait(false);
+ }
+ while (resp.Stage < req.WaitPolicy.LifecycleStage &&
+ resp.Stage < UpdateWorkflowExecutionLifecycleStage.Accepted);
+
+ // If the requested stage is completed, wait for result, but discard the update
+ // exception, that will come when _they_ call get result
+ var handle = new WorkflowUpdateHandle(
+ Client, req.Request.Meta.UpdateId, input.Id, input.RunId)
+ { KnownOutcome = resp.Outcome };
+ if (input.Options.WaitForStage == WorkflowUpdateStage.Completed)
+ {
+ await handle.PollUntilOutcomeAsync(input.Options.Rpc).ConfigureAwait(false);
+ }
+ return handle;
}
///
diff --git a/src/Temporalio/Client/WorkflowHandle.cs b/src/Temporalio/Client/WorkflowHandle.cs
index 4ada53d6..db43d30d 100644
--- a/src/Temporalio/Client/WorkflowHandle.cs
+++ b/src/Temporalio/Client/WorkflowHandle.cs
@@ -307,11 +307,11 @@ public Task QueryAsync(
///
/// Workflow class type.
/// Invocation of workflow update method.
- /// Extra options.
+ /// Update options. Currently WaitForStage is required.
/// Workflow update handle.
/// WARNING: Workflow update is experimental and APIs may change.
public Task StartUpdateAsync(
- Expression> updateCall, WorkflowUpdateOptions? options = null)
+ Expression> updateCall, WorkflowUpdateStartOptions options)
{
var (method, args) = ExpressionUtil.ExtractCall(updateCall);
return StartUpdateAsync(
@@ -326,12 +326,12 @@ public Task StartUpdateAsync(
/// Workflow class type.
/// Update result type.
/// Invocation of workflow update method.
- /// Extra options.
+ /// Update options. Currently WaitForStage is required.
/// Workflow update handle.
/// WARNING: Workflow update is experimental and APIs may change.
public Task> StartUpdateAsync(
Expression>> updateCall,
- WorkflowUpdateOptions? options = null)
+ WorkflowUpdateStartOptions options)
{
var (method, args) = ExpressionUtil.ExtractCall(updateCall);
return StartUpdateAsync(
@@ -345,11 +345,11 @@ public Task> StartUpdateAsync
/// Name of the update.
/// Arguments for the update.
- /// Extra options.
+ /// Update options. Currently WaitForStage is required.
/// Workflow update handle.
/// WARNING: Workflow update is experimental and APIs may change.
public Task StartUpdateAsync(
- string update, IReadOnlyCollection args, WorkflowUpdateOptions? options = null) =>
+ string update, IReadOnlyCollection args, WorkflowUpdateStartOptions options) =>
StartUpdateAsync(update, args, options).ContinueWith(
t => t.Result, TaskScheduler.Current);
@@ -359,11 +359,11 @@ public Task StartUpdateAsync(
/// Update result type.
/// Name of the update.
/// Arguments for the update.
- /// Extra options.
+ /// Update options. Currently WaitForStage is required.
/// Workflow update handle.
/// WARNING: Workflow update is experimental and APIs may change.
public Task> StartUpdateAsync(
- string update, IReadOnlyCollection args, WorkflowUpdateOptions? options = null) =>
+ string update, IReadOnlyCollection args, WorkflowUpdateStartOptions options) =>
Client.OutboundInterceptor.StartWorkflowUpdateAsync(new(
Id: Id,
RunId: RunId,
@@ -375,7 +375,7 @@ public Task> StartUpdateAsync
///
/// Start an update and wait for it to complete. This is a shortcut for
- ///
+ ///
/// +
/// .
///
@@ -398,7 +398,7 @@ public async Task ExecuteUpdateAsync(
///
/// Start an update and wait for it to complete. This is a shortcut for
- ///
+ ///
/// +
/// .
///
@@ -421,7 +421,7 @@ public async Task ExecuteUpdateAsync(
///
/// Start an update and wait for it to complete. This is a shortcut for
- ///
+ ///
/// +
/// .
///
@@ -444,7 +444,7 @@ public async Task ExecuteUpdateAsync(
///
/// Start an update and wait for it to complete. This is a shortcut for
- ///
+ ///
/// +
/// .
///
@@ -603,14 +603,14 @@ private async IAsyncEnumerable FetchHistoryEventsInternalAsync(
///
/// Options to use as base.
/// New options.
- private protected static WorkflowUpdateOptions UpdateOptionsWithDefaultsForExecute(
- WorkflowUpdateOptions? options)
- {
- var newOptions = options == null ? new() : (WorkflowUpdateOptions)options.Clone();
- // Force override the wait for stage to completed
- newOptions.WaitForStage = UpdateWorkflowExecutionLifecycleStage.Completed;
- return newOptions;
- }
+ private protected static WorkflowUpdateStartOptions UpdateOptionsWithDefaultsForExecute(
+ WorkflowUpdateOptions? options) =>
+ (WorkflowUpdateStartOptions)new WorkflowUpdateStartOptions()
+ {
+ Id = options?.Id,
+ Rpc = options?.Rpc,
+ WaitForStage = WorkflowUpdateStage.Completed,
+ }.Clone();
}
///
@@ -678,11 +678,11 @@ public Task QueryAsync(
/// Start a workflow update via a call to a WorkflowUpdate attributed method.
///
/// Invocation of workflow update method.
- /// Extra options.
+ /// Update options. Currently WaitForStage is required.
/// Workflow update handle.
/// WARNING: Workflow update is experimental and APIs may change.
public Task StartUpdateAsync(
- Expression> updateCall, WorkflowUpdateOptions? options = null) =>
+ Expression> updateCall, WorkflowUpdateStartOptions options) =>
StartUpdateAsync(updateCall, options);
///
@@ -690,17 +690,17 @@ public Task StartUpdateAsync(
///
/// Update result type.
/// Invocation of workflow update method.
- /// Extra options.
+ /// Update options. Currently WaitForStage is required.
/// Workflow update handle.
/// WARNING: Workflow update is experimental and APIs may change.
public Task> StartUpdateAsync(
Expression>> updateCall,
- WorkflowUpdateOptions? options = null) =>
+ WorkflowUpdateStartOptions options) =>
StartUpdateAsync(updateCall, options);
///
/// Start an update and wait for it to complete. This is a shortcut for
- ///
+ ///
/// +
/// .
///
@@ -721,7 +721,7 @@ public async Task ExecuteUpdateAsync(
///
/// Start an update and wait for it to complete. This is a shortcut for
- ///
+ ///
/// +
/// .
///
diff --git a/src/Temporalio/Client/WorkflowUpdateHandle.cs b/src/Temporalio/Client/WorkflowUpdateHandle.cs
index c8d7e00b..bac124c9 100644
--- a/src/Temporalio/Client/WorkflowUpdateHandle.cs
+++ b/src/Temporalio/Client/WorkflowUpdateHandle.cs
@@ -23,9 +23,9 @@ public record WorkflowUpdateHandle(
string? WorkflowRunId = null)
{
///
- /// Gets the known outcome.
+ /// Gets or sets the known outcome.
///
- internal Outcome? KnownOutcome { private get; init; }
+ internal Outcome? KnownOutcome { get; set; }
///
/// Wait for an update result disregarding any return value.
@@ -45,55 +45,15 @@ public Task GetResultAsync(RpcOptions? rpcOptions = null) =>
/// WARNING: Workflow update is experimental and APIs may change.
public virtual async Task GetResultAsync(RpcOptions? rpcOptions = null)
{
- // If there is not a known outcome, we must poll for one. We intentionally do not
- // memoize poll result, so each call to this function when a known outcome is not
- // present results in a poll call.
- var outcome = KnownOutcome;
- if (outcome == null)
- {
- // No known outcome means poll
- var req = new PollWorkflowExecutionUpdateRequest()
- {
- Namespace = Client.Options.Namespace,
- UpdateRef = new()
- {
- WorkflowExecution = new()
- {
- WorkflowId = WorkflowId,
- RunId = WorkflowRunId ?? string.Empty,
- },
- UpdateId = Id,
- },
- Identity = Client.Connection.Options.Identity,
- WaitPolicy = new() { LifecycleStage = UpdateWorkflowExecutionLifecycleStage.Completed },
- };
- // Continually retry to poll while we either get empty response or while we get a gRPC
- // deadline exceeded but our cancellation token isn't complete.
- while (outcome == null)
- {
- try
- {
- var resp = await Client.Connection.WorkflowService.PollWorkflowExecutionUpdateAsync(
- req, rpcOptions).ConfigureAwait(false);
- outcome = resp.Outcome;
- }
- catch (RpcException e) when (
- e.Code == RpcException.StatusCode.DeadlineExceeded &&
- rpcOptions?.CancellationToken?.IsCancellationRequested != true)
- {
- // Do nothing, our cancellation token wasn't done, continue
- // TODO(cretz): Remove when server stops using gRPC status to signal not done yet
- }
- }
- }
+ await PollUntilOutcomeAsync(rpcOptions).ConfigureAwait(false);
// Convert outcome to result
- if (outcome.Failure is { } failure)
+ if (KnownOutcome!.Failure is { } failure)
{
throw new WorkflowUpdateFailedException(
await Client.Options.DataConverter.ToExceptionAsync(failure).ConfigureAwait(false));
}
- else if (outcome.Success is { } success)
+ else if (KnownOutcome.Success is { } success)
{
// Ignore return if they didn't want it
if (typeof(TResult) == typeof(ValueTuple))
@@ -103,7 +63,53 @@ public virtual async Task GetResultAsync(RpcOptions? rpcOption
return await Client.Options.DataConverter.ToSingleValueAsync(
success.Payloads_).ConfigureAwait(false);
}
- throw new InvalidOperationException($"Unrecognized outcome case: {outcome.ValueCase}");
+ throw new InvalidOperationException($"Unrecognized outcome case: {KnownOutcome.ValueCase}");
+ }
+
+ ///
+ /// Poll until a memoized outcome is set on this handle.
+ ///
+ /// RPC options.
+ /// A representing the result of the asynchronous operation.
+ internal async Task PollUntilOutcomeAsync(RpcOptions? rpcOptions = null)
+ {
+ // If there is not a known outcome, we must poll for one. We intentionally do not lock
+ // while obtaining the outcome. In the case of concurrent get-result calls, they will
+ // poll independently and first one can set the known outcome (setting properties in
+ // .NET is thread safe).
+ if (KnownOutcome != null)
+ {
+ return;
+ }
+ // No known outcome means poll
+ var req = new PollWorkflowExecutionUpdateRequest()
+ {
+ Namespace = Client.Options.Namespace,
+ UpdateRef = new()
+ {
+ WorkflowExecution = new()
+ {
+ WorkflowId = WorkflowId,
+ RunId = WorkflowRunId ?? string.Empty,
+ },
+ UpdateId = Id,
+ },
+ Identity = Client.Connection.Options.Identity,
+ WaitPolicy = new() { LifecycleStage = UpdateWorkflowExecutionLifecycleStage.Completed },
+ };
+ // Continually retry to poll while we get an empty response
+ while (KnownOutcome == null)
+ {
+ var resp = await Client.Connection.WorkflowService.PollWorkflowExecutionUpdateAsync(
+ req, rpcOptions).ConfigureAwait(false);
+#pragma warning disable CA1508
+ // .NET incorrectly assumes KnownOutcome cannot be null here because they assume a
+ // single thread. We accept there is technically a race condition here since this is
+ // not an atomic CAS operation, but outcome is the same server side for the same
+ // update.
+ KnownOutcome ??= resp.Outcome;
+#pragma warning restore CA1508
+ }
}
}
diff --git a/src/Temporalio/Client/WorkflowUpdateOptions.cs b/src/Temporalio/Client/WorkflowUpdateOptions.cs
index f3bb793a..c5854491 100644
--- a/src/Temporalio/Client/WorkflowUpdateOptions.cs
+++ b/src/Temporalio/Client/WorkflowUpdateOptions.cs
@@ -1,30 +1,37 @@
using System;
-using Temporalio.Api.Enums.V1;
namespace Temporalio.Client
{
///
- /// Options for starting an update on a .
+ /// Options for executing an update on a .
///
/// WARNING: Workflow update is experimental and APIs may change.
public class WorkflowUpdateOptions : ICloneable
{
///
- /// Gets or sets the unique identifier for the update. This is optional and is defaulted to
- /// a GUID if not set. This must be unique within the scope of a workflow execution (i.e.
- /// namespace + workflow ID + run ID).
+ /// Initializes a new instance of the class.
///
- public string? UpdateID { get; set; }
+ public WorkflowUpdateOptions()
+ {
+ }
///
- /// Gets or sets RPC options for starting the workflow.
+ /// Initializes a new instance of the class.
///
- public RpcOptions? Rpc { get; set; }
+ /// Update ID.
+ public WorkflowUpdateOptions(string id) => Id = id;
///
- /// Gets or sets the stage to wait for on start. Internal only.
+ /// Gets or sets the unique update identifier. If not set, this is defaulted to a GUID.
+ /// This must be unique within the scope of a workflow execution (i.e. namespace +
+ /// workflow ID + run ID).
///
- internal UpdateWorkflowExecutionLifecycleStage WaitForStage { get; set; }
+ public string? Id { get; set; }
+
+ ///
+ /// Gets or sets RPC options for starting the workflow.
+ ///
+ public RpcOptions? Rpc { get; set; }
///
/// Create a shallow copy of these options.
diff --git a/src/Temporalio/Client/WorkflowUpdateStage.cs b/src/Temporalio/Client/WorkflowUpdateStage.cs
new file mode 100644
index 00000000..8c8dcc99
--- /dev/null
+++ b/src/Temporalio/Client/WorkflowUpdateStage.cs
@@ -0,0 +1,34 @@
+using Temporalio.Api.Enums.V1;
+
+namespace Temporalio.Client
+{
+ ///
+ /// Stage that an update can reach. This is used when starting an update to set the stage to
+ /// wait for before returning.
+ ///
+ public enum WorkflowUpdateStage
+ {
+ ///
+ /// Unset stage. This is an invalid value on start.
+ ///
+ None = UpdateWorkflowExecutionLifecycleStage.Unspecified,
+
+ ///
+ /// Admitted stage. This stage is reached when the server receives the update to process.
+ /// This is currently an invalid value on start.
+ ///
+ Admitted = UpdateWorkflowExecutionLifecycleStage.Admitted,
+
+ ///
+ /// Accepted stage. This stage is reached when a workflow has received the update and either
+ /// accepted (i.e. it has passed validation) or rejected it.
+ ///
+ Accepted = UpdateWorkflowExecutionLifecycleStage.Accepted,
+
+ ///
+ /// Completed stage. This stage is reached when a workflow has completed processing the
+ /// update with either a success or failure.
+ ///
+ Completed = UpdateWorkflowExecutionLifecycleStage.Completed,
+ }
+}
\ No newline at end of file
diff --git a/src/Temporalio/Client/WorkflowUpdateStartOptions.cs b/src/Temporalio/Client/WorkflowUpdateStartOptions.cs
new file mode 100644
index 00000000..d7d9546f
--- /dev/null
+++ b/src/Temporalio/Client/WorkflowUpdateStartOptions.cs
@@ -0,0 +1,37 @@
+namespace Temporalio.Client
+{
+ ///
+ /// Options for starting an update on a .
+ ///
+ /// WARNING: Workflow update is experimental and APIs may change.
+ public class WorkflowUpdateStartOptions : WorkflowUpdateOptions
+ {
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ public WorkflowUpdateStartOptions()
+ {
+ }
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// Stage to wait for.
+ public WorkflowUpdateStartOptions(WorkflowUpdateStage waitForStage) =>
+ WaitForStage = waitForStage;
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// Update ID.
+ /// Stage to wait for.
+ public WorkflowUpdateStartOptions(string id, WorkflowUpdateStage waitForStage)
+ : base(id) => WaitForStage = waitForStage;
+
+ ///
+ /// Gets or sets the stage to wait for on start. This is required and cannot be set to
+ /// None or Admitted at this time.
+ ///
+ public WorkflowUpdateStage WaitForStage { get; set; }
+ }
+}
\ No newline at end of file
diff --git a/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs b/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs
index 18c66cdb..9d409e45 100644
--- a/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs
+++ b/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs
@@ -3678,31 +3678,42 @@ await ExecuteWorkerAsync(async worker =>
// Make all possible overload calls via start then get response
await (await ((WorkflowHandle)handle).StartUpdateAsync(
- (UpdateWorkflow wf) => wf.DoUpdateNoParamNoResponseAsync())).GetResultAsync();
+ (UpdateWorkflow wf) => wf.DoUpdateNoParamNoResponseAsync(),
+ new(WorkflowUpdateStage.Accepted))).GetResultAsync();
Assert.Equal(
$"no-param-response: {handle.Id}",
await (await ((WorkflowHandle)handle).StartUpdateAsync(
- (UpdateWorkflow wf) => wf.DoUpdateNoParamResponseAsync())).GetResultAsync());
+ (UpdateWorkflow wf) => wf.DoUpdateNoParamResponseAsync(),
+ new(WorkflowUpdateStage.Accepted))).GetResultAsync());
await (await ((WorkflowHandle)handle).StartUpdateAsync(
- (UpdateWorkflow wf) => wf.DoUpdateOneParamNoResponseAsync("some-param"))).GetResultAsync();
+ (UpdateWorkflow wf) => wf.DoUpdateOneParamNoResponseAsync("some-param"),
+ new(WorkflowUpdateStage.Accepted))).GetResultAsync();
await (await ((WorkflowHandle)handle).StartUpdateAsync(
- (UpdateWorkflow wf) => wf.DoUpdateOneParamNoResponseAsync("some-param"))).GetResultAsync();
+ (UpdateWorkflow wf) => wf.DoUpdateOneParamNoResponseAsync("some-param"),
+ new(WorkflowUpdateStage.Accepted))).GetResultAsync();
Assert.Equal(
"one-param-response: some-param",
await (await ((WorkflowHandle)handle).StartUpdateAsync(
- (UpdateWorkflow wf) => wf.DoUpdateOneParamResponseAsync("some-param"))).GetResultAsync());
+ (UpdateWorkflow wf) => wf.DoUpdateOneParamResponseAsync("some-param"),
+ new(WorkflowUpdateStage.Accepted))).GetResultAsync());
await (await handle.StartUpdateAsync(
- "some-update-name", new[] { "some-param" })).GetResultAsync();
+ "some-update-name",
+ new[] { "some-param" },
+ new(WorkflowUpdateStage.Accepted))).GetResultAsync();
Assert.Equal(
"one-param-response: some-param",
await (await handle.StartUpdateAsync(
- "DoUpdateOneParamResponse", new[] { "some-param" })).GetResultAsync());
+ "DoUpdateOneParamResponse",
+ new[] { "some-param" },
+ new(WorkflowUpdateStage.Accepted))).GetResultAsync());
await (await handle.StartUpdateAsync(
- wf => wf.DoUpdateNoParamNoResponseAsync())).GetResultAsync();
+ wf => wf.DoUpdateNoParamNoResponseAsync(),
+ new(WorkflowUpdateStage.Accepted))).GetResultAsync();
Assert.Equal(
$"no-param-response: {handle.Id}",
await (await handle.StartUpdateAsync(
- wf => wf.DoUpdateNoParamResponseAsync())).GetResultAsync());
+ wf => wf.DoUpdateNoParamResponseAsync(),
+ new(WorkflowUpdateStage.Accepted))).GetResultAsync());
// Make all possible overload calls via execute
await ((WorkflowHandle)handle).ExecuteUpdateAsync(
@@ -3734,15 +3745,18 @@ await handle.ExecuteUpdateAsync(
// Make updates, then get handles manually, then get response
await handle.GetUpdateHandle((await handle.StartUpdateAsync(
- wf => wf.DoUpdateNoParamNoResponseAsync())).Id).GetResultAsync();
+ wf => wf.DoUpdateNoParamNoResponseAsync(),
+ new(WorkflowUpdateStage.Accepted))).Id).GetResultAsync();
Assert.Equal(
$"no-param-response: {handle.Id}",
await handle.GetUpdateHandle((await handle.StartUpdateAsync(
- wf => wf.DoUpdateNoParamResponseAsync())).Id).GetResultAsync());
+ wf => wf.DoUpdateNoParamResponseAsync(),
+ new(WorkflowUpdateStage.Accepted))).Id).GetResultAsync());
Assert.Equal(
$"no-param-response: {handle.Id}",
await handle.GetUpdateHandle((await handle.StartUpdateAsync(
- wf => wf.DoUpdateNoParamResponseAsync())).Id).GetResultAsync());
+ wf => wf.DoUpdateNoParamResponseAsync(),
+ new(WorkflowUpdateStage.Accepted))).Id).GetResultAsync());
});
}
@@ -3811,7 +3825,8 @@ await ExecuteWorkerAsync(async worker =>
// Update invalid operation after accepted - fails workflow task
await handle.StartUpdateAsync(
- wf => wf.DoUpdateOneParamNoResponseAsync("update-invalid-operation-new-task"));
+ wf => wf.DoUpdateOneParamNoResponseAsync("update-invalid-operation-new-task"),
+ new(WorkflowUpdateStage.Accepted));
await AssertTaskFailureContainsEventuallyAsync(handle, "Intentional update invalid operation");
// Terminate the handle so it doesn't keep failing
await handle.TerminateAsync();
@@ -3833,7 +3848,8 @@ await AssertTaskFailureContainsEventuallyAsync(
(UpdateWorkflow wf) => wf.RunAsync(),
new(id: $"workflow-{Guid.NewGuid()}", taskQueue: worker.Options.TaskQueue!));
await handle.StartUpdateAsync(
- wf => wf.DoUpdateOneParamNoResponseAsync("update-continue-as-new"));
+ wf => wf.DoUpdateOneParamNoResponseAsync("update-continue-as-new"),
+ new(WorkflowUpdateStage.Accepted));
await AssertTaskFailureContainsEventuallyAsync(handle, "Continue as new");
await handle.TerminateAsync();
@@ -3865,11 +3881,9 @@ await ExecuteWorkerAsync(async worker =>
// Run an update with an ID
var result1 = await handle.ExecuteUpdateAsync(
- wf => wf.DoUpdateOneParamResponseAsync("first-param"),
- new() { UpdateID = "my-update-id" });
+ wf => wf.DoUpdateOneParamResponseAsync("first-param"), new(id: "my-update-id"));
var result2 = await handle.ExecuteUpdateAsync(
- wf => wf.DoUpdateOneParamResponseAsync("second-param"),
- new() { UpdateID = "my-update-id" });
+ wf => wf.DoUpdateOneParamResponseAsync("second-param"), new(id: "my-update-id"));
// Confirm that the first result is the same as the second without running (i.e. doesn't
// return second-param)
Assert.Equal("one-param-response: first-param", result1);
@@ -3886,7 +3900,8 @@ await ExecuteWorkerAsync(async worker =>
var handle = await Env.Client.StartWorkflowAsync(
(UpdateWorkflow wf) => wf.RunAsync(),
new(id: $"workflow-{Guid.NewGuid()}", taskQueue: worker.Options.TaskQueue!));
- var updateHandle = await handle.StartUpdateAsync(wf => wf.DoUpdateLongWaitAsync());
+ var updateHandle = await handle.StartUpdateAsync(
+ wf => wf.DoUpdateLongWaitAsync(), new(WorkflowUpdateStage.Accepted));
// Ask for the result but only for 1 second
using var tokenSource = new CancellationTokenSource();
tokenSource.CancelAfter(TimeSpan.FromSeconds(1));
@@ -3980,6 +3995,80 @@ await ExecuteWorkerAsync(async worker =>
});
}
+ [Workflow]
+ public class ImmediatelyCompleteUpdateAndWorkflow
+ {
+ [WorkflowRun]
+ public async Task RunAsync() => "workflow-done";
+
+ [WorkflowQuery]
+ public string GotUpdate { get; set; } = "no";
+
+ [WorkflowUpdate]
+ public async Task UpdateAsync()
+ {
+ GotUpdate = "yes";
+ return "update-done";
+ }
+ }
+
+ [Fact]
+ public async Task ExecuteWorkflowAsync_Updates_BeforeWorkflowStart()
+ {
+ // In order to confirm that all started workflows get updates before the
+ // workflow completes, this test will start a workflow and start an
+ // update. Only then will it start the worker to process both in the
+ // task. The workflow and update should both succeed properly. This also
+ // invokes a query to confirm update mutation. We do this with the cache
+ // off to confirm replay behavior.
+
+ // Start workflow
+ var taskQueue = $"tq-{Guid.NewGuid()}";
+ var handle = await Client.StartWorkflowAsync(
+ (ImmediatelyCompleteUpdateAndWorkflow wf) => wf.RunAsync(),
+ new(id: $"wf-{Guid.NewGuid()}", taskQueue));
+
+ // Execute update in the background
+ var updateId = $"upd-{Guid.NewGuid()}";
+ var updateTask = Task.Run(() =>
+ handle.ExecuteUpdateAsync(wf => wf.UpdateAsync(), new(updateId)));
+
+ // Wait until server says it is admitted
+ await AssertMore.EventuallyAsync(async () =>
+ {
+ try
+ {
+ var resp = await Client.Connection.WorkflowService.PollWorkflowExecutionUpdateAsync(new()
+ {
+ Identity = Client.Connection.Options.Identity,
+ Namespace = Client.Options.Namespace,
+ UpdateRef = new()
+ {
+ UpdateId = updateId,
+ WorkflowExecution = new() { WorkflowId = handle.Id },
+ },
+ });
+ Assert.Equal(UpdateWorkflowExecutionLifecycleStage.Admitted, resp.Stage);
+ }
+ catch (Exception e)
+ {
+ // Throw XUnit exception so it doesn't fail this eventually call
+ throw new Xunit.Sdk.XunitException("Failed polling", e);
+ }
+ });
+
+ // Start no-cache worker on the task queue
+ await ExecuteWorkerAsync(
+ async worker =>
+ {
+ // Confirm things completed as expected
+ Assert.Equal("workflow-done", await handle.GetResultAsync());
+ Assert.Equal("update-done", await updateTask);
+ Assert.Equal("yes", await handle.QueryAsync(wf => wf.GotUpdate));
+ },
+ new(taskQueue) { MaxCachedWorkflows = 0 });
+ }
+
[Workflow]
public class CurrentBuildIdWorkflow
{
@@ -4145,7 +4234,7 @@ Task AssertScenario(
// Don't care about handle, we'll re-attach later
await handle.StartUpdateAsync(
wf => wf.UpdateAsync(updateScenarioNotNull),
- new() { UpdateID = "my-update-1" });
+ new(id: "my-update-1", waitForStage: WorkflowUpdateStage.Accepted));
}
// Expect a task or exception fail