In [None]:
# Enable IPython's autoreload extension in mode 2 so that edits to imported
# modules are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [None]:
import pandas as pd
import time
import glob
from collections import defaultdict
from evaluation_docstrings.llm.utils import load_secrets
from evaluation_docstrings.llm.config import costs
import matplotlib.pyplot as plt
import seaborn as sns
from evaluation_docstrings.evaluation.LLMEvals import LLMEvals
from evaluation_docstrings.evaluation.ProgrammaticEvals import ProgrammaticEvals
from evaluation_docstrings.task.utils import (
    read_code_into_dict,
    convert_code_dict_to_string,
    format_result_as_string,
)
from evaluation_docstrings.plotting.utils import generate_basic_analysis_cost_latency
from evaluation_docstrings.prompting.task_prompts import (
    DocStringPrompt,
    CodeImprovements,
)
from evaluation_docstrings.llm.structured_output_callers import (
    AnthropicStructuredOutputCaller,
    OpenAIStructuredOutputCaller,
    GeminiStructuredOutputCaller,
)
import warnings

# NOTE(review): this silences *every* warning for the whole session, including
# deprecation and pandas chained-assignment warnings that can point at real
# problems. Consider narrowing it to specific categories/modules.
warnings.filterwarnings("ignore")
# Use seaborn's "poster" context for every figure produced below.
sns.set_context("poster")

In [None]:
# Read the API keys once, then build one structured-output caller per provider.
secrets = load_secrets()
gemini_caller = GeminiStructuredOutputCaller(
    model="gemini-1.5-flash",
    api_key=secrets["GEMINI_API_KEY"],
)
open_ai_caller = OpenAIStructuredOutputCaller(
    api_key=secrets["OPENAI_API_KEY"],
)
claude_caller = AnthropicStructuredOutputCaller(
    api_key=secrets["ANTHROPIC_API_KEY"],
)

In [None]:
def generation_loop(llm, codes, model_name=None):
    """Run the docstring-generation task over a collection of code files.

    For every path in ``codes`` the file is read, wrapped in ``<input_code>``
    tags, and sent to ``llm.invoke`` together with ``DocStringPrompt`` and
    ``CodeImprovements`` at temperature 0. The structured output, token usage
    and wall-clock latency of each call are collected.

    Args:
        llm: Caller object exposing ``invoke(...)`` (whose result is indexable
            by ``"model_dump"``) and ``token_counter(response)`` (returning a
            mapping with ``"input_tokens"`` and ``"output_tokens"``).
        codes: Iterable of file paths. The second-to-last ``/``-separated
            component of each path is recorded as ``code_type``. Any iterable
            works, including a generator or an empty list.
        model_name: Optional model identifier. When truthy it is forwarded to
            ``llm.invoke`` as ``input_model_name``; otherwise the argument is
            omitted entirely.

    Returns:
        pandas.DataFrame with one row per code example and the columns
        ``code_id``, ``code_type``, ``input_code``, ``model_output``,
        ``input_tokens``, ``output_tokens``, ``suitable_code``,
        ``number_of_docstrings``, ``summary``, ``process_time``, ``code_path``
        and ``docstrings``. If ``codes`` is empty the frame has these columns
        and zero rows.
    """
    columns = [
        "code_id",
        "code_type",
        "input_code",
        "model_output",
        "input_tokens",
        "output_tokens",
        "suitable_code",
        "number_of_docstrings",
        "summary",
        "process_time",
        "code_path",
        "docstrings",
    ]

    # Only pass the model override when one was given, so the caller's own
    # default model applies otherwise.
    invoke_kwargs = {"temperature": 0}
    if model_name:
        invoke_kwargs["input_model_name"] = model_name

    records = []
    for i, code_example in enumerate(codes):
        print(f"Processing code example {i} : {code_example}")

        code_class = code_example.split("/")[-2]
        code_dict = read_code_into_dict(code_example)
        code_string = convert_code_dict_to_string(code_dict)

        # NOTE(review): the second tag below is an opening tag, not
        # "</input_code>". It is kept byte-identical because this text is the
        # prompt sent to the model (and is stored as ``input_code``); confirm
        # whether the unclosed tag is intentional before changing it.
        code_string = f"""
        <input_code>
        {code_string}
        <input_code>
        """

        t1 = time.time()
        res = llm.invoke(
            code_string,
            DocStringPrompt,
            CodeImprovements,
            **invoke_kwargs,
        )
        t2 = time.time()

        model_output = res["model_dump"]

        is_suitable_code = model_output.get("suitable_code", False)
        if not is_suitable_code:
            print(f"{code_string} not found to be suitable for docstrings")

        tokens = llm.token_counter(res)

        # ``or []`` guards against a present-but-None "docstrings" field.
        if is_suitable_code:
            number_of_docstrings = len(model_output.get("docstrings") or [])
        else:
            number_of_docstrings = 0

        records.append(
            {
                "code_id": i,
                "code_type": code_class,
                "input_code": code_string,
                "model_output": model_output,
                "input_tokens": tokens["input_tokens"],
                "output_tokens": tokens["output_tokens"],
                "suitable_code": is_suitable_code,
                "number_of_docstrings": number_of_docstrings,
                "summary": model_output.get("summary"),
                "process_time": t2 - t1,
                # Recorded per row so ``codes`` is consumed only once.
                "code_path": code_example,
                "docstrings": format_result_as_string(model_output),
            }
        )

    # Explicit columns keep the schema stable even when there are no records.
    return pd.DataFrame(records, columns=columns)

