In [None]:
# Importing all the necessary libraries
import time
import cv2
import numpy as np
import Augmentor
import glob
import shutil
import os
from os import listdir
from os.path import isfile, join
import itertools
import keras
import numpy as np
import sklearn
import cv2
import seaborn as sns
import matplotlib.pyplot as plt
import tensorflow as tf
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import train_test_split, KFold
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, BatchNormalization, LeakyReLU, Dense, Dropout, Flatten


In [None]:
# Making a list of image names with plf error.
# Every non-empty line of the label file has the form "<file name>: <label>";
# any label other than "0" marks the image as having a PLF error.
plf_image_names = []
with open("xz_labels_plf.txt") as f:
    for line in f:
        line = line.strip()
        if not line:
            # tolerate blank lines (e.g. an empty last line) instead of
            # failing on the tuple unpacking below
            continue
        # the label is the last field, so split once from the right
        file, label = line.rsplit(": ", 1)
        if label != "0":
            plf_image_names.append(file)


In [None]:
# Copy the images with plf into their own folder for data augmentation.
src_dir = "Processed_Images_XZ"
dst_dir = "plf_XZ"
# Without an existing folder shutil.copy would write a single *file* named
# dst_dir, so make sure the destination directory is there first.
os.makedirs(dst_dir, exist_ok=True)
for jpgfile in glob.iglob(os.path.join(src_dir, "*.jpg")):
    # Compare on the bare file name. os.path.basename follows the path
    # separator of the running OS, unlike stripping a hard-coded
    # "Processed_Images_XZ\\" prefix, which only matches on Windows.
    if os.path.basename(jpgfile) in plf_image_names:
        shutil.copy(jpgfile, dst_dir)


In [None]:
# Augmentation pipeline over the folder holding all processed images.
# Original author's note: augmented images created from images with plf in
# this folder are deleted (no code visible here does that -- presumably a
# manual step, TODO confirm).
p = Augmentor.Pipeline("Processed_Images_XZ")
# Each operation is applied with the given probability; 500 augmented images
# are sampled to serve as "no error" examples.
p.zoom(min_factor=0.8, max_factor=1.2, probability=0.95)
p.rotate(max_left_rotation=10, max_right_rotation=10, probability=0.95)
p.skew_corner(probability=0.8)
p.flip_top_bottom(probability=0.95)
p.sample(500)


In [None]:
# Augmentation pipeline over the folder that only holds images with plf error.
p = Augmentor.Pipeline("plf_XZ")
# Same operations and probabilities as for the no-error images; 1000
# augmented images with plf error are sampled.
p.zoom(min_factor=0.8, max_factor=1.2, probability=0.95)
p.rotate(max_left_rotation=10, max_right_rotation=10, probability=0.95)
p.skew_corner(probability=0.8)
p.flip_top_bottom(probability=0.95)
p.sample(1000)


In [None]:
# Getting a list of augmented image names with plf error.
# Build the path with os.path.join: the literal 'plf_XZ\output' relied on the
# invalid escape sequence "\o" and on the Windows path separator.
mypath = os.path.join('plf_XZ', 'output')
augmented_images_plf = [f for f in listdir(mypath) if isfile(join(mypath, f))]


In [None]:
# Getting a list of augmented image names with no error.
# Build the path with os.path.join: the literal 'Processed_Images_XZ\output'
# relied on the invalid escape sequence "\o" and on the Windows path separator.
mypath2 = os.path.join('Processed_Images_XZ', 'output')
augmented_images_no_error = [f for f in listdir(
    mypath2) if isfile(join(mypath2, f))]


In [None]:
def _read_gray_image(path, crop=False):
    """Read the image at ``path`` with OpenCV and return it converted to grayscale."""
    image = cv2.imread(path)
    if image is None:
        # cv2.imread reports a missing/unreadable file by returning None;
        # fail here with the path instead of with an obscure cvtColor error
        raise FileNotFoundError(f"could not read image: {path}")
    if crop:
        # NOTE: despite the flag's name this rescales to 224 x 224, it does not crop
        image = cv2.resize(image, (224, 224))
    # from BGR to gray
    return cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)


