
Commit

Merge to main
nick863 committed Jun 12, 2024
2 parents 7e192b4 + 43b1f6b commit 63bd646
Showing 10 changed files with 147 additions and 18 deletions.
@@ -5,6 +5,7 @@
from pathlib import Path
from typing import Callable
from unittest.mock import patch
from datetime import datetime

import pytest
from _constants import PROMPTFLOW_ROOT
@@ -56,6 +57,7 @@ def check_local_to_cloud_run(pf: PFClient, run: Run, check_run_details_in_cloud:
assert cloud_run.display_name == run.display_name
assert cloud_run.status == run.status
assert cloud_run._start_time and cloud_run._end_time
assert datetime.fromisoformat(cloud_run.created_on) == datetime.fromisoformat(run.created_on)
assert cloud_run.properties["azureml.promptflow.local_to_cloud"] == "true"
assert cloud_run.properties["azureml.promptflow.snapshot_id"]
assert cloud_run.properties[Local2CloudProperties.EVAL_ARTIFACTS]
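The new assertion compares the local and cloud created_on timestamps as parsed datetimes rather than as strings, since the two sides may serialize the same instant with different UTC offsets. A minimal sketch of why this works (the timestamps below are made up):

from datetime import datetime

local_created_on = "2024-06-12T10:15:00.123456+02:00"   # local run, offset of the client machine
cloud_created_on = "2024-06-12T08:15:00.123456+00:00"   # the same instant with a UTC offset

# Aware datetimes compare by instant, so this holds even though the strings differ.
assert datetime.fromisoformat(local_created_on) == datetime.fromisoformat(cloud_created_on)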
@@ -168,7 +168,7 @@ def _submit_bulk_run(
exception = None
# create run to db when fully prepared to run in executor, otherwise won't create it
run._status = Status.Running.value
run._start_time = datetime.datetime.now()
run._start_time = datetime.datetime.now().astimezone()
run._dump() # pylint: disable=protected-access

resume_from_run_storage = (
@@ -245,7 +245,7 @@ def _submit_bulk_run(
run = self.run_operations.update(
name=run.name,
status=status,
end_time=datetime.datetime.now(),
end_time=datetime.datetime.now().astimezone(),
system_metrics=system_metrics,
)

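The change above swaps the naive datetime.datetime.now() for datetime.datetime.now().astimezone(), so the recorded start and end times carry the local UTC offset. A small illustrative sketch of the difference (actual values depend on the machine's clock and timezone):

import datetime

naive = datetime.datetime.now()               # tzinfo is None; downstream serializers may assume UTC
aware = datetime.datetime.now().astimezone()  # same wall-clock time, with the local offset attached

print(naive.isoformat())   # e.g. 2024-06-12T10:15:00.123456
print(aware.isoformat())   # e.g. 2024-06-12T10:15:00.123456+02:00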
6 changes: 3 additions & 3 deletions src/promptflow-devkit/promptflow/_sdk/entities/_run.py
@@ -155,7 +155,7 @@ def __init__(
self.variant = variant
self.run = run
self._resume_from = kwargs.get("resume_from", None)
self._created_on = created_on or datetime.datetime.now()
self._created_on = created_on or datetime.datetime.now().astimezone()
self._status = status or RunStatus.NOT_STARTED
self.environment_variables = environment_variables or {}
self.connections = connections or {}
@@ -756,8 +756,8 @@ def _to_rest_object_for_local_to_cloud(self, local_to_cloud_info: dict, variant_
# when sending the request to the server.
# e.g. WARNING:msrest.serialization:Datetime with no tzinfo will be considered UTC.
# for start_time, switch to "_start_time" once the bug item is fixed: BUG - 3085432.
start_time = self._created_on.isoformat() + "Z" if self._created_on else None
end_time = self._end_time.isoformat() + "Z" if self._end_time else None
start_time = self._created_on.isoformat() if self._created_on else None
end_time = self._end_time.isoformat() if self._end_time else None

# extract properties that needs to be passed to the request
properties = {
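With _created_on and _end_time now timezone-aware, isoformat() already embeds the offset, so the previously appended "Z" suffix is dropped; keeping it would produce a string with two zone designators. A quick sketch (timestamp illustrative):

import datetime

aware = datetime.datetime.now().astimezone()
aware.isoformat()          # e.g. '2024-06-12T10:15:00.123456+02:00', offset already present
aware.isoformat() + "Z"    # '2024-06-12T10:15:00.123456+02:00Z', malformed: two conflicting zone designators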
@@ -8,9 +8,10 @@
import pandas as pd

from promptflow._utils.user_agent_utils import ClientUserAgentUtil
from promptflow.evals.evaluate._utils import _apply_column_mapping, load_jsonl
from promptflow.evals.evaluate._utils import _apply_column_mapping, load_jsonl, _has_aggregator
from promptflow.tracing import ThreadPoolExecutorWithContext as ThreadPoolExecutor
from promptflow.tracing._integrations._openai_injector import inject_openai_api, recover_openai_api
from promptflow.contracts.types import AttrDict

from ..._user_agent import USER_AGENT

@@ -32,17 +33,34 @@ def __exit__(self, exc_type, exc_val, exc_tb):


class CodeRun:
def __init__(self, run, input_data, evaluator_name=None, **kwargs):
def __init__(self, run, input_data, evaluator_name=None, aggregated_metrics=None, **kwargs):
self.run = run
self.evaluator_name = evaluator_name if evaluator_name is not None else ""
self.input_data = input_data
self.aggregated_metrics = aggregated_metrics

def get_result_df(self, exclude_inputs=False):
result_df = self.run.result(timeout=60 * 60)
if exclude_inputs:
result_df = result_df.drop(columns=[col for col in result_df.columns if col.startswith("inputs.")])
return result_df

def get_aggregated_metrics(self):
try:
aggregated_metrics = self.aggregated_metrics.result(timeout=60 * 60) \
if self.aggregated_metrics is not None else None
except Exception as ex:
LOGGER.debug(f"Error calculating metrics for evaluator {self.evaluator_name}, failed with error {str(ex)}")
aggregated_metrics = None

if not isinstance(aggregated_metrics, dict):
LOGGER.warning(f"Aggregated metrics for evaluator {self.evaluator_name}"
f" is not a dictionary will not be logged as metrics")

aggregated_metrics = aggregated_metrics if isinstance(aggregated_metrics, dict) else {}

return aggregated_metrics


class CodeClient:
def __init__(self):
@@ -82,6 +100,24 @@ def _calculate_metric(self, evaluator, input_df, column_mapping, evaluator_name)
verify_integrity=True,
)

def _calculate_aggregations(self, evaluator, run):
try:
if _has_aggregator(evaluator):
aggregate_input = None
evaluator_output = run.get_result_df(exclude_inputs=True)
if len(evaluator_output.columns) == 1 and evaluator_output.columns[0] == "output":
aggregate_input = evaluator_output["output"].tolist()
else:
aggregate_input = [AttrDict(item) for item in evaluator_output.to_dict("records")]

aggr_func = getattr(evaluator, "__aggregate__")
aggregated_output = aggr_func(aggregate_input)
return aggregated_output
except Exception as ex:
LOGGER.warning(f"Error calculating aggregations for evaluator {run.evaluator_name},"
f" failed with error {str(ex)}")
return None

def run(self, flow, data, evaluator_name=None, column_mapping=None, **kwargs):
input_df = data
if not isinstance(input_df, pd.DataFrame):
@@ -92,8 +128,21 @@ def run(self, flow, data, evaluator_name=None, column_mapping=None, **kwargs):

input_df = pd.DataFrame(json_data)
eval_future = self._thread_pool.submit(self._calculate_metric, flow, input_df, column_mapping, evaluator_name)
return CodeRun(run=eval_future, input_data=data, evaluator_name=evaluator_name)
run = CodeRun(run=eval_future, input_data=data, evaluator_name=evaluator_name, aggregated_metrics=None)
aggregation_future = self._thread_pool.submit(self._calculate_aggregations, evaluator=flow, run=run)
run.aggregated_metrics = aggregation_future
return run

def get_details(self, run, all_results=False):
result_df = run.run.result(timeout=60 * 60)
result_df = run.get_result_df(exclude_inputs=not all_results)
return result_df

def get_metrics(self, run):
try:
aggregated_metrics = run.get_aggregated_metrics()
print("Aggregated metrics")
print(aggregated_metrics)
except Exception as ex:
LOGGER.debug(f"Error calculating metrics for evaluator {run.evaluator_name}, failed with error {str(ex)}")
return None
return aggregated_metrics
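For reference, the two branches in _calculate_aggregations hand __aggregate__ either a plain list of per-line outputs or a list of attribute-accessible records. A small sketch of both shapes, mirroring the branch logic above with made-up result data:

import pandas as pd
from promptflow.contracts.types import AttrDict

# Case 1: a single "output" column, e.g. an evaluator returning a bare number per line.
single_col = pd.DataFrame({"output": [3, 7, 11]})
aggregate_input = single_col["output"].tolist()                      # [3, 7, 11]

# Case 2: named columns, e.g. an evaluator returning {"length": ...} per line.
multi_col = pd.DataFrame([{"length": 5}, {"length": 9}])
aggregate_input = [AttrDict(item) for item in multi_col.to_dict("records")]
print([row.length for row in aggregate_input])                       # [5, 9], attribute access per record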
13 changes: 7 additions & 6 deletions src/promptflow-evals/promptflow/evals/evaluate/_eval_run.py
@@ -102,8 +102,8 @@ def __init__(self,
self._resource_group_name: str = group_name
self._workspace_name: str = workspace_name
self._ml_client: MLClient = ml_client
self._url_base = urlparse(self._tracking_uri).netloc
self._is_broken = self._start_run(run_name)
self._url_base = urlparse(self._tracking_uri).netloc
self._is_broken = self._start_run(run_name)
self._is_terminated = False
self.name: str = run_name if run_name else self.info.run_id

@@ -126,13 +126,14 @@ def _get_scope(self):
self._workspace_name,
)

def _start_run(self, run_name: Optional[str]) -> bool:

def _start_run(self, run_name: Optional[str]) -> bool:
"""
Make a request to start the mlflow run. If the run will not start, it will be
marked as broken and the logging will be switched off.
marked as broken and the logging will be switched off.
:param run_name: The display name for the run.
:type run_name: Optional[str]
:type run_name: Optional[str]
:returns: True if the run has started and False otherwise.
"""
url = (
@@ -150,7 +151,7 @@ def _start_run(self, run_name: Optional[str]) -> bool:
]
}
if run_name:
body["run_name"] = run_name
body["run_name"] = run_name
response = self.request_with_retry(
url=url,
method='POST',
@@ -404,9 +404,11 @@ def evaluate(
# get_details needs to be called within BatchRunContext scope in order to have user agent populated
for evaluator_name, evaluator_info in evaluators_info.items():
evaluator_info["result"] = batch_run_client.get_details(evaluator_info["run"], all_results=True)
evaluator_info["metrics"] = batch_run_client.get_metrics(evaluator_info["run"])

# Concatenate all results
evaluators_result_df = None
evaluators_metric = {}
for evaluator_name, evaluator_info in evaluators_info.items():
evaluator_result_df = evaluator_info["result"]

@@ -431,13 +433,16 @@
else evaluator_result_df
)

evaluators_metric.update({f"{evaluator_name}.{k}": v for k, v in evaluator_info["metrics"].items()})

# Rename columns, generated by target function to outputs instead of inputs.
# If target generates columns, already present in the input data, these columns
# will be marked as outputs already so we do not need to rename them.
input_data_df = _rename_columns_conditionally(input_data_df)

result_df = pd.concat([input_data_df, evaluators_result_df], axis=1, verify_integrity=True)
metrics = _aggregate_metrics(evaluators_result_df, evaluators)
metrics.update(evaluators_metric)

studio_url = _log_metrics_and_instance_results(
metrics, result_df, trace_destination, target_run, evaluation_name
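The new evaluators_metric dictionary namespaces each aggregate under its evaluator name before merging it over the column-level aggregates. A minimal sketch with made-up values:

evaluator_name = "answer_length"
evaluator_aggregates = {"median": 27.0}   # what the evaluator's __aggregate__ returned

evaluators_metric = {}
evaluators_metric.update({f"{evaluator_name}.{k}": v for k, v in evaluator_aggregates.items()})
# evaluators_metric == {"answer_length.median": 27.0}; metrics.update(evaluators_metric) then
# lets these aggregates override any same-named keys produced by _aggregate_metrics.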
@@ -243,3 +243,7 @@ def _apply_column_mapping(source_df: pd.DataFrame, mapping_config: dict, inplace
result_df.rename(columns=column_mapping, inplace=True)

return result_df


def _has_aggregator(evaluator):
return hasattr(evaluator, "__aggregate__")
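_has_aggregator simply checks for an __aggregate__ attribute on the evaluator. A tiny usage sketch with a hypothetical evaluator class:

class MeanLength:
    """Hypothetical evaluator: per-line length plus a mean aggregate."""

    def __call__(self, *, answer: str, **kwargs):
        return len(answer)

    def __aggregate__(self, line_results):
        return {"mean": sum(line_results) / len(line_results)}

_has_aggregator(MeanLength())         # True, __aggregate__ is defined
_has_aggregator(lambda answer: 0)     # False, plain callables have no __aggregate__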
17 changes: 17 additions & 0 deletions src/promptflow-evals/tests/evals/e2etests/custom_evaluators/answer_length_with_aggregation.py
@@ -0,0 +1,17 @@
from typing import List

import numpy as np


class AnswerLength:

def __init__(self, *, return_json: bool = False, aggregate_return_json: bool = False):
self.return_json = return_json
self.aggregate_return_json = aggregate_return_json

def __call__(self, answer: str, **kwargs):
return {"length": len(answer)} if self.return_json else len(answer)

def __aggregate__(self, line_results: List[str]) -> dict:
median_value = np.median([v.length for v in line_results]) if self.return_json else np.median(line_results)
return {"median": median_value} if self.aggregate_return_json else median_value
51 changes: 51 additions & 0 deletions src/promptflow-evals/tests/evals/e2etests/test_evaluate.py
@@ -386,6 +386,57 @@ def test_evaluate_track_in_cloud_no_target(
assert remote_run["runMetadata"]["properties"]["_azureml.evaluation_run"] == "azure-ai-generative-parent"
assert remote_run["runMetadata"]["displayName"] == evaluation_name

@pytest.mark.parametrize(
"return_json, aggregate_return_json",
[
(True, True),
(True, False),
(False, True),
(False, False),
],
)
def test_evaluate_aggregation_with_threadpool(self, data_file, return_json, aggregate_return_json):
from .custom_evaluators.answer_length_with_aggregation import AnswerLength

result = evaluate(
data=data_file,
evaluators={
"answer_length": AnswerLength(
return_json=return_json, aggregate_return_json=aggregate_return_json),
"f1_score": F1ScoreEvaluator(),
},
)
assert result is not None
assert "metrics" in result
if aggregate_return_json:
assert "answer_length.median" in result["metrics"].keys()

@pytest.mark.parametrize(
"return_json, aggregate_return_json",
[
(True, True),
(True, False),
(False, True),
(False, False),
],
)
def test_evaluate_aggregation(self, data_file, return_json, aggregate_return_json):
from .custom_evaluators.answer_length_with_aggregation import AnswerLength

result = evaluate(
data=data_file,
evaluators={
"answer_length": AnswerLength(
return_json=return_json, aggregate_return_json=aggregate_return_json),
"f1_score": F1ScoreEvaluator(),
},
_use_thread_pool=False,
)
assert result is not None
assert "metrics" in result
if aggregate_return_json:
assert "answer_length.median" in result["metrics"].keys()

@pytest.mark.skip(reason="TODO: Add test back")
def test_prompty_with_threadpool_implementation(self):
pass
@@ -30,8 +30,8 @@ def questions_file():
return os.path.join(data_path, "questions.jsonl")


@pytest.fixture
def setup_data(azure_ml_client, project_scope):
@pytest.fixture
def setup_data(azure_ml_client, project_scope):
run = EvalRun(
run_name='test',
tracking_uri=(
@@ -43,7 +43,7 @@ def setup_data(azure_ml_client, project_scope):
subscription_id=project_scope["subscription_id"],
group_name=project_scope["resource_group_name"],
workspace_name=project_scope["project_name"],
ml_client=azure_ml_client
ml_client=azure_ml_client
)
yield
run.end_run("FINISHED")
