In [1]:
import torch
import numpy as np
import matplotlib.pyplot as plt

from torch import nn
from torch import optim
from torchvision import utils
from torch.nn import functional as F
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

seed = 42
torch.manual_seed(seed=seed)
torch.set_float32_matmul_precision("high")

if device.type == 'cuda':
    torch.cuda.manual_seed_all(seed=seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

In [3]:
epochs = 10
batch_size = 64
learning_rate = 0.001
latent_dim = 32

In [4]:
train_dataset = datasets.MNIST(root='/home/pervinco/Datasets/MNIST', train=True, transform=transforms.ToTensor(), download=True)
test_dataset = datasets.MNIST(root='/home/pervinco/Datasets/MNIST', train=False, transform=transforms.ToTensor(), download=True)

train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

In [5]:
class Encoder(nn.Module):
    def __init__(self, latent_dim: int=20):
        super().__init__()

        self.fc1 = nn.Linear(784, 256)
        self.fc2 = nn.Linear(256, 128)
        self.fc_mean = nn.Linear(128, latent_dim)

    def forward(self, x: torch.Tensor):
        x = x.view(x.size(0), -1)
        x = F.leaky_relu(self.fc1(x), negative_slope=0.2)
        x = F.leaky_relu(self.fc2(x), negative_slope=0.2)
        z = F.leaky_relu(self.fc_mean(x)) ## fc_mean은 prior distribution인 정규분포의 평균을 추정하는 것이 인코더의 목적임을 의미함.

        return z
    
class Decoder(nn.Module):
    def __init__(self, latent_dim: int=20):
        super().__init__()

        self.fc1 = nn.Linear(latent_dim, 128)
        self.fc2 = nn.Linear(128, 256)
        self.fc3 = nn.Linear(256, 784)

    def forward(self, z: torch.Tensor):
        z = F.leaky_relu(self.fc1(z), negative_slope=0.2)
        z = F.leaky_relu(self.fc2(z), negative_slope=0.2)

        x_hat = F.sigmoid(self.fc3(z)) ## Sigmoid 함수로 [0, 1]로 분포를 조절. --> gray scale 이미지니까.
        x_hat = x_hat.view(x_hat.size(0), 1, 28, 28) ## 원본 이미지와 같은 [28 X 28 X 1]의 형상으로 수정.

        return x_hat
    
class Autoencoder(nn.Module):
    def __init__(self, encoder=Encoder, decoder=Decoder, latent_dim: int=20):
        super().__init__()
        self.encoder = encoder(latent_dim=latent_dim)
        self.decoder = decoder(latent_dim=latent_dim)

        self.latent_dim = latent_dim

    def forward(self, x: torch.Tensor):
        z = self.encode(x)
        x_hat = self.decode(z)
        return z, x_hat

    def encode(self, x: torch.Tensor):
        return self.encoder(x)

    def decode(self, z: torch.Tensor):
        return self.decoder(z)

In [6]:
# 저장된 손실 함수를 시각화합니다.
def visualize_loss(epochs, train_loss, valid_loss):
    # x축 계산
    x_len = np.arange(epochs)

    plt.plot(x_len, train_loss, marker=".", c="blue", label="Train Loss")
    plt.plot(x_len, valid_loss, marker=".", c="red", label="Valid Loss")
    # 그래프에 대한 레이블 정보를 우측 상단에 출력합니다.
    plt.legend(loc='upper right')
    plt.grid()
    # x축과 y축에 대한 레이블을 설정합니다.
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.show()

In [7]:
autoencoder = Autoencoder(latent_dim=latent_dim).to(device)

# recon_loss = nn.MSELoss(reduction='sum')
recon_loss = nn.BCELoss(reduction='sum')
optimizer = optim.Adam(params=list(autoencoder.parameters()), lr=learning_rate)

In [9]:
history = dict(train_loss=[], valid_loss=[])

print(f"Training start with {epochs} epochs.")
for epoch in range(1, 1 + epochs):
    
    train_size = 0
    train_epoch_loss = 0
    autoencoder.train()
    for i, batch in enumerate(train_loader):
        image, label = batch
        # image = image.to(device)
        image = image.float().to(device)


        train_size += label.size(0)
        latent = autoencoder.encode(image)
        reconstruction_image = autoencoder.decode(latent)
        reconstruction_image = reconstruction_image.float().to(device)

        optimizer.zero_grad()
        train_loss = recon_loss(image, reconstruction_image)
        train_loss.backward()
        optimizer.step()
        train_epoch_loss += train_loss

    test_size = 0
    test_epoch_loss = 0
    autoencoder.eval()
    with torch.no_grad():
        for i, batch in enumerate(test_loader):
            image, label = batch
            image = image.to(device)
            test_size += label.size(0)

            latent = autoencoder.encode(image)
            reconstruction_image = autoencoder.decode(latent)

            test_loss = recon_loss(image, reconstruction_image)
            test_epoch_loss += test_loss

    train_epoch_loss /= train_size
    test_epoch_loss /= test_size

    history["train_loss"].append(train_epoch_loss.item())
    history["valid_loss"].append(test_epoch_loss.item())

    print(f"[{epoch:02d}/{epochs:02d}]"
        + f"Train BCE Loss = {train_epoch_loss:.4f}, "
        + f"Valid BCE Loss = {test_epoch_loss:.4f}")

Training start with 10 epochs.


/opt/conda/conda-bld/pytorch_1712608885084/work/aten/src/ATen/native/cuda/Loss.cu:95: operator(): block: [94,0,0], thread: [32,0,0] Assertion `target_val >= zero && target_val <= one` failed.
/opt/conda/conda-bld/pytorch_1712608885084/work/aten/src/ATen/native/cuda/Loss.cu:95: operator(): block: [94,0,0], thread: [33,0,0] Assertion `target_val >= zero && target_val <= one` failed.
/opt/conda/conda-bld/pytorch_1712608885084/work/aten/src/ATen/native/cuda/Loss.cu:95: operator(): block: [94,0,0], thread: [34,0,0] Assertion `target_val >= zero && target_val <= one` failed.
/opt/conda/conda-bld/pytorch_1712608885084/work/aten/src/ATen/native/cuda/Loss.cu:95: operator(): block: [94,0,0], thread: [35,0,0] Assertion `target_val >= zero && target_val <= one` failed.
/opt/conda/conda-bld/pytorch_1712608885084/work/aten/src/ATen/native/cuda/Loss.cu:95: operator(): block: [94,0,0], thread: [36,0,0] Assertion `target_val >= zero && target_val <= one` failed.
/opt/conda/conda-bld/pytorch_17126088850

RuntimeError: CUDA error: device-side assert triggered
CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1.
Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.


In [None]:
visualize_loss(epochs, history["train_loss"], history["valid_loss"])

In [None]:
select = 0
autoencoder.eval()
with torch.no_grad():
    for i, batch in enumerate(test_loader):
        if i != select:
            continue
        else:
            test_image, test_label = batch
            test_image = test_image.to(device)
            test_image = test_image.view(batch_size, 1 * 28 * 28)

            z = autoencoder.encode(test_image)
            reconstruction = autoencoder.decode(z)
            break

print("데이터 배치 이미지입니다.")
plt.imshow(utils.make_grid(test_image[:64].cpu().view(-1, 1, 28, 28)).permute(1, 2, 0))
plt.show()

print("복원된 이미지입니다.")
plt.imshow(utils.make_grid(reconstruction[:64].cpu()[:64].view(-1, 1, 28, 28)).permute(1, 2, 0))
plt.show()