# Pothole detection

## Import libraries

In [None]:
import numpy as np
import os
import csv
from PIL import Image

import keras
import keras.preprocessing.image as img
from keras.applications import ResNet50, VGG16
from keras.layers import Dense, Conv2D, BatchNormalization, Activation, Flatten
from keras.layers.pooling import GlobalMaxPool2D, GlobalAveragePooling2D
from keras.models import Model
from keras.optimizers import SGD, Adam
from keras import backend as K

import matplotlib
import matplotlib.pyplot as plt
%matplotlib inline

## Crop images and save in a new folder

The idea is to crop out the dashboard and sky regions of the images to save computing power. If potholes in the test set appear outside the cropped band, this model will not be able to detect them, which might be one of the reasons for the big difference between the validation (97%) and test (86%) accuracy.

In [None]:
# Recursively gather the path of every file found under the 'data' directory.
all_files = [
    os.path.join(dirpath, filename)
    for dirpath, _dirnames, filenames in os.walk('data')
    for filename in filenames
]

In [None]:
# Crop the images and save them in the data_crop folder, mirroring the
# directory layout found under 'data'.
# Crop box is (left, upper, right, lower): a full-width (800 px) band,
# 185 px high, starting 165 px from the top of the frame.
crop_box = (0, 600 - 435, 800, 600 - 435 + 185)
for f in all_files:
    # Build the destination from the path relative to 'data'. Splitting the
    # string on 'data' breaks as soon as a file or folder name contains
    # 'data' a second time.
    out_path = os.path.join('data_crop', os.path.relpath(f, 'data'))
    # The class sub-folders do not exist yet on a first run.
    os.makedirs(os.path.dirname(out_path), exist_ok=True)
    # The context manager closes the source file once the crop is written.
    with Image.open(f) as temp_img:
        temp_img.crop(crop_box).save(out_path)

## Create train and validation folders

Here we take 500 random images from the `train` folder and move them to the `valid` folder. We can do this with the images in either `data` or `data_crop`. 500 images is probably not enough to obtain a reasonable estimate of the test accuracy.

In [None]:
# Recursively gather the path of every file found under 'data/train/'.
train_files = [
    os.path.join(dirpath, filename)
    for dirpath, _dirnames, filenames in os.walk('data/train/')
    for filename in filenames
]

In [None]:
# Shuffle in place, then hold out the first 500 files as the validation set.
np.random.shuffle(train_files)

valid_files = train_files[:500]
train_files = train_files[500:]

# Move each held-out file to the same relative location under data/valid.
for f in valid_files:
    dest = os.path.join('data/valid', os.path.relpath(f, 'data/train'))
    # os.rename fails if the destination class folder is missing, so create it.
    os.makedirs(os.path.dirname(dest), exist_ok=True)
    os.rename(f, dest)

