Skip to content

Commit

Permalink
Merge branch 'main' into dev/chenyin_remove_sys_exit
Browse files Browse the repository at this point in the history
  • Loading branch information
YingChen1996 committed Jun 18, 2024
2 parents add3b9e + 08cc0d4 commit 7698cc8
Show file tree
Hide file tree
Showing 29 changed files with 371 additions and 822 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build_doc_ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ jobs:
shell: powershell
working-directory: scripts/docs/
run: |-
pip install langchain
pip install langchain "tenacity<8.4.0"
./doc_generation.ps1 -WithReferenceDoc:$true -WarningAsError:$true
# Note: We have this job separately because some errors may be missed when the build link check exits.
Expand Down Expand Up @@ -85,5 +85,5 @@ jobs:
shell: powershell
working-directory: scripts/docs/
run: |-
pip install langchain
pip install langchain "tenacity<8.4.0"
./doc_generation.ps1 -WithReferenceDoc:$true -WarningAsError:$true -BuildLinkCheck
2 changes: 1 addition & 1 deletion .github/workflows/promptflow-executor-e2e-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ jobs:
run: |
gci env:* | sort-object name
az account show
pip install langchain-community
pip install langchain-community "tenacity<8.4.0"
# numexpr is required by langchain in e2e tests.
pip install numexpr
python scripts/building/run_coverage_tests.py `
Expand Down
3 changes: 2 additions & 1 deletion .github/workflows/promptflow-executor-unit-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ on:
- src/promptflow/promptflow/storage/**
- src/promptflow/tests/*
- src/promptflow/tests/executor/**
- src/promptflow/tests/test_configs/**
- src/promptflow-tracing/promptflow/**
- src/promptflow-core/promptflow/**
- src/promptflow-devkit/promptflow/**
Expand Down Expand Up @@ -150,7 +151,7 @@ jobs:
run: |
gci env:* | sort-object name
az account show
pip install langchain-community
pip install langchain-community "tenacity<8.4.0"
python scripts/building/run_coverage_tests.py `
-p ${{ env.testWorkingDirectory }}/promptflow `
-t ${{ env.testWorkingDirectory }}/tests/executor/unittests `
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/promptflow-import-linter.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,5 +67,5 @@ jobs:
echo "=== promptflow-azure full lints ==="
poetry run pip install langchain
poetry run pip install "tenacity<8.4.0"
poetry run python ${{ github.workspace }}/scripts/import_linter/import_linter.py
1 change: 1 addition & 0 deletions src/promptflow-devkit/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

### Bugs Fixed
- Fix incompatibility with `trace.NoOpTracerProvider` when set exporter to prompt flow service.
- Add missing user agent in trace usage telemetry.

## v1.12.0 (2024.06.11)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import os
import re
from pathlib import Path
from typing import Any, Dict, List

from promptflow._constants import FlowEntryRegex
from promptflow._core.entry_meta_generator import _generate_flow_meta
from promptflow._sdk._constants import FLOW_META_JSON_GEN_TIMEOUT
from promptflow._sdk._constants import FLOW_META_JSON_GEN_TIMEOUT, PF_FLOW_META_LOAD_IN_SUBPROCESS
from promptflow._utils.flow_utils import resolve_python_entry_file

from ._base_inspector_proxy import AbstractInspectorProxy
Expand Down Expand Up @@ -35,7 +36,7 @@ def get_entry_meta(
**kwargs,
) -> Dict[str, Any]:
timeout = kwargs.get("timeout", FLOW_META_JSON_GEN_TIMEOUT)
load_in_subprocess = kwargs.get("load_in_subprocess", True)
load_in_subprocess = os.environ.get(PF_FLOW_META_LOAD_IN_SUBPROCESS, "True").lower() == "true"

flow_dag = {"entry": entry}
# generate flow.json only for eager flow for now
Expand Down
2 changes: 2 additions & 0 deletions src/promptflow-devkit/promptflow/_sdk/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ def _prepare_home_dir() -> Path:
PF_TRACE_CONTEXT_ATTR = "attributes"
PF_SERVICE_DEBUG = "PF_SERVICE_DEBUG"
PF_SYSTEM_METRICS_PREFIX = "__pf__"
PF_FLOW_ENTRY_IN_TMP = "PF_FLOW_ENTRY_IN_TMP"
PF_FLOW_META_LOAD_IN_SUBPROCESS = "PF_FLOW_META_LOAD_IN_SUBPROCESS"

