# In [None]:
# **Importation des bibliothèques nécessaires**
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam
from torch.optim.lr_scheduler import ReduceLROnPlateau
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
import os
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# **Seed the random number generators for reproducibility**
np.random.seed(42)
torch.manual_seed(42)

# **Pick the compute device: the GPU when one is available, otherwise the CPU**
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
if device.type == 'cuda':
    # Only touch the CUDA generator when CUDA is actually usable.
    torch.cuda.manual_seed(42)
print('Using device:', device)

# **Data augmentation helper**
def augmented_melspec(melspec):
    """
    Return one randomly augmented copy of a mel spectrogram.

    The array is wrapped into a single-sample, single-channel batch and
    passed through a Keras ``ImageDataGenerator`` configured with a width
    shift range of 0.05, a zoom range of 0.05 and random horizontal flips.
    The leading batch axis and trailing channel axis are stripped again
    before returning, so the result has the same shape as ``melspec``.

    NOTE(review): the generator treats the input as an image, so the shift
    and flip act along the image "width" axis (the last spectrogram axis
    here) -- confirm this is the axis that is meant to be perturbed.
    """
    generator = ImageDataGenerator(
        width_shift_range=0.05,
        zoom_range=0.05,
        horizontal_flip=True,
    )
    # Add a leading batch axis and a trailing channel axis.
    as_image_batch = melspec[np.newaxis, ..., np.newaxis]
    batches = generator.flow(as_image_batch, batch_size=1)
    first_batch = next(batches)
    return first_batch[0, :, :, 0]

# **Load the data: one CSV file per emotion class**
hap = pd.read_csv("aligned_hap_database.csv")
ang = pd.read_csv("aligned_ang_database.csv")
neu = pd.read_csv("aligned_neu_database.csv")
sad = pd.read_csv("aligned_sad_database.csv")

print('Shape hap:', hap.shape)
print('Shape ang:', ang.shape)
print('Shape neu:', neu.shape)
print('Shape sad:', sad.shape)

# **Prepare the features**
# Stack the four tables row-wise, keep columns 1..5200 (column 0 is dropped --
# presumably an index/identifier column, confirm against the CSV layout) and
# view each row as a 130 x 40 array (5200 = 130 * 40).
X = np.concatenate([frame.to_numpy() for frame in (hap, ang, neu, sad)])
X = X[:, 1:5201].reshape((-1, 130, 40))
print('Shape X :', X.shape)

# **Prepare the labels**
# Integer class ids in stacking order: 0 = hap, 1 = ang, 2 = neu, 3 = sad.
Y = np.repeat(
    np.arange(4),
    [hap.shape[0], ang.shape[0], neu.shape[0], sad.shape[0]],
).astype(int)
print('Shape Y:', Y.shape)

# **Model class definitions**
class LayerNorm(nn.Module):
    """Normalise the last axis of the input with a learnable scale and shift.

    The output is ``alpha * (x - mean) / (std + eps) + beta`` where ``mean``
    and ``std`` are taken over the last axis. ``std`` is ``Tensor.std`` with
    its default settings and ``eps`` is added to the standard deviation
    itself (not to the variance).

    Args:
        d: size of the last axis; ``alpha`` and ``beta`` have this length.
        eps: constant added to the standard deviation to avoid division by
            zero.
    """

    def __init__(self, d, eps=1e-6):
        super().__init__()
        self.eps = eps
        self.alpha = nn.Parameter(torch.ones(d))   # learnable scale
        self.beta = nn.Parameter(torch.zeros(d))   # learnable shift

    def forward(self, x):
        """Return the normalised, rescaled and shifted tensor (same shape as ``x``)."""
        centered = x - x.mean(dim=-1, keepdim=True)
        spread = x.std(dim=-1, keepdim=True) + self.eps
        return self.alpha * centered / spread + self.beta

class FeedForward(nn.Module):
    """Fully connected network with LayerNorm, activation and dropout per hidden layer.

    Layer widths are ``[in_d] + hidden + [out_d]``. Every layer except the
    last is followed by ``LayerNorm`` -> ``activation`` -> ``Dropout``; the
    last linear layer is applied on its own.

    Args:
        in_d: input width.
        out_d: output width.
        hidden: list of hidden-layer widths (may be empty).
        dropout: dropout probability used after every hidden layer.
        activation: callable applied after each hidden layer's LayerNorm.
    """

    def __init__(self, in_d, out_d, hidden, dropout=0.1, activation=F.relu):
        super().__init__()
        self.sigma = activation
        widths = [in_d] + hidden + [out_d]
        hidden_widths = widths[1:-1]

        # One linear map per consecutive pair of widths.
        self.layers = nn.ModuleList([
            nn.Linear(fan_in, fan_out)
            for fan_in, fan_out in zip(widths[:-1], widths[1:])
        ])
        # One normalisation and one dropout per hidden layer.
        self.ln = nn.ModuleList([LayerNorm(width) for width in hidden_widths])
        self.dp = nn.ModuleList([nn.Dropout(dropout) for _ in hidden_widths])

    def forward(self, x):
        """Apply the hidden stack, then the final linear layer."""
        # zip stops after the hidden layers: ``ln``/``dp`` hold one entry
        # fewer than ``layers``.
        for linear, norm, drop in zip(self.layers, self.ln, self.dp):
            x = drop(self.sigma(norm(linear(x))))
        return self.layers[-1](x)

def _inner_product(f1, f2, h):
    """Trapezoidal-rule inner product of ``f1`` and ``f2`` along axis 1.

    The pointwise product is averaged over each pair of neighbouring grid
    points and weighted by the step sizes ``h``; for 2-D inputs with ``J``
    columns and ``h`` of length ``J - 1`` the result has one column.
    """
    integrand = f1 * f2
    interval_means = (integrand[:, :-1] + integrand[:, 1:]) / 2
    return interval_means @ h[..., None]

def _l1(f, h):
    """Trapezoidal-rule L1 norm of each row of ``f`` on a grid with steps ``h``."""
    magnitude = torch.abs(f)
    unit = torch.ones_like(f)
    return _inner_product(magnitude, unit, h)

def _l2(f, h):
    """Trapezoidal-rule L2 norm of each row of ``f`` on a grid with steps ``h``."""
    squared_norm = _inner_product(f, f, h)
    return torch.sqrt(squared_norm)

class AdaFNN(nn.Module):
    """Bank of learnable basis functions evaluated on a fixed grid.

    The layer owns ``n_base * n_features`` small ``FeedForward(1, 1, ...)``
    networks. ``forward`` evaluates every network on the grid and returns,
    for each input channel ``i``, the trapezoidal inner products between
    that channel and basis networks ``i * n_base .. (i + 1) * n_base - 1``.

    ``R1`` and ``R2`` are regularisers computed on the L2-normalised bases
    cached by the most recent ``forward`` call, so ``forward`` must have
    been called at least once before either of them.

    Args:
        n_base: number of basis networks per input channel.
        base_hidden: list of hidden widths of every basis network.
        grid: 1-D sequence of grid points the bases are evaluated on.
        dropout: dropout probability inside the basis networks.
        lambda1: weight of the L1 regulariser (``R1``); 0 disables it.
        lambda2: weight of the pairwise inner-product regulariser (``R2``);
            0 disables it.
        device: device on which the grid tensors are created.
        n_features: number of input channels the layer is built for
            (previously hard-coded to 40).
    """

    def __init__(self, n_base, base_hidden, grid, dropout, lambda1, lambda2, device,
                 n_features=40):
        super().__init__()
        self.n_base = n_base
        self.n_features = n_features
        self.n_total_bases = n_base * n_features
        self.lambda1 = lambda1
        self.lambda2 = lambda2
        self.device = device

        grid = np.array(grid)
        # Registered as buffers so the grid follows ``module.to(...)``;
        # non-persistent so the state_dict keys stay exactly as before and
        # existing checkpoints still load.
        self.register_buffer(
            't', torch.tensor(grid, dtype=torch.float32).to(device), persistent=False
        )
        self.register_buffer(
            'h',
            torch.tensor(grid[1:] - grid[:-1], dtype=torch.float32).to(device),
            persistent=False,
        )

        self.BL = nn.ModuleList([
            FeedForward(1, 1, base_hidden, dropout, activation=F.selu)
            for _ in range(self.n_total_bases)
        ])

    def forward(self, x):
        """Project every channel of ``x`` onto its basis functions.

        Args:
            x: 3-D tensor unpacked as ``(B, J, m)``; axis 1 must match the
                grid length and ``m`` is expected to equal ``n_features``.

        Returns:
            Tensor of shape ``(B, m * n_base)`` holding the scores, channel
            by channel.
        """
        B, J, m = x.size()
        T = self.t.unsqueeze(-1)  # grid as a column: one scalar input per row

        # Each basis network maps the grid column to a column; transpose to
        # a single row per basis.
        self.bases = [basis(T).transpose(-1, -2) for basis in self.BL]
        # The norms are detached: they only rescale the bases used by the
        # regularisers and receive no gradient themselves.
        l2_norm = _l2(torch.cat(self.bases, dim=0), self.h).detach()
        self.normalized_bases = [
            basis / (l2_norm[i, 0] + 1e-6)
            for i, basis in enumerate(self.bases)
        ]

        scores = []
        for i in range(m):
            # Scores use the raw (un-normalised) bases of channel i.
            basis_scores = torch.cat([
                _inner_product(
                    b.repeat((B, 1)),
                    x[:, :, i],
                    self.h
                ) for b in self.bases[i * self.n_base:(i + 1) * self.n_base]
            ], dim=-1)
            scores.append(basis_scores)
        return torch.cat(scores, dim=-1)

    def R1(self, l1_k):
        """L1 regulariser: ``lambda1`` times the mean L1 norm of ``l1_k`` random normalised bases.

        Returns a zero tensor when ``lambda1`` is 0 or no basis would be
        sampled.
        """
        k = min(l1_k, self.n_total_bases)
        if self.lambda1 == 0 or k <= 0:
            return torch.tensor(0.0, device=self.h.device)
        selected = np.random.choice(self.n_total_bases, k, replace=False)
        selected_bases = torch.cat([self.normalized_bases[i] for i in selected], dim=0)
        return self.lambda1 * torch.mean(_l1(selected_bases, self.h))

    def R2(self, l2_pairs):
        """Pairwise regulariser: ``lambda2`` times the mean absolute inner product of random basis pairs.

        ``l2_pairs`` pairs of distinct normalised bases are drawn (capped
        at the number of distinct pairs). Returns a zero tensor when
        ``lambda2`` is 0, when ``n_base`` is 1 (kept as in the original,
        even if several channels exist) or when no pair would be sampled.
        """
        total = self.n_total_bases
        k = min(l2_pairs, total * (total - 1) // 2)
        if self.lambda2 == 0 or self.n_base == 1 or k <= 0:
            return torch.tensor(0.0, device=self.h.device)
        f1, f2 = [], []
        for _ in range(k):
            a, b = np.random.choice(total, 2, replace=False)
            f1.append(self.normalized_bases[a])
            f2.append(self.normalized_bases[b])
        f1 = torch.cat(f1, dim=0)
        f2 = torch.cat(f2, dim=0)
        return self.lambda2 * torch.mean(torch.abs(_inner_product(f1, f2, self.h)))

class MultiAda(nn.Module):
    """Classifier: Transformer encoder -> LSTM -> LayerNorm/ReLU -> AdaFNN -> feed-forward head.

    The feature width is fixed at 40 and the head produces 4 logits.
    ``dropout`` is forwarded to the ``AdaFNN`` layer only; the head uses
    ``FeedForward``'s default dropout.

    Args:
        n_base: number of basis functions per feature in the AdaFNN layer.
        base_hidden: hidden widths of the AdaFNN basis networks.
        grid: grid points handed to the AdaFNN layer.
        dropout: dropout probability of the AdaFNN basis networks.
        lambda1: L1 regularisation weight of the AdaFNN layer.
        lambda2: pairwise regularisation weight of the AdaFNN layer.
        device: device handed to the AdaFNN layer.
    """

    def __init__(self, n_base, base_hidden, grid, dropout, lambda1, lambda2, device):
        super().__init__()
        self.n_base = n_base
        self.device = device
        self.sigma = F.relu
        self.ln = LayerNorm(40)

        self.Ada = AdaFNN(
            n_base=n_base,
            base_hidden=base_hidden,
            grid=grid,
            dropout=dropout,
            lambda1=lambda1,
            lambda2=lambda2,
            device=device,
        )
        self.lstm = nn.LSTM(40, 40, batch_first=True)
        self.transformer_encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=40, nhead=4, batch_first=True),
            num_layers=2,
        )
        # The AdaFNN layer emits n_base scores for each of the 40 features.
        score_width = self.n_base * 40
        self.FF = FeedForward(score_width, 4, [score_width, score_width, 128])

    def forward(self, x):
        """Return the 4 class logits for a batch ``x``."""
        encoded = self.transformer_encoder(x)
        recurrent, _ = self.lstm(encoded)
        activated = self.sigma(self.ln(recurrent))
        return self.FF(self.Ada(activated))

    def L1(self, l1_k):
        """Delegate to the AdaFNN layer's ``R1`` regulariser."""
        return self.Ada.R1(l1_k)

    def L2(self, l2_pairs):
        """Delegate to the AdaFNN layer's ``R2`` regulariser."""
        return self.Ada.R2(l2_pairs)

# **Custom data loader**
class DataLoaderCustom:
    """Split, augment, standardise and batch an in-memory dataset.

    The samples are shuffled with ``random_seed`` and split into train,
    validation and test parts in the proportions given by ``split``. Only
    the training part is augmented (``n_augmented`` extra copies per sample
    through ``augmented_melspec``). A ``StandardScaler`` is fitted on the
    (augmented) training data, per column of the last axis, and applied to
    all three parts.

    Training batches drop the incomplete tail batch (as before); validation
    and test batches now include it, so every held-out sample is evaluated
    and a held-out part smaller than ``batch_size`` still yields one batch.

    Args:
        batch_size: number of samples per batch.
        n_augmented: augmented copies generated per training sample; 0
            disables augmentation.
        X: array unpacked as ``(n, J, m)``.
        Y: labels, indexable by the same permutation as ``X``.
        T: sequence whose first element is stored as ``self.t``.
        split: relative sizes of the train/validation/test parts.
        random_seed: seed passed to ``np.random.seed`` before shuffling
            (this reseeds NumPy's global generator).
    """

    def __init__(self, batch_size, n_augmented, X, Y, T, split=(8, 1, 1), random_seed=42):
        self.n, J, m = X.shape
        self.t = T[0]
        self.batch_size = batch_size

        # Train/validation sizes; the test part receives the remainder.
        total_parts = sum(split)
        train_n = self.n * split[0] // total_parts
        valid_n = self.n * split[1] // total_parts

        # Shuffle once, reproducibly, before splitting.
        np.random.seed(random_seed)
        indices = np.random.permutation(self.n)
        X = X[indices]
        Y = Y[indices]

        self.train_X = X[:train_n]
        self.train_Y = Y[:train_n]
        self.valid_X = X[train_n:train_n + valid_n]
        self.valid_Y = Y[train_n:train_n + valid_n]
        self.test_X = X[train_n + valid_n:]
        self.test_Y = Y[train_n + valid_n:]

        # Augment the training part only, so held-out data stays untouched.
        aug_X, aug_Y = [], []
        for sample, label in zip(self.train_X, self.train_Y):
            for _ in range(n_augmented):
                aug_X.append(augmented_melspec(sample))
                aug_Y.append(label)
        # Skip the concatenation when nothing was generated: an empty 1-D
        # array cannot be concatenated with the 3-D training data.
        if aug_X:
            self.train_X = np.concatenate((self.train_X, np.array(aug_X)))
            self.train_Y = np.concatenate((self.train_Y, np.array(aug_Y)))

        # Standardise with statistics from the training data only.
        self.scaler = StandardScaler()
        self.train_X = self.scaler.fit_transform(self.train_X.reshape(-1, m)).reshape(-1, J, m)
        self.valid_X = self.scaler.transform(self.valid_X.reshape(-1, m)).reshape(-1, J, m)
        self.test_X = self.scaler.transform(self.test_X.reshape(-1, m)).reshape(-1, J, m)

        self.train_batches = self._count_batches(len(self.train_X), batch_size, drop_last=True)
        self.valid_batches = self._count_batches(len(self.valid_X), batch_size, drop_last=False)
        self.test_batches = self._count_batches(len(self.test_X), batch_size, drop_last=False)

    @staticmethod
    def _count_batches(n_samples, batch_size, drop_last):
        """Return how many batches cover ``n_samples``; the partial tail counts unless ``drop_last``."""
        full, remainder = divmod(n_samples, batch_size)
        if remainder and not drop_last:
            return full + 1
        return full

    def _iter_batches(self, X, Y, n_batches):
        """Yield ``n_batches`` consecutive ``(float32 x, int64 y)`` tensor pairs."""
        for i in range(n_batches):
            window = slice(i * self.batch_size, (i + 1) * self.batch_size)
            yield (
                torch.tensor(X[window], dtype=torch.float32),
                torch.tensor(Y[window], dtype=torch.long),
            )

    def shuffle(self):
        """Shuffle the training samples in place (uses NumPy's global generator)."""
        indices = np.random.permutation(len(self.train_X))
        self.train_X = self.train_X[indices]
        self.train_Y = self.train_Y[indices]

    def get_train_batch(self):
        """Yield the training batches; an incomplete last batch is dropped."""
        return self._iter_batches(self.train_X, self.train_Y, self.train_batches)

    def get_valid_batch(self):
        """Yield the validation batches, including an incomplete last batch."""
        return self._iter_batches(self.valid_X, self.valid_Y, self.valid_batches)

    def get_test_batch(self):
        """Yield the test batches, including an incomplete last batch."""
        return self._iter_batches(self.test_X, self.test_Y, self.test_batches)

