# Multiprocessing

Reference: https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming

In [7]:
import multiprocessing
from tqdm import tqdm
from functools import partial
import os
import sys
from pytubefix import YouTube
import psutil
import time
import subprocess

### Parallelism Concept

In [8]:
def say_numbers():
    # Count 1~5
    for i in range(1,6,1):
        print(f"Number: #{i}")
        #sys.stdout.write(f"Number: #{i}\n")
        #sys.stdout.flush()
        time.sleep(0.1)

def say_alphabet():
    # Count a~e
    for i in range(ord("a"), ord("f")):
        print(f"Alphabet: {chr(i)}")
        #sys.stdout.write(f"Alphabet: {chr(i)}\n")
        #sys.stdout.flush()
        time.sleep(0.1)

In [9]:
print("-"*10+"Sequential execution"+"-"*10)
seq = time.time()
say_numbers()
say_alphabet()
print("execution time :", time.time() - seq)

----------Sequential execution----------
Number: #1
Number: #2
Number: #3
Number: #4
Number: #5
Alphabet: a
Alphabet: b
Alphabet: c
Alphabet: d
Alphabet: e
execution time : 1.0030419826507568


In [12]:
print("-"*10+"Parallel execution"+"-"*10)
par = time.time()   # 러닝타임 체크 시작
p1 = multiprocessing.Process(target=say_numbers)
p2 = multiprocessing.Process(target=say_alphabet)
p1.start()
p2.start()

# join으로 대기하지 않으면 부모 process가 종료되어 자식이 zombie가 된다.
p1.join()
p2.join()

print("execution time :", time.time() - par)    # 러닝타임 체크 끝

----------Parallel execution----------
Number: #1
Alphabet: a
Number: #2Alphabet: b

Number: #3Alphabet: c

Number: #4Alphabet: d

Alphabet: eNumber: #5

execution time : 0.5221431255340576


### Process control methods

- **start()**: Sub process를 실행시킵니다.
- **terminate()**: Process에게 SIGTERM을 보냅니다.
- **join()**: Process가 종료될 때까지 기다립니다.
- join([_timeout_]): 지정된 시간(초 단위)까지 기다립니다.

In [15]:
# terminate() example
# 아래의 코드는 좀비 프로세스를 생성시킵니다.
# `watch -n1 "ps aux|grep defunct"` 명령어로 관찰 할 수 있습니다.

def child_process_func():
    print("Child process counting started")
    # Count 1~5
    for i in range(10):
        print(f"Number: #{i}")
        time.sleep(0.5)
    # print on normal termination only.
    print("Normal termination")


p1 = multiprocessing.Process(target=child_process_func)
print("Process child process")
p1.start()
time.sleep(1)

# Force kill process
p1.terminate()

Process child process
Child process counting started
Number: #0
Number: #1


In [16]:
# join()을 이용해서 좀비 프로세스가 발생하는 것을 방지 할 수 있습니다.

def say_numbers():
    # Count 1~5
    for i in range(1,6,1):
        print(f"Number: #{i}")
        time.sleep(0.1)
    print("Process terminating...", flush=True)

p1 = multiprocessing.Process(target=say_numbers)
print("Process starting...")
p1.start()

# Wait until process terminates
print("Waiting for process to be terminated...")
p1.join()

Process starting...
Number: #1
Number: #2
Waiting for process to be terminated...
Number: #3
Number: #4
Number: #5
Process terminating...


### IPC practice: Popen

In [17]:
print("Current process ID:{}".format(os.getpid()), flush=True)

