Skip to content

Commit

Permalink
bug #32631 [Messenger] expire delay queue and fix auto_setup logic (T…
Browse files Browse the repository at this point in the history
…obion)

This PR was merged into the 4.3 branch.

Discussion
----------

[Messenger] expire delay queue and fix auto_setup logic

| Q             | A
| ------------- | ---
| Branch?       | 4.3
| Bug fix?      | yes
| New feature?  | yes
| BC breaks?    | no     <!-- see https://symfony.com/bc -->
| Deprecations? | no <!-- please update UPGRADE-*.md and src/**/CHANGELOG.md files -->
| Tests pass?   | yes    <!-- please add some, will be required by reviewers -->
| Fixed tickets | #32588
| License       | MIT
| Doc PR        |

Tested successfully

Commits
-------

7aee83a [Messenger] expire delay queue and fix auto_setup logic
  • Loading branch information
Tobion committed Jul 28, 2019
2 parents 6003608 + 7aee83a commit d7d6d92
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 63 deletions.
Expand Up @@ -308,7 +308,7 @@ public function testSetChannelPrefetchWhenSetup()
);

// makes sure the channel looks connected, so it's not re-created
$amqpChannel->expects($this->exactly(2))->method('isConnected')->willReturn(true);
$amqpChannel->expects($this->any())->method('isConnected')->willReturn(true);

$amqpChannel->expects($this->exactly(2))->method('setPrefetchCount')->with(2);
$connection = Connection::fromDsn('amqp://localhost?prefetch_count=2', [], $factory);
Expand All @@ -317,30 +317,57 @@ public function testSetChannelPrefetchWhenSetup()
$connection->setup();
}

public function testItDelaysTheMessage()
public function testAutoSetupWithDelayDeclaresExchangeQueuesAndDelay()
{
$amqpConnection = $this->createMock(\AMQPConnection::class);
$amqpChannel = $this->createMock(\AMQPChannel::class);
$delayQueue = $this->createMock(\AMQPQueue::class);

$factory = $this->createMock(AmqpFactory::class);
$factory->method('createConnection')->willReturn($amqpConnection);
$factory->method('createChannel')->willReturn($amqpChannel);
$factory->method('createQueue')->willReturn($delayQueue);
$factory->method('createQueue')->will($this->onConsecutiveCalls(
$amqpQueue = $this->createMock(\AMQPQueue::class),
$delayQueue = $this->createMock(\AMQPQueue::class)
));
$factory->method('createExchange')->will($this->onConsecutiveCalls(
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(),
$delayExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
$amqpExchange = $this->createMock(\AMQPExchange::class),
$delayExchange = $this->createMock(\AMQPExchange::class)
));

$amqpExchange->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME);
$amqpExchange->expects($this->once())->method('declareExchange');
$amqpQueue->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME);
$amqpQueue->expects($this->once())->method('declareQueue');

$delayExchange->expects($this->once())->method('setName')->with('delay');
$delayExchange->expects($this->once())->method('declareExchange');
$delayExchange->expects($this->once())->method('publish');

$connection = Connection::fromDsn('amqp://localhost', [], $factory);
$connection->publish('{}', ['x-some-headers' => 'foo'], 5000);
}

