In [1]:
import os
import cv2
import torch
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
import pandas as pd
import numpy as np
import torchvision
from PIL import Image
from typing import Any, Tuple, Optional, Callable
from torchvision.transforms.functional import to_tensor
from torchvision import transforms
from torch.utils.data import Dataset
import random
from sklearn.model_selection import train_test_split
from torchvision.utils import make_grid
import matplotlib.pyplot as plt
from tqdm import tqdm

In [2]:
def read_csv(path: str) -> pd.DataFrame:
    '''
    Read a csv file.

    Args:
        path (str): Path to the csv file.
    
    Returns:
        pd.DataFrame: Dataframe with the csv file data.
    '''
    
    assert os.path.exists(path), f'CSV file not found: {path}!'
    assert os.path.splitext(path)[
    -1] == '.csv', f'Unsupported file type {os.path.splitext(path)[-1]}!'
    return pd.read_csv(path)

class ImageDataset(Dataset):
    def __init__(self, dataframe: pd.DataFrame, images_folder: str = './images', transform: Optional[Callable] = None, target_transform: Optional[Callable] = None) -> None:
        '''
        Image dataset.

        Args:
            dataframe (pd.DataFrame): Dataframe with the image filenames and labels.
            images_folder (str): Directory with all the images.
            transform (callable, optional): Optional transform to be applied on a sample.
            target_transform (callable, optional): Optional transform to be applied on a target.
        '''
        assert 'Filename' in dataframe.columns, f'Filename column not found!'
        assert os.path.exists(images_folder), f'Image folder not found: {images_folder}!'

        self.dataframe = dataframe
        self.images_folder = images_folder
        self.transform = transform
        self.target_transform = target_transform

        data = []
        targets = []

        for i, sample in dataframe.iterrows():
            image = cv2.imread(os.path.join(images_folder, sample['Filename']))
            data.append(image)

            targets.append(int(sample['Label']) if 'Label' in sample else -1)

        self.data = data
        self.targets = targets

    def __len__(self) -> int:
        '''
        Returns:
            int: Length of the dataset.
        '''
        return len(self.data)

    def __getitem__(self, index: int) -> Tuple[Any, Any]:
        '''
        Args:
            index (int): Index
        
        Returns:
            tuple: (image, target) where target is class_index of the target class. For the public test set, target is a class from [0, 1, 2, 3, 4, 5, 6, 7, 8]. For the private test set (before releasing the test set labels), target is -1.
        '''
        img = self.data[index]
        target = self.targets[index]

        img = Image.fromarray(img)

        if self.transform is not None:
            img = self.transform(img)

        if self.target_transform is not None:
            target = self.target_transform(target)

        return img, target
    
def get_default_device():
    """Pick GPU/MPS if available, else CPU"""
    if torch.cuda.is_available():
        return torch.device('cuda')
#     elif torch.backends.mps.is_available():
#         return torch.device('mps')
    else:
        return torch.device('cpu')
    
def to_device(data, device):
    """Move tensor(s) to chosen device"""
    if isinstance(data, (list,tuple)):
        return [to_device(x, device) for x in data]
    return data.to(device, non_blocking=True)

class DeviceDataLoader():
    """Wrap a dataloader to move data to a device"""
    def __init__(self, dl, device):
        self.dl = dl
        self.device = device
        
    def __iter__(self):
        """Yield a batch of data after moving it to device"""
        for b in self.dl: 
            yield to_device(b, self.device)

    def __len__(self):
        """Number of batches"""
        return len(self.dl)

def show_batch(dl):
    for images, labels in dl:
        fig, ax = plt.subplots(figsize=(10, 10))
        ax.set_xticks([]); ax.set_yticks([])
        ax.imshow(make_grid(images[:64], nrow=8).permute(1, 2, 0))
        break

In [3]:
public_dataframe = read_csv('assignment_7_public.csv')
train_split, valid_split = train_test_split(public_dataframe, test_size=0.1, random_state=42)



In [4]:
public_dataset = ImageDataset(public_dataframe)

print('Image', type(public_dataset[0][0]), public_dataset[0][0].size) # Image <class 'PIL.Image.Image'> (28, 28)
print('Target', type(public_dataset[0][1])) # Target <class 'int'>
print('Length', len(public_dataset)) # Length 85744

imgs = [to_tensor(item[0]) for item in public_dataset] # item[0] and item[1] are image and its label
imgs = torch.stack(imgs, dim=0).numpy()

