In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the read-only input tree and print the full path of every file found.
for root, _subdirs, names in os.walk('/kaggle/input'):
    for full_path in (os.path.join(root, name) for name in names):
        print(full_path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
import cv2 as cv
import numpy as np
import os
import random

# Kaggle input folders of the chest X-ray dataset, one per split and class.
# Each path keeps its trailing slash.
(train_normal, train_pneumonia) = (
    "/kaggle/input/chestxraydataset/chest_xray/train/NORMAL/",
    "/kaggle/input/chestxraydataset/chest_xray/train/PNEUMONIA/",
)
(test_normal, test_pneumonia) = (
    "/kaggle/input/chestxraydataset/chest_xray/test/NORMAL/",
    "/kaggle/input/chestxraydataset/chest_xray/test/PNEUMONIA/",
)


def augment_img_pos(image, angle=10, tx=10, ty=15):
  """Rotate an image about its centre, then translate it, keeping its size.

  Args:
    image: NumPy image array shaped (rows, cols) or (rows, cols, channels),
      e.g. the result of cv.imread.
    angle: Rotation angle in degrees handed to cv.getRotationMatrix2D.
    tx: Horizontal shift in pixels (third column, first row of the affine matrix).
    ty: Vertical shift in pixels (third column, second row of the affine matrix).

  Returns:
    The transformed image with the same height and width as the input.

  Raises:
    ValueError: If `image` is None (cv.imread returns None for unreadable files).
  """
  if image is None:
    raise ValueError("augment_img_pos received None; cv.imread probably failed to read the file")

  # Only height and width are needed; slicing also accepts 2-D grayscale arrays,
  # which the former three-way unpacking rejected.
  rows, cols = image.shape[:2]

  # cols-1 and rows-1 are the coordinate limits, so this is the exact centre.
  M = cv.getRotationMatrix2D(center=((cols-1)/2.0,(rows-1)/2.0),angle=angle,scale=1)
  dst = cv.warpAffine(image,M,dsize=(cols,rows))

  # Translation matrix built from tx and ty; warpAffine expects a 2x3 float array.
  translation_M = np.array([
      [1, 0, tx],
      [0, 1, ty]
  ], dtype=np.float32)
  dst = cv.warpAffine(dst,translation_M,dsize=(cols,rows))

  return dst

def augment_img_neg(image, angle=-10, tx=-5, ty=-20):
  """Rotate an image about its centre, then translate it, keeping its size.

  Mirror-direction counterpart of the positive augmentation: the defaults use
  a negative angle and negative shifts.

  Args:
    image: NumPy image array shaped (rows, cols) or (rows, cols, channels),
      e.g. the result of cv.imread.
    angle: Rotation angle in degrees handed to cv.getRotationMatrix2D.
    tx: Horizontal shift in pixels (third column, first row of the affine matrix).
    ty: Vertical shift in pixels (third column, second row of the affine matrix).

  Returns:
    The transformed image with the same height and width as the input.

  Raises:
    ValueError: If `image` is None (cv.imread returns None for unreadable files).
  """
  if image is None:
    raise ValueError("augment_img_neg received None; cv.imread probably failed to read the file")

  # Only height and width are needed; slicing also accepts 2-D grayscale arrays,
  # which the former three-way unpacking rejected.
  rows, cols = image.shape[:2]

  # cols-1 and rows-1 are the coordinate limits, so this is the exact centre.
  M = cv.getRotationMatrix2D(center=((cols-1)/2.0,(rows-1)/2.0),angle=angle,scale=1)
  dst = cv.warpAffine(image,M,dsize=(cols,rows))

  # Translation matrix built from tx and ty; warpAffine expects a 2x3 float array.
  translation_M = np.array([
      [1, 0, tx],
      [0, 1, ty]
  ], dtype=np.float32)
  dst = cv.warpAffine(dst,translation_M,dsize=(cols,rows))

  return dst

train_folders = [train_normal, train_pneumonia]

output_base = "/kaggle/working/train/"
output_normal = os.path.join(output_base, "NORMAL")
output_pneumonia = os.path.join(output_base, "PNEUMONIA")

os.makedirs(output_normal, exist_ok=True)
os.makedirs(output_pneumonia, exist_ok=True)

# Share of each class folder that receives an augmented copy (split evenly
# between the positive and the negative transform).
AUGMENT_FRACTION = 0.3
# Fixed seed + sorted listing => the same files are chosen on every run, so
# re-running this cell overwrites its previous output instead of piling up a
# different random selection next to it.
AUGMENT_SEED = 42


def _augment_and_save(file_names, src_dir, dst_dir, augment_fn, suffix):
    """Apply `augment_fn` to each file of `src_dir` and write `<name>_<suffix><ext>` into `dst_dir`.

    Files that cv.imread cannot decode, or that cv.imwrite fails to store, are
    skipped. Returns the number of images actually written.
    """
    written = 0
    for img_file in file_names:
        img = cv.imread(os.path.join(src_dir, img_file))
        if img is None:
            # cv.imread signals an unreadable/corrupt file with None instead of raising.
            continue

        base_name, ext = os.path.splitext(img_file)
        out_path = os.path.join(dst_dir, f"{base_name}_{suffix}{ext}")
        # cv.imwrite reports failure through its boolean return value.
        if cv.imwrite(out_path, augment_fn(img)):
            written += 1
    return written


augment_rng = random.Random(AUGMENT_SEED)

# Loop over each class folder
for folder_path in train_folders:
    save_folder = output_normal if folder_path == train_normal else output_pneumonia

    # Sorted, because os.listdir order is arbitrary and would defeat the seed.
    image_files = sorted(
        f for f in os.listdir(folder_path) if f.lower().endswith(('.jpg', '.jpeg', '.png'))
    )
    augment_rng.shuffle(image_files)

    # Pick the first AUGMENT_FRACTION of the shuffled list; when that count is
    # odd, the negative slice is one image longer than the positive one.
    num_to_augment = int(len(image_files) * AUGMENT_FRACTION)
    half_aug = num_to_augment // 2
    pos_images = image_files[:half_aug]
    neg_images = image_files[half_aug:num_to_augment]

    pos_written = _augment_and_save(pos_images, folder_path, save_folder, augment_img_pos, "pos")
    neg_written = _augment_and_save(neg_images, folder_path, save_folder, augment_img_neg, "neg")

    # Report what was really written (the old message printed half_aug for both halves).
    failed = num_to_augment - pos_written - neg_written
    print(f"{folder_path}: augmented {pos_written + neg_written} images "
          f"({pos_written} pos, {neg_written} neg), {failed} skipped")



In [None]:
def preprocess_image(img, target_size=(224,224), clip_limit=4, brightness_offset=30):
    """Contrast-enhance, brighten, resize and scale an image to the [0, 1] range.

    Steps: grayscale conversion -> CLAHE -> constant brightness offset
    (saturating at 255) -> back to three identical BGR channels -> bicubic
    resize -> division by 255.

    Args:
        img: Image array as returned by cv.imread (BGR), or an already
            single-channel 2-D array.
        target_size: Size handed to cv.resize as `dsize`.
        clip_limit: `clipLimit` passed to cv.createCLAHE.
        brightness_offset: Constant added to every pixel after CLAHE.

    Returns:
        Floating-point array with three channels and values in [0, 1].

    Raises:
        ValueError: If `img` is None (cv.imread returns None for unreadable files).
    """
    if img is None:
        raise ValueError("preprocess_image received None; cv.imread probably failed to read the file")

    # CLAHE works on a single channel; skip the conversion if there is only one.
    image_bw = img if img.ndim == 2 else cv.cvtColor(img, cv.COLOR_BGR2GRAY)
    clahe = cv.createCLAHE(clipLimit=clip_limit)
    equalized = clahe.apply(image_bw)

    # Widen before adding: uint8 + 30 wraps modulo 256, so bright pixels used to
    # turn almost black *before* np.clip ever saw a value above 255.
    brightened = np.clip(equalized.astype(np.int32) + brightness_offset, 0, 255).astype(np.uint8)
    clahe_bgr = cv.cvtColor(brightened, cv.COLOR_GRAY2BGR)

    img_resized = cv.resize(clahe_bgr, dsize=target_size, interpolation=cv.INTER_CUBIC)
    img_normalized = img_resized / 255.0
    return img_normalized

# NOTE(review): going by the cells shown here, /kaggle/working/train/ only
# receives the *_pos / *_neg augmented copies; the untouched originals under
# /kaggle/input are never loaded for training. Confirm that this is intended.
train_normal= "/kaggle/working/train/NORMAL/"
train_pneumonia= "/kaggle/working/train/PNEUMONIA/"
train_folders = [train_normal, train_pneumonia]

# Class index per folder: NORMAL -> 0, PNEUMONIA -> 1.
label_map = {train_normal: 0, train_pneumonia: 1}

loaded_images = []
loaded_labels = []
skipped_files = []

for folder_path in train_folders:
    label = label_map[folder_path]

    # Sorted so the sample order (and therefore any seeded split made from it
    # later) does not depend on the arbitrary order of os.listdir.
    image_files = sorted(
        f for f in os.listdir(folder_path) if f.lower().endswith(('.jpg','.jpeg','.png'))
    )

    for img_file in image_files:
        img_path = os.path.join(folder_path, img_file)
        img = cv.imread(img_path)

        if img is None:
            # Unreadable file: remember it so the loss of samples is visible.
            skipped_files.append(img_path)
            continue

        # Store as float32 right away; the final array is float32 anyway and
        # keeping float64 copies in the list doubles the peak memory.
        loaded_images.append(preprocess_image(img).astype(np.float32))
        loaded_labels.append(label)

if skipped_files:
    print(f"Skipped {len(skipped_files)} unreadable image(s), e.g. {skipped_files[0]}")

if not loaded_images:
    # Fail here with a clear message rather than later with a shape error.
    raise RuntimeError(f"No training images could be loaded from {train_folders}")

X_train = np.array(loaded_images, dtype=np.float32)
y_train = np.array(loaded_labels, dtype=np.int64)

print("Training data shape:", X_train.shape)
print("Labels shape:", y_train.shape)


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
import torchvision.models as models
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import time

# Seed PyTorch so the new head's initialisation and the batch shuffling are
# repeatable; the same value is used for the train/validation split.
SEED = 42
torch.manual_seed(SEED)

X = torch.tensor(X_train).permute(0,3,1,2).float()  # rearrange dimensions to (N,C,H,W)
y = torch.tensor(y_train).long()

# Stratify so both splits keep the same class ratio.
X_tr, X_val, y_tr, y_val = train_test_split(
    X, y, test_size=0.1, random_state=SEED, stratify=y_train
)

train_dataset = TensorDataset(X_tr, y_tr)
val_dataset   = TensorDataset(X_val, y_val)

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_loader   = DataLoader(val_dataset, batch_size=16, shuffle=False)


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Newer torchvision replaces the deprecated `pretrained=True` flag with weight
# enums; IMAGENET1K_V1 is the checkpoint the old flag maps to. Fall back to the
# flag when the enum does not exist (older torchvision).
# NOTE(review): the inputs are only scaled to [0, 1]; the ImageNet mean/std
# normalisation these weights were trained with is not applied - confirm.
_resnet50_weights = getattr(models, "ResNet50_Weights", None)
if _resnet50_weights is not None:
    model = models.resnet50(weights=_resnet50_weights.IMAGENET1K_V1)
else:
    model = models.resnet50(pretrained=True)
for param in model.parameters():
    param.requires_grad = False  # freeze base layers

num_classes = 2
model.fc = nn.Linear(model.fc.in_features, num_classes)  # new head, trainable by default
model = model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.fc.parameters(), lr=1e-4)


