# 实现支持异步任务的线程池
很多语言都提供了线程池，但是我们不再应该局限于如何使用
![](images/2022-08-27-21-59-21.png)
![](images/2022-08-27-22-03-38.png)

## python 同步原语
1. 互斥量 
   1. lock=threading.Lock() 申请
   2. lock.acquire()        加锁
   3. lock.release()        解锁
2. 条件变量 pyhon中与互斥量一起使用
   1. condition=threading.Condition()   申请
   2. condiction.acquire()              加锁
   3. condition.release()               解锁
   4. condition.wait()                  等待
   5. condition.notify()                唤醒

## 实现线程安全的队列Queue
![](images/2022-08-27-22-14-47.png)
- 队列可能有多个线程同时工作，因此需要保证线程安全
  ![](images/2022-08-27-22-24-33.png)

In [1]:
#线程安全的队列
import queue,time
import threading

class ThreadSafeQueueException(Exception):
    pass

class TreadSafeQueue(object):
    def __init__(self,max_size=0):
        #0表示无限
        self.queue=[]
        self.max_size=0
        self.lock=threading.Lock()
        self.condition=threading.Condition()

    #获取当前元素的数量，size可能被多线程调用，因此要加解锁，以保证获取size安全
    def size(self):
        self.lock.acquire()
        size=len(self.queue)
        self.lock.release()
        return size

    #往队列加入元素
    def put(self,item):
        if self.max_size!=0 and self.size()>self.max_size:
            return ThreadSafeQueueException()
        self.lock.acquire()
        self.queue.append(item)
        self.lock.release()
        self.condition.acquire()
        self.condition.notify() 
        self.condition.release()
        pass
    #往队列批量加入元素
    def batch_put(self,item_list):
        #保证item_list是个列表
        if not isinstance(item_list,list):
            item_list=list(item_list)
        for item in item_list:
            self.put(item)

    #从队列取出元素
    def pop(self,block=False,timeout=0):
        if self.size()==0:
            #需要阻塞等待
            if block:
                self.condition.acquire()
                self.condition.wait(timeout=timeout)
                self.Condition.release()
            else:
                return None
        self.lock.acquire()
        item=None
        if len(self.queue)>0:
            item=self.queue.pop()
        self.lock.release()
        return item
    
    def get(self,index):
        self.lock.acquire()
        item=self.queue[index]
        self.lock.release()
        return item

if __name__ == '__main__':
    queue=TreadSafeQueue(max_size=100)
    def producer():
        while True:
            queue.put(1)
            print("生产者生产一个")
            time.sleep(3)

    def consumer():
        while True:
            item=queue.pop()
            print('消费者消费一个')
            print('get item from queue: ',item) 
            time.sleep(1)
           

    
    thread1=threading.Thread(target=producer)
    thread2=threading.Thread(target=consumer)
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()


        
    



生产者生产一个
消费者消费一个
get item from queue:  1
消费者消费一个
get item from queue:  None
消费者消费一个
get item from queue:  None
生产者生产一个
消费者消费一个
get item from queue:  1
消费者消费一个
get item from queue:  None
消费者消费一个
get item from queue:  None
生产者生产一个
消费者消费一个
get item from queue:  1
消费者消费一个
get item from queue:  None
消费者消费一个
get item from queue:  None
生产者生产一个
消费者消费一个
get item from queue:  1
消费者消费一个
get item from queue:  None
消费者消费一个
get item from queue:  None
生产者生产一个
消费者消费一个
get item from queue:  1
消费者消费一个
get item from queue:  None
消费者消费一个
get item from queue:  None
生产者生产一个
消费者消费一个
get item from queue:  1
消费者消费一个
get item from queue:  None
消费者消费一个
get item from queue:  None
生产者生产一个
消费者消费一个
get item from queue:  1
消费者消费一个
get item from queue:  None
消费者消费一个
get item from queue:  None
生产者生产一个
消费者消费一个
get item from queue:  1
消费者消费一个
get item from queue:  None
消费者消费一个
get item from queue:  None
生产者生产一个
消费者消费一个
get item from queue:  1
消费者消费一个
get item from queue:  None
消费者消费一个
get item from queue:  None
