Skip to content

Commit

Permalink
feature #31825 [Messenger] Added support for auto trimming of redis s…
Browse files Browse the repository at this point in the history
…treams (Toflar)

This PR was squashed before being merged into the 4.4 branch (closes #31825).

Discussion
----------

[Messenger] Added support for auto trimming of redis streams

| Q             | A
| ------------- | ---
| Branch?       | 4.4
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets |
| License       | MIT
| Doc PR        | will submit if concept is okay

Right now the Redis stream will just grow indefinitely. However, there are means to have it delete old entries from time to time.
Note: I could not use the `XADD mystream MAXLEN ~ 1000 *` notation because the PHP redis extension does not support the `MAXLEN` option afaics so I went for the extra `XTRIM` command.
I explicitly enabled the approximate flag because it makes absolutely no sense to hardcode the limit for us although we could even have this configurable too (but I don't think we should).
The whole idea of this PR is to enable occasional trimming of the stream so it doesn't grow forever, so when you configure something like `20000` it may well happen that trimming only happens at `25000` depending on your settings.

Ping @soyuka @alexander-schranz @chalasr :)

Commits
-------

7fe06bc [Messenger] Added support for auto trimming of redis streams
  • Loading branch information
sroze committed Jul 2, 2019
2 parents d97f9ab + 7fe06bc commit 7bd0a27
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 6 deletions.
1 change: 1 addition & 0 deletions src/Symfony/Component/Messenger/CHANGELOG.md
Expand Up @@ -6,6 +6,7 @@ CHANGELOG

* Deprecated passing a `ContainerInterface` instance as first argument of the `ConsumeMessagesCommand` constructor,
pass a `RoutableMessageBus` instance instead.
* Added support for auto trimming of Redis streams.

4.3.0
-----
Expand Down
Expand Up @@ -42,13 +42,13 @@ public function testFromDsn()
public function testFromDsnWithOptions()
{
$this->assertEquals(
new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1', 'auto_setup' => false], [
new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1', 'auto_setup' => false, 'stream_max_entries' => 20000], [
'host' => 'localhost',
'port' => 6379,
], [
'serializer' => 2,
]),
Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['serializer' => 2, 'auto_setup' => false])
Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['serializer' => 2, 'auto_setup' => false, 'stream_max_entries' => 20000])
);
}

Expand Down Expand Up @@ -142,4 +142,16 @@ public function testGetNonBlocking()
$connection->reject($message['id']);
$redis->del('messenger-getnonblocking');
}

public function testMaxEntries()
{
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();

$redis->expects($this->exactly(1))->method('xadd')
->with('queue', '*', ['message' => '{"body":"1","headers":[]}'], 20000, true)
->willReturn(1);

$connection = Connection::fromDsn('redis://localhost/queue?stream_max_entries=20000', [], $redis); // 1 = always
$connection->add('1', []);
}
}
29 changes: 25 additions & 4 deletions src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php
Expand Up @@ -31,13 +31,15 @@ class Connection
'group' => 'symfony',
'consumer' => 'consumer',
'auto_setup' => true,
'stream_max_entries' => 0, // any value higher than 0 defines an approximate maximum number of stream entries
];

private $connection;
private $stream;
private $group;
private $consumer;
private $autoSetup;
private $maxEntries;
private $couldHavePendingMessages = true;

public function __construct(array $configuration, array $connectionCredentials = [], array $redisOptions = [], \Redis $redis = null)
Expand All @@ -53,6 +55,7 @@ public function __construct(array $configuration, array $connectionCredentials =
$this->group = $configuration['group'] ?? self::DEFAULT_OPTIONS['group'];
$this->consumer = $configuration['consumer'] ?? self::DEFAULT_OPTIONS['consumer'];
$this->autoSetup = $configuration['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup'];
$this->maxEntries = $configuration['stream_max_entries'] ?? self::DEFAULT_OPTIONS['stream_max_entries'];
}

public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $redis = null): self
Expand Down Expand Up @@ -82,7 +85,19 @@ public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $re
unset($redisOptions['auto_setup']);
}

return new self(['stream' => $stream, 'group' => $group, 'consumer' => $consumer, 'auto_setup' => $autoSetup], $connectionCredentials, $redisOptions, $redis);
$maxEntries = null;
if (\array_key_exists('stream_max_entries', $redisOptions)) {
$maxEntries = filter_var($redisOptions['stream_max_entries'], FILTER_VALIDATE_INT);
unset($redisOptions['stream_max_entries']);
}

return new self([
'stream' => $stream,
'group' => $group,
'consumer' => $consumer,
'auto_setup' => $autoSetup,
'stream_max_entries' => $maxEntries,
], $connectionCredentials, $redisOptions, $redis);
}

public function get(): ?array
Expand Down Expand Up @@ -169,9 +184,15 @@ public function add(string $body, array $headers): void

$e = null;
try {
$added = $this->connection->xadd($this->stream, '*', ['message' => json_encode(
['body' => $body, 'headers' => $headers]
)]);
if ($this->maxEntries) {
$added = $this->connection->xadd($this->stream, '*', ['message' => json_encode(
['body' => $body, 'headers' => $headers]
)], $this->maxEntries, true);
} else {
$added = $this->connection->xadd($this->stream, '*', ['message' => json_encode(
['body' => $body, 'headers' => $headers]
)]);
}
} catch (\RedisException $e) {
}

Expand Down

0 comments on commit 7bd0a27

Please sign in to comment.