In [1]:
import asyncio
import time
import tkinter as tk
import threading
import functools
import random
import aiohttp
from concurrent.futures import ThreadPoolExecutor

In [2]:
import nest_asyncio
nest_asyncio.apply()
# 如果不运行此命令,下面的循环事件运行协程函数会报错,原因如下:
# Jupyter连接着 IPython 内核，而 IPython 内核本身在事件循环上运行，而 asyncio 不允许嵌套其事件​​循环，因此会出现如上图的错误信息。

# FAQ:

In [None]:
# 1. 下面两种写法的区别: #python3.8 以前的版本,会自动蒋协程对象生成Task对象, python3.8以后会发出警告
async def main():   #入口函数
        done,pending=await asyncio.wait({sleep1(),sleep2(),sleep3()},return_when=asyncio.ALL_COMPLETED)
        for i in done:
            print(i)
        for j in pending:
            print(j)
            
async def main():
    tasks = [asyncio.create_task(num(i)) for i in range(10)]  # tasks = [num(i) for i in range(10)]??
    complete, pending = await asyncio.wait(tasks,timeout=0.5)
    for i in complete:
        print("当前数字:",i.result())

# 协程使用的基本结构:

# 协程函数(coroutine): async

In [3]:
async def myprint():   # 前面用 async装饰的函数叫协程函数
    await asyncio.sleep(3)
    print("end")

In [None]:
obj = myprint()

In [None]:
asyncio.iscoroutine(obj)

In [None]:
asyncio.iscoroutinefunction(myprint)

In [4]:
await myprint()   #它可以工作,是因为jupyter的Ipython的内核本身是在事件循环上运行的

end


# event_loop: 事件循环

# awaitable对象:即可暂停等待的对象

# Task: 任务

# Future: 

# asyncio的基本架构:

## 常见的一些高层API方法

### 运行异步协程

In [None]:
1. asyncio.run(aws,*,debug=False)   #运行一个一步协程,python3.7新添加的内容
2. 创建循环事件对象,然后运行协程函数: 见下面介绍:
    loop = asyncio.get_event_loop()
    loop.run_until_complete(coroutine())

In [4]:
async def say_after_time(delay,what):
    await asyncio.sleep(delay)
    print(what)
    
async def main():
    print(f"开始时间为:{time.time()}")
    await say_after_time(1,"Hello")
    await say_after_time(1,'world')
    print(f"结束时间为:{time.time()}")
          
# loop=asyncio.get_event_loop()   # 创建事件循环对象       # loop = asyncio.new_event_loop()  #等价, 创建新的事件循环
# loop.run_until_complete(main())    #通过事件循环对象运行协程函数
# loop.close()
          
# 或者:
asyncio.run(main())   #python3.7 新增

开始时间为:1671064015.9909418
Hello
world
结束时间为:1671064018.011947


In [None]:
# 如果单独执行普通函数那样运行一个协程函数,只会返回一个coroutine对象:
main()

#### 创建事件循环对象

#### 运行协程函数

### asyncio.create_task: 创建任务:

### asyncio.sleep:睡眠

### asyncio.wait: 多个协程函数的等候

In [4]:
%%time
async def sleep1():  #大约1秒
        print("sleep1 begin")
        await asyncio.sleep(1)
        print("sleep1 end")

async def sleep2():  #大约2秒
        print("sleep2 begin")
        await asyncio.sleep(2)
        print("sleep2 end")
    
async def sleep3():  #大约3秒
        print("sleep3 begin")
        await asyncio.sleep(3)
        print("sleep3 end")
    
async def main():   #入口函数
        tasks = [
                 asyncio.create_task(sleep1()),
                 asyncio.create_task(sleep2()),
                 asyncio.create_task(sleep3()),
                ]
        done,pending=await asyncio.wait(tasks,return_when=asyncio.FIRST_COMPLETED)
        for i in done:
            print(i)
        for j in pending:
            print(j)
asyncio.run(main()) #运行入口函数

# 可以试着不同的 return_when的参数

sleep1 begin
sleep2 begin
sleep3 begin
sleep1 end
<Task finished name='Task-6' coro=<sleep1() done, defined at <timed exec>:1> result=None>
<Task pending name='Task-8' coro=<sleep3() running at <timed exec>:13> wait_for=<Future pending cb=[Task.__wakeup()]>>
<Task pending name='Task-7' coro=<sleep2() running at <timed exec>:8> wait_for=<Future pending cb=[Task.__wakeup()]>>
Wall time: 1.01 s
sleep2 end
sleep3 end


