Skip to content

Commit

Permalink
Python: Resolve hanging issue in Jupyter notebooks when re-running as…
Browse files Browse the repository at this point in the history
…ync functions (#4237)

### Motivation and Context

Resolves #4137. The issue was that if you try to re-run the sample joke
in the 00-getting-started notebook, the subsequent call to the joke
hangs.

<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
  1. Why is this change required?
  2. What problem does it solve?
  3. What scenario does it contribute to?
  4. If it fixes an open issue, please link to the issue here.
-->

### Description

There were issues when re-running asynchronous functions
(`_invoke_semantic_async` and `_invoke_native_async`) due to improper
handling of asyncio event loops in a multi-threaded env. Specifically,
the code had an issue when an event loop was already running, leading to
hanging or blocking behavior on subsequent runs.

The change replaced the old code Python's ThreadPoolExecutor, which
simplifies thread management. It handles the creation, execution, and
cleanup of threads more efficiently and safely.

The new `run_async_in_executor` method ensures that each async function
runs in its own new event loop, avoiding conflicts with the main
thread's event loop or other threads.

<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->

### Contribution Checklist

<!-- Before submitting this PR, please make sure: -->

- [X] The code builds clean without any errors or warnings
- [X] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [X] All unit tests pass, and I have added new tests where possible
- [X] I didn't break anyone 😄

---------

Co-authored-by: Evan Mattson <evan.mattson@microsoft.com>
  • Loading branch information
moonbox3 and moonbox3 committed Dec 13, 2023
1 parent 49e9051 commit be89d6b
Showing 1 changed file with 34 additions and 19 deletions.
53 changes: 34 additions & 19 deletions python/semantic_kernel/orchestration/sk_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import asyncio
import platform
import sys
import threading
from concurrent.futures import ThreadPoolExecutor
from enum import Enum
from logging import Logger
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional
Expand Down Expand Up @@ -436,18 +436,21 @@ def invoke(
if input is not None:
context.variables.update(input)

# Check if there is an event loop
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = None
loop = (
asyncio.get_running_loop()
if asyncio.get_event_loop().is_running()
else None
)

# Handle "asyncio.run() cannot be called from a running event loop"
if loop and loop.is_running():
if self.is_semantic:
return self._runThread(self._invoke_semantic_async(context, settings))
else:
return self._runThread(self._invoke_native_async(context))
coroutine_function = (
self._invoke_semantic_async
if self.is_semantic
else self._invoke_native_async
)
return self.run_async_in_executor(
lambda: coroutine_function(context, settings)
)
else:
if self.is_semantic:
return asyncio.run(self._invoke_semantic_async(context, settings))
Expand Down Expand Up @@ -652,12 +655,24 @@ def _trace_function_type_Call(self, type: Enum, log: Logger) -> None:
event loops such as Jupyter notebooks.
"""

def _runThread(self, code: Callable):
result = []
thread = threading.Thread(target=self._runCode, args=(code, result))
thread.start()
thread.join()
return result[0]
def run_async_in_executor(self, coroutine_func: Callable[[], Any]) -> Any:
"""
A unified method for async execution for more efficient and safer thread management
Arguments:
coroutine_func {Callable[[], Any]} -- The coroutine to run
Returns:
Any -- The result of the coroutine
"""

def run_async_in_thread():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(coroutine_func())
loop.close()
return result

def _runCode(self, code: Callable, result: List[Any]) -> None:
result.append(asyncio.run(code))
with ThreadPoolExecutor() as executor:
future = executor.submit(run_async_in_thread)
return future.result()

0 comments on commit be89d6b

Please sign in to comment.