In [8]:
#from sharkfin.utilities import *
#from sharkfin.markets import AbstractMarket
import numpy as np
import json
import uuid
import time
import os
import uuid
import threading
import session_info
from azure.servicebus import ServiceBusClient, ServiceBusMessage
from azure.servicebus.management import ServiceBusAdministrationClient
from azure.keyvault.secrets import SecretClient
from azure.identity import DefaultAzureCredential

class ClientRPCMarket():
    def __init__(self, seed_limit='', session_id='', keyVault=''):
        self.simulation_price_scale = 1
        self.default_sim_price = 100
        seed_limit = None
        self.sample = 0
        self.seeds = []
        self.seed_limit = seed_limit
        self.latest_price = None
        self.prices = [self.default_sim_price]
        self.rpc_session_id = session_id
        self.rpc_send_queue_name = session_id +'_requests'
        self.rpc_response_queue_name = session_id +'_responses'
        self.rpc_messageBus_namespace = 'sharfinmq'
        self.connection_str = self.get_rpc_connectionString(keyVault)  
        self.init_az_rpc()
        self.create_queues()
        #setup lists for sent and received messages.
        self.sentMsgHistory = []
        self.recvMsgHistory = []
    
    #method to initialize Azure Service Bus Clients for messageing and managment
    def init_az_rpc(self):
        self.service_bus_mgmt_client = ServiceBusAdministrationClient.from_connection_string(self.connection_str)
        self.service_bus_message_client = ServiceBusClient.from_connection_string(conn_str=self.connection_str, logging_enable=True)
   

    #method to create and delete the required sending and response queues for RPC pattern
    def create_queues(self):
        #create queue for sharkfin to send daily values
        self.service_bus_mgmt_client.create_queue(self.rpc_send_queue_name, max_delivery_count=10, dead_lettering_on_message_expiration=True)
        print (f"Created request queue: {self.rpc_send_queue_name}")
        
        #create queue for amps response - 
        self.service_bus_mgmt_client.create_queue(self.rpc_response_queue_name, max_delivery_count=10, dead_lettering_on_message_expiration=True)
        print (f"Created response queue: {self.rpc_send_queue_name}")
    
    def delete_queues(self):
        #delete queue for sharkfin daily values
        self.service_bus_mgmt_client.delete_queue(self.rpc_send_queue_name)
        print (f"Deleted request queue: {self.rpc_send_queue_name}")
        
        #delete used queue for amps response - 
        self.service_bus_mgmt_client.delete_queue(self.rpc_response_queue_name)
        print (f"Deleted response queue: {self.rpc_send_queue_name}")
        
    #method to instanciate a well-formed service bus message. requires passing json object as msg_body parameter
    def new_rpc_message(self,msg_body):
        msgdata = json.dumps(msg_body)
        self.service_bus_message = ServiceBusMessage(
        msgdata,
        session_id = self.rpc_session_id,
        reply_to = self.rpc_response_queue_name,
        reply_to_session_id = self.rpc_session_id,
        application_properties = {'placeholdermetadata': 'custom_data_example_if_needed'})

    #method to send a service bus message
    def send_rpc_message(self):
        sender = self.service_bus_message_client.get_queue_sender(queue_name=self.rpc_send_queue_name)
        result = sender.send_messages(self.service_bus_message)
        #self.coorelation_id = result.correlation_id
        #self.message_id = result.message_id
        #print (f"Sent RPC message with ID: {self.message_id} to consumer queue {self.rpc_send_queue_name} await reply into response queue: {self.rpc_response_queue_name}...")
        print (f"Sending RPC message to consumer queue {self.rpc_send_queue_name} await reply into response queue: {self.rpc_response_queue_name}...")
        print(self.service_bus_message.message_id)
        print(f"Receiving: {self.service_bus_message}")
        print(f"Time to live: {self.service_bus_message.time_to_live}")
        print(f"Application Properties: {self.service_bus_message.application_properties}")
        print(f"Message ID: {self.service_bus_message.message_id}")
        return result
        
    def get_rpc_response(self, service_bus_client, qname, coorelation_id):
        receiver = service_bus_client.get_queue_receiver(queue_name=self.rpc_response_queue_name, max_wait_time=5)
        for msg in receiver:
            if self.coorelation_id == msg.correlation_id:
                self.response = body
        print("Received: " + str(msg))
        receiver.complete_message(msg)
    
    def get_rpc_connectionString(self,keyVaultName):
        keyVaultUri = (f"https://{keyVaultName}.vault.azure.net")
        credential = DefaultAzureCredential()
        client = SecretClient(vault_url=keyVaultUri, credential=credential)
        retrieved_secret_bus_conectionString = client.get_secret('sharkfinMQconnectionstring')
        sharkfinMQconnectionstring = retrieved_secret_bus_conectionString.value
        print('opened vault, obtained service bus connection string')
        return sharkfinMQconnectionstring

    def check_listener(self):
        listener = self.service_bus_message_client.get_queue_receiver(queue_name=self.rpc_response_queue_name)
        print('Starting Service Busl listener...waiting for message.')
        received_msgs = listener.receive_messages(max_message_count=1)
        for message in received_msgs:
            global msgBody
            print(f"Receiving: {message}")
            print(f"Time to live: {message.time_to_live}")
            print(f"Sequence number: {message.sequence_number}")
            print(f"Enqueued Sequence number: {message.enqueued_sequence_number}")
            print(f"Partition Key: {message.partition_key}")
            print(f"Application Properties: {message.application_properties}")
            print(f"Delivery count: {message.delivery_count}")
            print(f"Message ID: {message.message_id}")
            msgBody = str(message)
            #print(f"Received the follow message: {msgBody}")
            listener.complete_message(message)
    
    def complete_message(self):
        listener.complete_message(msg)
        
