Skip to content

Commit a25f83e

Browse files
committed
redis new implementation
1 parent 4ca285f commit a25f83e

14 files changed

+286
-236
lines changed

pkg/redis/JsonSerializer.php

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Enqueue\Redis;
6+
7+
class JsonSerializer implements Serializer
8+
{
9+
public function toString(RedisMessage $message): string
10+
{
11+
$json = json_encode([
12+
'body' => $message->getBody(),
13+
'properties' => $message->getProperties(),
14+
'headers' => $message->getHeaders(),
15+
]);
16+
17+
if (JSON_ERROR_NONE !== json_last_error()) {
18+
throw new \InvalidArgumentException(sprintf(
19+
'The malformed json given. Error %s and message %s',
20+
json_last_error(),
21+
json_last_error_msg()
22+
));
23+
}
24+
25+
return $json;
26+
}
27+
28+
public function toMessage(string $string): RedisMessage
29+
{
30+
$data = json_decode($string, true);
31+
if (JSON_ERROR_NONE !== json_last_error()) {
32+
throw new \InvalidArgumentException(sprintf(
33+
'The malformed json given. Error %s and message %s',
34+
json_last_error(),
35+
json_last_error_msg()
36+
));
37+
}
38+
39+
return new RedisMessage($data['body'], $data['properties'], $data['headers']);
40+
}
41+
}

pkg/redis/RedisConnectionFactory.php

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ class RedisConnectionFactory implements ConnectionFactory
3838
* 'read_write_timeout' => Timeout (expressed in seconds) used when performing read or write operations on the underlying network resource after which an exception is thrown.
3939
* 'predis_options' => An array of predis specific options.
4040
* 'ssl' => could be any of http://fi2.php.net/manual/en/context.ssl.php#refsect1-context.ssl-options
41+
* 'redelivery_delay' => Default 300 sec. Returns back message into the queue if message was not acknowledged or rejected after this delay.
42+
* It could happen if consumer has failed with fatal error or even if message processing is slow and takes more than this time.
4143
* ].
4244
*
4345
* or
@@ -85,10 +87,10 @@ public function createContext(): Context
8587
if ($this->config['lazy']) {
8688
return new RedisContext(function () {
8789
return $this->createRedis();
88-
});
90+
}, $this->config['redelivery_delay']);
8991
}
9092

91-
return new RedisContext($this->createRedis());
93+
return new RedisContext($this->createRedis(), $this->config['redelivery_delay']);
9294
}
9395

9496
private function createRedis(): Redis
@@ -158,6 +160,7 @@ private function defaultConfig(): array
158160
'read_write_timeout' => null,
159161
'predis_options' => null,
160162
'ssl' => null,
163+
'redelivery_delay' => 300,
161164
];
162165
}
163166
}

pkg/redis/RedisConsumer.php

Lines changed: 18 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111

