# Multiprocessing

Reference: https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming

In [None]:
import multiprocessing
from tqdm import tqdm
from functools import partial
import os
from pytube import YouTube
import psutil
import time
import subprocess

### Parallelism Concept

In [None]:
def func1():
    """Print the integers 1 through 5, one per line, pausing 0.1 s after each."""
    for number in (1, 2, 3, 4, 5):
        print(f"Number: #{number}")
        # Brief pause after every line.
        time.sleep(0.1)

def func2():
    """Print the letters a through e, one per line, pausing 0.1 s after each."""
    for letter in "abcde":
        print(f"Alphabet: {letter}")
        # Brief pause after every line.
        time.sleep(0.1)

# Run both counters back to back in the current process and time the whole run.
print("-"*10+"Sequential execution"+"-"*10)
# perf_counter() is a monotonic, high-resolution clock, so the measured
# interval cannot be skewed by system clock adjustments as time.time() can.
seq = time.perf_counter()
func1()
func2()
print("execution time :", time.perf_counter() - seq)


# Run the same two counters in separate child processes and time that run.
print("-"*10+"Parallel execution"+"-"*10)
par = time.perf_counter()
process1 = multiprocessing.Process(target=func1)
process2 = multiprocessing.Process(target=func2)
process1.start()
process2.start()
# Wait for both children to exit before stopping the clock.
process1.join()
process2.join()
print("execution time :", time.perf_counter() - par)

### Process methods

- **start()**: Start the process's activity.  
- **terminate()**: Terminate the process (Linux: sends `SIGTERM`; Windows: calls `TerminateProcess()`).  
- **join()**: Wait until the process terminates.
- **join([_timeout_])**: Wait at most _timeout_ seconds for the process to terminate.   

In [None]:
# terminate() example

def func1():
    """Print the integers 1 through 5, one per line, pausing 0.1 s after each."""
    for number in (1, 2, 3, 4, 5):
        print(f"Number: #{number}")
        # Brief pause after every line.
        time.sleep(0.1)

process1 = multiprocessing.Process(target=func1)
print("Process starting...")
process1.start()

# Give the child some time to run before it is killed.
time.sleep(0.3)

# Force kill process
print("Process terminating...")
process1.terminate()
# terminate() only requests the kill; join() waits for the child to actually
# exit and reaps it, so no zombie process is left behind.
process1.join()

In [None]:
# join() example

def func1():
    """Print 1 through 5 with a 0.1 s pause after each, then a closing message."""
    for number in (1, 2, 3, 4, 5):
        print(f"Number: #{number}")
        # Brief pause after every line.
        time.sleep(0.1)
    # Flushed explicitly so the message is written out immediately.
    print("Process terminating...", flush=True)

# Announce the launch, then create and start the child that runs func1.
print("Process starting...")
process1 = multiprocessing.Process(target=func1)
process1.start()

# Block the parent here until the child process has exited.
print("Waiting for process to be terminated...")
process1.join()

### IPC practice: Popen

In [None]:
import sys

print("Parent process running ... PID:{}".format(os.getpid()), flush=True)

# Launch the helper script with the interpreter that is running this code:
# a bare 'python' may be missing from PATH or resolve to a different
# installation than the one that has this notebook's packages.
ps_process = subprocess.Popen(
    [sys.executable, 'subprocess_practice.py'],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)
# communicate() reads both pipes until EOF and waits for the child to exit.
stdout, stderr = ps_process.communicate()

print(stdout.decode())

# Surface anything the child wrote to stderr (e.g. a traceback) instead of
# silently discarding it.
if stderr:
    print(stderr.decode(), file=sys.stderr)

### `pytube` `tqdm` example

- UI function

