<a href="https://colab.research.google.com/github/profitmonk/TA-Lib.github.io/blob/main/ConvNext.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
import time
from torchvision.models import convnext_tiny, ConvNeXt_Tiny_Weights

# ConvNeXt-Tiny architecture; its ConvNeXtBlock and LayerNorm building blocks are defined below this class
class ConvNeXtTiny(nn.Module):
    """ConvNeXt-Tiny image classifier.

    Layout: a 4x4 / stride-4 convolutional stem, four stages of
    ``ConvNeXtBlock`` with depths (3, 3, 9, 3) and widths
    (96, 192, 384, 768), a LayerNorm + 2x2 / stride-2 convolution between
    consecutive stages, and a head that pools, normalizes and classifies.

    Args:
        num_classes: Number of output logits (default 100, for CIFAR-100).
    """

    def __init__(self, num_classes=100):
        super().__init__()

        # Attribute names and registration order are kept stable so that
        # state_dict keys (stem.*, stage1.*, downsample1.*, ...) and the
        # order in which parameters are initialized do not change.
        self.stem = nn.Sequential(
            nn.Conv2d(3, 96, kernel_size=4, stride=4),
            LayerNorm(96, eps=1e-6, data_format="channels_first"),
        )

        self.stage1 = self._make_stage(96, depth=3)
        self.downsample1 = self._make_downsample(96, 192)

        self.stage2 = self._make_stage(192, depth=3)
        self.downsample2 = self._make_downsample(192, 384)

        self.stage3 = self._make_stage(384, depth=9)
        self.downsample3 = self._make_downsample(384, 768)

        self.stage4 = self._make_stage(768, depth=3)

        # Final norm and classifier
        self.norm = LayerNorm(768, eps=1e-6, data_format="channels_first")
        self.fc = nn.Linear(768, num_classes)

    @staticmethod
    def _make_stage(dim, depth):
        """Return ``depth`` ConvNeXtBlocks of width ``dim`` chained together."""
        return nn.Sequential(*[ConvNeXtBlock(dim=dim) for _ in range(depth)])

    @staticmethod
    def _make_downsample(in_dim, out_dim):
        """Return the LayerNorm + strided conv that halves H and W."""
        return nn.Sequential(
            LayerNorm(in_dim, eps=1e-6, data_format="channels_first"),
            nn.Conv2d(in_dim, out_dim, kernel_size=2, stride=2),
        )

    def forward(self, x):
        """Map a batch of images (N, 3, H, W) to logits (N, num_classes)."""
        x = self.stem(x)
        x = self.stage1(x)
        x = self.downsample1(x)
        x = self.stage2(x)
        x = self.downsample2(x)
        x = self.stage3(x)
        x = self.downsample3(x)
        x = self.stage4(x)
        # Pool first, then normalize. Reference ConvNeXt (and the torchvision
        # weights this file loads) applies the final LayerNorm to the pooled
        # feature vector; normalizing every spatial position before pooling
        # is a different function. keepdim=True keeps a (N, C, 1, 1) tensor
        # so the channels_first LayerNorm still broadcasts correctly.
        x = x.mean([-2, -1], keepdim=True)  # Global average pooling
        x = self.norm(x).flatten(1)
        return self.fc(x)

class ConvNeXtBlock(nn.Module):
    """Residual ConvNeXt block for (N, C, H, W) feature maps.

    Pipeline: 7x7 depthwise convolution, LayerNorm, a pointwise expansion
    to ``4 * dim`` channels, GELU, a pointwise projection back to ``dim``,
    a learnable per-channel scale ``gamma``, and a skip connection. The
    pointwise layers are ``nn.Linear`` modules, so the tensor is permuted to
    channels-last for them and back afterwards.

    Args:
        dim: Number of input and output channels.
    """

    def __init__(self, dim):
        super().__init__()
        hidden_dim = 4 * dim
        self.dwconv = nn.Conv2d(dim, dim, kernel_size=7, padding=3, groups=dim)
        self.norm = LayerNorm(dim, eps=1e-6)
        self.pwconv1 = nn.Linear(dim, hidden_dim)
        self.act = nn.GELU()
        self.pwconv2 = nn.Linear(hidden_dim, dim)
        self.gamma = nn.Parameter(torch.ones(dim), requires_grad=True)

    def forward(self, x):
        """Apply the block and return a tensor shaped like ``x``."""
        shortcut = x
        # (N, C, H, W) -> (N, H, W, C) so LayerNorm/Linear act on channels.
        features = self.dwconv(x).permute(0, 2, 3, 1)
        features = self.norm(features)
        features = self.pwconv2(self.act(self.pwconv1(features)))
        features = features * self.gamma.view(1, 1, 1, -1)
        # (N, H, W, C) -> (N, C, H, W) before adding the skip connection.
        return features.permute(0, 3, 1, 2) + shortcut

