## Imports

In [1]:
import sys
import os
import random

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from collections import OrderedDict 
from sklearn import metrics, model_selection
from torch.optim import Adam

sys.path.append(os.path.abspath(''))

import utils.more_torch_func as mtf

from compiling_nn.build_odd import compile_nn
from datasets.loan import get_loan_dataset
from utils.custom_activations import StepActivation
from utils.custom_loss import AsymBCELoss

pd.options.mode.copy_on_write = True

## Load data

In [2]:
np_x, np_y = get_loan_dataset(balancing=True, discretizing=False, hot_encoding=True, rmv_pct=0.985)
x_data, y_data = torch.Tensor(np_x), torch.Tensor(np_y)
input_size = x_data.size(1)
print(x_data.size())

torch.Size([280, 14])


## Metrics

In [3]:
def cm(y_true, y_pred):
    confusion_matrix = metrics.confusion_matrix(y_true, y_pred)
    cm_display = metrics.ConfusionMatrixDisplay(confusion_matrix, display_labels=[False, True])
    return cm_display

def plot_cm(y_true, y_pred):
    cm_display = cm(y_true, y_pred)
    fig, ax = plt.subplots(1, 1, figsize=(4,8))
    cm_display.plot(ax=ax, colorbar=False)

def plot_combine_cm(cms, titles=None):
    n = len(cms)
    fig, axs = plt.subplots(1, n, figsize=(4*n, 8))
    if titles:
        for ax, cm, title in zip(axs, cms, titles):
            cm.plot(ax=ax, colorbar=False)
            ax.set_title(title)
    else:
        for ax, cm in zip(axs, cms):
            cm.plot(ax=ax, colorbar=False)
    fig.tight_layout()

def cov_score(y_true, y_pred):
    labels = np.unique(y_true)
    scores = {}

    for label in labels:
        indices_true = np.where(y_true == label)[0]
        indices_pred = np.where(y_pred == label)[0]
        scores[label] = len(np.intersect1d(indices_true, indices_pred))/len(indices_true)

    return scores

def cross_valid(X, Y, train_func, skf, **kw_train):
    for train_index, test_index in skf.split(X, Y):
        x_train, x_test = X[train_index], X[test_index]
        y_train, y_test = Y[train_index], Y[test_index]

        model, y_pred = train_func(x_train, y_train, **kw_train)
        model.eval()
        yield y_pred.detach().round(), y_train, model(x_test).detach(), y_test

def tnot(a): return torch.logical_not(a)
def tor(a,b): return torch.logical_or(a,b)
def tand(a,b): return torch.logical_and(a,b)
def txor(a,b): return torch.logical_xor(a,b)

## Networks

In [4]:
class ApproxNet(nn.Module):
    def __init__(self):
        super().__init__()
        
        hl1 = 10

        self.nn = nn.Sequential(OrderedDict([
            ('l1', nn.Linear(input_size,hl1)),
            ('a1', StepActivation()),
            ('l2', nn.Linear(hl1,1)),
            ('a2', StepActivation())
        ]))        

    def forward(self, x):
        x = self.nn(x)

        return x

class CentralNet(nn.Module):
    def __init__(self):
        super().__init__()

        hl1 = 50
        hl2 = 25

        self. nn = nn.Sequential(OrderedDict([
            ('l1', nn.Linear(input_size,hl1)),
            ('a1', nn.Sigmoid()),
            ('l2', nn.Linear(hl1,hl2)),
            ('a2', nn.Sigmoid()),
            ('l3', nn.Linear(hl2,1)),
            ('a3', StepActivation()),
        ]))
    
    def forward(self, x):
        x = self.nn(x)

        return x

class BoundNetResults():
    def __init__(self, x, xnn, xapprox1, xapprox2):
        self.x = x
        self.nn = xnn
        self.a1 = xapprox1
        self.a2 = xapprox2

    def __getattr__(self, name):
        if hasattr(self.x, name):
            return getattr(self.x, name)
        else:
            raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'")
    
    def __dir__(self):
        return dir(self.x)
    
    def tensors(self):
        for v in self.__dict__.values():
            yield v

    def detach(self):
        for t in self.tensors():
            t.detach()
        return self
    
    def round(self, *args):
        for t in self.tensors():
            t.round(*args)
        return self

    def __str__(self):
        return '\n'.join([str(t) for t in self.tensors()])

class BoundNet(nn.Module):
    def __init__(self):
        super().__init__()
        
        self.a1 = ApproxNet()
        self.a2 = ApproxNet()
        self.nn = CentralNet()

    def forward(self, x):
        xa1 = self.a1(x)
        xa2 = self.a2(x)
        xnn = self.nn(x)

        # /!\ to change for backward propagation /!\
        # maximum ???
        x = mtf.bitwise_big_or(*[torch.round(t).to(bool) for t in (xa1, xa2, xnn)])

        x = BoundNetResults(x, xnn, xa1, xa2)

        return x

## Training functions

In [5]:
def train_model(x, y, model, loss_fn, optimizer, max_epoch):
    for _ in range(max_epoch):
        model.train()
        y_pred = model(x)
        
        loss = loss_fn(y_pred, y)

        model.zero_grad()
        loss.backward()
        optimizer.step()

    return model, y_pred

def train_boundnet(x, y, max_epoch=5000, learning_rate=1e-2, weight_decay=1e-6):
    model = BoundNet()
    loss_fn = nn.BCELoss()
    optimizer = Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)

    return train_model(x, y, model, loss_fn, optimizer, max_epoch=max_epoch)

## Network evaluation

In [6]:
activation = {}
def get_activation(name):
    def hook(model, input, output):
        activation[name] = output.detach()
    return hook

def show_activation(act, output):
    # Mean activation per output
    act_ones  = torch.where(output==1, act, torch.zeros(act.size()))
    act_zeros = torch.where(output==0, act, torch.zeros(act.size()))

    mean_ones  = torch.mean(act_ones, dim=0)
    mean_zeros = torch.mean(act_zeros, dim=0)

    # Figure initialization
    fig, ax = plt.subplots(2, 1)
    tick_kw = {'left': False, 'bottom': False, 'labelleft': False}

    # Normalize cmap accross both images
    min_act = min(mean_ones.min().item(), mean_zeros.min().item())
    max_act = max(mean_ones.max().item(), mean_zeros.max().item())

    color_map = 'PRGn'

    ax[0].imshow(mean_zeros.unsqueeze(0), cmap=color_map, vmin=min_act, vmax=max_act)
    ax[0].tick_params(**tick_kw)
    ax[0].set_title("activation moyenne de la couche cachée avec 0 en sortie")

    ax[1].imshow(mean_ones.unsqueeze(0), cmap=color_map, vmin=min_act, vmax=max_act)
    ax[1].tick_params(**tick_kw)
    ax[1].set_title("activation moyenne de la couche cachée avec 1 en sortie")

    # Show text on cells
    for i, (v0, v1) in enumerate(zip(mean_zeros, mean_ones)):
        ax[0].text(i, 0, f"{v0.item():.2f}", ha="center", va="center")
        ax[1].text(i, 0, f"{v1.item():.2f}", ha="center", va="center")
    
    fig.tight_layout()
    plt.show()

def compute_activation(net, layer, data):
    net.eval()
    getattr(net, layer).register_forward_hook(get_activation('__net__'))
    output = net(data).detach()
    act = activation.pop('__net__').squeeze()
    show_activation(act, output)

In [7]:
skf = model_selection.StratifiedKFold(n_splits=10, shuffle=True, random_state=104)
bnet_split_res = cross_valid(x_data, y_data, train_boundnet, skf)

dict_metrics = {(modelname, metric, key): [] for modelname in ("boundnet", "simplenet", "hinet", "lonet") 
                for metric in ("f1score", "coverage0", "coverage1") for key in ("valid", "train")}

