In [1]:
import pandas as pd
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import os
import seaborn as sns
import numpy as np
import json
from difflib import SequenceMatcher
from nltk.corpus import stopwords
from sklearn.metrics.pairwise import linear_kernel
from sklearn.preprocessing import MultiLabelBinarizer
from sentence_transformers import SentenceTransformer
from scipy.stats import hmean

ModuleNotFoundError: No module named 'seaborn'

In [2]:
# Experiment selection: these three values identify which results file is loaded
# here and which score file is read further down.
model, task, approach = "mistral", "multilabel", "zero-shot"

# Name of the reference-label column: multilabel runs use "topic_tags",
# every other task uses "category_tag".
tag = {"multilabel": "topic_tags"}.get(task, "category_tag")

# Results for the selected run, read from the current working directory.
df = pd.read_json(f"{task}-{model}-{approach}.json")

In [3]:
def normalize_tag(s, possible_tags):
    """Map a free-text prediction onto one of the known tags.

    A tag matches when it occurs as a substring of ``s``. When several tags
    match, the longest one is returned, so that a short tag which is itself a
    substring of a longer tag (e.g. "politica" vs. "politica estera") cannot
    shadow the more specific one merely because it comes first in
    ``possible_tags``. Ties on length are resolved in favour of the tag that
    appears first in ``possible_tags``.

    Parameters
    ----------
    s : str
        Raw prediction text. Non-string values (e.g. ``None``/NaN) yield ``None``.
    possible_tags : iterable
        Candidate tags. Non-string entries (e.g. NaN produced by
        ``Series.explode()`` on an empty list) are ignored.

    Returns
    -------
    str or None
        The matched tag, or ``None`` when nothing matches.
    """
    if not isinstance(s, str):
        return None

    matches = [tag for tag in possible_tags if isinstance(tag, str) and tag in s]

    # max() returns the first maximal element, which gives the tie-break rule.
    return max(matches, key=len, default=None)

In [4]:
# Italian stop-word list from NLTK; morphological_similarity filters these words
# out before comparing phrases.
# NOTE(review): presumably requires the NLTK "stopwords" corpus to be available
# locally — confirm the environment has it downloaded.
sw = stopwords.words("italian")
# Sentence-embedding model used below to encode the tag vocabulary.
embedder = SentenceTransformer("nickprock/sentence-bert-base-italian-xxl-uncased")

In [5]:
def morphological_similarity(a, b, delta_penalty, stop_words=None):
    """Score how similar two phrases are at the character level.

    Both phrases are split on whitespace and stop words are removed. Every
    word of ``a`` is compared with every word of ``b`` using the length of
    their longest common substring, normalised by the geometric mean of the
    two word lengths. For each word of the phrase with fewer words the best
    match in the other phrase is kept; the mean of those best matches, minus
    ``delta_penalty(delta)``, is returned.

    Parameters
    ----------
    a, b : str
        Phrases to compare.
    delta_penalty : callable
        Called with ``delta``, the absolute difference in the number of
        content words of the two phrases; its result is subtracted from the
        similarity.
    stop_words : iterable of str, optional
        Words to ignore. Defaults to the notebook-level ``sw`` list.

    Returns
    -------
    float
        Mean best-match similarity minus the penalty. When either phrase has
        no content words the similarity term is 0, so the result is
        ``-delta_penalty(delta)``.
    """
    # Resolve the stop-word collection once; a set makes membership tests O(1).
    excluded = set(sw if stop_words is None else stop_words)

    a_words = [w for w in a.split() if w not in excluded]
    b_words = [w for w in b.split() if w not in excluded]
    delta = abs(len(a_words) - len(b_words))

    # Without this guard an empty ``a`` raised IndexError (1-D empty matrix)
    # and an empty ``b`` produced NaN (mean of an empty array).
    if not a_words or not b_words:
        return 0.0 - delta_penalty(delta)

    D = np.array([
        [
            SequenceMatcher(None, wa, wb).find_longest_match(0, len(wa), 0, len(wb)).size
            / np.sqrt(len(wa) * len(wb))
            for wb in b_words
        ]
        for wa in a_words
    ])

    # Keep one best match per word of the phrase with fewer words
    # (rows index words of ``a``, columns index words of ``b``).
    if D.shape[0] > D.shape[1]:
        v = D.max(axis=0)
    else:
        v = D.max(axis=1)

    return v.mean() - delta_penalty(delta)