class LayerNorm(nn.Module):
    """Layer normalization over the channel axis for two memory layouts.

    With ``data_format="channels_last"`` the trailing dimension is
    normalized through ``nn.functional.layer_norm``. With
    ``data_format="channels_first"`` dimension 1 is normalized by hand and
    the affine parameters are broadcast over two trailing dimensions.

    Args:
        normalized_shape: Number of channels being normalized.
        eps: Constant added to the variance for numerical stability.
        data_format: ``"channels_last"`` or ``"channels_first"``.

    Raises:
        NotImplementedError: If ``data_format`` is any other value.
    """

    _SUPPORTED_FORMATS = ("channels_last", "channels_first")

    def __init__(self, normalized_shape, eps=1e-6, data_format="channels_last"):
        super().__init__()
        if data_format not in self._SUPPORTED_FORMATS:
            raise NotImplementedError
        self.weight = nn.Parameter(torch.ones(normalized_shape))
        self.bias = nn.Parameter(torch.zeros(normalized_shape))
        self.eps = eps
        self.data_format = data_format
        self.normalized_shape = (normalized_shape, )

    def forward(self, x):
        """Return ``x`` normalized over channels, then scaled and shifted."""
        if self.data_format == "channels_first":
            mean = x.mean(1, keepdim=True)
            centered = x - mean
            variance = centered.pow(2).mean(1, keepdim=True)
            normalized = centered / torch.sqrt(variance + self.eps)
            return self.weight[:, None, None] * normalized + self.bias[:, None, None]
        if self.data_format == "channels_last":
            return nn.functional.layer_norm(
                x, self.normalized_shape, self.weight, self.bias, self.eps
            )
def load_pretrained_weights(model, pretrained_model):
    """Copy backbone weights from ``pretrained_model`` into ``model``.

    Entries whose names already match ``model`` are copied directly.
    Names following torchvision's ConvNeXt layout (``features.<i>...``,
    ``classifier.<i>...``, ``layer_scale``) are translated to this file's
    naming (``stem``, ``stageN``, ``downsampleN``, ``norm``, ``gamma``)
    first; without that translation no torchvision key matches and nothing
    is transferred. The classifier (any key containing ``fc``) is never
    copied and is re-initialized instead, since its output size differs.

    Args:
        model: Module to receive the weights; must expose an ``fc`` linear
            layer.
        pretrained_model: Module whose ``state_dict()`` supplies the weights.

    Entries that have no counterpart, or whose shape differs by more than
    singleton dimensions, are skipped. A ``UserWarning`` is emitted when
    some non-classifier entries of ``model`` received no pretrained value.
    """
    import re
    import warnings

    # Positions of the parameterized layers inside torchvision's
    # CNBlock.block Sequential: depthwise conv, LayerNorm, two Linear layers.
    block_layer_names = {"0": "dwconv", "2": "norm", "3": "pwconv1", "5": "pwconv2"}
    head_layer_names = {"0": "norm", "2": "fc"}

    def translate(key):
        """Map a torchvision ConvNeXt key to this file's naming, or None."""
        head = re.fullmatch(r"classifier\.(\d+)\.(.+)", key)
        if head:
            target = head_layer_names.get(head.group(1))
            return f"{target}.{head.group(2)}" if target else None

        feat = re.fullmatch(r"features\.(\d+)\.(\d+)\.(.+)", key)
        if not feat:
            return None
        position, index, rest = int(feat.group(1)), feat.group(2), feat.group(3)
        if position == 0:
            return f"stem.{index}.{rest}"
        if position % 2 == 0:
            # features[2], [4], [6] are the downsampling layers.
            return f"downsample{position // 2}.{index}.{rest}"
        # features[1], [3], [5], [7] are stages 1..4.
        prefix = f"stage{(position + 1) // 2}.{index}"
        if rest == "layer_scale":
            return f"{prefix}.gamma"
        inner = re.fullmatch(r"block\.(\d+)\.(.+)", rest)
        if inner and inner.group(1) in block_layer_names:
            return f"{prefix}.{block_layer_names[inner.group(1)]}.{inner.group(2)}"
        return None

    model_dict = model.state_dict()

    transferred = {}
    for key, tensor in pretrained_model.state_dict().items():
        target = key if key in model_dict else translate(key)
        if target is None or target not in model_dict or 'fc' in target:
            continue
        wanted_shape = model_dict[target].shape
        if tensor.shape != wanted_shape:
            # torchvision stores layer_scale as (dim, 1, 1) while gamma is
            # (dim,); accept only differences in singleton dimensions.
            if tensor.squeeze().shape != model_dict[target].squeeze().shape:
                continue
            tensor = tensor.reshape(wanted_shape)
        transferred[target] = tensor

    # Overwrite entries in the existing state dict and load it back
    model_dict.update(transferred)
    model.load_state_dict(model_dict)

    uncovered = [k for k in model_dict if 'fc' not in k and k not in transferred]
    if uncovered:
        warnings.warn(
            f"{len(uncovered)} of {len(model_dict)} state_dict entries were not "
            f"initialized from the pretrained model (e.g. {uncovered[0]!r})"
        )

    # Randomly initialize the final fully connected layer
    nn.init.normal_(model.fc.weight, std=0.01)
    nn.init.zeros_(model.fc.bias)
# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Download and prepare CIFAR-100 dataset.
# The backbone is initialized from ImageNet-trained weights, so the inputs
# are normalized with the ImageNet channel statistics those weights expect.
transform = transforms.Compose([
    transforms.Resize(224),  # ConvNeXt typically expects 224x224 input
    transforms.ToTensor(),
    transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),
])

trainset = torchvision.datasets.CIFAR100(root='./data', train=True, download=True, transform=transform)
trainloader = DataLoader(trainset, batch_size=64, shuffle=True, num_workers=2)

testset = torchvision.datasets.CIFAR100(root='./data', train=False, download=True, transform=transform)
testloader = DataLoader(testset, batch_size=64, shuffle=False, num_workers=2)

# Initialize the model
model = ConvNeXtTiny(num_classes=100).to(device)

# Download pre-trained weights and assign them
pretrained_model = convnext_tiny(weights=ConvNeXt_Tiny_Weights.IMAGENET1K_V1)
load_pretrained_weights(model, pretrained_model)

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# No exit() here: stopping at this point would leave train_epoch and
# evaluate undefined for everything that follows.

# Training loop
def train_epoch(model, loader, criterion, optimizer, device):
    """Train ``model`` for one pass over ``loader`` and return its accuracy.

    Args:
        model: Module to optimize; it is switched to training mode.
        loader: Iterable of batches where ``batch[0]`` holds the inputs and
            ``batch[1]`` the integer class labels.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepping ``model``'s parameters.
        device: Device the inputs and labels are moved to.

    Returns:
        Accuracy in percent over all samples seen, computed from the
        predictions made before each optimizer step, or ``0.0`` when the
        loader yields no samples.
    """
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    for step, batch in enumerate(loader, start=1):
        inputs, labels = batch[0].to(device), batch[1].to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()
        # Report the mean loss of the last 100 steps and the running accuracy.
        if step % 100 == 0:
            print(f'[{step}] loss: {running_loss/100:.3f} | acc: {100.*correct/total:.2f}%')
            running_loss = 0.0
    if total == 0:
        # An empty loader would otherwise raise ZeroDivisionError.
        return 0.0
    return 100. * correct / total

# Evaluation function
def evaluate(model, loader, device):
    """Return the classification accuracy of ``model`` on ``loader``.

    Args:
        model: Module to evaluate; it is switched to evaluation mode.
        loader: Iterable of batches where ``batch[0]`` holds the inputs and
            ``batch[1]`` the integer class labels.
        device: Device the inputs and labels are moved to.

    Returns:
        Accuracy in percent, or ``0.0`` when the loader yields no samples.
    """
    model.eval()
    correct = 0
    total = 0
    # Gradients are not needed for scoring.
    with torch.no_grad():
        for batch in loader:
            images, labels = batch[0].to(device), batch[1].to(device)
            outputs = model(images)
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()
    if total == 0:
        # An empty loader would otherwise raise ZeroDivisionError.
        return 0.0
    return 100. * correct / total

# Run and time a single training epoch
print("Starting training...")
start_time = time.time()
train_accuracy = train_epoch(model, trainloader, criterion, optimizer, device)
end_time = time.time()
elapsed_seconds = end_time - start_time

print(f"Training completed in {elapsed_seconds:.2f} seconds")
print(f"Final training accuracy: {train_accuracy:.2f}%")

# Score the freshly trained model on the held-out test split
test_accuracy = evaluate(model, testloader, device)
print(f"Test accuracy: {test_accuracy:.2f}%")

import os

# Make sure the checkpoint directory exists (no error if it already does)
os.makedirs('checkpoints', exist_ok=True)

