In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


Import library

In [None]:
import torch
import torch.nn as nn
import torchvision.models as models
from torchsummary import summary
import time
import cv2
import os
import random
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import imutils
import matplotlib.image as mpimg
from collections import OrderedDict
from skimage import io, transform
from math import *
import xml.etree.ElementTree as ET
from IPython.display import display

import torch
import torchvision
from torchsummary import summary
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms.functional as TF
from torchvision import datasets, models, transforms
from torch.utils.data import Dataset, ConcatDataset
from torch.utils.data import DataLoader
from torch.optim import lr_scheduler

Create Dataset

In [None]:
%%capture
if not os.path.exists('/content/ibug_300W_large_face_landmark_dataset'):
    !wget http://dlib.net/files/data/ibug_300W_large_face_landmark_dataset.tar.gz
    !tar -xvzf 'ibug_300W_large_face_landmark_dataset.tar.gz'
    !rm -r 'ibug_300W_large_face_landmark_dataset.tar.gz'

In [None]:
class Transforms():
    def __init__(self):
        pass

    def rotate(self, image, landmarks, angle):
        angle = random.uniform(-angle, +angle)

        transformation_matrix = torch.tensor([
            [+cos(radians(angle)), -sin(radians(angle))],
            [+sin(radians(angle)), +cos(radians(angle))]
        ])

        image = imutils.rotate(np.array(image), angle)

        landmarks = landmarks - 0.5
        new_landmarks = np.matmul(landmarks, transformation_matrix)
        new_landmarks = new_landmarks + 0.5
        return Image.fromarray(image), new_landmarks

    def resize(self, image, landmarks, img_size):
        image = TF.resize(image, img_size)
        return image, landmarks

    def crop_face(self, image, landmarks, crops):
        left = int(crops['left'])
        top = int(crops['top'])
        width = int(crops['width'])
        height = int(crops['height'])

        image = TF.crop(image, top, left, height, width)

        img_shape = np.array(image).shape
        landmarks = torch.tensor(landmarks) - torch.tensor([[left, top]])
        landmarks = landmarks / torch.tensor([img_shape[1], img_shape[0]])
        return image, landmarks

    def __call__(self, image, landmarks, crops):
        image = Image.fromarray(image)
        image, landmarks = self.crop_face(image, landmarks, crops)
        image, landmarks = self.resize(image, landmarks, (224, 224))
        image, landmarks = self.rotate(image, landmarks, angle=10)

        image = TF.to_tensor(image)
        image = TF.normalize(image, [0.5], [0.5])
        return image, landmarks


In [None]:
class FaceLandmarksDataset(Dataset):

    def __init__(self, transform=None):

        tree = ET.parse('/content/ibug_300W_large_face_landmark_dataset/labels_ibug_300W_train.xml')
        root = tree.getroot()

        self.image_filenames = []
        self.landmarks = []
        self.crops = []
        self.transform = transform
        self.root_dir = 'ibug_300W_large_face_landmark_dataset'

        for filename in root[2]:
            self.image_filenames.append(os.path.join(self.root_dir, filename.attrib['file']))

            self.crops.append(filename[0].attrib)

            landmark = []
            for num in range(68):
                x_coordinate = int(filename[0][num].attrib['x'])
                y_coordinate = int(filename[0][num].attrib['y'])
                landmark.append([x_coordinate, y_coordinate])
            self.landmarks.append(landmark)

        self.landmarks = np.array(self.landmarks).astype('float32')

        assert len(self.image_filenames) == len(self.landmarks)

    def __len__(self):
        return len(self.image_filenames)

    def __getitem__(self, index):
        image = cv2.imread(self.image_filenames[index])
        landmarks = self.landmarks[index]

        if self.transform:
            image, landmarks = self.transform(image, landmarks, self.crops[index])

        landmarks = landmarks - 0.5

        return image, landmarks
dataset = FaceLandmarksDataset(Transforms())


Split dataset

In [None]:
# split the dataset into validation and test sets
len_valid_set = int(0.1*len(dataset))
len_train_set = len(dataset) - len_valid_set

print("The length of Train set is {}".format(len_train_set))
print("The length of Valid set is {}".format(len_valid_set))

train_dataset , valid_dataset,  = torch.utils.data.random_split(dataset , [len_train_set, len_valid_set])

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=4)
valid_loader = torch.utils.data.DataLoader(valid_dataset, batch_size=8, shuffle=True, num_workers=4)

The length of Train set is 6000
The length of Valid set is 666




Helper function

In [None]:
import sys

def print_overwrite(step, total_step, loss, operation):
    sys.stdout.write('\r')
    if operation == 'train':
        sys.stdout.write("Train Steps: %d/%d  Loss: %.8f " % (step, total_step, loss))
    else:
        sys.stdout.write("Valid Steps: %d/%d  Loss: %.8f " % (step, total_step, loss))

    sys.stdout.flush()

