Skip to content
Permalink
Browse files

feature #30225 publish message with custom queue options : flags | at…

…tributes (fedor.f, insidestyles)

This PR was merged into the 4.3-dev branch.

Discussion
----------

publish message with custom queue options : flags | attributes

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no     <!-- see https://symfony.com/bc -->
| Deprecations? | no <!-- don't forget to update UPGRADE-*.md and src/**/CHANGELOG.md files -->
| Tests pass?   | yes    <!-- please add some, will be required by reviewers -->
| Fixed tickets | #...   <!-- #-prefixed issue number(s), if any -->
| License       | MIT
| Doc PR        | symfony/symfony-docs#... <!-- required for new features -->

option for publish persistent message:
`amqp://localhost/%2f/messages?queue[attributes][delivery_mode]=2&queue[flags]=1`
or
options:
>queue:
>>name: '%env(MESSENGER_QUEUE)%'
>>routing_key: '%env(MESSENGER_ROUTING_KEY)%'
>>attributes:
>>>delivery_mode: 2

>>flags: 1

Commits
-------

5e16053 update test case for custom queue options
4532319 publish message with custom queue options : update ConnectionTest
6f9fdaf publish message with custom queue options : flags | attributes
  • Loading branch information...
fabpot committed Feb 21, 2019
2 parents 2e6d069 + 5e16053 commit ba725c2d7029cfdcfdb7a025cf5ccc6434e51173
@@ -233,6 +233,24 @@ public function testItCanDisableTheSetup()
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key&auto-setup=false', [], true, $factory);
$connection->publish('body');
}
public function testPublishWithQueueOptions()
{
$factory = new TestAmqpFactory(
$amqpConnection = $this->createMock(\AMQPConnection::class),
$amqpChannel = $this->createMock(\AMQPChannel::class),
$amqpQueue = $this->createMock(\AMQPQueue::class),
$amqpExchange = $this->createMock(\AMQPExchange::class)
);
$headers = [
'type' => '*',
];
$amqpExchange->expects($this->once())->method('publish')
->with('body', null, 1, ['delivery_mode' => 2, 'headers' => ['token' => 'uuid', 'type' => '*']]);
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[attributes][delivery_mode]=2&queue[attributes][headers][token]=uuid&queue[flags]=1', [], true, $factory);
$connection->publish('body', $headers);
}
}
class TestAmqpFactory extends AmqpFactory
@@ -133,7 +133,10 @@ public function publish(string $body, array $headers = []): void
$this->setup();
}
$this->exchange()->publish($body, $this->queueConfiguration['routing_key'] ?? null, AMQP_NOPARAM, ['headers' => $headers]);
$flags = $this->queueConfiguration['flags'] ?? AMQP_NOPARAM;
$attributes = array_merge_recursive($this->queueConfiguration['attributes'] ?? [], ['headers' => $headers]);
$this->exchange()->publish($body, $this->queueConfiguration['routing_key'] ?? null, $flags, $attributes);
}
/**

0 comments on commit ba725c2

Please sign in to comment.
You can’t perform that action at this time.