Skip to content

Commit

Permalink
allowing multiple queues for AMQP
Browse files Browse the repository at this point in the history
  • Loading branch information
weaverryan authored and Guillaume Gammelin committed Mar 31, 2019
1 parent 5de9d2e commit a511e91
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 65 deletions.
1 change: 0 additions & 1 deletion UPGRADE-4.3.md
Expand Up @@ -92,7 +92,6 @@ Messenger

* `Amqp` transport does not throw `\AMQPException` anymore, catch `TransportException` instead.
* Deprecated the `LoggingMiddleware` class, pass a logger to `SendMessageMiddleware` instead.
* Deprecated routing key from queue configuration (`queue[routing_key]` in the DSN), use exchange configuration instead (`exchange[routing_key]` in the DSN).

Routing
-------
Expand Down
1 change: 0 additions & 1 deletion UPGRADE-5.0.md
Expand Up @@ -237,7 +237,6 @@ Messenger
---------

* The `LoggingMiddleware` class has been removed, pass a logger to `SendMessageMiddleware` instead.
* Routing key from queue configuration has been removed. Use exchange configuration instead (`exchange[routing_key]` in DSN).

Monolog
-------
Expand Down
8 changes: 6 additions & 2 deletions src/Symfony/Component/Messenger/CHANGELOG.md
Expand Up @@ -4,8 +4,14 @@ CHANGELOG
4.3.0
-----

<<<<<<< HEAD
* Added a new `SyncTransport` along with `ForceCallHandlersStamp` to
explicitly handle messages synchronously.
=======
* Added `AmqpRoutingKeyStamp` allowing to provide a routing key on message publishing.
* Removed publishing with a `routing_key` option from queue configuration, for
AMQP. Use exchange `default_publish_routing_key` or `AmqpRoutingKeyStamp` instead.
>>>>>>> b357f5f668... allowing multiple queues for AMQP
* Added optional parameter `prefetch_count` in connection configuration,
to setup channel prefetch count
* New classes: `RoutableMessageBus`, `AddBusNameStampMiddleware`
Expand Down Expand Up @@ -58,8 +64,6 @@ CHANGELOG
and queues by default. Previously, this was done when in "debug" mode
only. Pass the `auto_setup` connection option to control this.
* Added a `SetupTransportsCommand` command to setup the transports
* Added `AmqpRoutingKeyStamp` allowing to provide a routing key on message publishing.
* Deprecated publishing with a routing key from queue configuration, use exchange configuration instead.

