# Started from here!

In [1]:
import torch
import torch.nn.functional as F
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader

import cv2
import numpy as np
from PIL import Image
import os

import mediapipe as mp

import heapq
import itertools
from sklearn.decomposition import PCA

  warn(f"Failed to load image Python extension: {e}")
  from pandas.core.computation.check import NUMEXPR_INSTALLED


In [2]:
test_dir = 'data/merged/test/'
train_dir = 'data/merged/train/'

classes = os.listdir(train_dir)
test_cls = os.listdir(train_dir)
num_classes = len(classes)

In [3]:
print(
    f'Number of classes: {num_classes}',
    f'\nClasses: {classes}'
)

Number of classes: 7 
Classes: ['fear', 'angry', 'sad', 'neutral', 'surprise', 'disgust', 'happy']


In [4]:
print("--------Train--------")
# check the number of images in each class
for cls in classes:
    print(f'{cls}: {len(os.listdir(train_dir + cls))}')

print("\n--------Test--------")
# check the number of images in each class
for cls in classes:
    print(f'{cls}: {len(os.listdir(test_dir + cls))}')

--------Train--------
fear: 3288
angry: 3196
sad: 3852
neutral: 3948
surprise: 3456
disgust: 5674
happy: 5786

--------Test--------
fear: 810
angry: 800
sad: 979
neutral: 1018
surprise: 873
disgust: 1390
happy: 1430


In [5]:
import os
import imghdr
import shutil

def delete_irrelevant_files(directory, classes):
    for cls in classes:
        class_dir = os.path.join(directory, cls)
        with os.scandir(class_dir) as entries:
            for entry in entries:
                # Delete directories
                if entry.is_dir():
                    print(f'Deleting directory: {entry.path}')
                    shutil.rmtree(entry.path)

                # Check if the file is empty and delete it
                elif os.path.getsize(entry.path) == 0:
                    print(f'Deleting empty file: {entry.path}')
                    os.remove(entry.path)

                # Check if the file is not an image and delete it
                elif not imghdr.what(entry.path):
                    print(f'Deleting non-image file: {entry.path}')
                    os.remove(entry.path)

print("--------Train--------")
delete_irrelevant_files(train_dir, classes)

print("\n--------Test--------")
delete_irrelevant_files(test_dir, classes)


--------Train--------

--------Test--------


# Landmark detection from images and correlation of those landmark positions

In [6]:
image_size = torch.Size([48, 48, 3])
batch_size = 32

# Compute the total size of the image input
image_input_size = np.prod(image_size) * batch_size * 4. / (1024 ** 2.)
print(f"Image input size: {image_input_size:.2f} MB")

Image input size: 0.84 MB


In [7]:
class LandmarkDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.data = []
        self.labels = []  # new list to store the label index
        for i, label in enumerate(os.listdir(root_dir)):
            class_dir = os.path.join(root_dir, label)
            for img_name in os.listdir(class_dir):
                img_path = os.path.join(class_dir, img_name)
                image = cv2.imread(img_path)
                self.data.append(image)
                self.labels.append(i)  # add the label index for this image

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        image = self.data[idx]
        label = self.labels[idx]  # get the label index for this image

        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = Image.fromarray(image)

        if self.transform:
            image = self.transform(image)

        return image, label  # return the image and label index

In [8]:
# Train the model
transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.RandomResizedCrop(48, scale=(0.8, 1.0), ratio=(0.75, 1.3333333333333333), interpolation=2),
    transforms.Resize((48, 48)),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])


train_dataset = LandmarkDataset(train_dir, transform=transform)
val_dataset = LandmarkDataset(test_dir, transform=transform)



In [9]:
train_dataloader = DataLoader(train_dataset, batch_size=256, shuffle=True, num_workers=24)
val_dataloader = DataLoader(val_dataset, batch_size=256, shuffle=False, num_workers=24)

In [10]:
# show shape of the train_dataloader
for i, data in enumerate(train_dataloader):
    print(i, data[0].shape, data[1].shape)
    break

# show shape of the val_dataloader
for i, data in enumerate(val_dataloader):
    print(i, data[0].shape, data[1].shape)
    break

# torch.Size([4, 3, 48, 48]) torch.Size([4])
# torch.Size([4, 3, 48, 48]) torch.Size([4])

0 torch.Size([256, 3, 48, 48]) torch.Size([256])
0 torch.Size([256, 3, 48, 48]) torch.Size([256])


