In [None]:
""" Define constants and configs """
from pathlib import Path
import zipfile

import numpy as np
import tensorflow as tf

print('Using GPU:', bool(tf.test.gpu_device_name()))


USE_GOOGLE_DRIVE = False
# Dataset root: the shared Drive folder when running on Colab with Drive
# mounted, otherwise the notebook's working directory. Tying it to the flag
# keeps the mount below and the paths in IMAGE_DIR from drifting apart.
BASE_DIR = Path("/content/drive/My Drive/datasett") if USE_GOOGLE_DRIVE else Path(".")
IMAGE_DIR = {
    'train': BASE_DIR / 'organized/training',
    'validation': BASE_DIR / 'organized/validation',
    'test': BASE_DIR / 'organized/test'
}

if USE_GOOGLE_DRIVE:
    from google.colab import drive

    # Connect to google drive
    drive.mount('/content/drive')

    # Go to https://drive.google.com/drive/folders/1-75Md9VucbQsmcb52JYk3wOlv8wqW_sl
    # and add it to your drive

# Print files per folder.
# Done in Python (instead of GNU `find`/`du --inodes`) so the listing follows
# BASE_DIR and also works on platforms without GNU coreutils.
for split_name, split_dir in IMAGE_DIR.items():
    if not split_dir.is_dir():
        print(f'{split_name}: directory not found: {split_dir}')
        continue
    for class_dir in sorted(entry for entry in split_dir.iterdir() if entry.is_dir()):
        file_count = sum(1 for entry in class_dir.rglob('*') if entry.is_file())
        print(f'{file_count}\t{class_dir}')

# Creating an emotion classifier from MobileNetV2

In [None]:
""" Load images with real-time data augmentation """

from keras.preprocessing.image import ImageDataGenerator
from keras.applications.mobilenetv2 import preprocess_input

IMAGE_SHAPE = (224, 224, 3)
BATCH_SIZE = 20

# Keyword arguments shared by every directory iterator created in this cell
datagen_attrs = {
    'batch_size': BATCH_SIZE,  # Number of images yielded per step
    'target_size': IMAGE_SHAPE[:2],  # Images are resized to the model's input height/width
    'class_mode': 'categorical',  # Labels come back as a 2D one-hot array
}


def datagen():
    """ Build a new generator whose only transform is MobileNetV2's preprocess_input """
    return ImageDataGenerator(preprocessing_function=preprocess_input)


# Each label is printed without a newline right before the iterator is created
print('Setting up train data:', end=' ')
train_it = datagen().flow_from_directory(IMAGE_DIR['train'], **datagen_attrs)

print('Setting up validation data:', end=' ')
val_it = datagen().flow_from_directory(IMAGE_DIR['validation'], **datagen_attrs)

# print('Setting up test data:', end=' ')
# test_it = datagen().flow_from_directory(IMAGE_DIR['test'], **datagen_attrs)

# The class count is taken from the folders found in the training directory
num_classes = len(train_it.class_indices)

# Print classes summary
print('Loaded', num_classes, 'classes:', train_it.class_indices)

In [None]:
""" Prepare the custom MobileNetV2 (emotion predictor) model """

from keras.applications.mobilenetv2 import MobileNetV2
from keras.layers import Dense
from keras.models import Model

FEATURE_EXTRACTOR_POLLING = 'avg'
ACTIVATION_FUNC = 'softmax'

# MobileNetV2 backbone: ImageNet weights, no 1000-class top layer, and a
# pooling step at the end so the output is a reduced feature array
mnv2_feature_extractor = MobileNetV2(
    weights='imagenet',
    include_top=False,
    pooling=FEATURE_EXTRACTOR_POLLING,
    input_shape=IMAGE_SHAPE,
)

# New classification head: one unit per class found in the training data
predictions_layer = Dense(units=num_classes, activation=ACTIVATION_FUNC, name='predictions')
custom_output = predictions_layer(mnv2_feature_extractor.output)

# Full model: backbone input -> backbone features -> custom head
mnv2_emotion_predictor = Model(
    inputs=mnv2_feature_extractor.input,
    outputs=custom_output,
)

# Print model architecture
mnv2_emotion_predictor.summary()

