In [None]:
# ===================================================================
# 1_run_benchmark.ipynb
#
# This notebook orchestrates the entire LLM benchmarking process. 
# It loads the data and configuration, iterates through each model
# and log entry, calls the respective LLM APIs, validates the
# responses, and saves the results for analysis.
# ===================================================================

In [None]:
# === 1. SETUP AND IMPORTS ===
import yaml
import pandas as pd
import getpass
import os
import time
import logging
import json
from pathlib import Path
from tqdm.notebook import tqdm
from collections import defaultdict

# --- Custom Modules ---
from llm_clients.openai_client import OpenAIClient
from llm_clients.google_client import GoogleClient
from llm_clients.ollama_client import OllamaClient
from llm_clients.base_client import ModelNotFoundError
from utils.data_schemas import LLMOutput
from utils.logging_config import setup_logging

print("INFO: Imports loaded successfully.")

In [None]:
# === 2. RUN CONFIGURATION ===
# Set to an integer (e.g., 50) for a test run on the first n rows.
# Set to None to process the entire dataset.
ROWS_TO_PROCESS = 50

# Both paths are relative to the working directory the notebook is run from.
CONFIG_FILE = "configurations/config.yaml"
PROMPTS_FILE = "prompts.json"

# Nothing is read from disk in this cell (the files are opened later, during
# initialisation), so only report the settings that were chosen.
print(
    f"INFO: Run configured (rows: {'all' if ROWS_TO_PROCESS is None else ROWS_TO_PROCESS}, "
    f"config: '{CONFIG_FILE}', prompts: '{PROMPTS_FILE}')."
)

In [None]:
# === 3. HELPER FUNCTIONS ===

def load_config(config_path="config.yaml"):
    """Load a YAML configuration file and return its parsed content.

    Args:
        config_path: Path of the YAML file to read.

    Returns:
        Whatever ``yaml.safe_load`` produces for the file content.

    Raises:
        FileNotFoundError: If ``config_path`` does not exist.
    """
    # Decode explicitly as UTF-8 so the result does not depend on the
    # platform's default locale encoding.
    with open(config_path, "r", encoding="utf-8") as f:
        return yaml.safe_load(f)

def get_api_key(key_name: str, config: dict) -> str:
    """Resolve an API key by trying three sources in order.

    1. The environment variable named ``key_name`` in upper case.
    2. The ``key_name`` entry of ``config``.
    3. An interactive ``getpass`` prompt.

    Args:
        key_name: Name of the key, e.g. ``"openai_api_key"``.
        config: Mapping that may contain the key under ``key_name``.

    Returns:
        The first non-empty value found, or whatever the user typed at the
        prompt (which may be an empty string).
    """
    env_name = key_name.upper()

    # Each source is a lazy lookup paired with the message logged on success;
    # later sources are only consulted when the earlier ones yield nothing.
    sources = (
        (lambda: os.getenv(env_name), f"Found API key in environment variable '{env_name}'."),
        (lambda: config.get(key_name), "Found API key in config.yaml."),
    )
    for lookup, found_message in sources:
        candidate = lookup()
        if candidate:
            logging.info(found_message)
            return candidate

    logging.warning(
        f"API key for '{key_name}' not found as an environment variable or in the config file. "
        "Waiting for manual entry..."
    )
    return getpass.getpass(f"Please enter your {key_name}: ")

