In [None]:
# read evaluation_results.json

import json
import os


file_path = "evaluation_results.json"
# Fail early with a clear message when the results file is missing.
if not os.path.exists(file_path):
    raise FileNotFoundError(f"File not found: {file_path}")

# Pin the encoding so the read does not depend on the platform's locale default
# (JSON interchange text is UTF-8).
with open(file_path, "r", encoding="utf-8") as file:
    results = json.load(file)

# Only the first entry of the top-level list is analysed in this notebook.
if not isinstance(results, list) or not results:
    raise ValueError(f"Expected a non-empty JSON list in {file_path}")

# Per-row records; later cells build a DataFrame from this list, sort it and
# iterate over it, so a missing "rows" entry is reported here instead of
# surfacing later as a TypeError on None.
data = results[0].get("rows")
if data is None:
    raise ValueError(f'No "rows" entry found in the first result of {file_path}')
# From the first result, keep every key/value pair except "rows" and assign
# them to `metrics`.
metrics = {k: v for k, v in results[0].items() if k != "rows"}
# Print the metrics
print("### Metrics:")
for key, value in metrics.items():
    print(f"{key}: {value}")
# Print the number of rows in the data
print("### Number of rows in data:")
print(len(data))

In [None]:
# Prefix every key of `metrics` with "outputs."; the resulting names are used
# by later cells as row / column keys.
metrics_names = [f"outputs.{metric}" for metric in metrics]
print("### Metrics names:")
for name in metrics_names:
    print(name)

In [None]:
import pandas as pd

# Build a DataFrame from `data` (the "rows" entry read from the results file).
df = pd.DataFrame(data)
# Print a section header; the bare `df.head(2)` expression ends the cell, so
# the notebook renders the first two rows as the cell output.
print("### DataFrame:")
df.head(2)

In [None]:
# Mean length of "outputs.chat_history" across rows; any value that is not a
# list (e.g. a missing history) counts as length 0.
df["outputs.chat_history"].apply(lambda x: len(x) if isinstance(x, list) else 0).mean()

In [None]:
# aggregate metrics by scenario type
def aggregate_metrics(df, group_col="inputs.scenarioType", metric_columns=None):
    """Return the mean of each metric column per group.

    Args:
        df: DataFrame holding one evaluation row per record.
        group_col: Column to group by. Defaults to "inputs.scenarioType".
        metric_columns: Iterable of column names to average. When omitted, the
            notebook-level ``metrics_names`` list is used, which must already
            be defined.

    Returns:
        A DataFrame indexed by the distinct values of ``group_col`` with one
        column per metric, each holding that group's mean.

    Raises:
        KeyError: If ``group_col`` or any metric column is absent from ``df``.
    """
    if metric_columns is None:
        # Backward-compatible default: fall back to the notebook global.
        metric_columns = metrics_names
    # Selecting with a list keeps the result a DataFrame even for one metric.
    return df.groupby(group_col)[list(metric_columns)].mean()


# Compute the per-scenario-type means of the metric columns for the full DataFrame.
aggregated_df = aggregate_metrics(df)
# Print a section header; the bare `aggregated_df` expression ends the cell,
# so the notebook renders it as the cell output.
print("### Aggregated DataFrame:")
aggregated_df

In [None]:
# sort the data from lowest to highest based on weighted average of metrics
# metrics containing "Recall" are multiplied by 2
# metrics containing "Reliability" are multiplied by 3
def calculate_weighted_average(row, metric_columns=None):
    """Return the weighted average of a row's metric values.

    Each metric is weighted by its name: names containing "Recall" get weight
    2, names containing "Reliability" get weight 3 (checked only when "Recall"
    is absent), and every other metric gets weight 1.

    Args:
        row: Mapping from metric column name to its value for one row.
        metric_columns: Iterable of column names to include. When omitted, the
            notebook-level ``metrics_names`` list is used, which must already
            be defined.

    Returns:
        The weighted sum divided by the total weight of the metrics that have
        a value, or 0 when no metric contributes.

    Raises:
        KeyError: If a requested metric column is missing from ``row``.
    """
    if metric_columns is None:
        # Backward-compatible default: fall back to the notebook global.
        metric_columns = metrics_names
    weighted_sum = 0
    total_weight = 0
    for name in metric_columns:
        value = row[name]
        if value is None:
            # A null score (JSON `null`) cannot be multiplied; leave it out
            # together with its weight instead of raising TypeError mid-sort.
            continue
        if "Recall" in name:
            weight = 2
        elif "Reliability" in name:
            weight = 3
        else:
            weight = 1
        weighted_sum += value * weight
        total_weight += weight
    return weighted_sum / total_weight if total_weight != 0 else 0


