In [18]:
import os
import torch.nn as nn
import torch.optim as optim
from torchvision.transforms import *
import torch
import math
from os.path import join
import torch.utils.data as data
import numpy as np
from os import listdir
from os.path import join
from PIL import Image, ImageOps
import random
from random import randrange
from math import sqrt

'''
1. Normalization 과 Activation 을 포함한 Conv. Layer 모듈 생성 
'''
class ConvBlock(nn.Module):
    def __init__(self, input_size, output_size, kernel_size=3, stride=1, padding=1, bias=True, activation='prelu', norm=None):
        super(ConvBlock, self).__init__()
        self.conv = torch.nn.Conv2d(input_size, output_size, kernel_size, stride, padding, bias=bias)

        self.norm = norm
        if self.norm =='batch':
            self.bn = torch.nn.BatchNorm2d(output_size)
        elif self.norm == 'instance':
            self.bn = torch.nn.InstanceNorm2d(output_size)
        
        # self.bn : Conv Layer 출력에서 normalization 을 Instance 로 할지 Batch 로 할지 선택
        
        if self.activation == 'relu':
            self.act = torch.nn.ReLU(True)
        elif self.activation == 'prelu':
            self.act = torch.nn.PReLU()
        elif self.activation == 'lrelu':
            self.act = torch.nn.LeakyReLU(0.2, True)
        elif self.activation == 'tanh':
            self.act = torch.nn.Tanh()
        elif self.activation == 'sigmoid':
            self.act = torch.nn.Sigmoid()
        
        # self.act : Conv Layer 출력 Activation Function 선택
        
    def forward(self, x):
        if self.norm is not None:
            out = self.bn(self.conv(x))
        else:
            out = self.conv(x)

        if self.activation is not None:
            return self.act(out)
        else:
            return out
    
'''
2. (1)에서 만든 ConvBlock() 을 사용하여 SRCNN 을 만듬
    이때, 각 layer1, 2, 3 에서 weights 를 초기화
'''

