In [None]:
import numpy as np
import pandas as pd
import scipy.special as sp
from sklearn.utils import shuffle
import matplotlib.pyplot as plt
import PIL
import json
import os
import datetime
import PIL.Image

import tensorflow as tf
from tensorflow.keras.layers import Dense, Flatten, Input

In [None]:
# Outer double quotes keep the nested 'GPU' literal valid on Python versions
# older than 3.12, which reject reusing the same quote inside an f-string.
print(f"Number of GPUs detected: {len(tf.config.list_logical_devices('GPU'))}")

# Data preparation

In [None]:
df = pd.read_csv('../DL_and_NN_in_Python/fer2013.csv')


print(df.head())
print(f'number of photos in the dataset is: {len(df)}')

# Each `pixels` entry is a space-separated string of integers. Parse the whole
# column in one pass: this avoids two `iloc` lookups per row and no longer
# feeds index *labels* to the position-based `iloc`.
X = np.array([list(map(int, pixels.split(' '))) for pixels in df.pixels]) / 255 # normalise pixel values to lie between 0 and 1
Y = df.emotion.to_numpy()

print(f'check number of features is 48**2: {X.shape[1] == 48**2}')

X, Y = shuffle(X, Y, random_state=42)  # numpy's shuffle is not nice because you can't shuffle two arrays simultaneously

# Hold out the last 20% of the shuffled samples as the test set
train_proportion = 0.8
train_index = int(train_proportion*len(X))

X_train, X_test = X[:train_index], X[train_index:]
Y_train, Y_test = Y[:train_index], Y[train_index:]
# or just use train_test_split from sklearn.model_selection for the same effect
n_classes = len(set(Y_train))

print(f'Number of samples in training set: {len(X_train)}')
print(f'Number of samples in test set: {len(X_test)}')

# N = number of training samples, D = flattened feature count, D1 = image side length
N, D = X_train.shape
D1 = int(np.sqrt(D))
assert D1 * D1 == D, 'expected square images'

In [None]:
# Turn each flat row of D pixels into a D1 x D1 image.
# reshape() raises if the element count does not match and is safe to re-run,
# whereas the in-place ndarray.resize() mutates the array and is allowed to
# change its element count.
X_train = X_train.reshape((N, D1, D1))
X_test = X_test.reshape((len(X_test), D1, D1))

In [None]:
# Position in this list = integer label stored in the `emotion` column
emotions = ['Anger', 'Disgust', 'Fear', 'Happy', 'Sad', 'Surprise', 'Neutral']

def show_sample_image(emotion: str):
    """Show one randomly chosen image labelled with the given emotion.

    Prints the chosen row position and the row itself, then opens the image
    (upscaled from 48x48 to 1000x1000) via PIL's `show()`.

    Args:
        emotion: Name of the emotion; must be an entry of `emotions`.

    Raises:
        ValueError: If `emotion` is not in `emotions`, or if the dataset
            contains no image with that label.
    """
    emotion_no = emotions.index(emotion)
    df_emotion = df[df.emotion == emotion_no]
    if df_emotion.empty:
        raise ValueError(f'no images labelled {emotion!r} in the dataset')

    # randint excludes the upper bound, so the result is always a valid row
    # position (the deprecated random_integers included it -> IndexError).
    random_no = np.random.randint(0, len(df_emotion))
    print(random_no)
    print(df_emotion.iloc[random_no])

    img = np.array(list(map(int, df_emotion.iloc[random_no].pixels.split(' '))), dtype=np.uint8).reshape((48,48))
    img = PIL.Image.fromarray(img).resize((1000, 1000))
    img.show()

show_sample_image('Surprise')

# 1. Use Keras

