In [None]:
import struct
import numpy as np
from keras.layers import Conv2D
from keras.layers import Input
from keras.layers import BatchNormalization
from keras.layers import LeakyReLU,Dense
from keras.layers import ZeroPadding2D,Flatten
from keras.layers import UpSampling2D,Lambda

from keras.layers import add, Concatenate,GlobalAveragePooling2D,Softmax
from keras.models import Model
import tensorflow as tf
from keras.applications import vgg19
import tensorflow_datasets as tfds
import os
import cv2 
from pathlib import Path
from PIL import Image
import matplotlib.pyplot as plt
from tqdm import tqdm
import pickle
import seaborn as sns
import matplotlib.image as mpimg
import shutil
import torch
import torch.nn as nn
import time

import shutil
shutil.rmtree('/your/folder/path/')
shutil.copytree('../input/chechpoint-of-sisr', './checkpoint_dir')

In [None]:
train_paths_to_HR_images= ['../input/patches/train/HR/'+name for name in os.listdir('../input/patches/train/HR')]
train_paths_to_LR_images= ['../input/patches/train/LR/'+name for name in os.listdir('../input/patches/train/LR')]

test_paths_to_HR_images= ['../input/patches/test/HR/'+name for name in os.listdir('../input/patches/test/HR/')]
test_paths_to_LR_images= ['../input/patches/test/LR/'+name for name in os.listdir('../input/patches/test/LR/')]

# visualizing the patches

# Creating the pipeline

In [None]:
def read(path_HR,path_LR):

    # 1)reading the content of the path

     # 3)Normalizing the image

    # 1)reading the content of the path
    content_HR=tf.io.read_file(path_HR)
    content_LR=tf.io.read_file(path_LR)
     
    # 2)decoding the content
    image_HR= tf.cast(tf.io.decode_png(content_HR,channels=3),dtype= tf.float32)
    image_LR= tf.io.decode_png(content_LR,channels=3)
     
    #The pixel value of HR images should be between [-1,1]
    image_HR= image_HR*2.0/255.0-1.0

    #The pixel value for LR images should be between [0,1]
    image_LR = tf.image.convert_image_dtype(image_LR, tf.float32)
    

    return image_HR,image_LR

def pipeline(HR_pat,LR_pat):

   dataset=tf.data.Dataset.from_tensor_slices((HR_pat,LR_pat))
    
   
   #reading the images.....................................................
   dataset=dataset.map(read,num_parallel_calls=tf.data.experimental.AUTOTUNE)

   #Creating the batch
   dataset=dataset.batch(batch_size=batch,drop_remainder=True)

   #All the batches will be stored in the cache after the first iteration
   dataset.cache()

   dataset.prefetch(tf.data.experimental.AUTOTUNE)   
   return dataset


# #Dataset Parameter

# train_dataset= pipeline(train_paths_to_HR_images[0:100],train_paths_to_LR_images[0:100])

# for HR,LR in train_dataset:
    
#     plt.figure(figsize=[8,6])
#     imgplot = plt.imshow(HR[0])
#     plt.show()
#     break

## Model


*Generator*

In [None]:
Growth_rate=32

def dense_block(inpt):
    """
    Dense block containes total 4 conv blocks with leakyRelu 
    activation, followed by post conv layer
    Params: tensorflow layer
    Returns: tensorflow layer
    """
    b1 = Conv2D(Growth_rate, kernel_size=3, strides=1, padding='same')(inpt)
    b1 = LeakyReLU(0.2)(b1)
    b1 = Concatenate()([inpt,b1])

    b2 = Conv2D(Growth_rate, kernel_size=3, strides=1, padding='same')(b1)
    b2 = LeakyReLU(0.2)(b2)
    b2 = Concatenate()([inpt,b1,b2]) 

    b3 = Conv2D(Growth_rate, kernel_size=3, strides=1, padding='same')(b2)
    b3 = LeakyReLU(0.2)(b3)
    b3 = Concatenate()([inpt,b1,b2,b3])

    b4 = Conv2D(Growth_rate, kernel_size=3, strides=1, padding='same')(b3)
    b4 = LeakyReLU(0.2)(b4)
    b4 = Concatenate()([inpt,b1,b2,b3,b4])

    b5 = Conv2D(Growth_rate, kernel_size=3, strides=1, padding='same')(b4)
    b5 = Lambda(lambda x:x*0.2)(b5)
#     print(b5.shape)
    b5 = add([b5, inpt])
    
    return b5

