Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions Directory.Build.props
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<Project>

<!-- Common build settings -->
<PropertyGroup>
<LangVersion>10.0</LangVersion>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
</PropertyGroup>

<!-- Signing -->
<PropertyGroup>
<SignAssembly>true</SignAssembly>
<AssemblyOriginatorKeyFile>$(MSBuildThisFileDirectory)eng/key.snk</AssemblyOriginatorKeyFile>
<StrongNamePublicKey>00240000048000009400000006020000002400005253413100040000010001000505410141442095e0b0466df68ac32d158abdf0bd9cf26407d7a9d20b93656530556f23979f20b067628b9baee75fc6bba5c349519585a2852c1843bc61d74c4cbf9d80429cbdcbf609ea70fd62061eb65f40a6c9d505dd71cb119ef51f589a014d3b56159abbc38825fafbd119b6e97ebd9a5f3862a5c06220c680a6ac9eec</StrongNamePublicKey>
</PropertyGroup>

</Project>
3 changes: 3 additions & 0 deletions Directory.Build.targets
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
<Project>
<Import Project="$(MSBuildThisFileDirectory)eng/InternalsVisibleTo.targets" />
</Project>
45 changes: 45 additions & 0 deletions eng/InternalsVisibleTo.targets
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
<!-- Licensed to the .NET Foundation under one or more agreements. The .NET Foundation licenses this file to you under the MIT license. See the LICENSE file in the project root for more information. -->
<Project>

<PropertyGroup>
<GeneratedInternalsVisibleToFile>$(IntermediateOutputPath)$(MSBuildProjectName).InternalsVisibleTo$(DefaultLanguageSourceExtension)</GeneratedInternalsVisibleToFile>
</PropertyGroup>

<ItemDefinitionGroup>
<InternalsVisibleTo>
<Visible>false</Visible>
</InternalsVisibleTo>
</ItemDefinitionGroup>

<Target Name="PrepareGenerateInternalsVisibleToFile" Condition="'@(InternalsVisibleTo)' != ''">

<ItemGroup>
<_InternalsVisibleToAttribute Include="System.Runtime.CompilerServices.InternalsVisibleToAttribute">
<_Parameter1 Condition="'%(InternalsVisibleTo.Key)' != ''">%(InternalsVisibleTo.Identity), PublicKey=%(InternalsVisibleTo.Key)</_Parameter1>
<_Parameter1 Condition="'%(InternalsVisibleTo.Key)' == '' and '$(PublicKey)' != ''">%(InternalsVisibleTo.Identity), PublicKey=$(PublicKey)</_Parameter1>
<_Parameter1 Condition="'%(InternalsVisibleTo.Key)' == '' and '$(PublicKey)' == ''">%(InternalsVisibleTo.Identity)</_Parameter1>
</_InternalsVisibleToAttribute>
</ItemGroup>

</Target>

<!--
Dependency on PrepareForBuild is necessary so that we don't accidentally get ordered before it.
We rely on PrepareForBuild to create the IntermediateOutputDirectory if it doesn't exist.
-->
<Target Name="GenerateInternalsVisibleToFile"
Inputs="$(MSBuildThisFileFullPath);$(MSBuildProjectFile)"
Outputs="$(GeneratedInternalsVisibleToFile)"
DependsOnTargets="PrepareGenerateInternalsVisibleToFile;PrepareForBuild"
Condition="'@(InternalsVisibleTo)' != ''"
BeforeTargets="CoreCompile">

<WriteCodeFragment AssemblyAttributes="@(_InternalsVisibleToAttribute)" Language="$(Language)" OutputFile="$(GeneratedInternalsVisibleToFile)">
<Output TaskParameter="OutputFile" ItemName="CompileBefore" Condition="'$(Language)' == 'F#'" />
<Output TaskParameter="OutputFile" ItemName="Compile" Condition="'$(Language)' != 'F#'" />
<Output TaskParameter="OutputFile" ItemName="FileWrites" />
</WriteCodeFragment>

</Target>

</Project>
File renamed without changes.
11 changes: 5 additions & 6 deletions src/Directory.Build.props
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
<Project>

<Import Project="$([MSBuild]::GetDirectoryNameOfFileAbove($(MSBuildThisFileDirectory)../, $(_DirectoryBuildPropsFile)))/$(_DirectoryBuildPropsFile)"
Condition=" '$([MSBuild]::GetDirectoryNameOfFileAbove($(MSBuildThisFileDirectory)../, $(_DirectoryBuildPropsFile)))' != '' " />

