Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Redis] message lost prevention #757

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
47 changes: 47 additions & 0 deletions pkg/redis/PRedis.php
Expand Up @@ -91,6 +91,27 @@ public function lpush(string $key, string $value): int
}
}

public function lrem(string $key, int $count, string $value): int
{
try {
return $this->redis->lrem($key, $count, $value);
} catch (PRedisServerException $e) {
throw new ServerException('lrem command has failed', 0, $e);
}
}

public function renamenx(string $key, string $target): int
{
try {
return $this->redis->renamenx($key, $target);
} catch (PRedisServerException $e) {
if ($e->getMessage() == 'ERR no such key') {
return 0;
}
throw new ServerException('renamenx command has failed', 0, $e);
}
}

public function brpop(array $keys, int $timeout): ?RedisResult
{
try {
Expand All @@ -104,6 +125,32 @@ public function brpop(array $keys, int $timeout): ?RedisResult
}
}

public function brpoplpush(string $source, string $dest, int $timeout): ?RedisResult
{
try {
if ($message = $this->redis->brpoplpush($source, $dest, $timeout)) {
return new RedisResult($source, $message);
}

return null;
} catch (PRedisServerException $e) {
throw new ServerException('brpop command has failed', 0, $e);
}
}

public function rpoplpush(string $source, string $dest): ?RedisResult
{
try {
if ($message = $this->redis->rpoplpush($source, $dest)) {
return new RedisResult($source, $message);
}

return null;
} catch (PRedisServerException $e) {
throw new ServerException('rpop command has failed', 0, $e);
}
}