In [None]:
# Move the contents of data/valid/positive and data/valid/negative back into
# the matching data/train folders, i.e. undo a previous train/valid split.
%mv data/valid/positive/* data/train/positive/
%mv data/valid/negative/* data/train/negative/

I realised what caused the huge difference between validation and test accuracy. The images are frames taken from video footage, so frames captured about a second apart look very similar to each other, depending on how fast the driver was going at the time. Thus, if we randomly sample images from the training set to form a validation set, some of the validation images will look very similar to images in the training set. I suspect the images in the test set come from totally different video footage and are not similar to any of the training images. Hence the big difference!
For a more appropriate validation set, we need to find a way to group the images by the sequence they were taken in.

## Setup data generators

The following section creates the batch generators for training and validation.

Since we are using models pretrained on ImageNet, we reverse the channel order and subtract the ImageNet per-channel means.

In [None]:
def imagenet_mean(x):
    """Return a channel-reversed, mean-centred copy of an image array.

    The last axis is reversed and 103.939, 116.779 and 123.68 are subtracted
    from channels 0, 1 and 2 of the reversed array.

    Args:
        x: array-like whose last axis holds the colour channels
           (presumably RGB as loaded by Keras -- confirm against the caller).

    Returns:
        A new floating-point array with the same shape as ``x``. Floating
        inputs keep their dtype; any other dtype is converted to float32.
        The input itself is never modified.
    """
    arr = np.asarray(x)
    # In-place subtraction of a float from an integer array raises a casting
    # error, so promote non-float input first.
    if not np.issubdtype(arr.dtype, np.floating):
        arr = arr.astype(np.float32)
    # Reversing with a slice only yields a view; copy it so the in-place
    # subtractions below cannot write through to the caller's array.
    bgr = arr[..., ::-1].copy()
    bgr[..., 0] -= 103.939
    bgr[..., 1] -= 116.779
    bgr[..., 2] -= 123.68
    return bgr

The data augmentations include horizontal flips and small horizontal and vertical shifts. The shifts are a bit risky since they can cut off some of the potholes.

In [None]:
# Both generators run imagenet_mean on every image; only the training
# generator additionally applies random flips and shifts.
train_augmentation = dict(
    horizontal_flip=True,
    width_shift_range=0.05,
    height_shift_range=0.1,
)
train_gen = img.ImageDataGenerator(preprocessing_function=imagenet_mean,
                                   **train_augmentation)
test_gen = img.ImageDataGenerator(preprocessing_function=imagenet_mean)

In [None]:
# Images per batch, and the size every image is resized to when it is loaded
# (passed as target_size to the directory iterators).
batch_size, img_size = 64, (300, 300)

In [None]:
# Settings shared by all three directory iterators.
flow_kwargs = dict(
    batch_size=batch_size,
    target_size=img_size,
    class_mode='binary',
)

# Training batches use the augmenting generator.
train_batches = train_gen.flow_from_directory('data/train/', **flow_kwargs)

# Validation and test batches are not shuffled, so predictions stay aligned
# with the iterator's file order.
valid_batches = test_gen.flow_from_directory('data/valid/', shuffle=False, **flow_kwargs)
test_batches = test_gen.flow_from_directory('data/test/', shuffle=False, **flow_kwargs)

Test the output of one of the generators.

In [None]:
# Pull one (images, labels) batch from the training iterator and inspect it.
temp_train_batch = train_batches.next()
print('X shape: ', temp_train_batch[0].shape)
print('Y shape: ', temp_train_batch[1].shape)

# The batch has already been through imagenet_mean (channels reversed, means
# subtracted), so casting it straight to uint8 wraps the negative values and
# shows a corrupted image. Undo the preprocessing before displaying.
preview = temp_train_batch[0][0].copy()
preview[..., 0] += 103.939
preview[..., 1] += 116.779
preview[..., 2] += 123.68
preview = np.clip(preview[..., ::-1], 0, 255).astype('uint8')
plt.imshow(preview)

## Start Modelling

Using an ensemble of 3 pretrained ConvNets: ResNet50, ResNet101 and DenseNet121. I trained each model on a different train/validation split and averaged their predictions on the test set.

In [None]:
# choose the convnet
# ResNet50 without its classification top; the input shape is img_size plus
# 3 colour channels (channels last).
base_model = ResNet50(include_top=False, input_shape=img_size + (3,))
# Alternative backbones. NOTE(review): densenet121_model and resnet101_model
# are not defined or imported in the visible cells -- they must be provided
# before either line is uncommented.
#base_model = densenet121_model(img_rows=img_size[0], img_cols=img_size[1], color_type=3, num_classes=2)
#base_model = resnet101_model(img_rows=img_size[0], img_cols=img_size[1], color_type=3, num_classes=2)

In [None]:
# Print the layer-by-layer summary of the backbone.
base_model.summary()

Add new classification head. Can use max or average pooling.

In [None]:
# Feature map to build on: the output of the backbone's second-to-last layer.
# NOTE(review): which layer index=-2 refers to depends on how the installed
# Keras version builds ResNet50 without its top -- confirm with
# base_model.summary().
ft_map = base_model.get_layer(index=-2).output

# 3x3 convolution (128 filters) -> batch normalisation -> ReLU.
x = Conv2D(128, (3,3), padding='same')(ft_map)
x = BatchNormalization()(x)
x = Activation('relu')(x)

# Single-filter 3x3 convolution with a sigmoid: a one-channel map holding a
# value in (0, 1) for every spatial location.
x = Conv2D(1, (3,3), activation='sigmoid', padding='same')(x)
# Average that map over its spatial dimensions to get one score per image.
x = GlobalAveragePooling2D()(x)

# Full network: backbone input -> new head output.
model = Model(base_model.input, x)

In [None]:
# Print the summary of the full model (backbone plus the new head).
model.summary()

First, train only the new classification layer.

In [None]:
# freeze all the base model layers
# Only the newly added head layers remain trainable; the model is compiled
# in a later cell, after these flags have been set.
for layer in base_model.layers:
    layer.trainable = False

Can experiment with different optimising strategies. I feel that small learning rates worked the best.

In [None]:
# Adam with a learning rate of 1e-3; binary cross-entropy matches the single
# sigmoid-based output of the head.
# (A commented-out momentum=0.9 argument used to trail the next line --
# presumably left over from an SGD experiment.)
opt = Adam(0.001)
model.compile(optimizer=opt, loss='binary_crossentropy', metrics=['accuracy'])

In [None]:
# Train for 5 epochs, with one full pass over each iterator per epoch.
# np.ceil returns a float; the step counts are cast to int because Keras
# documents them as integers (the evaluation cells already do the same).
model.fit_generator(train_batches,
                    steps_per_epoch=int(np.ceil(train_batches.samples/batch_size)),
                    epochs=5, verbose=1,
                    validation_data=valid_batches,
                    validation_steps=int(np.ceil(valid_batches.samples/batch_size)),
                    )

In [None]:
# Checkpoint the weights after the first head-only training run.
model.save_weights('models/rn50_cls.h5')

In [None]:
# Lower the learning rate of the already-compiled optimizer to 1e-5 in place.
K.set_value(model.optimizer.lr, 0.00001)

In [None]:
# Train for 5 more epochs, with one full pass over each iterator per epoch.
# np.ceil returns a float; the step counts are cast to int because Keras
# documents them as integers (the evaluation cells already do the same).
model.fit_generator(train_batches,
                    steps_per_epoch=int(np.ceil(train_batches.samples/batch_size)),
                    epochs=5, verbose=1,
                    validation_data=valid_batches,
                    validation_steps=int(np.ceil(valid_batches.samples/batch_size)),
                    )

In [None]:
# Save again to the same path, overwriting the earlier head-only checkpoint.
model.save_weights('models/rn50_cls.h5')

In [None]:
# List every layer with its index so the fine-tuning cut-off index can be
# chosen by layer name.
for i,layer in enumerate(model.layers):
    print(i, layer.name)

Fine-tune deeper layers - either conv5 block or conv5 + conv4

In [None]:
# Keep every layer before index 141 frozen and make index 141 onwards
# trainable.
# NOTE(review): 141 is presumably the start of the conv5 block -- confirm
# against the layer listing before reusing with another backbone.
for idx, layer in enumerate(model.layers):
    layer.trainable = idx >= 141

In [None]:
# Recompile after changing the trainable flags, with a fresh Adam optimizer
# at a learning rate of 1e-4.
opt = Adam(0.0001)
model.compile(optimizer=opt, loss='binary_crossentropy', metrics=['accuracy'])

In [None]:
# Fine-tune for 5 epochs, with one full pass over each iterator per epoch.
# np.ceil returns a float; the step counts are cast to int because Keras
# documents them as integers (the evaluation cells already do the same).
model.fit_generator(train_batches,
                    steps_per_epoch=int(np.ceil(train_batches.samples/batch_size)),
                    epochs=5, verbose=1,
                    validation_data=valid_batches,
                    validation_steps=int(np.ceil(valid_batches.samples/batch_size)),
                    )

In [None]:
# Checkpoint the weights after the first fine-tuning run.
model.save_weights('models/rn50_block5.h5')

In [None]:
# Lower the learning rate of the compiled optimizer to 1e-5 in place.
K.set_value(model.optimizer.lr, 0.00001)

In [None]:
# Fine-tune for 3 more epochs, with one full pass over each iterator per epoch.
# np.ceil returns a float; the step counts are cast to int because Keras
# documents them as integers (the evaluation cells already do the same).
model.fit_generator(train_batches,
                    steps_per_epoch=int(np.ceil(train_batches.samples/batch_size)),
                    epochs=3, verbose=1,
                    validation_data=valid_batches,
                    validation_steps=int(np.ceil(valid_batches.samples/batch_size)),
                    )

In [None]:
# Save again to the same path, overwriting the earlier fine-tuning checkpoint.
model.save_weights('models/rn50_block5.h5')

In [None]:
# Lower the learning rate of the compiled optimizer to 1e-6 in place.
K.set_value(model.optimizer.lr, 0.000001)

In [None]:
# Fine-tune for 2 final epochs, with one full pass over each iterator per epoch.
# np.ceil returns a float; the step counts are cast to int because Keras
# documents them as integers (the evaluation cells already do the same).
model.fit_generator(train_batches,
                    steps_per_epoch=int(np.ceil(train_batches.samples/batch_size)),
                    epochs=2, verbose=1,
                    validation_data=valid_batches,
                    validation_steps=int(np.ceil(valid_batches.samples/batch_size)),
                    )

## Evaluate on hold-out sets

### Validation

Here we test the model on the validation set, but it can also be applied to the test set.

In [None]:
# Load the whole validation set into memory: rewind the iterator, then stack
# the image array of every batch into one array.
valid_batches.reset()
n_valid_batches = int(np.ceil(valid_batches.samples / batch_size))
x_valid = np.vstack([valid_batches.next()[0] for _ in range(n_valid_batches)])

In [None]:
# Rewind again and collect the labels of every batch, in the same order as
# the images.
valid_batches.reset()
n_valid_batches = int(np.ceil(valid_batches.samples / batch_size))
y_valid = np.concatenate([valid_batches.next()[1] for _ in range(n_valid_batches)])

In [None]:
# Test-time augmentation: average the predictions for the original images and
# for their mirror images (axis 2 is the width axis of a channels-last batch).
p_valid = np.zeros_like(y_valid)
for flip in (False, True):
    temp_x = img.flip_axis(x_valid, axis=2) if flip else x_valid
    preds = model.predict(temp_x, verbose=1)
    p_valid += 0.5 * np.reshape(preds, y_valid.shape)

Accuracy

In [None]:
# Fraction of validation images whose thresholded score (> 0.5) matches the label.
((p_valid > 0.5) == y_valid).mean()

### Test

In [None]:
# Load the whole test set into memory: rewind the iterator, then stack the
# image array of every batch into one array.
test_batches.reset()
n_test_batches = int(np.ceil(test_batches.samples / batch_size))
x_test = np.vstack([test_batches.next()[0] for _ in range(n_test_batches)])

In [None]:
# Rewind again and collect the labels of every batch, in the same order as
# the images.
test_batches.reset()
n_test_batches = int(np.ceil(test_batches.samples / batch_size))
y_test = np.concatenate([test_batches.next()[1] for _ in range(n_test_batches)])

In [None]:
# Test-time augmentation: average the predictions for the original images and
# for their mirror images (axis 2 is the width axis of a channels-last batch).
p_test = np.zeros_like(y_test)
for flip in (False, True):
    temp_x = img.flip_axis(x_test, axis=2) if flip else x_test
    preds = model.predict(temp_x, verbose=1)
    p_test += 0.5 * np.reshape(preds, y_test.shape)

Accuracy

In [None]:
from sklearn.metrics import confusion_matrix

In [None]:
# Fraction of test images whose thresholded score (> 0.5) matches the label.
((p_test > 0.5) == y_test).mean()

## Pothole localisation

In [None]:
# Sub-model that outputs the one-channel sigmoid map feeding the final global
# average pooling, used as the localisation heat map.
# With the head built above, the last layers are [..., ReLU, Conv2D(1, sigmoid),
# GlobalAveragePooling2D], so the map is at index -2. Index -3 is the
# 128-channel ReLU output, which array_to_img cannot render as an image.
cam_extract = Model(base_model.input, model.get_layer(index=-2).output)

In [None]:
# Compute the activation maps for the whole in-memory validation set.
cam_valid = cam_extract.predict(x_valid, verbose=1)

In [None]:
# Pick a random validation image. The upper bound comes from the loaded data
# instead of a hard-coded 500, so the index is always valid for both x_valid
# and the iterator's file list.
valid_ind = np.random.randint(low=0, high=len(x_valid))
valid_file = valid_batches.filenames[valid_ind]
print(valid_file)

In [None]:
# Run the CAM sub-model on the chosen image alone; the slice keeps the leading
# batch axis. The cell then shows the peak activation.
valid_cam = cam_extract.predict(x_valid[valid_ind][np.newaxis, ...])
valid_cam.max()

In [None]:
# Load the original image first so the heat map can be resized to exactly its
# dimensions. Image.blend requires both images to share size and mode, and a
# hard-coded (800, 600) fails for any other image size (e.g. the cropped set).
bg = img.load_img(os.path.join('data/valid', valid_file))
overlay = img.array_to_img(valid_cam[0]).resize(bg.size, Image.BILINEAR).convert('RGB')

In [None]:
# Show the image and the heat map blended with equal weight.
Image.blend(alpha=0.5, im1=bg, im2=overlay)

In [None]:
# Pick a random test image. The upper bound comes from the loaded data instead
# of a hard-coded 1500, so the index is always valid for both x_test and the
# iterator's file list.
test_ind = np.random.randint(low=0, high=len(x_test))
test_file = test_batches.filenames[test_ind]
print(test_file)

In [None]:
# Run the CAM sub-model on the chosen image alone; the slice keeps the leading
# batch axis. The cell then shows the peak activation.
test_cam = cam_extract.predict(x_test[test_ind][np.newaxis, ...])
test_cam.max()

In [None]:
# Load the original image first so the heat map can be resized to exactly its
# dimensions. Image.blend requires both images to share size and mode, and a
# hard-coded (800, 600) fails for any other image size (e.g. the cropped set).
bg = img.load_img(os.path.join('data/test', test_file))
overlay = img.array_to_img(test_cam[0]).resize(bg.size, Image.BILINEAR).convert('RGB')

In [None]:
# Show the image and the heat map blended with equal weight.
Image.blend(alpha=0.5, im1=bg, im2=overlay)