# Face classification with CNNs

## Set up

In [None]:
# Stdlib imports
from pathlib import Path
import os

# 3rd party imports
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import random

from PIL import Image

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import Input
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Dense, Dropout, Activation, Flatten
from tensorflow.keras.layers import Conv2D, MaxPooling2D 
from tensorflow.keras.callbacks import EarlyStopping

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, balanced_accuracy_score 
from sklearn.metrics import classification_report, roc_curve, roc_auc_score, f1_score
from sklearn.metrics import r2_score

from IPython.display import display

# Local imports
from facecls import fcaux, fcmodels

## Load data

Let's load the preprocessed data set from the CSV file. It contains the images and four possible attributes/targets: "gender", "ethnicity", "age", and "age_decades".

In [None]:
data = pd.read_csv("data/age_gender_preproc.csv")

## Configurations

Next, we define for which of the four targets the classifier shall be trained in this notebook and with what CNN architecture:

In [None]:
target = "gender"
cnn_architecture = "alexnet"

Also, we create a new directory specifically for the model that we will train in this notebook:

In [None]:
# Define a variable containing the root directory for all models
models_dir = Path(f"results/models/{target.title()}Classifier/")

# Identify the ID of the last, already existing model of the specified 
# CNN architecture.
try:
    last_model_id = max([int(folder.as_posix().split("_")[2]) 
                         for folder in models_dir.glob(f'{cnn_architecture}*')
                        ])
except ValueError:
    # If no model of the specified architecture exists yet, set the last 
    # model ID = 0, so that the model created now will have ID = 1
    last_model_id = 0 

# Just to check that all is right, print the identified ID of the last, existing
# model.
print("Last model id:", last_model_id)

# ID of the model that we will create now
new_model_id = last_model_id + 1

# Variable file_suffix contains info about the model architecture, the target
# and the model ID. This variable will be reused several times in this notebook
# and defines a naming convention.
file_suffix = f"{cnn_architecture}_{target}_{str(new_model_id).zfill(3)}"

# Create a directory for the model created here
new_model_dir = models_dir / file_suffix
print(f"Creating folder \"{new_model_dir}\"...")
new_model_dir.mkdir(parents=True, exist_ok=True)

## Model build

In this section, we will build the classifier using the CNN architecture defined above. The first step to do is to prepare the data:

In [None]:
seed = fcaux.set_seed(42)

### Convert data from strings (as read from file) to arrays

In [None]:
# Use helper function to convert the pixel string 
# first into a pixel vector...
full_img_vec_list = np.array([fcaux.pxlstring2pxlvec(data, i) for i in range(data.shape[0])])

# ...and then the pixel vector into a pixel array
full_img_array_list = np.array([fcaux.pxlvec2pxlarray(img_vec) for img_vec in full_img_vec_list])

### Data split

As usual we split the data set into a training, validation and a test set. The test set is made of 20% of the entire data set, the validation set of 10% of the remaining 80% (i.e. of 8% of the entire data set) and therefore 72% of the full data set make up the training set.

Notice that we perform the split using indices and not on the feature and target data directly. The motivation is so we can later just safe the train, validation and test example indices in a CSV file which saves more disk space than saving new copies of the full data for each model.

In [None]:
X_in = full_img_array_list
attrs = data[["gender", "ethnicity", "age_decades"]]
all_indices = range(len(X_in))

# Stratification is only possible for categorical targets
if target == "age":
    strat = None
else:
    strat = attrs[target].values

# Perform the train-test split
idx_train, idx_test = train_test_split(all_indices,
                                       test_size = 0.2,
                                       stratify = strat,
                                       random_state=seed
                                      )

# Perform the train-val split
idx_train, idx_val  = train_test_split(idx_train,
                                       test_size = 0.1,
                                       stratify = strat[idx_train],
                                       random_state=seed
                                      )

Now use those indices to extract the corresponding features/images and targets:

In [None]:
X_train = X_in[idx_train]
y_train = attrs.iloc[idx_train, target]

X_val = X_in[idx_val]
y_val = attr.iloc[idx_val, target]

X_test = X_in[idx_test]
y_test = attr.iloc[idx_test, target]

