Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/TreeHouse/Queue/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ public function getProcessor()
public function consume()
{
while (null !== $message = $this->messageProvider->get()) {
$this->processor->process($message);
$this->messageProvider->ack($message);
if ($this->processor->process($message)) {
$this->messageProvider->ack($message);
}
}
}
}
10 changes: 10 additions & 0 deletions src/TreeHouse/Queue/Exception/ConsumerException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php

namespace TreeHouse\Queue\Exception;

/**
* Base exception for all consumer-related exceptions
*/
class ConsumerException extends QueueException
{
}
6 changes: 5 additions & 1 deletion src/TreeHouse/Queue/Exception/ProcessExhaustedException.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

namespace TreeHouse\Queue\Exception;

class ProcessExhaustedException extends \Exception
/**
* Exception indicating a message could not be processed
* by the consumer, even after retrying.
*/
class ProcessExhaustedException extends ConsumerException
{
}
10 changes: 10 additions & 0 deletions src/TreeHouse/Queue/Exception/QueueException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php

namespace TreeHouse\Queue\Exception;

/**
* Base exception for all exceptions in this library
*/
class QueueException extends \Exception
{
}
63 changes: 0 additions & 63 deletions src/TreeHouse/Queue/Exception/RescheduleException.php

This file was deleted.

61 changes: 42 additions & 19 deletions src/TreeHouse/Queue/Processor/RetryProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use Psr\Log\LoggerInterface;
use TreeHouse\Queue\Exception\ProcessExhaustedException;
use TreeHouse\Queue\Message\Message;
use TreeHouse\Queue\Message\Publisher\MessagePublisherInterface;
use TreeHouse\Queue\Message\Provider\MessageProviderInterface;

class RetryProcessor implements ProcessorInterface
{
Expand All @@ -17,36 +17,34 @@ class RetryProcessor implements ProcessorInterface
protected $processor;

/**
* @var MessagePublisherInterface
* @var MessageProviderInterface
*/
protected $publisher;
protected $provider;

/**
* @var LoggerInterface
*/
protected $logger;

/**
* @todo make configurable
*
* @var integer
*/
protected $maxAttempts = 3;
protected $maxAttempts = 2;

/**
* @var integer
*/
protected $cooldownTime = 600;

/**
* @param ProcessorInterface $processor
* @param MessagePublisherInterface $publisher
* @param LoggerInterface $logger
* @param ProcessorInterface $processor
* @param MessageProviderInterface $provider
* @param LoggerInterface $logger
*/
public function __construct(ProcessorInterface $processor, MessagePublisherInterface $publisher, LoggerInterface $logger = null)
public function __construct(ProcessorInterface $processor, MessageProviderInterface $provider, LoggerInterface $logger = null)
{
$this->processor = $processor;
$this->publisher = $publisher;
$this->provider = $provider;
$this->logger = $logger;
}

Expand Down Expand Up @@ -104,19 +102,44 @@ public function getMaxAttempts()
public function process(Message $message)
{
try {
$this->processor->process($message);
} catch (\Exception $e) {
$this->logger->warning($e->getMessage(), ['message' => $message->getId()]);
$result = $this->processor->process($message);
} catch (\Exception $exception) {
$result = false;

$attempt = $this->getAttemptValue($message);
if ($attempt >= $this->maxAttempts) {
throw new ProcessExhaustedException(sprintf('Exhausted after failing %d attempt(s)', $attempt), 0, $e);
if ($this->logger) {
$this->logger->warning($exception->getMessage(), ['message' => $message->getId()]);
}
}

if ($result !== true) {
$this->retryMessage($message);
}

$message->getProperties()->set(self::PROPERTY_KEY, ++$attempt);
return $result;
}

/**
* @param Message $message
*
* @throws ProcessExhaustedException
*/
protected function retryMessage(Message $message)
{
$attempt = $this->getAttemptValue($message);
if ($attempt >= $this->maxAttempts) {
throw new ProcessExhaustedException(sprintf('Exhausted after failing %d attempt(s)', $attempt));
}

$this->publisher->publish($message, false, new \DateTime('@' . (time() + $this->cooldownTime)));
if ($this->logger) {
$this->logger->debug(sprintf('Requeueing message (%d attempts left)', $this->maxAttempts - $attempt));
}

$message->getProperties()->set(self::PROPERTY_KEY, ++$attempt);

// TODO: find a way to implement the cooldown period
// $date = new \DateTime('@' . (time() + $this->cooldownTime));

$this->provider->nack($message, true);
}

/**
Expand Down