In [None]:
import numpy as np
import matplotlib.pyplot as plt
import math
import cv2
from PIL import Image
import glob
import random

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset
from torchvision.transforms import ToTensor
import torch.optim as optim
from torch.utils.data import DataLoader

from google.colab import drive
drive.mount('/content/drive')


In [None]:
# 학습 데이터 압축 해제
import os
import zipfile

zip_path = '/content/drive/MyDrive/Colab Notebooks/2025_1 딥러닝/new_COCO1.zip'

zip_ref = zipfile.ZipFile(zip_path, 'r')
zip_ref.extractall('/dataset')
zip_ref.close()

In [None]:
# psnr, mse 계산 함수
# psnr을 계산하기 위해 파이토치 텐서를 넘파이로 변환하는 과정 필요
def compute_psnr(img1,img2):
    # pytorch 내장 gpu연산을 통해 연산 속도. gpu상에서 텐서 연산을 바로 진행
    # gpu->cpu 복사후 numpy로 처리하는 것보다 훨씬 속도가 빠름
    mse = torch.mean((img1 - img2)**2) #mse 계산식
    psnr = 20 * torch.log10(255.0 / torch.sqrt(mse)) #psnr 계산식 using numpy log10 and sqrt
    return psnr

In [None]:
from torchvision import transforms


class RandomPatchDataset(Dataset):
    """
    - 원본 512×512 이미지에서 “무작위 Patch”를 미리 샘플링해 두고,
      학습 중에는 Patch 좌표만 꺼내서 Upsample·평균화·Concatenate 연산만 수행하도록 최적화.

    - K×K 크기의 256×256 패치 M개 → 512×512로 upsample → 평균화  → up256_agg
    - K×K 크기의 512×512 패치 M개 → 512×512로 upsample → 평균화  → up512_agg
    - plus, 128×128 → 512×512 global upsample → up128

    inp = [ up128, up256_agg, up512_agg ]  => (채널 개수 = 3 + 3 + 3 = 9) × 512 × 512

    - 메모리 소모: “(x_256, y_256) 좌표 M개” + “(x_512, y_512) 좌표 M개”만 미리 캐시 → 매우 작음.
    - 불필요한 feature_cache(idx→(inp,hr))를 제거하여 RAM 절약.
    """

    def __init__(self, img_dir: str, K: int, cache_images: bool = True):
        """
        img_dir      : 원본 512×512 이미지 폴더 경로
        K            : patch 한 변의 길이 (픽셀)
        cache_images : True면 PIL.Image를 메모리에 캐시 (권장 False → 메모리 절약)
        """
        super().__init__()
        assert 16384 % (K * K) == 0, "16384 must be divisible by K*K"
        self.K = K
        self.M = 16384 // (K * K)

        # 1) 이미지 파일 경로 리스트
        self.paths = sorted(glob.glob(os.path.join(img_dir, '*.png')))
        # 2) PIL.Image 캐시 여부 (cache_images=False 권장)
        self.cache_images = cache_images
        self.pil_cache = {} if cache_images else None

        # 3) 미리 샘플링한 “256 해상도 무작위 Patch 좌표” / “512 해상도 무작위 Patch 좌표”
        #    각각의 key: 이미지 경로, value: [(x1,y1), (x2,y2), ..., (xM,yM)]
        self.patch256_coords = {}  # for 256×256 패치
        self.patch512_coords = {}  # for 512×512 패치

        self.to_tensor = transforms.ToTensor()

        # 4) 초기화 시점에만 한 번, 모든 이미지 당 M개의 랜덤 좌표 샘플링
        self._sample_patch_coords_all_images()

    def __len__(self):
        return len(self.paths)

    def __getitem__(self, idx):
        path = self.paths[idx]

        # ── 1) PIL 이미지 로드 & 캐시 처리
        if self.pil_cache is not None and path in self.pil_cache:
            pil_img = self.pil_cache[path]
        else:
            pil_img = Image.open(path).convert('RGB')
            if self.pil_cache is not None:
                self.pil_cache[path] = pil_img

        # ── 2) HR 타깃 (512×512) → Tensor
        hr = self.to_tensor(pil_img)

        # ── 3) Global Up-sample: 128×128 → 512×512
        lr128 = pil_img.resize((128, 128), Image.BICUBIC)
        up128 = F.interpolate(
            self.to_tensor(lr128).unsqueeze(0),
            size=(512, 512),
            mode='bicubic'
        ).squeeze(0)  # 결과 shape: [3, 512, 512]

        # ── 4) 미리 샘플링된 좌표 (256×256 해상도) 꺼내오기
        coords256 = self.patch256_coords[path]  # [(x1,y1), ..., (xM,yM)]
        coords512 = self.patch512_coords[path]  # [(u1,v1), ..., (uM,vM)]

        # ── 5) Accumulator: upsample → 모아서 평균 (256 패치)
        lr256_full = pil_img.resize((256, 256), Image.BICUBIC)
        acc256 = []
        for (x256, y256) in coords256:
            p256 = lr256_full.crop((x256, y256, x256 + self.K, y256 + self.K))
            t256 = self.to_tensor(p256).unsqueeze(0)  # [1,3,K,K]
            up256 = F.interpolate(t256, size=(512, 512), mode='bicubic')  # [1,3,512,512]
            acc256.append(up256)
        # torch.cat → [M, 3, 512, 512], 평균 → [3,512,512]
        up256_agg = torch.mean(torch.cat(acc256, dim=0), dim=0)

        # ── 6) Accumulator: upsample → 모아서 평균 (512 패치)
        acc512 = []
        for (x512, y512) in coords512:
            p512 = pil_img.crop((x512, y512, x512 + self.K, y512 + self.K))
            t512 = self.to_tensor(p512).unsqueeze(0)  # [1,3,K,K]
            up512 = F.interpolate(t512, size=(512, 512), mode='bicubic')  # [1,3,512,512]
            acc512.append(up512)
        up512_agg = torch.mean(torch.cat(acc512, dim=0), dim=0)

        # ── 7) 세 가지 채널( up128, up256_agg, up512_agg )을 concat → 최종 입력
        inp = torch.cat([up128, up256_agg, up512_agg], dim=0)  # shape: [9,512,512]

        return inp, hr

    def clear_cache(self):
        """
        (Optional) Epoch 단위로 PIL 캐시만 지우고 싶으면 호출.
        patch256_coords / patch512_coords는 유지합니다.
        """
        if self.pil_cache is not None:
            self.pil_cache.clear()

    def resample_patch_coords(self):
        """
        매 Epoch마다 ‘완전히 새로운 Random patch’를 다시
        샘플링하고 싶을 때 호출하는 함수.
        """
        self._sample_patch_coords_all_images()

    def _sample_patch_coords_all_images(self):
        """
        1) 모든 이미지에 대해
           - 256×256 해상도에서 K×K 랜덤 Patch 좌표 M개
           - 512×512 해상도에서 K×K 랜덤 Patch 좌표 M개
          를 미리 뽑아서 self.patch256_coords, self.patch512_coords에 저장.
        2) 이 과정은 __init__에서 한 번 호출되며, 이후
           resample_patch_coords()를 통해 재호출 가능.
        """
        # 필요한 변수
        H256, W256 = 256, 256
        H512, W512 = 512, 512

        for path in self.paths:
            # 1) 256×256 해상도 Patch 좌표 M개 무작위 샘플링
            coords256 = []
            for _ in range(self.M):
                x256 = np.random.randint(0, W256 - self.K + 1)
                y256 = np.random.randint(0, H256 - self.K + 1)
                coords256.append((x256, y256))
            self.patch256_coords[path] = coords256

            # 2) 512×512 해상도 Patch 좌표 M개 무작위 샘플링
            coords512 = []
            for _ in range(self.M):
                x512 = np.random.randint(0, W512 - self.K + 1)
                y512 = np.random.randint(0, H512 - self.K + 1)
                coords512.append((x512, y512))
            self.patch512_coords[path] = coords512



