In [None]:
# %load_ext tensorboard
import random
import warnings
from copy import deepcopy
from functools import partial
from typing import Dict, List, Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pytorch_lightning as pl
import seaborn as sns
import torch
import torch.nn as nn
from scipy import stats
from sklearn.preprocessing import MinMaxScaler, OneHotEncoder
from torch.utils.data import DataLoader, random_split
from torchmetrics.classification import BinaryAccuracy

from src.dataset_handler.classic_dataset import ClassicDataset
from src.examples.utils_heart import preprocess_heart_data
from src.models.lightning_wrapper import LightningWrapper
from src.models.multi_linear_layers import MultiLinearLayers
from src.neurons_importance import NeuronsImportance

warnings.simplefilter("ignore")

# TODO
Déceler les petits bugs restants, surtout en fin de traitement.

In [None]:
# %tensorboard --logdir ../lightning_logs/

In [None]:
checkpoints = '/home/lbaret/projects/explainability_sandbox/lightning_logs/version_0/checkpoints/epoch=99-step=9200.ckpt'

df_test = pd.read_csv('../data/test_heart.csv')
inputs, labels = df_test_prep = preprocess_heart_data(df_test)

In [None]:
test_set = ClassicDataset(inputs, labels)
test_loader = DataLoader(test_set, batch_size=8, shuffle=False)

network = MultiLinearLayers(inputs.shape[1], 1)
loss_function = nn.BCEWithLogitsLoss()

model = LightningWrapper(network, loss_function, metrics={'accuracy': BinaryAccuracy().to('cuda')})
model.load_from_checkpoint(checkpoint_path=checkpoints)

Bel entrainement maintenant, essayons de comprendre les distributions des neurones de chacune des couches, afin de savoir si c'est uniformément distribués où chaque neurone présente sa spécificité.

Récupérons le test set pour pouvoir commencer à travailler dessus. Un modèle simple + un cas binaire pour généraliser ensuite.

In [None]:
submodel = deepcopy(model.wrapped_model)
submodel.eval()

# Autant tout faire passer d'un coup
X_test = []
y_test = []
for x, y in test_set:
    X_test.append(x.unsqueeze(0))
    y_test.append(y)

X_test = torch.cat(X_test)
y_test = torch.cat(y_test)

# On peut chercher les logits du modèle
outputs = submodel(X_test)

BinaryAccuracy()(outputs, y_test.unsqueeze(1))

Super, maintenant entammons une analyse poussée de nos couches en sorties.

In [None]:
out_neurons_collector = {}
in_neurons_collector = {}
def forward_hook(module: nn.Module, inputs: torch.Tensor, outputs: torch.Tensor, name: str, out_neurons_collector: Dict[str, List[torch.Tensor]],
                 in_neurons_collector: Dict[str, List[torch.Tensor]]) -> None:
    if name in out_neurons_collector.keys():
        out_neurons_collector[name].append(outputs.detach().cpu())
    else:
        out_neurons_collector[name] = [outputs.detach().cpu()]
    
    if name in in_neurons_collector.keys():
        in_neurons_collector[name].append(inputs[0].detach().cpu())
    else:
        in_neurons_collector[name] = [inputs[0].detach().cpu()]

hooks = []
for name, module in submodel.named_modules():
    if name != '':
        hooks.append(module.register_forward_hook(partial(forward_hook, name=name, out_neurons_collector=out_neurons_collector, in_neurons_collector=in_neurons_collector)))

outputs = submodel(X_test)

for h in hooks:
    h.remove()

for layer_name, neurons in out_neurons_collector.items():
    out_neurons_collector[layer_name] = deepcopy(torch.cat(out_neurons_collector[layer_name]))

for layer_name, neurons in in_neurons_collector.items():
    in_neurons_collector[layer_name] = deepcopy(torch.cat(in_neurons_collector[layer_name]))

In [None]:
out_neurons_collector['linear2'].mean(dim=0), out_neurons_collector['linear2'].std(dim=0)

