# Module imports

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F

import torchvision
from torchvision.transforms import v2

from pathlib import Path

import pandas as pd
import time
import math

# Data preparation

In [None]:
class MNISTDataset(Dataset):
    """Dataset over a CSV file whose rows are flattened 28x28 digit images.

    With ``train=True`` the first CSV column is taken as the label and the
    remaining columns as pixels; with ``train=False`` every column is a pixel.
    Pixel values are divided by 255 once, at load time.

    Args:
        datapath: path (or anything ``pandas.read_csv`` accepts) of the CSV.
        transform: optional callable applied to each image after the built-in
            image/dtype conversion.
        train: whether the CSV carries a leading label column.
    """

    def __init__(self, datapath, transform=None, train=True):
        super().__init__()
        frame = pd.read_csv(datapath)
        self.train = train
        self.transform = transform
        # Always-applied conversion; the user transform runs on top of it.
        self.init_transform = v2.Compose([
            v2.ToImage(),
            v2.ToDtype(torch.float64, scale=True)
        ])
        if train:
            # Column 0 holds the digit label, the rest are pixel intensities.
            self.labels = frame.iloc[:, 0]
            pixels = frame.iloc[:, 1:]
        else:
            pixels = frame
        self.data = pixels.to_numpy().astype('float64') / 255.

    def __getitem__(self, idx):
        """Return the image at ``idx`` (plus its one-hot label when training).

        The converted image is repeated three times along dimension 0;
        presumably this yields a 3-channel input for an RGB backbone, given
        that ``ToImage`` is expected to add a leading channel axis to the
        2-D array.
        """
        img = self.init_transform(self.data[idx].reshape(28, 28))
        if self.transform:
            img = self.transform(img)
        img = torch.cat((img, img, img), dim=0)
        if not self.train:
            return img
        target = torch.tensor(self.labels.iloc[idx])
        return img, F.one_hot(target, num_classes=10)

    def __len__(self):
        """Number of rows (samples) loaded from the CSV."""
        return self.data.shape[0]

In [None]:
# Per-split augmentation steps inserted between dtype conversion and resizing.
# Only the training split is augmented (small random rotation/translation);
# random horizontal/vertical flips are disabled.
_augmentations = {
    "train": [v2.RandomAffine(degrees=(-10, 10), translate=(0.2, 0.2))],
    "test": [],
}

# Both pipelines share the same skeleton: convert to a float64 image, apply
# the split-specific augmentations, upscale to 224x224, then normalise the
# single channel.
transformations = {
    split: v2.Compose([
        v2.ToImage(),
        v2.ToDtype(torch.float64, scale=True),
        *extra_steps,
        v2.Resize(size=(224, 224)),
        v2.Normalize(mean=[0.13], std=[0.31]),
    ])
    for split, extra_steps in _augmentations.items()
}

In [None]:
# Input directory holding train.csv and test.csv.
datapath = Path("/kaggle/input/digit-recognizer")
batch_size = 32
# Use the GPU when one is visible, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# The training CSV carries labels; the test CSV is pixels only.
train_dataset = MNISTDataset(
    datapath / "train.csv",
    transform=transformations["train"],
    train=True,
)
test_dataset = MNISTDataset(
    datapath / "test.csv",
    transform=transformations["test"],
    train=False,
)

# Shuffle only the training data; test order must stay stable so predictions
# line up with row positions.
train_dataloader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
def train(model, dataloader, loss_fn, optimizer, scheduler, num_epochs, last_epochs=0):
    """Train ``model`` in two phases for ``num_epochs`` epochs in total.

    The first ``num_epochs - last_epochs`` epochs run with whatever
    ``requires_grad`` flags the model's parameters already have. If
    ``last_epochs`` is non-zero, every parameter is then switched to
    ``requires_grad = True`` and ``last_epochs`` further epochs are run.

    Args:
        model: module to optimise; inputs are moved to the module-level
            ``device`` before the forward pass.
        dataloader: yields ``(X, y)`` batches. ``y`` must have a class
            dimension at ``dim=1`` (e.g. one-hot targets), since accuracy is
            computed from ``y.argmax(dim=1)``.
        loss_fn: callable ``loss_fn(pred, y)`` returning a scalar tensor.
        optimizer: optimiser stepped once per batch.
        scheduler: learning-rate scheduler, also stepped once per batch.
        num_epochs: total number of epochs across both phases.
        last_epochs: number of trailing epochs run with all parameters
            unfrozen.

    NOTE(review): newly unfrozen parameters are only updated if ``optimizer``
    was constructed with them - confirm at the call site.
    """
    size = len(dataloader.dataset)
    # Epochs in the first phase; clamped so a too-large ``last_epochs``
    # cannot produce zero or negative epoch labels.
    first_phase_epochs = max(num_epochs - last_epochs, 0)

    def train_epoch():
        for batch, (X, y) in enumerate(dataloader):
            X, y = X.to(device).float(), y.to(device).float()

            pred = model(X)
            loss = loss_fn(pred, y)

            # Fraction of samples in this batch whose top class matches.
            accuracy = (pred.argmax(dim=1) == y.argmax(dim=1)).sum().item() / y.size(0)

            loss.backward()
            optimizer.step()
            scheduler.step()
            optimizer.zero_grad()

            if batch % 100 == 0:
                current = (batch + 1) * len(X)
                print(f"loss: {loss.item():>7f}  accuracy: {accuracy:.3f}  [{current:>5d}/{size:>5d}]")

    model.train()
    for epoch in range(first_phase_epochs):
        print(f'Epoch {epoch + 1}/{num_epochs}')
        print('-' * 10)
        train_epoch()

    if last_epochs:
        # Unfreeze everything for the fine-tuning phase.
        for param in model.parameters():
            param.requires_grad = True

    for epoch in range(last_epochs):
        # Continue numbering after the first phase (previously this printed
        # labels beyond num_epochs, e.g. "Epoch 11/10").
        print(f'Epoch {first_phase_epochs + epoch + 1}/{num_epochs}')
        print('-' * 10)
        train_epoch()

