<a href="https://colab.research.google.com/github/tatendatobaiwa/cnn/blob/main/FINAL_CNN_PROJECT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# ***SIMPLE CNN MODEL BUILD FOR LUNG CANCER IMAGING***

**Step 1: Mount Drive**

In [3]:
from google.colab import drive
# Attach Google Drive at /content/drive; the project paths used by later cells
# all live under /content/drive/MyDrive.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


**Step 2: Create Directories in Drive**


**Step 3: Install Dependencies**

In [4]:
# Install pinned versions of the notebook's dependencies.
# NOTE(review): only the first two installs pass -q, so the last two print full pip logs.
# NOTE(review): tensorflow-addons is not imported by any cell visible here — confirm it is still needed.
!pip install -q tensorflow==2.13.0
!pip install -q tensorflow-addons==0.23.0
!pip install albumentations==1.3.0
!pip install numpy==1.24.3



In [5]:
import tensorflow as tf
import cv2
import numpy as np
from pathlib import Path
import matplotlib.pyplot as plt
import os
import sys
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns

# Project root on the mounted Drive.
project_path = '/content/drive/MyDrive/CNN_Medical_Imaging_Project'
# Put the project folder on the import path. Later cells import
# `data_processing` and `model_efficientB0net`, presumably from this folder
# (TODO confirm the .py files are stored there).
sys.path.append(project_path)

**Step 4: data_processing.py: Load and Preprocess Data**

In [6]:
import cv2
import numpy as np
from pathlib import Path
import albumentations as A
from tensorflow.keras.applications.efficientnet import preprocess_input
from tensorflow.keras.utils import Sequence

class AlbumentationsSequence(Sequence):
    """Keras ``Sequence`` that serves transformed image batches.

    Every stored image is replicated along a new trailing axis to obtain three
    identical channels, then run through ``transform`` one image at a time.
    """

    def __init__(self, images, labels, batch_size, transform):
        # Indexable collection of images (the loaders in this file produce
        # uint8 grayscale arrays).
        self.images = images
        # Labels aligned with ``images``; ``None`` yields label-free batches.
        self.labels = labels
        self.batch_size = batch_size
        # Called as ``transform(image=...)``; the ``'image'`` entry of the
        # result is what ends up in the batch.
        self.transform = transform

    def __len__(self):
        """Return the number of batches, counting a final partial batch."""
        return int(np.ceil(len(self.images) / self.batch_size))

    def __getitem__(self, idx):
        """Build batch ``idx``.

        Returns ``(images, labels)`` when labels were supplied, otherwise just
        the image array.
        """
        first = idx * self.batch_size
        last = min(first + self.batch_size, len(self.images))

        # Grayscale -> three identical channels, stacked into one batch array.
        three_channel = np.stack(
            [np.stack((gray, gray, gray), axis=-1) for gray in self.images[first:last]],
            axis=0,
        )
        batch = np.array(
            [self.transform(image=sample)['image'] for sample in three_channel]
        )

        if self.labels is None:
            return batch
        return batch, self.labels[first:last]