In [None]:
""" Train the custom model """
import time

from keras.callbacks import ModelCheckpoint, TensorBoard
from keras.optimizers import Adadelta

# Train config
# NOTE(review): mean squared error on a softmax output with one-hot labels is
# unusual; 'categorical_crossentropy' is the common choice -- confirm intent.
LOSS = 'mean_squared_error'
OPTIMIZER = Adadelta(lr=0.5, rho=0.95, epsilon=1e-6)
EPOCHS = 400

# Use ModelCheckpoint to save the training progress.
# The target directory is created up front: saving the first checkpoint would
# otherwise fail if `weights/` does not exist yet.
weights_dir = BASE_DIR / "weights"
weights_dir.mkdir(parents=True, exist_ok=True)
filepath = str(weights_dir / "weights-improvement-224x224-{epoch:02d}-{val_acc:.2f}.hdf5")
checkpoint = ModelCheckpoint(filepath, monitor='val_acc', verbose=0, save_best_only=False, 
                             save_weights_only=False, mode='auto', period=1)

# Use TensorBoard to plot progress (one log folder per run, named by start time)
tensorboard = TensorBoard(log_dir=str(BASE_DIR / "board/{}".format(time.time())), batch_size=BATCH_SIZE)

# Train
mnv2_emotion_predictor.compile(loss=LOSS, optimizer=OPTIMIZER, metrics=['accuracy'])
try:
    mnv2_emotion_predictor.fit_generator(
        train_it,
        epochs=EPOCHS,
        steps_per_epoch=BATCH_SIZE//2,  #  sample_size = batch_size * steps_per_epochs
        validation_data=val_it,
        validation_steps=BATCH_SIZE//4,
        callbacks=[checkpoint, tensorboard])
except KeyboardInterrupt:
    # Deliberate: lets a long run be stopped by hand without a traceback;
    # checkpoints written so far stay on disk.
    print("Interrupted via keyboard")

# Using the model as a feature extractor for an SVM

In [None]:
""" Load images with real-time data augmentation """

from keras.preprocessing.image import ImageDataGenerator
from keras.applications.mobilenetv2 import preprocess_input

IMAGE_SHAPE = (224, 224, 3)
BATCH_SIZE = 100

# Options passed to each flow_from_directory call in this cell
datagen_attrs = {
    'batch_size': BATCH_SIZE,  # Number of images yielded per step
    'target_size': IMAGE_SHAPE[:2],  # Images are resized to the model's input height/width
    'class_mode': 'sparse',  # Labels come back as a 1D array of integer class ids
}


def datagen():
    """ Build a new generator whose only transform is MobileNetV2's preprocess_input """
    return ImageDataGenerator(preprocessing_function=preprocess_input)


# Each label is printed without a newline right before the iterator is created
print('Setting up train data:', end=' ')
train_it = datagen().flow_from_directory(IMAGE_DIR['train'], **datagen_attrs)

print('Setting up validation data:', end=' ')
val_it = datagen().flow_from_directory(IMAGE_DIR['validation'], **datagen_attrs)

# print('Setting up test data:', end=' ')
# test_it = datagen().flow_from_directory(IMAGE_DIR['test'], **datagen_attrs)

# The class count is taken from the folders found in the training directory
num_classes = len(train_it.class_indices)

# Print classes summary
print('Loaded', num_classes, 'classes:', train_it.class_indices)

In [None]:
from keras import backend as K
from keras.models import load_model

def load_model_from_file(model_path):
    """ Load a saved Keras model, after checking Keras runs on TensorFlow

    Params
    ------
    model_path: path of the saved model, passed unchanged to `load_model`.

    Return
    ------
    The object returned by `load_model(model_path)`.

    Raises
    ------
    AssertionError if the active Keras backend is not "tensorflow".
    """
    active_backend = K.backend()
    assert active_backend == "tensorflow", (
        "Only tensorflow-backed Keras models are supported, tried to load Keras model "
        "with backend %s." % (active_backend)
    )
    return load_model(model_path)

def get_model_input_shape(model):
    """ Return the input shape of the model's first layer as a tuple

    The shape is read from `model.layers[0].input.shape.as_list()`.
    """
    first_layer = model.layers[0]
    dims = first_layer.input.shape.as_list()
    return tuple(dims)

