并发的程序有潜在的危险. 因此, 本章的主要目标之一是给出更加可信赖和易调试的代码.

## 12.1 启动与停止线程

threading 库可以在单独的线程中执行任何的在 Python 中可以调用的对象。

In [5]:
# Code to execute in an independent thread
import time

def countdown(n):
    while n > 0:
        print('T-minus', n)
        n -= 1
        time.sleep(0.5)

# Create and launch a thread
from threading import Thread
t = Thread(target=countdown, args=(10,))
t.start()

n = 10
while n > 0:
    n -= 1
    if t.is_alive():
        print('Still running')
    else:
        print('Completed')
        n = 0
    time.sleep(1)

T-minus 10
Still running
T-minus 9
T-minusStill running 8

T-minus 7
Still running
T-minus 6
T-minus 5
Still running
T-minus 4
T-minus 3
Still running
T-minus 2
T-minus 1
Still running
Completed


In [None]:
当你创建好一个线程对象后,该对象并不会立即执行,除非你调用它的 start()
方法(当你调用 start() 方法时,它会调用你传递进来的函数,并把你传递进来的参
数传递给该函数)。

你也可以将一个线程加入到当前线程,并等待它终止:
`t.join()`

你可以查询一个线程对象的状态,看它是否还在
执行:
```python
if t.is_alive():
    print('Still running')
else:
    print('Completed')
```

后台线程无法等待,不过,这些线程会在主线程终止时自动销毁。

<span class="girk">由于`全局解释锁(GIL)`的原因,Python 的线程被`限制到同一时刻只允许一个线程执行`这样一个执行模型。</span>所以,Python 的线程更**适用于处理 I/O 和其他需要并发执行的阻塞操作**(比如等待 I/O、等待从数据库获取数据等等),而**不是需要多处理器并行的计算密集型任务**。


## 12.2 判断线程是否已经启动

*线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其
他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就
会变得非常棘手。*

要使用 `threading 库中的 Event 对象`。
> Event 对象包含一个**可由线程设置的信号标志**,它允许线程等待某些事件的发生。在初
始情况下,event 对象中的信号标志被设置为假。如果有线程等待一个 event 对象,而
这个 event 对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程
如果将一个 event 对象的信号标志设置为真,它将唤醒所有等待这个 event 对象的线
程。如果一个线程等待一个已经被设置为真的 event 对象,那么它将忽略这个事件,继
续执行。

In [2]:
from threading import Thread, Event
import time

# Code to execute in an independent thread
def countdown(n, started_evt):
    print('countdown starting')
    started_evt.set() # ???
    while n>0:
        print('T-minus ', n)
        n -= 1
        time.sleep(1)

# Create the event object that will be used to signal startup
started_evt = Event()

# Launch the thread and pass the startup event
print('Launching countdown')
t = Thread(target=countdown, args=(10, started_evt))
t.start()

# Wait for the thread to start
started_evt.wait()
print('countdown is running')

Launching countdown
countdown starting
T-minus  10


True

countdown is running
T-minus  7
T-minus  9
T-minus  6
T-minus  8
T-minus  5
T-minus  7
T-minus  4
T-minus  6
T-minus  3
T-minus  5
T-minus  2
T-minus  4
T-minus  1
T-minus  3
T-minus  2
T-minus  1


> **event 对象最好单次使用**,就是说,你创建一个 event 对象,让某个线程等待这个
对象,一旦这个对象被设置为真,你就应该丢弃它。


> 尽管可以通过 clear() 方法来重
置 event 对象,但是很难确保安全地清理 event 对象并对它重新赋值。很可能会发生错
过事件、死锁或者其他问题(特别是,你无法保证重置 event 对象的代码会在线程再次
等待这个 event 对象之前执行)。

如果一个线程需要不停地重复使用 event 对象,你最
好使用 `Condition `对象来代替。

## 12.4 给关键部分加锁

Lock 对象和 with 语句块一起使用可以保证互斥执行

In [None]:
import threading
class SharedCounter:
    '''
    A counter object that can be shared by multiple threads.
    '''
    def __init__(self, initial_value = 0):
        self._value = initial_value
        self._value_lock = threading.Lock()
  
    def incr(self,delta=1):
        '''
        Increment the counter with locking
        '''
        with self._value_lock:
            self._value += delta


比较笨的方法来获取和释放锁

In [10]:
def incr(self,delta=1):
    '''
    Increment the counter with locking
    '''
    self._value_lock.acquire()
    self._value += delta
    self._value_lock.release()

**为了避免出现死锁的情况,使用锁机
制的程序应该设定为每个线程一次只允许获取一个锁。**

在 `threading 库`中还提供了其他的
同步原语,比如 `RLock` 和 `Semaphore` 对象。**但是根据以往经验,这些原语是用于一些
特殊的情况,如果你只是需要简单地对可变对象进行锁定,那就不应该使用它们。**
> 一个
`RLock (可重入锁)`可以被同一个线程多次获取,主要用来实现基于监测对象模式的锁
定和同步。在使用这种锁的情况下,当锁被持有时,只有一个线程可以使用完整的函数
或者类中的方法。

