Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions Command/MultipleConsumerCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<?php

namespace OldSound\RabbitMqBundle\Command;

class MultipleConsumerCommand extends BaseConsumerCommand
{

protected function configure()
{
parent::configure();

$this->setName('rabbitmq:multiple-consumer');
}

protected function getConsumerService()
{
return 'old_sound_rabbit_mq.%s_multiple';
}
}
1 change: 1 addition & 0 deletions DependencyInjection/Compiler/RegisterPartsPass.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public function process(ContainerBuilder $container)
'old_sound_rabbit_mq.base_amqp',
'old_sound_rabbit_mq.producer',
'old_sound_rabbit_mq.consumer',
'old_sound_rabbit_mq.multi_consumer',
'old_sound_rabbit_mq.anon_consumer',
'old_sound_rabbit_mq.rpc_client',
'old_sound_rabbit_mq.rpc_server',
Expand Down
148 changes: 128 additions & 20 deletions DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,31 @@ public function getConfigTreeBuilder()
{
$tree = new TreeBuilder();

$tree->root('old_sound_rabbit_mq')
$rootNode = $tree->root('old_sound_rabbit_mq');

$rootNode
->children()
->booleanNode('debug')->defaultValue('%kernel.debug%')->end()
->booleanNode('enable_collector')->defaultValue(false)->end()
->booleanNode('sandbox')->defaultValue(false)->end()
->end()
;

$this->addConnections($rootNode);
$this->addProducers($rootNode);
$this->addConsumers($rootNode);
$this->addMultipleConsumers($rootNode);
$this->addAnonConsumers($rootNode);
$this->addRpcClients($rootNode);
$this->addRpcServers($rootNode);

return $tree;
}

protected function addConnections(ArrayNodeDefinition $node)
{
$node
->children()
->arrayNode('connections')
->useAttributeAsKey('key')
->canBeUnset()
Expand All @@ -36,7 +56,14 @@ public function getConfigTreeBuilder()
->end()
->end()
->end()
// producers
->end()
;
}

protected function addProducers(ArrayNodeDefinition $node)
{
$node
->children()
->arrayNode('producers')
->canBeUnset()
->useAttributeAsKey('key')
Expand All @@ -50,7 +77,14 @@ public function getConfigTreeBuilder()
->end()
->end()
->end()
// consumers
->end()
;
}

protected function addConsumers(ArrayNodeDefinition $node)
{
$node
->children()
->arrayNode('consumers')
->canBeUnset()
->useAttributeAsKey('key')
Expand All @@ -73,6 +107,61 @@ public function getConfigTreeBuilder()
->end()
->end()
->end()
->end()
;
}

protected function addMultipleConsumers(ArrayNodeDefinition $node)
{
$node
->children()
->arrayNode('multiple_consumers')
->canBeUnset()
->useAttributeAsKey('key')
->prototype('array')
->append($this->getExchangeConfiguration())
->children()
->scalarNode('connection')->defaultValue('default')->end()
->scalarNode('idle_timeout')->end()
->scalarNode('auto_setup_fabric')->defaultTrue()->end()
->append($this->getMultipleQueuesConfiguration())
->arrayNode('qos_options')
->canBeUnset()
->children()
->scalarNode('prefetch_size')->defaultValue(0)->end()
->scalarNode('prefetch_count')->defaultValue(0)->end()
->booleanNode('global')->defaultFalse()->end()
->end()
->end()
->end()
->end()
->end()
;
}

protected function addAnonConsumers(ArrayNodeDefinition $node)
{
$node
->children()
->arrayNode('anon_consumers')
->canBeUnset()
->useAttributeAsKey('key')
->prototype('array')
->append($this->getExchangeConfiguration())
->children()
->scalarNode('connection')->defaultValue('default')->end()
->scalarNode('callback')->isRequired()->end()
->end()
->end()
->end()
->end()
;
}

protected function addRpcClients(ArrayNodeDefinition $node)
{
$node
->children()
->arrayNode('rpc_clients')
->canBeUnset()
->useAttributeAsKey('key')
Expand All @@ -83,6 +172,14 @@ public function getConfigTreeBuilder()
->end()
->end()
->end()
->end()
;
}

protected function addRpcServers(ArrayNodeDefinition $node)
{
$node
->children()
->arrayNode('rpc_servers')
->canBeUnset()
->useAttributeAsKey('key')
Expand All @@ -101,31 +198,19 @@ public function getConfigTreeBuilder()
->end()
->end()
->end()
->arrayNode('anon_consumers')
->canBeUnset()
->useAttributeAsKey('key')
->prototype('array')
->append($this->getExchangeConfiguration())
->children()
->scalarNode('connection')->defaultValue('default')->end()
->scalarNode('callback')->isRequired()->end()
->end()
->end()
->end()
->end()
;

return $tree;
}