def get_feature_vector(data, model, learning_phase=0):
    """ Run `data` through a pretrained model and return the output of its
    second-to-last layer

    Params
    ------
    data: ndarray. Input for the model's first layer; must match its shape.
    model: keras.engine.training.Model. Pretrained model.
    learning_phase: int. Value fed to Keras' learning-phase flag, for models
        that behave differently in training and testing:
        0=TEST (default), 1=TRAIN.
    Return
    ------
    ndarray. The feature array for all the images.
    """
    # Backend function mapping (first-layer input, learning phase) to the
    # output of the layer just before the last one
    graph_inputs = [model.layers[0].input, K.learning_phase()]
    graph_outputs = [model.layers[-2].output]
    run_to_penultimate = K.function(graph_inputs, graph_outputs)

    # A single output tensor was requested, so the result list holds one array
    layer_outputs = run_to_penultimate([data, learning_phase])
    return layer_outputs[0]

In [None]:
# Saved checkpoint that is used as the feature extractor in the cells below.
# NOTE(review): the filename says 128x128 while the iterators in this notebook
# resize images to 224x224 -- confirm the checkpoint accepts that input size.
EMOTION_MODEL_PATH = 'weights-improvement-128x128-226-0.73.hdf5'
mnv2_classify_emotions = load_model_from_file(EMOTION_MODEL_PATH)

In [None]:
""" Find min-max values for features over the whole train dataset """
import math

# Length of one feature vector (presumably the width of the model's
# second-to-last layer -- confirm if the checkpoint changes).
N_FEATURES = 1280

# Running per-feature extremes, seeded with -inf/+inf so the first batch always
# replaces them. Kept as float64 on purpose: the arrays are later read back
# with `np.fromfile`, whose default dtype is float64.
max_array = np.full(N_FEATURES, fill_value=-np.inf)
min_array = np.full(N_FEATURES, fill_value=np.inf)

# Start from the beginning of an epoch so re-running the cell covers each image once
train_it.reset()

seen = 0  # number of images processed so far
for batch in train_it:
    X = batch[0]

    features = np.asarray(get_feature_vector(X, mnv2_classify_emotions))
    max_array = np.maximum(max_array, features.max(axis=0))
    min_array = np.minimum(min_array, features.min(axis=0))

    # Count real images (the last batch may be smaller than BATCH_SIZE)
    seen += len(X)
    print(seen, end='\r')

    # The iterator does not stop on its own; leave after exactly one full pass
    if seen >= train_it.samples:
        break

# File names must match the ones the later cells load with `np.fromfile`
max_array.tofile('max.ndarray')
min_array.tofile('min.ndarray')

In [None]:
""" Save the normalized features """
from sklearn.preprocessing import MinMaxScaler

# Load features min/max arrays
max_array = np.fromfile('max.ndarray')
min_array = np.fromfile('min.ndarray')

# Feature Normalization: fitting on just the two extreme rows gives the scaler
# the per-feature min and max of the whole training set
min_max_scaler = MinMaxScaler()
min_max_scaler.fit([min_array, max_array])

# Start from the beginning of an epoch so each image is saved exactly once
train_it.reset()

# Save each batch in a file
seen = 0  # number of images written so far
for i, batch in enumerate(train_it):
    X, Y = batch
    features = get_feature_vector(X, mnv2_classify_emotions)
    features = min_max_scaler.transform(features)
    np.array(features).tofile(f'{i:05d}-features.ndarray')
    np.array(Y).tofile(f'{i:05d}-labels.ndarray')

    # Stop after one full pass. Counting images (instead of comparing
    # `i * BATCH_SIZE` with a zero-based `i`) avoids writing an extra,
    # duplicated batch at the end.
    seen += len(X)
    if seen >= train_it.samples:
        break

In [None]:
from sklearn.linear_model import SGDClassifier
from sklearn.preprocessing import MinMaxScaler
import pickle

# Per-feature extremes saved earlier as raw arrays
max_array = np.fromfile('max.ndarray')
min_array = np.fromfile('min.ndarray')

