Skip to content

Commit

Permalink
Fatal Exception
Browse files Browse the repository at this point in the history
Add Fatal Exception
  • Loading branch information
gregoireMIAGE committed Jan 24, 2018
1 parent 60d65e0 commit 6fc6302
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 1 deletion.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,11 @@ If an exception is triggered in the `Consumer`, a `nack` will be sent instead to

Note: if your consumer callback throws an exception implementing the `RetryableExceptionInterface` interface, the `nack` message will be sent with the "requeue" flag. The message will be requeued.

Note: if your consumer callback throws an exception implementing the `FatalExceptionInterface` interface, the exception will be propagated by the consumer (hence leading to the crash of the consumer script). Otherwise, consumer will continue processing messages.

Exceptions are logged by default using the error_log function. You can override this behaviour by passing a PSR-3 compliant logger to the `AbstractConsumer` constructor.


Writing your consumer as a class
--------------------------------

Expand Down
4 changes: 3 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
],
"require" : {
"php" : ">=5.6",
"php-amqplib/php-amqplib": "^2.6.3"
"php-amqplib/php-amqplib": "^2.6.3",
"psr/log": "^1",
"mouf/utils.log.psr.errorlog_logger": "^2"
},
"require-dev" : {
"phpunit/phpunit": "^5.4.6",
Expand Down
22 changes: 22 additions & 0 deletions src/AbstractConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,42 @@

namespace Mouf\AmqpClient;

use Mouf\Utils\Log\Psr\ErrorLogLogger;
use Psr\Log\LoggerInterface;

abstract class AbstractConsumer implements ConsumerInterface
{
use ConsumerTrait;

private $logger;

public function __construct(LoggerInterface $logger = null)
{
$this->logger = $logger ?: new ErrorLogLogger();
}

public function callback($msg)
{
try {
$this->onMessageReceived($msg);

$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
} catch (RetryableExceptionInterface $e) {
$this->logger->error("Exception caught while consuming message.", [
'exception' => $e
]);
$msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'], true, true);
if ($e instanceof FatalExceptionInterface) {
throw $e;
}
} catch (\Exception $e) {
$this->logger->error("Exception caught while consuming message.", [
'exception' => $e
]);
$msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'], true, false);
if ($e instanceof FatalExceptionInterface) {
throw $e;
}
}
}

Expand Down
16 changes: 16 additions & 0 deletions src/FatalExceptionInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?php

namespace Mouf\AmqpClient;

/**
* When an exception implementing this interface is thrown, the consumer will propagate the exception (so the script
* will stop). Useful to halt a consumer script if the database connection has timed out.
*/
interface FatalExceptionInterface
{
/**
* Exception is propagated if isFatal returns true.
* @return bool
*/
public function isFatal();
}

0 comments on commit 6fc6302

Please sign in to comment.