From 2e275b074fcda0d063274ac37d6f59c4da602afe Mon Sep 17 00:00:00 2001 From: Richard Fullmer Date: Wed, 25 Jan 2012 17:11:08 -0500 Subject: [PATCH] Support a message consumption amount for rpc servers, infinite by default --- Command/RpcServerCommand.php | 8 +++++++- RabbitMq/RpcServer.php | 21 ++++++++++++++++++++- 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/Command/RpcServerCommand.php b/Command/RpcServerCommand.php index 040b8bba..6f25aba3 100644 --- a/Command/RpcServerCommand.php +++ b/Command/RpcServerCommand.php @@ -18,6 +18,7 @@ protected function configure() $this ->setName('rabbitmq:rpc-server') ->addArgument('name', InputArgument::REQUIRED, 'Server Name') + ->addOption('messages', 'm', InputOption::VALUE_OPTIONAL, 'Messages to consume', 0) ->addOption('debug', 'd', InputOption::VALUE_OPTIONAL, 'Debug mode', false) ; } @@ -35,9 +36,14 @@ protected function configure() protected function execute(InputInterface $input, OutputInterface $output) { define('AMQP_DEBUG', (bool) $input->getOption('debug')); + $amount = $input->getOption('messages'); + + if (0 > $amount) { + throw new InvalidArgumentException("The -m option should be null or greater than 0"); + } $server = $this->getContainer() ->get(sprintf('old_sound_rabbit_mq.%s_server', $input->getArgument('name'))) - ->start(); + ->start($amount); } } \ No newline at end of file diff --git a/RabbitMq/RpcServer.php b/RabbitMq/RpcServer.php index 49041643..3beb046f 100644 --- a/RabbitMq/RpcServer.php +++ b/RabbitMq/RpcServer.php @@ -7,14 +7,20 @@ class RpcServer extends BaseConsumer { + protected $target; + + protected $consumed = 0; + public function initServer($name) { $this->setExchangeOptions(array('name' => $name, 'type' => 'direct')); $this->setQueueOptions(array('name' => $name . '-queue')); } - public function start() + public function start($msgAmount = 0) { + $this->target = $msgAmount; + $this->setUpConsumer(); while (count($this->ch->callbacks)) @@ -30,6 +36,8 @@ public function processMessage(AMQPMessage $msg) $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); $result = call_user_func($this->callback, $msg); $this->sendReply(serialize($result), $msg->get('reply_to'), $msg->get('correlation_id')); + $this->consumed++; + $this->maybeStopConsumer(); } catch (\Exception $e) { @@ -42,4 +50,15 @@ protected function sendReply($result, $client, $correlationId) $reply = new AMQPMessage($result, array('content_type' => 'text/plain', 'correlation_id' => $correlationId)); $this->ch->basic_publish($reply, '', $client); } + + protected function maybeStopConsumer() + { + if ($this->target == 0) { + return; + } + + if ($this->consumed == $this->target) { + $this->stopConsuming(); + } + } } \ No newline at end of file