# Library import

In [120]:
from sklearn.metrics import accuracy_score, f1_score
from tqdm import tqdm
import sys

sys.path.append("../../../../")
from utils.train import convert_to_categories, compute_coverage
from datasets.boia import BOIA
from datasets.sddoia import SDDOIA
from datasets.minikandinsky import MiniKandinsky
from datasets.shortcutmnist import SHORTMNIST
from datasets.kandinsky import Kandinsky
from datasets.clipshortcutmnist import CLIPSHORTMNIST
from datasets.clipboia import CLIPBOIA
from datasets.clipsddoia import CLIPSDDOIA
from datasets.clipkandinsky import CLIPKandinsky
from sklearn.metrics import confusion_matrix
from argparse import Namespace
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import torch

## Load the numpy file to estimate the predicted concepts

In [121]:
def load_and_compute_mean(filepaths):
    """Load several ``.npy`` arrays and return their element-wise mean.

    Args:
        filepaths: iterable of paths, each readable by ``np.load``.

    Returns:
        An array with the same shape as each input, holding the mean over
        the loaded arrays (``np.mean(..., axis=0)``).

    Raises:
        ValueError: if ``filepaths`` yields no path, or if the loaded arrays
            do not all share the same shape.
    """
    matrices = [np.load(filepath) for filepath in filepaths]

    # Checked after loading so that generators are supported as well as lists.
    if not matrices:
        raise ValueError("At least one file path is required")

    # Averaging only makes sense element-wise, so every array must match the first.
    expected_shape = matrices[0].shape
    if any(matrix.shape != expected_shape for matrix in matrices[1:]):
        raise ValueError("All matrices must have the same shape")

    return np.mean(matrices, axis=0)

In [122]:
def find_two_largest_indices(carr, dataset_name):
    """Pick two concept indices per row of a 2-D score array.

    For dataset names containing ``"clip"`` the columns are cut at index 10
    and the argmax of each half is returned; the second index is relative to
    the start of the second half. For every other dataset the indices of the
    two highest scores in the whole row are returned, highest first.

    Args:
        carr: 2-D array of scores, one row per sample.
        dataset_name: dataset identifier; only the substring ``"clip"`` is
            inspected.

    Returns:
        Integer array with one row per sample and (up to) two columns.
    """
    if "clip" not in dataset_name:
        # argsort is ascending, so flip each row to get the largest first.
        descending = np.flip(np.argsort(carr, axis=1), axis=1)
        return descending[:, :2]

    first_half_best = np.argmax(carr[:, :10], axis=1)
    second_half_best = np.argmax(carr[:, 10:], axis=1)
    return np.column_stack((first_half_best, second_half_best))

In [123]:
def find_concepts_kandinsky(carr):
    """Convert Kandinsky concept scores into per-object shape/colour indices.

    The columns of ``carr`` are split into three equal groups and each group
    is split again into three equal sub-groups. Within a sub-group the first
    three columns are scored as shapes and the remaining columns as colours;
    the argmax of each is taken.

    Args:
        carr: 2-D score array, one row per sample. The original comments
            suggest 54 columns (3 groups x 3 sub-groups x 6 scores) -- TODO
            confirm against the producer of the file.

    Returns:
        Integer array of shape ``[n_samples, 3, 6]``; for each of the three
        groups the last axis holds the three shape indices followed by the
        three colour indices.
    """
    per_group = []

    for group_scores in np.split(carr, 3, axis=1):
        sub_groups = np.split(group_scores, 3, axis=1)

        # One column per sub-group: which shape / which colour scored highest.
        shape_ids = np.stack(
            [np.argmax(sub[:, :3], axis=1) for sub in sub_groups], axis=1
        )
        color_ids = np.stack(
            [np.argmax(sub[:, 3:], axis=1) for sub in sub_groups], axis=1
        )

        # Shapes first, then colours.
        per_group.append(np.concatenate((shape_ids, color_ids), axis=1))

    return np.stack(per_group, axis=1)