<!-- Common build settings -->
<PropertyGroup>
<LangVersion>10.0</LangVersion>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<SignAssembly>true</SignAssembly>
<AssemblyOriginatorKeyFile>../../key.snk</AssemblyOriginatorKeyFile>
<DebugType>embedded</DebugType>
<Company>Microsoft Corporation</Company>
<PublishRepositoryUrl>true</PublishRepositoryUrl>
Expand All @@ -30,4 +28,5 @@
<PackagePath>content/SBOM</PackagePath>
</Content>
</ItemGroup>
</Project>

</Project>
44 changes: 44 additions & 0 deletions src/DurableTask/AsyncPageable{T}.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

namespace Microsoft.DurableTask;

/// <summary>
/// A collection of values that may take multiple service requests to iterate over.
/// </summary>
/// <typeparam name="T">The type of the value.</typeparam>
public abstract class AsyncPageable<T> : IAsyncEnumerable<T>
where T : notnull
{
// This code was adapted from Azure SDK AsyncPageable.
// https://github.com/Azure/azure-sdk-for-net/blob/e811f016a3655e4b29a23c71f84d59f34fe01233/sdk/core/Azure.Core/src/AsyncPageable.cs

/// <summary>
/// Enumerate the values a <see cref="Page{T}"/> at a time.
/// </summary>
/// <param name="continuationToken">
/// A continuation token indicating where to resume paging or null to begin paging from the
/// beginning.
/// </param>
/// <param name="pageSizeHint">
/// The number of items per <see cref="Page{T}"/> that should be requested
/// (from service operations that support it). It's not guaranteed that the value will be
/// respected.
/// </param>
/// <returns>An async enumerable of pages.</returns>
public abstract IAsyncEnumerable<Page<T>> AsPages(
string? continuationToken = default, int? pageSizeHint = default);

/// <inheritdoc/>
public async IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
// TODO: ConfigureAwait(false)? This may cause issues when used in an orchestration.
await foreach (Page<T> page in this.AsPages().WithCancellation(cancellationToken))
{
foreach (T value in page.Values)
{
yield return value;
}
}
}
}
7 changes: 6 additions & 1 deletion src/DurableTask/DurableTaskClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,12 @@ public abstract Task<OrchestrationMetadata> WaitForInstanceCompletionAsync(
/// <inheritdoc cref="WaitForInstanceStartAsync(string, CancellationToken, bool)"/>
public abstract Task<OrchestrationMetadata?> GetInstanceMetadataAsync(string instanceId, bool getInputsAndOutputs);

// TODO: Multi-instance query
/// <summary>
/// Queries orchestration instances
/// </summary>
/// <param name="query">Filters down the instances included in the query.</param>
/// <returns>An async pageable of the query results.</returns>
public abstract AsyncPageable<OrchestrationMetadata> GetInstances(OrchestrationQuery? query = null);

/// <summary>
/// Purges orchestration instance metadata from the durable store.
Expand Down
54 changes: 51 additions & 3 deletions src/DurableTask/Grpc/DurableTaskGrpcClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,55 @@ await this.sidecarClient.TerminateInstanceAsync(new P.TerminateRequest
return null;
}

return new OrchestrationMetadata(response, this.dataConverter, getInputsAndOutputs);
return new OrchestrationMetadata(response.OrchestrationState, this.dataConverter, getInputsAndOutputs);
}

