Skip to content

Commit

Permalink
Required wait update stage and polling improvements (#251)
Browse files Browse the repository at this point in the history
Fixes #198
Fixes #199
Fixes #231
  • Loading branch information
cretz committed May 21, 2024
1 parent 48fac0c commit 618e878
Show file tree
Hide file tree
Showing 8 changed files with 309 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace Temporalio.Client.Interceptors
/// <param name="FirstExecutionRunId">Workflow first execution run ID if any.</param>
/// <param name="Update">Update name.</param>
/// <param name="Args">Update arguments.</param>
/// <param name="Options">Options if any.</param>
/// <param name="Options">Options.</param>
/// <param name="Headers">Headers if any. These will be encoded using the codec before sent
/// to the server.</param>
/// <remarks>
Expand All @@ -24,6 +24,6 @@ public record StartWorkflowUpdateInput(
string? FirstExecutionRunId,
string Update,
IReadOnlyCollection<object?> Args,
WorkflowUpdateOptions? Options,
WorkflowUpdateStartOptions Options,
IDictionary<string, Payload>? Headers);
}
48 changes: 32 additions & 16 deletions src/Temporalio/Client/TemporalClient.Workflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,15 @@ await Client.Options.DataConverter.ToPayloadsAsync(
public async override Task<WorkflowUpdateHandle<TResult>> StartWorkflowUpdateAsync<TResult>(
StartWorkflowUpdateInput input)
{
if (input.Options.WaitForStage == WorkflowUpdateStage.None)
{
throw new ArgumentException("WaitForStage is required to start workflow update");
}
else if (input.Options.WaitForStage == WorkflowUpdateStage.Admitted)
{
throw new ArgumentException(
"Admitted is not an allowed wait stage to start workflow update");
}
// Build request
var req = new UpdateWorkflowExecutionRequest()
{
Expand All @@ -242,23 +251,17 @@ public async override Task<WorkflowUpdateHandle<TResult>> StartWorkflowUpdateAsy
{
Meta = new()
{
UpdateId = input.Options?.UpdateID ?? Guid.NewGuid().ToString(),
UpdateId = input.Options.Id ?? Guid.NewGuid().ToString(),
Identity = Client.Connection.Options.Identity,
},
Input = new() { Name = input.Update },
},
WaitPolicy = new()
{
// Default is Accepted, but may be overridden later
LifecycleStage = UpdateWorkflowExecutionLifecycleStage.Accepted,
LifecycleStage = (UpdateWorkflowExecutionLifecycleStage)input.Options.WaitForStage,
},
FirstExecutionRunId = input.FirstExecutionRunId ?? string.Empty,
};
if (input.Options is { } options &&
options.WaitForStage != UpdateWorkflowExecutionLifecycleStage.Unspecified)
{
req.WaitPolicy.LifecycleStage = options.WaitForStage;
}
if (input.Args.Count > 0)
{
req.Request.Input.Args = new Payloads();
Expand All @@ -280,15 +283,28 @@ await Client.Options.DataConverter.ToPayloadsAsync(
}
}
}
// Invoke
var resp = await Client.Connection.WorkflowService.UpdateWorkflowExecutionAsync(
req, DefaultRetryOptions(input.Options?.Rpc)).ConfigureAwait(false);
// Build handle for result
return new(Client, req.Request.Meta.UpdateId, input.Id, input.RunId)

// Continually try to start until the user-asked stage is reached or the stage is
// accepted
UpdateWorkflowExecutionResponse resp;
do
{
// Put outcome on the handle (may be null)
KnownOutcome = resp.Outcome,
};
resp = await Client.Connection.WorkflowService.UpdateWorkflowExecutionAsync(
req, DefaultRetryOptions(input.Options.Rpc)).ConfigureAwait(false);
}
while (resp.Stage < req.WaitPolicy.LifecycleStage &&
resp.Stage < UpdateWorkflowExecutionLifecycleStage.Accepted);

// If the requested stage is completed, wait for result, but discard the update
// exception, that will come when _they_ call get result
var handle = new WorkflowUpdateHandle<TResult>(
Client, req.Request.Meta.UpdateId, input.Id, input.RunId)
{ KnownOutcome = resp.Outcome };
if (input.Options.WaitForStage == WorkflowUpdateStage.Completed)
{
await handle.PollUntilOutcomeAsync(input.Options.Rpc).ConfigureAwait(false);
}
return handle;
}

