# threading 模块

hreading 模块API是面向对象的，其中最重要的是线程类Thread，

常用线程函数:

threading.active_count() :返回当前处于活跃状态的线程数

threading.current_thread() :返回当前的Thread对象

threading.main_thread() :返回主线程对象，主线程是Python解释器启动的线程

In [1]:
# coding=utf-8


import threading


t = threading.current_thread()

print(t.name)


print(threading.active_count())

t = threading.main_thread()

print(t.name)

MainThread
5
MainThread


# 创建线程

创建一个可执行的线程需要线程对象和线程体这两个要素。

线程对象:线程对象是threading模块线程类Thread所创建的对象。

线程体:线程体是线程执行函数，线程启动后会执行该函数，线程处理代码是在线程体中编写的。

提供线程体主要有一下两种方式。

(1)自定义函数作为线程体。

(2)继承Thread类重写run()方法，run()方法作为线程体。

## 自定义函数作为线程体

创建Thread对象时,可以通过Thread构造方法将一个自定义函数传递给它，

threading.Thread(target=None, name=None, args())  target参数是超线程体，自定义函数可以作为线程体;name 参数可以设置线程名，如果省略，Python解释器会自动分配一个名字；args 是为自定义函数提供参数，它是一个元组类型。

In [3]:
#coding=utf-8

import threading
import time

def thread_body():
    t = threading.current_thread()
    for n in range(5):
        print("第{0}次执行线程{1}".format(n,t.name))
        
        time.sleep(1)
        print("线程{0}执行完毕。".format(t.name))
        
def main():
    t1 = threading.Thread(target=thread_body)
    
    t1.start()
    
    t2 = threading.Thread(target=thread_body,name="MyThread")
    
    t2.start()
    
if __name__=='__main__':
    main()

第0次执行线程Thread-7
第0次执行线程MyThread
线程Thread-7执行完毕。
第1次执行线程Thread-7
线程MyThread执行完毕。
第1次执行线程MyThread
线程Thread-7执行完毕。
第2次执行线程Thread-7
线程MyThread执行完毕。
第2次执行线程MyThread
线程Thread-7执行完毕。
第3次执行线程Thread-7
线程MyThread执行完毕。
第3次执行线程MyThread
线程Thread-7执行完毕。
第4次执行线程Thread-7
线程MyThread执行完毕。
第4次执行线程MyThread
线程Thread-7执行完毕。
线程MyThread执行完毕。


## 继承Thread类重写run()方法，run()方法作为线程体。

创建一个Thread子类，并重写run方法

In [5]:
import threading
import time

class MyThread(threading.Thread):
    def __init__(self,name=None):
        super().__init__(name=name)
        
    def run(self):
        t = threading.current_thread()
        for n in range(5):
            print("第{0}次执行线程{1}".format(n,t.name))
        
            time.sleep(1)
            print("线程{0}执行完毕。".format(t.name))
def main():
    t1 = MyThread()
    t1.start()
    
    t2 = MyThread(name="MyThread")
    t2.start()
    
if __name__ == "__main__":
    main()

第0次执行线程Thread-8
第0次执行线程MyThread
线程Thread-8执行完毕。
第1次执行线程Thread-8
线程MyThread执行完毕。
第1次执行线程MyThread
线程Thread-8执行完毕。
第2次执行线程Thread-8
线程MyThread执行完毕。
第2次执行线程MyThread
线程Thread-8执行完毕。
第3次执行线程Thread-8
线程MyThread执行完毕。
第3次执行线程MyThread
线程Thread-8执行完毕。
第4次执行线程Thread-8
线程MyThread执行完毕。
第4次执行线程MyThread
线程Thread-8执行完毕。
线程MyThread执行完毕。


# 线程管理

线程管理包括线程创建、线程启动、线程休眠、等待线程结束和线程停止。

## 等待线程结束

当前线程调用t1线程的join()方法时会阻塞当前线程，等待t1线程结束如果等待超时，则当前线程会回到活动状态继续执行。

join(timeout=None)

使用join()方法的场景是，一个线程依赖于另外一个线程的运行结果，所以调用另一个线程的join()方法等它运行完成。

In [8]:
import threading
import time

value = 0

def thread_body():
    global value
    
    print("ThreadA 开始...")
    for n in range(2):
        print("ThreadA 执行...")
        value += 1
        time.sleep(1)
        print("ThreadA 结束...")

def main():
    print("主线程 开始...")
    t1 = threading.Thread(target=thread_body,name="ThreadA")
    t1.start()
    t1.join()
    print("value = {0}".format(value))
    print("主线程结束")
    
if __name__ == "__main__":
    main()

主线程 开始...
ThreadA 开始...
ThreadA 执行...
ThreadA 结束...
ThreadA 执行...
ThreadA 结束...
value = 2
主线程结束


## 线程停止

当线程体结束(即run()方法或执行函数结束)，线程就会停止。但是有些业务比较复杂，子线程中会有一个死循环，为了能够停止线程，应设置一个线程停止变量.

