<a href="https://colab.research.google.com/github/youness-marrakchi/tmr-vis-v5/blob/main/tumorDetector_v5_test2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Environment setup cell: installs every package the notebook imports later.
# TensorFlow, TensorFlow Addons and Keras are pinned to explicit versions;
# the remaining packages are installed without a version pin.
!pip install tensorflow==2.15.0
!pip install tensorflow-addons==0.23.0  # For AdamW
!pip install keras==2.15.0
!pip install kaggle kagglehub
!pip install lime grad-cam nibabel
!pip install scikit-learn plotly seaborn opencv-python matplotlib pandas
!pip install monai  # For 3D reconstruction

[0mCollecting lime
  Using cached lime-0.2.0.1-py3-none-any.whl
Collecting grad-cam
  Using cached grad_cam-1.5.4-py3-none-any.whl
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch>=1.7.1->grad-cam)
  Using cached nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cusolver-cu12==11.6.1.9 (from torch>=1.7.1->grad-cam)
  Using cached nvidia_cusolver_cu12-11.6.1.9-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Using cached nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl (664.8 MB)
Using cached nvidia_cusolver_cu12-11.6.1.9-py3-none-manylinux2014_x86_64.whl (127.9 MB)
[0mInstalling collected packages: nvidia-cudnn-cu12, nvidia-cusolver-cu12, lime, grad-cam
  Attempting uninstall: nvidia-cusolver-cu12
    Found existing installation: nvidia-cusolver-cu12 11.6.3.83
    Uninstalling nvidia-cusolver-cu12-11.6.3.83:
      Successfully uninstalled nvidia-cusolver-cu12-11.6.3.83
Successfully installed grad-cam-1.5.4 lime-0.2.0.1 n

In [None]:
import os
import shutil
import glob
import numpy as np
import tensorflow as tf
import tensorflow_addons as tfa
import matplotlib.pyplot as plt
import seaborn as sns
import cv2
import plotly.graph_objects as go
import monai
from monai.transforms import LoadImage, Orientation, Spacing, ResizeWithPadOrCrop
import kagglehub
from tqdm import tqdm

# Pin the NumPy and TensorFlow global seeds to the same value so that
# repeated runs of the notebook are comparable.
np.random.seed(42)
tf.random.set_seed(42)

In [None]:
# Configuration
class Config:
    """Hyper-parameters and filesystem locations shared by the whole notebook.

    Everything is a class attribute, so the class is used directly
    (``Config.BATCH_SIZE``) and never instantiated.
    """

    # --- Input / batching ---
    IMG_SIZE = (224, 224)   # Uniform input resolution
    BATCH_SIZE = 32
    NUM_CLASSES = 4

    # --- Optimisation ---
    INITIAL_LR = 1e-3
    WEIGHT_DECAY = 1e-4
    EPOCHS = 10  # total target epochs (more would likely help; kept low for limited hardware)

    # --- Regularisation ---
    DROPOUT_RATE = 0.5

    # --- Filesystem layout, all nested under PROJECT_PATH ---
    PROJECT_PATH = "/content/brain_tumor_classification"
    DATA_PATH = os.path.join(PROJECT_PATH, "data")
    MODEL_PATH = os.path.join(PROJECT_PATH, "models")
    RESULTS_PATH = os.path.join(PROJECT_PATH, "results")

    @classmethod
    def create_directories(cls):
        """Create the project, data, model and results directories if missing."""
        directories = (cls.PROJECT_PATH, cls.DATA_PATH, cls.MODEL_PATH, cls.RESULTS_PATH)
        for directory in directories:
            os.makedirs(directory, exist_ok=True)

In [None]:
# Dataset Download & Setup
class DataSetup:
    """Download the Kaggle brain-tumor MRI dataset and mirror it into the project tree."""

    # Folder names expected inside the downloaded dataset: <split>/<class>/<images>.
    SPLITS = ('Training', 'Testing')
    CLASS_NAMES = ('glioma', 'meningioma', 'notumor', 'pituitary')

    @staticmethod
    def setup_dataset():
        """Download the dataset and copy it into ``Config.DATA_PATH``.

        The images are copied split by split and class by class, so the project
        directory ends up as ``<DATA_PATH>/<split>/<class>/<image>``.

        Returns:
            The project data path on success, or ``None`` when one of the
            expected split directories is missing from the download.
        """
        print("Downloading dataset using kagglehub...")
        # Use the directory kagglehub reports instead of a hard-coded cache path:
        # the cache location and the dataset version number are not fixed.
        kaggle_data_path = kagglehub.dataset_download("masoudnickparvar/brain-tumor-mri-dataset")
        project_data_path = Config.DATA_PATH

        print("\nVerifying dataset structure...")
        for split in DataSetup.SPLITS:
            if not os.path.exists(os.path.join(kaggle_data_path, split)):
                print(f"ERROR: Expected directory '{split}' not found in Kaggle dataset.")
                return None

        # Create folder structure and copy images by class
        for split in DataSetup.SPLITS:
            for class_name in DataSetup.CLASS_NAMES:
                class_path = os.path.join(project_data_path, split, class_name)
                os.makedirs(class_path, exist_ok=True)
                src_dir = os.path.join(kaggle_data_path, split, class_name)
                if not os.path.isdir(src_dir):
                    print(f"WARNING: Source directory {src_dir} not found.")
                    continue
                for file in glob.glob(os.path.join(src_dir, '*')):
                    # copy2 cannot copy directories; skip anything that is not a file.
                    if os.path.isfile(file):
                        shutil.copy2(file, class_path)
        DataSetup.verify_dataset(project_data_path)
        return project_data_path

    @staticmethod
    def verify_dataset(data_path):
        """Print the number of entries found per class and per split under ``data_path``.

        A class directory that does not exist is reported with a count of 0.
        """
        print("\nDataset Statistics:")
        for split in DataSetup.SPLITS:
            print(f"\n{split} Set:")
            total = 0
            for class_name in DataSetup.CLASS_NAMES:
                class_path = os.path.join(data_path, split, class_name)
                count = len(os.listdir(class_path)) if os.path.exists(class_path) else 0
                total += count
                print(f"  - {class_name}: {count} images")
            print(f"Total {split} images: {total}")

In [None]:
# Data Pipeline using tf.data (with caching & prefetching)
def get_rescale_layer():
    """Return a new Keras ``Rescaling`` layer that multiplies its input by 1/255."""
    scale_factor = 1. / 255
    return tf.keras.layers.Rescaling(scale_factor)

# Use Keras built-in augmentation layers (efficient and can run on GPU)
# Random geometric augmentation stack: rotation, translation, zoom and
# horizontal flip, applied in that order.
data_augmentation = tf.keras.Sequential(
    layers=[
        tf.keras.layers.RandomRotation(factor=0.2),
        tf.keras.layers.RandomTranslation(height_factor=0.2, width_factor=0.2),
        tf.keras.layers.RandomZoom(height_factor=0.3),
        tf.keras.layers.RandomFlip(mode="horizontal"),
    ]
)

def get_dataset(data_dir, subset=None, shuffle=True):
    """Build a rescaled, cached and prefetched ``tf.data`` pipeline from an image folder.

    Args:
        data_dir: Directory containing one sub-directory per class.
        subset: ``'training'`` or ``'validation'`` to take that side of a seeded
            80/20 split of ``data_dir``; any other value loads the whole
            directory without shuffling (intended for evaluation data).
        shuffle: Only used for the split subsets. When true, the cached batches
            are reshuffled at the start of every epoch.

    Returns:
        A batched dataset of ``(image, one_hot_label)`` pairs with images
        multiplied by 1/255.
    """
    common_kwargs = dict(
        labels="inferred",
        label_mode="categorical",
        image_size=Config.IMG_SIZE,
        batch_size=Config.BATCH_SIZE,
    )
    is_split = subset in ('training', 'validation')
    if is_split:
        # The file list must be shuffled identically (same flag, same seed) for
        # both subsets: the split is taken from that ordering, so mixing
        # shuffle=True and shuffle=False makes training and validation overlap.
        ds = tf.keras.preprocessing.image_dataset_from_directory(
            data_dir,
            shuffle=True,
            seed=42,
            validation_split=0.2,
            subset=subset,
            **common_kwargs
        )
    else:
        ds = tf.keras.preprocessing.image_dataset_from_directory(
            data_dir,
            shuffle=False,
            **common_kwargs
        )
    # Apply rescaling
    rescale = get_rescale_layer()
    ds = ds.map(lambda x, y: (rescale(x), y), num_parallel_calls=tf.data.AUTOTUNE)
    ds = ds.cache()
    if is_split and shuffle:
        # cache() replays the order seen during the first pass, so reshuffle the
        # cached batches afterwards to get a different order each epoch.
        num_batches = int(ds.cardinality())
        buffer_size = num_batches if num_batches > 0 else 256  # unknown/infinite size
        ds = ds.shuffle(buffer_size=buffer_size, reshuffle_each_iteration=True)
    return ds.prefetch(tf.data.AUTOTUNE)

class DataPipeline:
    """Bundle the training, validation and test datasets built from ``data_path``.

    Attributes:
        train_ds: Training subset of ``<data_path>/Training`` with random
            augmentation applied.
        val_ds: Validation subset of ``<data_path>/Training``, not augmented.
        test_ds: All of ``<data_path>/Testing``, not augmented.
    """

    def __init__(self, data_path):
        training_dir = os.path.join(data_path, 'Training')
        testing_dir  = os.path.join(data_path, 'Testing')
        # Request both subsets with the same shuffle setting: the 80/20 split is
        # taken from the (seeded) shuffled file list, so differing flags would
        # split two different orderings and let the subsets overlap.
        self.train_ds = get_dataset(training_dir, subset='training', shuffle=True)
        self.val_ds   = get_dataset(training_dir, subset='validation', shuffle=True)
        self.test_ds  = get_dataset(testing_dir, subset=None, shuffle=False)

        # Augmentation for training data only; prefetch again so the augmentation
        # of the next batch can overlap with the current training step.
        self.train_ds = self.train_ds.map(
            lambda x, y: (data_augmentation(x, training=True), y),
            num_parallel_calls=tf.data.AUTOTUNE
        ).prefetch(tf.data.AUTOTUNE)


In [None]:
# Using ResNet50V2 as the Main Arch
class ModelBuilder:
    """Factory for the ImageNet-pretrained classifiers used in this notebook."""

    @staticmethod
    def _l2_penalty(weight_decay, layer, attr_name):
        """Return a zero-argument callable computing the L2 penalty of ``layer.<attr_name>``.

        Creating the callable inside its own function gives each penalty its own
        binding of ``layer``. A lambda written directly in the loop would look
        ``layer`` up when called, i.e. always see the last layer of the loop.
        """
        regularizer = tf.keras.regularizers.l2(weight_decay)
        return lambda: regularizer(getattr(layer, attr_name))

    @staticmethod
    def add_regularization(model, weight_decay=Config.WEIGHT_DECAY):
        """Attach L2 penalties to the kernels and biases of Conv2D/Dense layers.

        Args:
            model: Keras model whose layers are modified in place.
            weight_decay: L2 factor; anything that is not a positive float
                leaves the model untouched.

        Returns:
            The same ``model`` object.
        """
        if not isinstance(weight_decay, float) or weight_decay <= 0.:
            return model
        for layer in model.layers:
            if not isinstance(layer, (tf.keras.layers.Conv2D, tf.keras.layers.Dense)):
                continue
            # Skip layers that match the type check but expose no `kernel`
            # variable, so evaluating the loss later cannot fail on them.
            if getattr(layer, 'kernel', None) is not None:
                layer.add_loss(ModelBuilder._l2_penalty(weight_decay, layer, 'kernel'))
            if layer.use_bias and getattr(layer, 'bias', None) is not None:
                layer.add_loss(ModelBuilder._l2_penalty(weight_decay, layer, 'bias'))
        return model

    @staticmethod
    def build_model(architecture, input_shape=(224, 224, 3), num_classes=Config.NUM_CLASSES):
        """Build a pretrained backbone topped with a small classification head.

        Args:
            architecture: ``'resnet'`` (ResNet50V2) or ``'mobilenet'`` (MobileNetV3Large).
            input_shape: Shape of a single input image.
            num_classes: Number of units in the softmax output layer.

        Returns:
            An uncompiled ``tf.keras.Model`` with L2 penalties attached.

        Raises:
            ValueError: If ``architecture`` is not one of the supported names.
        """
        # NOTE(review): the data pipeline feeds images scaled to [0, 1]; confirm
        # that this matches the input range each pretrained backbone expects.
        if architecture == 'resnet':
            base_model = tf.keras.applications.ResNet50V2(
                weights='imagenet', include_top=False, input_shape=input_shape
            )
        elif architecture == 'mobilenet':
            base_model = tf.keras.applications.MobileNetV3Large(
                weights='imagenet', include_top=False, input_shape=input_shape
            )
        else:
            raise ValueError("Unsupported architecture. Choose 'resnet' or 'mobilenet'.")
        x = base_model.output
        x = tf.keras.layers.GlobalAveragePooling2D()(x)
        x = tf.keras.layers.LayerNormalization()(x)
        x = tf.keras.layers.Dense(1024, activation='relu')(x)
        x = tf.keras.layers.Dropout(Config.DROPOUT_RATE)(x)
        x = tf.keras.layers.LayerNormalization()(x)
        # Honour the caller's num_classes instead of always using Config.NUM_CLASSES.
        predictions = tf.keras.layers.Dense(num_classes, activation='softmax')(x)
        model = tf.keras.Model(inputs=base_model.input, outputs=predictions)
        return ModelBuilder.add_regularization(model)

# Training Manager (with incremental training)
class TrainingManager:
    """Compile and train a model in small increments, checkpointing after each one."""

    def __init__(self, model, train_ds, val_ds, test_ds):
        self.model = model
        self.train_ds = train_ds
        self.val_ds = val_ds
        self.test_ds = test_ds

    def compile_model(self):
        """Compile ``self.model`` with AdamW, categorical cross-entropy and four metrics.

        Because four metrics are registered, ``model.evaluate`` reports the loss
        plus four metric values.
        """
        optimizer = tfa.optimizers.AdamW(
            learning_rate=Config.INITIAL_LR,
            weight_decay=Config.WEIGHT_DECAY
        )
        self.model.compile(
            optimizer=optimizer,
            loss='categorical_crossentropy',
            metrics=[
                'accuracy',
                tf.keras.metrics.Precision(),
                tf.keras.metrics.Recall(),
                tf.keras.metrics.AUC()
            ]
        )

    def get_callbacks(self, model_name):
        """Create fresh checkpoint, LR-reduction and early-stopping callbacks.

        The best model (by ``val_accuracy``) is written to
        ``<Config.MODEL_PATH>/best_<model_name>.h5``.
        """
        return [
            tf.keras.callbacks.ModelCheckpoint(
                os.path.join(Config.MODEL_PATH, f'best_{model_name}.h5'),
                monitor='val_accuracy',
                save_best_only=True,
                mode='max'
            ),
            tf.keras.callbacks.ReduceLROnPlateau(
                monitor='val_loss',
                factor=0.5,
                patience=3,
                min_lr=1e-6,
                verbose=1
            ),
            tf.keras.callbacks.EarlyStopping(
                monitor='val_loss',
                patience=10,
                restore_best_weights=True,
                verbose=1
            )
        ]

    def train_incrementally(self, model_name, total_epochs=10, increment=2):
        """Train in chunks of ``increment`` epochs, saving a checkpoint after each chunk.

        The model must already be compiled. The last chunk is shortened so that
        training never runs past ``total_epochs``.

        Args:
            model_name: Used to name the checkpoint files.
            total_epochs: Epoch number at which training stops.
            increment: Number of epochs per ``fit`` call; must be at least 1.

        Returns:
            The ``History`` object of the last ``fit`` call, with its ``history``
            and ``epoch`` attributes replaced by the values accumulated over all
            chunks, or ``None`` if ``total_epochs`` is not positive.

        Raises:
            ValueError: If ``increment`` is smaller than 1 (the loop would never advance).
        """
        if increment < 1:
            raise ValueError("increment must be at least 1 epoch")

        checkpoint_path = os.path.join(Config.MODEL_PATH, f'{model_name}_checkpoint.h5')
        # Build the callbacks once so that state kept on the callback objects
        # (e.g. the best score seen by ModelCheckpoint) carries across chunks.
        # NOTE(review): EarlyStopping and ReduceLROnPlateau appear to reset their
        # patience counters at the start of every fit() call, so they can only
        # trigger when `increment` exceeds their patience - confirm.
        callbacks = self.get_callbacks(model_name)

        merged_metrics = {}
        merged_epochs = []
        history = None
        current_epoch = 0
        while current_epoch < total_epochs:
            target_epoch = min(current_epoch + increment, total_epochs)
            print(f"Training from epoch {current_epoch} to {target_epoch}...")
            history = self.model.fit(
                self.train_ds,
                validation_data=self.val_ds,
                epochs=target_epoch,
                initial_epoch=current_epoch,
                callbacks=callbacks
            )
            # Each fit() call only reports its own epochs; accumulate them so the
            # caller gets the full training curve.
            for metric_name, values in history.history.items():
                merged_metrics.setdefault(metric_name, []).extend(values)
            merged_epochs.extend(history.epoch)
            current_epoch = target_epoch
            # saving the model (including optimizer state) to resume later if needed
            self.model.save(checkpoint_path)
            print(f"Checkpoint saved at epoch {current_epoch}")
            if getattr(self.model, 'stop_training', False):
                # A callback asked to stop; do not start another chunk.
                break

        if history is not None:
            history.history = merged_metrics
            history.epoch = merged_epochs
        return history

In [None]:
# Visualization Manager (for plotting training history)
class VisualizationManager:
    """Render training curves and store them under ``Config.RESULTS_PATH``."""

    def __init__(self):
        self.results_path = Config.RESULTS_PATH

    def save_plot(self, fig, filename):
        """Write ``fig`` to ``<results_path>/<filename>`` and close it.

        The figure passed in is the one that is saved and closed, regardless of
        which figure pyplot currently considers active.
        """
        os.makedirs(self.results_path, exist_ok=True)
        fig.savefig(os.path.join(self.results_path, filename))
        plt.close(fig)

    def plot_training_history(self, history, model_name):
        """Plot train/validation accuracy and loss side by side and save the figure.

        Args:
            history: Object whose ``history`` dict holds the lists ``accuracy``,
                ``val_accuracy``, ``loss`` and ``val_loss``.
            model_name: Used in the panel titles and in the output file name
                ``<model_name>_training_history.png``.
        """
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))
        panels = ((ax1, 'accuracy', 'Accuracy'), (ax2, 'loss', 'Loss'))
        for ax, metric, label in panels:
            ax.plot(history.history[metric], marker='o')
            ax.plot(history.history[f'val_{metric}'], marker='o')
            ax.set_title(f'{model_name} - {label}')
            ax.set_xlabel('Epoch')
            ax.set_ylabel(label)
            ax.legend(['Train', 'Validation'])
        fig.tight_layout()
        self.save_plot(fig, f'{model_name}_training_history.png')


