# Pothole detection

## Import libraries

In [None]:
import numpy as np
import os
import csv
from PIL import Image

import keras
import keras.preprocessing.image as img
from keras.applications import ResNet50
from keras.layers import Dense
from keras.layers.pooling import GlobalMaxPool2D
from keras.models import Model
from keras.optimizers import SGD, Adam
from keras import backend as K

import matplotlib
import matplotlib.pyplot as plt
%matplotlib inline

## Crop images and save in a new folder

My idea was to crop the dashboard and the sky out of the images to save some computing time. I found the bounding box limits of all the training images and cropped the images at these points. In hindsight, I realise that this is not the best method: if potholes in the test set appear outside these margins, the model will not be able to detect them. This might be one of the reasons for the big difference between the validation accuracy (97%) and the test accuracy (86%).

In [None]:
# Recursively gather the path of every file found below the `data` folder.
all_files = [
    os.path.join(root, fname)
    for root, _dirs, fnames in os.walk('data')
    for fname in fnames
]

In [None]:
# Crop every image and write the result to the mirrored path under `data_crop/`.
# The box is (left, upper, right, lower) in pixels: columns 0-800, rows 165-350.
crop_box = (0, 600 - 435, 800, 600 - 435 + 185)
for f in all_files:
    # Mirror the path relative to `data`. Unlike str.split('data'), this is not
    # confused by file or folder names that themselves contain "data".
    out_path = os.path.join('data_crop', os.path.relpath(f, 'data'))
    # Saving fails if the destination folder is missing, so create it first.
    os.makedirs(os.path.dirname(out_path), exist_ok=True)
    # The context manager closes the source file handle after each image.
    with Image.open(f) as temp_img:
        temp_img.crop(crop_box).save(out_path)

This code is not efficient at all, but luckily the number of images is small.

## Create train and validation folders

Here we take 500 random images from the `train` folder and move them to the `valid` folder. You can do this either with the images in `data` or `data_crop`. 500 images is probably not enough to obtain a reasonable estimate of the test accuracy.

In [None]:
# Recursively gather the path of every file below the cropped training folder.
train_files = [
    os.path.join(root, fname)
    for root, _dirs, fnames in os.walk('data_crop/train/')
    for fname in fnames
]

In [None]:
# Randomly hold out 500 training images by moving them to `data_crop/valid/`.
# No random seed is set, so every run produces a different split.
np.random.shuffle(train_files)

valid_files = train_files[:500]
train_files = train_files[500:]

for f in valid_files:
    # Keep the sub-folder structure below `train` in the destination path.
    dest = os.path.join('data_crop/valid', os.path.relpath(f, 'data_crop/train'))
    # os.rename fails if the destination folder is missing, so create it first.
    os.makedirs(os.path.dirname(dest), exist_ok=True)
    os.rename(f, dest)

If you want to reset the split you can run the following cell:

