In [None]:
import os
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
import matplotlib.pylab as plt
import tensorflow as tf

In [None]:
train = pd.read_csv('/kaggle/input/pothole-rainflip-aml-final/new data/train_ids_rainflip_labels.csv')

In [None]:
train.shape

In [None]:
# use sklearn train_test_split to ensure stratified split
X_train, X_test, y_train, y_test = train_test_split(train['Image_ID'].values,
                                                    train['Label'].values,
                                                    test_size=0.15,
                                                    shuffle=True,
                                                    random_state=42,
                                                    stratify=train['Label'].values)

In [None]:
# to build tf train set and test set
# the csv's are handy to calculate the loss of each instance
train_df = pd.DataFrame({'Image_ID': X_train, 'Label': y_train})
test_df = pd.DataFrame({'Image_ID': X_test, 'Label': y_test})
train_df.to_csv('new_train_id_labels.csv', index=False)
test_df.to_csv('test_id_labels.csv', index=False)

In [None]:
IMG_HEIGHT = 224
IMG_WIDTH = 224
NUM_CHANNELS = 3

In [None]:
def read_and_decode(filename, reshape_dims):
    # read the file.
    img = tf.io.read_file(filename)
    # convert the compressed string to a 3D tensor (uint8)
    img = tf.image.decode_jpeg(img, channels=3)
    # convert 3D uint8 in the range of [0,255] to floats in the [0,1] range.
    #   img = tf.image.convert_image_dtype(img, tf.float32)
    # resize the image
    return tf.image.resize(img, reshape_dims)

In [None]:
# decode function to read csv to build tf datasets
def decode_csv(csv_row):
    record_defaults = ["path", int()]
    filename, label = tf.io.decode_csv(csv_row, record_defaults)
    filename = '/kaggle/input/pothole-rainflip-aml-final/new data/all_data/all_data/'+ filename +'.JPG'
    img = read_and_decode(filename, [IMG_HEIGHT, IMG_WIDTH])
    # label = tf.math.equal(CLASS_NAMES, label_string)
    return img, label

In [None]:
# build tf datasets
dataset = (tf.data.TextLineDataset('/kaggle/working/new_train_id_labels.csv').skip(1).  # skip header
map(decode_csv)) # train set
test_dataset = (tf.data.TextLineDataset('/kaggle/working/test_id_labels.csv').skip(1).  # skip header
map(decode_csv)).batch(8) # test set

In [None]:
train_size = int(0.8 * train_df.shape[0])
# val_size = int(0.2 * train_df.shape[0])

dataset = dataset.shuffle(buffer_size=train_df.shape[0])
train_dataset = dataset.take(train_size).batch(8)
val_dataset = dataset.skip(train_size).batch(8)
# preprocess the datasets: convert input format from [0,255] to the format expected by MobileNetV2
preprocess_layer = tf.keras.layers.Lambda(lambda data: tf.keras.applications.mobilenet.preprocess_input(tf.cast(data, tf.float32)), input_shape=[*[IMG_HEIGHT, IMG_WIDTH], 3])

train_dataset.map(lambda x,y: (preprocess_layer(x), y))
train_dataset = train_dataset.prefetch(tf.data.AUTOTUNE)

val_dataset.map(lambda x, y: (preprocess_layer(x),y))
val_dataset = val_dataset.prefetch(tf.data.AUTOTUNE)

test_dataset.map(lambda x, y: (preprocess_layer(x),y))
test_dataset = test_dataset.prefetch(tf.data.AUTOTUNE)

In [None]:
# build model
pretrained_model = tf.keras.applications.MobileNetV2(weights='imagenet', include_top=False, input_shape=[*[IMG_HEIGHT, IMG_WIDTH], 3])
pretrained_model.trainable= True

x = tf.keras.layers.GlobalAveragePooling2D()(pretrained_model.output)
x = tf.keras.layers.Dense(16, activation='relu', name='pothole_dense')(x)
outputs = tf.keras.layers.Dense(1, activation='sigmoid', name='pothole_prob')(x)

model = tf.keras.models.Model(pretrained_model.input,outputs)

model.compile(
    optimizer='adam',
    loss = tf.keras.losses.BinaryCrossentropy(from_logits=False),
    metrics=['accuracy'],
)
model.summary()

In [None]:
# Custom learning rate scheduler function
lr_start   = 1e-4
lr_max     = 0.000015
lr_min     = 1e-7
lr_ramp_ep = 3
lr_sus_ep  = 0
lr_decay   = 0.7
   
def lrfn(epoch):
    if epoch < lr_ramp_ep:
        lr = (lr_max - lr_start) / lr_ramp_ep * epoch + lr_start            
    elif epoch < lr_ramp_ep + lr_sus_ep:
        lr = lr_max
    else:
        lr = (lr_max - lr_min) * lr_decay**(epoch - lr_ramp_ep - lr_sus_ep) + lr_min
    return lr

# using this function, create a Callback
lr_callback = tf.keras.callbacks.LearningRateScheduler(lrfn, verbose=True)

In [None]:
# model training
history = model.fit(train_dataset, epochs=10,
                    validation_data=val_dataset,
                    callbacks=[tf.keras.callbacks.EarlyStopping(patience=3), lr_callback])

In [None]:
# evaluate on the test set
results = model.evaluate(test_dataset)

