# Machine Learning Nanodegree - Capstone Project

## Distracted Driver Detection

## Project: Write a program to detect distracted drivers

In [None]:
from glob import glob

import numpy as np
import pandas as pd

from keras import optimizers
from keras import applications
from keras.callbacks import ModelCheckpoint, EarlyStopping
from keras.layers import Conv2D, MaxPooling2D, GlobalAveragePooling2D
from keras.layers import Dropout, Flatten, Dense
from keras.layers import BatchNormalization, Activation
from keras.models import Sequential, Model
from keras.preprocessing import image                  
from keras.utils import np_utils

import matplotlib.pyplot as plt   

import random

from sklearn.datasets import load_files       
from sklearn.metrics import confusion_matrix, f1_score, precision_score, recall_score, log_loss
from sklearn.utils import shuffle

import seaborn as sns

from tqdm import tqdm

random.seed(42)

In [None]:
# This dictionary is used to map class short names to their full
# description when generating graphs and charts
driving_class_to_description = {
    "c0": "safe driving",
    "c1": "texting - right",
    "c2": "talking on the phone - right",
    "c3": "texting - left",
    "c4": "talking on the phone - left",
    "c5": "operating the radio",
    "c6": "drinking",
    "c7": "reaching behind",
    "c8": "hair and makeup",
    "c9": "talking to passenger",
    0: "safe driving",
    1: "texting - right",
    2: "talking on the phone - right",
    3: "texting - left",
    4: "talking on the phone - left",
    5: "operating the radio",
    6: "drinking",
    7: "reaching behind",
    8: "hair and makeup",
    9: "talking to passenger",
}

In [None]:
# Takes a path and returns a list of files and matching labels
def load_dataset(path):
    data = load_files(path)
    driver_files = np.array(data['filenames'])
    driver_targets = np_utils.to_categorical(np.array(data['target']), 10)
    return driver_files, driver_targets

# Load all of the labeled data, which is in the train folder. This
# will be split into separate sets later, so call it all_* for now.
all_files, all_targets = load_dataset('StateFarm/imgs/train')

In [None]:
file_df = pd.read_csv('StateFarm/driver_imgs_list.csv')

In [None]:
subjects = file_df.subject.unique()
print('There are %d subjects with data'% len(subjects))

# Split the files into classes based on subject id. 
train_subjects = subjects[:-4]
validate_subjects = subjects[-4:-3]
test_subjects = subjects[-3:]

# Print out the subject id numbers for each class
print('Training subjects:')
print(train_subjects)
print('Validate subjects:')
print(validate_subjects)
print('Test subjects:')
print(test_subjects)

In [None]:
# Extract the rows of the dataframe where the subject is in the appropriate set
test_rows = file_df.loc[file_df['subject'].isin(test_subjects)]
validate_rows = file_df.loc[file_df['subject'].isin(validate_subjects)]
train_rows = file_df.loc[file_df['subject'].isin(train_subjects)]

# Generate file names based on the extracted rows
train_files = train_rows.apply(lambda x: "{}/{}".format(x[1],x[2]), axis=1)
validate_files = validate_rows.apply(lambda x: "{}/{}".format(x[1],x[2]), axis=1)
test_files = test_rows.apply(lambda x: "{}/{}".format(x[1],x[2]), axis=1)

print("train_files count: {}".format(len(train_files)))
print("validate_files count: {}".format(len(validate_files)))
print("test_files count: {}".format(len(test_files)))

train_classes = train_files.apply(lambda x: x.split('/')[0].split('c')[1])
train_classes_categories = np_utils.to_categorical(train_classes)
validate_classes = validate_files.apply(lambda x: x.split('/')[0].split('c')[1])
validate_classes_categories = np_utils.to_categorical(validate_classes)
test_classes = test_files.apply(lambda x: x.split('/')[0].split('c')[1])
test_classes_categories = np_utils.to_categorical(test_classes)

# Assert that various lengths that should be equal to detect obvious problems 
# where the wrong variable was used.
assert len(train_classes) == len(train_files)
assert len(train_classes) == len(train_classes_categories)

assert len(validate_classes) == len(validate_files)
assert len(validate_classes) == len(validate_classes_categories)

assert len(test_classes) == len(test_files)
assert len(test_classes) == len(test_classes_categories)

In [None]:
def path_to_tensor(img_path):
    # loads RGB image as PIL.Image.Image type
    img = image.load_img("{}/{}".format('StateFarm/imgs/train', img_path), target_size=(224, 224))
    # convert PIL.Image.Image type to 3D tensor with shape (224, 224, 3)
    x = image.img_to_array(img)
    # convert 3D tensor to 4D tensor with shape (1, 224, 224, 3) and return 4D tensor
    return np.expand_dims(x, axis=0)