In [None]:
def draw_ui(message_queue):
    """Drive a tqdm progress bar from messages read off ``message_queue``.

    Every message is a dict with a ``"type"`` key:

    - ``"on_progress"`` messages carry a ``"progress_rate"`` value; the bar
      (total of 100) is advanced by the whole-number difference from what
      has already been drawn.
    - ``"on_complete"`` fills the bar to 100, closes it and ends the loop.

    Messages of any other type are ignored. The bar is created lazily when
    the first message of either handled type arrives.
    """
    own_pid = os.getpid()
    parent_pid = psutil.Process(os.getpid()).ppid()
    print("UI process starting ... PID:{}, PPID:{}".format(own_pid, parent_pid), flush=True)

    shown = 0  # whole-number progress already drawn on the bar
    bar = None
    finished = False
    while not finished:
        message = message_queue.get()
        kind = message["type"]
        if kind not in ("on_progress", "on_complete"):
            continue

        if bar is None:
            bar = tqdm(total=100, desc="Downloading...")

        if kind == "on_complete":
            # Top the bar up to its total before closing it.
            bar.update(100 - shown)
            bar.close()
            finished = True
        else:
            rate = message["progress_rate"]
            bar.update(int(rate - shown))
            shown = int(rate)

- Downloading function

In [None]:
def on_progress(stream, chunk, bytes_remaining, message_queue):
    """Put the current download progress, as a percentage, on ``message_queue``.

    Args:
        stream: Object whose ``filesize`` attribute is the total size in bytes.
        chunk: Unused; accepted to match the callback signature.
        bytes_remaining: Number of bytes still to be downloaded.
        message_queue: Object with a ``put`` method; receives a dict of the
            form ``{"type": "on_progress", "progress_rate": <percentage>}``.
    """
    total_size = stream.filesize
    if total_size == 0:
        # A zero-length stream has nothing left to fetch; report it as done
        # instead of raising ZeroDivisionError below.
        progress = 100.0
    else:
        bytes_downloaded = total_size - bytes_remaining
        progress = (bytes_downloaded / total_size) * 100
    message_queue.put({"type":"on_progress", "progress_rate":progress})

def on_complete(stream, file_handle, message_queue):
    """Put an ``"on_complete"`` message on ``message_queue``.

    ``stream`` and ``file_handle`` are accepted but not used.
    """
    done_message = dict(type="on_complete")
    message_queue.put(done_message)

def download(url, message_queue):
    """Download the first adaptive mp4 stream of a YouTube video.

    Progress and completion are reported through ``message_queue`` by the
    ``on_progress`` and ``on_complete`` callbacks, which are bound to the
    queue with ``functools.partial``.

    Args:
        url: URL of the YouTube video.
        message_queue: Queue that receives the progress/completion messages.

    Raises:
        ValueError: If the video has no adaptive mp4 stream.
    """
    print("Download process starting ... PID:{}, PPID:{}".format(os.getpid(), psutil.Process(os.getpid()).ppid()), flush=True)
    # Bind the queue by keyword so the callbacks keep the positional
    # signature they are invoked with.
    on_progress_with_MQ = partial(on_progress, message_queue=message_queue)
    on_complete_with_MQ = partial(on_complete, message_queue=message_queue)
    youtube_clip = YouTube(
                        url,
                        on_progress_callback=on_progress_with_MQ,
                        on_complete_callback=on_complete_with_MQ)
    youtube_stream = youtube_clip.streams.filter(
                        adaptive=True,
                        file_extension='mp4').first()
    if youtube_stream is None:
        # first() returns None when the filter matches nothing; fail with a
        # clear message instead of an AttributeError on None.
        raise ValueError("No adaptive mp4 stream available for {}".format(url))
    # "multiprocessing" is passed as the first argument of Stream.download
    # (the output directory, per the pytube docs).
    youtube_stream.download("multiprocessing")

- Multiprocessing

In [None]:
# Charlie Puth - One Call Away [Official Video]
url = "https://www.youtube.com/watch?v=BxuY9FET9Y4"

print("main process running ... PID:{}".format(os.getpid()), flush=True)

# Channel over which the download process reports progress to the UI process.
message_queue = multiprocessing.Queue()

process1 = multiprocessing.Process(target=draw_ui, args=(message_queue,))
process2 = multiprocessing.Process(target=download, args=(url, message_queue,))

process1.start()
process2.start()

# Wait for the downloader first. If it exited with an error, no
# "on_complete" message will arrive and the UI process would block on the
# queue forever, so stop it explicitly instead of hanging on its join().
process2.join()
if process2.exitcode != 0 and process1.is_alive():
    process1.terminate()
process1.join()