Permalink
Fetching contributors…
Cannot retrieve contributors at this time
261 lines (184 sloc) 6.54 KB

AMQP transport

Implements AMQP specifications and implements amqp interop interfaces. Build on top of bunny lib.

Installation

$ composer require enqueue/amqp-bunny

Create context

<?php
use Enqueue\AmqpBunny\AmqpConnectionFactory;

// connects to localhost
$factory = new AmqpConnectionFactory();

// same as above
$factory = new AmqpConnectionFactory('amqp:');

// same as above
$factory = new AmqpConnectionFactory([]);

// connect to AMQP broker at example.com
$factory = new AmqpConnectionFactory([
    'host' => 'example.com',
    'port' => 1000,
    'vhost' => '/',
    'user' => 'user',
    'pass' => 'pass',
    'persisted' => false,
]);

// same as above but given as DSN string
$factory = new AmqpConnectionFactory('amqp://user:pass@example.com:10000/%2f');

$psrContext = $factory->createContext();

// if you have enqueue/enqueue library installed you can use a factory to build context from DSN 
$psrContext = (new \Enqueue\ConnectionFactoryFactory())->create('amqp:')->createContext();
$psrContext = (new \Enqueue\ConnectionFactoryFactory())->create('amqp+bunny:')->createContext();

Declare topic.

Declare topic operation creates a topic on a broker side.

<?php
use Interop\Amqp\AmqpTopic;

/** @var \Enqueue\AmqpBunny\AmqpContext $psrContext */

$fooTopic = $psrContext->createTopic('foo');
$fooTopic->setType(AmqpTopic::TYPE_FANOUT);
$psrContext->declareTopic($fooTopic);

// to remove topic use delete topic method
//$psrContext->deleteTopic($fooTopic);

Declare queue.

Declare queue operation creates a queue on a broker side.

<?php
use Interop\Amqp\AmqpQueue;

/** @var \Enqueue\AmqpBunny\AmqpContext $psrContext */

$fooQueue = $psrContext->createQueue('foo');
$fooQueue->addFlag(AmqpQueue::FLAG_DURABLE);
$psrContext->declareQueue($fooQueue);

// to remove topic use delete queue method
//$psrContext->deleteQueue($fooQueue);

Bind queue to topic

Connects a queue to the topic. So messages from that topic comes to the queue and could be processed.

<?php
use Interop\Amqp\Impl\AmqpBind;

/** @var \Enqueue\AmqpBunny\AmqpContext $psrContext */
/** @var \Interop\Amqp\Impl\AmqpQueue $fooQueue */
/** @var \Interop\Amqp\Impl\AmqpTopic $fooTopic */

$psrContext->bind(new AmqpBind($fooTopic, $fooQueue));

Send message to topic

<?php
/** @var \Enqueue\AmqpBunny\AmqpContext $psrContext */
/** @var \Interop\Amqp\Impl\AmqpTopic $fooTopic */

$message = $psrContext->createMessage('Hello world!');

$psrContext->createProducer()->send($fooTopic, $message);

Send message to queue

<?php
/** @var \Enqueue\AmqpBunny\AmqpContext $psrContext */
/** @var \Interop\Amqp\Impl\AmqpQueue $fooQueue */

$message = $psrContext->createMessage('Hello world!');

$psrContext->createProducer()->send($fooQueue, $message);

Send priority message

<?php
use Interop\Amqp\AmqpQueue;

/** @var \Enqueue\AmqpBunny\AmqpContext $psrContext */

$fooQueue = $psrContext->createQueue('foo');
$fooQueue->addFlag(AmqpQueue::FLAG_DURABLE);
$fooQueue->setArguments(['x-max-priority' => 10]);
$psrContext->declareQueue($fooQueue);

$message = $psrContext->createMessage('Hello world!');

$psrContext->createProducer()
    ->setPriority(5) // the higher priority the sooner a message gets to a consumer
    //    
    ->send($fooQueue, $message)
;

Send expiration message

<?php
/** @var \Enqueue\AmqpBunny\AmqpContext $psrContext */
/** @var \Interop\Amqp\Impl\AmqpQueue $fooQueue */

$message = $psrContext->createMessage('Hello world!');

$psrContext->createProducer()
    ->setTimeToLive(60000) // 60 sec
    //    
    ->send($fooQueue, $message)
;

Send delayed message

AMQP specification says nothing about message delaying hence the producer throws DeliveryDelayNotSupportedException. Though the producer (and the context) accepts a delivery delay strategy and if it is set it uses it to send delayed message. The enqueue/amqp-tools package provides two RabbitMQ delay strategies, to use them you have to install that package

<?php
use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy;

/** @var \Enqueue\AmqpBunny\AmqpContext $psrContext */
/** @var \Interop\Amqp\Impl\AmqpQueue $fooQueue */

// make sure you run "composer require enqueue/amqp-tools".

$message = $psrContext->createMessage('Hello world!');

$psrContext->createProducer()
    ->setDelayStrategy(new RabbitMqDlxDelayStrategy())
    ->setDeliveryDelay(5000) // 5 sec
    
    ->send($fooQueue, $message)
;

Consume message:

<?php
/** @var \Enqueue\AmqpBunny\AmqpContext $psrContext */
/** @var \Interop\Amqp\Impl\AmqpQueue $fooQueue */

$consumer = $psrContext->createConsumer($fooQueue);

$message = $consumer->receive();

// process a message

$consumer->acknowledge($message);
// $consumer->reject($message);

Subscription consumer

<?php
use Interop\Queue\PsrMessage;
use Interop\Queue\PsrConsumer;

/** @var \Enqueue\AmqpBunny\AmqpContext $psrContext */
/** @var \Interop\Amqp\Impl\AmqpQueue $fooQueue */
/** @var \Interop\Amqp\Impl\AmqpQueue $barQueue */

$fooConsumer = $psrContext->createConsumer($fooQueue);
$barConsumer = $psrContext->createConsumer($barQueue);

$subscriptionConsumer = $psrContext->createSubscriptionConsumer();
$subscriptionConsumer->subscribe($fooConsumer, function(PsrMessage $message, PsrConsumer $consumer) {
    // process message
    
    $consumer->acknowledge($message);
    
    return true;
});
$subscriptionConsumer->subscribe($barConsumer, function(PsrMessage $message, PsrConsumer $consumer) {
    // process message
    
    $consumer->acknowledge($message);
    
    return true;
});

$subscriptionConsumer->consume(2000); // 2 sec

Purge queue messages:

<?php
/** @var \Enqueue\AmqpBunny\AmqpContext $psrContext */
/** @var \Interop\Amqp\Impl\AmqpQueue $fooQueue */

$queue = $psrContext->createQueue('aQueue');

$psrContext->purgeQueue($queue);

back to index