In [None]:
import os
import random
import glob
import gc
import csv
import torch
import time
import optuna
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.utils import class_weight
from sklearn.metrics import confusion_matrix
from math import floor

import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms
from torch.utils.data import DataLoader, Dataset
from torchsummary import summary

from PIL import Image
import seaborn as sns
import plotly.io as pio
pio.renderers.default = "vscode"

#### Set arguments and random seed

In [None]:
TRA_PATH = 'data/train/'
TST_PATH = 'data/test/'
LABEL_PATH = 'data/train.csv'
# DEVICE_ID = 2
SEED = 5566
use_gpu = torch.cuda.is_available()
device = torch.device("cuda" if use_gpu else "cpu")

torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)
random.seed(SEED)
np.random.seed(SEED)

#### Process data

In [None]:
emotion_cat = {0:'Anger', 1:'Disgust', 2:'Fear', 3:'Happiness', 
    4: 'Sadness', 5: 'Surprise', 6: 'Neutral'}

def get_train_label_and_image(label_path):
    train_label = pd.read_csv(label_path)['label'].values.tolist()
    train_image = [f'data/train/{i+7000}.jpg' for i in range(len(train_label))]
    return train_image, train_label

def load_partial_data(train_image_path, train_label):
    train_data = list(zip(train_image_path, train_label))
    data = []
    path = []
    label_list = []
    label_dict = dict()
    for (img_path, label) in train_data:
        label_dict[label] = 0

    for (img_path, label) in train_data:
        if label_dict[label] == 5:
            continue
        else:
            label_dict[label] += 1
            data.append(np.array(Image.open(img_path)))
            label_list.append(label)
            path.append(img_path)

        if sum(label_dict.values()) == 35:
            break

    X_train = np.array(data)
    y_train = np.array(label_list)
    return X_train, y_train, path

def load_train_data(label_path):
    train_label = pd.read_csv(label_path)['label'].values.tolist()
    train_image = [f'data/train/{i+7000}.jpg' for i in range(len(train_label))]
    train_data = list(zip(train_image, train_label))
    data = []
    label_list = []
    for (path, label) in train_data:
        data.append(np.array(Image.open(path)))
        label_list.append(train_label)
    X_train = np.array(data)
    y_train = np.array(label_list)
    return X_train, y_train

def load_test_data():
    data = []
    test_set = [f'data/test/{i}.jpg' for i in range(7000)]
    for path in test_set:
        data.append(np.array(Image.open(path)))
    X_test = np.array(data)
    return X_test
    
def compute_statistics(dataset):
    data = []
    for (img_path, label) in dataset:
        data.append(np.array(Image.open(img_path)))
    data = np.array(data)
    return data.mean(), data.std()

def compute_category_distribution(train_set, emotion_cat):
    emotion = dict()
    for idx, label in emotion_cat.items():
        emotion[label] = 0
    for (img_path, label) in train_set:
        emotion[emotion_cat[label]] += 1
    return emotion

# EDA

In [None]:
train_image_path, train_label = get_train_label_and_image(LABEL_PATH)
X_train_partial, y_train_partial, path = load_partial_data(train_image_path, train_label)
print(X_train_partial.shape)
print(y_train_partial.shape)
#data_partial = np.concatenate([X_train_partial, y_train_partial], axis = 2)

(35, 64, 64)
(35,)


# Data Preprocessing

In [None]:
X_train = np.load('X_train.npy')
X_test = np.load('X_test.npy')
y_train = np.array(train_label)

In [None]:
transform = transforms.Compose([
    transforms.ToTensor()
])
img_tr = transform(X_train)
img_np = np.array(img_tr)
mean_img = np.mean(img_np)
std_img = np.std(img_np)
print(f"Mean of all image is {mean_img}")
print(f"Std of all image is {std_img}")

Mean of all image is 0.5081561803817749
Std of all image is 0.2644411027431488


# Dataset

