# Import Libraries

In [None]:
# Scratch cell: compares a large integer literal against float infinity.
100000000000000<float('inf')

In [None]:
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' 

import cv2
import random
import shutil
import time
import matplotlib
import glob
import operator
import psutil
import numpy as np
import math
import matplotlib.pyplot as plt

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.preprocessing import image
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.optimizers import *
from tensorflow.keras.models import *
from tensorflow.keras.layers import *
from tensorflow.keras.regularizers import l2

from shutil import copyfile
import pandas as pd
import PIL
from mlxtend.plotting import plot_confusion_matrix
from tqdm import tqdm 

# Setting up hardware

In [None]:
# List the GPUs TensorFlow can see, report which device type will be used,
# and turn on memory growth for every GPU that was found.
physical_devices = tf.config.experimental.list_physical_devices('GPU')

if not physical_devices:
    print("Using CPU")
else:
    print("Using GPU")
    for i in physical_devices:
        tf.config.experimental.set_memory_growth(i, True)

# Load Dataset

In [None]:
# Root folder of the dataset; 'classify train' beneath it holds one folder
# per split (training / validation / testing).
root_dir = '/mnt/87433d53-66f3-438e-a7f4-fc990fe61283/PROJECTS/JRF/sound-classification/specs'
classify_train = os.path.join(root_dir, 'classify train')

train_directory, validation_directory, test_directory = (
    os.path.join(classify_train, split)
    for split in ('training', 'validation', 'testing')
)

In [None]:
# Display the three split directories so the resolved paths can be checked.
train_directory, validation_directory, test_directory

# Hyper-parameter setting-1

In [None]:
# Interactive hyper-parameter entry. Each reply is parsed as soon as it is
# read, so a non-numeric reply raises ValueError at the offending prompt.
learning_rate = float(input("Enter the initial learning rate: "))
epoch = int(input("Enter the maximum number of epochs: "))
batch_size = int(input("Enter batch size: "))
# NOTE(review): lambd is collected here, but no use of it is visible in this
# notebook -- confirm whether L2 regularization is meant to be applied.
lambd = float(input("Enter lambda for L2 regularization: "))

# Reject values that would otherwise only fail (or silently misbehave) once
# training starts.
if learning_rate <= 0:
    raise ValueError(f"Learning rate must be positive, got {learning_rate}")
if epoch < 1:
    raise ValueError(f"Number of epochs must be at least 1, got {epoch}")
if batch_size < 1:
    raise ValueError(f"Batch size must be at least 1, got {batch_size}")
if lambd < 0:
    raise ValueError(f"L2 lambda must be non-negative, got {lambd}")

# Characteristics folder
Creates a separate folder, alongside 'classify train', in which the model and results are stored

In [None]:
# Ask for the name of the output ("characteristics") folder, which lives
# directly under root_dir, and make sure it exists and is empty.
char_name = str(input("Enter name of the characteristics folder: "))
char = os.path.join(root_dir, char_name)

if os.path.exists(char):
    replace = str(input("Folder already exists ! Do you want to replace it ?(Y/N) "))
    if replace.upper() == 'Y':
        # Discard the old contents and start from an empty folder.
        shutil.rmtree(char)
        os.mkdir(char)
    elif replace.upper() == 'N':
        # Show what is already taken, then ask once for another name.
        print('\nThe following folders already exist:')
        for i in os.listdir(root_dir):
            print(i)
        char_name = str(input("\nEnter a new name of the characteristics folder: "))
        char = os.path.join(root_dir, char_name)
        if os.path.exists(char):
            # The second name is not confirmed again: an existing folder
            # with that name is replaced.
            print(f"{char_name} replaced")
            shutil.rmtree(char)
            os.mkdir(char)
        else:
            os.mkdir(char)
    # Any other reply leaves the existing folder untouched and reuses it.
else:
    os.mkdir(char)

# Learning rate decay

In [None]:
# Number of epochs between two successive learning-rate drops.
steps = 10 # change steps to 1 to apply exponential decay


def lr_schedule(epoch):
    """Return the step-decayed learning rate for ``epoch``.

    The global ``learning_rate`` is multiplied by 0.1 once for every
    ``steps`` epochs that have elapsed; the quotient is truncated by int().
    """
    drops = int(epoch / steps)
    return learning_rate * (0.1 ** drops)
    
# Path inside the characteristics folder; passed to ModelCheckpoint and later
# to load_model when predicting on the test set.
best_model_address = os.path.join(char, 'best_model.h5')

