In [143]:
%%writefile config.py


import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--com',  help = "Directory of the command file")
parser.add_argument('-o', help = "Directory for outputs")
parser.add_argument('--id', help = "An id for client e.g. c1")
parser.add_argument('--nc', help = "Expected numebr of clients", type = int)

def get_config():
    config, unparsed = parser.parse_known_args()
    return config, unparsed

Overwriting config.py


In [146]:
%%writefile F_R.py

import zmq 
import threading
from config import get_config

class brokers(object):
    def __init__(self, config):
        self.SUBSCRIBERS_EXPECTED = config.nc
        
        
    def forwarder_tom(self):
        try:
            self.context = zmq.Context() # Socket facing clients
            self.frontend = self.context.socket(zmq.SUB)
            self.frontend.bind("tcp://*:5559")
            self.frontend.setsockopt(zmq.SUBSCRIBE, b"")
            # Socket facing servics
            self.backend = self.context.socket(zmq.PUB)
            self.backend.bind("tcp://*:5560")
    

            # Socket to receive signals
            self.syncservice = self.context.socket(zmq.REP)
            self.syncservice.bind('tcp://*:5561')

            # Get synchronization from subscribers
            subscribers = 0
            while subscribers < self.SUBSCRIBERS_EXPECTED:
                # wait for synchronization request
                msg = self.syncservice.recv()
                # send synchronization reply
                self.syncservice.send(b'')
                subscribers += 1
                print("+1 subscriber (%i/%i)" % (subscribers, self.SUBSCRIBERS_EXPECTED))
            self.backend.send("start".encode())  

            zmq.proxy(self.frontend, self.backend)

        except e:
            print(e)
            print("bringing down zmq device") 
        finally:
            pass
            self.frontend.close()
            self.backend.close()
            self.context.term()
        
        
    def router_uno(self):
        self.context = zmq.Context()
        self.router = self.context.socket(zmq.ROUTER)
        self.router.bind("tcp://*:5562")

        # Initialize poll set
        self.poller = zmq.Poller()
        self.poller.register(self.router, zmq.POLLIN)

        # Switch messages between sockets
        while True:
            self.socks = dict(self.poller.poll())

            if self.socks.get(self.router) == zmq.POLLIN:
                sender_id, empty, receiver_id, message = self.router.recv_multipart()
                print("Received ", message.decode("ascii"), " from ", sender_id.decode("ascii"), "!")
                print( "Forwarding to ", receiver_id.decode("ascii"))
                self.router.send_multipart([receiver_id, b'',sender_id, message])
        

if __name__ == "__main__":
    
    config, _ = get_config()
    
    b = brokers(config)
    threading.Thread(target = b.forwarder_tom , args = ()).start()
    threading.Thread(target = b.router_uno , args = ()).start()

Overwriting F_R.py


In [147]:
%%writefile client.py 

import zmq
import random
import sys
import time

class client_(object):
    def __init__(self, config):
        self.identity = config.id
        self.c_socket()
        
    def c_socket(self):
        self.context = zmq.Context()
        
        # Multicasting
        self.client_receiver = self.context.socket(zmq.SUB)
        self.client_receiver.connect("tcp://localhost:5560")
        self.client_receiver.setsockopt(zmq.SUBSCRIBE, b'')
        
        self.client_sender = self.context.socket(zmq.PUB)
        self.client_sender.connect("tcp://localhost:5559")
        
        # unordered messaging
        self.client_deal = self.context.socket(zmq.DEALER)
        self.client_deal.setsockopt(zmq.IDENTITY, (self.identity).encode())
        self.client_deal.connect("tcp://localhost:5562")
        
        # synchronization with publisher
        self.syncclient = self.context.socket(zmq.REQ)
        self.syncclient.connect('tcp://localhost:5561')
        
        # sending a synchronization request
        self.syncclient.send(b'')
        
        # waiting for synchronization reply
        self.syncclient.recv()
        while True:
            self.msg = self.client_receiver.recv()
            if (self.msg).decode("ascii") == "start":  # All clients are connected, Start!
                break
        
    def send_tom(self, msg, seq_num = None):
        print("sending ", msg.decode("ascii"), " from ", self.identity,  " to all!")
        if seq_num != None:
            self.client_sender.send_multipart([self.identity.encode(), seq_num ,msg])  # to be able to send sequence number
        else:                                                                           # using sequencer
            self.client_sender.send_multipart([self.identity.encode(), b'', msg])
            
    def recv_tom(self):
        sender_id, seq_num ,msg = self.client_receiver.recv_multipart()
        print ("Received ", msg.decode("ascii"), " from ", sender_id.decode("ascii"), "!")
        if seq_num.decode("ascii") == "":
            return [msg]
        else:
            return [msg, seq_num]   # to be able to receive sequence number sent by sequencer
            
    def send_uno(self, receiver_id, msg):
        print("Sending ", msg.decode("ascii"), " to ", receiver_id.decode("ascii"), "!" )
        self.client_deal.send_multipart([b'', receiver_id, msg])
            
    def recv_uno(self):
        empty, sender_id, msg = self.client_deal.recv_multipart()
        print ("Received ", msg.decode("ascii"), " from ", sender_id.decode("ascii") , "!")
        return msg

            


