In [16]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import torchvision.transforms as transforms
from torchvision import models
import matplotlib.pyplot as plt
from PIL import Image
import os
import torch.nn.init as init

import warnings
warnings.filterwarnings('ignore')

### Prepare Dataset

In [28]:
class CTScanDataset(Dataset):
    """Image-folder dataset with one sub-directory per class.

    ``root_dir`` must contain one directory for every entry of ``categories``.
    The position of a category in that sequence is the integer label given to
    every file found in its directory (with the defaults: ``Normal`` -> 0,
    ``Covid`` -> 1).

    Args:
        root_dir: Directory that holds the per-class sub-directories.
        transform: Optional callable applied to each loaded RGB PIL image.
        categories: Sub-directory names, in label order.

    Raises:
        FileNotFoundError: If a category directory does not exist (raised by
            ``os.listdir`` while the index is being built).
    """

    def __init__(self, root_dir, transform=None, categories=("Normal", "Covid")):
        self.root_dir = root_dir
        self.transform = transform
        self.categories = tuple(categories)
        self.image_paths = []
        self.labels = []

        for label, category in enumerate(self.categories):
            category_path = os.path.join(root_dir, category)
            # os.listdir() returns entries in arbitrary order; sorting keeps
            # the index -> sample mapping stable between runs and machines.
            for img_name in sorted(os.listdir(category_path)):
                img_path = os.path.join(category_path, img_name)
                # Hidden files (e.g. macOS ".DS_Store") and nested directories
                # are not images and would make Image.open() fail later on.
                if img_name.startswith('.') or not os.path.isfile(img_path):
                    continue
                self.image_paths.append(img_path)
                self.labels.append(label)

    def __len__(self):
        """Return the number of indexed image files."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the sample at position ``idx``.

        The image is loaded as RGB and passed through ``self.transform`` when
        one was supplied; ``label`` is the integer class index.
        """
        image_path = self.image_paths[idx]
        label = self.labels[idx]

        # convert() returns an independent in-memory copy, so the underlying
        # file can be closed as soon as the with-block exits.
        with Image.open(image_path) as img:
            image = img.convert('RGB')

        if self.transform:
            image = self.transform(image)

        return image, label

# --- Data configuration -----------------------------------------------------
# Root folder that contains the "Train" and "Val" splits. Set the CT_DATA_ROOT
# environment variable to run the notebook on another machine; the default is
# the original author's local path.
DATA_ROOT = os.environ.get(
    'CT_DATA_ROOT',
    '/Users/rohanojha/Documents/01_Sem_1_DS 5220 Code/SML_Project/Code_Test',
)
IMAGE_SIZE = (224, 224)  # four stride-2 poolings reduce 224 -> 14, the size fc1 expects
BATCH_SIZE = 32

