In [None]:
from jiwer import wer
from rouge_score import rouge_scorer
import re
from Levenshtein import distance
import pandas as pd
import cooklang
import json
import os
import glob
import matplotlib.pyplot as plt
import seaborn as sns
import matplotlib.ticker as ticker
from tqdm import tqdm

# Global seaborn styling: every figure drawn after this cell runs uses the
# "whitegrid" axes style and the "pastel" colour palette.
sns.set_style("whitegrid")
sns.set_palette("pastel")


def calculate_wer(true: str, pred: str) -> float:
    """Word error rate of ``pred`` measured against ``true``.

    Thin wrapper around ``jiwer.wer``; ``true`` is passed as the first
    argument and ``pred`` as the second.
    """
    return wer(true, pred)


def calculate_rogue_l(true: str, pred: str) -> float:
    """ROUGE-L F-measure of ``pred`` scored against ``true``.

    A fresh ``RougeScorer`` restricted to the ``rougeL`` metric (stemming
    disabled) is built on every call.
    """
    rouge_l = rouge_scorer.RougeScorer(["rougeL"], use_stemmer=False).score(
        true, pred
    )["rougeL"]
    return rouge_l.fmeasure


# Token patterns, compiled once at definition time. They are tried in this
# order at every position and the first one that matches wins.
#
# The optional ``{...}`` suffix accepts EMPTY braces: ``@ground pepper{}`` is
# one INGREDIENT token instead of an ingredient followed by stray "{" and "}"
# OTHER tokens. A timer may also consist of braces only (``~{25%minutes}``).
_COOKLANG_TOKEN_PATTERNS = [
    (re.compile(pattern), token_type)
    for pattern, token_type in [
        (r"@[\w\s]+(?:\{[^}]*\})?", "INGREDIENT"),  # name + optional {quantity}
        (r"#\w+(?:\{[^}]*\})?", "COOKWARE"),  # single-word name + optional {}
        (r"~(?:[\w\s]+(?:\{[^}]*\})?|\{[^}]*\})", "TIMER"),  # named or unnamed timer
        (r"\[.*?\]", "METADATA"),  # metadata in square brackets
        (r"\w+", "WORD"),  # regular words
        (r"\s+", "WHITESPACE"),  # whitespace (consumed, never emitted)
        (r"[.,!?]", "PUNCTUATION"),  # basic punctuation
        (r".", "OTHER"),  # catch-all for any other single character
    ]
]


def cooklang_tokenizer(text: str) -> list:
    """Split Cooklang ``text`` into ``(value, token_type)`` tuples.

    The text is scanned left to right. At each position the patterns in
    ``_COOKLANG_TOKEN_PATTERNS`` are tried in order and the first match is
    consumed. Whitespace is skipped, and every emitted value has surrounding
    whitespace stripped (the ingredient/timer name patterns can swallow
    trailing spaces).

    Args:
        text: Raw Cooklang recipe text.

    Returns:
        List of ``(value, token_type)`` tuples in order of appearance, where
        ``token_type`` is one of ``INGREDIENT``, ``COOKWARE``, ``TIMER``,
        ``METADATA``, ``WORD``, ``PUNCTUATION`` or ``OTHER``.
    """
    tokens = []
    pos = 0
    while pos < len(text):
        for regex, token_type in _COOKLANG_TOKEN_PATTERNS:
            match = regex.match(text, pos)
            if match:
                value = match.group(0).strip()
                if token_type != "WHITESPACE" and value:
                    tokens.append((value, token_type))
                pos = match.end()
                break
        else:
            # Defensive only: whitespace plus the catch-all cover every
            # character, but never risk an infinite loop.
            pos += 1

    return tokens


