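Process: has its own PID; use multiple processes for CPU-bound work.  
Thread: a process can contain many threads; use multiple threads for I/O-bound work.

An operating system can easily start dozens of threads; starting dozens of processes is far more expensive.

Process switching cost > thread switching cost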
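A quick way to see the difference, as a minimal sketch: every thread in a thread pool reports the main process's PID because threads live inside one process, while each process-pool worker has a PID of its own. The helper name `worker_pids` is hypothetical.

```python
import os
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def worker_pids(executor_cls, n_tasks=4):
    """Run os.getpid in n_tasks tasks and return the set of PIDs that ran them."""
    with executor_cls(max_workers=2) as executor:
        futures = [executor.submit(os.getpid) for _ in range(n_tasks)]
        return {f.result() for f in futures}


if __name__ == '__main__':
    print('main process :', os.getpid())
    print('thread pool  :', worker_pids(ThreadPoolExecutor))   # only the main PID
    print('process pool :', worker_pids(ProcessPoolExecutor))  # other PIDs
```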

In [1]:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

In [5]:
from concurrent.futures import as_completed, wait
import time

In [4]:
# CPU-bound work: naive recursive Fibonacci
def fib(n):
    if n<=2:
        return 1
    else:
        return fib(n-1)+fib(n-2)

In [25]:
with ThreadPoolExecutor(2) as executor:
    st = time.time()
    all_tasks = [executor.submit(fib, num) for num in range(30, 35)]
    for future in as_completed(all_tasks):  # yields each future as soon as it finishes
        print(future.result())

    print('===============================')
    print('thread time:{} s'.format(time.time() - st))

832040
1346269
2178309
3524578
5702887
thread time:2.720202922821045 s


In [24]:
with ProcessPoolExecutor(2) as executor:
    st = time.time()
    all_tasks = [executor.submit(fib, num) for num in range(30, 35)]
    for future in as_completed(all_tasks):  # yields each future as soon as it finishes
        print(future.result())

    print('===============================')
    print('process time:{} s'.format(time.time() - st))

832040
1346269
2178309
3524578
5702887
process time:1.7755279541015625 s


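The process pool wins here because of CPython's GIL: only one thread executes Python bytecode at a time, so two threads cannot run `fib` in parallel, while two processes can use two cores. Switching between processes is still more expensive than switching between threads.

For I/O-bound work, use threads: they are cheaper to create and switch, and a thread releases the GIL while it waits on I/O.

For CPU-bound work, use processes: each has its own interpreter and GIL, so the work can spread across multiple cores.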
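To illustrate the I/O-bound case, a minimal sketch in which `time.sleep` stands in for real I/O (an assumption: real network or disk waits also release the GIL, as sleeping does). The names `fake_io` and `timed` are hypothetical. Five threads waiting 0.2 s each finish in roughly 0.2 s in total, versus roughly 1 s with a single thread.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_io(seconds):
    """Stand-in for an I/O call: the thread sleeps and releases the GIL meanwhile."""
    time.sleep(seconds)
    return seconds


def timed(n_workers, delays):
    """Run fake_io over delays with n_workers threads; return (results, elapsed seconds)."""
    start = time.perf_counter()
    with ThreadPoolExecutor(n_workers) as executor:
        results = list(executor.map(fake_io, delays))  # map keeps input order
    return results, time.perf_counter() - start


if __name__ == '__main__':
    delays = [0.2] * 5
    _, serial = timed(1, delays)    # one thread: the waits happen one after another
    _, parallel = timed(5, delays)  # five threads: the waits overlap
    print(f'1 thread: {serial:.2f} s, 5 threads: {parallel:.2f} s')
```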

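On Windows (and on macOS since Python 3.8, where the default start method is `spawn`), code that creates a ProcessPoolExecutor or multiprocessing processes must be placed under:  
`if __name__ == '__main__':`  
The reason is that spawned children re-import the main module, and without the guard they would try to start processes of their own. ThreadPoolExecutor does not need the guard. With the `fork` start method (the traditional Linux default) the guard is not strictly required, but it is harmless and keeps the code portable.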
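A minimal sketch of the guarded layout as a standalone script (the function names are illustrative). Note that the guard compares the variable `__name__`, not the string `'__name__'`, with `'__main__'`.

```python
from concurrent.futures import ProcessPoolExecutor


def fib(n):
    return 1 if n <= 2 else fib(n - 1) + fib(n - 2)


def main():
    # Workers started with "spawn" re-import this module; the guard below keeps
    # them from calling main() again and starting workers of their own.
    with ProcessPoolExecutor(2) as executor:
        return list(executor.map(fib, range(25, 30)))


if __name__ == '__main__':
    print(main())  # [75025, 121393, 196418, 317811, 514229]
```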


### An interesting example

In [36]:
import os
import time

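pid = os.fork()  # fork a child process (Unix only)
print('hi')

if pid == 0:  # fork() returned 0: we are in the child process
    print('in child process:{}  parent process:{}'.format(os.getpid(), os.getppid()))
    os._exit(0)  # end the child immediately so it skips the code below
else:  # fork() returned the child's PID: we are in the parent (a failed fork raises OSError)
    print('in parent process:{}  child process:{}'.format(os.getpid(), pid))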
    
time.sleep(1)
    

hi
in child process:67399  parent process:66645
hi
in parent process:66645  child process:67399


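The child process gets a copy of the parent's memory and continues from the point of fork, running the code after it (which is why hi is printed twice).  
The child's data is completely isolated from the parent's: a change made in one process is not seen by the other.  
In the child, fork returns 0.  
In the parent, fork returns the PID of the newly created child.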
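The isolation claim can be checked directly. A minimal sketch (Unix only, since it uses `os.fork`): the child adds 100 to a module-level counter and reports its own value through its exit code, while the parent's copy stays at 0. The names `counter` and `fork_and_modify` are hypothetical.

```python
import os

counter = 0  # after fork, parent and child each have their own copy


def fork_and_modify():
    """Fork; the child adds 100 to counter, the parent returns its own value and the child's."""
    global counter
    pid = os.fork()  # Unix only; raises OSError on failure
    if pid == 0:
        # Child: change the child's copy and report it through the exit code.
        counter += 100
        os._exit(counter)  # exit immediately, skipping cleanup handlers
    # Parent: fork() returned the child's PID; wait for the child to finish.
    _, status = os.waitpid(pid, 0)
    return counter, os.WEXITSTATUS(status)


if __name__ == '__main__':
    print(fork_and_modify())  # (0, 100): the parent's counter is untouched
```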

In [37]:
import multiprocessing 

In [44]:
def print_fib(n):
    num = fib(n)
    print('run fib {}:{}'.format(n, num))
    return num
program = multiprocessing.Process(target=print_fib, args=(10,))
program.start()
program.join()
print(program.pid)  # pid attribute: the child's process identifier

run fib 10:55
67610


In [45]:
# Subclass Process and override run(); calling start() then runs run() in the new process
class MyProcess(multiprocessing.Process):
    def run(self):
        print_fib(10)
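`run()` cannot hand a return value back to the parent. A sketch under assumptions: a subclass that takes its argument in `__init__` and writes its result into shared memory created with `multiprocessing.Value`. The class name `FibProcess` and its `result` attribute are hypothetical.

```python
import multiprocessing


def fib(n):
    return 1 if n <= 2 else fib(n - 1) + fib(n - 2)


class FibProcess(multiprocessing.Process):
    """Computes fib(n) in a child process and stores it in a shared 64-bit int."""

    def __init__(self, n):
        super().__init__()  # the base initializer must run before start()
        self.n = n
        self.result = multiprocessing.Value('q', 0)  # shared memory, visible to both processes

    def run(self):  # executed in the child after start()
        self.result.value = fib(self.n)


if __name__ == '__main__':
    p = FibProcess(10)
    p.start()
    p.join()
    print(p.result.value)  # 55
```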
        

In [46]:
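pool = multiprocessing.Pool(multiprocessing.cpu_count())
res = pool.apply_async(print_fib, args=(10,))  # returns an AsyncResult immediately

pool.close()  # no more tasks may be submitted
pool.join()   # wait for the workers to exit; close() must be called first

print(res.get())  # the value returned by print_fib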

run fib 10:55
55
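The cell above closes and joins the pool by hand. A sketch of the same API with `Pool` used as a context manager, which calls `terminate()` on exit, so results are collected inside the block. The names `square` and `run_pool` are hypothetical.

```python
from multiprocessing import Pool


def square(x):
    return x * x


def run_pool():
    with Pool(processes=2) as pool:
        pending = [pool.apply_async(square, (i,)) for i in range(5)]
        async_results = [r.get(timeout=10) for r in pending]  # get() blocks per task
        map_results = pool.map(square, range(5))  # blocks until every task is done
    return async_results, map_results


if __name__ == '__main__':
    print(run_pool())  # ([0, 1, 4, 9, 16], [0, 1, 4, 9, 16])
```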


In [48]:
def do(n):
    time.sleep(n)
    print(n, ' done!')
    return n

pool = multiprocessing.Pool(multiprocessing.cpu_count())
# imap yields results in input order, even though the 0.1 s task finishes first
for res in pool.imap(do, (0.3, 0.5, 0.1)):
    print(res)

0.1  done!
0.3  done!
0.5  done!
0.3
0.5
0.1


In [49]:
pool = multiprocessing.Pool(multiprocessing.cpu_count())
# imap_unordered yields each result as soon as its task finishes
for res in pool.imap_unordered(do, (0.3, 0.5, 0.1)):
    print(res)

0.1  done!
0.3  done!
0.1
0.3
0.5  done!
0.5
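The two runs above differ only in the order results are yielded. A minimal sketch that collects both orders inside `with Pool(...)`, which also shuts the pool down (the cells above leave their pools open). The names `nap` and `compare_orders` are hypothetical, and the exact completion order depends on timing, so only its contents are checked.

```python
import time
from multiprocessing import Pool


def nap(seconds):
    """Sleep for the given time and return it, so completion order follows the delay."""
    time.sleep(seconds)
    return seconds


def compare_orders(delays=(0.3, 0.5, 0.1)):
    # One worker per task so all the sleeps overlap.
    with Pool(processes=len(delays)) as pool:
        in_order = list(pool.imap(nap, delays))                 # input order
        by_completion = list(pool.imap_unordered(nap, delays))  # finishing order
    return in_order, by_completion


if __name__ == '__main__':
    print(compare_orders())
```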


### Inter-process communication

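Shared global variables cannot be used to communicate (each process works on its own copy).  
queue.Queue cannot be used between processes (it only works between threads of one process).  
multiprocessing.Queue works between processes, but cannot be passed to Pool workers as an argument; use Manager().Queue() there instead.  
multiprocessing.Pipe connects only two endpoints (two processes), but is faster than multiprocessing.Queue.  
multiprocessing.Manager supports many data types (dict, list, Queue, Lock, ...) shared among many processes.  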
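For the Pool case, a minimal sketch using a manager queue: the queue object is a proxy, so it can be pickled and sent to Pool workers as a task argument. The names `produce` and `collect_with_pool` are hypothetical.

```python
from multiprocessing import Manager, Pool


def produce(q, item):
    q.put(item)  # the proxy forwards this call to the manager process
    return item


def collect_with_pool(items):
    with Manager() as manager:
        q = manager.Queue()  # proxy object: safe to pass to Pool workers
        with Pool(processes=2) as pool:
            pool.starmap(produce, [(q, item) for item in items])
        # Every put() has completed once starmap returns, so len(items) gets will succeed.
        return [q.get() for _ in items]


if __name__ == '__main__':
    print(sorted(collect_with_pool(['a', 'b', 'c'])))  # ['a', 'b', 'c']
```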

In [50]:
from multiprocessing import Queue, Pipe, Manager, Pool, Process

##### multiprocessing.Queue

In [54]:
def producer(q):
    q.put('a')
    print('producer put: a')
    
def consumer(q):
    time.sleep(0.2)
    a = q.get()
    print('consumer get:', a)

q = Queue(3)
p0 = Process(target=producer, args=(q,))
p1 = Process(target=consumer, args=(q,))

p0.start()
p1.start()
p0.join()
p1.join()

producer put: a
consumer get: a
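With more than one item, the consumer needs to know when to stop. A common pattern, shown as a sketch under assumptions: the producer puts a sentinel value after its data (`None` here, so real items must never be `None`), and the parent drains the result queue before joining, since joining a process that still has queued data can deadlock. All names are hypothetical.

```python
from multiprocessing import Process, Queue

SENTINEL = None  # hypothetical "no more data" marker for this sketch


def producer(q, items):
    for item in items:
        q.put(item)  # blocks while the bounded queue is full
    q.put(SENTINEL)  # tell the consumer to stop


def consumer(q, out):
    while True:
        item = q.get()  # blocks until something is available
        if item is SENTINEL:
            break
        out.put(item.upper())


def run_pipeline(items):
    q, out = Queue(3), Queue()
    procs = [Process(target=producer, args=(q, items)),
             Process(target=consumer, args=(q, out))]
    for p in procs:
        p.start()
    results = [out.get() for _ in items]  # drain before join
    for p in procs:
        p.join()
    return results


if __name__ == '__main__':
    print(run_pipeline(['a', 'b', 'c']))  # ['A', 'B', 'C']
```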


##### multiprocessing.Manager.Queue

In [55]:
def producer(q):
    q.put('a')
    print('producer put: a')
    
def consumer(q):
    time.sleep(0.2)
    a = q.get()
    print('consumer get:', a)
    
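manager = Manager()  # starts a server process that owns the shared objects
q = manager.Queue(3)  # proxy to a queue living in the manager process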
p0 = Process(target=producer, args=(q,))
p1 = Process(target=consumer, args=(q,))

p0.start()
p1.start()
p0.join()
p1.join()

producer put: a
consumer get: a


##### multiprocessing.Pipe()

In [57]:
def producer(pipe):
    time.sleep(0.5)
    pipe.send('a')
    print('producer send: a')
    
def consumer(pipe):
    print('consumer get:', pipe.recv())  # recv() blocks until a message arrives
    
recv_pipe, send_pipe = Pipe(duplex=False)  # one-way: the first end only receives, the second only sends
p0 = Process(target=producer, args=(send_pipe,))
p1 = Process(target=consumer, args=(recv_pipe,))

p0.start()
p1.start()
p0.join()
p1.join()

consumer get: a
producer send: a
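`Pipe()` returns two connection objects and is duplex by default, so each end can both send and receive. A minimal request/reply sketch; the names `echo_upper` and `round_trip` are hypothetical.

```python
from multiprocessing import Pipe, Process


def echo_upper(conn):
    # Child end of a duplex pipe: receive one message, send a reply, close.
    msg = conn.recv()
    conn.send(msg.upper())
    conn.close()


def round_trip(message):
    parent_conn, child_conn = Pipe()  # duplex=True by default
    p = Process(target=echo_upper, args=(child_conn,))
    p.start()
    parent_conn.send(message)
    reply = parent_conn.recv()  # blocks until the child replies
    p.join()
    return reply


if __name__ == '__main__':
    print(round_trip('ping'))  # PING
```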


##### multiprocessing.Manager.dict

In [60]:
def puta(d, k, v):
    d[k] = v

def putb(d, k, v):
    d[k] = v

manager = Manager()
d = manager.dict()  # proxy to a dict living in the manager process
p0 = Process(target=puta, args=(d,'aa', 12))
p1 = Process(target=putb, args=(d, 'bb', 13))

p0.start()
p1.start()
p0.join()
p1.join()
print(d)

{'aa': 12, 'bb': 13}
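Each write above touches a different key. When several processes update the same key, `d[key] += 1` is a read followed by a write (two separate proxy calls), so updates can be lost without a lock. A sketch using a manager lock; `add_many` and `count_with_manager` are hypothetical names.

```python
from multiprocessing import Manager, Process


def add_many(d, lock, key, times):
    for _ in range(times):
        with lock:  # make the read-modify-write on the shared dict atomic
            d[key] += 1


def count_with_manager(n_procs=4, times=200):
    with Manager() as manager:
        d = manager.dict(total=0)
        lock = manager.Lock()
        procs = [Process(target=add_many, args=(d, lock, 'total', times))
                 for _ in range(n_procs)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        return d['total']


if __name__ == '__main__':
    print(count_with_manager())  # 800
```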