In [None]:
def predict(model, dataloader):
    """Run ``model`` over ``dataloader`` and collect predicted class indices.

    Args:
        model: module to evaluate; it is moved to the module-level ``device``
            and put in eval mode.
        dataloader: yields input batches only (no labels).

    Returns:
        A list with one tensor per batch holding ``argmax(dim=1)`` of the
        model output. The tensors stay on ``device``.
    """
    model = model.to(device)
    # Ask the loader for its batch count instead of assuming a batch size.
    num_batches = len(dataloader)
    predictions = []
    model.eval()
    # Inference only: skip autograd bookkeeping to save memory and time.
    with torch.no_grad():
        for batch, X in enumerate(dataloader):
            X = X.to(device).float()

            pred = model(X).argmax(dim=1)
            predictions.append(pred)
            if batch % 100 == 0:
                print(f"[{batch}/{num_batches}]")
    return predictions

# Model

In [None]:
# GhostNet backbone with pretrained weights, fetched through torch.hub.
model = torch.hub.load('huawei-noah/ghostnet', 'ghostnet_1x', pretrained=True)

# Freeze every pretrained parameter in one call.
model.requires_grad_(False)

# Swap in a fresh 10-class linear head; it is created after the freeze, so
# its parameters keep the default requires_grad=True.
model.classifier = nn.Linear(
    in_features=model.classifier.in_features,
    out_features=10,
)
model = model.to(device)

In [None]:
# Training length and peak learning rate.
epochs, lr = 10, 1e-3
# Number of batches per epoch (the last batch may be partial).
steps_per_epoch = math.ceil(len(train_dataset) / batch_size)

loss_fn = nn.CrossEntropyLoss()
# All parameters are handed to the optimiser, including currently frozen ones.
optimizer = optim.Adam(params=model.parameters(), lr=lr)
# One-cycle schedule sized for epochs * steps_per_epoch scheduler steps.
scheduler = optim.lr_scheduler.OneCycleLR(
    optimizer,
    max_lr=lr,
    epochs=epochs,
    steps_per_epoch=steps_per_epoch,
)

In [None]:
# Run the full schedule; the final 5 epochs train with every parameter unfrozen.
train(
    model=model,
    dataloader=train_dataloader,
    loss_fn=loss_fn,
    optimizer=optimizer,
    scheduler=scheduler,
    num_epochs=epochs,
    last_epochs=5,
)

In [None]:
# Per-batch predicted class indices for the test set.
preds = predict(model=model, dataloader=test_dataloader)

In [None]:
from functools import reduce, partial
import numpy as np

# Bind axis=0 once so batches are always joined along the sample dimension.
concat = partial(np.concatenate, axis=0)
# Bring each per-batch prediction tensor to host memory as a NumPy array.
mmm = [x.cpu().numpy() for x in preds]
# Join all batches in a single call; folding pairwise with reduce re-copied
# the growing array once per batch.
predictions = concat(mmm)

In [None]:
# Sanity check: display the shape of the flattened prediction array.
predictions.shape

In [None]:
# 1-based image ids, one per prediction, used as the frame's index.
image_ids = pd.Index(np.arange(1, len(predictions) + 1), name="ImageId")
# Single "Label" column keyed by ImageId.
submission = pd.DataFrame({"Label": predictions}, index=image_ids)

In [None]:
# Write the submission file; the frame's index is written as the first column.
submission.to_csv("/kaggle/working/submission.csv")