<a href="https://www.kaggle.com/code/mgzotero/mnist-pytorch?scriptVersionId=138948526" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

In [None]:
!python3 --version

In [None]:
!pwd

In [None]:
!ls /kaggle/input/digit-recognizer

# Packages

In [None]:
import numpy as np
import pandas as pd

import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F

from matplotlib import pyplot as plt

from sklearn.model_selection import train_test_split

import PIL
import os

# Seed every RNG this notebook draws from so that runs are repeatable:
# NumPy drives the random image sampling below, torch drives weight
# initialisation, dropout and DataLoader shuffling.
random_state = 111
torch.manual_seed(random_state)
np.random.seed(random_state)

In [None]:
# Default to the CPU and switch to the GPU only when torch can see one.
device = "cpu"
if torch.cuda.is_available():
    device = "cuda"

In [None]:
# Show which accelerator is in use. Asking for the device name without a CUDA
# device raises, so fall back to a plain message on CPU-only sessions.
torch.cuda.get_device_name() if torch.cuda.is_available() else "CUDA not available - running on CPU"

# EDA & Preprocessing

In [None]:
# Directory holding the competition CSV files inside the Kaggle runtime.
path = "/kaggle/input/digit-recognizer"

# Read both tables in one pass; the order of the names matches the targets.
train, test = (
    pd.read_csv(os.path.join(path, csv_name))
    for csv_name in ("train.csv", "test.csv")
)

In [None]:
train.head(3)

In [None]:
test.head(3)

These are image matrices that have been flattened into one-dimensional vectors. Assuming the images are square, we can reshape each vector into a $28 \times 28$ matrix ($\sqrt{784} = 28$).

In [None]:
# Height and width of a single image (784 pixel columns = 28 * 28).
dims = (28, 28)

# Take row 0, skip its leading label field and fold the pixels back into 2-D.
example = np.reshape(train.values[0, 1:], dims)

example.shape

In [None]:
# Draw the reshaped sample as a grayscale picture on a single-axes figure.
fig = plt.figure()
ax = fig.add_subplot(1, 1, 1)

ax.imshow(example, cmap="gray")

fig.show()

We can select a few more images to show (why not?).

In [None]:
n_imgs = 5

# Draw n_imgs random rows (with replacement), drop the leading label field and
# fold each pixel vector back into an image.
imgs = [
    train.values[np.random.randint(0, len(train)), 1:].reshape(dims)
    for _ in range(n_imgs)
]

# squeeze=False always yields a 2-D array of Axes, so flattening also works
# when n_imgs is 1 (the default would return a bare Axes object instead).
fig, axes = plt.subplots(ncols=n_imgs, squeeze=False)
axes = axes.flatten()

for ax, img in zip(axes, imgs):
    ax.imshow(img, cmap="gray")
    # Tick marks carry no information for an image grid.
    ax.set_xticks([])
    ax.set_yticks([])

fig.tight_layout()
fig.show()

In [None]:
# Bar chart of how often each label occurs, ordered by label value.
fig, ax = plt.subplots()

label_counts = train["label"].value_counts().sort_index()
label_counts.plot.bar(ax=ax, rot=0)

ax.set_title("Frequency of Labels")
ax.set_xlabel("Label")
ax.set_ylabel("Frequency")

fig.show()

In [None]:
def get_image_matrix(df: pd.DataFrame,
                     with_labels: bool = True,
                     dims: tuple[int, ...] = (28, 28)) -> pd.DataFrame:
    """
    Receives a dataset of vectorized images and returns
    a dataset of the corresponding image matrices.

    Parameters
    ----------
    df : pd.DataFrame
        One flattened image per row. When ``with_labels`` is true the frame
        must contain a ``"label"`` column; every other column is treated as
        a pixel, wherever the label column sits.
    with_labels : bool
        Whether ``df`` carries a ``"label"`` column to keep in the output.
    dims : tuple of int
        Shape each row of pixels is reshaped into.

    Returns
    -------
    pd.DataFrame
        Indexed like ``df``, with an ``"img"`` column holding one array of
        shape ``dims`` per row, preceded by ``"label"`` when ``with_labels``.

    Raises
    ------
    KeyError
        If ``with_labels`` is true and ``df`` has no ``"label"`` column.
    ValueError
        If the pixel columns of a row cannot be reshaped into ``dims``.
    """

    # Select the pixels by name rather than by position so that the label
    # column does not have to be the first one.
    pixels = df.drop(columns="label") if with_labels else df

    # One vectorised reshape instead of a Python-level call per row. The copy
    # keeps the images independent of the input frame's memory.
    stacked = pixels.to_numpy(copy=True).reshape(len(pixels), *dims)

    # dtype=object stores each 2-D image as a single cell of the column.
    images = pd.Series(list(stacked), index=df.index, dtype=object)

    if with_labels:
        new_df = df[["label"]].copy(deep=True)
        new_df["img"] = images
    else:
        new_df = pd.DataFrame({"img": images})

    return new_df