class DataProcessor:
    """Loads image folders into arrays and builds augmentation generators.

    Images are read as grayscale with OpenCV and resized to ``target_size``.
    Files OpenCV cannot decode are skipped, and the skip is reported on stdout
    so that a shorter-than-expected result is never silent.
    """

    # Glob patterns searched, in this order, inside every image directory.
    # Matching is done by ``Path.glob``, so only these exact spellings match.
    IMAGE_PATTERNS = ('*.jpg', '*.png', '*.jpeg')

    def __init__(self, target_size=(224, 224)):
        # Passed to ``cv2.resize`` as its ``dsize`` argument.
        self.target_size = target_size
        # The position in this list is the integer label of the class.
        self.class_names = ['benign', 'malignant', 'normal']

    def _list_image_paths(self, directory):
        """Return the image paths in ``directory``, grouped by IMAGE_PATTERNS order."""
        return [
            path
            for pattern in self.IMAGE_PATTERNS
            for path in directory.glob(pattern)
        ]

    def _read_grayscale(self, img_path):
        """Read one image as resized grayscale; return ``None`` if it cannot be decoded."""
        img = cv2.imread(str(img_path), cv2.IMREAD_GRAYSCALE)
        if img is None:
            return None
        return cv2.resize(img, self.target_size)

    @staticmethod
    def _report_skipped(skipped):
        """Print a warning naming every file that could not be read."""
        if skipped:
            print(f"Warning: skipped {len(skipped)} unreadable image(s):")
            for path in skipped:
                print(f"  {path}")

    @staticmethod
    def _preprocess_step():
        """Return a fresh albumentations step applying EfficientNet preprocessing."""
        return A.Lambda(image=lambda img, **kwargs: preprocess_input(img))

    def load_images(self, data_dir):
        """Load labeled images from ``data_dir/<class_name>/`` subfolders.

        Args:
            data_dir: Directory containing one subfolder per entry of
                ``self.class_names``. Missing subfolders are reported and skipped.

        Returns:
            ``(images, labels, class_names)`` where ``images`` is an array of
            resized grayscale images, ``labels`` an int32 array of class
            indices, and ``class_names`` is ``self.class_names``.

        Raises:
            ValueError: If no image at all could be loaded.
        """
        data_dir = Path(data_dir)
        images = []
        labels = []
        skipped = []
        print(f"Loading images from {data_dir}")
        for label, class_name in enumerate(self.class_names):
            class_dir = data_dir / class_name
            if not class_dir.exists():
                print(f"Warning: Directory {class_dir} does not exist.")
                continue
            img_paths = self._list_image_paths(class_dir)
            print(f"Found {len(img_paths)} images in {class_name}")
            for img_path in img_paths:
                img = self._read_grayscale(img_path)
                if img is None:
                    skipped.append(img_path)
                    continue
                images.append(img)
                labels.append(label)
        self._report_skipped(skipped)
        if not images:
            raise ValueError(f"No images loaded from {data_dir}")
        images = np.array(images)
        labels = np.array(labels, dtype=np.int32)
        print(f"Total images loaded: {len(images)}")
        return images, labels, self.class_names

    def load_test_images(self, test_dir, return_paths=False):
        """Load unlabeled test images from a flat directory.

        Args:
            test_dir: Directory that directly contains the image files.
            return_paths: When true, also return the paths of the images that
                were actually loaded, in the same order as the image array, so
                predictions can be attributed to the right file.

        Returns:
            The image array, or ``(images, paths)`` when ``return_paths`` is
            true. The array is empty when nothing could be loaded.
        """
        test_dir = Path(test_dir)
        images = []
        loaded_paths = []
        skipped = []
        img_paths = self._list_image_paths(test_dir)
        print(f"Found {len(img_paths)} images in {test_dir}")
        for img_path in img_paths:
            img = self._read_grayscale(img_path)
            if img is None:
                skipped.append(img_path)
                continue
            images.append(img)
            loaded_paths.append(img_path)
        self._report_skipped(skipped)
        if not images:
            print(f"No test images loaded from {test_dir}")
        else:
            print(f"Total test images loaded: {len(images)}")
        images = np.array(images)
        if return_paths:
            return images, loaded_paths
        return images

    def create_generators(self, X_train, y_train, X_val, y_val, batch_size=32):
        """Create the training generator (augmented) and validation generator (not augmented).

        Both pipelines end with the EfficientNet preprocessing step; only the
        training pipeline applies random geometric/intensity augmentation.

        Returns:
            ``(train_gen, val_gen)`` as ``AlbumentationsSequence`` instances.
        """
        train_transform = A.Compose([
            A.Rotate(limit=20, p=0.5),
            A.ShiftScaleRotate(shift_limit=0.2, scale_limit=0.2, rotate_limit=0, p=0.5),
            A.HorizontalFlip(p=0.5),
            A.ElasticTransform(alpha=34, sigma=4, p=0.3),
            A.RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0.2, p=0.5),
            self._preprocess_step(),
        ])
        val_transform = A.Compose([self._preprocess_step()])
        train_gen = AlbumentationsSequence(X_train, y_train, batch_size, train_transform)
        val_gen = AlbumentationsSequence(X_val, y_val, batch_size, val_transform)
        return train_gen, val_gen

    def create_test_generator(self, X_test, batch_size=32):
        """Create a label-free generator for prediction (preprocessing only)."""
        test_transform = A.Compose([self._preprocess_step()])
        return AlbumentationsSequence(X_test, None, batch_size, test_transform)

