<a href="https://www.kaggle.com/code/jedike/final-version-of-thesis?scriptVersionId=224392516" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

In [1]:

import os
import warnings
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
import seaborn as sns
from tensorflow.keras import layers, models, applications, callbacks, preprocessing, regularizers
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, roc_curve, auc, accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
from imblearn.over_sampling import RandomOverSampler

# Suppress warnings and set plot style
warnings.filterwarnings("ignore")
plt.style.use('ggplot')

# ===================== Configuration =====================
GS_BASE_PATH = '/kaggle/input/goldenhar/CAPSTR~1'
HEALTHY_PATH = '/kaggle/input/child-dataset/child/Healthy'
IMG_SIZE = (256, 256)
BATCH_SIZE = 32
SEED = 42
EPOCHS = 40

SYMPTOM_CLASSES = [
    'Cleft-Lip-and-Palate',
    'Epibulbar dermoid tumor',
    'Eyelid coloboma',
    'Facial asymmetry',
    'Malocclusion',
    'Microtia',
    'Vertebral abnormality'
]
NUM_CLASSES = len(SYMPTOM_CLASSES)

# Create directories for saving plots if they don't exist
os.makedirs('training_curves', exist_ok=True)
os.makedirs('confusion_matrices', exist_ok=True)
os.makedirs('roc_curves', exist_ok=True)

# ===================== Data Visualization =====================
def plot_class_distribution(y, title, filename):
    plt.figure(figsize=(10, 6))
    sns.countplot(x=y)
    plt.title(f'Class Distribution - {title}')
    plt.xlabel('Class')
    plt.ylabel('Count')
    plt.savefig(f'class_distribution_{filename}.png')
    plt.close()

def plot_training_curves(history, model_name):
    plt.figure(figsize=(12, 6))
    
    # Accuracy plot
    plt.subplot(1, 2, 1)
    plt.plot(history.history['accuracy'])
    plt.plot(history.history['val_accuracy'])
    plt.title(f'{model_name} Accuracy')
    plt.ylabel('Accuracy')
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Validation'], loc='lower right')
    
    # Loss plot
    plt.subplot(1, 2, 2)
    plt.plot(history.history['loss'])
    plt.plot(history.history['val_loss'])
    plt.title(f'{model_name} Loss')
    plt.ylabel('Loss')
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Validation'], loc='upper right')
    
    plt.tight_layout()
    plt.savefig(f'training_curves/{model_name}.png')
    plt.close()

def plot_confusion_matrix(y_true, y_pred, classes, model_name):
    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', 
                xticklabels=classes, yticklabels=classes)
    plt.title(f'Confusion Matrix - {model_name}')
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.savefig(f'confusion_matrices/{model_name}.png')
    plt.close()

def plot_roc_curve(y_true, y_score, model_name):
    fpr, tpr, _ = roc_curve(y_true, y_score)
    roc_auc = auc(fpr, tpr)
    
    plt.figure(figsize=(8, 6))
    plt.plot(fpr, tpr, color='darkorange', lw=2,
             label=f'ROC curve (AUC = {roc_auc:.2f})')
    plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title(f'ROC Curve - {model_name}')
    plt.legend(loc="lower right")
    plt.savefig(f'roc_curves/{model_name}.png')
    plt.close()

# ===================== Data Loading =====================
def load_dataset():
    gs_images, gs_labels = [], []
    for class_idx, class_name in enumerate(SYMPTOM_CLASSES):
        class_dir = os.path.join(GS_BASE_PATH, class_name)
        images = [os.path.join(class_dir, f) 
                 for f in os.listdir(class_dir)
                 if f.lower().endswith(('.png','.jpg','.jpeg'))]
        gs_images.extend(images)
        gs_labels.extend([class_idx] * len(images))
        print(f"Loaded {len(images)} {class_name} images")

    healthy_images = [os.path.join(HEALTHY_PATH, f) 
                     for f in os.listdir(HEALTHY_PATH)
                     if f.lower().endswith(('.png','.jpg','.jpeg'))]
    print(f"Loaded {len(healthy_images)} healthy images")
    
    return gs_images, gs_labels, healthy_images

