Skip to content
Permalink
Browse files

feature #30008 [messenger] Adds a stamp to provide a routing key on m…

…essage publishing (G15N, sroze)

This PR was merged into the 4.3-dev branch.

Discussion
----------

[messenger] Adds a stamp to provide a routing key on message publishing

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | #29950
| License       | MIT
| Doc PR        | symfony/symfony-docs#11236

Adds a stamp allowing to set a `routing_key` at `MessageBus::dispatch()` level.

```php
$message = (new Envelope('message'))->with(new RoutingKeyStamp('routing_key'));
$bus->dispatch($message);
```

Commits
-------

a515635 Simply code and rename "configuration" to "options"
3151b54 [messenger] AMQP configurable routing key & multiple queues
  • Loading branch information...
sroze committed Apr 6, 2019
2 parents fe7363f + a515635 commit e6eb43bd6887c6302efea4d5479215468735be1a
@@ -18,6 +18,13 @@ CHANGELOG
changed: a required 3rd `SerializerInterface` argument was added.
* Added a new `SyncTransport` along with `ForceCallHandlersStamp` to
explicitly handle messages synchronously.
* Added `AmqpRoutingKeyStamp` allowing to provide a routing key on message publishing.
* [BC BREAK] Removed publishing with a `routing_key` option from queue configuration, for
AMQP. Use exchange `default_publish_routing_key` or `AmqpRoutingKeyStamp` instead.
* [BC BREAK] Changed the `queue` option in the AMQP transport DSN to be `queues[name]`. You can
therefore name the queue but also configure `binding_keys`, `flags` and `arguments`.
* [BC BREAK] The methods `get`, `ack`, `nack` and `queue` of the AMQP `Connection`
have a new argument: the queue name.
* Added optional parameter `prefetch_count` in connection configuration,
to setup channel prefetch count.
* New classes: `RoutableMessageBus`, `AddBusNameStampMiddleware`
@@ -71,6 +78,7 @@ CHANGELOG
only. Pass the `auto_setup` connection option to control this.
* Added a `SetupTransportsCommand` command to setup the transports
* Added a Doctrine transport. For example, use the `doctrine://default` DSN (this uses the `default` Doctrine entity manager)
* [BC BREAK] The `getConnectionConfiguration` method on Amqp's `Connection` has been removed.

