Permalink
Browse files

Added multiple consumer queues provider support

  • Loading branch information...
Sergey Chernecov
Sergey Chernecov committed Nov 25, 2014
1 parent d49ead3 commit 05beea18567dd6f1ecc6516d8f0d42e54709cfb4
View
@@ -1,3 +1,7 @@
- 2014-11-27
* Added interface `OldSound\RabbitMqBundle\Provider\QueuesProviderInterface`
* Added `queues_provider` configuration for multiple consumer
- 2014-07-21
* Added `reconnect` method into `OldSound\RabbitMqBundle\RabbitMq\BaseAmqp`
@@ -139,6 +139,7 @@ protected function addMultipleConsumers(ArrayNodeDefinition $node)
->booleanNode('global')->defaultFalse()->end()
->end()
->end()
->scalarNode('queues_provider')->defaultNull()->end()
->end()
->end()
->end()
@@ -243,7 +244,7 @@ protected function getQueueConfiguration()
protected function getMultipleQueuesConfiguration()
{
$node = new ArrayNodeDefinition('queues');
$prototypeNode = $node->requiresAtLeastOneElement()->prototype('array');
$prototypeNode = $node->prototype('array');
$this->addQueueNodeConfiguration($prototypeNode);
@@ -173,6 +173,13 @@ protected function loadMultipleConsumers()
foreach ($this->config['multiple_consumers'] as $key => $consumer) {
$queues = array();
if (empty($consumer['queues']) && empty($consumer['queues_provider'])) {
throw new InvalidConfigurationException(
"Error on loading $key multiple consumer. " .
"Either 'queues' or 'queues_provider' parameters should be defined."
);
}
foreach ($consumer['queues'] as $queueName => $queueOptions) {
$queues[$queueOptions['name']] = $queueOptions;
$queues[$queueOptions['name']]['callback'] = array(new Reference($queueOptions['callback']), 'execute');
@@ -185,6 +192,13 @@ protected function loadMultipleConsumers()
->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
->addMethodCall('setQueues', array($this->normalizeArgumentKeys($queues)));
if ($consumer['queues_provider']) {
$definition->addMethodCall(
'setQueuesProvider',
array(new Reference($consumer['queues_provider']))
);
}
if (array_key_exists('qos_options', $consumer)) {
$definition->addMethodCall('setQosOptions', array(
$consumer['qos_options']['prefetch_size'],
@@ -0,0 +1,18 @@
<?php
namespace OldSound\RabbitMqBundle\Provider;
/**
* Queues provider interface
*
* @author Sergey Chernecov <sergey.chernecov@intexsys.lv>
*/
interface QueuesProviderInterface
{
/**
* Return array of queues
*
* @return array
*/
public function getQueues();
}
View
@@ -487,6 +487,7 @@ multiple_consumers:
upload:
connection: default
exchange_options: {name: 'upload', type: direct}
queues_provider: queues_provider_service
queues:
upload-picture:
name: upload_picture
@@ -508,6 +509,9 @@ All the options of `queues-options` in the consumer are available for each queue
Be aware that all queues are under the same exchange, it's up to you to set the correct routing for callbacks.
The `queues_provider` is a optional service that dynamically provides queues.
It must implement `QueuesProviderInterface`
### Anonymous Consumers ###
Now, why will we ever need anonymous consumers? This sounds like some internet threat or something… Keep reading.
@@ -2,13 +2,34 @@
namespace OldSound\RabbitMqBundle\RabbitMq;
use OldSound\RabbitMqBundle\Provider\QueuesProviderInterface;
use OldSound\RabbitMqBundle\RabbitMq\Exception\QueueNotFoundException;
use PhpAmqpLib\Message\AMQPMessage;
class MultipleConsumer extends Consumer
{
protected $queues = array();
/**
* Queues provider
*
* @var QueuesProviderInterface
*/
protected $queuesProvider = null;
/**
* QueuesProvider setter
*
* @param QueuesProviderInterface $queuesProvider
*
* @return self
*/
public function setQueuesProvider(QueuesProviderInterface $queuesProvider)
{
$this->queuesProvider = $queuesProvider;
return $this;
}
public function getQueueConsumerTag($queue)
{
return sprintf('%s-%s', $this->getConsumerTag(), $queue);
@@ -21,6 +42,8 @@ public function setQueues(array $queues)
protected function setupConsumer()
{
$this->mergeQueues();
if ($this->autoSetupFabric) {
$this->setupFabric();
}
@@ -72,4 +95,17 @@ public function stopConsuming()
$this->getChannel()->basic_cancel($this->getQueueConsumerTag($name));
}
}
/**
* Merges static and provided queues into one array
*/
protected function mergeQueues()
{
if ($this->queuesProvider) {
$this->queues = array_merge(
$this->queues,
$this->queuesProvider->getQueues()
);
}
}
}
@@ -112,6 +112,7 @@ old_sound_rabbit_mq:
- 'android.upload'
- 'iphone.upload'
callback: foo.multiple_test2.callback
queues_provider: foo.queues_provider
anon_consumers:
foo_anon_consumer:
@@ -320,6 +320,12 @@ public function testMultipleConsumerDefinition()
)
)
)
),
array(
'setQueuesProvider',
array(
new Reference('foo.queues_provider')
)
)
),
$definition->getMethodCalls()
Oops, something went wrong.

0 comments on commit 05beea1

Please sign in to comment.