<a href="https://colab.research.google.com/github/pwliuab/FaceMaskDetection/blob/main/project_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# COMP4211 Project 1

## Basic setup

In [None]:
# Mount Google Drive inside the Colab runtime at /content/gdrive.
from google.colab import drive
import zipfile  # NOTE(review): not referenced in the cells visible here; the archive is extracted with the shell `unzip` command instead
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
# Unzip the dataset; users may specify their own directories
!unzip gdrive/MyDrive/pa3/archive.zip -d gdrive/MyDrive/

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch

## Helper Functions and Global Parameters

### Define function to convert a label from words to numbers

In [None]:
def label_to_num(label):
  """Translate a class name into its integer class index.

  'with_mask' gives 0 and 'without_mask' gives 1. Any other value,
  including 'mask_weared_incorrect', gives 2.
  """
  for index, known_name in enumerate(('with_mask', 'without_mask')):
    if label == known_name:
      return index
  # Everything that is not one of the two names above falls into class 2.
  return 2

### List storing all labels

In [None]:
# Class names in index order: the position of each name equals the integer
# that label_to_num returns for it.
ALL_LABELS = ['with_mask', 'without_mask', 'mask_weared_incorrect']

### Define functions for saving and loading the model

In [None]:
# Define save function
import torch.nn as nn
def save(path, model, optimizer, validation_accuracy, validation_loss):
  """Write a training checkpoint to `path` with torch.save.

  The checkpoint is a dict with four entries: the model state dict, the
  optimizer state dict, and the validation accuracy and validation loss
  passed in. A confirmation line is printed after the file is written.
  """
  checkpoint = dict(
      model_state_dict=model.state_dict(),
      optimizer_state_dict=optimizer.state_dict(),
      validation_accuracy=validation_accuracy,
      validation_loss=validation_loss,
  )
  torch.save(checkpoint, path)
  print(f'Model saved to {path}')

# Define load function
# The result returned is a tuple storing (validation_accuracy, validation_loss)
def load(path, model, optimizer, map_location=None):
  """Restore a checkpoint written by `save` into `model` and `optimizer`.

  Args:
    path: Checkpoint file to read.
    model: Module that receives the stored 'model_state_dict'.
    optimizer: Optimizer that receives the stored 'optimizer_state_dict'.
    map_location: Forwarded to torch.load. Pass e.g. 'cpu' to open a
      checkpoint that was saved from a GPU on a machine without one. The
      default (None) keeps torch.load's own behaviour.

  Returns:
    Tuple (validation_accuracy, validation_loss) as stored in the file.

  Raises:
    KeyError: if the file lacks one of the four expected entries.
  """
  checkpoint = torch.load(path, map_location=map_location)
  model.load_state_dict(checkpoint['model_state_dict'])
  optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
  return (checkpoint['validation_accuracy'], checkpoint['validation_loss'])

### Define function for training

In [None]:
# Define the train function
!pip3 install tqdm
from tqdm.notebook import tqdm

