### 全局解释器锁(GIL)
python代码的执行由python虚拟机来控制。python在设计之初就考虑到要在主循环中，同时只有一个线程在执行，就像单cpu的系统中运行多个进程那样，内存中可以存放多个程序，但任意时刻只有一个程序在cpu中运行。同样，虽然python解释器可以运行多个线程，但在任意时刻只有一个线程在解释器中运行。
对python虚拟机的访问由全局解释器锁 GIL来控制。
在多线程环境中 python虚拟机按以下方式执行。
1. 设置GIL
2. 切换到一个线程去运行
3. 运行：
    a. 指定数量的字节码的指令 或者
    b. 线程主动让出控制
4. 把线程设置为睡眠状态
5. 解锁GIL
6. 再重复上面的所有步骤

在所有面向I/O的程序来说，GIL会在这个I/O调用之前被释放，以运行其他的线程在这个线程等待I/O的时候运行 <br>
标准库中的 time.sleep 也会让出GIL

In [None]:
import threading

| threading模块对象 | 描述  |
|:--------------: | :----:|
|Thread           | 表示一个线程的执行的对象|
|Lock             | 锁原语对象|
|RLock            | 可重入锁对象 使单线程可以再次获得已经获得了的锁 递归锁定|
|Condition        | 条件变量对象能让一个线程停下来，等待其他线程满足某个条件 如 状态的改变或值的改变|
|Event            | 通用的条件变量 多个线程可以等待某个事件的发生 在事件发生之后 所有的线程都被激活|
|Semaphore(信号量) | 为等待锁的线程提供一个类似等候室的结构  |
|BoundedSemaphore | 与Semaphore类似 只是他不允许超过初始值 |
|Timer            | 与Thread相似 只是他要等待一段时间后才开始运行 |

#### 实例化每个Thread对象 我们把函数和参数传进去

In [None]:
from time import sleep, ctime

In [None]:
loops = [4, 2]

In [None]:
def loop(nloop, nsec):
    print('start loop{} at: {}'.format(nloop, ctime()))
    sleep(nsec)
    print('loop{} done at: {}'.format(nloop, ctime()))

In [None]:
def mtsleep1():
    print('starting at: {}'.format(ctime()))
    threads = []
    nloops = range(len(loops))
    
    for i in nloops:
        t = threading.Thread(target=loop, args=(i, loops[i]))
        threads.append(t)
    
    for i in nloops:        # start threads
        threads[i].start()
    
    for i in nloops:        # wait for all
        threads[i].join()   # threads to finish
    
    print('all DONE at: {}'.format(ctime()))

In [None]:
mtsleep1()

join()会等到所有线程结束 join的另一个重要的方面是可以完全不用调用 
** 一旦线程启动 就会一直运行 之一等到线程的函数结束 退出为止
如果你的主线程出来等待线程结束外还有其他的事情要做 如处理或者等待其他的客户请求 那就不要调用join 只有你要等待线程结束的时候才要调用join **

#### join可以理解为 子线程加入到主线程 主线程被阻塞

1. join方法的作用是阻塞主进程（挡住，无法执行join以后的语句），专注执行多线程。

2. 多线程多join的情况下，依次执行各线程的join方法，前头一个结束了才能执行后面一个。

3. 无参数，则等待到该线程结束，才开始执行下一个线程的join。

4. 设置参数后，则等待该线程这么长时间就不管它了（而该线程并没有结束）。不管的意思就是可以执行后面的主进程了

#### 创建一个Thread实例 传给他一个可调用的类对象

In [None]:
class ThreadFunc:
    
    def __init__(self, func, args, name=''):
        self.name = name
        self.func = func
        self.args = args
    
    def __call__(self):
        self.func(*self.args) ## python3 取消 apply() 使用 *， ** 展开参数

In [None]:
def mtslepp2():
    print('starting at: {}'.format(ctime()))
    threads = []
    nloops = range(len(loops))
    
    for i in nloops:
        t = threading.Thread(target=ThreadFunc(
            loop, (i, loops[i]), 
            loop.__name__))
        threads.append(t)
    
    for i in nloops:        # start threads
        threads[i].start()
    
    for i in nloops:        # wait for completion
        threads[i].join()   
    
    print('all DONE at: {}'.format(ctime()))

In [None]:
mtslepp2()

传入一个可调用的类的实例供线程启动的时候执行

