In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import numpy as np
import pandas as pd
import os
from PIL import Image

dataset_path = '/content/drive/MyDrive/PneumoniaMNIST/pneumoniamnist_224.npz'
data = np.load(dataset_path)

train_images = data['train_images']
train_labels = data['train_labels']
val_images = data['val_images']
val_labels = data['val_labels']
test_images = data['test_images']
test_labels = data['test_labels']

def save_image_array(image_array, output_path):
    image = Image.fromarray(image_array.astype('uint8'))
    image.save(output_path)

output_dir = '/content/drive/MyDrive/PneumoniaMNIST/images'

def create_image_file_paths(images, labels, dataset_type):
    image_paths = []
    label_list = []

    for i, (image, label) in enumerate(zip(images, labels)):
        image_filename = f'{dataset_type}_image_{i}.png'
        image_filepath = os.path.join(output_dir, image_filename)

        save_image_array(image, image_filepath)

        image_paths.append(image_filepath)
        label_list.append(label)

    return image_paths, label_list

train_image_paths, train_labels_list = create_image_file_paths(train_images, train_labels, 'train')
val_image_paths, val_labels_list = create_image_file_paths(val_images, val_labels, 'val')
test_image_paths, test_labels_list = create_image_file_paths(test_images, test_labels, 'test')

train_df = pd.DataFrame({'image_path': train_image_paths, 'label': train_labels_list})
val_df = pd.DataFrame({'image_path': val_image_paths, 'label': val_labels_list})
test_df = pd.DataFrame({'image_path': test_image_paths, 'label': test_labels_list})

train_df.to_csv('train_dataset.csv', index=False)
val_df.to_csv('val_dataset.csv', index=False)
test_df.to_csv('test_dataset.csv', index=False)

combined_df = pd.concat([train_df, val_df, test_df], ignore_index=True)

combined_df.to_csv('combined_dataset.csv', index=False)

print("Images saved and CSV files have been created for train, validation, and test datasets.")


Images saved and CSV files have been created for train, validation, and test datasets.


In [7]:
!pip install torch torchvision segmentation-models-pytorch pandas pillow



In [8]:
!pip install opencv-python pandas pillow numpy matplotlib



In [None]:
import os
import shutil

source_dir = "/content/" 

destination_dir = "/content/drive/MyDrive/PneumoniaMNIST/"
os.makedirs(destination_dir, exist_ok=True)  

csv_files = ["test_dataset.csv", "val_dataset.csv", "train_dataset.csv"]

for csv_file in csv_files:
    source_path = os.path.join(source_dir, csv_file)
    destination_path = os.path.join(destination_dir, csv_file)

    if os.path.exists(source_path):  
        shutil.move(source_path, destination_path)
        print(f"Moved {csv_file} to {destination_dir}")
    else:
        print(f"File not found: {source_path}")

Moved test_dataset.csv to /content/drive/MyDrive/PneumoniaMNIST/
Moved val_dataset.csv to /content/drive/MyDrive/PneumoniaMNIST/
Moved train_dataset.csv to /content/drive/MyDrive/PneumoniaMNIST/


In [None]:
import os
import pandas as pd
import cv2

image_dir = "/content/drive/MyDrive/PneumoniaMNIST/images"  
mask_dir = "/content/drive/MyDrive/PneumoniaMNIST/masks"    
os.makedirs(mask_dir, exist_ok=True)  

train_csv_path = "/content/drive/MyDrive/PneumoniaMNIST/train_dataset.csv"
val_csv_path = "/content/drive/MyDrive/PneumoniaMNIST/val_dataset.csv"
test_csv_path = "/content/drive/MyDrive/PneumoniaMNIST/test_dataset.csv"

train_updated_csv_path = "/content/drive/MyDrive/PneumoniaMNIST/train_dataset_with_masks.csv"
val_updated_csv_path = "/content/drive/MyDrive/PneumoniaMNIST/val_dataset_with_masks.csv"
test_updated_csv_path = "/content/drive/MyDrive/PneumoniaMNIST/test_dataset_with_masks.csv"

def generate_mask_with_opencv(image_path, threshold, mask_dir):

    image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    if image is None:
        raise FileNotFoundError(f"Image not found: {image_path}")

    _, mask = cv2.threshold(image, threshold, 255, cv2.THRESH_BINARY)

    mask_filename = os.path.basename(image_path).replace(".png", "_mask.png")
    mask_path = os.path.join(mask_dir, mask_filename)
    cv2.imwrite(mask_path, mask)

    return mask_path

def process_dataset_with_masks(csv_path, updated_csv_path, image_dir, mask_dir, threshold):

    df = pd.read_csv(csv_path)

    df['mask_path'] = df['image_path'].apply(
        lambda img_path: generate_mask_with_opencv(os.path.join(image_dir, os.path.basename(img_path)), threshold, mask_dir)
    )

    df.to_csv(updated_csv_path, index=False)
    print(f"Updated CSV saved to {updated_csv_path}")

threshold = 110

