In [1]:
import torch
import PIL as pil
import openpyxl as xl
import torchvision.transforms as transforms
import torch.nn as nn
from torch.utils.data import DataLoader
from torch.optim import Adam
from torch.autograd import Variable
import numpy as np
import random
import matplotlib.pyplot as plt
%matplotlib inline


In [2]:
pil2tensor = transforms.ToTensor()
tensor2pil = transforms.ToPILImage()
normalize = transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))

In [3]:
def plot_image(tensor):
    plt.figure()
    plt.imshow(tensor.numpy().transpose(1, 2, 0))
    plt.show()

In [4]:
def fixImage(path):
    pil_image = pil.Image.open(path)
    new_size = pil_image.size
    
    delta_w = 400 - new_size[0]
    delta_h = 400 - new_size[1]
    padding = (delta_w//2, delta_h//2, delta_w-(delta_w//2), delta_h-(delta_h//2))
    new_im = pil.ImageOps.expand(pil_image, padding)

    rgb_image = pil2tensor(new_im)
    rgb_image = normalize(rgb_image)
    
    return rgb_image
    
    

In [5]:
def makeSets(data):
    indices = [(i) for (i,j) in enumerate(data)]
    train = []
    test = []
    
    trainIndices = random.sample(indices, int(len(data) * .8))
    testIndices = [x for x in indices if x not in trainIndices]
    
    for index in trainIndices:
        train.append(data[index])
    for ind in testIndices:
        test.append(data[index])
    return train, test

In [6]:
def makeData():
    wb = xl.load_workbook(filename='CASME/CASME2-coding-20190701.xlsx')
    sheet = wb.active
    max_row = sheet.max_row
    
    labelDict = {"others": 1, "happiness": 2, "disgust": 3, "repression": 4, "surprise": 5, "fear": 6, "sadness": 7}

    data = []

    for i in range(2, max_row + 1):
        subject = sheet.cell(row=i, column=1).value
        filename = sheet.cell(row = i, column=2).value
        apex = sheet.cell(row = i, column=5).value
        emotion = sheet.cell(row = i, column=9).value
        path = f"CASME/Cropped-updated/Cropped/sub{subject}/{filename}/reg_img{apex}.jpg"
        data.append([fixImage(path), labelDict[emotion]])
    
    return data

In [7]:
class Unit(nn.Module):
    def __init__(self,in_channels,out_channels):
        super(Unit,self).__init__()
        

        self.conv = nn.Conv2d(in_channels=in_channels,kernel_size=3,out_channels=out_channels,stride=1,padding=1)
        self.bn = nn.BatchNorm2d(num_features=out_channels)
        self.relu = nn.ReLU()

    def forward(self,input):
        output = self.conv(input)
        output = self.bn(output)
        output = self.relu(output)

        return output


In [14]:
class SimpleNet(nn.Module):
    def __init__(self,num_classes=7):
        super(SimpleNet,self).__init__()

        #Create 14 layers of the unit with max pooling in between
        self.unit1 = Unit(in_channels=3,out_channels=32)
        self.unit2 = Unit(in_channels=32, out_channels=32)
        self.unit3 = Unit(in_channels=32, out_channels=32)

        #self.pool1 = nn.MaxPool2d(kernel_size=2)

        #self.unit4 = Unit(in_channels=32, out_channels=64)
        #self.unit5 = Unit(in_channels=64, out_channels=64)
        #self.unit6 = Unit(in_channels=64, out_channels=64)
        #self.unit7 = Unit(in_channels=64, out_channels=64)

        #self.pool2 = nn.MaxPool2d(kernel_size=2)

        #self.unit8 = Unit(in_channels=64, out_channels=128)
        #self.unit9 = Unit(in_channels=128, out_channels=128)
        #self.unit10 = Unit(in_channels=128, out_channels=128)
        #self.unit11 = Unit(in_channels=128, out_channels=128)

        #self.pool3 = nn.MaxPool2d(kernel_size=2)

        #self.unit12 = Unit(in_channels=128, out_channels=128)
        #self.unit13 = Unit(in_channels=128, out_channels=128)
        #self.unit14 = Unit(in_channels=128, out_channels=128)

        self.avgpool = nn.AvgPool2d(kernel_size=4)
        
        #Add all the units into the Sequential layer in exact order
        self.net = nn.Sequential(self.unit1, self.unit2, self.unit3, self.avgpool)

        self.fc = nn.Linear(in_features= 400*400*32,out_features=num_classes)

    def forward(self, input):
        output = self.net(input)
        output = output.view(32, 7)
        output = self.fc(output)
        return output


In [15]:
#Create a learning rate adjustment function that divides the learning rate by 10 every 30 epochs
def adjust_learning_rate(epoch):

    lr = 0.001

    if epoch > 180:
        lr = lr / 1000000
    elif epoch > 150:
        lr = lr / 100000
    elif epoch > 120:
        lr = lr / 10000
    elif epoch > 90:
        lr = lr / 1000
    elif epoch > 60:
        lr = lr / 100
    elif epoch > 30:
        lr = lr / 10

    for param_group in optimizer.param_groups:
        param_group["lr"] = lr



In [16]:
def save_models(epoch):
    torch.save(model.state_dict(), "microexpressionsmodel_{}.model".format(epoch))
    print("Checkpoint saved")

In [17]:
def test(testSet):
    model.eval()
    test_acc = 0.0
    for i, (images, labels) in enumerate(testSet):

        #Predict classes using images from the test set
        outputs = model(images)
        _,prediction = torch.max(outputs.data, 1)
        prediction = prediction.cpu().numpy()
        test_acc += torch.sum(prediction == labels.data)
        


    #Compute the average acc and loss over all 10000 test images
    test_acc = test_acc / 10000

    return test_acc

In [18]:
def train(num_epochs, train, test):
    best_acc = 0.0

    for epoch in range(num_epochs):
        model.train()
        train_acc = 0.0
        train_loss = 0.0
        for i, (images, labels) in enumerate(train):
            print(labels)
            
            #Clear all accumulated gradients
            optimizer.zero_grad()
            #Predict classes using images from the test set
            outputs = model(images)
            #Compute the loss based on the predictions and actual labels
            loss = loss_fn(outputs,labels)
            #Backpropagate the loss
            loss.backward()

            #Adjust parameters according to the computed gradients
            optimizer.step()

            train_loss += loss.cpu().data[0] * images.size(0)
            _, prediction = torch.max(outputs.data, 1)
            
            train_acc += torch.sum(prediction == labels.data)

        #Call the learning rate adjustment function
        adjust_learning_rate(epoch)

        #Compute the average acc and loss over all 50000 training images
        train_acc = train_acc / 50000
        train_loss = train_loss / 50000

        #Evaluate on the test set
        test_acc = test(test)

        # Save the model if the test acc is greater than our current best
        if test_acc > best_acc:
            save_models(epoch)
            best_acc = test_acc


        # Print the metrics
        print("Epoch {}, Train Accuracy: {} , TrainLoss: {} , Test Accuracy: {}".format(epoch, train_acc, train_loss,test_acc))

In [19]:
if __name__ == "__main__":
    data = makeData()
    trainSet, testSet = makeSets(data)
    
    batch_size = 16

    test_loader = DataLoader(testSet, batch_size=32, shuffle=False, num_workers=4)
    train_loader = DataLoader(trainSet, batch_size=32, shuffle=False, num_workers=4)

    #Create model, optimizer and loss function
    model = SimpleNet(num_classes=7)

    optimizer = Adam(model.parameters(), lr=0.001,weight_decay=0.0001)
    loss_fn = nn.CrossEntropyLoss()
    
    train(10, train_loader, test_loader)
    
    plot_image(pictures[8])

tensor([1, 1, 1, 5, 3, 3, 3, 1, 3, 1, 3, 1, 3, 1, 3, 5, 1, 1, 2, 1, 1, 2, 5, 3,
        1, 1, 3, 1, 3, 1, 1, 1])


RuntimeError: [enforce fail at ..\c10\core\CPUAllocator.cpp:72] data. DefaultCPUAllocator: not enough memory: you tried to allocate 655360000 bytes. Buy new RAM!