In [14]:
import threading
import time

isrunning = True

def thread_body():
    while isrunning:
        print("线程运行中...")
        time.sleep(1)
        
    print("执行完成")
    
def main():
    t1 = threading.Thread(target=thread_body,name="Mythread")
    print("开始线程{0}".format(t1.name))
    t1.start()
    
    command = input("请输入停止指令:exit")
    if command =="exit":
        global isrunning
        isrunning = False
        
if __name__=="__main__":
    main()

开始线程Mythread
线程运行中...
线程运行中...
请输入停止指令:exitexit
线程运行中...
执行完成


# 线程安全

在多线程环境下，访问相同的资源，有可能会引发线程不安全问题。

## 临界资源问题

多个线程同时运行，有时线程之间需要共享数据，一个线程需要其他线程的数据，否则就不能保证程序运行结果的正确性。

In [17]:
import threading
import time

class TicketDB:
    def __init__(self):
        self.ticket_count = 5
        
    def get_ticket_count(self):
        return self.ticket_count
    
    def sell_ticket(self):
        #TODO 等待用户付款
        #线程休眠，阻塞当前线程，模拟等待用户付款
        time.sleep(1)
        print("第{0}号票，已经售出".format(self.ticket_count))
        self.ticket_count -= 1
        
db = TicketDB()
def thread1_body():
    global db
    while True:
        curr_ticket_count = db.get_ticket_count()
        
        if curr_ticket_count > 0:
            db.sell_ticket()
        else:
            break
            
def thread2_body():
    global db
    while True:
        curr_ticket_count = db.get_ticket_count()
        
        if curr_ticket_count > 0:
            db.sell_ticket()
        else:
            break
def main():
    t1 = threading.Thread(target=thread1_body)
    t1.start()
    t2 = threading.Thread(target=thread2_body)
    t2.start()
    
if __name__=="__main__":
    main()

第5号票，已经售出
第4号票，已经售出
第3号票，已经售出
第2号票，已经售出
第1号票，已经售出
第0号票，已经售出


由上可见0号票被卖出，5张票卖了6次，出现超卖情况，根本原因在于多线程间共享的数据不一致性导致。临界资源未同步

## 多线程同步

Python 提供了“互斥”机制，可以对这些资源加上互斥锁，在任一时刻只能由一个线程访问，即使该线程出现阻塞，该对象的被锁定状态也不会解除，其他线程仍不能访问该对象，这就是多线程同步。线程同步是保证线程安全的重要手段，但是线程同步客观上会导致性能下降。

对于简单线程同步可以使用threading模块的Lock类。Lock对象有两种状态，即锁定 和 未锁定 ，默认是未锁定状态。Lock对象有acquire() 和 release()两个方法实现锁定和解锁，acquire()方法可以实现锁定，release()解锁.

In [19]:
import threading
import time

class TicketDB:
    def __init__(self):
        self.ticket_count = 5
        
    def get_ticket_count(self):
        return self.ticket_count
    
    def sell_ticket(self):
        #TODO 等待用户付款
        #线程休眠，阻塞当前线程，模拟等待用户付款
        time.sleep(1)
        print("第{0}号票，已经售出".format(self.ticket_count))
        self.ticket_count -= 1
        
db = TicketDB()
lock = threading.Lock() #创建锁对象

def thread1_body():
    global db,lock
    
    while True:
        lock.acquire()
        curr_ticket_count = db.get_ticket_count()
        
        if curr_ticket_count > 0:
            print("买家1要买第{0}号票",curr_ticket_count)
            db.sell_ticket()
            
        else:
            lock.release()
            break
        lock.release()
        time.sleep(1)
            
def thread2_body():
    global db,lock
    while True:
        lock.acquire()
        curr_ticket_count = db.get_ticket_count()
        
        if curr_ticket_count > 0:
            print("买家2要买第{0}号票",curr_ticket_count)
            db.sell_ticket()
        else:
            lock.release()
            break
        lock.release()
        time.sleep(1)
def main():
    t1 = threading.Thread(target=thread1_body)
    t1.start()
    t2 = threading.Thread(target=thread2_body)
    t2.start()
    
if __name__=="__main__":
    main()

买家1要买第{0}号票 5
第5号票，已经售出
买家2要买第{0}号票 4
第4号票，已经售出
买家1要买第{0}号票 3
第3号票，已经售出
买家2要买第{0}号票 2
第2号票，已经售出
买家1要买第{0}号票 1
第1号票，已经售出


# 线程间通信

如果两个线程之间有依赖关系，线程之间必须进行通信，互相协调才能完成工作。实现线程间通信，可以使用threading 模块中的Condition和Event类。

## 使用Condition实现线程间通信

Condition 被称为条件变量，Condition类提供了对复杂线程同步问题的支持，除了提供与Lock类相似的acquire() 和 release()方法外，还提供了wait()、notify() 和notify_all()方法，

