diff --git a/DependencyInjection/OldSoundRabbitMqExtension.php b/DependencyInjection/OldSoundRabbitMqExtension.php index 9c61ff55..d0c6275f 100644 --- a/DependencyInjection/OldSoundRabbitMqExtension.php +++ b/DependencyInjection/OldSoundRabbitMqExtension.php @@ -158,7 +158,9 @@ protected function loadConsumers() $this->injectLoggedChannel($definition, $key, $consumer['connection']); } - $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_consumer', $key), $definition); + $name = sprintf('old_sound_rabbit_mq.%s_consumer', $key); + $this->container->setDefinition($name, $definition); + $this->addDequeuerAwareCall($consumer['callback'], $name); } } @@ -166,6 +168,7 @@ protected function loadMultipleConsumers() { foreach ($this->config['multiple_consumers'] as $key => $consumer) { $queues = array(); + $callbacks = array(); if (empty($consumer['queues']) && empty($consumer['queues_provider'])) { throw new InvalidConfigurationException( @@ -177,6 +180,7 @@ protected function loadMultipleConsumers() foreach ($consumer['queues'] as $queueName => $queueOptions) { $queues[$queueOptions['name']] = $queueOptions; $queues[$queueOptions['name']]['callback'] = array(new Reference($queueOptions['callback']), 'execute'); + $callbacks[] = new Reference($queueOptions['callback']); } $definition = new Definition('%old_sound_rabbit_mq.multi_consumer.class%'); @@ -213,7 +217,14 @@ protected function loadMultipleConsumers() $this->injectLoggedChannel($definition, $key, $consumer['connection']); } - $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_multiple', $key), $definition); + $name = sprintf('old_sound_rabbit_mq.%s_multiple', $key); + $this->container->setDefinition($name, $definition); + if ($consumer['queues_provider']) { + $this->addDequeuerAwareCall($consumer['queues_provider'], $name); + } + foreach ($callbacks as $callback) { + $this->addDequeuerAwareCall($callback, $name); + } } } @@ -231,7 +242,9 @@ protected function loadAnonConsumers() $this->injectLoggedChannel($definition, $key, $anon['connection']); } - $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_anon', $key), $definition); + $name = sprintf('old_sound_rabbit_mq.%s_anon', $key); + $this->container->setDefinition($name, $definition); + $this->addDequeuerAwareCall($anon['callback'], $name); } } @@ -355,4 +368,23 @@ public function getAlias() { return 'old_sound_rabbit_mq'; } + + /** + * Add proper dequeuer aware call + * + * @param string $callback + * @param string $name + */ + protected function addDequeuerAwareCall($callback, $name) + { + if (! $this->container->has($callback)) { + return; + } + + $callbackDefinition = $this->container->findDefinition($callback); + $refClass = new \ReflectionClass($callbackDefinition->getClass()); + if ($refClass->implementsInterface('OldSound\RabbitMqBundle\RabbitMq\DequeuerAwareInterface')) { + $callbackDefinition->addMethodCall('setDequeuer', array(new Reference($name))); + } + } } diff --git a/README.md b/README.md index 4cde14df..c6a0d64f 100644 --- a/README.md +++ b/README.md @@ -516,7 +516,11 @@ All the options of `queues-options` in the consumer are available for each queue Be aware that all queues are under the same exchange, it's up to you to set the correct routing for callbacks. The `queues_provider` is a optional service that dynamically provides queues. -It must implement `QueuesProviderInterface` +It must implement `QueuesProviderInterface`. + +Be aware that queues providers are responsible for the proper calls to `setDequeuer` and that callbacks are callables +(not `ConsumerInterface`). In case service providing queues implements `DequeuerAwareInterface`, a call to +`setDequeuer` is added to the definition of the service with a `DequeuerInterface` currently being a `MultipleConsumer`. ### Anonymous Consumers ###