Skip to content

Commit

Permalink
Support to Named Pipelines added #2
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanpaulovich committed Oct 20, 2019
1 parent a09ebef commit f02de7f
Show file tree
Hide file tree
Showing 11 changed files with 197 additions and 48 deletions.
85 changes: 67 additions & 18 deletions src/FluentMediator/Mediator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,40 +16,89 @@ public sealed class Mediator : IMediator
_pipelines = pipelines;
}

public void Publish(object request)
public void Publish(object request, string? pipelineName = null)
{
var pipeline = _pipelines.GetPipeline(request.GetType());
pipeline.Publish(GetService, request!);
if (pipelineName is string)
{
var pipeline = _pipelines.GetPipeline(pipelineName);
pipeline.Publish(GetService, request!);

}
else
{
var pipeline = _pipelines.GetPipeline(request.GetType());
pipeline.Publish(GetService, request!);
}
}

public TResult Send<TResult>(object request)
public TResult Send<TResult>(object request, string? pipelineName = null)
{
var pipeline = _pipelines.GetPipeline(request.GetType());
return pipeline.Send<TResult>(GetService, request!);
if (pipelineName is string)
{
var pipeline = _pipelines.GetPipeline(pipelineName);
return pipeline.Send<TResult>(GetService, request);
}
else
{
var pipeline = _pipelines.GetPipeline(request.GetType());
return pipeline.Send<TResult>(GetService, request);
}
}

public async Task PublishAsync(object request)
public async Task PublishAsync(object request, string? pipelineName = null)
{
var pipeline = _pipelines.GetAsyncPipeline(request.GetType());
await pipeline.PublishAsync(GetService, request!);
if (pipelineName is string)
{
var pipeline = _pipelines.GetAsyncPipeline(pipelineName);
await pipeline.PublishAsync(GetService, request);
}
else
{
var pipeline = _pipelines.GetAsyncPipeline(request.GetType());
await pipeline.PublishAsync(GetService, request);
}
}

public async Task<TResult> SendAsync<TResult>(object request)
public async Task<TResult> SendAsync<TResult>(object request, string? pipelineName = null)
{
var pipeline = _pipelines.GetAsyncPipeline(request.GetType());
return await pipeline.SendAsync<TResult>(GetService, request!);
if (pipelineName is string)
{
var pipeline = _pipelines.GetAsyncPipeline(pipelineName);
return await pipeline.SendAsync<TResult>(GetService, request);
}
else
{
var pipeline = _pipelines.GetAsyncPipeline(request.GetType());
return await pipeline.SendAsync<TResult>(GetService, request);
}
}

public async Task PublishAsync(object request, CancellationToken ct)
public async Task PublishAsync(object request, CancellationToken ct, string? pipelineName = null)
{
var pipeline = _pipelines.GetCancellablePipeline(request.GetType());
await pipeline.PublishAsync(GetService, request!, ct);
if (pipelineName is string)
{
var pipeline = _pipelines.GetCancellablePipeline(pipelineName);
await pipeline.PublishAsync(GetService, request, ct);
}
else
{
var pipeline = _pipelines.GetCancellablePipeline(request.GetType());
await pipeline.PublishAsync(GetService, request, ct);
}
}

public async Task<TResult> SendAsync<TResult>(object request, CancellationToken ct)
public async Task<TResult> SendAsync<TResult>(object request, CancellationToken ct, string? pipelineName = null)
{
var pipeline = _pipelines.GetCancellablePipeline(request.GetType());
return await pipeline.SendAsync<TResult>(GetService, request!, ct);
if (pipelineName is string)
{
var pipeline = _pipelines.GetCancellablePipeline(pipelineName);
return await pipeline.SendAsync<TResult>(GetService, request, ct);
}
else
{
var pipeline = _pipelines.GetCancellablePipeline(request.GetType());
return await pipeline.SendAsync<TResult>(GetService, request, ct);
}
}
}
}
15 changes: 15 additions & 0 deletions src/FluentMediator/PipelineProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,29 @@ public IPipelineAsync GetAsyncPipeline(Type requestType)
return _asyncPipelineCollection.Get(requestType);
}

public IPipelineAsync GetAsyncPipeline(string pipelineName)
{
return _asyncPipelineCollection.Get(pipelineName);
}

