# 进程和线程

## 课堂：多进程

今天我们使用的计算机早已进入多CPU或多核时代，而我们使用的操作系统都是支持“多任务”的操作系统，这使得我们可以同时运行多个程序，也可以将一个程序分解为若干个相对独立的子任务，让多个子任务并发的执行，从而缩短程序的执行时间，同时也让用户获得更好的体验。因此在当下不管是用什么编程语言进行开发，实现让程序同时执行多个任务也就是常说的“并发编程”，应该是程序员必备技能之一。为此，我们需要先讨论两个概念，一个叫进程，一个叫线程。
+ 几乎所有的操作系统都支持同时运行多个任务，一个任务就是一个程序，每一个运行中的程序就是一个进程。也就是说进程是处于运行中的程序。
+ 进程是系统进行资源分配和调度的一个独立单位。

进程就是操作系统中执行的一个程序，操作系统以进程为单位分配存储空间，每个进程都有自己的地址空间、数据栈以及其他用于跟踪进程执行的辅助数据，操作系统管理所有进程的执行，为它们合理的分配资源。进程可以通过fork或spawn的方式来创建新的进程来执行其他的任务，不过新的进程也有自己独立的内存空间，因此必须通过进程间通信机制（IPC，Inter-Process Communication）来实现数据共享，具体的方式包括管道、信号、套接字、共享内存区等。

所谓的线程就是CPU调度的执行单元。由于线程在同一个进程下，它们可以共享相同的上下文，因此相对于进程而言，线程间的信息共享和通信更加容易。当然在单核CPU系统中，真正的并发是不可能的，因为在某个时刻能够获得CPU的只有唯一的一个线程，多个线程共享了CPU的执行时间。使用多线程实现并发编程为程序带来的好处是不言而喻的，最主要的体现在提升程序的性能和改善用户体验，今天我们使用的软件几乎都用到了多线程技术，这一点可以利用系统自带的进程监控工具（如macOS中的“活动监视器”、Windows中的“任务管理器”）进行查看。

进程和线程拥有的资源对比见下图

<img src="picture/进程.png" width="400"> 

<img src="picture/线程.png" width="400"> 

+ Unix/Linux操作系统提供了一个fork()系统调用创建子进程
+ Python的os模块封装了常见的系统调用，其中就包括fork，通过fork可以在Python程序中创建子进程，特别注意这里只能针对Unix/Linux/Mac系统。因此如果写多进程的服务程序，Unix/Linux是最好的选择。
+ 由于Windows没有fork调用，因此不能使用fork创建子进程

### multiprocessing模块的Process类

multiprocessing模块是跨平台的多进程模块

In [None]:
import multiprocessing
help(multiprocessing.Process)

+ multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={})

- group：分组，实际上很少使用
- target：表示调用对象，可以传入方法的名字
- name：别名，相当于给这个进程取一个名字
- args：表示被调用对象的位置参数元组，比如target是函数a，他有两个参数m，n，那么args就传入(m, n)即可
- kwargs：表示调用对象的字典

创建新进程方式1： 以指定函数作为target，创建Process对象来创建新的进程

创建新进程方式2： 继承Process类，重写它的run()方法创建进程类，然后创建该进程类的实例来创建子进程

Tips：multiprocessing模块不支持交互模式，需要在 cmd 命令行下 输入 python processtest.py来运行程序

In [None]:
# processtest1.py
from multiprocessing import Process
import os,time

# 子进程要执行的代码
def fun(name):
    print('Run child process %s (%s)...' % (name, os.getpid()))
    print('hello', name)

if __name__=='__main__': #主程序
    print('Parent process %s.' % os.getpid())
    p = Process(target=fun, args=('zhang',)) #以指定函数创建子进程
    print('Child process will start.',p.name)
    p.start() #启动进程
    p.join() #当前进程等待子进程p执行完毕再向下继续执行
    print('Parent process fininshed.')

In [None]:
# processtest2.py
import multiprocessing
import os

# 子进程要执行的代码
def fun(num):
    print('process %d ' % num)

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    for i in range(8):
        p = multiprocessing.Process(target=fun, args=(i+1,))
        p.start()
   
    # active_children() 方法获取目前所有的运行的进程
    for p in multiprocessing.active_children():
        print('child process name is: '+p.name+' child process id:'+str(p.pid))
    print('Waiting for All Child process fininshed.')
    for p in multiprocessing.active_children():
        p.join()
    print('Parent process fininshed.')

创建新进程方式2： 继承Process类，重写它的run()方法创建进程类，然后创建该进程类的实例来创建子进程

In [None]:
#processtest3.py
import multiprocessing
import os,time

# 子进程要执行的代码
class MySubProcess(multiprocessing.Process):
    def __init__(self,name):
        self.name = name
        super().__init__()
    #重写run()方法
    def run(self):
        print('Run child process %s (%s)...' % (self.name, os.getpid()))
        print('hello', self.name)

if __name__=='__main__': #主程序（也就是主进程）
    print('Parent process %s.' % os.getpid())
    p = MySubProcess('zhang') #创建子进程实例
    print('Child process will start.')
    p.start() #启动进程
    p.join() #当前进程等待子进程p执行完毕再向下继续执行
    print('Parent process fininshed.')

