# In [None]:  (notebook cell marker left over from the Jupyter export)
# FILE DESCRIPTION: -------------------------------------------------------

# This file builds, trains, and evaluates a basic CNN model and six pretrained
# models using transfer learning. The pretrained models are EfficientNetB0,
# ResNet50, VGG16, MobileNetV2, Xception, and DenseNet201.

# --------------------------------------------------------------------------



# ----------- IMPORTS ----------------

# Libraries for building convolutional neural network and transfer learning
import tensorflow as tf
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import ResNet50, EfficientNetB0, VGG16, MobileNetV2, Xception, DenseNet201

# Libraries for evaluating performance metrics
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import label_binarize
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay
from sklearn.metrics import roc_curve, auc, roc_auc_score, RocCurveDisplay
from sklearn.metrics import accuracy_score

# Utility libraries
import os
from IPython.display import display
import time
from itertools import cycle



# ----------- CONSTANTS ----------------

# Directory structure
# Relative paths, resolved against the current working directory. Each one is
# handed to DATAGEN.flow_from_directory by load_data.
TRAIN_DIR = "PROCESSED_DATA/TRAINING_DATA/TRAINING_AUGMENTED_DATA"
VALID_DIR = "PROCESSED_DATA/VALIDATION_DATA/"
TEST_DIR = "PROCESSED_DATA/TEST_DATA/"

# Image parameters
IMG_SIZE = (224, 224)  # passed as target_size when images are loaded
BATCH_SIZE = 32  # passed as batch_size when images are loaded

# Load in data
# One shared generator for every split; rescale multiplies pixel values by 1/255.
# NOTE(review): the pretrained Keras application models each document their own
# expected input preprocessing -- confirm a plain 1/255 rescale is intended for
# all of them.
DATAGEN = ImageDataGenerator(rescale=1./255)

# Training parameters
# Full list of pretrained models supported by build_transfer_learning:
#MODEL_OPTIONS = ["ResNet50", "EfficientNet", "VGG16", "MobileNetV2", "Xception", "DenseNet201"]
# Subset that main() actually trains
MODEL_OPTIONS = ["ResNet50", "EfficientNet"]
DEFAULT_LR = 0.0001  # default Adam learning rate in full_build_cnn
DEFAULT_EPOCHS = 1  # default number of training epochs
DEFAULT_DROPOUT = 0.5  # default dropout rate before the softmax layer

# Sound settings
PLAY_SOUND_FLAG = True

# ----------- DATA LOADING FUNCTION ----------------

def load_data(directory, shuffle_flag=True):
    """
    Creates a batch generator over the images stored in a directory

    Parameters:
        directory: path handed to DATAGEN.flow_from_directory
        shuffle_flag: forwarded as the generator's shuffle option. Defaults to True.

    Returns:
        the iterator returned by DATAGEN.flow_from_directory, configured with
        IMG_SIZE, BATCH_SIZE and categorical class labels
    """

    flow_options = {
        "target_size": IMG_SIZE,
        "batch_size": BATCH_SIZE,
        "class_mode": 'categorical',
        "shuffle": shuffle_flag,
    }
    return DATAGEN.flow_from_directory(directory, **flow_options)

# Initialize data generators
# These run at module level, so the directories are read as soon as the file
# is executed or imported.
TRAIN_GENERATOR = load_data(TRAIN_DIR)  # shuffled (load_data default)
VAL_GENERATOR = load_data(VALID_DIR)  # shuffled; used by model.fit and evaluate_model
TEST_GENERATOR = load_data(TEST_DIR,shuffle_flag=False)  # unshuffled
# Unshuffled copy of the validation data: evaluation_metrics compares
# predictions against generator.classes, which requires a fixed sample order.
EVAL_VAL_GENERATOR = load_data(VALID_DIR, shuffle_flag=False)



# ----------- MODEL BUILDING Functions -------------------