1212
class RedisConsumer implements Consumer
1313
{
14+
use RedisConsumerHelperTrait;
15+
1416
/**
1517
* @var RedisDestination
1618
*/
@@ -24,12 +26,7 @@ class RedisConsumer implements Consumer
2426
/**
2527
* @var int
2628
*/
27-
private $retryDelay;
28-
29-
/**
30-
* @var RedisQueueConsumer
31-
*/
32-
private $queueConsumer;
29+
private $redeliveryDelay = 300;
3330

3431
public function __construct(RedisContext $context, RedisDestination $queue)
3532
{
@@ -40,21 +37,17 @@ public function __construct(RedisContext $context, RedisDestination $queue)
4037
/**
4138
* @return int
4239
*/
43-
public function getRetryDelay(): ?int
40+
public function getRedeliveryDelay(): ?int
4441
{
45-
return $this->retryDelay;
42+
return $this->redeliveryDelay;
4643
}
4744

4845
/**
49-
* @param int $retryDelay
46+
* @param int $delay
5047
*/
51-
public function setRetryDelay(int $retryDelay): void
48+
public function setRedeliveryDelay(int $delay): void
5249
{
53-
$this->retryDelay = $retryDelay;
54-
55-
if ($this->queueConsumer) {
56-
$this->queueConsumer->setRetryDelay($this->retryDelay);
57-
}
50+
$this->redeliveryDelay = $delay;
5851
}
5952

6053
/**
@@ -80,19 +73,15 @@ public function receive(int $timeout = 0): ?Message
8073
}
8174
}
8275

83-
$this->initQueueConsumer();
84-
85-
return $this->queueConsumer->receiveMessage($timeout);
76+
return $this->receiveMessage([$this->queue], $timeout, $this->redeliveryDelay);
8677
}
8778

8879
/**
8980
* @return RedisMessage
9081
*/
9182
public function receiveNoWait(): ?Message
9283
{
93-
$this->initQueueConsumer();
94-
95-
return $this->queueConsumer->receiveMessageNoWait($this->queue);
84+
return $this->receiveMessageNoWait($this->queue, $this->redeliveryDelay);
9685
}
9786

9887
/**
@@ -113,30 +102,26 @@ public function reject(Message $message, bool $requeue = false): void
113102
$this->acknowledge($message);
114103

115104
if ($requeue) {
116-
$message = RedisMessage::jsonUnserialize($message->getReservedKey());
105+
$message = $this->getContext()->getSerializer()->toMessage($message->getReservedKey());
117106
$message->setHeader('attempts', 0);
118107

119108
if ($message->getTimeToLive()) {
120109
$message->setHeader('expires_at', time() + $message->getTimeToLive());
121110
}
122111

123-
$this->getRedis()->lpush($this->queue->getName(), json_encode($message));
112+
$payload = $this->getContext()->getSerializer()->toString($message);
113+
114+
$this->getRedis()->lpush($this->queue->getName(), $payload);
124115
}
125116
}
126117

127-
private function getRedis(): Redis
118+
private function getContext(): RedisContext
128119
{
129-
return $this->context->getRedis();
120+
return $this->context;
130121
}
131122

132-
private function initQueueConsumer(): void
123+
private function getRedis(): Redis
133124
{
134-
if (null === $this->queueConsumer) {
135-
$this->queueConsumer = new RedisQueueConsumer($this->getRedis(), [$this->queue]);
136-
137-
if ($this->retryDelay) {
138-
$this->queueConsumer->setRetryDelay($this->retryDelay);
139-
}
140-
}
125+
return $this->context->getRedis();
141126
}
142127
}
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Enqueue\Redis;
6+
7+
trait RedisConsumerHelperTrait
8+
{
9+
/**
10+
* @var string[]
11+
*/
12+
protected $queueNames;
13+
14+
abstract function getContext(): RedisContext;
15+
16+
/**
17+
* @param RedisDestination[] $queues
18+
* @param int $timeout
19+
* @param int $redeliveryDelay
20+
*
21+
* @return RedisMessage|null
22+
*/
23+
protected function receiveMessage(array $queues, int $timeout, int $redeliveryDelay): ?RedisMessage
24+
{
25+
$startAt = time();
26+
$thisTimeout = $timeout;
27+
28+
if (null === $this->queueNames) {
29+
$this->queueNames = [];
30+
foreach ($queues as $queue) {
31+
$this->queueNames[] = $queue->getName();
32+
}
33+
}
34+
35+
while ($thisTimeout > 0) {
36+
$this->migrateExpiredMessages($this->queueNames);
37+
38+
if ($result = $this->getContext()->getRedis()->brpop($this->queueNames, $thisTimeout)) {
39+
$this->pushQueueNameBack($result->getKey());
40+
41+
if ($message = $this->processResult($result, $redeliveryDelay)) {
42+
return $message;
43+
}
44+
}
45+
46+
$thisTimeout -= time() - $startAt;
47+
}
48+
49+
return null;
50+
}
51+
52+
protected function receiveMessageNoWait(RedisDestination $destination, int $redeliveryDelay): ?RedisMessage
53+
{
54+
$this->migrateExpiredMessages([$destination->getName()]);
55+
56+
if ($result = $this->getContext()->getRedis()->rpop($destination->getName())) {
57+
return $this->processResult($result, $redeliveryDelay);
58+
}
59+
60+
return null;
61+
}
62+
63+
protected function processResult(RedisResult $result, int $redeliveryDelay): ?RedisMessage
64+
{
65+
$message = $this->getContext()->getSerializer()->toMessage($result->getMessage());
66+
67+
$now = time();
68+
69+
if ($expiresAt = $message->getHeader('expires_at')) {
70+
if ($now > $expiresAt) {
71+
return null;
72+
}
73+
}
74+
75+
$message->setHeader('attempts', $message->getAttempts() + 1);
76+
$message->setRedelivered($message->getAttempts() > 1);
77+
$message->setKey($result->getKey());
78+
$message->setReservedKey($this->getContext()->getSerializer()->toString($message));
79+
80+
$reservedQueue = $result->getKey().':reserved';
81+
$redeliveryAt = $now + $redeliveryDelay;
82+
83+
$this->getContext()->getRedis()->zadd($reservedQueue, $message->getReservedKey(), $redeliveryAt);
84+
85+
return $message;
86+
}
87+
88+
protected function pushQueueNameBack(string $queueName): void
89+
{
90+
if (count($this->queueNames) <= 1) {
91+
return;
92+
}
93+
94+
if (false === $from = array_search($queueName, $this->queueNames, true)) {
95+
throw new \LogicException(sprintf('Queue name was not found: "%s"', $queueName));
96+
}
97+
98+
$to = count($this->queueNames) - 1;
99+
100+
$out = array_splice($this->queueNames, $from, 1);
101+
array_splice($this->queueNames, $to, 0, $out);
102+
}
103+
104+
protected function migrateExpiredMessages(array $queueNames): void
105+
{
106+
$now = time();
107+
108+
foreach ($queueNames as $queueName) {
109+
$this->getContext()->getRedis()
110+
->eval(LuaScripts::migrateExpired(), [$queueName.':delayed', $queueName], [$now]);
111+
112+
$this->getContext()->getRedis()
113+
->eval(LuaScripts::migrateExpired(), [$queueName.':reserved', $queueName], [$now]);
114+
}
115+
}
116+
}

pkg/redis/RedisContext.php

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
class RedisContext implements Context
1919
{
20+
use SerializerAwareTrait;
21+
2022
/**
2123
* @var Redis
2224
*/
@@ -27,12 +29,18 @@ class RedisContext implements Context
2729
*/
2830
private $redisFactory;
2931

32+
/**
33+
* @var int
34+
*/
35+
private $redeliveryDelay = 300;
36+
3037
/**
3138
* Callable must return instance of Redis once called.
3239
*
3340
* @param Redis|callable $redis
41+
* @param int $redeliveryDelay
3442
*/
35-
public function __construct($redis)
43+
public function __construct($redis, int $redeliveryDelay)
3644
{
3745
if ($redis instanceof Redis) {
3846
$this->redis = $redis;
@@ -45,6 +53,9 @@ public function __construct($redis)
4553
Redis::class
4654
));
4755
}
56+
57+
$this->redeliveryDelay = $redeliveryDelay;
58+
$this->setSerializer(new JsonSerializer());
4859
}
4960

5061
/**
@@ -101,7 +112,7 @@ public function createTemporaryQueue(): Queue
101112
*/
102113
public function createProducer(): Producer
103114
{
104-
return new RedisProducer($this->getRedis());
115+
return new RedisProducer($this);
105116
}
106117

107118
/**
@@ -113,15 +124,21 @@ public function createConsumer(Destination $destination): Consumer
113124
{
114125
InvalidDestinationException::assertDestinationInstanceOf($destination, RedisDestination::class);
115126

116-
return new RedisConsumer($this, $destination);
127+
$consumer = new RedisConsumer($this, $destination);
128+
$consumer->setRedeliveryDelay($this->redeliveryDelay);
129+
130+
return $consumer;
117131
}
118132

119133
/**
120134
* @return RedisSubscriptionConsumer
121135
*/
122136
public function createSubscriptionConsumer(): SubscriptionConsumer
123137
{
124-
return new RedisSubscriptionConsumer($this);
138+
$consumer = new RedisSubscriptionConsumer($this);
139+
$consumer->setRedeliveryDelay($this->redeliveryDelay);
140+
141+
return $consumer;
125142
}
126143

127144
/**

0 commit comments

Comments
 (0)