In [1]:
from functools import partial
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms.v2 as T

# Run on the GPU when PyTorch reports one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

In [2]:
device

'cuda'

In [3]:
def train(model, optimizer, criterion, train_loader, n_epochs, device):
    """Run a basic supervised training loop, printing the mean batch loss per epoch.

    Args:
        model: module to optimise; it is put into training mode once, up front.
        optimizer: optimizer whose ``step()`` / ``zero_grad()`` are called per batch.
        criterion: callable ``criterion(predictions, targets)`` returning a scalar
            loss tensor.
        train_loader: iterable of ``(inputs, targets)`` batches that supports ``len()``.
        n_epochs: number of passes over ``train_loader``.
        device: device each batch is moved to before the forward pass.

    Returns:
        None. Progress is reported via ``print`` only.
    """
    model.train()
    for epoch in range(n_epochs):
        running_loss = 0.
        for inputs, targets in train_loader:
            inputs = inputs.to(device)
            targets = targets.to(device)
            batch_loss = criterion(model(inputs), targets)
            running_loss += batch_loss.item()
            batch_loss.backward()
            optimizer.step()
            # Gradients are cleared after the update, ready for the next batch.
            optimizer.zero_grad()
        # Unweighted mean over batches (every batch counts equally).
        mean_loss = running_loss / len(train_loader)
        print(f"Epoch {epoch + 1}/{n_epochs}, Loss: {mean_loss:.4f}")


