Skip to content

Commit

Permalink
feature #30913 [Messenger] Uses an AmqpStamp to provide flags and a…
Browse files Browse the repository at this point in the history
…ttributes (sroze)

This PR was merged into the 4.3-dev branch.

Discussion
----------

[Messenger] Uses an `AmqpStamp` to provide flags and attributes

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | #28885
| License       | MIT
| Doc PR        | ø

Using the `AmqpStamp` you can configure the flags and any attribute (such as `delivery_mode`).

Commits
-------

56fa574 Uses an `AmqpStamp` to provide flags and attributes
  • Loading branch information
fabpot committed Apr 6, 2019
2 parents 8a62892 + 56fa574 commit 3de3e4e
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 50 deletions.
4 changes: 2 additions & 2 deletions src/Symfony/Component/Messenger/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ CHANGELOG
changed: a required 3rd `SerializerInterface` argument was added.
* Added a new `SyncTransport` along with `ForceCallHandlersStamp` to
explicitly handle messages synchronously.
* Added `AmqpRoutingKeyStamp` allowing to provide a routing key on message publishing.
* Added `AmqpStamp` allowing to provide a routing key, flags and attributes on message publishing.
* [BC BREAK] Removed publishing with a `routing_key` option from queue configuration, for
AMQP. Use exchange `default_publish_routing_key` or `AmqpRoutingKeyStamp` instead.
AMQP. Use exchange `default_publish_routing_key` or `AmqpStamp` instead.
* [BC BREAK] Changed the `queue` option in the AMQP transport DSN to be `queues[name]`. You can
therefore name the queue but also configure `binding_keys`, `flags` and `arguments`.
* [BC BREAK] The methods `get`, `ack`, `nack` and `queue` of the AMQP `Connection`
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpRoutingKeyStamp;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpStamp;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

Expand All @@ -41,14 +41,14 @@ public function testItSendsTheEncodedMessage()

public function testItSendsTheEncodedMessageUsingARoutingKey()
{
$envelope = (new Envelope(new DummyMessage('Oy')))->with(new AmqpRoutingKeyStamp('rk'));
$envelope = (new Envelope(new DummyMessage('Oy')))->with($stamp = new AmqpStamp('rk'));
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];

$serializer = $this->createMock(SerializerInterface::class);
$serializer->method('encode')->with($envelope)->willReturn($encoded);

$connection = $this->createMock(Connection::class);
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 0, 'rk');
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 0, $stamp);

$sender = new AmqpSender($connection, $serializer);
$sender->send($envelope);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;

use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpStamp;