def TRAIN(model, train_loader, valid_loader, num_epochs, criterion, optimizer, device, save_path):
  """Train `model`, validate after every epoch and checkpoint the best epoch.

  Args:
    model: Module to optimise; called as model(images).
    train_loader: Iterable of (images, labels) batches; must support len().
    valid_loader: Iterable of (images, labels) batches; must support len()
      and expose `.dataset` with a length (used as the accuracy denominator).
    num_epochs: Number of passes over `train_loader`.
    criterion: Loss called as criterion(outputs, labels) with int64 labels.
    optimizer: Optimizer stepping the parameters of `model`.
    device: Device the batches are moved to before the forward pass.
    save_path: File the best checkpoint is written to through `save`.

  Side effects:
    Prints one summary line per epoch, writes a checkpoint whenever the
    validation accuracy exceeds the best value seen so far, and shows a
    matplotlib figure with the loss and accuracy curves.

  Raises:
    ZeroDivisionError: if a loader or the validation dataset is empty.
  """
  best_acc = 0.0
  training_loss = []
  validation_loss = []
  validation_acc = []

  for epoch in range(num_epochs):
    # ---- training pass ----
    model.train()
    running_loss = 0.0
    for images, labels in tqdm(train_loader):
      images = images.to(device)
      labels = labels.to(device).long()

      outputs = model(images)
      loss = criterion(outputs, labels)

      optimizer.zero_grad()
      loss.backward()
      optimizer.step()

      running_loss += loss.item()

    # Mean of the per-batch losses.
    train_loss = running_loss / len(train_loader)
    training_loss.append(train_loss)

    # ---- validation pass (no gradients, eval-mode layers) ----
    model.eval()
    running_loss = 0.0
    running_corrects = 0
    with torch.no_grad():
      for images, labels in tqdm(valid_loader):
        images = images.to(device)
        labels = labels.to(device).long()
        outputs = model(images)
        loss = criterion(outputs, labels)

        running_loss += loss.item()
        _, preds = torch.max(outputs, 1)
        # .item() keeps the counter a plain int, so the accuracy below is a
        # plain float rather than a tensor or 0-d numpy array.
        running_corrects += (preds == labels).sum().item()

    valid_loss = running_loss / len(valid_loader)
    validation_loss.append(valid_loss)
    valid_accuracy = running_corrects / float(len(valid_loader.dataset))
    validation_acc.append(valid_accuracy)

    print('Epoch [{}/{}], Training Loss: {:.4f}, Validation Loss: {:.4f}, Validation Accuracy: {:.4f}' 
            .format(epoch + 1, num_epochs, train_loss, valid_loss, valid_accuracy))

    if valid_accuracy > best_acc:
      best_acc = valid_accuracy
      # save() takes five arguments; the validation loss of this epoch is
      # stored next to the accuracy it belongs to.
      save(save_path, model, optimizer, best_acc, valid_loss)

  print('Finished Training')
  print('The best acuracy is:', best_acc)

  # Plot the training and validation loss and validation accuracy curves
  epochs = range(1, num_epochs + 1)
  plt.figure(figsize = (10, 6))
  plt.plot(epochs, training_loss, color = 'green', label = 'training loss')
  plt.plot(epochs, validation_loss, color = 'blue', label = 'validation loss')
  plt.plot(epochs, validation_acc, color = 'red', label = 'validation accuracy')
  plt.legend()
  plt.xlabel('Number of Epochs')
  plt.ylabel('Average value of loss for each epoch')
  plt.show()

## Data Preprocessing

### Get the Dataframe (modified 17:35)

In [None]:
from xml.etree.ElementTree import parse
import pandas as pd 
import os
# Build target_df with one row per bounding box found in the annotation XML
# files. Rows are collected in a plain list and turned into a DataFrame once
# at the end: calling pd.concat inside the loop re-copies the whole frame for
# every box and leaves every row with index label 0. The frame built here has
# a regular 0..n-1 index instead.
directory = 'gdrive/MyDrive/annotations'
rows = []

for filename in os.listdir(directory):
    # Only the .xml annotation files are parsed; anything else is skipped.
    if not filename.endswith(".xml"):
        continue
    document = parse(os.path.join(directory, filename))
    # The image file name is stored once per annotation file.
    imagename = document.findtext('filename')
    for item in document.iterfind('object'):
        target = item.findtext('name')
        # Go to the bndbox tag; each object carries its own coordinates.
        for x in item.iterfind('bndbox'):
            # Coordinates stay as the text read from the XML; FaceDataset
            # converts them with int() when cropping.
            rows.append({'imagename': imagename,
                         'xmin': x.findtext('xmin'),
                         'ymin': x.findtext('ymin'),
                         'xmax': x.findtext('xmax'),
                         'ymax': x.findtext('ymax'),
                         'target': target})

target_df = pd.DataFrame(rows, columns=['imagename','xmin','ymin','xmax','ymax','target'])


In [None]:
# Preview the first rows of the parsed annotations (one row per bounding box).
print(target_df.head())

             imagename xmin ymin xmax ymax     target