LOCAL_MGMT_DB_PATH = (HOME_PROMPT_FLOW_DIR / "pf.sqlite").resolve()
LOCAL_MGMT_DB_SESSION_ACQUIRE_LOCK_PATH = (HOME_PROMPT_FLOW_DIR / "pf.sqlite.lock").resolve()
Expand Down
13 changes: 11 additions & 2 deletions src/promptflow-devkit/promptflow/_sdk/_utilities/general_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
NODE,
NODE_VARIANTS,
NODES,
PF_FLOW_ENTRY_IN_TMP,
PROMPT_FLOW_DIR_NAME,
REFRESH_CONNECTIONS_DIR_LOCK_PATH,
REGISTRY_URI_PREFIX,
Expand Down Expand Up @@ -1019,8 +1020,16 @@ def create_temp_flex_flow_yaml_core(entry: Union[str, PathLike, Callable], code:
logger.warning(f"Found existing {flow_yaml_path.as_posix()}, will not respect it in runtime.")
with open(flow_yaml_path, "r", encoding=DEFAULT_ENCODING) as f:
existing_content = f.read()
if not is_local_module(entry_string=entry, code=code):
logger.debug(f"Entry {entry} is not found in local, it's snapshot will be empty.")

create_yaml_in_tmp = False
if os.environ.get(PF_FLOW_ENTRY_IN_TMP, "False").lower() == "true":
logger.debug("PF_FLOW_ENTRY_IN_TMP is set to true, its snapshot will be empty.")
create_yaml_in_tmp = True
elif not is_local_module(entry_string=entry, code=code):
logger.debug(f"Entry {entry} is not found in local, its snapshot will be empty.")
create_yaml_in_tmp = True

if create_yaml_in_tmp:
# make sure run name is from entry instead of random folder name
temp_dir = tempfile.mkdtemp(prefix=_sanitize_python_variable_name(entry) + "_")
flow_yaml_path = Path(temp_dir) / FLOW_FLEX_YAML
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@
)
from promptflow._sdk._constants import HOME_PROMPT_FLOW_DIR, AzureMLWorkspaceTriad
from promptflow._sdk._telemetry.telemetry import get_telemetry_logger
from promptflow._sdk._user_agent import USER_AGENT
from promptflow._sdk.entities._trace import Span
from promptflow._utils.logger_utils import get_cli_sdk_logger
from promptflow._utils.user_agent_utils import setup_user_agent_to_operation_context
from promptflow.core._errors import MissingRequiredPackage

from .general_utils import convert_time_unix_nano_to_timestamp, json_load
Expand Down Expand Up @@ -345,6 +347,8 @@ class TraceTelemetryHelper:
CUSTOM_DIMENSIONS_TRACE_COUNT = "trace_count"

def __init__(self):
# `setup_user_agent_to_operation_context` will get user agent and return
self._user_agent = setup_user_agent_to_operation_context(USER_AGENT)
self._telemetry_logger = get_telemetry_logger()
self._lock = multiprocessing.Lock()
self._summary: typing.Dict[TraceCountKey, int] = dict()
Expand All @@ -369,6 +373,7 @@ def log_telemetry(self) -> None:
for key, count in summary_to_log.items():
custom_dimensions = key._asdict()
custom_dimensions[self.CUSTOM_DIMENSIONS_TRACE_COUNT] = count
custom_dimensions["user_agent"] = self._user_agent
self._telemetry_logger.info(self.TELEMETRY_ACTIVITY_NAME, extra={"custom_dimensions": custom_dimensions})


Expand Down
1 change: 1 addition & 0 deletions src/promptflow-devkit/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ promptflow = {path = "../promptflow"}
promptflow-tools = {path = "../promptflow-tools"}
promptflow-recording = {path = "../promptflow-recording"}
numpy = "<2.0.0" # avoid pandas incompatibility
tenacity = "<8.4.0" # tenacity has breaking changes in 8.4.0

[tool.poetry.group.test.dependencies]
pytest = "*"
Expand Down
32 changes: 30 additions & 2 deletions src/promptflow-devkit/tests/sdk_cli_test/unittests/test_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import os
import uuid
from typing import Dict
from unittest.mock import patch
from unittest.mock import MagicMock, patch