Avons nous à faire à des distributions normales ? Un test statistique pourra nous le dire !

In [None]:
normal_samples = {}
shapiro_pvalues = {}

for layer_name, layer_tensor in out_neurons_collector.items():
    normal_samples[layer_name] = []
    shapiro_pvalues[layer_name] = []

    for i in range(layer_tensor.shape[1]):
        samples = layer_tensor[:, i]
        shapiro_test = stats.shapiro(samples)

        normal_samples[layer_name].append(True if shapiro_test.pvalue > 0.05 else False)
        shapiro_pvalues[layer_name].append(shapiro_test.pvalue)

    normal_samples[layer_name] = torch.BoolTensor(normal_samples[layer_name])
    shapiro_pvalues[layer_name] = torch.Tensor(shapiro_pvalues[layer_name])

In [None]:
fig, ax = plt.subplots(figsize=(10, 5))

sorted_indices = shapiro_pvalues['linear1'].sort()[1][:10]
df_distplot = pd.DataFrame()
for c, ind in enumerate(sorted_indices):
    sns.distplot(out_neurons_collector['linear1'][:, ind.item()], ax=ax)

plt.show()

Souvent bimodal, c'est intéressant !

In [None]:
fig, ax = plt.subplots(figsize=(10, 5))

sorted_indices = shapiro_pvalues['linear1'].sort(descending=True)[1][:10]
df_distplot = pd.DataFrame()
for c, ind in enumerate(sorted_indices):
    sns.distplot(out_neurons_collector['linear1'][:, ind.item()], ax=ax, label=ind)

plt.legend()
plt.show()

Ont-ils simplement un effet régularisateur ?

Dans deux cas, où se situent les valeurs pour la classe 0 et la classe 1 ?

In [None]:
fig, ax = plt.subplots(figsize=(10, 5))

sorted_indices = shapiro_pvalues['linear1'].sort()[1][:10]
df_distplot = pd.DataFrame()
for c, ind in enumerate(sorted_indices):
    sns.distplot(out_neurons_collector['linear1'][:, ind.item()], ax=ax, label=ind)

plt.legend()
plt.show()

In [None]:
sorted_indices = shapiro_pvalues['linear1'].sort()[1][:10]

df_linear_1_unnormal = pd.DataFrame(data=out_neurons_collector['linear1'][:, sorted_indices])
df_linear_1_unnormal['label'] = y_test

sns.displot(data=df_linear_1_unnormal.melt(id_vars=['label']), x='value', hue='label', col='variable', col_wrap=3, alpha=0.5, kde=True);

In [None]:
sorted_indices = shapiro_pvalues['linear1'].sort(descending=True)[1][:10]

df_linear_1_normal = pd.DataFrame(data=out_neurons_collector['linear1'][:, sorted_indices])
df_linear_1_normal['label'] = y_test

sns.displot(data=df_linear_1_normal.melt(id_vars=['label']), x='value', hue='label', col='variable', col_wrap=3, alpha=0.5, kde=True);

Admettons que nous avons des neurones que l'on considère comme dissociatifs, pouvons nous aisément repérer les poids ou l'ensemble des poids qui vont maintenir ou créer une dissociation par la suite ? \
Si les poids n'accentuent, ne conservent ou ne créent pas de dissociation alors ça va être difficile de repérer les neurones importants. \
Une de mes hypothèses est que les neurones dont les distributions sont confondues ne sont présents qu'à des fins de régulation et ne peuvent être considérés comme importants car non dissociés.

La question que nous pouvons nous poser maintenant est : quels sont les neurones qui présentent la plus grosse dissociation ? Que ce soit fortement positif ou négatif. Si présence de ReLU alors la négation est considérée comme nul et alors le signal qui est émit ne vient que d'une classe en soi.

