# Building a brokerless MOM

As explained in the last lecture, a MOM is a Message Oriented Middleware. We saw an example of a middleware that used dynamic brokers to decrease latency and prioritize quality of service. Since the network you will build for project 2 is unlikely to span several regions, we are going to use a brokerless MOM.

## 0MQ (Zero MQ)

ZMQ stands for Zero broker, message queue. Technically is not a MOM since it doen't run as a separate application or on a separat platform, therefore doesn't qualify fully as a middleware. Instead, ZMQ is used through multilanguage libraries and implemented directly into your own code. It's free, high performance, and has a very simple implementation. ZMQ is so good that it is part of Jupyter's own backend. There's a fully detailed guide here: http://zguide.zeromq.org/, however, we are going to learn the basics and how to setup a simple local message queue that you can use for your projects. Also, note that ZMQ has libraries for most common programming languages, if you have experience with C or Java, I recommend you setup your MOM using those with multithreading. In python I will show you how to use multiprocessing to spawn several servers but the uni-thread nature of Python will make your application hang while waiting for a message.


If you have Jupyter, it is very likely that you have ZMQ installed, but otherwise you will need to install it by running:



## How to run this notebook
Many parts of this notebook require parallel execution. For example, client-server interactions requires that both the client and server be running simultaneously. Since a Jupyter Notebook can only execute code blocks sequentially, please ensure to follow the instructions below:

1. Create two separate copies of this notebook on your Jetson device.
2. From one notebook, run the client/subscriber, and from the other notebook, run the server/publisher/collector/queue device/forwarder device/streamer device/task worker, as required, simultaneously.
3. Create additional copies of this notebook file if you need more simultaneous code block executions.

## Message patterns

ZMQ supports 4 types of message patterns. In lecture we only learned about Pub/Sub, but we'll go through each of the patterns here along with examples.

### PAIR

Running ZMQ using a PAIR pattern is similar to how conventional sockets work. This means that there's a unique, bidirectional connection between the server and the client. The server will send the message when the client connects to it so you don't have to worry about missing any messages. THe following diagram represents how this works:
![PAIR](https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/_images/pair.png)

Let's run a little example, we are going to build our client first which we will run on command prompt:

In [None]:
%%file pairclient.py

import zmq
import random
import sys
import time

port = "5556"
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.connect("tcp://localhost:%s" % port)

while True:
    msg = socket.recv()
    print (msg)
    socket.send_string("client message to server1")
    socket.send_string("client message to server2")
    time.sleep(1)


In [None]:
!python3.7 pairclient.py

Now lets build the server, we are going to send a message when we get confirmation that a client is connected.

In [None]:
import zmq
import random
import sys
import time

port = "5556"
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.bind("tcp://*:%s" % port)

while True:
    socket.send_string("Server message to client3")
    msg = socket.recv()
    print (msg)
    time.sleep(1)

Since we are killing the server after every run, make sure that you restart your kernel and import the libraries for each example.

As you can see, the server will not send a message until a client connects, then there's communication between the two. We are using the TCP protocol and we are using port 5556. You could use other protocols (for example UDP) but this is fine. 