# calculate mean over each channel (r,g,b)
mean_r = imgs[:,0,:,:].mean()
mean_g = imgs[:,1,:,:].mean()
mean_b = imgs[:,2,:,:].mean()
print('\nmean RGB:', mean_r,mean_g,mean_b)

# calculate std over each channel (r,g,b)
std_r = imgs[:,0,:,:].std()
std_g = imgs[:,1,:,:].std()
std_b = imgs[:,2,:,:].std()
print('sd RGB:', std_r,std_g,std_b)

Image <class 'PIL.Image.Image'> (28, 28)
Target <class 'int'>
Length 85744

mean RGB: 0.73970443 0.5331324 0.70590323
sd RGB: 0.12986475 0.175617 0.13220353


In [5]:
stats = ((mean_r, mean_g, mean_b), (std_r, std_g, std_b))
train_transforms = transforms.Compose([transforms.RandomCrop(32, padding=4, padding_mode='reflect'), 
                                       transforms.RandomHorizontalFlip(),
                                       transforms.ToTensor(), 
                                       transforms.Normalize(*stats,inplace=True)])
test_transforms = transforms.Compose([transforms.ToTensor(), 
                                      transforms.Normalize(*stats)])

In [6]:
batch_size = 400
num_workers = 3
classes = (0, 1, 2, 3, 4, 5, 6, 7, 8)


train_dataset = ImageDataset(dataframe=train_split, transform=train_transforms)
train_dl = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers, pin_memory=True)


valid_dataset = ImageDataset(dataframe=valid_split, transform=test_transforms)
valid_dl = torch.utils.data.DataLoader(valid_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers, pin_memory=True)



In [7]:
# show_batch(train_dl)

### Construct Model

In [7]:
# define to use cpu or gpu
device = get_default_device()
print('device:', device)
print('device_count:', torch.cuda.device_count())

if torch.cuda.is_available():
    device_name = torch.cuda.get_device_name(device)
    print('GPU Device:', device_name)
else:
    print('CPU Device')

device: cuda
device_count: 1
GPU Device: NVIDIA GeForce RTX 4060 Laptop GPU


In [8]:
train_dl = DeviceDataLoader(train_dl, device)
valid_dl = DeviceDataLoader(valid_dl, device)

In [9]:
train_dl.device

device(type='cuda')

In [10]:
len(valid_dl)

22

In [11]:
len(valid_dl.dl)

22

In [15]:
# list(valid_dl.dl)

In [12]:
def accuracy(outputs, labels):
    _, preds = torch.max(outputs, dim=1)
    return torch.tensor(torch.sum(preds == labels).item() / len(preds))

class ImageClassificationBase(nn.Module):
    def training_step(self, batch):
        images, labels = batch 
        out = self(images)                  # Generate predictions
        loss = F.cross_entropy(out, labels) # Calculate loss
        return loss
    
    def validation_step(self, batch):
        images, labels = batch 
        out = self(images)                    # Generate predictions
        loss = F.cross_entropy(out, labels)   # Calculate loss
        acc = accuracy(out, labels)           # Calculate accuracy
        return {'val_loss': loss.detach(), 'val_acc': acc}
        
    def validation_epoch_end(self, outputs):
        batch_losses = [x['val_loss'] for x in outputs]
        epoch_loss = torch.stack(batch_losses).mean()   # Combine losses
        batch_accs = [x['val_acc'] for x in outputs]
        epoch_acc = torch.stack(batch_accs).mean()      # Combine accuracies
        return {'val_loss': epoch_loss.item(), 'val_acc': epoch_acc.item()}
    
    def epoch_end(self, epoch, result):
        print("Epoch [{}], last_lr: {:.5f}, train_loss: {:.4f}, val_loss: {:.4f}, val_acc: {:.4f}".format(
            epoch, result['lrs'][-1], result['train_loss'], result['val_loss'], result['val_acc']))

In [13]:
def conv_block(in_channels, out_channels, pool=False):
    layers = [nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1), 
              nn.BatchNorm2d(out_channels), 
              nn.ReLU(inplace=True)]
    if pool: layers.append(nn.MaxPool2d(2))
    return nn.Sequential(*layers)

class ResNet9(ImageClassificationBase):
    def __init__(self, in_channels, num_classes):
        super().__init__()
        
        self.conv1 = conv_block(in_channels, 16) 
        self.conv2 = conv_block(16, 32, pool=True) 
        self.res1 = nn.Sequential(conv_block(32, 32), conv_block(32, 32)) 
        
        self.conv3 = conv_block(32, 64, pool=True) 
        self.conv4 = conv_block(64, 128, pool=True) 
        self.res2 = nn.Sequential(conv_block(128, 128), conv_block(128, 128)) 
        
        self.classifier = nn.Sequential(nn.MaxPool2d(4), 
                                        nn.Flatten(), 
                                        nn.Linear(128, num_classes)) 
        
    def forward(self, xb):
        out = self.conv1(xb)
        out = self.conv2(out)
        out = self.res1(out) + out
        out = self.conv3(out)
        out = self.conv4(out)
        out = self.res2(out) + out
        out = self.classifier(out)
        return out