### 进程池

可以用进程池的方式批量创建和管理子进程

In [None]:
import multiprocessing 
help(multiprocessing.Pool())

In [None]:
#pool.py
from multiprocessing import Pool
import os, time, random
def fun(num):
    print('child process %s Running (%s)' % (num,os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('child process %s runs %0.2f seconds.' % (num, (end - start)))
if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
#   #创建多个进程，可以同时执行的进程数量。默认大小为CPU的内核数量
    p = Pool(5)
    for i in range(8):
        p.apply_async(fun, args=(i,)) #apply_async是异步非阻塞式，由操作系统调度来进行进程切换。
#         p.apply(fun, args=(i,))      #apply是阻塞方式
    print('Waiting for All child process finished...')
   #关闭进程池。调用该方法后，该进程池不能再接收新任务，
   #它会在当前进程池中的所有任务执行完成之后再关闭自己
  #在调用join()之前必须先close
    p.close()
    #进程池对象调用join，会等待进程池中所有的子进程结束完毕再去结束父进程
    p.join()
    print('All child process finished.')

使用with语句管理进程池

In [None]:
#poolmap.py
import multiprocessing
import os,time

def fun(num):
    mysum = 0
    for i in range(num):
        print('(%s)process is running: %d' %(os.getpid(),i))
        mysum += i
    return mysum

if __name__=='__main__':
    #使用with语句进行上下文管理
    with multiprocessing.Pool(4) as pool:
        #使用进程执行map计算
        #元组由3个元素，启动三个进程执行函数fun
        results = pool.map(fun,(3,6,9))
        for item in results:
            print(item)

### ProcessPoolExecutor并发编程

+ 计算密集型的任务：主要消耗CPU资源，比如大素数计算、圆周率计算、对视频高清解码等。
因此，代码运行效率至关重要。
+ 下面通过这个大素数计算的例子，通过使用ProcessPoolExecutor类进行并发编程提高效率。

In [None]:
#PrimeNoWithfuture.py
from time import time
import math
PRIMES = [
    112272535095293, #15位
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419] #16位

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main(): 
    start = time()
    for n in PRIMES:
        print('%d is prime? %s' % (n, is_prime(n)))
    end = time()
    print((end-start)/n)

if __name__ == '__main__':
    main()

In [None]:
import concurrent.futures
help(concurrent.futures.ProcessPoolExecutor())

In [None]:
#PrimeWithfuture.py
import concurrent.futures
import math
from time import time

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    start = time()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        #提交给pool的工作是一个函数，如果是map，提交方式为pool.map()
        for number, prime in zip(PRIMES, pool.map(is_prime, PRIMES)):
            print('%d is prime? %s' % (number, prime))
    end = time()
    print((end-start)/len(PRIMES))

if __name__ == '__main__':
    main()

In [None]:
help(zip)

一个 ProcessPoolExecutor 创建N个进程， 默认N是系统上面可用CPU的个数。这个处理池会一直运行到with块中最后一个语句执行完成， 然后处理池被关闭。不过，程序会一直等待直到所有提交的工作被处理完成。
+ 比较设置N=2,N=4,N=8,N=16的效率

从比较可以看出，计算密集型任务虽然也可以用多任务完成，但是任务越多，花在任务切换的时间就越多，CPU执行任务的效率就越低，所以，一般情况下，要最高效地利用CPU，计算密集型任务同时进行的数量应当等于CPU的核心数。

### 进程间的通信

In [None]:
#pingpang1.py
from multiprocessing import Process
from time import sleep

counter = 0
def sub_task(string):
    global counter
    while counter < 5:
        print(string, end='', flush=True)
        counter += 1
        sleep(0.01)
        
def main():
    Process(target=sub_task, args=('Ping', )).start()
    Process(target=sub_task, args=('Pong', )).start()
    
if __name__ == '__main__':
    main()

In [None]:
#pingpang2.py
from multiprocessing import Process
from time import sleep

counter = 0
def sub_task(string):
    global counter
    while counter < 5:
        print(string, end='', flush=True)
        counter += 1
        sleep(0.01)
        
def main():
    p1 = Process(target=sub_task, args=('Ping', ))
    p2 = Process(target=sub_task, args=('Pong', ))
    p1.start()
    p1.join()
    p2.start()
    p2.join()

if __name__ == '__main__':
    main()

每个子进程有自己独立的内存空间，这也就意味着两个子进程中各有一个counter变量，每个子进程有自己独立的内存空间，这也就意味着两个子进程中各有一个counter变量.

+ 操作系统提供了多种进程之间的通信方式
+ Python的multiprocessing模块包装了底层的机制，提供了Queue、Pipes、Semaphore等多种方式来交换数据。

注意区分multiprocessing.Queue和queue.Queue
+ 不同点：前者为进程提供服务，后者为线程提供服务；
+ 相同点：他们都提供了put(),get(),full(),empty(),qsize()等方法

第一个示例：子进程向Queue中写入数据，父进程在Queue中读取数据

In [None]:
# queuecommu1.py
from multiprocessing import Process,Queue,Pipes
import os

def fun(q):
    print('(%s) 进程向队列中写入数据...'% os.getpid())
    q.put('hello')

def main():
    #创建进程通信的Queue
    q = Queue()
    #创建子进程
    p = Process(target=fun,args=(q,))
    p.start()
    print('(%s) 进程开始从队列这读取数据...' % os.getpid())
    #读取数据
    print(q.get())
    p.join()
    
if __name__=='__main__':
    main()

In [None]:
import multiprocessing
help(multiprocessing)

第二个示例：两个子进程之间的通信，一个子进程向Queue中写入数据，另一个子进程从Queue中读取数据

In [None]:
#queuecommu2.py

from multiprocessing import Process, Queue,current_process
import time, random

# 写数据进程:
def write(q):
    print('Process to write: %s' % current_process().pid)
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())

# 读数据进程:
def read(q):
    print('Process to read: %s' % current_process().pid)
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)

