<a href="https://colab.research.google.com/github/raki-rankawat/pytorch-mobilenet/blob/main/CIFAR10_Model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import time
import torch
import torch.nn as nn
import torch.nn.functional as F

from torch.utils.data import DataLoader
from torchvision import datasets, transforms

In [2]:
# -----------------------------
# Data Loaders
# -----------------------------

batch_size = 64

# Tail shared by both pipelines: tensor conversion followed by a per-channel
# Normalize with mean 0.5 and std 0.5.
shared_tail = [
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
]

# Training images are randomly flipped, cropped (with 4 px padding) and
# colour-jittered before the shared tail; evaluation images get the tail only.
train_transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomCrop(32, padding=4),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    *shared_tail,
])
test_transform = transforms.Compose(list(shared_tail))

# Both splits live under the same root and are downloaded on demand.
cifar_kwargs = dict(root='./data', download=True)
train_data = datasets.CIFAR10(train=True, transform=train_transform, **cifar_kwargs)
test_data = datasets.CIFAR10(train=False, transform=test_transform, **cifar_kwargs)

# Only the training split is shuffled; evaluation order stays fixed.
train_loader = DataLoader(dataset=train_data, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset=test_data, batch_size=batch_size, shuffle=False)

100%|██████████| 170M/170M [00:12<00:00, 13.3MB/s]


In [3]:
# -----------------------------
# MobileNetV2 Blocks
# -----------------------------

class InvertedResidual(nn.Module):
    """Inverted-residual block: optional 1x1 expansion, 3x3 depthwise conv, 1x1 projection.

    Args:
        in_channels: Number of channels of the incoming feature map.
        out_channels: Number of channels produced by the projection.
        stride: Stride of the depthwise 3x3 convolution.
        expand_ratio: Multiplier applied to ``in_channels`` to get the hidden
            width; when it equals 1 the expansion stage is omitted.

    Attributes:
        use_res_connect: True when ``stride == 1`` and the channel count is
            unchanged, in which case ``forward`` adds the input to the output.
        conv: ``nn.Sequential`` holding every stage of the block in order.
    """

    def __init__(self, in_channels, out_channels, stride, expand_ratio):
        super().__init__()
        hidden_dim = int(in_channels * expand_ratio)
        # The identity shortcut is only valid when the block preserves shape.
        self.use_res_connect = (stride == 1) and (in_channels == out_channels)

        stages = []

        # Stage 1 (optional): pointwise expansion to the hidden width.
        if expand_ratio != 1:
            stages += [
                nn.Conv2d(in_channels, hidden_dim, 1, bias=False),
                nn.BatchNorm2d(hidden_dim),
                nn.ReLU6(inplace=True),
            ]

        # Stage 2: depthwise 3x3 (groups == channels), carries the stride.
        # Stage 3: pointwise projection with no activation after its BatchNorm.
        stages += [
            nn.Conv2d(
                hidden_dim, hidden_dim, 3,
                stride=stride, padding=1,
                groups=hidden_dim, bias=False,
            ),
            nn.BatchNorm2d(hidden_dim),
            nn.ReLU6(inplace=True),
            nn.Conv2d(hidden_dim, out_channels, 1, bias=False),
            nn.BatchNorm2d(out_channels),
        ]

        self.conv = nn.Sequential(*stages)

    def forward(self, x):
        """Apply the block, adding the shortcut when ``use_res_connect`` is set."""
        out = self.conv(x)
        return x + out if self.use_res_connect else out

In [4]:
# -----------------------------
# MobileNetV2 Model (CIFAR)
# -----------------------------

