Skip to content
Permalink
Browse files

Add handling for delayed message to redis transport

  • Loading branch information...
alexander-schranz authored and chalasr committed Nov 4, 2019
1 parent a0cefaa commit cfece10569c9658b9186ac9a93a60abcb4521111
@@ -20,7 +20,7 @@ env:
- MIN_PHP=7.1.3
- SYMFONY_PROCESS_PHP_TEST_BINARY=~/.phpenv/shims/php
- MESSENGER_AMQP_DSN=amqp://localhost/%2f/messages
- MESSENGER_REDIS_DSN=redis://127.0.0.1:7001/messages
- MESSENGER_REDIS_DSN=redis://127.0.0.1:7006/messages
- SYMFONY_PHPUNIT_DISABLE_RESULT_CACHE=1

matrix:
@@ -59,7 +59,7 @@ before_install:
- |
# Start Redis cluster
docker pull grokzen/redis-cluster:5.0.4
docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 --name redis-cluster grokzen/redis-cluster:5.0.4
docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 -p 7006:7006 -p 7007:7007 -e "STANDALONE=true" --name redis-cluster grokzen/redis-cluster:5.0.4
export REDIS_CLUSTER_HOSTS='localhost:7000 localhost:7001 localhost:7002 localhost:7003 localhost:7004 localhost:7005'
- |
@@ -23,6 +23,7 @@ CHANGELOG
* [BC BREAK] Removed `StopWhenRestartSignalIsReceived` in favor of `StopWorkerOnRestartSignalListener`.
* The component is not marked as `@experimental` anymore.
* Marked the `MessengerDataCollector` class as `@final`.
* Added support for `DelayStamp` to the `redis` transport.

