Skip to content

Commit

Permalink
[Messenger][AMQP] Use delivery_mode=2 by default
Browse files Browse the repository at this point in the history
  • Loading branch information
lyrixx committed Dec 12, 2019
1 parent 5690b97 commit a1cb2ab
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 7 deletions.
Expand Up @@ -227,7 +227,7 @@ public function testItSetupsTheConnectionWithDefaults()
);

$amqpExchange->expects($this->once())->method('declareExchange');
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => []]);
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]);
$amqpQueue->expects($this->once())->method('declareQueue');
$amqpQueue->expects($this->once())->method('bind')->with(self::DEFAULT_EXCHANGE_NAME, null);

Expand All @@ -250,7 +250,7 @@ public function testItSetupsTheConnection()
$factory->method('createQueue')->will($this->onConsecutiveCalls($amqpQueue0, $amqpQueue1));

$amqpExchange->expects($this->once())->method('declareExchange');
$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key', AMQP_NOPARAM, ['headers' => []]);
$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]);
$amqpQueue0->expects($this->once())->method('declareQueue');
$amqpQueue0->expects($this->exactly(2))->method('bind')->withConsecutive(
[self::DEFAULT_EXCHANGE_NAME, 'binding_key0'],
Expand Down Expand Up @@ -287,7 +287,7 @@ public function testBindingArguments()
$factory->method('createQueue')->willReturn($amqpQueue);

$amqpExchange->expects($this->once())->method('declareExchange');
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => []]);
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]);
$amqpQueue->expects($this->once())->method('declareQueue');
$amqpQueue->expects($this->exactly(1))->method('bind')->withConsecutive(
[self::DEFAULT_EXCHANGE_NAME, null, ['x-match' => 'all']]
Expand Down Expand Up @@ -400,7 +400,7 @@ public function testItDelaysTheMessage()
$delayQueue->expects($this->once())->method('declareQueue');
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__5000');

$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__5000', AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo']]);
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__5000', AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo'], 'delivery_mode' => 2]);

$connection = Connection::fromDsn('amqp://localhost', [], $factory);
$connection->publish('{}', ['x-some-headers' => 'foo'], 5000);
Expand Down Expand Up @@ -442,7 +442,7 @@ public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs()
$delayQueue->expects($this->once())->method('declareQueue');
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__120000');

$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__120000', AMQP_NOPARAM, ['headers' => []]);
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__120000', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]);
$connection->publish('{}', [], 120000);
}

Expand Down Expand Up @@ -474,12 +474,27 @@ public function testAmqpStampHeadersAreUsed()
$amqpExchange = $this->createMock(\AMQPExchange::class)
);

$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => ['Foo' => 'X', 'Bar' => 'Y']]);
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => ['Foo' => 'X', 'Bar' => 'Y'], 'delivery_mode' => 2]);

$connection = Connection::fromDsn('amqp://localhost', [], $factory);
$connection->publish('body', ['Foo' => 'X'], 0, new AmqpStamp(null, AMQP_NOPARAM, ['headers' => ['Bar' => 'Y']]));
}

public function testAmqpStampDelireryModeIsUsed()
{
$factory = new TestAmqpFactory(
$this->createMock(\AMQPConnection::class),
$this->createMock(\AMQPChannel::class),
$this->createMock(\AMQPQueue::class),
$amqpExchange = $this->createMock(\AMQPExchange::class)
);

$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 1]);

$connection = Connection::fromDsn('amqp://localhost', [], $factory);
$connection->publish('body', [], 0, new AmqpStamp(null, AMQP_NOPARAM, ['delivery_mode' => 1]));
}

public function testItCanPublishWithTheDefaultRoutingKey()
{
$factory = new TestAmqpFactory(
Expand Down Expand Up @@ -546,7 +561,7 @@ public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument
$delayQueue->expects($this->once())->method('declareQueue');
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages_routing_key_120000');

$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages_routing_key_120000', AMQP_NOPARAM, ['headers' => []]);
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages_routing_key_120000', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]);
$connection->publish('{}', [], 120000, new AmqpStamp('routing_key'));
}

Expand Down
Expand Up @@ -230,6 +230,7 @@ private function publishOnExchange(\AMQPExchange $exchange, string $body, string
{
$attributes = $amqpStamp ? $amqpStamp->getAttributes() : [];
$attributes['headers'] = array_merge($attributes['headers'] ?? [], $headers);
$attributes['delivery_mode'] = $attributes['delivery_mode'] ?? 2;

$exchange->publish(
$body,
Expand Down

0 comments on commit a1cb2ab

Please sign in to comment.