In [None]:
import os
# Root folder of the raw dataset. load_images_and_masks() joins this path with
# each category name and then with "images" / "masks".
# NOTE(review): machine-specific absolute path; adjust before running elsewhere.
DATASET_DIR = r"C:\Users\nadim shah\OneDrive\Desktop\xray classification\Dataset"
# List of categories corresponding to different disease classes.
# Each entry is used verbatim as a sub-folder name under DATASET_DIR.
CATEGORIES = ["Viral Pneumonia", "Normal", "Lung_Opacity", "COVID"]
def load_images_and_masks(dataset_dir, categories):
    """
    Collects paired image/mask file paths for every category.

    Each category is read from ``<dataset_dir>/<category>/images`` and
    ``<dataset_dir>/<category>/masks``. An image (any file whose name ends in
    ``.png``, ignoring case) is kept only when the masks folder holds a file
    with the same name. The name comparison ignores case, so ``scan.PNG`` is
    paired with ``scan.png`` even on case-sensitive file systems.

    Args:
        dataset_dir (str): Path to the main dataset directory.
        categories (list): List of disease categories (used as folder names).

    Returns:
        dict: ``{category: {"images": [...], "masks": [...]}}``. Both lists hold
        full paths, are index-aligned (entry ``i`` of ``masks`` belongs to entry
        ``i`` of ``images``) and are ordered by image file name.

    Raises:
        OSError: If a category's ``images`` folder cannot be listed.
    """
    data = {category: {"images": [], "masks": []} for category in categories}

    for category in categories:
        image_dir = os.path.join(dataset_dir, category, "images")
        mask_dir = os.path.join(dataset_dir, category, "masks")

        # A missing masks folder simply means "no pairs", as before.
        mask_names = os.listdir(mask_dir) if os.path.isdir(mask_dir) else []
        exact_names = set(mask_names)
        # Case-insensitive index of the real mask file names. Sorting makes the
        # winner deterministic if two names differ only by case.
        names_by_lower = {}
        for mask_name in sorted(mask_names):
            names_by_lower.setdefault(mask_name.lower(), mask_name)

        # Sorted so the result does not depend on the OS directory order.
        for img_name in sorted(os.listdir(image_dir)):
            if not img_name.lower().endswith(".png"):
                continue

            # Prefer an exact name match, then fall back to ignoring case.
            if img_name in exact_names:
                mask_name = img_name
            else:
                mask_name = names_by_lower.get(img_name.lower())
            if mask_name is None:
                continue  # image without a mask is skipped

            data[category]["images"].append(os.path.join(image_dir, img_name))
            data[category]["masks"].append(os.path.join(mask_dir, mask_name))

    return data

# Build the {category: {"images": [...], "masks": [...]}} path index for the raw dataset.
data = load_images_and_masks(DATASET_DIR, CATEGORIES)

def print_data_lengths(data):
    """
    Report how many image and mask paths each category holds.

    For every category a blank line, the capitalized category name, the two
    counts and a 50-character dashed divider are written to stdout.

    Args:
        data (dict): Mapping of category name to a dict with "images" and
            "masks" lists.
    """
    divider = "-" * 50
    for name in data:
        entry = data[name]
        image_total = len(entry["images"])
        mask_total = len(entry["masks"])

        print(f"\n{name.capitalize()}:")
        print(f"  Number of images: {image_total}")
        print(f"  Number of masks: {mask_total}")
        print(divider)
# Show the per-category image/mask counts of the raw dataset.
print_data_lengths(data)


In [None]:
import os
import random
import shutil

def _upsample_indices(num_samples, target_count):
    """Return ``target_count`` indices into a list of ``num_samples`` items.

    Every original index appears first; the remainder is drawn in rounds of at
    most ``num_samples`` without replacement, so repeats are spread evenly.
    """
    if num_samples <= 0:
        raise ValueError("Cannot upsample a category that has no samples")
    indices = list(range(num_samples))
    while len(indices) < target_count:
        take = min(num_samples, target_count - len(indices))
        indices.extend(random.sample(range(num_samples), take))
    return indices


def _copy_numbered(paths, destination_dir, name_prefix):
    """Copy ``paths`` into ``destination_dir`` as ``<name_prefix>_<1..n>.jpg``."""
    for position, source_path in enumerate(paths, start=1):
        target_name = f"{name_prefix}_{position}.jpg"
        shutil.copy(source_path, os.path.join(destination_dir, target_name))


