Skip to content
This repository has been archived by the owner on Jul 28, 2022. It is now read-only.

Commit

Permalink
Added per-queue message TTL in AMQPBackend (#223)
Browse files Browse the repository at this point in the history
  • Loading branch information
shiroko authored and greg0ire committed Dec 22, 2016
1 parent 1484b93 commit 96eafb2
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 55 deletions.
25 changes: 18 additions & 7 deletions Backend/AMQPBackend.php
Original file line number Diff line number Diff line change
Expand Up @@ -57,27 +57,34 @@ class AMQPBackend implements BackendInterface
*/
protected $deadLetterRoutingKey;

/**
* @var null|int
*/
protected $ttl;

/**
* @var AMQPBackendDispatcher
*/
protected $dispatcher = null;

/**
* @param string $exchange
* @param string $queue
* @param string $recover
* @param string $key
* @param string $deadLetterExchange
* @param string $deadLetterRoutingKey
* @param string $exchange
* @param string $queue
* @param string $recover
* @param string $key
* @param string $deadLetterExchange
* @param string $deadLetterRoutingKey
* @param null|int $ttl
*/
public function __construct($exchange, $queue, $recover, $key, $deadLetterExchange = null, $deadLetterRoutingKey = null)
public function __construct($exchange, $queue, $recover, $key, $deadLetterExchange = null, $deadLetterRoutingKey = null, $ttl = null)
{
$this->exchange = $exchange;
$this->queue = $queue;
$this->recover = $recover;
$this->key = $key;
$this->deadLetterExchange = $deadLetterExchange;
$this->deadLetterRoutingKey = $deadLetterRoutingKey;
$this->ttl = $ttl;

if (!class_exists('PhpAmqpLib\Message\AMQPMessage')) {
throw new \RuntimeException('Please install php-amqplib/php-amqplib dependency');
Expand Down Expand Up @@ -107,6 +114,10 @@ public function initialize()
}
}

if ($this->ttl !== null) {
$args['x-message-ttl'] = array('I', $this->ttl);
}

/*
* name: $queue
* passive: false
Expand Down
15 changes: 13 additions & 2 deletions DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -213,13 +213,19 @@ protected function getQueueNode()
If is set, failed messages will be rejected and sent to this exchange
EOF;

$deadLetterRoutingKey = <<<'EOF'
$deadLetterRoutingKeyInfo = <<<'EOF'
Only used by RabbitMQ
If set, failed messages will be routed to the queue using this key by dead-letter-exchange,
otherwise it will be requeued to the original queue if `dead-letter-exchange` is set.
If set, the queue must be configured with this key as `routing_key`.
EOF;

$ttlInfo = <<<'EOF'
Only used by RabbitMQ
Defines the per-queue message time-to-live (milliseconds)
EOF;

$typesInfo = <<<'EOF'
Expand Down Expand Up @@ -260,7 +266,12 @@ protected function getQueueNode()
->defaultValue(null)
->end()
->scalarNode('dead_letter_routing_key')
->info($deadLetterRoutingKey)
->info($deadLetterRoutingKeyInfo)
->defaultValue(null)
->end()
->integerNode('ttl')
->info($ttlInfo)
->min(0)
->defaultValue(null)
->end()

Expand Down
8 changes: 6 additions & 2 deletions DependencyInjection/SonataNotificationExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ protected function configureRabbitmq(ContainerBuilder $container, array $config)
'recover' => false,
'dead_letter_exchange' => null,
'dead_letter_routing_key' => null,
'ttl' => null,
));
}

Expand Down Expand Up @@ -375,7 +376,8 @@ protected function configureRabbitmq(ContainerBuilder $container, array $config)
$queue['recover'],
$queue['routing_key'],
$queue['dead_letter_exchange'],
$queue['dead_letter_routing_key']
$queue['dead_letter_routing_key'],
$queue['ttl']
);

$amqBackends[$pos] = array(
Expand Down Expand Up @@ -412,10 +414,11 @@ protected function configureRabbitmq(ContainerBuilder $container, array $config)
* @param string $key
* @param string $deadLetterExchange
* @param string $deadLetterRoutingKey
* @param int|null $ttl
*
* @return string
*/
protected function createAMQPBackend(ContainerBuilder $container, $exchange, $name, $recover, $key = '', $deadLetterExchange = null, $deadLetterRoutingKey = null)
protected function createAMQPBackend(ContainerBuilder $container, $exchange, $name, $recover, $key = '', $deadLetterExchange = null, $deadLetterRoutingKey = null, $ttl = null)
{
$id = 'sonata.notification.backend.rabbitmq.'.$this->amqpCounter++;

Expand All @@ -428,6 +431,7 @@ protected function createAMQPBackend(ContainerBuilder $container, $exchange, $na
$key,
$deadLetterExchange,
$deadLetterRoutingKey,
$ttl,
)
);
$definition->setPublic(false);
Expand Down
141 changes: 97 additions & 44 deletions Tests/Backend/AMQPBackendTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class AMQPBackendTest extends \PHPUnit_Framework_TestCase
const KEY = 'message.type.foo';
const DEAD_LETTER_EXCHANGE = 'dlx';
const DEAD_LETTER_ROUTING_KEY = 'message.type.dl';
const TTL = 60000;

protected function setUp()
{
Expand All @@ -32,27 +33,30 @@ public function testInitializeWithNoDeadLetterExchangeAndNoDeadLetterRoutingKey(

$channelMock->expects($this->once())
->method('exchange_declare')
->with($this->equalTo(self::EXCHANGE),
$this->equalTo('direct'),
$this->isType('boolean'),
$this->isType('boolean'),
$this->isType('boolean')
->with(
$this->equalTo(self::EXCHANGE),
$this->equalTo('direct'),
$this->isType('boolean'),
$this->isType('boolean'),
$this->isType('boolean')
);
$channelMock->expects($this->once())
->method('queue_declare')
->with($this->equalTo(self::QUEUE),
$this->isType('boolean'),
$this->isType('boolean'),
$this->isType('boolean'),
$this->isType('boolean'),
$this->isType('boolean'),
$this->equalTo(array())
->with(
$this->equalTo(self::QUEUE),
$this->isType('boolean'),
$this->isType('boolean'),
$this->isType('boolean'),
$this->isType('boolean'),
$this->isType('boolean'),
$this->equalTo(array())
);
$channelMock->expects($this->once())
->method('queue_bind')
->with($this->equalTo(self::QUEUE),
$this->equalTo(self::EXCHANGE),
$this->equalTo(self::KEY)
->with(
$this->equalTo(self::QUEUE),
$this->equalTo(self::EXCHANGE),
$this->equalTo(self::KEY)
);

$backend->initialize();
Expand Down Expand Up @@ -82,15 +86,16 @@ public function testInitializeWithDeadLetterExchangeAndNoDeadLetterRoutingKey()
);
$channelMock->expects($this->once())
->method('queue_declare')
->with($this->equalTo(self::QUEUE),
$this->isType('boolean'),
$this->isType('boolean'),
$this->isType('boolean'),
$this->isType('boolean'),
$this->isType('boolean'),
$this->equalTo(array(
'x-dead-letter-exchange' => array('S', self::DEAD_LETTER_EXCHANGE),
))
->with(
$this->equalTo(self::QUEUE),
$this->isType('boolean'),
$this->isType('boolean'),
$this->isType('boolean'),
$this->isType('boolean'),
$this->isType('boolean'),
$this->equalTo(array(
'x-dead-letter-exchange' => array('S', self::DEAD_LETTER_EXCHANGE),
))
);
$channelMock->expects($this->exactly(2))
->method('queue_bind')
Expand All @@ -116,39 +121,87 @@ public function testInitializeWithDeadLetterExchangeAndDeadLetterRoutingKey()

$channelMock->expects($this->once())
->method('exchange_declare')
->with($this->equalTo(self::EXCHANGE),
$this->equalTo('direct'),
$this->isType('boolean'),
$this->isType('boolean'),
$this->isType('boolean')
->with(
$this->equalTo(self::EXCHANGE),
$this->equalTo('direct'),
$this->isType('boolean'),
$this->isType('boolean'),
$this->isType('boolean')
);
$channelMock->expects($this->once())
->method('queue_declare')
->with($this->equalTo(self::QUEUE),
$this->isType('boolean'),
$this->isType('boolean'),
$this->isType('boolean'),
$this->isType('boolean'),
$this->isType('boolean'),
$this->equalTo(array(
'x-dead-letter-exchange' => array('S', self::DEAD_LETTER_EXCHANGE),
'x-dead-letter-routing-key' => array('S', self::DEAD_LETTER_ROUTING_KEY),
))
->with(
$this->equalTo(self::QUEUE),
$this->isType('boolean'),
$this->isType('boolean'),
$this->isType('boolean'),
$this->isType('boolean'),
$this->isType('boolean'),
$this->equalTo(array(
'x-dead-letter-exchange' => array('S', self::DEAD_LETTER_EXCHANGE),
'x-dead-letter-routing-key' => array('S', self::DEAD_LETTER_ROUTING_KEY),
))
);
$channelMock->expects($this->once())
->method('queue_bind')
->with($this->equalTo(self::QUEUE),
$this->equalTo(self::EXCHANGE),
$this->equalTo(self::KEY)
->with(
$this->equalTo(self::QUEUE),
$this->equalTo(self::EXCHANGE),
$this->equalTo(self::KEY)
);

$backend->initialize();
}

public function testInitializeWithTTL()
{
list($backend, $channelMock) = $this->getBackendAndChannelMock(false, null, null, self::TTL);

$channelMock->expects($this->once())
->method('exchange_declare')
->with(
$this->equalTo(self::EXCHANGE),
$this->equalTo('direct'),
$this->isType('boolean'),
$this->isType('boolean'),
$this->isType('boolean')
);
$channelMock->expects($this->once())
->method('queue_declare')
->with(
$this->equalTo(self::QUEUE),
$this->isType('boolean'),
$this->isType('boolean'),
$this->isType('boolean'),
$this->isType('boolean'),
$this->isType('boolean'),
$this->equalTo(array(
'x-message-ttl' => array('I', self::TTL),
))
);
$channelMock->expects($this->once())
->method('queue_bind')
->with(
$this->equalTo(self::QUEUE),
$this->equalTo(self::EXCHANGE),
$this->equalTo(self::KEY)
);

$backend->initialize();
}

protected function getBackendAndChannelMock($recover = false, $deadLetterExchange = null, $deadLetterRoutingKey = null)
protected function getBackendAndChannelMock($recover = false, $deadLetterExchange = null, $deadLetterRoutingKey = null, $ttl = null)
{
$mock = $this->getMockBuilder('\Sonata\NotificationBundle\Backend\AMQPBackend')
->setConstructorArgs(array(self::EXCHANGE, self::QUEUE, $recover, self::KEY, $deadLetterExchange, $deadLetterRoutingKey))
->setConstructorArgs(array(
self::EXCHANGE,
self::QUEUE,
$recover,
self::KEY,
$deadLetterExchange,
$deadLetterRoutingKey,
$ttl,
))
->setMethods(array('getIterator'))
->getMock();

Expand Down

0 comments on commit 96eafb2

Please sign in to comment.