diff --git a/pkg/dbal/DbalConsumerHelperTrait.php b/pkg/dbal/DbalConsumerHelperTrait.php index 0d57541d6..37d667f65 100644 --- a/pkg/dbal/DbalConsumerHelperTrait.php +++ b/pkg/dbal/DbalConsumerHelperTrait.php @@ -29,6 +29,24 @@ protected function fetchMessage(array $queues, int $redeliveryDelay): ?DbalMessa $endAt = microtime(true) + 0.2; // add 200ms + $selectQueryParameters = [ + 'queues' => $queues, + 'delayedUntil' => $now, + ]; + $selectQueryParameterTypes = [ + 'queues' => Connection::PARAM_STR_ARRAY, + 'delayedUntil' => DbalType::INTEGER, + ]; + + $i = 0; + $orderByQueues = []; + foreach ($queues as $queue) { + $parameterName = 'queue'.$i; + $orderByQueues[] = sprintf('WHEN queue = :%s THEN %d', $parameterName, $i++); + $selectQueryParameters[$parameterName] = $queue; + $selectQueryParameterTypes[$parameterName] = DbalType::STRING; + } + $select = $this->getConnection()->createQueryBuilder() ->select('id') ->from($this->getContext()->getTableName()) @@ -36,9 +54,9 @@ protected function fetchMessage(array $queues, int $redeliveryDelay): ?DbalMessa ->andWhere('delayed_until IS NULL OR delayed_until <= :delayedUntil') ->andWhere('delivery_id IS NULL') ->addOrderBy('priority', 'asc') + ->addOrderBy('CASE '.implode(' ', $orderByQueues).' ELSE -1 END') ->addOrderBy('published_at', 'asc') - ->setParameter('queues', $queues, Connection::PARAM_STR_ARRAY) - ->setParameter('delayedUntil', $now, DbalType::INTEGER) + ->setParameters($selectQueryParameters, $selectQueryParameterTypes) ->setMaxResults(1); $update = $this->getConnection()->createQueryBuilder() diff --git a/pkg/dbal/DbalSubscriptionConsumer.php b/pkg/dbal/DbalSubscriptionConsumer.php index 2551043e3..719a754da 100644 --- a/pkg/dbal/DbalSubscriptionConsumer.php +++ b/pkg/dbal/DbalSubscriptionConsumer.php @@ -87,8 +87,9 @@ public function consume(int $timeout = 0): void $queueNames = []; foreach (array_keys($this->subscribers) as $queueName) { - $queueNames[$queueName] = $queueName; + $queueNames[] = $queueName; } + $queueNames = array_unique($queueNames); $timeout /= 1000; $now = time(); @@ -114,7 +115,10 @@ public function consume(int $timeout = 0): void return; } - unset($currentQueueNames[$message->getQueue()]); + $queueNames = array_filter($queueNames, static function ($queueName) use ($message) { + return $message->getQueue() !== $queueName; + }); + $queueNames[] = $message->getQueue(); } else { $currentQueueNames = [];