def balance_images_and_masks(data, viral_pneumonia_count, save_dir):
    """
    Balances the dataset by adjusting the number of images and masks for each
    category, then copies the selected files into ``save_dir``.

    Args:
        data (dict): Original dataset with images and masks. The "images" and
            "masks" lists of a category must be index-aligned. ``data`` is not
            modified.
        viral_pneumonia_count (int): The reference count, typically the number of images in the
                                     category with the fewest samples (Viral Pneumonia in this case).
        save_dir (str): Output root. Files are written to
            ``<save_dir>/<category>/images`` and ``<save_dir>/<category>/masks``;
            the folders are created when missing.

    Returns:
        dict: Balanced dataset with equalized samples across categories. The
        lists are new objects and image ``i`` still belongs to mask ``i``.

    Raises:
        ValueError: If a category with no samples would have to be upsampled.

    Note:
    - Balancing is required because the dataset is unbalanced.
    - We use downsampling for categories with more images and upsampling for categories with fewer images.
    - The "Viral Pneumonia" category is always kept unchanged.
    - One set of indices is drawn per category and applied to both lists, so an
      image can never be separated from its mask.
    """
    balanced_data = {category: {"images": [], "masks": []} for category in data}

    for category, paths in data.items():
        images = paths["images"]
        masks = paths["masks"]
        num_samples = len(images)

        if category == "Viral Pneumonia":
            chosen_images, chosen_masks = list(images), list(masks)
            print(f"  Kept original number: {num_samples} samples (no change)")

        elif num_samples > viral_pneumonia_count:
            indices = random.sample(range(num_samples), viral_pneumonia_count)
            chosen_images = [images[i] for i in indices]
            chosen_masks = [masks[i] for i in indices]
            print(f"  Downsampled to: {viral_pneumonia_count} samples")

        elif num_samples < viral_pneumonia_count:
            # Works for any deficit, including more than the category size.
            indices = _upsample_indices(num_samples, viral_pneumonia_count)
            chosen_images = [images[i] for i in indices]
            chosen_masks = [masks[i] for i in indices]
            print(f"  Upsampled to: {viral_pneumonia_count} samples")

        else:
            chosen_images, chosen_masks = list(images), list(masks)
            print(f"  Already balanced at: {num_samples} samples")

        balanced_data[category]["images"] = chosen_images
        balanced_data[category]["masks"] = chosen_masks

        image_save_dir = os.path.join(save_dir, category, "images")
        mask_save_dir = os.path.join(save_dir, category, "masks")
        os.makedirs(image_save_dir, exist_ok=True)
        os.makedirs(mask_save_dir, exist_ok=True)

        # NOTE(review): shutil.copy copies bytes unchanged, so the ".jpg" suffix
        # does not convert the file format. The naming scheme is kept as-is
        # because the output names are part of this function's behaviour.
        _copy_numbered(chosen_images, image_save_dir, f"{category}_image")
        _copy_numbered(chosen_masks, mask_save_dir, f"{category}_mask")

    return balanced_data

def print_balanced_data_length(balanced_data):
    """
    Report the image and mask counts of each category after balancing.

    Each category produces a blank line, a "Category: <name>" header, the two
    counts and a 50-character dashed divider on stdout.

    Args:
        balanced_data (dict): Mapping of category name to a dict with "images"
            and "masks" lists.
    """
    divider = "-" * 50
    for category in balanced_data:
        entry = balanced_data[category]
        image_total = len(entry["images"])
        mask_total = len(entry["masks"])
        report_lines = (
            f"\nCategory: {category}",
            f"  Number of images after balancing: {image_total}",
            f"  Number of masks after balancing: {mask_total}",
            divider,
        )
        print("\n".join(report_lines))
# The Viral Pneumonia image count is the per-category target size.
viral_pneumonia_count = len(data["Viral Pneumonia"]["images"]) 
# Balance every category to that size; files are copied under "balanced_data_dir",
# a path relative to the current working directory.
balanced_data = balance_images_and_masks(data, viral_pneumonia_count, save_dir="balanced_data_dir")

# Show the per-category counts after balancing.
print_balanced_data_length(balanced_data)


In [None]:
import matplotlib.pyplot as plt
from PIL import Image
import random

