Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cascade terminate/purge support in GrpcDurableTaskClient #262

Merged
merged 12 commits into from
Feb 2, 2024
2 changes: 1 addition & 1 deletion eng/proto
56 changes: 50 additions & 6 deletions src/Client/Core/DependencyInjection/DurableTaskClientExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,68 @@ public static class DurableTaskClientExtensions
/// <param name="createdFrom">Filter purging to orchestrations after this date.</param>
/// <param name="createdTo">Filter purging to orchestrations before this date.</param>
/// <param name="statuses">Filter purging to orchestrations with these statuses.</param>
/// <param name="options">The optional options for purging the orchestration.</param>
/// <param name="cancellation">The cancellation token.</param>
/// <returns>
/// This method returns a <see cref="PurgeResult"/> object after the operation has completed with a
/// <see cref="PurgeResult.PurgedInstanceCount"/> value of <c>1</c> or <c>0</c>, depending on whether the target
/// instance was successfully purged.
/// <see cref="PurgeResult.PurgedInstanceCount"/> indicating the number of orchestration instances that were purged,
/// including the count of sub-orchestrations purged if any.
/// </returns>
public static Task<PurgeResult> PurgeInstancesAsync(
this DurableTaskClient client,
DateTimeOffset? createdFrom,
DateTimeOffset? createdTo,
IEnumerable<OrchestrationRuntimeStatus>? statuses,
PurgeInstanceOptions? options,
CancellationToken cancellation = default)
{
Check.NotNull(client);
PurgeInstancesFilter filter = new(createdFrom, createdTo, statuses);
return client.PurgeAllInstancesAsync(filter, cancellation);
return client.PurgeAllInstancesAsync(filter, options, cancellation);
}

/// <summary>
/// Purges orchestration instances metadata from the durable store.
/// </summary>
/// <param name="client">The DurableTask client.</param>
/// <param name="createdFrom">Filter purging to orchestrations after this date.</param>
/// <param name="createdTo">Filter purging to orchestrations before this date.</param>
/// <param name="statuses">Filter purging to orchestrations with these statuses.</param>
/// <param name="cancellation">The cancellation token.</param>
/// <returns>
/// This method returns a <see cref="PurgeResult"/> object after the operation has completed with a
/// <see cref="PurgeResult.PurgedInstanceCount"/> indicating the number of orchestration instances that were purged,
/// including the count of sub-orchestrations purged if any.
/// </returns>
public static Task<PurgeResult> PurgeInstancesAsync(
this DurableTaskClient client,
DateTimeOffset? createdFrom,
DateTimeOffset? createdTo,
IEnumerable<OrchestrationRuntimeStatus>? statuses,
CancellationToken cancellation = default)
=> PurgeInstancesAsync(client, createdFrom, createdTo, statuses, null, cancellation);

/// <summary>
/// Purges orchestration instances metadata from the durable store.
/// </summary>
/// <param name="client">The DurableTask client.</param>
/// <param name="createdFrom">Filter purging to orchestrations after this date.</param>
/// <param name="createdTo">Filter purging to orchestrations before this date.</param>
/// <param name="options">The optional options for purging the orchestration.</param>
/// <param name="cancellation">The cancellation token.</param>
/// <returns>
/// This method returns a <see cref="PurgeResult"/> object after the operation has completed with a
/// <see cref="PurgeResult.PurgedInstanceCount"/> indicating the number of orchestration instances that were purged,
/// including the count of sub-orchestrations purged if any.
/// </returns>
public static Task<PurgeResult> PurgeInstancesAsync(
this DurableTaskClient client,
DateTimeOffset? createdFrom,
DateTimeOffset? createdTo,
PurgeInstanceOptions? options,
CancellationToken cancellation = default)
=> PurgeInstancesAsync(client, createdFrom, createdTo, null, options, cancellation);

/// <summary>
/// Purges orchestration instances metadata from the durable store.
/// </summary>
Expand All @@ -42,13 +86,13 @@ public static class DurableTaskClientExtensions
/// <param name="cancellation">The cancellation token.</param>
/// <returns>
/// This method returns a <see cref="PurgeResult"/> object after the operation has completed with a
/// <see cref="PurgeResult.PurgedInstanceCount"/> value of <c>1</c> or <c>0</c>, depending on whether the target
/// instance was successfully purged.
/// <see cref="PurgeResult.PurgedInstanceCount"/> indicating the number of orchestration instances that were purged,
/// including the count of sub-orchestrations purged if any.
/// </returns>
public static Task<PurgeResult> PurgeInstancesAsync(
this DurableTaskClient client,
DateTimeOffset? createdFrom,
DateTimeOffset? createdTo,
CancellationToken cancellation = default)
=> PurgeInstancesAsync(client, createdFrom, createdTo, null, cancellation);
=> PurgeInstancesAsync(client, createdFrom, createdTo, null, null, cancellation);
}
61 changes: 45 additions & 16 deletions src/Client/Core/DurableTaskClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -209,12 +209,19 @@ protected DurableTaskClient(string name)
public abstract Task<OrchestrationMetadata> WaitForInstanceCompletionAsync(
string instanceId, bool getInputsAndOutputs = false, CancellationToken cancellation = default);