def main()
    # 父进程创建Queue，并传给各个子进程：
    q = Queue()
    pwrite = Process(target=write, args=(q,))
    pread = Process(target=read, args=(q,))
    # 启动子进程pwrite，向队列写入数据
    pwrite.start()
    # 启动子进程pread，从队列读取数据
    pread.start()
    # 等待pwrite结束:
    pwrite.join()
    # pread进程里是死循环，无法等待其结束，只能强行终止:
    pread.terminate()
    
if __name__=='__main__':
    main()

In [None]:

# multitask1.py
from multiprocessing import Process, Queue
from random import randint
from time import time

def task_handler(curr_list, result_queue):
    total = 0
    for number in curr_list:
        total += number
    result_queue.put(total)

def main():
    processes = []
    number_list = [x for x in range(1, 100000001)]
    result_queue = Queue()
    index = 0
    # 启动8个进程将数据切片后进行运算
    for _ in range(8):
        p = Process(target=task_handler,
                    args=(number_list[index:index + 12500000], result_queue))
        index += 12500000
        processes.append(p)
        p.start()
    # 开始记录所有进程执行完成花费的时间
    start = time()
    for p in processes:
        p.join()
    # 合并执行结果
    total = 0
    while not result_queue.empty():
        total += result_queue.get()
    print(total)
    end = time()
    print('Execution time: ', (end - start), 's', sep='')

if __name__ == '__main__':
    main()

In [None]:
#不采用多进程处理计算密集型任务，与上面cell进行比较
# multitask0.py
from time import time

def main():
    total = 0
    number_list = [x for x in range(1, 1000000001)]
    start = time()
    for number in number_list:
        total += number
    print(total)
    end = time()
    print('Execution time: %.3fs' % (end - start))

if __name__ == '__main__':
    main()

使用多进程后由于获得了更多的CPU执行时间以及更好的利用了CPU的多核特性，明显的减少了程序的执行时间，而且计算量越大效果越明显。

### 随堂educoder实训

随堂测试-进程：https://www.educoder.net/shixuns/3xc8ptyf/challenges

## 多线程

多任务可以由多进程完成，也可以由一个进程内的多线程完成。

多线程的实际应用场景：一个浏览器必须同时下载多张图片；一个web服务器必须同时响应多个用户请求等待。

### 课堂：threading模块

模块
+ 低级模块: _thread模块
+ 高级模块：threading模块对_thread进行了封装。大多数情况下，我们只需要使用threading这个高级模块

In [None]:
import threading
help(threading)
# print(threading.stack_size())  #查看当前线程栈大小
# threading.stack_size(64*1024) #设置当前栈大小
# print(threading.stack_size())
# print(threading.active_count()) #查看活动的线程数量
# print(threading.enumerate())    #枚举当前活动的线程，返回列表形式      
# print(threading.current_thread())

创建一个线程有以下两种方式：
+ 通过threading模块的Thread类的构造方法创建线程。把一个函数传给target创建Thread实例，然后调用start方法启动线程
+ 继承threading模块的Thread类创建线程类

In [None]:
import threading
help(threading.Thread)

In [None]:
#join方法
import threading
import time
def func(x, y):
    for i in range(x, y):
        print(i,end = ' ')
t1=threading.Thread(target = func, name = 'thread1',args = (1, 10))
t1.start()
t1.join()         #注释掉这里试试
t2=threading.Thread(target = func,name = 'thread2', args = (10, 15))
t2.start()
t2.join()          #注释掉这里试试

In [None]:
from threading import Thread
def music(par):
    for i in range(3):
        print("listening %s" %par)      
def movie(par):
     for i in range(3):
        print("watching %s" %par)
if __name__=='__main__':
    threads=[]
    t1=Thread(target=music,args=("学猫叫",))
    threads.append(t1)
    t2=Thread(target=movie,args=("流浪地球",))
    threads.append(t2)
    for t in threads:
        t.start()
        t.join()

In [None]:
import threading,time
def music(par):
    for i in range(3):
        print("listening %s" %par)
        time.sleep(0.01) #睡眠1秒