In [None]:
def construct_keras_seq_dense(hidden_layer_sizes: list[int], activation_function: str = 'relu', reg: float = 1e-4):
    """Build a fully connected Keras Sequential classifier that outputs logits.

    The model takes (D1, D1) images, flattens them, applies one Dense layer per
    entry of `hidden_layer_sizes` and finishes with a linear Dense layer of
    `n_classes` units. `D`, `D1` and `n_classes` are read from the notebook's
    global namespace.

    Args:
        hidden_layer_sizes: Number of units in each hidden layer, in order.
            May be empty, in which case the model is a single linear layer.
        activation_function: Activation used by the hidden layers.
        reg: L2 regularisation factor applied to all kernels and biases.

    Returns:
        The (uncompiled) tf.keras Sequential model.
    """
    # instantiate the sequential model
    model = tf.keras.models.Sequential()

    # flatten the image inputs onto a 1D vector
    model.add(tf.keras.Input(shape = (D1,D1))) # useful if we want to print the summary of the graph in advance
    model.add(Flatten())

    # created up front so it also exists when there are no hidden layers
    regulariser = tf.keras.regularizers.L2(l2=reg)

    # we want the st. dev. at each layer to be 1
    # because we assume all vars are IID and normalised such that Var(x_i) = 1
    # then to achieve this goal, we need to initialise all weights with st. dev. 1/sqrt(dim)
    # where dim = number of multiplications taking place at a given layer to produce a single matrix entry
    # for the first layer, this is equal to D - the dimensionality of the inputs
    # for subsequent layers, it is equal to the number of units in the previous layer
    fan_in = D
    for hidden_units in hidden_layer_sizes:
        model.add(Dense(
            hidden_units,
            activation = activation_function,
            use_bias = True,
            kernel_initializer = tf.keras.initializers.RandomNormal(0, 1/np.sqrt(fan_in)),
            bias_initializer = 'zeros',
            kernel_regularizer = regulariser,
            bias_regularizer = regulariser
            )
        )
        fan_in = hidden_units

    # add the final layer to project onto n_classes
    # no activation here: the notebook's loss is built with from_logits=True, so
    # the outputs must stay raw logits (a ReLU would clip every negative logit)
    model.add(Dense(
        n_classes,
        activation = None,
        use_bias = True,
        kernel_initializer = tf.keras.initializers.RandomNormal(0, 1/np.sqrt(fan_in)),
        bias_initializer = 'zeros',
        kernel_regularizer = regulariser,
        bias_regularizer = regulariser
        )
    )

    print('Instantiiated the following model:')
    model.summary()  # summary() prints by itself; wrapping it in print() also printed "None"

    return model

In [None]:
loss_function = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
# this loss function assumes that y_true holds integer class labels, NOT one-hot vectors (use CategoricalCrossentropy for one-hot labels)
# by default (from_logits=False), this loss expects the input y_pred to be a probability distribution (i.e. after softmax)
# here from_logits=True is passed instead, so y_pred is expected to be raw logits:
# we usually don't add the softmax to the end of the graph because it is not stable with all loss functions


In [None]:
# Hidden layers of 100, 200 and 100 units with ReLU activations
model_keras_seq_dense = construct_keras_seq_dense(
    hidden_layer_sizes=[100, 200, 100],
    activation_function='relu',
)

optimiser = tf.keras.optimizers.Adam(learning_rate=1e-4)

# Attach loss, optimiser and the accuracy metric, then train on the training split
model_keras_seq_dense.compile(
    loss=loss_function,
    metrics=['accuracy'],
    optimizer=optimiser,
)

model_keras_seq_dense.fit(x=X_train, y=Y_train, batch_size=64, epochs=500)

In [None]:
# evaluate() returns the loss followed by the compiled metric (accuracy)
test_loss, test_acc = model_keras_seq_dense.evaluate(x=X_test, y=Y_test)
print(test_acc)

In [None]:
# The trained model outputs logits; stacking a Softmax layer on top turns them
# into class probabilities so that predictions can be inspected manually.
probability_model = tf.keras.models.Sequential(
    [model_keras_seq_dense, tf.keras.layers.Softmax()]
)

# predicted class = index of the largest probability in each row
pY = probability_model.predict(x=X_test)
Y_pred = tf.argmax(pY, axis=1)
test_acc_manual = np.mean(Y_pred == Y_test)

print(test_acc_manual)
# cross-check against the accuracy reported by evaluate()
np.isclose(test_acc, test_acc_manual, rtol=1e-5)

In [None]:
# Write the trained model to disk as a single .keras file
model_keras_seq_dense.save(filepath='model_keras_seq_dense.keras')