# Function to save checkpoints
def save_checkpoint(model, optimizer, epoch, accuracy, filename):
    """Write a training checkpoint to ``filename`` and report it.

    The saved dictionary holds ``epoch``, ``model_state_dict``,
    ``optimizer_state_dict`` and ``accuracy``.

    Args:
        model: Module whose ``state_dict()`` is stored.
        optimizer: Optimizer whose ``state_dict()`` is stored.
        epoch: Epoch number recorded in the checkpoint.
        accuracy: Accuracy value recorded in the checkpoint.
        filename: Destination passed to ``torch.save``.
    """
    torch.save(
        dict(
            epoch=epoch,
            model_state_dict=model.state_dict(),
            optimizer_state_dict=optimizer.state_dict(),
            accuracy=accuracy,
        ),
        filename,
    )
    print(f"Checkpoint saved: {filename}")

# ... (code for data loading and model initialization remains the same)

# Train for several epochs; evaluate and checkpoint after each one
num_epochs = 5
print("Starting training...")
for epoch in range(num_epochs):
    epoch_number = epoch + 1  # 1-based, used for display and file names

    start_time = time.time()
    train_accuracy = train_epoch(model, trainloader, criterion, optimizer, device)
    end_time = time.time()
    elapsed_seconds = end_time - start_time

    print(f"Epoch {epoch_number}/{num_epochs}")
    print(f"Training completed in {elapsed_seconds:.2f} seconds")
    print(f"Training accuracy: {train_accuracy:.2f}%")

    # Score on the test split
    test_accuracy = evaluate(model, testloader, device)
    print(f"Test accuracy: {test_accuracy:.2f}%")

    # Persist model and optimizer state for this epoch
    checkpoint_filename = f'checkpoints/convnext_epoch_{epoch_number}.pth'
    save_checkpoint(model, optimizer, epoch_number, test_accuracy, checkpoint_filename)

    print("-----------------------------")

print("Training finished.")

Using device: cuda
Downloading https://www.cs.toronto.edu/~kriz/cifar-100-python.tar.gz to ./data/cifar-100-python.tar.gz


100%|██████████| 169001437/169001437 [00:18<00:00, 9268793.02it/s] 


Extracting ./data/cifar-100-python.tar.gz to ./data
Files already downloaded and verified


Downloading: "https://download.pytorch.org/models/convnext_tiny-983f1562.pth" to /root/.cache/torch/hub/checkpoints/convnext_tiny-983f1562.pth
100%|██████████| 109M/109M [00:00<00:00, 192MB/s] 


Starting training...
[100] loss: 4.473 | acc: 3.33%
[200] loss: 4.134 | acc: 4.83%
[300] loss: 3.989 | acc: 6.08%
[400] loss: 3.827 | acc: 7.33%
[500] loss: 3.706 | acc: 8.30%
[600] loss: 3.570 | acc: 9.32%
[700] loss: 3.439 | acc: 10.49%
Training completed in 1407.19 seconds
Final training accuracy: 11.29%
Test accuracy: 18.77%
Starting training...
[100] loss: 3.238 | acc: 19.89%
[200] loss: 3.171 | acc: 21.02%
[300] loss: 3.120 | acc: 21.48%
[400] loss: 3.024 | acc: 22.00%
[500] loss: 2.966 | acc: 22.79%
[600] loss: 2.897 | acc: 23.58%
[700] loss: 2.846 | acc: 24.20%
Epoch 1/5
Training completed in 1405.06 seconds
Training accuracy: 24.78%
Test accuracy: 30.34%
Checkpoint saved: checkpoints/convnext_epoch_1.pth
-----------------------------
[100] loss: 2.568 | acc: 33.67%
[200] loss: 2.551 | acc: 33.84%
[300] loss: 2.551 | acc: 34.08%
[400] loss: 2.496 | acc: 34.25%
[500] loss: 2.458 | acc: 34.42%
[600] loss: 2.430 | acc: 34.72%
[700] loss: 2.443 | acc: 34.89%
Epoch 2/5
Training comp

In [None]:
import time

# Continue training for ten more epochs, scoring on the test split after each
for i in range(10):
    start_time = time.time()
    train_accuracy = train_epoch(model, trainloader, criterion, optimizer, device)
    end_time = time.time()
    elapsed_seconds = end_time - start_time

    print(f"Training completed in {elapsed_seconds:.2f} seconds")
    print(f"Final training accuracy: {train_accuracy:.2f}%")

    test_accuracy = evaluate(model, testloader, device)
    print(f"Test accuracy: {test_accuracy:.2f}%")

NameError: name 'train_epoch' is not defined