In [124]:
def predict_concepts(mean_matrix, dataset_name):
    """Turn a raw concept-score matrix into discrete concept predictions.

    Args:
        mean_matrix: array of concept scores, one row per sample.
        dataset_name: selects the decoding rule.

    Returns:
        * shortmnist / clipshortmnist: two indices per sample, from
          ``find_two_largest_indices``.
        * boia / sddoia / clipboia / clipsddoia: float array with 1.0 where
          the score is strictly positive and 0.0 elsewhere.
        * kandinsky / clipkandinsky: output of ``find_concepts_kandinsky``.

    Raises:
        NotImplementedError: for any other dataset name (the matrix shape is
            printed first).
    """
    if dataset_name in ("shortmnist", "clipshortmnist"):
        return find_two_largest_indices(mean_matrix, dataset_name)

    if dataset_name in ("boia", "sddoia", "clipboia", "clipsddoia"):
        is_present = mean_matrix > 0
        return is_present.astype(float)

    if dataset_name in ("kandinsky", "clipkandinsky"):
        return find_concepts_kandinsky(mean_matrix)

    print(mean_matrix.shape)
    raise NotImplementedError("Dataset not present")

# Retrieve concepts and labels


In [125]:
def process_concepts_mnist(concepts):
    """Flatten a ``[n_samples, n_concepts]`` array into one 1-D vector.

    Values are laid out row by row, so the concepts of one sample stay
    adjacent in the result.
    """
    n_samples = concepts.shape[0]
    n_concepts = concepts.shape[1]
    return concepts.reshape(n_samples * n_concepts)

In [126]:
def process_concepts_kand(concepts):
    """Fold axis 1 of ``concepts`` into axis 0.

    Every slice ``concepts[:, i]`` is taken in order of ``i`` and the slices
    are stacked one below the other, so an ``[n, m, k]`` input becomes
    ``[m * n, k]`` with all rows of slice 0 first, then slice 1, and so on.
    """
    n_slices = concepts.shape[1]
    slices = [concepts[:, index] for index in range(n_slices)]
    return np.concatenate(slices, axis=0)

In [127]:
def retrive_concepts_and_labels(dataset, dataset_name, model_name, seed, layer, add):
    """Collect ground-truth concepts and load the matching predictions.

    Args:
        dataset: iterable yielding 3-tuples whose last element is a tensor of
            concepts (it must support ``.cpu().numpy()``).
        dataset_name: dataset identifier; selects decoding and flattening.
        model_name, seed, layer, add: parts of the prediction file name
            ``../output/concept_presence_<dataset>_<model>_<seed>_<layer><add>.npy``.

    Returns:
        ``(true_concepts, predicted_concepts)`` as arrays of identical shape.

    Raises:
        AssertionError: if the two arrays end up with different shapes.
    """
    presence_path = f"../output/concept_presence_{dataset_name}_{model_name}_{seed}_{layer}{add}.npy"

    # Ground truth: gather the concept tensor of every batch.
    concept_batches = [concepts.cpu().numpy() for _, _, concepts in tqdm(dataset)]
    true_concepts = np.concatenate(concept_batches, axis=0)

    # Predictions: raw scores from disk, decoded per dataset.
    predicted_concepts = predict_concepts(np.load(presence_path), dataset_name)

    # Some datasets need both arrays flattened the same way before scoring.
    if dataset_name in ("shortmnist", "clipshortmnist"):
        flatten = process_concepts_mnist
    elif dataset_name in ("kandinsky", "clipkandinsky"):
        flatten = process_concepts_kand
    else:
        flatten = None

    if flatten is not None:
        true_concepts = flatten(true_concepts)
        predicted_concepts = flatten(predicted_concepts)

    assert (
        true_concepts.shape == predicted_concepts.shape
    ), f" {true_concepts.shape}, {predicted_concepts.shape}"

    return true_concepts, predicted_concepts

# Metrics

In [128]:
class Metrics:
    """Plain container for the concept-level scores of one run.

    Holds the concept accuracy, the macro / micro / weighted F1 scores and a
    single collapse value. Attributes are assigned in the order listed in the
    constructor, which is also the order ``vars(instance)`` iterates them in.
    """

    def __init__(
        self,
        concept_accuracy,
        concept_f1_macro,
        concept_f1_micro,
        concept_f1_weighted,
        collapse,
    ):
        # Classification scores.
        self.concept_accuracy = concept_accuracy
        self.concept_f1_macro = concept_f1_macro
        self.concept_f1_micro = concept_f1_micro
        self.concept_f1_weighted = concept_f1_weighted
        # Overall collapse value.
        self.collapse = collapse