def movie(par):
     for i in range(3):
        print("watching %s" %par)
        time.sleep(0.1) #睡眠1秒
if __name__=='__main__':
    threads=[]
    t1=threading.Thread(target=music,args=("学猫叫",))
    threads.append(t1)
    t2=threading.Thread(target=movie,args=("流浪地球",))
    threads.append(t2)
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(t1.isAlive())
    print(t2.isAlive()) #测试线程是否处于运行状态

In [None]:
#继承threading模块的Thread类自定义线程类
import threading
import time
class myThread(threading.Thread):
    def __init__(self, num, threadname):
        threading.Thread.__init__(self, name = threadname)  #使用未绑定方法调用父类的构造方法
        self.num = num
    def run(self):        
        time.sleep(2)     #阻塞线程s2秒
        print(self.num)

t = myThread(6, 'mythread')
t.daemon=True
t.start()
t.join()
print('thread %s ended.' % threading.current_thread().name)

### 课堂：同步锁Lock

+ 多进程中，同一个变量，各自有一份拷贝存在于每个进程中，互不影响
+ 多线程中，多线程共享一个全局变量可以实现线程间的通信。所有变量都由所有线程共享，但是当多个线程共享同一个变量（我们通常称之为“资源”）的时候，很有可能产生不可控的结果从而导致程序失效甚至崩溃。如果一个资源被多个线程竞争使用，那么我们通常称之为“临界资源”，对“临界资源”的访问需要加上保护，否则资源会处于“混乱”的状态。

<img src="picture/进程.png" width="400"> 

<img src="picture/线程.png" width="400"> 

In [None]:
#pingpang1.py
from multiprocessing import Process
from time import sleep
counter = 0
def sub_task(string):
    global counter
    while counter < 5:
        print(string, end='', flush=True)
        counter += 1
        sleep(0.01)     
def main():
    p1 = Process(target=sub_task, args=('Ping', )).start()
    p2 = Process(target=sub_task, args=('Pong', )).start()
if __name__ == '__main__':
    main()

In [3]:
#pingpangThreadJoin.py
from threading import Thread
from time import sleep
counter = 0
def sub_task(string):
    global counter
    while counter < 5:
        print(string, end='')
        counter += 1
        sleep(0.001) 
def main():
    p1 = Thread(target=sub_task, args=('Ping', ))
    p2 = Thread(target=sub_task, args=('Pong', ))
    p1.start()
#     p1.join()
    p2.start()
#     p2.join()
if __name__ == '__main__':
    main()

PingPongPingPongPongPing

线程安全经典问题：银行取钱

In [None]:

from threading import Thread,current_thread
from time import sleep
# 假定这是银行存款:
balance = 1000
def draw(account):
    global balance
    if (balance>=account):
        print(current_thread().name +'取钱成功，取出钱数为：'\
              + str(account))
        sleep(0.0001) #休眠，模拟线程调度暂停
        balance = balance - account
        print( '\n账号余额为：' + str(balance))
    else:
        print(current_thread().name +'账号取钱时余额不足')
#创建两个线程模拟两个用户从账号取钱  
t1 = Thread(target=draw, name = '甲', args=(600,))
t2 = Thread(target=draw, name = '乙', args=(600,))
t1.start()
t2.start()
t1.join()
t2.join()

运行多次可能会出现了负值，这是由于线程调度的不确定性引起的。如果设置了休眠，第一个用户满足取钱条件，取出了钱，但是还没有修改余额，这时第二个用户线程取钱同样满足取钱条件，也会取出钱，因此总是会出现负值。

线程同步：threading模块提供了Lock/RLock两个类，他们都提供了以下两个方法实现加锁和释放锁。
+ acquire(locking=True,timeout=-1)方法：获取锁，timeout设置加锁多少秒
+ release()方法：释放锁


In [None]:
help(threading.Lock())

In [None]:
from threading import Thread,current_thread
from time import sleep
balance = 1000 # 假定这是你的银行存款:
lock = threading.Lock() #创建一个Lock锁
def draw(draw_account):
    global balance
    lock.acquire() #获取锁
    try:
        if (balance>=draw_account):
            print(current_thread().name +'取钱成功，取出钱数为：'+ str(draw_account))
            sleep(0.001) #休眠，模拟线程调度暂停
            balance = balance - draw_account
            print( '\n账号余额为：' + str(balance))
        else:
            print(current_thread().name +'账号取钱时余额不足')
    finally:
        lock.release() #释放锁

#创建两个线程模拟两个用户从账号取钱  
t1 = Thread(target=draw, name = '甲', args=(300,))
t2 = Thread(target=draw, name = '乙', args=(800,))
t1.start()
t2.start()
t1.join()
t2.join()

In [None]:
import time, threading
balance = 1000 # 假定这是你的银行存款:
lock = threading.Lock() #创建一个Lock锁
def draw(draw_account):
    global balance
    lock.acquire() #获取锁
    if (balance>=draw_account):
        print(threading.current_thread().name +'取钱成功，取出钱数为：'+ str(draw_account))
        time.sleep(0.0001) #休眠，模拟线程调度暂停
        balance = balance - draw_account
        print( '\n账号余额为：' + str(balance))
    else: 
        print(threading.current_thread().name +'账号取钱时余额不足')