def calculate_ter(reference: str, hypothesis: str) -> float:
    """Token Error Rate (TER) of ``hypothesis`` against ``reference``.

    Both strings are tokenized with ``cooklang_tokenizer``. The Levenshtein
    distance is then computed over the two token SEQUENCES (one edit = one
    inserted, deleted or substituted token) and divided by the number of
    reference tokens, so numerator and denominator are both token counts.

    Args:
        reference: Ground-truth Cooklang text.
        hypothesis: Predicted Cooklang text.

    Returns:
        Token edit distance divided by the reference token count, or ``1.0``
        when the reference yields no tokens.
    """
    ref_tokens = [value for value, _ in cooklang_tokenizer(reference)]
    hyp_tokens = [value for value, _ in cooklang_tokenizer(hypothesis)]

    if not ref_tokens:
        return 1.0

    # Passing lists (not joined strings) makes the distance token-level.
    # NOTE(review): relies on Levenshtein.distance accepting sequences of
    # hashables (rapidfuzz-backed releases) — confirm the installed version.
    return distance(ref_tokens, hyp_tokens) / len(ref_tokens)


def get_ingredients_names(recipe: str):
    """Return the cleaned ingredient names parsed from a Cooklang recipe.

    Every ``--`` in the text is replaced by a space before parsing
    (presumably so the parser does not treat it as a comment marker —
    confirm). Each name is stripped and has any ``.000`` substring removed.
    """
    parsed = cooklang.parseRecipe(recipe.replace("--", " "))
    names = []
    for ingredient in parsed["ingredients"]:
        names.append(f"{ingredient['name']}".strip().replace(".000", ""))
    return names


def get_ingredients_units(recipe: str):
    """Return one ``"name|units"`` string per ingredient of a Cooklang recipe.

    Every ``--`` in the text is replaced by a space before parsing. An
    ingredient without a ``units`` entry contributes an empty unit. Each
    string is stripped and has any ``.000`` substring removed.
    """
    parsed = cooklang.parseRecipe(recipe.replace("--", " "))
    labelled = []
    for ingredient in parsed["ingredients"]:
        units = ingredient["units"] if "units" in ingredient else ""
        name = ingredient["name"]
        labelled.append(f"{name}|{units}".strip().replace(".000", ""))
    return labelled


def get_ingredients_amounts(recipe: str):
    """Return one ``"name|quantity"`` string per ingredient of a Cooklang recipe.

    Every ``--`` in the text is replaced by a space before parsing. An
    ingredient without a ``quantity`` entry contributes an empty quantity.
    Each string is stripped and has any ``.000`` substring removed, so a
    quantity rendered as ``2.000`` compares equal to ``2``.
    """
    parsed = cooklang.parseRecipe(recipe.replace("--", " "))
    labelled = []
    for ingredient in parsed["ingredients"]:
        quantity = ingredient["quantity"] if "quantity" in ingredient else ""
        name = ingredient["name"]
        labelled.append(f"{name}|{quantity}".strip().replace(".000", ""))
    return labelled


def compare_arrays(true_arr, pred_arr):
    """Set-compare expected items against predicted items.

    Items are canonicalised with ``json.dumps(..., sort_keys=True)``, so
    dictionaries compare independently of key order and duplicates collapse.

    Returns:
        A 3-tuple ``(all_found, num_bonus, num_missing)``: ``all_found`` is
        ``1`` when every expected item is also predicted (else ``0``),
        ``num_bonus`` counts distinct predicted items that were not expected,
        and ``num_missing`` counts distinct expected items not predicted.
    """

    def as_keys(items):
        return {json.dumps(item, sort_keys=True) for item in items}

    expected, predicted = as_keys(true_arr), as_keys(pred_arr)
    missing = expected - predicted
    bonus = predicted - expected

    return (0 if missing else 1), len(bonus), len(missing)


def eval_ingredients(true: str, pred: str) -> tuple:
    """Compare the ingredient names of a reference and a predicted recipe.

    Args:
        true: Reference Cooklang recipe text.
        pred: Predicted Cooklang recipe text.

    Returns:
        The ``(all_true_in_pred, num_bonus, num_missing)`` tuple produced by
        ``compare_arrays`` on the two lists from ``get_ingredients_names``.
    """
    return compare_arrays(get_ingredients_names(true), get_ingredients_names(pred))