In [None]:
# Undo the split: move everything in the validation class folders back into
# the matching training class folders (IPython `%mv` line magic).
%mv data_crop/valid/positive/* data_crop/train/positive/
%mv data_crop/valid/negative/* data_crop/train/negative/

## Setup data generators

The following section creates the batch generators for training and validation.

Since we are using models pretrained on ImageNet, we subtract the ImageNet channel means. The function below also reverses the channel order before subtracting.

In [None]:
def imagenet_mean(x):
    """Reverse the channel axis of *x* and subtract fixed per-channel means.

    The last axis is reversed (presumably RGB -> BGR; confirm that the
    generator supplies channels-last RGB data) and the constants 103.939,
    116.779 and 123.68 are subtracted from channels 0, 1 and 2 of the
    reversed array.

    Args:
        x: Array-like image or batch of images with channels on the last
            axis (at least three channels).

    Returns:
        A new array with the same shape as *x*. Floating-point inputs keep
        their dtype; any other dtype is promoted to float32. The input is
        never modified.
    """
    x = np.asarray(x)
    # Keep float inputs in their own precision; promote everything else so the
    # in-place subtraction below cannot fail on integer pixel data.
    out_dtype = x.dtype if np.issubdtype(x.dtype, np.floating) else np.float32
    # x[..., ::-1] is a view on the caller's array; astype() copies it so the
    # subtraction does not write back into the input.
    out = x[..., ::-1].astype(out_dtype)
    for channel, mean in enumerate((103.939, 116.779, 123.68)):
        out[..., channel] -= mean
    return out

The data augmentations include horizontal flip and small horizontal and vertical shifts. The shifts are a bit risky since they can cut off some of the potholes, but I didn't have time to compare results.

In [None]:
# Training generator: random horizontal flips plus small (0.05) horizontal and
# vertical shifts, with `imagenet_mean` applied as the preprocessing function.
train_gen = img.ImageDataGenerator(
    preprocessing_function=imagenet_mean,
    width_shift_range=0.05,
    height_shift_range=0.05,
    horizontal_flip=True,
)

# Validation/test generator: preprocessing only, no augmentation.
test_gen = img.ImageDataGenerator(preprocessing_function=imagenet_mean)

Choose sizes that fit in your machine's memory. I suppose bigger is better. Note that the current specification stretches the images vertically. I argued that this might increase the visibility of the 'flatter' potholes.

In [None]:
# Number of images per batch produced by the generators.
batch_size = 64
# Size every image is resized to (passed as `target_size` to the generators).
img_size = (300, 300)

I did not do this in the competition, but I should have set `class_mode='binary'` for a more compact representation of the labels.

In [None]:
# Batch iterators over the class sub-folders of the train and validation sets.
# NOTE: `class_mode` is deliberately left unset (library default); passing
# class_mode='binary' would give a more compact label representation.
flow_kwargs = dict(batch_size=batch_size, target_size=img_size)

train_batches = train_gen.flow_from_directory('data_crop/train/', **flow_kwargs)

# shuffle=False keeps the validation files in a fixed order across passes.
valid_batches = test_gen.flow_from_directory(
    'data_crop/valid/', shuffle=False, **flow_kwargs
)

## Start Modelling

I used an ensemble of 3 pretrained ConvNets: ResNet50, ResNet101 and DenseNet121. Each model I trained on a different train/validation split and averaged their predictions on the test set.

In [None]:
# Select the convolutional backbone, built without its top classifier layers.
# The input shape is the image size plus three colour channels.
base_model = ResNet50(
    input_shape=(img_size[0], img_size[1], 3),
    include_top=False,
)
# Alternative backbones (their builder functions are not defined in this section):
#base_model = densenet121_model(img_rows=img_size[0], img_cols=img_size[1], color_type=3, num_classes=2)
#base_model = resnet101_model(img_rows=img_size[0], img_cols=img_size[1], color_type=3, num_classes=2)

In [None]:
# Print the layer-by-layer summary of the backbone.
base_model.summary()

Add new classification head. Can use max or average pooling.

In [None]:
# GlobalAveragePooling2D is not among the notebook's top-level imports (only
# GlobalMaxPool2D is), so import it here to avoid a NameError.
from keras.layers import GlobalAveragePooling2D

# Feature map taken from the second-to-last layer of the backbone.
# NOTE(review): which layer index -2 selects depends on the installed Keras
# version's ResNet50 definition; confirm against base_model.summary().
ft_map = base_model.get_layer(index=-2).output

# New classification head: global average pooling, then a 2-unit softmax.
x = GlobalAveragePooling2D()(ft_map)
x = Dense(2, activation='softmax')(x)

model = Model(base_model.input, x)

If I had set `class_mode='binary'` in the `.flow_from_directory()` call, I would have changed the final layer to `Dense(1, activation='sigmoid')`.

In [None]:
# Print the layer-by-layer summary of the full model (backbone + new head).
model.summary()

First, train only the new classification layer.

In [None]:
# Mark every backbone layer as non-trainable so that only the newly added
# classification head is updated.
for backbone_layer in base_model.layers:
    backbone_layer.trainable = False

Can experiment with different optimising strategies. I found that small learning rates worked the best.

In [None]:
# Adam with a 0.001 learning rate; categorical cross-entropy pairs with the
# 2-unit softmax output of the head.
opt = Adam(0.001)
model.compile(
    loss='categorical_crossentropy',
    optimizer=opt,
    metrics=['accuracy'],
)

Start training. I repeated the following sequence as necessary: fit -> save_weights -> decrease learning rate -> repeat.

In [None]:
# Train for 5 epochs, evaluating on the validation batches after each epoch.
# np.ceil returns a float while the step arguments are documented as integers,
# so cast explicitly; ceil keeps the final partial batch in each pass.
model.fit_generator(
    train_batches,
    steps_per_epoch=int(np.ceil(train_batches.samples / batch_size)),
    epochs=5,
    verbose=1,
    validation_data=valid_batches,
    validation_steps=int(np.ceil(valid_batches.samples / batch_size)),
)

In [None]:
# Checkpoint the weights after the first training round.
# Create the output folder first so the save cannot fail on a missing directory.
os.makedirs('models', exist_ok=True)
model.save_weights('models/rn50_cls.h5')

In [None]:
# Lower the optimizer's learning rate to 1e-4 in place (no recompile needed).
K.set_value(model.optimizer.lr, 1e-4)

In [None]:
# Train for 3 more epochs, evaluating on the validation batches after each one.
# np.ceil returns a float while the step arguments are documented as integers,
# so cast explicitly; ceil keeps the final partial batch in each pass.
model.fit_generator(
    train_batches,
    steps_per_epoch=int(np.ceil(train_batches.samples / batch_size)),
    epochs=3,
    verbose=1,
    validation_data=valid_batches,
    validation_steps=int(np.ceil(valid_batches.samples / batch_size)),
)

In [None]:
# Checkpoint the weights after the lower-learning-rate training round.
# Create the output folder first so the save cannot fail on a missing directory.
os.makedirs('models', exist_ok=True)
model.save_weights('models/rn50_cls_300300.h5')

In [None]:
# Print each layer's index and name to help pick the fine-tuning cut-off.
for idx, lyr in enumerate(model.layers):
    print(idx, lyr.name)

Fine-tune deeper layers - either conv5 block or conv5 + conv4

In [None]:
# Keep layers 0-140 frozen and make every layer from index 141 on trainable.
for idx, layer in enumerate(model.layers):
    layer.trainable = idx >= 141

In [None]:
# Compile again with a fresh Adam optimizer at a 1e-4 learning rate, after the
# layers' `trainable` flags were changed.
opt = Adam(1e-4)
model.compile(
    loss='categorical_crossentropy',
    optimizer=opt,
    metrics=['accuracy'],
)

In [None]:
# Fine-tune for 5 epochs, evaluating on the validation batches after each one.
# np.ceil returns a float while the step arguments are documented as integers,
# so cast explicitly; ceil keeps the final partial batch in each pass.
model.fit_generator(
    train_batches,
    steps_per_epoch=int(np.ceil(train_batches.samples / batch_size)),
    epochs=5,
    verbose=1,
    validation_data=valid_batches,
    validation_steps=int(np.ceil(valid_batches.samples / batch_size)),
)

In [None]:
# Checkpoint the fine-tuned weights.
# Create the output folder first so the save cannot fail on a missing directory.
# NOTE(review): a later cell saves to this same path and overwrites this file.
os.makedirs('models', exist_ok=True)
model.save_weights('models/rn50_crop_block5.h5')

In [None]:
# Lower the optimizer's learning rate to 1e-5 in place (no recompile needed).
K.set_value(model.optimizer.lr, 1e-5)

In [None]:
# Fine-tune for 3 more epochs, evaluating on the validation batches after each.
# np.ceil returns a float while the step arguments are documented as integers,
# so cast explicitly; ceil keeps the final partial batch in each pass.
model.fit_generator(
    train_batches,
    steps_per_epoch=int(np.ceil(train_batches.samples / batch_size)),
    epochs=3,
    verbose=1,
    validation_data=valid_batches,
    validation_steps=int(np.ceil(valid_batches.samples / batch_size)),
)

In [None]:
# Checkpoint the fine-tuned weights again.
# Create the output folder first so the save cannot fail on a missing directory.
# NOTE(review): this path matches the previous fine-tuning checkpoint, so that
# file is overwritten here.
os.makedirs('models', exist_ok=True)
model.save_weights('models/rn50_crop_block5.h5')

In [None]:
# Lower the optimizer's learning rate to 1e-7 in place (no recompile needed).
# NOTE(review): this is a 100x drop from the previous 1e-5, whereas the earlier
# steps were 10x; confirm that 1e-6 was not intended.
K.set_value(model.optimizer.lr, 1e-7)

In [None]:
# Fine-tune for 2 final epochs, evaluating on the validation batches after each.
# np.ceil returns a float while the step arguments are documented as integers,
# so cast explicitly; ceil keeps the final partial batch in each pass.
model.fit_generator(
    train_batches,
    steps_per_epoch=int(np.ceil(train_batches.samples / batch_size)),
    epochs=2,
    verbose=1,
    validation_data=valid_batches,
    validation_steps=int(np.ceil(valid_batches.samples / batch_size)),
)

### Evaluate on hold-out set

Here we test the model on the validation set, but it can also be applied to the test set.

In [None]:
# Pull the whole validation set into memory, one generator batch at a time.
valid_batches.reset()
n_batches = int(np.ceil(valid_batches.samples / batch_size))
x_valid = np.vstack([valid_batches.next()[0] for _ in range(n_batches)])

In [None]:
# Rewind the generator and collect the label arrays of every validation batch.
valid_batches.reset()
n_batches = int(np.ceil(valid_batches.samples / batch_size))
y_valid = np.vstack([valid_batches.next()[1] for _ in range(n_batches)])

A little bit of test-time augmentation (TTA): predict on both horizontal orientations and average the two predictions.

In [None]:
# Average the predictions for the images as loaded and for a copy flipped
# along axis 2, each weighted by 0.5.
p_valid = np.zeros_like(y_valid)
for mirrored in (False, True):
    temp_x = img.flip_axis(x_valid, axis=2) if mirrored else x_valid
    p_valid += model.predict(temp_x, verbose=1) * 0.5

Accuracy

In [None]:
# Fraction of validation images whose highest-scoring predicted class matches
# the highest-scoring entry of the label row.
(p_valid.argmax(axis=1) == y_valid.argmax(axis=1)).mean(axis=0)