<a href="https://colab.research.google.com/github/peeyushsinghal/DA-CV/blob/main/AffectNet_Temp.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
#@title Import Statements
import os # for creating directories
import tarfile # to untar datafiles
import numpy as np # annotation files are stored as np
from PIL import Image
 

In [2]:
#@title Data Loading

# Mount Google Drive at /content/drive; the dataset paths used by this notebook live under it.
# Alternative that forces a fresh mount:
# from google.colab import drive
# drive.mount('/content/drive', force_remount=True)

from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
#@title Data Configuration

# Root folder of the AffectNet copy on the mounted Drive.
BASE_PATH = '/content/drive/MyDrive/AffectNet'

# Each split sits in a doubly nested folder: <BASE_PATH>/<split>/<split>.
TRAIN_DIR, VAL_DIR = (
    os.path.join(BASE_PATH, split_name, split_name)
    for split_name in ('train_set', 'val_set')
)

# Sub-folder names that are joined onto a split directory later on.
ANNOTATION_DIR, IMAGE_DIR = 'annotations', 'images'

print(TRAIN_DIR, VAL_DIR)

# Folder for saved models; created the first time this cell runs.
MODEL_DIR = os.path.join(BASE_PATH, 'models')
if not os.path.exists(MODEL_DIR):
  os.makedirs(MODEL_DIR)
  print("The new directory is created!")

/content/drive/MyDrive/AffectNet/train_set/train_set /content/drive/MyDrive/AffectNet/val_set/val_set


In [4]:
# import random
# t = random.randint(0,10)
# print (t)
# expression_path = os.path.join(VAL_DIR,ANNOTATION_DIR,str(t)+'_exp.npy')
# arousal_path = os.path.join(VAL_DIR,ANNOTATION_DIR,str(t)+'_aro.npy')
# valence_path = os.path.join(VAL_DIR,ANNOTATION_DIR,str(t)+'_val.npy')
# location_path = os.path.join(VAL_DIR,ANNOTATION_DIR,str(t)+'_lnd.npy')
# print(valence_path)
# try:
#   data = np.load(location_path)
#   # data = data.astype(int)
# except:
#   print(f'Not able to access image at {location_path.split("/")[-1]}')
#   pass
# type(data)

In [5]:
import os
from PIL import Image
from torch.utils.data import Dataset
import torchvision.transforms as transforms

class AffectNetDataset(Dataset):
    """AffectNet images paired with their per-image ``.npy`` annotation files.

    For an image ``<stem>.<ext>`` found in ``image_dir`` the annotations are read
    from ``annotation_dir`` as ``<stem>_exp.npy`` (expression), ``<stem>_aro.npy``
    (arousal), ``<stem>_val.npy`` (valence) and ``<stem>_lnd.npy`` (landmarks).

    Args:
        image_dir: Directory that contains the image files.
        annotation_dir: Directory that contains the ``.npy`` annotation files.
        transform: Optional callable applied to the RGB PIL image.
        mean_std: When True, ``transform`` is ignored and the image is only
            converted with ``transforms.ToTensor()`` (used for computing
            dataset statistics on un-augmented pixels).
    """

    # Only entries with these extensions are treated as images; anything else
    # in image_dir (notebook checkpoints, stray files) is skipped.
    IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png')

    def __init__(self, image_dir, annotation_dir, transform=None, mean_std = False):
        self.image_dir = image_dir
        self.annotation_dir = annotation_dir
        self.transform = transform
        self.mean_std = mean_std
        # os.listdir returns entries in arbitrary order; sort so that an index
        # always refers to the same sample across runs.
        self.image_list = sorted(
            entry for entry in os.listdir(self.image_dir)
            if entry.lower().endswith(self.IMAGE_EXTENSIONS)
        )

    def __len__(self):
        """Return the number of images found in ``image_dir``."""
        return len(self.image_list)

    def __getitem__(self, index):
        """Load one sample.

        Returns:
            Tuple ``(image, expression, arousal, valence, landmarks)`` where
            ``expression`` is an int, ``arousal`` and ``valence`` are floats and
            ``landmarks`` is the array exactly as stored on disk.

        Raises:
            FileNotFoundError: If one of the annotation files is missing.
        """
        image_name = self.image_list[index]
        image_path = os.path.join(self.image_dir, image_name)
        # Derive the annotation stem from the file name without its extension.
        stem = os.path.splitext(image_name)[0]

        # Context manager closes the underlying file once the pixels are converted.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert('RGB')

        expression = int(self._read_annotation_file(self._annotation_path(stem, 'exp')).item())
        arousal = float(self._read_annotation_file(self._annotation_path(stem, 'aro')).item())
        valence = float(self._read_annotation_file(self._annotation_path(stem, 'val')).item())
        # presumably 68 (x, y) landmark pairs = 136 values -- TODO confirm against the data
        landmarks = self._read_annotation_file(self._annotation_path(stem, 'lnd'))

        # Choose the transform locally; never overwrite self.transform, otherwise
        # the user-supplied transform would be lost after the first access.
        transform = transforms.ToTensor() if self.mean_std else self.transform
        if transform is not None:
            image = transform(image)

        return image, expression, arousal, valence, landmarks

    def _annotation_path(self, stem, kind):
        """Build the path of the ``<stem>_<kind>.npy`` annotation file."""
        return os.path.join(self.annotation_dir, f'{stem}_{kind}.npy')

    def _read_annotation_file(self, annotation_path):
        """Load one ``.npy`` annotation file.

        Raises:
            FileNotFoundError: If ``annotation_path`` does not exist. Any other
                loading error from ``np.load`` propagates unchanged.
        """
        try:
            return np.load(annotation_path)
        except FileNotFoundError as err:
            raise FileNotFoundError(f'Annotation file not found: {annotation_path}') from err