def RRDB(inpt,Beta= 0.2):
    
    """
    RRDB(residual in residual dense block) contained three dense  
    block, each block followed by beta contant multiplication(0.2) 
    and addition with dense block input layer.
    Params: tensorflow layer
    Returns: tensorflow layer
    """
    x = dense_block(inpt)

    x = dense_block(x)

    x = dense_block(x)

    x = Lambda(lambda x:x*Beta)(x) # Beta constant

    out = add([x,inpt])
    return out

In [None]:
#Creating GMGAN

def Generator_model(growth_rate=64, rrdb= 16):
 input=Input(shape=(96,96,3),name='Input')

#First convolution layer
 x1= Conv2D(filters=Growth_rate,kernel_size=3,padding='Same',name='conv1')(input) 
#  print(x1.shape)
    
 for i in range(rrdb):
#     print(i)
    r= RRDB(x1)
    x1= r

#convolution layer after RRDB block
 x2= Conv2D(filters=Growth_rate,kernel_size=3,padding='Same')(r) 

 x=add([x2,x1]) #skip connection

#Upsampling the LOW RESOLUTION IMAGE to increase the number of pixel by 4x in super resolution image
 x= UpSampling2D(size=(4,4))(x)

 x= Conv2D(filters=Growth_rate,kernel_size=3,padding='Same')(x) 

 output= Conv2D(filters=3,activation= 'tanh',kernel_size=9,padding='Same')(x) 


 model=Model(input,output)
 model.summary()
 return model

generator=Generator_model(Growth_rate)

In [None]:
#Creating discriminator
def conv_block(input,filter,kernel_size,stride,padding):

        x=Conv2D(filters=filter,kernel_size=kernel_size,strides=stride,padding=padding)(input) 
        x=BatchNormalization()(x)
        x=LeakyReLU()(x)
        return x


input=Input(shape=(96*4,96*4,3),name='Input')


x= conv_block(input,filter= 64,kernel_size= 3,stride= 1,padding='same')
x=LeakyReLU()(x)

x= conv_block(x,filter= 64,kernel_size= 3,stride= 2,padding='same')

x= conv_block(x,filter= 128,kernel_size= 3,stride= 1,padding='same')

x= conv_block(x,filter= 128,kernel_size= 3,stride= 2,padding='same')

x= conv_block(x,filter= 256,kernel_size= 3,stride= 1,padding='same')

x= conv_block(x,filter= 256,kernel_size= 3,stride= 2,padding='same')

x= conv_block(x,filter= 512,kernel_size= 3,stride= 1,padding='same')

x= conv_block(x,filter= 512,kernel_size= 3,stride= 2,padding='same')

x= conv_block(x,filter= 1,kernel_size= 3,stride= 1,padding='same')

x= Flatten()(x)

x= Dense(1024)(x)
x= LeakyReLU(alpha=0.2)(x)

output=Dense(1)(x)

discriminator=Model(input,output)
discriminator.summary()


# Losses

In [None]:
#For perpectual loss
VGG_19 = tf.keras.applications.VGG19(input_shape=(96*4,96*4,3),include_top=False,weights='imagenet')


#Generator Loss
def GMSD(HR,SR):

#    print('The shape of input image',HR.shape)

   sobel_HR=tf.image.sobel_edges(HR) #[batch_size,H,W,d,2] 
#    print('The  shape of sobel_HR:',sobel_HR.shape)
#    print('The output after sobel_HR',sobel_HR[0,0,0])

   sobel_HR= sobel_HR**2 #[H-2,2-2,3,2]
   G_HR=tf.math.sqrt(tf.math.reduce_sum(sobel_HR,axis=-1)) #[H-2,W-2,3,1] 
#    print('The output m_HR',G_HR)
#    print('The  shape of m_HR:',G_HR.shape)
#    print('\n')
#    print('*****************************************************')


   #calculating the gradient matrix value for super resolution image
 
   sobel_SR=tf.image.sobel_edges(SR) #[H-2,W-2,3,2]
#    print('The  shape of sobel_SR:',sobel_SR.shape)
#    print('The output after sobel_GR',sobel_SR[0,0,0])


   sobel_SR= sobel_SR**2 #[H-2,2-2,3,2]
   G_SR=tf.math.sqrt(tf.math.reduce_sum(sobel_SR,axis=-1)) #[H-2,W-2,3,1] 