# Transform: resize, convert to a float tensor, then normalise each channel
# with the mean/std values commonly used for ImageNet-style preprocessing.
transform = transforms.Compose([
    transforms.Resize(IMAGE_SIZE),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

train_dataset = CTScanDataset(root_dir=os.path.join(DATA_ROOT, 'Train'), transform=transform)
val_dataset = CTScanDataset(root_dir=os.path.join(DATA_ROOT, 'Val'), transform=transform)

# Only the training split is shuffled; validation order stays fixed.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)

### Model 2 - 4 Layer CNN

In [8]:
# ''' Independent Channel Processing '''

# '''
# Changed the number of neurons in the hidden layers
#         1. HL1 : 512
#         2. HL2 : 256
# Max Pool Kernel Size Changed to : 3x3
# '''

# class CustomCNN(nn.Module):
#     def __init__(self):
#         super(CustomCNN, self).__init__()
#         # Convolutional layers
#         self.conv1 = nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1)
#         self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1, groups=32)
#         self.conv3 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1, groups=64)
#         self.conv4 = nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1, groups=128)
        
#         # Max pooling layer with 3x3 kernel
#         self.pool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        
#         # Fully connected layers
#         self.fc1 = nn.Linear(256 * 14 * 14, 512)  # Adjusted input size after pooling
#         self.fc2 = nn.Linear(512, 256)        # Additional hidden layer
#         self.fc3 = nn.Linear(256, 2)          # Binary classification layer

#         self.relu = nn.ReLU()
#         self.dropout = nn.Dropout(0.5)

#     def forward(self, x):
#         # Apply convolutional layers with ReLU activation and max pooling
#         x = self.pool(self.relu(self.conv1(x)))
#         x = self.pool(self.relu(self.conv2(x)))
#         x = self.pool(self.relu(self.conv3(x)))
#         x = self.pool(self.relu(self.conv4(x)))
        
#         # Flatten the tensor to feed into fully connected layers
#         x = x.view(x.size(0), -1)
        
#         # Apply fully connected layers with dropout and ReLU activation
#         x = self.dropout(self.relu(self.fc1(x)))
#         x = self.dropout(self.relu(self.fc2(x)))  # Additional hidden layer
#         x = self.fc3(x)  # Output layer
#         return x

In [100]:
# ''' Independent Channel Processing '''

# '''
# Changed the number of neurons in the hidden layers
#         1. HL1 : 512
#         2. HL2 : 256
# Max Pool Kernel Size Changed to : 3x3
# Using HE Initialization for weights and biases:
#         1. Prevents gradient decay in ReLU
#         2. Works better for deeper networks (typically > 3 layers)
#         3. Works for ReLU, Leaky ReLU

# Xavier Initialization for weights and biases:
#         1. Balanced gradients
#         2. Works well for shallow networks (typically <= 3 layers)
#         3. Works for Sigmoid, Tanh
# '''

# class CustomCNN(nn.Module):
#     def __init__(self):
#         super(CustomCNN, self).__init__()
#         # Convolutional layers
#         self.conv1 = nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1)
#         self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1, groups=32)
#         self.conv3 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1, groups=64)
#         self.conv4 = nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1, groups=128)
        
#         # Max pooling layer with 3x3 kernel
#         self.pool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        
#         # Fully connected layers
#         self.fc1 = nn.Linear(256 * 14 * 14, 512)  # Adjusted input size after pooling
#         self.fc2 = nn.Linear(512, 256)        # Additional hidden layer
#         self.fc3 = nn.Linear(256, 2)          # Binary classification layer

#         self.relu = nn.ReLU()
#         self.dropout = nn.Dropout(0.3)

#         # Apply He Initialization to all layers
#         self._initialize_weights()

#     def _initialize_weights(self):
#         # Loop through all layers in the module
#         for m in self.modules():
#             if isinstance(m, nn.Conv2d):  # For convolutional layers
#                 init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
#                 if m.bias is not None:
#                     init.zeros_(m.bias)
#             elif isinstance(m, nn.Linear):  # For fully connected layers
#                 init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
#                 if m.bias is not None:
#                     init.zeros_(m.bias)

#     def forward(self, x):
#         # Apply convolutional layers with ReLU activation and max pooling
#         x = self.pool(self.relu(self.conv1(x)))
#         x = self.pool(self.relu(self.conv2(x)))
#         x = self.pool(self.relu(self.conv3(x)))
#         x = self.pool(self.relu(self.conv4(x)))
        
#         # Flatten the tensor to feed into fully connected layers
#         x = x.view(x.size(0), -1)
        
#         # Apply fully connected layers with dropout and ReLU activation
#         x = self.dropout(self.relu(self.fc1(x)))
#         x = self.dropout(self.relu(self.fc2(x)))  # Additional hidden layer
#         x = self.fc3(x)  # Output layer
#         return x

In [30]:
''' Independent Channel Processing '''

'''
- Changed the number of neurons in the hidden layers
        1. HL1 : 512
        2. HL2 : 256
- Max Pool Kernel Size Changed to : 3x3
- Activation Function: Leaky ReLU
- Using HE Initialization for weights and biases:
        1. Prevents gradient decay in ReLU
        2. Works better for deeper networks (typically > 3 layers)
        3. Works for ReLU, Leaky ReLU

- Xavier Initialization for weights and biases:
        1. Balanced gradients
        2. Works well for shallow networks (typically <= 3 layers)
        3. Works for Sigmoid, Tanh
'''

class CustomCNN(nn.Module):
    """Four-layer CNN with grouped convolutions and a three-layer classifier head.

    ``conv1`` is a regular convolution; ``conv2``/``conv3`` are grouped
    (32 and 64 groups) and ``conv4`` is depthwise (one filter per channel).
    Every convolution is followed by LeakyReLU and a 3x3, stride-2 max pool,
    so a 224x224 input reaches the classifier as a 256 x 14 x 14 feature map.

    Args:
        num_classes: Number of output logits (default 2).
        negative_slope: LeakyReLU slope for negative inputs. The same value is
            handed to the He initialiser so its gain matches the activation.
        dropout: Dropout probability applied after each hidden FC layer.

    Shape:
        Input ``(N, 3, 224, 224)``; output ``(N, num_classes)`` raw logits
        (no softmax is applied).
    """

    def __init__(self, num_classes=2, negative_slope=0.01, dropout=0.25):
        super().__init__()
        self.negative_slope = negative_slope

        # Convolutional layers (3x3 kernels, padding 1 keeps the spatial size)
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1, groups=32)
        self.conv3 = nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1, groups=64)
        self.conv4 = nn.Conv2d(256, 256, kernel_size=3, stride=1, padding=1, groups=256)

        # Max pooling with a 3x3 kernel; stride 2 halves height and width
        self.pool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Fully connected layers: 224 / 2**4 = 14, hence 256 * 14 * 14 inputs
        self.fc1 = nn.Linear(256 * 14 * 14, 512)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, num_classes)

        # Activation and dropout
        self.activation = nn.LeakyReLU(negative_slope=negative_slope)
        self.dropout = nn.Dropout(dropout)

        # Apply He initialization to all conv / linear layers
        self._initialize_weights()

    def _initialize_weights(self):
        """He-initialise every Conv2d/Linear weight and zero its bias."""
        for m in self.modules():
            if isinstance(m, (nn.Conv2d, nn.Linear)):
                # `a` must be given explicitly: kaiming_normal_ defaults to
                # a=0, which would compute the gain for a plain ReLU.
                init.kaiming_normal_(
                    m.weight,
                    a=self.negative_slope,
                    mode='fan_out',
                    nonlinearity='leaky_relu',
                )
                if m.bias is not None:
                    init.zeros_(m.bias)

    def forward(self, x):
        """Map a batch of images to class logits."""
        # Convolution -> LeakyReLU -> max pool, four times
        x = self.pool(self.activation(self.conv1(x)))
        x = self.pool(self.activation(self.conv2(x)))
        x = self.pool(self.activation(self.conv3(x)))
        x = self.pool(self.activation(self.conv4(x)))

        # Flatten everything except the batch dimension; unlike view(), this
        # also works when the feature map is not contiguous in memory.
        x = x.flatten(start_dim=1)

        # Hidden FC layers with LeakyReLU and dropout, then the output layer
        x = self.dropout(self.activation(self.fc1(x)))
        x = self.dropout(self.activation(self.fc2(x)))
        x = self.fc3(x)
        return x