process_dataset_with_masks(train_csv_path, train_updated_csv_path, image_dir, mask_dir, threshold)
process_dataset_with_masks(val_csv_path, val_updated_csv_path, image_dir, mask_dir, threshold)
process_dataset_with_masks(test_csv_path, test_updated_csv_path, image_dir, mask_dir, threshold)

print(f"Masks generated and paths added to train, val, and test datasets.")


FileNotFoundError: Image not found: /content/drive/MyDrive/PneumoniaMNIST/images/train_image_50.png

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
import torchvision.transforms as T
import pandas as pd
from PIL import Image
import os
import numpy as np
from torchvision.models import resnet34
from sklearn.metrics import accuracy_score


class ClassificationDatasetWithMaskGrayscale(Dataset):
    def __init__(self, df, base_image_dir, transforms=None):
        self.df = df
        self.base_image_dir = base_image_dir
        self.transforms = transforms

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        image_path = os.path.join(self.base_image_dir, os.path.basename(row['image_path']))
        mask_path = row['mask_path']
        label = row['label']

        image = Image.open(image_path).convert("L")
        mask = Image.open(mask_path).convert("L")

        if self.transforms:
            image = self.transforms(image)
            mask = self.transforms(mask)

        if mask.dim() == 2:
            mask = mask.unsqueeze(0)

        image_with_mask = torch.cat((image, mask), dim=0)
        label = float(label)

        return image_with_mask, torch.tensor(label, dtype=torch.float32)


data_transforms = T.Compose([
    T.Resize((224, 224)),
    T.ToTensor(),
])

base_image_dir = "/content/drive/MyDrive/PneumoniaMNIST/images"
train_df = pd.read_csv('/content/drive/MyDrive/PneumoniaMNIST/train_dataset.csv')
val_df = pd.read_csv('/content/drive/MyDrive/PneumoniaMNIST/val_dataset.csv')
test_df = pd.read_csv('/content/drive/MyDrive/PneumoniaMNIST/test_dataset.csv')

train_df['label'] = train_df['label'].apply(lambda x: int(x.strip("[]")))
val_df['label'] = val_df['label'].apply(lambda x: int(x.strip("[]")))
test_df['label'] = test_df['label'].apply(lambda x: int(x.strip("[]")))

train_dataset = ClassificationDatasetWithMaskGrayscale(train_df, base_image_dir, transforms=data_transforms)
val_dataset = ClassificationDatasetWithMaskGrayscale(val_df, base_image_dir, transforms=data_transforms)
test_dataset = ClassificationDatasetWithMaskGrayscale(test_df, base_image_dir, transforms=data_transforms)

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)


class ResNetClassifierWithMaskGrayscale(nn.Module):
    def __init__(self, encoder_weights="imagenet"):
        super(ResNetClassifierWithMaskGrayscale, self).__init__()
        self.encoder = resnet34(weights="ResNet34_Weights.IMAGENET1K_V1" if encoder_weights == "imagenet" else None)
        self.encoder.conv1 = nn.Conv2d(2, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.encoder = nn.Sequential(*list(self.encoder.children())[:-2])
        self.gap = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Linear(512, 1)

    def forward(self, x):
        features = self.encoder(x)
        pooled = self.gap(features).view(features.size(0), -1)
        output = self.fc(pooled)
        return output


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = ResNetClassifierWithMaskGrayscale().to(device)

loss_fn = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

num_epochs = 5
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0

    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        loss = loss_fn(outputs.squeeze(), labels)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    print(f"Epoch {epoch+1}, Loss: {running_loss/len(train_loader)}")

model.eval()
val_labels = []
val_preds = []

with torch.no_grad():
    for images, labels in val_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        preds = torch.sigmoid(outputs).squeeze()
        val_labels.extend(labels.cpu().numpy())
        val_preds.extend(preds.cpu().numpy())

val_preds_binary = (np.array(val_preds) > 0.5).astype(int)
val_accuracy = accuracy_score(val_labels, val_preds_binary)
print(f"Validation Accuracy: {val_accuracy:.4f}")

test_labels = []
test_preds = []

with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        preds = torch.sigmoid(outputs).squeeze()
        test_labels.extend(labels.cpu().numpy())
        test_preds.extend(preds.cpu().numpy())

test_preds_binary = (np.array(test_preds) > 0.5).astype(int)
test_accuracy = accuracy_score(test_labels, test_preds_binary)
print(f"Test Accuracy: {test_accuracy:.4f}")

In [10]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
import torchvision.transforms as T
import pandas as pd
from PIL import Image
import os
import numpy as np


class ClassificationDatasetWithMaskGrayscale(Dataset):
    def __init__(self, df, base_image_dir, transforms=None):
        self.df = df
        self.base_image_dir = base_image_dir
        self.transforms = transforms

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        image_path = os.path.join(self.base_image_dir, os.path.basename(row['image_path']))
        mask_path = row['mask_path']
        label = row['label']

        image = Image.open(image_path).convert("L")
        mask = Image.open(mask_path).convert("L")

        if self.transforms:
            image = self.transforms(image)
            mask = self.transforms(mask).squeeze(0)

        mask = mask.unsqueeze(0)
        image_with_mask = torch.cat((image, mask), dim=0)

        return image_with_mask, torch.tensor(label, dtype=torch.float32)


data_transforms = T.Compose([
    T.Resize((224, 224)),
    T.ToTensor(),
])

base_image_dir = "/content/drive/MyDrive/PneumoniaMNIST/images"
train_df = pd.read_csv('/content/drive/MyDrive/PneumoniaMNIST/train_dataset.csv')
val_df = pd.read_csv('/content/drive/MyDrive/PneumoniaMNIST/val_dataset.csv')
test_df = pd.read_csv('/content/drive/MyDrive/PneumoniaMNIST/test_dataset.csv')

train_df['label'] = train_df['label'].apply(lambda x: int(x.strip("[]")))
val_df['label'] = val_df['label'].apply(lambda x: int(x.strip("[]")))
test_df['label'] = test_df['label'].apply(lambda x: int(x.strip("[]")))

train_dataset = ClassificationDatasetWithMaskGrayscale(train_df, base_image_dir, transforms=data_transforms)
val_dataset = ClassificationDatasetWithMaskGrayscale(val_df, base_image_dir, transforms=data_transforms)
test_dataset = ClassificationDatasetWithMaskGrayscale(test_df, base_image_dir, transforms=data_transforms)

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)