In [None]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print("device : ", device)


In [None]:
# 이미지 복원 네트워크
class MyVersionSRCNN(nn.Module):
    def __init__(self, in_channels, out_channels=3):
        super(MyVersionSRCNN, self).__init__()
        self.layers = nn.Sequential(
            # 1) Depthwise Conv 5×5 방법-파라미터, 연산량을 모두 감소
            nn.Conv2d(in_channels, in_channels, kernel_size=5, padding=2, groups=in_channels),
            nn.Conv2d(in_channels, in_channels, kernel_size=5, padding=2, groups=in_channels),
            # 2) Pointwise Conv 1×1 → 64채널
            nn.Conv2d(in_channels, 64, kernel_size=1),
            nn.ReLU(inplace=True),

            nn.Conv2d(64, 32, kernel_size=1),         # dimensionality reduction
            nn.ReLU(inplace=True),

            nn.Conv2d(32, out_channels, kernel_size=5, padding=2)  # reconstruction
        )

    def forward(self, x):
        # x[:, :3] 은 global up128 의 첫 3채널
        res = self.layers(x) # 잔차 학습(Residual Learning)---학습속도 계선
        return  (res + x[:, :3, :, :]).clamp(0,1)# output range [0,1]

# 선택가능 패치 개수 : M = 16384 // (K*K) = 64, so in_ch = 3 + 6*64 = 387
model1 = MyVersionSRCNN(in_channels=9).to(device)

In [None]:
# 2) Basic SRCNN (adapted for 9-channel input)
class BasicSRCNN(nn.Module):
    def __init__(self, in_channels=9, out_channels=3):
        super(BasicSRCNN, self).__init__()
        self.net = nn.Sequential(
            nn.Conv2d(in_channels, 64, kernel_size=9, padding=4),  # feature extraction
            nn.ReLU(inplace=True),
            nn.Conv2d(64, 32, kernel_size=1),                       # non-linear mapping
            nn.ReLU(inplace=True),
            nn.Conv2d(32, out_channels, kernel_size=5, padding=2)   # reconstruction
        )

    def forward(self, x):
        return self.net(x).clamp(0, 1)