def paths_to_tensor(img_paths):
    list_of_tensors = [path_to_tensor(img_path) for img_path in tqdm(img_paths)]
    return np.vstack(list_of_tensors)

# Normalize the image to the expected 0->1 values for pretrained networks
train_tensors = paths_to_tensor(train_files).astype('float32')/255
validate_tensors = paths_to_tensor(validate_files).astype('float32')/255
test_tensors = paths_to_tensor(test_files).astype('float32')/255

train_classes_categories = train_classes_categories
validate_classes_categories = validate_classes_categories
test_classes_categories = test_classes_categories

# Assert that the tensor count matches the file count to catch errors where
# the wrong variable was used to generate the tensors
assert len(train_files) == len(train_tensors)
assert len(validate_files) == len(validate_tensors)
assert len(test_files) == len(test_tensors)


In [None]:
plt.hist(np.argmax(train_classes_categories, axis=1))
plt.title('Training Class Distribution')
plt.xlabel('Image Count')
plt.ylabel('Driving Class')
plt.show()

In [None]:
def show_class_sample(tcc, tensors):
    fig = plt.figure(figsize=(50, 50))  # width, height in inches
    dim = 4
    for target_class in range(10):
        c = np.argmax(tcc, axis=1)
        idx = np.where(c == target_class)[0][0]
        tensor = tensors[idx]
        sub = fig.add_subplot(dim, dim, target_class + 1)
        sub.imshow(tensor, interpolation='nearest')
        text_params = {'fontweight': 'bold'}

        sub.text(0,
                 10,
                 "actual: {}".format(driving_class_to_description[target_class]),
                 color='g',
                 size=20,
                 bbox=dict(boxstyle="square", ec=(1., 0.5, 0.5), fc=(1., 0.8, 0.8), ),
                 **text_params)
    plt.savefig('analysis/sample.png', format='png')

# Print out a sample of each class from the training tensors to understand the classes
show_class_sample(train_classes_categories, train_tensors)


In [None]:
# https://medium.com/@14prakash/transfer-learning-using-keras-d804b2e04ef8
# https://www.safaribooksonline.com/library/view/python-deep-learning/9781787125193/a0b05e70-9f53-404c-a975-09ac766389a1.xhtml
# https://alexisbcook.github.io/2017/using-transfer-learning-to-classify-images-with-keras/

img_width, img_height = 224, 224
# nb_train_samples = len(train_tensors)
# nb_validation_samples = len(validate_tensors) 


In [None]:
def train_params_fast():
    return 75, 1

def train_params_full():
    return 20, 20

def optimizer_1(model):
    lr=0.0001
    momentum=0.9
    opt="SGD"
    model.compile(loss = "categorical_crossentropy",
                  optimizer = optimizers.SGD(lr=lr, momentum=momentum),
                  metrics=["accuracy"])
    return "opt={},lr={},momentum={}".format(opt,lr, momentum)

def optimizer_2(model):
    lr="default"
    momentum="default"
    opt="Adam"
    model.compile(loss = "categorical_crossentropy",
                  optimizer = optimizers.Adam(),
                  metrics=["accuracy"])
    return "opt={},lr={},momentum={}".format(opt,lr, momentum)

def model_v1():
    pretrain_model = applications.ResNet50(weights = "imagenet", 
                                           include_top=False, 
                                           input_shape = (img_width, img_height, 3))
    train_layers = 0
    for layer in pretrain_model.layers[:-train_layers]:
        layer.trainable = False
    x = pretrain_model.output
    x = Flatten()(x)
    x = Dropout(0.5)(x)
    predictions = Dense(10, activation="softmax")(x)

    model = Model(input = pretrain_model.input, output = predictions)
    return model, "resnet50-flat-drop0.5"

def model_v2():
    pretrain_model = applications.ResNet50(weights = "imagenet", include_top=False, input_shape = (img_width, img_height, 3))
    train_layers = 0
    for layer in pretrain_model.layers[:-train_layers]:
        layer.trainable = False
    x = pretrain_model.output
    x = Flatten()(x)
    x = Dense(1024, activation="relu")(x)
    x = Dropout(0.5)(x)
    x = Dense(1024, activation="relu")(x)
    predictions = Dense(10, activation="softmax")(x)

    model = Model(input = pretrain_model.input, output = predictions)
    return model, "resnet50-flat-d1024-drop0.5-dense1024"

def model_v3():
    pretrain_model = applications.ResNet50(weights = "imagenet", include_top=False, input_shape = (img_width, img_height, 3))
    train_layers = 0
    for layer in pretrain_model.layers[:-train_layers]:
        layer.trainable = False
    x = pretrain_model.output
    x = Flatten()(x)
    x = Dropout(0.5)(x)
    x = Dense(512, activation="relu")(x)
    x = Dropout(0.5)(x)
    x = Dense(512, activation="relu")(x)
    x = Dropout(0.2)(x)
    predictions = Dense(10, activation="softmax")(x)

    model = Model(input = pretrain_model.input, output = predictions)
    return model, "resnet50-model-v3"

