In [1]:

import socket
import sys
import json
import time
import threading

class Node(threading.Thread):
    """TCP peer that runs a two-phase, majority-vote ballot with its neighbours.

    The thread itself accepts inbound connections.  Outbound connections are
    created with :meth:`connect_with_node` and are the only ones used for
    sending.  The ballot state machine (``idle`` -> ``ph1`` -> ``ph2`` ->
    ``decided``) is driven partly by incoming messages and partly by the
    ``NodeManager`` timer thread created in :meth:`init_server`.

    NOTE(review): the message flow (POTENTIAL_LEADER / V_PROPOSE / V_DECIDE
    with majority counting) resembles Paxos -- confirm against the spec this
    was written for.  The state fields are touched from several threads
    without locking.
    """

    def __init__(self, host, port, my_id, delay_p0, delay_p1, delay_p2, n):
        """Create the node, bind its listening socket and build its manager.

        Args:
            host: Host name/IP recorded for this node (the socket itself is
                bound to all interfaces).
            port: TCP port to listen on.
            my_id: Node identifier; used in string concatenation and with
                ``int()``, so a numeric string is expected.
            delay_p0: Seconds the manager waits in ``idle`` before an election.
            delay_p1: Seconds the manager waits for phase 1 to complete.
            delay_p2: Seconds the manager waits for phase 2 to complete.
            n: Total number of nodes in the network (majority = more than n/2).
        """
        super(Node, self).__init__()

        # When this flag is set, the node will stop and close
        self.terminate_flag = threading.Event()

        # Server details, host (or ip) to bind to and the port
        self.host = host
        self.port = port

        self.networkSize = n

        # Connections that remote nodes have established with this node
        self.nodesIn = []

        # Connections this node has established to remote nodes
        self.nodesOut = []

        # Sender threads spawned by send_to_nodes / reply_to_node
        self.sendings = []

        self.id = my_id
        self.delay_p0 = delay_p0
        self.delay_p1 = delay_p1
        self.delay_p2 = delay_p2

        # Ballot state
        self.state = "idle"
        self.decision = None
        self.max_ballot_id = 0
        self.p1votes = []
        self.p2votes = []
        self.prop = None
        self.prop_id = None

        # Counters and debug flag are set *before* the server is started so
        # that the object is fully initialised by the time anything else
        # (manager, connections) can see it.
        self.message_count_send = 0
        self.message_count_recv = 0

        # dprints
        self.debug = False

        # Start the TCP/IP server
        self.init_server()

    def get_message_count_send(self):
        """Return how many times create_message() has been called."""
        return self.message_count_send

    def get_message_count_recv(self):
        """Return the number of messages received by this node's connections."""
        return self.message_count_recv

    def enable_debug(self):
        """Turn on dprint() output."""
        self.debug = True

    def dprint(self, message):
        """Print *message* with a node prefix when debugging is enabled."""
        if self.debug:
            print("DPRINT: Node " + self.id + "   " + message)

    def init_server(self):
        """Bind and listen on the node's port and create its NodeManager."""
        print("Initialisation of the TcpServer on port: " + str(self.port) + " on node (" + self.id + ")")

        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.bind(('', self.port))
        # The timeout lets run() re-check terminate_flag periodically.
        self.sock.settimeout(10.0)
        self.sock.listen(1)

        # Manager initialization.  The third delay is the phase-2 delay; the
        # previous code passed delay_p1 twice, so delay_p2 was never used.
        self.nodeManager = NodeManager(self, self.delay_p0, self.delay_p1, self.delay_p2)

    def print_connections(self):
        """Print how many inbound and outbound connections exist."""
        print("Node " + self.id + " Connection status:")
        print("- Total nodes connected with us: %d" % len(self.nodesIn))
        print("- Total nodes connected to     : %d" % len(self.nodesOut))

    def get_inbound_nodes(self):
        """Return the list of inbound connections."""
        return self.nodesIn

    def get_outbound_nodes(self):
        """Return the list of outbound connections."""
        return self.nodesOut

    def get_id(self):
        """Return this node's identifier."""
        return self.id

    def get_host(self):
        """Return the host recorded for this node."""
        return self.host

    def get_port(self):
        """Return the port this node listens on."""
        return self.port

    def _remove_closed(self, connections, on_closed):
        """Join and remove every terminated connection from *connections*.

        The closed connections are collected first so the list is never
        mutated while it is being iterated.
        """
        closed = [c for c in connections if c.terminate_flag.is_set()]
        for conn in closed:
            on_closed(conn)
            conn.join()
            try:
                connections.remove(conn)
            except ValueError:
                # send_to_node() runs in several sender threads at once, so
                # another thread may already have removed this connection.
                pass

    def delete_closed_connections(self):
        """Drop inbound and outbound connections whose terminate flag is set."""
        self._remove_closed(self.nodesIn, self.event_node_inbound_closed)
        self._remove_closed(self.nodesOut, self.event_node_outbound_closed)

    def create_message(self, data):
        """Count an outgoing message and return *data* unchanged.

        NOTE: callers in this class wrap a payload with create_message() and
        then hand it to send_to_node(), which calls create_message() again, so
        one logical message increments the counter more than once.
        """
        self.message_count_send = self.message_count_send + 1
        return data

    def send_to_nodes(self, data, exclude=None):
        """Send *data* to every outbound connection not listed in *exclude*.

        Each send runs in its own thread (the connection's send() sleeps for
        the link delay); the threads are recorded in ``self.sendings``.
        """
        excluded = [] if exclude is None else exclude
        for n in self.nodesOut:
            if n in excluded:
                self.dprint("TcpServer.send2nodes: Excluding node in sending the message")
            else:
                new_send = threading.Thread(target=self.send_to_node, args=(n, data))
                new_send.start()
                self.sendings.append(new_send)

    def send_to_node(self, n, data):
        """Send *data* over connection *n* if it is still a known connection."""
        self.delete_closed_connections()
        if n in self.nodesIn or n in self.nodesOut:
            try:
                n.send(self.create_message(data))
            except Exception as e:
                self.dprint("TcpServer.send2node: Error while sending data to the node (" + str(e) + ")")
        else:
            self.dprint("TcpServer.send2node: Could not send the data, node is not found!")

    def reply_to_node(self, n, data, message):
        """Send *message* to the outbound connection whose id equals data['nid'].

        *n* (the connection the request arrived on) is not used for the
        reply; replies always travel over this node's own outbound link.
        """
        target_node = None
        for node in self.nodesOut:
            if node.id == data['nid']:
                target_node = node  # last match wins, as before
        if target_node is not None:
            new_send = threading.Thread(target=self.send_to_node, args=(target_node, message))
            new_send.start()
            self.sendings.append(new_send)
        else:
            self.dprint("TcpServer.reply_to_node: Could not send the data, node is not found!")

    def connect_with_node(self, host, port, client_id=None, link_delay=0):
        """Open an outbound connection to ``host:port``.

        Args:
            host: Remote host.
            port: Remote port.
            client_id: Identifier of the remote node, stored on the connection.
            link_delay: Seconds the connection sleeps before each send.

        Returns:
            True if a connection to that host/port already exists, otherwise
            None (including on success and on failure, which is only logged
            through dprint).
        """
        print("connect_with_node(" + host + ", " + str(port) + ")")
        if host == self.host and port == self.port:
            print("connect_with_node: Cannot connect with yourself!!")
            return

        # Check if node is already connected with this node!
        for node in self.nodesOut:
            if node.get_host() == host and node.get_port() == port:
                print("connect_with_node: Already connected with this node.")
                return True

        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.dprint("connecting to %s port %s" % (host, port))
            sock.connect((host, port))

            thread_client = self.create_new_connection(sock, (host, port), client_id, link_delay)
            thread_client.start()
            self.nodesOut.append(thread_client)
            self.event_connected_with_node(thread_client)

            self.print_connections()

        except Exception as e:
            self.dprint("TcpServer.connect_with_node: Could not connect with node. (" + str(e) + ")")

    def disconnect_with_node(self, node):
        """Notify, stop, join and forget the outbound connection *node*."""
        if node in self.nodesOut:
            node.send(self.create_message({"type": "message", "message": "Terminate connection"}))
            node.stop()
            node.join()
            del self.nodesOut[self.nodesOut.index(node)]

    def stop(self):
        """Ask the accept loop in run() to finish."""
        self.terminate_flag.set()

    def create_new_connection(self, connection, client_address, client_id=None, link_delay=0):
        """Build the NodeConnection thread that wraps socket *connection*."""
        return NodeConnection(self, connection, client_address, client_id, link_delay)

    def run(self):
        """Accept inbound connections until stopped, then shut everything down."""
        while not self.terminate_flag.is_set():  # Check whether the thread needs to be closed
            try:
                self.dprint("TcpServerNode: Wait for incoming connection")
                connection, client_address = self.sock.accept()

                thread_client = self.create_new_connection(connection, client_address)
                thread_client.start()
                self.nodesIn.append(thread_client)

                self.event_node_connected(thread_client)

            except socket.timeout:
                # No connection within the timeout; loop to re-check the flag.
                pass

            time.sleep(0.01)

        print("TcpServer stopping...")
        for t in self.nodesIn:
            t.stop()

        for t in self.nodesOut:
            t.stop()

        self.nodeManager.stop()

        time.sleep(1)

        for t in self.nodesIn:
            t.join()

        for t in self.nodesOut:
            t.join()

        self.sock.close()
        self.nodeManager.join()
        print("TcpServer stopped")

    def event_node_connected(self, node):
        """Hook: a remote node connected to us."""
        self.dprint("event_node_connected: " + node.name)

    def event_connected_with_node(self, node):
        """Hook: we connected to a remote node."""
        self.dprint("event_node_connected: " + node.name)

    def event_node_inbound_closed(self, node):
        """Hook: an inbound connection was closed."""
        self.dprint("event_node_inbound_closed: " + node.name)

    def event_node_outbound_closed(self, node):
        """Hook: an outbound connection was closed."""
        self.dprint("event_node_outbound_closed: " + node.name)

    def event_node_message(self, node, data):
        """Hook: *data* arrived on connection *node*; log it and dispatch it."""
        self.dprint("event_node_message: " + node.name + ": " + str(data))
        print("Node " + self.id + " received: " + node.name + ": " + str(data))
        self.handle_message(node, data)

    def handle_message(self, node, data):
        """Route *data* to the handler for its 'type'; unknown types are ignored."""
        handlers = {
            'POTENTIAL_LEADER': self.rec_potleader,
            'POTENTIAL_LEADER_ACK': self.rec_plack,
            'V_PROPOSE': self.rec_vpropose,
            'V_PROPOSE_ACK': self.rec_vpack,
            'V_DECIDE': self.rec_vdecide,
        }
        # .get() so that a message without a 'type' key is ignored instead of
        # raising KeyError inside the receiving connection thread.
        mtype = data.get('type') if isinstance(data, dict) else None
        handler = handlers.get(mtype)
        if handler is not None:
            handler(node, data)

    def init_election(self):
        """Start phase 1: bump the ballot id and broadcast POTENTIAL_LEADER."""
        self.max_ballot_id = self.max_ballot_id + 1
        self.send_to_nodes(self.create_message({"nid": str(self.id), "type": "POTENTIAL_LEADER",
                                                "value": str(self.max_ballot_id)}))
        self.state = "ph1"
        self.p1votes = []

    def init_phase2(self):
        """Leave phase 1: propose a value if a majority acked, else go idle.

        The proposed value is the one attached to the highest ballot id
        reported in the phase-1 acks; if no ack carried a ballot id above 0,
        the node proposes ``int(id) * networkSize``.
        """
        if self.state != "ph1":
            return

        if len(self.p1votes) > (self.networkSize / 2):  # start proposing
            past_val = None
            max_seen = 0
            my_prop = int(self.id) * self.networkSize
            for prop in self.p1votes:
                if int(prop['value1']) > max_seen:
                    max_seen = int(prop['value1'])
                    past_val = int(prop['value2'])
            self.state = "ph2"
            self.p2votes = []

            chosen = past_val if past_val is not None else my_prop
            self.send_to_nodes(self.create_message({"nid": str(self.id), "type": "V_PROPOSE",
                                                    "value1": str(self.max_ballot_id),
                                                    "value2": str(chosen)}))
            self.prop = chosen
            self.prop_id = self.max_ballot_id
        else:
            self.state = "idle"  # cancelled

    def finish_ballot(self):
        """Leave phase 2: decide and broadcast if a majority acked, else go idle."""
        if self.state != "ph2":
            return

        if len(self.p2votes) > (self.networkSize / 2):  # decide
            self.decision = self.prop
            self.state = "decided"
            print("Node " + self.id + " decided value " + str(self.decision))
            self.send_to_nodes(self.create_message({"nid": str(self.id), "type": "V_DECIDE",
                                                    "value": str(self.decision)}))
        else:
            self.state = "idle"  # cancelled

    def rec_potleader(self, node, data):
        """Handle POTENTIAL_LEADER: ack ballots that are not older than ours.

        Ignored once this node has decided.  Otherwise the node abandons any
        ballot of its own, replies with its last accepted (ballot id, value)
        pair -- or (0, -1) if it has none -- and adopts the sender's ballot id.
        """
        if self.state == "decided":
            return
        if int(data['value']) >= self.max_ballot_id:  # must stop everything and answer
            self.state = "idle"
            if self.prop is None:
                ack = {"nid": str(self.id), "type": "POTENTIAL_LEADER_ACK",
                       "value1": str(0), "value2": str(-1)}
            else:
                ack = {"nid": str(self.id), "type": "POTENTIAL_LEADER_ACK",
                       "value1": str(self.prop_id), "value2": str(self.prop)}
            self.reply_to_node(node, data, self.create_message(ack))
            self.max_ballot_id = int(data['value'])

    def rec_plack(self, node, data):
        """Handle POTENTIAL_LEADER_ACK: count it and move on once a majority acked."""
        if self.state == "ph1":  # must count
            self.p1votes.append(data)
            if len(self.p1votes) > (self.networkSize / 2):
                self.init_phase2()

    def rec_vpropose(self, node, data):
        """Handle V_PROPOSE: accept and ack proposals not older than our ballot id."""
        if int(data['value1']) >= self.max_ballot_id:  # must answer
            self.prop = int(data['value2'])
            self.prop_id = int(data['value1'])
            self.reply_to_node(node, data, self.create_message({"nid": str(self.id), "type": "V_PROPOSE_ACK",
                                                                "value": str(-1)}))

    def rec_vpack(self, node, data):
        """Handle V_PROPOSE_ACK: count it and decide once a majority acked."""
        if self.state == "ph2":  # must count
            self.p2votes.append(data)
            if len(self.p2votes) > (self.networkSize / 2):  # decide
                self.finish_ballot()

    def rec_vdecide(self, node, data):
        """Handle V_DECIDE: adopt the announced value as this node's decision."""
        self.state = "decided"
        self.prop = int(data['value'])
        self.prop_id = self.max_ballot_id
        self.decision = int(data['value'])
        print("Node " + self.id + " decided value " + data['value'])
                                   