In [None]:
# Read the saved model back from disk and evaluate it on the test split
model_loaded = tf.keras.models.load_model(filepath='model_keras_seq_dense.keras')
model_loaded.evaluate(x=X_test, y=Y_test)

# Other methods of defining a model

## Subclassing API

In [None]:
class MyModel(tf.keras.Model):
  """Flatten -> Dense(ReLU) -> Dense(logits) classifier using the subclassing API.

  Args:
    hidden_units: Number of units in the hidden layer.
    num_classes: Number of output logits. When None, the notebook-level
      `n_classes` is used so that there is exactly one logit per label
      (the previous hard-coded 10 did not match the dataset's label count).
  """
  def __init__(self, hidden_units=10, num_classes=None):
    super().__init__()
    if num_classes is None:
      num_classes = n_classes
    self.flatten = Flatten()
    self.d1 = Dense(hidden_units, activation='relu')
    self.d2 = Dense(num_classes)  # no activation: the model returns logits

  def call(self, x):
    """Return the logits for a batch of inputs `x`."""
    x = self.flatten(x)
    x = self.d1(x)
    return self.d2(x)

# Create an instance of the model
model_subclassing = MyModel()


In [None]:
# New loss and optimiser instances for this model
loss_function = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimiser = tf.keras.optimizers.Adam(learning_rate=1e-4)

model_subclassing.compile(
    loss=loss_function,
    metrics=['accuracy'],
    optimizer=optimiser,
)

model_subclassing.fit(x=X_train, y=Y_train, batch_size=32, epochs=25)

In [None]:
# Loss and accuracy of the subclassed model on the held-out test split
model_subclassing.evaluate(x=X_test, y=Y_test)

## Functional API

The functional API can handle models with non-linear topology, shared layers, and even multiple inputs or outputs.

In [None]:
# Functional API: call each layer on the tensor produced by the previous one
inputs = tf.keras.Input(shape=(D1, D1))
x = Flatten()(inputs)
x = Dense(10, activation='relu', use_bias = True)(x)
# one logit per class (n_classes instead of a hard-coded 10); no activation
# because the loss used for this model is built with from_logits=True
outputs = Dense(n_classes, activation=None, use_bias = True)(x)

model_functional = tf.keras.Model(inputs=inputs, outputs=outputs)

In [None]:
# New loss and optimiser instances for the functional model
loss_function = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimiser = tf.keras.optimizers.Adam(learning_rate=1e-4)

model_functional.compile(
    loss=loss_function,
    metrics=['accuracy'],
    optimizer=optimiser,
)

model_functional.fit(x=X_train, y=Y_train, batch_size=32, epochs=25)

When should you use model subclassing rather than the functional API? The TensorFlow guide has a good comparison:

https://www.tensorflow.org/guide/keras/functional_api#when_to_use_the_functional_api

## Low-level implementation without Keras

Let's write a simple dense neural network with one hidden layer, using only low-level TensorFlow operations.

In [None]:
class ModelLowLevel(tf.Module): # note inheritance from tf.Module, not tf.keras.Model
  """Dense classifier with one hidden ReLU layer, written with raw TensorFlow ops.

  The input dimensionality `D` and the number of outputs `n_classes` are read
  from the notebook's global namespace.

  Args:
    M: Number of hidden units.
  """
  def __init__(self, M):
    super().__init__()
    # Initialize model parameters: weights are drawn from a normal distribution
    # with st. dev. 1/sqrt(fan_in), biases start at zero.
    # (tf.Variable takes `dtype`; it has no `type` keyword.)
    self.W1 = tf.Variable(tf.random.normal((D, M), 0, 1/np.sqrt(D)), dtype=tf.float32)
    self.b1 = tf.Variable(tf.zeros(M), dtype=tf.float32)
    self.W2 = tf.Variable(tf.random.normal((M, n_classes), 0, 1/np.sqrt(M)), dtype=tf.float32)
    self.b2 = tf.Variable(tf.zeros(n_classes), dtype=tf.float32)
  
  @tf.function
  def __call__(self, x): # note __call__, not call
    """Return the logits for `x`; the last two axes are merged if x has more than 2 axes."""
    # flatten input if necessary
    if x.shape.rank > 2:
      x = tf.reshape(x, list(x.shape[:-2]) + [x.shape[-2]*x.shape[-1]])
    Z  = tf.nn.relu(tf.matmul(x, self.W1) + self.b1)
    return tf.matmul(Z, self.W2) + self.b2 # return logits - without the softmax!