### Loss and Optimizer

In [32]:
# Seed the global RNG before the model is built so that weight initialisation,
# DataLoader shuffling and dropout masks repeat from run to run (on CPU; some
# GPU kernels may still be non-deterministic).
SEED = 42
LEARNING_RATE = 1e-4
torch.manual_seed(SEED)

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CustomCNN().to(device)

# CrossEntropyLoss consumes the raw logits returned by CustomCNN.forward.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

### Train -- Freezing this

In [12]:
# num_epochs = 10
# for epoch in range(num_epochs):
#     model.train()
#     running_loss = 0.0
#     correct = 0
#     total = 0

#     for images, labels in train_loader:
#         images, labels = images.to(device), labels.to(device)
        
#         optimizer.zero_grad()
#         outputs = model(images)
#         loss = criterion(outputs, labels)
#         loss.backward()
#         optimizer.step()
        
#         running_loss += loss.item()
#         _, predicted = torch.max(outputs.data, 1)
#         total += labels.size(0)
#         correct += (predicted == labels).sum().item()
#     print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}, Accuracy: {100 * correct / total:.2f}%")

Epoch [1/10], Loss: 0.9851, Accuracy: 49.35%
Epoch [2/10], Loss: 0.7448, Accuracy: 49.46%
Epoch [3/10], Loss: 0.6802, Accuracy: 59.24%
Epoch [4/10], Loss: 0.5474, Accuracy: 73.91%
Epoch [5/10], Loss: 0.4238, Accuracy: 80.76%
Epoch [6/10], Loss: 0.2750, Accuracy: 89.57%
Epoch [7/10], Loss: 0.1569, Accuracy: 94.57%
Epoch [8/10], Loss: 0.1046, Accuracy: 96.96%
Epoch [9/10], Loss: 0.1252, Accuracy: 95.76%
Epoch [10/10], Loss: 0.1025, Accuracy: 97.28%


