In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

import torch
from torch.utils.data import Dataset
from torch.utils.data.dataloader import DataLoader

from random import choice, random, randint

from torchvision import transforms
from torchvision import models as models
from PIL import Image

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

from time import time

In [None]:
# Load the labelled training CSV and print its column/dtype/memory summary.
df_train = pd.read_csv('../input/digit-recognizer/train.csv')
df_train.info()

# Creating a custom Dataset class

In [None]:
class customdataset(Dataset):
    """Map-style dataset over rows laid out as ``[label, pixel, pixel, ...]``.

    Parameters
    ----------
    lista : indexable collection of rows (for example ``DataFrame.values``).
    tipo : tag handed back with every sample, unchanged.
    """

    def __init__(self, lista, tipo):
        self.lista, self.tipo = lista, tipo

    def __len__(self):
        """Number of rows held by the dataset."""
        return len(self.lista)

    def __getitem__(self, index):
        """Return ``(image, label, tipo)``; the image is the row minus its first entry, as 28x28."""
        row = self.lista[index]
        pixels = row[1:]
        return pixels.reshape(28, 28), row[0], self.tipo

# Smoke test: wrap the whole training frame and preview five randomly chosen samples.
datasetteste = customdataset(df_train.values, 'train')

plt.figure(figsize=(15, 4))
for k in range(5):
    # Draw the sample first, then place it in the k-th panel of a 1x5 grid.
    img_numpy, label, tipo = choice(datasetteste)
    plt.subplot(1, 5, k + 1)
    plt.imshow(img_numpy)
    plt.title(label)
plt.show()

# Now let's create a custom DataLoader

In [None]:
# Preprocessing pipelines, looked up by the `tipo` tag that each sample carries.
# NOTE(review): per the torchvision docs, ToTensor only rescales values to [0, 1]
# for uint8 input; confirm the dtype of the arrays fed here if scaling is expected.
transformers = {
    'train': transforms.Compose([
        transforms.ToTensor(),
        transforms.RandomRotation(16)  # augmentation: random angle within +/-16 degrees
    ]),
    'test': transforms.Compose([
        transforms.ToTensor()  # no augmentation outside of training
    ])
}

# Collate function: applies the per-sample transform, then batches images and labels.
def custom_collate_fn(batch):
    """Turn a list of ``(image, label, tipo)`` samples into ``(imgs_batch, labels_batch)``.

    Every image is passed through ``transformers[tipo]`` before stacking, and the
    stacked images are cast to ``torch.float``. Labels become a single tensor.
    """
    # One pass over the batch: keep each transformed image next to its label.
    pairs = [(transformers[tipo](img_numpy), label) for img_numpy, label, tipo in batch]

    labels_batch = torch.tensor([label for _, label in pairs])
    imgs_batch = torch.stack([img for img, _ in pairs]).type(torch.float)

    return imgs_batch, labels_batch

# Sanity check for custom_collate_fn: sample 8 random rows and pull one batch of 2.
simple_dataset = customdataset(df_train.sample(8).values, 'train')
dataloader = DataLoader(
    simple_dataset,
    collate_fn=custom_collate_fn,
    batch_size=2,
    shuffle=True,
)
imgs_batch, labels_batch = next(iter(dataloader))
print(imgs_batch.shape, labels_batch, sep='\n')

# Let's create the model, its criterion and its optimizer

In [None]:
# Run on the GPU when torch can see one, otherwise on the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'device: {device}')

