In [2]:
import torch
import numpy as np
import torchvision
import numpy as np
import torch
import torch.nn as nn
from torchvision import datasets
from torchvision import transforms
from torch.utils.data.sampler import SubsetRandomSampler
import time



In [18]:
# Input pipeline settings, kept in one place so the code, the comments and the
# sanity-check below cannot drift apart.
IMAGE_SIZE = 224   # side length fed to the network (MNIST digits are upscaled from 28x28)
BATCH_SIZE = 64
# NOTE(review): machine-specific absolute path; with download=False the dataset
# must already exist here. Adjust when running on another machine.
DATA_ROOT = '/home/kami/Documents/datasets/'

transform = torchvision.transforms.Compose([
    torchvision.transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),  # Resize images to IMAGE_SIZE x IMAGE_SIZE pixels
    torchvision.transforms.ToTensor(),        # Convert images to PyTorch tensors with values in [0, 1]
    torchvision.transforms.Normalize(mean=(0.5,), std=(0.5,))  # (x - 0.5) / 0.5 maps [0, 1] to [-1, 1]
])

train_dataset = torchvision.datasets.MNIST(
    root=DATA_ROOT,
    train=True,
    download=False,
    transform=transform
)

train_loader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True
)

# Example: Iterate through one batch to verify the data
for images, labels in train_loader:
    print(f"Batch shape: {images.shape}")  # Should be [BATCH_SIZE, 1, IMAGE_SIZE, IMAGE_SIZE] = [64, 1, 224, 224]
    print(f"Labels shape: {labels.shape}")  # Should be [BATCH_SIZE] = [64]
    print(f"Image tensor min: {images.min()}, max: {images.max()}")  # Check normalization: expect roughly -1.0 and 1.0
    break  # Only print the first batch




Batch shape: torch.Size([64, 1, 32, 32])
Labels shape: torch.Size([64])
Image tensor min: -1.0, max: 1.0


In [None]:
class ResidualBlock(nn.Module):
    """Two-convolution residual block with an additive skip connection.

    Main path: 3x3 conv -> BatchNorm -> ReLU -> 3x3 conv -> BatchNorm.
    The (optionally projected) input is added to the main path and a final
    ReLU is applied to the sum.

    Args:
        in_channels: Number of channels of the input feature map.
        out_channels: Number of channels produced by both convolutions.
        stride: Stride of the first convolution; a value > 1 reduces the
            spatial size of the main path.
        downsample: Optional module applied to the input to build the skip
            branch. It must be supplied whenever the main path changes the
            tensor shape (stride != 1 or in_channels != out_channels),
            otherwise the addition in ``forward`` fails.
    """

    def __init__(self, in_channels, out_channels, stride = 1, downsample = None):
        super(ResidualBlock, self).__init__()
        self.conv1 = nn.Sequential(
                        nn.Conv2d(in_channels, out_channels, kernel_size = 3, stride = stride, padding = 1),
                        nn.BatchNorm2d(out_channels),
                        nn.ReLU())
        self.conv2 = nn.Sequential(
                        nn.Conv2d(out_channels, out_channels, kernel_size = 3, stride = 1, padding = 1),
                        nn.BatchNorm2d(out_channels))
        self.downsample = downsample
        self.relu = nn.ReLU()
        self.out_channels = out_channels

    def forward(self, x):
        """Return ``relu(main_path(x) + skip(x))``."""
        residual = x
        out = self.conv1(x)
        out = self.conv2(out)
        # Compare against None explicitly: the truth value of a module falls
        # back to __len__ for containers, so an empty/zero-length projection
        # module would otherwise be silently ignored.
        if self.downsample is not None:
            residual = self.downsample(x)
        out += residual
        out = self.relu(out)
        return out

