<a href="https://colab.research.google.com/github/yanli499/APS360-Project/blob/master/YLL_VGG_16_3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

YLL Test Code

In [0]:
# ALL import statements
import os
import shutil
import copy
import time
import random
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.models
import torchvision.transforms as transforms
import matplotlib.pyplot as plt

from PIL import Image
from torchvision import datasets, models, transforms
from torch.optim import lr_scheduler
from torch.autograd import Variable

In [2]:
# Mount our Google Drive
# re-run whenever needed
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [0]:
# re-run when needed
# classes are folders in each directory with these names
classes = ['afraid','angry','disgusted','happy','neutral','sad','surprised']

# emotion label for KDEF photos
emotion_code = {"AF":"afraid", "AN":"angry", "DI":"disgusted", "HA":"happy", 
                "NE":"neutral", "SA":"sad", "SU":"surprised"}

data_dir='/content/drive/My Drive/Colab Notebooks/Faces'

In [4]:
# RUN ONLY ONCE!!!

# logic for sorting thru KDEF dataset for the images we want
"""
- eg file name: AF01ANFL.JPG
- Check:
    - length of name = 7, for straight profile only, ends with "S.jpg"
    - str[4:5] = {"AF":"afraid", "AN":"angry", "DI":"disgusted", "HA":"happy",
    "NE":"neutral", "SA":sad", "SU":"surprised"}
"""

# delete existing folder
if os.path.exists(data_dir+'/'):
    shutil.rmtree(data_dir+'/')

# make new directories for each emotion class + train, val, test
try:
    os.mkdir(data_dir)

    for i in range(len(classes)):
        os.mkdir(data_dir+'/'+classes[i])

    os.mkdir(data_dir+'/train')
    for i in range(len(classes)):
        os.mkdir(data_dir+'/train/'+classes[i])
    
    os.mkdir(data_dir+'/val')
    for i in range(len(classes)):
        os.mkdir(data_dir+'/val/'+classes[i])

    os.mkdir(data_dir+'/test')
    for i in range(len(classes)):
        os.mkdir(data_dir+'/test/'+classes[i])

except OSError:
    print ("Creation of the directories failed!")
else:
    print ("Successfully created the directories!")

# rootdir = path to KDEF main folder
rootdir = '/content/drive/My Drive/Colab Notebooks/PROJECT/KDEF/'

# go thru KDEF data + sort out desired photos
for subdir, dirs, files in os.walk(rootdir):
    for file in files:
        filename = subdir + os.sep + file
        if (file.endswith("S.jpg") or file.endswith("S.JPG")): 
            # for each straight profile photo:
            # convert RGB --> Grayscale
            # resize to 256 x 256 pixels, b/c will center crop to 224 x 224 later
            # then save in the corresponding emotion class folder
            img = Image.open(filename).convert('L')
            new_img = img.resize((256, 256))

            if (file[4:6] == "AF"):
                new_img.save(data_dir+'/'+emotion_code["AF"]+'/'+file)
            elif (file[4:6] == "AN"):
                new_img.save(data_dir+'/'+emotion_code["AN"]+'/'+file)
            elif (file[4:6] == "DI"): 
                new_img.save(data_dir+'/'+emotion_code["DI"]+'/'+file)
            elif (file[4:6] == "HA"): 
                new_img.save(data_dir+'/'+emotion_code["HA"]+'/'+file)
            elif (file[4:6] == "NE"):
                new_img.save(data_dir+'/'+emotion_code["NE"]+'/'+file)
            elif (file[4:6] == "SA"): 
                new_img.save(data_dir+'/'+emotion_code["SA"]+'/'+file)
            elif (file[4:6] == "SU"):
                new_img.save(data_dir+'/'+emotion_code["SU"]+'/'+file)

Successfully created the directories!


In [0]:
# Split data into train, val, test datasets (60:20:20)
# each class = 140 images --> 84 train, 28 val, 28 test