model = torch.nn.Sequential(
    # --- convolutional feature extractor (single-channel input) ---
    torch.nn.Conv2d(1, 128, kernel_size=3),
    torch.nn.ReLU(),
    torch.nn.Conv2d(128, 128, kernel_size=3),
    torch.nn.ReLU(),
    torch.nn.Conv2d(128, 128, kernel_size=3),
    torch.nn.MaxPool2d(2),
    torch.nn.ReLU(),
    torch.nn.Dropout(p=0.5),
    torch.nn.Conv2d(128, 128, kernel_size=3),
    torch.nn.ReLU(),
    torch.nn.Dropout(p=0.5),
    torch.nn.Conv2d(128, 64, kernel_size=3),
    # Pin the spatial size to 10x10 so the first Linear layer's input width is fixed.
    torch.nn.AdaptiveAvgPool2d((10, 10)),
    # --- fully connected classifier head ---
    torch.nn.Flatten(),
    torch.nn.Linear(64 * 10 * 10, 1024),
    torch.nn.Dropout(p=0.5),
    torch.nn.ReLU(),
    torch.nn.Linear(1024, 1024),
    torch.nn.ReLU(),
    torch.nn.Linear(1024, 10),  # ten output scores, one per class
)

# Push one batch through the model (before it is moved to `device`) to confirm
# that the layer shapes line up.
print('testing image passing throught model')
imgs_batch, labels_batch = next(iter(dataloader))
output = model(imgs_batch)
print(f'output shape: {output.shape}')

model.to(device)

In [None]:
# Classification loss computed directly on the model's raw outputs.
criterion = torch.nn.CrossEntropyLoss()
# Adam over every model parameter, with the optimizer's default hyper-parameters.
optimizer = torch.optim.Adam(model.parameters())

# Now, creating a validation function that will be used at the end of every epoch

In [None]:
# Softmax across dim 1 (the class dimension of the model output); defined at
# module level so that other cells can reuse it.
probabilitys = torch.nn.Softmax(dim=1)


def validation_step(dataloader, debug=False):
    """Evaluate the global ``model`` on ``dataloader`` with gradients disabled.

    Parameters
    ----------
    dataloader : sized iterable yielding ``(imgs_batch, labels_batch)`` tensors.
    debug : bool, default False
        When True, print the progress percentage the first time each 10% bucket
        is reached, followed by the elapsed time in whole seconds.

    Returns
    -------
    tuple
        ``(validation_loss, accuracy)``. ``validation_loss`` is the SUM of the
        per-batch ``criterion`` values (it is not averaged over batches), and
        ``accuracy`` is ``accuracy_score`` over every sample seen.

    Notes
    -----
    Switches the model to eval mode and leaves it there.
    """
    begin_time = time()

    if debug:
        print('validation: ', end='')

    model.eval()

    dataloader_length = len(dataloader)
    bucket_width = 10  # percent covered by each progress bucket
    last_bucket = -1   # nothing reported yet

    validation_loss = 0
    labels_pred, labels_true = [], []

    with torch.no_grad():
        for k, (imgs_batch, labels_batch) in enumerate(dataloader):

            if debug:
                percent = 100 * k // dataloader_length
                bucket = percent // bucket_width
                # k only increases, so a different bucket is always a new one.
                if bucket != last_bucket:
                    print(percent, end='%, ')
                    last_bucket = bucket

            imgs_batch, labels_batch = imgs_batch.to(device), labels_batch.to(device)
            output = model(imgs_batch)
            validation_loss += criterion(output, labels_batch).item()

            # Predicted class = index of the largest probability in each row.
            labels_probabilitys = probabilitys(output)
            labels_pred.append(labels_probabilitys.argmax(1))
            labels_true.append(labels_batch)

    # Gather every batch on the CPU before handing the arrays to scikit-learn.
    labels_pred = torch.cat(labels_pred).to('cpu').numpy()
    labels_true = torch.cat(labels_true).to('cpu').numpy()

    accuracy = accuracy_score(labels_true, labels_pred)

    if debug:
        total_time = int(time() - begin_time)
        print(f'100% in {total_time} sec')

    return validation_loss, accuracy

# Smoke test: run the validation pass over nine random rows, three per batch.
simplecustomdataset = customdataset(df_train.sample(9).values, 'train')
simpledataloader = DataLoader(
    simplecustomdataset,
    collate_fn=custom_collate_fn,
    batch_size=3,
    shuffle=True,
)

validation_step(simpledataloader)

# Creating a function that trains the model for one epoch

