# 1. Build your own convolutional neural network using pytorch

In [14]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import transforms, datasets
from torch.utils.data import DataLoader, Dataset
from PIL import Image
import pandas as pd
import os

# Device setup
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")


Using device: cuda


In [15]:
class SEBlock(nn.Module):
    def __init__(self, channels, reduction=16):
        super(SEBlock, self).__init__()
        self.fc1 = nn.Linear(channels, channels // reduction)
        self.fc2 = nn.Linear(channels // reduction, channels)
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        batch, channels, _, _ = x.size()
        squeeze = x.view(batch, channels, -1).mean(dim=2)  # Global Average Pooling
        excitation = self.fc2(self.relu(self.fc1(squeeze)))
        excitation = self.sigmoid(excitation).view(batch, channels, 1, 1)
        return x * excitation


class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1, downsample=None, dropout_rate=0.3):
        super(ResidualBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.downsample = downsample
        self.se = SEBlock(out_channels)
        self.dropout = nn.Dropout2d(p=dropout_rate)

    def forward(self, x):
        residual = x
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out = self.dropout(out)
        out = self.se(out)  # Apply SE Block
        if self.downsample:
            residual = self.downsample(x)
        out += residual
        return F.relu(out)


class DogHeartCNN(nn.Module):
    def __init__(self, num_classes=3, dropout_rate=0.4):
        super(DogHeartCNN, self).__init__()
        # Initial Convolutional Layer
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3)
        self.bn1 = nn.BatchNorm2d(64)
        self.pool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Residual Blocks with Multi-Scale Extraction
        self.layer1 = self._make_layer(64, 64, 3, stride=1, dropout_rate=0.2)
        self.layer2 = self._make_layer(64, 128, 3, stride=2, dropout_rate=0.3)
        self.layer3 = self._make_layer(128, 256, 3, stride=2, dropout_rate=0.3)
        self.layer4 = self._make_layer(256, 512, 3, stride=2, dropout_rate=0.4, multi_scale=True)

        # Adaptive Pooling and Fully Connected Layers
        self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.max_pool = nn.AdaptiveMaxPool2d((1, 1))
        self.dropout = nn.Dropout(p=dropout_rate)
        self.fc1 = nn.Linear(512 * 2, 512)  # Concatenate Avg & Max Pool features
        self.fc2 = nn.Linear(512, num_classes)

    def _make_layer(self, in_channels, out_channels, blocks, stride, dropout_rate, multi_scale=False):
        downsample = None
        if stride != 1 or in_channels != out_channels:
            downsample = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride),
                nn.BatchNorm2d(out_channels),
            )
        layers = [ResidualBlock(in_channels, out_channels, stride, downsample, dropout_rate)]
        for _ in range(1, blocks):
            layers.append(ResidualBlock(out_channels, out_channels, dropout_rate=dropout_rate))

        if multi_scale:
            layers.append(nn.Conv2d(out_channels, out_channels, kernel_size=3, dilation=2, padding=2))  # Dilated conv
        return nn.Sequential(*layers)

    def forward(self, x):
        # Initial Convolutional Layer
        x = F.relu(self.bn1(self.conv1(x)))
        x = self.pool(x)

        # Residual Layers
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        # Adaptive Pooling
        avg_features = self.avg_pool(x)
        max_features = self.max_pool(x)
        x = torch.cat([avg_features, max_features], dim=1)  # Concatenate pooled features
        x = torch.flatten(x, 1)

        # Fully Connected Layers
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x


# Initialize the model
model = DogHeartCNN(num_classes=3).to('cuda' if torch.cuda.is_available() else 'cpu')
print(model)


DogHeartCNN(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3))
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (pool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): ResidualBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (se): SEBlock(
        (fc1): Linear(in_features=64, out_features=4, bias=True)
        (fc2): Linear(in_features=4, out_features=64, bias=True)
        (relu): ReLU()
        (sigmoid): Sigmoid()
      )
      (dropout): Dropout2d(p=0.2, inplace=False)
    )
    (1): ResidualBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), s

# 2. Train your model using dog heart dataset (you may need to use  Google Colab (or Kaggle) with GPU to train your code) 

### (1) use torchvision.datasets.ImageFolder for the training dataset
### (2) use custom dataloader for test dataset (return image tensor and file name)

In [16]:
import os
import torch
from torchvision import transforms, datasets
from torch.utils.data import DataLoader
from PIL import Image

# Updated transformations with cropping and normalization
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize to 224x224
    transforms.RandomHorizontalFlip(p=0.5),  # Random horizontal flip
    transforms.ToTensor(),  # Convert to PyTorch tensor
    transforms.Normalize(mean=[0.4926, 0.4927, 0.4926], std=[0.2077, 0.2076, 0.2077])  # Normalize
])

# Load training and validation datasets
train_data = datasets.ImageFolder(
    '/kaggle/input/d/shraddhabelbase/dogxray/Dog_X_ray/Dog_heart/Dog_heart/Train',
    transform
)
val_data = datasets.ImageFolder(
    '/kaggle/input/d/shraddhabelbase/dogxray/Dog_X_ray/Dog_heart/Dog_heart/Valid',
    transform
)

