Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
"queue-interop/queue-interop": "^0.8.1",
"bunny/bunny": "^0.4|^0.5",
"php-amqplib/php-amqplib": "^2.12.1",
"doctrine/dbal": "^2.12",
"doctrine/dbal": "^2.12|^3.1",
"ramsey/uuid": "^3.5|^4",
"psr/log": "^1.1",
"psr/container": "^1",
Expand Down
23 changes: 11 additions & 12 deletions pkg/dbal/DbalConsumerHelperTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Exception\RetryableException;
use Doctrine\DBAL\Types\Type;
use Ramsey\Uuid\Uuid;

trait DbalConsumerHelperTrait
Expand Down Expand Up @@ -39,7 +38,7 @@ protected function fetchMessage(array $queues, int $redeliveryDelay): ?DbalMessa
->addOrderBy('priority', 'asc')
->addOrderBy('published_at', 'asc')
->setParameter('queues', $queues, Connection::PARAM_STR_ARRAY)
->setParameter('delayedUntil', $now, Type::INTEGER)
->setParameter('delayedUntil', $now, DbalType::INTEGER)
->setMaxResults(1);

$update = $this->getConnection()->createQueryBuilder()
Expand All @@ -48,8 +47,8 @@ protected function fetchMessage(array $queues, int $redeliveryDelay): ?DbalMessa
->set('redeliver_after', ':redeliverAfter')
->andWhere('id = :messageId')
->andWhere('delivery_id IS NULL')
->setParameter('deliveryId', $deliveryId, Type::GUID)
->setParameter('redeliverAfter', $now + $redeliveryDelay, Type::BIGINT)
->setParameter('deliveryId', $deliveryId, DbalType::GUID)
->setParameter('redeliverAfter', $now + $redeliveryDelay, DbalType::BIGINT)
;

while (microtime(true) < $endAt) {
Expand All @@ -60,14 +59,14 @@ protected function fetchMessage(array $queues, int $redeliveryDelay): ?DbalMessa
}

$update
->setParameter('messageId', $result['id'], Type::GUID);
->setParameter('messageId', $result['id'], DbalType::GUID);

if ($update->execute()) {
$deliveredMessage = $this->getConnection()->createQueryBuilder()
->select('*')
->from($this->getContext()->getTableName())
->andWhere('delivery_id = :deliveryId')
->setParameter('deliveryId', $deliveryId, Type::GUID)
->setParameter('deliveryId', $deliveryId, DbalType::GUID)
->setMaxResults(1)
->execute()
->fetch();
Expand Down Expand Up @@ -103,9 +102,9 @@ protected function redeliverMessages(): void
->set('redelivered', ':redelivered')
->andWhere('redeliver_after < :now')
->andWhere('delivery_id IS NOT NULL')
->setParameter(':now', time(), Type::BIGINT)
->setParameter('deliveryId', null, Type::GUID)
->setParameter('redelivered', true, Type::BOOLEAN)
->setParameter(':now', time(), DbalType::BIGINT)
->setParameter('deliveryId', null, DbalType::GUID)
->setParameter('redelivered', true, DbalType::BOOLEAN)
;

try {
Expand All @@ -131,8 +130,8 @@ protected function removeExpiredMessages(): void
->andWhere('delivery_id IS NULL')
->andWhere('redelivered = :redelivered')

->setParameter(':now', time(), Type::BIGINT)
->setParameter('redelivered', false, Type::BOOLEAN)
->setParameter(':now', time(), DbalType::BIGINT)
->setParameter('redelivered', false, DbalType::BOOLEAN)
;

try {
Expand All @@ -153,7 +152,7 @@ private function deleteMessage(string $deliveryId): void
$this->getConnection()->delete(
$this->getContext()->getTableName(),
['delivery_id' => $deliveryId],
['delivery_id' => Type::GUID]
['delivery_id' => DbalType::GUID]
);
}
}
27 changes: 13 additions & 14 deletions pkg/dbal/DbalContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Schema\Table;
use Doctrine\DBAL\Types\Type;
use Interop\Queue\Consumer;
use Interop\Queue\Context;
use Interop\Queue\Destination;
Expand Down Expand Up @@ -183,7 +182,7 @@ public function purgeQueue(Queue $queue): void
$this->getDbalConnection()->delete(
$this->getTableName(),
['queue' => $queue->getQueueName()],
['queue' => Type::STRING]
['queue' => DbalType::STRING]
);
}

Expand Down Expand Up @@ -221,18 +220,18 @@ public function createDataBaseTable(): void

$table = new Table($this->getTableName());

$table->addColumn('id', Type::GUID, ['length' => 16, 'fixed' => true]);
$table->addColumn('published_at', Type::BIGINT);
$table->addColumn('body', Type::TEXT, ['notnull' => false]);
$table->addColumn('headers', Type::TEXT, ['notnull' => false]);
$table->addColumn('properties', Type::TEXT, ['notnull' => false]);
$table->addColumn('redelivered', Type::BOOLEAN, ['notnull' => false]);
$table->addColumn('queue', Type::STRING);
$table->addColumn('priority', Type::SMALLINT, ['notnull' => false]);
$table->addColumn('delayed_until', Type::BIGINT, ['notnull' => false]);
$table->addColumn('time_to_live', Type::BIGINT, ['notnull' => false]);
$table->addColumn('delivery_id', Type::GUID, ['length' => 16, 'fixed' => true, 'notnull' => false]);
$table->addColumn('redeliver_after', Type::BIGINT, ['notnull' => false]);
$table->addColumn('id', DbalType::GUID, ['length' => 16, 'fixed' => true]);
$table->addColumn('published_at', DbalType::BIGINT);
$table->addColumn('body', DbalType::TEXT, ['notnull' => false]);
$table->addColumn('headers', DbalType::TEXT, ['notnull' => false]);
$table->addColumn('properties', DbalType::TEXT, ['notnull' => false]);
$table->addColumn('redelivered', DbalType::BOOLEAN, ['notnull' => false]);
$table->addColumn('queue', DbalType::STRING);
$table->addColumn('priority', DbalType::SMALLINT, ['notnull' => false]);
$table->addColumn('delayed_until', DbalType::BIGINT, ['notnull' => false]);
$table->addColumn('time_to_live', DbalType::BIGINT, ['notnull' => false]);
$table->addColumn('delivery_id', DbalType::GUID, ['length' => 16, 'fixed' => true, 'notnull' => false]);
$table->addColumn('redeliver_after', DbalType::BIGINT, ['notnull' => false]);