def build_cnn(dropout_percent: float = DEFAULT_DROPOUT) -> tf.keras.Model:
    """
    Builds a standard convolutional neural network model

    The network is three Conv2D + MaxPooling2D stages (32, 64, 128 filters)
    followed by a 256-unit dense layer, dropout, and a softmax output with one
    unit per class found by TRAIN_GENERATOR.

    Parameters:
        dropout_percent (float, optional): The percentage of dropout. Defaults to DEFAULT_DROPOUT.

    Returns:
        a keras CNN model (not yet compiled)
    """

    # Derive the input shape from IMG_SIZE so the model always matches the size
    # the data generators resize images to; 3 is the number of colour channels.
    input_shape = (*IMG_SIZE, 3)

    model = Sequential([
        Conv2D(32, (3, 3), activation='relu', padding='same', input_shape=input_shape),
        MaxPooling2D(2, 2),
        Conv2D(64, (3, 3), activation='relu', padding='same'),
        MaxPooling2D(2, 2),
        Conv2D(128, (3, 3), activation='relu', padding='same'),
        MaxPooling2D(2, 2),
        Flatten(),
        Dense(256, activation='relu'),
        Dropout(dropout_percent),
        Dense(TRAIN_GENERATOR.num_classes, activation='softmax')
    ])
    return model

def build_transfer_learning(model_name: str, dropout_percent: float = DEFAULT_DROPOUT) -> tf.keras.Model:
    """
    Builds and implements a transfer learning model using a pretrained model

    The pretrained network is loaded with ImageNet weights and without its
    classification head, frozen, and topped with Flatten -> Dense(512) ->
    Dropout -> softmax (one unit per class found by TRAIN_GENERATOR).

    Parameters:
        model_name - str: name of pretrained model (e.g., DenseNet201)
        dropout_percent - float: percentage of dropout

    Returns:
        a keras transfer learning model (not yet compiled)

    Raises:
        ValueError: if model_name is not one of the supported names
    """

    # Dispatch table: supported name -> keras.applications constructor
    base_model_builders = {
        'ResNet50': ResNet50,
        'EfficientNet': EfficientNetB0,
        'VGG16': VGG16,
        'MobileNetV2': MobileNetV2,
        'Xception': Xception,
        'DenseNet201': DenseNet201,
    }

    if model_name not in base_model_builders:
        raise ValueError("Invalid model name. Choose from 'ResNet50', 'EfficientNet', 'VGG16', 'MobileNetV2', 'Xception', or 'DenseNet201'")

    # Derive the input shape from IMG_SIZE so the model always matches the size
    # the data generators resize images to; 3 is the number of colour channels.
    # NOTE(review): each Keras application documents its own input
    # preprocessing, while DATAGEN only rescales pixels by 1/255 -- confirm
    # that this matches what the selected base model expects.
    base_model = base_model_builders[model_name](
        weights='imagenet',
        include_top=False,
        input_shape=(*IMG_SIZE, 3),
    )

    # freeze the base model layers so only the new head is trained
    base_model.trainable = False

    x = Flatten()(base_model.output)
    x = Dense(512, activation='relu')(x)
    x = Dropout(dropout_percent)(x)

    output_layer = Dense(TRAIN_GENERATOR.num_classes, activation='softmax')(x)

    model = Model(inputs=base_model.input, outputs=output_layer)

    return model


def _play_completion_sound(repeat_times: int = 3) -> None:
    """Plays the macOS "Glass" system sound repeat_times times; does nothing on other platforms."""
    import sys  # local import: sys is not among this file's top-level imports

    # afplay ships only with macOS. Checking os.name == 'posix' is not enough
    # because it is also true on Linux, where the command would simply fail.
    if sys.platform != 'darwin':
        return

    for _ in range(repeat_times):
        os.system('afplay /System/Library/Sounds/Glass.aiff')  # macOS
        # Linux users: os.system('aplay /path/to/sound.wav')


def full_build_cnn(
    use_transfer_learning: bool = False,
    transfer_model_name: str = 'ResNet50',
    lr: float = DEFAULT_LR,
    metrics_lst: list = None,
    val_generator = VAL_GENERATOR,
    epoch_num: int = DEFAULT_EPOCHS,
    play_sound_flag: bool = True,
    dropout_percent: float = DEFAULT_DROPOUT
) -> tuple:
    """
    Builds and trains a CNN model

    Parameters:
        use_transfer_learning - bool: indicating if this model should use transfer learning
        transfer_model_name - str: pretrained model name (only used with transfer learning)
        lr - float: denoting learning rate
        metrics_lst - list: list of metrics to use in model compilation;
            None (the default) means ['accuracy']
        val_generator: validation data generator from load_data function
        epoch_num - int: number of epochs
        play_sound_flag - bool: plays sound after training is done (macOS only)
        dropout_percent - float: dropout percentage

    Returns:
        model - keras model object
        training_history - keras model training history
    """

    # None sentinel instead of a mutable list default; copy so the caller's
    # list is never shared with the compiled model.
    metrics = ['accuracy'] if metrics_lst is None else list(metrics_lst)

    if use_transfer_learning:
        model = build_transfer_learning(transfer_model_name, dropout_percent=dropout_percent)
    else:
        model = build_cnn(dropout_percent=dropout_percent)

    # compile the model
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=lr),
        loss='categorical_crossentropy',
        metrics=metrics,
    )

    # training
    training_history = model.fit(
        TRAIN_GENERATOR,
        validation_data=val_generator,
        epochs=epoch_num
    )

    # Sound that plays after training model is finished
    if play_sound_flag:
        _play_completion_sound()

    return model, training_history