def build_lookup_map(prompts_config: dict) -> dict:
    """
    Builds a lookup map from component code to the labels valid for that code.

    Each entry of ``prompts_config["maintenance_types"]`` and
    ``prompts_config["issue_categories"]`` maps a label to one of these rules:

    * a list of component codes  -> the label applies to exactly those codes;
    * a list containing "ANY"    -> the label applies to every known code;
    * a dict with "exclude_codes" -> the label applies to every known code
      except the listed ones.

    "Known codes" are the codes named explicitly in any include list (lists
    without "ANY") of either section; codes that only appear in an
    "exclude_codes" list are not added to that set.

    Args:
        prompts_config: Parsed prompts JSON containing both sections.

    Returns:
        A plain dict ``{code: {"maintenance": [labels], "issue": [labels]}}``.
        Labels keep the order in which they appear in the configuration.

    Raises:
        KeyError: If either section is missing from ``prompts_config``.
    """
    logging.info("Building component-to-label lookup map with include/exclude logic...")

    # (key used in the result, section name in the prompts JSON)
    sections = (("maintenance", "maintenance_types"), ("issue", "issue_categories"))

    # --- 1. Gather every component code that is named explicitly ---
    all_codes = set()
    for _, section_name in sections:
        for rule in prompts_config[section_name].values():
            if isinstance(rule, list) and "ANY" not in rule:
                all_codes.update(rule)

    def codes_for_rule(rule):
        """Return the codes a rule applies to, or None for an unrecognised rule."""
        if isinstance(rule, list):
            if "ANY" in rule:
                return all_codes
            # dict.fromkeys de-duplicates while keeping order, so a code that
            # is listed twice does not receive the same label twice.
            return dict.fromkeys(rule)
        if isinstance(rule, dict) and "exclude_codes" in rule:
            return all_codes - set(rule["exclude_codes"])
        return None

    # --- 2. Apply the same include/exclude logic to both sections ---
    lookup_map = defaultdict(lambda: {"maintenance": [], "issue": []})
    for result_key, section_name in sections:
        for label, rule in prompts_config[section_name].items():
            codes = codes_for_rule(rule)
            if codes is None:
                # Previously such entries were dropped without any trace.
                logging.warning(
                    f"Ignoring unrecognised rule for '{label}' in '{section_name}': {rule!r}"
                )
                continue
            for code in codes:
                lookup_map[code][result_key].append(label)

    logging.info("Lookup map built successfully.")
    return dict(lookup_map)

def update_cost_log(model_config: dict, run_prompt_tokens: int, run_completion_tokens: int, cost_log_dir: str):
    """
    Calculates the cost of the current run and updates a persistent
    JSON log file with cumulative token and cost data for a model.

    Args:
        model_config: Model entry from the configuration. Must contain
            "model_name"; prices are read from ``pricing.individual.input`` and
            ``pricing.individual.output`` and are applied per one million
            tokens. Missing or empty prices count as 0.0.
        run_prompt_tokens: Prompt tokens consumed by this run.
        run_completion_tokens: Completion tokens produced by this run.
        cost_log_dir: Directory holding one ``<model>_cost_log.json`` per
            model. It is created (including parents) when it does not exist.

    Returns:
        None. The log file is rewritten in place.
    """
    model_name = model_config["model_name"]
    logging.info(f"Updating cost log for {model_name}...")

    # --- 1. Calculate cost for the current run ---
    # 'individual' pricing is used for now, as batching is not implemented.
    # `or` guards against keys that are present but empty (e.g. `pricing:` in YAML).
    pricing = (model_config.get("pricing") or {}).get("individual") or {}
    price_input = pricing.get("input") or 0.0
    price_output = pricing.get("output") or 0.0

    run_cost = ((run_prompt_tokens / 1_000_000) * price_input) + \
               ((run_completion_tokens / 1_000_000) * price_output)

    # --- 2. Read existing cumulative data ---
    cost_log_dir_path = Path(cost_log_dir)
    # parents=True: the directory may be nested (e.g. "outputs/cost_tracking")
    # and its parent is not guaranteed to exist yet.
    cost_log_dir_path.mkdir(parents=True, exist_ok=True)
    log_filename = f"{model_name.replace(':', '_').replace('/', '_')}_cost_log.json"
    log_filepath = cost_log_dir_path / log_filename

    cumulative_data = {
        "cumulative_prompt_tokens": 0,
        "cumulative_completion_tokens": 0,
        "cumulative_cost_usd": 0.0
    }

    if log_filepath.exists():
        try:
            with open(log_filepath, "r", encoding="utf-8") as f:
                loaded = json.load(f)
        except json.JSONDecodeError:
            loaded = None
        if isinstance(loaded, dict):
            # Merge over the defaults so a log that lacks one of the counters
            # does not raise KeyError in the update step below.
            cumulative_data.update(loaded)
        else:
            logging.warning(f"Could not read cost log file for {model_name}. Starting from zero.")

    # --- 3. Update cumulative totals ---
    cumulative_data["cumulative_prompt_tokens"] += run_prompt_tokens
    cumulative_data["cumulative_completion_tokens"] += run_completion_tokens
    cumulative_data["cumulative_cost_usd"] += run_cost
    cumulative_data["model_name"] = model_name
    cumulative_data["last_updated"] = time.strftime("%Y-%m-%d %H:%M:%S")

    # --- 4. Write new data back to the file ---
    with open(log_filepath, "w", encoding="utf-8") as f:
        json.dump(cumulative_data, f, indent=4)

    logging.info(f"Cost for this run ({model_name}): ${run_cost:.6f}")
    logging.info(f"New cumulative cost for {model_name}: ${cumulative_data['cumulative_cost_usd']:.6f}")

