diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md index 24641be9f2d37..2a6ad24f8c15a 100644 --- a/src/Symfony/Component/Messenger/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/CHANGELOG.md @@ -10,7 +10,7 @@ CHANGELOG the transport. See `ConsumeMessagesCommand`. * The optional `$busNames` constructor argument of the class `ConsumeMessagesCommand` was removed. * [BC BREAK] 2 new methods were added to `ReceiverInterface`: - `ack()` and `reject()`. + `ack()` and `reject()` and `receive()` was changed to `get()`. * [BC BREAK] Error handling was moved from the receivers into `Worker`. Implementations of `ReceiverInterface::handle()` should now allow all exceptions to be thrown, except for transport diff --git a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php index af916cb785c1c..63db07f6e96f5 100644 --- a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php +++ b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php @@ -22,10 +22,10 @@ use Symfony\Component\Console\Style\SymfonyStyle; use Symfony\Component\EventDispatcher\EventDispatcherInterface; use Symfony\Component\Messenger\RoutableMessageBus; -use Symfony\Component\Messenger\Transport\Receiver\StopWhenMemoryUsageIsExceededReceiver; -use Symfony\Component\Messenger\Transport\Receiver\StopWhenMessageCountIsExceededReceiver; -use Symfony\Component\Messenger\Transport\Receiver\StopWhenTimeLimitIsReachedReceiver; use Symfony\Component\Messenger\Worker; +use Symfony\Component\Messenger\Worker\StopWhenMemoryUsageIsExceededWorker; +use Symfony\Component\Messenger\Worker\StopWhenMessageCountIsExceededWorker; +use Symfony\Component\Messenger\Worker\StopWhenTimeLimitIsReachedWorker; /** * @author Samuel Roze @@ -152,20 +152,21 @@ protected function execute(InputInterface $input, OutputInterface $output): void $bus = new RoutableMessageBus($this->busLocator); } + $worker = new Worker($receiver, $bus, $receiverName, $retryStrategy, $this->eventDispatcher, $this->logger); $stopsWhen = []; if ($limit = $input->getOption('limit')) { $stopsWhen[] = "processed {$limit} messages"; - $receiver = new StopWhenMessageCountIsExceededReceiver($receiver, $limit, $this->logger); + $worker = new StopWhenMessageCountIsExceededWorker($worker, $limit, $this->logger); } if ($memoryLimit = $input->getOption('memory-limit')) { $stopsWhen[] = "exceeded {$memoryLimit} of memory"; - $receiver = new StopWhenMemoryUsageIsExceededReceiver($receiver, $this->convertToBytes($memoryLimit), $this->logger); + $worker = new StopWhenMemoryUsageIsExceededWorker($worker, $this->convertToBytes($memoryLimit), $this->logger); } if ($timeLimit = $input->getOption('time-limit')) { $stopsWhen[] = "been running for {$timeLimit}s"; - $receiver = new StopWhenTimeLimitIsReachedReceiver($receiver, $timeLimit, $this->logger); + $worker = new StopWhenTimeLimitIsReachedWorker($worker, $timeLimit, $this->logger); } $io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output); @@ -183,7 +184,6 @@ protected function execute(InputInterface $input, OutputInterface $output): void $io->comment('Re-run the command with a -vv option to see logs about consumed messages.'); } - $worker = new Worker($receiver, $bus, $receiverName, $retryStrategy, $this->eventDispatcher, $this->logger); $worker->run(); } diff --git a/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php b/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php index c6ef2c0cecb59..5d50fd44e6c1e 100644 --- a/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php +++ b/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php @@ -595,11 +595,9 @@ public function __invoke(DummyMessage $message): void class DummyReceiver implements ReceiverInterface { - public function receive(callable $handler): void + public function get(string $queue = null): ?Envelope { - for ($i = 0; $i < 3; ++$i) { - $handler(new Envelope(new DummyMessage("Dummy $i"))); - } + return new Envelope(new DummyMessage('Dummy')); } public function stop(): void diff --git a/src/Symfony/Component/Messenger/Tests/Fixtures/CallbackReceiver.php b/src/Symfony/Component/Messenger/Tests/Fixtures/CallbackReceiver.php deleted file mode 100644 index b1d26934d252c..0000000000000 --- a/src/Symfony/Component/Messenger/Tests/Fixtures/CallbackReceiver.php +++ /dev/null @@ -1,48 +0,0 @@ -callable = $callable; - } - - public function receive(callable $handler): void - { - $callable = $this->callable; - $callable($handler); - } - - public function stop(): void - { - } - - public function ack(Envelope $envelope): void - { - ++$this->acknowledgeCount; - } - - public function reject(Envelope $envelope): void - { - ++$this->rejectCount; - } - - public function getAcknowledgeCount(): int - { - return $this->acknowledgeCount; - } - - public function getRejectCount(): int - { - return $this->rejectCount; - } -} diff --git a/src/Symfony/Component/Messenger/Tests/Fixtures/DummyWorker.php b/src/Symfony/Component/Messenger/Tests/Fixtures/DummyWorker.php new file mode 100644 index 0000000000000..be3959cfad6db --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Fixtures/DummyWorker.php @@ -0,0 +1,46 @@ +envelopesToReceive = $envelopesToReceive; + } + + public function run(callable $onHandledCallback = null): void + { + foreach ($this->envelopesToReceive as $envelope) { + if (true === $this->isStopped) { + break; + } + + if ($onHandledCallback) { + $onHandledCallback($envelope); + ++$this->envelopesHandled; + } + } + } + + public function stop(): void + { + $this->isStopped = true; + } + + public function isStopped(): bool + { + return $this->isStopped; + } + + public function countEnvelopesHandled() + { + return $this->envelopesHandled; + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php index 2949ae837fe36..88a0c29c4dfe3 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php @@ -57,16 +57,14 @@ public function testItSendsAndReceivesMessages() $sender->send($first = new Envelope(new DummyMessage('First'))); $sender->send($second = new Envelope(new DummyMessage('Second'))); - $receivedMessages = 0; - $receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages, $first, $second) { - $expectedEnvelope = 0 === $receivedMessages ? $first : $second; - $this->assertEquals($expectedEnvelope->getMessage(), $envelope->getMessage()); - $this->assertInstanceOf(AmqpReceivedStamp::class, $envelope->last(AmqpReceivedStamp::class)); - - if (2 === ++$receivedMessages) { - $receiver->stop(); - } - }); + $envelope = $receiver->get(); + $this->assertEquals($first->getMessage(), $envelope->getMessage()); + $this->assertInstanceOf(AmqpReceivedStamp::class, $envelope->last(AmqpReceivedStamp::class)); + + $envelope = $receiver->get(); + $this->assertEquals($envelope->getMessage(), $envelope->getMessage()); + + $this->assertNull($receiver->get()); } public function testRetryAndDelay() @@ -82,50 +80,32 @@ public function testRetryAndDelay() $sender->send($first = new Envelope(new DummyMessage('First'))); - $receivedMessages = 0; - $startTime = time(); - $receiver->receive(function (?Envelope $envelope) use ($receiver, $sender, &$receivedMessages, $startTime) { - if (null === $envelope) { - // if we have been processing for 4 seconds + have received 2 messages - // then it's safe to say no other messages will be received - if (time() > $startTime + 4 && 2 === $receivedMessages) { - $receiver->stop(); - } - - return; - } - - ++$receivedMessages; - - // retry the first time - if (1 === $receivedMessages) { - // imitate what Worker does - $envelope = $envelope - ->with(new DelayStamp(2000)) - ->with(new RedeliveryStamp(1, 'not_important')); - $sender->send($envelope); - $receiver->ack($envelope); + $envelope = $receiver->get(); + $newEnvelope = $envelope + ->with(new DelayStamp(2000)) + ->with(new RedeliveryStamp(1, 'not_important')); + $sender->send($newEnvelope); + $receiver->ack($envelope); - return; - } - - if (2 === $receivedMessages) { - // should have a 2 second delay - $this->assertGreaterThanOrEqual($startTime + 2, time()); - // but only a 2 second delay - $this->assertLessThan($startTime + 4, time()); + $envelope = null; + $startTime = time(); + // wait for next message, but only for max 3 seconds + while (null === $envelope && $startTime + 3 > time()) { + $envelope = $receiver->get(); + } - /** @var RedeliveryStamp|null $retryStamp */ - // verify the stamp still exists from the last send - $retryStamp = $envelope->last(RedeliveryStamp::class); - $this->assertNotNull($retryStamp); - $this->assertSame(1, $retryStamp->getRetryCount()); + // should have a 2 second delay + $this->assertGreaterThanOrEqual($startTime + 2, time()); + // but only a 2 second delay + $this->assertLessThan($startTime + 4, time()); - $receiver->ack($envelope); + /** @var RedeliveryStamp|null $retryStamp */ + // verify the stamp still exists from the last send + $retryStamp = $envelope->last(RedeliveryStamp::class); + $this->assertNotNull($retryStamp); + $this->assertSame(1, $retryStamp->getRetryCount()); - return; - } - }); + $receiver->ack($envelope); } public function testItReceivesSignals() @@ -175,29 +155,6 @@ public function testItReceivesSignals() , $process->getOutput()); } - /** - * @runInSeparateProcess - */ - public function testItSupportsTimeoutAndTicksNullMessagesToTheHandler() - { - $serializer = $this->createSerializer(); - - $connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'), ['read_timeout' => '1']); - $connection->setup(); - $connection->queue()->purge(); - - $receiver = new AmqpReceiver($connection, $serializer); - - $receivedMessages = 0; - $receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages) { - $this->assertNull($envelope); - - if (2 === ++$receivedMessages) { - $receiver->stop(); - } - }); - } - private function waitForOutput(Process $process, string $output, $timeoutInSeconds = 10) { $timedOutTime = time() + $timeoutInSeconds; diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php index d0c8abfa3564e..ca71a8b837be5 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php @@ -14,9 +14,11 @@ use PHPUnit\Framework\TestCase; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp; use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver; use Symfony\Component\Messenger\Transport\AmqpExt\Connection; use Symfony\Component\Messenger\Transport\Serialization\Serializer; +use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; use Symfony\Component\Serializer as SerializerComponent; use Symfony\Component\Serializer\Encoder\JsonEncoder; use Symfony\Component\Serializer\Normalizer\ObjectNormalizer; @@ -26,7 +28,7 @@ */ class AmqpReceiverTest extends TestCase { - public function testItSendTheDecodedMessageToTheHandler() + public function testItReturnsTheDecodedMessageToTheHandler() { $serializer = new Serializer( new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) @@ -37,10 +39,8 @@ public function testItSendTheDecodedMessageToTheHandler() $connection->method('get')->willReturn($amqpEnvelope); $receiver = new AmqpReceiver($connection, $serializer); - $receiver->receive(function (?Envelope $envelope) use ($receiver) { - $this->assertEquals(new DummyMessage('Hi'), $envelope->getMessage()); - $receiver->stop(); - }); + $actualEnvelope = $receiver->get(); + $this->assertEquals(new DummyMessage('Hi'), $actualEnvelope->getMessage()); } /** @@ -48,20 +48,14 @@ public function testItSendTheDecodedMessageToTheHandler() */ public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage() { - $serializer = new Serializer( - new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) - ); - + $serializer = $this->createMock(SerializerInterface::class); $amqpEnvelope = $this->createAMQPEnvelope(); $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); $connection->method('get')->willReturn($amqpEnvelope); $connection->method('ack')->with($amqpEnvelope)->willThrowException(new \AMQPException()); $receiver = new AmqpReceiver($connection, $serializer); - $receiver->receive(function (?Envelope $envelope) use ($receiver) { - $receiver->ack($envelope); - $receiver->stop(); - }); + $receiver->ack(new Envelope(new \stdClass(), new AmqpReceivedStamp($amqpEnvelope))); } /** @@ -69,20 +63,14 @@ public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage() */ public function testItThrowsATransportExceptionIfItCannotRejectMessage() { - $serializer = new Serializer( - new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) - ); - + $serializer = $this->createMock(SerializerInterface::class); $amqpEnvelope = $this->createAMQPEnvelope(); $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); $connection->method('get')->willReturn($amqpEnvelope); $connection->method('nack')->with($amqpEnvelope, AMQP_NOPARAM)->willThrowException(new \AMQPException()); $receiver = new AmqpReceiver($connection, $serializer); - $receiver->receive(function (?Envelope $envelope) use ($receiver) { - $receiver->reject($envelope); - $receiver->stop(); - }); + $receiver->reject(new Envelope(new \stdClass(), new AmqpReceivedStamp($amqpEnvelope))); } private function createAMQPEnvelope() diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpTransportTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpTransportTest.php index c343c29226369..c029165cb35da 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpTransportTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpTransportTest.php @@ -47,11 +47,8 @@ public function testReceivesMessages() $serializer->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage)); $connection->method('get')->willReturn($amqpEnvelope); - $transport->receive(function (Envelope $envelope) use ($transport, $decodedMessage) { - $this->assertSame($decodedMessage, $envelope->getMessage()); - - $transport->stop(); - }); + $envelope = $transport->get(); + $this->assertSame($decodedMessage, $envelope->getMessage()); } private function getTransport(SerializerInterface $serializer = null, Connection $connection = null) diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Receiver/StopWhenMemoryUsageIsExceededReceiverTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Receiver/StopWhenMemoryUsageIsExceededReceiverTest.php deleted file mode 100644 index 27314e75502e7..0000000000000 --- a/src/Symfony/Component/Messenger/Tests/Transport/Receiver/StopWhenMemoryUsageIsExceededReceiverTest.php +++ /dev/null @@ -1,84 +0,0 @@ - - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -namespace Symfony\Component\Messenger\Tests\Transport\Receiver; - -use PHPUnit\Framework\TestCase; -use Psr\Log\LoggerInterface; -use Symfony\Component\Messenger\Envelope; -use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver; -use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; -use Symfony\Component\Messenger\Transport\Receiver\StopWhenMemoryUsageIsExceededReceiver; - -class StopWhenMemoryUsageIsExceededReceiverTest extends TestCase -{ - /** - * @dataProvider memoryProvider - */ - public function testReceiverStopsWhenMemoryLimitExceeded(int $memoryUsage, int $memoryLimit, bool $shouldStop) - { - $callable = function ($handler) { - $handler(new Envelope(new DummyMessage('API'))); - }; - - $decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class) - ->setConstructorArgs([$callable]) - ->enableProxyingToOriginalMethods() - ->getMock(); - - $decoratedReceiver->expects($this->once())->method('receive'); - if (true === $shouldStop) { - $decoratedReceiver->expects($this->once())->method('stop'); - } else { - $decoratedReceiver->expects($this->never())->method('stop'); - } - - $memoryResolver = function () use ($memoryUsage) { - return $memoryUsage; - }; - - $memoryLimitReceiver = new StopWhenMemoryUsageIsExceededReceiver($decoratedReceiver, $memoryLimit, null, $memoryResolver); - $memoryLimitReceiver->receive(function () {}); - } - - public function memoryProvider() - { - yield [2048, 1024, true]; - yield [1024, 1024, false]; - yield [1024, 2048, false]; - } - - public function testReceiverLogsMemoryExceededWhenLoggerIsGiven() - { - $callable = function ($handler) { - $handler(new Envelope(new DummyMessage('API'))); - }; - - $decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class) - ->setConstructorArgs([$callable]) - ->enableProxyingToOriginalMethods() - ->getMock(); - - $decoratedReceiver->expects($this->once())->method('receive'); - $decoratedReceiver->expects($this->once())->method('stop'); - - $logger = $this->createMock(LoggerInterface::class); - $logger->expects($this->once())->method('info') - ->with('Receiver stopped due to memory limit of {limit} exceeded', ['limit' => 64 * 1024 * 1024]); - - $memoryResolver = function () { - return 70 * 1024 * 1024; - }; - - $memoryLimitReceiver = new StopWhenMemoryUsageIsExceededReceiver($decoratedReceiver, 64 * 1024 * 1024, $logger, $memoryResolver); - $memoryLimitReceiver->receive(function () {}); - } -} diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Receiver/StopWhenMessageCountIsExceededReceiverTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Receiver/StopWhenMessageCountIsExceededReceiverTest.php deleted file mode 100644 index 1a303728a94e4..0000000000000 --- a/src/Symfony/Component/Messenger/Tests/Transport/Receiver/StopWhenMessageCountIsExceededReceiverTest.php +++ /dev/null @@ -1,103 +0,0 @@ - - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -namespace Symfony\Component\Messenger\Tests\Transport\Receiver; - -use PHPUnit\Framework\TestCase; -use Psr\Log\LoggerInterface; -use Symfony\Component\Messenger\Envelope; -use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver; -use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; -use Symfony\Component\Messenger\Transport\Receiver\StopWhenMessageCountIsExceededReceiver; - -class StopWhenMessageCountIsExceededReceiverTest extends TestCase -{ - /** - * @dataProvider countProvider - */ - public function testReceiverStopsWhenMaximumCountExceeded($max, $shouldStop) - { - $callable = function ($handler) { - $handler(new Envelope(new DummyMessage('First message'))); - $handler(new Envelope(new DummyMessage('Second message'))); - $handler(new Envelope(new DummyMessage('Third message'))); - }; - - $decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class) - ->setConstructorArgs([$callable]) - ->enableProxyingToOriginalMethods() - ->getMock(); - - $decoratedReceiver->expects($this->once())->method('receive'); - if (true === $shouldStop) { - $decoratedReceiver->expects($this->any())->method('stop'); - } else { - $decoratedReceiver->expects($this->never())->method('stop'); - } - - $maximumCountReceiver = new StopWhenMessageCountIsExceededReceiver($decoratedReceiver, $max); - $maximumCountReceiver->receive(function () {}); - } - - public function countProvider() - { - yield [1, true]; - yield [2, true]; - yield [3, true]; - yield [4, false]; - } - - public function testReceiverDoesntIncreaseItsCounterWhenReceiveNullMessage() - { - $callable = function ($handler) { - $handler(null); - $handler(null); - $handler(null); - $handler(null); - }; - - $decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class) - ->setConstructorArgs([$callable]) - ->enableProxyingToOriginalMethods() - ->getMock(); - - $decoratedReceiver->expects($this->once())->method('receive'); - $decoratedReceiver->expects($this->never())->method('stop'); - - $maximumCountReceiver = new StopWhenMessageCountIsExceededReceiver($decoratedReceiver, 1); - $maximumCountReceiver->receive(function () {}); - } - - public function testReceiverLogsMaximumCountExceededWhenLoggerIsGiven() - { - $callable = function ($handler) { - $handler(new Envelope(new DummyMessage('First message'))); - }; - - $decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class) - ->setConstructorArgs([$callable]) - ->enableProxyingToOriginalMethods() - ->getMock(); - - $decoratedReceiver->expects($this->once())->method('receive'); - $decoratedReceiver->expects($this->once())->method('stop'); - - $logger = $this->createMock(LoggerInterface::class); - $logger->expects($this->once())->method('info') - ->with( - $this->equalTo('Receiver stopped due to maximum count of {count} exceeded'), - $this->equalTo(['count' => 1]) - ); - - $maximumCountReceiver = new StopWhenMessageCountIsExceededReceiver($decoratedReceiver, 1, $logger); - $maximumCountReceiver->receive(function () {}); - } -} diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Receiver/StopWhenTimeLimitIsReachedReceiverTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Receiver/StopWhenTimeLimitIsReachedReceiverTest.php deleted file mode 100644 index 472703fe6f7f1..0000000000000 --- a/src/Symfony/Component/Messenger/Tests/Transport/Receiver/StopWhenTimeLimitIsReachedReceiverTest.php +++ /dev/null @@ -1,49 +0,0 @@ - - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -namespace Symfony\Component\Messenger\Tests\Transport\Receiver; - -use PHPUnit\Framework\TestCase; -use Psr\Log\LoggerInterface; -use Symfony\Component\Messenger\Envelope; -use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver; -use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; -use Symfony\Component\Messenger\Transport\Receiver\StopWhenTimeLimitIsReachedReceiver; - -class StopWhenTimeLimitIsReachedReceiverTest extends TestCase -{ - /** - * @group time-sensitive - */ - public function testReceiverStopsWhenTimeLimitIsReached() - { - $callable = function ($handler) { - $handler(new Envelope(new DummyMessage('API'))); - }; - - $decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class) - ->setConstructorArgs([$callable]) - ->enableProxyingToOriginalMethods() - ->getMock(); - - $decoratedReceiver->expects($this->once())->method('receive'); - $decoratedReceiver->expects($this->once())->method('stop'); - - $logger = $this->createMock(LoggerInterface::class); - $logger->expects($this->once())->method('info') - ->with('Receiver stopped due to time limit of {timeLimit}s reached', ['timeLimit' => 1]); - - $timeoutReceiver = new StopWhenTimeLimitIsReachedReceiver($decoratedReceiver, 1, $logger); - $timeoutReceiver->receive(function () { - sleep(2); - }); - } -} diff --git a/src/Symfony/Component/Messenger/Tests/Worker/StopWhenMemoryUsageIsExceededWorkerTest.php b/src/Symfony/Component/Messenger/Tests/Worker/StopWhenMemoryUsageIsExceededWorkerTest.php new file mode 100644 index 0000000000000..1a89e0443d3f8 --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Worker/StopWhenMemoryUsageIsExceededWorkerTest.php @@ -0,0 +1,71 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Worker; + +use PHPUnit\Framework\TestCase; +use Psr\Log\LoggerInterface; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Tests\Fixtures\DummyWorker; +use Symfony\Component\Messenger\Worker\StopWhenMemoryUsageIsExceededWorker; + +class StopWhenMemoryUsageIsExceededWorkerTest extends TestCase +{ + /** + * @dataProvider memoryProvider + */ + public function testWorkerStopsWhenMemoryLimitExceeded(int $memoryUsage, int $memoryLimit, bool $shouldStop) + { + $handlerCalledTimes = 0; + $handledCallback = function () use (&$handlerCalledTimes) { + ++$handlerCalledTimes; + }; + $decoratedWorker = new DummyWorker([ + new Envelope(new \stdClass()), + ]); + + $memoryResolver = function () use ($memoryUsage) { + return $memoryUsage; + }; + + $memoryLimitWorker = new StopWhenMemoryUsageIsExceededWorker($decoratedWorker, $memoryLimit, null, $memoryResolver); + $memoryLimitWorker->run($handledCallback); + + // handler should be called exactly 1 time + $this->assertSame($handlerCalledTimes, 1); + $this->assertSame($shouldStop, $decoratedWorker->isStopped()); + } + + public function memoryProvider() + { + yield [2048, 1024, true]; + yield [1024, 1024, false]; + yield [1024, 2048, false]; + } + + public function testWorkerLogsMemoryExceededWhenLoggerIsGiven() + { + $decoratedWorker = new DummyWorker([ + new Envelope(new \stdClass()), + ]); + + $logger = $this->createMock(LoggerInterface::class); + $logger->expects($this->once())->method('info') + ->with('Worker stopped due to memory limit of {limit} exceeded', ['limit' => 64 * 1024 * 1024]); + + $memoryResolver = function () { + return 70 * 1024 * 1024; + }; + + $memoryLimitWorker = new StopWhenMemoryUsageIsExceededWorker($decoratedWorker, 64 * 1024 * 1024, $logger, $memoryResolver); + $memoryLimitWorker->run(); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Worker/StopWhenMessageCountIsExceededWorkerTest.php b/src/Symfony/Component/Messenger/Tests/Worker/StopWhenMessageCountIsExceededWorkerTest.php new file mode 100644 index 0000000000000..2d50968b09b5f --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Worker/StopWhenMessageCountIsExceededWorkerTest.php @@ -0,0 +1,71 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Worker; + +use PHPUnit\Framework\TestCase; +use Psr\Log\LoggerInterface; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Tests\Fixtures\DummyWorker; +use Symfony\Component\Messenger\Worker\StopWhenMessageCountIsExceededWorker; + +class StopWhenMessageCountIsExceededWorkerTest extends TestCase +{ + /** + * @dataProvider countProvider + */ + public function testWorkerStopsWhenMaximumCountExceeded($max, $shouldStop) + { + $handlerCalledTimes = 0; + $handledCallback = function () use (&$handlerCalledTimes) { + ++$handlerCalledTimes; + }; + // receive 3 real messages + $decoratedWorker = new DummyWorker([ + new Envelope(new DummyMessage('First message')), + null, + new Envelope(new DummyMessage('Second message')), + null, + new Envelope(new DummyMessage('Third message')), + ]); + + $maximumCountWorker = new StopWhenMessageCountIsExceededWorker($decoratedWorker, $max); + $maximumCountWorker->run($handledCallback); + + $this->assertSame($shouldStop, $decoratedWorker->isStopped()); + } + + public function countProvider() + { + yield [1, true]; + yield [2, true]; + yield [3, true]; + yield [4, false]; + } + + public function testWorkerLogsMaximumCountExceededWhenLoggerIsGiven() + { + $decoratedWorker = new DummyWorker([ + new Envelope(new \stdClass()), + ]); + + $logger = $this->createMock(LoggerInterface::class); + $logger->expects($this->once())->method('info') + ->with( + $this->equalTo('Worker stopped due to maximum count of {count} exceeded'), + $this->equalTo(['count' => 1]) + ); + + $maximumCountWorker = new StopWhenMessageCountIsExceededWorker($decoratedWorker, 1, $logger); + $maximumCountWorker->run(); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Worker/StopWhenTimeLimitIsReachedWorkerTest.php b/src/Symfony/Component/Messenger/Tests/Worker/StopWhenTimeLimitIsReachedWorkerTest.php new file mode 100644 index 0000000000000..f3a149000b218 --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Worker/StopWhenTimeLimitIsReachedWorkerTest.php @@ -0,0 +1,44 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Worker; + +use PHPUnit\Framework\TestCase; +use Psr\Log\LoggerInterface; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Tests\Fixtures\DummyWorker; +use Symfony\Component\Messenger\Worker\StopWhenTimeLimitIsReachedWorker; + +class StopWhenTimeLimitIsReachedWorkerTest extends TestCase +{ + /** + * @group time-sensitive + */ + public function testWorkerStopsWhenTimeLimitIsReached() + { + $decoratedWorker = new DummyWorker([ + new Envelope(new \stdClass()), + new Envelope(new \stdClass()), + ]); + + $logger = $this->createMock(LoggerInterface::class); + $logger->expects($this->once())->method('info') + ->with('Worker stopped due to time limit of {timeLimit}s reached', ['timeLimit' => 1]); + + $timeoutWorker = new StopWhenTimeLimitIsReachedWorker($decoratedWorker, 1, $logger); + $timeoutWorker->run(function () { + sleep(2); + }); + + $this->assertTrue($decoratedWorker->isStopped()); + $this->assertSame(1, $decoratedWorker->countEnvelopesHandled()); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/WorkerTest.php b/src/Symfony/Component/Messenger/Tests/WorkerTest.php index cd833b223dce2..b574b929f4738 100644 --- a/src/Symfony/Component/Messenger/Tests/WorkerTest.php +++ b/src/Symfony/Component/Messenger/Tests/WorkerTest.php @@ -23,8 +23,8 @@ use Symfony\Component\Messenger\Stamp\ReceivedStamp; use Symfony\Component\Messenger\Stamp\RedeliveryStamp; use Symfony\Component\Messenger\Stamp\SentStamp; -use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver; use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; use Symfony\Component\Messenger\Worker; class WorkerTest extends TestCase @@ -34,10 +34,10 @@ public function testWorkerDispatchTheReceivedMessage() $apiMessage = new DummyMessage('API'); $ipaMessage = new DummyMessage('IPA'); - $receiver = new CallbackReceiver(function ($handler) use ($apiMessage, $ipaMessage) { - $handler(new Envelope($apiMessage)); - $handler(new Envelope($ipaMessage)); - }); + $receiver = new DummyReceiver([ + new Envelope($apiMessage), + new Envelope($ipaMessage), + ]); $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); @@ -45,7 +45,12 @@ public function testWorkerDispatchTheReceivedMessage() $bus->expects($this->at(1))->method('dispatch')->with($envelope = new Envelope($ipaMessage, new ReceivedStamp()))->willReturn($envelope); $worker = new Worker($receiver, $bus, 'receiver_id'); - $worker->run(); + $worker->run(function (?Envelope $envelope) use ($worker) { + // stop after the messages finish + if (null === $envelope) { + $worker->stop(); + } + }); $this->assertSame(2, $receiver->getAcknowledgeCount()); } @@ -53,9 +58,7 @@ public function testWorkerDispatchTheReceivedMessage() public function testWorkerDoesNotWrapMessagesAlreadyWrappedWithReceivedMessage() { $envelope = new Envelope(new DummyMessage('API')); - $receiver = new CallbackReceiver(function ($handler) use ($envelope) { - $handler($envelope); - }); + $receiver = new DummyReceiver([$envelope]); $envelope = $envelope->with(new ReceivedStamp()); $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); @@ -63,14 +66,19 @@ public function testWorkerDoesNotWrapMessagesAlreadyWrappedWithReceivedMessage() $retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock(); $worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy); - $worker->run(); + $worker->run(function (?Envelope $envelope) use ($worker) { + // stop after the messages finish + if (null === $envelope) { + $worker->stop(); + } + }); } public function testDispatchCausesRetry() { - $receiver = new CallbackReceiver(function ($handler) { - $handler(new Envelope(new DummyMessage('Hello'), new SentStamp('Some\Sender', 'sender_alias'))); - }); + $receiver = new DummyReceiver([ + new Envelope(new DummyMessage('Hello'), new SentStamp('Some\Sender', 'sender_alias')), + ]); $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); $bus->expects($this->at(0))->method('dispatch')->willThrowException(new \InvalidArgumentException('Why not')); @@ -94,7 +102,12 @@ public function testDispatchCausesRetry() $retryStrategy->expects($this->once())->method('isRetryable')->willReturn(true); $worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy); - $worker->run(); + $worker->run(function (?Envelope $envelope) use ($worker) { + // stop after the messages finish + if (null === $envelope) { + $worker->stop(); + } + }); // old message acknowledged $this->assertSame(1, $receiver->getAcknowledgeCount()); @@ -102,9 +115,9 @@ public function testDispatchCausesRetry() public function testDispatchCausesRejectWhenNoRetry() { - $receiver = new CallbackReceiver(function ($handler) { - $handler(new Envelope(new DummyMessage('Hello'), new SentStamp('Some\Sender', 'sender_alias'))); - }); + $receiver = new DummyReceiver([ + new Envelope(new DummyMessage('Hello'), new SentStamp('Some\Sender', 'sender_alias')), + ]); $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); $bus->method('dispatch')->willThrowException(new \InvalidArgumentException('Why not')); @@ -113,16 +126,21 @@ public function testDispatchCausesRejectWhenNoRetry() $retryStrategy->expects($this->once())->method('isRetryable')->willReturn(false); $worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy); - $worker->run(); + $worker->run(function (?Envelope $envelope) use ($worker) { + // stop after the messages finish + if (null === $envelope) { + $worker->stop(); + } + }); $this->assertSame(1, $receiver->getRejectCount()); $this->assertSame(0, $receiver->getAcknowledgeCount()); } public function testDispatchCausesRejectOnUnrecoverableMessage() { - $receiver = new CallbackReceiver(function ($handler) { - $handler(new Envelope(new DummyMessage('Hello'))); - }); + $receiver = new DummyReceiver([ + new Envelope(new DummyMessage('Hello')), + ]); $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); $bus->method('dispatch')->willThrowException(new UnrecoverableMessageHandlingException('Will never work')); @@ -131,30 +149,38 @@ public function testDispatchCausesRejectOnUnrecoverableMessage() $retryStrategy->expects($this->never())->method('isRetryable'); $worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy); - $worker->run(); + $worker->run(function (?Envelope $envelope) use ($worker) { + // stop after the messages finish + if (null === $envelope) { + $worker->stop(); + } + }); $this->assertSame(1, $receiver->getRejectCount()); } public function testWorkerDoesNotSendNullMessagesToTheBus() { - $receiver = new CallbackReceiver(function ($handler) { - $handler(null); - }); + $receiver = new DummyReceiver([ + null, + ]); $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); $bus->expects($this->never())->method('dispatch'); $retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock(); $worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy); - $worker->run(); + $worker->run(function (?Envelope $envelope) use ($worker) { + // stop after the messages finish + if (null === $envelope) { + $worker->stop(); + } + }); } public function testWorkerDispatchesEventsOnSuccess() { $envelope = new Envelope(new DummyMessage('Hello')); - $receiver = new CallbackReceiver(function ($handler) use ($envelope) { - $handler($envelope); - }); + $receiver = new DummyReceiver([$envelope]); $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); $bus->method('dispatch')->willReturn($envelope); @@ -170,15 +196,18 @@ public function testWorkerDispatchesEventsOnSuccess() ); $worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy, $eventDispatcher); - $worker->run(); + $worker->run(function (?Envelope $envelope) use ($worker) { + // stop after the messages finish + if (null === $envelope) { + $worker->stop(); + } + }); } public function testWorkerDispatchesEventsOnError() { $envelope = new Envelope(new DummyMessage('Hello')); - $receiver = new CallbackReceiver(function ($handler) use ($envelope) { - $handler($envelope); - }); + $receiver = new DummyReceiver([$envelope]); $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); $exception = new \InvalidArgumentException('Oh no!'); @@ -195,6 +224,48 @@ public function testWorkerDispatchesEventsOnError() ); $worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy, $eventDispatcher); - $worker->run(); + $worker->run(function (?Envelope $envelope) use ($worker) { + // stop after the messages finish + if (null === $envelope) { + $worker->stop(); + } + }); + } +} + +class DummyReceiver implements ReceiverInterface +{ + private $envelopes; + private $acknowledgeCount = 0; + private $rejectCount = 0; + + public function __construct(array $envelopes) + { + $this->envelopes = $envelopes; + } + + public function get(): ?Envelope + { + return array_shift($this->envelopes); + } + + public function ack(Envelope $envelope): void + { + ++$this->acknowledgeCount; + } + + public function reject(Envelope $envelope): void + { + ++$this->rejectCount; + } + + public function getAcknowledgeCount(): int + { + return $this->acknowledgeCount; + } + + public function getRejectCount(): int + { + return $this->rejectCount; } } diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php index 93afbaff5d8b8..b7c134cc0ce35 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php @@ -30,7 +30,6 @@ class AmqpReceiver implements ReceiverInterface { private $serializer; private $connection; - private $shouldStop; public function __construct(Connection $connection, SerializerInterface $serializer = null) { @@ -41,38 +40,33 @@ public function __construct(Connection $connection, SerializerInterface $seriali /** * {@inheritdoc} */ - public function receive(callable $handler): void + public function get(): ?Envelope { - while (!$this->shouldStop) { - try { - $amqpEnvelope = $this->connection->get(); - } catch (\AMQPException $exception) { - throw new TransportException($exception->getMessage(), 0, $exception); - } - - if (null === $amqpEnvelope) { - $handler(null); - - usleep($this->connection->getConnectionConfiguration()['loop_sleep'] ?? 200000); - - continue; - } - - try { - $envelope = $this->serializer->decode([ - 'body' => $amqpEnvelope->getBody(), - 'headers' => $amqpEnvelope->getHeaders(), - ]); - } catch (MessageDecodingFailedException $exception) { - // invalid message of some type - $this->rejectAmqpEnvelope($amqpEnvelope); - - throw $exception; - } - - $envelope = $envelope->with(new AmqpReceivedStamp($amqpEnvelope)); - $handler($envelope); + try { + $amqpEnvelope = $this->connection->get(); + } catch (\AMQPException $exception) { + throw new TransportException($exception->getMessage(), 0, $exception); } + + if (null === $amqpEnvelope) { + return null; + } + + try { + $envelope = $this->serializer->decode([ + 'body' => $amqpEnvelope->getBody(), + 'headers' => $amqpEnvelope->getHeaders(), + ]); + } catch (MessageDecodingFailedException $exception) { + // invalid message of some type + $this->rejectAmqpEnvelope($amqpEnvelope); + + throw $exception; + } + + $envelope = $envelope->with(new AmqpReceivedStamp($amqpEnvelope)); + + return $envelope; } public function ack(Envelope $envelope): void @@ -89,11 +83,6 @@ public function reject(Envelope $envelope): void $this->rejectAmqpEnvelope($this->findAmqpEnvelope($envelope)); } - public function stop(): void - { - $this->shouldStop = true; - } - private function rejectAmqpEnvelope(\AMQPEnvelope $amqpEnvelope): void { try { diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransport.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransport.php index a98c90596634c..4bed9f3b0fc8b 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransport.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransport.php @@ -37,17 +37,9 @@ public function __construct(Connection $connection, SerializerInterface $seriali /** * {@inheritdoc} */ - public function receive(callable $handler): void + public function get(string $queue = null): ?Envelope { - ($this->receiver ?? $this->getReceiver())->receive($handler); - } - - /** - * {@inheritdoc} - */ - public function stop(): void - { - ($this->receiver ?? $this->getReceiver())->stop(); + return ($this->receiver ?? $this->getReceiver())->get($queue); } /** diff --git a/src/Symfony/Component/Messenger/Transport/Receiver/ReceiverInterface.php b/src/Symfony/Component/Messenger/Transport/Receiver/ReceiverInterface.php index 29da741a11f34..302fa85138492 100644 --- a/src/Symfony/Component/Messenger/Transport/Receiver/ReceiverInterface.php +++ b/src/Symfony/Component/Messenger/Transport/Receiver/ReceiverInterface.php @@ -25,21 +25,13 @@ interface ReceiverInterface /** * Receive some messages to the given handler. * - * The handler will have, as argument, the received {@link \Symfony\Component\Messenger\Envelope} containing the message. - * Note that this envelope can be `null` if the timeout to receive something has expired. - * * If the received message cannot be decoded, the message should not * be retried again (e.g. if there's a queue, it should be removed) * and a MessageDecodingFailedException should be thrown. * * @throws TransportException If there is an issue communicating with the transport */ - public function receive(callable $handler): void; - - /** - * Stop receiving some messages. - */ - public function stop(): void; + public function get(): ?Envelope; /** * Acknowledge that the passed message was handled. diff --git a/src/Symfony/Component/Messenger/Transport/Receiver/StopWhenMessageCountIsExceededReceiver.php b/src/Symfony/Component/Messenger/Transport/Receiver/StopWhenMessageCountIsExceededReceiver.php deleted file mode 100644 index 8be38d157e8f1..0000000000000 --- a/src/Symfony/Component/Messenger/Transport/Receiver/StopWhenMessageCountIsExceededReceiver.php +++ /dev/null @@ -1,65 +0,0 @@ - - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -namespace Symfony\Component\Messenger\Transport\Receiver; - -use Psr\Log\LoggerInterface; -use Symfony\Component\Messenger\Envelope; - -/** - * @author Samuel Roze - * - * @experimental in 4.2 - */ -class StopWhenMessageCountIsExceededReceiver implements ReceiverInterface -{ - private $decoratedReceiver; - private $maximumNumberOfMessages; - private $logger; - - public function __construct(ReceiverInterface $decoratedReceiver, int $maximumNumberOfMessages, LoggerInterface $logger = null) - { - $this->decoratedReceiver = $decoratedReceiver; - $this->maximumNumberOfMessages = $maximumNumberOfMessages; - $this->logger = $logger; - } - - public function receive(callable $handler): void - { - $receivedMessages = 0; - - $this->decoratedReceiver->receive(function (?Envelope $envelope) use ($handler, &$receivedMessages) { - $handler($envelope); - - if (null !== $envelope && ++$receivedMessages >= $this->maximumNumberOfMessages) { - $this->stop(); - if (null !== $this->logger) { - $this->logger->info('Receiver stopped due to maximum count of {count} exceeded', ['count' => $this->maximumNumberOfMessages]); - } - } - }); - } - - public function stop(): void - { - $this->decoratedReceiver->stop(); - } - - public function ack(Envelope $envelope): void - { - $this->decoratedReceiver->ack($envelope); - } - - public function reject(Envelope $envelope): void - { - $this->decoratedReceiver->reject($envelope); - } -} diff --git a/src/Symfony/Component/Messenger/Transport/Receiver/StopWhenTimeLimitIsReachedReceiver.php b/src/Symfony/Component/Messenger/Transport/Receiver/StopWhenTimeLimitIsReachedReceiver.php deleted file mode 100644 index ade088b7dabb1..0000000000000 --- a/src/Symfony/Component/Messenger/Transport/Receiver/StopWhenTimeLimitIsReachedReceiver.php +++ /dev/null @@ -1,66 +0,0 @@ - - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -namespace Symfony\Component\Messenger\Transport\Receiver; - -use Psr\Log\LoggerInterface; -use Symfony\Component\Messenger\Envelope; - -/** - * @author Simon Delicata - * - * @experimental in 4.2 - */ -class StopWhenTimeLimitIsReachedReceiver implements ReceiverInterface -{ - private $decoratedReceiver; - private $timeLimitInSeconds; - private $logger; - - public function __construct(ReceiverInterface $decoratedReceiver, int $timeLimitInSeconds, LoggerInterface $logger = null) - { - $this->decoratedReceiver = $decoratedReceiver; - $this->timeLimitInSeconds = $timeLimitInSeconds; - $this->logger = $logger; - } - - public function receive(callable $handler): void - { - $startTime = microtime(true); - $endTime = $startTime + $this->timeLimitInSeconds; - - $this->decoratedReceiver->receive(function (?Envelope $envelope) use ($handler, $endTime) { - $handler($envelope); - - if ($endTime < microtime(true)) { - $this->stop(); - if (null !== $this->logger) { - $this->logger->info('Receiver stopped due to time limit of {timeLimit}s reached', ['timeLimit' => $this->timeLimitInSeconds]); - } - } - }); - } - - public function stop(): void - { - $this->decoratedReceiver->stop(); - } - - public function ack(Envelope $envelope): void - { - $this->decoratedReceiver->ack($envelope); - } - - public function reject(Envelope $envelope): void - { - $this->decoratedReceiver->reject($envelope); - } -} diff --git a/src/Symfony/Component/Messenger/Worker.php b/src/Symfony/Component/Messenger/Worker.php index 19c5a4b0fafea..6155b197757a5 100644 --- a/src/Symfony/Component/Messenger/Worker.php +++ b/src/Symfony/Component/Messenger/Worker.php @@ -33,7 +33,7 @@ * * @final */ -class Worker +class Worker implements WorkerInterface { private $receiver; private $bus; @@ -41,6 +41,7 @@ class Worker private $retryStrategy; private $eventDispatcher; private $logger; + private $shouldStop = false; public function __construct(ReceiverInterface $receiver, MessageBusInterface $bus, string $receiverName = null, RetryStrategyInterface $retryStrategy = null, EventDispatcherInterface $eventDispatcher = null, LoggerInterface $logger = null) { @@ -60,21 +61,33 @@ public function __construct(ReceiverInterface $receiver, MessageBusInterface $bu /** * Receive the messages and dispatch them to the bus. */ - public function run() + public function run(callable $onHandledCallback = null): void { if (\function_exists('pcntl_signal')) { pcntl_signal(SIGTERM, function () { - $this->receiver->stop(); + $this->stop(); }); } - $this->receiver->receive(function (?Envelope $envelope) { + $handled = function (?Envelope $envelope) use ($onHandledCallback) { + if (\function_exists('pcntl_signal_dispatch')) { + pcntl_signal_dispatch(); + } + + if (null !== $onHandledCallback) { + $onHandledCallback($envelope); + } + }; + + while (false === $this->shouldStop) { + $envelope = $this->receiver->get(); + if (null === $envelope) { - if (\function_exists('pcntl_signal_dispatch')) { - pcntl_signal_dispatch(); - } + $handled(null); + + usleep(1000000); - return; + continue; } $this->dispatchEvent(new WorkerMessageReceivedEvent($envelope, $this->receiverName)); @@ -120,11 +133,9 @@ public function run() $this->receiver->reject($envelope); } - if (\function_exists('pcntl_signal_dispatch')) { - pcntl_signal_dispatch(); - } + $handled($envelope); - return; + continue; } $this->dispatchEvent(new WorkerMessageHandledEvent($envelope, $this->receiverName)); @@ -135,10 +146,13 @@ public function run() $this->receiver->ack($envelope); - if (\function_exists('pcntl_signal_dispatch')) { - pcntl_signal_dispatch(); - } - }); + $handled($envelope); + } + } + + public function stop(): void + { + $this->shouldStop = true; } private function dispatchEvent(Event $event) diff --git a/src/Symfony/Component/Messenger/Transport/Receiver/StopWhenMemoryUsageIsExceededReceiver.php b/src/Symfony/Component/Messenger/Worker/StopWhenMemoryUsageIsExceededWorker.php similarity index 50% rename from src/Symfony/Component/Messenger/Transport/Receiver/StopWhenMemoryUsageIsExceededReceiver.php rename to src/Symfony/Component/Messenger/Worker/StopWhenMemoryUsageIsExceededWorker.php index 09af4673b87b4..9093613fbfbfa 100644 --- a/src/Symfony/Component/Messenger/Transport/Receiver/StopWhenMemoryUsageIsExceededReceiver.php +++ b/src/Symfony/Component/Messenger/Worker/StopWhenMemoryUsageIsExceededWorker.php @@ -9,26 +9,27 @@ * file that was distributed with this source code. */ -namespace Symfony\Component\Messenger\Transport\Receiver; +namespace Symfony\Component\Messenger\Worker; use Psr\Log\LoggerInterface; use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\WorkerInterface; /** * @author Simon Delicata * * @experimental in 4.2 */ -class StopWhenMemoryUsageIsExceededReceiver implements ReceiverInterface +class StopWhenMemoryUsageIsExceededWorker implements WorkerInterface { - private $decoratedReceiver; + private $decoratedWorker; private $memoryLimit; private $logger; private $memoryResolver; - public function __construct(ReceiverInterface $decoratedReceiver, int $memoryLimit, LoggerInterface $logger = null, callable $memoryResolver = null) + public function __construct(WorkerInterface $decoratedWorker, int $memoryLimit, LoggerInterface $logger = null, callable $memoryResolver = null) { - $this->decoratedReceiver = $decoratedReceiver; + $this->decoratedWorker = $decoratedWorker; $this->memoryLimit = $memoryLimit; $this->logger = $logger; $this->memoryResolver = $memoryResolver ?: function () { @@ -36,16 +37,18 @@ public function __construct(ReceiverInterface $decoratedReceiver, int $memoryLim }; } - public function receive(callable $handler): void + public function run(callable $onHandledCallback = null): void { - $this->decoratedReceiver->receive(function (?Envelope $envelope) use ($handler) { - $handler($envelope); + $this->decoratedWorker->run(function (?Envelope $envelope) use ($onHandledCallback) { + if (null !== $onHandledCallback) { + $onHandledCallback($envelope); + } $memoryResolver = $this->memoryResolver; if ($memoryResolver() > $this->memoryLimit) { $this->stop(); if (null !== $this->logger) { - $this->logger->info('Receiver stopped due to memory limit of {limit} exceeded', ['limit' => $this->memoryLimit]); + $this->logger->info('Worker stopped due to memory limit of {limit} exceeded', ['limit' => $this->memoryLimit]); } } }); @@ -53,16 +56,6 @@ public function receive(callable $handler): void public function stop(): void { - $this->decoratedReceiver->stop(); - } - - public function ack(Envelope $envelope): void - { - $this->decoratedReceiver->ack($envelope); - } - - public function reject(Envelope $envelope): void - { - $this->decoratedReceiver->reject($envelope); + $this->decoratedWorker->stop(); } } diff --git a/src/Symfony/Component/Messenger/Worker/StopWhenMessageCountIsExceededWorker.php b/src/Symfony/Component/Messenger/Worker/StopWhenMessageCountIsExceededWorker.php new file mode 100644 index 0000000000000..4162027b2f128 --- /dev/null +++ b/src/Symfony/Component/Messenger/Worker/StopWhenMessageCountIsExceededWorker.php @@ -0,0 +1,58 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Worker; + +use Psr\Log\LoggerInterface; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\WorkerInterface; + +/** + * @author Samuel Roze + * + * @experimental in 4.2 + */ +class StopWhenMessageCountIsExceededWorker implements WorkerInterface +{ + private $decoratedWorker; + private $maximumNumberOfMessages; + private $logger; + + public function __construct(WorkerInterface $decoratedWorker, int $maximumNumberOfMessages, LoggerInterface $logger = null) + { + $this->decoratedWorker = $decoratedWorker; + $this->maximumNumberOfMessages = $maximumNumberOfMessages; + $this->logger = $logger; + } + + public function run(callable $onHandledCallback = null): void + { + $receivedMessages = 0; + + $this->decoratedWorker->run(function (?Envelope $envelope) use ($onHandledCallback, &$receivedMessages) { + if (null !== $onHandledCallback) { + $onHandledCallback($envelope); + } + + if (null !== $envelope && ++$receivedMessages >= $this->maximumNumberOfMessages) { + $this->stop(); + if (null !== $this->logger) { + $this->logger->info('Worker stopped due to maximum count of {count} exceeded', ['count' => $this->maximumNumberOfMessages]); + } + } + }); + } + + public function stop(): void + { + $this->decoratedWorker->stop(); + } +} diff --git a/src/Symfony/Component/Messenger/Worker/StopWhenTimeLimitIsReachedWorker.php b/src/Symfony/Component/Messenger/Worker/StopWhenTimeLimitIsReachedWorker.php new file mode 100644 index 0000000000000..9692172cc4c2f --- /dev/null +++ b/src/Symfony/Component/Messenger/Worker/StopWhenTimeLimitIsReachedWorker.php @@ -0,0 +1,59 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Worker; + +use Psr\Log\LoggerInterface; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\WorkerInterface; + +/** + * @author Simon Delicata + * + * @experimental in 4.2 + */ +class StopWhenTimeLimitIsReachedWorker implements WorkerInterface +{ + private $decoratedWorker; + private $timeLimitInSeconds; + private $logger; + + public function __construct(WorkerInterface $decoratedWorker, int $timeLimitInSeconds, LoggerInterface $logger = null) + { + $this->decoratedWorker = $decoratedWorker; + $this->timeLimitInSeconds = $timeLimitInSeconds; + $this->logger = $logger; + } + + public function run(callable $onHandledCallback = null): void + { + $startTime = microtime(true); + $endTime = $startTime + $this->timeLimitInSeconds; + + $this->decoratedWorker->run(function (?Envelope $envelope) use ($onHandledCallback, $endTime) { + if (null !== $onHandledCallback) { + $onHandledCallback($envelope); + } + + if ($endTime < microtime(true)) { + $this->stop(); + if (null !== $this->logger) { + $this->logger->info('Worker stopped due to time limit of {timeLimit}s reached', ['timeLimit' => $this->timeLimitInSeconds]); + } + } + }); + } + + public function stop(): void + { + $this->decoratedWorker->stop(); + } +} diff --git a/src/Symfony/Component/Messenger/WorkerInterface.php b/src/Symfony/Component/Messenger/WorkerInterface.php new file mode 100644 index 0000000000000..6a5800078b929 --- /dev/null +++ b/src/Symfony/Component/Messenger/WorkerInterface.php @@ -0,0 +1,33 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger; + +/** + * Interface for Workers that handle messages from transports. + * + * @author Ryan Weaver + */ +interface WorkerInterface +{ + /** + * Receive the messages and dispatch them to the bus. + * + * The $onHandledCallback will be passed the Envelope that was just + * handled or null if nothing was handled. + */ + public function run(callable $onHandledCallback = null): void; + + /** + * Stop receiving messages. + */ + public function stop(): void; +}