In [1]:
''' 1. Module Import '''
import numpy as np
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import transforms, datasets

In [2]:
''' 2. 딥러닝 모델을 설계할 때 활용하는 장비 확인 '''
if torch.cuda.is_available():
    DEVICE = torch.device('cuda')
else:
    DEVICE = torch.device('cpu')
print('Using PyTorch version:', torch.__version__, ' Device:', DEVICE)

Using PyTorch version: 1.9.0+cpu  Device: cpu


In [3]:
BATCH_SIZE = 32
EPOCHS = 10

In [4]:
''' 3. FashionMNIST 데이터 다운로드 (Train set, Test set 분리하기) '''
train_dataset = datasets.FashionMNIST(root = "/data/FashionMNIST",
                                      train = True,
                                      download = True,
                                      transform = transforms.ToTensor())

test_dataset = datasets.FashionMNIST(root = "/data/FashionMNIST",
                                     train = False,
                                     transform = transforms.ToTensor())

train_loader = torch.utils.data.DataLoader(dataset = train_dataset, # train_dataset을 불러들여
                                           batch_size = BATCH_SIZE, # BATCH_SIZE 정리
                                           shuffle = True)

test_loader = torch.utils.data.DataLoader(dataset = test_dataset,
                                          batch_size = BATCH_SIZE,
                                          shuffle = False)

  return torch.from_numpy(parsed.astype(m[2], copy=False)).view(*s)


In [5]:
''' 4. 데이터 확인하기 (1) '''
for (X_train, y_train) in train_loader:
    print('X_train:', X_train.size(), 'type:', X_train.type())
    print('y_train:', y_train.size(), 'type:', y_train.type())
    break

X_train: torch.Size([32, 1, 28, 28]) type: torch.FloatTensor
y_train: torch.Size([32]) type: torch.LongTensor


In [6]:
# ''' 5. 데이터 확인하기 (2) '''
# pltsize = 1
# plt.figure(figsize=(10 * pltsize, pltsize))
# for i in range(10):
#     plt.subplot(1, 10, i + 1)
#     plt.axis('off')
#     plt.imshow(X_train[i, :, :, :].numpy().reshape(28, 28), cmap = "gray_r")
#     plt.title('Class: ' + str(y_train[i].item()))

