In [None]:
# Mount Google Drive so the dataset and result folders used below are reachable.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Mounted at /content/drive


In [None]:
!pip install tensorflow_addons

In [None]:
import os
import datetime
import random
import warnings
import numpy as np
import sys
import matplotlib.pyplot as plt


# All relative paths below are resolved from the repositories folder on the mounted drive.
os.chdir('/content/drive/MyDrive/GitHub Repositories')
baseDir = './CXGAN/Cascaded Model/Synthetic'

# Number of leading images of each class used for training; the remainder is the test split.
TRAIN_COUNT = 490


def _load_class_images(label):
    """Open the memory-mapped `images.npy` stored for one class folder of the Circle dataset."""
    path = os.path.join(baseDir, 'dataset', 'Circle', label, 'images.npy')
    return np.load(path, mmap_mode = 'r')[:]


normal = _load_class_images('0')
infected = _load_class_images('1')
for label, images in (('0', normal), ('1', infected)):
    print('Synthetic Data Class-{} : {}'.format(label, images.shape))

train_normal, test_normal = normal[:TRAIN_COUNT], normal[TRAIN_COUNT:]
train_infected, test_infected = infected[:TRAIN_COUNT], infected[TRAIN_COUNT:]
for label, train_split, test_split in (('0', train_normal, test_normal),
                                       ('1', train_infected, test_infected)):
    print('Train-{} : {} | Test-{}: {}'.format(label, train_split.shape, label, test_split.shape))


Synthetic Data Class-0 : (500, 128, 128, 1)
Synthetic Data Class-1 : (500, 128, 128, 1)
Train-0 : (490, 128, 128, 1) | Test-0: (10, 128, 128, 1)
Train-1 : (490, 128, 128, 1) | Test-1: (10, 128, 128, 1)


In [None]:
class ImageBuffer():
    """History buffer of previously generated images.

    The buffer lets the discriminators be updated with a mix of current and older
    generator outputs rather than only the ones produced by the latest generators.
    """

    def __init__(self, buffer_size):
        """Create an empty buffer.

        Parameters:
            buffer_size (int) -- maximum number of images kept in the buffer; with a value
                                 of 0 (or a negative value) buffering is disabled and
                                 `query` hands its input straight back
        """
        self.buffer_size = buffer_size
        # Initialised unconditionally so `query` can never hit a missing attribute.
        self.num_imgs = 0
        self.images = []

    def query(self, images):
        """Exchange freshly generated images against the buffer.

        Parameters:
            images -- iterable of the latest images produced by a generator

        Returns:
            `images` itself when buffering is disabled; otherwise a list with one entry
            per input image. While the buffer is still filling, each input image is
            stored and returned. Once it is full, each input image is either returned
            as-is, or (with probability 0.5) swapped with a randomly chosen stored image,
            in which case the stored image is returned and the input takes its slot.
        """
        if self.buffer_size <= 0:  # buffering disabled: do nothing
            return images
        return_images = []
        for image in images:
            if self.num_imgs < self.buffer_size:
                # Buffer not full yet: keep the image and return it unchanged.
                self.num_imgs = self.num_imgs + 1
                self.images.append(image)
                return_images.append(image)
            elif random.uniform(0, 1) > 0.5:
                # Return a stored image and put the current one in its place.
                random_id = random.randint(0, self.buffer_size - 1)  # randint is inclusive
                return_images.append(self.images[random_id])
                self.images[random_id] = image
            else:
                # Return the current image without touching the buffer.
                return_images.append(image)
        return return_images

**Generator with ResNet Architecture**  
**Discriminator**                                                               
**Combined Network**

In [None]:
import tensorflow as tf
import tensorflow_addons as tfa

