In [None]:
import torch
import torch.nn as nn
import pickle
import os
import numpy as np
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, random_split
from torchvision import datasets, transforms
from sklearn.metrics import accuracy_score

def unpickle(file):
    """Load and return the object stored in a pickle file.

    Args:
        file: Path of the pickle file; it is opened in binary read mode.

    Returns:
        The unpickled object. The file is decoded with ``encoding='bytes'``,
        so 8-bit strings written by Python 2 come back as ``bytes``.

    Warning:
        ``pickle.load`` can execute arbitrary code while loading, so only use
        this on files from a trusted source.
    """
    import pickle  # local import keeps the helper usable when copied on its own

    with open(file, 'rb') as fo:
        # Deliberately not named ``dict``: that would shadow the builtin type.
        payload = pickle.load(fo, encoding='bytes')
    return payload

In [None]:
import torch.nn as nn
import torch.nn.functional as F

class BasicCNN(nn.Module):
    """Small two-stage convolutional classifier.

    Each stage is a 3x3 convolution with padding 1, a ReLU and a 2x2 max pool,
    so height and width are halved twice. The first linear layer expects
    64 * 8 * 8 features, which corresponds to a 3 x 32 x 32 input image.
    """

    def __init__(self, num_classes=10):
        super().__init__()
        # Feature extractor: 3 -> 32 -> 64 channels.
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
        # One parameter-free pooling module is reused after both convolutions.
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        # Classifier head: flattened features -> 512 hidden units -> logits.
        self.fc1 = nn.Linear(in_features=64 * 8 * 8, out_features=512)
        self.fc2 = nn.Linear(in_features=512, out_features=num_classes)

    def forward(self, x):
        """Return raw class logits with one row per input sample."""
        for conv in (self.conv1, self.conv2):
            x = self.pool(F.relu(conv(x)))
        hidden = F.relu(self.fc1(torch.flatten(x, 1)))
        return self.fc2(hidden)

In [None]:
# Image pipelines for CIFAR-10.
# Random flips/crops are training-only augmentation. Validation and test images
# go through a deterministic pipeline so reported accuracies are repeatable and
# are not measured on randomly distorted inputs.
normalize = transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
train_transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomCrop(32, padding=4),
    transforms.ToTensor(),
    normalize,
])
eval_transform = transforms.Compose([
    transforms.ToTensor(),
    normalize,
])
transform = train_transform  # original name kept for anything that still refers to it

# Load CIFAR-10. The training images are exposed twice: once with augmentation
# (for fitting) and once without (for the validation subset).
full_train_data = datasets.CIFAR10(root='./data', train=True, download=True, transform=train_transform)
full_train_eval_view = datasets.CIFAR10(root='./data', train=True, download=False, transform=eval_transform)
test_data = datasets.CIFAR10(root='./data', train=False, download=True, transform=eval_transform)