In [None]:
# Just checking: number of elements per data subset
print("#training:", len(X_train))
print("#validation:", len(X_val))
print("#test:", len(X_test))

Now save the three different index data sets to file:

In [None]:
# In order to pack all three index vectors into one single pd.DataFrame
# they all need to be of the same length. To achieve this, we fill the
# test and validation index vectors with NaNs until they have the same
# length as the training index vector.
idx_val += (len(idx_train) - len(idx_val))*[np.nan]
idx_test += (len(idx_train) - len(idx_test))*[np.nan]

# Check that the vectors are now all of equal length
assert len(idx_train) == len(idx_val)
assert len(idx_train) == len(idx_test)

# Pack all three index vectors into a single pd.DataFrame for easy
# and convenient writing to file.
idx_df = pd.DataFrame({"train_idx": idx_train,
                       "val_idx": idx_val,
                       "test_idx": idx_test}, dtype="Int64")

idx_df.to_csv(new_model_dir / f"data_set_indices__{file_suffix}.csv", index=False)

### Data preprocessing

In order to train the model later on, we first need to make sure the data is in a suitable format. Specifically this means:

- the input tensors X_* need to be of shape (n_X, width, height, n_channels), where n_X is the number of examples in the tensor X_*, width and height are the pixel dimensions of each image and n_channels is the number of channels used in the image. Specifically, we are working with grayscale images, i.e. n_channels = 1.
- The pixel values need to be of type float
- The pixel values need to be normalized to the range between [0,1].
- If the target is age_decades, we need to make sure the age_decade classes are labelled by consecutive indices

In [None]:
# Upsample images if required by CNN architecture
if cnn_architecture in ["LeNet", "AlexNet", "VGG", "ResNet"]:
    newdim = 227
    X_train = np.array([fcaux.upsample_image(X_train, newdim, newdim) for X in X_train])
    X_val = np.array([fcaux.upsample_image(X_val, newdim, newdim) for X in X_val])
    X_test = np.array([fcaux.upsample_image(X_test, newdim, newdim) for X in X_test])

In [None]:
# Preprocess the data: fix the shape and data type and normalize
X_train = fcaux.preproc_data(X_train)
X_val = fcaux.preproc_data(X_val)
X_test = fcaux.preproc_data(X_test)

In [None]:
# If the target is "age_decades", we need to generate consecutive data
# classes as classes 10, 20, 30 etc. or 5, 10, 15, etc. are not accepted
# by keras.utils.to_categorical called below
if target == "age_decades":
    y_train /= age_diff
    y_val /= age_diff
    y_test /= age_diff

In [None]:
# Compute the number of classes for the current classification problem.
# This number will be the dimension of the output layer in the neural 
# network to be built.
if target == "age":
    # If target == "age", we are solving a regression and not a classifiction
    # problem, i.e. there are no classes. 
    num_classes = 0
else:
    num_classes = data[target].nunique()
    y_train = keras.utils.to_categorical(y_train, num_classes)
    y_val = keras.utils.to_categorical(y_val, num_classes)

print("num_classes =", num_classes)

### Building the CNN model 
Here we define the model according to the CNN architecture definition specified in the configurations section above:

In [None]:
if cnn_architecture == "mycnn":
    model = fcmodels.my_cnn(num_classes)
elif cnn_architecture == "lenet":
    pass
elif cnn_architecture == "alexnet":
    model = fcmodels.alex_net(num_classes)
elif cnn_architecture == "vgg":
    pass
elif cnn_architecture == "resnet":
    pass

model.summary()

### Train model until overfitting
Now we are finally ready to train the model. We do so by fitting the previously defined model to the training data for a maximal number n_epochs epochs. During the training, we measure the validation loss and use early stopping based on the validation loss in order to avoid overfitting. We use this approach not just find the optimal value of epochs but also to generate evidence that if more epochs are used the model would overfit.

In [None]:
# Maximum number of epochs
n_epochs = 30

# Definition of early stopping callback: make sure to restore the best weights
early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

