Skip to content

Commit

Permalink
Update protobuf, add suspend and resume event support (#97)
Browse files Browse the repository at this point in the history
  • Loading branch information
jviau committed Jan 10, 2023
1 parent 64ea918 commit 8efb708
Show file tree
Hide file tree
Showing 10 changed files with 110 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## v1.0.0

- Added `SuspendInstanceAsync` and `ResumeInstanceAsync` to `DurableTaskClient`.
- `TaskOrchestrationContext.CreateReplaySafeLogger` now creates `ILogger` directly (as opposed to wrapping an existing `ILogger`).
- Durable Functions class-based syntax now resolves `ITaskActivity` instances from `IServiceProvider`, if available there.
- `DurableTaskClient` methods have been touched up to ensure `CancellationToken` is included, as well as is the last parameter.
Expand Down
27 changes: 27 additions & 0 deletions src/Client/Core/DurableTaskClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,33 @@ protected DurableTaskClient(string name)
string instanceId, CancellationToken cancellation)
=> this.WaitForInstanceStartAsync(instanceId, false, cancellation);

/// <summary>
/// Suspends an orchestration instance, halting processing of it until <see cref="ResumeInstanceAsync" /> is used
/// to resume the orchestration.
/// </summary>
/// <param name="instanceId">The instance ID of the orchestration to suspend.</param>
/// <param name="reason">The optional suspension reason.</param>
/// <param name="cancellation">
/// A <see cref="CancellationToken"/> that can be used to cancel the suspend operation. Note, cancelling this token
/// does <b>not</b> resume the orchestration if suspend was successful.
/// </param>
/// <returns>A task that completes when the suspend has been committed to the backend.</returns>
public abstract Task SuspendInstanceAsync(
string instanceId, string? reason = null, CancellationToken cancellation = default);

/// <summary>
/// Resumes an orchestration instance that was suspended via <see cref="SuspendInstanceAsync" />.
/// </summary>
/// <param name="instanceId">The instance ID of the orchestration to resume.</param>
/// <param name="reason">The optional resume reason.</param>
/// <param name="cancellation">
/// A <see cref="CancellationToken"/> that can be used to cancel the resume operation. Note, cancelling this token
/// does <b>not</b> re-suspend the orchestration if resume was successful.
/// </param>
/// <returns>A task that completes when the resume has been committed to the backend.</returns>
public abstract Task ResumeInstanceAsync(
string instanceId, string? reason = null, CancellationToken cancellation = default);

/// <summary>
/// Waits for an orchestration to start running and returns a <see cref="OrchestrationMetadata"/>
/// object that contains metadata about the started instance.
Expand Down
5 changes: 5 additions & 0 deletions src/Client/Core/OrchestrationRuntimeStatus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,9 @@ public enum OrchestrationRuntimeStatus
/// The orchestration was scheduled but hasn't started running.
/// </summary>
Pending,

/// <summary>
/// The orchestration is in a suspended state.
/// </summary>
Suspended,
}
3 changes: 1 addition & 2 deletions src/Client/Grpc/Client.Grpc.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Grpc.Core" Version="2.39.1" />
<PackageReference Include="Microsoft.DurableTask.Sidecar.Protobuf" Version="0.3.1" />
<PackageReference Include="Microsoft.DurableTask.Sidecar.Protobuf" Version="1.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="6.0.0" />
</ItemGroup>
Expand Down
44 changes: 44 additions & 0 deletions src/Client/Grpc/GrpcDurableTaskClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,50 @@ public override ValueTask DisposeAsync()
cancellationToken: cancellation);
}

/// <inheritdoc/>
public override async Task SuspendInstanceAsync(
string instanceId, string? reason = null, CancellationToken cancellation = default)
{
P.SuspendRequest request = new()
{
InstanceId = instanceId,
Reason = reason,
};

try
{
await this.sidecarClient.SuspendInstanceAsync(
request, cancellationToken: cancellation);
}
catch (RpcException e) when (e.StatusCode == StatusCode.Cancelled)
{
throw new OperationCanceledException(
$"The {nameof(this.SuspendInstanceAsync)} operation was canceled.", e, cancellation);
}
}

/// <inheritdoc/>
public override async Task ResumeInstanceAsync(
string instanceId, string? reason = null, CancellationToken cancellation = default)
{
P.ResumeRequest request = new()
{
InstanceId = instanceId,
Reason = reason,
};

try
{
await this.sidecarClient.ResumeInstanceAsync(
request, cancellationToken: cancellation);
}
catch (RpcException e) when (e.StatusCode == StatusCode.Cancelled)
{
throw new OperationCanceledException(
$"The {nameof(this.ResumeInstanceAsync)} operation was canceled.", e, cancellation);
}
}

