Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions src/TeamCloud.Data.Expanders/ComponentTaskExpander.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public async Task ExpandAsync(ComponentTask document)

private async Task<string> GetEventsAsync(ComponentTask document)
{
var output = default(string);

if (document.TaskState.IsActive() && AzureResourceIdentifier.TryParse(document.ResourceId, out var resourceId))
{
try
Expand All @@ -83,8 +85,8 @@ private async Task<string> GetEventsAsync(ComponentTask document)

if (containerGroup is not null)
{
return await containerGroup
.GetEventContentAsync("runner")
output = await containerGroup
.GetEventContentAsync(document.Id)
.ConfigureAwait(false);
}
}
Expand All @@ -94,7 +96,7 @@ private async Task<string> GetEventsAsync(ComponentTask document)
}
}

return default;
return output;
}

private async Task<string> GetOutputAsync(ComponentTask document)
Expand Down
26 changes: 19 additions & 7 deletions src/TeamCloud.Data/IDocumentSubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,36 @@
*/

using System;
using System.Collections.Concurrent;
using System.Reflection;
using System.Threading.Tasks;
using TeamCloud.Model.Data.Core;

namespace TeamCloud.Data;

public abstract class DocumentSubscription : IDocumentSubscription
{
private static readonly ConcurrentDictionary<Type, ConcurrentDictionary<Type, MethodInfo>> HandleMethodCache = new ConcurrentDictionary<Type,ConcurrentDictionary<Type, MethodInfo>>();

private MethodInfo GetHandleMethod(IContainerDocument containerDocument) => HandleMethodCache
.GetOrAdd(GetType(), _ => new ConcurrentDictionary<Type, MethodInfo>())
.GetOrAdd(containerDocument.GetType(), containerDocumentType =>
{
var subscriberInterface = typeof(IDocumentSubscription<>)
.MakeGenericType(containerDocument.GetType());

if (subscriberInterface.IsAssignableFrom(GetType()))
return subscriberInterface.GetMethod(nameof(HandleAsync), new Type[] { containerDocument.GetType(), typeof(DocumentSubscriptionEvent) });

return null;
});

public virtual bool CanHandle(IContainerDocument containerDocument)
{
if (containerDocument is null)
throw new ArgumentNullException(nameof(containerDocument));

return typeof(IDocumentSubscription<>).MakeGenericType(containerDocument.GetType()).IsAssignableFrom(GetType());
return GetHandleMethod(containerDocument) is not null;
}

public virtual Task HandleAsync(IContainerDocument containerDocument, DocumentSubscriptionEvent subscriptionEvent)
Expand All @@ -25,12 +42,7 @@ public virtual Task HandleAsync(IContainerDocument containerDocument, DocumentSu
throw new ArgumentNullException(nameof(containerDocument));

if (CanHandle(containerDocument))
{
return (Task)typeof(IDocumentExpander<>)
.MakeGenericType(containerDocument.GetType())
.GetMethod(nameof(HandleAsync), new Type[] { containerDocument.GetType(), typeof(DocumentSubscriptionEvent) })
.Invoke(this, new object[] { containerDocument, subscriptionEvent });
}
return (Task)GetHandleMethod(containerDocument).Invoke(this, new object[] { containerDocument, subscriptionEvent });

throw new NotImplementedException($"Missing document subscription implementation IDocumentSubscription<{containerDocument.GetType().Name}> at {GetType()}");
}
Expand Down
2 changes: 1 addition & 1 deletion src/TeamCloud.Orchestrator/API/CommandTrigger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ await commandAuditWriter
else
{
commandResult = await commandHandler
.HandleAsync(command, commandCollector, durableClient, null, log ?? NullLogger.Instance)
.HandleAsync(command, commandCollector, null, log ?? NullLogger.Instance)
.ConfigureAwait(false);

if (!commandResult.RuntimeStatus.IsFinal())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@

namespace TeamCloud.Orchestrator.Command.Activities;

public sealed class CommandCollectActivity
public sealed class CommandEnqueueActivity
{
[FunctionName(nameof(CommandCollectActivity))]
[FunctionName(nameof(CommandEnqueueActivity))]
public async Task RunActivity(
[ActivityTrigger] IDurableActivityContext activityContext,
[Queue(CommandHandler.ProcessorQueue)] IAsyncCollector<ICommand> commandCollector,
Expand All @@ -40,7 +40,7 @@ await commandCollector
}
catch (Exception exc)
{
log.LogError(exc, $"Failed to collect command: {exc.Message}");
log.LogError(exc, $"Failed to enqeueu command: {exc.Message}");

throw exc.AsSerializable();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/

using System;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using TeamCloud.Serialization;

namespace TeamCloud.Orchestrator.Command.Activities;

public sealed class CommandStatusActivity
{
[FunctionName(nameof(CommandStatusActivity))]
public Task<DurableOrchestrationStatus> RunActivity(
[ActivityTrigger] IDurableActivityContext activityContext,
[DurableClient] IDurableClient orchestrationClient,
ILogger log)
{
if (activityContext is null)
throw new ArgumentNullException(nameof(activityContext));

if (orchestrationClient is null)
throw new ArgumentNullException(nameof(orchestrationClient));

if (log is null)
throw new ArgumentNullException(nameof(log));

try
{
var input = activityContext.GetInput<Input>();

return orchestrationClient
.GetStatusAsync(input.CommandId.ToString(), showHistory: input.ShowHistory, showHistoryOutput: input.ShowHistoryOutput, showInput: input.ShowInput);
}
catch (Exception exc)
{
log.LogError(exc, $"Failed to enqeueu command: {exc.Message}");

throw exc.AsSerializable();
}
}

internal struct Input
{
public Guid CommandId { get; set; }

public bool ShowHistory { get; set; }

public bool ShowHistoryOutput { get; set; }

public bool ShowInput { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/

using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;
using System;
using System.Threading.Tasks;
using TeamCloud.Serialization;

namespace TeamCloud.Orchestrator.Command.Activities;

public sealed class CommandTerminateActivity
{
[FunctionName(nameof(CommandTerminateActivity))]
public async Task RunActivity(
[ActivityTrigger] IDurableActivityContext activityContext,
[DurableClient] IDurableClient orchestrationClient,
ILogger log)
{
if (activityContext is null)
throw new ArgumentNullException(nameof(activityContext));

if (orchestrationClient is null)
throw new ArgumentNullException(nameof(orchestrationClient));

if (log is null)
throw new ArgumentNullException(nameof(log));

try
{
var input = activityContext.GetInput<Input>();

await orchestrationClient
.TerminateAsync(input.CommandId.ToString(), input.Reason ?? string.Empty)
.ConfigureAwait(false);
}
catch (Exception exc)
{
log.LogError(exc, $"Failed to enqeueu command: {exc.Message}");

throw exc.AsSerializable();
}
}

internal struct Input
{
public Guid CommandId { get; set; }

public string Reason { get; set; }
}
}
18 changes: 13 additions & 5 deletions src/TeamCloud.Orchestrator/Command/CommandCollector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@ public sealed class CommandCollector : IAsyncCollector<ICommand>
private readonly ICommand commandContext;
private readonly IDurableOrchestrationContext orchestrationContext;

public CommandCollector(IAsyncCollector<ICommand> collector, ICommand commandContext = null, IDurableOrchestrationContext orchestrationContext = null)
public CommandCollector(IAsyncCollector<ICommand> collector, ICommand commandContext = null)
{
this.collector = collector ?? throw new ArgumentNullException(nameof(collector));
this.commandContext = commandContext;
this.orchestrationContext = orchestrationContext;
}
public CommandCollector(IDurableOrchestrationContext orchestrationContext, ICommand commandContext = null)
{
this.orchestrationContext = orchestrationContext ?? throw new ArgumentNullException(nameof(orchestrationContext));
this.commandContext = commandContext;
}

public async Task AddAsync(ICommand item, CancellationToken cancellationToken = default)
Expand All @@ -33,18 +37,22 @@ public async Task AddAsync(ICommand item, CancellationToken cancellationToken =

item.ParentId = commandContext?.CommandId ?? Guid.Empty;

if (orchestrationContext is null)
if (collector is not null)
{
await collector
.AddAsync(item, cancellationToken)
.ConfigureAwait(false);
}
else
else if (orchestrationContext is not null)
{
await orchestrationContext
.CallActivityAsync(nameof(CommandCollectActivity), new CommandCollectActivity.Input() { Command = item })
.CallActivityAsync(nameof(CommandEnqueueActivity), new CommandEnqueueActivity.Input() { Command = item })
.ConfigureAwait(true);
}
else
{
throw new NotSupportedException();
}
}

public Task FlushAsync(CancellationToken cancellationToken = default)
Expand Down
20 changes: 20 additions & 0 deletions src/TeamCloud.Orchestrator/Command/CommandExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,32 @@
using System;
using System.Threading.Tasks;
using TeamCloud.Model.Commands.Core;
using TeamCloud.Model.Data;
using TeamCloud.Orchestrator.Command.Activities;
using TeamCloud.Serialization;

namespace TeamCloud.Orchestrator.Command;

internal static class CommandExtensions
{
internal static Task TerminateCommandAsync(this IDurableOrchestrationContext orchestrationContext, ComponentTask componentTask, string reason = null)
=> orchestrationContext.TerminateCommandAsync(Guid.Parse(componentTask.Id), reason);

internal static Task TerminateCommandAsync(this IDurableOrchestrationContext orchestrationContext, ICommand command, string reason = null)
=> orchestrationContext.TerminateCommandAsync(command.CommandId, reason);

internal static Task TerminateCommandAsync(this IDurableOrchestrationContext orchestrationContext, Guid commandId, string reason = null)
=> orchestrationContext.CallActivityAsync(nameof(CommandTerminateActivity), new CommandTerminateActivity.Input() { CommandId = commandId, Reason = reason });

internal static Task<DurableOrchestrationStatus> GetCommandStatusAsync(this IDurableOrchestrationContext orchestrationContext, ComponentTask componentTask, bool showHistory = false, bool showHistoryOutput = false, bool showInput = true)
=> orchestrationContext.GetCommandStatusAsync(Guid.Parse(componentTask.Id), showHistory, showHistoryOutput, showInput);

internal static Task<DurableOrchestrationStatus> GetCommandStatusAsync(this IDurableOrchestrationContext orchestrationContext, ICommand command, bool showHistory = false, bool showHistoryOutput = false, bool showInput = true)
=> orchestrationContext.GetCommandStatusAsync(command.CommandId, showHistory, showHistoryOutput, showInput);

internal static Task<DurableOrchestrationStatus> GetCommandStatusAsync(this IDurableOrchestrationContext orchestrationContext, Guid commandId, bool showHistory = false, bool showHistoryOutput = false, bool showInput = true)
=> orchestrationContext.CallActivityAsync<DurableOrchestrationStatus>(nameof(CommandStatusActivity), new CommandStatusActivity.Input() { CommandId = commandId, ShowHistory = showHistory, ShowHistoryOutput = showHistoryOutput, ShowInput = showInput });

internal static async Task<ICommand> GetCommandAsync(this IDurableClient durableClient, Guid commandId)
{
if (durableClient is null)
Expand Down
36 changes: 23 additions & 13 deletions src/TeamCloud.Orchestrator/Command/CommandHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,41 @@
*/

using System;
using System.Collections.Concurrent;
using System.Reflection;
using System.Threading.Tasks;
using Jose;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;
using Microsoft.VisualStudio.Services.Common;
using TeamCloud.Model.Commands.Core;

namespace TeamCloud.Orchestrator.Command;

public abstract class CommandHandler<TCommand> : CommandHandler, ICommandHandler<TCommand>
where TCommand : class, ICommand
{
public abstract Task<ICommandResult> HandleAsync(TCommand command, IAsyncCollector<ICommand> commandQueue, IDurableClient orchestrationClient, IDurableOrchestrationContext orchestrationContext, ILogger log);
public abstract Task<ICommandResult> HandleAsync(TCommand command, IAsyncCollector<ICommand> commandQueue, IDurableOrchestrationContext orchestrationContext, ILogger log);
}

public abstract class CommandHandler : ICommandHandler
{
private static readonly ConcurrentDictionary<Type, ConcurrentDictionary<Type, MethodInfo>> HandleMethodCache = new ConcurrentDictionary<Type, ConcurrentDictionary<Type, MethodInfo>>();

private MethodInfo GetHandleMethod(ICommand command) => HandleMethodCache
.GetOrAdd(GetType(), _ => new ConcurrentDictionary<Type, MethodInfo>())
.GetOrAdd(command.GetType(), commandType =>
{
var handlerInterface = typeof(ICommandHandler<>)
.MakeGenericType(commandType);

if (handlerInterface.IsAssignableFrom(GetType()))
return handlerInterface.GetMethod(nameof(HandleAsync), new Type[] { command.GetType(), typeof(IAsyncCollector<ICommand>), typeof(IDurableOrchestrationContext), typeof(ILogger) });

return null;
});

public const string ProcessorQueue = "command-processor";
public const string MonitorQueue = "command-monitor";

Expand All @@ -30,25 +49,16 @@ public virtual bool CanHandle(ICommand command)
if (command is null)
throw new ArgumentNullException(nameof(command));

return typeof(ICommandHandler<>)
.MakeGenericType(command.GetType())
.IsAssignableFrom(GetType());
return GetHandleMethod(command) is not null;
}

public virtual Task<ICommandResult> HandleAsync(ICommand command, IAsyncCollector<ICommand> commandQueue, IDurableClient orchestrationClient, IDurableOrchestrationContext orchestrationContext, ILogger log)
public virtual Task<ICommandResult> HandleAsync(ICommand command, IAsyncCollector<ICommand> commandQueue, IDurableOrchestrationContext orchestrationContext, ILogger log)
{
if (command is null)
throw new ArgumentNullException(nameof(command));

if (CanHandle(command))
{
var handleMethod = typeof(ICommandHandler<>)
.MakeGenericType(command.GetType())
.GetMethod(nameof(HandleAsync), new Type[] { command.GetType(), typeof(IAsyncCollector<ICommand>), typeof(IDurableClient), typeof(IDurableOrchestrationContext), typeof(ILogger) });

return (Task<ICommandResult>)handleMethod
.Invoke(this, new object[] { command, commandQueue, orchestrationClient, orchestrationContext, log });
}
return (Task<ICommandResult>)GetHandleMethod(command).Invoke(this, new object[] { command, commandQueue, orchestrationContext, log });

throw new NotImplementedException($"Missing orchestrator command handler implementation ICommandHandler<{command.GetTypeName(prettyPrint: true)}> at {GetType()}");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,8 @@ public CommandOrchestration(ICommandHandler[] commandHandlers)
[FunctionName(nameof(CommandOrchestration))]
public async Task Execute(
[OrchestrationTrigger] IDurableOrchestrationContext orchestratorContext,
[DurableClient] IDurableClient orchestratorClient,
[Queue(CommandHandler.ProcessorQueue)] IAsyncCollector<ICommand> commandQueue,
ILogger log)
{
if (orchestratorClient is null)
throw new ArgumentNullException(nameof(orchestratorClient));

if (orchestratorContext is null)
throw new ArgumentNullException(nameof(orchestratorContext));

Expand All @@ -60,7 +55,7 @@ await orchestratorContext
.ConfigureAwait(true);

commandResult = await commandHandler
.HandleAsync(command, new CommandCollector(commandQueue, command, orchestratorContext), orchestratorClient, orchestratorContext, log)
.HandleAsync(command, new CommandCollector(orchestratorContext, command), orchestratorContext, log)
.ConfigureAwait(true);

if (commandResult is null)
Expand Down
Loading