In [None]:
import os

# TF_CPP_MIN_LOG_LEVEL has to be in the environment *before* TensorFlow is imported;
# setting it afterwards may be ignored because the native logger can already have
# read its level while the package was loading.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

import tensorflow as tf
from utilities import model_creation, metrics_and_plots, data_processing
from tensorflow.keras.utils import plot_model, to_categorical
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, CSVLogger, EarlyStopping

# Report the TensorFlow version in use and how many GPUs it can see.
print("We are running {} version of the TensorFlow,\nand we have {} GPUs Available.".format(
    tf.__version__, len(tf.config.list_physical_devices('GPU'))))

In [None]:
# Training schedule.
epochs = 50
batch_size = 32

# Image size used by the notebook; input_shape is the same size with a trailing
# 3-entry axis (presumably the RGB channels -- confirm against the data loader).
target_size = (128, 128)
input_shape = target_size + (3,)

# Metrics tracked during fit/evaluate. The metric named 'Specificity' is the one the
# callbacks monitor (as 'val_Specificity').
METRICS = [
    tf.keras.metrics.TruePositives(name='tp'),
    tf.keras.metrics.FalsePositives(name='fp'),
    tf.keras.metrics.TrueNegatives(name='tn'),
    tf.keras.metrics.FalseNegatives(name='fn'),
    # The model is trained with categorical cross-entropy on 5 classes, so accuracy
    # must compare the arg-max class. BinaryAccuracy would score every element of
    # the one-hot vector separately and overstate the result.
    tf.keras.metrics.CategoricalAccuracy(name='accuracy'),
    tf.keras.metrics.Precision(name='precision'),
    tf.keras.metrics.Recall(name='recall'),
    tf.keras.metrics.AUC(name='auc'),
    tf.keras.metrics.AUC(name='prc', curve='PR'),  # precision-recall curve
    tf.keras.metrics.SpecificityAtSensitivity(sensitivity=0.5, name='Specificity'),
    tf.keras.metrics.SensitivityAtSpecificity(specificity=0.5, name='Sensitivity'),
]

In [None]:
# Work from inside the dataset folder so the split directories can be named relatively.
# Alternative dataset: os.chdir("../../Dataset/2_0_1_splitted")
os.chdir("../../Dataset/1_asli_splitted")

train_dir = "train"
test_dir = "test"
val_dir = "validation"

# Only the training split is augmented.
train_iterator = data_processing.generate_iterator(
    path=train_dir,
    augmentation=True,
    rescale=0,
    batch_size=batch_size,
)

# Test and validation splits: no augmentation, shuffling disabled
# (presumably so predictions stay aligned with the iterator's labels).
test_iterator = data_processing.generate_iterator(
    path=test_dir,
    augmentation=False,
    shuffle=False,
    rescale=0,
    batch_size=batch_size,
)
validation_iterator = data_processing.generate_iterator(
    path=val_dir,
    augmentation=False,
    shuffle=False,
    rescale=0,
    batch_size=batch_size,
)

# data_processing.display_images(train_iterator)

In [None]:
# When True, the training, history-saving and plotting cells are skipped.
is_test_only = False

# Architecture switches: is_hybrid_model picks between the hybrid builders and
# model_from_tf; is_hybrid_model_main_only picks main_model_only over final_model.
is_hybrid_model = True
is_hybrid_model_main_only = False

In [None]:
# Callbacks used during training: keep the best weights on disk, log every epoch to
# CSV, lower the learning rate on plateaus and stop early when the monitored
# validation metric stops improving.

## FOR ENHANCED DATASET
# Relative path: resolved against the current working directory at run time.
save_dir_for_enhanced = os.path.join('..', '..', "Result", '2_4_reference_updated')
if is_hybrid_model:
    if is_hybrid_model_main_only:
        model_name_for_enhanced = 'hybrid_model_main_only'
    else:
        model_name_for_enhanced = 'hybrid_model'
else:
    architecture = "VGG16"
    model_name_for_enhanced = f'from_tensorflow_{architecture}'

# exist_ok=True makes the cell safe to re-run and avoids the check-then-create race.
os.makedirs(save_dir_for_enhanced, exist_ok=True)
filepath_for_enhanced = os.path.join(save_dir_for_enhanced, f"{model_name_for_enhanced}.h5")
csv_for_enhanced = os.path.join(save_dir_for_enhanced, f"{model_name_for_enhanced}.csv")

# Write the weights only when 'val_Specificity' reaches a new maximum.
# NOTE(review): Keras 3 expects a '.weights.h5' suffix when save_weights_only=True --
# confirm against the installed version before upgrading.
checkpoint_for_enhanced = ModelCheckpoint(filepath=filepath_for_enhanced,
                                          monitor='val_Specificity',
                                          verbose=1,
                                          save_best_only=True,
                                          save_weights_only=True,
                                          mode='max')

# Per-epoch metrics; append=False overwrites the CSV of a previous run.
CSVLogger_for_enhanced = CSVLogger(filename=csv_for_enhanced, separator=',',
                                   append=False)

