Skip to content

Commit

Permalink
Control over recipients lifetime (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
tommasobertoni committed Dec 31, 2020
2 parents 5d3c2ae + b7152b0 commit 201cfe0
Show file tree
Hide file tree
Showing 76 changed files with 2,235 additions and 1,229 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Spectre.Console" Version="0.33.0" />
<PackageReference Include="Spectre.Console" Version="0.35.0" />
</ItemGroup>

</Project>
6 changes: 3 additions & 3 deletions samples/NScatterGather.Samples.CompetingTasks/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,9 @@ static bool IsProductOutOfStock(Evaluation evaluation)
static RecipientsCollection CollectRecipients()
{
var collection = new RecipientsCollection();
collection.Add<AlibabaSupplier>("Alibaba");
collection.Add<AmazonSupplier>("Amazon");
collection.Add<WalmartSupplier>("Walmart");
collection.Add<AlibabaSupplier>(name: "Alibaba");
collection.Add<AmazonSupplier>(name: "Amazon");
collection.Add<WalmartSupplier>(name: "Walmart");
return collection;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Spectre.Console" Version="0.33.0" />
<PackageReference Include="Spectre.Console" Version="0.35.0" />
</ItemGroup>

</Project>
11 changes: 9 additions & 2 deletions samples/NScatterGather.Samples/Samples/2.FilterOnResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,19 @@ public async Task Run()
var collection = new RecipientsCollection();
collection.Add<Foo>();
collection.Add<Bar>();
collection.Add<Baz>();

var aggregator = new Aggregator(collection);

// The Send<TRequest> method checks only the input
// parameter of the methods:
AggregatedResponse<object?> all = await aggregator.Send(42);

var allResults = all.AsResultsList(); // 42L, "42"
var allResults = all.AsResultsList(); // "42", 42L, 42
Console.WriteLine($"" +
$"{allResults[0]} ({allResults[0]?.GetType().Name}), " +
$"{allResults[1]} ({allResults[1]?.GetType().Name})");
$"{allResults[1]} ({allResults[1]?.GetType().Name}), " +
$"{allResults[2]} ({allResults[2]?.GetType().Name})");

// Instead the Send<TRequest, TResponse> method checks
// also the return type of the methods, allowing to filter
Expand All @@ -41,5 +43,10 @@ class Bar
{
public long Longify(int n) => n * 1L;
}

class Baz
{
public int Echo(int n) => n;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public async Task Run()
// before aggregating the response.
var response1 = await aggregator.Send(42);

var results = response1.AsResultsList(); // "42", 42L, 84L
var results = response1.AsResultsList(); // 84L, 42L, "42"
Console.WriteLine($"" +
$"{results[0]} ({results[0]?.GetType().Name}), " +
$"{results[1]} ({results[1]?.GetType().Name}), " +
Expand All @@ -32,9 +32,9 @@ public async Task Run()
// or ValueTask<TResponse> get invoked.

var response2 = await aggregator.Send<long>(42);
var guidResults = response2.AsResultsList();
Console.WriteLine($"{guidResults[0]} ({guidResults[0].GetType().Name})");
Console.WriteLine($"{guidResults[0]} ({guidResults[1].GetType().Name})");
var longResults = response2.AsResultsList();
Console.WriteLine($"{longResults[0]} ({longResults[0].GetType().Name})");
Console.WriteLine($"{longResults[1]} ({longResults[1].GetType().Name})");

var response3 = await aggregator.Send<string>(42);
var stringResults = response3.AsResultsList();
Expand Down
13 changes: 6 additions & 7 deletions samples/NScatterGather.Samples/Samples/5.Timeout.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Threading;
using System.Threading.Tasks;

namespace NScatterGather.Samples.Samples
Expand All @@ -15,20 +14,20 @@ public async Task Run()

var aggregator = new Aggregator(collection);

// The consumer can limit the duration of the aggregation
// by providing a CancellationToken to the aggregator:
// The consumer can limit the duration of the aggregation by
// providing a timeout (or CancellationToken) to the aggregator:
// this will ensure that the response will be ready in
// the given amount of time and will "discard" incomplete
// (and also never-ending) invocations.
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100));
var response = await aggregator.Send(42, cts.Token);
var response = await aggregator.Send(42, TimeSpan.FromMilliseconds(50));

// The recipients that didn't complete in time will be
// listed in the Incomplete property of the result:
Console.WriteLine($"Completed {response.Completed.Count}");
Console.WriteLine(
$"Incomplete {response.Incomplete.Count}: " +
$"{response.Incomplete[0].RecipientType?.Name}");
$"{response.Incomplete[0].RecipientType?.Name}, " +
$"{response.Incomplete[1].RecipientType?.Name}");
}