# Split train into training + validation (80% / 20%).
# The generator is seeded so the same images land in each split on every run.
SPLIT_SEED = 42
train_size = int(0.8 * len(full_train_data))
val_size = len(full_train_data) - train_size
train_data, val_split = random_split(
    full_train_data, [train_size, val_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)
# Same validation indices, but read through the non-augmented view.
val_data = torch.utils.data.Subset(full_train_eval_view, val_split.indices)

# Create DataLoaders
train_loader = DataLoader(train_data, batch_size=256, shuffle=True, num_workers=2, pin_memory=True)
val_loader = DataLoader(val_data, batch_size=256, shuffle=False, num_workers=2, pin_memory=True)
test_loader = DataLoader(test_data, batch_size=256, shuffle=False, num_workers=2, pin_memory=True)

# Instantiate the model on the GPU when one is available, otherwise on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = BasicCNN(num_classes=10).to(device)

# Define the loss function and optimizer
loss_function = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training function
def train(model, train_loader, loss_function, optimizer, num_epochs=10):
    """Train ``model`` in place, printing mean loss and accuracy after each epoch.

    Args:
        model: ``nn.Module`` to optimise. It is switched to training mode.
        train_loader: Iterable yielding ``(images, labels)`` batches.
        loss_function: Callable mapping ``(outputs, labels)`` to a scalar loss tensor.
        optimizer: Optimizer that updates ``model``'s parameters.
        num_epochs: Number of passes over ``train_loader``.

    Returns:
        None. Progress is reported through ``print``.
    """
    # Take the device from the model itself instead of a notebook-level global,
    # so the function does not depend on the order in which cells were run.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    model.train()
    for epoch in range(num_epochs):
        running_loss = 0.0
        correct = 0
        total = 0
        num_batches = 0
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)

            # Standard optimisation step: reset grads, forward, backward, update.
            optimizer.zero_grad()
            outputs = model(images)
            loss = loss_function(outputs, labels)
            loss.backward()
            optimizer.step()

            # Accumulate statistics from the pre-update forward pass.
            running_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)
            num_batches += 1

        # Guard the averages so an empty loader reports zeros instead of raising.
        mean_loss = running_loss / num_batches if num_batches else 0.0
        accuracy = 100 * correct / total if total else 0.0
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {mean_loss:.4f}, Accuracy: {accuracy:.2f}%")

# Test function
def test(model, test_loader):
    model.eval()  # Set the model to evaluation mode
    correct = 0
    total = 0
    with torch.no_grad():  # No need to compute gradients during testing
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)

            # Forward pass
            outputs = model(images)

            # Calculate accuracy
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)

    print(f"Test Accuracy: {100 * correct/total:.2f}%")

# Pick the compute device: the GPU when CUDA is available, otherwise the CPU.
# Later cells move their models and batches onto this same device.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Fit the model for 10 epochs, then report its accuracy on the test loader.
train(model, train_loader, loss_function, optimizer, num_epochs=10)
test(model, test_loader)

In [None]:
# First, build a ResNet-like model

class ResNetBlock(nn.Module):
    """Bottleneck residual block: 1x1 conv -> 3x3 conv -> 1x1 conv plus a skip path.

    The last convolution widens the output to ``out_channels * 4`` channels.
    ``stride`` is applied by the 3x3 convolution. ``downsample``, when given,
    is applied to the block input before it is added to the main path, so it
    must produce a tensor of the same shape as the main-path output.
    """

    def __init__(self, in_channels, out_channels, downsample=None, stride=1):
        super().__init__()
        self.expansion_ratio = 4
        widened = out_channels * self.expansion_ratio

        # 1x1 convolution to the bottleneck width.
        self.conv_layer1 = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1, padding=0)
        self.batch_norm1 = nn.BatchNorm2d(out_channels)
        # 3x3 convolution; carries the stride of the block.
        self.conv_layer2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=stride, padding=1)
        self.batch_norm2 = nn.BatchNorm2d(out_channels)
        # 1x1 convolution widening to out_channels * expansion_ratio.
        self.conv_layer3 = nn.Conv2d(out_channels, widened, kernel_size=1, stride=1, padding=0)
        self.batch_norm3 = nn.BatchNorm2d(widened)

        self.activation = nn.ReLU()
        self.downsample = downsample

    def forward(self, x):
        """Apply the three conv/norm stages, add the skip path, then activate."""
        out = self.activation(self.batch_norm1(self.conv_layer1(x)))
        out = self.activation(self.batch_norm2(self.conv_layer2(out)))
        out = self.batch_norm3(self.conv_layer3(out))

        shortcut = x if self.downsample is None else self.downsample(x)
        out += shortcut
        return self.activation(out)


