New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Generic pipeline #115
Generic pipeline #115
Changes from all commits
5a32298
0bcde8f
09d3ec9
66f355c
5b5e82c
9f76b1d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
namespace MediatR | ||
{ | ||
using System; | ||
using System.Threading.Tasks; | ||
|
||
public delegate Task<TResponse> RequestHandlerDelegate<TResponse>(); | ||
|
||
public interface IPipelineBehavior<in TRequest, TResponse> | ||
{ | ||
Task<TResponse> Handle(TRequest request, RequestHandlerDelegate<TResponse> next); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,6 +6,7 @@ namespace MediatR | |
using System.Collections.Generic; | ||
using System.Collections.Concurrent; | ||
using System.Linq; | ||
using System.Reflection; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
|
||
|
@@ -19,6 +20,7 @@ public class Mediator : IMediator | |
|
||
private readonly ConcurrentDictionary<Type, ConcurrentDictionary<Type, Type>> _genericHandlerCache; | ||
private readonly ConcurrentDictionary<Type, ConcurrentDictionary<Type, Type>> _wrapperHandlerCache; | ||
private static readonly MethodInfo CreatePipelineMethod = typeof(Mediator).GetTypeInfo().DeclaredMethods.Single(m => m.Name == nameof(CreatePipeline)); | ||
|
||
/// <summary> | ||
/// Initializes a new instance of the <see cref="Mediator"/> class. | ||
|
@@ -37,48 +39,76 @@ public TResponse Send<TResponse>(IRequest<TResponse> request) | |
{ | ||
var defaultHandler = GetHandler(request); | ||
|
||
var result = defaultHandler.Handle(request); | ||
var pipeline = GetPipeline(request, () => Task.FromResult(defaultHandler.Handle(request))); | ||
|
||
return result; | ||
return pipeline.Result; | ||
} | ||
|
||
public void Send(IRequest request) | ||
{ | ||
var handler = GetHandler(request); | ||
|
||
handler.Handle(request); | ||
RequestHandlerDelegate<Unit> invokeHandler = () => | ||
{ | ||
handler.Handle(request); | ||
return Task.FromResult(Unit.Value); | ||
}; | ||
|
||
var pipeline = GetPipeline(request, invokeHandler); | ||
|
||
pipeline.Wait(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Deadlock? |
||
} | ||
|
||
public Task<TResponse> SendAsync<TResponse>(IAsyncRequest<TResponse> request) | ||
{ | ||
var defaultHandler = GetHandler(request); | ||
|
||
var result = defaultHandler.Handle(request); | ||
RequestHandlerDelegate<TResponse> invokeHandler = () => defaultHandler.Handle(request); | ||
|
||
return result; | ||
var pipeline = GetPipeline(request, invokeHandler); | ||
|
||
return pipeline; | ||
} | ||
|
||
public Task SendAsync(IAsyncRequest request) | ||
{ | ||
var handler = GetHandler(request); | ||
|
||
return handler.Handle(request); | ||
RequestHandlerDelegate<Unit> invokeHandler = async () => | ||
{ | ||
await handler.Handle(request); | ||
return Unit.Value; | ||
}; | ||
|
||
var pipeline = GetPipeline(request, invokeHandler); | ||
|
||
return pipeline; | ||
} | ||
|
||
public Task<TResponse> SendAsync<TResponse>(ICancellableAsyncRequest<TResponse> request, CancellationToken cancellationToken) | ||
{ | ||
var defaultHandler = GetHandler(request); | ||
|
||
var result = defaultHandler.Handle(request, cancellationToken); | ||
RequestHandlerDelegate<TResponse> invokeHandler = () => defaultHandler.Handle(request, cancellationToken); | ||
|
||
var pipeline = GetPipeline(request, invokeHandler); | ||
|
||
return result; | ||
return pipeline; | ||
} | ||
|
||
public Task SendAsync(ICancellableAsyncRequest request, CancellationToken cancellationToken) | ||
{ | ||
var handler = GetHandler(request); | ||
|
||
return handler.Handle(request, cancellationToken); | ||
RequestHandlerDelegate<Unit> invokeHandler = async () => | ||
{ | ||
await handler.Handle(request, cancellationToken); | ||
return Unit.Value; | ||
}; | ||
|
||
var pipeline = GetPipeline(request, invokeHandler); | ||
|
||
return pipeline; | ||
} | ||
|
||
public void Publish(INotification notification) | ||
|
@@ -226,6 +256,25 @@ private IEnumerable<TWrapper> GetNotificationHandlers<TWrapper>(object notificat | |
.ToList(); | ||
} | ||
|
||
private Task<TResponse> GetPipeline<TResponse>(object request, RequestHandlerDelegate<TResponse> invokeHandler) | ||
{ | ||
var requestType = request.GetType(); | ||
var method = CreatePipelineMethod.MakeGenericMethod(requestType, typeof(TResponse)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Cache? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure any cache matters. With many containers (including the ASP.NET Core one), you have to register the Mediator class as scoped (see https://github.com/jbogard/MediatR.Extensions.Microsoft.DependencyInjection/blob/master/src/MediatR.Extensions.Microsoft.DependencyInjection/ServiceCollectionExtensions.cs#L188). So tbqh I'd be up for removing all the caches, it's just cognitive overhead. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's not about caching instances. It's about caching the generic types and methods. Those caches should be static, so it doesn't matter how the mediator is registered 😄 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd keep cache and make it static indeed There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My first thought was to pass a state machine into the pipeline which would be executed on every step. This would work and flip all the closed-over instances into the state machine which is passed as an input argument. Downside is that this call chain pollutes the stack trace quite a bit. If you really want to do this right you have to do some code gen with expression trees. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The problem with a compiled expression-based pipeline is that you lose the ability to inject services into your decorators/handlers' ctors. You have to know, at compile-time, what arguments the pipeline takes. That's why you typically end up with a My question is; are we willing to drop the nice dependency injection in decorators/handlers for this? Has anyone done any benchmarking to see if this is needed? I don't think there's been a heap of complaints on MediatR perf to date. Has this changed? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's not accurate at all, as what NServiceBus does can very easily be adapted to using a different instance every time, neatly supplied via the input arguments. This way you remove the state machine stack trace overhead but you still have all the freedom to input new instances on every call. It is exactly that problem what I have been trying to solve in a performant manner, and it's very much possible. The problem here is that you want to create a dynamic call chain Problem with func is that you have to close over your current (say request scoped) step instance, which in effect makes the full pipeline 'single use'. The option you have to fix this is pulling this instance out of the func and let the func execute a method on its input argument. For instance Another option could be to supply a When you look at all that work you could conclude, let's just code gen these delegate body's with the index number burned into them (pseudo code): public Task<TResponse> DelegateOne(IPipelineBehavior[] behaviors, request, next){
return behaviors[0].Handle(request, next);
}
public Task<TResponse> DelegateTwo(IPipelineBehavior[] behaviors, request, next){
return behaviors[1].Handle(request, next);
}
etc Just cache these individually, then cache pipelines based on step count and you are good to go. |
||
return (Task<TResponse>) method.Invoke(this, new[] { request, invokeHandler}); | ||
} | ||
|
||
private Task<TResponse> CreatePipeline<TRequest, TResponse>(TRequest request, RequestHandlerDelegate<TResponse> invokeHandler) | ||
{ | ||
var behaviors = _multiInstanceFactory(typeof(IPipelineBehavior<TRequest, TResponse>)) | ||
.Cast<IPipelineBehavior<TRequest, TResponse>>() | ||
.Reverse(); | ||
|
||
var aggregate = behaviors.Aggregate(invokeHandler, (next, pipeline) => () => pipeline.Handle(request, next)); | ||
|
||
return aggregate(); | ||
} | ||
|
||
|
||
private IEnumerable<object> GetNotificationHandlers(object notification, Type handlerType) | ||
{ | ||
try | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
namespace MediatR.Pipeline | ||
{ | ||
using System.Threading.Tasks; | ||
|
||
public interface IRequestPostProcessor<in TRequest, in TResponse> | ||
{ | ||
Task Process(TRequest request, TResponse response); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
namespace MediatR.Pipeline | ||
{ | ||
using System.Threading.Tasks; | ||
|
||
public interface IRequestPreProcessor<in TRequest> | ||
{ | ||
Task Process(TRequest request); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
namespace MediatR.Pipeline | ||
{ | ||
using System.Collections.Generic; | ||
using System.Linq; | ||
using System.Threading.Tasks; | ||
|
||
public class RequestPostProcessorBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse> | ||
{ | ||
private readonly IEnumerable<IRequestPostProcessor<TRequest, TResponse>> _postProcessors; | ||
|
||
public RequestPostProcessorBehavior(IEnumerable<IRequestPostProcessor<TRequest, TResponse>> postProcessors) | ||
{ | ||
_postProcessors = postProcessors; | ||
} | ||
|
||
public async Task<TResponse> Handle(TRequest request, RequestHandlerDelegate<TResponse> next) | ||
{ | ||
var response = await next(); | ||
|
||
await Task.WhenAll(_postProcessors.Select(p => p.Process(request, response))); | ||
|
||
return response; | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
namespace MediatR.Pipeline | ||
{ | ||
using System.Collections.Generic; | ||
using System.Linq; | ||
using System.Threading.Tasks; | ||
|
||
public class RequestPreProcessorBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse> | ||
{ | ||
private readonly IEnumerable<IRequestPreProcessor<TRequest>> _preProcessors; | ||
|
||
public RequestPreProcessorBehavior(IEnumerable<IRequestPreProcessor<TRequest>> preProcessors) | ||
{ | ||
_preProcessors = preProcessors; | ||
} | ||
|
||
public async Task<TResponse> Handle(TRequest request, RequestHandlerDelegate<TResponse> next) | ||
{ | ||
await Task.WhenAll(_preProcessors.Select(p => p.Process(request))); | ||
|
||
return await next(); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,6 @@ | ||
namespace MediatR.Tests | ||
{ | ||
using System; | ||
using System.Threading.Tasks; | ||
using Shouldly; | ||
using StructureMap; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
namespace MediatR.Tests.Pipeline | ||
{ | ||
using System.Threading.Tasks; | ||
using MediatR.Pipeline; | ||
using Shouldly; | ||
using StructureMap; | ||
using Xunit; | ||
|
||
public class RequestPostProcessorTests | ||
{ | ||
public class Ping : IAsyncRequest<Pong> | ||
{ | ||
public string Message { get; set; } | ||
} | ||
|
||
public class Pong | ||
{ | ||
public string Message { get; set; } | ||
} | ||
|
||
public class PingHandler : IAsyncRequestHandler<Ping, Pong> | ||
{ | ||
public Task<Pong> Handle(Ping message) | ||
{ | ||
return Task.FromResult(new Pong { Message = message.Message + " Pong" }); | ||
} | ||
} | ||
|
||
public class PingPongPostProcessor : IRequestPostProcessor<Ping, Pong> | ||
{ | ||
public Task Process(Ping request, Pong response) | ||
{ | ||
response.Message = response.Message + " " + request.Message; | ||
|
||
return Task.FromResult(0); | ||
} | ||
} | ||
|
||
[Fact] | ||
public async Task Should_run_postprocessors() | ||
{ | ||
var container = new Container(cfg => | ||
{ | ||
cfg.Scan(scanner => | ||
{ | ||
scanner.AssemblyContainingType(typeof(AsyncPublishTests)); | ||
scanner.IncludeNamespaceContainingType<Ping>(); | ||
scanner.WithDefaultConventions(); | ||
scanner.AddAllTypesOf(typeof(IAsyncRequestHandler<,>)); | ||
scanner.AddAllTypesOf(typeof(IRequestPostProcessor<,>)); | ||
}); | ||
cfg.For(typeof(IPipelineBehavior<,>)).Add(typeof(RequestPostProcessorBehavior<,>)); | ||
cfg.For<SingleInstanceFactory>().Use<SingleInstanceFactory>(ctx => t => ctx.GetInstance(t)); | ||
cfg.For<MultiInstanceFactory>().Use<MultiInstanceFactory>(ctx => t => ctx.GetAllInstances(t)); | ||
cfg.For<IMediator>().Use<Mediator>(); | ||
}); | ||
|
||
var mediator = container.GetInstance<IMediator>(); | ||
|
||
var response = await mediator.SendAsync(new Ping { Message = "Ping" }); | ||
|
||
response.Message.ShouldBe("Ping Pong Ping"); | ||
} | ||
|
||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Deadlock?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Deadlock? I just want to block until the result is returned.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
http://blog.stephencleary.com/2012/07/dont-block-on-async-code.html
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't block this way, sooner or later will cause a deadlock. I'd use
Task.Run(() => ...).Result
insteadThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sigh. Blarg. Another nail in the coffin for a sync API. I don't mind sync handlers, but it's a lot nicer to have the Send be async-only.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, the boundary between sync and async still sucks, for years now and couting
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep. Async over sync is fiiine. Sync over async, not so much 😢 Hence my comment 😉