# Feature Normalization: the scaler is fitted on the two extreme rows only
min_max_scaler = MinMaxScaler().fit([min_array, max_array])

def go(model, clf, classes, max_iter=1000, tol=1e-5, epochs=300, steps_per_epoch=BATCH_SIZE//2,
       validation_steps=BATCH_SIZE//4):
    """ Incrementally train `clf` on features extracted by `model`, one epoch per yield

    Reads batches from the module-level `train_it` / `val_it` iterators and
    normalizes features with the module-level `min_max_scaler`.

    Params
    ------
    model: pretrained Keras model passed to `get_feature_vector`.
    clf: classifier exposing `partial_fit(X, y, classes=...)` and `score(X, y)`.
    classes: array of all class labels, forwarded to `partial_fit`.
    max_iter: int. Maximum `partial_fit` calls on a single batch.
    tol: float. Stop iterating on a batch once the score gain between two
        consecutive iterations is below this value.
    epochs: int. Number of epochs to run.
    steps_per_epoch: int. Training batches consumed per epoch.
    validation_steps: int. Validation batches scored per epoch.

    Yields
    ------
    None, once per finished epoch (after the classifier has been pickled), so
    the caller controls how far training proceeds.
    """

    scores = {}
    val_scores = {}

    for epoch in range(1, epochs+1):
        scores[epoch] = {}
        # For each batch
        for epoch_step in range(1, steps_per_epoch + 1):
            # Get normalized features from the images
            X, Y = next(train_it)
            features = get_feature_vector(X, model)
            features = min_max_scaler.transform(features)

            # The improvement check is per batch: restart the baseline here so
            # a score reached on a previous batch cannot end this one early.
            last_iter_score = float('-inf')
            # Defined up front so the report below still works when max_iter is 0
            iter_score = float('nan')

            # Iter in this batch until max_iter or enhancement < tol
            for _ in range(max_iter):
                clf.partial_fit(features, Y, classes=classes)
                iter_score = clf.score(features, Y)

                # Check iter enhancement
                if iter_score - last_iter_score < tol:
                    break

                last_iter_score = iter_score

            print(f'Epoch {epoch:4d} ({epoch_step:2d}/{steps_per_epoch}) - score: {iter_score:.5f}', end='\r')
            scores[epoch][epoch_step] = iter_score

        # Reprint the last epoch step 
        print(f'Epoch {epoch:4d} ({steps_per_epoch}/{steps_per_epoch}) - score: {iter_score:.5f}', end='')

        # Run some validation steps to compute the score
        epoch_val_scores = []
        for val_step in range(validation_steps):
            X, Y = next(val_it)
            features = get_feature_vector(X, model)
            features = min_max_scaler.transform(features)
            val_score = clf.score(features, Y)

            epoch_val_scores.append(val_score)

        val_scores[epoch] = np.average(epoch_val_scores)
        print(f' - val_score: {val_scores[epoch]}')

        # save the classifier
        with open(f'sgd-epoch-{epoch:03d}-(valscore-{val_scores[epoch]:.3f}).pkl', 'wb') as fid:
            pickle.dump(clf, fid)

        yield

# load it again
# with open('sgd-epoch-30-valscore-0.5.pkl', 'rb') as fid:
#     clf_loaded = pickle.load(fid)
    
# Linear classifier trained incrementally with SGD (hinge loss, L2 penalty);
# the fixed random_state keeps runs repeatable.
clf = SGDClassifier(
    penalty='l2',
    loss='hinge',
    random_state=0,
    tol=1e-3,
    n_jobs=4,
)

# Labels are the integer class ids produced by the 'sparse' iterators, so the
# label set is derived from the loaded data instead of hard-coding 7 classes.
# `go` is a generator: iterating it is what actually runs the training epochs.
for _ in go(mnv2_classify_emotions, clf, classes=np.arange(num_classes)):
    pass

In [None]:
""" Utils """

def extract_zip(zip_path, output_path):
    """ Extract every member of a zip archive into a folder

    Params
    ------
    zip_path: path of the archive to read.
    output_path: destination folder, passed to `ZipFile.extractall`.
    """
    import zipfile

    # The context manager closes the archive even if extraction raises
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(output_path)