From 7768cc2820f0a5c2e1b37eb44307299bd5490a23 Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Fri, 21 Mar 2025 12:24:16 +0000 Subject: [PATCH 1/5] Use `get_running_loop` instead of `get_event_loop` --- pydantic_graph/pydantic_graph/_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pydantic_graph/pydantic_graph/_utils.py b/pydantic_graph/pydantic_graph/_utils.py index bc49ed6fcb..8f7eb4389c 100644 --- a/pydantic_graph/pydantic_graph/_utils.py +++ b/pydantic_graph/pydantic_graph/_utils.py @@ -12,7 +12,7 @@ def get_event_loop(): try: - event_loop = asyncio.get_event_loop() + event_loop = asyncio.get_running_loop() except RuntimeError: event_loop = asyncio.new_event_loop() asyncio.set_event_loop(event_loop) From 7670565e6422d12914696e81f4d028627cd6fd69 Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Fri, 21 Mar 2025 13:40:17 +0000 Subject: [PATCH 2/5] Make it work for every python version in a proper way --- pydantic_ai_slim/pydantic_ai/agent.py | 4 ++-- pydantic_graph/pydantic_graph/_utils.py | 24 +++++++++++++++--------- pydantic_graph/pydantic_graph/graph.py | 2 +- tests/graph/test_utils.py | 9 +++++---- 4 files changed, 23 insertions(+), 16 deletions(-) diff --git a/pydantic_ai_slim/pydantic_ai/agent.py b/pydantic_ai_slim/pydantic_ai/agent.py index d1fc5f07d1..e843cbe06d 100644 --- a/pydantic_ai_slim/pydantic_ai/agent.py +++ b/pydantic_ai_slim/pydantic_ai/agent.py @@ -13,7 +13,7 @@ from typing_extensions import TypeGuard, TypeVar, deprecated from pydantic_graph import End, Graph, GraphRun, GraphRunContext -from pydantic_graph._utils import get_event_loop +from pydantic_graph._utils import run_until_complete from . import ( _agent_graph, @@ -567,7 +567,7 @@ def run_sync( """ if infer_name and self.name is None: self._infer_name(inspect.currentframe()) - return get_event_loop().run_until_complete( + return run_until_complete( self.run( user_prompt, result_type=result_type, diff --git a/pydantic_graph/pydantic_graph/_utils.py b/pydantic_graph/pydantic_graph/_utils.py index 8f7eb4389c..6bf56e1fa4 100644 --- a/pydantic_graph/pydantic_graph/_utils.py +++ b/pydantic_graph/pydantic_graph/_utils.py @@ -1,7 +1,9 @@ from __future__ import annotations as _annotations import asyncio +import sys import types +from collections.abc import Coroutine from functools import partial from typing import Any, Callable, TypeVar @@ -10,15 +12,6 @@ from typing_inspection.introspection import is_union_origin -def get_event_loop(): - try: - event_loop = asyncio.get_running_loop() - except RuntimeError: - event_loop = asyncio.new_event_loop() - asyncio.set_event_loop(event_loop) - return event_loop - - def get_union_args(tp: Any) -> tuple[Any, ...]: """Extract the arguments of a Union type if `response_type` is a union, otherwise return an empty tuple.""" # similar to `pydantic_ai_slim/pydantic_ai/_result.py:get_union_args` @@ -100,3 +93,16 @@ async def run_in_executor(func: Callable[_P, _R], *args: _P.args, **kwargs: _P.k return await asyncio.get_running_loop().run_in_executor(None, partial(func, *args, **kwargs)) else: return await asyncio.get_running_loop().run_in_executor(None, func, *args) # type: ignore + + +def run_until_complete(coro: Coroutine[None, None, _R]) -> _R: + try: + loop = asyncio.get_running_loop() + return loop.run_until_complete(coro) + except RuntimeError: + if sys.version_info < (3, 10): + loop = asyncio.get_running_loop() + return loop.run_until_complete(coro) + else: + with asyncio.runners.Runner(loop_factory=asyncio.new_event_loop) as runner: + return runner.run(coro) diff --git a/pydantic_graph/pydantic_graph/graph.py b/pydantic_graph/pydantic_graph/graph.py index 9c026ea38e..e1238c7ea1 100644 --- a/pydantic_graph/pydantic_graph/graph.py +++ b/pydantic_graph/pydantic_graph/graph.py @@ -202,7 +202,7 @@ def run_sync( if infer_name and self.name is None: self._infer_name(inspect.currentframe()) - return _utils.get_event_loop().run_until_complete( + return _utils.run_until_complete( self.run(start_node, state=state, deps=deps, persistence=persistence, infer_name=False) ) diff --git a/tests/graph/test_utils.py b/tests/graph/test_utils.py index e5987feedb..20b16758d4 100644 --- a/tests/graph/test_utils.py +++ b/tests/graph/test_utils.py @@ -1,12 +1,13 @@ from threading import Thread -from pydantic_graph._utils import get_event_loop +from pydantic_graph._utils import run_until_complete -def test_get_event_loop_in_thread(): +def test_run_until_complete_in_thread(): + async def run(): ... + def get_and_close_event_loop(): - event_loop = get_event_loop() - event_loop.close() + run_until_complete(run()) thread = Thread(target=get_and_close_event_loop) thread.start() From 403255cafbc61afb20ef54bc6d9c8a446e226c0e Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Sat, 22 Mar 2025 11:08:20 +0100 Subject: [PATCH 3/5] try now --- pydantic_graph/pydantic_graph/_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pydantic_graph/pydantic_graph/_utils.py b/pydantic_graph/pydantic_graph/_utils.py index 6bf56e1fa4..7f3e5e5cc2 100644 --- a/pydantic_graph/pydantic_graph/_utils.py +++ b/pydantic_graph/pydantic_graph/_utils.py @@ -100,8 +100,8 @@ def run_until_complete(coro: Coroutine[None, None, _R]) -> _R: loop = asyncio.get_running_loop() return loop.run_until_complete(coro) except RuntimeError: - if sys.version_info < (3, 10): - loop = asyncio.get_running_loop() + if sys.version_info < (3, 11): + loop = asyncio.new_event_loop() return loop.run_until_complete(coro) else: with asyncio.runners.Runner(loop_factory=asyncio.new_event_loop) as runner: From 2a88ff047ec0207421b9cfccc6c7f38bae4b379b Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Sat, 22 Mar 2025 11:13:38 +0100 Subject: [PATCH 4/5] Fix loop for 3.11 less --- pydantic_graph/pydantic_graph/_utils.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pydantic_graph/pydantic_graph/_utils.py b/pydantic_graph/pydantic_graph/_utils.py index 7f3e5e5cc2..411b8649fe 100644 --- a/pydantic_graph/pydantic_graph/_utils.py +++ b/pydantic_graph/pydantic_graph/_utils.py @@ -101,8 +101,11 @@ def run_until_complete(coro: Coroutine[None, None, _R]) -> _R: return loop.run_until_complete(coro) except RuntimeError: if sys.version_info < (3, 11): - loop = asyncio.new_event_loop() - return loop.run_until_complete(coro) + try: + loop = asyncio.new_event_loop() + return loop.run_until_complete(coro) + finally: + loop.close() else: with asyncio.runners.Runner(loop_factory=asyncio.new_event_loop) as runner: return runner.run(coro) From 611c22d4d86276c18a2056a2ba0a702eff17a111 Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Sat, 22 Mar 2025 11:29:16 +0100 Subject: [PATCH 5/5] Fix implementation --- pydantic_graph/pydantic_graph/_utils.py | 22 +++++++++------------- tests/graph/test_utils.py | 6 ++++++ 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/pydantic_graph/pydantic_graph/_utils.py b/pydantic_graph/pydantic_graph/_utils.py index 411b8649fe..dfcff0fe34 100644 --- a/pydantic_graph/pydantic_graph/_utils.py +++ b/pydantic_graph/pydantic_graph/_utils.py @@ -96,16 +96,12 @@ async def run_in_executor(func: Callable[_P, _R], *args: _P.args, **kwargs: _P.k def run_until_complete(coro: Coroutine[None, None, _R]) -> _R: - try: - loop = asyncio.get_running_loop() - return loop.run_until_complete(coro) - except RuntimeError: - if sys.version_info < (3, 11): - try: - loop = asyncio.new_event_loop() - return loop.run_until_complete(coro) - finally: - loop.close() - else: - with asyncio.runners.Runner(loop_factory=asyncio.new_event_loop) as runner: - return runner.run(coro) + if sys.version_info < (3, 11): + try: + loop = asyncio.new_event_loop() + return loop.run_until_complete(coro) + finally: + loop.close() + else: + with asyncio.runners.Runner(loop_factory=asyncio.new_event_loop) as runner: + return runner.run(coro) diff --git a/tests/graph/test_utils.py b/tests/graph/test_utils.py index 20b16758d4..2358db7513 100644 --- a/tests/graph/test_utils.py +++ b/tests/graph/test_utils.py @@ -3,6 +3,12 @@ from pydantic_graph._utils import run_until_complete +def test_run_until_complete_in_main_thread(): + async def run(): ... + + run_until_complete(run()) + + def test_run_until_complete_in_thread(): async def run(): ...