In [None]:
import os
import os.path
import time 
import gzip
import pickle

import numpy as np

import matplotlib.pyplot as plt
import matplotlib.cm as cm

import torch
from torch import nn
from torch import optim
import torch.nn.functional as F
from torch.utils.data import TensorDataset
from torch.utils.data import DataLoader


In [None]:
def _load_img(file_name):
    """Read a gzip-compressed image archive into a 2-D uint8 NumPy array.

    The archive is looked up inside the module-level ``dataset_dir``. The
    first 16 bytes of the decompressed stream are skipped (presumably the
    IDX header -- verify against the file format) and the remainder is
    interpreted as unsigned bytes, one row of ``img_size`` values per image.

    Args:
        file_name: Name of the ``.gz`` file located in ``dataset_dir``.

    Returns:
        ``np.uint8`` array reshaped to ``(-1, img_size)``.
    """
    archive_path = "/".join((dataset_dir, file_name))

    print(f"Converting {file_name} to NumPy Array ...")
    with gzip.open(archive_path, 'rb') as archive:
        raw_bytes = archive.read()
    # offset=16 drops the leading header bytes before the pixel payload.
    pixels = np.frombuffer(raw_bytes, dtype=np.uint8, offset=16)
    pixels = pixels.reshape(-1, img_size)
    print("Done")

    return pixels

In [None]:
def _load_label(file_name):
    """Read a gzip-compressed label archive into a 1-D uint8 NumPy array.

    The archive is looked up inside the module-level ``dataset_dir``. The
    first 8 bytes of the decompressed stream are skipped (presumably the
    IDX header -- verify against the file format); every remaining byte is
    returned as one label value.

    Args:
        file_name: Name of the ``.gz`` file located in ``dataset_dir``.

    Returns:
        Flat ``np.uint8`` array of labels.
    """
    archive_path = "/".join((dataset_dir, file_name))

    print(f"Converting {file_name} to NumPy Array ...")
    with gzip.open(archive_path, 'rb') as archive:
        raw_bytes = archive.read()
    # offset=8 drops the leading header bytes before the label payload.
    label_values = np.frombuffer(raw_bytes, dtype=np.uint8, offset=8)
    print("Done")

    return label_values

In [None]:
# Archive file name for each dataset entry; the key suffix (_img / _label)
# decides which loader is used below.
key_file = dict(
    train_img='train-images-idx3-ubyte.gz',
    train_label='train-labels-idx1-ubyte.gz',
    test_img='t10k-images-idx3-ubyte.gz',
    test_label='t10k-labels-idx1-ubyte.gz',
)

# Directory holding the archives, and the path of the pickled copy.
dataset_dir = "mnist"
save_file = dataset_dir + "/mnist.pkl"

# Dataset constants. Only img_size is read by the loaders in the visible
# notebook; the others are kept for reference.
train_num = 60000
test_num = 10000
img_dim = (1, 28, 28)
img_size = 784

# Load the four archives in key_file order: train images, train labels,
# test images, test labels.
dataset = {
    entry: (_load_img if entry.endswith('_img') else _load_label)(archive_name)
    for entry, archive_name in key_file.items()
}

# Persist the raw uint8 arrays (before normalisation) with the highest
# available pickle protocol.
with open(save_file, 'wb') as f:
    pickle.dump(dataset, f, protocol=pickle.HIGHEST_PROTOCOL)

# Convert the image arrays to float32 and scale pixel values into [0, 1].
for key in ('train_img', 'test_img'):
    dataset[key] = dataset[key].astype(np.float32) / np.float32(255.0)

In [None]:
# Preview training sample 2000 as a 28x28 picture using the binary colormap.
plt.imshow(
    np.reshape(dataset['train_img'][2000], (28, 28)),
    cmap=cm.binary,
)
plt.show()

In [None]:
# Convert the NumPy arrays to tensors.
# Images were already cast to float32 during normalisation, so they keep that
# dtype. Labels were read as uint8; they are cast to int64 here because
# F.cross_entropy expects class-index targets as Long tensors (uint8 targets
# are rejected by PyTorch builds that require Long).
x_train, x_test = (
    torch.tensor(dataset[name]) for name in ('train_img', 'test_img')
)
y_train, y_test = (
    torch.tensor(dataset[name], dtype=torch.long)
    for name in ('train_label', 'test_label')
)

# Sanity checks: number/size of training images and the label value range.
print(x_train.shape)
print(y_train.min(), y_train.max())

In [None]:
# Mini-batch size used to build the loaders below.
batch_size = 100
# NOTE(review): n_epochs is not passed to the visible fit(...) call, which
# uses a literal epoch count instead -- confirm which value is intended.
n_epochs = 20

# Pair inputs with targets for each split.
train_ds = TensorDataset(x_train, y_train)
valid_ds = TensorDataset(x_test, y_test)

# Training batches are reshuffled every epoch; validation runs without
# shuffling and with twice the batch size.
train_dl = DataLoader(train_ds, shuffle=True, batch_size=batch_size)
valid_dl = DataLoader(valid_ds, batch_size=2 * batch_size)

