<a href="https://colab.research.google.com/github/music-ai-644/AI_Study_2022/blob/main/week8_quiz_mnist.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import matplotlib.pyplot as plt 
import tensorflow as tf
import numpy as np 
from argparse import ArgumentError

from tqdm import tqdm

from torch.utils.data import Dataset, DataLoader

import torch 
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

In [None]:
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()

In [None]:
fig = plt.figure(figsize=(15, 4))

for i in range(10):
    plottable_image = np.reshape(x_train[i], (28, 28))
    ax = fig.add_subplot(2, 5, i+1)
    ax.imshow(plottable_image, cmap='gray_r')

* **Parameters**

In [None]:
# data specific
input_shape = 28*28
num_classes = 10

# Hyperparameters
BATCH_SIZE = 64
EPOCHS = 10
learning_rate = 1e-3
NUM_LAYERS = 6 # Input + hidden layer + Output layer
HIDDEN_SIZES = [input_shape, 512, 256, 128, 64, 32, num_classes] # PLEASE change only intermediate numbers of hidden sizes. 

* **Preprocess**

In [None]:
def MinMax_Scaler(np_train, np_test): 
    train_min = np.min(np_train)
    train_max = np.max(np_train)
    
    scaled_train = (np_train - train_min) / train_max
    scaled_test = (np_test - train_min) / train_max
    
    return scaled_train, scaled_test
    
def to_categorical(np_labels): # must be numpy array indicating indices of classes.
    '''
    One-hot encoding
    '''
    return np.eye(np.max(np_labels) + 1, dtype='float')[np_labels]

# normalize pixel 0 ~ 255 to 0 ~ 1
x_train = x_train / 255
x_test = x_test / 255

# minmax scaling 
x_train, x_test = MinMax_Scaler(x_train, x_test)

# one-hot encoding
y_train = to_categorical(y_train)
y_test = to_categorical(y_test)

# Resize for PyTorch Model
x_train, x_test = np.expand_dims(x_train, axis=1), np.expand_dims(x_test, axis=1)


* **DataLoader**

In [None]:
class MNISTDataset(Dataset):
    def __init__(self, x, y):
        super().__init__()
        self.x_data = x
        self.y_data = y
        
    def __getitem__(self, index):
        x_item = self.x_data[index]
        y_item = self.y_data[index]
        return x_item, y_item
    
    def __len__(self):
        return len(self.x_data)
    
train_loader = DataLoader(MNISTDataset(x_train, y_train), batch_size=BATCH_SIZE, shuffle=True, drop_last=True)
test_loader = DataLoader(MNISTDataset(x_test, y_test), batch_size=BATCH_SIZE, shuffle=True, drop_last=True)

* **Model**

In [None]:
USE_CUDA = torch.cuda.is_available()
DEVICE = torch.device("cuda" if USE_CUDA else "cpu")

class DNN(nn.Module):
    def __init__(self, input_shape=28*28, num_layers=3, hidden_sizes=[3, 3, 3], num_classes=10):
        super().__init__()
        
        if num_layers + 1 != len(hidden_sizes):
            raise ArgumentError("Check number of layers and hidden sizes you want.")
        
        # Initialize
        self.input_shape = input_shape
        self.num_layers = num_layers
        self.hidden_sizes = hidden_sizes
        self.num_classes = num_classes
        
        # Construct Neural Networks
        self.DNN = self._make_layer()
        
    def forward(self, x):
        output = self.DNN(x)
        return F.log_softmax(output, dim = 1)
        
    def _make_layer(self):
        layers = []
        
        layers.append(nn.Flatten())

        for idx in range(0, self.num_layers):
            ANN = nn.Sequential(
                nn.Linear(self.hidden_sizes[idx], self.hidden_sizes[idx + 1]),
                nn.ReLU()
            )
            layers.append(ANN)
            
        # last layer: No ReLU()
        last_ANN = nn.Sequential(
                nn.Linear(self.hidden_sizes[self.num_layers], self.num_classes)
            )
        layers.append(last_ANN)
        
        return nn.Sequential(*layers) # unpacking list
    
model = DNN(input_shape = input_shape, num_layers=NUM_LAYERS, hidden_sizes=HIDDEN_SIZES, num_classes=num_classes)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
criterion = nn.CrossEntropyLoss()

In [None]:
def train(model, train_loader):
    model.train()

    for idx, batch in enumerate(train_loader):
        x_train, y_train = batch
        x_train, y_train = x_train.to(DEVICE, dtype=torch.float), y_train.to(DEVICE, dtype=torch.float)

        output = model(x_train)
        loss = criterion(output, y_train)

        optimizer.zero_grad()
        model.zero_grad()

        loss.backward()
        optimizer.step()

In [None]:
def evaluate(model, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    
    with torch.no_grad():
        for idx, batch in enumerate(test_loader):
            x_test, y_test = batch
            x_test, y_test = x_test.to(DEVICE, dtype=torch.float), y_test.to(DEVICE, dtype=torch.float)

            output = model(x_test)
            test_loss += criterion(output, y_test)
            
            pred = output.argmax(dim=1, keepdim=True)
            y_test = y_test.argmax(dim=1, keepdim=True)
            correct += pred.eq(y_test.view_as(pred)).sum().item()
    test_loss /= len(test_loader.dataset)
    test_acc = 100. * correct / len(test_loader.dataset)
        
    # print the result
    print("Loss: {:.4f}, Accuracy: {}%".format(test_loss, test_acc))

In [None]:
print("Training the model...")
for epoch in tqdm(range(1, EPOCHS + 1), desc='EPOCHS'):
    train(model, train_loader)
    
print("TRAIN IS SUCCESSFULLY DONE.")
evaluate(model, test_loader)