In [None]:
import _base_path
import json
import pickle
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from resources.data_io import load_mappings
from resources.metrics import ConfusionMatrix

from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score

In [None]:
# --- Notebook configuration -------------------------------------------------
# DATA:   dataset folder name, interpolated into the '../data/{DATA}/...' paths.
# LABEL:  name of the label column; also part of the result / prompt file names.
# MODEL:  folder and file-name prefix of the results CSV under '../results/'.
# SHOTS:  number of shots, interpolated into the '{SHOTS:d}-shot.csv' file names.
DATA                = 'incidents'
LABEL               = 'hazard-category'
MODEL               = 'gpt-3.5-turbo-instruct'
SHOTS               = 2
# Metric name -> callable(y_true, y_pred). All scores treat undefined
# divisions as 0.0; everything except 'micro-f1' is macro-averaged.
METRICS             = {
    'micro-f1':     lambda y_true, y_pred: f1_score(y_true, y_pred, average='micro', zero_division=0.0),
    'macro-f1':     lambda y_true, y_pred: f1_score(y_true, y_pred, average='macro', zero_division=0.0),
    'recall':       lambda y_true, y_pred: recall_score(y_true, y_pred, average='macro', zero_division=0.0),
    'precision':    lambda y_true, y_pred: precision_score(y_true, y_pred, average='macro', zero_division=0.0),
#    'accuracy':     lambda y_true, y_pred: accuracy_score(y_true, y_pred)
}
# Cross-validation split ids to evaluate (only split 0 is currently enabled).
CV_SPLITS           = [0]#, 1, 2, 3, 4]

# Load Class-Mappings:

In [None]:
# Load the class mapping for LABEL from the splits folder of the dataset.
# NOTE(review): later cells iterate this object to obtain class names in
# index order — confirm against the return type of load_mappings.
class_map = load_mappings(f'../data/{DATA}/splits/', LABEL)
class_map

In [None]:
# Number of samples per class in the complete data set.
counts = pd.read_csv(f'../data/{DATA}/{DATA}_final.csv')[LABEL].value_counts()

# Turn the mapping into (class name, class index, sample count) triples,
# ordered from the most to the least frequent class. Classes that do not
# occur in the data get a count of 0.
class_map = sorted(
    (
        (name, index, counts[name] if name in counts else 0)
        for index, name in enumerate(class_map)
    ),
    key=lambda triple: triple[2],
    reverse=True
)
class_map

In [None]:
# Names of all classes with at least one sample (order of class_map is kept).
classes_all = [name for (name, _index, support) in class_map if support > 0]
classes_all

In [None]:
# Read the high-/low-support class lists for LABEL.
# JSON is UTF-8 by specification, so decode it explicitly instead of relying
# on the platform's default encoding (which differs e.g. on Windows).
with open(f'../data/{DATA}/support_zones.json', 'r', encoding='utf-8') as file:
    classes_high_support, classes_low_support = json.load(file)[LABEL]

In [None]:
# Display the high-support classes read from support_zones.json.
classes_high_support

In [None]:
# Display the low-support classes read from support_zones.json.
classes_low_support

## Filter classes:

In [None]:
for split in CV_SPLITS:
    split_path = f"../data/{DATA}/splits/split_{LABEL.split('-')[0]}_{split:d}.pickle"

    # load data for split:
    with open(split_path, "rb") as f:
        data = pickle.load(f)

    train_ids = data['train'][LABEL].values
    test_ids  = data['test'][LABEL].values

    # classes with at least 4 train samples / at least 1 test sample:
    c_train = [c for c, i, _ in class_map if np.count_nonzero(train_ids == i) >= 4]
    c_test  = [c for c, i, _ in class_map if np.count_nonzero(test_ids == i) >= 1]

    # only use classes that are present in the train AND test set:
    usable = set(c_train) & set(c_test)
    classes_all          = [c for c in classes_all if c in usable]
    classes_high_support = [c for c in classes_high_support if c in usable]
    classes_low_support  = [c for c in classes_low_support if c in usable]

len(classes_all)

# Load Results:

In [None]:
# Model predictions: keep the split id, the true label and every
# 'output_*' prediction column; missing predictions become ''.
results = (
    pd.read_csv(f'../results/{MODEL}/{MODEL}_{LABEL}_{SHOTS:d}-shot.csv')
    .fillna('')
    .pipe(lambda frame: frame[
        ['cv_split', 'label']
        + [name for name in frame.columns if name.startswith('output_')]
    ])
)

results.head()

In [None]:
# Prompt file: keep the split id, the true label and every 'output_*'
# column; missing values become ''.
limits = (
    pd.read_csv(f'../prompts/prompts_{LABEL}_{SHOTS:d}-shot.csv')
    .fillna('')
    .pipe(lambda frame: frame[
        ['cv_split', 'label']
        + [name for name in frame.columns if name.startswith('output_')]
    ])
)

limits.head()

In [None]:
def calculate_metrics(df, classes, filter_empty=False):
    """Score every prediction column of ``df`` on each cross-validation split.

    Columns are grouped by their name prefix: ``output_min_*`` -> ``'min'``,
    ``output_max_*`` -> ``'max'``, ``output_raw_*`` columns are ignored and
    every other column (besides ``cv_split`` and ``label``) -> ``'model'``.

    Args:
        df: DataFrame with the columns ``cv_split`` and ``label`` plus one
            column of predicted class names per method.
        classes: Class names to evaluate on. Rows whose true label is not in
            ``classes`` are dropped; predictions that are not in ``classes``
            become all-zero rows of the one-hot encoding.
        filter_empty: If True, rows with an empty-string prediction are
            removed before scoring.

    Returns:
        ``{group: {column: {metric: np.ndarray}}}``. Each array holds one
        value per entry of ``CV_SPLITS`` (same order). Splits that are
        skipped because they contain no prediction at all are ``NaN``.
    """
    class_set = set(classes)
    metrics = {}

    for col in df.drop(columns=['cv_split', 'label']).columns:
        if   col.startswith('output_raw_'): continue
        elif col.startswith('output_min_'): group = 'min'
        elif col.startswith('output_max_'): group = 'max'
        else:                               group = 'model'

        # NaN (instead of uninitialised memory) marks splits that get skipped.
        scores = {metric: np.full(len(CV_SPLITS), np.nan, dtype=float) for metric in METRICS}
        metrics.setdefault(group, {})[col] = scores

        # Store results by the POSITION of the split in CV_SPLITS, so that
        # split ids do not have to be 0..n-1.
        for pos, split in enumerate(CV_SPLITS):
            r = df[df['cv_split'] == split][['label', col]].values

            if (r[:,1] == '').all():
                print(f'Skipping split {split:d} of column "{col}"')
                continue

            if filter_empty:
                r = r[r[:,1] != '']

            # keep only rows whose true label is one of the evaluated classes:
            mask = np.array([c in class_set for c in r[:,0]], dtype=bool)

            # one-hot encode labels and predictions (one column per class):
            y_true = np.stack([r[mask, 0] == c for c in classes], axis=1).astype(int)
            y_pred = np.stack([r[mask, 1] == c for c in classes], axis=1).astype(int)

            for metric in scores:
                scores[metric][pos] = METRICS[metric](y_true, y_pred)

    return metrics

In [None]:
# For every class subset: scores of the prompt-file columns (limits) merged
# with the scores of the model outputs (results). On a shared group key the
# entry computed from `results` wins.
metrics_all = {
    **calculate_metrics(limits, classes_all),
    **calculate_metrics(results, classes_all),
}

metrics_high_support = {
    **calculate_metrics(limits, classes_high_support),
    **calculate_metrics(results, classes_high_support),
}

metrics_low_support = {
    **calculate_metrics(limits, classes_low_support),
    **calculate_metrics(results, classes_low_support),
}