# Script entry point: load the labeled training set and the unlabeled test set
# from the project's Drive folders. The printed counts act as a sanity check.
if __name__ == "__main__":
    processor = DataProcessor()
    # Training data is expected in one subfolder per class name.
    X_train, y_train, class_names = processor.load_images('/content/drive/MyDrive/CNN_Medical_Imaging_Project/data/raw/train')
    # Test data is read from a flat folder and has no labels.
    X_test = processor.load_test_images('/content/drive/MyDrive/CNN_Medical_Imaging_Project/data/raw/test')

Loading images from /content/drive/MyDrive/CNN_Medical_Imaging_Project/data/raw/train
Found 1200 images in benign
Found 1201 images in malignant
Found 1208 images in normal
Total images loaded: 3609
Found 75 images in /content/drive/MyDrive/CNN_Medical_Imaging_Project/data/raw/test
Total test images loaded: 75


**model_efficientB0net.py: Model Creation Script**

In [7]:
import tensorflow as tf
from tensorflow.keras.applications import EfficientNetB0

def create_efficientnet_model(input_shape=(224, 224, 3), num_classes=3):
    """Build a classifier on top of a frozen, ImageNet-initialised EfficientNetB0.

    The head is global average pooling, a 128-unit ReLU layer and a softmax
    layer with ``num_classes`` outputs.

    Returns:
        ``(model, base_model)``: the full classifier and the EfficientNetB0
        backbone, the latter so callers can unfreeze it for fine-tuning.
    """
    layers = tf.keras.layers

    backbone = EfficientNetB0(
        weights='imagenet',
        include_top=False,
        input_shape=input_shape,
    )
    # Start with the backbone frozen.
    backbone.trainable = False

    pooled = layers.GlobalAveragePooling2D()(backbone.output)
    hidden = layers.Dense(128, activation='relu')(pooled)
    class_probs = layers.Dense(num_classes, activation='softmax')(hidden)

    classifier = tf.keras.models.Model(inputs=backbone.input, outputs=class_probs)
    return classifier, backbone

# Script entry point: build the model with default arguments and print its
# layer summary. The ImageNet weights are downloaded if not already cached
# (see the "Downloading data" line in the cell output).
if __name__ == "__main__":
    model, base_model = create_efficientnet_model()
    model.summary()

Downloading data from https://storage.googleapis.com/keras-applications/efficientnetb0_notop.h5
Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(None, 224, 224, 3)]        0         []                            
                                                                                                  
 rescaling (Rescaling)       (None, 224, 224, 3)          0         ['input_1[0][0]']             
                                                                                                  
 normalization (Normalizati  (None, 224, 224, 3)          7         ['rescaling[0][0]']           
 on)                                                                                              
                                                                                                 

**Step 5: train.py: Model Training**

In [None]:
import tensorflow as tf
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from sklearn.utils.class_weight import compute_class_weight
from sklearn.model_selection import train_test_split
import numpy as np
import os
from tensorflow.keras.models import load_model
from data_processing import DataProcessor
from model_efficientB0net import create_efficientnet_model