def evaluate_model(model: tf.keras.Model, filename: str = "pest_classifier_cnn.h5") -> tuple:
    """
    Scores a model on VAL_GENERATOR, prints the scores, and saves the model to file

    Parameters:
        model - keras model: model to evaluate.
        filename - str: path the model is saved to

    Returns:
        tuple: (validation loss, validation accuracy)
    """

    # The unpacking below requires model.evaluate to return exactly two values
    scores = model.evaluate(VAL_GENERATOR)
    val_loss, val_acc = scores

    print(f"Validation Accuracy: {val_acc:.4f}")
    print(f"Validation Loss: {val_loss:.4f}")

    # Persist the model only after it has been evaluated successfully
    model.save(filename)

    return val_loss, val_acc



# ----------- MODEL EVALUATION Functions ------

def create_classification_report(y_true, y_pred, class_indices: dict) -> pd.DataFrame:
    """
    Creates a classification report

    Parameters:
        y_true: true class labels (integer class indices)
        y_pred: predicted class labels (integer class indices)
        class_indices: mapping of class name -> integer class index

    Function:
        generates a classification report including precision, recall, F1-score, and accuracy for each class
        outputs the report as a DataFrame for further analysis

    Returns:
        dataframe - classification report
    """

    # Order the names by their class index rather than by dict order, and pass
    # the indices explicitly as `labels`. Without `labels`, sklearn infers the
    # classes from the data and raises when a class is missing from both
    # y_true and y_pred, because the number of target names no longer matches.
    ordered = sorted(class_indices.items(), key=lambda item: item[1])
    class_names = [name for name, _ in ordered]
    class_ids = [index for _, index in ordered]

    report = classification_report(
        y_true,
        y_pred,
        labels=class_ids,
        target_names=class_names,
        output_dict=True,
    )
    report_df = pd.DataFrame(report).transpose()
    print("Classification Report:")
    display(report_df)
    return report_df

    
def plot_confusion_matrix(y_true, y_pred, class_indices: dict) -> None:
    """
    Plots a confusion matrix

    Parameters:
        y_true: true class labels (integer class indices)
        y_pred: predicted class labels (integer class indices)
        class_indices: mapping of class name -> integer class index

    Function:
        Plots a confusion matrix with one row/column per entry of class_indices
    """

    # Order names by class index and fix the matrix layout with `labels`, so
    # the matrix always has one row/column per class even when a class is
    # absent from the data (otherwise display_labels would not match).
    ordered = sorted(class_indices.items(), key=lambda item: item[1])
    class_names = [name for name, _ in ordered]
    class_ids = [index for _, index in ordered]

    # Confusion matrix
    cm = confusion_matrix(y_true, y_pred, labels=class_ids)

    # Plot confusion matrix
    # Pass the axes explicitly: without `ax`, disp.plot() creates its own
    # figure, leaving a blank 8x6 figure behind and ignoring the requested size.
    _, ax = plt.subplots(figsize=(8, 6))
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_names)
    disp.plot(cmap=plt.cm.Blues, colorbar=True, ax=ax, xticks_rotation='vertical')
    ax.set_title('Confusion Matrix')
    plt.show()


