Skip to content

Commit

Permalink
Refractor using redis streams
Browse files Browse the repository at this point in the history
  • Loading branch information
alexander-schranz committed Apr 10, 2019
1 parent c93ce59 commit 4d0016c
Show file tree
Hide file tree
Showing 15 changed files with 370 additions and 447 deletions.
10 changes: 6 additions & 4 deletions .travis.yml
Expand Up @@ -19,6 +19,7 @@ env:
- MIN_PHP=7.1.3
- SYMFONY_PROCESS_PHP_TEST_BINARY=~/.phpenv/shims/php
- MESSENGER_AMQP_DSN=amqp://localhost/%2f/messages
- MESSENGER_REDIS_DSN=redis://localhost/messages

matrix:
include:
Expand Down Expand Up @@ -53,8 +54,8 @@ before_install:
- |
# Start Redis cluster
docker pull grokzen/redis-cluster:4.0.8
docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 --name redis-cluster grokzen/redis-cluster:4.0.8
docker pull grokzen/redis-cluster:5.0.4
docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 --name redis-cluster grokzen/redis-cluster:5.0.4
export REDIS_CLUSTER_HOSTS='localhost:7000 localhost:7001 localhost:7002 localhost:7003 localhost:7004 localhost:7005'
- |
Expand Down Expand Up @@ -114,6 +115,7 @@ before_install:
local ext_name=$1
local ext_so=$2
local INI=$3
local input=${4:-yes}
local ext_dir=$(php -r "echo ini_get('extension_dir');")
local ext_cache=~/php-ext/$(basename $ext_dir)/$ext_name
Expand All @@ -122,7 +124,7 @@ before_install:
else
rm ~/.pearrc /tmp/pear 2>/dev/null || true
mkdir -p $ext_cache
echo yes | pecl install -f $ext_name &&
echo $input | pecl install -f $ext_name &&
cp $ext_dir/$ext_so $ext_cache
fi
}
Expand All @@ -145,7 +147,6 @@ before_install:
echo session.gc_probability = 0 >> $INI
echo opcache.enable_cli = 1 >> $INI
echo apc.enable_cli = 1 >> $INI
echo extension = redis.so >> $INI
echo extension = memcached.so >> $INI
done
Expand All @@ -164,6 +165,7 @@ before_install:
tfold ext.igbinary tpecl igbinary-2.0.8 igbinary.so $INI
tfold ext.zookeeper tpecl zookeeper-0.7.1 zookeeper.so $INI
tfold ext.amqp tpecl amqp-1.9.4 amqp.so $INI
tfold ext.redis tpecl redis-4.2.0 redis.so $INI "no"
done
- |
Expand Down
Expand Up @@ -12,43 +12,104 @@
namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;

use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Transport\RedisExt\Connection;

/**
* @requires extension redis
*/
class ConnectionTest extends TestCase
{
/**
* @expectedException \InvalidArgumentException
* @expectedExceptionMessage The given Redis DSN "redis://" is invalid.
*/
public function testItCannotBeConstructedWithAWrongDsn()
public function testFromInvalidDsn()
{
$this->expectException(\InvalidArgumentException::class);
$this->expectExceptionMessage('The given Redis DSN "redis://" is invalid.');

Connection::fromDsn('redis://');
}

public function testItGetsParametersFromTheDsn()
public function testFromDsn()
{
$this->assertEquals(
new Connection('queue', array(
new Connection(['stream' => 'queue'], [
'host' => 'localhost',
'port' => 6379,
)),
]),
Connection::fromDsn('redis://localhost/queue')
);
}

public function testOverrideOptionsViaQueryParameters()
public function testFromDsnWithOptions()
{
$this->assertEquals(
new Connection('queue', array(
'host' => '127.0.0.1',
new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1'], [
'host' => 'localhost',
'port' => 6379,
), array(
'processing_ttl' => '8000',
)),
Connection::fromDsn('redis://127.0.0.1:6379/queue?processing_ttl=8000')
], [
'blocking_timeout' => 30,
]),
Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['blocking_timeout' => 30])
);
}

public function testFromDsnWithQueryOptions()
{
$this->assertEquals(
new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1'], [
'host' => 'localhost',
'port' => 6379,
], [
'blocking_timeout' => 30,
]),
Connection::fromDsn('redis://localhost/queue/group1/consumer1?blocking_timeout=30')
);
}

public function testKeepGettingPendingMessages()
{
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();

$redis->expects($this->exactly(3))->method('xreadgroup')
->with('symfony', 'consumer', ['queue' => 0], 1, null)
->willReturn(['queue' => [['message' => json_encode(['body' => 'Test', 'headers' => []])]]]);

$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
$this->assertNotNull($connection->get());
$this->assertNotNull($connection->get());
$this->assertNotNull($connection->get());
}

public function testFirstGetPendingMessagesThenNewMessages()
{
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();

$count = 0;

$redis->expects($this->exactly(2))->method('xreadgroup')
->with('symfony', 'consumer', $this->callback(function ($arr_streams) use (&$count) {
++$count;

if (1 === $count) {
return '0' === $arr_streams['queue'];
}

return '>' === $arr_streams['queue'];
}), 1, null)
->willReturn(['queue' => []]);

$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
$connection->get();
}

