# Necessary package import

In [1]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from einops import rearrange
import torch.nn.functional as F
from torchsummary import summary
from torch.utils.data import DataLoader
from torchvision import datasets, models, transforms
from timm.models.efficientnet import efficientnetv2_s

  from .autonotebook import tqdm as notebook_tqdm


# Create an architercture like EfficientNetV2-s

1. Leverage open-source code for Involution
2. 1/10th the size of Timm EfficientNetV2-s

In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from einops import rearrange

class SqueezeExcite(nn.Module):
    """
    Squeeze-and-Excitation (SE) block enhances feature representation
    by adaptively recalibrating channel-wise feature responses.
    """
    def __init__(self, in_ch, se_ratio=0.25):
        super().__init__()
        reduced_ch = max(1, int(in_ch * se_ratio))
        self.fc1 = nn.utils.weight_norm(nn.Conv2d(in_ch, reduced_ch, 1))
        self.fc2 = nn.utils.weight_norm(nn.Conv2d(reduced_ch, in_ch, 1))
    
    def forward(self, x):
        se = F.adaptive_avg_pool2d(x, 1)
        se = F.silu(self.fc1(se))
        se = torch.sigmoid(self.fc2(se))
        return x * se

class Involution(nn.Module):
    """
    Implementation of `Involution: Inverting the Inherence of Convolution for Visual Recognition`.
    """
    def __init__(self, in_channels, out_channels, groups=1, kernel_size=3, stride=1, reduction_ratio=2):
        super().__init__()
        channels_reduced = max(1, in_channels // reduction_ratio)
        padding = kernel_size // 2

        self.reduce = nn.Sequential(
            nn.utils.weight_norm(nn.Conv2d(in_channels, channels_reduced, 1)),
            nn.BatchNorm2d(channels_reduced),
            nn.ReLU(inplace=True))

        self.span = nn.utils.weight_norm(nn.Conv2d(channels_reduced, kernel_size * kernel_size * groups, 1))
        self.unfold = nn.Unfold(kernel_size, padding=padding, stride=stride)
        
        self.resampling = None if in_channels == out_channels else nn.utils.weight_norm(nn.Conv2d(in_channels, out_channels, 1))

        self.kernel_size = kernel_size
        self.stride = stride
        self.padding = padding
        self.groups = groups

    def forward(self, input_tensor):
        # print(f"Involution Input: {input_tensor.shape}")
        _, _, height, width = input_tensor.size()
        if self.stride > 1:
            out_size = lambda x: (x + 2 * self.padding - self.kernel_size) // self.stride + 1
            height, width = out_size(height), out_size(width)
        uf_x = rearrange(self.unfold(input_tensor), 'b (g d k j) (h w) -> b g d (k j) h w',
                         g=self.groups, k=self.kernel_size, j=self.kernel_size, h=height, w=width)

        if self.stride > 1:
            input_tensor = F.adaptive_avg_pool2d(input_tensor, (height, width))
        kernel = rearrange(self.span(self.reduce(input_tensor)), 'b (k j g) h w -> b g (k j) h w',
                           k=self.kernel_size, j=self.kernel_size)

        out = rearrange(torch.einsum('bgdxhw, bgxhw -> bgdhw', uf_x, kernel), 'b g d h w -> b (g d) h w')
        
        if self.resampling:
            out = self.resampling(out)
        
        # print(f"Involution Output: {out.shape}")
        return out.contiguous()

class ConvInvolutionBlock(nn.Module):
    """
    This block combines both convolution and involution to leverage their strengths.
    """
    def __init__(self, in_channels, out_channels, kernel_size, stride, padding, expansion=1):
        super().__init__()
        hidden_dim = int(in_channels * expansion)
        self.expand = nn.utils.weight_norm(nn.Conv2d(in_channels, hidden_dim, 1)) if expansion > 1 else nn.Identity()
        self.expand_bn = nn.BatchNorm2d(hidden_dim) if expansion > 1 else nn.Identity()
        self.expand_act = nn.SiLU(inplace=True) if expansion > 1 else nn.Identity()
        
        self.conv = nn.utils.weight_norm(nn.Conv2d(hidden_dim, hidden_dim, kernel_size, stride, padding, groups=hidden_dim))
        self.conv_bn = nn.BatchNorm2d(hidden_dim)
        self.conv_act = nn.SiLU(inplace=True)
        
        self.involution = Involution(hidden_dim, out_channels, kernel_size=kernel_size, stride=1)
        self.inv_bn = nn.BatchNorm2d(out_channels)
        self.inv_act = nn.SiLU(inplace=True)
    
    def forward(self, x):
        # print(f"ConvInvolutionBlock Input: {x.shape}")
        x = self.expand(x)
        x = self.expand_bn(x)
        x = self.expand_act(x)
        # print(f"After Expansion: {x.shape}")
        x = self.conv(x)
        x = self.conv_bn(x)
        x = self.conv_act(x)
        # print(f"After Conv: {x.shape}")
        x = self.involution(x)
        x = self.inv_bn(x)
        x = self.inv_act(x)
        # print(f"After Involution: {x.shape}")
        return x

class MBConv(nn.Module):
    def __init__(self, in_ch, out_ch, kernel_size, stride, expansion, se_ratio, num_repeats=1, dropout_rate=0.2):
        super().__init__()
        layers = []
        for _ in range(num_repeats):
            layers.append(ConvInvolutionBlock(in_ch, out_ch, kernel_size, stride, kernel_size//2, expansion))
            in_ch = out_ch
        self.blocks = nn.Sequential(*layers)
    
    def forward(self, x):
        for block in self.blocks:
            # print(f"MBConv Input: {x.shape}")
            x = block(x)
            # print(f"MBConv Output: {x.shape}")
        return x

class EfficientNetV2S_WithInvolution(nn.Module):
    def __init__(self, num_classes=1486):
        super().__init__()
        self.stem = ConvInvolutionBlock(3, 24, 3, stride=2, padding=1, expansion=1)
        self.blocks = nn.Sequential(
            MBConv(24, 48, 3, 2, 4, 0.0, num_repeats=5),
            MBConv(48, 64, 3, 2, 4, 0.25, num_repeats=5),
            MBConv(64, 128, 3, 2, 4, 0.25, num_repeats=5),
            MBConv(128, 160, 3, 1, 6, 0.25, num_repeats=3),
            MBConv(160, 256, 3, 2, 6, 0.25, num_repeats=1)
        )
        self.head = nn.Sequential(
            nn.Flatten(),
            nn.Linear(256, 512),
            nn.Dropout(0.3),
            nn.Linear(512, 512),
            nn.Dropout(0.3),
            nn.Linear(512, num_classes)
        )
    
    def forward(self, x):
        x = self.stem(x)
        x = self.blocks(x)
        x = self.head(x)
        return x


## Check torch summary

In [None]:
import torch

# Clear cache
torch.cuda.empty_cache()

# Optional: Collect garbage (useful if memory is fragmented)
import gc
gc.collect()

# Assuming your model is on the GPU
# model = efficientnetv2_s(pretrained=False)
model = EfficientNetV2S_WithInvolution(num_classes=1486)  # Dynamically set number of classes
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model.to(device)  # Ensure model is on GPU

# Print model summary
summary(model, input_size=(3, 224, 224))

# Model training

In [3]:
import torch

# Clear cache
torch.cuda.empty_cache()

# Optional: Collect garbage (useful if memory is fragmented)
import gc
gc.collect()

16

In [None]:
import torch
import os
import numpy as np
import matplotlib.pyplot as plt
import torch.nn as nn
import torch.optim as optim
from torch.amp import GradScaler, autocast
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

training_run = 1

# Set device to GPU if available, otherwise use CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Training on: {device}")

# Hyperparameters
batch_size = 150  # Batch size
epochs = 100  # Set the number of epochs
grad_clip = 1.0  # Maximum gradient norm

# Dataset paths
train_dir = "/home/sashankhravi/Datasets/inatbirds100k/train_transformed"
val_dir = "/home/sashankhravi/Datasets/inatbirds100k/val_transformed"

# Dynamically fetch number of classes from the folder structure (number of subfolders)
num_classes = len(os.listdir(train_dir))

# Image resizing and transformation (only resizing to 256x256)
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),  # Convert image to tensor
])

