Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Added .gitignore

Declaring routing keys in config

Improved phpunit help.

Fixed setupConsumer  issue.
  • Loading branch information...
commit e509f1ede23e20a18f839f280b8230d18d3bcb98 1 parent 3e7df42
@blaugueux blaugueux authored
View
3  .gitignore
@@ -0,0 +1,3 @@
+composer.lock
+composer.phar
+vendor
View
4 DependencyInjection/Configuration.php
@@ -138,6 +138,10 @@ protected function getQueueConfiguration()
->booleanNode('nowait')->defaultFalse()->end()
->variableNode('arguments')->defaultNull()->end()
->scalarNode('ticket')->defaultNull()->end()
+ ->arrayNode('routing_keys')
+ ->prototype('scalar')->end()
+ ->defaultValue(array())
+ ->end()
->end()
;
}
View
8 README.md
@@ -117,6 +117,14 @@ If you need to use HA Queues then your queue options can be something like this:
Adapt the `arguments` according to your needs.
+If you want to bind queue with specific routingKeys you can declare it in producer or consumer config:
+
+ queue_options:
+ name: "upload-picture"
+ routing_keys:
+ - 'android.#.upload'
+ - 'iphone.upload'
+
## Producers, Consumers, What? ##
In a messaging application, the process sending messages to the broker is called __producer__ while the process receiving those messages is called __consumer__. In your application you will have several of them that you can list under their respective entries in the configuration.
View
52 RabbitMq/BaseAmqp.php
@@ -9,6 +9,9 @@
protected $conn;
protected $ch;
protected $consumerTag;
+ protected $exchangeDeclared = false;
+ protected $queueDeclared = false;
+ protected $routingKey = '';
protected $exchangeOptions = array(
'passive' => false,
@@ -31,8 +34,6 @@
'ticket' => null
);
- protected $routingKey = '';
-
/**
* @param AMQPConnection $conn
* @param AMQPChannel|null $ch
@@ -105,4 +106,51 @@ public function setRoutingKey($routingKey)
{
$this->routingKey = $routingKey;
}
+
+ protected function exchangeDeclare()
+ {
+ $this->ch->exchange_declare(
+ $this->exchangeOptions['name'],
+ $this->exchangeOptions['type'],
+ $this->exchangeOptions['passive'],
+ $this->exchangeOptions['durable'],
+ $this->exchangeOptions['auto_delete'],
+ $this->exchangeOptions['internal'],
+ $this->exchangeOptions['nowait'],
+ $this->exchangeOptions['arguments'],
+ $this->exchangeOptions['ticket']);
+
+ $this->exchangeDeclared = true;
+ }
+
+ protected function queueDeclare()
+ {
+ if (null !== $this->queueOptions['name']) {
+ list($queueName, ,) = $this->ch->queue_declare($this->queueOptions['name'], $this->queueOptions['passive'],
+ $this->queueOptions['durable'], $this->queueOptions['exclusive'],
+ $this->queueOptions['auto_delete'], $this->queueOptions['nowait'],
+ $this->queueOptions['arguments'], $this->queueOptions['ticket']);
+
+ if(count($this->queueOptions['routing_keys']) > 0) {
+ foreach($this->queueOptions['routing_keys'] as $routingKey) {
+ $this->ch->queue_bind($queueName, $this->exchangeOptions['name'], $routingKey);
+ }
+ } else {
+ $this->ch->queue_bind($queueName, $this->exchangeOptions['name'], $this->routingKey);
+ }
+
+ $this->queueDeclared = true;
+ }
+ }
+
+ protected function setupQueue()
+ {
+ if (!$this->exchangeDeclared) {
+ $this->exchangeDeclare();
+ }
+
+ if (!$this->queueDeclared) {
+ $this->queueDeclare();
+ }
+ }
}
View
19 RabbitMq/BaseConsumer.php
@@ -23,7 +23,7 @@ public function start($msgAmount = 0)
{
$this->target = $msgAmount;
- $this->setUpConsumer();
+ $this->setupConsumer();
while (count($this->ch->callbacks))
{
@@ -36,21 +36,10 @@ public function stopConsuming()
$this->ch->basic_cancel($this->getConsumerTag());
}
- protected function setUpConsumer()
+ protected function setupConsumer()
{
- $this->ch->exchange_declare($this->exchangeOptions['name'], $this->exchangeOptions['type'],
- $this->exchangeOptions['passive'], $this->exchangeOptions['durable'],
- $this->exchangeOptions['auto_delete'], $this->exchangeOptions['internal'],
- $this->exchangeOptions['nowait'], $this->exchangeOptions['arguments'],
- $this->exchangeOptions['ticket']);
-
- list($queueName, ,) = $this->ch->queue_declare($this->queueOptions['name'], $this->queueOptions['passive'],
- $this->queueOptions['durable'], $this->queueOptions['exclusive'],
- $this->queueOptions['auto_delete'], $this->queueOptions['nowait'],
- $this->queueOptions['arguments'], $this->queueOptions['ticket']);
-
- $this->ch->queue_bind($queueName, $this->exchangeOptions['name'], $this->routingKey);
- $this->ch->basic_consume($queueName, $this->getConsumerTag(), false, false, false, false, array($this, 'processMessage'));
+ $this->setupQueue();
+ $this->ch->basic_consume($this->queueOptions['name'], $this->getConsumerTag(), false, false, false, false, array($this, 'processMessage'));
}
protected function maybeStopConsumer()
View
2  RabbitMq/Consumer.php
@@ -12,7 +12,7 @@ public function consume($msgAmount)
{
$this->target = $msgAmount;
- $this->setUpConsumer();
+ $this->setupConsumer();
while (count($this->ch->callbacks))
{
View
42 RabbitMq/Producer.php
@@ -9,8 +9,6 @@
class Producer extends BaseAmqp
{
- protected $exchangeDeclared = false;
- protected $queueDeclared = false;
protected $contentType = 'text/plain';
protected $deliveryMode = 2;
@@ -24,47 +22,9 @@ public function setDeliveryMode($deliveryMode)
$this->deliveryMode = $deliveryMode;
}
- public function exchangeDeclare()
- {
- $this->ch->exchange_declare(
- $this->exchangeOptions['name'],
- $this->exchangeOptions['type'],
- $this->exchangeOptions['passive'],
- $this->exchangeOptions['durable'],
- $this->exchangeOptions['auto_delete'],
- $this->exchangeOptions['internal']);
-
- $this->exchangeDeclared = true;
- }
-
- public function queueDeclare()
- {
- if (null !== $this->queueOptions['name']) {
- list($queueName, ,) = $this->ch->queue_declare($this->queueOptions['name'], $this->queueOptions['passive'],
- $this->queueOptions['durable'], $this->queueOptions['exclusive'],
- $this->queueOptions['auto_delete'], $this->queueOptions['nowait'],
- $this->queueOptions['arguments'], $this->queueOptions['ticket']);
-
- $this->ch->queue_bind($queueName, $this->exchangeOptions['name'], $this->routingKey);
-
- $this->queueDeclared = true;
- }
- }
-
- public function setupProducer()
- {
- if (!$this->exchangeDeclared) {
- $this->exchangeDeclare();
- }
-
- if (!$this->queueDeclared) {
- $this->queueDeclare();
- }
- }
-
public function publish($msgBody, $routingKey = '')
{
- $this->setupProducer();
+ $this->setupQueue();
$msg = new AMQPMessage($msgBody, array('content_type' => $this->contentType, 'delivery_mode' => $this->deliveryMode));
$this->ch->basic_publish($msg, $this->exchangeOptions['name'], $routingKey);
View
3  Tests/DependencyInjection/Fixtures/test.yml
@@ -54,6 +54,9 @@ old_sound_rabbit_mq:
nowait: true
arguments: null
ticket: null
+ routing_keys:
+ - 'android.#.upload'
+ - 'iphone.upload'
callback: foo.callback
default_consumer:
View
18 Tests/DependencyInjection/OldSoundRabbitMqExtensionTest.php
@@ -146,14 +146,15 @@ public function testFooConsumerDefinition()
'setQueueOptions',
array(
array(
- 'name' => 'foo_queue',
- 'passive' => true,
- 'durable' => false,
- 'exclusive' => true,
- 'auto_delete' => true,
- 'nowait' => true,
- 'arguments' => null,
- 'ticket' => null,
+ 'name' => 'foo_queue',
+ 'passive' => true,
+ 'durable' => false,
+ 'exclusive' => true,
+ 'auto_delete' => true,
+ 'nowait' => true,
+ 'arguments' => null,
+ 'ticket' => null,
+ 'routing_keys' => array('android.#.upload', 'iphone.upload'),
)
)
),
@@ -204,6 +205,7 @@ public function testDefaultConsumerDefinition()
'nowait' => false,
'arguments' => null,
'ticket' => null,
+ 'routing_keys' => array(),
)
)
),
View
2  Tests/bootstrap.php
@@ -10,5 +10,5 @@ function includeIfExists($file)
if ((!$loader = includeIfExists(__DIR__.'/../vendor/autoload.php')) && (!$loader = includeIfExists(__DIR__.'/../../../../../autoload.php'))) {
die('You must set up the project dependencies, run the following commands:'.PHP_EOL.
'curl -s http://getcomposer.org/installer | php'.PHP_EOL.
- 'php composer.phar install'.PHP_EOL);
+ 'php composer.phar install --dev'.PHP_EOL);
}
Please sign in to comment.
Something went wrong with that request. Please try again.