# ===================== Data Augmentation =====================
def create_generators(X, y, model_type, is_training=False, is_test=False):
    # Convert labels to strings to satisfy flow_from_dataframe's requirement
    y = np.array(y).astype(str)
    df = pd.DataFrame({'filename': X, 'label': y})
    preprocess_fn = applications.efficientnet.preprocess_input

    if is_training:
        datagen = preprocessing.image.ImageDataGenerator(
            preprocessing_function=preprocess_fn,
            rotation_range=40,
            width_shift_range=0.3,
            height_shift_range=0.3,
            shear_range=0.2,
            zoom_range=0.3,
            brightness_range=[0.7, 1.3],
            horizontal_flip=True,
            vertical_flip=True,
            fill_mode='nearest',
            validation_split=0.2
        )
        subset = 'training'
    elif is_test:
        datagen = preprocessing.image.ImageDataGenerator(
            preprocessing_function=preprocess_fn
        )
        subset = None
    else:
        datagen = preprocessing.image.ImageDataGenerator(
            preprocessing_function=preprocess_fn,
            validation_split=0.2
        )
        subset = 'validation'

    return datagen.flow_from_dataframe(
        dataframe=df,
        x_col='filename',
        y_col='label',
        target_size=IMG_SIZE,
        batch_size=BATCH_SIZE,
        class_mode='binary' if model_type == 'binary' else 'sparse',
        subset=subset,
        seed=SEED,
        shuffle=is_training
    )

# ===================== Regularized Model Architecture =====================
def build_model(base_name, num_classes):
    base_models = {
        'EfficientNetB0': applications.EfficientNetB0,
        'ResNet50': applications.ResNet50,
        'DenseNet121': applications.DenseNet121
    }
    
    base_model = base_models[base_name](
        include_top=False,
        weights='imagenet',
        input_shape=IMG_SIZE + (3,)
    )
    
    # Freeze base model initially
    base_model.trainable = False
    
    model = models.Sequential([
        base_model,
        layers.GlobalAveragePooling2D(),
        layers.Dense(256, activation='relu', 
                    kernel_regularizer=regularizers.l2(0.001)),
        layers.BatchNormalization(),
        layers.Dropout(0.6),
        layers.Dense(128, activation='relu',
                    kernel_regularizer=regularizers.l2(0.001)),
        layers.Dropout(0.5),
        layers.Dense(1 if num_classes == 2 else num_classes, 
                    activation='sigmoid' if num_classes == 2 else 'softmax')
    ])
    
    optimizer = tf.keras.optimizers.Adam(learning_rate=1e-5)
    model.compile(
        optimizer=optimizer,
        loss='binary_crossentropy' if num_classes == 2 else 'sparse_categorical_crossentropy',
        metrics=['accuracy', 
                tf.keras.metrics.AUC(name='auc'),
                tf.keras.metrics.Precision(name='precision'),
                tf.keras.metrics.Recall(name='recall')]
    )
    
    return model

# ===================== Training Callbacks =====================
def get_callbacks(model_name):
    return [
        callbacks.EarlyStopping(
            patience=10,
            monitor='val_loss',
            restore_best_weights=True
        ),
        callbacks.ModelCheckpoint(
            f"best_{model_name}.keras",
            save_best_only=True,
            monitor='val_loss'
        ),
        callbacks.ReduceLROnPlateau(
            factor=0.2,
            patience=5,
            min_lr=1e-7
        )
    ]