使用`RLock (可重入锁)`的例子, 注意decr函数被执行时，锁被获取了两次

与一个标准的锁不同的是,已经持有这个锁的方法在调用同样使用这个锁的方法时,无需再次获取锁。

In [1]:
import threading

class SharedCounter:
    """
    a counter that can be shared by multiple threads
    """
    # reentrant lock objects as Class Member
    _lock = threading.RLock()
    def __init__(self,val):
        self._val=val
        pass

    def incr(self, delta=1):
        with SharedCounter._lock:
            self._val += delta

    def decr(self, delta=1):
        with SharedCounter._lock:
            self.incr(-delta)

`信号量对象`是一个**建立在共享计数器基础上**的同
步原语。如果计数器不为 0,with 语句将计数器减 1,线程被允许执行。with 语句执行
结束后,计数器加1。如果计数器为 0,线程将被阻塞,直到其他线程结束将计数器加
1。

**不推荐标准锁一样使用信号量来做线程同步**,因为使用信号量为程序增加的复杂性会**影响程序性能**。相对于简单地作为锁使
用,信号量更适用于那些需要在线程之间引入信号或者限制的程序。比如,你需要**限制
一段代码的并发访问量**

In [None]:
from threading import Semaphore
import urllib.request

# At most, five threads allowed to run at once
_fetch_url_sema = Semaphore(5)

