同步异步互转工具
- 本项目部分依赖于nest_asyncio, 在嵌套场景下,可能需要对
asyncio
打补丁 - 本项目提取于ksrpc 中的同步异步任意转换功能
pip install revolving_asyncio -U
See more examples
import asyncio
import time
from revolving_asyncio import to_async, to_sync
if __name__ == '__main__':
@to_async
def do_sync_work(name: str):
time.sleep(5)
print(f"sync function, to async by decorator, {name}")
async def do_async_work(name: str):
await asyncio.sleep(5)
print(f"async function, to sync by wrapper, {name}")
async def async_main():
# Method 1
await do_sync_work(name="Method 1")
def sync_main():
# Method 2
to_sync(do_async_work)(name="Method 2")
asyncio.run(async_main())
sync_main()
如果直接使用to_async
和to_sync
报RuntimeError: This event loop is already running
等一类的异常,
可以尝试添加以下代码
import revolving_asyncio
revolving_asyncio.apply()