diff --git a/src/Queue/RabbitMQQueue.php b/src/Queue/RabbitMQQueue.php index 5282022f..025145bb 100644 --- a/src/Queue/RabbitMQQueue.php +++ b/src/Queue/RabbitMQQueue.php @@ -20,15 +20,10 @@ class RabbitMQQueue extends Queue implements QueueContract */ const ATTEMPT_COUNT_HEADERS_KEY = 'attempts_count'; - protected $declareExchange; - protected $declareQueue; - protected $declareBindQueue; protected $sleepOnError; - protected $defaultQueue; - protected $queueParameters; - protected $queueArguments; - protected $configExchange; + protected $queueOptions; + protected $exchangeOptions; private $declaredExchanges = []; private $declaredQueues = []; @@ -43,13 +38,12 @@ class RabbitMQQueue extends Queue implements QueueContract public function __construct(AmqpContext $context, array $config) { $this->context = $context; - $this->defaultQueue = $config['queue']; - $this->queueParameters = $config['queue_params']; - $this->queueArguments = isset($this->queueParameters['arguments']) ? json_decode($this->queueParameters['arguments'], true) : []; - $this->configExchange = $config['exchange_params']; - $this->declareExchange = $config['exchange_declare']; - $this->declareQueue = $config['queue_declare']; - $this->declareBindQueue = $config['queue_declare_bind']; + + $this->queueOptions = $config['options']['queue']; + $this->queueOptions['arguments'] = isset($this->queueOptions['arguments']) ? json_decode($this->queueOptions['arguments'], true) : []; + + $this->exchangeOptions = $config['options']['exchange']; + $this->sleepOnError = $config['sleep_on_error'] ?? 5; } @@ -177,49 +171,49 @@ public function getContext(): AmqpContext */ private function declareEverything(string $queueName = null): array { - $queueName = $queueName ?: $this->defaultQueue; - $exchangeName = $this->configExchange['name'] ?: $queueName; + $queueName = $queueName ?: $this->queueOptions['name']; + $exchangeName = $this->exchangeOptions['name'] ?: $queueName; $topic = $this->context->createTopic($exchangeName); - $topic->setType($this->configExchange['type']); - if ($this->configExchange['passive']) { + $topic->setType($this->exchangeOptions['type']); + if ($this->exchangeOptions['passive']) { $topic->addFlag(AmqpTopic::FLAG_PASSIVE); } - if ($this->configExchange['durable']) { + if ($this->exchangeOptions['durable']) { $topic->addFlag(AmqpTopic::FLAG_DURABLE); } - if ($this->configExchange['auto_delete']) { + if ($this->exchangeOptions['auto_delete']) { $topic->addFlag(AmqpTopic::FLAG_AUTODELETE); } - if ($this->declareExchange && !in_array($exchangeName, $this->declaredExchanges, true)) { + if ($this->exchangeOptions['declare'] && !in_array($exchangeName, $this->declaredExchanges, true)) { $this->context->declareTopic($topic); $this->declaredExchanges[] = $exchangeName; } $queue = $this->context->createQueue($queueName); - $queue->setArguments($this->queueArguments); - if ($this->queueParameters['passive']) { + $queue->setArguments($this->queueOptions['arguments']); + if ($this->queueOptions['passive']) { $queue->addFlag(AmqpQueue::FLAG_PASSIVE); } - if ($this->queueParameters['durable']) { + if ($this->queueOptions['durable']) { $queue->addFlag(AmqpQueue::FLAG_DURABLE); } - if ($this->queueParameters['exclusive']) { + if ($this->queueOptions['exclusive']) { $queue->addFlag(AmqpQueue::FLAG_EXCLUSIVE); } - if ($this->queueParameters['auto_delete']) { + if ($this->queueOptions['auto_delete']) { $queue->addFlag(AmqpQueue::FLAG_AUTODELETE); } - if ($this->declareQueue && !in_array($queueName, $this->declaredQueues, true)) { + if ($this->queueOptions['declare'] && !in_array($queueName, $this->declaredQueues, true)) { $this->context->declareQueue($queue); $this->declaredQueues[] = $queueName; } - if ($this->declareBindQueue) { + if ($this->queueOptions['bind']) { $this->context->bind(new AmqpBind($queue, $topic, $queue->getQueueName())); } diff --git a/tests/Functional/SendAndReceiveDelayedMessageTest.php b/tests/Functional/SendAndReceiveDelayedMessageTest.php index be70503d..03c33c0b 100644 --- a/tests/Functional/SendAndReceiveDelayedMessageTest.php +++ b/tests/Functional/SendAndReceiveDelayedMessageTest.php @@ -27,23 +27,26 @@ public function test() 'login' => 'guest', 'password' => 'guest', 'vhost' => '/', - 'queue' => 'queue_name', - 'exchange_declare' => true, - 'queue_declare' => true, - 'queue_declare_bind' => true, - 'queue_params' => [ - 'passive' => false, - 'durable' => true, - 'exclusive' => false, - 'auto_delete' => false, - 'arguments' => null, - ], - 'exchange_params' => [ - 'name' => null, - 'type' => AmqpTopic::TYPE_DIRECT, - 'passive' => false, - 'durable' => true, - 'auto_delete' => false, + 'options' => [ + 'exchange' => [ + 'name' => null, + 'declare' => true, + 'type' => \Interop\Amqp\AmqpTopic::TYPE_DIRECT, + 'passive' => false, + 'durable' => true, + 'auto_delete' => false, + ], + + 'queue' => [ + 'name' => 'default', + 'declare' => true, + 'bind' => true, + 'passive' => false, + 'durable' => true, + 'exclusive' => false, + 'auto_delete' => false, + 'arguments' => '[]', + ], ], 'ssl_params' => [ 'ssl_on' => false, @@ -63,7 +66,7 @@ public function test() // we need it to declare exchange\queue on RabbitMQ side. $queue->pushRaw('something'); - $queue->getContext()->purgeQueue($queue->getContext()->createQueue('queue_name')); + $queue->getContext()->purgeQueue($queue->getContext()->createQueue('default')); $expectedPayload = __METHOD__.microtime(true); diff --git a/tests/Functional/SendAndReceiveMessageTest.php b/tests/Functional/SendAndReceiveMessageTest.php index b6173a1c..562a9cc0 100644 --- a/tests/Functional/SendAndReceiveMessageTest.php +++ b/tests/Functional/SendAndReceiveMessageTest.php @@ -27,23 +27,26 @@ public function test() 'login' => 'guest', 'password' => 'guest', 'vhost' => '/', - 'queue' => 'queue_name', - 'exchange_declare' => true, - 'queue_declare' => true, - 'queue_declare_bind' => true, - 'queue_params' => [ - 'passive' => false, - 'durable' => true, - 'exclusive' => false, - 'auto_delete' => false, - 'arguments' => null, - ], - 'exchange_params' => [ - 'name' => null, - 'type' => AmqpTopic::TYPE_DIRECT, - 'passive' => false, - 'durable' => true, - 'auto_delete' => false, + 'options' => [ + 'exchange' => [ + 'name' => null, + 'declare' => true, + 'type' => \Interop\Amqp\AmqpTopic::TYPE_DIRECT, + 'passive' => false, + 'durable' => true, + 'auto_delete' => false, + ], + + 'queue' => [ + 'name' => 'default', + 'declare' => true, + 'bind' => true, + 'passive' => false, + 'durable' => true, + 'exclusive' => false, + 'auto_delete' => false, + 'arguments' => '[]', + ], ], 'ssl_params' => [ 'ssl_on' => false, @@ -63,7 +66,7 @@ public function test() // we need it to declare exchange\queue on RabbitMQ side. $queue->pushRaw('something'); - $queue->getContext()->purgeQueue($queue->getContext()->createQueue('queue_name')); + $queue->getContext()->purgeQueue($queue->getContext()->createQueue('default')); $expectedPayload = __METHOD__.microtime(true); diff --git a/tests/Functional/SslConnectionTest.php b/tests/Functional/SslConnectionTest.php index 3b5e0b80..87310f34 100644 --- a/tests/Functional/SslConnectionTest.php +++ b/tests/Functional/SslConnectionTest.php @@ -26,25 +26,26 @@ public function testConnectorEstablishSecureConnectionWithRabbitMQBroker() 'login' => 'guest', 'password' => 'guest', 'vhost' => '/', + 'options' => [ + 'exchange' => [ + 'name' => null, + 'declare' => true, + 'type' => \Interop\Amqp\AmqpTopic::TYPE_DIRECT, + 'passive' => false, + 'durable' => true, + 'auto_delete' => false, + ], - 'queue' => 'queue_name', - 'exchange_declare' => true, - 'queue_declare' => true, - 'queue_declare_bind' => true, - - 'queue_params' => [ - 'passive' => false, - 'durable' => true, - 'exclusive' => false, - 'auto_delete' => false, - 'arguments' => null, - ], - 'exchange_params' => [ - 'name' => null, - 'type' => AmqpTopic::TYPE_DIRECT, - 'passive' => false, - 'durable' => true, - 'auto_delete' => false, + 'queue' => [ + 'name' => 'default', + 'declare' => true, + 'bind' => true, + 'passive' => false, + 'durable' => true, + 'exclusive' => false, + 'auto_delete' => false, + 'arguments' => '[]', + ], ], 'ssl_params' => [ 'ssl_on' => true, diff --git a/tests/Functional/StreamConnectionTest.php b/tests/Functional/StreamConnectionTest.php index 681a8637..b1558baf 100644 --- a/tests/Functional/StreamConnectionTest.php +++ b/tests/Functional/StreamConnectionTest.php @@ -26,23 +26,26 @@ public function testConnectorEstablishSecureConnectionWithRabbitMQBroker() 'login' => 'guest', 'password' => 'guest', 'vhost' => '/', - 'queue' => 'queue_name', - 'exchange_declare' => true, - 'queue_declare' => true, - 'queue_declare_bind' => true, - 'queue_params' => [ - 'passive' => false, - 'durable' => true, - 'exclusive' => false, - 'auto_delete' => false, - 'arguments' => null, - ], - 'exchange_params' => [ - 'name' => null, - 'type' => AmqpTopic::TYPE_DIRECT, - 'passive' => false, - 'durable' => true, - 'auto_delete' => false, + 'options' => [ + 'exchange' => [ + 'name' => null, + 'declare' => true, + 'type' => \Interop\Amqp\AmqpTopic::TYPE_DIRECT, + 'passive' => false, + 'durable' => true, + 'auto_delete' => false, + ], + + 'queue' => [ + 'name' => 'default', + 'declare' => true, + 'bind' => true, + 'passive' => false, + 'durable' => true, + 'exclusive' => false, + 'auto_delete' => false, + 'arguments' => '[]', + ], ], 'ssl_params' => [ 'ssl_on' => false, diff --git a/tests/Queue/Connectors/RabbitMQConnectorTest.php b/tests/Queue/Connectors/RabbitMQConnectorTest.php index f8345b1e..3d2ea1bc 100644 --- a/tests/Queue/Connectors/RabbitMQConnectorTest.php +++ b/tests/Queue/Connectors/RabbitMQConnectorTest.php @@ -166,23 +166,26 @@ private function createDummyConfig() 'local_key' => 'theLocalKey', 'passphrase' => 'thePassPhrase', ], - 'queue' => 'aQueueName', - 'exchange_declare' => false, - 'queue_declare' => false, - 'queue_declare_bind' => false, - 'queue_params' => [ - 'passive' => false, - 'durable' => true, - 'exclusive' => false, - 'auto_delete' => false, - 'arguments' => '[]', - ], - 'exchange_params' => [ - 'name' => 'anExchangeName', - 'type' => 'direct', - 'passive' => false, - 'durable' => true, - 'auto_delete' => false, + 'options' => [ + 'exchange' => [ + 'name' => 'anExchangeName', + 'declare' => false, + 'type' => \Interop\Amqp\AmqpTopic::TYPE_DIRECT, + 'passive' => false, + 'durable' => true, + 'auto_delete' => false, + ], + + 'queue' => [ + 'name' => 'aQueueName', + 'declare' => false, + 'bind' => false, + 'passive' => false, + 'durable' => true, + 'exclusive' => false, + 'auto_delete' => false, + 'arguments' => '[]', + ], ], 'sleep_on_error' => env('RABBITMQ_ERROR_SLEEP', 5), ]; diff --git a/tests/Queue/RabbitMQQueueTest.php b/tests/Queue/RabbitMQQueueTest.php index 2e51740e..6d1ae1b9 100644 --- a/tests/Queue/RabbitMQQueueTest.php +++ b/tests/Queue/RabbitMQQueueTest.php @@ -481,23 +481,26 @@ private function createDummyConfig() 'local_cert' => 'aLocalCert', 'local_key' => 'aLocalKey', ], - 'queue' => 'aQueueName', - 'exchange_declare' => false, - 'queue_declare' => false, - 'queue_declare_bind' => false, - 'queue_params' => [ - 'passive' => false, - 'durable' => true, - 'exclusive' => false, - 'auto_delete' => false, - 'arguments' => '[]', - ], - 'exchange_params' => [ - 'name' => 'anExchangeName', - 'type' => 'direct', - 'passive' => false, - 'durable' => true, - 'auto_delete' => false, + 'options' => [ + 'exchange' => [ + 'name' => 'anExchangeName', + 'declare' => false, + 'type' => \Interop\Amqp\AmqpTopic::TYPE_DIRECT, + 'passive' => false, + 'durable' => true, + 'auto_delete' => false, + ], + + 'queue' => [ + 'name' => 'aQueueName', + 'declare' => false, + 'bind' => false, + 'passive' => false, + 'durable' => true, + 'exclusive' => false, + 'auto_delete' => false, + 'arguments' => '[]', + ], ], 'sleep_on_error' => false, ];