Skip to content

Commit

Permalink
Implement redis transport
Browse files Browse the repository at this point in the history
  • Loading branch information
soyuka authored and alexander-schranz committed Apr 10, 2019
1 parent 2243bf5 commit c93ce59
Show file tree
Hide file tree
Showing 13 changed files with 904 additions and 0 deletions.
@@ -0,0 +1,54 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;

use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Transport\RedisExt\Connection;

/**
* @requires extension redis
*/
class ConnectionTest extends TestCase
{
/**
* @expectedException \InvalidArgumentException
* @expectedExceptionMessage The given Redis DSN "redis://" is invalid.
*/
public function testItCannotBeConstructedWithAWrongDsn()
{
Connection::fromDsn('redis://');
}

public function testItGetsParametersFromTheDsn()
{
$this->assertEquals(
new Connection('queue', array(
'host' => 'localhost',
'port' => 6379,
)),
Connection::fromDsn('redis://localhost/queue')
);
}

public function testOverrideOptionsViaQueryParameters()
{
$this->assertEquals(
new Connection('queue', array(
'host' => '127.0.0.1',
'port' => 6379,
), array(
'processing_ttl' => '8000',
)),
Connection::fromDsn('redis://127.0.0.1:6379/queue?processing_ttl=8000')
);
}
}
@@ -0,0 +1,43 @@
<?php

$componentRoot = $_SERVER['COMPONENT_ROOT'];

if (!is_file($autoload = $componentRoot.'/vendor/autoload.php')) {
$autoload = $componentRoot.'/../../../../vendor/autoload.php';
}

if (!file_exists($autoload)) {
exit('You should run "composer install --dev" in the component before running this script.');
}

require_once $autoload;

use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
use Symfony\Component\Messenger\Transport\RedisExt\RedisReceiver;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Messenger\Worker;
use Symfony\Component\Serializer as SerializerComponent;
use Symfony\Component\Serializer\Encoder\JsonEncoder;
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;

$serializer = new Serializer(
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);

$connection = Connection::fromDsn(getenv('DSN'));
$receiver = new RedisReceiver($connection, $serializer);

$worker = new Worker($receiver, new class() implements MessageBusInterface {
public function dispatch($envelope)
{
echo 'Get envelope with message: '.get_class($envelope->getMessage())."\n";
echo sprintf("with items: %s\n", json_encode(array_keys($envelope->all()), JSON_PRETTY_PRINT));

sleep(30);
echo "Done.\n";
}
});

echo "Receiving messages...\n";
$worker->run();
@@ -0,0 +1,146 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;

use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
use Symfony\Component\Messenger\Transport\RedisExt\RedisReceiver;
use Symfony\Component\Messenger\Transport\RedisExt\RedisSender;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Process\PhpProcess;
use Symfony\Component\Process\Process;
use Symfony\Component\Serializer as SerializerComponent;
use Symfony\Component\Serializer\Encoder\JsonEncoder;
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;

/**
* @requires extension redis
*/
class RedisExtIntegrationTest extends TestCase
{
protected function setUp()
{
parent::setUp();

if (!getenv('MESSENGER_REDIS_DSN')) {
$this->markTestSkipped('The "MESSENGER_REDIS_DSN" environment variable is required.');
}
}

public function testItSendsAndReceivesMessages()
{
$serializer = new Serializer(
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);

$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'));

$sender = new RedisSender($connection, $serializer);
$receiver = new RedisReceiver($connection, $serializer);

$sender->send($first = Envelope::wrap(new DummyMessage('First')));
$sender->send($second = Envelope::wrap(new DummyMessage('Second')));

$receivedMessages = 0;
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages, $first, $second) {
$this->assertEquals(0 == $receivedMessages ? $first : $second, $envelope);

if (2 === ++$receivedMessages) {
$receiver->stop();
}
});
}

