In [1]:
"""
Name: check_causal_delivery.py
Author: Liangwei Chen
Create Time: 06/12/2018
Last Modified Time: 07/12/2018
"""

'\nName: check_causal_delivery.py\nAuthor: Liangwei Chen\nCreate Time: 06/12/2018\nLast Modified Time: 07/12/2018\n'

In [2]:
import os
import linecache
import numpy as np
import pandas as pd
from collections import defaultdict

# Helpers 

In [3]:
# Step 1: Get causality
def get_peer_dependency(host_file):
    """
    Read the per-peer dependency lists from the membership (host) file.

    Layout read by this function: line 1 holds the number of peers N, and
    lines N + 2 .. 2N + 1 each hold one whitespace-separated list of
    integers (lines 2 .. N + 1 are not read here).

    Params: host_file: str, path of the membership file
    Return: peer_dependency_list: list(list(int)), one entry per peer in
            file order (entry i comes from line N + 2 + i)
    Raises: ValueError if the peer count or a dependency line is missing,
            empty, or contains a non-integer token
    """

    # Drop cached contents so a file rewritten between runs is re-read.
    linecache.clearcache()

    # linecache.getline returns '' (it never raises) when the file or the
    # line does not exist, so report that explicitly instead of letting
    # int('') fail with an unrelated-looking message.
    header = linecache.getline(host_file, 1).strip()
    if not header:
        raise ValueError(
            "cannot read the peer count from line 1 of {0!r}".format(host_file))
    num_peers = int(header)

    peer_dependency_list = []
    for offset in range(num_peers):
        line_no = num_peers + 2 + offset

        # split() with no separator tolerates repeated spaces, tabs and
        # trailing whitespace; split(' ') would yield empty tokens there.
        tokens = linecache.getline(host_file, line_no).split()
        if not tokens:
            raise ValueError(
                "missing dependency list on line {0} of {1!r}".format(
                    line_no, host_file))

        peer_dependency_list.append([int(token) for token in tokens])

    return peer_dependency_list

In [18]:
# Step 2: build msg dependencies
def get_msg_dependency(target, peer_dependency_list):
    """
    Build the causal dependency set of every message broadcast by `target`.

    The log `da_proc_<target>.log` is replayed in order. A running
    `history` collects the ids of target's own earlier broadcasts plus the
    ids of messages delivered from peers listed in target's dependency
    entry. Each broadcast line snapshots `history` as that message's
    dependency set.

    Line format relied on: first character 'b' followed by a sequence
    number, or first character 'd' followed by a sender id and a sequence
    number. Blank lines are skipped; other lines are ignored.

    Params: target: int, 1-based process id
            peer_dependency_list: list(list(int)), entry target - 1 is the
                                  dependency list of target
    Return: msg_dependency: dict(str -> set(str)), message id -> ids that
            must precede it
    Raises: IndexError / ValueError on a 'b' or 'd' line with missing or
            non-integer fields

    NOTE(review): message ids are "<sender><seq>" with no separator, so
    e.g. sender 1 / seq 12 and sender 11 / seq 2 both become "112". The
    format is kept because the delivery sequences are built the same way.
    NOTE(review): this reads the ".log" file while get_deliver_seq reads
    the ".out" file - confirm both names are intended.
    """

    # A set gives the same membership answers as the list, in O(1).
    peer_dependency = set(peer_dependency_list[target - 1])
    log_file = "da_proc_{0}.log".format(target)
    history = set()
    msg_dependency = defaultdict(set)

    with open(log_file) as f:
        for raw_line in f:
            line = raw_line.rstrip('\n')

            # A blank line would make line[0] raise IndexError.
            if not line:
                continue

            fields = line.split()

            if line[0] == 'b':
                # Everything seen so far causally precedes this broadcast.
                msg_id = str(target) + fields[1]
                msg_dependency[msg_id] = history.copy()
                history.add(msg_id)

            elif line[0] == 'd':
                root = int(fields[1])
                # Own messages are already tracked through the 'b' lines.
                if root != target and root in peer_dependency:
                    history.add(fields[1] + fields[2])

    return msg_dependency

