Commit 1ac0e39

Merge branch 'main' into bwilliams2/tool_calls_parsing

bwilliams2 committed Jun 17, 2024
2 parents: 3bde017 + 32115d3

Showing 19 changed files with 260 additions and 811 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build_doc_ci.yml
@@ -52,7 +52,7 @@ jobs:
shell: powershell
working-directory: scripts/docs/
run: |-
-pip install langchain
+pip install langchain "tenacity<8.4.0"
./doc_generation.ps1 -WithReferenceDoc:$true -WarningAsError:$true
# Note: We keep this job separate because some errors may be missed when the build link check exists.
@@ -85,5 +85,5 @@ jobs:
shell: powershell
working-directory: scripts/docs/
run: |-
-pip install langchain
+pip install langchain "tenacity<8.4.0"
./doc_generation.ps1 -WithReferenceDoc:$true -WarningAsError:$true -BuildLinkCheck
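
Each of these workflow edits pins tenacity below 8.4.0 (quoted so the shell does not read "<" as a redirection operator). To sanity-check what the specifier admits, one can use the third-party packaging library; this snippet is illustrative and not part of the commit:

    from packaging.specifiers import SpecifierSet
    from packaging.version import Version

    spec = SpecifierSet("<8.4.0")    # same pin the workflows install
    print(Version("8.3.0") in spec)  # True  -> 8.3.0 is allowed
    print(Version("8.4.0") in spec)  # False -> 8.4.0 is excluded
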
2 changes: 1 addition & 1 deletion .github/workflows/promptflow-executor-e2e-test.yml
@@ -148,7 +148,7 @@ jobs:
run: |
gci env:* | sort-object name
az account show
-pip install langchain-community
+pip install langchain-community "tenacity<8.4.0"
# numexpr is required by langchain in e2e tests.
pip install numexpr
python scripts/building/run_coverage_tests.py `
2 changes: 1 addition & 1 deletion .github/workflows/promptflow-executor-unit-test.yml
@@ -150,7 +150,7 @@ jobs:
run: |
gci env:* | sort-object name
az account show
-pip install langchain-community
+pip install langchain-community "tenacity<8.4.0"
python scripts/building/run_coverage_tests.py `
-p ${{ env.testWorkingDirectory }}/promptflow `
-t ${{ env.testWorkingDirectory }}/tests/executor/unittests `
2 changes: 1 addition & 1 deletion .github/workflows/promptflow-import-linter.yml
@@ -67,5 +67,5 @@ jobs:
echo "=== promptflow-azure full lints ==="
poetry run pip install langchain
+poetry run pip install "tenacity<8.4.0"
poetry run python ${{ github.workspace }}/scripts/import_linter/import_linter.py
1 change: 1 addition & 0 deletions src/promptflow-devkit/promptflow/_sdk/_constants.py
@@ -98,6 +98,7 @@ def _prepare_home_dir() -> Path:
PF_TRACE_CONTEXT_ATTR = "attributes"
PF_SERVICE_DEBUG = "PF_SERVICE_DEBUG"
PF_SYSTEM_METRICS_PREFIX = "__pf__"
+PF_FLOW_ENTRY_IN_TMP = "PF_FLOW_ENTRY_IN_TMP"

LOCAL_MGMT_DB_PATH = (HOME_PROMPT_FLOW_DIR / "pf.sqlite").resolve()
LOCAL_MGMT_DB_SESSION_ACQUIRE_LOCK_PATH = (HOME_PROMPT_FLOW_DIR / "pf.sqlite.lock").resolve()
13 changes: 11 additions & 2 deletions src/promptflow-devkit/promptflow/_sdk/_utilities/general_utils.py
@@ -53,6 +53,7 @@
    NODE,
    NODE_VARIANTS,
    NODES,
+   PF_FLOW_ENTRY_IN_TMP,
    PROMPT_FLOW_DIR_NAME,
    REFRESH_CONNECTIONS_DIR_LOCK_PATH,
    REGISTRY_URI_PREFIX,
@@ -1019,8 +1020,16 @@ def create_temp_flex_flow_yaml_core(entry: Union[str, PathLike, Callable], code:
        logger.warning(f"Found existing {flow_yaml_path.as_posix()}, will not respect it in runtime.")
        with open(flow_yaml_path, "r", encoding=DEFAULT_ENCODING) as f:
            existing_content = f.read()
-    if not is_local_module(entry_string=entry, code=code):
-        logger.debug(f"Entry {entry} is not found in local, it's snapshot will be empty.")
+
+    create_yaml_in_tmp = False
+    if os.environ.get(PF_FLOW_ENTRY_IN_TMP, "False").lower() == "true":
+        logger.debug("PF_FLOW_ENTRY_IN_TMP is set to true, its snapshot will be empty.")
+        create_yaml_in_tmp = True
+    elif not is_local_module(entry_string=entry, code=code):
+        logger.debug(f"Entry {entry} is not found in local, its snapshot will be empty.")
+        create_yaml_in_tmp = True
+
+    if create_yaml_in_tmp:
        # make sure run name is from entry instead of random folder name
        temp_dir = tempfile.mkdtemp(prefix=_sanitize_python_variable_name(entry) + "_")
        flow_yaml_path = Path(temp_dir) / FLOW_FLEX_YAML
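
Together with the new constant, this lets a caller force the flex-flow YAML into a temp directory regardless of whether the entry resolves locally. A minimal standalone sketch of the gate, assuming the same env-var semantics (not the module itself):

    import os

    PF_FLOW_ENTRY_IN_TMP = "PF_FLOW_ENTRY_IN_TMP"

    def should_create_yaml_in_tmp(entry_is_local: bool) -> bool:
        # The env var wins: "true" (case-insensitive) forces the temp-dir snapshot.
        if os.environ.get(PF_FLOW_ENTRY_IN_TMP, "False").lower() == "true":
            return True
        # Otherwise fall back to the local-module check.
        return not entry_is_local

    os.environ[PF_FLOW_ENTRY_IN_TMP] = "true"
    assert should_create_yaml_in_tmp(entry_is_local=True)
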
8 changes: 5 additions & 3 deletions src/promptflow-evals/promptflow/evals/_constants.py
@@ -14,11 +14,13 @@ class EvaluationMetrics:


class Prefixes:
-    _INPUTS = 'inputs.'
-    _OUTPUTS = 'outputs.'
-    _TGT_OUTPUTS = '__outputs.'
+    _INPUTS = "inputs."
+    _OUTPUTS = "outputs."
+    _TGT_OUTPUTS = "__outputs."


DEFAULT_EVALUATION_RESULTS_FILE_NAME = "evaluation_results.json"

CONTENT_SAFETY_DEFECT_RATE_THRESHOLD_DEFAULT = 4
+
+BATCH_RUN_TIMEOUT = 3600
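
BATCH_RUN_TIMEOUT (3600 seconds, the former inline 60 * 60) is what the batch-run call sites below pass to Future.result(). The pattern, sketched with a placeholder future:

    from concurrent.futures import ThreadPoolExecutor

    BATCH_RUN_TIMEOUT = 3600  # seconds; replaces the scattered 60 * 60 literals

    with ThreadPoolExecutor() as pool:
        future = pool.submit(lambda: "done")
        # Blocks for up to BATCH_RUN_TIMEOUT seconds, then raises TimeoutError.
        result = future.result(timeout=BATCH_RUN_TIMEOUT)
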
New file (path not shown in this view): @@ -0,0 +1,8 @@
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
from .batch_run_context import BatchRunContext
from .code_client import CodeClient
from .proxy_client import ProxyClient

__all__ = ["CodeClient", "ProxyClient", "BatchRunContext"]
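
This __init__ re-exports the three batch-run helpers from one package. The diff view omits the new files' paths, so the import below assumes a package location inferred from the relative imports; treat it as illustrative:

    # Assumed location of the new package; not confirmed by this page.
    from promptflow.evals.evaluate._batch_run_client import (
        BatchRunContext,
        CodeClient,
        ProxyClient,
    )
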
New file (path not shown in this view): @@ -0,0 +1,32 @@
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
import os

from promptflow._sdk._constants import PF_FLOW_ENTRY_IN_TMP
from promptflow._utils.user_agent_utils import ClientUserAgentUtil
from promptflow.tracing._integrations._openai_injector import inject_openai_api, recover_openai_api

from ..._user_agent import USER_AGENT
from .code_client import CodeClient
from .proxy_client import ProxyClient


class BatchRunContext:
    def __init__(self, client):
        self.client = client

    def __enter__(self):
        if isinstance(self.client, CodeClient):
            ClientUserAgentUtil.append_user_agent(USER_AGENT)
            inject_openai_api()

        if isinstance(self.client, ProxyClient):
            os.environ[PF_FLOW_ENTRY_IN_TMP] = "true"

    def __exit__(self, exc_type, exc_val, exc_tb):
        if isinstance(self.client, CodeClient):
            recover_openai_api()

        if isinstance(self.client, ProxyClient):
            os.environ.pop(PF_FLOW_ENTRY_IN_TMP, None)
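
BatchRunContext scopes the environment tweak to the lifetime of a batch run. A hedged usage sketch (the flow is a placeholder and the import path is assumed, as above):

    from promptflow.client import PFClient
    from promptflow.evals.evaluate._batch_run_client import BatchRunContext, ProxyClient  # assumed path

    def my_flow(question: str) -> dict:
        # Placeholder flex-flow entry, for illustration only.
        return {"answer": question[::-1]}

    client = ProxyClient(PFClient())
    with BatchRunContext(client):
        # PF_FLOW_ENTRY_IN_TMP is "true" inside the block, so the temporary
        # flex-flow YAML is created in a temp dir with an empty snapshot.
        proxy_run = client.run(flow=my_flow, data="data.jsonl")
    # On exit the variable is popped, restoring the previous behavior.
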
Modified file (path not shown in this view):
@@ -7,31 +7,15 @@

import pandas as pd

-from promptflow._utils.user_agent_utils import ClientUserAgentUtil
-from promptflow.evals.evaluate._utils import _apply_column_mapping, load_jsonl, _has_aggregator
-from promptflow.tracing import ThreadPoolExecutorWithContext as ThreadPoolExecutor
-from promptflow.tracing._integrations._openai_injector import inject_openai_api, recover_openai_api
from promptflow.contracts.types import AttrDict
+from promptflow.evals.evaluate._utils import _apply_column_mapping, _has_aggregator, load_jsonl
+from promptflow.tracing import ThreadPoolExecutorWithContext as ThreadPoolExecutor

-from ..._user_agent import USER_AGENT
+from ..._constants import BATCH_RUN_TIMEOUT

LOGGER = logging.getLogger(__name__)


-class BatchRunContext:
-    def __init__(self, client):
-        self.client = client
-
-    def __enter__(self):
-        if isinstance(self.client, CodeClient):
-            ClientUserAgentUtil.append_user_agent(USER_AGENT)
-            inject_openai_api()
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        if isinstance(self.client, CodeClient):
-            recover_openai_api()
-
-
class CodeRun:
    def __init__(self, run, input_data, evaluator_name=None, aggregated_metrics=None, **kwargs):
        self.run = run
@@ -40,22 +24,27 @@ def __init__(self, run, input_data, evaluator_name=None, aggregated_metrics=None, **kwargs):
        self.aggregated_metrics = aggregated_metrics

    def get_result_df(self, exclude_inputs=False):
-        result_df = self.run.result(timeout=60 * 60)
+        result_df = self.run.result(timeout=BATCH_RUN_TIMEOUT)
        if exclude_inputs:
            result_df = result_df.drop(columns=[col for col in result_df.columns if col.startswith("inputs.")])
        return result_df

    def get_aggregated_metrics(self):
        try:
-            aggregated_metrics = self.aggregated_metrics.result(timeout=60 * 60) \
-                if self.aggregated_metrics is not None else None
+            aggregated_metrics = (
+                self.aggregated_metrics.result(timeout=BATCH_RUN_TIMEOUT)
+                if self.aggregated_metrics is not None
+                else None
+            )
        except Exception as ex:
            LOGGER.debug(f"Error calculating metrics for evaluator {self.evaluator_name}, failed with error {str(ex)}")
            aggregated_metrics = None

        if not isinstance(aggregated_metrics, dict):
-            LOGGER.warning(f"Aggregated metrics for evaluator {self.evaluator_name}"
-                           f" is not a dictionary will not be logged as metrics")
+            LOGGER.warning(
+                f"Aggregated metrics for evaluator {self.evaluator_name}"
+                f" is not a dictionary will not be logged as metrics"
+            )

        aggregated_metrics = aggregated_metrics if isinstance(aggregated_metrics, dict) else {}

@@ -71,8 +60,11 @@ def _calculate_metric(self, evaluator, input_df, column_mapping, evaluator_name):
        row_metric_results = []
        input_df = _apply_column_mapping(input_df, column_mapping)
        # Ignoring args and kwargs from the signature since they are usually catching extra arguments
-        parameters = {param.name for param in inspect.signature(evaluator).parameters.values()
-                      if param.name not in ['args', 'kwargs']}
+        parameters = {
+            param.name
+            for param in inspect.signature(evaluator).parameters.values()
+            if param.name not in ["args", "kwargs"]
+        }
        for value in input_df.to_dict("records"):
            # Filter out only the parameters that are present in the input data
            # if no parameters then pass data as is
@@ -83,7 +75,7 @@ def _calculate_metric(self, evaluator, input_df, column_mapping, evaluator_name):
            try:
                result = row_metric_future.result()
                if not isinstance(result, dict):
-                    result = {'output': result}
+                    result = {"output": result}
                row_metric_results.append(result)
            except Exception as ex:  # pylint: disable=broad-except
                msg_1 = f"Error calculating value for row {row_number} for metric {evaluator_name}, "
@@ -114,8 +106,9 @@ def _calculate_aggregations(self, evaluator, run):
            aggregated_output = aggr_func(aggregate_input)
            return aggregated_output
        except Exception as ex:
-            LOGGER.warning(f"Error calculating aggregations for evaluator {run.evaluator_name},"
-                           f" failed with error {str(ex)}")
+            LOGGER.warning(
+                f"Error calculating aggregations for evaluator {run.evaluator_name}," f" failed with error {str(ex)}"
+            )
            return None

    def run(self, flow, data, evaluator_name=None, column_mapping=None, **kwargs):
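
The parameters set built in _calculate_metric drives per-row filtering of evaluator inputs. A standalone sketch of the technique, with a toy evaluator and row for illustration:

    import inspect

    def evaluator(question, answer, **kwargs):
        return {"length": len(answer)}

    # Collect declared parameter names, ignoring catch-all args/kwargs.
    parameters = {
        param.name
        for param in inspect.signature(evaluator).parameters.values()
        if param.name not in ["args", "kwargs"]
    }

    row = {"question": "2+2?", "answer": "4", "line_number": 7}
    # Keep only the columns the evaluator declares.
    filtered = {k: v for k, v in row.items() if k in parameters}
    print(evaluator(**filtered))  # {'length': 1}
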
New file (path not shown in this view): @@ -0,0 +1,40 @@
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
import logging

import numpy as np

from promptflow.client import PFClient
from promptflow.tracing import ThreadPoolExecutorWithContext as ThreadPoolExecutor

from ..._constants import BATCH_RUN_TIMEOUT

LOGGER = logging.getLogger(__name__)


class ProxyRun:
    def __init__(self, run, **kwargs):
        self.run = run


class ProxyClient:
    def __init__(self, pf_client: PFClient):
        self._pf_client = pf_client
        self._thread_pool = ThreadPoolExecutor(thread_name_prefix="evaluators_thread")

    def run(self, flow, data, column_mapping=None, **kwargs):
        eval_future = self._thread_pool.submit(
            self._pf_client.run, flow, data=data, column_mapping=column_mapping, **kwargs
        )
        return ProxyRun(run=eval_future)

    def get_details(self, proxy_run, all_results=False):
        run = proxy_run.run.result(timeout=BATCH_RUN_TIMEOUT)
        result_df = self._pf_client.get_details(run, all_results=all_results)
        result_df.replace("(Failed)", np.nan, inplace=True)
        return result_df

    def get_metrics(self, proxy_run):
        run = proxy_run.run.result(timeout=BATCH_RUN_TIMEOUT)
        return self._pf_client.get_metrics(run)

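ProxyClient makes PFClient.run non-blocking: submission goes onto a context-propagating thread pool, and the wait is deferred to get_details/get_metrics. A hedged usage sketch (flow and data paths are illustrative):

    from promptflow.client import PFClient

    client = ProxyClient(PFClient())

    # Returns immediately; PFClient.run executes on the pool.
    proxy_run = client.run(flow="./my_flow", data="./data.jsonl")

    # Each getter blocks (up to BATCH_RUN_TIMEOUT seconds) on the future.
    details_df = client.get_details(proxy_run)  # failed rows surface as NaN
    metrics = client.get_metrics(proxy_run)
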
Deleted file (path not shown in this view).
