Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update protobuf, add suspend and resume event support #97

Merged
merged 5 commits into from
Jan 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## v1.0.0

- Added `SuspendInstanceAsync` and `ResumeInstanceAsync` to `DurableTaskClient`.
- `TaskOrchestrationContext.CreateReplaySafeLogger` now creates `ILogger` directly (as opposed to wrapping an existing `ILogger`).
- Durable Functions class-based syntax now resolves `ITaskActivity` instances from `IServiceProvider`, if available there.
- `DurableTaskClient` methods have been touched up to ensure `CancellationToken` is included, as well as is the last parameter.
Expand Down
27 changes: 27 additions & 0 deletions src/Client/Core/DurableTaskClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,33 @@ public virtual Task<OrchestrationMetadata> WaitForInstanceStartAsync(
string instanceId, CancellationToken cancellation)
=> this.WaitForInstanceStartAsync(instanceId, false, cancellation);

/// <summary>
/// Suspends an orchestration instance, halting processing of it until <see cref="ResumeInstanceAsync" /> is used
/// to resume the orchestration.
/// </summary>
/// <param name="instanceId">The instance ID of the orchestration to suspend.</param>
/// <param name="reason">The optional suspension reason.</param>
/// <param name="cancellation">
/// A <see cref="CancellationToken"/> that can be used to cancel the suspend operation. Note, cancelling this token
/// does <b>not</b> resume the orchestration if suspend was successful.
/// </param>
/// <returns>A task that completes when the suspend has been committed to the backend.</returns>
public abstract Task SuspendInstanceAsync(
string instanceId, string? reason = null, CancellationToken cancellation = default);

/// <summary>
/// Resumes an orchestration instance that was suspended via <see cref="SuspendInstanceAsync" />.
/// </summary>
/// <param name="instanceId">The instance ID of the orchestration to resume.</param>
/// <param name="reason">The optional resume reason.</param>
/// <param name="cancellation">
/// A <see cref="CancellationToken"/> that can be used to cancel the resume operation. Note, cancelling this token
/// does <b>not</b> re-suspend the orchestration if resume was successful.
/// </param>
/// <returns>A task that completes when the resume has been committed to the backend.</returns>
public abstract Task ResumeInstanceAsync(
string instanceId, string? reason = null, CancellationToken cancellation = default);

/// <summary>
/// Waits for an orchestration to start running and returns a <see cref="OrchestrationMetadata"/>
/// object that contains metadata about the started instance.
Expand Down
5 changes: 5 additions & 0 deletions src/Client/Core/OrchestrationRuntimeStatus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,9 @@ public enum OrchestrationRuntimeStatus
/// The orchestration was scheduled but hasn't started running.
/// </summary>
Pending,

/// <summary>
/// The orchestration is in a suspended state.
/// </summary>
Suspended,
}
3 changes: 1 addition & 2 deletions src/Client/Grpc/Client.Grpc.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Grpc.Core" Version="2.39.1" />
<PackageReference Include="Microsoft.DurableTask.Sidecar.Protobuf" Version="0.3.1" />
<PackageReference Include="Microsoft.DurableTask.Sidecar.Protobuf" Version="1.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="6.0.0" />
</ItemGroup>
Expand Down
44 changes: 44 additions & 0 deletions src/Client/Grpc/GrpcDurableTaskClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,50 @@ await this.sidecarClient.TerminateInstanceAsync(
cancellationToken: cancellation);
}

/// <inheritdoc/>
public override async Task SuspendInstanceAsync(
string instanceId, string? reason = null, CancellationToken cancellation = default)
{
P.SuspendRequest request = new()
{
InstanceId = instanceId,
Reason = reason,
};

try
{
await this.sidecarClient.SuspendInstanceAsync(
request, cancellationToken: cancellation);
}
catch (RpcException e) when (e.StatusCode == StatusCode.Cancelled)
{
throw new OperationCanceledException(
$"The {nameof(this.SuspendInstanceAsync)} operation was canceled.", e, cancellation);
}
}

/// <inheritdoc/>
public override async Task ResumeInstanceAsync(
string instanceId, string? reason = null, CancellationToken cancellation = default)
{
P.ResumeRequest request = new()
{
InstanceId = instanceId,
Reason = reason,
};

try
{
await this.sidecarClient.ResumeInstanceAsync(
request, cancellationToken: cancellation);
}
catch (RpcException e) when (e.StatusCode == StatusCode.Cancelled)
{
throw new OperationCanceledException(
$"The {nameof(this.ResumeInstanceAsync)} operation was canceled.", e, cancellation);
}
}

