### ABSTRACT
1. UNET의 성능이 생각보다 괜찮았고, (3, 612, 816) 크기의 이미지를 입력받아 같은 크기로 출력하도록 학습시켰기 때문에, 예측도 이 크기에 맞추어 진행하고자 한다.
2. batch size가 4인 이미지 묶음을 입력한 것은 맞지만, 채널 수와 입력 이미지의 크기만 맞으면 되기 때문에 weight를 갱신하는 상황이 아니라면 한 번에 입력하는 이미지의 개수는 문제가 되지 않는다.

3. 입력 이미지를 잘라서 넣어주고자 한다. 잘라낸 이미지는 (3, 612, 816) 크기로 만들어야 하는데, 입력 이미지의 크기는 (3, 1224, 1832)와 (3, 2448, 3264) 두 종류가 존재하기 때문에 각 크기에 맞추어 crop하는 이미지의 개수를 정해 주어야 한다.

4. 우선 훈련용 dataset을 만든 뒤에 학습을 계속 시킬 예정이다.

5. 더불어서 학습 시킨 것에 맞게 이미지를 읽어와서 출력값을 하나의 이미지로 합칠 수 있도록 하는 predict함수를 만들어서 이를 제출해 볼 예정이다.

In [1]:
# Hyper-parameters handed to the RCAN constructor defined further down.
config = dict(
    scale=1,          # read by RCAN.__init__ but not used in its forward pass
    num_features=32,  # channel width of every intermediate feature map
    num_rg=7,         # number of residual groups (RG) stacked in the network
    num_rcab=3,       # residual channel-attention blocks (RCAB) per group
    reduction=8,      # channel squeeze ratio inside ChannelAttention
)

In [2]:
import random, os, sys, cv2
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

import torch, math
from torch import Tensor
from torch.cuda import amp
from torch import nn
from PIL import Image

import torchvision.transforms as transforms
import torch.fft as fft
import torchvision.models as models
from torch.utils.data import Dataset, DataLoader
from torch.utils.tensorboard import SummaryWriter

import albumentations as albu

# Use the first GPU when CUDA is available; otherwise fall back to the CPU so
# every later `.to(device)` call still works on a machine without a GPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

In [3]:
# Every dataset location hangs off one Google Drive folder; build the
# individual paths from that base so the prefix is written only once.
_DATA_DIR = '/content/drive/MyDrive/DACON/카메라이미지품질향상AI/DATA'
_IMG_DIR = _DATA_DIR + '/IMG'
_CSV_DIR = _DATA_DIR + '/CSV'

# Image folders: training inputs, training labels, test inputs.
x_train_path = _IMG_DIR + '/train_input_img'
x_label_path = _IMG_DIR + '/train_label_img'
x_test_path = _IMG_DIR + '/test_input_img'

# CSV manifests listing the image file names.
train_csv_path = _CSV_DIR + '/train.csv'
test_csv_path = _CSV_DIR + '/test.csv'

# Directory used for the TensorBoard logs of this experiment.
root_path = _DATA_DIR + '/RCAN'

In [4]:
# Training manifest; the lines below read its 'input_img' and 'label_img' columns.
train_csv = pd.read_csv(train_csv_path)

# Element-wise string concatenation on the pandas Series: each file name is
# prefixed with its folder, giving one Series of full paths per column.
train_all_input_files = x_train_path + '/' + train_csv['input_img']
train_all_label_files = x_label_path + '/' + train_csv['label_img']

In [5]:
# Test manifest; only its 'input_img' column is read here.
test_csv = pd.read_csv(test_csv_path)

# Series of full paths to the test input images (folder + '/' + file name).
test_all_input_files = x_test_path + '/' + test_csv['input_img']

**Functions For Loading and Saving UNet Parameters**

In [6]:
def load(ckpt_dir, net, optim):
  """Restore the newest checkpoint stored in ``ckpt_dir``.

  Checkpoints are the files written by ``save``: their name contains
  ``epoch<N>.pth`` and they hold the network state under the key derived from
  the second-to-last component of ``ckpt_dir`` plus the optimizer state under
  ``'optim'``.

  Args:
    ckpt_dir: checkpoint directory, using '/' as separator.
    net: module whose ``load_state_dict`` receives the stored weights.
    optim: optimizer whose ``load_state_dict`` receives the stored state.

  Returns:
    ``(net, optim, epoch)``. ``epoch`` is the number parsed from the newest
    checkpoint name, or 0 when the directory is missing or holds no
    checkpoint (in which case ``net`` and ``optim`` are returned untouched).
  """
  def checkpoint_epoch(file_name):
    # Epoch encoded as '...epoch<N>.pth'; None for any unrelated file so that
    # stray files in the directory cannot break the lookup.
    if 'epoch' not in file_name or not file_name.endswith('.pth'):
      return None
    try:
      return int(file_name.split('epoch')[1].split('.pth')[0])
    except ValueError:
      return None

  if not os.path.isdir(ckpt_dir):
    return net, optim, 0

  by_epoch = {}
  for file_name in os.listdir(ckpt_dir):
    found = checkpoint_epoch(file_name)
    if found is not None:
      by_epoch[found] = file_name

  # An existing but empty directory means "nothing to resume from".
  if not by_epoch:
    return net, optim, 0

  epoch = max(by_epoch)
  net_name = ckpt_dir.split('/')[-2]

  # Deserialize on the CPU so a checkpoint written on a GPU machine can be
  # restored anywhere; load_state_dict copies the values onto whatever device
  # the live parameters are on.
  dict_model = torch.load('%s/%s' %(ckpt_dir, by_epoch[epoch]), map_location = 'cpu')

  net.load_state_dict(dict_model[net_name])
  optim.load_state_dict(dict_model['optim'])

  return net, optim, epoch

