# COMP3221 - Distributed Systems

## Tutorial 01 - Multithreading

**Objectives:**

* To develop basic **Python 3** programming skills.
* To become familiar with common Python libraries.
* To understand the basics of multithreading in the Python programming language.

**Instructions:**
* Exercises should be completed in **Python 3**.
* You can use either an IPython notebook or plain Python files to complete the exercises.
* To install IPython notebooks and Python 3, you can:
   * Use IPython 3 (Jupyter) notebook installed on your computer: http://jupyter.org/install (you need to have Python installed first: https://docs.python.org/3/using/index.html )
   * Use Web-based IPython notebooks such as Google Colaboratory: https://colab.research.google.com/   
* Using the IPython notebook:
  * If you are using Jupyter installed on your computer, go to *File -> Open*, drag and drop the "*Tutorial 01 - Multithreading.ipynb*" file onto the home interface, and click upload.
  * If you are using Google Colaboratory, click *File -> Upload notebook*, and upload the "*Tutorial 01 - Multithreading.ipynb*" file.
  * To run the code cells you can press *Ctrl-Enter* or hit the *Play* button at the top.
* Using Python files:
  * We recommend using Visual Studio Code as the IDE: https://code.visualstudio.com/
  
* Please complete all exercises marked with **TODO**.
* Make sure to save your files when you are done with the exercises, so you can show your tutor next week.

## 1. Exercise 1: Python threads

In this exercise, we will cover the basics of multithreading in the Python programming language.

### What is a Thread?

$\rightarrow$ The smallest unit of processing that an operating system can schedule.

$\rightarrow$ The entity within a process that can be scheduled for execution.

<img src="https://miro.medium.com/v2/resize:fit:1400/1*hZ3guTdmDMXevFiT5Z3VrA.png" width="1000">

***Multithreading*** refers to the ability of a processor to execute multiple threads *concurrently*.

$\rightarrow$ Each thread runs a sequence of instructions within a program that can be executed independently of other code.

$\rightarrow$ Threads of the same process share the same code, data, and files, but each thread has its own registers and stack.
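
The following is a minimal sketch of that sharing model, not part of the original exercise. The names `shared_results`, `lock`, and `square` are illustrative assumptions. Every thread appends to the same list (shared data), while `local_value` is a local variable that exists separately on each thread's own stack.

```python
import threading

shared_results = []          # one list, visible to every thread in the process
lock = threading.Lock()      # protects the shared list from concurrent appends

def square(n):
    local_value = n * n      # local variable: private to the calling thread
    with lock:
        shared_results.append(local_value)

threads = [threading.Thread(target=square, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# The order of appends may vary between runs, so sort before printing
print(sorted(shared_results))   # [0, 1, 4, 9]
```

The lock is included because shared data is exactly where threads can interfere with each other; local variables need no such protection.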

### Use Case: Assignment 1

In Assignment 1, each node in the network will need to perform multiple tasks concurrently:

1. Sending and Receiving the update packets
2. Finding the Least-Cost Paths
3. Dealing with Link Cost Changes and Node Failures


$\rightarrow$ Multithreading comes to the rescue!

<img src="https://www.cs.uic.edu/~jbell/CourseNotes/OperatingSystems/images/Chapter4/4_02_MultithreadedArchitecture.jpg" width="600">

### How to create a Thread in Python

<img src="https://miro.medium.com/v2/resize:fit:531/0*XmVZKEErfDTV8wqC.jpg" width="400">

### Example

#### Import Libraries

In [20]:
import threading
import os

#### Define Tasks

In [21]:
# For thread 1
def task1():
    print("Task 1 assigned to thread: {}".format(threading.current_thread().name))
    print("ID of process running task 1: {}".format(os.getpid()))
 
def task2():
    print("Task 2 assigned to thread: {}".format(threading.current_thread().name))
    print("ID of process running task 2: {}".format(os.getpid()))
 

#### Start Threads

To create a new thread, we create an object of the `Thread` class. It typically takes `target`, `name`, and `args` as parameters:
- `target` is the function to be executed by the thread,
- `name` is the name of the thread,
- `args` is a tuple of arguments to be passed to the target function.

In [22]:
t1 = threading.Thread(target=task1, name='t1')
t2 = threading.Thread(target=task2, name='t2')

# To start a thread, we use the start() method of the Thread class.
t1.start()
t2.start()


# Wait for both threads to finish: join() blocks until the thread has terminated
t1.join()
t2.join()
 
print("Done!")

Task 1 assigned to thread: t1
ID of process running task 1: 19460
Task 2 assigned to thread: t2
ID of process running task 2: 19460
Done!
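
The example above passes only `target` and `name`. As a small complementary sketch, the snippet below also uses `args`. The function `greet`, the list `messages`, and the thread name `greeter` are hypothetical names chosen for illustration.

```python
import threading

messages = []  # collected here so the result can be inspected after join()

def greet(greeting, who):
    # The two positional parameters are filled from the args tuple
    text = "{} {} from thread {}".format(
        greeting, who, threading.current_thread().name)
    messages.append(text)
    print(text)

# args must be a tuple; for a single argument, write args=(value,)
t = threading.Thread(target=greet, name="greeter", args=("Hello", "COMP3221"))
t.start()
t.join()
```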


<img src="https://media.geeksforgeeks.org/wp-content/uploads/20230824111950/multithreading-python-5.png" width="300">

### 1.1 Create a thread from scratch

We will implement a new **MyThread** class, based on the Python standard library module *threading*, with the following methods: **init, run, stop**.

To finish this exercise, every **TODO** needs to be implemented.

You can complete the code directly in this notebook or in the Python file "*Mythread.py*".

In [14]:
# Import threading and time libraries
import threading
import time

class MyThread(threading.Thread):
    def __init__(self, threadID, name):
        threading.Thread.__init__(self)
        # Named _stop_event because threading.Thread uses the name _stop
        # internally in several Python 3 releases
        self._stop_event = threading.Event()
        self.threadID = threadID
        self.name = name

    def run(self):
        # TODO: this function is run when we start the thread.
        # It keeps running until it gets a stop flag.
        # It calls print_function() to print out the current time.
        pass

    def stop(self):
        self._stop_event.set()

    def stopped(self):
        return self._stop_event.is_set()

# Task
def print_function():
    print("Hello world!!!!", time.ctime(time.time()))

In [25]:
# Import threading and time libraries
import threading
import time

class MyThread(threading.Thread):
    def __init__(self, threadID, name):
        threading.Thread.__init__(self)
        # Named _stop_event because threading.Thread uses the name _stop
        # internally in several Python 3 releases
        self._stop_event = threading.Event()
        self.threadID = threadID
        self.name = name

    def run(self):
        # This function is run when we start the thread.
        # It keeps running until the stop flag is set, calling
        # print_function() once per second to print the current time.
        print(f"Starting {self.name} with ID {self.threadID}")
        while True:
            if self.stopped():
                print(f"{self.name} is killed")
                return
            time.sleep(1)
            print_function()

    def stop(self):
        self._stop_event.set()

    def stopped(self):
        return self._stop_event.is_set()

def print_function():
    print("Hello world!!!!", time.ctime(time.time()))

Create a MyThread object, and then use the **start** method to start the thread.

In [26]:
# Create a MyThread object, and then use the start() method to start the thread
thread1 = MyThread(1, "Thread-1")
thread1.start()

time.sleep(10)
# Stop the thread
thread1.stop()

Starting Thread-1 with ID 1
Hello world!!!! Wed Feb 28 14:54:06 2024
Hello world!!!! Wed Feb 28 14:54:07 2024
Hello world!!!! Wed Feb 28 14:54:08 2024
Hello world!!!! Wed Feb 28 14:54:09 2024
Hello world!!!! Wed Feb 28 14:54:10 2024
Hello world!!!! Wed Feb 28 14:54:11 2024
Hello world!!!! Wed Feb 28 14:54:12 2024
Hello world!!!! Wed Feb 28 14:54:13 2024
Hello world!!!! Wed Feb 28 14:54:14 2024
Hello world!!!! Wed Feb 28 14:54:15 2024
Thread-1 is killed


If you use Python files, run the code with the following command: **python MyThread.py**

Stop the thread

In [None]:
thread1.stop()
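
One possible variation on the stop-flag pattern, sketched here under assumptions rather than taken from the exercise: `Event.wait(timeout)` can replace the separate `time.sleep()` and flag check, so the thread reacts to `stop()` without waiting for a full sleep to elapse. The class name `Ticker` and its attributes `interval` and `ticks` are hypothetical.

```python
import threading
import time

class Ticker(threading.Thread):
    def __init__(self, interval):
        super().__init__()
        self.interval = interval
        self.ticks = 0
        self._stop_event = threading.Event()

    def run(self):
        # wait() returns False when the timeout expires and True as soon
        # as the event is set, so it works as an interruptible sleep
        while not self._stop_event.wait(self.interval):
            self.ticks += 1

    def stop(self):
        self._stop_event.set()

ticker = Ticker(0.05)
ticker.start()
time.sleep(0.3)
ticker.stop()
ticker.join()   # returns promptly because wait() wakes up on stop()
print("ticks:", ticker.ticks, "alive:", ticker.is_alive())
```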

##### Solution

### 1.2 Create a thread using an existing library: "_thread"

We can also use the low-level *_thread* module to spawn threads. Let's try some code and observe the results.

In [24]:
import _thread
import time

# Define a function for the thread
def print_time(threadName, delay):
    count = 0
    while count < 5:
        time.sleep(delay)
        count += 1
        print("Hello world!!!!", "%s: %s" % (threadName, time.ctime(time.time())))

# Create two threads as follows
try:
    _thread.start_new_thread(print_time, ("Thread-1", 2))
    _thread.start_new_thread(print_time, ("Thread-2", 4))
except RuntimeError:
    print("Error: unable to start thread")

# Keep the main thread alive: start_new_thread() gives us nothing to join on
while True:
    pass

Hello world!!!! Thread-1: Wed Feb 28 14:30:21 2024
Hello world!!!! Thread-2: Wed Feb 28 14:30:23 2024
Hello world!!!! Thread-1: Wed Feb 28 14:30:23 2024
Hello world!!!! Thread-1: Wed Feb 28 14:30:25 2024
Hello world!!!! Thread-2: Wed Feb 28 14:30:27 2024
Hello world!!!! Thread-1: Wed Feb 28 14:30:27 2024
Hello world!!!! Thread-1: Wed Feb 28 14:30:29 2024
Hello world!!!! Thread-2: Wed Feb 28 14:30:31 2024
Hello world!!!! Thread-2: Wed Feb 28 14:30:35 2024
Hello world!!!! Thread-2: Wed Feb 28 14:30:39 2024


KeyboardInterrupt: 

The program goes into an infinite loop. You will have to press *Ctrl-C* or hit the *Stop* button to stop it.
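
For comparison, here is a sketch of the same task written with the higher-level *threading* module, where `join()` replaces the busy-waiting loop and the program ends on its own. The shortened delays and the extra `repeats` parameter are assumptions made to keep the run brief.

```python
import threading
import time

def print_time(thread_name, delay, repeats):
    for _ in range(repeats):
        time.sleep(delay)
        print("Hello world!!!!", "%s: %s" % (thread_name, time.ctime(time.time())))

workers = [
    threading.Thread(target=print_time, args=("Thread-1", 0.1, 3)),
    threading.Thread(target=print_time, args=("Thread-2", 0.2, 3)),
]
for w in workers:
    w.start()
for w in workers:
    w.join()   # block until this worker has finished; no infinite loop needed

print("All workers finished")
```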

## 2. Exercise 2: Communication between producer and consumer

Now you know how to spawn threads. Next, we will implement two communicating threads, a producer and a consumer. They communicate by enqueuing and dequeuing messages from a communication channel. The producer and consumer access the same MessageQueue concurrently.
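
As a quick, single-threaded sketch of the communication channel itself, the snippet below uses `queue.Queue` from the standard library, which is thread-safe and delivers messages in first-in, first-out order. The variable names `channel` and `received` are illustrative.

```python
import queue

channel = queue.Queue()

# Enqueue three messages
for i in range(3):
    channel.put("message {}".format(i))

# Dequeue until the queue is empty; get_nowait() raises queue.Empty
# instead of blocking when there is nothing left
received = []
while True:
    try:
        received.append(channel.get_nowait())
    except queue.Empty:
        break

print(received)   # ['message 0', 'message 1', 'message 2']
```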

<img src="https://miro.medium.com/v2/resize:fit:1400/1*5u_fpJ1KlSbt7KxrUNVjXg.png" width="500">

> Producer and Consumer share a common buffer or queue. The producer continuously produces certain data and pushes it onto the buffer, whereas the consumer consumes those data from the buffer.

The code that spawns the threads is indicated below. It is your turn to complete **TODO** and make the code run correctly.

**Consumer thread**

In [None]:
import threading
import time
import queue

class Consumer(threading.Thread):
    def __init__(self, mess_queue, processing_time, name):
        threading.Thread.__init__(self)
        # Named _stop_event because threading.Thread uses the name _stop
        # internally in several Python 3 releases
        self._stop_event = threading.Event()
        self.q = mess_queue
        self.processing_time = processing_time # Seconds to wait before consuming each message
        self.name = name

    def stop(self):
        self._stop_event.set()

    def stopped(self):
        return self._stop_event.is_set()

    def run(self):
        while True:
            if self.stopped():
                print(self.name + " is killed")
                return
            # Simulate the time needed to process one message
            time.sleep(self.processing_time)

            # TODO: get a message from the queue and print out the message


**Producer thread**

In [None]:
class Producer(threading.Thread):
    def __init__(self, mess_queue, processing_time, name):
        threading.Thread.__init__(self)
        # Named _stop_event because threading.Thread uses the name _stop
        # internally in several Python 3 releases
        self._stop_event = threading.Event()
        self.q = mess_queue
        self.processing_time = processing_time # Seconds to wait before producing each message
        self.name = name

    def stop(self):
        self._stop_event.set()

    def stopped(self):
        return self._stop_event.is_set()

    def run(self):
        task_id = 0
        while True:
            if self.stopped():
                print(self.name + " is killed")
                return
            # Simulate the time needed to produce one message
            time.sleep(self.processing_time)

            # TODO: Create a message and send it to the queue.
            # The message is the current time.

Create a queue, thread objects, and start Producer and Consumer threads

#### Solution

In [27]:
import threading
import time 
import queue

class Consumer(threading.Thread):
    def __init__(self, mess_queue, processing_time, name):
        threading.Thread.__init__(self)
        # Named _stop_event because threading.Thread uses the name _stop
        # internally in several Python 3 releases
        self._stop_event = threading.Event()
        self.q = mess_queue
        self.processing_time = processing_time # Seconds to wait before consuming each message
        self.name = name

    def stop(self):
        self._stop_event.set()

    def stopped(self):
        return self._stop_event.is_set()

    def run(self):
        while True:
            if self.stopped():
                print(self.name + " is killed")
                return
            # Simulate the time needed to process one message
            time.sleep(self.processing_time)
            # Blocks until a message is available in the queue
            mess = self.q.get()
            if mess:
                print("Consumer consumed: ", mess)

In [28]:
class Producer(threading.Thread):
    def __init__(self, mess_queue, processing_time, name):
        threading.Thread.__init__(self)
        # Named _stop_event because threading.Thread uses the name _stop
        # internally in several Python 3 releases
        self._stop_event = threading.Event()
        self.q = mess_queue
        self.processing_time = processing_time # Seconds to wait before producing each message
        self.name = name

    def stop(self):
        self._stop_event.set()

    def stopped(self):
        return self._stop_event.is_set()

    def run(self):
        task_id = 0
        while True:
            if self.stopped():
                print(self.name + " is killed")
                return
            # Simulate the time needed to produce one message
            time.sleep(self.processing_time)
            mess = f"task_id {task_id}"
            print("Producer produced: ", mess)
            self.q.put(mess)
            task_id += 1

In [29]:
q = queue.Queue()

# Instantiate producer
thProducer = Producer(q, 1, "Producer")
thProducer.start()

# Instantiate consumer
thConsumer = Consumer(q, 2, "Consumer")
thConsumer.start()

# Wait for 3 seconds and stop the producer
time.sleep(3)
thProducer.stop()
# Give the consumer 10 seconds to process all tasks left in the queue.
# Note: the consumer is never stopped here, so it stays blocked in q.get().
time.sleep(10)

Producer produced:  task_id 0
Consumer consumed:  task_id 0
Producer produced:  task_id 1
Producer produced:  task_id 2
Producer is killed
Consumer consumed:  task_id 1
Consumer consumed:  task_id 2
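
In the solution above, `q.get()` blocks while the queue is empty, so a consumer waiting on an empty queue would not notice its stop flag. One way to address this, sketched below as an assumption rather than as the intended solution, is to call `get()` with a `timeout` and re-check the flag whenever `queue.Empty` is raised. The class name `PollingConsumer` and the attributes `poll_interval` and `consumed` are hypothetical.

```python
import queue
import threading
import time

class PollingConsumer(threading.Thread):
    def __init__(self, mess_queue, poll_interval):
        super().__init__()
        self.q = mess_queue
        self.poll_interval = poll_interval
        self.consumed = []
        self._stop_event = threading.Event()

    def stop(self):
        self._stop_event.set()

    def run(self):
        while not self._stop_event.is_set():
            try:
                # Wait at most poll_interval seconds for a message
                mess = self.q.get(timeout=self.poll_interval)
            except queue.Empty:
                continue   # nothing arrived; loop back and re-check the flag
            self.consumed.append(mess)
            print("Consumer consumed: ", mess)

q = queue.Queue()
consumer = PollingConsumer(q, 0.05)
consumer.start()
for task_id in range(3):
    q.put(f"task_id {task_id}")

time.sleep(0.5)    # give the consumer time to drain the queue
consumer.stop()
consumer.join()    # returns because the consumer re-checks its flag
print("Consumer alive:", consumer.is_alive())
```

The trade-off is a short polling delay: the consumer takes up to `poll_interval` seconds to react to `stop()`.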


## Acknowledgements
This tutorial was originally written for COMP3221 - Distributed Systems.