# asyncio协程
　　参考：[官方文档](https://docs.python.org/zh-cn/3/library/asyncio.html)<br>
　　　　　[Asyncio 所有异步协程库用法详解](https://blog.csdn.net/lzy98/article/details/114002465)<br>
　　　　　[Python3: 异步 IO - asyncio 协程与任务、流、子进程、队列、同步](https://xiets.blog.csdn.net/article/details/115557939)<br>
　　　　　[asyncio之调试asyncio](https://hatboy.github.io/2017/12/21/asyncio%E4%B9%8B%E8%B0%83%E8%AF%95asyncio/)<br>
　　　　　[python异步编程之asyncio（百万并发）](https://www.cnblogs.com/shenh/p/9090586.html)<br>
　　　　　[使用asyncio包处理并发](https://www.cnblogs.com/sidianok/p/12231548.html)<br>
　　　　　[]()<br>
　　　　　[]()<br>
　　　　　[]()<br>

## 一、介绍
　　asyncio 是用于编写 单线程内 并发 代码的库，使用 async/await 语法。<br>
　　asyncio 被用作多个提供高性能 Python 异步框架的基础，包括网络和网站服务，数据库连接库，分布式任务队列等等。<br>
　　asyncio 往往是构建 IO 密集型和高层级 结构化 网络代码的最佳选择。<br>
　　asyncio 提供了一组 高层级 API 用于：
>并发地 运行 Python 协程 并对其执行过程实现完全控制: Coroutines and Tasks（协程与任务）<br>
执行 网络 IO 和 IPC: asyncio Streams（流）<br>
控制 子进程: asyncio Subprocesses（子进程）<br>
通过 队列 实现分布式任务: asyncio Queue（队列集）<br>
同步 并发代码: asyncio Synchronization（同步原语）<br>

　　asyncio 还有一些 低层级 API 以支持 库和框架的开发：
>事件循环、Futures、传输和协议、策略、平台支持。<br>
低层级 API 一般只用于在库和框架的编写（底层库和框架可能需要更细致地使用 asyncio）。<br>
应用层级的代码中应尽量使用 高层级 API。<br>
指南与教程：[高层级 API 索引](https://docs.python.org/zh-cn/3/library/asyncio-api-index.html)、[低层级 API 索引](https://docs.python.org/zh-cn/3/library/asyncio-llapi-index.html)、[用 asyncio 开发](https://docs.python.org/zh-cn/3/library/asyncio-dev.html)<br>

## 二、协程与任务: Coroutines and Tasks
　　协程 通过 async/await 语法进行声明，是编写 asyncio 应用的推荐方式。<br>
　　async/await 语法：<br>
>async def：用于编写 协程函数；<br>
await：等待 协程/任务 的执行完成。<br>

　　“协程”可以用来表示两个紧密关联的概念：<br>
>协程函数：定义形式为 async def 的函数；<br>
协程对象：调用 协程函数 所返回的对象。<br>

### 2.1、协程 的简单示例
　　以下代码段（需要 Python 3.7+）会打印 “Hello”，等待 1 秒，再打印 “World”：<br>
  　运行协程过程：调用 协程函数，返回 协程对象，使用 asyncio.run(...) 运行协程对象<br>
　　注意：简单地调用一个 协程 并不会使其被调度执行（返回协程对象）<br>


In [8]:
import asyncio

async def main():               # 定义 协程函数
    print("Hello")
    await asyncio.sleep(1)      # 阻塞协程, 但不阻塞线程
    print("World")
# asyncio.run(main())             # 运行协程（Python 3.7+）  
# asyncio.run(): This function cannot be called when another asyncio event loop is running in the same thread.
# The problem in your case is that jupyter (IPython) is already running an event loop (for IPython ≥ 7.0)
# 大致就是jupyter 已经运行了loop，无需自己激活，采用上文中的await()调用即可
# await main()
m = main()          # 调用 协程函数, 返回 协程对象
await m      # 运行 协程对象（同一个对象只能被运行一次）


Hello
World


### 2.2、运行协程
　　要真正运行一个协程，asyncio 提供了 3 种主要机制均可使协程被调度执行：
>asyncio.run() 函数用来运行最高层级的入口点 main() 函数（参考上面示例）；<br>
await 等待一个协程（对象）；<br>
asyncio.create_task() 函数用来并发运行作为 asyncio 任务 的多个协程。<br>
以上 3 种方式。<br>

#### 2.2.1、主入口函数: asyncio.run()
　　函数原型: 
>asyncio.run(coroutine, \*, debug=False)<br>

　　此函数是用来运行协程的最高层级的入口点 main() 函数（协程调度的主入口函数），执行 coroutine 并返回结果。<br>
　　此函数会调度运行传入的协程对象，负责管理 asyncio 事件循环，终结异步生成器，并关闭线程池。当有其他 asyncio 事件循环在同一线程中运行时，此函数不能被调用。此函数总是会创建一个新的事件循环并在结束时关闭。<br>
　　它应当被 用作 asyncio 程序的主入口点，理想情况下应当只被调用一次。<br>

In [9]:
# import asyncio

async def main():             #  定义协程函数（协程调度的主入口函数）
    await asyncio.sleep(1)
    print("Hello World")
    return "main over"
# ret = asyncio.run(main())   #  在最高层级入口点运行协程; run()函数执行完的返回结果为 main() 协程函数的返回结果
ret = await main()
print(ret)                  # 输出: "main over"

Hello World
main over


#### 2.2.2、 await 可等待对象
　　如果一个对象可以在await语句中使用，那么该对象即为**可等待对象**，可以在其他协程中被等待。await 语句只能在 async 协程函数中使用。<br>
　　可等待对象有 3 种主要类型：协程、任务 和 Future。<br>
　　await 等待一个 协程(对象) 时，该协程将被运行，运行完成后等待才结束。<br>

In [12]:
# import asyncio
import time

async def hello():              # 定义 协程函数
    await asyncio.sleep(2)      # 等待 2 秒
    print("Hello")
    return "Hello"

async def world():              # 定义 协程函数
    await asyncio.sleep(3)      # 等待 3 秒
    print("World")
    return "World"

async def main():               # 定义 协程函数（主入口函数）
    # 函数执行流程为同步
    print("main start")
    ret1 = await hello()               # 运行协程 并 等待协程运行完毕
    print("中间。。。")
    ret2 = await world()               # 运行协程 并 等待协程运行完毕
    print("main end")

# asyncio.run(main())             # 运行协程（的主函数）
                                # main() 函数执行完毕后 run() 函数才执行完毕
start = time.time()
await main()
print(f"over --->{time.time() - start}")


main start
Hello
中间。。。
World
Hello
World
main end
over --->5.0126471519470215


#### 2.2.3、并发运行协程: asyncio.create_task()
　　前面演示的协程都是**使用 await 等待同步运行**，协程的真正的优势是异步并发运行，并发运行协程需要将协程封装为 async 任务。创建 async 任务的函数原型: <br>
>asyncio.create_task(coroutine, *, name=None)　　　#　此函数在 Python 3.7 中被加入。<br>

　　此函数将 coroutine (协程对象) 封装为一个 asyncio.Task 并立即自动调度其执行，同时（不等待协程运行完毕）返回 asyncio.Task 对象。一般需要在 async 协程函数中创建异步任务并等待。<br>
　　任务还可以调用 Task.cancel(msg=None) 方法中途取消。<br>
　　并发运行协程（任务）代码示例：<br>

In [14]:
# import asyncio

async def delay_print(time, text):  # 定义 协程函数
    await asyncio.sleep(time)
    print(text)

async def main():                   # 定义 协程函数（主入口函数）
    print("main start")

    # 将协程封装为任务, 协程 将立即被异步运行, create_task() 函数立即返回, 不阻塞
    task1 = asyncio.create_task(delay_print(2, "Hello"))
    task2 = asyncio.create_task(delay_print(3, "World"))

    print("task create complete")

    await task1                     # 等待任务完成（阻塞）
    await task2                     # 等待任务完成（阻塞）
    # await 语句返回结果为任务对应的协程函数 delay_print() 的返回值

    print("main end")

start = time.time()
# asyncio.run(main())                 # 运行协程（的主函数）
await main()
print(f"over --->{time.time() - start}")


main start
task create complete
Hello
World
main end
over --->3.005509376525879


### 2.3、异步上下文管理器: async with
　　普通 wtih 语句的上下文管理器的进入和退出的方法是 __enter__ 和 __exit__。async with 异步上下文管理器的进入和退出方法则为 __aenter__ 和 __aexit__（方法需使用 async def 声明为协程函数），能在其中暂停/等待协程任务（不阻塞线程）。异步上下文管理器 与 普通上下文管理器 原理相同（只不过进入和退出的方法需要使用 await 等待）。async with 语句需要在 async 协程函数使用。<br>
　　async with 语句执行原理：<br>
>async with obj as m:<br>
　　\# m.method(...)<br>
　　pass<br>

　　1. obj 表示一个对象（或是一个表达式, 结果为一个对象）<br>
　　2. 调用 obj 对象的 async def __aenter__() 方法并等待完成, 返回值赋值给 as 右边的变量 m, 
即: m = await obj.__aenter__()<br>
　　3. 执行 with 代码块中的代码<br>
　　4. 执行完 with 代码块中的代码后, 无论是否发生异常, 调用 obj 的 async def __aexit__() 方法并等待完成, 
即: await obj.__aexit__(...)<br>
　　上面代码相当于:<br>
>obj = ...<br>
m = await obj.__aenter__()<br>
try:<br>
　　\# m.method(...)<br>
　　pass<br>
finally:<br>
　　await obj.__aexit__(...)<br>



In [15]:
# import asyncio

class AsyncContextManager:

    async def __aenter__(self):
        """
        执行 with 语句块时, 将先调用 __aenter__ 方法, 执行预先需要处理的代码,
        返回值将赋值给 as 右边的变量（通常返回自己）
        """
        print("__aenter__")
        await asyncio.sleep(2)
        return self

    def test(self):
        print("test")

    async def __aexit__(self, exc_type, exc, tb):
        """
        执行完 with 语句块后会调用 __aexit__ 方法, 执行清理工作,

        如果 with 语句块中抛出异常, 则 exc_type, exc, tb 三个参数
        分别表示抛出异常的 异常类型、异常对象的值、异常对象的 traceback 对象,
        如果没有抛出异常, 则三个参数的值均为 None
        """
        print("__aexit__", exc_type, exc, tb)
        await asyncio.sleep(2)

        
async def main():
    async with AsyncContextManager() as m:  # obj = AsyncContextManager(); m = await obj.__aenter__()
        m.test()                            # with 语句块中如果发生异常, 则执行完 __aexit__ 后抛给上层调用者
        # other operating                   # await obj.__aexit__(...)


# asyncio.run(main())
await main()
# 执行后结果输出:
#               __aenter__
#               test
#               __aexit__ None None None


__aenter__
test
__aexit__ None None None


### 2.4、异步迭代器: async for
　　异步迭代器可以在其迭代器实现中调用异步代码（等待/暂停异步任务，不阻塞线程）。普通 for 迭代器的创建迭代器和迭代下一个元素的方法是 __iter__ 和 __next__。async for 异步迭代器的创建迭代器和迭代下一个元素的方法则为 __aiter__ 和 __anext__（方法需使用 async def 声明为协程函数），能在其中暂停/等待协程任务（不阻塞线程）。异步迭代器 与 普通迭代器 原理相同（只不过调用迭代下一个元素的方法需要使用 await 等待）。async for 语句需要在 async 协程函数使用。<br>
　　普通 for 迭代器迭代结束抛出的是 StopIteration 异常，async for 异步迭代器迭代迭代抛出的异常为 StopAsyncIteration。<br>
　　async for 语句执行原理：
>async for i in iterable_obj:<br>
　　BLOCK<br>

上面代码相当于：
>iterable_obj = ...　　　# 可迭代对象<br>
iterator = iterable_obj.__aiter__()　　　# 创建迭代器<br>
while True:<br>
　　try:<br>
　　　　i = await iterator.__anext__()　　　# 迭代下一个元素, 等待返回<br>
　　　　BLOCK<br>
　　except StopAsyncIteration:<br>
　　　　break　　　# 抛出异常, 迭代结束<br>


In [18]:
# import asyncio


class AsyncRange:
    def __init__(self, stop):
        self.stop = stop

    def __aiter__(self):
        """
        创建迭代器, 返回一个实现了 async def __anext__ 方法的对象,
        此方法不能使用 async 修饰, 需要直接返回普通对象（非协程对象）
        """
        self._i = 0
        return self

    async def __anext__(self):
        """
        迭代返回下一个元素, 此方法需要使用 async 修饰
        """
        if self._i >= self.stop:
            raise StopAsyncIteration()  # 抛出 StopAsyncIteration 异常, 表示没有更多元素, 迭代结束

        await asyncio.sleep(1)          # 可以在方法中调用异步代码

        n = self._i
        self._i += 1
        return n


async def main():
    async for i in AsyncRange(10):
        print(i)


# asyncio.run(main())
await main()


0
1
2
3
4
5
6
7
8
9


### 2.5、休眠: asyncio.sleep()
　　await除了可以等待 协程 和 任务 外，还可以等待 Future，调用 asyncio.sleep(delay) 函数返回的就是 Future 对象。<br>
　　函数原型: coroutine asyncio.sleep(delay, result=None, *)<br>
　　阻塞 delay 指定的秒数，如果指定了 result，则当协程完成时将其返回给调用者。**sleep() 总是会挂起当前任务，以允许其他任务运行。**<br>
　　asyncio.Future 对象：<br>
　　Future 是一种特殊的 低层级 可等待对象，表示一个异步操作的 最终结果。通常情况下 没有必要 在应用层级的代码中创建 Future 对象。<br>
以下协程示例每秒显示一次时间，一共显示 5 次：

In [22]:
# import asyncio
# import time

async def display_date():
    for i in range(5):
        await asyncio.sleep(1)
        print(time.strftime("%Y-%m-%d %H:%M:%S"))

# asyncio.run(display_date())
await display_date()

2022-01-17 12:15:34
2022-01-17 12:15:35
2022-01-17 12:15:36
2022-01-17 12:15:37
2022-01-17 12:15:38


### 2.6、并发运行任务: asyncio.gather()
　　并发运行任务函数原型：<br>
>awaitable asyncio.gather(*aws, return_exceptions=False)

　　并发运行 aws 序列中的**可等待对象**。如果 aws 中的某个可等待对象为协程，则它将被作为一个任务调度（自动运行协程）。<br>
　　如果所有可等待对象都成功完成，结果将是一个由所有返回值聚合而成的列表，结果值的顺序与 aws 中可等待对象的顺序一致。<br>
　　如果 return_exceptions=False (默认)，所引发的首个异常会立即传给等待 gather() 的任务，aws 序列中的其他可等待对象 不会被取消 并将继续运行。<br>
　　如果 return_exceptions=True，异常会和成功的结果一样处理，并聚合至结果列表。<br>
　　如果 gather() 被取消，所有被提交 (尚未完成) 的可等待对象也会 被取消。<br>

In [23]:
# import asyncio
async def delay_print(time, text):  # 定义 协程函数
    await asyncio.sleep(time)
    print(text)
    return f"PRINT: {text}"


async def main():  # 定义 协程函数（主入口函数）
    # 创建协程对象（不会自动调度运行）
    task1 = delay_print(1, "ABC")
    task2 = delay_print(3, "DEF")
    task3 = delay_print(3, "UVW")

    # 创建任务（会自动立即异步运行）
    task4 = asyncio.create_task(delay_print(5, "XYZ"))

    # 并发异步运行多个任务
    # task1, task2, task3 协程将作为任务被调度运行
    # task4 任务等待运行完毕
    all_task = asyncio.gather(task1, task2, task3, task4)

    # 等待所有并发任务的运行完毕（一共需要 5 秒）
    rets = await all_task

    print(rets)     # 输出: ['PRINT: ABC', 'PRINT: DEF', 'PRINT: UVW', 'PRINT: XYZ']


start = time.time()
# asyncio.run(main())  # 运行协程（的主函数）
await main()
print(f"over --->{time.time() - start}")


ABC
DEF
UVW
XYZ
['PRINT: ABC', 'PRINT: DEF', 'PRINT: UVW', 'PRINT: XYZ']
over --->5.002874851226807


### 2.7、超时: asyncio.wait_for()
　　等待超时函数原型：
>coroutine asyncio.wait_for(aw, timeout, *)

　　等待 aw 可等待对象 完成，等待 timeout 秒数后超时。如果 aw 是一个协程，它将自动被作为任务调度运行。<br>
　　timeout 可以可以 None、float 或 int，如果 timeout 为 None，则等待直到完成。<br>
　　如果发生超时，任务将取消并抛出异常 asyncio.TimeoutError。<br>

In [24]:
# import asyncio
async def delay_print(text):
    await asyncio.sleep(10.0)
    print(text)

async def main():
    try:
        await asyncio.wait_for(delay_print("Hello"), 3.0)
    except asyncio.TimeoutError:
        print("time out")

start = time.time()
# asyncio.run(main())  # 运行协程（的主函数）
await main()
print(f"over --->{time.time() - start}")


time out
over --->3.011270046234131


### 2.8、在线程中运行: asyncio.to_thread()
　　函数原型：
>coroutine asyncio.to_thread(func, /, *args, **kwargs)　　　# 3.9 新版功能.

　　在子线程中异步运行 func 函数，其中 \*args, \*\*kwargs 将作为参数被传递给 func 函数。返回一个可被等待以获取 func 函数返回结果的协程。<br>
　　一些 IO 密集型函数/方法 如果放在协程中运行，会阻塞协程任务的事件循环。因此可以将 IO 密集型函数/方法 放到子线程中执行。<br>
　　asyncio.to_thread() 内部维护了一个线程池功能。当调用 await asyncio.to_thread() 提交一个任务到子线程时，如果没有现成的线程，会先创建一个子线程，然后在子线程中执行任务，任务执行完毕后不会马上结束线程，等再次调用 await asyncio.to_thread() 方法提交任务时会复用现有的子线程。如果在第一个任务还没有完成前，再次提交任务（即并发执行任务），此时没有空闲的线程，则会创建新的子线程并立即执行任务。<br>
　　更多跨线程调度参考：[跨线程调度](https://docs.python.org/zh-cn/3/library/asyncio-task.html#scheduling-from-other-threads)、[并发和多线程](https://docs.python.org/zh-cn/3/library/asyncio-dev.html#asyncio-multithreading) 。

In [25]:
# import asyncio
# import time
import threading


def blocking_io(text, t):
    time.sleep(t)                       # 阻塞线程, 模拟 IO 密集型任务
    print(f"text: {text}; Thread: {threading.current_thread()}")
    return f"RET {text}"


async def main():
    # 创建在子线程中运行的任务, blocking_io 方法将在子线程被调用, 不阻塞当前线程
    t1 = asyncio.to_thread(blocking_io, text="T1", t=1)
    t2 = asyncio.to_thread(blocking_io, text="T2", t=2)

    # 并发执行任务（此时才会启动子线程真正执行任务）, 等待所有任务执行完毕返回
    ret = await asyncio.gather(t1, t2)
    
    # await t1
    # await t2
    # 测试 t2 复用 t1 创建的线程执行任务

    # 打印结果
    print(f"ret: {ret}; Thread: {threading.current_thread()}")


# asyncio.run(main())
start = time.time()
await main()
print(f"over --->{time.time() - start}")
# 输出：
# text: T1; Thread: <Thread(asyncio_0, started 123145466556416)>
# text: T2; Thread: <Thread(asyncio_1, started 123145483345920)>
# ret: ['RET T1', 'RET T2']; Thread: <_MainThread(MainThread, started 4488781312)>



AttributeError: module 'asyncio' has no attribute 'to_thread'

### 2.9、统计查看处理中的任务数：asyncio.Task.all_tasks()

### 2.10、asyncio的时间循环对象
#### 2.10.1、时间循环对象
　　asyncio实现了两种事件循环对象：  
1. asyncio.SelectorEventLoop：(默认使用)基于Python3.4中添加的selectors模块，它会根据OS自动选择最优的I/Omultiplexing接口，比如在Linux中会使用epoll，在BSD中使用Kqueue。  
2. asyncio.ProactorEventLoop：只能用于Windows系统，Python3.8以上会自动使用IOCP(I/O CompletionPorts)  

　　在python3.7的版本里，asyncio标准库增加了asyncio.WindowsProactorEventLoopPolicy。windows中默认使用asyncio.windows_events.\_WindowsSelectorEventLoop，只支持sockets，不支持Pipes和subprocesses，我们可以更换为asyncio.windows_events.ProactorEventLoop。  
　　注：Windows上的IOCP是一种真正的异步IO，它比默认的select性能更佳，libuv在Windows上使用的也是IOCP。 
#### 2.10.2、设置事件循环
　　使用asyncio.new_event_loop()创建并返回一个新的事件循环对象。  
　　使用asyncio.get_event_loop()获取当前的事件循环。  
　　使用asyncio.set_event_loop(loop)设置事件循环。这样，ProactorEventLoop就会代替默认的SelectorEventLoop，从而在Windows上有更好的性能。而在Python3.8中，asyncio在Windows上已经开始使用默认IOCP了。    

```python
import sys, asyncio, uvloop

loop = asyncio.get_event_loop()
print(loop)

#if sys.platform == 'win32':
loop = asyncio.ProctorEventLoop()
asyncio.set_event_loop(loop)
new_loop = asyncio.get_event_loop()
print(new_loop)


if sys.platform == 'win32': #支持windows
    if sys.version_info.major >=3 and sys.version_info.minor >= 7:
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
    else:
        asyncio.set_event_loop(asyncio.ProactorEventLoop())
else: #支持linux
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) #或直接用uvloop.install()效果一样的

# 在Mac OS 10.6\10.7\10.8中，默认的事件循环是asyncio.SelectEventLoop，它会自动 选择select.KqueueSelector，使用操作系统底层提供的Kququ接口实现IO multiplexing，但是它不支持字符设备，可以更换为selectors.SelectSelector()。

import selectors

selector = selectors.SelectSelector()
loop = asyncio.SelectorEventLoop(selector)
asyncio.set_event_loop(loop)    
```
#### 2.10.3、管理事件循环策略
+ asyncio.get_event_loop_policy():获取当前事件循环策略
+ asyncio.set_event_loop_policy():设置当前事件循环策略

　　可以替换asyncio中默认的事件循环策略，比如使用uvloop，这是用cython编写，基于libuv。uvloop可以让asyncio更快，测试表明比tornado\curio\gevent等快两位，接近于go程序的速度。但是不持windows。  

#### 2.10.4、运行管理事件循环
　　事件循环需要启动才会不断循环监视事件并处理事件，常用方法：
+ AbstractEventLoop.run_until_complete(future):运行事件循环，直到传入的asyncio.Future对象完成(如果传入协程，首先会自动将协程包装成Task对象)，返回Future对象的结果或抛出异常。
+ AbstractEventLoop.run_forever():一直运行事件循环，直到调用AbstractEventLoop.stop()后才会停止。
+ AbstractEventLoop.stop():停止正在运行的事件循环。
+ AbstractEventLoop.close():关闭事件循环，一旦关闭了事件循环后，就不能调用run_until_complete等方法。



# 三、网络 IO: asyncio Streams
　　asyncio streams 是用于处理网络连接的支持 async/await 的 高层级 原语。asyncio streams 允许异步发送和接收数据（不阻塞线程）并等待其完成。<br>
　　asyncio streams 的相关类和方法：<br>
>asyncio.start_server: 启动 Socket 服务<br>
asyncio.open_connection: 建立(TCP)网络连接并返回一对 (reader, writer) 对象<br>
asyncio.StreamReader: IO 流读取器对象<br>
asyncio.StreamWriter: IO 流写入器对象<br>

　　无论是服务端还是客户端，连接后都只返回了一对 (reader, writer) 对象，要获取其他连接信息（如地址和端口），需要使用 writer 的 get_extra_info(name, default=None) 方法获取。<br>
　　除了 高层级 API 实现外，还可使用 低层级 API 实现 TCP 和 UDP 服务端/客户端，参考：<br>
>低层级 API loop.create_server() 和 loop.create_connection() 实现 TCP 回显 服务端 和 客户端
低层级 API loop.create_datagram_endpoint() 实现 UDP 回显 服务端 和 客户端

　　下面演示基于 asyncio streams 高层级 API 的 TCP 回显 服务端/客户端 开发示例：<br>

In [None]:
# 　　使用 高层级 API asyncio.start_server() 函数实现 TCP 回显 服务端：
# import asyncio
async def client_connected_cb(reader, writer):
    # reader: asyncio.StreamReader
    # writer: asyncio.StreamWriter

    # 获取 客户端 的 地址和端口
    client_addr = writer.get_extra_info("peername")

    # 读取客户端发来的数据 (bytes)
    data = await reader.read(1024)
    message = data.decode()

    print(f"Client {client_addr}: {message}")

    # 发送数据到客户端 (bytes)
    writer.write(f"Your data = {message}".encode())
    # 等待直到可以适当地恢复写入流
    await writer.drain()

    # 关闭客户端连接
    writer.close()
    await writer.wait_closed()


async def main():
    # 启动 TCP 服务, 监听指定端口, 有客户端连接时将回调 client_connected_cb 方法
    server = await asyncio.start_server(client_connected_cb, "127.0.0.1", 8080)

    # print(type(server))               # <class 'asyncio.base_events.Server'>

    # 获取 server 监听的地址和端口
    server_addr = server.sockets[0].getsockname()
    print(f"Serving on {server_addr}")

    async with server:
        # 循环监听接收客户请求
        await server.serve_forever()

    # class asyncio.base_events.Server:
    #     async def __aenter__(self):
    #         return self
    #     async def __aexit__(self, *exc):
    #         self.close()
    #         await self.wait_closed()


# asyncio.run(main())
await main()


Serving on ('127.0.0.1', 8080)


In [None]:
# 使用 高层级 API asyncio.open_connection() 函数实现 TCP 回显 客户端：
# import asyncio
async def main():
    # 连接 TCP 服务端, 返回一对 IO 流读写对象
    reader, writer = await asyncio.open_connection("127.0.0.1", 8080)

    # 获取远端地址和端口
    server_addr = writer.get_extra_info("peername")

    # 发送数据到服务端 (bytes)
    writer.write(b"Hello World")

    # 读取服务端发送的数据
    data = await reader.read(1024)
    print(f"Server {server_addr}: {data.decode()}")

    # 关闭连接
    writer.close()


# asyncio.run(main())
await main()


# 四、子进程: asyncio Subprocesses
　　[asyncio.create_subprocess_shell(…)](https://docs.python.org/zh-cn/3/library/asyncio-subprocess.html#asyncio.create_subprocess_shell) 和 [asyncio.create_subprocess_exec(…)](https://docs.python.org/zh-cn/3/library/asyncio-subprocess.html#asyncio.create_subprocess_exec) 异步调用后等待返回一个 [asyncio.subprocess.Process](https://docs.python.org/zh-cn/3/library/asyncio-subprocess.html#asyncio.asyncio.subprocess.Process) 实例与子进程交互。<br>
　　使用 高层级 async/await asyncio API 创建和管理子进程的两个 asyncio 方法：<br>

In [None]:
# 运行 cmd shell 命令, 返回一个 asyncio.subprocess.Process 实例
coroutine asyncio.create_subprocess_shell(cmd, 
                                          stdin=None, 
                                          stdout=None, 
                                          stderr=None, 
                                          limit=None, 
                                          **kwds)


# 创建一个子进程, 返回一个 asyncio.subprocess.Process 实例
coroutine asyncio.create_subprocess_exec(program, 
                                         *args, 
                                         stdin=None, 
                                         stdout=None, 
                                         stderr=None, 
                                         limit=None, 
                                         **kwds)


## 4.1、运行 shell 命令

In [28]:
# import sys
# import asyncio


async def run_cmd(cmd):
    # 创建子进程 shell, 返回 Process 实例
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)

    # 发送到子进程 stdin 的数据 (bytes), 没有则为 None
    stdin_data = None

    # 与进程交互:
    #     1. 将数据 stdin_data 发送到到子进程的 stdin;
    #     2. 从 stdout 和 stderr 读取数据, 直至到达 EOF;
    #     3. 等待进程终结。
    # 返回一个元素类型为 bytes 的元组 (stdout_data, stderr_data)
    stdout_data, stderr_data = await proc.communicate(stdin_data)

    # 获取子进程执行完毕后的返回码
    print(f"[{cmd}] return code {proc.returncode}")

    # 打印子进程输出到 标准输出(stdout) 和 标准错误(stderr) 的数据
    if stdout_data:
        print(f"[stdout]\n{stdout_data.decode()}")
    if stderr_data:
        print(f"[stderr]\n{stderr_data.decode()}", file=sys.stderr)


async def main():
    # 异步运行 shell 命令
    await run_cmd("ls -l")

    # 并发运行多个 shell 命令
    task1 = run_cmd("python3 -V")
    task2 = run_cmd("pwd")
    await asyncio.gather(task1, task2)


# asyncio.run(main())
await main()


NotImplementedError: 

## 4.2、创建子进程

In [None]:
# 先创建一个 Python 脚本文件: demo.py
import sys


def main():
    # 获取命令参数
    argv = ", ".join(sys.argv)

    # 从 stdin 读取一行数据
    in_data = input()

    # 打印输出数据到 stdout
    print(f"argv = {argv}")
    print(f"in_data = {in_data}")


if __name__ == "__main__":
    main()


In [None]:
# 创建一个 Python 的子进程运行 Python 脚本。
import asyncio


async def main():
    # 创建子进程, 返回 Process 实例
    proc = await asyncio.create_subprocess_exec(
        "python3", "demo.py", "Hello", "World",     # 命令 和 参数
        stdin=asyncio.subprocess.PIPE,              # 子进程的 标准输入 类型
        stdout=asyncio.subprocess.PIPE,             # 子进程的 标准输出 类型
        stderr=asyncio.subprocess.PIPE)             # 子进程的 标准错误 类型

    # 从 proc 中获取子进程的 stdin, stdout, stderr
    # 创建子进程时必须传入这 3 个参数才有值, 否则为 None
    stdin = proc.stdin
    stdout = proc.stdout
    stderr = proc.stderr

    # 写出数据(bytes)到子进程的 stdin
    stdin.write("你好".encode())
    # 必须 写入一个'\n' 或 写入EOF, 子进程才能从 stdin 完成一行数据的读取
    stdin.write_eof()
    # 等待直到可以适当地恢复写入流
    await stdin.drain()

    # 从子进程的的 stdout 和 stderr 读取数据(bytes)
    stdout_data = await stdout.read()
    stderr_data = await stderr.read()

    # 等待子进程退出, 返回 code
    returncode = await proc.wait()

    # 打印数据
    print(f"[returncode]\n{returncode}")
    print(f"[stdout_data]\n{stdout_data.decode()}")
    print(f"[stderr_data]\n{stderr_data.decode()}")


asyncio.run(main())


# 五、队列: asyncio Queue
　　asyncio.Queue 队列被设计成与线程版同步队列 queue 模块类似。asyncio.Queue 是基于协程的、专用于 async/await 代码的同步队列。<br>
　　与 queue 一样，asyncio.Queue 主要有 3 种队列：<br>
>asyncio.Queue: 先进先出（FIFO）队列<br>
asyncio.LifoQueue: 后进先出（LIFO）队列<br>
asyncio.PriorityQueue: 具有优先级顺序的队列<br>

　　LifoQueue 和 PriorityQueue 继承自 Queue。<br>
　　asyncio.LifoQueue 后进先出队列用法与 asyncio.Queue 相同，asyncio.PriorityQueue 优先级队列的元素（任务）通常通过 (priority_number, item) 元组的形式 put 和 get。<br>


In [None]:
# 创建一个先进先出（FIFO）队列
# maxsize 表示队列的最大尺寸: 
#     如果 maxsize <= 0, 则不限制队列尺寸, 即可以无限大;
#     如果 maxsize > 0, 则当队列尺寸达到 maxsize 时, 
#     await put() 将被阻塞, 直到队列中某个元素被 get() 取出.
class asyncio.Queue(maxsize=0, *)


# 队列最大尺寸（可存放元素的数量）
maxsize

# 返回队列中的元素(任务)数量（不包括已被 get 取出但还没 task_done 的任务）
qsize()

# 队列是否为空(qsize==0), 为空返回 True, 否则返回 False
empty()

# 队列是否已满, 队列中元素数量(qsize)达到 maxsize 返回 True, 
# 如果 maxsize <= 0, 则永远返回 False
full()


# 从队列中删除并返回一个元素。如果队列为空, 则等待, 直到队列中有 put 新元素。
coroutine get()

# 不阻塞, 立即从队列中删除并返回一个元素。如果队列为空, 则抛出 QueueEmpty 异常。
get_nowait()


# 添加一个元素进队列。如果队列已满, 则等待, 直到队列中有任务被 get 取出（无需等待 task_done)。
coroutine put(item)

# 不阻塞, 立即添加一个元素进队列, 如果队列中没有立即可用的空闲插槽, 则抛出 QueueFull 异常。
put_nowait(item)


# 通知队列上一次 get() 取出的任务已完成, 
# 通过 get() 从队列中删除并取出任务后, 任务处于被取出但未完成状态, 
# 完成后必须要调用 task_done() 通知队列
task_done()


# 阻塞直队列中的所有元素都被接收并处理完毕, 即所有任务都被取出(变成空队列)并已 task_done,
# 在 task_done() 方法中会检查是否还有未完成任务, 没有则通知 await join() 结束等待。
coroutine join()

## 5.1、Queue 先进先出（FIFO）队列

In [None]:
import asyncio
import random


class Task:
    def __init__(self, sleep, text):
        self.__sleep = sleep
        self.__text = text

    async def do_task(self):
        await asyncio.sleep(self.__sleep)
        print(f"do task ({self.__text})")


async def production_tasks(queue: asyncio.Queue[Task]):
    for i in range(5):
        # 创建任务
        task = Task(random.uniform(1.0, 2.0), f"T{i}")
        # 添加任务到队列中
        await queue.put(task)


async def consuming_tasks(queue: asyncio.Queue[Task]):
    while True:
        # 从队列中取出任务
        task = await queue.get()
        # 处理任务
        await task.do_task()
        # 通知队列任务已被处理完成
        queue.task_done()


async def main():
    # 创建 先进先出（FIFO）队列
    queue = asyncio.Queue()

    # 创建异步 生产任务
    t1 = asyncio.create_task(production_tasks(queue))
    # 创建异步 消费任务
    t2 = asyncio.create_task(consuming_tasks(queue))

    await t1
    await t2


asyncio.run(main())

## 5.2 LifoQueue 后进先出（LIFO）队列
　　其他代码与上面 FIFO 示例一致，只需将创建队列改为 asyncio.LifoQueue：<br>
>\# 创建 后进先出（LIFO）队列<br>
queue = asyncio.LifoQueue()<br>

## 5.3 PriorityQueue 优先级队列

In [None]:
import asyncio
import random
from typing import Tuple


class Task:
    def __init__(self, sleep, text):
        self.__sleep = sleep
        self.__text = text

    async def do_task(self):
        await asyncio.sleep(self.__sleep)
        print(f"do task ({self.__text})")


async def production_tasks(queue: asyncio.Queue[Tuple[int, Task]]):
    for i in range(5):
        # 创建任务
        task = Task(random.uniform(1.0, 2.0), f"T{i}")
        # 添加任务到队列中 (priority_number, task),
        # priority_number 可以是 int 或 float,
        # priority_number 越小越先被取出,
        # 在队列中还未被取出的任务的 priority_number 不能相同
        await queue.put((5 - i, task))


async def consuming_tasks(queue: asyncio.Queue[Tuple[int, Task]]):
    while True:
        # 从队列中取出任务
        task = await queue.get()
        # 处理任务
        await task[1].do_task()
        # 通知队列任务已被处理完成
        queue.task_done()


async def main():
    # 创建 优先级 队列
    queue = asyncio.PriorityQueue()

    # 创建异步 生产任务
    t1 = asyncio.create_task(production_tasks(queue))
    # 创建异步 消费任务
    t2 = asyncio.create_task(consuming_tasks(queue))

    await t1
    await t2


asyncio.run(main())


# 六、同步: asyncio Synchronization
　　asyncio 既然支持多任务并发执行，就会和多线程一样存在多个并发任务对共享资源访问以及并发代码块之间通讯的问题（即使是在同一个线程内），即 同步原语。asyncio 同步原语被设计为与 threading 线程模块类似。
　　asyncio 同步原语 的两个关键注意事项：
>asyncio 原语不是线程安全的，因此不能在 asyncio 代码中使用线程的同步;
asyncio 同步原语的方法不支持 timeout 参数, 如果需要超时操作可以使用 asyncio.wait_for()。

　　asyncio 具有以下基本的同步原语：
>同步锁: asyncio.Lock
事件对象: asyncio.Event
条件对象: asyncio.Condition
信号量对象: asyncio.Semaphore
绑定的信号量对象: asyncio.BoundedSemaphore

## 6.1、锁对象: Lock
　　asyncio.Lock 任务互斥锁，可以保证并发代码块对共享资源的独占访问。
　　asyncio.Lock 类的方法：

In [None]:
# 创建一个互斥锁
class asyncio.Lock(*)


# 获取锁
# 如果锁的状态为 locked, 则等待直至锁的状态变为 unlocked
# 当有多个协程在 acquire() 中被阻塞等待解锁时, 最终只有一个协程能获取锁并将其设为 locked
# 同一个协程获取到锁并上锁后, 直到解锁前, 不能再次调用 acquire() 获取锁（极容易造成死锁）
coroutine acquire()


# 释放锁
# 如果锁为 locked, 则将其设置为 unlocked
# 如果锁为 unlocked, 则抛出 RuntimeError 异常
release()


# 判断锁的状态, 如果锁为 locked 状态则返回 True
locked()

In [None]:
# asyncio.Lock 使用示例，下面代码需要 6 秒才能执行完
import asyncio


async def async_task(lock: asyncio.Lock, text: str):
    await lock.acquire()                # 获取锁
    try:
        await asyncio.sleep(3)
        print(f"do task: {text}")
    finally:
        if lock.locked():
            lock.release()              # 释放锁


async def main():
    lock = asyncio.Lock()

    t1 = async_task(lock, "Hello")
    t2 = async_task(lock, "World")

    await asyncio.gather(t1, t2)        # 并发运行两个任务


asyncio.run(main())


In [None]:
# asyncio.Lock 支持 async with 语句：
async with lock:
    BLOCK
    # 需要同步的相关代码...

# 上面代码相当于：
await lock.acquire()
try:
    BLOCK
    # 需要同步的相关代码...
finally:
    lock.release()


## 6.2、事件对象: Event
　　asyncio.Event 事件对象可以用来通知多个 asyncio 协程事件状态已改变，可用于 协程间的通讯。asyncio.Event 对象内部维护了一个标记变量，可用通过 set() 方法将其设置为 True，并通过 clear() 方法将其设置为 False。协程中通过 wait() 方法阻塞直至该标记被其他协程标记为 True。该标记初始值为 False。<br>
　　asyncio.Event 类的方法：<br>

In [None]:
# 创建一个事件对象
class asyncio.Event(*)


# 等待直至事件标记被设置为 True,
# 如果事件标记已被设置为 True, 则立即返回, 否则将阻塞直至其他协程调用 set()
coroutine wait()


# 设置事件, 将标记设置为 True, 
# 其他所有 wait() 等待此事件对象的协程将被立即唤醒
set()


# 清空事件, 将标记设置为 False,
clear()


# 判断此事件标记是否已被设置, 事件标记为 True 则返回 True
is_set()


In [None]:
# asyncio.Event 代码示例:
import asyncio


async def async_task_01(event: asyncio.Event):
    await asyncio.sleep(5)
    print("async_task_01")
    event.set()                 # 将事件标记设置为 True


async def async_task_02(event: asyncio.Event):
    await event.wait()          # 等待事件标记被其他协程设置为 True
    print("async_task_02")


async def main():
    event = asyncio.Event()             # 创建事件对象

    t1 = async_task_01(event)
    t2 = async_task_02(event)

    await asyncio.gather(t1, t2)        # 并发运行两个协程


asyncio.run(main())


## 6.3 条件对象: Condition
　　asyncio.Condition 条件原语可被任务用于等待某个时间的发生，然后获取对共享资源的独占访问。Condition 条件对象需要 Lock 锁的支持，本质上 Condition 合并了 Event 和 Lock 的功能。<br>
　　asyncio.Condition 类的方法：<br>

In [None]:
# 创建一个条件对象, 
# lock 参数为 asyncio.Lock 类型, 
# 如果 lock=None 则自动创建一个新的 Lock 对象
class asyncio.Condition(lock=None, *)


# 获取底层锁
coroutine acquire()


# 等待, 并释放底层锁。
# 等待直至被其他协程 notify 唤醒, 被唤醒后会重新请求获取底层锁, 获取到底层锁后才能停止阻塞。
# 调用此方法必须先获取到底层锁。
coroutine wait()


# 唤醒最多 n 个正在 wait() 等待此条件的协程, 如果没有任何协程在等待则此方法为空操作。
# 调用此方法必须先获取到底层锁, 并在调用此方法后被快速释放。
notify(n=1)


# 唤醒所有正在 wait() 等待此条件的协程, 如果没有任何协程在等待则此方法为空操作。
# 调用此方法必须先获取到底层锁, 并在调用此方法后被快速释放。
notify_all()


# 释放底层锁
release()


# 获取底层锁的锁定状态
locked()


# predicate 为可调用对象, 其返回值为 bool 类型。
# 等待直到被唤醒, 并且 predicate() 返回 True 则结束阻塞。
# 如果第一次调用 predicate() 就返回 True, 则直接返回, 不 wait()。
# 即使被唤醒且获取到底层锁, 但如果 predicate() 返回 False, 则仍然会继续 wait()。
#
# 此方法的内部代码:
#     async def wait_for(self, predicate):
#         result = predicate()
#         while not result:
#             await self.wait()
#             result = predicate()
#         return result
#
coroutine wait_for(predicate)


In [None]:
# asyncio.Condition 代码示例：
import asyncio


async def async_task_01(cond: asyncio.Condition):
    await asyncio.sleep(3)
    print("async_task_01")

    await cond.acquire()        # 获取底层锁
    try:
        cond.notify_all()       # 唤醒所有正在等待此条件的协程（必须获取到锁后才能调用）
    finally:
        cond.release()          # 释放底层锁


async def async_task_02(cond: asyncio.Condition):
    await cond.acquire()        # 获取底层锁
    try:
        await cond.wait()       # 等待 并 释放底层锁, 直到被其他协程唤醒（必须获取到锁后才能调用）,
        print("async_task_02")  # 被唤醒后会重新请求获取底层锁, 获取到底层锁后才能停止阻塞
    finally:
        cond.release()          # 释放底层锁


async def main():
    cond = asyncio.Condition()          # 创建 条件对象（自动创建底层锁）

    t1 = async_task_01(cond)
    t2 = async_task_02(cond)

    await asyncio.gather(t1, t2)        # 并发运行两个协程


asyncio.run(main())


In [None]:
# asyncio.Condition 支持 async with 语句：
async with cond:
    await cond.wait()

# 上面代码相当于：
await cond.acquire()
try:
    await cond.wait()
finally:
    cond.release()


## 6.4 信号量对象: Semaphore
　　asyncio.Semaphore 信号量对象。信号量对象内部会管理一个计数器，该计数器会随着每次 调用 acquire() 递减，调用 release() 递增。计数器的值永远不会小于 0。当前调用 acquire() 时如果计数的值为 0，则将阻塞等待，直到其他协程调用了 release() 将计数器的值递增。<br>
　　当我们在一些需要控制协程并发速度时，信号量是个不错的选择。当多个协程任务同时进入loop执行时，Semaphore机制可以控制不同的协程通过竞争信号量的形式去执行，从而控制协程的执行速度。竞争到信号量的协程开始执行，如果信号量计数>0，则其他协程还可以竞争执行，若果信号量计数=0，则其他协整只能等待竞争到的协程执行完毕释放信号量，使信号量计数>0,在通过竞争信号量去执行。
　　asyncio.Semaphore 类的方法：<br>

In [None]:
# 创建信号量对象。
# value 参数表示计数器的初始值, 如果传入 小于 0 的值则抛出 ValueError 异常
class asyncio.Semaphore(value=1, *)


# 获取一个信号量。
# 如果计数器的值 > 0, 则将其减一并立即返回。
# 如果计数器的值 = 0, 则阻塞等待, 直到其他协程调用了 release() 将计数器的值递增。
coroutine acquire()


# 如果无法立即获取一个信号量（即 acquire() 会阻塞）, 则返回 True。
locked()


# 释放一个信号量, 并将计数器的值加一。
# 可以唤醒一个正在等待获取信号量的其他协程。
release()


# 不同于 BoundedSemaphore , Semaphore 允许 release() 调用次数多于 acquire(),
# 但一般情况通常先 acquire() 再 release() 成对调用（计数器的初始值需 >= 1）。


In [None]:
# asyncio.Semaphore 代码示例：
import asyncio


async def async_task_01(semaphore: asyncio.Semaphore):
    while True:
        await asyncio.sleep(1)

        await semaphore.acquire()       # 获取信号量, 获取成功后计数器减一
        try:
            print("async_task_01")
        finally:
            semaphore.release()         # 释放信号量, 计数器加一


async def async_task_02(semaphore: asyncio.Semaphore):
    while True:
        await asyncio.sleep(1)

        await semaphore.acquire()       # 获取信号量, 获取成功后计数器减一
        try:
            print("async_task_02")
        finally:
            semaphore.release()         # 释放信号量, 计数器加一


async def main():
    semaphore = asyncio.Semaphore(1)    # 创建 信号量对象

    t1 = async_task_01(semaphore)
    t2 = async_task_02(semaphore)

    await asyncio.gather(t1, t2)        # 并发运行两个协程


asyncio.run(main())


In [None]:
# asyncio.Semaphore 支持 async with 语句：
async with semaphore:
    BLOCK

# 上面代码相当于：
await semaphore.acquire()
try:
    BLOCK
finally:
    semaphore.release()


## 6.5 绑定的信号量对象: BoundedSemaphore
　　asyncio.BoundedSemaphore 绑定的信号量对象，其继承自 Semaphore。BoundedSemaphore 是特殊版本的 Semaphore，如果在调用 release() 后导致计数器的值增加到初始化以上，则会抛出 ValueError 异常。也就是说 BoundedSemaphore 必须先 acquire() 再 release() 成对调用。<br>
　　asyncio.BoundedSemaphore 类的构造方法：<br>
>class asyncio.BoundedSemaphore(value=1, \*)