/// <inheritdoc />
Expand Down
52 changes: 26 additions & 26 deletions src/Temporalio/Client/WorkflowHandle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -307,11 +307,11 @@ public Task<TQueryResult> QueryAsync<TQueryResult>(
/// </summary>
/// <typeparam name="TWorkflow">Workflow class type.</typeparam>
/// <param name="updateCall">Invocation of workflow update method.</param>
/// <param name="options">Extra options.</param>
/// <param name="options">Update options. Currently <c>WaitForStage</c> is required.</param>
/// <returns>Workflow update handle.</returns>
/// <remarks>WARNING: Workflow update is experimental and APIs may change.</remarks>
public Task<WorkflowUpdateHandle> StartUpdateAsync<TWorkflow>(
Expression<Func<TWorkflow, Task>> updateCall, WorkflowUpdateOptions? options = null)
Expression<Func<TWorkflow, Task>> updateCall, WorkflowUpdateStartOptions options)
{
var (method, args) = ExpressionUtil.ExtractCall(updateCall);
return StartUpdateAsync(
Expand All @@ -326,12 +326,12 @@ public Task<WorkflowUpdateHandle> StartUpdateAsync<TWorkflow>(
/// <typeparam name="TWorkflow">Workflow class type.</typeparam>
/// <typeparam name="TUpdateResult">Update result type.</typeparam>
/// <param name="updateCall">Invocation of workflow update method.</param>
/// <param name="options">Extra options.</param>
/// <param name="options">Update options. Currently <c>WaitForStage</c> is required.</param>
/// <returns>Workflow update handle.</returns>
/// <remarks>WARNING: Workflow update is experimental and APIs may change.</remarks>
public Task<WorkflowUpdateHandle<TUpdateResult>> StartUpdateAsync<TWorkflow, TUpdateResult>(
Expression<Func<TWorkflow, Task<TUpdateResult>>> updateCall,
WorkflowUpdateOptions? options = null)
WorkflowUpdateStartOptions options)
{
var (method, args) = ExpressionUtil.ExtractCall(updateCall);
return StartUpdateAsync<TUpdateResult>(
Expand All @@ -345,11 +345,11 @@ public Task<WorkflowUpdateHandle<TUpdateResult>> StartUpdateAsync<TWorkflow, TUp
/// </summary>
/// <param name="update">Name of the update.</param>
/// <param name="args">Arguments for the update.</param>
/// <param name="options">Extra options.</param>
/// <param name="options">Update options. Currently <c>WaitForStage</c> is required.</param>
/// <returns>Workflow update handle.</returns>
/// <remarks>WARNING: Workflow update is experimental and APIs may change.</remarks>
public Task<WorkflowUpdateHandle> StartUpdateAsync(
string update, IReadOnlyCollection<object?> args, WorkflowUpdateOptions? options = null) =>
string update, IReadOnlyCollection<object?> args, WorkflowUpdateStartOptions options) =>
StartUpdateAsync<ValueTuple>(update, args, options).ContinueWith<WorkflowUpdateHandle>(
t => t.Result, TaskScheduler.Current);

Expand All @@ -359,11 +359,11 @@ public Task<WorkflowUpdateHandle> StartUpdateAsync(
/// <typeparam name="TUpdateResult">Update result type.</typeparam>
/// <param name="update">Name of the update.</param>
/// <param name="args">Arguments for the update.</param>
/// <param name="options">Extra options.</param>
/// <param name="options">Update options. Currently <c>WaitForStage</c> is required.</param>
/// <returns>Workflow update handle.</returns>
/// <remarks>WARNING: Workflow update is experimental and APIs may change.</remarks>
public Task<WorkflowUpdateHandle<TUpdateResult>> StartUpdateAsync<TUpdateResult>(
string update, IReadOnlyCollection<object?> args, WorkflowUpdateOptions? options = null) =>
string update, IReadOnlyCollection<object?> args, WorkflowUpdateStartOptions options) =>
Client.OutboundInterceptor.StartWorkflowUpdateAsync<TUpdateResult>(new(
Id: Id,
RunId: RunId,
Expand All @@ -375,7 +375,7 @@ public Task<WorkflowUpdateHandle<TUpdateResult>> StartUpdateAsync<TUpdateResult>

/// <summary>
/// Start an update and wait for it to complete. This is a shortcut for
/// <see cref="StartUpdateAsync{TWorkflow}(Expression{Func{TWorkflow, Task}}, WorkflowUpdateOptions?)" />
/// <see cref="StartUpdateAsync{TWorkflow}(Expression{Func{TWorkflow, Task}}, WorkflowUpdateStartOptions)" />
/// +
/// <see cref="WorkflowUpdateHandle.GetResultAsync(RpcOptions?)" />.
/// </summary>
Expand All @@ -398,7 +398,7 @@ public async Task ExecuteUpdateAsync<TWorkflow>(

/// <summary>
/// Start an update and wait for it to complete. This is a shortcut for
/// <see cref="StartUpdateAsync{TWorkflow, TUpdateResult}(Expression{Func{TWorkflow, Task{TUpdateResult}}}, WorkflowUpdateOptions?)" />
/// <see cref="StartUpdateAsync{TWorkflow, TUpdateResult}(Expression{Func{TWorkflow, Task{TUpdateResult}}}, WorkflowUpdateStartOptions)" />
/// +
/// <see cref="WorkflowUpdateHandle{TResult}.GetResultAsync(RpcOptions?)" />.
/// </summary>
Expand All @@ -421,7 +421,7 @@ public async Task<TUpdateResult> ExecuteUpdateAsync<TWorkflow, TUpdateResult>(

/// <summary>
/// Start an update and wait for it to complete. This is a shortcut for
/// <see cref="StartUpdateAsync(string, IReadOnlyCollection{object?}, WorkflowUpdateOptions?)" />
/// <see cref="StartUpdateAsync(string, IReadOnlyCollection{object?}, WorkflowUpdateStartOptions)" />
/// +
/// <see cref="WorkflowUpdateHandle.GetResultAsync(RpcOptions?)" />.
/// </summary>
Expand All @@ -444,7 +444,7 @@ public async Task ExecuteUpdateAsync(

/// <summary>
/// Start an update and wait for it to complete. This is a shortcut for
/// <see cref="StartUpdateAsync{TUpdateResult}(string, IReadOnlyCollection{object?}, WorkflowUpdateOptions?)" />
/// <see cref="StartUpdateAsync{TUpdateResult}(string, IReadOnlyCollection{object?}, WorkflowUpdateStartOptions)" />
/// +
/// <see cref="WorkflowUpdateHandle.GetResultAsync(RpcOptions?)" />.
/// </summary>
Expand Down Expand Up @@ -603,14 +603,14 @@ private async IAsyncEnumerable<HistoryEvent> FetchHistoryEventsInternalAsync(
/// </summary>
/// <param name="options">Options to use as base.</param>
/// <returns>New options.</returns>
private protected static WorkflowUpdateOptions UpdateOptionsWithDefaultsForExecute(
WorkflowUpdateOptions? options)
{
var newOptions = options == null ? new() : (WorkflowUpdateOptions)options.Clone();
// Force override the wait for stage to completed
newOptions.WaitForStage = UpdateWorkflowExecutionLifecycleStage.Completed;
return newOptions;
}
private protected static WorkflowUpdateStartOptions UpdateOptionsWithDefaultsForExecute(
WorkflowUpdateOptions? options) =>
(WorkflowUpdateStartOptions)new WorkflowUpdateStartOptions()
{
Id = options?.Id,
Rpc = options?.Rpc,
WaitForStage = WorkflowUpdateStage.Completed,
}.Clone();
}

/// <summary>
Expand Down Expand Up @@ -678,29 +678,29 @@ public Task<TQueryResult> QueryAsync<TQueryResult>(
/// Start a workflow update via a call to a WorkflowUpdate attributed method.
/// </summary>
/// <param name="updateCall">Invocation of workflow update method.</param>
/// <param name="options">Extra options.</param>
/// <param name="options">Update options. Currently <c>WaitForStage</c> is required.</param>
/// <returns>Workflow update handle.</returns>
/// <remarks>WARNING: Workflow update is experimental and APIs may change.</remarks>
public Task<WorkflowUpdateHandle> StartUpdateAsync(
Expression<Func<TWorkflow, Task>> updateCall, WorkflowUpdateOptions? options = null) =>
Expression<Func<TWorkflow, Task>> updateCall, WorkflowUpdateStartOptions options) =>
StartUpdateAsync<TWorkflow>(updateCall, options);

/// <summary>
/// Start a workflow update via a call to a WorkflowUpdate attributed method.
/// </summary>
/// <typeparam name="TUpdateResult">Update result type.</typeparam>
/// <param name="updateCall">Invocation of workflow update method.</param>
/// <param name="options">Extra options.</param>
/// <param name="options">Update options. Currently <c>WaitForStage</c> is required.</param>
/// <returns>Workflow update handle.</returns>
/// <remarks>WARNING: Workflow update is experimental and APIs may change.</remarks>
public Task<WorkflowUpdateHandle<TUpdateResult>> StartUpdateAsync<TUpdateResult>(
Expression<Func<TWorkflow, Task<TUpdateResult>>> updateCall,
WorkflowUpdateOptions? options = null) =>
WorkflowUpdateStartOptions options) =>
StartUpdateAsync<TWorkflow, TUpdateResult>(updateCall, options);

/// <summary>
/// Start an update and wait for it to complete. This is a shortcut for
/// <see cref="StartUpdateAsync(Expression{Func{TWorkflow, Task}}, WorkflowUpdateOptions?)" />
/// <see cref="StartUpdateAsync(Expression{Func{TWorkflow, Task}}, WorkflowUpdateStartOptions)" />
/// +
/// <see cref="WorkflowUpdateHandle.GetResultAsync(RpcOptions?)" />.
/// </summary>
Expand All @@ -721,7 +721,7 @@ public async Task ExecuteUpdateAsync(

/// <summary>
/// Start an update and wait for it to complete. This is a shortcut for
/// <see cref="StartUpdateAsync{TUpdateResult}(Expression{Func{TWorkflow, Task{TUpdateResult}}}, WorkflowUpdateOptions?)" />
/// <see cref="StartUpdateAsync{TUpdateResult}(Expression{Func{TWorkflow, Task{TUpdateResult}}}, WorkflowUpdateStartOptions)" />
/// +
/// <see cref="WorkflowUpdateHandle{TResult}.GetResultAsync(RpcOptions?)" />.
/// </summary>
Expand Down
Loading

0 comments on commit 618e878

Please sign in to comment.