오토인코더 만들기.

1. Dataset
    - CIFAR-10
2. 이미지를 8x8 혹은 4x4 블록으로 쪼개서 DCT 변환 후, DCT Coeff를 Output으로 내는 AutoEncoder.
3. 모델의 Output을 IDCT 해서 원래 이미지가 잘 나오는 지 확인

In [1]:
import os
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"  # 물리적 순서대로 정렬
os.environ["CUDA_VISIBLE_DEVICES"] = "6"        # 나는 6번 GPU만 보이게 하겠다!

import torch
print(f"현재 사용 가능한 GPU 개수: {torch.cuda.device_count()}")
print(f"현재 선택된 GPU 번호: {torch.cuda.current_device()}")
print(f"장치 이름: {torch.cuda.get_device_name(0)}")

현재 사용 가능한 GPU 개수: 1
현재 선택된 GPU 번호: 0
장치 이름: NVIDIA GeForce RTX 3090


In [2]:
print("오토인코더")

오토인코더


In [3]:
# import torch
# import torch.nn as nn
# import torch.optim as optim
# from torchvision import datasets, transforms
# import numpy as np
# from scipy.fftpack import dct, idct

# # ---------------------------
# # DCT / IDCT utilities
# # ---------------------------

# def dct2(block):
#     return dct(dct(block.T, norm='ortho').T, norm='ortho')

# def idct2(block):
#     return idct(idct(block.T, norm='ortho').T, norm='ortho')

# def blockify(img, block_size=8):
#     blocks = []
#     C, H, W = img.shape
#     for c in range(C):
#         for i in range(0, H, block_size):
#             for j in range(0, W, block_size):
#                 blocks.append(img[c, i:i+block_size, j:j+block_size])
#     return blocks

# def deblockify(blocks, img_shape, block_size=8):
#     C, H, W = img_shape
#     img = torch.zeros(img_shape)
#     idx = 0
#     for c in range(C):
#         for i in range(0, H, block_size):
#             for j in range(0, W, block_size):
#                 img[c, i:i+block_size, j:j+block_size] = blocks[idx]
#                 idx += 1
#     return img

# # ---------------------------
# # AutoEncoder
# # ---------------------------

# class DCTAutoEncoder(nn.Module):
#     def __init__(self, dim=64):
#         super().__init__()
#         self.encoder = nn.Sequential(
#             nn.Linear(dim, 32),
#             nn.ReLU()
#         )
#         self.decoder = nn.Sequential(
#             nn.Linear(32, dim)
#         )

#     def forward(self, x):
#         z = self.encoder(x)
#         out = self.decoder(z)
#         return out

# # ---------------------------
# # Dataset
# # ---------------------------

# transform = transforms.Compose([
#     transforms.ToTensor()
# ])

# dataset = datasets.CIFAR10(
#     root="./data",
#     train=True,
#     download=True,
#     transform=transform
# )

# loader = torch.utils.data.DataLoader(
#     dataset,
#     batch_size=1,
#     shuffle=True
# )

# # ---------------------------
# # Model / Optim
# # ---------------------------

# model = DCTAutoEncoder()
# criterion = nn.MSELoss()
# optimizer = optim.Adam(model.parameters(), lr=1e-3)

# # ---------------------------
# # Training (very small demo)
# # ---------------------------

# model.train()

# for epoch in range(1):
#     for img, _ in loader:
#         img = img.squeeze(0)  # [3,32,32]

#         blocks = blockify(img)
#         dct_blocks = []

#         for b in blocks:
#             b_np = b.numpy()
#             dct_b = dct2(b_np)
#             dct_blocks.append(torch.tensor(dct_b).flatten())

#         dct_blocks = torch.stack(dct_blocks)

#         output = model(dct_blocks)
#         loss = criterion(output, dct_blocks)

#         optimizer.zero_grad()
#         loss.backward()
#         optimizer.step()

#         print("loss:", loss.item())
#         break
#     break

# # ---------------------------
# # Reconstruction check
# # ---------------------------

