# Project 2 -- Gossip Style Failure Detectors

## Instructions

Please read carefully:

* Solve the project yourself. No teamwork.
* If you have questions, please post these in the public channel on Slack. The answers may be relevant to others as well. 
* Feel free to import and use any additional Python package you need.
* You are allowed to solve the project using a different programming language. In that case, please send me your full code together with instructions on how to run it (you may also have to use a different socket library).
* Your code may be tested on more than 2 nodes. Two nodes are used below only as an example.
* If you get errors when running your code, read the error message carefully. It may be that the port has not yet been freed by a previous run, and you just need to wait a few more seconds before running the code again.
* Make sure to fill in your `student_name` in the block below.

```python
student_name = 'David Mihola' # fill with your student name
assert student_name != 'your_student_name', 'Please fill in your student_name before you start.'
```

## Setup

Necessary imports:

```python
import random
import threading
import multiprocessing
import os
import time
import zmq
import sys
import numpy as np
```

You will be sending and receiving messages via sockets. We will use the ZeroMQ library, a high-performance asynchronous messaging library aimed at distributed or concurrent applications. ZeroMQ simplifies communication handling between distributed nodes. ZeroMQ reference guide: https://zguide.zeromq.org/docs/chapter2/


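Below you will find an example that creates a pair of communicating nodes, which periodically exchange messages. Each node runs in its own process. The main thread publishes a GOSSIP message addressed to a randomly chosen neighbor roughly every 5 seconds, and a listener thread subscribes to all nodes and prints the messages addressed to it. Both nodes eventually terminate.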

```python
def gossip(n, N, msgNet):
    # Start a listener thread that receives messages from all nodes
    listener_thread = threading.Thread(target=responder, args=(n, N, msgNet))
    listener_thread.start()

    # Create a publisher socket for sending messages; node n publishes on port 5550 + n
    context = zmq.Context()
    s = context.socket(zmq.PUB)
    s.bind(f"tcp://127.0.0.1:{5550 + n}")
    msgNet[n]['SOCKET'] = s

    # Wait for the listener threads' subscriber sockets to connect
    time.sleep(3)

    while not msgNet[n]['TERMINATE']:
        # Choose a random neighbor other than n, compile and send it a GOSSIP message
        p = n
        while p == n:
            p = random.randint(0, N - 1)
        print("GOSSIP msg sent by {} to {}".format(n, p))
        status = {"sender_id": int(n), "receiver_id": p, "msg": True}
        s.send_string("GOSSIP", flags=zmq.SNDMORE)  # first frame: topic matched by SUB filters
        s.send_json(status)                         # second frame: JSON payload
        time.sleep(5)

        # The process fails with probability 1/4 in each round
        if random.randint(0, 3) < 1:
            msgNet[n]['TERMINATE'] = True

    print("Terminating {} ...".format(n))
    listener_thread.join()

    # Close the socket without lingering so that the port is freed for the next run
    context.destroy(linger=0)


def responder(n, N, msgNet):
    print("Listener:{}".format(n))

    context = zmq.Context()

    # Create one subscriber socket per node (including node n itself)
    sockets = []
    for p in range(N):
        s = context.socket(zmq.SUB)
        s.RCVTIMEO = 100  # receive calls raise zmq.Again after 100 ms without a message
        s.connect(f"tcp://127.0.0.1:{5550 + p}")
        s.subscribe("GOSSIP")
        s.subscribe("TERMINATE")
        sockets.append(s)

    # Listen to all nodes until this node terminates
    while not msgNet[n]['TERMINATE']:
        for s in sockets:
            try:
                msg_type = s.recv_string()
                msg = s.recv_json()
            except zmq.Again:
                continue  # nothing received from this node within the timeout

            if msg_type == "TERMINATE" and msg['receiver_id'] == n:
                # Notify the main thread (which shares msgNet) to terminate
                print("TERMINATE msg received by {} from {}".format(n, msg['sender_id']))
                msgNet[n]['TERMINATE'] = True
                break

            elif msg_type == "GOSSIP" and msg['receiver_id'] == n:
                print("GOSSIP msg received by {} from {}".format(n, msg['sender_id']))

    context.destroy(linger=0)


# Test the above implementation on a pair of nodes
numnodes = 2
nodes = np.arange(numnodes)
np.random.shuffle(nodes)  # start the node processes in random order
print("Node IDs: ", nodes)

# Per-node state flags. Every process works on its own copy of this list,
# so msgNet is shared between the threads of one node but NOT between processes.
msgNet = [dict() for k in range(numnodes)]
for k in range(numnodes):
    msgNet[k]['GOSSIP'] = False
    msgNet[k]['TERMINATE'] = False
    msgNet[k]['SOCKET'] = None

processes = []
for n in nodes:
    p = multiprocessing.Process(target=gossip, args=(n, numnodes, msgNet))
    processes.append(p)

# Start node processes
for p in processes:
    p.start()

# Join node processes
for p in processes:
    p.join()
```

A sample output of the above example may look like this:
```
Node IDs:  [1 0]
Listener:1
Listener:0
GOSSIP msg sent by 1 to 0
GOSSIP msg received by 0 from 1
GOSSIP msg sent by 0 to 1
GOSSIP msg received by 1 from 0
GOSSIP msg sent by 1 to 0
Terminating 0 ...
GOSSIP msg received by 0 from 1
GOSSIP msg sent by 1 to 0
GOSSIP msg sent by 1 to 0
GOSSIP msg sent by 1 to 0
GOSSIP msg sent by 1 to 0
GOSSIP msg sent by 1 to 0
Terminating 1 ...
```

## A Gossip Style Failure Detection Service

In this homework, you are asked to modify and extend the above example to implement a gossip style failure detection protocol based on the following publication: https://www.cs.cornell.edu/home/rvr/papers/GossipFD.pdf. The protocol was also covered in the lecture. The essential points are summarized below (also see Section 2 in the paper and revisit the slides of Lecture 4 -- Coordination). 

Each node maintains a list that stores, for each known node, its address and an integer used for failure detection. We call this integer the _heartbeat counter_. Every $T_{gossip}$ seconds, each node increments its own heartbeat counter and selects one other node at random to send its list to. Upon receiving such a gossip message, a node merges the list in the message with its own list, adopting the maximum heartbeat counter for each node.
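
The list, the per-round increment and the max-merge can be sketched in plain Python, independent of ZeroMQ. This is a minimal sketch under our own assumptions. The membership list is a dict keyed by node id. The field names `address`, `heartbeat` and `last_update` and the helpers `make_entry`, `gossip_round` and `merge` are hypothetical. Timestamps are passed in explicitly so that the example is deterministic.

```python
import random
import time


def make_entry(address, heartbeat=0, now=None):
    """Hypothetical list entry: address, heartbeat counter, local time of the last increase."""
    return {"address": address, "heartbeat": heartbeat,
            "last_update": time.time() if now is None else now}


def gossip_round(my_id, members, now=None):
    """Increment the own heartbeat counter and choose one other node at random."""
    now = time.time() if now is None else now
    members[my_id]["heartbeat"] += 1
    members[my_id]["last_update"] = now
    peers = [p for p in members if p != my_id]
    target = random.choice(peers) if peers else None
    # Only addresses and counters are gossiped; last_update is local time and stays local
    message = {p: {"address": e["address"], "heartbeat": e["heartbeat"]}
               for p, e in members.items()}
    return target, message


def merge(members, message, now=None):
    """Adopt the maximum heartbeat counter per node and record when it increased."""
    now = time.time() if now is None else now
    for p, remote in message.items():
        local = members.get(p)
        if local is None:  # node not known yet: add it
            members[p] = make_entry(remote["address"], remote["heartbeat"], now)
        elif remote["heartbeat"] > local["heartbeat"]:
            local["heartbeat"] = remote["heartbeat"]
            local["last_update"] = now


# Node 0 knows nodes 0, 1, 2; node 1 only knows nodes 0 and 1
a = {0: make_entry("tcp://127.0.0.1:5550", 5, now=0.0),
     1: make_entry("tcp://127.0.0.1:5551", 4, now=0.0),
     2: make_entry("tcp://127.0.0.1:5552", 2, now=0.0)}
b = {0: make_entry("tcp://127.0.0.1:5550", 3, now=0.0),
     1: make_entry("tcp://127.0.0.1:5551", 7, now=0.0)}

target, msg = gossip_round(1, b, now=1.0)  # node 1 bumps its counter to 8; only peer is 0
merge(a, msg, now=1.0)                     # node 0 adopts counter 8 for node 1
print(target)                                      # 0
print({p: e["heartbeat"] for p, e in a.items()})   # {0: 5, 1: 8, 2: 2}

# Later node 0 gossips; suppose its message reaches node 1 (random target ignored here)
_, reply = gossip_round(0, a, now=2.0)
merge(b, reply, now=2.0)                           # node 1 learns about node 2
print({p: e["heartbeat"] for p, e in b.items()})   # {0: 6, 1: 8, 2: 2}
```

Keeping `last_update` out of the message reflects that each node compares heartbeats against its own clock, so the nodes do not need synchronized clocks.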

Each node also records, for every other node in its list, the last time at which that node's heartbeat counter increased. If the heartbeat counter has not increased for more than $T_{fail}$ seconds, the node is considered failed. $T_{fail}$ is chosen so that the probability that any node makes an erroneous failure detection is below some small threshold $P_{mistake}$.
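
One possible shape for the $T_{fail}$ check is sketched below. It assumes that the list is a dict keyed by node id and that each entry carries a local `last_update` timestamp and an optional `failed` flag. The names `detect_failures` and `T_FAIL` and the value of 10 seconds are hypothetical placeholders, not values taken from the paper.

```python
import time

T_FAIL = 10.0  # hypothetical timeout in seconds; in practice chosen from P_mistake


def detect_failures(members, my_id, t_fail=T_FAIL, now=None):
    """Mark nodes whose heartbeat counter has not increased for more than t_fail seconds."""
    now = time.time() if now is None else now
    newly_failed = []
    for p, entry in members.items():
        if p == my_id or entry.get("failed", False):
            continue  # never suspect yourself; nodes already marked stay failed
        if now - entry["last_update"] > t_fail:
            entry["failed"] = True
            newly_failed.append(p)
    return newly_failed


members = {
    0: {"heartbeat": 12, "last_update": 100.0},  # this node
    1: {"heartbeat": 5, "last_update": 95.0},    # last increase 8 s ago
    2: {"heartbeat": 9, "last_update": 85.0},    # last increase 18 s ago
}
failed_now = detect_failures(members, my_id=0, now=103.0)
print(failed_now)  # [2]
```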

After a node is considered faulty, it cannot immediately be forgotten about. The problem is that not all nodes will detect failures at the same time, and thus a node A may receive a gossip about another node B that A has previously detected as faulty. If A had forgotten about B, it would reinstall B in its membership list, since A would think that it was seeing B’s heartbeat for the first time. A would continue to gossip this information on to other nodes, and, in effect, the faulty node B never quite disappears from the membership.
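
The scenario can be reproduced with a toy merge that maps node names to heartbeat counters. This is an illustration only, and the `merge` helper and node names are hypothetical. It shows that a stale gossip about B reinstalls B only if A has already forgotten B's entry.

```python
def merge(members, message):
    """Toy merge: adopt the maximum counter; unknown nodes are added as new members."""
    refreshed = []
    for node, hb in message.items():
        if node not in members or hb > members[node]:
            members[node] = hb
            refreshed.append(node)
    return refreshed


# Gossip from node C, which has not yet detected B's failure (B's last counter was 7)
stale_gossip = {"B": 7, "C": 15}

# Case 1: A forgot B immediately after detecting the failure
forgetful = {"A": 20, "C": 14}
reinstalled = merge(forgetful, stale_gossip)
print(reinstalled)  # ['B', 'C']: B is reinstalled as if it were new

# Case 2: A keeps B's entry (marked as failed elsewhere) until cleanup
remembering = {"A": 20, "B": 7, "C": 14}
kept = merge(remembering, stale_gossip)
print(kept)         # ['C']: counter 7 is no increase for B
```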

Therefore, the failure detector does not remove a node from its membership list until after $T_{cleanup}$ seconds ($T_{cleanup} \geq T_{fail}$). $T_{cleanup}$ is chosen so that the probability that a gossip about this node is received after the node has been detected as faulty is less than some small threshold $P_{cleanup}$. We can make $P_{cleanup}$ equal to $P_{mistake}$ by setting $T_{cleanup}$ to $2 \times T_{fail}$.

To see why, let B be some failed node, and consider another node A that heard B’s last heartbeat at time $t$. With probability at least $1 - P_{mistake}$, every other node will have heard B’s last heartbeat by $t+T_{fail}$, and so every node will have detected B as failed by time $t+2\times T_{fail}$. Thus, if we set $T_{cleanup}$ to $2\times T_{fail}$, then with probability at least $1 - P_{mistake}$, no failed node will ever reappear on A’s list of nodes once that node has been removed.
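
The cleanup step can be sketched as a separate pass over the list. The sketch follows the argument above and assumes that both timeouts are measured from the local time of the last heartbeat increase, with $T_{cleanup} = 2 \times T_{fail}$. The helper `cleanup` and the 10-second value are hypothetical. A failed node keeps its entry until its cleanup timeout expires, so a stale gossip about it does not look new.

```python
T_FAIL = 10.0            # hypothetical failure timeout in seconds
T_CLEANUP = 2 * T_FAIL   # cleanup timeout as argued above


def cleanup(members, my_id, now, t_cleanup=T_CLEANUP):
    """Remove nodes whose heartbeat has not increased for more than t_cleanup seconds."""
    removed = [p for p, e in members.items()
               if p != my_id and now - e["last_update"] > t_cleanup]
    for p in removed:
        del members[p]
    return removed


members = {
    0: {"heartbeat": 30, "last_update": 100.0},                # this node
    2: {"heartbeat": 4, "last_update": 88.0, "failed": True},  # silent 12 s: failed, kept
    3: {"heartbeat": 2, "last_update": 75.0, "failed": True},  # silent 25 s: removed
}
removed = cleanup(members, my_id=0, now=100.0)
print(removed)          # [3]
print(sorted(members))  # [0, 2]
```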

## 1 - Gossiping Neighborship Lists [6 points]

**Your task:** Modify the sample code above to make nodes gossip their neighborship lists to a randomly chosen neighbor.
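
One practical detail applies when a list is gossiped with `send_json`. pyzmq serializes the payload as JSON, and JSON object keys are always strings, so a list keyed by integer node ids arrives with string keys. NumPy integers such as the node ids produced by `np.arange` are not JSON-serializable either, and the sample's `int(n)` conversion of the sender id avoids that problem. The round trip below uses the standard `json` module directly to show the effect. The message layout with a `members` field is a hypothetical choice.

```python
import json

# Hypothetical membership list keyed by integer node ids
members = {0: {"address": "tcp://127.0.0.1:5550", "heartbeat": 3},
           1: {"address": "tcp://127.0.0.1:5551", "heartbeat": 5}}
payload = {"sender_id": 1, "receiver_id": 0, "members": members}

# send_json/recv_json perform essentially this JSON round trip
decoded = json.loads(json.dumps(payload))
print(list(decoded["members"]))  # ['0', '1']: integer keys became strings

# Convert the keys back before merging into the local list
received = {int(k): v for k, v in decoded["members"].items()}
print(received == members)  # True
```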

## 2 - Receiving and Updating Neighborship Lists [6 points]

**Your task:** Extend the code to correctly update the neighborship lists maintained by every node.

## 3 - Detect Node Failures [8 points]

**Your task:** Extend the code to correctly handle node failures by maintaining the $T_{fail}$ and $T_{cleanup}$ timeouts and updating the neighborship list according to the protocol.

## 4 - How to Submit Your Solution?

Download your notebook (File --> Download --> Download .ipynb) and send it by email to [saukh@tugraz.at](mailto:saukh@tugraz.at).