Skip to content

Commit

Permalink
Merge 6fc6302 into 60d65e0
Browse files Browse the repository at this point in the history
  • Loading branch information
gparant committed Jan 24, 2018
2 parents 60d65e0 + 6fc6302 commit 1ed46d4
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 1 deletion.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,11 @@ If an exception is triggered in the `Consumer`, a `nack` will be sent instead to

Note: if your consumer callback throws an exception implementing the `RetryableExceptionInterface` interface, the `nack` message will be sent with the "requeue" flag. The message will be requeued.

Note: if your consumer callback throws an exception implementing the `FatalExceptionInterface` interface, the exception will be propagated by the consumer (hence leading to the crash of the consumer script). Otherwise, consumer will continue processing messages.

Exceptions are logged by default using the error_log function. You can override this behaviour by passing a PSR-3 compliant logger to the `AbstractConsumer` constructor.


Writing your consumer as a class
--------------------------------

Expand Down
4 changes: 3 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
],
"require" : {
"php" : ">=5.6",
"php-amqplib/php-amqplib": "^2.6.3"
"php-amqplib/php-amqplib": "^2.6.3",
"psr/log": "^1",
"mouf/utils.log.psr.errorlog_logger": "^2"
},
"require-dev" : {
"phpunit/phpunit": "^5.4.6",
Expand Down
22 changes: 22 additions & 0 deletions src/AbstractConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,42 @@

namespace Mouf\AmqpClient;

use Mouf\Utils\Log\Psr\ErrorLogLogger;
use Psr\Log\LoggerInterface;

abstract class AbstractConsumer implements ConsumerInterface
{
use ConsumerTrait;

private $logger;

public function __construct(LoggerInterface $logger = null)
{
$this->logger = $logger ?: new ErrorLogLogger();
}

public function callback($msg)
{
try {
$this->onMessageReceived($msg);

$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
} catch (RetryableExceptionInterface $e) {
$this->logger->error("Exception caught while consuming message.", [
'exception' => $e
]);
$msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'], true, true);
if ($e instanceof FatalExceptionInterface) {
throw $e;
}
} catch (\Exception $e) {
$this->logger->error("Exception caught while consuming message.", [
'exception' => $e
]);
$msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'], true, false);
if ($e instanceof FatalExceptionInterface) {
throw $e;
}
}
}

Expand Down
16 changes: 16 additions & 0 deletions src/FatalExceptionInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?php

namespace Mouf\AmqpClient;

/**
* When an exception implementing this interface is thrown, the consumer will propagate the exception (so the script
* will stop). Useful to halt a consumer script if the database connection has timed out.
*/
interface FatalExceptionInterface
{
/**
* Exception is propagated if isFatal returns true.
* @return bool
*/
public function isFatal();
}

0 comments on commit 1ed46d4

Please sign in to comment.