class ResNetLite(nn.Module):
    """ResNet-style image classifier assembled from a supplied block class.

    Layout: 7x7 stride-2 convolution, batch norm, ReLU and a 3x3 stride-2 max
    pool, followed by four stages of base widths 64/128/256/512, global
    average pooling and a linear classifier on ``512 * 4`` features.

    Args:
        block: Block class called as ``block(in_channels, out_channels,
            downsample, stride)``. The code assumes it outputs
            ``out_channels * 4`` channels.
        layers: Sequence with the number of blocks in each of the four stages.
        image_channels: Number of channels of the input images (3 for RGB).
        num_classes: Size of the output logit vector.
    """

    def __init__(self, block, layers, image_channels, num_classes=10):
        super().__init__()
        # Channel count fed to the next block; updated by _make_layer.
        self.in_channels = 64

        # Stem
        self.conv_layer1 = nn.Conv2d(image_channels, 64, kernel_size=7, stride=2, padding=3)
        self.batch_norm1 = nn.BatchNorm2d(64)
        self.activation = nn.ReLU()
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Residual stages, registered as layer1 .. layer4: (base width, stride).
        stage_specs = ((64, 1), (128, 2), (256, 2), (512, 2))
        for index, (width, stride) in enumerate(stage_specs):
            stage = self._make_layer(block, layers[index], out_channels=width, stride=stride)
            setattr(self, f"layer{index + 1}", stage)

        # Head
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * 4, num_classes)

    def forward(self, x):
        """Return class logits with one row per input sample."""
        x = self.maxpool(self.activation(self.batch_norm1(self.conv_layer1(x))))
        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            x = stage(x)
        pooled = self.avgpool(x)
        return self.fc(pooled.reshape(pooled.shape[0], -1))

    def _make_layer(self, block, num_blocks, out_channels, stride):
        """Build one stage: a first block that may change shape, then the rest.

        The first block receives ``stride`` and, if the stride or the channel
        count changes, a 1x1 conv + batch norm to reshape the skip connection.
        The remaining ``num_blocks - 1`` blocks use the block's defaults.
        Updates ``self.in_channels`` to the stage's output width.
        """
        widened = out_channels * 4

        projection = None
        if stride != 1 or self.in_channels != widened:
            projection = nn.Sequential(
                nn.Conv2d(self.in_channels, widened, kernel_size=1, stride=stride),
                nn.BatchNorm2d(widened),
            )

        stage_blocks = [block(self.in_channels, out_channels, projection, stride)]
        self.in_channels = widened
        stage_blocks.extend(block(self.in_channels, out_channels) for _ in range(num_blocks - 1))
        return nn.Sequential(*stage_blocks)



def ResNet50Lite(image_channels = 3, num_classes = 10):
    """Build a ResNetLite of ResNetBlock bottlenecks with stage depths [3, 4, 6, 3]."""
    stage_depths = [3, 4, 6, 3]
    return ResNetLite(
        block=ResNetBlock,
        layers=stage_depths,
        image_channels=image_channels,
        num_classes=num_classes,
    )

def ResNet18Lite(image_channels = 3, num_classes = 10):
    """Build a ResNetLite with stage depths [2, 2, 2, 2].

    It uses the same bottleneck ResNetBlock as ResNet50Lite; only the number
    of blocks per stage differs.
    """
    stage_depths = [2, 2, 2, 2]
    return ResNetLite(
        block=ResNetBlock,
        layers=stage_depths,
        image_channels=image_channels,
        num_classes=num_classes,
    )


def test50():
    """Smoke-test ResNet50Lite: run a random batch through it and print the output shape."""
    # Model and input share one device, chosen at run time. Hard-coding 'cuda'
    # raises on CPU-only machines, and moving only the output never runs the
    # network on the GPU.
    run_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    net = ResNet50Lite().to(run_device)
    x = torch.randn(2, 3, 224, 224, device=run_device)
    with torch.no_grad():  # shape check only; no gradients needed
        y = net(x)
    print(y.shape)

def test18():
    """Smoke-test ResNet18Lite: run a random batch through it and print the output shape."""
    # Model and input share one device, chosen at run time. Hard-coding 'cuda'
    # raises on CPU-only machines, and moving only the output never runs the
    # network on the GPU.
    run_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    net = ResNet18Lite().to(run_device)
    x = torch.randn(2, 3, 224, 224, device=run_device)
    with torch.no_grad():  # shape check only; no gradients needed
        y = net(x)
    print(y.shape)


