<a href="https://www.kaggle.com/code/kgxiao/pytorch-cnn?scriptVersionId=106211900" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

In [1]:
import pandas as pd
import numpy as np
import os
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
import datetime

from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from torch.utils.data.sampler import SubsetRandomSampler
import torch.optim as optim

In [2]:
#load data
train_set = pd.read_csv('../input/digit-recognizer/train.csv')
test_set = pd.read_csv('../input/digit-recognizer/test.csv')

In [3]:
#scale
train_set[:][1:] = train_set[:][1:].values / 255.0
test_set[:][:]= test_set[:][:].values / 255.0

In [4]:
valid_size = 0.1
data_nums = len(train_set)
indices = list(range(data_nums))

np.random.shuffle(indices)
splits = int(np.floor(valid_size * data_nums))
train_ind, valid_ind = indices[splits:], indices[:splits]

train_s = SubsetRandomSampler(train_ind)
valid_s = SubsetRandomSampler(valid_ind)

In [5]:
#dataloader
class Datamnist(Dataset):
    
    def __init__(self, data, transforms=None, labels=True):
        
        self.data = data
        self.transforms = transforms
        self.labels = labels
        
    def __len__(self):
        
        return len(self.data)
    
    def __getitem__(self, index):
        
        dataset = self.data.iloc[index]
        if self.labels:
            x = dataset[1:].values.astype(np.uint8).reshape((28, 28))
            y = dataset[0]
        
        else:
            x = dataset[0:].values.astype(np.uint8).reshape((28, 28))
            y = 0
        
        if self.transforms is not None:
            
            x = self.transforms(x)
        
        return x, y
            

In [6]:
transforms_dic = {'train':
               transforms.Compose([transforms.ToPILImage(),
               transforms.RandomAffine(8, translate=(0, 0.1), scale=(0.8, 1.2)),
               transforms.ToTensor()]),
               'valid':
               transforms.Compose([transforms.ToTensor()]),
               'test':
               transforms.Compose([transforms.ToTensor()])
              }

train = Datamnist(train_set, transforms_dic['train'])
valid = Datamnist(train_set, transforms_dic['valid'])
test = Datamnist(test_set, transforms_dic['test'], labels=False)

train_loader = DataLoader(train, batch_size=128, sampler=train_s)
valid_loader = DataLoader(valid, sampler=valid_s)
test_loader = DataLoader(test)

In [7]:
class Convmodel(nn.Module):
    
    def __init__(self, outputs=10, chanles=10):
        super().__init__()
        self.outputs = outputs
        self.chanles = chanles
        
        self.dr = nn.Dropout2d(p=0.3, inplace=True)
        self.cov1 = nn.Conv2d(1, self.chanles*1, 3) #outs:  26*26
        self.bn1 = nn.BatchNorm2d(self.chanles*1)
        self.cov2 = nn.Conv2d(self.chanles*1, self.chanles*2, 3) #outs: 24*24
        self.bn2 = nn.BatchNorm2d(self.chanles*2)
        
        self.cov3 = nn.Conv2d(self.chanles*2, self.chanles*3, 5) #outs: 20*20
        self.bn3 = nn.BatchNorm2d(self.chanles*3)
        self.cov4 = nn.Conv2d(self.chanles*3, self.chanles*4, 5) #outs: 16*16
        self.bn4 = nn.BatchNorm2d(self.chanles*4)
        
        self.p1 = nn.MaxPool2d(4) #outs:4*4
        self.l1 = nn.Linear(self.chanles*4*4*4,128)
        self.l2 = nn.Linear(128,64)
        self.bn5 = nn.BatchNorm1d(64)
        self.l3 = nn.Linear(64,10)
        
    def forward(self, x):
        out = self.bn1(F.relu(self.cov1(x))) #26 * 26
        out = self.bn2(F.relu(self.cov2(out))) #24 * 24
        out = self.dr(self.bn3(F.relu(self.cov3(out)))) #20*20
        out = self.dr(self.p1(self.bn4(F.relu(self.cov4(out))))) #4*4*chanles*4
        out = out.view(-1,self.chanles*4*4*4)
        out = self.dr(self.bn5(self.l2(self.l1(out))))
        out = self.l3(out)
        
        return out

In [8]:
def write_info(file, info:str, statue=True):
    
    if statue:
        print(info)

    file.write(info)
    file.write('\n')
    return None