#创建两个线程模拟两个用户从账号取钱  
t1 = threading.Thread(target=draw, name = '甲', args=(600,))
t2 = threading.Thread(target=draw, name = '乙', args=(600,))
t1.start()
t2.start()
t1.join()
t2.join()

+ 通过锁提供了对共享临界资源的独占访问，当多个线程同时执行lock.acquire()时，只有一个线程能成功地获取锁，然后继续执行代码，其他线程就继续等待直到获得锁为止。
+ 获得锁的线程用完后一定要释放锁，否则那些苦苦等待锁的线程将永远等待下去，成为死线程。所以我们用try...finally来确保锁一定会被释放。

### 死锁

使用锁有优缺点：
优点：解决了临界资源问题；
缺点：
+ 包含锁的那段代码实际上是以单线程模式执行，不能实现我们希望的多线程并发运行，因此效率低。
+ 可能引起死锁。如果有多个临界资源需要加锁，不同的线程持有不同临界资源的锁，并试图获取对方持有的锁时，可能会造成死锁，导致多个线程全部挂起，既不能执行，也无法结束，只能靠操作系统强制终止。

In [4]:
from threading import Thread
from time import sleep

class Account: 
    def __init__(self, id, balance, lock): 
        self.id = id 
        self.balance = balance 
        self.lock = lock 
    def withdraw(self, amount): 
        self.balance -= amount 
    def deposit(self, amount): 
        self.balance += amount 

def transfer(src, dst, amount): 
        if src.lock.acquire():#锁住自己的账户 
            src.withdraw(amount) 
        sleep(1)#让交易时间变长，2个交易线程时间上重叠，有足够时间来产生死锁 
        print( 'wait for lock...')
        if dst.lock.acquire():#锁住对方的账户 
            dst.deposit(amount) 
            dst.lock.release() 
        src.lock.release() 
        print( 'finish...')

a = Account('a',1000, threading.Lock()) 
b = Account('b',1000, threading.Lock()) 
Thread(target = transfer, args = (a, b, 100)).start() #a转账100到b
Thread(target = transfer, args = (b, a, 200)).start() #b转账200到a

wait for lock...
wait for lock...


编程时应尽量避免死锁，解决死锁的方法：
+ 尽量避免同一个线程对多个Lock进行锁定
+ 多个线程用多个Lock进行锁定时，采用相同的顺序获取锁
+ 线程在获取锁时，设定acquire()方法的timeout参数，限定锁的时间，超过timeout会自动释放锁
+ 设计死锁检测算法

GIL锁(Global Interpreter Lock)：全局解释锁

## 线程通信

### 使用Condition实现线程通信

In [5]:
from threading import Thread,Condition
help(Condition)

Help on class Condition in module threading:

class Condition(builtins.object)
 |  Condition(lock=None)
 |  
 |  Class that implements a condition variable.
 |  
 |  A condition variable allows one or more threads to wait until they are
 |  notified by another thread.
 |  
 |  If the lock argument is given and not None, it must be a Lock or RLock
 |  object, and it is used as the underlying lock. Otherwise, a new RLock object
 |  is created and used as the underlying lock.
 |  
 |  Methods defined here:
 |  
 |  __enter__(self)
 |  
 |  __exit__(self, *args)
 |  
 |  __init__(self, lock=None)
 |      Initialize self.  See help(type(self)) for accurate signature.
 |  
 |  __repr__(self)
 |      Return repr(self).
 |  
 |  notify(self, n=1)
 |      Wake up one or more threads waiting on this condition, if any.
 |      
 |      If the calling thread has not acquired the lock when this method is
 |      called, a RuntimeError is raised.
 |      
 |      This method wakes up at most n of th

In [None]:
from time import sleep
from threading import Thread,Condition
class Putter(Thread): #自定义生产者线程类
    def __init__(self, threadname):
        Thread.__init__(self,name=threadname) 
    def run(self):
        global count
        while True:
            if con.acquire():
                if count > 100: # 当count 小于100 的时候进行生产
                    con.wait()
                else:
                    count = count + 50
                    print(self.name+' produce 50, count='\
                          + str(count))
                    # 完成生成后唤醒waiting状态的线程，
                    # 从waiting池中挑选一个线程，
                    # 通知其调用acquire方法尝试取到锁
                    con.notify()
                con.release()
class Taker(Thread): #自定义消费者线程类
    def __init__(self, threadname):
        Thread.__init__(self, name =threadname)  
    def run(self):
        global count
        while True: 
            if con.acquire(): # 当count 大于100的时候进行消费
                if count < 100:
                    con.wait()
                else:
                    count = count - 20
                    print(self.name+' consume 20, count='\
                          + str(count))
                    con.notify()
                        # 完成生成后唤醒waiting状态的线程，
                        # 从waiting池中挑选一个线程，
                        # 通知其调用acquire方法尝试取到锁
                con.release()
                sleep(1)

