Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cascade terminate/purge support in GrpcDurableTaskClient #262

Merged
merged 12 commits into from
Feb 2, 2024
2 changes: 1 addition & 1 deletion eng/proto
56 changes: 50 additions & 6 deletions src/Client/Core/DependencyInjection/DurableTaskClientExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,68 @@ public static class DurableTaskClientExtensions
/// <param name="createdFrom">Filter purging to orchestrations after this date.</param>
/// <param name="createdTo">Filter purging to orchestrations before this date.</param>
/// <param name="statuses">Filter purging to orchestrations with these statuses.</param>
/// <param name="options">The optional options for purging the orchestration.</param>
/// <param name="cancellation">The cancellation token.</param>
/// <returns>
/// This method returns a <see cref="PurgeResult"/> object after the operation has completed with a
/// <see cref="PurgeResult.PurgedInstanceCount"/> value of <c>1</c> or <c>0</c>, depending on whether the target
/// instance was successfully purged.
/// <see cref="PurgeResult.PurgedInstanceCount"/> indicating the number of orchestration instances that were purged,
/// including the count of sub-orchestrations purged if any.
/// </returns>
public static Task<PurgeResult> PurgeInstancesAsync(
this DurableTaskClient client,
DateTimeOffset? createdFrom,
DateTimeOffset? createdTo,
IEnumerable<OrchestrationRuntimeStatus>? statuses,
PurgeInstanceOptions? options,
CancellationToken cancellation = default)
{
Check.NotNull(client);
PurgeInstancesFilter filter = new(createdFrom, createdTo, statuses);
return client.PurgeAllInstancesAsync(filter, cancellation);
return client.PurgeAllInstancesAsync(filter, options, cancellation);
}

/// <summary>
/// Purges orchestration instances metadata from the durable store.
/// </summary>
/// <param name="client">The DurableTask client.</param>
/// <param name="createdFrom">Filter purging to orchestrations after this date.</param>
/// <param name="createdTo">Filter purging to orchestrations before this date.</param>
/// <param name="statuses">Filter purging to orchestrations with these statuses.</param>
/// <param name="cancellation">The cancellation token.</param>
/// <returns>
/// This method returns a <see cref="PurgeResult"/> object after the operation has completed with a
/// <see cref="PurgeResult.PurgedInstanceCount"/> indicating the number of orchestration instances that were purged,
/// including the count of sub-orchestrations purged if any.
/// </returns>
public static Task<PurgeResult> PurgeInstancesAsync(
this DurableTaskClient client,
DateTimeOffset? createdFrom,
DateTimeOffset? createdTo,
IEnumerable<OrchestrationRuntimeStatus>? statuses,
CancellationToken cancellation = default)
=> PurgeInstancesAsync(client, createdFrom, createdTo, statuses, null, cancellation);

/// <summary>
/// Purges orchestration instances metadata from the durable store.
/// </summary>
/// <param name="client">The DurableTask client.</param>
/// <param name="createdFrom">Filter purging to orchestrations after this date.</param>
/// <param name="createdTo">Filter purging to orchestrations before this date.</param>
/// <param name="options">The optional options for purging the orchestration.</param>
/// <param name="cancellation">The cancellation token.</param>
/// <returns>
/// This method returns a <see cref="PurgeResult"/> object after the operation has completed with a
/// <see cref="PurgeResult.PurgedInstanceCount"/> indicating the number of orchestration instances that were purged,
/// including the count of sub-orchestrations purged if any.
/// </returns>
public static Task<PurgeResult> PurgeInstancesAsync(
this DurableTaskClient client,
DateTimeOffset? createdFrom,
DateTimeOffset? createdTo,
PurgeInstanceOptions? options,
CancellationToken cancellation = default)
=> PurgeInstancesAsync(client, createdFrom, createdTo, null, options, cancellation);

/// <summary>
/// Purges orchestration instances metadata from the durable store.
/// </summary>
Expand All @@ -42,13 +86,13 @@ public static class DurableTaskClientExtensions
/// <param name="cancellation">The cancellation token.</param>
/// <returns>
/// This method returns a <see cref="PurgeResult"/> object after the operation has completed with a
/// <see cref="PurgeResult.PurgedInstanceCount"/> value of <c>1</c> or <c>0</c>, depending on whether the target
/// instance was successfully purged.
/// <see cref="PurgeResult.PurgedInstanceCount"/> indicating the number of orchestration instances that were purged,
/// including the count of sub-orchestrations purged if any.
/// </returns>
public static Task<PurgeResult> PurgeInstancesAsync(
this DurableTaskClient client,
DateTimeOffset? createdFrom,
DateTimeOffset? createdTo,
CancellationToken cancellation = default)
=> PurgeInstancesAsync(client, createdFrom, createdTo, null, cancellation);
=> PurgeInstancesAsync(client, createdFrom, createdTo, null, null, cancellation);
}
61 changes: 45 additions & 16 deletions src/Client/Core/DurableTaskClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -209,12 +209,19 @@ protected DurableTaskClient(string name)
public abstract Task<OrchestrationMetadata> WaitForInstanceCompletionAsync(
string instanceId, bool getInputsAndOutputs = false, CancellationToken cancellation = default);