class BOIAMetrics(Metrics):
    """``Metrics`` extended with per-action collapse values.

    On top of the base scores it stores one collapse value for each of the
    forward, stop, left and right concept groups, plus their mean.
    """

    def __init__(
        self,
        concept_accuracy,
        concept_f1_macro,
        concept_f1_micro,
        concept_f1_weighted,
        collapse,
        collapse_forward,
        collapse_stop,
        collapse_left,
        collapse_right,
        mean_collapse,
    ):
        # Base attributes are set first, so they come first in ``vars()``.
        super().__init__(
            concept_accuracy=concept_accuracy,
            concept_f1_macro=concept_f1_macro,
            concept_f1_micro=concept_f1_micro,
            concept_f1_weighted=concept_f1_weighted,
            collapse=collapse,
        )
        # Per-group collapse values and their mean.
        self.collapse_forward = collapse_forward
        self.collapse_stop = collapse_stop
        self.collapse_left = collapse_left
        self.collapse_right = collapse_right
        self.mean_collapse = mean_collapse


class KandMetrics(Metrics):
    """``Metrics`` extended with shape / colour collapse values.

    On top of the base scores it stores one collapse value for shapes, one
    for colours, and their mean.
    """

    def __init__(
        self,
        concept_accuracy,
        concept_f1_macro,
        concept_f1_micro,
        concept_f1_weighted,
        collapse,
        collapse_shapes,
        collapse_color,
        mean_collapse,
    ):
        # Base attributes are set first, so they come first in ``vars()``.
        super().__init__(
            concept_accuracy=concept_accuracy,
            concept_f1_macro=concept_f1_macro,
            concept_f1_micro=concept_f1_micro,
            concept_f1_weighted=concept_f1_weighted,
            collapse=collapse,
        )
        # Per-attribute collapse values and their mean.
        self.collapse_shapes = collapse_shapes
        self.collapse_color = collapse_color
        self.mean_collapse = mean_collapse

# Confusion matrix

In [129]:
def plot_confusion_matrix(
    true_labels,
    predicted_labels,
    classes,
    normalize=False,
    title=None,
    cmap=plt.cm.Oranges,
):
    """Draw a confusion-matrix heatmap and optionally save it as a PDF.

    Args:
        true_labels: sequence of integer class indices; selects the row.
        predicted_labels: sequence of integer class indices; selects the
            column. Expected to have the same length as ``true_labels``.
        classes: tick labels; its length fixes the size of the matrix.
        normalize: if true, each row is divided by its sum. Rows of classes
            that never occur in ``true_labels`` become NaN.
        title: if truthy, the file path the figure is written to in PDF
            format. Despite the name it is not drawn as a heading.
        cmap: colormap passed to ``seaborn.heatmap``.
    """
    n_classes = len(classes)
    cm = np.zeros((n_classes, n_classes))
    for true_idx, pred_idx in zip(true_labels, predicted_labels):
        cm[true_idx, pred_idx] += 1

    if normalize:
        row_totals = cm.sum(axis=1, keepdims=True)
        # Empty rows divide 0 by 0; the NaN result is intended, the warning is not.
        with np.errstate(divide="ignore", invalid="ignore"):
            cm = cm / row_totals

    plt.figure(figsize=(8, 6))
    sns.set(font_scale=1.8)
    sns.heatmap(
        cm,
        annot=False,
        fmt=".2f" if normalize else "d",
        cmap=cmap,
        cbar=True,
        xticklabels=classes,
        yticklabels=classes,
    )
    plt.xticks(rotation=0)
    plt.yticks(rotation=0)
    plt.tight_layout()
    # Save only once the layout is final, so the file matches what is shown.
    if title:
        plt.savefig(title, format="pdf")
    plt.show()

# Compute Metrics

