In [None]:
# Specify the classifier to be trained; its classes array is looked up from
# the table below so the two settings can never disagree.
# a: ['MEL', 'NMEL']
# b: ['MEL', 'NV']
# c: ['BEN', 'MAL']
# d: ['BKL', 'DF', 'VASC']
# e: ['BCC', 'AKIEC']
CLASSES_BY_CLASSIFIER = {
    'a': ['MEL', 'NMEL'],
    'b': ['MEL', 'NV'],
    'c': ['BEN', 'MAL'],
    'd': ['BKL', 'DF', 'VASC'],
    'e': ['BCC', 'AKIEC'],
}

classifier = 'a'
# Previously classifier 'a' was paired with the class names of classifier 'e'.
# Deriving the list keeps the printed counters labelled with the right classes.
# A copy is taken so later mutation cannot corrupt the lookup table.
classesArray = list(CLASSES_BY_CLASSIFIER[classifier])

In [None]:
import cv2
import os
from operator import itemgetter
from numpy import array
import csv
import time
import matplotlib.pyplot as plt
import keras
import tensorflow as tf
from sklearn.metrics import confusion_matrix

In [None]:
def import_dataset(path, mode, fileExtension='.jpg', imgWidth=224, imgHeight=224):
    """Load every image with a given extension from a folder, resized to a fixed size.

    Args:
        path: Folder to scan (not recursive).
        mode: Text used only in the progress message (e.g. 'training').
        fileExtension: Only filenames ending with this suffix are loaded.
        imgWidth: Target width in pixels.
        imgHeight: Target height in pixels.

    Returns:
        A numpy array holding the resized RGB images, ordered by filename.
        The array is empty when no file matches.

    Raises:
        IOError: If a matching file cannot be decoded by OpenCV.
    """
    print("Start importing " + mode + " images...")
    dataset = []
    # Iterate in filename order so the image order is deterministic and does
    # not depend on the order in which the OS lists the directory.
    for filename in sorted(os.listdir(path)):
        if not filename.endswith(fileExtension):
            continue
        completePath = os.path.join(path, filename)
        image = cv2.imread(completePath, cv2.IMREAD_COLOR)
        if image is None:
            # cv2.imread signals failure by returning None instead of raising.
            # Skipping the file would silently shift every later image
            # relative to its label, so fail loudly instead.
            raise IOError("Could not read image: " + completePath)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        # cv2.resize expects dsize as (width, height), not (height, width).
        image = cv2.resize(image, dsize=(imgWidth, imgHeight), interpolation=cv2.INTER_AREA)
        dataset.append(image)

    return array(dataset)

In [None]:
def assign_labels(path_groundtruth, classesArray):
    """Turn a one-hot ground-truth CSV into integer class labels.

    The first row of the file is treated as a header and skipped. In every
    following row, column ``i + 1`` is compared against the string '1.0' for
    class ``classesArray[i]``; column 0 is not read. The first matching
    column decides the label, so a row never yields more than one label.

    Rows in which none of the inspected columns equals '1.0' produce no label
    at all, so the returned list can be shorter than the number of data rows.

    Args:
        path_groundtruth: Path of the CSV file to read.
        classesArray: Class names, in the same order as the CSV columns
            starting at column 1. Any number of classes is supported.

    Returns:
        A ``(counter, target)`` tuple: ``counter`` maps each class name to the
        number of rows assigned to it, ``target`` is the list of integer
        labels in file order.
    """
    counter = dict.fromkeys(classesArray, 0)
    target = []

    # newline='' is the mode recommended by the csv module for reading.
    with open(path_groundtruth, 'r', newline='') as file:
        reader = csv.reader(file)
        next(reader, None)  # skip the header row (no-op on an empty file)
        for row in reader:
            if not row:
                # csv.reader yields [] for blank lines; nothing to label.
                continue
            for label, className in enumerate(classesArray):
                if row[label + 1] == '1.0':
                    counter[className] += 1
                    target.append(label)
                    break

    print(counter)
    return counter, target

In [None]:
def plot_val_train_error(fit):
    """Show the training and validation curves, one figure per metric.

    Two figures are displayed in order: accuracy, then loss. Each overlays
    the training series and the validation series read from ``fit.history``.

    Args:
        fit: Object exposing a ``history`` mapping with the keys 'accuracy',
            'val_accuracy', 'loss' and 'val_loss'.
    """
    # (training key / y-axis label, validation key, figure title)
    curves = (
        ('accuracy', 'val_accuracy', 'model accuracy'),
        ('loss', 'val_loss', 'model loss'),
    )
    for trainKey, valKey, figureTitle in curves:
        plt.plot(fit.history[trainKey])
        plt.plot(fit.history[valKey])
        plt.grid(True)
        plt.title(figureTitle)
        plt.ylabel(trainKey)
        plt.xlabel('epoch')
        plt.legend(['train', 'val'], loc='upper left')
        plt.show()