public function rpop(string $key): ?RedisResult
{
try {
Expand Down
42 changes: 42 additions & 0 deletions pkg/redis/Redis.php
Expand Up @@ -48,6 +48,38 @@ public function zrem(string $key, string $value): int;
*/
public function lpush(string $key, string $value): int;

/**
* @param string $key
* @param int $count
* @param string $value
*
* @throws ServerException
*
* @return int number of removed elements
*/
public function lrem(string $key, int $count, string $value): int;

/**
* @param string $key
* @param string $target
*
* @throws ServerException
*
* @return int rename key to non-exists target success
*/
public function renamenx(string $key, string $target): int;

/**
* @param string $source
* @param string $dest
* @param int $timeout in seconds
*
* @throws ServerException
*
* @return RedisResult|null
*/
public function brpoplpush(string $source, string $dest, int $timeout): ?RedisResult;

/**
* @param string[] $keys
* @param int $timeout in seconds
Expand All @@ -58,6 +90,16 @@ public function lpush(string $key, string $value): int;
*/
public function brpop(array $keys, int $timeout): ?RedisResult;

/**
* @param string $source
* @param string $dest
*
* @throws ServerException
*
* @return RedisResult|null
*/
public function rpoplpush(string $source, string $dest): ?RedisResult;

/**
* @param string $key
*
Expand Down
15 changes: 7 additions & 8 deletions pkg/redis/RedisConsumer.php
Expand Up @@ -73,7 +73,7 @@ public function receive(int $timeout = 0): ?Message
}
}

return $this->receiveMessage([$this->queue], $timeout, $this->redeliveryDelay);
return $this->receiveMessage($this->queue, $timeout, $this->redeliveryDelay);
}

/**
Expand All @@ -99,20 +99,19 @@ public function reject(Message $message, bool $requeue = false): void
{
InvalidMessageException::assertMessageInstanceOf($message, RedisMessage::class);

$this->acknowledge($message);

if ($requeue) {
$message = $this->getContext()->getSerializer()->toMessage($message->getReservedKey());
$message->setHeader('attempts', 0);
$newMessage = $this->getContext()->getSerializer()->toMessage($message->getReservedKey());
$newMessage->setHeader('attempts', 0);

if ($message->getTimeToLive()) {
$message->setHeader('expires_at', time() + $message->getTimeToLive());
$newMessage->setHeader('expires_at', time() + $message->getTimeToLive());
}

$payload = $this->getContext()->getSerializer()->toString($message);

$payload = $this->getContext()->getSerializer()->toString($newMessage);
$this->getRedis()->lpush($this->queue->getName(), $payload);
}

$this->acknowledge($message);
}

private function getContext(): RedisContext
Expand Down
66 changes: 28 additions & 38 deletions pkg/redis/RedisConsumerHelperTrait.php
Expand Up @@ -6,41 +6,31 @@

trait RedisConsumerHelperTrait
{
/**
* @var string[]
*/
protected $queueNames;

abstract protected function getContext(): RedisContext;

/**
* @param RedisDestination[] $queues
* @param RedisDestination $destination
* @param int $timeout
* @param int $redeliveryDelay
*
* @return RedisMessage|null
*/
protected function receiveMessage(array $queues, int $timeout, int $redeliveryDelay): ?RedisMessage
protected function receiveMessage(RedisDestination $destination, int $timeout, int $redeliveryDelay): ?RedisMessage
{
$startAt = time();
$thisTimeout = $timeout;

if (null === $this->queueNames) {
$this->queueNames = [];
foreach ($queues as $queue) {
$this->queueNames[] = $queue->getName();
}
}

while ($thisTimeout > 0) {
$this->migrateExpiredMessages($this->queueNames);
$queueName = $destination->getName();
$this->migrateExpiredMessages([$queueName]);

if (false == $result = $this->getContext()->getRedis()->brpop($this->queueNames, $thisTimeout)) {
if (false == $result = $this->getContext()->getRedis()->brpoplpush(
$queueName, $queueName.':processing', $thisTimeout
)) {
$this->migrateProcessingMessages([$queueName]);
return null;
}

$this->pushQueueNameBack($result->getKey());

if ($message = $this->processResult($result, $redeliveryDelay)) {
return $message;
}
Expand All @@ -53,10 +43,15 @@ protected function receiveMessage(array $queues, int $timeout, int $redeliveryDe

protected function receiveMessageNoWait(RedisDestination $destination, int $redeliveryDelay): ?RedisMessage
{
$this->migrateExpiredMessages([$destination->getName()]);
$queueName = $destination->getName();
$this->migrateExpiredMessages([$queueName]);

if ($result = $this->getContext()->getRedis()->rpop($destination->getName())) {
if ($result = $this->getContext()->getRedis()->rpoplpush(
$queueName, $queueName.':processing'
)) {
return $this->processResult($result, $redeliveryDelay);
} else {
$this->migrateProcessingMessages([$queueName]);
}

return null;
Expand All @@ -80,29 +75,15 @@ protected function processResult(RedisResult $result, int $redeliveryDelay): ?Re
$message->setReservedKey($this->getContext()->getSerializer()->toString($message));

$reservedQueue = $result->getKey().':reserved';
$processingQueue = $result->getKey().':processing';
$redeliveryAt = $now + $redeliveryDelay;

$this->getContext()->getRedis()->zadd($reservedQueue, $message->getReservedKey(), $redeliveryAt);

$redis = $this->getContext()->getRedis();
$redis->zadd($reservedQueue, $message->getReservedKey(), $redeliveryAt);
$redis->lrem($processingQueue, 0, $result->getMessage());
return $message;
}

protected function pushQueueNameBack(string $queueName): void
{
if (count($this->queueNames) <= 1) {
return;
}

if (false === $from = array_search($queueName, $this->queueNames, true)) {
throw new \LogicException(sprintf('Queue name was not found: "%s"', $queueName));
}

$to = count($this->queueNames) - 1;

$out = array_splice($this->queueNames, $from, 1);
array_splice($this->queueNames, $to, 0, $out);
}

protected function migrateExpiredMessages(array $queueNames): void
{
$now = time();
Expand All @@ -115,4 +96,13 @@ protected function migrateExpiredMessages(array $queueNames): void
->eval(LuaScripts::migrateExpired(), [$queueName.':reserved', $queueName], [$now]);
}
}

protected function migrateProcessingMessages(array $queueNames): void
{
foreach ($queueNames as $queueName) {
$this->getContext()->getRedis()
->renamenx($queueName.':processing', $queueName);
}

}
}
5 changes: 5 additions & 0 deletions pkg/redis/RedisContext.php
Expand Up @@ -8,6 +8,7 @@
use Interop\Queue\Context;
use Interop\Queue\Destination;
use Interop\Queue\Exception\InvalidDestinationException;
use Interop\Queue\Exception\SubscriptionConsumerNotSupportedException;
use Interop\Queue\Exception\TemporaryQueueNotSupportedException;
use Interop\Queue\Message;
use Interop\Queue\Producer;
Expand Down Expand Up @@ -135,10 +136,14 @@ public function createConsumer(Destination $destination): Consumer
*/
public function createSubscriptionConsumer(): SubscriptionConsumer
{
throw SubscriptionConsumerNotSupportedException::providerDoestNotSupportIt();
/*
* Todo temp disable subscription consumer
$consumer = new RedisSubscriptionConsumer($this);
$consumer->setRedeliveryDelay($this->redeliveryDelay);

return $consumer;
*/
}

/**
Expand Down