In [0]:
import numpy as np
import torch
import torch.nn as nn
import os
import torch.optim as optim
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm
import torchvision
import torchvision.transforms as transforms
from PIL import Image, ImageDraw
from numpy import genfromtxt
import skimage
from skimage.transform import rotate
from skimage.util import random_noise
import copy
from copy import deepcopy
from matplotlib.font_manager import FontProperties
from sklearn.model_selection import train_test_split
from skimage import io
from collections import OrderedDict
from functools import partial
from keras.preprocessing import image
from keras.preprocessing.image import load_img

In [0]:
filname = 'drive/My Drive/fer2013.csv'
label_map = ['Anger', 'Disgust', 'Fear', 'Happy', 'Sad', 'Surprise', 'Neutral']
names=['emotion','pixels','usage']
df=pd.read_csv('drive/My Drive/fer2013.csv',names=names, na_filter=False)
im=df['pixels']
df.head(10)

In [0]:
def getData(filname):
    Y = []
    X = []
    train_set = []
    first = True
    for line in open(filname):
        if first:
            first = False
        else:
            row = line.split(',')
            Y.append(int(row[0]))
            X.append([int(p) for p in row[1].split()])

    X, Y = np.array(X) / 255.0, np.array(Y)
    
    for i in range(len(X)):
        train_set.append([X[i], Y[i]])
        
    return X, Y, train_set

In [0]:
X, Y, train = getData(filname)
num_class = len(set(Y))
pic_size = 48
chanels_num = 1
print(num_class)

In [0]:
N, D = X.shape
X = X.reshape(N, pic_size, pic_size, 1)
X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.1, random_state=0)

In [0]:
final_train_data = []
final_target_train = []
for i in tqdm(range(X_train.shape[0])):
    final_train_data.append(X_train[i])
    final_train_data.append(rotate(X_train[i], angle=45, mode = 'wrap'))
    final_train_data.append(np.fliplr(X_train[i]))
    final_train_data.append(np.flipud(X_train[i]))
    final_train_data.append(random_noise(X_train[i],var=0.2**2))
    for j in range(5):
        final_target_train.append(y_train[i])

In [0]:
X_train = np.array(deepcopy(final_train_data))
y_train = np.array(deepcopy(final_target_train))

In [0]:
X_train = torch.Tensor(X_train)
y_train = torch.Tensor(y_train).long()
X_test = torch.Tensor(X_test)
y_test = torch.Tensor(y_test).long()