public function testUnexpectedRedisError()
{
$this->expectException(LogicException::class);
$this->expectExceptionMessage('Redis error happens');
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
$redis->expects($this->once())->method('xreadgroup')->willReturn(false);
$redis->expects($this->once())->method('getLastError')->willReturn('Redis error happens');

$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
$connection->get();
}
}

This file was deleted.

Expand Up @@ -12,135 +12,54 @@
namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;

use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
use Symfony\Component\Messenger\Transport\RedisExt\RedisReceiver;
use Symfony\Component\Messenger\Transport\RedisExt\RedisSender;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Process\PhpProcess;
use Symfony\Component\Process\Process;
use Symfony\Component\Serializer as SerializerComponent;
use Symfony\Component\Serializer\Encoder\JsonEncoder;
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;

/**
* @requires extension redis
*/
class RedisExtIntegrationTest extends TestCase
{
private $redis;
private $connection;

protected function setUp()
{
parent::setUp();

if (!getenv('MESSENGER_REDIS_DSN')) {
$this->markTestSkipped('The "MESSENGER_REDIS_DSN" environment variable is required.');
}
}

public function testItSendsAndReceivesMessages()
{
$serializer = new Serializer(
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);

$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'));

$sender = new RedisSender($connection, $serializer);
$receiver = new RedisReceiver($connection, $serializer);

$sender->send($first = Envelope::wrap(new DummyMessage('First')));
$sender->send($second = Envelope::wrap(new DummyMessage('Second')));

$receivedMessages = 0;
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages, $first, $second) {
$this->assertEquals(0 == $receivedMessages ? $first : $second, $envelope);

if (2 === ++$receivedMessages) {
$receiver->stop();
}
});
$this->redis = new \Redis();
$this->connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), [], $this->redis);
$this->clearRedis();
$this->connection->setup();
}

public function testItReceivesSignals()
public function testConnectionSendAndGet()
{
$serializer = new Serializer(
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);

$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'));

$sender = new RedisSender($connection, $serializer);
$sender->send(Envelope::wrap(new DummyMessage('Hello')));

$amqpReadTimeout = 30;
$dsn = getenv('MESSENGER_REDIS_DSN').'?read_timeout='.$amqpReadTimeout;
$process = new PhpProcess(file_get_contents(__DIR__.'/Fixtures/long_receiver.php'), null, array(
'COMPONENT_ROOT' => __DIR__.'/../../../',
'DSN' => $dsn,
));

$process->start();

$this->waitForOutput($process, $expectedOutput = "Receiving messages...\n");

$signalTime = microtime(true);
$timedOutTime = time() + 10;

$process->signal(15);

while ($process->isRunning() && time() < $timedOutTime) {
usleep(100 * 1000); // 100ms
}

$this->assertFalse($process->isRunning());
$this->assertLessThan($amqpReadTimeout, microtime(true) - $signalTime);
$this->assertSame($expectedOutput.<<<'TXT'
Get envelope with message: Symfony\Component\Messenger\Tests\Fixtures\DummyMessage
with items: [
"Symfony\\Component\\Messenger\\Asynchronous\\Transport\\ReceivedMessage"
]
Done.
TXT
, $process->getOutput());
$this->connection->add('{"message": "Hi"}', ['type' => DummyMessage::class]);
$encoded = $this->connection->get();
$this->assertEquals('{"message": "Hi"}', $encoded['body']);
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
}

/**
* @runInSeparateProcess
*/
public function testItSupportsTimeoutAndTicksNullMessagesToTheHandler()
public function testGetTheFirstAvailableMessage()
{
$serializer = new Serializer(
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);

$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), array('blocking_timeout' => '1'));

$receiver = new RedisReceiver($connection, $serializer);

$receivedMessages = 0;
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages) {
$this->assertNull($envelope);

if (2 === ++$receivedMessages) {
$receiver->stop();
}
});
$this->connection->add('{"message": "Hi1"}', ['type' => DummyMessage::class]);
$this->connection->add('{"message": "Hi2"}', ['type' => DummyMessage::class]);
$encoded = $this->connection->get();
$this->assertEquals('{"message": "Hi1"}', $encoded['body']);
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
$encoded = $this->connection->get();
$this->assertEquals('{"message": "Hi2"}', $encoded['body']);
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
}

private function waitForOutput(Process $process, string $output, $timeoutInSeconds = 10)
private function clearRedis()
{
$timedOutTime = time() + $timeoutInSeconds;

while (time() < $timedOutTime) {
if (0 === strpos($process->getOutput(), $output)) {
return;
}

usleep(100 * 1000); // 100ms
}

throw new \RuntimeException('Expected output never arrived. Got "'.$process->getOutput().'" instead.');
$parsedUrl = parse_url(getenv('MESSENGER_REDIS_DSN'));
$pathParts = explode('/', $parsedUrl['path'] ?? '');
$stream = $pathParts[1] ?? 'symfony';
$this->redis->del($stream);
}
}

0 comments on commit 4d0016c

Please sign in to comment.