In [44]:
# num_epochs = 10
# for epoch in range(num_epochs):
#     model.train()
#     running_loss = 0.0
#     correct = 0
#     total = 0

#     for images, labels in train_loader:
#         images, labels = images.to(device), labels.to(device)
        
#         optimizer.zero_grad()
#         outputs = model(images)
#         loss = criterion(outputs, labels)
#         loss.backward()
#         optimizer.step()
        
#         running_loss += loss.item()
#         _, predicted = torch.max(outputs.data, 1)
#         total += labels.size(0)
#         correct += (predicted == labels).sum().item()
#     print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}, Accuracy: {100 * correct / total:.2f}%")

Epoch [1/10], Loss: 0.7696, Accuracy: 52.28%
Epoch [2/10], Loss: 0.6849, Accuracy: 55.54%
Epoch [3/10], Loss: 0.6315, Accuracy: 62.93%
Epoch [4/10], Loss: 0.5358, Accuracy: 72.17%
Epoch [5/10], Loss: 0.4660, Accuracy: 78.80%
Epoch [6/10], Loss: 0.4393, Accuracy: 79.67%
Epoch [7/10], Loss: 0.3840, Accuracy: 83.04%
Epoch [8/10], Loss: 0.3739, Accuracy: 84.46%
Epoch [9/10], Loss: 0.3195, Accuracy: 86.85%
Epoch [10/10], Loss: 0.2771, Accuracy: 89.13%


In [34]:
def train_one_epoch(model, loader, criterion, optimizer, device):
    """Run one optimisation pass over ``loader``.

    Args:
        model: Network to train; it is switched to training mode.
        loader: Iterable yielding ``(images, labels)`` batches.
        criterion: Loss returning the mean over the batch (assumed; the
            per-sample weighting below relies on it).
        optimizer: Optimizer that updates ``model``'s parameters.
        device: Device the batches are moved to.

    Returns:
        ``(mean_loss, accuracy_percent)`` computed over every sample seen.

    Raises:
        ValueError: If ``loader`` yields no samples.
    """
    model.train()
    loss_sum = 0.0
    correct = 0
    total = 0

    for images, labels in loader:
        images, labels = images.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        batch_size = labels.size(0)
        # Weight each batch mean by its size so a short final batch does not
        # count as much as a full one in the epoch average.
        loss_sum += loss.item() * batch_size
        # detach() instead of the legacy `.data` attribute
        predicted = outputs.detach().argmax(dim=1)
        total += batch_size
        correct += (predicted == labels).sum().item()

    if total == 0:
        raise ValueError("loader produced no samples")
    return loss_sum / total, 100 * correct / total


num_epochs = 10
for epoch in range(num_epochs):
    epoch_loss, epoch_acc = train_one_epoch(model, train_loader, criterion, optimizer, device)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}, Accuracy: {epoch_acc:.2f}%")