def save(ckpt_dir, net, optim, epoch):
  """Write network and optimizer state to ``<ckpt_dir>/model_epoch<epoch>.pth``.

  The network state is stored under a key equal to the second-to-last
  '/'-separated component of ``ckpt_dir``; the optimizer state is stored under
  ``'optim'``. The directory is created when it does not exist yet.
  """
  state_key = ckpt_dir.split('/')[-2]

  directory_missing = not os.path.exists(ckpt_dir)
  if directory_missing:
    os.makedirs(ckpt_dir)

  checkpoint = {state_key:net.state_dict(), 'optim':optim.state_dict()}
  destination = '%s/model_epoch%d.pth'%(ckpt_dir, epoch)
  torch.save(checkpoint, destination)

In [7]:
def visualize(full_in, full_out, output):
  """Plot three images in one row: ``full_in``, ``output``, ``full_out``.

  Each argument is converted with the module-level ``tonumpy`` helper before
  being handed to matplotlib.
  """
  shown_in = tonumpy(full_in)
  shown_target = tonumpy(full_out)
  shown_pred = tonumpy(output)

  plt.figure(1, figsize = (10, 10))
  # Left to right: network input, network output, target.
  for position, image in enumerate((shown_in, shown_pred, shown_target), start = 1):
    plt.subplot(1, 3, position)
    plt.imshow(image)
  plt.show()

In [8]:
def psnr(predict, target):
    """Peak signal-to-noise ratio for a peak signal value of 1.

    Returns the plain integer 100 when both tensors are exactly equal (the
    mean squared error would be zero); otherwise returns a 0-dim tensor holding
    ``10 * log10(1 / mse)``.
    """
    if not torch.equal(predict, target):
        squared_error = (predict - target) ** 2
        return 10 * torch.log10(1 / squared_error.mean())
    return 100

**Utils for VGG Loss**

In [9]:
from collections import namedtuple
import torch
import torchvision.models as models

class VGG16(torch.nn.Module):
    """Pretrained VGG-16 feature stack split into four consecutive stages.

    ``forward`` returns the activation after each stage as a named tuple with
    the fields ``relu1_2``, ``relu2_2``, ``relu3_2`` and ``relu4_2``. With the
    default ``requires_grad = False`` every parameter is frozen.

    NOTE(review): the stages end at feature indices 3, 8, 15 and 22; in
    torchvision's VGG-16 layout the last two look like relu3_3 / relu4_3, so
    the field names may be off by one - confirm before relying on them.
    """

    def __init__(self, requires_grad = False):
        super().__init__()
        backbone = models.vgg16(pretrained = True).features

        # (attribute name, first layer index, one-past-last layer index)
        stage_bounds = (
            ('block1', 0, 4),
            ('block2', 4, 9),
            ('block3', 9, 16),
            ('block4', 16, 23),
        )
        for attr_name, first, stop in stage_bounds:
            stage = torch.nn.Sequential()
            # Layers keep their original index as sub-module name.
            for layer_idx in range(first, stop):
                stage.add_module(str(layer_idx), backbone[layer_idx])
            setattr(self, attr_name, stage)

        if requires_grad == False:
            for weight in self.parameters():
                weight.requires_grad = False

    def forward(self, x):
        activations = []
        hidden = x
        for stage in (self.block1, self.block2, self.block3, self.block4):
            hidden = stage(hidden)
            activations.append(hidden)

        vgg_outputs = namedtuple("VggOutputs", ['relu1_2', 'relu2_2', 'relu3_2', 'relu4_2'])
        return vgg_outputs(*activations)


def gram_matrix(x):
    """Per-sample Gram matrix of a (batch, channels, height, width) tensor.

    The spatial axes are flattened, the channel-by-channel inner products are
    taken with a batched matrix product and the result is divided by
    ``channels * width * height``. Output shape: (batch, channels, channels).
    """
    batch, channels, height, width = x.size()
    flat = x.view(batch, channels, width * height)
    return torch.bmm(flat, flat.transpose(1, 2)) / (channels * width * height)

def normalize(x):
    """Standardise a 3-channel image tensor with fixed per-channel constants.

    Subtracts (0.485, 0.456, 0.406) and divides by (0.229, 0.224, 0.225) along
    the channel axis (third from last). These are presumably the ImageNet
    statistics expected by the pretrained VGG - confirm against its usage.
    """
    channel_mean = x.new_tensor([0.485, 0.456, 0.406])[:, None, None]
    channel_std = x.new_tensor([0.229, 0.224, 0.225])[:, None, None]
    return (x - channel_mean) / channel_std

#### Function for FFL (Focal Frequency Loss)
- torch.fft.fft2를 사용해서 2D discrete Fourier transform을 predicted image와 target image각각에 적용을 시킨다.
- 이렇게 변환된 두 결과(predicted, target)의 차이에 절댓값을 취한 뒤 (alpha + 2) 제곱을 한다. (이때 alpha의 default 값은 1이므로) 결국 차이의 절댓값을 세제곱한 뒤에 평균을 내는 것과 같다.
- m * n 의 크기의 이미지일 경우 해당 크기로 나눈다.