In [None]:
def create_model(model='densenet', transferLearning=True, noClasses=2, imgHeight=224, imgWidth=224):
    """Build a softmax classifier on top of a ResNet101 or DenseNet121 backbone.

    The backbone is created without its top layers and followed by global
    average pooling, dropout (0.5) and a dense softmax layer.

    Args:
        model: Backbone name, either 'resnet' or 'densenet'.
        transferLearning: When true the backbone starts from ImageNet weights,
            otherwise it is randomly initialised.
        noClasses: Number of units in the final softmax layer.
        imgHeight: Input image height in pixels.
        imgWidth: Input image width in pixels.

    Returns:
        An uncompiled ``tf.keras.Sequential`` model.

    Raises:
        ValueError: If ``model`` is not one of the supported backbone names.
    """
    # Validate first so a typo fails fast with a catchable error instead of
    # calling exit(), which would take the notebook kernel down.
    if model not in ('resnet', 'densenet'):
        raise ValueError(
            "Model " + repr(model) + " is not available; use 'resnet' or 'densenet'."
        )

    # Only the requested backbone is imported.
    if model == 'resnet':
        from tensorflow.keras.applications.resnet import ResNet101 as Backbone
    else:
        from tensorflow.keras.applications.densenet import DenseNet121 as Backbone

    backbone = Backbone(
        include_top=False,
        weights='imagenet' if transferLearning else None,
        # The input size comes from parameters because the notebook never
        # defines the IMG_HEIGHT / IMG_WIDTH globals used previously. The
        # defaults match import_dataset's default 224x224 resize.
        input_shape=(imgHeight, imgWidth, 3),
    )

    # Sequential expects a list of layers, so the backbone is wrapped in one.
    return tf.keras.Sequential([
        backbone,
        tf.keras.layers.GlobalAveragePooling2D(),
        tf.keras.layers.Dropout(0.5),
        tf.keras.layers.Dense(units=noClasses, activation="softmax"),
    ])

In [None]:
# Learning Rate Scheduler
def scheduler(epoch, noEpochs, learningRate):
    """Step-decay schedule: full rate, then a tenth, then a hundredth.

    Args:
        epoch: Index of the current epoch.
        noEpochs: Total number of epochs planned for the run.
        learningRate: Base learning rate.

    Returns:
        ``learningRate`` during the first half of training, ``learningRate / 10``
        up to the three-quarter mark, and ``learningRate / 100`` afterwards.
    """
    firstDrop = int(0.5 * noEpochs)
    if epoch < firstDrop:
        return learningRate

    secondDrop = int(0.75 * noEpochs)
    if epoch < secondDrop:
        return learningRate / 10

    return learningRate / 100

In [None]:
# Import Training and Validation datasets and Groundtruth of classifier
classifierToIndex = {'a': 0, 'b': 1, 'c': 2, 'd': 3, 'e': 4}
indexClassifier = classifierToIndex[classifier]

# NOTE(review): these are absolute, machine-specific paths; consider moving
# them behind a single configurable data directory.
pathTrainArray = ['/c/Users/ruben/Desktop/train2018',
                  '/c/Users/ruben/Desktop/hier/b',
                  '/c/Users/ruben/Desktop/hier/c',
                  '/c/Users/ruben/Desktop/hier/d',
                  '/c/Users/ruben/Desktop/hier/e']

truthTrainArray = ['/c/Users/ruben/Desktop/hier/a/labels.csv',
                   '/c/Users/ruben/Desktop/hier/b/labels.csv',
                   '/c/Users/ruben/Desktop/hier/c/labels.csv',
                   '/c/Users/ruben/Desktop/hier/d/labels.csv',
                   '/c/Users/ruben/Desktop/hier/e/labels.csv',]

pathValArray  = ['/c/Users/ruben/Desktop/val2018',
                 '/c/Users/ruben/Desktop/hier_val/b',
                 '/c/Users/ruben/Desktop/hier_val/c',
                 '/c/Users/ruben/Desktop/hier_val/d',
                 '/c/Users/ruben/Desktop/hier_val/e']

truthValArray = ['/c/Users/ruben/Desktop/hier_val/a/labels.csv',
                 '/c/Users/ruben/Desktop/hier_val/b/labels.csv',
                 '/c/Users/ruben/Desktop/hier_val/c/labels.csv',
                 '/c/Users/ruben/Desktop/hier_val/d/labels.csv',
                 '/c/Users/ruben/Desktop/hier_val/e/labels.csv']

x_train = import_dataset(pathTrainArray[indexClassifier], 'training')
# `counter` keeps the TRAINING class counts; the class weights are computed
# from it later, so it must not be overwritten by the validation counts.
counter, y_train = assign_labels(truthTrainArray[indexClassifier], classesArray)