# Train the model
history = model.fit(X_train, y_train,
                    epochs=n_epochs,
                    batch_size=128,
                    validation_data=(X_val, y_val),
                    shuffle=True,
                    callbacks=[early_stopping]
                   )

# Save the optimal model
model.save(new_model_dir / f'{file_suffix}__nepochs{n_epochs}.keras')

Let's now have a look at the loss curve to make sure the training process went as expected. First we put the data in a pandas.DataFrame, then we save that to a CSV file...

In [None]:
history = model.history.history  # Just for simplification/convenience

# Add an explicit "epoch" key with values enumerating the epochs
history["epoch"] = list(range(1, n_epochs+1))

# In general we don't know the names of the other keys of the history dictionary
# as they depend on the specific configuration of the model training process.
# Therefore, extract those unknown keys.
other_columns = [k for k, v in history.items() if k!="epoch"] 

# Convert the history dictionary into a dataframe (with 'epoch' as first column)
# for convenient saving.
history_df = pd.DataFrame(history, columns = ["epoch"] + other_columns)
file_name = f'history__{file_suffix}__nepochs{n_epochs}.csv'
history_df.to_csv(new_model_dir / file_name, index=False)

# Remark: by creating the column "epoch" and by setting the index kwarg in
# the last line to False, the epoch enumarting column has actually in the
# saved CSV file (otherwise there wouldn't be a name and when loading the file
# again from disk there would be a generic "Unnamed: 0".

...and last but not least, we plot it:

In [None]:
fig, axs = plt.subplots(2,1, figsize=(5,4), sharex=True)
ax = axs[0]  # panel for loss curves
ax.plot(an_history.history["loss"], label="training")
ax.plot(an_history.history["val_loss"], label="validation")
ax.grid(True)
ax.set_xticks(range(n_epochs))
ax.set_ylabel("Loss")
ax.set_title(f"AlexNet ({target.title()})")
ax.legend(loc="best")

ax = axs[1]  # panel for accuracy curves
ax.plot(an_history.history["accuracy"], label="training")
ax.plot(an_history.history[f"val_accuracy"], label="validation")
ax.grid(True)
ax.set_xticks(range(n_epochs))
ax.set_xlabel("Epochs")
ax.set_ylabel("Accuracy")
ax.legend(loc="best")

# Save figure to disk
plt.savefig(new_model_dir / f"loss_curve__{file_suffix}__nepochs{n_epochs}.png",
            bbox_inches='tight')
plt.show()

### Refit model with optimal number of epochs

Remark: optimal number of epochs = number of epochs leading to minimal validation loss

In [None]:
set_seeds()
n_epochs = np.argmin(an_history.history["val_loss"])
print(f"Optimal epoch: #{n_epochs}.")
alex_net_refit = create_alex_net_model(num_classes)

refit_history = alex_net_refit.fit(X_train, y_train,
                                   epochs=n_epochs,
                                   batch_size=32,
                                   validation_data=(X_val, y_val),
                                   shuffle=True
                                  )    

alex_net_refit.save(new_model_dir / f'{file_suffix}__refit_nepochs{n_epochs}.keras')

In [None]:
refit_history = alex_net_refit.history.history
n_epochs = len(refit_history[list(refit_history.keys())[0]])
history["epoch"] = list(range(1, n_epochs+1))

# in general I don't know the names of the other columns. Therefore:
other_columns = [k for k, v in refit_history.items() if k!="epoch"] 

refit_history_df = pd.DataFrame(refit_history, columns = ["epoch"] + other_columns)
file_name = f'history__{file_suffix}__refit__nepochs{n_epochs}.csv'
#refit_history_df.to_csv(new_model_dir / file_name, index=False)

In [None]:
fig, axs = plt.subplots(2,1, figsize=(5,4), sharex=True)
ax = axs[0]
ax.plot(refit_history["loss"], label="training")
ax.plot(refit_history["val_loss"], label="validation")
ax.grid(True)
ax.set_xticks(range(n_epochs))
ax.set_ylabel("Loss")
ax.set_title(f"AlexNet ({target.title()})")
ax.legend(loc="best")