In [6]:
# TODO:
# - Compute the dataset mean and std and normalise the inputs with them
# - Add augmentation transforms (crop, rotation)

In [7]:
# Create the dataset from the validation split
image_dir = os.path.join(VAL_DIR,IMAGE_DIR)
label_dir = os.path.join(VAL_DIR,ANNOTATION_DIR)
print(image_dir, label_dir)

# Resize to a fixed 224x224. An int size (which is what `(224)` evaluates to)
# only fixes the shorter edge, so non-square images would yield tensors of
# different shapes that cannot be stacked into a batch.
# NOTE: the dataset returns landmarks as stored; they are not rescaled here.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    # Add more transformations as needed
])

dataset = AffectNetDataset(image_dir, label_dir, transform=transform)

/content/drive/MyDrive/AffectNet/val_set/val_set/images /content/drive/MyDrive/AffectNet/val_set/val_set/annotations


In [8]:
# # Calculate the mean and standard deviation - Ignoring it for now
# dataset_mean_std_dev = AffectNetDataset(image_dir, label_dir, mean_std= True)
# image, expression,arousal, valence, landmarks= next(iter(dataset_mean_std_dev))
# # print(image)
# # print(expression,arousal, valence, "\n",landmarks)

# # Calculate the mean and standard deviation
# mean = torch.zeros(3)
# std = torch.zeros(3)
# for image, expression,arousal, valence, landmarks in dataset_mean_std_dev:
#     mean += torch.mean(image, dim=(1, 2))
#     std += torch.std(image, dim=(1, 2))

# mean /= len(dataset_mean_std_dev)
# std /= len(dataset_mean_std_dev)

# print("Mean:", mean) #
# print("Standard Deviation:", std) #

In [9]:
# # Example image and annotation
# image, expression,arousal, valence, landmarks= next(iter(dataset))
# # image.show()
# print(image, "\n", expression,arousal, valence, "\n",landmarks)

In [10]:
import matplotlib.pyplot as plt
def imshow(img):
  '''
  Show a channel-first image tensor with matplotlib.

  img: tensor laid out as (channels, height, width). It may live on any
       device and may require grad; a detached CPU copy is used for plotting.
  '''
  # img = img / 2 + 0.5     # unnormalize, if mean and std deviation used
  # .numpy() is only valid for CPU tensors that do not require grad
  npimg = img.detach().cpu().numpy()
  # matplotlib expects (height, width, channels)
  plt.imshow(np.transpose(npimg, (1, 2, 0)))

In [11]:
#@title Settings and Hyperparams
import torch