def fetch_url(url):
    with _fetch_url_sema:
        return urllib.request.urlopen(url

## 12.5 防止死锁的加锁机制

一个多线程程序,其中线程需要一次获取多个锁,此时如何避免死锁问题.

死锁问题很大一部分是由于线程同时获取多个锁造成的

。解决死锁问题的一种方案是为程序
中的每一个锁分配一个唯一的 id,然后只允许按照升序规则来使用多个锁,这个规则
使用上下文管理器是非常容易实现的

In [None]:
# encoding=utf-8

import threading
from contextlib import contextmanager

# 线程本地变量，存储已获取得到的锁
_local =  threading.local()

@contextmanager
def acquire(*locks):
    # 根据锁id进行排序
    locks = sorted(locks, key=lambda lock: id(lock))

    # 确保锁的次序规则
    acquired = getattr(_local, 'acquired', [])
    if acquired and max(id(lock) for lock in acquired) >= id(locks[0]):
        raise RuntimeError('Lock Order Violation')

    #获取所有锁
    acquired.extend(locks)
    _local.acquired = acquired

    try:
        for lock in locks:
            lock.acquire()
        yield
    # 进行上下文管理器的下半部分：__exit__部分
    finally:
        for lock in reversed(locks):
            lock.release()
        del acquired[-len(locks):]


# 测试
x_lock = threading.Lock()
y_lock = threading.Lock()

def thread_1():
    while True:
        with acquire(x_lock, y_lock):
            print("Threading-1")

def thread_2():
    while True:
        with acquire(y_lock, x_lock):
            print("Threading-2")



# if '__main__' == __name__:
t1 = threading.Thread(target=thread_1)
# t1.daemon = True
t1.start()

t2 = threading.Thread(target=thread_2)
# t2.daemon = True
t2.start()

# t1.join()
# t2.join()


# # 要么像这样
# t1 = threading.Thread(target=thread_1)
# t1.daemon = True
# t1.start()

# t2 = threading.Thread(target=thread_2)
# t2.daemon = True
# t2.start()

# t1.join()
# t2.join()

In [None]:
如果设置了threadx.daemon = True， 那么必须threadx.join()

属性为一个布尔值，表示是否为一个守护进程，且这个属性设置必须在线程的start方法开始之前调用。它的值继承自主线程，主线程的daemon为False且所有从主线程创建的线程都是daemon = False 。

死锁的检测与恢复是一个几乎没有优雅的解决方案的扩展话题。一个比较常用的
死锁检测与恢复的方案是引入看门狗计数器。当线程正常运行的时候会每隔一段时间
重置计数器,在没有发生死锁的情况下,一切都正常进行。一旦发生死锁,由于无法重
置计数器导致定时器超时,这时程序会通过重启自身恢复到正常状态。


避免死锁是另外一种解决死锁问题的方式,在进程获取锁的时候会严格按照对象
id 升序排列获取,

## 12.6 保存线程的状态信息

有时在多线程编程中,你需要只保存当前运行线程的状态。要这么做,可使用
`thread.local()` 创建一个`本地线程存储对象`。对这个对象的属性的保存和读取操作都
**只会对执行线程可见,而其他线程并不可见**。

其原理是,每个 `threading.local() 实例为每个线程维护着一个单独的实例字典。`
所有普通实例操作比如获取、修改和删除值仅仅操作这个字典。每个线程使用个独立
的字典就可以保证数据的隔离了。

## 12.7 创建一个线程池

`concurrent.futures` 函数库有一个 `ThreadPoolExecutor` 类可以被用来完成这个
任务。

In [5]:
from concurrent.futures import ThreadPoolExecutor
import urllib.request


def fetch_url(url):
    u = urllib.request.urlopen(url)
    data = u.read()
    return data


pool = ThreadPoolExecutor(10)

# Submit work to the pool
a = pool.submit(fetch_url, 'http://www.python.org')
b = pool.submit(fetch_url, 'http://www.pypy.org')

# Get the results back
x = a.result()
y = b.result()

使用 `ThreadPoolExecutor` 相对于手动实现的一个好处在于**它使得任务提交者更方便的从被调用函数中获取返回值。**

全局解释器锁 GIL。通常,你**应该只在 I/O 处理相关代码中使用线程池**

## 12.8 简单的并行编程

个程序要执行 CPU 密集型工作,你想让他**利用多核 CPU 的优势**来运行的快
一点。

`concurrent.futures `库提供了一个 `ProcessPoolExecutor 类`,可被用来在一个单
独的 Python 解释器中执行`计算密集型函数`。

In [None]:
from concurrent import futures

all_robots = set()
with futures.ProcessPoolExecutor() as pool:
    for robots in pool.map(find_robots, files):
        all_robots.update(robots)

其原理是,**一个 ProcessPoolExecutor 创建 N 个独立的 Python 解释器,N 是系统上面可用 CPU 的个数。**

除使用列表或推导方式使用pool.map方式调用外，也可以使用 `pool.submit()` 来**手动的提交单个任务**

In [None]:
# Some function
def work(x):
    ...
    return result

with ProcessPoolExecutor() as pool:
    ...
    # Example of submitting work to the pool
    future_result = pool.submit(work, arg)
    # Obtaining the result (blocks until done)
    r = future_result.result()

如果你手动提交一个任务,结果是一个 Future 实例。**要获取最终结果,你需要调用它的 result() 方法。它会阻塞进程直到结果被返回来。**

如果不想阻塞,你还可以使用一个回调函数。

回调函数接受一个 Future 实例,被用来获取最终的结果(比如通过调用它的result() 方法)。

In [None]:
def when_done(r):
    print('Got:', r.result())
    with ProcessPoolExecutor() as pool:
    future_result = pool.submit(work, arg)
    future_result.add_done_callback(when_done)

### 尽管处理池很容易使用,在设计大程序的时候还是有很多需要注意:
地方,如下几点:
- 这种并行处理技术只适用于那些可以被分解为互相独立部分的问题。
- 被提交的任务必须是简单函数形式。**对于方法、闭包和其他类型的并行执行还不支持。**
- 函数**参数和返回值必须兼容 pickle,因为要使用到进程间的通信,所有解释器之间的交换数据必须被序列化**
- **被提交的任务函数不应保留状态或有副作用**。除了打印日志之类简单的事情,一旦启动你不能控制子进程的任何行为,因此最好保持简单和纯洁——函数不要去修改环境。
- 在 Unix 上进程池通过调用 fork() 系统调用被创建,
> 它会克隆 Python 解释器,包括 fork 时的所有程序状态。而在 Windows 上,克隆解释器时不会克隆状态。实际的 fork 操作会在第一次调用 pool.map() 或 pool.submit()
后发生。
- 当你混合使用进程池和多线程的时候要特别小心。
> 你**应该在创建任何线程之前先创建并激活进程池**(比如在程序启动的 main 线程中创建进程池)。

## 12.9 Python 的全局锁问题

**GIL 最大的问题就是 Python 的多线程程序并不能利用多核 CPU 的优势**(比如一个使用了多个线程的计算密集型程序只会在一个单 CPU 上面运行)

如果你的程序大部分只会涉及到 I/O,比如网络交互,那么使用多线程就很合适,因为它们大部分时间都在等待。

### 两种策略来解决 GIL 的缺点:

#### * 使用 multiprocessing 模块来创建一个进程池,并像协同处理器一样的使用它。

#### * 另外一个解决 GIL 的策略是使用 C 扩展编程技术。主要思想是将计算密集型任务转移给 C,跟 Python 独立,在工作的时候在 C 代码中释放 GIL。

In [None]:
#include...
"Python.h"
PyObject *pyfunc(PyObject *self, PyObject *args) {
    ...
    Py_BEGIN_ALLOW_THREADS
    // Threaded C code
    ...
    Py_END_ALLOW_THREADS
    ...
}

如果你使用其他工具访问 C 语言,比如对于 Cython 的 ctypes 库,你不需要做任何事。例如,ctypes 在调用 C 时会自动释放 GIL。

**C 扩展最重要的特征是它们和 Python 解释器是保持独立的。**也就是说,如果你准
备将 Python 中的**任务分配到 C 中去执行,你需要确保 C 代码的操作跟 Python 保持
独立,这就意味着不要使用 Python 数据结构以及不要调用 Python 的 C API**。另外一
个就是你要确保 C 扩展所做的工作是足够的,值得你这样做。也就是说 C 扩展担负起
了大量的计算任务,而不是少数几个计算。