class NodeConnection(threading.Thread):
    """Thread that owns one TCP socket between this node and a peer.

    Outgoing messages are JSON documents followed by the delimiter ``-TSN``.
    The run loop reads from the socket, splits the stream on that delimiter
    and hands every parsed message to ``nodeServer.event_node_message``.

    NOTE: the delimiter is not escaped, so a payload whose JSON text contains
    ``-TSN`` would be split in the wrong place.
    """

    def __init__(self, nodeServer, sock, clientAddress, client_id=None, link_delay=0):
        """Wrap *sock*.

        Args:
            nodeServer: Owning node; must provide ``dprint``,
                ``event_node_message`` and ``message_count_recv``.
            sock: Connected socket object.
            clientAddress: ``(host, port)`` of the peer.
            client_id: Identifier of the peer node, if known.
            link_delay: Seconds to sleep before each send (simulated latency).
        """
        super(NodeConnection, self).__init__()

        self.host = clientAddress[0]
        self.port = clientAddress[1]
        self.nodeServer = nodeServer
        self.sock = sock
        self.clientAddress = clientAddress
        self.terminate_flag = threading.Event()
        self.link_delay = link_delay
        # Text received so far that has not yet been split into messages
        self.buffer = ""
        self.nodeServer.dprint("NodeConnection.send: Started with client '" + self.host + ":" + str(self.port) + "'")
        self.id = client_id

    def get_host(self):
        """Return the peer's host."""
        return self.host

    def get_port(self):
        """Return the peer's port."""
        return self.port

    def send(self, data):
        """Serialise *data* as JSON and send it after sleeping ``link_delay``.

        Any failure is logged through the node's dprint and marks the
        connection as terminated; nothing is raised to the caller.
        """
        time.sleep(self.link_delay)
        try:
            message = json.dumps(data, separators=(',', ':')) + "-TSN"
            self.sock.sendall(message.encode('utf-8'))
        except Exception:
            # dprint() accepts a single string.  Passing the exception type as
            # a second argument raised TypeError here, so the terminate flag
            # below was never set.
            self.nodeServer.dprint("NodeConnection.send: Unexpected error: " + str(sys.exc_info()[0]))
            self.terminate_flag.set()

    def check_message(self, data):
        """Validate a parsed message; every message is currently accepted."""
        return True

    def get_id(self):
        """Return the peer's identifier."""
        return self.id

    def stop(self):
        """Ask the run loop to finish."""
        self.terminate_flag.set()

    def _dispatch_buffered_messages(self, raw):
        """Split the buffer on ``-TSN`` and deliver each complete message.

        *raw* is the most recently received chunk, used only in the parse
        error report.
        """
        index = self.buffer.find("-TSN")
        # ">= 0" rather than "> 0": a buffer that starts with the delimiter
        # used to stop the loop for good and wedge every later message.
        while index >= 0:
            message = self.buffer[:index]
            self.buffer = self.buffer[index + 4:]
            index = self.buffer.find("-TSN")

            try:
                data = json.loads(message)
            except ValueError as e:
                print("NodeConnection: Data could not be parsed (%s) (%s)" % (raw, str(e)))
                # Skip this frame; previously execution fell through and used
                # an unbound (or stale) `data`.
                continue

            if self.check_message(data):
                self.nodeServer.message_count_recv = self.nodeServer.message_count_recv + 1
                self.nodeServer.event_node_message(self, data)
            else:
                self.nodeServer.dprint("-------------------------------------------")
                self.nodeServer.dprint("Message is damaged and not correct:\nMESSAGE:")
                self.nodeServer.dprint(message)
                self.nodeServer.dprint("DATA:")
                self.nodeServer.dprint(str(data))
                self.nodeServer.dprint("-------------------------------------------")

    def run(self):
        """Receive loop: read, reassemble and dispatch until stopped or closed."""
        # Timeout, so the loop can notice terminate_flag on an idle socket
        self.sock.settimeout(10.0)

        while not self.terminate_flag.is_set():  # Check whether the thread needs to be closed
            chunk = None
            try:
                chunk = self.sock.recv(4096)
            except socket.timeout:
                pass
            except Exception as e:
                self.terminate_flag.set()
                self.nodeServer.dprint("NodeConnection: Socket has been terminated (%s)" % e)

            if chunk == b"":
                # recv() returning no bytes means the peer closed the
                # connection; without this check the loop spun forever on a
                # dead socket.
                self.nodeServer.dprint("NodeConnection: Connection closed by peer")
                self.terminate_flag.set()
            elif chunk:
                try:
                    self.buffer += chunk.decode('utf-8')
                except UnicodeDecodeError:
                    print("NodeConnection: Decoding line error")
                self._dispatch_buffered_messages(chunk)

            time.sleep(0.01)

        self.sock.settimeout(None)
        self.sock.close()
        self.nodeServer.dprint("NodeConnection: Stopped")

        