In [11]:
class MultiHeadAttention(nn.Module):
    def __init__(self, input_dim, num_heads):
        super(MultiHeadAttention, self).__init__()
        self.input_dim = input_dim
        self.num_heads = num_heads
        self.head_dim = input_dim // num_heads

        self.query = nn.Linear(input_dim, input_dim)
        self.key = nn.Linear(input_dim, input_dim)
        self.value = nn.Linear(input_dim, input_dim)

        self.fc = nn.Linear(input_dim, input_dim)

    def scaled_dot_product_attention(self, query, key, value):
        qk = torch.matmul(query, key.transpose(-2, -1))
        dk = query.size(-1)
        scaled_attention_logits = qk / torch.sqrt(torch.tensor(dk, dtype=torch.float32))

        attention_weights = F.softmax(scaled_attention_logits, dim=-1)
        output = torch.matmul(attention_weights, value)
        return output, attention_weights

    def split_heads(self, x):
        batch_size = x.size(0)
        x = x.view(batch_size, -1, self.num_heads, self.head_dim)
        return x.transpose(1, 2)

    def forward(self, x):
        batch_size = x.size(0)

        query = self.query(x)
        key = self.key(x)
        value = self.value(x)

        query = self.split_heads(query)
        key = self.split_heads(key)
        value = self.split_heads(value)

        out, attention_weights = self.scaled_dot_product_attention(query, key, value)
        out = out.transpose(1, 2).contiguous().view(batch_size, -1, self.input_dim)

        out = self.fc(out)
        
        return out

In [12]:
class FacialExpressionDetectionModel(nn.Module):
    def __init__(self, num_classes, num_heads):
        super(FacialExpressionDetectionModel, self).__init__()
        
        # Block-1
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=3, padding=1, bias=False),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(32),
            nn.Conv2d(32, 32, kernel_size=3, padding=1, bias=False),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(32),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Dropout(0.5)
        )

        # Block-2
        self.conv2 = nn.Sequential(
            nn.Conv2d(32, 64, kernel_size=3, padding=1, bias=False),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(64),
            nn.Conv2d(64, 64, kernel_size=3, padding=1, bias=False),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(64),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Dropout(0.5)
        )

        # Block-3
        self.conv3 = nn.Sequential(
            nn.Conv2d(64, 128, kernel_size=3, padding=1, bias=False),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(128),
            nn.Conv2d(128, 128, kernel_size=3, padding=1, bias=False),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(128),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Dropout(0.25)
        )

        # Block-4
        self.conv4 = nn.Sequential(
            nn.Conv2d(128, 256, kernel_size=3, padding=1, bias=False),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(256),
            nn.Conv2d(256, 256, kernel_size=3, padding=1, bias=False),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(256),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Dropout(0.125)
        )

        # Block-5
        self.fc1 = nn.Sequential(
            nn.Linear(64 * 6 * 6, 128),
            nn.ReLU(inplace=True),
            nn.BatchNorm1d(128),
            nn.Dropout(0.25)
        )

        # Attention Layer
        self.attention = MultiHeadAttention(128, num_heads)
        # Change the output size to match the input size of fc2
        self.attention_transform = nn.Linear(128, 64)

        # Block-6
        self.fc2 = nn.Sequential(
            nn.Linear(64, 128),  # Update the input size to 64
            nn.ReLU(inplace=True),
            nn.BatchNorm1d(128),
            nn.Dropout(0.125),
        )

        # Block-7
        self.fc3 = nn.Linear(128, num_classes)

    def forward(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)
        x = x.view(x.size(0), -1)
        x = self.fc1(x)
        x = self.attention(x)
        x = x.view(x.size(0), -1)  # Flatten the output of the attention layer
        x = self.attention_transform(x)
        #print(x.shape)  # Add this line to print the shape
        x = self.fc2(x)
        x = self.fc3(x)
        return F.softmax(x, dim=1)


In [13]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


In [14]:
num_classes = 7
num_heads = 32
model = FacialExpressionDetectionModel(num_classes, num_heads).to(device)

In [15]:
with open('model_v2.txt', 'w') as f:
    f.write(str(model))

