Skip to content

Commit

Permalink
Support a message consumption amount for rpc servers, infinite by def…
Browse files Browse the repository at this point in the history
…ault
  • Loading branch information
richardfullmer committed Feb 5, 2012
1 parent 1a62c37 commit 2e275b0
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 2 deletions.
8 changes: 7 additions & 1 deletion Command/RpcServerCommand.php
Expand Up @@ -18,6 +18,7 @@ protected function configure()
$this
->setName('rabbitmq:rpc-server')
->addArgument('name', InputArgument::REQUIRED, 'Server Name')
->addOption('messages', 'm', InputOption::VALUE_OPTIONAL, 'Messages to consume', 0)
->addOption('debug', 'd', InputOption::VALUE_OPTIONAL, 'Debug mode', false)
;
}
Expand All @@ -35,9 +36,14 @@ protected function configure()
protected function execute(InputInterface $input, OutputInterface $output)
{
define('AMQP_DEBUG', (bool) $input->getOption('debug'));
$amount = $input->getOption('messages');

if (0 > $amount) {
throw new InvalidArgumentException("The -m option should be null or greater than 0");
}

$server = $this->getContainer()
->get(sprintf('old_sound_rabbit_mq.%s_server', $input->getArgument('name')))
->start();
->start($amount);
}
}
21 changes: 20 additions & 1 deletion RabbitMq/RpcServer.php
Expand Up @@ -7,14 +7,20 @@

class RpcServer extends BaseConsumer
{
protected $target;

protected $consumed = 0;

public function initServer($name)
{
$this->setExchangeOptions(array('name' => $name, 'type' => 'direct'));
$this->setQueueOptions(array('name' => $name . '-queue'));
}

public function start()
public function start($msgAmount = 0)
{
$this->target = $msgAmount;

$this->setUpConsumer();

while (count($this->ch->callbacks))
Expand All @@ -30,6 +36,8 @@ public function processMessage(AMQPMessage $msg)
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
$result = call_user_func($this->callback, $msg);
$this->sendReply(serialize($result), $msg->get('reply_to'), $msg->get('correlation_id'));
$this->consumed++;
$this->maybeStopConsumer();
}
catch (\Exception $e)
{
Expand All @@ -42,4 +50,15 @@ protected function sendReply($result, $client, $correlationId)
$reply = new AMQPMessage($result, array('content_type' => 'text/plain', 'correlation_id' => $correlationId));
$this->ch->basic_publish($reply, '', $client);
}

protected function maybeStopConsumer()
{
if ($this->target == 0) {
return;
}

if ($this->consumed == $this->target) {
$this->stopConsuming();
}
}
}

0 comments on commit 2e275b0

Please sign in to comment.