Dans la méthode, nous ne pouvons négliger le théorème central limite, bien que nous ayons l'équivalent de 2 variables aléatoires par neurone (l'effet peut être plus remarquable avec plus de classes par exemple). L'union des deux distributions (de la classe 0 et de la classe 1) peut donner une distribution normale. Donc, il serait intéressant de mesurer 2 valeurs :
1. La divergence de *Kullback-Leibler* (sûrement le plus intéressant)
2. La moyenne et l'écart-type.

In [None]:
X1 = np.random.normal(loc=5.0, scale=1.5, size=(10000,))
X2 = np.random.normal(loc=5.0, scale=1.5, size=(1000,))
X3 = np.random.normal(loc=3.0, scale=1.0, size=(50,))
X4 = np.random.normal(loc=5.0, scale=2.0, size=(10000,))

stats.mannwhitneyu(X1, X2), stats.mannwhitneyu(X1, X3), stats.mannwhitneyu(X1, X4)

In [None]:
X_positive_1 = df_linear_1_normal[df_linear_1_normal['label'] == 1.][1].to_numpy()
X_negative_1 = df_linear_1_normal[df_linear_1_normal['label'] == 0.][1].to_numpy()

X_positive_9 = df_linear_1_normal[df_linear_1_normal['label'] == 1.][9].to_numpy()
X_negative_9 = df_linear_1_normal[df_linear_1_normal['label'] == 0.][9].to_numpy()

stats.mannwhitneyu(X_positive_1, X_negative_1), stats.mannwhitneyu(X_positive_9, X_negative_9)

Utilisons ce test statistique pour déterminer l'importance d'un neurone. Procédons par étapes :
1. Récupérons le dictionnaire des outputs d'une couche
2. Séparons les valeurs par rapport à leur classe
3. Effectuons les tests statistiques suivants :
   1. Si distributions normales ou proches : t-test (ou Student)
   2. Sinon : test de Mann-Whitney U
      1. Si semblables : cherchons voir si la moyenne/variance est similaire

In [None]:
negative_indices = torch.where(y_test == 0)[0]
positive_indices = torch.where(y_test == 1)[0]

layers_important_neurons = {}
layers_non_important_neurons = {}

for layer_name, layer_outputs in out_neurons_collector.items():
    if layer_name == 'fc':
        continue
    linear1 = out_neurons_collector['linear1']

    important_neurons = torch.zeros(size=(layer_outputs.shape[1],), dtype=torch.bool)

    for i in range(layer_outputs.shape[1]):
        samples = layer_outputs[:, i]
        important_neurons[i] = torch.BoolTensor([NeuronsImportance.neuron_is_important(samples, positive_indices, negative_indices)])[0]

    layers_important_neurons[layer_name] = torch.where(important_neurons == True)[0]
    layers_non_important_neurons[layer_name] = torch.where(important_neurons == False)[0]

Pour tester notre méthode, prenons aléatoirement et en répétant 10 fois, des neurones importants, effacons les (à 0) puis évaluons la perte en performance (faire pareil avec les neurones pas importants).

In [None]:
layers_important_neurons

Pour masquer les neurones il faut masquer tous les poids et biais arrivant à ce neurone. Ce faisant nous empêchons tout signal d'arriver jusqu'à ce neurone et donc il ne sera pas utile pour la suite des traitements.

Cependant ce qu'il faut prendre en compte, c'est que cette méthode statistique ne permet pas de prendre en compte toutes les interactions entre les neurones des couches successives. Donc éteindre le signal provenant d'un neurone peut avoir une conséquence dans la couche suivante. Il faut donc trouver une amélioration à cette méthode.

In [None]:
important_masked_accuracy = []
non_important_masked_accuracy = []
for _ in range(10):
    important_masked_model = NeuronsImportance.mask_important_neurons(submodel, layers_important_neurons, percentage_masked=0.9)
    non_important_masked_model = NeuronsImportance.mask_important_neurons(submodel, layers_non_important_neurons, percentage_masked=0.9)

    important_outputs = important_masked_model(X_test)
    non_important_outputs = non_important_masked_model(X_test)

    important_accuracy = BinaryAccuracy()(important_outputs, y_test.unsqueeze(1))
    non_important_accuracy = BinaryAccuracy()(non_important_outputs, y_test.unsqueeze(1))

    important_masked_accuracy.append(important_accuracy)
    non_important_masked_accuracy.append(non_important_accuracy)