# ===================== Training Pipeline =====================
def train_pipeline():
    gs_images, gs_labels, healthy_images = load_dataset()
    
    # Plot initial class distribution for binary classification (GS vs Healthy)
    plot_class_distribution(
        np.array([1]*len(gs_images) + [0]*len(healthy_images)),
        'Full Dataset (GS vs Healthy)', 'full_dataset'
    )
    
    # ----- Binary Classification Setup -----
    X = np.array(gs_images + healthy_images)
    y = np.array([1]*len(gs_images) + [0]*len(healthy_images))
    
    # Split data into training and test sets
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, stratify=y, random_state=SEED
    )
    
    # Calculate class weights for imbalanced data
    class_counts = np.bincount(y_train)
    class_weights = {0: 1/class_counts[0], 1: 1/class_counts[1]}
    
    print("\n=== Training GS Detection Models ===")
    models_list = ['EfficientNetB0', 'ResNet50', 'DenseNet121']
    
    for model_name in models_list:
        print(f"\nTraining {model_name} for GS detection...")
        
        # Create training and validation generators for binary classification
        train_gen = create_generators(X_train, y_train, 'binary', is_training=True)
        val_gen = create_generators(X_train, y_train, 'binary')
        
        # Build and train initial model
        model = build_model(model_name, 2)
        history = model.fit(
            train_gen,
            validation_data=val_gen,
            epochs=EPOCHS,
            class_weight=class_weights,
            callbacks=get_callbacks(f"gs_{model_name}"),
            verbose=2
        )
        plot_training_curves(history, f'gs_{model_name}_phase1')
        
        # Fine-tuning: unfreeze part of the base model
        model.layers[0].trainable = True
        for layer in model.layers[0].layers[:-10]:
            layer.trainable = False
            
        model.compile(
            optimizer=tf.keras.optimizers.Adam(1e-6),
            loss=model.loss,
            metrics=['accuracy',
                    tf.keras.metrics.AUC(name='auc'),
                    tf.keras.metrics.Precision(name='precision'),
                    tf.keras.metrics.Recall(name='recall')]
        )
        
        history_fine = model.fit(
            train_gen,
            validation_data=val_gen,
            epochs=EPOCHS + 20,
            initial_epoch=history.epoch[-1],
            class_weight=class_weights,
            callbacks=get_callbacks(f"gs_fine_{model_name}"),
            verbose=2
        )
        plot_training_curves(history_fine, f'gs_{model_name}_phase2')
        
        # Evaluate GS detection model on test set
        test_gen = create_generators(X_test, y_test, 'binary', is_test=True)
        y_pred = model.predict(test_gen)
        y_pred_class = (y_pred > 0.5).astype(int)
        
        # Generate evaluation plots
        plot_confusion_matrix(y_test, y_pred_class, ['Healthy', 'GS'], f'gs_{model_name}')
        plot_roc_curve(y_test, y_pred, f'gs_{model_name}')
        
        print(f"\n{model_name} GS Detection Classification Report:")
        print(classification_report(y_test, y_pred_class, target_names=['Healthy', 'GS']))
    
    # ----- Symptom Classification Setup -----
    print("\n=== Training Symptom Classifier ===")
    sampler = RandomOverSampler()
    X_res, y_res = sampler.fit_resample(np.array(gs_images).reshape(-1, 1), gs_labels)
    plot_class_distribution(y_res, 'After Oversampling', 'symptoms_oversampled')
    
    for model_name in models_list:
        print(f"\nTraining {model_name} for symptom classification...")
        train_gen = create_generators(X_res.flatten(), y_res, 'symptom', is_training=True)
        val_gen = create_generators(X_res.flatten(), y_res, 'symptom')
        
        model = build_model(model_name, NUM_CLASSES)
        history = model.fit(
            train_gen,
            validation_data=val_gen,
            epochs=EPOCHS,
            callbacks=get_callbacks(f"symptom_{model_name}"),
            verbose=2
        )
        plot_training_curves(history, f'symptom_{model_name}')
        
        # Evaluate on a test split for symptom classification
        _, X_test_sym, _, y_test_sym = train_test_split(
            gs_images, gs_labels, test_size=0.2, stratify=gs_labels, random_state=SEED
        )
        test_gen = create_generators(X_test_sym, y_test_sym, 'symptom', is_test=True)
        y_pred_sym = np.argmax(model.predict(test_gen), axis=1)
        
        # Generate evaluation plots for symptom classifier
        plot_confusion_matrix(y_test_sym, y_pred_sym, SYMPTOM_CLASSES, f'symptom_{model_name}')
        
        print(f"\n{model_name} Symptom Classification Report:")
        print(classification_report(y_test_sym, y_pred_sym, target_names=SYMPTOM_CLASSES))

# ===================== Model Evaluation Helper Functions =====================
def evaluate_gs_model(model_path, X_test, y_test):
    model = tf.keras.models.load_model(model_path)
    test_gen = create_generators(X_test, y_test, 'binary', is_test=True)
    y_pred = model.predict(test_gen)
    y_pred_class = (y_pred > 0.5).astype(int).flatten()
    acc = accuracy_score(y_test, y_pred_class)
    prec = precision_score(y_test, y_pred_class)
    rec = recall_score(y_test, y_pred_class)
    f1 = f1_score(y_test, y_pred_class)
    auc_val = roc_auc_score(y_test, y_pred)
    return acc, prec, rec, f1, auc_val

def evaluate_symptom_model(model_path, X_test_sym, y_test_sym):
    model = tf.keras.models.load_model(model_path)
    test_gen = create_generators(X_test_sym, y_test_sym, 'symptom', is_test=True)
    y_pred = model.predict(test_gen)
    y_pred_class = np.argmax(y_pred, axis=1)
    report = classification_report(y_test_sym, y_pred_class, target_names=SYMPTOM_CLASSES, output_dict=True)
    return report