def model_v4():
    pretrain_model = applications.ResNet50(weights = "imagenet", include_top=False, input_shape = (img_width, img_height, 3))
    train_layers = 0
    for layer in pretrain_model.layers[:-train_layers]:
        layer.trainable = False
    x = pretrain_model.output
    x = Flatten()(x)
    x = Dropout(0.5)(x)
    x = Dense(256, activation="relu")(x)
    x = Dropout(0.5)(x)
    x = Dense(256, activation="relu")(x)
    x = Dropout(0.2)(x)
    predictions = Dense(10, activation="softmax")(x)

    model = Model(input = pretrain_model.input, output = predictions)
    return model, "resnet50-model-v4"

def model_v5():
    pretrain_model = applications.ResNet50(weights = "imagenet", include_top=False, input_shape = (img_width, img_height, 3))
    train_layers = 0
    for layer in pretrain_model.layers[:-train_layers]:
        layer.trainable = False
    x = pretrain_model.output
    x = Flatten()(x)
    x = Dropout(0.5)(x)
    x = Dense(128, activation="relu")(x)
    x = Dropout(0.5)(x)
    x = Dense(128, activation="relu")(x)
    x = Dropout(0.2)(x)
    predictions = Dense(10, activation="softmax")(x)

    model = Model(input = pretrain_model.input, output = predictions)
    return model, "resnet50-model-v5"

def model_v6():
    pretrain_model = applications.ResNet50(weights = "imagenet", 
                                           include_top=False, 
                                           pooling='avg',
                                           input_shape = (img_width, img_height, 3))
    train_layers = 0
    for layer in pretrain_model.layers[:-train_layers]:
        layer.trainable = False
    x = pretrain_model.output
    # x = Flatten()(x)
    x = Dropout(0.5)(x)
    x = Dense(128, activation="relu")(x)
    x = Dropout(0.5)(x)
    x = Dense(128, activation="relu")(x)
    x = Dropout(0.2)(x)
    predictions = Dense(10, activation="softmax")(x)

    model = Model(input = pretrain_model.input, output = predictions)
    return model, "resnet50-model-v6"

def model_v7():
    pretrain_model = applications.ResNet50(weights = "imagenet", include_top=False, input_shape = (img_width, img_height, 3))
    train_layers = 0
    for layer in pretrain_model.layers[:-train_layers]:
        layer.trainable = False
    x = pretrain_model.output
    x = Flatten()(x)
    x = Dropout(0.5)(x)
    x = Dense(64, activation="relu")(x)
    x = Dropout(0.5)(x)
    x = Dense(64, activation="relu")(x)
    x = Dropout(0.2)(x)
    predictions = Dense(10, activation="softmax")(x)

    model = Model(input = pretrain_model.input, output = predictions)
    return model, "resnet50-model-v7"


# Experiments are tracked by having immutable functions that define the
# model, optimizer, and training parameters. The model_desc string describes
# the experiment and is used when generating the graph titles and filenames
# so the exact experiment can be located using the output files.
batch_size, epochs = train_params_full()
model_final, model_name = model_v5()
model_optimizer_text = optimizer_2(model_final)

model_desc = "batch_size={},epochs={},{},{}".format(batch_size, epochs, model_name, model_optimizer_text)

In [None]:
print(model_desc)
model_final.summary()


In [None]:
def do_train():
    # Checkpoint the model so we can restore it later. If the model 
    # degrades with additional training epochs we can restore the 
    # weights with the lowest log loss.
    checkpoint = ModelCheckpoint("weights/{}.h5".format(model_desc), 
                                 monitor='val_loss',
                                 verbose=1, 
                                 save_best_only=True, 
                                 save_weights_only=False, 
                                 mode='auto', 
                                 period=1)
    # Stop if the model doesn't improve in 10 epochs to save training time
    early = EarlyStopping(monitor='val_loss', 
                          min_delta=0, 
                          patience=10, 
                          verbose=1, 
                          mode='auto')

    history = model_final.fit(x=train_tensors,
                              y=train_classes_categories,
                              batch_size=batch_size,
                              epochs=epochs,
                              verbose=1,
                              validation_data=(validate_tensors, validate_classes_categories),
                              callbacks = [checkpoint, early]
                              )
    return history
history = None
# Comment this out if prior weights have been computed for this model
# Useful to modify analysis code without rerunning training

history = do_train()