In [None]:
# Convert the labelled training table into rows of (label, image matrix),
# using the default 28 x 28 image shape.
df = get_image_matrix(train)

# Preview the first three converted rows.
df.head(3)

In [None]:
get_image_matrix(test, with_labels=False).head(3)

In [None]:
fig, ax = plt.subplots()

# n_img is a random *position*, so read it with .iloc; plain df.img[n_img] is
# a label lookup and only works while the index is the default 0..n-1 range.
n_img = np.random.randint(0, len(df))
ax.imshow(df.img.iloc[n_img], cmap="gray")
ax.set_xticks([])
ax.set_yticks([])

ax.set_title(f"Label of the Image: {df.label.iloc[n_img]}")

fig.show()

In [None]:
class ImageDataset(Dataset):
    """
    Torch dataset built from a table of vectorized images.

    The raw table is passed through ``transform`` and kept as the array
    ``self.dataframe``. With labels, column 0 is read as the label and
    column 1 as the image; without labels, column 0 is read as the image.
    Images are divided by 255 unless ``normalized`` is true.
    """

    def __init__(self, raw_dataframe, with_labels=True, transform=get_image_matrix, normalized=False):
        self.raw_dataframe = raw_dataframe
        self.with_labels = with_labels
        self.dataframe = transform(raw_dataframe, with_labels=with_labels).values

        # Nothing to rescale when the caller says the pixels are already normalized.
        if not normalized:
            if with_labels:
                # Leave column 0 (the label) untouched; scale everything after it.
                self.dataframe[:, 1:] = self.dataframe[:, 1:] / 255
            else:
                self.dataframe = self.dataframe / 255

    def __getitem__(self, idx):
        """Return ``(img, label)`` tensors, or only ``img`` when unlabeled."""
        if not self.with_labels:
            return torch.tensor(self.dataframe[idx, 0], dtype=torch.float32, device=device)

        # Tensors are created directly on the module-level `device`.
        img = torch.tensor(self.dataframe[idx, 1], dtype=torch.float32, device=device)
        label = torch.tensor(self.dataframe[idx, 0], dtype=torch.int64, device=device)
        return img, label

    def __len__(self):
        """Number of rows in the transformed table."""
        return len(self.dataframe)

In [None]:
# Hold out 20% of the labelled rows for validation and train on the other 80%.
# test_size is the share given to the *second* returned frame, so the previous
# value of 0.8 left only a fifth of the data for training.
train, val = train_test_split(train, test_size=0.2, random_state=random_state)

In [None]:
# Training batches are reshuffled every epoch.
train_data = ImageDataset(train)
train_loader = DataLoader(train_data, batch_size=64, shuffle=True)

# Evaluation metrics do not depend on sample order, so keep validation
# deterministic instead of spending random draws on shuffling it.
val_data = ImageDataset(val)
val_loader = DataLoader(val_data, batch_size=64, shuffle=False)

# The test table has no labels; its order must be preserved so predictions
# line up with the row positions.
test_data = ImageDataset(test, with_labels=False)
test_loader = DataLoader(test_data, batch_size=64, shuffle=False)

In [None]:
# class CNN(nn.Module):
#     def __init__(self):
#         super(CNN, self).__init__()
#         self.conv1 = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=(3, 3))
#         self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=(3, 3))
#         self.fc = nn.Linear(64 * 24 * 24, 10)
        
#     def forward(self, x):
#         x = x.unsqueeze(1)  # add channel dimension
#         x = F.relu(self.conv1(x))
#         x = F.relu(self.conv2(x))
#         x = x.view(x.size(0), -1)  # reshape
#         x = self.fc(x)
#         return x

# Model

