Skip to content

Commit

Permalink
[Messenger] Consume a PSR-14 dispatcher for dispatching events
Browse files Browse the repository at this point in the history
  • Loading branch information
derrabus committed Apr 8, 2022
1 parent 86e87a2 commit f500c7a
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 8 deletions.
Expand Up @@ -11,6 +11,7 @@
namespace Symfony\Component\Messenger\EventListener;

use Psr\Container\ContainerInterface;
use Psr\EventDispatcher\EventDispatcherInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Envelope;
Expand All @@ -25,7 +26,6 @@
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\StampInterface;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;

/**
* @author Tobias Schultze <http://tobion.de>
Expand Down
Expand Up @@ -11,14 +11,14 @@

namespace Symfony\Component\Messenger\Middleware;

use Psr\EventDispatcher\EventDispatcherInterface;
use Psr\Log\LoggerAwareTrait;
use Psr\Log\NullLogger;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\SentStamp;
use Symfony\Component\Messenger\Transport\Sender\SendersLocatorInterface;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;

/**
* @author Samuel Roze <samuel.roze@gmail.com>
Expand Down
Expand Up @@ -13,6 +13,7 @@

use PHPUnit\Framework\TestCase;
use Psr\Container\ContainerInterface;
use Psr\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener;
Expand All @@ -21,7 +22,6 @@
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;

class SendFailedMessageForRetryListenerTest extends TestCase
{
Expand Down
Expand Up @@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Tests\Middleware;

use Psr\Container\ContainerInterface;
use Psr\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
Expand All @@ -23,7 +24,6 @@
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessageInterface;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Sender\SendersLocator;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;

class SendMessageMiddlewareTest extends MiddlewareTestCase
{
Expand Down
21 changes: 18 additions & 3 deletions src/Symfony/Component/Messenger/Tests/WorkerTest.php
Expand Up @@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Tests;

use PHPUnit\Framework\TestCase;
use Psr\EventDispatcher\EventDispatcherInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\EventDispatcher;
use Symfony\Component\Messenger\Envelope;
Expand Down Expand Up @@ -39,7 +40,6 @@
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Worker;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
use Symfony\Contracts\Service\ResetInterface;

/**
Expand All @@ -65,8 +65,23 @@ public function testWorkerDispatchTheReceivedMessage()
return $envelopes[] = $envelope;
});

$dispatcher = new EventDispatcher();
$dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(2));
$dispatcher = new class() implements EventDispatcherInterface {
private StopWorkerOnMessageLimitListener $listener;

public function __construct()
{
$this->listener = new StopWorkerOnMessageLimitListener(2);
}

public function dispatch(object $event): object
{
if ($event instanceof WorkerRunningEvent) {
$this->listener->onWorkerRunning($event);
}

return $event;
}
};

$worker = new Worker(['transport' => $receiver], $bus, $dispatcher);
$worker->run();
Expand Down
2 changes: 1 addition & 1 deletion src/Symfony/Component/Messenger/Worker.php
Expand Up @@ -11,6 +11,7 @@

namespace Symfony\Component\Messenger;

use Psr\EventDispatcher\EventDispatcherInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
Expand All @@ -28,7 +29,6 @@
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
use Symfony\Contracts\Service\ResetInterface;

/**
Expand Down
1 change: 1 addition & 0 deletions src/Symfony/Component/Messenger/composer.json
Expand Up @@ -35,6 +35,7 @@
},
"conflict": {
"symfony/event-dispatcher": "<5.4",
"symfony/event-dispatcher-contracts": "<2",
"symfony/framework-bundle": "<5.4",
"symfony/http-kernel": "<5.4",
"symfony/serializer": "<5.4"
Expand Down

0 comments on commit f500c7a

Please sign in to comment.