def _plot_train_val_curve(train_values, val_values, metric_name: str) -> None:
    """Draws one figure comparing a training series with its validation series."""
    plt.figure(figsize=(8, 6))
    # Each series gets its own x-axis so a missing or shorter series cannot
    # trigger a shape-mismatch error in plt.plot.
    plt.plot(range(len(train_values)), train_values, 'bo', label=f'Training {metric_name}')
    plt.plot(range(len(val_values)), val_values, 'b', label=f'Validation {metric_name}')
    plt.title(f'Training and Validation {metric_name}')
    plt.xlabel('Epochs')
    plt.ylabel(metric_name)
    plt.legend()
    plt.show()


def plot_loss_curves(training_history: tf.keras.callbacks.History) -> None:
    """
    Plots two plots: training and validation accuracy and loss curves

    Parameters:
        training_history: object from model.fit() training history containing metrics accuracy and loss

    Function:
        plots training and validation accuracy and loss curves to evaluate model performance
        over the recorded epochs; a metric missing from the history is drawn as an empty series
    """

    history = training_history.history

    # Plot training validation accuracy curve
    _plot_train_val_curve(history.get('accuracy', []), history.get('val_accuracy', []), 'Accuracy')

    # Plot training validation loss curve
    _plot_train_val_curve(history.get('loss', []), history.get('val_loss', []), 'Loss')


def plot_roc_curve(y_true, y_pred_probs, class_indices: dict) -> float:
    """
    Draws per-class ROC curves and reports the macro-averaged OvR ROC AUC

    Parameters:
        y_true: true class labels
        y_pred_probs: predicted probabilities, indexed as [sample, class]
        class_indices - dict: class mapping; only its length is used here

    Function:
        plots one receiver operating characteristic (ROC) curve per class on a shared axis,
        adds the diagonal chance line, and prints the macro-averaged one vs rest (OvR) ROC AUC score

    Returns:
        macro averaged one vs rest ROC AUC score
    """

    # ROC AUC reference: https://scikit-learn.org/stable/auto_examples/model_selection/plot_roc.html
    # Binarize the labels so each class can be scored one-vs-rest
    class_ids = list(range(len(class_indices)))
    one_hot_truth = label_binarize(y_true, classes=class_ids)
    n_classes = one_hot_truth.shape[1]

    figure, axes = plt.subplots(figsize=(8, 6))
    palette = cycle(["aqua", "darkorange", "cornflowerblue", "red", "purple", "green", "gold", "deeppink", "brown", "gray", "navy"])

    # One curve per class; colours repeat once the palette is exhausted
    for class_idx in range(n_classes):
        RocCurveDisplay.from_predictions(
            one_hot_truth[:, class_idx],
            y_pred_probs[:, class_idx],
            name=f"Class {class_idx}",
            color=next(palette),
            ax=axes,
        )

    # Macro average reference: https://scikit-learn.org/stable/auto_examples/model_selection/plot_roc.html
    # Macro average ROC AUC score using OvR strategy
    macro_roc_auc_ovr = roc_auc_score(y_true, y_pred_probs, multi_class="ovr", average="macro")

    # ROC AUC plot reference: https://scikit-learn.org/1.1/auto_examples/model_selection/plot_roc.html
    # Add the chance diagonal, then label and show the figure
    axes.plot([0, 1], [0, 1], "k--", label="Chance Level (0.5)")
    axes.set_xlabel("False Positive Rate")
    axes.set_ylabel("True Positive Rate")
    axes.set_title("ROC Curve")
    axes.legend(loc="lower right")
    plt.show()

    print(f"Macro-averaged One-vs-Rest ROC AUC score: {macro_roc_auc_ovr:.2f}")
    return macro_roc_auc_ovr

    