In [130]:
def compute_concept_collapse(true_concepts, predicted_concepts, multilabel=False):
    """Return ``1 - coverage`` of the true-vs-predicted confusion matrix.

    Args:
        true_concepts: ground-truth concept labels.
        predicted_concepts: predicted concept labels.
        multilabel: if true, both inputs are cast to ``int`` and passed
            through ``convert_to_categories`` first (presumably mapping each
            binary row to a single category id -- TODO confirm in
            ``utils.train``).

    Returns:
        One minus the value ``compute_coverage`` gives for the confusion
        matrix of the two label vectors.
    """
    y_true, y_pred = true_concepts, predicted_concepts
    if multilabel:
        y_true = convert_to_categories(y_true.astype(int))
        y_pred = convert_to_categories(y_pred.astype(int))

    coverage = compute_coverage(confusion_matrix(y_true, y_pred))
    return 1 - coverage

In [132]:
def _classification_scores(y_true, y_pred):
    """Return (accuracy, macro-F1, micro-F1, weighted-F1) for one label pair."""
    return (
        accuracy_score(y_true, y_pred),
        f1_score(y_true, y_pred, average="macro"),
        f1_score(y_true, y_pred, average="micro"),
        f1_score(y_true, y_pred, average="weighted"),
    )


def compute_metrics(true_concepts, predicted_concepts, dataset_name):
    """Compute concept accuracy, F1 scores and collapse for one run.

    Args:
        true_concepts: ground-truth concepts. Must be a 2-D array for the
            BOIA-like and Kandinsky-like datasets (it is sliced by column);
            passed to scikit-learn as is for any other dataset.
        predicted_concepts: predictions, same shape as ``true_concepts``.
        dataset_name: selects how concepts are grouped and scored.

    Returns:
        * ``BOIAMetrics`` for boia / sddoia / clipboia / clipsddoia: scores
          are averaged over the concept columns; collapse is computed for the
          whole vector and for column groups 0-2, 3-8, 9-14 and 15-20.
        * ``KandMetrics`` for kandinsky / minikandinsky / clipkandinsky:
          scores are the mean of the shape scores (columns 0-2) and the
          colour scores (columns 3-5); collapse is computed for shapes,
          colours, and the joint ``shape * 3 + colour`` code.
        * ``Metrics`` for every other dataset name.
    """
    is_boia_like = dataset_name in ("boia", "sddoia", "clipboia", "clipsddoia")
    is_kand_like = dataset_name in ("kandinsky", "minikandinsky", "clipkandinsky")

    if is_kand_like:
        true_shapes = true_concepts[:, :3].reshape(-1)
        true_colors = true_concepts[:, 3:6].reshape(-1)
        pred_shapes = predicted_concepts[:, :3].reshape(-1)
        pred_colors = predicted_concepts[:, 3:6].reshape(-1)

        # Total collapse is measured on one joint code per object.
        collapse = compute_concept_collapse(
            true_shapes * 3 + true_colors, pred_shapes * 3 + pred_colors, False
        )
    else:
        # Total collapse; BOIA-like concept vectors are multilabel.
        collapse = compute_concept_collapse(
            true_concepts, predicted_concepts, is_boia_like
        )

    if is_boia_like:
        # Additional collapse values, one per action group.
        collapse_forward, collapse_stop, collapse_left, collapse_right = (
            compute_concept_collapse(
                true_concepts[:, start:stop], predicted_concepts[:, start:stop], True
            )
            for start, stop in ((0, 3), (3, 9), (9, 15), (15, 21))
        )
        mean_collapse = np.mean(
            [collapse_forward, collapse_stop, collapse_left, collapse_right]
        )

        # One score per concept: column i across all samples, not sample i.
        score_pairs = [
            (true_concepts[:, i], predicted_concepts[:, i])
            for i in range(true_concepts.shape[1])
        ]
    elif is_kand_like:
        # Additional collapse values for colours and shapes separately.
        collapse_color = compute_concept_collapse(true_colors, pred_colors, False)
        collapse_shapes = compute_concept_collapse(true_shapes, pred_shapes, False)
        mean_collapse = np.mean([collapse_color, collapse_shapes])

        score_pairs = [(true_colors, pred_colors), (true_shapes, pred_shapes)]
    else:
        score_pairs = [(true_concepts, predicted_concepts)]

    # Average each of the four scores over the pairs selected above.
    concept_accuracy, concept_f1_macro, concept_f1_micro, concept_f1_weighted = np.mean(
        [_classification_scores(y_true, y_pred) for y_true, y_pred in score_pairs],
        axis=0,
    )

    if is_boia_like:
        return BOIAMetrics(
            concept_accuracy=concept_accuracy,
            concept_f1_macro=concept_f1_macro,
            concept_f1_micro=concept_f1_micro,
            concept_f1_weighted=concept_f1_weighted,
            collapse=collapse,
            collapse_forward=collapse_forward,
            collapse_stop=collapse_stop,
            collapse_right=collapse_right,
            collapse_left=collapse_left,
            mean_collapse=mean_collapse,
        )

    if is_kand_like:
        return KandMetrics(
            concept_accuracy=concept_accuracy,
            concept_f1_macro=concept_f1_macro,
            concept_f1_micro=concept_f1_micro,
            concept_f1_weighted=concept_f1_weighted,
            collapse=collapse,
            collapse_shapes=collapse_shapes,
            collapse_color=collapse_color,
            mean_collapse=mean_collapse,
        )

    return Metrics(
        concept_accuracy=concept_accuracy,
        concept_f1_macro=concept_f1_macro,
        concept_f1_micro=concept_f1_micro,
        concept_f1_weighted=concept_f1_weighted,
        collapse=collapse,
    )

