Skip to content

Commit

Permalink
Changes ReceiverInterface::handle() to get() to give more control to …
Browse files Browse the repository at this point in the history
…Worker
  • Loading branch information
weaverryan committed Mar 27, 2019
1 parent 76260e7 commit 357cdb9
Show file tree
Hide file tree
Showing 25 changed files with 609 additions and 651 deletions.
2 changes: 1 addition & 1 deletion src/Symfony/Component/Messenger/CHANGELOG.md
Expand Up @@ -10,7 +10,7 @@ CHANGELOG
the transport. See `ConsumeMessagesCommand`.
* The optional `$busNames` constructor argument of the class `ConsumeMessagesCommand` was removed.
* [BC BREAK] 2 new methods were added to `ReceiverInterface`:
`ack()` and `reject()`.
`ack()` and `reject()` and `receive()` was changed to `get()`.
* [BC BREAK] Error handling was moved from the receivers into
`Worker`. Implementations of `ReceiverInterface::handle()`
should now allow all exceptions to be thrown, except for transport
Expand Down
Expand Up @@ -22,10 +22,10 @@
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Messenger\RoutableMessageBus;
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMemoryUsageIsExceededReceiver;
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMessageCountIsExceededReceiver;
use Symfony\Component\Messenger\Transport\Receiver\StopWhenTimeLimitIsReachedReceiver;
use Symfony\Component\Messenger\Worker;
use Symfony\Component\Messenger\Worker\StopWhenMemoryUsageIsExceededWorker;
use Symfony\Component\Messenger\Worker\StopWhenMessageCountIsExceededWorker;
use Symfony\Component\Messenger\Worker\StopWhenTimeLimitIsReachedWorker;