In [None]:
class FaceExpressionDataset(Dataset):
    def __init__(self, X, y, augment=None):
        self.X = X
        self.y = y
        self.augment = augment

    def __len__(self):
        return len(self.X)
    
    def read_img(self, idx):
        # img = Image.open(self.data[idx][0])
        img = self.X[idx]
        if not self.augment is None:
            img = self.augment(img)
        img = img.float()
        return img
    
    def __getitem__(self, idx):
        img = self.read_img(idx)
        label = self.y[idx]
        return img, label
    
class TestingDataset(Dataset):
    def __init__(self, X, augment=None):
        self.X = X
        self.augment = augment

    def __len__(self):
        return len(self.X)

    def read_img(self, idx):
        img = self.X[idx]
        if not self.augment is None:
            img = self.augment(img)
        img = img.float()
        return img
        
    def __getitem__(self, idx):
        img = self.read_img(idx)
        return img

In [None]:
class FaceExpressionNet(nn.Module):
    def __init__(self, trial):
        super(FaceExpressionNet, self).__init__()

        ###########################general information###########################
        self.conv1_out_channels = trial.suggest_int("conv1_out_ch", 48, 96, step = 16)
        self.leakyReLU1_slope = trial.suggest_float("leakyReLU1_slope", 0.05, 0.2, step = 0.05)
        self.conv1_dropout_rate = trial.suggest_float("conv1_dropout_rate", 0, 0.3, step = 0.1)
        #self.leakyReLU1_slope = 0.2
        #self.conv1_dropout_rate = 0.2

        self.conv0 = nn.Sequential(
            nn.Conv2d(1, self.conv1_out_channels, kernel_size=5, padding=2),
            nn.LeakyReLU(negative_slope=self.leakyReLU1_slope),
            nn.BatchNorm2d(self.conv1_out_channels),
            nn.Dropout2d(self.conv1_dropout_rate)
        )
        self.conv1 = nn.Sequential(
            nn.Conv2d(self.conv1_out_channels, self.conv1_out_channels, kernel_size=5, padding=2),
            nn.LeakyReLU(negative_slope=self.leakyReLU1_slope),
            nn.BatchNorm2d(self.conv1_out_channels),
            nn.Dropout2d(self.conv1_dropout_rate)
        )

        #########################location information######################        
        self.conv2_out_channels = trial.suggest_int("conv2_out_ch", 256, 1024, step = 128)
        self.leakyReLU2_slope = trial.suggest_float("leakyReLU2_slope", 0.05, 0.2, step = 0.05)
        self.conv2_dropout_rate = trial.suggest_float("conv2_dropout_rate", 0.3, 0.5, step = 0.1)
        #self.leakyReLU2_slope = 0.05
        #self.conv2_dropout_rate = 0.4

        self.conv2 = nn.Sequential(
            nn.Conv2d(self.conv1_out_channels, 128, kernel_size=3, padding=1),
            nn.LeakyReLU(negative_slope=self.leakyReLU2_slope),
            nn.BatchNorm2d(128),
            nn.MaxPool2d(2),
            nn.Dropout2d(self.conv2_dropout_rate)
        )
        self.conv3 = nn.Sequential(
            nn.Conv2d(128, self.conv2_out_channels, kernel_size=3, padding=1),
            nn.LeakyReLU(negative_slope=self.leakyReLU2_slope),
            nn.BatchNorm2d(self.conv2_out_channels),
            nn.MaxPool2d(2),
            nn.Dropout2d(self.conv2_dropout_rate)
        )
        self.conv4 = nn.Sequential(
            nn.Conv2d(self.conv2_out_channels, self.conv2_out_channels, kernel_size=3, padding=1),
            nn.LeakyReLU(negative_slope=self.leakyReLU2_slope),
            nn.BatchNorm2d(self.conv2_out_channels),
            nn.MaxPool2d(2),
            nn.Dropout2d(self.conv2_dropout_rate)
        )
        self.fc1 = nn.Sequential(
            nn.Linear(8*8*self.conv2_out_channels, self.conv2_out_channels),
            nn.ReLU(),
            nn.BatchNorm1d(self.conv2_out_channels),
            nn.Dropout(0.5)
        )
        self.fc2 = nn.Sequential(
            nn.Linear(self.conv2_out_channels, self.conv2_out_channels),
            nn.ReLU(),
            nn.BatchNorm1d(self.conv2_out_channels),
            nn.Dropout(0.5)
        )
        self.fc3 = nn.Linear(self.conv2_out_channels, 7)

    def forward(self, x):
        x = self.conv0(x)
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)
        x = x.view(-1, 8*8*self.conv2_out_channels)
        x = self.fc1(x)
        x = self.fc2(x)
        x = self.fc3(x)
        return x

