Skip to content

Commit

Permalink
Merge pull request #19 from pkruithof/no-auto-declare
Browse files Browse the repository at this point in the history
Exchanges and queues are no longer declared automatically
  • Loading branch information
pkruithof committed Mar 3, 2016
2 parents 046bb83 + 8f78681 commit 55f2460
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 24 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ For a complete list of releases, see the [releases page][0].
instances, which are only for composing new messages.
* Removed `MessageProvider` and its interface, as it was basically a copy of
the consumer class. Instead, consumers now work directly with a queue/processor.
* Exchanges and queues are no longer declared automatically on factory creation


## v0.1.0
Expand Down
30 changes: 13 additions & 17 deletions src/TreeHouse/Queue/Amqp/Driver/Amqp/AmqpFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ public function createChannel(ConnectionInterface $connection)
$connection->connect();
}

$channel = new \AMQPChannel($connection->getDelegate());
$delegate = new \AMQPChannel($connection->getDelegate());

return new Channel($channel, $connection);
return new Channel($delegate, $connection);
}

/**
Expand All @@ -52,32 +52,28 @@ public function createExchange(
$flags = null,
array $args = []
) {
$exchange = new \AMQPExchange($channel->getDelegate());
$exchange->setName($name);
$exchange->setType($type);
$exchange->setFlags(Exchange::convertToDelegateFlags($flags));
$exchange->setArguments($args);
$delegate = new \AMQPExchange($channel->getDelegate());
$delegate->setName($name);
$delegate->setType($type);
$delegate->setFlags(Exchange::convertToDelegateFlags($flags));
$delegate->setArguments($args);

$exchange->declareExchange();

return new Exchange($exchange, $channel);
return new Exchange($delegate, $channel);
}

/**
* @inheritdoc
*/
public function createQueue(ChannelInterface $channel, $name = null, $flags = null, array $args = [])
{
$queue = new \AMQPQueue($channel->getDelegate());
$queue->setFlags(Queue::convertToDelegateFlags($flags));
$queue->setArguments($args);
$delegate = new \AMQPQueue($channel->getDelegate());
$delegate->setFlags(Queue::convertToDelegateFlags($flags));
$delegate->setArguments($args);

if (null !== $name) {
$queue->setName($name);
$delegate->setName($name);
}

$queue->declareQueue();

return new Queue($queue, $channel);
return new Queue($delegate, $channel);
}
}
12 changes: 5 additions & 7 deletions src/TreeHouse/Queue/Processor/Retry/RetryProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace TreeHouse\Queue\Processor\Retry;

use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use TreeHouse\Queue\Amqp\EnvelopeInterface;
use TreeHouse\Queue\Exception\ProcessExhaustedException;
use TreeHouse\Queue\Processor\ProcessorInterface;
Expand Down Expand Up @@ -43,7 +44,7 @@ public function __construct(ProcessorInterface $processor, RetryStrategyInterfac
{
$this->processor = $processor;
$this->strategy = $strategy;
$this->logger = $logger;
$this->logger = $logger ?: new NullLogger();
}

/**
Expand Down Expand Up @@ -86,9 +87,8 @@ public function process(EnvelopeInterface $envelope)
try {
$result = $this->processor->process($envelope);
} catch (\Exception $exception) {
if ($this->logger) {
$this->logger->error($exception->getMessage(), ['message' => $envelope->getDeliveryTag()]);
}
$this->logger->error($exception->getMessage(), ['message' => $envelope->getDeliveryTag()]);
$this->logger->debug($exception->getTraceAsString());

$result = $this->retryMessage($envelope, $exception);
}
Expand All @@ -111,9 +111,7 @@ protected function retryMessage(EnvelopeInterface $envelope, \Exception $excepti
throw new ProcessExhaustedException(sprintf('Exhausted after failing %d attempt(s)', $attempt));
}

if ($this->logger) {
$this->logger->debug(sprintf('Requeueing message (%d attempts left)', $this->maxAttempts - $attempt));
}
$this->logger->debug(sprintf('Requeueing message (%d attempts left)', $this->maxAttempts - $attempt));

return $this->strategy->retry($envelope, ++$attempt, $exception);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,11 @@ public function it_can_be_created()
];

$exchange = $this->getExchange();
$exchange->declareExchange();

$queue = $this->getQueue();
$queue->declareQueue();

$queue->bind($exchange->getName(), $routingKey);

$exchange->publish($body, $routingKey, null, $properties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ public function it_can_bind_and_unbind()

$exchange1 = $this->factory->createExchange($channel, 'exchg1');
$exchange2 = $this->factory->createExchange($channel, 'exchg2');
$exchange1->declareExchange();
$exchange2->declareExchange();

$routingKey = 'test';

$this->assertTrue($exchange1->bind($exchange2->getName(), $routingKey));
Expand Down Expand Up @@ -71,6 +74,7 @@ public function it_cannot_bind_on_closed_channel()
$conn = $this->factory->createConnection('localhost');
$channel = $this->factory->createChannel($conn);
$exchange = $this->factory->createExchange($channel, 'exchg1');
$exchange->declareExchange();

$conn->close();

Expand Down Expand Up @@ -106,6 +110,9 @@ public function it_cannot_unbind_on_closed_channel()
$exchange1 = $this->factory->createExchange($channel, 'exchg1');
$exchange2 = $this->factory->createExchange($channel, 'exchg2');

$exchange1->declareExchange();
$exchange2->declareExchange();

$exchange1->bind($exchange2->getName(), 'test');

$conn->close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public function it_retries_when_processor_throws_exception()
$inner->shouldReceive('process')->once()->andThrow($exception);

$envelope = $this->createEnvelopeMock(1);
$envelope->shouldReceive('getDeliveryTag')->andReturnNull();

$result = $processor->process($envelope);

$this->assertTrue($result, 'The ->process() method should return the value from the strategy');
Expand All @@ -90,6 +92,7 @@ public function it_cannot_exceed_max_retries()

// create message for second attempt
$envelope = $this->createEnvelopeMock(2);
$envelope->shouldReceive('getDeliveryTag')->andReturnNull();

$processor = new RetryProcessor($inner, $strategy);
$processor->setMaxAttempts(2);
Expand Down

0 comments on commit 55f2460

Please sign in to comment.