/**
* @author Samuel Roze <samuel.roze@gmail.com>
Expand Down Expand Up @@ -152,20 +152,21 @@ protected function execute(InputInterface $input, OutputInterface $output): void
$bus = new RoutableMessageBus($this->busLocator);
}

$worker = new Worker($receiver, $bus, $receiverName, $retryStrategy, $this->eventDispatcher, $this->logger);
$stopsWhen = [];
if ($limit = $input->getOption('limit')) {
$stopsWhen[] = "processed {$limit} messages";
$receiver = new StopWhenMessageCountIsExceededReceiver($receiver, $limit, $this->logger);
$worker = new StopWhenMessageCountIsExceededWorker($worker, $limit, $this->logger);
}

if ($memoryLimit = $input->getOption('memory-limit')) {
$stopsWhen[] = "exceeded {$memoryLimit} of memory";
$receiver = new StopWhenMemoryUsageIsExceededReceiver($receiver, $this->convertToBytes($memoryLimit), $this->logger);
$worker = new StopWhenMemoryUsageIsExceededWorker($worker, $this->convertToBytes($memoryLimit), $this->logger);
}

if ($timeLimit = $input->getOption('time-limit')) {
$stopsWhen[] = "been running for {$timeLimit}s";
$receiver = new StopWhenTimeLimitIsReachedReceiver($receiver, $timeLimit, $this->logger);
$worker = new StopWhenTimeLimitIsReachedWorker($worker, $timeLimit, $this->logger);
}

$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
Expand All @@ -183,7 +184,6 @@ protected function execute(InputInterface $input, OutputInterface $output): void
$io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
}

$worker = new Worker($receiver, $bus, $receiverName, $retryStrategy, $this->eventDispatcher, $this->logger);
$worker->run();
}

Expand Down
Expand Up @@ -595,11 +595,9 @@ public function __invoke(DummyMessage $message): void

class DummyReceiver implements ReceiverInterface
{
public function receive(callable $handler): void
public function get(string $queue = null): ?Envelope
{
for ($i = 0; $i < 3; ++$i) {
$handler(new Envelope(new DummyMessage("Dummy $i")));
}
return new Envelope(new DummyMessage('Dummy'));
}

public function stop(): void
Expand Down

This file was deleted.

46 changes: 46 additions & 0 deletions src/Symfony/Component/Messenger/Tests/Fixtures/DummyWorker.php
@@ -0,0 +1,46 @@
<?php

namespace Symfony\Component\Messenger\Tests\Fixtures;

use Symfony\Component\Messenger\WorkerInterface;

class DummyWorker implements WorkerInterface
{
private $isStopped = false;
private $envelopesToReceive;
private $envelopesHandled = 0;

public function __construct(array $envelopesToReceive)
{
$this->envelopesToReceive = $envelopesToReceive;
}

public function run(callable $onHandledCallback = null): void
{
foreach ($this->envelopesToReceive as $envelope) {
if (true === $this->isStopped) {
break;
}

if ($onHandledCallback) {
$onHandledCallback($envelope);
++$this->envelopesHandled;
}
}
}

public function stop(): void
{
$this->isStopped = true;
}

public function isStopped(): bool
{
return $this->isStopped;
}

public function countEnvelopesHandled()
{
return $this->envelopesHandled;
}
}
Expand Up @@ -57,16 +57,14 @@ public function testItSendsAndReceivesMessages()
$sender->send($first = new Envelope(new DummyMessage('First')));
$sender->send($second = new Envelope(new DummyMessage('Second')));

$receivedMessages = 0;
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages, $first, $second) {
$expectedEnvelope = 0 === $receivedMessages ? $first : $second;
$this->assertEquals($expectedEnvelope->getMessage(), $envelope->getMessage());
$this->assertInstanceOf(AmqpReceivedStamp::class, $envelope->last(AmqpReceivedStamp::class));

if (2 === ++$receivedMessages) {
$receiver->stop();
}
});
$envelope = $receiver->get();
$this->assertEquals($first->getMessage(), $envelope->getMessage());
$this->assertInstanceOf(AmqpReceivedStamp::class, $envelope->last(AmqpReceivedStamp::class));

$envelope = $receiver->get();
$this->assertEquals($envelope->getMessage(), $envelope->getMessage());

$this->assertNull($receiver->get());
}

public function testRetryAndDelay()
Expand All @@ -82,50 +80,32 @@ public function testRetryAndDelay()

$sender->send($first = new Envelope(new DummyMessage('First')));

$receivedMessages = 0;
$startTime = time();
$receiver->receive(function (?Envelope $envelope) use ($receiver, $sender, &$receivedMessages, $startTime) {
if (null === $envelope) {
// if we have been processing for 4 seconds + have received 2 messages
// then it's safe to say no other messages will be received
if (time() > $startTime + 4 && 2 === $receivedMessages) {
$receiver->stop();
}

return;
}

++$receivedMessages;

// retry the first time
if (1 === $receivedMessages) {
// imitate what Worker does
$envelope = $envelope
->with(new DelayStamp(2000))
->with(new RedeliveryStamp(1, 'not_important'));
$sender->send($envelope);
$receiver->ack($envelope);
$envelope = $receiver->get();
$newEnvelope = $envelope
->with(new DelayStamp(2000))
->with(new RedeliveryStamp(1, 'not_important'));
$sender->send($newEnvelope);
$receiver->ack($envelope);

return;
}

if (2 === $receivedMessages) {
// should have a 2 second delay
$this->assertGreaterThanOrEqual($startTime + 2, time());
// but only a 2 second delay
$this->assertLessThan($startTime + 4, time());
$envelope = null;
$startTime = time();
// wait for next message, but only for max 3 seconds
while (null === $envelope && $startTime + 3 > time()) {
$envelope = $receiver->get();
}

/** @var RedeliveryStamp|null $retryStamp */
// verify the stamp still exists from the last send
$retryStamp = $envelope->last(RedeliveryStamp::class);
$this->assertNotNull($retryStamp);
$this->assertSame(1, $retryStamp->getRetryCount());
// should have a 2 second delay
$this->assertGreaterThanOrEqual($startTime + 2, time());
// but only a 2 second delay
$this->assertLessThan($startTime + 4, time());