/**
* @requires extension amqp
*/
class AmqpStampTest extends TestCase
{
public function testRoutingKeyOnly()
{
$stamp = new AmqpStamp('routing_key');
$this->assertSame('routing_key', $stamp->getRoutingKey());
$this->assertSame(AMQP_NOPARAM, $stamp->getFlags());
$this->assertSame([], $stamp->getAttributes());
}

public function testFlagsAndAttributes()
{
$stamp = new AmqpStamp(null, AMQP_DURABLE, ['delivery_mode' => 'unknown']);
$this->assertNull($stamp->getRoutingKey());
$this->assertSame(AMQP_DURABLE, $stamp->getFlags());
$this->assertSame(['delivery_mode' => 'unknown'], $stamp->getAttributes());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@

use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpFactory;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpStamp;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;

/**
Expand Down Expand Up @@ -430,7 +432,7 @@ public function testItCanPublishWithASuppliedRoutingKey()
$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key');

$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[default_publish_routing_key]=default_routing_key', [], $factory);
$connection->publish('body', [], 0, 'routing_key');
$connection->publish('body', [], 0, new AmqpStamp('routing_key'));
}

public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument()
Expand Down Expand Up @@ -477,7 +479,27 @@ public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_120000');

$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_120000', AMQP_NOPARAM, ['headers' => []]);
$connection->publish('{}', [], 120000, 'routing_key');
$connection->publish('{}', [], 120000, new AmqpStamp('routing_key'));
}

public function testItCanPublishWithCustomFlagsAndAttributes()
{
$factory = new TestAmqpFactory(
$amqpConnection = $this->createMock(\AMQPConnection::class),
$amqpChannel = $this->createMock(\AMQPChannel::class),
$amqpQueue = $this->createMock(\AMQPQueue::class),
$amqpExchange = $this->createMock(\AMQPExchange::class)
);

$amqpExchange->expects($this->once())->method('publish')->with(
'body',
'routing_key',
AMQP_IMMEDIATE,
['delivery_mode' => 2, 'headers' => ['type' => DummyMessage::class]]
);

$connection = Connection::fromDsn('amqp://localhost/%2f/messages', [], $factory);
$connection->publish('body', ['type' => DummyMessage::class], 0, new AmqpStamp('routing_key', AMQP_IMMEDIATE, ['delivery_mode' => 2]));
}
}

Expand Down
11 changes: 6 additions & 5 deletions src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,12 @@ public function send(Envelope $envelope): Envelope
}

try {
/** @var $routingKeyStamp AmqpRoutingKeyStamp */
$routingKeyStamp = $envelope->last(AmqpRoutingKeyStamp::class);
$routingKey = $routingKeyStamp ? $routingKeyStamp->getRoutingKey() : null;

$this->connection->publish($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delay, $routingKey);
$this->connection->publish(
$encodedMessage['body'],
$encodedMessage['headers'] ?? [],
$delay,
$envelope->last(AmqpStamp::class)
);
} catch (\AMQPException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,35 @@

/**
* @author Guillaume Gammelin <ggammelin@gmail.com>
* @author Samuel Roze <samuel.roze@gmail.com>
*
* @experimental in 4.3
*/
final class AmqpRoutingKeyStamp implements StampInterface
final class AmqpStamp implements StampInterface
{
private $routingKey;
private $flags;
private $attributes;

public function __construct(string $routingKey)
public function __construct(string $routingKey = null, int $flags = AMQP_NOPARAM, array $attributes = [])
{
$this->routingKey = $routingKey;
$this->flags = $flags;
$this->attributes = $attributes;
}

public function getRoutingKey(): string
public function getRoutingKey(): ?string
{
return $this->routingKey;
}

public function getFlags(): int
{
return $this->flags;
}

public function getAttributes(): array
{
return $this->attributes;
}
}
34 changes: 23 additions & 11 deletions src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,10 @@ private static function normalizeQueueArguments(array $arguments): array
*
* @throws \AMQPException
*/
public function publish(string $body, array $headers = [], int $delay = 0, string $routingKey = null): void
public function publish(string $body, array $headers = [], int $delay = 0, AmqpStamp $amqpStamp = null): void
{
if (0 !== $delay) {
$this->publishWithDelay($body, $headers, $delay, $routingKey);
$this->publishWithDelay($body, $headers, $delay, $amqpStamp);

return;
}
Expand All @@ -183,13 +183,14 @@ public function publish(string $body, array $headers = [], int $delay = 0, strin
$this->setup();
}

$this->exchange()->publish(
$this->publishOnExchange(
$this->exchange(),
$body,
$routingKey ?? $this->getDefaultPublishRoutingKey(),
AMQP_NOPARAM,
(null !== $amqpStamp ? $amqpStamp->getRoutingKey() : null) ?? $this->getDefaultPublishRoutingKey(),
[
'headers' => $headers,
]
],
$amqpStamp
);
}

Expand All @@ -206,19 +207,30 @@ public function countMessagesInQueues(): int
/**
* @throws \AMQPException
*/
private function publishWithDelay(string $body, array $headers, int $delay, ?string $exchangeRoutingKey)
private function publishWithDelay(string $body, array $headers, int $delay, AmqpStamp $amqpStamp = null)
{
if ($this->shouldSetup()) {
$this->setupDelay($delay, $exchangeRoutingKey);
$this->setupDelay($delay, null !== $amqpStamp ? $amqpStamp->getRoutingKey() : null);
}

$this->getDelayExchange()->publish(
$this->publishOnExchange(
$this->getDelayExchange(),
$body,
$this->getRoutingKeyForDelay($delay),
AMQP_NOPARAM,
[
'headers' => $headers,
]
],
$amqpStamp
);
}

private function publishOnExchange(\AMQPExchange $exchange, string $body, string $routingKey = null, array $attributes = [], AmqpStamp $amqpStamp = null)
{
$exchange->publish(
$body,
$routingKey,
$amqpStamp ? $amqpStamp->getFlags() : AMQP_NOPARAM,
array_merge($amqpStamp ? $amqpStamp->getAttributes() : [], $attributes)
);
}

Expand Down

0 comments on commit 3de3e4e

Please sign in to comment.