In [6]:
# Tag vocabulary taken from the reference-label column. explode() yields NaN for
# rows whose tag list is empty; dropna() keeps that NaN out of the vocabulary,
# where it would break the substring test in normalize_tag and the encoder below.
possible_tags = df[tag].explode().dropna().unique().tolist()

# Map every raw prediction onto a known tag and drop the ones that match nothing.
df['cleaned_predictions'] = df['predictions'].apply(
    lambda L: [t for t in (normalize_tag(s, possible_tags) for s in L) if t]
)

In [None]:
# Preview the first rows, including the cleaned_predictions column computed above.
df.head()

In [None]:
# Inspect the tag vocabulary extracted from the df[tag] column.
possible_tags

In [None]:
# Cosine similarity between all tag pairs (embeddings are L2-normalised, so the
# linear kernel is the cosine similarity).
tag_embeddings = embedder.encode(possible_tags, normalize_embeddings=True)
semantic_similarity = linear_kernel(tag_embeddings)

# Minimum similarity for two tags to be treated as the same label.
similarity_threshold = 0.6

# For every tag, find the most similar tag that appears LATER in possible_tags.
# Looking only forward keeps the mapping acyclic.
nearest_later = {}
for i, y in enumerate(possible_tags):
    later = semantic_similarity[i, i + 1:]
    if later.size == 0:
        continue
    j = int(later.argmax())  # first maximum wins, like a strict ">" scan
    if later[j] > similarity_threshold:
        nearest_later[y] = possible_tags[i + 1 + j]

# Follow chains (a -> b, b -> c) to their final representative so that applying
# the map once is enough: otherwise "a" would become "b" while "b" becomes "c",
# and two tags judged equivalent would still differ after correction.
# Each hop moves to a later position in possible_tags, so the walk terminates.
semantic_map = {}
for y, target in nearest_later.items():
    while target in nearest_later:
        target = nearest_later[target]
    semantic_map[y] = target

semantic_map

In [10]:
# Replace every reference tag by its semantic representative (tags without an
# entry in semantic_map are kept unchanged).
df[f'corrected_{tag}'] = df[tag].apply(
    lambda labels: [semantic_map.get(y, y) for y in labels]
)

# Same substitution for the predictions, after trimming whitespace and skipping
# empty entries; the set removes duplicates created by the merge.
df['corrected_predictions'] = df['cleaned_predictions'].apply(
    lambda labels: list({semantic_map.get(y.strip(), y.strip()) for y in labels if y})
)

In [None]:
# Side-by-side view of the corrected reference tags and corrected predictions.
df[[f"corrected_{tag}", "corrected_predictions"]]

In [12]:
if task == "multiclass":
    # Single-label case: reduce each list to its first element; an empty list
    # becomes the placeholder label "None" (now applied to both columns, so an
    # empty reference list no longer raises IndexError).
    # NOTE(review): this reads df[tag] / cleaned_predictions, so the semantic
    # correction computed earlier is discarded here, and the corrected_* columns
    # become scalar-valued while the MultiLabelBinarizer cell below expects
    # iterables of labels — confirm this is intended.
    df[f"corrected_{tag}"] = df[tag].apply(lambda L: L[0] if L else "None")
    df['corrected_predictions'] = df['cleaned_predictions'].apply(lambda L: L[0] if L else "None")

    # Sorted union of observed labels: same ordering confusion_matrix uses by
    # default, made explicit so the axes can show tag names instead of indices.
    labels = sorted(set(df[f"corrected_{tag}"]) | set(df['corrected_predictions']))
    cm = confusion_matrix(df[f'corrected_{tag}'], df['corrected_predictions'], labels=labels)
    ConfusionMatrixDisplay(cm, display_labels=labels).plot(xticks_rotation="vertical")