In [None]:
def metric2latex(metrics_dict, report_max=False):
    """Format per-split scores as LaTeX table cells.

    Args:
        metrics_dict: ``{model: {metric: per-split scores}}``; every entry
            must hold the same number of splits.
        report_max: If True (and more than one split is present) each cell
            shows "mean & max" instead of "mean +- mean absolute deviation".

    Returns:
        String array of shape (models, metrics). For every metric the cells of
        the model(s) whose 2-decimal rounded mean is the best are highlighted
        (coloured background, bold). With a single split only the mean is
        printed, regardless of ``report_max``.
    """
    scores = np.array(
        [list(per_metric.values()) for per_metric in metrics_dict.values()],
        dtype=float
    )

    mean_score = scores.mean(axis=-1)
    is_best    = np.round(mean_score, 2) == np.round(mean_score.max(axis=0), 2)

    def mean_only(mean, highlight):
        if highlight:
            return f'\\cellcolor\u007Bblue!15\u007D\\footnotesize $\\bf {mean:.2f}$'
        return f'\\footnotesize ${mean:.2f}$'

    def mean_and_max(mean, best_split, highlight):
        if highlight:
            return f'\\cellcolor\u007Bblue!15\u007D\\footnotesize $\\bf {mean:.2f}$ & \\cellcolor\u007Bblue!15\u007D\\footnotesize $\\bf {best_split:.2f}$'
        return f'\\footnotesize ${mean:.2f}$ & \\footnotesize ${best_split:.2f}$'

    def mean_and_deviation(mean, deviation, highlight):
        if highlight:
            return f'\\cellcolor\u007Bblue!15\u007D\\footnotesize $\\bf {mean:.2f}$ \\tiny $\\bf\\pm {deviation:.2f}$'
        return f'\\footnotesize ${mean:.2f}$ \\tiny $\\pm {deviation:.2f}$'

    # a single split has neither a spread nor a distinct maximum:
    if scores.shape[-1] == 1:
        return np.vectorize(mean_only)(mean_score, is_best)

    if report_max:
        return np.vectorize(mean_and_max)(mean_score, scores.max(axis=-1), is_best)

    # mean absolute deviation of the splits from their mean:
    deviation = np.abs(scores - mean_score[..., np.newaxis]).mean(axis=-1)
    return np.vectorize(mean_and_deviation)(mean_score, deviation, is_best)

In [None]:
# Print one LaTeX table row per prediction column: the scores on all classes,
# on the high-support classes and on the low-support classes.
for group in ['min', 'max', 'model']:

    ltx_all = metric2latex(metrics_all[group])
    ltx_hs  = metric2latex(metrics_high_support[group])
    ltx_ls  = metric2latex(metrics_low_support[group])

    # Look cells up by column NAME: row i of each array belongs to the i-th
    # column of the dict it was computed from, which need not coincide with
    # the position of that column in metrics_all.
    tables = [
        {name: ltx[pos] for pos, name in enumerate(source[group])}
        for ltx, source in (
            (ltx_all, metrics_all),
            (ltx_hs,  metrics_high_support),
            (ltx_ls,  metrics_low_support),
        )
    ]

    for col in metrics_all[group]:
        # a column missing from a table yields empty cells for that part:
        parts = [
            ' & '.join(cells[col]) if col in cells else ' &'*(len(METRICS)-1)
            for cells in tables
        ]
        row = f'{col.upper()} &\n' + ' &\n'.join(parts) + ' \\\\\n'

        print(row)

    print('\\hline\n')

# Failure analysis:

In [None]:
# Class names ordered by class index, so label_types[i] is the name of class i.
label_types = [name for name, _index, _support in sorted(class_map, key=lambda entry: entry[1])]

label_types

In [None]:
# Share of raw outputs that are not a valid class name, as LaTeX percentages.
for col in limits.columns:
    if col.startswith('output_raw_'):
        mask = [limits[limits.cv_split == i][col].apply(lambda p: p in label_types) for i in CV_SPLITS]

        # average the per-split failure rate over ALL evaluated splits
        # (identical to the rate of the first split if there is only one):
        fail_rate = np.mean([np.mean(~m) for m in mask])

        # '\\%' keeps the literal backslash without an invalid escape sequence:
        print(f'{col[11:].upper()}: ${fail_rate * 100:.0f}\\%$')