class CIFARMobileNetV2(nn.Module):
    """MobileNetV2-style classifier with a stride-1 stem.

    The network is a 3x3 stem convolution, a stack of ``InvertedResidual``
    blocks described by a fixed stage table, a 1x1 convolution to the final
    feature width, global average pooling and a linear classifier.

    Args:
        num_classes: Size of the classifier output.
        width_mult: Multiplier applied (with ``int`` truncation) to every
            channel count in the network.
    """

    def __init__(self, num_classes=10, width_mult=1.0):
        super().__init__()

        def scaled(channels):
            # Channel counts are truncated, not rounded, after scaling.
            return int(channels * width_mult)

        stem_channels = scaled(32)

        self.conv1 = nn.Sequential(
            nn.Conv2d(3, stem_channels, 3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(stem_channels),
            nn.ReLU6(inplace=True),
        )

        # Each row: (expand ratio t, base output channels c, repeats n,
        # stride s of the first block in the stage).
        stage_table = (
            (1, 16, 1, 1),
            (6, 24, 2, 1),
            (6, 32, 3, 2),
            (6, 64, 4, 2),
            (6, 96, 3, 1),
            (6, 160, 3, 2),
            (6, 320, 1, 1),
        )

        stacked = []
        in_channels = stem_channels

        for t, c, n, s in stage_table:
            out_channels = scaled(c)
            # Only the first block of a stage uses the stage stride; the
            # remaining repeats run at stride 1 (every stage has n >= 1).
            for stride in [s] + [1] * (n - 1):
                stacked.append(
                    InvertedResidual(in_channels, out_channels, stride, t)
                )
                in_channels = out_channels

        self.blocks = nn.Sequential(*stacked)

        last_channel = scaled(1280)

        self.conv_last = nn.Sequential(
            nn.Conv2d(in_channels, last_channel, 1, bias=False),
            nn.BatchNorm2d(last_channel),
            nn.ReLU6(inplace=True),
        )

        self.avgpool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Linear(last_channel, num_classes)

    def forward(self, x):
        """Return one row of ``num_classes`` scores per input sample."""
        features = self.conv_last(self.blocks(self.conv1(x)))
        pooled = self.avgpool(features)
        # Collapse the pooled 1x1 spatial dimensions before the classifier.
        flat = pooled.view(pooled.size(0), -1)
        return self.fc(flat)

In [5]:
# -----------------------------
# Setup
# -----------------------------

# Use the GPU when one is visible, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Seed before the model is built so weight initialisation is repeatable.
torch.manual_seed(41)

model = CIFARMobileNetV2(width_mult=1.0)
model = model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(
    params=model.parameters(),
    lr=0.001,
    weight_decay=1e-4,
)

In [6]:
# -----------------------------
# Training Function
# -----------------------------

def train(model, loader, criterion, optimizer, device):
    """Run one optimisation pass over ``loader`` and report epoch metrics.

    Args:
        model: Module to optimise; it is switched to training mode.
        loader: Iterable yielding ``(inputs, targets)`` batches.
        criterion: Callable returning the batch loss from ``(outputs, targets)``.
        optimizer: Optimizer stepped once per batch.
        device: Device each batch is moved to before the forward pass.

    Returns:
        ``(avg_loss, accuracy)``: the sample-weighted mean of the batch losses
        and the fraction of samples whose arg-max prediction matches the target.
    """
    model.train()

    loss_sum = 0
    n_correct = 0
    n_seen = 0

    for inputs, targets in loader:
        inputs, targets = inputs.to(device), targets.to(device)

        logits = model(inputs)
        batch_loss = criterion(logits, targets)

        optimizer.zero_grad(set_to_none=True)
        batch_loss.backward()
        optimizer.step()

        n_batch = targets.size(0)
        # Weight each batch loss by its size so a smaller final batch does
        # not skew the epoch average (assumes a mean-reduced criterion).
        loss_sum += batch_loss.item() * n_batch
        n_correct += (logits.argmax(dim=1) == targets).sum().item()
        n_seen += n_batch

    return loss_sum / n_seen, n_correct / n_seen

In [7]:
# -----------------------------
# Testing Function
# -----------------------------

def test(model, loader, criterion):
    model.eval()

    correct = 0
    total = 0
    running_loss = 0

    with torch.no_grad():
        for X, y in loader:
            X = X.to(device)
            y = y.to(device)

            outputs = model(X)
            loss = criterion(outputs, y)

            batch_size = y.size(0)
            running_loss += loss.item() * batch_size
            preds = outputs.argmax(dim=1)
            correct += (preds == y).sum().item()
            total += batch_size

    avg_loss = running_loss / total
    accuracy = correct / total

    return avg_loss, accuracy

In [8]:
# -----------------------------
# Training Loop
# -----------------------------

# Number of full passes over the training set.
epochs = 20
start_time = time.time()

for epoch in range(1, epochs + 1):
    # One optimisation pass, then an evaluation pass with the updated weights.
    train_loss, train_acc = train(
        model=model,
        loader=train_loader,
        criterion=criterion,
        optimizer=optimizer,
        device=device,
    )
    test_loss, test_acc = test(model=model, loader=test_loader, criterion=criterion)

    # Accuracies are fractions in [0, 1]; they are shown as percentages.
    print(
        f"Epoch: {epoch}/{epochs} | "
        f"Train Loss: {train_loss:.4f} | Train Acc: {train_acc * 100:.2f}% | "
        f"Test Loss: {test_loss:.4f} | Test Acc: {test_acc * 100:.2f}%"
    )

# Wall-clock duration of the whole loop, converted from seconds to minutes.
print(f"Training Time: {(time.time() - start_time) / 60:.2f} minutes!")

Epoch: 1/20 | Train Loss: 1.6083 | Train Acc: 41.10% | Test Loss: 1.2611 | Test Acc: 55.28%
Epoch: 2/20 | Train Loss: 1.1597 | Train Acc: 58.36% | Test Loss: 0.9461 | Test Acc: 66.50%
Epoch: 3/20 | Train Loss: 0.9239 | Train Acc: 67.60% | Test Loss: 0.8576 | Test Acc: 70.46%
Epoch: 4/20 | Train Loss: 0.7893 | Train Acc: 72.32% | Test Loss: 0.6966 | Test Acc: 75.33%
Epoch: 5/20 | Train Loss: 0.7059 | Train Acc: 75.70% | Test Loss: 0.6901 | Test Acc: 76.13%
Epoch: 6/20 | Train Loss: 0.6427 | Train Acc: 77.56% | Test Loss: 0.5958 | Test Acc: 79.71%
Epoch: 7/20 | Train Loss: 0.6004 | Train Acc: 79.21% | Test Loss: 0.5712 | Test Acc: 80.23%
Epoch: 8/20 | Train Loss: 0.5602 | Train Acc: 80.58% | Test Loss: 0.5700 | Test Acc: 80.42%
Epoch: 9/20 | Train Loss: 0.5218 | Train Acc: 81.99% | Test Loss: 0.5454 | Test Acc: 81.66%
Epoch: 10/20 | Train Loss: 0.4975 | Train Acc: 82.86% | Test Loss: 0.5425 | Test Acc: 81.42%
Epoch: 11/20 | Train Loss: 0.4644 | Train Acc: 84.06% | Test Loss: 0.4664 | Tes

In [9]:
# NOTE(review): google.colab is presumably only importable inside a Colab
# runtime, so this cell is expected to fail elsewhere — confirm before reuse.
from google.colab import drive

# Mount Google Drive at /content/drive; the save cell further down writes
# the checkpoint to a path under this mount point.
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Persist only the weights (state_dict), not the whole module object, to the
# mounted Drive folder.
torch.save(
    obj=model.state_dict(),
    f="/content/drive/My Drive/Colab Notebooks/cifar10_model.pth",
)
print("✅ Model saved as cifar10_model.pth")