In [1]:
%matplotlib inline
import pandas as pd
import os
import json
from glob import glob
import numpy as np
np.set_printoptions(precision=4, linewidth=100)
from matplotlib import pyplot as plt
from scipy.misc import imresize

In [2]:
import csv
from tqdm import tqdm
from tqdm import tnrange, tqdm_notebook
from keras.utils import np_utils
source_size = (48,48)
target_size = (224,224)
cmap = plt.get_cmap('hot')

# Data Labels Defined
labels = ['Angry', 'Disgust', 'Fear', 'Happy', 'Sad', 'Surprise', 'Neutral']

# Read in data in an efficient manner
# We need to convert the black and white image data to an RGB image that the VGG16 model expects
# We accomplish the color conversion with the use of a color map

class DataReader:
    def __init__(self, batch_size=1000):
        self.X = None
        self.Y = None
        self.X_acc = []
        self.Y_acc = []
        self.batch_size = batch_size
        
    def append(self, X, Y):
        self.X_acc.append(X)
        self.Y_acc.append(Y)
        if len(self.X_acc) >= self.batch_size:
            self.consolidate()
            
    def consolidate(self):
        if len(self.X_acc) > 0:
            if self.X is None:
                self.X = np.stack(self.X_acc)
                self.Y = np.stack(self.Y_acc)
            else:
                self.X = np.concatenate((self.X, self.X_acc), axis=0)
                self.Y = np.concatenate((self.Y, self.Y_acc), axis=0)
            self.X_acc = []
            self.Y_acc = []

        self.X = np.stack(self.X)
        self.Y = np.stack(self.Y)

        
class DataReaderSimple:
    def __init__(self, batch_size=1000):
        self.X = None
        self.Y = None
        self.X_acc = []
        self.Y_acc = []
        self.batch_size = batch_size
        
    def append(self, X, Y):
        self.X_acc.append(X)
        self.Y_acc.append(Y)
            
    def consolidate(self):
        if len(self.X_acc) > 0:
            self.Y = np.stack(self.Y_acc)
            self.Y_acc = []
            self.X = np.stack(self.X_acc)
            self.X_acc = []
        
def load_data(filename, dataset=None, batch_size=1000, limit=0):
    assert dataset in ['Training', 'PublicTest', 'PrivateTest']
    X_out = []
    Y_out = []    
    def append(X, Y):
        X_out = np.stack(X_out, X)
        Y_out = np.stack(Y_out, Y)
        
    data = DataReaderSimple(batch_size=batch_size)
    
    categories = set()
    
    with open(filename, 'rb') as csvfile:
        reader = csv.reader(csvfile)
        header = reader.next()
        X_accumulate = []
        Y_accumulate = []
        usages = set()
        i=0
        for row in tqdm_notebook(reader):
            emotion = int(row[0])
            usage = row[2]
            usages.add(usage)
            categories.add(emotion)

            if usage != dataset: continue
            
            i+=1
            if limit != 0 and i > limit:
                break
                
            image_data = imresize(pd.DataFrame(row[1].split(), dtype=int).values.reshape(*source_size),target_size)
            image_data = np.delete(cmap(image_data), 3, 2)

            data.append(image_data, emotion)
            
        # final bunch
        data.consolidate()
    
    num_classes = len(categories)
    # rearrange axes to fit the VGG model
    # switch outputs to categorical
    return np.swapaxes(data.X, 3, 1), np_utils.to_categorical(data.Y, num_classes)

Using Theano backend.
Using gpu device 0: Tesla K80 (CNMeM is disabled, cuDNN 5103)


In [3]:
from keras.preprocessing.image import ImageDataGenerator
from keras.layers import Dense, GlobalAveragePooling2D, GlobalMaxPooling2D
from keras.models import Model, load_model
from keras import optimizers
from keras.applications import vgg16

nb_classes = len(labels)
weights_file = "weights.h5"

def create_default_model():
    # build the model from scratch using VGG16 as a base
    model_vgg16 = vgg16.VGG16(weights='imagenet', include_top=True)
    #model_vgg16_conv.summary()
    layer = model_vgg16.layers[-2].output
    # create a new layer for generating predictions for this data set
    predictions_layer = Dense(nb_classes, activation='softmax', name='predictions')(layer)
    model = Model(input=model_vgg16.input, output=predictions_layer)
    #model.summary()
    return model