In [None]:
def train_one_epoch(dataloader, debug=False):
    """Run one optimisation pass of the global ``model`` over ``dataloader``.

    Parameters
    ----------
    dataloader : sized iterable yielding ``(imgs_batch, labels_batch)`` tensors.
    debug : bool, default False
        When True, print the progress percentage the first time each 10% bucket
        is reached, followed by the elapsed time in whole seconds.

    Returns
    -------
    float
        Sum of the per-batch ``criterion`` values (not averaged over batches).

    Notes
    -----
    Switches the model to train mode and updates its weights through the
    global ``optimizer``.
    """
    begin_time = time()

    if debug:
        print('Training: ', end='')

    model.train()

    dataloader_length = len(dataloader)
    bucket_width = 10  # percent covered by each progress bucket
    last_bucket = -1   # nothing reported yet

    epoch_loss = 0

    for k, (imgs_batch, labels_batch) in enumerate(dataloader):
        if debug:
            percent = 100 * k // dataloader_length
            bucket = percent // bucket_width
            # k only increases, so a different bucket is always a new one.
            if bucket != last_bucket:
                print(percent, end='%, ')
                last_bucket = bucket

        imgs_batch, labels_batch = imgs_batch.to(device), labels_batch.to(device)

        # Standard step: clear old gradients, forward, backward, update.
        optimizer.zero_grad()
        output = model(imgs_batch)
        loss = criterion(output, labels_batch)
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    if debug:
        total_time = int(time() - begin_time)
        print(f'100% in {total_time} sec')

    return epoch_loss

# Smoke test on 1024 random rows in batches of 8.
# NOTE: this is a real training pass - train_one_epoch steps the optimizer, so the
# model weights are already updated once this cell has run.
simplecustomdataset = customdataset(df_train.sample(1024).values, 'train')
simpledataloader = DataLoader(
    simplecustomdataset,
    collate_fn=custom_collate_fn,
    batch_size=8,
    shuffle=True,
)

train_one_epoch(simpledataloader, debug=True)

# Let's create the training and validation datasets

In [None]:
# Hold out 7% of the labelled rows for validation. The fixed random_state makes the
# split identical on every Restart & Run All, so validation numbers are comparable
# between runs.
df_train2, df_validation = train_test_split(
    df_train,
    test_size=0.07,
    shuffle=True,
    random_state=42,
)
print (f'len df_train2: {len(df_train2)}')
print (f'len df_validation: {len(df_validation)}')

# The tag selects the transform pipeline: augmentation for training only.
dataset_train = customdataset(df_train2.values, 'train')
dataset_validation = customdataset(df_validation.values, 'test')

BATCH_SIZE = 128

train_dataloader = DataLoader(dataset_train, batch_size=BATCH_SIZE, shuffle=True, collate_fn=custom_collate_fn)
# Evaluation does not benefit from shuffling; a fixed order keeps the batch
# composition (and therefore the summed validation loss) deterministic.
validation_dataloader = DataLoader(dataset_validation, batch_size=BATCH_SIZE, shuffle=False, collate_fn=custom_collate_fn)

print (f'Batch_size: {BATCH_SIZE}')
print (f'len train_dataloader: {len(train_dataloader)}')
print (f'len validation_dataloader: {len(validation_dataloader)}')

# Training the model

In [None]:
# History rows, one per epoch: [epoch, epoch_loss, validation_loss, validation_accuracy].
results = []

for epoch in range(6):
    print(f'Epoch: {epoch}')

    epoch_loss = train_one_epoch(train_dataloader, debug=True)
    validation_loss, validation_accuracy = validation_step(validation_dataloader, debug=True)

    print(
        'epoch_loss {:0.2f}, validation_loss: {:0.2f}, validation_accuracy: {:0.2f}'.format(
            epoch_loss, validation_loss, validation_accuracy
        )
    )
    print('-' * 50)

    results.append([epoch, epoch_loss, validation_loss, validation_accuracy])

# Plotting some results

