Skip to content

Commit

Permalink
bug #53819 [Doctrine Messenger] Fix support for pgsql + pgbouncer. (j…
Browse files Browse the repository at this point in the history
…wage)

This PR was squashed before being merged into the 6.4 branch.

Discussion
----------

[Doctrine Messenger] Fix support for pgsql + pgbouncer.

| Q             | A
| ------------- | ---
| Branch?       | 6.4
| Bug fix?      | yes
| New feature?  | no
| Deprecations? | no
| Issues        |
| License       | MIT

## Problem

When you use PgBouncer in front of a PostgreSQL server with transaction pooling mode enabled, the `INSERT` and the `lastInsertId()` happen in separate transactions which are separate connections/sessions when using PgBouncer. So the call to `lastInsertId()` fails with the following exception:

```
[Doctrine\DBAL\Exception\DriverException (7)]
An exception occurred in the driver: SQLSTATE[55000]: Object not in prerequisite state: 7 ERROR:  lastval is not yet defined in this session

Exception trace:
 at /app/vendor/doctrine/dbal/src/Driver/API/PostgreSQL/ExceptionConverter.php:87
Doctrine\DBAL\Driver\API\PostgreSQL\ExceptionConverter->convert() at /app/vendor/doctrine/dbal/src/Connection.php:1938
Doctrine\DBAL\Connection->handleDriverException() at /app/vendor/doctrine/dbal/src/Connection.php:1886
Doctrine\DBAL\Connection->convertException() at /app/vendor/doctrine/dbal/src/Connection.php:1253
Doctrine\DBAL\Connection->lastInsertId() at /app/vendor/symfony/doctrine-messenger/Transport/Connection.php:156
Symfony\Component\Messenger\Bridge\Doctrine\Transport\Connection->send() at /app/vendor/symfony/doctrine-messenger/Transport/DoctrineSender.php:46
Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineSender->send() at /app/vendor/symfony/doctrine-messenger/Transport/DoctrineTransport.php:72
Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransport->send() at /app/vendor/symfony/messenger/EventListener/SendFailedMessageForRetryListener.php:81
Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener->onMessageFailed() at /app/vendor/symfony/event-dispatcher/EventDispatcher.php:220
Symfony\Component\EventDispatcher\EventDispatcher->callListeners() at /app/vendor/symfony/event-dispatcher/EventDispatcher.php:56
Symfony\Component\EventDispatcher\EventDispatcher->dispatch() at /app/vendor/symfony/messenger/Worker.php:198
Symfony\Component\Messenger\Worker->ack() at /app/vendor/symfony/messenger/Worker.php:174
Symfony\Component\Messenger\Worker->handleMessage() at /app/vendor/symfony/messenger/Worker.php:109
```

## Solution

Wrap the `INSERT` and `lastInsertId()` with a single transaction, then when `lastInsertId()` is called, it will be within the same session that the message was inserted in.

In addition, this PR adds the ability to use PostgresSQL `RETURNING id` clause instead of calling `lastInsertId()` so we can get the id of the inserted message in one operation instead of two.

TODO:

- [x] Add test for table not found scenario when inserting a message.
- [x] Add tests for when lastInsertId returns false, int and string.
- [x] Is there a place where I can write an integration test for this behavior that already exists?
- [x] Investigate using pgsql RETURNING clause to simplify this. The insert can return the id after the message is inserted.
- [ ] Squash commits to one clean commit before merge.

Commits
-------

c5830b4 [Doctrine Messenger] Fix support for pgsql + pgbouncer.
  • Loading branch information
fabpot committed Feb 15, 2024
2 parents 7080b83 + c5830b4 commit 189bfeb
Show file tree
Hide file tree
Showing 7 changed files with 351 additions and 13 deletions.
9 changes: 9 additions & 0 deletions .github/workflows/integration-tests.yml
Expand Up @@ -140,6 +140,14 @@ jobs:
sudo service redis-server restart
echo "::endgroup::"
- name: Install pgbouncer
run: |
sudo apt-get install -y pgbouncer
sudo cp src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Fixtures/pgbouncer/pgbouncer.ini /etc/pgbouncer/pgbouncer.ini
sudo cp src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Fixtures/pgbouncer/userlist.txt /etc/pgbouncer/userlist.txt
sudo service pgbouncer restart
sudo su - postgres -c "PGPASSWORD=password psql -Atq -h localhost -p 5432 -U postgres -d postgres -c \"SELECT usename, passwd FROM pg_shadow\""
- name: Configure Couchbase
run: |
curl -s -u 'username=Administrator&password=111111' -X POST http://localhost:8091/node/controller/setupServices -d 'services=kv%2Cn1ql%2Cindex%2Cfts'
Expand Down Expand Up @@ -196,6 +204,7 @@ jobs:
MESSENGER_SQS_FIFO_QUEUE_DSN: "sqs://localhost:4566/messages.fifo?sslmode=disable&poll_timeout=0.01"
KAFKA_BROKER: 127.0.0.1:9092
POSTGRES_HOST: localhost
PGBOUNCER_HOST: localhost:6432