def padding(input, pad_top, pad_bottom, pad_left, pad_right, mode="REFLECT"):
    """Pad the two middle axes of a 4-D tensor through a Keras Lambda layer.

    Parameters:
        input      -- tensor to pad; the first and last axes are left unpadded
        pad_top    -- rows added before axis 1
        pad_bottom -- rows added after axis 1
        pad_left   -- columns added before axis 2
        pad_right  -- columns added after axis 2
        mode       -- one of "CONSTANT", "REFLECT" or "SYMMETRIC" (passed to tf.pad)

    Returns:
        The padded tensor.

    Raises:
        ValueError -- if `mode` is not one of the supported values (previously such a
                      call silently returned None)
    """
    if mode not in ("CONSTANT", "REFLECT", "SYMMETRIC"):
        raise ValueError(
            "Unsupported padding mode {!r}; expected 'CONSTANT', 'REFLECT' or 'SYMMETRIC'".format(mode))
    paddings = [[0, 0], [pad_top, pad_bottom], [pad_left, pad_right], [0, 0]]
    return tf.keras.layers.Lambda(lambda x: tf.pad(x, paddings, mode))(input)

def normalization(input, mode="instance", name=None):
    """Apply a freshly created normalization layer to `input`.

    Parameters:
        input -- tensor to normalize
        mode  -- 'batch', 'instance', 'layer' or 'group'
        name  -- optional layer name (now also forwarded for 'group')

    Returns:
        The normalized tensor.

    Raises:
        ValueError -- if `mode` is not one of the supported values (previously such a
                      call silently returned None)
    """
    if mode not in ('batch', 'instance', 'layer', 'group'):
        raise ValueError(
            "Unsupported normalization mode {!r}; expected 'batch', 'instance', 'layer' or 'group'".format(mode))
    if mode == 'batch':
        return tf.keras.layers.BatchNormalization(name=name)(input)
    if mode == 'instance':
        # axis=3 is the last axis of the 4-D tensors built in this notebook.
        return tfa.layers.InstanceNormalization(axis=3, center=True, scale=True, name=name)(input)
    if mode == 'layer':
        return tf.keras.layers.LayerNormalization(name=name)(input)
    return tfa.layers.GroupNormalization(name=name)(input)

def activation(input, mode='relu', name=None):
    """Apply a freshly created activation layer to `input`.

    Parameters:
        input -- tensor to activate
        mode  -- 'relu' or 'leaky_relu' (the latter uses a negative slope of 0.2)
        name  -- optional layer name

    Returns:
        The activated tensor.

    Raises:
        ValueError -- if `mode` is not one of the supported values (previously such a
                      call silently returned None)
    """
    if mode == 'relu':
        return tf.keras.layers.ReLU(name=name)(input)
    if mode == 'leaky_relu':
        return tf.keras.layers.LeakyReLU(alpha=0.2, name=name)(input)
    raise ValueError("Unsupported activation mode {!r}; expected 'relu' or 'leaky_relu'".format(mode))

def discriminator_loss(y_true, y_pred):
    """Return half of the mean-squared error between targets and predictions.

    Parameters:
        y_true -- target values
        y_pred -- values predicted by the discriminator

    Returns:
        0.5 * tf.keras.losses.mse(y_true, y_pred)
    """
    squared_error = tf.keras.losses.mse(y_true, y_pred)
    return 0.5 * squared_error



