## 12. Concurrency

IO 和并发， 两大问题。 本质上仍然是最大化利用计算机资源的问题。

如果想要使计算机“同时”做多件事情， 考虑 并发。 但实际上 CPU（单核）一次只能做一件事情（一个指令），所以做一会儿程序1，再做一会儿程序2。 因为CPU计算速度很快，会造成同时做事情的错觉。

并发的几个概念：

1. 进程
2. 线程


### 12.1 Starting and Stopping Threads



In [9]:
import time

def countdown(n):
    while n > 0:
        print('T-minus', n)
        n -= 1
        time.sleep(2)
        
from threading import Thread
t = Thread(target=countdown, args=(10, ))
t.start()

if t.is_alive():
    print('Still running')
else:
    print('Completed')
    
t.join()

T-minus 10
Still running
T-minus 9
T-minus 8
T-minus 7
T-minus 6
T-minus 5
T-minus 4
T-minus 3
T-minus 2
T-minus 1


### 12.2 监控线程： 知道何时开始

In [11]:


from threading import Thread, Event


def countdown(n, started_evt):
    print('countdown starting')
    started_evt.set()   # 这里发出信号。。
    while n > 0:
        print('T-minus', n)
        n -= 1
        time.sleep(2)
        
started_evt = Event()
    
print('Lauching coutndown')
t = Thread(target=countdown, args=(10, started_evt))
t.start()

print('wait')
time.sleep(2)
started_evt.wait()
print('is running')

t.join()

Lauching coutndown
countdown startingwait

T-minus 10
is running
T-minus 9
T-minus 8
T-minus 7
T-minus 6
T-minus 5
T-minus 4
T-minus 3
T-minus 2
T-minus 1


### 12.3 线程间通信

主要靠队列。。。  生产者消耗者模型。

队列也可以是优先级队列

In [12]:
from queue import Queue
from threading import Thread
import time

_sentinel = object()

# A thread that produces data
def producer(out_q):
    n = 10
    while n > 0:
        # Produce some data
        out_q.put(n)
        time.sleep(2)
        n -= 1


    # Put the sentinel on the queue to indicate completion
    out_q.put(_sentinel)

# A thread that consumes data
def consumer(in_q):
    while True:
        # Get some data
        data = in_q.get()

        # Check for termination
        if data is _sentinel:
            in_q.put(_sentinel)
            break

        # Process the data
        print('Got:', data)
    print('Consumer shutting down')

if __name__ == '__main__':
    q = Queue()
    t1 = Thread(target=consumer, args=(q,))
    t2 = Thread(target=producer, args=(q,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()


Got: 10
Got: 9
Got: 8
Got: 7
Got: 6
Got: 5
Got: 4
Got: 3
Got: 2
Got: 1
Consumer shutting down



### 12.4 线程锁

避免竞争（race condition）

下面这个例子非常简单直观。  多个线程共用一个计数器对象。 当修改值的时候，用锁保护一下。  不这样做，会造成计算错误

有一些对象不是线程安全的，意思就是指 多个线程共用这个对象（修改值），会产生错误。

错误产生的原因，也很好解释：

比如此时 `_value` 值是 100， 一个线程加1， 计算得到101；还没有赋值给 `_value` 的时候， 另一个线程减1， 计算得到99，并且赋值给 `_value`。再到第一个线程的时候，`_value`值为 101。 所以第二个线程所做的工作都白费了。  最后结果一定是错误的。

In [16]:
import threading

class SharedCounter:
    '''
    A counter object that can be shared by multiple threads.
    '''
    def __init__(self, initial_value = 0):
        self._value = initial_value
        self._value_lock = threading.Lock()

    def incr(self,delta=1):
        '''
        Increment the counter with locking
        '''
        self._value += delta
#         with self._value_lock:
#              self._value += delta

    def decr(self,delta=1):
        '''
        Decrement the counter with locking
        '''
        self._value -= delta
#         with self._value_lock:
#              self._value -= delta

def test(c):
    for n in range(1000000):
        c.incr()
    for n in range(1000000):
        c.decr()

if __name__ == '__main__':
    c = SharedCounter()
    t1 = threading.Thread(target=test, args=(c,))
    t2 = threading.Thread(target=test, args=(c,))
    t3 = threading.Thread(target=test, args=(c,))
    t1.start()
    t2.start()
    t3.start()
    print('Running test')
    t1.join()
    t2.join()
    t3.join()
    
    print(c._value)
    assert c._value == 0
    print('Looks good!')


Running test
-75097


AssertionError: 


### 12.5 避免死锁

如果一个程序同时获取多个锁，容易产生死锁。

能猜出大概的原因。 但是具体的例子不知道怎么弄。


一种解决死锁问题的方案是避免死锁： 在进程获取锁的时候会严格按照对象id升序排列获取，经过数学证明，这样保证程序不会进入 死锁状态。

避免死锁的主要思想是，单纯地按照对象id递增的顺序加锁不会产生循环依赖，而循环依赖是 死锁的一个必要条件，从而避免程序进入死锁状态。

In [31]:
import threading
from contextlib import contextmanager

# Thread-local state to stored information on locks already acquired
_local = threading.local()

@contextmanager
def acquire(*locks):
    # Sort locks by object identifier
    locks = sorted(locks, key=lambda x: id(x))

    # Make sure lock order of previously acquired locks is not violated
    acquired = getattr(_local,'acquired',[])
    if acquired and max(id(lock) for lock in acquired) >= id(locks[0]):
        raise RuntimeError('Lock Order Violation')

    # Acquire all of the locks
    acquired.extend(locks)
    _local.acquired = acquired

    try:
        for lock in locks:
            lock.acquire()
        yield
    finally:
        # Release locks in reverse order of acquisition
        for lock in reversed(locks):
            lock.release()
        del acquired[-len(locks):]

下面以一个关于线程死锁的经典问题：“哲学家就餐问题”，作为本节最后一个例子。题目是这样的：五位哲学家围坐在一张桌子前，每个人 面前有一碗饭和一只筷子。在这里每个哲学家可以看做是一个独立的线程，而每只筷子可以看做是一个锁。每个哲学家可以处在静坐、 思考、吃饭三种状态中的一个。需要注意的是，每个哲学家吃饭是需要两只筷子的，这样问题就来了：如果每个哲学家都拿起自己左边的筷子， 那么他们五个都只能拿着一只筷子坐在那儿，直到饿死。此时他们就进入了死锁状态。 下面是一个简单的使用死锁避免机制解决“哲学家就餐问题”的实现：

In [None]:
import threading

# The philosopher thread
def philosopher(left, right):
    while True:
        with acquire(left,right):
             print(threading.currentThread(), 'eating')

# The chopsticks (represented by locks)
NSTICKS = 5
chopsticks = [threading.Lock() for n in range(NSTICKS)]

# Create all of the philosophers
for n in range(NSTICKS):
    t = threading.Thread(target=philosopher,
                  
                         args=(chopsticks[n],chopsticks[(n+1) % NSTICKS]))
    t.start()