In [27]:
async def num(n):
    try:
        await asyncio.sleep(n*0.1)
        return n
    except asyncio.CancelledError:
        print(f"数字{n}被取消")
        raise
        
async def main():
    tasks = [asyncio.create_task(num(i)) for i in range(10)]  # tasks = [num(i) for i in range(10)]??
    result = await asyncio.wait(tasks,timeout=0.5)
    print(result)
    # for i in complete:
    #     print("当前数字:",i.result())
    # if pending:
    #     print("取消未完成的任务")
    #     for p in pending:
    #         p.cancel()
            

In [28]:
loop = asyncio.get_event_loop()
# try:
loop.run_until_complete(main())
# finally:
#     loop.close()

({<Task finished name='Task-126' coro=<num() done, defined at <ipython-input-27-e8537d604d28>:1> result=4>, <Task finished name='Task-123' coro=<num() done, defined at <ipython-input-27-e8537d604d28>:1> result=1>, <Task finished name='Task-127' coro=<num() done, defined at <ipython-input-27-e8537d604d28>:1> result=5>, <Task finished name='Task-124' coro=<num() done, defined at <ipython-input-27-e8537d604d28>:1> result=2>, <Task finished name='Task-125' coro=<num() done, defined at <ipython-input-27-e8537d604d28>:1> result=3>, <Task finished name='Task-122' coro=<num() done, defined at <ipython-input-27-e8537d604d28>:1> result=0>}, {<Task pending name='Task-129' coro=<num() running at <ipython-input-27-e8537d604d28>:3> wait_for=<Future pending cb=[Task.__wakeup()]>>, <Task pending name='Task-131' coro=<num() running at <ipython-input-27-e8537d604d28>:3> wait_for=<Future pending cb=[Task.__wakeup()]>>, <Task pending name='Task-130' coro=<num() running at <ipython-input-27-e8537d604d28>:3

### asyncio.gather: 并发运行多个任务

In [25]:
async def main():
    tasks = [asyncio.create_task(num(i)) for i in range(10)]   #调用例2中的 num()协程函数
    complete = await asyncio.gather(*tasks)
    print(complete)

In [26]:
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]


### gather和await的区别:

In [None]:
https://blog.csdn.net/weixin_39791653/article/details/111700769

In [None]:
返回值:
    gather: 结果列表,且是和传入的task的准许是对应的
    wait: 包含两个集合的元组, ({done1,done2,...},
                              {pending1,pending2,...}
                             )
终止:
    gather: 无法取消
    wait: 可以用 .cancel() 进行取消

In [23]:
async def num(n):
    try:
        await asyncio.sleep(n*0.1)
        return n
    except asyncio.CancelledError:
        print(f"数字{n}被取消")
        raise
        
async def main():
    tasks = [asyncio.create_task(num(i)) for i in range(10)] 
    complete = await asyncio.gather(*tasks)
    for done in complete:
        print("实例:",done)
        # print("当前数字:",done.result())


In [24]:
asyncio.run(main())

实例: 0
实例: 1
实例: 2
实例: 3
实例: 4
实例: 5
实例: 6
实例: 7
实例: 8
实例: 9


### asyncio.shield: 防止任务取消

### asyncio.wait_for: 设置超时

In [None]:
async def eternity():
    print("我马上开始执行...")
    await asyncio.sleep(3600)
    print("终于轮到我了!!")
    
async def main():
    # wait for at most 1 second
    try:
        print("等你3秒钟")
        await asyncio.wait_for(eternity(),timeout=3)   
        #或await asyncio.wait([asyncio.create_task(eternity())],timeout=3)
        
    except asyncio.TimeoutError:
        print("超时了!")
        
asyncio.run(main())   #可以试着用wait做对比

In [None]:
async def eternity():
    print("我马上开始执行...")
    await asyncio.sleep(2)
    print("终于轮到我了!!")
    
async def main():
    # wait for at most 1 second
    try:
        print("等你3秒钟")
        await asyncio.wait_for(eternity(),timeout=3)   
        #或await asyncio.wait([asyncio.create_task(eternity())],timeout=3)
        
    except asyncio.TimeoutError:
        print("超时了!")
        
asyncio.run(main())   #可以试着用wait做对比

### asyncio.as_completed:生成器

In [None]:
%%time

async def slp5():
    print("Slp5 start")
    await asyncio.sleep(5)
    print("Slp5 finished")
    return "HaHa5"

async def slp3():
    print("Slp3 start")
    await asyncio.sleep(3)
    print("Slp3 finished")
    return "HaHa3"    

async def slp4():
    print("Slp4 start")
    await asyncio.sleep(4)
    print("Slp4 finished")
    return "HaHa4" 

async def main():
    s = asyncio.as_completed({slp5(),slp3(),slp4()})
    for f in s:         #必须要有这个循环,协程函数才会执行,因为 as_completed 返回的是迭代器
        rlt = await f
        print(rlt)
        
asyncio.run(main())

In [None]:
%%time

async def foo(n):
    print("Waiting:",n)
    await asyncio.sleep(n)
    return n

async def main():
    cor1 = foo(1)
    cor2 = foo(2)
    cor3 = foo(3)
    
    tasks = [asyncio.ensure_future(cor1),
            asyncio.ensure_future(cor2),
             asyncio.ensure_future(cor3)
            ]
    
    for task in asyncio.as_completed(tasks):
        result = await task
        print(f"Task result {result}")
        
loop = asyncio.get_event_loop()
done = loop.run_until_complete(main())

### 协程中调用普通函数

In [None]:
def callback(args,*,kwargs="default"):
    time.sleep(2)
    print(f"普通函数为回调函数,获取参数:{args},{kwargs}")
    
async def main(loop):
    print("注册callback")
    loop.call_soon(callback,1)
    
    wrapped = functools.partial(callback,kwargs="not default")
    loop.call_soon(wrapped,2)
    await asyncio.sleep(0.2)
    
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main(loop))
except Exception as e:
    loop.close()