def _train_one_epoch(model, loader, criterion, optimizer, device, desc):
    """Run one optimisation pass over `loader`; return (mean loss, accuracy)."""
    # NOTE(review): train() also switches the frozen backbone's BatchNorm layers
    # to training mode, so their running statistics keep updating - confirm.
    model.train()
    running_loss, correct, total = 0.0, 0, 0

    loop = tqdm(loader, desc=desc)
    for imgs, labels in loop:
        imgs, labels = imgs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(imgs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        batch_size = imgs.size(0)
        batch_correct = (outputs.argmax(dim=1) == labels).sum().item()
        # criterion returns the batch mean, so weight it by the batch size.
        running_loss += loss.item() * batch_size
        correct += batch_correct
        total += batch_size

        loop.set_postfix(loss=loss.item(), acc=f"{batch_correct/batch_size:.4f}")

    return running_loss / max(total, 1), correct / max(total, 1)


def _evaluate(model, loader, criterion, device):
    """Score `model` on `loader` without gradients; return (mean loss, accuracy)."""
    model.eval()
    loss_sum, correct, total = 0.0, 0, 0
    with torch.no_grad():
        for imgs, labels in loader:
            imgs, labels = imgs.to(device), labels.to(device)
            outputs = model(imgs)
            loss_sum += criterion(outputs, labels).item() * imgs.size(0)
            correct += (outputs.argmax(dim=1) == labels).sum().item()
            total += labels.size(0)
    return loss_sum / max(total, 1), correct / max(total, 1)


def _run_phase(tag, num_epochs, optimizer):
    """Train for `num_epochs` epochs, validating and logging after each one.

    Uses the notebook-level `model`, `train_loader`, `val_loader`, `criterion`
    and `device`. `tag` is the label shown in square brackets in the log line.
    """
    for epoch in range(num_epochs):
        start_time = time.time()
        epoch_loss, epoch_acc = _train_one_epoch(
            model, train_loader, criterion, optimizer, device,
            desc=f"Epoch [{epoch+1}/{num_epochs}]",
        )
        elapsed = time.time() - start_time  # training time only, validation excluded

        val_loss, val_acc = _evaluate(model, val_loader, criterion, device)

        print(f"[{tag}] Epoch {epoch+1}/{num_epochs} | "
              f"Train Loss: {epoch_loss:.4f}, Acc: {epoch_acc:.4f} | "
              f"Val Loss: {val_loss:.4f}, Acc: {val_acc:.4f} | Time: {elapsed:.1f}s")


# 3. Train classifier head only
num_epochs_head = 9
print("=== Training classifier head ===")
_run_phase("Head", num_epochs_head, optimizer)

# 4. Fine-tune Layer4 + classifier
for name, param in model.named_parameters():
    if "layer4" in name or "fc" in name:
        param.requires_grad = True

# Fresh optimizer over everything that is trainable now, with a smaller step.
optimizer = optim.Adam(filter(lambda p: p.requires_grad, model.parameters()), lr=1e-5)

num_epochs_finetune = 3
print("=== Fine-tuning last block + classifier ===")
_run_phase("Fine-tune", num_epochs_finetune, optimizer)

# 5. Save model
torch.save(model.state_dict(), "resnet50_xray_finetuned.pth")
print("Model saved as resnet50_xray_finetuned.pth")


In [None]:
from IPython.display import FileLink

# Write the current weights once more and show a download link as the cell output.
path = "resnet50_xray_finetuned.pth"
torch.save(model.state_dict(), path)

FileLink(path)