/// <inheritdoc/>
public override AsyncPageable<OrchestrationMetadata> GetInstances(OrchestrationQuery? query = null)
{
return Pageable.Create(async (continuation, pageSize, cancellation) =>
{
P.QueryInstancesRequest request = new()
{
Query = new P.InstanceQuery
{
CreatedTimeFrom = query?.CreatedFrom?.ToTimestamp(),
CreatedTimeTo = query?.CreatedTo?.ToTimestamp(),
FetchInputsAndOutputs = query?.FetchInputsAndOutputs ?? false,
InstanceIdPrefix = query?.InstanceIdPrefix,
MaxInstanceCount = pageSize ?? query?.PageSize ?? OrchestrationQuery.DefaultPageSize,
ContinuationToken = continuation ?? query?.ContinuationToken,
},
};

if (query?.Statuses is not null)
{
request.Query.RuntimeStatus.AddRange(query.Statuses.Select(x => x.ToGrpcStatus()));
}

if (query?.TaskHubNames is not null)
{
request.Query.TaskHubNames.AddRange(query.TaskHubNames);
}

try
{

P.QueryInstancesResponse response = await this.sidecarClient.QueryInstancesAsync(
request, cancellationToken: cancellation);

bool getInputsAndOutputs = query?.FetchInputsAndOutputs ?? false;
IReadOnlyList<OrchestrationMetadata> values = response.OrchestrationState
.Select(x => new OrchestrationMetadata(x, this.dataConverter, getInputsAndOutputs))
.ToList();

return new Page<OrchestrationMetadata>(values, response.ContinuationToken);
}
catch (RpcException e) when (e.StatusCode == StatusCode.Cancelled)
{
throw new OperationCanceledException($"The {nameof(GetInstances)} operation was canceled.", e, cancellation);
}
});
}

/// <inheritdoc/>
Expand Down Expand Up @@ -214,7 +262,7 @@ public override async Task<OrchestrationMetadata> WaitForInstanceStartAsync(
throw new OperationCanceledException($"The {nameof(WaitForInstanceStartAsync)} operation was canceled.", e, cancellationToken);
}

return new OrchestrationMetadata(response, this.dataConverter, getInputsAndOutputs);
return new OrchestrationMetadata(response.OrchestrationState, this.dataConverter, getInputsAndOutputs);
}

/// <inheritdoc/>
Expand Down Expand Up @@ -243,7 +291,7 @@ public override async Task<OrchestrationMetadata> WaitForInstanceCompletionAsync
throw new OperationCanceledException($"The {nameof(WaitForInstanceCompletionAsync)} operation was canceled.", e, cancellationToken);
}

return new OrchestrationMetadata(response, this.dataConverter, getInputsAndOutputs);
return new OrchestrationMetadata(response.OrchestrationState, this.dataConverter, getInputsAndOutputs);
}

/// <inheritdoc/>
Expand Down
4 changes: 4 additions & 0 deletions src/DurableTask/Microsoft.DurableTask.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,8 @@
<ProjectReference Include="..\DurableTask.Generators\Microsoft.DurableTask.Generators.csproj" />
</ItemGroup>

<ItemGroup>
<InternalsVisibleTo Include="Microsoft.DurableTask.Tests" Key="$(StrongNamePublicKey)" />
</ItemGroup>

</Project>
20 changes: 10 additions & 10 deletions src/DurableTask/OrchestrationMetadata.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,19 @@ public sealed class OrchestrationMetadata
readonly bool requestedInputsAndOutputs;

internal OrchestrationMetadata(
P.GetInstanceResponse response,
P.OrchestrationState state,
DataConverter dataConverter,
bool requestedInputsAndOutputs)
{
this.Name = response.OrchestrationState.Name;
this.InstanceId = response.OrchestrationState.InstanceId;
this.RuntimeStatus = (OrchestrationRuntimeStatus)response.OrchestrationState.OrchestrationStatus;
this.CreatedAt = response.OrchestrationState.CreatedTimestamp.ToDateTimeOffset();
this.LastUpdatedAt = response.OrchestrationState.LastUpdatedTimestamp.ToDateTimeOffset();
this.SerializedInput = response.OrchestrationState.Input;
this.SerializedOutput = response.OrchestrationState.Output;
this.SerializedCustomStatus = response.OrchestrationState.CustomStatus;
this.FailureDetails = ProtoUtils.ConvertTaskFailureDetails(response.OrchestrationState?.FailureDetails);
this.Name = state.Name;
this.InstanceId = state.InstanceId;
this.RuntimeStatus = (OrchestrationRuntimeStatus)state.OrchestrationStatus;
this.CreatedAt = state.CreatedTimestamp.ToDateTimeOffset();
this.LastUpdatedAt = state.LastUpdatedTimestamp.ToDateTimeOffset();
this.SerializedInput = state.Input;
this.SerializedOutput = state.Output;
this.SerializedCustomStatus = state.CustomStatus;
this.FailureDetails = ProtoUtils.ConvertTaskFailureDetails(state?.FailureDetails);
this.dataConverter = dataConverter;
this.requestedInputsAndOutputs = requestedInputsAndOutputs;
}
Expand Down
31 changes: 31 additions & 0 deletions src/DurableTask/OrchestrationQuery.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