In [None]:
def callback(n):
    print(f"callback {n} invoked")
    
async def main(loop):
    print("注册callbacks")
    loop.call_later(0.2,callback,1)
    loop.call_later(0.1,callback,2)
    loop.call_soon(callback,3)
    await asyncio.sleep(0.4)
    
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main(loop))
except:
    loop.close()

In [None]:
def call_back(n,loop):
    print(f"callback {n} 运行时间点{loop.time()}")
    
async def main(loop):
    now = loop.time()
    print("当前的内部时间:",now)
    print("循环时间:",now)
    print("注册callback")
    loop.call_at(now+0.1,call_back,1,loop)
    loop.call_at(now+0.2,call_back,2,loop)
    loop.call_soon(call_back,3,loop)
    await asyncio.sleep(1)
    
loop = asyncio.get_event_loop()
try:
    print("进入事件循环")
    loop.run_until_complete(main(loop))
except:
    print("关闭循环")
    loop.close()

### asyncio.Queue: 协程队列

In [None]:
https://www.jb51.net/article/123297.htm

In [None]:
async def product(queue,n):
    for x in range(n):
        print('producing {}/{}'.format(x,n))
        await asyncio.sleep(random.random())
        item = str(x)
        await queue.put(item)
        
async def consume(queue):
    while True:
        item = await queue.get()
        print('consuming{}...'.format(item))
        await asyncio.sleep(random.random())
        # 通知队列该项目已被处理
        queue.task_done()
        
async def main(n):
    queue = asyncio.Queue(maxsize=4)
    # 此时consume方法并没有真正开始运行
    consumer = asyncio.ensure_future(consume(queue))
    # 此时produce生产后，consume才开始运行
    await product(queue,n)
    # 等到消费者处理完所有项目
    await queue.join()
    # 消费者仍在等待商品，取消它
    consumer.cancel()
    
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(10))

In [None]:
async def produce(queue,n):
    for x in range(n):
        print('producing {}/{}'.format(x,n))
        await asyncio.sleep(random.random())
        item = str(x)
        #替换 await queue.put(item)
        queue.put_nowait(item)
        #打印当前queue里面item存放数量
        print('qsize:',queue.qsize())
    
async def main(n):
    queue = asyncio.Queue(maxsize=3)
    await produce(queue, n)
    
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(7))
    loop.close()
    