In [None]:
# Per column of `results`: macro-F1 and accuracy on the rows whose prediction
# is a valid class name, plus the share of invalid and of empty predictions,
# each reported as mean and standard deviation over CV_SPLITS.
# NOTE(review): the loop also visits 'cv_split' and 'label' themselves —
# confirm whether those two sections of the output are wanted.
for col in results.columns:
    f1, acc, fail, empty = [], [], [], []

    # Iterate the split ids directly: the per-split values are appended in
    # order, so nothing is indexed by the split id any more (which only
    # worked for CV_SPLITS == [0, 1, ..., n-1]).
    for split in CV_SPLITS:
        rows   = results[results.cv_split == split]
        labels = rows.label.values
        preds  = rows[col].values

        # True where the prediction is a known class name:
        mask = rows[col].apply(lambda p: p in label_types).to_numpy(dtype=bool)

        f1.append(f1_score(
            labels[mask],
            preds[mask],
            average='macro',
            zero_division=0
        ))

        acc.append(accuracy_score(labels[mask], preds[mask]))

        fail.append(np.mean(~mask))

        empty.append(np.mean(rows[col].apply(lambda p: p == '')))

    print(f'{col.upper()}:')
    print(f'  F1:       {np.mean(f1):.2f} \u00b1 {np.std(f1):.2f}')
    print(f'  Accuracy: {np.mean(acc):.2f} \u00b1 {np.std(acc):.2f}')
    print(f'  Failed:   {np.mean(fail):.2f} \u00b1 {np.std(fail):.2f}')
    print(f'  Empty:    {np.mean(empty):.2f} \u00b1 {np.std(empty):.2f}')
    print('\n')

In [None]:
# Collect, per column of `results` and per split, the (title, true label,
# prediction) triples of all predictions that are not a valid class name.
task = LABEL.split('-')[0]

# Read the test set of every split ONCE instead of once per column; splits
# without a pickle file are skipped.
texts_by_split = {}
for split in CV_SPLITS:
    try:
        # NOTE: pickle.load can execute arbitrary code — only use trusted split files.
        with open(f'../data/{DATA}/splits/split_{task}_{split:d}.pickle', 'rb') as f:
            texts_by_split[split] = pickle.load(f)['test'][[LABEL, task + '-title', 'title']]

    except FileNotFoundError: continue

fails = {}
for col in results.columns:
    fails[col] = []
    for split, texts in texts_by_split.items():
        labels = results[results.cv_split == split]['label'].values
        preds  = results[results.cv_split == split][col].values

        # rows of the results file must line up with the pickled test set:
        assert np.array_equal([label_types[i] for i in texts[LABEL]], labels), \
            f'results and test set of split {split} are not aligned'

        # list comprehension instead of np.vectorize: also works on empty input
        mask = np.array([p in label_types for p in preds], dtype=bool)

        fails[col].append(list(zip(
            texts['title'].values[~mask],
            labels[~mask],
            preds[~mask]
        )))

In [None]:
# Invalid predictions of column 'output_sim-20' in the first collected split,
# restricted to samples whose true label (item[1]) is a low-support class.
[item for item in fails['output_sim-20'][0] if item[1] in classes_low_support]

# Plot confusion matrix

In [None]:
# (true label, prediction) pairs of split 0 for the 5% conformal output,
# without the rows that have an empty prediction:
r = results.loc[results['cv_split'] == 0, ['label', 'output_conformal_5%']].values
r = r[r[:,1] != '']

# Replace class names by their support zone (0 = high, 1 = medium, 2 = low),
# first in the label column, then in the prediction column. Everything that
# is neither a high- nor a low-support class counts as medium.
for column in (0, 1):
    names = r[:, column]

    # all masks are computed before the column is overwritten:
    in_high   = np.array([name in classes_high_support for name in names])
    in_low    = np.array([name in classes_low_support for name in names])
    in_medium = ~(in_high | in_low)

    for zone, zone_mask in enumerate((in_high, in_medium, in_low)):
        r[zone_mask, column] = zone

cm = ConfusionMatrix(r[:,0], r[:,1], classes=["High", "Medium", "Low"])

In [None]:
# Draw the support-zone confusion matrix on a single 3x3 inch axis and
# store it as PDF.
fig, axs = plt.subplots(figsize=(3, 3))
cm.plot(axs)
fig.savefig(f'../pictures/plots/cm_conformal_{LABEL}.pdf')