In [9]:
class Runmodel:
    
    def __init__(self, traindata, validdata, epochs=20, lr=0.01):
        
        self.epochs = epochs
        self.traindata = traindata
        self.validdata = validdata
        self.lr = lr
        self.model = self.model_init()
        self.loss = self.model_loss()
        self.optim = torch.optim.Adam(self.model.parameters(), lr=self.lr)
        
        if not os.path.exists('pytorch_model.txt'):
            f=open('pytorch_model.txt',"w")
            f.write(f"{'*'*10}Model train and valid infomation.{'*'*10}\n")
            f.close()
        
    def model_init(self):
        
        return Convmodel()
    
        
    def train(self):
        #train
        self.model.train()
        n_total_correct = 0.0
        n_total_samples = 0.0
        
        #valid metrics for all epochs
        n_total_valid_acc = 0.0
        n_total_valid_sam = 0.0
        files = open('pytorch_model.txt', mode='a+')
        write_info(files, f"epochs: {self.epochs}, learning_rate: {self.lr}")
        
        time_sta = datetime.datetime.now()
        
        for epoch in range(self.epochs):
            
            n_samples = 0.0
            n_correct = 0.0
            
            for i, (data, label) in enumerate(self.traindata):
                outputs = self.model(data)
                loss = self.loss(outputs, label)
                self.optim.zero_grad()
                loss.backward()
                self.optim.step()
                
                _, predicted = outputs.max(1)
                n_correct += (predicted == label).sum().item()
                n_samples += label.shape[0]
                
                n_total_correct += (predicted == label).sum().item()
                n_total_samples += label.shape[0]
                                
                if (i+1) % 50 == 0:
                    i_time = datetime.datetime.now()
                    info = f'time:[{i_time-time_sta}], train epoch:[{epoch+1} / {self.epochs}], sample proportion:[{100*(n_samples/(len(self.traindata.dataset))):.2f}%], current Loss:[{loss.item():.6f}]'
                    write_info(files, info)
            
            epoch_time = datetime.datetime.now()
            write_info(files, f'epoch time:[{epoch_time-time_sta}], train sets acc: [{100*(n_correct/n_samples):.2f}%]')
            
            #valid acc
            n_valid_correct, n_valid_sam = self.valid(files)   #epoch correct for valid
            n_total_valid_acc += n_valid_correct
            n_total_valid_sam += n_valid_sam
        
        all_time = datetime.datetime.now()
        write_info(files, f'====All time:[{all_time - time_sta}]====')
        write_info(files, f'Arain total Acc: [{100*(n_total_correct/n_total_samples):.2f}%]')
        write_info(files, f'Valid total Acc: [{100*(n_total_valid_acc/n_total_valid_sam):.2f}%]')
        
        files.close()
        
    def valid(self,files):
        #valida data
        with torch.no_grad():
            self.model.eval()
            n_total_correct = 0.0
            n_total_samples = 0.0
            
            for i, (data, label) in enumerate(self.validdata):
                
                outputs = self.model(data)
                _, predicted = outputs.max(1)
                
                n_total_correct += (predicted == label).sum().item()
                n_total_samples += label.shape[0]
            
            write_info(files, f'>>>>valid sets acc:[{100*(n_total_correct/n_total_samples):.2f}%]')
            
            return n_total_correct, n_total_samples
        
    
        
    def predict(self, testdata):
        #predict for test
        with torch.no_grad():
            self.model.eval()
            test_pred = torch.LongTensor()
            for data, _ in testdata:
                
                outs = self.model(data)
                pred = outs.max(1, keepdim=True)[1]
                test_pred = torch.cat((test_pred, pred), dim=0)
                
            out_df = pd.DataFrame(np.c_[np.arange(1, len(testdata)+1)[:,None], test_pred.numpy()], 
                      columns=['ImageId', 'Label'])
            
        return out_df.to_csv('submission.csv', index=False)
    
    def model_loss(self):
        #loss func
        criterion = nn.CrossEntropyLoss()
        return criterion
    

In [10]:
Trainmodel = Runmodel(train_loader, valid_loader)
Trainmodel.train()

epochs: 20, learning_rate: 0.01




time:[0:00:09.731232], train epoch:[1 / 20], sample proportion:[15.24%], current Loss:[0.925504]
time:[0:00:19.430580], train epoch:[1 / 20], sample proportion:[30.48%], current Loss:[0.896117]
time:[0:00:28.893119], train epoch:[1 / 20], sample proportion:[45.71%], current Loss:[0.755088]
time:[0:00:38.403462], train epoch:[1 / 20], sample proportion:[60.95%], current Loss:[0.880313]
time:[0:00:48.208780], train epoch:[1 / 20], sample proportion:[76.19%], current Loss:[0.812696]
epoch time:[0:00:56.908752], train sets acc: [64.79%]
>>>>valid sets acc:[96.40%]
time:[0:01:12.668774], train epoch:[2 / 20], sample proportion:[15.24%], current Loss:[0.148873]
time:[0:01:21.823225], train epoch:[2 / 20], sample proportion:[30.48%], current Loss:[0.143837]
time:[0:01:30.920479], train epoch:[2 / 20], sample proportion:[45.71%], current Loss:[0.067621]
time:[0:01:40.063547], train epoch:[2 / 20], sample proportion:[60.95%], current Loss:[0.122642]
time:[0:01:49.326372], train epoch:[2 / 20], 

In [11]:
Trainmodel.predict(test_loader)
print('ok')

ok
