/
InteropMessageProvider.php
109 lines (88 loc) · 2.41 KB
/
InteropMessageProvider.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
<?php
namespace Swarrot\Broker\MessageProvider;
use Interop\Queue\PsrConsumer;
use Interop\Queue\PsrContext;
use Interop\Queue\PsrMessage;
use Interop\Queue\PsrQueue;
use Swarrot\Broker\Message;
/**
 * Message provider that pulls messages from a queue-interop (PSR) context.
 *
 * Every message handed out by get() is remembered under its id until it is
 * acked or nacked, so the original PsrMessage can be given back to the
 * consumer at that point.
 */
final class InteropMessageProvider implements MessageProviderInterface
{
    /**
     * @var PsrContext
     */
    private $context;

    /**
     * @var PsrConsumer
     */
    private $consumer;

    /**
     * @var PsrQueue
     */
    private $queue;

    /**
     * Messages returned by get() that have not been acked/nacked yet,
     * indexed by the id given to the corresponding Swarrot message.
     *
     * @var PsrMessage[]
     */
    private $consumedMessages = [];

    /**
     * Timeout handed to the consumer's receive() call.
     *
     * @var int
     */
    private $waitTimeout;

    /**
     * Creates the queue and its consumer from the given context.
     *
     * @param PsrContext $context
     * @param string     $queueName
     * @param float|int  $waitTimeout value passed to PsrConsumer::receive(); cast to int,
     *                                defaults to 1000 (one second)
     */
    public function __construct(PsrContext $context, $queueName, $waitTimeout = 1000 /* 1sec */)
    {
        $this->context = $context;
        $this->waitTimeout = (int) $waitTimeout;
        $this->queue = $context->createQueue($queueName);
        $this->consumer = $context->createConsumer($this->queue);
    }

    /**
     * {@inheritdoc}
     *
     * Returns null when the consumer did not deliver a message.
     */
    public function get()
    {
        $psrMessage = $this->consumer->receive($this->waitTimeout);
        if (!$psrMessage instanceof PsrMessage) {
            return null;
        }

        // Fall back to a generated id when the transport did not provide one,
        // so the message can still be found again by ack()/nack().
        $messageId = $psrMessage->getMessageId() ?: uniqid('', true);
        $this->consumedMessages[$messageId] = $psrMessage;

        // Interop headers become the top-level properties of the Swarrot
        // message; interop properties are nested under the "headers" key.
        $properties = $psrMessage->getHeaders();
        $properties['headers'] = $psrMessage->getProperties();

        return new Message($psrMessage->getBody(), $properties, $messageId);
    }

    /**
     * {@inheritdoc}
     *
     * Does nothing when the message is not one that is currently tracked.
     */
    public function ack(Message $message)
    {
        $psrMessage = $this->releaseConsumedMessage($message);
        if (null === $psrMessage) {
            return;
        }

        $this->consumer->acknowledge($psrMessage);
    }

    /**
     * {@inheritdoc}
     *
     * Does nothing when the message is not one that is currently tracked.
     * The $requeue flag is forwarded unchanged to the consumer's reject().
     */
    public function nack(Message $message, $requeue = false)
    {
        $psrMessage = $this->releaseConsumedMessage($message);
        if (null === $psrMessage) {
            return;
        }

        $this->consumer->reject($psrMessage, $requeue);
    }

    /**
     * {@inheritdoc}
     */
    public function getQueueName()
    {
        return $this->queue->getQueueName();
    }

    /**
     * Looks up the tracked PsrMessage for the given Swarrot message and stops
     * tracking it.
     *
     * @param Message $message
     *
     * @return PsrMessage|null the tracked message, or null when none is stored under that id
     */
    private function releaseConsumedMessage(Message $message)
    {
        $messageId = $message->getId();

        // get() never stores a message under an empty id, so a null id can
        // never match; bail out before using it as an array offset.
        if (null === $messageId || !isset($this->consumedMessages[$messageId])) {
            return null;
        }

        $psrMessage = $this->consumedMessages[$messageId];
        unset($this->consumedMessages[$messageId]);

        return $psrMessage;
    }
}