Skip to content

Commit

Permalink
Fix snsqs redelivery
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewmy authored and Nyholm committed Aug 3, 2021
1 parent 77e2745 commit 320eb42
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 1 deletion.
6 changes: 6 additions & 0 deletions QueueInteropTransport.php
Expand Up @@ -18,13 +18,15 @@
use Enqueue\MessengerAdapter\EnvelopeItem\TransportConfiguration;
use Enqueue\MessengerAdapter\Exception\MissingMessageMetadataSetterException;
use Enqueue\MessengerAdapter\Exception\SendingMessageFailedException;
use Enqueue\SnsQs\SnsQsProducer;
use Interop\Queue\Consumer;
use Interop\Queue\Exception as InteropQueueException;
use Interop\Queue\Message;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
use Symfony\Component\OptionsResolver\Options;
Expand Down Expand Up @@ -137,6 +139,10 @@ public function send(Envelope $envelope): Envelope

$producer = $context->createProducer();

if (null !== $envelope->last(RedeliveryStamp::class) && $producer instanceof SnsQsProducer) {
$topic = $context->createQueue($destination['queue']);
}

$delay = 0;
$delayStamp = $envelope->last(DelayStamp::class);
if (null !== $delayStamp) {
Expand Down
55 changes: 55 additions & 0 deletions Tests/QueueInteropTransportTest.php
Expand Up @@ -18,6 +18,7 @@
use Enqueue\MessengerAdapter\Exception\MissingMessageMetadataSetterException;
use Enqueue\MessengerAdapter\QueueInteropTransport;
use Enqueue\MessengerAdapter\Tests\Fixtures\DecoratedPsrMessage;
use Enqueue\SnsQs\SnsQsProducer;
use Interop\Queue\Consumer;
use Interop\Queue\Context;
use Interop\Queue\Exception\Exception;
Expand All @@ -28,6 +29,7 @@
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;

Expand Down Expand Up @@ -276,6 +278,59 @@ public function testSendWithQueueAndTopicSpecificOptions()
$transport->send($envelope);
}

public function testSendSnsQsRedeliveryMessageOnQueue()
{
$transportName = 'transport';
$topicName = 'topic';
$queueName = 'queue';
$message = new \stdClass();
$message->foo = 'bar';
$envelope = (new Envelope($message))->with(new RedeliveryStamp(1));

$psrMessageProphecy = $this->prophesize(Message::class);
$psrMessage = $psrMessageProphecy->reveal();

$topicProphecy = $this->prophesize(Topic::class);
$topic = $topicProphecy->reveal();

$queueProphecy = $this->prophesize(Queue::class);
$queue = $queueProphecy->reveal();

$producerProphecy = $this->prophesize(SnsQsProducer::class);
$producerProphecy->send($queue, $psrMessage)->shouldBeCalled();

$contextProphecy = $this->prophesize(Context::class);
$contextProphecy->createTopic($topicName)->shouldBeCalled()->willReturn($topic);
$contextProphecy->createQueue($queueName)->shouldBeCalled()->willReturn($queue);
$contextProphecy->createProducer()->shouldBeCalled()->willReturn($producerProphecy->reveal());
$contextProphecy->createMessage('foo', array(), array())->shouldBeCalled()->willReturn($psrMessage);

$contextManagerProphecy = $this->prophesize(ContextManager::class);
$contextManagerProphecy->context()->shouldBeCalled()->willReturn($contextProphecy->reveal());
$contextManagerProphecy->ensureExists(array(
'topic' => $topicName,
'topicOptions' => array('name' => $topicName),
'queue' => $queueName,
'queueOptions' => array('name' => $queueName),
))->shouldBeCalled();

$encoderProphecy = $this->prophesize(SerializerInterface::class);
$encoderProphecy->encode($envelope)->shouldBeCalled()->willReturn(array('body' => 'foo'));

$transport = $this->getTransport(
$encoderProphecy->reveal(),
$contextManagerProphecy->reveal(),
array(
'transport_name' => $transportName,
'topic' => array('name' => $topicName),
'queue' => array('name' => $queueName),
),
true
);

$transport->send($envelope);
}

public function testSendWithMessageMetadata()
{
$transportName = 'transport';
Expand Down
3 changes: 2 additions & 1 deletion composer.json
Expand Up @@ -39,6 +39,7 @@
"require-dev": {
"phpspec/prophecy": "^1.8.0",
"phpunit/phpunit": "^7.1",
"symfony/yaml": "^3.4|^4.1|^5"
"symfony/yaml": "^3.4|^4.1|^5",
"enqueue/snsqs": "^0.10.11"
}
}

0 comments on commit 320eb42

Please sign in to comment.