[Perf] Evaluate API: Support parallelized evaluator batch run through pf.run (#3380)

# Description

The change in this PR addresses the performance issues we were seeing
with the Evaluate API. This is the first step in optimizing performance.
The improvements include:

- Parallelizing the pf.run calls for the evaluators. Previously they ran
sequentially, which accounted for most of the latency (see the sketch after
this list).
- Addressing the slow import of the Evaluate API, which was caused by the
import of MLClient.
- Using threads instead of processes to infer signatures for eval batch
runs. (This change has been moved to a separate PR:
#3412)
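
A rough sketch of the parallelization idea (not the exact code in this PR; the toy `answer_length` evaluator and the `data.jsonl` path are placeholders): each evaluator's `pf.run` is submitted to a thread pool so the batch runs execute concurrently, and results are only awaited after everything has been submitted. The actual implementation lives in the new `ProxyClient` further down.

```python
from promptflow.client import PFClient
from promptflow.tracing import ThreadPoolExecutorWithContext as ThreadPoolExecutor


def answer_length(*, answer: str, **kwargs):
    """Toy evaluator, used only for illustration."""
    return {"answer_length": len(answer)}


pf_client = PFClient()
evaluators = {"answer_length": answer_length}  # placeholder evaluator map
data_file = "data.jsonl"  # placeholder input path

# Submit one pf.run per evaluator so the evaluator batch runs execute
# concurrently instead of one after another.
with ThreadPoolExecutor(thread_name_prefix="evaluators_thread") as pool:
    futures = {
        name: pool.submit(pf_client.run, flow=evaluator, data=data_file)
        for name, evaluator in evaluators.items()
    }
    runs = {name: future.result() for name, future in futures.items()}

details = {name: pf_client.get_details(run) for name, run in runs.items()}
```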

**Improvements from this change:**
Windows OS, remote tracking disabled

- 4 evaluators, 100 rows:
    - Previous (pf.run without threadpool): 320 secs
    - Current (pf.run with threadpool): 78 secs (~75% improvement)

- 1 evaluator, 1 row:
    - Previous: 53 secs
    - Current: 17 secs (~68% improvement)

Investigation details can be found
[here](https://microsoft-my.sharepoint.com/:w:/p/ninhu/ETB_zdMkFrdAuf3Lcg9ssrUB6RVmyuFs5Un1G74O1HlwSA?e=23ngPm)


# All Promptflow Contribution checklist:
- [ ] **The pull request does not introduce [breaking changes].**
- [ ] **CHANGELOG is updated for new features, bug fixes or other
significant changes.**
- [ ] **I have read the [contribution guidelines](../CONTRIBUTING.md).**
- [ ] **Create an issue and link to the pull request to get dedicated
review from promptflow team. Learn more: [suggested
workflow](../CONTRIBUTING.md#suggested-workflow).**

## General Guidelines and Best Practices
- [ ] Title of the pull request is clear and informative.
- [ ] There are a small number of commits, each of which has an
informative message. This means that previously merged commits do not
appear in the history of the PR. For more information on cleaning up the
commits in your PR, [see this
page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md).

### Testing Guidelines
- [ ] Pull request includes test coverage for the included changes.

---------

Co-authored-by: Clement Wang <47586720+wangchao1230@users.noreply.github.com>
ninghu and wangchao1230 committed Jun 17, 2024
1 parent 01267fd commit 32115d3
Showing 15 changed files with 255 additions and 806 deletions.
1 change: 1 addition & 0 deletions src/promptflow-devkit/promptflow/_sdk/_constants.py
@@ -98,6 +98,7 @@ def _prepare_home_dir() -> Path:
PF_TRACE_CONTEXT_ATTR = "attributes"
PF_SERVICE_DEBUG = "PF_SERVICE_DEBUG"
PF_SYSTEM_METRICS_PREFIX = "__pf__"
PF_FLOW_ENTRY_IN_TMP = "PF_FLOW_ENTRY_IN_TMP"

LOCAL_MGMT_DB_PATH = (HOME_PROMPT_FLOW_DIR / "pf.sqlite").resolve()
LOCAL_MGMT_DB_SESSION_ACQUIRE_LOCK_PATH = (HOME_PROMPT_FLOW_DIR / "pf.sqlite.lock").resolve()
13 changes: 11 additions & 2 deletions src/promptflow-devkit/promptflow/_sdk/_utilities/general_utils.py
@@ -53,6 +53,7 @@
NODE,
NODE_VARIANTS,
NODES,
PF_FLOW_ENTRY_IN_TMP,
PROMPT_FLOW_DIR_NAME,
REFRESH_CONNECTIONS_DIR_LOCK_PATH,
REGISTRY_URI_PREFIX,
@@ -1019,8 +1020,16 @@ def create_temp_flex_flow_yaml_core(entry: Union[str, PathLike, Callable], code:
logger.warning(f"Found existing {flow_yaml_path.as_posix()}, will not respect it in runtime.")
with open(flow_yaml_path, "r", encoding=DEFAULT_ENCODING) as f:
existing_content = f.read()
if not is_local_module(entry_string=entry, code=code):
logger.debug(f"Entry {entry} is not found in local, it's snapshot will be empty.")

create_yaml_in_tmp = False
if os.environ.get(PF_FLOW_ENTRY_IN_TMP, "False").lower() == "true":
logger.debug("PF_FLOW_ENTRY_IN_TMP is set to true, its snapshot will be empty.")
create_yaml_in_tmp = True
elif not is_local_module(entry_string=entry, code=code):
logger.debug(f"Entry {entry} is not found in local, its snapshot will be empty.")
create_yaml_in_tmp = True

if create_yaml_in_tmp:
# make sure run name is from entry instead of random folder name
temp_dir = tempfile.mkdtemp(prefix=_sanitize_python_variable_name(entry) + "_")
flow_yaml_path = Path(temp_dir) / FLOW_FLEX_YAML
8 changes: 5 additions & 3 deletions src/promptflow-evals/promptflow/evals/_constants.py
@@ -14,11 +14,13 @@ class EvaluationMetrics:


class Prefixes:
_INPUTS = 'inputs.'
_OUTPUTS = 'outputs.'
_TGT_OUTPUTS = '__outputs.'
_INPUTS = "inputs."
_OUTPUTS = "outputs."
_TGT_OUTPUTS = "__outputs."


DEFAULT_EVALUATION_RESULTS_FILE_NAME = "evaluation_results.json"

CONTENT_SAFETY_DEFECT_RATE_THRESHOLD_DEFAULT = 4

BATCH_RUN_TIMEOUT = 3600
@@ -0,0 +1,8 @@
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
from .batch_run_context import BatchRunContext
from .code_client import CodeClient
from .proxy_client import ProxyClient

__all__ = ["CodeClient", "ProxyClient", "BatchRunContext"]
@@ -0,0 +1,32 @@
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
import os

from promptflow._sdk._constants import PF_FLOW_ENTRY_IN_TMP
from promptflow._utils.user_agent_utils import ClientUserAgentUtil
from promptflow.tracing._integrations._openai_injector import inject_openai_api, recover_openai_api

from ..._user_agent import USER_AGENT
from .code_client import CodeClient
from .proxy_client import ProxyClient


class BatchRunContext:
def __init__(self, client):
self.client = client

def __enter__(self):
if isinstance(self.client, CodeClient):
ClientUserAgentUtil.append_user_agent(USER_AGENT)
inject_openai_api()

if isinstance(self.client, ProxyClient):
os.environ[PF_FLOW_ENTRY_IN_TMP] = "true"

def __exit__(self, exc_type, exc_val, exc_tb):
if isinstance(self.client, CodeClient):
recover_openai_api()

if isinstance(self.client, ProxyClient):
os.environ.pop(PF_FLOW_ENTRY_IN_TMP, None)
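
A minimal usage sketch, assuming `BatchRunContext` and `ProxyClient` are importable from the new batch-run client package added in this PR (the `answer_length` evaluator and the data path are placeholders): `ProxyClient.run` returns immediately with a future wrapped in a `ProxyRun`, and only `get_details`/`get_metrics` block on it (bounded by `BATCH_RUN_TIMEOUT`); entering the context sets `PF_FLOW_ENTRY_IN_TMP` so each evaluator's flex-flow YAML goes to a temp dir with an empty snapshot, and exiting removes it again.

```python
# Assumes BatchRunContext and ProxyClient are imported from the new batch-run
# client package added in this PR (exact module path omitted here).
from promptflow.client import PFClient


def answer_length(*, answer: str, **kwargs):
    """Toy evaluator, used only for illustration."""
    return {"answer_length": len(answer)}


batch_run_client = ProxyClient(PFClient())

with BatchRunContext(batch_run_client):
    # PF_FLOW_ENTRY_IN_TMP is "true" inside the context, so pf.run writes the
    # flex-flow YAML into a temp dir instead of next to the evaluator's code.
    run = batch_run_client.run(flow=answer_length, data="data.jsonl")  # returns a ProxyRun immediately
    result_df = batch_run_client.get_details(run)  # blocks, up to BATCH_RUN_TIMEOUT seconds
# On exit the environment variable is popped again.
```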
@@ -7,31 +7,15 @@

import pandas as pd

from promptflow._utils.user_agent_utils import ClientUserAgentUtil
from promptflow.evals.evaluate._utils import _apply_column_mapping, load_jsonl, _has_aggregator
from promptflow.tracing import ThreadPoolExecutorWithContext as ThreadPoolExecutor
from promptflow.tracing._integrations._openai_injector import inject_openai_api, recover_openai_api
from promptflow.contracts.types import AttrDict
from promptflow.evals.evaluate._utils import _apply_column_mapping, _has_aggregator, load_jsonl
from promptflow.tracing import ThreadPoolExecutorWithContext as ThreadPoolExecutor

from ..._user_agent import USER_AGENT
from ..._constants import BATCH_RUN_TIMEOUT

LOGGER = logging.getLogger(__name__)


class BatchRunContext:
def __init__(self, client):
self.client = client

def __enter__(self):
if isinstance(self.client, CodeClient):
ClientUserAgentUtil.append_user_agent(USER_AGENT)
inject_openai_api()

def __exit__(self, exc_type, exc_val, exc_tb):
if isinstance(self.client, CodeClient):
recover_openai_api()


class CodeRun:
def __init__(self, run, input_data, evaluator_name=None, aggregated_metrics=None, **kwargs):
self.run = run
@@ -40,22 +24,27 @@ def __init__(self, run, input_data, evaluator_name=None, aggregated_metrics=None, **kwargs):
self.aggregated_metrics = aggregated_metrics

def get_result_df(self, exclude_inputs=False):
result_df = self.run.result(timeout=60 * 60)
result_df = self.run.result(timeout=BATCH_RUN_TIMEOUT)
if exclude_inputs:
result_df = result_df.drop(columns=[col for col in result_df.columns if col.startswith("inputs.")])
return result_df

def get_aggregated_metrics(self):
try:
aggregated_metrics = self.aggregated_metrics.result(timeout=60 * 60) \
if self.aggregated_metrics is not None else None
aggregated_metrics = (
self.aggregated_metrics.result(timeout=BATCH_RUN_TIMEOUT)
if self.aggregated_metrics is not None
else None
)
except Exception as ex:
LOGGER.debug(f"Error calculating metrics for evaluator {self.evaluator_name}, failed with error {str(ex)}")
aggregated_metrics = None

if not isinstance(aggregated_metrics, dict):
LOGGER.warning(f"Aggregated metrics for evaluator {self.evaluator_name}"
f" is not a dictionary will not be logged as metrics")
LOGGER.warning(
f"Aggregated metrics for evaluator {self.evaluator_name}"
f" is not a dictionary will not be logged as metrics"
)

aggregated_metrics = aggregated_metrics if isinstance(aggregated_metrics, dict) else {}

@@ -71,8 +60,11 @@ def _calculate_metric(self, evaluator, input_df, column_mapping, evaluator_name)
row_metric_results = []
input_df = _apply_column_mapping(input_df, column_mapping)
# Ignoring args and kwargs from the signature since they are usually catching extra arguments
parameters = {param.name for param in inspect.signature(evaluator).parameters.values()
if param.name not in ['args', 'kwargs']}
parameters = {
param.name
for param in inspect.signature(evaluator).parameters.values()
if param.name not in ["args", "kwargs"]
}
for value in input_df.to_dict("records"):
# Filter out only the parameters that are present in the input data
# if no parameters then pass data as is
@@ -83,7 +75,7 @@ def _calculate_metric(self, evaluator, input_df, column_mapping, evaluator_name)
try:
result = row_metric_future.result()
if not isinstance(result, dict):
result = {'output': result}
result = {"output": result}
row_metric_results.append(result)
except Exception as ex: # pylint: disable=broad-except
msg_1 = f"Error calculating value for row {row_number} for metric {evaluator_name}, "
@@ -114,8 +106,9 @@ def _calculate_aggregations(self, evaluator, run):
aggregated_output = aggr_func(aggregate_input)
return aggregated_output
except Exception as ex:
LOGGER.warning(f"Error calculating aggregations for evaluator {run.evaluator_name},"
f" failed with error {str(ex)}")
LOGGER.warning(
f"Error calculating aggregations for evaluator {run.evaluator_name}," f" failed with error {str(ex)}"
)
return None

def run(self, flow, data, evaluator_name=None, column_mapping=None, **kwargs):
@@ -0,0 +1,40 @@
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
import logging

import numpy as np

from promptflow.client import PFClient
from promptflow.tracing import ThreadPoolExecutorWithContext as ThreadPoolExecutor

from ..._constants import BATCH_RUN_TIMEOUT

LOGGER = logging.getLogger(__name__)


class ProxyRun:
def __init__(self, run, **kwargs):
self.run = run


class ProxyClient:
def __init__(self, pf_client: PFClient):
self._pf_client = pf_client
self._thread_pool = ThreadPoolExecutor(thread_name_prefix="evaluators_thread")

def run(self, flow, data, column_mapping=None, **kwargs):
eval_future = self._thread_pool.submit(
self._pf_client.run, flow, data=data, column_mapping=column_mapping, **kwargs
)
return ProxyRun(run=eval_future)

def get_details(self, proxy_run, all_results=False):
run = proxy_run.run.result(timeout=BATCH_RUN_TIMEOUT)
result_df = self._pf_client.get_details(run, all_results=all_results)
result_df.replace("(Failed)", np.nan, inplace=True)
return result_df

def get_metrics(self, proxy_run):
run = proxy_run.run.result(timeout=BATCH_RUN_TIMEOUT)
return self._pf_client.get_metrics(run)

This file was deleted.