def plot_quadraplot_with_labels(balanced_data, categories):
    """
    Plots one random image (top row) and its own mask (bottom row) per category.

    The grid has one column per entry of ``categories``. Image and mask are
    taken from the same list position so that the pair shown belongs together;
    this assumes the "images" and "masks" lists of a category are index-aligned.

    Args:
        balanced_data (dict): Dictionary containing balanced image and mask paths for each category.
        categories (list): List of category names to display.

    Returns:
        None

    Raises:
        KeyError: If a category is missing from ``balanced_data``.
        IndexError: If a category has no image/mask pair to choose from.
    """
    if not categories:
        return  # nothing to draw

    num_columns = len(categories)
    # squeeze=False keeps `axes` two-dimensional even for a single category.
    # With four categories the figure size is the original (20, 10).
    _, axes = plt.subplots(2, num_columns, figsize=(5 * num_columns, 10), squeeze=False)

    for idx, category in enumerate(categories):
        image_paths = balanced_data[category]["images"]
        mask_paths = balanced_data[category]["masks"]

        # One shared position for both lists keeps image and mask paired.
        pair_count = min(len(image_paths), len(mask_paths))
        sample_idx = random.choice(range(pair_count))

        # imshow reads the pixel data immediately, so the file can be closed
        # as soon as the call returns.
        with Image.open(image_paths[sample_idx]) as img:
            axes[0, idx].imshow(img, cmap="gray")
        axes[0, idx].set_title(f"{category} - Image")
        axes[0, idx].axis("off")

        with Image.open(mask_paths[sample_idx]) as mask:
            axes[1, idx].imshow(mask, cmap="gray")
        axes[1, idx].set_title(f"{category} - Mask")
        axes[1, idx].axis("off")

    plt.tight_layout()
    plt.show()

# Category names to plot; same values as the CATEGORIES list defined earlier in this file.
categories = ["Viral Pneumonia", "Normal", "Lung_Opacity", "COVID"]
# Draw one image row and one mask row, with a column per category.
plot_quadraplot_with_labels(balanced_data, categories)


In [None]:
import os
from PIL import Image, UnidentifiedImageError
from torchvision import transforms
from tqdm import tqdm 


"""
Purpose of This Code:
- This code is used to increase the size of the dataset by applying augmentation transformations.
- For every image in the input directory, this script generates one augmented version per defined transformation.
- If you don't want to increase the dataset size, you can skip this part.

Key Notes:
1. Augmentation ensures that the dataset has more variability, which helps in training a more robust model.
2. This script creates one augmented image for each transformation applied to an original image.
3. Modify the augmentation_transforms list to add or remove transformations as needed.
"""
# Every transform in this list produces one extra image per input image.
augmentation_transforms = [
    transforms.RandomHorizontalFlip(p=1),  # Flip the image horizontally
    transforms.RandomRotation(5),         # Rotate the image by up to 5 degrees
]

# NOTE(review): machine-specific absolute paths; adjust before running elsewhere.
input_folder = r'C:\Users\nadim shah\OneDrive\Desktop\xray classification\balanced_data_dir - Copy'
output_folder = r'C:\Users\nadim shah\OneDrive\Desktop\xray classification\augmented_data_dir'

# Only the "images" sub-folder of each class is processed; masks are not augmented.
# NOTE(review): originals and their augmented variants land in the same folder.
# If that folder is later split at random into train/validation/test sets,
# variants of one X-ray can end up on both sides of the split - confirm this
# is acceptable for the evaluation.
for class_name in os.listdir(input_folder):
    class_input_path = os.path.join(input_folder, class_name, 'images')
    class_output_path = os.path.join(output_folder, class_name, 'images')
    if not os.path.isdir(class_input_path):
        print(f"Skipping {class_input_path}, 'images' folder not found.")
        continue
    os.makedirs(class_output_path, exist_ok=True)

    for image_name in tqdm(os.listdir(class_input_path), desc=f"Processing {class_name}"):
        image_path = os.path.join(class_input_path, image_name)
        if not os.path.isfile(image_path):
            continue

        try:
            # convert() returns an independent in-memory copy, so the source
            # file handle can be released immediately instead of staying open
            # for thousands of files until garbage collection.
            with Image.open(image_path) as source_image:
                image = source_image.convert("RGB")
        except UnidentifiedImageError:
            print(f"Error opening image {image_name}: Unidentified image format.")
            continue
        except PermissionError:
            print(f"Error opening image {image_name}: Permission denied.")
            continue

        # Keep an unmodified copy next to the augmented versions.
        original_image_path = os.path.join(class_output_path, f"original_{image_name}")
        image.save(original_image_path)

        for i, transform in enumerate(augmentation_transforms):
            augmented_image = transform(image)
            augmented_image_path = os.path.join(class_output_path, f"augmented_{i}_{image_name}")
            augmented_image.save(augmented_image_path)




