Skip to content
This repository has been archived by the owner on Jul 28, 2022. It is now read-only.

Commit

Permalink
Merge 3.x into master
Browse files Browse the repository at this point in the history
  • Loading branch information
SonataCI committed Dec 21, 2016
2 parents 166fc01 + 7327545 commit 765fa78
Show file tree
Hide file tree
Showing 6 changed files with 390 additions and 61 deletions.
18 changes: 17 additions & 1 deletion Backend/AMQPBackend.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ class AMQPBackend implements BackendInterface
*/
protected $deadLetterExchange;

/**
* @var null|string
*/
protected $deadLetterRoutingKey;

/**
* @var AMQPBackendDispatcher
*/
Expand All @@ -63,14 +68,16 @@ class AMQPBackend implements BackendInterface
* @param string $recover
* @param string $key
* @param string $deadLetterExchange
* @param string $deadLetterRoutingKey
*/
public function __construct($exchange, $queue, $recover, $key, $deadLetterExchange = null)
public function __construct($exchange, $queue, $recover, $key, $deadLetterExchange = null, $deadLetterRoutingKey = null)
{
$this->exchange = $exchange;
$this->queue = $queue;
$this->recover = $recover;
$this->key = $key;
$this->deadLetterExchange = $deadLetterExchange;
$this->deadLetterRoutingKey = $deadLetterRoutingKey;

if (!class_exists('PhpAmqpLib\Message\AMQPMessage')) {
throw new \RuntimeException('Please install php-amqplib/php-amqplib dependency');
Expand All @@ -94,6 +101,10 @@ public function initialize()

if ($this->deadLetterExchange !== null) {
$args['x-dead-letter-exchange'] = array('S', $this->deadLetterExchange);

if ($this->deadLetterRoutingKey !== null) {
$args['x-dead-letter-routing-key'] = array('S', $this->deadLetterRoutingKey);
}
}

/*
Expand All @@ -117,6 +128,11 @@ public function initialize()
$this->getChannel()->exchange_declare($this->exchange, 'direct', false, true, false);

$this->getChannel()->queue_bind($this->queue, $this->exchange, $this->key);

if ($this->deadLetterExchange !== null && $this->deadLetterRoutingKey === null) {
$this->getChannel()->exchange_declare($this->deadLetterExchange, 'direct', false, true, false);
$this->getChannel()->queue_bind($this->queue, $this->deadLetterExchange, $this->key);
}
}

/**
Expand Down
9 changes: 9 additions & 0 deletions Backend/AMQPBackendDispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class AMQPBackendDispatcher extends QueueBackendDispatcher
*/
protected $connection;

protected $backendsInitialized = false;

/**
* @param array $settings
* @param array $queues
Expand Down Expand Up @@ -79,6 +81,13 @@ public function getChannel()
*/
public function getBackend($type)
{
if (!$this->backendsInitialized) {
foreach ($this->backends as $backend) {
$backend['backend']->initialize();
}
$this->backendsInitialized = true;
}

$default = null;

if (count($this->queues) === 0) {
Expand Down
13 changes: 13 additions & 0 deletions DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,15 @@ protected function getQueueNode()
Only used by RabbitMQ
If is set, failed messages will be rejected and sent to this exchange
EOF;

$deadLetterRoutingKey = <<<'EOF'
Only used by RabbitMQ
If set, failed messages will be routed to the queue using this key by dead-letter-exchange,
otherwise it will be requeued to the original queue if `dead-letter-exchange` is set.
If set, the queue must be configured with this key as `routing_key`.
EOF;

$typesInfo = <<<'EOF'
Expand Down Expand Up @@ -250,6 +259,10 @@ protected function getQueueNode()
->info($deadLetterExchangeInfo)
->defaultValue(null)
->end()
->scalarNode('dead_letter_routing_key')
->info($deadLetterRoutingKey)
->defaultValue(null)
->end()

// Database configuration (Doctrine)
->arrayNode('types')
Expand Down
88 changes: 84 additions & 4 deletions DependencyInjection/SonataNotificationExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ protected function configureRabbitmq(ContainerBuilder $container, array $config)
{
$queues = $config['queues'];
$connection = $config['backends']['rabbitmq']['connection'];
$exchange = $config['backends']['rabbitmq']['exchange'];
$baseExchange = $config['backends']['rabbitmq']['exchange'];
$amqBackends = array();

if (count($queues) == 0) {
Expand All @@ -329,9 +329,21 @@ protected function configureRabbitmq(ContainerBuilder $container, array $config)
'routing_key' => '',
'recover' => false,
'dead_letter_exchange' => null,
'dead_letter_routing_key' => null,
));
}

$deadLetterRoutingKeys = $this->getQueuesParameters('dead_letter_routing_key', $queues);
$routingKeys = $this->getQueuesParameters('routing_key', $queues);

foreach ($deadLetterRoutingKeys as $key) {
if (!in_array($key, $routingKeys)) {
throw new \RuntimeException(sprintf(
'You must configure the queue having the routing_key "%s" same as dead_letter_routing_key', $key
));
}
}

$declaredQueues = array();

$defaultSet = false;
Expand All @@ -342,7 +354,29 @@ protected function configureRabbitmq(ContainerBuilder $container, array $config)

$declaredQueues[] = $queue['queue'];

$id = $this->createAMQPBackend($container, $exchange, $queue['queue'], $queue['recover'], $queue['routing_key'], $queue['dead_letter_exchange']);
if ($queue['dead_letter_routing_key']) {
if (is_null($queue['dead_letter_exchange'])) {
throw new \RuntimeException(
'dead_letter_exchange must be configured when dead_letter_routing_key is set'
);
}
}

if (in_array($queue['routing_key'], $deadLetterRoutingKeys)) {
$exchange = $this->getAMQPDeadLetterExchangeByRoutingKey($queue['routing_key'], $queues);
} else {
$exchange = $baseExchange;
}

$id = $this->createAMQPBackend(
$container,
$exchange,
$queue['queue'],
$queue['recover'],
$queue['routing_key'],
$queue['dead_letter_exchange'],
$queue['dead_letter_routing_key']
);

$amqBackends[$pos] = array(
'type' => $queue['routing_key'],
Expand Down Expand Up @@ -377,17 +411,63 @@ protected function configureRabbitmq(ContainerBuilder $container, array $config)
* @param string $recover
* @param string $key
* @param string $deadLetterExchange
* @param string $deadLetterRoutingKey
*
* @return string
*/
protected function createAMQPBackend(ContainerBuilder $container, $exchange, $name, $recover, $key = '', $deadLetterExchange = null)
protected function createAMQPBackend(ContainerBuilder $container, $exchange, $name, $recover, $key = '', $deadLetterExchange = null, $deadLetterRoutingKey = null)
{
$id = 'sonata.notification.backend.rabbitmq.'.$this->amqpCounter++;

$definition = new Definition('Sonata\NotificationBundle\Backend\AMQPBackend', array($exchange, $name, $recover, $key, $deadLetterExchange));
$definition = new Definition(
'Sonata\NotificationBundle\Backend\AMQPBackend',
array(
$exchange,
$name,
$recover,
$key,
$deadLetterExchange,
$deadLetterRoutingKey,
)
);
$definition->setPublic(false);
$container->setDefinition($id, $definition);

return $id;
}

/**
* @param string $name
* @param array $queues
*
* @return string[]
*/
private function getQueuesParameters($name, array $queues)
{
$params = array_unique(array_map(function ($q) use ($name) {
return $q[$name];
}, $queues));

$idx = array_search(null, $params);
if ($idx !== false) {
unset($params[$idx]);
}

return $params;
}

/**
* @param string $key
* @param array $queues
*
* @return string
*/
private function getAMQPDeadLetterExchangeByRoutingKey($key, array $queues)
{
foreach ($queues as $queue) {
if ($queue['dead_letter_routing_key'] === $key) {
return $queue['dead_letter_exchange'];
}
}
}
}
119 changes: 119 additions & 0 deletions Tests/Backend/AMQPBackendDispatcherTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
<?php

/*
* This file is part of the Sonata Project package.
*
* (c) Thomas Rabaix <thomas.rabaix@sonata-project.org>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Sonata\NotificationBundle\Tests\Backend;

use Sonata\NotificationBundle\Backend\AMQPBackendDispatcher;

class AMQPBackendDispatcherTest extends \PHPUnit_Framework_TestCase
{
protected function setUp()
{
if (!class_exists('PhpAmqpLib\Message\AMQPMessage')) {
$this->markTestSkipped('AMQP Lib not installed');
}
}

public function testQueue()
{
$mock = $this->getMockQueue('foo', 'message.type.foo', $this->once());
$mock2 = $this->getMockQueue('bar', 'message.type.foo', $this->never());
$fooBackend = array('type' => 'message.type.foo', 'backend' => $mock);
$barBackend = array('type' => 'message.type.bar', 'backend' => $mock2);
$backends = array($fooBackend, $barBackend);
$dispatcher = $this->getDispatcher($backends);
$dispatcher->createAndPublish('message.type.foo', array());
}

public function testDefaultQueue()
{
$mock = $this->getMockQueue('foo', 'message.type.foo', $this->once());
$fooBackend = array('type' => 'default', 'backend' => $mock);
$dispatcher = $this->getDispatcher(array($fooBackend));
$dispatcher->createAndPublish('some.other.type', array());
}

public function testDefaultQueueNotFound()
{
$mock = $this->getMockQueue('foo', 'message.type.foo', $this->never());
$fooBackend = array('type' => 'message.type.foo', 'backend' => $mock);
$dispatcher = $this->getDispatcher(array($fooBackend));

$this->setExpectedException('\Sonata\NotificationBundle\Exception\BackendNotFoundException');
$dispatcher->createAndPublish('some.other.type', array());
}

public function testInvalidQueue()
{
$mock = $this->getMockQueue('foo', 'message.type.bar');
$dispatcher = $this->getDispatcher(
array(array('type' => 'bar', 'backend' => $mock)),
array(array('queue' => 'foo', 'routing_key' => 'message.type.bar'))
);

$this->setExpectedException('\Sonata\NotificationBundle\Exception\BackendNotFoundException');
$dispatcher->createAndPublish('message.type.bar', array());
}

public function testAllQueueInitializeOnce()
{
$queues = array(
array('queue' => 'foo', 'routing_key' => 'message.type.foo'),
array('queue' => 'bar', 'routing_key' => 'message.type.bar'),
array('queue' => 'baz', 'routing_key' => 'message.type.baz'),
);

$backends = array();

foreach ($queues as $queue) {
$mock = $this->getMockQueue($queue['queue'], $queue['routing_key']);
$mock->expects($this->once())
->method('initialize');
$backends[] = array('type' => $queue['routing_key'], 'backend' => $mock);
}

$dispatcher = $this->getDispatcher($backends, $queues);

$dispatcher->createAndPublish('message.type.foo', array());
$dispatcher->createAndPublish('message.type.foo', array());
}

protected function getMockQueue($queue, $type, $called = null)
{
$methods = array('createAndPublish', 'initialize');
$args = array('', 'foo', false, 'message.type.foo');
$mock = $this->getMockBuilder('Sonata\NotificationBundle\Backend\AMQPBackend')
->setConstructorArgs($args)
->setMethods($methods)
->getMock();

if ($called !== null) {
$mock->expects($called)
->method('createAndPublish')
;
}

return $mock;
}

protected function getDispatcher(array $backends, array $queues = array(array('queue' => 'foo', 'routing_key' => 'message.type.foo')))
{
$settings = array(
'host' => 'foo',
'port' => 'port',
'user' => 'user',
'pass' => 'pass',
'vhost' => '/',
);

return new AMQPBackendDispatcher($settings, $queues, 'default', $backends);
}
}

0 comments on commit 765fa78

Please sign in to comment.