# 1. Build your own convolutional neural network using PyTorch

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np

# Defining a custom convolutional neural network (CNN)
class CustomCNN(nn.Module):
    """Four-stage convolutional classifier for square 3-channel images.

    Every stage is Conv(3x3, padding=1) -> BatchNorm -> ReLU -> MaxPool(2),
    so the spatial size is halved four times (a factor of 16 overall) while
    the channels grow 3 -> 32 -> 64 -> 128 -> 256.  The final feature map goes
    through dropout, is flattened, and is classified by two linear layers.

    Args:
        num_classes: Number of logits produced by the last layer.
        input_size: Height/width of the square input images.  The default of
            32 reproduces the original ``256 * 2 * 2`` classifier input.

    Raises:
        ValueError: If ``input_size`` is smaller than 16, because nothing
            would be left of the feature map after four 2x2 poolings.
    """

    def __init__(self, num_classes=4, input_size=32):
        super().__init__()
        if input_size < 16:
            raise ValueError(
                f"input_size must be at least 16, got {input_size}"
            )

        # Convolutional feature extractor (3x3 kernels keep the spatial size)
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        self.conv4 = nn.Conv2d(128, 256, kernel_size=3, padding=1)
        self.bn4 = nn.BatchNorm2d(256)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.dropout = nn.Dropout(0.5)

        # Fully connected layers. Four stride-2 poolings shrink each side by
        # a factor of 16 (rounding down), which fixes the flattened size.
        side = input_size // 16
        self.fc1 = nn.Linear(256 * side * side, 512)
        self.fc2 = nn.Linear(512, num_classes)

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)`` for ``x``."""
        stages = (
            (self.conv1, self.bn1),
            (self.conv2, self.bn2),
            (self.conv3, self.bn3),
            (self.conv4, self.bn4),
        )
        for conv, bn in stages:
            x = self.pool(F.relu(bn(conv(x))))
        x = self.dropout(x)
        x = torch.flatten(x, 1)  # keep the batch dimension, flatten the rest
        x = F.relu(self.fc1(x))
        return self.fc2(x)

# Hyperparameters
num_classes = 4
learning_rate = 0.001
batch_size = 16
num_epochs = 5

# Creating synthetic data for training (random tensors, so the run only
# checks that the model and training loop work end to end)
x_train = torch.randn(100, 3, 32, 32)  # 100 samples, 3 channels, 32x32 image size
y_train = torch.randint(0, num_classes, (100,))  # 100 labels in [0, num_classes)

# Creating DataLoader
train_dataset = TensorDataset(x_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Initializing model, loss function, and optimizer
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CustomCNN(num_classes=num_classes).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training loop
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Computing training statistics
        running_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    epoch_loss = running_loss / len(train_loader)
    epoch_acc = 100 * correct / total
    # Report the statistics; they were previously computed and then discarded.
    print(f'Epoch {epoch + 1}/{num_epochs}: Loss: {epoch_loss:.4f}, Acc: {epoch_acc:.2f}%')

print('Finished Training')


Finished Training


# 2. Train your model using the dog heart dataset (you may need to use Google Colab or Kaggle with a GPU to train your model)

### (1) Use torchvision.datasets.ImageFolder for the training dataset
### (2) Use a custom dataset and DataLoader for the test dataset (returning the image tensor and the file name)

In [None]:
import os
import zipfile
import csv
from PIL import Image
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision import datasets, transforms, models
from torch.utils.data import DataLoader, Dataset
from torch.amp import autocast, GradScaler
import numpy as np

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Archive name -> directory the archive is unpacked into
zip_files = {'Dog_heart.zip': 'Dog_heart', 'Test.zip': 'Test'}
for zip_file in zip_files:
    extract_path = zip_files[zip_file]
    with zipfile.ZipFile(zip_file, mode='r') as zip_ref:
        zip_ref.extractall(path=extract_path)

# Per-channel normalization constants shared by both pipelines
# (presumably the usual ImageNet statistics -- confirm against the backbone)
_normalize_mean = [0.485, 0.456, 0.406]
_normalize_std = [0.229, 0.224, 0.225]

# Training pipeline: resize, random geometric/colour augmentation, then
# tensor conversion and normalization
transform_train = transforms.Compose(
    [
        transforms.Resize((224, 224)),
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomVerticalFlip(p=0.3),
        transforms.RandomRotation(30),
        transforms.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.3, hue=0.1),
        transforms.RandomAffine(degrees=15, translate=(0.1, 0.1), scale=(0.9, 1.1)),
        transforms.RandomPerspective(distortion_scale=0.2),
        transforms.ToTensor(),
        transforms.Normalize(mean=_normalize_mean, std=_normalize_std),
    ]
)

# Evaluation pipeline: the same resize and normalization, with no random
# augmentation, so validation/test images are processed deterministically
transform_valid = transforms.Compose(
    [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=_normalize_mean, std=_normalize_std),
    ]
)

# Folders holding the extracted training and validation images
train_data_path = 'Dog_heart/Dog_heart/Train'
valid_data_path = 'Dog_heart/Dog_heart/Valid'

# ImageFolder datasets: augmented pipeline for training, plain one for validation
train_dataset = datasets.ImageFolder(train_data_path, transform=transform_train)
valid_dataset = datasets.ImageFolder(valid_data_path, transform=transform_valid)

# Both loaders use the same batch size and worker settings; only the training
# loader shuffles its samples
train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=64,
    shuffle=True,
    num_workers=4,
    pin_memory=True,
)
valid_loader = DataLoader(
    dataset=valid_dataset,
    batch_size=64,
    shuffle=False,
    num_workers=4,
    pin_memory=True,
)

class CustomTestDataset(Dataset):
    """Unlabelled image dataset that yields ``(image, file_name)`` pairs.

    Files directly inside ``root_dir`` whose name ends with one of
    ``extensions`` (compared case-insensitively) are collected in sorted
    order, so the sample order is reproducible across runs and machines.

    Args:
        root_dir: Directory containing the image files.
        transform: Optional callable applied to each RGB ``PIL`` image.
        extensions: File-name suffixes to accept.  Defaults to ``('.png',)``.
    """

    def __init__(self, root_dir, transform=None, extensions=('.png',)):
        self.root_dir = root_dir
        self.transform = transform
        suffixes = tuple(ext.lower() for ext in extensions)
        # os.listdir returns entries in arbitrary order, hence the sort.
        self.image_paths = [
            os.path.join(root_dir, name)
            for name in sorted(os.listdir(root_dir))
            if name.lower().endswith(suffixes)
        ]

    def __len__(self):
        """Return the number of image files found."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image, base file name)`` for the sample at ``idx``."""
        img_path = self.image_paths[idx]
        # convert() returns a fully loaded copy, so the file handle can be
        # closed as soon as the with-block exits.
        with Image.open(img_path) as handle:
            image = handle.convert('RGB')
        if self.transform is not None:
            image = self.transform(image)
        return image, os.path.basename(img_path)