count = 200
con = Condition() #创建condition对象
def main():
    for i in range(2):
        p = Putter('Putter')
        p.start()
    for i in range(3):
        t = Taker('Taker')
        t.start()
if __name__ == '__main__':
    main()

In [None]:
from time import sleep
from threading import Thread,Condition
class Putter(Thread): #自定义生产者线程类
    def __init__(self, threadname):
        Thread.__init__(self,name=threadname) 
    def run(self):
        global count
        while True:
            with con: 
                if count > 100: # 当count小于100生产
                    con.wait()
                else:
                    count = count+50
                    print(self.name+' produce 50, count='\
                          + str(count))
                    # 完成生成后唤醒waiting状态的线程，
                    # 从waiting池中挑选一个线程，
                    # 通知其调用acquire方法尝试取到锁
                    con.notify()
class Taker(Thread): #自定义消费者线程类
    def __init__(self, threadname):
        Thread.__init__(self, name =threadname)  
    def run(self):
        global count
        while True:
            with con:  # 当count 大于等于100时消费
                if count < 100:
                    con.wait()
                else:
                    count = count-20
                    print(self.name+' consume 20, count='\
                          + str(count))
                    con.notify()
                    # 完成生成后唤醒waiting状态的线程，
                    # 从waiting池中挑选一个线程，
                    # 通知其调用acquire方法尝试取到锁
                sleep(1)

count = 200
con = Condition() #创建condition对象
def main():
    for i in range(2):
        p = Putter('Putter')
        p.start()
    for i in range(3):
        t = Taker('Taker')
        t.start()
if __name__ == '__main__':
    main()

In [None]:
from threading import Thread,Condition
from random import randint
from time import sleep
class Account(Thread):
    def __init__(self,ID,threadname):
        Thread.__init__(self,name=threadname)
        self.ID=ID
    def put(self):
        global balance
        count=randint(1,100)
        if(con.acquire()):
            balance=balance+count
            print("%s put %d balance: %d" \
                  %(self.name,count,balance))
            con.notify()
            con.release()
    def take(self):
        global balance
        if(con.acquire()):
            count=randint(1,100)
            if(balance<count):
                con.wait()
            else:
                balance-=count
                print("%s take %d balance: %d"\
                      %(self.name,count,balance))
                con.notify()
            con.release()
            #sleep(1)
con = Condition() 
balance = 100
def test():
    taker=Account("ID1","taker")
    putter=Account("ID2","putter")
    while(True):
        taker.take()
        putter.put()
if __name__=="__main__":
    test()

taker take 19 balance: 81
putter put 4 balance: 85
taker take 80 balance: 5
putter put 40 balance: 45
Taker consume 20, count=130
Taker consume 20, count=110
Taker consume 20, count=90
putter put 10 balance: 55
taker take 21 balance: 34
putter put 29 balance: 63


In [None]:
from threading import Thread,Condition
from random import randint
from time import sleep

class Accout:
    def __init__(self,user,count):
        self.user = user
        self.count = count
    def put_money(self):
        global temp_list
        while(True):
            if con.acquire():
                if len(temp_list) > 1:
                    con.wait()
                else:
                    temp_list.append(self.count)
                    print(self.user + '余额为'\
                          + str(self.count))
                    con.notify()
            con.release()
            sleep(3)
    def get_money(self):
        global temp_list
        while(True):
            if con.acquire():
                if len(temp_list)==0:
                    con.wait()
                else:
                    print(self.user + '取出'\
                          + str(temp_list[0]))
                    del temp_list[0]
                    con.notify()
                con.release()
                sleep(3)

    def put_process(self):
        t1 = Thread(target=Accout.put_money, args=(self,))
        t1.start()
    def get_process(self):
        t2 = Thread(target=Accout.get_money,args=(self,))
        t2.start()
con = threading.Condition()
temp_list = []
def test():
    while True:
        count1 = randint(1, 100)
        p = Accout('账号', count1)
        p.put_process()
        count2 = 0
        q = Accout('账号', count2)
        q.get_process()
if __name__ ==  '__main__':
    test()

### 使用queue实现线程通信

queue模块下有三种队列类，分别如下：
+ queue.Queue：先进先出FIFO的常规队列
+ queue.LifoQueue：后进先出的队列
+ queue.PriorityQueue：优先级队列，优先级最小的先出队

三种队列类提供的方法基本相同，下面以queue.Queue类为例：
+ qsize(self)：队列的实际大小
+ empty(self)：判定队列空，空返回True，否则返回False
+ full(self)：判定队列满，满返回True，否则返回False
+ put(self, item, block=True, timeout=None)：往队列中放入元素。如果队列满，block=True（阻塞方式），则当前线程被阻塞；如果队列满，block=False（非阻塞方式），则抛出异常queue.FULL
+ put_nowait(self, item)：往队列放入元素，采用非阻塞方式
+ get(self, block=True, timeout=None) ：往队列中取出元素。如果队列空，block=True（阻塞方式），则当前线程被阻塞；如果队列空，block=False（非阻塞方式），则抛出异常queue.EMPTY
+ get_nowait(self):往队列取出元素，采用非阻塞方式
+ task_done(self):前面的任务已经完成，用在队列的消费者线程中。get方法之后调用task_done方法告诉队列处理的任务完成了。
+ join(self)：队列阻塞，直到队列中所有的元素都被处理完毕。

