In [None]:
import time
from typing import List, Dict

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision
import torchvision.models as models
import torchvision.transforms as transforms
from torch.utils.data import TensorDataset
from torchvision.transforms import InterpolationMode
import pandas as pd

import matplotlib.pyplot as plt

from google.colab import drive
# Mount Google Drive inside the Colab runtime so the files under MyDrive are reachable.
drive.mount('/content/drive')

# Root folder for the CSV files, image folders and checkpoints used below. The
# trailing slash matters: later cells build paths by plain string concatenation.
dir_path = '/content/drive/MyDrive/Colab Notebooks/ECE50024/Mini_Challenge/'

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Crop the Image

In [None]:
import cv2

# Haar-cascade face detector shared by crop_face(). The XML path is relative, so it
# is resolved against the current working directory.
# NOTE(review): presumably the stock OpenCV frontal-face cascade; confirm the file
# is present in the working directory before running the crop loop.
face_cascade = cv2.CascadeClassifier('haarcascade_frontalface_default.xml')

def crop_face(img_path):
    """Crop the largest detected face out of an image file.

    Args:
        img_path: Path of the image, read with ``cv2.imread``.

    Returns:
        A ``(face, shape)`` tuple. ``face`` is the cropped region of the image
        as loaded by OpenCV (BGR channel order) and ``shape`` is ``face.shape``,
        i.e. ``(rows, cols, channels)``. When the image cannot be read, no face
        is detected, or the best detection has no area, the result is
        ``(None, (0, 0, 0))``.
    """
    img = cv2.imread(img_path)
    if img is None or img.size == 0:
        return None, (0, 0, 0)

    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    # Positional arguments are scaleFactor=1.1 and minNeighbors=4.
    faces = face_cascade.detectMultiScale(gray, 1.1, 4)

    if len(faces) == 0:
        return None, (0, 0, 0)

    # Keep the detection with the largest area. max() returns the first maximal
    # element, which matches the previous "strictly greater" scan on ties.
    x, y, w, h = max(faces, key=lambda box: box[2] * box[3])

    # A degenerate (zero-area) detection used to leave `face` unassigned and
    # crash; treat it like "no face found" instead.
    if w * h <= 0:
        return None, (0, 0, 0)

    # The rectangle covers exactly h rows and w columns starting at (y, x);
    # the former "+1" on both bounds pulled in an extra row and column.
    face = img[y:y + h, x:x + w, :]

    if face.size <= 0:
        return None, (0, 0, 0)

    return face, face.shape

import os

# Number of training images; files are named 0.jpg .. (train_num - 1).jpg.
train_num = 69540

# cv2.imwrite signals failure only through its return value, so a missing output
# folder would silently drop every crop. Create it up front.
os.makedirs('train_crop', exist_ok=True)

for i in range(train_num):
    if i % 1500 == 0:
        print(f'Progress: [{i}/{train_num}]')

    # crop_face returns (None, (0, 0, 0)) when the image is unreadable or has no face;
    # otherwise the shape is (rows, cols, channels).
    face, (rows, cols, _channels) = crop_face('train/' + str(i) + '.jpg')
    if face is not None and rows > 0 and cols > 0:
        out_path = 'train_crop/' + str(i) + '.jpg'
        if not cv2.imwrite(out_path, face):
            print(f'Warning: could not write {out_path}')

# Extract the Zipfile

In [None]:
import zipfile

# Archive with the cropped training images and the folder it is unpacked into.
# NOTE(review): the dataset cell reads images from 'train_crop/', not
# 'train_crop_1' -- confirm which folder is the intended source.
zip_file_path = f'{dir_path}train_crop.zip'
extracted_folder = f'{dir_path}train_crop_1'

# The context manager closes the archive even if extraction fails part-way.
with zipfile.ZipFile(zip_file_path, mode='r') as zip_ref:
    zip_ref.extractall(path=extracted_folder)

print("Extraction complete.")

Extraction complete.


# Make Sure the GPU Works
Revised from my former AI assignment

In [None]:
def _describe_placement(tensor, model):
    """Print the device of the tensor, of every model parameter and of the model output."""
    print(f'input is on {tensor.device}')
    print(f'model parameters are on {[param.device for param in model.parameters()]}')
    print(f'output is on {model(tensor).device}')


# Tiny tensor and model, created on the default device, used only for the check.
rand_tensor = torch.rand(5, 2)
simple_model = nn.Sequential(nn.Linear(2, 10), nn.ReLU(), nn.Linear(10, 1))
_describe_placement(rand_tensor, simple_model)