In [None]:
# Plot the train and validate accuracy for each training epoch, saving 
# it to a file in the analysis directory to allow experiment results to 
# be preserved.
def generate_accuracy_fig(hist, desc):
    plt.figure(figsize=(8,8))
    plt.plot(np.arange(len(hist.history['acc'])), hist.history['acc'], label='training')
    plt.plot(np.arange(len(hist.history['val_acc'])), hist.history['val_acc'], label='validation')
    plt.title('Accuracy\n {}'.format(desc))
    plt.xlabel('epochs')
    plt.ylabel('accuracy ')
    plt.legend(loc=0)
    plt.savefig('analysis/acc{}.png'.format(desc), format='png')
    plt.show()
if history is not None:
    generate_accuracy_fig(history, model_desc)

In [None]:
# Plot the train and validate log loss for each training epoch, saving 
# it to a file in the analysis directory to allow experiment results to 
# be preserved.
def generate_loss_fig(hist, desc):
    plt.figure(figsize=(8,8))
    plt.plot(np.arange(len(hist.history['loss'])), hist.history['loss'], label='loss')
    plt.plot(np.arange(len(hist.history['val_loss'])), hist.history['val_loss'], label='val_loss')
    plt.title('Loss\n {}'.format(desc))
    plt.xlabel('epochs')
    plt.ylabel('loss ')
    plt.legend(loc=0)
    plt.savefig('analysis/loss{}.png'.format(desc), format='png')
    plt.show()
if history is not None:
    generate_loss_fig(history, model_desc)

In [None]:
# Load the best weights found during the experiment instead of leaving
# the final weights from the last training epoch.
model_final.load_weights("weights/{}.h5".format(model_desc))

# Generate test predictions using the best weights for the 
# experiment
test_predictions = model_final.predict(x=test_tensors)

In [None]:
def prediction_metrics(pred, actual):
    actual_classes = np.argmax(actual, axis=1)
    predicted_classes = np.argmax(pred, axis=1)

    test_accuracy = 100*np.sum(actual_classes==predicted_classes)/len(actual)
    loss = log_loss(y_pred=pred, y_true=actual)

    return "Test accuracy: %.4f%%\nLog loss: %f" % (test_accuracy, loss)

In [None]:
# Save the metrics from the test set to a file in the analysis directory to 
# allow experiment results to be preserved.
test_results = prediction_metrics(test_predictions, test_classes_categories)
with open("analysis/test-results{}.txt".format(model_desc), "w") as text_file:
    text_file.write(test_results)
print(test_results)

In [None]:
# Generate predictions for the naive classifier that always picks safe driving
pred_attentive = [1.0] + [0] * 9
benchmark_all_pred_attentive = [pred_attentive] * len(test_classes_categories)

print(prediction_metrics(benchmark_all_pred_attentive, test_classes_categories))

In [None]:
# Show a sampling of misclassified images with label annotations showing 
# the true and predicted classes. This data can inform an understanding of
# why misclassifications might happen.
def generate_mispredict_fig():
    predicted_classes = np.argmax(test_predictions, axis=1)
    actual_classes = np.argmax(test_classes_categories, axis=1)
    fig = plt.figure(figsize=(50, 50))  # width, height in inches
    j = 0
    dim = 4
    for i in shuffle(range(len(test_tensors))):
        tensor = test_tensors[i]
        if predicted_classes[i] != actual_classes[i]:
            sub = fig.add_subplot(dim, dim, j + 1)
            sub.imshow(tensor, interpolation='nearest')
            text_params = {'fontweight': 'bold'}
            sub.text(0,
                     10,
                     "pred: {}".format(driving_class_to_description[predicted_classes[i]]),
                     color='r',
                     size=20,
                     bbox=dict(boxstyle="square", ec=(1., 0.5, 0.5), fc=(1., 0.8, 0.8), ),
                     **text_params)
            sub.text(0, 
                     25, 
                     "actual: {}".format(driving_class_to_description[actual_classes[i]]), 
                     color='g',
                     size=20,
                     bbox=dict(boxstyle="square", ec=(1., 0.5, 0.5), fc=(1., 0.8, 0.8), ),
                     **text_params)

            j = j + 1
            if j == dim * dim:
                break
    plt.savefig('analysis/mispredicts.png', format='png')
generate_mispredict_fig()

In [None]:
# Generate a confusion matrix to provide a high level summary
# of where mispedictions are happening. 
def generate_confusion_fig():   
    predicted_classes = np.argmax(test_predictions, axis=1)
    actual_classes = np.argmax(test_classes_categories, axis=1)
    
    confusion = confusion_matrix(actual_classes, predicted_classes)
    plt.figure(figsize = (10,7))
    hm = sns.heatmap(confusion, annot=True, fmt="d")
    hm.set_ylabel('True label')
    hm.set_xlabel('Predicted label')
    hm.set_title('Confusion Matrix\n{}'.format(model_desc))
    plt.savefig('analysis/confusion{}.png'.format(model_desc), format='png')
generate_confusion_fig()