Skip to content

Commit

Permalink
[Messenger] Add support for PostgreSQL LISTEN/NOTIFY
Browse files Browse the repository at this point in the history
  • Loading branch information
dunglas authored and fabpot committed Feb 4, 2020
1 parent a1e4222 commit 01f33c3
Show file tree
Hide file tree
Showing 7 changed files with 206 additions and 19 deletions.
Expand Up @@ -5,3 +5,4 @@ CHANGELOG
-----

* Introduced the Doctrine bridge.
* Added support for PostgreSQL `LISTEN`/`NOTIFY`.
Expand Up @@ -18,6 +18,7 @@
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\Connection;
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransport;
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransportFactory;
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\PostgreSqlConnection;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

class DoctrineTransportFactoryTest extends TestCase
Expand Down Expand Up @@ -49,7 +50,7 @@ public function testCreateTransport()
$serializer = $this->createMock(SerializerInterface::class);

$this->assertEquals(
new DoctrineTransport(new Connection(Connection::buildConfiguration('doctrine://default'), $driverConnection), $serializer),
new DoctrineTransport(new Connection(PostgreSqlConnection::buildConfiguration('doctrine://default'), $driverConnection), $serializer),
$factory->createTransport('doctrine://default', [], $serializer)
);
}
Expand Down
@@ -0,0 +1,46 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Bridge\Doctrine\Tests\Transport;

use Doctrine\DBAL\Schema\Synchronizer\SchemaSynchronizer;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\PostgreSqlConnection;

/**
* @author Kévin Dunglas <dunglas@gmail.com>
*/
class PostgreSqlConnectionTest extends TestCase
{
public function testSerialize()
{
$this->expectException(\BadMethodCallException::class);
$this->expectExceptionMessage('Cannot serialize '.PostgreSqlConnection::class);

$schemaSynchronizer = $this->createMock(SchemaSynchronizer::class);
$driverConnection = $this->createMock(\Doctrine\DBAL\Connection::class);

$connection = new PostgreSqlConnection([], $driverConnection, $schemaSynchronizer);
serialize($connection);
}

public function testUnserialize()
{
$this->expectException(\BadMethodCallException::class);
$this->expectExceptionMessage('Cannot unserialize '.PostgreSqlConnection::class);

$schemaSynchronizer = $this->createMock(SchemaSynchronizer::class);
$driverConnection = $this->createMock(\Doctrine\DBAL\Connection::class);

$connection = new PostgreSqlConnection([], $driverConnection, $schemaSynchronizer);
$connection->__wakeup();
}
}
Expand Up @@ -22,15 +22,17 @@
use Doctrine\DBAL\Types\Type;
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Contracts\Service\ResetInterface;

