Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Messenger] Adding support for record messages #27844

Closed
wants to merge 1 commit into from

Conversation

Nyholm
Copy link
Member

@Nyholm Nyholm commented Jul 4, 2018

Q A
Branch? master
Bug fix? no
New feature? yes
BC breaks? no
Deprecations? no
Tests pass? yes
Fixed tickets
License MIT
Doc PR symfony/symfony-docs#10015

With inspiration from this blog post, I created a message recorder. This is needed when you handle messages in the same doctrine transaction. Ie, you have a Command and the CommandHandler dispatches Events. Those events should be handled in a new doctrine transaction.

Scenario

If the command handler creates a User with uuid X and dispatches UserCreated event. If they are handled in the same transaction then this code will find nothing:

$this->em->getRepository(User::class)->findOneBy(['uuid'=>X]);

Example configuration:

framework:
    messenger:
        default_bus: messenger.bus.command
        buses:
            messenger.bus.command:
                middleware:
                    # Make sure recorded events are handled by the event bus. You may also configure the message recorder with a second parameter. 
                    - app.messenger.middleware.record_messages
                    - doctrine_transaction_middleware: ['default']
        # -- more buses

services: 
    app.messenger.middleware.record_messages:
        parent: messenger.middleware.record_messages
        arguments: ['@messenger.bus.event']

