Skip to content

Commit

Permalink
[WIP] Multiple queues & binding keys.
Browse files Browse the repository at this point in the history
  • Loading branch information
Guillaume Gammelin committed Mar 31, 2019
1 parent a511e91 commit 3ab163a
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 137 deletions.
Expand Up @@ -40,7 +40,7 @@ public function testItGetsParametersFromTheDsn()
], [
'name' => 'messages',
], [
'name' => 'messages',
'messages' => [],
]),
Connection::fromDsn('amqp://localhost/%2f/messages')
);
Expand All @@ -58,9 +58,9 @@ public function testOverrideOptionsViaQueryParameters()
], [
'name' => 'exchangeName',
], [
'name' => 'queue',
'queueName' => [],
]),
Connection::fromDsn('amqp://guest:password@redis:1234/%2f/queue?exchange[name]=exchangeName')
Connection::fromDsn('amqp://guest:password@redis:1234/%2f/queue?exchange[name]=exchangeName&queues[queueName]')
);
}

Expand All @@ -77,9 +77,9 @@ public function testOptionsAreTakenIntoAccountAndOverwrittenByDsn()
], [
'name' => 'exchangeName',
], [
'name' => 'queueName',
'queueName' => [],
]),
Connection::fromDsn('amqp://guest:password@redis:1234/%2f/queue?exchange[name]=exchangeName&queue[name]=queueName', [
Connection::fromDsn('amqp://guest:password@redis:1234/%2f/queue?exchange[name]=exchangeName&queues[queueName]', [
'persistent' => 'true',
'exchange' => ['name' => 'toBeOverwritten'],
])
Expand All @@ -89,10 +89,10 @@ public function testOptionsAreTakenIntoAccountAndOverwrittenByDsn()
public function testSetsParametersOnTheQueueAndExchange()
{
$factory = new TestAmqpFactory(
$amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock(),
$amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock(),
$amqpQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock(),
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
$amqpConnection = $this->createMock(\AMQPConnection::class),
$amqpChannel = $this->createMock(\AMQPChannel::class),
$amqpQueue = $this->createMock(\AMQPQueue::class),
$amqpExchange = $this->createMock(\AMQPExchange::class)
);

$amqpQueue->expects($this->once())->method('setArguments')->with([
Expand All @@ -110,17 +110,19 @@ public function testSetsParametersOnTheQueueAndExchange()
]);

$dsn = 'amqp://localhost/%2f/messages?'.
'queue[arguments][x-dead-letter-exchange]=dead-exchange&'.
'queue[arguments][x-message-ttl]=100&'.
'queue[arguments][x-delay]=100&'.
'queue[arguments][x-expires]=150&'
'queues[messages][arguments][x-dead-letter-exchange]=dead-exchange&' .
'queues[messages][arguments][x-message-ttl]=100&' .
'queues[messages][arguments][x-delay]=100&' .
'queues[messages][arguments][x-expires]=150&'
;
$connection = Connection::fromDsn($dsn, [
'queue' => [
'arguments' => [
'x-max-length' => '200',
'x-max-length-bytes' => '300',
'x-max-priority' => '4',
'queues' => [
'messages' => [
'arguments' => [
'x-max-length' => '200',
'x-max-length-bytes' => '300',
'x-max-priority' => '4',
],
],
],
'exchange' => [
Expand All @@ -135,20 +137,23 @@ public function testSetsParametersOnTheQueueAndExchange()
public function invalidQueueArgumentsDataProvider(): iterable
{
$baseDsn = 'amqp://localhost/%2f/messages';
yield [$baseDsn.'?queue[arguments][x-delay]=not-a-number', []];
yield [$baseDsn.'?queue[arguments][x-expires]=not-a-number', []];
yield [$baseDsn.'?queue[arguments][x-max-length]=not-a-number', []];
yield [$baseDsn.'?queue[arguments][x-max-length-bytes]=not-a-number', []];
yield [$baseDsn.'?queue[arguments][x-max-priority]=not-a-number', []];
yield [$baseDsn.'?queue[arguments][x-message-ttl]=not-a-number', []];

// Ensure the exception is thrown when the arguments are passed via the array options
yield [$baseDsn, ['queue' => ['arguments' => ['x-delay' => 'not-a-number']]]];
yield [$baseDsn, ['queue' => ['arguments' => ['x-expires' => 'not-a-number']]]];
yield [$baseDsn, ['queue' => ['arguments' => ['x-max-length' => 'not-a-number']]]];
yield [$baseDsn, ['queue' => ['arguments' => ['x-max-length-bytes' => 'not-a-number']]]];
yield [$baseDsn, ['queue' => ['arguments' => ['x-max-priority' => 'not-a-number']]]];
yield [$baseDsn, ['queue' => ['arguments' => ['x-message-ttl' => 'not-a-number']]]];

return [
[$baseDsn . '?queues[messages][arguments][x-delay]=not-a-number', []],
[$baseDsn . '?queues[messages][arguments][x-expires]=not-a-number', []],
[$baseDsn . '?queues[messages][arguments][x-max-length]=not-a-number', []],
[$baseDsn . '?queues[messages][arguments][x-max-length-bytes]=not-a-number', []],
[$baseDsn . '?queues[messages][arguments][x-max-priority]=not-a-number', []],
[$baseDsn . '?queues[messages][arguments][x-message-ttl]=not-a-number', []],

// Ensure the exception is thrown when the arguments are passed via the array options
[$baseDsn, ['queues' => ['messages' => ['arguments' => ['x-delay' => 'not-a-number']]]]],
[$baseDsn, ['queues' => ['messages' => ['arguments' => ['x-expires' => 'not-a-number']]]]],
[$baseDsn, ['queues' => ['messages' => ['arguments' => ['x-max-length' => 'not-a-number']]]]],
[$baseDsn, ['queues' => ['messages' => ['arguments' => ['x-max-length-bytes' => 'not-a-number']]]]],
[$baseDsn, ['queues' => ['messages' => ['arguments' => ['x-max-priority' => 'not-a-number']]]]],
[$baseDsn, ['queues' => ['messages' => ['arguments' => ['x-message-ttl' => 'not-a-number']]]]],
];
}

/**
Expand All @@ -165,10 +170,10 @@ public function testFromDsnWithInvalidValueOnQueueArguments(string $dsn, array $
public function testItUsesANormalConnectionByDefault()
{
$factory = new TestAmqpFactory(
$amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock(),
$amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock(),
$amqpQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock(),
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
$amqpConnection = $this->createMock(\AMQPConnection::class),
$amqpChannel = $this->createMock(\AMQPChannel::class),
$amqpQueue = $this->createMock(\AMQPQueue::class),
$amqpExchange = $this->createMock(\AMQPExchange::class)
);

// makes sure the channel looks connected, so it's not re-created
Expand All @@ -182,10 +187,10 @@ public function testItUsesANormalConnectionByDefault()
public function testItAllowsToUseAPersistentConnection()
{
$factory = new TestAmqpFactory(
$amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock(),
$amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock(),
$amqpQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock(),
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
$amqpConnection = $this->createMock(\AMQPConnection::class),
$amqpChannel = $this->createMock(\AMQPChannel::class),
$amqpQueue = $this->createMock(\AMQPQueue::class),
$amqpExchange = $this->createMock(\AMQPExchange::class)
);

// makes sure the channel looks connected, so it's not re-created
Expand All @@ -196,65 +201,86 @@ public function testItAllowsToUseAPersistentConnection()
$connection->publish('body');
}

public function testItSetupsTheConnectionByDefault()
public function testItSetupsTheConnectionWithDefaults()
{
$factory = new TestAmqpFactory(
$amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock(),
$amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock(),
$amqpQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock(),
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
$amqpConnection = $this->createMock(\AMQPConnection::class),
$amqpChannel = $this->createMock(\AMQPChannel::class),
$amqpQueue = $this->createMock(\AMQPQueue::class),
$amqpExchange = $this->createMock(\AMQPExchange::class)
);

$amqpExchange->method('getName')->willReturn('exchange_name');
$amqpExchange->expects($this->once())->method('declareExchange');
$amqpExchange->expects($this->once())->method('publish')->with('body', 'exchange_key', AMQP_NOPARAM, ['headers' => []]);
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => []]);
$amqpQueue->expects($this->once())->method('declareQueue');
$amqpQueue->expects($this->once())->method('bind')->with('exchange_name', 'queue_key');
$amqpQueue->expects($this->once())->method('bind')->with('exchange_name', null);

$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[routing_key]=exchange_key&queue[routing_key]=queue_key', [], $factory);
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', [], $factory);
$connection->publish('body');
}

public function testItCanDisableTheSetup()
public function testItSetupsTheConnection()
{
$factory = new TestAmqpFactory(
$amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock(),
$amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock(),
$amqpQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock(),
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
);
$amqpConnection = $this->createMock(\AMQPConnection::class);
$amqpChannel = $this->createMock(\AMQPChannel::class);
$amqpExchange = $this->createMock(\AMQPExchange::class);
$amqpQueue0 = $this->createMock(\AMQPQueue::class);
$amqpQueue1 = $this->createMock(\AMQPQueue::class);

$amqpExchange->method('getName')->willReturn('exchange_name');
$amqpExchange->expects($this->never())->method('declareExchange');
$amqpQueue->expects($this->never())->method('declareQueue');
$amqpQueue->expects($this->never())->method('bind');
$factory = $this->createMock(AmqpFactory::class);
$factory->method('createConnection')->willReturn($amqpConnection);
$factory->method('createChannel')->willReturn($amqpChannel);
$factory->method('createExchange')->willReturn($amqpExchange);
$factory->method('createQueue')->will($this->onConsecutiveCalls($amqpQueue0, $amqpQueue1));

$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[routing_key]=my_key&queue[routing_key]=my_key', ['auto_setup' => 'false'], $factory);
$connection->publish('body');
$amqpExchange->method('getName')->willReturn('exchange_name');
$amqpExchange->expects($this->once())->method('declareExchange');
$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key', AMQP_NOPARAM, ['headers' => []]);
$amqpQueue0->expects($this->once())->method('declareQueue');
$amqpQueue0->expects($this->exactly(2))->method('bind')->withConsecutive(
['exchange_name', 'binding_key0'],
['exchange_name', 'binding_key1']
);
$amqpQueue1->expects($this->once())->method('declareQueue');
$amqpQueue1->expects($this->exactly(2))->method('bind')->withConsecutive(
['exchange_name', 'binding_key2'],
['exchange_name', 'binding_key3']
);

$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[routing_key]=my_key&queue[routing_key]=my_key', ['auto_setup' => false], $factory);
$connection->publish('body');
$dsn = 'amqp://localhost/%2f/messages?' .
'exchange[default_publish_routing_key]=routing_key&' .
'queues[queue0][binding_keys][0]=binding_key0&' .
'queues[queue0][binding_keys][1]=binding_key1&' .
'queues[queue1][binding_keys][0]=binding_key2&' .
'queues[queue1][binding_keys][1]=binding_key3';

$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[routing_key]=my_key&queue[routing_key]=my_key&auto_setup=false', [], $factory);
$connection = Connection::fromDsn($dsn, [], $factory);
$connection->publish('body');
}

public function testPublishWithQueueOptions()
public function testItCanDisableTheSetup()
{
$factory = new TestAmqpFactory(
$amqpConnection = $this->createMock(\AMQPConnection::class),
$amqpChannel = $this->createMock(\AMQPChannel::class),
$amqpQueue = $this->createMock(\AMQPQueue::class),
$amqpExchange = $this->createMock(\AMQPExchange::class)
);
$headers = [
'type' => '*',
];
$amqpExchange->expects($this->once())->method('publish')
->with('body', null, 1, ['delivery_mode' => 2, 'headers' => ['token' => 'uuid', 'type' => '*']]);

$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[attributes][delivery_mode]=2&queue[attributes][headers][token]=uuid&queue[flags]=1', [], $factory);
$connection->publish('body', $headers);
$amqpExchange->method('getName')->willReturn('exchange_name');
$amqpExchange->expects($this->never())->method('declareExchange');
$amqpQueue->expects($this->never())->method('declareQueue');
$amqpQueue->expects($this->never())->method('bind');

$connection = Connection::fromDsn('amqp://localhost/%2f/messages', ['auto_setup' => 'false'], $factory);
$connection->publish('body');

$connection = Connection::fromDsn('amqp://localhost/%2f/messages', ['auto_setup' => false], $factory);
$connection->publish('body');

$connection = Connection::fromDsn('amqp://localhost/%2f/messages?auto_setup=false', [], $factory);
$connection->publish('body');
}

public function testSetChannelPrefetchWhenSetup()
Expand All @@ -278,11 +304,11 @@ public function testSetChannelPrefetchWhenSetup()

public function testItDelaysTheMessage()
{
$amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock();
$amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock();
$delayQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock();
$amqpConnection = $this->createMock(\AMQPConnection::class);
$amqpChannel = $this->createMock(\AMQPChannel::class);
$delayQueue = $this->createMock(\AMQPQueue::class);

$factory = $this->getMockBuilder(AmqpFactory::class)->getMock();
$factory = $this->createMock(AmqpFactory::class);
$factory->method('createConnection')->willReturn($amqpConnection);
$factory->method('createChannel')->willReturn($amqpChannel);
$factory->method('createQueue')->willReturn($delayQueue);
Expand Down Expand Up @@ -315,11 +341,11 @@ public function testItDelaysTheMessage()

public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs()
{
$amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock();
$amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock();
$delayQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock();
$amqpConnection = $this->createMock(\AMQPConnection::class);
$amqpChannel = $this->createMock(\AMQPChannel::class);
$delayQueue = $this->createMock(\AMQPQueue::class);

$factory = $this->getMockBuilder(AmqpFactory::class)->getMock();
$factory = $this->createMock(AmqpFactory::class);
$factory->method('createConnection')->willReturn($amqpConnection);
$factory->method('createChannel')->willReturn($amqpChannel);
$factory->method('createQueue')->willReturn($delayQueue);
Expand Down Expand Up @@ -377,7 +403,7 @@ public function testObfuscatePasswordInDsn()
$connection->channel();
}

public function testItCanPublishWithTheDefaultQueueRoutingKey()
public function testItCanPublishWithTheDefaultRoutingKey()
{
$factory = new TestAmqpFactory(
$amqpConnection = $this->createMock(\AMQPConnection::class),
Expand All @@ -386,9 +412,9 @@ public function testItCanPublishWithTheDefaultQueueRoutingKey()
$amqpExchange = $this->createMock(\AMQPExchange::class)
);

$amqpExchange->expects($this->once())->method('publish')->with('body', 'my_key');
$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key');

$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[routing_key]=my_key', [], $factory);
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[default_publish_routing_key]=routing_key', [], $factory);
$connection->publish('body');
}

Expand All @@ -401,10 +427,10 @@ public function testItCanPublishWithASuppliedRoutingKey()
$amqpExchange = $this->createMock(\AMQPExchange::class)
);

$amqpExchange->expects($this->once())->method('publish')->with('body', 'supplied_key');
$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key');

$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[routing_key]=my_key', [], $factory);
$connection->publish('body', [], 0, 'supplied_key');
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[default_publish_routing_key]=default_routing_key', [], $factory);
$connection->publish('body', [], 0, 'routing_key');
}

public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument()
Expand Down

0 comments on commit 3ab163a

Please sign in to comment.