namespace Microsoft.DurableTask;

/// <summary>
/// A filter for querying orchestration instances.
/// </summary>
/// <param name="CreatedFrom">Creation date of instances to query from.</param>
/// <param name="CreatedTo">Creation date of instances to query to.</param>
/// <param name="Statuses">Runtime statuses of instances to query.</param>
/// <param name="TaskHubNames">Names of task hubs to query across.</param>
/// <param name="InstanceIdPrefix">Prefix of instance IDs to include.</param>
/// <param name="PageSize">Max item count to include per page.</param>
/// <param name="FetchInputsAndOutputs">Whether to include instance inputs or outputs in the query results.</param>
/// <param name="ContinuationToken">The continuation token to continue a paged query.</param>
public record OrchestrationQuery(
DateTimeOffset? CreatedFrom = null,
DateTimeOffset? CreatedTo = null,
IEnumerable<OrchestrationRuntimeStatus>? Statuses = null,
IEnumerable<string>? TaskHubNames = null,
string? InstanceIdPrefix = null,
int PageSize = OrchestrationQuery.DefaultPageSize,
bool FetchInputsAndOutputs = false,
string? ContinuationToken = null)
{
/// <summary>
/// The default page size when not supplied.
/// </summary>
public const int DefaultPageSize = 100;
}
81 changes: 81 additions & 0 deletions src/DurableTask/Pageable.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System.Runtime.CompilerServices;

namespace Microsoft.DurableTask;

/// <summary>
/// Pageable helpers.
/// </summary>
static class Pageable
{
// This code was adapted from Azure SDK PageResponseEnumerator.
// https://github.com/Azure/azure-sdk-for-net/blob/e811f016a3655e4b29a23c71f84d59f34fe01233/sdk/core/Azure.Core/src/Shared/PageResponseEnumerator.cs
// TODO: Add Pageable<T> (non-async) when/if it becomes relevant.

/// <summary>
/// Creates an async pageable from a callback function <paramref name="pageFunc" />.
/// </summary>
/// <typeparam name="T">The type of the value.</typeparam>
/// <param name="pageFunc">The callback to fetch additional pages.</param>
/// <returns>An async pageable.</returns>
public static AsyncPageable<T> Create<T>(Func<string?, CancellationToken, Task<Page<T>>> pageFunc)
where T : notnull
{
if (pageFunc is null)
{
throw new ArgumentNullException(nameof(pageFunc));
}

return Create((continuation, size, cancellation) => pageFunc(continuation, cancellation));
}

/// <summary>
/// Creates an async pageable from a callback function <paramref name="pageFunc" />.
/// </summary>
/// <typeparam name="T">The type of the value.</typeparam>
/// <param name="pageFunc">The callback to fetch additional pages.</param>
/// <returns>An async pageable.</returns>
public static AsyncPageable<T> Create<T>(Func<string?, int?, CancellationToken, Task<Page<T>>> pageFunc)
where T : notnull
{
if (pageFunc is null)
{
throw new ArgumentNullException(nameof(pageFunc));
}

return new FuncAsyncPageable<T>(pageFunc);
}

class FuncAsyncPageable<T> : AsyncPageable<T>
where T : notnull
{
readonly Func<string?, int?, CancellationToken, Task<Page<T>>> pageFunc;

public FuncAsyncPageable(Func<string?, int?, CancellationToken, Task<Page<T>>> pageFunc)
{
this.pageFunc = pageFunc;
}

public override IAsyncEnumerable<Page<T>> AsPages(
string? continuationToken = default, int? pageSizeHint = default)
=> this.AsPagesCore(continuationToken, pageSizeHint);

async IAsyncEnumerable<Page<T>> AsPagesCore(
string? continuationToken = default,
int? pageSizeHint = default,
[EnumeratorCancellation] CancellationToken cancellation = default)
{
do
{
// TODO: Do we need to support customizing ConfigureAwait(bool) here?
// ConfigureAwait(false) makes this unusable in orchestrations.
Page<T> page = await this.pageFunc(continuationToken, pageSizeHint, cancellation);
yield return page;
continuationToken = page.ContinuationToken;
}
while (continuationToken is not null);
}
}
}
Loading