In [None]:
"""Script to run benchmarks and analyze results."""

import re
from pathlib import Path
from typing import List
from datetime import datetime

import difflib
import pandas as pd

from benchmarks import benchmark_orchestrator
from benchmarks.benchmark_candidates import CANDIDATE_GENERATORS
from benchmarks.data_models import BenchmarkRunResult
from benchmarks.logger import JsonTraceLogger

# Never truncate cell contents or rows when DataFrames are printed below.
pd.set_option("display.max_colwidth", None, "display.max_rows", None)

In [None]:
# ANSI escape codes for colors
class Bcolors:
  """ANSI SGR escape sequences used to color and style console output.

  Put one of the style attributes in front of the text and `ENDC` after it,
  e.g. f"{Bcolors.FAIL}message{Bcolors.ENDC}", so the styling does not leak
  into whatever is printed next.
  """

  HEADER = "\033[95m"  # bright magenta
  OKBLUE = "\033[94m"  # bright blue
  OKCYAN = "\033[96m"  # bright cyan
  OKGREEN = "\033[92m"  # bright green
  WARNING = "\033[93m"  # bright yellow
  FAIL = "\033[91m"  # bright red
  ENDC = "\033[0m"  # resets all colors and attributes
  BOLD = "\033[1m"  # bold / increased intensity
  UNDERLINE = "\033[4m"  # underline

In [None]:
async def run_comparison(logger: JsonTraceLogger) -> List[BenchmarkRunResult]:
  """Runs all candidate answer generators against the benchmark suites.

  Args:
    logger: Trace logger that is handed to the benchmark orchestrator.

  Returns:
    Whatever `benchmark_orchestrator.run_benchmarks` returns for the fixed
    list of suites below and the generators in `CANDIDATE_GENERATORS`.
  """
  print("Configuring benchmark run...")

  # Benchmark definition files, given as paths relative to the working dir.
  suite_paths = [
      "benchmarks/benchmark_definitions/api_understanding/benchmark.yaml",
      "benchmarks/benchmark_definitions/fix_errors/benchmark.yaml",
      "benchmarks/benchmark_definitions/diagnose_setup_errors_mc/benchmark.yaml",
      "benchmarks/benchmark_definitions/configure_adk_features_mc/benchmark.yaml",
      "benchmarks/benchmark_definitions/predict_runtime_behavior_mc/benchmark.yaml",
  ]
  candidates = CANDIDATE_GENERATORS

  print("Executing benchmarks...")
  return await benchmark_orchestrator.run_benchmarks(
      benchmark_suites=suite_paths,
      answer_generators=candidates,
      max_concurrency=20,
      logger=logger,
  )


def extract_error_type(row) -> str:
  """Returns a printable error type for one result row.

  Args:
    row: Mapping-like result row (e.g. a DataFrame row or a dict).

  Returns:
    "OtherError" when the row has no `error_type` entry or the entry is
    null; the `.value` of the entry when it has one (such as an Enum
    member); otherwise `str()` of the entry.
  """
  has_error_type = "error_type" in row and pd.notna(row["error_type"])
  if not has_error_type:
    return "OtherError"

  error_type = row["error_type"]
  # Enum members (e.g. produced by pydantic validation) expose `.value`.
  return error_type.value if hasattr(error_type, "value") else str(error_type)

In [None]:
def process_results(
    benchmark_run_results: List[BenchmarkRunResult],
) -> pd.DataFrame:
  """Builds the results DataFrame and adds the derived columns.

  Args:
    benchmark_run_results: Run results; each one is converted with
      `model_dump()` into one DataFrame row.

  Returns:
    The DataFrame of dumped results. When it is non-empty, `suite` is
    replaced by the second-to-last "/"-separated component of the original
    value and a `final_error_type` column is added.
  """
  records = [run_result.model_dump() for run_result in benchmark_run_results]
  results_df = pd.DataFrame(records)
  if results_df.empty:
    return results_df

  def suite_dir_name(suite_path):
    # ".../<suite>/benchmark.yaml" -> "<suite>"
    return suite_path.split("/")[-2]

  results_df["suite"] = results_df["suite"].apply(suite_dir_name)
  results_df["final_error_type"] = results_df.apply(extract_error_type, axis=1)
  return results_df

In [None]:
def print_summary(raw_results_df: pd.DataFrame):
  """Prints pass counts and pass rates per answer generator and suite.

  Args:
    raw_results_df: One row per benchmark case with `answer_generator`,
      `suite` and `result` columns.
  """
  if raw_results_df.empty:
    print("No results to summarize.")
    return

  # Passed / total cases for every (generator, suite) pair, plus their ratio.
  per_group = raw_results_df.groupby(["answer_generator", "suite"])
  summary_df = per_group.agg(
      passed=pd.NamedAgg(column="result", aggfunc="sum"),
      total=pd.NamedAgg(column="result", aggfunc="count"),
  ).assign(pass_rate=lambda counts: counts["passed"] / counts["total"])

  print(f"{Bcolors.HEADER}--- Benchmark Summary ---\n{Bcolors.ENDC}")
  print(summary_df)
  print("\n")