Remember, we are running everything locally so we are connecting to localhost, but you must replace this with the actual ip of your server, this will be either your ethernet ip (if the RPi is connected to your computer), the ip your router assigns to you (if they're both connected to the same router), your public ip (if you are running on a server with a static public domain), or the virtual ip from your VPN.


### Client-Server

Also known as the Request/Reply model. In this one, the servers are passive until they receive a request from a client. The server will keep trying to respond to that request until it has confirmation or times out. 

Unlike PAIR, in Client-Server the client can connect to multiple servers. However, your requests and replies will hang and block communication until they have been fulfilled and confirmed or have timed-out.

![REREP](https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/_images/reqrep.png)

Lets go ahead and build our client. Is good practice to select your port form a command line argument so you don't have to modify your code every time, we will leave a default port but have the option to change it if necessary. Our client has the option to connect to two servers, but we won't use that for now.

In [None]:
%%file reqrep_client.py

import zmq
import sys

port = "5556"
if len(sys.argv) > 1:
    port =  sys.argv[1]
    int(port)

if len(sys.argv) > 2:
    port1 =  sys.argv[2]
    int(port1)

context = zmq.Context()
print ("Connecting to server...")
socket = context.socket(zmq.REQ)
socket.connect ("tcp://localhost:%s" % port)
if len(sys.argv) > 2: ##same socket object can be used to connect to both servers
    socket.connect ("tcp://localhost:%s" % port1)

    #  Do 10 requests, waiting each time for a response
for request in range (1,10):
    print( "Sending request ", request,"...")
    socket.send_string ("Hello")
    #  Get the reply.
    message = socket.recv()
    print ("Received reply ", request, "[", message, "]")


And now our server looks something like this:

In [None]:
import zmq
import time
import sys

port = "5556"
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:%s" % port)

while True:
    #  Wait for next request from client
    message = socket.recv()
    print ("Received request: ", message)
    time.sleep (1)  
    socket.send_string("World from %s" % port)

Try killing the server or the client before finishing and see what happens. When your client or server are not reliable your application will hang until you restart everything.

### Publish/Subscribe

The PAIR model is fine when you need a simple straightforward way to communicate between one client and one server. Req/Rep allows you to connect to several servers but communication is contingent to both confirming the receipt of their messages. They are ideal for static devices (such as computers within an organization), but for Edge Networks we need something more relaxed that allows us to connect and disconnect devices on the fly with minimal configuration. This is where Pub/Sub comes in.

Just as it names suggest, this model changes the role of the server to that of a publisher. The client is now a subscriber. The publisher will boradcast messages on different "topics" and the subscriber will listen to the messages form the topic it's subscribed to. The server doesn't care if their messages are received so doesn't need to verify that there's a subscriber present. You can have one subscriber listening to multiple mpublishers or multiple subscribers listening to the same publisher like so:

![PUBSUB](https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/_images/pubsub.png)

Lets build our sub slient, it will listen to the topic "1001" from our server:

In [None]:
%%file sub_client.py

import sys
import zmq

port = "5556"

# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)

print ("Collecting updates from weather server...")
socket.connect ("tcp://localhost:%s" % port)

# Subscribe to zipcode, default is NYC, 10001
topicfilter = "10001"
socket.setsockopt_string(zmq.SUBSCRIBE, topicfilter)

# Process 5 updates
total_value = 0
for update_nbr in range (5):
    string = socket.recv()
    topic, messagedata = string.split()
    total_value += int(messagedata)
    print (topic, messagedata)

print ("Average messagedata value for topic '%s' was %dF" % (topicfilter, total_value / update_nbr))


Our server will publish message with a random topic, one of those will be the topic our client subcsribes to.

In [None]:
import zmq
import random
import sys
import time

port = "5556"


context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:%s" % port)
while True:
    topic = random.randrange(9999,10005)
    messagedata = random.randrange(1,215) - 80
    print ("%d %d" % (topic, messagedata))
    socket.send_string("%d %d" % (topic, messagedata))
    time.sleep(1)

Notice that you can kill either process and the other will continue working. The major downside is that if there were messages posted on your topic before you subscribed to it you will lose them. And if topics are published faster than what you can process you will lose those messages, unless you are using TCP which will add the missed messages to a queue.

### Push/Pull

A more expensive model than pub/sub, but solves the main drawback of missed messages, is the Push/Pull model. In this model, there are three roles, the producer, the consumers and the result collector. The producer pushes at message downstream through the pipeline which is equally distributed among the consumers, then the consumers send the result of their computation to a result collector. This is a very efficient model for distributed computing since you ensure that all data is processed even if you have multiple consumers. The pipeline looks like this:

![PUSH](https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/_images/pushpull.png)

Now let's build our own system. This notebook will be the producer, but we have to create and start the consumer and the collector first, so lets start with the consumer:

In [None]:
%%file consumer.py

import time
import zmq
import random

def consumer():
    consumer_id = random.randrange(1,10005)
    print ("I am consumer #%s" % (consumer_id))
    context = zmq.Context()
    # recieve work
    consumer_receiver = context.socket(zmq.PULL)
    consumer_receiver.connect("tcp://127.0.0.1:5557")
    # send work
    consumer_sender = context.socket(zmq.PUSH)
    consumer_sender.connect("tcp://127.0.0.1:5558")
    
    while True:
        work = consumer_receiver.recv_json()
        data = work['num']
        result = { 'consumer' : consumer_id, 'num' : data}
        if data%2 == 0: 
            consumer_sender.send_json(result)

consumer()

Note that the consumer pulls from one port (the producer) and pushes to another (the collector).

The next module is our collector:

In [None]:
%%file resultcollector.py

import time
import zmq
import pprint

def result_collector():
    context = zmq.Context()
    results_receiver = context.socket(zmq.PULL)
    results_receiver.bind("tcp://127.0.0.1:5558")
    collecter_data = {}
    for x in range(1000):
        result = results_receiver.recv_json()
        if result['consumer'] in collecter_data :
            collecter_data[result['consumer']] = collecter_data[result['consumer']] + 1
        else:
            collecter_data[result['consumer']] = 1
        if x == 999:
            pprint.pprint(collecter_data)

result_collector()

No run each one on separate shells, then we can start our producer:

In [None]:
import time
import zmq

def producer():
    context = zmq.Context()
 
    zmq_socket = context.socket(zmq.PUSH)
    zmq_socket.bind("tcp://127.0.0.1:5557")
    # Start your result manager and workers before you start your producers
    for num in range(20000):
        work_message = { 'num' : num }
        zmq_socket.send_json(work_message)

producer()


Now that you know all four models, can you think of use cases for each one of them?

All the examples here are extracted from https://learning-0mq-with-pyzmq.readthedocs.io/, just note that the guide uses python 2. There are other uses and tools such as socket polling for when you have to monitor several sockets, or multiprocessing so you can generate multiple servers in the same kernel.

These are advanced techniques that are useful to learn, but for the scale of your project 2 we are going to focus on one last tool from ZMQ that will likely help you a lot. These are:

##ZMQ Devices

A ZMQ device works as a middle man between your server/publisher/producer and your client/subscriber/consumer. These are useful for several reasons, for example, maybe your server and your client are dynamic (ip is assigned on startup for example), and you can't rely on binding a port from either of them. You can use a third device with static ports that will serve as your server, from the point of vie of the client, and as a client, from the server's point of view.

Another use case for this is for asynchronous processing, your server can send a message to the device and continue working trusting that the device will deliver it to the client. In this case it works similar to a broker in a MOM, and you can use different messaging models from each end of the device.

There are three device configurations, let's look at each one:

### Queue

Queue devices are used for the request/reply model and it's in charge of forwarding client requests to the servers and replying with the message the server sends back.. The client and the server will look the same as the device-less model, but now they will connect to the device instead of each other. 
![queue](https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/_images/Queue.png)

Each script will be handled by a different kernel. Let us begin with the device:

In [None]:
%%file queue_device.py
import zmq

def main():

    try:
        context = zmq.Context(1)
        # Socket facing clients
        frontend = context.socket(zmq.XREP)
        frontend.bind("tcp://*:5559")
        # Socket facing services
        backend = context.socket(zmq.XREQ)
        backend.bind("tcp://*:5560")

        zmq.device(zmq.QUEUE, frontend, backend)
    except  e:
        print (e)
        print ("bringing down zmq device")
    finally:
        pass
        frontend.close()
        backend.close()
        context.term()

if __name__ == "__main__":
    main()

Notice that there is no sending nor requesting done by the device, instead it serves as a bridge between the client and the server. It is also important to remind you that the device, the client and the sevrer could be written in different languages as long as they adhere to ZMQ's API.

Let's build our client which is very similar to the one we had before:


In [None]:
%%file queue_client.py

import zmq
import sys
import random

port = "5559"
context = zmq.Context()
print ("Connecting to server...")
socket = context.socket(zmq.REQ)
socket.connect ("tcp://localhost:%s" % port)
client_id = random.randrange(1,10005)
#  Do 10 requests, waiting each time for a response
for request in range (1,10):
    print ("Sending request ", request,"..."  )  
    socket.send_string("Hello from %s" % client_id)
    #  Get the reply.
    message = socket.recv()
    print ("Received reply ", request, "[", message, "]")

Finally our server, nothing really new going on here:

In [None]:
import zmq
import time
import sys
import random

port = "5560"
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.connect("tcp://localhost:%s" % port)
server_id = random.randrange(1,10005)
while True:
    #  Wait for next request from client
    message = socket.recv()
    print( "Received request: ", message)
    time.sleep (1)  
    socket.send_string("World from server %s" % server_id)

As you can see the device is completely transparent and does nothing to the message being sent. 

### Forwarder

A forwarder device is similar to the queue but abides to the pub/sub model. Do note that topic filtering should still be done at the sub level, the forwarder should relay messages as they come form the server. 

![forwarder](https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/_images/forwarder.png)

Let us build the device:

In [None]:
%%file forwarder_device.py

import zmq

def main():

    try:
        context = zmq.Context(1)
        # Socket facing clients
        frontend = context.socket(zmq.SUB)
        frontend.bind("tcp://*:5559")
        
        frontend.setsockopt_string(zmq.SUBSCRIBE, "")
        
        # Socket facing services
        backend = context.socket(zmq.PUB)
        backend.bind("tcp://*:5560")

        zmq.device(zmq.FORWARDER, frontend, backend)
    except e:
        print (e)
        print ("bringing down zmq device")
    finally:
        pass
        frontend.close()
        backend.close()
        context.term()

if __name__ == "__main__":
    main()

As you can see, the devices have a very similar structure, the only difference is which ZMQ object you use for the bridges. Our client is a generic sub client:

In [None]:
%%file forwarder_subscriber.py

import sys
import zmq

port = "5560"
# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)
print ("Collecting updates from server...")
socket.connect ("tcp://localhost:%s" % port)
topicfilter = "9"
socket.setsockopt_string(zmq.SUBSCRIBE, topicfilter)
for update_nbr in range(10):
    string = socket.recv()
    topic, messagedata = string.split()
    print (topic, messagedata)  

Finally the server:

In [None]:
import zmq
import random
import sys
import time

port = "5559"
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.connect("tcp://localhost:%s" % port)
publisher_id = random.randrange(0,9999)
while True:
    topic = random.randrange(1,10)
    messagedata = "server#%s" % publisher_id
    print ("%s %s" % (topic, messagedata))
    socket.send_string("%d %s" % (topic, messagedata))
    time.sleep(1)

### Streamer

Our last device was designed for the push/pull model. The streamer can handle the push from several servers and push to multiple workers.

![streamer](https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/_images/streamer.png)

We will skip the result collector for now but we are going to start with the streamer:

In [None]:
%%file streamer_device.py

import zmq

def main():

    try:
        context = zmq.Context(1)
        # Socket facing clients
        frontend = context.socket(zmq.PULL)
        frontend.bind("tcp://*:5559")
        
        # Socket facing services
        backend = context.socket(zmq.PUSH)
        backend.bind("tcp://*:5557")

        zmq.device(zmq.STREAMER, frontend, backend)
    except  e:
        print (e)
        print ("bringing down zmq device")
    finally:
        pass
        frontend.close()
        backend.close()
        context.term()

if __name__ == "__main__":
    main()

In [None]:
%%file task_worker.py

import sys
import time
import zmq
import random

def consumer():
    consumer_id = random.randrange(1,10005)
    print ("I am consumer #%s" % (consumer_id))
    context = zmq.Context()
    # recieve work
    consumer_receiver = context.socket(zmq.PULL)
    consumer_receiver.connect("tcp://127.0.0.1:5557")
    while True:
        work = consumer_receiver.recv_json()
        data = work['num']
        result = { 'consumer' : consumer_id, 'num' : data}
        print (result)
consumer()

In [None]:
import time
import zmq

def producer():
    context = zmq.Context()
    zmq_socket = context.socket(zmq.PUSH)
    zmq_socket.connect("tcp://127.0.0.1:5559")
    # Start your result manager and workers before you start your producers
    for num in range(20000):
        work_message = { 'num' : num }
        zmq_socket.send_json(work_message)
        time.sleep(1)

producer()

Final note, you may have noticed that we used bind and connect interchangeably. The first one means that you are dedicating that port so others connect to you, and connect needs one side bound in order to work. It doesn't matter which side does it but ideally it will be the more stable and static one.