-
Notifications
You must be signed in to change notification settings - Fork 32
/
amqp_consumer.php
78 lines (62 loc) · 2.11 KB
/
amqp_consumer.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
<?php
require_once __DIR__ . '/../amqp.inc';
// The HOST, PORT, USER, PASS and VHOST constants used below are presumably
// defined in config.php (not visible from this file). Load it with require so
// that a missing config file stops the script here instead of only raising a
// warning and carrying on with undefined constants.
require __DIR__ . '/config.php';

$exchange = 'router';
$queue = 'msgs';
$consumer_tag = 'consumer';

$conn = new AMQPConnection(HOST, PORT, USER, PASS, VHOST);
$ch = $conn->channel();

/*
    The following code is the same in both the consumer and the producer.
    This way we are sure there is always a queue to consume from and an
    exchange to publish messages to.
*/

/*
    name: $queue
    passive: false
    durable: true       // the queue will survive server restarts
    exclusive: false    // the queue can be accessed from other channels
    auto_delete: false  // the queue won't be deleted once the channel is closed
*/
$ch->queue_declare($queue, false, true, false, false);

/*
    name: $exchange
    type: direct
    passive: false
    durable: true       // the exchange will survive server restarts
    auto_delete: false  // the exchange won't be deleted once the channel is closed
*/
$ch->exchange_declare($exchange, 'direct', false, true, false);

// Bind the queue to the exchange declared above.
$ch->queue_bind($queue, $exchange);
/**
 * Consumer callback: prints the message body between separator lines,
 * acknowledges the delivery, and cancels the consumer when the body is
 * exactly the string "quit".
 *
 * @param object $msg Delivered message. This function reads $msg->body and
 *                    the 'channel', 'delivery_tag' and 'consumer_tag' entries
 *                    of $msg->delivery_info.
 * @return void
 */
function process_message($msg) {
    echo "\n--------\n", $msg->body, "\n--------\n";

    // The delivery info carries the channel the message arrived on; use it
    // for both the acknowledgement and the optional cancel.
    $delivery = $msg->delivery_info;
    $channel = $delivery['channel'];
    $channel->basic_ack($delivery['delivery_tag']);

    // Any body other than "quit" needs no further handling.
    if ($msg->body !== 'quit') {
        return;
    }

    // Send a message with the string "quit" to cancel the consumer.
    $channel->basic_cancel($delivery['consumer_tag']);
}
/*
    Register process_message as the consumer callback for $queue.
    Arguments, in order:
        queue:        queue to get the messages from
        consumer_tag: consumer identifier
        no_local:     don't receive messages published by this consumer
        no_ack:       tells the server whether the consumer will acknowledge
                      messages; false here, and process_message() calls
                      basic_ack for each message it handles
        exclusive:    request exclusive consumer access, meaning only this
                      consumer can access the queue
        nowait:       presumably "do not wait for the server's reply to this
                      method" -- TODO confirm against the library docs
        callback:     a PHP callback
*/
$ch->basic_consume(
    $queue,
    $consumer_tag,
    false, // no_local
    false, // no_ack
    false, // exclusive
    false, // nowait
    'process_message'
);
/**
 * Shutdown handler: closes the channel, then the connection.
 *
 * @param object $ch   Channel; its close() method is called first.
 * @param object $conn Connection; its close() method is called second.
 * @return void
 */
function shutdown($ch, $conn){
    // Close the channel before the connection.
    foreach (array($ch, $conn) as $closable) {
        $closable->close();
    }
}
// Have shutdown($ch, $conn) run when the script ends so that the channel and
// the connection get closed.
register_shutdown_function('shutdown', $ch, $conn);

// Keep waiting on the channel for as long as it still has callbacks
// registered.
while (count($ch->callbacks) > 0) {
    $ch->wait();
}
?>