Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Messenger]  Allow InMemoryTransport to serialize message #39075

Merged
merged 1 commit into from Nov 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/Symfony/Component/Messenger/CHANGELOG.md
@@ -1,6 +1,11 @@
CHANGELOG
=========

5.3.0
-----

* `InMemoryTransport` can perform message serialization through dsn `in-memory://?serialize=true`.

5.2.0
-----

Expand Down
Expand Up @@ -49,6 +49,35 @@ public function testCreateTransport()
$this->assertInstanceOf(InMemoryTransport::class, $this->factory->createTransport('in-memory://', [], $serializer));
}

public function testCreateTransportWithoutSerializer()
{
/** @var SerializerInterface $serializer */
$serializer = $this->createMock(SerializerInterface::class);
$serializer
->expects($this->never())
->method('encode')
;
$transport = $this->factory->createTransport('in-memory://?serialize=false', [], $serializer);
$message = Envelope::wrap(new DummyMessage('Hello.'));
$transport->send($message);

$this->assertSame([$message], $transport->get());
}

public function testCreateTransportWithSerializer()
{
/** @var SerializerInterface $serializer */
$serializer = $this->createMock(SerializerInterface::class);
$message = Envelope::wrap(new DummyMessage('Hello.'));
$serializer
->expects($this->once())
->method('encode')
->with($this->equalTo($message))
;
$transport = $this->factory->createTransport('in-memory://?serialize=true', [], $serializer);
$transport->send($message);
}

public function testResetCreatedTransports()
{
$transport = $this->factory->createTransport('in-memory://', [], $this->createMock(SerializerInterface::class));
Expand All @@ -63,6 +92,8 @@ public function provideDSN(): array
{
return [
'Supported' => ['in-memory://foo'],
'Serialize enabled' => ['in-memory://?serialize=true'],
'Serialize disabled' => ['in-memory://?serialize=false'],
'Unsupported' => ['amqp://bar', false],
];
}
Expand Down
Expand Up @@ -14,7 +14,9 @@
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\AnEnvelopeStamp;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\InMemoryTransport;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

/**
* @author Gary PEGEOT <garypegeot@gmail.com>
Expand All @@ -26,9 +28,21 @@ class InMemoryTransportTest extends TestCase
*/
private $transport;

/**
* @var InMemoryTransport
*/
private $serializeTransport;

/**
* @var SerializerInterface
*/
private $serializer;

protected function setUp(): void
{
$this->serializer = $this->createMock(SerializerInterface::class);
$this->transport = new InMemoryTransport();
$this->serializeTransport = new InMemoryTransport($this->serializer);
}

public function testSend()
Expand All @@ -38,6 +52,24 @@ public function testSend()
$this->assertSame([$envelope], $this->transport->getSent());
}

public function testSendWithSerialization()
{
$envelope = new Envelope(new \stdClass());
$envelopeDecoded = Envelope::wrap(new DummyMessage('Hello.'));
$this->serializer
->method('encode')
->with($this->equalTo($envelope))
->willReturn(['foo' => 'ba'])
;
$this->serializer
->method('decode')
->with(['foo' => 'ba'])
->willReturn($envelopeDecoded)
;
$this->serializeTransport->send($envelope);
$this->assertSame([$envelopeDecoded], $this->serializeTransport->getSent());
}

public function testQueue()
{
$envelope1 = new Envelope(new \stdClass());
Expand All @@ -51,6 +83,24 @@ public function testQueue()
$this->assertSame([], $this->transport->get());
}

public function testQueueWithSerialization()
{
$envelope = new Envelope(new \stdClass());
$envelopeDecoded = Envelope::wrap(new DummyMessage('Hello.'));
$this->serializer
->method('encode')
->with($this->equalTo($envelope))
->willReturn(['foo' => 'ba'])
;
$this->serializer
->method('decode')
->with(['foo' => 'ba'])
->willReturn($envelopeDecoded)
;
$this->serializeTransport->send($envelope);
$this->assertSame([$envelopeDecoded], $this->serializeTransport->get());
}

public function testAcknowledgeSameMessageWithDifferentStamps()
{
$envelope1 = new Envelope(new \stdClass(), [new AnEnvelopeStamp()]);
Expand All @@ -71,13 +121,49 @@ public function testAck()
$this->assertSame([$envelope], $this->transport->getAcknowledged());
}

