From 309df2b816e058316733e26ddbc49fc4aba10527 Mon Sep 17 00:00:00 2001 From: Heyi Date: Wed, 8 May 2024 14:23:14 +0800 Subject: [PATCH 1/2] Convert sync generator to async generator --- .../promptflow/_utils/async_utils.py | 13 +++++++++++++ .../promptflow/executor/flow_executor.py | 14 +++++++++++++- 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/src/promptflow-core/promptflow/_utils/async_utils.py b/src/promptflow-core/promptflow/_utils/async_utils.py index feb707c7cc4..613c9bb2908 100644 --- a/src/promptflow-core/promptflow/_utils/async_utils.py +++ b/src/promptflow-core/promptflow/_utils/async_utils.py @@ -6,6 +6,7 @@ import functools import signal import threading +from types import GeneratorType from promptflow.tracing import ThreadPoolExecutorWithContext @@ -109,3 +110,15 @@ async def wrapper(*args, **kwargs): return await asyncio.get_event_loop().run_in_executor(executor, partial_func) return wrapper + + +async def sync_generator_to_async(g: GeneratorType): + with ThreadPoolExecutorWithContext(max_workers=1) as pool: + loop = asyncio.get_running_loop() + # Use object() as a default value to distinguish from None + default_value = object() + while True: + resp = await loop.run_in_executor(pool, next, g, default_value) + if resp is default_value: + return + yield resp diff --git a/src/promptflow-core/promptflow/executor/flow_executor.py b/src/promptflow-core/promptflow/executor/flow_executor.py index 4ffede414c4..69aefa29497 100644 --- a/src/promptflow-core/promptflow/executor/flow_executor.py +++ b/src/promptflow-core/promptflow/executor/flow_executor.py @@ -29,7 +29,7 @@ from promptflow._core.run_tracker import RunTracker from promptflow._core.tool import STREAMING_OPTION_PARAMETER_ATTR from promptflow._core.tools_manager import ToolsManager -from promptflow._utils.async_utils import async_run_allowing_running_loop +from promptflow._utils.async_utils import async_run_allowing_running_loop, sync_generator_to_async from promptflow._utils.context_utils import _change_working_dir from promptflow._utils.execution_utils import ( apply_default_value_for_input, @@ -724,6 +724,7 @@ def exec_line( node_concurrency, allow_generator_output, line_timeout_sec, + convert_generator=False, ) # TODO: Call exec_line_async in exec_line when async is mature. self._node_concurrency = node_concurrency @@ -754,6 +755,7 @@ async def exec_line_async( node_concurrency=DEFAULT_CONCURRENCY_FLOW, allow_generator_output: bool = False, line_timeout_sec: Optional[int] = None, + convert_generator: bool = True, ) -> LineResult: """Execute a single line of the flow. @@ -769,6 +771,8 @@ async def exec_line_async( :type node_concurrency: int :param allow_generator_output: Whether to allow generator output. :type allow_generator_output: bool + :param convert_generator: Whether to convert generator output to async generator. + :type convert_generator: bool :return: The result of executing the line. :rtype: ~promptflow.executor._result.LineResult """ @@ -786,6 +790,8 @@ async def exec_line_async( validate_inputs=validate_inputs, allow_generator_output=allow_generator_output, ) + if convert_generator: + line_result.output = self._convert_generators_to_async(line_result.output) # Return line result with index if index is not None and isinstance(line_result.output, dict): line_result.output[LINE_NUMBER_KEY] = index @@ -890,6 +896,12 @@ def _start_flow_span(self, inputs: Mapping[str, Any]): enrich_span_with_input(span, inputs) yield span + def _convert_generators_to_async(self, output: dict): + for k, v in output.items(): + if isinstance(v, GeneratorType): + output[k] = sync_generator_to_async(v) + return output + async def _exec_inner_with_trace_async( self, inputs: Mapping[str, Any], From b80d729d3d4931b068546d22d3d7beb765baed44 Mon Sep 17 00:00:00 2001 From: Heyi Date: Mon, 13 May 2024 13:50:46 +0800 Subject: [PATCH 2/2] Use Iterator instead of Generator --- .../promptflow/_utils/async_utils.py | 4 +- .../promptflow/executor/flow_executor.py | 45 +++++++++---------- 2 files changed, 24 insertions(+), 25 deletions(-) diff --git a/src/promptflow-core/promptflow/_utils/async_utils.py b/src/promptflow-core/promptflow/_utils/async_utils.py index 613c9bb2908..ddd8a9cbc3c 100644 --- a/src/promptflow-core/promptflow/_utils/async_utils.py +++ b/src/promptflow-core/promptflow/_utils/async_utils.py @@ -6,7 +6,7 @@ import functools import signal import threading -from types import GeneratorType +from typing import Iterator from promptflow.tracing import ThreadPoolExecutorWithContext @@ -112,7 +112,7 @@ async def wrapper(*args, **kwargs): return wrapper -async def sync_generator_to_async(g: GeneratorType): +async def sync_iterator_to_async(g: Iterator): with ThreadPoolExecutorWithContext(max_workers=1) as pool: loop = asyncio.get_running_loop() # Use object() as a default value to distinguish from None diff --git a/src/promptflow-core/promptflow/executor/flow_executor.py b/src/promptflow-core/promptflow/executor/flow_executor.py index 69aefa29497..a9ebc393d16 100644 --- a/src/promptflow-core/promptflow/executor/flow_executor.py +++ b/src/promptflow-core/promptflow/executor/flow_executor.py @@ -14,8 +14,7 @@ from contextlib import contextmanager from pathlib import Path from threading import current_thread -from types import AsyncGeneratorType, GeneratorType -from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Union +from typing import Any, AsyncIterator, Callable, Dict, Iterator, List, Mapping, Optional, Tuple, Union import opentelemetry.trace as otel_trace from opentelemetry.trace.span import Span, format_trace_id @@ -29,7 +28,7 @@ from promptflow._core.run_tracker import RunTracker from promptflow._core.tool import STREAMING_OPTION_PARAMETER_ATTR from promptflow._core.tools_manager import ToolsManager -from promptflow._utils.async_utils import async_run_allowing_running_loop, sync_generator_to_async +from promptflow._utils.async_utils import async_run_allowing_running_loop, sync_iterator_to_async from promptflow._utils.context_utils import _change_working_dir from promptflow._utils.execution_utils import ( apply_default_value_for_input, @@ -724,7 +723,7 @@ def exec_line( node_concurrency, allow_generator_output, line_timeout_sec, - convert_generator=False, + sync_iterator_to_async=False, ) # TODO: Call exec_line_async in exec_line when async is mature. self._node_concurrency = node_concurrency @@ -755,7 +754,7 @@ async def exec_line_async( node_concurrency=DEFAULT_CONCURRENCY_FLOW, allow_generator_output: bool = False, line_timeout_sec: Optional[int] = None, - convert_generator: bool = True, + sync_iterator_to_async: bool = True, ) -> LineResult: """Execute a single line of the flow. @@ -771,8 +770,8 @@ async def exec_line_async( :type node_concurrency: int :param allow_generator_output: Whether to allow generator output. :type allow_generator_output: bool - :param convert_generator: Whether to convert generator output to async generator. - :type convert_generator: bool + :param sync_iterator_to_async: Whether to convert sync iterator output to async iterator. + :type sync_iterator_to_async: bool :return: The result of executing the line. :rtype: ~promptflow.executor._result.LineResult """ @@ -790,8 +789,8 @@ async def exec_line_async( validate_inputs=validate_inputs, allow_generator_output=allow_generator_output, ) - if convert_generator: - line_result.output = self._convert_generators_to_async(line_result.output) + if sync_iterator_to_async: + line_result.output = self._convert_iterators_to_async(line_result.output) # Return line result with index if index is not None and isinstance(line_result.output, dict): line_result.output[LINE_NUMBER_KEY] = index @@ -896,10 +895,10 @@ def _start_flow_span(self, inputs: Mapping[str, Any]): enrich_span_with_input(span, inputs) yield span - def _convert_generators_to_async(self, output: dict): + def _convert_iterators_to_async(self, output: dict): for k, v in output.items(): - if isinstance(v, GeneratorType): - output[k] = sync_generator_to_async(v) + if isinstance(v, Iterator): + output[k] = sync_iterator_to_async(v) return output async def _exec_inner_with_trace_async( @@ -954,7 +953,7 @@ def _exec_post_process( generator_output_nodes = [ nodename for nodename, output in nodes_outputs.items() - if isinstance(output, GeneratorType) or isinstance(output, AsyncGeneratorType) + if isinstance(output, Iterator) or isinstance(output, AsyncIterator) ] # When stream is True, we allow generator output in the flow output run_tracker.allow_generator_types = stream @@ -1214,9 +1213,9 @@ async def _traverse_nodes_async(self, inputs, context: FlowExecutionContext) -> return outputs, nodes_outputs @staticmethod - async def _merge_async_generator(async_gen: AsyncGeneratorType, outputs: dict, key: str): + async def _merge_async_iterator(async_it: AsyncIterator, outputs: dict, key: str): items = [] - async for item in async_gen: + async for item in async_it: items.append(item) outputs[key] = "".join(str(item) for item in items) @@ -1224,24 +1223,24 @@ async def _stringify_generator_output_async(self, outputs: dict): pool = ThreadPoolExecutorWithContext() tasks = [] for k, v in outputs.items(): - if isinstance(v, AsyncGeneratorType): - tasks.append(asyncio.create_task(self._merge_async_generator(v, outputs, k))) - elif isinstance(v, GeneratorType): + if isinstance(v, AsyncIterator): + tasks.append(asyncio.create_task(self._merge_async_iterator(v, outputs, k))) + elif isinstance(v, Iterator): loop = asyncio.get_event_loop() - task = loop.run_in_executor(pool, self._merge_generator, v, outputs, k) + task = loop.run_in_executor(pool, self._merge_iterator, v, outputs, k) tasks.append(task) if tasks: await asyncio.wait(tasks) return outputs @staticmethod - def _merge_generator(gen: GeneratorType, outputs: dict, key: str): + def _merge_iterator(gen: Iterator, outputs: dict, key: str): outputs[key] = "".join(str(item) for item in gen) def _stringify_generator_output(self, outputs: dict): for k, v in outputs.items(): - if isinstance(v, GeneratorType): - self._merge_generator(v, outputs, k) + if isinstance(v, Iterator): + self._merge_iterator(v, outputs, k) return outputs @@ -1378,7 +1377,7 @@ def _ensure_node_result_is_serializable(f): @functools.wraps(f) def wrapper(*args, **kwargs): result = f(*args, **kwargs) - if isinstance(result, GeneratorType): + if isinstance(result, Iterator): result = "".join(str(trunk) for trunk in result) return result