In [None]:
important_masked_accuracy

In [None]:
non_important_masked_accuracy

Trouver un modèle plus gros, et commencer à réfléchir aux interactions !

Pour les interactions, je pense faire couche par couche afin de noter les changements de distribution d'une couche vers sa suivante, lorsque l'on désactive cette couche. Nous sommes donc dans le cas de la causalité à nous demander ce qu'il se passerait sur la couche suivante si on coupe le signal de la couche précédente.

# Traitement par rapport aux distributions

Afin de simplifier l'approche, commençons par traiter les données en exploitant les quantiles.
Récupérons les quantiles suivants :
1. 0.05 -> Valeur extrême faible (on pourrait récupérer 0.25 à la place)
2. 0.5 -> Valeur moyenne
3. 0.95 -> Valeur extrême forte (on pourrait récupérer 0.75 à la place)
   
=> Une fois chacune des valeurs étudiées en sortie du réseau de neurones, nous pouvons dresser un intervalle de variation sur les valeurs que peuvent prendre la sortie. Si nous procédons de la sortie vers l'entrée du réseau, nous pouvons déterminer un chemin de neurones idéal. Pourquoi procéder ainsi ? Car il est plus logique d'étudier les interactions entre les neurones directes, comme si nous étions dans le cadre d'un graphe. Les neurones précédents influencants directement les valeurs des neurones suivants.

Pour procéder, découpons le traitement :
1. Définir un hook qui prend en input le dictionnaire du tenseur des quantiles et des valeurs à mettre à zéro.
2. À chaque itération, nous reculons d'une couche en donnant les indices des neurones ayant été impactés à l'itération précédente.
3. Nous faisons ainsi varier les valeurs de neurones en entrée qui vont directement impacter les neurones déterminés à l'itération précédente.
4. Nous répétons ces étapes afin de dresser les chemins de neurones optimaux.

De cette façon, nous aurons une chance de découvrir les caractéristiques essentielles du jeu de données offrant la plus grosse variation de la sortie pour une classe donnée, bien que nous commençons par le cas simple binaire.

In [None]:
def get_layers_ordered(module: nn.Module, module_name: str=None) -> List[nn.Module]:
    """
        Get ordered layers in the network as a list of tuples containing : (<name of layer as str>, <layer module as nn.Module>)
    """
    children_modules = list(module.named_children())
    leaves_modules = []
    for child_name, child_mod in children_modules:
        leaves_modules += get_layers_ordered(child_mod, child_name)
    
    if len(children_modules) == 0 and len(list(module.parameters())) > 0:
        return [(module_name, module)]
    
    return leaves_modules
    
layers = get_layers_ordered(model)
layers.reverse()

all_quantiles = [0., 0.05, 0.25, 0.5, 0.75, 0.95, 1.]
quantiles_by_layers = {}
for layer_name, layer_inputs in in_neurons_collector.items():
    quantiles_by_layers[layer_name] = {
        q: torch.quantile(layer_inputs, q=torch.scalar_tensor(q), dim=0)
        for q in all_quantiles
    }

In [None]:
def quantile_hook_pre_forward(module: nn.Module, inputs: Tuple[torch.Tensor], remplacement_tensor: torch.Tensor, mask: torch.Tensor) -> torch.Tensor:
    return (remplacement_tensor * mask, )

In [None]:
model.eval()