In [None]:
# === 4. INITIALISATION ===
# Loads the configuration, creates a fresh output directory for this run,
# configures logging, then loads the prompts and the input data.

# --- Load Config ---
try:
    config = load_config(CONFIG_FILE)
    print(f"INFO: Configuration loaded from '{CONFIG_FILE}'.")
except FileNotFoundError:
    print(f"ERROR: Configuration file '{CONFIG_FILE}' not found. Aborting.")
    raise

# --- Set up dynamic paths for the current run ---
# 1. Create a unique ID for this run using a timestamp.
run_id = f"run_{time.strftime('%Y%m%d_%H%M%S')}"
print(f"INFO: Initialising Run ID: {run_id}")

# 2. Define run-specific output directories.
#    Logs and the final master file share the run directory itself.
run_dir = Path("outputs") / "runs" / run_id
individual_dir_run = run_dir / "individual_results"
final_dir_run = run_dir
log_dir_run = run_dir

# 3. Create these new directories (parents=True also creates run_dir).
individual_dir_run.mkdir(parents=True, exist_ok=True)

# 4. Update the config dictionary in memory to use these new paths.
#    Only the in-memory copy changes; the YAML file is not touched.
config['file_paths']['log_dir'] = str(log_dir_run)
config['file_paths']['individual_results_dir'] = str(individual_dir_run)
config['file_paths']['final_results_dir'] = str(final_dir_run)

# --- Set up logging to go into the new run-specific directory ---
setup_logging(config["file_paths"]["log_dir"])

# --- Suppress verbose logs from external libraries ---
# 1. This silences the low-level HTTP request logs.
logging.getLogger("httpx").setLevel(logging.WARNING)
# 2. This silences the main OpenAI library's info logs.
logging.getLogger("openai").setLevel(logging.WARNING)

# --- Load Prompts and Build Lookup Map ---
try:
    # JSON is UTF-8 by specification; do not rely on the platform default.
    with open(PROMPTS_FILE, "r", encoding="utf-8") as f:
        prompts_config = json.load(f)
    component_label_map = build_lookup_map(prompts_config)
    prompt_template = prompts_config["prompt_template"]
    logging.info(f"Prompts loaded and processed from '{PROMPTS_FILE}'.")
except FileNotFoundError:
    logging.error(f"Prompts file '{PROMPTS_FILE}' not found. Aborting.")
    raise

# --- Load and Slice Input Data ---
try:
    input_df_full = pd.read_csv(config["file_paths"]["input_data"])
    # Compare against None explicitly: a plain truthiness test would treat
    # ROWS_TO_PROCESS = 0 like None and silently process the whole dataset.
    if ROWS_TO_PROCESS is not None:
        input_df = input_df_full.head(ROWS_TO_PROCESS).copy()
        logging.info(f"Loaded and sliced input data: Processing first {ROWS_TO_PROCESS} rows.")
    else:
        input_df = input_df_full.copy()
        logging.info(f"Loaded input data: Processing all {len(input_df)} rows.")
except FileNotFoundError:
    logging.error(f"Input data file not found at {config['file_paths']['input_data']}. Aborting.")
    raise

# --- Initialise API Keys & Clients ---
# Keys are fetched lazily in the main loop and cached per client type.
api_keys_cache = {}
# Maps the `client` value of a model entry to the class that is instantiated for it.
clients = {"openai": OpenAIClient, "google": GoogleClient, "ollama": OllamaClient}