def eval_units(true: str, pred: str) -> tuple:
    """Compare the ``name|units`` pairs of a reference and a predicted recipe.

    Args:
        true: Reference Cooklang recipe text.
        pred: Predicted Cooklang recipe text.

    Returns:
        The ``(all_true_in_pred, num_bonus, num_missing)`` tuple produced by
        ``compare_arrays`` on the two lists from ``get_ingredients_units``.
    """
    return compare_arrays(get_ingredients_units(true), get_ingredients_units(pred))


def eval_amounts(true: str, pred: str) -> tuple:
    """Compare the ``name|quantity`` pairs of a reference and a predicted recipe.

    Args:
        true: Reference Cooklang recipe text.
        pred: Predicted Cooklang recipe text.

    Returns:
        The ``(all_true_in_pred, num_bonus, num_missing)`` tuple produced by
        ``compare_arrays`` on the two lists from ``get_ingredients_amounts``.
    """
    return compare_arrays(
        get_ingredients_amounts(true), get_ingredients_amounts(pred)
    )


# Column names of every per-row metric, grouped by what they measure.
_TEXT_METRICS = ["WER", "ROUGE-L", "TER"]
_INGREDIENT_METRICS = ["find_all_ings", "num_bonus_ings", "num_missing_ings"]
_UNIT_METRICS = ["find_all_units", "num_bonus_units", "num_missing_units"]
_AMOUNT_METRICS = ["find_all_amounts", "num_bonus_amounts", "num_missing_amounts"]

# Full ordered list of metric columns.
metrics = [
    *_TEXT_METRICS,
    *_INGREDIENT_METRICS,
    *_UNIT_METRICS,
    *_AMOUNT_METRICS,
    "failed_generations",
]

In [None]:
directory = "data/silver/"
directory = os.path.join(directory, "")  # guarantees a trailing separator

pattern = os.path.join(directory, "*.tsv")
# Sorted so the row order of the combined table does not depend on glob order.
files = sorted(glob.glob(pattern))
if not files:
    # Fail with a clear message instead of pd.concat's "No objects to concatenate".
    raise FileNotFoundError(f"No prediction files found matching {pattern!r}")

# Every file holds one experimental condition, identified by these columns.
group_keys = ["prompt_techqnique", "model", "with_cooklang", "without_ings"]

# Column suffix -> evaluator returning (find_all, num_bonus, num_missing).
domain_evaluators = {
    "ings": eval_ingredients,
    "units": eval_units,
    "amounts": eval_amounts,
}

all_dfs = []
for file_path in tqdm(files):
    # The code below reads the prompt technique and the model from the first
    # two "_"-separated parts of the file name (suffix removed).
    file_name = os.path.basename(file_path).replace(".json_predictions.tsv", "")
    name_parts = file_name.split("_")
    prompt_techqnique = name_parts[0]
    model = name_parts[1]
    with_cooklang = "with_cooklang" in file_name
    # Fixed: this flag used to be inverted (True when the name did NOT contain
    # "without_ings"), which swapped the labels in the ingredients plots.
    without_ings = "without_ings" in file_name

    predictions = pd.read_csv(file_path, sep="\t")
    predictions["cooklang"] = (
        predictions["cooklang"]
        .fillna("Cooking instructions not available")
        .apply(lambda text: text.strip())
    )
    predictions["predict_cooklang"] = predictions["predict_cooklang"].fillna(
        "Failed to generate"
    )

    pairs = list(zip(predictions["cooklang"], predictions["predict_cooklang"]))

    predictions["WER"] = [calculate_wer(true, pred) for true, pred in pairs]
    predictions["ROUGE-L"] = [calculate_rogue_l(true, pred) for true, pred in pairs]
    predictions["TER"] = [calculate_ter(true, pred) for true, pred in pairs]

    # Each evaluator parses both recipes, so call it once per row and unpack
    # the three results instead of re-running it for every output column.
    for suffix, evaluate in domain_evaluators.items():
        results = [evaluate(true, pred) for true, pred in pairs]
        predictions[f"find_all_{suffix}"] = [result[0] for result in results]
        predictions[f"num_bonus_{suffix}"] = [result[1] for result in results]
        predictions[f"num_missing_{suffix}"] = [result[2] for result in results]

    predictions["failed_generations"] = (
        predictions["predict_cooklang"].eq("Failed to generate").astype(int)
    )
    predictions["prompt_techqnique"] = prompt_techqnique
    predictions["model"] = model
    predictions["with_cooklang"] = with_cooklang
    predictions["without_ings"] = without_ings

    # Mean of every metric for this file's condition ...
    summary = predictions.groupby(group_keys)[
        [col for col in metrics if col != "failed_generations"]
    ].mean()

    # ... except failed generations, which are reported as a count.
    if "failed_generations" in metrics:
        summary["failed_generations"] = predictions.groupby(group_keys)[
            "failed_generations"
        ].sum()

    all_dfs.append(summary)