In [None]:
def print_metrics(raw_results_df: pd.DataFrame):
  """Prints latency, token, cost and pass-rate metrics.

  Two tables are printed: one aggregated per answer generator and one per
  (answer generator, suite) pair. The input frame is not modified.

  Args:
    raw_results_df: One row per benchmark case with `answer_generator`,
      `suite`, `latency` and `result` columns. A `usage_metadata` dict, when
      present on a row, supplies `total_tokens` and `cost`.
  """
  if raw_results_df.empty:
    return

  metrics_df = raw_results_df.copy()

  def usage_value(row, key):
    # Rows without a usage dict count as zero tokens / zero cost.
    usage = row.get("usage_metadata")
    return usage.get(key, 0) if isinstance(usage, dict) else 0

  metrics_df["tokens"] = metrics_df.apply(
      lambda row: usage_value(row, "total_tokens"), axis=1
  )
  metrics_df["cost"] = metrics_df.apply(
      lambda row: usage_value(row, "cost"), axis=1
  )

  def aggregate_by(group_keys):
    # Same metric set for every grouping level.
    return (
        metrics_df.groupby(group_keys)
        .agg(
            avg_latency=("latency", "mean"),
            avg_tokens=("tokens", "mean"),
            total_cost=("cost", "sum"),
            pass_rate=("result", "mean"),
            count=("result", "count"),
        )
        .rename(columns={"avg_latency": "avg_latency (s)"})
    )

  # 1. Per answer generator
  by_generator = aggregate_by("answer_generator")
  print(f"{Bcolors.HEADER}--- Metrics by Answer Generator ---\n{Bcolors.ENDC}")
  print(by_generator.round(4))
  print("\n")

  # 2. Per answer generator and suite
  by_generator_and_suite = aggregate_by(["answer_generator", "suite"])
  print(f"{Bcolors.HEADER}--- Metrics by Generator & Suite ---\n{Bcolors.ENDC}")
  print(by_generator_and_suite.round(4))
  print("\n")

In [None]:
def print_time_profiling(raw_results_df: pd.DataFrame):
  """Analyzes latency and execution time to identify bottlenecks.

  Prints per-generator latency statistics, the five slowest benchmark cases
  and, when a `trace_logs` column is present, the average number of model and
  tool calls per generator next to the average latency.

  The input frame is only read. The derived call counts live in a local
  frame, so re-running this function never changes `raw_results_df`.

  Args:
    raw_results_df: One row per benchmark case with `answer_generator`,
      `suite`, `benchmark_name`, `latency` and `result` columns. `trace_logs`
      is optional; each value is expected to be a list of event dicts that
      carry a `type` key.
  """
  if raw_results_df.empty:
    return

  print(f"{Bcolors.HEADER}--- Time Profiling Analysis ---\n{Bcolors.ENDC}")

  # 1. Latency Statistics
  latency_by_generator = raw_results_df.groupby("answer_generator")["latency"]
  latency_stats = latency_by_generator.describe(
      percentiles=[0.5, 0.75, 0.90, 0.95]
  )[["mean", "min", "50%", "90%", "95%", "max"]]
  print("Latency Statistics (seconds):")
  print(latency_stats.round(2))
  print("\n")

  # 2. Slowest Benchmarks
  print("Top 5 Slowest Benchmarks:")
  slowest = raw_results_df.nlargest(5, "latency")[
      ["answer_generator", "suite", "benchmark_name", "latency", "result"]
  ]
  print(slowest.to_string(index=False))
  print("\n")

  # 3. Complexity Analysis (Trace Logs)
  if "trace_logs" not in raw_results_df.columns:
    return

  def count_trace_events(logs):
    """Returns (model_calls, tool_calls) for one case's trace log."""
    if not isinstance(logs, list):
      return 0, 0
    # Entries that are not dicts are skipped instead of failing on `.get`.
    event_types = [
        event.get("type") for event in logs if isinstance(event, dict)
    ]
    model_calls = sum(
        1 for kind in event_types if kind in ("model_response", "model_call")
    )
    tool_calls = sum(
        1 for kind in event_types if kind in ("tool_code", "tool_execution")
    )
    return model_calls, tool_calls

  trace_counts = raw_results_df["trace_logs"].apply(count_trace_events)

  # Work on a narrow copy so the caller's DataFrame does not gain columns.
  profile_df = raw_results_df[["answer_generator", "latency"]].copy()
  profile_df["num_model_calls"] = [model for model, _ in trace_counts]
  profile_df["num_tool_calls"] = [tool for _, tool in trace_counts]

  # Average calls per generator
  call_stats = profile_df.groupby("answer_generator")[
      ["num_model_calls", "num_tool_calls", "latency"]
  ].mean()
  print("Average Complexity (Calls) vs Latency:")
  print(call_stats.round(2))
  print("\n")

