<a href="https://colab.research.google.com/github/inhovation97/Get-an-education-Computer-Vision/blob/main/GAN/SRGAN_20210615.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Colab line magic: select the TensorFlow 1.x runtime for this notebook.
%tensorflow_version 1.x

TensorFlow 1.x selected.


In [None]:
# Connect this Colab session to Google Drive.
# Access to Google Drive File Stream has to be granted when prompted.
from google.colab import auth, drive

auth.authenticate_user()

# Mount Google Drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
%cd gdrive/My Drive/Colab Notebooks

/content/gdrive/My Drive/Colab Notebooks


In [None]:
from glob import glob
import numpy as np
import matplotlib.pyplot as plt
import cv2

# In GAN setups the HR image is usually at most 512x512.
class DataLoader():
    """Yield normalised high-/low-resolution image batches for SRGAN training.

    Images are read from ``./data/<dataset_name>/`` relative to the current
    working directory.
    """

    def __init__(self, dataset_name, img_res=(384,384)):
        """
        Args:
            dataset_name: Name of the sub-folder of ``./data`` holding the images.
            img_res: ``(height, width)`` of the high-resolution target. The
                low-resolution image is a quarter of that in each dimension.
        """
        self.dataset_name = dataset_name
        self.img_res = img_res

    def load_data(self, batch_size=1, is_testing=False):
        """Generate ``(imgs_hr, imgs_lr)`` batches, one pass over the folder.

        Args:
            batch_size: Number of images per yielded batch. The final batch
                may be smaller when the image count is not a multiple of it.
            is_testing: Accepted for interface compatibility; currently
                unused (there is no separate train/test folder).

        Yields:
            Tuple ``(imgs_hr, imgs_lr)`` of float arrays with shapes
            ``(n, h, w, 3)`` and ``(n, h // 4, w // 4, 3)``.
            HR pixels are scaled to [-1, 1], LR pixels to [0, 1].
            Channel order is whatever ``imread`` returns (BGR for OpenCV).

        Raises:
            ValueError: if ``batch_size`` is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be >= 1, got %r" % (batch_size,))

        # Every file in the dataset folder; sorted so the order is reproducible.
        paths = sorted(glob('./data/%s/*' % (self.dataset_name)))

        # Target sizes, e.g. HR (384, 384) -> LR (96, 96).
        h, w = self.img_res
        low_h, low_w = int(h / 4), int(w / 4)

        batch_hr, batch_lr = [], []
        for img_path in paths:
            img = self.imread(img_path)
            if img is None:
                # cv2.imread returns None for files it cannot decode
                # (e.g. stray non-image files); skip instead of crashing.
                continue

            # cv2.resize expects dsize as (width, height), not (height, width).
            img_hr = cv2.resize(img, (w, h), interpolation=cv2.INTER_CUBIC)
            img_lr = cv2.resize(img, (low_w, low_h), interpolation=cv2.INTER_CUBIC)

            # HR: [0, 255] -> [-1.0, 1.0]
            batch_hr.append(np.array(img_hr) / 127.5 - 1.)
            # LR: [0, 255] -> [0.0, 1.0]
            batch_lr.append(np.array(img_lr) / 255.0)

            if len(batch_hr) == batch_size:
                # Stack adds the leading batch axis: (n, h, w, 3)
                yield np.stack(batch_hr), np.stack(batch_lr)
                batch_hr, batch_lr = [], []

        # Emit the left-over images as a final, smaller batch.
        if batch_hr:
            yield np.stack(batch_hr), np.stack(batch_lr)


    def imread(self, path):
        """Read ``path`` as a 3-channel colour image via OpenCV."""
        return cv2.imread(path, cv2.IMREAD_COLOR)

In [None]:
from __future__ import print_function, division
import scipy
import tensorflow as tf

from tensorflow.keras.layers import Input, Dense, Reshape, Flatten, Dropout, Concatenate, BatchNormalization, Activation, ZeroPadding2D, Add, PReLU, LeakyReLU, UpSampling2D, Conv2D, GlobalAveragePooling2D
from tensorflow.keras.applications import VGG19
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.optimizers import Adam
import datetime
import matplotlib.pyplot as plt
import sys
import numpy as np
import os
import cv2

import tensorflow.keras.backend as K

class SRGAN():
    """Super-resolution GAN (x4 upscaling) assembled from tf.keras models.

    The constructor builds four models:

    * ``self.generator``     - maps a low-resolution image to a 4x larger one.
    * ``self.discriminator`` - scores a high-resolution image as real or fake.
    * ``self.vgg``           - frozen VGG19 truncated at ``block4_conv1``; its
      output is the target of the feature (content) loss.
    * ``self.combined``      - generator -> (discriminator, vgg), compiled with
      the discriminator frozen so only the generator is updated through it.
    """

    def __init__(self, dataset_name='CelebA'):
        """Build and compile all models and set up the data loader.

        Args:
            dataset_name: Sub-folder of ``./data`` (relative to the current
                working directory) that holds the training images.

        Raises:
            OSError: propagated from ``os.listdir`` when the folder is missing.
        """
        # Input shape
        self.channels = 3
        self.lr_height = 96                # Low resolution height
        self.lr_width = 96                 # Low resolution width
        self.lr_shape = (self.lr_height, self.lr_width, self.channels)
        self.hr_height = self.lr_height*4   # High resolution height
        self.hr_width = self.lr_width*4     # High resolution width
        self.hr_shape = (self.hr_height, self.hr_width, self.channels)

        # Number of residual blocks in the generator
        self.n_residual_blocks = 16

        # Positional arguments are (learning rate, beta_1).
        optimizer = Adam(0.0001, 0.9)

        # We use a pre-trained VGG19 model to extract image features from the high resolution
        # and the generated high resolution images and minimize the mse between them
        self.vgg = self.build_vgg()
        self.vgg.trainable = False

        # Dataset folder, resolved against the current working directory.
        self.dataset_name = dataset_name
        targetpath = os.path.join(os.getcwd(), "data", self.dataset_name)

        self.files_ = os.listdir(targetpath)  # list of files in the folder
        print(len(self.files_), "in the folder")  # report the file count

        # Configure data loader
        self.data_loader = DataLoader(dataset_name=self.dataset_name,
                                      img_res=(self.hr_height, self.hr_width))

        # Output shape of a PatchGAN-style D: four stride-2 convolutions
        # shrink each side by 2**4. Not used by the current GAP-based D.
        patch = int(self.hr_height / 2**4)
        self.disc_patch = (patch, patch, 1)

        # Number of filters in the first layer of G and D
        self.gf = 64
        self.df = 64

        # Build and compile the discriminator
        self.discriminator = self.build_discriminator()
        self.discriminator.compile(loss='binary_crossentropy',
            optimizer=optimizer,
            metrics=['accuracy'])

        # Build the generator
        self.generator = self.build_generator()

        # High res. and low res. images
        img_hr = Input(shape=self.hr_shape)
        img_lr = Input(shape=self.lr_shape)

        # Generate a fake image
        fake_hr = self.generator(img_lr)

        # For the combined model we will only train the generator
        self.discriminator.trainable = False

        # Discriminator determines validity of generated high res. images
        validity = self.discriminator(fake_hr)

        # Extract image features of the generated img
        fake_features = self.vgg(fake_hr)

        # Outputs: validity (trained against ones with BCE) and VGG features
        # (trained against the real image's features with MSE).
        self.combined = Model(inputs=[img_lr, img_hr],
                              outputs=[validity, fake_features])
        self.combined.compile(loss=['binary_crossentropy', 'mse'],
                              loss_weights=[1e-3, 1],  # balance the two loss scales
                              optimizer=optimizer)

    def bgr2rgb(self, img):
        """Return a copy of a 3-channel image with the channel order reversed.

        Args:
            img: Array of shape ``(h, w, 3)`` in BGR order.

        Returns:
            Contiguous array of the same shape and dtype in RGB order.
        """
        return np.ascontiguousarray(img[..., ::-1])

    def build_vgg(self):
        """Build the VGG19 feature extractor used for the content loss.

        Returns:
            A Model from the VGG19 input to the ``block4_conv1`` activations
            divided by 12.75.
        """
        vgg = VGG19(weights="imagenet", include_top=False, input_shape=(self.hr_shape))
        # Feature map is tapped here; VGG serves purely as a feature encoder.
        vggout = vgg.get_layer('block4_conv1').output /12.75
        return Model(inputs=vgg.input, outputs=vggout)

    def build_generator(self):
        """Build the generator: conv -> residual blocks -> skip -> 2x pixel shuffle twice.

        Returns:
            A Model mapping ``self.lr_shape`` images to tanh-activated images
            four times larger in height and width.
        """
        def residual_block(layer_input, filters):
            """Residual block described in paper"""
            d = Conv2D(filters, kernel_size=3, strides=1,use_bias=False, padding='same')(layer_input)
            d = BatchNormalization(momentum=0.8)(d)
            d = Activation('relu')(d)
            d = Conv2D(filters, kernel_size=3, strides=1, use_bias=False, padding='same')(d)
            d = BatchNormalization(momentum=0.8)(d)
            d = Add()([d, layer_input])
            return d

        def deconv2d(layer_input):
            """Alternative upsampling (conv -> upsample -> activation); currently unused."""
            # A transposed convolution would also work here.
            u = Conv2D(256, kernel_size=3, strides=1, padding='same')(layer_input)
            u = UpSampling2D(size=2)(u)
            u = Activation('relu')(u)
            return u

        def pixelshuffle(layer_input):
            """Upsample by 2 with pixel shuffle (depth-to-space).

            The conv produces r**2 times the channels; depth_to_space then
            rearranges each group of r**2 channels into an r x r spatial
            block, growing height and width by r.
            """
            # Doubling the resolution, so r = 2.
            conv = Conv2D(self.gf * (2 ** 2), (3,3), kernel_initializer='he_normal', padding='SAME')(layer_input)
            conv = tf.nn.depth_to_space(conv, 2)
            conv = LeakyReLU(alpha=0.3)(conv)
            return conv

        # Low resolution image input, (96, 96, 3)
        img_lr = Input(shape=self.lr_shape)

        # Pre-residual block = encoding layer
        c1 = Conv2D(64, kernel_size=9, strides=1, padding='same')(img_lr)
        c1 = Activation('relu')(c1)

        # Propagate through residual blocks
        r = residual_block(c1, self.gf)
        for _ in range(self.n_residual_blocks - 1):
            r = residual_block(r, self.gf)

        # Post-residual block, with a long skip connection around all blocks
        # (residual-in-residual).
        c2 = Conv2D(64, kernel_size=3, strides=1, padding='same')(r)
        c2 = BatchNormalization(momentum=0.8)(c2)
        c2 = Add()([c2, c1])

        # Upsampling: two 2x pixel-shuffle stages give the overall 4x.
        u1 = pixelshuffle(c2)
        u2 = pixelshuffle(u1)

        # Generate high resolution output
        gen_hr = Conv2D(self.channels, kernel_size=9, strides=1, padding='same', activation='tanh')(u2)

        return Model(img_lr, gen_hr)

    def build_discriminator(self):
        """Build the discriminator: 8 conv blocks -> global average pool -> sigmoid.

        Returns:
            A Model mapping ``self.hr_shape`` images to a single validity
            score in (0, 1).
        """

        def d_block(layer_input, filters, strides=1, bn=True):
            """Discriminator layer: conv -> optional batch norm -> LeakyReLU."""
            # The conv bias is dropped only when batch norm follows it;
            # the first layer is built without batch norm.
            d = Conv2D(filters, kernel_size=3, strides=strides, use_bias=not bn, padding='same')(layer_input)
            if bn:
                d = BatchNormalization(momentum=0.8)(d)
            d = LeakyReLU(alpha=0.2)(d)
            return d

        # Input img
        d0 = Input(shape=self.hr_shape)
        # (None, 384, 384, 3)

        d1 = d_block(d0, self.df, bn=False)
        # (None, 384, 384, 64)
        d2 = d_block(d1, self.df, strides=2)
        # (None, 192, 192, 64)
        d3 = d_block(d2, self.df*2)
        # (None, 192, 192, 128)
        d4 = d_block(d3, self.df*2, strides=2)
        # (None, 96, 96, 128)
        d5 = d_block(d4, self.df*4)
        # (None, 96, 96, 256)
        d6 = d_block(d5, self.df*4, strides=2)
        # (None, 48, 48, 256)
        d7 = d_block(d6, self.df*8)
        # (None, 48, 48, 512)
        d8 = d_block(d7, self.df*8, strides=2)
        # (None, 24, 24, 512)

        # In total the input is downsampled by 2**4.

        # The paper uses Flatten -> Dense(1024) -> LeakyReLU here; global
        # average pooling is the author's substitute. Emitting d8 without
        # GAP/Dense would give a PatchGAN-style output instead.
        d9 = GlobalAveragePooling2D()(d8)
        validity = Dense(1, activation='sigmoid')(d9)

        return Model(d0, validity)

    def train(self, epochs, batch_size=1, sample_interval=50):
        """Alternate discriminator and generator updates over the dataset.

        Args:
            epochs: Number of passes over the data loader.
            batch_size: Batch size requested from the data loader.
            sample_interval: Save a sample figure every this many batches.

        Raises:
            ValueError: if ``batch_size`` is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be >= 1, got %r" % (batch_size,))

        start_time = datetime.datetime.now()
        # Expected batches per epoch (ceil division), used for progress output only.
        n_batches = -(-len(self.files_) // batch_size)

        for epoch in range(epochs):
            data = self.data_loader.load_data(batch_size)
            # Iterate the loader itself rather than counting directory
            # entries, so a mismatch between the two cannot raise StopIteration.
            for i, (imgs_hr, imgs_lr) in enumerate(data):
                # Size the labels from the batch actually received.
                n_samples = imgs_hr.shape[0]

                # From low res. image generate high res. version
                fake_hr = self.generator.predict(imgs_lr)

                # One label per image; a PatchGAN-style D would use
                # (n_samples,) + self.disc_patch instead.
                valid = np.ones((n_samples, 1))
                fake = np.zeros((n_samples, 1))

                # Train the discriminator (original images = real / generated = fake)
                d_loss_real = self.discriminator.train_on_batch(imgs_hr, valid)
                d_loss_fake = self.discriminator.train_on_batch(fake_hr, fake)

                # Extract ground truth image features using pre-trained VGG19 model
                image_features = self.vgg.predict(imgs_hr)

                # Train the generator: it wants D to label its output as real.
                g_loss = self.combined.train_on_batch([imgs_lr, imgs_hr], [valid, image_features])

                elapsed_time = datetime.datetime.now() - start_time

                # Plot the progress
                print("Epoch:", epoch, "/", epochs, "batch:", i, "/", n_batches,
                      " d_loss_real:", d_loss_real[0], "d_loss_fake:", d_loss_fake[0],
                      "g_loss:", g_loss[0], "time:", elapsed_time)

                # If at save interval => save generated image samples
                if i % sample_interval == 0:
                    self.sample_images(epoch, i, imgs_hr, imgs_lr)

    def sample_images(self, epoch, i, imgs_hr, imgs_lr):
        """Save an LR / SR / HR comparison figure for the first image of a batch.

        The figure is written to ``images/<dataset_name>/<epoch>_<i>.png``.

        Args:
            epoch: Current epoch index (used in the file name).
            i: Current batch index (used in the file name).
            imgs_hr: High-resolution batch scaled to [-1, 1].
            imgs_lr: Low-resolution batch scaled to [0, 1].
        """
        os.makedirs('images/%s' % self.dataset_name, exist_ok=True)

        fake_hr = self.generator.predict(imgs_lr)

        # Rescale to [0, 1]: HR and generated images are in [-1, 1]
        # (tanh output); the LR image is used as passed in.
        imgs_lr = imgs_lr[0]
        fake_hr = 0.5 * fake_hr[0] + 0.5
        imgs_hr = 0.5 * imgs_hr[0] + 0.5

        imgs_lr = np.clip(imgs_lr, 0, 1)
        imgs_hr = np.clip(imgs_hr, 0, 1)
        fake_hr = np.clip(fake_hr, 0, 1)

        fig = plt.figure()
        rows = 1
        cols = 3

        panels = (("LR", imgs_lr), ("SR", fake_hr), ("HR", imgs_hr))
        for position, (title, image) in enumerate(panels, start=1):
            ax = fig.add_subplot(rows, cols, position)
            # Images are in BGR order; matplotlib expects RGB.
            ax.imshow(self.bgr2rgb(image))
            ax.set_title(title)
            ax.axis("off")

        fig.savefig("images/%s/%d_%d.png" % (self.dataset_name, epoch, i))
        # Close this specific figure so figures do not pile up during training.
        plt.close(fig)


In [None]:
# Build the SRGAN: constructs the VGG19 feature extractor, discriminator,
# generator and combined model, and lists the files in the dataset folder.
gan = SRGAN()

Instructions for updating:
If using Keras pass *_constraint arguments to layers.
Downloading data from https://github.com/fchollet/deep-learning-models/releases/download/v0.1/vgg19_weights_tf_dim_ordering_tf_kernels_notop.h5
804 in the folder
Instructions for updating:
Use tf.where in 2.0, which has the same broadcast rule as np.where


In [None]:
# Train for 20 epochs with one image per step; a sample comparison figure
# is saved every 100 batches.
gan.train(epochs=20, batch_size=1, sample_interval=100)

[1;30;43m스트리밍 출력 내용이 길어서 마지막 5000줄이 삭제되었습니다.[0m
Epoch: 13 / 20 batch: 628 / 1000  d_loss_real: 0.00056299847 d_loss_fake: 0.00056916644 g_loss: 0.09305419
Epoch: 13 / 20 batch: 629 / 1000  d_loss_real: 0.001013828 d_loss_fake: 0.0014539054 g_loss: 0.11641647
Epoch: 13 / 20 batch: 630 / 1000  d_loss_real: 7.774391e-05 d_loss_fake: 0.00034435897 g_loss: 0.068980694
Epoch: 13 / 20 batch: 631 / 1000  d_loss_real: 0.00042506325 d_loss_fake: 0.0006818577 g_loss: 0.085355796
Epoch: 13 / 20 batch: 632 / 1000  d_loss_real: 0.0005144944 d_loss_fake: 0.0005217241 g_loss: 0.11747016
Epoch: 13 / 20 batch: 633 / 1000  d_loss_real: 0.00018837808 d_loss_fake: 0.0002508514 g_loss: 0.09984046
Epoch: 13 / 20 batch: 634 / 1000  d_loss_real: 0.0002456993 d_loss_fake: 0.0011828428 g_loss: 0.08096587
Epoch: 13 / 20 batch: 635 / 1000  d_loss_real: 0.00014219702 d_loss_fake: 0.00036432044 g_loss: 0.046458136
Epoch: 13 / 20 batch: 636 / 1000  d_loss_real: 0.00026230598 d_loss_fake: 0.00032899034 g_loss: 0.069