# <font color='dimgrey'> RPC using RabbitMQ </font>


## <font color='grey'> What's an RPC </font><br>
<font color='grey'>
RPC stands for remote procedure call: the client sends a request to a remote server and blocks until the server's response arrives.  </font>

## <font color='grey'> How to make an rpc using rabbitmq </font><br>

<font color='grey'>
It can be summarized in the following steps  <br>
    1. A client publishes a message to the rpc queue with a callback queue name in the request and a correlation_id<br>
    2. The server, which is listening on the rpc queue, receives the request together with the callback queue name and the correlation_id. It processes the request and publishes the result to the callback queue, tagged with the same correlation_id<br>
    3. The client, which is listening on the callback queue, receives the response along with its correlation_id.<br>
    4. The client compares the received correlation_id with the one it sent in the first step; if they match, it saves the response/answer
</font>

<img src="https://www.rabbitmq.com/img/tutorials/python-six.png"/>

## <font color='grey'> Step 1: Client sends a request with callback queue and correlation_id </font><br>


In [None]:
class FibonacciRpcClient(object):
    """Step-1 view of the RPC client: declare a callback queue and send a request.

    NOTE: this fragment does not register a consumer on the callback queue,
    so nothing in it ever assigns ``self.response``; ``call`` only returns
    once a response handler (added in Step 3 of this notebook) sets it.
    """

    def __init__(self):
        """Connect to the broker on localhost and declare the callback queue."""
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost'))

        self.channel = self.connection.channel()

        # Declaring the callback queue here. It is declared with an empty
        # name and as exclusive; the name actually assigned is read back
        # from the declare result.
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

    def call(self, n):
        """Publish a request for fib(n) to ``rpc_queue`` and wait for the reply.

        Args:
            n: value sent to the server as ``str(n)``.

        Returns:
            The reply body converted with ``int``.
        """
        self.response = None
        # A fresh correlation id per request lets the reply be matched to it.
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                # the server publishes its answer to the queue named here
                reply_to=self.callback_queue,
                # echoed back by the server in the reply's properties
                correlation_id=self.corr_id,
            ),
            body=str(n))
        while self.response is None:
            # time_limit=None makes this call block until the connection has
            # events to process; the default (0) returns immediately and
            # would turn this loop into a CPU-burning busy-wait.
            self.connection.process_data_events(time_limit=None)
        return int(self.response)

## <font color='grey'> Step 2: Server receives request, callback queue and corr_id </font><br>


In [1]:
import pika

# Open a blocking connection to the broker running on localhost.
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))

# All queue operations below go through this channel.
channel = connection.channel()
# Declare 'rpc_queue', the queue the clients publish their requests to
# (they use it as routing_key).
channel.queue_declare(queue='rpc_queue')

def fib(n):
    """Return the n-th Fibonacci number, with fib(0) == 0 and fib(1) == 1.

    Uses the naive double recursion, so the running time grows
    exponentially with ``n``.

    Args:
        n: non-negative integer index into the sequence.

    Raises:
        ValueError: if ``n`` is negative. Without this guard the recursion
            never reaches a base case and ends in a RecursionError.
    """
    if n < 0:
        raise ValueError("fib() is not defined for negative n: %r" % n)
    if n < 2:
        # base cases: fib(0) == 0, fib(1) == 1
        return n
    return fib(n - 1) + fib(n - 2)
    
def on_request(ch, method, props, body):
    """Handle one RPC request: compute fib of the body and publish the reply.

    Args:
        ch: channel the request arrived on; used to publish and to ack.
        method: delivery information; its ``delivery_tag`` is acknowledged.
        props: request properties; ``reply_to`` and ``correlation_id`` are read.
        body: request payload, parsed with ``int``.
    """
    number = int(body)
    print(" [.] fib(%s)" % number)

    # Compute the Fibonacci number that was asked for.
    answer = fib(number)

    # Echo the caller's correlation id so it can match the reply to its request.
    reply_properties = pika.BasicProperties(
        correlation_id=props.correlation_id)

    # The reply goes through the default exchange to the queue named in reply_to.
    ch.basic_publish(
        exchange='',
        routing_key=props.reply_to,
        properties=reply_properties,
        body=str(answer),
    )

    # Acknowledge the request only after the reply has been published.
    ch.basic_ack(delivery_tag=method.delivery_tag)