# ===================== Model Comparison =====================
def compare_models():
    # Prepare test sets for evaluation
    gs_images, gs_labels, healthy_images = load_dataset()
    X = np.array(gs_images + healthy_images)
    y = np.array([1]*len(gs_images) + [0]*len(healthy_images))
    _, X_test, _, y_test = train_test_split(X, y, test_size=0.2, stratify=y, random_state=SEED)
    
    # For symptom classification
    _, X_test_sym, _, y_test_sym = train_test_split(gs_images, gs_labels, test_size=0.2, stratify=gs_labels, random_state=SEED)
    
    gs_model_names = ['EfficientNetB0', 'ResNet50', 'DenseNet121']
    gs_results = []
    
    for model_name in gs_model_names:
        model_file = f"best_gs_fine_{model_name}.keras"
        if os.path.exists(model_file):
            acc, prec, rec, f1, auc_val = evaluate_gs_model(model_file, X_test, y_test)
            gs_results.append({
                "Model": model_name,
                "Accuracy": acc,
                "Precision": prec,
                "Recall": rec,
                "F1-Score": f1,
                "AUC": auc_val
            })
        else:
            print(f"Model file {model_file} not found.")
    
    gs_results_df = pd.DataFrame(gs_results)
    print("=== GS Detection Models Comparison ===")
    print(gs_results_df)
    
    # Evaluate symptom classification models
    symptom_results = {}
    for model_name in gs_model_names:
        model_file = f"best_symptom_{model_name}.keras"
        if os.path.exists(model_file):
            report = evaluate_symptom_model(model_file, X_test_sym, y_test_sym)
            symptom_results[model_name] = report
        else:
            print(f"Model file {model_file} not found.")
    
    # Summarize overall accuracy and weighted F1-score for symptom classification
    symptom_summary = []
    for model_name, report in symptom_results.items():
        acc = report.get('accuracy', None)
        weighted_f1 = report.get('weighted avg', {}).get('f1-score', None)
        symptom_summary.append({
            "Model": model_name,
            "Accuracy": acc,
            "Weighted F1-Score": weighted_f1
        })
    
    symptom_summary_df = pd.DataFrame(symptom_summary)
    print("\n=== Symptom Classification Models Comparison ===")
    print(symptom_summary_df)

# ===================== Main Execution =====================
if __name__ == "__main__":
    # Run training pipeline
    train_pipeline()
    
    # After training, compare the results of the 3 models
    compare_models()


Loaded 242 Cleft-Lip-and-Palate images
Loaded 43 Epibulbar dermoid tumor images
Loaded 20 Eyelid coloboma images
Loaded 167 Facial asymmetry images
Loaded 16 Malocclusion images
Loaded 97 Microtia images
Loaded 44 Vertebral abnormality images
Loaded 2168 healthy images

=== Training GS Detection Models ===

Training EfficientNetB0 for GS detection...
Found 1790 validated image filenames belonging to 2 classes.
Found 447 validated image filenames belonging to 2 classes.
Downloading data from https://storage.googleapis.com/keras-applications/efficientnetb0_notop.h5
[1m16705208/16705208[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step
Epoch 1/40
56/56 - 88s - 2s/step - accuracy: 0.7190 - auc: 0.5231 - loss: 0.5921 - precision: 0.3135 - recall: 0.1932 - val_accuracy: 0.8009 - val_auc: 0.6571 - val_loss: 1.1545 - val_precision: 0.8571 - val_recall: 0.0638 - learning_rate: 1.0000e-05
Epoch 2/40
56/56 - 34s - 608ms/step - accuracy: 0.7257 - auc: 0.5509 - loss: 0.5793 - precisio

InvalidArgumentError: Graph execution error:

Detected at node LogicalAnd defined at (most recent call last):
<stack traces unavailable>
Incompatible shapes: [1,32] vs. [1,224]
	 [[{{node LogicalAnd}}]]
	tf2xla conversion failed while converting __inference_one_step_on_data_563796[]. Run with TF_DUMP_GRAPH_PREFIX=/path/to/dump/dir and --vmodule=xla_compiler=2 to obtain a dump of the compiled functions.
	 [[StatefulPartitionedCall]] [Op:__inference_one_step_on_iterator_564531]