In [None]:
# === 5. MAIN BENCHMARKING LOOP ===
# For every configured model: build a client, classify each input row, then
# persist the per-model results, the timing metrics and (for paid APIs) the
# token cost. `input_df` is only read here, never modified, so the cell can be
# re-run without accumulating per-model columns.

logging.info("Starting the main benchmarking process...")
Path(config["file_paths"]["individual_results_dir"]).mkdir(parents=True, exist_ok=True)

# Without input rows there is nothing to classify, and the result handling
# below assumes at least one record per model.
if input_df.empty:
    logging.warning("Input data contains no rows. Skipping all models.")
    models_to_run = []
else:
    models_to_run = config["models_to_benchmark"]

for model_config in models_to_run:
    model_name = model_config["model_name"]
    client_type = model_config["client"]
    # Variant of the model name that is safe in file names and column names.
    model_name_suffix = model_name.replace(':', '_').replace('/', '_')
    logging.info(f"===== Processing Model: {model_name} =====")

    # --- Initialise Client ---
    client_class = clients.get(client_type)
    if not client_class:
        logging.warning(f"No client found for type '{client_type}'. Skipping model {model_name}.")
        continue

    try:
        if client_type == "ollama":
            client = client_class(model_config, config["api_settings"]["ollama_endpoint"])
        else:
            # --- API key retrieval ---
            # Check if we've already fetched this key during this run
            if client_type not in api_keys_cache:
                logging.info(f"API key for '{client_type}' not yet loaded. Retrieving now...")
                key_name = f"{client_type}_api_key"
                # Fetch the key and store it in our cache for this run
                api_keys_cache[client_type] = get_api_key(key_name, config["api_settings"])

            # Use the key from the cache to initialise the client
            api_key = api_keys_cache.get(client_type)
            if not api_key:
                logging.error(f"Failed to retrieve API key for {client_type}. Skipping model {model_name}.")
                continue
            client = client_class(model_config, api_key)

    except ModelNotFoundError as e:
        # This will catch the error from the Ollama client's pre-flight check
        logging.warning(f"Skipping model '{model_name}': {e}")
        continue

    # --- Loop through DataFrame ---
    # The text columns depend only on the model, so pick them once per model.
    use_translation = model_config.get("requires_translation")
    description_col = "Description_EN" if use_translation else "Description"
    observations_col = "Observations_EN" if use_translation else "Observations"

    model_results = []
    start_time = time.time()
    for _, row in tqdm(input_df.iterrows(), total=len(input_df), desc=f"Model: {model_name}"):
        # --- a. Extract and Prepare Data ---
        wonum, component_name, component_code = row.get("WONUM"), row.get("Component Name"), row.get("Component Code")
        description = row.get(description_col)
        observations = row.get(observations_col)
        observations = "" if pd.isna(observations) else observations

        # --- b. Construct Dynamic Prompt ---
        valid_labels = component_label_map.get(component_code, {})

        # Fall back to empty lists if a component has no specific labels
        maint_types_list = valid_labels.get("maintenance", [])
        issue_cats_list = valid_labels.get("issue", [])

        # An empty list still yields a prompt (with an empty label section); only warn.
        if not maint_types_list:
            logging.warning(f"WONUM {wonum}: No specific maintenance types found for {component_code}.")
        if not issue_cats_list:
            logging.warning(f"WONUM {wonum}: No specific issue categories found for {component_code}.")

        formatted_maint_types = "\n".join(f"- {t}" for t in sorted(maint_types_list))
        formatted_issue_cats = "\n".join(f"- {c}" for c in sorted(issue_cats_list))

        prompt = prompt_template.format(
            component_name=component_name, description=description, observations=observations,
            maintenance_types=formatted_maint_types, issue_categories=formatted_issue_cats
        )

        # --- c. Get Completion ---
        client_response = client.get_completion(prompt)

        # --- d. Parse and Validate ---
        result_record = {
            "WONUM": wonum,
            "Model Name": model_name,
            "Prompt Tokens": client_response.prompt_tokens,
            "Completion Tokens": client_response.completion_tokens,
            "Sleep Duration": client_response.sleep_duration
        }
        if client_response.error:
            logging.error(f"WONUM {wonum}: Client error for {model_name}: {client_response.error}")
            result_record.update({"Maintenance Type": "CLIENT_ERROR", "Issue Category": "CLIENT_ERROR", "Specific Problem": client_response.error, "Certainty Level": "Unknown"})
        else:
            try:
                validated_output = LLMOutput.model_validate_json(client_response.content)
                result_record.update(validated_output.model_dump(by_alias=True))
            except Exception as e:
                # Record the failure as a row instead of aborting the whole run.
                logging.warning(f"WONUM {wonum}: Pydantic validation failed for {model_name}. Error: {e}")
                result_record.update({"Maintenance Type": "FORMATTING_ERROR", "Issue Category": "FORMATTING_ERROR", "Specific Problem": str(e), "Certainty Level": "Unknown"})

        model_results.append(result_record)

    # --- e. Log Performance and Save Results ---
    end_time = time.time()
    total_time = end_time - start_time

    # --- 'Actual Performance' Calculation ---
    # Subtract the time the client reported sleeping from the wall-clock time.
    total_sleep_time = sum(res.get("Sleep Duration", 0.0) for res in model_results)
    actual_processing_time = total_time - total_sleep_time

    # Calculate performance based on actual processing time
    logs_per_second_actual = len(input_df) / actual_processing_time if actual_processing_time > 0 else 0

    logging.info(f"===== Finished Model: {model_name} =====")
    logging.info(f"Total Wall-Clock Time: {total_time:.2f}s (includes sleep).")
    if client_type == 'google':
        logging.info(f"Total Time Slept: {total_sleep_time:.2f}s.")
        logging.info(f"Actual Processing Time: {actual_processing_time:.2f}s.")
        logging.info(f"Actual Performance: {logs_per_second_actual:.2f} logs/second.")
    else:
        logging.info(f"Performance: {logs_per_second_actual:.2f} logs/second.")

    # --- Update Performance Log CSV ---
    # Best effort: a failure here is logged and must not lose the model results.
    try:
        performance_log_path = Path("outputs/performance_log.csv")
        performance_data = {
            "Timestamp": [time.strftime("%Y-%m-%d %H:%M:%S")], "Model": [model_name],
            "Logs Processed": [len(input_df)], "Total Time (s)": [round(total_time, 2)],
            "Actual Time (s)": [round(actual_processing_time, 2)],
            "Performance (logs/s)": [round(logs_per_second_actual, 2)]
        }
        perf_df = pd.DataFrame(performance_data)

        if not performance_log_path.exists():
            perf_df.to_csv(performance_log_path, index=False)
        else:
            existing_df = pd.read_csv(performance_log_path)
            if "Actual Time (s)" in existing_df.columns:
                perf_df.to_csv(performance_log_path, mode='a', header=False, index=False)
            else:
                # Older layout without the "Actual Time (s)" column: rewrite the
                # file with the new header, but keep the historical rows rather
                # than overwriting them with only the newest entry.
                pd.concat([existing_df, perf_df], ignore_index=True).to_csv(performance_log_path, index=False)
        logging.info(f"Performance metrics for {model_name} saved to {performance_log_path}")

    except Exception as e:
        logging.error(f"Failed to write to performance log: {e}")

    # --- Save Individual Model Results ---
    # The sleep duration is part of every result record already, so it is not
    # copied into `input_df` as well: doing so made the later merge on WONUM
    # see the same column on both sides and emit `_x`/`_y` duplicates.
    results_df = pd.DataFrame(model_results)

    # Prefix the columns with the model so results of several models can sit
    # side by side in the master file.
    results_df = results_df.rename(columns={
        "Maintenance Type": f"{model_name_suffix}_Maint_Type",
        "Issue Category": f"{model_name_suffix}_Issue_Cat",
        "Specific Problem": f"{model_name_suffix}_Spec_Problem",
        "Certainty Level": f"{model_name_suffix}_Certainty",
        "Prompt Tokens": f"{model_name_suffix}_Prompt_Tokens",
        "Completion Tokens": f"{model_name_suffix}_Comp_Tokens",
        "Sleep Duration": f"{model_name_suffix}_Sleep_Duration"
    }).drop(columns=["Model Name"])

    output_filename = f"{model_name_suffix}_results.csv"
    output_path = Path(config["file_paths"]["individual_results_dir"]) / output_filename
    results_df.to_csv(output_path, index=False)
    logging.info(f"Results for {model_name} saved to {output_path}")

    # --- f. Update Cost Log (if applicable) ---
    if client_type in ['openai', 'google']:
        # Sum the tokens used in this specific run and convert to standard Python integers
        total_prompt_tokens_run = int(results_df[f"{model_name_suffix}_Prompt_Tokens"].sum())
        total_completion_tokens_run = int(results_df[f"{model_name_suffix}_Comp_Tokens"].sum())

        # Call the helper function to update the persistent log
        update_cost_log(
            model_config=model_config,
            run_prompt_tokens=total_prompt_tokens_run,
            run_completion_tokens=total_completion_tokens_run,
            cost_log_dir="outputs/cost_tracking"
        )

