In [3]:
import tensorflow as tf
import numpy as np
import pandas as pd
import cv2 
import os
import matplotlib.pyplot as plt
from keras.models import Sequential
from keras import layers, Model
from sklearn.model_selection import train_test_split
from keras import Model
from keras.layers import Conv2D
from keras.layers import PReLU
from keras.layers import BatchNormalization
from keras.layers import Flatten
from keras.layers import UpSampling2D
from keras.layers import LeakyReLU
from keras.layers import Dense
from keras.layers import Input
from keras.layers import add
from tqdm import tqdm

In [None]:
# from google.colab import drive
# drive.mount('/content/drive')

In [5]:
# NOTE(review): this cell calls load_image, which is only defined in a later cell of this
# notebook, so it raises NameError on a fresh kernel (Restart & Run All).
load_image('../img/bug/b_bigbug0000_croppped.png').shape

(1021, 1344, 3)

In [1]:
# Since the data is too large to fit in memory, we will use a generator to load the data in batches.
def load_image(path):
    """Read an image file with OpenCV and return it in RGB channel order.

    Args:
        path: path of the image file to read.

    Returns:
        The converted image, or ``None`` when the file could not be read or
        converted. In that case the offending path is printed.
    """
    try:
        img = cv2.imread(path)
        # cv2.imread reports a missing/unreadable file by returning None, not by raising.
        if img is None:
            print(path)
            return None
        # Single-channel data (2-D array or one trailing channel) is expanded to 3 channels.
        if img.ndim == 2 or img.shape[2] == 1:
            return cv2.cvtColor(img, cv2.COLOR_GRAY2RGB)
        # OpenCV decodes colour images as BGR; convert to RGB order.
        return cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    except Exception:
        # Best effort: report the path and return None. Unlike a bare ``except:``,
        # KeyboardInterrupt / SystemExit still propagate.
        print(path)
        return None

