In [1]:
import os

import matplotlib.pyplot as plt
import numpy as np
import opendatasets as od
import pandas as pd
import pytorch_lightning as pl
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision.transforms as T
from PIL import Image
from torch.utils.data import DataLoader, Dataset
from torchinfo import summary
from torchvision import transforms
from torchvision.datasets import ImageFolder
from tqdm import tqdm

In [2]:
# Folders holding the training and validation image splits.
train_dir, valid_dir = '100-bird-species/train/', '100-bird-species/valid/'

In [3]:
## Preprocessing applied to every image, in this order:
##   transforms.Resize   -> rescales the image to 224x224 with the Image.LANCZOS filter
##   transforms.ToTensor -> converts the image into a tensor
## transforms.Compose chains the individual steps into one callable.

transformation = transforms.Compose(
    [
        transforms.Resize(size=(224, 224), interpolation=Image.LANCZOS),
        transforms.ToTensor(),
    ]
)

In [4]:
# One ImageFolder dataset per split; both use the same preprocessing pipeline.
train_data, valid_data = (
    ImageFolder(root=split_dir, transform=transformation)
    for split_dir in (train_dir, valid_dir)
)

In [5]:
## Each loader yields mini-batches of 64 (image, label) samples.
## Only the training loader shuffles: reshuffling between epochs changes the
## batch composition the optimizer sees. Validation metrics do not depend on
## sample order, so the validation loader keeps a fixed, reproducible order.

train_loader = DataLoader(train_data, batch_size=64, shuffle=True)
valid_loader = DataLoader(valid_data, batch_size=64, shuffle=False)

In [6]:
# Prefer the GPU when one is visible to PyTorch; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
    # On GPU, also report the name of the selected card.
    print("Using device: ", device, f"({torch.cuda.get_device_name(device)})")
else:
    device = torch.device("cpu")
    print("Using device: ", device, "")

Using device:  cpu 


In [7]:
class ImagePatcher(nn.Module):
    """Cut a batch of images into square patches and linearly embed each patch.

    Args:
        image_size: Side length of the expected input images. Stored for
            reference only; the patching itself does not use it.
        patch_size: Side length of each square patch. It is also used as the
            stride, so patches do not overlap.
        embedding_dim: Size of the vector each flattened patch is projected to.
        in_channels: Number of channels of the input images (3 for RGB).
    """

    def __init__(self, image_size = 224, patch_size = 14, embedding_dim = 128, in_channels = 3):
        super().__init__()
        self.image_size = image_size
        self.patch_size = patch_size
        self.embedding_dim = embedding_dim
        self.in_channels = in_channels

        ## extracts non-overlapping patch_size x patch_size windows
        self.unfold = nn.Unfold(kernel_size = self.patch_size, stride = self.patch_size)

        ## projects one flattened patch (channels * patch_size * patch_size values)
        ## to the embedding dimension
        self.linear = nn.Linear(self.in_channels * self.patch_size * self.patch_size, self.embedding_dim)

    def forward(self, x):
        """Return patch embeddings for an image batch.

        Args:
            x: Tensor of shape [batch size, channels, height, width].

        Returns:
            Tensor of shape [batch size, num_patches, embedding dimension].
        """
        ## unfold gives [batch size, channels*patch_size*patch_size, num_patches];
        ## permute moves the patch axis forward:
        ## [batch size, num_patches, channels*patch_size*patch_size]
        patches = self.unfold(x).permute(0, 2, 1)

        ## linear embedding acts on the last axis
        return self.linear(patches)

In [8]:
class Attention(nn.Module):
    """Pre-norm transformer encoder block: self-attention followed by an MLP.

    Each sub-layer first normalises its input with LayerNorm and then adds its
    output back onto the un-normalised input (residual connection).

    Args:
        hidden_dim: Width of the hidden layer inside the feed-forward network.
        num_heads: Number of attention heads.
        embedding_dim: Size of each token vector entering and leaving the block.
        dropout: Dropout probability used by the attention layer and the
            feed-forward network.
    """

    def __init__(self, hidden_dim, num_heads, embedding_dim = 128, dropout = 0.0):
        super().__init__()

        ## one LayerNorm in front of each sub-layer
        self.layer_norm_1 = nn.LayerNorm(embedding_dim)
        self.layer_norm_2 = nn.LayerNorm(embedding_dim)

        ## self-attention; batch_first=True is passed explicitly so inputs are
        ## read as [batch, tokens, embedding]
        self.attn = nn.MultiheadAttention(
            embed_dim=embedding_dim,
            num_heads=num_heads,
            dropout=dropout,
            batch_first=True,
        )

        ## feed-forward network: expand to hidden_dim, ReLU, project back
        mlp_layers = [
            nn.Linear(embedding_dim, hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, embedding_dim),
            nn.Dropout(dropout),
        ]
        self.feed_forward = nn.Sequential(*mlp_layers)

    def forward(self, x):
        """Apply attention and the feed-forward network, each with a residual add."""
        # attention sub-layer: queries, keys and values are all the normalised input
        normed = self.layer_norm_1(x)
        attended, _ = self.attn(normed, normed, normed)
        x = x + attended

        # feed-forward sub-layer
        mlp_out = self.feed_forward(self.layer_norm_2(x))
        return x + mlp_out

