Domain Event Publisher for Doctrine Entities #62

Open
webdevilopers opened this issue Jan 25, 2022 · 1 comment

webdevilopers commented Jan 25, 2022

Currently we use Prooph Event Sourcing for our Aggregate Roots. Since the Prooph Service Bus was deprecated, we use Symfony Messenger instead, wired up with the following service definition and event store plugin:

services:
    _defaults:
        public: false

    Prooph\EventStoreBusBridge\EventPublisher:
        class: Acme\Shared\Infrastructure\Prooph\EventPublisher
        arguments:
            - '@event.bus'
        tags:
            - { name: 'prooph_event_store.default.plugin' }

<?php

namespace Acme\Shared\Infrastructure\Prooph;

use Iterator;
use Prooph\Common\Event\ActionEvent;
use Prooph\EventStore\ActionEventEmitterEventStore;
use Prooph\EventStore\EventStore;
use Prooph\EventStore\Plugin\AbstractPlugin;
use Prooph\EventStore\TransactionalActionEventEmitterEventStore;
use Symfony\Component\Messenger\MessageBusInterface;

final class EventPublisher extends AbstractPlugin
{
    private MessageBusInterface $eventBus;

    /**
     * @var Iterator[]
     */
    private array $cachedEventStreams = [];

    public function __construct(MessageBusInterface $eventBus)
    {
        $this->eventBus = $eventBus;
    }

    public function attachToEventStore(ActionEventEmitterEventStore $eventStore): void
    {
        $this->listenerHandlers[] = $eventStore->attach(
            ActionEventEmitterEventStore::EVENT_APPEND_TO,
            function (ActionEvent $event) use ($eventStore): void {
                $recordedEvents = $event->getParam('streamEvents', new \ArrayIterator());

                if (! $this->inTransaction($eventStore)) {
                    if ($event->getParam('streamNotFound', false)
                        || $event->getParam('concurrencyException', false)
                    ) {
                        return;
                    }

                    foreach ($recordedEvents as $recordedEvent) {
                        $this->eventBus->dispatch($recordedEvent);
                    }
                } else {
                    $this->cachedEventStreams[] = $recordedEvents;
                }
            }
        );

        $this->listenerHandlers[] = $eventStore->attach(
            ActionEventEmitterEventStore::EVENT_CREATE,
            function (ActionEvent $event) use ($eventStore): void {
                $stream = $event->getParam('stream');
                $recordedEvents = $stream->streamEvents();

                if (! $this->inTransaction($eventStore)) {
                    if ($event->getParam('streamExistsAlready', false)) {
                        return;
                    }

                    foreach ($recordedEvents as $recordedEvent) {
                        $this->eventBus->dispatch($recordedEvent);
                    }
                } else {
                    $this->cachedEventStreams[] = $recordedEvents;
                }
            }
        );

        if ($eventStore instanceof TransactionalActionEventEmitterEventStore) {
            $this->listenerHandlers[] = $eventStore->attach(
                TransactionalActionEventEmitterEventStore::EVENT_COMMIT,
                function (ActionEvent $event): void {
                    foreach ($this->cachedEventStreams as $stream) {
                        foreach ($stream as $recordedEvent) {
                            $this->eventBus->dispatch($recordedEvent);
                        }
                    }
                    $this->cachedEventStreams = [];
                }
            );

            $this->listenerHandlers[] = $eventStore->attach(
                TransactionalActionEventEmitterEventStore::EVENT_ROLLBACK,
                function (ActionEvent $event): void {
                    $this->cachedEventStreams = [];
                }
            );
        }
    }

    private function inTransaction(EventStore $eventStore): bool
    {
        return $eventStore instanceof TransactionalActionEventEmitterEventStore
            && $eventStore->inTransaction();
    }
}

We also have some legacy projects that use Doctrine Entities. We don't want to introduce Event Sourcing there, but we would like to publish Domain Events from the Entities to the Messenger bus.

Has anybody implemented a Domain Event Publisher using Doctrine Event Subscribers or Lifecycle Callbacks, for instance? I guess flushing on the entity manager and publishing the domain event should happen in the same transaction?
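
For illustration, here is a minimal sketch of what such a publisher could look like. It mirrors the plugin above: events are cached first and only dispatched once the write has gone through. Everything in it is an assumption made for the example: `DomainEventBus` is a hypothetical stand-in for Symfony's `MessageBusInterface` so the code runs on its own, and `RecordsDomainEvents`, `Order`, `OrderPlaced` and `DoctrineDomainEventPublisher` are made-up names. In a real project, `collectFrom()` would probably be called from a Doctrine `onFlush` listener and `publish()` from a `postFlush` listener.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for Symfony's MessageBusInterface (assumption, keeps the sketch self-contained).
interface DomainEventBus
{
    public function dispatch(object $event): void;
}

// Hypothetical trait: the entity records events instead of dispatching them itself.
trait RecordsDomainEvents
{
    /** @var object[] */
    private array $recordedEvents = [];

    protected function recordThat(object $event): void
    {
        $this->recordedEvents[] = $event;
    }

    /** @return object[] Returns the recorded events and clears the internal list. */
    public function releaseEvents(): array
    {
        $events = $this->recordedEvents;
        $this->recordedEvents = [];

        return $events;
    }
}

// Hypothetical domain event.
final class OrderPlaced
{
    public string $orderId;

    public function __construct(string $orderId)
    {
        $this->orderId = $orderId;
    }
}

// Hypothetical entity; Doctrine mapping is omitted on purpose.
final class Order
{
    use RecordsDomainEvents;

    private string $id;

    public function __construct(string $id)
    {
        $this->id = $id;
        $this->recordThat(new OrderPlaced($id));
    }
}

final class DoctrineDomainEventPublisher
{
    private DomainEventBus $eventBus;

    /** @var object[] */
    private array $pendingEvents = [];

    public function __construct(DomainEventBus $eventBus)
    {
        $this->eventBus = $eventBus;
    }

    // Assumed to be called for each entity scheduled in the unit of work (e.g. from onFlush).
    public function collectFrom(object $entity): void
    {
        if (! method_exists($entity, 'releaseEvents')) {
            return;
        }

        foreach ($entity->releaseEvents() as $event) {
            $this->pendingEvents[] = $event;
        }
    }

    // Assumed to be called once the flush has succeeded (e.g. from postFlush).
    public function publish(): void
    {
        $events = $this->pendingEvents;
        $this->pendingEvents = [];

        foreach ($events as $event) {
            $this->eventBus->dispatch($event);
        }
    }

    // Assumed to be called when the flush fails, like the rollback listener in the plugin above.
    public function discard(): void
    {
        $this->pendingEvents = [];
    }
}
```

Keeping the collecting and the publishing as two separate steps is what leaves the question about the transaction open: `publish()` could run before or after the commit, depending on where it is hooked in.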


webdevilopers commented Jan 25, 2022

There is an extension by @vaniocz, @nixbody and @maryo:
https://github.com/vaniocz/doctrine-domain-events

"I guess flushing on the entity manager and publishing the domain event should happen in the same transaction?"

As documented:

The event is dispatched at the end of transaction once your entity has been flushed and all the changes are projected into database so it is possible to both perform database queries over the changes as well as cancel the transaction.

Source: https://github.com/vaniocz/doctrine-domain-events#raising-domain-events
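
One way to read the quoted behaviour is sketched below: flush inside an open transaction, dispatch the events, and only then commit, so that a failing handler can still roll everything back. This is not taken from the extension's code. `FakeConnection` and `flushThenPublish()` are hypothetical names, and the connection is an in-memory stand-in that only logs the calls it receives.

```php
<?php

declare(strict_types=1);

// Hypothetical in-memory stand-in for a database connection; it only logs the calls.
final class FakeConnection
{
    /** @var string[] */
    public array $log = [];

    public function beginTransaction(): void
    {
        $this->log[] = 'begin';
    }

    public function commit(): void
    {
        $this->log[] = 'commit';
    }

    public function rollBack(): void
    {
        $this->log[] = 'rollback';
    }
}

/**
 * Flushes, dispatches the events, then commits. Any exception rolls the transaction back.
 *
 * @param callable $flush    writes the pending changes (assumed to wrap the entity manager flush)
 * @param object[] $events   domain events collected from the flushed entities
 * @param callable $dispatch receives one event at a time (assumed to wrap the message bus)
 */
function flushThenPublish(FakeConnection $connection, callable $flush, array $events, callable $dispatch): void
{
    $connection->beginTransaction();

    try {
        $flush();

        // The changes are written but not yet committed, so handlers can
        // query them and can still cancel the transaction by throwing.
        foreach ($events as $event) {
            $dispatch($event);
        }

        $connection->commit();
    } catch (\Throwable $e) {
        $connection->rollBack();

        throw $e;
    }
}
```

The trade-off of this ordering is that handlers which dispatch to an asynchronous transport may hand a message off before the commit has happened, which is something to check against the transport configuration in use.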
