# 说明
本篇内容基于python3.11

基于教程[Asyncio in Python - Full Tutorial - YouTube](https://www.youtube.com/watch?v=Qb9s3UiMSTA)

In [None]:
print('hello world')

# 1.event loop
event loop（事件循环）负责调度所有任务：决定当前运行哪个任务，以及哪些任务处于等待状态。

# 2.coroutines 协程


In [1]:
import asyncio

async def main():
    """Print a greeting.

    Calling ``main()`` only builds a coroutine object; the body runs once
    that object is awaited or handed to ``asyncio.run()``.
    """
    greeting = 'hello world'
    print(greeting)


# 通过async def定义的函数是一个coroutine function，调用main()只会返回一个coroutine object，并不会执行函数体
# 需要通过asyncio.run(main())（在jupyter里则用 await main()）来真正运行它


In [2]:
main()
# print(main())
# 比如这样运行main()，只会看到<coroutine object main at xxxxxx>，并不会执行它

<coroutine object main at 0x10a4b2500>

In [3]:
import asyncio

"""
下面模拟一个异步操作，比如从网络获取数据
"""

async def fetch_data(delay):
    """Simulate a slow fetch (e.g. a network request).

    Args:
        delay: Seconds to pause via ``asyncio.sleep`` before returning.

    Returns:
        A dict with the single key ``'data'``.
    """
    print('start fetching')
    await asyncio.sleep(delay)  # yields control to the event loop while "waiting"
    print('done fetching')
    payload = {'data': 'some data'}
    return payload

async def main():
    """Show that calling a coroutine function does not start it.

    ``fetch_data(2)`` only builds a coroutine object; its body begins running
    at the ``await`` below, which is why 'end of main coroutine' is printed
    before 'start fetching'.
    """
    print('start of main coroutine')
    pending = fetch_data(2)  # a coroutine object, nothing is running yet
    print('end of main coroutine')
    print(f'Received result: {await pending}')
    

# .py里可以使用 asyncio.run(main())；但是jupyter内核自己已经在运行一个event loop，而asyncio.run()不能在已经运行的event loop里调用（会抛出RuntimeError），所以在jupyter里要直接使用 await main()
await main()

start of main coroutine
end of main coroutine
start fetching
done fetching
Received result: {'data': 'some data'}


In [None]:
import asyncio

"""
下面模拟一个异步操作，比如从网络获取数据
"""

async def fetch_data(delay):
    """Pretend to fetch data, taking ``delay`` seconds.

    Prints a line before and after the pause and returns a dict with the
    single key ``'data'``.
    """
    print('start fetching')
    await asyncio.sleep(delay)
    print('done fetching')
    return dict(data='some data')

async def main():
    """Create the ``fetch_data(2)`` coroutine, then await it and print its result."""
    print('start of main coroutine')
    # Only a coroutine object is created here; it is not running yet.
    coro = fetch_data(2)
    print('end of main coroutine')
    # The coroutine body executes during this await.
    payload = await coro
    print(f'Received result: {payload}')

"""
可以试着调换一下 print 语句的位置来观察执行顺序：fetch_data(2) 只是创建了一个coroutine object（此时还没有开始执行），
所以main()会立刻打印'end of main coroutine'，直到 await task 时fetch_data()才真正开始运行，最后main()等待它完成并获取结果。
"""

# 在jupyter里直接 await main()；在.py脚本里才使用 asyncio.run(main())
await (main())

In [4]:
"""
现在我们写一个看起来用上了异步操作的代码，但是实际上并没有发挥异步的优势，因为我们是顺序等待每一个任务完成的。
备注：pyinstrument是用来追踪每一步代码花了多长时间的一个性能分析工具，可以通过 pip install pyinstrument 来安装
"""
from pyinstrument import Profiler
profiler = Profiler()
profiler.start()


import asyncio

async def fetch_data(delay, id):
    """Simulate fetch number ``id`` that takes ``delay`` seconds.

    Args:
        delay: Seconds to pause via ``asyncio.sleep``.
        id: Label included in the printed messages and in the returned data.

    Returns:
        A dict with the single key ``'data'``.
    """
    print(f'start fetching {id}')
    await asyncio.sleep(delay)
    print(f'done fetching {id}')
    payload = {'data': f'some data {id}'}
    return payload

async def main():
    """Await two fetches one after the other.

    Both coroutine objects are created up front, but each one only starts
    when it is awaited, so the second fetch begins after the first finishes.
    """
    pending = [fetch_data(2, 1), fetch_data(2, 2)]
    for number, coro in enumerate(pending, start=1):
        outcome = await coro
        print(f'Received result{number}: {outcome}')

await main()


profiler.stop()
profiler.print()

start fetching 1
done fetching 1
Received result1: {'data': 'some data 1'}
start fetching 2
done fetching 2
Received result2: {'data': 'some data 2'}

  _     ._   __/__   _ _  _  _ _/_   Recorded: 11:18:11  Samples:  5
 /_//_/// /_\ / //_// / //_'/ //     Duration: 4.006     CPU time: 0.006
/   _/                      v5.1.1

Profile at /var/folders/9t/nk50v6yd64bc66hg12wnjkqm0000gn/T/ipykernel_1870/413639139.py:7

4.006 <module>  /var/folders/9t/nk50v6yd64bc66hg12wnjkqm0000gn/T/ipykernel_1870/413639139.py:1
└─ 4.005 main  /var/folders/9t/nk50v6yd64bc66hg12wnjkqm0000gn/T/ipykernel_1870/413639139.py:18
   └─ 4.005 fetch_data  /var/folders/9t/nk50v6yd64bc66hg12wnjkqm0000gn/T/ipykernel_1870/413639139.py:12
      └─ 4.003 sleep  asyncio/tasks.py:653
         └─ 4.003 [await]  asyncio/tasks.py




# 3.task

In [5]:
"""
现在进行优化，真正发挥异步的优势，同时发起多个任务，然后等待它们全部完成，猜猜看这次运行会花多长时间
"""
from pyinstrument import Profiler
profiler = Profiler()
profiler.start()


import asyncio

async def fetch_data(id, sleep_time):
    """Simulate coroutine ``id`` fetching data for ``sleep_time`` seconds.

    Returns:
        A dict with keys ``'id'`` and ``'data'``.
    """
    print(f'coroutine {id} start fetching')
    await asyncio.sleep(sleep_time)
    payload = {'id': id, 'data': f'some data from coroutine {id}'}
    return payload

async def main():
    """Run three fetches as tasks, then print their results on one line.

    All three tasks are created before the first ``await``; the results are
    collected in creation order.
    """
    # (id, sleep_time) pairs passed to fetch_data.
    specs = ((1, 2), (2, 3), (3, 1))
    tasks = [
        asyncio.create_task(fetch_data(number, pause))
        for number, pause in specs
    ]
    outcomes = [await task for task in tasks]
    print(*outcomes)

await main()


profiler.stop()
profiler.print()

coroutine 1 start fetching
coroutine 2 start fetching
coroutine 3 start fetching
{'id': 1, 'data': 'some data from coroutine 1'} {'id': 2, 'data': 'some data from coroutine 2'} {'id': 3, 'data': 'some data from coroutine 3'}

  _     ._   __/__   _ _  _  _ _/_   Recorded: 11:19:32  Samples:  4
 /_//_/// /_\ / //_// / //_'/ //     Duration: 3.003     CPU time: 0.005
/   _/                      v5.1.1

Profile at /var/folders/9t/nk50v6yd64bc66hg12wnjkqm0000gn/T/ipykernel_1870/2958518639.py:6

3.003 Handle._run  asyncio/events.py:86
├─ 2.002 fetch_data  /var/folders/9t/nk50v6yd64bc66hg12wnjkqm0000gn/T/ipykernel_1870/2958518639.py:11
│  └─ 2.002 sleep  asyncio/tasks.py:653
│     └─ 2.002 [await]  asyncio/tasks.py
└─ 1.001 ZMQInteractiveShell.run_cell_async  IPython/core/interactiveshell.py:3231
   └─ 1.001 ZMQInteractiveShell.run_ast_nodes  IPython/core/interactiveshell.py:3540
      └─ 1.000 <module>  /var/folders/9t/nk50v6yd64bc66hg12wnjkqm0000gn/T/ipykernel_1870/2958518639.py:1
        

In [6]:
"""
现在进行简化，通过asyncio.gather()来同时等待多个任务完成
"""
from pyinstrument import Profiler
profiler = Profiler()
profiler.start()


import asyncio

async def fetch_data(id, sleep_time):
    """Announce the start of fetch ``id``, pause ``sleep_time`` seconds, return its data.

    Returns:
        A dict with keys ``'id'`` and ``'data'``.
    """
    print(f'coroutine {id} start fetching')
    await asyncio.sleep(sleep_time)
    return dict(id=id, data=f'some data from coroutine {id}')

async def main():
    """Run three fetches through ``asyncio.gather`` and print each result."""
    coros = [fetch_data(1, 2), fetch_data(2, 3), fetch_data(3, 1)]
    # gather returns the results in the same order as the awaitables passed in.
    for outcome in await asyncio.gather(*coros):
        print(f'Received result: {outcome}')

await main()


profiler.stop()
profiler.print()

coroutine 1 start fetching
coroutine 2 start fetching
coroutine 3 start fetching
Received result: {'id': 1, 'data': 'some data from coroutine 1'}
Received result: {'id': 2, 'data': 'some data from coroutine 2'}
Received result: {'id': 3, 'data': 'some data from coroutine 3'}

  _     ._   __/__   _ _  _  _ _/_   Recorded: 11:20:11  Samples:  3
 /_//_/// /_\ / //_// / //_'/ //     Duration: 3.002     CPU time: 0.006
/   _/                      v5.1.1

Profile at /var/folders/9t/nk50v6yd64bc66hg12wnjkqm0000gn/T/ipykernel_1870/1154455003.py:6

3.001 _UnixSelectorEventLoop._run_once  asyncio/base_events.py:1922
├─ 1.999 KqueueSelector.select  selectors.py:558
│  └─ 1.999 kqueue.control  <built-in>
└─ 1.002 Handle._run  asyncio/events.py:86
   └─ 1.002 fetch_data  /var/folders/9t/nk50v6yd64bc66hg12wnjkqm0000gn/T/ipykernel_1870/1154455003.py:11
      └─ 1.002 sleep  asyncio/tasks.py:653
         └─ 1.002 [await]  asyncio/tasks.py




In [7]:
"""
asyncio.gather()虽然方便了，但是debug很费劲，现在我们继续改进，用asyncio.TaskGroup代替asyncio.gather()
"""
from pyinstrument import Profiler
profiler = Profiler()
profiler.start()


import asyncio

async def fetch_data(id, sleep_time):
    """Simulate a fetch labelled ``id`` that lasts ``sleep_time`` seconds.

    Returns:
        A dict with keys ``'id'`` and ``'data'``.
    """
    print(f'coroutine {id} start fetching')
    await asyncio.sleep(sleep_time)
    message = f'some data from coroutine {id}'
    return {'id': id, 'data': message}

async def main():
    """Run three fetches inside an ``asyncio.TaskGroup`` and print each result."""
    pauses = (2, 3, 1)
    # Leaving the ``async with`` block waits for every task in the group.
    async with asyncio.TaskGroup() as group:
        spawned = [
            group.create_task(fetch_data(number, pause))
            for number, pause in enumerate(pauses, start=1)
        ]
    for finished in spawned:
        print(f'Received result: {finished.result()}')

await main()


profiler.stop()
profiler.print()

coroutine 1 start fetching
coroutine 2 start fetching
coroutine 3 start fetching
Received result: {'id': 1, 'data': 'some data from coroutine 1'}
Received result: {'id': 2, 'data': 'some data from coroutine 2'}
Received result: {'id': 3, 'data': 'some data from coroutine 3'}

  _     ._   __/__   _ _  _  _ _/_   Recorded: 11:20:53  Samples:  4
 /_//_/// /_\ / //_// / //_'/ //     Duration: 3.004     CPU time: 0.007
/   _/                      v5.1.1

Profile at /var/folders/9t/nk50v6yd64bc66hg12wnjkqm0000gn/T/ipykernel_1870/390384911.py:6

3.003 _UnixSelectorEventLoop._run_once  asyncio/base_events.py:1922
├─ 2.000 KqueueSelector.select  selectors.py:558
│  └─ 2.000 kqueue.control  <built-in>
└─ 1.003 Handle._run  asyncio/events.py:86
   └─ 1.002 fetch_data  /var/folders/9t/nk50v6yd64bc66hg12wnjkqm0000gn/T/ipykernel_1870/390384911.py:11
      └─ 1.002 sleep  asyncio/tasks.py:653
         └─ 1.002 [await]  asyncio/tasks.py




# 4.futures
一般用在底层库中，熟悉一下就行（其实我也没懂）

概括起来就是知道未来会有什么结果，只是不知道什么时候完成

In [9]:
import asyncio

async def set_future_result(future, value):
    """Set ``value`` as the result of ``future`` after a two-second pause.

    Args:
        future: The future whose result is set.
        value: The result to store on the future.
    """
    pause_seconds = 2
    await asyncio.sleep(pause_seconds)
    future.set_result(value)  # marks the future as done
    print(f"set future's result to {value}")

async def main():
    """Create a future, let a background task fill it in, and print the result."""
    # Create a Future on the running event loop.
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    # Start a coroutine that sets the future's result. Keep a reference to
    # the task: the event loop only holds weak references to tasks, so a
    # discarded task may be garbage-collected before it finishes.
    setter_task = asyncio.create_task(
        set_future_result(future, "future result is ready")
    )
    # Wait for the future to complete and read its result.
    result = await future
    print(f"received the future's result: {result}")
    # Await the helper so it has finished, and any exception it raised is
    # surfaced, before main() returns.
    await setter_task

await main()
									   

set future's result to future result is ready
received the future's result: future result is ready


# 5.synchronization 同步

In [10]:
# 第一个同步方法是lock，可以保证同一时间只有一个coroutine在操作共享变量
import asyncio

# 创建一个共享变量
shared_resource=0

# 创建一个async锁，如果有其他coroutine正在使用共享变量，就让新的coroutine先等着，前一个执行完才执行下一个
lock = asyncio.Lock()

async def modify_shared_resource():
    """Increment the module-level ``shared_resource`` while holding ``lock``.

    The value is printed before the increment and again after a two-second
    pause, all inside the locked section.
    """
    global shared_resource
    async with lock:
        print(f"resource before modification: {shared_resource}")
        shared_resource = shared_resource + 1
        # Simulated I/O; the lock stays held during the pause.
        await asyncio.sleep(2)
        print(f"resource after modification: {shared_resource}")
    # Leaving the ``async with`` block released the lock.

async def main():
    """Start five ``modify_shared_resource`` coroutines through ``asyncio.gather``."""
    # All five are started together, but each body runs inside the lock, so
    # the modifications happen one after another. A typical use is letting
    # only one writer modify a database at a time.
    workers = [modify_shared_resource() for _ in range(5)]
    await asyncio.gather(*workers)

await main()


resource before modification: 0
resource after modification: 1
resource before modification: 1
resource after modification: 2
resource before modification: 2
resource after modification: 3
resource before modification: 3
resource after modification: 4
resource before modification: 4
resource after modification: 5


In [11]:
# 第二个同步方法是semaphore，信号量，可以控制同时访问某个资源的最大并发数量
import asyncio


async def access_resource(semaphore, resource_id):
    """Hold ``semaphore`` for two seconds while "using" resource ``resource_id``.

    Args:
        semaphore: The semaphore acquired for the duration of the access.
        resource_id: Label printed in the accessing/releasing messages.
    """
    hold_seconds = 2  # simulated time spent using the resource
    async with semaphore:
        print(f"accessing resource {resource_id}")
        await asyncio.sleep(hold_seconds)
        print(f"releasing resource {resource_id}")

async def main():
    """Run five ``access_resource`` coroutines sharing a semaphore of size 2."""
    # Unlike a lock, a semaphore lets several coroutines in at once, up to
    # its initial value (2 here). A typical use is capping the number of
    # simultaneous network connections.
    limiter = asyncio.Semaphore(2)
    jobs = [access_resource(limiter, resource_id) for resource_id in range(5)]
    await asyncio.gather(*jobs)
await main()

accessing resource 0
accessing resource 1
releasing resource 0
releasing resource 1
accessing resource 2
accessing resource 3
releasing resource 2
releasing resource 3
accessing resource 4
releasing resource 4


# 6.back to event

In [None]:
import asyncio
async def waiter(event):
    """Wait until ``event`` is set, printing a message before and after.

    Args:
        event: An object whose awaitable ``wait()`` completes once it is set.
    """
    announcement = "Waiting for event to be set..."
    print(announcement)
    await event.wait()
    print("Event has been set,  continuing execution.")

async def setter(event):
    """Set ``event`` after a two-second pause, then report it.

    Args:
        event: The event to set.
    """
    pause_seconds = 2
    await asyncio.sleep(pause_seconds)
    event.set()
    print("Event has been set by setter.")

async def main():
    """Run ``waiter`` and ``setter`` together on one shared ``asyncio.Event``."""
    shared_event = asyncio.Event()
    participants = (waiter(shared_event), setter(shared_event))
    await asyncio.gather(*participants)

await main()