In [16]:
def train_epoch(model, dataloader, criterion, optimizer, device):
    model.train()
    running_loss = 0.0
    running_corrects = 0

    for data, targets in dataloader:
        data = data.to(device)
        targets = targets.to(device)

        # Use autocast for mixed precision
        with autocast():
            outputs = model(data)
            loss = criterion(outputs, targets)
            _, preds = torch.max(outputs, 1)

        # Scale the loss and perform backpropagation
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        running_loss += loss.item() * data.size(0)
        running_corrects += torch.sum(preds == targets.data)

    epoch_loss = running_loss / len(dataloader.dataset)
    epoch_acc = running_corrects.double() / len(dataloader.dataset)

    return epoch_loss, epoch_acc


In [17]:
def validate_epoch(model, dataloader, criterion, device):
    model.eval()
    running_loss = 0.0
    running_corrects = 0

    with torch.no_grad():
        for data, targets in dataloader:
            data, targets = data.to(device), targets.to(device)

            # Use autocast for mixed precision
            with autocast():
                outputs = model(data)
                loss = criterion(outputs, targets)
                _, preds = torch.max(outputs, 1)

            running_loss += loss.item() * data.size(0)
            running_corrects += torch.sum(preds == targets.data)

    epoch_loss = running_loss / len(dataloader.dataset)
    epoch_acc = running_corrects.double() / len(dataloader.dataset)

    return epoch_loss, epoch_acc

In [18]:
import torch.optim as optim
from torch.cuda.amp import GradScaler, autocast
from torch.optim.lr_scheduler import ReduceLROnPlateau

model = FacialExpressionDetectionModel(num_classes, num_heads).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=0.0001)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=5, factor=0.5)
scaler = GradScaler()

In [19]:
print(device)

cuda


In [20]:
import os
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'

In [21]:
import matplotlib.pyplot as plt

# create empty lists to store loss and accuracy for each epoch
train_loss_list = []
train_acc_list = []
val_loss_list = []
val_acc_list = []