RANDOM_SEED = 42
# Apply the seed so that shuffling and weight initialisation are repeatable;
# previously the constant was defined but never used.
torch.manual_seed(RANDOM_SEED)

BATCH_SIZE = 128
NUM_EPOCHS = 2
# Use the GPU when one is available, otherwise fall back to the CPU.
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
SHUFFLE = True



In [12]:
from torch.utils.data import DataLoader
import torchvision

# Wrap the dataset in a batched loader, using the batch size and shuffle
# flag from the settings cell
val_loader = DataLoader(
    dataset=dataset,
    shuffle=SHUFFLE,
    batch_size=BATCH_SIZE,
)

# checking for sample images
# images, expressions, arousals, valences, landmarks = next(iter(val_loader))
# imshow(torchvision.utils.make_grid(images))
# print(expressions,arousals, valences, "\n",landmarks)



In [13]:
#@title Model - Resnet 34

import torch
import torchvision.models as models

# Load the pre-trained ResNet-34 model
model = models.resnet34(pretrained = True) # temporary to check
# model = models.resnet34(pretrained = False)

# The stock stem already takes 3-channel input. Replacing it with a fresh
# Conv2d of the same shape would only throw away its pretrained weights,
# so swap the first layer only when a different channel count is needed.
num_channels = 3
if num_channels != model.conv1.in_channels:
    model.conv1 = torch.nn.Conv2d(num_channels, 64, kernel_size=7, stride=2, padding=3, bias=False)

# Replace the classification head so that it produces 8 outputs; read the
# input width from the existing head instead of hard-coding it.
model.fc = torch.nn.Linear(in_features=model.fc.in_features, out_features=8, bias=True)


Downloading: "https://download.pytorch.org/models/resnet34-b627a593.pth" to /root/.cache/torch/hub/checkpoints/resnet34-b627a593.pth
100%|██████████| 83.3M/83.3M [00:00<00:00, 230MB/s]


In [14]:
# Show the module tree of the network built in the previous cell
model

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
  

In [15]:
! pip install torch-summary
from torchsummary import summary
summary(model, (3, 224, 224))

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting torch-summary
  Downloading torch_summary-1.4.5-py3-none-any.whl (16 kB)
Installing collected packages: torch-summary
Successfully installed torch-summary-1.4.5
Layer (type:depth-idx)                   Output Shape              Param #
├─Conv2d: 1-1                            [-1, 64, 112, 112]        9,408
├─BatchNorm2d: 1-2                       [-1, 64, 112, 112]        128
├─ReLU: 1-3                              [-1, 64, 112, 112]        --
├─MaxPool2d: 1-4                         [-1, 64, 56, 56]          --
├─Sequential: 1-5                        [-1, 64, 56, 56]          --
|    └─BasicBlock: 2-1                   [-1, 64, 56, 56]          --
|    |    └─Conv2d: 3-1                  [-1, 64, 56, 56]          36,864
|    |    └─BatchNorm2d: 3-2             [-1, 64, 56, 56]          128
|    |    └─ReLU: 3-3                    [-1, 64, 56, 56]          --
|    |    └─Con

Layer (type:depth-idx)                   Output Shape              Param #
├─Conv2d: 1-1                            [-1, 64, 112, 112]        9,408
├─BatchNorm2d: 1-2                       [-1, 64, 112, 112]        128
├─ReLU: 1-3                              [-1, 64, 112, 112]        --
├─MaxPool2d: 1-4                         [-1, 64, 56, 56]          --
├─Sequential: 1-5                        [-1, 64, 56, 56]          --
|    └─BasicBlock: 2-1                   [-1, 64, 56, 56]          --
|    |    └─Conv2d: 3-1                  [-1, 64, 56, 56]          36,864
|    |    └─BatchNorm2d: 3-2             [-1, 64, 56, 56]          128
|    |    └─ReLU: 3-3                    [-1, 64, 56, 56]          --
|    |    └─Conv2d: 3-4                  [-1, 64, 56, 56]          36,864
|    |    └─BatchNorm2d: 3-5             [-1, 64, 56, 56]          128
|    |    └─ReLU: 3-6                    [-1, 64, 56, 56]          --
|    └─BasicBlock: 2-2                   [-1, 64, 56, 56]          --
|

