Skip to content

Commit

Permalink
Merge pull request #14 from fieg/queue-consume
Browse files Browse the repository at this point in the history
Added queue consume support
  • Loading branch information
fieg committed Jul 13, 2015
2 parents 27ce8f5 + 5e37731 commit 836b58f
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 0 deletions.
2 changes: 2 additions & 0 deletions README.md
@@ -1,4 +1,6 @@
Queue library
===========================

[![Build Status](https://travis-ci.org/treehouselabs/queue.svg?branch=master)](https://travis-ci.org/treehouselabs/queue)

To be documented
16 changes: 16 additions & 0 deletions src/TreeHouse/Queue/Message/Provider/AmqpMessageProvider.php
Expand Up @@ -51,4 +51,20 @@ public function nack(Message $message, $requeue = false)
{
$this->queue->nack($message->getId(), $requeue ? AMQP_REQUEUE : null);
}

/**
* @inheritdoc
*/
public function consume(callable $callback)
{
$this->queue->consume(function(\AMQPEnvelope $envelope) use ($callback) {
$id = $envelope->getDeliveryTag();
$body = $envelope->getBody();
$props = new MessageProperties($envelope->getHeaders());

$message = new Message($body, $props, $id);

$callback($message);
});
}
}
Expand Up @@ -11,6 +11,15 @@ interface MessageProviderInterface
*/
public function get();

/**
* Blocking function to consume next message
*
* @param callable $callback called when a message is available
*
* @return void
*/
public function consume(callable $callback);

/**
* @param Message $message The message to ACK
*
Expand Down
Expand Up @@ -45,6 +45,39 @@ public function testGet()
$this->assertEquals($headers, $message->getProperties()->toArray());
}

public function testConsume()
{
$id = uniqid();
$body = 'test';
$headers = ['foo' => 'bar'];

$envelope = $this->getMock(\AMQPEnvelope::class);
$envelope->expects($this->any())->method('getDeliveryTag')->will($this->returnValue($id));
$envelope->expects($this->any())->method('getBody')->will($this->returnValue($body));
$envelope->expects($this->any())->method('getHeaders')->will($this->returnValue($headers));

$callback = function($message) use ($id, $body, $headers) {
$this->assertInstanceOf(Message::class, $message);

$this->assertEquals($id, $message->getId());
$this->assertEquals($body, $message->getBody());
$this->assertEquals($headers, $message->getProperties()->toArray());
};

$this->queue
->expects($this->once())
->method('consume')
->with($this->callback(function($callback) use ($envelope) {
$callback($envelope);

return true;
}))
;

$provider = new AmqpMessageProvider($this->queue);
$provider->consume($callback);
}

public function testReturnNullOnEmptyEnvelope()
{
$this->queue
Expand Down

0 comments on commit 836b58f

Please sign in to comment.