class NodeManager(threading.Thread):
    """Timer thread that drives a node's ballot state machine.

    Depending on ``nodeServer.state`` it waits the configured delay and, if
    the state has not changed in the meantime, triggers the next step
    (start an election, close phase 1, close phase 2).  Once the node has
    decided, the manager stops itself.
    """

    def __init__(self, nodeServer, delay_p0, delay_p1, delay_p2):
        """Store the owning node and the per-state delays (in seconds).

        Args:
            nodeServer: Node to drive; must expose ``state``, ``dprint``,
                ``init_election``, ``init_phase2`` and ``finish_ballot``.
            delay_p0: Wait while ``idle`` before starting an election.
            delay_p1: Wait while in ``ph1`` before closing phase 1.
            delay_p2: Wait while in ``ph2`` before closing phase 2.
        """
        super(NodeManager, self).__init__()

        self.nodeServer = nodeServer
        self.terminate_flag = threading.Event()
        self.delay_p0 = delay_p0
        self.delay_p1 = delay_p1
        self.delay_p2 = delay_p2

    def stop(self):
        """Stop the manager; the node then only reacts to incoming messages."""
        self.terminate_flag.set()

    def _still_in(self, state, delay):
        """Wait up to *delay* seconds; True if not stopped and state is unchanged.

        Waiting on the event (instead of time.sleep) lets stop() interrupt a
        long delay, so join() does not block for a whole phase and no step is
        triggered after the manager has been told to stop.
        """
        if self.terminate_flag.wait(delay):
            return False
        return self.nodeServer.state == state

    def run(self):
        """Main loop that initiates elections and closes ballot phases."""
        while not self.terminate_flag.is_set():  # Check whether the thread needs to be closed
            state = self.nodeServer.state
            if state == "idle":  # wait to begin
                if self._still_in("idle", self.delay_p0):
                    self.nodeServer.dprint("starting election")
                    self.nodeServer.init_election()
            elif state == "ph1":  # phase 1 in progress
                if self._still_in("ph1", self.delay_p1):
                    self.nodeServer.dprint("phase 0 ended")
                    self.nodeServer.init_phase2()
            elif state == "ph2":  # phase 2 in progress
                if self._still_in("ph2", self.delay_p2):
                    self.nodeServer.dprint("phase 1 ended")
                    self.nodeServer.finish_ballot()
            elif state == "decided":  # decided so don't start any elections
                self.nodeServer.dprint("already decided, no more election")
                self.stop()

            # Short pause between polls; returns at once when stopped.
            self.terminate_flag.wait(0.01)



