In [None]:
import os
import cv2
import numpy as np
from tqdm import tqdm
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import copy
import matplotlib.pyplot as plt

In [None]:
device = torch.device("cuda:0")
print(device)
if torch.cuda.is_available():
    device = torch.device("cuda:0")  # you can continue going on here, like cuda:1 cuda:2....etc. 
    print("Running on the GPU")
else:
    device = torch.device("cpu")
    print("Running on the CPU")

In [None]:
inputloc = "../input/face-expression-recognition-dataset/images/"
validate = "validation/"
train = "train/"

In [None]:
loc = "../input/face-expression-recognition-dataset/images/train/angry/"
for i in sorted(os.listdir(loc)):
    path = os.path.join(loc,i)
    break
data = cv2.imread(path)
plt.imshow(data)
im = np.fliplr(data)
plt.imshow(im)

In [None]:
class Process:
    def __init__(self,loc,t = False):
        self.raw_data = []
        self.labels = {}
        self.labelcount = {}
        self.count = 0
        self.train = t
        self.img_size = (50,50)
        self.labeling(loc)
        self.load_images(loc)
        self.shuffle()
        self.shuffle()
        
    def labeling(self,loc):
        for folders in sorted(os.listdir(loc)):
            self.labels[folders] = self.count
            self.count += 1
            self.labelcount[folders] = 0
        
    def load_images(self,loc):
        for folders in sorted(os.listdir(loc)):
            folder_path = os.path.join(loc,folders)
            for file in tqdm(os.listdir(folder_path)):
                path = os.path.join(folder_path,file)
                img = cv2.imread(path,cv2.IMREAD_GRAYSCALE)
                img = cv2.resize(img,self.img_size)
                data = np.asarray(img)
                self.raw_data.append([data,np.eye(len(self.labels))[self.labels[folders]]])
                if self.train:
                    im = np.fliplr(data)
                    self.raw_data.append([im,np.eye(len(self.labels))[self.labels[folders]]])
                self.labelcount[folders] += 1
                if self.labelcount[folders] >= 10000000:
                    break
                
    def shuffle(self):
        np.random.shuffle(self.raw_data)

In [None]:
trainloc = os.path.join(inputloc,train)
testloc = os.path.join(inputloc,validate)
print(os.listdir(trainloc))
print(os.listdir(testloc))

In [None]:
ob = Process(trainloc,True)
training_data = ob.raw_data

In [None]:
ob = Process(testloc)
test_data = ob.raw_data


In [None]:
print(len(training_data),len(test_data))

In [None]:
X = torch.Tensor([i[0] for i in training_data]).view(-1,50,50)
X.to(device)
X = X/255.0
y = torch.Tensor([i[1] for i in training_data])

In [None]:
train_X = X
train_y = y

In [None]:
X = torch.Tensor([i[0] for i in test_data]).view(-1,50,50)
X.to(device)
X = X/255.0
y = torch.Tensor([i[1] for i in test_data])

In [None]:
test_X = X
test_y = y

In [None]:
print(len(train_X),len(test_X))

In [None]:

class Net(nn.Module):
    def __init__(self):
        super().__init__() # just run the init of parent class (nn.Module)
        self.conv1 = nn.Conv2d(1, 32, 5) # input is 1 image, 32 output channels, 5x5 kernel / window
        self.conv2 = nn.Conv2d(32, 64, 5) # input is 32, bc the first layer output 32. Then we say the output will be 64 channels, 5x5 kernel / window
        self.conv3 = nn.Conv2d(64, 128, 5)

        x = torch.randn(50,50).view(-1,1,50,50)
        self._to_linear = None
        self.convs(x)

        self.fc1 = nn.Linear(self._to_linear, 512) #flattening.
        self.fc2 = nn.Linear(512, len(ob.labels)) # 512 in, 2 out bc we're doing 2 classes (dog vs cat).

    def convs(self, x):
        # max pooling over 2x2
        x = F.max_pool2d(F.relu(self.conv1(x)), (2, 2))
        x = F.max_pool2d(F.relu(self.conv2(x)), (2, 2))
        x = F.max_pool2d(F.relu(self.conv3(x)), (2, 2))

        if self._to_linear is None:
            self._to_linear = x[0].shape[0]*x[0].shape[1]*x[0].shape[2]
        return x

    def forward(self, x):
        x = self.convs(x)
        x = x.view(-1, self._to_linear)  # .view is reshape ... this flattens X before 
        x = F.relu(self.fc1(x))
        x = self.fc2(x) # bc this is our output layer. No activation here.
        return F.softmax(x, dim=1)


net = Net()
print(net)

In [None]:
net.to(device)
net = Net().to(device)

In [None]:
train_X.to(device)
train_y.to(device)
def train(net):
    optimizer = optim.Adam(net.parameters(), lr=0.001)
    loss_function = nn.MSELoss()
    BATCH_SIZE = 50
    EPOCHS = 50
    for epoch in range(EPOCHS):
        for i in tqdm(range(0, len(train_X), BATCH_SIZE)): # from 0, to the len of x, stepping BATCH_SIZE at a time. [:50] ..for now just to dev
            #print(f"{i}:{i+BATCH_SIZE}")
            batch_X = train_X[i:i+BATCH_SIZE].view(-1, 1, 50, 50)
            batch_y = train_y[i:i+BATCH_SIZE]

            batch_X, batch_y = batch_X.to(device), batch_y.to(device)
            net.zero_grad()

            optimizer.zero_grad()   # zero the gradient buffers
            outputs = net(batch_X)
            loss = loss_function(outputs, batch_y)
            loss.backward()
            optimizer.step()    # Does the update

        print(f"Epoch: {epoch}. Loss: {loss}")

train(net)

In [None]:
test_X.to(device)
test_y.to(device)

def test(net):
    correct = 0
    total = 0
    with torch.no_grad():
        for i in tqdm(range(len(test_X))):
            real_class = torch.argmax(test_y[i]).to(device)
            net_out = net(test_X[i].view(-1, 1, 50, 50).to(device))[0]  # returns a list, 
            predicted_class = torch.argmax(net_out)

            if predicted_class == real_class:
                correct += 1
            total += 1

    print("Accuracy: ", round(correct/total, 3)*100)

test(net)