#    print('The output after joining m_SR',G_HR[0,0,0])
#    print('The  shape of m_SR:',G_SR.shape)
#    print('\n')
#    print('******/***********************************************')


    #DEFINING THE PAREAMETER C
   c=tf.math.multiply(2*G_SR,G_HR)+0.000001
   d= tf.math.square(G_SR)+tf.math.square(G_HR)+0.000001
   GMS=c/d 
#    print('The  shape of GMS:',GMS)

   GMSM=tf.math.reduce_mean(GMS) #will get a scalar value
#    print('The GMSM value is :',GMSM)

   GMSD=tf.math.sqrt(tf.math.reduce_mean(tf.math.square((GMS-GMSM))))
#    print('The GMSD value is :',GMSD)
   return GMSD


def MSE(HR,SR):
    a= tf.keras.losses.MSE(SR,HR) #[batch,height,width]
    return tf.math.reduce_mean(a)
    

def perpectual_loss(HR,SR):
    VGG_SR= VGG_19(SR)
    VGG_HR= VGG_19(HR)
    perpectual= MSE(VGG_SR,VGG_HR)
    return perpectual


def generator_loss(HR,HR_pred,SR,SR_pred):
    
    I_GA = -tf.reduce_mean(SR_pred)
    I_Q= GMSD(HR,SR)
    I_M= MSE(HR,SR)
    I_P= perpectual_loss(HR,SR)
    generator_los= 0.005*I_GA+I_Q+0.01*I_M+I_P


    return generator_los


In [None]:

@tf.function
def discriminator_train(HR,LR,batch_size,step):
    '''

        Reference: https://www.tensorflow.org/tutorials/generative/dcgan
    '''

    epsilon = tf.random.uniform(shape=[batch_size, 1, 1, 1], minval=0, maxval=1)
    ###################################
    # Train D
    ###################################
    with tf.GradientTape(persistent=True) as d_tape:
        with tf.GradientTape() as gp_tape:
            SR = generator(LR, training=True)
            fake_image_mixed = epsilon * tf.dtypes.cast(HR, tf.float32) + ((1 - epsilon) * SR)
            fake_mixed_pred = discriminator(fake_image_mixed, training=True)
            
        # Compute gradient penalty
#         print('Computing Gradient penalty')
        grads = gp_tape.gradient(fake_mixed_pred, fake_image_mixed)
        grad_norms = tf.sqrt(tf.reduce_sum(tf.square(grads), axis=[1, 2, 3]))
        gradient_penalty = tf.reduce_mean(tf.square(grad_norms - 1))
#         print('The value of gradient penalty',gradient_penalty)
        
#         print('calculating the loss for discriminator')
        fake_pred = discriminator(SR, training=True)
        real_pred = discriminator(HR, training=True)
        
        D_loss= 0.005*(tf.reduce_mean(real_pred) - tf.reduce_mean(fake_pred) + LAMBDA * gradient_penalty)

    # Calculate the gradients for discriminator
    D_gradients = d_tape.gradient(D_loss,discriminator.trainable_variables)
    
    # Apply the gradients to the optimizer
    D_optimizer.apply_gradients(zip(D_gradients,discriminator.trainable_variables))

    return D_loss
    
@tf.function
def generator_train(HR,LR, batch_size):
    '''
        One generator training step
        
        Reference: https://www.tensorflow.org/tutorials/generative/dcgan
    '''
#     print("retrace")

    ###################################
    # Train G
    ###################################
    with tf.GradientTape() as g_tape:
        SR = generator(LR, training=True)
        SR_pred = discriminator(SR, training=False)
        HR_pred= discriminator(HR, training=True)
        
        loss_gen= generator_loss(HR,HR_pred,SR,SR_pred)
        
        
    # Calculate the gradients for generator
    G_gradients = g_tape.gradient(loss_gen,generator.trainable_variables)
    
    # Apply the gradients to the optimizer
    G_optimizer.apply_gradients(zip(G_gradients,generator.trainable_variables))
    return loss_gen
    
@tf.function
def discriminator_test(HR,LR,batch_size):
    '''

        Reference: https://www.tensorflow.org/tutorials/generative/dcgan
    '''
#     print("retrace")
    epsilon = tf.random.uniform(shape=[batch_size, 1, 1, 1], minval=0, maxval=1)
    ###################################
    # Train D
    ###################################

    with tf.GradientTape() as gp_tape:
        SR = generator(LR, training=True)
        fake_image_mixed = epsilon * tf.dtypes.cast(HR, tf.float32) + ((1 - epsilon) * SR)
        fake_mixed_pred = discriminator(fake_image_mixed, training=True)

    # Compute gradient penalty