def _iter_label_file(path):
    """Yield ``(file name, label)`` pairs from a ``<file name>: <label>`` text file."""
    with open(path) as f:
        for line in f:
            line = line.strip()
            if not line:
                # skip blank lines rather than failing on the unpacking below
                continue
            # the label is the last field, so split once from the right
            file, label = line.rsplit(": ", 1)
            yield file, label


# a function that appends to the lists a label, image name and image in grayscale
def load_images_xz(folder, image_dir, images_name, images, labels, crop=False, list=False, error=False):
    """Load images as grayscale arrays and append them to the given lists.

    Two input modes are supported:

    * ``list`` falsy (default): ``folder`` is the path of a text file whose
      non-empty lines look like ``<file name>: <label>``; the label string
      read from the file is stored unchanged.
    * ``list`` truthy: ``folder`` is an iterable of file names that all get
      the same label, ``'1'`` when ``error`` is truthy and ``'0'`` otherwise.

    Args:
        folder: label-file path or iterable of file names (see above).
        image_dir: prefix prepended verbatim to each file name, so it must
            end with a path separator.
        images_name: list that receives ``image_dir + file name`` per image.
        images: list that receives the grayscale image arrays.
        labels: list that receives the label strings.
        crop: if true, every image is resized to 224 x 224 before the
            grayscale conversion.
        list: selects the input mode (the name shadows the builtin inside
            this function; it is kept for backward compatibility).
        error: label selector used only in list mode.

    Returns:
        ``(images, labels, images_name)`` -- the same list objects that were
        passed in, extended in place.

    Raises:
        FileNotFoundError: if OpenCV cannot read one of the images.
    """
    if list:
        label = '1' if error else '0'
        entries = ((file, label) for file in folder)
    else:
        entries = _iter_label_file(folder)

    for file, label in entries:
        path = f"{image_dir}{file}"
        # read first, so nothing is appended for an image that cannot be loaded
        image = _read_gray_image(path, crop)
        images_name.append(path)
        images.append(image)
        labels.append(label)
    return images, labels, images_name


In [None]:
# Three independent accumulators that the loading cells extend in place.
images_name = []
images = []
labels = []

In [None]:
# Load every image listed in the label file, together with its label.
images, labels, images_name = load_images_xz(
    "xz_labels_plf.txt",
    "Processed_Images_XZ/",
    images_name,
    images,
    labels,
)


In [None]:
# Append the augmented plf images; list mode with error=True labels them '1'.
images, labels, images_name = load_images_xz(
    augmented_images_plf,
    "plf_XZ/output/",
    images_name,
    images,
    labels,
    error=True,
    list=True,
)


In [None]:
# Append the augmented no-error images; list mode with error=False labels them '0'.
images, labels, images_name = load_images_xz(
    augmented_images_no_error,
    "Processed_Images_XZ/output/",
    images_name,
    images,
    labels,
    error=False,
    list=True,
)


In [None]:
# normalise and transform to np array
def normalise_images(images, labels):
    """Convert images and labels to NumPy arrays and scale pixels by 1/255.

    Args:
        images: sequence of equally shaped images; it is converted to one
            ``float32`` array (assumes all images share a shape -- TODO
            confirm for callers that do not resize).
        labels: sequence of labels convertible to ``int`` (e.g. ``'0'``,
            ``'1'``, ``'2'``).

    Returns:
        ``(images, labels)``: the ``float32`` image array divided by 255 and
        an integer label array in which every ``2`` has been replaced by
        ``1`` (0: no error, 1: plf error).
    """
    # Convert to numpy arrays
    images = np.array(images, dtype=np.float32)
    # The np.int alias was removed in NumPy 1.24; the builtin int is the
    # documented replacement and yields the same dtype.
    labels = np.array(labels).astype(int)
    # 0: no error, 1: plf error
    labels[labels == 2] = 1
    # Normalise the images
    images /= 255.0

    return images, labels


In [None]:
# Turn the accumulated lists into arrays: pixel values scaled by 1/255 and
# integer labels. Note that `labels` is rebound from a list to an array here.
images_norm, labels = normalise_images(images, labels)

In [None]:
# Sanity check: print one row per distinct label as [label, number of images].
unique_labels, counts_labels = np.unique(labels, return_counts=True)
print(np.column_stack((unique_labels, counts_labels)))