# model.eval()
# with torch.no_grad():
#     img, _ = next(iter(loader))
#     img = img.squeeze(0)

#     blocks = blockify(img)
#     recon_blocks = []

#     for b in blocks:
#         dct_b = dct2(b.numpy())
#         dct_flat = torch.tensor(dct_b).flatten()
#         out = model(dct_flat)
#         out_block = out.view(8, 8).numpy()
#         recon = idct2(out_block)
#         recon_blocks.append(torch.tensor(recon))

#     recon_img = deblockify(recon_blocks, img.shape)

# print("Original min/max:", img.min().item(), img.max().item())
# print("Reconstructed min/max:", recon_img.min().item(), recon_img.max().item())


In [4]:
# dct_ae_cifar10.py

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms

import numpy as np
from scipy.fftpack import dct, idct
import matplotlib.pyplot as plt

# -----------------------------
# 1. DCT / IDCT utilities -> 데이터를 DCT 영역과 이미지 영역을 넘나들 수 있게 함.
# -----------------------------

def dct2(block):
    # 2D DCT (orthonormal)
    return dct(dct(block, axis=0, norm='ortho'), axis=1, norm='ortho')
    # ㄴ DCT를 하면서, block은 8*8 이고, axis가 0이면 DCT의 C행렬이 왼쪽에 붙고, axis가 1이면 오른쪽(이건 C의 전치)에 붙는다. 
    # norm='ortho'는 C전치C가 항등행렬임을 정의한다. 정규화 과정이다.
    # block 안에는 그냥 8*8 데이터 저장소일뿐, DCT 되기 전 데이터가 있을수도 있고, 그 후의 데이터가 있을 수도 있다. 
def idct2(block):
    # 2D IDCT (orthonormal)
    return idct(idct(block, axis=0, norm='ortho'), axis=1, norm='ortho')

def image_to_dct_blocks(img, block_size=8): # 블록을 8*8로 쪼개기. 
    """
    img: (H, W) numpy array -> 높이, 너비. 32*32 흑백 이미지
    return: (num_blocks, block_size*block_size) -> (16, 64)
    """
    H, W = img.shape
    blocks = [] # 8*8 블록 하나를 DCT 한 뒤, 1차원 64개로 플랫튼. 
    for i in range(0, H, block_size): # 0 8 16 24
        for j in range(0, W, block_size):
            block = img[i:i+block_size, j:j+block_size]
            blocks.append(dct2(block).flatten()) # 블록 하나를 행이 큰단위, 열이 작은단위로 64개 원소 폄. 그걸 리스트의 한 원소로 추가
    return np.stack(blocks) # (16, 64). 16개의 블록 펼쳐진게 쌓여있음. 리스트의 각 원소를 한 행으로 쌓음. 
                            # 사실 위랑 구조는 같은데, 연산 가능한 행렬로 바꾼 것임. 16행 64열 행렬. 

# 그냥 위의 역 과정임.
def dct_blocks_to_image(blocks, H, W, block_size=8):
    """
    blocks: (num_blocks, block_size*block_size)
    return: reconstructed image (H, W)
    """
    img = np.zeros((H, W))
    idx = 0
    for i in range(0, H, block_size):
        for j in range(0, W, block_size):
            block = blocks[idx].reshape(block_size, block_size)
            img[i:i+block_size, j:j+block_size] = idct2(block)
            idx += 1
    return img

# -----------------------------
# 2. AutoEncoder definition
# -----------------------------

class DCTAutoEncoder(nn.Module): # 블록간 간섭 없고, 완전히 독립적으로 압축, 복원. 64개 들어가서 64개 나옴.
    def __init__(self):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 16)
        )
        self.decoder = nn.Sequential(
            nn.Linear(16, 32), #정보가 강제로 사라졌다가 다시 추청한거라, 기존의 32차원의 정보를 완벽히 복원할 수 없음!!! *중요*
            nn.ReLU(),
            nn.Linear(32, 64)
        )

    def forward(self, x):
        z = self.encoder(x)
        out = self.decoder(z)
        return out