#- name: Run HTTP push tests
# if: matrix.php == '8.1'
Expand Down
@@ -0,0 +1,13 @@
[databases]
postgres = host=localhost port=5432 user=postgres dbname=postgres pool_mode=transaction

[pgbouncer]
logfile = /var/log/postgresql/pgbouncer.log
pidfile = /var/run/postgresql/pgbouncer.pid
listen_addr = localhost
listen_port = 6432
unix_socket_dir = /var/run/postgresql
auth_type = md5
auth_file = /etc/pgbouncer/userlist.txt
max_client_conn = 20
default_pool_size = 20
@@ -0,0 +1 @@
"postgres" "md532e12f215ba27cb750c9e093ce4b5127"
Expand Up @@ -119,6 +119,104 @@ public function testItThrowsATransportExceptionIfItCannotRejectMessage()
$connection->reject('dummy_id');
}

public function testSend()
{
$queryBuilder = $this->getQueryBuilderMock();
$driverConnection = $this->getDBALConnectionMock();

$driverConnection->expects($this->once())
->method('createQueryBuilder')
->willReturn($queryBuilder);

$queryBuilder->expects($this->once())
->method('insert')
->willReturn($queryBuilder);

$queryBuilder->expects($this->once())
->method('values')
->with([
'body' => '?',
'headers' => '?',
'queue_name' => '?',
'created_at' => '?',
'available_at' => '?',
])
->willReturn($queryBuilder);

$queryBuilder->expects($this->once())
->method('getSQL')
->willReturn('INSERT');

$driverConnection->expects($this->once())
->method('beginTransaction');

$driverConnection->expects($this->once())
->method('executeStatement')
->with('INSERT')
->willReturn(1);

$driverConnection->expects($this->once())
->method('lastInsertId')
->willReturn('1');

$driverConnection->expects($this->once())
->method('commit');

$connection = new Connection([], $driverConnection);
$id = $connection->send('test', []);

self::assertSame('1', $id);
}

public function testSendLastInsertIdReturnsInteger()
{
$queryBuilder = $this->getQueryBuilderMock();
$driverConnection = $this->getDBALConnectionMock();

$driverConnection->expects($this->once())
->method('createQueryBuilder')
->willReturn($queryBuilder);

$queryBuilder->expects($this->once())
->method('insert')
->willReturn($queryBuilder);

$queryBuilder->expects($this->once())
->method('values')
->with([
'body' => '?',
'headers' => '?',
'queue_name' => '?',
'created_at' => '?',
'available_at' => '?',
])
->willReturn($queryBuilder);

$queryBuilder->expects($this->once())
->method('getSQL')
->willReturn('INSERT');

$driverConnection->expects($this->once())
->method('beginTransaction');

$driverConnection->expects($this->once())
->method('executeStatement')
->with('INSERT')
->willReturn(1);

$driverConnection->expects($this->once())
->method('lastInsertId')
->willReturn(1);

$driverConnection->expects($this->once())
->method('commit');

$connection = new Connection([], $driverConnection);
$id = $connection->send('test', []);

self::assertSame('1', $id);
}