In [9]:
class VisionTransformer(nn.Module):
    """Image classifier: patch embedding, a stack of attention blocks, linear head.

    Args:
        hidden_dim: Hidden width of the feed-forward network in every block.
        num_heads: Number of attention heads in every block.
        num_layers: Number of stacked Attention blocks.
        num_classes: Number of output logits.
        image_size: Side length of the input images; together with patch_size
            it fixes how many positional embeddings are created.
        embedding_dim: Size of each token vector.
        patch_size: Side length of each square patch.
        dropout: Dropout probability applied before every block and inside it.
    """

    def __init__(self, hidden_dim, num_heads, num_layers,
                 num_classes = 525, image_size = 224, embedding_dim = 128, patch_size = 14, dropout = 0.0):
        super().__init__()

        self.patch_size = patch_size
        patches_per_side = image_size // patch_size
        self.num_patches = patches_per_side ** 2

        ## image -> sequence of patch embeddings
        self.patcher = ImagePatcher(image_size, patch_size, embedding_dim)

        ## stack of encoder blocks
        encoder_blocks = []
        for _ in range(num_layers):
            encoder_blocks.append(
                Attention(hidden_dim=hidden_dim, num_heads=num_heads,
                          embedding_dim=embedding_dim, dropout=dropout)
            )
        self.attentions = nn.ModuleList(encoder_blocks)

        ## dropout applied to the token sequence before each block
        self.dropout = nn.Dropout(dropout)

        ## classification head, fed with the final classification token
        self.mlp_head = nn.Sequential(
            nn.LayerNorm(embedding_dim),
            nn.Linear(embedding_dim, num_classes),
        )

        ## learnable classification token, prepended to every sequence
        self.cls_token = nn.Parameter(torch.randn(1, 1, embedding_dim))

        ## learnable positional embedding: one slot per patch plus one for the token
        self.pos_embedding = nn.Parameter(torch.randn(1, self.num_patches + 1, embedding_dim))

    def forward(self, x):
        """Return class logits of shape [batch size, num_classes] for an image batch."""
        # image batch -> [batch size, number of patches, embedding dimension]
        tokens = self.patcher(x)
        batch, num_tokens, _ = tokens.shape

        # prepend one classification token per sample, then add positions
        cls_tokens = self.cls_token.expand(batch, -1, -1)
        tokens = torch.cat([cls_tokens, tokens], dim=1)
        tokens = tokens + self.pos_embedding[:, : num_tokens + 1]

        # encoder stack, with dropout in front of every block
        for block in self.attentions:
            tokens = block(self.dropout(tokens))

        # classify from the first position (the classification token)
        return self.mlp_head(tokens[:, 0])

In [12]:
# Build the Vision Transformer and move its parameters to the selected device.
model = VisionTransformer(
    hidden_dim=512,
    num_heads=8,
    num_layers=6,
    embedding_dim=128,
    dropout=0.1,
)
model = model.to(device)

# Layer-by-layer overview for a single 3x224x224 input.
summary(model, input_size=(1, 3, 224, 224))

Layer (type:depth-idx)                   Output Shape              Param #
VisionTransformer                        [1, 525]                  33,024
├─ImagePatcher: 1-1                      [1, 256, 128]             --
│    └─Unfold: 2-1                       [1, 588, 256]             --
│    └─Linear: 2-2                       [1, 256, 128]             75,392
├─Dropout: 1-2                           [1, 257, 128]             --
├─ModuleList: 1-13                       --                        (recursive)
│    └─Attention: 2-3                    [1, 257, 128]             --
│    │    └─LayerNorm: 3-1               [1, 257, 128]             256
│    │    └─MultiheadAttention: 3-2      [1, 257, 128]             66,048
│    │    └─LayerNorm: 3-3               [1, 257, 128]             256
│    │    └─Sequential: 3-4              [1, 257, 128]             131,712
├─Dropout: 1-4                           [1, 257, 128]             --
├─ModuleList: 1-13                       --              