In [133]:
def get_dataset(datasetname, args):
    """Instantiate the dataset class matching ``datasetname``.

    Args:
        datasetname: dataset identifier; matched case-insensitively.
        args: passed unchanged to the dataset constructor.

    Returns:
        A new instance of the selected dataset class.

    Raises:
        NotImplementedError: if ``datasetname`` matches no known dataset.
    """
    # Factories are lazy so that only the selected class is ever touched.
    factories = {
        "boia": lambda: BOIA(args),
        "sddoia": lambda: SDDOIA(args),
        "minikandinsky": lambda: MiniKandinsky(args),
        "shortmnist": lambda: SHORTMNIST(args),
        "kandinsky": lambda: Kandinsky(args),
        "clipkandinsky": lambda: CLIPKandinsky(args),
        "clipboia": lambda: CLIPBOIA(args),
        "clipsddoia": lambda: CLIPSDDOIA(args),
        "clipshortmnist": lambda: CLIPSHORTMNIST(args),
    }

    build = factories.get(datasetname.lower())
    if build is None:
        raise NotImplementedError(f"Dataset {datasetname} missing")
    return build()

In [134]:
# Settings for this evaluation run. They are handed to the dataset
# constructor; `dataset` and `model` are also used further down to locate
# the prediction files.
run_config = {
    "backbone": "neural",
    "preprocess": 0,
    "finetuning": 0,
    "batch_size": 1,
    "n_epochs": 20,
    "validate": 1,
    "dataset": "clipsddoia",
    "lr": 0.001,
    "exp_decay": 0.99,
    "warmup_steps": 1,
    "wandb": None,
    "task": "boia",
    "boia_model": "ce",
    "model": "sddoiann",
    "c_sup": 1,
    "which_c": [-1],
    "joint": True,
}
args = Namespace(**run_config)

# Build the dataset selected by the configuration.
dataset = get_dataset(args.dataset, args)

In [135]:
import os


def _plot_concept_confusions(ind_data, dataset_name, model_name, seed):
    """Plot normalised confusion matrices for one (true, predicted) pair."""
    true_concepts, predicted_concepts = ind_data[0], ind_data[1]

    if dataset_name in ["sddoia", "boia"]:
        # One matrix per action group; every column in a group is one bit.
        groups = (
            ("Forward", slice(0, 3)),
            ("Stop", slice(3, 9)),
            ("Left", slice(9, 15)),
            ("Right", slice(15, 21)),
        )
        for group_name, columns in groups:
            n_bits = columns.stop - columns.start
            plot_confusion_matrix(
                convert_to_categories(true_concepts[:, columns].astype(int)),
                convert_to_categories(predicted_concepts[:, columns].astype(int)),
                list(range(2**n_bits)),
                True,
                group_name,
            )
    else:
        # TODO: assumes 1-D labels in range(10); other datasets need their own plot.
        plot_confusion_matrix(
            true_concepts,
            predicted_concepts,
            list(range(10)),
            True,
            title=f"{dataset_name}_{model_name}_{seed}.pdf",
        )


