Permalink
Browse files

Add multiple queues consumers

  • Loading branch information...
joelwurtz committed Apr 22, 2014
1 parent 05059f8 commit d85233f6b578e43cbc92fb899fb7d97bc0b8fcde
@@ -0,0 +1,19 @@
<?php
namespace OldSound\RabbitMqBundle\Command;
class MultipleConsumerCommand extends BaseConsumerCommand
{
protected function configure()
{
parent::configure();
$this->setName('rabbitmq:multiple-consumer');
}
protected function getConsumerService()
{
return 'old_sound_rabbit_mq.%s_multiple';
}
}
@@ -20,6 +20,7 @@ public function process(ContainerBuilder $container)
'old_sound_rabbit_mq.base_amqp',
'old_sound_rabbit_mq.producer',
'old_sound_rabbit_mq.consumer',
'old_sound_rabbit_mq.multi_consumer',
'old_sound_rabbit_mq.anon_consumer',
'old_sound_rabbit_mq.rpc_client',
'old_sound_rabbit_mq.rpc_server',
@@ -17,11 +17,31 @@ public function getConfigTreeBuilder()
{
$tree = new TreeBuilder();
$tree->root('old_sound_rabbit_mq')
$rootNode = $tree->root('old_sound_rabbit_mq');
$rootNode
->children()
->booleanNode('debug')->defaultValue('%kernel.debug%')->end()
->booleanNode('enable_collector')->defaultValue(false)->end()
->booleanNode('sandbox')->defaultValue(false)->end()
->end()
;
$this->addConnections($rootNode);
$this->addProducers($rootNode);
$this->addConsumers($rootNode);
$this->addMultipleConsumers($rootNode);
$this->addAnonConsumers($rootNode);
$this->addRpcClients($rootNode);
$this->addRpcServers($rootNode);
return $tree;
}
protected function addConnections(ArrayNodeDefinition $node)
{
$node
->children()
->arrayNode('connections')
->useAttributeAsKey('key')
->canBeUnset()
@@ -36,7 +56,14 @@ public function getConfigTreeBuilder()
->end()
->end()
->end()
// producers
->end()
;
}
protected function addProducers(ArrayNodeDefinition $node)
{
$node
->children()
->arrayNode('producers')
->canBeUnset()
->useAttributeAsKey('key')
@@ -50,7 +77,14 @@ public function getConfigTreeBuilder()
->end()
->end()
->end()
// consumers
->end()
;
}
protected function addConsumers(ArrayNodeDefinition $node)
{
$node
->children()
->arrayNode('consumers')
->canBeUnset()
->useAttributeAsKey('key')
@@ -73,6 +107,61 @@ public function getConfigTreeBuilder()
->end()
->end()
->end()
->end()
;
}
protected function addMultipleConsumers(ArrayNodeDefinition $node)
{
$node
->children()
->arrayNode('multiple_consumers')
->canBeUnset()
->useAttributeAsKey('key')
->prototype('array')
->append($this->getExchangeConfiguration())
->children()
->scalarNode('connection')->defaultValue('default')->end()
->scalarNode('idle_timeout')->end()
->scalarNode('auto_setup_fabric')->defaultTrue()->end()
->append($this->getMultipleQueuesConfiguration())
->arrayNode('qos_options')
->canBeUnset()
->children()
->scalarNode('prefetch_size')->defaultValue(0)->end()
->scalarNode('prefetch_count')->defaultValue(0)->end()
->booleanNode('global')->defaultFalse()->end()
->end()
->end()
->end()
->end()
->end()
;
}
protected function addAnonConsumers(ArrayNodeDefinition $node)
{
$node
->children()
->arrayNode('anon_consumers')
->canBeUnset()
->useAttributeAsKey('key')
->prototype('array')
->append($this->getExchangeConfiguration())
->children()
->scalarNode('connection')->defaultValue('default')->end()
->scalarNode('callback')->isRequired()->end()
->end()
->end()
->end()
->end()
;
}
protected function addRpcClients(ArrayNodeDefinition $node)
{
$node
->children()
->arrayNode('rpc_clients')
->canBeUnset()
->useAttributeAsKey('key')
@@ -83,6 +172,14 @@ public function getConfigTreeBuilder()
->end()
->end()
->end()
->end()
;
}
protected function addRpcServers(ArrayNodeDefinition $node)
{
$node
->children()
->arrayNode('rpc_servers')
->canBeUnset()
->useAttributeAsKey('key')
@@ -101,31 +198,19 @@ public function getConfigTreeBuilder()
->end()
->end()
->end()
->arrayNode('anon_consumers')
->canBeUnset()
->useAttributeAsKey('key')
->prototype('array')
->append($this->getExchangeConfiguration())
->children()
->scalarNode('connection')->defaultValue('default')->end()
->scalarNode('callback')->isRequired()->end()
->end()
->end()
->end()
->end()
;
return $tree;
}
protected function getExchangeConfiguration()
{
$node = new ArrayNodeDefinition('exchange_options');
return $node
->isRequired()
->children()
->scalarNode('name')->end()
->scalarNode('type')->end()
->scalarNode('name')->isRequired()->end()
->scalarNode('type')->isRequired()->end()
->booleanNode('passive')->defaultValue(false)->end()
->booleanNode('durable')->defaultValue(true)->end()
->booleanNode('auto_delete')->defaultValue(false)->end()
@@ -141,9 +226,32 @@ protected function getQueueConfiguration()
{
$node = new ArrayNodeDefinition('queue_options');
return $node
$this->addQueueNodeConfiguration($node);
return $node;
}
protected function getMultipleQueuesConfiguration()
{
$node = new ArrayNodeDefinition('queues');
$prototypeNode = $node->requiresAtLeastOneElement()->prototype('array');
$this->addQueueNodeConfiguration($prototypeNode);
$prototypeNode->children()
->scalarNode('callback')->isRequired()->end()
->end();
$prototypeNode->end();
return $node;
}
protected function addQueueNodeConfiguration(ArrayNodeDefinition $node)
{
$node
->children()
->scalarNode('name')->end()
->scalarNode('name')->isRequired()->end()
->booleanNode('passive')->defaultFalse()->end()
->booleanNode('durable')->defaultTrue()->end()
->booleanNode('exclusive')->defaultFalse()->end()
@@ -2,6 +2,8 @@
namespace OldSound\RabbitMqBundle\DependencyInjection;
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
use Symfony\Component\DependencyInjection\Exception\InvalidArgumentException;
use Symfony\Component\HttpKernel\DependencyInjection\Extension;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Definition;
@@ -46,6 +48,7 @@ public function load(array $configs, ContainerBuilder $container)
$this->loadConnections();
$this->loadProducers();
$this->loadConsumers();
$this->loadMultipleConsumers();
$this->loadAnonConsumers();
$this->loadRpcClients();
$this->loadRpcServers();
@@ -149,6 +152,47 @@ protected function loadConsumers()
}
}
protected function loadMultipleConsumers()
{
foreach ($this->config['multiple_consumers'] as $key => $consumer) {
$queues = array();
foreach ($consumer['queues'] as $queueName => $queueOptions) {
$queues[$queueOptions['name']] = $queueOptions;
$queues[$queueOptions['name']]['callback'] = array(new Reference($queueOptions['callback']), 'execute');
}
$definition = new Definition('%old_sound_rabbit_mq.multi_consumer.class%');
$definition
->addTag('old_sound_rabbit_mq.base_amqp')
->addTag('old_sound_rabbit_mq.multi_consumer')
->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
->addMethodCall('setQueues', array($this->normalizeArgumentKeys($queues)));
if (array_key_exists('qos_options', $consumer)) {
$definition->addMethodCall('setQosOptions', array(
$consumer['qos_options']['prefetch_size'],
$consumer['qos_options']['prefetch_count'],
$consumer['qos_options']['global']
));
}
if(isset($consumer['idle_timeout'])) {
$definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
}
if (!$consumer['auto_setup_fabric']) {
$definition->addMethodCall('disableAutoSetupFabric');
}
$this->injectConnection($definition, $consumer['connection']);
if ($this->collectorEnabled) {
$this->injectLoggedChannel($definition, $key, $consumer['connection']);
}
$this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_multiple', $key), $definition);
}
}
protected function loadAnonConsumers()
{
foreach ($this->config['anon_consumers'] as $key => $anon) {
View
@@ -466,6 +466,42 @@ public function indexAction($name)
Is very similar to the previous example, we just have an extra `addRequest` call. Also we provide meaningful request identifiers so later will be easier for us to find the reply we want in the __$replies__ array.
### Multiple Consumers ###
It's a good practice to have a lot of queues for logic separation. With a simple consumer you will have to create one worker (consumer) per queue and it can be hard to manage when dealing
with many evolutions (forget to add a line in your supervisord configuration ?). This is also useful for small queues as you may not want to have as many workers as queues, and want to regroup
some tasks together without losing flexibility and separation principle.
Multiple consumers allow you to handle this use case by listening to multiple queues on the same consumer.
Here is how you can set a consumer with multiple queues :
```yaml
multiple_consumers:
upload:
connection: default
exchange_options: {name: 'upload', type: direct}
queues:
upload-picture:
name: upload_picture
callback: upload_picture_service
routing:
- picture
upload-video:
name: upload_video
callback: upload_video_service
routing:
- video
upload-stats:
name: upload_stats
callback: upload_stats
```
The callback is now specified under each queues and must implement the `ConsumerInterface` like a simple consumer.
All the options of `queues-options` in the consumer are available for each queue.
Be aware that all queues are under the same exchange, it's up to you to set the correct routing for callbacks.
### Anonymous Consumers ###
Now, why will we ever need anonymous consumers? This sounds like some internet threat or something… Keep reading.
View
@@ -59,9 +59,13 @@ public function purge()
public function processMessage(AMQPMessage $msg)
{
$processFlag = call_user_func($this->callback, $msg);
$this->handleProcessMessage($msg, $processFlag);
}
protected function handleProcessMessage(AMQPMessage $msg, $processFlag)
{
if ($processFlag === ConsumerInterface::MSG_REJECT_REQUEUE || false === $processFlag) {
// Reject and requeue message to RabbitMQ
$msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], true);
@@ -0,0 +1,7 @@
<?php
namespace OldSound\RabbitMqBundle\RabbitMq\Exception;
class QueueNotFoundException extends \RuntimeException
{
}
Oops, something went wrong.

0 comments on commit d85233f

Please sign in to comment.