Define the model

In [None]:
class ResNet18Finetune(nn.Module):
  def __init__(self, output_shape=[68, 2]):
    super().__init__()
    self.output_shape = output_shape
    backbone = models.resnet18(pretrained=True)
    layers = list(backbone.children())
    self.feature_extractor = nn.Sequential(*layers[:-1]) #Cut the fc layer in the last

    #freeze all the layers in feature extractor
    for parameter in self.feature_extractor.parameters():
      parameter.requires_grad = False

    #unfreeze some last layers:
    for param in self.feature_extractor[-2][1].parameters():
      param.requires_grad = True

    #get the input feature in the last layer
    num_filters = backbone.fc.in_features

    #create the fully connected layers in the last layer
    self.output_layer = nn.Linear(num_filters, self.output_shape[0]*self.output_shape[1])

  def forward(self, x):
    x = self.feature_extractor(x)

    #Flatten x
    x = x.view(x.size(0), -1)

    x = self.output_layer(x)
    x = x.view(x.size(0), self.output_shape[0], self.output_shape[1])
    return x


Train model

In [None]:
# Define the model
model = ResNet18Finetune()
model.cuda()  # Move model to GPU

# Loss function, optimizer, and scheduler
criterion = nn.L1Loss()
optimizer = optim.Adam(model.parameters(), lr=0.001)  # Learning rate set to 0.001
scheduler = lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.1)

# Initialize minimum loss to infinity
loss_min = float('inf')

# Define number of epochs for training
num_epochs = 50

# Training loop
for epoch in range(1, num_epochs + 1):
    start = time.time()
    # Initialize training and validation loss counters
    train_loss = 0.0
    valid_loss = 0.0

    # Set model to training mode
    model.train()

    # Iterate through training data
    for step, (images, landmarks) in enumerate(train_loader, start=1):
        # Move images and landmarks to GPU
        images, landmarks = images.cuda(), landmarks.view(landmarks.size(0), 68, 2).cuda()

        # Zero the optimizer gradients
        optimizer.zero_grad()

        # Forward pass
        predictions = model(images)

        # Calculate loss
        loss = criterion(predictions, landmarks)

        # Backward pass and optimization step
        loss.backward()
        optimizer.step()

        # Accumulate training loss
        train_loss += loss.item()

    # Calculate average training loss for the epoch
    avg_train_loss = train_loss / len(train_loader)

    # Set model to evaluation mode
    model.eval()

    # Iterate through validation data
    with torch.no_grad():
        for step, (images, landmarks) in enumerate(valid_loader, start=1):
            # Move images and landmarks to GPU
            images, landmarks = images.cuda(), landmarks.view(landmarks.size(0), 68, 2).cuda()

            # Forward pass
            predictions = model(images)

            # Calculate loss
            loss = criterion(predictions, landmarks)

            # Accumulate validation loss
            valid_loss += loss.item()

    # Calculate average validation loss for the epoch
    avg_valid_loss = valid_loss / len(valid_loader)

    # Log the average training and validation loss for each epoch
    print(f"\nEpoch: {epoch} - Train Loss: {avg_train_loss:.6f}, Valid Loss: {avg_valid_loss:.6f}, Elapsed time: {int(time.time() - start)}s")

    # Save the model if validation loss decreases
    if avg_valid_loss < loss_min:
        loss_min = avg_valid_loss
        model_path = '/content/drive/MyDrive/face_landmark_234241.pt'  # Specify the model save path
        torch.save(model.state_dict(), model_path)
        print(f"New minimum validation loss of {loss_min:.6f} at epoch {epoch}. Model saved at {model_path}")

    # Step the learning rate scheduler
    scheduler.step()
print("Training complete!")

Downloading: "https://download.pytorch.org/models/resnet18-f37072fd.pth" to /root/.cache/torch/hub/checkpoints/resnet18-f37072fd.pth
100%|██████████| 44.7M/44.7M [00:00<00:00, 142MB/s]



Epoch: 1 - Train Loss: 0.188101, Valid Loss: 0.124090, Elapsed time: 201s
New minimum validation loss of 0.124090 at epoch 1. Model saved at /content/drive/MyDrive/face_landmark_234241.pt

Epoch: 2 - Train Loss: 0.104344, Valid Loss: 0.095668, Elapsed time: 197s
New minimum validation loss of 0.095668 at epoch 2. Model saved at /content/drive/MyDrive/face_landmark_234241.pt

Epoch: 3 - Train Loss: 0.085914, Valid Loss: 0.081629, Elapsed time: 198s
New minimum validation loss of 0.081629 at epoch 3. Model saved at /content/drive/MyDrive/face_landmark_234241.pt

Epoch: 4 - Train Loss: 0.077808, Valid Loss: 0.083494, Elapsed time: 198s