public function testAckWithSerialization()
{
$envelope = new Envelope(new \stdClass());
$envelopeDecoded = Envelope::wrap(new DummyMessage('Hello.'));
$this->serializer
->method('encode')
->with($this->equalTo($envelope))
->willReturn(['foo' => 'ba'])
;
$this->serializer
->method('decode')
->with(['foo' => 'ba'])
->willReturn($envelopeDecoded)
;
$this->serializeTransport->ack($envelope);
$this->assertSame([$envelopeDecoded], $this->serializeTransport->getAcknowledged());
}

public function testReject()
{
$envelope = new Envelope(new \stdClass());
$this->transport->reject($envelope);
$this->assertSame([$envelope], $this->transport->getRejected());
}

public function testRejectWithSerialization()
{
$envelope = new Envelope(new \stdClass());
$envelopeDecoded = Envelope::wrap(new DummyMessage('Hello.'));
$this->serializer
->method('encode')
->with($this->equalTo($envelope))
->willReturn(['foo' => 'ba'])
;
$this->serializer
->method('decode')
->with(['foo' => 'ba'])
->willReturn($envelopeDecoded)
;
$this->serializeTransport->reject($envelope);
$this->assertSame([$envelopeDecoded], $this->serializeTransport->getRejected());
}

public function testReset()
{
$envelope = new Envelope(new \stdClass());
Expand Down
57 changes: 49 additions & 8 deletions src/Symfony/Component/Messenger/Transport/InMemoryTransport.php
Expand Up @@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Transport;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Contracts\Service\ResetInterface;

/**
Expand Down Expand Up @@ -41,20 +42,30 @@ class InMemoryTransport implements TransportInterface, ResetInterface
*/
private $queue = [];

/**
* @var SerializerInterface|null
*/
private $serializer;

public function __construct(SerializerInterface $serializer = null)
{
$this->serializer = $serializer;
}

/**
* {@inheritdoc}
*/
public function get(): iterable
{
return array_values($this->queue);
return array_values($this->decode($this->queue));
}

/**
* {@inheritdoc}
*/
public function ack(Envelope $envelope): void
{
$this->acknowledged[] = $envelope;
$this->acknowledged[] = $this->encode($envelope);
$id = spl_object_hash($envelope->getMessage());
unset($this->queue[$id]);
}
Expand All @@ -64,7 +75,7 @@ public function ack(Envelope $envelope): void
*/
public function reject(Envelope $envelope): void
{
$this->rejected[] = $envelope;
$this->rejected[] = $this->encode($envelope);
$id = spl_object_hash($envelope->getMessage());
unset($this->queue[$id]);
}
Expand All @@ -74,9 +85,10 @@ public function reject(Envelope $envelope): void
*/
public function send(Envelope $envelope): Envelope
{
$this->sent[] = $envelope;
$encodedEnvelope = $this->encode($envelope);
$this->sent[] = $encodedEnvelope;
$id = spl_object_hash($envelope->getMessage());
$this->queue[$id] = $envelope;
$this->queue[$id] = $encodedEnvelope;

return $envelope;
}
Expand All @@ -91,22 +103,51 @@ public function reset()
*/
public function getAcknowledged(): array
{
return $this->acknowledged;
return $this->decode($this->acknowledged);
}

/**
* @return Envelope[]
*/
public function getRejected(): array
{
return $this->rejected;
return $this->decode($this->rejected);
}

/**
* @return Envelope[]
*/
public function getSent(): array
{
return $this->sent;
return $this->decode($this->sent);
}

/**
* @return Envelope|array
*/
private function encode(Envelope $envelope)
{
if (null === $this->serializer) {
return $envelope;
}

return $this->serializer->encode($envelope);
}

/**
* @param array<mixed> $messagesEncoded
*
* @return Envelope[]
*/
private function decode(array $messagesEncoded): array
{
if (null === $this->serializer) {
return $messagesEncoded;
}

return array_map(
[$this->serializer, 'decode'],
$messagesEncoded
);
}
}
Expand Up @@ -26,7 +26,9 @@ class InMemoryTransportFactory implements TransportFactoryInterface, ResetInterf

public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
{
return $this->createdTransports[] = new InMemoryTransport();
['serialize' => $serialize] = $this->parseDsn($dsn);

return $this->createdTransports[] = new InMemoryTransport($serialize ? $serializer : null);
}

public function supports(string $dsn, array $options): bool
Expand All @@ -40,4 +42,16 @@ public function reset()
$transport->reset();
}
}

private function parseDsn(string $dsn): array
{
$query = [];
if ($queryAsString = strstr($dsn, '?')) {
parse_str(ltrim($queryAsString, '?'), $query);
}

return [
'serialize' => filter_var($query['serialize'] ?? false, \FILTER_VALIDATE_BOOLEAN),
];
}
}