Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generic pipeline #115

Merged
merged 6 commits into from Dec 19, 2016
Merged

Generic pipeline #115

merged 6 commits into from Dec 19, 2016

Conversation

jbogard
Copy link
Owner

@jbogard jbogard commented Dec 12, 2016

@khellang this doesn't actually work in StructureMap, those tests fail. I think this might introduce more complexity. But maybe we just say we only support open generics.

@jbogard
Copy link
Owner Author

jbogard commented Dec 12, 2016

Looks like open generics work just fine, but you'll need to use constraints for individuals. No idea if this is supported in other containers, @khellang can Scrutor handle an enumerable of open generics?

@jdaigle
Copy link

jdaigle commented Dec 12, 2016

@jbogard it works in Autofac (with an almost identical registration):

builder.RegisterGeneric(typeof(CommandHandlerPipeline<,>))
       .As(typeof(IPipelineBehavior<,>));

It also supports mixing both open and closed generic pipelines. Disappointing that it does not work in StructureMap.

@jbogard
Copy link
Owner Author

jbogard commented Dec 12, 2016

@jdaigle meh, it would be really weird if your pipeline varied that much. Checking our existing pipelines, we don't vary them per message. Only things like request mutators, but those aren't a pipeline step in and of themselves. I do like putting in some stock pipeline behaviors though.


return result;
return pipeline.Result;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deadlock?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deadlock? I just want to block until the result is returned.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't block this way, sooner or later will cause a deadlock. I'd use Task.Run(() => ...).Result instead

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sigh. Blarg. Another nail in the coffin for a sync API. I don't mind sync handlers, but it's a lot nicer to have the Send be async-only.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the boundary between sync and async still sucks, for years now and couting

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep. Async over sync is fiiine. Sync over async, not so much 😢 Hence my comment 😉


var pipeline = GetPipeline(request, invokeHandler);

pipeline.Wait();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deadlock?

private Task<TResponse> GetPipeline<TResponse>(object request, RequestHandlerDelegate<TResponse> invokeHandler)
{
var requestType = request.GetType();
var method = CreatePipelineMethod.MakeGenericMethod(requestType, typeof(TResponse));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cache?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure any cache matters. With many containers (including the ASP.NET Core one), you have to register the Mediator class as scoped (see https://github.com/jbogard/MediatR.Extensions.Microsoft.DependencyInjection/blob/master/src/MediatR.Extensions.Microsoft.DependencyInjection/ServiceCollectionExtensions.cs#L188). So tbqh I'd be up for removing all the caches, it's just cognitive overhead.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not about caching instances. It's about caching the generic types and methods. Those caches should be static, so it doesn't matter how the mediator is registered 😄

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd keep cache and make it static indeed

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My first thought was to pass a state machine into the pipeline which would be executed on every step. This would work and flip all the closed-over instances into the state machine which is passed as an input argument. Downside is that this call chain pollutes the stack trace quite a bit.

If you really want to do this right you have to do some code gen with expression trees.
Had a call with Daniel (Marbach) this monday where we concluded this. As he has created something very similar for NServiceBus.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem with a compiled expression-based pipeline is that you lose the ability to inject services into your decorators/handlers' ctors. You have to know, at compile-time, what arguments the pipeline takes. That's why you typically end up with a PipelineContext, which essentially is a bag of objects, like the OWIN dictionary, that's used to flow stuff through the pipeline.

My question is; are we willing to drop the nice dependency injection in decorators/handlers for this? Has anyone done any benchmarking to see if this is needed? I don't think there's been a heap of complaints on MediatR perf to date. Has this changed?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not accurate at all, as what NServiceBus does can very easily be adapted to using a different instance every time, neatly supplied via the input arguments. This way you remove the state machine stack trace overhead but you still have all the freedom to input new instances on every call.

It is exactly that problem what I have been trying to solve in a performant manner, and it's very much possible.

The problem here is that you want to create a dynamic call chain next -> next -> next of length N, only way to do that is either code gen or allocate func's.

Problem with func is that you have to close over your current (say request scoped) step instance, which in effect makes the full pipeline 'single use'.

The option you have to fix this is pulling this instance out of the func and let the func execute a method on its input argument. For instance stateMachine.DoNext(next); which would then internally advance one step and do correct async exception bubbling etc. I've seen an example of this and it's not pretty, lot's of work to get it right and in the end you still have a polluted stack trace.

Another option could be to supply a ref int index (only possible with a delegate) and an array of pipeline steps, so each func can pick the step at that index, increment the index and run the step. However then you need to have a way to carry the ref index with you when you call the step (as it has to call next) and ref params can be finicky.

When you look at all that work you could conclude, let's just code gen these delegate body's with the index number burned into them (pseudo code):

public Task<TResponse> DelegateOne(IPipelineBehavior[] behaviors, request, next){
    return behaviors[0].Handle(request, next);
}

public Task<TResponse> DelegateTwo(IPipelineBehavior[] behaviors, request, next){
    return behaviors[1].Handle(request, next);
}

etc

Just cache these individually, then cache pipelines based on step count and you are good to go.

@jbogard
Copy link
Owner Author

jbogard commented Dec 14, 2016 via email

@jbogard jbogard added this to the 3.0 milestone Dec 19, 2016
@jbogard jbogard merged commit 852413f into master Dec 19, 2016
@jbogard jbogard deleted the GenericPipeline branch December 19, 2016 15:17
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants