# Compare the gold taxonomy to the completed and generated taxonomies

In [None]:
import random
from copy import deepcopy
from pathlib import Path
import networkx as nx
import numpy as np
import pandas as pd
import itertools

from sympy.stats.rv import probability
from tqdm.auto import tqdm
from transformers import pipeline

from llm_food_taxonomy.evaluation import WuPSimilarity, ParentMetric
from llm_food_taxonomy.graph.taxonomy import Taxonomy
from llm_food_taxonomy.data.loader import load_taxonomy, load_completion, load_subgraph_taxonomy

In [None]:
# Notebook display settings: show every column, keep wide frames on a single
# line, never truncate cell text, and render floats with four decimals.
_display_options = (
    ('display.max_columns', None),
    ('display.expand_frame_repr', False),
    ('max_colwidth', None),
    ('display.float_format', '{:,.4f}'.format),
)
for _option, _value in _display_options:
    pd.set_option(_option, _value)

In [None]:
# Run configuration for this notebook.
trial, mode, dataset = False, "test", "semeval_food"

# Inputs are read from ../data/<dataset>, model outputs from ../output/<dataset>.
data_path, results_path = Path(f"../data/{dataset}"), Path(f"../output/{dataset}")

In [None]:
# Load the gold taxonomy; the term records carry node_id, node_name and desc.
terms, taxo = load_taxonomy(str(data_path), with_split=False)
term_records = terms.to_dict(orient="records")

# Rewrite every column of the edge table from node ids to node names.
id_to_name = {rec["node_id"]: rec["node_name"] for rec in term_records}
taxo = taxo.apply(lambda col: col.map(id_to_name))

# From here on names act as their own ids, so the lookup becomes name -> name.
id_to_name = {name: name for name in id_to_name.values()}
id_to_desc = {rec["node_name"]: rec["desc"] for rec in term_records}

In [None]:
# Preview the first rows of the gold term table.
terms.head()

In [None]:
# Load the taxonomy stored under ../data/unsupervised_<dataset> (labelled
# "Generated" in the comparisons below) and map its edge table to node names.
uterms, utaxo = load_taxonomy(str(Path(f"../data/") / f"unsupervised_{dataset}"), with_split=False)
uid_to_name = {rec["node_id"]: rec["node_name"] for rec in uterms.to_dict(orient="records")}
utaxo = utaxo.apply(lambda col: col.map(uid_to_name))

# Edges as (hypernym, hyponym) tuples; names serve as their own ids.
uedges = [tuple(edge) for edge in utaxo[['hypernym', 'hyponym']].values.tolist()]
utax = Taxonomy(uedges, id_to_name={name: name for name in uid_to_name.values()})
utaxo.head()

In [None]:
# The seed taxonomy is required to build the completed taxonomies from
# predicted positions.
pseudo_ids = ["pseudo root", "pseudo leaf"]
seed_terms, seed_taxonomy = load_subgraph_taxonomy(str(data_path), mode="test")
seed_id_to_name = {rec["node_id"]: rec["node_name"] for rec in seed_terms.to_dict(orient="records")}

# Keep only edges where neither endpoint id contains "pseudo", and express the
# remaining edges as (parent name, child name) tuples.
seed_taxonomy = [
    (seed_id_to_name[parent], seed_id_to_name[child])
    for parent, child in seed_taxonomy.values.tolist()
    if not ("pseudo" in parent or "pseudo" in child)
]

In [None]:
# Display the seed edges as (parent name, child name) tuples.
seed_taxonomy

In [None]:
import traceback
from pathlib import Path

# Run-directory name -> (pred_terms, pred_triplets) as returned by load_completion.
models_outs = {}
# Only run directories whose name contains one of these substrings are loaded.
filter_words = ["few_basic_Meta-Llama-3-70B-Instruct_", "tacoprompt"]
# `is_dir` has to be *called*: the bare bound method is always truthy, which let
# plain files through. Sorting makes the insertion order of `models_outs`
# reproducible, since `iterdir()` yields entries in arbitrary order.
outputs = sorted(d for d in results_path.iterdir() if d.is_dir())

for o in outputs:
    # The run directory's name is the model key. (The earlier attempt to build
    # the key with `o.split('_')` always raised on a Path and fell back to this.)
    model_name = o.name
    if not any(f in model_name for f in filter_words):
        continue
    print(f"Loading {o}")
    try:
        pred_terms, pred_triplets = load_completion(o)
    except Exception as e:
        # Best effort: report the failure and keep loading the remaining runs.
        print(f"Error loading {o}: {e}")
        traceback.print_exc()
    else:
        models_outs[model_name] = pred_terms, pred_triplets

In [None]:
# Take the first loaded run. Each value is the (pred_terms, pred_triplets)
# tuple stored by the loading cell, so the slice below returns that whole
# two-element tuple rather than the first five rows of a frame.
# NOTE(review): if a row preview was intended, call .head() on the elements.
model_out = list(models_outs.values())[0]
model_out[:5]

## Evaluation

In [None]:
# Sentence-embedding model; it is handed to CscMetric below as `sentence_transformer`.
from sentence_transformers import SentenceTransformer
model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')

In [None]:
# Text-classification pipeline over facebook/bart-large-mnli (batch size 128);
# it is handed to NliMetric below as `model`.
nli = pipeline("text-classification", model="facebook/bart-large-mnli", batch_size=128)

In [None]:
from llm_food_taxonomy.evaluation import PositionMetric
from llm_food_taxonomy.evaluation.unsupervised.robustness import CscMetric, SemanticProximity, \
    NliMetric

# Metrics evaluated directly on a taxonomy's edges (see the evaluation cell):
# they are called with an edge list, name/description lookups and a node subset.
metrics = [
    NliMetric(model=nli, propagate=True, progress=True),
    CscMetric(sentence_transformer=model, progress=True),
]
# Metrics that compare predicted positions against the gold positions.
sup_metrics = [PositionMetric(), ParentMetric()]

In [None]:
def make_tax(positions, seed_tax, node2name):
    """Build a taxonomy from seed edges and insert predicted positions into it.

    Args:
        positions: Mapping from a query node to an iterable of
            ``(parent, child)`` pairs predicted for that query.
        seed_tax: Edges handed to the ``Taxonomy`` constructor.
        node2name: Lookup passed to ``Taxonomy`` as ``id_to_name``.

    Returns:
        The ``Taxonomy`` after ``insert`` was called with every
        ``(parent, query, child)`` triplet derived from ``positions``.
    """
    triplets = [
        (parent, query, child)
        for query, candidate_positions in positions.items()
        for parent, child in candidate_positions
    ]
    tax = Taxonomy(seed_tax, id_to_name=node2name)
    tax.insert(triplets)
    return tax

In [None]:
# List the names of the runs that were loaded.
models_outs.keys()

In [None]:
# Select the two runs by name instead of by position: `models_outs` is filled in
# directory-listing order, which is not guaranteed, so indexing [0]/[1] could
# silently swap the LLM completion and the TacoPrompt baseline.
_tp_runs = [name for name in models_outs if "tacoprompt" in name]
_llm_runs = [name for name in models_outs if "tacoprompt" not in name]
if not _tp_runs or not _llm_runs:
    raise KeyError(
        f"Expected one LLM run and one TacoPrompt run in models_outs, got: {list(models_outs)}"
    )
pred_terms, pred_triplets = models_outs[_llm_runs[0]]
pred_terms_tp, pred_triplets_tp = models_outs[_tp_runs[0]]

In [None]:
def positions(tax: Taxonomy):
    """Group a taxonomy's existing triplets by query node.

    Args:
        tax: Object exposing ``triplets(existing=True)``, which yields
            ``(parent, query, child)`` triplets.

    Returns:
        A dict mapping each query to its list of ``(parent, child)`` pairs,
        in the order the triplets were yielded. Keys follow the sorted order
        produced by ``groupby``. An input without triplets gives ``{}``.
    """
    triplets = list(tax.triplets(existing=True))
    if not triplets:
        # A row-wise apply over an empty frame does not return a Series, so the
        # empty case is answered up front.
        return {}

    frame = pd.DataFrame(triplets, columns=["parent", "query", "child"])
    grouped = frame.groupby("query").agg({"parent": list, "child": list})
    # Pair the aggregated parent/child lists directly instead of going through
    # a row-wise apply followed by iterrows.
    return {
        query: list(zip(parents, children))
        for query, parents, children in zip(grouped.index, grouped["parent"], grouped["child"])
    }

In [None]:
from collections import defaultdict

# Keys of both lookups, gold first, without duplicates.
# NOTE(review): `id_to_name` is keyed by name at this point while `uid_to_name`
# is keyed by node id — presumably ids and names coincide for the generated
# taxonomy; confirm.
all_names = list({**id_to_name, **uid_to_name})
# Identity lookup that answers None for unknown keys instead of raising.
all_id_names = defaultdict(lambda: None, {name: name for name in all_names})

In [None]:
# Query node -> predicted positions, for the LLM run and the TacoPrompt run.
pred = {row.query_node: row.predicted_positions for _, row in pred_triplets.iterrows()}
pred_tp = {row.query_node: row.predicted_positions for _, row in pred_triplets_tp.iterrows()}

# Gold edges are selected by column name, like every other edge list in this
# notebook, so they do not depend on the column order or on extra columns.
true_edges = [tuple(edge) for edge in taxo[["hypernym", "hyponym"]].values.tolist()]
true_tax = Taxonomy(true_edges, id_to_name=id_to_name)
pred_tax = make_tax(pred, seed_taxonomy, id_to_name)
tp_tax = make_tax(pred_tp, seed_taxonomy, id_to_name)

# Random baseline: the gold structure with names (and descriptions) permuted.
# A seeded local generator keeps the "Random" row reproducible across runs
# without touching the global random state.
rng = random.Random(0)
shuffled_names = list(id_to_name.values())
shuffled_descs = list(id_to_desc.values())
rng.shuffle(shuffled_names)
rng.shuffle(shuffled_descs)
shuffled_id_name = dict(zip(id_to_name.keys(), shuffled_names))
shuffled_desc_name = dict(zip(id_to_name.keys(), shuffled_descs))
rnd_taxo = [tuple(r) for r in taxo.apply(lambda x: x.map(shuffled_id_name))[["hypernym", "hyponym"]].values.tolist()]
rnd_tax = Taxonomy(rnd_taxo, id_to_name=shuffled_id_name)

# Gold leaves; used as the node subset for the unsupervised metrics.
intersection = true_tax.leaves()

In [None]:
# Number of gold leaves, followed by how many of them are also leaves in each
# of the other taxonomies.
print(len(intersection))
for label, other_tax in (
    ("Pred/True: ", pred_tax),
    ("TacoPrompt/True: ", tp_tax),
    ("Generated/True: ", utax),
):
    print(label, len(set(other_tax.leaves()).intersection(intersection)))

In [None]:
# Print edge, node and distinct-leaf counts for every taxonomy being compared.
named_taxonomies = (
    ("Gold", true_tax),
    ("Completed", pred_tax),
    ("Generated", utax),
    ("TacoPrompt", tp_tax),
)
for label, taxonomy in named_taxonomies:
    print(f"Taxonomy: {label}")
    print(f"Num edges: {len(taxonomy.g.edges())}")
    print(f"Num nodes: {len(taxonomy.g.nodes())}")
    print(f"Num leaves: {len(set(taxonomy.leaves()))}")

In [None]:
from collections import defaultdict
from llm_food_taxonomy.graph.taxonomy import Taxonomy

# Column name -> one value per taxonomy; turned into the results table below.
results = defaultdict(list)
scores = []
nonleaf_scores = []
leaf_scores = []
# Row labels. Every metric column must append its values in exactly this order:
# True, Completed, Generated, Random, TacoPrompt.
results["Taxonomy"].append("True")
results["Taxonomy"].append("Completed")
results["Taxonomy"].append("Generated")
results["Taxonomy"].append("Random")
results["Taxonomy"].append("TacoPrompt")


def _supervised_score(metric, candidate_pos):
    """Score candidate positions against the gold positions; return the score dict."""
    score, _, _ = metric.calculate(
        pred_positions=deepcopy(candidate_pos),
        true_positions=deepcopy(true_pos),
        node2name=deepcopy(all_id_names),
        seed_taxonomy=[],
        verbose=True,
        first_only=False,
    )
    return score


def _unsupervised_score(metric, tax_edges, names):
    """Score an edge list with an unsupervised metric, restricted to the gold leaves."""
    return metric.calculate(
        deepcopy(tax_edges),
        node2name=names | {"pseudo root": "pseudo root"},
        descriptions=names | {"pseudo root": "pseudo root"},
        subset=intersection,
    )


# The positions depend only on the taxonomies, so they are computed once
# instead of once per metric.
pred_pos = positions(pred_tax)
uns_pos = positions(utax)
true_pos = positions(true_tax)
rnd_pos = positions(rnd_tax)
tp_pos = positions(tp_tax)

for m in sup_metrics:
    print(f"Metric: {type(m).__name__}")

    print(f"Metric (Completed): {type(m).__name__}")
    pscore = _supervised_score(m, pred_pos)

    print(f"Metric (Generated): {type(m).__name__}")
    uscore = _supervised_score(m, uns_pos)

    print(f"Metric (True): {type(m).__name__}")
    tscore = _supervised_score(m, true_pos)

    print(f"Metric (Random): {type(m).__name__}")
    rscore = _supervised_score(m, rnd_pos)

    print(f"Metric (TacoPrompt): {type(m).__name__}")
    tp_score = _supervised_score(m, tp_pos)

    # Only the first entry of each score dict goes into the table.
    key = list(rscore.keys())[0]
    # NOTE(review): the gold row gets a literal 0 although `tscore` is computed
    # above; confirm whether `tscore[key]` was intended here.
    results[type(m).__name__].append(0)
    results[type(m).__name__].append(pscore[key])
    results[type(m).__name__].append(uscore[key])
    results[type(m).__name__].append(rscore[key])
    results[type(m).__name__].append(tp_score[key])

for m in metrics:
    print(f"Metric (True): {type(m).__name__}")
    tscore = _unsupervised_score(m, true_tax.g.edges(), true_tax.id_to_name)

    print(f"Metric (Completed): {type(m).__name__}")
    pscore = _unsupervised_score(m, pred_tax.g.edges(), true_tax.id_to_name)

    print(f"Metric (Generated): {type(m).__name__}")
    uscore = _unsupervised_score(m, utax.g.edges(), utax.id_to_name)

    print(f"Metric (TacoPrompt): {type(m).__name__}")
    tpscore = _unsupervised_score(m, tp_tax.g.edges(), tp_tax.id_to_name)

    # Random baseline: the gold edges read through the shuffled name lookup.
    print(f"Metric (Random): {type(m).__name__}")
    rscore = _unsupervised_score(m, true_tax.g.edges(), shuffled_id_name)

    # Append in row-label order. Random must come before TacoPrompt; appending
    # them the other way round put each score under the other's label.
    results[type(m).__name__].append(tscore)
    results[type(m).__name__].append(pscore)
    results[type(m).__name__].append(uscore)
    results[type(m).__name__].append(rscore)
    results[type(m).__name__].append(tpscore)

res_df = pd.DataFrame(results)
res_df

In [None]:
# Rebuild the results table from `results` (same statement that closes the
# evaluation cell) and display it.
res_df = pd.DataFrame(results)
res_df

In [None]:
# Pull individual entries out of the per-taxonomy metric results into flat
# columns: (new column, source column, key inside each result).
for target, source, field in (
    ("NLIV-W", "NliMetric", "NLIV-Weak"),
    ("NLIV-S", "NliMetric", "NLIV-Strong"),
    ("CSC", "CscMetric", "csc_coef"),
):
    res_df[target] = res_df[source].apply(lambda entry, field=field: entry[field])

In [None]:
# Inspect the raw results mapping (column name -> list of per-taxonomy values).
results

In [None]:
# NOTE(review): this replaces the five row labels stored under "Taxonomy" with
# a single "Random" entry. `res_df` was built earlier and is unaffected, but
# rebuilding pd.DataFrame(results) after running this cell would see columns of
# unequal length. Looks like leftover debugging — confirm before re-running.
results["Taxonomy"] = ["Random"]

In [None]:
# float64 columns are rendered with four decimals in the exported table.
cols = [name for name, dtype in res_df.dtypes.items() if str(dtype) == "float64"]
# The raw metric result columns are dropped; only their flattened values stay.
table = res_df.drop(columns=["NliMetric", "CscMetric"])
df_s = table.style.format('{:.4f}', subset=cols)
# Disabled: bold the best and underline the second-best value of each column.
#for c in cols:
    #if str(res_df.dtypes.loc[c]) == "float64":
        #row1, row2 = res_df.index.values[res_df[c].argsort()[::-1]][:2]
        #df_s = df_s.format(lambda x: "\\textbf{" + f'{x:.4f}' + "}", subset=(row1, c))
        #df_s = df_s.format(lambda x: "\\underline{" + f'{x:.4f}' + "}", subset=(row2, c))
print(df_s.hide(axis="index").to_latex())