# Multithreading

In [None]:

import threading
import os
import queue
from tqdm import tqdm
from functools import partial
from pytube import YouTube
import time
from threading import Lock
import concurrent.futures


### Parallelism Concept

- Sequential vs Parallel

In [None]:
def func1():
    """Print the numbers 1 through 5, pausing 0.1 s after each one."""
    for number in range(1, 6):
        print(f"Number: #{number}")
        time.sleep(0.1)

def func2():
    """Print the letters a through e, pausing 0.1 s after each one."""
    for letter in "abcde":
        print(f"Alphabet: {letter}")
        time.sleep(0.1)

# Run both counters one after the other, then side by side in two threads,
# and report how long each variant took.
print("-"*10+"Sequential execution"+"-"*10)
# perf_counter() is a monotonic, high-resolution clock meant for measuring
# intervals; time.time() follows the wall clock and can jump if it is adjusted.
seq = time.perf_counter()
func1()
func2()
print("execution time :", time.perf_counter() - seq)


print("-"*10+"Parallel execution"+"-"*10)
par = time.perf_counter()
thread1 = threading.Thread(target=func1)
thread2 = threading.Thread(target=func2)
thread1.start()
thread2.start()
# Wait for both workers before reading the clock again.
thread1.join()
thread2.join()
print("execution time :", time.perf_counter() - par)

- Accessing shared area

In [None]:
import multiprocessing

def func1(lst):
    """Append the marker value 1 to ``lst`` in place."""
    marker = 1
    lst.append(marker)

def func2(lst):
    """Append the marker value 2 to ``lst`` in place."""
    marker = 2
    lst.append(marker)

# Threads run inside the same process, so both workers receive a reference to
# the very same list object and their appends are visible in list1 afterwards.
print("-"*10+"Multi-threading"+"-"*10)
list1 = []
thread1 = threading.Thread(target=func1, args=(list1,))
thread2 = threading.Thread(target=func2, args=(list1,))
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(f"Multi-threading result:{list1}")

# Processes do not share memory: each child works on its own copy of the
# list it was given, so the parent's list2 is expected to stay empty.
print("-"*10+"Multi-processing"+"-"*10)
list2 = []
process1 = multiprocessing.Process(target=func1, args=(list2,))
process2 = multiprocessing.Process(target=func2, args=(list2,))
process1.start()
process2.start()
process1.join()
process2.join()
print(f"Multi-processing result:{list2}")

### Synchronization problem
To observe the synchronization problem, run the following two cells repeatedly.
> Expected result: 500000

* Using an unsafe variable: the result may vary from run to run
* Using a thread-safe variable: the result always matches the expected value

In [None]:
# Counter shared by every worker thread; set back to 0 each time this cell runs.
shared_variable = 0

def increment_shared_variable():
    """Add 1 to the global ``shared_variable`` 100000 times without any locking."""
    global shared_variable
    for _ in range(100000):
        # Unsynchronized read-modify-write: concurrent threads may overwrite
        # each other's update, which is what this demo is meant to expose.
        shared_variable += 1

print("-" * 10 + "Using unsafe variable" + "-" * 10)

# Create all five workers first so that they are launched back to back.
threads = [
    threading.Thread(target=increment_shared_variable) for _ in range(5)
]

for worker in threads:
    worker.start()

# Only read the counter once every worker has finished.
for worker in threads:
    worker.join()

print("Final result:", shared_variable)

In [None]:
# Counter shared by every worker thread; set back to 0 each time this cell runs.
shared_variable = 0

# Thread-safe: every update of shared_variable goes through this mutex.
lock = threading.Lock()

def increment_shared_variable():
    """Add 1 to the global ``shared_variable`` 100000 times while holding ``lock``."""
    global shared_variable
    for _ in range(100000):
        # The with-statement acquires the mutex and always releases it again,
        # even if the guarded statement raises.
        with lock:
            shared_variable += 1

print("-" * 10 + "Using thread-safe variable" + "-" * 10)

# Create all five workers first so that they are launched back to back.
threads = [
    threading.Thread(target=increment_shared_variable) for _ in range(5)
]

for worker in threads:
    worker.start()

# Only read the counter once every worker has finished.
for worker in threads:
    worker.join()

print("Final result:", shared_variable)

### ThreadPool

In [None]:
# Counter shared by every pool worker; set back to 0 each time this cell runs.
shared_variable = 0
# Mutex guarding every update of shared_variable.
lock = threading.Lock()

def increment_shared_variable():
    """Add 1 to the global ``shared_variable`` 100000 times while holding ``lock``."""
    global shared_variable
    for _ in range(100000):
        # The with-statement acquires the mutex and always releases it again,
        # even if the guarded statement raises.
        with lock:
            shared_variable += 1

