/
pika_wrapper.py
189 lines (163 loc) · 7.41 KB
/
pika_wrapper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
import pika
import logging
import time
# Module-level logger, named after this module; used by ChannelWrapper below.
LOG = logging.getLogger(__name__)
class ChannelWrapper(object):
    """
    Wrapper for pika library for AMQP

    Owns one ``pika.SelectConnection`` and one channel on it. Depending on
    ``publish`` the channel is used either only for publishing to an
    exchange, or additionally for consuming from a queue bound to that
    exchange, in which case received messages are handed to ``callback``.
    """

    # Delay handed to connection.add_timeout() by the on_closed handler
    # before it triggers a reconnect.
    reconnect_timeout = 3

    def __init__(self, channel_id, exchange_type, callback=None,
                 publish=False, manual_ack=False, **kwargs):
        """
        Parse ``channel_id`` and create the (not yet running) connection.

        :param channel_id: ``"exchange"`` or ``"exchange:key[:key...]"``;
            the part before the first colon is the exchange name, the
            rest forms the routing key (see ``_parse_id``).
        :param exchange_type: exchange type passed to exchange_declare.
        :param callback: called from ``on_receive`` as
            ``callback(routing_key, body, method=..., properties=...)``.
        :param publish: when true, no queue is declared or consumed.
        :param manual_ack: when true, messages are consumed with
            ``no_ack=False`` and acked explicitly in the consumer.
        :param kwargs: ``match_all`` enables wildcard binding; ``host``,
            ``vhost``, ``user`` and ``password`` configure the connection.
        """
        (exch, rtg_key) = self._parse_id(channel_id)
        self.routing_wildcard_match = kwargs.get("match_all", False)
        self.channel_id = exch
        self.routing_key = rtg_key
        self.exchange_type = exchange_type
        self.callback = callback
        self.publish = publish
        self.manual_ack = manual_ack
        # Must exist before _create(): the on_closed handler reads it.
        self.stopping = False
        # Remember the connection settings so that _reconnect() can rebuild
        # the connection against the same broker.
        self._create_kwargs = dict(kwargs)
        # Overwritten by connect(); defined here so _reconnect() can always
        # read it.
        self.connection_kwargs = {}
        self._create(channel_id=self.channel_id, exchange_type=exchange_type,
                     callback=callback, publish=publish,
                     manual_ack=manual_ack, **kwargs)

    def _parse_id(self, id):
        """
        Split ``id`` on ``":"`` into ``(exchange, routing_key)``.

        The first token is the exchange; any remaining tokens are joined
        with ``"."``. Without a colon the routing key is ``""``.
        """
        # str.split returns [id] when there is no colon, so no special case
        # is needed.
        tokens = id.split(":")
        return (tokens[0], ".".join(tokens[1:]))

    def _create(self, channel_id, exchange_type, callback,
                publish, manual_ack, **kwargs):
        """
        Build the SelectConnection and the callback chain that sets up
        the channel: connected -> channel open -> exchange declared ->
        (unless ``publish``) queue declared -> bind and consume.

        Reads ``host`` (default ``"localhost"``), ``vhost`` (default
        ``"/"``) and, when both are present, ``user``/``password`` from
        ``kwargs``. The IO loop is not started here; see ``connect``.
        """
        self.queue = None

        def publish_routing_key(sender):
            # "<routing_key>.<sender>", or just "<sender>" when this
            # wrapper has no routing key of its own.
            prefix = self.routing_key + "." if self.routing_key else ""
            return "%s%s" % (prefix, sender)

        def on_connected(connection):
            LOG.info("Connected to %s:%s", self.host, self.vhost)
            connection.channel(on_channel_open)

        def on_closed(frame):
            if not self.stopping:
                # unexpected close, try reconnect
                LOG.warning("Invoked unexpected on_close, "
                            "trying to reconnect")
                self.connection.add_timeout(self.reconnect_timeout,
                                            self._reconnect)

        def on_backpressure():
            LOG.warning("Backpressure detected")

        def push(sender, data):
            routing_key = publish_routing_key(sender)
            LOG.info("Pushing data from [%s]:%s to exc: %s",
                     sender, routing_key, self.channel_id)
            try:
                self.channel.basic_publish(exchange=self.channel_id,
                                           routing_key=routing_key,
                                           body=data)
            except Exception as ex:
                LOG.exception(ex)
                self._reconnect()

        def on_channel_open(channel):
            LOG.info("Channel open")
            self.channel = channel
            # The channel object doubles as the "queue" handle exposed to
            # users of this wrapper: push() publishes to our own exchange,
            # escalate() publishes once to another exchange.
            self.queue = channel
            self.queue.push = push
            self.queue.escalate = self.publish_once
            channel.exchange_declare(exchange=self.channel_id,
                                     type=exchange_type,
                                     callback=on_exchange_declared)

        def on_exchange_declared(frame):
            LOG.info("Exchange_declared: %s", self.channel_id)
            if not publish:
                # We have a decision, or listener, bind a queue
                self.channel.queue_declare(durable=True,
                                           exclusive=False,
                                           auto_delete=False,
                                           callback=on_queue_declared)

        def on_queue_declared(frame):
            LOG.info("Queue declared on exchange %s:%s [%s]",
                     self.channel_id, self.routing_key, exchange_type)
            if self.routing_wildcard_match:
                # Bind with a "#" wildcard, below our routing key if we
                # have one.
                if self.routing_key:
                    routing_key = "%s.#" % self.routing_key
                else:
                    routing_key = "#"
            else:
                routing_key = self.routing_key
            self.channel.queue_bind(exchange=self.channel_id,
                                    queue=frame.method.queue,
                                    routing_key=routing_key)
            self.channel.basic_consume(on_consume, no_ack=not manual_ack,
                                       queue=frame.method.queue)

        def on_consume(channel, method_frame, header_frame, body):
            # NOTE(review): the ack is sent before the callback runs, so a
            # failing callback presumably does not lead to redelivery --
            # confirm this ordering is intended.
            if manual_ack:
                channel.basic_ack(delivery_tag=method_frame.delivery_tag)
            self.on_receive(self.channel_id, method_frame, header_frame, body)

        self.host = kwargs.get('host', "localhost")
        self.vhost = kwargs.get("vhost", "/")
        params = pika.ConnectionParameters(host=self.host,
                                           virtual_host=self.vhost)
        if "user" in kwargs and "password" in kwargs:
            self.user = kwargs['user']
            self.password = kwargs['password']
            credentials = pika.PlainCredentials(self.user, self.password)
            params.credentials = credentials
        self.params = params
        reconnect = pika.reconnection_strategies.SimpleReconnectionStrategy()
        self.connection = pika.SelectConnection(params,
                                                on_open_callback=on_connected,
                                                reconnection_strategy=reconnect)
        # NOTE(review): on_closed is defined above but deliberately left
        # unregistered here, as in the original code; presumably the
        # reconnection strategy covers dropped connections -- confirm
        # before enabling.
        #self.connection.add_on_close_callback(on_closed)
        self.connection.add_backpressure_callback(on_backpressure)

    def _reconnect(self):
        """
        Tear down the current connection, build a new one and start it.

        The new connection is created with the constructor's keyword
        arguments, overridden by whatever was last passed to ``connect``.
        """
        LOG.info("Trying reconnect...")
        self.queue = None
        self.connection.ioloop.stop()
        self.connection.close()
        # Start from the constructor settings (host, vhost, credentials) so
        # the reconnect targets the same broker; kwargs given to connect()
        # keep taking precedence, as they did before.
        create_kwargs = dict(self._create_kwargs)
        create_kwargs.update(self.connection_kwargs)
        self._create(channel_id=self.channel_id,
                     exchange_type=self.exchange_type,
                     callback=self.callback, publish=self.publish,
                     manual_ack=self.manual_ack, **create_kwargs)
        self.connect(**self.connection_kwargs)

    def on_receive(self, channel, method, properties, body):
        """
        Forward a consumed message to the user callback, if one is set.

        The callback receives ``method.routing_key`` and ``body``
        positionally and ``method``/``properties`` as keyword arguments.
        ``channel`` is not passed on.
        """
        if self.callback:
            self.callback(method.routing_key,
                          body,
                          method=method,
                          properties=properties)

    def connect(self, **kwargs):
        """
        Start the connection's IO loop, reconnecting if it raises.

        ``kwargs`` are remembered in ``self.connection_kwargs`` and reused
        by ``_reconnect``.
        """
        try:
            self.connection_kwargs = kwargs
            self.connection.ioloop.start()
        except Exception as ex:
            LOG.exception("Channel error: %r", str(ex))
            # retry
            if not self.connection.closing:
                # _reconnect() takes no arguments; it reads
                # self.connection_kwargs itself.
                self._reconnect()

    def stop(self, **kwargs):
        """Mark the wrapper as stopping and shut the connection down."""
        self.stopping = True
        # connection.ioloop is blocking, this will stop and exit the app
        if self.connection:
            LOG.info("Closing connection")
            self.connection.ioloop.stop()
            self.connection.close()

    def publish_once(self, sender, receiver, data):
        """
        Publish ``data`` to the exchange named in ``receiver``.

        A new channel is opened on the existing connection; once it is
        open, the exchange (the part of ``receiver`` before the first
        colon) is declared as a topic exchange and ``data`` is published
        to it with ``sender`` as the routing key. Errors are logged, not
        raised.
        """
        LOG.info("Escalating from %s to %s", sender, receiver)
        try:
            (exch, _) = self._parse_id(receiver)

            def on_channel(channel):
                LOG.info("New channel opened: %s", receiver)
                channel.exchange_declare(exchange=exch,
                                         type='topic')
                channel.basic_publish(exchange=exch,
                                      routing_key=sender,
                                      body=data)

            self.connection.channel(on_channel)
        except Exception as ex:
            LOG.exception(ex)