import pytest
from mock import mock
Expand All @@ -34,7 +34,13 @@
ContextAttributeKey,
)
from promptflow._sdk._tracing import setup_exporter_to_pfs, start_trace_with_devkit
from promptflow._sdk._utilities.tracing_utils import WorkspaceKindLocalCache, append_conditions, parse_protobuf_span
from promptflow._sdk._utilities.tracing_utils import (
TraceCountKey,
TraceTelemetryHelper,
WorkspaceKindLocalCache,
append_conditions,
parse_protobuf_span,
)
from promptflow.client import PFClient
from promptflow.exceptions import UserErrorException
from promptflow.tracing._operation_context import OperationContext
Expand Down Expand Up @@ -325,3 +331,25 @@ def test_expired_cache(self):
mock_get_kind.return_value = kind
assert ws_local_cache.get_kind() == kind
assert not ws_local_cache.is_expired


@pytest.mark.unittest
@pytest.mark.sdk_test
class TestTraceTelemetry:
    def test_user_agent_in_custom_dimensions(self):
        """Trace-count telemetry rows must carry a promptflow-sdk user agent.

        Patches the telemetry logger factory so that every ``info`` call is
        intercepted and its custom dimensions inspected, then drives one
        append/log cycle through ``TraceTelemetryHelper``.
        """

        def _assert_user_agent(*_args, **kwargs):
            # The helper logs with extra={"custom_dimensions": {...}}; verify
            # the user agent dimension is present and identifies the SDK.
            dims: dict = kwargs.get("extra").get("custom_dimensions")
            assert "user_agent" in dims
            assert "promptflow-sdk/" in dims["user_agent"]

        fake_logger = MagicMock()
        fake_logger.info = _assert_user_agent
        target = "promptflow._sdk._utilities.tracing_utils.get_telemetry_logger"
        with patch(target, return_value=fake_logger):
            helper = TraceTelemetryHelper()
            # One synthetic trace-count entry is enough to trigger a log row.
            summary = {TraceCountKey(None, None, None, "script", "code"): 1}
            helper.append(summary)
            helper.log_telemetry()
8 changes: 5 additions & 3 deletions src/promptflow-evals/promptflow/evals/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ class EvaluationMetrics:


class Prefixes:
_INPUTS = 'inputs.'
_OUTPUTS = 'outputs.'
_TGT_OUTPUTS = '__outputs.'
_INPUTS = "inputs."
_OUTPUTS = "outputs."
_TGT_OUTPUTS = "__outputs."


DEFAULT_EVALUATION_RESULTS_FILE_NAME = "evaluation_results.json"

CONTENT_SAFETY_DEFECT_RATE_THRESHOLD_DEFAULT = 4

BATCH_RUN_TIMEOUT = 3600
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
from .batch_run_context import BatchRunContext
from .code_client import CodeClient
from .proxy_client import ProxyClient