# Callbacks

In [None]:
# Ask which quantity EarlyStopping / ModelCheckpoint should track and how
# many epochs without improvement are tolerated before training stops.
monitor = int(input("Press 1 to monitor Validation Accuracy\nPress 2 to monitor Validation Loss\nPress 3 to monitor Training Accuracy\nPress 4 to monitor Training Loss\n"))
patience = int(input('Enter number of epochs that will produce monitored quantity with no improvement after which training will be stopped: '))

# choice -> (monitored history key, direction of improvement, banner text)
_MONITOR_CHOICES = {
    1: ('val_accuracy', 'max', "VALIDATION ACCURACY"),
    2: ('val_loss', 'min', "VALIDATION LOSS"),
    3: ('accuracy', 'max', "TRAINING ACCURACY"),
    4: ('loss', 'min', "TRAINING LOSS"),
}

# Fail here with a clear message instead of a NameError on `metric` below.
if monitor not in _MONITOR_CHOICES:
    raise ValueError(f"Invalid monitor choice {monitor!r}; expected 1, 2, 3 or 4.")

metric, mode, _monitor_label = _MONITOR_CHOICES[monitor]
print(f"\nMONITORING {_monitor_label}..........\n")

# Learning-rate schedule, early stopping (restoring the best weights) and
# best-only checkpointing all follow the same monitored quantity.
callback = [keras.callbacks.LearningRateScheduler(lr_schedule, verbose = 1),
            keras.callbacks.EarlyStopping(monitor = metric, min_delta = 0.001, patience = patience, verbose=1, mode = mode, restore_best_weights = True),
            keras.callbacks.ModelCheckpoint(best_model_address, monitor = metric, verbose=1, save_best_only=True, save_weights_only=False, mode = mode)]

print(f"\nTraining will stop if {metric} doesn't show any improvements for {patience} epochs\n")

# Model Architecture

In [None]:
def classical_cnn(dim, output_neurons, output_activation):
    """Build a plain convolutional classifier.

    Five convolution blocks (8, 16, 32, 64 and 128 filters) are followed by
    a flatten, one 128-unit dense layer and the output layer. Every
    convolution and the hidden dense layer is followed by a LeakyReLU.

    Args:
        dim: Shape of one input sample, passed to ``Input(shape=...)``.
        output_neurons: Number of units in the final dense layer.
        output_activation: Activation of the final dense layer.

    Returns:
        The uncompiled ``Model``.
    """
    print("\nTRAINING ON classical_cnn MODEL:-")

    def block(tensor, conv_reps, n_filters):
        """Stack 3x3 same-padded conv + LeakyReLU pairs, then a stride-2 max pool."""
        x = tensor
        # At least one convolution is always applied, even for conv_reps < 1.
        for _ in range(max(conv_reps, 1)):
            x = Conv2D(filters = n_filters, kernel_size = (3,3), padding = 'same')(x)
            x = LeakyReLU()(x)

        return MaxPooling2D(pool_size = (2,2), strides = 2, padding = 'same')(x)

    inputs = Input(shape = dim)

    # Base filter count; each block doubles the width of the previous one.
    k = 8
    features = inputs
    for multiplier in (1, 2, 4, 8, 16):
        features = block(features, conv_reps = 1, n_filters = multiplier * k)

    features = Flatten()(features)

    dense_reps = 1
    for _ in range(dense_reps):
        features = Dense(128)(features)
        features = LeakyReLU()(features)

    output = Dense(output_neurons, activation = output_activation)(features)

    return Model(inputs = inputs, outputs = output)

In [None]:
# Each class is a sub-folder of the training directory. Count only folders so
# that stray files in that directory do not inflate the number of classes.
class_no = sum(
    os.path.isdir(os.path.join(train_directory, entry))
    for entry in os.listdir(train_directory)
)

print(f"This is a {class_no}-Class Classification")

# Up to two classes: a single sigmoid unit with binary labels.
# More than two: one softmax unit per class with one-hot labels.
if class_no <= 2:
    class_mode = 'binary'
    output_activation = 'sigmoid'
    output_neurons = 1
    losses = 'binary_crossentropy'

else:
    class_mode = 'categorical'
    output_activation = 'softmax'
    output_neurons = class_no
    losses = 'categorical_crossentropy'

