-
Notifications
You must be signed in to change notification settings - Fork 844
/
tools.py
94 lines (72 loc) · 2.8 KB
/
tools.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# ***** BEGIN LICENSE BLOCK *****
#
# For copyright and licensing please refer to COPYING.
#
# ***** END LICENSE BLOCK *****
# Async pika testing support
import sys
sys.path.append('..')
import os
import pika
import random
import time
timeout_id = None
def test_queue_name(keyword):
uid = random.randint(0, 1000) + os.getpid()
return 'test-%s-%i' % (keyword, uid)
# These methods are scoped to the AsyncPattern class
def timeout(method):
def _timeout(self, *args, **kwargs):
global timeout_id
timeout_id = self.connection.add_timeout(time.time() + 2,
self._on_timeout)
return method(self, *args, **kwargs)
return _timeout
def timeout_cancel(method):
def _timeout(self, *args, **kwargs):
global timeout_id
self.connection.remove_timeout(timeout_id)
del(timeout_id)
return method(self, *args, **kwargs)
return _timeout
class AsyncPattern(object):
def __init__(self):
self.connection = None
self.channel = None
self._queue = test_queue_name(self.__class__.__name__)
self._timeout = False
def _connect(self, connection_type, parameters):
return connection_type(parameters, self._on_connected)
def _on_connected(self, connection):
self.connection.channel(self._on_channel)
def _on_channel(self, channel):
assert False, "_on_channel no _extended"
def _queue_declare(self):
self.channel.queue_declare(queue=self._queue,
durable=False,
exclusive=True,
auto_delete=True,
callback=self._on_queue_declared)
def _on_queue_declared(self, frame):
assert False, "_on_queue_declared not extended"
def _send_message(self, exchange='', mandatory=False, immediate=False):
message = 'test-message-%s: %.8f' % (self.__class__.__name__,
time.time())
self.channel.basic_publish(exchange=exchange,
routing_key=self._queue,
body=message,
properties=pika.BasicProperties(
content_type="text/plain",
delivery_mode=1),
mandatory=mandatory,
immediate=immediate)
return message
@property
def _is_connected(connection):
return self.connection.is_open
def _on_timeout(self):
self._timeout = True
self.connection.add_on_close_callback(self._on_closed)
self.connection.close()
def _on_closed(self, frame):
self.connection.ioloop.stop()