In [None]:
import queue
help(queue.Queue)

In [None]:
from threading import Thread
from queue import Queue
from time import sleep
class Putter(Thread): #自定义生产者线程类
    def __init__(self, threadname):
        Thread.__init__(self,name=threadname) 
    def run(self):
        for i in range(5):
            sleep(2) # 等待随机时间，体现其他线程调度
            myQueue.put(i)
            print(self.getName(),' put ', i,\
                  ' to queue.' + '\n')
        myQueue.put(None)  # None表示生产者线程结束
class Taker(Thread): #自定义消费者线程类
    def __init__(self, threadname):
        Thread.__init__(self,name=threadname)    
    def run(self):
        while True:
            sleep(3) # 等待随机时间，体现其他线程调度
            item = myQueue.get()
            if item is None:
                break
            print(self.getName(),' get ', item,\
                  ' from queue.'+ '\n')
myQueue = Queue()# 创建队列
p = Putter('Putter') # 创建生产者线程
t = Taker('Taker') # 创建消费者线程
p.start()
t.start()
p.join()
t.join()

In [None]:
from threading import Thread,current_thread
from queue import Queue
from time import sleep
def worker():
    while True:
        item = q.get()
        if item is None:
            break
        print(current_thread().name +' worker get :'+ item+ '\n')
        sleep(0.1)
        q.task_done() #注意这里   
q = Queue()
for item in ['A','B','C','D','E','F']:
    q.put(item)
threads = []
for i in range(5):
    t = Thread(target=worker)
    t.start()
    threads.append(t)    
q.join() # 阻塞，直到队列中所有元素都被处理完毕
# for i in range(5): #放入None，停止worker
#     q.put(None)
for t in threads:  #等待所有线程结束
    t.join()
print('thread %s ended.' % current_thread().name)

### 使用Event实现线程通信

In [None]:
import threading
help(threading.Event)

主线程先尝试连接服务器，如果正常的话，触发事件，各工作线程会尝试连接服务。

In [None]:
from threading import Thread,Event,current_thread
from time import sleep
def worker(event):
    print(current_thread().name,'waiting for server\n')
    event.wait()
    print(current_thread().name,'server is ready\n ')
    sleep(1)

server_ready = Event() #创建Event对象
for i in range(3):
    t = Thread(target=worker, args=(server_ready,))
    t.start()
print(current_thread().name,"connecting server\n")
sleep(3)
server_ready.set() 

In [None]:
from threading import  Thread,Event
from time import sleep,ctime
from logging import basicConfig,debug,DEBUG

basicConfig(level=DEBUG,format=\
            '(%(threadName)-10s) %(message)s',)
def worker(event):
    while not event.is_set():
        debug('Waiting for server ready...')
        event.wait(1)
        debug('server ready, [%s]', ctime())
    sleep(1)
readis_ready = Event()
for i in range(3):
    t = Thread(target=worker, args=(readis_ready,))
    t.start()
debug('connecting server')
sleep(3) 
readis_ready.set()

模拟多线程连接Redis服务器，并使用logging模块打印日志信息

In [None]:
from threading import  Thread,Event
from time import sleep,ctime
from logging import basicConfig,debug,DEBUG
  
# basicConfig(level= DEBUG, format='(%(threadName)) %(message)s',)
def worker(event):
    debug('Waiting for redis ready...')
    event.wait()
    debug('redis ready [%s]', ctime())
    sleep(1)
readis_ready = Event()
for i in range(3):
    t = Thread(target=worker, args=(readis_ready,))
    t.start()

debug('check redis server')
# sleep(3) 
readis_ready.set()

设置wait方法的超时参数，如果Redis服务器一直没有启动，通过打印日志信息来不断地告诉子线程当前没有一个可以连接的Redis服务。

In [None]:
import threading,time,logging
  
logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s',)
def worker(event):
    while not event.is_set():
        logging.debug('Waiting for redis ready...')
        event.wait(1)
        logging.debug('Redis ready, [%s]', time.ctime())
    time.sleep(1)
readis_ready = threading.Event()
for i in range(3):
    t = threading.Thread(target=worker, args=(readis_ready,))
    t.start()
logging.debug('first of all, check redis server, make sure it is OK, and then trigger the redis ready event')
time.sleep(3) 
readis_ready.set()

Tips：Event不包含lock，如果要实现线程同步，需要额外的lock对象

### 随堂educoder实训

## 课堂：Thread-Local Data

+ 在多线程环境下，如果一个线程对全局变量进行了修改，将会影响到其他所有的线程。为了避免多个线程同时对变量进行修改，引入了线程同步机制，通过互斥锁，条件变量等来控制对全局变量的访问。
+ 但是在很多时候线程也需要拥有自己的私有数据，可以使用局部变量方式，同时python 还提供了 ThreadLocal 变量，它本身是一个全局变量，但是每个线程却可以利用它来保存属于自己的私有数据，这些私有数据对其他线程也是不可见的。