# 将await put方法替换成了put_nowait方法，将Queue的容量设为3个，而生产的任务设为6个。结果如下：
# 可以看出，在生产第4个任务的时候，因为Queue的容量只有3个，所以抛出QueueFull错误，符合预期要求。

# 注意，上面的程序只使用了produce方法，没有调用consume方法，因为我们一边调用produce，一边调用consume时，consume方法会将完成的任务进行回收。相当于出水管将水抽出水池，而入水管又将水抽入，有可能永远也不会满（即抛出QueueFull异常）。

### asyncio.Lock

In [4]:
cache = {}
lock = asyncio.Lock()
 
async def get_stuff(url):
    await lock.acquire()
    async with lock:
        if url in cache:
            return cache[url]
        stuff = await aiohttp.request('GET', url)
        cache[url] = stuff
        return stuff
    lock.release()

async def parse_sutff():
    stuff = await get_stuff(r"https://www.baidu.com/")
    # do some parsing
    
async def use_stuff():
    stuff = await get_stuff(r"https://www.baidu.com/")
    # use stuff to do something interesting
    
tasks = [parse_sutff(), use_stuff()]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks,timeout=10))

  tasks = [parse_sutff(), use_stuff()]
  tasks = [parse_sutff(), use_stuff()]


(set(),
 {<Task pending name='Task-2' coro=<use_stuff() running at <ipython-input-4-d19c63fe73fc>:19> wait_for=<Future pending cb=[Task.__wakeup()]>>,
  <Task pending name='Task-3' coro=<parse_sutff() running at <ipython-input-4-d19c63fe73fc>:15> wait_for=<Future pending cb=[Task.__wakeup()]>>})

### loop.run_in_executor: 异步+threadPool/ProcessPool

In [13]:
async def random_sleep(num):
    print(f'sleep start:{num}s')
    await asyncio.sleep(num)
    print(f"sleep endswith: {num}s")

async def main():
    executor = ThreadPoolExecutor(5)
    tasks = []
    for _ in range(5):
        sleep_time = random.randint(1,5)
        task = loop.run_in_executor(executor,random_sleep,sleep_time)
        tasks.append(task)
        
    done,pending = await asyncio.wait(tasks,timeout=(3))

if __name__ == '__main__':

    start_time = time.time()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    print(time.time()-start_time)
    # loop.close()

0.0019960403442382812


In [1]:
from concurrent.futures import ProcessPoolExecutor

In [5]:
def random_sleep(num):
    print(f"sleep start: {num}s")
    time.sleep(num)
    print(f"End: {num}s")
    
async def main():
    executor = ProcessPoolExecutor(5)
    tasks = []
    for _ in range(5):
        sleep_time = random.randint(1,5)
        task = loop.run_in_executor(executor,random_sleep,sleep_time)
        tasks.append(task)
    await asyncio.wait(tasks)
    
if __name__ == '__main__':  
    start_time = time.time()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    # asyncio.run(main())
    
    print('end time: {}'.format(time.time()-start_time))

Future exception was never retrieved
future: <Future finished exception=BrokenProcessPool('A process in the process pool was terminated abruptly while the future was running or pending.')>
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
Future exception was never retrieved
future: <Future finished exception=BrokenProcessPool('A process in the process pool was terminated abruptly while the future was running or pending.')>
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
Future exception was never retrieved
future: <Future finished exception=BrokenProcessPool('A process in the process pool was terminated abruptly while the future was running or pending.')>
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
Future exc

end time: 0.39392924308776855


### run_coroutine_threadsafe: 在其它线程执行协程

In [6]:
from threading import Thread
from functools import partial

In [8]:
async def a():
    time.sleep(1)
    return 'A'

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()
    
def shutdown(loop):
    loop.stop()
    
if __name__ == '__main__':
    new_loop = asyncio.new_event_loop()
    t = Thread(target=start_loop,args=(new_loop,))
    t.start()
    future = asyncio.run_coroutine_threadsafe(a(),new_loop)
    print(future)
    print(f"Resutl:{future.result(timeout=2)}")
    new_loop.call_soon_threadsafe(partial(shutdown,new_loop))

<Future at 0x1bb8479d880 state=pending>
Resutl:A


### asyncio.Semaphore: 信号量