class ResNet(nn.Module):
    """ResNet-style classifier: stem, four residual stages, global pooling, linear head.

    Args:
        block: Residual block class. It is instantiated as
            ``block(in_channels, out_channels, stride, downsample)`` for the
            first block of a stage and ``block(in_channels, out_channels)``
            for the remaining ones.
        layers: Sequence of four ints, the number of blocks in each stage.
        num_classes: Size of the output logits vector.
        in_channels: Number of channels of the input images.
    """

    def __init__(self, block, layers, num_classes = 10, in_channels = 1):
        super(ResNet, self).__init__()
        # Channel count entering the next stage; updated by _make_layer.
        self.inplanes = 64
        # Stem: 7x7 stride-2 convolution followed by a stride-2 max-pool,
        # i.e. a 4x spatial reduction before the residual stages.
        self.conv1 = nn.Sequential(
                        nn.Conv2d(in_channels, 64, kernel_size = 7, stride = 2, padding = 3),
                        nn.BatchNorm2d(64),
                        nn.ReLU())
        self.maxpool = nn.MaxPool2d(kernel_size = 3, stride = 2, padding = 1)
        self.layer0 = self._make_layer(block, 64, layers[0], stride = 1)
        self.layer1 = self._make_layer(block, 128, layers[1], stride = 2)
        self.layer2 = self._make_layer(block, 256, layers[2], stride = 2)
        self.layer3 = self._make_layer(block, 512, layers[3], stride = 2)
        # Global average pooling. For a 224x224 input the final feature map is
        # 7x7, so this equals the former AvgPool2d(7); unlike a fixed 7x7
        # window it also works for other input resolutions (e.g. 32x32).
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512, num_classes)

    def _make_layer(self, block, planes, blocks, stride=1):
        """Build one stage made of ``blocks`` residual blocks with ``planes`` output channels.

        Only the first block may change the resolution (``stride``) or the
        channel count; in that case a 1x1 convolution + BatchNorm projection is
        created for its skip branch. Updates ``self.inplanes`` to ``planes``.
        """
        downsample = None
        if stride != 1 or self.inplanes != planes:
            downsample = nn.Sequential(
                nn.Conv2d(self.inplanes, planes, kernel_size=1, stride=stride),
                nn.BatchNorm2d(planes),
            )
        layers = [block(self.inplanes, planes, stride, downsample)]
        self.inplanes = planes
        for _ in range(1, blocks):
            layers.append(block(self.inplanes, planes))

        return nn.Sequential(*layers)

    def forward(self, x):
        """Map a batch of images ``(N, in_channels, H, W)`` to logits ``(N, num_classes)``."""
        # Spatial sizes in the comments are for a 224x224 input.
        x = self.conv1(x)      # 112x112, 64 channels
        x = self.maxpool(x)    # 56x56
        x = self.layer0(x)     # 56x56, 64 channels
        x = self.layer1(x)     # 28x28, 128 channels
        x = self.layer2(x)     # 14x14, 256 channels
        x = self.layer3(x)     # 7x7, 512 channels
        x = self.avgpool(x)    # 1x1, 512 channels
        x = x.view(x.size(0), -1)
        x = self.fc(x)

        return x

In [None]:
# Use the GPU when CUDA is available, otherwise stay on the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# Four stages with 3, 4, 6 and 3 residual blocks; 10 output classes, 1 input channel.
model = ResNet(ResidualBlock, [3, 4, 6, 3], 10, 1)
model = model.to(device)

# Objective and optimiser: cross-entropy minimised by SGD with momentum and weight decay.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(
    model.parameters(),
    lr=0.001,
    momentum=0.9,
    weight_decay=0.005,
)

st = time.time()  # wall-clock start of training
# Run 10 passes over the training set with the model in training mode.
model.train()
for epoch in range(10):
    # Batches are counted from 1 so the progress line fires on batch 100, 200, ...
    for batch_index, (images, labels) in enumerate(train_loader, start=1):
        # Put the batch on the same device as the model.
        images = images.to(device)
        labels = labels.to(device)

        # Clear gradients left over from the previous step.
        optimizer.zero_grad()

        # Forward pass: logits, then the loss against the targets.
        outputs = model(images)
        loss = criterion(outputs, labels)

        # Backward pass followed by one parameter update.
        loss.backward()
        optimizer.step()

        # Report the current loss once every 100 batches.
        if batch_index % 100 != 0:
            continue
        print(f"Epoch: {epoch} | Batch: {batch_index} | Loss: {loss.item():.4f}")

et = time.time()  # wall-clock end of training
print(et-st)
print("Training completed.")