/// <inheritdoc cref="TerminateInstanceAsync(string, object, CancellationToken)"/>
/// <inheritdoc cref="TerminateInstanceAsync(string, TerminateInstanceOptions, CancellationToken)"/>
public virtual Task TerminateInstanceAsync(string instanceId, CancellationToken cancellation)
=> this.TerminateInstanceAsync(instanceId, null, cancellation);

/// <inheritdoc cref="TerminateInstanceAsync(string, TerminateInstanceOptions, CancellationToken)"/>
public virtual Task TerminateInstanceAsync(string instanceId, object? output, CancellationToken cancellation = default)
{
TerminateInstanceOptions? options = output is null ? null : new() { Output = output };
return this.TerminateInstanceAsync(instanceId, options, cancellation);
}

/// <summary>
/// Terminates a running orchestration instance and updates its runtime status to
/// Terminates an orchestration instance and updates its runtime status to
/// <see cref="OrchestrationRuntimeStatus.Terminated"/>.
/// </summary>
/// <remarks>
Expand All @@ -226,25 +233,27 @@ public virtual Task TerminateInstanceAsync(string instanceId, CancellationToken
/// the terminated state.
/// </para>
/// <para>
/// Terminating an orchestration by default will not terminate any of the child sub-orchestrations that were started by
/// the orchetration instance. If you want to terminate sub-orchestration instances as well, you can set <see cref="TerminateInstanceOptions.Recursive"/>
/// flag to true which will enable termination of child sub-orchestration instances. It is set to false by default.
/// Terminating an orchestration instance has no effect on any in-flight activity function executions
/// or sub-orchestrations that were started by the terminated instance. Those actions will continue to run
/// without interruption. However, their results will be discarded. If you want to terminate sub-orchestrations,
/// you must issue separate terminate commands for each sub-orchestration instance.
/// that were started by the terminated instance. Those actions will continue to run
/// without interruption. However, their results will be discarded.
/// </para><para>
/// At the time of writing, there is no way to terminate an in-flight activity execution.
/// </para><para>
/// Attempting to terminate a completed or non-existent orchestration instance will fail silently.
/// </para>
/// </remarks>
/// <param name="instanceId">The ID of the orchestration instance to terminate.</param>
/// <param name="output">The optional output to set for the terminated orchestration instance.</param>
/// <param name="options">The optional options for terminating the orchestration.</param>
/// <param name="cancellation">
/// The cancellation token. This only cancels enqueueing the termination request to the backend. Does not abort
/// termination of the orchestration once enqueued.
/// </param>
/// <returns>A task that completes when the terminate message is enqueued.</returns>
public abstract Task TerminateInstanceAsync(
string instanceId, object? output = null, CancellationToken cancellation = default);
public virtual Task TerminateInstanceAsync(string instanceId, TerminateInstanceOptions? options = null, CancellationToken cancellation = default)
=> throw new NotSupportedException($"{this.GetType()} does not support orchestration termination.");

/// <inheritdoc cref="SuspendInstanceAsync(string, string, CancellationToken)"/>
public virtual Task SuspendInstanceAsync(string instanceId, CancellationToken cancellation)
Expand Down Expand Up @@ -327,6 +336,10 @@ public virtual Task ResumeInstanceAsync(string instanceId, CancellationToken can
/// <returns>An async pageable of the query results.</returns>
public abstract AsyncPageable<OrchestrationMetadata> GetAllInstancesAsync(OrchestrationQuery? filter = null);

/// <inheritdoc cref="PurgeInstanceAsync(string, PurgeInstanceOptions, CancellationToken)"/>
public virtual Task<PurgeResult> PurgeInstancesAsync(string instanceId, CancellationToken cancellation)
=> this.PurgeInstanceAsync(instanceId, null, cancellation);

/// <summary>
/// Purges orchestration instance metadata from the durable store.
/// </summary>
Expand All @@ -338,37 +351,53 @@ public virtual Task ResumeInstanceAsync(string instanceId, CancellationToken can
/// <see cref="OrchestrationRuntimeStatus.Completed"/>, <see cref="OrchestrationRuntimeStatus.Failed"/>, or
/// <see cref="OrchestrationRuntimeStatus.Terminated"/> state can be purged.
/// </para><para>
/// Purging an orchestration will by default not purge any of the child sub-orchestrations that were started by the
/// orchetration instance. If you want to purge sub-orchestration instances, you can set <see cref="PurgeInstanceOptions.Recursive"/> flag to
/// true which will enable purging of child sub-orchestration instances. It is set to false by default.
/// If <paramref name="instanceId"/> is not found in the data store, or if the instance is found but not in a
/// terminal state, then the returned <see cref="PurgeResult"/> object will have a
/// <see cref="PurgeResult.PurgedInstanceCount"/> value of <c>0</c>. Otherwise, the existing data will be purged and
/// <see cref="PurgeResult.PurgedInstanceCount"/> will be <c>1</c>.
/// <see cref="PurgeResult.PurgedInstanceCount"/> will be the count of purged instances.
/// </para>
/// </remarks>
/// <param name="instanceId">The unique ID of the orchestration instance to purge.</param>
/// <param name="options">The optional options for purging the orchestration.</param>
/// <param name="cancellation">
/// A <see cref="CancellationToken"/> that can be used to cancel the purge operation.
/// </param>
/// <returns>
/// This method returns a <see cref="PurgeResult"/> object after the operation has completed with a
/// <see cref="PurgeResult.PurgedInstanceCount"/> value of <c>1</c> or <c>0</c>, depending on whether the target
/// instance was successfully purged.
/// <see cref="PurgeResult.PurgedInstanceCount"/> indicating the number of orchestration instances that were purged,
/// including the count of sub-orchestrations purged if any.
/// </returns>
public abstract Task<PurgeResult> PurgeInstanceAsync(
string instanceId, CancellationToken cancellation = default);
public virtual Task<PurgeResult> PurgeInstanceAsync(
string instanceId, PurgeInstanceOptions? options = null, CancellationToken cancellation = default)
{
throw new NotSupportedException($"{this.GetType()} does not support purging of orchestration instances.");
}

/// <inheritdoc cref="PurgeAllInstancesAsync(PurgeInstancesFilter, PurgeInstanceOptions, CancellationToken)"/>
public virtual Task<PurgeResult> PurgeAllInstancesAsync(PurgeInstancesFilter filter, CancellationToken cancellation)
=> this.PurgeAllInstancesAsync(new PurgeInstancesFilter(), null, cancellation);

/// <summary>
/// Purges orchestration instances metadata from the durable store.
/// </summary>
/// <param name="filter">The filter for which orchestrations to purge.</param>
/// <param name="options">The optional options for purging the orchestration.</param>
/// <param name="cancellation">
/// A <see cref="CancellationToken"/> that can be used to cancel the purge operation.
/// </param>
/// <returns>
/// This method returns a <see cref="PurgeResult"/> object after the operation has completed with a
/// <see cref="PurgeResult.PurgedInstanceCount"/> indicating the number of orchestration instances that were purged.
/// <see cref="PurgeResult.PurgedInstanceCount"/> indicating the number of orchestration instances that were purged,
/// including the count of sub-orchestrations purged if any.
/// </returns>
public abstract Task<PurgeResult> PurgeAllInstancesAsync(
PurgeInstancesFilter filter, CancellationToken cancellation = default);
public virtual Task<PurgeResult> PurgeAllInstancesAsync(
PurgeInstancesFilter filter, PurgeInstanceOptions? options = null, CancellationToken cancellation = default)
{
throw new NotSupportedException($"{this.GetType()} does not support purging of orchestration instances.");
}

// TODO: Create task hub

Expand Down
10 changes: 10 additions & 0 deletions src/Client/Core/PurgeInstanceOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

namespace Microsoft.DurableTask.Client;

/// <summary>
/// Options to purge an orchestration.
/// </summary>
/// <param name="Recursive">The optional boolean value indicating whether to purge sub-orchestrations as well.</param>
public record PurgeInstanceOptions(bool Recursive = false);
11 changes: 11 additions & 0 deletions src/Client/Core/TerminateInstanceOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

namespace Microsoft.DurableTask.Client;

/// <summary>
/// Options to terminate an orchestration.
/// </summary>
/// <param name="Output">The optional output to set for the terminated orchestration instance.</param>
/// <param name="Recursive">The optional boolean value indicating whether to terminate sub-orchestrations as well.</param>
public record TerminateInstanceOptions(object? Output = null, bool Recursive = false);
15 changes: 11 additions & 4 deletions src/Client/Grpc/GrpcDurableTaskClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,11 @@ public override ValueTask DisposeAsync()

/// <inheritdoc/>
public override async Task TerminateInstanceAsync(
string instanceId, object? output = null, CancellationToken cancellation = default)
string instanceId, TerminateInstanceOptions? options = null, CancellationToken cancellation = default)
{
object? output = options?.Output;
bool recursive = options?.Recursive ?? false;

Check.NotNullOrEmpty(instanceId);
Check.NotEntity(this.options.EnableEntitySupport, instanceId);

Expand All @@ -137,6 +140,7 @@ public override ValueTask DisposeAsync()
{
InstanceId = instanceId,
Output = serializedOutput,
Recursive = recursive,
},
cancellationToken: cancellation);
}
Expand Down Expand Up @@ -321,18 +325,20 @@ public override AsyncPageable<OrchestrationMetadata> GetAllInstancesAsync(Orches

/// <inheritdoc/>
public override Task<PurgeResult> PurgeInstanceAsync(
string instanceId, CancellationToken cancellation = default)
string instanceId, PurgeInstanceOptions? options = null, CancellationToken cancellation = default)
{
bool recursive = options?.Recursive ?? false;
this.logger.PurgingInstanceMetadata(instanceId);

P.PurgeInstancesRequest request = new() { InstanceId = instanceId };
P.PurgeInstancesRequest request = new() { InstanceId = instanceId, Recursive = recursive };
return this.PurgeInstancesCoreAsync(request, cancellation);
}

/// <inheritdoc/>
public override Task<PurgeResult> PurgeAllInstancesAsync(
PurgeInstancesFilter filter, CancellationToken cancellation = default)
PurgeInstancesFilter filter, PurgeInstanceOptions? options = null, CancellationToken cancellation = default)
{
bool recursive = options?.Recursive ?? false;
this.logger.PurgingInstances(filter);
P.PurgeInstancesRequest request = new()
{
Expand All @@ -341,6 +347,7 @@ public override AsyncPageable<OrchestrationMetadata> GetAllInstancesAsync(Orches
CreatedTimeFrom = filter?.CreatedFrom.ToTimestamp(),
CreatedTimeTo = filter?.CreatedTo.ToTimestamp(),
},
Recursive = recursive,
};

if (filter?.Statuses is not null)
Expand Down
13 changes: 10 additions & 3 deletions src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs
jviau marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -95,20 +95,24 @@ public override AsyncPageable<OrchestrationMetadata> GetAllInstancesAsync(Orches

/// <inheritdoc/>
public override async Task<PurgeResult> PurgeInstanceAsync(
string instanceId, CancellationToken cancellation = default)
string instanceId, PurgeInstanceOptions? options = null, CancellationToken cancellation = default)
{
Check.NotNullOrEmpty(instanceId);
cancellation.ThrowIfCancellationRequested();

// TODO: Support recursive purge of sub-orchestrations
Core.PurgeResult result = await this.PurgeClient.PurgeInstanceStateAsync(instanceId);
return result.ConvertFromCore();
}

/// <inheritdoc/>
public override async Task<PurgeResult> PurgeAllInstancesAsync(
PurgeInstancesFilter filter, CancellationToken cancellation = default)
PurgeInstancesFilter filter, PurgeInstanceOptions? options = null, CancellationToken cancellation = default)
{
Check.NotNull(filter);
cancellation.ThrowIfCancellationRequested();

// TODO: Support recursive purge of sub-orchestrations
Core.PurgeResult result = await this.PurgeClient.PurgeInstanceStateAsync(filter.ConvertToCore());
return result.ConvertFromCore();
}
Expand Down Expand Up @@ -169,11 +173,14 @@ public override AsyncPageable<OrchestrationMetadata> GetAllInstancesAsync(Orches

/// <inheritdoc/>
public override Task TerminateInstanceAsync(
string instanceId, object? output = null, CancellationToken cancellation = default)
string instanceId, TerminateInstanceOptions? options = null, CancellationToken cancellation = default)
{
object? output = options?.Output;
Check.NotNullOrEmpty(instanceId);
cancellation.ThrowIfCancellationRequested();
string? reason = this.DataConverter.Serialize(output);

// TODO: Support recursive termination of sub-orchestrations
return this.Client.ForceTerminateTaskOrchestrationAsync(instanceId, reason);
}

Expand Down
1 change: 0 additions & 1 deletion src/Shared/Grpc/ProtoUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ internal static HistoryEvent ConvertHistoryEvent(P.HistoryEvent proto)
OrchestrationInstance = proto.ExecutionStarted.ParentInstance.OrchestrationInstance.ToCore(),
TaskScheduleId = proto.ExecutionStarted.ParentInstance.TaskScheduledId,
},
Correlation = proto.ExecutionStarted.CorrelationData,
jviau marked this conversation as resolved.
Show resolved Hide resolved
ScheduledStartTime = proto.ExecutionStarted.ScheduledStartTimestamp?.ToDateTime(),
};
break;
Expand Down