Skip to content

Commit

Permalink
Merge pull request #310 from php-enqueue/queue-consumer-does-not-exit…
Browse files Browse the repository at this point in the history
…-on-signal

[consumption] Fix signal handling when AMQP is used.
  • Loading branch information
makasim committed Jan 4, 2018
2 parents d5609d8 + 80995c7 commit 54eb61d
Showing 1 changed file with 36 additions and 34 deletions.
70 changes: 36 additions & 34 deletions pkg/enqueue/Consumption/QueueConsumer.php
Expand Up @@ -168,39 +168,47 @@ public function consume(ExtensionInterface $runtimeExtension = null)
$logger = $context->getLogger() ?: new NullLogger();
$logger->info('Start consuming');

while (true) {
try {
if ($this->psrContext instanceof AmqpContext) {
$callback = function (AmqpMessage $message, AmqpConsumer $consumer) use ($extension, $logger, &$context) {
$currentProcessor = null;
if ($this->psrContext instanceof AmqpContext) {
$callback = function (AmqpMessage $message, AmqpConsumer $consumer) use ($extension, $logger, &$context) {
$currentProcessor = null;

/** @var PsrQueue $queue */
foreach ($this->boundProcessors as list($queue, $processor)) {
if ($queue->getQueueName() === $consumer->getQueue()->getQueueName()) {
$currentProcessor = $processor;
}
}

/** @var PsrQueue $queue */
foreach ($this->boundProcessors as list($queue, $processor)) {
if ($queue->getQueueName() === $consumer->getQueue()->getQueueName()) {
$currentProcessor = $processor;
}
}
if (false == $currentProcessor) {
throw new \LogicException(sprintf('The processor for the queue "%s" could not be found.', $consumer->getQueue()->getQueueName()));
}

if (false == $currentProcessor) {
throw new \LogicException(sprintf('The processor for the queue "%s" could not be found.', $consumer->getQueue()->getQueueName()));
}
$context = new Context($this->psrContext);
$context->setLogger($logger);
$context->setPsrQueue($consumer->getQueue());
$context->setPsrConsumer($consumer);
$context->setPsrProcessor($currentProcessor);
$context->setPsrMessage($message);

$context = new Context($this->psrContext);
$context->setLogger($logger);
$context->setPsrQueue($consumer->getQueue());
$context->setPsrConsumer($consumer);
$context->setPsrProcessor($currentProcessor);
$context->setPsrMessage($message);
$this->doConsume($extension, $context);

$this->doConsume($extension, $context);
return true;
};

return true;
};
foreach ($consumers as $consumer) {
/* @var AmqpConsumer $consumer */

foreach ($consumers as $consumer) {
/* @var AmqpConsumer $consumer */
$this->psrContext->subscribe($consumer, $callback);
}
}

$this->psrContext->subscribe($consumer, $callback);
while (true) {
try {
if ($this->psrContext instanceof AmqpContext) {
$extension->onBeforeReceive($context);

if ($context->isExecutionInterrupted()) {
throw new ConsumptionInterruptedException();
}

$this->psrContext->consume($this->receiveTimeout);
Expand Down Expand Up @@ -266,16 +274,14 @@ protected function doConsume(ExtensionInterface $extension, Context $context)
$consumer = $context->getPsrConsumer();
$logger = $context->getLogger();

if (false == $context->getPsrMessage() instanceof AmqpContext) {
$extension->onBeforeReceive($context);
}

if ($context->isExecutionInterrupted()) {
throw new ConsumptionInterruptedException();
}

$message = $context->getPsrMessage();
if (false == $message) {
$extension->onBeforeReceive($context);

if ($message = $consumer->receive($this->receiveTimeout)) {
$context->setPsrMessage($message);
}
Expand Down Expand Up @@ -312,10 +318,6 @@ protected function doConsume(ExtensionInterface $extension, Context $context)
$logger->info(sprintf('Message processed: %s', $context->getResult()));

$extension->onPostReceived($context);

if ($context->getPsrMessage() instanceof AmqpContext) {
$extension->onBeforeReceive($context);
}
} else {
usleep($this->idleTimeout * 1000);
$extension->onIdle($context);
Expand Down

0 comments on commit 54eb61d

Please sign in to comment.