# -----------------------------
# 3. Dataset (CIFAR-10) -> 컬러 이미지 50,000장을 흑백으로 바꾸고, 연산하기 쉽게 변환. 
# -----------------------------

transform = transforms.Compose([
    transforms.Grayscale(),  # 단순화를 위해 grayscale. 처리하면 RGB가 흑백사진으로 바뀜. 현재 코드는 컬러로 복원하지 않음.
    transforms.ToTensor() # (1, 32, 32)로 변경되고, float32 [0, 1] 범위로 바뀜. 원래는 unit8 [0, 255] 임. 
])

dataset = torchvision.datasets.CIFAR10(
    root='./data',
    train=True,
    download=True,
    transform=transform
)

loader = torch.utils.data.DataLoader(
    dataset,
    batch_size=1,
    shuffle=True
)

# -----------------------------
# 4. Training setup
# -----------------------------

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = DCTAutoEncoder().to(device)
criterion = nn.MSELoss() # 기준은 Mean Squared Error
optimizer = optim.Adam(model.parameters(), lr=1e-3) # 인코더 디코더의 모든 파라미터를 최적화. 
# 옵티마이저는 다음 스텝에서 가중치를 고쳐줌. 모멘트를 고려하는건데, 이건 나중에 따로 학습하기. 

# -----------------------------
# 5. Training loop
# -----------------------------

epochs = 5 # 전체 이미지를 5바뀌 학습.
model.train()

for epoch in range(epochs):
    total_loss = 0.0 # loss 누적값인데, 학습이 진행되는지 확인용. 
    for img, _ in loader: # _는 라벨인데, AE에서는 사용 안 함. 
        img = img.squeeze().numpy()  # (32, 32) 원래는 (1, 1, 32, 32)였음. 

        dct_blocks = image_to_dct_blocks(img)  # (16, 64). 주파주 차원으로 옮김. 
        dct_blocks = torch.tensor(dct_blocks, dtype=torch.float32).to(device) # 텐서로 바꾸고, GPU에 올림. 

        optimizer.zero_grad()
        recon = model(dct_blocks)
        loss = criterion(recon, dct_blocks)
        loss.backward()
        optimizer.step()

        total_loss += loss.item() # <- 이거 바꿔서, 평균내거나 오차율 작게 하기. 

    print(f"[Epoch {epoch+1}] loss: {total_loss:.4f}")

# -----------------------------
# 6. Visualization (fixed image) -> 이미지로 복원하는 과정.
# -----------------------------

model.eval()
with torch.no_grad(): # 이 블록 안에서는 기울기 계산 안 함. 
    img, _ = dataset[0]
    img = img.squeeze().numpy() # 순수 흑백이미지 생성. 

    dct_blocks = image_to_dct_blocks(img) # 플랫튼 한 (16, 64) 행렬 생성. 
    dct_blocks_t = torch.tensor(dct_blocks, dtype=torch.float32).to(device) # 텐서로 바꾸고, GPU에 올림. 

    recon_blocks = model(dct_blocks_t).cpu().numpy() # 인코딩 디코딩을 모두 통과함. 블록이 출력됨.
    recon_img = dct_blocks_to_image(recon_blocks, 32, 32) # (16,64) 블록들을 다시 이미지로 복원. 

print("Original min/max:", img.min(), img.max()) # 이론적으로 값의 범위는 0~1 사이여야 함. by transforms.ToTensor()
print("Reconstructed min/max:", recon_img.min(), recon_img.max()) # DCT계수는 실수 전 범위. 

plt.figure(figsize=(6,3)) # 원본과 이미지를 보여주는 그림. 
plt.subplot(1,2,1)
plt.title("Original")
plt.imshow(img, cmap='gray')
plt.axis('off')

plt.subplot(1,2,2) # 복원이미지를 보여주는 그림. 
plt.title("Reconstructed")
plt.imshow(recon_img, cmap='gray')
plt.axis('off')

plt.tight_layout()
plt.show()


Files already downloaded and verified


KeyboardInterrupt: 