From 1f0e3387732ca3ee584a0de9b7c3e15ffdc82ceb Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Tue, 10 Oct 2017 20:12:50 +0300 Subject: [PATCH 1/2] [amqp] Add specs for AMQP's basic consume method. --- src/Amqp/BasicConsumeBreakOnFalseSpec.php | 80 +++++++++++++++++ ...asicConsumeFromAllSubscribedQueuesSpec.php | 89 +++++++++++++++++++ ...umeShouldAddConsumerTagOnSubscribeSpec.php | 61 +++++++++++++ ...ouldRemoveConsumerTagOnUnsubscribeSpec.php | 67 ++++++++++++++ .../BasicConsumeUntilUnsubscribedSpec.php | 86 ++++++++++++++++++ 5 files changed, 383 insertions(+) create mode 100644 src/Amqp/BasicConsumeBreakOnFalseSpec.php create mode 100644 src/Amqp/BasicConsumeFromAllSubscribedQueuesSpec.php create mode 100644 src/Amqp/BasicConsumeShouldAddConsumerTagOnSubscribeSpec.php create mode 100644 src/Amqp/BasicConsumeShouldRemoveConsumerTagOnUnsubscribeSpec.php create mode 100644 src/Amqp/BasicConsumeUntilUnsubscribedSpec.php diff --git a/src/Amqp/BasicConsumeBreakOnFalseSpec.php b/src/Amqp/BasicConsumeBreakOnFalseSpec.php new file mode 100644 index 0000000..a90437b --- /dev/null +++ b/src/Amqp/BasicConsumeBreakOnFalseSpec.php @@ -0,0 +1,80 @@ +context) { + $this->context->close(); + } + + parent::tearDown(); + } + + public function test() + { + $this->context = $context = $this->createContext(); + $fooQueue = $this->createQueue($context, 'foo_basic_consume_break_on_false_spec'); + $barQueue = $this->createQueue($context, 'bar_basic_consume_break_on_false_spec'); + + $expectedFooBody = __CLASS__.'foo'.time(); + $expectedBarBody = __CLASS__.'bar'.time(); + + $context->createProducer()->send($fooQueue, $context->createMessage($expectedFooBody)); + $context->createProducer()->send($barQueue, $context->createMessage($expectedBarBody)); + + $consumedMessages = 0; + $callback = function(AmqpMessage $message, AmqpConsumer $consumer) use (&$consumedMessages) { + $consumedMessages++; + + $consumer->acknowledge($message); + + return false; + }; + + $fooConsumer = $context->createConsumer($fooQueue); + $barConsumer = $context->createConsumer($barQueue); + + $context->basicConsumeSubscribe($fooConsumer, $callback); + $context->basicConsumeSubscribe($barConsumer, $callback); + $context->basicConsume(1000); + + $this->assertEquals(1, $consumedMessages); + } + + /** + * @return AmqpContext + */ + abstract protected function createContext(); + + /** + * @param AmqpContext $context + * @param string $queueName + * + * @return AmqpQueue + */ + protected function createQueue(AmqpContext $context, $queueName) + { + $queue = $context->createQueue($queueName); + $context->declareQueue($queue); + $context->purgeQueue($queue); + + return $queue; + } +} diff --git a/src/Amqp/BasicConsumeFromAllSubscribedQueuesSpec.php b/src/Amqp/BasicConsumeFromAllSubscribedQueuesSpec.php new file mode 100644 index 0000000..863a154 --- /dev/null +++ b/src/Amqp/BasicConsumeFromAllSubscribedQueuesSpec.php @@ -0,0 +1,89 @@ +context) { + $this->context->close(); + } + + parent::tearDown(); + } + + public function test() + { + $this->context = $context = $this->createContext(); + $fooQueue = $this->createQueue($context, 'foo_basic_consume_from_all_subscribed_queues_spec'); + $barQueue = $this->createQueue($context, 'bar_basic_consume_from_all_subscribed_queues_spec'); + + $expectedFooBody = 'fooBody'; + $expectedBarBody = 'barBody'; + + $context->createProducer()->send($fooQueue, $context->createMessage($expectedFooBody)); + $context->createProducer()->send($barQueue, $context->createMessage($expectedBarBody)); + + $fooConsumer = $context->createConsumer($fooQueue); + $barConsumer = $context->createConsumer($barQueue); + + $actualBodies = []; + $actualQueues = []; + $callback = function(AmqpMessage $message, AmqpConsumer $consumer) use (&$actualBodies, &$actualQueues) { + $actualBodies[] = $message->getBody(); + $actualQueues[] = $consumer->getQueue()->getQueueName(); + + $consumer->acknowledge($message); + + return true; + }; + + $context->basicConsumeSubscribe($fooConsumer, $callback); + $context->basicConsumeSubscribe($barConsumer, $callback); + $context->basicConsume(1000); + + $this->assertEquals([$expectedFooBody, $expectedBarBody], $actualBodies); + $this->assertEquals( + [ + 'foo_basic_consume_from_all_subscribed_queues_spec', + 'bar_basic_consume_from_all_subscribed_queues_spec' + ], + $actualQueues + ); + } + + /** + * @return AmqpContext + */ + abstract protected function createContext(); + + /** + * @param AmqpContext $context + * @param string $queueName + * + * @return AmqpQueue + */ + protected function createQueue(AmqpContext $context, $queueName) + { + $queue = $context->createQueue($queueName); + $context->declareQueue($queue); + $context->purgeQueue($queue); + + return $queue; + } +} diff --git a/src/Amqp/BasicConsumeShouldAddConsumerTagOnSubscribeSpec.php b/src/Amqp/BasicConsumeShouldAddConsumerTagOnSubscribeSpec.php new file mode 100644 index 0000000..542fdf6 --- /dev/null +++ b/src/Amqp/BasicConsumeShouldAddConsumerTagOnSubscribeSpec.php @@ -0,0 +1,61 @@ +context) { + $this->context->close(); + } + + parent::tearDown(); + } + + public function test() + { + $this->context = $context = $this->createContext(); + $queue = $this->createQueue($context, 'basic_consume_should_add_consumer_tag_on_subscribe_spec'); + + $consumer = $context->createConsumer($queue); + + $context->basicConsumeSubscribe($consumer, function() {}); + + $this->assertNotEmpty($consumer->getConsumerTag()); + } + + /** + * @return AmqpContext + */ + abstract protected function createContext(); + + /** + * @param AmqpContext $context + * @param string $queueName + * + * @return AmqpQueue + */ + protected function createQueue(AmqpContext $context, $queueName) + { + $queue = $context->createQueue($queueName); + $context->declareQueue($queue); + $context->purgeQueue($queue); + + return $queue; + } +} diff --git a/src/Amqp/BasicConsumeShouldRemoveConsumerTagOnUnsubscribeSpec.php b/src/Amqp/BasicConsumeShouldRemoveConsumerTagOnUnsubscribeSpec.php new file mode 100644 index 0000000..ee87102 --- /dev/null +++ b/src/Amqp/BasicConsumeShouldRemoveConsumerTagOnUnsubscribeSpec.php @@ -0,0 +1,67 @@ +context) { + $this->context->close(); + } + + parent::tearDown(); + } + + public function test() + { + $this->context = $context = $this->createContext(); + $queue = $this->createQueue($context, 'basic_consume_should_remove_consumer_tag_on_unsubscribe_spec'); + + $consumer = $context->createConsumer($queue); + + $context->basicConsumeSubscribe($consumer, function() {}); + $context->basicConsume(100); + + // guard + $this->assertNotEmpty($consumer->getConsumerTag()); + + $context->basicConsumeUnsubscribe($consumer); + + $this->assertEmpty($consumer->getConsumerTag()); + } + + /** + * @return AmqpContext + */ + abstract protected function createContext(); + + /** + * @param AmqpContext $context + * @param string $queueName + * + * @return AmqpQueue + */ + protected function createQueue(AmqpContext $context, $queueName) + { + $queue = $context->createQueue($queueName); + $context->declareQueue($queue); + $context->purgeQueue($queue); + + return $queue; + } +} diff --git a/src/Amqp/BasicConsumeUntilUnsubscribedSpec.php b/src/Amqp/BasicConsumeUntilUnsubscribedSpec.php new file mode 100644 index 0000000..75faf6c --- /dev/null +++ b/src/Amqp/BasicConsumeUntilUnsubscribedSpec.php @@ -0,0 +1,86 @@ +context) { + $this->context->close(); + } + + parent::tearDown(); + } + + public function test() + { + $this->context = $context = $this->createContext(); + $fooQueue = $this->createQueue($context, 'foo_basic_consume_until_unsubscribed_spec'); + $barQueue = $this->createQueue($context, 'bar_basic_consume_until_unsubscribed_spec'); + + $context->createProducer()->send($fooQueue, $context->createMessage()); + $context->createProducer()->send($barQueue, $context->createMessage()); + + $fooConsumer = $context->createConsumer($fooQueue); + $barConsumer = $context->createConsumer($barQueue); + + $consumedMessages = 0; + $callback = function(AmqpMessage $message, AmqpConsumer $consumer) use (&$consumedMessages) { + $consumedMessages++; + + $consumer->acknowledge($message); + + return true; + }; + + $context->basicConsumeSubscribe($fooConsumer, $callback); + $context->basicConsumeSubscribe($barConsumer, $callback); + $context->basicConsume(1000); + + $this->assertEquals(2, $consumedMessages); + + $context->createProducer()->send($fooQueue, $context->createMessage()); + $context->createProducer()->send($barQueue, $context->createMessage()); + + $consumedMessages = 0; + $context->basicConsumeUnsubscribe($fooConsumer); + $context->basicConsume(1000); + + $this->assertEquals(1, $consumedMessages); + } + + /** + * @return AmqpContext + */ + abstract protected function createContext(); + + /** + * @param AmqpContext $context + * @param string $queueName + * + * @return AmqpQueue + */ + protected function createQueue(AmqpContext $context, $queueName) + { + $queue = $context->createQueue($queueName); + $context->declareQueue($queue); + $context->purgeQueue($queue); + + return $queue; + } +} From 3868e84fcaaa4ec0df5675a958177817f8808c04 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Tue, 10 Oct 2017 20:22:35 +0300 Subject: [PATCH 2/2] lower expected delay value. --- src/SendAndReceiveDelayedMessageFromQueueSpec.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/SendAndReceiveDelayedMessageFromQueueSpec.php b/src/SendAndReceiveDelayedMessageFromQueueSpec.php index 7f43806..0dca70c 100644 --- a/src/SendAndReceiveDelayedMessageFromQueueSpec.php +++ b/src/SendAndReceiveDelayedMessageFromQueueSpec.php @@ -36,7 +36,7 @@ public function test() $consumer->acknowledge($message); $this->assertSame($expectedBody, $message->getBody()); - $this->assertGreaterThanOrEqual(5, microtime(true) - $sendAt); + $this->assertGreaterThanOrEqual(4, microtime(true) - $sendAt); } /**