ax = axs[1]
ax.plot(refit_history["accuracy"], label="training")
ax.plot(refit_history[f"val_accuracy"], label="validation")
ax.grid(True)
ax.set_xticks(range(n_epochs))
ax.set_xlabel("Epochs")
ax.set_ylabel("Accuracy")#metric.title())
ax.legend(loc="best")

plt.savefig(new_model_dir / f"loss_curve__{file_suffix}__refit_nepochs{n_epochs}.png",
            bbox_inches='tight')
plt.show()

In [None]:
y_prob_train = alex_net.predict(X_train)
y_pred_train = np.array([np.argmax(i) for i in y_prob_train])

y_prob_val = alex_net.predict(X_val)
y_pred_val = np.array([np.argmax(i) for i in y_prob_val])

y_prob_test = alex_net.predict(X_test)
y_pred_test = np.array([np.argmax(i) for i in y_prob_test])

In [None]:
train_metrics = {"accuracy": accuracy_score(np.array([np.argmax(i) for i in y_train]), y_pred_train),
                "balanced_accuracy": balanced_accuracy_score(np.array([np.argmax(i) for i in y_train]), y_pred_train),
                "roc_auc": roc_auc_score(np.array([np.argmax(i) for i in y_train]), y_prob_train[:,1]),
                "F1": f1_score(np.array([np.argmax(i) for i in y_train]), y_pred_train)}

val_metrics = {"accuracy": accuracy_score(np.array([np.argmax(i) for i in y_val]), y_pred_val),
                "balanced_accuracy": balanced_accuracy_score(np.array([np.argmax(i) for i in y_val]), y_pred_val),
                "roc_auc": roc_auc_score(np.array([np.argmax(i) for i in y_val]), y_prob_val[:,1]),
                "F1": f1_score(np.array([np.argmax(i) for i in y_val]), y_pred_val)}

test_metrics = {"accuracy": accuracy_score(y_test, y_pred_test),
                "balanced_accuracy": balanced_accuracy_score(y_test, y_pred_test),
                "roc_auc": roc_auc_score(y_test, y_prob_test[:,1]),
                "F1": f1_score(y_test, y_pred_test)}

metrics_df = pd.DataFrame({"train": train_metrics, 
                           "val": val_metrics, 
                           "test": test_metrics})

display(metrics_df)
metrics_df.to_csv(new_model_dir / f"metrics__{file_suffix}.csv")

In [None]:
fpr, tpr, thr = roc_curve(y_test, y_prob_test[:,1])
pd.DataFrame({"FPR": fpr, "TPR": tpr}).to_csv(new_model_dir / f"fpr_vs_tpr__{file_suffix}.csv")

In [None]:
fig, ax = plt.subplots()
ax.plot(fpr,tpr)
ax.plot([0,1], [0,1], ls="--", c="k")
ax.grid(True)
ax.set_xlabel("False Positive Rate")
ax.set_ylabel("True Positive Rate")
ax.set_title(f"{target.title()} -- ROC AUC: {np.round(roc_auc_score(y_test, y_prob_test[:,1]),4)}")
plt.tight_layout()
plt.savefig(new_model_dir / f"roc_curve__{file_suffix}.png",
            bbox_inches='tight')
plt.show()

In [None]:
cls_report = pd.DataFrame(classification_report(y_test, y_pred_test, output_dict=True))
cls_report.to_csv(new_model_dir / f"classificationo_report__{file_suffix}.png")

### Analysis

In [None]:
test_data = data.iloc[[i for i in idx_test if i==i]].reset_index(drop=True)

In [None]:
ethnicity_groups = dict()

for ethn_idx in test_data["ethnicity"].unique():
    ethnicity_groups[ethn_idx] = list(test_data[test_data["ethnicity"]==ethn_idx].index)

In [None]:
performance_by_ethnicity = dict()
for ethn in range(5):
    y_prob_ethn = alex_net.predict(X_test[ethnicity_groups[ethn]])
    y_pred_ethn = np.array([np.argmax(i) for i in y_prob_ethn])
    acc = accuracy_score(y_test[ethnicity_groups[ethn]], y_pred_ethn)
    rocauc = roc_auc_score(y_test[ethnicity_groups[ethn]], y_prob_ethn[:,1])

    performance_by_ethnicity[ethn] = {"accuracy": acc, 
                                      "ROC_AUC": rocauc}