In [None]:
# shuffle the data
def shuffle_data(images_norm, labels, images_name):
    """Shuffle images, labels and image names consistently.

    All three sequences are passed to a single ``sklearn.utils.shuffle`` call
    with ``random_state=42``.

    Returns:
        ``(shuffled images, shuffled labels, shuffled image names)``.
    """
    shuffled_images, shuffled_labels, shuffled_names = sklearn.utils.shuffle(
        images_norm,
        labels,
        images_name,
        random_state=42,
    )
    return shuffled_images, shuffled_labels, shuffled_names


In [None]:
# Shuffle images, labels and file names together with a fixed seed; note that
# `images_name` is rebound to its shuffled version.
X_data, y_data, images_name = shuffle_data(
    images_norm, labels, images_name)


In [None]:
# Reshaping: add a trailing channel axis of size 1 while keeping the second
# and third dimensions of X_data.
X_data = np.reshape(X_data, (-1, X_data.shape[1], X_data.shape[2], 1))


In [None]:
# one hot encoder, e.g.: if it is a plf then y=[0, 1]
# The number of classes is the number of distinct values found in y_data.
# NOTE(review): y_data is overwritten, so this cell is not idempotent --
# running it a second time would encode the already encoded array again.
y_data = to_categorical(y_data, num_classes=len(np.unique(y_data)))


In [None]:
# building the heavy CNN model
# Three conv -> LeakyReLU -> max-pool stages (8, 16 and 32 filters), then a
# 16-unit dense layer and a softmax output with one unit per class column.
xz_model_heavy = Sequential([
    Conv2D(8, kernel_size=(3, 3), activation='linear', padding='same',
           input_shape=(X_data.shape[1], X_data.shape[2], 1)),
    LeakyReLU(alpha=0.1),
    MaxPooling2D((2, 2), padding='same'),
    Conv2D(16, (3, 3), activation='linear', padding='same'),
    LeakyReLU(alpha=0.1),
    MaxPooling2D(pool_size=(2, 2), padding='same'),
    Conv2D(32, (3, 3), activation='linear', padding='same', name="just_do_it"),
    LeakyReLU(alpha=0.1),
    MaxPooling2D(pool_size=(2, 2), padding='same'),
    Flatten(),
    Dense(16, activation='linear'),
    LeakyReLU(alpha=0.1),
    Dense(y_data.shape[1], activation='softmax'),
])

# Compiling the model
xz_model_heavy.compile(
    optimizer=tf.keras.optimizers.Adam(),
    loss=keras.losses.categorical_crossentropy,
    metrics=['accuracy'],
)


In [None]:
def plot_model(model, plot=False):
    """Optionally write a diagram of ``model`` to "heavy_model_plot.png".

    Nothing happens unless ``plot`` is truthy. The function always returns
    ``None``; the result of ``tf.keras.utils.plot_model`` is discarded.
    """
    if not plot:
        return

    tf.keras.utils.plot_model(
        model,
        to_file="heavy_model_plot.png",
        show_shapes=True,
        show_dtype=False,
        show_layer_names=False,
        rankdir="TB",
        expand_nested=False,
        dpi=110,
        layer_range=None,
    )


In [None]:
# Saving the untrained heavy model for later use: the file is loaded again
# further down to train from a fresh copy.
xz_model_heavy.save("xz_model_heavy_untrained.h5")


In [None]:
# creating light CNN model
# Same layout as the heavy model but far smaller: 2, 4 and 4 convolution
# filters, a 2-unit dense layer and a softmax output with one unit per class
# column.
xz_model_light = Sequential([
    Conv2D(2, kernel_size=(3, 3), activation='linear', padding='same',
           input_shape=(X_data.shape[1], X_data.shape[2], 1)),
    LeakyReLU(alpha=0.1),
    MaxPooling2D((2, 2), padding='same'),
    Conv2D(4, (3, 3), activation='linear', padding='same'),
    LeakyReLU(alpha=0.1),
    MaxPooling2D(pool_size=(2, 2), padding='same'),
    Conv2D(4, (3, 3), activation='linear', padding='same', name='just_do_it'),
    LeakyReLU(alpha=0.1),
    MaxPooling2D(pool_size=(2, 2), padding='same'),
    Flatten(),
    Dense(2, activation='linear'),
    LeakyReLU(alpha=0.1),
    Dense(y_data.shape[1], activation='softmax'),
])