class UNetClassifier(nn.Module):
    def __init__(self, input_channels=2, num_classes=1):
        super(UNetClassifier, self).__init__()
        self.encoder1 = self.conv_block(input_channels, 64)
        self.encoder2 = self.conv_block(64, 128)
        self.encoder3 = self.conv_block(128, 256)
        self.encoder4 = self.conv_block(256, 512)

        self.pool = nn.MaxPool2d(2, 2)

        self.decoder3 = self.conv_block(512 + 256, 256)
        self.decoder2 = self.conv_block(256 + 128, 128)
        self.decoder1 = self.conv_block(128 + 64, 64)

        self.final_conv = nn.Conv2d(64, num_classes, kernel_size=1)
        self.global_pool = nn.AdaptiveAvgPool2d(1)

    def conv_block(self, in_channels, out_channels):
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
        )

    def forward(self, x):
        enc1 = self.encoder1(x)
        enc2 = self.encoder2(self.pool(enc1))
        enc3 = self.encoder3(self.pool(enc2))
        enc4 = self.encoder4(self.pool(enc3))

        dec3 = self.decoder3(torch.cat([enc4, enc3], dim=1))
        dec2 = self.decoder2(torch.cat([self.pool(dec3), enc2], dim=1))
        dec1 = self.decoder1(torch.cat([self.pool(dec2), enc1], dim=1))

        output = self.final_conv(dec1)
        output = self.global_pool(output)
        return output.view(output.size(0), -1)


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = UNetClassifier().to(device)

loss_fn = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

num_epochs = 5
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0

    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)

        outputs = model(images)
        loss = loss_fn(outputs.squeeze(), labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    print(f"Epoch {epoch+1}, Loss: {running_loss/len(train_loader)}")

model.eval()

with torch.no_grad():
    for images, labels in val_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)

with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)

Downloading: "https://download.pytorch.org/models/resnet34-333f7ec4.pth" to /root/.cache/torch/hub/checkpoints/resnet34-333f7ec4.pth
100%|██████████| 83.3M/83.3M [00:00<00:00, 281MB/s]


Epoch 1, Loss: 0.26017281296899764
Epoch 2, Loss: 0.14346097245559855
Epoch 3, Loss: 0.12288634221432573
Epoch 4, Loss: 0.1156041322370707
Epoch 5, Loss: 0.10788549247939708


In [11]:
import torch
from sklearn.metrics import accuracy_score, roc_auc_score

model.eval()
all_true_labels = []
all_predicted_probs = []

with torch.no_grad():
    for images, masks in test_loader:
        images, masks = images.to(device), masks.to(device)
        outputs = model(images)
        probs = torch.sigmoid(outputs).cpu().numpy()
        all_true_labels.extend(masks.cpu().numpy().flatten())
        all_predicted_probs.extend(probs.flatten())

all_true_labels = np.array(all_true_labels)
all_predicted_probs = np.array(all_predicted_probs)
all_true_labels = (all_true_labels > 0.5).astype(int)
predicted_classes = (all_predicted_probs > 0.5).astype(int)
accuracy = accuracy_score(all_true_labels, predicted_classes)
roc_auc = roc_auc_score(all_true_labels, all_predicted_probs)

print(f"Accuracy: {accuracy:.4f}")
print(f"ROC-AUC: {roc_auc:.4f}")

Accuracy: 0.9203
ROC-AUC: 0.9984


In [None]:
save_path = "unet_model.pth"  
torch.save(model.state_dict(), save_path)
print(f"Model saved to {save_path}")

Model saved to unet_model.pth