In [0]:
class Conv2dAuto(nn.Conv2d):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.padding =  (self.kernel_size[0] // 2, self.kernel_size[1] // 2)

conv3x3 = partial(Conv2dAuto, kernel_size=3, bias=False)

class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.in_channels, self.out_channels =  in_channels, out_channels
        self.blocks = nn.Identity()
        self.shortcut = nn.Identity()   
    
    def forward(self, x):
        residual = x
        if self.should_apply_shortcut: residual = self.shortcut(x)
        x = self.blocks(x)
        x += residual
        return x
    
    @property
    def should_apply_shortcut(self):
        return self.in_channels != self.out_channels

def conv_bn(in_channels, out_channels, conv, *args, **kwargs):
    return nn.Sequential(OrderedDict({'conv': conv(in_channels, out_channels, *args, **kwargs), 
                          'bn': nn.BatchNorm2d(out_channels) }))
    
class ResNetResidualBlock(ResidualBlock):
    def __init__(self, in_channels, out_channels, expansion=1, downsampling=1, conv=conv3x3, *args, **kwargs):
        super().__init__(in_channels, out_channels)
        self.expansion, self.downsampling, self.conv = expansion, downsampling, conv
        self.shortcut = nn.Sequential(OrderedDict(
        {
            'conv' : nn.Conv2d(self.in_channels, self.expanded_channels, kernel_size=1,
                      stride=self.downsampling, bias=False),
            'bn' : nn.BatchNorm2d(self.expanded_channels)
            
        })) if self.should_apply_shortcut else None
        
        
    @property
    def expanded_channels(self):
        return self.out_channels * self.expansion
    
    @property
    def should_apply_shortcut(self):
        return self.in_channels != self.expanded_channels

class ResNetBasicBlock(ResNetResidualBlock):
    expansion = 1
    def __init__(self, in_channels, out_channels, activation=nn.ReLU, *args, **kwargs):
        super().__init__(in_channels, out_channels, *args, **kwargs)
        self.blocks = nn.Sequential(
            conv_bn(self.in_channels, self.out_channels, conv=self.conv, bias=False, stride=self.downsampling),
            activation(),
            conv_bn(self.out_channels, self.expanded_channels, conv=self.conv, bias=False),
        )

class ResNetBottleNeckBlock(ResNetResidualBlock):
    expansion = 4
    def __init__(self, in_channels, out_channels, activation=nn.ReLU, *args, **kwargs):
        super().__init__(in_channels, out_channels, expansion=4, *args, **kwargs)
        self.blocks = nn.Sequential(
           conv_bn(self.in_channels, self.out_channels, self.conv, kernel_size=1),
             activation(),
             conv_bn(self.out_channels, self.out_channels, self.conv, kernel_size=3, stride=self.downsampling),
             activation(),
             conv_bn(self.out_channels, self.expanded_channels, self.conv, kernel_size=1),
        )

class ResNetLayer(nn.Module):
    def __init__(self, in_channels, out_channels, block=ResNetBasicBlock, n=1, *args, **kwargs):
        super().__init__()
        downsampling = 2 if in_channels != out_channels else 1
        
        self.blocks = nn.Sequential(
            block(in_channels , out_channels, *args, **kwargs, downsampling=downsampling),
            *[block(out_channels * block.expansion, 
                    out_channels, downsampling=1, *args, **kwargs) for _ in range(n - 1)]
        )

    def forward(self, x):
        x = self.blocks(x)
        return x

class ResNetEncoder(nn.Module):
    def __init__(self, in_channels=3, blocks_sizes=[64, 128, 256, 512], deepths=[2,2,2,2], 
                 activation=nn.ReLU, block=ResNetBasicBlock, *args,**kwargs):
        super().__init__()
        
        self.blocks_sizes = blocks_sizes
        
        self.gate = nn.Sequential(
            nn.Conv2d(in_channels, self.blocks_sizes[0], kernel_size=7, stride=2, padding=3, bias=False),
            nn.BatchNorm2d(self.blocks_sizes[0]),
            activation(),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        )
        
        self.in_out_block_sizes = list(zip(blocks_sizes, blocks_sizes[1:]))
        self.blocks = nn.ModuleList([ 
            ResNetLayer(blocks_sizes[0], blocks_sizes[0], n=deepths[0], activation=activation, 
                        block=block,  *args, **kwargs),
            *[ResNetLayer(in_channels * block.expansion, 
                          out_channels, n=n, activation=activation, 
                          block=block, *args, **kwargs) 
              for (in_channels, out_channels), n in zip(self.in_out_block_sizes, deepths[1:])]       
        ])
        
        
    def forward(self, x):
        x = self.gate(x)
        for block in self.blocks:
            x = block(x)
        return x

class ResnetDecoder(nn.Module):
    def __init__(self, in_features, n_classes):
        super().__init__()
        self.avg = nn.AdaptiveAvgPool2d((1, 1))
        self.decoder = nn.Linear(in_features, n_classes)

    def forward(self, x):
        x = self.avg(x)
        x = x.view(x.size(0), -1)
        x = self.decoder(x)
        return x

class ResNet(nn.Module):
    
    def __init__(self, in_channels, n_classes, *args, **kwargs):
        super().__init__()
        self.encoder = ResNetEncoder(in_channels, *args, **kwargs)
        self.decoder = ResnetDecoder(self.encoder.blocks[-1].blocks[-1].expanded_channels, n_classes)
        
    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

def resnet18(in_channels, n_classes):
    return ResNet(in_channels, n_classes, block=ResNetBasicBlock, deepths=[2, 2, 2, 2])

def resnet34(in_channels, n_classes):
    return ResNet(in_channels, n_classes, block=ResNetBasicBlock, deepths=[3, 4, 6, 3])

def resnet50(in_channels, n_classes):
    return ResNet(in_channels, n_classes, block=ResNetBottleNeckBlock, deepths=[3, 4, 6, 3])

def resnet101(in_channels, n_classes):
    return ResNet(in_channels, n_classes, block=ResNetBottleNeckBlock, deepths=[3, 4, 23, 3])

def resnet152(in_channels, n_classes):
    return ResNet(in_channels, n_classes, block=ResNetBottleNeckBlock, deepths=[3, 8, 36, 3])

net = resnet152(1, 7).cuda()
optimizer = optim.Adam(net.parameters(), lr=0.0001)
loss_function = nn.CrossEntropyLoss()
PATH = 'ResNet.h5'

In [0]:
fig,ax = plt.subplots(nrows=1,ncols=5,figsize=(48,48))
for i in range(5):
    ax[i].imshow(X_train[i+30].view(48, 48))
    ax[i].axis('off')

In [0]:
EPOCHS = 10
batch_size = 100
train_acc = [0] * EPOCHS
train_loss = [0] * EPOCHS
test_acc = [0] * EPOCHS
test_loss = [0] * EPOCHS
index = 0

for epochs in range(EPOCHS):
    correct1 = 0
    total1 = 0
    permutation = torch.randperm(X_train.size()[0])
    for i in tqdm(range(0, len(X_train), batch_size)): 
        indices = permutation[i:i+batch_size]     
        batch_X = X_train[indices].view(-1, 1, 48, 48).cuda()     
        batch_y = y_train[indices].cuda()     
        net.zero_grad()      
        outputs = net(batch_X)
        for j in range(len(outputs)):
                ind = torch.argmax(outputs[j])
                if (batch_y[j] == ind):
                    correct1 += 1
                total1 += 1   
        loss = loss_function(outputs, batch_y)   
        loss.backward()     
        optimizer.step()  
    print(f"Epoch: {epochs}. Loss: {loss}")
    train_acc[index] = round(correct1 / total1, 3)
    train_loss[index] = loss
    correct = 0
    total = 0
    net.eval()
    with torch.no_grad():
        for new_i in tqdm(range(0, len(X_test), batch_size)):
            real_class = y_test[new_i:new_i + batch_size].cuda()
            batch_X = X_test[new_i:new_i + batch_size].view(-1, 1, 48, 48).cuda()
            ans = net(batch_X)
            loss = loss_function(ans, real_class)
            for j in range(len(ans)):
                ind = torch.argmax(ans[j])
                if (real_class[j] == ind):
                    correct += 1
                total += 1
    net.train()
    test_acc[index] = round(correct/total, 3)
    test_loss[index] = loss
    index += 1

torch.save(net.state_dict(), PATH)

In [0]:
list1 = [0] * EPOCHS
for i in range(EPOCHS):
  list1[i] = i
font = FontProperties()
font = FontProperties()
font.set_family('serif')
font.set_name('Times New Roman')
font.set_style('italic')
fig, ax = plt.subplots(figsize=(5, 3))
fig.subplots_adjust(bottom=0.15, left=0.2)
ax.plot(list1, train_acc, label = 'Train accuracy')
ax.plot(list1, train_loss, label = 'Train loss')
ax.set_xlabel('EPOCHS', fontproperties=font)
plt.legend(loc='upper right')
plt.show()

In [0]:
font = FontProperties()
font = FontProperties()
font.set_family('serif')
font.set_name('Times New Roman')
font.set_style('italic')
fig, ax = plt.subplots(figsize=(5, 3))
fig.subplots_adjust(bottom=0.15, left=0.2)
ax.plot(list1, test_acc, label = 'Test accuracy')
ax.plot(list1, test_loss, label = 'Test loss')
ax.set_xlabel('EPOCHS', fontproperties=font)
plt.legend(loc='upper right')
plt.show()

In [0]:
correct = 0
total = 0
ind = 0
net.eval()
with torch.no_grad():
    for i in tqdm(range(0, len(X_train), batch_size)):
        real_class = y_train[i:i + batch_size]
        batch_X = X_train[i:i + batch_size].view(-1, 1, 48, 48).cuda()
        ans = net(batch_X)
        for j in range(len(ans)):
            ind = torch.argmax(ans[j])
            if (real_class[j] == ind):
                correct += 1
            total += 1
print("Accuracy on train: ", round(correct/total, 3))
net.train()

In [0]:
correct = 0
total = 0
ind = 0
net.eval()
with torch.no_grad():
    for i in tqdm(range(0, len(X_test), batch_size)):
        real_class = y_test[i:i + batch_size]
        batch_X = X_test[i:i + batch_size].view(-1, 1, 48, 48).cuda()
        ans = net(batch_X)
        for j in range(len(ans)):
            ind = torch.argmax(ans[j])
            if (real_class[j] == ind):
                correct += 1
            total += 1
print("Accuracy on test: ", round(correct/total, 3))
net.train()

In [0]:
objects = ('angry', 'disgust', 'fear', 'happy', 'sad', 'surprise', 'neutral')
y_pos = np.arange(len(objects))
print(y_pos)

In [0]:
def emotion_analysis(emotions):
    objects = ['angry', 'disgust', 'fear', 'happy', 'sad', 'surprise', 'neutral']
    y_pos = np.arange(len(objects))
    plt.bar(y_pos, emotions, align='center', alpha=0.9)
    plt.tick_params(axis='x', which='both', pad=10,width=4,length=10)
    plt.xticks(y_pos, objects)
    plt.ylabel('percentage')
    plt.title('emotion')
    
plt.show()

In [0]:
objects = ('angry', 'disgust', 'fear', 'happy', 'sad', 'surprise', 'neutral')
y_pos = np.arange(len(objects))
print(y_pos)

In [0]:
def emotion_analysis(emotions):
    objects = ['angry', 'disgust', 'fear', 'happy', 'sad', 'surprise', 'neutral']
    y_pos = np.arange(len(objects))
    plt.bar(y_pos, emotions, align='center', alpha=0.9)
    plt.tick_params(axis='x', which='both', pad=10,width=4,length=10)
    plt.xticks(y_pos, objects)
    plt.ylabel('percentage')
    plt.title('emotion')
    
plt.show()

In [0]:
img = image.load_img('w.jpg', grayscale=True, target_size=(48, 48))
show_img=image.load_img('w.jpg', grayscale=False, target_size=(200, 200))
a = image.img_to_array(img)
a = np.expand_dims(a, axis = 0)

a /= 255

x = [a, a]

x = torch.Tensor(x).view(-1, 1, 48, 48).cuda()

net.eval()
custom = net(x)
net.train()

custom = custom.cpu().detach().numpy()

emotion_analysis(custom[0])

x = x[0].cpu().detach().numpy()

x = np.array(x, 'float32')
x = x.reshape([48, 48]);

plt.gray()
plt.imshow(show_img)
plt.show()

m=0.000000000000000000001
a=custom[0]
for i in range(0,len(a)):
    if a[i]>m:
        m=a[i]
        ind=i
        
print('Expression Prediction:',objects[ind])