In [56]:
import copy
import os

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import random_split, DataLoader
from torchvision import datasets, transforms, models

In [57]:
# Load the training set and hold out a validation split
batch_size = 64
split_seed = 42  # fixed seed so the train/validation partition is identical after a kernel restart

# Deterministic preprocessing (no augmentation): resize, convert to tensor, then
# normalise each channel with the mean/std values commonly used for ImageNet models.
transform_train = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

dataset = datasets.ImageFolder(root='../cs-424-ass-1-wednesday-class/train', transform=transform_train)

# Split dataset into training and validation sets (90% train, 10% validation).
# The remainder goes to validation so the two sizes always sum to len(dataset).
train_size = int(0.9 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = random_split(
    dataset,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(split_seed),
)

# Only the training loader is shuffled; validation order does not matter for the metrics.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

In [58]:
# Prefer the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
dev = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [59]:
# ResNet-18 backbone with a custom two-layer classification head
class CustomResNet18(nn.Module):
    """torchvision ResNet-18 (no pretrained weights) whose final layer is replaced by an MLP head.

    The head is Linear(in_features -> 512) -> ReLU -> Dropout(0.3) -> Linear(512 -> num_classes).

    Args:
        num_classes: Number of output logits produced by the head.
    """

    def __init__(self, num_classes):
        super().__init__()

        backbone = models.resnet18(weights=None)
        # Read the width of the stock classifier before it is replaced.
        feature_dim = backbone.fc.in_features
        backbone.fc = nn.Sequential(
            nn.Linear(feature_dim, 512),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(512, num_classes),
        )
        # Stored as ``model`` so parameter names in the state dict start with "model.".
        self.model = backbone

    def forward(self, x):
        """Run ``x`` through the wrapped network and return its output."""
        logits = self.model(x)
        return logits



# Build the 10-class network and move it to the selected device
num_classes = 10
model = CustomResNet18(num_classes).to(dev)

# Cross-entropy objective optimised with AdamW (decoupled weight decay)
learning_rate = 1e-3
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(
    params=model.parameters(),
    lr=learning_rate,
    weight_decay=1e-4,
)

In [None]:
# OneCycleLR Scheduler
# Total number of epochs the one-cycle schedule spans. The training loop must not run
# more optimizer steps than num_epochs * len(train_loader): OneCycleLR raises once its
# step budget is exhausted.
num_epochs = 300
scheduler = optim.lr_scheduler.OneCycleLR(
    optimizer,
    max_lr=0.002,
    epochs=num_epochs,
    steps_per_epoch=len(train_loader),
    pct_start=0.3,  # fraction of the cycle spent increasing the learning rate
)

import numpy as np
from PIL import Image

class Cutout:
    """Transform that blacks out square patches at random positions of a PIL image.

    Args:
        n_holes: Number of patches to cut out of each image.
        length: Side length, in pixels, of each square patch.
    """

    def __init__(self, n_holes, length):
        self.n_holes = n_holes
        self.length = length

    def __call__(self, img):
        """Return a copy of ``img`` with the patches replaced by black pixels.

        ``img`` is expected to be an RGB PIL image, because the fill image is
        created in 'RGB' mode.
        """
        width, height = img.size
        half = self.length // 2
        keep = np.ones((height, width), np.float32)  # 1 = keep the pixel, 0 = cut it out

        for _ in range(self.n_holes):
            # Centre row is drawn before centre column.
            centre_y = np.random.randint(height)
            centre_x = np.random.randint(width)
            # Clipping truncates patches that would extend past the image border.
            top, bottom = np.clip([centre_y - half, centre_y + half], 0, height)
            left, right = np.clip([centre_x - half, centre_x + half], 0, width)
            keep[top:bottom, left:right] = 0.

        # Composite takes ``img`` where the 8-bit mask is 255 and the black image elsewhere.
        mask = Image.fromarray(keep * 255).convert('L')
        black = Image.new('RGB', img.size)
        return Image.composite(img, black, mask)


# EarlyStopping class
class EarlyStopping:
    """Signal when a monitored loss has stopped improving and keep the best weights.

    Call the instance once per epoch with the loss to monitor (lower is better) and
    the model. An epoch counts as an improvement when the loss drops by at least
    ``delta`` relative to the best value seen so far.

    Args:
        patience: Number of consecutive non-improving epochs after which
            ``early_stop`` is set to True.
        delta: Minimum decrease of the loss that qualifies as an improvement.
    """

    def __init__(self, patience=5, delta=0):
        self.patience = patience  # Number of epochs to wait for improvement
        self.delta = delta  # Minimum change to qualify as an improvement
        self.counter = 0  # Counter for epochs with no improvement
        self.best_score = None  # Best score so far (negated loss, so higher is better)
        self.early_stop = False  # Flag to indicate if early stopping should occur
        self.best_model_wts = None  # Independent copy of the best model's state dict

    def __call__(self, val_loss, model):
        """Record this epoch's loss, updating the best snapshot or the patience counter."""
        score = -val_loss  # We use negative loss since we want to minimize the loss
        if self.best_score is None:
            # First epoch: nothing to compare against, so it becomes the baseline.
            self.best_score = score
            self.best_model_wts = self._snapshot(model)
        elif score < self.best_score + self.delta:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.best_model_wts = self._snapshot(model)
            self.counter = 0
            print("Saved New Best Weight!")

    @staticmethod
    def _snapshot(model):
        """Return a deep copy of the model's state dict.

        ``state_dict()`` hands back tensors that share storage with the live
        parameters; without the copy the stored "best" weights would keep changing
        as training continues and restoring them would have no effect.
        """
        return copy.deepcopy(model.state_dict())

# Random augmentation pipeline for training images
def get_random_transform():
    """Build the training-time augmentation pipeline.

    Every call returns a new but identically configured ``Compose``; the variation
    between samples comes from the random transforms drawing fresh parameters each
    time they are applied, not from calling this function again.

    Returns:
        A ``transforms.Compose`` that takes a PIL image and yields a normalised tensor.
    """
    return transforms.Compose([
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(40),
        transforms.RandomResizedCrop(224, scale=(0.8, 1.0)),
        transforms.ColorJitter(brightness=0.2, contrast=0.4, saturation=0.4, hue=0.2),
        transforms.RandomAffine(degrees=15, translate=(0.1, 0.1)),
        # The explicit enum replaces the deprecated integer code 3 (bicubic).
        transforms.RandomPerspective(
            distortion_scale=0.3,
            p=0.5,
            interpolation=transforms.InterpolationMode.BICUBIC,
        ),
        transforms.RandomCrop(224, padding=4),  # Added RandomCrop with padding
        transforms.RandomAffine(degrees=0, scale=(0.9, 1.1)),  # Added random zoom
        transforms.GaussianBlur(kernel_size=5),  # Added Gaussian blur
        Cutout(n_holes=1, length=32),  # Added Cutout with 1 hole of length 32
        # Cutout operates on PIL images, so tensor conversion has to come after it.
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])


# Mixup function implementation
def mixup_data(x, y, alpha=0.3):
    """Blend each sample of a batch with another randomly chosen sample of the same batch.

    Args:
        x: Batch of inputs; the first dimension indexes the samples.
        y: Labels aligned with ``x`` along the first dimension.
        alpha: Parameter of the symmetric Beta(alpha, alpha) distribution the mixing
            coefficient is drawn from. With ``alpha <= 0`` no mixing happens (lam = 1).

    Returns:
        Tuple ``(mixed_x, y_a, y_b, lam)`` where ``mixed_x = lam * x + (1 - lam) * x[perm]``,
        ``y_a`` are the original labels, ``y_b`` the labels of the partner samples and
        ``lam`` the mixing coefficient.
    """
    if alpha > 0:
        lam = np.random.beta(alpha, alpha)
    else:
        lam = 1.0
    num_samples = x.size(0)
    # Create the permutation on the same device as the batch instead of forcing CUDA,
    # so the function also works when training runs on the CPU.
    index = torch.randperm(num_samples, device=x.device)
    mixed_x = lam * x + (1 - lam) * x[index]
    y_a, y_b = y, y[index]
    return mixed_x, y_a, y_b, lam

# Stop after 30 consecutive epochs in which the monitored loss fails to improve by at least 0.001.
early_stopping = EarlyStopping(delta=0.001, patience=30)

# Modified train function with mixup and random augmentations
def train_model(num_epochs=None):
    """Train the global ``model`` with mixup on augmented images, validating after each epoch.

    Uses the notebook globals ``model``, ``criterion``, ``optimizer``, ``scheduler``,
    ``early_stopping``, ``train_dataset``, ``val_loader``, ``batch_size`` and ``dev``.
    Prints per-epoch metrics and finally writes the weights to 'resnet18_local.pth'.

    Args:
        num_epochs: Maximum number of epochs. When None, the number of epochs the
            scheduler's step budget (``scheduler.total_steps``) allows is used.

    Raises:
        ValueError: If ``num_epochs`` is None and the scheduler exposes no
            ``total_steps``, or if the requested epochs need more steps than the
            scheduler was configured for.
    """
    # The random transforms draw new parameters for every sample, so the pipeline and
    # the dataset only need to be built once rather than once per epoch.
    augmented_dataset = datasets.ImageFolder(
        root='../cs-424-ass-1-wednesday-class/train',
        transform=get_random_transform(),
    )
    # Train only on the indices of the training split. Iterating the whole folder would
    # leak the validation images into training and take more steps per epoch than the
    # scheduler was sized for.
    augmented_train = torch.utils.data.Subset(augmented_dataset, train_dataset.indices)
    augmented_loader = DataLoader(dataset=augmented_train, batch_size=batch_size, shuffle=True)
    steps_per_epoch = len(augmented_loader)

    scheduled_steps = getattr(scheduler, "total_steps", None)
    if num_epochs is None:
        if scheduled_steps is None:
            raise ValueError("num_epochs must be given when the scheduler has no total_steps")
        num_epochs = scheduled_steps // max(steps_per_epoch, 1)
    elif scheduled_steps is not None and num_epochs * steps_per_epoch > scheduled_steps:
        raise ValueError(
            f"{num_epochs} epochs need {num_epochs * steps_per_epoch} scheduler steps, "
            f"but the scheduler only allows {scheduled_steps}"
        )

    for epoch in range(num_epochs):
        # Validation below switches to eval mode, so training mode has to be
        # re-enabled at the start of every epoch (dropout / batch-norm statistics).
        model.train()
        total_loss = 0
        total_correct = 0
        total_samples = 0

        for images, labels in augmented_loader:
            images, labels = images.to(dev), labels.to(dev)

            # Apply Mixup
            mixed_images, targets_a, targets_b, lam = mixup_data(images, labels)

            outputs = model(mixed_images)
            # The loss is interpolated with the same coefficient as the inputs.
            loss = lam * criterion(outputs, targets_a) + (1 - lam) * criterion(outputs, targets_b)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            scheduler.step()  # the scheduler is stepped once per batch

            total_loss += loss.item()
            total_samples += images.size(0)
            _, predicted = torch.max(outputs, 1)
            # Mixup accuracy: credit each prediction against both label sets, weighted by lam.
            total_correct += (lam * predicted.eq(targets_a).sum().item()
                              + (1 - lam) * predicted.eq(targets_b).sum().item())

        train_loss = total_loss / len(augmented_loader)
        train_accuracy = total_correct / total_samples

        model.eval()
        total_val_loss = 0
        correct_val = 0
        total_val = 0
        with torch.no_grad():
            for images, labels in val_loader:
                images, labels = images.to(dev), labels.to(dev)

                # Validation batches are evaluated as loaded: no mixup, no random augmentation.
                outputs = model(images)
                loss = criterion(outputs, labels)

                total_val_loss += loss.item()
                _, predicted = torch.max(outputs, 1)
                total_val += labels.size(0)
                correct_val += (predicted == labels).sum().item()

        val_loss = total_val_loss / len(val_loader)
        val_accuracy = correct_val / total_val

        # Print loss and accuracy for both training and validation
        print(f"Epoch [{epoch+1}/{num_epochs}], "
              f"Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}, "
              f"Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}, "
              f"Learning Rate: {scheduler.get_last_lr()[0]:.10f}")

        # Early stopping monitors the held-out loss, not the mixup training loss.
        early_stopping(val_loss, model)
        if early_stopping.early_stop:
            print("Early stopping triggered")
            model.load_state_dict(early_stopping.best_model_wts)  # Load the best model weights
            break
        else:
            print(f"Patience Counter: {early_stopping.counter}/{early_stopping.patience}")

    # Save the model
    torch.save(model.state_dict(), 'resnet18_local.pth')

In [None]:
# Parameter count expressed in units of 2**20 (should be less than 26)
total_params = sum(param.numel() for param in model.parameters())
total_params / 2**20

10.914133071899414

In [None]:
# Load the test set with deterministic preprocessing (resize, tensor conversion, normalisation)
_test_steps = [
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
]
transform_test = transforms.Compose(_test_steps)

test_dataset = datasets.ImageFolder(
    root='../cs-424-ass-1-wednesday-class/test',
    transform=transform_test,
)
# No shuffling: predictions are later paired with file paths by position.
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
# test the model
def test_model(weights_path=None):
    """Predict a label for every test image and write the submission CSV.

    Uses the notebook globals ``model``, ``test_dataset``, ``test_loader`` and ``dev``.
    Writes 'naufala.2022.csv' with the columns ``id`` (image path rewritten to the
    '/kaggle/input' layout) and ``label`` (predicted class index).

    Args:
        weights_path: Optional path to a saved state dict. When given, the weights
            are loaded into ``model`` before inference; when None, ``model`` is used
            in whatever state the kernel currently holds.
    """
    if weights_path is not None:
        # NOTE: torch.load unpickles the file; only load checkpoints you trust.
        model.load_state_dict(torch.load(weights_path, map_location=dev))

    model.eval()
    predictions = []
    image_paths = [path for path, _ in test_dataset.imgs]

    old_prefix = ".."
    new_prefix = "/kaggle/input"

    def to_submission_id(path):
        """Normalise separators and swap only the leading local prefix."""
        normalized = path.replace("\\", "/")
        # Replace just the prefix: a blanket str.replace would also rewrite any
        # ".." that happens to appear inside a directory or file name.
        if normalized.startswith(old_prefix):
            return new_prefix + normalized[len(old_prefix):]
        return normalized

    with torch.no_grad():
        for images, _ in test_loader:
            images = images.to(dev)
            outputs = model(images)
            _, predicted = torch.max(outputs, 1)
            predictions.extend(predicted.cpu().tolist())

    final_paths = [to_submission_id(path) for path in image_paths]

    # prediction.csv
    df = pd.DataFrame({
        'id': final_paths,
        'label': predictions
    })
    df.to_csv('naufala.2022.csv', index=False)
    print("Results saved to naufala.2022.csv")

In [None]:
# Run inference and write the submission CSV. The guard holds when this code runs
# as the main module, which includes execution inside the notebook kernel.
if __name__ == "__main__":
    test_model()

Results saved to naufala.2022.csv