4.2.0
-----
Expand Down
Expand Up @@ -20,9 +20,6 @@
*/
final class AmqpRoutingKeyStamp implements StampInterface
{
/**
* @var string
*/
private $routingKey;

public function __construct(string $routingKey)
Expand Down
137 changes: 79 additions & 58 deletions src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php
Expand Up @@ -35,7 +35,7 @@ class Connection

private $connectionConfiguration;
private $exchangeConfiguration;
private $queueConfiguration;
private $queuesConfiguration;
private $amqpFactory;

/**
Expand All @@ -51,7 +51,7 @@ class Connection
/**
* @var \AMQPQueue|null
*/
private $amqpQueue;
private $amqpQueues = [];

/**
* @var \AMQPExchange|null
Expand All @@ -68,14 +68,14 @@ class Connection
* * vhost: Virtual Host to use with the AMQP service
* * user: Username to use to connect the the AMQP service
* * password: Password to use the connect to the AMQP service
* * queue:
* * name: Name of the queue
* * routing_key: The routing key (if any) to use to push the messages to
* * queues[name]: An array of queues, keyed by the name
* * routing_keys: The routing keys (if any) to bind to this queue
* * flags: Queue flags (Default: AMQP_DURABLE)
* * arguments: Extra arguments
* * exchange:
* * name: Name of the exchange
* * type: Type of exchange (Default: fanout)
* * default_publish_routing_key: Routing key to use when publishing, if none is specified on the message
* * flags: Exchange flags (Default: AMQP_DURABLE)
* * arguments: Extra arguments
* * delay:
Expand All @@ -86,7 +86,7 @@ class Connection
* * loop_sleep: Amount of micro-seconds to wait if no message are available (Default: 200000)
* * prefetch_count: set channel prefetch count
*/
public function __construct(array $connectionConfiguration, array $exchangeConfiguration, array $queueConfiguration, AmqpFactory $amqpFactory = null)
public function __construct(array $connectionConfiguration, array $exchangeConfiguration, array $queuesConfiguration, AmqpFactory $amqpFactory = null)
{
$this->connectionConfiguration = array_replace_recursive([
'delay' => [
Expand All @@ -96,7 +96,7 @@ public function __construct(array $connectionConfiguration, array $exchangeConfi
],
], $connectionConfiguration);
$this->exchangeConfiguration = $exchangeConfiguration;
$this->queueConfiguration = $queueConfiguration;
$this->queuesConfiguration = $queuesConfiguration;
$this->amqpFactory = $amqpFactory ?: new AmqpFactory();
}

Expand All @@ -111,8 +111,10 @@ public static function fromDsn(string $dsn, array $options = [], AmqpFactory $am
'host' => $parsedUrl['host'] ?? 'localhost',
'port' => $parsedUrl['port'] ?? 5672,
'vhost' => isset($pathParts[0]) ? urldecode($pathParts[0]) : '/',
'queue' => [
'name' => $queueName = $pathParts[1] ?? 'messages',
'queues' => [
[
'name' => $queueName = $pathParts[1] ?? 'messages',
],
],
'exchange' => [
'name' => $queueName,
Expand All @@ -134,14 +136,18 @@ public static function fromDsn(string $dsn, array $options = [], AmqpFactory $am
}

$exchangeOptions = $amqpOptions['exchange'];
$queueOptions = $amqpOptions['queue'];
unset($amqpOptions['queue'], $amqpOptions['exchange']);
$queuesOptions = $amqpOptions['queues'];
unset($amqpOptions['queues'], $amqpOptions['exchange']);

if (\is_array($queueOptions['arguments'] ?? false)) {
$queueOptions['arguments'] = self::normalizeQueueArguments($queueOptions['arguments']);
}
$queuesOptions = array_map(function (array $queueOptions) {
if (\is_array($queuesOptions['arguments'] ?? false)) {
$queueOptions['arguments'] = self::normalizeQueueArguments($queueOptions['arguments']);
}

return $queueOptions;
}, $queuesOptions);

return new self($amqpOptions, $exchangeOptions, $queueOptions, $amqpFactory);
return new self($amqpOptions, $exchangeOptions, $queuesOptions, $amqpFactory);
}

private static function normalizeQueueArguments(array $arguments): array
Expand Down Expand Up @@ -178,9 +184,11 @@ public function publish(string $body, array $headers = [], int $delay = 0, strin
$this->setup();
}

$flags = $this->queueConfiguration['flags'] ?? AMQP_NOPARAM;
$attributes = $this->getAttributes($headers);
$routingKey = $routingKey ?? $this->getExchangeRoutingKey();
// TODO - allow flag & attributes to be configured on the message
$flags = [];
$attributes = [];
$attributes = array_merge_recursive($attributes, ['headers' => $headers]);
$routingKey = $routingKey ?? $this->getDefaultPublishRoutingKey();

$this->exchange()->publish($body, $routingKey, $flags, $attributes);
}
Expand All @@ -194,14 +202,16 @@ private function publishWithDelay(string $body, array $headers, int $delay, ?str
$this->setupDelay($delay, $exchangeRoutingKey);
}

// TODO - allow flag & attributes to be configured on the message
$flags = [];
$attributes = [];
$attributes = array_merge_recursive($attributes, ['headers' => $headers]);
$routingKey = $this->getRoutingKeyForDelay($delay);
$flags = $this->queueConfiguration['flags'] ?? AMQP_NOPARAM;
$attributes = $this->getAttributes($headers);

$this->getDelayExchange()->publish($body, $routingKey, $flags, $attributes);
}

private function setupDelay(int $delay, ?string $exchangeRoutingKey)
private function setupDelay(int $delay, ?string $routingKey)
{
if (!$this->channel()->isConnected()) {
$this->clear();
Expand All @@ -210,7 +220,7 @@ private function setupDelay(int $delay, ?string $exchangeRoutingKey)
$exchange = $this->getDelayExchange();
$exchange->declareExchange();

$queue = $this->createDelayQueue($delay, $exchangeRoutingKey);
$queue = $this->createDelayQueue($delay, $routingKey);
$queue->declareQueue();
$queue->bind($exchange->getName(), $this->getRoutingKeyForDelay($delay));
}
Expand All @@ -235,7 +245,7 @@ private function getDelayExchange(): \AMQPExchange
* which is the original exchange, resulting on it being put back into
* the original queue.
*/
private function createDelayQueue(int $delay, ?string $exchangeRoutingKey)
private function createDelayQueue(int $delay, ?string $routingKey)
{
$delayConfiguration = $this->connectionConfiguration['delay'];

Expand All @@ -246,10 +256,10 @@ private function createDelayQueue(int $delay, ?string $exchangeRoutingKey)
'x-dead-letter-exchange' => $this->exchange()->getName(),
]);

$exchangeRoutingKey = $exchangeRoutingKey ?? $this->getExchangeRoutingKey();
if (null !== $exchangeRoutingKey) {
$routingKey = $routingKey ?? $this->getDefaultPublishRoutingKey();
if (null !== $routingKey) {
// after being released from to DLX, this routing key will be used
$queue->setArgument('x-dead-letter-routing-key', $exchangeRoutingKey);
$queue->setArgument('x-dead-letter-routing-key', $routingKey);
}

return $queue;
Expand All @@ -261,18 +271,18 @@ private function getRoutingKeyForDelay(int $delay): string
}

/**
* Waits and gets a message from the configured queue.
* Gets a message from the specified queue.
*
* @throws \AMQPException
*/
public function get(): ?\AMQPEnvelope
public function get(string $queueName): ?\AMQPEnvelope
{
if ($this->shouldSetup()) {
$this->setup();
}

try {
if (false !== $message = $this->queue()->get()) {
if (false !== $message = $this->queue($queueName)->get()) {
return $message;
}
} catch (\AMQPQueueException $e) {
Expand All @@ -289,14 +299,14 @@ public function get(): ?\AMQPEnvelope
return null;
}

public function ack(\AMQPEnvelope $message): bool
public function ack(\AMQPEnvelope $message, string $queueName): bool
{
return $this->queue()->ack($message->getDeliveryTag());
return $this->queue($queueName)->ack($message->getDeliveryTag());
}

public function nack(\AMQPEnvelope $message, int $flags = AMQP_NOPARAM): bool
public function nack(\AMQPEnvelope $message, string $queueName, int $flags = AMQP_NOPARAM): bool
{
return $this->queue()->nack($message->getDeliveryTag(), $flags);
return $this->queue($queueName)->nack($message->getDeliveryTag(), $flags);
}

public function setup(): void
Expand All @@ -307,10 +317,25 @@ public function setup(): void

$this->exchange()->declareExchange();

$this->queue()->declareQueue();
$this->queue()->bind($this->exchange()->getName(), $this->queueConfiguration['routing_key'] ?? null);
foreach ($this->queuesConfiguration as $queueName => $queueConfig) {
$this->queue($queueName)->declareQueue();
foreach ($queueConfig['routing_keys'] ?? [] as $routingKey) {
$this->queue($queueName)->bind($this->exchange()->getName(), $routingKey);
}
}
}

/**
* @return string[]
*/
public function getAllQueueNames(): array
{
return array_keys($this->queuesConfiguration);
}

/**
* @internal
*/
public function channel(): \AMQPChannel
{
if (null === $this->amqpChannel) {
Expand All @@ -335,22 +360,29 @@ public function channel(): \AMQPChannel
return $this->amqpChannel;
}

public function queue(): \AMQPQueue
/**
* @internal
*/
public function queue(string $queueName): \AMQPQueue
{
if (null === $this->amqpQueue) {
$this->amqpQueue = $this->amqpFactory->createQueue($this->channel());
$this->amqpQueue->setName($this->queueConfiguration['name']);
$this->amqpQueue->setFlags($this->queueConfiguration['flags'] ?? AMQP_DURABLE);
if (!isset($this->amqpQueues[$queueName])) {
$queueConfig = $this->queuesConfiguration[$queueName];

if (isset($this->queueConfiguration['arguments'])) {
$this->amqpQueue->setArguments($this->queueConfiguration['arguments']);
$amqpQueue = $this->amqpFactory->createQueue($this->channel());
$amqpQueue->setName($queueConfig['name']);
$amqpQueue->setFlags($queueConfig['flags'] ?? AMQP_DURABLE);

if (isset($queueConfig['arguments'])) {
$amqpQueue->setArguments($queueConfig['arguments']);
}

$this->amqpQueues[$queueName] = $amqpQueue;
}

return $this->amqpQueue;
return $this->amqpQueues[$queueName];
}

public function exchange(): \AMQPExchange
private function exchange(): \AMQPExchange
{
if (null === $this->amqpExchange) {
$this->amqpExchange = $this->amqpFactory->createExchange($this->channel());
Expand All @@ -374,7 +406,7 @@ public function getConnectionConfiguration(): array
private function clear(): void
{
$this->amqpChannel = null;
$this->amqpQueue = null;
$this->amqpQueues = [];
$this->amqpExchange = null;
}

Expand All @@ -391,19 +423,8 @@ private function shouldSetup(): bool
return true;
}

private function getAttributes(array $headers): array
private function getDefaultPublishRoutingKey(): ?string
{
return array_merge_recursive($this->queueConfiguration['attributes'] ?? [], ['headers' => $headers]);
}

private function getExchangeRoutingKey(): ?string
{
$routingKey = $this->exchangeConfiguration['routing_key'] ?? null;
if (null === $routingKey && isset($this->queueConfiguration['routing_key'])) {
$routingKey = $this->queueConfiguration['routing_key'];
@trigger_error('Routing key from "queue" configuration is deprecated. Use "exchange" configuration instead.', E_USER_DEPRECATED);
}

return $routingKey;
return $this->exchangeConfiguration['default_publish_routing_key'] ?? null;
}
}

0 comments on commit a511e91

Please sign in to comment.