diff --git a/pkg/redis/PRedis.php b/pkg/redis/PRedis.php index f76ad3efd..6dab6004f 100644 --- a/pkg/redis/PRedis.php +++ b/pkg/redis/PRedis.php @@ -91,6 +91,27 @@ public function lpush(string $key, string $value): int } } + public function lrem(string $key, int $count, string $value): int + { + try { + return $this->redis->lrem($key, $count, $value); + } catch (PRedisServerException $e) { + throw new ServerException('lrem command has failed', 0, $e); + } + } + + public function renamenx(string $key, string $target): int + { + try { + return $this->redis->renamenx($key, $target); + } catch (PRedisServerException $e) { + if ($e->getMessage() == 'ERR no such key') { + return 0; + } + throw new ServerException('renamenx command has failed', 0, $e); + } + } + public function brpop(array $keys, int $timeout): ?RedisResult { try { @@ -104,6 +125,32 @@ public function brpop(array $keys, int $timeout): ?RedisResult } } + public function brpoplpush(string $source, string $dest, int $timeout): ?RedisResult + { + try { + if ($message = $this->redis->brpoplpush($source, $dest, $timeout)) { + return new RedisResult($source, $message); + } + + return null; + } catch (PRedisServerException $e) { + throw new ServerException('brpop command has failed', 0, $e); + } + } + + public function rpoplpush(string $source, string $dest): ?RedisResult + { + try { + if ($message = $this->redis->rpoplpush($source, $dest)) { + return new RedisResult($source, $message); + } + + return null; + } catch (PRedisServerException $e) { + throw new ServerException('rpop command has failed', 0, $e); + } + } + public function rpop(string $key): ?RedisResult { try { diff --git a/pkg/redis/Redis.php b/pkg/redis/Redis.php index 362f5e8d0..7b4651512 100644 --- a/pkg/redis/Redis.php +++ b/pkg/redis/Redis.php @@ -48,6 +48,38 @@ public function zrem(string $key, string $value): int; */ public function lpush(string $key, string $value): int; + /** + * @param string $key + * @param int $count + * @param string $value + * + * @throws ServerException + * + * @return int number of removed elements + */ + public function lrem(string $key, int $count, string $value): int; + + /** + * @param string $key + * @param string $target + * + * @throws ServerException + * + * @return int rename key to non-exists target success + */ + public function renamenx(string $key, string $target): int; + + /** + * @param string $source + * @param string $dest + * @param int $timeout in seconds + * + * @throws ServerException + * + * @return RedisResult|null + */ + public function brpoplpush(string $source, string $dest, int $timeout): ?RedisResult; + /** * @param string[] $keys * @param int $timeout in seconds @@ -58,6 +90,16 @@ public function lpush(string $key, string $value): int; */ public function brpop(array $keys, int $timeout): ?RedisResult; + /** + * @param string $source + * @param string $dest + * + * @throws ServerException + * + * @return RedisResult|null + */ + public function rpoplpush(string $source, string $dest): ?RedisResult; + /** * @param string $key * diff --git a/pkg/redis/RedisConsumer.php b/pkg/redis/RedisConsumer.php index 6ff23f41e..68a147280 100644 --- a/pkg/redis/RedisConsumer.php +++ b/pkg/redis/RedisConsumer.php @@ -73,7 +73,7 @@ public function receive(int $timeout = 0): ?Message } } - return $this->receiveMessage([$this->queue], $timeout, $this->redeliveryDelay); + return $this->receiveMessage($this->queue, $timeout, $this->redeliveryDelay); } /** @@ -99,20 +99,19 @@ public function reject(Message $message, bool $requeue = false): void { InvalidMessageException::assertMessageInstanceOf($message, RedisMessage::class); - $this->acknowledge($message); - if ($requeue) { - $message = $this->getContext()->getSerializer()->toMessage($message->getReservedKey()); - $message->setHeader('attempts', 0); + $newMessage = $this->getContext()->getSerializer()->toMessage($message->getReservedKey()); + $newMessage->setHeader('attempts', 0); if ($message->getTimeToLive()) { - $message->setHeader('expires_at', time() + $message->getTimeToLive()); + $newMessage->setHeader('expires_at', time() + $message->getTimeToLive()); } - $payload = $this->getContext()->getSerializer()->toString($message); - + $payload = $this->getContext()->getSerializer()->toString($newMessage); $this->getRedis()->lpush($this->queue->getName(), $payload); } + + $this->acknowledge($message); } private function getContext(): RedisContext diff --git a/pkg/redis/RedisConsumerHelperTrait.php b/pkg/redis/RedisConsumerHelperTrait.php index 9939986ed..e071cdfd2 100644 --- a/pkg/redis/RedisConsumerHelperTrait.php +++ b/pkg/redis/RedisConsumerHelperTrait.php @@ -6,41 +6,31 @@ trait RedisConsumerHelperTrait { - /** - * @var string[] - */ - protected $queueNames; - abstract protected function getContext(): RedisContext; /** - * @param RedisDestination[] $queues + * @param RedisDestination $destination * @param int $timeout * @param int $redeliveryDelay * * @return RedisMessage|null */ - protected function receiveMessage(array $queues, int $timeout, int $redeliveryDelay): ?RedisMessage + protected function receiveMessage(RedisDestination $destination, int $timeout, int $redeliveryDelay): ?RedisMessage { $startAt = time(); $thisTimeout = $timeout; - if (null === $this->queueNames) { - $this->queueNames = []; - foreach ($queues as $queue) { - $this->queueNames[] = $queue->getName(); - } - } - while ($thisTimeout > 0) { - $this->migrateExpiredMessages($this->queueNames); + $queueName = $destination->getName(); + $this->migrateExpiredMessages([$queueName]); - if (false == $result = $this->getContext()->getRedis()->brpop($this->queueNames, $thisTimeout)) { + if (false == $result = $this->getContext()->getRedis()->brpoplpush( + $queueName, $queueName.':processing', $thisTimeout + )) { + $this->migrateProcessingMessages([$queueName]); return null; } - $this->pushQueueNameBack($result->getKey()); - if ($message = $this->processResult($result, $redeliveryDelay)) { return $message; } @@ -53,10 +43,15 @@ protected function receiveMessage(array $queues, int $timeout, int $redeliveryDe protected function receiveMessageNoWait(RedisDestination $destination, int $redeliveryDelay): ?RedisMessage { - $this->migrateExpiredMessages([$destination->getName()]); + $queueName = $destination->getName(); + $this->migrateExpiredMessages([$queueName]); - if ($result = $this->getContext()->getRedis()->rpop($destination->getName())) { + if ($result = $this->getContext()->getRedis()->rpoplpush( + $queueName, $queueName.':processing' + )) { return $this->processResult($result, $redeliveryDelay); + } else { + $this->migrateProcessingMessages([$queueName]); } return null; @@ -80,29 +75,15 @@ protected function processResult(RedisResult $result, int $redeliveryDelay): ?Re $message->setReservedKey($this->getContext()->getSerializer()->toString($message)); $reservedQueue = $result->getKey().':reserved'; + $processingQueue = $result->getKey().':processing'; $redeliveryAt = $now + $redeliveryDelay; - $this->getContext()->getRedis()->zadd($reservedQueue, $message->getReservedKey(), $redeliveryAt); - + $redis = $this->getContext()->getRedis(); + $redis->zadd($reservedQueue, $message->getReservedKey(), $redeliveryAt); + $redis->lrem($processingQueue, 0, $result->getMessage()); return $message; } - protected function pushQueueNameBack(string $queueName): void - { - if (count($this->queueNames) <= 1) { - return; - } - - if (false === $from = array_search($queueName, $this->queueNames, true)) { - throw new \LogicException(sprintf('Queue name was not found: "%s"', $queueName)); - } - - $to = count($this->queueNames) - 1; - - $out = array_splice($this->queueNames, $from, 1); - array_splice($this->queueNames, $to, 0, $out); - } - protected function migrateExpiredMessages(array $queueNames): void { $now = time(); @@ -115,4 +96,13 @@ protected function migrateExpiredMessages(array $queueNames): void ->eval(LuaScripts::migrateExpired(), [$queueName.':reserved', $queueName], [$now]); } } + + protected function migrateProcessingMessages(array $queueNames): void + { + foreach ($queueNames as $queueName) { + $this->getContext()->getRedis() + ->renamenx($queueName.':processing', $queueName); + } + + } } diff --git a/pkg/redis/RedisContext.php b/pkg/redis/RedisContext.php index 344bb20c5..b05c0104d 100644 --- a/pkg/redis/RedisContext.php +++ b/pkg/redis/RedisContext.php @@ -8,6 +8,7 @@ use Interop\Queue\Context; use Interop\Queue\Destination; use Interop\Queue\Exception\InvalidDestinationException; +use Interop\Queue\Exception\SubscriptionConsumerNotSupportedException; use Interop\Queue\Exception\TemporaryQueueNotSupportedException; use Interop\Queue\Message; use Interop\Queue\Producer; @@ -135,10 +136,14 @@ public function createConsumer(Destination $destination): Consumer */ public function createSubscriptionConsumer(): SubscriptionConsumer { + throw SubscriptionConsumerNotSupportedException::providerDoestNotSupportIt(); + /* + * Todo temp disable subscription consumer $consumer = new RedisSubscriptionConsumer($this); $consumer->setRedeliveryDelay($this->redeliveryDelay); return $consumer; + */ } /**