Merge branch 'main' into zhrua/fix_prompty_test
wangchao1230 committed Jun 12, 2024
2 parents 250f62c + 43d0226 commit e210af6
Showing 25 changed files with 4,347 additions and 167 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/sdk-cli-azure-test-pull-request.yml
@@ -13,11 +13,13 @@ on:
- src/promptflow-core/**
- src/promptflow-devkit/**
- src/promptflow-azure/**
- src/promptflow-recording/**


env:
IS_IN_CI_PIPELINE: "true"
PROMPT_FLOW_TEST_MODE: "replay"
PROMPT_FLOW_TEST_PACKAGE: "promptflow-azure"
TRACING_DIRECTORY: ${{ github.workspace }}/src/promptflow-tracing
WORKING_DIRECTORY: ${{ github.workspace }}/src/promptflow-azure
CORE_DIRECTORY: ${{ github.workspace }}/src/promptflow-core
@@ -113,7 +113,7 @@ def create_or_update(
:param display_name: The display name of the flow to create. Default to be flow folder name + timestamp
if not specified. e.g. "web-classification-10-27-2023-14-19-10"
:type display_name: str
-:param type: The type of the flow to create. One of ["standard", evaluation", "chat"].
+:param type: The type of the flow to create. One of ["standard", "evaluation", "chat"].
Default to be "standard" if not specified.
:type type: str
:param description: The description of the flow to create. Default to be the description in flow yaml file.
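For illustration only (not part of this diff), a hedged usage sketch of the parameters documented above; the workspace-scoped client setup follows the usual promptflow-azure pattern and the flow path is a placeholder:

from azure.identity import DefaultAzureCredential
from promptflow.azure import PFClient

# Hypothetical workspace-scoped client; replace the placeholders with real values.
pf = PFClient(
    credential=DefaultAzureCredential(),
    subscription_id="<subscription-id>",
    resource_group_name="<resource-group>",
    workspace_name="<workspace>",
)

flow = pf.flows.create_or_update(
    flow="flows/web-classification",          # local flow folder
    display_name="web-classification-demo",   # defaults to folder name + timestamp if omitted
    type="standard",                          # one of "standard", "evaluation", "chat"
    description="Classify a web page into a category.",
)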
5 changes: 5 additions & 0 deletions src/promptflow-azure/tests/conftest.py
@@ -19,6 +19,11 @@
_connection_setup = False


@pytest.fixture(autouse=True)
def declare_test_package(monkeypatch: MonkeyPatch):
monkeypatch.setenv("PROMPT_FLOW_TEST_PACKAGE", "promptflow-azure")


@pytest.fixture
def local_client() -> LocalClient:
yield LocalClient()
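The autouse fixture above pins PROMPT_FLOW_TEST_PACKAGE for every test in this package. As a rough illustration of why such a variable is useful, a recording helper could branch on it along these lines (the helper name and directory layout are assumptions, not promptflow-recording's actual API):

import os
from pathlib import Path

# Hypothetical helper: choose where recorded traffic lives for the package under test.
def resolve_recording_dir(repo_root: Path) -> Path:
    package = os.getenv("PROMPT_FLOW_TEST_PACKAGE", "promptflow")
    mode = os.getenv("PROMPT_FLOW_TEST_MODE", "live")  # "live", "record", or "replay"
    return repo_root / "src" / "promptflow-recording" / "recordings" / package / mode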
@@ -5,6 +5,7 @@
from pathlib import Path
from typing import Callable
from unittest.mock import patch
from datetime import datetime

import pytest
from _constants import PROMPTFLOW_ROOT
@@ -56,6 +57,7 @@ def check_local_to_cloud_run(pf: PFClient, run: Run, check_run_details_in_cloud:
assert cloud_run.display_name == run.display_name
assert cloud_run.status == run.status
assert cloud_run._start_time and cloud_run._end_time
assert datetime.fromisoformat(cloud_run.created_on) == datetime.fromisoformat(run.created_on)
assert cloud_run.properties["azureml.promptflow.local_to_cloud"] == "true"
assert cloud_run.properties["azureml.promptflow.snapshot_id"]
assert cloud_run.properties[Local2CloudProperties.EVAL_ARTIFACTS]
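The new created_on assertion assumes the timestamp is an ISO-8601 string that survives a round trip through datetime.fromisoformat; a small standalone check of that assumption:

from datetime import datetime

created_on = datetime.now().astimezone()   # timezone-aware, matching the Run entity change below
serialized = created_on.isoformat()        # e.g. "2024-06-12T14:19:10.123456+08:00"
assert datetime.fromisoformat(serialized) == created_on  # the offset survives, so equality holds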
@@ -168,7 +168,7 @@ def _submit_bulk_run(
exception = None
# create run to db when fully prepared to run in executor, otherwise won't create it
run._status = Status.Running.value
-run._start_time = datetime.datetime.now()
+run._start_time = datetime.datetime.now().astimezone()
run._dump() # pylint: disable=protected-access

resume_from_run_storage = (
@@ -245,7 +245,7 @@ def _submit_bulk_run(
run = self.run_operations.update(
name=run.name,
status=status,
-end_time=datetime.datetime.now(),
+end_time=datetime.datetime.now().astimezone(),
system_metrics=system_metrics,
)
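Both edits in these hunks replace a naive timestamp with a timezone-aware one; as a quick standalone comparison of the two calls:

import datetime

naive = datetime.datetime.now()               # local wall-clock time, tzinfo is None
aware = datetime.datetime.now().astimezone()  # same wall-clock time, tzinfo set to the local zone

print(naive.tzinfo)  # None, so downstream serializers must guess the zone
print(aware.tzinfo)  # e.g. datetime.timezone(datetime.timedelta(seconds=28800), 'CST')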

6 changes: 3 additions & 3 deletions src/promptflow-devkit/promptflow/_sdk/entities/_run.py
@@ -155,7 +155,7 @@ def __init__(
self.variant = variant
self.run = run
self._resume_from = kwargs.get("resume_from", None)
-self._created_on = created_on or datetime.datetime.now()
+self._created_on = created_on or datetime.datetime.now().astimezone()
self._status = status or RunStatus.NOT_STARTED
self.environment_variables = environment_variables or {}
self.connections = connections or {}
@@ -756,8 +756,8 @@ def _to_rest_object_for_local_to_cloud(self, local_to_cloud_info: dict, variant_
# when sending the request to the server.
# e.g. WARNING:msrest.serialization:Datetime with no tzinfo will be considered UTC.
# for start_time, switch to "_start_time" once the bug item is fixed: BUG - 3085432.
-start_time = self._created_on.isoformat() + "Z" if self._created_on else None
-end_time = self._end_time.isoformat() + "Z" if self._end_time else None
+start_time = self._created_on.isoformat() if self._created_on else None
+end_time = self._end_time.isoformat() if self._end_time else None
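With _created_on now timezone-aware (see the __init__ change above), isoformat() already emits a UTC offset, so the hard-coded "Z" suffix is dropped. A standalone illustration of why the suffix would now be wrong:

from datetime import datetime

aware = datetime.now().astimezone()
print(aware.isoformat())        # e.g. "2024-06-12T14:19:10.123456+08:00", offset already present
print(aware.isoformat() + "Z")  # "...+08:00Z", malformed: two conflicting zone designators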

# extract properties that needs to be passed to the request
properties = {
@@ -8,9 +8,10 @@
import pandas as pd

from promptflow._utils.user_agent_utils import ClientUserAgentUtil
-from promptflow.evals.evaluate._utils import _apply_column_mapping, load_jsonl
+from promptflow.evals.evaluate._utils import _apply_column_mapping, load_jsonl, _has_aggregator
from promptflow.tracing import ThreadPoolExecutorWithContext as ThreadPoolExecutor
from promptflow.tracing._integrations._openai_injector import inject_openai_api, recover_openai_api
from promptflow.contracts.types import AttrDict

from ..._user_agent import USER_AGENT

@@ -32,17 +32,34 @@ def __exit__(self, exc_type, exc_val, exc_tb):


class CodeRun:
-def __init__(self, run, input_data, evaluator_name=None, **kwargs):
+def __init__(self, run, input_data, evaluator_name=None, aggregated_metrics=None, **kwargs):
self.run = run
self.evaluator_name = evaluator_name if evaluator_name is not None else ""
self.input_data = input_data
self.aggregated_metrics = aggregated_metrics

def get_result_df(self, exclude_inputs=False):
result_df = self.run.result(timeout=60 * 60)
if exclude_inputs:
result_df = result_df.drop(columns=[col for col in result_df.columns if col.startswith("inputs.")])
return result_df

def get_aggregated_metrics(self):
try:
aggregated_metrics = self.aggregated_metrics.result(timeout=60 * 60) \
if self.aggregated_metrics is not None else None
except Exception as ex:
LOGGER.debug(f"Error calculating metrics for evaluator {self.evaluator_name}, failed with error {str(ex)}")
aggregated_metrics = None

if not isinstance(aggregated_metrics, dict):
LOGGER.warning(f"Aggregated metrics for evaluator {self.evaluator_name}"
f" is not a dictionary will not be logged as metrics")

aggregated_metrics = aggregated_metrics if isinstance(aggregated_metrics, dict) else {}

return aggregated_metrics


class CodeClient:
def __init__(self):
@@ -82,6 +100,24 @@ def _calculate_metric(self, evaluator, input_df, column_mapping, evaluator_name)
verify_integrity=True,
)

def _calculate_aggregations(self, evaluator, run):
try:
if _has_aggregator(evaluator):
aggregate_input = None
evaluator_output = run.get_result_df(exclude_inputs=True)
if len(evaluator_output.columns) == 1 and evaluator_output.columns[0] == "output":
aggregate_input = evaluator_output["output"].tolist()
else:
aggregate_input = [AttrDict(item) for item in evaluator_output.to_dict("records")]

aggr_func = getattr(evaluator, "__aggregate__")
aggregated_output = aggr_func(aggregate_input)
return aggregated_output
except Exception as ex:
LOGGER.warning(f"Error calculating aggregations for evaluator {run.evaluator_name},"
f" failed with error {str(ex)}")
return None
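For context, a minimal sketch of the evaluator shape this aggregation path expects: a callable that scores one row, plus an __aggregate__ method that receives the list of per-row results (the path _has_aggregator and getattr(evaluator, "__aggregate__") exercise above). The evaluator below is illustrative only and not part of this repository:

class ExactMatchEvaluator:
    """Illustrative evaluator: per-row score plus an __aggregate__ hook."""

    def __call__(self, *, answer: str, ground_truth: str) -> dict:
        return {"exact_match": float(answer.strip() == ground_truth.strip())}

    def __aggregate__(self, line_results: list) -> dict:
        # line_results holds the per-row outputs produced above (dict-like rows here).
        scores = [row["exact_match"] for row in line_results]
        return {"exact_match_rate": sum(scores) / len(scores) if scores else 0.0}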

def run(self, flow, data, evaluator_name=None, column_mapping=None, **kwargs):
input_df = data
if not isinstance(input_df, pd.DataFrame):
@@ -92,8 +128,21 @@ def run(self, flow, data, evaluator_name=None, column_mapping=None, **kwargs):

input_df = pd.DataFrame(json_data)
eval_future = self._thread_pool.submit(self._calculate_metric, flow, input_df, column_mapping, evaluator_name)
-return CodeRun(run=eval_future, input_data=data, evaluator_name=evaluator_name)
+run = CodeRun(run=eval_future, input_data=data, evaluator_name=evaluator_name, aggregated_metrics=None)
aggregation_future = self._thread_pool.submit(self._calculate_aggregations, evaluator=flow, run=run)
run.aggregated_metrics = aggregation_future
return run

def get_details(self, run, all_results=False):
-result_df = run.run.result(timeout=60 * 60)
+result_df = run.get_result_df(exclude_inputs=not all_results)
return result_df

def get_metrics(self, run):
try:
aggregated_metrics = run.get_aggregated_metrics()
print("Aggregated metrics")
print(aggregated_metrics)
except Exception as ex:
LOGGER.debug(f"Error calculating metrics for evaluator {run.evaluator_name}, failed with error {str(ex)}")
return None
return aggregated_metrics
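These client methods are normally exercised through the public evaluate() entry point; a hedged end-to-end sketch of that flow, reusing the illustrative ExactMatchEvaluator from earlier. The data file is a placeholder, and the result keys and by-name column mapping are assumptions based on promptflow-evals' documented output shape:

# data.jsonl: one JSON object per line, e.g. {"answer": "4", "ground_truth": "4"}
from promptflow.evals.evaluate import evaluate

result = evaluate(
    data="data.jsonl",
    evaluators={"exact_match": ExactMatchEvaluator()},  # data columns map to evaluator parameters by name
)

print(result["metrics"])  # aggregated values surfaced via get_metrics(), e.g. {"exact_match.exact_match_rate": 1.0}
print(result["rows"])     # per-row evaluator outputs joined with the inputs, via get_details()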