# Test set: unlabelled images, preprocessed with the evaluation pipeline
test_data_path = 'Test/Test'
test_dataset = CustomTestDataset(test_data_path, transform_valid)
test_loader = DataLoader(
    dataset=test_dataset,
    batch_size=64,
    shuffle=False,  # keep the dataset's own sample order
    num_workers=4,
    pin_memory=True,
)

# Defining a custom CNN model
class CustomCNN(nn.Module):
    """Two-branch classifier: a small custom CNN plus a pretrained GoogLeNet.

    The custom branch is four Conv(3x3) -> BatchNorm -> ReLU -> MaxPool(2)
    stages (3 -> 32 -> 64 -> 128 -> 256 channels) followed by dropout and
    flattening.  The GoogLeNet branch has its final layer replaced by a
    4-unit linear layer.  Both outputs are concatenated and classified by two
    fully connected layers.

    Args:
        num_classes: Number of logits produced by the last layer.
        input_size: Height/width of the square input images.  The default of
            224 reproduces the original ``256 * 14 * 14`` custom-branch size.
    """

    def __init__(self, num_classes=4, input_size=224):
        super().__init__()
        # Custom CNN layers
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        self.conv4 = nn.Conv2d(128, 256, kernel_size=3, padding=1)
        self.bn4 = nn.BatchNorm2d(256)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.dropout = nn.Dropout(0.5)

        # Pretrained GoogLeNet branch. The `weights=` enum replaces the
        # deprecated `pretrained=True` flag. The branch is not frozen here,
        # so its parameters are trainable along with the rest of the model.
        # NOTE(review): forward() expects this branch to return a plain
        # tensor (no auxiliary outputs) -- confirm for your torchvision version.
        googlenet_features = 4
        self.googlenet = models.googlenet(weights=models.GoogLeNet_Weights.IMAGENET1K_V1)
        self.googlenet.fc = nn.Linear(
            in_features=self.googlenet.fc.in_features,
            out_features=googlenet_features,
        )

        # Fully connected layers. Four stride-2 poolings shrink each side of
        # the custom feature map by a factor of 16 (rounding down).
        side = input_size // 16
        self.fc1 = nn.Linear(256 * side * side + googlenet_features, 512)
        self.fc2 = nn.Linear(512, num_classes)

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)`` for ``x``."""
        # Custom branch
        x_custom = x
        stages = (
            (self.conv1, self.bn1),
            (self.conv2, self.bn2),
            (self.conv3, self.bn3),
            (self.conv4, self.bn4),
        )
        for conv, bn in stages:
            x_custom = self.pool(F.relu(bn(conv(x_custom))))
        x_custom = self.dropout(x_custom)
        x_custom = torch.flatten(x_custom, 1)

        # GoogLeNet branch operates on the same input batch
        x_google = self.googlenet(x)
        x_google = torch.flatten(x_google, 1)

        # Concatenate both feature vectors along the feature dimension
        x = torch.cat((x_custom, x_google), dim=1)

        # Classification head
        x = F.relu(self.fc1(x))
        return self.fc2(x)

# Initializing the custom CNN model
custom_cnn = CustomCNN(num_classes=4).to(device)

# Initializing loss, optimizer, and gradient scaler
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(custom_cnn.parameters(), lr=1e-3, weight_decay=1e-4)
# Gradient scaling is only needed for CUDA mixed precision; disabling it
# explicitly on other devices avoids the "CUDA is not available" warning.
scaler = GradScaler(enabled=(device.type == 'cuda'))

# Learning-rate schedulers: cosine annealing with warm restarts every 10
# epochs, plus a halving of the LR when validation accuracy plateaus.
# (`verbose=True` was dropped: the argument is deprecated in recent PyTorch;
# read the current LR from `optimizer.param_groups` or `get_last_lr()`.)
# NOTE(review): CosineAnnealingWarmRestarts recomputes the LR from the initial
# LR on each step, so reductions made by ReduceLROnPlateau may be overwritten
# on the following epoch -- consider keeping only one of the two.
from torch.optim.lr_scheduler import CosineAnnealingWarmRestarts, ReduceLROnPlateau
warmup_scheduler = CosineAnnealingWarmRestarts(optimizer, T_0=10, T_mult=1)
plateau_scheduler = ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=5)

# Early Stopping class
class EarlyStopping:
    """Track a validation score and signal when it has stopped improving.

    Call the instance once per epoch with the score (higher is better) and
    the model.  A score counts as an improvement unless it is below
    ``best_score + delta``; on improvement the model's ``state_dict`` is saved
    to ``path`` and the patience counter is reset.  After ``patience``
    consecutive non-improving calls, ``early_stop`` becomes ``True``.

    Args:
        patience: Number of consecutive non-improving calls tolerated.
        delta: Minimum increase over the best score that counts as improvement.
        path: File the best ``state_dict`` is written to.
    """

    def __init__(self, patience=10, delta=0, path='best_model.pth'):
        self.patience = patience
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.delta = delta
        self.path = path

    def __call__(self, val_acc, model):
        """Record ``val_acc`` and checkpoint ``model`` if it improved."""
        score = val_acc
        stalled = (
            self.best_score is not None
            and score < self.best_score + self.delta
        )
        if stalled:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
            return

        # First call or a genuine improvement: remember it and checkpoint.
        self.best_score = score
        self.save_checkpoint(model)
        self.counter = 0

    def save_checkpoint(self, model):
        """Write ``model.state_dict()`` to ``self.path``."""
        torch.save(model.state_dict(), self.path)

# Training function with mixed precision
def train_one_epoch(model, train_loader, criterion, optimizer, scaler):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()

        with autocast(device_type='cuda'):
            outputs = model(inputs)
            loss = criterion(outputs, labels)

        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        running_loss += loss.item()
        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()

    return running_loss / len(train_loader), 100 * correct / total

# Validation function
def validate(model, valid_loader, criterion):
    """Evaluate ``model`` on ``valid_loader`` without computing gradients.

    Batches are moved to the device holding the model's parameters.  The loss
    is averaged per sample (each batch loss is weighted by its size), so a
    short final batch does not distort the result.

    Args:
        model: Module to evaluate; it is switched to evaluation mode.
        valid_loader: Iterable of ``(inputs, labels)`` batches.
        criterion: Loss callable; assumed to return the mean loss of a batch.

    Returns:
        Tuple ``(average loss per sample, accuracy in percent)``.

    Raises:
        ZeroDivisionError: If ``valid_loader`` yields no samples.
    """
    model.eval()
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device('cpu')

    loss_sum = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in valid_loader:
            inputs, labels = inputs.to(run_device), labels.to(run_device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            batch_count = labels.size(0)
            loss_sum += loss.item() * batch_count
            _, predicted = outputs.max(1)
            total += batch_count
            correct += predicted.eq(labels).sum().item()

    return loss_sum / total, 100 * correct / total

# Training loop with early stopping
num_epochs = 100
best_acc = 0
early_stopping = EarlyStopping(patience=10)
for epoch in range(num_epochs):
    train_loss, train_acc = train_one_epoch(custom_cnn, train_loader, criterion, optimizer, scaler)
    val_loss, val_acc = validate(custom_cnn, valid_loader, criterion)

    # Keep the best validation accuracy seen so far (it was never updated before)
    best_acc = max(best_acc, val_acc)

    print(f'Epoch {epoch+1}/{num_epochs}:')
    print(f'Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%')
    print(f'Valid Loss: {val_loss:.4f}, Valid Acc: {val_acc:.2f}%')

    # Advance the cosine schedule by exactly one epoch. The previous
    # `epoch + np.random.rand()` argument added a random fraction, which made
    # the learning-rate schedule differ from run to run.
    warmup_scheduler.step()
    # NOTE(review): the cosine scheduler sets the LR again on its next step,
    # so a reduction applied here may only last one epoch -- confirm intent.
    plateau_scheduler.step(val_acc)

    # Checkpoints on improvement and raises the stop flag after 10 stalled epochs
    early_stopping(val_acc, custom_cnn)
    if early_stopping.early_stop:
        print("Early stopping triggered.")
        break

# Loading best model for testing.
# `map_location` lets a checkpoint saved on a GPU load on a CPU-only machine;
# `weights_only=True` restricts unpickling to tensors/plain containers, which
# is all a state_dict needs and avoids torch.load's FutureWarning.
best_state = torch.load('best_model.pth', map_location=device, weights_only=True)
custom_cnn.load_state_dict(best_state)
custom_cnn.eval()

# Testing and collecting results: one {'Image', 'Predicted Class'} dict per file
results = []
with torch.no_grad():
    for inputs, file_names in test_loader:
        inputs = inputs.to(device)
        outputs = custom_cnn(inputs)
        _, predicted = outputs.max(1)
        # One device-to-host transfer per batch instead of one per sample
        results.extend(
            {'Image': file_name, 'Predicted Class': pred}
            for file_name, pred in zip(file_names, predicted.tolist())
        )

# Write one "<file name>,<predicted class>" row per test image (no header row)
csv_file_path = 'test_results.csv'
with open(csv_file_path, mode='w', newline='') as csv_file:
    writer = csv.writer(csv_file)
    writer.writerows(
        [entry['Image'], entry['Predicted Class']] for entry in results
    )

print(f'Results saved to {csv_file_path}')


Epoch 1/100:
Train Loss: 23.3761, Train Acc: 37.07%
Valid Loss: 2.6390, Valid Acc: 45.50%
Epoch 2/100:
Train Loss: 2.4847, Train Acc: 40.14%
Valid Loss: 1.3741, Valid Acc: 44.00%
Epoch 3/100:
Train Loss: 1.1202, Train Acc: 47.64%
Valid Loss: 1.0504, Valid Acc: 54.00%
Epoch 4/100:
Train Loss: 0.8180, Train Acc: 55.71%
Valid Loss: 0.8375, Valid Acc: 61.00%
Epoch 5/100:
Train Loss: 0.7797, Train Acc: 56.86%
Valid Loss: 0.6626, Valid Acc: 54.50%
Epoch 6/100:
Train Loss: 0.7475, Train Acc: 60.29%
Valid Loss: 0.6318, Valid Acc: 66.00%
Epoch 7/100:
Train Loss: 0.7046, Train Acc: 63.50%
Valid Loss: 0.6012, Valid Acc: 63.50%
Epoch 8/100:
Train Loss: 0.6733, Train Acc: 65.57%
Valid Loss: 0.6885, Valid Acc: 61.50%
Epoch 9/100:
Train Loss: 0.6110, Train Acc: 68.93%
Valid Loss: 0.6080, Valid Acc: 66.50%
Epoch 10/100:
Train Loss: 0.5911, Train Acc: 71.43%
Valid Loss: 0.5846, Valid Acc: 67.00%
Epoch 11/100:
Train Loss: 0.6043, Train Acc: 69.79%
Valid Loss: 0.5939, Valid Acc: 66.50%
Epoch 12/100:
Trai

  custom_cnn.load_state_dict(torch.load('best_model.pth'))


Results saved to test_results.csv
