diff --git a/README.md b/README.md index ef1b45b..8cf08a9 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,13 @@ RabbitMQ Extension for Yii2 ================== -Wrapper based on php-amqplib to incorporate messaging in your Yii2 application via RabbitMQ. Inspired by RabbitMqBundle for Symfony framework as it is really awesome. +Wrapper based on php-amqplib library to incorporate messaging in your Yii2 application via RabbitMQ. Inspired by RabbitMqBundle for Symfony framework. -This documentation is relevant for the latest stable version of the extension. +This documentation is relevant for the version 2.\*, which require PHP version >=7.0. For legacy PHP applications >=5.4 please use [previous version of this extension](https://github.com/mikemadisonweb/yii2-rabbitmq/blob/master/README_v1.md). [![Latest Stable Version](https://poser.pugx.org/mikemadisonweb/yii2-rabbitmq/v/stable)](https://packagist.org/packages/mikemadisonweb/yii2-rabbitmq) [![License](https://poser.pugx.org/mikemadisonweb/yii2-rabbitmq/license)](https://packagist.org/packages/mikemadisonweb/yii2-rabbitmq) +[![Build Status](https://travis-ci.org/mikemadisonweb/yii2-rabbitmq.svg?branch=master)](https://travis-ci.org/mikemadisonweb/yii2-rabbitmq) +[![Coverage Status](https://coveralls.io/repos/github/mikemadisonweb/yii2-rabbitmq/badge.svg?branch=master)](https://coveralls.io/github/mikemadisonweb/yii2-rabbitmq?branch=master) Installation ------------ @@ -17,7 +19,7 @@ php composer.phar require mikemadisonweb/yii2-rabbitmq ``` or add ```json -"mikemadisonweb/yii2-rabbitmq": "^1.5.1" +"mikemadisonweb/yii2-rabbitmq": "^2.0.0" ``` to the require section of your `composer.json` file. @@ -30,53 +32,59 @@ return [ // should be in common.php 'components' => [ // ... - 'rabbitmq' => [ - 'class' => 'mikemadisonweb\rabbitmq\Configuration', - 'connections' => [ - 'default' => [ - 'host' => '127.0.0.1', - 'port' => '5672', - 'user' => 'your_username', - 'password' => 'your_password', - 'vhost' => '/', - 'heartbeat' => 0, - ], + 'class' => \mikemadisonweb\rabbitmq\Configuration::class, + 'connections' => [ + [ + // You can pass these parameters as a single `url` option: https://www.rabbitmq.com/uri-spec.html + 'host' => 'YOUR_HOSTNAME', + 'port' => '5672', + 'user' => 'YOUR_USERNAME', + 'password' => 'YOUR_PASSWORD', + 'vhost' => '/', + ] + // When multiple connections is used you need to specify a `name` option for each one and define them in producer and consumer configuration blocks + ], + 'exchanges' => [ + [ + 'name' => 'YOUR_EXCHANGE_NAME', + 'type' => 'direct' + // Refer to Defaults section for all possible options ], - 'producers' => [ - 'import_data' => [ - 'connection' => 'default', - 'exchange_options' => [ - 'name' => 'import_data', - 'type' => 'direct', - ], - ], + ], + 'queues' => [ + [ + 'name' => 'YOUR_QUEUE_NAME', + // Queue can be configured here the way you want it: + //'durable' => true, + //'auto_delete' => false, + ], + [ + 'name' => 'YOUR_ANOTHER_QUEUE_NAME', + ], + ], + 'bindings' => [ + [ + 'queue' => 'YOUR_QUEUE_NAME', + 'exchange' => 'YOUR_EXCHANGE_NAME', + 'routing_keys' => ['YOUR_ROUTING_KEY'], ], - 'consumers' => [ - 'import_data' => [ - 'connection' => 'default', - 'exchange_options' => [ - 'name' => 'import_data', // Name of exchange to declare - 'type' => 'direct', // Type of exchange - ], - 'queue_options' => [ - 'name' => 'import_data', // Queue name which will be binded to the exchange adove - 'routing_keys' => ['import_data'], // Your custom options - 'durable' => true, - 'auto_delete' => false, - ], - // Or just '\path\to\ImportDataConsumer' in PHP 5.4 - 'callback' => \path\to\ImportDataConsumer::class, + ], + 'producers' => [ + [ + 'name' => 'YOUR_PRODUCER_NAME', + ], + ], + 'consumers' => [ + [ + 'name' => 'YOUR_CONSUMER_NAME', + // Every consumer should define one or more callbacks for corresponding queues + 'callbacks' => [ + // queue name => callback class name + 'YOUR_QUEUE_NAME' => \path\to\YourConsumer::class, ], ], ], - // ... ], - // should be in console.php - 'controllerMap' => [ - 'rabbitmq-consumer' => \mikemadisonweb\rabbitmq\controllers\ConsumerController::class, - 'rabbitmq-producer' => \mikemadisonweb\rabbitmq\controllers\ProducerController::class, - ], - // ... ]; ``` To use this extension you should be familiar with the basic concepts of RabbitMQ. If you are not confident in your knowledge I suggest reading [this article](https://mikemadisonweb.github.io/2017/05/04/tldr-series-rabbitmq/). @@ -93,9 +101,10 @@ return [ 'singletons' => [ 'rabbitmq.import-data.consumer' => [ [ - 'class' => \path\to\ImportDataConsumer::class, + 'class' => \path\to\YourConsumer::class, ], [ + // If dependency is needed 'some-dependency' => Instance::of('dependency-service-name'), ], ], @@ -103,47 +112,9 @@ return [ ], ]; ``` +If you need several consumers you can list respective entries in the configuration, but that would require a separate worker(daemon process) for each of that consumers. While it can be absolutely fine in some cases if you are dealing with small queues which consuming messages really fast you may want to group them into one worker. So just list your callbacks in consumer config and one worker will perform your business logic on multiple queues. -#### Multiple consumers -If you need several consumers you can list respective entries in the configuration, but that would require a separate worker(daemon process) for each of that consumers. While it can be absolutely fine in some cases if you are dealing with small queues which consuming messages really fast you may want to group them into one worker. - -This is how you can set a consumer with multiple queues: -```php - [ - // ... - 'rabbitmq' => [ - // ... - 'multipleConsumers' => [ - 'import_data' => [ - 'connection' => 'default', - 'exchange_options' => [ - 'name' => 'exchange_name', - 'type' => 'direct', - ], - 'queues' => [ - 'import_data' => [ - 'name' => 'import_data', - 'callback' => \path\to\ImportDataConsumer::class, - 'routing_keys' => ['import_data'], // Queue will be binded using routing key - ], - 'update_index' => [ - 'name' => 'update_index', - 'callback' => \path\to\UpdateIndexConsumer::class, - 'routing_keys' => ['update_index'], - ], - ], - ], - ], - ], - // ... - ], -]; -``` -Be aware that all queues are under the same exchange, it's up to you to set the correct routing for callbacks. +Be sure that all queues and exchanges are defined in corresponding bindings, it's up to you to set up correct message routing. #### Lifecycle events There are also couple of lifecycle events implemented: before_consume, after_consume, before_publish, after_publish. You can use them for any additional work you need to do before or after message been consumed/published. For example, reopen database connection for it not to be closed by timeout as a consumer is a long-running process: ```php @@ -182,8 +153,9 @@ return [ // ... 'logger' => [ 'enable' => true, - 'category' => 'amqp', - 'print_console' => true, + 'category' => 'application', + 'print_console' => false, + 'system_memory' => false, ], ], // ... @@ -195,22 +167,35 @@ Logger enabled by default, but it log messages into main application log. You ca Console commands ------------- Extension provides several console commands: -- **rabbitmq-consumer/single** - Run consumer(one instance per queue) -- **rabbitmq-consumer/multiple** - Run consumer(one instance per multiple queues) -- **rabbitmq-consumer/setup-fabric** - Setup RabbitMQ exchanges and queues based on configuration -- **rabbitmq-producer/publish** - Pubish messages from STDIN to queue - -The most important here is single and multiple consumer commands as it start consumer processes based on consumer and multipleConsumer config respectively. +- **rabbitmq/consume** - Run a consumer +- **rabbitmq/declare-all** - Create RabbitMQ exchanges, queues and bindings based on configuration +- **rabbitmq/declare-exchange** - Create the exchange listed in configuration +- **rabbitmq/declare-queue** - Create the queue listed in configuration +- **rabbitmq/delete-all** - Delete all RabbitMQ exchanges and queues that is defined in configuration +- **rabbitmq/delete-exchange** - Delete the exchange +- **rabbitmq/delete-queue** - Delete the queue +- **rabbitmq/publish** - Publish a message from STDIN to the queue +- **rabbitmq/purge-queue** - Delete all messages from the queue -As PHP daemon especially based upon a framework may be prone to memory leaks, it may be reasonable to limit the number of messages to consume and stop: +To start a consumer: ``` -yii rabbitmq-consumer/single import_data -m=10 +yii rabbitmq-consumer/single YOUR_CONSUMER_NAME ``` In this case, you can use process control system, like Supervisor, to restart consumer process and this way keep your worker run continuously. +#### Message limit +As PHP daemon especially based upon a framework may be prone to memory leaks, it may be reasonable to limit the number of messages to consume and stop: +``` +--memoryLimit, -l: (defaults to 0) +--messagesLimit, -m: (defaults to 0) +``` +#### Auto-declare +By default extension configured in auto-declare mode, which means that on every message published exchanges, queues and bindings will be checked and created if missing. If performance means much to your application you should disable that feature in configuration and use console commands to declare and delete routing schema by yourself. Usage ------------- -As the consumer worker will read messages from the queue, it executes a callback and passes a message to it. Callback class should implement ConsumerInterface: +As the consumer worker will read messages from the queue, execute a callback method and pass a message to it. +#### Consume +In order a class to become a callback it should implement ConsumerInterface: ```php body); + $data = $msg->body; + // Apply your business logic here - if ($this->isValid($data)) { - // Apply your business logic here - - return ConsumerInterface::MSG_ACK; - } + return ConsumerInterface::MSG_ACK; } } ``` -You can format your message as you wish(JSON, XML, etc) the only restriction is that it should be a string. Here is an example how you can publish a message: +You can publish any data type(object, int, array etc), despite the fact that RabbitMQ will transfer payload as a string here in consumer $msg->body your data will be of the same type it was sent. +#### Return codes +As for the return codes there is a bunch of them in order for you to control following processing of the message by the broker: +- **ConsumerInterface::MSG_ACK** - Acknowledge message (mark as processed) and drop it from the queue +- **ConsumerInterface::MSG_REJECT** - Reject and drop message from the queue +- **ConsumerInterface::MSG_REJECT_REQUEUE** - Reject and requeue message in RabbitMQ +#### Publish + Here is an example how you can publish a message: ```php -\Yii::$app->rabbitmq->load(); -$producer = \Yii::$container->get(sprintf('rabbit_mq.producer.%s', 'import_data')); -$msg = serialize(['dataset_id' => $dataset->id, 'linked_datasets' => []]); -$producer->publish($msg, 'import_data'); +$producer = \Yii::$app->rabbitmq->getProducer('YOUR_PRODUCER_NAME'); +$msg = serialize(['dataset_id' => 657, 'linked_datasets' => []]); +$producer->publish($msg, 'YOUR_EXCHANGE_NAME', 'YOUR_ROUTING_KEY'); ``` -This template for a service name 'rabbit_mq.producer.%s' is also available as a constant mikemadisonweb\rabbitmq\components\BaseRabbitMQ::PRODUCER_SERVICE_NAME. It's needed because producer classes are lazy loaded, that means they are only got created on demand. Likewise the Connection class also got created on demand, that means a connection to RabbitMQ would not be established on each request. +Routing key as third parameter is optional, which can be the case for fanout exchanges. -Defaults -------------- -All default options are taken from php-amqplib library. If you are not familiar with meanings of some of these options, you can find them in [AMQP 0-9-1 Complete Reference Guide](http://www.rabbitmq.com/amqp-0-9-1-reference.html). +By default connection to broker only get established upon publishing a message, it would not try to connect on each HTTP request if there is no need to. -For example, to declare an exchange you should provide name and type for it. Other optional parameters with corresponding default values are listed below: +Options +------------- +All configuration options: ```php - $queueDefaults = [ - 'passive' => false, - 'durable' => false, - 'auto_delete' => true, - 'internal' => false, - 'nowait' => false, - 'arguments' => null, - 'ticket' => null, - 'declare' => true, +$rabbitmq_defaults = [ + 'auto_declare' => true, + 'connections' => [ + [ + 'name' => self::DEFAULT_CONNECTION_NAME, + 'type' => AMQPLazyConnection::class, + 'url' => null, + 'host' => null, + 'port' => 5672, + 'user' => 'guest', + 'password' => 'guest', + 'vhost' => '/', + 'connection_timeout' => 3, + 'read_write_timeout' => 3, + 'ssl_context' => null, + 'keepalive' => false, + 'heartbeat' => 0, + ], + ], + 'exchanges' => [ + [ + 'name' => null, + 'type' => null, + 'passive' => false, + 'durable' => true, + 'auto_delete' => false, + 'internal' => false, + 'nowait' => false, + 'arguments' => null, + 'ticket' => null, + 'declare' => true, + ], + ], + 'queues' => [ + [ + 'name' => '', + 'passive' => false, + 'durable' => true, + 'exclusive' => false, + 'auto_delete' => false, + 'nowait' => false, + 'arguments' => null, + 'ticket' => null, + 'declare' => true, + ], + ], + 'bindings' => [ + [ + 'exchange' => null, + 'queue' => null, + 'toExchange' => null, + 'routingKeys' => [], + ], + ], + 'producers' => [ + [ + 'name' => null, + 'connection' => self::DEFAULT_CONNECTION_NAME, + 'safe' => true, + 'content_type' => 'text/plain', + 'delivery_mode' => 2, + 'serializer' => 'serialize', + ], + ], + 'consumers' => [ + [ + 'name' => null, + 'connection' => self::DEFAULT_CONNECTION_NAME, + 'callbacks' => [], + 'qos' => [ + 'prefetch_size' => 0, + 'prefetch_count' => 0, + 'global' => false, + ], + 'idle_timeout' => null, + 'idle_timeout_exit_code' => null, + 'proceed_on_exception' => false, + 'deserializer' => 'unserialize', + ], + ], + 'logger' => [ + 'log' => false, + 'category' => 'application', + 'print_console' => true, + 'system_memory' => false, + ], ]; ``` +##### Exchange +For example, to declare an exchange you should provide name and type for it. + +parameter | required | type | default | comments +--- | --- | --- | --- | --- +name | yes | string | | The exchange name consists of a non-empty sequence of these characters: letters, digits, hyphen, underscore, period, or colon. +type | yes | string | | Type of the exchange, possible values are `direct`, `fanout`, `topic` and `headers`. +declare | no | boolean | true | Whether to declare a exchange on sending or consuming messages. +passive | no | boolean | false | If set to true, the server will reply with Declare-Ok if the exchange already exists with the same name, and raise an error if not. The client can use this to check whether an exchange exists without modifying the server state. When set, all other method fields except name and no-wait are ignored. A declare with both passive and no-wait has no effect. +durable | no | boolean | false | Durable exchanges remain active when a server restarts. Non-durable exchanges (transient exchanges) are purged if/when a server restarts. +auto_delete | no | boolean | true | If set to true, the exchange would be deleted when no queues are binded to it anymore. +internal | no | boolean | false | Internal exchange may not be used directly by publishers, but only when bound to other exchanges. +nowait | no | boolean | false | Client may send next request immediately after sending the first one, no waiting for reply is required +arguments | no | array | null | A set of arguments for the declaration. +ticket | no | integer | null | Access ticket + +Good use-case of the `arguments` parameter usage can be a creation of a [dead-letter-exchange](https://github.com/php-amqplib/php-amqplib/blob/master/demo/queue_arguments.php#L17). +##### Queue As for the queue declaration, all parameters are optional. Even if you does not provide a name for your queue server will generate unique name for you: -```php - $queueDefaults = [ - 'name' => '', - 'passive' => false, - 'durable' => false, - 'exclusive' => false, - 'auto_delete' => true, - 'nowait' => false, - 'arguments' => null, - 'ticket' => null, - 'declare' => true, - ]; -``` -Beware that not all these options are allowed to be changed 'on-the-fly', in other words after queue or exchange had already been created. Otherwise, you will receive an error. -Logger default settings: -```php - $loggerDefaults = [ - 'enable' => true, - 'category' => 'application', - 'print_console' => false, - 'system_memory' => false, - ]; -``` +parameter | required | type | default | comments +--- | --- | --- | --- | --- +name | no | string | '' | The queue name can be empty, or a sequence of these characters: letters, digits, hyphen, underscore, period, or colon. +declare | no | boolean | true | Whether to declare a queue on sending or consuming messages. +passive | no | boolean | false | If set to true, the server will reply with Declare-Ok if the queue already exists with the same name, and raise an error if not. +durable | no | boolean | false | Durable queues remain active when a server restarts. Non-durable queues (transient queues) are purged if/when a server restarts. +auto_delete | no | boolean | true | If set to true, the queue is deleted when all consumers have finished using it. +exclusive | no | boolean | false | Exclusive queues may only be accessed by the current connection, and are deleted when that connection closes. Passive declaration of an exclusive queue by other connections are not allowed. +nowait | no | boolean | false | Client may send next request immediately after sending the first one, no waiting for reply is required +arguments | false | array | null | A set of arguments for the declaration. +ticket | no | integer | null | Access ticket + +Complete explanation about options, their defaults and valuable details can be found in [AMQP 0-9-1 Reference Guide](http://www.rabbitmq.com/amqp-0-9-1-reference.html). + +Beware that not all these options are allowed to be changed 'on-the-fly', in other words after queue or exchange had already been created. Otherwise, you will receive an error. + +Breaking Changes +------------- +Since version 1.\* this extension was completely rewritten internally and can be considered brand new. However, the following key differences can be distinguished: +- PHP version 7.0 and above required +- Configuration format changed +- All extension components get automatically loaded using [Yii2 Bootstraping](http://www.yiiframework.com/doc-2.0/guide-structure-extensions.html#bootstrapping-classes) +- Different connection types supported +- All extension components are registered in DIC as singletons +- Routing component added to control schema in broker +- Queue and exchange default options changed +- Console commands are joined into one controller class which is added automatically and doesn't need to be configured +- New console commands added to manipulate with routing schema +- All data types are supported for message payload +- Consumer handles control signals in a predictable manner \ No newline at end of file diff --git a/README_v1.md b/README_v1.md new file mode 100644 index 0000000..f92ea9d --- /dev/null +++ b/README_v1.md @@ -0,0 +1,293 @@ +RabbitMQ Extension for Yii2 +================== +Wrapper based on php-amqplib to incorporate messaging in your Yii2 application via RabbitMQ. Inspired by RabbitMqBundle for Symfony framework which is awesome. + +This documentation is relevant for the latest stable version of the extension. + +[![Latest Stable Version](https://poser.pugx.org/mikemadisonweb/yii2-rabbitmq/v/stable)](https://packagist.org/packages/mikemadisonweb/yii2-rabbitmq) +[![License](https://poser.pugx.org/mikemadisonweb/yii2-rabbitmq/license)](https://packagist.org/packages/mikemadisonweb/yii2-rabbitmq) + +Installation +------------ +The preferred way to install this extension is through [composer](http://getcomposer.org/download/). + +Either run +``` +php composer.phar require mikemadisonweb/yii2-rabbitmq +``` +or add +```json +"mikemadisonweb/yii2-rabbitmq": "^1.7.0" +``` +to the require section of your `composer.json` file. + +Configuration +------------- +This extension facilitates creation of RabbitMQ [producers and consumers](https://www.rabbitmq.com/tutorials/tutorial-three-php.html) to meet your specific needs. This is an example basic config: +```php + [ + // ... + 'rabbitmq' => [ + 'class' => 'mikemadisonweb\rabbitmq\Configuration', + 'connections' => [ + 'default' => [ + 'host' => '127.0.0.1', + 'port' => '5672', + 'user' => 'your_username', + 'password' => 'your_password', + 'vhost' => '/', + 'heartbeat' => 0, + ], + ], + 'producers' => [ + 'import_data' => [ + 'connection' => 'default', + 'exchange_options' => [ + 'name' => 'import_data', + 'type' => 'direct', + ], + 'queue_options' => [ + 'declare' => false, // Use this if you don't want to create a queue on producing messages + ], + ], + ], + 'consumers' => [ + 'import_data' => [ + 'connection' => 'default', + 'exchange_options' => [ + 'name' => 'import_data', + 'type' => 'direct', + ], + 'queue_options' => [ + 'name' => 'import_data', // Queue name which will be binded to the exchange adove + 'routing_keys' => ['import_data'], // Name of the exchange to bind to + 'durable' => true, + 'auto_delete' => false, + ], + // Or just '\path\to\ImportDataConsumer' in PHP 5.4 + 'callback' => \path\to\ImportDataConsumer::class, + ], + ], + ], + // ... + ], + // should be in console.php + 'controllerMap' => [ + 'rabbitmq-consumer' => \mikemadisonweb\rabbitmq\controllers\ConsumerController::class, + 'rabbitmq-producer' => \mikemadisonweb\rabbitmq\controllers\ProducerController::class, + ], + // ... +]; +``` +To use this extension you should be familiar with the basic concepts of RabbitMQ. If you are not confident in your knowledge I suggest reading [this article](https://mikemadisonweb.github.io/2017/05/04/tldr-series-rabbitmq/). + +The 'callback' parameter can be a class name or a service name from [dependency injection container](http://www.yiiframework.com/doc-2.0/yii-di-container.html). Starting from Yii version 2.0.11 you can configure your container like this: +```php + [ + 'definitions' => [], + 'singletons' => [ + 'rabbitmq.import-data.consumer' => [ + [ + 'class' => \path\to\ImportDataConsumer::class, + ], + [ + 'some-dependency' => Instance::of('dependency-service-name'), + ], + ], + ], + ], +]; +``` + +#### Multiple consumers +If you need several consumers you can list respective entries in the configuration, but that would require a separate worker(daemon process) for each of that consumers. While it can be absolutely fine in some cases if you are dealing with small queues which consuming messages really fast you may want to group them into one worker. + +This is how you can set a consumer with multiple queues: +```php + [ + // ... + 'rabbitmq' => [ + // ... + 'multipleConsumers' => [ + 'import_data' => [ + 'connection' => 'default', + 'exchange_options' => [ + 'name' => 'exchange_name', + 'type' => 'direct', + ], + 'queues' => [ + 'import_data' => [ + 'name' => 'import_data', + 'callback' => \path\to\ImportDataConsumer::class, + 'routing_keys' => ['import_data'], // Queue will be binded using routing key + // Other optional settings can be listed here (like in queue_options) + 'durable' => true, + ], + 'update_index' => [ + 'name' => 'update_index', + 'callback' => \path\to\UpdateIndexConsumer::class, + 'routing_keys' => ['update_index'], + // Refer to the Options section for more + 'exclusive' => true, // Optional + ], + ], + ], + ], + ], + // ... + ], +]; +``` +Be aware that all queues are under the same exchange, it's up to you to set the correct routing for callbacks. +#### Lifecycle events +There are also couple of lifecycle events implemented: before_consume, after_consume, before_publish, after_publish. You can use them for any additional work you need to do before or after message been consumed/published. For example, reopen database connection for it not to be closed by timeout as a consumer is a long-running process: +```php + [ + // ... + 'rabbitmq' => [ + // ... + 'on before_consume' => function ($event) { + if (isset(\Yii::$app->db)) { + $db = \Yii::$app->db; + if ($db->getIsActive()) { + $db->close(); + } + $db->open(); + } + }, + ], + // ... + ], +]; +``` +#### Logger +Last but not least is logger configuration which is also optional: +```php + [ + // ... + 'rabbitmq' => [ + // ... + 'logger' => [ + 'enable' => true, + 'category' => 'application', + 'print_console' => false, + 'system_memory' => false, + ], + ], + // ... + ], +]; +``` +Logger enabled by default, but it log messages into main application log. You can change that by setting your own log target and specify corresponding category name, like 'amqp' is set above. Option 'print_console' disabled by default, it give you additional information while debugging a consumer in you console. + +Console commands +------------- +Extension provides several console commands: +- **rabbitmq-consumer/single** - Run consumer(one instance per queue) +- **rabbitmq-consumer/multiple** - Run consumer(one instance per multiple queues) +- **rabbitmq-consumer/setup-fabric** - Setup RabbitMQ exchanges and queues based on configuration +- **rabbitmq-producer/publish** - Pubish messages from STDIN to queue + +The most important here is single and multiple consumer commands as it start consumer processes based on consumer and multipleConsumer config respectively. + +As PHP daemon especially based upon a framework may be prone to memory leaks, it may be reasonable to limit the number of messages to consume and stop: +``` +yii rabbitmq-consumer/single import_data -m=10 +``` +In this case, you can use process control system, like Supervisor, to restart consumer process and this way keep your worker run continuously. + +Usage +------------- +As the consumer worker will read messages from the queue, it executes a callback and passes a message to it. Callback class should implement ConsumerInterface: +```php +body); + + if ($this->isValid($data)) { + // Apply your business logic here + + return ConsumerInterface::MSG_ACK; + } + } +} +``` +You can format your message as you wish(JSON, XML, etc) the only restriction is that it should be a string. Here is an example how you can publish a message: +```php +\Yii::$app->rabbitmq->load(); +$producer = \Yii::$container->get(sprintf('rabbit_mq.producer.%s', 'import_data')); +$msg = serialize(['dataset_id' => $dataset->id, 'linked_datasets' => []]); +$producer->publish($msg, 'import_data'); +``` +This template for a service name 'rabbit_mq.producer.%s' is also available as a constant mikemadisonweb\rabbitmq\components\BaseRabbitMQ::PRODUCER_SERVICE_NAME. It's needed because producer classes are lazy loaded, that means they are only got created on demand. Likewise the Connection class also got created on demand, that means a connection to RabbitMQ would not be established on each request. + +Options +------------- +All default options are taken from php-amqplib library. Complete explanation about options, their defaults and valuable details can be found in [AMQP 0-9-1 Reference Guide](http://www.rabbitmq.com/amqp-0-9-1-reference.html). + +##### Exchange +For example, to declare an exchange you should provide name and type for it. + +parameter | required | type | default | comments +--- | --- | --- | --- | --- +name | yes | string | | The exchange name consists of a non-empty sequence of these characters: letters, digits, hyphen, underscore, period, or colon. +type | yes | string | | Type of the exchange, possible values are `direct`, `fanout`, `topic` and `headers`. +declare | no | boolean | true | Whether to declare a exchange on sending or consuming messages. +passive | no | boolean | false | If set to true, the server will reply with Declare-Ok if the exchange already exists with the same name, and raise an error if not. The client can use this to check whether an exchange exists without modifying the server state. When set, all other method fields except name and no-wait are ignored. A declare with both passive and no-wait has no effect. +durable | no | boolean | false | Durable exchanges remain active when a server restarts. Non-durable exchanges (transient exchanges) are purged if/when a server restarts. +auto_delete | no | boolean | true | If set to true, the exchange would be deleted when no queues are binded to it anymore. +internal | no | boolean | false | Internal exchange may not be used directly by publishers, but only when bound to other exchanges. +nowait | no | boolean | false | Client may send next request immediately after sending the first one, no waiting for reply is required +arguments | no | array | null | A set of arguments for the declaration. +ticket | no | integer | null | Access ticket + +Good use-case of the `arguments` parameter usage can be a creation of a [dead-letter-exchange](https://github.com/php-amqplib/php-amqplib/blob/master/demo/queue_arguments.php#L17). +##### Queue +As for the queue declaration, all parameters are optional. Even if you does not provide a name for your queue server will generate unique name for you: + +parameter | required | type | default | comments +--- | --- | --- | --- | --- +name | no | string | '' | The queue name can be empty, or a sequence of these characters: letters, digits, hyphen, underscore, period, or colon. +declare | no | boolean | true | Whether to declare a queue on sending or consuming messages. +passive | no | boolean | false | If set to true, the server will reply with Declare-Ok if the queue already exists with the same name, and raise an error if not. +durable | no | boolean | false | Durable queues remain active when a server restarts. Non-durable queues (transient queues) are purged if/when a server restarts. +auto_delete | no | boolean | true | If set to true, the queue is deleted when all consumers have finished using it. +exclusive | no | boolean | false | Exclusive queues may only be accessed by the current connection, and are deleted when that connection closes. Passive declaration of an exclusive queue by other connections are not allowed. +nowait | no | boolean | false | Client may send next request immediately after sending the first one, no waiting for reply is required +arguments | false | array | null | A set of arguments for the declaration. +ticket | no | integer | null | Access ticket + +Beware that not all these options are allowed to be changed 'on-the-fly', in other words after queue or exchange had already been created. Otherwise, you will receive an error. \ No newline at end of file diff --git a/composer.json b/composer.json index 8446d4b..c9d3f81 100644 --- a/composer.json +++ b/composer.json @@ -13,7 +13,7 @@ "require": { "php": ">=7.0", "yiisoft/yii2": "^2.0", - "php-amqplib/php-amqplib": "^2.6" + "php-amqplib/php-amqplib": "^2.7" }, "require-dev": { "phpunit/phpunit": "^6.4",