# divide data into train, val, + test
# for each emotion class, get filenames, shuffle, 
# divide, move to corresponding folders in train, val, test
for c in classes:
    filepath = data_dir+'/'+c
    names = []

    for file in os.listdir(filepath):
        names.append(file)
    random.shuffle(names)

    # TODO: Can probably simplify this
    count = 0
    for name in names:
        if(count == (84-42)):
            break
        else:
            # shutil.move(dir1, dir2) = moves file form directory 1 to directory 2
            shutil.move(filepath+'/'+name, data_dir+'/train/'+c+'/'+name)
            names.remove(name)
            count += 1

    count = 0
    for name in names:
        if(count == (28-14)):
            break
        else:
            shutil.move(filepath+'/'+name, data_dir+'/val/'+c+'/'+name)
            names.remove(name)
            count += 1

    count = 0
    for name in names:
        if(count == (28-14)):
            break
        else:
            shutil.move(filepath+'/'+name, data_dir+'/test/'+c+'/'+name)
            names.remove(name)
            count += 1

In [0]:
# crop all images to 224 x 224 for all datasets
# generate image folders + data loaders for train, val, test
data_transform = transforms.Compose([transforms.CenterCrop(224), transforms.ToTensor()])
batch_size = 30

image_datasets = {
    'train': datasets.ImageFolder(
        os.path.join(data_dir, 'train/'), 
        transform=data_transform
    ),
    'val': datasets.ImageFolder(
        os.path.join(data_dir, 'val/'), 
        transform=data_transform
    ),
    'test': datasets.ImageFolder(
        os.path.join(data_dir, 'test/'), 
        transform=data_transform
    )
}

dataloaders = {
    'train': torch.utils.data.DataLoader(
        image_datasets['train'], batch_size=batch_size
    ),
    'val': torch.utils.data.DataLoader(
        image_datasets['val'], batch_size=batch_size
    ),
    'test': torch.utils.data.DataLoader(
        image_datasets['test'], batch_size=batch_size
    )
}

# get size of each dataset
dataset_sizes = {
    'train': len(image_datasets['train']),
    'val': len(image_datasets['val']),
    'test': len(image_datasets['test']) 
}