In [None]:
def evaluation_loop(llm, df, ground_truth=None, **kwargs):
    """Score generated docstrings with programmatic checks and an LLM judge.

    For each row of ``df`` the stored model output is checked with the three
    ``ProgrammaticEvals`` tests and then passed to
    ``LLMEvals.docstring_quality`` for a critique plus accuracy / coverage /
    clarity scores. Latency and token usage of the judge call are recorded.

    Args:
        llm: Caller object handed through to ``LLMEvals.docstring_quality``.
        df: DataFrame with the columns ``code_path``, ``code_id``,
            ``input_code``, ``model_output``, ``code_type`` and ``docstrings``.
            ``model_output`` may be the original object or its string
            representation (e.g. after a CSV round trip).
        ground_truth: Optional DataFrame that is inner-merged onto the result
            on ``code_id``. Anything that is not a DataFrame is ignored.
        **kwargs: Forwarded unchanged to ``LLMEvals.docstring_quality``.

    Returns:
        pandas.DataFrame with one row per evaluated example. ``code_type`` and
        ``code_name`` are derived from the last two ``/``-separated components
        of ``code_path``. If ``df`` has no rows the frame has the same columns
        and zero rows.
    """
    import ast  # local import: only needed to parse stringified model output

    columns = [
        "code_id",
        "code_path",
        "code_type",
        "input_code",
        "docstrings",
        "critique",
        "fields_correct",
        "word_count_correct",
        "line_numbers_correct",
        "accuracy_score",
        "coverage_score",
        "clarity_score",
        "process_time",
        "input_tokens",
        "output_tokens",
    ]

    records = []
    for _, row in df.iterrows():
        input_code = row["input_code"]

        # A string means the output was serialized; parse it as a Python
        # literal instead of executing it with eval(). Non-string values are
        # used as they are.
        raw_output = row["model_output"]
        if isinstance(raw_output, str):
            model_output = ast.literal_eval(raw_output)
        else:
            model_output = raw_output

        # programmatic tests
        fields_correct = ProgrammaticEvals.check_correct_fields(model_output)
        word_count_correct = ProgrammaticEvals.check_explanation_word_count(
            model_output
        )
        line_numbers_match = ProgrammaticEvals.check_line_numbers_and_methods(
            model_output, input_code
        )

        # LLM tests
        t1 = time.time()
        evaluation_response, tokens = LLMEvals.docstring_quality(
            llm, input_code, model_output, **kwargs
        )
        t2 = time.time()
        # Kept under its own name so the judge's response does not shadow the
        # generated output being evaluated.
        evaluation_output = evaluation_response["model_dump"]

        records.append(
            {
                "code_id": row["code_id"],
                "code_path": row["code_path"],
                "code_type": row["code_type"],
                "input_code": input_code,
                "docstrings": row["docstrings"],
                "critique": evaluation_output["critique"],
                "fields_correct": fields_correct,
                "word_count_correct": word_count_correct,
                "line_numbers_correct": line_numbers_match,
                "accuracy_score": int(evaluation_output["accuracy"]),
                "coverage_score": int(evaluation_output["coverage"]),
                "clarity_score": int(evaluation_output["clarity"]),
                "process_time": t2 - t1,
                "input_tokens": tokens["input_tokens"],
                "output_tokens": tokens["output_tokens"],
            }
        )

    # Build the frame once, after the loop; explicit columns keep the schema
    # stable when there are no rows.
    evaluation_df = pd.DataFrame(records, columns=columns)

    # join on ground truth labels if we have them
    if isinstance(ground_truth, pd.DataFrame):
        evaluation_df = evaluation_df.merge(ground_truth, on="code_id")

    evaluation_df["code_type"] = [
        path.split("/")[-2] for path in evaluation_df["code_path"]
    ]
    evaluation_df["code_name"] = [
        path.split("/")[-1] for path in evaluation_df["code_path"]
    ]
    return evaluation_df