public function testItDelaysTheMessage()
{
$amqpConnection = $this->createMock(\AMQPConnection::class);
$amqpChannel = $this->createMock(\AMQPChannel::class);

$factory = $this->createMock(AmqpFactory::class);
$factory->method('createConnection')->willReturn($amqpConnection);
$factory->method('createChannel')->willReturn($amqpChannel);
$factory->method('createQueue')->will($this->onConsecutiveCalls(
$this->createMock(\AMQPQueue::class),
$delayQueue = $this->createMock(\AMQPQueue::class)
));
$factory->method('createExchange')->will($this->onConsecutiveCalls(
$this->createMock(\AMQPExchange::class),
$delayExchange = $this->createMock(\AMQPExchange::class)
));

$delayQueue->expects($this->once())->method('setName')->with('delay_queue_messages__5000');
$delayQueue->expects($this->once())->method('setName')->with('delay_messages__5000');
$delayQueue->expects($this->once())->method('setArguments')->with([
'x-message-ttl' => 5000,
'x-expires' => 5000 + 10000,
'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME,
'x-dead-letter-routing-key' => '',
]);
Expand All @@ -358,23 +385,19 @@ public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs()
{
$amqpConnection = $this->createMock(\AMQPConnection::class);
$amqpChannel = $this->createMock(\AMQPChannel::class);
$delayQueue = $this->createMock(\AMQPQueue::class);

$factory = $this->createMock(AmqpFactory::class);
$factory->method('createConnection')->willReturn($amqpConnection);
$factory->method('createChannel')->willReturn($amqpChannel);
$factory->method('createQueue')->willReturn($delayQueue);
$factory->method('createQueue')->will($this->onConsecutiveCalls(
$this->createMock(\AMQPQueue::class),
$delayQueue = $this->createMock(\AMQPQueue::class)
));
$factory->method('createExchange')->will($this->onConsecutiveCalls(
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(),
$delayExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
$this->createMock(\AMQPExchange::class),
$delayExchange = $this->createMock(\AMQPExchange::class)
));

$amqpExchange->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME);
$amqpExchange->expects($this->once())->method('declareExchange');

$delayExchange->expects($this->once())->method('setName')->with('delay');
$delayExchange->expects($this->once())->method('declareExchange');

$connectionOptions = [
'retry' => [
'dead_routing_key' => 'my_dead_routing_key',
Expand All @@ -383,9 +406,10 @@ public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs()

$connection = Connection::fromDsn('amqp://localhost', $connectionOptions, $factory);

$delayQueue->expects($this->once())->method('setName')->with('delay_queue_messages__120000');
$delayQueue->expects($this->once())->method('setName')->with('delay_messages__120000');
$delayQueue->expects($this->once())->method('setArguments')->with([
'x-message-ttl' => 120000,
'x-expires' => 120000 + 10000,
'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME,
'x-dead-letter-routing-key' => '',
]);
Expand Down Expand Up @@ -467,23 +491,19 @@ public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument
{
$amqpConnection = $this->createMock(\AMQPConnection::class);
$amqpChannel = $this->createMock(\AMQPChannel::class);
$delayQueue = $this->createMock(\AMQPQueue::class);

$factory = $this->createMock(AmqpFactory::class);
$factory->method('createConnection')->willReturn($amqpConnection);
$factory->method('createChannel')->willReturn($amqpChannel);
$factory->method('createQueue')->willReturn($delayQueue);
$factory->method('createQueue')->will($this->onConsecutiveCalls(
$this->createMock(\AMQPQueue::class),
$delayQueue = $this->createMock(\AMQPQueue::class)
));
$factory->method('createExchange')->will($this->onConsecutiveCalls(
$amqpExchange = $this->createMock(\AMQPExchange::class),
$this->createMock(\AMQPExchange::class),
$delayExchange = $this->createMock(\AMQPExchange::class)
));

$amqpExchange->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME);
$amqpExchange->expects($this->once())->method('declareExchange');

$delayExchange->expects($this->once())->method('setName')->with('delay');
$delayExchange->expects($this->once())->method('declareExchange');

$connectionOptions = [
'retry' => [
'dead_routing_key' => 'my_dead_routing_key',
Expand All @@ -492,9 +512,10 @@ public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument

$connection = Connection::fromDsn('amqp://localhost', $connectionOptions, $factory);

$delayQueue->expects($this->once())->method('setName')->with('delay_queue_messages_routing_key_120000');
$delayQueue->expects($this->once())->method('setName')->with('delay_messages_routing_key_120000');
$delayQueue->expects($this->once())->method('setArguments')->with([
'x-message-ttl' => 120000,
'x-expires' => 120000 + 10000,
'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME,
'x-dead-letter-routing-key' => 'routing_key',
]);
Expand Down
Expand Up @@ -45,10 +45,7 @@ public function send(Envelope $envelope): Envelope

/** @var DelayStamp|null $delayStamp */
$delayStamp = $envelope->last(DelayStamp::class);
$delay = 0;
if (null !== $delayStamp) {
$delay = $delayStamp->getDelay();
}
$delay = $delayStamp ? $delayStamp->getDelay() : 0;

$amqpStamp = $envelope->last(AmqpStamp::class);
if (isset($encodedMessage['headers']['Content-Type'])) {
Expand Down
64 changes: 33 additions & 31 deletions src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php
Expand Up @@ -62,9 +62,8 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar
{
$this->connectionOptions = array_replace_recursive([
'delay' => [
'routing_key_pattern' => 'delay_%exchange_name%_%routing_key%_%delay%',
'exchange_name' => 'delay',
'queue_name_pattern' => 'delay_queue_%exchange_name%_%routing_key%_%delay%',
'queue_name_pattern' => 'delay_%exchange_name%_%routing_key%_%delay%',
],
], $connectionOptions);
$this->exchangeOptions = $exchangeOptions;
Expand Down Expand Up @@ -93,9 +92,8 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar
* * flags: Exchange flags (Default: AMQP_DURABLE)
* * arguments: Extra arguments
* * delay:
* * routing_key_pattern: The pattern of the routing key (Default: "delay_%exchange_name%_%routing_key%_%delay%")
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_queue_%exchange_name%_%routing_key%_%delay%")
* * exchange_name: Name of the exchange to be used for the retried messages (Default: "delay")
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_%exchange_name%_%routing_key%_%delay%")
* * exchange_name: Name of the exchange to be used for the delayed/retried messages (Default: "delay")
* * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true)
* * prefetch_count: set channel prefetch count
*/
Expand Down Expand Up @@ -171,20 +169,20 @@ private static function normalizeQueueArguments(array $arguments): array
}

/**
* @param int $delay The delay in milliseconds
*
* @throws \AMQPException
*/
public function publish(string $body, array $headers = [], int $delay = 0, AmqpStamp $amqpStamp = null): void
public function publish(string $body, array $headers = [], int $delayInMs = 0, AmqpStamp $amqpStamp = null): void
{
if (0 !== $delay) {
$this->publishWithDelay($body, $headers, $delay, $amqpStamp);
$this->clearWhenDisconnected();

if (0 !== $delayInMs) {
$this->publishWithDelay($body, $headers, $delayInMs, $amqpStamp);

return;
}

if ($this->shouldSetup()) {
$this->setup();
$this->setupExchangeAndQueues();
}

$this->publishOnExchange(
Expand Down Expand Up @@ -213,9 +211,7 @@ private function publishWithDelay(string $body, array $headers, int $delay, Amqp
{
$routingKey = $this->getRoutingKeyForMessage($amqpStamp);

if ($this->shouldSetup()) {
$this->setupDelay($delay, $routingKey);
}
$this->setupDelay($delay, $routingKey);

$this->publishOnExchange(
$this->getDelayExchange(),
Expand All @@ -241,15 +237,12 @@ private function publishOnExchange(\AMQPExchange $exchange, string $body, string

private function setupDelay(int $delay, ?string $routingKey)
{
if (!$this->channel()->isConnected()) {
$this->clear();
if ($this->shouldSetup()) {
$this->setup(); // setup delay exchange and normal exchange for delay queue to DLX messages to
}

$this->exchange()->declareExchange(); // setup normal exchange for delay queue to DLX messages to
$this->getDelayExchange()->declareExchange();

$queue = $this->createDelayQueue($delay, $routingKey);
$queue->declareQueue();
$queue->declareQueue(); // the delay queue always need to be declared because the name is dynamic and cannot be declared in advance
$queue->bind($this->connectionOptions['delay']['exchange_name'], $this->getRoutingKeyForDelay($delay, $routingKey));
}

Expand Down Expand Up @@ -283,6 +276,9 @@ private function createDelayQueue(int $delay, ?string $routingKey)
));
$queue->setArguments([
'x-message-ttl' => $delay,
// delete the delay queue 10 seconds after the message expires
// publishing another message redeclares the queue which renews the lease
'x-expires' => $delay + 10000,
'x-dead-letter-exchange' => $this->exchangeOptions['name'],
// after being released from to DLX, make sure the original routing key will be used
// we must use an empty string instead of null for the argument to be picked up
Expand All @@ -297,7 +293,7 @@ private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey): st
return str_replace(
['%delay%', '%exchange_name%', '%routing_key%'],
[$delay, $this->exchangeOptions['name'], $finalRoutingKey ?? ''],
$this->connectionOptions['delay']['routing_key_pattern']
$this->connectionOptions['delay']['queue_name_pattern']
);
}

Expand All @@ -308,8 +304,10 @@ private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey): st
*/
public function get(string $queueName): ?\AMQPEnvelope
{
$this->clearWhenDisconnected();

if ($this->shouldSetup()) {
$this->setup();
$this->setupExchangeAndQueues();
}

try {
Expand All @@ -319,7 +317,7 @@ public function get(string $queueName): ?\AMQPEnvelope
} catch (\AMQPQueueException $e) {
if (404 === $e->getCode() && $this->shouldSetup()) {
// If we get a 404 for the queue, it means we need to setup the exchange & queue.
$this->setup();
$this->setupExchangeAndQueues();

return $this->get();
}
Expand All @@ -342,10 +340,12 @@ public function nack(\AMQPEnvelope $message, string $queueName, int $flags = AMQ

public function setup(): void
{
if (!$this->channel()->isConnected()) {
$this->clear();
}
$this->setupExchangeAndQueues();
$this->getDelayExchange()->declareExchange();
}

private function setupExchangeAndQueues(): void
{
$this->exchange()->declareExchange();

foreach ($this->queuesOptions as $queueName => $queueConfig) {
Expand Down Expand Up @@ -424,12 +424,14 @@ public function exchange(): \AMQPExchange
return $this->amqpExchange;
}

private function clear(): void
private function clearWhenDisconnected(): void
{
$this->amqpChannel = null;
$this->amqpQueues = [];
$this->amqpExchange = null;
$this->amqpDelayExchange = null;
if (!$this->channel()->isConnected()) {
$this->amqpChannel = null;
$this->amqpQueues = [];
$this->amqpExchange = null;
$this->amqpDelayExchange = null;
}
}

private function shouldSetup(): bool
Expand Down

0 comments on commit d7d6d92

Please sign in to comment.