# Import packages

In [None]:
import os
import random
import pandas as pd
import numpy as np
import cv2
import albumentations as A
import seaborn as sns
sns.set_style('darkgrid')
from PIL import Image

from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix
from sklearn.metrics import matthews_corrcoef
from sklearn.metrics import accuracy_score
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import f1_score
from sklearn.metrics import roc_curve
from sklearn.metrics import auc

import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras import Sequential
from tensorflow.keras import regularizers, layers

# Setup datasets

In [None]:
dataset_name = "mias"
dataset_paths = [
    '/kaggle/input/mias-preprocessed-datasets/datasets/clasify'
]

In [None]:
IMG_SIZE = 224
HIDDEN_LAYERS = [256, 256, 128]
USE_HIDDEN_LAYERS = True
USE_AUGUMENTATION = False
INPUT_SHAPE = (IMG_SIZE, IMG_SIZE, 3)
BATCH_SIZE = 8
EPOCHS = 100

In [None]:
from collections import defaultdict

count_dist = defaultdict(int)
images = []
labels = []

# Spliting by my self later
for path in dataset_paths:
    for dirpath, _, filenames in os.walk(path):
        for filename in filenames:
            
            image = cv2.imread(os.path.join(dirpath, filename))
            image = cv2.resize(image, (IMG_SIZE, IMG_SIZE), interpolation=cv2.INTER_AREA)
            label = dirpath.split('/')[-1]
            
            count_dist[label] += 1
            images.append(image)
            labels.append(label)
            
count_dist

In [None]:
def visualize_datasets(images, labels, k=4, seed=42):
    # visualize datasets
    plt.figure(figsize=(20, 20))
    random.seed(seed)
    samples = random.sample(list(range(len(images))), k)
    for stt, i in enumerate(samples):
        plt.subplot(2, k, stt + 1)
        plt.imshow(images[i])
        class_name = labels[i]
        plt.title(class_name, color = 'blue' , fontsize=12)
        plt.axis('off')

    plt.show()

visualize_datasets(images, labels)

# Data augumentation

In [None]:
img_augmentation_layers = [
    layers.RandomRotation(factor=0.15),
    layers.RandomFlip(),
    layers.RandomContrast(factor=0.1),
]

def img_augmentation(images, k=1):
    results = []
    for i in range(k):
        x = images
        for layer in img_augmentation_layers:
            x = layer(x)
        results.extend(x)
    
    return results

In [None]:
data_dict = defaultdict(list)

for i, img in enumerate(images):
    data_dict[labels[i]].append(img)

aug_dict = defaultdict(list)
aug_dict['NORMAL'] = img_augmentation(data_dict['NORMAL'], k=10)
aug_dict['B'] = img_augmentation(data_dict['B'], k=40)
aug_dict['M'] = img_augmentation(data_dict['M'], k=40)

all_images = images
all_labels = labels
for label, items in aug_dict.items():
    size = len(items)
    aug_labels = [label for i in range(size)]
    all_images.extend(items)
    all_labels.extend(aug_labels)

len(all_images), len(all_labels)

# Encode datasets

In [None]:
classes = ['NORMAL', 'B', 'M']
num_classes = 3

In [None]:
# Convert labels to numpy array
x = np.stack(images, axis=0)
y = np.array([classes.index(label) for label in labels])

x.shape, y.shape

# Split datasets

In [None]:
def count_labels(labels):
    count_dict = defaultdict(int)
    for idx in labels:
        count_dict[classes[idx]] += 1
    return count_dict

In [None]:
# Split the data into training and remaining sets (validation + test)
x_train, x_temp, y_train, y_temp = train_test_split(x, y, test_size=0.2, random_state=42, stratify=y)
# Split the remaining data into validation and test sets
x_val, x_test, y_val, y_test = train_test_split(x_temp, y_temp, test_size=0.5, random_state=42, stratify=y_temp)

[
    x_train.shape, 
    x_val.shape, 
    x_test.shape, 
    y_train.shape , 
    y_val.shape , 
    y_test.shape, 
    count_labels(y_train),
    count_labels(y_val),
    count_labels(y_test), 
]

# Helper functions

In [None]:
def plot_acc(model_history, epochs, name):
    print('\n\n')
    plt.figure(figsize=(12,8))
    plt.plot(np.arange(0, epochs), model_history.history["accuracy"], label="train_acc")
    plt.plot(np.arange(0, epochs), model_history.history["val_accuracy"], label="val_acc")
    plt.title("Training Accuracy - {}".format(name))
    plt.xlabel("Epoch")
    plt.ylabel("Accuracy")
    plt.legend()
    plt.show()

