From acab5ef20444cfaa17c3a89cd5c858a09149b9ee Mon Sep 17 00:00:00 2001 From: Jacob Viau Date: Thu, 25 Aug 2022 15:28:55 -0700 Subject: [PATCH 01/18] Add AsyncPageable abstractions --- src/DurableTask/AsyncPageable{T}.cs | 43 ++++++++++ src/DurableTask/Pageable.cs | 118 ++++++++++++++++++++++++++++ 2 files changed, 161 insertions(+) create mode 100644 src/DurableTask/AsyncPageable{T}.cs create mode 100644 src/DurableTask/Pageable.cs diff --git a/src/DurableTask/AsyncPageable{T}.cs b/src/DurableTask/AsyncPageable{T}.cs new file mode 100644 index 000000000..059efb53e --- /dev/null +++ b/src/DurableTask/AsyncPageable{T}.cs @@ -0,0 +1,43 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask; + +/// +/// A collection of values that may take multiple service requests to iterate over. +/// +/// The type of the value. +public abstract class AsyncPageable : IAsyncEnumerable + where T : notnull +{ + // This code was adapted from Azure SDK AsyncPageable. + + /// + /// Enumerate the values a at a time. + /// + /// + /// A continuation token indicating where to resume paging or null to begin paging from the + /// beginning. + /// + /// + /// The number of items per that should be requested + /// (from service operations that support it). It's not guaranteed that the value will be + /// respected. + /// + /// An async enumerable of pages. + public abstract IAsyncEnumerable> AsPages( + string? continuationToken = default, int? pageSizeHint = default); + + /// + public async IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) + { + await foreach (Pageable.Page page in this.AsPages() + .ConfigureAwait(false).WithCancellation(cancellationToken)) + { + foreach (T value in page) + { + yield return value; + } + } + } +} \ No newline at end of file diff --git a/src/DurableTask/Pageable.cs b/src/DurableTask/Pageable.cs new file mode 100644 index 000000000..442aaa5c5 --- /dev/null +++ b/src/DurableTask/Pageable.cs @@ -0,0 +1,118 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System.Collections; +using System.Runtime.CompilerServices; + +namespace Microsoft.DurableTask; + +/// +/// Pageable helpers. +/// +public static class Pageable +{ + // This code was adapted from Azure SDK AsyncPageable. + // TODO: Add Pageable (non-async) when/if it becomes relevant. + + /// + /// Creates an async pageable from a callback function . + /// + /// The type of the value. + /// The callback to fetch additional pages. + /// + /// + public static AsyncPageable Create(Func>> pageFunc) + where T : notnull + { + if (pageFunc is null) + { + throw new ArgumentNullException(nameof(pageFunc)); + } + + return Create((continuation, size, cancellation) => pageFunc(continuation, cancellation)); + } + + /// + /// Creates an async pageable from a callback function . + /// + /// The type of the value. + /// The callback to fetch additional pages. + /// + /// + public static AsyncPageable Create(Func>> pageFunc) + where T : notnull + { + if (pageFunc is null) + { + throw new ArgumentNullException(nameof(pageFunc)); + } + + return new FuncAsyncPageable(pageFunc); + } + + /// + /// Represents a single page of results. + /// + public sealed class Page : IReadOnlyList + where T : notnull + { + readonly IReadOnlyList values; + + /// + /// Initializes a new instance of the class. + /// + /// The values this holds. + /// The continuation token. + public Page(IReadOnlyList values, string? continuationToken = null) + { + this.values = values; + this.ContinuationToken = continuationToken; + } + + /// + public T this[int index] => this.values[index]; + + /// + /// Gets the continuation token or null if there are no more items. + /// + public string? ContinuationToken { get; } + + /// + public int Count => this.values.Count; + + /// + public IEnumerator GetEnumerator() => this.values.GetEnumerator(); + + IEnumerator IEnumerable.GetEnumerator() => this.GetEnumerator(); + } + + class FuncAsyncPageable : AsyncPageable + where T : notnull + { + readonly Func>> pageFunc; + + public FuncAsyncPageable(Func>> pageFunc) + { + this.pageFunc = pageFunc; + } + + public override IAsyncEnumerable> AsPages( + string? continuationToken = default, int? pageSizeHint = default) + => this.AsPagesCore(continuationToken, pageSizeHint); + + async IAsyncEnumerable> AsPagesCore( + string? continuationToken = default, + int? pageSizeHint = default, + [EnumeratorCancellation] CancellationToken cancellation = default) + { + do + { + Page page = await this.pageFunc(continuationToken, pageSizeHint, cancellation) + .ConfigureAwait(false); + yield return page; + continuationToken = page.ContinuationToken; + } + while (continuationToken is not null); + } + } +} \ No newline at end of file From aa171ac55fe21abd5ae0ca4822867bc1bc6b9579 Mon Sep 17 00:00:00 2001 From: Jacob Viau Date: Thu, 25 Aug 2022 15:29:29 -0700 Subject: [PATCH 02/18] Add GetInstances method to DurableTaskClient --- src/DurableTask/DurableTaskClient.cs | 7 ++- src/DurableTask/Grpc/DurableTaskGrpcClient.cs | 54 +++++++++++++++++-- src/DurableTask/OrchestrationMetadata.cs | 20 +++---- src/DurableTask/OrchestrationQuery.cs | 31 +++++++++++ 4 files changed, 98 insertions(+), 14 deletions(-) create mode 100644 src/DurableTask/OrchestrationQuery.cs diff --git a/src/DurableTask/DurableTaskClient.cs b/src/DurableTask/DurableTaskClient.cs index e8a61884d..4b6634ad4 100644 --- a/src/DurableTask/DurableTaskClient.cs +++ b/src/DurableTask/DurableTaskClient.cs @@ -191,7 +191,12 @@ public abstract Task WaitForInstanceCompletionAsync( /// public abstract Task GetInstanceMetadataAsync(string instanceId, bool getInputsAndOutputs); - // TODO: Multi-instance query + /// + /// Queries orchestration instances + /// + /// Filters down the instances included in the query. + /// + public abstract AsyncPageable GetInstances(OrchestrationQuery? filter = null); /// /// Purges orchestration instance metadata from the durable store. diff --git a/src/DurableTask/Grpc/DurableTaskGrpcClient.cs b/src/DurableTask/Grpc/DurableTaskGrpcClient.cs index 4dce68cca..8addea064 100644 --- a/src/DurableTask/Grpc/DurableTaskGrpcClient.cs +++ b/src/DurableTask/Grpc/DurableTaskGrpcClient.cs @@ -185,7 +185,55 @@ await this.sidecarClient.TerminateInstanceAsync(new P.TerminateRequest return null; } - return new OrchestrationMetadata(response, this.dataConverter, getInputsAndOutputs); + return new OrchestrationMetadata(response.OrchestrationState, this.dataConverter, getInputsAndOutputs); + } + + /// + public override AsyncPageable GetInstances(OrchestrationQuery? query = null) + { + return Pageable.Create(async (continuation, pageSize, cancellation) => + { + P.QueryInstancesRequest request = new() + { + Query = new P.InstanceQuery + { + CreatedTimeFrom = query?.CreatedFrom?.ToTimestamp(), + CreatedTimeTo = query?.CreatedTo?.ToTimestamp(), + FetchInputsAndOutputs = query?.FetchInputsAndOutputs ?? false, + InstanceIdPrefix = query?.InstanceIdPrefix, + MaxInstanceCount = pageSize ?? query?.PageSize ?? OrchestrationQuery.DefaultPageSize, + ContinuationToken = continuation ?? query?.ContinuationToken, + }, + }; + + if (query?.Statuses is not null) + { + request.Query.RuntimeStatus.AddRange(query.Statuses.Select(x => x.ToGrpcStatus())); + } + + if (query?.TaskHubNames is not null) + { + request.Query.TaskHubNames.AddRange(query.TaskHubNames); + } + + try + { + + P.QueryInstancesResponse response = await this.sidecarClient.QueryInstancesAsync( + request, cancellationToken: cancellation); + + bool getInputsAndOutputs = query?.FetchInputsAndOutputs ?? false; + IReadOnlyList values = response.OrchestrationState + .Select(x => new OrchestrationMetadata(x, this.dataConverter, getInputsAndOutputs)) + .ToList(); + + return new Pageable.Page(values, response.ContinuationToken); + } + catch (RpcException e) when (e.StatusCode == StatusCode.Cancelled) + { + throw new OperationCanceledException($"The {nameof(GetInstances)} operation was canceled.", e, cancellation); + } + }); } /// @@ -214,7 +262,7 @@ public override async Task WaitForInstanceStartAsync( throw new OperationCanceledException($"The {nameof(WaitForInstanceStartAsync)} operation was canceled.", e, cancellationToken); } - return new OrchestrationMetadata(response, this.dataConverter, getInputsAndOutputs); + return new OrchestrationMetadata(response.OrchestrationState, this.dataConverter, getInputsAndOutputs); } /// @@ -243,7 +291,7 @@ public override async Task WaitForInstanceCompletionAsync throw new OperationCanceledException($"The {nameof(WaitForInstanceCompletionAsync)} operation was canceled.", e, cancellationToken); } - return new OrchestrationMetadata(response, this.dataConverter, getInputsAndOutputs); + return new OrchestrationMetadata(response.OrchestrationState, this.dataConverter, getInputsAndOutputs); } /// diff --git a/src/DurableTask/OrchestrationMetadata.cs b/src/DurableTask/OrchestrationMetadata.cs index b24c8a66a..3460e6bb9 100644 --- a/src/DurableTask/OrchestrationMetadata.cs +++ b/src/DurableTask/OrchestrationMetadata.cs @@ -23,19 +23,19 @@ public sealed class OrchestrationMetadata readonly bool requestedInputsAndOutputs; internal OrchestrationMetadata( - P.GetInstanceResponse response, + P.OrchestrationState state, DataConverter dataConverter, bool requestedInputsAndOutputs) { - this.Name = response.OrchestrationState.Name; - this.InstanceId = response.OrchestrationState.InstanceId; - this.RuntimeStatus = (OrchestrationRuntimeStatus)response.OrchestrationState.OrchestrationStatus; - this.CreatedAt = response.OrchestrationState.CreatedTimestamp.ToDateTimeOffset(); - this.LastUpdatedAt = response.OrchestrationState.LastUpdatedTimestamp.ToDateTimeOffset(); - this.SerializedInput = response.OrchestrationState.Input; - this.SerializedOutput = response.OrchestrationState.Output; - this.SerializedCustomStatus = response.OrchestrationState.CustomStatus; - this.FailureDetails = ProtoUtils.ConvertTaskFailureDetails(response.OrchestrationState?.FailureDetails); + this.Name = state.Name; + this.InstanceId = state.InstanceId; + this.RuntimeStatus = (OrchestrationRuntimeStatus)state.OrchestrationStatus; + this.CreatedAt = state.CreatedTimestamp.ToDateTimeOffset(); + this.LastUpdatedAt = state.LastUpdatedTimestamp.ToDateTimeOffset(); + this.SerializedInput = state.Input; + this.SerializedOutput = state.Output; + this.SerializedCustomStatus = state.CustomStatus; + this.FailureDetails = ProtoUtils.ConvertTaskFailureDetails(state?.FailureDetails); this.dataConverter = dataConverter; this.requestedInputsAndOutputs = requestedInputsAndOutputs; } diff --git a/src/DurableTask/OrchestrationQuery.cs b/src/DurableTask/OrchestrationQuery.cs new file mode 100644 index 000000000..f0264fb4f --- /dev/null +++ b/src/DurableTask/OrchestrationQuery.cs @@ -0,0 +1,31 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask; + +/// +/// A filter for querying orchestration instances. +/// +/// Creation date of instances to query from. +/// Creation date of instances to query to. +/// Runtime statuses of instances to query. +/// Names of task hubs to query across. +/// Prefix of instance IDs to include. +/// Max item count to include per page. +/// Whether to include instance inputs or outputs in the query results. +/// The continuation token to continue a paged query. +public record OrchestrationQuery( + DateTimeOffset? CreatedFrom = null, + DateTimeOffset? CreatedTo = null, + IEnumerable? Statuses = null, + IEnumerable? TaskHubNames = null, + string? InstanceIdPrefix = null, + int PageSize = OrchestrationQuery.DefaultPageSize, + bool FetchInputsAndOutputs = false, + string? ContinuationToken = null) +{ + /// + /// The default page size when not supplied. + /// + public const int DefaultPageSize = 100; +} \ No newline at end of file From c68d6f36cb825f5f381430eb5490396845a14279 Mon Sep 17 00:00:00 2001 From: Jacob Viau Date: Mon, 29 Aug 2022 11:58:56 -0700 Subject: [PATCH 03/18] Add newlines at end of files --- src/DurableTask/AsyncPageable{T}.cs | 2 +- src/DurableTask/OrchestrationQuery.cs | 2 +- src/DurableTask/Pageable.cs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/DurableTask/AsyncPageable{T}.cs b/src/DurableTask/AsyncPageable{T}.cs index 059efb53e..cf3ae2db4 100644 --- a/src/DurableTask/AsyncPageable{T}.cs +++ b/src/DurableTask/AsyncPageable{T}.cs @@ -40,4 +40,4 @@ public async IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellati } } } -} \ No newline at end of file +} diff --git a/src/DurableTask/OrchestrationQuery.cs b/src/DurableTask/OrchestrationQuery.cs index f0264fb4f..51001a2d6 100644 --- a/src/DurableTask/OrchestrationQuery.cs +++ b/src/DurableTask/OrchestrationQuery.cs @@ -28,4 +28,4 @@ public record OrchestrationQuery( /// The default page size when not supplied. /// public const int DefaultPageSize = 100; -} \ No newline at end of file +} diff --git a/src/DurableTask/Pageable.cs b/src/DurableTask/Pageable.cs index 442aaa5c5..69f10c259 100644 --- a/src/DurableTask/Pageable.cs +++ b/src/DurableTask/Pageable.cs @@ -115,4 +115,4 @@ async IAsyncEnumerable> AsPagesCore( while (continuationToken is not null); } } -} \ No newline at end of file +} From d0997ae267629465978454de8f8d177515a5de4d Mon Sep 17 00:00:00 2001 From: Jacob Viau Date: Wed, 7 Sep 2022 11:32:28 -0700 Subject: [PATCH 04/18] Make Pageable helpers internal --- src/DurableTask/AsyncPageable{T}.cs | 4 +- src/DurableTask/Grpc/DurableTaskGrpcClient.cs | 2 +- src/DurableTask/Page.cs | 42 +++++++++++++++++++ src/DurableTask/Pageable.cs | 39 +---------------- 4 files changed, 46 insertions(+), 41 deletions(-) create mode 100644 src/DurableTask/Page.cs diff --git a/src/DurableTask/AsyncPageable{T}.cs b/src/DurableTask/AsyncPageable{T}.cs index cf3ae2db4..b514ea462 100644 --- a/src/DurableTask/AsyncPageable{T}.cs +++ b/src/DurableTask/AsyncPageable{T}.cs @@ -25,13 +25,13 @@ public abstract class AsyncPageable : IAsyncEnumerable /// respected. /// /// An async enumerable of pages. - public abstract IAsyncEnumerable> AsPages( + public abstract IAsyncEnumerable> AsPages( string? continuationToken = default, int? pageSizeHint = default); /// public async IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) { - await foreach (Pageable.Page page in this.AsPages() + await foreach (Page page in this.AsPages() .ConfigureAwait(false).WithCancellation(cancellationToken)) { foreach (T value in page) diff --git a/src/DurableTask/Grpc/DurableTaskGrpcClient.cs b/src/DurableTask/Grpc/DurableTaskGrpcClient.cs index 8addea064..76720a07b 100644 --- a/src/DurableTask/Grpc/DurableTaskGrpcClient.cs +++ b/src/DurableTask/Grpc/DurableTaskGrpcClient.cs @@ -227,7 +227,7 @@ public override AsyncPageable GetInstances(OrchestrationQ .Select(x => new OrchestrationMetadata(x, this.dataConverter, getInputsAndOutputs)) .ToList(); - return new Pageable.Page(values, response.ContinuationToken); + return new Page(values, response.ContinuationToken); } catch (RpcException e) when (e.StatusCode == StatusCode.Cancelled) { diff --git a/src/DurableTask/Page.cs b/src/DurableTask/Page.cs new file mode 100644 index 000000000..9dfeb8a7b --- /dev/null +++ b/src/DurableTask/Page.cs @@ -0,0 +1,42 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System.Collections; + +namespace Microsoft.DurableTask; + +/// +/// Represents a single page of results. +/// +public sealed class Page : IReadOnlyList + where T : notnull +{ + readonly IReadOnlyList values; + + /// + /// Initializes a new instance of the class. + /// + /// The values this holds. + /// The continuation token. + public Page(IReadOnlyList values, string? continuationToken = null) + { + this.values = values ?? throw new ArgumentNullException(nameof(values)); + this.ContinuationToken = continuationToken; + } + + /// + public T this[int index] => this.values[index]; + + /// + /// Gets the continuation token or null if there are no more items. + /// + public string? ContinuationToken { get; } + + /// + public int Count => this.values.Count; + + /// + public IEnumerator GetEnumerator() => this.values.GetEnumerator(); + + IEnumerator IEnumerable.GetEnumerator() => this.GetEnumerator(); +} diff --git a/src/DurableTask/Pageable.cs b/src/DurableTask/Pageable.cs index 69f10c259..604f6670b 100644 --- a/src/DurableTask/Pageable.cs +++ b/src/DurableTask/Pageable.cs @@ -1,7 +1,6 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. -using System.Collections; using System.Runtime.CompilerServices; namespace Microsoft.DurableTask; @@ -9,7 +8,7 @@ namespace Microsoft.DurableTask; /// /// Pageable helpers. /// -public static class Pageable +static class Pageable { // This code was adapted from Azure SDK AsyncPageable. // TODO: Add Pageable (non-async) when/if it becomes relevant. @@ -50,42 +49,6 @@ public static AsyncPageable Create(Func(pageFunc); } - /// - /// Represents a single page of results. - /// - public sealed class Page : IReadOnlyList - where T : notnull - { - readonly IReadOnlyList values; - - /// - /// Initializes a new instance of the class. - /// - /// The values this holds. - /// The continuation token. - public Page(IReadOnlyList values, string? continuationToken = null) - { - this.values = values; - this.ContinuationToken = continuationToken; - } - - /// - public T this[int index] => this.values[index]; - - /// - /// Gets the continuation token or null if there are no more items. - /// - public string? ContinuationToken { get; } - - /// - public int Count => this.values.Count; - - /// - public IEnumerator GetEnumerator() => this.values.GetEnumerator(); - - IEnumerator IEnumerable.GetEnumerator() => this.GetEnumerator(); - } - class FuncAsyncPageable : AsyncPageable where T : notnull { From 26e1b03ea67278ddcfec3670d92319da891a9d00 Mon Sep 17 00:00:00 2001 From: Jacob Viau Date: Wed, 7 Sep 2022 11:33:22 -0700 Subject: [PATCH 05/18] Page.cs -> Page{T}.cs --- src/DurableTask/AsyncPageable{T}.cs | 4 ++-- src/DurableTask/{Page.cs => Page{T}.cs} | 0 2 files changed, 2 insertions(+), 2 deletions(-) rename src/DurableTask/{Page.cs => Page{T}.cs} (100%) diff --git a/src/DurableTask/AsyncPageable{T}.cs b/src/DurableTask/AsyncPageable{T}.cs index b514ea462..444af8dfa 100644 --- a/src/DurableTask/AsyncPageable{T}.cs +++ b/src/DurableTask/AsyncPageable{T}.cs @@ -13,14 +13,14 @@ public abstract class AsyncPageable : IAsyncEnumerable // This code was adapted from Azure SDK AsyncPageable. /// - /// Enumerate the values a at a time. + /// Enumerate the values a at a time. /// /// /// A continuation token indicating where to resume paging or null to begin paging from the /// beginning. /// /// - /// The number of items per that should be requested + /// The number of items per that should be requested /// (from service operations that support it). It's not guaranteed that the value will be /// respected. /// diff --git a/src/DurableTask/Page.cs b/src/DurableTask/Page{T}.cs similarity index 100% rename from src/DurableTask/Page.cs rename to src/DurableTask/Page{T}.cs From 948bf8767a265d7cf47e1a182afb422548284a48 Mon Sep 17 00:00:00 2001 From: Jacob Viau Date: Wed, 7 Sep 2022 14:33:22 -0700 Subject: [PATCH 06/18] Clarify code reference comments, add return comment --- src/DurableTask/AsyncPageable{T}.cs | 1 + src/DurableTask/DurableTaskClient.cs | 6 +++--- src/DurableTask/Pageable.cs | 3 ++- src/DurableTask/Page{T}.cs | 3 +++ 4 files changed, 9 insertions(+), 4 deletions(-) diff --git a/src/DurableTask/AsyncPageable{T}.cs b/src/DurableTask/AsyncPageable{T}.cs index 444af8dfa..f7b55a800 100644 --- a/src/DurableTask/AsyncPageable{T}.cs +++ b/src/DurableTask/AsyncPageable{T}.cs @@ -11,6 +11,7 @@ public abstract class AsyncPageable : IAsyncEnumerable where T : notnull { // This code was adapted from Azure SDK AsyncPageable. + // https://github.com/Azure/azure-sdk-for-net/blob/e811f016a3655e4b29a23c71f84d59f34fe01233/sdk/core/Azure.Core/src/AsyncPageable.cs /// /// Enumerate the values a at a time. diff --git a/src/DurableTask/DurableTaskClient.cs b/src/DurableTask/DurableTaskClient.cs index 4b6634ad4..5efcc9e3e 100644 --- a/src/DurableTask/DurableTaskClient.cs +++ b/src/DurableTask/DurableTaskClient.cs @@ -194,9 +194,9 @@ public abstract Task WaitForInstanceCompletionAsync( /// /// Queries orchestration instances /// - /// Filters down the instances included in the query. - /// - public abstract AsyncPageable GetInstances(OrchestrationQuery? filter = null); + /// Filters down the instances included in the query. + /// An async pageable of the query results. + public abstract AsyncPageable GetInstances(OrchestrationQuery? query = null); /// /// Purges orchestration instance metadata from the durable store. diff --git a/src/DurableTask/Pageable.cs b/src/DurableTask/Pageable.cs index 604f6670b..eb8d58eba 100644 --- a/src/DurableTask/Pageable.cs +++ b/src/DurableTask/Pageable.cs @@ -10,7 +10,8 @@ namespace Microsoft.DurableTask; /// static class Pageable { - // This code was adapted from Azure SDK AsyncPageable. + // This code was adapted from Azure SDK PageResponseEnumerator. + // https://github.com/Azure/azure-sdk-for-net/blob/e811f016a3655e4b29a23c71f84d59f34fe01233/sdk/core/Azure.Core/src/Shared/PageResponseEnumerator.cs // TODO: Add Pageable (non-async) when/if it becomes relevant. /// diff --git a/src/DurableTask/Page{T}.cs b/src/DurableTask/Page{T}.cs index 9dfeb8a7b..52e588221 100644 --- a/src/DurableTask/Page{T}.cs +++ b/src/DurableTask/Page{T}.cs @@ -11,6 +11,9 @@ namespace Microsoft.DurableTask; public sealed class Page : IReadOnlyList where T : notnull { + // This code was adopted from Azure SDK Page. + // https://github.com/Azure/azure-sdk-for-net/blob/e811f016a3655e4b29a23c71f84d59f34fe01233/sdk/core/Azure.Core/src/Page.cs + readonly IReadOnlyList values; /// From b9407f2709ec72c482e15741a7f7ded011ea813e Mon Sep 17 00:00:00 2001 From: Jacob Viau Date: Thu, 8 Sep 2022 14:49:10 -0700 Subject: [PATCH 07/18] Add tests, msbuild updates, remove ConfigureAwait --- Directory.Build.props | 17 ++++ Directory.Build.targets | 5 ++ eng/InternalsVisibleTo.targets | 45 +++++++++++ key.snk => eng/key.snk | Bin src/Directory.Build.props | 9 +-- src/DurableTask/AsyncPageable{T}.cs | 6 +- src/DurableTask/Microsoft.DurableTask.csproj | 4 + src/DurableTask/Pageable.cs | 11 ++- .../GrpcSidecarFixture.cs | 1 - .../IntegrationTestBase.cs | 3 - .../DurableTask.Sdk.Tests/Logging/LogEntry.cs | 1 - .../Logging/TestLogProvider.cs | 3 - .../Microsoft.DurableTask.Tests.csproj | 2 + .../OrchestrationErrorHandling.cs | 2 - .../OrchestrationPatterns.cs | 5 -- .../PageableIntegrationTests.cs | 67 +++++++++++++++ test/DurableTask.Sdk.Tests/PageableTests.cs | 76 ++++++++++++++++++ 17 files changed, 228 insertions(+), 29 deletions(-) create mode 100644 Directory.Build.props create mode 100644 Directory.Build.targets create mode 100644 eng/InternalsVisibleTo.targets rename key.snk => eng/key.snk (100%) create mode 100644 test/DurableTask.Sdk.Tests/PageableIntegrationTests.cs create mode 100644 test/DurableTask.Sdk.Tests/PageableTests.cs diff --git a/Directory.Build.props b/Directory.Build.props new file mode 100644 index 000000000..ebb4173d3 --- /dev/null +++ b/Directory.Build.props @@ -0,0 +1,17 @@ + + + + + 10.0 + enable + enable + + + + + true + $(MSBuildThisFileDirectory)eng/key.snk + 00240000048000009400000006020000002400005253413100040000010001000505410141442095e0b0466df68ac32d158abdf0bd9cf26407d7a9d20b93656530556f23979f20b067628b9baee75fc6bba5c349519585a2852c1843bc61d74c4cbf9d80429cbdcbf609ea70fd62061eb65f40a6c9d505dd71cb119ef51f589a014d3b56159abbc38825fafbd119b6e97ebd9a5f3862a5c06220c680a6ac9eec + + + \ No newline at end of file diff --git a/Directory.Build.targets b/Directory.Build.targets new file mode 100644 index 000000000..771f5c6c5 --- /dev/null +++ b/Directory.Build.targets @@ -0,0 +1,5 @@ + + + + + \ No newline at end of file diff --git a/eng/InternalsVisibleTo.targets b/eng/InternalsVisibleTo.targets new file mode 100644 index 000000000..79e1f5d7f --- /dev/null +++ b/eng/InternalsVisibleTo.targets @@ -0,0 +1,45 @@ + + + + + $(IntermediateOutputPath)$(MSBuildProjectName).InternalsVisibleTo$(DefaultLanguageSourceExtension) + + + + + false + + + + + + + <_InternalsVisibleToAttribute Include="System.Runtime.CompilerServices.InternalsVisibleToAttribute"> + <_Parameter1 Condition="'%(InternalsVisibleTo.Key)' != ''">%(InternalsVisibleTo.Identity), PublicKey=%(InternalsVisibleTo.Key) + <_Parameter1 Condition="'%(InternalsVisibleTo.Key)' == '' and '$(PublicKey)' != ''">%(InternalsVisibleTo.Identity), PublicKey=$(PublicKey) + <_Parameter1 Condition="'%(InternalsVisibleTo.Key)' == '' and '$(PublicKey)' == ''">%(InternalsVisibleTo.Identity) + + + + + + + + + + + + + + + + + diff --git a/key.snk b/eng/key.snk similarity index 100% rename from key.snk rename to eng/key.snk diff --git a/src/Directory.Build.props b/src/Directory.Build.props index 5856601f9..2cfbfd2ad 100644 --- a/src/Directory.Build.props +++ b/src/Directory.Build.props @@ -1,12 +1,10 @@ + + - 10.0 - enable - enable - true - ../../key.snk embedded Microsoft Corporation true @@ -30,4 +28,5 @@ content/SBOM + \ No newline at end of file diff --git a/src/DurableTask/AsyncPageable{T}.cs b/src/DurableTask/AsyncPageable{T}.cs index f7b55a800..054c77f3e 100644 --- a/src/DurableTask/AsyncPageable{T}.cs +++ b/src/DurableTask/AsyncPageable{T}.cs @@ -14,7 +14,7 @@ public abstract class AsyncPageable : IAsyncEnumerable // https://github.com/Azure/azure-sdk-for-net/blob/e811f016a3655e4b29a23c71f84d59f34fe01233/sdk/core/Azure.Core/src/AsyncPageable.cs /// - /// Enumerate the values a at a time. + /// Enumerate the values a at a time. /// /// /// A continuation token indicating where to resume paging or null to begin paging from the @@ -32,8 +32,8 @@ public abstract IAsyncEnumerable> AsPages( /// public async IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) { - await foreach (Page page in this.AsPages() - .ConfigureAwait(false).WithCancellation(cancellationToken)) + // TODO: ConfigureAwait(false)? This may cause issues when used in an orchestration. + await foreach (Page page in this.AsPages().WithCancellation(cancellationToken)) { foreach (T value in page) { diff --git a/src/DurableTask/Microsoft.DurableTask.csproj b/src/DurableTask/Microsoft.DurableTask.csproj index cfdea4f9e..8d3251c6c 100644 --- a/src/DurableTask/Microsoft.DurableTask.csproj +++ b/src/DurableTask/Microsoft.DurableTask.csproj @@ -46,4 +46,8 @@ + + + + diff --git a/src/DurableTask/Pageable.cs b/src/DurableTask/Pageable.cs index eb8d58eba..1cd4b0be4 100644 --- a/src/DurableTask/Pageable.cs +++ b/src/DurableTask/Pageable.cs @@ -19,8 +19,7 @@ static class Pageable /// /// The type of the value. /// The callback to fetch additional pages. - /// - /// + /// An async pageable. public static AsyncPageable Create(Func>> pageFunc) where T : notnull { @@ -37,8 +36,7 @@ public static AsyncPageable Create(Func /// The type of the value. /// The callback to fetch additional pages. - /// - /// + /// An async pageable. public static AsyncPageable Create(Func>> pageFunc) where T : notnull { @@ -71,8 +69,9 @@ async IAsyncEnumerable> AsPagesCore( { do { - Page page = await this.pageFunc(continuationToken, pageSizeHint, cancellation) - .ConfigureAwait(false); + // TODO: Do we need to support customizing ConfigureAwait(bool) here? + // ConfigureAwait(false) makes this unusable in orchestrations. + Page page = await this.pageFunc(continuationToken, pageSizeHint, cancellation); yield return page; continuationToken = page.ContinuationToken; } diff --git a/test/DurableTask.Sdk.Tests/GrpcSidecarFixture.cs b/test/DurableTask.Sdk.Tests/GrpcSidecarFixture.cs index ebb36e7e1..83cc283e5 100644 --- a/test/DurableTask.Sdk.Tests/GrpcSidecarFixture.cs +++ b/test/DurableTask.Sdk.Tests/GrpcSidecarFixture.cs @@ -1,7 +1,6 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. -using System; using DurableTask.Core; using Grpc.Core; using Microsoft.AspNetCore.Builder; diff --git a/test/DurableTask.Sdk.Tests/IntegrationTestBase.cs b/test/DurableTask.Sdk.Tests/IntegrationTestBase.cs index 540f91867..901211b3f 100644 --- a/test/DurableTask.Sdk.Tests/IntegrationTestBase.cs +++ b/test/DurableTask.Sdk.Tests/IntegrationTestBase.cs @@ -1,10 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. -using System; -using System.Collections.Generic; using System.Diagnostics; -using System.Threading; using Microsoft.DurableTask.Grpc; using Microsoft.DurableTask.Tests.Logging; using Microsoft.Extensions.Logging; diff --git a/test/DurableTask.Sdk.Tests/Logging/LogEntry.cs b/test/DurableTask.Sdk.Tests/Logging/LogEntry.cs index dc338f01e..e0ca5760b 100644 --- a/test/DurableTask.Sdk.Tests/Logging/LogEntry.cs +++ b/test/DurableTask.Sdk.Tests/Logging/LogEntry.cs @@ -1,7 +1,6 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. -using System; using Microsoft.Extensions.Logging; namespace Microsoft.DurableTask.Tests.Logging; diff --git a/test/DurableTask.Sdk.Tests/Logging/TestLogProvider.cs b/test/DurableTask.Sdk.Tests/Logging/TestLogProvider.cs index ef318142f..287c68179 100644 --- a/test/DurableTask.Sdk.Tests/Logging/TestLogProvider.cs +++ b/test/DurableTask.Sdk.Tests/Logging/TestLogProvider.cs @@ -1,10 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. -using System; using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Linq; using Microsoft.Extensions.Logging; using Xunit.Abstractions; diff --git a/test/DurableTask.Sdk.Tests/Microsoft.DurableTask.Tests.csproj b/test/DurableTask.Sdk.Tests/Microsoft.DurableTask.Tests.csproj index 07586f61c..c99fe6c4e 100644 --- a/test/DurableTask.Sdk.Tests/Microsoft.DurableTask.Tests.csproj +++ b/test/DurableTask.Sdk.Tests/Microsoft.DurableTask.Tests.csproj @@ -8,8 +8,10 @@ + + runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/test/DurableTask.Sdk.Tests/OrchestrationErrorHandling.cs b/test/DurableTask.Sdk.Tests/OrchestrationErrorHandling.cs index 6772ca550..a7aa2e5ae 100644 --- a/test/DurableTask.Sdk.Tests/OrchestrationErrorHandling.cs +++ b/test/DurableTask.Sdk.Tests/OrchestrationErrorHandling.cs @@ -1,8 +1,6 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. -using System; -using System.Threading.Tasks; using Microsoft.DurableTask.Grpc; using Xunit; using Xunit.Abstractions; diff --git a/test/DurableTask.Sdk.Tests/OrchestrationPatterns.cs b/test/DurableTask.Sdk.Tests/OrchestrationPatterns.cs index b3998493d..f9d256c78 100644 --- a/test/DurableTask.Sdk.Tests/OrchestrationPatterns.cs +++ b/test/DurableTask.Sdk.Tests/OrchestrationPatterns.cs @@ -1,12 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. -using System; -using System.Collections.Generic; -using System.Linq; using System.Text.Json; -using System.Threading; -using System.Threading.Tasks; using Microsoft.DurableTask.Grpc; using Microsoft.DurableTask.Options; using Microsoft.DurableTask.Tests.Logging; diff --git a/test/DurableTask.Sdk.Tests/PageableIntegrationTests.cs b/test/DurableTask.Sdk.Tests/PageableIntegrationTests.cs new file mode 100644 index 000000000..355f7f4c9 --- /dev/null +++ b/test/DurableTask.Sdk.Tests/PageableIntegrationTests.cs @@ -0,0 +1,67 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using FluentAssertions; +using Microsoft.DurableTask.Grpc; +using Xunit; +using Xunit.Abstractions; + +namespace Microsoft.DurableTask.Tests; + +public class PageableIntegrationTests : IntegrationTestBase +{ + public PageableIntegrationTests(ITestOutputHelper output, GrpcSidecarFixture sidecarFixture) + : base(output, sidecarFixture) + { + } + + [Fact] + public async Task PageableActivity_Enumerates() + { + TaskName orchestratorName = nameof(PageableActivity_Enumerates); + + await using DurableTaskGrpcWorker server = this.CreateWorkerBuilder() + .AddTasks(tasks => tasks + .AddOrchestrator(orchestratorName, (ctx, input) => PageableOrchestrationAsync(ctx, input)) + .AddActivity>(nameof(PageableActivityAsync), (_, input) => PageableActivityAsync(input))) + .Build(); + await server.StartAsync(this.TimeoutToken); + + DurableTaskClient client = this.CreateDurableTaskClient(); + string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(orchestratorName, input: string.Empty); + OrchestrationMetadata metadata = await client.WaitForInstanceCompletionAsync( + instanceId, + this.TimeoutToken, + getInputsAndOutputs: true); + + metadata.ReadOutputAs().Should().Be(9); + } + + static Task?> PageableActivityAsync(PageRequest? input) + { + int pageSize = input?.PageSize ?? 3; + Page CreatePage(string? next) + => new (Enumerable.Range(0, pageSize).Select(x => $"item_{x}").ToList(), next); + Page? page = input?.Continuation switch + { + null => CreatePage("1"), + "1" => CreatePage("2"), + "2" => CreatePage(null), + _ => null, + }; + + return Task.FromResult(page); + } + + static async Task PageableOrchestrationAsync(TaskOrchestrationContext context, string? input) + { + AsyncPageable pageable = Pageable.Create((continuation, _, _) => + { + return context.CallActivityAsync?>(nameof(PageableActivityAsync), new PageRequest(continuation))!; + }); + + return await pageable.CountAsync(); + } + + record PageRequest(string? Continuation, int? PageSize = null); +} diff --git a/test/DurableTask.Sdk.Tests/PageableTests.cs b/test/DurableTask.Sdk.Tests/PageableTests.cs new file mode 100644 index 000000000..3b09d281b --- /dev/null +++ b/test/DurableTask.Sdk.Tests/PageableTests.cs @@ -0,0 +1,76 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using FluentAssertions; +using Xunit; + +namespace Microsoft.DurableTask.Tests; + +public class PageableTests +{ + [Fact] + public void Create_Func_Null_Throws() + { + Func>>? func = null; + Action act = () => Pageable.Create(func!); + act.Should().ThrowExactly(); + } + + [Fact] + public async Task Create_Func_Enumerable() + { + CallbackCounter counter = new(); + List expected = CreateSource(15); + AsyncPageable pageable = CreatePageable(expected, counter); + + List actual = await pageable.ToListAsync(); + actual.Should().BeEquivalentTo(expected); + counter.Callbacks.Should().Be(5); // 15 / 3 = 5. + } + + [Theory] + [InlineData(null, null)] + [InlineData(0, 2)] + [InlineData(4, 2)] + [InlineData(5, 3)] + [InlineData(null, 15)] + public async Task Create_Func_AsPages(int? start, int? pageSize) + { + CallbackCounter counter = new(); + List source = CreateSource(15); + AsyncPageable pageable = CreatePageable(source, counter); + List expected = source.Skip(start ?? 0).ToList(); + + List> pages = await pageable.AsPages(start?.ToString(), pageSize).ToListAsync(); + pages.Should().HaveCount((int)Math.Ceiling((15.0 - (start ?? 0)) / (pageSize ?? 3))); + pages.SelectMany(x => x).Should().BeEquivalentTo(expected); + counter.Callbacks.Should().Be(pages.Count); + } + + static List CreateSource(int count) + { + return Enumerable.Range(0, count).Select(x => $"item_{x}").ToList(); + } + + static AsyncPageable CreatePageable(List source, CallbackCounter counter) + { + Task> Callback(string? continuation, int? pageSize, CancellationToken cancellation) + { + counter.Callbacks++; + int skip = continuation is string c ? int.Parse(c) : 0; + int take = pageSize ?? 3; + IEnumerable values = source.Skip(skip).Take(take); + int total = skip + take; + string? next = total < source.Count ? total.ToString() : null; + Page page = new(values.ToList(), next); + return Task.FromResult(page); + } + + return Pageable.Create(Callback); + } + + class CallbackCounter // Mutable box for Callbacks + { + public int Callbacks { get; set; } + } +} From 724a8c148e3337491a9212dde6920718f88a3885 Mon Sep 17 00:00:00 2001 From: Jacob Viau Date: Thu, 8 Sep 2022 16:02:12 -0700 Subject: [PATCH 08/18] Remove IReadOnlyList from Page to allow for serialization --- src/DurableTask/AsyncPageable{T}.cs | 2 +- src/DurableTask/Page{T}.cs | 22 ++++++--------------- test/DurableTask.Sdk.Tests/PageableTests.cs | 2 +- 3 files changed, 8 insertions(+), 18 deletions(-) diff --git a/src/DurableTask/AsyncPageable{T}.cs b/src/DurableTask/AsyncPageable{T}.cs index 054c77f3e..bb81abbc5 100644 --- a/src/DurableTask/AsyncPageable{T}.cs +++ b/src/DurableTask/AsyncPageable{T}.cs @@ -35,7 +35,7 @@ public async IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellati // TODO: ConfigureAwait(false)? This may cause issues when used in an orchestration. await foreach (Page page in this.AsPages().WithCancellation(cancellationToken)) { - foreach (T value in page) + foreach (T value in page.Values) { yield return value; } diff --git a/src/DurableTask/Page{T}.cs b/src/DurableTask/Page{T}.cs index 52e588221..d2fbe05ec 100644 --- a/src/DurableTask/Page{T}.cs +++ b/src/DurableTask/Page{T}.cs @@ -1,21 +1,17 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. -using System.Collections; - namespace Microsoft.DurableTask; /// /// Represents a single page of results. /// -public sealed class Page : IReadOnlyList +public sealed class Page where T : notnull { // This code was adopted from Azure SDK Page. // https://github.com/Azure/azure-sdk-for-net/blob/e811f016a3655e4b29a23c71f84d59f34fe01233/sdk/core/Azure.Core/src/Page.cs - readonly IReadOnlyList values; - /// /// Initializes a new instance of the class. /// @@ -23,23 +19,17 @@ public sealed class Page : IReadOnlyList /// The continuation token. public Page(IReadOnlyList values, string? continuationToken = null) { - this.values = values ?? throw new ArgumentNullException(nameof(values)); + this.Values = values ?? throw new ArgumentNullException(nameof(values)); this.ContinuationToken = continuationToken; } - /// - public T this[int index] => this.values[index]; + /// + /// Gets the values contained in this page. + /// + public IReadOnlyList Values { get; } /// /// Gets the continuation token or null if there are no more items. /// public string? ContinuationToken { get; } - - /// - public int Count => this.values.Count; - - /// - public IEnumerator GetEnumerator() => this.values.GetEnumerator(); - - IEnumerator IEnumerable.GetEnumerator() => this.GetEnumerator(); } diff --git a/test/DurableTask.Sdk.Tests/PageableTests.cs b/test/DurableTask.Sdk.Tests/PageableTests.cs index 3b09d281b..8fb03645e 100644 --- a/test/DurableTask.Sdk.Tests/PageableTests.cs +++ b/test/DurableTask.Sdk.Tests/PageableTests.cs @@ -43,7 +43,7 @@ public async Task Create_Func_AsPages(int? start, int? pageSize) List> pages = await pageable.AsPages(start?.ToString(), pageSize).ToListAsync(); pages.Should().HaveCount((int)Math.Ceiling((15.0 - (start ?? 0)) / (pageSize ?? 3))); - pages.SelectMany(x => x).Should().BeEquivalentTo(expected); + pages.SelectMany(x => x.Values).Should().BeEquivalentTo(expected); counter.Callbacks.Should().Be(pages.Count); } From e84a7eae422af31bd850a4c1420eee58188e2def Mon Sep 17 00:00:00 2001 From: Jacob Viau Date: Thu, 8 Sep 2022 16:10:44 -0700 Subject: [PATCH 09/18] Fix end of file new lines --- Directory.Build.props | 2 +- Directory.Build.targets | 4 +--- src/Directory.Build.props | 2 +- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/Directory.Build.props b/Directory.Build.props index ebb4173d3..658d3c008 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -14,4 +14,4 @@ 00240000048000009400000006020000002400005253413100040000010001000505410141442095e0b0466df68ac32d158abdf0bd9cf26407d7a9d20b93656530556f23979f20b067628b9baee75fc6bba5c349519585a2852c1843bc61d74c4cbf9d80429cbdcbf609ea70fd62061eb65f40a6c9d505dd71cb119ef51f589a014d3b56159abbc38825fafbd119b6e97ebd9a5f3862a5c06220c680a6ac9eec - \ No newline at end of file + diff --git a/Directory.Build.targets b/Directory.Build.targets index 771f5c6c5..ba88f5951 100644 --- a/Directory.Build.targets +++ b/Directory.Build.targets @@ -1,5 +1,3 @@ - - - \ No newline at end of file + diff --git a/src/Directory.Build.props b/src/Directory.Build.props index 2cfbfd2ad..3ea54447d 100644 --- a/src/Directory.Build.props +++ b/src/Directory.Build.props @@ -29,4 +29,4 @@ - \ No newline at end of file + From ab0fb463e227530f94f604da643bc8ea30f6d00c Mon Sep 17 00:00:00 2001 From: Jacob Viau Date: Fri, 9 Sep 2022 09:38:27 -0700 Subject: [PATCH 10/18] Rename strong name key property --- Directory.Build.props | 2 +- src/DurableTask/Microsoft.DurableTask.csproj | 2 +- test/DurableTask.Sdk.Tests/Microsoft.DurableTask.Tests.csproj | 4 ++++ 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/Directory.Build.props b/Directory.Build.props index 658d3c008..38c127e6c 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -11,7 +11,7 @@ true $(MSBuildThisFileDirectory)eng/key.snk - 00240000048000009400000006020000002400005253413100040000010001000505410141442095e0b0466df68ac32d158abdf0bd9cf26407d7a9d20b93656530556f23979f20b067628b9baee75fc6bba5c349519585a2852c1843bc61d74c4cbf9d80429cbdcbf609ea70fd62061eb65f40a6c9d505dd71cb119ef51f589a014d3b56159abbc38825fafbd119b6e97ebd9a5f3862a5c06220c680a6ac9eec + 00240000048000009400000006020000002400005253413100040000010001000505410141442095e0b0466df68ac32d158abdf0bd9cf26407d7a9d20b93656530556f23979f20b067628b9baee75fc6bba5c349519585a2852c1843bc61d74c4cbf9d80429cbdcbf609ea70fd62061eb65f40a6c9d505dd71cb119ef51f589a014d3b56159abbc38825fafbd119b6e97ebd9a5f3862a5c06220c680a6ac9eec diff --git a/src/DurableTask/Microsoft.DurableTask.csproj b/src/DurableTask/Microsoft.DurableTask.csproj index 8d3251c6c..d0e5fe249 100644 --- a/src/DurableTask/Microsoft.DurableTask.csproj +++ b/src/DurableTask/Microsoft.DurableTask.csproj @@ -47,7 +47,7 @@ - + diff --git a/test/DurableTask.Sdk.Tests/Microsoft.DurableTask.Tests.csproj b/test/DurableTask.Sdk.Tests/Microsoft.DurableTask.Tests.csproj index c99fe6c4e..22a600b5f 100644 --- a/test/DurableTask.Sdk.Tests/Microsoft.DurableTask.Tests.csproj +++ b/test/DurableTask.Sdk.Tests/Microsoft.DurableTask.Tests.csproj @@ -9,6 +9,10 @@ + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + From ad38f69f63cd9bb168624217d8fa65df242ff04a Mon Sep 17 00:00:00 2001 From: Jacob Viau Date: Fri, 9 Sep 2022 16:17:04 -0700 Subject: [PATCH 11/18] Add instance query end-to-end tests --- .../DurableTaskGrpcClientIntegrationTests.cs | 153 ++++++++++++++++++ 1 file changed, 153 insertions(+) create mode 100644 test/DurableTask.Sdk.Tests/Grpc/DurableTaskGrpcClientIntegrationTests.cs diff --git a/test/DurableTask.Sdk.Tests/Grpc/DurableTaskGrpcClientIntegrationTests.cs b/test/DurableTask.Sdk.Tests/Grpc/DurableTaskGrpcClientIntegrationTests.cs new file mode 100644 index 000000000..621b08ea5 --- /dev/null +++ b/test/DurableTask.Sdk.Tests/Grpc/DurableTaskGrpcClientIntegrationTests.cs @@ -0,0 +1,153 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using FluentAssertions; +using FluentAssertions.Execution; +using Microsoft.DurableTask.Tests; +using Xunit; +using Xunit.Abstractions; + +namespace Microsoft.DurableTask.Grpc.Tests; + +public class DurableTaskGrpcClientIntegrationTests : IntegrationTestBase +{ + const string OrchestrationName = "TestOrchestration"; + + public DurableTaskGrpcClientIntegrationTests(ITestOutputHelper output, GrpcSidecarFixture sidecarFixture) + : base(output, sidecarFixture) + { + } + + [Theory] + [InlineData(false)] + [InlineData(true)] + public async Task GetInstanceMetadata_EndToEnd(bool shouldThrow) + { + static void AssertMetadata(OrchestrationMetadata metadata, string instanceId, OrchestrationRuntimeStatus status) + { + metadata.Should().NotBeNull(); + using (new AssertionScope()) + { + metadata.Name.Should().Be(OrchestrationName); + metadata.InstanceId.Should().Be(instanceId); + metadata.SerializedInput.Should().BeNull(); + metadata.SerializedOutput.Should().BeNull(); + metadata.RuntimeStatus.Should().Be(status); + metadata.FailureDetails.Should().BeNull(); + } + } + + await using DurableTaskGrpcWorker server = await this.StartAsync(); + DurableTaskClient client = this.CreateDurableTaskClient(); + + string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(OrchestrationName, input: shouldThrow); + OrchestrationMetadata? metadata = await client.GetInstanceMetadataAsync(instanceId, false); + AssertMetadata(metadata!, instanceId, OrchestrationRuntimeStatus.Pending); + + await client.WaitForInstanceStartAsync(instanceId, default); + metadata = await client.GetInstanceMetadataAsync(instanceId, false); + AssertMetadata(metadata!, instanceId, OrchestrationRuntimeStatus.Running); + + await client.RaiseEventAsync(instanceId, "event", default); + await client.WaitForInstanceCompletionAsync(instanceId, default); + metadata = await client.GetInstanceMetadataAsync(instanceId, false); + AssertMetadata(metadata!, instanceId, shouldThrow ? OrchestrationRuntimeStatus.Failed : OrchestrationRuntimeStatus.Completed); + } + + [Fact] + public async Task GetInstances_EndToEnd() + { + static void AssertMetadata(OrchestrationMetadata metadata, OrchestrationRuntimeStatus status) + { + metadata.Should().NotBeNull(); + using (new AssertionScope()) + { + metadata.Name.Should().Be(OrchestrationName); + metadata.InstanceId.Should().StartWith("expected-"); + metadata.RuntimeStatus.Should().Be(status); + + // InMemoryOrchestrationService always returns these in a query. + // metadata.FailureDetails.Should().BeNull(); + // metadata.SerializedInput.Should().BeNull(); + // metadata.SerializedOutput.Should().BeNull(); + } + } + + OrchestrationQuery query = new() { InstanceIdPrefix = "GetInstances_EndToEnd" }; + + static async Task ForEachAsync(Func func) + { + await func("GetInstances_EndToEnd-1"); + await func("GetInstances_EndToEnd-2"); + } + + await using DurableTaskGrpcWorker server = await this.StartAsync(); + DurableTaskClient client = this.CreateDurableTaskClient(); + + string notIncluded = await client.ScheduleNewOrchestrationInstanceAsync(OrchestrationName, input: false); + + await ForEachAsync(x => client.ScheduleNewOrchestrationInstanceAsync(OrchestrationName, x, input: false)); + AsyncPageable pageable = client.GetInstances(query); + + List metadata = await pageable.ToListAsync(); + metadata.Should().HaveCount(2) + .And.AllSatisfy(m => AssertMetadata(m, OrchestrationRuntimeStatus.Pending)); + + await ForEachAsync(x => client.WaitForInstanceStartAsync(x, default)); + metadata = await pageable.ToListAsync(); + metadata.Should().HaveCount(2) + .And.AllSatisfy(m => AssertMetadata(m, OrchestrationRuntimeStatus.Running)); + + await ForEachAsync(x => client.RaiseEventAsync(x, "event", default)); + await ForEachAsync(x => client.WaitForInstanceCompletionAsync(x, default)); + metadata = await pageable.ToListAsync(); + metadata.Should().HaveCount(2) + .And.AllSatisfy(m => AssertMetadata(m, OrchestrationRuntimeStatus.Completed)); + } + + [Fact] + public async Task GetInstances_AsPages_EndToEnd() + { + OrchestrationQuery query = new() { InstanceIdPrefix = "GetInstances_AsPages_EndToEnd" }; + await using DurableTaskGrpcWorker server = await this.StartAsync(); + DurableTaskClient client = this.CreateDurableTaskClient(); + + for (int i = 0; i < 21; i++) + { + await client.ScheduleNewOrchestrationInstanceAsync(OrchestrationName, $"GetInstances_AsPages_EndToEnd-{i}", input: false); + } + + AsyncPageable pageable = client.GetInstances(query); + List> pages = await pageable.AsPages(pageSizeHint: 5).ToListAsync(); + pages.Should().HaveCount(5); + pages.ForEach(p => p.Values.Should().HaveCount(p.ContinuationToken is null ? 1 : 5)); + + List> resumedPages = await pageable.AsPages(pages[1].ContinuationToken, pageSizeHint: 4).ToListAsync(); + resumedPages.Should().HaveCount(3); + + List left = resumedPages.SelectMany(p => p.Values).ToList(); + List right = pages.Skip(2).SelectMany(p => p.Values).ToList(); + left.Should().BeEquivalentTo(right, cfg => cfg.Including(x => x.InstanceId).Including(x => x.CreatedAt)); + } + + async Task StartAsync() + { + static async Task Orchestration(TaskOrchestrationContext context, bool shouldThrow) + { + context.SetCustomStatus("waiting"); + await context.WaitForExternalEvent("event"); + if (shouldThrow) + { + throw new InvalidOperationException("Orchestration failed"); + } + + return $"{shouldThrow} -> output"; + } + + DurableTaskGrpcWorker server = this.CreateWorkerBuilder() + .AddTasks(tasks => tasks.AddOrchestrator(OrchestrationName, Orchestration)) + .Build(); + await server.StartAsync(this.TimeoutToken); + return server; + } +} From 9fe5363d6266d720f2b7c9c639fd3e4af99dfc41 Mon Sep 17 00:00:00 2001 From: Jacob Viau Date: Fri, 9 Sep 2022 16:22:47 -0700 Subject: [PATCH 12/18] Add FirstAsync example to tests --- .../Grpc/DurableTaskGrpcClientIntegrationTests.cs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/test/DurableTask.Sdk.Tests/Grpc/DurableTaskGrpcClientIntegrationTests.cs b/test/DurableTask.Sdk.Tests/Grpc/DurableTaskGrpcClientIntegrationTests.cs index 621b08ea5..3282c2147 100644 --- a/test/DurableTask.Sdk.Tests/Grpc/DurableTaskGrpcClientIntegrationTests.cs +++ b/test/DurableTask.Sdk.Tests/Grpc/DurableTaskGrpcClientIntegrationTests.cs @@ -128,6 +128,10 @@ public async Task GetInstances_AsPages_EndToEnd() List left = resumedPages.SelectMany(p => p.Values).ToList(); List right = pages.Skip(2).SelectMany(p => p.Values).ToList(); left.Should().BeEquivalentTo(right, cfg => cfg.Including(x => x.InstanceId).Including(x => x.CreatedAt)); + + Page page = await pageable.AsPages(pageSizeHint: 10).FirstAsync(); + page.Values.Should().HaveCount(10); + page.ContinuationToken.Should().Be("10"); } async Task StartAsync() From e3943506c0de353ee76c6cf92a0950f6daff0025 Mon Sep 17 00:00:00 2001 From: Jacob Viau Date: Mon, 12 Sep 2022 15:22:44 -0700 Subject: [PATCH 13/18] Fix test failures --- .../Grpc/DurableTaskGrpcClientIntegrationTests.cs | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/test/DurableTask.Sdk.Tests/Grpc/DurableTaskGrpcClientIntegrationTests.cs b/test/DurableTask.Sdk.Tests/Grpc/DurableTaskGrpcClientIntegrationTests.cs index 3282c2147..c79194261 100644 --- a/test/DurableTask.Sdk.Tests/Grpc/DurableTaskGrpcClientIntegrationTests.cs +++ b/test/DurableTask.Sdk.Tests/Grpc/DurableTaskGrpcClientIntegrationTests.cs @@ -41,11 +41,9 @@ static void AssertMetadata(OrchestrationMetadata metadata, string instanceId, Or DurableTaskClient client = this.CreateDurableTaskClient(); string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(OrchestrationName, input: shouldThrow); - OrchestrationMetadata? metadata = await client.GetInstanceMetadataAsync(instanceId, false); - AssertMetadata(metadata!, instanceId, OrchestrationRuntimeStatus.Pending); await client.WaitForInstanceStartAsync(instanceId, default); - metadata = await client.GetInstanceMetadataAsync(instanceId, false); + OrchestrationMetadata? metadata = await client.GetInstanceMetadataAsync(instanceId, false); AssertMetadata(metadata!, instanceId, OrchestrationRuntimeStatus.Running); await client.RaiseEventAsync(instanceId, "event", default); @@ -63,7 +61,7 @@ static void AssertMetadata(OrchestrationMetadata metadata, OrchestrationRuntimeS using (new AssertionScope()) { metadata.Name.Should().Be(OrchestrationName); - metadata.InstanceId.Should().StartWith("expected-"); + metadata.InstanceId.Should().StartWith("GetInstances_EndToEnd-"); metadata.RuntimeStatus.Should().Be(status); // InMemoryOrchestrationService always returns these in a query. @@ -89,12 +87,8 @@ static async Task ForEachAsync(Func func) await ForEachAsync(x => client.ScheduleNewOrchestrationInstanceAsync(OrchestrationName, x, input: false)); AsyncPageable pageable = client.GetInstances(query); - List metadata = await pageable.ToListAsync(); - metadata.Should().HaveCount(2) - .And.AllSatisfy(m => AssertMetadata(m, OrchestrationRuntimeStatus.Pending)); - await ForEachAsync(x => client.WaitForInstanceStartAsync(x, default)); - metadata = await pageable.ToListAsync(); + List metadata = await pageable.ToListAsync(); metadata.Should().HaveCount(2) .And.AllSatisfy(m => AssertMetadata(m, OrchestrationRuntimeStatus.Running)); From 51202962ad865cd331952c48cdb478b460089a8a Mon Sep 17 00:00:00 2001 From: Jacob Viau Date: Mon, 12 Sep 2022 15:39:02 -0700 Subject: [PATCH 14/18] Fix test failures --- .../Grpc/DurableTaskGrpcClientIntegrationTests.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/DurableTask.Sdk.Tests/Grpc/DurableTaskGrpcClientIntegrationTests.cs b/test/DurableTask.Sdk.Tests/Grpc/DurableTaskGrpcClientIntegrationTests.cs index c79194261..807cd7594 100644 --- a/test/DurableTask.Sdk.Tests/Grpc/DurableTaskGrpcClientIntegrationTests.cs +++ b/test/DurableTask.Sdk.Tests/Grpc/DurableTaskGrpcClientIntegrationTests.cs @@ -125,7 +125,7 @@ public async Task GetInstances_AsPages_EndToEnd() Page page = await pageable.AsPages(pageSizeHint: 10).FirstAsync(); page.Values.Should().HaveCount(10); - page.ContinuationToken.Should().Be("10"); + page.ContinuationToken.Should().NotBeNull(); } async Task StartAsync() From 25c0f42326378c6a5931e393922cd8e6a265bfff Mon Sep 17 00:00:00 2001 From: Jacob Viau Date: Wed, 14 Sep 2022 16:00:24 -0700 Subject: [PATCH 15/18] Address PR comments --- .../DurableTaskGrpcClientIntegrationTests.cs | 25 +++++++++++-------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/test/DurableTask.Sdk.Tests/Grpc/DurableTaskGrpcClientIntegrationTests.cs b/test/DurableTask.Sdk.Tests/Grpc/DurableTaskGrpcClientIntegrationTests.cs index 807cd7594..496cb80fd 100644 --- a/test/DurableTask.Sdk.Tests/Grpc/DurableTaskGrpcClientIntegrationTests.cs +++ b/test/DurableTask.Sdk.Tests/Grpc/DurableTaskGrpcClientIntegrationTests.cs @@ -65,15 +65,17 @@ static void AssertMetadata(OrchestrationMetadata metadata, OrchestrationRuntimeS metadata.RuntimeStatus.Should().Be(status); // InMemoryOrchestrationService always returns these in a query. - // metadata.FailureDetails.Should().BeNull(); - // metadata.SerializedInput.Should().BeNull(); - // metadata.SerializedOutput.Should().BeNull(); + // The NotBeNull() here is to force this test failure when correct behavior + // is added, so we remember to change bellow to "NotNull()". + metadata.FailureDetails.Should().NotBeNull(); + metadata.SerializedInput.Should().NotBeNull(); + metadata.SerializedOutput.Should().NotBeNull(); } } OrchestrationQuery query = new() { InstanceIdPrefix = "GetInstances_EndToEnd" }; - static async Task ForEachAsync(Func func) + static async Task ForEachOrchestrationAsync(Func func) { await func("GetInstances_EndToEnd-1"); await func("GetInstances_EndToEnd-2"); @@ -82,21 +84,24 @@ static async Task ForEachAsync(Func func) await using DurableTaskGrpcWorker server = await this.StartAsync(); DurableTaskClient client = this.CreateDurableTaskClient(); + // Enqueue an extra orchestration which we will verify is NOT present. string notIncluded = await client.ScheduleNewOrchestrationInstanceAsync(OrchestrationName, input: false); - await ForEachAsync(x => client.ScheduleNewOrchestrationInstanceAsync(OrchestrationName, x, input: false)); + await ForEachOrchestrationAsync(x => client.ScheduleNewOrchestrationInstanceAsync(OrchestrationName, x, input: false)); AsyncPageable pageable = client.GetInstances(query); - await ForEachAsync(x => client.WaitForInstanceStartAsync(x, default)); + await ForEachOrchestrationAsync(x => client.WaitForInstanceStartAsync(x, default)); List metadata = await pageable.ToListAsync(); metadata.Should().HaveCount(2) - .And.AllSatisfy(m => AssertMetadata(m, OrchestrationRuntimeStatus.Running)); + .And.AllSatisfy(m => AssertMetadata(m, OrchestrationRuntimeStatus.Running)) + .And.NotContain(x => string.Equals(x.InstanceId, notIncluded, StringComparison.OrdinalIgnoreCase)); - await ForEachAsync(x => client.RaiseEventAsync(x, "event", default)); - await ForEachAsync(x => client.WaitForInstanceCompletionAsync(x, default)); + await ForEachOrchestrationAsync(x => client.RaiseEventAsync(x, "event", default)); + await ForEachOrchestrationAsync(x => client.WaitForInstanceCompletionAsync(x, default)); metadata = await pageable.ToListAsync(); metadata.Should().HaveCount(2) - .And.AllSatisfy(m => AssertMetadata(m, OrchestrationRuntimeStatus.Completed)); + .And.AllSatisfy(m => AssertMetadata(m, OrchestrationRuntimeStatus.Completed)) + .And.NotContain(x => string.Equals(x.InstanceId, notIncluded, StringComparison.OrdinalIgnoreCase)); } [Fact] From a5f758e6592071e4796db71d09b8f1b35ca48c26 Mon Sep 17 00:00:00 2001 From: Jacob Viau Date: Mon, 19 Sep 2022 09:35:41 -0700 Subject: [PATCH 16/18] Fix unit test conditions --- .../Grpc/DurableTaskGrpcClientIntegrationTests.cs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/test/DurableTask.Sdk.Tests/Grpc/DurableTaskGrpcClientIntegrationTests.cs b/test/DurableTask.Sdk.Tests/Grpc/DurableTaskGrpcClientIntegrationTests.cs index 496cb80fd..51ec91ca3 100644 --- a/test/DurableTask.Sdk.Tests/Grpc/DurableTaskGrpcClientIntegrationTests.cs +++ b/test/DurableTask.Sdk.Tests/Grpc/DurableTaskGrpcClientIntegrationTests.cs @@ -67,9 +67,17 @@ static void AssertMetadata(OrchestrationMetadata metadata, OrchestrationRuntimeS // InMemoryOrchestrationService always returns these in a query. // The NotBeNull() here is to force this test failure when correct behavior // is added, so we remember to change bellow to "NotNull()". - metadata.FailureDetails.Should().NotBeNull(); + metadata.FailureDetails.Should().BeNull(); metadata.SerializedInput.Should().NotBeNull(); - metadata.SerializedOutput.Should().NotBeNull(); + + if (status == OrchestrationRuntimeStatus.Completed) + { + metadata.SerializedOutput.Should().NotBeNull(); + } + else + { + metadata.SerializedOutput.Should().BeNull(); + } } } From 3e3d18a0392b3a202dd43987a12c6eabe4beb4f8 Mon Sep 17 00:00:00 2001 From: Jacob Viau Date: Mon, 19 Sep 2022 10:16:50 -0700 Subject: [PATCH 17/18] Change comment location --- .../Grpc/DurableTaskGrpcClientIntegrationTests.cs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/test/DurableTask.Sdk.Tests/Grpc/DurableTaskGrpcClientIntegrationTests.cs b/test/DurableTask.Sdk.Tests/Grpc/DurableTaskGrpcClientIntegrationTests.cs index 51ec91ca3..f0b5e5dfb 100644 --- a/test/DurableTask.Sdk.Tests/Grpc/DurableTaskGrpcClientIntegrationTests.cs +++ b/test/DurableTask.Sdk.Tests/Grpc/DurableTaskGrpcClientIntegrationTests.cs @@ -63,13 +63,12 @@ static void AssertMetadata(OrchestrationMetadata metadata, OrchestrationRuntimeS metadata.Name.Should().Be(OrchestrationName); metadata.InstanceId.Should().StartWith("GetInstances_EndToEnd-"); metadata.RuntimeStatus.Should().Be(status); + metadata.FailureDetails.Should().BeNull(); // InMemoryOrchestrationService always returns these in a query. // The NotBeNull() here is to force this test failure when correct behavior // is added, so we remember to change bellow to "NotNull()". - metadata.FailureDetails.Should().BeNull(); metadata.SerializedInput.Should().NotBeNull(); - if (status == OrchestrationRuntimeStatus.Completed) { metadata.SerializedOutput.Should().NotBeNull(); From ef3844188d11ae99fac44d41a7bccc581d9a0b51 Mon Sep 17 00:00:00 2001 From: Jacob Viau Date: Mon, 19 Sep 2022 10:17:19 -0700 Subject: [PATCH 18/18] Fix comment --- .../Grpc/DurableTaskGrpcClientIntegrationTests.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/DurableTask.Sdk.Tests/Grpc/DurableTaskGrpcClientIntegrationTests.cs b/test/DurableTask.Sdk.Tests/Grpc/DurableTaskGrpcClientIntegrationTests.cs index f0b5e5dfb..e2aca7190 100644 --- a/test/DurableTask.Sdk.Tests/Grpc/DurableTaskGrpcClientIntegrationTests.cs +++ b/test/DurableTask.Sdk.Tests/Grpc/DurableTaskGrpcClientIntegrationTests.cs @@ -67,7 +67,7 @@ static void AssertMetadata(OrchestrationMetadata metadata, OrchestrationRuntimeS // InMemoryOrchestrationService always returns these in a query. // The NotBeNull() here is to force this test failure when correct behavior - // is added, so we remember to change bellow to "NotNull()". + // is added, so we remember to change bellow to "BeNull()". metadata.SerializedInput.Should().NotBeNull(); if (status == OrchestrationRuntimeStatus.Completed) {