Бандл предоставляет инструменты по работе с очередями RabbitMQ
посредством механизма producer
- consumer
.
Для корректной работы бандла требуется подключить следующие плагины RabbitMQ:
В директории проекта, выполните следующую команду для загрузки наиболее подходящей стабильной версии этого бандла:
composer require wakeapp/rabbit-queue-bundle
Эта команда подразумевает что Composer установлен и доступен глобально.
Необходимо включить бандл добавив его в список зарегистрированных бандлов в app/AppKernel.php
файл вашего проекта:
<?php
// app/AppKernel.php
class AppKernel extends Kernel
{
// ...
public function registerBundles()
{
$bundles = [
// ...
new Wakeapp\Bundle\RabbitQueueBundle\WakeappRabbitQueueBundle(),
];
return $bundles;
}
// ...
}
Чтобы начать использовать бандл, необходимо описать конфигурацию подключения к RabbitMQ
.
# app/packages/wakeapp_rabbit_queue.yaml
wakeapp_rabbit_queue:
connections:
default:
host: 'rabbitmq' # хост для подключения к rabbitMQ
port: 5672 # порт для подключения к rabbitMQ
username: 'rabbitmq_user' # логин для подключения к rabbitMQ
password: 'rabbitmq_password' # пароль для подключения к rabbitMQ
vhost: 'example_vhost' # виртуальный хост для подключения (необязательный параметр)
connection_timeout: 3 # таймаут соединения
read_write_timeout: 3 # таймаут на чтение/запись
heartbeat: 0 # частота heartbeat
consumer:
wait_timeout: 3 # таймаут ожидания новых сообщений для обработки пачки в секундах (по умолчанию 3)
idle_timeout: 0 # таймаут ожидания сообщений в пустой очереди в секундах (по умолчанию 0 - нет таймаута)
Producer
- используется для отправки сообщений в очередь.
Для этих целей в бандле реализован RabbitMqProducer, с помощью которого можно отправлять сообщения в очередь с заданными параметрами.
<?php
$data = ['message' => 'example']; # Сообщение
$options = ['key' => 'unique_key', 'delay' => 1000]; # Опции, в зависимости от типа очереди
/** @var \Wakeapp\Bundle\RabbitQueueBundle\Producer\RabbitMqProducer $producer */
$producer->put('queue_name', $data, $options);
Публикация сообщений в очередь происходит с помощью специальных классов паблишеров.
Producer
определяет какой паблишер использовать для публикации по типу очереди, с которым связан паблишер.
Соответственно на каждый новый тип очереди требуется свой класс Publisher
с кастомной логикой обработки/валидации и публикации сообщений в канал.
Бандл поддерживает следующие типы очередей:
- FIFO
- Delay
- Deduplicate
- Deduplicate + Delay
При желании добавить собственный тип очереди, необходимо создать класс Publisher
наследующий AbstractPublisher или реализующий PublisherInterface.
Пример DelayPublisher:
<?php
declare(strict_types=1);
namespace Wakeapp\Bundle\RabbitQueueBundle\Publisher;
use Wakeapp\Bundle\RabbitQueueBundle\Enum\QueueHeaderOptionEnum;
use Wakeapp\Bundle\RabbitQueueBundle\Definition\DefinitionInterface;
use Wakeapp\Bundle\RabbitQueueBundle\Enum\QueueOptionEnum;
use Wakeapp\Bundle\RabbitQueueBundle\Enum\QueueTypeEnum;
use Wakeapp\Bundle\RabbitQueueBundle\Exception\RabbitQueueException;
use function is_int;
use function sprintf;
class DelayPublisher extends AbstractPublisher
{
public const QUEUE_TYPE = QueueTypeEnum::FIFO | QueueTypeEnum::DELAY;
/**
* Custom prepare options logic
*/
protected function prepareOptions(DefinitionInterface $definition, array $options): array
{
$delay = $options[QueueOptionEnum::DELAY] ?? null;
if (!is_int($delay)) {
$message = sprintf(
'Element for queue "%s" must be with option %s. See %s',
$definition::getQueueName(),
QueueOptionEnum::DELAY,
QueueOptionEnum::class
);
throw new RabbitQueueException($message);
}
$amqpTableOption[QueueHeaderOptionEnum::X_DELAY] = $delay * 1000;
return $amqpTableOption;
}
/**
* Queue type supported by publisher
*/
public static function getQueueType(): string
{
return (string) self::QUEUE_TYPE;
}
}
Consumer
- Используется для получения и обработки сообщений из очереди.
Для реализации логики обработки сообщений необходимо создать класс consumer
,
реализующий ConsumerInterface,
либо наследующий AbstractConsumer, который содержит предустановленные значения для некоторых методов.
<?php
declare(strict_types=1);
namespace Acme\AppBundle\Consumer;
use Wakeapp\Bundle\RabbitQueueBundle\Consumer\AbstractConsumer;
class ExampleConsumer extends AbstractConsumer
{
public const DEFAULT_BATCH_SIZE = 100; # Размер пачки
/**
* {@inheritDoc}
*/
public function process(array $messageList): void
{
foreach ($messageList as $item) {
$data = $this->decodeMessageBody($item); # Decode message by hydrator
// handle some task by specific logic
}
}
/**
* {@inheritDoc}
*/
public function getBindQueueName(): string
{
return 'example';
}
/**
* {@inheritDoc}
*/
public static function getName(): string
{
return 'example';
}
}
В методе process()
необходимо реализовать обработку полученных сообщений.
Сообщения поступают пачками, размер которых задается константой DEFAULT_BATCH_SIZE
(по умолчанию = 1).
Сумма DEFAULT_BATCH_SIZE
со всех потребителей одной очереди не должна превышать значения 65535.
Для удобства работы с сообщениями разных форматов бандл предоставляет инструменты гидрации (кодирование/декодирование сообщений в необходимый формат).
По умолчанию доступны следующие гидраторы:
- JsonHydrator - для работы с сообщениями в формате json (используется по умолчанию).
- PlainTextHydrator - для работы с простыми текстовыми сообщениями.
Также существует возможность создания собственного гидратора.
Для этого необходимо реализовать HydratorInterface и изменить параметр конфигурации hydrator_name
на тип нового гидратора.
RabbitMQ позволяет создавать сложные схемы очередей, состоящие из несколько взаимосвязанных exchange
и queue
.
Для удобства работы со схемами бандл предоставляет возможность сохранения схем очередей в специальные классы Definition
,
которые реализуют DefinitionInterface.
Пример FIFO:
<?php
declare(strict_types=1);
namespace Wakeapp\Bundle\RabbitQueueBundle\Definition;
use Wakeapp\Bundle\RabbitQueueBundle\Enum\QueueEnum;
use Wakeapp\Bundle\RabbitQueueBundle\Enum\QueueTypeEnum;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class ExampleFifoDefinition implements DefinitionInterface
{
public const QUEUE_NAME = QueueEnum::EXAMPLE_FIFO;
public const ENTRY_POINT = self::QUEUE_NAME;
/**
* {@inheritDoc}
*/
public function init(AMQPStreamConnection $connection): void
{
$channel = $connection->channel();
$channel->queue_declare(
self::ENTRY_POINT,
false,
true,
false,
false
);
}
/**
*
* {@inheritDoc}
*/
public function getEntryPointName(): string
{
return self::ENTRY_POINT;
}
/**
* {@inheritDoc}
*/
public function getQueueType(): int
{
return QueueTypeEnum::FIFO;
}
/**
* {@inheritDoc}
*/
public static function getQueueName(): string
{
return self::QUEUE_NAME;
}
}
Пример delay + deduplicate:
<?php
declare(strict_types=1);
namespace Wakeapp\Bundle\RabbitQueueBundle\Definition;
use Wakeapp\Bundle\RabbitQueueBundle\Enum\QueueEnum;
use Wakeapp\Bundle\RabbitQueueBundle\Enum\QueueTypeEnum;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Wire\AMQPTable;
class ExampleDeduplicateDelayDefinition implements DefinitionInterface
{
public const QUEUE_NAME = QueueEnum::EXAMPLE_DEDUPLICATE_DELAY;
public const ENTRY_POINT = self::QUEUE_NAME . '@exchange_deduplication';
private const SECOND_POINT = self::QUEUE_NAME . '@exchange_delay';
private const THIRD_POINT = self::QUEUE_NAME;
/**
* {@inheritDoc}
*/
public function init(AMQPStreamConnection $connection): void
{
$channel = $connection->channel();
$channel->exchange_declare(
self::ENTRY_POINT,
'x-message-deduplication',
false,
true,
false,
false,
false,
new AMQPTable(['x-cache-size' => 1_000_000_000])
);
$channel->exchange_declare(
self::SECOND_POINT,
'x-delayed-message',
false,
true,
false,
false,
false,
new AMQPTable(['x-delayed-type' => AMQPExchangeType::DIRECT])
);
$channel->queue_declare(
self::THIRD_POINT,
false,
true,
false,
false
);
$channel->exchange_bind(self::SECOND_POINT, self::ENTRY_POINT);
$channel->queue_bind(self::THIRD_POINT, self::SECOND_POINT);
}
/**
* {@inheritDoc}
*/
public function getEntryPointName(): string
{
return self::ENTRY_POINT;
}
/**
* {@inheritDoc}
*/
public function getQueueType(): int
{
return QueueTypeEnum::FIFO | QueueTypeEnum::DEDUPLICATE | QueueTypeEnum::DELAY;
}
/**
* {@inheritDoc}
*/
public static function getQueueName(): string
{
return self::QUEUE_NAME;
}
}
В методе init()
объявляется структура очереди состоящая из необходимых exchanges
, queue
и bindings
с помощью стандартных методов php-amqplib.
Метод getEntryPointName()
- отвечает за точку входа сообщений. Точкой входа может быть название exchange
или queue
в зависимости от структуры схемы.
Метод getQueueName()
- название очереди, куда в конечном итоге попадут сообщения.
Жизненный цикл сообщения:
Сообщение -> Producer -> EntryPoint -> Структура очереди exchanges, bindings -> Queue -> Consumer
Таким образом producer
отправляет сообщения на точку входа, а consumer
забирает сообщения из очереди.
В простейшем случае при использовании обычной очереди FIFO, точкой входа будет являться название очереди.
rabbit:consumer:run
- запускает выбранный консьюмер.
php bin/console rabbit:consumer:run <name> # <name> - название консьюмера.
rabbit:definition:update
- загружает все схемы очередейRabbitMQ
в соответствии с существующими классамиDefinition
.
Примечание: Данная команда не обновляет существующие схемы.
php bin/console rabbit:definition:update
rabbit:consumer:list
- выводит список консьюмеров, зарегистрированных в проекте.
php bin/console rabbit:consumer:list
Пример вывода команды:
Total consumers count: 2
+--------------------+------------+
| Queue Name | Batch Size |
+--------------------+------------+
| example_first | 1 |
| example_second | 100 |
+--------------------+------------+
Для инициализации схемы, требуется создать класс Definition,
который реализует DefinitionInterface.
В методе init
нужно объявить структуру очереди состоящию из необходимых exchanges
, queue
и bindings
с помощью стандартных методов работы с каналом php-amqplib.
Далее необходимо создать класс-consumer
, наследующий AbstractConsumer.
А в методе process
реализовать обработку полученных сообщений.
Если в проекте не работает механизм autowire
, то вам понадобится зарегистрировать consumer
с тегом wakeapp_rabbit_queue.consumer
:
services:
app.acme.consumer:
class: Acme\AppBundle\Consumer\ExampleConsumer
tags:
- { name: wakeapp_rabbit_queue.consumer }
Чтобы загрузить схемы definition
в RabbitMQ необходимо выполнить команду rabbit:definition:update
.
Данная команда обновит схему в соответствии с существующими классами Definition
, реализующими DefinitionInterface.
php bin/console rabbit:definition:update
Чтобы запустить consumer
необходимо выполнить команду rabbit:consumer:run
rabbit.
Для запуска нужно передать имя конкретного consumer
.
Запуск ранее описанного consumer
'а будет выглядеть так:
php bin/console rabbit:consumer:run example
Для просмотра списка всех зарегистрированных consumer
'ов достаточно выполнить команду rabbit:consumer:list
.