Skip to content
Permalink
Browse files

feature #30671 Add optional parameter `prefetching` for AMQP connecti…

…on (fbouchery)

This PR was merged into the 4.3-dev branch.

Discussion
----------

Add optional parameter `prefetching` for AMQP connection

Add prefetching connection parameter to setup channel prefetch count.

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets |
| License       | MIT
| Doc PR        |

When setting up AMQP transport connection, it can be interesting to configure prefetching on a channel, which is not currently possible.

Commits
-------

47777ee Add optional parameter `prefetching` in connection configuration, to setup channel prefetch count
  • Loading branch information...
sroze committed Mar 28, 2019
2 parents 4184cc6 + 47777ee commit c30f462c2e54c8237d0d73f5344ca527e2998866
@@ -4,6 +4,8 @@ CHANGELOG
4.3.0
-----

* Added optional parameter `prefetch_count` in connection configuration,
to setup channel prefetch count
* New classes: `RoutableMessageBus`, `AddBusNameStampMiddleware`
and `BusNameStamp` were added, which allow you to add a bus identifier
to the `Envelope` then find the correct bus when receiving from
@@ -256,6 +256,25 @@ public function testPublishWithQueueOptions()
$connection->publish('body', $headers);
}
public function testSetChannelPrefetchWhenSetup()
{
$factory = new TestAmqpFactory(
$amqpConnection = $this->createMock(\AMQPConnection::class),
$amqpChannel = $this->createMock(\AMQPChannel::class),
$amqpQueue = $this->createMock(\AMQPQueue::class),
$amqpExchange = $this->createMock(\AMQPExchange::class)
);
// makes sure the channel looks connected, so it's not re-created
$amqpChannel->expects($this->exactly(2))->method('isConnected')->willReturn(true);
$amqpChannel->expects($this->exactly(2))->method('setPrefetchCount')->with(2);
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?prefetch_count=2', [], $factory);
$connection->setup();
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', ['prefetch_count' => 2], $factory);
$connection->setup();
}
public function testItDelaysTheMessage()
{
$amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock();
@@ -84,6 +84,7 @@ class Connection
* * exchange_name: Name of the exchange to be used for the retried messages (Default: "retry")
* * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true)
* * loop_sleep: Amount of micro-seconds to wait if no message are available (Default: 200000)
* * prefetch_count: set channel prefetch count
*/
public function __construct(array $connectionConfiguration, array $exchangeConfiguration, array $queueConfiguration, AmqpFactory $amqpFactory = null)
{
@@ -323,6 +324,10 @@ public function channel(): \AMQPChannel
throw new \AMQPException(sprintf('Could not connect to the AMQP server. Please verify the provided DSN. (%s)', json_encode($credentials)), 0, $e);
}
$this->amqpChannel = $this->amqpFactory->createChannel($connection);
if (isset($this->connectionConfiguration['prefetch_count'])) {
$this->amqpChannel->setPrefetchCount($this->connectionConfiguration['prefetch_count']);
}
}
return $this->amqpChannel;

0 comments on commit c30f462

Please sign in to comment.
You can’t perform that action at this time.