In [None]:
def print_detailed_breakdown(raw_results_df: pd.DataFrame):
  """Prints the error-type breakdown and sample Gemini CLI failures.

  For every (answer generator, suite, error type) combination among the
  failed cases, prints the failure count and its ratio to all runs of that
  generator/suite pair. Then prints up to three failures of generators whose
  name contains "GeminiCliAnswerGenerator", including the validation error.

  Args:
    raw_results_df: One row per benchmark case with `answer_generator`,
      `suite`, `result`, `final_error_type`, `benchmark_name` and
      `validation_error` columns.
  """
  if raw_results_df.empty:
    return

  failures = raw_results_df[raw_results_df["result"] == 0]
  if failures.empty:
    print(f"{Bcolors.OKGREEN}No failures detected!{Bcolors.ENDC}")
    return

  group_keys = ["answer_generator", "suite"]

  # Failures per error type, joined with the total runs of the same group so
  # the ratio is relative to all runs rather than to the failures only.
  per_error_type = (
      failures.groupby(group_keys + ["final_error_type"])
      .size()
      .reset_index(name="count")
  )
  runs_per_group = (
      raw_results_df.groupby(group_keys).size().reset_index(name="total_runs")
  )
  error_summary = per_error_type.merge(runs_per_group, on=group_keys)
  error_summary["failure_ratio"] = (
      error_summary["count"] / error_summary["total_runs"]
  )

  print(f"{Bcolors.HEADER}--- Detailed Error Breakdown ---\n{Bcolors.ENDC}")
  # Most frequent error types first within each generator/suite pair.
  error_summary = error_summary.sort_values(
      group_keys + ["count"], ascending=[True, True, False]
  )
  print(error_summary.to_string(index=False))

  # --- DETAILED DEBUG FOR GEMINI CLI FAILURES ---
  print(f"\n{Bcolors.FAIL}--- DETAILED GEMINI CLI FAILURES ---\n{Bcolors.ENDC}")
  from_cli = failures["answer_generator"].str.contains(
      "GeminiCliAnswerGenerator"
  )
  cli_failures = failures[from_cli]
  if cli_failures.empty:
    print("No Gemini CLI failures found in this run.")
    return

  # Only the first 3 failures are shown to keep the output readable.
  for _, failure in cli_failures.head(3).iterrows():
    benchmark_name = failure["benchmark_name"]
    suite = failure["suite"]
    print(f"\nBenchmark: {benchmark_name} (Suite: {suite})")
    print(f"Error Type: {failure['final_error_type']}")
    print(f"Full Validation Error:\n{failure['validation_error']}")
    print("-" * 60)


def strip_ansi(text: str) -> str:
  """Returns `text` with ANSI escape sequences removed.

  The pattern matches ESC followed either by a single character in the
  ranges `@-Z` / `\\-_`, or by a `[`-introduced sequence of parameter bytes,
  intermediate bytes and one final byte.
  """
  return re.sub(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])", "", text)

In [None]:
def _has_report_text(value) -> bool:
  """Returns True when `value` holds something worth rendering in a report.

  None, NaN (what pandas stores for a missing cell) and empty values all
  count as "nothing to show".
  """
  if value is None:
    return False
  if isinstance(value, float) and value != value:  # NaN never equals itself
    return False
  return bool(value)


def _unified_diff_text(expected: str, generated: str) -> str:
  """Returns a unified diff of two code strings, ignoring trailing whitespace.

  Returns an empty string when the two inputs do not differ.
  """
  # The content lines carry no newline, so `lineterm=""` is required to keep
  # the ---/+++/@@ control lines newline-free as well; otherwise joining with
  # "\n" leaves a blank line after each of them.
  diff_lines = difflib.unified_diff(
      [line.rstrip() for line in expected.splitlines()],
      [line.rstrip() for line in generated.splitlines()],
      fromfile="expected/fixed.py",
      tofile="generated/answer.py",
      lineterm="",
  )
  return "\n".join(diff_lines)