def evaluate(
    test_set,
    dataset_name,
    model_name,
    ood_set=None,
    seeds=None,
    layers=None,
    add="",
    plot_confusion=False,
):
    """Print mean and standard deviation of every metric, per layer, over seeds.

    For each layer and seed the prediction file
    ``../output/concept_presence_<dataset>_<model>_<seed>_<layer><add>.npy``
    is looked up; seeds without a file are reported and skipped.

    Args:
        test_set: in-distribution data, iterated to collect true concepts.
        dataset_name: dataset identifier used in file names and for scoring.
        model_name: model identifier used in file names.
        ood_set: optional out-of-distribution data; when given, its metrics
            are computed and printed as well.
        seeds: seeds to aggregate over; defaults to the ten seeds used so far.
        layers: layer names to evaluate; defaults to ``["fc1", "fc2"]``.
        add: suffix appended to the layer name in the in-distribution file
            name (for example ``"_padd_random"``).
        plot_confusion: if true, also plot confusion matrices for every seed.

    Raises:
        AssertionError: if fewer than two prediction files exist for a layer.
    """
    if seeds is None:
        seeds = [123, 456, 789, 1011, 1213, 1415, 1617, 1819, 2021, 2223]
    if layers is None:
        # Other layer sets used so far:
        #   MNIST ADD: ["conv1", "conv2", "fc1", "fc2"]
        #   BOIA:      ["fc1", "fc2", "fc3", "fc4"]
        #   KAND:      ["conv1", "conv2", "conv3", "conv4", "conv5", "fc1", "fc2"]
        #   SDDOIA:    ["conv1", ..., "conv6", "fc1", "fc2"]
        layers = ["fc1", "fc2"]

    for layer in layers:

        print(f"\n LAYER: {layer}\n")

        in_metrics_list = []
        ood_metrics_list = []

        for seed in seeds:
            presence_path = f"../output/concept_presence_{dataset_name}_{model_name}_{seed}_{layer}{add}.npy"
            if not os.path.exists(presence_path):
                print(f"{presence_path} does not exists...")
                continue

            ind_data = retrive_concepts_and_labels(
                test_set, dataset_name, model_name, seed, layer, add
            )

            if plot_confusion:
                _plot_concept_confusions(ind_data, dataset_name, model_name, seed)

            in_metrics_list.append(compute_metrics(*ind_data, dataset_name))

            if ood_set is not None:
                # NOTE(review): the "ood" suffix gives "<layer>ood.npy" with no
                # separator -- confirm this matches the files on disk.
                out_data = retrive_concepts_and_labels(
                    ood_set, dataset_name, model_name, seed, layer, "ood"
                )
                ood_metrics_list.append(compute_metrics(*out_data, dataset_name))

        # Counted per layer: files found for one layer must not hide a gap in another.
        assert len(in_metrics_list) > 1, "At least 2 files to compare"

        # Every metrics object of a run has the same attributes.
        for key in vars(in_metrics_list[0]):
            # skip hidden elements
            if key.startswith("_"):
                continue

            label = key.replace("_", " ").title()

            in_values = np.array([getattr(metrics, key) for metrics in in_metrics_list])
            print(
                "\n{} (In): ${:.2f} \\pm {:.2f}$".format(
                    label,
                    round(np.mean(in_values), 2),
                    round(np.std(in_values), 2),
                )
            )

            # OOD statistics only exist when an OOD set was evaluated.
            if ood_set is not None:
                ood_values = np.array(
                    [getattr(metrics, key) for metrics in ood_metrics_list]
                )
                print(
                    "{} (OOD): ${:.2f} \\pm {:.2f}$".format(
                        label,
                        round(np.mean(ood_values), 2),
                        round(np.std(ood_values), 2),
                    )
                )

In [None]:
# Only the third loader (the test split) is evaluated; the first two are discarded.
_, _, test_loader = dataset.get_data_loaders()
# OOD evaluation is switched off; getattr(dataset, "ood_loader", None) enables it.
ood_loader = None
# Run the evaluation for the configured dataset and model.
evaluate(test_loader, args.dataset, args.model, ood_set=ood_loader)