In [None]:
# Instantiate the low-level model with 10 hidden units
model_lowlevel = ModelLowLevel(M=10)

In [None]:
def loss_function(logits, T):
    """Return the sparse softmax cross-entropy summed over all samples.

    Args:
        logits: Unnormalised class scores.
        T: Integer class labels (not one-hot).

    Note: this rebinds `loss_function`, which earlier cells of this notebook
    bind to a Keras loss object.
    """
    cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=logits, labels=T)
    return tf.reduce_sum(cross_entropy)

In [None]:
batch_size = 32

# Pair images with labels, shuffle with a buffer as large as the training set,
# then group the samples into mini-batches
dataset = (
    tf.data.Dataset.from_tensor_slices((X_train, Y_train))
    .shuffle(buffer_size=X_train.shape[0])
    .batch(batch_size)
)

In [None]:
# Set training parameters
epochs = 100
learning_rate = 1e-4
losses = []

# Cast the training set once: it is reused unchanged for the per-epoch loss
X_train_f32 = tf.cast(X_train, np.float32)

# Format training loop
for epoch in range(epochs):
  for x_batch, y_batch in dataset:
    x_batch = tf.cast(x_batch, np.float32)
    with tf.GradientTape() as tape:
      batch_loss = loss_function(model_lowlevel(x_batch), y_batch)
    # Update parameters with respect to the gradient calculations
    # (plain gradient descent: v <- v - learning_rate * dL/dv)
    grads = tape.gradient(batch_loss, model_lowlevel.variables)
    for g,v in zip(grads, model_lowlevel.variables):
        v.assign_sub(learning_rate*g)
  # Keep track of model loss per epoch (summed over the whole training set)
  loss = loss_function(model_lowlevel(X_train_f32), Y_train)
  losses.append(loss.numpy())  # store plain numbers rather than tensors
  if epoch % 10 == 0:
    print(f'Cross-entropy loss for step {epoch}: {loss.numpy():0.3f}')

# Plot model results
print("\n")
plt.plot(range(epochs), losses)
plt.xlabel("Epoch")
plt.ylabel("Cross-entropy loss")
# the plotted quantity is the cross-entropy per epoch, not an MSE
plt.title('Cross-entropy loss vs training epochs');


In [None]:
# Logits -> probabilities -> most likely class for every test image
test_logits = model_lowlevel(tf.cast(X_test, np.float32))
test_probs = tf.nn.softmax(test_logits, axis=1)
test_preds = tf.argmax(test_probs, axis=1)
print(f'error rate: {np.mean(test_preds != Y_test)}')

# Understanding sparse_softmax_cross_entropy_with_logits

In [None]:
# One-hot labels: compare TensorFlow's op with the formula written out by hand
logits = np.array([[4.0, 2.0, 1.0], [0.0, 5.0, 1.0]])
labels = np.array([[1.0, 0.0, 0.0], [0.0, 1.0, 0.0]])
tf_cross_entropy = tf.nn.softmax_cross_entropy_with_logits(labels=labels, logits=logits)
print(tf.reduce_sum(tf_cross_entropy))

# by hand: sum over samples and classes of -log(softmax) * one-hot label
neg_log_probs = -np.log(tf.nn.softmax(logits, axis=1))
np.sum(neg_log_probs*labels)

In [None]:
# Integer labels: the sparse op takes class indices instead of one-hot rows
logits = np.array([[4.0, 2.0, 1.0], [0.0, 5.0, 1.0]])
labels = np.array([0, 1])
tf_cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=labels, logits=logits)
print(tf.reduce_sum(tf_cross_entropy))

# by hand: pick log(softmax) at (row 0, class 0) and (row 1, class 1), sum, negate
log_probs = np.log(sp.softmax(logits, axis=1))
-log_probs[[0, 1], [0, 1]].sum()