In [None]:
import torch
import torch.nn as nn
from torch.optim import Adam
from torch.optim.lr_scheduler import StepLR
from torchvision import models, transforms
from torch.utils.data import DataLoader, random_split
from torchvision import datasets
import matplotlib.pyplot as plt

"""
Purpose of This Code:
- This code handles dataset preparation for training, validation, and testing.
- It applies data augmentation transformations to the training dataset to improve model robustness.
- Validation and test datasets are resized and normalized without augmentation.
- The dataset is split into three parts: training (60%), validation (20%), and testing (20%).

Key Notes:
1. Augmentation helps create variability in the training dataset to avoid overfitting.
2. The transformations include resizing, flipping, rotation, and normalization using ImageNet statistics.
3. Validation and test datasets ensure consistency for performance evaluation without any random alterations.
4. Data loaders are created to efficiently process data in batches.
"""

# Training pipeline: resize plus random augmentation, then ImageNet normalization.
train_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(15),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # Normalize using ImageNet stats
])

# Validation/test pipeline: deterministic resize and normalization only.
val_test_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# NOTE(review): machine-specific absolute path; adjust before running elsewhere.
DATA_ROOT = r'C:\Users\nadim shah\OneDrive\Desktop\xray classification\augmented_data_dir'

# Transform-free view, used only for sizes, split indices and class names.
dataset = datasets.ImageFolder(root=DATA_ROOT)

train_size = int(0.6 * len(dataset))
val_size = int(0.2 * len(dataset))
test_size = len(dataset) - train_size - val_size
train_split, val_split, test_split = random_split(dataset, [train_size, val_size, test_size])

# The subsets returned by random_split all wrap the SAME ImageFolder object, so
# assigning `.dataset.transform` on each one in turn just overwrites a single
# shared attribute and the last assignment wins for every split. Two separate
# views of the same folder give each split its own transform. ImageFolder lists
# files in sorted order, so the split indices are valid for both views.
augmented_view = datasets.ImageFolder(root=DATA_ROOT, transform=train_transform)
plain_view = datasets.ImageFolder(root=DATA_ROOT, transform=val_test_transform)

train_dataset = torch.utils.data.Subset(augmented_view, train_split.indices)
val_dataset = torch.utils.data.Subset(plain_view, val_split.indices)
test_dataset = torch.utils.data.Subset(plain_view, test_split.indices)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Class names in label-index order (label k corresponds to class_labels[k]).
class_labels = dataset.classes
print(f"Class Labels: {class_labels}")


In [None]:
# Show, for one training batch, which integer label maps to which class name.
data_iter = iter(train_loader)
first_batch = next(data_iter)
images, labels = first_batch

for i, label in enumerate(labels):
    print(f"Image {i}: Label = {label.item()} -> Class = {class_labels[label]}")

In [None]:
from torch.utils.data import DataLoader, random_split
from torchvision import datasets
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from torch.optim import Adam
from torch.optim.lr_scheduler import StepLR
from torchvision import models, transforms

"""
Purpose of This Code:
- This code sets up a pre-trained VGG16 model with customized classifier layers for our disease classification task.
- The model is fine-tuned for the specific application by modifying its classifier layers.
- Dropout layers are included to prevent overfitting during training.

Key Notes:
1. VGG16 is chosen for its effectiveness in image classification tasks and its pre-trained weights on ImageNet.
2. The convolutional layers are frozen to retain the learned features from the pre-trained model.
3. Fully connected layers in the classifier are modified to adapt to the number of classes in the dataset.
4. Dropout layers are added to reduce overfitting during training.
"""

# `pretrained=True` is deprecated in newer torchvision releases in favour of
# the `weights=` argument. Use the new API when it exists and fall back to the
# old flag on older installations; both load the ImageNet weights.
_vgg16_weights = getattr(models, "VGG16_Weights", None)
if _vgg16_weights is not None:
    model = models.vgg16(weights=_vgg16_weights.IMAGENET1K_V1)
else:
    model = models.vgg16(pretrained=True)