In [None]:
def optimizer_selection():
    """Prompt for an optimizer and return it, configured with the global learning rate.

    Returns:
        The Keras optimizer instance matching the number entered (1-7).

    Raises:
        ValueError: If the reply is not an integer or is outside 1-7.
    """
    print("\nSelect a optimizer which will reduce the loss of the model.\n")

    optimizer_select = int(input("Press 1 to select Stochastic Gradient Descent\nPress 2 to select RMSprop\nPress 3 to select Adagrad\nPress 4 to select Adadelta\nPress 5 to select Adam\nPress 6 to select Adamax\nPress 7 to select Nadam\n"))

    # Factories rather than instances, so only the chosen optimizer is built.
    # NOTE(review): the `decay` argument of SGD is kept from the original;
    # confirm it is still accepted by the installed Keras version.
    factories = {
        1: lambda: SGD(learning_rate = learning_rate, decay = 1e-6, momentum = 0.9, nesterov = True),
        2: lambda: RMSprop(learning_rate = learning_rate, rho = 0.9),
        3: lambda: Adagrad(learning_rate = learning_rate),
        4: lambda: Adadelta(learning_rate = learning_rate, rho = 0.95),
        5: lambda: Adam(learning_rate = learning_rate, beta_1 = 0.9, beta_2 = 0.999, amsgrad = False),
        6: lambda: Adamax(learning_rate = learning_rate, beta_1 = 0.9, beta_2 = 0.999),
        7: lambda: Nadam(learning_rate = learning_rate, beta_1 = 0.9, beta_2 = 0.999),
    }

    # Fail with a clear message instead of an UnboundLocalError on return.
    if optimizer_select not in factories:
        raise ValueError(f"Invalid optimizer choice {optimizer_select!r}; expected a number from 1 to 7.")

    return factories[optimizer_select]()

In [None]:
# Images are square: a single side length is asked for and used for both
# height and width.
h = int(input("Image Dimension(H or W): "))
if h < 1:
    raise ValueError(f"Image dimension must be a positive integer, got {h}")
w = h

# The colour choice fixes both the loader's colour mode and the channel
# count of the model input shape.
color = int(input("Press 1 for RGB \nPress 2 for Grayscale "))
if color == 1:
    color_mode = 'rgb'
    dim = (h,w,3)
elif color == 2:
    color_mode = 'grayscale'
    dim = (h,w,1)
else:
    # Without this branch `color_mode` and `dim` would stay undefined and
    # fail later with a NameError.
    raise ValueError(f"Invalid colour choice {color!r}; expected 1 (RGB) or 2 (Grayscale).")

In [None]:
# All three splits are read the same way: pixel values multiplied by 1/255,
# identical batch size, label encoding, colour mode and target size, with
# shuffling enabled.
_flow_options = dict(batch_size = batch_size,
                     class_mode = class_mode,
                     color_mode = color_mode,
                     target_size = (h,w),
                     shuffle = True)

train_datagen = ImageDataGenerator(rescale=1.0/255.)
train_generator = train_datagen.flow_from_directory(train_directory, **_flow_options)

validation_datagen = ImageDataGenerator(rescale=1.0/255.)
validation_generator = validation_datagen.flow_from_directory(validation_directory, **_flow_options)

test_datagen = ImageDataGenerator(rescale=1.0/255.)
test_generator = test_datagen.flow_from_directory(test_directory, **_flow_options)

In [None]:
# Interactively choose the optimizer that model.compile will use.
optimizer = optimizer_selection()

In [None]:
# Build the network for the chosen input shape and output configuration.
model = classical_cnn(dim, output_neurons, output_activation)

# The order of this list matters: the evaluation scores are read back by
# position further down (index 0 is the loss, 1 accuracy, 2 precision, ...).
tracked_metrics = [
    'accuracy',
    tf.keras.metrics.Precision(),
    tf.keras.metrics.Recall(),
    tf.keras.metrics.TruePositives(),
    tf.keras.metrics.TrueNegatives(),
    tf.keras.metrics.FalsePositives(),
    tf.keras.metrics.FalseNegatives(),
]
model.compile(optimizer = optimizer, loss = losses, metrics = tracked_metrics)

model.summary()

In [None]:
# Train under the first GPU's device scope and time the fit call.
with tf.device('/GPU:0'):
    start = time.time()
    history = model.fit(
        train_generator,
        validation_data = validation_generator,
        epochs = epoch,
        callbacks = callback,
        verbose = 1,
        shuffle=True,
    )
    end = time.time()

# Wall-clock training time in seconds.
duration = end - start

# Score the trained model on each split, in train / validation / test order.
train_score, val_score, test_score = (
    model.evaluate(split)
    for split in (train_generator, validation_generator, test_generator)
)

