# Multithreading

When we execute the `python3 main.py` command, the OS loads the Python interpreter (CPython) into memory (RAM) and starts it as a process. The interpreter then compiles `main.py` into bytecode (not machine code) and executes that bytecode.

Once the OS has loaded the program into memory, its instructions are transferred to the CPU over the bus for execution.

In general, the OS places the instructions in a queue, also known as a pipeline. The CPU then executes the instructions from the pipeline.

**By definition, a process is an instance of a program running on a computer, and a thread is a unit of execution within a process.**

Notice that if you launch a program multiple times, you will have a single program but multiple processes, each representing an instance of the program.

![image](../images/run_python_program.png)

A program may have one or more processes, and a process can have one or more threads.

When a program has multiple processes, it's called **multiprocessing**. If a program has multiple threads, it's called **multithreading**.
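As a minimal sketch of the process/thread relationship described above, the snippet below starts two threads and records the process ID each one sees. The names `report`, `results` and `worker-N` are illustrative choices, not part of the original notes.

```python
import os
import threading

results = []

def report():
    # Every thread inside one process sees the same process ID.
    results.append((threading.current_thread().name, os.getpid()))

workers = [threading.Thread(target=report, name=f"worker-{i}") for i in range(2)]
for w in workers:
    w.start()
for w in workers:
    w.join()

# Record the thread that launched the workers as well.
report()

for name, pid in results:
    print(f"thread={name} pid={pid}")

pids = {pid for _, pid in results}
```

All three entries should share one PID, because threads live inside a single process; launching the script a second time would produce a different PID.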


To execute multiple processes "at the same time", the OS uses a software component called a **scheduler**.

![image](../images/softeware_scheduler.png)

The scheduler is like a switch that schedules processes. Its main task is to select instructions and submit them for execution at regular intervals.

The scheduler switches between processes so quickly (typically every few milliseconds) that it creates the illusion of the computer executing multiple processes simultaneously.

Today, CPUs often have multiple cores, and the number of cores determines how many processes the CPU can execute truly simultaneously.
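To check how many cores are available on a given machine, the standard library offers `os.cpu_count()`. This is only a rough sketch: the value counts logical cores (which may include hyper-threads), and the OS may restrict a process to fewer of them.

```python
import os

# os.cpu_count() returns the number of logical CPUs, or None if it cannot be determined.
cores = os.cpu_count()
print(f"Logical CPU cores reported by the OS: {cores}")
```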



## Single-threaded applications

In [1]:
from time import sleep, perf_counter

def task():
    print("Starting a task...")
    sleep(1)
    print("DONE")


start_time = perf_counter()

task()
task()

end_time = perf_counter()

print(f"It took {end_time - start_time} second(s) to complete")

Starting a task...
DONE
Starting a task...
DONE
It took 2.0006959089998873 second(s) to complete


![image](../images/therad_time.png)

## Multi-threaded applications

Using Python's `threading` module, two tasks can run concurrently:

In [3]:
from time import sleep, perf_counter
from threading import Thread

def task():
    print("Starting a task...")
    sleep(1)
    print("Done")
    sleep(2)
    print("Another Time")

start_time = perf_counter()

t1 = Thread(target=task)
t2 = Thread(target=task)

t1.start()
t2.start()

t1.join()
t2.join()

end_time = perf_counter()

print(f"It took {end_time - start_time} second(s) to complete")

Starting a task...
Starting a task...
Done
Done
Another Time
Another Time
It took 3.001933607999945 second(s) to complete


When the program executes, it will have three threads: **the main thread and two child threads**.

![image](../images/thread_example.png)
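One way to observe these threads is `threading.enumerate()`, which lists every thread that is currently alive. The sketch below assumes it is run as a plain script; inside a notebook or IDE, extra helper threads may also appear in the list. The `child-N` names are illustrative.

```python
import threading
from time import sleep

def task():
    sleep(0.2)

children = [threading.Thread(target=task, name=f"child-{i}") for i in (1, 2)]
for t in children:
    t.start()

# While the children are sleeping, list every thread that is alive.
alive_names = [t.name for t in threading.enumerate()]
print(alive_names)
print("main thread:", threading.main_thread().name)

for t in children:
    t.join()
```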

In [5]:
from time import sleep, perf_counter
from threading import Thread

def task(id):
    print(f"Starting the task {id}")
    sleep(1)
    print(f"The task {id} completed")


start_time = perf_counter()

threads = []
for i in range(1, 11):
    t = Thread(target=task, args=(i, ))

    threads.append(t)
    t.start()

for thread in threads:
    thread.join()

end_time = perf_counter()

print(f"It took {end_time - start_time} second(s) to complete")


Starting the task 1
Starting the task 2
Starting the task 3
Starting the task 4
Starting the task 5
Starting the task 6
Starting the task 7
Starting the task 8
Starting the task 9
Starting the task 10
The task 1 completed
The task 2 completed
The task 3 completed
The task 4 completed
The task 5 completed
The task 6 completed
The task 7 completed
The task 8 completed
The task 9 completed
The task 10 completed
It took 1.003576738999982 second(s) to complete


In [9]:
from time import perf_counter
from threading import Thread

def replace(filename, substr, new_substr):
    print(f"Processing the file {filename}")

    print("Opening file to read")
    with open(filename, "r") as f:
        content = f.read()

    content = content.replace(substr, new_substr)

    print("Opening file to write")
    with open(filename, "w") as f:
        f.write(content)


def main():
    filenames = [f"files/test{i}.txt" for i in range(1, 11)]

    threads = [Thread(target=replace, args=(filename, "ids", "id")) for filename in filenames]
    
    for thread in threads:
        thread.start()

    for thread in threads:
        thread.join()


    
if __name__ == "__main__":
    start_time = perf_counter()

    main()

    end_time = perf_counter()

    print(f"It took {end_time - start_time} second(s) to complete")


# First create the input files inside the `files/` directory: `touch test{1..10}.txt`
# Then write something into each file: `for f in test{1..10}.txt; do echo -e "ids" >> $f; done`

Processing the file files/test1.txt
Opening file to read
Processing the file files/test2.txt
Opening file to read
Processing the file files/test3.txt
Opening file to read
Opening file to write
Processing the file files/test4.txt
Opening file to read
Opening file to write
Opening file to write
Processing the file files/test5.txt
Opening file to read
Opening file to write
Processing the file files/test6.txt
Opening file to read
Opening file to write
Processing the file files/test7.txt
Opening file to read
Processing the file files/test8.txt
Opening file to read
Processing the file files/test9.txt
Opening file to read
Opening file to write
Opening file to write
Opening file to write
Opening file to write
Processing the file files/test10.txt
Opening file to read
Opening file to write
It took 0.007595904000027076 second(s) to complete


## Introduction to Python Thread Class

When a Python program starts, it has a single thread called the main thread. Sometimes you want to offload I/O-bound tasks to a new thread so that they execute concurrently. One way to do this is to subclass `Thread` and override its `run()` method.


In [11]:
from threading import Thread
import urllib.error
import urllib.request

class HttpRequestThread(Thread):
    def __init__(self, url: str):
        super().__init__()
        self.url = url
    
    def run(self):
        print(f"Checking {self.url}...")

        try:
            response = urllib.request.urlopen(self.url)
            print(response.code)
        except urllib.error.HTTPError as e:
            print(e.code)
        except urllib.error.URLError as e:
            print(e.reason)
    
def main():
    urls = [
        'https://httpstat.us/200',
        'https://httpstat.us/400'
    ]

    threads = [HttpRequestThread(url) for url in urls]
    
    # Start all threads, then wait for each of them to finish
    for t in threads:
        t.start()
    for t in threads:
        t.join()

if __name__ == "__main__":
    main()

Checking https://httpstat.us/200...
200


## Python Daemon Threads

Sometimes you may want to execute a task in the background. To do that, you use a special kind of thread called a daemon thread.

By definition, **daemon threads** are background threads: they execute tasks in the background and do not keep the program alive once all non-daemon threads have finished.

For example:
- Log information to a file in the background
- Scrape content from a website in the background
- Auto-save data to a database in the background
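A minimal, non-interactive sketch of the idea: the `heartbeat` function below is a hypothetical stand-in for any of the background jobs listed above. Passing `daemon=True` to `Thread` marks the thread as a daemon.

```python
import threading
import time

def heartbeat():
    # Loops forever; as a daemon it will not keep the program alive.
    while True:
        time.sleep(0.1)

t = threading.Thread(target=heartbeat, daemon=True)
t.start()

print(f"daemon={t.daemon}, alive={t.is_alive()}")
# The script ends here; the interpreter exits without waiting for heartbeat().
```

Without `daemon=True`, the same script would never terminate, because the interpreter waits for every non-daemon thread to finish.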


### A daemon Thread Example


In [None]:
from threading import Thread
import time

def show_timer():
    counter = 0
     
    while True:
        counter += 1
        time.sleep(1)
        print(f"Has been waiting for {counter} second(s)...")

# daemon=True makes this a background thread that ends when the main thread exits
t = Thread(target=show_timer, daemon=True)
t.start()
answer = input("Do you want to exit?\n")

## Thread Pools

Manually managing threads is not efficient, because frequently creating and destroying many threads is expensive in terms of computational cost.

Instead, you may want to reuse threads if you expect to run many ad-hoc tasks in the program. A thread pool allows you to achieve this.

![image](../images/thread_pool_executor.svg)

![image](../images/thread_pool_executor_example.png)

In [15]:
from time import sleep, perf_counter
from concurrent.futures import ThreadPoolExecutor

def task(id):
    print(f"Starting the task: {id}")
    sleep(1)
    return f"Done with task {id}"

start = perf_counter()

with ThreadPoolExecutor() as executor:
    f1 = executor.submit(task, 1)
    f2 = executor.submit(task, 2)

    print(f1.result())
    print(f2.result())

end = perf_counter()

print(f"It took {end-start} second(s) to finish.")

Starting the task: 1
Starting the task: 2
Done with task 1
Done with task 2
It took 1.0015631039996151 second(s) to finish.


In [16]:
from time import sleep, perf_counter
from concurrent.futures import ThreadPoolExecutor


def task(id):
    print(f'Starting the task {id}...')
    sleep(1)
    return f'Done with task {id}'

start = perf_counter()

with ThreadPoolExecutor() as executor:
    results = executor.map(task, [1,2])
    for result in results:
        print(result)

finish = perf_counter()

print(f"It took {finish-start} second(s) to finish.")

Starting the task 1...
Starting the task 2...
Done with task 1
Done with task 2
It took 1.0015038800002003 second(s) to finish.
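To make the reuse of threads visible, the following sketch runs six tasks on a pool that is limited to two workers and records which worker thread handled each task. The value `max_workers=2` and the task count are arbitrary choices for illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
from time import sleep

def task(task_id):
    sleep(0.05)
    # Report which worker thread ran this task.
    return task_id, current_thread().name

with ThreadPoolExecutor(max_workers=2) as executor:
    # map() returns results in the order the inputs were submitted.
    results = list(executor.map(task, range(6)))

worker_names = {name for _, name in results}
print(f"{len(results)} tasks ran on {len(worker_names)} worker thread(s)")
```

The number of distinct worker names never exceeds `max_workers`, which shows that the pool hands several tasks to the same thread rather than creating a new thread per task.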


# Thread Synchronization

A race condition occurs when two or more threads try to access a shared variable simultaneously, leading to unpredictable outcomes.

The final value of the shared variable depends on which thread completes its update last. Whichever thread changes the value last wins the race.

**Race Condition example**

In [2]:
from threading import Thread
from time import sleep

counter = 0

def increase(by):
    global counter

    local_counter = counter
    local_counter += by

    sleep(1)

    counter = local_counter

    print(f"Counter={counter}")

t1 = Thread(target=increase, args=(10, ))
t2 = Thread(target=increase, args=(20, ))

t1.start()
t2.start()

t1.join()
t2.join()

print(f"Final counter is {counter}")

Counter=10
Counter=20
Final counter is 20


### Using a threading lock to prevent the race condition

To prevent a race condition, you can use a threading lock.

A **threading lock** is a synchronization primitive that provides exclusive access to a shared resource in a multithreaded application. A thread lock is also known as a mutex, which is short for mutual exclusion.


In [3]:
from threading import Thread, Lock
from time import sleep

counter = 0

def increase(by, lock: Lock):
    global counter

    lock.acquire()

    local_counter = counter
    local_counter += by

    counter = local_counter

    print(f"Counter{counter}")

    lock.release()

lock = Lock()

t1 = Thread(target=increase, args=(10, lock))
t2 = Thread(target=increase, args=(20, lock))

t1.start()
t2.start()

t1.join()
t2.join()

print(f"The final counter is {counter}")

Counter10
Counter30
The final counter is 30


In [4]:
from threading import Thread, Lock
from time import sleep


counter = 0

def increase(by, lock):
    global counter

    with lock:
        local_counter = counter
        local_counter += by

        sleep(0.1)

        counter = local_counter
        print(f'counter={counter}')


lock = Lock()

# create threads
t1 = Thread(target=increase, args=(10, lock))
t2 = Thread(target=increase, args=(20, lock))

# start the threads
t1.start()
t2.start()


# wait for the threads to complete
t1.join()
t2.join()


print(f'The final counter is {counter}')

counter=10
counter=30
The final counter is 30


### Event

Sometimes you need to communicate between threads. To do that, you can use a lock (mutex) and a boolean variable.
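A rough sketch of that lock-plus-flag approach is shown here. The names `state`, `ready_lock` and `waiter` are made up for the illustration, and the flag is kept in a dictionary only so that both threads can update the same object.

```python
from threading import Thread, Lock
from time import sleep

state = {"ready": False}   # shared boolean flag
ready_lock = Lock()        # protects access to the flag
log = []

def waiter():
    # Poll the flag until the other thread flips it.
    while True:
        with ready_lock:
            if state["ready"]:
                break
        sleep(0.01)        # pause between checks to avoid a busy loop
    log.append("waiter saw the signal")

t = Thread(target=waiter)
t.start()

sleep(0.1)
with ready_lock:
    state["ready"] = True  # signal the waiting thread

t.join()
print(log)
```

The waiting thread has to poll the flag repeatedly, which costs either CPU time or latency depending on the pause between checks.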

However, Python provides a better way to communicate between threads: the `Event` class from the `threading` module.

The `Event` class offers a simple but effective way to coordinate between threads: one thread signals an event while other threads wait for it.


In [3]:
from threading import Thread, Event
from time import sleep

def task(event: Event, id: int) -> None:
    print(f"Thread {id} started. Waiting for the signal...")
    event.wait()
    print(f"Received Signal. The thread {id} was completed")


def main():
    event = Event()

    t1 = Thread(target=task, args=(event, 1))
    t2 = Thread(target=task, args=(event, 2))

    t1.start()
    t2.start()

    print("Blocking the main thread for 3 seconds...")
    sleep(3)
    event.set()

if __name__ == "__main__":
    main()


Thread 1 started. Waiting for the signal...
Thread 2 started. Waiting for the signal...
Blocking the main thread for 3 seconds...
Received Signal. The thread 2 was completed
Received Signal. The thread 1 was completed


In [3]:
from threading import Thread, Event
from urllib import request

def download_file(url, event):
    print(f"Downloading the file {url}...")
    
    filename, _ = request.urlretrieve(url, "rfc793.txt")


    event.set()

def process_file(event):
    print("Waiting for the file to be downloaded...")
    event.wait()

    print("File download completed. Start processing...")

    word_count = 0

    with open("rfc793.txt", "r") as f:
        for line in f:
            word_count += len(line.split())

    print(f"Number of words in the file: {word_count}")


def main():
    event = Event()

    download_thread = Thread(target=download_file, args=("https://tools.ietf.org/rfc/rfc793.txt", event))
    process_thread = Thread(target=process_file, args=(event, ))

    download_thread.start()
    process_thread.start()

    download_thread.join()
    process_thread.join()

    print("Main thread completed")

if __name__ == "__main__":
    main()


Downloading the file https://tools.ietf.org/rfc/rfc793.txt...
Waiting for the file to be downloaded...
File download completed. Start processing...
Number of words in the file: 21369
Main thread completed


### Stop Threads

To stop a thread, you can use the `Event` class of the `threading` module: the thread checks the event periodically and exits once the event has been set.

In the `Event` class, the `set()` method sets the internal flag to `True`, while the `clear()` method resets the flag to `False`. The `is_set()` method returns `True` if the internal flag is set to `True`.
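The three methods can be tried out in isolation, without any threads involved. This short sketch simply records the flag after each call:

```python
from threading import Event

event = Event()
states = [event.is_set()]      # the flag starts as False

event.set()
states.append(event.is_set())  # True after set()

event.clear()
states.append(event.is_set())  # False again after clear()

print(states)  # [False, True, False]
```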

![image](../images/stop_thread.png)


In [4]:
from threading import Thread, Event
from time import sleep

def task(event: Event):
    for i in range(6):
        print(f"Running #{i+1}")
        sleep(1)
        if event.is_set():
            print("The thread was stopped prematurely")
            break
    else:
        print("The thread completed normally.")


def main():
    event = Event()

    thread = Thread(target=task, args=(event, ))
    thread.start()

    sleep(3)

    event.set()


if __name__ == "__main__":
    main()

Running #1
Running #2
Running #3
Running #4
The thread was stopped prematurely


## Semaphore

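A Python semaphore is a synchronization primitive that allows you to control access to a shared resource. Basically, a semaphore is a counter associated with a lock that limits the number of threads that can access a shared resource simultaneously.

A semaphore helps prevent thread synchronization issues such as race conditions, where multiple threads attempt to access the resource at the same time and interfere with each other. Each time a thread tries to acquire the semaphore, the semaphore checks its counter: if the counter is greater than zero, it is decremented and the thread proceeds; otherwise the thread blocks until another thread releases the semaphore.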
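The counter behaviour can be observed from a single thread by acquiring without blocking. This is only an illustrative sketch with an arbitrary limit of 2; `acquire(blocking=False)` returns `False` immediately instead of waiting when the counter is zero.

```python
from threading import Semaphore

semaphore = Semaphore(2)   # at most two holders at a time

first = semaphore.acquire(blocking=False)    # counter 2 -> 1
second = semaphore.acquire(blocking=False)   # counter 1 -> 0
third = semaphore.acquire(blocking=False)    # counter is 0, so this fails
print(first, second, third)  # True True False

semaphore.release()                          # counter 0 -> 1
fourth = semaphore.acquire(blocking=False)   # succeeds again
print(fourth)  # True
```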

In [5]:
from threading import Semaphore, Thread
from urllib import request

MAX_CONCURRENT_DOWNLOADS = 3
semaphore = Semaphore(MAX_CONCURRENT_DOWNLOADS)

def download(url):
    with semaphore:
        print(f"Downloading {url}...")

        response = request.urlopen(url)
        data = response.read()

        print(f'Finished downloading {url}')
        
        return data
    
def main():
    urls = [
        'https://www.ietf.org/rfc/rfc791.txt',
        'https://www.ietf.org/rfc/rfc792.txt',
        'https://www.ietf.org/rfc/rfc793.txt',
        'https://www.ietf.org/rfc/rfc794.txt',
        'https://www.ietf.org/rfc/rfc795.txt',
    ]

    # Create threads for each download
    threads = []
    for url in urls:
        thread = Thread(target=download, args=(url,))
        threads.append(thread)
        thread.start()

    # Wait for all threads to complete
    for thread in threads:
        thread.join()


if __name__ == '__main__':
    main()

Downloading https://www.ietf.org/rfc/rfc791.txt...
Downloading https://www.ietf.org/rfc/rfc792.txt...
Downloading https://www.ietf.org/rfc/rfc793.txt...
Finished downloading https://www.ietf.org/rfc/rfc793.txt
Downloading https://www.ietf.org/rfc/rfc794.txt...
Finished downloading https://www.ietf.org/rfc/rfc792.txt
Downloading https://www.ietf.org/rfc/rfc795.txt...
Finished downloading https://www.ietf.org/rfc/rfc791.txt
Finished downloading https://www.ietf.org/rfc/rfc795.txt
Finished downloading https://www.ietf.org/rfc/rfc794.txt


## Queue

The built-in queue module allows you to exchange data safely between multiple threads.
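Before the threaded version, here is a single-threaded sketch of the basic `Queue` operations: items come out in the order they went in, and `get()` with a timeout raises `queue.Empty` when nothing arrives in time. The item values and the 0.1 second timeout are arbitrary.

```python
from queue import Empty, Queue

q = Queue()
for item in ("a", "b", "c"):
    q.put(item)

# Items are retrieved in FIFO order.
received = [q.get() for _ in range(3)]
print(received)  # ['a', 'b', 'c']

try:
    # The queue is now empty, so this waits 0.1 s and then raises Empty.
    q.get(timeout=0.1)
except Empty:
    timed_out = True
else:
    timed_out = False
print(timed_out)  # True
```

Note that `get()` without a timeout blocks until an item is available and does not raise `Empty`.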

In [7]:
import time
from queue import Empty, Queue
from threading import Thread

def producer(queue: Queue):
    for i in range(1, 6):
        print(f"Inserting item {i} into the queue")
        time.sleep(1)
        queue.put(i)


def consumer(queue: Queue):
    while True:
        try:
            # Blocks until an item is available
            item = queue.get()
        except Empty:
            continue
        else:
            print(f"Processing item {item}")
            time.sleep(2)
            # Tell the queue that this item has been fully processed
            queue.task_done()


def main():
    queue = Queue()

    producer_thread = Thread(target=producer, args=(queue,))
    producer_thread.start()

    consumer_thread = Thread(target=consumer, args=(queue,), daemon=True)

    consumer_thread.start()

    producer_thread.join()

    # Wait until every item put on the queue has been marked done
    queue.join()


main()

Inserting item 1 into the queue
Inserting item 2 into the queueProcessing item 1

Inserting item 3 into the queue
Processing item 2Inserting item 4 into the queue

Inserting item 5 into the queue
Processing item 3
Processing item 4
Processing item 5


# Multiprocessing

![image](../images/difference_multiprocessing.png)