class CycleGAN_model():
    """CycleGAN made of two generators, two discriminators and a combined training model.

    Attributes set by `__init__`:
        lambda_cycle       -- weight of the cycle-consistency loss
        lambda_id          -- weight of the identity loss
        optimizer          -- Adam instance shared by the discriminators and the combined model
        normalization_type -- normalization mode passed to `normalization`
        D_A, D_B           -- compiled discriminators for domain A and domain B
        disc_patch         -- (height, width, 1) shape of one discriminator output map
        G_A2B, G_B2A       -- generators translating A->B and B->A
        combined           -- compiled model that trains both generators
    """

    def __init__(self, image_height, image_width, image_channels):
        """Build and compile all sub-models.

        Parameters:
            image_height   -- input image height
            image_width    -- input image width
            image_channels -- number of image channels
        """
        self.lambda_cycle = 10.0  # Cycle-consistency loss
        self.lambda_id = 0.5 * self.lambda_cycle  # Identity loss

        self.optimizer = tf.keras.optimizers.Adam(0.0002)

        # As suggested in paper Weights are initialized from a Gaussian distribution with mean 0 and std-dev 0.02.
        initializer = tf.keras.initializers.RandomNormal(mean=0, stddev=0.02)

        image_shape = (image_height, image_width, image_channels)
        self.normalization_type = 'instance'

        # As suggested in paper, while optimizing D authors divide the objective by 2,
        # which slows down the rate at which D learns, relative to the rate of G
        # (this is the 0.5 factor inside `discriminator_loss`).
        self.D_A = self.Discriminator(image_shape=image_shape, initializer=initializer, name='Discriminator_A')
        self.D_B = self.Discriminator(image_shape=image_shape, initializer=initializer, name='Discriminator_B')
        self.D_A.compile(loss=discriminator_loss, optimizer=self.optimizer, metrics=['accuracy'])
        self.D_B.compile(loss=discriminator_loss, optimizer=self.optimizer, metrics=['accuracy'])

        # Spatial size of the discriminator output; height and width are read separately
        # so that non-square inputs get the right label shape.
        disc_output_shape = self.D_A.output_shape
        self.disc_patch = (disc_output_shape[1], disc_output_shape[2], 1)

        self.G_A2B = self.Generator(image_shape=image_shape, initializer=initializer, name='Generator_A2B')
        self.G_B2A = self.Generator(image_shape=image_shape, initializer=initializer, name='Generator_B2A')

        # Input images from both domains
        image_A = tf.keras.layers.Input(shape=image_shape)
        image_B = tf.keras.layers.Input(shape=image_shape)
        # Translate images to the other domain
        fake_B = self.G_A2B(image_A)
        fake_A = self.G_B2A(image_B)
        # Reconstruct translated images back to original domain
        reconstr_A = self.G_B2A(fake_B)
        reconstr_B = self.G_A2B(fake_A)
        # Identity mapping of images
        image_A_id = self.G_B2A(image_A)
        image_B_id = self.G_A2B(image_B)

        # For the combined model we will only train the generators.
        # This is set after the discriminators were compiled above.
        self.D_A.trainable = False
        self.D_B.trainable = False

        # Discriminators determines validity of translated images
        valid_A = self.D_A(fake_A)
        valid_B = self.D_B(fake_B)

        # Combined model trains generators to fool discriminators
        self.combined = tf.keras.models.Model(inputs=[image_A, image_B],
                                              outputs=[valid_A, valid_B, reconstr_A, reconstr_B,
                                                       image_A_id, image_B_id])
        self.combined.compile(loss=['mse', 'mse', 'mae', 'mae', 'mae', 'mae'],
                              loss_weights=[1, 1, self.lambda_cycle, self.lambda_cycle,
                                            self.lambda_id, self.lambda_id],
                              optimizer=self.optimizer)

    def num_resblocks(self, input, normalization_type, mode_activation, initializer, num_blocks):
        """Chain `num_blocks` residual blocks, keeping the channel count of `input`.

        Parameters:
            input              -- tensor fed into the first block
            normalization_type -- normalization mode used inside each block
            mode_activation    -- activation mode used inside each block
            initializer        -- kernel initializer for the convolutions
            num_blocks         -- number of residual blocks to stack

        Returns:
            Output of the last block, or `input` itself when `num_blocks` is 0.
        """
        num_filters = input.get_shape()[-1]
        output = input
        for i in range(num_blocks):
            output = self.residual_block(output, num_filters, normalization_type, mode_activation, initializer,
                                         name='resblock_{}'.format(i + 1))

        return output

    def residual_block(self, input, num_filters, normalization_type, mode_activation, initializer, name=None):
        """Build one residual block: two reflect-padded 3x3 convolutions plus a skip connection.

        Parameters:
            input              -- block input tensor (also the skip branch)
            num_filters        -- number of filters of both convolutions
            normalization_type -- normalization mode applied after each convolution
            mode_activation    -- activation mode applied after the first convolution and after the sum
            initializer        -- kernel initializer for the convolutions
            name               -- suffix used in the layer names of this block

        Returns:
            The activated sum of `input` and the convolution branch.
        """
        x = input

        x = padding(x, pad_top=1, pad_bottom=1, pad_left=1, pad_right=1, mode="REFLECT")
        x = tf.keras.layers.Conv2D(filters=num_filters, kernel_size=(3, 3), strides=(1, 1), padding='valid',
                                   use_bias=False,
                                   kernel_initializer=initializer, name='R256_conv1_{}'.format(name))(x)
        x = normalization(x, mode=normalization_type, name='R256_conv1_{}_instance'.format(name))
        x = activation(x, mode=mode_activation, name='R256_conv1_{}_relu'.format(name))

        x = padding(x, pad_top=1, pad_bottom=1, pad_left=1, pad_right=1, mode="REFLECT")
        x = tf.keras.layers.Conv2D(filters=num_filters, kernel_size=(3, 3), strides=(1, 1), padding='valid',
                                   use_bias=False,
                                   kernel_initializer=initializer, name='R256_conv2_{}'.format(name))(x)
        x = normalization(x, mode=normalization_type, name='R256_conv2_{}_instance'.format(name))

        return activation(tf.keras.layers.Add()([input, x]), mode=mode_activation, name='added_{}_relu'.format(name))

    def Generator(self, image_shape, initializer, name='Generator'):
        """Build a generator: 7x7 stem, two stride-2 convolutions, residual blocks, two
        transposed convolutions and a 7x7 tanh output layer.

        Parameters:
            image_shape -- (height, width, channels) of the input images
            initializer -- kernel initializer for all convolutions
            name        -- name of the returned Keras model

        Returns:
            A `tf.keras.Model` whose output has as many channels as `image_shape[-1]`.
        """
        # reflection padding was used in the Cycle-GAN implementation to reduce the artifacts
        # c7s1_64 denotes a 7*7 Convolution-InstanceNorm-Relu layer with 64 filters and stride 1.
        # d128 denotes 3*3 Convolution-InstanceNorm-Relu layer with 128 filters and stride 2.
        # d256 denotes 3*3 Convolution-InstanceNorm-Relu layer with 256 filters and stride 2.
        # R256 denotes a residual block that contains two 3*3 Convolutional layers with the same number of filters (256) on both layers
        # c7s1_3 denotes the final 7*7 Convolution layer with one filter per image channel and stride 1.

        input = tf.keras.Input(shape=image_shape)
        pad = padding(input, pad_top=3, pad_bottom=3, pad_left=3, pad_right=3, mode="REFLECT")
        c7s1_64 = tf.keras.layers.Conv2D(filters=64, kernel_size=(7, 7), strides=(1, 1), padding='valid',
                                         use_bias=False, kernel_initializer=initializer, name='c7s1_64')(pad)
        c7s1_64 = normalization(c7s1_64, mode=self.normalization_type, name='c7s1_64_instance')
        c7s1_64 = activation(c7s1_64, mode='relu', name='c7s1_64_relu')

        d_128 = tf.keras.layers.Conv2D(filters=128, kernel_size=(3, 3), strides=(2, 2), padding='same',
                                       use_bias=False, kernel_initializer=initializer, name='d128')(c7s1_64)
        d_128 = normalization(d_128, mode=self.normalization_type, name='d128_instance')
        d_128 = activation(d_128, mode='relu', name='d128_relu')

        d_256 = tf.keras.layers.Conv2D(filters=256, kernel_size=(3, 3), strides=(2, 2), padding='same',
                                       use_bias=False, kernel_initializer=initializer, name='d256')(d_128)
        d_256 = normalization(d_256, mode=self.normalization_type, name='d256_instance')
        d_256 = activation(d_256, mode='relu', name='d256_relu')

        # residual blocks (6 blocks for image heights up to 128 and 9 blocks for greater image sizes)
        if image_shape[0] <= 128:
            R_256 = self.num_resblocks(d_256, self.normalization_type, 'relu', initializer, num_blocks=6)
        else:
            R_256 = self.num_resblocks(d_256, self.normalization_type, 'relu', initializer, num_blocks=9)

        # fractional-strided convolutions
        # NOTE: the layer names below ('u64' on the 128-filter layer, 'u128' on the 64-filter
        # layer) are kept exactly as they were so layer names stay stable.
        u_128 = tf.keras.layers.Conv2DTranspose(filters=128, kernel_size=(3, 3), strides=(2, 2), padding='same',
                                                use_bias=False, kernel_initializer=initializer, name='u64')(R_256)
        u_128 = normalization(u_128, mode=self.normalization_type, name='u64_instance')
        u_128 = activation(u_128, mode='relu', name='u64_relu')

        u_64 = tf.keras.layers.Conv2DTranspose(filters=64, kernel_size=(3, 3), strides=(2, 2), padding='same',
                                               use_bias=False, kernel_initializer=initializer, name='u128')(u_128)
        u_64 = normalization(u_64, mode=self.normalization_type, name='u128_instance')
        u_64 = activation(u_64, mode='relu', name='u128_relu')

        # Note: The paper said that ReLU and _norm were used here but actually tanh was used and no normalization at
        # the original CycleGAN-pytorch implementation at https://github.com/junyanz/pytorch-CycleGAN-and-pix2pix
        pad = padding(u_64, pad_top=3, pad_bottom=3, pad_left=3, pad_right=3, mode="REFLECT")
        # The output has one filter per input channel so the cycle and identity losses
        # compare tensors of equal shape (previously hard-coded to a single filter).
        c7s1_3 = tf.keras.layers.Conv2D(filters=image_shape[-1], kernel_size=(7, 7), strides=(1, 1),
                                        padding='valid', activation='tanh',
                                        kernel_initializer=initializer, name='c7s1-3')(pad)

        output = c7s1_3

        return tf.keras.Model(inputs=input, outputs=output, name=name)

    def Discriminator(self, image_shape, initializer, name=None):
        """Build a convolutional discriminator that outputs a single-channel score map.

        Parameters:
            image_shape -- (height, width, channels) of the input images
            initializer -- kernel initializer for all convolutions
            name        -- name of the returned Keras model (now actually applied)

        Returns:
            A `tf.keras.Model` mapping an image to a map of scores with one channel.
        """
        input = tf.keras.Input(shape=image_shape)

        # NOTE(review): every block zero-pads explicitly and then also convolves with
        # padding='same', so the feature maps are padded twice. Left unchanged because
        # altering it would change the output size the training labels are built from.
        pad = tf.keras.layers.ZeroPadding2D(padding=(1, 1))(input)
        c64 = tf.keras.layers.Conv2D(filters=64, kernel_size=(4, 4), strides=(2, 2), padding='same',
                                     kernel_initializer=initializer, name='c64')(pad)
        c64 = activation(c64, mode='leaky_relu', name='c64_leakyrelu')

        pad = tf.keras.layers.ZeroPadding2D(padding=(1, 1))(c64)
        c128 = tf.keras.layers.Conv2D(filters=128, kernel_size=(4, 4), strides=(2, 2), padding='same',
                                      use_bias=False, kernel_initializer=initializer, name='c128')(pad)
        c128 = normalization(c128, mode=self.normalization_type, name='c128_instance')
        c128 = activation(c128, mode='leaky_relu', name='c128_leakyrelu')

        pad = tf.keras.layers.ZeroPadding2D(padding=(1, 1))(c128)
        c256 = tf.keras.layers.Conv2D(filters=256, kernel_size=(4, 4), strides=(2, 2), padding='same',
                                      use_bias=False, kernel_initializer=initializer, name='c256')(pad)
        c256 = normalization(c256, mode=self.normalization_type, name='c256_instance')
        c256 = activation(c256, mode='leaky_relu', name='c256_leakyrelu')

        pad = tf.keras.layers.ZeroPadding2D(padding=(1, 1))(c256)
        c512 = tf.keras.layers.Conv2D(filters=512, kernel_size=(4, 4), strides=(1, 1), padding='same',
                                      use_bias=False, kernel_initializer=initializer, name='c512')(pad)
        c512 = normalization(c512, mode=self.normalization_type, name='c512_instance')
        c512 = activation(c512, mode='leaky_relu', name='c512_leakyrelu')

        output = tf.keras.layers.Conv2D(filters=1, kernel_size=(4, 4), strides=(1, 1), padding='same',
                                        kernel_initializer=initializer, name='output')(c512)

        return tf.keras.Model(inputs=input, outputs=output, name=name)