#### Define training and testing process

In [None]:
#original input dimension: 64 x 64
image_transforms = {'train':transforms.Compose([
                transforms.ToPILImage(),
                transforms.RandomVerticalFlip(),
                transforms.RandomRotation(degrees = (-20, 20)),
                transforms.ToTensor(),
                transforms.Normalize(mean=mean_img, std=std_img)
                     ]),
            'valid': transforms.Compose([
                transforms.ToPILImage(),
                transforms.ToTensor(),
                transforms.Normalize(mean=mean_img, std=std_img)
                    ]),
            'test': transforms.Compose([
                transforms.ToPILImage(),
                transforms.ToTensor(),
                transforms.Normalize(mean=mean_img, std=std_img)
                ])
            }

X_train, X_valid, y_train, y_valid = train_test_split(X_train, y_train, stratify=y_train)
train_dataset = FaceExpressionDataset(X_train, y_train, image_transforms["train"])
valid_dataset = FaceExpressionDataset(X_valid, y_valid, image_transforms["valid"])
test_dataset = TestingDataset(X_test, image_transforms["test"])

# emotion_dist = compute_category_distribution(train_dataset, emotion_cat)
# emotion_label_list = emotion_dist.keys()
# emotion_count = emotion_dist.values()
#weights = class_weight.compute_class_weight("balanced" , classes = np.unique(y_train), y = y_train)

In [None]:
def train(train_loader, model, loss_fn, optimizer,device):
    model.train()
    gc.collect()
    torch.cuda.empty_cache()
    train_loss = []
    train_acc = []
    for (img, label) in train_loader:
        img = img.to(device)
        label = label.to(device)
        optimizer.zero_grad()
        output = model(img)
        loss = loss_fn(output, label)
        loss.backward()            
        optimizer.step()
        with torch.no_grad():
            predict = torch.argmax(output, dim=-1)
            acc = np.mean((label == predict).cpu().numpy())
            train_acc.append(acc)
            train_loss.append(loss.item())
        #release memory
        del img, label, output, loss, predict, acc
        torch.cuda.empty_cache()
    #average over all batches
    train_acc = np.mean(train_acc)
    train_loss = np.mean(train_loss)
    return train_acc, train_loss
    
    
def valid(valid_loader, model, loss_fn, device):
    model.eval()
    with torch.no_grad():
        valid_loss = []
        valid_acc = []
        for idx, (img, label) in enumerate(valid_loader):
            img = img.to(device)
            label = label.to(device)
            output = model(img)
            loss = loss_fn(output, label)
            predict = torch.argmax(output, dim=-1)
            acc = (label == predict).cpu().tolist()
            valid_loss.append(loss.item())
            valid_acc += acc

            #release memory
            del img, label, output, loss, predict, acc
            torch.cuda.empty_cache()
    #average over all batches
    valid_acc = np.mean(valid_acc)
    valid_loss = np.mean(valid_loss)
    gc.collect()
    torch.cuda.empty_cache()
    return valid_acc, valid_loss

def save_checkpoint(model, prefix='model_1112'):
    checkpoint_path = f'./models/{prefix}.pth'
    torch.save(model.state_dict(), checkpoint_path)
    #print('model saved to %s' % checkpoint_path)

In [None]:
def init_weights(m):
    if isinstance(m, nn.Linear) or isinstance(m, nn.Conv2d):
        torch.nn.init.xavier_uniform_(m.weight)
        m.bias.data.fill_(0.01)