# prefetch_count=1: presumably so the broker hands this worker only one
# unacknowledged request at a time (see pika's basic_qos documentation).
channel.basic_qos(prefetch_count=1)
# Register on_request as the callback invoked for every message received
# on 'rpc_queue'.
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print(" [x] Awaiting RPC requests")
# Waiting for RPC requests: enter the consume loop that dispatches
# deliveries to on_request.
channel.start_consuming()

## <font color='grey'> Step 3: Client waits for the response and compares the correlation id </font><br>


In [None]:
class FibonacciRpcClient(object):
    """Step-3 view of the RPC client: listen on the callback queue for replies.

    Only the receiving side is shown here; this fragment defines no method
    that sends a request.
    """

    def __init__(self):
        """Connect to the broker, declare the callback queue and consume from it."""
        # No request is in flight yet. Set both attributes before consuming
        # starts so that on_response never reads an undefined attribute.
        self.response = None
        self.corr_id = None

        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost'))

        self.channel = self.connection.channel()

        # Declaring the callback queue here (empty name, exclusive); the name
        # actually assigned is read back from the declare result.
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

        # Listening to the callback queue: every delivery is passed to
        # on_response.
        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True)

    def on_response(self, ch, method, props, body):
        """Store ``body`` as the response if it answers the current request.

        The reply's correlation id is compared with the one generated for the
        request; replies carrying any other id are ignored.
        """
        if self.corr_id == props.correlation_id:
            self.response = body

<font color='dimgrey'> <h2> Final Code : Client </h2> </font>

In [None]:
import pika
import uuid

class FibonacciRpcClient(object):
    """Blocking RPC client that asks the ``rpc_queue`` server for fib(n).

    Each instance owns one connection, one channel and one exclusive
    callback queue on which the replies arrive.
    """

    def __init__(self):
        """Connect to the broker, declare the callback queue and consume from it."""
        # No request is in flight yet. Set both attributes before consuming
        # starts so that on_response never reads an undefined attribute.
        self.response = None
        self.corr_id = None

        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost'))

        self.channel = self.connection.channel()

        # The callback queue is declared with an empty name and as exclusive;
        # the name actually assigned is read back from the declare result.
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

        # Every delivery on the callback queue is passed to on_response.
        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True)

    def on_response(self, ch, method, props, body):
        """Store ``body`` as the response if it answers the current request.

        Replies whose correlation id differs from ``self.corr_id`` are ignored.
        """
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        """Request fib(n) from the server and block until the reply arrives.

        Args:
            n: value sent to the server as ``str(n)``.

        Returns:
            The reply body converted with ``int``.
        """
        self.response = None
        # A fresh correlation id per request lets on_response pick out the
        # reply that belongs to it.
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                # the server publishes its answer to the queue named here
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=str(n))
        while self.response is None:
            # time_limit=None blocks until the connection has events to
            # process; the default (0) returns immediately and would turn
            # this loop into a CPU-burning busy-wait.
            self.connection.process_data_events(time_limit=None)
        return int(self.response)

# Creating the client connects to the broker, declares its callback queue
# and starts consuming replies from it.
fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
# call() does not return until a reply with the matching correlation id has
# been received; the result is the reply body converted to int.
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)

<font color='dimgrey'> <h2> Final Code : Server </h2> </font>

In [None]:
import pika

# Open a blocking connection to the broker running on localhost.
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))

# All queue operations below go through this channel.
channel = connection.channel()
# The server declares 'rpc_queue', the queue clients publish requests to,
# so the server needs to be started before any client sends a request.
channel.queue_declare(queue='rpc_queue')