In [7]:
''' 6. AutoEncoder (AE) 모델 설계하기 '''
class AE(nn.Module):
    def __init__(self):
        super(AE, self).__init__()
        
        self.encoder = nn.Sequential(      # Sequential, 인코더 정의
            nn.Linear(28 * 28, 512),       # 784 Input -> 512 layer
            nn.ReLU(),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, 32),)           # 256 -> 32로 
        
        self.decoder = nn.Sequential(      # 디코더 정의
            nn.Linear(32, 256),            # 32 -> 256 으로 디코더 
            nn.ReLU(),
            nn.Linear(256, 512),
            nn.ReLU(),
            nn.Linear(512, 28 * 28),)

    def forward(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return encoded, decoded           # 인코더와 디코더 둘다 반환

In [8]:
''' 7. Optimizer, Objective Function 설정하기 '''

model = AE().to(DEVICE)
optimizer = torch.optim.Adam(model.parameters(), lr = 0.001) # Adam으로 학습 진행
criterion = nn.MSELoss()              # Loss function은 MSELoss

print(model)

AE(
  (encoder): Sequential(
    (0): Linear(in_features=784, out_features=512, bias=True)
    (1): ReLU()
    (2): Linear(in_features=512, out_features=256, bias=True)
    (3): ReLU()
    (4): Linear(in_features=256, out_features=32, bias=True)
  )
  (decoder): Sequential(
    (0): Linear(in_features=32, out_features=256, bias=True)
    (1): ReLU()
    (2): Linear(in_features=256, out_features=512, bias=True)
    (3): ReLU()
    (4): Linear(in_features=512, out_features=784, bias=True)
  )
)


In [9]:
''' 8. AE 모델 학습을 진행하며 학습 데이터에 대한 모델 성능을 확인하는 함수 정의 '''
def train(model, train_loader, optimizer, log_interval):
    model.train()           # 학습을 진행모드로 변경
    for batch_idx, (image, _) in enumerate(train_loader):           # batch와 image 반환(여기서 정답값이 없으므로 (, _)
        image = image.view(-1, 28 * 28).to(DEVICE)                  # 2차원에서 1차원으로 변경, 이미지(인풋)
        target = image.view(-1, 28 * 28).to(DEVICE)                 # 2차원에서 1차원으로 변경, 타겟값(아웃풋 비교용)
        optimizer.zero_grad()                  # 파라미터 초기화!
        encoded, decoded = model(image)        # 인코더와 디코더 반환
        loss = criterion(decoded, target)      # 디코더(출력값) <> 타겟값(아웃풋 비교용)
        loss.backward()                        # 역전파하여 loss 최소화
        optimizer.step()                       # 파라미터 업데이트

        if batch_idx % log_interval == 0:
            print("Train Epoch: {} [{}/{} ({:.0f}%)]\tTrain Loss: {:.6f}".format(
                epoch, batch_idx * len(image), 
                len(train_loader.dataset), 100. * batch_idx / len(train_loader), 
                loss.item()))

In [10]:
''' 9. 학습되는 과정 속에서 검증 데이터에 대한 모델 성능을 확인하는 함수 정의 '''
def evaluate(model, test_loader):
    model.eval()                        # 평가 검증 모드 변환
    test_loss = 0                       # loss 초기화 
    real_image = []                     # 실제 이미지 리스트 
    gen_image = []                      # 생성 이미지 리스트
    with torch.no_grad():               # 업데이트 현상 방지 (Gradient 흐름 억제)
        for image, _ in test_loader:    # train과 똑같음
            image = image.view(-1, 28 * 28).to(DEVICE)
            target = image.view(-1, 28 * 28).to(DEVICE)
            encoded, decoded = model(image)
            
            test_loss += criterion(decoded, image).item()
            real_image.append(image.to("cpu"))
            gen_image.append(decoded.to("cpu"))
            
    test_loss /= (len(test_loader.dataset) / BATCH_SIZE) # 평균 Loss 값 반환

    return test_loss, real_image, gen_image

In [11]:
''' 10. AutoEncoder 학습 실행하며 Test set의 Reconstruction Error 확인하기 '''
for epoch in range(1, EPOCHS + 1):
    train(model, train_loader, optimizer, log_interval = 200)
    test_loss, real_image, gen_image = evaluate(model, test_loader)
    print("\n[EPOCH: {}], \tTest Loss: {:.4f}".format(epoch, test_loss))
#     f, a = plt.subplots(2, 10, figsize = (10, 4))
#     for i in range(10):
#         img = np.reshape(real_image[0][i], (28, 28))
#         a[0][i].imshow(img, cmap = "gray_r")
#         a[0][i].set_xticks(())
#         a[0][i].set_yticks(())
    
#     for i in range(10):
#         img = np.reshape(gen_image[0][i], (28, 28))
#         a[1][i].imshow(img, cmap = "gray_r")
#         a[1][i].set_xticks(())
#         a[1][i].set_yticks(())
#      plt.show() # 그림 생성 시 커널이 죽음.


[EPOCH: 1], 	Test Loss: 0.0153

[EPOCH: 2], 	Test Loss: 0.0130

[EPOCH: 3], 	Test Loss: 0.0118

[EPOCH: 4], 	Test Loss: 0.0114

[EPOCH: 5], 	Test Loss: 0.0110

[EPOCH: 6], 	Test Loss: 0.0108

[EPOCH: 7], 	Test Loss: 0.0104

[EPOCH: 8], 	Test Loss: 0.0102

[EPOCH: 9], 	Test Loss: 0.0101

[EPOCH: 10], 	Test Loss: 0.0099
