Skip to content

Commit f15d3f7

Browse files
committed
[redis] migrate redis transport
1 parent 6863fb6 commit f15d3f7

File tree

9 files changed

+99
-242
lines changed

9 files changed

+99
-242
lines changed

pkg/redis/RedisConnectionFactory.php

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace Enqueue\Redis;
44

55
use Interop\Queue\PsrConnectionFactory;
6+
use Interop\Queue\PsrContext;
67

78
class RedisConnectionFactory implements PsrConnectionFactory
89
{
@@ -74,11 +75,9 @@ public function __construct($config = 'redis:')
7475
}
7576

7677
/**
77-
* {@inheritdoc}
78-
*
7978
* @return RedisContext
8079
*/
81-
public function createContext()
80+
public function createContext(): PsrContext
8281
{
8382
if ($this->config['lazy']) {
8483
return new RedisContext(function () {
@@ -89,10 +88,7 @@ public function createContext()
8988
return new RedisContext($this->createRedis());
9089
}
9190

92-
/**
93-
* @return Redis
94-
*/
95-
private function createRedis()
91+
private function createRedis(): Redis
9692
{
9793
if (false == $this->redis) {
9894
if ('phpredis' == $this->config['vendor'] && false == $this->redis) {
@@ -121,12 +117,7 @@ private function createRedis()
121117
return $this->redis;
122118
}
123119

124-
/**
125-
* @param string $dsn
126-
*
127-
* @return array
128-
*/
129-
private function parseDsn($dsn)
120+
private function parseDsn(string $dsn): array
130121
{
131122
if (false === strpos($dsn, 'redis:')) {
132123
throw new \LogicException(sprintf('The given DSN "%s" is not supported. Must start with "redis:".', $dsn));
@@ -155,10 +146,7 @@ private function parseDsn($dsn)
155146
return $config;
156147
}
157148

158-
/**
159-
* @return array
160-
*/
161-
private function defaultConfig()
149+
private function defaultConfig(): array
162150
{
163151
return [
164152
'host' => 'localhost',

pkg/redis/RedisConsumer.php

Lines changed: 13 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use Interop\Queue\InvalidMessageException;
66
use Interop\Queue\PsrConsumer;
77
use Interop\Queue\PsrMessage;
8+
use Interop\Queue\PsrQueue;
89

910
class RedisConsumer implements PsrConsumer
1011
{
@@ -18,32 +19,24 @@ class RedisConsumer implements PsrConsumer
1819
*/
1920
private $context;
2021

21-
/**
22-
* @param RedisContext $context
23-
* @param RedisDestination $queue
24-
*/
2522
public function __construct(RedisContext $context, RedisDestination $queue)
2623
{
2724
$this->context = $context;
2825
$this->queue = $queue;
2926
}
3027

3128
/**
32-
* {@inheritdoc}
33-
*
3429
* @return RedisDestination
3530
*/
36-
public function getQueue()
31+
public function getQueue(): PsrQueue
3732
{
3833
return $this->queue;
3934
}
4035

4136
/**
42-
* {@inheritdoc}
43-
*
44-
* @return RedisMessage|null
37+
* @return RedisMessage
4538
*/
46-
public function receive($timeout = 0)
39+
public function receive(int $timeout = 0): ?PsrMessage
4740
{
4841
$timeout = (int) ($timeout / 1000);
4942
if (empty($timeout)) {
@@ -57,36 +50,34 @@ public function receive($timeout = 0)
5750
if ($result = $this->getRedis()->brpop([$this->queue->getName()], $timeout)) {
5851
return RedisMessage::jsonUnserialize($result->getMessage());
5952
}
53+
54+
return null;
6055
}
6156

6257
/**
63-
* {@inheritdoc}
64-
*
65-
* @return RedisMessage|null
58+
* @return RedisMessage
6659
*/
67-
public function receiveNoWait()
60+
public function receiveNoWait(): ?PsrMessage
6861
{
6962
if ($result = $this->getRedis()->rpop($this->queue->getName())) {
7063
return RedisMessage::jsonUnserialize($result->getMessage());
7164
}
65+
66+
return null;
7267
}
7368

7469
/**
75-
* {@inheritdoc}
76-
*
7770
* @param RedisMessage $message
7871
*/
79-
public function acknowledge(PsrMessage $message)
72+
public function acknowledge(PsrMessage $message): void
8073
{
8174
// do nothing. redis transport always works in auto ack mode
8275
}
8376

8477
/**
85-
* {@inheritdoc}
86-
*
8778
* @param RedisMessage $message
8879
*/
89-
public function reject(PsrMessage $message, $requeue = false)
80+
public function reject(PsrMessage $message, bool $requeue = false): void
9081
{
9182
InvalidMessageException::assertMessageInstanceOf($message, RedisMessage::class);
9283

@@ -97,10 +88,7 @@ public function reject(PsrMessage $message, $requeue = false)
9788
}
9889
}
9990

100-
/**
101-
* @return Redis
102-
*/
103-
private function getRedis()
91+
private function getRedis(): Redis
10492
{
10593
return $this->context->getRedis();
10694
}

pkg/redis/RedisContext.php

Lines changed: 28 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,17 @@
33
namespace Enqueue\Redis;
44

55
use Interop\Queue\InvalidDestinationException;
6+
use Interop\Queue\PsrConsumer;
67
use Interop\Queue\PsrContext;
78
use Interop\Queue\PsrDestination;
9+
use Interop\Queue\PsrMessage;
10+
use Interop\Queue\PsrProducer;
811
use Interop\Queue\PsrQueue;
9-
use Interop\Queue\PsrSubscriptionConsumerAwareContext;
12+
use Interop\Queue\PsrSubscriptionConsumer;
1013
use Interop\Queue\PsrTopic;
14+
use Interop\Queue\TemporaryQueueNotSupportedException;
1115

12-
class RedisContext implements PsrContext, PsrSubscriptionConsumerAwareContext
16+
class RedisContext implements PsrContext
1317
{
1418
/**
1519
* @var Redis
@@ -42,106 +46,96 @@ public function __construct($redis)
4246
}
4347

4448
/**
45-
* {@inheritdoc}
46-
*
4749
* @return RedisMessage
4850
*/
49-
public function createMessage($body = '', array $properties = [], array $headers = [])
51+
public function createMessage(string $body = '', array $properties = [], array $headers = []): PsrMessage
5052
{
5153
return new RedisMessage($body, $properties, $headers);
5254
}
5355

5456
/**
55-
* {@inheritdoc}
56-
*
5757
* @return RedisDestination
5858
*/
59-
public function createTopic($topicName)
59+
public function createTopic(string $topicName): PsrTopic
6060
{
6161
return new RedisDestination($topicName);
6262
}
6363

6464
/**
65-
* {@inheritdoc}
66-
*
6765
* @return RedisDestination
6866
*/
69-
public function createQueue($queueName)
67+
public function createQueue(string $queueName): PsrQueue
7068
{
7169
return new RedisDestination($queueName);
7270
}
7371

7472
/**
75-
* @param RedisDestination|PsrQueue $queue
73+
* @param RedisDestination $queue
7674
*/
77-
public function deleteQueue(PsrQueue $queue)
75+
public function deleteQueue(PsrQueue $queue): void
7876
{
7977
InvalidDestinationException::assertDestinationInstanceOf($queue, RedisDestination::class);
8078

8179
$this->getRedis()->del($queue->getName());
8280
}
8381

8482
/**
85-
* @param RedisDestination|PsrTopic $topic
83+
* @param RedisDestination $topic
8684
*/
87-
public function deleteTopic(PsrTopic $topic)
85+
public function deleteTopic(PsrTopic $topic): void
8886
{
8987
InvalidDestinationException::assertDestinationInstanceOf($topic, RedisDestination::class);
9088

9189
$this->getRedis()->del($topic->getName());
9290
}
9391

94-
/**
95-
* {@inheritdoc}
96-
*/
97-
public function createTemporaryQueue()
92+
public function createTemporaryQueue(): PsrQueue
9893
{
99-
throw new \LogicException('Not implemented');
94+
throw TemporaryQueueNotSupportedException::providerDoestNotSupportIt();
10095
}
10196

10297
/**
103-
* {@inheritdoc}
104-
*
10598
* @return RedisProducer
10699
*/
107-
public function createProducer()
100+
public function createProducer(): PsrProducer
108101
{
109102
return new RedisProducer($this->getRedis());
110103
}
111104

112105
/**
113-
* {@inheritdoc}
114-
*
115106
* @param RedisDestination $destination
116107
*
117108
* @return RedisConsumer
118109
*/
119-
public function createConsumer(PsrDestination $destination)
110+
public function createConsumer(PsrDestination $destination): PsrConsumer
120111
{
121112
InvalidDestinationException::assertDestinationInstanceOf($destination, RedisDestination::class);
122113

123114
return new RedisConsumer($this, $destination);
124115
}
125116

126117
/**
127-
* {@inheritdoc}
128-
*
129118
* @return RedisSubscriptionConsumer
130119
*/
131-
public function createSubscriptionConsumer()
120+
public function createSubscriptionConsumer(): PsrSubscriptionConsumer
132121
{
133122
return new RedisSubscriptionConsumer($this);
134123
}
135124

136-
public function close()
125+
/**
126+
* @param RedisDestination $queue
127+
*/
128+
public function purgeQueue(PsrQueue $queue): void
129+
{
130+
$this->getRedis()->del($queue->getName());
131+
}
132+
133+
public function close(): void
137134
{
138135
$this->getRedis()->disconnect();
139136
}
140137

141-
/**
142-
* @return Redis
143-
*/
144-
public function getRedis()
138+
public function getRedis(): Redis
145139
{
146140
if (false == $this->redis) {
147141
$redis = call_user_func($this->redisFactory);

pkg/redis/RedisDestination.php

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -12,34 +12,22 @@ class RedisDestination implements PsrQueue, PsrTopic
1212
*/
1313
private $name;
1414

15-
/**
16-
* @param string $name
17-
*/
18-
public function __construct($name)
15+
public function __construct(string $name)
1916
{
2017
$this->name = $name;
2118
}
2219

23-
/**
24-
* @return string
25-
*/
26-
public function getName()
20+
public function getName(): string
2721
{
2822
return $this->name;
2923
}
3024

31-
/**
32-
* {@inheritdoc}
33-
*/
34-
public function getQueueName()
25+
public function getQueueName(): string
3526
{
3627
return $this->getName();
3728
}
3829

39-
/**
40-
* {@inheritdoc}
41-
*/
42-
public function getTopicName()
30+
public function getTopicName(): string
4331
{
4432
return $this->getName();
4533
}

0 commit comments

Comments
 (0)