@@ -296,6 +296,11 @@ private function registerBusMiddleware(ContainerBuilder $container, string $busI
$childDefinition = new ChildDefinition($messengerMiddlewareId);
$count = \count($definition->getArguments());
foreach (array_values($arguments ?? array()) as $key => $argument) {
// Convert argument to references if needed.
if (is_string($argument) && $argument[0] === '@') {
$argument = new Reference(substr($argument, 1));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ping @ogizanagi, This is the best way I found to support references. Im not sure if this is needed if you configure framework.messenger with XML.

@@ -296,6 +296,11 @@ private function registerBusMiddleware(ContainerBuilder $container, string $busI
$childDefinition = new ChildDefinition($messengerMiddlewareId);
$count = \count($definition->getArguments());
foreach (array_values($arguments ?? array()) as $key => $argument) {
// Convert argument to references if needed.
if (is_string($argument) && '@' === $argument[0]) {
$argument = new Reference(substr($argument, 1));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ping @ogizanagi, This is the best way I found to support references. Im not sure if this is needed if you configure framework.messenger with XML.

Or should I go the long way of creating a Factory that uses @container as a dependency?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea of factories weren't to replace DI config when more suitable, only to solve most simple needs for configurable middlewares. Therefore, I don't think we should add services references support.
If really desired, we could indeed use dedicated service locators or perhaps let the user wiring it itself by creating the middleware service using DI config for now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm. I tried using a Factory that depends on @service_container. That does not work since the buses are private.

I also tried injecting all services tagged with message_bus into the factory. That did not work because of circular dependencies.

Circular reference detected for service "debug.traced.messenger.bus.command", path: "debug.traced.messenger.bus.command -
debug.traced.messenger.bus.command.inner -> messenger.bus.command.middleware.messenger.middleware.handles_recorded_mess
ages -> messenger.middleware.handles_recorded_messages.factory -> debug.traced.messenger.bus.command".

Im saying that I do not know a better (working) solution that this. Im sorry.

Copy link
Member

@ogizanagi ogizanagi Jul 5, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Injecting the Symfony service container now belongs to another era 😃
We should register dedicated service locators instead. This would happen in the MessengerPass, using ServiceLocatorTagPass::register().

But as I said, we could let the user create the concrete instance of this middleware in userland for now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. I've updated the configuration example

$this->messageRecorder->eraseMessages();

foreach ($recordedMessages as $recordedMessage) {
$this->messageBus->dispatch($recordedMessage);
Copy link
Member

@ogizanagi ogizanagi Jul 4, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if a recorded message recorded another one?
Should we clear any recorded message before middleware execution, so it's obvious it's not supported & undesired by design, and no left over message is dispatched when reusing the bus to handle another message? (am I clear? 😅)

Also should an exception while dispatching a recorded message really interrupt everything?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting.

I think I agree with all your questions. I'll try to make some changes to address them.

*
* @return object[]
*/
public function recordedMessages(): array;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not naming this function fetch() instead of recordedMessages(), as you already name below the method record() (without messages at the end of the name) ?

Same thing for eraseMessages().

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I took the names from SimpleBus. But you are correct, your naming seams easier

try {
$this->messageBus->dispatch($recordedMessage);
} catch (\Throwable $exception) {
$exceptions[] = $exception;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that if one handler to message A throws an exception, then other handlers may not run (depending on the order).

This will just make sure that handlers to message B will run even though a handler to A throws an exception.

Copy link
Member

@kbond kbond Jul 5, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This slightly ties into what I proposed in #27215. I think all handlers should be run even if some fail. Also, I think catching all the exceptions should be done in a middleware on the event bus.

/**
* {@inheritdoc}
*/
public function erase(): void
Copy link
Member

@ogizanagi ogizanagi Jul 5, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reset? Would be a legit use-case for the ResettableInterface too I guess (for now using the kernel.reset tag manually until #27093 is approved and merged)

}

$exceptions = array();
while(!empty($recordedMessages = $this->messageRecorder->fetch())) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if rather than while?

public static function create(array $exceptions): self
{
$message = sprintf(
"One or more handlers for reordered messages threw an exception. Their messages were: \n\n%s",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/reordered/recorded

{
return $this->exceptions;
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing new line

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

}

if (!empty($exceptions)) {
throw MessageRecorderHandlerException::create($exceptions);
Copy link
Member

@ogizanagi ogizanagi Jul 5, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't use named construct for exceptions in core ^^

Wild thoughts:
Should it really throw? Make it configurable? Logged as error? Should the MessageRecorder store the exceptions itself?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm.. I think we should throw.
But making it configurable sounds like a reasonable future update.

@@ -34,6 +34,11 @@
<argument type="service" id="validator" />
</service>

<service id="messenger.middleware.handles_recorded_messages" class="Symfony\Component\Messenger\Middleware\HandlesRecordedMessagesMiddleware" abstract="true">
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at the names of other middlewares, this one maybe should be called messenger.middleware.record_messages

public function fetch(): array;

/**
* Erase messages that were recorded since the last call to eraseMessages().

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As your function is called reset(), I would have say:

Reset messages that were recorded since the last call to eraseMessages().

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reset messages that were recorded since the last call to reset().

😄

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can actually extend the ResettableInterface from symfony/contracts now.

*
* @param object $message
*/
public function record($message);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add : void as function signature (because of interface).

Copy link
Contributor

@sroze sroze Jul 20, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see any value in adding : void tbh. It might be an issue if some recorders want to return something? (which might be needed in some case?)

sroze
sroze previously requested changes Jul 20, 2018
Copy link
Contributor

@sroze sroze left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this pull-request, there is nothing that actually records the messages, right? 🤔

As far as I understand, the problem you are trying to solve is to wrap messages into some "transaction", so that dispatched message will be handled after the "current" one, right? If I understand it correctly then I believe we should work on the naming to clarify this and we can use the term "transactions", it isn't necessarily database-only.

public function fetch(): array;

/**
* Erase messages that were recorded since the last call to eraseMessages().
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can actually extend the ResettableInterface from symfony/contracts now.

*
* @param object $message
*/
public function record($message);
Copy link
Contributor

@sroze sroze Jul 20, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see any value in adding : void tbh. It might be an issue if some recorders want to return something? (which might be needed in some case?)

namespace Symfony\Component\Messenger\Exception;

/**
* When handling recorded messaged one or more handlers caused an exception.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/one/on

public function __construct(array $exceptions)
{
$message = sprintf(
"One or more handlers for recorded messages threw an exception. Their messages were: \n\n%s",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is only one exception, let's not confuse the user and display it without wrapping it to "one or more".

*
* @author Tobias Nyholm <tobias.nyholm@gmail.com>
*/
class MessageRecorderHandlerException extends \RuntimeException implements ExceptionInterface
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MessageHandlingException? This has nothing to do with the "recorder" AFAIK


$exceptions = array();
while (!empty($recordedMessages = $this->messageRecorder->fetch())) {
$this->messageRecorder->erase();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the use-case is always fetch and erase... I'm pretty sure we can find better than fetch + erase/reset.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Im not sure what you mean here.

public function handle($message, callable $next)
{
// Make sure the recorder is empty before we begin
$this->messageRecorder->erase();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would you? 🤔

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To avoid any side effects. See #27844 (comment)

@Nyholm
Copy link
Member Author

Nyholm commented Jul 23, 2018

In this pull-request, there is nothing that actually records the messages, right? 🤔

True. "record" is a word from SimpleBus.

As far as I understand, the problem you are trying to solve is to wrap messages into some "transaction", so that dispatched message will be handled after the "current" one, right? If I understand it correctly then I believe we should work on the naming to clarify this and we can use the term "transactions", it isn't necessarily database-only.

Correct. I want to wrap my commands in one transaction and handle my events later. So when used with doctrine transaction middleware, if my event fails, it should not roll back the command.

@gubler
Copy link

gubler commented Aug 21, 2018

I implemented this pull request in an application I'm working on after it was pointed out to me by @ogizanagi in Slack and @sroze asked me to write up a use case.

I am working on App with a Command Bus with doctrine transactions and had an Event Bus. When injecting the Event Bus directly to the Command Handler, any error in the doctrine transaction caused a rollback as it was supposed to, but the Events were still dispatched to the Event Bus.

I copied the contents of this pull request into my app's namespace - just changing the namespaces for everything and it worked flawlessly. I did have to add the reset method to the MessageRecorderInterface because I am on Symfony 4.1 and not using symfony/contracts.

Now I inject the message recorder and if my transaction fails, the event is not dispatched. Here is an example Command Handler:

namespace App\Domain\CommandHandler\Facility;

use App\Domain\Command\Facility\NewFacilityCommand;
use App\Domain\Event\Facility\FacilityCreatedEvent;
use App\Lib\Messenger\MessageRecorder\MessageRecorderInterface;
use App\Repository\FacilityRepository;

/**
 * Class NewFacilityHandler.
 */
class NewFacilityHandler
{
    /** @var FacilityRepository */
    protected $repository;
    /** @var MessageRecorderInterface */
    protected $recorder;

    /**
     * @param FacilityRepository       $repository
     * @param MessageRecorderInterface $recorder
     */
    public function __construct(FacilityRepository $repository, MessageRecorderInterface $recorder)
    {
        $this->repository = $repository;
        $this->recorder = $recorder;
    }

    /**
     * @param NewFacilityCommand $command
     */
    public function __invoke(NewFacilityCommand $command)
    {
        $facility = $this->repository->persist($command);

        $this->recorder->record(
            new FacilityCreatedEvent(
                $facility,
                $command->generator
            )
        );
    }
}

I kept all the names the same so when this is (hopefully) merged and added to a tagged release, I can update all of the use statements and my services.yaml file and everything should continue to work the same.

I did have one problem until I read the documentation PR that mentioned that the message recorder middleware needed to be declared before the transaction middleware, but once I fixed that, it worked fine.

Here is my messages.yaml if it helps:

framework:
    messenger:
        default_bus: messenger.bus.commands

        buses:
            messenger.bus.commands:
                middleware:
                    - validation
                    - record_messages
                    - doctrine_transaction_middleware
                    - 'App\Domain\BusMiddleware\SecurityMiddleware'
            messenger.bus.events:
                middleware:
                    - validation
                    - allow_no_handler
                    - doctrine_transaction_middleware
                    - 'App\Domain\BusMiddleware\SecurityMiddleware'
                    - 'App\Domain\EventBusMiddleware\FlashMessageMiddleware'
            messenger.bus.queries:
                middleware:
                    - validation
                    - doctrine_transaction_middleware
                    - 'App\Domain\BusMiddleware\SecurityMiddleware'

and the relevant parts of my services.yaml. I had to add the method argument to the tags for message.recorder - I assume this is bacause I am using Symfony 4.1 and not using symfony/contracts.

 # doctrine middleware for command bus
    doctrine.orm.messenger.middleware_factory.transaction:
        class: Symfony\Bridge\Doctrine\Messenger\DoctrineTransactionMiddlewareFactory
        arguments: ['@doctrine']

    messenger.middleware.doctrine_transaction_middleware:
        class: Symfony\Bridge\Doctrine\Messenger\DoctrineTransactionMiddleware
        factory: ['@doctrine.orm.messenger.middleware_factory.transaction', 'createMiddleware']
        abstract: true
        arguments: ['default']

    messenger.middleware.record_messages:
        class: App\Domain\BusMiddleware\HandlesRecordedMessagesMiddleware

    messenger.recorder:
        class: App\Lib\Messenger\MessageRecorder\MessageRecorder
        tags:
            - name: 'kernel.reset'
              method: 'reset'

    App\Lib\Messenger\MessageRecorder\MessageRecorderInterface:
        alias: "messenger.recorder"

$this->messageRecorder->reset();

try {
$next($message);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to capture the results as well to return them.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are correct. Thank you. I've updated the PR

* @author Tobias Nyholm <tobias.nyholm@gmail.com>
* @author Matthias Noback <matthiasnoback@gmail.com>
*/
interface MessageRecorderInterface extends ResetInterface

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SimpleBus split this interface into 2 small ones. The first one ContainsRecordedMessages is implement by our models because you need to tell them to give us the events raise during the transaction. The second one RecordsMessages is used to record event at this end of the transaction. For instance, I use it in my repository when my aggregate is flushed. I guess this two distinct responsibilities: I don't want that my model have a public method record.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, I prefer the naming of simple bus (declarative vs imperative programming). It is better to describer what happens instead how it happens :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about slitting them. But I was not sure. Thank you for commenting!

*
* @return object[]
*/
public function fetch(): array;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This name seems weird to me. Like I said previously my model will implement this interface. For example, a product model. My product will have Product::fetch() and Product::reset(). In term of business that does not have any sense. I think we should use eraseMessage() and recordedMessage() it more explicit. I think it is a good compromise between technical and business stuff.

parent::__construct($message);
}

public function getExceptions(): array

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

toArray would be better.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, Im not sure

}

if (!empty($exceptions)) {
if (1 === count($exceptions)) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you do that? I think it will be better to throw a single type of exception. It will be easier to handle it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The MessageHandlingException is only for grouped exceptions. If the group only have one element, there is no need for a group.

I think an earlier review suggested it to be this way.

@@ -34,6 +34,11 @@
<argument type="service" id="validator" />
</service>

<service id="messenger.middleware.record_messages" class="Symfony\Component\Messenger\Middleware\HandlesRecordedMessagesMiddleware" abstract="true">
<argument type="service" id="message_bus" />

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

message_bus? Will you dispatch the events into the same bus than the command?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be overwritten by your configuration. It is just abstract.

@Nyholm
Copy link
Member Author

Nyholm commented Sep 9, 2018

Thank you for the reviews. I've made some updates.

@chalasr
Copy link
Member

chalasr commented Sep 9, 2018

Could you add some tests? Something like https://github.com/symfony/symfony/blob/master/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php#L615 would help understanding the wiring of this feature for me.

@Koc
Copy link
Contributor

Koc commented Sep 15, 2018

@Nyholm consider another approach, when aggregate root emits events and they are dispatched after transaction commited. See https://beberlei.de/2013/07/24/doctrine_and_domainevents.html for more details.

With current approach we need emit events manually from command handlers.

@Nyholm
Copy link
Member Author

Nyholm commented Sep 15, 2018 via email

@Nyholm Nyholm force-pushed the record-messages branch 2 times, most recently from 57a9089 to f81f82b Compare September 15, 2018 12:06
@Nyholm
Copy link
Member Author

Nyholm commented Sep 15, 2018

I've squashed and rebased. I'm ready for a final review.

@sroze sroze dismissed their stale review September 15, 2018 13:43

Changes made

public function handle($message, callable $next)
{
// Make sure the recorder is empty before we begin
$this->messageRecorder->resetRecordedMessages();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's imagine the following example:

  1. Controller dispatches a FirstMessage message.
  2. FirstMessageHandler gets called and it does the following:
    $recorder->record(new SecondMessage);
    $bus->dispatch(new SaveToSecondaryDatabase);
    $recorder->record(new ThirdMessage);

The dispatch will call the middleware and the just recorded message will be erased, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is correct IF the current bus and $bus is using the same MessageRecorder.

I think I should remove this line. Are you okey with that @ogizanagi? It was added by your suggestion.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be fine removing it, but the case described above is a more complex topic then.
For instance, if there is any issue with FirstMessageHandler after SaveToSecondaryDatabase, SecondMessage
(and possibly those recorded by SaveToSecondaryDatabase) would still have been executed while I would expect it does only if FirstMessageHandler is fully successful.
Actually, to me recorded messages should be treated if and only if the highest dispatch call is successful. Possibly by using an internal flag inside the middleware? (Thus, we could even keep the reset for safety, only when the flag is off).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which is equivalent to the FinishesHandlingMessageBeforeHandlingNext middleware of SimpleBus. (I prefer SimpleBus' naming in here, it's more explicit)

}

$exceptions = array();
while (!empty($recordedMessages = $this->messageRecorder->getRecordedMessages())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reasoning of get + reset instead of pop?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to make sure nobody runs $this->messageRecorder->resetRecordedMessages(); while I'm handling the recorded messages.

@nicolas-grekas
Copy link
Member

(deps=low tests are red, but they should never be to me.)

@Nyholm
Copy link
Member Author

Nyholm commented Sep 17, 2018

I rebased and everything is green.

@weaverryan
Copy link
Member

@Nyholm is the documentation PR more-or-less up-to-date with the most recent changes? I'm want to "wrap my head" around this PR - I was starting by reading your docs :)

@Nyholm
Copy link
Member Author

Nyholm commented Sep 18, 2018

Yes. It’s up to date.

@javiereguiluz
Copy link
Member

@Nyholm is "record" a common/required word in this context? Personally I would always say: "save messages" or "store messages" to refer to something like this but never "record messages".

@Nyholm
Copy link
Member Author

Nyholm commented Sep 22, 2018

That is a fair point. "Record" is used from SimpleBus. As far as I know it is made up by them. Using "record" will make it easier for SimpleBus users to migrate (if they would like to).

Im all for using another word but it has to be more clear than using "record". I think that using "Store" or "Save" would cause for confusion since the component support adding things to a queue or "queue like".

Since I'm not "saving" any messages anywhere (just having them in a temporary memory queue) I don't think it is a good word.

@sroze
Copy link
Contributor

sroze commented Sep 22, 2018

Then, what about this?

$this->bus->dispatch(Envelope::wrap($message)->with(new ToBeDispatchedAfterCurrentMessageHandling))

All we need is a middleware that is doing the "locking" (i.e. tactitian term) or "FinishesHandlingMessageBeforeHandlingNext" (SimpleBus) only if the following envelope item is present.

@Nyholm
Copy link
Member Author

Nyholm commented Sep 22, 2018

With an EventRecorder you can collect events from Doctrine (just as @arnolanglade is doing).

class User {
  use EventRecorderTrait; // To be created in a following PR

  public function setUsername($username) 
  { 
    $this->username = $username;
    $this->record(new UsernameWasUpdated());
  }
  // ...
}

Using the minor modification in my entity as shown above and a DoctrineEventRecorder (Doctrine subscriber) I can make sure that domain events like these eventually ends up on the bus.


You will also have the flexibility to decide which events that should be recorded and which should be handled now.

public function handle(FooCommand $command)
{
    $this->recorder->record(new Bar()); // Handle later
    $this->eventBus->dispatch(new Biz()); // Handle now
}

This is not possible with a middleware like FinishesHandlingMessageBeforeHandlingNext.


I know "EventRecorder" and "record" is a new concept to this component. But it is not a new concept for message busses. SimpleBus has successfully been using this for years.

@sroze
Copy link
Contributor

sroze commented Sep 22, 2018

You will also have the flexibility to decide which events that should be recorded and which should be handled now.

That would exactly be the point of this Envelope item in this comment, right?

Using the minor modification in my entity as shown above and a DoctrineEventRecorder (Doctrine subscriber) I can make sure that domain events like these eventually ends up on the bus.

But you could do the same with the other option which does not introduce this extra notion of "recorder", isn't it?

@Nyholm
Copy link
Member Author

Nyholm commented Sep 22, 2018

No, you can’t. (As far as I know). How and when would you dispatch new messages to the bus?
Sure, you can create a middleware called “DispatchMessagesCollectedByDoctrineSubscriber”. That would possibly achieve the same thing but will less abstraction and flexibility.

I don’t know why we are hesitant of introducing this concept to the MessengerComponent. The argument cannot be DX and/or to keep the component simple, because the alternative solutions proposed are neighter simpler or more intuitive.

Btw, I like that we are challenging this, and I don’t think we should have redundant ways of doing the same thing.

@Koc
Copy link
Contributor

Koc commented Sep 22, 2018

How it possible to use this MessageRecorder for event sourcing and store all triggered events in db in same transaction? But queue them to the bus after transaction is complete.

@Nyholm
Copy link
Member Author

Nyholm commented Sep 22, 2018

This is how you keep them in different transaction:

# config/packages/messenger.yaml
framework:
    messenger:
        default_bus: messenger.bus.command
        buses:
            messenger.bus.command:
                middleware:
                    - messenger.middleware.validation
                    - messenger.middleware.handles_recorded_messages
                      # Doctrine transaction must be after handles_recorded_messages middleware
                    - doctrine_transaction_middleware: ['default']
            messenger.bus.event:
                middleware:
                    - messenger.middleware.allow_no_handler
                    - messenger.middleware.validation

If you change the order fo the middlewares, you can get them in the same middleware:

            messenger.bus.command:
                middleware:
                    - messenger.middleware.validation
                    - doctrine_transaction_middleware: ['default']
                    - messenger.middleware.handles_recorded_messages

If you are talking about events from a CommandHandler you would obviously not use this event recorder but the event bus directly. But if you are referring to the events from your entities, then the solution above it for you.

If you want to use both, just define more services for the same classes.

@Koc
Copy link
Contributor

Koc commented Sep 22, 2018

If you are talking about events from a CommandHandler you would obviously not use this event recorder but the event bus directly.

No, I cann't. Command handing wrapped in transaction and we shouldn't send any events until it commited.

  1. emit events in aggregate root
  2. save aggregate root and emited events in db in same transaction
  3. after transaction commited - send emited events also to bus.

If you want to use both, just define more services for the same classes.

Not sure that I understand what services should be defined twice. Can you elaborate?

@fabpot
Copy link
Member

fabpot commented Mar 3, 2019

Closing in favor of #28849

@fabpot fabpot closed this Mar 3, 2019
sroze added a commit that referenced this pull request Mar 19, 2019
…t bus is finished (Nyholm)

This PR was merged into the 4.3-dev branch.

Discussion
----------

[Messenger] Support for handling messages after current bus is finished

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets |
| License       | MIT
| Doc PR        | symfony/symfony-docs#10015

This is a replacement for #27844. We achieve the same goals without introducing the new concept of "recorder".

```php
class CreateUserHandler
{
    private $em;
    private $eventBus;

    public function __construct(MessageBus $eventBus, EntityManagerInterface $em)
    {
        $this->eventBus = $eventBus;
        $this->em = $em;
    }

    public function __invoke(CreateUser $command)
    {
        $user = new User($command->getUuid(), $command->getName(), $command->getEmail());
        $this->em->persist($user);

        $message = new UserCreatedEvent($command->getUuid();
        $this->eventBus->dispatch((new Envelope($message))->with(new DispatchAfterCurrentBus()));
    }
}
```

Note that this `DispatchAfterCurrentBusMiddleware` is added automatically as the first middleware.

2019-03-13: I updated the PR description.

Commits
-------

903355f Support for handling messages after current bus is finished
@nicolas-grekas nicolas-grekas modified the milestones: next, 4.3 Apr 30, 2019
weaverryan added a commit to symfony/symfony-docs that referenced this pull request May 24, 2019
…rentBusMiddleware (Nyholm, gubler, OskarStark)

This PR was submitted for the master branch but it was merged into the 4.3 branch instead (closes #10015).

Discussion
----------

[Messenger] Added documentation about DispatchAfterCurrentBusMiddleware

Documentation to PR: symfony/symfony#27844

![Screenshot 2019-03-17 at 11 43 23](https://user-images.githubusercontent.com/1275206/54489171-edee4880-48a9-11e9-8028-ac501e62dfcb.png)
![Screenshot 2019-03-17 at 11 43 34](https://user-images.githubusercontent.com/1275206/54489173-edee4880-48a9-11e9-877f-f749f242b502.png)

![Screenshot 2019-03-17 at 11 43 39](https://user-images.githubusercontent.com/1275206/54489172-edee4880-48a9-11e9-84f1-d78e68474c7a.png)

Commits
-------

0d12880 minor updates according to feedback
ec3e1d7 updates according to feedback
3e99663 Update messenger/message-recorder.rst
fe81abc Update messenger/message-recorder.rst
c1cef9e Update messenger/message-recorder.rst
cfa56f7 Update messenger/message-recorder.rst
06ec8a4 Added PHP and XML config
797f530 Minor updates
30e958c Explain HandleMessageInNewTransaction middleware
84b81e3 Added fixes
ca22c3c a user
fff8659 According to feedback
afdddbf Added documentation about message recorder
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet