In [2]:
import os
from PIL import Image
import numpy as np
from torch.utils.data import Dataset, DataLoader
import torch
import torch.nn as nn
import tqdm.notebook as tq
import time

# Import Dataset

The dataset consists of 6 folders, one per class; each folder contains the pictures belonging to one of the 6 movements.

Labels are one-hot encoded; the class index is the position of the class folder in alphabetical order:
1)
2)
3)
4)
5)
6)

In [3]:
# Root folder of the dataset: one sub-folder per class.
main_folder_path = "./Data/faces_6class"

# (width, height) every picture is resized to before being stored.
IMAGE_SIZE = (400, 400)

# Only directories are classes; stray files in the root folder are ignored.
# Sorting makes the class-name -> index mapping deterministic.
class_folders = sorted(
    entry
    for entry in os.listdir(main_folder_path)
    if os.path.isdir(os.path.join(main_folder_path, entry))
)
num_classes = len(class_folders)

# Map each class name to its integer index, i.e. the position of the 1
# in that class's one-hot vector.
labels = {class_folder: i for i, class_folder in enumerate(class_folders)}

images = []
one_hot_labels = []

for class_folder in class_folders:
    class_path = os.path.join(main_folder_path, class_folder)
    class_label = labels[class_folder]

    # Sorted so the sample order does not depend on the filesystem.
    for image_file in sorted(os.listdir(class_path)):
        image_path = os.path.join(class_path, image_file)
        if not os.path.isfile(image_path):
            continue

        # The context manager releases the file handle. convert("RGB") forces
        # three channels so every array has shape (400, 400, 3), whatever the
        # mode of the file on disk (grayscale, RGBA, palette, ...).
        with Image.open(image_path) as raw_img:
            img = raw_img.convert("RGB").resize(IMAGE_SIZE)

        images.append(np.array(img))

        # One-hot label vector for this sample.
        one_hot = np.zeros(num_classes)
        one_hot[class_label] = 1
        one_hot_labels.append(one_hot)

# Stack everything into arrays: 'images' holds the pixel data and
# 'one_hot_labels' holds the matching one-hot vector at the same index.
images = np.array(images)
one_hot_labels = np.array(one_hot_labels)

# Shuffle the dataset
Since the images are loaded class by class, we shuffle the dataset so that the classes are mixed before it is split.

In [4]:
# Fixed seed so the shuffle gives the same order on every run
# (Restart Kernel -> Run All reproduces the same result).
SHUFFLE_SEED = 42

# Get the number of samples
num_samples = len(images)
assert len(one_hot_labels) == num_samples, "images and labels must have the same length"

# Create a permutation index from a locally seeded generator, so the global
# NumPy random state is left untouched.
permutation = np.random.default_rng(SHUFFLE_SEED).permutation(num_samples)

# Shuffle both 'images' and 'labels' using the same permutation so every
# image keeps its own label.
shuffled_images = images[permutation]
shuffled_labels = one_hot_labels[permutation]

# Dataset Class

In [5]:
# Define the custom class for accessing our dataset
class DatasetRGB(Dataset):
    """Map-style dataset pairing every image with the label at the same index.

    Both containers are indexed with the same key; each lookup converts the
    selected image and label into ``torch`` tensors.
    """

    def __init__(self, images, labels):
        self.labels = labels
        self.images = images

    def __len__(self):
        # The label container defines how many samples the dataset has.
        return len(self.labels)

    def __getitem__(self, idx):
        """Return the ``(image, label)`` tensor pair stored at ``idx``."""
        label_tensor = torch.tensor(self.labels[idx])
        image_tensor = torch.tensor(self.images[idx])
        return image_tensor, label_tensor

# Train and Test functions

In [6]:
def test(model, validation_dataset, y_validation, RGB = False, is_ensamble = False, normalized_cm = False):
    # Stop parameters learning
    model.eval()

    validation_loader = torch.utils.data.DataLoader(validation_dataset)

    criterion = nn.CrossEntropyLoss()
    correct = 0
    total = 0
    total_loss = 0
    confusion_matrix = np.zeros((8, 8), dtype=int)
    i=0
    
    with torch.no_grad():
        for inputs, label in validation_loader:
            if(i%100==0):
                print(i,"/",(len(validation_dataset)))
            
            
            #predict label
            output = model(inputs.permute(0, 3, 1, 2))
           
            
            # Compute loss
            loss = criterion(output, label)
            total_loss += loss.item()

            max_index = torch.argmax(output).item()  # The index with maximum probability

            
            correct += (max_index == torch.argmax(label).item())
            i+=1

    
    accuracy = 100 * correct / len(y_validation)
    average_loss = total_loss / len(y_validation)

    model.train()
    return accuracy, average_loss, confusion_matrix

In [7]:
def train(model, dataset, validation_dataset, batch_size, num_epochs, learning_rate, verbose = False,
          reg=1e-5, RGB = False, is_ensamble = False, normalized_cm=False):
    """Train ``model`` with Adam and cross-entropy, validating after every epoch.

    Batches are read from ``dataset`` in shuffled order, moved to the training
    device (CUDA when available, otherwise CPU) and permuted from channels-last
    ``(N, H, W, C)`` to channels-first ``(N, C, H, W)`` before the forward pass.
    After each epoch the notebook's ``test`` function is run on
    ``validation_dataset``.

    Args:
        model: ``nn.Module`` to optimise; moved to the training device in place.
        dataset: ``torch.utils.data.Dataset`` yielding ``(image, label)`` pairs.
            Labels may be one-hot / probability vectors or integer class indices.
        validation_dataset: dataset evaluated after every epoch.
        batch_size: number of samples per training batch.
        num_epochs: number of passes over ``dataset``.
        learning_rate: Adam learning rate.
        verbose: if True, print tensors and per-sample predictions (very noisy).
        reg: weight decay passed to Adam.
        RGB: forwarded to ``test``.
        is_ensamble: forwarded to ``test``.
        normalized_cm: unused, kept for backward compatibility.

    Returns:
        ``(train_loss_list, train_acc_list, val_loss_list, val_acc_list)`` with
        one entry per epoch; accuracies are percentages.

    Raises:
        ValueError: if ``dataset`` is not a ``torch.utils.data.Dataset``.
    """
    if not isinstance(dataset, Dataset):
        raise ValueError("The dataset parameter should be an instance of torch.utils.data.Dataset.")

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    # Make sure dropout / batch-norm are in training mode even if the caller
    # handed over a model that was last used for evaluation.
    model.train()

    val_loss_list=[]
    val_acc_list=[]
    train_loss_list=[]
    train_acc_list=[]
    # Per-class count of training targets seen so far (only kept when verbose).
    counted_labels = None

    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=reg)
    criterion = nn.CrossEntropyLoss()

    data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
    num_batches = len(data_loader)

    # Labels handed to test(): taken from the validation dataset itself instead
    # of a notebook-level global, so the function does not depend on cell state.
    validation_labels = getattr(validation_dataset, "labels", validation_dataset)

    for epoch in range(num_epochs):
        running_loss = 0.0
        epoch_correct = 0   # correctly classified samples in this epoch
        epoch_seen = 0      # samples processed in this epoch

        # The context manager closes the bar even if the epoch is interrupted.
        with tq.tqdm(total=num_batches, unit="batch") as progress_bar:
            progress_bar.set_description(f"Epoch {epoch+1}/{num_epochs}")

            for batch_idx, batch in enumerate(data_loader):
                inputs,labels = batch[0],batch[1]
                if(verbose == True):
                    print("\ninputs shape:",inputs.size(),", dtype:",inputs.dtype," content: ",inputs)
                    print("min value:",torch.min(inputs))
                    print("max value:",torch.max(inputs))
                    print("\nlabels shape:",labels.size(),",dtype:",labels.dtype,", content: ",labels)

                # The model lives on `device`, so the batch has to as well.
                inputs = inputs.to(device)
                labels = labels.to(device)

                optimizer.zero_grad()
                inputs = inputs.permute(0, 3, 1, 2)
                outputs = model(inputs)

                if(verbose == True):
                    if counted_labels is None:
                        counted_labels = [0] * outputs.size(1)
                    print("\noutputs size:",outputs.size(),"content:",outputs)
                    print("List of labels until now:",counted_labels)

                # Targets are either class probabilities / one-hot vectors with
                # the same shape as `outputs`, or a vector of class indices.
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()
                running_loss += loss.item()

                # calculate train accuracy
                predictions = outputs.argmax(dim=1)
                targets = labels.argmax(dim=1) if labels.dim() == outputs.dim() else labels
                correct = int((predictions == targets).sum().item())
                batch_len = labels.size(0)

                if(verbose==True):
                    for index, output in enumerate(outputs):
                        max_index = predictions[index].item()
                        true_index = targets[index].item()
                        counted_labels[true_index] += 1
                        print("considering output at index {}: {}".format(index, output))
                        print("max output index = {}".format(max_index))
                        if(true_index == max_index):
                            print("correct! in fact labels[index] = {}, max_index = {}".format(true_index,max_index))
                        else:
                            print("NOT correct! in fact labels[index] = {}, max_index = {}".format(true_index,max_index))

                # Divide by the real batch length: the last batch of an epoch
                # can be shorter than `batch_size`.
                accuracy = 100 * correct / batch_len
                epoch_correct += correct
                epoch_seen += batch_len

                average_accuracy = 100 * epoch_correct / epoch_seen
                average_loss = running_loss / (batch_idx + 1)
                progress_bar.set_postfix({"avg_loss": average_loss, "acc": accuracy, "avg_acc": average_accuracy})

                progress_bar.update(1)

            # calculate train loss and accuracy for the whole epoch
            average_loss = running_loss / num_batches if num_batches else 0.0
            average_accuracy = 100 * epoch_correct / epoch_seen if epoch_seen else 0.0
            train_loss_list.append(average_loss)
            train_acc_list.append(average_accuracy)

            # calculate validation loss and accuracy
            val_acc, val_loss,_ = test(model, validation_dataset, validation_labels, RGB = RGB, is_ensamble = is_ensamble, normalized_cm=False)
            val_loss_list.append(val_loss)
            val_acc_list.append(val_acc)

            print(f"Epoch [{epoch+1}/{num_epochs}],Train Loss: {average_loss:.4f}. Train Accuracy: {average_accuracy} Val Loss: {val_loss} Val Accuracy: {val_acc}")
    return train_loss_list, train_acc_list, val_loss_list, val_acc_list

# Split into train, validation and test set

In [8]:
# Split the shuffled arrays into consecutive train / validation / test parts.

# Fractions used for training and validation (60% and 20%); whatever is left
# after those two parts becomes the test set.
train_percent = 0.6
validation_percent = 0.2

num_samples = len(shuffled_images)

train_size = int(train_percent * num_samples)
validation_size = int(validation_percent * num_samples)

# The three consecutive index ranges, applied identically to images and labels.
train_part = slice(None, train_size)
validation_part = slice(train_size, train_size + validation_size)
test_part = slice(train_size + validation_size, None)

x_train, x_validation, x_test = (
    shuffled_images[part] for part in (train_part, validation_part, test_part)
)

y_train, y_validation, y_test = (
    shuffled_labels[part] for part in (train_part, validation_part, test_part)
)

# Neural Network Architecture

In [27]:

class Raw_Images_NN(nn.Module):
    def __init__(self, num_classes):
        super(Raw_Images_NN, self).__init__()        
        # First convolutional layer with batch normalization
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3)
        self.bn1 = nn.BatchNorm2d(32)  # Batch normalization for the first convolutional layer
        self.relu1 = nn.ReLU()
        self.maxpool1 = nn.MaxPool2d(kernel_size=8)
        # First convolutional layer with batch normalization
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3)
        self.bn2 = nn.BatchNorm2d(64)  # Batch normalization for the first convolutional layer
        self.relu2 = nn.ReLU()
        self.maxpool2 = nn.MaxPool2d(kernel_size=4)
        
        
        # Fully connected layers with batch normalization and dropout
        self.fc1 = nn.Linear(7744, 128)  # Adjust the input size accordingly
        self.bn3 = nn.BatchNorm1d(128)  # Batch normalization for the first linear layer
        self.relu2 = nn.ReLU()
        self.dropout1 = nn.Dropout(0.4)  # Dropout after the first linear layer
        
        self.fc3 = nn.Linear(128, 64)  # Add another linear layer
        self.bn4 = nn.BatchNorm1d(64)  # Batch normalization for the second linear layer
        self.relu3 = nn.ReLU()
        self.dropout2 = nn.Dropout(0.3)  # Dropout after the second linear layer
        
        self.fc4 = nn.Linear(64, 32)  # Add one more linear layer
        self.bn5 = nn.BatchNorm1d(32)  # Batch normalization for the third linear layer
        self.relu4 = nn.ReLU()
        self.dropout3 = nn.Dropout(0.2)  # Dropout after the third linear layer
        
        self.fc5 = nn.Linear(32, num_classes)  # Output layer
        
    def forward(self, x):
        x = self.conv1(x.float())
        x = self.bn1(x)  # Apply batch normalization
        x = self.relu1(x)
        x = self.maxpool1(x)
        x = self.conv2(x)
        x = self.bn2(x)  # Apply batch normalization
        x = self.relu2(x)
        x = self.maxpool2(x)
        
        x = x.view(x.size(0), -1)  # Flatten
        
        x = self.fc1(x)
        x = self.bn3(x)  # Apply batch normalization
        x = self.relu2(x)
        x = self.dropout1(x)  # Apply dropout
        
        x = self.fc3(x)
        x = self.bn4(x)  # Apply batch normalization
        x = self.relu3(x)
        x = self.dropout2(x)  # Apply dropout
        
        x = self.fc4(x)
        x = self.bn5(x)  # Apply batch normalization
        x = self.relu4(x)
        x = self.dropout3(x)  # Apply dropout
        
        x = self.fc5(x)  # Output layer
        
        return x

In [28]:
# Wrap each (images, labels) split in a DatasetRGB so it can be read by a DataLoader.
train_set, validation_set, test_set = (
    DatasetRGB(split_images, split_labels)
    for split_images, split_labels in (
        (x_train, y_train),
        (x_validation, y_validation),
        (x_test, y_test),
    )
)

In [29]:
# The number of outputs must equal the width of the one-hot labels, so it is
# taken from `num_classes` (derived from the class folders) instead of a literal.
model = Raw_Images_NN(num_classes)

# Train for 50 epochs with batches of 16; the four lists hold one value per epoch.
train_loss_list, train_acc_list, val_loss_list, val_acc_list = train(
    model,
    train_set,
    validation_set,
    batch_size=16,
    num_epochs=50,
    learning_rate=0.001,
    reg=1e-5,
)


  0%|          | 0/35 [00:00<?, ?batch/s]

KeyboardInterrupt: 