__all__ = ["CodeClient", "ProxyClient", "BatchRunContext"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
import os

from promptflow._sdk._constants import PF_FLOW_ENTRY_IN_TMP, PF_FLOW_META_LOAD_IN_SUBPROCESS
from promptflow._utils.user_agent_utils import ClientUserAgentUtil
from promptflow.tracing._integrations._openai_injector import inject_openai_api, recover_openai_api

from ..._user_agent import USER_AGENT
from .code_client import CodeClient
from .proxy_client import ProxyClient


class BatchRunContext:
    """Context manager for client-specific setup/teardown around a batch run.

    For a ``CodeClient`` it appends the evals user agent and injects the
    OpenAI API tracer; for a ``ProxyClient`` it toggles devkit environment
    flags for the duration of the run. Everything is undone on exit.
    """

    def __init__(self, client):
        # Either a CodeClient or a ProxyClient; behavior is dispatched on
        # the concrete type in __enter__/__exit__.
        self.client = client

    def __enter__(self):
        is_code_client = isinstance(self.client, CodeClient)
        is_proxy_client = isinstance(self.client, ProxyClient)
        if is_code_client:
            ClientUserAgentUtil.append_user_agent(USER_AGENT)
            inject_openai_api()
        if is_proxy_client:
            # Keep the flex-flow YAML snapshot out of the user's directory and
            # load flow metadata in-process for the proxy-based batch run.
            os.environ[PF_FLOW_ENTRY_IN_TMP] = "true"
            os.environ[PF_FLOW_META_LOAD_IN_SUBPROCESS] = "false"

    def __exit__(self, exc_type, exc_val, exc_tb):
        if isinstance(self.client, CodeClient):
            recover_openai_api()
        if isinstance(self.client, ProxyClient):
            # Remove the flags regardless of whether they were set elsewhere.
            os.environ.pop(PF_FLOW_ENTRY_IN_TMP, None)
            os.environ.pop(PF_FLOW_META_LOAD_IN_SUBPROCESS, None)
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,15 @@

import pandas as pd

from promptflow._utils.user_agent_utils import ClientUserAgentUtil
from promptflow.evals.evaluate._utils import _apply_column_mapping, load_jsonl, _has_aggregator
from promptflow.tracing import ThreadPoolExecutorWithContext as ThreadPoolExecutor
from promptflow.tracing._integrations._openai_injector import inject_openai_api, recover_openai_api
from promptflow.contracts.types import AttrDict
from promptflow.evals.evaluate._utils import _apply_column_mapping, _has_aggregator, load_jsonl
from promptflow.tracing import ThreadPoolExecutorWithContext as ThreadPoolExecutor

from ..._user_agent import USER_AGENT
from ..._constants import BATCH_RUN_TIMEOUT

LOGGER = logging.getLogger(__name__)


class BatchRunContext:
def __init__(self, client):
self.client = client

def __enter__(self):
if isinstance(self.client, CodeClient):
ClientUserAgentUtil.append_user_agent(USER_AGENT)
inject_openai_api()

def __exit__(self, exc_type, exc_val, exc_tb):
if isinstance(self.client, CodeClient):
recover_openai_api()


class CodeRun:
def __init__(self, run, input_data, evaluator_name=None, aggregated_metrics=None, **kwargs):
self.run = run
Expand All @@ -40,22 +24,27 @@ def __init__(self, run, input_data, evaluator_name=None, aggregated_metrics=None
self.aggregated_metrics = aggregated_metrics

def get_result_df(self, exclude_inputs=False):
result_df = self.run.result(timeout=60 * 60)
result_df = self.run.result(timeout=BATCH_RUN_TIMEOUT)
if exclude_inputs:
result_df = result_df.drop(columns=[col for col in result_df.columns if col.startswith("inputs.")])
return result_df

def get_aggregated_metrics(self):
try:
aggregated_metrics = self.aggregated_metrics.result(timeout=60 * 60) \
if self.aggregated_metrics is not None else None
aggregated_metrics = (
self.aggregated_metrics.result(timeout=BATCH_RUN_TIMEOUT)
if self.aggregated_metrics is not None
else None
)
except Exception as ex:
LOGGER.debug(f"Error calculating metrics for evaluator {self.evaluator_name}, failed with error {str(ex)}")
aggregated_metrics = None

if not isinstance(aggregated_metrics, dict):
LOGGER.warning(f"Aggregated metrics for evaluator {self.evaluator_name}"
f" is not a dictionary will not be logged as metrics")
LOGGER.warning(
f"Aggregated metrics for evaluator {self.evaluator_name}"
f" is not a dictionary will not be logged as metrics"
)

aggregated_metrics = aggregated_metrics if isinstance(aggregated_metrics, dict) else {}

Expand All @@ -71,8 +60,11 @@ def _calculate_metric(self, evaluator, input_df, column_mapping, evaluator_name)
row_metric_results = []
input_df = _apply_column_mapping(input_df, column_mapping)
# Ignoring args and kwargs from the signature since they are usually catching extra arguments
parameters = {param.name for param in inspect.signature(evaluator).parameters.values()
if param.name not in ['args', 'kwargs']}
parameters = {
param.name
for param in inspect.signature(evaluator).parameters.values()
if param.name not in ["args", "kwargs"]
}
for value in input_df.to_dict("records"):
# Filter out only the parameters that are present in the input data
# if no parameters then pass data as is
Expand All @@ -83,7 +75,7 @@ def _calculate_metric(self, evaluator, input_df, column_mapping, evaluator_name)
try:
result = row_metric_future.result()
if not isinstance(result, dict):
result = {'output': result}
result = {"output": result}
row_metric_results.append(result)
except Exception as ex: # pylint: disable=broad-except
msg_1 = f"Error calculating value for row {row_number} for metric {evaluator_name}, "
Expand Down Expand Up @@ -114,8 +106,9 @@ def _calculate_aggregations(self, evaluator, run):
aggregated_output = aggr_func(aggregate_input)
return aggregated_output
except Exception as ex:
LOGGER.warning(f"Error calculating aggregations for evaluator {run.evaluator_name},"
f" failed with error {str(ex)}")
LOGGER.warning(
f"Error calculating aggregations for evaluator {run.evaluator_name}," f" failed with error {str(ex)}"
)
return None

def run(self, flow, data, evaluator_name=None, column_mapping=None, **kwargs):
Expand Down
Loading

0 comments on commit 7698cc8

Please sign in to comment.