protected function getExchangeConfiguration()
{
$node = new ArrayNodeDefinition('exchange_options');

return $node
->isRequired()
->children()
->scalarNode('name')->end()
->scalarNode('type')->end()
->scalarNode('name')->isRequired()->end()
->scalarNode('type')->isRequired()->end()
->booleanNode('passive')->defaultValue(false)->end()
->booleanNode('durable')->defaultValue(true)->end()
->booleanNode('auto_delete')->defaultValue(false)->end()
Expand All @@ -141,9 +226,32 @@ protected function getQueueConfiguration()
{
$node = new ArrayNodeDefinition('queue_options');

return $node
$this->addQueueNodeConfiguration($node);

return $node;
}

protected function getMultipleQueuesConfiguration()
{
$node = new ArrayNodeDefinition('queues');
$prototypeNode = $node->requiresAtLeastOneElement()->prototype('array');

$this->addQueueNodeConfiguration($prototypeNode);

$prototypeNode->children()
->scalarNode('callback')->isRequired()->end()
->end();

$prototypeNode->end();

return $node;
}

protected function addQueueNodeConfiguration(ArrayNodeDefinition $node)
{
$node
->children()
->scalarNode('name')->end()
->scalarNode('name')->isRequired()->end()
->booleanNode('passive')->defaultFalse()->end()
->booleanNode('durable')->defaultTrue()->end()
->booleanNode('exclusive')->defaultFalse()->end()
Expand Down
44 changes: 44 additions & 0 deletions DependencyInjection/OldSoundRabbitMqExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

namespace OldSound\RabbitMqBundle\DependencyInjection;

use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
use Symfony\Component\DependencyInjection\Exception\InvalidArgumentException;
use Symfony\Component\HttpKernel\DependencyInjection\Extension;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Definition;
Expand Down Expand Up @@ -46,6 +48,7 @@ public function load(array $configs, ContainerBuilder $container)
$this->loadConnections();
$this->loadProducers();
$this->loadConsumers();
$this->loadMultipleConsumers();
$this->loadAnonConsumers();
$this->loadRpcClients();
$this->loadRpcServers();
Expand Down Expand Up @@ -149,6 +152,47 @@ protected function loadConsumers()
}
}

protected function loadMultipleConsumers()
{
foreach ($this->config['multiple_consumers'] as $key => $consumer) {
$queues = array();

foreach ($consumer['queues'] as $queueName => $queueOptions) {
$queues[$queueOptions['name']] = $queueOptions;
$queues[$queueOptions['name']]['callback'] = array(new Reference($queueOptions['callback']), 'execute');
}

$definition = new Definition('%old_sound_rabbit_mq.multi_consumer.class%');
$definition
->addTag('old_sound_rabbit_mq.base_amqp')
->addTag('old_sound_rabbit_mq.multi_consumer')
->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
->addMethodCall('setQueues', array($this->normalizeArgumentKeys($queues)));

if (array_key_exists('qos_options', $consumer)) {
$definition->addMethodCall('setQosOptions', array(
$consumer['qos_options']['prefetch_size'],
$consumer['qos_options']['prefetch_count'],
$consumer['qos_options']['global']
));
}

if(isset($consumer['idle_timeout'])) {
$definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
}
if (!$consumer['auto_setup_fabric']) {
$definition->addMethodCall('disableAutoSetupFabric');
}

$this->injectConnection($definition, $consumer['connection']);
if ($this->collectorEnabled) {
$this->injectLoggedChannel($definition, $key, $consumer['connection']);
}

$this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_multiple', $key), $definition);
}
}

protected function loadAnonConsumers()
{
foreach ($this->config['anon_consumers'] as $key => $anon) {
Expand Down
36 changes: 36 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,42 @@ public function indexAction($name)

Is very similar to the previous example, we just have an extra `addRequest` call. Also we provide meaningful request identifiers so later will be easier for us to find the reply we want in the __$replies__ array.

### Multiple Consumers ###

It's a good practice to have a lot of queues for logic separation. With a simple consumer you will have to create one worker (consumer) per queue and it can be hard to manage when dealing
with many evolutions (forget to add a line in your supervisord configuration?). This is also useful for small queues as you may not want to have as many workers as queues, and want to regroup
some tasks together without losing flexibility and separation principle.

Multiple consumers allow you to handle this use case by listening to multiple queues on the same consumer.

Here is how you can set a consumer with multiple queues:

```yaml
multiple_consumers:
upload:
connection: default
exchange_options: {name: 'upload', type: direct}
queues:
upload-picture:
name: upload_picture
callback: upload_picture_service
routing_keys:
- picture
upload-video:
name: upload_video
callback: upload_video_service
routing_keys:
- video
upload-stats:
name: upload_stats
callback: upload_stats
```

The callback is now specified under each queues and must implement the `ConsumerInterface` like a simple consumer.
All the options of `queues-options` in the consumer are available for each queue.

Be aware that all queues are under the same exchange, it's up to you to set the correct routing for callbacks.

### Anonymous Consumers ###

Now, why will we ever need anonymous consumers? This sounds like some internet threat or something… Keep reading.
Expand Down
6 changes: 5 additions & 1 deletion RabbitMq/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,13 @@ public function purge()

public function processMessage(AMQPMessage $msg)
{

$processFlag = call_user_func($this->callback, $msg);

$this->handleProcessMessage($msg, $processFlag);
}

protected function handleProcessMessage(AMQPMessage $msg, $processFlag)
{
if ($processFlag === ConsumerInterface::MSG_REJECT_REQUEUE || false === $processFlag) {
// Reject and requeue message to RabbitMQ
$msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], true);
Expand Down
7 changes: 7 additions & 0 deletions RabbitMq/Exception/QueueNotFoundException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<?php

namespace OldSound\RabbitMqBundle\RabbitMq\Exception;

class QueueNotFoundException extends \RuntimeException
{
}
Loading