In [1]:
import pickle
import torch
import numpy as np
import os

def unpickle(file):
    with open(file, 'rb') as fo:
        dict = pickle.load(fo, encoding='bytes')
    return dict

def load_cifar10_data(path='Data/cifar-10-batches-py'):
    all_images = []
    all_labels = []

    # Iterate over all batch files
    for batch in range(1, 6):  # CIFAR-10 has 5 training batches
        batch_file = os.path.join(path, f'data_batch_{batch}')
        data_dict = unpickle(batch_file)

        # Extract data and labels
        data = data_dict[b'data']
        labels = data_dict[b'labels']

        # Convert data to torch tensors and reshape to image format
        data = data.reshape(10000, 3, 32, 32)  # Shape (10000, 3072) to (10000, 3, 32, 32)
        data = torch.FloatTensor(data) / 255  # Normalize to [0, 1]

        # Convert labels to tensor
        labels = torch.tensor(labels, dtype=torch.long)

        # Append to lists
        all_images.append(data)
        all_labels.append(labels)

    # Concatenate all images and labels into final tensors
    images = torch.cat(all_images)
    labels = torch.cat(all_labels)

    return images, labels

xs, ys = load_cifar10_data()

len(xs), len(ys), xs.shape

(50000, 50000, torch.Size([50000, 3, 32, 32]))

In [2]:
loader = torch.utils.data.DataLoader(dataset=list(zip(xs, ys)),
                                     batch_size=64,
                                     shuffle=True,
                                     drop_last=True)

# Accessing a batch from the DataLoader
x, y = next(iter(loader))

# Getting DataLoader length, batch shape, and label
len(loader), x.shape, y

(781,
 torch.Size([64, 3, 32, 32]),
 tensor([4, 1, 1, 5, 5, 2, 8, 6, 1, 3, 7, 3, 4, 4, 8, 4, 8, 7, 8, 9, 9, 6, 1, 2,
         1, 2, 4, 9, 0, 8, 3, 9, 3, 6, 8, 5, 3, 2, 4, 3, 7, 4, 5, 7, 2, 3, 9, 1,
         9, 7, 1, 2, 6, 9, 3, 7, 0, 9, 8, 6, 0, 2, 3, 5]))

In [3]:
# CNN Neural Network
class Model(torch.nn.Module):

    def __init__(self):
        super().__init__()

        # Convolutional layer 1
        self.cnn1 = torch.nn.Conv2d(in_channels=3,
                                    out_channels=16,
                                    kernel_size=5,
                                    stride=2,
                                    padding=0)

        # Convolutional layer 2
        self.cnn2 = torch.nn.Conv2d(in_channels=16,
                                    out_channels=32,
                                    kernel_size=3,
                                    stride=1,
                                    padding=1)

        # Convolutional layer 3
        self.cnn3 = torch.nn.Conv2d(in_channels=32,
                                    out_channels=128,
                                    kernel_size=7,
                                    stride=1,
                                    padding=0)

        # Pooling layer
        self.pool = torch.nn.MaxPool2d(kernel_size=2, stride=2)

        # Activation function
        self.relu = torch.nn.ReLU()

        # Fully connected layer
        self.fc = torch.nn.Linear(in_features=128, out_features=10)

    def forward(self, x):
        # First convolution
        # Input: [8, 3, 32, 32] -> Output: [8, 16, 14, 14]
        x = self.cnn1(x)
        x = self.relu(x)

        # Second convolution
        # Input: [8, 16, 14, 14] -> Output: [8, 32, 14, 14]
        x = self.cnn2(x)
        x = self.relu(x)

        # Pooling layer
        # Input: [8, 32, 14, 14] -> Output: [8, 32, 7, 7]
        x = self.pool(x)

        # Third convolution
        # Input: [8, 32, 7, 7] -> Output: [8, 128, 1, 1]
        x = self.cnn3(x)
        x = self.relu(x)

        # Flatten the tensor
        # Input: [8, 128, 1, 1] -> Output: [8, 128]
        x = x.flatten(start_dim=1)

        # Linear output
        # Input: [8, 128] -> Output: [8, 10]
        return self.fc(x)

# Instantiate the model
model = Model()

# Test forward pass
output = model(torch.randn(8, 3, 32, 32))
output.shape

torch.Size([8, 10])

In [4]:
# Training
def train():
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    loss_fun = torch.nn.CrossEntropyLoss()
    model.train()

    for epoch in range(20):
        for i, (x, y) in enumerate(loader):
            out = model(x)
            loss = loss_fun(out, y)

            loss.backward()
            optimizer.step()
            optimizer.zero_grad()

            if i % 500 == 0:
                acc = (out.argmax(dim=1) == y).sum().item() / len(y)
                print(epoch, i, loss.item(), acc)

    torch.save(model, 'image_model.model')

train()

0 0 2.2970781326293945 0.109375
0 500 1.6805511713027954 0.421875
1 0 1.2635968923568726 0.53125
1 500 1.3278388977050781 0.53125
2 0 0.9698949456214905 0.65625
2 500 1.5373528003692627 0.484375
3 0 1.0896217823028564 0.65625
3 500 0.9602572321891785 0.71875
4 0 0.9210845232009888 0.625
4 500 1.0400537252426147 0.5625
5 0 0.7237127423286438 0.765625
5 500 0.8189454674720764 0.734375
6 0 0.943873405456543 0.6875
6 500 0.8325253129005432 0.703125
7 0 0.7340182065963745 0.703125
7 500 0.534659743309021 0.84375
8 0 0.6964947581291199 0.71875
8 500 0.6476435661315918 0.78125
9 0 1.0015063285827637 0.671875
9 500 0.687664270401001 0.796875
10 0 0.472769170999527 0.84375
10 500 0.5554885268211365 0.8125
11 0 0.5426962971687317 0.828125
11 500 0.7791496515274048 0.75
12 0 0.46159234642982483 0.828125
12 500 0.7271028161048889 0.78125
13 0 0.5349158048629761 0.8125
13 500 0.431968092918396 0.84375
14 0 0.5033153891563416 0.84375
14 500 0.46171948313713074 0.828125
15 0 0.3923768997192383 0.875


In [5]:
# Testing the model on the last 10000 samples (test set)
def test():
    test_data = unpickle('Data/cifar-10-batches-py/test_batch')
    test_images = test_data[b'data']
    test_labels = test_data[b'labels']

    # Convert test data to torch tensors
    test_images = torch.FloatTensor(test_images.reshape(10000, 3, 32, 32)) / 255
    test_labels = torch.tensor(test_labels, dtype=torch.long)

    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        test_loader = torch.utils.data.DataLoader(dataset=list(zip(test_images, test_labels)), batch_size=64, shuffle=False)
        for x, y in test_loader:
            out = model(x)
            _, predicted = torch.max(out, 1)
            total += y.size(0)
            correct += (predicted == y).sum().item()

    accuracy = correct / total
    print(f'Test Accuracy: {accuracy * 100:.2f}%')

test()

Test Accuracy: 64.51%