# Run both smoke tests; each prints the shape of the network output.
test50()
test18()

In [None]:
# Train and evaluate a ResNet18Lite (the factory with stage depths [2, 2, 2, 2]).

model = ResNet18Lite()
model = model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# Halve the learning rate once the monitored metric has stopped improving for
# longer than `patience` epochs. mode='max' because the scheduler is stepped
# with validation accuracy, where higher is better.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='max',
    factor=0.5,
    patience=2,
)


# Accuracy function
def compute_accuracy(loader, net=None):
    """Return the top-1 accuracy, in percent, of a model over ``loader``.

    Args:
        loader: Iterable yielding ``(inputs, labels)`` batches.
        net: Model to evaluate. Defaults to the notebook-level ``model`` so
            existing ``compute_accuracy(loader)`` calls keep working.

    Returns:
        ``100 * correct / total`` as a float, or ``0.0`` if ``loader`` yields
        no samples.

    Note:
        The model is switched to eval mode and left there; call ``.train()``
        again before continuing to train.
    """
    if net is None:
        net = model  # notebook-level global

    # Evaluate on whatever device the model's parameters live on.
    first_param = next(net.parameters(), None)
    target = first_param.device if first_param is not None else torch.device("cpu")

    net.eval()
    correct = total = 0
    with torch.no_grad():
        for x, y in loader:
            x, y = x.to(target), y.to(target)
            _, predicted = torch.max(net(x), 1)
            correct += (predicted == y).sum().item()
            total += y.size(0)
    # Guard against an empty loader instead of raising ZeroDivisionError.
    return 100 * correct / total if total else 0.0


# Track the best validation accuracy seen so far and a snapshot of its weights.
best_val_acc = 0.0
best_model_wts = None

# NOTE(review): an "UNCOMMENT BEFORE SUBMITTING" marker sat here; the loop below is active.

# Training loop
num_epochs = 50
for epoch in range(num_epochs):
    model.train()  # compute_accuracy() leaves the model in eval mode
    running_loss = 0.0
    for x, y in train_loader:
        x, y = x.to(device), y.to(device)

        optimizer.zero_grad()
        outputs = model(x)
        loss = criterion(outputs, y)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()  # sum of batch losses, not a mean

    # Compute accuracies
    train_acc = compute_accuracy(train_loader)
    val_acc = compute_accuracy(val_loader)
    test_acc = compute_accuracy(test_loader)

    # Print results
    print(f"Epoch {epoch+1}/{num_epochs} | Loss: {running_loss:.3f} | "
          f"Train Acc: {train_acc:.2f}% | Val Acc: {val_acc:.2f}% | Test Acc: {test_acc:.2f}%")

    # Keep the weights whenever validation accuracy improves.
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        # state_dict() hands back references to the live tensors, which later
        # optimizer steps overwrite in place. Clone them so the snapshot really
        # holds this epoch's weights.
        best_model_wts = {
            name: tensor.detach().clone()
            for name, tensor in model.state_dict().items()
        }
        # File name kept unchanged even though this cell trains ResNet18Lite.
        torch.save(best_model_wts, 'resnet50lite_best_model.pth')  # Save to file

    # Step scheduler based on validation accuracy
    scheduler.step(val_acc)

# After training, restore the best snapshot (if any epoch produced one).
if best_model_wts is not None:
    model.load_state_dict(best_model_wts)
    print("Best model loaded from saved weights!")
else:
    print("No epoch improved on the initial validation accuracy; keeping the final weights.")

In [None]:
# ConvNeXt-style network with optional channel-wise LayerNorm:

import torch
import torch.nn as nn
import torch.nn.functional as F