# Compiling the model
xz_model_light.compile(
    optimizer=tf.keras.optimizers.Adam(),
    loss=keras.losses.categorical_crossentropy,
    metrics=['accuracy'],
)


In [None]:
# Saving the untrained light model for later use: the file is loaded again
# further down to train from a fresh copy.
xz_model_light.save("xz_model_light_untrained.h5")


In [None]:
# Provisional epoch counts (heavy: 20, light: 30) for the first training run;
# the final values are chosen later from the learning curves. The batch size
# is shared by both models.
epochs_heavy, epochs_light, batch_size = 20, 30, 64


In [None]:
# 70-30 train test split with a fixed seed
# NOTE(review): originals and their augmented copies are split at random, so
# augmented variants of one source image can end up on both sides of the
# split -- confirm this is intended when reading the test accuracy.
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(
    X_data,
    y_data,
    random_state=42,
    test_size=0.3,
)


In [None]:
# heavy model train: first run with the provisional epoch count; 20% of the
# training data is held out for validation.
history_heavy = xz_model_heavy.fit(
    X_train,
    y_train,
    batch_size=batch_size,
    epochs=epochs_heavy,
    validation_split=0.2,
)


In [None]:
# light model train: first run with the provisional epoch count; 20% of the
# training data is held out for validation.
history_light = xz_model_light.fit(
    X_train,
    y_train,
    batch_size=batch_size,
    epochs=epochs_light,
    validation_split=0.2,
)


In [None]:
def epoch_vs_performance_plot(history):
    """Plot the loss and accuracy curves for training and validation.

    Two stacked panels are drawn: loss on top, accuracy below.

    Args:
        history: object whose ``history`` attribute is a mapping with the
            keys ``'loss'``, ``'val_loss'``, ``'accuracy'`` and
            ``'val_accuracy'`` (e.g. the value returned by ``model.fit``).
    """
    fig, ax = plt.subplots(2, 1)

    # top panel: loss
    ax[0].plot(history.history['loss'], color='b', label="Training loss")
    ax[0].plot(history.history['val_loss'], color='r',
               label="validation loss")
    # the x label belongs to the loss panel; it used to be set on ax[1] twice
    ax[0].set_xlabel("Number of epochs")
    ax[0].legend(loc='best', shadow=True)

    # bottom panel: accuracy
    ax[1].plot(history.history['accuracy'],
               color='b', label="Training accuracy")
    ax[1].plot(history.history['val_accuracy'],
               color='r', label="Validation accuracy")
    ax[1].set_xlabel("Number of epochs")
    ax[1].legend(loc='best', shadow=True)

    # keep the top panel's x label clear of the bottom panel
    fig.tight_layout()


In [None]:
def speed_and_acc(model, X_test, y_test):
    """Print the evaluation throughput and the test accuracy of ``model``.

    Args:
        model: object whose ``evaluate(X_test, y_test)`` returns the pair
            ``(loss, accuracy)``.
        X_test: test inputs; ``X_test.shape[0]`` is used as the image count.
        y_test: test targets, handed unchanged to ``model.evaluate``.

    Returns:
        None. Two lines are printed: images per second and test accuracy.
    """
    # perf_counter is a monotonic, high-resolution clock meant for measuring
    # short intervals; time.time() can report a zero interval on platforms
    # with a coarse wall clock.
    start_time = time.perf_counter()
    test_loss, test_acc = model.evaluate(X_test, y_test)
    delta_time = time.perf_counter() - start_time
    num_img = X_test.shape[0]
    # guard the division in case no measurable time elapsed
    images_per_second = num_img / delta_time if delta_time > 0 else float("inf")
    print("--- %s images per second ---" % images_per_second)
    print(f"test acc: {test_acc}")


In [None]:
# Learning curves of the light model from the training run above.
epoch_vs_performance_plot(history_light)


In [None]:
# Learning curves of the heavy model from the training run above.
epoch_vs_performance_plot(history_heavy)


In [None]:
# Final number of epochs for the light model, decided from its learning-curve
# plot; this overrides the provisional value used for the first run.
epochs_light = 14