def rpc_unitTest(simulation_days, simID, kvnamespace):
    #instansiate the Market class
    mc= ClientRPCMarket(4,simID,kvnamespace)
    days = range(1,simulation_days+1)
    for day in days:
        print(f"Preparing Message {day}")
        #msgID = str(uuid.uuid4())
        #construct message data
        jsonmsgdata = {'seed': simID, 'bl': 11, 'sl': 12, 'end_simulation':False}
        #print(jsonmsgdata)
        #msgdata = json.dumps(jsonmsgdata)
        #print(msgdata)
        sbMessage = mc.new_rpc_message(jsonmsgdata)
        sentMsg = mc.send_rpc_message()
        msgID = mc.service_bus_message.message_id
        print(f"Sent message to AMMPS. Message ID:{msgID}")
        fullmsgdata= {'day':day,'msgID':jsonmsgdata}
        fullmsgdata.update(jsonmsgdata)
        mc.sentMsgHistory.append(fullmsgdata)
        responseThread = threading.Thread(target=mc.check_listener())
        responseThread.start()                                       
        # waiting here for the response to be available in the response queue before continuing
        responseThread.join()                                          
        receivedMessage = msgBody
        print(f"Received response from AMMPS: {receivedMessage}")
        jsonreceivedMessage = json.loads(receivedMessage)
        respfullmsgdata= {'day':day,'msgID':'placeholdervalue'}
        respfullmsgdata.update(jsonreceivedMessage)
        mc.recvMsgHistory.append(respfullmsgdata)
    print(f"Printing list of received messages: {mc.recvMsgHistory}")

In [9]:
rpc_unitTest(3,'chum3031','sharkfinkv')