# finds fibonacci
def fib(n):
    """Return the n-th Fibonacci number, with fib(0) == 0 and fib(1) == 1.

    Uses the naive double recursion, so the running time grows
    exponentially with ``n``.

    Args:
        n: non-negative integer index into the sequence.

    Raises:
        ValueError: if ``n`` is negative. Without this guard the recursion
            never reaches a base case and ends in a RecursionError.
    """
    if n < 0:
        raise ValueError("fib() is not defined for negative n: %r" % n)
    if n < 2:
        # base cases: fib(0) == 0, fib(1) == 1
        return n
    return fib(n - 1) + fib(n - 2)

# callback run for every request received from a client
def on_request(ch, method, props, body):
    """Handle one RPC request: compute fib of the body and publish the reply.

    Args:
        ch: channel the request arrived on; used to publish and to ack.
        method: delivery information; its ``delivery_tag`` is acknowledged.
        props: request properties; ``reply_to`` and ``correlation_id`` are read.
        body: request payload, parsed with ``int``.
    """
    number = int(body)
    print(" [.] fib(%s)" % number)
    answer = fib(number)

    # Echo the caller's correlation id so it can match the reply to its request.
    reply_properties = pika.BasicProperties(
        correlation_id=props.correlation_id)

    # The reply is published to the callback queue named in reply_to.
    ch.basic_publish(
        exchange='',
        routing_key=props.reply_to,
        properties=reply_properties,
        body=str(answer),
    )

    # Acknowledge the request only after the reply has been published.
    ch.basic_ack(delivery_tag=method.delivery_tag)

# prefetch_count=1: presumably so the broker hands this worker only one
# unacknowledged request at a time (see pika's basic_qos documentation).
channel.basic_qos(prefetch_count=1)
# Register on_request as the callback for every message on 'rpc_queue'.
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print(" [x] Awaiting RPC requests")
# Enter the consume loop that dispatches deliveries to on_request.
channel.start_consuming()

## <font color='dimgrey'> Testing </font> 
<font color='dimgrey'> We will run the client code from this notebook and start separate server nodes to serve the requests. Run the code below only after the server nodes have declared their queues. </font> 

In [9]:
import pika
import uuid

class FibonacciRpcClient(object):
    """Blocking RPC client that asks the ``rpc_queue`` server for fib(n).

    Each instance owns one connection, one channel and one exclusive
    callback queue on which the replies arrive.
    """

    def __init__(self):
        """Connect to the broker, declare the callback queue and consume from it."""
        # No request is in flight yet. Set both attributes before consuming
        # starts so that on_response never reads an undefined attribute.
        self.response = None
        self.corr_id = None

        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost'))

        self.channel = self.connection.channel()

        # The callback queue is declared with an empty name and as exclusive;
        # the name actually assigned is read back from the declare result.
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

        # Every delivery on the callback queue is passed to on_response.
        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True)

    def on_response(self, ch, method, props, body):
        """Store ``body`` as the response if it answers the current request.

        Replies whose correlation id differs from ``self.corr_id`` are ignored.
        """
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        """Request fib(n) from the server and block until the reply arrives.

        Args:
            n: value sent to the server as ``str(n)``.

        Returns:
            The reply body converted with ``int``.
        """
        self.response = None
        # A fresh correlation id per request lets on_response pick out the
        # reply that belongs to it.
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                # the server publishes its answer to the queue named here
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=str(n))
        while self.response is None:
            # time_limit=None blocks until the connection has events to
            # process; the default (0) returns immediately and would turn
            # this loop into a CPU-burning busy-wait.
            self.connection.process_data_events(time_limit=None)
        return int(self.response)

# Creating the client connects to the broker, declares its callback queue
# and starts consuming replies from it.
fibonacci_rpc = FibonacciRpcClient()

# Keep the argument in one place so the log line always names the value that
# is actually requested (the message used to say 35 while 40 was sent).
fib_input = 40
print(" [x] Requesting fib(%d)" % fib_input)
# call() does not return until the matching reply has been received.
response = fibonacci_rpc.call(fib_input)
print(" [.] Got %r" % response)

 [x] Requesting fib(35)
 [.] Got 102334155


The client blocks until the RPC call has completed and the response has arrived