print("-" * 10 + "Using ThreadPool" + "-" * 10)

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(increment_shared_variable) for _ in range(5)]
    # Wait for every task here, inside the pool's lifetime. result() re-raises
    # an exception raised in a worker, which would otherwise go unnoticed.
    for future in concurrent.futures.as_completed(futures):
        future.result()

print("Final result:", shared_variable)


### `pytube` + `tqdm` example - sharing data through a `queue.Queue`

- UI function

In [None]:
def draw_ui(var):
    """Render a tqdm progress bar from the messages received on ``var``.

    Blocks on ``var.get()`` and handles one message at a time:

    * ``{"type": "on_progress", "progress_rate": <percent>}`` advances the
      bar to the given percentage.
    * ``{"type": "on_complete"}`` fills the bar to 100, closes it and returns.
    * ``None`` is a shutdown sentinel: the bar (if one was created) is closed
      where it stands and the function returns, so a producer that stopped
      early can still release this thread.

    Messages with any other ``"type"`` are ignored.

    Args:
        var: Queue-like object whose ``get()`` blocks until a message is
            available (the notebook passes a ``queue.Queue``).
    """
    print("UI thread starting ... PID:{}".format(os.getpid()))
    prev = 0  # whole percent already shown on the bar
    tqdm_bar = None  # created lazily on the first message that needs it
    while True:
        message = var.get()
        if message is None:
            if tqdm_bar is not None:
                tqdm_bar.close()
            break
        kind = message["type"]
        if kind not in ("on_progress", "on_complete"):
            continue
        if tqdm_bar is None:
            tqdm_bar = tqdm(total=100, desc="Downloading...")
        if kind == "on_progress":
            cur_rate = message["progress_rate"]
            # Advance by whole percents only and remember the truncated
            # value so that the fractions are not lost on the next update.
            tqdm_bar.update(int(cur_rate - prev))
            prev = int(cur_rate)
        else:
            tqdm_bar.update(100 - prev)
            tqdm_bar.close()
            break

- Downloading function

In [None]:
def on_progress(stream, chunk, bytes_remaining, var):
    """Publish the current download percentage on ``var``.

    Args:
        stream: Object exposing the total size in bytes as ``filesize``.
        chunk: Not used by this function.
        bytes_remaining: Number of bytes that are still to be downloaded.
        var: Object whose ``put`` method receives the progress message.
    """
    size = stream.filesize
    fraction_done = (size - bytes_remaining) / size
    var.put({"type": "on_progress", "progress_rate": fraction_done * 100})

def on_complete(stream, file_handle, var):
    """Put an ``"on_complete"`` message on ``var``; the other arguments are unused."""
    message = {"type": "on_complete"}
    var.put(message)

def download(url, var):
    """Download the first adaptive mp4 stream of ``url``, reporting via ``var``.

    ``on_progress`` and ``on_complete`` are bound to ``var`` and registered as
    the ``YouTube`` callbacks, so progress and completion messages end up on
    ``var``.

    Args:
        url: Address of the video handed to ``YouTube``.
        var: Object passed on to the callbacks as their ``var`` argument.
    """
    print(f"Download thread starting ... PID:{os.getpid()}")
    callbacks = {
        "on_progress_callback": partial(on_progress, var=var),
        "on_complete_callback": partial(on_complete, var=var),
    }
    clip = YouTube(url, **callbacks)
    mp4_streams = clip.streams.filter(adaptive=True, file_extension='mp4')
    # "multithreading" is presumably the output directory -- confirm
    # against the pytube version in use.
    mp4_streams.first().download("multithreading")

- Multithreading

In [None]:
# Charlie Puth - One Call Away [Official Video]
url = "https://www.youtube.com/watch?v=BxuY9FET9Y4"

print("main process running ... PID:{}".format(os.getpid()))

# Channel carrying progress messages from the download thread to the UI thread.
shared_var = queue.Queue()

thread1 = threading.Thread(target=draw_ui, args=(shared_var,))
thread2 = threading.Thread(target=download, args=(url, shared_var,))

thread1.start()
thread2.start()

# The UI thread blocks on the queue until a message arrives. If the download
# thread ends without ever sending "on_complete", joining the UI thread first
# would wait forever. So wait for the producer first, then push the None
# sentinel to wake the UI thread in case it is still waiting, and only then
# join it.
thread2.join()
shared_var.put(None)
thread1.join()

### `pytube` + `tqdm` example - sharing data through a thread-safe list

