<a href="https://www.kaggle.com/code/fogdiffusion/mnist-mobilenet-v3-earlystopping-pytorch?scriptVersionId=91897681" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

In [None]:
import os

# Print the full path of every file found below the Kaggle input directory.
for root, _subdirs, names in os.walk('/kaggle/input'):
    for name in names:
        print(os.path.join(root, name))

In [None]:
# import package
import os
import random
from glob import glob
from warnings import filterwarnings

import argparse

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split

import cv2
from PIL import Image

import torch
from torchvision import datasets, transforms, models
from torch.utils.data import Dataset, DataLoader
from torch import nn, optim

from collections import defaultdict

filterwarnings('ignore') 

In [None]:
# Seed Python's, NumPy's and PyTorch's (CPU and CUDA) random number
# generators with the same fixed value so that runs are repeatable.
random.seed(1234)
np.random.seed(1234)
torch.manual_seed(1234)
torch.cuda.manual_seed(1234)

In [None]:
# Choose the compute device, plus the extra DataLoader options that only
# make sense when a GPU is present.
if torch.cuda.is_available():
    device = 'cuda'
    kwargs = {'num_workers': 1, 'pin_memory': True}
else:
    device = 'cpu'
    kwargs = {}
print(f'device：{device}')

In [None]:
# Notebook-wide constants.

# Architecture identifier; also used to build the checkpoint file name.
model_name = 'mobilenet_v3_large'

# Passed to the torchvision model constructor to request pretrained weights.
pretrained = True

In [None]:
# Hyperparameters, declared as (flag, type, default) rows and registered in
# the order listed.
parser = argparse.ArgumentParser()
for _flag, _type, _default in (
    ('--test_size', float, 0.3),   # fraction of the data held out for validation
    ('--image_size', int, 28),
    ('--num_classes', int, 10),
    ('--epochs', int, 100),
    ('--batch_size', int, 16),
    ('--lr', float, 1e-4),         # learning rate
    ('--patience', int, 10),       # early-stopping patience
):
    parser.add_argument(_flag, type=_type, default=_default)

# Parse an explicit empty argument list so that only the defaults are used
# and the process's own command line is not consulted.
opt = parser.parse_args(args=[])
print(opt)

# Dataset, Dataloader

In [None]:
# Load the labelled competition data.
train_df = pd.read_csv('/kaggle/input/digit-recognizer/train.csv')
print(train_df.head())
print('number of data: ', len(train_df))

# Hold out a validation subset, stratified on the label column.
train, val = train_test_split(
    train_df,
    test_size=opt.test_size,
    stratify=train_df['label'],
    random_state=42,
)
print(train.head())
print('number of train: ', len(train))
print('number of val: ', len(val))

In [None]:
# Per-phase preprocessing pipelines, keyed by phase name.
transform = {
    # Training: random affine augmentation (rotation, shift, zoom) applied
    # to the PIL image before tensor conversion and normalization.
    'train': transforms.Compose([
        transforms.ToPILImage(),
        transforms.RandomAffine(
            degrees=20,
            translate=(0.1, 0.1),
            scale=(0.9, 1.1),
        ),
        transforms.ToTensor(),
        transforms.Normalize(mean=(0.5,), std=(0.5,)),
    ]),
    # Validation: conversion and normalization only, no augmentation.
    'val': transforms.Compose([
        transforms.ToPILImage(),
        transforms.ToTensor(),
        transforms.Normalize(mean=(0.5,), std=(0.5,)),
    ]),
}