# One row per prediction file. The table is not written to disk here; if you
# persist it, keep it out of data/silver/*.tsv or the glob above re-reads it.
df = pd.concat(all_dfs).reset_index()

In [None]:
# Relabel model identifiers for reporting.
# NOTE(review): the reason "llama3.1:70b" is shown as "llama3.3:70b" is not
# visible in this notebook — confirm which model produced those files.
MODEL_RENAMES = {"llama3.1:70b": "llama3.3:70b"}
df["model"] = df["model"].replace(MODEL_RENAMES)
df

In [None]:
def plot_model_comparison(data, name_prefix="", metrics=("WER", "ROUGE-L", "TER")):
    """Draw one box plot per metric, grouped by model, side by side.

    The figure is saved to ``plots/{name_prefix}model_comparison.png`` at
    300 dpi (the ``plots`` directory is created if needed) and then shown.

    Args:
        data: DataFrame with a ``model`` column and one column per metric.
        name_prefix: Prefix prepended to the output file name.
        metrics: Names of the metric columns to plot, one subplot each.

    Note:
        ``plt.rcParams.update`` changes the global matplotlib font settings,
        so they stay in effect for figures drawn after this call.
    """
    num_metrics = len(metrics)
    plt.figure(figsize=(6 * num_metrics, 6))

    plt.rcParams.update(
        {
            "font.size": 14,
            "axes.titlesize": 16,
            "axes.labelsize": 14,
            "xtick.labelsize": 12,
            "ytick.labelsize": 12,
        }
    )

    for position, metric in enumerate(metrics, start=1):
        plt.subplot(1, num_metrics, position)
        sns.boxplot(x="model", y=metric, data=data)
        plt.title(f"{metric} by Model", fontweight="bold")
        plt.xlabel("Model", fontweight="bold")
        plt.ylabel(metric, fontweight="bold")
        plt.xticks(rotation=45, ha="right")

    plt.tight_layout(pad=2.0)

    # savefig does not create missing directories.
    os.makedirs("plots", exist_ok=True)
    plt.savefig(
        f"plots/{name_prefix}model_comparison.png", dpi=300, bbox_inches="tight"
    )
    plt.show()
    plt.close()


