# Lamport clocks in python

Code from https://connorwstein.github.io/Lamport-Clocks/

In [3]:
import signal
import sys
import time
import threading
import zmq
import pickle
from multiprocessing import Queue

Let's simulate three process trying to access a shared resource  
Variable **message_queues** will emulate message passing between process

In [4]:
initially_granted_proc = "A"
procs = {"A", "B", "C"}
context = zmq.Context()
#resource_usage_counts = {"A": 0, "B": 0, "C": 0}
message_queues = {"A" : Queue(), "B": Queue(), "C": Queue()}
urlA="tcp://"+ "127.0.0.1" +":"+ "6781"   
urlB="tcp://"+ "127.0.0.1" +":"+ "6782" 
urlC="tcp://"+ "127.0.0.1" +":"+ "6783" 
urls={"A":urlA,"B":urlB,"C":urlC}
sockets_send={}

Let's define a Message class that just return a string with the message type, timestamp, sender and receiver

In [5]:
class Message(object):
    def __init__(self, msg_type, timestamp, sender, receiver):
        self.msg_type = msg_type
        self.timestamp = timestamp
        self.sender = sender
        self.receiver = receiver

    def __repr__(self):
        return "Message {} at {} from {} to {}".format(self.msg_type, self.timestamp, self.sender, self.receiver)

Try the class with this simple example

In [25]:
msg=Message("request","10h30","Teacher","Student")
print(msg)
print(msg.msg_type)
print(msg.timestamp)
print(msg.sender)
print(msg.receiver)

Message request at 10h30 from Teacher to Student
request
10h30
Teacher
Student


Now let's create a Process class inheriting from threading.Thread to implement different threads accesing shared data

* Method **remove_request**. Looks in the Process request_queue to remove a message from a particular type and sender 
* Method **use_resource**. Emulates using some shared resource during 2 seconds
* Method **process_message**. Analyses a mesage received. 
    * If type is "request", then put it in the local process queue and send an "ack" to the process that send the request
    * If type is "release", then remove an existing "request" message form that sender in the process local queue
    * If type is "ack", do nothing
* Method **check_available**. This methods checks if my request is the oldest from all other request in my queue. If so, then returns True. If my request is not the oldest or I have no request in the queue, then returns False 
* Method **run**. This is the main method that run the thread. It makes an infinite loop checking if:
    * If the current process has the resource, then use it for 2 seconds. When done, remove request from local queue and send release messages to other processes. Increase its clock and clear flags "has_resource" and "requested"
    * It the current process does not have the resource, and did not requested yet, then do it. That is done sending a message to its own queue and sending a message to other processes queues. Increase its clock and setting the "requested" flag when done
    * If the current process does not have the resource but it has already requested it, then wait until available processing the messages that arrive from other processes. When received, if the lamport clock from the message is larger than its own, then replace its own by the incoming lamport clock +1. Process incoming message using method **process_message**, increase again the local clock and check availability using **check_available**

