Skip to content

Commit

Permalink
[RedisMessengerBridge] Add a delete_after_ack option to automatically…
Browse files Browse the repository at this point in the history
… clean up processed messages from memory
  • Loading branch information
Seldaek committed Apr 16, 2020
1 parent 6dc7d8b commit 262252c
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/Symfony/Component/Messenger/Bridge/Redis/CHANGELOG.md
Expand Up @@ -9,3 +9,5 @@ CHANGELOG
* Deprecated use of invalid options
* Added ability to receive of old pending messages with new `redeliver_timeout`
and `claim_interval` options.
* Added a `delete_after_ack` option to the DSN as an alternative to
`stream_max_entries` to avoid leaking memory.
Expand Up @@ -307,6 +307,21 @@ public function testMaxEntries()
$connection->add('1', []);
}

public function testDeleteAfterAck()
{
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();

$redis->expects($this->exactly(1))->method('xack')
->with('xack', 'queue', 'symfony', '1')
->willReturn(1);
$redis->expects($this->exactly(1))->method('xdel')
->with('xdel', 'queue', '1')
->willReturn(1);

$connection = Connection::fromDsn('redis://localhost/queue?delete_after_ack=true', [], $redis); // 1 = always
$connection->ack('1');
}

public function testLastErrorGetsCleared()
{
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
Expand Down
Expand Up @@ -32,6 +32,7 @@ class Connection
'group' => 'symfony',
'consumer' => 'consumer',
'auto_setup' => true,
'delete_after_ack' => false,
'stream_max_entries' => 0, // any value higher than 0 defines an approximate maximum number of stream entries
'dbindex' => 0,
'tls' => false,
Expand All @@ -49,6 +50,7 @@ class Connection
private $redeliverTimeout;
private $nextClaim = 0;
private $claimInterval;
private $deleteAfterAck;
private $couldHavePendingMessages = true;

public function __construct(array $configuration, array $connectionCredentials = [], array $redisOptions = [], \Redis $redis = null)
Expand Down Expand Up @@ -81,6 +83,7 @@ public function __construct(array $configuration, array $connectionCredentials =
$this->queue = $this->stream.'__queue';
$this->autoSetup = $configuration['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup'];
$this->maxEntries = $configuration['stream_max_entries'] ?? self::DEFAULT_OPTIONS['stream_max_entries'];
$this->deleteAfterAck = $configuration['delete_after_ack'] ?? self::DEFAULT_OPTIONS['delete_after_ack'];
$this->redeliverTimeout = ($configuration['redeliver_timeout'] ?? self::DEFAULT_OPTIONS['redeliver_timeout']) * 1000;
$this->claimInterval = $configuration['claim_interval'] ?? self::DEFAULT_OPTIONS['claim_interval'];
}
Expand Down Expand Up @@ -114,6 +117,12 @@ public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $re
unset($redisOptions['stream_max_entries']);
}

$deleteAfterAck = null;
if (\array_key_exists('delete_after_ack', $redisOptions)) {
$deleteAfterAck = filter_var($redisOptions['delete_after_ack'], FILTER_VALIDATE_BOOLEAN);
unset($redisOptions['delete_after_ack']);
}

$dbIndex = null;
if (\array_key_exists('dbindex', $redisOptions)) {
$dbIndex = filter_var($redisOptions['dbindex'], FILTER_VALIDATE_INT);
Expand Down Expand Up @@ -144,6 +153,7 @@ public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $re
'consumer' => $redisOptions['consumer'] ?? null,
'auto_setup' => $autoSetup,
'stream_max_entries' => $maxEntries,
'delete_after_ack' => $deleteAfterAck,
'dbindex' => $dbIndex,
'redeliver_timeout' => $redeliverTimeout,
'claim_interval' => $claimInterval,
Expand Down Expand Up @@ -314,6 +324,9 @@ public function ack(string $id): void
{
try {
$acknowledged = $this->connection->xack($this->stream, $this->group, [$id]);
if ($this->deleteAfterAck) {
$acknowledged = $this->connection->xdel($this->stream, [$id]);
}
} catch (\RedisException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
Expand Down Expand Up @@ -408,6 +421,18 @@ public function setup(): void
$this->connection->clearLastError();
}

if ($this->deleteAfterAck) {
$groups = $this->connection->xinfo('GROUPS', $this->stream);
if (
// support for Redis extension version 5+
(\is_array($groups) && 1 < \count($groups))
// support for Redis extension version 4.x
|| (\is_string($groups) && substr_count($groups, '"name"'))
) {
throw new LogicException(sprintf('More than one group exists for stream "%s", delete_after_ack can not be enabled as it risks deleting messages before all groups could consume them.', $this->stream));
}
}

$this->autoSetup = false;
}

Expand Down

0 comments on commit 262252c

Please sign in to comment.