In [11]:
# Loss: cross-entropy between the model's logits and the integer class labels.
criterion = nn.CrossEntropyLoss()

# AdamW optimiser over all model parameters. The scheduler multiplies the
# learning rate by 0.1 once 25, and again once 40, scheduler steps have run.
optimizer = optim.AdamW(params=model.parameters(), lr=1e-3, weight_decay=1e-4)
scheduler = optim.lr_scheduler.MultiStepLR(
    optimizer=optimizer,
    milestones=[25, 40],
    gamma=0.1,
)

In [None]:
# Number of full passes over the training data performed by the training loop.
num_epochs = 55

def validate(model, val_loader, criterion):
    """Evaluate ``model`` on ``val_loader`` and return ``(avg_loss, accuracy)``.

    The model is switched to evaluation mode and is left in that mode, so the
    caller must call ``model.train()`` again before resuming training.

    Args:
        model: Module mapping an input batch to per-class scores.
        val_loader: Iterable yielding ``(inputs, labels)`` batches.
        criterion: Loss callable returning the mean loss of a batch.

    Returns:
        Tuple ``(avg_loss, accuracy)``, both averaged over the samples that
        were actually evaluated.

    Raises:
        ZeroDivisionError: If ``val_loader`` yields no samples.
    """
    model.eval()

    # Evaluate on the device the model lives on; without this, a model on the
    # GPU would be fed CPU batches and fail with a device mismatch.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else None

    total_loss = 0.0
    correct_predictions = 0
    total_predictions = 0

    with torch.no_grad():
        for inputs, labels in val_loader:
            if target_device is not None:
                inputs = inputs.to(target_device)
                labels = labels.to(target_device)

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            batch_size = labels.size(0)

            # Predicted class = index of the highest score
            predicted_labels = outputs.argmax(dim=1)
            correct_predictions += (predicted_labels == labels).sum().item()
            total_predictions += batch_size

            # Weight the batch mean by its size so a smaller batch counts proportionally
            total_loss += loss.item() * batch_size

    # Average over the samples actually seen, so loss and accuracy share one
    # denominator even when the loader skips samples (drop_last, samplers).
    avg_loss = total_loss / total_predictions
    accuracy = correct_predictions / total_predictions

    return avg_loss, accuracy

# Training loop with validation

# One checkpoint is written here per epoch; the folder is created up front so
# torch.save does not fail on a missing directory.
checkpoint_dir = 'checkpoints'
os.makedirs(checkpoint_dir, exist_ok=True)

for epoch in range(num_epochs):
    # validate() leaves the model in eval mode, so training mode is re-enabled every epoch
    model.train()
    total_loss = 0.0
    correct_predictions = 0
    total_predictions = 0

    with tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs}', unit='batch') as tqdm_loader:
        for inputs, labels in tqdm_loader:

            inputs = inputs.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # Read the scalar loss once and reuse it below
            batch_loss = loss.item()

            # Compute accuracy
            _, predicted_labels = torch.max(outputs, 1)
            correct_predictions += (predicted_labels == labels).sum().item()
            total_predictions += labels.size(0)

            # Update progress bar description with current loss and running accuracy
            tqdm_loader.set_postfix({'Loss': batch_loss, 'Accuracy': correct_predictions / total_predictions})

            # Weight the batch mean by its size so the epoch average is per-sample
            total_loss += batch_loss * inputs.size(0)

    # Compute epoch loss and accuracy over the samples actually processed
    epoch_loss = total_loss / total_predictions
    epoch_accuracy = correct_predictions / total_predictions

    # Evaluate on validation set
    val_loss, val_accuracy = validate(model, valid_loader, criterion)

    # Update learning rate scheduler (stepped once per epoch)
    scheduler.step()

    # Save everything needed to resume training after this epoch
    checkpoint = {
        'epoch': epoch + 1,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'scheduler_state_dict': scheduler.state_dict(),
        # Add other relevant information as needed
    }
    torch.save(checkpoint, f'{checkpoint_dir}/checkpoint_epoch_{epoch+1}.pt')


    # Print training progress and validation metrics
    print(f'Epoch [{epoch+1}/{num_epochs}], '
          f'Train Loss: {epoch_loss:.4f}, Train Accuracy: {epoch_accuracy:.4f}, '
          f'Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}')

# Print training completed
print('Training completed!')