# Channelwise LayerNorm wrapper
class ChannelwiseLayerNorm(nn.Module):
    """LayerNorm over the channel dimension of a channels-first 4-D tensor.

    ``nn.LayerNorm`` normalises the trailing dimension, so the input is viewed
    as [N, H, W, C], normalised, and viewed back as [N, C, H, W].
    """

    def __init__(self, num_channels, eps=1e-6):
        super().__init__()
        self.norm = nn.LayerNorm(num_channels, eps=eps)

    def forward(self, x):
        """Normalise ``x`` ([N, C, H, W]) across C; the output shape is unchanged."""
        channels_last = x.permute(0, 2, 3, 1)
        normalised = self.norm(channels_last)
        return normalised.permute(0, 3, 1, 2)

# ConvNeXt-like Block
class ConvNextBlock(nn.Module):
    """Residual block whose ConvNeXt-style variations are switched on by flags.

    The block always outputs ``out_channels * 4`` channels and adds a skip
    connection (passed through ``downsample`` when one is given).

    Args:
        in_channels: Channels of the block input.
        out_channels: Base width; the output has four times as many channels.
        downsample: Optional module applied to the input on the skip path.
        stride: Stride of the middle (spatial) convolution.
        use_inverted_bottleneck: If true, expand to ``in_channels * 4``, apply a
            depthwise convolution, then project. Otherwise use the
            1x1 -> 3x3 -> 1x1 bottleneck.
        use_gelu: Use GELU instead of ReLU.
        use_large_kernels: Use a 7x7 instead of a 3x3 depthwise kernel. It only
            affects the inverted-bottleneck path; the plain bottleneck always
            uses a 3x3 convolution.
        use_layer_norm: Use ChannelwiseLayerNorm instead of BatchNorm2d.
    """

    def __init__(self, in_channels, out_channels, downsample=None, stride=1,
                 use_inverted_bottleneck=False, use_gelu=False,
                 use_large_kernels=False, use_layer_norm=False):
        super().__init__()
        self.expansion_ratio = 4
        self.downsample = downsample
        self.use_inverted_bottleneck = use_inverted_bottleneck
        self.use_large_kernels = use_large_kernels
        self.activation = nn.GELU() if use_gelu else nn.ReLU()

        def make_norm(channels):
            # Resolved lazily so the LayerNorm wrapper is only needed when requested.
            if use_layer_norm:
                return ChannelwiseLayerNorm(channels)
            return nn.BatchNorm2d(channels)

        block_out = out_channels * self.expansion_ratio

        if self.use_inverted_bottleneck:
            depthwise_kernel = 7 if self.use_large_kernels else 3
            wide = in_channels * self.expansion_ratio

            # 1x1 expansion
            self.expand = nn.Conv2d(in_channels, wide, kernel_size=1)
            self.norm1 = make_norm(wide)
            # Depthwise spatial convolution (one group per channel)
            self.depthwise_conv_layer = nn.Conv2d(
                wide, wide, kernel_size=depthwise_kernel, stride=stride,
                padding=depthwise_kernel // 2, groups=wide)
            self.norm2 = make_norm(wide)
            # 1x1 projection to the block's output width
            self.project = nn.Conv2d(wide, block_out, kernel_size=1)
            self.norm3 = make_norm(block_out)
        else:
            # Plain bottleneck: 1x1 -> 3x3 -> 1x1
            self.conv_layer1 = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1, padding=0)
            self.norm1 = make_norm(out_channels)
            self.conv_layer2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=stride, padding=1)
            self.norm2 = make_norm(out_channels)
            self.conv_layer3 = nn.Conv2d(out_channels, block_out, kernel_size=1, stride=1, padding=0)
            self.norm3 = make_norm(block_out)

    def forward(self, x):
        """Run the three conv/norm stages, add the skip path, then activate."""
        if self.use_inverted_bottleneck:
            stages = ((self.expand, self.norm1),
                      (self.depthwise_conv_layer, self.norm2),
                      (self.project, self.norm3))
        else:
            stages = ((self.conv_layer1, self.norm1),
                      (self.conv_layer2, self.norm2),
                      (self.conv_layer3, self.norm3))

        out = x
        final_index = len(stages) - 1
        for index, (conv, norm) in enumerate(stages):
            out = norm(conv(out))
            # The last stage is activated only after the residual addition.
            if index != final_index:
                out = self.activation(out)

        shortcut = x if self.downsample is None else self.downsample(x)
        out += shortcut
        return self.activation(out)