def add_compiler(model, lr=0.001):
    model.compile(optimizer=optimizers.Adam(lr=lr), loss='categorical_crossentropy', metrics=['accuracy'])

def load_model_with_weights_if_available():
    # if weights exists on disk, then load it
    model = create_default_model()
    if os.path.exists(weights_file):
        model.load_weights(weights_file)
        print("Model loaded from file %s" % weights_file)
    else:
        print("Model built from scratch")
        
    add_compiler(model)
    return model

def get_batches(i, o, gen=ImageDataGenerator(), batch_size=32, shuffle=True):
    return gen.flow(i, o, batch_size=batch_size, shuffle=shuffle)

In [5]:
# Load the training data set
X, Y = load_data('fer2013/fer2013.csv', dataset='Training', batch_size=1000, limit=10000)

Widget Javascript not detected.  It may not be installed properly. Did you enable the widgetsnbextension? If not, then run "jupyter nbextension enable --py --sys-prefix widgetsnbextension"


          10000it [00:40, 249.75it/s]

In [7]:
# Load Testing data sets
X_public, Y_public = load_data('fer2013/fer2013.csv', dataset='PublicTest', batch_size=100, limit=100)
X_private, Y_private = load_data('fer2013/fer2013.csv', dataset='PrivateTest', batch_size=100, limit=100)

Widget Javascript not detected.  It may not be installed properly. Did you enable the widgetsnbextension? If not, then run "jupyter nbextension enable --py --sys-prefix widgetsnbextension"





Widget Javascript not detected.  It may not be installed properly. Did you enable the widgetsnbextension? If not, then run "jupyter nbextension enable --py --sys-prefix widgetsnbextension"


In [53]:
model = load_model_with_weights_if_available()

Model loaded from file weights.h5


In [54]:
for layer in model.layers:
    if layer.name in ['predictions']:
        layer.trainable = True
    else:
        layer.trainable = False
add_compiler(model)
model.summary()

____________________________________________________________________________________________________
Layer (type)                     Output Shape          Param #     Connected to                     
input_10 (InputLayer)            (None, 3, 224, 224)   0                                            
____________________________________________________________________________________________________
block1_conv1 (Convolution2D)     (None, 64, 224, 224)  0           input_10[0][0]                   
____________________________________________________________________________________________________
block1_conv2 (Convolution2D)     (None, 64, 224, 224)  0           block1_conv1[0][0]               
____________________________________________________________________________________________________
block1_pool (MaxPooling2D)       (None, 64, 112, 112)  0           block1_conv2[0][0]               
___________________________________________________________________________________________

In [70]:
# MAIN TRAINING CYCLE
# Fit the model, and then save the weights to disk
nb_epoch = 5
batch_size = 64
N=Y.shape[0]
nb_sample = N
N_test=64
batches = get_batches(X, Y, batch_size=batch_size)
val_batches = get_batches(X_public[:N_test], Y_public[:N_test], batch_size=batch_size)
model.fit_generator(batches, 
                    samples_per_epoch=nb_sample, 
                    nb_epoch=nb_epoch, 
                    validation_data=val_batches, 
                    nb_val_samples=N_test, 
                    nb_worker=3)
model.save_weights(weights_file)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


In [71]:
# Check Accuracy of the test data sets
loss, acc = model.evaluate(X_public, Y_public, verbose=0)
print("Public Test Loss: %.4f, Accuracy: %.4f" % (loss, acc))
loss, acc = model.evaluate(X_private, Y_private, verbose=0)
print("Private Test Loss: %.4f, Accuracy: %.4f" % (loss, acc))

Public Test Loss: 1.7229, Accuracy: 0.3100
Private Test Loss: 1.7422, Accuracy: 0.2800


In [205]:
# predict private set results and save submission file to disk
results = model.predict(X_private)
values = np.argmax(results, axis=1)
with open("submission_private.csv", "wb") as fp:
    for x in values:
        fp.write("%d\n" % x)
    fp.close()