In [42]:
from multiprocessing import Process, Pipe
import signal
import logging
import random
import numpy as np

In [62]:
filename = 'chm_log.txt'
with open(filename, 'w+') as f:
    f.write('')
f.close()
msg_filename = 'chm_msg.txt'
with open(msg_filename, 'w+') as f:
    f.write('')
f.close()

## Logging

* `logger.setLevel(logging.DEBUG)` to debug
* `logger.setLevel(logging.WARNING)` to only log the final result (deadlocked or not deadlocked)

In [63]:
logging.basicConfig()
logger = logging.getLogger('MPLogger')
logger.setLevel(logging.DEBUG)
#logger.setLevel(logging.WARNING)

## Set parameters

In [65]:
num_processes = 5
wfg = deadlocked_wfg
initiator = 2
timeout = 1
msg_gaurentee = 0.99
num_experiments = 100 # Number of times to simulate

## Timeout

* In diffusion based methods, if initiator receives messages for all processes in its dependent set, it is deadlocked.
* Else if it did not receive all messages before timeout, it will assume that it is not deadlocked
* Noisy channel with less than 100% delivery gaurentee can only mistake deadlocked situation as not deadlocked, vice versa cannot happen

In [66]:
class Timeout:
    def __init__(self, seconds=1, error_message='Timeout'):
        self.seconds = seconds
        self.error_message = error_message
    def handle_timeout(self, signum, frame):
        raise TimeoutError(self.error_message)
    def __enter__(self):
        signal.signal(signal.SIGALRM, self.handle_timeout)
        signal.alarm(self.seconds)
    def __exit__(self, type, value, traceback):
        signal.alarm(0)

## Message passing gaurentees

* if prob == 1, then gaurentee is 100%

In [67]:
def noisy_send(pipe, msg):
    send_prob = np.random.uniform(0, 1)
    if send_prob <= msg_gaurentee:
        pipe.send(msg)
        logger.debug("Succesfully sent: {}".format(msg) + " Prob generated: {}".format(send_prob))
        with open(msg_filename, 'a+') as f: f.write('1')
    else:
        logger.debug("Failed: {}".format(msg) + " Prob generated: {}".format(send_prob))

## Dependency lists

* Funcion to get dependency lists for each process

In [68]:
def get_waiting_on(id, wfg):
    waiting_on = []
    for pi, pj in wfg:
        if pj == id:
            waiting_on.append(pi)
    return waiting_on

In [69]:
def get_waited_by(id, wfg):
    waited_by = []
    for pi, pj in wfg:
        if pi == id:
            waited_by.append(pj)
    return waited_by

## CHM algo

**Initiate a diffusion computation for a blocked process Pi :**

* send query(i, i, j) to all processes Pj in the dependent set DSi of Pi
    
`       
numi (i):= |DSi |; 
waiti (i):= true;
`
        
**When a blocked process Pk receives a query(i,j,k):** 
* if this is the engaging query for process Pi
    * then send query(i, k, m) to all Pm in its dependent set DSk ;
    * numk (i): = |DSk |; waitk (i):= true
* else if waitk (i) then send a reply (i, k, j) to Pj . 

**When a process Pk receives a reply(i,j,k):**
* if waitk (i) then begin
    * numk (i):= numk (i) − 1;
    * if numk (i)= 0
        * then if i=k then declare a deadlock
        * else send reply(i, k, m) to the process Pm which sent the engaging query.

## Chandy-Mishra-Hass

* Implementation of CHM using multiple processes and static WFG

In [70]:
def process(id, num_processes, send_pipes, recv_pipes, waiting_on, initiator=False, timeout=10):
    np.random.seed(random.randint(0, 100000))
    
    dependent_set = [False for _ in range(num_processes)]
    num = [0 for _ in range(num_processes)]
    wait = [False for _ in range(num_processes)]
    
    engaged = False
    engaged_by = None
    
    logger.debug("{} waiting_on {}".format(id, waiting_on))
    
    try:
        with Timeout(timeout):
            
            # =================================================================================
            """
            **Initiate a diffusion computation for a blocked process Pi :**

                * send query(i, i, j) to all processes Pj in the dependent set DSi of Pi

                `       
                numi (i):= |DSi |; 
                waiti (i):= true;
                `
            """
            
            if initiator:
                logger.debug('Initiator {}'.format(id))
                for pj in waiting_on:
                    noisy_send(send_pipes[pj], ('query', id, id, pj))
                num[id] = len(waiting_on)
                wait[id] = True
                engaged = True
                engaged_by = id
            
            # =================================================================================

            while True:
                for recv_id, conn in enumerate(recv_pipes):
                    while conn.poll():
                        type_, i, j, k = conn.recv()
                        
                        # ====================================================================
                        """         
                        **When a blocked process Pk receives a query(i,j,k):** 
                        * if this is the engaging query for process Pi
                            * then send query(i, k, m) to all Pm in its dependent set DSk ;
                            * numk (i): = |DSk |; waitk (i):= true
                        * else if waitk (i) then send a reply (i, k, j) to Pj . 
                        """
                        
                        if len(waiting_on) > 0: # if True, the process is blocked
                            if type_ == 'query':

                                if not engaged:
                                    engaged = True
                                    engaged_by = j
                                    for pj in waiting_on:
                                        noisy_send(send_pipes[pj], ('query', i, k, pj))
                                        num[i] = len(waiting_on)
                                        wait[i] = True
                                else:
                                    if wait[i]:
                                        noisy_send(send_pipes[j], ('reply', i, k, j))

                        # ====================================================================
                        """
                        **When a process Pk receives a reply(i,j,k):**
                        * if waitk (i) then begin
                            * numk (i):= numk (i) − 1;
                            * if numk (i)= 0
                                * then if i=k then declare a deadlock
                                * else send reply(i, k, m) to the process Pm which sent the engaging query.
                        """

                        if type_ == 'reply':
                            if wait[i]:
                                num[i] = num[i] - 1
                                if num[i] == 0:
                                    if initiator:
                                        logger.error("Deadlocked!")
                                        with open(filename, 'a+') as f: f.write("{} {}\n".format(msg_gaurentee, 1))
                                        return
                                    else:
                                        noisy_send(send_pipes[engaged_by], ('reply', i, k, engaged_by))
                                    
                        # ====================================================================
    
    except TimeoutError as e:
        if initiator:
            logger.error('Not Deadlocked!')
            with open(filename, 'a+') as f: f.write("{} {}\n".format(msg_gaurentee, 0))