class ConvNextLite(nn.Module):
    """Four-stage classifier whose ConvNeXt-style options are toggled by flags.

    Args:
        block: Block class. It is called with ``(in_channels, out_channels,
            downsample, stride)`` plus the four ``use_*`` keyword flags and is
            assumed to output ``out_channels * 4`` channels.
        layers: Sequence with the number of blocks in each of the four stages.
        image_channels: Number of channels of the input images.
        num_classes: Size of the output logit vector.
        use_patchify_stem: Use a 4x4 stride-4 convolution as the stem instead
            of the 7x7 stride-2 convolution followed by a max pool.
        use_gelu: Use GELU instead of ReLU.
        use_inverted_bottleneck: Forwarded to every block.
        use_large_kernels: Forwarded to every block.
        use_layer_norm: Use ChannelwiseLayerNorm instead of BatchNorm2d in the
            stem and on the skip projections; also forwarded to every block.
    """

    def __init__(self, block, layers, image_channels, num_classes=10,
                 use_patchify_stem=False, use_gelu=False,
                 use_inverted_bottleneck=False, use_large_kernels=False,
                 use_layer_norm=False):
        super().__init__()
        self.use_patchify_stem = use_patchify_stem
        self.use_gelu = use_gelu
        self.use_inverted_bottleneck = use_inverted_bottleneck
        self.use_large_kernels = use_large_kernels
        self.use_layer_norm = use_layer_norm

        # Only the selected normalisation class is looked up.
        norm_cls = ChannelwiseLayerNorm if use_layer_norm else nn.BatchNorm2d

        # Stem
        if self.use_patchify_stem:
            self.conv_layer1 = nn.Conv2d(image_channels, 64, kernel_size=4, stride=4, padding=0)
            self.norm1 = norm_cls(64)
        else:
            self.conv_layer1 = nn.Conv2d(image_channels, 64, kernel_size=7, stride=2, padding=3)
            self.norm1 = norm_cls(64)
            self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        self.activation = nn.GELU() if use_gelu else nn.ReLU()
        # Channel count fed to the next block; updated by _make_layer.
        self.in_channels = 64

        # Stages, registered as layer1 .. layer4: (base width, stride).
        stage_specs = ((64, 1), (128, 2), (256, 2), (512, 2))
        for index, (width, stride) in enumerate(stage_specs):
            stage = self._make_layer(block, layers[index], out_channels=width, stride=stride)
            setattr(self, f"layer{index + 1}", stage)

        # Head
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * 4, num_classes)

    def forward(self, x):
        """Return class logits with one row per input sample."""
        x = self.activation(self.norm1(self.conv_layer1(x)))
        if not self.use_patchify_stem:
            x = self.maxpool(x)

        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            x = stage(x)

        pooled = self.avgpool(x)
        return self.fc(pooled.reshape(pooled.shape[0], -1))

    def _make_layer(self, block, num_blocks, out_channels, stride):
        """Build one stage and update ``self.in_channels`` to its output width.

        The first block receives ``stride`` and, if the stride or the channel
        count changes, a 1x1 conv + norm to reshape the skip connection. The
        remaining ``num_blocks - 1`` blocks keep the shape.
        """
        norm_cls = ChannelwiseLayerNorm if self.use_layer_norm else nn.BatchNorm2d
        widened = out_channels * 4
        block_flags = dict(
            use_inverted_bottleneck=self.use_inverted_bottleneck,
            use_gelu=self.use_gelu,
            use_large_kernels=self.use_large_kernels,
            use_layer_norm=self.use_layer_norm,
        )

        projection = None
        if stride != 1 or self.in_channels != widened:
            projection = nn.Sequential(
                nn.Conv2d(self.in_channels, widened, kernel_size=1, stride=stride),
                norm_cls(widened),
            )

        stage_blocks = [block(self.in_channels, out_channels, projection, stride, **block_flags)]
        self.in_channels = widened
        for _ in range(num_blocks - 1):
            stage_blocks.append(block(self.in_channels, out_channels, **block_flags))

        return nn.Sequential(*stage_blocks)