0  maksssksksss101.png   48  294  164  400  with_mask
0   maksssksksss10.png   98  267  194  383  with_mask
0  maksssksksss103.png   42   54   94  110  with_mask
0  maksssksksss103.png  188   46  236  106  with_mask
0  maksssksksss103.png  261   88  303  130  with_mask


In [None]:
from sklearn.model_selection import train_test_split

# Separate the annotation table by class so each class is split on its own.
with_mask = target_df.loc[target_df['target'] == 'with_mask']
without_mask = target_df.loc[target_df['target'] == 'without_mask']
incorrect_weared = target_df.loc[target_df['target'] == 'mask_weared_incorrect']

# 80/20 train/validation split per class, with a fixed seed.
with_mask_train, with_mask_valid = train_test_split(
    with_mask, random_state = 4211, test_size = 0.2)
without_mask_train, without_mask_valid = train_test_split(
    without_mask, random_state = 4211, test_size = 0.2)
incorrect_weared_train, incorrect_weared_valid = train_test_split(
    incorrect_weared, random_state = 4211, test_size = 0.2)

# Resample the other two training splits, drawing with replacement, to exactly
# as many rows as the with_mask training split. Validation splits are untouched.
without_mask_train = without_mask_train.sample(
    n = len(with_mask_train), random_state = 4211, replace = True)
incorrect_weared_train = incorrect_weared_train.sample(
    n = len(with_mask_train), random_state = 4211, replace = True)

# Stack the per-class pieces back together with a fresh 0..n-1 index.
train_df = pd.concat(
    (with_mask_train, without_mask_train, incorrect_weared_train), ignore_index = True)
valid_df = pd.concat(
    (with_mask_valid, without_mask_valid, incorrect_weared_valid), ignore_index = True)


SyntaxError: ignored

### Dataset and DataLoader

In [None]:
# Transform
import torchvision.transforms as transforms

# Preprocessing pipeline: resize to 32x32, flip horizontally with probability
# 0.5, convert to a tensor, then normalise three channels with mean 0.5 and
# std 0.5 each.
transform = transforms.Compose([
    transforms.Resize(size = (32, 32)),
    transforms.RandomHorizontalFlip(p = 0.5),
    transforms.ToTensor(),
    transforms.Normalize(mean = (0.5, 0.5, 0.5), std = (0.5, 0.5, 0.5)),
])

In [None]:
# Create Dataset
import torch
import numpy as np
from PIL import Image
from torch.utils.data import Dataset
import linecache

class FaceDataset(Dataset):
    """Dataset of cropped face regions, one sample per row of `dataframe`.

    Each row is read by position as
    (image file name, xmin, ymin, xmax, ymax, label name).

    Args:
        dataframe: Table with the six columns above, in that order.
        image_dir: Directory the image file names are relative to.
        transform: Callable applied to the cropped PIL region, or None to
            return the crop unchanged.
    """

    def __init__(self, dataframe, image_dir, transform):
        self.data = dataframe
        self.image_dir = image_dir
        self.transform = transform

    def __getitem__(self, idx):
        """Return (region, label) for row `idx`.

        `region` is the bounding-box crop after `self.transform`; `label`
        is the integer produced by label_to_num.
        """
        record = self.data.iloc[idx]

        # Load the full image. The context manager closes the underlying
        # file as soon as the RGB copy has been made.
        image_path = os.path.join(self.image_dir, record.iloc[0])
        with Image.open(image_path) as source:
            image = source.convert('RGB')

        # Crop the annotated box; columns 1..4 hold xmin, ymin, xmax, ymax.
        box = tuple(int(record.iloc[col]) for col in range(1, 5))
        region = image.crop(box)

        # Use the transform given to this dataset, not the module-level one,
        # so different datasets can use different pipelines.
        if self.transform is not None:
            region = self.transform(region)

        label = label_to_num(record.iloc[5])
        return region, label

    def __len__(self):
        """Number of rows (bounding boxes) in the backing table."""
        return len(self.data)

In [None]:
# Wrap the train/validation tables as datasets that read from PATH + 'images'.
# NOTE(review): PATH is not defined in the cells visible here; confirm it is
# set before this cell runs.
train_set = FaceDataset(train_df, PATH + 'images', transform)
valid_set = FaceDataset(valid_df, PATH + 'images', transform)

