Skip to content

Commit

Permalink
Added memory check when consuming a message.
Browse files Browse the repository at this point in the history
The consumer will be stopped if the available memory is not enough.

Fixed memory calcul.
  • Loading branch information
Benjamin Laugueux committed Feb 26, 2013
1 parent 497fbfe commit 51e109a
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 1 deletion.
4 changes: 4 additions & 0 deletions Command/BaseConsumerCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ protected function configure()
->addArgument('name', InputArgument::REQUIRED, 'Consumer Name')
->addOption('messages', 'm', InputOption::VALUE_OPTIONAL, 'Messages to consume', 0)
->addOption('route', 'r', InputOption::VALUE_OPTIONAL, 'Routing Key', '')
->addOption('memory-limit', 'l', InputOption::VALUE_OPTIONAL, 'Allowed memory for this process', null)
->addOption('debug', 'd', InputOption::VALUE_NONE, 'Enable Debugging')
->addOption('without-signals', 'w', InputOption::VALUE_NONE, 'Disable catching of system signals')
;
Expand Down Expand Up @@ -84,6 +85,9 @@ protected function execute(InputInterface $input, OutputInterface $output)
$this->consumer = $this->getContainer()
->get(sprintf($this->getConsumerService(), $input->getArgument('name')));

if (!is_null($input->getOption('memory-limit')) && ctype_digit((string)$input->getOption('memory-limit')) && $input->getOption('memory-limit') > 0) {
$this->consumer->setMemoryLimit($input->getOption('memory-limit'));
}
$this->consumer->setRoutingKey($input->getOption('route'));
$this->consumer->consume($this->amount);
}
Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,9 @@ Then consumer will finish executing instantly.

For using command with this flag you need to install PHP with [PCNTL extension](http://www.php.net/manual/en/book.pcntl.php).

If you want to establish a consumer memory limit, you can do it by using flag -l. In the following example, this flag adds 256 MB memory limit. Consumer will be stopped five MB before reaching 256MB in order to avoid a PHP Allowed memory size error.

$ ./app/console rabbitmq:consumer -l 256

### Callbacks ###

Expand Down
47 changes: 46 additions & 1 deletion RabbitMq/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,36 @@

class Consumer extends BaseConsumer
{
/**
* @var int $memoryLimit
*/
protected $memoryLimit = null;

/**
* Set the memory limit
*
* @param int $memoryLimit
*/
public function setMemoryLimit($memoryLimit)
{
$this->memoryLimit = $memoryLimit;
}

/**
* Get the memory limit
*
* @return int
*/
public function getMemoryLimit()
{
return $this->memoryLimit;
}

/**
* Consume the message
*
* @param int $msgAmount
*/
public function consume($msgAmount)
{
$this->target = $msgAmount;
Expand All @@ -33,6 +62,22 @@ public function processMessage(AMQPMessage $msg)
$this->consumed++;
$this->maybeStopConsumer();

if (!is_null($this->getMemoryLimit()) && $this->isRamAlmostOverloaded()) {
$this->stopConsuming();
}
}

}
/**
* Checks if memory in use is greater or equal than memory allowed for this process
*
* @return boolean
*/
protected function isRamAlmostOverloaded()
{
if (memory_get_usage(true) >= ($this->getMemoryLimit() * 1024 * 1024)) {
return true;
} else {
return false;
}
}
}

0 comments on commit 51e109a

Please sign in to comment.