def evaluate(model, data_loader, metric_fn, aggregate_fn=torch.mean, device=None):
    """Compute a per-batch metric over ``data_loader`` and aggregate the results.

    Args:
        model: module to evaluate; it is switched to eval mode and left there.
        data_loader: iterable of ``(inputs, targets)`` batches.
        metric_fn: callable ``metric_fn(predictions, targets)`` returning a tensor
            for one batch.
        aggregate_fn: reduces the stacked per-batch metric tensors to the final
            result (default: ``torch.mean``, i.e. an unweighted mean over batches).
        device: device the batches are moved to. When ``None`` (default) the
            device of the model's first parameter is used, falling back to the
            CPU for parameter-free models, so the function no longer depends on
            a notebook-global ``device`` variable.

    Returns:
        Whatever ``aggregate_fn`` returns for the stacked per-batch metrics.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    model.eval()
    batch_metrics = []
    with torch.no_grad():
        for X_batch, y_batch in data_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            y_pred = model(X_batch)
            metric = metric_fn(y_pred, y_batch)
            # Collect on the CPU so the stacked result does not hold device memory.
            batch_metrics.append(metric.cpu())
    return aggregate_fn(torch.stack(batch_metrics))

In [4]:
# Image preprocessing: convert to a tensor image, then to float32 with value scaling.
toTensor = T.Compose([T.ToImage(), T.ToDtype(torch.float32, scale=True)])

# Both official splits share root, transform and download settings.
load_fashion_mnist = partial(
    torchvision.datasets.FashionMNIST,
    root="datasets",
    transform=toTensor,
    download=True,
)
train_and_valid_data = load_fashion_mnist(train=True)
test_data = load_fashion_mnist(train=False)

# Seed the global RNG right before splitting so the 55k / 5k partition is reproducible.
torch.manual_seed(42)
train_data, valid_data = torch.utils.data.random_split(
    train_and_valid_data, lengths=[55_000, 5_000]
)

# Only the training loader shuffles; validation and test keep dataset order.
make_loader = partial(torch.utils.data.DataLoader, batch_size=32)
train_loader = make_loader(train_data, shuffle=True)
valid_loader = make_loader(valid_data)
test_loader = make_loader(test_data)




100%|██████████| 26.4M/26.4M [00:02<00:00, 10.6MB/s]
100%|██████████| 29.5k/29.5k [00:00<00:00, 205kB/s]
100%|██████████| 4.42M/4.42M [00:01<00:00, 3.82MB/s]
100%|██████████| 5.15k/5.15k [00:00<00:00, 27.6MB/s]


In [5]:
class AlexNet(nn.Module):
    """AlexNet-style CNN: five convolutions followed by a three-layer dense head.

    Args:
        in_channels: number of channels of the input images.
        out_channels: number of filters of the first convolution.
        num_classes: size of the output layer (default 1000).

    The forward pass returns raw class scores; no softmax layer is applied.
    """

    def __init__(self, in_channels, out_channels, num_classes=1000):
        super().__init__()

        # Convolutional part: every stage ends in a 2x2, stride-2 max-pool except
        # the two middle 3x3 convolutions, which are stacked directly.
        feature_layers = [
            nn.Conv2d(in_channels, out_channels, kernel_size=3),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(out_channels, 192, kernel_size=5, padding="same"),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(192, 384, kernel_size=3, padding="same"),
            nn.ReLU(),
            nn.Conv2d(384, 384, kernel_size=3, padding="same"),
            nn.ReLU(),
            nn.Conv2d(384, 256, kernel_size=3, padding="same"),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
        ]

        # Dense head: adaptive pooling fixes the feature map at 6x6 so the first
        # linear layer always sees 256 * 6 * 6 inputs.
        head_layers = [
            nn.AdaptiveAvgPool2d((6, 6)),
            nn.Flatten(),
            nn.Linear(256 * 6 * 6, 4096),
            nn.ReLU(),
            nn.Dropout(p=0.5),
            nn.Linear(4096, 4096),
            nn.ReLU(),
            nn.Dropout(p=0.5),
            nn.Linear(4096, num_classes),  # logits; softmax is intentionally omitted
        ]

        self.model = nn.Sequential(*feature_layers, *head_layers)

    def forward(self, x):
        """Return the class scores computed by the sequential stack for ``x``."""
        return self.model(x)

In [6]:
a = next(iter(train_loader))

In [7]:
a[0].shape, a[1].shape

(torch.Size([32, 1, 28, 28]), torch.Size([32]))

In [8]:
# Train AlexNet for 20 epochs: 1 input channel, 96 first-layer filters, 10 classes.
epochs = 20
criterion = nn.CrossEntropyLoss()
model = AlexNet(in_channels=1, out_channels=96, num_classes=10)
model = model.to(device)
# The optimizer is built after the model has been moved to its device.
optimizer = torch.optim.AdamW(params=model.parameters(), lr=1e-3)

train(
    model=model,
    optimizer=optimizer,
    criterion=criterion,
    train_loader=train_loader,
    n_epochs=epochs,
    device=device,
)

Epoch 1/20, Loss: 0.6037
Epoch 2/20, Loss: 0.3792
Epoch 3/20, Loss: 0.3403
Epoch 4/20, Loss: 0.3172
Epoch 5/20, Loss: 0.3004
Epoch 6/20, Loss: 0.2854
Epoch 7/20, Loss: 0.2814
Epoch 8/20, Loss: 0.2732
Epoch 9/20, Loss: 0.2720
Epoch 10/20, Loss: 0.2547
Epoch 11/20, Loss: 0.2518
Epoch 12/20, Loss: 0.2533
Epoch 13/20, Loss: 0.2588
Epoch 14/20, Loss: 0.2435
Epoch 15/20, Loss: 0.2585
Epoch 16/20, Loss: 0.2477
Epoch 17/20, Loss: 0.2345
Epoch 18/20, Loss: 0.2438
Epoch 19/20, Loss: 0.2325
Epoch 20/20, Loss: 0.2336


In [12]:
import torchmetrics

accuracy_fn = torchmetrics.Accuracy(task="multiclass", num_classes=10).to(device)

# evaluate() returns the mean of per-batch accuracies, which over-weights the
# final, smaller batch (none of the splits is a multiple of the batch size).
# Calling the metric inside evaluate() also accumulates its internal state, so
# compute() yields the accuracy over all samples of the split. The state is
# reset first so splits do not leak into each other.
for split_name, loader in (
    ("Training", train_loader),
    ("Validation", valid_loader),
    ("Test", test_loader),
):
    accuracy_fn.reset()
    evaluate(model, loader, accuracy_fn)
    print(f"{split_name} Accuracy: {accuracy_fn.compute():.4f}")

Training Accuracy: 0.9294
Validation Accuracy: 0.9015
Test Accuracy: 0.9004


# GoogLeNet

In [5]:
class Inception(nn.Module):
    """Inception-style block: four parallel branches concatenated along channels.

    Every branch emits ``out_channels`` feature maps at the input's spatial
    size, so the block outputs ``4 * out_channels`` channels.

    Branches (in concatenation order):
        most_right: 3x3 max-pool (stride 1) -> 1x1 conv
        right:      1x1 conv -> 5x5 conv
        left:       1x1 conv -> 3x3 conv
        most_left:  1x1 conv
    Each convolution is followed by a ReLU.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()

        def conv_relu(c_in, kernel_size):
            # "same"-padded, stride-1 convolution to ``out_channels`` plus its ReLU.
            return [
                nn.Conv2d(c_in, out_channels, kernel_size=kernel_size, stride=1, padding="same"),
                nn.ReLU(),
            ]

        self.most_right = nn.Sequential(
            nn.MaxPool2d(kernel_size=3, stride=1, padding=1),
            *conv_relu(in_channels, 1),
        )
        self.right = nn.Sequential(
            *conv_relu(in_channels, 1),
            *conv_relu(out_channels, 5),
        )
        self.left = nn.Sequential(
            *conv_relu(in_channels, 1),
            *conv_relu(out_channels, 3),
        )
        self.most_left = nn.Sequential(*conv_relu(in_channels, 1))

    def forward(self, x):
        """Apply all four branches to ``x`` and concatenate their outputs on dim 1."""
        branches = (self.most_right, self.right, self.left, self.most_left)
        return torch.cat([branch(x) for branch in branches], dim=1)
        


