Skip to content
Permalink
Browse files

Changes ReceiverInterface::handle() to get() to give more control to …

…Worker
  • Loading branch information...
weaverryan committed Mar 26, 2019
1 parent 76260e7 commit 2d1f17b592bf54e5042f93211ab96b240d55cc98
Showing with 608 additions and 651 deletions.
  1. +2 −1 src/Symfony/Component/Messenger/CHANGELOG.md
  2. +7 −7 src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
  3. +2 −4 src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php
  4. +0 −48 src/Symfony/Component/Messenger/Tests/Fixtures/CallbackReceiver.php
  5. +46 −0 src/Symfony/Component/Messenger/Tests/Fixtures/DummyWorker.php
  6. +30 −73 src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php
  7. +9 −21 src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php
  8. +2 −5 src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpTransportTest.php
  9. +0 −84 ...ymfony/Component/Messenger/Tests/Transport/Receiver/StopWhenMemoryUsageIsExceededReceiverTest.php
  10. +0 −103 ...mfony/Component/Messenger/Tests/Transport/Receiver/StopWhenMessageCountIsExceededReceiverTest.php
  11. +0 −49 src/Symfony/Component/Messenger/Tests/Transport/Receiver/StopWhenTimeLimitIsReachedReceiverTest.php
  12. +71 −0 src/Symfony/Component/Messenger/Tests/Worker/StopWhenMemoryUsageIsExceededWorkerTest.php
  13. +71 −0 src/Symfony/Component/Messenger/Tests/Worker/StopWhenMessageCountIsExceededWorkerTest.php
  14. +44 −0 src/Symfony/Component/Messenger/Tests/Worker/StopWhenTimeLimitIsReachedWorkerTest.php
  15. +105 −34 src/Symfony/Component/Messenger/Tests/WorkerTest.php
  16. +23 −36 src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php
  17. +2 −10 src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransport.php
  18. +1 −9 src/Symfony/Component/Messenger/Transport/Receiver/ReceiverInterface.php
  19. +0 −65 src/Symfony/Component/Messenger/Transport/Receiver/StopWhenMessageCountIsExceededReceiver.php
  20. +0 −66 src/Symfony/Component/Messenger/Transport/Receiver/StopWhenTimeLimitIsReachedReceiver.php
  21. +30 −16 src/Symfony/Component/Messenger/Worker.php
  22. +13 −20 ...eiver/StopWhenMemoryUsageIsExceededReceiver.php → Worker/StopWhenMemoryUsageIsExceededWorker.php}
  23. +58 −0 src/Symfony/Component/Messenger/Worker/StopWhenMessageCountIsExceededWorker.php
  24. +59 −0 src/Symfony/Component/Messenger/Worker/StopWhenTimeLimitIsReachedWorker.php
  25. +33 −0 src/Symfony/Component/Messenger/WorkerInterface.php
