In [4]:
# Pytorch
import torch
from torch.utils.data import DataLoader, Dataset
from torchvision.transforms import transforms
import torch.nn as nn
import torch.nn.functional as F
from torch import optim

# Data visualization
import matplotlib.pyplot as plt

# Tools & Preprocessing
import pandas as pd
import zipfile
import numpy as np
from sklearn.model_selection import train_test_split
from tqdm import tqdm
from argparse import Namespace
from sklearn.metrics import accuracy_score
import os
import shutil

## Read data

In [5]:
# Read the training table straight from the archive, without extracting it to disk
with zipfile.ZipFile("digit-recognizer.zip") as archive, archive.open("train.csv") as train_csv:
    df = pd.read_csv(train_csv)

In [None]:
# Preview the first rows of the training table
df.head()

In [None]:
# (number of rows, number of columns) of the training table
df.shape

In [5]:
# Setup: hyperparameters and runtime settings shared by the cells below
args = Namespace(
    batch_size=64,  # samples per batch in both data loaders
    epochs=50,  # upper bound on training epochs
    lr_rate=2.3e-4,  # initial learning rate handed to the optimizer
    device=torch.device("cuda" if torch.cuda.is_available() else "cpu"),  # GPU when available
    lr_patience=5,  # patience of the learning-rate scheduler
    lr_factor=0.1,  # factor of the learning-rate scheduler
    patience=20,  # epochs without improvement tolerated before early stopping
)

## Preprocess data

In [15]:
# Create a custom dataset that serves flattened pixel rows as images
class CustomDataset(Dataset):
    """Serve flattened pixel rows as ``(28, 28, 1)`` float32 arrays with their labels.

    Args:
        features: Object exposing ``.values`` (e.g. a pandas DataFrame); each row
            must hold 784 values so it can be reshaped to 28 x 28 x 1.
        labels: Object exposing ``.values`` (e.g. a pandas Series), indexed in the
            same order as ``features``.
        transform: Optional callable applied to each image array before it is
            returned.
    """

    def __init__(self, features, labels, transform=None):
        self.transform = transform
        self.features = features.values
        self.labels = labels.values

    def __len__(self):
        """Number of samples, i.e. number of feature rows."""
        return len(self.features)

    def __getitem__(self, index):
        """Return the ``(image, label)`` pair stored at ``index``."""
        pixels = self.features[index].astype('float32')
        label = self.labels[index]

        # Height x width x channel layout, single channel
        image = pixels.reshape(28, 28, 1)
        if self.transform:
            image = self.transform(image)

        return image, label

# Per-sample pipeline: convert the image array to a tensor, then normalize its
# single channel with mean 0.5 and std 0.5
transform = transforms.Compose(
    [transforms.ToTensor(), transforms.Normalize(mean=(0.5,), std=(0.5,))]
)

In [9]:
# Labels come from the first column; the remaining columns are the pixels,
# divided by 255 (maps 8-bit pixel values into [0, 1])
y = df.iloc[:, 0]
X = df.iloc[:, 1:] / 255

# Hold out 20% of the rows for validation, with a fixed seed for a repeatable split
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

# Wrap each split in a dataset that applies the shared transform
train_data = CustomDataset(X_train, y_train, transform=transform)
val_data = CustomDataset(X_val, y_val, transform=transform)

# Batch both datasets; only the training batches are shuffled
train_loader = DataLoader(train_data, batch_size=args.batch_size, shuffle=True)
val_loader = DataLoader(val_data, batch_size=args.batch_size, shuffle=False)

## Create model

