diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md index fc01fa52ccff..6df7ddcd1f84 100644 --- a/src/Symfony/Component/Messenger/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/CHANGELOG.md @@ -6,6 +6,7 @@ CHANGELOG * Deprecated passing a `ContainerInterface` instance as first argument of the `ConsumeMessagesCommand` constructor, pass a `RoutableMessageBus` instance instead. + * Added support for auto trimming of Redis streams. 4.3.0 ----- diff --git a/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php index 177f6b90384c..f329222bb35f 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php @@ -42,13 +42,13 @@ public function testFromDsn() public function testFromDsnWithOptions() { $this->assertEquals( - new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1', 'auto_setup' => false], [ + new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1', 'auto_setup' => false, 'stream_max_entries' => 20000], [ 'host' => 'localhost', 'port' => 6379, ], [ 'serializer' => 2, ]), - Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['serializer' => 2, 'auto_setup' => false]) + Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['serializer' => 2, 'auto_setup' => false, 'stream_max_entries' => 20000]) ); } @@ -142,4 +142,16 @@ public function testGetNonBlocking() $connection->reject($message['id']); $redis->del('messenger-getnonblocking'); } + + public function testMaxEntries() + { + $redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock(); + + $redis->expects($this->exactly(1))->method('xadd') + ->with('queue', '*', ['message' => '{"body":"1","headers":[]}'], 20000, true) + ->willReturn(1); + + $connection = Connection::fromDsn('redis://localhost/queue?stream_max_entries=20000', [], $redis); // 1 = always + $connection->add('1', []); + } } diff --git a/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php b/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php index 9cc96762c8ba..57037588cfbd 100644 --- a/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php +++ b/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php @@ -31,6 +31,7 @@ class Connection 'group' => 'symfony', 'consumer' => 'consumer', 'auto_setup' => true, + 'stream_max_entries' => 0, // any value higher than 0 defines an approximate maximum number of stream entries ]; private $connection; @@ -38,6 +39,7 @@ class Connection private $group; private $consumer; private $autoSetup; + private $maxEntries; private $couldHavePendingMessages = true; public function __construct(array $configuration, array $connectionCredentials = [], array $redisOptions = [], \Redis $redis = null) @@ -53,6 +55,7 @@ public function __construct(array $configuration, array $connectionCredentials = $this->group = $configuration['group'] ?? self::DEFAULT_OPTIONS['group']; $this->consumer = $configuration['consumer'] ?? self::DEFAULT_OPTIONS['consumer']; $this->autoSetup = $configuration['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup']; + $this->maxEntries = $configuration['stream_max_entries'] ?? self::DEFAULT_OPTIONS['stream_max_entries']; } public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $redis = null): self @@ -82,7 +85,19 @@ public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $re unset($redisOptions['auto_setup']); } - return new self(['stream' => $stream, 'group' => $group, 'consumer' => $consumer, 'auto_setup' => $autoSetup], $connectionCredentials, $redisOptions, $redis); + $maxEntries = null; + if (\array_key_exists('stream_max_entries', $redisOptions)) { + $maxEntries = filter_var($redisOptions['stream_max_entries'], FILTER_VALIDATE_INT); + unset($redisOptions['stream_max_entries']); + } + + return new self([ + 'stream' => $stream, + 'group' => $group, + 'consumer' => $consumer, + 'auto_setup' => $autoSetup, + 'stream_max_entries' => $maxEntries, + ], $connectionCredentials, $redisOptions, $redis); } public function get(): ?array @@ -169,9 +184,15 @@ public function add(string $body, array $headers): void $e = null; try { - $added = $this->connection->xadd($this->stream, '*', ['message' => json_encode( - ['body' => $body, 'headers' => $headers] - )]); + if ($this->maxEntries) { + $added = $this->connection->xadd($this->stream, '*', ['message' => json_encode( + ['body' => $body, 'headers' => $headers] + )], $this->maxEntries, true); + } else { + $added = $this->connection->xadd($this->stream, '*', ['message' => json_encode( + ['body' => $body, 'headers' => $headers] + )]); + } } catch (\RedisException $e) { }