/
BufferedMessageProducer.php
170 lines (153 loc) · 5.29 KB
/
BufferedMessageProducer.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
<?php
namespace Oro\Bundle\MessageQueueBundle\Client;
use Oro\Component\MessageQueue\Client\MessageProducerInterface;
use Psr\Log\LoggerInterface;
/**
* The message producer that can be used in case if the sending messages
* to the queue should be postponed by some reasons.
* When the buffering is enabled, stores messages in the internal buffer.
* When the buffering is disabled, sends messages directly to the queue via the decorated message producer.
* By default the buffering is disabled.
*/
class BufferedMessageProducer implements MessageProducerInterface
{
/** @var MessageProducerInterface */
private $innerProducer;
/** @var LoggerInterface */
private $logger;
/** @var MessageFilterInterface */
private $filter;
/** @var MessageBuffer|null */
private $buffer;
/** @var int */
private $enableBufferingNestingLevel = 0;
public function __construct(
MessageProducerInterface $producer,
LoggerInterface $logger,
MessageFilterInterface $filter
) {
$this->innerProducer = $producer;
$this->logger = $logger;
$this->filter = $filter;
}
/**
* {@inheritdoc}
*/
public function send($topic, $message)
{
if ($this->enableBufferingNestingLevel > 0) {
if (null === $this->buffer) {
$this->buffer = new MessageBuffer();
}
$this->buffer->addMessage($topic, $message);
} else {
$buffer = new MessageBuffer();
$buffer->addMessage($topic, $message);
$this->sendMessages($buffer);
}
}
/**
* Indicates whether the buffering of messages is enabled.
*/
public function isBufferingEnabled(): bool
{
return $this->enableBufferingNestingLevel > 0;
}
/**
* Enables the buffering of messages.
* In this mode messages are not sent, instead they are added to internal buffer.
* To send collected messages to the queue the "flushBuffer" method should be called.
* To remove all collected messages without sending them to the queue the "clearBuffer" method should be called.
*/
public function enableBuffering(): void
{
$this->enableBufferingNestingLevel++;
}
/**
* Disables the buffering of messages.
* In this mode messages are sent to the queue directly without buffering.
* Please note that this method does nothing with already buffered messages;
* to send them to the queue the buffering should be enabled and the "flushBuffer" method should be called.
*
* @throws \LogicException if the buffering of messages is already disabled
*/
public function disableBuffering(): void
{
if (0 === $this->enableBufferingNestingLevel) {
$this->logger->critical(
'The buffered message producer fails because the buffering of messages is already disabled.'
);
throw new \LogicException('The buffering of messages is already disabled.');
}
$this->enableBufferingNestingLevel--;
}
/**
* Checks whether the buffer contains at least one message.
*/
public function hasBufferedMessages(): bool
{
return null !== $this->buffer && $this->buffer->hasMessages();
}
/**
* Flushes buffered messages.
*
* @throws \LogicException if the buffering of messages is disabled
* @throws \Oro\Component\MessageQueue\Transport\Exception\Exception if the sending a message to the queue
* fails due to some internal error
*/
public function flushBuffer(): void
{
$this->assertBufferingEnabled();
if (null !== $this->buffer && $this->buffer->hasMessages()) {
$this->sendMessages($this->buffer);
}
}
/**
* Clears buffered messages.
*
* @throws \LogicException if the buffering of messages is disabled
*/
public function clearBuffer(): void
{
$this->assertBufferingEnabled();
if (null !== $this->buffer) {
$this->buffer->clear();
}
}
/**
* Asserts that the buffering of messages is enabled.
*
* @throws \LogicException if the buffering is disabled
*/
private function assertBufferingEnabled(): void
{
if (0 === $this->enableBufferingNestingLevel) {
$this->logger->critical(
'The buffered message producer fails because the buffering of messages is disabled.'
);
throw new \LogicException('The buffering of messages is disabled.');
}
}
/**
* @throws \Oro\Component\MessageQueue\Transport\Exception\Exception if the sending a message to the queue
* fails due to some internal error
*/
private function sendMessages(MessageBuffer $buffer): void
{
try {
$this->filter->apply($buffer);
$messages = $buffer->getMessages();
foreach ($messages as [$topic, $message]) {
$this->innerProducer->send($topic, $message);
}
} catch (\Throwable $e) {
$this->logger->error(
'The buffered message producer fails to send messages to the queue.',
['exception' => $e]
);
throw $e;
} finally {
$buffer->clear();
}
}
}