diff --git a/README.md b/README.md index 9a8e4ef..19a7022 100644 --- a/README.md +++ b/README.md @@ -50,6 +50,76 @@ $eventBus = new EventBus($symfonyDispatcher); $eventBus->dispatch($event); ``` +#### Asynchronicity + +To allow events to be handled asynchronously you should include `Gears\Event\Symfony\Dispatcher\AsyncEventHandler` in dispatcher's constructor + +AsyncEventHandler requires an implementation of `Gears\Event\Async\EventQueue` which will be responsible for event queueing and an instance of `Gears\Event\Async\Discriminator\EventDiscriminator` used to discriminate which events should be queued + +```php +use Gears\Event\Async\Discriminator\ParameterEventDiscriminator; +use Gears\Event\Async\Serializer\NativePhpEventSerializer; +use Gears\Event\Symfony\Dispatcher\AsyncEventQueueHandler; +use Gears\Event\Symfony\Dispatcher\ContainerAwareDispatcher; +use Gears\Event\Symfony\Dispatcher\EventBus; +use Gears\Event\Symfony\Dispatcher\Dispatcher; + +/* @var \Gears\Event\Async\EventQueue $eventQueue */ +$eventQueue = new EventQueueImplementation(new NativePhpEventSerializer()); + +$asyncEventHandler = new AsyncEventQueueHandler($eventQueue, new ParameterEventDiscriminator('async')); + +$eventToHandlerMap = []; + +$symfonyDispatcher = new Dispatcher($eventToHandlerMap, [$asyncEventHandler]); +// OR +/** @var \Psr\Container\ContainerInterface $container */ +$symfonyDispatcher = new ContainerAwareDispatcher($container, $eventToHandlerMap, [$asyncEventHandler]); + +$eventBus = new EventBus($symfonyDispatcher); + +/** @var \Gears\Event\Event $event */ +$eventBus->dispatch($event); +``` + +If you'd like to send different events to different message queues you can just add more instances of AsyncEventQueueHandler + +To know more about how to create and configure an EventQueue head to [phpgears/event-async](https://github.com/phpgears/event-async) + +##### Dequeueing + +This part is highly dependent on your message queue, though event serializers can be used to deserialize queue messages + +This is just an example of the process + +```php +use Gears\Event\Async\Serializer\NativePhpEventSerializer; +use Gears\Event\Symfony\Dispatcher\ContainerAwareDispatcher; +use Gears\Event\Symfony\Dispatcher\EventBus; +use Gears\Event\Symfony\Dispatcher\Dispatcher; + +$eventToHandlerMap = []; + +$symfonyDispatcher = new Dispatcher($eventToHandlerMap); +// OR +/** @var \Psr\Container\ContainerInterface $container */ +$symfonyDispatcher = new ContainerAwareDispatcher($container, $eventToHandlerMap); + +$eventBus = new EventBus($symfonyDispatcher); +$serializer = new NativePhpEventSerializer(); + +while (true) { + /* @var your_message_queue_manager $queue */ + $message = $queue->getMessage(); // extract messages from queue + + if ($message !== null) { + $event = $serializer->fromSerialized($message); + + $eventBus->dispatch($event); + } +} +``` + ## Contributing Found a bug or have a feature request? [Please open a new issue](https://github.com/phpgears/event-symfony-event-dispatcher/issues). Have a look at existing issues before. diff --git a/composer.json b/composer.json index b8a14fa..ccf9cf8 100644 --- a/composer.json +++ b/composer.json @@ -24,7 +24,8 @@ "prefer-stable": true, "require": { "php": "^7.1", - "phpgears/event": "~0.3.3", + "phpgears/event": "dev-reflection-based", + "phpgears/event-async": "dev-reflection-based", "symfony/event-dispatcher": "^4.3" }, "require-dev": { diff --git a/phpstan.neon b/phpstan.neon index 674be93..a1c637a 100644 --- a/phpstan.neon +++ b/phpstan.neon @@ -2,5 +2,3 @@ parameters: level: max paths: - src - ignoreErrors: - - '/^Parameter #2 \$listener of method .+\\EventDispatcher::addListener\(\) expects callable\(\): mixed, .+ given.$/' diff --git a/src/AsyncEventQueueHandler.php b/src/AsyncEventQueueHandler.php new file mode 100644 index 0000000..3411cba --- /dev/null +++ b/src/AsyncEventQueueHandler.php @@ -0,0 +1,62 @@ + + */ + +declare(strict_types=1); + +namespace Gears\Event\Symfony\Dispatcher; + +use Gears\Event\Async\Discriminator\EventDiscriminator; +use Gears\Event\Async\EventQueue; +use Gears\Event\Async\QueuedEvent; +use Gears\Event\Event; + +final class AsyncEventQueueHandler +{ + /** + * Event queue. + * + * @var EventQueue + */ + private $eventQueue; + + /** + * Event discriminator. + * + * @var EventDiscriminator + */ + private $discriminator; + + /** + * AsyncEventHandler constructor. + * + * @param EventQueue $eventQueue + * @param EventDiscriminator $discriminator + */ + public function __construct(EventQueue $eventQueue, EventDiscriminator $discriminator) + { + $this->eventQueue = $eventQueue; + $this->discriminator = $discriminator; + } + + /** + * Handle event. + * + * @param Event $event + */ + public function handle(Event $event): void + { + if (!$event instanceof QueuedEvent && $this->discriminator->shouldEnqueue($event)) { + $this->eventQueue->send(new QueuedEvent($event)); + + return; + } + } +} diff --git a/src/ContainerAwareDispatcher.php b/src/ContainerAwareDispatcher.php index a1d9164..e08eea7 100644 --- a/src/ContainerAwareDispatcher.php +++ b/src/ContainerAwareDispatcher.php @@ -13,13 +13,11 @@ namespace Gears\Event\Symfony\Dispatcher; +use Gears\Event\Event; use Gears\Event\EventHandler; use Psr\Container\ContainerInterface; -use Symfony\Component\EventDispatcher\EventDispatcher as SymfonyEventDispatcher; -use Symfony\Component\EventDispatcher\EventSubscriberInterface; -use Symfony\Contracts\EventDispatcher\Event as SymfonyEvent; -class ContainerAwareDispatcher extends SymfonyEventDispatcher implements EventDispatcher +class ContainerAwareDispatcher extends Dispatcher { /** * @var ContainerInterface @@ -29,56 +27,24 @@ class ContainerAwareDispatcher extends SymfonyEventDispatcher implements EventDi /** * ContainerAwareEventDispatcher constructor. * - * @param ContainerInterface $container - * @param array $listenersMap + * @param ContainerInterface $container + * @param array $listenersMap + * @param array $asyncEventHandlers */ - public function __construct(ContainerInterface $container, array $listenersMap = []) - { - parent::__construct(); + public function __construct( + ContainerInterface $container, + array $listenersMap = [], + array $asyncEventHandlers = [] + ) { + parent::__construct($listenersMap, $asyncEventHandlers); $this->container = $container; - - foreach ($listenersMap as $eventName => $listeners) { - if (!\is_array($listeners)) { - $listeners = [$listeners]; - } - - foreach ($listeners as $listener) { - $this->addListener($eventName, $listener); - } - } } /** - * Adds an event subscriber. - * - * @param EventSubscriberInterface $subscriber - */ - public function addSubscriber(EventSubscriberInterface $subscriber): void - { - foreach ($subscriber::getSubscribedEvents() as $eventName => $params) { - if (!\is_array($params)) { - $params = [$params]; - } - - foreach ($params as $listener) { - if (\is_string($listener)) { - $this->addListener($eventName, $listener); - } else { - $this->addListener($eventName, $listener[0], $listener[1] ?? 0); - } - } - } - } - - /** - * Adds an event listener that listens on the specified events. - * - * @param string $eventName - * @param callable|string $listener - * @param int $priority + * {@inheritdoc} */ - public function addListener($eventName, $listener, $priority = 0): void + protected function assertListenerType($listener): void { if (!\is_string($listener)) { throw new \InvalidArgumentException(\sprintf( @@ -86,49 +52,15 @@ public function addListener($eventName, $listener, $priority = 0): void \is_object($listener) ? \get_class($listener) : \gettype($listener) )); } - - parent::addListener($eventName, $listener, $priority); } /** - * Dispatches an event to all registered listeners. - * - * @param mixed $eventEnvelope - * - * @return SymfonyEvent + * {@inheritdoc} */ - public function dispatch($eventEnvelope): SymfonyEvent + protected function dispatchEvent(iterable $listeners, Event $event): void { - if ($eventEnvelope === null) { - throw new \InvalidArgumentException('Dispatched event cannot be empty'); - } - - if (!$eventEnvelope instanceof EventEnvelope) { - throw new \InvalidArgumentException(\sprintf( - 'Dispatched event must implement "%s", "%s" given', - EventEnvelope::class, - \get_class($eventEnvelope) - )); - } - - $eventListeners = $this->getListeners($eventEnvelope->getWrappedEvent()->getEventType()); - $this->dispatchEvent($eventListeners, $eventEnvelope); - - return $eventEnvelope; - } - - /** - * Dispatch event to registered listeners. - * - * @param string[] $listeners - * @param EventEnvelope $event - */ - private function dispatchEvent(array $listeners, EventEnvelope $event): void - { - $dispatchEvent = $event->getWrappedEvent(); - + /** @var string $listener */ foreach ($listeners as $listener) { - /* @var EventHandler $handler */ $handler = $this->container->get($listener); if (!$handler instanceof EventHandler) { @@ -139,7 +71,7 @@ private function dispatchEvent(array $listeners, EventEnvelope $event): void )); } - $handler->handle($dispatchEvent); + $handler->handle($event); } } } diff --git a/src/Dispatcher.php b/src/Dispatcher.php index f37ce14..2ae718e 100644 --- a/src/Dispatcher.php +++ b/src/Dispatcher.php @@ -13,6 +13,7 @@ namespace Gears\Event\Symfony\Dispatcher; +use Gears\Event\Event; use Gears\Event\EventHandler; use Symfony\Component\EventDispatcher\EventDispatcher as SymfonyEventDispatcher; use Symfony\Component\EventDispatcher\EventSubscriberInterface; @@ -20,15 +21,25 @@ class Dispatcher extends SymfonyEventDispatcher implements EventDispatcher { + /** + * @var AsyncEventQueueHandler[] + */ + private $asyncEventHandlers = []; + /** * ContainerAwareEventDispatcher constructor. * - * @param array $listenersMap + * @param array $listenersMap + * @param AsyncEventQueueHandler[] $asyncEventHandlers */ - public function __construct(array $listenersMap = []) + public function __construct(array $listenersMap = [], array $asyncEventHandlers = []) { parent::__construct(); + foreach ($asyncEventHandlers as $asyncEventHandler) { + $this->addAsyncEventHandler($asyncEventHandler); + } + foreach ($listenersMap as $eventName => $listeners) { if (!\is_array($listeners)) { $listeners = [$listeners]; @@ -40,6 +51,16 @@ public function __construct(array $listenersMap = []) } } + /** + * Add asynchronous event handler. + * + * @param AsyncEventQueueHandler $eventHandler + */ + public function addAsyncEventHandler(AsyncEventQueueHandler $eventHandler): void + { + $this->asyncEventHandlers[] = $eventHandler; + } + /** * {@inheritdoc} */ @@ -68,6 +89,18 @@ public function addSubscriber(EventSubscriberInterface $subscriber): void * @param mixed $priority */ public function addListener($eventName, $listener, $priority = 0): void + { + $this->assertListenerType($listener); + + parent::addListener($eventName, $listener, (int) $priority); + } + + /** + * Assert listener type. + * + * @param mixed $listener + */ + protected function assertListenerType($listener): void { if (!$listener instanceof EventHandler) { throw new \InvalidArgumentException(\sprintf( @@ -76,8 +109,6 @@ public function addListener($eventName, $listener, $priority = 0): void \is_object($listener) ? \get_class($listener) : \gettype($listener) )); } - - parent::addListener($eventName, $listener, (int) $priority); } /** @@ -101,24 +132,55 @@ public function dispatch($eventEnvelope): SymfonyEvent )); } - $eventListeners = $this->getListeners($eventEnvelope->getWrappedEvent()->getEventType()); - $this->dispatchEvent($eventListeners, $eventEnvelope); + $event = $eventEnvelope->getWrappedEvent(); + + $this->handleAsync($event); + + parent::dispatch($eventEnvelope, $event->getEventType()); return $eventEnvelope; } /** - * Dispatch event to registered listeners. + * Handle async. + * + * @param Event $event + */ + protected function handleAsync(Event $event): void + { + foreach ($this->asyncEventHandlers as $eventHandler) { + $eventHandler->handle($event); + } + } + + /** + * {@inheritdoc} * - * @param EventHandler[] $listeners - * @param EventEnvelope $event + * @param iterable $listeners + * @param string $eventName + * @param object $event */ - private function dispatchEvent(array $listeners, EventEnvelope $event): void + protected function callListeners(iterable $listeners, string $eventName, $event): void { - $dispatchEvent = $event->getWrappedEvent(); + if ($event instanceof EventEnvelope) { + $this->dispatchEvent($listeners, $event->getWrappedEvent()); + + return; + } + + parent::callListeners($listeners, $eventName, $event); + } + /** + * Dispatch event to registered listeners. + * + * @param iterable $listeners + * @param Event $event + */ + protected function dispatchEvent(iterable $listeners, Event $event): void + { foreach ($listeners as $handler) { - $handler->handle($dispatchEvent); + $handler->handle($event); } } } diff --git a/tests/Dispatcher/AsyncEventQueueHandlerTest.php b/tests/Dispatcher/AsyncEventQueueHandlerTest.php new file mode 100644 index 0000000..da1b238 --- /dev/null +++ b/tests/Dispatcher/AsyncEventQueueHandlerTest.php @@ -0,0 +1,81 @@ + + */ + +declare(strict_types=1); + +namespace Gears\Event\Symfony\Dispatcher\Tests; + +use Gears\Event\Async\Discriminator\EventDiscriminator; +use Gears\Event\Async\EventQueue; +use Gears\Event\Async\QueuedEvent; +use Gears\Event\Symfony\Dispatcher\AsyncEventQueueHandler; +use Gears\Event\Symfony\Dispatcher\Tests\Stub\EventStub; +use PHPUnit\Framework\TestCase; + +class AsyncEventQueueHandlerTest extends TestCase +{ + public function testShouldEnqueue(): void + { + $eventQueue = $this->getMockBuilder(EventQueue::class) + ->disableOriginalConstructor() + ->getMock(); + $eventQueue->expects(static::once()) + ->method('send'); + $eventDiscriminator = $this->getMockBuilder(EventDiscriminator::class) + ->disableOriginalConstructor() + ->getMock(); + $eventDiscriminator->expects(static::once()) + ->method('shouldEnqueue') + ->willReturn(true); + + (new AsyncEventQueueHandler($eventQueue, $eventDiscriminator)) + ->handle(EventStub::instance()); + } + + public function testShouldNotEnqueue(): void + { + $eventQueue = $this->getMockBuilder(EventQueue::class) + ->disableOriginalConstructor() + ->getMock(); + $eventQueue->expects(static::never()) + ->method('send'); + $eventDiscriminator = $this->getMockBuilder(EventDiscriminator::class) + ->disableOriginalConstructor() + ->getMock(); + $eventDiscriminator->expects(static::once()) + ->method('shouldEnqueue') + ->willReturn(false); + + $mockEvent = EventStub::instance(); + + (new AsyncEventQueueHandler($eventQueue, $eventDiscriminator)) + ->handle($mockEvent); + } + + public function testShouldNotEnqueueReceivedCommand(): void + { + $eventQueue = $this->getMockBuilder(EventQueue::class) + ->disableOriginalConstructor() + ->getMock(); + $eventQueue->expects(static::never()) + ->method('send'); + $eventDiscriminator = $this->getMockBuilder(EventDiscriminator::class) + ->disableOriginalConstructor() + ->getMock(); + $eventDiscriminator->expects(static::never()) + ->method('shouldEnqueue'); + + $mockCommand = new QueuedEvent(EventStub::instance()); + + (new AsyncEventQueueHandler($eventQueue, $eventDiscriminator)) + ->handle($mockCommand); + } +} diff --git a/tests/Dispatcher/ContainerAwareDispatcherTest.php b/tests/Dispatcher/ContainerAwareDispatcherTest.php index b919c3a..245bf4e 100644 --- a/tests/Dispatcher/ContainerAwareDispatcherTest.php +++ b/tests/Dispatcher/ContainerAwareDispatcherTest.php @@ -40,36 +40,6 @@ public function testInvalidListener(): void new ContainerAwareDispatcher($containerMock, [\stdClass::class => new \stdClass()]); } - public function testEmptyEvent(): void - { - $this->expectException(\InvalidArgumentException::class); - $this->expectExceptionMessage('Dispatched event cannot be empty'); - - /** @var ContainerInterface $containerMock */ - $containerMock = $this->getMockBuilder(ContainerInterface::class) - ->disableOriginalConstructor() - ->getMock(); - - $eventDispatcher = new ContainerAwareDispatcher($containerMock); - - $eventDispatcher->dispatch(null); - } - - public function testInvalidEvent(): void - { - $this->expectException(\InvalidArgumentException::class); - $this->expectExceptionMessageRegExp('/^Dispatched event must implement ".+\\\EventEnvelope", ".+" given$/'); - - /** @var ContainerInterface $containerMock */ - $containerMock = $this->getMockBuilder(ContainerInterface::class) - ->disableOriginalConstructor() - ->getMock(); - - $eventDispatcher = new ContainerAwareDispatcher($containerMock); - - $eventDispatcher->dispatch(new Event()); - } - public function testInvalidHandler(): void { $this->expectException(\RuntimeException::class); @@ -81,7 +51,7 @@ public function testInvalidHandler(): void $containerMock->expects(static::once()) ->method('get') ->with('eventHandler') - ->will(static::returnValue('thisIsNoHandler')); + ->willReturn('thisIsNoHandler'); /** @var ContainerInterface $containerMock */ $eventDispatcher = new ContainerAwareDispatcher($containerMock, [EventStub::class => 'eventHandler']); @@ -105,7 +75,7 @@ public function testEventDispatch(): void $containerMock->expects(static::once()) ->method('get') ->with('eventHandler') - ->will(static::returnValue($eventHandler)); + ->willReturn($eventHandler); /** @var ContainerInterface $containerMock */ $subscriber = new EventSubscriberInterfaceStub([EventStub::class => 'eventHandler']); diff --git a/tests/Dispatcher/DispatcherTest.php b/tests/Dispatcher/DispatcherTest.php index b56cbc2..9f7a826 100644 --- a/tests/Dispatcher/DispatcherTest.php +++ b/tests/Dispatcher/DispatcherTest.php @@ -13,7 +13,10 @@ namespace Gears\Event\Symfony\Dispatcher\Tests; +use Gears\Event\Async\Discriminator\EventDiscriminator; +use Gears\Event\Async\EventQueue; use Gears\Event\EventHandler; +use Gears\Event\Symfony\Dispatcher\AsyncEventQueueHandler; use Gears\Event\Symfony\Dispatcher\Dispatcher; use Gears\Event\Symfony\Dispatcher\EventEnvelope; use Gears\Event\Symfony\Dispatcher\Tests\Stub\EventStub; @@ -60,6 +63,18 @@ public function testEventDispatch(): void { $event = EventStub::instance(); + $eventQueue = $this->getMockBuilder(EventQueue::class) + ->disableOriginalConstructor() + ->getMock(); + $eventDiscriminator = $this->getMockBuilder(EventDiscriminator::class) + ->disableOriginalConstructor() + ->getMock(); + $eventDiscriminator->expects(static::once()) + ->method('shouldEnqueue') + ->willReturn(false); + $asyncEventHandler = new AsyncEventQueueHandler($eventQueue, $eventDiscriminator); + + $eventHandler = $this->getMockBuilder(EventHandler::class) ->disableOriginalConstructor() ->getMock(); @@ -73,7 +88,7 @@ public function testEventDispatch(): void ], ]); - $eventDispatcher = new Dispatcher(); + $eventDispatcher = new Dispatcher([], [$asyncEventHandler]); $eventDispatcher->addSubscriber($subscriber); $eventDispatcher->dispatch(new EventEnvelope($event));