In [None]:
# Dataset
class MNISTDataset(Dataset):
    """Dataset over a DataFrame that stores one flattened square image per row.

    Each row must hold the label in its first column, followed by
    ``image_size * image_size`` pixel values.

    Parameters:
        df: DataFrame with the label in column 0 and the pixel values after it.
        transform: mapping from phase name to a callable that receives the
            ``uint8`` image array; when None the raw array is returned.
        phase: key selecting which entry of ``transform`` is applied.
        image_size(int): side length in pixels of the square image (default: 28).
    """

    def __init__(self, df, transform=None, phase=None, image_size=28):
        self.df = df
        self.transform = transform
        self.phase = phase
        self.image_size = image_size

    def __len__(self):
        """Return the number of rows (samples) in the DataFrame."""
        return len(self.df)

    def __getitem__(self, index):
        """Return ``(image, label)`` for the row at position ``index``."""
        row = self.df.iloc[index]

        # Use explicit positional access: the row is indexed by column name,
        # and integer keys on such a Series (`row[0]`) are deprecated in pandas.
        pixels = row.iloc[1:].values
        image = pixels.reshape((self.image_size, self.image_size)).astype(np.uint8)
        if self.transform is not None:
            image = self.transform[self.phase](image)

        label = row.iloc[0]

        return image, label

In [None]:
# One dataset per phase; both share the transform dict and select their
# pipeline through the phase name.
train_dataset = MNISTDataset(train, transform, 'train')
val_dataset = MNISTDataset(val, transform, 'val')

In [None]:
# Dataloader
# `kwargs` (worker count and pinned memory when CUDA is available, empty
# otherwise) was prepared for the loaders but never passed; forward it here.
# Only the training loader shuffles.
dataloader = {
    'train': DataLoader(train_dataset, batch_size=opt.batch_size, shuffle=True, **kwargs),
    'val': DataLoader(val_dataset, batch_size=opt.batch_size, shuffle=False, **kwargs),
}

In [None]:
# check images: pull a single batch from the training loader
train_iter = iter(dataloader['train'])
# Use the built-in next(); DataLoader iterators in current PyTorch releases
# have no `.next()` method.
imgs, labels = next(train_iter)
print(imgs.size())
print(labels)

# display first image (drop the channel dimension for matplotlib)
img = imgs[0].reshape((28,28))
plt.imshow(img, cmap='gray')
# the printed prefix is Japanese for "label"
print('ラベル', np.array(labels[0]))

# EarlyStopping Class

In [None]:
class EarlyStopping:
    '''
    Signal when a monitored loss has stopped improving.

    The object remembers the lowest loss it has been given. Each call that
    does not match or beat that loss increments a counter, and any call that
    does resets it. Once the counter exceeds ``patience``, the call returns
    True.
    '''

    def __init__(self, patience=10, verbose=0):
        '''
        Parameters:
            patience(int): number of consecutive non-improving epochs that
                           are tolerated before stopping (default: 10)
            verbose(int): output flag for early termination
                          output(1), don't output(0)
        '''
        # number of consecutive epochs without improvement
        self.epoch = 0
        # lowest loss seen so far; infinity so that the first real loss counts
        # as an improvement
        self.pre_loss = float('inf')
        self.patience = patience
        self.verbose = verbose

    def __call__(self, current_loss):
        '''
        Parameters:
            current_loss(float): loss of validation data after 1 epoch
        Return:
            True: if the loss has failed to improve for more than
                  ``patience`` consecutive calls (training should stop)
            False: otherwise (training should continue)
        '''
        # Written as "<=" on the improving side so that a NaN loss (for which
        # every comparison is False) counts as "no improvement" instead of
        # overwriting the best loss and disabling early stopping for good.
        improved = current_loss <= self.pre_loss

        if improved:
            self.epoch = 0
            self.pre_loss = current_loss
            return False

        self.epoch += 1
        if self.epoch > self.patience:
            if self.verbose:
                print('early stopping')
            return True

        # worse than the best loss, but still within the tolerated number of epochs
        return False

# Model

In [None]:
model = models.mobilenet_v3_large(pretrained=pretrained)

# Accept single-channel input. The replacement convolution is newly
# initialized, so this layer does not carry over any pretrained weights.
model.features[0][0] = nn.Conv2d(1, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False) # change input channel