class Foo
Expand All @@ -40,7 +39,7 @@ class Bar
{
public async Task<int> Long(int n)
{
await Task.Delay(100);
await Task.Delay(1000);
return n;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public async Task Run()
var aggregator = new Aggregator(collection);

var responseOfInt = await aggregator.Send(42);
var resultsOfInt = responseOfInt.AsResultsList(); // 84, "42"
var resultsOfInt = responseOfInt.AsResultsList(); // "42", 84
Console.WriteLine($"{resultsOfInt[0]}, {resultsOfInt[1]}");

var onlyStrings = await aggregator.Send<string>(42);
Expand Down
27 changes: 15 additions & 12 deletions src/NScatterGather/Aggregator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,20 @@
using System.Threading;
using System.Threading.Tasks;
using NScatterGather.Recipients;
using NScatterGather.Recipients.Collection.Scope;
using NScatterGather.Recipients.Run;
using NScatterGather.Responses;
using NScatterGather.Run;

namespace NScatterGather
{
public class Aggregator
{
private readonly RecipientsCollection _recipients;
private readonly IRecipientsScope _scope;

public Aggregator(RecipientsCollection recipients) =>
_recipients = recipients;
public Aggregator(RecipientsCollection collection)
{
_scope = collection.CreateScope();
}

public async Task<AggregatedResponse<object?>> Send(
object request,
Expand All @@ -31,23 +34,23 @@ public class Aggregator
if (request is null)
throw new ArgumentNullException(nameof(request));

var recipients = _recipients.ListRecipientsAccepting(request.GetType());
var recipients = _scope.ListRecipientsAccepting(request.GetType());

var invocations = await Invoke(recipients, request, cancellationToken)
.ConfigureAwait(false);

return AggregatedResponseFactory.CreateFrom(invocations);
}

private async Task<IReadOnlyList<RecipientRunner<object?>>> Invoke(
private async Task<IReadOnlyList<RecipientRun<object?>>> Invoke(
IReadOnlyList<Recipient> recipients,
object request,
CancellationToken cancellationToken)
{
var runners = recipients.Select(r => new RecipientRunner<object?>(r)).ToArray();
var runners = recipients.Select(recipient => recipient.Accept(request)).ToArray();

var tasks = runners
.Select(runner => runner.Run(x => x.Accept(request)))
.Select(runner => runner.Start())
.ToArray();

var allTasksCompleted = Task.WhenAll(tasks);
Expand Down Expand Up @@ -76,23 +79,23 @@ public class Aggregator
if (request is null)
throw new ArgumentNullException(nameof(request));

var recipients = _recipients.ListRecipientsReplyingWith(request.GetType(), typeof(TResponse));
var recipients = _scope.ListRecipientsReplyingWith(request.GetType(), typeof(TResponse));

var runners = await Invoke<TResponse>(recipients, request, cancellationToken)
.ConfigureAwait(false);

return AggregatedResponseFactory.CreateFrom(runners);
}

private async Task<IReadOnlyList<RecipientRunner<TResponse>>> Invoke<TResponse>(
private async Task<IReadOnlyList<RecipientRun<TResponse>>> Invoke<TResponse>(
IReadOnlyList<Recipient> recipients,
object request,
CancellationToken cancellationToken)
{
var runners = recipients.Select(r => new RecipientRunner<TResponse>(r)).ToArray();
var runners = recipients.Select(recipient => recipient.ReplyWith<TResponse>(request)).ToArray();

var tasks = runners
.Select(runner => runner.Run(x => x.ReplyWith<TResponse>(request)))
.Select(runner => runner.Start())
.ToArray();

var allTasksCompleted = Task.WhenAll(tasks);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@

namespace NScatterGather
{
public class ConflictException : Exception
public class CollisionException : Exception
{
public Type RecipientType { get; }

public Type RequestType { get; }

public Type? ResponseType { get; }

internal ConflictException(
internal CollisionException(
Type recipientType,
Type requestType,
Type? responseType = null)
Expand Down
8 changes: 5 additions & 3 deletions src/NScatterGather/Inspection/TypeInspector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,16 @@ internal class TypeInspector
private static readonly BindingFlags DefaultFlags = BindingFlags.Public | BindingFlags.Instance;
private static readonly MethodAnalyzer _methodAnalyzer = new MethodAnalyzer();

public Type Type => _type;

private readonly MethodMatchEvaluationCache _evaluationsCache = new MethodMatchEvaluationCache();

private readonly IReadOnlyList<MethodInspection> _methodInspections;
private readonly Type _type;

public TypeInspector(Type type)
{
_type = type;
_type = type ?? throw new ArgumentNullException(nameof(type));
_methodInspections = InspectMethods(type);

// Local functions.
Expand Down Expand Up @@ -68,7 +70,7 @@ static IReadOnlyList<MethodInspection> InspectMethods(Type type)
_evaluationsCache.TryAdd(requestType, new MethodMatchEvaluation(false, method));

if (matches.Count > 1)
throw new ConflictException(_type, requestType);
throw new CollisionException(_type, requestType);

return false;
}
Expand Down Expand Up @@ -126,7 +128,7 @@ private IReadOnlyList<MethodInfo> ListMatchingMethods(Type requestType)
_evaluationsCache.TryAdd(requestType, responseType, new MethodMatchEvaluation(false, method));

if (matches.Count > 1)
throw new ConflictException(_type, requestType, responseType);
throw new CollisionException(_type, requestType, responseType);

return false;
}
Expand Down
19 changes: 3 additions & 16 deletions src/NScatterGather/Inspection/TypeInspectorRegistry.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

namespace NScatterGather.Inspection
{
Expand All @@ -9,10 +8,9 @@ internal class TypeInspectorRegistry
private readonly ConcurrentDictionary<Type, TypeInspector> _registry =
new ConcurrentDictionary<Type, TypeInspector>();

public TypeInspector Register<T>() =>
Register(typeof(T));
public TypeInspector For<T>() => For(typeof(T));

public TypeInspector Register(Type type)
public TypeInspector For(Type type)
{
if (_registry.TryGetValue(type, out var cached))
return cached;
Expand All @@ -22,17 +20,6 @@ public TypeInspector Register(Type type)
return inspector;
}

public TypeInspector Of<T>() =>
Of(typeof(T));

public TypeInspector Of(Type t)
{
_ = _registry.TryGetValue(t, out var inspector);
return inspector ??
throw new KeyNotFoundException($"No {nameof(TypeInspector)} for type '{t.Name}' was registered.");
}

internal void Clear() =>
_registry.Clear();
internal void Clear() => _registry.Clear();
}
}
16 changes: 16 additions & 0 deletions src/NScatterGather/Internals/EnumBcl.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using System;

namespace NScatterGather
{
internal static class EnumBcl
{
public static bool IsValid<TEnum>(this TEnum value) where TEnum : struct, Enum
{
#if NETSTANDARD2_0 || NETSTANDARD2_1
return Enum.IsDefined(typeof(TEnum), value);
#else
return Enum.IsDefined(value);
#endif
}
}
}
2 changes: 1 addition & 1 deletion src/NScatterGather/NScatterGather.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<TargetFrameworks>net5.0;netstandard2.1;netstandard2.0</TargetFrameworks>
<LangVersion>latest</LangVersion>
<Nullable>enable</Nullable>
<Version>0.3.0</Version>
<Version>0.4.0</Version>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<GenerateDocumentationFile>false</GenerateDocumentationFile>
</PropertyGroup>
Expand Down
4 changes: 4 additions & 0 deletions src/NScatterGather/Recipients/Collection/CollisionHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
namespace NScatterGather
{
public delegate void CollisionHandler(CollisionException ex);
}

0 comments on commit 201cfe0

Please sign in to comment.