def _case_report_lines(row) -> List[str]:
  """Renders the Markdown lines describing one benchmark case."""
  status_icon = "✅" if row["result"] == 1 else "❌"
  lines = [
      f"### {status_icon} {row['benchmark_name']}",
      f"- **Suite:** {row['suite']}",
      f"- **Status:** {row['status']}",
      f"- **Error Type:** {row['final_error_type']}",
      "",
  ]

  if _has_report_text(row["validation_error"]):
    cleaned_error = strip_ansi(str(row["validation_error"]))
    lines.extend(["**Validation Error:**", "```", cleaned_error, "```", ""])

  if _has_report_text(row["rationale"]):
    cleaned_rationale = strip_ansi(str(row["rationale"]))
    lines.extend(["**Rationale:**", cleaned_rationale, ""])

  # The answer, diff and separator belong to every case. They must not
  # depend on whether the generator also supplied a rationale.
  generated_content = str(row["answer"])
  lines.extend([
      "**Generated Answer:**",
      "```python",
      generated_content,
      "```",
  ])

  # Add diff for fix_errors benchmarks
  if row["benchmark_type"] == "fix_error":
    ground_truth = row.get("ground_truth", "")
    expected_content = (
        str(ground_truth) if _has_report_text(ground_truth) else ""
    )
    diff_text = _unified_diff_text(expected_content, generated_content)
    lines.append("**Diff (Expected vs. Generated):**")
    if diff_text:
      lines.extend(["```diff", diff_text, "```"])
    else:
      lines.extend(["```", "No differences.", "```"])

  lines.extend(["---", ""])
  return lines


def generate_detailed_reports(raw_results_df: pd.DataFrame, output_dir: Path):
  """Generates detailed Markdown reports for each answer generator.

  One `<generator>_report.md` file is written per distinct
  `answer_generator` value. Each file holds a pass-rate summary followed by
  one section per benchmark case: validation error and rationale when
  present, the generated answer, and, for `fix_error` benchmarks, a unified
  diff against `ground_truth`.

  Args:
    raw_results_df: One row per benchmark case with `answer_generator`,
      `benchmark_name`, `suite`, `status`, `result`, `final_error_type`,
      `validation_error`, `rationale`, `answer` and `benchmark_type`
      columns; `ground_truth` is optional.
    output_dir: Directory the reports are written to. It is created if it
      does not exist yet.
  """
  if raw_results_df.empty:
    return

  print(f"\n{Bcolors.HEADER}--- Generating Detailed Reports ---{Bcolors.ENDC}")
  print(f"Output Directory: {output_dir.absolute()}")
  output_dir.mkdir(parents=True, exist_ok=True)

  for generator, group in raw_results_df.groupby("answer_generator"):
    # Summary
    total = len(group)
    passed = len(group[group["result"] == 1])
    pass_rate = (passed / total) * 100 if total > 0 else 0

    report_lines = [
        f"# Benchmark Report: {generator}",
        "",
        "## Summary",
        f"- **Total Cases:** {total}",
        f"- **Passed:** {passed}",
        f"- **Pass Rate:** {pass_rate:.2f}%",
        "",
        "## Details",
        "",
    ]
    for _, row in group.iterrows():
      report_lines.extend(_case_report_lines(row))

    # Sanitize filename: anything outside [alnum . _ - space] becomes "_".
    safe_name = "".join(
        c if c.isalnum() or c in "._- " else "_" for c in str(generator)
    )
    file_path = output_dir / f"{safe_name}_report.md"
    with open(file_path, "w", encoding="utf-8-sig") as f:
      f.write("\n".join(report_lines))
      print(f"  - Report saved: {file_path}")

In [None]:
# Setup unified output directory: each run gets its own
# benchmark_runs/<timestamp>/ folder, resolved against the current working
# directory. The timestamp is local time with one-second resolution.
current_timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
run_output_dir = Path("benchmark_runs") / current_timestamp
run_output_dir.mkdir(parents=True, exist_ok=True)

# Initialize logger
# NOTE(review): presumably the trace ends up in <run_output_dir>/trace.jsonl;
# confirm against JsonTraceLogger.
logger = JsonTraceLogger(output_dir=str(run_output_dir), filename="trace.jsonl")

# Execute the benchmarks
# Top-level `await` relies on the notebook kernel; in a plain Python script
# this call would have to be wrapped, e.g. with asyncio.run().
benchmark_run_results = await run_comparison(logger=logger)

In [None]:
# Turn the raw run results into a DataFrame with derived columns. Being a
# separate cell, this can be re-run without re-executing the benchmarks.
raw_results_df = process_results(benchmark_run_results)

In [None]:
# Print every console analysis in order, then write the Markdown reports
# next to the trace log of this run.
if raw_results_df.empty:
  print("No results returned.")
else:
  for print_section in (
      print_summary,
      print_metrics,
      print_time_profiling,
      print_detailed_breakdown,
  ):
    print_section(raw_results_df)
  generate_detailed_reports(raw_results_df, output_dir=run_output_dir)