ps_process = subprocess.Popen(['bash', 'subprocess.sh'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = ps_process.communicate()

print(stdout.decode())

Current process ID:10510
Subprocess PID: 11275, PPID: 10510
total 3892
drwxrwxr-x  3 chmin chmin    4096 10월  2 11:07 Arduino
-rw-rw-r--  1 chmin chmin 3906384 10월  2 13:10 D15-C1-4-아두이노실습.pdf
drwxr-xr-x  5 chmin chmin    4096 10월  2 11:07 Desktop
drwxr-xr-x  3 chmin chmin    4096  9월 23 10:56 Documents
drwxr-xr-x  2 chmin chmin    4096 10월  2 13:38 Downloads
drwxrwxr-x  2 chmin chmin    4096  9월 27 09:07 draw.io
drwxrwxr-x  2 chmin chmin    4096  9월 25 11:19 intel
drwxrwxr-x  6 chmin chmin    4096  9월 30 14:48 intel-05
drwxr-xr-x  2 chmin chmin    4096  9월  6 15:00 Music
drwxr-xr-x  3 chmin chmin    4096  9월  6 16:32 Pictures
drwxrwxr-x 11 chmin chmin    4096  9월 27 09:55 portfolio
drwxr-xr-x  2 chmin chmin    4096  9월  6 15:00 Public
drwxrwxr-x  2 chmin chmin    4096  9월  6 16:00 python_test
drwxrwxr-x  4 chmin chmin    4096  9월 30 14:27 Signal-Project
drwx------  5 chmin chmin    4096  9월  6 16:07 snap
drwxr-xr-x  2 chmin chmin    4096  9월  6 15:00 Templates
drwxrwxr-x  8 chmin chmi

### `pytube` `tqdm` example

- UI와 다운로드가 message_queue로 통신

- UI Process

In [32]:
# UI 프로세스
def draw_ui(message_queue):
    print(
        "UI process starting ... PID:{}, PPID:{}"
        .format(
            os.getpid(),
            psutil.Process(os.getpid()).ppid()), flush=True)
    prev = 0
    tqdm_bar = None
    while True:
        message = message_queue.get()
        if message["type"] == "on_progress":
            if tqdm_bar is None:
                tqdm_bar = tqdm(total=100, desc="Downloading...")
            cur_rate = message["progress_rate"]
            tqdm_bar.update(int(cur_rate-prev))
            prev = int(cur_rate)
        elif message["type"] == "on_complete":
            if tqdm_bar is None:
                tqdm_bar = tqdm(total=100, desc="Downloading...")
            tqdm_bar.update(100-prev)
            tqdm_bar.close()
            print("Completed!")
            break

- Download Process

In [27]:
# 다운로드 프로세스
def on_progress(stream, chunk, bytes_remaining, message_queue):
    total_size = stream.filesize
    bytes_downloaded = total_size - bytes_remaining
    progress = (bytes_downloaded / total_size) * 100
    message_queue.put({"type":"on_progress", "progress_rate":progress})

def on_complete(stream, file_handle, message_queue):
    message_queue.put({"type":"on_complete"})

def download(url, message_queue):
    print(
        "Download process starting ... PID:{}, PPID:{}"
        .format(
            os.getpid(),
            psutil.Process(os.getpid()).ppid()), flush=True)
    on_progress_with_MQ = partial(on_progress, message_queue=message_queue)
    on_complete_with_MQ = partial(on_complete, message_queue=message_queue)
    youtube_clip = YouTube(
                        url,
                        on_progress_callback=on_progress_with_MQ,
                        on_complete_callback=on_complete_with_MQ)
    youtube_stream = youtube_clip.streams.get_highest_resolution()
    youtube_stream.download("videos")

- Multiprocessing

In [33]:
# 미국은 어떻게 강대국이 되었나
url = "https://www.youtube.com/embed/S0NsxGrMg3Q?si=22Ob1x1I4VC3-w2Y"

print("main process running ... PID:{}".format(os.getpid()), flush=True)

message_queue = multiprocessing.Queue()

p1 = multiprocessing.Process(target=draw_ui, args=(message_queue,))
p2 = multiprocessing.Process(target=download, args=(url, message_queue,))

p1.start()
p2.start()

p1.join()
p2.join()

main process running ... PID:10510
UI process starting ... PID:11694, PPID:10510
Download process starting ... PID:11695, PPID:10510


Process Process-20:
Traceback (most recent call last):
  File "/usr/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/lib/python3.10/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/tmp/ipykernel_10510/2620347243.py", line 23, in download
    youtube_stream = youtube_clip.streams.get_highest_resolution()
  File "/home/chmin/intel-05/class02/smart-factory/.sf_env/lib/python3.10/site-packages/pytube/__main__.py", line 296, in streams
    return StreamQuery(self.fmt_streams)
  File "/home/chmin/intel-05/class02/smart-factory/.sf_env/lib/python3.10/site-packages/pytube/__main__.py", line 176, in fmt_streams
    stream_manifest = extract.apply_descrambler(self.streaming_data)
  File "/home/chmin/intel-05/class02/smart-factory/.sf_env/lib/python3.10/site-packages/pytube/__main__.py", line 160, in streaming_data
    self.bypass_age_gate()
  File "/home/chmin/intel-05/class02/smart-factory/.sf_env

KeyboardInterrupt: 