Skip to content

Commit

Permalink
updates code to use latest php-amqplib
Browse files Browse the repository at this point in the history
The code also addressed issues php-amqplib#7 and php-amqplib#11
  • Loading branch information
videlalvaro committed Nov 24, 2011
1 parent 40d7ac3 commit 0ec9d57
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 18 deletions.
4 changes: 3 additions & 1 deletion DependencyInjection/OldSoundRabbitMqExtension.php
Expand Up @@ -17,7 +17,7 @@ public function load(array $configs, ContainerBuilder $container)
$locator = new FileLocator(array(__DIR__.'/../Resources/config'));
$loader = new YamlFileLoader($container, $locator);
$loader->load('rabbitmq.yml');

$config = $this->mergeConfig($configs);

foreach ($config['connections'] as $name => $connection)
Expand Down Expand Up @@ -147,6 +147,8 @@ protected function loadProducer(array $producer, ContainerBuilder $container)
$this->injectConnection($producerDef, $producer);

$producerDef->addMethodCall('setExchangeOptions', array($producer['exchange_options']));
//TODO add configuration option that allows to not do this all the time.
$producerDef->addMethodCall('exchangeDeclare');

$container->setDefinition(sprintf('old_sound_rabbit_mq.%s_producer', $producer['alias']), $producerDef);
}
Expand Down
3 changes: 2 additions & 1 deletion RabbitMq/AnonConsumer.php
Expand Up @@ -3,10 +3,11 @@
namespace OldSound\RabbitMqBundle\RabbitMq;

use OldSound\RabbitMqBundle\RabbitMq\Consumer;
use PhpAmqpLib\Connection\AMQPConnection;