In [None]:
import os
import numpy as np
import matplotlib
matplotlib.use('agg')
import matplotlib.pyplot as plt

import tensorflow as tf
import random


# Input image geometry used to build the networks.
image_width = 128
image_height = 128
image_channels = 1


model = CycleGAN_model(image_height=image_height, image_width=image_width, image_channels=image_channels)
# summary() prints the table itself and returns None, so wrapping it in print()
# only appended a stray "None" line to the output.
print('********************* Generator Architecture*********************')
model.G_A2B.summary()
print('********************* Discriminator Architecture*********************')
model.D_A.summary()

def sample_images(epoch, batch_i, img_A, img_B):
    """Save a 2x3 grid of original, translated and reconstructed images.

    The first row shows the first image of `img_A`, its A->B translation and the
    translation mapped back to A; the second row shows the same for `img_B`.
    The figure is written to '<baseDir>/results/result<epoch>_<batch_i>.png'.

    Parameters:
        epoch   -- epoch index, used in the file name
        batch_i -- batch index, used in the file name
        img_A   -- batch of domain-A images fed to the generators
        img_B   -- batch of domain-B images fed to the generators
    """
    out_dir = baseDir+'/results'
    os.makedirs(out_dir, exist_ok=True)
    n_rows, n_cols = 2, 3

    # Translate to the other domain, then back to the original domain.
    translated_B = model.G_A2B.predict(img_A)
    translated_A = model.G_B2A.predict(img_B)
    cycled_A = model.G_B2A.predict(translated_B)
    cycled_B = model.G_A2B.predict(translated_A)

    def first_as_batch(batch):
        # Keep only the first image but retain the leading batch axis.
        return np.expand_dims(batch[0], axis=0)

    row_A = (img_A, translated_B, cycled_A)
    row_B = (img_B, translated_A, cycled_B)
    panels = np.concatenate([first_as_batch(batch) for batch in row_A + row_B])

    # Rescale images 0 - 1
    panels = 0.5 * panels + 0.5

    column_titles = ['Original', 'Translated', 'Reconstructed']
    fig, axs = plt.subplots(n_rows, n_cols)
    # axs.flat walks the grid row by row, matching the order of `panels`.
    for index, ax in enumerate(axs.flat):
        ax.imshow(panels[index, :, :, 0], cmap='gray')
        ax.set_title(column_titles[index % n_cols])
        ax.axis('off')
    fig.savefig(out_dir + '/result' + str(epoch) + '_' + str(batch_i) + '.png')
    plt.close()