from torch.utils.data import DataLoader
# Training batches hold 64 shuffled samples; the validation loader keeps
# DataLoader's default settings.
train_iter = DataLoader(train_set, shuffle = True, batch_size = 64)
valid_iter = DataLoader(valid_set)

tensor([[[-0.4275, -0.2941, -0.0353,  ..., -0.5216, -0.5216, -0.5216],
         [-0.3647, -0.2314,  0.0196,  ..., -0.5137, -0.5137, -0.5137],
         [-0.2784, -0.1529,  0.0980,  ..., -0.5059, -0.5137, -0.5137],
         ...,
         [-0.5451, -0.5451, -0.5451,  ...,  0.8353,  0.8275,  0.8275],
         [-0.5294, -0.5373, -0.5529,  ...,  0.8353,  0.8196,  0.8118],
         [-0.5137, -0.5216, -0.5451,  ...,  0.8275,  0.8118,  0.8039]]])

In [None]:
# Show the shapes of the first batch produced by train_iter (nothing is
# printed when the loader yields no batch).
first_batch = next(iter(train_iter), None)
if first_batch is not None:
  inputs, labels = first_batch
  print(inputs.shape)
  print(labels.shape)

## Basic Model

### Model Building

In [None]:
import torch.nn as nn
import torch.nn.functional as F

In [None]:
class Basic_Model(nn.Module):
    """Small CNN: two 5x5 convolutions followed by three linear layers.

    The first linear layer expects 16 * 5 * 5 features, which is what a
    3-channel 32x32 input produces after the two conv + pool stages.

    Args:
        num_classes: Number of output logits. Defaults to 3, one per entry
            of ALL_LABELS; the earlier hard-coded value was 10, which left
            seven logits that no label could ever select.
    """

    def __init__(self, num_classes=3):
        super(Basic_Model, self).__init__()

        self.conv1 = nn.Conv2d(in_channels = 3, out_channels = 6, kernel_size = 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.norm1 = nn.BatchNorm2d(6)
        self.conv2 = nn.Conv2d(in_channels=6, out_channels=16,  kernel_size=5)
        self.norm2 = nn.BatchNorm2d(16)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, num_classes)

    def forward(self, image):
        """Return raw class logits of shape (batch, num_classes)."""
        # Shapes in the comments assume a (N, 3, 32, 32) input.
        image = self.norm1(self.pool(F.relu(self.conv1(image))))  # (N, 6, 14, 14)
        image = self.norm2(self.pool(F.relu(self.conv2(image))))  # (N, 16, 5, 5)
        image = image.view(-1, 16 * 5 * 5)
        image = F.relu(self.fc1(image))
        image = F.relu(self.fc2(image))
        # No softmax here: the logits are returned as-is.
        image = self.fc3(image)
        return image

### Model Training & Validation

In [None]:
from torch.optim import Adam

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

model_basic = Basic_Model()
model_basic.to(device)

# Adam with its default hyper-parameters and a cross-entropy loss.
optimizer = Adam(params = model_basic.parameters())
criterion = nn.CrossEntropyLoss()

# 15 epochs; the checkpoint goes to model_tutorial_6.pt in the working directory.
TRAIN(
    model = model_basic,
    train_loader = train_iter,
    valid_loader = valid_iter,
    num_epochs = 15,
    criterion = criterion,
    optimizer = optimizer,
    device = device,
    save_path = 'model_tutorial_6.pt',
)

## Training (Share)

In [None]:
# Define the train function
# Define save function
def save(path, model, optimizer, validation_accuracy, validation_loss):
  """Store a checkpoint dict at `path` using torch.save.

  The dict bundles the state dicts of `model` and `optimizer` with the given
  validation accuracy and validation loss. A confirmation line is printed
  after the write.
  """
  checkpoint = dict(
      model_state_dict=model.state_dict(),
      optimizer_state_dict=optimizer.state_dict(),
      validation_accuracy=validation_accuracy,
      validation_loss=validation_loss,
  )
  torch.save(checkpoint, path)
  print(f'Model saved to {path}')