def train_model(train_gen, val_gen, class_names, epochs=75, checkpoint_path='best_model.h5'):
    """
    Trains the model, resuming from ``checkpoint_path`` if that file exists,
    otherwise creating and compiling a new model.

    When resuming, the loaded model is first evaluated on ``val_gen`` and its
    validation loss is used as the checkpoint baseline, so the saved best
    model is only overwritten by an epoch that actually improves on it.

    Args:
        train_gen: Training data generator. Its ``labels`` attribute is
            converted in place to an int32 NumPy array.
        val_gen: Validation data generator
        class_names: List of class names
        epochs: Number of epochs to train (default: 75)
        checkpoint_path: File the best model is loaded from and saved to
            (default: 'best_model.h5')

    Returns:
        model: Trained model
        history: Training history

    Raises:
        ValueError: If the training labels are empty, or if the resumed model's
            output size does not match ``len(class_names)``.
    """
    # Normalise labels to an int32 array (sparse categorical loss expects integers).
    train_gen.labels = np.asarray(train_gen.labels).astype(np.int32)

    # Compute class weights to handle imbalanced data
    classes = np.unique(train_gen.labels)
    print("Classes:", classes)
    if len(classes) == 0:
        raise ValueError("No unique classes found in train_gen.labels.")
    class_weights = compute_class_weight('balanced', classes=classes, y=train_gen.labels)
    # Key by the actual class label, not by position: the two differ as soon
    # as a label is absent from the training split.
    class_weights_dict = {
        int(label): float(weight) for label, weight in zip(classes, class_weights)
    }
    print(f"Class weights: {class_weights_dict}")

    # Validation loss of a resumed model; None means "no baseline yet".
    resumed_val_loss = None
    if os.path.exists(checkpoint_path):
        model = load_model(checkpoint_path)
        print(f"Loaded existing model from '{checkpoint_path}'")
        if model.output_shape[-1] != len(class_names):
            raise ValueError(
                f"Saved model at '{checkpoint_path}' has {model.output_shape[-1]} outputs "
                f"but {len(class_names)} class names were given."
            )
        resumed_val_loss = model.evaluate(val_gen, verbose=0, return_dict=True)['loss']
        print(f"Resuming with validation loss {resumed_val_loss:.4f} as checkpoint baseline")
    else:
        # Create and compile a new model if no saved model exists
        model, _ = create_efficientnet_model(num_classes=len(class_names))
        model.compile(
            optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
            loss='sparse_categorical_crossentropy',
            metrics=['accuracy']
        )
        print("Created and compiled new model")

    # Define callbacks for training
    early_stop = EarlyStopping(
        monitor='val_loss',
        patience=15,
        restore_best_weights=True
    )
    checkpoint = ModelCheckpoint(
        checkpoint_path,
        monitor='val_loss',
        save_best_only=True,
        # Without a baseline the first resumed epoch would always overwrite
        # the previous best file, even when it is worse.
        initial_value_threshold=resumed_val_loss
    )
    reduce_lr = ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.5,
        patience=3,
        min_lr=1e-6
    )

    # Train the model
    history = model.fit(
        train_gen,
        validation_data=val_gen,
        epochs=epochs,
        callbacks=[early_stop, checkpoint, reduce_lr],
        class_weight=class_weights_dict
    )

    return model, history

