Log aggregated metric generated by flex flow to eval run (#3399)
# Description

Log the aggregated metrics generated by a flex flow evaluator (via its `__aggregate__` method) to the evaluation run.
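
The sketch below is a minimal illustration of the usage pattern this enables, modeled on the custom evaluator and e2e tests added in this PR. The `promptflow.evals.evaluate` import path, the `data.jsonl` file, and the `answer` field are assumptions for the example, not part of this change.

```python
import numpy as np
from promptflow.evals.evaluate import evaluate


class AnswerLength:
    """Per-line evaluator with an optional __aggregate__ hook."""

    def __call__(self, answer: str, **kwargs):
        # Called once per input line.
        return {"length": len(answer)}

    def __aggregate__(self, line_results) -> dict:
        # Called once with all per-line results; each key of the returned dict
        # is logged to the eval run as an "<evaluator_name>.<key>" metric.
        return {"median": float(np.median([r.length for r in line_results]))}


result = evaluate(
    data="data.jsonl",  # illustrative: one JSON object per line with an "answer" field
    evaluators={"answer_length": AnswerLength()},
)
print(result["metrics"]["answer_length.median"])
```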

# All Promptflow Contribution checklist:
- [ ] **The pull request does not introduce [breaking changes].**
- [ ] **CHANGELOG is updated for new features, bug fixes or other
significant changes.**
- [ ] **I have read the [contribution guidelines](../CONTRIBUTING.md).**
- [ ] **Create an issue and link to the pull request to get dedicated
review from promptflow team. Learn more: [suggested
workflow](../CONTRIBUTING.md#suggested-workflow).**

## General Guidelines and Best Practices
- [ ] Title of the pull request is clear and informative.
- [ ] There are a small number of commits, each of which has an
informative message. This means that previously merged commits do not
appear in the history of the PR. For more information on cleaning up the
commits in your PR, [see this
page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md).

### Testing Guidelines
- [ ] Pull request includes test coverage for the included changes.
singankit committed Jun 11, 2024
1 parent b10625f commit 5f82247
Showing 5 changed files with 130 additions and 4 deletions.
@@ -8,9 +8,10 @@
import pandas as pd

from promptflow._utils.user_agent_utils import ClientUserAgentUtil
from promptflow.evals.evaluate._utils import _apply_column_mapping, load_jsonl
from promptflow.evals.evaluate._utils import _apply_column_mapping, load_jsonl, _has_aggregator
from promptflow.tracing import ThreadPoolExecutorWithContext as ThreadPoolExecutor
from promptflow.tracing._integrations._openai_injector import inject_openai_api, recover_openai_api
from promptflow.contracts.types import AttrDict

from ..._user_agent import USER_AGENT

@@ -32,17 +33,34 @@ def __exit__(self, exc_type, exc_val, exc_tb):


class CodeRun:
    def __init__(self, run, input_data, evaluator_name=None, **kwargs):
    def __init__(self, run, input_data, evaluator_name=None, aggregated_metrics=None, **kwargs):
        self.run = run
        self.evaluator_name = evaluator_name if evaluator_name is not None else ""
        self.input_data = input_data
        self.aggregated_metrics = aggregated_metrics

    def get_result_df(self, exclude_inputs=False):
        result_df = self.run.result(timeout=60 * 60)
        if exclude_inputs:
            result_df = result_df.drop(columns=[col for col in result_df.columns if col.startswith("inputs.")])
        return result_df

    def get_aggregated_metrics(self):
        try:
            aggregated_metrics = self.aggregated_metrics.result(timeout=60 * 60) \
                if self.aggregated_metrics is not None else None
        except Exception as ex:
            LOGGER.debug(f"Error calculating metrics for evaluator {self.evaluator_name}, failed with error {str(ex)}")
            aggregated_metrics = None

        if not isinstance(aggregated_metrics, dict):
            LOGGER.warning(f"Aggregated metrics for evaluator {self.evaluator_name}"
                           f" is not a dictionary and will not be logged as metrics.")

        aggregated_metrics = aggregated_metrics if isinstance(aggregated_metrics, dict) else {}

        return aggregated_metrics


class CodeClient:
    def __init__(self):
@@ -82,6 +100,24 @@ def _calculate_metric(self, evaluator, input_df, column_mapping, evaluator_name)
            verify_integrity=True,
        )

    def _calculate_aggregations(self, evaluator, run):
        try:
            if _has_aggregator(evaluator):
                aggregate_input = None
                evaluator_output = run.get_result_df(exclude_inputs=True)
                if len(evaluator_output.columns) == 1 and evaluator_output.columns[0] == "output":
                    aggregate_input = evaluator_output["output"].tolist()
                else:
                    aggregate_input = [AttrDict(item) for item in evaluator_output.to_dict("records")]

                aggr_func = getattr(evaluator, "__aggregate__")
                aggregated_output = aggr_func(aggregate_input)
                return aggregated_output
        except Exception as ex:
            LOGGER.warning(f"Error calculating aggregations for evaluator {run.evaluator_name},"
                           f" failed with error {str(ex)}")
        return None

    def run(self, flow, data, evaluator_name=None, column_mapping=None, **kwargs):
        input_df = data
        if not isinstance(input_df, pd.DataFrame):
@@ -92,8 +128,21 @@ def run(self, flow, data, evaluator_name=None, column_mapping=None, **kwargs):

            input_df = pd.DataFrame(json_data)
        eval_future = self._thread_pool.submit(self._calculate_metric, flow, input_df, column_mapping, evaluator_name)
        return CodeRun(run=eval_future, input_data=data, evaluator_name=evaluator_name)
        run = CodeRun(run=eval_future, input_data=data, evaluator_name=evaluator_name, aggregated_metrics=None)
        aggregation_future = self._thread_pool.submit(self._calculate_aggregations, evaluator=flow, run=run)
        run.aggregated_metrics = aggregation_future
        return run

    def get_details(self, run, all_results=False):
        result_df = run.run.result(timeout=60 * 60)
        result_df = run.get_result_df(exclude_inputs=not all_results)
        return result_df

    def get_metrics(self, run):
        try:
            aggregated_metrics = run.get_aggregated_metrics()
            print("Aggregated metrics")
            print(aggregated_metrics)
        except Exception as ex:
            LOGGER.debug(f"Error calculating metrics for evaluator {run.evaluator_name}, failed with error {str(ex)}")
            return None
        return aggregated_metrics
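
For orientation, `_calculate_aggregations` above hands `__aggregate__` one of two input shapes: a lone `output` column becomes a plain list, while dict-style line results become attribute-accessible records. A standalone sketch of that rule (the DataFrames here are illustrative):

```python
import pandas as pd

from promptflow.contracts.types import AttrDict

# Scalar per-line results: a single "output" column is passed as a plain list.
scalar_df = pd.DataFrame({"output": [3, 5, 10]})
aggregate_input = scalar_df["output"].tolist()  # [3, 5, 10]

# Dict per-line results: each row becomes an AttrDict so that __aggregate__
# can use attribute access on the line results (e.g. v.length).
dict_df = pd.DataFrame({"length": [3, 5, 10]})
aggregate_input = [AttrDict(row) for row in dict_df.to_dict("records")]
print(aggregate_input[0].length)  # 3
```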
5 changes: 5 additions & 0 deletions src/promptflow-evals/promptflow/evals/evaluate/_evaluate.py
@@ -404,9 +404,11 @@ def evaluate(
        # get_details needs to be called within BatchRunContext scope in order to have user agent populated
        for evaluator_name, evaluator_info in evaluators_info.items():
            evaluator_info["result"] = batch_run_client.get_details(evaluator_info["run"], all_results=True)
            evaluator_info["metrics"] = batch_run_client.get_metrics(evaluator_info["run"])

    # Concatenate all results
    evaluators_result_df = None
    evaluators_metric = {}
    for evaluator_name, evaluator_info in evaluators_info.items():
        evaluator_result_df = evaluator_info["result"]

@@ -431,13 +433,16 @@
            else evaluator_result_df
        )

        evaluators_metric.update({f"{evaluator_name}.{k}": v for k, v in evaluator_info["metrics"].items()})

    # Rename columns, generated by target function to outputs instead of inputs.
    # If target generates columns, already present in the input data, these columns
    # will be marked as outputs already so we do not need to rename them.
    input_data_df = _rename_columns_conditionally(input_data_df)

    result_df = pd.concat([input_data_df, evaluators_result_df], axis=1, verify_integrity=True)
    metrics = _aggregate_metrics(evaluators_result_df, evaluators)
    metrics.update(evaluators_metric)

    studio_url = _log_metrics_and_instance_results(
        metrics, result_df, trace_destination, target_run
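
In `_evaluate.py`, each evaluator's aggregated metrics are merged into the run-level metrics under a `<evaluator_name>.<key>` prefix, which is the key format the new tests assert on. A tiny illustration with made-up values:

```python
# Illustrative values only: prefix per-evaluator aggregated metrics with the
# evaluator name before merging them into the run-level metrics dict.
evaluator_name = "answer_length"
evaluator_metrics = {"median": 5.0}

evaluators_metric = {}
evaluators_metric.update({f"{evaluator_name}.{k}": v for k, v in evaluator_metrics.items()})
print(evaluators_metric)  # {'answer_length.median': 5.0}
```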
4 changes: 4 additions & 0 deletions src/promptflow-evals/promptflow/evals/evaluate/_utils.py
@@ -207,3 +207,7 @@ def _apply_column_mapping(source_df: pd.DataFrame, mapping_config: dict, inplace
    result_df.rename(columns=column_mapping, inplace=True)

    return result_df


def _has_aggregator(evaluator):
    return hasattr(evaluator, "__aggregate__")
17 changes: 17 additions & 0 deletions src/promptflow-evals/tests/evals/e2etests/custom_evaluators/answer_length_with_aggregation.py
@@ -0,0 +1,17 @@
from typing import List

import numpy as np


class AnswerLength:

    def __init__(self, *, return_json: bool = False, aggregate_return_json: bool = False):
        self.return_json = return_json
        self.aggregate_return_json = aggregate_return_json

    def __call__(self, answer: str, **kwargs):
        return {"length": len(answer)} if self.return_json else len(answer)

    def __aggregate__(self, line_results: List[str]) -> dict:
        median_value = np.median([v.length for v in line_results]) if self.return_json else np.median(line_results)
        return {"median": median_value} if self.aggregate_return_json else median_value
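
A quick, hypothetical illustration of the two modes of the `AnswerLength` evaluator above; the `AttrDict` wrapping mirrors what the code client does before invoking `__aggregate__`:

```python
from promptflow.contracts.types import AttrDict

# AnswerLength as defined above.

# Scalar mode: per-line results are plain ints and aggregation gets a plain list.
plain = AnswerLength()
assert plain(answer="maybe") == 5
assert plain.__aggregate__([3, 5, 10]) == 5.0

# JSON mode: per-line results are dicts; the batch client wraps each row in an
# AttrDict so __aggregate__ can read v.length, and the aggregate is a dict.
json_mode = AnswerLength(return_json=True, aggregate_return_json=True)
assert json_mode(answer="maybe") == {"length": 5}
assert json_mode.__aggregate__([AttrDict({"length": n}) for n in (3, 5, 10)]) == {"median": 5.0}
```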
51 changes: 51 additions & 0 deletions src/promptflow-evals/tests/evals/e2etests/test_evaluate.py
@@ -378,6 +378,57 @@ def test_evaluate_track_in_cloud_no_target(
        assert remote_run["runMetadata"]["properties"]["_azureml.evaluation_run"] == "azure-ai-generative-parent"
        assert remote_run["runMetadata"]["displayName"] == evaluation_name

    @pytest.mark.parametrize(
        "return_json, aggregate_return_json",
        [
            (True, True),
            (True, False),
            (False, True),
            (False, False),
        ],
    )
    def test_evaluate_aggregation_with_threadpool(self, data_file, return_json, aggregate_return_json):
        from .custom_evaluators.answer_length_with_aggregation import AnswerLength

        result = evaluate(
            data=data_file,
            evaluators={
                "answer_length": AnswerLength(
                    return_json=return_json, aggregate_return_json=aggregate_return_json),
                "f1_score": F1ScoreEvaluator(),
            },
        )
        assert result is not None
        assert "metrics" in result
        if aggregate_return_json:
            assert "answer_length.median" in result["metrics"].keys()

    @pytest.mark.parametrize(
        "return_json, aggregate_return_json",
        [
            (True, True),
            (True, False),
            (False, True),
            (False, False),
        ],
    )
    def test_evaluate_aggregation(self, data_file, return_json, aggregate_return_json):
        from .custom_evaluators.answer_length_with_aggregation import AnswerLength

        result = evaluate(
            data=data_file,
            evaluators={
                "answer_length": AnswerLength(
                    return_json=return_json, aggregate_return_json=aggregate_return_json),
                "f1_score": F1ScoreEvaluator(),
            },
            _use_thread_pool=False,
        )
        assert result is not None
        assert "metrics" in result
        if aggregate_return_json:
            assert "answer_length.median" in result["metrics"].keys()

    @pytest.mark.skip(reason="TODO: Add test back")
    def test_prompty_with_threadpool_implementation(self):
        pass