In [None]:
# Report how long model.fit took.
print(f"Execution Time: {duration} seconds")

# Plot characteristic curves

In [None]:
# Pull the per-epoch training/validation curves out of the fit history.
acc, val_acc, loss, val_loss = (
    history.history[key]
    for key in ('accuracy', 'val_accuracy', 'loss', 'val_loss')
)
# One x-axis position per completed epoch.
epochs = range(len(acc))

In [None]:
# Training vs. validation accuracy per epoch; the figure is written to the
# characteristics folder as EPS and JPG before it is shown.
fig_name_eps = "accuracy.eps"
fig_name_jpg = "accuracy.jpg"

plt.plot(epochs, acc, 'r', label='Training Accuracy')
plt.plot(epochs, val_acc, 'b', label='Validation Accuracy')
plt.legend()

plt.title('Training and validation accuracy vs Epochs')
plt.xlabel("Epochs")
plt.ylabel("Accuracy")

plt.savefig(os.path.join(char, fig_name_eps))
plt.savefig(os.path.join(char, fig_name_jpg))

plt.show()

In [None]:
# Training vs. validation loss per epoch; the figure is written to the
# characteristics folder as EPS and JPG before it is shown.
fig_name_eps = "loss.eps"
fig_name_jpg = "loss.jpg"

plt.plot(epochs, loss, 'r', label="Training Loss")
plt.plot(epochs, val_loss, 'b', label="Validation Loss")
plt.legend()

plt.title('Training and validation loss vs Epochs')
plt.xlabel("Epochs")
plt.ylabel("Loss")

plt.savefig(os.path.join(char, fig_name_eps))
plt.savefig(os.path.join(char, fig_name_jpg))

plt.show()

In [None]:
# Combined figure with all four curves. Accuracy uses solid lines and loss
# dashed lines so that curves sharing a colour can still be told apart.
plt.plot(epochs, acc, 'r', label='Training Accuracy')
plt.plot(epochs, val_acc, 'b', label='Validation Accuracy')
plt.plot(epochs, loss, 'r--', label="Training Loss")
plt.plot(epochs, val_loss, 'b--', label="Validation Loss")

# The axis label and title name both quantities that are plotted.
plt.xlabel("Epochs")
plt.ylabel("Accuracy / Loss")
plt.title('Training and validation accuracy and loss vs Epochs')
plt.legend()

fig_name_eps = "char.eps"
fig_name_jpg = "char.jpg"

plt.savefig(os.path.join(char, fig_name_eps))
plt.savefig(os.path.join(char, fig_name_jpg))

# Shown like the accuracy and loss figures, after both files are saved.
plt.show()

# Classification metrics

In [None]:
# Index 1 of each evaluation result is the accuracy; convert it to percent.
training_accuracy, validation_accuracy, test_accuracy = (
    score[1]*100 for score in (train_score, val_score, test_score)
)

print(f"The training accuracy is: {training_accuracy!s} %")
print(f"The validation accuracy is: {validation_accuracy!s} %")
print(f"The test accuracy is: {test_accuracy!s} %")

In [None]:
# Unpack the test evaluation by position: the order follows the metrics list
# given to model.compile (index 0 is the loss).
test_accuracy = test_score[1]*100
test_precision = test_score[2]*100
test_recall = test_score[3]*100
tp = int(test_score[4])
tn = int(test_score[5])
fp = int(test_score[6])
fn = int(test_score[7])

# Derived scores, all on a 0-100 scale. Each falls back to 0.0 when its
# denominator is zero (e.g. the model never predicts the positive class)
# instead of raising ZeroDivisionError.
f1 = (2*((test_precision*test_recall)/(test_precision+test_recall))
      if (test_precision+test_recall) else 0.0)
sensitivity_k = (tp/(tp+fn))*100 if (tp+fn) else 0.0
specificity_k = (tn/(tn+fp))*100 if (tn+fp) else 0.0

In [None]:
# Console summary of the test-set metrics. The count held in `tn` is labelled
# "True Negative" (it was previously printed as "Test Negetive").
print(f"Test Accuracy: {test_accuracy}")
print(f"Test Precision: {test_precision}")
print(f"Test Recall: {test_recall}")
print(f"True Positive: {tp}")
print(f"True Negative: {tn}")
print(f"False Positive: {fp}")
print(f"False Negative: {fn}")
print(f"Sensitivity_k: {sensitivity_k}")
print(f"Specificity_k: {specificity_k}")