# Script entry point: train (or resume) the model, then predict the unlabeled
# test images and write the predictions to a text file on Drive.
# Everything stays inside the guard so importing this module has no side effects.
if __name__ == "__main__":
    # Local import: this cell's import list does not include pathlib.
    from pathlib import Path

    # Initialize data processor
    processor = DataProcessor()

    # Load training data
    X_train_full, y_train_full, class_names = processor.load_images(
        '/content/drive/MyDrive/CNN_Medical_Imaging_Project/data/raw/train'
    )

    # Split data into training and validation sets
    X_train, X_val, y_train, y_val = train_test_split(
        X_train_full,
        y_train_full,
        test_size=0.2,
        stratify=y_train_full,
        random_state=42
    )

    # Create data generators
    train_gen, val_gen = processor.create_generators(X_train, y_train, X_val, y_val)

    # Train the model, loading the best model if available
    model, history = train_model(train_gen, val_gen, class_names)

    # Load and predict on test data (unlabeled)
    test_dir = '/content/drive/MyDrive/CNN_Medical_Imaging_Project/data/raw/test'
    X_test = processor.load_test_images(test_dir)
    if len(X_test) > 0:
        test_gen = processor.create_test_generator(X_test)
        y_pred_probs = model.predict(test_gen, verbose=1)
        y_pred_classes = np.argmax(y_pred_probs, axis=1)

        # load_test_images globs *.jpg, then *.png, then *.jpeg, so the file
        # names must be listed the same way to line up with the predictions.
        # A sorted directory listing does not follow that order.
        file_names = [
            path.name
            for pattern in ('*.jpg', '*.png', '*.jpeg')
            for path in Path(test_dir).glob(pattern)
        ]
        if len(file_names) != len(y_pred_classes):
            # Some files were not loaded (e.g. unreadable), so names cannot be
            # matched reliably; fall back to positional identifiers.
            print(
                f"Warning: {len(file_names)} files found but {len(y_pred_classes)} "
                "predictions made; reporting predictions by position instead of file name."
            )
            file_names = [f"#{position}" for position in range(len(y_pred_classes))]

        print("\nPredicted classes for test images:")
        for name, pred_class in zip(file_names, y_pred_classes):
            print(f"Test image {name}: Predicted class = {class_names[pred_class]}")
        with open('/content/drive/MyDrive/CNN_Medical_Imaging_Project/data/test_predictions.txt', 'w') as f:
            for name, pred_class in zip(file_names, y_pred_classes):
                f.write(f"Test image {name}: {class_names[pred_class]}\n")
        print("Predictions saved to 'test_predictions.txt'")
    else:
        print("No test data available. Please populate the test directory.")


Loading images from /content/drive/MyDrive/CNN_Medical_Imaging_Project/data/raw/train
Found 1200 images in benign
Found 1201 images in malignant
Found 1208 images in normal
Total images loaded: 3609