wait(timeout=None): 是当前线程释放锁，然后当前线程进入等待状态，等待相同条件变量中其他线程唤醒或超时，timeout是设置超时时间。

notify():唤醒相同条件变量中的一个线程

notify_all():唤醒相同条件变量中的所有线程

In [31]:
#coding=utf-8

"""
经典的堆栈数据结构，一个线程生成一些数据，将数据压栈;另一个线程消费这些数据，将数据出栈。这两个线程相互依赖，当堆栈为空，消费线程无法取出
数据时，应通知生成线程添加数据；当堆栈已满，生成线程无法添加数据时，应该通知消费线程取出数据。
"""
import threading
import time
import random

#创建条件变量condition对象

condition = threading.Condition()

class Stack():
    def __init__(self):
        self.pointer = 0 #堆栈指针初始值为0
        self.data = [-1,-1,-1,-1,-1] #堆栈有5个数字的空间
        
    #压栈   
    def push(self,c):
        global condition
        condition.acquire()
        #堆栈已经满了不能压栈
        while self.pointer == len(self.data):
            #等待其他线程把数据出栈
            condition.wait()
            
        #通知其他线程把数据出栈
        condition.notify()
        #数据压栈
        self.data[self.pointer] = c
        #print(self.data)
        #指针向上移动
        self.pointer += 1
        condition.release()
    #出栈   
    def pop(self):
        global condition
        condition.acquire()
        #堆栈无数据不能出栈
        while self.pointer == 0:
            #等待其他线程进行压栈
            condition.wait()
        #通知其他线程压栈
        condition.notify()
        #指针向下移动
        self.pointer -= 1
        data = self.data[self.pointer]
        condition.release()
        #数据出栈
        return data

stack = Stack()

#生产者
def producer_thread_body():
    global stack
    #产生10个数字
    for i in range(1,6):
        #把数字压栈
        stack.push(i)
        print("生产数字:{0}".format(i))
        #每产生一个数字，线程睡眠1秒
        time.sleep(1)
#消费者       
def consumer_thread_body():
    global stack
    #从堆栈取出一个数字
    for i in range(1,6):
        #从堆中读取数字
        x = stack.pop()
        print("消费数字:{0}".format(x))
        time.sleep(1)
        
def main():
    producer = threading.Thread(target=producer_thread_body)
    producer.start()
    consumer = threading.Thread(target=consumer_thread_body)
    consumer.start()
    
if __name__ == "__main__":
    main()

生产数字:1
消费数字:1
生产数字:2
消费数字:2
生产数字:3
消费数字:3
生产数字:4
消费数字:4
生产数字:5
消费数字:5


## 使用Event实现线程间通信

使用Condition 实现线程间通信还是有些繁琐，threading模块提供的Event可是实现线程间通信。Event对象调用wait(timeout=None)方法会阻塞当前线程，是线程进入等待状态，知道另一个线程调用该Event对象的set()方法，通知所有等待线程恢复运行

In [33]:
#使用Event 重构堆栈实例
import threading
import time
import random

#创建条件变量condition对象

event = threading.Event()

class Stack():
    def __init__(self):
        self.pointer = 0 #堆栈指针初始值为0
        self.data = [-1,-1,-1,-1,-1] #堆栈有5个数字的空间
        
    #压栈   
    def push(self,c):
        global event
        #堆栈已经满了不能压栈
        while self.pointer == len(self.data):
            #等待其他线程把数据出栈
            event.wait()
            
        #通知其他线程把数据出栈
        event.set()
        #数据压栈
        self.data[self.pointer] = c
        #print(self.data)
        #指针向上移动
        self.pointer += 1
    #出栈   
    def pop(self):
        global event
        #堆栈无数据不能出栈
        while self.pointer == 0:
            #等待其他线程进行压栈
            event.wait()
        #通知其他线程压栈
        event.set()
        #指针向下移动
        self.pointer -= 1
        data = self.data[self.pointer]
        #数据出栈
        return data

stack = Stack()

#生产者
def producer_thread_body():
    global stack
    #产生10个数字
    for i in range(1,6):
        #把数字压栈
        stack.push(i)
        print("生产数字:{0}".format(i))
        #每产生一个数字，线程睡眠1秒
        time.sleep(1)
#消费者       
def consumer_thread_body():
    global stack
    #从堆栈取出一个数字
    for i in range(1,6):
        #从堆中读取数字
        x = stack.pop()
        print("消费数字:{0}".format(x))
        time.sleep(1)
        
def main():
    producer = threading.Thread(target=producer_thread_body)
    producer.start()
    consumer = threading.Thread(target=consumer_thread_body)
    consumer.start()
    
if __name__ == "__main__":
    main()

生产数字:1
消费数字:1
生产数字:2
消费数字:2
生产数字:3
消费数字:3
生产数字:4
消费数字:4
生产数字:5
消费数字:5


Event 实现线程间通信要比使用condition实现线程间通信简单。Event 不需要会用锁同步代码。