Skip to content

Commit

Permalink
[Messenger] Add mysql indexes back and work around deadlocks using so…
Browse files Browse the repository at this point in the history
…ft-delete
  • Loading branch information
nicolas-grekas committed Mar 30, 2022
1 parent 780efff commit 12271a4
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;

use Doctrine\DBAL\Abstraction\Result as AbstractionResult;
use Doctrine\DBAL\Configuration;
use Doctrine\DBAL\Connection as DBALConnection;
use Doctrine\DBAL\DBALException;
use Doctrine\DBAL\Driver\Result as DriverResult;
Expand All @@ -24,11 +23,8 @@
use Doctrine\DBAL\Query\QueryBuilder;
use Doctrine\DBAL\Result;
use Doctrine\DBAL\Schema\AbstractSchemaManager;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Schema\SchemaConfig;
use Doctrine\DBAL\Schema\TableDiff;
use Doctrine\DBAL\Statement;
use Doctrine\DBAL\Types\Types;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
use Symfony\Component\Messenger\Exception\TransportException;
Expand Down Expand Up @@ -410,58 +406,4 @@ public function providePlatformSql(): iterable
'SELECT m.* FROM messenger_messages m WITH (UPDLOCK, ROWLOCK) WHERE (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) AND (m.queue_name = ?) ORDER BY available_at ASC OFFSET 0 ROWS FETCH NEXT 1 ROWS ONLY ',
];
}

/**
* @dataProvider setupIndicesProvider
*/
public function testSetupIndices(string $platformClass, array $expectedIndices)
{
$driverConnection = $this->createMock(DBALConnection::class);
$driverConnection->method('getConfiguration')->willReturn(new Configuration());

$schemaManager = $this->createMock(AbstractSchemaManager::class);
$schema = new Schema();
$expectedTable = $schema->createTable('messenger_messages');
$expectedTable->addColumn('id', Types::BIGINT);
$expectedTable->setPrimaryKey(['id']);
// Make sure columns for indices exists so addIndex() will not throw
foreach (array_unique(array_merge(...$expectedIndices)) as $columnName) {
$expectedTable->addColumn($columnName, Types::STRING);
}
foreach ($expectedIndices as $indexColumns) {
$expectedTable->addIndex($indexColumns);
}
$schemaManager->method('createSchema')->willReturn($schema);
if (method_exists(DBALConnection::class, 'createSchemaManager')) {
$driverConnection->method('createSchemaManager')->willReturn($schemaManager);
} else {
$driverConnection->method('getSchemaManager')->willReturn($schemaManager);
}

$platformMock = $this->createMock($platformClass);
$platformMock
->expects(self::once())
->method('getAlterTableSQL')
->with(self::callback(static function (TableDiff $tableDiff): bool {
return 0 === \count($tableDiff->addedIndexes) && 0 === \count($tableDiff->changedIndexes) && 0 === \count($tableDiff->removedIndexes);
}))
->willReturn([]);
$driverConnection->method('getDatabasePlatform')->willReturn($platformMock);

$connection = new Connection([], $driverConnection);
$connection->setup();
}

public function setupIndicesProvider(): iterable
{
yield 'MySQL' => [
MySQL57Platform::class,
[['delivered_at']],
];

yield 'Other platforms' => [
AbstractPlatform::class,
[['queue_name'], ['available_at'], ['delivered_at']],
];
}
}
25 changes: 20 additions & 5 deletions src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

use Doctrine\DBAL\Connection as DBALConnection;
use Doctrine\DBAL\DBALException;
use Doctrine\DBAL\Driver\Exception as DriverException;
use Doctrine\DBAL\Driver\Result as DriverResult;
use Doctrine\DBAL\Exception;
use Doctrine\DBAL\Exception\TableNotFoundException;
Expand Down Expand Up @@ -157,6 +158,14 @@ public function send(string $body, array $headers, int $delay = 0): string

public function get(): ?array
{
if ($this->driverConnection->getDatabasePlatform() instanceof MySQLPlatform) {
try {
$this->driverConnection->delete($this->configuration['table_name'], ['delivered_at' => '9999-12-31']);
} catch (DriverException $e) {
// Ignore the exception
}
}

get:
$this->driverConnection->beginTransaction();
try {
Expand Down Expand Up @@ -224,6 +233,10 @@ public function get(): ?array
public function ack(string $id): bool
{
try {
if ($this->driverConnection->getDatabasePlatform() instanceof MySQLPlatform) {
return $this->driverConnection->update($this->configuration['table_name'], ['delivered_at' => '9999-12-31'], ['id' => $id]) > 0;
}

return $this->driverConnection->delete($this->configuration['table_name'], ['id' => $id]) > 0;
} catch (DBALException|Exception $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
Expand All @@ -233,6 +246,10 @@ public function ack(string $id): bool
public function reject(string $id): bool
{
try {
if ($this->driverConnection->getDatabasePlatform() instanceof MySQLPlatform) {
return $this->driverConnection->update($this->configuration['table_name'], ['delivered_at' => '9999-12-31'], ['id' => $id]) > 0;
}

return $this->driverConnection->delete($this->configuration['table_name'], ['id' => $id]) > 0;
} catch (DBALException|Exception $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
Expand Down Expand Up @@ -388,6 +405,7 @@ private function getSchema(): Schema
$table->addColumn('headers', self::$useDeprecatedConstants ? Type::TEXT : Types::TEXT)
->setNotnull(true);
$table->addColumn('queue_name', self::$useDeprecatedConstants ? Type::STRING : Types::STRING)
->setLength(190) // MySQL 5.6 only supports 191 characters on an indexed column in utf8mb4 mode
->setNotnull(true);
$table->addColumn('created_at', self::$useDeprecatedConstants ? Type::DATETIME : Types::DATETIME_MUTABLE)
->setNotnull(true);
Expand All @@ -396,11 +414,8 @@ private function getSchema(): Schema
$table->addColumn('delivered_at', self::$useDeprecatedConstants ? Type::DATETIME : Types::DATETIME_MUTABLE)
->setNotnull(false);
$table->setPrimaryKey(['id']);
// No indices on queue_name and available_at on MySQL to prevent deadlock issues when running multiple consumers.
if (!$this->driverConnection->getDatabasePlatform() instanceof MySQLPlatform) {
$table->addIndex(['queue_name']);
$table->addIndex(['available_at']);
}
$table->addIndex(['queue_name']);
$table->addIndex(['available_at']);
$table->addIndex(['delivered_at']);

return $schema;
Expand Down

0 comments on commit 12271a4

Please sign in to comment.