In [1]:
from threading import Thread,current_thread
import threading
import time
# 创建全局ThreadLocal对象:
local_data = threading.local()
def fun(n):
    for i in range(n):
        try:
            local_data.num += i
        except:
            local_data.num = i
        time.sleep(0.1)
        print('\n%s local_data is : %s' % (current_thread().name, local_data.num))
threads=[]
t1 = Thread(target=fun,name='Thread1',args=(5,))
threads.append(t1)
t2 = Thread(target=fun,name='Thread2',args=(5,))
threads.append(t2)
for t in threads:
    t.start()
for t in threads:
    t.join()


Thread2 local_data is : 0
Thread1 local_data is : 0


Thread1 local_data is : 1
Thread2 local_data is : 1


Thread1 local_data is : 3
Thread2 local_data is : 3


Thread2 local_data is : 6
Thread1 local_data is : 6


Thread2 local_data is : 10
Thread1 local_data is : 10



In [None]:
#pingpang_thread-local.py
from threading import Thread,local
from time import sleep
counter = local()
def sub_task(string):
    counter = 0
    while counter < 5:
        print(string, end='', flush=True)
        counter += 1
        sleep(0.01)     
def main():
    p1 = Thread(target=sub_task, args=('Ping', )).start()
    p2 = Thread(target=sub_task, args=('Pong', )).start()
if __name__ == '__main__':
    main()

## 课堂：ThreadPoolExecutor并发编程

+ 由于启动新线程时，涉及到和操作系统的交互，因此启动新线程的成本比较高，为了提高性能，可以使用线程池来管理线程。
+ 线程池在启动时创建大量的空闲线程，我们的程序只要将一个函数提交线程池，线程池就会启动一个空闲的线程来执行它。当函数结束后，这个线程并不会死亡，而是回到线程池中称为空闲状态，等待执行下一个函数。通过线程池可以控制系统中并发线程的数量。

In [None]:
from concurrent.futures import ThreadPoolExecutor
help(ThreadPoolExecutor())

In [None]:
#ThreadPool
from threading import current_thread,local
from concurrent.futures import ThreadPoolExecutor
from time import sleep
# 创建全局ThreadLocal对象:
local_data = local()
def fun(n):
    for i in range(n):
        try:
            local_data.num += i
        except:
            local_data.num = i
        print('\n%s local_data is : %s' % (current_thread().name, local_data.num))
    return local_data.num
        
pool = ThreadPoolExecutor(max_workers=2)

task1 = pool.submit(fun,5) #提交一个任务
task2 = pool.submit(fun,10) #提交一个任务
sleep(1)

print("task1:",task1.done())
print("task2:",task2.done())
pool.shutdown()

通过with语句管理上下文

In [None]:
from threading import current_thread,local
from concurrent.futures import ThreadPoolExecutor
# 创建全局ThreadLocal对象:
local_data = local()
def fun(n):
    for i in range(n):
        try:
            local_data.num += i
        except:
            local_data.num = i
        print('\n%s local_data is : %s' % (current_thread().name, local_data.num))
    return local_data.num
        
with ThreadPoolExecutor(max_workers=2) as pool: #创建包含两个线程的线程池
    task1 = pool.submit(fun,5) #提交一个任务
    task2 = pool.submit(fun,10) #提交另一个任务
print("task1:",task1.done())
print("task2:",task2.done())

In [None]:
#ThreadPoolExp3.py
from threading import current_thread,local
from concurrent.futures import ThreadPoolExecutor
# 创建全局ThreadLocal对象:
local_data = local()
def fun(n):
    for i in range(n):
        try:
            local_data.num += i
        except:
            local_data.num = i
        print('\n%s local_data is : %s' % (current_thread().name, local_data.num))
    return local_data.num

def main():
    #创建包含三个线程的线程池 
  
    pool = ThreadPoolExecutor(max_workers=2)
    for i in pool.map(fun,(5,10)): #使用map方法启动线程
        print(i)

if __name__ == '__main__':
    main()

In [None]:
#PrimeWithfuture.py
import concurrent.futures
import math
from time import time

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    start = time()
    with concurrent.futures.ThreadPoolExecutor() as pool:
        #提交给pool的工作是一个函数，如果是map，提交方式为pool.map()
        for number, prime in zip(PRIMES, pool.map(is_prime, PRIMES)):
            print('%d is prime? %s' % (number, prime))
    end = time()
    print((end-start)/len(PRIMES))

if __name__ == '__main__':
    main()

## 本章拓展相关

### 协程

现代操作系统对I/O操作的改进中最为重要的就是支持异步I/O。如果充分利用操作系统提供的异步I/O支持，就可以用单进程单线程模型来执行多任务，这种全新的模型称为事件驱动模型。Nginx就是支持异步I/O的Web服务器，它在单核CPU上采用单进程模型就可以高效地支持多任务。在多核CPU上，可以运行多个进程（数量与CPU核心数相同），充分利用多核CPU。用Node.js开发的服务器端程序也使用了这种工作模式，这也是当下实现多任务编程的一种趋势。