public ICancellablePipelineAsync GetCancellablePipeline(Type requestType)
{
return _cancellablePipelineCollection.Get(requestType);
}

public ICancellablePipelineAsync GetCancellablePipeline(string pipelineName)
{
return _cancellablePipelineCollection.Get(pipelineName);
}

public IPipeline GetPipeline(Type requestType)
{
return _pipelineCollection.Get(requestType);
}

public IPipeline GetPipeline(string pipelineName)
{
return _pipelineCollection.Get(pipelineName);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace FluentMediator.Pipelines.CancellablePipelineAsync
{
public interface IMediator
{
Task PublishAsync(object request, CancellationToken ct);
Task<TResult> SendAsync<TResult>(object request, CancellationToken ct);
Task PublishAsync(object request, CancellationToken ct, string? pipelineName = null);
Task<TResult> SendAsync<TResult>(object request, CancellationToken ct, string? pipelineName = null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ namespace FluentMediator.Pipelines.CancellablePipelineAsync
public interface IPipelineProvider
{
ICancellablePipelineAsync GetCancellablePipeline(Type requestType);
ICancellablePipelineAsync GetCancellablePipeline(string pipelineName);
}
}
4 changes: 1 addition & 3 deletions src/FluentMediator/Pipelines/IPipelineCollection.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Collections.Generic;

namespace FluentMediator.Pipelines
{
Expand All @@ -8,7 +7,6 @@ public interface IPipelineCollection<TPipeline>
{
void Add(TPipeline pipeline);
TPipeline Get(Type requestType);
bool Contains(Type requestType, out TPipeline? pipeline);
IEnumerable<TPipeline> ToIEnumerable();
TPipeline Get(string pipelineName);
}
}
4 changes: 2 additions & 2 deletions src/FluentMediator/Pipelines/Pipeline/IMediator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ namespace FluentMediator.Pipelines.Pipeline
{
public interface IMediator
{
void Publish(object request);
TResult Send<TResult>(object request);
void Publish(object request, string? pipelineName = null);
TResult Send<TResult>(object request, string? pipelineName = null);
}
}
1 change: 1 addition & 0 deletions src/FluentMediator/Pipelines/Pipeline/IPipelineProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ namespace FluentMediator.Pipelines.Pipeline
public interface IPipelineProvider
{
IPipeline GetPipeline(Type requestType);
IPipeline GetPipeline(string pipelineName);
}
}
4 changes: 2 additions & 2 deletions src/FluentMediator/Pipelines/PipelineAsync/IMediator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ namespace FluentMediator.Pipelines.PipelineAsync
{
public interface IMediator
{
Task PublishAsync(object request);
Task<TResult> SendAsync<TResult>(object request);
Task PublishAsync(object request, string? pipelineName = null);
Task<TResult> SendAsync<TResult>(object request, string? pipelineName = null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ namespace FluentMediator.Pipelines.PipelineAsync
public interface IPipelineProvider
{
IPipelineAsync GetAsyncPipeline(Type requestType);
IPipelineAsync GetAsyncPipeline(string pipelineName);
}
}
41 changes: 20 additions & 21 deletions src/FluentMediator/Pipelines/PipelineCollection.cs
Original file line number Diff line number Diff line change
@@ -1,55 +1,54 @@
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;

namespace FluentMediator.Pipelines
{
internal sealed class PipelineCollection<TPipeline> : IPipelineCollection<TPipeline>
where TPipeline : class, ITypedPipeline
where TPipeline : class, ITypedPipeline, INamedPipeline
{
private readonly IDictionary<Type, TPipeline> _pipelines;
private readonly IDictionary<Type, TPipeline> _typedPipelines;
private readonly IDictionary<string, TPipeline> _namedPipelines;

public PipelineCollection()
{
_pipelines = new Dictionary<Type, TPipeline>();
_typedPipelines = new Dictionary<Type, TPipeline>();
_namedPipelines = new Dictionary<string, TPipeline>();
}

public void Add(TPipeline pipeline)
{
if (_pipelines.ContainsKey(pipeline.RequestType))
if (pipeline.Name is string)
{
throw new PipelineAlreadyExistsException($"A pipeline for `{ pipeline.RequestType }` already exists.");
_namedPipelines.Add(pipeline.Name, pipeline);
}
else
{
if (_typedPipelines.ContainsKey(pipeline.RequestType))
{
throw new PipelineAlreadyExistsException($"A pipeline for `{ pipeline.RequestType }` already exists.");
}
_typedPipelines.Add(pipeline.RequestType, pipeline);
}

_pipelines.Add(pipeline.RequestType, pipeline);
}

public TPipeline Get(Type requestType)
{
if (_pipelines.TryGetValue(requestType, out var pipeline))
if (_typedPipelines.TryGetValue(requestType, out var pipeline))
{
return pipeline;
}

throw new PipelineNotFoundException($"There is no pipeline configured for `{ requestType.GetType() }`.");
}

public bool Contains(Type requestType, out TPipeline? pipeline)
public TPipeline Get(string pipelineName)
{
if (!_pipelines.ContainsKey(requestType))
if (_namedPipelines.TryGetValue(pipelineName, out var pipeline))
{
pipeline = default;
return false;
return pipeline;
}

pipeline = _pipelines[requestType];
return true;
}

public IEnumerable<TPipeline> ToIEnumerable()
{
return new ReadOnlyCollection<TPipeline>(_pipelines.Values.ToList());
throw new PipelineNotFoundException($"There is no pipeline configured for `{ pipelineName }`.");
}
}
}
85 changes: 85 additions & 0 deletions test/UnitTests/SendingRequestTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,91 @@ public void Send_Returns_Response()
Assert.NotNull(response);
}

[Fact]
public void Send_Named_PipelineReturns_Response()
{
var services = new ServiceCollection();
services.AddFluentMediator(m =>
{
m.On<PingRequest>()
.Pipeline()
.Return<PingResponse, IPingHandler>(
(handler, req) => handler.MyCustomFooMethod(req)
);
m.On<PingRequest>()
.Pipeline("Foo")
.Return<PingResponse, IPingHandler>(
(handler, req) => handler.MyCustomFooMethod(req)
);
});

services.AddScoped<IPingHandler, PingHandler>();
var provider = services.BuildServiceProvider();
var mediator = provider.GetRequiredService<IMediator>();

var ping = new PingRequest("Ping");
var response = mediator.Send<PingResponse>(ping, "Foo");

Assert.NotNull(response);
}

[Fact]
public async Task Send_Named_PipelineAsync_Returns_Response()
{
var services = new ServiceCollection();
services.AddFluentMediator(m =>
{
m.On<PingRequest>()
.PipelineAsync()
.Return<PingResponse, IPingHandler>(
(handler, req) => handler.MyCustomFooBarAsync(req)
);
m.On<PingRequest>()
.PipelineAsync("Foo")
.Return<PingResponse, IPingHandler>(
(handler, req) => handler.MyCustomFooBarAsync(req)
);
});

services.AddScoped<IPingHandler, PingHandler>();
var provider = services.BuildServiceProvider();
var mediator = provider.GetRequiredService<IMediator>();

var ping = new PingRequest("Ping");
var response = await mediator.SendAsync<PingResponse>(ping, "Foo");

Assert.NotNull(response);
}

[Fact]
public async Task SendCancellable_Named_PipelineReturns_Response()
{
var services = new ServiceCollection();
services.AddFluentMediator(m =>
{
m.On<PingRequest>()
.CancellablePipelineAsync()
.Return<PingResponse, IPingHandler>(
(handler, req, ct) => handler.MyCancellableForAsync(req, ct)
);
m.On<PingRequest>()
.CancellablePipelineAsync("Foo")
.Return<PingResponse, IPingHandler>(
(handler, req, ct) => handler.MyCancellableForAsync(req, ct)
);
});

services.AddScoped<IPingHandler, PingHandler>();
var provider = services.BuildServiceProvider();
var mediator = provider.GetRequiredService<IMediator>();
var cts = new CancellationTokenSource();

var ping = new PingRequest("Ping");
var response = await mediator.SendAsync<PingResponse>(ping, cts.Token, "Foo");

Assert.NotNull(response);
}

[Fact]
public void Send_Not_Configured_Throws_PipelineNotFoundException()
{
Expand Down

0 comments on commit f02de7f

Please sign in to comment.