# Halve the learning rate after 5 epochs without improvement, never below 1e-6.
lr_reducer = ReduceLROnPlateau(factor=0.5,
                               cooldown=0,
                               patience=5,
                               min_lr=1e-6,
                               monitor='val_Specificity',
                               mode='max')

# Stop after 6 epochs without improvement; monitoring only starts at epoch 6.
early_stopping = EarlyStopping(patience=6,
                               monitor='val_Specificity',
                               start_from_epoch=6,
                               verbose=1,
                               mode='max')


callbacks_for_enhanced = [checkpoint_for_enhanced, lr_reducer, CSVLogger_for_enhanced, early_stopping]

In [None]:
# Build the 5-class network selected by the architecture flags.
if not is_hybrid_model:
    model = model_creation.model_from_tf(input_shape=input_shape, num_classes=5, is_transfer_learning=False)
elif is_hybrid_model_main_only:
    model = model_creation.main_model_only(input_shape=input_shape, num_classes=5, blocks=4)
else:
    model = model_creation.final_model(input_shape=input_shape, num_classes=5, blocks=4)

In [None]:
# Adam at a learning rate of 1e-3 with categorical cross-entropy; METRICS is the
# list of Keras metrics defined earlier in the notebook.
model.compile(
    loss=tf.keras.losses.CategoricalCrossentropy(),
    metrics=METRICS,
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
)
# plot_model(model, show_shapes=True)

In [None]:
if not is_test_only:
    # No batch_size here: fit() is given no separate targets, so train_iterator must
    # already yield (inputs, targets) batches, and Keras documents that batch_size
    # should not be specified for such inputs. The batch size is the one handed to
    # generate_iterator when the iterators were built.
    history = model.fit(x=train_iterator,
                        epochs=epochs,
                        validation_data=validation_iterator,
                        callbacks=callbacks_for_enhanced)

In [None]:
if not is_test_only:
    import numpy as np

    # Persist the per-epoch metric dict in the current working directory.
    # To read it back:
    #     history1 = np.load('history.npy', allow_pickle=True).item()
    np.save(file='history.npy', arr=history.history)

In [None]:
# Plot the curves of the run that just finished; skipped in test-only mode because
# `history` is only created by the training cell.
if not is_test_only:
    metrics_and_plots.plot_metrics(history)

In [None]:
# Load the weights stored at the checkpoint path (the file ModelCheckpoint writes with
# save_best_only=True). In test-only mode this file must already exist from an earlier run.
model.load_weights(filepath_for_enhanced)

In [None]:
# Class scores for every test sample. batch_size is not passed: the batch size was
# fixed when test_iterator was built, and Keras documents that batch_size should
# not be specified for generator-style inputs.
test_predictions_baseline = model.predict(test_iterator)

# One-hot encode the ground truth with the same number of columns as the predictions,
# so y_test and the scores line up even if the highest class id is absent from the
# test split (to_categorical would otherwise infer a narrower matrix).
y_test = to_categorical(test_iterator.labels,
                        num_classes=test_predictions_baseline.shape[-1])

In [None]:
# Evaluate on the test split. return_dict=True pairs every value with its metric
# name directly, instead of relying on model.metrics_names (which on Keras 3 no
# longer lists the individual compiled metrics). batch_size is omitted because the
# iterator already defines it.
baseline_metrics = model.evaluate(test_iterator, verbose=0, return_dict=True)
# Keep the plain list of values available under the original name.
baseline_results = list(baseline_metrics.values())
for name, value in baseline_metrics.items():
    print(name, ': ', value)
print('.......................................')

In [None]:
# Compute ROC curve and ROC area for each class

y_score = test_predictions_baseline
n_classes = 5
# Pass the named constant rather than repeating the literal, so the class count
# is defined in exactly one place in this cell.
metrics_and_plots.plot_roc(y_test, y_score, n_classes)

In [None]:
from sklearn.metrics import confusion_matrix, accuracy_score, classification_report, f1_score, ConfusionMatrixDisplay

In [None]:
def predict_label_by_category(predict_categorical):
    """Yield, for each row, the index of its largest entry.

    Args:
        predict_categorical: iterable of score vectors, e.g. one-hot labels or
            per-class probabilities (one vector per sample).

    Yields:
        The arg-max index of each row as a NumPy integer; ties resolve to the
        first maximal position.
    """
    # Imported locally because numpy is otherwise only imported conditionally in
    # this notebook. Using NumPy avoids one eager TensorFlow op and a .numpy()
    # round trip per sample.
    import numpy as np

    for result in predict_categorical:
        yield np.argmax(result, axis=0)

In [None]:
# Collapse the one-hot ground truth and the predicted score rows into plain
# lists of class indices.
test_label = [*predict_label_by_category(y_test)]
predict_label = [*predict_label_by_category(y_score)]

In [None]:
# Report the F1 score under each averaging mode, one per line.
print("\n".join(
    f"F1 score - {average}: {f1_score(test_label, predict_label, average=average)}"
    for average in ("micro", "macro", "weighted")
))