Epoch [1/10], Loss: 0.6810, Accuracy: 56.20%
Epoch [2/10], Loss: 0.6433, Accuracy: 62.83%
Epoch [3/10], Loss: 0.5091, Accuracy: 79.02%
Epoch [4/10], Loss: 0.4142, Accuracy: 82.61%
Epoch [5/10], Loss: 0.2895, Accuracy: 89.78%
Epoch [6/10], Loss: 0.2239, Accuracy: 92.28%
Epoch [7/10], Loss: 0.1458, Accuracy: 96.20%
Epoch [8/10], Loss: 0.1142, Accuracy: 96.85%
Epoch [9/10], Loss: 0.0767, Accuracy: 98.91%
Epoch [10/10], Loss: 0.0629, Accuracy: 99.02%


### Model Evaluation -- Freezing this

In [14]:
# model.eval()
# with torch.no_grad():
#     correct = 0
#     total = 0

#     for images, labels in val_loader:
#         images, labels = images.to(device), labels.to(device)
#         outputs = model(images)
#         _, predicted = torch.max(outputs.data, 1)
#         total += labels.size(0)
#         correct += (predicted == labels).sum().item()

#     print(f"Validation Accuracy: {100 * correct / total:.2f}%")

Validation Accuracy: 70.69%


In [46]:
# model.eval()
# with torch.no_grad():
#     correct = 0
#     total = 0

#     for images, labels in val_loader:
#         images, labels = images.to(device), labels.to(device)
#         outputs = model(images)
#         _, predicted = torch.max(outputs.data, 1)
#         total += labels.size(0)
#         correct += (predicted == labels).sum().item()

#     print(f"Validation Accuracy: {100 * correct / total:.2f}%")

Validation Accuracy: 75.43%


In [36]:
# Score the trained network on the validation split.
model.eval()  # evaluation mode: dropout is disabled
correct, total = 0, 0

with torch.no_grad():  # no gradients are needed while scoring
    for images, labels in val_loader:
        images = images.to(device)
        labels = labels.to(device)

        outputs = model(images)
        # Index of the largest logit in each row is the predicted class
        predicted = outputs.argmax(dim=1)

        total += len(labels)
        correct += int((predicted == labels).sum())

    val_accuracy = 100 * correct / total
    print(f"Validation Accuracy: {val_accuracy:.2f}%")

Validation Accuracy: 63.79%


### Save and Load Model

In [60]:
# Save the Model
# NOTE(review): the file name still advertises "K32" and "Drpout_30", while the
# CustomCNN defined in this notebook starts at 64 filters and uses dropout 0.25.
# Confirm whether the checkpoint should be renamed.
MODEL_PATH = "covid_ct_model_ICP_K32_256_HL_1_2_512Neurons_3MxPl_Drpout_30.pth"
torch.save(model.state_dict(), MODEL_PATH)

# Load the model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CustomCNN()
# map_location remaps the stored tensors onto `device`, so a checkpoint written
# from a CUDA model can also be restored on a CPU-only machine.
model.load_state_dict(torch.load(MODEL_PATH, map_location=device))
model = model.to(device)
model.eval()  # inference mode: dropout is disabled

# Predict on a single image

# Define the transform used during training
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Load the image
image_path = "/Users/rohanojha/Documents/01_Sem_1_DS 5220 Code/SML_Project/Code_Test/single_prediction/normal.png"  # Replace with the actual path
# image_path = "/Users/rohanojha/Documents/01_Sem_1_DS 5220 Code/SML_Project/Code_Test/single_prediction/covid.png"
# image_path = "/Users/rohanojha/Documents/01_Sem_1_DS 5220 Code/SML_Project/Code_Test/single_prediction/Non-Covid (10).png"
# convert() returns an in-memory copy, so the file handle can be released at once
with Image.open(image_path) as img:
    image = img.convert('RGB')

# Apply transformations and move the tensor to the same device as the model
image_tensor = transform(image).unsqueeze(0)  # Add batch dimension
image_tensor = image_tensor.to(device)

# Predict
with torch.no_grad():
    output = model(image_tensor)
    predicted_class = output.argmax(dim=1)

# Map predicted class to label (same order as the dataset's label indices)
classes = ["Normal", "Covid"]
prediction = classes[predicted_class.item()]
print(f"Prediction: {prediction}")

Prediction: Normal