$table->setPrimaryKey(['id']);
$table->addIndex(['priority', 'published_at', 'queue', 'delivery_id', 'delayed_until', 'id']);
Expand Down
25 changes: 12 additions & 13 deletions pkg/dbal/DbalProducer.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

namespace Enqueue\Dbal;

use Doctrine\DBAL\Types\Type;
use Interop\Queue\Destination;
use Interop\Queue\Exception\Exception;
use Interop\Queue\Exception\InvalidDestinationException;
Expand Down Expand Up @@ -107,18 +106,18 @@ public function send(Destination $destination, Message $message): void

try {
$rowsAffected = $this->context->getDbalConnection()->insert($this->context->getTableName(), $dbalMessage, [
'id' => Type::GUID,
'published_at' => Type::INTEGER,
'body' => Type::TEXT,
'headers' => Type::TEXT,
'properties' => Type::TEXT,
'priority' => Type::SMALLINT,
'queue' => Type::STRING,
'time_to_live' => Type::INTEGER,
'delayed_until' => Type::INTEGER,
'redelivered' => Type::SMALLINT,
'delivery_id' => Type::STRING,
'redeliver_after' => Type::BIGINT,
'id' => DbalType::GUID,
'published_at' => DbalType::INTEGER,
'body' => DbalType::TEXT,
'headers' => DbalType::TEXT,
'properties' => DbalType::TEXT,
'priority' => DbalType::SMALLINT,
'queue' => DbalType::STRING,
'time_to_live' => DbalType::INTEGER,
'delayed_until' => DbalType::INTEGER,
'redelivered' => DbalType::SMALLINT,
'delivery_id' => DbalType::STRING,
'redeliver_after' => DbalType::BIGINT,
]);

if (1 !== $rowsAffected) {
Expand Down
34 changes: 34 additions & 0 deletions pkg/dbal/DbalType.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?php

declare(strict_types=1);

namespace Enqueue\Dbal;

class DbalType
{
public const ARRAY = 'array';
public const ASCII_STRING = 'ascii_string';
public const BIGINT = 'bigint';
public const BINARY = 'binary';
public const BLOB = 'blob';
public const BOOLEAN = 'boolean';
public const DATE_MUTABLE = 'date';
public const DATE_IMMUTABLE = 'date_immutable';
public const DATEINTERVAL = 'dateinterval';
public const DATETIME_MUTABLE = 'datetime';
public const DATETIME_IMMUTABLE = 'datetime_immutable';
public const DATETIMETZ_MUTABLE = 'datetimetz';
public const DATETIMETZ_IMMUTABLE = 'datetimetz_immutable';
public const DECIMAL = 'decimal';
public const FLOAT = 'float';
public const GUID = 'guid';
public const INTEGER = 'integer';
public const JSON = 'json';
public const OBJECT = 'object';
public const SIMPLE_ARRAY = 'simple_array';
public const SMALLINT = 'smallint';
public const STRING = 'string';
public const TEXT = 'text';
public const TIME_MUTABLE = 'time';
public const TIME_IMMUTABLE = 'time_immutable';
}
6 changes: 3 additions & 3 deletions pkg/dbal/Tests/DbalConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
namespace Enqueue\Dbal\Tests;

use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Types\Type;
use Enqueue\Dbal\DbalConsumer;
use Enqueue\Dbal\DbalContext;
use Enqueue\Dbal\DbalDestination;
use Enqueue\Dbal\DbalMessage;
use Enqueue\Dbal\DbalProducer;
use Enqueue\Dbal\DbalType;
use Enqueue\Test\ClassExtensionTrait;
use Interop\Queue\Consumer;
use Interop\Queue\Exception\InvalidMessageException;
Expand Down Expand Up @@ -72,7 +72,7 @@ public function testShouldDeleteMessageOnAcknowledge()
->with(
'some-table-name',
['delivery_id' => $deliveryId->toString()],
['delivery_id' => Type::GUID]
['delivery_id' => DbalType::GUID]
)
;

Expand Down Expand Up @@ -143,7 +143,7 @@ public function testShouldDeleteMessageFromQueueOnReject()
->with(
'some-table-name',
['delivery_id' => $deliveryId->toString()],
['delivery_id' => Type::GUID]
['delivery_id' => DbalType::GUID]
)
;

Expand Down
2 changes: 1 addition & 1 deletion pkg/dbal/composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"require": {
"php": "^7.3|^8.0",
"queue-interop/queue-interop": "^0.8",
"doctrine/dbal": "^2.12",
"doctrine/dbal": "^2.12|^3.1",
"doctrine/persistence": "^1.3.3|^2.0",
"ramsey/uuid": "^3.5|^4"
},
Expand Down