In [None]:
# === 6. FINAL COMPILATION ===
# This step combines all individual results into a single master file for easier analysis.
logging.info("Compiling all individual results into a master file...")

individual_dir = Path(config["file_paths"]["individual_results_dir"])

# Sort the files so the column order of the master file does not depend on
# the order in which the file system happens to list them.
result_files = sorted(individual_dir.glob("*_results.csv")) if individual_dir.exists() else []

# Read every file exactly once; the frames are reused for the merge below.
all_dfs = [pd.read_csv(csv_file) for csv_file in result_files]

if all_dfs:
    # Merge the individual results with the original input data on WONUM
    master_df = input_df.copy()
    results_suffix = "_results"

    for csv_file, loaded_df in zip(result_files, all_dfs):
        # --- Get the model name from the filename ---
        # e.g., "gpt-5_results.csv" -> "gpt-5_results" -> "gpt-5"
        # Strip only the trailing "_results" (guaranteed by the glob pattern),
        # so a model name that itself contains "_results" stays intact.
        model_name_suffix = csv_file.stem[: -len(results_suffix)]

        # Prefix any generic column names with the model name. Files written
        # by the benchmarking loop are already prefixed, in which case this
        # rename changes nothing.
        res_df = loaded_df.rename(columns={
            "Maintenance Type": f"{model_name_suffix}_Maint_Type",
            "Issue Category": f"{model_name_suffix}_Issue_Cat",
            "Specific Problem": f"{model_name_suffix}_Spec_Problem",
            "Certainty Level": f"{model_name_suffix}_Certainty",
            "Prompt Tokens": f"{model_name_suffix}_Prompt_Tokens",
            "Completion Tokens": f"{model_name_suffix}_Comp_Tokens",
            "Sleep Duration": f"{model_name_suffix}_Sleep_Duration"
        })

        # Columns already present in the master frame would come out of the
        # merge as `<name>_x` / `<name>_y` pairs; keep the existing one.
        overlapping = [col for col in res_df.columns if col != "WONUM" and col in master_df.columns]
        if overlapping:
            logging.warning(f"Skipping columns of {csv_file.name} that already exist in the master data: {overlapping}")
            res_df = res_df.drop(columns=overlapping)

        # Merge this model's data into the master DataFrame
        rows_before = len(master_df)
        master_df = pd.merge(master_df, res_df, on="WONUM", how="left")
        if len(master_df) != rows_before:
            # A left merge only changes the row count when WONUM values repeat.
            logging.warning(
                f"Merging {csv_file.name} changed the row count from {rows_before} to {len(master_df)}; "
                "WONUM values are not unique."
            )

    Path(config["file_paths"]["final_results_dir"]).mkdir(parents=True, exist_ok=True)
    master_output_path = Path(config["file_paths"]["final_results_dir"]) / "Master_Benchmark_Results.csv"
    master_df.to_csv(master_output_path, index=False)
    logging.info(f"Master results file saved to {master_output_path}")
else:
    logging.warning("No individual result files found to compile.")

logging.info("Benchmark run complete.")