# Replace the last classifier layer in place. Assigning a new `model.fc`
# attribute has no effect on the output because MobileNetV3's forward pass
# ends at `model.classifier`; the network would keep emitting 1000 logits.
fc_in_features = model.classifier[3].in_features # input width of the final layer
model.classifier[3] = nn.Linear(fc_in_features, opt.num_classes) # change final layer
print(model)

# model to device
model.to(device)

In [None]:
# Optimizer over all model parameters, and the classification loss it minimizes.
optimizer = torch.optim.Adam(model.parameters(), lr=opt.lr)
criterion = torch.nn.CrossEntropyLoss()

In [None]:
# Per-epoch loss and accuracy records, one list per phase and metric.
history = {
    'train_loss': [],
    'train_accuracy': [],
    'val_loss': [],
    'val_accuracy': [],
}

# Early-stopping monitor; verbose=1 makes it print a message when it triggers.
ers = EarlyStopping(opt.patience, verbose=1)


# define training model
def train_model(model, epochs, loss_fn, optimizer):
    '''
    Train and validate ``model`` for at most ``epochs`` epochs.

    Every epoch runs a 'train' pass followed by a 'val' pass over the
    module-level ``dataloader`` dict. The per-phase loss and accuracy are
    appended to the module-level ``history`` dict. The model is saved each
    time the validation loss reaches a new minimum. The loop ends early when
    the module-level ``ers`` object returns True for the validation loss.

    Parameters:
        model: network to train; batches are moved to ``device`` but the
               model itself is not moved here
        epochs(int): maximum number of epochs
        loss_fn: callable ``loss_fn(outputs, labels)`` returning a scalar loss tensor
        optimizer: optimizer that updates the parameters of ``model``
    '''
    # lowest validation loss seen so far
    best_loss = np.inf

    for epoch in range(epochs):

        print(f'Epoch: {epoch+1} / {epochs}')
        print('--------------------------')

        for phase in ['train', 'val']:
            is_train = phase == 'train'

            # switch layers such as dropout / batch norm to the right mode
            if is_train:
                model.train()
            else:
                model.eval()

            # running sum of per-sample losses and number of correct predictions
            epoch_loss = 0.0
            corrects = 0

            for images, labels in dataloader[phase]:

                # move the batch to the same device as the model
                images = images.to(device)
                labels = labels.to(device)

                # track gradients only while training
                with torch.set_grad_enabled(is_train):
                    outputs = model(images)
                    loss = loss_fn(outputs, labels)
                    preds = torch.argmax(outputs, dim=1)

                    if is_train:
                        # clear old gradients, back-propagate, update parameters
                        optimizer.zero_grad()
                        loss.backward()
                        optimizer.step()

                # Weight the batch loss by the batch size so that the epoch
                # average below is taken over samples, not over batches.
                epoch_loss += loss.item() * images.size(0)
                corrects += torch.sum(preds == labels)

            # average loss per sample within this epoch
            epoch_loss = epoch_loss / len(dataloader[phase].dataset)

            # fraction of correctly classified samples
            accuracy = corrects.double() / len(dataloader[phase].dataset)
            accuracy = accuracy.to('cpu').detach().numpy().copy() # Tensor → Numpy

            history[f'{phase}_loss'].append(epoch_loss)
            history[f'{phase}_accuracy'].append(accuracy)

            print(f'{phase} Loss: {epoch_loss:.4f} Accuracy: {accuracy:.4f}')

            # save the model whenever the validation loss reaches a new minimum
            if (phase == 'val') and (epoch_loss < best_loss):
                best_loss = epoch_loss
                param_name = f'/kaggle/working/{model_name}.pth'
                # NOTE: this saves the whole module object, not just its state_dict
                torch.save(model, param_name)

        # 'val' is the last phase of the loop above, so `epoch_loss` holds the
        # validation loss here; stop training when the monitor says so.
        if (phase == 'val') and ers(epoch_loss):
            break

In [None]:
# Run the training loop with the hyperparameters, loss and optimizer defined above.
train_model(
    model=model,
    epochs=opt.epochs,
    loss_fn=criterion,
    optimizer=optimizer,
)