# python私房手册-线程进程和协程

## 线程

[《聊聊Python中的GIL》](https://www.cnblogs.com/ArsenalfanInECNU/p/9968621.html)  
[《Python中的多线程编程，线程安全与锁（一）》](https://www.cnblogs.com/ArsenalfanInECNU/p/10022740.html)  
[《Python中的多线程编程，线程安全与锁（二）》](https://www.cnblogs.com/ArsenalfanInECNU/p/10134591.html)  
[《线程间同步之条件变量Condition》](https://www.jianshu.com/p/5d2579938517)

### `join`，`setDameon`的区别

- 默认情况下，主线程会等所有的子线程执行完毕以后再退出。
- `join`是指主线程阻塞，等子线程执行完毕以后再往下执行，可以理解成子线程“加入”主线程，所以必须在子线程运行起来（`start()`）以后再`join`，注意，主程序在线程`join()`调用处阻塞。

In [9]:
import threading
from time import sleep


def func():
    print("子线程开始运行。")
    sleep(4)
    print("子程序结束运行。")

class MyThread(threading.Thread):       
    def join(self):
        print("子线程加入主线程，主线程阻塞，等待子线程结束。")
        super().join()

        
t = MyThread(target=func)
t.start()
sleep(2)
print("现在仍然在主线程里面。")
t.join()
print("主线程结束。")

子线程开始运行。
现在仍然在主线程里面。
子线程加入主线程，主线程阻塞，等待子线程结束。
子程序结束运行。
主线程结束。


- `setDaemon`值将子进程设置为后台进程（一般都是翻译成守护进程，我觉得这个翻译反而会让人误解），设置为后台进程以后，主线程就不会再阻塞着等待子线程了，会直接向下执行，主进程结束的时候，子进程也跟着结束。注意，`subthread.setDaemon(True)`需要在子进程`start()`运行起来之前，先声明它是后台进程。或者在定义子进程的时候，通过`daemon`参数设置。

```python
import threading
from time import sleep

def func1():
    print('func1 start')
    sleep(2)
    print('func1 end')
    
def func2():
    print('func2 start')
    sleep(4)
    print('func2 end')

t1 = threading.Thread(target=func1)
t2 = threading.Thread(target=func2, daemon=True)
t1.start()
t2.start()
```
以上代码，不会出现`func2 end`，但是在`jupyter`的交互式环境中，还是会打印的。

### `concurrent.futures`的细节

使用多线程和多进程最简单的方法，就是使用内部库的`concurrent.futures`模块，它是对内部的`threading`和`multiprocessing`库的上层封装，分别对应`futures.ThreadPoolExecutor()`和`Futures.ProcessPoolExecutor()`两个方法。两种方法都返回一个`Executor`执行器，使用的时候，主要有两种方式，一种是使用`executor`的`map`方法，一种是使用`executor`的`submit`搭配`as_completed`方法。
1. `map`方法如下几点主要注意：
    - `map`方法直接返回一个结果的生成器，生成器内部实际上是一个期程（`Future`）的列表，迭代生成器时，实际上依次调用期程（`Future`）的`result()`的方法。如果线程还没有执行完，会在这里阻塞，等待`result()`返回结果，如果发生异常，也会在这里抛出，同时结束，返回主线程。因此最后的结果和传入参数顺序是一样的。

In [53]:
from concurrent import futures
from time import sleep
from datetime import datetime

In [54]:
def now():
    now = datetime.now()
    return now.strftime("%Y-%m-%d %H:%M:%S")

In [55]:
def func1():
    print(f"func1 start, time is {now()}")
    sleep(4)
    return f"func1 end, time is {now()}"

def func2():
    print(f"func2 start, time is {now()}")
    sleep(2)
    return f"func2 end, time is {now()}"

def func3():
    print(f"func3 start, time is {now()}")
    sleep(3)
    raise TypeError("func3 error")
    
def func4():
    print(f"func4 start, time is {now()}")
    sleep(5)
    return f"func4 end, time is {now()}"

线程根本上还是时间排期，并非真正的并发，通过以下两个例子仔细体会每个时间点：

In [57]:
with futures.ThreadPoolExecutor(max_workers=3) as executor:
    # 立即对所有线程进行排期，开始并发执行，返回一个生成器，保留了加入时的顺序
    results = executor.map(lambda func:func(), [func1, func2, func3, func4]) 
    # 并发启动3个线程以后，回到主线程向下执行
    sleep(1)
    print(f"waiting...time is {now()}") 
    # 2秒钟后，func1线程结束，启动func4，然后又回到主线程向下执行
    try:
        # 这里隐式依次调用底层future的result()方法，如果线程还没有执行完，会在这里阻塞，异常也在这里抛出
        for r in results:  
            print(r)
    # 捕捉到func3抛出的错误，后面的线程会结束运行
    except TypeError as e: 
        print(e)

func1 start, time is 2020-12-28 14:15:20
func2 start, time is 2020-12-28 14:15:20
func3 start, time is 2020-12-28 14:15:20
waiting...time is 2020-12-28 14:15:21
func4 start, time is 2020-12-28 14:15:22
func1 end, time is 2020-12-28 14:15:24
func2 end, time is 2020-12-28 14:15:22
func3 error


In [58]:
with futures.ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(lambda func:func(), [func1, func2, func3, func4]) 
    sleep(1)
    print(f"waiting...time is {now()}")
# 就算不调用期程的result方法，也会在with代码块的最后等待所有线程执行完毕
print(f"out with block, time is {now()}")

func1 start, time is 2020-12-28 14:17:13
func2 start, time is 2020-12-28 14:17:13
func3 start, time is 2020-12-28 14:17:13
waiting...time is 2020-12-28 14:17:14
func4 start, time is 2020-12-28 14:17:15
out with block, time is 2020-12-28 14:17:20


可见，4个函数同时运行，然后按照传入的顺序依次返回结果，当遇到异常时，整个退出，未获得`func4`的结果。

2. `submit`方法一般都是和`as_completed`方法搭配使用，注意以下几点：
    - `submit`明确的返回一个期程。然后调用`as_completed`方法，将多个期程组成的列表传入`as_completed`方法，返回一个期程的生成器。
    - 和`map`不同，生成器会按照执行完成的先后顺序返回执行完毕的期程，注意在隐性调用`next`的时候阻塞，而不是`future.result()`的时候。
    - 注意异常在调用`future.result()`的时候抛出，因此可以捕获单个`future`的异常，获取全部`future`的结果。

In [24]:
with futures.ThreadPoolExecutor(max_workers=3) as executor:
    future1 = executor.submit(func1)  # 返回一个已排期的期程，在这里会立即执行
    future2 = executor.submit(func2)
    future3 = executor.submit(func3)
    sleep(1)  # 同样的，启动以后立即回到主线程向下执行
    print(f"waiting...time is {now()}")
    # 返回func4的期程，但是这里不会马上执行，因为线程池是满的
    future4 = executor.submit(func4)
    # 这里并不会阻塞，只是返回一个期程构成的生成器
    done_iter = futures.as_completed([future1, future2, future3, future4])
    print(f"after as_completed...time is {now()}")
    # 此时func2执行完，线程池空了一个，开始启动func4的线程,然后马上又回到主线程
    try:
        # 一个线程结束才会进入生成器并返回，如果线程没有执行完就阻塞等待，所以哪个先执行完哪个先返回，不会保留顺序
        for f in done_iter: 
            print(f"waiting...time is {now()}")
            # 这里不会阻塞，因为此时的future是已经执行完毕的future，注意：如果有异常，会在这里抛出。
            print(f.result())
    # 如果在for循环外层捕获错误，则会和map一样，直接结束
    except TypeError as e:
        print(e)

func1 start, time is 2019-12-05 10:34:49
func2 start, time is 2019-12-05 10:34:49
func3 start, time is 2019-12-05 10:34:49
waiting...time is 2019-12-05 10:34:50
after as_completed...time is 2019-12-05 10:34:50
func4 start, time is 2019-12-05 10:34:51waiting...time is 2019-12-05 10:34:51

func2 end, time is 2019-12-05 10:34:51
waiting...time is 2019-12-05 10:34:52
func3 error


In [71]:
with futures.ThreadPoolExecutor(max_workers=3) as executor:
    future1 = executor.submit(func1)
    future2 = executor.submit(func2)
    future3 = executor.submit(func3)
    future4 = executor.submit(func4)
    done_iter = futures.as_completed([future1, future2, future3, future4])
    for f in done_iter:
         # 如果在内层捕捉异常，可以继续完成整个循环。
        try:
            print(f.result())
        except TypeError as e:
            print(e)

func1 start, time is 2019-11-12 13:12:46
func2 start, time is 2019-11-12 13:12:46
func3 start, time is 2019-11-12 13:12:46
func4 start, time is 2019-11-12 13:12:48
func2 end, time is 2019-11-12 13:12:48
func3 error
func1 end, time is 2019-11-12 13:12:50
func4 end, time is 2019-11-12 13:12:53


注意捕获异常的代码的位置，如果在`for`循环外层捕获异常，则会中断`for`循环，而在内层捕获，则可以继续完成整个循环。

3. 期程的其它方法：
    - `done()`：查看期程是否执行完毕。
    - `add_done_callback()`：给期程添加一个回调函数。注意：这个回调函数只接受一个参数就是期程。
    - `exception()`：和`result()`类似，阻塞主线程，如果有异常则返回，如果运行正常则返回None。

In [131]:
from functools import partial


def t():
    print(f"t started, time is {now()}")
    sleep(1)
    return 42


def cbfunc(f, func):
    if f.done():
        print(f"{func.__name__} end, time is {now()}")


with futures.ThreadPoolExecutor(max_workers=3) as executor:
    f = executor.submit(t)
    f.add_done_callback(partial(cbfunc, func=t))
    f.result()

t started, time is 2019-11-12 15:15:08
t end, time is 2019-11-12 15:15:09


42

3. 不设置`max_workers`的时候，就是不做限制，对于线程来说，传进去多少个就执行多少个。而对于进程来说，同时并发数和cpu的核个数有关。

2022年2月8日补充，上面的内容都对，但是有一些细节仍然没有说清楚：
1. 对于`with`来说，python会在`__exit__`中调用`shutdown(wait=True)`，因此`with`外的代码，都会在所有线程执行完毕以后再运行。

In [16]:
import time

def func(n):
    print(f'func {n} started')
    time.sleep(n)
    print(f'func {n} end')
    return n

In [18]:
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor() as executor:
    res = executor.map(func, [3, 1, 2])

# 所有线程都执行完毕才会运行下面的代码
print("=" * 50)
print(list(res))

func 3 started
func 1 started
func 2 started
func 1 end
func 2 end
func 3 end
[3, 1, 2]


所以，如果使用`submit`，把代码放到`with`外面的话，`futures.as_completed`就没必要了：

In [21]:
with ThreadPoolExecutor() as executor:
    # submit返回一个future
    fs = [executor.submit(func, i) for i in (3, 1, 2)]

print("=" * 50)    
for f in fs:
    print(f.result())  # 线程全部已经执行完毕，直接调用future.result()方法获取结果

func 3 started
func 1 started
func 2 started
func 1 end
func 2 end
func 3 end
3
1
2


如果不使用`with`上下文管理器，那么使不使用`as_completed`会有微妙的区别：
- 不使用`as_completed`会在`f.result`处阻塞，等待线程返回结果。并且保持了`future`添加的顺序。

In [27]:
executor = ThreadPoolExecutor()

fs = [executor.submit(func, n) for n in (3, 1, 2)]

for f in fs:
    # 这里并不会阻塞，且fs保留了线程添加的顺序
    print(f'{f} done is {f.done()}') 
    # 会在这里阻塞，等待结果返回 
    r = f.result()
    print(r)

func 3 started
func 1 started
func 2 started
<Future at 0x25c412815b0 state=running> done is False
func 1 end
func 2 end
func 3 end
3
<Future at 0x25c4126b460 state=finished returned int> done is True
1
<Future at 0x25c3ec7c310 state=finished returned int> done is True
2


- 使用`as_completed`有些微妙，它会返回一个生成器，当线程执行完毕的时候，生成器返回done为True的future。因此，它不保留future添加的顺序，且会在`for`循环处堵塞，等待线程执行完毕。

In [29]:
from concurrent import futures
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor()

# as_completed接受一个future组成的列表，返回一个生成器
# 并不会在此处阻塞，等待线程完成
fs = futures.as_completed([executor.submit(func, n) for n in (3, 1, 2)])
print(fs)

# 会在这里阻塞，只有线程执行完毕，生成器才会返回该线程，而且先完成先返回，并不保留添加的顺序
for f in fs:
    # 因此这里的future全部已经执行完毕，done都为True
    print(f'{f} done is {f.done()}') 
    r = f.result()
    print(r)

func 3 started
func 1 started
func 2 started
<generator object as_completed at 0x0000025C410CEDD0>
func 1 end
<Future at 0x25c40f31970 state=finished returned int> done is True
1
func 2 end
<Future at 0x25c40f21850 state=finished returned int> done is True
2
func 3 end
<Future at 0x25c40f31040 state=finished returned int> done is True
3


2. 线程如果抛出异常，也有几点需要注意，如果使用`map`，返回一个生成器，此时如果某个线程抛出异常，则会向上冒泡，生成器立即停止：

In [35]:
import time
from concurrent.futures import ThreadPoolExecutor
from inspect import getgeneratorstate


def func(n):
    print(f'func {n} started')
    if n == 2:
        raise RuntimeError('something wrong happened!')
    time.sleep(n)
    print(f'func {n} end')
    return n


with ThreadPoolExecutor() as executor:
    res = executor.map(func, [3, 2, 1])  # 异常不会在这里抛出

print("=" * 50)
try:
    # 异常会在此处抛出，并且生成器抛出异常后，生成器的状态变为CLOSED，不会再继续迭代
    # 这属于生成器的特性
    for r in res:  
        print(r)
except RuntimeError as e:
        print(e)
        print(getgeneratorstate(res))

func 3 started
func 2 started
func 1 started
func 1 end
func 3 end
3
something wrong happened!
GEN_CLOSED


这属于生成器的行为：

In [40]:
def gen():
    for i in range(5):
        yield i

In [41]:
g = gen()

next(g)

0

In [42]:
next(g)

1

In [45]:
try:
    g.throw(RuntimeError('something wrong happened!'))
except RuntimeError as e:
    print(e)

something wrong happened!


In [47]:
try:
    next(g)
except StopIteration as e:
    print('stopped!')

stopped!


因此，如果想要获取所有的结果，只能使用`submit`：

In [48]:
def func(n):
    print(f'func {n} started')
    if n == 2:
        raise RuntimeError('something wrong happened!')
    time.sleep(n)
    print(f'func {n} end')
    return n


with ThreadPoolExecutor() as executor:
    # submit返回一个future
    fs = [executor.submit(func, i) for i in (3, 1, 2)]

print("=" * 50)    
for f in fs:
    try: 
        print(f.result())  # 异常会在此处抛出，因此捕获以后仍可以迭代
    except RuntimeError as e:
        print(e)

func 3 started
func 1 started
func 2 started
func 1 end
func 3 end
3
1
something wrong happened!


### 用`Queue`协调线程间的工作

使用`ThreadPoolExcutor`是很容易获取线程返回的结果的，但是当我们的工作分为好几个步骤，合适的做法是通过`Queue`来创建管道，先来看官方的生产者消费者模型的例子：
```python
def worker():
    while True:
        item = q.get()
        # 当任务执行完毕，需要再一次向队列传入和线程数一致的None，这样线程才能结束，否则当item消耗完毕，会卡在q.get()，一直等待获取
        if item is None:
            break   
        do_work(item)
        # task_done和get是成对的，当item处理完毕以后再task_done
        q.task_done()

q = queue.Queue()
threads = []
for i in range(num_worker_threads):
    t = threading.Thread(target=worker)
    t.start()
    threads.append(t)

for item in source():
    q.put(item)

# block until all tasks are done
# 在这里阻塞主线程，等待队列内部计数器清零才继续执行
q.join()

# stop workers
# 重新向队列传入和线程数目一致的None，以保证所有的线程都能结束运行
for i in range(num_worker_threads):
    q.put(None)
for t in threads:
    t.join()
```
2020年12月28日补充：

这里特别注意是如何保证线程结束运行的。在《python标准库》中又看到一个方法，可以这样：
```python
def worker():
    while True:
        item = q.get() 
        do_work(item)
        q.task_done()

q = queue.Queue()
for i in range(num_worker_threads):
    t = threading.Thread(target=worker)
    t.setDaemon()
    t.start()

for item in source():
    q.put(item)

q.join()
```
比官方的例子简短，诀窍在于将每个线程设置为守护线程，这样，就算线程卡在q.get()，当主线程结束，所有的线程也就跟着结束了。

注意，`join`和`task_done`的意义，`queue`内部维护着一个计数器，当向它`put`一个元素的时候，计数器加1，当在`queue`上调用`task_done()`的时候，计数器减一，当调用`q.join()`的时候，会在这里阻塞，一直等到计数器为0。

以下的自定义类对`queue`进行了扩展，可以对其进行迭代，可以直接在`Queue`上进行关闭：

In [3]:
# 2020年12月28日补充：能否结束所有的线程？

from queue import Queue, LifoQueue, PriorityQueue


class IterQueueMixin:
    SIG = object()

    def close(self, n_workers=1):
        for i in range(n_workers):
            self.put(self.SIG)

    def __iter__(self):
        while True:
            item = self.get()
            try:
                if item is self.SIG:
                    return  # 生成器中return会抛出StopIteration错误
                yield item
            finally:
                self.task_done()
                

class IterQueue(IterQueueMixin, Queue):
    pass


class IterLifoQueue(IterQueueMixin, LifoQueue):
    pass

注意，上面这个类在使用的时候，需要先调用`close`，然后再在`queue`上调用`join`,官方的例子可以改成下面这样：

In [4]:
import threading
from time import sleep


def worker(n):
    for item in q:
        print(f"worker {item} start!")
        sleep(n)
        print(f"worker {item} finished!")
        
q = IterQueue()

ts = []

for i in [1, 2]:
    t = threading.Thread(target=worker, args=(i, ))
    t.start()
    ts.append(t)
    
for i in ["thread1", "thread2", "thread3", "thread4"]:
    q.put(i)
    
q.close(n_workers=len(ts))
q.join()

worker thread1 start!
worker thread2 start!
worker thread1 finished!
worker thread3 start!
worker thread2 finished!worker thread3 finished!
worker thread4 start!

worker thread4 finished!


### 在线程中使用辅助进程

先看代码：
```python
print(f"in {multiprocessing.current_process().name}")

class C:
    def __init__(self):
        self.pool = Pool()
        self.q = Queue()

    @staticmethod
    def func(r):
        print(f"I am in {multiprocessing.current_process().name}, result is {r}")
        time.sleep(3)
        print(f"end {multiprocessing.current_process().name}")


    def pfunc(self):
        print(f"I am producer, self is {self}")
        for i in [42, 33, 24, None, None, None]:
            self.q.put(i)


    def cfunc(self):
        print(f"I am consumer, self is {self}")
        while True:
            r = self.q.get()
            if r is None:
                break
            self.pool.apply(self.func, args=(r,))

    def run(self):
        t = threading.Thread(target=self.pfunc)
        t.start()
        with ThreadPoolExecutor() as executor:
            [executor.submit(self.cfunc) for _ in range(3)]


if __name__ == "__main__":
    c = C()
    c.run()
```
输出结果为：
```
in MainProcess
I am producer, self is <__main__.C object at 0x000002C12D314408>
I am consumer, self is <__main__.C object at 0x000002C12D314408>
I am consumer, self is <__main__.C object at 0x000002C12D314408>
I am consumer, self is <__main__.C object at 0x000002C12D314408>
in SpawnPoolWorker-2
I am in SpawnPoolWorker-2, result is 42
in SpawnPoolWorker-8
in SpawnPoolWorker-1
I am in SpawnPoolWorker-8, result is 33
I am in SpawnPoolWorker-1, result is 24
in SpawnPoolWorker-5
in SpawnPoolWorker-6
in SpawnPoolWorker-11
in SpawnPoolWorker-7
in SpawnPoolWorker-10
in SpawnPoolWorker-3
in SpawnPoolWorker-4
in SpawnPoolWorker-9
in SpawnPoolWorker-12
end SpawnPoolWorker-2
end SpawnPoolWorker-8
end SpawnPoolWorker-1
```
这里有几点需要理解：
1. `pool = Pool()`必须放在`__name__ == "__main__"`中，因为当执行`pool = Pool()`的时候，在后台已经创建进程了。
2. 虽然启动了3个消费者线程，但是线程的上下文都是同一个，因此都是同一个pool进程池，在不同的线程中调用`pool.apply`，都是使用同一个进程池来启动子进程。
3. 以下都只是根据输出结果进行推测，不一定正确：调用`pool.apply`会创建多个进程，在此例中，会folk(windows下为spawn)和CPU核数相同的进程数，此例中为12个，统一由pool进程池管理。当`apply`一个函数，进程池会选派一个进程执行这个函数。
4. 当while一次循环结束，如下：
```python
while True:
    r = self.q.get()
    if r is None:
        break
    self.pool.apply(self.func, args=(r,))
```
即一个函数执行完毕，此时进程池并不会关闭，当下一次轮询，会从进程池里又选派一个进程来执行函数。
5. pool的工作进程都是dameon守护进程，即如果主进程关闭，则所有子进程也会结束，但是如果使用Ctrl+C来结束进程，有可能结束的只是某一个子进程。据说3.4已经修复，但是不确定。可以参考以下两篇文章：
 - [How to kill all Pool workers in multiprocess?](https://stackoverflow.com/questions/26068819/how-to-kill-all-pool-workers-in-multiprocess)
 - [Kill Python Multiprocessing Pool](https://stackoverflow.com/questions/25415104/kill-python-multiprocessing-pool)

## 进程

- [Pool Limited Queue Processing in Python](https://towardsdatascience.com/pool-limited-queue-processing-in-python-2d02555b57dc)
- [Multiprocessing and Pickle, How to Easily fix that?](https://towardsdatascience.com/multiprocessing-and-pickle-how-to-easily-fix-that-6f7e55dee29d)
- [Catch Ctrl+C / SIGINT and exit multiprocesses gracefully in python](https://stackoverflow.com/questions/11312525/catch-ctrlc-sigint-and-exit-multiprocesses-gracefully-in-python)
- [How to Use the Multiprocessing Package in Python](https://towardsdatascience.com/how-to-use-the-multiprocessing-package-in-python3-a1c808415ec2)

### Process和Pool中使用队列

遇到个很有意思的问题，先看代码：
```python
from multiprocessing import Queue, Process
import multiprocessing


def func(q):
    r = q.get()
    print(f"I am {multiprocessing.current_process().name}, result is {r}")


if __name__ == "__main__":
    q = Queue()
    q.put(42)
    p = Process(target=func, args=(q,))
    p.start()
```
执行很顺利，打印如下：
```
I am Process-1, result is 42
```
但是稍稍修改代码，改为使用pool，则抛出错误：
```python
from multiprocessing import Queue, Process, Pool
import multiprocessing


def func(q):
    r = q.get()
    print(f"I am {multiprocessing.current_process().name}, result is {r}")


if __name__ == "__main__":
    q = Queue()
    q.put(42)
    pool = Pool()
    pool.apply(func, args=(q,))
```
抛出错误：
```
RuntimeError: Queue objects should only be shared between processes through inheritance
```
提示Queue对象应该使用继承来共享。

进程之间传参，实际上参数是通过pipe管道从主进程传给子进程，参数需要能`picklability`，可以被`pickle`序列化。而`multiprocessing.Queue`对象是不能被序列化的，因为子进程相当于一个不同的应用程序，传递的参数就像复制了一份，而不是原来的那个，考虑如下代码：
```python
def func(a):
    a.append(4)
    print(f"I am {multiprocessing.current_process().name}, a is {a}")


if __name__ == "__main__":
    a = [1, 2, 3]
    p = Process(target=func, args=(a,))
    p.start()
    p.join()
    print(f"I am {multiprocessing.current_process().name}, a is {a}")
```
输出为：
```
I am Process-1, a is [1, 2, 3, 4]
I am MainProcess, a is [1, 2, 3]
```
说明a在主进程和子进程中是不同的对象，修改了子进程的a，对主进程的a无影响，这和普通的函数调用不同，普通的函数传参传递的总是参数的引用，函数内外引用的是同一个对象。

这就是为什么`pool.apply`会报错的原因，因为不管是`multiprocessing.Queue`还是`queue.Queue`，都被设置为不能序列化，因为不同进程中的队列肯定需要是同一个队列，否则没有意义。

`multiprocessing.Queue`是针对`multiprocess.Process`的，在内部并不是直接将队列进行序列化然后通过管道发送，肯定经过了额外的处理。目的是为了和`threading.Thread`的API保持一致。

`pool`的机制是，当调用`pool=Pool(n)`时，则启动了n个（如果不指定，则默认为当前cpu核数）个子进程。同时在内部有一个队列，当调用map,apply或者apply_async时，会将要执行的任务塞进队列，另外内部还启动了一个线程，用来将参数序列化传递给子进程。内部的这个线程只是简单的将参数进行序列化，因此当传入`queue`对象时，由于`queue`不能被序列化，会抛出错误。

有几种方法可以将队列传递给`pool`进程池，一个是使用`manager`管理器，代码如下：
```python
from multiprocessing import Pool, Manager
import multiprocessing


def func(q):
    r = q.get()
    print(f"I am {multiprocessing.current_process().name}, result is {r}")


if __name__ == "__main__":
    m = Manager()
    q = m.Queue()
    q.put(42)
    pool = Pool()
    pool.apply(func, args=(q,))
```
输出为：
```
I am SpawnPoolWorker-2, result is 42
```
为什么使用`m.Queue()`创建的`q`队列可以传递呢，因为这个q是一个代理对象，本质上它是一个地址，指向由管理器管理的共享队列，它是可以被序列化的，它保证序列化传递到子进程以后仍然表现得像同一个对象（这里用像是因为传递到子进程的队列和主进程的队列不是内存中的同一个对象，通过打印id可以看出来）。`manager`创造的队列，对于Pool和Process都适用。

还有一种变通的方法，如下：
```python
from multiprocessing import Pool, Queue
import multiprocessing

queue = Queue()  # 子进程中会重新运行一遍，在子进程中创建一个新的queue队列


def initialize_shared(q):  # 在子进程创建初始会运行这个函数，从主进程中传递q。这个q是主进程的queue队列
    global queue  # 申明queue是全局变量，否则只会创建一个initializer_shared函数内部的局部queue变量
    queue = q # 将主进程传递进来的队列q赋值给子进程内部的全局变量queue


def func():
    r = queue.get()
    print(f"I am {multiprocessing.current_process().name}, result is {r}")


if __name__ == "__main__":
    queue.put(42)
    pool = Pool(initializer=initialize_shared, initargs=(queue,))
    pool.apply(func)
```
initializer会在每个进程创建的时候运行，initargs是传递给initializer的参数，原理见注释代码。这里仍然疑惑，不知道主进程的queue对象是如何在子进程初始化的时候传递过去的。

参考：
- [Python multiprocessing.Queue vs multiprocessing.manager().Queue()](https://stackoverflow.com/questions/43439194/python-multiprocessing-queue-vs-multiprocessing-manager-queue)
- [Why is “pickle” and “multiprocessing picklability” so different in Python?](https://stackoverflow.com/questions/56912846/why-is-pickle-and-multiprocessing-picklability-so-different-in-python)

### 获取返回值

#### 使用multiprocessing.Queue

#### 使用multiprocessing.Pool

#### 使用concurrent.futures.ProcessPoolExecutor

### 为什么multiprocessing需要放在`__main__`里面

- [官方文档](https://docs.python.org/zh-cn/3.9/library/multiprocessing.html#multiprocessing-programming)

这个问题比较底层，还没有完全弄清楚，大致的原因是因为Windows只能通过spawn方式生成子进程，所以没有办法创建与现有进程相同的新进程。所以子进程必须再次运行代码，比如：
```python
from multiprocessing import Process

def f(name):
    print('hello', name)

p = Process(target=f, args=('bob',))
p.start()
p.join()
```
换句话说，子进程运行的是函数f，但是现在要获取f，需要整个脚本以模块方式运行一次，如果不使用`__main__`，那么此时子进程里又会运行`p=Process...`，导致形成无限循环。如果使用`__main__`，如下：
```python
from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()
```
此时由于是模块方式调用，子进程运行脚本时，不会运行`__main__`之后的代码，从而避免了无限循环。

### 为什么multiprocessing也有一个queque，而不直接使用Queue模块

一个`Queue.queue`不会在进程之间共享。如果`Queue.queue`在进程之前声明，然后每个进程都会收到它的副本，这个副本独立于其他进程，如果`Queue.queue`在进程之后声明，则队列中的项只在主进程中可见，子进程是看不见的。虽然API看起来很相似(它就是这样设计的)，但底层机制却有本质上的不同。
- `multiprocessing`通过pickle(序列化)对象并通过管道，也就是`PIPE`来交换数据。
- `Queue.queue`则使用`str`，`int`等数据结构在线程和锁/互斥体之间共享，以实现正确的行为。

### multiprocessing.queue的close、join_thread以及Process的terminate和join方法

- 进程之间实际上是通过`PIPE`来交换数据的，`multiprocessing.queue`实际上是在主进程中创建了一个后台的线程，获取了数据以后将数据送往缓冲区，然后通过管道传送给子进程。`queue.close()`表示不再接收数据，但是队列里的缓冲数据通过管道传送给子进程还需要时间，`queue.join_thread()`在运行了`queue.close()`之后调用，它会阻塞主进程，等待后台线程把缓冲区的数据全部传送完毕，否则主进程直接结束，有可能导致数据丢失。因此`queue.close`和`queue.join_thread()`用来保证队列正确关闭和数据的完整性。
- 一般使用“毒药”方式来关闭子进程，但是在某些情况下子进程无法关闭，则可以手动调用`process.terminate()`来关闭，同样的，在调用`terminate`以后，调用`process.join()`阻塞主进程，保证子进程顺利关闭。

## 协程

- [官方入门文档](https://docs.python.org/zh-cn/3.8/library/asyncio-task.html#coroutine)  
- [Python Async/Await入门指南](https://zhuanlan.zhihu.com/p/27258289)  
- [一份详细的asyncio入门教程](https://zhuanlan.zhihu.com/p/59671241)
- [How to cancel all remaining tasks in gather if one fails?](https://stackoverflow.com/questions/59073556/how-to-cancel-all-remaining-tasks-in-gather-if-one-fails)
- [Async and await with subprocesses](https://fredrikaverpil.github.io/2017/06/20/async-and-await-with-subprocesses/)
- [Running Python code in a subprocess with a time limit](https://til.simonwillison.net/python/subprocess-time-limit)

协程比较难理解的，《流畅的python》第16章对协程的底层原理讲解的比较透彻。《一份详细的asyncio入门教程》对如何使用协程有较为全面的介绍，这里记载一些遇到的问题以及不同的场景如何使用协程。

### 3种可等待对象

- 协程：注意，协程本身是个函数，调用它以后才返回一个协程。协程是最基础的可等待对象，可以在其它的协程中被等待。无法排期，所谓无法排期，个人理解就是不能加入后台的优先级队列。
- 任务：对协程进行了包装，是期程的子类，可以排期，并发执行，主要就是执行任务，一旦`create_task`创建一个任务，主程序被挂起，立即执行这个任务，执行到`await`语句，返回的结果加入到后台的优先级队列，任务被挂起，然后返回主程序，一直到`await 任务`的语句，返回之前存入队列的结果，然后从任务的中断处继续执行任务。
- 期程：一种特殊的低层级可等待对象，表示一个异步操作的最终结果。一般不使用，只有库的作者需要编写`Future`。

### 执行协程的两种方式

1. 使用`asyncio.run()`+`asyncio.create_task`的方式，`asyncio.run()`是协程的高级api，它隐藏了底层的事件循环代码。要注意的是，它总是创建一个新的事件循环并且在结束的时候关闭它，如果在同一线程中已经有事件循环，则会报错。因此，如果在jupyter里面，使用`asyncio.run()`，总是会报错，因为jupyter是异步的，它本身运行着一个事件循环。基本的使用方法是先创建一个入口，在入口里面通过`create_task`创建任务：

```python
import asyncio
import time


async def c1():
    print(f"c1 started at {time.strftime('%X')}")
    await asyncio.sleep(2)
    print(f"c1 ended at {time.strftime('%X')}")
    return 42


async def c2():
    print(f"c2 started at {time.strftime('%X')}")
    await asyncio.sleep(3)
    print(f"c2 ended at {time.strftime('%X')}")
    return 24


async def main():
    t1 = asyncio.create_task(c1())
    t2 = asyncio.create_task(c2())
    r1 = await t1
    r2 = await t2
    return r1, r2


print(asyncio.run(main()))
```
最后输出结果为：
```
c1 started at 00:35:23
c2 started at 00:35:23
c1 ended at 00:35:25
c2 ended at 00:35:26
(42, 24)
```

2. 显示的创建事件循环，一般是`loop=asynic.get_event_loop()`+`loop.run_untill_completed()`的方式，在`jupyter`下，尝试下面的代码，注意，不能把`t.result()`放在同一个`cell`，因为`jupyter`的机制和默认不太一样，不会`create_task`的时候就立即执行任务，而是等`cell`里的代码执行完毕以后立即去执行任务。所以这里需要等待3秒，看到`c ended`出现以后，再调用`t.result()`才能获得结果，否则会抛出`InvalidStateError`错误。

In [9]:
import asyncio
import time


async def c():
    print(f"c started at {time.strftime('%X')}")
    await asyncio.sleep(3)
    print(f"c ended at {time.strftime('%X')}")
    return 42


loop = asyncio.get_running_loop()
t = loop.create_task(c())

c started at 00:48:50
c ended at 00:48:53


In [10]:
t.result()

42

### 高级API

#### task

`create_task`运行的时间非常微妙，先看代码：
```python
async def func1():
    print("func1 start!", time.strftime("%X"))
    await asyncio.sleep(1)
    print("func1 end!", time.strftime("%X"))


async def func2():
    print("func2 start!", time.strftime("%X"))
    await asyncio.sleep(2)
    print("func2 end!", time.strftime("%X"))


async def main():
    task1 = asyncio.create_task(func1())  # 任务被放入事件循环，但未运行
    task2 = asyncio.create_task(func2())  # 任务被放入事件循环，但未运行
    print("main start!")
    await task1  # 主程序挂起，开始事件循环，执行任务
    await task2
    print("main end!")


asyncio.run(main())
```
输出为：
```
main start!
func1 start! 09:07:25
func2 start! 09:07:25
func1 end! 09:07:26
func2 end! 09:07:27
main end!
```
可见，协程并非是在`create_task`的时候就开始运行。task运行的时间非常微妙，`create_task`创建了一个任务，并将其放入后台的事件循环队列中，然后马上回到主程序，但是此时并没有运行，直到遇到紧接着的`await`，即主程序被挂起，开始执行事件循环里的事件，此时任务开始运行。

把`main`改一改，可以看得更清晰。
```python
async def main():
    task1 = asyncio.create_task(func1())
    task2 = asyncio.create_task(func2())
    await asyncio.sleep(.1)
    print("main start!")
    print("main end!")
```
输出为：
```
func1 start! 09:29:57
func2 start! 09:29:57
main start!
main end!
```
可见，即使没有`await task1`，任务也会运行，因为任务前面已经被加入到事件循环队列，当遇到`await asyncio.sleep`时，主程序挂起，运行事件循环里的任务，遇到任务中的`await`时，又回到主程序。此时没有`await task1`这样的语句来获取任务结果，因此直接就结束了。

#### gather

gather可以将协程包装成任务加入事件循环队列，也就是使用gather，可以直接传入协程而不需要先创建任务，gather的异常处理有点微妙：
```python
async def func1():
    print("func1 start!", time.strftime("%X"))
    raise RuntimeError("func1 failed!")
    await asyncio.sleep(1)
    print("func1 end!", time.strftime("%X"))
    return "func1"


async def func2():
    print("func2 start!", time.strftime("%X"))
    await asyncio.sleep(2)
    print("func2 end!", time.strftime("%X"))
    return "func2"


async def main():
    print("main start!", time.strftime("%X"))
    try:
        results = await asyncio.gather(func1(), func2())  # 异常抛出
        print("results is", results)
    except RuntimeError as e:
        print(e)
        await asyncio.sleep(3)  # 如果继续等待，则可以看到func2仍然运行了
    print("main end!", time.strftime("%X"))


asyncio.run(main())
```
输出为：
```
main start! 10:01:01
func1 start! 10:01:01
func2 start! 10:01:01
func1 failed!
func2 end! 10:01:03
main end! 10:01:04
```
默认情况下，如果任务队列里的任务抛出错误，会向上传播，要注意的是，队列里的其它任务不会取消还是会继续运行，如果设置`return_exception=True`，则异常不会抛出，会和结果一起返回。如：
```python
async def main():
    print("main start!", time.strftime("%X"))
    results = await asyncio.gather(func1(), func2(), return_exceptions=True)
    print("results is", results)
    print("main end!", time.strftime("%X"))
```
此时返回的结果是：
```python
main start! 10:08:52
func1 start! 10:08:52
func2 start! 10:08:52
func2 end! 10:08:54
results is [RuntimeError('func1 failed!'), 'func2']
main end! 10:08:54
```
gather返回一个future，可以在这个future上调用cancel，但是它的一些细节还是没有弄清楚，以下记录了stack overflow的一些问题：
- [How to cancel all remaining tasks in gather if one fails?](https://stackoverflow.com/questions/59073556/how-to-cancel-all-remaining-tasks-in-gather-if-one-fails)

#### wait

`wait`和`gather`有点类似，都是获取任务结果的方法，不过`wait`比`gather`更灵活：
```python
async def main():
    print("main start!", time.strftime("%X"))
    done, pending = await asyncio.wait([func1(), func2()], timeout=2)
    print(done.pop().result())
    for p in pending:
        p.cancel()
    print("main end!", time.strftime("%X"))
```
输出为：
```
main start! 14:59:21
func2 start! 14:59:21
func1 start! 14:59:21
func1 end! 14:59:22
func1
main end! 14:59:23
```
`wait`返回done和pending两个集合，分别包含已经完成的任务和被挂起的任务，可以通过`timeout`关键字设定等待时间，也可以通过`return_when`关键字设定返回的条件，比如`asyncio.FIRST_COMPLETED`，表示第一个就返回。

#### as_completed

也是用来获取结果，感觉和`gather`差不多，不过返回一个生成器，然后迭代生成器来获取结果，注意，其返回的结果是`Future`，并且必须调用`await`来等待`Future`的结果，否则会抛出错误：
```python
async def main():
    print("main start!", time.strftime("%X"))
    tasks = asyncio.as_completed([func1(), func2()])
    for task in tasks:
        print(await task)
    print("main end!", time.strftime("%X"))
```

#### shield

shield不太好理解，它并不是真正的禁止任务被取消，可以把它理解为任务的一个状态，当使用`shield`包装一个协程，则会把这个协程包装为一个Future并且立即将其放入事件循环队列，当调用`Future.cancel()`方法的时候，`Future`会被标记成“取消”的状态，当`await`这个`Future`的时候，会立即抛出`CancelledError`的错误，但是实际上，这个Future依然在队列中，并未受影响：
```python
async def main():
    print("main start!", time.strftime("%X"))
    task1 = asyncio.create_task(func1())
    task1_with_shield = asyncio.shield(task1) 
    try:
        task1_with_shield.cancel()
        await task1_with_shield  # 只要await task1_with_shield就会抛出CancelledError错误
    except asyncio.CancelledError:
        print("stask1 canceled!")
        await asyncio.sleep(2)  # 实际的task1仍然会运行，不受影响
    print("main end!", time.strftime("%X"))
```
输出为：
```
main start! 14:18:21
stask1 canceled!
func1 start! 14:18:21
func1 end! 14:18:22
main end! 14:18:23
```
因此，可以在捕获了错误以后，仍然获取任务的值：
```python
async def main():
    print("main start!", time.strftime("%X"))
    task1 = asyncio.create_task(func1())
    task1_with_shield = asyncio.shield(task1)
    try:
        task1_with_shield.cancel()
        await task1_with_shield
    except asyncio.CancelledError:
        print("stask1 canceled!")
        r = await task1  # task1仍然在队列中运行，可以获取task1的返回值
        print(r)
    print("main end!", time.strftime("%X"))
```
输出为：
```python
main start! 14:30:42
stask1 canceled!
func1 start! 14:30:42
func1 end! 14:30:43
func1
main end! 14:30:43
```
要注意的是，`shield`如果包装协程的话，不能像上面这样获取协程的值，比如：
```python
async def main():
    print("main start!", time.strftime("%X"))
    task1_with_shield = asyncio.shield(func1())
    try:
        task1_with_shield.cancel()
        await task1_with_shield
    except asyncio.CancelledError:
        print("stask1 canceled!")
        r = await func1()  # 这里的func1()创建了一个新的协程，和task1_with_shield并非队列里的同一个任务
        print(r)
    print("main end!", time.strftime("%X"))
```
输出为：
```
main start! 14:33:15
stask1 canceled!
func1 start! 14:33:15
func1 start! 14:33:15
func1 end! 14:33:16
func1
main end! 14:33:16
func1 end! 14:33:16
```
可见，func1执行了两次，这并不是想要的结果。

#### asyncio.run

`asyncio.run`基本上可以近似为以下的代码：
```python
import asyncio, sys, types

def run(coro):
    if sys.version_info >= (3, 7):
        return asyncio.run(coro)

    # Emulate asyncio.run() on older versions

    # asyncio.run() requires a coroutine, so require it here as well
    if not isinstance(coro, types.CoroutineType):
        raise TypeError("run() requires a coroutine object")

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(coro)
    finally:
        loop.close()
        asyncio.set_event_loop(None)
```
因此，在使用它的时候，和直接通过loop调用有微妙的区别，来源于以下的代码，在屏幕上运行两个协程，一个会动态的显示`thinking!`，`thinking!`字符前面会有线条转动的动画，一个在2秒钟以后打印结果：
```python
import asyncio
import itertools
import sys


async def spin(msg):
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle("|/-\\"):
        status = char + ' ' + msg
        write(status)
        flush()
        write('\x08' * len(status))
        try:
            await asyncio.sleep(.1)  # 调用spin.cancel的话，异常会在这里抛出
        except asyncio.CancelledError:
            break
    write(' ' * len(status) + '\x08' * len(status))


async def slow_function():
    await asyncio.sleep(2)
    return 42


async def supervisor():
    spinner = asyncio.create_task(spin('thinking!'))
    result = await slow_function()
    spinner.cancel()
    return result


def main():
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(supervisor())
    loop.close()
    print(result)


main() 
```
最终输出为42，符合预期，而如果稍微修改一下main函数，使用`asyncio.run`运行，如下：
```python
async def main():
    result = await supervisor()
    print(result)

asyncio.run(main())
```
此时，最终输出为`42thinking!`，问题出在哪里呢？因为两种方式`loop.close()`执行的时间点不同，第一种是先调用`loop.close`，再打印result，而第二种是先打印result，然后再调用`loop.close`。在`supervisor`中，在`spinner.cancel`处，并不会挂起，而是将spinner的状态设置为`cancel`，并返回`result`。当调用`loop.close`时，此时`spin`还在事件循环队列中，处于挂起状态，所以会继续执行`spin`。
1. 第一种情况，相当于先`loop.close`，结束`spin`协程以后，再`print(result)`，相当于会先执行`write(' ' * len(status) + '\x08' * len(status))`，将屏幕上的`thinking!`字符消除，最后打印结果。
2. 第二种情况，相当于先打印result，然后再`loop.close`，执行`write(' ' * len(status) + '\x08' * len(status))`，而由于`print(result)`导致了换行，所以最终结果为`42thinking!`，要解决的话也很简单，可以将`print(result)`改为`print(result, end='')`就行了。

### 低级API

#### Future

Future的概念并不是很好理解，它是一个低层级的API，做个类比，可以把它想象成一个放礼物的盒子，现在我们要送礼，但是事先并不知道送什么，我们可以先对这个盒子做一些装饰，然后在未来的某个时刻，再把礼物放进去，最终的目的是获取这个盒子里的礼物。在代码中，我们可以先创建一个`Future`，可以先为这个future添加回调函数（装饰盒子），再在后期设置future的结果（把礼物放进盒子），最终的目的是获取这个结果（得到礼物），比如：
```python
import asyncio


async def set_future_result(future, delay, result):
    await asyncio.sleep(delay)
    future.set_result(42)


def callback(future):
    print("future is finished! result is", future.result())


async def main():
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    future.add_done_callback(callback)
    loop.create_task(set_future_result(future, 2, 42))
    await future


asyncio.run(main())  # 输出为：future is finished! result is 42
```
注意，需要安排一个任务来设置future的结果，而不能在`add_done_callback()`回调函数里面设置，因为`future.set_result()`会同时将future的状态设置为`done`，而`add_done_callback`只有在future的状态为done时才会触发，因此如果在`add_done_callback()`设置future的结果，`await future`会导致程序挂死。

#### run_in_executor

`run_in_executor`可以新开一个线程或者一个进程来运行同步的函数，返回一个asyncio的future，这样就可以通过协程来并发的运行多个同步函数。比如有两个同步函数，一个是io阻塞型的，一个是计算密集型的，一般情况下，我们只能够同步的运行这两个函数，比如：
```python
def blocking_io():
    print(time.strftime("%X"), "start blocking io")
    time.sleep(3)
    return 42


def cpu_bound():
    print(time.strftime("%X"), "start cpu bound")
    return sum(i * i for i in range(10 ** 7))

blocking_io_result = blocking_io()
cpu_bound_result = cpu_bound()
print(blocking_io_result, cpu_bound_result)
```
但是，现在我们为了提高性能，想要同时运行这两个函数，传统的做法比较麻烦（可以开两个线程，其中一个线程对cpu_bound做封装，在内部使用进程来运行cpu_bound），如果利用asyncio，则十分简单：
```python
import asyncio
import concurrent.futures
import time


def blocking_io():
    print(time.strftime("%X"), "start blocking io")
    time.sleep(2)
    return 42


def cpu_bound():
    print(time.strftime("%X"), "start cpu bound")
    return sum(i * i for i in range(10 ** 7))


async def async_blocking_io():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_bound)
        print(time.strftime("%X"), "blocking io result is", result)


async def async_cpu_bound():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, blocking_io)
        print(time.strftime("%X"), "cpu bound result is", result)


async def main():
    await asyncio.gather(async_cpu_bound(), async_blocking_io())


if __name__ == "__main__":
    asyncio.run(main())
```
输出结果如下：
```
16:53:25 start blocking io
16:53:25 start cpu bound
16:53:26 blocking io result is 333333283333335000000
16:53:27 cpu bound result is 42
```
这里有一点要非常注意，就是创建新的进程会复制整个代码，所以调用的语句`asyncio.run(main())`必须放在`if __name__ == "__main__":`中，否则会抛出错误，而且抛出的错误说明并不直观，具体原因见《python那些事儿》笔记的multiprocessing模块一节。

#### asyncio.Future vs concurrent.futures.Future

`asyncio.Future`虽然和`concurrent.futures.Future`虽然有十分类似的API，但是两者其实是完全不同的。但是`concurrent.futures.Future`可以通过`asyncio.wrap_future`转换为`asyncio.Future`，其实在`run_in_executor`内部就是这么做的，也可以手动进行转换：
```python
import asyncio
import random
from concurrent.futures import ThreadPoolExecutor
from time import sleep


def return_after_5_secs(message):
    sleep(5)
    return message


pool = ThreadPoolExecutor(3)


async def doit():
    identify = random.randint(1, 100)
    future = pool.submit(return_after_5_secs, (f"result: {identify}"))
    awaitable = asyncio.wrap_future(future)
    print(f"waiting result: {identify}")
    return await awaitable


async def app():
    # run some stuff multiple times
    tasks = [doit(), doit()]

    result = await asyncio.gather(*tasks)
    print(result)

print("waiting app")
asyncio.run(app())
```
输出为：
```
waiting app
waiting result: 76
waiting result: 87
['result: 76', 'result: 87']
```