forked from harshi12/AI_On_The_Edge_Platform
-
Notifications
You must be signed in to change notification settings - Fork 0
/
queue_req_resp.py
85 lines (64 loc) · 3.05 KB
/
queue_req_resp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# Shared RabbitMQ queue helper used for communication between the gateways, the service manager, and similar components.
import pika
import json
import sys
from pathlib import Path
# Current user's home directory, as a plain string.
home = str(Path.home())
# Directory "<home>/Platform/"; the trailing slash is relied on below.
path = "".join([home, '/Platform/'])
# Put that directory first on the module search path.
sys.path.insert(0, path)
# File that RabbitMQ.__init__ parses as JSON to obtain the broker settings.
RMQFile = "".join([path, "RMQCredentials.txt"])
class RabbitMQ:
    """Small pika wrapper for the platform's point-to-point queues.

    Broker address and login are read from the JSON file named by the
    module-level ``RMQFile``.  Every operation opens its own connection and
    closes it again before returning, so an instance keeps no socket open
    between calls.

    All traffic uses the default exchange (``''``) with the queue name as the
    routing key.  The ``exchange_name`` parameters are kept for interface
    compatibility only; they are currently ignored.
    """

    def __init__(self):
        """Load the broker settings and declare the standard queues.

        Declares ``AD_SM`` plus both directions of the ``SM``/``Docker`` and
        ``SM``/``Scheduler`` queue pairs.

        Raises:
            OSError: ``RMQFile`` cannot be opened.
            ValueError: ``RMQFile`` does not contain valid JSON.
            KeyError: one of ``IP``, ``Port``, ``username``, ``password`` is
                missing from the file.
        """
        with open(RMQFile, 'r') as f:
            data = json.load(f)
        self.server_IP = data["IP"]
        self.server_Port = data["Port"]
        self.credentials = pika.PlainCredentials(data["username"], data["password"])
        self.create_queue("", "AD_SM")
        self.create_ServiceQueues("SM", "Docker")
        self.create_ServiceQueues("SM", "Scheduler")

    @staticmethod
    def _close(connection):
        """Close ``connection`` unless it has already been closed."""
        # Closing an already-closed connection raises in recent pika
        # releases, which would mask the error that closed it.
        if connection.is_open:
            connection.close()

    def create_queue(self, exchange_name, queue_name):
        """Declare the durable queue ``queue_name`` on the broker.

        Args:
            exchange_name: Ignored; the default exchange is always used.
            queue_name: Name of the queue to declare.
        """
        channel, conn = self.create_connection()
        try:
            channel.queue_declare(queue=queue_name, durable=True)
        finally:
            self._close(conn)

    def create_ServiceQueues(self, Module1, Module2):
        """Declare the two queues ``<Module1>_<Module2>`` and ``<Module2>_<Module1>``.

        Args:
            Module1: Name of the first module (string).
            Module2: Name of the second module (string).
        """
        self.create_queue("", Module1 + "_" + Module2)
        self.create_queue("", Module2 + "_" + Module1)

    def create_connection(self):
        """Open a new blocking connection to the configured broker.

        Returns:
            A ``(channel, connection)`` tuple.  The caller owns the
            connection and is responsible for closing it.
        """
        params = pika.ConnectionParameters(
            self.server_IP, self.server_Port, '/', self.credentials
        )
        connection = pika.BlockingConnection(params)
        try:
            channel = connection.channel()
        except Exception:
            # Do not leave the socket open when the channel cannot be created.
            self._close(connection)
            raise
        return channel, connection

    def send(self, exchange_name, queue_name, message):
        """Publish ``message`` to ``queue_name`` through the default exchange.

        The queue is declared first so that publishing to a queue that does
        not exist yet does not silently drop the message.

        Args:
            exchange_name: Ignored; the default exchange is always used.
            queue_name: Destination queue, used as the routing key.
            message: Message body handed to ``basic_publish``.
        """
        channel, conn = self.create_connection()
        try:
            # Declare on the channel we already hold instead of opening a
            # second connection just for the declaration.
            channel.queue_declare(queue=queue_name, durable=True)
            channel.basic_publish(exchange='', routing_key=queue_name, body=message)
            print(" [x] Sent", message)
        finally:
            self._close(conn)

    def receive_nonblock(self, exchange_name, queue_name):
        """Fetch one message from ``queue_name`` without registering a consumer.

        NOTE: despite the name, this call does not return until a message is
        available; it keeps polling the queue with ``basic_get``.  The
        message is auto-acknowledged.

        Args:
            exchange_name: Ignored; the default exchange is always used.
            queue_name: Queue to read from.

        Returns:
            The body of the first message obtained.
        """
        channel, conn = self.create_connection()
        try:
            channel.queue_declare(queue=queue_name, durable=True)
            body = None
            while body is None:
                # An empty queue yields (None, None, None); poll again.
                _, _, body = channel.basic_get(queue_name, True)
        finally:
            self._close(conn)
        print("In queue:", type(body))
        return body

    def receive(self, callback, exchange_name, queue_name):
        """Consume ``queue_name`` forever, passing each message to ``callback``.

        Messages are auto-acknowledged.  The call returns only when consuming
        stops (for example on CTRL+C, which propagates as
        ``KeyboardInterrupt``); the connection is closed on the way out.

        Args:
            callback: Message handler passed to ``basic_consume``.
            exchange_name: Ignored; the default exchange is always used.
            queue_name: Queue to consume from.
        """
        channel, conn = self.create_connection()
        try:
            channel.queue_declare(queue=queue_name, durable=True)
            try:
                channel.basic_consume(
                    on_message_callback=callback, queue=queue_name, auto_ack=True
                )
            except TypeError:
                # Older pika releases use a positional callback and `no_ack`;
                # the newer keywords are rejected with TypeError there.
                channel.basic_consume(callback, queue=queue_name, no_ack=True)
            print(' [*] Waiting for messages. To exit press CTRL+C')
            channel.start_consuming()
        finally:
            self._close(conn)

    def queue_length(self, exchange_name, queue_name):
        """Return the message count the broker reports for ``queue_name``.

        The count is taken from the reply to a (re-)declaration of the
        durable queue, so the queue is created if it does not exist yet.

        Args:
            exchange_name: Ignored; the default exchange is always used.
            queue_name: Queue to inspect.

        Returns:
            ``message_count`` from the broker's declare reply.
        """
        channel, conn = self.create_connection()
        try:
            declared = channel.queue_declare(queue=queue_name, durable=True)
            return declared.method.message_count
        finally:
            self._close(conn)