diff --git a/config/rabbitmq.php b/config/rabbitmq.php index 3c4b9fd5..4843ce12 100644 --- a/config/rabbitmq.php +++ b/config/rabbitmq.php @@ -17,51 +17,63 @@ * - \EnqueueAmqpLib\AmqpConnectionFactory if you install enqueue/amqp-lib * - \EnqueueAmqpBunny\AmqpConnectionFactory if you install enqueue/amqp-bunny */ - 'factory_class' => \Enqueue\AmqpLib\AmqpConnectionFactory::class, + 'factory_class' => Enqueue\AmqpLib\AmqpConnectionFactory::class, 'host' => env('RABBITMQ_HOST', '127.0.0.1'), 'port' => env('RABBITMQ_PORT', 5672), - 'vhost' => env('RABBITMQ_VHOST', '/'), - 'login' => env('RABBITMQ_LOGIN', 'guest'), + 'vhost' => env('RABBITMQ_VHOST', '/'), + 'login' => env('RABBITMQ_LOGIN', 'guest'), 'password' => env('RABBITMQ_PASSWORD', 'guest'), - /* - * The name of default queue. - */ - 'queue' => env('RABBITMQ_QUEUE'), + 'options' => [ - /* - * Determine if exchange should be created if it does not exist. - */ - 'exchange_declare' => env('RABBITMQ_EXCHANGE_DECLARE', true), + 'exchange' => [ - /* - * Determine if queue should be created if it does not exist. - */ - 'queue_declare' => env('RABBITMQ_QUEUE_DECLARE', true), + 'name' => env('RABBITMQ_EXCHANGE_NAME'), - /* - * Determine if queue should be created and binded to the exchange if it does not exist. - */ - 'queue_declare_bind' => env('RABBITMQ_QUEUE_DECLARE_BIND', true), + /* + * Determine if exchange should be created if it does not exist. + */ + 'declare' => env('RABBITMQ_EXCHANGE_DECLARE', true), + + /* + * Read more about possible values at https://www.rabbitmq.com/tutorials/amqp-concepts.html + */ + 'type' => env('RABBITMQ_EXCHANGE_TYPE', \Interop\Amqp\AmqpTopic::TYPE_DIRECT), + 'passive' => env('RABBITMQ_EXCHANGE_PASSIVE', false), + 'durable' => env('RABBITMQ_EXCHANGE_DURABLE', true), + 'auto_delete' => env('RABBITMQ_EXCHANGE_AUTODELETE', false), + ], + + 'queue' => [ + + /* + * The name of default queue. + */ + 'name' => env('RABBITMQ_QUEUE', 'default'), + + /* + * Determine if queue should be created if it does not exist. + */ + 'declare' => env('RABBITMQ_QUEUE_DECLARE', true), + + /* + * Determine if queue should be binded to the exchange created. + */ + 'bind' => env('RABBITMQ_QUEUE_DECLARE_BIND', true), + + /* + * Read more about possible values at https://www.rabbitmq.com/tutorials/amqp-concepts.html + */ + 'passive' => env('RABBITMQ_QUEUE_PASSIVE', false), + 'durable' => env('RABBITMQ_QUEUE_DURABLE', true), + 'exclusive' => env('RABBITMQ_QUEUE_EXCLUSIVE', false), + 'auto_delete' => env('RABBITMQ_QUEUE_AUTODELETE', false), + 'arguments' => env('RABBITMQ_QUEUE_ARGUMENTS'), + + ], - /* - * Read more about possible values at https://www.rabbitmq.com/tutorials/amqp-concepts.html - */ - 'queue_params' => [ - 'passive' => env('RABBITMQ_QUEUE_PASSIVE', false), - 'durable' => env('RABBITMQ_QUEUE_DURABLE', true), - 'exclusive' => env('RABBITMQ_QUEUE_EXCLUSIVE', false), - 'auto_delete' => env('RABBITMQ_QUEUE_AUTODELETE', false), - 'arguments' => env('RABBITMQ_QUEUE_ARGUMENTS'), - ], - 'exchange_params' => [ - 'name' => env('RABBITMQ_EXCHANGE_NAME'), - 'type' => env('RABBITMQ_EXCHANGE_TYPE', \Interop\Amqp\AmqpTopic::TYPE_DIRECT), - 'passive' => env('RABBITMQ_EXCHANGE_PASSIVE', false), - 'durable' => env('RABBITMQ_EXCHANGE_DURABLE', true), - 'auto_delete' => env('RABBITMQ_EXCHANGE_AUTODELETE', false), ], /* @@ -74,12 +86,12 @@ * Optional SSL params if an SSL connection is used */ 'ssl_params' => [ - 'ssl_on' => env('RABBITMQ_SSL', false), - 'cafile' => env('RABBITMQ_SSL_CAFILE', null), - 'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null), - 'local_key' => env('RABBITMQ_SSL_LOCALKEY', null), - 'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true), - 'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null), - ] + 'ssl_on' => env('RABBITMQ_SSL', false), + 'cafile' => env('RABBITMQ_SSL_CAFILE', null), + 'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null), + 'local_key' => env('RABBITMQ_SSL_LOCALKEY', null), + 'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true), + 'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null), + ], ]; diff --git a/src/Queue/Connectors/RabbitMQConnector.php b/src/Queue/Connectors/RabbitMQConnector.php index ab1b26ea..8e8e0232 100644 --- a/src/Queue/Connectors/RabbitMQConnector.php +++ b/src/Queue/Connectors/RabbitMQConnector.php @@ -10,6 +10,7 @@ use Illuminate\Queue\Events\WorkerStopping; use Interop\Amqp\AmqpConnectionFactory as InteropAmqpConnectionFactory; use Interop\Amqp\AmqpConnectionFactory; +use Interop\Amqp\AmqpContext; use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue; class RabbitMQConnector implements ConnectorInterface @@ -33,12 +34,12 @@ public function __construct(Dispatcher $dispatcher) */ public function connect(array $config): Queue { - if (false == array_key_exists('factory_class', $config)) { + if (false === array_key_exists('factory_class', $config)) { throw new \LogicException('The factory_class option is missing though it is required.'); } $factoryClass = $config['factory_class']; - if (false == class_exists($factoryClass) || false == (new \ReflectionClass($factoryClass))->implementsInterface(InteropAmqpConnectionFactory::class)) { + if (false === class_exists($factoryClass) || false === (new \ReflectionClass($factoryClass))->implementsInterface(InteropAmqpConnectionFactory::class)) { throw new \LogicException(sprintf('The factory_class option has to be valid class that implements "%s"', InteropAmqpConnectionFactory::class)); } @@ -62,6 +63,7 @@ public function connect(array $config): Queue $factory->setDelayStrategy(new RabbitMqDlxDelayStrategy()); } + /** @var AmqpContext $context */ $context = $factory->createContext(); $this->dispatcher->listen(WorkerStopping::class, function () use ($context) {