# **Hyperparameter configuration**
batch_size = 128
n_augmented = 3          # augmented copies generated per training sample
split = (80, 10, 10)     # train / validation / test proportions
# 130 grid points on [0, 1], one per row of each 130 x 40 sample.
T = [np.linspace(0, 1, 130)]
grid = T[0].tolist()

# **Custom data loader initialisation**
dataLoader = DataLoaderCustom(
    batch_size=batch_size,
    n_augmented=n_augmented,
    X=X,
    Y=Y,
    T=T,
    split=split,
)

# **Model definition**
n_base = 6
base_hidden = [256, 256, 256, 256]
dropout = 0.1
# Regularisation weights, each paired with its sample count
# (bases for the L1 term, basis pairs for the pairwise term).
lambda1 = 0.0
l1_k = 2
lambda2 = 0.5
l2_pairs = 3
model = MultiAda(n_base, base_hidden, grid, dropout, lambda1, lambda2, device).to(device)

# **Training configuration**
epochs = 100
optimizer = Adam(model.parameters(), lr=1e-3)
# The scheduler is stepped on the validation loss (see scheduler.step below).
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=5)
criterion = nn.CrossEntropyLoss()

# **Training loop**
# Per-epoch loss history, plotted after training.
train_losses = []
valid_losses = []
# Early-stopping state: best validation loss seen so far and the number of
# consecutive epochs without improvement.
best_valid_loss = float('inf')
early_stopping_counter = 0
early_stopping_patience = 10