# Use the GPU when one is available, otherwise stay on the CPU. `device` is
# reused by the train/test functions defined later in the notebook.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
rand_tensor = rand_tensor.to(device)
simple_model = simple_model.to(device)
_describe_placement(rand_tensor, simple_model)

input is on cpu
model parameters are on [device(type='cpu'), device(type='cpu'), device(type='cpu'), device(type='cpu')]
output is on cpu
input is on cuda:0
model parameters are on [device(type='cuda', index=0), device(type='cuda', index=0), device(type='cuda', index=0), device(type='cuda', index=0)]
output is on cuda:0


# Train and Test Functions
Revised from my former AI assignment

In [None]:
def train(model: nn.Module,
          loss_fn: nn.modules.loss._Loss,
          optimizer: torch.optim.Optimizer,
          train_loader: torch.utils.data.DataLoader,
          epoch: int=0)-> List:
    """Run one training epoch and return the per-batch losses.

    The model and every batch are moved to the module-level ``device`` before
    use. Progress is printed about eight times per epoch.

    Args:
        model: Network to optimise; switched to training mode.
        loss_fn: Callable applied as ``loss_fn(output, targets)``.
        optimizer: Optimizer stepped once per batch.
        train_loader: Loader yielding ``(images, targets)`` batches.
        epoch: Epoch number, used only in the progress messages.

    Returns:
        List with one Python float per batch, in iteration order.
    """
    model = model.to(device)
    model.train()
    train_loss = []

    # Logging interval: roughly eight reports per epoch. Clamp to 1 so loaders
    # with fewer than eight batches do not trigger a modulo-by-zero.
    log_every = max(1, len(train_loader) // 8)

    for batch_idx, (images, targets) in enumerate(train_loader):
        images = images.to(device)
        targets = targets.to(device)

        optimizer.zero_grad()
        output = model(images)
        loss = loss_fn(output, targets)
        loss.backward()
        optimizer.step()

        batch_loss = loss.item()
        train_loss.append(batch_loss)

        if batch_idx % log_every == 0:
            print(f'Epoch {epoch}: [{batch_idx*len(images)}/{len(train_loader.dataset)}] Loss: {batch_loss:.3f}')

    assert len(train_loss) == len(train_loader)
    return train_loss

def test(model: nn.Module,
         loss_fn: nn.modules.loss._Loss,
         test_loader: torch.utils.data.DataLoader,
         epoch: int=0)-> Dict:
    # ----------- <Your code> ---------------
    model = model.to(device)
    model.eval() # we need to set the mode for our model

    test_loss = 0
    correct = 0
    total_num = 0
    test_stat = {'loss': 0.0, 'accuracy': 0.0, 'prediction': torch.tensor([])}
    test_stat['prediction'] = test_stat['prediction'].to(device)

    with torch.no_grad():
      for images, targets in test_loader:
        images = images.to(device)
        targets = targets.to(device)
        output = model(images)
        test_loss += loss_fn(output, targets).item()
        pred = output.data.max(1, keepdim=True)[1] # we get the estimate of our result by look at the largest class value
        test_stat['prediction'] = torch.cat((test_stat['prediction'], pred))
        correct += pred.eq(targets.data.view_as(pred)).sum() # sum up the corrected samples
        total_num += len(images)

    test_stat['loss'] = test_loss / len(test_loader)
    test_stat['accuracy'] = correct / len(test_loader.dataset)

    print(f"Test result on epoch {epoch}: total sample: {total_num}, Avg loss: {test_stat['loss']:.3f}, Acc: {100*test_stat['accuracy']:.3f}%")
    # ----------- <End Your code> ---------------
    # dictionary output should include loss, accuracy and prediction
    assert "loss" and "accuracy" and "prediction" in test_stat.keys()
    # "prediction" should be a 1D tensor
    assert len(test_stat["prediction"]) == len(test_loader.dataset)
    assert isinstance(test_stat["prediction"], torch.Tensor)
    return test_stat

# Create a Dataset
Revised from: https://dilithjay.com/blog/custom-image-classifier-with-pytorch

In [None]:
import cv2
from PIL import Image
from torch.utils.data import Dataset, DataLoader
import os
from torch.utils.data import random_split
import torch
import torchvision
from sklearn.model_selection import train_test_split

# Class list and labelled training rows, both stored next to the notebook data.
category_df = pd.read_csv(f'{dir_path}category.csv')
original_df = pd.read_csv(f'{dir_path}train.csv')

# Ordered class names; a name's position in this list is used as its class index.
names = category_df.loc[:, 'Category'].tolist()

def name_to_idx(name):
    """Return the class index of ``name``: its position in the global ``names`` list.

    Raises:
        ValueError: If ``name`` does not occur in ``names``.
    """
    class_index = names.index(name)
    return class_index

class CustomDataset(Dataset):
    """Image classification dataset backed by a DataFrame of labelled file names.

    Images are read from ``dir_path + 'train_crop/'``. Column 1 of the frame is
    used as the file name and column 2 as the class name.
    NOTE(review): presumably this matches the column layout of train.csv; confirm.
    """

    def __init__(self, dataframe, transform):
        """Store the label frame and the optional per-image transform."""
        self.dataframe = dataframe
        self.transform = transform

    def __len__(self):
        """Return the number of rows in the label frame."""
        return len(self.dataframe)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for row ``idx``.

        If the image for that row cannot be read, the next readable row is
        used instead, wrapping around to the start of the frame.

        Raises:
            RuntimeError: If no row of the frame has a readable image.
        """
        num_rows = len(self.dataframe)

        # Probe each row at most once. The previous open-ended `idx += 1`
        # walked past the end of the frame (IndexError) whenever the trailing
        # rows had no readable image.
        for offset in range(num_rows):
            row = (idx + offset) % num_rows
            img_path = dir_path + 'train_crop/' + str(self.dataframe.iloc[row, 1])
            image = cv2.imread(img_path)
            if image is None or image.size == 0:
                continue

            # cv2.imread yields BGR data while PIL (used again at inference
            # time via .convert('RGB')) works in RGB, so convert before
            # wrapping; otherwise the colour channels are swapped.
            image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
            label = name_to_idx(self.dataframe.iloc[row, 2])

            if self.transform:
                image = self.transform(image)

            return image, label

        raise RuntimeError('CustomDataset: no readable image found in ' + dir_path + 'train_crop/')

In [None]:
# Deterministic preprocessing shared by the training and validation datasets:
# resize to 232, centre-crop to 224, convert to a tensor, then normalise each
# channel with the given mean/std.
transform = transforms.Compose([
    transforms.Resize(size=232, interpolation=InterpolationMode.BILINEAR),
    transforms.CenterCrop(size=224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Hold out 15% of the labelled rows for validation; the fixed seed keeps the
# split identical between runs.
train_df, valid_df = train_test_split(original_df, test_size=0.15, random_state=5)

for split_df in (train_df, valid_df):
    print(split_df.shape)

(59109, 3)
(10431, 3)


In [None]:
train_dataset = CustomDataset(dataframe=train_df, transform=transform)
valid_dataset = CustomDataset(dataframe=valid_df, transform=transform)

# Shuffle only the training data; validation order stays fixed so predictions
# line up with valid_df rows.
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=64, shuffle=True)
valid_loader = torch.utils.data.DataLoader(valid_dataset, batch_size=64, shuffle=False)

# Pass the weights enum member itself as `weights`.
# NOTE(review): the previous call combined the deprecated `pretrained=False`
# with `weights=ResNet50_Weights.DEFAULT.transforms()`, i.e. a preprocessing
# object where a weights enum is expected. Starting from the DEFAULT ImageNet
# weights is assumed to be the intent; use `weights=None` to train from scratch.
resnet50 = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)

# Replace the classification head BEFORE building the optimizer. Previously
# the optimizer was created first, so it still referenced the discarded head
# and the new 100-class layer was never updated during training.
resnet50.fc = nn.Linear(in_features=resnet50.fc.in_features, out_features=100, bias=True)
resnet50 = resnet50.to(device)

criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(resnet50.parameters(), lr=0.001)



In [None]:
# One training pass followed by one validation pass per epoch. Epochs are
# numbered from 1 so the log lines read naturally.
num_epochs = 20
for epoch in range(1, num_epochs + 1):
    train(model=resnet50, loss_fn=criterion, optimizer=optimizer,
          train_loader=train_loader, epoch=epoch)
    test(model=resnet50, loss_fn=criterion, test_loader=valid_loader, epoch=epoch)

Epoch 1: [0/59109] Loss: 4.630
Epoch 1: [7360/59109] Loss: 3.843
Epoch 1: [14720/59109] Loss: 3.295
Epoch 1: [22080/59109] Loss: 3.186
Epoch 1: [29440/59109] Loss: 2.730
Epoch 1: [36800/59109] Loss: 2.702
Epoch 1: [44160/59109] Loss: 2.629
Epoch 1: [51520/59109] Loss: 2.176
Epoch 1: [58880/59109] Loss: 1.807
Test result on epoch 1: total sample: 10431, Avg loss: 2.306, Acc: 48.902%
Epoch 2: [0/59109] Loss: 2.146
Epoch 2: [7360/59109] Loss: 1.893
Epoch 2: [14720/59109] Loss: 1.782
Epoch 2: [22080/59109] Loss: 2.028
Epoch 2: [29440/59109] Loss: 2.031
Epoch 2: [36800/59109] Loss: 1.995
Epoch 2: [44160/59109] Loss: 1.816
Epoch 2: [51520/59109] Loss: 1.695
Epoch 2: [58880/59109] Loss: 2.011
Test result on epoch 2: total sample: 10431, Avg loss: 1.848, Acc: 60.138%
Epoch 3: [0/59109] Loss: 1.637
Epoch 3: [7360/59109] Loss: 1.529
Epoch 3: [14720/59109] Loss: 1.914
Epoch 3: [22080/59109] Loss: 1.490
Epoch 3: [29440/59109] Loss: 2.126
Epoch 3: [36800/59109] Loss: 1.431
Epoch 3: [44160/59109] Lo

KeyboardInterrupt: 

In [None]:
# Persist only the learned parameters (state_dict); the inference cell rebuilds
# the architecture and loads this file.
model_path = dir_path + "resnet50_v2.pth"
torch.save(resnet50.state_dict(), model_path)

# Run Inference on the Test Set and Generate the CSV File

In [None]:
import torch
from torchvision import transforms
from torchvision.models import resnet50
from PIL import Image
import csv
import os

dir_path = '/content/drive/MyDrive/Colab Notebooks/ECE50024/Mini_Challenge/'

category_df = pd.read_csv(dir_path + 'category.csv')
# Index -> class name. Training labels were derived from positions in the
# 'Category' column, so look predictions up in the same column by name rather
# than by column position.
category_names = category_df['Category'].tolist()

# Choose the device once. A hard-coded 'cuda' in map_location fails on a
# CPU-only runtime even though the rest of the cell guards on availability.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Rebuild the architecture without downloading weights, then load the checkpoint.
model = models.resnet50(weights=None)
model.fc = torch.nn.Linear(model.fc.in_features, 100)
state_dict = torch.load(dir_path + 'resnet50_v2.pth', map_location=device)
model.load_state_dict(state_dict)
model = model.to(device)  # moved once here instead of on every loop iteration
model.eval()

transform = torchvision.transforms.Compose([
    transforms.Resize(232, interpolation=InterpolationMode.BILINEAR),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

test_num = 4977
# First row is the CSV header; one [Id, Category] row per test image follows.
predictions = [['Id', 'Category']]

for image_id in range(test_num):
    if image_id % 500 == 0:
        print(f'Progress: [{image_id}/{test_num}]')

    # Prefer the face-cropped image; fall back to the original test image.
    image_path = f'{dir_path}test_crop/{image_id}.jpg'
    if not os.path.exists(image_path):
        image_path = f'{dir_path}test/{image_id}.jpg'

    # PIL raises (rather than returning None) for missing or unreadable files,
    # so the 'None' placeholder row has to come from an exception handler.
    try:
        with Image.open(image_path) as raw_image:
            image = raw_image.convert('RGB')
    except OSError:
        predictions.append([image_id, 'None'])
        continue

    batch = transform(image).unsqueeze(0).to(device)

    with torch.no_grad():
        output = model(batch)

    predicted = output.argmax(dim=1)
    predictions.append([image_id, category_names[predicted.item()]])

# Specify the CSV file name
file_name = f"{dir_path}test_prediction.csv"

# newline='' lets the csv module control line endings itself.
with open(file_name, mode='w', newline='') as file:
    writer = csv.writer(file)
    writer.writerows(predictions)



Progress: [0/4977]
Progress: [500/4977]
Progress: [1000/4977]
Progress: [1500/4977]
Progress: [2000/4977]
Progress: [2500/4977]
Progress: [3000/4977]
Progress: [3500/4977]
Progress: [4000/4977]
Progress: [4500/4977]
