Skip to content

Commit

Permalink
Update Extension to use DequeuerAwareInterface
Browse files Browse the repository at this point in the history
  • Loading branch information
bburnichon committed Jan 20, 2015
1 parent 26fdd2d commit 1d4fc0f
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 4 deletions.
38 changes: 35 additions & 3 deletions DependencyInjection/OldSoundRabbitMqExtension.php
Expand Up @@ -158,14 +158,17 @@ protected function loadConsumers()
$this->injectLoggedChannel($definition, $key, $consumer['connection']);
}

$this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_consumer', $key), $definition);
$name = sprintf('old_sound_rabbit_mq.%s_consumer', $key);
$this->container->setDefinition($name, $definition);
$this->addDequeuerAwareCall($consumer['callback'], $name);
}
}

protected function loadMultipleConsumers()
{
foreach ($this->config['multiple_consumers'] as $key => $consumer) {
$queues = array();
$callbacks = array();

if (empty($consumer['queues']) && empty($consumer['queues_provider'])) {
throw new InvalidConfigurationException(
Expand All @@ -177,6 +180,7 @@ protected function loadMultipleConsumers()
foreach ($consumer['queues'] as $queueName => $queueOptions) {
$queues[$queueOptions['name']] = $queueOptions;
$queues[$queueOptions['name']]['callback'] = array(new Reference($queueOptions['callback']), 'execute');
$callbacks[] = new Reference($queueOptions['callback']);
}

$definition = new Definition('%old_sound_rabbit_mq.multi_consumer.class%');
Expand Down Expand Up @@ -213,7 +217,14 @@ protected function loadMultipleConsumers()
$this->injectLoggedChannel($definition, $key, $consumer['connection']);
}

$this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_multiple', $key), $definition);
$name = sprintf('old_sound_rabbit_mq.%s_multiple', $key);
$this->container->setDefinition($name, $definition);
if ($consumer['queues_provider']) {
$this->addDequeuerAwareCall($consumer['queues_provider'], $name);
}
foreach ($callbacks as $callback) {
$this->addDequeuerAwareCall($callback, $name);
}
}
}

Expand All @@ -231,7 +242,9 @@ protected function loadAnonConsumers()
$this->injectLoggedChannel($definition, $key, $anon['connection']);
}

$this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_anon', $key), $definition);
$name = sprintf('old_sound_rabbit_mq.%s_anon', $key);
$this->container->setDefinition($name, $definition);
$this->addDequeuerAwareCall($anon['callback'], $name);
}
}

Expand Down Expand Up @@ -355,4 +368,23 @@ public function getAlias()
{
return 'old_sound_rabbit_mq';
}

/**
* Add proper dequeuer aware call
*
* @param string $callback
* @param string $name
*/
protected function addDequeuerAwareCall($callback, $name)
{
if (! $this->container->has($callback)) {
return;
}

$callbackDefinition = $this->container->findDefinition($callback);
$refClass = new \ReflectionClass($callbackDefinition->getClass());
if ($refClass->implementsInterface('OldSound\RabbitMqBundle\RabbitMq\DequeuerAwareInterface')) {
$callbackDefinition->addMethodCall('setDequeuer', array(new Reference($name)));
}
}
}
6 changes: 5 additions & 1 deletion README.md
Expand Up @@ -516,7 +516,11 @@ All the options of `queues-options` in the consumer are available for each queue
Be aware that all queues are under the same exchange, it's up to you to set the correct routing for callbacks.
The `queues_provider` is a optional service that dynamically provides queues.
It must implement `QueuesProviderInterface`
It must implement `QueuesProviderInterface`.
Be aware that queues providers are responsible for the proper calls to `setDequeuer` and that callbacks are callables
(not `ConsumerInterface`). In case service providing queues implements `DequeuerAwareInterface`, a call to
`setDequeuer` is added to the definition of the service with a `DequeuerInterface` currently being a `MultipleConsumer`.
### Anonymous Consumers ###
Expand Down

0 comments on commit 1d4fc0f

Please sign in to comment.