#         print('Computing Gradient penalty')
    grads = gp_tape.gradient(fake_mixed_pred, fake_image_mixed)
    grad_norms = tf.sqrt(tf.reduce_sum(tf.square(grads), axis=[1, 2, 3]))
    gradient_penalty = tf.reduce_mean(tf.square(grad_norms - 1))

    fake_pred = discriminator(SR, training=True)
    real_pred = discriminator(HR, training=True)

    D_loss= 0.005*(tf.reduce_mean(real_pred) - tf.reduce_mean(fake_pred) + LAMBDA * gradient_penalty)


    return D_loss
    
@tf.function
def generator_test(HR,LR, batch_size):
    '''
        One generator training step
        
        Reference: https://www.tensorflow.org/tutorials/generative/dcgan
    '''
#     print("retrace")

    ###################################
    # Train G
    ###################################
    
    SR = generator(LR, training=True)
    SR_pred = discriminator(SR, training=False)
    HR_pred= discriminator(HR, training=True)

    loss_gen= generator_loss(HR,HR_pred,SR,SR_pred)

    return loss_gen


In [None]:
def Create_checkpoints(load_previous_model=True):
        #Creating an checkpoint object and manager to keep track of the checkpoint
    !mkdir ./checkpoint_dir
    checkpoint_path= './checkpoint_dir'
    ckpt = tf.train.Checkpoint(generator=generator,
                               discriminator=discriminator,
                               G_optimizer=G_optimizer,
                               D_optimizer=D_optimizer)

    manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=5)
    previous_epoch= 0

    if load_previous_model== True:

        #Loading the previous checkpoints so that to start the training from that epoch             
        ckpt.restore(manager.latest_checkpoint)

        if manager.latest_checkpoint:
            previous_epoch= int(manager.latest_checkpoint.split('\\')[-1].split('-')[-1])
            print('Loading the model parameters from previous epoch ie:',previous_epoch)
            print('\n')
    else:

        #Deleting the previous checkpoints directory
        shutil.rmtree('./Checkpoints')
        !mkdir './Checkpoints'

    return manager,previous_epoch



#optimizer
D_optimizer = tf.keras.optimizers.Adam(0.0001)
G_optimizer = tf.keras.optimizers.Adam(0.0001)

#Creating and loading the checkpoint
manager,previous_epoch= Create_checkpoints(load_previous_model= True)


In [None]:
#Parameters
LAMBDA= 10
epochs= 10
N_CRITIC= 5
batch= 5

#Dataset 
train_dataset= pipeline(train_paths_to_HR_images,train_paths_to_LR_images)
test_dataset= pipeline(test_paths_to_HR_images,test_paths_to_LR_images)

#Steps
train_steps= int(len(train_paths_to_HR_images)/batch)
test_steps= int(len(test_paths_to_HR_images)/batch)
nu_to_train_dis= 0

for epoch in range(1,epochs):
    start = time.time()
    
    
    for step, [HR,LR] in enumerate(train_dataset):
  
        #Training the discriminator
      
        loss_dis= discriminator_train(HR,LR,batch,1)         
        nu_to_train_dis+=1
    
        if nu_to_train_dis == N_CRITIC:
            loss_gen= generator_train(HR,LR,batch)
            nu_to_train_dis= 0
            
        if step%500==0 and step!=0:
            print('Epoch: ',epoch+previous_epoch,'/',epochs+previous_epoch)
            print(step,'/',train_steps,'/n')
            print('loss of discriminator ========',loss_dis.numpy())
            print('loss of generator ========',loss_gen.numpy())
        
        
#     print('/nTESTING:')
#     #For testing imagaes
        
#     for step, [HR,LR] in enumerate(test_dataset):
  
#         #Training the discriminator
#         loss_dis= discriminator_train(HR,LR,batch)         
#         nu_to_train_dis+=1
    
#         while nu_to_train_dis < N_CRITIC:
#             loss_gen= WGAN_GP_train_g_step(HR,LR,batch)
`
#             nu_to_train_dis= 0
        
#         #Saving the checkpoints
#         manager.save()

#     print('Epoch: ',epoch+previous_epoch,'/',epochs+previous_epoch)
#     print(step,'/',train_steps,'/n')
#     print('loss of discriminator ========',loss_dis)
#     print('loss of generator ========',loss_gen)
    print('/******************************************************************/')
    print('Time taken for epoch',epoch+previous_epoch,'is sec\n',time.time()-start)
    #Saving the model
    manager.save()

    