def objective(trial):
    model = FaceExpressionNet(trial)
    model.apply(init_weights)
    model.to(device)
    #summary(model, (1, 64, 64))

    #tunning
    optimizer_name = trial.suggest_categorical("optimizer", ["Adam"])
    lr = round(trial.suggest_float("lr", 5*1e-3, 1e-2, log=True), 3)
    batch_size = 128

    optimizer = getattr(optim, optimizer_name)(model.parameters(), lr=lr)
    # loss_fn = nn.CrossEntropyLoss(weight=torch.FloatTensor(weights).to(device))
    loss_fn = nn.CrossEntropyLoss()
    
    num_epoch = 500

    #build data loader
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    valid_loader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False)      

    #record
    acc_record = []
    best_valid_acc = 0
    best_valid_loss = 999999
    best_epoch = 0

    print(trial.params)

    no_update_cnt = 0
    patience = 10
    x = time.time()
    rand = round(np.random.rand(), 4)
    for epoch in range(num_epoch):

        train_acc, train_loss = train(train_loader, model, loss_fn, optimizer, device)
        valid_acc, valid_loss = valid(valid_loader, model, loss_fn, device)
        acc_record.append(valid_acc)

        #record
        trial.report(valid_acc, epoch)
        if valid_loss < best_valid_loss:
            model_name = "1113" + "_lr_"+ str(lr) + "opt_" + str(optimizer_name) + "_" + str(rand)
            save_checkpoint(model, prefix=model_name)
            best_valid_loss = valid_loss
            best_valid_acc = valid_acc
            best_epoch = epoch
            no_update_cnt = 0
        else:
            no_update_cnt += 1
        y = time.time()
        time_train = (y - x) / 60
        print(f"Epoch {epoch}: {round(time_train, 2)} min elapsed, train acc: {round(train_acc * 100, 2)}%, train loss: {round(train_loss, 3)}, valid acc: {round(valid_acc * 100, 2)}%, valid loss: {round(valid_loss, 3)}")
        gc.collect()
        torch.cuda.empty_cache()
        #Early stop
        if no_update_cnt > patience:
            break

    print('########################################################')
    print("Finish model tuning")
    print(f"Best epoch is {best_epoch}, Accuracy: {best_valid_acc}, Loss: {best_valid_loss}")
    print('########################################################')
    
    if trial.should_prune():
        raise optuna.exceptions.TrialPruned() #不會break for loop
    
    del model, train_loader, valid_loader
    return valid_acc

In [None]:
study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=10)

trial = study.best_trial

print('Accuracy: {}'.format(trial.value))
print("Best hyperparameters: {}".format(trial.params))