/**
* @author Vincent Touzet <vincent.touzet@gmail.com>
* @internal since Symfony 5.1
*
* @final
* @author Vincent Touzet <vincent.touzet@gmail.com>
* @author Kévin Dunglas <dunglas@gmail.com>
*/
class Connection
class Connection implements ResetInterface
{
private const DEFAULT_OPTIONS = [
protected const DEFAULT_OPTIONS = [
'table_name' => 'messenger_messages',
'queue_name' => 'default',
'redeliver_timeout' => 3600,
Expand All @@ -45,22 +47,28 @@ class Connection
* * table_name: name of the table
* * connection: name of the Doctrine's entity manager
* * queue_name: name of the queue
* * redeliver_timeout: Timeout before redeliver messages still in handling state (i.e: delivered_at is not null and message is still in table). Default 3600
* * auto_setup: Whether the table should be created automatically during send / get. Default : true
* * redeliver_timeout: Timeout before redeliver messages still in handling state (i.e: delivered_at is not null and message is still in table). Default: 3600
* * auto_setup: Whether the table should be created automatically during send / get. Default: true
*/
private $configuration = [];
private $driverConnection;
protected $configuration = [];
protected $driverConnection;
protected $queueEmptiedAt;
private $schemaSynchronizer;
private $autoSetup;

public function __construct(array $configuration, DBALConnection $driverConnection, SchemaSynchronizer $schemaSynchronizer = null)
{
$this->configuration = array_replace_recursive(self::DEFAULT_OPTIONS, $configuration);
$this->configuration = array_replace_recursive(static::DEFAULT_OPTIONS, $configuration);
$this->driverConnection = $driverConnection;
$this->schemaSynchronizer = $schemaSynchronizer ?? new SingleDatabaseSynchronizer($this->driverConnection);
$this->autoSetup = $this->configuration['auto_setup'];
}

public function reset()
{
$this->queueEmptiedAt = null;
}

public function getConfiguration(): array
{
return $this->configuration;
Expand All @@ -78,20 +86,20 @@ public static function buildConfiguration(string $dsn, array $options = []): arr
}

$configuration = ['connection' => $components['host']];
$configuration += $options + $query + self::DEFAULT_OPTIONS;
$configuration += $options + $query + static::DEFAULT_OPTIONS;

$configuration['auto_setup'] = filter_var($configuration['auto_setup'], FILTER_VALIDATE_BOOLEAN);

// check for extra keys in options
$optionsExtraKeys = array_diff(array_keys($options), array_keys(self::DEFAULT_OPTIONS));
$optionsExtraKeys = array_diff(array_keys($options), array_keys(static::DEFAULT_OPTIONS));
if (0 < \count($optionsExtraKeys)) {
throw new InvalidArgumentException(sprintf('Unknown option found : [%s]. Allowed options are [%s]', implode(', ', $optionsExtraKeys), implode(', ', array_keys(self::DEFAULT_OPTIONS))));
throw new InvalidArgumentException(sprintf('Unknown option found : [%s]. Allowed options are [%s]', implode(', ', $optionsExtraKeys), implode(', ', array_keys(static::DEFAULT_OPTIONS))));
}

// check for extra keys in options
$queryExtraKeys = array_diff(array_keys($query), array_keys(self::DEFAULT_OPTIONS));
$queryExtraKeys = array_diff(array_keys($query), array_keys(static::DEFAULT_OPTIONS));
if (0 < \count($queryExtraKeys)) {
throw new InvalidArgumentException(sprintf('Unknown option found in DSN: [%s]. Allowed options are [%s]', implode(', ', $queryExtraKeys), implode(', ', array_keys(self::DEFAULT_OPTIONS))));
throw new InvalidArgumentException(sprintf('Unknown option found in DSN: [%s]. Allowed options are [%s]', implode(', ', $queryExtraKeys), implode(', ', array_keys(static::DEFAULT_OPTIONS))));
}

return $configuration;
Expand Down Expand Up @@ -154,9 +162,13 @@ public function get(): ?array

if (false === $doctrineEnvelope) {
$this->driverConnection->commit();
$this->queueEmptiedAt = microtime(true) * 1000;

return null;
}
// Postgres can "group" notifications having the same channel and payload
// We need to be sure to empty the queue before blocking again
$this->queueEmptiedAt = null;

$doctrineEnvelope = $this->decodeEnvelopeHeaders($doctrineEnvelope);

Expand Down
Expand Up @@ -36,16 +36,22 @@ public function __construct($registry)

public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
{
unset($options['transport_name']);
$configuration = Connection::buildConfiguration($dsn, $options);
$useNotify = ($options['use_notify'] ?? true);
unset($options['transport_name'], $options['use_notify']);
// Always allow PostgreSQL-specific keys, to be able to transparently fallback to the native driver when LISTEN/NOTIFY isn't available
$configuration = PostgreSqlConnection::buildConfiguration($dsn, $options);

try {
$driverConnection = $this->registry->getConnection($configuration['connection']);
} catch (\InvalidArgumentException $e) {
throw new TransportException(sprintf('Could not find Doctrine connection from Messenger DSN "%s".', $dsn), 0, $e);
}

$connection = new Connection($configuration, $driverConnection);
if ($useNotify && method_exists($driverConnection->getWrappedConnection(), 'pgsqlGetNotify')) {
$connection = new PostgreSqlConnection($configuration, $driverConnection);
} else {
$connection = new Connection($configuration, $driverConnection);
}

return new DoctrineTransport($connection, $serializer);
}
Expand Down
@@ -0,0 +1,120 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace Symfony\Component\Messenger\Bridge\Doctrine\Transport;

/**
* Uses PostgreSQL LISTEN/NOTIFY to push messages to workers.
*
* @internal
* @final
*
* @author Kévin Dunglas <dunglas@gmail.com>
*/
class PostgreSqlConnection extends Connection
{
/**
* * use_notify: Set to false to disable the use of LISTEN/NOTIFY. Default: true
* * check_delayed_interval: The interval to check for delayed messages, in milliseconds. Set to 0 to disable checks. Default: 1000
* * get_notify_timeout: The length of time to wait for a response when calling PDO::pgsqlGetNotify, in milliseconds. Default: 0.
*/
protected const DEFAULT_OPTIONS = parent::DEFAULT_OPTIONS + [
'check_delayed_interval' => 1000,
'get_notify_timeout' => 0,
];

private $listening = false;

public function __sleep()
{
throw new \BadMethodCallException('Cannot serialize '.__CLASS__);
}

public function __wakeup()
{
throw new \BadMethodCallException('Cannot unserialize '.__CLASS__);
}

public function __destruct()
{
$this->unlisten();
}

public function reset()
{
parent::reset();
$this->unlisten();
}

public function get(): ?array
{
if (null === $this->queueEmptiedAt) {
return parent::get();
}

if (!$this->listening) {
// This is secure because the table name must be a valid identifier:
// https://www.postgresql.org/docs/current/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS
$this->driverConnection->exec(sprintf('LISTEN "%s"', $this->configuration['table_name']));
$this->listening = true;
}

$notification = $this->driverConnection->getWrappedConnection()->pgsqlGetNotify(\PDO::FETCH_ASSOC, $this->configuration['get_notify_timeout']);
if (
// no notifications, or for another table or queue
(false === $notification || $notification['message'] !== $this->configuration['table_name'] || $notification['payload'] !== $this->configuration['queue_name']) &&
// delayed messages
(microtime(true) * 1000 - $this->queueEmptiedAt < $this->configuration['check_delayed_interval'])
) {
return null;
}

return parent::get();
}

public function setup(): void
{
parent::setup();

$sql = sprintf(<<<'SQL'
LOCK TABLE %1$s;
-- create trigger function
CREATE OR REPLACE FUNCTION notify_%1$s() RETURNS TRIGGER AS $$
BEGIN
PERFORM pg_notify('%1$s', NEW.queue_name::text);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- register trigger
DROP TRIGGER IF EXISTS notify_trigger ON %1$s;
CREATE TRIGGER notify_trigger
AFTER INSERT
ON %1$s
FOR EACH ROW EXECUTE PROCEDURE notify_%1$s();
SQL
, $this->configuration['table_name']);
$this->driverConnection->exec($sql);
}

private function unlisten()
{
if (!$this->listening) {
return;
}

$this->driverConnection->exec(sprintf('UNLISTEN "%s"', $this->configuration['table_name']));
$this->listening = false;
}
}
Expand Up @@ -19,7 +19,8 @@
"php": "^7.2.5",
"doctrine/dbal": "^2.6",
"doctrine/persistence": "^1.3",
"symfony/messenger": "^5.1"
"symfony/messenger": "^5.1",
"symfony/service-contracts": "^1.1|^2"
},
"require-dev": {
"symfony/serializer": "^4.4|^5.0",
Expand Down

0 comments on commit 01f33c3

Please sign in to comment.