private function getDBALConnectionMock()
{
$driverConnection = $this->createMock(DBALConnection::class);
Expand Down
@@ -0,0 +1,89 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Bridge\Doctrine\Tests\Transport;

use Doctrine\DBAL\Configuration;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\DriverManager;
use Doctrine\DBAL\Schema\AbstractSchemaManager;
use Doctrine\DBAL\Schema\DefaultSchemaManagerFactory;
use Doctrine\DBAL\Tools\DsnParser;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Bridge\Doctrine\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\PostgreSqlConnection;

/**
* This tests using PostgreSqlConnection with PgBouncer between pgsql and the application.
*
* @requires extension pdo_pgsql
*
* @group integration
*/
class DoctrinePostgreSqlPgbouncerIntegrationTest extends TestCase
{
private Connection $driverConnection;
private PostgreSqlConnection $connection;

public function testSendAndGetWithAutoSetupEnabledAndNotSetupAlready()
{
$this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);

$encoded = $this->connection->get();
$this->assertSame('{"message": "Hi"}', $encoded['body']);
$this->assertSame(['type' => DummyMessage::class], $encoded['headers']);

$this->assertNull($this->connection->get());
}

public function testSendAndGetWithAutoSetupEnabledAndSetupAlready()
{
$this->connection->setup();

$this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);

$encoded = $this->connection->get();
$this->assertSame('{"message": "Hi"}', $encoded['body']);
$this->assertSame(['type' => DummyMessage::class], $encoded['headers']);

$this->assertNull($this->connection->get());
}

protected function setUp(): void
{
if (!$host = getenv('PGBOUNCER_HOST')) {
$this->markTestSkipped('Missing PGBOUNCER_HOST env variable');
}

$url = "pdo-pgsql://postgres:password@$host";
$params = class_exists(DsnParser::class) ? (new DsnParser())->parse($url) : ['url' => $url];
$config = new Configuration();
if (class_exists(DefaultSchemaManagerFactory::class)) {
$config->setSchemaManagerFactory(new DefaultSchemaManagerFactory());
}

$this->driverConnection = DriverManager::getConnection($params, $config);
$this->connection = new PostgreSqlConnection(['table_name' => 'queue_table'], $this->driverConnection);
}

protected function tearDown(): void
{
$this->createSchemaManager()->dropTable('queue_table');
$this->driverConnection->close();
}

private function createSchemaManager(): AbstractSchemaManager
{
return method_exists($this->driverConnection, 'createSchemaManager')
? $this->driverConnection->createSchemaManager()
: $this->driverConnection->getSchemaManager();
}
}
@@ -0,0 +1,89 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Bridge\Doctrine\Tests\Transport;

use Doctrine\DBAL\Configuration;
use Doctrine\DBAL\DriverManager;
use Doctrine\DBAL\Schema\AbstractSchemaManager;
use Doctrine\DBAL\Schema\DefaultSchemaManagerFactory;
use Doctrine\DBAL\Tools\DsnParser;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Bridge\Doctrine\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\Connection;

/**
* This tests a using Doctrine PostgreSql connection without using PostgreSqlConnection
* that gets used when use_notify is enabled.
*
* @requires extension pdo_pgsql
*
* @group integration
*/
class DoctrinePostgreSqlRegularIntegrationTest extends TestCase
{
private \Doctrine\DBAL\Connection $driverConnection;
private Connection $connection;

public function testSendAndGetWithAutoSetupEnabledAndNotSetupAlready()
{
$this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);

$encoded = $this->connection->get();
$this->assertSame('{"message": "Hi"}', $encoded['body']);
$this->assertSame(['type' => DummyMessage::class], $encoded['headers']);

$this->assertNull($this->connection->get());
}

public function testSendAndGetWithAutoSetupEnabledAndSetupAlready()
{
$this->connection->setup();

$this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);

$encoded = $this->connection->get();
$this->assertSame('{"message": "Hi"}', $encoded['body']);
$this->assertSame(['type' => DummyMessage::class], $encoded['headers']);

$this->assertNull($this->connection->get());
}

protected function setUp(): void
{
if (!$host = getenv('POSTGRES_HOST')) {
$this->markTestSkipped('Missing POSTGRES_HOST env variable');
}

$url = "pdo-pgsql://postgres:password@$host";
$params = class_exists(DsnParser::class) ? (new DsnParser())->parse($url) : ['url' => $url];
$config = new Configuration();
if (class_exists(DefaultSchemaManagerFactory::class)) {
$config->setSchemaManagerFactory(new DefaultSchemaManagerFactory());
}

$this->driverConnection = DriverManager::getConnection($params, $config);
$this->connection = new Connection(['table_name' => 'queue_table'], $this->driverConnection);
}

protected function tearDown(): void
{
$this->createSchemaManager()->dropTable('queue_table');
$this->driverConnection->close();
}

private function createSchemaManager(): AbstractSchemaManager
{
return method_exists($this->driverConnection, 'createSchemaManager')
? $this->driverConnection->createSchemaManager()
: $this->driverConnection->getSchemaManager();
}
}

0 comments on commit 189bfeb

Please sign in to comment.