# Freeze the convolutional feature extractor; only the new classifier is trained.
for param in model.features.parameters():
    param.requires_grad = False

# Replace the original classifier head with a smaller one that ends in one
# output per dataset class.
model.classifier = nn.Sequential(
    nn.Linear(model.classifier[0].in_features, 512),
    nn.ReLU(),
    nn.Dropout(0.5),
    nn.Linear(512, 256),
    nn.ReLU(),
    nn.Dropout(0.5),
    nn.Linear(256, len(class_labels))
)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
model.to(device)

criterion = nn.CrossEntropyLoss()

# Only the classifier parameters are optimized; the frozen features are left out.
optimizer = Adam(model.classifier.parameters(), lr=0.001)

In [None]:
"""
Purpose of This Code:
- This code implements the training and validation process for an image classification model.
- The model is trained for a specified number of epochs, with learning rate adjustments using a scheduler.
- Dropout layers are used in the model to reduce overfitting during training.
- Training and validation accuracies are calculated and visualized to monitor model performance.

Key Notes:
1. **Learning Rate Scheduler**: The StepLR scheduler is used to reduce the learning rate by half every 5 epochs. This gradual reduction helps the model converge more smoothly and avoid overshooting the optimal solution as training progresses.
2. **Optimizer**: Adam optimizer is used to update the model parameters, and the learning rate is dynamically adjusted using the StepLR scheduler.

"""

# Halve the learning rate every 5 epochs.
scheduler = StepLR(optimizer, step_size=5, gamma=0.5)
num_epochs = 10
train_accuracies = []  # training accuracy (%) per epoch
val_accuracies = []    # validation accuracy (%) per epoch

for epoch in range(num_epochs):
    # ---- training phase ----
    model.train()
    correct_train = 0
    total_train = 0

    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        # Apply the gradients. Without this call the weights never change and
        # the model does not learn, however many epochs are run.
        optimizer.step()

        _, predicted = torch.max(outputs, 1)
        correct_train += (predicted == labels).sum().item()
        total_train += labels.size(0)

    train_accuracy = (correct_train / total_train) * 100
    train_accuracies.append(train_accuracy)

    # ---- validation phase (no gradient tracking, no weight updates) ----
    model.eval()
    correct_val = 0
    total_val = 0

    with torch.no_grad():
        for images, labels in val_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs, 1)

            correct_val += (predicted == labels).sum().item()
            total_val += labels.size(0)

    val_accuracy = (correct_val / total_val) * 100
    val_accuracies.append(val_accuracy)

    # Advance the learning-rate schedule once per epoch, after the optimizer updates.
    scheduler.step()

    print(f"Epoch [{epoch+1}/{num_epochs}], Train Accuracy: {train_accuracy:.2f}%, Val Accuracy: {val_accuracy:.2f}%, Learning Rate: {scheduler.get_last_lr()}")

# Accuracy curves for both phases across epochs.
plt.figure(figsize=(10, 6))
plt.plot(range(1, num_epochs + 1), train_accuracies, label='Training Accuracy', color='blue', marker='o')
plt.plot(range(1, num_epochs + 1), val_accuracies, label='Validation Accuracy', color='orange', marker='x')
plt.xlabel('Epochs')
plt.ylabel('Accuracy (%)')
plt.title('Training vs. Validation Accuracy')
plt.legend(loc='best')
plt.grid(True)
plt.show()


In [None]:
"""
Purpose of This Code:
- This code performs evaluation of the model on the test set after the validation phase.

- A confusion matrix is generated to visualize the performance of the model for each class.
- This helps in understanding how well the model is performing, including the specific classes that may require more attention.

Key Notes:
1. **Model Evaluation**: The model is set to `eval()` mode so that dropout is disabled and any batch normalization layers use their running statistics during inference.
2. **Metrics Calculation**:
   - The **classification report** provides detailed metrics like precision, recall, and F1-score for each class.
   - **Precision** and **recall** are calculated using the macro average, which averages the metrics across all classes without considering class imbalance.
   - **F1-score** is the harmonic mean of precision and recall, giving a balance between them.
3. **Confusion Matrix**: The confusion matrix is computed and visualized to show how well the model is classifying each class, with annotated counts for true positives, false positives, true negatives, and false negatives.
"""

import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import classification_report, precision_score, recall_score, f1_score, confusion_matrix
import numpy as np
import torch

model.eval()

all_labels = []  # ground-truth class indices for the whole test set
all_preds = []   # predicted class indices, in the same order