In [0]:
# From: https://www.kaggle.com/carloalbertobarbano/vgg16-transfer-learning-pytorch
def train_model(vgg, criterion, optimizer, scheduler, num_epochs=10):
    since = time.time()
    best_model_wts = copy.deepcopy(vgg.state_dict())
    best_acc = 0.0
    
    avg_loss = 0
    avg_acc = 0
    avg_loss_val = 0
    avg_acc_val = 0
    
    train_batches = len(dataloaders["train"])
    val_batches = len(dataloaders["val"])
    
    for epoch in range(num_epochs):
        print("Epoch {}/{}".format(epoch, num_epochs))
        print('-' * 10)
        
        loss_train = 0
        loss_val = 0
        acc_train = 0
        acc_val = 0
        
        vgg.train(True)
        
        for i, data in enumerate(dataloaders["train"]):
            if i % 100 == 0:
                print("\rTraining batch {}/{}".format(i, train_batches / 2), end='', flush=True)
                
            # Use half training dataset
            # if i >= train_batches / 2:
            #     break
                
            inputs, labels = data
            
            # if use_gpu:
            #     inputs, labels = Variable(inputs.cuda()), Variable(labels.cuda())
            # else:
            inputs, labels = Variable(inputs), Variable(labels)
            
            optimizer.zero_grad()
            
            outputs = vgg(inputs)
            
            _, preds = torch.max(outputs.data, 1)
            loss = criterion(outputs, labels)
            
            loss.backward()
            optimizer.step()
            
            loss_train += loss
            acc_train += torch.sum(preds == labels.data)
            
            del inputs, labels, outputs, preds
            torch.cuda.empty_cache()
        
        print()
        # * 2 as we only used half of the dataset
        avg_loss = loss_train / dataset_sizes["train"]
        avg_acc = acc_train / dataset_sizes["train"]
        
        vgg.train(False)
        vgg.eval()
            
        for i, data in enumerate(dataloaders["val"]):
            if i % 100 == 0:
                print("\rValidation batch {}/{}".format(i, val_batches), end='', flush=True)
                
            inputs, labels = data
            
            # if use_gpu:
            #     inputs, labels = Variable(inputs.cuda(), volatile=True), Variable(labels.cuda(), volatile=True)
            # else:
            inputs, labels = Variable(inputs, volatile=True), Variable(labels, volatile=True)
            
            optimizer.zero_grad()
            
            outputs = vgg(inputs)
            
            _, preds = torch.max(outputs.data, 1)
            loss = criterion(outputs, labels)
            
            loss_val += loss
            acc_val += torch.sum(preds == labels.data)
            
            del inputs, labels, outputs, preds
            torch.cuda.empty_cache()
        
        avg_loss_val = loss_val / dataset_sizes["val"]
        avg_acc_val = acc_val / dataset_sizes["val"]
        
        print()
        print("Epoch {} result: ".format(epoch))
        print("Avg loss (train): {:.4f}".format(avg_loss))
        print("Avg acc (train): {:.4f}".format(avg_acc))
        print("Avg loss (val): {:.4f}".format(avg_loss_val))
        print("Avg acc (val): {:.4f}".format(avg_acc_val))
        print('-' * 10)
        print()
        
        if avg_acc_val > best_acc:
            best_acc = avg_acc_val
            best_model_wts = copy.deepcopy(vgg.state_dict())
        
    elapsed_time = time.time() - since
    print()
    print("Training completed in {:.0f}m {:.0f}s".format(elapsed_time // 60, elapsed_time % 60))
    print("Best acc: {:.4f}".format(best_acc))
    
    vgg.load_state_dict(best_model_wts)
    return vgg

In [13]:
# get pretrained vgg16 model
vgg16 = torchvision.models.vgg16(pretrained=True)

# replace last layer in classifier to change output as needed
num_features = vgg16.classifier[6].in_features
features = list(vgg16.classifier.children())[:-1]
features.extend([nn.Linear(num_features, len(classes))])
vgg16.classifier = nn.Sequential(*features) # Replace the model classifier

criterion = nn.CrossEntropyLoss()
optimizer_ft = optim.SGD(vgg16.parameters(), lr=0.003, momentum=0.9)
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)

vgg16 = train_model(vgg16, criterion, optimizer_ft, exp_lr_scheduler, num_epochs=2)


Epoch 0/2
----------
Training batch 0/5.0
Validation batch 0/4




Epoch 0 result: 
Avg loss (train): 0.0778
Avg acc (train): 0.0000
Avg loss (val): 0.0788
Avg acc (val): 0.0000
----------

Epoch 1/2
----------
Training batch 0/5.0
Validation batch 0/4
Epoch 1 result: 
Avg loss (train): 0.0725
Avg acc (train): 0.0000
Avg loss (val): 0.0796
Avg acc (val): 0.0000
----------


Training completed in 8m 52s
Best acc: 0.0000


In [9]:
"""
NOTE:
The VGG-16 is able to classify 1000 different labels; we just need 4 instead.
In order to do that we are going replace the last fully connected layer of the 
model with a new one with 4 output features instead of 1000.

In PyTorch, we can access the VGG-16 classifier with model.classifier, 
which is an 6-layer array. We will replace the last entry.
"""

'\nNOTE:\nThe VGG-16 is able to classify 1000 different labels; we just need 4 instead.\nIn order to do that we are going replace the last fully connected layer of the \nmodel with a new one with 4 output features instead of 1000.\n\nIn PyTorch, we can access the VGG-16 classifier with model.classifier, \nwhich is an 6-layer array. We will replace the last entry.\n'