# Python - Multiprocessing and Threading

- [GTWang - Python multithreading: parallel programming with the threading module (tutorial)](https://blog.gtwang.org/programming/python-threading-multithreaded-programming-tutorial/)
- Introducing Python: Modern Computing in Simple Packages (p.269)
- [Morvan - Threading](https://morvanzhou.github.io/tutorials/python-basic/threading/)
- [Morvan - Multiprocessing](https://morvanzhou.github.io/tutorials/python-basic/multiprocessing/)
- [Tutorialspoint - Python Multithreading](https://www.tutorialspoint.com/python/python_multithreading.htm)
- [A Better Model for Day to Day Threading Tasks](http://chriskiehl.com/article/parallelism-in-one-line/)
- [Python multiprocessing usage examples](http://outofmemory.cn/code-snippet/2267/Python-duojincheng-multiprocessing-usage-example)

## Threading

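- `run()`: the entry point for a thread; by default it calls the function passed as `target`.
- `start()`: starts the thread; it arranges for `run()` to be invoked in a separate thread of control.
- `join([timeout])`: blocks until the thread terminates, or until the optional timeout (in seconds) elapses.
- `is_alive()`: checks whether a thread is still executing (the old `isAlive()` spelling was removed in Python 3.9).
- `getName()`: returns the name of a thread (deprecated since Python 3.10; read the `name` attribute instead).
- `setName()`: sets the name of a thread (deprecated since Python 3.10; assign to `name` instead).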
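A minimal sketch of these methods using their modern spellings (`name`, `is_alive()`). The `worker` function and the thread names are illustrative only.

```python
import threading
import time

def worker(delay):
    # Simulate some work by sleeping
    time.sleep(delay)

t = threading.Thread(target=worker, args=(0.2,), name="worker-1")
print(t.name)              # name given at construction
t.name = "renamed"         # modern replacement for setName()
t.start()                  # arranges for run() to be called in a new thread
t.join(timeout=0.01)       # wait at most 0.01 s; the thread is still sleeping
still_running = t.is_alive()
t.join()                   # now wait until the thread really finishes
finished = not t.is_alive()
print(t.name, still_running, finished)
```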

In [None]:
import threading
import time

In [30]:
print(f'Active count: {threading.active_count()}\n')  # number of Thread objects currently alive
print(f'All Threads:\n{threading.enumerate()}\n')  # list of all alive Thread objects
print(f'Current thread:\n{threading.current_thread()}\n')  # the Thread object of the calling thread

Active count: 5

All Threads:
[<_MainThread(MainThread, started 140736131847040)>, <Thread(Thread-2, started daemon 123145457262592)>, <Heartbeat(Thread-3, started daemon 123145462517760)>, <HistorySavingThread(IPythonHistorySavingThread, started 123145468846080)>, <ParentPollerUnix(Thread-1, started daemon 123145474101248)>]

Current thread:
<_MainThread(MainThread, started 140736131847040)>



In [3]:
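from functools import wraps
import threading
import time

def timer(orig_fun):
    """Print when orig_fun starts (and in which thread) and when it finishes,
    relative to the reference time `tic` passed as its first argument."""
    @wraps(orig_fun)
    def wrapper(tic, *args, **kwargs):
        print(f'{time.time() - tic:06.5f} - Start {orig_fun.__name__} at thread: {threading.current_thread()}')
        result = orig_fun(tic, *args, **kwargs)
        print(f'{time.time() - tic:06.5f} - Done {orig_fun.__name__}')
        return result  # pass the wrapped function's return value through

    return wrapper


@timer
def job_long(tic):
    time.sleep(5)  # simulate a 5-second job


@timer
def job_short(tic):
    time.sleep(2)  # simulate a 2-second job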

In [34]:
def main():
    tic = time.time()
    print('Start main thread')

    # Create two child threads
    td1 = threading.Thread(target=job_long, name='task1', kwargs={'tic': tic})
    td2 = threading.Thread(target=job_short, name='task2', kwargs={'tic': tic})

    # Start the child threads
    td1.start()
    td2.start()

    # Meanwhile, the main thread keeps doing its own work
    for i in range(3):
        print(f"{time.time() - tic:06.5f} - Main thread working...", i)
        time.sleep(1)

    td1.join()  # block until td1 finishes its task
    td2.join()  # block until td2 finishes its task

    print('\nAll done!')

In [35]:
if __name__ == '__main__':
    main()

Start main thread
0.00051 - Start job_long at thread: <Thread(task1, started 123145479356416)>
0.00137 - Start job_short at thread: <Thread(task2, started 123145484611584)>
0.00179 - Main thread working... 0
1.00513 - Main thread working... 1
2.00410 - Done job_short
2.00619 - Main thread working... 2
5.00270 - Done job_long

All done!


### Multiple child threads with arguments

In [6]:
import threading
import time

# Work function run by each child thread
def job(num):
    print("Thread", num)
    time.sleep(1)

# Create and start 5 child threads, each with its own argument
threads = []
for i in range(5):
    t = threading.Thread(target=job, args=(i,))  # args must be a tuple
    threads.append(t)
    t.start()

# The main thread can do its own work here
# ...

# Wait for all child threads to finish
for t in threads:
    t.join()

print("Done.")

Thread 0
Thread 1
Thread 2
Thread 3
Thread 4
Done.
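`Thread` discards the return value of its `target` function, so a common workaround is to let each thread write into its own slot of a pre-allocated list. A minimal sketch; `square_into` is a hypothetical helper. Because every thread writes to a distinct index, no lock is needed here.

```python
import threading

def square_into(results, index, value):
    # Each thread writes to its own index, so no two threads touch the same slot
    results[index] = value ** 2

numbers = [1, 2, 3, 4, 5]
results = [None] * len(numbers)
threads = [
    threading.Thread(target=square_into, args=(results, i, n))
    for i, n in enumerate(numbers)
]
for t in threads:
    t.start()
for t in threads:
    t.join()  # after join(), every slot has been filled
print(results)
```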


### Object-oriented style: subclassing `threading.Thread`

When a `threading.Thread` starts, it calls its own `run()` method, which by default calls the function passed as `target`. Here we subclass `threading.Thread` and simply override `run()` with the work we want the thread to do.

In [11]:
import threading
import time

# Child thread class
class MyThread(threading.Thread):
    def __init__(self, num):
        super().__init__()  # the Thread base class must be initialized first
        self.num = num

    def run(self):
        # Overrides Thread.run(); executed in the new thread after start()
        print(f"Thread {self.num}")
        time.sleep(1)

# Create and start 5 child threads
threads = []
for i in range(5):
    t = MyThread(i)
    threads.append(t)
    t.start()

# The main thread can do its own work here
# ...

# Wait for all child threads to finish
for t in threads:
    t.join()

print("Done.")

Thread 0
Thread 1
Thread 2
Thread 3
Thread 4
Done.
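It is `start()` that creates the new thread; calling `run()` directly just executes the method in the caller's thread. A small sketch that records which thread executed `run()`; the class name `NameRecorder` is illustrative. It also shows a subclass storing its result on `self`, one way to read a value back after `join()`.

```python
import threading

class NameRecorder(threading.Thread):
    def __init__(self):
        super().__init__(name="recorder")
        self.ran_in = None  # filled in by run()

    def run(self):
        # Record the name of whichever thread is executing run()
        self.ran_in = threading.current_thread().name

direct = NameRecorder()
direct.run()              # no new thread: runs in the calling thread
started = NameRecorder()
started.start()           # new thread: run() executes inside "recorder"
started.join()
print(direct.ran_in, started.ran_in)
```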


### Queue

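When there are many jobs to spread across several workers, the simplest approach is a queue: each worker repeatedly takes the next unprocessed job from the shared queue. Note that in CPython the Global Interpreter Lock lets only one thread execute Python bytecode at a time, so worker *threads* speed up I/O-bound or waiting work (like the `time.sleep` below) but not CPU-bound computation; for that, use `multiprocessing`.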

In [14]:
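import time
import threading
import queue

# Worker thread that processes items from a shared queue
class Worker(threading.Thread):
    def __init__(self, work_queue, num):
        super().__init__()
        self.work_queue = work_queue  # named so it does not shadow the queue module
        self.num = num

    def run(self):
        while True:
            # Take the next item. Checking qsize() and then calling get() is racy
            # (another worker may grab the last item in between), so use
            # get_nowait() and stop when the queue is empty.
            try:
                msg = self.work_queue.get_nowait()
            except queue.Empty:
                break

            # Process the item
            print(f"Worker {self.num}: {msg}")
            time.sleep(1)

# Create the queue
my_queue = queue.Queue()

# Put the work items into the queue
for i in range(10):
    my_queue.put(f"Data {i}")

# Create two workers
my_worker1 = Worker(my_queue, 1)
my_worker2 = Worker(my_queue, 2)

# Let the workers start processing
my_worker1.start()
my_worker2.start()

# Wait for all workers to finish
my_worker1.join()
my_worker2.join()

print("Done.")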

Worker 1: Data 0
Worker 2: Data 1
Worker 1: Data 2Worker 2: Data 3

Worker 2: Data 4Worker 1: Data 5

Worker 1: Data 6Worker 2: Data 7

Worker 2: Data 8Worker 1: Data 9

Done.
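A loop that checks `qsize()` and then calls `get()` can race: another worker may take the last item in between, leaving `get()` blocked forever. A common alternative is to put one sentinel value per worker on the queue so each worker knows when to stop, and to send results back through a second queue. A minimal sketch; `worker` and the sentinel `STOP` (here `None`) are hypothetical choices.

```python
import queue
import threading

STOP = None  # sentinel; assumes None is never a real work item

def worker(tasks, results):
    while True:
        item = tasks.get()       # blocks until an item is available
        if item is STOP:
            break                # each sentinel stops exactly one worker
        results.put(item * 10)   # stand-in for real processing

tasks = queue.Queue()
results = queue.Queue()
workers = [threading.Thread(target=worker, args=(tasks, results)) for _ in range(2)]
for w in workers:
    w.start()
for i in range(10):
    tasks.put(i)
for _ in workers:
    tasks.put(STOP)              # sentinels go last, after all real items
for w in workers:
    w.join()

collected = sorted(results.get() for _ in range(10))
print(collected)
```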


### Lock

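In a multithreaded program, threads run concurrently. When some work must not be done by several threads at the same time (for example, writing to the same file), use a lock so that only one thread does that work at a time. In Python, the `Lock` class from the `threading` module handles this.

When a thread calls the lock's `acquire()`, it takes ownership of the lock and proceeds with the protected work. If another thread then calls `acquire()`, it blocks until the first thread finishes and calls `release()`; only then can it acquire the lock and do its own protected work.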

In [17]:
import time
import threading
import queue

class Worker(threading.Thread):
    def __init__(self, work_queue, num, lock):
        super().__init__()
        self.work_queue = work_queue
        self.num = num
        self.lock = lock

    def run(self):
        while True:
            try:
                msg = self.work_queue.get_nowait()
            except queue.Empty:
                break

            # Acquire the lock; `with` releases it even if the body raises
            with self.lock:
                print(f"Lock acquired by Worker {self.num}")

                # Work that must not run in several threads at the same time
                print(f"Worker {self.num}: {msg}")
                time.sleep(1)

                print(f"Lock released by Worker {self.num}")
            # The lock is actually released here, when the with block exits

my_queue = queue.Queue()
for i in range(5):
    my_queue.put(f"Data {i}")

# Create the lock shared by both workers
lock = threading.Lock()

my_worker1 = Worker(my_queue, 1, lock)
my_worker2 = Worker(my_queue, 2, lock)

my_worker1.start()
my_worker2.start()

my_worker1.join()
my_worker2.join()

print("Done.")

Lock acquired by Worker 1
Worker 1: Data 0
Lock released by Worker 1
Lock acquired by Worker 2
Worker 2: Data 1
Lock released by Worker 2
Lock acquired by Worker 1
Worker 1: Data 2
Lock released by Worker 1
Lock acquired by Worker 1
Worker 1: Data 4
Lock released by Worker 1
Lock acquired by Worker 2
Worker 2: Data 3
Lock released by Worker 2
Done.
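The classic use case for a lock is a shared counter: `counter += 1` is a read-modify-write, and without a lock threads may interleave those steps (whether updates are actually lost depends on interpreter version and timing). A minimal sketch with `with lock:`, which also releases the lock if the body raises; `add_many` is a hypothetical helper.

```python
import threading

counter = 0
lock = threading.Lock()

def add_many(n):
    global counter
    for _ in range(n):
        with lock:       # only one thread at a time runs the increment
            counter += 1

threads = [threading.Thread(target=add_many, args=(10_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)
```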


### Semaphore

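A semaphore works like a lock but adds a counter. Each `acquire()` decrements the internal counter by 1 and each `release()` increments it by 1; when the counter reaches 0, further `acquire()` calls block until another thread calls `release()`. A `Semaphore(n)` therefore lets at most n threads into the guarded section at once.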

In [19]:
import time
import threading
import queue

class Worker(threading.Thread):
    def __init__(self, work_queue, num, semaphore):
        super().__init__()
        self.work_queue = work_queue
        self.num = num
        self.semaphore = semaphore

    def run(self):
        while True:
            try:
                msg = self.work_queue.get_nowait()
            except queue.Empty:
                break

            # Acquire the semaphore (blocks while its counter is 0)
            with self.semaphore:
                # _value is a private CPython attribute, read here only to show the counter
                print(f"Semaphore acquired by Worker {self.num} ({self.semaphore._value} left)")

                # Work that at most `maxconnections` threads may do at the same time
                print(f"Worker {self.num}: {msg}")
                time.sleep(1)

                print(f"Semaphore released by Worker {self.num} ({self.semaphore._value} left)")
            # The semaphore is released when the with block exits

my_queue = queue.Queue()
for i in range(5):
    my_queue.put(f"Data {i}")

# Create the semaphore: at most 2 workers inside the guarded section at once
maxconnections = 2
semaphore = threading.Semaphore(maxconnections)

my_worker1 = Worker(my_queue, 1, semaphore)
my_worker2 = Worker(my_queue, 2, semaphore)
my_worker3 = Worker(my_queue, 3, semaphore)

my_worker1.start()
my_worker2.start()
my_worker3.start()

my_worker1.join()
my_worker2.join()
my_worker3.join()

print("Done.")

Semaphore acquired by Worker 1 (1 left)
Worker 1: Data 0
Semaphore acquired by Worker 2 (0 left)
Worker 2: Data 1
Semaphore released by Worker 2 (0 left)Semaphore released by Worker 1 (0 left)
Semaphore acquired by Worker 2 (0 left)
Worker 2: Data 3

Semaphore acquired by Worker 1 (0 left)
Worker 1: Data 4
Semaphore released by Worker 2 (0 left)Semaphore released by Worker 1 (0 left)

Semaphore acquired by Worker 3 (0 left)
Worker 3: Data 2
Semaphore released by Worker 3 (1 left)
Done.
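To check that a `Semaphore(2)` really caps concurrency, a sketch can count how many threads are inside the guarded section at the same time. The bookkeeping uses a separate `Lock`; `active`, `peak`, and `guarded_work` are hypothetical names.

```python
import threading
import time

semaphore = threading.Semaphore(2)  # at most 2 threads inside at once
stats_lock = threading.Lock()       # protects the two counters below
active = 0
peak = 0

def guarded_work():
    global active, peak
    with semaphore:
        with stats_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)            # simulated work while holding a slot
        with stats_lock:
            active -= 1

threads = [threading.Thread(target=guarded_work) for _ in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(peak)
```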


---

## Use Multiprocessing

https://docs.python.org/3.6/library/multiprocessing.html

> `multiprocessing` is a package that supports spawning processes using an API similar to the `threading` module. The `multiprocessing` package offers both local and remote concurrency, effectively side-stepping the **Global Interpreter Lock** by using subprocesses instead of threads. Due to this, the `multiprocessing` module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.

In [20]:
import multiprocessing
import os
import time

In [24]:
def do_job(name):
    whoami(name)
    
def whoami(name):
    print(f'Process {os.getpid()} - doing: {name}')
    
def loopy(name):
    whoami(name)
    start = 1
    n_loop = 100000
    for i in range(start, n_loop):
        print(f'Doing {i} of {n_loop}')
        time.sleep(1)

In [25]:
# Start 4 processes
if __name__ == '__main__':
    whoami('Main program')
    procs = []
    for i in range(4):
        p = multiprocessing.Process(target=do_job, args=(f'loop {i}',))
        p.start()
        procs.append(p)
    for p in procs:
        p.join()  # wait for every child to finish

Process 93907 - doing: Main program
Process 93994 - doing: loop 0
Process 93995 - doing: loop 1
Process 93996 - doing: loop 2
Process 93997 - doing: loop 3


In [29]:
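# Stop a child process early
if __name__ == '__main__':
    whoami('Main program')
    p = multiprocessing.Process(target=loopy, args=('loopy',))

    # Start the job in the background
    p.start()

    # Meanwhile, the main process does its own work
    for _ in range(3):
        print('Main processing...')
        time.sleep(4)

    # Stop the child early. Without this line, p would keep running in the
    # background, and a script would wait for this non-daemon child at exit.
    p.terminate()
    p.join()  # reap the terminated child
    print('Done.')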

Process 93907 - doing: Main program
Process 94022 - doing: loopy
Doing 1 of 100000
Main processing...
Doing 2 of 100000
Doing 3 of 100000
Doing 4 of 100000
Doing 5 of 100000
Main processing...
Doing 6 of 100000
Doing 7 of 100000
Doing 8 of 100000
Doing 9 of 100000
Main processing...
Doing 10 of 100000
Doing 11 of 100000
Doing 12 of 100000
Done.
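After `terminate()` it is good practice to `join()` the process so it is reaped; `exitcode` then tells how it ended. On POSIX systems a negative `exitcode` of `-N` means the child was killed by signal `N`, so a terminated process reports `-signal.SIGTERM`. A minimal sketch, assuming a POSIX system: it explicitly requests the `fork` start method so a function defined in the same file can be the target. `sleepy` and `run_demo` are hypothetical names.

```python
import multiprocessing as mp
import signal
import time

def sleepy():
    # Stand-in for a long-running job
    time.sleep(60)

def run_demo():
    ctx = mp.get_context("fork")   # assumption: POSIX, where "fork" is available
    p = ctx.Process(target=sleepy)
    p.start()
    time.sleep(0.2)
    p.terminate()                  # sends SIGTERM on POSIX
    p.join()                       # reap the child; exitcode is set afterwards
    return p.exitcode, p.is_alive()

if __name__ == "__main__":
    print(run_demo(), -signal.SIGTERM)
```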


### Process pool (Pool)

[What's the difference between map_async and imap?](https://stackoverflow.com/a/26521507/3744499)

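- `Pool.map()`: takes a function and an iterable of arguments and returns a list of results, one per input, in order; it blocks until all results are ready. It consumes your iterable by converting it to a list.
- `Pool.imap()`: does not turn the iterable you give it into a list. It iterates over the iterable one element at a time, sends each to a worker process, and returns a lazy iterator over the results.
- `Pool.map_async()`: like `map()`, but returns an `AsyncResult` immediately instead of blocking; call `.get()` to obtain the list of results.
- `Pool.apply()`: calls the function with a single set of arguments and blocks until that one result is ready.
- `Pool.apply_async()`: also takes a single set of arguments, but returns an `AsyncResult` immediately; call `.get()` to obtain the result.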
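A side-by-side sketch of the variants listed above. To stay self-contained it uses `multiprocessing.dummy.Pool`, which exposes the same `Pool` API but is backed by threads; swapping in `multiprocessing.Pool` gives the process-based version (with the usual requirement that the function be importable by the child processes). `square` is a hypothetical job.

```python
from multiprocessing.dummy import Pool  # same Pool API, backed by threads

def square(x):
    return x * x

with Pool(processes=3) as pool:
    mapped = pool.map(square, range(5))                 # blocks, returns a list
    lazy = list(pool.imap(square, range(5)))            # lazy iterator, input order
    async_map = pool.map_async(square, range(5)).get()  # AsyncResult, .get() blocks
    single = pool.apply(square, (7,))                   # one call, blocks for result
    pending = pool.apply_async(square, (8,))            # one call, returns AsyncResult
    single_async = pending.get(timeout=5)

print(mapped, lazy, async_map, single, single_async)
```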

In [36]:
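import multiprocessing as mp

def job(x):
    return x**2


def multicore():
    # Create a pool of 3 worker processes; `with` shuts the pool down afterwards
    with mp.Pool(processes=3) as pool:
        # map() takes the function and the values to iterate over, distributes
        # the calls across the workers, and returns the results in input order
        res = pool.map(job, range(10))
        print(res)

        # apply_async() submits a single call with one set of arguments,
        # which must be passed as an iterable (a tuple)
        res = pool.apply_async(job, (2,))
        print(res.get())  # get() blocks until the result is ready

        # Submit several apply_async() calls, then collect their results
        multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
        print([res.get() for res in multi_res])


if __name__ == '__main__':
    multicore()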

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
4
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]


### Multiprocessing with Queue

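`Queue` lets each process (or thread) put its results on a shared queue; once the workers have run, the main program takes the results out of the queue and continues with further computation. The reason is simple: the return value of a function run as a `Process` or `Thread` target is discarded, so a `Queue` is used to collect the results of multiple workers.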
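A minimal sketch of that idea with processes: each child puts an `(input, result)` pair on a `multiprocessing.Queue`, and the parent reads one item per child. It assumes a POSIX system and explicitly requests the `fork` start method so functions defined in the same file can be targets; `square_to_queue` and `collect_squares` are hypothetical names. The queue is drained before `join()` because the `multiprocessing` docs warn that joining a process that has put items on a queue can deadlock until those items are consumed.

```python
import multiprocessing as mp

def square_to_queue(x, out):
    # The target's return value would be discarded, so send the result back instead
    out.put((x, x * x))

def collect_squares(values):
    ctx = mp.get_context("fork")  # assumption: POSIX, where "fork" is available
    out = ctx.Queue()
    procs = [ctx.Process(target=square_to_queue, args=(v, out)) for v in values]
    for p in procs:
        p.start()
    # Read one result per child BEFORE joining, to avoid the queue deadlock
    results = dict(out.get() for _ in procs)
    for p in procs:
        p.join()
    return results

if __name__ == "__main__":
    print(collect_squares([1, 2, 3]))
```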

In [None]:
import multiprocessing as mp
import time
import os

In [None]:
def washer(dishes, output, tic):
    # Wash each dish (1 s each) and put it on the queue for the dryers
    for dish in dishes:
        print(f'[{time.time() - tic:.2f}] Washing dish ({os.getpid()}): {dish}')
        time.sleep(1)
        output.put(dish)

def dryer(dish_queue, tic):
    # Runs forever: take a dish, dry it (5 s), then mark it as done
    name = mp.current_process().name
    while True:
        dish = dish_queue.get()
        print(f'[{time.time() - tic:.2f}] {name}({os.getpid()}) - Drying dish: {dish}')
        time.sleep(5)
        print(f'[{time.time() - tic:.2f}] {name}({os.getpid()}) - Done: {dish}')
        dish_queue.task_done()  # tell the queue that this item is finished

In [None]:
if __name__ == '__main__':
    # tic is passed explicitly: with the "spawn" start method (Windows, macOS)
    # the children do not run this block, so a global tic would not exist there
    tic = time.time()
    dish_queue = mp.JoinableQueue()

    # Create two dryer processes that both consume from dish_queue
    dryer_proc1 = mp.Process(target=dryer, args=(dish_queue, tic))
    dryer_proc2 = mp.Process(target=dryer, args=(dish_queue, tic))
    # Daemon: killed automatically when the main process exits, so the endless
    # loop does not keep the program alive. This must be set before start() is called.
    dryer_proc1.daemon = True
    dryer_proc2.daemon = True
    dryer_proc1.start()  # start dryer 1
    dryer_proc2.start()  # start dryer 2

    # Start working
    dishes = ['apple', 'banana', 'orange', 'salad']
    washer(dishes, dish_queue, tic)  # the washer (main process) puts each clean dish on dish_queue
    print('All wash work done!!!')
    dish_queue.join()  # Block until all items in the queue have been gotten and processed.
    print('All Done!')

[**Queue vs JoinableQueue**](https://stackoverflow.com/a/31230329/3744499)

`JoinableQueue` has the methods `join()` and `task_done()`, which `Queue` does not have.

> **class multiprocessing.JoinableQueue( [maxsize] )**
> 
> `JoinableQueue`, a `Queue` subclass, is a queue which additionally has `task_done()` and `join()` methods.

If you use `JoinableQueue` then you must call `JoinableQueue.task_done()` for each task removed from the queue or else the semaphore used to count the number of unfinished tasks may eventually overflow, raising an exception.
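A single-process sketch of the counting that `task_done()` and `join()` rely on: each `put()` adds one unfinished task, each `task_done()` removes one, and `join()` returns once the count reaches zero. Calling `task_done()` more times than items were put raises `ValueError`. No child processes are started, so this runs the same under any start method.

```python
import multiprocessing as mp

q = mp.JoinableQueue()
q.put("dish")
item = q.get()
q.task_done()   # one task_done() per item taken off the queue
q.join()        # returns at once: no unfinished tasks remain

try:
    q.task_done()   # one call too many
    extra_call_failed = False
except ValueError:
    extra_call_failed = True

print(item, extra_call_failed)
```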

### Daemon Process

In [31]:
import multiprocessing
import time

def daemon(tic):
    name = multiprocessing.current_process().name
    print(f'[{time.time() - tic:.2f}] Starting: {name}')
    time.sleep(5)
    print(f'[{time.time() - tic:.2f}] Exiting: {name}')

def non_daemon(tic):
    name = multiprocessing.current_process().name
    print(f'[{time.time() - tic:.2f}] Starting: {name}')
    print(f'[{time.time() - tic:.2f}] Exiting: {name}')

if __name__ == '__main__':
    tic = time.time()  # passed to the children explicitly (see the spawn note above)

    d = multiprocessing.Process(name='daemon',
                                target=daemon, args=(tic,))
    d.daemon = True  # a daemon does not keep the program alive; it is terminated when the main process exits

    n = multiprocessing.Process(name='non-daemon',
                                target=non_daemon, args=(tic,))
    n.daemon = False

    d.start()
    n.start()

    d.join(1)  # wait for d to finish, but give up after a 1-second timeout
    print('d.is_alive():', d.is_alive())
    n.join()

[0.03] Starting: daemon
[0.03] Starting: non-daemon
[0.04] Exiting: non-daemon
d.is_alive(): True
[5.04] Exiting: daemon
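The output above comes from a Jupyter kernel, which keeps running after the cell finishes, so the daemon reaches `Exiting`. In a standalone script, `multiprocessing` terminates daemonic children when the main process exits, so `Exiting: daemon` would normally never print. A sketch that checks this by running a small script in a separate interpreter; the script text and the `fork` start method (POSIX only) are assumptions for illustration.

```python
import subprocess
import sys
import textwrap

SCRIPT = textwrap.dedent("""
    import multiprocessing as mp
    import time

    def daemon_job():
        print("Starting: daemon", flush=True)
        time.sleep(3)
        print("Exiting: daemon", flush=True)

    if __name__ == "__main__":
        ctx = mp.get_context("fork")  # assumption: POSIX
        d = ctx.Process(target=daemon_job, daemon=True)
        d.start()
        d.join(1)  # wait at most 1 s, then let the main process exit
        print("main exits; daemon alive:", d.is_alive(), flush=True)
""")

# Run the script in a fresh interpreter and capture everything it prints
result = subprocess.run(
    [sys.executable, "-c", SCRIPT], capture_output=True, text=True, timeout=30
)
print(result.stdout)
```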


---