In [None]:
class CNN(nn.Module):
    """
    Convolutional classifier producing 10 scores per input image.

    Three convolution + batch-norm stages (the first two followed by 2x2
    max pooling) feed a fully connected head with dropout. ``forward``
    expects a batch without a channel axis and inserts one itself.
    """

    def __init__(self, dropout_rate=0.7):
        super().__init__()

        # Stage 1: 1 -> 32 channels.
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3)
        self.bn1 = nn.BatchNorm2d(32)
        self.pool1 = nn.MaxPool2d(2)

        # Stage 2: 32 -> 64 channels.
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3)
        self.bn2 = nn.BatchNorm2d(64)
        self.pool2 = nn.MaxPool2d(2)

        # Stage 3: 64 -> 128 channels, no pooling afterwards.
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3)
        self.bn3 = nn.BatchNorm2d(128)

        # Classifier head. 1152 = 128 channels * 3 * 3, which is the size of
        # the stage-3 output for 28 x 28 inputs.
        self.fc = nn.Sequential(
            nn.Linear(in_features=1152, out_features=512),
            nn.ReLU(),
            nn.Dropout(dropout_rate),
            nn.Linear(in_features=512, out_features=10),
        )

    def forward(self, x):
        """Map a batch of single-channel images to per-class scores."""
        feats = x.unsqueeze(1)  # add the channel axis the conv layers expect

        feats = self.pool1(F.relu(self.bn1(self.conv1(feats))))
        feats = self.pool2(F.relu(self.bn2(self.conv2(feats))))
        feats = F.relu(self.bn3(self.conv3(feats)))

        # Collapse everything but the batch axis before the linear layers.
        feats = feats.view(feats.size(0), -1)
        return self.fc(feats)


In [None]:
# Build the network with its default dropout rate and move its parameters to
# the selected device.
cnn = CNN().to(device)

In [None]:
# Loss and optimiser for the digit classifier.
criterion = nn.CrossEntropyLoss().to(device)
optimizer = torch.optim.Adam(cnn.parameters(), lr=0.003)

for epoch in range(1, 16):
    # ---- training pass ----
    cnn.train()
    epoch_train_loss = 0.0
    epoch_train_acc = 0.0
    n_train = 0

    for imgs, labels in train_loader:
        optimizer.zero_grad()

        preds = cnn(imgs)

        loss = criterion(preds, labels)
        loss.backward()
        optimizer.step()

        # The criterion returns a per-batch mean and the last batch may be
        # smaller, so accumulate per-sample totals rather than batch means.
        n_batch = labels.size(0)
        epoch_train_loss += loss.item() * n_batch
        epoch_train_acc += (torch.argmax(preds, 1) == labels).sum().item()
        n_train += n_batch

    epoch_train_loss = epoch_train_loss / n_train
    epoch_train_acc = epoch_train_acc / n_train

    # ---- validation pass (no gradient tracking, eval-mode layers) ----
    cnn.eval()
    epoch_val_loss = 0.0
    epoch_val_acc = 0.0
    n_val = 0

    with torch.no_grad():
        for imgs, labels in val_loader:
            preds = cnn(imgs)
            loss = criterion(preds, labels)

            n_batch = labels.size(0)
            epoch_val_loss += loss.item() * n_batch
            epoch_val_acc += (torch.argmax(preds, 1) == labels).sum().item()
            n_val += n_batch

    epoch_val_loss = epoch_val_loss / n_val
    epoch_val_acc = epoch_val_acc / n_val

    print(f"Epoch {epoch}:",
          f"Train Loss: {epoch_train_loss:.4f} | Train Accuracy: {epoch_train_acc:.4f}",
          f"Val Loss: {epoch_val_loss:.4f} | Val Accuracy: {epoch_val_acc:.4f}",
          sep="\n",
          end="\n\n")

In [None]:
# Persist only the model's state_dict to the Kaggle working directory.
torch.save(cnn.state_dict(), "/kaggle/working/cnn.pt")

In [None]:
# Eval mode makes batch-norm use its running statistics and disables dropout.
cnn.eval()

preds = []

# Inference needs no autograd graph, and the unshuffled test_loader lets the
# model score whole batches while keeping predictions in row order.
with torch.no_grad():
    for batch in test_loader:
        preds.extend(torch.argmax(cnn(batch), dim=1).tolist())

In [None]:
# Submission table: 1-based image ids next to the predicted digit for each
# test row.
submission = pd.DataFrame({
    "ImageId": range(1, len(test_data) + 1),
    "Label": preds,
})

In [None]:
# Write the submission file without the DataFrame index column.
submission.to_csv("/kaggle/working/submission.csv", index=False)