Permalink
Browse files

Fixed CS and remove unused 'use'.

  • Loading branch information...
1 parent 8feb6af commit 219caf8bc056bfac6cd08f176631e71a01966b81 @blaugueux blaugueux committed Feb 25, 2013
@@ -2,9 +2,6 @@
namespace OldSound\RabbitMqBundle\Command;
-use Symfony\Component\Console\Input\InputInterface;
-use Symfony\Component\Console\Output\OutputInterface;
-
class AnonConsumerCommand extends BaseConsumerCommand
{
@@ -23,4 +20,4 @@ protected function getConsumerService()
{
return 'old_sound_rabbit_mq.%s_anon';
}
-}
+}
@@ -6,4 +6,4 @@
abstract class BaseRabbitMqCommand extends Command
{
-}
+}
@@ -2,9 +2,6 @@
namespace OldSound\RabbitMqBundle\Command;
-use Symfony\Component\Console\Input\InputInterface;
-use Symfony\Component\Console\Output\OutputInterface;
-
class ConsumerCommand extends BaseConsumerCommand
{
@@ -2,7 +2,6 @@
namespace OldSound\RabbitMqBundle\Command;
-use Symfony\Component\Console\Input\InputDefinition;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
@@ -2,7 +2,6 @@
namespace OldSound\RabbitMqBundle\Command;
-use Symfony\Component\Console\Input\InputDefinition;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
@@ -43,4 +42,4 @@ protected function execute(InputInterface $input, OutputInterface $output)
$producer->publish(serialize($data));
}
-}
+}
@@ -2,7 +2,6 @@
namespace OldSound\RabbitMqBundle\DataCollector;
-use Symfony\Component\DependencyInjection\Container;
use Symfony\Component\HttpKernel\DataCollector\DataCollector;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;
@@ -22,7 +21,7 @@ public function __construct($channels)
$this->data = array();
}
- function collect(Request $request, Response $response, \Exception $exception = null)
+ public function collect(Request $request, Response $response, \Exception $exception = null)
{
foreach ($this->channels as $channel) {
foreach ($channel->getBasicPublishLog() as $log) {
@@ -4,7 +4,6 @@
use Symfony\Component\Config\Definition\Builder\TreeBuilder;
use Symfony\Component\Config\Definition\ConfigurationInterface;
-use Symfony\Component\Config\Definition\Builder\NodeBuilder;
use \Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition;
/**
@@ -138,12 +137,11 @@ protected function getQueueConfiguration()
->booleanNode('nowait')->defaultFalse()->end()
->variableNode('arguments')->defaultNull()->end()
->scalarNode('ticket')->defaultNull()->end()
- ->arrayNode('routing_keys')
+ ->arrayNode('routing_keys')
->prototype('scalar')->end()
->defaultValue(array())
->end()
->end()
;
}
}
-
@@ -32,4 +32,3 @@ public function getBasicPublishLog()
return $this->basicPublishLog;
}
}
-
View
@@ -35,9 +35,9 @@
);
/**
- * @param AMQPConnection $conn
+ * @param AMQPConnection $conn
* @param AMQPChannel|null $ch
- * @param null $consumerTag
+ * @param null $consumerTag
*/
public function __construct(AMQPConnection $conn, AMQPChannel $ch = null, $consumerTag = null)
{
@@ -63,7 +63,7 @@ public function __destruct()
}
/**
- * @param AMQPChannel $ch
+ * @param AMQPChannel $ch
* @return void
*/
public function setChannel(AMQPChannel $ch)
@@ -73,7 +73,7 @@ public function setChannel(AMQPChannel $ch)
/**
* @throws \InvalidArgumentException
- * @param array $options
+ * @param array $options
* @return void
*/
public function setExchangeOptions(array $options = array())
@@ -90,7 +90,7 @@ public function setExchangeOptions(array $options = array())
}
/**
- * @param array $options
+ * @param array $options
* @return void
*/
public function setQueueOptions(array $options = array())
@@ -99,7 +99,7 @@ public function setQueueOptions(array $options = array())
}
/**
- * @param string $routingKey
+ * @param string $routingKey
* @return void
*/
public function setRoutingKey($routingKey)
@@ -131,8 +131,8 @@ protected function queueDeclare()
$this->queueOptions['auto_delete'], $this->queueOptions['nowait'],
$this->queueOptions['arguments'], $this->queueOptions['ticket']);
- if(count($this->queueOptions['routing_keys']) > 0) {
- foreach($this->queueOptions['routing_keys'] as $routingKey) {
+ if (count($this->queueOptions['routing_keys']) > 0) {
+ foreach ($this->queueOptions['routing_keys'] as $routingKey) {
$this->ch->queue_bind($queueName, $this->exchangeOptions['name'], $routingKey);
}
} else {
@@ -25,8 +25,7 @@ public function start($msgAmount = 0)
$this->setupConsumer();
- while (count($this->ch->callbacks))
- {
+ while (count($this->ch->callbacks)) {
$this->ch->wait();
}
}
@@ -69,7 +68,6 @@ public function getConsumerTag()
return $this->consumerTag;
}
-
public function forceStopConsumer()
{
$this->forceStop = true;
@@ -79,8 +77,8 @@ public function forceStopConsumer()
* Sets the qos settings for the current channel
* Consider that prefetchSize and global do not work with rabbitMQ version <= 8.0
*
- * @param int $prefetchSize
- * @param int $prefetchCount
+ * @param int $prefetchSize
+ * @param int $prefetchCount
* @param bool $global
*/
public function setQosOptions($prefetchSize = 0, $prefetchCount = 0, $global = false)
View
@@ -7,15 +7,14 @@
class Consumer extends BaseConsumer
{
-
+
public function consume($msgAmount)
{
$this->target = $msgAmount;
$this->setupConsumer();
- while (count($this->ch->callbacks))
- {
+ while (count($this->ch->callbacks)) {
$this->maybeStopConsumer();
$this->ch->wait();
}
@@ -26,15 +25,14 @@ public function processMessage(AMQPMessage $msg)
if (false === call_user_func($this->callback, $msg)) {
// Reject and requeue message to RabbitMQ
$msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], true);
- }
- else {
+ } else {
// Remove message from queue only if callback return not false
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}
$this->consumed++;
$this->maybeStopConsumer();
-
+
}
-
+
}
@@ -6,6 +6,5 @@
interface ConsumerInterface
{
- function execute(AMQPMessage $msg);
+ public function execute(AMQPMessage $msg);
}
-
View
@@ -4,8 +4,8 @@
class Fallback
{
- public function publish()
+ public function publish()
{
return false;
}
-}
+}
View
@@ -3,21 +3,19 @@
namespace OldSound\RabbitMqBundle\RabbitMq;
use OldSound\RabbitMqBundle\RabbitMq\BaseAmqp;
-use PhpAmqpLib\Channel\AMQPChannel;
-use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;
class Producer extends BaseAmqp
{
protected $contentType = 'text/plain';
protected $deliveryMode = 2;
-
- public function setContentType($contentType)
+
+ public function setContentType($contentType)
{
$this->contentType = $contentType;
}
-
- public function setDeliveryMode($deliveryMode)
+
+ public function setDeliveryMode($deliveryMode)
{
$this->deliveryMode = $deliveryMode;
}
View
@@ -35,17 +35,17 @@ public function getReplies()
{
$this->ch->basic_consume($this->queueName, '', false, true, false, false, array($this, 'processMessage'));
- while (count($this->replies) < $this->requests)
- {
+ while (count($this->replies) < $this->requests) {
$this->ch->wait();
}
$this->ch->basic_cancel($this->queueName);
+
return $this->replies;
}
public function processMessage(AMQPMessage $msg)
{
$this->replies[$msg->get('correlation_id')] = unserialize($msg->body);
}
-}
+}
View
@@ -2,7 +2,6 @@
namespace OldSound\RabbitMqBundle\RabbitMq;
-use OldSound\RabbitMqBundle\RabbitMq\BaseAmqp;
use PhpAmqpLib\Message\AMQPMessage;
class RpcServer extends BaseConsumer
@@ -15,16 +14,13 @@ public function initServer($name)
public function processMessage(AMQPMessage $msg)
{
- try
- {
+ try {
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
$result = call_user_func($this->callback, $msg);
$this->sendReply(serialize($result), $msg->get('reply_to'), $msg->get('correlation_id'));
$this->consumed++;
$this->maybeStopConsumer();
- }
- catch (\Exception $e)
- {
+ } catch (\Exception $e) {
$this->sendReply('error: ' . $e->getMessage(), $msg->get('reply_to'), $msg->get('correlation_id'));
}
}
@@ -34,4 +30,4 @@ protected function sendReply($result, $client, $correlationId)
$reply = new AMQPMessage($result, array('content_type' => 'text/plain', 'correlation_id' => $correlationId));
$this->ch->basic_publish($reply, '', $client);
}
-}
+}
@@ -4,9 +4,7 @@
use OldSound\RabbitMqBundle\Command\ConsumerCommand;
-use Symfony\Component\Console\Input\ArrayInput;
use Symfony\Component\Console\Input\InputOption;
-use Symfony\Component\Console\Output\NullOutput;
class ConsumerCommandTest extends \PHPUnit_Framework_TestCase
{
@@ -20,4 +20,3 @@ old_sound_rabbit_mq:
arguments: {name: bar}
queue_options:
callback: consumer.callback
-

0 comments on commit 219caf8

Please sign in to comment.