In [None]:
def report(y_true, y_pred, labels):
    """Print and return scikit-learn classification metrics.

    Args:
        y_true: Ground-truth class indices.
        y_pred: Predicted class indices, in the same order as ``y_true``.
        labels: Class names, either as a sequence ordered by class index or
            as a name -> index mapping.

    Returns:
        Tuple ``(cm, classification_reports, sensitivity, specificity)``.
        Sensitivity and specificity are read from the top-left 2x2 corner of
        the confusion matrix: row 0 is used for sensitivity and row 1 for
        specificity, so only the first two classes enter these two values.
    """
    from sklearn.metrics import classification_report
    from sklearn.metrics import confusion_matrix

    # Order the names explicitly by class index when a mapping is given,
    # rather than relying on the mapping's iteration order.
    if isinstance(labels, dict):
        target_names = sorted(labels, key=labels.get)
    else:
        target_names = list(labels)

    print("Calculating CLASSIFICATION REPORT: ")
    classification_reports = classification_report(y_true, y_pred, target_names=target_names)
    print(classification_reports)

    print("\nCalculating SENSITIVITY & SPECIFICITY..........:")
    cm = confusion_matrix(y_true, y_pred)
    sensitivity = cm[0, 0] / (cm[0, 0] + cm[0, 1])
    specificity = cm[1, 1] / (cm[1, 0] + cm[1, 1])

    print("sensitivity = {:.4f}".format(sensitivity))
    print("specificity = {:.4f}".format(specificity))

    return cm, classification_reports, sensitivity, specificity

In [None]:
def conf_mat(cm, labels, char, file_name):
    """Plot a confusion matrix and save it as EPS and JPG.

    Args:
        cm: Confusion matrix to draw.
        labels: Class names used for the tick labels.
        char: Folder the two image files are written to.
        file_name: File name without extension.
    """
    # Absolute counts and normalised values are both shown in each cell.
    fig, ax = plot_confusion_matrix(
        conf_mat=cm,
        class_names=labels,
        show_absolute=True,
        show_normed=True,
        colorbar=True,
    )

    eps_path = os.path.join(char, f'{file_name}.eps')
    jpg_path = os.path.join(char, f'{file_name}.jpg')
    plt.savefig(eps_path)
    plt.savefig(jpg_path)

In [None]:
# Class mapping taken from the test generator's `class_indices` attribute.
labels = test_generator.class_indices

# Classification result (Scikit-learn)

In [None]:
def pred_sklearn(test_directory, test_generator, class_no, best_model_address, dim):
    """Predict every test image with the saved best model.

    Images are loaded one by one from the class sub-folders of
    ``test_directory`` and preprocessed the same way the data generators
    feed the model: the colour mode follows the channel count in ``dim`` and
    pixel values are multiplied by 1/255.

    Args:
        test_directory: Folder holding one sub-folder per class.
        test_generator: Directory iterator over the same folder; supplies
            the ground-truth class indices and the class mapping.
        class_no: Number of classes (sub-folders) to process.
        best_model_address: Path of the saved model to load.
        dim: Model input shape as ``(height, width, channels)``.

    Returns:
        Tuple ``(y_true, y_pred, labels)``; ``y_pred`` is a list of predicted
        class indices, one per image.
    """
    # Only sub-folders are classes; sorting fixes the class order.
    test_class_list = sorted(
        path
        for path in (os.path.join(test_directory, name) for name in os.listdir(test_directory))
        if os.path.isdir(path)
    )

    print(test_class_list)

    y_true = test_generator.classes
    labels = test_generator.class_indices

    # Sorted file list per class, so predictions come out class by class in
    # a deterministic order.
    # NOTE(review): this assumes the folders contain only image files that
    # the generator also accepted -- otherwise y_pred and y_true differ in
    # length. Confirm against the dataset.
    files_per_class = [
        sorted(os.path.join(class_dir, name) for name in os.listdir(class_dir))
        for class_dir in test_class_list[:class_no]
    ]

    # Real number of images (classes need not have the same size).
    tot = sum(len(class_files) for class_files in files_per_class)

    best_model = load_model(best_model_address)

    # Match the number of channels the model was built for.
    color_mode = 'grayscale' if dim[2] == 1 else 'rgb'

    y_pred = []
    with tqdm(total=tot) as pbar:
        pbar.set_description("Progress")
        for class_files in files_per_class:
            for file in class_files:
                # target_size is (height, width).
                img = image.load_img(file, color_mode=color_mode, target_size=(dim[0], dim[1]))
                # Same 1/255 rescaling as the training generators apply.
                x = image.img_to_array(img) / 255.0
                im_arr = np.expand_dims(x, axis=0)

                pred = best_model.predict(im_arr, verbose=0)

                if pred.shape[-1] == 1:
                    # Single sigmoid output: argmax would always be 0, so
                    # threshold the probability of class index 1 instead.
                    max_pred = int(pred[0, 0] > 0.5)
                else:
                    max_pred = int(np.argmax(pred, axis=-1)[0])

                y_pred.append(max_pred)
                pbar.update()

    return y_true, y_pred, labels