In [10]:
def FFLLoss(output, target, alpha = 1):
    """Frequency-domain loss between a predicted image and its target.

    Both tensors are transformed with an orthonormal 2-D discrete Fourier
    transform over their last two (spatial) axes; the loss is the mean of
    ``|F(output) - F(target)| ** (alpha + 2)`` over every element.

    Args:
        output: predicted image tensor, spatial axes last, e.g. (B, C, H, W)
            or (C, H, W).
        target: reference tensor of the same shape as ``output``.
        alpha: exponent offset; the default 1 gives a cubic penalty.

    Returns:
        0-dim tensor on the device of the inputs, still attached to the
        autograd graph of ``output``.
    """
    # norm='ortho' scales the transform by 1/sqrt(H*W), so the magnitude does
    # not grow with the image size. The spectra stay on the inputs' device.
    ft_out = fft.fft2(output, dim = (-2, -1), norm = 'ortho')
    ft_tar = fft.fft2(target, dim = (-2, -1), norm = 'ortho')

    # Magnitude of the complex difference, raised to (alpha + 2), averaged.
    return torch.mean(torch.abs(ft_out - ft_tar) ** (alpha + 2))


In [11]:
class LG_Dataset(torch.utils.data.Dataset):
    """Paired (input, label) image dataset yielding one resized frame plus patches.

    Each item is a list of ``{'input': tensor, 'target': tensor}`` dicts:

    * element 0 is the whole image pair resized to 612 x 816 (H x W);
    * the remaining elements are 306 x 408 patches taken on a sliding grid
      that moves 153 px vertically and 204 px horizontally, row by row.

    The grid is derived from the actual image size, so every patch lies
    inside the image (a 3264 x 2448 image gives the 15 x 15 grid of the
    original implementation; smaller images simply give fewer patches instead
    of black, out-of-bounds crops).

    Args:
        x_img_dirs: sequence of input image paths.
        y_img_dirs: sequence of label image paths, aligned with ``x_img_dirs``.
        is_train: True selects the first ``split`` fraction, False the rest.
        split: fraction (0..1) of the pairs that belongs to the training part.
        crop_size: stored as ``self.w`` / ``self.h``; not used when cutting patches.
        shuffle: shuffle the pair list once at construction time.
    """

    RESIZE_HW = (612, 816)  # size of the down-scaled whole frame
    PATCH_HW = (306, 408)   # size of one sliding-window patch
    STEP_HW = (153, 204)    # vertical / horizontal step of the window

    def __init__(self, x_img_dirs, y_img_dirs, is_train, split, crop_size = (612, 816), shuffle = True):
        self.input = x_img_dirs
        self.target = y_img_dirs
        self.split = int(len(self.input) * split)
        self.w, self.h = crop_size[0], crop_size[1]

        if is_train:
          self.input = self.input[:self.split]
          self.target = self.target[:self.split]
        else:
          self.input = self.input[self.split:]
          self.target = self.target[self.split:]

        # One [input_path, label_path] entry per image pair.
        self.samples = [[x, y] for x, y in zip(self.input, self.target)]

        if shuffle:
          random.shuffle(self.samples)

    def __len__(self):
        return len(self.samples)

    @staticmethod
    def _patch_to_tensor(patch):
        # PIL patch (H, W, 3) -> float32 array scaled to [0, 1] -> (3, H, W) tensor.
        scaled = np.array(patch).astype(np.float32) / 255.0
        return transforms.ToTensor()(scaled)

    def __getitem__(self, idx):
        input_path, label_path = self.samples[idx]
        # `Image` is the top-of-file `from PIL import Image`; this class is
        # defined before the later `pil_image` alias is imported.
        in_img = Image.open(input_path).convert('RGB')
        label_img = Image.open(label_path).convert('RGB')
        img_w, img_h = in_img.size  # PIL reports (width, height)

        whole_frame = transforms.Compose([
            transforms.Resize(self.RESIZE_HW),
            transforms.ToTensor(),
        ])
        data = [{'input' : whole_frame(in_img), 'target' : whole_frame(label_img)}]

        patch_h, patch_w = self.PATCH_HW
        step_h, step_w = self.STEP_HW
        # Stop at the last position where the whole patch still fits.
        for top in range(0, img_h - patch_h + 1, step_h):
            for left in range(0, img_w - patch_w + 1, step_w):
                box = (left, top, left + patch_w, top + patch_h)
                data.append({
                    'input' : self._patch_to_tensor(in_img.crop(box)),
                    'target' : self._patch_to_tensor(label_img.crop(box)),
                })
        return data

In [12]:
import torch
import torch.utils.data.dataset as Dataset
import numpy as np
import PIL.Image as pil_image
import torchvision.transforms as transforms


