# Python并行

*杜远超*

本文主要包含以下部分：
1. 如何使用subprocess进行子进程管理
2. 使用线程进行阻塞I/O，避免数据冲突
3. 使用lock避免线程中的数据竞争
4. 使用Queue来进行线程之间的合作
5. 使用concurrent.futures进行真正的并行

In [9]:
# 使用命令行进行并行

import subprocess

proc = subprocess.Popen(
    ['echo', 'Hello from the child!'],
    stdout=subprocess.PIPE)

out, err = proc.communicate()
print(out.decode('utf-8'))




Hello from the child!



## 介绍subprocess的基本内容

详情参见 Python doc Subprocess -- [Subprocess managemetn](https://docs.python.org/3.6/library/subprocess.html)

subprocess用于管理新的进程，与进程的input/output/error pipes通信，并且管理返回的代码。

subprocess用于替换旧modules
> os.system

> os.spawn

推荐通过`run()`来对subprocess进行基础调用，使用`Popen`对其进行高级调用
*注：Python 3.5加入了 run()函数*

### run()
```
subprocess.run(args, *, stdin=None, input=None, stdout=None, stderr=None, shell=False, 
cwd=None, timeout=None, check=False, encoding=None, errors=None, env=None)
```

运行args中定义的命令行，等命令行完成后，返回一个CompletedProcess实例

### Popen
Popen命令提供更大的灵活性。

```
subprocess.popen(args, bugsize=-1, excutable=None, stdin=None, stdout=None, 
stderr=None, preexec_fn=None, close_fds=True, shell=False, cwd=None, env=None, 
universal_newlines=False, startupinfo=None, cerationflags=0, restore_signals=True, 
start_new_session=False, pass_fds=(), *, encoding=None, errors=None)
```

#### Popen Objects

- Popen.poll()查看子进程是否结束，设置并返回`returncode`，否则，返回`None`
- Popen.wait(timeout=None) 等待子程序返回，设置并返回`returncode`。（当timeout结束时，子程序没有结束，raise a TimeoutExpired exception）
- Popen.communicate(input=None, timeout=None)：与子程序交互，发送数据到stdin，从stdout和stderr读取数据直到EOF。communicate()返回一个元组(stdout_data, stderr_data)
- Popen.send_signal(signal) 向子程序发送数据
- Popen.terminate()停止子程序
- Popen.kill()杀死子程序
- Popen.args
- Popen.stdin
- Popen.stdout
- Popen.stderr
- Popen.pid
- Popen.returncode





In [10]:
# 示例：如何使用subprocess开启多个子程序
# 注：此种情况所有subprocess是并行进行的

import subprocess
import time

# 模拟一个需要一段时间运行的函数
def run_sleep(period):
    proc = subprocess.Popen(['sleep', str(period)])
    return proc

procs = []
results = []
tic = time.time()

for _ in range(10):
    proc = run_sleep(0.5)
    procs.append(proc)

for proc in procs:
#     out = proc.communicate()
    out = proc.wait()
    results.append(out)

toc = time.time()
time_consume = toc - tic
print(results)
print('Done', time_consume)

[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
Done 0.5282430648803711


In [1]:
# 并行示例，mac电脑一共8核

import subprocess
import os
import time



def run_openssl(data):
    env = os.environ.copy()
    env['password'] = b'asdf'
    proc = subprocess.Popen(
        ['openssl', 'enc', '-des3', '-pass', 'env:password'],
        env=env,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE)
    proc.stdin.write(data)
    proc.stdin.flush()
    return proc
    
procs = []
tic = time.time()
for _ in range(126):
    data = os.urandom(100)
    proc = run_openssl(data)
    procs.append(proc)

# results = []
# for _ in range(126):
#     rand_data = os.urandom(100)
#     result = run_openssl(rand_data)
#     results.append(result)

    
for proc in procs:
    out, _ = proc.communicate()
    print(out)
toc = time.time()
print('Time suming {} s'.format(toc-tic))


b'Salted__I\xb0\xf6%\xf1\xa8\x8b7\x19\xf8\xdcYlq\xb5\xf9\x8a)\x01g\xb8\xfd\x1ahU\xcdy\xf4\n|d\x85\x11\xba\xc8\xdd\x1a\x8e\xef\xf8\x83\x7f\xac\xaex\xbav\xb6v\xe9\xcb>|n)\x8e\xaa\xf2\xc25\xcc<\x83\xd7J\xd0\xf6\xdc-aW;\xe8H\xdc.\xa7\x01\x9a\xce\xca\xcf\xb3t\x03\xd4\xcb\x11\x80\xac"\x9b\xa5\x7f\x84+\xc4\xb4H\xea%\xc8$\xfd\x92\x8eoU\x9e\xf8\xf7d'
b'Salted__\x02\xf6^\x9dM\xd3\r\xa0h\xc0\x9a9\x91dRz\x9f\xbc\x8aq\xdb`\x1e#\xa0\xae\xd0\xdd\x9fa\xc5\x07L+\xa4r\x9fR\xeav\x1c\xa4cz\t\xcb\x90\xbay2b\x83j\xb4\xd8AF%\xaa\x17k\xed9k:\x80=c\xce\xb9\xef#\x1a\xcca\x90\x13N\ncCX\xae\xb5\\\xbd\x84[GZ\x82T\x1a\xb4P\\\xb6\x9b \xca\xf9\xcchEY\x9c\x8f\xc6\x8e\xc0\xcd\xde'
b'Salted__*\xd0\xd8U\x1e\xa9\xe0\xa18$\x0f\xc2y\xff\xa0jl:a\xb06\x87\xaa\nLl\x89F\xfa3\x8a\xc3MO+em(\xd8\\\xe1Q:@F\xd6\xff\xe1a=,\xca\x9bu3\xd3\xfe\xa2@\xad;\x88h\xf3\x88\x1e\xa2\xbb\xb2\xaaX\x8f}\xce\x9d\x8e\x14\xad\xb9\xfc\x86\x81\x99\x03\x02Tb[\xb0t\xa7\x99\xe2H6i\xc4\xcb\x80F\x163\x16:\x9d*e\xd1\x1bd\xf44'
b'Salted__\x0c\xab\x18X\x8bw\x18

In [11]:
# subprocess基础用法示例

import subprocess
import time

# run() 示例
subprocess.run(["ls", "-l"])


# Popen 示例
proc = subprocess.Popen(['sleep', '0.3'])

while proc.poll() is None:
    print('Working...')
    # Time consuming work goes here
    time.sleep(0.2)
    
print('Exit status', proc.poll())
    
    
# 等待延迟示例
proc_timeout = subprocess.Popen(['sleep','10'])
try:
    proc_timeout.communicate(timeout=0.1)
except subprocess.TimeoutExpired:
    proc_timeout.terminate()
    proc_timeout.wait()

print('Exit when timeout', proc_timeout.poll())


Working...
Working...
Exit status 0
Exit when timeout -15


In [51]:
# 串行subprocess示例

import subprocess
import os
import time



def run_openssl(data):
    env = os.environ.copy()
    env['password'] = b'asdf'
    proc = subprocess.Popen(
        ['openssl', 'enc', '-des3', '-pass', 'env:password'],
        env=env,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE)
    proc.stdin.write(data)
    proc.stdin.flush()
    return proc

def run_md5(input_stdin):
    proc = subprocess.Popen(
        ['md5'],
        stdin=input_stdin,
        stdout=subprocess.PIPE)
    return proc

tic = time.time()

input_proces = []
for _ in range(5):
    data = os.urandom(100)
    proc = run_openssl(data)
    input_proces.append(proc)

hash_procs = []

for proc in input_proces:
    hash_proc = run_md5(proc.stdout)
    hash_procs.append(hash_proc)
    
    
for proc in input_proces:
    out, _ = proc.communicate()
    print('a')


for proc in hash_procs:
    out, _ = proc.communicate()
    print(out)
    
    
    
toc = time.time()
print('Time suming {} s'.format(toc-tic))

a
a
a
a
a
b'151c4ea2d83eef2d79c9486262703853\n'
b'1728326f3552b41d1b89a219cc575887\n'
b'2148a0dfa7324bb187d2d01b044f2881\n'
b'cd3f198ab4e743d61717978bb9901ea8\n'
b'9054cb92edf2ae09f8e48365f882a928\n'
Time suming 0.04123497009277344 s


## Use threads for blocking I/O, avoid for parallelism


## python threading --Thread-based parallelism

threading是对`_thread`模块的高级封装，

`threading`模块有以下函数：

- threading.active_count()：返回当前存在的Thread类的数量，返回的结果和`enumerate()`返回的list长度相同
- threading.current_thread()：返回当前的Thread类。
- threading.get_ident()：返回当前thread的‘thread identifier’。
- threading.enumerate()：返回当前存活的所有Thread类。
- threading.main_thread()：返回main Thread类，通常来说，main thread是创建Python解释器的thread
- threading.settrace(*func*)：为从threading中启动的所有thread建立一个trace函数。
- threading.setprofile(*func*)为从threading中启动的所有thread建立一个profile函数。
- threading.stack_size([*size*]) 当创建新thread时，返回thread stack的大小
- threading.TIMEOUT_MAX： 常数，阻塞函数blocking functions(Lock.acquire(), RLock.acquire(), Condition.wait())的最大允许等待值

### Thread-Local Data

Thread-local data是针对特定thread的值。
```
mydata = thraeding.local()
mydata.x = 1
```

### Thread Objects

```
threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, deamon=None)
```

- start() : 将thread激活
- run() ： 代表thread的激活状态的方法
- join(timeout=None)：等待thread终止。This blocks the calling thread until the thread whose join() method is called terminates - either normally or through an unhandled exception - or until the optional timeout occures.
- name: 字符串，用于识别
- getName()
- setName()
- ident：‘thread identifier’ of this thread
- is_alive(): 返回thread是否处于激活态
- deamon : 返回一个布尔值，代表thread是否后台运行
- isDaemon()
- setDeamon()

### Lock Objects

原始的Lock主要有两个状态“locked”以及“unlocked”。最初创建时候为unlocked状态.有两个基础的方法，`acquire()`与`release()`。
- 当状态为'unlockd'时候，`acquire()`将状态改为'locked'并且立即返回；
- 当状态为'locked'时候，`acquire()`将其锁定，直到再另一线程中调用`release()`将其改为'unlocked'，而后`acquire()`将其再改为locked并且返回
> When the state is locked, acquire() blocks until a call to release() in another thread changes it to unlocked, then the acquire() call resets it to locked and returns. 
- `release()`方法只能在'locked'状态下被调用，它将状态改为unlocked并且立即返回。
- 如果尝试release一个unlocked的Lock，会报RuntimeError错误。


In [13]:
# 示例：无并行的计算，以及通过thread的并行计算，并行计算并没有提高速度
# python thread 的作用：
# 1. 函数能同时运行/看起来同时运行，希望在一个脚本中，同时跑几个工作，GIL确保几个thread的独立，以及公平分配CPU
# 2. I/O， 


import time
import threading

# a time comsuming function
def factorize(number):
    for i in range(1, number+1):
        if number % i == 0:
            yield i

start = time.time()
numbers = [12342,1235456,7832487,33244, 132412, 2341, 213412, 124]

# # 无并行的方法
# for number in numbers:
#     list(factorize(number))
#     print(number)
            
# pause1 = time.time()
# print('Time consuming{}'.format(pause1-start))

# 利用thread进行并行
class FactorizeThread(threading.Thread):
    def __init__(self, number):
        super().__init__()
        self.number = number
    
    def run(self):
        self.factors = list(factorize(self.number))

threads = []
for number in numbers:
    thread = FactorizeThread(number)
    thread.start()
    threads.append(thread)

for thread in threads:
    thread.join()
    print('Factors: {}'.format(thread.factors))    


end=time.time()
print('Time consuming{}'.format(end-start))
        

Factors: [1, 2, 3, 6, 11, 17, 22, 33, 34, 51, 66, 102, 121, 187, 242, 363, 374, 561, 726, 1122, 2057, 4114, 6171, 12342]
Factors: [1, 2, 4, 8, 16, 19, 32, 38, 64, 76, 127, 128, 152, 254, 256, 304, 508, 512, 608, 1016, 1216, 2032, 2413, 2432, 4064, 4826, 4864, 8128, 9652, 9728, 16256, 19304, 32512, 38608, 65024, 77216, 154432, 308864, 617728, 1235456]
Factors: [1, 3, 13, 39, 229, 687, 877, 2631, 2977, 8931, 11401, 34203, 200833, 602499, 2610829, 7832487]
Factors: [1, 2, 4, 8311, 16622, 33244]
Factors: [1, 2, 4, 7, 14, 28, 4729, 9458, 18916, 33103, 66206, 132412]
Factors: [1, 2341]
Factors: [1, 2, 4, 53353, 106706, 213412]
Factors: [1, 2, 4, 31, 62, 124]
Time consuming0.7085530757904053


In [15]:
# 示例：通过thread进行I/O
# 并行有效的示例

import select
import threading
import time

# 模拟系统I/O，模拟操作直升飞机螺旋桨
# 可以在主程序外进行比较慢的需要I/O的计算
def slow_systemcall():
    select.select([], [], [], 0.1)

start = time.time()

threads = []

# 利用并行进行的操作
for _ in range(5):
    thread = threading.Thread(target=slow_systemcall)
    thread.start()
    threads.append(thread)
    
# # 对比，无并行的版本
# for _ in range(5):
#     slow_systemcall()
    
# def compute_helicopter_location(index):
#     # 模拟计算直升飞机位置
#     # 主程序
#     pass
   
# for i in range(5):
#     compute_helicopter_location(i)

for thread in threads:
    thread.join()
    
    
end = time.time()
print('Time consuming{:.3f}'.format(end-start))


Time consuming0.101


## 使用Lock避免threads中的数据竞争


GIL：global interpreter lock


In [16]:
# 模拟多IO进行并发
# 模拟数据竞争

import threading

class Counter(object):
    def __init__(self):
        self.count = 0
        
    def increment(self, offset):
        self.count += offset
        
# Thread simulation
# Thread A running 
# value_a = getattr(self, 'count')

# Thread B running 
# value_b = getattr(self, 'count')
# result_b = value_b + offset_b 
# setattt = (self, 'count', result_b)

# result_a = value_a + offset_a 
# setattt = (self, 'count', result_a)


        
        
worker_count = 5    
barrier = threading.Barrier(worker_count) # 限制并发的数目
        
def worker(sensor_index, how_many, counter):
    barrier.wait()
    for _ in range(how_many):
        # Do the sensor readings
        counter.increment(1)

threads = []
how_many = 100000  # 当添加了barrier之后，how_many 大到一定程度，最后的counter就会有bug
counter = Counter()

for i in range(worker_count):
    args = (i, how_many, counter)
    thread = threading.Thread(target=worker, args=args)
    thread.start()
    threads.append(thread)

for thread in threads:
    thread.join()


    
print(counter.count)    
    
    
    

330251


In [19]:
# 模拟多IO进行并发
# 使用lock进行避免数据竞争

import threading

class Counter(object):
    def __init__(self):
        self.count = 0
        self.lock = threading.Lock()
        
    def increment(self, offset):
        with self.lock:
            self.count += offset
        
# Thread simulation
# Thread A running 
# value_a = getattr(self, 'count')



# Thread B running 
# value_b = getattr(self, 'count')
# result_b = value_b + offset_b 
# setattt = (self, 'count', result_b)


# result_a = value_a + offset_a 
# setattt = (self, 'count', result_a)


        
        
worker_count = 5    
barrier = threading.Barrier(worker_count) # 限制并发的数目
        
def worker(sensor_index, how_many, counter):
    barrier.wait()
    for _ in range(how_many):
        # Do the sensor readings
        counter.increment(1)

threads = []
how_many = 100000  # 当添加了barrier之后，how_many 大到一定程度，最后的counter就会有bug
counter = Counter()

for i in range(worker_count):
    args = (i, how_many, counter)
    thread = threading.Thread(target=worker, args=args)
    thread.start()
    threads.append(thread)

for thread in threads:
    thread.join()


    
print(counter.count)    
    
    
    

500000


# 利用Queue进行线程之间的协同

In [33]:
from threading import Thread
from queue import Queue

queue = Queue()

def consumer():
    print('Consumer waiting')
    queue.get()
    print('Consumer done')
    
thread = Thread(target=consumer)
thread.start()

print('Producer putting')
queue.put(object())
thread.join()
print('Producer done')
    
    


Consumer waiting
Producer putting
Consumer done
Producer done


In [36]:
# 为queue添加了buffersize

from threading import Thread
from queue import Queue
import time

queue = Queue(1)

def consumer():
    time.sleep(0.5)
    queue.get()
    print('Consumer got 1')
    queue.get()
    print('Consumer got 2')
    
    
thread = Thread(target=consumer)
thread.start()

print('Producer putting')
queue.put(object())
print('Producer put 1')
queue.put(object())
print('Producer put 2')
thread.join()
print('Producer done')

Producer putting
Producer put 1
Consumer got 1
Producer put 2
Consumer got 2
Producer done


In [41]:
from queue import Queue
from threading import Thread

in_queue = Queue()

def consumer():
    print('Consumer Waiting')
    work = in_queue.get()
    print('Consumer Working')
    # Do some actural work
    print('Consumer done')
    in_queue.task_done()
    
    
thread = Thread(target=consumer)
thread.start()

in_queue.put(object())
print('Producer is waiting')
thread.join()
in_queue.join()
    
    
    
    
    

Consumer Waiting
Producer is waitingConsumer Working
Consumer done



In [None]:
from queue import Queue
from threading import Thread

class ClosableQueue(Queue):
    
    SENTINEL = object()
    
    def close(self):
        self.put(self.SENTINEL)
        
    def __iter__(self):
        while True:
            item = self.get()
            try:
                if item is self.SENTINEL:
                    return 
                yield item
            finally:
                self.task_done()

class StoppableWorker(Thread):
    
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue
        
    def run(self):
        for item in self.in_queue:
            result = self.func(item)
            self.out_queue.put(item)
            
# 模拟一个流程，下载图片，resize图片，上传图片
def download(item):
    return item

def resize(item):
    return item

def upload(item):
    return item

print('start')

download_queue = ClosableQueue(10)
resize_queue = ClosableQueue(10)
upload_queue = ClosableQueue(10)
done_queue = ClosableQueue()

threads = [
    StoppableWorker(download, download_queue, resize_queue),
    StoppableWorker(resize, resize_queue, upload_queue),
    StoppableWorker(upload, upload_queue, done_queue),
]

for thread in threads:
    download_queue.put(object())
    
download_queue.close()
download_queue.join()
resize_queue.close()
resize_queue.join()
upload_queue.close()
upload_queue.join()

print(done_queue.qsize(), 'items finished')

In [None]:
print('start')

# 使用concurrent.futures进行真并行

In [20]:
# 如果使用Thread无法进行真正并行
import time
from concurrent.futures import ThreadPoolExecutor

def gcd(pair):
    a, b = pair
    low = min(a, b)
    for i in range(low, 0, -1):
        if a % i ==0 and b % i ==0:
            return i
    return 1


numbers = [(543456, 43523465), (324435212, 23342412,), (1232412324,890789), (7134890,873490), (543456, 43523465), (324435212, 23342412,), (1232412324,890789), (7134890,873490), (543456, 43523465), (324435212, 23342412,), (1232412324,890789), (7134890,873490)]
start = time.time()

# pool = ThreadPoolExecutor(max_workers=8)
# results = list(pool.map(gcd, numbers))
results = list(map(gcd, numbers))


end = time.time()
print(results)
print('Time was {}'.format(end-start))

[1, 4, 1, 10, 1, 4, 1, 10, 1, 4, 1, 10]
Time was 5.523375034332275


In [22]:

# 如果使用Thread无法进行真正并行
import time
from concurrent.futures import ThreadPoolExecutor

def gcd(pair):
    a, b = pair
    low = min(a, b)
    for i in range(low, 0, -1):
        if a % i ==0 and b % i ==0:
            return i
    return 1


numbers = [(543456, 43523465), (324435212, 23342412,), (1232412324,890789), (7134890,873490), (543456, 43523465), (324435212, 23342412,), (1232412324,890789), (7134890,873490), (543456, 43523465), (324435212, 23342412,), (1232412324,890789), (7134890,873490)]
start = time.time()

pool = ThreadPoolExecutor(max_workers=2)
results = list(pool.map(gcd, numbers))



end = time.time()
print(results)
print('Time was {}'.format(end-start))

[1, 4, 1, 10, 1, 4, 1, 10, 1, 4, 1, 10]
Time was 5.627691984176636


In [23]:
# 如果使用Thread无法进行真正并行
import time
from concurrent.futures import ProcessPoolExecutor
import multiprocessing

def gcd(pair):
    a, b = pair
    low = min(a, b)
    for i in range(low, 0, -1):
        if a % i ==0 and b % i ==0:
            return i
    return 1


numbers = [(543456, 43523465), (324435212, 23342412,), (1232412324,890789), (7134890,873490), (543456, 43523465), (324435212, 23342412,), (1232412324,890789), (7134890,873490), (543456, 43523465), (324435212, 23342412,), (1232412324,890789), (7134890,873490)]
start = time.time()

# pool = ProcessPoolExecutor(max_workers=2)
# results = list(pool.map(gcd, numbers))
pool = multiprocessing.Pool()
results = list(pool.map(gcd, numbers))

end = time.time()
print(results)
print('Time was {}'.format(end-start))


[1, 4, 1, 10, 1, 4, 1, 10, 1, 4, 1, 10]
Time was 1.9306187629699707