In [10]:
class Process(threading.Thread):

    def __init__(self, name, initially_granted, other_processes):
        super(Process, self).__init__()
        self.name = name
        self.has_resource = initially_granted == name
        self.other_processes = other_processes
        self.lamport_clock = 0 # tick after each "event"
        self.request_queue = []
        self.requested = False
        self._stop_event = threading.Event()
        sockets_send[name] = context.socket(zmq.PUSH)
        sockets_send[name].connect(urls[name])
        for i in other_processes:
            sockets_send[i] = context.socket(zmq.PUSH)
            sockets_send[i].connect(urls[i])
        self.request_queue.append(Message("request", -1, initially_granted, initially_granted))
        msg = Message("request",-1,initially_granted,initially_granted)
        self.send_msg_request("A",msg)
        socket_receive = context.socket(zmq.PULL)
        print("listens: ",urls[name])
        socket_receive.bind(urls[name])

    def remove_request(self, msg_type, sender):
    # Find if msg_type and sender are in the self.request_queue
    # If it is, then remove
        index_of_req = -1
        for i in range(len(self.request_queue)):
            if self.request_queue[i].msg_type == msg_type and self.request_queue[i].sender == sender:
                index_of_req = i
                break
        if i == -1:
            print("Unable to remove") 
        else:
            del self.request_queue[i]

    def use_resource(self):
    # emulates 2 seconds of resource usage from process
        print("Process {} is using resource".format(self.name))
        resource_usage_counts[self.name] += 1
        time.sleep(2)

    def process_message(self, msg):
        # Based on msg_type handle appropriately
        if msg.msg_type == "request":
            # Put in our request queue and send an ack to the sender
            self.request_queue.append(msg)
            for proc in self.other_processes:
                if proc == msg.sender:
                    message_queues[proc].put(Message("ack", self.lamport_clock, self.name, msg.sender))
        elif msg.msg_type == "release":
            # Got a release, remove it from our queue
            self.remove_request("request", msg.sender)
        elif msg.msg_type == "ack":
            pass
        else:
            print("Unknown message type")

    def run(self):
        while True:
            if self._stop_event.is_set(): break
            if self.has_resource:
                self.use_resource()
                self.remove_request("request", self.name)
                # Tell everyone that we are done
                for proc in self.other_processes:
                    message_queues[proc].put(Message("release", self.lamport_clock, self.name, proc))
                    self.lamport_clock += 1
                self.has_resource, self.requested = False, False
                continue
            # Want to get the resource
            if not self.requested:
                # Request it
                print("Process {} requesting resource".format(self.name))
                self.request_queue.append(Message("request", self.lamport_clock,self.name, self.name))
                # Broadcast this request
                for proc in self.other_processes:
                    message_queues[proc].put(Message("request", self.lamport_clock, self.name, proc))
                    self.lamport_clock += 1
                self.requested = True
            else:
                # Just wait until it is available by processing messages
                print("Process {} waiting for message".format(self.name))
                msg = message_queues[self.name].get(block=True)        
                # Got a message, check if the timestamp is greater than our clock, if so advance it
                if msg.timestamp >= self.lamport_clock:
                    self.lamport_clock = msg.timestamp + 1
                print("Got message {}".format(msg))
                self.process_message(msg)
                self.lamport_clock += 1
                # Check after processing if the resource is 
                # available for me now, if so, grab it.
                # We need earliest request to be ours and check that we 
                # have received an older message from everyone else 
                if self.check_available():
                    print("Resource available for {}".format(self.name))
                    self.has_resource = True
            print("Process {}: {}".format(self.name, self.request_queue))
            print("Process {} Clock: {}".format(self.name, self.lamport_clock))
            time.sleep(1)

    def check_available(self):
        got_older = {k: False for k in self.other_processes}
        # Get timestamp of our req
        our_req = None
        for req in self.request_queue:
            if req.sender == self.name:
                our_req = req
        if our_req is None:
            return False
        # We found our req make sure it is younger than 
        # all the others and we have an older one from 
        # the other guys
        for req in self.request_queue:
            if req.sender in got_older and req.timestamp > our_req.timestamp:
                got_older[req.sender] = True
        if all(got_older.values()):
            return True
        return False
    
    def stop(self):
        self._stop_event.set()

    def send_msg_request( p, m):
        print(" - Sent message from {} to {}".format(self.name,p))
        print(" sent to ",urls[p])
        sockets_send[p].send(pickle.dumps((m))) # not blocking if socket PULL is up and listening

In [11]:
t1 = Process("A", initially_granted_proc, list(procs - set("A")))
t2 = Process("B", initially_granted_proc, list(procs - set("B")))
t3 = Process("C", initially_granted_proc, list(procs - set("C")))

TypeError: send_msg_request() takes 2 positional arguments but 3 were given

Daemonizing threads means that if main thread dies so do they. That way the process will exit if the main thread is killed

In [28]:
t1.setDaemon(True)
t2.setDaemon(True)
t3.setDaemon(True)

Run threads and stop them after 30 seconds

In [29]:
t1.start()
t2.start()
t3.start()
time.sleep(30)
t1.stop()
t2.stop()
t3.stop()
t1.join()
t2.join()
t3.join()
print("Resource usage:")
print(resource_usage_counts)

Process A is using resource
Process B requesting resourceProcess C requesting resource

Process C: [Message request at -1 from A to A, Message request at 0 from C to C]Process B: [Message request at -1 from A to A, Message request at 0 from B to B]
Process B Clock: 2

Process C Clock: 2
Process B waiting for messageProcess C waiting for message
Got message Message request at 1 from C to B
Process B: [Message request at -1 from A to A, Message request at 0 from B to B, Message request at 1 from C to B]
Process B Clock: 3

Got message Message request at 1 from B to C
Process C: [Message request at -1 from A to A, Message request at 0 from C to C, Message request at 1 from B to C]
Process C Clock: 3
Process A requesting resource
Process A: [Message request at 2 from A to A]
Process A Clock: 4
Process C waiting for messageProcess B waiting for message
Got message Message ack at 2 from B to C
Process C: [Message request at -1 from A to A, Message request at 0 from C to C, Message request at

Process A waiting for message
Got message Message release at 17 from C to A
Process A: [Message request at 14 from A to A, Message request at 19 from B to A]
Process A Clock: 22
Process B waiting for message
Got message Message request at 20 from C to B
Process B: [Message request at 14 from A to B, Message request at 19 from B to B, Message request at 20 from C to B]
Process B Clock: 23
Process C waiting for message
Got message Message request at 20 from B to C
Process C: [Message request at 15 from A to C, Message request at 19 from C to C, Message request at 20 from B to C]
Process C Clock: 23
Resource usage:
{'A': 2, 'B': 2, 'C': 2}