In [6]:
class GoogLeNet(nn.Module):
    """Reduced GoogLeNet-style network built from ``Inception`` blocks.

    Layout: a three-convolution stem, two Inception blocks, a max-pool, three
    more Inception blocks, then global average pooling, dropout and a linear
    layer. The forward pass returns raw logits (no softmax).

    Args:
        in_channels: number of channels of the input images.
        out_channels: number of filters of the first (7x7) convolution.
        num_classes: size of the output layer (default 1000).
    """

    def __init__(self, in_channels, out_channels, num_classes=1000):
        super().__init__()

        stem = [
            nn.Conv2d(in_channels, out_channels, kernel_size=7),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.BatchNorm2d(out_channels),
            nn.Conv2d(out_channels, 192, kernel_size=1, padding="same"),
            nn.ReLU(),
            nn.Conv2d(192, 192, kernel_size=3, padding="same"),
            nn.ReLU(),
            nn.BatchNorm2d(192),
            nn.MaxPool2d(kernel_size=3, stride=2),
        ]

        # Each Inception block concatenates four branches of `width` channels,
        # so the running channel count becomes 4 * width after every block:
        # 192 -> 256 -> 480 | 480 -> 512 -> 512 -> 832.
        inception_body = []
        channels = 192
        for stage_widths in ((64, 120), (128, 128, 208)):
            if inception_body:
                # Downsample between the two Inception stages.
                inception_body.append(nn.MaxPool2d(kernel_size=3, stride=2))
            for width in stage_widths:
                inception_body.append(Inception(in_channels=channels, out_channels=width))
                channels = 4 * width

        head = [
            nn.AdaptiveAvgPool2d((1, 1)),  # -> (B, C, 1, 1) for any spatial size
            nn.Flatten(),                  # -> (B, C)
            nn.Dropout(p=0.4),
            nn.Linear(channels, num_classes),  # C = 832 logits inputs
        ]

        self.model = nn.Sequential(*stem, *inception_body, *head)

    def forward(self, x):
        """Return the logits computed by the sequential stack for ``x``."""
        return self.model(x)

In [36]:
# Pull one batch from the training loader and show the shape of each of its parts.
sample = next(iter(train_loader))
tuple(part.shape for part in sample)

(torch.Size([32, 1, 28, 28]), torch.Size([32]))

In [37]:
# Smoke test: run the sample batch through a fresh GoogLeNet and check the output shape.
example = GoogLeNet(in_channels=1, out_channels=96, num_classes=10).to(device)
sample_images = sample[0].to(device)
example(sample_images).shape

torch.Size([32, 10])

In [38]:
# Same shape check again on another freshly initialised GoogLeNet.
example = GoogLeNet(in_channels=1, out_channels=96, num_classes=10).to(device)
sample_images = sample[0].to(device)
example(sample_images).shape

torch.Size([32, 10])

In [7]:
# Train GoogLeNet for 20 epochs: 1 input channel, 96 first-layer filters, 10 classes.
epochs = 20
criterion = nn.CrossEntropyLoss()
model = GoogLeNet(in_channels=1, out_channels=96, num_classes=10)
model = model.to(device)
# The optimizer is built after the model has been moved to its device.
optimizer = torch.optim.AdamW(params=model.parameters(), lr=1e-3)
train(
    model=model,
    optimizer=optimizer,
    criterion=criterion,
    train_loader=train_loader,
    n_epochs=epochs,
    device=device,
)

