Skip to content

Commit 4ca285f

Browse files
committed
redis new implementation
1 parent c34dc71 commit 4ca285f

File tree

10 files changed

+493
-48
lines changed

10 files changed

+493
-48
lines changed

pkg/redis/LuaScripts.php

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Enqueue\Redis;
6+
7+
class LuaScripts
8+
{
9+
/**
10+
* Get the Lua script to migrate expired messages back onto the queue.
11+
*
12+
* KEYS[1] - The queue we are removing messages from, for example: queues:foo:reserved
13+
* KEYS[2] - The queue we are moving messages to, for example: queues:foo
14+
* ARGV[1] - The current UNIX timestamp
15+
*
16+
* @return string
17+
*/
18+
public static function migrateExpired()
19+
{
20+
return <<<'LUA'
21+
-- Get all of the messages with an expired "score"...
22+
local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1])
23+
24+
-- If we have values in the array, we will remove them from the first queue
25+
-- and add them onto the destination queue in chunks of 100, which moves
26+
-- all of the appropriate messages onto the destination queue very safely.
27+
if(next(val) ~= nil) then
28+
redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)
29+
30+
for i = 1, #val, 100 do
31+
redis.call('lpush', KEYS[2], unpack(val, i, math.min(i+99, #val)))
32+
end
33+
end
34+
35+
return val
36+
LUA;
37+
}
38+
}

pkg/redis/PRedis.php

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,34 @@ public function __construct(array $config)
5353
}
5454
}
5555