In [52]:
# Step 3: Get delivery sequence
def get_deliver_seq(target):
    """
    Read the delivery order recorded in `da_proc_<target>.out`.

    Every line whose first character is 'd' contributes one id, built by
    concatenating its second and third space-separated fields. All other
    lines are ignored.

    Params: target: int, process id used in the file name
    Return: deliver_seq: list(str), ids in the order they appear in the file
    """

    out_path = "da_proc_{0}.out".format(target)
    delivered = []

    with open(out_path, "r") as handle:
        for raw in handle.readlines():
            # Only delivery records matter here.
            if raw[0] != 'd':
                continue

            fields = raw.rsplit('\n')[0].split(' ')
            delivered.append(fields[1] + fields[2])

    return delivered

In [53]:
# Step 3.5: Store msg_dependency and delivery sequence
def get_data(peer_dependency_list):
    """
    Collect, for every peer, its delivery sequence and the dependency sets
    of the messages it broadcast.

    Peers are numbered 1 .. len(peer_dependency_list); both returned dicts
    are keyed by that number.

    Params: peer_dependency_list: list(list(int))
    Return: delivery: dict(peer -> list(str))
            msg_dependencies: dict(peer -> dict(str -> set(str)))
    """

    delivery, msg_dependencies = {}, {}

    # One pass per peer: delivery order first, then its broadcast dependencies.
    for pid in np.arange(1, len(peer_dependency_list) + 1):
        delivery[pid] = get_deliver_seq(pid)
        msg_dependencies[pid] = get_msg_dependency(pid, peer_dependency_list)

    return delivery, msg_dependencies

In [71]:
# Step 4: Check validity
def check_causal_locality(delivery, msg_dependencies):
    """
    Check that every delivery sequence respects every message dependency.

    For each delivery sequence and each message with a recorded dependency
    set: if the message appears in the sequence, all of its dependencies
    must appear strictly before its first occurrence. A message that a
    sequence never delivers imposes no constraint on that sequence (the
    previous implementation raised ValueError from list.index in that case).

    Progress is printed for each sequence: a 1-based counter and the
    sequence length.

    Params: delivery: dict(peer -> list of message ids)
            msg_dependencies: dict(peer -> dict(message id -> set of ids))
    Return: validity: bool, False as soon as one violation is found
    """

    for i, test_delivery in enumerate(delivery.values()):
        # Go through all delivery sequences
        print("checking ", i + 1, " len:")
        print(len(test_delivery))

        # First position of every delivered message, built once per
        # sequence so each dependency check is a dict lookup instead of a
        # list scan plus a freshly built prefix set.
        first_pos = {}
        for pos, msg in enumerate(test_delivery):
            first_pos.setdefault(msg, pos)

        for test_msg_dependency in msg_dependencies.values():
            for test_msg, causal_msg in test_msg_dependency.items():
                msg_pos = first_pos.get(test_msg)
                if msg_pos is None:
                    # Not delivered in this sequence: nothing to verify.
                    continue

                for dep in causal_msg:
                    dep_pos = first_pos.get(dep)
                    # Violation: dependency missing or not strictly earlier.
                    if dep_pos is None or dep_pos >= msg_pos:
                        return False

    return True

In [72]:
# Step 5: Validate correctness of testing functionality

# Deliberately invalid example: message 3 depends on 1 and 2 but is delivered first
test_msg_dependency = {
    2: {1},
    3: {1, 2},
    4: {1, 2, 3},
}

test_delivery = [3, 1, 2, 4]

In [73]:
for test_msg, causal_msg in test_msg_dependency.items():

    # A non-empty difference means some dependency was not delivered
    # before the first occurrence of test_msg.
    if set(causal_msg) - set(test_delivery[: test_delivery.index(test_msg)]):
        print("Error")

Error


# Main function

In [74]:
def main():
    """
    Run the whole check: read the dependency lists from the "membership"
    file, load every peer's delivery sequence and message dependencies,
    then print whether the causal check passed.

    Params: none
    Return: none (the verdict is printed)
    """

    peer_dependency_list = get_peer_dependency("membership")
    delivery, msg_dependencies = get_data(peer_dependency_list)

    if check_causal_locality(delivery, msg_dependencies):
        verdict = "Causal locality delivery satisfied"
    else:
        verdict = "Fail to satisfy causal locality"

    print(verdict)

In [80]:
# Run the check end to end; main() reads "membership" and the da_proc_* files via relative paths.
main()

checking  1  len:
250
checking  2  len:
250
checking  3  len:
250
checking  4  len:
250
checking  5  len:
250
Causal locality delivery satisfied