def learning_rate_decay(epoch, same_lr, reach_zero):
    """Return the linear decay factor for a zero-based epoch index.

    Parameters:
        epoch      -- zero-based epoch index
        same_lr    -- number of epochs during which the factor stays at 1.0
        reach_zero -- number of further epochs over which the factor falls linearly to 0.0

    Returns:
        1.0 while `epoch + 1 <= same_lr`, afterwards 1.0 minus the fraction of the
        decay period that has elapsed.
    """
    epochs_into_decay = max(0, epoch + 1 - same_lr)
    return 1.0 - epochs_into_decay / float(reach_zero)

********************* Generator Architecture*********************
Model: "Generator_A2B"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_54 (InputLayer)           [(None, 128, 128, 1) 0                                            
__________________________________________________________________________________________________
lambda_224 (Lambda)             (None, 134, 134, 1)  0           input_54[0][0]                   
__________________________________________________________________________________________________
c7s1_64 (Conv2D)                (None, 128, 128, 64) 3136        lambda_224[0][0]                 
__________________________________________________________________________________________________
c7s1_64_instance (InstanceNorma (None, 128, 128, 64) 128         c7s1_64[0][0]                    
____________________

In [None]:
epochs = 50
batch_size = 1
sample_interval=1000
batches = int(min(train_normal.shape[0], train_infected.shape[0])/batch_size)
# Adversarial loss ground truths
valid = np.ones((batch_size,) + model.disc_patch)
fake = np.zeros((batch_size,) + model.disc_patch)

fake_A_buffer = ImageBuffer(50)  # the buffer stores 50 generated images
fake_B_buffer = ImageBuffer(50)

combined_losses = ['mse', 'mse', 'mae', 'mae', 'mae', 'mae']

# Learning rate held by the optimizer when this cell starts. The schedule below scales
# this fixed value; the previous code multiplied the *current* rate by the decay factor
# every epoch, which compounded the decay.
# NOTE(review): learning_rate_decay returns 0.0 for the final epoch with these arguments,
# so the last epoch runs with a zero learning rate — confirm this is intended.
base_lr = float(tf.keras.backend.get_value(model.optimizer.lr))


def _sample_batch(images, count):
    """Draw `count` random images, flip each left-right with probability 0.5 and rescale.

    Returns an array shaped like the generator input with values `x / 127.5 - 1`
    (assumes the stored pixel range is 0..255 — TODO confirm).
    """
    picked = []
    for _ in range(count):
        image = random.choice(images)
        if np.random.random() > 0.5:
            image = np.fliplr(image)
        picked.append(image)
    batch = np.array(picked).reshape((-1,) + tuple(model.G_A2B.input_shape[1:]))
    return batch/127.5 - 1


print('****************************************** Training ************************************************')
for epoch in range(epochs):
    tf.keras.backend.set_value(model.optimizer.lr,
                               base_lr * learning_rate_decay(epoch, epochs / 2, epochs / 2))

    if epoch == epochs // 2:
        print('Reduced the influence of cycle-consistency to 5, identity to 2.5')
        # True division: floor division turned the identity weight 5.0 into 2.0 instead of 2.5.
        model.lambda_cycle = model.lambda_cycle / 2
        model.lambda_id = model.lambda_id / 2

        # Recompile so the new loss weights take effect.
        model.combined.compile(loss=combined_losses,
                               loss_weights=[1, 1, model.lambda_cycle, model.lambda_cycle,
                                             model.lambda_id, model.lambda_id],
                               optimizer=model.optimizer)

    for batch_i in range(batches):
        # Domain A = infected, domain B = normal. Local names differ from the dataset
        # arrays `infected` / `normal` so those are no longer overwritten by the loop.
        img_A = _sample_batch(train_infected, batch_size)
        img_B = _sample_batch(train_normal, batch_size)

        # ----------------------
        #  Train Discriminators
        # ----------------------

        # Translate images to opposite domain
        fake_B = model.G_A2B.predict(img_A)
        fake_A = model.G_B2A.predict(img_B)

        # Mix in previously generated images from the history buffers
        fake_A = fake_A_buffer.query(fake_A)
        fake_B = fake_B_buffer.query(fake_B)

        # Train the discriminators (original images = real / translated = Fake)
        dA_input = np.vstack([img_A, fake_A])
        dA_output = np.vstack([valid, fake])
        dA_loss = model.D_A.train_on_batch(dA_input, dA_output)

        dB_input = np.vstack([img_B, fake_B])
        dB_output = np.vstack([valid, fake])
        dB_loss = model.D_B.train_on_batch(dB_input, dB_output)

        # Total discriminator loss (element-wise sum of the [loss, accuracy] pairs)
        d_loss = np.add(dA_loss, dB_loss)

        # ------------------
        #  Train Generators
        # ------------------

        # Train the generators
        g_loss = model.combined.train_on_batch([img_A, img_B],[valid, valid, img_A, img_B, img_A, img_B])

        template = 'epoch: {}/{} | batch: {}/{} | dis_loss: {:.4f} | gen_loss: {:.4f} | adv_loss: {:.4f} | recon_loss: {:.4f} | id_loss: {:.4f}'
        print(template.format(epoch, epochs, batch_i, batches, d_loss[0], g_loss[0], np.mean(g_loss[1:3]),
                              np.mean(g_loss[3:5]), np.mean(g_loss[5:7])), end='\r', flush=True)

        # If at save interval => save generated image samples
        if batch_i % sample_interval == 0:
            sample_images(epoch, batch_i, img_A, img_B)

    model_path = baseDir+'/saved_models/' + str(epoch)
    os.makedirs(model_path, exist_ok=True)
    model.combined.save_weights(model_path + '/combined.h5')
    print('\n')