In [None]:
# Final number of epochs for the heavy model, decided from its learning-curve
# plot; this overrides the provisional value used for the first run.
epochs_heavy = 12

In [None]:
# Loading the untrained heavy model from the file saved before any training,
# which replaces the instance trained in the first run.
xz_model_heavy = keras.models.load_model('xz_model_heavy_untrained.h5')


In [None]:
# Loading the untrained light model from the file saved before any training,
# which replaces the instance trained in the first run.
xz_model_light = keras.models.load_model('xz_model_light_untrained.h5')


In [None]:
# training light model with new epoch number; 20% of the training data is
# again held out for validation.
history_light = xz_model_light.fit(
    X_train,
    y_train,
    batch_size=batch_size,
    epochs=epochs_light,
    validation_split=0.2,
)


In [None]:
# training heavy model with new epoch number; 20% of the training data is
# again held out for validation.
history_heavy = xz_model_heavy.fit(
    X_train,
    y_train,
    batch_size=batch_size,
    epochs=epochs_heavy,
    validation_split=0.2,
)


In [None]:
# Evaluation throughput and accuracy of the heavy model on the test split.
speed_and_acc(xz_model_heavy, X_test, y_test)


In [None]:
# Evaluation throughput and accuracy of the light model on the test split.
speed_and_acc(xz_model_light, X_test, y_test)


In [None]:
# plotting confusion matrix

def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """
    Plot a confusion matrix as a colour-coded grid with the value of every
    cell written on top of it.

    Args:
        cm: 2-D NumPy array, rows = true classes, columns = predicted classes.
        classes: tick labels, one per class, used on both axes.
        normalize: if true, every row is divided by its sum before plotting,
            and the cell texts are shown with two decimals.
        title: title of the plot.
        cmap: matplotlib colour map for the grid.

    Returns:
        None; the plot is drawn on the current matplotlib figure.
    """
    if normalize:
        # normalise BEFORE drawing so that the colours and the printed
        # numbers describe the same values
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    # cells darker than half the maximum get white text for contrast
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        # raw counts are printed as they are, fractions with two decimals
        cell_text = f"{cm[i, j]:.2f}" if normalize else cm[i, j]
        plt.text(j, i, cell_text,
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')





In [None]:
# Predict the values from the test set with the light model
Y_pred = xz_model_light.predict(X_test)
# Class index with the highest predicted score for every test image
Y_pred_classes = Y_pred.argmax(axis=1)
# Turn the one-hot encoded test labels back into class indices
Y_true = y_test.argmax(axis=1)
# Compute the confusion matrix and plot it
confusion_mtx = confusion_matrix(Y_true, Y_pred_classes)
plot_confusion_matrix(confusion_mtx, classes=["No error", "PLF"])


In [None]:
# Predict the values from the test set with the heavy model
Y_pred = xz_model_heavy.predict(X_test)
# Class index with the highest predicted score for every test image
Y_pred_classes = Y_pred.argmax(axis=1)
# Turn the one-hot encoded test labels back into class indices
Y_true = y_test.argmax(axis=1)
# Compute the confusion matrix and plot it
confusion_mtx = confusion_matrix(Y_true, Y_pred_classes)
plot_confusion_matrix(confusion_mtx, classes=["No error", "PLF"])

In [None]:
# Loading the untrained light model again so that the final fit on the full
# dataset does not continue from the weights trained above.
xz_model_light = keras.models.load_model('xz_model_light_untrained.h5')


In [None]:
# training light model on full dataset (no validation split this time)
xz_model_light.fit(
    X_data,
    y_data,
    batch_size=batch_size,
    epochs=epochs_light,
)


In [None]:
# Save the light model under its final file name.
xz_model_light.save("xz_model_light_FINAL.h5")


In [None]:
# loading untrained heavy model (the file loaded below is the heavy one)
xz_model_heavy = keras.models.load_model('xz_model_heavy_untrained.h5')


In [None]:
# training heavy model on full dataset
# (this cell used to fit xz_model_light with epochs_light, which left the
# heavy model untrained and trained the light model a second time)
xz_model_heavy.fit(X_data, y_data, epochs=epochs_heavy,
                   batch_size=batch_size)


In [None]:
# Save the heavy model under its final file name.
xz_model_heavy.save("xz_model_heavy_FINAL.h5")