opened vault, obtained service bus connection string
Created request queue: chum3031_requests
Created response queue: chum3031_requests
Preparing Message 1
Sending RPC message to consumer queue chum3031_requests await reply into response queue: chum3031_responses...
84c152f8-1cde-4a33-a789-b17dc025a8ab
Receiving: {"seed": "chum3031", "bl": 11, "sl": 12, "end_simulation": false}
Time to live: None
Application Properties: {'placeholdermetadata': 'custom_data_example_if_needed'}
Message ID: 84c152f8-1cde-4a33-a789-b17dc025a8ab
Sent message to AMMPS. Message ID:84c152f8-1cde-4a33-a789-b17dc025a8ab
Starting Service Busl listener...waiting for message.
Receiving: {"seed": "chum3031", "bl": 11, "sl": 12, "end_simulation": false}
Time to live: None
Sequence number: 1
Enqueued Sequence number: None
Partition Key: None
Application Properties: None
Delivery count: 0
Message ID: bb74ad9d43e14f85b2288d419cf4cb4b
Received response from AMMPS: {"seed": "chum3031", "bl": 11, "sl": 12, "end_simulation"

In [None]:
#for testing and QA - use this to delete queues in bulk
#!az login --use-device-code
#connection_str = get_rpc_connectionString('sharkfinkv')
#service_bus_mgmt_client = ServiceBusAdministrationClient.from_connection_string(connection_str)
#ql = service_bus_mgmt_client.list_queues()
#for qp in ql:
#    print(f"Found queue: {qp.name}")
#    service_bus_mgmt_client.delete_queue(qp.name)
#    print(f"Deleted queue: {qp.name}")

def list_queues():
    connection_str = get_rpc_connectionString('sharkfinkv')
    service_bus_mgmt_client = ServiceBusAdministrationClient.from_connection_string(connection_str)   
    qlist = service_bus_mgmt_client.list_queues()
    print (f"Found the following Queues:{qlist}")
    

def delete_queue(queue_name):
    connection_str = get_rpc_connectionString('sharkfinkv')
    service_bus_mgmt_client = ServiceBusAdministrationClient.from_connection_string(connection_str)   
    service_bus_mgmt_client.delete_queue(queue_name)
    print (f"Deleted queue: {queue_name}")
    
def get_rpc_connectionString(keyVaultName):
    keyVaultUri = (f"https://{keyVaultName}.vault.azure.net")
    credential = DefaultAzureCredential()
    client = SecretClient(vault_url=keyVaultUri, credential=credential)
    retrieved_secret_bus_conectionString = client.get_secret('sharkfinMQconnectionstring')
    sharkfinMQconnectionstring = retrieved_secret_bus_conectionString.value
    print('opened vault, obtained service bus connection string')
    return sharkfinMQconnectionstring





In [6]:
rpc_unitTest(3,'chum3030','sharkfinkv')

opened vault, obtained service bus connection string
Created request queue: chum3030_requests
Created response queue: chum3030_requests
Preparing Message 1
Sending RPC message to consumer queue chum3030_requests await reply into response queue: chum3030_responses...
3b92b276-3369-4cb9-9209-4c9e20c7e565
Receiving: {"seed": "chum3030", "bl": 11, "sl": 12, "end_simulation": false}
Time to live: None
Application Properties: {'placeholdermetadata': 'custom_data_example_if_needed'}
Message ID: 3b92b276-3369-4cb9-9209-4c9e20c7e565
Sent message to AMMPS. Message ID:3b92b276-3369-4cb9-9209-4c9e20c7e565
Starting Service Busl listener...waiting for message.
Receiving: {"seed": "chum3030", "bl": 11, "sl": 12, "end_simulation": false}
Time to live: None
Sequence number: 1
Enqueued Sequence number: None
Partition Key: None
Application Properties: None
Delivery count: 0
Message ID: 7704f279b07a4a5ebddf93dec55d95fb
Received response from AMMPS: {"seed": "chum3030", "bl": 11, "sl": 12, "end_simulation"

ConnectionClose("ErrorCodes.ConnectionCloseForced: The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:36360e6601c843b7932d0cbfd92b0a50_G0, SystemTracker:gateway7, Timestamp:2022-07-21T14:13:39")
ConnectionClose("ErrorCodes.ConnectionCloseForced: The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:10e2e7923a5a4929abd757e8097e5a0d_G20, SystemTracker:gateway7, Timestamp:2022-07-21T14:13:18")
ConnectionClose("ErrorCodes.ConnectionCloseForced: The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:927ed8417d5849e394f81465c97aa349_G11, SystemTracker:gateway7, Timestamp:2022-07-21T14:14:39")
ConnectionClose("ErrorCodes.ConnectionCloseForced: The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:f83c4f5

In [None]:
!az login --use-device-code

#mc.start_listener()
#mc.delete_queues()

In [None]:
#for testing and QA - use this to delete queues in bulk
#!az login --use-device-code
#connection_str = get_rpc_connectionString('sharkfinkv')
#service_bus_mgmt_client = ServiceBusAdministrationClient.from_connection_string(connection_str)
#ql = service_bus_mgmt_client.list_queues()
#for qp in ql:
#    print(f"Found queue: {qp.name}")
#    service_bus_mgmt_client.delete_queue(qp.name)
#   print(f"Deleted queue: {qp.name}")

In [None]:
#construct message data as json
msgdata = {'seed': 9017, 'bl': 10, 'sl': 10, 'end_simulation': False}
#convert json message to string
msgdata = json.dumps(msgdata)
#recieves the message body and constructs the SB message object
mc.new_rpc_message(msgdata)
sentMessage = mc.send_rpc_message()

while receivedMessage = mc.check_listener() = null
    time.sleep(3)



    




In [None]:
def rpc_unitTest(simulation_days, simID, kvnamespace):
    #instansiate the Market class
    mc= ClientRPCMarket(4,simID,kvnamespace)
    responseThread = threading.Thread(target=mc.check_listener())
    message_available = threading.Event()
    days = range(1,simulation_days+1)
    for day in days:
        msgID = str(uuid.uuid4())
        #construct message data
        jsonmsgdata = {'seed': simID, 'bl': 11, 'sl': 12, 'end_simulation':False}
        print(jsonmsgdata)
        msgdata = json.dumps(jsonmsgdata)
        print(msgdata)
        sentMessage = mc.new_rpc_message(msgdata)
        msgID = mc.service_bus_message.message_id
        print(f"Sent message to AMMPS. Message ID:{msgID}")
        fullmsgdata= {'day':day,'msgID':jsonmsgdata}
        fullmsgdata.update(jsonmsgdata)
        mc.sentMsgHistory.append(fullmsgdata)
        responseThread.start()                                       
        # wait here for the result to be available before continuing
        message_available.wait()
        responseThread.join()                                          
        receivedMessage = msgBody
        print(f"Received response from AMMPS: {receivedMessage}")
        jsonreceivedMessage = json.loads(receivedMessage)
        respfullmsgdata= {'day':day,'msgID':'placeholdervalue'}
        respfullmsgdata.update(jsonreceivedMessage)
        mc.receivedMsgHistory.append(respfullmsgdata)
        
        

In [None]:
rpc_unitTest(3,'chum10008','sharkfinkv')

In [None]:
msgdata = {'seed': 3, 'bl': 11, 'sl': 12, 'end_simulation':False}
msgdata = json.dumps(msgdata)
print(type(msgdata))
msgdata = json.loads(msgdata)
print(type(msgdata))

In [None]:
import threading

In [None]:
sentMsgHistory

In [None]:
sentMsgHistory=[]

In [None]:
sentMsgHistory