class SRCNN(nn.Module):
    def __init__(self, num_channels, base_filter, scale_factor):
        super(SRCNN, self).__init__()
        
        self.layer1 = ConvBlock(num_channels, base_filter, kernel_size=9, stride=1, padding=4, activation='relu', norm=None)
        # 9x9 filter, stride=1, padding=4 conv layer, Activation Function = 'ReLU', Normalization 없음
        self.layer2 = ConvBlock(base_filter, base_filter // 2, kernel_size=1, stride=1, padding=0, activation='relu', norm=None)
        # 1x1 filter, stride=1, padding=0 conv layer, Activation Function = 'ReLU', Normalization 없음
        self.layer3 = ConvBlock(base_filter // 2, 3, kernel_size=5, stride=1, padding=2, activation=None, norm=None)
        # 5x5 filter, stride=1, padding=2 conv layer, Activation Function = 'ReLU', Normalization 없음
        # 출력 channel 이 3인것은 color 이미지를 다루기 때문에
        
        '''
        이미 class 안에 정의되어있는 self. 가 붙은 변수들을 불러오려면 아래와 같은 방식을 취해야 함
        '''
        for m in self.modules():
        # 모듈 목록 (layer1, layer2, layer3) 불러오기 
            classname = m.__class__.__name__
            # 불러온 module 의 이름 
            if classname.find('Conv2d') != -1:
                torch.nn.init.kaiming_normal_(m.weight)
                if m.bias is not None:
                    m.bias.data.zero_()
            # Conv2d Layer 라면 He 방법으로 해당 Layer 의 weight 들을 초기화, bias 가 True 면 bias 를 0 으로 초기화
            elif classname.find('ConvTranspose2d') != -1:
                torch.nn.init.kaiming_normal_(m.weight)
                if m.bias is not None:
                    m.bias.data.zero_()
           # ConvTranspose2d Layer 라면 He 방법으로 해당 Layer 의 weight 들을 초기화, bias 가 True 면 bias 를 0 으로 초기화
        
    def forward(self, x):
        f1 = self.layer1(x)
        f2 = self.layer2(f1)
        y = self.layer3(f2)
        
        return y
    
'''
3. Activation 을 포함한 Conv. Layer 모듈 생성, (1)과 달리 Activation 선택은 불가 
'''
class Conv_ReLU_Block(nn.Module):
    def __init__(self):
        super(Conv_ReLU_Block, self).__init__()
        self.conv = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1, padding=1, bias=False)
        self.relu = nn.ReLU(inplace=True)
        
    def forward(self, x):
        return self.relu(self.conv(x))
    
'''
4. filename 에 .png, .jpg, .jpeg 중 하나라도 있으면 True 를 반환하는 함수
'''
def is_image_file(filename):
    return any(filename.endswith(extension) for extension in [".png", ".jpg", ".jpeg"])
'''
5. PIL 을 이용하여 filepath에서 이미지를 불러오고 RGB 로 convert 하여 반환하는 함수
'''
def load_img(filepath):
    img = Image.open(filepath).convert('RGB')
    return img
'''
6. PIL 을 이용하여 이미지를 Bicubic 으로 scale 만큼 High Resolution 으로 만드는 함수
'''
def rescale_img(img_in, scale):
    size_in = img_in.size
    # size_in = img_in 의 (width, height) tuple
    new_size_in = tuple([int(x * scale) for x in size_in])
    # new_size_in = (width * scale, height * scale)
    img_in = img_in.resize(new_size_in, resample=Image.BICUBIC)
    return img_in
'''
7. 원본이미지, downscale 이미지, upscale 이미지의 크기가 서로 다르므로 같은 크기로 crop 해 주는 함수 
'''
def get_patch(img_in, img_tar, img_bic, patch_size, scale, ix=-1, iy=-1):
    (ih, iw) = img_in.size
    (th, tw) = (scale * ih, scale * iw)

    patch_mult = scale #if len(scale) > 1 else 1
    tp = patch_mult * patch_size
    ip = tp // scale

    if ix == -1:
        ix = random.randrange(0, iw - ip + 1)
    if iy == -1:
        iy = random.randrange(0, ih - ip + 1)

    (tx, ty) = (scale * ix, scale * iy)

    img_in = img_in.crop((iy,ix,iy + ip, ix + ip))
    img_tar = img_tar.crop((ty,tx,ty + tp, tx + tp))
    img_bic = img_bic.crop((ty,tx,ty + tp, tx + tp))
                
    info_patch = {
        'ix': ix, 'iy': iy, 'ip': ip, 'tx': tx, 'ty': ty, 'tp': tp}
    # info_patch 에는 어떤 크기로 crop 했는지에 대한 정보가 있음
    return img_in, img_tar, img_bic, info_patch
'''
8. 이미지데이터 증강(augmentation)
'''
def augment(img_in, img_tar, img_bic, flip_h=True, rot=True):
    info_aug = {'flip_h': False, 'flip_v': False, 'trans': False}
    
    if random.random() < 0.5 and flip_h:
        img_in = ImageOps.flip(img_in)
        img_tar = ImageOps.flip(img_tar)
        img_bic = ImageOps.flip(img_bic)
        info_aug['flip_h'] = True
    # 50%의 확률로, flip_h = True 면 img_in, img_tar, img_bic 을 모두 flip

    if rot:
        if random.random() < 0.5:
            img_in = ImageOps.mirror(img_in)
            img_tar = ImageOps.mirror(img_tar)
            img_bic = ImageOps.mirror(img_bic)
            info_aug['flip_v'] = True
        if random.random() < 0.5:
            img_in = img_in.rotate(180)
            img_tar = img_tar.rotate(180)
            img_bic = img_bic.rotate(180)
            info_aug['trans'] = True
    # 50%의 확률로, rot = True 면 img_in, img_tar, img_bic 을 모두 mirror & 180도 rotate
    
    return img_in, img_tar, img_bic, info_aug
    # info_aug = 출력된 이미지에 어떤 preprocessing 을 했는지 알려줌
    
'''
9. PIL 이미지나 ndarray 이미지를 Tensor로 바꾸어주는 함수
'''
def transform():
    return Compose([ToTensor(),])

'''
10.사용자 정의 data 로드 - Training Data 
'''        
class DatasetFromFolder(data.Dataset):
    def __init__(self, image_dir, patch_size, upscale_factor, data_augmentation, transform=None):
        super(DatasetFromFolder, self).__init__()
        self.image_filenames = [join(image_dir, x) for x in listdir(image_dir) if is_image_file(x)]
        # self.image_filenames = image_dir 에 있는 모든 이미지의 경로 list
        self.patch_size = patch_size
        self.upscale_factor = upscale_factor
        self.transform = transform
        self.data_augmentation = data_augmentation
    '''
    def __getitem__(self, index) 에서는 index 번째 data를 return 하도록 코드를 짠다
    '''
    def __getitem__(self, index):
        target = load_img(self.image_filenames[index])
        # target = load_img 함수를 이용하여 index 번째 image 를 RGB 로 불러온 이미지
        input = target.resize((int(target.size[0]/self.upscale_factor),int(target.size[1]/self.upscale_factor)), Image.BICUBIC) 
        # input = target 으로 불러온 이미지를 Bicubic 으로 downscale 한 이미지
        bicubic = rescale_img(input, self.upscale_factor)
        # bicubic = rescale_img 함수로 input 을 다시 같은 비율로 upscale 한 이미지
        
        input, target, bicubic, _ = get_patch(input,target,bicubic,self.patch_size, self.upscale_factor)
        # 세 이미지를 같은 크기로 crop
        
        if self.data_augmentation:
            input, target, bicubic, _ = augment(input, target, bicubic)
        # data_augmentation = True 라면, 세 이미지를 augment 함수로 증강
        
        if self.transform:
            input = self.transform(input)
            bicubic = self.transform(bicubic)
            target = self.transform(target)
        # self.transform = True 라면, 세 이미지를 PIL image 에서 Tensor로 바꾸어 줌
        return input, target, bicubic
    '''
    def __len__(self) 에서는 data의 len을 return 하도록 코드를 짠다
    '''
    def __len__(self):
        return len(self.image_filenames)
'''
11.사용자 정의 data 로드 - Test Data (file 이름까지 사용자 정의 데이터화 시킴)
'''
class DatasetFromFolderEval(data.Dataset):
    def __init__(self, lr_dir, upscale_factor, transform=None):
        super(DatasetFromFolderEval, self).__init__()
        self.image_filenames = [join(lr_dir, x) for x in listdir(lr_dir) if is_image_file(x)]
        self.upscale_factor = upscale_factor
        self.transform = transform

    def __getitem__(self, index):
        target = load_img(self.image_filenames[index])
        _, file = os.path.split(self.image_filenames[index])
        # file = 이미지 경로 list 에서 file 이름만 잘라 냄

        input = target.resize((int(target.size[0]/self.upscale_factor),int(target.size[1]/self.upscale_factor)), Image.BICUBIC)       
        bicubic = rescale_img(input, self.upscale_factor)
        
        if self.transform:
            input = self.transform(input)
            bicubic = self.transform(bicubic)
            target = self.transform(target)
            
        return input, bicubic, target, file
        # file 이름까지 return
      
    def __len__(self):
        return len(self.image_filenames)
'''
12. Training Data 불러오기
'''
def get_training_set(data_dir, hr, upscale_factor, patch_size, data_augmentation):
    hr_dir = join(data_dir, hr)
    return DatasetFromFolder(hr_dir,patch_size, upscale_factor, data_augmentation,
                             transform=transform())
'''
13. Test Data 불러오기
'''
def get_eval_set(lr_dir, upscale_factor):
    return DatasetFromFolderEval(lr_dir, upscale_factor,
                             transform=transform())

print('finish')



finish


In [19]:
# Training code

from __future__ import print_function
import argparse
from math import log10

import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.backends.cudnn as cudnn
from torch.autograd import Variable
from torch.utils.data import DataLoader
import pdb
import socket
import time
import easydict

'''
from google.colab import drive
drive.mount('/content/gdrive')
'''
# colab 에서 필요한 코드 이므로 주석처리

opt = easydict.EasyDict({ 
    "batchSize": 32,           # batch size - 한번에 training할 patch의 숫자
    "lr": 1e-3,                # learning rate
    "upscale_factor": 2,       # Upscale 정도 - 2배, 4배
    "patch_size": 64,         # patch 크기
    "feature_number": 32,     # SRCNN model에서 사용할 feature 숫자
    
    "start_epoch": 1,           
    "nEpochs": 200,            # training 횟수
    "snapshots": 20,           # weight 저장 주기
   
    "data_dir":"C:/Users/user/Desktop/SRCNN-20210324T094341Z-001/SRCNN/data_800개/train/", # dataset 저장 위치
    "hr_train_dataset": "DIV2K_train_HR", # training에 사용할 dataset 종류

    "model_type": "SRCNN",         # 모델이름
    "save_folder": "C:/Users/user/Desktop/SRCNN-20210324T094341Z-001/SRCNN/weights/", # weight 저장 위치

    "pretrained_sr": "C:/Users/user/Desktop/SRCNN-20210324T094341Z-001/SRCNN/weights/SRCNN_epoch_59.pth",
    "pretrained": False,
    "data_augmentation": False,
    "gpu_mode": True,
    "threads": 0,
    "gpus": 1
    })
'''
1. GPU 설정 및 parameter print
'''
gpus_list = range(opt.gpus)
cudnn.benchmark = True
print(opt)

cuda = opt.gpu_mode
if cuda and not torch.cuda.is_available():
    raise Exception("No GPU found, please run without --cuda")

'''
2. Training Data 로드
'''
print('===> Loading datasets')
train_set = get_training_set(opt.data_dir, opt.hr_train_dataset, opt.upscale_factor, opt.patch_size, opt.data_augmentation)
training_data_loader = DataLoader(dataset=train_set, num_workers=opt.threads, batch_size=opt.batchSize, shuffle=True)

'''
3. SRCNN 빌드 (Loss Function 으로 MSE 가 아니라 L1 Loss 사용)
'''
print('===> Building model ', opt.model_type)
model = SRCNN(num_channels=3, base_filter=opt.feature_number, scale_factor=opt.upscale_factor)
criterion = nn.L1Loss()

'''
4. GPU 사용 여부 및 pretrained model 사용 여부 설정
'''
if cuda:
    model = model.cuda(gpus_list[0])
    model = torch.nn.DataParallel(model, device_ids=gpus_list)
    criterion = criterion.cuda(gpus_list[0])
if opt.pretrained:
    model_name = os.path.join(opt.pretrained_sr)
    checkpoint = torch.load(model_name)
    model.load_state_dict(checkpoint['model_state_dict'])
    loss = checkpoint['loss']
    opt.start_epoch = checkpoint['epoch']
    print('Pre-trained SR model is loaded.')

'''
5. Optimizer 설정
'''
optimizer = optim.Adam(model.parameters(), lr=opt.lr, betas=(0.9, 0.999), eps=1e-8)

'''
6. 각 epoch 에서 어떻게 연산할지 설정하고 중간마다 weights 저장
'''
def train(epoch):
    epoch_loss = 0
    model.train()
    for iteration, batch in enumerate(training_data_loader, 1):
        _, target, bicubic = Variable(batch[0]), Variable(batch[1]), Variable(batch[2])
        if cuda:
            # input = input.cuda(gpus_list[0])
            target = target.cuda(gpus_list[0])
            bicubic = bicubic.cuda(gpus_list[0])

        optimizer.zero_grad()
        t0 = time.time()
        prediction = model(bicubic)

        loss = criterion(prediction, target)
        t1 = time.time()
        epoch_loss += loss.data
        loss.backward()
        optimizer.step()

        print("===> Epoch[{}]({}/{}): Loss: {:.4f} || Timer: {:.4f} sec.".format(epoch, iteration, len(training_data_loader), loss.data, (t1 - t0)))

    print("===> Epoch {} Complete: Avg. Loss: {:.4f}".format(epoch, epoch_loss / len(training_data_loader)))
    if (epoch+1) % (opt.snapshots) == 0:
      model_out_path = opt.save_folder+"SRCNN_epoch_{}.pth".format(epoch)
      torch.save({'epoch' : epoch,
              'model_state_dict' : model.state_dict(),
                'loss' : loss.data}, model_out_path)
      print("Checkpoint saved to {}".format(model_out_path))

def print_network(net):
    num_params = 0
    for param in net.parameters():
        num_params += param.numel()
    print(net)
    print('Total number of parameters: %d' % num_params)
    
'''
6. 훈련 시작
'''
if __name__ == '__main__':
    for epoch in range(opt.start_epoch, opt.nEpochs + 1):
        train(epoch)

{'batchSize': 32, 'lr': 0.001, 'upscale_factor': 2, 'patch_size': 64, 'feature_number': 32, 'start_epoch': 1, 'nEpochs': 200, 'snapshots': 20, 'data_dir': 'C:/Users/user/Desktop/SRCNN-20210324T094341Z-001/SRCNN/data_800개/train/', 'hr_train_dataset': 'DIV2K_train_HR', 'model_type': 'SRCNN', 'save_folder': 'C:/Users/user/Desktop/SRCNN-20210324T094341Z-001/SRCNN/weights/', 'pretrained_sr': 'C:/Users/user/Desktop/SRCNN-20210324T094341Z-001/SRCNN/weights/SRCNN_epoch_59.pth', 'pretrained': False, 'data_augmentation': False, 'gpu_mode': True, 'threads': 0, 'gpus': 1}
===> Loading datasets
===> Building model  SRCNN


ModuleAttributeError: 'ConvBlock' object has no attribute 'activation'

In [23]:
# Validation code

from __future__ import print_function
import argparse

import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.autograd import Variable
from torch.utils.data import DataLoader
from functools import reduce
from math import log10

# from scipy.misc import imsave
import scipy.io as sio
import time
import cv2
import easydict

'''
from google.colab import drive
drive.mount('/content/gdrive')
'''
# colab 에서 필요한 코드 이므로 주석처리

opt = easydict.EasyDict({ 
    "testBatchSize": 1, 
    "gpu_mode": True,
    "threads": 1,
    "gpus": 1,
    "upscale_factor": 2,

    "input_dir":"C:/Users/user/Desktop/SRCNN-20210324T094341Z-001/SRCNN/data_800개/test/",  # test dataset 불러올위치
    "test_dataset": "Set5",                             # 사용할 test dataset
    "output": "C:/Users/user/Desktop/SRCNN-20210324T094341Z-001/SRCNN/Results/", # 결과영상 저장위치
    "model_type": "SRCNN",
    "model": "C:/Users/user/Desktop/SRCNN-20210324T094341Z-001/SRCNN/weights/SRCNN_epoch_19.pth"})  # test에 사용할 weight 불러올 위치


gpus_list=range(opt.gpus)
print(opt)

cuda = opt.gpu_mode
if cuda and not torch.cuda.is_available():
    raise Exception("No GPU found, please run without --cuda")



print('===> Loading datasets')
test_set = get_eval_set(os.path.join(opt.input_dir,opt.test_dataset), opt.upscale_factor)
testing_data_loader = DataLoader(dataset=test_set, num_workers=opt.threads, batch_size=opt.testBatchSize, shuffle=False)

print('===> Building model')

model = SRCNN(num_channels=3, base_filter=32, scale_factor=opt.upscale_factor)
model = torch.nn.DataParallel(model, device_ids=gpus_list)

checkpoint = torch.load(opt.model)
model.load_state_dict(checkpoint['model_state_dict'])
loss = checkpoint['loss']
epoch = checkpoint['epoch']
print('Pre-trained SR model is loaded.')

criterion = nn.MSELoss()

if cuda:
    model = model.cuda(gpus_list[0])

def eval():
    avg_psnr = 0
    psnr_sq = 0
    model.eval()
    for batch in testing_data_loader:
        with torch.no_grad():
            input, bicubic, target, name = Variable(batch[0]), Variable(batch[1]), Variable(batch[2]), batch[3]
        if cuda:
            input = input.cuda(gpus_list[0])
            bicubic = bicubic.cuda(gpus_list[0])
            target = target.cuda(gpus_list[0])

        prediction = model(bicubic)
        mse = criterion(prediction, target)
        psnr = 10 * log10(1 / mse.item())
        avg_psnr += psnr
        psnr_sq += psnr*psnr

        save_img(prediction.cpu().data, name[0])
        # save_img(bicubic.cpu().data, name[0])
        # save_img(target.cpu().data, name[0])
        average = avg_psnr/len(testing_data_loader)
        variance = psnr_sq/len(testing_data_loader)-average*average
    print("===> epoch number : %d" % (epoch))     
    print("===> Processing Done, Average PSNR : %.4f" % (average))
    print("===>PSNR Variance : %.4f" % (variance))
def save_img(img, img_name):
    save_img = img.squeeze().clamp(0, 1).numpy().transpose(1,2,0)
    # save img
    save_dir=os.path.join(opt.output,opt.test_dataset)
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)
        
    save_fn = save_dir +'/'+ img_name
    cv2.imwrite(save_fn, cv2.cvtColor(save_img*255, cv2.COLOR_BGR2RGB),  [cv2.IMWRITE_PNG_COMPRESSION, 0])
    
##Eval Start!!!!
if __name__ == '__main__':
    eval()

{'testBatchSize': 1, 'gpu_mode': True, 'threads': 1, 'gpus': 1, 'upscale_factor': 2, 'input_dir': 'C:/Users/user/Desktop/SRCNN-20210324T094341Z-001/SRCNN/data_800개/test/', 'test_dataset': 'Set5', 'output': 'C:/Users/user/Desktop/SRCNN-20210324T094341Z-001/SRCNN/Results/', 'model_type': 'SRCNN', 'model': 'C:/Users/user/Desktop/SRCNN-20210324T094341Z-001/SRCNN/weights/SRCNN_epoch_19.pth'}
===> Loading datasets
===> Building model


ModuleAttributeError: 'ConvBlock' object has no attribute 'activation'