class CroppedDataset(torch.utils.data.Dataset):
    """Paired image dataset yielding a resized frame plus a grid of tiles.

    Each item is a list of ``{'input': tensor, 'target': tensor}`` dicts.
    Element 0 is the whole pair resized to ``crop_size`` and flipped
    horizontally; the rest are non-overlapping, vertically flipped tiles of
    ``crop_size`` read row by row from the full-resolution pair.

    ``crop_size`` is (height, width): ``self.w`` therefore holds the tile
    height and ``self.h`` the tile width.
    """

    def __init__(self, x_img_dirs, y_img_dirs, split, is_train, crop_size = (612, 816)):
        # x_img_dirs / y_img_dirs: aligned sequences of image paths.
        self.input = x_img_dirs
        self.target = y_img_dirs
        self.split = int(len(self.input) * split)
        self.w, self.h = crop_size[0], crop_size[1]

        # Training part = first `split` fraction, validation part = the rest.
        if is_train:
          self.input = self.input[:self.split]
          self.target = self.target[:self.split]
        else:
          self.input = self.input[self.split:]
          self.target = self.target[self.split:]

        # One [input_path, label_path] entry per image pair.
        self.samples = [[x, y] for x, y in zip(self.input, self.target)]

    def __len__(self):
        return len(self.input)

    def __getitem__(self, idx):
        input_path, label_path = self.samples[idx]
        in_img = pil_image.open(input_path).convert('RGB')
        label_img = pil_image.open(label_path).convert('RGB')

        # Whole frame: resize, always mirror left-right, convert to tensor.
        whole_frame = transforms.Compose([
            transforms.Resize((self.w, self.h)),
            transforms.RandomHorizontalFlip(p = 1),
            transforms.ToTensor(),
        ])
        data = [{'input' : whole_frame(in_img), 'target' : whole_frame(label_img)}]

        # Number of tiles per side comes from image height / tile height and
        # is used for both axes.
        tiles_per_side = int(in_img.size[1] / self.w)

        flip = transforms.RandomVerticalFlip(p = 1)
        to_tensor = transforms.ToTensor()
        for row in range(tiles_per_side):
            top = row * self.w
            for col in range(tiles_per_side):
                left = col * self.h
                box = (left, top, left + self.h, top + self.w)

                pair = {}
                for key, source in (('input', in_img), ('target', label_img)):
                    tile = flip(source.crop(box))
                    # (H, W, 3) values 0-255 -> float32 in [0, 1]; ToTensor
                    # then moves the channel axis first.
                    tile = np.array(tile).astype(np.float32) / 255.0
                    pair[key] = to_tensor(tile)
                data.append(pair)

        return data