In [14]:
model = to_device(ResNet9(3, 10), device)
model

ResNet9(
  (conv1): Sequential(
    (0): Conv2d(3, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
  )
  (conv2): Sequential(
    (0): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
    (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (res1): Sequential(
    (0): Sequential(
      (0): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU(inplace=True)
    )
    (1): Sequential(
      (0): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU(inplace=True)
    

In [15]:
trainable_parameters = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(trainable_parameters)

413514


# train the model

In [23]:
@torch.no_grad()
def evaluate(model, val_loader):
    model.eval()
    outputs = []
    total_batches = len(val_loader)
    progress_bar = tqdm(total=total_batches, desc='Evaluation Progress')

    for batch in val_loader:
        output = model.validation_step(batch)
        outputs.append(output)
        progress_bar.update(1)

    progress_bar.close()
    return model.validation_epoch_end(outputs)

def get_lr(optimizer):
    for param_group in optimizer.param_groups:
        return param_group['lr']

def fit_one_cycle(epochs, max_lr, model, train_loader, val_loader, 
                  weight_decay=0, grad_clip=None, opt_func=torch.optim.SGD):
    torch.cuda.empty_cache()
    history = []
    
    # Set up cutom optimizer with weight decay
    optimizer = opt_func(model.parameters(), max_lr, weight_decay=weight_decay)
    # Set up one-cycle learning rate scheduler
    sched = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr, epochs=epochs, 
                                                steps_per_epoch=len(train_loader))
    
    for epoch in range(epochs):
        # Training Phase 
        model.train()
        train_losses = []
        lrs = []
        for batch in train_loader:
            loss = model.training_step(batch)
            train_losses.append(loss)
            loss.backward()
            
            # Gradient clipping
            if grad_clip: 
                nn.utils.clip_grad_value_(model.parameters(), grad_clip)
            
            optimizer.step()
            optimizer.zero_grad()
            
            # Record & update learning rate
            lrs.append(get_lr(optimizer))
            sched.step()
        
        # Validation phase
        result = evaluate(model, val_loader)
        result['train_loss'] = torch.stack(train_losses).mean().item()
        result['lrs'] = lrs
        model.epoch_end(epoch, result)
        history.append(result)
    return history

# Cannot eval the dataset, run 40 mins without any progress update (suppose to have 22 stages)

In [16]:
history = [evaluate(model, valid_dl)]
history

Evaluation Progress:   0%|          | 0/22 [00:00<?, ?it/s]

In [24]:
# hyper-parameters
epochs = 5
max_lr = 0.01
grad_clip = 0.1
weight_decay = 1e-4
opt_func = torch.optim.Adam

In [29]:
history = None  # Define and initialize 'history'
if history is None: print('Y')

Y


In [30]:
%%time
history += fit_one_cycle(epochs, max_lr, model, train_dl, valid_dl, 
                             grad_clip=grad_clip, 
                             weight_decay=weight_decay, 
                             opt_func=opt_func)

In [None]:
PATH = './clf01/CNN_net.pth'
torch.save(model.state_dict(), PATH)

# Plot graph

In [None]:
def plot_accuracies(history):
    accuracies = [x['val_acc'] for x in history]
    plt.plot(accuracies, '-x')
    plt.xlabel('epoch')
    plt.ylabel('accuracy')
    plt.title('Accuracy vs. No. of epochs')

def plot_losses(history):
    train_losses = [x.get('train_loss') for x in history]
    val_losses = [x['val_loss'] for x in history]
    plt.plot(train_losses, '-bx')
    plt.plot(val_losses, '-rx')
    plt.xlabel('epoch')
    plt.ylabel('loss')
    plt.legend(['Training', 'Validation'])
    plt.title('Loss vs. No. of epochs')

def plot_lrs(history):
    lrs = np.concatenate([x.get('lrs', []) for x in history])
    plt.plot(lrs)
    plt.xlabel('Batch no.')
    plt.ylabel('Learning rate')
    plt.title('Learning Rate vs. Batch no.')

In [None]:
plot_accuracies(history)

In [None]:
plot_losses(history)

In [None]:
plot_lrs(history)