$receiver->ack($envelope);
/** @var RedeliveryStamp|null $retryStamp */
// verify the stamp still exists from the last send
$retryStamp = $envelope->last(RedeliveryStamp::class);
$this->assertNotNull($retryStamp);
$this->assertSame(1, $retryStamp->getRetryCount());

return;
}
});
$receiver->ack($envelope);
}

public function testItReceivesSignals()
Expand Down Expand Up @@ -175,29 +155,6 @@ public function testItReceivesSignals()
, $process->getOutput());
}

/**
* @runInSeparateProcess
*/
public function testItSupportsTimeoutAndTicksNullMessagesToTheHandler()
{
$serializer = $this->createSerializer();

$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'), ['read_timeout' => '1']);
$connection->setup();
$connection->queue()->purge();

$receiver = new AmqpReceiver($connection, $serializer);

$receivedMessages = 0;
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages) {
$this->assertNull($envelope);

if (2 === ++$receivedMessages) {
$receiver->stop();
}
});
}

private function waitForOutput(Process $process, string $output, $timeoutInSeconds = 10)
{
$timedOutTime = time() + $timeoutInSeconds;
Expand Down
Expand Up @@ -14,9 +14,11 @@
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Serializer as SerializerComponent;
use Symfony\Component\Serializer\Encoder\JsonEncoder;
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
Expand All @@ -26,7 +28,7 @@
*/
class AmqpReceiverTest extends TestCase
{
public function testItSendTheDecodedMessageToTheHandler()
public function testItReturnsTheDecodedMessageToTheHandler()
{
$serializer = new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
Expand All @@ -37,52 +39,38 @@ public function testItSendTheDecodedMessageToTheHandler()
$connection->method('get')->willReturn($amqpEnvelope);

$receiver = new AmqpReceiver($connection, $serializer);
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
$this->assertEquals(new DummyMessage('Hi'), $envelope->getMessage());
$receiver->stop();
});
$actualEnvelope = $receiver->get();
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelope->getMessage());
}

/**
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
*/
public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage()
{
$serializer = new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
);

$serializer = $this->createMock(SerializerInterface::class);
$amqpEnvelope = $this->createAMQPEnvelope();
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($amqpEnvelope);
$connection->method('ack')->with($amqpEnvelope)->willThrowException(new \AMQPException());

$receiver = new AmqpReceiver($connection, $serializer);
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
$receiver->ack($envelope);
$receiver->stop();
});
$receiver->ack(new Envelope(new \stdClass(), new AmqpReceivedStamp($amqpEnvelope)));
}

/**
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
*/
public function testItThrowsATransportExceptionIfItCannotRejectMessage()
{
$serializer = new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
);

$serializer = $this->createMock(SerializerInterface::class);
$amqpEnvelope = $this->createAMQPEnvelope();
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($amqpEnvelope);
$connection->method('nack')->with($amqpEnvelope, AMQP_NOPARAM)->willThrowException(new \AMQPException());

$receiver = new AmqpReceiver($connection, $serializer);
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
$receiver->reject($envelope);
$receiver->stop();
});
$receiver->reject(new Envelope(new \stdClass(), new AmqpReceivedStamp($amqpEnvelope)));
}

private function createAMQPEnvelope()
Expand Down
Expand Up @@ -47,11 +47,8 @@ public function testReceivesMessages()
$serializer->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage));
$connection->method('get')->willReturn($amqpEnvelope);

$transport->receive(function (Envelope $envelope) use ($transport, $decodedMessage) {
$this->assertSame($decodedMessage, $envelope->getMessage());

$transport->stop();
});
$envelope = $transport->get();
$this->assertSame($decodedMessage, $envelope->getMessage());
}

private function getTransport(SerializerInterface $serializer = null, Connection $connection = null)
Expand Down

0 comments on commit 357cdb9

Please sign in to comment.