/// <inheritdoc cref="TerminateInstanceAsync(string, object, CancellationToken)"/>
/// <inheritdoc cref="TerminateInstanceAsync(string, TerminateInstanceOptions, CancellationToken)"/>
public virtual Task TerminateInstanceAsync(string instanceId, CancellationToken cancellation)
=> this.TerminateInstanceAsync(instanceId, null, cancellation);

/// <inheritdoc cref="TerminateInstanceAsync(string, TerminateInstanceOptions, CancellationToken)"/>
public virtual Task TerminateInstanceAsync(string instanceId, object? output, CancellationToken cancellation = default)
{
TerminateInstanceOptions? options = output is null ? null : new() { Output = output };
return this.TerminateInstanceAsync(instanceId, options, cancellation);
}

/// <summary>
/// Terminates a running orchestration instance and updates its runtime status to
/// Terminates an orchestration instance and updates its runtime status to
/// <see cref="OrchestrationRuntimeStatus.Terminated"/>.
/// </summary>
/// <remarks>
Expand All @@ -226,25 +233,27 @@ public virtual Task TerminateInstanceAsync(string instanceId, CancellationToken
/// the terminated state.
/// </para>
/// <para>
/// Terminating an orchestration by default will terminate all of the child sub-orchestrations that were started by
/// the orchetration instance. If you don't want to terminate sub-orchestration instances, you can set <see cref="TerminateInstanceOptions.Recursive"/>
/// flag to false which will disable termination of child sub-orchestration instances. It is set to true by default.
/// Terminating an orchestration instance has no effect on any in-flight activity function executions
/// or sub-orchestrations that were started by the terminated instance. Those actions will continue to run
/// without interruption. However, their results will be discarded. If you want to terminate sub-orchestrations,
/// you must issue separate terminate commands for each sub-orchestration instance.
/// that were started by the terminated instance. Those actions will continue to run
/// without interruption. However, their results will be discarded.
/// </para><para>
/// At the time of writing, there is no way to terminate an in-flight activity execution.
/// </para><para>
/// Attempting to terminate a completed or non-existent orchestration instance will fail silently.
/// </para>
/// </remarks>
/// <param name="instanceId">The ID of the orchestration instance to terminate.</param>
/// <param name="output">The optional output to set for the terminated orchestration instance.</param>
/// <param name="options">The optional options for terminating the orchestration.</param>
/// <param name="cancellation">
/// The cancellation token. This only cancels enqueueing the termination request to the backend. Does not abort
/// termination of the orchestration once enqueued.
/// </param>
/// <returns>A task that completes when the terminate message is enqueued.</returns>
public abstract Task TerminateInstanceAsync(
string instanceId, object? output = null, CancellationToken cancellation = default);
public virtual Task TerminateInstanceAsync(string instanceId, TerminateInstanceOptions? options = null, CancellationToken cancellation = default)
=> throw new NotSupportedException($"{this.GetType()} does not support orchestration termination.");

/// <inheritdoc cref="SuspendInstanceAsync(string, string, CancellationToken)"/>
public virtual Task SuspendInstanceAsync(string instanceId, CancellationToken cancellation)
Expand Down Expand Up @@ -327,6 +336,10 @@ public virtual Task ResumeInstanceAsync(string instanceId, CancellationToken can
/// <returns>An async pageable of the query results.</returns>
public abstract AsyncPageable<OrchestrationMetadata> GetAllInstancesAsync(OrchestrationQuery? filter = null);

/// <inheritdoc cref="PurgeInstanceAsync(string, PurgeInstanceOptions, CancellationToken)"/>
public virtual Task<PurgeResult> PurgeInstancesAsync(string instanceId, CancellationToken cancellation)
=> this.PurgeInstanceAsync(instanceId, null, cancellation);

/// <summary>
/// Purges orchestration instance metadata from the durable store.
/// </summary>
Expand All @@ -338,37 +351,53 @@ public virtual Task ResumeInstanceAsync(string instanceId, CancellationToken can
/// <see cref="OrchestrationRuntimeStatus.Completed"/>, <see cref="OrchestrationRuntimeStatus.Failed"/>, or
/// <see cref="OrchestrationRuntimeStatus.Terminated"/> state can be purged.
/// </para><para>
/// Purging an orchestration will by default purge all of the child sub-orchestrations that were started by the
/// orchetration instance. If you don't want to purge sub-orchestration instances, you can set <see cref="PurgeInstanceOptions.Recursive"/> flag to
/// false which will disable purging of child sub-orchestration instances. It is set to true by default.
/// If <paramref name="instanceId"/> is not found in the data store, or if the instance is found but not in a
/// terminal state, then the returned <see cref="PurgeResult"/> object will have a
/// <see cref="PurgeResult.PurgedInstanceCount"/> value of <c>0</c>. Otherwise, the existing data will be purged and
/// <see cref="PurgeResult.PurgedInstanceCount"/> will be <c>1</c>.
/// <see cref="PurgeResult.PurgedInstanceCount"/> will be the count of purged instances.
/// </para>
/// </remarks>
/// <param name="instanceId">The unique ID of the orchestration instance to purge.</param>
/// <param name="options">The optional options for purging the orchestration.</param>
/// <param name="cancellation">
/// A <see cref="CancellationToken"/> that can be used to cancel the purge operation.
/// </param>
/// <returns>
/// This method returns a <see cref="PurgeResult"/> object after the operation has completed with a
/// <see cref="PurgeResult.PurgedInstanceCount"/> value of <c>1</c> or <c>0</c>, depending on whether the target
/// instance was successfully purged.
/// <see cref="PurgeResult.PurgedInstanceCount"/> indicating the number of orchestration instances that were purged,
/// including the count of sub-orchestrations purged if any.
/// </returns>
public abstract Task<PurgeResult> PurgeInstanceAsync(
string instanceId, CancellationToken cancellation = default);
public virtual Task<PurgeResult> PurgeInstanceAsync(
string instanceId, PurgeInstanceOptions? options = null, CancellationToken cancellation = default)
{
throw new NotSupportedException($"{this.GetType()} does not support purging of orchestration instances.");
}

/// <inheritdoc cref="PurgeAllInstancesAsync(PurgeInstancesFilter, PurgeInstanceOptions, CancellationToken)"/>
public virtual Task<PurgeResult> PurgeAllInstancesAsync(PurgeInstancesFilter filter, CancellationToken cancellation)
=> this.PurgeAllInstancesAsync(new PurgeInstancesFilter(), null, cancellation);

/// <summary>
/// Purges orchestration instances metadata from the durable store.
/// </summary>
/// <param name="filter">The filter for which orchestrations to purge.</param>
/// <param name="options">The optional options for purging the orchestration.</param>
/// <param name="cancellation">
/// A <see cref="CancellationToken"/> that can be used to cancel the purge operation.
/// </param>
/// <returns>
/// This method returns a <see cref="PurgeResult"/> object after the operation has completed with a
/// <see cref="PurgeResult.PurgedInstanceCount"/> indicating the number of orchestration instances that were purged.
/// <see cref="PurgeResult.PurgedInstanceCount"/> indicating the number of orchestration instances that were purged,
/// including the count of sub-orchestrations purged if any.
/// </returns>
public abstract Task<PurgeResult> PurgeAllInstancesAsync(
PurgeInstancesFilter filter, CancellationToken cancellation = default);
public virtual Task<PurgeResult> PurgeAllInstancesAsync(
PurgeInstancesFilter filter, PurgeInstanceOptions? options = null, CancellationToken cancellation = default)
{
throw new NotSupportedException($"{this.GetType()} does not support purging of orchestration instances.");
}

// TODO: Create task hub

Expand Down
24 changes: 24 additions & 0 deletions src/Client/Core/PurgeInstanceOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

namespace Microsoft.DurableTask.Client;

/// <summary>
/// Options to purge an orchestration.
/// </summary>
public record PurgeInstanceOptions
shivamkm07 marked this conversation as resolved.
Show resolved Hide resolved
{
/// <summary>
/// Initializes a new instance of the <see cref="PurgeInstanceOptions"/> class.
/// </summary>
/// <param name="recursive">The optional boolean value indicating whether to purge sub-orchestrations as well.</param>
public PurgeInstanceOptions(bool recursive = true)
{
this.Recursive = recursive;
}

/// <summary>
/// Gets a value indicating whether to purge sub-orchestrations as well.
/// </summary>
public bool Recursive { get; init; }
}
31 changes: 31 additions & 0 deletions src/Client/Core/TerminateInstanceOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

namespace Microsoft.DurableTask.Client;

/// <summary>
/// Options to terminate an orchestration.
/// </summary>
public record TerminateInstanceOptions
shivamkm07 marked this conversation as resolved.
Show resolved Hide resolved
{
/// <summary>
/// Initializes a new instance of the <see cref="TerminateInstanceOptions"/> class.
/// </summary>
/// <param name="output">The optional output to set for the terminated orchestration instance.</param>
/// <param name="recursive">The optional boolean value indicating whether to terminate sub-orchestrations as well.</param>
public TerminateInstanceOptions(object? output = null, bool recursive = true)
{
this.Output = output;
this.Recursive = recursive;
}

/// <summary>
/// Gets the optional output to set for the terminated orchestration instance.
/// </summary>
public object? Output { get; init; }

/// <summary>
/// Gets a value indicating whether to terminate sub-orchestrations as well.
/// </summary>
public bool Recursive { get; init; }
}
15 changes: 11 additions & 4 deletions src/Client/Grpc/GrpcDurableTaskClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,11 @@ public override ValueTask DisposeAsync()

/// <inheritdoc/>
public override async Task TerminateInstanceAsync(
string instanceId, object? output = null, CancellationToken cancellation = default)
string instanceId, TerminateInstanceOptions? options = null, CancellationToken cancellation = default)
{
object? output = options?.Output;
bool recursive = options?.Recursive ?? true;

Check.NotNullOrEmpty(instanceId);
Check.NotEntity(this.options.EnableEntitySupport, instanceId);

Expand All @@ -137,6 +140,7 @@ public override ValueTask DisposeAsync()
{
InstanceId = instanceId,
Output = serializedOutput,
Recursive = recursive,
},
cancellationToken: cancellation);
}
Expand Down Expand Up @@ -321,18 +325,20 @@ public override AsyncPageable<OrchestrationMetadata> GetAllInstancesAsync(Orches

/// <inheritdoc/>
public override Task<PurgeResult> PurgeInstanceAsync(
string instanceId, CancellationToken cancellation = default)
string instanceId, PurgeInstanceOptions? options = null, CancellationToken cancellation = default)
{
bool recursive = options?.Recursive ?? true;
shivamkm07 marked this conversation as resolved.
Show resolved Hide resolved
this.logger.PurgingInstanceMetadata(instanceId);

P.PurgeInstancesRequest request = new() { InstanceId = instanceId };
P.PurgeInstancesRequest request = new() { InstanceId = instanceId, Recursive = recursive };
return this.PurgeInstancesCoreAsync(request, cancellation);
}

/// <inheritdoc/>
public override Task<PurgeResult> PurgeAllInstancesAsync(
PurgeInstancesFilter filter, CancellationToken cancellation = default)
PurgeInstancesFilter filter, PurgeInstanceOptions? options = null, CancellationToken cancellation = default)
{
bool recursive = options?.Recursive ?? true;
this.logger.PurgingInstances(filter);
P.PurgeInstancesRequest request = new()
{
Expand All @@ -341,6 +347,7 @@ public override AsyncPageable<OrchestrationMetadata> GetAllInstancesAsync(Orches
CreatedTimeFrom = filter?.CreatedFrom.ToTimestamp(),
CreatedTimeTo = filter?.CreatedTo.ToTimestamp(),
},
Recursive = recursive,
};

if (filter?.Statuses is not null)
Expand Down
jviau marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public override AsyncPageable<OrchestrationMetadata> GetAllInstancesAsync(Orches

/// <inheritdoc/>
public override async Task<PurgeResult> PurgeInstanceAsync(
string instanceId, CancellationToken cancellation = default)
string instanceId, PurgeInstanceOptions? options = null, CancellationToken cancellation = default)
{
Check.NotNullOrEmpty(instanceId);
cancellation.ThrowIfCancellationRequested();
Expand All @@ -105,7 +105,7 @@ public override AsyncPageable<OrchestrationMetadata> GetAllInstancesAsync(Orches

/// <inheritdoc/>
public override async Task<PurgeResult> PurgeAllInstancesAsync(
PurgeInstancesFilter filter, CancellationToken cancellation = default)
PurgeInstancesFilter filter, PurgeInstanceOptions? options = null, CancellationToken cancellation = default)
{
Check.NotNull(filter);
cancellation.ThrowIfCancellationRequested();
Expand Down Expand Up @@ -169,8 +169,9 @@ public override AsyncPageable<OrchestrationMetadata> GetAllInstancesAsync(Orches

/// <inheritdoc/>
public override Task TerminateInstanceAsync(
string instanceId, object? output = null, CancellationToken cancellation = default)
string instanceId, TerminateInstanceOptions? options = null, CancellationToken cancellation = default)
{
object? output = options?.Output;
Check.NotNullOrEmpty(instanceId);
cancellation.ThrowIfCancellationRequested();
string? reason = this.DataConverter.Serialize(output);
Expand Down