# Convert to data frame for easier plotting
performance_by_ethnicity_df = pd.DataFrame(performance_by_ethnicity).transpose()

In [None]:
age_groups = dict()

for age_idx in test_data["age_decades"].unique():
    age_groups[age_idx] = list(test_data[test_data["age_decades"]==age_idx].index)

In [None]:
performance_by_age = dict()
for age_idx in test_data["age_decades"].unique():
    y_prob_age = alex_net.predict(X_test[age_groups[age_idx]])
    y_pred_age = np.array([np.argmax(i) for i in y_prob_age])
    acc = accuracy_score(y_test[age_groups[age_idx]], y_pred_age)
    try:
        rocauc = roc_auc_score(y_test[age_groups[age_idx]], y_prob_age[:,1])
    except ValueError:
        rocauc = np.nan

    performance_by_age[age_idx] = {"accuracy": acc, 
                                      "ROC_AUC": rocauc}

sorted_columns = sorted([k for k,v in performance_by_age.items()])
performance_by_age_df = pd.DataFrame(performance_by_age, 
                                     columns = sorted_columns).transpose()

In [None]:
fig, axs = plt.subplots(1,2, figsize=(10,4), sharey=True, gridspec_kw = {"hspace": 0.07})
ax = axs[0]
performance_by_ethnicity_df.plot(kind="bar", ax = ax, grid=True, legend=False)
ax.set_xlabel("Ethnicity")
ax.set_ylabel("Metric")

ax = axs[1]
performance_by_age_df.plot(kind="bar", ax = ax, grid=True)
ax.set_xlabel("Age (decade)")
ax.legend(loc="center", bbox_to_anchor=(-0.15,-0.2), ncol=2)

fig.suptitle("Classification performance by ethnicity and age group")

plt.savefig(new_model_dir / f"cls_performance_analysis__{file_suffix}", 
            bbox_inches='tight')
plt.show()

In [None]:
gender_groups = dict()

for gen_idx in test_data["gender"].unique():
    gender_groups[gen_idx] = list(test_data[test_data["gender"]==gen_idx].index)

In [None]:
performance_by_gender = dict()
for gen_idx in test_data["gender"].unique():
    y_prob_gen = alex_net.predict(X_test[gender_groups[gen_idx]])
    y_pred_gen = np.array([np.argmax(i) for i in y_prob_gen])
    acc = accuracy_score(y_test[gender_groups[gen_idx]], y_pred_gen)
    try:
        rocauc = roc_auc_score(y_test[gender_groups[gen_idx]], y_prob_gen[:,1])
    except ValueError:
        rocauc = np.nan

    performance_by_gender[gen_idx] = {"accuracy": acc, 
                                      "ROC_AUC": rocauc}

sorted_columns = sorted([k for k,v in performance_by_gender.items()])
performance_by_gender_df = pd.DataFrame(performance_by_gender, 
                                     columns = sorted_columns).transpose()

In [None]:
performance_by_gender_df

In [None]:
fig, axs = plt.subplots(1,3, figsize=(10,4), sharey=True, gridspec_kw = {"hspace": 0.07})
ax = axs[0]
performance_by_ethnicity_df.plot(kind="bar", ax = ax, grid=True, legend=False)
ax.set_xlabel("Ethnicity")
ax.set_ylabel("Metric")

ax = axs[1]
performance_by_age_df.plot(kind="bar", ax = ax, grid=True)
ax.set_xlabel("Age (decade)")
ax.legend(loc="center", bbox_to_anchor=(-0.15,-0.2), ncol=2)

ax = axs[2]
performance_by_gender_df.plot(kind="bar", ax = ax, grid=True)
ax.set_xlabel("Gender")
#ax.legend(loc="center", bbox_to_anchor=(-0.15,-0.2), ncol=2)

fig.suptitle("Classification performance by ethnicity and age group")

#plt.savefig(new_model_dir / f"cls_performance_analysis__{file_suffix}", 
#            bbox_inches='tight')
plt.show()