@@ -10,7 +10,8 @@ CHANGELOG
the transport. See `ConsumeMessagesCommand`.
* The optional `$busNames` constructor argument of the class `ConsumeMessagesCommand` was removed.
* [BC BREAK] 2 new methods were added to `ReceiverInterface`:
`ack()` and `reject()`.
`ack()` and `reject()`. Also `receive()` was changed to `get()`
and `stop()` was removed.
* [BC BREAK] Error handling was moved from the receivers into
`Worker`. Implementations of `ReceiverInterface::handle()`
should now allow all exceptions to be thrown, except for transport
@@ -22,10 +22,10 @@
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Messenger\RoutableMessageBus;
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMemoryUsageIsExceededReceiver;
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMessageCountIsExceededReceiver;
use Symfony\Component\Messenger\Transport\Receiver\StopWhenTimeLimitIsReachedReceiver;
use Symfony\Component\Messenger\Worker;
use Symfony\Component\Messenger\Worker\StopWhenMemoryUsageIsExceededWorker;
use Symfony\Component\Messenger\Worker\StopWhenMessageCountIsExceededWorker;
use Symfony\Component\Messenger\Worker\StopWhenTimeLimitIsReachedWorker;
/**
* @author Samuel Roze <samuel.roze@gmail.com>
@@ -152,20 +152,21 @@ protected function execute(InputInterface $input, OutputInterface $output): void
$bus = new RoutableMessageBus($this->busLocator);
}
$worker = new Worker($receiver, $bus, $receiverName, $retryStrategy, $this->eventDispatcher, $this->logger);
$stopsWhen = [];
if ($limit = $input->getOption('limit')) {
$stopsWhen[] = "processed {$limit} messages";
$receiver = new StopWhenMessageCountIsExceededReceiver($receiver, $limit, $this->logger);
$worker = new StopWhenMessageCountIsExceededWorker($worker, $limit, $this->logger);
}
if ($memoryLimit = $input->getOption('memory-limit')) {
$stopsWhen[] = "exceeded {$memoryLimit} of memory";
$receiver = new StopWhenMemoryUsageIsExceededReceiver($receiver, $this->convertToBytes($memoryLimit), $this->logger);
$worker = new StopWhenMemoryUsageIsExceededWorker($worker, $this->convertToBytes($memoryLimit), $this->logger);
}
if ($timeLimit = $input->getOption('time-limit')) {
$stopsWhen[] = "been running for {$timeLimit}s";
$receiver = new StopWhenTimeLimitIsReachedReceiver($receiver, $timeLimit, $this->logger);
$worker = new StopWhenTimeLimitIsReachedWorker($worker, $timeLimit, $this->logger);
}
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
@@ -183,7 +184,6 @@ protected function execute(InputInterface $input, OutputInterface $output): void
$io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
}
$worker = new Worker($receiver, $bus, $receiverName, $retryStrategy, $this->eventDispatcher, $this->logger);
$worker->run();
}
@@ -595,11 +595,9 @@ public function __invoke(DummyMessage $message): void
class DummyReceiver implements ReceiverInterface
{
public function receive(callable $handler): void
public function get(string $queue = null): ?Envelope
{
for ($i = 0; $i < 3; ++$i) {
$handler(new Envelope(new DummyMessage("Dummy $i")));
}
return new Envelope(new DummyMessage('Dummy'));
}
public function stop(): void

This file was deleted.

@@ -0,0 +1,46 @@
<?php
namespace Symfony\Component\Messenger\Tests\Fixtures;
use Symfony\Component\Messenger\WorkerInterface;
class DummyWorker implements WorkerInterface
{
private $isStopped = false;
private $envelopesToReceive;
private $envelopesHandled = 0;
public function __construct(array $envelopesToReceive)
{
$this->envelopesToReceive = $envelopesToReceive;
}
public function run(callable $onHandledCallback = null): void
{
foreach ($this->envelopesToReceive as $envelope) {
if (true === $this->isStopped) {
break;
}
if ($onHandledCallback) {
$onHandledCallback($envelope);
++$this->envelopesHandled;
}
}
}
public function stop(): void
{
$this->isStopped = true;
}
public function isStopped(): bool
{
return $this->isStopped;
}
public function countEnvelopesHandled()
{
return $this->envelopesHandled;
}
}
@@ -57,16 +57,14 @@ public function testItSendsAndReceivesMessages()
$sender->send($first = new Envelope(new DummyMessage('First')));
$sender->send($second = new Envelope(new DummyMessage('Second')));
$receivedMessages = 0;
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages, $first, $second) {
$expectedEnvelope = 0 === $receivedMessages ? $first : $second;
$this->assertEquals($expectedEnvelope->getMessage(), $envelope->getMessage());
$this->assertInstanceOf(AmqpReceivedStamp::class, $envelope->last(AmqpReceivedStamp::class));
if (2 === ++$receivedMessages) {
$receiver->stop();
}
});
$envelope = $receiver->get();
$this->assertEquals($first->getMessage(), $envelope->getMessage());
$this->assertInstanceOf(AmqpReceivedStamp::class, $envelope->last(AmqpReceivedStamp::class));
$envelope = $receiver->get();
$this->assertEquals($envelope->getMessage(), $envelope->getMessage());
$this->assertNull($receiver->get());
}
public function testRetryAndDelay()
@@ -82,50 +80,32 @@ public function testRetryAndDelay()
$sender->send($first = new Envelope(new DummyMessage('First')));
$receivedMessages = 0;
$startTime = time();
$receiver->receive(function (?Envelope $envelope) use ($receiver, $sender, &$receivedMessages, $startTime) {
if (null === $envelope) {
// if we have been processing for 4 seconds + have received 2 messages
// then it's safe to say no other messages will be received
if (time() > $startTime + 4 && 2 === $receivedMessages) {
$receiver->stop();
}
return;
}
++$receivedMessages;
// retry the first time
if (1 === $receivedMessages) {
// imitate what Worker does
$envelope = $envelope
->with(new DelayStamp(2000))
->with(new RedeliveryStamp(1, 'not_important'));
$sender->send($envelope);
$receiver->ack($envelope);
$envelope = $receiver->get();
$newEnvelope = $envelope
->with(new DelayStamp(2000))
->with(new RedeliveryStamp(1, 'not_important'));
$sender->send($newEnvelope);
$receiver->ack($envelope);
return;
}
if (2 === $receivedMessages) {
// should have a 2 second delay
$this->assertGreaterThanOrEqual($startTime + 2, time());
// but only a 2 second delay
$this->assertLessThan($startTime + 4, time());
$envelope = null;
$startTime = time();
// wait for next message, but only for max 3 seconds
while (null === $envelope && $startTime + 3 > time()) {
$envelope = $receiver->get();
}
/** @var RedeliveryStamp|null $retryStamp */
// verify the stamp still exists from the last send
$retryStamp = $envelope->last(RedeliveryStamp::class);
$this->assertNotNull($retryStamp);
$this->assertSame(1, $retryStamp->getRetryCount());
// should have a 2 second delay
$this->assertGreaterThanOrEqual($startTime + 2, time());
// but only a 2 second delay
$this->assertLessThan($startTime + 4, time());
$receiver->ack($envelope);
/** @var RedeliveryStamp|null $retryStamp */
// verify the stamp still exists from the last send
$retryStamp = $envelope->last(RedeliveryStamp::class);
$this->assertNotNull($retryStamp);
$this->assertSame(1, $retryStamp->getRetryCount());
return;
}
});
$receiver->ack($envelope);
}
public function testItReceivesSignals()
@@ -175,29 +155,6 @@ public function testItReceivesSignals()
, $process->getOutput());
}
/**
* @runInSeparateProcess
*/
public function testItSupportsTimeoutAndTicksNullMessagesToTheHandler()
{
$serializer = $this->createSerializer();
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'), ['read_timeout' => '1']);
$connection->setup();
$connection->queue()->purge();
$receiver = new AmqpReceiver($connection, $serializer);
$receivedMessages = 0;
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages) {
$this->assertNull($envelope);
if (2 === ++$receivedMessages) {
$receiver->stop();
}
});
}
private function waitForOutput(Process $process, string $output, $timeoutInSeconds = 10)
{
$timedOutTime = time() + $timeoutInSeconds;
@@ -14,9 +14,11 @@
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Serializer as SerializerComponent;
use Symfony\Component\Serializer\Encoder\JsonEncoder;
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
@@ -26,7 +28,7 @@
*/
class AmqpReceiverTest extends TestCase
{
public function testItSendTheDecodedMessageToTheHandler()
public function testItReturnsTheDecodedMessageToTheHandler()
{
$serializer = new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
@@ -37,52 +39,38 @@ public function testItSendTheDecodedMessageToTheHandler()
$connection->method('get')->willReturn($amqpEnvelope);
$receiver = new AmqpReceiver($connection, $serializer);
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
$this->assertEquals(new DummyMessage('Hi'), $envelope->getMessage());
$receiver->stop();
});
$actualEnvelope = $receiver->get();
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelope->getMessage());
}
/**
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
*/
public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage()
{
$serializer = new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
);
$serializer = $this->createMock(SerializerInterface::class);
$amqpEnvelope = $this->createAMQPEnvelope();
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($amqpEnvelope);
$connection->method('ack')->with($amqpEnvelope)->willThrowException(new \AMQPException());
$receiver = new AmqpReceiver($connection, $serializer);
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
$receiver->ack($envelope);
$receiver->stop();
});
$receiver->ack(new Envelope(new \stdClass(), new AmqpReceivedStamp($amqpEnvelope)));
}
/**
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
*/
public function testItThrowsATransportExceptionIfItCannotRejectMessage()
{
$serializer = new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
);
$serializer = $this->createMock(SerializerInterface::class);
$amqpEnvelope = $this->createAMQPEnvelope();
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($amqpEnvelope);
$connection->method('nack')->with($amqpEnvelope, AMQP_NOPARAM)->willThrowException(new \AMQPException());
$receiver = new AmqpReceiver($connection, $serializer);
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
$receiver->reject($envelope);
$receiver->stop();
});
$receiver->reject(new Envelope(new \stdClass(), new AmqpReceivedStamp($amqpEnvelope)));
}
private function createAMQPEnvelope()
@@ -47,11 +47,8 @@ public function testReceivesMessages()
$serializer->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage));
$connection->method('get')->willReturn($amqpEnvelope);
$transport->receive(function (Envelope $envelope) use ($transport, $decodedMessage) {
$this->assertSame($decodedMessage, $envelope->getMessage());
$transport->stop();
});
$envelope = $transport->get();
$this->assertSame($decodedMessage, $envelope->getMessage());
}
private function getTransport(SerializerInterface $serializer = null, Connection $connection = null)

0 comments on commit 2d1f17b

Please sign in to comment.
You can’t perform that action at this time.