def load_data(batch_images, data_augmentation=0, hr_shape=(540, 960), scale=4):
    """Load a batch of images as high-resolution / low-resolution pairs.

    Args:
        batch_images: iterable of image paths.
        data_augmentation: when > 0, each high-resolution image is passed through
            ``aug_image`` with this value before the low-resolution copy is made.
        hr_shape: ``(height, width)`` every high-resolution image is resized to.
        scale: integer factor by which the low-resolution copy is smaller.

    Returns:
        Tuple ``(imgs_hr, imgs_lr)`` of numpy arrays stacked along a new first axis.

    Raises:
        ValueError: if an image of the batch cannot be loaded.
    """
    hr_height, hr_width = hr_shape
    # cv2.resize expects dsize as (width, height), i.e. the reverse of array.shape[:2].
    hr_dsize = (hr_width, hr_height)
    lr_dsize = (hr_width // scale, hr_height // scale)

    imgs_hr, imgs_lr = [], []
    for img_path in batch_images:
        img_hr = load_image(img_path)
        if img_hr is None:
            raise ValueError('Could not load image: {}'.format(img_path))

        # Bring every image to the common high-resolution size.
        if tuple(img_hr.shape[:2]) != (hr_height, hr_width):
            img_hr = cv2.resize(img_hr, hr_dsize, interpolation=cv2.INTER_CUBIC)

        # Make data augmentation here
        if data_augmentation > 0:
            img_hr = aug_image(img_hr, data_augmentation)

        img_lr = cv2.resize(img_hr, lr_dsize, interpolation=cv2.INTER_CUBIC)
        imgs_hr.append(img_hr)
        imgs_lr.append(img_lr)
    return np.array(imgs_hr), np.array(imgs_lr)

# Augment the image by applying random rotation, random zoom and random translation
def aug_image(img,num_of_aug):
    """Return one randomly rotated, zoomed and shifted copy of ``img``.

    Only a single augmented image is returned, so only one is computed: the random
    draws are independent, which makes one draw equivalent to generating
    ``num_of_aug`` candidates and picking one of them uniformly.

    Args:
        img: image array to augment.
        num_of_aug: must be at least 1; kept for compatibility with existing callers.

    Returns:
        The augmented image (same frame size as ``img``).

    Raises:
        ValueError: if ``num_of_aug`` is smaller than 1.
    """
    if num_of_aug < 1:
        raise ValueError('num_of_aug must be at least 1')

    # Random rotation: integer angle in [0, 360)
    angle = np.random.randint(0,360)
    img_rot = rotate_image(img, angle)
    # Random zoom: integer factor in [1, 5)
    zoom_factor = np.random.randint(1,5)
    img_zoom = zoom_image(img_rot, zoom_factor)
    # Random translation: integer offsets in [-50, 50) pixels on each axis
    x_shift = np.random.randint(-50,50)
    y_shift = np.random.randint(-50,50)
    return shift_image(img_zoom, x_shift, y_shift)


def rotate_image(img, angle):
    """Rotate ``img`` by ``angle`` about its centre, keeping the original frame size."""
    height, width = img.shape[:2]
    centre = (width / 2, height / 2)
    # Scale factor 1: pure rotation, no zoom.
    rotation = cv2.getRotationMatrix2D(centre, angle, 1)
    return cv2.warpAffine(img, rotation, (width, height))

def zoom_image(img, zoom_factor):
    """Scale ``img`` by ``zoom_factor`` around its centre, keeping the original frame size."""
    height, width = img.shape[:2]
    centre = (width / 2, height / 2)
    # Angle 0: the "rotation" matrix only carries the scaling.
    scaling = cv2.getRotationMatrix2D(centre, 0, zoom_factor)
    return cv2.warpAffine(img, scaling, (width, height))

def shift_image(img, x_shift, y_shift):
    """Translate ``img`` by ``(x_shift, y_shift)`` pixels, keeping the original frame size."""
    height, width = img.shape[:2]
    # 2x3 affine matrix: identity plus a translation column.
    translation = np.array([[1, 0, x_shift], [0, 1, y_shift]], dtype=np.float32)
    return cv2.warpAffine(img, translation, (width, height))

In [3]:
from tensorflow.keras.layers import Input, Activation, Dense, Conv2D, BatchNormalization, \
    LeakyReLU
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam


class Discriminator:
    """
    Discriminator network used for the adversarial component of the loss.

    Args:
        patch_heigt: integer, height of the input images.
        patch_width: integer, width of the input images.
            The model input is (patch_heigt, patch_width, 3).
        kernel_size: size of the kernel in the conv blocks.

    Attributes:
        model: compiled Keras model (binary cross-entropy, Adam, accuracy metric).
        name: name used to identify which discriminator is used.
        model._name: set to 'discriminator' to identify this network.
        block_param: dictionary with the number of filters and the strides of each
            conv block.
        block_num: number of conv blocks described by block_param.
    """

    def __init__(self, patch_heigt,patch_width, kernel_size=3):
        self.patch_heigt = patch_heigt
        self.patch_width = patch_width
        self.kernel_size = kernel_size
        self.block_param = {}
        self.block_param['filters'] = (64, 128, 128, 256, 256, 512, 512)
        self.block_param['strides'] = (2, 1, 2, 1, 1, 1, 1)
        self.block_num = len(self.block_param['filters'])
        self.model = self._build_disciminator()
        self.model.compile(
            loss='binary_crossentropy',
            optimizer=Adam(0.0002, 0.5),
            metrics=['accuracy'],
        )
        self.model._name = 'discriminator'
        self.name = 'srgan-large'

    def _conv_block(self, input, filters, strides, batch_norm=True, count=None):
        """ Conv2D followed by Leaky ReLU and, when batch_norm is set, BatchNormalization. """

        conv = Conv2D(
            filters,
            kernel_size=self.kernel_size,
            strides=strides,
            padding='same',
            name='Conv_{}'.format(count),
        )
        x = LeakyReLU(alpha=0.2)(conv(input))
        if not batch_norm:
            return x
        return BatchNormalization(momentum=0.8)(x)

    def _build_disciminator(self):
        """ Assembles the conv blocks and the two Dense layers into a Keras model. """

        HR = Input(shape=(self.patch_heigt, self.patch_width, 3))
        # First block: fixed 64 filters, stride 1, no batch normalisation.
        x = self._conv_block(HR, filters=64, strides=1, batch_norm=False, count=1)
        # Remaining blocks follow block_param; layer names continue at Conv_2.
        block_specs = zip(self.block_param['filters'], self.block_param['strides'])
        for count, (filters, strides) in enumerate(block_specs, start=2):
            x = self._conv_block(x, filters=filters, strides=strides, count=count)
        x = Dense(self.block_param['filters'][-1] * 2, name='Dense_1024')(x)
        x = LeakyReLU(alpha=0.2)(x)
        # No Flatten is applied here, so the Dense layers act on the last axis only.
        x = Dense(1, name='Dense_last')(x)
        HR_v_SR = Activation('sigmoid')(x)

        return Model(inputs=HR, outputs=HR_v_SR)

In [4]:
import numpy as np

def process_array(image_array, expand=True):
    """ Scale an image array by 1/255 and, if `expand` is set, prepend a batch axis of size 1. """

    scaled = image_array / 255.0
    return np.expand_dims(scaled, axis=0) if expand else scaled


def process_output(output_tensor):
    """ Clip the network output to [0, 1], rescale it to [0, 255] and cast it to uint8. """

    clipped = output_tensor.clip(0, 1)
    # The cast truncates the fractional part (no rounding).
    return np.uint8(clipped * 255)


def split_image_into_overlapping_patches(image_array, patch_size, padding_size=2):
    """ Splits the image into partially overlapping patches.

    Each patch covers a patch_size x patch_size tile of the image plus padding_size
    extra pixels on every side, so neighbouring patches overlap.

    Pads the image twice:
        - first (bottom/right) to have a size multiple of the patch size,
        - then to have equal padding at the borders.

    Args:
        image_array: numpy array of the input image, shape (height, width, channels).
        patch_size: size of the patches from the original image (without padding).
        padding_size: size of the overlapping area.

    Returns:
        Tuple (patches, padded_shape): array of patches in row-major order, each of
        side patch_size + 2 * padding_size, and the shape of the fully padded image.
    """

    xmax, ymax, _ = image_array.shape

    # modulo here is to avoid extending of patch_size instead of 0
    x_extend = (patch_size - xmax % patch_size) % patch_size
    y_extend = (patch_size - ymax % patch_size) % patch_size

    # make sure the image is divisible into regular patches
    extended_image = np.pad(image_array, ((0, x_extend), (0, y_extend), (0, 0)), 'edge')

    # Add the border so every tile has padding_size pixels of context on all sides.
    # This is done inline because no pad_patch helper is defined in this notebook;
    # 'edge' mode is assumed here to match the extension step above.
    border = (padding_size, padding_size)
    padded_image = np.pad(extended_image, (border, border, (0, 0)), 'edge')

    xmax, ymax, _ = padded_image.shape
    patch_extent = patch_size + 2 * padding_size
    # Top-left corners of the padded patches, expressed in padded-image coordinates.
    patches = [
        padded_image[x: x + patch_extent, y: y + patch_extent, :]
        for x in range(0, xmax - 2 * padding_size, patch_size)
        for y in range(0, ymax - 2 * padding_size, patch_size)
    ]

    return np.array(patches), padded_image.shape


def stich_together(patches, padded_image_shape, target_shape, padding_size=4):
    """ Reconstruct the image from overlapping patches.

    After scaling, shapes and padding should be scaled too.

    Args:
        patches: patches obtained with split_image_into_overlapping_patches
            (row-major order, square, shape (n, size, size, channels)).
        padded_image_shape: shape of the padded image constructed in
            split_image_into_overlapping_patches
        target_shape: shape of the final image
        padding_size: size of the overlapping area.

    Returns:
        Array of shape (target_shape[0], target_shape[1], channels).
    """

    xmax, ymax, _ = padded_image_shape
    # Drop the overlap border of every patch. This is done inline because no
    # unpad_patches helper is defined in this notebook. A zero padding is skipped
    # because the slice `0:-0` would be empty.
    if padding_size > 0:
        patches = patches[:, padding_size:-padding_size, padding_size:-padding_size, :]
    patch_size = patches.shape[1]
    # ymax still contains the border on both sides; remove it before counting the
    # columns, otherwise the count is too high when 2 * padding_size >= patch_size.
    n_patches_per_row = (ymax - 2 * padding_size) // patch_size

    complete_image = np.zeros((xmax, ymax, patches.shape[-1]))

    for i, patch in enumerate(patches):
        row, col = divmod(i, n_patches_per_row)
        complete_image[
        row * patch_size: (row + 1) * patch_size, col * patch_size: (col + 1) * patch_size, :
        ] = patch
    return complete_image[0: target_shape[0], 0: target_shape[1], :]


class ImageModel:
    """ISR models parent class.

    Contains functions that are common across the super-scaling models.
    Subclasses are expected to provide `self.model` (object with a `predict`
    method) and, for patch-wise inference, `self.scale`.
    """

    def predict(self, input_image_array, by_patch_of_size=None, batch_size=10, padding_size=2):
        """
        Processes the image array into a suitable format
        and transforms the network output in a suitable image format.

        Args:
            input_image_array: input image array.
            by_patch_of_size: for large image inference. Splits the image into
                patches of the given size.
            padding_size: for large image inference. Padding between the patches.
                Increase the value if there are seamlines.
            batch_size: for large image inference. Number of patches processed at a time.
                Keep low and increase by_patch_of_size instead.
        Returns:
            sr_img: image output.
        """

        if by_patch_of_size:
            lr_img = process_array(input_image_array, expand=False)
            patches, p_shape = split_image_into_overlapping_patches(
                lr_img, patch_size=by_patch_of_size, padding_size=padding_size
            )
            # Gather the per-batch outputs and concatenate once; appending to a
            # growing array inside the loop re-copies everything at each step.
            batch_outputs = [
                self.model.predict(patches[i: i + batch_size])
                for i in range(0, len(patches), batch_size)
            ]
            collect = np.concatenate(batch_outputs, axis=0)

            # Shapes and padding grow with the upscaling factor of the model.
            scale = self.scale
            padded_size_scaled = tuple(np.multiply(p_shape[0:2], scale)) + (3,)
            scaled_image_shape = tuple(np.multiply(input_image_array.shape[0:2], scale)) + (3,)
            sr_img = stich_together(
                collect,
                padded_image_shape=padded_size_scaled,
                target_shape=scaled_image_shape,
                padding_size=padding_size * scale,
            )

        else:
            lr_img = process_array(input_image_array)
            # The batch holds a single image; keep only that one.
            sr_img = self.model.predict(lr_img)[0]

        sr_img = process_output(sr_img)
        return sr_img

In [5]:
import tensorflow as tf
from tensorflow.keras.initializers import RandomUniform
from tensorflow.keras.layers import concatenate, Input, Activation, Add, Conv2D, Lambda, UpSampling2D
from tensorflow.keras.models import Model



class Generator:
    """Generator network: residual dense blocks followed by an upscaling stage.

    Args:
        nbConvLayer: number of conv layers inside each residual dense block (C).
        nbRDB: number of residual dense blocks (D).
        nbOutputCNN: number of filters of each conv layer inside a block (G).
        nbOutputFilters: number of filters of the feature-fusion convolutions (G0).
        scalingFactor: upscaling factor applied by the upscaling stage.
        patch_heigt: height of the low-resolution input.
        patch_width: width of the low-resolution input.
        cdim: number of image channels, used for both the input and the output.
        kernel_size: kernel size of the main convolutions.
        upscaling: 'ups' (UpSampling2D) or 'shuffle' (depth_to_space).
        init_extreme_val: bound of the uniform kernel initializer.

    Attributes:
        model: Keras model mapping the low-resolution input to the upscaled output.
        name: identifier of this generator.

    Raises:
        ValueError: if `upscaling` is neither 'ups' nor 'shuffle'.
    """

    def __init__(
        self,
        nbConvLayer,
        nbRDB,
        nbOutputCNN,
        nbOutputFilters,
        scalingFactor,
        patch_heigt,
        patch_width,
        cdim=3,
        kernel_size=3,
        upscaling='ups',
        init_extreme_val=0.05,
    ):
        self.nbConvLayer = nbConvLayer
        self.C = nbConvLayer
        self.D = nbRDB
        self.G = nbOutputCNN
        self.G0 = nbOutputFilters
        self.scale = scalingFactor
        self.nbRDB = nbRDB
        self.patch_heigt = patch_heigt
        self.patch_width = patch_width
        self.cdim = cdim
        self.kernel_size = kernel_size

        self.upscaling = upscaling
        self.initializer = RandomUniform(
            minval=-init_extreme_val, maxval=init_extreme_val, seed=None
        )
        self.model = self._build_rdn()
        self.model._name = 'generator'
        self.name = 'cnn-gan'

    def _upsampling_block(self, input_layer):
        """ Upsampling block for old weights. """

        x = Conv2D(
            self.cdim * self.scale ** 2,
            kernel_size=3,
            padding='same',
            name='UPN3',
            kernel_initializer=self.initializer,
        )(input_layer)
        return UpSampling2D(size=self.scale, name='UPsample')(x)

    def _pixel_shuffle(self, input_layer):
        """ PixelShuffle implementation of the upscaling layer. """

        x = Conv2D(
            self.cdim * self.scale ** 2,
            kernel_size=3,
            padding='same',
            name='UPN3',
            kernel_initializer=self.initializer,
        )(input_layer)
        # depth_to_space moves blocks of scale**2 channels into the spatial dimensions.
        return Lambda(
            lambda x: tf.nn.depth_to_space(x, block_size=self.scale, data_format='NHWC'),
            name='PixelShuffle',
        )(x)

    def _UPN(self, input_layer):
        """ Upscaling layers. With old weights use _upsampling_block instead of _pixel_shuffle. """

        x = Conv2D(
            64,
            kernel_size=5,
            strides=1,
            padding='same',
            name='UPN1',
            kernel_initializer=self.initializer,
        )(input_layer)
        x = Activation('relu', name='UPN1_Relu')(x)
        x = Conv2D(
            32, kernel_size=3, padding='same', name='UPN2', kernel_initializer=self.initializer
        )(x)
        x = Activation('relu', name='UPN2_Relu')(x)
        if self.upscaling == 'shuffle':
            return self._pixel_shuffle(x)
        elif self.upscaling == 'ups':
            return self._upsampling_block(x)
        else:
            raise ValueError('Invalid choice of upscaling layer.')

    def _RDBs(self, input_layer):
        """RDBs blocks.

        Args:
            input_layer: input layer to the RDB blocks (e.g. the second convolutional layer F_0).

        Returns:
            concatenation of RDBs output feature maps with G0 feature maps.
        """
        rdb_concat = list()
        rdb_in = input_layer
        for d in range(1, self.D + 1):
            x = rdb_in
            for c in range(1, self.C + 1):
                F_dc = Conv2D(
                    self.G,
                    kernel_size=self.kernel_size,
                    padding='same',
                    kernel_initializer=self.initializer,
                    name='F_%d_%d' % (d, c),
                )(x)
                F_dc = Activation('relu', name='F_%d_%d_Relu' % (d, c))(F_dc)
                # concatenate input and output of ConvRelu block
                # x = [input_layer,F_11(input_layer),F_12([input_layer,F_11(input_layer)]), F_13..]
                x = concatenate([x, F_dc], axis=3, name='RDB_Concat_%d_%d' % (d, c))
            # 1x1 convolution (Local Feature Fusion)
            x = Conv2D(
                self.G0, kernel_size=1, kernel_initializer=self.initializer, name='LFF_%d' % (d)
            )(x)
            # Local Residual Learning F_{i,LF} + F_{i-1}
            rdb_in = Add(name='LRL_%d' % (d))([x, rdb_in])
            rdb_concat.append(rdb_in)

        assert len(rdb_concat) == self.D

        return concatenate(rdb_concat, axis=3, name='LRLs_Concat')

    def _build_rdn(self):
        """ Puts the generator's layers together and returns the Keras model. """
        # Use cdim for the input channels so that input and output (the 'SR' conv
        # below also has cdim filters) stay consistent when cdim != 3.
        LR_input = Input(shape=(self.patch_heigt, self.patch_width, self.cdim), name='LR')
        F_m1 = Conv2D(
            self.G0,
            kernel_size=self.kernel_size,
            padding='same',
            kernel_initializer=self.initializer,
            name='F_m1',
        )(LR_input)
        F_0 = Conv2D(
            self.G0,
            kernel_size=self.kernel_size,
            padding='same',
            kernel_initializer=self.initializer,
            name='F_0',
        )(F_m1)
        FD = self._RDBs(F_0)
        # Global Feature Fusion
        # 1x1 Conv of concat RDB layers -> G0 feature maps
        GFF1 = Conv2D(
            self.G0,
            kernel_size=1,
            padding='same',
            kernel_initializer=self.initializer,
            name='GFF_1',
        )(FD)
        GFF2 = Conv2D(
            self.G0,
            kernel_size=self.kernel_size,
            padding='same',
            kernel_initializer=self.initializer,
            name='GFF_2',
        )(GFF1)
        # Global Residual Learning for Dense Features
        FDF = Add(name='FDF')([GFF2, F_m1])
        # Upscaling
        FU = self._UPN(FDF)
        # Compose SR image
        SR = Conv2D(
            self.cdim,
            kernel_size=self.kernel_size,
            padding='same',
            kernel_initializer=self.initializer,
            name='SR',
        )(FU)

        return Model(inputs=LR_input, outputs=SR)
    

In [6]:
from tensorflow.keras.models import Model
from tensorflow.keras.applications.vgg19 import VGG19



class Cut_VGG19:
    """
    Wraps keras' VGG19 (imagenet weights, no top) and exposes the outputs of the
    layers listed in <layers_to_extract> as the outputs of a new model.

    Args:
        patch_heigt: integer, height of the input images.
        patch_width: integer, width of the input images.
            The model input is (patch_heigt, patch_width, 3).
        layers_to_extract: list of layer indices to be declared as output layers.

    Attributes:
        model: multi-output vgg architecture with <layers_to_extract> as output layers.
        name: identifier of the network, 'vgg19'.

    Raises:
        ValueError: if <layers_to_extract> is empty.
    """

    def __init__(self, patch_heigt,patch_width, layers_to_extract):
        self.patch_heigt = patch_heigt
        self.patch_width = patch_width
        self.input_shape = (patch_heigt, patch_width, 3)
        self.layers_to_extract = layers_to_extract

        # Guard clause: at least one layer has to be extracted.
        if not len(self.layers_to_extract) > 0:
            raise ValueError('Invalid VGG instantiation: extracted layer must be > 0')
        self._cut_vgg()

    def _cut_vgg(self):
        """
        Loads the pre-trained VGG19, marks it as non-trainable and builds a model
        whose outputs are the layers selected by self.layers_to_extract.
        """

        base = VGG19(weights='imagenet', include_top=False, input_shape=self.input_shape)
        base.trainable = False
        selected_outputs = []
        for layer_index in self.layers_to_extract:
            selected_outputs.append(base.layers[layer_index].output)
        self.model = Model([base.input], selected_outputs)
        self.model._name = 'feature_extractor'
        self.name = 'vgg19'  # used in weights naming


In [7]:

# Hyperparameters
batch_size = 2
epochs = 10
sample_interval = 1
path_train = '../img/bug'
path_test = 'data/test'

# Size of the high-resolution images (height, width). It has to match the size
# load_data resizes the images to (540x960), otherwise the networks reject the batches.
hr_height, hr_width = 540, 960
# Upscaling factor between the low-resolution and the high-resolution images.
scaling_factor = 4

# Build and compile the discriminator
discriminator = Discriminator(hr_height, hr_width)

def discriminator_loss(real_output,fake_output):
    """Sum of the binary cross-entropies of real outputs vs. ones and fake outputs vs. zeros."""
    real_loss = tf.keras.losses.binary_crossentropy(tf.ones_like(real_output), real_output)
    fake_loss = tf.keras.losses.binary_crossentropy(tf.zeros_like(fake_output), fake_output)
    total_loss = real_loss + fake_loss
    return total_loss

discriminator.model.compile(loss='binary_crossentropy', optimizer=Adam(0.0002, 0.5), metrics=['accuracy'])
# discriminator.model.summary()
# Generate random weights for the discriminator
discriminator.model.save_weights('discriminator.h5')

# Build the generator
generator = Generator(3,3,4,4,scaling_factor,hr_height//scaling_factor,hr_width//scaling_factor)

def generator_loss(fake_output):
    """Binary cross-entropy of the discriminator outputs for generated images vs. ones."""
    return tf.keras.losses.binary_crossentropy(tf.ones_like(fake_output), fake_output)


# NOTE(review): binary cross-entropy is applied here to raw pixel values; a pixel-wise
# loss such as MAE/MSE (and inputs scaled to [0, 1]) is probably what is wanted - confirm.
generator.model.compile(loss='binary_crossentropy', optimizer=Adam(0.0002, 0.5))
# generator.model.summary()
# Generate random weights for the generator
generator.model.save_weights('generator.h5')

# Build the VGG19 network
vgg = Cut_VGG19(hr_height, hr_width, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

# Adversarial ground truths. The discriminator has no Flatten layer, so its output is
# a map with one value per spatial location; the labels must have that same shape.
label_shape = (batch_size,) + tuple(discriminator.model.output_shape[1:])
valid = np.ones(label_shape)
fake = np.zeros(label_shape)

# Prepare the batches of training data (sorted so that the batches are reproducible)
train_paths = [os.path.join(path_train, file_name) for file_name in sorted(os.listdir(path_train))]

nbBatches = len(train_paths) // batch_size
if nbBatches == 0:
    raise ValueError('Not enough images in {} to build a batch of size {}'.format(path_train, batch_size))

# Separate the training data into batches of size batch_size
batches_path_list = [
    train_paths[i * batch_size:(i + 1) * batch_size] for i in range(nbBatches)
]


train_discriminator = True
train_generator = False


# Load the weights of the network that is NOT being trained
# NOTE(review): the two conditions look swapped at first sight; they are kept as they
# were on the assumption that the intent is to restore the frozen counterpart - confirm.
if train_generator:
    discriminator.model.load_weights('discriminator.h5')
if train_discriminator:
    generator.model.load_weights('generator.h5')


# Training
for epoch in range(epochs):
    # One batch per epoch is held out for validation. A plain int is required:
    # a 1-element array cannot be used to index a Python list.
    cross_validation_batches_index = int(np.random.randint(0, nbBatches))
    print('Cross validation batches index: ', cross_validation_batches_index)
    for batch_i in tqdm(range(nbBatches)):
        if batch_i == cross_validation_batches_index:
            continue

        # Load the data of the current batch
        imgs_hr, imgs_lr = load_data(batches_path_list[batch_i], data_augmentation=0)

        # ----------------------
        #  Train Discriminator
        # ----------------------
        if train_discriminator:
            # From low res. image generate high res. version
            fake_hr = generator.model.predict(imgs_lr, verbose=0)

            # Train the discriminators (original images = real / generated = Fake)
            d_loss_real = discriminator.model.train_on_batch(imgs_hr, valid)
            d_loss_fake = discriminator.model.train_on_batch(fake_hr, fake)
            d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)

        # ------------------
        #  Train Generator
        # ------------------
        if train_generator:
            # The generator model has a single output (the upscaled image), so it takes
            # a single target. NOTE(review): an adversarial term needs a combined
            # generator + discriminator model, which this notebook does not build.
            g_loss = generator.model.train_on_batch(imgs_lr, imgs_hr)

        # Plot the progress
        # print(
        #     "\n [Epoch %d/%d] [Batch %d/%d] [D loss: %f, acc: %.2f%%] [G loss: %f] \n"
        #     % (epoch, epochs, batch_i, nbBatches, d_loss,d_loss, g_loss)
        # )

    # Calculate the cross validation loss after each epoch
    imgs_hr, imgs_lr = load_data(batches_path_list[cross_validation_batches_index], data_augmentation=0)
    fake_hr = generator.model.predict(imgs_lr, verbose=0)
    cross_validation_loss_generator = generator.model.evaluate(imgs_lr, imgs_hr, verbose=0)
    # The discriminator is compiled with an accuracy metric, so evaluate returns
    # [loss, accuracy]; only the loss is reported.
    cross_validation_loss_discriminator = discriminator.model.evaluate(fake_hr, fake, verbose=0)[0]

    # Plot the progress
    print(
        "\n [Epoch %d/%d] [Cross validation loss generator: %f] [Cross validation loss discriminator: %f] \n"
        % (epoch, epochs, cross_validation_loss_generator, cross_validation_loss_discriminator)
    )


# Save the model weights after training
if train_discriminator:
    discriminator.model.save_weights('discriminator.h5')
if train_generator:
    generator.model.save_weights('generator.h5')


  0%|          | 0/7 [00:00<?, ?it/s]

: 

In [None]:
def training_step(batch_paths):
    """Run one discriminator update and one generator update on a batch.

    Uses the module-level `generator`, `discriminator`, `valid` and `fake` objects.

    Args:
        batch_paths: paths of the images that make up the batch.

    Returns:
        Tuple (d_loss, g_loss): the mean of the discriminator's results on the real
        and on the generated images, and the generator's result on the batch.
    """
    # Load the batch that was passed in (not a batch picked through loop globals).
    imgs_hr, imgs_lr = load_data(batch_paths, data_augmentation=0)

    # ----------------------
    #  Train Discriminator
    # ----------------------
    # From low res. image generate high res. version
    fake_hr = generator.model.predict(imgs_lr)

    # Train the discriminators (original images = real / generated = Fake)
    d_loss_real = discriminator.model.train_on_batch(imgs_hr, valid)
    d_loss_fake = discriminator.model.train_on_batch(fake_hr, fake)
    d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)

    # ------------------
    #  Train Generator
    # ------------------
    # The generator model has a single output, so it is trained against the
    # high-resolution images only.
    g_loss = generator.model.train_on_batch(imgs_lr, imgs_hr)

    return d_loss, g_loss