Skip to content

Commit

Permalink
[Messenger] Allow to configure the db index on Redis transport
Browse files Browse the repository at this point in the history
  • Loading branch information
chalasr authored and nicolas-grekas committed Oct 7, 2019
1 parent 8f92594 commit 115a9bb
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/Symfony/Component/Messenger/CHANGELOG.md
Expand Up @@ -10,6 +10,7 @@ CHANGELOG
* `InMemoryTransport` handle acknowledged and rejected messages.
* Made all dispatched worker event classes final.
* Added support for `from_transport` attribute on `messenger.message_handler` tag.
* Added support for passing `dbindex` as a query parameter to the redis transport DSN.

4.3.0
-----
Expand Down
Expand Up @@ -104,6 +104,15 @@ public function testAuth()
Connection::fromDsn('redis://password@localhost/queue', [], $redis);
}

public function testDbIndex()
{
$redis = new \Redis();

Connection::fromDsn('redis://password@localhost/queue?dbindex=2', [], $redis);

$this->assertSame(2, $redis->getDbNum());
}

public function testFirstGetPendingMessagesThenNewMessages()
{
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
Expand Down
12 changes: 12 additions & 0 deletions src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php
Expand Up @@ -32,6 +32,7 @@ class Connection
'consumer' => 'consumer',
'auto_setup' => true,
'stream_max_entries' => 0, // any value higher than 0 defines an approximate maximum number of stream entries
'dbindex' => 0,
];

private $connection;
Expand All @@ -56,6 +57,10 @@ public function __construct(array $configuration, array $connectionCredentials =
$this->connection->auth($connectionCredentials['auth']);
}

if (($dbIndex = $configuration['dbindex'] ?? self::DEFAULT_OPTIONS['dbindex']) && !$this->connection->select($dbIndex)) {
throw new InvalidArgumentException(sprintf('Redis connection failed: %s', $redis->getLastError()));
}

$this->stream = $configuration['stream'] ?? self::DEFAULT_OPTIONS['stream'];
$this->group = $configuration['group'] ?? self::DEFAULT_OPTIONS['group'];
$this->consumer = $configuration['consumer'] ?? self::DEFAULT_OPTIONS['consumer'];
Expand Down Expand Up @@ -97,12 +102,19 @@ public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $re
unset($redisOptions['stream_max_entries']);
}

$dbIndex = null;
if (\array_key_exists('dbindex', $redisOptions)) {
$dbIndex = filter_var($redisOptions['dbindex'], FILTER_VALIDATE_INT);
unset($redisOptions['dbindex']);
}

return new self([
'stream' => $stream,
'group' => $group,
'consumer' => $consumer,
'auto_setup' => $autoSetup,
'stream_max_entries' => $maxEntries,
'dbindex' => $dbIndex,
], $connectionCredentials, $redisOptions, $redis);
}

Expand Down

0 comments on commit 115a9bb

Please sign in to comment.