In [None]:
# Pre-correction scores: stored metrics for the selected task/model/approach,
# read from the sibling "scores" directory.
with open(f"../scores/{task}-{model}-{approach}.json", "r") as f:
    scores = json.loads(f.read())

scores

In [None]:
# Post-correction scores
def _safe_ratio(numerator, denominator):
    """Element-wise numerator / denominator, yielding 0 where the denominator is 0."""
    numerator = np.asarray(numerator, dtype=float)
    denominator = np.asarray(denominator, dtype=float)
    nonzero = denominator != 0
    return np.where(nonzero, numerator / np.where(nonzero, denominator, 1.0), 0.0)


def _as_label_list(value):
    """Wrap a bare string label in a list so it is not binarized character by character."""
    return [value] if isinstance(value, str) else value


mlb = MultiLabelBinarizer()
y_true = mlb.fit_transform(df[f'corrected_{tag}'].apply(_as_label_list))
y_pred = mlb.transform(df['corrected_predictions'].apply(_as_label_list))

# Per-sample counts (one entry per row of df).
TP = np.sum(y_true * y_pred, axis = 1)
FP = np.sum((1 - y_true)*y_pred, axis = 1)
FN = np.sum(y_true*(1-y_pred), axis = 1)

# A sample with no predictions (TP + FP == 0) or no reference tags (TP + FN == 0)
# used to produce NaN, which then turned the averaged metric into NaN. Such
# ratios are now counted as 0.
# NOTE(review): the "macro-*" keys pool counts over all samples while the
# "micro-*" keys average per-sample ratios; key names are kept as they were so
# the result stays comparable with the stored scores — confirm the naming.
new_scores = {
    "macro-precision": float(_safe_ratio(TP.sum(), TP.sum() + FP.sum())),
    "macro-recall": float(_safe_ratio(TP.sum(), TP.sum() + FN.sum())),
    "micro-precision": float(_safe_ratio(TP, TP + FP).mean()),
    "micro-recall": float(_safe_ratio(TP, TP + FN).mean())
}

# F1 as 2TP / (2TP + FP + FN): equal to the harmonic mean of precision and
# recall whenever that is defined, and 0 otherwise.
new_scores['micro-f1'] = float(_safe_ratio(2 * TP, 2 * TP + FP + FN).mean())
new_scores['macro-f1'] = float(_safe_ratio(2 * TP.sum(), 2 * TP.sum() + FP.sum() + FN.sum()))
new_scores

In [114]:
# Computation times
# Score files are named "<problem>-<model>-<approach>.json", where the approach
# is the last two dash-separated tokens (e.g. "zero-shot") and the model is
# everything in between.
times = []
for file in sorted(os.listdir("../scores")):  # sorted for a reproducible row order
    # splitext keeps dots inside the model name; split(".")[0] truncated them.
    stem, extension = os.path.splitext(file)
    if extension != ".json":
        # Skip anything that is not a score file (checkpoint folders, etc.).
        continue
    with open(os.path.join("../scores", file), "r") as f:
        d = json.load(f)
    file_features = stem.split("-")
    times.append({"problem": file_features[0], "model": "-".join(file_features[1:-2]), "approach": "-".join(file_features[-2:]), "time": d["time"]})

times_df = pd.DataFrame(times)

In [None]:
# One horizontal bar chart per problem, bars grouped by approach and coloured by
# model. FacetGrid.map(..., data=times_df) handed the complete frame to every
# facet, so all panels showed the same bars; catplot subsets the data per facet
# and keeps the hue mapping consistent across panels.
g = sns.catplot(
    data = times_df,
    kind = "bar",
    col = "problem",
    y = "approach",
    x = "time",
    hue = "model",
    errorbar = None,
    orient = "h",
    palette = "rainbow",
    height = 3,  # FacetGrid's default facet height, to keep the figure size
)
g.set_ylabels("")
g.set_xlabels("Tempo medio (secondi)")
# No explicit add_legend(): catplot already adds the legend for the hue variable.

In [None]:
# Table of the timings collected from the score files, one row per file.
times_df