In [None]:
# Train a ConvNextLite with every ConvNeXt-style option switched on.

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# Stage depths [1, 1, 3, 1]. A deeper [3, 3, 9, 3] layout with the same flags
# is the alternative configuration that was kept commented out here.
model = ConvNextLite(ConvNextBlock, [1,1,3,1], 3, use_patchify_stem=True, use_gelu=True, use_inverted_bottleneck=True, use_large_kernels=True, use_layer_norm=True)
# Logged after construction so the message reflects what has actually happened.
print("Model setup complete")
model = model.to(device)
print("Model transferred to device")

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# Halve the learning rate once the monitored metric has stopped improving for
# longer than `patience` epochs. mode='max' because the scheduler is stepped
# with validation accuracy, where higher is better.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', patience=2, factor=0.5)


# Accuracy function
def compute_accuracy(loader, net=None):
    """Return the top-1 accuracy, in percent, of a model over ``loader``.

    Args:
        loader: Iterable yielding ``(inputs, labels)`` batches.
        net: Model to evaluate. Defaults to the notebook-level ``model`` so
            existing ``compute_accuracy(loader)`` calls keep working.

    Returns:
        ``100 * correct / total`` as a float, or ``0.0`` if ``loader`` yields
        no samples.

    Note:
        The model is switched to eval mode and left there; call ``.train()``
        again before continuing to train.
    """
    if net is None:
        net = model  # notebook-level global

    # Evaluate on whatever device the model's parameters live on.
    first_param = next(net.parameters(), None)
    target = first_param.device if first_param is not None else torch.device("cpu")

    net.eval()
    correct = total = 0
    with torch.no_grad():
        for x, y in loader:
            x, y = x.to(target), y.to(target)
            _, predicted = torch.max(net(x), 1)
            correct += (predicted == y).sum().item()
            total += y.size(0)
    # Guard against an empty loader instead of raising ZeroDivisionError.
    return 100 * correct / total if total else 0.0


# Track the best validation accuracy seen so far and a snapshot of its weights.
best_val_acc = 0.0
best_model_wts = None

# Training loop
num_epochs = 50
for epoch in range(num_epochs):
    model.train()  # compute_accuracy() leaves the model in eval mode
    running_loss = 0.0
    for x, y in train_loader:
        x, y = x.to(device), y.to(device)

        optimizer.zero_grad()
        outputs = model(x)
        loss = criterion(outputs, y)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()  # sum of batch losses, not a mean

    # Compute accuracies
    train_acc = compute_accuracy(train_loader)
    val_acc = compute_accuracy(val_loader)
    test_acc = compute_accuracy(test_loader)

    # Print results
    print(f"Epoch {epoch+1}/{num_epochs} | Loss: {running_loss:.3f} | "
          f"Train Acc: {train_acc:.2f}% | Val Acc: {val_acc:.2f}% | Test Acc: {test_acc:.2f}%")

    # Keep the weights whenever validation accuracy improves.
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        # state_dict() hands back references to the live tensors, which later
        # optimizer steps overwrite in place. Clone them so the snapshot really
        # holds this epoch's weights.
        best_model_wts = {
            name: tensor.detach().clone()
            for name, tensor in model.state_dict().items()
        }
        torch.save(best_model_wts, 'all_ConvNextLite_model.pth')  # Save to file

    # Step scheduler based on validation accuracy
    scheduler.step(val_acc)

# After training, restore the best snapshot (if any epoch produced one).
if best_model_wts is not None:
    model.load_state_dict(best_model_wts)
    print("Best model loaded from saved weights!")
else:
    print("No epoch improved on the initial validation accuracy; keeping the final weights.")