public function testItReceivesSignals()
{
$serializer = new Serializer(
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);

$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'));

$sender = new RedisSender($connection, $serializer);
$sender->send(Envelope::wrap(new DummyMessage('Hello')));

$amqpReadTimeout = 30;
$dsn = getenv('MESSENGER_REDIS_DSN').'?read_timeout='.$amqpReadTimeout;
$process = new PhpProcess(file_get_contents(__DIR__.'/Fixtures/long_receiver.php'), null, array(
'COMPONENT_ROOT' => __DIR__.'/../../../',
'DSN' => $dsn,
));

$process->start();

$this->waitForOutput($process, $expectedOutput = "Receiving messages...\n");

$signalTime = microtime(true);
$timedOutTime = time() + 10;

$process->signal(15);

while ($process->isRunning() && time() < $timedOutTime) {
usleep(100 * 1000); // 100ms
}

$this->assertFalse($process->isRunning());
$this->assertLessThan($amqpReadTimeout, microtime(true) - $signalTime);
$this->assertSame($expectedOutput.<<<'TXT'
Get envelope with message: Symfony\Component\Messenger\Tests\Fixtures\DummyMessage
with items: [
"Symfony\\Component\\Messenger\\Asynchronous\\Transport\\ReceivedMessage"
]
Done.
TXT
, $process->getOutput());
}

/**
* @runInSeparateProcess
*/
public function testItSupportsTimeoutAndTicksNullMessagesToTheHandler()
{
$serializer = new Serializer(
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);

$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), array('blocking_timeout' => '1'));

$receiver = new RedisReceiver($connection, $serializer);

$receivedMessages = 0;
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages) {
$this->assertNull($envelope);

if (2 === ++$receivedMessages) {
$receiver->stop();
}
});
}

private function waitForOutput(Process $process, string $output, $timeoutInSeconds = 10)
{
$timedOutTime = time() + $timeoutInSeconds;

while (time() < $timedOutTime) {
if (0 === strpos($process->getOutput(), $output)) {
return;
}

usleep(100 * 1000); // 100ms
}

throw new \RuntimeException('Expected output never arrived. Got "'.$process->getOutput().'" instead.');
}
}
@@ -0,0 +1,118 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;

use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
use Symfony\Component\Messenger\Transport\RedisExt\Exception\RejectMessageExceptionInterface;
use Symfony\Component\Messenger\Transport\RedisExt\RedisReceiver;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Serializer as SerializerComponent;
use Symfony\Component\Serializer\Encoder\JsonEncoder;
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;

/**
* @requires extension redis
*/
class RedisReceiverTest extends TestCase
{
public function testItSendTheDecodedMessageToTheHandlerAndAcknowledgeIt()
{
$serializer = new Serializer(
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);

$envelope = Envelope::wrap(new DummyMessage('Hi'));
$encoded = $serializer->encode($envelope);

$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('waitAndGet')->willReturn($encoded);

$connection->expects($this->once())->method('ack')->with($encoded);

$receiver = new RedisReceiver($connection, $serializer);
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
$this->assertEquals(new DummyMessage('Hi'), $envelope->getMessage());
$receiver->stop();
});
}

public function testItSendNoMessageToTheHandler()
{
$serializer = new Serializer(
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);

$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('waitAndGet')->willReturn(null);

$receiver = new RedisReceiver($connection, $serializer);
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
$this->assertNull($envelope);
$receiver->stop();
});
}

/**
* @expectedException \Symfony\Component\Messenger\Tests\Transport\RedisExt\InterruptException
*/
public function testItNonAcknowledgeTheMessageIfAnExceptionHappened()
{
$serializer = new Serializer(
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);

$envelope = Envelope::wrap(new DummyMessage('Hi'));
$encoded = $serializer->encode($envelope);

$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('waitAndGet')->willReturn($encoded);
$connection->expects($this->once())->method('requeue')->with($encoded);

$receiver = new RedisReceiver($connection, $serializer);
$receiver->receive(function () {
throw new InterruptException('Well...');
});
}

/**
* @expectedException \Symfony\Component\Messenger\Tests\Transport\RedisExt\WillNeverWorkException
*/
public function testItRejectsTheMessageIfTheExceptionIsARejectMessageExceptionInterface()
{
$serializer = new Serializer(
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);

$envelope = Envelope::wrap(new DummyMessage('Hi'));
$encoded = $serializer->encode($envelope);

$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('waitAndGet')->willReturn($encoded);
$connection->expects($this->once())->method('reject')->with($encoded);

$receiver = new RedisReceiver($connection, $serializer);
$receiver->receive(function () {
throw new WillNeverWorkException('Well...');
});
}
}

class InterruptException extends \Exception
{
}

class WillNeverWorkException extends \Exception implements RejectMessageExceptionInterface
{
}

0 comments on commit c93ce59

Please sign in to comment.