In [6]:
# Create an attention module
class SelfAttention(nn.Module):
    """Self-attention over the spatial positions of a feature map, with a skip connection.

    Queries and keys are 1x1-convolution projections to ``in_dim // 8`` channels;
    values are a 1x1-convolution projection that keeps ``in_dim`` channels.

    Args:
        in_dim: Number of channels of the input (and output) feature map.
    """

    def __init__(self, in_dim):
        super(SelfAttention, self).__init__()
        self.query_conv = nn.Conv2d(in_channels=in_dim, out_channels=in_dim // 8, kernel_size=1)
        self.key_conv = nn.Conv2d(in_channels=in_dim, out_channels=in_dim // 8, kernel_size=1)
        self.value_conv = nn.Conv2d(in_channels=in_dim, out_channels=in_dim, kernel_size=1)
        # Normalize over the key axis (last dim) so that, for every query position,
        # the weights used to average the values sum to 1.
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, x):
        """Apply attention to ``x`` of shape ``(B, C, H, W)``; the output has the same shape."""
        batch_size, C, height, width = x.size()
        n_pos = height * width  # number of spatial positions, N
        query = self.query_conv(x).view(batch_size, -1, n_pos).permute(0, 2, 1)  # B x N x C'
        key = self.key_conv(x).view(batch_size, -1, n_pos)  # B x C' x N
        value = self.value_conv(x).view(batch_size, -1, n_pos)  # B x C x N

        # attention[b, i, j] is the weight output position i gives to input position j
        attention = self.softmax(torch.bmm(query, key))  # B x N x N
        out = torch.bmm(value, attention.permute(0, 2, 1))  # B x C x N
        out = out.view(batch_size, C, height, width)

        return out + x  # Skip connection

In [7]:
# Create the model ResNet + Attention + Dense
class Model(nn.Module):
    """Convolutional classifier with one residual shortcut, self-attention and a dense head.

    The first convolution takes 1 input channel and the first dense layer
    expects ``128 * 7 * 7`` features, which corresponds to 28 x 28 inputs after
    the two 2x2 poolings. The network outputs 10 logits per sample.
    """

    def __init__(self):
        super().__init__()

        # Main convolutional path (batch norm follows each conv, so no conv bias)
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(64)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1, bias=False)
        self.bn3 = nn.BatchNorm2d(128)
        self.attention = SelfAttention(in_dim=128)  # Attention layer

        # Shortcut branch: brings the raw input to 128 channels and shrinks it 4x
        # so it can be added to the output of the main path
        self.downsample = nn.Sequential(
            nn.Conv2d(1, 128, kernel_size=3, padding=1, bias=False),
            nn.MaxPool2d(4, 4),
            nn.BatchNorm2d(128),
        )

        # Flatten layer
        self.flatten = nn.Flatten()

        # Classification layers
        self.fc1 = nn.Linear(128 * 7 * 7, 512)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, 10)
        self.drop1 = nn.Dropout(0.4)

    def forward(self, x):
        """Return the class logits for a batch of single-channel images."""
        # Shortcut computed from the raw input
        shortcut = self.downsample(x)

        # Main path: two conv/BN/ReLU/pool stages, then conv/BN without activation
        h = self.pool(F.relu(self.bn1(self.conv1(x))))
        h = self.pool(F.relu(self.bn2(self.conv2(h))))
        h = self.bn3(self.conv3(h))

        # Residual sum followed by the activation
        h = F.relu(h + shortcut)

        # Attention
        h = self.attention(h)

        # Dense head; dropout sits just before the output layer
        h = self.flatten(h)
        h = F.relu(self.fc1(h))
        h = F.relu(self.fc2(h))
        return self.fc3(self.drop1(h))

In [35]:
# Create a function to save models
# Saving directory: checkpoints are written under args.savedir, which is created
# here if it does not exist yet
args.savedir = 'model'
os.makedirs(args.savedir, exist_ok=True)

def save_checkpoint(state, is_best, checkpoint_path, filename="checkpoint.pt"):
    """Serialize ``state`` to disk and keep a separate copy when it is the best so far.

    Args:
        state: Object to serialize with ``torch.save``.
        is_best: When truthy, the file just written is also copied to
            ``model_best.pt`` in the same directory.
        checkpoint_path: Directory that receives the file(s).
        filename: File name used for the checkpoint inside ``checkpoint_path``.
    """
    latest_path = os.path.join(checkpoint_path, filename)
    torch.save(state, latest_path)
    if not is_best:
        return
    best_path = os.path.join(checkpoint_path, "model_best.pt")
    shutil.copyfile(latest_path, best_path)

In [None]:
# Model, loss function, optimizer and learning-rate schedule
model = Model().to(args.device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=args.lr_rate)

# Plateau scheduler in "min" mode, configured from args.lr_factor / args.lr_patience
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode="min",
    factor=args.lr_factor,
    patience=args.lr_patience,
)

In [19]:
# Get predictions and evaluate the model
def get_pred(logits):
    """Return the index of the most probable class for each row of ``logits``.

    Args:
        logits: Tensor of class scores with the classes along dimension 1.

    Returns:
        Tensor of predicted class indices, detached from the autograd graph.
    """
    class_probs = F.softmax(logits.detach(), dim=1)
    return class_probs.argmax(dim=1)

def eval_model(test_loader, model, device=args.device):
    """Evaluate ``model`` on every batch of ``test_loader``.

    The model is switched to evaluation mode for the duration of the call, so
    dropout is disabled and batch norm uses its running statistics instead of
    updating them. Its previous train/eval mode is restored afterwards. The loss
    is computed with the module-level ``criterion``.

    Args:
        test_loader: Iterable yielding ``(inputs, targets)`` batches.
        model: Network to evaluate.
        device: Device the batches are moved to before the forward pass. The
            default is bound to ``args.device`` when this function is defined.

    Returns:
        Tuple ``(accuracy, losses)``: the accuracy over all samples and the
        list of per-batch loss values.
    """
    test_loss = []
    preds, tgts = [], []

    # Remember the current mode so the caller gets the model back as it was
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for x, y_true in test_loader:

                # Forward
                x = x.to(device)
                y_true = y_true.to(device)
                logits_pred = model(x)

                # Loss
                loss = criterion(logits_pred, y_true)
                test_loss.append(loss.item())

                # Collect predictions and targets as flat Python lists
                preds.extend(get_pred(logits_pred).cpu().tolist())
                tgts.extend(y_true.cpu().tolist())
    finally:
        model.train(was_training)

    return accuracy_score(tgts, preds), test_loss