Epoch: 5 - Train Loss: 0.072980, Valid Loss: 0.072830, Elapsed time: 198s
New minimum validation loss of 0.072830 at epoch 5. Model saved at /content/drive/MyDrive/face_landmark_234241.pt

Epoch: 6 - Train Loss: 0.069114, Valid Loss: 0.068305, Elapsed time: 196s
New minimum validation loss of 0.068305 at epoch 6. Model saved at /content/drive/MyDrive/face

Continue to finetune

In [None]:
model = ResNet18Finetune()
pretrained_model = '/content/drive/MyDrive/face_landmark_28424_6.pt'
model.load_state_dict(torch.load(pretrained_model))

model.cuda()
criterion = nn.L1Loss()

optimizer = optim.Adam(model.parameters(), lr=1e-3)
scheduler = lr_scheduler.StepLR(optimizer, step_size = 10, gamma = 0.1)

loss_min = np.inf
num_epochs = 100

for epoch in range(1, num_epochs+1):
  start_time = time.time()
  loss_train = 0
  loss_valid = 0
  running_loss = 0

  model.train()
  train_loader_iteration = iter(train_loader)
  for step in range(len(train_loader)):
    images, landmarks = next(train_loader_iteration)
    images, landmarks = images.cuda(), landmarks.view(landmarks.size(0), 68, 2).cuda()

    predictions = model(images)
    #optimizer
    optimizer.zero_grad()

    #find the loss for current step
    loss_train_step = criterion(predictions, landmarks)

    #calculate the gradient
    loss_train_step.backward()

    #update the parameter
    optimizer.step()

    loss_train += loss_train_step.item()

  avg_train_loss = loss_train / len(train_loader)
  model.eval()
  with torch.no_grad():
    for (images, landmarks) in valid_loader:
      images = images.cuda()
      landmarks = landmarks.view(landmarks.size(0),68, 2).cuda()

      predictions = model(images)

      # find the loss for the current step
      loss_valid_step = criterion(predictions, landmarks)

      loss_valid += loss_valid_step.item()


  avg_valid_loss = loss_valid/ len(valid_loader)
  # Log the average training and validation loss for each epoch
  print(f"\nEpoch: {epoch} - Train Loss: {avg_train_loss:.6f}, Valid Loss: {avg_valid_loss:.6f}, Elapsed time: {int(time.time() - start_time)}s")

  # Save the model if validation loss decreases
  if avg_valid_loss < loss_min:
      loss_min = avg_valid_loss
      model_path = '/content/drive/MyDrive/face_landmark_28424_7.pt'  # Specify the model save path
      torch.save(model.state_dict(), model_path)
      print(f"New minimum validation loss of {loss_min:.6f} at epoch {epoch}. Model saved at {model_path}")

  # Step the learning rate scheduler
  scheduler.step()
print("Training complete!")


Downloading: "https://download.pytorch.org/models/resnet18-f37072fd.pth" to /root/.cache/torch/hub/checkpoints/resnet18-f37072fd.pth
100%|██████████| 44.7M/44.7M [00:00<00:00, 152MB/s]
  self.pid = os.fork()



Epoch: 1 - Train Loss: 0.011557, Valid Loss: 0.010725, Elapsed time: 216s
New minimum validation loss of 0.010725 at epoch 1. Model saved at /content/drive/MyDrive/face_landmark_28424_7.pt


  self.pid = os.fork()



Epoch: 2 - Train Loss: 0.011416, Valid Loss: 0.010445, Elapsed time: 221s
New minimum validation loss of 0.010445 at epoch 2. Model saved at /content/drive/MyDrive/face_landmark_28424_7.pt

Epoch: 3 - Train Loss: 0.011116, Valid Loss: 0.010540, Elapsed time: 219s

Epoch: 4 - Train Loss: 0.011015, Valid Loss: 0.010927, Elapsed time: 219s

Epoch: 5 - Train Loss: 0.010915, Valid Loss: 0.010819, Elapsed time: 218s

Epoch: 6 - Train Loss: 0.010831, Valid Loss: 0.011122, Elapsed time: 216s

Epoch: 7 - Train Loss: 0.010764, Valid Loss: 0.011084, Elapsed time: 220s

Epoch: 8 - Train Loss: 0.010653, Valid Loss: 0.010924, Elapsed time: 218s

Epoch: 9 - Train Loss: 0.010635, Valid Loss: 0.011732, Elapsed time: 217s

Epoch: 10 - Train Loss: 0.010657, Valid Loss: 0.010960, Elapsed time: 219s

Epoch: 11 - Train Loss: 0.009344, Valid Loss: 0.010170, Elapsed time: 219s
New minimum validation loss of 0.010170 at epoch 11. Model saved at /content/drive/MyDrive/face_landmark_28424_7.pt

Epoch: 12 - Trai