/// <inheritdoc/>
public override async Task<OrchestrationMetadata?> GetInstanceMetadataAsync(
string instanceId, bool getInputsAndOutputs = false, CancellationToken cancellation = default)
Expand Down
1 change: 1 addition & 0 deletions src/Client/Grpc/ProtoUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ internal static P.OrchestrationStatus ToGrpcStatus(this OrchestrationRuntimeStat
OrchestrationRuntimeStatus.Pending => P.OrchestrationStatus.Pending,
OrchestrationRuntimeStatus.Running => P.OrchestrationStatus.Running,
OrchestrationRuntimeStatus.Terminated => P.OrchestrationStatus.Terminated,
OrchestrationRuntimeStatus.Suspended => P.OrchestrationStatus.Suspended,
_ => throw new ArgumentOutOfRangeException(nameof(status), "Unexpected value"),
};
#pragma warning restore 0618 // Referencing Obsolete member.
Expand Down
6 changes: 6 additions & 0 deletions src/Shared/Grpc/ProtoUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ internal static HistoryEvent ConvertHistoryEvent(P.HistoryEvent proto)
case P.HistoryEvent.EventTypeOneofCase.ExecutionTerminated:
historyEvent = new ExecutionTerminatedEvent(proto.EventId, proto.ExecutionTerminated.Input);
break;
case P.HistoryEvent.EventTypeOneofCase.ExecutionSuspended:
historyEvent = new ExecutionSuspendedEvent(proto.EventId, proto.ExecutionSuspended.Input);
break;
case P.HistoryEvent.EventTypeOneofCase.ExecutionResumed:
historyEvent = new ExecutionResumedEvent(proto.EventId, proto.ExecutionResumed.Input);
break;
case P.HistoryEvent.EventTypeOneofCase.TaskScheduled:
historyEvent = new TaskScheduledEvent(
proto.EventId,
Expand Down
3 changes: 1 addition & 2 deletions src/Worker/Grpc/Worker.Grpc.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Grpc.Core" Version="2.39.1" />
<PackageReference Include="Microsoft.AspNetCore.Server.Kestrel" Version="2.2.0" />
<PackageReference Include="Microsoft.AspNetCore.Hosting" Version="2.2.7" />
<PackageReference Include="Microsoft.DurableTask.Sidecar.Protobuf" Version="0.3.1" />
<PackageReference Include="Microsoft.DurableTask.Sidecar.Protobuf" Version="1.0.0" />
<PackageReference Include="System.Collections.Immutable" Version="6.0.0" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ public override AsyncPageable<OrchestrationMetadata> GetInstances(OrchestrationQ
throw new NotImplementedException();
}

public override Task ResumeInstanceAsync(
string instanceId, string? reason = null, CancellationToken cancellation = default)
{
throw new NotImplementedException();
}

public override Task<string> ScheduleNewOrchestrationInstanceAsync(
TaskName orchestratorName,
object? input = null,
Expand All @@ -115,6 +121,12 @@ public override AsyncPageable<OrchestrationMetadata> GetInstances(OrchestrationQ
throw new NotImplementedException();
}

public override Task SuspendInstanceAsync(
string instanceId, string? reason = null, CancellationToken cancellation = default)
{
throw new NotImplementedException();
}

public override Task TerminateAsync(
string instanceId, object? output = null, CancellationToken cancellation = default)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,12 @@ public override AsyncPageable<OrchestrationMetadata> GetInstances(OrchestrationQ
throw new NotImplementedException();
}

public override Task ResumeInstanceAsync(
string instanceId, string? reason = null, CancellationToken cancellation = default)
{
throw new NotImplementedException();
}

public override Task<string> ScheduleNewOrchestrationInstanceAsync(
TaskName orchestratorName,
object? input = null,
Expand All @@ -145,6 +151,12 @@ public override AsyncPageable<OrchestrationMetadata> GetInstances(OrchestrationQ
throw new NotImplementedException();
}

public override Task SuspendInstanceAsync(
string instanceId, string? reason = null, CancellationToken cancellation = default)
{
throw new NotImplementedException();
}

public override Task TerminateAsync(
string instanceId, object? output = null, CancellationToken cancellation = default)
{
Expand Down

0 comments on commit 8efb708

Please sign in to comment.