for epoch in range(1, epochs + 1):
    model.train()
    # Reorder the training samples so each epoch sees different batches.
    dataLoader.shuffle()
    batch_losses = []

    for x_batch, y_batch in dataLoader.get_train_batch():
        x_batch, y_batch = x_batch.to(device), y_batch.to(device)
        optimizer.zero_grad()
        outputs = model(x_batch)
        loss = criterion(outputs, y_batch)
        # Add the basis regularisers. They read the bases cached by the
        # forward pass above, so they must come after model(x_batch).
        loss += model.L1(l1_k) + model.L2(l2_pairs)
        loss.backward()
        optimizer.step()
        batch_losses.append(loss.item())

    # The reported training loss includes the regularisation terms.
    train_loss = np.mean(batch_losses)
    train_losses.append(train_loss)

    # Validation: cross-entropy only, without the regularisers and without
    # gradient tracking.
    model.eval()
    val_losses = []
    with torch.no_grad():
        for x_val, y_val in dataLoader.get_valid_batch():
            x_val, y_val = x_val.to(device), y_val.to(device)
            outputs = model(x_val)
            loss = criterion(outputs, y_val)
            val_losses.append(loss.item())

    valid_loss = np.mean(val_losses)
    valid_losses.append(valid_loss)
    scheduler.step(valid_loss)

    print(f"Epoch {epoch}/{epochs}, Training Loss: {train_loss:.4f}, Validation Loss: {valid_loss:.4f}")

    # Early stopping: checkpoint the weights on every improvement and stop
    # after `early_stopping_patience` consecutive epochs without one.
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'best_model.pth')
        early_stopping_counter = 0
    else:
        early_stopping_counter += 1
        if early_stopping_counter >= early_stopping_patience:
            print("Early stopping triggered!")
            break

# **Load the best model**
# map_location remaps the stored tensors onto the active device, so a
# checkpoint written on a GPU still loads when this is re-run on a
# CPU-only machine.
model.load_state_dict(torch.load('best_model.pth', map_location=device))

# **Evaluate the model on the test set**
model.eval()
test_preds = []
test_labels = []
with torch.no_grad():
    for x_test, y_test in dataLoader.get_test_batch():
        x_test, y_test = x_test.to(device), y_test.to(device)
        outputs = model(x_test)
        # Predicted class = index of the largest logit in each row.
        _, preds = torch.max(outputs, 1)
        test_preds.extend(preds.cpu().numpy())
        test_labels.extend(y_test.cpu().numpy())

# **Compute the metrics**
print("Classification Report:")
target_names = ['Happy', 'Angry', 'Neutral', 'Sad']
# Pin the label set explicitly. Without it scikit-learn infers the classes
# from the data, so a class missing from the test split makes the report
# raise (label count != len(target_names)) and shrinks the confusion matrix,
# misaligning the tick labels below.
class_labels = list(range(len(target_names)))
print(classification_report(
    test_labels, test_preds, labels=class_labels, target_names=target_names
))

# **Display the confusion matrix**
cm = confusion_matrix(test_labels, test_preds, labels=class_labels)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', xticklabels=target_names, yticklabels=target_names, cmap='Blues')
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix')
plt.show()

# **Display the loss curves**
plt.figure(figsize=(10, 5))
plt.plot(train_losses, label='Training Loss')
plt.plot(valid_losses, label='Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.title('Training and Validation Loss')
plt.show()

# **Save the trained model**
# NOTE(review): this pickles the whole module object, not just its weights;
# loading it later needs the same class definitions to be importable.
torch.save(model, 'emotion_recognition_model.pth')

# **Save the loss curves to disk**
loss_fig, loss_ax = plt.subplots()
loss_ax.plot(train_losses, label='Training Loss')
loss_ax.plot(valid_losses, label='Validation Loss')
loss_ax.set_xlabel('Epochs')
loss_ax.set_ylabel('Loss')
loss_ax.legend()
loss_ax.set_title('Training and Validation Loss')
loss_fig.savefig('loss_curves.png')

print("Training complete. Model saved as 'emotion_recognition_model.pth'.")