def plot_prompt_technique_analysis(
    data, name_prefix="", metrics=("WER", "ROUGE-L", "TER")
):
    """Draw one bar plot per metric: models on the x axis, coloured by prompt technique.

    Subplots are stacked vertically in a fixed 20x15 inch figure, which is
    saved to ``plots/{name_prefix}prompt_technique_analysis.png`` (the
    ``plots`` directory is created if needed) and then shown.

    Args:
        data: DataFrame with ``model`` and ``prompt_techqnique`` columns and
            one column per metric.
        name_prefix: Prefix prepended to the output file name.
        metrics: Names of the metric columns to plot, one subplot each.

    Note:
        ``plt.rcParams.update`` changes the global matplotlib font settings,
        so they stay in effect for figures drawn after this call.
    """
    plt.figure(figsize=(20, 15))
    plt.rcParams.update(
        {
            "font.size": 14,
            "axes.titlesize": 16,
            "axes.labelsize": 14,
            "xtick.labelsize": 12,
            "ytick.labelsize": 12,
        }
    )

    num_rows = len(metrics)
    for position, metric in enumerate(metrics, start=1):
        plt.subplot(num_rows, 1, position)
        sns.barplot(x="model", y=metric, hue="prompt_techqnique", data=data)
        plt.title(f"{metric} by Model and Prompt Technique")
        plt.xlabel("Model")
        plt.ylabel(metric)
        plt.xticks(rotation=45)
        # Anchor the legend outside the axes so it does not cover the bars.
        plt.legend(title="Prompt Technique", bbox_to_anchor=(1.05, 1), loc="upper left")

    plt.tight_layout()

    # savefig does not create missing directories.
    os.makedirs("plots", exist_ok=True)
    plt.savefig(
        f"plots/{name_prefix}prompt_technique_analysis.png", bbox_inches="tight"
    )
    plt.show()
    plt.close()


# Impact of the Cooklang and ingredients flags
def plot_cooklang_ingredients_impact(
    data, name_prefix="", metrics=("WER", "ROUGE-L", "TER")
):
    """Draw two figures of per-model box plots, split by each experiment flag.

    The first figure colours the boxes by ``with_cooklang`` and is saved to
    ``plots/{name_prefix}cooklang_impact.png``. The second colours them by
    ``without_ings`` and is saved to ``plots/{name_prefix}ingredients_impact.png``.
    Each figure has one vertically stacked subplot per metric and is shown
    after saving. The ``plots`` directory is created if needed.

    Args:
        data: DataFrame with ``model``, ``with_cooklang`` and ``without_ings``
            columns and one column per metric.
        name_prefix: Prefix prepended to both output file names.
        metrics: Names of the metric columns to plot, one subplot each.

    Note:
        ``plt.rcParams.update`` changes the global matplotlib font settings,
        so they stay in effect for figures drawn after this call.
    """
    plt.rcParams.update(
        {
            "font.size": 14,
            "axes.titlesize": 16,
            "axes.labelsize": 14,
            "xtick.labelsize": 12,
            "ytick.labelsize": 12,
        }
    )

    # savefig does not create missing directories.
    os.makedirs("plots", exist_ok=True)

    # (hue column, title suffix, output file stem) for each of the two figures.
    figure_specs = [
        ("with_cooklang", "Cooklang", "cooklang_impact"),
        ("without_ings", "Ingredients", "ingredients_impact"),
    ]

    num_rows = len(metrics)
    for hue_column, title_label, file_stem in figure_specs:
        plt.figure(figsize=(15, 15))

        for position, metric in enumerate(metrics, start=1):
            plt.subplot(num_rows, 1, position)
            sns.boxplot(x="model", y=metric, hue=hue_column, data=data)
            plt.title(f"{metric} by Model and {title_label}")
            plt.xticks(rotation=45)

        plt.tight_layout()
        plt.savefig(f"plots/{name_prefix}{file_stem}.png")
        plt.show()
        plt.close()

In [None]:
# Text-similarity metrics averaged per (model, prompt technique), rounded to
# four decimals and displayed best ROUGE-L first.
standard_metrics = ["WER", "ROUGE-L", "TER"]

per_technique_means = df.groupby(["model", "prompt_techqnique"])[standard_metrics].mean()
data = per_technique_means.round(4).reset_index()

# sort_values returns a new frame: `data` itself keeps its original order.
data.sort_values(by="ROUGE-L", ascending=False)

In [None]:
# Text-similarity metrics averaged per full experimental condition. The
# with_cooklang / without_ings columns are kept because the impact plots
# colour by them.
standard_metrics = ["WER", "ROUGE-L", "TER"]
condition_keys = ["with_cooklang", "without_ings", "model", "prompt_techqnique"]

