Skip to content

Commit

Permalink
allow queue to bind to multiple exchanges
Browse files Browse the repository at this point in the history
fixes: #20
  • Loading branch information
prolic committed Jul 16, 2016
1 parent cd191d8 commit 64d5fc6
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 37 deletions.
53 changes: 41 additions & 12 deletions src/Container/QueueFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -140,23 +140,37 @@ public function __invoke(ContainerInterface $container) : Queue

if ($this->autoSetupFabric || $options['auto_setup_fabric']) {
// auto setup fabric depended exchange
$exchangeName = $options['exchange'];
ExchangeFactory::$exchangeName($container, $this->channel, true);

if (isset($options['arguments']['x-dead-letter-exchange'])) {
// auto setup fabric dead letter exchange
$exchangeName = $options['arguments']['x-dead-letter-exchange'];
ExchangeFactory::$exchangeName($container, $this->channel, true);
}

$exchanges = $options['exchanges'];

if ($exchanges instanceof \Traversable) {
$exchanges = iterator_to_array($exchanges);
}

if (! is_array($exchanges) || empty($exchanges)) {
throw new Exception\InvalidArgumentException('Expected an array or traversable of exchanges');
}

foreach ($exchanges as $exchange => $exchangeOptions) {
ExchangeFactory::$exchange($container, $this->channel, true);
}

$queue->declareQueue();

$routingKeys = $options['routing_keys'];
if (empty($routingKeys)) {
$queue->bind($options['exchange'], '', $options['bind_arguments']);
} else {
foreach ($routingKeys as $routingKey) {
$queue->bind($options['exchange'], $routingKey, $options['bind_arguments']);
foreach ($exchanges as $exchange => $exchangeOptions) {
if (empty($exchangeOptions)) {
$this->bindQueue($queue, $exchange, [], []);
} else {
foreach ($exchangeOptions as $exchangeOption) {
$routingKeys = $exchangeOption['routing_keys'] ?? [];
$bindArguments = $exchangeOption['bind_arguments'] ?? [];
$this->bindQueue($queue, $exchange, $routingKeys, $bindArguments);
}
}
}
}
Expand Down Expand Up @@ -184,8 +198,6 @@ public function defaultOptions()
'exclusive' => false,
'auto_delete' => false,
'arguments' => [],
'routing_keys' => [],
'bind_arguments' => [],
// factory configs
'auto_setup_fabric' => false
];
Expand All @@ -198,7 +210,7 @@ public function mandatoryOptions()
{
return [
'connection',
'exchange',
'exchanges',
];
}

Expand All @@ -216,4 +228,21 @@ private function getFlags($options)

return $flags;
}

/**
* @param Queue $queue
* @param string $exchange
* @param array $routingKeys
* @param array $bindArguments
*/
private function bindQueue(Queue $queue, string $exchange, array $routingKeys, array $bindArguments)
{
if (empty($routingKeys)) {
$queue->bind($exchange, '', $bindArguments);
} else {
foreach ($routingKeys as $routingKey) {
$queue->bind($exchange, $routingKey, $bindArguments);
}
}
}
}
4 changes: 3 additions & 1 deletion tests/Console/Command/PurgeQueueCommandTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,9 @@ public function it_purges_the_queue()
'queue' => [
'foo' => [
'name' => 'foo',
'exchange' => 'demo',
'exchanges' => [
'demo' => [],
],
'connection' => 'default',
],
],
Expand Down
7 changes: 4 additions & 3 deletions tests/Console/Command/SetupFabricCommandTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,14 @@ public function it_declares_exchanges_and_queues()
'queue' => [
'foo' => [
'name' => 'foo',
'exchange' => 'demo',
'exchanges' => [
'demo' => [],
],
'connection' => 'default',
],
],
'connection' => [
'default' => [
],
'default' => [],
],
]
]
Expand Down
8 changes: 6 additions & 2 deletions tests/Container/CallbackConsumerFactoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ public function it_creates_callback_consumer()
'my_queue' => [
'connection' => 'my_connection',
'name' => 'my_queue',
'exchange' => 'my_exchange',
'exchanges' => [
'my_exchange' => [],
],
],
],
'callback_consumer' => [
Expand Down Expand Up @@ -123,7 +125,9 @@ public function it_creates_callback_consumer_with_call_static_and_defined_logger
'my_queue' => [
'connection' => 'my_connection',
'name' => 'my_queue',
'exchange' => 'my_exchange',
'exchanges' => [
'my_exchange' => [],
],
],
],
'callback_consumer' => [
Expand Down
12 changes: 9 additions & 3 deletions tests/Container/JsonRpcClientFactoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ public function it_creates_json_rpc_client()
'my_queue' => [
'connection' => 'my_connection',
'name' => 'my_queue',
'exchange' => 'my_exchange',
'exchanges' => [
'my_exchange' => [],
],
],
],
'json_rpc_client' => [
Expand Down Expand Up @@ -129,7 +131,9 @@ public function it_creates_json_rpc_client_with_call_static()
'my_queue' => [
'connection' => 'my_connection',
'name' => 'my_queue',
'exchange' => 'my_exchange',
'exchanges' => [
'my_exchange' => [],
],
],
],
'json_rpc_client' => [
Expand Down Expand Up @@ -210,7 +214,9 @@ public function it_throws_exception_when_empty_exchanges_given()
'my_queue' => [
'connection' => 'my_connection',
'name' => 'my_queue',
'exchange' => 'my_exchange',
'exchanges' => [
'my_exchange' => [],
],
],
],
'json_rpc_client' => [
Expand Down
8 changes: 6 additions & 2 deletions tests/Container/JsonRpcServerFactoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ public function it_creates_json_rpc_server()
'my_queue' => [
'connection' => 'my_connection',
'name' => 'my_queue',
'exchange' => 'my_exchange',
'exchanges' => [
'my_exchange' => [],
],
],
],
'json_rpc_server' => [
Expand Down Expand Up @@ -133,7 +135,9 @@ public function it_creates_json_rpc_server_with_call_static_and_defined_logger()
'my_queue' => [
'connection' => 'my_connection',
'name' => 'my_queue',
'exchange' => 'my_exchange',
'exchanges' => [
'my_exchange' => [],
],
],
],
'json_rpc_server' => [
Expand Down
Loading

0 comments on commit 64d5fc6

Please sign in to comment.