#### 子类化一个Thread类 使其更加通用

In [None]:
class MyThread(threading.Thread):
    
    def __init__(self, func, args, name=''):
        threading.Thread.__init__(self) # 子类构造器一定要先调用父类的构造器
        self.func = func
        self.args = args
        self.name = name
    
    def getResult(self):
        return self.res
    
    # 线程运行时 自动执行Thread的run方法 下面我们重写父类的run方法
    def run(self):     
        print('starting {} at: {}'.format(self.name, ctime()))
        self.res = self.func(*self.args)
        print('{} finished at: {}'.format(self.name, ctime()))

In [None]:
def mtslepp3():
    print('starting at: {}'.format(ctime()))
    threads = []
    nloops = range(len(loops))
    
    for i in nloops:
        t = MyThread(loop,(i, loops[i]), loop.__name__)
        threads.append(t)
    
    for i in nloops:        # start threads
        threads[i].start()
    
    for i in nloops:        # wait for completion
        threads[i].join()   
    
    print('all DONE at: {}'.format(ctime()))

In [None]:
mtslepp3()

In [None]:
def mtslepp4():
    print('starting at: {}'.format(ctime()))
    threads = []
    nloops = range(len(loops))
    
    for i in nloops:
        t = MyThread(loop,(i, loops[i]), loop.__name__)
        threads.append(t)
    
    for i in nloops:        # start threads start之后就join 线程变成有序 启动时间也不是并行
        threads[i].start()
        threads[i].join()
        
    print('all DONE at: {}'.format(ctime()))

In [None]:
mtslepp4()

#### 斐波那契 阶乘和累加和

In [None]:
def fib(x):
    sleep(0.005)
    if x < 2: return 1
    return (fib(x-2) + fib(x-1))

def fac(x):
    sleep(0.1)
    if x < 2: return 1
    return (x * fac(x-1))

def sum(x):
    sleep(0.1)
    if x < 2: return 1
    return (x + sum(x-1))

funcs = [fib, fac, sum]

n = 12

In [None]:
def mix_single_thread():
    nfuncs = range(len(funcs))
    
    for i in nfuncs:
        print('starting {} at:{}'.format(funcs[i].__name__, ctime()))
        print(funcs[i](n))
        print('{} finished at:{}'.format(funcs[i].__name__, ctime()))

In [None]:
def mix_multiple_threads():
    threads = []
    nfuncs = range(len(funcs))
    
    for i in nfuncs:
        t = MyThread(funcs[i], (n,), funcs[i].__name__)
        threads.append(t)
    
    for i in nfuncs:
        threads[i].start()
        
    for i in nfuncs:
        threads[i].join()
        print(threads[i].getResult())
    
    print('all DONE')

In [None]:
mix_single_thread()

In [None]:
mix_multiple_threads()

#### 生产者-消费者问题和Queue模块

Queue模块可以用来进行线程间通讯，让各个线程之间共享数据，现在我们要创建一个队列，让生产者把新生产的货物放进去供消费者使用。queue内部实现了相关的锁

In [None]:
from random import randint
from queue import Queue # python3 中 Queue 改名为 queue

In [None]:
def writeQ(queue):
    print('producing object for Q...')
    queue.put('xxx', 1)  # put(item, block=0) Queue对象函数 把item放到队列中 如果block不为0，函数会一直阻塞到队列中有空间为止
    print('size now {}'.format(queue.qsize())) # 返回队列大小 

def readQ(queue):
    val = queue.get(1)   # get(block=0) Queue对象函数 从队列取出对象 如果block不为0，函数会一直阻塞到队列中有对象为止
    print('consumed object from Q...')
    print('size now {}'.format(queue.qsize()))

def writer(queue, loops):
    for i in range(loops):
        writeQ(queue)
        sleep(randint(1, 3))

def reader(queue, loops):
    for i in range(loops):
        readQ(queue)
        sleep(randint(2, 5))

funcs = [writer, reader]
nfuncs = range(len(funcs))

In [None]:
def pcQuestion():
    nloops = randint(2, 5)
    q = Queue(32) # 创建大小为32的队列
    
    threads = []
    
    for i in nfuncs:
        t = MyThread(funcs[i], (q, nloops), funcs[i].__name__)
        threads.append(t)
        
    for i in nfuncs:
        threads[i].start()
    
    for i in nfuncs:
        threads[i].join()
    
    print('all DONE')

