Permalink
270 lines (191 sloc) 6.98 KB

AMQP transport

Implements the AMQP specification and the amqp interop interfaces. Built on top of the PHP amqp extension.

Installation

Warning: You need the amqp extension, version 1.9.3 or later. Here's how you can compile the extension from the source code.

$ composer require enqueue/amqp-ext

Create context

<?php
use Enqueue\AmqpExt\AmqpConnectionFactory;

// Each assignment below is an alternative way to configure the factory;
// use whichever one fits your setup.

// connects to localhost
$factory = new AmqpConnectionFactory();

// same as above
$factory = new AmqpConnectionFactory('amqp:');

// same as above
$factory = new AmqpConnectionFactory([]);

// connect to AMQP broker at example.com
$factory = new AmqpConnectionFactory([
    'host' => 'example.com',
    'port' => 1000,
    'vhost' => '/',
    'user' => 'user',
    'pass' => 'pass',
    'persisted' => false,
]);

// same as above but given as DSN string (the "/" vhost is URL-encoded as %2f)
$factory = new AmqpConnectionFactory('amqp://user:pass@example.com:1000/%2f');

// SSL or secure connection
$factory = new AmqpConnectionFactory([
    'dsn' => 'amqps:',
    'ssl_cacert' => '/path/to/cacert.pem',
    'ssl_cert' => '/path/to/cert.pem',
    'ssl_key' => '/path/to/key.pem',
]);

// create the context from whichever factory was configured above
$psrContext = $factory->createContext();

// if you have enqueue/enqueue library installed you can use a factory to build context from DSN
$psrContext = (new \Enqueue\ConnectionFactoryFactory())->create('amqp:')->createContext();
// NOTE(review): the "amqp+ext:" scheme presumably selects this ext-based transport explicitly - confirm
$psrContext = (new \Enqueue\ConnectionFactoryFactory())->create('amqp+ext:')->createContext();

Declare topic.

The declare topic operation creates a topic on the broker side.

<?php
use Interop\Amqp\AmqpTopic;

/** @var \Enqueue\AmqpExt\AmqpContext $psrContext */

// Create a topic object named "foo", set its type to fanout,
// then declare it so the topic is created on the broker side.
$fooTopic = $psrContext->createTopic('foo');
$fooTopic->setType(AmqpTopic::TYPE_FANOUT);
$psrContext->declareTopic($fooTopic);

// to remove the topic use the delete topic method
//$psrContext->deleteTopic($fooTopic);

Declare queue.

The declare queue operation creates a queue on the broker side.

<?php
use Interop\Amqp\AmqpQueue;

/** @var \Enqueue\AmqpExt\AmqpContext $psrContext */

// Create a queue object named "foo", add the durable flag,
// then declare it so the queue is created on the broker side.
$fooQueue = $psrContext->createQueue('foo');
$fooQueue->addFlag(AmqpQueue::FLAG_DURABLE);
$psrContext->declareQueue($fooQueue);

// to remove the queue use the delete queue method
//$psrContext->deleteQueue($fooQueue);

Bind queue to topic

Connects a queue to the topic, so that messages from that topic come to the queue and can be processed.

<?php
use Interop\Amqp\Impl\AmqpBind;

/** @var \Enqueue\AmqpExt\AmqpContext $psrContext */
/** @var \Interop\Amqp\Impl\AmqpQueue $fooQueue */
/** @var \Interop\Amqp\Impl\AmqpTopic $fooTopic */

// Bind $fooQueue to $fooTopic; the AmqpBind takes the topic first and the queue second.
$psrContext->bind(new AmqpBind($fooTopic, $fooQueue));

Send message to topic

<?php
/** @var \Enqueue\AmqpExt\AmqpContext $psrContext */
/** @var \Interop\Amqp\Impl\AmqpTopic $fooTopic */

// Create a message with a plain string body.
$message = $psrContext->createMessage('Hello world!');

// Create a producer and send the message to the topic.
$psrContext->createProducer()->send($fooTopic, $message);

Send message to queue

<?php
/** @var \Enqueue\AmqpExt\AmqpContext $psrContext */
/** @var \Interop\Amqp\Impl\AmqpQueue $fooQueue */

// Create a message with a plain string body.
$message = $psrContext->createMessage('Hello world!');

