In [5]:
import time
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import MNIST
from torch.utils.data import DataLoader, Subset, random_split

In [12]:
# VGG11 model with features and classifier
class VGG11(nn.Module):
    def __init__(self):
        super(VGG11, self).__init__()

        # Define convolutional layers (self.features)
        self.features = nn.Sequential(
            # Input Size: (1, 32, 32)
            
            # 1. Conv(001, 064, 3, 1, 1) - BatchNorm(064) - ReLU - MaxPool(2, 2)
            nn.Conv2d(1, 64, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            # Output Size: (64, 16, 16)

            # 2. Conv(064, 128, 3, 1, 1) - BatchNorm(128) - ReLU - MaxPool(2, 2)
            nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            # Output Size: (128, 8, 8)

            # 3. Conv(128, 256, 3, 1, 1) - BatchNorm(256) - ReLU
            nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            # Output Size: (256, 8, 8)

            # 4. Conv(256, 256, 3, 1, 1) - BatchNorm(256) - ReLU - MaxPool(2, 2)
            nn.Conv2d(256, 256, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),

            # 5. Conv(256, 512, 3, 1, 1) - BatchNorm(512) - ReLU
            nn.Conv2d(256, 512, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(),

            # 6. Conv(512, 512, 3, 1, 1) - BatchNorm(512) - ReLU - MaxPool(2, 2)
            nn.Conv2d(512, 512, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),

            # 7. Conv(512, 512, 3, 1, 1) - BatchNorm(512) - ReLU
            nn.Conv2d(512, 512, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            
            # 8. Conv(512, 512, 3, 1, 1) - BatchNorm(512) - ReLU - MaxPool(2, 2)
            nn.Conv2d(512, 512, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )

        # Define fully connected layers (self.classifier)
        self.classifier = nn.Sequential(
            nn.Linear(512, 4096),
            nn.ReLU(),
            nn.Dropout(0.5),

            nn.Linear(4096, 4096),
            nn.ReLU(),
            nn.Dropout(0.5),

            nn.Linear(4096, 10)
        )

    def forward(self, x):
        x = x.view(x.size(0), 1, 32, 32)
        x = self.features(x)
        x = x.view(x.size(0), -1)
        x = self.classifier(x)
        return x

In [15]:
def resize_batch(imgs):
    imgs = imgs.numpy() 
    imgs = imgs.reshape((-1, 28, 28, 1))
    resized_imgs = np.zeros((imgs.shape[0], 32, 32, 1))
    for i in range(imgs.shape[0]):
        # Resize using numpy
        resized_imgs[i, ..., 0] = np.pad(imgs[i, ..., 0], ((2, 2), (2, 2)), mode='constant')
    return resized_imgs

# load and preprocess data
transform = transforms.Compose([
    transforms.Grayscale(num_output_channels=1),
    transforms.ToTensor()
])

# load data
root = 'data'
labels = ['train', 'test']
dataset = {'train': MNIST(root=root, train=True, download=True, transform=transform), 
            'test': MNIST(root=root, train=False, download=True, transform=transform)}

# resize images to (32, 32)
for label in labels:
    dataset[label].data = resize_batch(dataset[label].data)

# create dicts for storing sampled data
X = {'train': [], 'test': []}
y = {'train': [], 'test': []}

classes = (0, 1, 2, 3, 4, 5, 6, 7, 8, 9)

# for the training and test datasets
for label in labels:
    # sample 600 points for each class
    for c in classes:
        subset_idx = torch.isin(dataset[label].targets, torch.as_tensor(c))
        # convert to tensor
        X[label].append(torch.tensor(dataset[label].data[subset_idx][:600]).float())
        y[label].append(torch.tensor(dataset[label].targets[subset_idx][:600]).long()) 
    
    # concatenate along the first dimension
    X[label] = torch.cat(X[label], dim=0)
    y[label] = torch.cat(y[label], dim=0)    
    
print(X['train'].shape)
print(X['test'].shape)
print(y['train'].shape)
print(y['test'].shape)

# # create dicts for storing sampled data subsets
# subset = {'train': [], 'test': []}
# indices = {'train': [], 'test': []}

# # for the training and test datasets
# for label in labels:
#     # sample 600 points for each class
#     for c in range(10):
#         idx = [i for i, lbl in enumerate(dataset[label].targets) if lbl == c]
#         indices[label].extend(idx[:600])

# # resize images to (32, 32)
# for label in labels:
#     dataset[label].data = torch.tensor(resize_batch(dataset[label].data))
        
# # create Subset datasets using the sampled indices
# subset['train'] = Subset(dataset['train'], indices['train'])
# subset['test'] = Subset(dataset['test'], indices['test'])

# print(len(subset['train']))

# # set up DataLoader
# train_loader = DataLoader(subset['train'], batch_size=64, shuffle=True)
# test_loader = DataLoader(subset['test'], batch_size=64, shuffle=False)

  y[label].append(torch.tensor(dataset[label].targets[subset_idx][:600]).long())


torch.Size([6000, 32, 32, 1])
torch.Size([6000, 32, 32, 1])
torch.Size([6000])
torch.Size([6000])


In [16]:
# initialize model, loss function, and optimizer
model = VGG11()
# print(model)
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

n_epochs = 5
batch_size = 64

# grabbing training data
x = X['train']
y = y['train']

# gradient descent to train
for epoch in range(n_epochs):
    start = time.time()
    for i in range(0, len(x), batch_size):
        x_batch = x[i:i+batch_size]
        y_batch = y[i:i+batch_size]
        
        y_pred = model(x_batch)
        loss = loss_fn(y_pred, y_batch)
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    end = time.time()
    print(f'Epoch [{epoch + 1}/{n_epochs}], Loss: {loss.item():.4f}, Time elapsed: {(end - start):.4f}s')

# compute accuracy
y_pred = model(x)
_, predicted = torch.max(y_pred, dim=1)
accuracy = (predicted == y).float().mean()
print(f'Accuracy = {accuracy:.4f}')

# for epoch in range(n_epochs):
#     model.train()
#     for batch in train_loader:
#         inputs, labels = batch
#         optimizer.zero_grad()
#         outputs = model(inputs)
#         loss = loss_fn(outputs, labels)
#         loss.backward()
#         optimizer.step()

#     # Print training statistics
#     print(f'Epoch [{epoch + 1}/{n_epochs}], Loss: {loss.item():.4f}')

# # Evaluation
# model.eval()
# correct = 0
# total = 0

# with torch.no_grad():
#     for batch in test_loader:
#         inputs, labels = batch
#         outputs = model(inputs)
#         _, predicted = torch.max(outputs.data, 1)
#         total += labels.size(0)
#         correct += (predicted == labels).sum().item()

# accuracy = correct / total
# print(f'Test Accuracy: {accuracy:.4f}')

Epoch [1/5], Loss: 2.4455, Time elapsed: 60.5096s


KeyboardInterrupt: 