class AnonConsumer extends Consumer
{
public function __construct(\AMQPConnection $conn)
public function __construct(AMQPConnection $conn)
{
parent::__construct($conn);

Expand Down
12 changes: 7 additions & 5 deletions RabbitMq/BaseAmqp.php
@@ -1,6 +1,8 @@
<?php

namespace OldSound\RabbitMqBundle\RabbitMq;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPConnection;

abstract class BaseAmqp
{
Expand Down Expand Up @@ -32,11 +34,11 @@ abstract class BaseAmqp
protected $routingKey = '';

/**
* @param \AMQPConnection $conn
* @param \AMQPChannel|null $ch
* @param AMQPConnection $conn
* @param AMQPChannel|null $ch
* @param null $consumerTag
*/
public function __construct(\AMQPConnection $conn, \AMQPChannel $ch = null, $consumerTag = null)
public function __construct(AMQPConnection $conn, AMQPChannel $ch = null, $consumerTag = null)
{
$this->conn = $conn;

Expand All @@ -60,10 +62,10 @@ public function __destruct()
}

/**
* @param \AMQPChannel $ch
* @param AMQPChannel $ch
* @return void
*/
public function setChannel(\AMQPChannel $ch)
public function setChannel(AMQPChannel $ch)
{
$this->ch = $ch;
}
Expand Down
7 changes: 4 additions & 3 deletions RabbitMq/Consumer.php
Expand Up @@ -3,13 +3,14 @@
namespace OldSound\RabbitMqBundle\RabbitMq;

use OldSound\RabbitMqBundle\RabbitMq\BaseConsumer;
use PhpAmqpLib\Message\AMQPMessage;

class Consumer extends BaseConsumer
{
protected $target;

protected $consumed = 0;

public function consume($msgAmount)
{
$this->target = $msgAmount;
Expand All @@ -22,7 +23,7 @@ public function consume($msgAmount)
}
}

public function processMessage(\AMQPMessage $msg)
public function processMessage(AMQPMessage $msg)
{
try
{
Expand All @@ -37,7 +38,7 @@ public function processMessage(\AMQPMessage $msg)
}
}

protected function maybeStopConsumer(\AMQPMessage $msg)
protected function maybeStopConsumer(AMQPMessage $msg)
{
if ($this->target == -1) {
return;
Expand Down
39 changes: 37 additions & 2 deletions RabbitMq/Producer.php
Expand Up @@ -3,13 +3,48 @@
namespace OldSound\RabbitMqBundle\RabbitMq;

use OldSound\RabbitMqBundle\RabbitMq\BaseAmqp;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;


class Producer extends BaseAmqp
{
proteced $producerExchangeOptions = array(
'durable' => false,
'auto_delete' => true,
'internal' => false
);

public function __construct(AMQPConnection $conn, AMQPChannel $ch = null, $consumerTag = null)
{
parent::__construct($conn, $ch, $consumerTag);

}

public function setExchangeOptions(array $options = array())
{
$this->exchangeOptions = array_merge(
$this->exchangeOptions,
$this->producerExchangeOptions
);

parent::setExchangeOptions($options);
}

public function exchangeDeclare()
{
$this->ch->exchange_declare(
$this->exchangeOptions['name'],
$this->exchangeOptions['type'],
$this->exchangeOptions['durable'],
$this->exchangeOptions['auto_delete'],
$this->exchangeOptions['internal']);
}

public function publish($msgBody, $routingKey = '')
{
$this->ch->exchange_declare($this->exchangeOptions['name'], $this->exchangeOptions['type'], false, true, false);
$msg = new \AMQPMessage($msgBody, array('content_type' => 'text/plain', 'delivery_mode' => 2));
$msg = new AMQPMessage($msgBody, array('content_type' => 'text/plain', 'delivery_mode' => 2));
$this->ch->basic_publish($msg, $this->exchangeOptions['name'], $routingKey);
}
}
7 changes: 4 additions & 3 deletions RabbitMq/RpcClient.php
Expand Up @@ -3,6 +3,7 @@
namespace OldSound\RabbitMqBundle\RabbitMq;

use OldSound\RabbitMqBundle\RabbitMq\BaseAmqp;
use PhpAmqpLib\Message\AMQPMessage;

class RpcClient extends BaseAmqp
{
Expand All @@ -21,7 +22,7 @@ public function addRequest($msgBody, $server, $requestId = null, $routingKey = '
throw new \InvalidArgumentException('You must provide a $requestId');
}

$msg = new \AMQPMessage($msgBody, array('content_type' => 'text/plain',
$msg = new AMQPMessage($msgBody, array('content_type' => 'text/plain',
'reply_to' => $this->queueName,
'correlation_id' => $requestId));

Expand All @@ -32,7 +33,7 @@ public function addRequest($msgBody, $server, $requestId = null, $routingKey = '

public function getReplies()
{
$this->ch->basic_consume($this->queueName, $this->queueName, false, true, false, false, array($this, 'processMessage'));
$this->ch->basic_consume($this->queueName, '', false, true, false, false, array($this, 'processMessage'));

while (count($this->replies) < $this->requests)
{
Expand All @@ -43,7 +44,7 @@ public function getReplies()
return $this->replies;
}

public function processMessage(\AMQPMessage $msg)
public function processMessage(AMQPMessage $msg)
{
$this->replies[$msg->get('correlation_id')] = unserialize($msg->body);
}
Expand Down
5 changes: 3 additions & 2 deletions RabbitMq/RpcServer.php
Expand Up @@ -3,6 +3,7 @@
namespace OldSound\RabbitMqBundle\RabbitMq;

use OldSound\RabbitMqBundle\RabbitMq\BaseAmqp;
use PhpAmqpLib\Message\AMQPMessage;

class RpcServer extends BaseConsumer
{
Expand All @@ -22,7 +23,7 @@ public function start()
}
}

public function processMessage(\AMQPMessage $msg)
public function processMessage(AMQPMessage $msg)
{
try
{
Expand All @@ -38,7 +39,7 @@ public function processMessage(\AMQPMessage $msg)

protected function sendReply($result, $client, $correlationId)
{
$reply = new \AMQPMessage($result, array('content_type' => 'text/plain', 'correlation_id' => $correlationId));
$reply = new AMQPMessage($result, array('content_type' => 'text/plain', 'correlation_id' => $correlationId));
$this->ch->basic_publish($reply, '', $client);
}
}
2 changes: 1 addition & 1 deletion Resources/config/rabbitmq.yml
@@ -1,5 +1,5 @@
parameters:
old_sound_rabbit_mq.connection.class: AMQPConnection
old_sound_rabbit_mq.connection.class: PhpAmqpLib\Connection\AMQPConnection
old_sound_rabbit_mq.producer.class: OldSound\RabbitMqBundle\RabbitMq\Producer
old_sound_rabbit_mq.consumer.class: OldSound\RabbitMqBundle\RabbitMq\Consumer
old_sound_rabbit_mq.anon_consumer.class: OldSound\RabbitMqBundle\RabbitMq\AnonConsumer
Expand Down

0 comments on commit 0ec9d57

Please sign in to comment.