In [2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import sklearn as sk
import torch
import torch.nn as nn
import torch.optim as optim
import os

from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from torchvision.models import resnet18,googlenet, densenet121,mobilenet_v3_small,mobilenet_v3_large
from sklearn.model_selection import train_test_split
from PIL import Image

def multiple_dir_files_to_df(dirs, label):
    df = pd.DataFrame()
    for idx,dir in enumerate(dirs):
        files = os.listdir(dir)
        files = [dir + '/' + file for file in files]
        df = pd.concat([df,pd.DataFrame({'file': files, 'label': label[idx]})])
    return df

common_path="Image/"

labels=["neutral","happy","sad","fear","angry","surprise","disgust"]

database=[common_path+label for label in labels]

def training_test_model(train_loader,test_loader,model,model_name,epochs=50, learning_rate=0.001,weight_decay=1e-4,gamma=0.9,early_stopping_patience=5,early_stopping_save=True):
    # Initialiser le modèle et le journal
    model = model
    patience = early_stopping_patience
    best_val_loss = float('inf')
    counter = 0
    accuracy_test_list=[]
    accuracy_train_list=[]
    loss_test_list=[]
    loss_train_list=[]
    
    # Entraîner le modèle
    num_epochs = epochs
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    # Définir l'optimiseur et la fonction de perte
    optimizer = optim.Adam(model.parameters(), lr=learning_rate,weight_decay=weight_decay)
    criterion = nn.CrossEntropyLoss()
    scheduler = optim.lr_scheduler.ExponentialLR(optimizer,gamma=gamma)
    # Entraîner le modèle
    for epoch in range(num_epochs):
        accuracy_train=[]
        loss_train=[]
        accuracy_test=[]
        loss_test=[]
        current_lr = [param_group['lr'] for param_group in optimizer.param_groups][0]

        model.train()
        for batch_idx, (data, targets) in enumerate(train_loader):
            # Get data to cuda if possible
            data = data.to(device=device)
            targets = targets.to(device=device)

            # forward
            scores = model(data)
            loss = criterion(scores, targets.long())

            # backward
            optimizer.zero_grad()
            loss.backward()

            optimizer.step()

            likelihood, predictions = scores.max(1)
            loss_train.append(loss.item())
            accuracy_train.append((predictions == targets).float().mean().item())

            # gradient descent or adam step
        scheduler.step()

        model.eval()
        for batch_idx, (data, targets) in enumerate(test_loader):
            # Get data to cuda if possible
            data = data.to(device=device)
            targets = targets.to(device=device)

            # forward
            scores = model(data)
            loss = criterion(scores, targets.long())
            
            likelihood, predictions = scores.max(1)
            loss_test.append(loss.item())
            accuracy_test.append((predictions == targets).float().mean().item())

        accuracy_train=np.array(accuracy_train).mean()
        loss_train=np.array(loss_train).mean()
        accuracy_test=np.array(accuracy_test).mean()
        loss_test=np.array(loss_test).mean()
        accuracy_test_list.append(accuracy_test)
        accuracy_train_list.append(accuracy_train)
        loss_test_list.append(loss_test)
        loss_train_list.append(loss_train)
        
        if early_stopping_save:
            if loss_test < best_val_loss:
                best_val_loss = loss_test
                counter = 0
            else:
                counter += 1
            if counter > patience:
                print("Early stopping at epoch: ",epoch)
                print("Saving model...")
                torch.save(model.state_dict(), f"./models_tg/{model_name+'_'+str(epoch)}.pth")
                return accuracy_test_list,accuracy_train_list,loss_test_list,loss_train_list
        print(f"Train epoch: {epoch}, accuracy = {accuracy_train} ,loss = {loss_train}, lr = {np.round(current_lr,6)}")
        print(f"Test epoch: {epoch}, accuracy = {accuracy_test} ,loss = {loss_test}")
    torch.save(model.state_dict(), f"./models/{model_name}.pth")
    return accuracy_test_list,accuracy_train_list,loss_test_list,loss_train_list

def plot_train_loss(accuracy_test_list,accuracy_train_list,loss_test_list,loss_train_list,model_name):
    plt.figure(figsize=(10,5))
    plt.plot(accuracy_test_list,label="accuracy_test")
    plt.plot(accuracy_train_list,label="accuracy_train")
    plt.xlabel("Epochs")
    plt.ylabel("Accuracy")
    plt.title("Accuracy for "+model_name+" model")
    plt.legend()
    plt.figure(figsize=(10,5))
    plt.plot(loss_test_list,label="loss_test")
    plt.plot(loss_train_list,label="loss_train")
    plt.xlabel("Epochs")
    plt.ylabel("Loss")
    plt.title("Loss for "+model_name+" model")
    plt.legend()


df = multiple_dir_files_to_df(database,np.linspace(0,6,7))
X,y=df["file"].tolist(),df["label"].tolist()

#Split the dataset into train and test
X_train,X_test,y_train,y_test=train_test_split(X,y,test_size=0.2)

print("Train head: ",X_train[:5],"\n",y_train[:5])
print("Test head: ",X_test[:5],"\n",y_test[:5])

# Définir un Dataset personnalisé
class CustomDataset(Dataset):
    def __init__(self, file_names, labels, transform=None):
        self.file_names = file_names
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.file_names)

    def __getitem__(self, idx):
        img_path = f'{self.file_names[idx]}' 
        image = Image.open(img_path).convert('RGB')

        if self.transform:
            image = self.transform(image)

        label = torch.tensor(self.labels[idx], dtype=torch.float32)
        return image, label
    
# Define the image transformations
transform = transforms.Compose([
    # transforms.CenterCrop((500, 500)),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

# Créer les jeux de données et les chargeurs de données
train_dataset = CustomDataset(X_train, y_train, transform)
test_dataset = CustomDataset(X_test, y_test, transform)
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)

class MobileNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.mobilenet = mobilenet_v3_small(weights='IMAGENET1K_V1')
        self.mobilenet.classifier[3] = nn.Linear(1024, 7)
    def forward(self, x):
        return self.mobilenet(x)

Train head:  ['Image/neutral/01_038.jpg', 'Image/angry/05_077.jpg', 'Image/happy/02_167.jpg', 'Image/happy/02_054.jpg', 'Image/angry/05_036.jpg'] 
 [0.0, 4.0, 1.0, 1.0, 4.0]
Test head:  ['Image/happy/02_195.jpg', 'Image/happy/02_067.jpg', 'Image/disgust/07_184.jpg', 'Image/neutral/01_017.jpg', 'Image/fear/04_204.jpg'] 
 [1.0, 1.0, 6.0, 0.0, 3.0]


In [4]:
#check if gpu is used
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cpu


In [None]:
model=MobileNet()
accuracy_test_list,accuracy_train_list,loss_test_list,loss_train_list=training_test_model(train_loader,test_loader,model,"MobileNet",epochs=50, learning_rate=0.001,weight_decay=1e-4,gamma=0.9,early_stopping_patience=5,early_stopping_save=True)