def evaluation_metrics(model: tf.keras.Model, generator, training_history: tf.keras.callbacks.History) -> dict:
    """
    Evaluates trained model and returns evaluation metrics

    Parameters:
        model: trained model
        generator: data generator for the evaluation set; must be unshuffled so that
            generator.classes lines up with the order of the predictions
        training_history: object from model.fit() training history containing metrics accuracy and loss

    Function:
        combines evaluation metrics (classification report, confusion matrix, training curves, and ROC curve)
        outputs key metrics: accuracy, precision, recall, and F1-score

    Returns:
        dictionary containing key metrics:
            accuracy: model accuracy on the evaluation data
            precision: macro averaged precision score
            recall: macro averaged recall score
            f1_score: macro averaged F1 score
            macro_roc_auc_ovr: macro averaged one vs rest ROC AUC score
            classification_report_df: classification report as a dataframe

    Outputs:
        confusion matrix plot
        loss plots
        macro average ROC curve plot
        macro averaged one vs rest ROC AUC score

    Raises:
        ValueError: if the generator reports shuffle=True
    """

    # generator.classes is in fixed order; a shuffled generator yields samples
    # in a different order, so every metric below would compare mismatched
    # label/prediction pairs. Fail loudly instead of reporting wrong numbers.
    if getattr(generator, "shuffle", False):
        raise ValueError(
            "evaluation_metrics requires an unshuffled generator "
            "(create it with shuffle_flag=False) so labels align with predictions"
        )

    # Get true labels
    y_true = generator.classes

    # Generate predictions
    y_pred_probs = model.predict(generator)
    y_pred = np.argmax(y_pred_probs, axis=1)
    class_indices = generator.class_indices

    # Calculate accuracy
    accuracy = accuracy_score(y_true, y_pred)

    # Classification report
    report_df = create_classification_report(y_true, y_pred, class_indices)

    # Confusion matrix
    plot_confusion_matrix(y_true, y_pred, class_indices)

    # Loss curves
    plot_loss_curves(training_history)

    # ROC AUC OvR score
    macro_roc_auc_ovr = plot_roc_curve(y_true, y_pred_probs, class_indices)

    # Get macro-averaged metrics from the classification report
    macro_row = report_df.loc["macro avg"]
    precision = round(macro_row["precision"], 3)
    recall = round(macro_row["recall"], 3)
    macro_f1 = round(macro_row["f1-score"], 3)

    # Print key metrics
    print(f"Accuracy: {accuracy:.3f}")
    print(f"Precision: {precision}")
    print(f"Recall: {recall}")
    print(f"F1 Score: {macro_f1}")

    return {
        "accuracy": round(accuracy, 3),
        "precision": precision,
        "recall": recall,
        "f1_score": macro_f1,
        # Previously computed and printed but then discarded
        "macro_roc_auc_ovr": round(macro_roc_auc_ovr, 3),
        "classification_report_df": report_df
    }


def main():
    """
    Main function to run, train, and evaluate the basic CNN and every
    transfer learning model listed in MODEL_OPTIONS

    Each model is trained for DEFAULT_EPOCHS epochs, saved to its own .h5 file,
    and evaluated on the unshuffled validation generator.

    Returns:
        dict mapping a model label ("Basic_CNN" or the pretrained model name)
        to the metrics dictionary returned by evaluation_metrics
    """

    # Store the results of the trained models
    results = {}

    # Train and evaluate the basic CNN model
    print("Training Basic CNN Model...")
    basic_cnn_model, basic_training_history = full_build_cnn(
        epoch_num=DEFAULT_EPOCHS,
        play_sound_flag=PLAY_SOUND_FLAG,  # honour the module-level sound setting
    )
    # Called for its side effects: prints validation loss/accuracy and saves the model
    evaluate_model(basic_cnn_model, filename="pest_classifier_cnn_basic.h5")
    results["Basic_CNN"] = evaluation_metrics(basic_cnn_model, EVAL_VAL_GENERATOR, basic_training_history)

    # Train and evaluate each transfer learning model
    for model_name in MODEL_OPTIONS:
        print(f"\nTraining {model_name} Transfer Learning Model...")
        model, training_history = full_build_cnn(
            use_transfer_learning=True,
            transfer_model_name=model_name,
            epoch_num=DEFAULT_EPOCHS,
            play_sound_flag=PLAY_SOUND_FLAG,
        )
        filename = f"pest_classifier_cnn_{model_name.lower()}.h5"
        evaluate_model(model, filename=filename)
        results[model_name] = evaluation_metrics(model, EVAL_VAL_GENERATOR, training_history)

    print("Training and evaluation complete.")
    return results
# --------------------------------------------------------------------------
    # TEST CASE / EXPECTED RESULTS when this script is run:

        # For each model, output should contain classification report, confusion matrix, 
        # accuracy/loss curves, ROC curves, accuracy, precision, recall, and F1 score
        
        # One .h5 file per trained model: the basic CNN plus each model in MODEL_OPTIONS
        # (seven files when all six pretrained models are enabled)

    
        # time completion: ~5-6 hours
    # --------------------------------------------------------------------------



# Run the full train/evaluate pipeline only when this file is executed as a
# script, not when it is imported.
if __name__ == "__main__":
    main()