4.2.0
-----
@@ -49,7 +49,7 @@ public function testItSendsAndReceivesMessages()
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));
$connection->setup();
$connection->queue()->purge();
$connection->purgeQueues();
$sender = new AmqpSender($connection, $serializer);
$receiver = new AmqpReceiver($connection, $serializer);
@@ -79,7 +79,7 @@ public function testRetryAndDelay()
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));
$connection->setup();
$connection->queue()->purge();
$connection->purgeQueues();
$sender = new AmqpSender($connection, $serializer);
$receiver = new AmqpReceiver($connection, $serializer);
@@ -126,7 +126,7 @@ public function testItReceivesSignals()
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));
$connection->setup();
$connection->queue()->purge();
$connection->purgeQueues();
$sender = new AmqpSender($connection, $serializer);
$sender->send(new Envelope(new DummyMessage('Hello')));
@@ -173,7 +173,7 @@ public function testItCountsMessagesInQueue()
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));
$connection->setup();
$connection->queue()->purge();
$connection->purgeQueues();
$sender = new AmqpSender($connection, $serializer);
@@ -182,7 +182,7 @@ public function testItCountsMessagesInQueue()
$sender->send(new Envelope(new DummyMessage('Third')));
sleep(1); // give amqp a moment to have the messages ready
$this->assertSame(3, $connection->countMessagesInQueue());
$this->assertSame(3, $connection->countMessagesInQueues());
}
private function waitForOutput(Process $process, string $output, $timeoutInSeconds = 10)
@@ -0,0 +1,31 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp;
/**
* @requires extension amqp
*/
class AmqpReceivedStampTest extends TestCase
{
public function testStamp()
{
$amqpEnvelope = $this->createMock(\AMQPEnvelope::class);
$stamp = new AmqpReceivedStamp($amqpEnvelope, 'queueName');
$this->assertSame($amqpEnvelope, $stamp->getAmqpEnvelope());
$this->assertSame('queueName', $stamp->getQueueName());
}
}
@@ -36,7 +36,8 @@ public function testItReturnsTheDecodedMessageToTheHandler()
$amqpEnvelope = $this->createAMQPEnvelope();
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($amqpEnvelope);
$connection->method('getQueueNames')->willReturn(['queueName']);
$connection->method('get')->with('queueName')->willReturn($amqpEnvelope);
$receiver = new AmqpReceiver($connection, $serializer);
$actualEnvelopes = iterator_to_array($receiver->get());
@@ -52,11 +53,12 @@ public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage()
$serializer = $this->createMock(SerializerInterface::class);
$amqpEnvelope = $this->createAMQPEnvelope();
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($amqpEnvelope);
$connection->method('ack')->with($amqpEnvelope)->willThrowException(new \AMQPException());
$connection->method('getQueueNames')->willReturn(['queueName']);
$connection->method('get')->with('queueName')->willReturn($amqpEnvelope);
$connection->method('ack')->with($amqpEnvelope, 'queueName')->willThrowException(new \AMQPException());
$receiver = new AmqpReceiver($connection, $serializer);
$receiver->ack(new Envelope(new \stdClass(), [new AmqpReceivedStamp($amqpEnvelope)]));
$receiver->ack(new Envelope(new \stdClass(), [new AmqpReceivedStamp($amqpEnvelope, 'queueName')]));
}
/**
@@ -67,11 +69,12 @@ public function testItThrowsATransportExceptionIfItCannotRejectMessage()
$serializer = $this->createMock(SerializerInterface::class);
$amqpEnvelope = $this->createAMQPEnvelope();
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($amqpEnvelope);
$connection->method('nack')->with($amqpEnvelope, AMQP_NOPARAM)->willThrowException(new \AMQPException());
$connection->method('getQueueNames')->willReturn(['queueName']);
$connection->method('get')->with('queueName')->willReturn($amqpEnvelope);
$connection->method('nack')->with($amqpEnvelope, 'queueName', AMQP_NOPARAM)->willThrowException(new \AMQPException());
$receiver = new AmqpReceiver($connection, $serializer);
$receiver->reject(new Envelope(new \stdClass(), [new AmqpReceivedStamp($amqpEnvelope)]));
$receiver->reject(new Envelope(new \stdClass(), [new AmqpReceivedStamp($amqpEnvelope, 'queueName')]));
}
private function createAMQPEnvelope()
@@ -0,0 +1,24 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpRoutingKeyStamp;
class AmqpRoutingKeyStampTest extends TestCase
{
public function testStamp()
{
$stamp = new AmqpRoutingKeyStamp('routing_key');
$this->assertSame('routing_key', $stamp->getRoutingKey());
}
}
@@ -14,6 +14,7 @@
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpRoutingKeyStamp;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -38,6 +39,21 @@ public function testItSendsTheEncodedMessage()
$sender->send($envelope);
}
public function testItSendsTheEncodedMessageUsingARoutingKey()
{
$envelope = (new Envelope(new DummyMessage('Oy')))->with(new AmqpRoutingKeyStamp('rk'));
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
$serializer = $this->createMock(SerializerInterface::class);
$serializer->method('encode')->with($envelope)->willReturn($encoded);
$connection = $this->createMock(Connection::class);
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 0, 'rk');
$sender = new AmqpSender($connection, $serializer);
$sender->send($envelope);
}
public function testItSendsTheEncodedMessageWithoutHeaders()
{
$envelope = new Envelope(new DummyMessage('Oy'));
@@ -45,7 +45,8 @@ public function testReceivesMessages()
$amqpEnvelope->method('getHeaders')->willReturn(['my' => 'header']);
$serializer->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage));
$connection->method('get')->willReturn($amqpEnvelope);
$connection->method('getQueueNames')->willReturn(['queueName']);
$connection->method('get')->with('queueName')->willReturn($amqpEnvelope);
$envelopes = iterator_to_array($transport->get());
$this->assertSame($decodedMessage, $envelopes[0]->getMessage());

0 comments on commit e6eb43b

Please sign in to comment.
You can’t perform that action at this time.