In [None]:
# glob returns matches in arbitrary, filesystem-dependent order. Sorting makes
# the enumeration index used as ``code_id`` stable across runs and machines.
all_codes = sorted(glob.glob("../code_database/testing/leetcode/*"))

In [None]:
# Generate docstrings for every collected code file with gpt-4o-mini.
gpt_4o_mini_result = generation_loop(
    llm=open_ai_caller,
    codes=all_codes,
    model_name="gpt-4o-mini",
)

In [None]:
# Generate docstrings with the Gemini caller; no model override is passed.
gemini_flash_result = generation_loop(llm=gemini_caller, codes=all_codes)

In [None]:
# Per-row cost: token counts weighted by the configured input/output rates.
# NOTE(review): the 1e-6 factor suggests the rates are per million tokens;
# confirm against evaluation_docstrings.llm.config.
gpt_4o_mini_result["total_cost"] = 1e-6 * (
    gpt_4o_mini_result["input_tokens"].mul(costs["gpt-4o-mini"]["input"])
    + gpt_4o_mini_result["output_tokens"].mul(costs["gpt-4o-mini"]["output"])
)

In [None]:
# Per-row cost: token counts weighted by the configured input/output rates.
# NOTE(review): the 1e-6 factor suggests the rates are per million tokens;
# confirm against evaluation_docstrings.llm.config.
gemini_flash_result["total_cost"] = 1e-6 * (
    gemini_flash_result["input_tokens"].mul(costs["gemini-1.5-flash"]["input"])
    + gemini_flash_result["output_tokens"].mul(costs["gemini-1.5-flash"]["output"])
)

In [None]:
# Plot the cost/latency summary for the Gemini run and write it to disk.
# NOTE(review): savefig writes pyplot's *current* figure, so this assumes the
# helper leaves its figure open and current (i.e. does not call plt.show()
# or plt.close() itself) — confirm in plotting.utils.
generate_basic_analysis_cost_latency(gemini_flash_result, "gemini-1.5-flash")
plt.savefig("task_cost_gemini-1.5-flash.png")

In [None]:
# Plot the cost/latency summary for the gpt-4o-mini run and write it to disk.
# NOTE(review): savefig writes pyplot's *current* figure, so this assumes the
# helper leaves its figure open and current (i.e. does not call plt.show()
# or plt.close() itself) — confirm in plotting.utils.
generate_basic_analysis_cost_latency(gpt_4o_mini_result, "gpt-4o-mini")
plt.savefig("task_cost_gpt_4o_mini.png")