Overwriting client.py


In [160]:
%%writefile main.py

from config import get_config
from client import client_
from queue import Queue
import threading
import time
import csv
import numpy as np


if __name__ == "__main__":

    config, _ = get_config()
    c = client_(config)
    
    if config.id == "sequencer":
        g_seq_num = 0  # global sequence number (initial value set to 0)
        while True:
            msg = c.recv_tom()
            if len(msg) == 1:
                g_seq_num += 1
                c.send_tom(msg[0], str(g_seq_num).encode())
    
    
    else:
        q1 = Queue(100)  # queue for storing unorderd messages
        q2 = Queue(100)  # quese for storing totally ordered messages

        def rcv1(q):
            while True:
                msg = c.recv_uno()
                q.put(msg)

        def rcv2(q):
            while True:
                msg = c.recv_tom()
                q.put(msg)

        def send_commands():
            with open(config.com, 'r') as f:   # opening command files
                reader = csv.reader(f)
                cmd = list(reader)
                for item in cmd:
                    if item[0] == "Multicast":  # checking to see if it is a multicast message
                        if item[1] == config.id:
                            c.send_tom((config.id + "###" +item[2]).encode())
                    elif item[0] == "sleep":  
                        time.sleep(np.float(item[1]))
                    else:                              
                        if item[0] == config.id:
                            c.send_uno(item[1].encode(), (config.id + "###" + item[2]).encode())

    
        def tom_uno():
            cmd_list = []
            seq_list = []
            l_seq_num = 0    # local sequence number (initial value set to 0)

            with open(config.o + "/test_result_" + config.id +"_tom.txt", 'wb', 0) as g, \
                         open(config.o + "/test_result_" + config.id +"_uno.txt", 'wb', 0) as h:
                while True:

                    if not q1.empty():     # checking for unorderd messages
                        #print(item1)
                        item1 = q1.get()
                        h.write((item1.decode("ascii") + "\n").encode()) # write received uorderd messages in the file

                    if not q2.empty(): # checking for totally orderd messages
                        #print(item2)
                        item2 = q2.get() 
                        if len(item2) == 1:         # checking whether it is from a client or the sequencer
                            cmd_list.append(item2)
                        else:
                            seq_list.append(item2)
                    if len(seq_list) > 0:         # checking to see if there is any uprocessed message from sequencer
                        if int(seq_list[0][1].decode("ascii")) == (l_seq_num + 1):
                            if [seq_list[0][0]] in cmd_list:
                                idx = cmd_list.index([seq_list[0][0]])
                                g.write((cmd_list[idx][0].decode("ascii") + "\n").encode()) # delivering the received 
                                del cmd_list[idx]                                           # message to the application
                                del seq_list[0]
                                l_seq_num += 1           
                    time.sleep(0.01)           # For preventing from high cpu usage
            
            
            
        # threads for receiving messages and sending commands                    
        threading.Thread(target = rcv1  , args = (q1,)).start()
        threading.Thread(target = rcv2 , args = (q2,)).start()
        threading.Thread(target = send_commands , args = ()).start()
        threading.Thread(target = tom_uno, args = ()).start()   

Overwriting main.py