with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)

        _, predicted = torch.max(outputs, 1)

        all_labels.extend(labels.cpu().numpy())
        all_preds.extend(predicted.cpu().numpy())

# Listing every class index explicitly keeps the report rows and the confusion
# matrix aligned with class_labels even when a class is absent from this split.
label_ids = list(range(len(class_labels)))

# The predictions above come from test_loader, so this is the test report.
print("Classification Report (Test):")
print(classification_report(all_labels, all_preds, labels=label_ids, target_names=class_labels))

accuracy = np.mean(np.array(all_preds) == np.array(all_labels))
precision = precision_score(all_labels, all_preds, average='macro')
recall = recall_score(all_labels, all_preds, average='macro')
f1 = f1_score(all_labels, all_preds, average='macro')

print(f"Accuracy: {accuracy:.4f}")
print(f"Precision (Macro): {precision:.4f}")
print(f"Recall (Macro): {recall:.4f}")
print(f"F1-Score (Macro): {f1:.4f}")
cm = confusion_matrix(all_labels, all_preds, labels=label_ids)

# Rows are true classes, columns are predicted classes.
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_labels, yticklabels=class_labels)
plt.xlabel('Predicted Labels')
plt.ylabel('True Labels')
plt.title('Confusion Matrix')
plt.show()


In [None]:
import torch
from torchvision import transforms
from PIL import Image

"""
Purpose of This Code:
- This part of the code implements a pipeline that takes a single image as input, processes it, 
  and predicts the disease type (e.g., COVID-19, pneumonia, etc.) using a trained model.
- The image is preprocessed to match the input format expected by the model, and then the model predicts
  the class label based on the processed image.

"""

def preprocess_image(image_path):
    """
    Preprocess the input image to the format expected by the model.

    The image is converted to RGB, resized to 224x224, turned into a tensor,
    normalized with the ImageNet mean/std and given a leading batch dimension.

    Args:
        image_path (str): Path to the image file.

    Returns:
        Tensor: The preprocessed image tensor ready for inference
        (a batch containing this single image).

    Raises:
        Any exception raised by ``Image.open`` for a missing or unreadable
        file is propagated unchanged.
    """
    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])

    # convert() returns an independent in-memory copy, so the file handle is
    # released as soon as the `with` block ends instead of being leaked.
    with Image.open(image_path) as source_image:
        image = source_image.convert('RGB')

    # unsqueeze(0) adds the batch dimension the model expects.
    return transform(image).unsqueeze(0)

def predict_image_class(image_path, model, device, class_labels=None):
    """
    Predicts the class label of a single image using the trained model.

    Args:
        image_path (str): Path to the input image.
        model (torch.nn.Module): The trained model for classification.
        device (torch.device): The device (CPU/GPU) to run the model on.
        class_labels (sequence of str, optional): Class names in label-index
            order, i.e. ``class_labels[k]`` names output ``k`` of the model.
            Pass the class list of the dataset the model was trained on to get
            labels that match the training data. Defaults to
            ``['covid', 'lung_opacity', 'Normal', 'pneumonia']``.

    Returns:
        tuple: The predicted class ID and class label.

    Raises:
        IndexError: If the predicted class ID has no entry in ``class_labels``.
    """
    if class_labels is None:
        # Historical default, kept so existing three-argument calls return the
        # same label strings as before.
        class_labels = ['covid', 'lung_opacity', 'Normal', 'pneumonia']

    image = preprocess_image(image_path).to(device)

    # Inference mode: disable dropout and gradient tracking.
    model.eval()
    with torch.no_grad():
        outputs = model(image)
        _, predicted = torch.max(outputs, 1)

    predicted_class = predicted.item()
    predicted_label = class_labels[predicted_class]

    return predicted_class, predicted_label
# Replace with the path of your input image
# NOTE(review): machine-specific absolute path; adjust before running elsewhere.
image_path = r"C:\Users\nadim shah\OneDrive\Desktop\xray classification\balanced_data_dir - Copy\Normal\images\Normal_image_38.jpg" 

# The model was already moved to `device` when it was created earlier in this
# file; repeating the call here is harmless.
model.to(device) 

# Run the single-image pipeline: preprocess, forward pass, label lookup.
predicted_class, predicted_label = predict_image_class(image_path, model, device)

print(f"Predicted Class ID: {predicted_class}")
print(f"Predicted Label: {predicted_label}")