num_epochs = 100
for epoch in range(num_epochs):
    
    train_loss, train_acc = train_epoch(model, train_dataloader, criterion, optimizer, device)
    val_loss, val_acc = validate_epoch(model, val_dataloader, criterion, device)
    scheduler.step(val_loss)
    
    # append the loss and accuracy to the lists
    train_loss_list.append(train_loss)
    train_acc_list.append(train_acc)
    val_loss_list.append(val_loss)
    val_acc_list.append(val_acc)


    print(f"Epoch: {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

Epoch: 1/100, Train Loss: 1.7894, Train Acc: 0.3694, Val Loss: 1.7838, Val Acc: 0.3729
Epoch: 2/100, Train Loss: 1.7614, Train Acc: 0.3953, Val Loss: 1.7608, Val Acc: 0.3892
Epoch: 3/100, Train Loss: 1.7375, Train Acc: 0.4186, Val Loss: 1.7276, Val Acc: 0.4264
Epoch: 4/100, Train Loss: 1.7131, Train Acc: 0.4428, Val Loss: 1.7276, Val Acc: 0.4296
Epoch: 5/100, Train Loss: 1.6962, Train Acc: 0.4590, Val Loss: 1.6859, Val Acc: 0.4688
Epoch: 6/100, Train Loss: 1.6821, Train Acc: 0.4725, Val Loss: 1.6787, Val Acc: 0.4748
Epoch: 7/100, Train Loss: 1.6698, Train Acc: 0.4842, Val Loss: 1.6621, Val Acc: 0.4940
Epoch: 8/100, Train Loss: 1.6597, Train Acc: 0.4954, Val Loss: 1.6573, Val Acc: 0.4949
Epoch: 9/100, Train Loss: 1.6628, Train Acc: 0.4943, Val Loss: 1.6564, Val Acc: 0.4997
Epoch: 10/100, Train Loss: 1.6512, Train Acc: 0.5045, Val Loss: 1.6536, Val Acc: 0.5044
Epoch: 11/100, Train Loss: 1.6443, Train Acc: 0.5125, Val Loss: 1.6380, Val Acc: 0.5190
Epoch: 12/100, Train Loss: 1.6428, Train 

In [31]:
import torch
import torch.onnx

# Assuming your model is defined and named 'model'
# Replace 'model' with the actual name of your model instance

# Check the device of the model
device = next(model.parameters()).device

# Create a dummy input tensor of the same shape as your actual input
# Use a single input for the dummy input tensor, so the shape will be [1, 3, 48, 48]
dummy_input = torch.randn(1, 3, 48, 48).to(device)

# Export the model to an ONNX file
onnx_filename = "model.onnx"
torch.onnx.export(model, dummy_input, onnx_filename)

print(f"Model saved as {onnx_filename}")

verbose: False, log level: Level.ERROR

Model saved as model.onnx


  scaled_attention_logits = qk / torch.sqrt(torch.tensor(dk, dtype=torch.float32))
  scaled_attention_logits = qk / torch.sqrt(torch.tensor(dk, dtype=torch.float32))


In [None]:
model2 = FacialExpressionDetectionModel(num_classes, num_heads).to(device)
criterion2 = nn.CrossEntropyLoss()
optimizer2 = optim.RMSprop(model2.parameters(), lr=0.001, weight_decay=0.0001)
scheduler2 = optim.lr_scheduler.StepLR(optimizer2, step_size=10, gamma=0.1)
scaler2 = GradScaler()

In [None]:
import matplotlib.pyplot as plt

# create empty lists to store loss and accuracy for each epoch
train_loss_list2 = []
train_acc_list2 = []
val_loss_list2 = []
val_acc_list2 = []

num_epochs = 60
for epoch in range(num_epochs):
    
    train_loss, train_acc = train_epoch(model2, train_dataloader, criterion, optimizer2, device)
    val_loss, val_acc = validate_epoch(model2, val_dataloader, criterion, device)
    scheduler2.step(val_loss)
    
    # append the loss and accuracy to the lists
    train_loss_list2.append(train_loss)
    train_acc_list2.append(train_acc)
    val_loss_list2.append(val_loss)
    val_acc_list2.append(val_acc)


    print(f"Epoch: {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

In [None]:
model3 = FacialExpressionDetectionModel(num_classes, num_heads).to(device)
criterion3 = nn.CrossEntropyLoss()
scaler3 = GradScaler()
optimizer3 = optim.SGD(model3.parameters(), lr=0.01, momentum=0.9, weight_decay=0.0001)
scheduler3 = optim.lr_scheduler.CosineAnnealingLR(optimizer3, T_max=50, eta_min=0)

In [None]:
import matplotlib.pyplot as plt

# create empty lists to store loss and accuracy for each epoch
train_loss_list3 = []
train_acc_list3 = []
val_loss_list3 = []
val_acc_list3 = []

num_epochs = 80
for epoch in range(num_epochs):
    
    train_loss, train_acc = train_epoch(model3, train_dataloader, criterion3, optimizer3, device)
    val_loss, val_acc = validate_epoch(model3, val_dataloader, criterion3, device)
    scheduler3.step(val_loss)
    
    # append the loss and accuracy to the lists
    train_loss_list3.append(train_loss)
    train_acc_list3.append(train_acc)
    val_loss_list3.append(val_loss)
    val_acc_list3.append(val_acc)


    print(f"Epoch: {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

In [None]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

train_acc_np = [t.cpu().numpy() for t in train_acc_list]
val_acc_np = [t.cpu().numpy() for t in val_acc_list]

sns.set_style('darkgrid')

plt.figure(figsize=(10, 6))
sns.lineplot(data=train_acc_np, label='Train')
sns.lineplot(data=val_acc_np, label='Validation')
plt.title('Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.show()

# convert lists to dataframes
train_acc_df = pd.DataFrame({'accuracy': train_acc_list})
val_acc_df = pd.DataFrame({'accuracy': val_acc_list})


In [None]:
# define a function to check pytorch version
def check_pytorch_version():
    if torch.__version__ >= '1.6.0':
        return True
    else:
        return False
    
# define a function to check cuda version
def check_cuda_version():
    if torch.cuda.is_available():
        return True
    else:
        return False
    
# define a function to check cudnn version
def check_cudnn_version():
    if check_cuda_version():
        if torch.backends.cudnn.enabled:
            return True
        else:
            return False
    else:
        return False
    
# define a function to check if the system is ready for training
def check_system():
    if check_pytorch_version():
        print('PyTorch version: {}'.format(torch.__version__))
    else:
        print('PyTorch version: {} (update required)'.format(torch.__version__))
        
    if check_cuda_version():
        print('CUDA version: {}'.format(torch.version.cuda))
    else:
        print('CUDA version: {} (install CUDA to enable GPU training)'.format(torch.version.cuda))
        
    if check_cudnn_version():
        print('cuDNN version: {}'.format(torch.backends.cudnn.version()))
    else:
        print('cuDNN version: {} (install cuDNN to enable GPU training)'.format(torch.backends.cudnn.version()))

check_system()