Epoch 1/20, Loss: 0.5505
Epoch 2/20, Loss: 0.3456
Epoch 3/20, Loss: 0.3001
Epoch 4/20, Loss: 0.2680
Epoch 5/20, Loss: 0.2502
Epoch 6/20, Loss: 0.2354
Epoch 7/20, Loss: 0.2227
Epoch 8/20, Loss: 0.2051
Epoch 9/20, Loss: 0.1985
Epoch 10/20, Loss: 0.1913
Epoch 11/20, Loss: 0.1777
Epoch 12/20, Loss: 0.1683
Epoch 13/20, Loss: 0.1631
Epoch 14/20, Loss: 0.1549
Epoch 15/20, Loss: 0.1442
Epoch 16/20, Loss: 0.1460
Epoch 17/20, Loss: 0.1316
Epoch 18/20, Loss: 0.1333
Epoch 19/20, Loss: 0.1250
Epoch 20/20, Loss: 0.1214


In [9]:
import torchmetrics

accuracy_fn = torchmetrics.Accuracy(task="multiclass", num_classes=10).to(device)

# evaluate() returns the mean of per-batch accuracies, which over-weights the
# final, smaller batch (none of the splits is a multiple of the batch size).
# Calling the metric inside evaluate() also accumulates its internal state, so
# compute() yields the accuracy over all samples of the split. The state is
# reset first so splits do not leak into each other.
for split_name, loader in (
    ("Training", train_loader),
    ("Validation", valid_loader),
    ("Test", test_loader),
):
    accuracy_fn.reset()
    evaluate(model, loader, accuracy_fn)
    print(f"{split_name} Accuracy: {accuracy_fn.compute():.4f}")

Training Accuracy: 0.9616
Validation Accuracy: 0.9122
Test Accuracy: 0.9144


# ResNet

In [5]:
# class residual_unit(nn.Module):
#     def __init__(self, in_channels, out_channels, stride=1):
#         super().__init__()
#         self.DefaultConv2d = partial(
#             nn.Conv2d, kernel_size=3, stride=stride, padding=1, bias=False
#         )
        
#         self.main_layers = nn.Sequential(
#             self.DefaultConv2d(in_channels, out_channels, stride=stride),
#             nn.BatchNorm2d(out_channels),
#             nn.ReLU(),
#             self.DefaultConv2d(out_channels, out_channels),
#             nn.BatchNorm2d(out_channels)
#         )

#         if stride > 1:
#             self.skip_connection = nn.Sequential(
#                 self.DefaultConv2d(in_channels, out_channels, kernel_size=1, stride=stride, padding=0),
#                 nn.BatchNorm2d(out_channels)
#             )
#         else:
#             self.skip_connection = nn.Identity()

#     def forward(self, x):
#         return F.relu(self.main_layers(x) + self.skip_connection(x))
    