56+
public function eval(string $script, array $keys = [], array $args = [])
57+
{
58+
try {
59+
// mixed eval($script, $numkeys, $keyOrArg1 = null, $keyOrArgN = null)
60+
return call_user_func_array([$this->redis, 'eval'], array_merge([$script, count($keys)], $keys, $args));
61+
} catch (PRedisServerException $e) {
62+
throw new ServerException('eval command has failed', null, $e);
63+
}
64+
}
65+
66+
public function zadd(string $key, string $value, float $score): int
67+
{
68+
try {
69+
return $this->redis->zadd($key, [$value => $score]);
70+
} catch (PRedisServerException $e) {
71+
throw new ServerException('zadd command has failed', null, $e);
72+
}
73+
}
74+
75+
public function zrem(string $key, string $value): int
76+
{
77+
try {
78+
return $this->redis->zrem($key, [$value]);
79+
} catch (PRedisServerException $e) {
80+
throw new ServerException('zrem command has failed', null, $e);
81+
}
82+
}
83+
5684
public function lpush(string $key, string $value): int
5785
{
5886
try {

pkg/redis/PhpRedis.php

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,33 @@ public function __construct(array $config)
2626
$this->config = $config;
2727
}
2828

29+
public function eval(string $script, array $keys = [], array $args = [])
30+
{
31+
try {
32+
return $this->redis->eval($script, array_merge($keys, $args), count($keys));
33+
} catch (\RedisException $e) {
34+
throw new ServerException('eval command has failed', null, $e);
35+
}
36+
}
37+
38+
public function zadd(string $key, string $value, float $score): int
39+
{
40+
try {
41+
return $this->redis->zAdd($key, $score, $value);
42+
} catch (\RedisException $e) {
43+
throw new ServerException('zadd command has failed', null, $e);
44+
}
45+
}
46+
47+
public function zrem(string $key, string $value): int
48+
{
49+
try {
50+
return $this->redis->zRem($key, $value);
51+
} catch (\RedisException $e) {
52+
throw new ServerException('zrem command has failed', null, $e);
53+
}
54+
}
55+
2956
public function lpush(string $key, string $value): int
3057
{
3158
try {

pkg/redis/Redis.php

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,38 @@
66

77
interface Redis
88
{
9+
/**
10+
* @param string $script
11+
* @param array $keys
12+
* @param array $args
13+
*
14+
* @throws ServerException
15+
*
16+
* @return mixed
17+
*/
18+
public function eval(string $script, array $keys = [], array $args = []);
19+
20+
/**
21+
* @param string $key
22+
* @param string $value
23+
* @param float $score
24+
*
25+
* @throws ServerException
26+
*
27+
* @return int
28+
*/
29+
public function zadd(string $key, string $value, float $score): int;
30+
31+
/**
32+
* @param string $key
33+
* @param string $value
34+
*
35+
* @throws ServerException
36+
*
37+
* @return int
38+
*/
39+
public function zrem(string $key, string $value): int;
40+
941
/**
1042
* @param string $key
1143
* @param string $value

pkg/redis/RedisConsumer.php

Lines changed: 58 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,42 @@ class RedisConsumer implements Consumer
2121
*/
2222
private $context;
2323

24+
/**
25+
* @var int
26+
*/
27+
private $retryDelay;
28+
29+
/**
30+
* @var RedisQueueConsumer
31+
*/
32+
private $queueConsumer;
33+
2434
public function __construct(RedisContext $context, RedisDestination $queue)
2535
{
2636
$this->context = $context;
2737
$this->queue = $queue;
2838
}
2939

40+
/**
41+
* @return int
42+
*/
43+
public function getRetryDelay(): ?int
44+
{
45+
return $this->retryDelay;
46+
}
47+
48+
/**
49+
* @param int $retryDelay
50+
*/
51+
public function setRetryDelay(int $retryDelay): void
52+
{
53+
$this->retryDelay = $retryDelay;
54+
55+
if ($this->queueConsumer) {
56+
$this->queueConsumer->setRetryDelay($this->retryDelay);
57+
}
58+
}
59+
3060
/**
3161
* @return RedisDestination
3262
*/
@@ -40,40 +70,37 @@ public function getQueue(): Queue
4070
*/
4171
public function receive(int $timeout = 0): ?Message
4272
{
43-
$timeout = (int) ($timeout / 1000);
44-
if (empty($timeout)) {
73+
$timeout = (int) ceil($timeout / 1000);
74+
75+
if ($timeout <= 0) {
4576
while (true) {
4677
if ($message = $this->receive(5000)) {
4778
return $message;
4879
}
4980
}
5081
}
5182

52-
if ($result = $this->getRedis()->brpop([$this->queue->getName()], $timeout)) {
53-
return RedisMessage::jsonUnserialize($result->getMessage());
54-
}
83+
$this->initQueueConsumer();
5584

56-
return null;
85+
return $this->queueConsumer->receiveMessage($timeout);
5786
}
5887

5988
/**
6089
* @return RedisMessage
6190
*/
6291
public function receiveNoWait(): ?Message
6392
{
64-
if ($result = $this->getRedis()->rpop($this->queue->getName())) {
65-
return RedisMessage::jsonUnserialize($result->getMessage());
66-
}
93+
$this->initQueueConsumer();
6794

68-
return null;
95+
return $this->queueConsumer->receiveMessageNoWait($this->queue);
6996
}
7097

7198
/**
7299
* @param RedisMessage $message
73100
*/
74101
public function acknowledge(Message $message): void
75102
{
76-
// do nothing. redis transport always works in auto ack mode
103+
$this->getRedis()->zrem($this->queue->getName().':reserved', $message->getReservedKey());
77104
}
78105

79106
/**
@@ -83,15 +110,33 @@ public function reject(Message $message, bool $requeue = false): void
83110
{
84111
InvalidMessageException::assertMessageInstanceOf($message, RedisMessage::class);
85112

86-
// do nothing on reject. redis transport always works in auto ack mode
113+
$this->acknowledge($message);
87114

88115
if ($requeue) {
89-
$this->context->createProducer()->send($this->queue, $message);
116+
$message = RedisMessage::jsonUnserialize($message->getReservedKey());
117+
$message->setHeader('attempts', 0);
118+
119+
if ($message->getTimeToLive()) {
120+
$message->setHeader('expires_at', time() + $message->getTimeToLive());
121+
}
122+
123+
$this->getRedis()->lpush($this->queue->getName(), json_encode($message));
90124
}
91125
}
92126

93127
private function getRedis(): Redis
94128
{
95129
return $this->context->getRedis();
96130
}
131+
132+
private function initQueueConsumer(): void
133+
{
134+
if (null === $this->queueConsumer) {
135+
$this->queueConsumer = new RedisQueueConsumer($this->getRedis(), [$this->queue]);
136+
137+
if ($this->retryDelay) {
138+
$this->queueConsumer->setRetryDelay($this->retryDelay);
139+
}
140+
}
141+
}
97142
}

pkg/redis/RedisContext.php

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public function deleteQueue(Queue $queue): void
7878
{
7979
InvalidDestinationException::assertDestinationInstanceOf($queue, RedisDestination::class);
8080

81-
$this->getRedis()->del($queue->getName());
81+
$this->deleteDestination($queue);
8282
}
8383

8484
/**
@@ -88,7 +88,7 @@ public function deleteTopic(Topic $topic): void
8888
{
8989
InvalidDestinationException::assertDestinationInstanceOf($topic, RedisDestination::class);
9090

91-
$this->getRedis()->del($topic->getName());
91+
$this->deleteDestination($topic);
9292
}
9393

9494
public function createTemporaryQueue(): Queue
@@ -129,7 +129,7 @@ public function createSubscriptionConsumer(): SubscriptionConsumer
129129
*/
130130
public function purgeQueue(Queue $queue): void
131131
{
132-
$this->getRedis()->del($queue->getName());
132+
$this->deleteDestination($queue);
133133
}
134134

135135
public function close(): void
@@ -154,4 +154,11 @@ public function getRedis(): Redis
154154

155155
return $this->redis;
156156
}
157+
158+
private function deleteDestination(RedisDestination $destination): void
159+
{
160+
$this->getRedis()->del($destination->getName());
161+
$this->getRedis()->del($destination->getName().':delayed');
162+
$this->getRedis()->del($destination->getName().':reserved');
163+
}
157164
}

0 commit comments

Comments
 (0)