# Define load function
# The result returned is a tuple storing (validation_accuracy, validation_loss)
def load(path, model, optimizer, map_location=None):
  """Restore a checkpoint written by `save` into `model` and `optimizer`.

  Args:
    path: Checkpoint file to read.
    model: Module that receives the stored 'model_state_dict'.
    optimizer: Optimizer that receives the stored 'optimizer_state_dict'.
    map_location: Forwarded to torch.load. Pass e.g. 'cpu' to open a
      checkpoint that was saved from a GPU on a machine without one. The
      default (None) keeps torch.load's own behaviour.

  Returns:
    Tuple (validation_accuracy, validation_loss) as stored in the file.

  Raises:
    KeyError: if the file lacks one of the four expected entries.
  """
  checkpoint = torch.load(path, map_location=map_location)
  model.load_state_dict(checkpoint['model_state_dict'])
  optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
  return (checkpoint['validation_accuracy'], checkpoint['validation_loss'])
  
def TRAIN(model, train_loader, valid_loader, num_epochs, criterion, optimizer, device, save_path):
  """Train `model` and record loss and accuracy on both splits every epoch.

  Args:
    model: Module to optimise; called as model(images).
    train_loader: Iterable of (images, labels) batches; must support len()
      and expose `.dataset` with a length.
    valid_loader: Same requirements as `train_loader`.
    num_epochs: Number of passes over `train_loader`.
    criterion: Loss called as criterion(outputs, labels) with int64 labels.
    optimizer: Optimizer stepping the parameters of `model`.
    device: Device the batches are moved to before the forward pass.
    save_path: Accepted for signature compatibility; this version never
      writes a checkpoint.

  Side effects:
    Prints one summary line per epoch and shows a matplotlib figure with
    the four curves.

  Raises:
    ZeroDivisionError: if a loader or its dataset is empty.
  """
  training_loss = []
  training_acc = []
  validation_loss = []
  validation_acc = []

  for epoch in range(num_epochs):
    # ---- training pass ----
    model.train()
    running_loss = 0.0
    running_corrects = 0
    for images, labels in train_loader:
      images = images.to(device)
      labels = labels.to(device).long()

      outputs = model(images)
      loss = criterion(outputs, labels)

      optimizer.zero_grad()
      loss.backward()
      optimizer.step()

      running_loss += loss.item()
      _, preds = torch.max(outputs.data, 1)
      # .item() keeps the counter a plain int, so the accuracies appended
      # below are floats that matplotlib can plot even when training on CUDA.
      running_corrects += (preds == labels).sum().item()

    train_loss = running_loss / len(train_loader)
    training_loss.append(train_loss)
    train_accuracy = running_corrects / float(len(train_loader.dataset))
    training_acc.append(train_accuracy)

    # ---- validation pass (no gradients, eval-mode layers) ----
    model.eval()
    running_loss = 0.0
    running_corrects = 0
    with torch.no_grad():
      for images, labels in valid_loader:
        images = images.to(device)
        labels = labels.to(device).long()
        outputs = model(images)
        loss = criterion(outputs, labels)

        running_loss += loss.item()
        _, preds = torch.max(outputs.data, 1)
        running_corrects += (preds == labels).sum().item()

    valid_loss = running_loss / len(valid_loader)
    validation_loss.append(valid_loss)
    valid_accuracy = running_corrects / float(len(valid_loader.dataset))
    validation_acc.append(valid_accuracy)

    print('Epoch [{}/{}], Training Loss: {:.4f}, Validation Loss: {:.4f}' 
            .format(epoch + 1, num_epochs, train_loss, valid_loss))

  print('Finished Training')

  # Plot the training and validation loss and accuracy curves. Each legend
  # label names the series that is actually drawn.
  epochs = range(1, num_epochs + 1)
  plt.figure(figsize = (10, 6))
  plt.plot(epochs, training_loss, color = 'green', label = 'training loss')
  plt.plot(epochs, training_acc, color = 'yellow', label = 'training accuracy')
  plt.plot(epochs, validation_loss, color = 'purple', label = 'validation loss')
  plt.plot(epochs, validation_acc, color = 'red', label = 'validation accuracy')
  plt.legend()
  plt.xlabel('Number of Epochs')
  plt.ylabel('Average value of loss for each epoch')
  plt.show()