In [None]:
class ThreadSafeList:
    """List wrapper whose operations each run while holding one shared lock.

    Only ``append``, ``pop`` and ``len()`` are provided.
    """

    def __init__(self):
        # The lock is created first; every method takes it before
        # touching the underlying list.
        self._lock = threading.Lock()
        self._list = []

    def append(self, item):
        """Add ``item`` to the end of the list."""
        with self._lock:
            self._list.append(item)

    def pop(self, index=-1):
        """Remove and return the element at ``index`` (the last one by default)."""
        with self._lock:
            removed = self._list.pop(index)
        return removed

    def __len__(self):
        """Return the number of stored elements."""
        with self._lock:
            size = len(self._list)
        return size

- UI function

In [None]:
def draw_ui(shared_queue):
    """Render a tqdm progress bar from the messages found in ``shared_queue``.

    Polls ``shared_queue`` and handles the oldest pending message first:

    * ``{"type": "on_progress", "progress_rate": <percent>}`` advances the
      bar to the given percentage.
    * ``{"type": "on_complete"}`` fills the bar to 100, closes it and returns.
    * ``None`` is a shutdown sentinel: the bar (if one was created) is closed
      where it stands and the function returns.

    Messages with any other ``"type"`` are ignored.

    Args:
        shared_queue: Object supporting ``len()`` and ``pop(index)``, such
            as ``ThreadSafeList`` or a plain list.
    """
    print("UI thread starting ... PID:{}".format(os.getpid()), flush=True)
    prev = 0  # whole percent already shown on the bar
    tqdm_bar = None  # created lazily on the first message that needs it
    while True:
        if len(shared_queue) == 0:
            # Nothing to draw yet: yield the CPU instead of spinning.
            time.sleep(0.01)
            continue
        # Take the oldest message (index 0). The default pop() would return
        # the newest one and process progress updates out of order.
        message = shared_queue.pop(0)
        if message is None:
            if tqdm_bar is not None:
                tqdm_bar.close()
            break
        kind = message["type"]
        if kind not in ("on_progress", "on_complete"):
            continue
        if tqdm_bar is None:
            tqdm_bar = tqdm(total=100, desc="Downloading...")
        if kind == "on_progress":
            cur_rate = message["progress_rate"]
            # Advance by whole percents only and remember the truncated
            # value so that the fractions are not lost on the next update.
            tqdm_bar.update(int(cur_rate - prev))
            prev = int(cur_rate)
        else:
            tqdm_bar.update(100 - prev)
            tqdm_bar.close()
            break

- Downloading function

In [None]:
def on_progress(stream, chunk, bytes_remaining, shared_queue):
    """Append the current download percentage to ``shared_queue``.

    Args:
        stream: Object exposing the total size in bytes as ``filesize``.
        chunk: Not used by this function.
        bytes_remaining: Number of bytes that are still to be downloaded.
        shared_queue: Object whose ``append`` method receives the message.
    """
    size = stream.filesize
    fraction_done = (size - bytes_remaining) / size
    shared_queue.append(
        {"type": "on_progress", "progress_rate": fraction_done * 100}
    )

def on_complete(stream, file_handle, shared_queue):
    """Append an ``"on_complete"`` message to ``shared_queue``; other arguments are unused."""
    message = {"type": "on_complete"}
    shared_queue.append(message)

def download(url, shared_queue):
    """Download the first adaptive mp4 stream of ``url``, reporting via ``shared_queue``.

    ``on_progress`` and ``on_complete`` are bound to ``shared_queue`` and
    registered as the ``YouTube`` callbacks, so progress and completion
    messages are appended to ``shared_queue``.

    Args:
        url: Address of the video handed to ``YouTube``.
        shared_queue: Object passed on to the callbacks as their
            ``shared_queue`` argument.
    """
    print(f"Download thread starting ... PID:{os.getpid()}", flush=True)
    callbacks = {
        "on_progress_callback": partial(on_progress, shared_queue=shared_queue),
        "on_complete_callback": partial(on_complete, shared_queue=shared_queue),
    }
    clip = YouTube(url, **callbacks)
    mp4_streams = clip.streams.filter(adaptive=True, file_extension='mp4')
    # "multithreading" is presumably the output directory -- confirm
    # against the pytube version in use.
    mp4_streams.first().download("multithreading")

- Multithreading

In [None]:
# Charlie Puth - One Call Away [Official Video]
url = "https://www.youtube.com/watch?v=BxuY9FET9Y4"

print(f"main process running ... PID:{os.getpid()}", flush=True)

# Message channel shared by the UI thread and the download thread.
shared_queue = ThreadSafeList()

thread1 = threading.Thread(target=draw_ui, args=(shared_queue,))
thread2 = threading.Thread(target=download, args=(url, shared_queue))

# Start the UI thread, then the download thread, and wait in the same order.
for worker in (thread1, thread2):
    worker.start()

for worker in (thread1, thread2):
    worker.join()

# Drop this cell's reference to the list now that both threads are done.
shared_queue = None