In [None]:
pcQuestion()

### 多线程同步机制

GIL 的作用是：对于一个解释器，只能有一个thread在执行bytecode。所以每时每刻只有一条bytecode在被执行一个thread。GIL保证了bytecode 这层面上是thread safe的。但是如果你有个操作比如 x += 1，这个操作需要多个bytecodes操作，在执行这个操作的多条bytecodes期间的时候可能中途就换thread了，这样就出现了data races的情况了。

#### Lock(锁)

1. 如果一个锁的状态是unlocked，调用acquire()方法改变它的状态为locked；
2. 如果一个锁的状态是locked，acquire()方法将会阻塞，直到另一个线程调用release()方法释放了锁；
3. 如果一个锁的状态是unlocked调用release()会抛出RuntimeError异常；
4. 如果一个锁的状态是locked，调用release()方法改变它的状态为unlocked。

#### Semaphore（信号量）

在多线程编程中，为了防止不同的线程同时对一个公用的资源（比如全部变量）进行修改，需要进行同时访问的数量（通常是1）。信号量同步基于内部计数器，每调用一次acquire()，计数器减1；每调用一次release()，计数器加1.当计数器为0时，acquire()调用被阻塞。

#### RLock（可重入锁）

acquire() 能够不被阻塞的被同一个线程调用多次。但是要注意的是release()需要调用与acquire()相同的次数才能释放锁。

#### Condition（条件）

一个线程等待特定条件，而另一个线程发出特定条件满足的信号。最好说明的例子就是「生产者/消费者」模型：

In [None]:
import time
import threading

def consumer(cond):
    t = threading.currentThread()
    with cond:
        cond.wait()  # wait()方法创建了一个名为waiter的锁，并且设置锁的状态为locked。这个waiter锁用于线程间的通讯
        print('{}: Resource is available to consumer'.format(t.name))
        
def producer(cond):
    t = threading.currentThread()
    with cond:
        print('{}: Making resource available'.format(t.name))
        cond.notifyAll()  # 释放waiter锁，唤醒消费者
        
condition = threading.Condition()

c1 = threading.Thread(name='c1', target=consumer, args=(condition,))
c2 = threading.Thread(name='c2', target=consumer, args=(condition,))
p = threading.Thread(name='p', target=producer, args=(condition,))

In [None]:
c1.start()
time.sleep(1)
c2.start()
time.sleep(1)
p.start()

#### Event

一个线程发送/传递事件，另外的线程等待事件的触发。我们同样的用「生产者/消费者」模型的例子:

### 使用future处理并发

In [None]:
from time import sleep, strftime
from concurrent import futures

In [None]:
def display(*args):
    print(strftime('[%H:%M:%S]'),end=' ')
    print(*args)

In [None]:
def loiter(n):
    msg = '{}loiter({}): doing noting for {}s...'
    display(msg.format('\t'*n, n, n))
    sleep(n)
    msg = '{}loiter({}): done.'
    display(msg.format('\t'*n, n))
    return n * 10

In [None]:
def main():
    display('Script Starting')
    executor = futures.ThreadPoolExecutor(max_workers=3)
    results = executor.map(loiter, range(5))
    display('results: ', results)
    display('waiting....')
    for i, result in enumerate(results):
        display('result {}:{}'.format(i, result))

In [None]:
main()

ThreadPoolExecutor.map 函数返回结果的顺序和调用开始的顺序一致 <br>
executor.submit 和 futures.as_completed 这个组合比 map更加零活 <br>
因为submit可以处理不同的可调用对象 而map只能针对参数不同的同一个可调用对象 <br>
此外 futures.as_completed函数的future集合可以来自多个Executor实例 ThreadPoolExecutor和ProcessPoolExecutor

#### TQDM进度条

In [None]:
from tqdm import tqdm
for i in tqdm(range(100)):
    sleep(0.1)

In [55]:
def simple(n):
    sleep(n)
    return n

In [67]:
def runsimple():
    simplelist = []
    with futures.ThreadPoolExecutor(5) as executor:
        for i in range(10):
            e = executor.submit(simple, i)
            simplelist.append(e)
        results = futures.as_completed(simplelist)
        print(len(list(results)))
        for l in results:
            print(l.result())

In [68]:
runsimple()

10


futures.as_completed返回一个迭代器 当使用for循环迭代时 as_completed只返回已经运行结束的future