Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

Producer queue #54

Merged
merged 4 commits into from

3 participants

@mvrhov

Queue can optionally be defined from the producer
Fixes #43

RabbitMq/Producer.php
@@ -22,6 +22,15 @@ public function exchangeDeclare()
$this->exchangeOptions['internal']);
$this->declared = true;
+
+ if (('' != $this->queueOptions['name']) && (null !== $this->queueOptions['name'])) {
@videlalvaro Owner

why you don't allow to declare queues with an empty name?

@videlalvaro Owner

Also this code is declaring a queue, so it doesn't belong inside the exchangeDeclare method. So it needs it's own queue declare method. Also the exchange declare method uses a flag called "declare" to avoid doing that call for every message publish call. That means your code must use a flag as well to prevent declaring the queue every time (else publishing messages will become very very slow and introduce a regression).

So in that case "declare" must be split in two variables. One called perhaps "exchange_declared" and the other "queue_declared" or something like that.

@mvrhov
mvrhov added a note

The empty name.. I thought that wasn't possible.
As the code declaring queue. Shouldn't the if from publish method really be moved into the exchangeDeclare method?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@videlalvaro
Owner

Any news on this PR?

@mvrhov

Sorry, have been on vacation.

@mvrhov

Ia have this in production and except the problem I fixed about an hour ago this seems to work. Any ETA on merging this?

@chEbba

Any news here?

@mvrhov

@videlalvaro do you need anything else from me before this gets merged?

@videlalvaro videlalvaro merged commit b475469 into videlalvaro:master
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Jun 28, 2012
  1. @mvrhov
  2. @mvrhov

    fixed undefined property

    mvrhov authored
Commits on Jul 16, 2012
  1. @mvrhov
Commits on Aug 1, 2012
  1. @mvrhov
This page is out of date. Refresh to see the latest.
View
1  DependencyInjection/Configuration.php
@@ -41,6 +41,7 @@ public function getConfigTreeBuilder()
->useAttributeAsKey('key')
->prototype('array')
->append($this->getExchangeConfiguration())
+ ->append($this->getQueueConfiguration())
->children()
->scalarNode('connection')->defaultValue('default')->end()
->end()
View
7 DependencyInjection/OldSoundRabbitMqExtension.php
@@ -29,6 +29,8 @@ class OldSoundRabbitMqExtension extends Extension
private $channelIds = array();
+ private $config = array();
+
public function load(array $configs, ContainerBuilder $container)
{
$this->container = $container;
@@ -81,6 +83,11 @@ protected function loadProducers()
foreach ($this->config['producers'] as $key => $producer) {
$definition = new Definition('%old_sound_rabbit_mq.producer.class%');
$definition->addMethodCall('setExchangeOptions', array($producer['exchange_options']));
+ //this producer doesn't define a queue
+ if (!isset($producer['queue_options'])) {
+ $producer['queue_options']['name'] = null;
+ }
+ $definition->addMethodCall('setQueueOptions', array($producer['queue_options']));
$this->injectConnection($definition, $producer['connection']);
if ($this->collectorEnabled) {
$this->injectLoggedChannel($definition, $key, $producer['connection']);
View
33 RabbitMq/Producer.php
@@ -9,7 +9,8 @@
class Producer extends BaseAmqp
{
- protected $declared = false;
+ protected $exchangeDeclared = false;
+ protected $queueDeclared = false;
public function exchangeDeclare()
{
@@ -21,14 +22,38 @@ public function exchangeDeclare()
$this->exchangeOptions['auto_delete'],
$this->exchangeOptions['internal']);
- $this->declared = true;
+ $this->exchangeDeclared = true;
}
- public function publish($msgBody, $routingKey = '')
+ public function queueDeclare()
+ {
+ if (null !== $this->queueOptions['name']) {
+ list($queueName, ,) = $this->ch->queue_declare($this->queueOptions['name'], $this->queueOptions['passive'],
+ $this->queueOptions['durable'], $this->queueOptions['exclusive'],
+ $this->queueOptions['auto_delete'], $this->queueOptions['nowait'],
+ $this->queueOptions['arguments'], $this->queueOptions['ticket']);
+
+ $this->ch->queue_bind($queueName, $this->exchangeOptions['name'], $this->routingKey);
+
+ $this->queueDeclared = true;
+ }
+ }
+
+ public function setupProducer()
{
- if (!$this->declared) {
+ if (!$this->exchangeDeclared) {
$this->exchangeDeclare();
}
+
+ if (!$this->queueDeclared) {
+ $this->queueDeclare();
+ }
+ }
+
+ public function publish($msgBody, $routingKey = '')
+ {
+ $this->setupProducer();
+
$msg = new AMQPMessage($msgBody, array('content_type' => 'text/plain', 'delivery_mode' => 2));
$this->ch->basic_publish($msg, $this->exchangeOptions['name'], $routingKey);
}
View
16 Tests/DependencyInjection/OldSoundRabbitMqExtensionTest.php
@@ -63,6 +63,14 @@ public function testFooProducerDefinition()
'ticket' => null,
)
)
+ ),
+ array(
+ 'setQueueOptions',
+ array(
+ array(
+ 'name' => null,
+ )
+ )
)
),
$definition->getMethodCalls()
@@ -94,6 +102,14 @@ public function testDefaultProducerDefinition()
'ticket' => null,
)
)
+ ),
+ array(
+ 'setQueueOptions',
+ array(
+ array(
+ 'name' => null,
+ )
+ )
)
),
$definition->getMethodCalls()
Something went wrong with that request. Please try again.