# Custom test dataset for no subfolder structure
class TestDataset(torch.utils.data.Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.image_files = [f for f in os.listdir(root_dir) if f.endswith(('.png', '.jpg', '.jpeg'))]
        self.transform = transform

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        img_path = os.path.join(self.root_dir, self.image_files[idx])
        image = Image.open(img_path).convert('RGB')
        if self.transform:
            image = self.transform(image)
        return image, self.image_files[idx]

# Load the test dataset
test_data = TestDataset(
    '/kaggle/input/d/shraddhabelbase3/dogxray/Dog_X_ray/Test/Test',
    transform
)

# DataLoaders
train_loader = DataLoader(train_data, batch_size=16, shuffle=True)
val_loader = DataLoader(val_data, batch_size=16, shuffle=False)
test_loader = DataLoader(test_data, batch_size=16, shuffle=False)

print("Datasets and loaders prepared with updated transformations and test dataset.")


Datasets and loaders prepared with updated transformations and test dataset.


In [21]:
import torch.optim as optim

# Loss function
criterion = nn.CrossEntropyLoss()

# Optimizer: AdamW for better generalization
optimizer = optim.AdamW(model.parameters(), lr=0.0001, weight_decay=1e-3)

# Scheduler: StepLR with step size of 10 epochs and gamma of 0.1
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)

def train_model(model, train_loader, val_loader, criterion, optimizer,  epochs=50, patience=10):
    best_val_loss = float("inf")
    patience_counter = 0

    print("Starting training with StepLR scheduler...")

    for epoch in range(epochs):
        # Training phase
        model.train()
        running_train_loss = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_train_loss += loss.item()

        avg_train_loss = running_train_loss / len(train_loader)

        # Validation phase
        model.eval()
        running_val_loss = 0.0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                val_loss = criterion(outputs, labels)
                running_val_loss += val_loss.item()

        avg_val_loss = running_val_loss / len(val_loader)

        # Print training and validation loss
        print(f"Epoch [{epoch+1}/{epochs}] - Training Loss: {avg_train_loss:.4f} - Validation Loss: {avg_val_loss:.4f}")

    
        # Save best model based on validation loss
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            patience_counter = 0
            torch.save(model.state_dict(), "best_model.pth")
            print("Best model saved!")
        else:
            patience_counter += 1

        # Early stopping
        if patience_counter >= patience:
            print("Early stopping triggered.")
            break

    print("Training completed.")

# Train the model
train_model(model, train_loader, val_loader, criterion, optimizer, epochs=50, patience=10)


Starting training with StepLR scheduler...
Epoch [1/50] - Training Loss: 0.1846 - Validation Loss: 1.0688
Best model saved!
Epoch [2/50] - Training Loss: 0.1353 - Validation Loss: 1.0711
Epoch [3/50] - Training Loss: 0.1311 - Validation Loss: 1.2100
Epoch [4/50] - Training Loss: 0.1617 - Validation Loss: 0.9917
Best model saved!
Epoch [5/50] - Training Loss: 0.1476 - Validation Loss: 1.1445
Epoch [6/50] - Training Loss: 0.1368 - Validation Loss: 1.0769
Epoch [7/50] - Training Loss: 0.1163 - Validation Loss: 1.4659
Epoch [8/50] - Training Loss: 0.1536 - Validation Loss: 1.0211
Epoch [9/50] - Training Loss: 0.0831 - Validation Loss: 1.2310
Epoch [10/50] - Training Loss: 0.0692 - Validation Loss: 1.3621
Epoch [11/50] - Training Loss: 0.1055 - Validation Loss: 1.7103
Epoch [12/50] - Training Loss: 0.1748 - Validation Loss: 1.4206
Epoch [13/50] - Training Loss: 0.0917 - Validation Loss: 1.2886
Epoch [14/50] - Training Loss: 0.0818 - Validation Loss: 1.4865
Early stopping triggered.
Training

# 3. Evaluate your model using the developed software

In [22]:
def generate_predictions(model, test_loader):
    model.eval()
    predictions = []
    with torch.no_grad():
        for images, filenames in test_loader:
            images = images.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs, 1)
            for i in range(len(filenames)):
                predictions.append((filenames[i], predicted[i].item()))
    return predictions

# Generate and save predictions
predictions = generate_predictions(model, test_loader)
output_file = '/kaggle/working/test_predictions.csv'
pd.DataFrame(predictions, columns=["Filename", "Predicted Class"]).to_csv(output_file, index=False, header=False)
print(f"Predictions saved to {output_file}")


Predictions saved to /kaggle/working/test_predictions.csv


# 4. Compare results with [RVT paper](https://www.nature.com/articles/s41598-023-50063-x). Requirement: performance is better than VGG16: 75%

The lightweight CNN model achieved a test accuracy of 71.75%, demonstrating its effectiveness in classifying canine heart X-rays into clinically relevant categories: Small, Normal, and Large. In comparison, VGG16, a widely used architecture in veterinary imaging, has been reported to achieve a test accuracy of approximately 75% in similar tasks. While the lightweight CNN does not surpass the performance of VGG16, it offers significant advantages in computational efficiency and model simplicity, making it more suitable for deployment in resource-constrained veterinary settings.

The model's predictions were validated using the Dog X ray classification software.  The software evaluation highlighted the robustness of the model, particularly in distinguishing "Normal" and "Large" classes, where clinical accuracy is most critical for diagnosis.

# 5. Write a four-page paper report using the shared LaTex template. Upload your paper to ResearchGate or Arxiv, and put your paper link and GitHub weight link here.

https://www.researchgate.net/publication/385946967_AI-Powered_Cardiomegaly_Detection_in_Canine_X-rays_A_Lightweight_CNN_Approach

# 6. Grading rubric

(1). Code ------- 20 points (you also need to upload your final model as a pt file)

(2). Grammer ---- 20 points

(3). Introduction & related work --- 10 points


(4). Method  ---- 20 points

(5). Results ---- 20 points

     > = 75 % -->10 points
     < 55 % -->0 points
     >= 55 % & < 75% --> 0.5 point/percent
     

(6). Discussion - 10 points

![X-ray](./predict.png "Result")