4.3.0
-----
@@ -17,6 +17,7 @@
/**
* @requires extension redis
* @group time-sensitive
*/
class RedisExtIntegrationTest extends TestCase
{
@@ -31,7 +32,7 @@ protected function setUp(): void
$this->redis = new \Redis();
$this->connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), [], $this->redis);
$this->clearRedis();
$this->connection->cleanup();
$this->connection->setup();
}
@@ -55,11 +56,48 @@ public function testGetTheFirstAvailableMessage()
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
}
private function clearRedis()
public function testConnectionSendWithSameContent()
{
$parsedUrl = parse_url(getenv('MESSENGER_REDIS_DSN'));
$pathParts = explode('/', $parsedUrl['path'] ?? '');
$stream = $pathParts[1] ?? 'symfony';
$this->redis->del($stream);
$body = '{"message": "Hi"}';
$headers = ['type' => DummyMessage::class];
$this->connection->add($body, $headers);
$this->connection->add($body, $headers);
$encoded = $this->connection->get();
$this->assertEquals($body, $encoded['body']);
$this->assertEquals($headers, $encoded['headers']);
$encoded = $this->connection->get();
$this->assertEquals($body, $encoded['body']);
$this->assertEquals($headers, $encoded['headers']);
}
public function testConnectionSendAndGetDelayed()
{
$this->connection->add('{"message": "Hi"}', ['type' => DummyMessage::class], 500);
$encoded = $this->connection->get();
$this->assertNull($encoded);
sleep(2);
$encoded = $this->connection->get();
$this->assertEquals('{"message": "Hi"}', $encoded['body']);
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
}
public function testConnectionSendDelayedMessagesWithSameContent()
{
$body = '{"message": "Hi"}';
$headers = ['type' => DummyMessage::class];
$this->connection->add($body, $headers, 500);
$this->connection->add($body, $headers, 500);
sleep(2);
$encoded = $this->connection->get();
$this->assertEquals($body, $encoded['body']);
$this->assertEquals($headers, $encoded['headers']);
$encoded = $this->connection->get();
$this->assertEquals($body, $encoded['body']);
$this->assertEquals($headers, $encoded['headers']);
}
}
@@ -38,6 +38,7 @@ class Connection
private $connection;
private $stream;
private $queue;
private $group;
private $consumer;
private $autoSetup;
@@ -65,6 +66,7 @@ public function __construct(array $configuration, array $connectionCredentials =
$this->stream = $configuration['stream'] ?? self::DEFAULT_OPTIONS['stream'];
$this->group = $configuration['group'] ?? self::DEFAULT_OPTIONS['group'];
$this->consumer = $configuration['consumer'] ?? self::DEFAULT_OPTIONS['consumer'];
$this->queue = $this->stream.'__queue';
$this->autoSetup = $configuration['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup'];
$this->maxEntries = $configuration['stream_max_entries'] ?? self::DEFAULT_OPTIONS['stream_max_entries'];
}
@@ -125,6 +127,34 @@ public function get(): ?array
$this->setup();
}
try {
$queuedMessageCount = $this->connection->zcount($this->queue, 0, $this->getCurrentTimeInMilliseconds());
} catch (\RedisException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
if ($queuedMessageCount) {
for ($i = 0; $i < $queuedMessageCount; ++$i) {
try {
$queuedMessages = $this->connection->zpopmin($this->queue, 1);
} catch (\RedisException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
foreach ($queuedMessages as $queuedMessage => $time) {
$queuedMessage = json_decode($queuedMessage, true);
// if a futured placed message is actually popped because of a race condition with
// another running message consumer, the message is readded to the queue by add function
// else its just added stream and will be available for all stream consumers
$this->add(
$queuedMessage['body'],
$queuedMessage['headers'],
$time - $this->getCurrentTimeInMilliseconds()
);
}
}
}
$messageId = '>'; // will receive new messages
if ($this->couldHavePendingMessages) {
@@ -203,24 +233,40 @@ public function reject(string $id): void
}
}
public function add(string $body, array $headers): void
public function add(string $body, array $headers, int $delayInMs = 0): void
{
if ($this->autoSetup) {
$this->setup();
}
try {
if ($this->maxEntries) {
$added = $this->connection->xadd($this->stream, '*', ['message' => json_encode(
['body' => $body, 'headers' => $headers]
)], $this->maxEntries, true);
if ($delayInMs > 0) { // the delay could be smaller 0 in a queued message
$message = json_encode([
'body' => $body,
'headers' => $headers,
// Entry need to be unique in the sorted set else it would only be added once to the delayed messages queue
'uniqid' => uniqid('', true),
]);
$score = (int) ($this->getCurrentTimeInMilliseconds() + $delayInMs);
$added = $this->connection->zadd($this->queue, ['NX'], $score, $message);
} else {
$added = $this->connection->xadd($this->stream, '*', ['message' => json_encode(
['body' => $body, 'headers' => $headers]
)]);
$message = json_encode([
'body' => $body,
'headers' => $headers,
]);
if ($this->maxEntries) {
$added = $this->connection->xadd($this->stream, '*', ['message' => $message], $this->maxEntries, true);
} else {
$added = $this->connection->xadd($this->stream, '*', ['message' => $message]);
}
}
} catch (\RedisException $e) {
throw new TransportException($e->getMessage(), 0, $e);
if ($error = $this->connection->getLastError() ?: null) {
$this->connection->clearLastError();
}
throw new TransportException($error ?? $e->getMessage(), 0, $e);
}
if (!$added) {
@@ -246,4 +292,15 @@ public function setup(): void
$this->autoSetup = false;
}
private function getCurrentTimeInMilliseconds(): int
{
return (int) (microtime(true) * 1000);
}
public function cleanup(): void
{
$this->connection->del($this->stream);
$this->connection->del($this->queue);
}
}
@@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Transport\RedisExt;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -37,7 +38,11 @@ public function send(Envelope $envelope): Envelope
{
$encodedMessage = $this->serializer->encode($envelope);
$this->connection->add($encodedMessage['body'], $encodedMessage['headers'] ?? []);
/** @var DelayStamp|null $delayStamp */
$delayStamp = $envelope->last(DelayStamp::class);
$delayInMs = null !== $delayStamp ? $delayStamp->getDelay() : 0;
$this->connection->add($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delayInMs);
return $envelope;
}

0 comments on commit cfece10

Please sign in to comment.
You can’t perform that action at this time.