// Create a producer and send the message directly to the queue.
$psrContext->createProducer()->send($fooQueue, $message);

Send priority message

<?php
use Interop\Amqp\AmqpQueue;

/** @var \Enqueue\AmqpExt\AmqpContext $psrContext */

// Declare a durable queue that carries the x-max-priority argument
// (10 here), so that message priorities can be used on it.
$fooQueue = $psrContext->createQueue('foo');
$fooQueue->addFlag(AmqpQueue::FLAG_DURABLE);
$fooQueue->setArguments(['x-max-priority' => 10]);
$psrContext->declareQueue($fooQueue);

$message = $psrContext->createMessage('Hello world!');

$psrContext->createProducer()
    ->setPriority(5) // the higher priority the sooner a message gets to a consumer
    ->send($fooQueue, $message)
;

Send expiration message

<?php
/** @var \Enqueue\AmqpExt\AmqpContext $psrContext */
/** @var \Interop\Amqp\Impl\AmqpQueue $fooQueue */

$message = $psrContext->createMessage('Hello world!');

// Set a time to live on the producer before sending the message.
$psrContext->createProducer()
    ->setTimeToLive(60000) // value is in milliseconds: 60 sec
    ->send($fooQueue, $message)
;

Send delayed message

The AMQP specification says nothing about message delaying, hence the producer throws DeliveryDelayNotSupportedException. However, the producer (and the context) accepts a delivery delay strategy, and if one is set the producer uses it to send delayed messages. The enqueue/amqp-tools package provides two RabbitMQ delay strategies; to use them you have to install that package.

<?php
use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy;

/** @var \Enqueue\AmqpExt\AmqpContext $psrContext */
/** @var \Interop\Amqp\Impl\AmqpQueue $fooQueue */

// make sure you run "composer require enqueue/amqp-tools".

$message = $psrContext->createMessage('Hello world!');

// Set a delay strategy on the producer together with the delivery delay,
// then send the message.
$psrContext->createProducer()
    ->setDelayStrategy(new RabbitMqDlxDelayStrategy())
    ->setDeliveryDelay(5000) // value is in milliseconds: 5 sec
    
    ->send($fooQueue, $message)
;

Consume message:

<?php
/** @var \Enqueue\AmqpExt\AmqpContext $psrContext */
/** @var \Interop\Amqp\Impl\AmqpQueue $fooQueue */

// Create a consumer for the queue and receive a message from it.
$consumer = $psrContext->createConsumer($fooQueue);

$message = $consumer->receive();

// process a message

// Acknowledge the message once it has been processed, or reject it instead.
$consumer->acknowledge($message);
// $consumer->reject($message);

Subscription consumer

<?php
use Interop\Queue\PsrMessage;
use Interop\Queue\PsrConsumer;

/** @var \Enqueue\AmqpExt\AmqpContext $psrContext */
/** @var \Interop\Amqp\Impl\AmqpQueue $fooQueue */
/** @var \Interop\Amqp\Impl\AmqpQueue $barQueue */

// One consumer per queue; each gets its own callback below.
$fooConsumer = $psrContext->createConsumer($fooQueue);
$barConsumer = $psrContext->createConsumer($barQueue);

// Callback for messages arriving on the "foo" queue.
// NOTE(review): returning true presumably tells the subscription consumer to keep consuming - confirm.
$onFooMessage = function(PsrMessage $message, PsrConsumer $consumer) {
    // process message

    $consumer->acknowledge($message);

    return true;
};

// Callback for messages arriving on the "bar" queue.
$onBarMessage = function(PsrMessage $message, PsrConsumer $consumer) {
    // process message

    $consumer->acknowledge($message);

    return true;
};

// Register both consumers with a single subscription consumer.
$subscriptionConsumer = $psrContext->createSubscriptionConsumer();
$subscriptionConsumer->subscribe($fooConsumer, $onFooMessage);
$subscriptionConsumer->subscribe($barConsumer, $onBarMessage);

// Consume from all subscribed queues with a timeout given in milliseconds.
$subscriptionConsumer->consume(2000); // 2 sec

Purge queue messages:

<?php
/** @var \Enqueue\AmqpExt\AmqpContext $psrContext */

// Create a queue object for the queue named "aQueue" and purge its messages.
$queue = $psrContext->createQueue('aQueue');

$psrContext->purgeQueue($queue);

back to index