[32m[I 2021-11-12 16:53:28,564][0m A new study created in memory with name: no-name-eba976e5-6696-45ea-be70-012b0b6cfef9[0m


{'conv1_out_ch': 64, 'leakyReLU1_slope': 0.2, 'conv1_dropout_rate': 0.2, 'conv2_out_ch': 896, 'leakyReLU2_slope': 0.05, 'conv2_dropout_rate': 0.5, 'optimizer': 'Adam', 'lr': 0.006803819791168891}
Epoch 0: 1.94 min elapsed, train acc: 19.81%, train loss: 2.363, valid acc: 25.62%, valid loss: 2.01
Epoch 1: 3.86 min elapsed, train acc: 21.64%, train loss: 2.06, valid acc: 25.81%, valid loss: 1.966
Epoch 2: 5.79 min elapsed, train acc: 23.4%, train loss: 1.955, valid acc: 25.53%, valid loss: 1.812
Epoch 3: 7.74 min elapsed, train acc: 25.75%, train loss: 1.868, valid acc: 28.81%, valid loss: 1.766
Epoch 4: 9.68 min elapsed, train acc: 27.81%, train loss: 1.793, valid acc: 30.43%, valid loss: 1.714
Epoch 5: 11.61 min elapsed, train acc: 29.15%, train loss: 1.749, valid acc: 32.29%, valid loss: 1.667
Epoch 6: 13.53 min elapsed, train acc: 31.19%, train loss: 1.703, valid acc: 35.05%, valid loss: 1.63
Epoch 7: 15.44 min elapsed, train acc: 33.67%, train loss: 1.663, valid acc: 38.22%, valid l

In [None]:
df = study.trials_dataframe()
df

Unnamed: 0,number,value,datetime_start,datetime_complete,duration,params_conv1_dropout_rate,params_conv1_out_ch,params_conv2_dropout_rate,params_conv2_out_ch,params_leakyReLU1_slope,params_leakyReLU2_slope,params_lr,params_optimizer,state
0,0,0.533727,2021-11-12 12:54:09.158294,2021-11-12 13:43:21.140303,0 days 00:49:11.982009,0.5,48,0.5,128,0.2,0.15,0.007563,Adam,COMPLETE
1,1,0.255047,2021-11-12 13:43:21.143094,2021-11-12 13:56:06.365131,0 days 00:12:45.222037,0.3,36,0.5,256,0.1,0.1,0.005637,SGD,COMPLETE
2,2,0.253816,2021-11-12 13:56:06.366902,2021-11-12 14:10:27.557440,0 days 00:14:21.190538,0.4,36,0.5,256,0.2,0.2,0.005617,SGD,COMPLETE
3,3,0.255047,2021-11-12 14:10:27.559971,2021-11-12 14:26:47.415012,0 days 00:16:19.855041,0.4,24,0.4,128,0.15,0.2,0.007239,SGD,COMPLETE
4,4,0.552437,2021-11-12 14:26:47.416791,2021-11-12 15:06:46.080484,0 days 00:39:58.663693,0.3,48,0.4,128,0.2,0.05,0.007641,Adam,COMPLETE


In [None]:
optuna.visualization.plot_optimization_history(study)

In [None]:
optuna.visualization.plot_parallel_coordinate(study)

In [None]:
optuna.visualization.plot_param_importances(study)

In [None]:
optuna.visualization.plot_contour(study, params=['optimizer', 'lr'])

In [None]:
optuna.visualization.plot_contour(study, params=['conv1_out_ch', 'conv2_out_ch'])

In [None]:
optuna.visualization.plot_contour(study, params=['conv1_dropout_rate', 'conv2_dropout_rate'])

# Problem 3

In [None]:
from torchviz import make_dot
model = FaceExpressionNet(trial)
train_loader = DataLoader(train_dataset, batch_size=20, shuffle=True)
for (img, label) in train_loader:
    yhat = model(img)
    break
make_dot(yhat, params=dict(list(model.named_parameters()))).render("cnn_torchviz", format="png")

'cnn_torchviz.png'

In [None]:
cf_matrix = confusion_matrix(y_true, y_pred)                                
# 計算每個class的accuracy
per_cls_acc = cf_matrix.diagonal()/cf_matrix.sum(axis=0)
emotion_cat = {0:'Anger', 1:'Disgust', 2:'Fear', 3:'Happiness', 4: 'Sadness', 5: 'Surprise', 6: 'Neutral'}                  
class_names = list(emotion_cat.values)
# print(class_names)
# print(per_cls_acc)                                                      
# print("Plot confusion matrix")

df_cm = pd.DataFrame(cf_matrix, class_names, class_names)     
plt.figure(figsize = (9,6))
sns.heatmap(df_cm, annot=True, fmt="d", cmap='BuGn')
plt.xlabel("prediction")
plt.ylabel("label (ground truth)")
plt.savefig("confusion_matrix.png")

# Problem 4

# Testing

In [None]:
def test(test_loader, model, file_name='./submission/predict_1011.csv'):
    with torch.no_grad():
        predict_result = []
        for idx, img in enumerate(test_loader):
            if use_gpu:
                img = img.to(device)
            output = model(img)
            predict = torch.argmax(output, dim=-1).tolist()
            predict_result += predict
        
    with open(file_name, 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['id', 'label'])
        for i in range(len(predict_result)):
            writer.writerow([str(i), str(predict_result[i])])

In [None]:
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
test(test_loader, model)