In [None]:
# Training
train_loss = []  # per-batch training losses, accumulated over all epochs
val_loss = []  # per-batch validation losses, accumulated over all epochs
best_metric = 0  # best validation accuracy seen so far
n_no_improve = 0  # consecutive epochs without improvement; must exist before the first epoch

for epoch in range(args.epochs):
    model.train()
    loop = tqdm(enumerate(train_loader), total=len(train_loader), leave=True)
    loop.set_description(f'Epoch {epoch+1}/{args.epochs}')

    # Initialize accumulators for loss and accuracy
    train_loss_epoch = []
    train_accuracy_epoch = []

    for i, (x, y_true) in loop:

        # Forward pass
        x = x.to(args.device)
        y_true = y_true.to(args.device)
        logits_pred = model(x)

        # Loss train
        loss = criterion(logits_pred, y_true)
        train_loss_epoch.append(loss.item())

        # Calculate accuracy
        y_pred = get_pred(logits_pred)
        accuracy = accuracy_score(y_true.cpu().numpy(), y_pred.cpu().numpy())
        train_accuracy_epoch.append(accuracy)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Update tqdm postfix with the current average loss and accuracy
        loop.set_postfix(loss_train=np.mean(train_loss_epoch), accuracy_train=np.mean(train_accuracy_epoch))

    # Accuracy and loss on the validation split. Switch to evaluation mode first so
    # dropout is disabled and batch norm uses its running statistics; model.train()
    # at the top of the loop re-enables training mode for the next epoch.
    model.eval()
    accuracy_val, val_loss_epoch = eval_model(val_loader, model, args.device)
    train_loss += train_loss_epoch
    val_loss += val_loss_epoch

    # Update learning rate scheduler
    # NOTE(review): the scheduler monitors the mean *training* loss, not the
    # validation loss — confirm this is intended.
    scheduler.step(np.mean(train_loss_epoch))

    # Keep track of accuracy improvement
    is_improvement = accuracy_val > best_metric
    if is_improvement:
        best_metric = accuracy_val
        n_no_improve = 0
    else:
        n_no_improve += 1

    # Always save the latest checkpoint; it is also copied as the best model
    # when the validation accuracy improved
    save_checkpoint(
        {
            "epoch": epoch + 1,
            "state_dict": model.state_dict(),
            "optimizer": optimizer.state_dict(),
            "scheduler": scheduler.state_dict(),
            "best_metric": best_metric,
        },
        is_improvement,
        args.savedir,
    )

    # Print accuracy in validation
    print('accuracy_test: {: .4f}'.format(accuracy_val))

    # Early stopping
    if n_no_improve >= args.patience:
        print("No improvement. Breaking out of loop")
        break




## Let's test our model

In [None]:
# Rebuild the architecture on the working device and restore the best checkpoint.
# save_checkpoint writes model_best.pt inside args.savedir, so load it from there.
best_model = Model().to(args.device)
best_checkpoint = torch.load(
    os.path.join(args.savedir, 'model_best.pt'),
    map_location=args.device,
)
best_model.load_state_dict(best_checkpoint['state_dict'])

# Inference only: put the model in evaluation mode
best_model.eval()

print("."*30)
print("Digit recognizer")
print("."*30)

In [22]:
# Read the test table straight from the archive, without extracting it to disk
with zipfile.ZipFile("digit-recognizer.zip") as archive, archive.open("test.csv") as test_csv:
    df_test = pd.read_csv(test_csv)

In [None]:
# Preview the first rows of the test table
df_test.head()

In [None]:
# (number of rows, number of columns) of the test table
df_test.shape

In [None]:
# Display the test table divided by 255. The result is not assigned, so df_test
# itself is left unchanged by this cell.
df_test / 255

In [None]:
# Display the first test row reshaped as a 28 x 28 tensor
torch.tensor(df_test.iloc[0,:].to_numpy()).view(28, 28)

In [41]:
# Header row first; one [ImageId, Label] row per test image is appended below
predictions = [
    ['ImageId','Label'],
]

# Apply the same preprocessing as the training pipeline: divide by 255, then
# normalize with mean 0.5 / std 0.5, with rows shaped as (N, 1, 28, 28) images.
test_pixels = (df_test / 255).to_numpy().astype('float32')
test_images = (torch.from_numpy(test_pixels).view(-1, 1, 28, 28) - 0.5) / 0.5

# Send batches to whatever device the model's weights live on
model_device = next(best_model.parameters()).device

# Batched forward passes without gradient tracking
with torch.no_grad():
    for start in range(0, test_images.shape[0], args.batch_size):
        batch = test_images[start:start + args.batch_size].to(model_device)
        batch_labels = get_pred(best_model(batch)).cpu().tolist()
        # ImageId is the 1-based row number in the test table
        predictions.extend(
            [start + offset + 1, label] for offset, label in enumerate(batch_labels)
        )


In [None]:
import csv

filename = 'predictions_digit_recognizer.csv'

# newline='' leaves line-ending handling to the csv writer
with open(filename, 'w', newline='') as file:
    # Header row and all prediction rows go out in a single call
    csv.writer(file).writerows(predictions)

print(f'{filename} has been created and populated with data.')