In [None]:
# Persist both generation runs as CSV, without the DataFrame index column.
gpt_4o_mini_result.to_csv(
    path_or_buf="evaluation_docstrings/datasets/gpt_4o_mini_test_result.csv",
    index=False,
)
gemini_flash_result.to_csv(
    path_or_buf="evaluation_docstrings/datasets/gemini_flash_test_result.csv",
    index=False,
)

In [None]:
# Judge the gpt-4o-mini docstrings with Claude, then store the scores as CSV.
gpt_4o_mini_eval_df = evaluation_loop(
    llm=claude_caller,
    df=gpt_4o_mini_result,
    ground_truth=None,
    model_name="claude-3-5-sonnet-latest",
)
gpt_4o_mini_eval_df.to_csv(
    path_or_buf="evaluation_docstrings/datasets/gpt_4o_mini_task_evaluation_result.csv",
    index=False,
)

In [None]:
# Judge the Gemini docstrings with Claude, then store the scores as CSV.
gemini_eval_df = evaluation_loop(
    llm=claude_caller,
    df=gemini_flash_result,
    ground_truth=None,
    model_name="claude-3-5-sonnet-latest",
)
gemini_eval_df.to_csv(
    path_or_buf="evaluation_docstrings/datasets/gemini-1.5_task_evaluation_result.csv",
    index=False,
)

In [None]:
# Checks that count as passed when their value equals 1 (True also equals 1).
score_columns = [
    "fields_correct",
    "word_count_correct",
    "line_numbers_correct",
    "accuracy_score",
    "coverage_score",
    "clarity_score",
]

# .copy() gives an independent frame, so adding "final_score" below does not
# write into a slice of the evaluation results.
tmp = gpt_4o_mini_eval_df[score_columns].copy()
# final_score is 1 only for rows where every individual check equals 1.
tmp["final_score"] = tmp[score_columns].eq(1).all(axis=1).astype(int)
totals = tmp.sum().sort_values()
totals = totals / len(gpt_4o_mini_eval_df)

# Create bar plot
plt.figure(figsize=(16, 10))
ax = totals.plot(kind="barh")
for i, v in enumerate(totals):
    ax.text(v, i, " {:.1f}%".format(v * 100), va="center")
plt.title("Percent Correct by Category: gpt 4o mini")
# Horizontal bars: categories run along the y axis, values along the x axis.
plt.ylabel("Category")
plt.xlabel("Percentage correct")
plt.xticks(rotation=45)
plt.tight_layout()
# Save before showing: after plt.show() the figure may already be released,
# in which case savefig would write an empty image.
plt.savefig("score_by_category_gpt4o_mini.png")
plt.show()

In [None]:
# Checks that count as passed when their value equals 1 (True also equals 1).
score_columns = [
    "fields_correct",
    "word_count_correct",
    "line_numbers_correct",
    "accuracy_score",
    "coverage_score",
    "clarity_score",
]

# .copy() gives an independent frame, so adding "final_score" below does not
# write into a slice of the evaluation results.
tmp = gemini_eval_df[score_columns].copy()
# final_score is 1 only for rows where every individual check equals 1.
tmp["final_score"] = tmp[score_columns].eq(1).all(axis=1).astype(int)
totals = tmp.sum().sort_values()
totals = totals / len(gemini_eval_df)

# Create bar plot
plt.figure(figsize=(16, 10))
ax = totals.plot(kind="barh")
for i, v in enumerate(totals):
    ax.text(v, i, " {:.1f}%".format(v * 100), va="center")
plt.title("Percent Correct by Category: Gemini 1.5 flash")
plt.ylabel("Category")
plt.xlabel("Percentage correct")
plt.xticks(rotation=45)
plt.tight_layout()
# Save before showing: after plt.show() the figure may already be released,
# in which case savefig would write an empty image.
plt.savefig("score_by_category_gemini_flash.png")
plt.show()