In [2]:
# Driver cell: read a network description from stdin, start one Node per
# entry and then create the outbound links between them.
#
# Input format (whitespace separated):
#   <network_size>
#   then, for every node:
#     <node_id> <delay_p0> <delay_p1> <delay_p2>
#     followed by network_size-1 link lines: <target_node_id> <link_delay>
#
# Example input:
#3
#1 10 6 5
#2 1
#3 1.2
#2 14 5 5
#1 1.5
#3 1
#3 18 6 6
#1 1
#2 2.5
net_graph = []
network_size = int(input())
for _ in range(network_size):
    # First line of a node: id and the three phase delays.
    node_spec = [input().split()]
    # Then one line per link to every other node.
    for _ in range(network_size - 1):
        node_spec.append(input().split())
    net_graph.append(node_spec)

#net_graph = [[1,[10,6,5],[2,1],[3,1.2]],[2,[14,5,5],[1,1.5],[3,1]],[3,[18,6,6],[1,1],[2,2.5]]]
print(net_graph)
host = '127.0.0.1'
base_port = 5500
graph_nodes = []
graph_edges = []

for node_spec in net_graph:  ### first create nodes and initialize them
    node_id = node_spec[0][0]
    node_delays = node_spec[0][1:4]
    # Each node listens on base_port + its numeric id.
    new_node = Node(host, base_port + int(node_id), my_id=node_id,
                    delay_p0=float(node_delays[0]),
                    delay_p1=float(node_delays[1]),
                    delay_p2=float(node_delays[2]),
                    n=network_size)
    new_node.start()
    new_node.nodeManager.start()
    graph_nodes.append(new_node)

