forked from php-amqplib/RabbitMqBundle
-
Notifications
You must be signed in to change notification settings - Fork 13
/
MultipleConsumer.php
111 lines (92 loc) · 3.12 KB
/
MultipleConsumer.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
<?php
namespace OldSound\RabbitMqBundle\RabbitMq;
use OldSound\RabbitMqBundle\Provider\QueuesProviderInterface;
use OldSound\RabbitMqBundle\RabbitMq\Exception\QueueNotFoundException;
use PhpAmqpLib\Message\AMQPMessage;
class MultipleConsumer extends Consumer
{
protected $queues = array();
/**
* Queues provider
*
* @var QueuesProviderInterface
*/
protected $queuesProvider = null;
/**
* QueuesProvider setter
*
* @param QueuesProviderInterface $queuesProvider
*
* @return self
*/
public function setQueuesProvider(QueuesProviderInterface $queuesProvider)
{
$this->queuesProvider = $queuesProvider;
return $this;
}
public function getQueueConsumerTag($queue)
{
return sprintf('%s-%s', $this->getConsumerTag(), $queue);
}
public function setQueues(array $queues)
{
$this->queues = $queues;
}
protected function setupConsumer()
{
$this->mergeQueues();
if ($this->autoSetupFabric) {
$this->setupFabric();
}
foreach ($this->queues as $name => $options) {
//PHP 5.3 Compliant
$currentObject = $this;
$this->getChannel()->basic_consume($name, $this->getQueueConsumerTag($name), false, false, false, false, function (AMQPMessage $msg) use($currentObject, $name) {
$currentObject->processQueueMessage($name, $msg);
});
}
}
protected function queueDeclare()
{
foreach ($this->queues as $name => $options) {
list($queueName, ,) = $this->getChannel()->queue_declare($name, $options['passive'],
$options['durable'], $options['exclusive'],
$options['auto_delete'], $options['nowait'],
$options['arguments'], $options['ticket']);
if (isset($options['routing_keys']) && count($options['routing_keys']) > 0) {
foreach ($options['routing_keys'] as $routingKey) {
$this->getChannel()->queue_bind($queueName, $this->exchangeOptions['name'], $routingKey);
}
} else {
$this->getChannel()->queue_bind($queueName, $this->exchangeOptions['name'], $this->routingKey);
}
}
$this->queueDeclared = true;
}
public function processQueueMessage($queueName, AMQPMessage $msg)
{
if (!isset($this->queues[$queueName])) {
throw new QueueNotFoundException();
}
$processFlag = call_user_func($this->queues[$queueName]['callback'], $msg);
$this->handleProcessMessage($msg, $processFlag);
}
public function stopConsuming()
{
foreach ($this->queues as $name => $options) {
$this->getChannel()->basic_cancel($this->getQueueConsumerTag($name));
}
}
/**
* Merges static and provided queues into one array
*/
protected function mergeQueues()
{
if ($this->queuesProvider) {
$this->queues = array_merge(
$this->queues,
$this->queuesProvider->getQueues()
);
}
}
}