In [None]:
# Turn the history into an array so each metric can be sliced out as a column.
results = np.array(results)
plt.figure(figsize=(18,4))

# Column 0 holds the epoch index (x axis); columns 1-3 each get their own panel.
for panel, panel_title in enumerate(['Epoch Loss', 'Validation Loss', 'Validation Accuracy'], start=1):
    plt.subplot(1, 3, panel)
    plt.plot(results[:, 0], results[:, panel])
    plt.title(panel_title)

plt.show()

# Now, predicting test dataset

In [None]:
# Load the test CSV (rows are pixels only; each is later reshaped whole to 28x28)
# and print its column/dtype/memory summary.
df_test = pd.read_csv('../input/digit-recognizer/test.csv')
df_test.info()

In [None]:
class custom_test_dataset(Dataset):
    """Map-style dataset over label-free rows; every row is one flattened 28x28 image.

    Parameters
    ----------
    lista : indexable collection of rows (for example ``DataFrame.values``).
    """

    def __init__(self, lista):
        self.lista = lista

    def __len__(self):
        """Number of rows held by the dataset."""
        return len(self.lista)

    def __getitem__(self, index):
        """Return the row at ``index`` reshaped to 28x28."""
        return self.lista[index].reshape(28, 28)

# Wrap the test frame and preview one randomly chosen image.
test_dataset = custom_test_dataset(df_test.values)
sample_img = choice(test_dataset)
plt.imshow(sample_img)
plt.show()

In [None]:
def custom_collate_test_fn(batch):
    """Stack a list of 2-D image arrays into one float tensor with a channel axis.

    Each image becomes a ``torch.float`` tensor with a new leading dimension of
    size 1; the results are stacked along a new batch dimension.
    """
    channel_first = [
        torch.tensor(img_numpy, dtype=torch.float).unsqueeze(0)
        for img_numpy in batch
    ]
    return torch.stack(channel_first)

# Sanity check for custom_collate_test_fn: pull one shuffled batch of 2 and show its shape.
simple_test_dataloader = DataLoader(
    test_dataset,
    collate_fn=custom_collate_test_fn,
    batch_size=2,
    shuffle=True,
)
imgs_test_batch = next(iter(simple_test_dataloader))
print(imgs_test_batch.shape)

# Predicting the test dataset

In [None]:
# Rebuild the dataset here so this cell does not rely on the preview cell.
test_dataset = custom_test_dataset(df_test.values)
# No shuffling: predictions must stay in row order for the submission file.
# Use BATCH_SIZE rather than the batch_size=2 of the smoke test, so the whole
# test set is processed in far fewer forward passes.
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, collate_fn=custom_collate_test_fn)

# Eval mode disables dropout, so predictions are deterministic.
model.eval()

labels_pred = []

with torch.no_grad():
    for imgs_batch in test_dataloader:
        imgs_batch = imgs_batch.to(device)
        output = model(imgs_batch)
        # Predicted class = index of the largest probability in each row.
        labels_probabilitys = probabilitys(output)
        labels_pred.append(labels_probabilitys.argmax(1))

# Merge the per-batch predictions into a single NumPy array on the CPU.
labels_pred = torch.cat(labels_pred).to('cpu').numpy()

labels_pred

# Verifying the first 10 results

In [None]:
# Show the first ten test images in a 2x5 grid, each titled with its predicted label.
plt.figure(figsize=(18, 7))

for k in range(10):
    label_pred = labels_pred[k]
    img_numpy = test_dataset[k]  # the dataset already returns a 28x28 array
    plt.subplot(2, 5, k + 1)
    plt.imshow(img_numpy)
    plt.title(label_pred)
    plt.xticks([])

plt.show()

# Creating submission.csv file

In [None]:
# ImageId is 1-based and follows the order of the predictions.
submission_array = [
    [image_id, label]
    for image_id, label in enumerate(labels_pred, start=1)
]
submission_dataset = pd.DataFrame(submission_array, columns=['ImageId','Label'])

submission_dataset.to_csv('submission.csv', index=False)