Classes: [0 1 2]
Class weights: {0: 1.0024305555555555, 1: 1.0013874436351022, 2: 0.9962042788129745}
Loaded existing model from 'best_model.h5'
Epoch 1/75

  saving_api.save_model(


Epoch 2/75
Epoch 3/75
Epoch 4/75
Epoch 5/75
Epoch 6/75
Epoch 7/75
Epoch 8/75
Epoch 9/75
Epoch 10/75
Epoch 11/75
Epoch 12/75

**Step 6: evaluate.py: Model Evaluation**

In [None]:
import numpy as np
from tensorflow.keras.models import load_model
from data_processing import DataProcessor

def predict_unlabeled(model, X_test, class_names):
    """Predict a class for every image in ``X_test`` and print the results.

    The images are fed through the label-free test generator of a fresh
    ``DataProcessor``. Returns the array of predicted class indices.
    """
    test_gen = DataProcessor().create_test_generator(X_test)
    probabilities = model.predict(test_gen, verbose=1)
    # Most probable class per image.
    y_pred_classes = np.argmax(probabilities, axis=1)

    print("\nPredicted classes for test images:")
    position = 0
    for label_idx in y_pred_classes:
        print(f"Test image {position}: Predicted class = {class_names[label_idx]}")
        position += 1
    return y_pred_classes

# Script entry point: load the saved model and print predictions for the
# unlabeled test images.
if __name__ == "__main__":
    processor = DataProcessor()
    X_test = processor.load_test_images('/content/drive/MyDrive/CNN_Medical_Imaging_Project/data/raw/test')
    # Relative path: resolved against the current working directory.
    model = load_model('best_model.h5')
    # load_test_images returns an empty array when nothing could be read.
    if len(X_test) > 0:
        predictions = predict_unlabeled(model, X_test, processor.class_names)
    else:
        print("No test data loaded for prediction.")

**Step 7: xai.py: Implementation of Explainable AI (Grad-CAM)**

In [None]:
import cv2
import numpy as np
import tensorflow as tf
from tensorflow.keras.applications.efficientnet import preprocess_input
import matplotlib.pyplot as plt
from pathlib import Path
import os

def preprocess_image(image_path, target_size=(224, 224)):
    """Load one image and prepare it exactly like the training pipeline does.

    Training data is read as grayscale, resized and replicated to three
    identical channels (see ``DataProcessor.load_images`` and
    ``AlbumentationsSequence.__getitem__``). Reading the file in colour here
    would feed the model a different input for any image that is not already
    pure gray, so the same grayscale path is used.

    Args:
        image_path: Path of the image file.
        target_size: Size passed to ``cv2.resize`` (default ``(224, 224)``).

    Returns:
        Array with a leading batch dimension of 1 and three channels.

    Raises:
        ValueError: If the file is missing or cannot be decoded.
    """
    gray = cv2.imread(str(image_path), cv2.IMREAD_GRAYSCALE)
    if gray is None:
        # cv2.imread signals failure by returning None instead of raising.
        raise ValueError(f"Could not read image: {image_path}")
    gray = cv2.resize(gray, target_size)
    img = np.stack((gray,) * 3, axis=-1)  # grayscale -> 3 identical channels
    img = preprocess_input(img)  # Use EfficientNet's preprocessing
    return img[np.newaxis, ...]  # Explicitly add batch dimension

def get_prediction(model, image_path, class_names):
    """Return ``(class_name, probability)`` of the model's top prediction for one image."""
    batch = preprocess_image(image_path)  # shape already includes the batch axis
    probs = model.predict(batch, verbose=0)
    # Single-image batch, so the flat argmax is the class index.
    best = np.argmax(probs)
    label = class_names[best]
    confidence = probs[0][best]
    return label, confidence

def get_gradcam_heatmap(model, image_path, layer_name):
    """Generate a Grad-CAM heatmap for the model's predicted class.

    Args:
        model: Keras model to explain.
        image_path: Path of the image file.
        layer_name: Name of the convolutional layer whose activations are used.

    Returns:
        2-D float array with values in [0, 1], resized to the spatial size of
        the model input.

    Raises:
        ValueError: If no gradient reaches ``layer_name`` from the class score.
    """
    # Float input so that the tape can track it (integer tensors are not differentiable).
    img = tf.cast(preprocess_image(image_path), tf.float32)
    grad_model = tf.keras.models.Model(
        model.inputs, [model.get_layer(layer_name).output, model.output]
    )
    with tf.GradientTape() as tape:
        # The tape only records ops that depend on a watched tensor. With a
        # frozen backbone no variable is watched automatically, so the input
        # is watched explicitly to make the whole forward pass differentiable.
        tape.watch(img)
        conv_outputs, predictions = grad_model(img, training=False)
        class_idx = int(tf.argmax(predictions[0]))  # Use the predicted class
        class_score = predictions[:, class_idx]
    grads = tape.gradient(class_score, conv_outputs)
    if grads is None:
        raise ValueError(f"No gradient flows from the prediction to layer '{layer_name}'.")

    # Average over batch, height and width -> one importance weight per channel.
    pooled_grads = tf.reduce_mean(grads, axis=(0, 1, 2))
    # Weight each feature map by its channel importance and combine them.
    heatmap = tf.reduce_mean(conv_outputs[0] * pooled_grads, axis=-1)
    # Keep only positive evidence, then move to NumPy for OpenCV.
    heatmap = tf.maximum(heatmap, 0).numpy()

    peak = heatmap.max()
    if peak > 0:  # Prevent division by zero on an all-zero map
        heatmap = heatmap / peak

    # cv2.resize takes (width, height); the input tensor is (batch, height, width, channels).
    in_height, in_width = int(img.shape[1]), int(img.shape[2])
    return cv2.resize(heatmap, (in_width, in_height))

def superimpose_heatmap(image_path, heatmap):
    """Blend a JET-coloured version of ``heatmap`` over the image at ``image_path``.

    The image is read in colour and resized to 224x224; the heatmap is scaled
    by 255 and converted to uint8 before colouring. The result is a 60/40
    image/heatmap blend in OpenCV's channel order.
    """
    base = cv2.resize(cv2.imread(image_path, cv2.IMREAD_COLOR), (224, 224))
    colored = cv2.applyColorMap(np.uint8(255 * heatmap), cv2.COLORMAP_JET)
    return cv2.addWeighted(base, 0.6, colored, 0.4, 0.0)

def demo_predictions_with_explanations(model, test_dir, class_names, gradcam_layer='top_conv', save_dir=None):
    """Show or save each test image next to its Grad-CAM overlay and prediction.

    Args:
        model: Keras model used for prediction and Grad-CAM.
        test_dir: Flat directory searched for ``*.jpg``, ``*.png`` and ``*.jpeg`` files.
        class_names: Class names indexed by predicted class.
        gradcam_layer: Name of the layer used for Grad-CAM.
        save_dir: If set, figures are written there as ``<stem>_gradcam.png``
            (the directory is created when missing); otherwise they are shown.
    """
    test_dir = Path(test_dir)
    img_paths = list(test_dir.glob('*.jpg')) + list(test_dir.glob('*.png')) + list(test_dir.glob('*.jpeg'))

    if save_dir:
        Path(save_dir).mkdir(parents=True, exist_ok=True)

    for img_path in img_paths:
        pred_class, prob = get_prediction(model, str(img_path), class_names)
        heatmap = get_gradcam_heatmap(model, str(img_path), gradcam_layer)
        overlay = superimpose_heatmap(str(img_path), heatmap)

        # Load original image for display (OpenCV reads BGR, matplotlib expects RGB)
        original_img = cv2.imread(str(img_path))
        original_img = cv2.resize(original_img, (224, 224))
        original_img = cv2.cvtColor(original_img, cv2.COLOR_BGR2RGB)
        overlay_rgb = cv2.cvtColor(overlay, cv2.COLOR_BGR2RGB)

        fig, (ax_original, ax_overlay) = plt.subplots(1, 2, figsize=(12, 6))
        ax_original.imshow(original_img)
        ax_original.set_title("Original Image")
        ax_original.axis("off")

        ax_overlay.imshow(overlay_rgb)
        ax_overlay.set_title(f"Predicted: {pred_class} ({prob:.2f})")
        ax_overlay.axis("off")

        if save_dir:
            save_path = Path(save_dir) / f"{img_path.stem}_gradcam.png"
            fig.savefig(save_path)
            print(f"Saved Grad-CAM overlay to {save_path}")
        else:
            plt.show()
        # Release the figure; otherwise one figure per image stays in memory.
        plt.close(fig)

# Script entry point: load the saved model and produce a Grad-CAM figure for
# every image in the test folder.
if __name__ == "__main__":
    from tensorflow.keras.models import load_model
    from data_processing import DataProcessor

    # Only used here as the source of the class-name list.
    processor = DataProcessor()
    class_names = processor.class_names
    # Relative path: resolved against the current working directory.
    model = load_model('best_model.h5')
    test_dir = '/content/drive/MyDrive/CNN_Medical_Imaging_Project/data/raw/test'
    save_dir = '/content/drive/MyDrive/CNN_Medical_Imaging_Project/gradcam_overlays'  # Optional: Set to None to display instead

    # Create the output folder on first use.
    if save_dir and not os.path.exists(save_dir):
        os.makedirs(save_dir)

    demo_predictions_with_explanations(model, test_dir, class_names, gradcam_layer='top_conv', save_dir=save_dir)