In [None]:
# Testing Function (Upload an image and predict its class)
def test_model(model):
    from google.colab import files
    uploaded = files.upload()
    for filename in uploaded.keys():
        print("Processing image:", filename)
        img = tf.keras.preprocessing.image.load_img(filename, target_size=Config.IMG_SIZE)
        img_array = tf.keras.preprocessing.image.img_to_array(img)
        img_array = np.expand_dims(img_array, axis=0)
        img_array = img_array / 255.0  # apply rescaling
        predictions = model.predict(img_array)
        predicted_class = np.argmax(predictions, axis=1)
        class_names = sorted(["glioma", "meningioma", "notumor", "pituitary"])
        print("Predicted class index:", predicted_class[0])
        print("Predicted class name:", class_names[predicted_class[0]])


In [None]:
# ======================================================
# Main Execution
# ======================================================
def main():
    """Run the whole workflow: data setup, training, evaluation, plotting, manual test.

    Returns early (without training) when the dataset could not be prepared.
    """
    # Create directories
    Config.create_directories()

    # Download and prepare dataset
    data_path = DataSetup.setup_dataset()
    if not data_path:
        return

    # Build tf.data pipelines
    data_pipeline = DataPipeline(data_path)
    train_ds, val_ds, test_ds = data_pipeline.train_ds, data_pipeline.val_ds, data_pipeline.test_ds

    # Build model, or reload the last checkpoint if one exists. The model is
    # loaded/created uncompiled here; TrainingManager.compile_model() below is
    # the single place where optimizer, loss and metrics are configured.
    model_name = 'resnet'
    checkpoint_path = os.path.join(Config.MODEL_PATH, f'{model_name}_checkpoint.h5')
    if os.path.exists(checkpoint_path):
        model = tf.keras.models.load_model(checkpoint_path, compile=False)
        print("Loaded checkpoint from", checkpoint_path)
    else:
        model = ModelBuilder.build_model(model_name)
        print("Created new model.")

    # Initialize Training Manager and train incrementally (one epoch per fit call
    # until Config.EPOCHS is reached); small increments suit limited hardware.
    # NOTE(review): no starting epoch is passed, so a reloaded checkpoint appears
    # to be trained for another Config.EPOCHS epochs - confirm this is intended.
    trainer = TrainingManager(model, train_ds, val_ds, test_ds)
    trainer.compile_model()
    history = trainer.train_incrementally(model_name, total_epochs=Config.EPOCHS, increment=1)

    # Evaluate model on test set. compile_model() registers several metrics, so
    # evaluate() yields more than two values; read the results by name instead
    # of unpacking a fixed-length sequence.
    test_metrics = model.evaluate(test_ds, return_dict=True)
    print("Test Accuracy:", test_metrics['accuracy'])

    # Plot training history
    if history is not None:
        vis_manager = VisualizationManager()
        vis_manager.plot_training_history(history, model_name)

    # Allow user to upload a scan and test the model
    print("Upload an image to test the model:")
    test_model(model)

# Entry point: run the full workflow only when this code executes as the
# top-level module, not when it is imported.
if __name__ == "__main__":
    main()