In [13]:
class SlidingWindowDataset(torch.utils.data.Dataset):
    """Paired image dataset yielding a resized frame plus overlapping tiles.

    Each item is a list of ``{'input': tensor, 'target': tensor}`` dicts.
    Element 0 is the whole pair resized to ``crop_size``; the rest are
    ``crop_size`` tiles taken with a sliding window whose step is
    ``image_size // (4 * crop_rate)`` on each axis, read row by row.

    For a 3264 x 2448 image and the defaults this is a 13 x 13 grid with a
    step of 204 px horizontally and 153 px vertically - the same layout that
    ``largemean_predict`` uses at inference time. Every tile lies inside the
    image.

    ``crop_size`` is (height, width): ``self.w`` holds the tile height and
    ``self.h`` the tile width.
    """

    def __init__(self, x_img_dirs, y_img_dirs, split, is_train, crop_size = (612, 816)):
        # x_img_dirs / y_img_dirs: aligned sequences of image paths.
        self.input = x_img_dirs
        self.target = y_img_dirs
        self.split = int(len(self.input) * split)
        self.w, self.h = crop_size[0], crop_size[1]

        self.crop_rate = 4 # power of two: 2, 4, 8
        # Kept for backward compatibility; the grid is now derived from the
        # image bounds instead of this count.
        self.last = self.crop_rate - 2

        # Training part = first `split` fraction, validation part = the rest.
        if is_train:
          self.input = self.input[:self.split]
          self.target = self.target[:self.split]
        else:
          self.input = self.input[self.split:]
          self.target = self.target[self.split:]

        # One [input_path, label_path] entry per image pair.
        self.samples = [[x, y] for x, y in zip(self.input, self.target)]

    def __len__(self):
        return len(self.input)

    @staticmethod
    def _tile_to_tensor(tile):
        # PIL tile (H, W, 3) -> float32 array scaled to [0, 1] -> (3, H, W) tensor.
        scaled = np.array(tile).astype(np.float32) / 255.0
        return transforms.ToTensor()(scaled)

    def __getitem__(self, idx):
        input_path, label_path = self.samples[idx]
        in_img = pil_image.open(input_path).convert('RGB')
        label_img = pil_image.open(label_path).convert('RGB')

        whole_frame = transforms.Compose([
            transforms.Resize((self.w, self.h)),
            transforms.ToTensor(),
        ])
        data = [{'input' : whole_frame(in_img), 'target' : whole_frame(label_img)}]

        img_w, img_h = in_img.size  # PIL reports (width, height)
        tile_h, tile_w = self.w, self.h

        # Window step per axis; at least 1 px so range() stays valid.
        step_x = max(1, img_w // (4 * self.crop_rate))
        step_y = max(1, img_h // (4 * self.crop_rate))

        # Advance by the step (not by the tile size or the tile count) and
        # stop at the last position where the whole tile still fits.
        for top in range(0, img_h - tile_h + 1, step_y):
            for left in range(0, img_w - tile_w + 1, step_x):
                box = (left, top, left + tile_w, top + tile_h)
                data.append({
                    'input' : self._tile_to_tensor(in_img.crop(box)),
                    'target' : self._tile_to_tensor(label_img.crop(box)),
                })

        return data

**RCAN Architecture**

In [14]:
class ChannelAttention(nn.Module):
    """Channel-wise gating: rescales every channel of ``x`` by a weight in (0, 1).

    The weights come from a global average pool followed by two 1x1
    convolutions (squeeze to ``num_features // reduction`` channels, then back
    to ``num_features``) and a sigmoid.
    """

    def __init__(self, num_features, reduction):
        super().__init__()
        squeezed = num_features // reduction
        gate_layers = [
            # one average value per channel
            nn.AdaptiveAvgPool2d(1),
            # squeeze the channel dimension
            nn.Conv2d(num_features, squeezed, kernel_size=1),
            nn.ReLU(inplace=True),
            # expand back to the original channel count
            nn.Conv2d(squeezed, num_features, kernel_size=1),
            nn.Sigmoid(),
        ]
        self.module = nn.Sequential(*gate_layers)

    def forward(self, x):
        gate = self.module(x)
        return x * gate


class RCAB(nn.Module):
    """Residual channel-attention block.

    Body: 3x3 conv -> ReLU -> 3x3 conv -> ChannelAttention, all at
    ``num_features`` channels; the block input is added to the body output.
    """

    def __init__(self, num_features, reduction):
        super().__init__()

        def conv3x3():
            # Shape-preserving 3x3 convolution.
            return nn.Conv2d(num_features, num_features, kernel_size=3, padding=1)

        self.module = nn.Sequential(
            conv3x3(),
            nn.ReLU(inplace=True),
            conv3x3(),
            ChannelAttention(num_features, reduction),
        )

    def forward(self, x):
        # Element-wise sum of the block input and the body output.
        body = self.module(x)
        return x + body


class RG(nn.Module):
    """Residual group: ``num_rcab`` RCAB blocks, one 3x3 conv, plus a skip connection."""

    def __init__(self, num_features, num_rcab, reduction):
        super().__init__()
        # The RCAB blocks come first, the trailing convolution last.
        stages = [RCAB(num_features, reduction) for _ in range(num_rcab)]
        stages.append(nn.Conv2d(num_features, num_features, kernel_size=3, padding=1))
        self.module = nn.Sequential(*stages)

    def forward(self, x):
        body = self.module(x)
        return x + body

In [15]:
class RCAN(nn.Module):
    """Residual channel-attention network used as an image-to-image model.

    ``config`` must provide the keys 'scale', 'num_features', 'num_rg',
    'num_rcab' and 'reduction'. The forward pass keeps the spatial size: a
    shallow 3x3 conv, ``num_rg`` residual groups, a 3x3 conv with a long skip
    connection, a 3x3 conv back to 3 channels and a sigmoid, so the output
    lies in (0, 1).

    ``self.deconv`` (a x4 transposed convolution) is created but not applied
    in ``forward``; its parameters are still part of the state dict.
    """

    def __init__(self, config):
        super().__init__()
        # 'scale' is looked up like the other keys but not used afterwards.
        _scale, num_features, num_rg, num_rcab, reduction = [
            config[key]
            for key in ('scale', 'num_features', 'num_rg', 'num_rcab', 'reduction')
        ]

        # x4 up-sampling layer (currently bypassed in forward).
        self.deconv = nn.ConvTranspose2d(3, 3, kernel_size=9, stride=4, padding=4, output_padding=3)
        # Shallow feature extraction before the residual groups.
        self.sf = nn.Conv2d(3, num_features, kernel_size=3, padding=1)
        # Residual-in-residual body: a chain of residual groups.
        groups = [RG(num_features, num_rcab, reduction) for _ in range(num_rg)]
        self.rgs = nn.Sequential(*groups)
        self.conv1 = nn.Conv2d(num_features, num_features, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(num_features, 3, kernel_size=3, padding=1)
        # Sigmoid so the output pixels fall in the 0.0 - 1.0 range.
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        shallow = self.sf(x)
        deep = self.rgs(shallow)
        fused = self.conv1(deep)
        # Long skip connection around all residual groups.
        fused += shallow
        return self.sigmoid(self.conv2(fused))

- BATCH_SIZE를 1로 설정해 줌으로써 한 번에 resize된 전체 이미지와 잘린 이미지 데이터들을 함께 받는다.
- 그렇게 하지 않으면 이미지를 학습시킬 때 시간이 오래 걸린다.

In [16]:
# Build the network from the hyper-parameter dict defined in the first cell.
model = RCAN(config)

In [17]:
# Training hyper-parameters.
LEARNING_RATE = 1e-4
# Each dataset item is already a list (resized frame + crops), so one item per step.
BATCH_SIZE = 1
# NOTE(review): the training cell below uses its own `num_epoch`; NUM_EPOCH is
# not read anywhere in the visible code - confirm whether it is still needed.
NUM_EPOCH = 100
# load()/save() use the second-to-last path component ('DEEPRCAN') as the key
# under which the network weights are stored inside the checkpoint.
FULLNET_CKPT_DIR = '/content/drive/MyDrive/DACON/카메라이미지품질향상AI/DATA/models/DEEPRCAN/L1Loss'

# loss function
fn_loss = nn.L1Loss().to(device)

# optimizer
optim = torch.optim.Adam(model.parameters(), lr = LEARNING_RATE, betas = [0.9, 0.999], eps = 10**-8)

# Resume from the newest checkpoint if one exists (start_epoch is 0 otherwise).
model, optim, start_epoch = load(ckpt_dir = FULLNET_CKPT_DIR, net= model, optim = optim)

#train_cd = LG_Dataset(train_all_input_files, train_all_label_files, split = 0.7, is_train = True, shuffle = True)
#val_cd = LG_Dataset(train_all_input_files, train_all_label_files, split = 0.7, is_train = False, shuffle = True)

# NOTE(review): the training set takes split = 1 (every pair) while the
# validation set takes the last 40 % of the same list, so validation images
# are also trained on - confirm this is intended.
train_cd = CroppedDataset(train_all_input_files, train_all_label_files, split = 1, is_train = True)
val_cd = CroppedDataset(train_all_input_files, train_all_label_files, split = 0.6, is_train = False)

#train_cd = SlidingWindowDataset(train_all_input_files, train_all_label_files, split = 0.7, is_train = True)
#val_cd = SlidingWindowDataset(train_all_input_files, train_all_label_files, split = 0.7, is_train = False)

num_train, num_val = len(train_cd), len(val_cd)

# Number of loader iterations per epoch (np.ceil returns floats); used for logging.
num_train_for_epoch, num_val_for_epoch = np.ceil(num_train/BATCH_SIZE), np.ceil(num_val/BATCH_SIZE)

# TensorBoard writers, one sub-folder each for training and validation curves.
train_writer = SummaryWriter(log_dir = os.path.join(root_path, 'train'))
val_writer = SummaryWriter(log_dir =  os.path.join(root_path, 'val'))

train_loader = DataLoader(train_cd, batch_size = BATCH_SIZE, shuffle = True)
val_loader = DataLoader(val_cd, batch_size = BATCH_SIZE, shuffle = True)

# (1, C, H, W) tensor -> (H, W, C) numpy array on the CPU, for plotting.
tonumpy = lambda x : x.cpu().detach().numpy().squeeze(0).transpose(1, 2, 0)

In [None]:
# Training / validation loop.
#
# Every loader item is a list: element 0 is the resized whole frame, the
# remaining elements are full-resolution crops. The network is updated once on
# the whole frame and once per crop whose loss is above a small threshold.
num_epoch = 50
alpha = 0.5  # not read in this cell; kept in case another cell relies on it

# Put the network on the target device *before* restoring the checkpoint, so
# the optimizer state restored by load() is placed on the same device as the
# parameters it belongs to.
model = model.to(device)
model, optim, start_epoch = load(ckpt_dir = FULLNET_CKPT_DIR, net = model, optim = optim)

# NOTE(review): the scheduler is stepped after every optimizer step with
# T_max=10, so the learning rate swings through a full cosine cycle every few
# updates - confirm that per-step (rather than per-epoch) annealing is intended.
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optim, T_max=10, eta_min=0)

count = 0  # number of loader items processed in this run (read by a later cell)
for epoch in range(start_epoch + 1, num_epoch + 1+start_epoch):
  model.train()
  loss_arr = []
  best_psnr = 0  # best whole-frame PSNR seen so far in this epoch

  for batch, data in enumerate(train_loader, 1):
    count += 1

    # --- whole frame ---------------------------------------------------
    full_in, full_out = data[0]['input'].to(device), data[0]['target'].to(device)
    output = model(full_in)
    optim.zero_grad()

    loss = fn_loss(output, full_out)
    loss.backward()
    optim.step()
    scheduler.step()
    loss_arr += [loss.item()]

    # PSNR is only monitored, so compute it outside the autograd graph.
    psnr_score = float(psnr(output.detach(), full_out))
    if batch%50 == 0:
      print('train : epoch %f/ %f | Batch %f/ %f | Loss %f  | PSNR %f'%(epoch, num_epoch+start_epoch, batch, num_train_for_epoch, loss.item(), psnr_score))
      visualize(full_in, full_out, output)

    # Checkpoint whenever the PSNR improves within this epoch. The file is
    # named after the epoch because load() turns that number into start_epoch.
    if psnr_score > best_psnr:
      save(ckpt_dir = FULLNET_CKPT_DIR, net = model, optim = optim, epoch = epoch)
      best_psnr = psnr_score

    # --- crops -----------------------------------------------------------
    for i in range(1, len(data)):
      in_img = data[i]['input'].to(device)
      label_img = data[i]['target'].to(device)
      output = model(in_img)
      loss = fn_loss(output, label_img)
      # Skip the update for crops that are already reproduced almost exactly.
      if loss > 0.0005:
        optim.zero_grad()
        loss.backward()
        optim.step()
        scheduler.step()
        loss_arr += [loss.item()]

  train_writer.add_scalar('loss', np.mean(loss_arr), epoch)

  # Validation: measure loss / PSNR only, no back-propagation.
  model.eval()
  with torch.no_grad() :
    loss_arr = []

    for batch, data in enumerate(val_loader, 1):
      # whole frame, compared against its own target
      full_in, full_out = data[0]['input'].to(device), data[0]['target'].to(device)
      output = model(full_in)
      loss = fn_loss(output, full_out)
      loss_arr += [loss.item()]
      psnr_score = float(psnr(output, full_out))
      print('valid : epoch %f/ %f | Batch %f/ %f | Loss %f  | PSNR %f'%(epoch, num_epoch+start_epoch, batch, num_val_for_epoch, loss.item(), psnr_score))
      if batch%10 == 0:
        visualize(full_in, full_out, output)

      # crops
      for i in range(1, len(data)):
        in_img = data[i]['input'].to(device)
        label_img = data[i]['target'].to(device)
        output = model(in_img)
        loss = fn_loss(output, label_img)
        loss_arr += [loss.item()]

    val_writer.add_scalar('loss', np.mean(loss_arr), epoch)

    



In [None]:
# Store the current weights manually. `count` is the number of loader items
# processed by the training cell, so the file is named model_epoch<count>.pth.
# NOTE(review): load() reads that number back as the start epoch - confirm
# that naming the file after `count` (not the epoch) is intended.
save(ckpt_dir = FULLNET_CKPT_DIR, net = model, optim = optim, epoch = count)

In [None]:
# Re-load the checkpoint with the highest number from FULLNET_CKPT_DIR.
model, optim, start_epoch = load(ckpt_dir = FULLNET_CKPT_DIR, net = model, optim = optim)

In [None]:
# Checkpoint folder for inference; only referenced by the commented-out lines below.
# NOTE(review): this path ends in 'L1LOSS' while FULLNET_CKPT_DIR ends in
# 'L1Loss' - on a case-sensitive file system these are different folders.
CKPT_DIR = '/content/drive/MyDrive/DACON/카메라이미지품질향상AI/DATA/models/DEEPRCAN/L1LOSS'
# Uncomment to rebuild the model from scratch and restore it from CKPT_DIR:
#model = RCAN(config)
#optim = torch.optim.Adam(model.parameters(), lr = 1e-4, betas = [0.9, 0.999], eps = 10**-8)
#model, optim, start_epoch = load(ckpt_dir = CKPT_DIR, net = model, optim = optim)
#model = model.to(device)

import PIL.Image as pil_image

def predict(img_dirs, model, crop_size = (612, 816)):
    """Run ``model`` on non-overlapping tiles of each image and stitch the outputs.

    Args:
        img_dirs: iterable of image file paths.
        model: network called with a (1, 3, crop_h, crop_w) float tensor in
            [0, 1] on ``device``; its output must have the same shape.
        crop_size: (height, width) of one tile.

    Returns:
        One float array of shape (H, W, 3) per image, holding the raw model
        outputs. Rows / columns that do not fill a whole tile at the bottom or
        right border are left at zero.

    Side effects: prints each image size and shows the input and the stitched
    result with matplotlib.
    """
    crop_h, crop_w = crop_size
    results = []
    # Iterate over the values directly so lists and pandas Series with any
    # index both work.
    for img_path in img_dirs:
        img = pil_image.open(img_path).convert('RGB')
        print(img.size)
        w, h = img.size  # PIL reports (width, height)
        plt.imshow(img)
        plt.show()

        to_tensor = transforms.ToTensor()
        result_img = np.zeros((h, w, 3))

        # Inference only: no autograd graph, so memory does not grow per tile.
        with torch.no_grad():
            # Row and column counts are derived from their own axis.
            for top in range(0, h - crop_h + 1, crop_h):
                for left in range(0, w - crop_w + 1, crop_w):
                    tile = img.crop((left, top, left + crop_w, top + crop_h))
                    tile = np.array(tile).astype(np.float32)/255.0
                    batch = torch.unsqueeze(to_tensor(tile), 0).to(device)
                    # (1, 3, h, w) -> (h, w, 3) numpy array
                    pred = model(batch)[0].to('cpu').permute(1, 2, 0).numpy()
                    result_img[top:top + crop_h, left:left + crop_w, :] = pred

        plt.imshow(result_img)
        plt.show()
        results.append(result_img)

    return results

def make_submission(result):
    """Write the predictions as PNG files and pack them into ``submission.zip``.

    Args:
        result: sequence of RGB arrays of shape (H, W, 3) with values in
            [0, 1], e.g. the output of ``predict``.

    The i-th image is saved as ``submission/test_<20000+i>.png`` and stored in
    the archive under its bare file name, so the archive layout is flat.
    """
    import zipfile
    out_dir = 'submission'
    os.makedirs(out_dir, exist_ok = True)
    sub_imgs = []
    for i, img in enumerate(result):
        # Scale to 0-255 and clip so rounding noise cannot wrap around in uint8.
        img = np.clip(img * 255, 0, 255).astype(np.uint8)
        path = os.path.join(out_dir, f'test_{20000+i}.png')
        # OpenCV expects BGR channel order.
        cv2.imwrite(path, cv2.cvtColor(img, cv2.COLOR_RGB2BGR))
        sub_imgs.append(path)
    # The context manager closes the archive even if a write fails.
    with zipfile.ZipFile('submission.zip', 'w') as submission:
        for path in sub_imgs:
            submission.write(path, arcname = os.path.basename(path))


In [None]:
# Tile-wise prediction for every test image (one stitched array per image).
result = predict(test_all_input_files, model)

In [None]:
# Write the stitched predictions as PNG files and zip them for submission.
make_submission(result)

In [21]:
import PIL.Image as pil_image
import zipfile

def predict_means(img_dirs, model, crop_size = (612, 816), step = (153, 204)):
    """Predict with an overlapping sliding window and average the overlaps.

    Args:
        img_dirs: iterable of image file paths.
        model: network called with a (1, 3, crop_h, crop_w) float tensor in
            [0, 1] on ``device``; its output must have the same shape.
        crop_size: (height, width) of the window.
        step: (vertical, horizontal) step of the window in pixels.

    Returns:
        One uint8 array of shape (H, W, 3), values 0-255, per image. Each
        pixel is the mean of all window predictions covering it.

    Raises:
        ValueError: if an image is smaller than the window.

    For a 3264 x 2448 image and the defaults the window visits the same
    13 x 13 positions as before. For other sizes the positions are derived
    from the image bounds, and a final position flush with the border is added
    when the step does not divide evenly, so every pixel is covered.

    Side effects: shows each input and each averaged result with matplotlib.
    """
    def window_starts(extent, window, stride):
        # Start offsets along one axis; the last window ends at the border.
        last = extent - window
        if last < 0:
            raise ValueError('image side %d is smaller than the window %d' % (extent, window))
        starts = list(range(0, last + 1, stride))
        if starts[-1] != last:
            starts.append(last)
        return starts

    crop_h, crop_w = crop_size
    step_h, step_w = step
    results = []
    for img_path in img_dirs:
        img = pil_image.open(img_path).convert('RGB')
        w, h = img.size  # PIL reports (width, height)
        plt.imshow(img)
        plt.show()

        to_tensor = transforms.ToTensor()
        result_img = np.zeros((h, w, 3))   # sum of predictions per pixel
        voting_mask = np.zeros((h, w, 1))  # number of predictions per pixel

        # Inference only: no autograd graph, so memory does not grow per window.
        with torch.no_grad():
            for top in window_starts(h, crop_h, step_h):
                for left in window_starts(w, crop_w, step_w):
                    tile = img.crop((left, top, left + crop_w, top + crop_h))
                    tile = np.array(tile).astype(np.float32)/255.0
                    batch = torch.unsqueeze(to_tensor(tile), 0).to(device)

                    # Model output is in [0, 1]; accumulate on the 0-255 scale.
                    pred = model(batch)*255
                    pred = pred[0].to('cpu').permute(1, 2, 0).numpy()

                    result_img[top:top + crop_h, left:left + crop_w, :] += pred
                    voting_mask[top:top + crop_h, left:left + crop_w, :] += 1

        # Every pixel is covered at least once, so the division is safe.
        result_img = (result_img/voting_mask).astype(np.uint8)
        plt.imshow(result_img)
        plt.show()

        results.append(result_img)
    return results

def make_mean_submission(result):
    """Write 0-255 predictions as PNG files and pack them into ``submission.zip``.

    Args:
        result: sequence of RGB arrays of shape (H, W, 3) already on the
            0-255 scale, e.g. the output of ``predict_means``.

    The i-th image is saved as ``submission/test_<20000+i>.png`` and stored in
    the archive under its bare file name, so the archive layout is flat.
    """
    import zipfile
    out_dir = 'submission'
    os.makedirs(out_dir, exist_ok = True)
    sub_imgs = []
    for i, img in enumerate(result):
        img = (img).astype(np.uint8)
        path = os.path.join(out_dir, f'test_{20000+i}.png')
        # OpenCV expects BGR channel order.
        cv2.imwrite(path, cv2.cvtColor(img, cv2.COLOR_RGB2BGR))
        sub_imgs.append(path)
    # The context manager closes the archive even if a write fails.
    with zipfile.ZipFile('submission.zip', 'w') as submission:
        for path in sub_imgs:
            submission.write(path, arcname = os.path.basename(path))

In [None]:
# Sliding-window prediction with overlap averaging for every test image.
mean_result = predict_means(test_all_input_files, model)

In [23]:
# Write the averaged predictions as PNG files and zip them for submission.
make_mean_submission(mean_result)

In [20]:
import PIL.Image as pil_image
import zipfile

def largemean_predict(img_dirs, model, crop_size = (612, 816), crop_rate = 4):
    """Sliding-window prediction whose step is derived from the image size.

    The window moves by ``image_side // (4 * crop_rate)`` pixels on each axis
    and overlapping predictions are averaged.

    Args:
        img_dirs: iterable of image file paths.
        model: network called with a (1, 3, crop_h, crop_w) float tensor in
            [0, 1] on ``device``; its output must have the same shape.
        crop_size: (height, width) of the window.
        crop_rate: step divisor; the original note suggests a power of two
            (2, 4, 8).

    Returns:
        One uint8 array of shape (H, W, 3), values 0-255, per image.

    Raises:
        ValueError: if an image is smaller than the window.

    For a 3264 x 2448 image and the defaults the window visits the same
    13 x 13 positions as before. For other sizes the positions are derived
    from the image bounds, and a final position flush with the border is added
    when needed, so every pixel is covered.

    Side effects: shows each input and each averaged result with matplotlib.
    """
    def window_starts(extent, window, stride):
        # Start offsets along one axis; the last window ends at the border.
        last = extent - window
        if last < 0:
            raise ValueError('image side %d is smaller than the window %d' % (extent, window))
        starts = list(range(0, last + 1, stride))
        if starts[-1] != last:
            starts.append(last)
        return starts

    crop_h, crop_w = crop_size
    results = []
    for img_path in img_dirs:
        img = pil_image.open(img_path).convert('RGB')
        w, h = img.size  # PIL reports (width, height)
        plt.imshow(img)
        plt.show()

        # Window step per axis; at least 1 px so range() stays valid.
        stride_h = max(1, h//(4*crop_rate))
        stride_w = max(1, w//(4*crop_rate))

        to_tensor = transforms.ToTensor()
        result_img = np.zeros((h, w, 3))   # sum of predictions per pixel
        voting_mask = np.zeros((h, w, 1))  # number of predictions per pixel

        # Inference only: no autograd graph, so memory does not grow per window.
        with torch.no_grad():
            for top in window_starts(h, crop_h, stride_h):
                for left in window_starts(w, crop_w, stride_w):
                    tile = img.crop((left, top, left + crop_w, top + crop_h))
                    tile = np.array(tile).astype(np.float32)/255.0
                    batch = torch.unsqueeze(to_tensor(tile), 0).to(device)

                    # Model output is in [0, 1]; accumulate on the 0-255 scale.
                    pred = model(batch)*255
                    pred = pred[0].to('cpu').permute(1, 2, 0).numpy()

                    result_img[top:top + crop_h, left:left + crop_w, :] += pred
                    voting_mask[top:top + crop_h, left:left + crop_w, :] += 1

        # Every pixel is covered at least once, so the division is safe.
        result_img = (result_img/voting_mask).astype(np.uint8)
        plt.imshow(result_img)
        plt.show()

        results.append(result_img)
    return results

In [None]:
# Sliding-window prediction (step derived from the image size) for every test image.
largemean_result = largemean_predict(test_all_input_files, model)

In [None]:
# Write the averaged predictions as PNG files and zip them for submission.
make_mean_submission(largemean_result)