In [16]:
#@title Training Function
from tqdm import tqdm # for beautiful model training updates
train_losses = [] # per-batch training loss values, stored as plain floats
train_accuracy = [] # running training accuracy (%) recorded after every batch

def train(model,device, train_loader,optimizer,epoch):
  '''
  Run one pass over train_loader and update the model after every batch.

  model: network to optimise (switched to training mode here)
  device: device the batches are moved to before the forward pass
  train_loader: iterable of (images, expressions, _, _, _) batches
  optimizer: optimiser that owns the model parameters
  epoch: index of the current epoch (currently unused)

  Uses the module-level `criterion` as the loss function and appends to the
  module-level `train_losses` / `train_accuracy` lists once per batch.
  '''
  model.train() # setting the model in training
  pbar = tqdm(train_loader) # putting the iterator in pbar
  correct = 0 # for accuracy numerator
  processed = 0 # for accuracy denominator

  for batch_idx, (images, expressions, _, _, _) in enumerate(pbar):
    images, expressions = images.to(device), expressions.to(device) # sending data to CPU or GPU as per device
    optimizer.zero_grad() # setting gradients to zero to avoid accumulation

    # forward pass: one row of raw class scores per image in the batch
    y_preds = model(images)

    loss = criterion(y_preds, expressions) # capturing loss

    # Store a plain float: appending the tensor itself would keep every
    # batch's autograd graph (and its device memory) alive.
    loss_value = loss.item()
    train_losses.append(loss_value)

    loss.backward() # backpropagation
    optimizer.step() # updating the params

    preds = y_preds.argmax(dim=1, keepdim=True)  # index of the highest score = predicted class
    correct += preds.eq(expressions.view_as(preds)).sum().item()
    processed += len(images)

    pbar.set_description(desc= f'Loss={loss_value} Batch_id={batch_idx} Accuracy={100*correct/processed:0.2f}')

    train_accuracy.append(100*correct/processed)

In [17]:
# Test Function
test_losses = [] # to capture test losses 
test_accuracy = [] # to capture test accuracy 

def test(model,device, test_loader):
  model.eval() # setting the model in evaluation mode
  test_loss = 0
  correct = 0 # for accuracy numerator

  with torch.no_grad():
    for (images, expressions, _, _, _)  in test_loader:
      images, expressions = images.to(device), expressions.to(device)#sending data to CPU or GPU as per device
      outputs = model(images) # forward pass, result captured in outputs (plural as there are many images in a batch)
      # the outputs are in batch size x one hot vector 

      test_loss = criterion(outputs,expressions).item()  # sum up batch loss
      preds = outputs.argmax(dim=1, keepdim=True)  # get the index of the max log-probability
      correct += preds.eq(expressions.view_as(preds)).sum().item()

    test_loss /= len(test_loader.dataset) # average test loss
    test_losses.append(test_loss) # to capture loss over many batches

    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)\n'.format(
    test_loss, correct, len(test_loader.dataset),
    100. * correct / len(test_loader.dataset)))

    test_accuracy.append(100*correct/len(test_loader.dataset))

In [18]:
import torch.optim as optim # for optimizer
import torch.nn as nn


# Loss for multi-class classification
criterion = nn.CrossEntropyLoss()
# SGD with momentum over all parameters of the model
optimizer = optim.SGD(
    params=model.parameters(),
    momentum=0.9,
    lr=0.001,
)

In [None]:
from torchsummary.torchsummary import validate_user_params
# Take the epoch count from the settings cell so that it is configured in one
# place only (set NUM_EPOCHS there, e.g. to 50, for a full run).
EPOCHS = NUM_EPOCHS
model = model.to(DEVICE)

for epoch in range(EPOCHS):
    print("EPOCH:", epoch+1)
    # NOTE: the validation loader is passed as the training data here.
    train(model, device = DEVICE, train_loader = val_loader , optimizer = optimizer,epoch = epoch)
    # test(model, device, testloader)

EPOCH: 1


Loss=2.347100019454956 Batch_id=1 Accuracy=12.50:   6%|▋         | 2/31 [06:22<1:28:51, 183.86s/it]