In [None]:
def plot_loss(model_history, epochs, name):
    print('\n\n')
    plt.figure(figsize=(12,8))
    plt.plot(np.arange(0, epochs), model_history.history["loss"], label="train_loss", )
    plt.plot(np.arange(0, epochs), model_history.history["val_loss"], label="val_loss")
    plt.title("Training Loss - {}".format(name))
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.legend()
    plt.show()

In [None]:
# Function to plot confusion matrix
def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """
    This function prints and plots the confusion matrix.
    Normalization can be applied by setting `normalize=True`.
    """
    
    # plot the confusion matrix
    class_count = len(classes)
    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
        
    plt.figure(figsize=(12, 8))
    sns.heatmap(cm, annot=True, vmin=0, fmt='g', cmap='Blues', cbar=False)       
    plt.xticks(np.arange(class_count)+.5, classes, rotation=90)
    plt.yticks(np.arange(class_count)+.5, classes, rotation=0)
    plt.xlabel("Predicted")
    plt.ylabel("Actual")
    plt.title(title)
    plt.show()

In [None]:
def evaluate(model, x, y):
    scores = model.evaluate(x, y, verbose=1)
    return scores

In [None]:
def predict_prob(model):
    return model.predict(x_test, batch_size=BATCH_SIZE, verbose=1)

In [None]:
def predict(model):
    predictions = predict_prob(model)
    return np.argmax(predictions, axis=1)

In [None]:
def calculate_metrics(y_true, y_pred):
    
    print("Visualize: y_true, y_pred top 20")
    print('Y_true', [i for i in y_true[:20]])
    print('Y_pred', [j for j in y_pred[:20]])

    # precision tp / (tp + fp)
    precision = precision_score(y_true, y_pred, average='weighted')
    print("Precision: {}".format(precision))

    # recall: tp / (tp + fn)
    recall = recall_score(y_true, y_pred, average='weighted')
    print("Recall:    {}".format(recall))

    # f1: 2 tp / (2 tp + fp + fn)
    f1 = f1_score(y_true, y_pred, average='weighted')
    print("F1:        {}".format(f1))

# Setup Transfer Learning

In [None]:
def transfer_learning(model, name):
    
    best_weights_ph1 = f"{dataset_name}_{name}_ph1_weights.keras"
    
    callbacks_checkpoint = tf.keras.callbacks.ModelCheckpoint(
        filepath = best_weights_ph1,
        monitor = "val_accuracy",
        mode = "max",
        save_weights_only=True,
        save_best_only = True,
        verbose=1, # Logging when callback running
    )
    optimizer = tf.keras.optimizers.Adam(learning_rate=1e-2)
    model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    history = model.fit(
        x_train,
        y_train,
        batch_size=BATCH_SIZE,
        validation_data=(x_val, y_val),
        validation_batch_size=BATCH_SIZE,
        epochs = EPOCHS,
        callbacks = [callbacks_checkpoint]
    )
    
    acc_max = max(history.history["accuracy"])
    acc_min = min(history.history["accuracy"])
    print("Training Acc:", [acc_min, acc_max])
    
    val_acc_max = max(history.history["val_accuracy"])
    val_acc_min = min(history.history["val_accuracy"])
    print("Validation Acc:", [val_acc_min, val_acc_max])
    
    best_idx = np.argmax(history.history["val_accuracy"])
    print('The best val_acc result expected at epoch {} with metrics: '.format(best_idx))
    for k, vals in history.history.items():
        print('{}: {}'.format(k, vals[best_idx]))
    
    print('\nRestoring best weights and predicting validation set.')
    model.load_weights(best_weights_ph1)
    
    loss, acc = evaluate(model, x_test, y_test)
    print('Transfer Learning test scores (loss, acc):', [loss, acc])
    
    plot_acc(history, EPOCHS, f"\n Transfer Learning - ACC: {name} PhA.")
    plot_loss(history, EPOCHS, f"\n Transfer Learning - LOSS: {name} PhA.")
    
    y_pred = predict(model)
    return history, model, val_acc_max, y_pred

# Setup fine tuning

In [None]:
y_train

