diff --git a/QueueInteropTransport.php b/QueueInteropTransport.php index b7d7990..e57850d 100644 --- a/QueueInteropTransport.php +++ b/QueueInteropTransport.php @@ -18,6 +18,7 @@ use Enqueue\MessengerAdapter\EnvelopeItem\TransportConfiguration; use Enqueue\MessengerAdapter\Exception\MissingMessageMetadataSetterException; use Enqueue\MessengerAdapter\Exception\SendingMessageFailedException; +use Enqueue\SnsQs\SnsQsProducer; use Interop\Queue\Consumer; use Interop\Queue\Exception as InteropQueueException; use Interop\Queue\Message; @@ -25,6 +26,7 @@ use Symfony\Component\Messenger\Exception\LogicException; use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; use Symfony\Component\Messenger\Stamp\DelayStamp; +use Symfony\Component\Messenger\Stamp\RedeliveryStamp; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; use Symfony\Component\Messenger\Transport\TransportInterface; use Symfony\Component\OptionsResolver\Options; @@ -137,6 +139,10 @@ public function send(Envelope $envelope): Envelope $producer = $context->createProducer(); + if (null !== $envelope->last(RedeliveryStamp::class) && $producer instanceof SnsQsProducer) { + $topic = $context->createQueue($destination['queue']); + } + $delay = 0; $delayStamp = $envelope->last(DelayStamp::class); if (null !== $delayStamp) { diff --git a/Tests/QueueInteropTransportTest.php b/Tests/QueueInteropTransportTest.php index f2bf8a4..6500c7f 100644 --- a/Tests/QueueInteropTransportTest.php +++ b/Tests/QueueInteropTransportTest.php @@ -18,6 +18,7 @@ use Enqueue\MessengerAdapter\Exception\MissingMessageMetadataSetterException; use Enqueue\MessengerAdapter\QueueInteropTransport; use Enqueue\MessengerAdapter\Tests\Fixtures\DecoratedPsrMessage; +use Enqueue\SnsQs\SnsQsProducer; use Interop\Queue\Consumer; use Interop\Queue\Context; use Interop\Queue\Exception\Exception; @@ -28,6 +29,7 @@ use PHPUnit\Framework\TestCase; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; +use Symfony\Component\Messenger\Stamp\RedeliveryStamp; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; use Symfony\Component\Messenger\Transport\TransportInterface; @@ -276,6 +278,59 @@ public function testSendWithQueueAndTopicSpecificOptions() $transport->send($envelope); } + public function testSendSnsQsRedeliveryMessageOnQueue() + { + $transportName = 'transport'; + $topicName = 'topic'; + $queueName = 'queue'; + $message = new \stdClass(); + $message->foo = 'bar'; + $envelope = (new Envelope($message))->with(new RedeliveryStamp(1)); + + $psrMessageProphecy = $this->prophesize(Message::class); + $psrMessage = $psrMessageProphecy->reveal(); + + $topicProphecy = $this->prophesize(Topic::class); + $topic = $topicProphecy->reveal(); + + $queueProphecy = $this->prophesize(Queue::class); + $queue = $queueProphecy->reveal(); + + $producerProphecy = $this->prophesize(SnsQsProducer::class); + $producerProphecy->send($queue, $psrMessage)->shouldBeCalled(); + + $contextProphecy = $this->prophesize(Context::class); + $contextProphecy->createTopic($topicName)->shouldBeCalled()->willReturn($topic); + $contextProphecy->createQueue($queueName)->shouldBeCalled()->willReturn($queue); + $contextProphecy->createProducer()->shouldBeCalled()->willReturn($producerProphecy->reveal()); + $contextProphecy->createMessage('foo', array(), array())->shouldBeCalled()->willReturn($psrMessage); + + $contextManagerProphecy = $this->prophesize(ContextManager::class); + $contextManagerProphecy->context()->shouldBeCalled()->willReturn($contextProphecy->reveal()); + $contextManagerProphecy->ensureExists(array( + 'topic' => $topicName, + 'topicOptions' => array('name' => $topicName), + 'queue' => $queueName, + 'queueOptions' => array('name' => $queueName), + ))->shouldBeCalled(); + + $encoderProphecy = $this->prophesize(SerializerInterface::class); + $encoderProphecy->encode($envelope)->shouldBeCalled()->willReturn(array('body' => 'foo')); + + $transport = $this->getTransport( + $encoderProphecy->reveal(), + $contextManagerProphecy->reveal(), + array( + 'transport_name' => $transportName, + 'topic' => array('name' => $topicName), + 'queue' => array('name' => $queueName), + ), + true + ); + + $transport->send($envelope); + } + public function testSendWithMessageMetadata() { $transportName = 'transport'; diff --git a/composer.json b/composer.json index 2facbd8..14865fa 100644 --- a/composer.json +++ b/composer.json @@ -39,6 +39,7 @@ "require-dev": { "phpspec/prophecy": "^1.8.0", "phpunit/phpunit": "^7.1", - "symfony/yaml": "^3.4|^4.1|^5" + "symfony/yaml": "^3.4|^4.1|^5", + "enqueue/snsqs": "^0.10.11" } }