In [None]:
class Mnist_CNN(nn.Module):
    """Small all-convolutional classifier for 28x28 single-channel images.

    Three 3x3 convolutions with stride 2 and padding 1 (channels
    1 -> 16 -> 16 -> 10), each followed by ReLU, shrink the spatial size
    28 -> 14 -> 7 -> 4. A 4x4 average pool then leaves one value per output
    channel, giving 10 scores per sample.
    """

    def __init__(self):
        super().__init__()
        conv_kwargs = dict(kernel_size=3, stride=2, padding=1)
        self.conv1 = nn.Conv2d(1, 16, **conv_kwargs)
        self.conv2 = nn.Conv2d(16, 16, **conv_kwargs)
        self.conv3 = nn.Conv2d(16, 10, **conv_kwargs)

    def forward(self, xb):
        """Return a ``(batch, 10)`` tensor of class scores for ``xb``.

        ``xb`` is first viewed as ``(-1, 1, 28, 28)``, so flat 784-element
        rows are accepted.
        """
        feats = xb.view(-1, 1, 28, 28)
        for conv in (self.conv1, self.conv2, self.conv3):
            feats = F.relu(conv(feats))
        pooled = F.avg_pool2d(feats, 4)
        # Drop the trailing 1x1 spatial dims: one row of channel scores per sample.
        return pooled.view(-1, pooled.size(1))


# Learning rate and loss function used for training.
lr = 0.1
loss_func = F.cross_entropy

In [None]:
def fit(epochs, model, loss_func, opt, train_dl, valid_dl):
    """Train ``model`` for ``epochs`` epochs, printing the validation loss.

    Each epoch runs one optimisation step per batch of ``train_dl`` (via
    ``loss_batch`` with the optimizer), then evaluates every batch of
    ``valid_dl`` without gradient tracking and prints the epoch index with
    the size-weighted mean validation loss.

    Args:
        epochs: Number of passes over ``train_dl``.
        model: Module to train; switched between train and eval mode.
        loss_func: Callable forwarded to ``loss_batch``.
        opt: Optimizer forwarded to ``loss_batch`` for training batches.
        train_dl: Iterable of ``(xb, yb)`` training batches.
        valid_dl: Iterable of ``(xb, yb)`` validation batches.
    """
    for epoch_idx in range(epochs):
        # Training pass: loss_batch performs backward/step when given opt.
        model.train()
        for train_x, train_y in train_dl:
            loss_batch(model, loss_func, train_x, train_y, opt)

        # Validation pass: no optimizer, no autograd bookkeeping.
        model.eval()
        with torch.no_grad():
            batch_stats = [
                loss_batch(model, loss_func, val_x, val_y)
                for val_x, val_y in valid_dl
            ]
            losses, nums = zip(*batch_stats)

        # Weight each batch loss by its size so a smaller last batch is
        # not over-represented in the mean.
        weighted_total = np.sum(np.multiply(losses, nums))
        val_loss = weighted_total / np.sum(nums)

        print(epoch_idx, val_loss)

In [None]:
# Use the CUDA device when one is available, otherwise fall back to the CPU.
dev = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# Display whether PyTorch reports CUDA as available (shown as the cell's output).
torch.cuda.is_available()

In [None]:
def loss_batch(model, loss_func, xb, yb, opt=None):
    """Compute the loss for one batch and optionally take an optimizer step.

    Both tensors are moved to the module-level device ``dev`` before the
    forward pass.

    Args:
        model: Callable producing predictions for ``xb``.
        loss_func: Callable ``loss_func(predictions, targets)`` returning a
            scalar tensor.
        xb: Batch of inputs.
        yb: Batch of targets.
        opt: Optional optimizer. When given, gradients are back-propagated,
            one step is taken, and the gradients are cleared afterwards.

    Returns:
        Tuple ``(loss_value, batch_len)``: the loss as a Python number and
        ``len(xb)``.
    """
    inputs, targets = xb.to(dev), yb.to(dev)
    loss = loss_func(model(inputs), targets)

    training_step = opt is not None
    if training_step:
        loss.backward()
        opt.step()
        # Clear gradients so the next batch starts from zero.
        opt.zero_grad()

    batch_len = len(xb)
    return loss.item(), batch_len

In [None]:
# Build the CNN, move it to the selected device, and optimise it with
# SGD (momentum 0.9) at the configured learning rate.
gpu_model = Mnist_CNN()
gpu_model.to(dev)
opt = optim.SGD(gpu_model.parameters(), lr=lr, momentum=0.9)

# NOTE(review): this reassignment does not change the batch size used by the
# training below -- train_dl / valid_dl were already constructed with the
# earlier batch_size value. Rebuild the loaders if 50 is actually intended.
batch_size = 50

# Measure elapsed training time with perf_counter(): unlike time.time() it is
# monotonic, so system clock adjustments cannot distort the interval.
gpu_start = time.perf_counter()

fit(30, gpu_model, loss_func, opt, train_dl, valid_dl)

gpu_training_time = time.perf_counter() - gpu_start
print("gpu training time : ", round(gpu_training_time, 2))

In [None]:
from sklearn.metrics import confusion_matrix, f1_score

# Run prediction on the CPU: x_test was never moved to another device, so the
# model is brought to the CPU to match it.
predict_dev = torch.device("cpu")
gpu_model.to(predict_dev)

# Inference only: set eval mode explicitly and disable autograd so no
# computation graph is kept alive for the whole test-set forward pass.
gpu_model.eval()
with torch.no_grad():
    y_pred = gpu_model(x_test)

In [None]:
# True labels for the test split (NumPy array).
label = dataset['test_label']

# Predicted class per sample = index of the largest score in each row.
# A single vectorised argmax replaces the per-row loop, which converted the
# whole prediction tensor to NumPy on every iteration and hard-coded the
# number of test samples.
predict_result = list(np.argmax(y_pred.detach().numpy(), axis=1))

# average=None yields one F1 score per class; their plain mean is printed too.
f1_result = f1_score(label, predict_result, average=None)
print(f1_result)
print(np.mean(f1_result))

In [None]:
# Display the confusion matrix of the true test labels (`label`, first argument)
# against the predicted classes (`predict_result`) as the cell's output.
confusion_matrix(label, predict_result)