In [None]:
def fine_turning(model, name, acc_ph1):
    
    best_weights_ph2 = f"{dataset_name}_{name}_ph2_weights.keras"
    callbacks_checkpoint = tf.keras.callbacks.ModelCheckpoint(
        filepath = best_weights_ph2,
        monitor = "val_accuracy",
        mode = "max",
        save_weights_only=True,
        save_best_only = True,
        verbose=1, # Logging when callback running
    )
    
    for layer in model.layers[-20:]:
        if not isinstance(layer, layers.BatchNormalization):
            layer.trainable = True
        else:
            layer.trainable = False

    optimizer = tf.keras.optimizers.Adam(learning_rate=1e-5)
    model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    history = model.fit(
        x_train, 
        y_train,
        batch_size=BATCH_SIZE,
        validation_data=(x_val, y_val),
        validation_batch_size=BATCH_SIZE,
        epochs = EPOCHS,
        callbacks = [callbacks_checkpoint]
    )
    
    acc_max = max(history.history["accuracy"])
    acc_min = min(history.history["accuracy"])
    print("Training Acc:", [acc_min, acc_max])
    
    val_acc_max = max(history.history["val_accuracy"])
    val_acc_min = min(history.history["val_accuracy"])
    print("Validation Acc:", [val_acc_min, val_acc_max])
    
    best_idx = np.argmax(history.history["val_accuracy"])
    print('The best val_acc result expected at epoch {} with metrics: '.format(best_idx))
    for k, vals in history.history.items():
        print('{}: {}'.format(k, vals[best_idx]))
    
    print('Restoring best weights of Ph2 and predicting test set.')
    model.load_weights(best_weights_ph2)
    loss, acc = evaluate(model, x_test, y_test)
    print('Fine Tuning test scores (loss, acc):', [loss, acc])
    
    if val_acc_max < acc_ph1:
        print('\nPhase 2 resulted in lower accuracy than Phase 1.')
    
    plot_acc(history, EPOCHS, f"\n Fine Turning - ACC: {name} PhB.")
    plot_loss(history, EPOCHS, f"\n Fine Turning - LOSS: {name} PhB.")
    
    y_pred = predict(model)
    return history, model, acc, y_pred

In [None]:
initial_models = dict(
    EfficientNetB3=tf.keras.applications.EfficientNetB3,
    ResNet50=tf.keras.applications.resnet50.ResNet50,
    MobileNet=tf.keras.applications.mobilenet.MobileNet,
    InceptionV3=tf.keras.applications.inception_v3.InceptionV3
)

base_model_kwargs = dict(
    include_top=False,
    weights='imagenet',
    input_shape=(IMG_SIZE, IMG_SIZE, 3),
)

# custom kwargs for each model here 
initial_models_kwargs = dict(
    EfficientNetB3={ **base_model_kwargs },
    ResNet50={ **base_model_kwargs },
    MobileNet={ **base_model_kwargs },
    InceptionV3={ **base_model_kwargs }
)

In [None]:
for name, Model in initial_models.items():
    
    base_model = Model(**initial_models_kwargs[name])
    base_model.trainable = False
        
    output = base_model.output
    top_layers = [
        layers.GlobalMaxPooling2D(),
        layers.BatchNormalization(),
        layers.Dropout(0.2),
    ]
    
    if USE_HIDDEN_LAYERS:
        for i, layer in enumerate(HIDDEN_LAYERS):
            top_layers.append(layers.Dense(layer, activation='relu'))
            top_layers.append(layers.BatchNormalization())
            top_layers.append(layers.Dropout(0.15))
    
    top_layers.append(layers.Dense(len(classes), activation='softmax'))
    
    for layer in top_layers:
        output = layer(output)
    
    model = tf.keras.models.Model(base_model.input, output, name=name)
    
    print(f'\n\n ==========Start Process with model {name}=========')
    # model.summary()
    
    history, model, best_acc_ph1, y_pred = transfer_learning(model, name)
    calculate_metrics(y_test, y_pred)
    cm = confusion_matrix(y_test, y_pred)
    plot_confusion_matrix(cm, classes, title=f"Confusion matrix for {name} - Transfer Learning")
    
    
    if best_acc_ph1 < 1.00:
        history, model, best_acc_ph2, y_pred = fine_turning(model, name, best_acc_ph1)
        calculate_metrics(y_test, y_pred)
        cm = confusion_matrix(y_test, y_pred)
        plot_confusion_matrix(cm, classes, title=f"Confusion matrix for {name} - Fine Turnning")
        
    else:
        print('Transfer learning have 100% accuracy so no need to do fine-turning.')
    
    print(f'==========End Process with model {name}==========\n\n')