class residual_unit(nn.Module):
    """Two-convolution residual block with an optional projection shortcut.

    Main path: 3x3 conv (carrying ``stride``) -> BatchNorm -> ReLU -> 3x3 conv
    (stride 1) -> BatchNorm. The shortcut is the identity when the block keeps
    both resolution and channel count; otherwise it is a 1x1 convolution with
    the same stride followed by BatchNorm, so both paths can be added.

    Args:
        in_channels: channels of the incoming feature map.
        out_channels: channels produced by the block.
        stride: stride of the first convolution and of the projection (default 1).
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()

        def conv3x3(c_in, c_out, conv_stride):
            # Bias is omitted because every convolution feeds a BatchNorm layer.
            return nn.Conv2d(
                c_in, c_out, kernel_size=3, stride=conv_stride, padding=1, bias=False
            )

        self.main_layers = nn.Sequential(
            conv3x3(in_channels, out_channels, stride),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            conv3x3(out_channels, out_channels, 1),
            nn.BatchNorm2d(out_channels),
        )

        shape_preserved = stride == 1 and in_channels == out_channels
        if shape_preserved:
            self.skip_connection = nn.Identity()
        else:
            # Projection shortcut: match the main path's stride and channel count.
            self.skip_connection = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )

    def forward(self, x):
        """Return ``relu(main_layers(x) + skip_connection(x))``."""
        return F.relu(self.main_layers(x) + self.skip_connection(x))

In [6]:
class Resnet(nn.Module):
    """ResNet-34-style classifier assembled from ``residual_unit`` blocks.

    Layout: 7x7 stride-2 convolution + BatchNorm + ReLU + 3x3 stride-2 max-pool,
    then 3 / 4 / 6 / 3 residual units with 64 / 128 / 256 / 512 filters, global
    average pooling and a linear layer that returns raw logits.

    Args:
        in_channels: channels of the input images (default 1, the previously
            hard-coded value).
        num_classes: number of output logits (default 10, the previously
            hard-coded value).
    """

    def __init__(self, in_channels=1, num_classes=10):
        super().__init__()
        layers = [
            nn.Conv2d(in_channels=in_channels, out_channels=64, kernel_size=7, stride=2, padding=3, bias=False),
            nn.BatchNorm2d(num_features=64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
        ]

        prev_filters = 64
        for filters in [64] * 3 + [128] * 4 + [256] * 6 + [512] * 3:
            # Downsample (stride 2) exactly where the filter count changes.
            stride = 1 if filters == prev_filters else 2
            layers.append(residual_unit(prev_filters, filters, stride=stride))
            prev_filters = filters

        layers += [
            nn.AdaptiveAvgPool2d(output_size=1),
            nn.Flatten(),
            # The pooled feature width is known (prev_filters), so use a regular
            # Linear layer: its parameters exist immediately, and no dry-run
            # forward pass is needed before building an optimizer.
            nn.Linear(prev_filters, num_classes),
        ]

        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """Return the logits computed by the sequential stack for ``x``."""
        return self.model(x)

In [7]:
model = Resnet().to(device)

In [8]:
# Dry run on one training batch to check the shape of the network output.
sample = next(iter(train_loader))
model(sample[0].to(device)).shape

torch.Size([32, 10])

In [9]:
model 

Resnet(
  (model): Sequential(
    (0): Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (4): residual_unit(
      (main_layers): Sequential(
        (0): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (2): ReLU(inplace=True)
        (3): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (4): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (skip_connection): Identity()
    )
    (5): residual_unit(
      (main_layers): Sequential(
        (0): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (1): BatchNorm2d(64, eps=1e-05,

In [10]:
# Training setup for the ResNet: 20 epochs, cross-entropy loss, AdamW.
n_epochs = 20
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(params=model.parameters(), lr=1e-3)

In [11]:
train(
    model=model,
    optimizer=optimizer,
    criterion=criterion,
    train_loader=train_loader,
    n_epochs=n_epochs,
    device=device,
)

Epoch 1/20, Loss: 0.5140
Epoch 2/20, Loss: 0.3555
Epoch 3/20, Loss: 0.3088
Epoch 4/20, Loss: 0.2922
Epoch 5/20, Loss: 0.2580
Epoch 6/20, Loss: 0.2406
Epoch 7/20, Loss: 0.2195
Epoch 8/20, Loss: 0.2105
Epoch 9/20, Loss: 0.1856
Epoch 10/20, Loss: 0.1696
Epoch 11/20, Loss: 0.1549
Epoch 12/20, Loss: 0.1460
Epoch 13/20, Loss: 0.1307
Epoch 14/20, Loss: 0.1209
Epoch 15/20, Loss: 0.1092
Epoch 16/20, Loss: 0.0995
Epoch 17/20, Loss: 0.0954
Epoch 18/20, Loss: 0.0827
Epoch 19/20, Loss: 0.0780
Epoch 20/20, Loss: 0.0740


In [12]:
!pip install torchmetrics

Collecting torchmetrics
  Downloading torchmetrics-1.8.2-py3-none-any.whl.metadata (22 kB)
Collecting lightning-utilities>=0.8.0 (from torchmetrics)
  Downloading lightning_utilities-0.15.2-py3-none-any.whl.metadata (5.7 kB)
Downloading torchmetrics-1.8.2-py3-none-any.whl (983 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m983.2/983.2 kB[0m [31m24.7 MB/s[0m eta [36m0:00:00[0m00:01[0m
[?25hDownloading lightning_utilities-0.15.2-py3-none-any.whl (29 kB)
Installing collected packages: lightning-utilities, torchmetrics
Successfully installed lightning-utilities-0.15.2 torchmetrics-1.8.2


In [13]:
import torchmetrics

accuracy_fn = torchmetrics.Accuracy(task="multiclass", num_classes=10).to(device)

# evaluate() returns the mean of per-batch accuracies, which over-weights the
# final, smaller batch (none of the splits is a multiple of the batch size).
# Calling the metric inside evaluate() also accumulates its internal state, so
# compute() yields the accuracy over all samples of the split. The state is
# reset first so splits do not leak into each other.
for split_name, loader in (
    ("Training", train_loader),
    ("Validation", valid_loader),
    ("Test", test_loader),
):
    accuracy_fn.reset()
    evaluate(model, loader, accuracy_fn)
    print(f"{split_name} Accuracy: {accuracy_fn.compute():.4f}")

Training Accuracy: 0.9843
Validation Accuracy: 0.9170
Test Accuracy: 0.9122