data = df.groupby(condition_keys)[standard_metrics].mean().reset_index()

In [None]:
# Render every standard-metric figure from the per-condition table; all
# output files share the "standard_" prefix.
for draw_figure in (
    plot_model_comparison,
    plot_prompt_technique_analysis,
    plot_cooklang_ingredients_impact,
):
    draw_figure(data, name_prefix="standard_")

In [None]:
# Ingredient-level metrics averaged per (model, prompt technique); the
# displayed copy is rounded to four decimals, `data` keeps full precision.
domain_metrics = (
    ["find_all_ings", "num_bonus_ings", "num_missing_ings"]
    + ["find_all_units", "find_all_amounts"]
)
domain_means = df.groupby(["model", "prompt_techqnique"])[domain_metrics].mean()
data = domain_means.reset_index()
data.round(4)

In [None]:
# These two figures only need the per-(model, prompt technique) means.
plot_model_comparison(data, metrics=domain_metrics, name_prefix="domain_")
plot_prompt_technique_analysis(data, metrics=domain_metrics, name_prefix="domain_")

# plot_cooklang_ingredients_impact colours by "with_cooklang" and
# "without_ings". `data` is grouped by model and prompt technique only, so it
# has neither column and seaborn raises. Aggregate per full condition here.
domain_impact_data = (
    df.groupby(["with_cooklang", "without_ings", "model", "prompt_techqnique"])[
        domain_metrics
    ]
    .mean()
    .reset_index()
)
plot_cooklang_ingredients_impact(
    domain_impact_data, metrics=domain_metrics, name_prefix="domain_"
)

In [None]:
def create_model_comparison_plot(df, name_prefix=""):
    """Horizontal bar chart of the ``find_all_ings`` score per model.

    The x axis is fixed to 0-1 and formatted as percentages, and every bar is
    annotated with its value. The figure is saved at 300 dpi to
    ``plots/{name_prefix}model_performance_in_ing_identification.png`` (the
    ``plots`` directory is created if needed) and then shown.

    Args:
        df: DataFrame with ``model`` and ``find_all_ings`` columns. The value
            labels are placed by row position, so this assumes one row per
            model, in the order the bars are drawn.
        name_prefix: Prefix prepended to the output file name.
    """
    fig, ax = plt.subplots(figsize=(12, 8))

    sns.barplot(x="find_all_ings", y="model", data=df, orient="h", ax=ax)

    ax.set_title(
        "Model Performance in Ingredient Identification", fontsize=22, fontweight="bold"
    )
    ax.set_xlabel("Score", fontsize=18)
    ax.set_ylabel("Model", fontsize=18)

    # Scores are fractions in [0, 1]; show them as percentages.
    ax.set_xlim(0, 1)
    ax.xaxis.set_major_formatter(ticker.PercentFormatter(xmax=1))

    ax.grid(axis="x", linestyle="--", alpha=0.7, color="gray")
    ax.tick_params(axis="both", which="major", labelsize=14)

    ax.spines["top"].set_visible(False)
    ax.spines["right"].set_visible(False)

    # Value label at the end of each bar (row i is drawn at y position i).
    for i, v in enumerate(df["find_all_ings"]):
        ax.text(v, i, f" {v:.2%}", va="center", fontsize=14, fontweight="bold")

    plt.tight_layout()

    # savefig does not create missing directories.
    os.makedirs("plots", exist_ok=True)
    plt.savefig(
        f"plots/{name_prefix}model_performance_in_ing_identification.png",
        dpi=300,
        bbox_inches="tight",
    )
    plt.show()
    plt.close()


# Mean ingredient-identification score per model, taken from the combined
# results table `df` and sorted ascending by score.
data = (
    df.groupby(["model"])[["find_all_ings"]]
    .mean()
    .reset_index()
    .sort_values("find_all_ings", ascending=True)
)

create_model_comparison_plot(data)