for i, (bt, tt, bv, tv) in enumerate(bnet_split_res):
    prompts = []
    sep_model = f"{'|':^9}"
    for k, b, t in [["valid", bv, tv], ["train", bt, tt]]:
        b_f1_score = metrics.f1_score(t, b, average="binary")
        s_f1_score = 0 # TODO
        prompts.append(f"{'BoundNet':<15}{b_f1_score:.3f}{sep_model}{'SimpleNet':<15}{s_f1_score:.3f}")
        dict_metrics[('boundnet', 'f1score', k)].append(b_f1_score)
        dict_metrics[('simplenet', 'f1score', k)].append(s_f1_score)

        a1_f1_score = metrics.f1_score(t, b.a1, average="binary")
        a2_f1_score = metrics.f1_score(t, b.a2, average="binary")
        prompts.append(f"{'Approx 1':<15}{a1_f1_score:.3f}{sep_model}{'Approx 2':<15}{a2_f1_score:.3f}")
        dict_metrics[('a1net', 'f1score', k)].append(a1_f1_score)
        dict_metrics[('a2net', 'f1score', k)].append(a2_f1_score)

        a1_cov_score = cov_score(t, b.a1)
        a2_cov_score = cov_score(t, b.a2)
        prompts.append(f"{'Approx 1':<15}{a1_cov_score[1]:.3f}{sep_model}{'Approx 2':<15}{a2_cov_score[1]:.3f}")
        prompts.append(f"{'Approx 1':<15}{a1_cov_score[0]:.3f}{sep_model}{'Approx 2':<15}{a2_cov_score[0]:.3f}")
        dict_metrics[('a1net', 'coverage1', k)].append(a1_cov_score[1])
        dict_metrics[('a2net', 'coverage1', k)].append(a2_cov_score[1])
        dict_metrics[('a1net', 'coverage0', k)].append(a1_cov_score[0])
        dict_metrics[('a2net', 'coverage0', k)].append(a2_cov_score[0])

    sep_tv = f"{'||':^10}"
    print(f"Fold {i+1:3} :            {'Valid':^49}{sep_tv}{'Train':^49}",
          f"\tF1 score      {prompts[0]}{sep_tv}{prompts[4]}",
          f"\t              {prompts[1]}{sep_tv}{prompts[5]}",
          f"\tCoverage      {prompts[2]}{sep_tv}{prompts[6]}",
          f"\t              {prompts[3]}{sep_tv}{prompts[7]}",
          sep='\n')

TypeError: binary_cross_entropy(): argument 'input' (position 1) must be Tensor, not BoundNetResults

In [None]:
df_metrics = pd.DataFrame.from_dict(dict_metrics, orient='index')
mean_metrics = df_metrics.mean(axis=1)
mean_prompts = [
    f"{'BoundNet':<15}{mean_metrics[('boundnet', 'f1score', 'valid')]:.3f}{sep_model}{'SimpleNet':<15}{mean_metrics[('simplenet', 'f1score', 'valid')]:.3f}",
    f"{'Approx 1':<15}{mean_metrics[('a1net', 'f1score', 'valid')]:.3f}{sep_model}{'Approx 2':<15}{mean_metrics[('a2net', 'f1score', 'valid')]:.3f}",
    f"{'Approx 1':<15}{mean_metrics[('a1net', 'coverage1', 'valid')]:.3f}{sep_model}{'Approx 2':<15}{mean_metrics[('a2net', 'coverage1', 'valid')]:.3f}",
    f"{'Approx 1':<15}{mean_metrics[('a1net', 'coverage0', 'valid')]:.3f}{sep_model}{'Approx 2':<15}{mean_metrics[('a2net', 'coverage0', 'valid')]:.3f}",
    f"{'BoundNet':<15}{mean_metrics[('boundnet', 'f1score', 'train')]:.3f}{sep_model}{'SimpleNet':<15}{mean_metrics[('simplenet', 'f1score', 'train')]:.3f}",
    f"{'Approx 1':<15}{mean_metrics[('a1net', 'f1score', 'train')]:.3f}{sep_model}{'Approx 2':<15}{mean_metrics[('a2net', 'f1score', 'train')]:.3f}",
    f"{'Approx 1':<15}{mean_metrics[('a1net', 'coverage1', 'train')]:.3f}{sep_model}{'Approx 2':<15}{mean_metrics[('a2net', 'coverage1', 'train')]:.3f}",
    f"{'Approx 1':<15}{mean_metrics[('a1net', 'coverage0', 'train')]:.3f}{sep_model}{'Approx 2':<15}{mean_metrics[('a2net', 'coverage0', 'train')]:.3f}",
]
print(f"Average  :            {'Valid':^49}{sep_tv}{'Train':^49}",
          f"\tF1 score      {mean_prompts[0]}{sep_tv}{mean_prompts[4]}",
          f"\t              {mean_prompts[1]}{sep_tv}{mean_prompts[5]}",
          f"\tCoverage      {mean_prompts[2]}{sep_tv}{mean_prompts[6]}",
          f"\t              {mean_prompts[3]}{sep_tv}{mean_prompts[7]}",
          sep='\n')