In [None]:
def make_gradcam_heatmap(img_array, model, last_conv_layer_name, pred_index=None):
    # First, we create a model that maps the input image to the activations
    # of the last conv layer as well as the output predictions
    grad_model = tf.keras.models.Model(
        model.inputs, [model.get_layer(last_conv_layer_name).output, model.output]
    )

    # Then, we compute the gradient of the top predicted class for our input image
    # with respect to the activations of the last conv layer
    with tf.GradientTape() as tape:
        last_conv_layer_output, preds = grad_model(img_array)
        if pred_index is None:
            pred_index = tf.argmax(preds[0])
        class_channel = preds[:, pred_index]

    # This is the gradient of the output neuron (top predicted or chosen)
    # with regard to the output feature map of the last conv layer
    grads = tape.gradient(class_channel, last_conv_layer_output)

    # This is a vector where each entry is the mean intensity of the gradient
    # over a specific feature map channel
    pooled_grads = tf.reduce_mean(grads, axis=(0, 1, 2))

    # We multiply each channel in the feature map array
    # by "how important this channel is" with regard to the top predicted class
    # then sum all the channels to obtain the heatmap class activation
    last_conv_layer_output = last_conv_layer_output[0]
    heatmap = last_conv_layer_output @ pooled_grads[..., tf.newaxis]
    heatmap = tf.squeeze(heatmap)

    # For visualization purpose, we will also normalize the heatmap between 0 & 1
    heatmap = tf.maximum(heatmap, 0) / tf.math.reduce_max(heatmap)
    return heatmap.numpy()

In [None]:
from IPython.display import Image, display
import matplotlib as mpl
def save_and_display_gradcam(img_path, heatmap, cam_path="cam.jpg", alpha=0.4):
    # Load the original image
    img = tf.keras.utils.load_img(img_path)
    img = tf.keras.utils.img_to_array(img)

    # Rescale heatmap to a range 0-255
    heatmap = np.uint8(255 * heatmap)

    # Use jet colormap to colorize heatmap
    jet = mpl.colormaps["jet"]

    # Use RGB values of the colormap
    jet_colors = jet(np.arange(256))[:, :3]
    jet_heatmap = jet_colors[heatmap]

    # Create an image with RGB colorized heatmap
    jet_heatmap = tf.keras.utils.array_to_img(jet_heatmap)
    jet_heatmap = jet_heatmap.resize((img.shape[1], img.shape[0]))
    jet_heatmap = tf.keras.utils.img_to_array(jet_heatmap)

    # Superimpose the heatmap on original image
    superimposed_img = jet_heatmap * alpha + img
    superimposed_img = tf.keras.utils.array_to_img(superimposed_img)

    # Save the superimposed image
    superimposed_img.save(cam_path)

    # Display Grad CAM
    display(Image(cam_path))

In [None]:
last_conv_layer_name = 'Conv_1'

In [None]:
# Remove last layer's activation function (sigmoid)
model.layers[-1].activation = None

In [None]:
# test the grad cam 
img_path = '/kaggle/input/pothole-rainflip-aml-final/new data/all_data/all_data/jOsYoCYLRmnFmNm.JPG'
img = read_and_decode(img_path, [IMG_HEIGHT, IMG_WIDTH])
doc = tf.expand_dims(img, axis=0)
# Generate class activation heatmap
heatmap = make_gradcam_heatmap(doc, model, last_conv_layer_name)

# Display heatmap
plt.matshow(heatmap)
plt.show()
save_and_display_gradcam(img_path, heatmap)

In [None]:
def gradcam(img_path, heatmap, alpha=0.4):
    # Load the original image
    img = tf.keras.utils.load_img(img_path)
    img = tf.keras.utils.img_to_array(img)

    # Rescale heatmap to a range 0-255
    heatmap = np.uint8(255 * heatmap)

    # Use jet colormap to colorize heatmap
    jet = mpl.colormaps["jet"]

    # Use RGB values of the colormap
    jet_colors = jet(np.arange(256))[:, :3]
    jet_heatmap = jet_colors[heatmap]

    # Create an image with RGB colorized heatmap
    jet_heatmap = tf.keras.utils.array_to_img(jet_heatmap)
    jet_heatmap = jet_heatmap.resize((img.shape[1], img.shape[0]))
    jet_heatmap = tf.keras.utils.img_to_array(jet_heatmap)

    # Superimpose the heatmap on original image
    superimposed_img = jet_heatmap * alpha + img
    superimposed_img = tf.keras.utils.array_to_img(superimposed_img)
    
    return superimposed_img

In [None]:
filenames = ['pQVbGRebTAwmTnv','shRYejQmoIcPXFW','jOsYoCYLRmnFmNm','liCaWxHAhHpFsrz','GgbTBjlZpzCTLSo','HdyRyKBWYuJPgyG','OtentfnkpKSXBOu',
'ndJbsOThuKwfYmu','twvydkkKycsdAvn','BOrVotINGylbEGK','LOoadBMcPPPiGfA','KNjXjRYyYbABKRv','waMCvfGJarKfTJF','kOSfAJBhBylzeBu','tjEpnRouMRxTeqX']

In [None]:
fig, axs = plt.subplots(5, 3, figsize=(32, 32), sharey=True)
axs = axs.flatten()
for i in range(15):
    img_path = '/kaggle/input/pothole-rainflip-aml-final/new data/all_data/all_data/'+filenames[i]+'.JPG'
    img = read_and_decode(img_path, [IMG_HEIGHT, IMG_WIDTH])
    doc = tf.expand_dims(img, axis=0)
    # Generate class activation heatmap
    heatmap = make_gradcam_heatmap(doc, model, last_conv_layer_name)
    img_cam = gradcam(img_path,heatmap)
    axs[i].imshow(img_cam)
    axs[i].set_xticks([])
    axs[i].set_yticks([])
    axs[i].grid(False)
    fig.tight_layout(h_pad=5, w_pad=5)