model2 = BasicSRCNN(in_channels=9).to(device)


In [None]:
# 3) Residual SRCNN
class ResidualSRCNN(nn.Module):
    def __init__(self, in_channels=9, out_channels=3):
        super(ResidualSRCNN, self).__init__()
        self.net = nn.Sequential(
            nn.Conv2d(in_channels, 64, kernel_size=9, padding=4),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, 32, kernel_size=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(32, out_channels, kernel_size=5, padding=2)
        )

    def forward(self, x):
        res = self.net(x)
        # add global up128 (first 3 channels) as residual connection
        return (res + x[:, :3, :, :]).clamp(0, 1)

model3 = ResidualSRCNN(in_channels=9).to(device)


In [None]:
# 4) Depthwise-Separable SRCNN
class DepthwiseSRCNN(nn.Module):
    def __init__(self, in_channels=9, out_channels=3):
        super(DepthwiseSRCNN, self).__init__()
        self.net = nn.Sequential(
            # depthwise 9x9
            nn.Conv2d(in_channels, in_channels, kernel_size=9, padding=4, groups=in_channels),
            # pointwise 1x1
            nn.Conv2d(in_channels, 64, kernel_size=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, 32, kernel_size=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(32, out_channels, kernel_size=5, padding=2)
        )

    def forward(self, x):
        return self.net(x).clamp(0, 1)

model4 = DepthwiseSRCNN(in_channels=9).to(device)


In [None]:
from torchsummary import summary
summary(model1, (9, 512, 512)) # MyVersionSRCNN
summary(model2, (9, 512, 512)) # BasicSRCNN
summary(model3, (9, 512, 512)) # ResidualSRCNN
summary(model4, (9, 512, 512)) # DepthwiseSRCNN

In [None]:
print("CPU 코어 수:", os.cpu_count())

In [None]:
# Initialize dataset
train_dataset = RandomPatchDataset(img_dir='/dataset/new_COCO1/Train',K=16)
train_dataloader = DataLoader(train_dataset, batch_size=16, shuffle=True, num_workers=1, pin_memory=True, persistent_workers=True, prefetch_factor=2)
test_dataset = RandomPatchDataset(img_dir='/dataset/new_COCO1/Test', K=16)
test_dataloader = DataLoader(test_dataset, batch_size=16, shuffle=False, num_workers=1, pin_memory=True, persistent_workers=True, prefetch_factor=2)


In [None]:
# Dataset shape 확인
print(f"Train dataset : {len(train_dataset)}")
print(f"Test dataset : {len(test_dataset)}")

sample_input, sample_output = train_dataset[0]
print(f"Train dataset input shape(3개의 scale의 rgb이미지): {sample_input.shape}")
print(f"Train dataset output shape(복원된 이미지): {sample_output.shape}")

sample_input, sample_output = test_dataset[0]
print(f"Test dataset input shape: {sample_input.shape}")
print(f"Test dataset output shape: {sample_output.shape}")

In [None]:
# 하이퍼파라미터 설정
learning_rate = 1e-3 # 학습률
criterion = nn.MSELoss() # loss fuction
num_epochs = 10 # number of epochs

# 1번모델========================================================
# optimizer
optimizer1 = optim.Adam(model1.parameters(), lr=learning_rate)

# shceduler(learning rate 조절)
scheduler1 = optim.lr_scheduler.StepLR(optimizer1, step_size=10, gamma=0.1) # 10 epoch마다 0.1배

# 2번모델==========================================================
# optimizer
optimizer2 = optim.Adam(model2.parameters(), lr=learning_rate)

# shceduler(learning rate 조절)
scheduler2 = optim.lr_scheduler.StepLR(optimizer2, step_size=10, gamma=0.1) # 10 epoch마다 0.1배

# 3번모델==========================================================
# optimizer
optimizer3 = optim.Adam(model3.parameters(), lr=learning_rate)

# shceduler(learning rate 조절)
scheduler3 = optim.lr_scheduler.StepLR(optimizer3, step_size=10, gamma=0.1) # 10 epoch마다 0.1배

# 4번모델==========================================================
# optimizer
optimizer4 = optim.Adam(model4.parameters(), lr=learning_rate)

# shceduler(learning rate 조절)
scheduler4 = optim.lr_scheduler.StepLR(optimizer4, step_size=10, gamma=0.1) # 10 epoch마다 0.1배


In [None]:
import time
from torch.cuda.amp import autocast, GradScaler

def train(model, device, train_loader, optimizer, criterion, scheduler, num_epochs, time_limit_hours=10):
    start_time = time.time()
    train_loss = []
    train_psnr = []

    for epoch in range(num_epochs):
        train_dataset.clear_cache() # epoch마다 캐시 초기화
        model.train()
        epoch_loss = 0
        epoch_psnr = 0
        num_batches = len(train_loader)
        scaler = GradScaler()
        for input_tensor, target in train_loader:
            input_tensor = input_tensor.to(device)  # [B, 9, 512, 512]
            target = target.to(device)              # [B, 3, 512, 512]

            optimizer.zero_grad()
            with autocast():
                output = model(input_tensor)
                loss = criterion(output, target)
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

            # PSNR 계산
            psnr = compute_psnr(output, target)

            epoch_loss += loss.item()
            epoch_psnr += psnr.item()

        scheduler.step()

        avg_loss = epoch_loss / num_batches
        avg_psnr = epoch_psnr / num_batches
        train_loss.append(avg_loss)
        train_psnr.append(avg_psnr)
        print(f"[Epoch {epoch+1}/{num_epochs}] Loss: {avg_loss:.4f}, Avg PSNR: {avg_psnr:.2f} dB")

        # 시간 제한 확인
        elapsed_time = time.time() - start_time
        if elapsed_time > time_limit_hours * 3600:
            print("Training stopped: 10-hour limit reached.")
            break

    return model, train_loss, train_psnr


In [None]:
print('1번 모델 훈련 : MyVersionSRCNN')
train_model1, train_loss1, train_psnr1 = train(model1, device, train_dataloader, optimizer1, criterion, scheduler1, num_epochs)


In [None]:
import torch
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image

def display_random_patch(dataset, index, K):
    """
    랜덤으로 추출한 KxK 패치와 원본 이미지를 출력합니다.
    패치로 선택되지 않은 영역은 검정색으로 처리합니다.

    Args:
        dataset (RandomPatchDataset): RandomPatchDataset 객체.
        index (int): 데이터셋에서 이미지를 선택할 인덱스.
        K (int): 패치의 한 변의 길이 (픽셀).
    """
    path = dataset.paths[index]
    pil_img = Image.open(path).convert('RGB')
    img_np = np.array(pil_img)

    # 미리 샘플링된 좌표 (256×256 해상도) 꺼내오기
    coords256 = dataset.patch256_coords[path]

    # 미리 샘플링된 좌표 (512×512 해상도) 꺼내오기
    coords512 = dataset.patch512_coords[path]

    # 256 이미지에 대한 마스크 생성 및 패치 영역 표시
    img_256_masked = np.zeros_like(img_np)
    lr256_full = pil_img.resize((256, 256), Image.BICUBIC)
    lr256_np = np.array(lr256_full)

    for (x256, y256) in coords256:
        # 256 해상도에서 패치 영역
        patch256 = lr256_np[y256 : y256 + K, x256 : x256 + K, :]
        # 512 해상도로 확대하여 원래 이미지 위치에 표시 (간단한 방법)
        # 실제 interpolate와는 다를 수 있지만, 시각화 목적
        x512 = x256 * 2
        y512 = y256 * 2
        # 확대된 패치 영역 (간단한 반복 사용)
        for r in range(K):
            for c in range(K):
                # 2x2 블록으로 확대하여 512 이미지에 표시
                img_256_masked[y512 + r*2 : y512 + r*2 + 2, x512 + c*2 : x512 + c*2 + 2, :] = patch256[r, c, :]

    # 512 이미지에 대한 마스크 생성 및 패치 영역 표시
    img_512_masked = np.zeros_like(img_np)
    for (x512, y512) in coords512:
        # 512 해상도에서 패치 영역
        patch512 = img_np[y512 : y512 + K, x512 : x512 + K, :]
        # 512 이미지에 표시
        img_512_masked[y512 : y512 + K, x512 : x512 + K, :] = patch512


    # 이미지 출력
    plt.figure(figsize=(15, 5))

    plt.subplot(1, 3, 1)
    plt.imshow(img_np)
    plt.title('Original Image (512x512)')
    plt.axis('off')

    plt.subplot(1, 3, 2)
    plt.imshow(img_256_masked)
    plt.title(f'Patches from 256 (K={K}, M={dataset.M})')
    plt.axis('off')

    plt.subplot(1, 3, 3)
    plt.imshow(img_512_masked)
    plt.title(f'Patches from 512 (K={K}, M={dataset.M})')
    plt.axis('off')

    plt.show()

# train_dataset에서 첫 번째 이미지에 대해 K=16으로 패치 시각화
display_random_patch(train_dataset, 0, 16)

In [None]:

import matplotlib.pyplot as plt
import numpy as np
# 테스트 함수 (GPU 사용)
def test_model(model, device, test_loader):
    model.eval()  # evaluation mode
    test_psnr = 0
    num_batches = len(test_loader)

    with torch.no_grad():  # Disable gradient calculation
        for input_tensor, target in test_loader:
            input_tensor = input_tensor.to(device)
            target = target.to(device)

            output = model(input_tensor)

            # PSNR 계산
            psnr = compute_psnr(output, target)
            test_psnr += psnr.item()

    avg_psnr = test_psnr / num_batches
    print(f"Test Avg PSNR: {avg_psnr:.2f} dB")
    return avg_psnr


def display_comparison(model, device, dataset, index):
    """
    원본 이미지와 복원된 이미지를 비교하여 출력합니다.

    Args:
        model (torch.nn.Module): 복원 모델.
        device (str): 'cuda' 또는 'cpu'.
        dataset (RandomPatchDataset): 데이터셋 객체.
        index (int): 데이터셋에서 이미지를 선택할 인덱스.
    """
    model.eval()  # 모델을 평가 모드로 설정

    # 데이터셋에서 하나의 샘플 가져오기
    input_tensor, target_tensor = dataset[index]

    # 모델 입력 준비 (배치 차원 추가, 디바이스 이동)
    input_tensor = input_tensor.unsqueeze(0).to(device) # [1, 9, 512, 512]
    target_tensor = target_tensor.to(device) # [3, 512, 512]

    with torch.no_grad():
        # 모델 추론
        restored_tensor = model(input_tensor).squeeze(0) # [3, 512, 512]

    # Tensor를 NumPy 이미지로 변환
    target_img_np = target_tensor.cpu().permute(1, 2, 0).numpy() # [H, W, C]
    restored_img_np = restored_tensor.cpu().permute(1, 2, 0).numpy() # [H, W, C]

    # 0-1 범위의 float 이미지를 0-255 범위의 uint8로 변환 (시각화용)
    target_img_np = (target_img_np * 255).astype(np.uint8)
    restored_img_np = (restored_img_np * 255).astype(np.uint8)


    # 이미지 출력
    plt.figure(figsize=(10, 5))

    plt.subplot(1, 2, 1)
    plt.imshow(target_img_np)
    plt.title('Original Image (HR)')
    plt.axis('off')

    plt.subplot(1, 2, 2)
    plt.imshow(restored_img_np)
    plt.title(f'Restored Image (DepthwiseSRCNN)')
    plt.axis('off')

    plt.show()

# 훈련된 모델과 테스트 데이터셋을 사용하여 복원된 이미지 비교 출력
# 예를 들어, test_dataset의 첫 번째 이미지에 대해 비교
display_comparison(model1, device, test_dataset, 4)
display_comparison(model1, device, test_dataset, 5)
display_comparison(model1, device, test_dataset, 6)
display_comparison(model1, device, test_dataset, 7)

In [None]:
# 그래프로 결과 시각화
plt.figure(figsize=(12,6))
plt.subplot(1,2,1)
plt.plot(train_loss1)
plt.title('Training Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')

plt.subplot(1,2,2)
plt.plot(train_psnr1)
plt.title('Training PSNR')
plt.xlabel('Epoch')
plt.ylabel('PSNR')
plt.show()

In [None]:
print('2번 모델 훈련 : BasicSRCNN')
train_model2, train_loss2, train_psnr2 = train(model2, device, train_dataloader, optimizer2, criterion, scheduler2, num_epochs)


In [None]:
# 그래프로 결과 시각화
plt.figure(figsize=(12,6))
plt.subplot(1,2,1)
plt.plot(train_loss2)
plt.title('Training Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')

plt.subplot(1,2,2)
plt.plot(train_psnr2)
plt.title('Training PSNR')
plt.xlabel('Epoch')
plt.ylabel('PSNR')
plt.show()

In [None]:
print('3번 모델 훈련 : ResidualSRCNN')
train_model3, train_loss3, train_psnr3 = train(model3, device, train_dataloader, optimizer3, criterion, scheduler3, num_epochs)


In [None]:
# 그래프로 결과 시각화
plt.figure(figsize=(12,6))
plt.subplot(1,2,1)
plt.plot(train_loss3)
plt.title('Training Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.axis('off')

plt.subplot(1,2,2)
plt.plot(train_psnr3)
plt.title('Training PSNR')
plt.xlabel('Epoch')
plt.ylabel('PSNR')
plt.axis('off')
plt.show()

In [None]:
print('4번 모델 훈련 : DepthwiseSRCNN')
train_model4, train_loss4, train_psnr4 = train(model4, device, train_dataloader, optimizer4, criterion, scheduler4, num_epochs)


In [None]:
# 그래프로 결과 시각화
plt.figure(figsize=(12,6))
plt.subplot(1,2,1)
plt.plot(train_loss4)
plt.title('Training Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.axis('off')

plt.subplot(1,2,2)
plt.plot(train_psnr4)
plt.title('Training PSNR')
plt.xlabel('Epoch')
plt.ylabel('PSNR')
plt.axis('off')
plt.show()

In [None]:

def test(model, device, test_loader, criterion):
    model.eval()
    test_loss = 0
    test_psnr = 0
    num_batches = len(test_loader)

    with torch.no_grad():
        for input_tensor, target in test_loader:
            input_tensor = input_tensor.to(device)
            target = target.to(device)

            output = model(input_tensor)
            loss = criterion(output, target)

            # PSNR 계산
            psnr = compute_psnr(output, target)

            test_loss += loss.item()
            test_psnr += psnr.item()

    avg_loss = test_loss / num_batches
    avg_psnr = test_psnr / num_batches

    print(f"\nTest Results:")
    print(f"Average Loss: {avg_loss:.4f}, Average PSNR: {avg_psnr:.2f} dB")
    return avg_loss, avg_psnr

print('\n모델 1 테스트 결과 : MyVersionSRCNN')
test_loss1, test_psnr1 = test(train_model1, device, test_dataloader, criterion)
print('\n모델 2 테스트 결과 : BasicSRCNN')
test_loss2, test_psnr2 = test(train_model2, device, test_dataloader, criterion)
print('\n모델 3 테스트 결과 : ResidualSRCNN')
test_loss3, test_psnr3 = test(train_model3, device, test_dataloader, criterion)
print('\n모델 4 테스트 결과 : DepthwiseSRCNN')
test_loss4, test_psnr4 = test(train_model4, device, test_dataloader, criterion)

# 모델별 테스트 결과 비교 (선택 사항)
print("\n--- 최종 테스트 결과 비교 ---")
print(f"MyVersionSRCNN: Avg Loss = {test_loss1:.4f}, Avg PSNR = {test_psnr1:.2f} dB")
print(f"BasicSRCNN:     Avg Loss = {test_loss2:.4f}, Avg PSNR = {test_psnr2:.2f} dB")
print(f"ResidualSRCNN:  Avg Loss = {test_loss3:.4f}, Avg PSNR = {test_psnr3:.2f} dB")
print(f"DepthwiseSRCNN: Avg Loss = {test_loss4:.4f}, Avg PSNR = {test_psnr4:.2f} dB")


In [None]:
import matplotlib.pyplot as plt
def display_random_test_image_reconstruction(model, device, test_dataset):
    # 랜덤 이미지 선택
    random_idx = random.randint(0, len(test_dataset) - 1)
    input_tensor, target = test_dataset[random_idx]

    # 모델 입력 형태로 변환 및 디바이스 이동
    input_tensor_device = input_tensor.unsqueeze(0).to(device) # Add batch dimension

    # 모델 예측
    model.eval()
    with torch.no_grad():
        reconstructed_output = model(input_tensor_device)

    # 이미지 시각화를 위해 CPU로 이동 및 numpy 배열로 변환
    # (C, H, W) -> (H, W, C)로 차원 변경
    input_original = target.cpu().squeeze(0).permute(1, 2, 0).numpy()
    reconstructed_image = reconstructed_output.cpu().squeeze(0).permute(1, 2, 0).numpy()

    # 시각화
    plt.figure(figsize=(12, 6))

    # 원본 이미지 출력
    plt.subplot(1, 2, 1)
    plt.imshow(input_original)
    plt.title('Original Image')
    plt.axis('off')

    # 복원 이미지 출력
    plt.subplot(1, 2, 2)
    plt.imshow(reconstructed_image)
    plt.title('Reconstructed Image')
    plt.axis('off')

    plt.show()

print("\n--- 랜덤 테스트 이미지 복원 결과 (MyVersionSRCNN) ---")
display_random_test_image_reconstruction(train_model1, device, test_dataset)

print("\n--- 랜덤 테스트 이미지 복원 결과 (BasicSRCNN) ---")
display_random_test_image_reconstruction(train_model2, device, test_dataset)

print("\n--- 랜덤 테스트 이미지 복원 결과 (ResidualSRCNN) ---")
display_random_test_image_reconstruction(train_model3, device, test_dataset)

print("\n--- 랜덤 테스트 이미지 복원 결과 (DepthwiseSRCNN) ---")
display_random_test_image_reconstruction(train_model4, device, test_dataset)

In [None]:
def evaluate_masked_psnr(model, device, dataloader, dataset, K):
    model.eval()
    total_masked_psnr = 0
    count = 0
    dataset.clear_cache() # evaluation 전에 캐시 초기화

    with torch.no_grad():
        for idx, (input_tensor, target) in enumerate(dataloader):
            input_tensor = input_tensor.to(device)
            target = target.to(device)

            output = model(input_tensor)

            # Batch 내 각 이미지에 대해 masked_psnr 계산
            for i in range(output.size(0)):
                # 현재 배치에서 해당 이미지의 인덱스에 해당하는 coords 가져오기
                # dataloader가 shuffle되어 있을 수 있으므로 original dataset 인덱스 필요
                # 하지만 현재 dataloader 구현에서 batch 내 index와 dataset index가 일치하지 않으므로,
                # 여기서는 간단하게 dataloader 내 batch index를 활용
                # 실제 정확한 구현을 위해서는 dataloader에서 original dataset index도 함께 반환하도록 수정 필요
                # 임시 방편으로 현재 batch index + (dataloader batch index * batch_size) 사용
                original_dataset_index = idx * dataloader.batch_size + i
                if original_dataset_index < len(dataset): # 데이터셋 크기 초과 방지
                  coords = dataset.coords[original_dataset_index]
                  psnr = masked_psnr(output[i].unsqueeze(0) * 255.0, target[i].unsqueeze(0) * 255.0, coords, K) # PSNR 계산은 보통 0-255 범위에서 수행
                  total_masked_psnr += psnr.item()
                  count += 1
                else:
                    print(f"Warning: Original dataset index {original_dataset_index} out of bounds.")


    avg_masked_psnr = total_masked_psnr / count if count > 0 else 0
    return avg_masked_psnr

# MyVersionSRCNN 모델에 대한 masked PSNR 계산
avg_masked_psnr1 = evaluate_masked_psnr(train_model1, device, test_dataloader, test_dataset, K=16)
print(f"MyVersionSRCNN Masked PSNR on test set: {avg_masked_psnr1:.2f} dB")

# BasicSRCNN 모델에 대한 masked PSNR 계산
avg_masked_psnr2 = evaluate_masked_psnr(train_model2, device, test_dataloader, test_dataset, K=16)
print(f"BasicSRCNN Masked PSNR on test set: {avg_masked_psnr2:.2f} dB")

# ResidualSRCNN 모델에 대한 masked PSNR 계산
avg_masked_psnr3 = evaluate_masked_psnr(train_model3, device, test_dataloader, test_dataset, K=16)
print(f"ResidualSRCNN Masked PSNR on test set: {avg_masked_psnr3:.2f} dB")

# DepthwiseSRCNN 모델에 대한 masked PSNR 계산
avg_masked_psnr4 = evaluate_masked_psnr(train_model4, device, test_dataloader, test_dataset, K=16)
print(f"DepthwiseSRCNN Masked PSNR on test set: {avg_masked_psnr4:.2f} dB")



In [None]:
import matplotlib.pyplot as plt
import numpy as np
def plot_reconstruction_and_inputs(model, device, dataset, idx, K):
    """
    주어진 인덱스의 테스트 데이터셋 샘플에 대해 다음을 시각화합니다:
    1. 원본 512x512 이미지
    2. 모델 입력의 각 컴포넌트 (up128, up256, up512)
    3. 분산 기반으로 선택된 512x512 이미지 패치만 (나머지는 검정)
    4. 분산 기반으로 선택된 256x256 이미지 패치만 (나머지는 검정)
    5. 모델에 의해 복원된 이미지

    Args:
        model (nn.Module): 학습된 모델.
        device (torch.device): 모델이 로드된 장치 ('cuda' 또는 'cpu').
        dataset (Dataset): 테스트 데이터셋 (VariancePatchDataset).
        idx (int): 시각화할 샘플의 인덱스.
        K (int): 패치 크기.
    """
    dataset.clear_cache() # 캐시 초기화

    # 샘플 데이터 가져오기
    input_tensor, target = dataset[idx]
    image_path = dataset.paths[idx]
    coords = dataset.coords[idx]

    # 모델 예측
    model.eval()
    with torch.no_grad():
        input_tensor_device = input_tensor.unsqueeze(0).to(device)
        reconstructed_output = model(input_tensor_device).squeeze(0).cpu()

    # 이미지 시각화를 위해 CPU로 이동 및 numpy 배열 변환 (H, W, C)
    original_image = target.permute(1, 2, 0).cpu().numpy()
    reconstructed_image_np = reconstructed_output.permute(1, 2, 0).numpy()

    # 입력 컴포넌트 준비
    up128_image_np = input_tensor[:3].permute(1, 2, 0).cpu().numpy()
    up256_image_np = input_tensor[3:6].permute(1, 2, 0).cpu().numpy()
    up512_image_np = input_tensor[6:].permute(1, 2, 0).cpu().numpy()

    # 512x512 및 256x256 패치 이미지 준비
    pil_512 = Image.open(image_path).convert('RGB')
    img_512_np = np.array(pil_512)
    masked_512 = np.zeros_like(img_512_np)

    for (y, x) in coords:
        patch_512 = img_512_np[y:y+K, x:x+K]
        masked_512[y:y+K, x:x+K] = patch_512

    pil_256 = pil_512.resize((256, 256), Image.BICUBIC)
    img_256_np = np.array(pil_256)
    masked_256 = np.zeros_like(img_256_np)

    coords_256 = [(y//2, x//2) for (y, x) in coords]
    for (y2, x2) in coords_256:
        patch_256 = img_256_np[y2:y2+K//2, x2:x2+K//2]
        masked_256[y2:y2+K//2, x2:x2+K//2] = patch_256


    # 시각화
    fig, axes = plt.subplots(2, 3, figsize=(18, 12))

    axes[0, 0].imshow(original_image)
    axes[0, 0].set_title("Original 512x512")
    axes[0, 0].axis('off')

    axes[0, 1].imshow(up128_image_np)
    axes[0, 1].set_title("Input: Up128")
    axes[0, 1].axis('off')

    axes[0, 2].imshow(up256_image_np)
    axes[0, 2].set_title("Input: Up256 (Aggregated Patches)")
    axes[0, 2].axis('off')

    axes[1, 0].imshow(up512_image_np)
    axes[1, 0].set_title("Input: Up512 (Aggregated Patches)")
    axes[1, 0].axis('off')

    axes[1, 1].imshow(masked_512)
    axes[1, 1].set_title("Selected 512x512 Patches Only")
    axes[1, 1].axis('off')

    axes[1, 2].imshow(masked_256)
    axes[1, 2].set_title("Selected 256x256 Patches Only")
    axes[1, 2].axis('off')

    # 복원된 이미지를 따로 큰 제목으로 표시
    plt.figure(figsize=(6, 6))
    plt.imshow(reconstructed_image_np)
    plt.title("Reconstructed Image")
    plt.axis('off')
    plt.show()


# 각 모델에 대해 랜덤 테스트 이미지 하나를 선택하여 시각화
random_test_idx = random.randint(0, len(test_dataset) - 1)
patch_size = test_dataset.K

print(f"\n--- 시각화 샘플 인덱스: {random_test_idx} ---")

print("\n--- MyVersionSRCNN 결과 ---")
plot_reconstruction_and_inputs(train_model1, device, test_dataset, random_test_idx, patch_size)

print("\n--- BasicSRCNN 결과 ---")
plot_reconstruction_and_inputs(train_model2, device, test_dataset, random_test_idx, patch_size)

print("\n--- ResidualSRCNN 결과 ---")
plot_reconstruction_and_inputs(train_model3, device, test_dataset, random_test_idx, patch_size)

print("\n--- DepthwiseSRCNN 결과 ---")
plot_reconstruction_and_inputs(train_model4, device, test_dataset, random_test_idx, patch_size)


In [None]:
import time
from tqdm import tqdm
from torch.cuda.amp import autocast

def test(model, device, test_loader, criterion):
    """
    모델을 평가 모드로 전환한 뒤, test_loader의 모든 배치에 대해
    손실과 PSNR을 계산합니다.
    Returns:
      avg_loss: 테스트 데이터셋 전체 평균 MSE 손실
      avg_psnr: 테스트 데이터셋 전체 평균 PSNR (dB)
    """
    model.eval()
    test_loss = 0.0
    test_psnr = 0.0
    num_batches = len(test_loader)

    # no_grad()로 그래디언트 계산 비활성화
    with torch.no_grad():
        # tqdm으로 진행률 표시
        for input_tensor, target in tqdm(test_loader, desc="Test", leave=False):
            input_tensor = input_tensor.to(device)
            target       = target.to(device)

            # mixed‐precision inference
            with autocast():
                output = model(input_tensor)
                loss   = criterion(output, target)

            # 손실 및 PSNR accumulate
            test_loss += loss.item()
            test_psnr += compute_psnr(output, target).item()

    # 평균 계산
    avg_loss = test_loss / num_batches if num_batches else 0
    avg_psnr = test_psnr / num_batches if num_batches else 0

    print(f"[Test]  Loss: {avg_loss:.4f}, Avg PSNR: {avg_psnr:.2f} dB")
    return avg_loss, avg_psnr


In [None]:
# test 진행
test_loss, test_psnr = test(model, device, test_dataloader, criterion)


In [None]:
# prompt: testdata 이미지를 랜덤으로 1장 선택해서 원본과 복원된 이미지 psnr 계산

import random

# 테스트 데이터셋의 인덱스 목록 가져오기
test_indices = list(range(len(test_dataset)))

# 랜덤으로 하나의 인덱스 선택
random_index = random.choice(test_indices)

# 선택된 인덱스에 해당하는 샘플 가져오기
test_dataset.clear_cache() # 테스트 데이터셋 캐시 초기화 (랜덤 샘플 선택 시 필요)
sample_input, sample_target = test_dataset[random_index]

# 모델 예측
model.eval()
with torch.no_grad():
    # 배치 차원 추가 (모델 입력 형태 맞추기 위해)
    sample_input_batch = sample_input.unsqueeze(0).to(device)
    restored_output_batch = model(sample_input_batch)
    # 배치 차원 제거
    restored_output = restored_output_batch.squeeze(0).cpu()

# 결과 이미지 출력
display_images(sample_target.cpu(), restored_output, title=f"Random Sample {random_index}: Original vs Restored Image")

# 복원된 이미지와 원본 이미지의 PSNR 계산
# sample_target: 원본 이미지 텐서 (3, 512, 512)
# restored_output: 모델이 복원한 이미지 텐서 (3, 512, 512)

# compute_psnr 함수는 이미 정의되어 있습니다.
# compute_psnr 함수는 텐서를 입력받아 GPU 상에서 PSNR을 계산합니다.

# PSNR 계산 실행
psnr_value = compute_psnr(restored_output, sample_target.cpu()) # CPU 텐서로 변환하여 계산

print(f"\nPSNR between Original and Restored Image for Random Sample {random_index}: {psnr_value.item():.2f} dB")