variations = []
with torch.no_grad():
    for ni in range(layers[0][1].in_features):
        for q in all_quantiles:
            mask = torch.zeros(size=(layers[0][1].in_features,))
            mask[ni] = 1
            
            h = layers[0][1].register_forward_pre_hook(partial(quantile_hook_pre_forward, remplacement_tensor=quantiles_by_layers[layers[0][0]][q], mask=mask))
            out = model(torch.zeros(size=(1, 17)))
            variations.append(out)

            h.remove()

variations = torch.cat(variations)
variations = variations.reshape(layers[0][1].in_features, len(all_quantiles))

In [None]:
def _quantile_hook_pre_forward(module: nn.Module, inputs: Tuple[torch.Tensor], remplacement_tensor: torch.Tensor,
                                  mask: torch.Tensor) -> torch.Tensor:
        return (remplacement_tensor * mask, )

class DistributionNeuronsImportance:
    """
        Works only with straightforward models. Parallel processing into the model is not handled (e.g. ResNet downsampling in parallel with block process).
    """
    def __init__(self, network: nn.Module, num_features: int, target_label: int, in_neurons_collector: Dict[str, torch.Tensor], target_layer_name: str=None, 
                 process_quantiles: List[float]=[0., 0.05, 0.25, 0.5, 0.75, 0.95, 1.], positive_minimum_quantile: float=0.95, 
                 negative_maximum_quantile: float=0.05) -> None:
        self.network = network
        self.network.eval()

        self.num_features = num_features
        self.target_label = target_label

        self.positive_minimum_quantile = positive_minimum_quantile
        self.negative_maximum_quantile = negative_maximum_quantile

        self.target_layer_name = None
        self.target_layer_module = None
        self._set_target_layer_infos(target_layer_name)

        layers = self.get_layers_ordered(model)
        layers.reverse()
        self.layers = layers

        self.process_quantiles = process_quantiles
        self.in_neurons_collector = in_neurons_collector

        self.quantiles_by_layer = self._set_quantiles_by_layers()

    def _set_target_layer_infos(self, target_layer_name: str) -> nn.Module:
        if target_layer_name is None:
            name, mod = [(name, mod) for name, mod in self.network.named_modules()][-1]
        else:
            mod = self.network.get_submodule(target_layer_name)
            name = target_layer_name

        self.target_layer_name = name
        self.target_layer_module = mod

    def _set_quantiles_by_layers(self) -> Dict[float, torch.Tensor]:
        quantiles_by_layers = {}
        for layer_name, layer_inputs in self.in_neurons_collector.items():
            quantiles_by_layers[layer_name] = {
                q: torch.quantile(layer_inputs, q=torch.scalar_tensor(q), dim=0)
                for q in self.process_quantiles
            }
        
        return quantiles_by_layers
    
    def get_layers_ordered(self, module: nn.Module, module_name: str=None) -> List[nn.Module]:
        """
            Get ordered layers in the network as a list of tuples containing : (<name of layer as str>, <layer module as nn.Module>)
        """
        children_modules = list(module.named_children())
        leaves_modules = []
        for child_name, child_mod in children_modules:
            leaves_modules += get_layers_ordered(child_mod, child_name)
        
        if len(children_modules) == 0 and len(list(module.parameters())) > 0:
            return [(module_name, module)]
        
        return leaves_modules
    
    def _get_positive_indices_from_variations(self, variations: torch.Tensor) -> torch.Tensor:
        variations_of_outputs = variations[:, len(self.process_quantiles)-1] - variations[:, 0]

        positive_indices_variations = torch.where(variations_of_outputs > 0)[0]
        positive_quantile_threshold = torch.quantile(variations_of_outputs[positive_indices_variations], q=self.positive_minimum_quantile).item()
        best_positive_variations_of_outputs = torch.where(variations_of_outputs >= positive_quantile_threshold)[0]
        best_positive_variations_of_outputs = best_positive_variations_of_outputs.sort(descending=False).values

        return best_positive_variations_of_outputs
    
    def _get_negative_indices_from_variations(self, variations: torch.Tensor) -> torch.Tensor:
        variations_of_outputs = variations[:, len(self.process_quantiles)-1] - variations[:, 0]

        negative_indices_variations = torch.where(variations_of_outputs < 0)[0]
        negative_quantile_threshold = torch.quantile(variations_of_outputs[negative_indices_variations], q=self.negative_maximum_quantile).item()
        best_negative_variations_of_outputs = torch.where(variations_of_outputs <= negative_quantile_threshold)[0]
        best_negative_variations_of_outputs = best_negative_variations_of_outputs.sort(descending=False).values

        return best_negative_variations_of_outputs

    def _get_hooked_results_model(self, layer_module: nn.Module, replacement_tensor: torch.Tensor, mask: torch.Tensor) -> torch.Tensor:
        hook = layer_module.register_forward_pre_hook(
            partial(_quantile_hook_pre_forward, remplacement_tensor=replacement_tensor, mask=mask)
        )
        out = self.network(torch.zeros(size=(1, self.num_features)))
        hook.remove()
        
        return out
    
    def _compute_layer_variations_search(self, layer_module: nn.Module, module_name: str, previous_positive_neurons: torch.Tensor,
                                         previous_negative_neurons: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        positive_variations = []
        negative_variations = []
        with torch.no_grad():
            for ni in range(layer_module.in_features):
                for q in self.process_quantiles:
                    positive_mask = torch.zeros(size=(layer_module.out_features, layer_module.in_features))
                    positive_mask[previous_positive_neurons, ni] = 1

                    positive_variations.append(
                        self._get_hooked_results_model(layer_module, self.quantiles_by_layer[module_name][q], positive_mask)
                    )

                    negative_mask = torch.zeros(size=(layer_module.out_features, layer_module.in_features))
                    negative_mask[previous_negative_neurons, ni] = 1

                    negative_variations.append(
                        self._get_hooked_results_model(layer_module, self.quantiles_by_layer[module_name][q], negative_mask)
                    )
        
        return positive_variations, negative_variations
    
    def get_neurons_importance_indices(self) -> Dict[str, Dict[str, torch.Tensor]]:      
        """ Iterate over network layers to compute impotant neurons indices which have positive and negative effects on the target label.
            Furthermore, only nn.Linear layers are handled for the moment.
        
        :param test: _description_
        :type test: None
        :return: positive and negative effects indices for each layers
        :rtype: Dict[str, Dict[str, torch.Tensor]]
        """
        previous_positive_neurons = torch.tensor([self.target_label], dtype=torch.long)
        previous_negative_neurons = torch.tensor([self.target_label], dtype=torch.long)

        important_neurons_indices = {}
        for name, module in self.layers:
            print(previous_positive_neurons.shape)
            positive_variations, negative_variations = self._compute_layer_variations_search(
                module, name, previous_positive_neurons, previous_negative_neurons
            )

            positive_variations = torch.cat(positive_variations)
            positive_variations = positive_variations.reshape(module.in_features, self.target_layer_module.out_features, len(self.process_quantiles))[:, self.target_label, :]

            negative_variations = torch.cat(negative_variations)
            negative_variations = negative_variations.reshape(module.in_features, self.target_layer_module.out_features, len(self.process_quantiles))[:, self.target_label, :]
            
            best_positive_variations_of_outputs = self._get_positive_indices_from_variations(positive_variations)
            best_negative_variations_of_outputs = self._get_positive_indices_from_variations(negative_variations)
            
            important_neurons_indices[name] = {
                'positive': best_positive_variations_of_outputs,
                'negative': best_negative_variations_of_outputs
            }

            previous_positive_neurons = best_positive_variations_of_outputs.clone()
            previous_negative_neurons = best_negative_variations_of_outputs.clone()

        return important_neurons_indices

In [None]:
512*7

In [None]:
distrib_neur_imp = DistributionNeuronsImportance(model.wrapped_model, 17, 0, in_neurons_collector, 'fc')
distrib_neur_imp.get_neurons_importance_indices()

In [None]:
1024*512*7

In [None]:
model