# Reorder `data` in place, ascending by weighted average (lowest first).
data.sort(key=calculate_weighted_average)
print("### Sorted data:")
for row in data:
    scenario = row["inputs.scenarioType"]
    # Strip the "outputs." and ".score" parts from metric names for display.
    scores = {
        name.replace("outputs.", "").replace(".score", ""): row[name]
        for name in metrics_names
    }
    print(scenario, scores)

In [None]:
from evaluation.chatbot.evaluators.compare import is_similar


def compare_fn_details(
    function_calls1,
    function_calls2,
    ignore_functions=(
        "CobraPlugin-get_module_orgs",
        "CommonPlugin-ask_clarification",
        "CommonPlugin-start_over",
    ),
):
    """Describe how the calls in ``function_calls1`` deviate from ``function_calls2``.

    Every call in ``function_calls1`` whose name is not ignored is looked up by
    ``functionName`` in ``function_calls2``. If no call has that name, one
    "not found" line is reported. Otherwise its arguments are compared with
    the FIRST call of that name in ``function_calls2``: keys present only on
    the ``function_calls1`` side are reported as extra, and values that
    ``is_similar`` rejects (after ``str()`` conversion) are reported as
    mismatches. Keys present only on the ``function_calls2`` side are not
    reported; call the function with the arguments swapped to see them.

    Args:
        function_calls1: Iterable of dicts with "functionName" and
            "arguments" entries, or None (treated as no calls).
        function_calls2: Reference calls in the same format, or None.
        ignore_functions: Function names skipped on the ``function_calls1``
            side. The default is an immutable tuple so it cannot be modified
            between calls.

    Returns:
        A list of pre-indented, human-readable difference lines; empty when no
        differences were found.
    """
    # A row without recorded calls may carry None instead of a list.
    function_calls1 = function_calls1 or []
    function_calls2 = function_calls2 or []

    # Index the reference calls once instead of rescanning them per call;
    # setdefault keeps the first call seen for each name.
    first_call_by_name = {}
    for call in function_calls2:
        first_call_by_name.setdefault(call["functionName"], call)

    outputs = []
    for call1 in function_calls1:
        function_name = call1["functionName"]
        if function_name in ignore_functions:
            continue
        call2 = first_call_by_name.get(function_name)
        if call2 is None:
            outputs.append(
                f'    - "{function_name}" not found in target function calls'
            )
            continue

        # Missing or null arguments are treated as an empty argument set.
        arguments1 = call1.get("arguments") or {}
        arguments2 = call2.get("arguments") or {}
        header_emitted = False
        for key in arguments1:
            if key not in arguments2:
                # Argument-name level mistake.
                detail = f'      - "{function_name}" has extra key {key}'
            elif not is_similar(str(arguments1[key]), str(arguments2[key])):
                # Argument-value level mistake.
                detail = f'      - "{key}": "{arguments1[key]}" != "{arguments2[key]}"'
            else:
                continue
            if not header_emitted:
                # Emit the per-function header once, before its first detail.
                header_emitted = True
                outputs.append(f'    - "{function_name}" arguments differ')
            outputs.append(detail)
    return outputs


# analyze and display differences in the data by comparing "outputs.function_calls" and "inputs.expected_function_calls"
def compare_function_calls(row):
    """Print a report when a row's actual and expected function calls differ.

    The row's "outputs.function_calls" and "inputs.expected_function_calls"
    are compared in both directions with ``compare_fn_details``. When either
    direction yields differences, the scenario type, the row's metric values
    and the difference lines are printed.

    Returns:
        True if a report was printed, False if no differences were found.
    """
    actual_calls = row["outputs.function_calls"]
    expected_calls = row["inputs.expected_function_calls"]
    actual_report = compare_fn_details(actual_calls, expected_calls)
    expected_report = compare_fn_details(expected_calls, actual_calls)
    if not (actual_report or expected_report):
        return False

    print(f'Function calls differ for scenarioType "{row["inputs.scenarioType"]}":')
    # One single-entry dict per metric, keyed by the shortened metric name.
    metric_values = [
        {name.replace("outputs.", "").replace(".score", ""): row[name]}
        for name in metrics_names
    ]
    print(f"Metrics: {metric_values}")

    # Each direction is printed only when it actually has difference lines.
    sections = (
        ("  Actual function calls vs Expected function calls:", actual_report),
        ("  Expected function calls vs Actual function calls:", expected_report),
    )
    for heading, report_lines in sections:
        if report_lines:
            print(heading)
            for line in report_lines:
                print(line)
    return True


# Report each row whose function calls differ, closing every reported row
# with a separator line.
separator = "===================================="
print("### Function calls differences:")
print(separator)
for row in data:
    reported = compare_function_calls(row)
    if reported:
        print(separator)