### 화질변화 영상 데이터 - 초해상화 Task

<br>

[화질 변환 영상 데이터](https://www.aihub.or.kr/aihubdata/data/view.do?currMenu=115&topMenu=100&aihubDataSe=realm&dataSetSn=71540)
<br>[화질변화 영상 데이터 - 초해상화 Task](https://aifactory.space/task/2644/overview)

<br>

In [None]:
# Install gdown, the command-line tool the next cell uses to download the dataset archives.
!pip install gdown



In [None]:
#Train LR
file_id='1crEpiw-fbJyhsZAu-84Cp7_rj2SAsrnJ' #zip 버전
url = f"https://drive.google.com/uc?id={file_id}"
!gdown {url}

#Train HR
file_id='1PClJ9OrPIZ-TX8qm8MX3Su2F4EMjfza1' #zip 버전
url = f"https://drive.google.com/uc?id={file_id}"
!gdown {url}

#Test LR
file_id='1uJyoEsbF-DVpsMqVpuMUwAXWPP0rsWkl' #zip 버전
url = f"https://drive.google.com/uc?id={file_id}"
!gdown {url}

import os
os.mkdir('/content/weight')


Downloading...
From: https://drive.google.com/uc?id=1crEpiw-fbJyhsZAu-84Cp7_rj2SAsrnJ
To: /content/LR.zip
100% 405M/405M [00:05<00:00, 69.6MB/s]
Downloading...
From: https://drive.google.com/uc?id=1PClJ9OrPIZ-TX8qm8MX3Su2F4EMjfza1
To: /content/HR.zip
100% 1.13G/1.13G [00:15<00:00, 71.8MB/s]
Downloading...
From: https://drive.google.com/uc?id=1uJyoEsbF-DVpsMqVpuMUwAXWPP0rsWkl
To: /content/Test_LR.zip
100% 11.1M/11.1M [00:00<00:00, 44.8MB/s]


In [None]:
# Mount Google Drive at /content/drive (Colab only).
# NOTE(review): the cells visible in this notebook read and write under /content only,
# so the mount looks optional -- confirm before removing.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import os
import sys

import torch
import torch.nn as nn
import torch.optim as optim
import tqdm
from PIL import Image
from torch.utils.data import Dataset, DataLoader
from torchvision.transforms import ToTensor, Resize

In [None]:

# Path to the training low-resolution (LR) archive
lr_file = '/content/LR.zip'
# Path to the training high-resolution (HR) archive
hr_file = '/content/HR.zip'
# Directory where the model weights are saved
model_weight_path = '/content/weight'

# Each archive is extracted into a folder named after the archive minus its ".zip" suffix.
lr_folder = lr_file[:-len('.zip')]
hr_folder = hr_file[:-len('.zip')]

# Extract an archive only when its target folder does not exist yet.
for archive, target in ((lr_file, lr_folder), (hr_file, hr_folder)):
    if os.path.exists(target):
        continue
    os.system('unzip {} -d {}'.format(archive, target))


In [None]:
# SRCNN model definition
class SRCNN(nn.Module):
    """Three-layer convolutional network mapping a 3-channel image to a 3-channel image.

    All convolutions use stride 1; the 9x9 and 5x5 layers are padded by 4 and 2
    and the middle layer is 1x1, so the output keeps the input's height and width.
    The network performs no upscaling itself.
    """

    def __init__(self):
        super().__init__()
        self.layer1 = nn.Conv2d(in_channels=3, out_channels=64, kernel_size=9, padding=4)
        self.layer2 = nn.Conv2d(in_channels=64, out_channels=32, kernel_size=1)
        self.layer3 = nn.Conv2d(in_channels=32, out_channels=3, kernel_size=5, padding=2)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Apply conv -> ReLU -> conv -> ReLU -> conv and return the result."""
        features = self.relu(self.layer1(x))
        mapped = self.relu(self.layer2(features))
        return self.layer3(mapped)


In [None]:
# Custom dataset definition
class SRDataset(Dataset):
    """Paired low-/high-resolution image dataset for super-resolution training.

    The file list comes from ``lr_folder``. The matching high-resolution file is
    looked up in ``hr_folder`` under the same name with 'H01' replaced by 'H03'.

    Args:
        lr_folder: directory holding the low-resolution images.
        hr_folder: directory holding the high-resolution images.
        transform: optional callable applied to the low-resolution PIL image only
            (the high-resolution image is never transformed).

    Each item is a ``(lr, hr)`` pair, both converted with ``ToTensor()``.
    """

    def __init__(self, lr_folder, hr_folder, transform=None):
        self.lr_folder = lr_folder
        self.hr_folder = hr_folder
        self.transform = transform
        # os.listdir returns entries in arbitrary order; sorting makes index -> file stable.
        self.filenames = sorted(os.listdir(lr_folder))

    def __len__(self):
        return len(self.filenames)

    @staticmethod
    def _load_rgb(path):
        """Open the image at ``path`` as 3-channel RGB and release the file handle."""
        with Image.open(path) as img:
            # convert() loads the pixel data, so the returned image outlives the file.
            return img.convert("RGB")

    def __getitem__(self, idx):
        lr_name = self.filenames[idx]
        hr_name = lr_name.replace('H01', 'H03')
        lr_img = self._load_rgb(os.path.join(self.lr_folder, lr_name))
        hr_img = self._load_rgb(os.path.join(self.hr_folder, hr_name))

        if self.transform is not None:
            lr_img = self.transform(lr_img)

        # Tensor conversion happens whether or not a transform was supplied, so the
        # dataset never hands raw PIL images to the DataLoader.
        to_tensor = ToTensor()
        return to_tensor(lr_img), to_tensor(hr_img)

In [None]:
# Load the data.
# SRCNN does no upscaling internally, so the input image is enlarged before it is fed in.
target_size = (1440, 2560)
transform = Resize(target_size, interpolation=Image.BICUBIC)
# Both lr_folder and hr_folder are handed to the dataset.
train_dataset = SRDataset(lr_folder, hr_folder, transform)
train_loader = DataLoader(dataset=train_dataset, batch_size=2, shuffle=True)

In [None]:
# Define the model, the loss function and the optimizer.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
model = SRCNN()
model = model.to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(params=model.parameters(), lr=1e-4)

In [None]:
def psnr(img1, img2, border=0, max_val=255.0):
    """Peak signal-to-noise ratio between two image tensors, in decibels.

    Computes ``20 * log10(max_val / sqrt(MSE))`` over all remaining elements after
    cropping ``border`` pixels from every side of the last two axes.

    Args:
        img1, img2: tensors of identical shape whose last two axes are height and
            width (e.g. ``(H, W)``, ``(C, H, W)`` or ``(N, C, H, W)``).
        border: number of pixels to drop on each side of the spatial axes.
        max_val: peak signal value; 255.0 for 0-255 images, pass 1.0 for data
            scaled to [0, 1].

    Returns:
        A 0-dim tensor with the PSNR. Identical inputs give ``inf`` (MSE is zero).
    """
    h, w = img1.shape[-2:]
    # Crop the spatial axes; the ellipsis leaves any leading batch/channel axes intact.
    img1 = img1[..., border:h - border, border:w - border]
    img2 = img2[..., border:h - border, border:w - border]
    mse = torch.mean((img1 - img2) ** 2)
    # The factor 20 converts the log10 amplitude ratio to decibels.
    return 20 * torch.log10(max_val / torch.sqrt(mse))

In [None]:
num_epochs = 10

# Make sure the checkpoint directory exists before the first save.
os.makedirs(model_weight_path, exist_ok=True)

for epoch in range(num_epochs):
    model.train()
    total_loss = 0.0
    final_score = 0.0
    num_batches = 0
    with tqdm.tqdm(train_loader) as pbar:
        pbar.set_description("Epoch " + str(epoch + 1))
        for batch_idx, (inputs, labels) in enumerate(pbar):
            inputs, labels = inputs.to(device), labels.to(device)

            # backward() adds into .grad, so gradients from the previous step must be
            # cleared or every update would use the sum over all batches seen so far.
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # The metric is bookkeeping only: compute it without autograd and keep plain
            # floats so no computation graph is retained across batches.
            # The data comes from ToTensor(), i.e. the [0, 1] range, while psnr uses a
            # peak value of 255, so both tensors are rescaled to 0-255 first.
            with torch.no_grad():
                score = psnr(outputs.clamp(0, 1) * 255.0, labels * 255.0)
            total_loss += loss.item()
            final_score += score.item()
            num_batches += 1

    # Report per-epoch means rather than the last batch's loss and a summed PSNR.
    mean_loss = total_loss / max(num_batches, 1)
    mean_psnr = final_score / max(num_batches, 1)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {mean_loss:.4f}, PSNR: {mean_psnr:.4f}")
    torch.save(model.state_dict(), model_weight_path+'/weight_{}.pt'.format(epoch))

Epoch [1/1], Batch [1/839], Loss: 0.2387
Epoch [1/1], Batch [2/839], Loss: 0.1987
Epoch [1/1], Batch [3/839], Loss: 0.2302
Epoch [1/1], Batch [4/839], Loss: 0.1811
Epoch [1/1], Batch [5/839], Loss: 0.1602
Epoch [1/1], Batch [6/839], Loss: 0.2172
Epoch [1/1], Batch [7/839], Loss: 0.1778
Epoch [1/1], Batch [8/839], Loss: 0.1715
Epoch [1/1], Batch [9/839], Loss: 0.2096
Epoch [1/1], Batch [10/839], Loss: 0.2044
Epoch [1/1], Batch [11/839], Loss: 0.1840
Epoch [1/1], Batch [12/839], Loss: 0.1521
Epoch [1/1], Batch [13/839], Loss: 0.1180
Epoch [1/1], Batch [14/839], Loss: 0.1497
Epoch [1/1], Batch [15/839], Loss: 0.1308
Epoch [1/1], Batch [16/839], Loss: 0.1313
Epoch [1/1], Batch [17/839], Loss: 0.1462
Epoch [1/1], Batch [18/839], Loss: 0.1096
Epoch [1/1], Batch [19/839], Loss: 0.0767
Epoch [1/1], Batch [20/839], Loss: 0.1139
Epoch [1/1], Batch [21/839], Loss: 0.0586
Epoch [1/1], Batch [22/839], Loss: 0.0656
Epoch [1/1], Batch [23/839], Loss: 0.0603
Epoch [1/1], Batch [24/839], Loss: 0.0494
E

**Inference**

In [None]:
import os
import torch
from torchvision.transforms import ToTensor, Resize, ToPILImage
from PIL import Image
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import sys
import cv2

In [None]:
# Path to the trained model weights.
# NOTE: this rebinds model_weight_path, which the training cells use as a directory.
model_weight_path = '/content/weight/weight_0.pt'
# Path to the low-resolution (LR) test archive
input_file = '/content/Test_LR.zip'
# Path of the .npy file that will hold the super-resolved (SR) results
output_zip_name = '/content/output.npy'

# Folder names are the paths above with their 4-character extension stripped.
input_folder, output_folder = input_file[:-4], output_zip_name[:-4]
# Extract the test archive unless its folder already exists.
if os.path.exists(input_folder) is False:
    os.system('unzip {} -d {}'.format(input_file, input_folder))

In [None]:
# Load the model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = SRCNN().to(device)
# Load the weights stored at model_weight_path. map_location remaps tensors that were
# saved from a GPU, so the checkpoint also loads when this runtime has no CUDA device.
state_dict = torch.load(model_weight_path, map_location=device)
model.load_state_dict(state_dict)
model.eval()


SRCNN(
  (layer1): Conv2d(3, 64, kernel_size=(9, 9), stride=(1, 1), padding=(4, 4))
  (layer2): Conv2d(64, 32, kernel_size=(1, 1), stride=(1, 1))
  (layer3): Conv2d(32, 3, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
  (relu): ReLU()
)

In [None]:
# Prepare inference on the images in the test folder.
# Create the output folder first if it is missing.
if not os.path.exists(output_folder):
    os.makedirs(output_folder)

# The first listed image supplies the height/width from which the 2x resize target is built.
first_name = os.listdir(input_folder)[0]
input_first_path = input_folder + '/' + first_name
input_first_img = cv2.imread(input_first_path)
h, w = input_first_img.shape[:2]
target_size = (int(h * 2), int(w * 2))
transform = Resize(target_size, interpolation=Image.BICUBIC)
to_image = ToPILImage()

In [None]:
# Run the model on every file in the test folder and save the result under the same name.
for filename in os.listdir(input_folder):
    # Force 3 channels (the model's first convolution takes exactly 3 input channels)
    # and close the file handle once the pixels are loaded.
    with Image.open(os.path.join(input_folder, filename)) as src:
        img = src.convert("RGB")
    img_tensor = transform(img)
    img_tensor = ToTensor()(img_tensor).unsqueeze(0).to(device)

    with torch.no_grad():
        upscaled = model(img_tensor)

    # The last layer has no activation, so outputs can fall outside [0, 1]. ToPILImage
    # multiplies floats by 255 and casts to uint8, which would wrap such values around;
    # clamp first.
    result = to_image(upscaled[0].clamp(0, 1).cpu())
    result.save(os.path.join(output_folder, filename))

**최종적으로 output.npy를 다운받아서 제출**

In [None]:
import numpy as np
from PIL import Image
import os

def images_to_npy(directory, filename, extensions=('.jpg',)):
    """Stack the images found in ``directory`` into one array and save it with ``np.save``.

    Files are taken in sorted name order, so the array's first axis follows that order.

    Args:
        directory: folder to scan (sub-directories are ignored, not descended into).
        filename: destination passed to ``np.save``.
        extensions: file-name suffixes to include, matched case-insensitively.
            Defaults to ``('.jpg',)``.

    Raises:
        ValueError: from ``np.stack`` if the images do not all share one shape.
    """
    import warnings

    suffixes = tuple(ext.lower() for ext in extensions)

    # Keep regular files whose name ends in one of the requested suffixes.
    img_files = [
        f for f in sorted(os.listdir(directory))
        if f.lower().endswith(suffixes) and os.path.isfile(os.path.join(directory, f))
    ]

    # Load every image as a numpy array, closing each file as soon as it is read.
    images = []
    for img_file in img_files:
        with Image.open(os.path.join(directory, img_file)) as img:
            images.append(np.array(img))

    if not images:
        # Still write the (empty) file as before, but make the situation visible.
        warnings.warn(
            "no files matching {} found in {}; saving an empty array".format(suffixes, directory)
        )
        all_images = np.array(images)
    else:
        # Combine the images into a single array with a new leading axis.
        all_images = np.stack(images)

    # Save as an .npy file.
    np.save(filename, all_images)

# Convert the saved output images into the .npy file, then report where it was written.
images_to_npy(directory=output_folder, filename=output_zip_name)
print('저장 완료 !! : ', output_zip_name)

저장 완료 !! :  /content/output.npy


In [None]:
!pip uninstall typing-extensions
!pip install typing-extensions==4.5.0

Found existing installation: typing_extensions 4.8.0
Uninstalling typing_extensions-4.8.0:
  Would remove:
    /usr/local/lib/python3.10/dist-packages/typing_extensions-4.8.0.dist-info/*
    /usr/local/lib/python3.10/dist-packages/typing_extensions.py
Proceed (Y/n)? 

In [None]:
import gradio as gr
import os
import torch
from torchvision.transforms import ToTensor, Resize, ToPILImage
from PIL import Image
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import sys
import cv2


In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = SRCNN().to(device)
# Load the trained weights. map_location remaps tensors that were saved from a GPU,
# so the checkpoint also loads when this runtime has no CUDA device.
state_dict = torch.load('/content/weight/weight_0.pt', map_location=device)
model.load_state_dict(state_dict)
model.eval()

def image_mod(image):
    """Upscale a PIL image 2x: bicubic resize followed by the module-level ``model``.

    Args:
        image: input PIL image (any mode; it is converted to RGB).

    Returns:
        A PIL image with twice the input's width and height.
    """
    # Uploaded images may be RGBA or grayscale; the model takes exactly 3 channels.
    image = image.convert("RGB")
    w, h = image.size  # PIL reports (width, height); Resize expects (height, width)
    transform = Resize((h * 2, w * 2), interpolation=Image.BICUBIC)

    img_tensor = ToTensor()(transform(image)).unsqueeze(0).to(device)
    with torch.no_grad():
        upscaled = model(img_tensor)

    # ToPILImage multiplies floats by 255 and casts to uint8, so values outside [0, 1]
    # would wrap around; clamp before converting.
    return ToPILImage()(upscaled[0].clamp(0, 1).cpu())


# Gradio UI: takes one PIL image and shows the image returned by image_mod.
demo = gr.Interface(
    fn=image_mod,
    inputs=gr.Image(type="pil"),
    outputs="image",
)

if __name__ == "__main__":
    demo.launch()