# Custom dataset loading
train_dataset = datasets.ImageFolder(train_dir, transform=transform)
val_dataset = datasets.ImageFolder(val_dir, transform=transform)

# Create DataLoader with the sampler
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4, pin_memory=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=True, num_workers=4, pin_memory=True)

# Initialize the model (assuming EfficientNetV2SWithInvolution is defined elsewhere)
model = EfficientNetV2S_WithInvolution(num_classes=num_classes)  # Dynamically set num_classes

# Move model to GPU if available
model = model.to(device)

# Define the loss function
criterion = nn.CrossEntropyLoss()

# Define the optimizer (e.g., Adam optimizer)
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Initialize the gradient scaler for FP16 precision
scaler = GradScaler()

# Initialize lists to store training and validation losses
train_losses = []
val_losses = []

# Training loop
for epoch in range(epochs):
    model.train()  # Set model to training mode
    running_train_loss = 0.0
    correct_preds = 0
    total_preds = 0

    for step, (images, labels) in enumerate(train_loader):
        images, labels = images.to(device), labels.to(device)

        # Zero the gradients
        optimizer.zero_grad()

        # Forward pass using mixed precision (autocast)
        with autocast(device_type='cuda', dtype=torch.float16):  # Use autocast for FP16 precision
            outputs = model(images)
            loss = criterion(outputs, labels)

        # Backward pass and gradient scaling
        scaler.scale(loss).backward()

        # Gradient Clipping (before optimizer step)
        scaler.unscale_(optimizer)  # Unscale gradients first before clipping
        torch.nn.utils.clip_grad_norm_(model.parameters(), grad_clip)

        # Update the model weights using FP16 gradients and scaling
        scaler.step(optimizer)
        scaler.update()

        # Track loss and accuracy
        running_train_loss += loss.item()
        _, preds = torch.max(outputs, 1)
        correct_preds += torch.sum(preds == labels)
        total_preds += labels.size(0)
        
        # Print training loss and accuracy every 100 steps
        if step % 500 == 0:
            print(f"Epoch [{epoch + 1}/{epochs}], Step [{step}/{len(train_loader)}], Train Loss: {loss.item():.4f}")
        
        # Save model checkpoint every 1000 steps
        if step % 1000 == 0:
            try:
                os.makedirs(f"model_checkpoints_training_run_{training_run}")
            except:
                continue
            torch.save(model.state_dict(), f"model_checkpoints_training_run_{training_run}/saved_model_epoch_{epoch}.pth")

    avg_train_loss = running_train_loss / len(train_loader)
    train_accuracy = 100 * correct_preds / total_preds
    train_losses.append(avg_train_loss)

    print(f"Epoch [{epoch + 1}/{epochs}], Loss: {avg_train_loss:.4f}, Accuracy: {train_accuracy:.2f}%")

    # Validation loop
    model.eval()  # Set model to evaluation mode
    running_val_loss = 0.0
    correct_preds = 0
    total_preds = 0
    with torch.no_grad():
        for images, labels in val_loader:
            images, labels = images.to(device), labels.to(device)

            # Forward pass
            with autocast(device_type='cuda', dtype=torch.float16):  # Use autocast for FP16 precision
                outputs = model(images)

            # Calculate loss
            val_loss = criterion(outputs, labels)
            running_val_loss += val_loss.item()

            # Track accuracy
            _, preds = torch.max(outputs, 1)
            correct_preds += torch.sum(preds == labels)
            total_preds += labels.size(0)

        avg_val_loss = running_val_loss / len(val_loader)
        val_accuracy = 100 * correct_preds / total_preds
        val_losses.append(avg_val_loss)

        print(f"Validation Accuracy: {val_accuracy:.2f}%")

# Plot the cumulative training and validation loss curves
plt.figure(figsize=(10, 5))
plt.plot(range(1, epochs + 1), train_losses, label="Training Loss", color='blue', marker='o')
plt.plot(range(1, epochs + 1), val_losses, label="Validation Loss", color='red', marker='o')
plt.xlabel('Epochs')
plt.ylabel('Cumulative Loss')
plt.title('Cumulative Training and Validation Loss per Epoch')
plt.legend()
plt.grid(True)
plt.show()


Training on: cuda


  WeightNorm.apply(module, name, dim)


Epoch [1/100], Step [0/11096], Train Loss: 7.3095