# Look-up table from numeric node id to the running Node.
nodes_by_id = {int(n.id): n for n in graph_nodes}

for this_node, node_spec in zip(graph_nodes, net_graph):  ### now create links
    for new_link in node_spec[1:]:
        # Resolve the target afresh for every link.  Previously the variable
        # was initialised once per node, so an unknown id silently reused the
        # previous link's target, and a None target was still dereferenced
        # right after the error message.
        target_node = nodes_by_id.get(int(new_link[0]))
        if target_node is None:
            print("error: target node not found, check your network graph")
            continue

        this_node.connect_with_node(host, target_node.get_port(), target_node.id, float(new_link[1]))
        # Small pause so the peer's accept loop can register the connection.
        time.sleep(0.1)
        


[[['1', '10', '6', '5'], ['2', '1'], ['3', '1.2']], [['2', '14', '5', '5'], ['1', '1.5'], ['3', '1']], [['3', '18', '6', '6'], ['1', '1'], ['2', '2.5']]]
Initialisation of the TcpServer on port: 5501 on node (1)
Initialisation of the TcpServer on port: 5502 on node (2)
Initialisation of the TcpServer on port: 5503 on node (3)
connect_with_node(127.0.0.1, 5502)
Node 1 Connection status:
- Total nodes connected with us: 0
- Total nodes connected to     : 1
connect_with_node(127.0.0.1, 5503)
Node 1 Connection status:
- Total nodes connected with us: 0
- Total nodes connected to     : 2
connect_with_node(127.0.0.1, 5501)
Node 2 Connection status:
- Total nodes connected with us: 1
- Total nodes connected to     : 1
connect_with_node(127.0.0.1, 5503)
Node 2 Connection status:
- Total nodes connected with us: 1
- Total nodes connected to     : 2
connect_with_node(127.0.0.1, 5501)
Node 3 Connection status:
- Total nodes connected with us: 2
- Total nodes connected to     : 1
connect_with_node