/// <inheritdoc/>
public override async Task<OrchestrationMetadata?> GetInstanceMetadataAsync(
string instanceId, bool getInputsAndOutputs = false, CancellationToken cancellation = default)
Expand Down
1 change: 1 addition & 0 deletions src/Client/Grpc/ProtoUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ internal static P.OrchestrationStatus ToGrpcStatus(this OrchestrationRuntimeStat
OrchestrationRuntimeStatus.Pending => P.OrchestrationStatus.Pending,
OrchestrationRuntimeStatus.Running => P.OrchestrationStatus.Running,
OrchestrationRuntimeStatus.Terminated => P.OrchestrationStatus.Terminated,
OrchestrationRuntimeStatus.Suspended => P.OrchestrationStatus.Suspended,
_ => throw new ArgumentOutOfRangeException(nameof(status), "Unexpected value"),
};
#pragma warning restore 0618 // Referencing Obsolete member.
Expand Down
6 changes: 6 additions & 0 deletions src/Shared/Grpc/ProtoUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ internal static HistoryEvent ConvertHistoryEvent(P.HistoryEvent proto)
case P.HistoryEvent.EventTypeOneofCase.ExecutionTerminated:
historyEvent = new ExecutionTerminatedEvent(proto.EventId, proto.ExecutionTerminated.Input);
break;
case P.HistoryEvent.EventTypeOneofCase.ExecutionSuspended:
historyEvent = new ExecutionSuspendedEvent(proto.EventId, proto.ExecutionSuspended.Input);
break;
case P.HistoryEvent.EventTypeOneofCase.ExecutionResumed:
historyEvent = new ExecutionResumedEvent(proto.EventId, proto.ExecutionResumed.Input);
break;
case P.HistoryEvent.EventTypeOneofCase.TaskScheduled:
historyEvent = new TaskScheduledEvent(
proto.EventId,
Expand Down
3 changes: 1 addition & 2 deletions src/Worker/Grpc/Worker.Grpc.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Grpc.Core" Version="2.39.1" />
<PackageReference Include="Microsoft.AspNetCore.Server.Kestrel" Version="2.2.0" />
<PackageReference Include="Microsoft.AspNetCore.Hosting" Version="2.2.7" />
<PackageReference Include="Microsoft.DurableTask.Sidecar.Protobuf" Version="0.3.1" />
<PackageReference Include="Microsoft.DurableTask.Sidecar.Protobuf" Version="1.0.0" />
<PackageReference Include="System.Collections.Immutable" Version="6.0.0" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ public override Task RaiseEventAsync(
throw new NotImplementedException();
}

public override Task ResumeInstanceAsync(
string instanceId, string? reason = null, CancellationToken cancellation = default)
{
throw new NotImplementedException();
}

public override Task<string> ScheduleNewOrchestrationInstanceAsync(
TaskName orchestratorName,
object? input = null,
Expand All @@ -115,6 +121,12 @@ public override Task<string> ScheduleNewOrchestrationInstanceAsync(
throw new NotImplementedException();
}

public override Task SuspendInstanceAsync(
string instanceId, string? reason = null, CancellationToken cancellation = default)
{
throw new NotImplementedException();
}

public override Task TerminateAsync(
string instanceId, object? output = null, CancellationToken cancellation = default)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,12 @@ public override Task RaiseEventAsync(
throw new NotImplementedException();
}

public override Task ResumeInstanceAsync(
string instanceId, string? reason = null, CancellationToken cancellation = default)
{
throw new NotImplementedException();
}

public override Task<string> ScheduleNewOrchestrationInstanceAsync(
TaskName orchestratorName,
object? input = null,
Expand All @@ -145,6 +151,12 @@ public override Task<string> ScheduleNewOrchestrationInstanceAsync(
throw new NotImplementedException();
}

public override Task SuspendInstanceAsync(
string instanceId, string? reason = null, CancellationToken cancellation = default)
{
throw new NotImplementedException();
}

public override Task TerminateAsync(
string instanceId, object? output = null, CancellationToken cancellation = default)
{
Expand Down