## Model (Share)

In [None]:
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim


class Net(nn.Module):
    """Six-convolution CNN with batch norm, average pooling and a two-layer head.

    All convolutions are 3x3 with padding 1 and stride 1, so only the single
    max-pool changes the spatial size. The fixed 16x16 average pool reduces
    the map to 1x1 when the input is 32x32.

    Args:
        in_channels: Channels of the input image. Defaults to 1, the value
            that used to be hard-coded.
            NOTE(review): FaceDataset in this file converts images to RGB;
            pass in_channels=3 when feeding that pipeline.
        num_classes: Number of output logits. Defaults to 3.
    """

    def __init__(self, in_channels=1, num_classes=3):
        super(Net, self).__init__()

        self.conv1 = nn.Conv2d(in_channels=in_channels, out_channels=32, kernel_size=3, padding=1, stride=1)
        self.norm1 = nn.BatchNorm2d(32)

        self.conv2 = nn.Conv2d(in_channels=32, out_channels=32, kernel_size=3, padding=1, stride=1)
        self.norm2 = nn.BatchNorm2d(32)

        self.MaxPool = nn.MaxPool2d(kernel_size=(2, 2), stride=2)
        self.conv3 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1, stride=1)
        self.norm3 = nn.BatchNorm2d(64)

        self.conv4 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1, stride=1)
        self.norm4 = nn.BatchNorm2d(128)

        self.conv5 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, padding=1, stride=1)
        self.norm5 = nn.BatchNorm2d(256)

        self.conv6 = nn.Conv2d(in_channels=256, out_channels=512, kernel_size=3, padding=1, stride=1)
        self.norm6 = nn.BatchNorm2d(512)
        self.AvgPool = nn.AvgPool2d(kernel_size=(16, 16))

        self.drop = nn.Dropout(0.5)
        self.fc1 = nn.Linear(1 * 1 * 512, 512)
        self.fc2 = nn.Linear(1 * 1 * 512, num_classes)

    def forward(self, x1):
        """Return raw class logits of shape (batch, num_classes)."""
        # Shapes in the comments assume a (N, in_channels, 32, 32) input.
        x1 = F.relu(self.norm1(self.conv1(x1)))  # (N, 32, 32, 32)
        x1 = F.relu(self.norm2(self.conv2(x1)))  # (N, 32, 32, 32)
        x1 = self.MaxPool(x1)                    # (N, 32, 16, 16)
        x1 = F.relu(self.norm3(self.conv3(x1)))  # (N, 64, 16, 16)
        x1 = F.relu(self.norm4(self.conv4(x1)))  # (N, 128, 16, 16)
        x1 = F.relu(self.norm5(self.conv5(x1)))  # (N, 256, 16, 16)
        x1 = F.relu(self.norm6(self.conv6(x1)))  # (N, 512, 16, 16)
        x1 = self.AvgPool(x1)                    # (N, 512, 1, 1)

        # Flatten per sample. Keeping the batch dimension explicit makes an
        # unexpected spatial size fail at fc1 instead of silently turning
        # leftover spatial positions into extra batch rows.
        h3 = x1.view(x1.size(0), -1)

        h3 = F.relu(self.fc1(h3))
        h3 = self.drop(h3)

        # No activation on the output: the logits are returned as-is.
        h3 = self.fc2(h3)
        return h3

    def aggregation(self, x1, x2):
        """Return the element-wise absolute difference |x1 - x2|."""
        combined_x = abs(x1 - x2)
        return combined_x

    def merge(self, x1, x2):
        """Call x1.append(x2) and return whatever that call returns.

        NOTE(review): list.append mutates x1 and returns None, and tensors
        have no append method. Presumably a concatenation was intended;
        confirm against the callers before relying on this method.
        """
        return x1.append(x2)