x_val = import_dataset(pathValArray[indexClassifier], 'validation')
counterVal, y_val = assign_labels(truthValArray[indexClassifier], classesArray)

# Images and labels are paired purely by position, so a count mismatch means
# the pairing is wrong. Fail here rather than deep inside model.fit.
for splitName, images, labels in (('training', x_train, y_train),
                                  ('validation', x_val, y_val)):
    if len(images) != len(labels):
        raise ValueError(
            "%s set has %d images but %d labels" % (splitName, len(images), len(labels))
        )

print("Dataset Imported")

In [None]:
# Define training variables
noEpochs = 20
learningRate = 1e-5
noClasses = len(classesArray)
batchSize = 10
modelName = 'densenet'

# One checkpoint file per backbone and per classifier (indexed like
# classifierToIndex: a=0 ... e=4).
checkpointPath = {
    'resnet': ["/c/Users/ruben/Desktop/ModelWeights/Hier/a/resnet/cp.ckpt",
               "/c/Users/ruben/Desktop/ModelWeights/Hier/b/resnet/cp.ckpt",
               "/c/Users/ruben/Desktop/ModelWeights/Hier/c/resnet/cp.ckpt",
               "/c/Users/ruben/Desktop/ModelWeights/Hier/d/resnet/cp.ckpt",
               "/c/Users/ruben/Desktop/ModelWeights/Hier/e/resnet/cp.ckpt"],
    'densenet': ["/c/Users/ruben/Desktop/ModelWeights/Hier/a/densenet/cp.ckpt",
                "/c/Users/ruben/Desktop/ModelWeights/Hier/b/densenet/cp.ckpt",
                "/c/Users/ruben/Desktop/ModelWeights/Hier/c/densenet/cp.ckpt",
                "/c/Users/ruben/Desktop/ModelWeights/Hier/d/densenet/cp.ckpt",
                "/c/Users/ruben/Desktop/ModelWeights/Hier/e/densenet/cp.ckpt"]
}

# Inverse-frequency class weights: weight_i = total_samples / samples_in_class_i.
# Built for any number of classes. A class with zero samples raises
# ZeroDivisionError here, which is preferable to training with a bogus weight.
# NOTE(review): these weights are meant to reflect the training split; make
# sure `counter` holds the counts returned for the training ground truth.
classCounts = [float(counter[className]) for className in classesArray]
w_total = sum(classCounts)
classWeights = {label: w_total / count for label, count in enumerate(classCounts)}

# tf.keras is used throughout so the utilities match the tf.keras model.
y_train_cat = tf.keras.utils.to_categorical(y_train, noClasses)
y_val_cat = tf.keras.utils.to_categorical(y_val, noClasses)

# Callback to adapt the learning rate value.
# Keras calls the schedule as schedule(epoch, current_lr), while `scheduler`
# takes (epoch, noEpochs, learningRate); the lambda adapts between the two and
# always decays from the configured base rate.
schedulerLearningRate = tf.keras.callbacks.LearningRateScheduler(
    lambda epoch, lr=None: scheduler(epoch, noEpochs, learningRate)
)

# Callback to save the Best Model Weights
checkpointFile = checkpointPath[modelName][indexClassifier]
checkpoint_dir = os.path.dirname(checkpointFile)
checkpoint = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpointFile,
    save_weights_only=True,
    verbose=1,
    save_best_only=True
)

In [None]:
# Create and fit the model
# noClasses is passed by keyword: a positional argument after a keyword
# argument is a SyntaxError.
model = create_model(modelName, transferLearning=True, noClasses=noClasses)
model.summary()

model.compile(
    loss=tf.keras.losses.categorical_crossentropy,
    # `learning_rate` replaces the deprecated `lr` argument, and the configured
    # value is used instead of a duplicated literal.
    optimizer=tf.keras.optimizers.Adam(learning_rate=learningRate),
    metrics=['accuracy']
)

# Keep the returned History object so the training curves can be plotted.
fit = model.fit(
    x_train,
    y_train_cat,
    batch_size=batchSize,
    class_weight=classWeights,
    callbacks=[schedulerLearningRate, checkpoint],
    epochs=noEpochs,
    shuffle=True,
    validation_data=(x_val, y_val_cat)
)

In [None]:
# Evaluate in validation set
# Restore the best weights saved by the checkpoint callback. The dictionary
# holds one path per classifier, so the entry for the current one is selected.
model.load_weights(checkpointPath[modelName][indexClassifier])

# predict_classes() was removed from tf.keras; the predicted class is the
# argmax over the softmax outputs.
y_pred = model.predict(x_val).argmax(axis=-1)
conf_matrix = confusion_matrix(y_val, y_pred)
print(conf_matrix)

score = model.evaluate(x_val, y_val_cat, verbose=1)
print(f'Val loss: {score[0]} / Val accuracy: {score[1]}')

# model.history is the History object recorded by the most recent fit() call.
plot_val_train_error(model.history)