In [None]:
# Display the class mapping.
labels

In [None]:
# Predict every test image with the model saved at best_model_address.
y_true, y_pred, labels = pred_sklearn(test_directory, test_generator, class_no, best_model_address, dim)

In [None]:
# Confusion matrix, text report, sensitivity and specificity for the predictions.
cm, classification_reports_sklearn, sensitivity_sklearn, specificity_sklearn = report(y_true, y_pred, labels)

In [None]:
# Save the confusion-matrix figure into the characteristics folder.
conf_mat(cm, labels, char, 'confusion-matrix-sklearn')

# README

In [None]:
from contextlib import redirect_stdout

# Write a plain-text summary of the run (hyper-parameters, model summary,
# test metrics, training curves and classification reports) into the
# characteristics folder.
readme_name_text = "readme.txt"
print(f"Please read the text file named {readme_name_text} for detailed information of the model.")

completeName_txt = os.path.join(char, readme_name_text)

# The context manager closes the file even if one of the writes fails;
# UTF-8 is set explicitly because the model summary may contain non-ASCII
# characters.
with open(completeName_txt, "w", encoding="utf-8") as readme:

    if class_no > 2:
        readme.write(f"This is a {class_no}-class CLASSIFICATION")
    else:
        readme.write("This is a BINARY CLASSIFICATION")

    readme.write("\n\n--HYPERPARAMETERS--\n")
    readme.write(f"\nInitial Learning Rate = {learning_rate}")
    # Number of epochs actually run (early stopping may end training early).
    readme.write(f"\nNo. of epochs = {len(acc)}")
    readme.write(f"\nBatch Size = {batch_size}")

    readme.write("\n\n--MODEL-PARAMETERS--")
    readme.write(f"\nOptimizer = {optimizer}\n\n")

    readme.write("Trained on a Custom Prebuilt Model\n")
    # model.summary() prints to stdout, so stdout is redirected into the file.
    with redirect_stdout(readme):
        model.summary()

    readme.write("\n\n--MODEL-PERFORMANCE--")
    readme.write(f"\nTest Accuracy = {test_accuracy} %")
    readme.write(f"\nTest Precision = {test_precision} %")
    readme.write(f"\nTest Recall = {test_recall} %")
    readme.write(f"\nTrue Positive = {tp}")
    readme.write(f"\nTrue Negetive = {tn}")
    readme.write(f"\nFalse Positive = {fp}")
    readme.write(f"\nFalse Negetive = {fn}")
    readme.write(f"\nSensitivity_k = {sensitivity_k}")
    readme.write(f"\nSpecificity_k = {specificity_k}\n\n\n")

    readme.write("\n\n--MODEL-CHARACTERISTICS--")
    readme.write(f"\nacc = {acc}")
    readme.write(f"\n\nval_acc = {val_acc}")
    readme.write(f"\n\nloss = {loss}")
    readme.write(f"\n\nval_loss = {val_loss}")

    # The Keras-side report variables are not defined anywhere in the visible
    # notebook; write that section only when they exist, so the README is
    # still completed instead of stopping with a NameError.
    if all(name in globals() for name in ("classification_reports_keras",
                                          "sensitivity_keras",
                                          "specificity_keras")):
        readme.write("\n\n--Classification Report Keras--\n")
        readme.write(classification_reports_keras)
        readme.write(f"\nSensitivity = {sensitivity_keras*100} %")
        readme.write(f"\nSpecificity = {specificity_keras*100} %")

    readme.write("\n\n--Classification Report Scikit-learn--\n")
    readme.write(classification_reports_sklearn)
    readme.write(f"\nSensitivity = {sensitivity_sklearn*100} %")
    readme.write(f"\nSpecificity = {specificity_sklearn*100} %")

    readme.write(f"\nExecution Time: {duration} seconds")

    readme.write("\n\nCreated using Self-Regulated Image Classifier using Convolution Neural Network")