-
Notifications
You must be signed in to change notification settings - Fork 0
/
queue-consumer-async.py
50 lines (46 loc) · 2.07 KB
/
queue-consumer-async.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
from __future__ import print_function
import optparse
from proton.handlers import MessagingHandler , Release, Reject
from proton.reactor import Container
import random
import time
class Recv(MessagingHandler):
    """Receive messages from one source address and keep simple statistics.

    Counters kept on the instance:
      received  -- every message delivered to ``on_message``
      processed -- messages that were printed and "worked on"
      returned  -- messages handed back by raising the chosen fate
    """

    def __init__(self, server, target, count):
        """Remember the connection settings and zero the counters.

        server -- address given to ``container.connect``
        target -- source address given to ``create_receiver``
        count  -- number of messages to handle; 0 means no limit
        """
        super(Recv, self).__init__()
        self.server = server
        self.target = target
        self.expected = count
        self.received = 0
        self.processed = 0
        self.returned = 0

    def on_start(self, event):
        """Connect to the server and open a receiver on the target address."""
        self.conn = event.container.connect(self.server)
        event.container.create_receiver(self.conn, source=self.target)

    def on_message(self, event):
        """Count the incoming message and, while under the limit, process it.

        Messages arriving after ``expected`` has been exceeded are counted in
        ``received`` but otherwise ignored.
        """
        self.received += 1
        if self.expected == 0 or self.received <= self.expected:
            # The only candidate is None, so the "return the message" branch
            # below is currently never taken.
            # NOTE(review): presumably Release/Reject (imported at the top of
            # the file) are meant to be added to this list -- confirm.
            fate = random.choice([None])
            print("Fate: %s" % fate)
            if fate is not None:
                self.returned += 1
                # Propagate the chosen fate to whoever invoked on_message.
                raise fate
            else:
                print("body: %s | delivery count : %d | id : %s" % (
                    event.message.body,
                    event.message.delivery_count,
                    event.message.id))
                # Simulate 1-3 seconds of work; this blocks the calling
                # thread for that long.
                time.sleep(random.randint(1, 3))
                self.processed += 1
            # Reaching the requested count: shut the receiver and the
            # connection down. Never true when expected == 0, because
            # received is at least 1 here.
            if self.received == self.expected:
                event.receiver.close()
                event.connection.close()

    def on_connection_closed(self, event):
        """Print the final received/processed/returned totals."""
        print("received: %d | processed: %d | returned: %d" % (
            self.received, self.processed, self.returned))
# Command-line entry point: read the options, then run the receiver until it
# finishes on its own or the user interrupts it.
parser = optparse.OptionParser(usage="usage: %prog [options]")
parser.add_option(
    "-s", "--server",
    dest="server",
    help="address from which messages are received (default %default)",
    default="fedoraserver.local")
parser.add_option(
    "-t", "--target",
    dest="target",
    help="Queue or Topic target this client should be getting messages from (default %default)",
    default="queue://testQueue")
parser.add_option(
    "-m", "--messages",
    dest="messages",
    type="int",
    help="number of messages to receive; 0 receives indefinitely (default %default)",
    default=100)
opts, args = parser.parse_args()

try:
    Container(
        Recv(opts.server, opts.target, opts.messages)
    ).run()
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop an open-ended run; exit quietly.
    pass