In [None]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [None]:
!ls "/content/drive/My Drive"

'biz telecom material.gdoc'
'biz telecom report.gdoc'
 case1.mp4
 checkpoint
'Colab Notebooks'
 DiscoGAN
 DiscoGAN-192.data-00000-of-00001
 DiscoGAN-192.meta
 DiscoGAN-194.data-00000-of-00001
 DiscoGAN-194.index
 DiscoGAN-194.meta
 DiscoGAN-196.data-00000-of-00001
 DiscoGAN-196.index
 DiscoGAN-196.meta
 DiscoGAN-198.data-00000-of-00001
 DiscoGAN-198.index
 DiscoGAN-198.meta
 DiscoGAN-200.data-00000-of-00001
 DiscoGAN-200.index
 DiscoGAN-200.meta
'ECE 9039 Project Proposal.gdoc'
'ECE9063 Project Report.gdoc'
'Eurika UGA solution.gsheet'
'flight meal cost.gdoc'
 horse2zebra
'Migrate for Anthos vs Migrate with Docker Image Deployment.gdoc'
 photo2monet
 pre.mp4
'Silicon Valley Technology Companies Stock Price Prediction.gslides'
'survey_results_public (1).csv.gsheet'
 survey_results_public.csv.gsheet
'telecom assignment 1.gdoc'
 Untitled
'Untitled document.gdoc'


In [None]:
#import necessary libraries
import os
import glob
import numpy as np
import cv2 as cv
from functools import reduce
import tensorflow as tf
import tensorflow.keras as keras

#load GPU
device_name = tf.test.gpu_device_name()
if device_name != '/device:GPU:0':
  raise SystemError('GPU device not found')
print('Found GPU at: {}'.format(device_name))

#compose sequence
def compose(*funcs):
    if funcs:
        return reduce(lambda f, g: lambda *a, **kw: g(f(*a, **kw)), funcs)
    else:
        raise ValueError('Composition of empty sequence not supported.')


#Normalization processing
class InstanceNormalization(keras.layers.Layer):
    def __init__(self, beta_initializer='zeros', gamma_initializer='ones',
                 beta_regularizer=None, gamma_regularizer=None,
                 beta_constraint=None, gamma_constraint=None, epsilon=1e-5,
                 **kwargs):
        super(InstanceNormalization, self).__init__(**kwargs)
        self.epsilon = epsilon
        self.beta_initializer = keras.initializers.get(beta_initializer)
        self.gamma_initializer = keras.initializers.get(gamma_initializer)
        self.beta_regularizer = keras.regularizers.get(beta_regularizer)
        self.gamma_regularizer = keras.regularizers.get(gamma_regularizer)
        self.beta_constraint = keras.constraints.get(beta_constraint)
        self.gamma_constraint = keras.constraints.get(gamma_constraint)

    def build(self, input_shape):
        assert len(input_shape) == 4
        self.gamma = self.add_weight(shape=(input_shape[-1],), name='gamma', initializer=self.gamma_initializer,
                                     regularizer=self.gamma_regularizer, constraint=self.gamma_constraint)
        self.beta = self.add_weight(shape=(input_shape[-1],), name='beta', initializer=self.beta_initializer,
                                    regularizer=self.beta_regularizer, constraint=self.beta_constraint)

    def call(self, inputs, **kwargs):
        mean, variance = tf.nn.moments(inputs, axes=[1, 2])
        mean = tf.reshape(mean, shape=[-1, 1, 1, inputs.shape[-1]])
        variance = tf.reshape(variance, shape=[-1, 1, 1, inputs.shape[-1]])
        outputs = (inputs - mean) / tf.sqrt(variance + self.epsilon)
        return outputs * self.gamma + self.beta

    def get_config(self):
        config = {
            'epsilon': self.epsilon,
            'beta_initializer': keras.initializers.serialize(self.beta_initializer),
            'gamma_initializer': keras.initializers.serialize(self.gamma_initializer),
            'beta_regularizer': keras.regularizers.serialize(self.beta_regularizer),
            'gamma_regularizer': keras.regularizers.serialize(self.gamma_regularizer),
            'beta_constraint': keras.constraints.serialize(self.beta_constraint),
            'gamma_constraint': keras.constraints.serialize(self.gamma_constraint)
        }
        base_config = super(InstanceNormalization, self).get_config()

        return dict(list(base_config.items()) + list(config.items()))


class Conv_Relu_In(keras.layers.Layer):
    def __init__(self, filters, kernel_size, strides, padding, name):
        super(Conv_Relu_In, self).__init__()
        self._name = name
        self.block = keras.Sequential([keras.layers.Conv2D(filters, kernel_size, strides, padding),
                                       keras.layers.LeakyReLU(0.2)])
        if name.find('in') != -1:
            self.block.add(InstanceNormalization())

    def call(self, inputs, **kwargs):

        return self.block(inputs)


class Upsampling_Conv_Relu_In_Concatenate(keras.layers.Layer):
    def __init__(self, filters, kernel_size, strides, padding, name):
        super(Upsampling_Conv_Relu_In_Concatenate, self).__init__()
        self._name = name
        self.block = keras.Sequential([keras.layers.UpSampling2D((2, 2)),
                                       keras.layers.Conv2D(filters, kernel_size, strides, padding, activation='relu'),
                                       InstanceNormalization()])
        self.concatenate = keras.layers.Concatenate()

    def call(self, inputs, **kwargs):
        x, shortcut = inputs
        x = self.block(x)
        output = self.concatenate([x, shortcut])

        return output

#difine the generator
def generator(input_shape, name):
    input_tensor = keras.layers.Input(input_shape, name='input')
    x = input_tensor

    x1 = Conv_Relu_In(64, (4, 4), (2, 2), 'same', name='conv_leakyrelu1')(x)
    x2 = Conv_Relu_In(128, (4, 4), (2, 2), 'same', name='conv_leakyrelu_in2')(x1)
    x3 = Conv_Relu_In(256, (4, 4), (2, 2), 'same', name='conv_leakyrelu_in3')(x2)
    x4 = Conv_Relu_In(512, (4, 4), (2, 2), 'same', name='conv_leakyrelu_in4')(x3)
    x5 = Conv_Relu_In(512, (4, 4), (2, 2), 'same', name='conv_leakyrelu_in5')(x4)
    x6 = Conv_Relu_In(512, (4, 4), (2, 2), 'same', name='conv_leakyrelu_in6')(x5)
    x7 = Conv_Relu_In(512, (4, 4), (2, 2), 'same', name='conv_leakyrelu_in7')(x6)

    y6 = Upsampling_Conv_Relu_In_Concatenate(512, (4, 4), (1, 1), 'same', name='upsampling_conv_relu_in_concatenate1')([x7, x6])
    y5 = Upsampling_Conv_Relu_In_Concatenate(512, (4, 4), (1, 1), 'same', name='upsampling_conv_relu_in_concatenate2')([y6, x5])
    y4 = Upsampling_Conv_Relu_In_Concatenate(512, (4, 4), (1, 1), 'same', name='upsampling_conv_relu_in_concatenate3')([y5, x4])
    y3 = Upsampling_Conv_Relu_In_Concatenate(256, (4, 4), (1, 1), 'same', name='upsampling_conv_relu_in_concatenate4')([y4, x3])
    y2 = Upsampling_Conv_Relu_In_Concatenate(128, (4, 4), (1, 1), 'same', name='upsampling_conv_relu_in_concatenate5')([y3, x2])
    y1 = Upsampling_Conv_Relu_In_Concatenate(64, (4, 4), (1, 1), 'same', name='upsampling_conv_relu_in_concatenate6')([y2, x1])

    y = compose(keras.layers.UpSampling2D((2, 2), name='upsampling'),
                keras.layers.Conv2D(3, (4, 4), (1, 1), 'same', activation='tanh', name='conv_tanh'))(y1)

    model = keras.Model(input_tensor, y, name=name)

    return model


#define the discriminator
def discriminator(input_shape, name):
    input_tensor = keras.layers.Input(input_shape, name='input')
    x = input_tensor

    x = compose(Conv_Relu_In(64, (4, 4), (2, 2), 'same', name='conv_leakyrelu1'),
                Conv_Relu_In(128, (4, 4), (2, 2), 'same', name='conv_leakyrelu_in2'),
                Conv_Relu_In(256, (4, 4), (2, 2), 'same', name='conv_leakyrelu_in3'),
                Conv_Relu_In(512, (4, 4), (2, 2), 'same', name='conv_leakyrelu_in4'),
                keras.layers.Conv2D(1, (4, 4), (1, 1), 'same', name='conv'))(x)

    model = keras.Model(input_tensor, x, name=name)

    return model


#the main process
def discogan(input_shapeA, input_shapeB, model_gAB, model_gBA, model_dA, model_dB):
    input_tensorA = keras.layers.Input(input_shapeA, name='input_A')
    input_tensorB = keras.layers.Input(input_shapeB, name='input_B')

    # fake_A is generated by a given input real_B, fake_B is generated by a given input real_A
    fake_A = model_gBA(input_tensorB)
    fake_B = model_gAB(input_tensorA)

    # recon_A is generated by a given input fake_B, recon_B is generated by a given input fake_A
    recon_A = model_gBA(fake_B)
    recon_B = model_gAB(fake_A)

    model_dA.trainable = False
    model_dB.trainable = False

    conf_A = model_dA(fake_A)
    conf_B = model_dB(fake_B)

    model = keras.Model([input_tensorA, input_tensorB], [conf_A, conf_B, fake_A, fake_B, recon_A, recon_B], name='DiscoGAN')

    return model


#read data from files and reandomly selected a batch_size of inputs
def read_data(data_path, img_size, batch_size):
    filename = glob.glob(data_path + '*.jpg')
    choose_name = np.random.choice(filename, batch_size)

    image_A = []
    image = cv.imread(choose_name[0]).astype(np.float32)
    image_A.append(image)

    image_A = np.array(image_A) / 127.5 - 1

    return image_A




Found GPU at: /device:GPU:0


In [None]:
batch_size = 1
epochs = 3000
tf.random.set_seed(42)
img_size = (256, 256)
#input and output file path
data_X_path = '/content/drive/My Drive/photo2monet/trainA/'
data_Y_path = '/content/drive/My Drive/photo2monet/trainB/'
save_path = '/content/drive/My Drive/photo2monet/test_out/'
save_A_path = '/content/drive/My Drive/photo2monet/test_A_out/'
save_B_path = '/content/drive/My Drive/photo2monet/test_B_out/'
if not os.path.exists(save_path):
    os.makedirs(save_path)
if not os.path.exists(save_A_path):
    os.makedirs(save_A_path)
if not os.path.exists(save_B_path):
    os.makedirs(save_B_path)

optimizer = keras.optimizers.Adam(0.0002, 0.5)
loss = keras.losses.BinaryCrossentropy()
#define loss variables
real_dAmse = keras.metrics.MeanSquaredError()
fake_dAmse = keras.metrics.MeanSquaredError()
real_dBmse = keras.metrics.MeanSquaredError()
fake_dBmse = keras.metrics.MeanSquaredError()
gAmse = keras.metrics.MeanSquaredError()
gBmse = keras.metrics.MeanSquaredError()
gBmse_history = []
#define generators
model_dA = discriminator(input_shape=(img_size[0], img_size[1], 3), name='DiscoGAN-DiscriminatorA')
model_dA.compile(optimizer=optimizer, loss='mse')
model_dB = discriminator(input_shape=(img_size[0], img_size[1], 3), name='DiscoGAN-DiscriminatorB')
model_dB.compile(optimizer=optimizer, loss='mse')

model_gAB = generator(input_shape=(img_size[0], img_size[1], 3), name='DiscoGAN-GeneratorAB')
model_gBA = generator(input_shape=(img_size[0], img_size[1], 3), name='DiscoGAN-GeneratorBA')

model_gAB.build(input_shape=(img_size[0], img_size[1], 3))
model_gAB.summary()
keras.utils.plot_model(model_gAB, 'DiscoGAN-generatorAB.png', show_shapes=True, show_layer_names=True)

model_gBA.build(input_shape=(img_size[0], img_size[1], 3))
model_gBA.summary()
keras.utils.plot_model(model_gBA, 'DiscoGAN-generatorBA.png', show_shapes=True, show_layer_names=True)
#define discriminators
model_dA.build(input_shape=(img_size[0], img_size[1], 3))
model_dA.summary()
keras.utils.plot_model(model_dA, 'DiscoGAN-discriminatorA.png', show_shapes=True, show_layer_names=True)

model_dB.build(input_shape=(img_size[0], img_size[1], 3))
model_dB.summary()
keras.utils.plot_model(model_dB, 'DiscoGAN-discriminatorB.png', show_shapes=True, show_layer_names=True)

model = discogan(input_shapeA=(img_size[0], img_size[1], 3), input_shapeB=(img_size[0], img_size[1], 3), model_gAB=model_gAB, model_gBA=model_gBA, model_dA=model_dA, model_dB=model_dB)
model.compile(optimizer=optimizer, loss=['mse', 'mse', 'mae', 'mae', 'mae', 'mae'], loss_weights=[0.5, 0.5, 5, 5, 5, 5])

model.build(input_shape=[(img_size[0], img_size[1], 3), (img_size[0], img_size[1], 3)])
model.summary()
keras.utils.plot_model(model, 'DiscoGAN.png', show_shapes=True, show_layer_names=True)

#training data for epoch times
for epoch in range(epochs):
    image_A = read_data(data_X_path, img_size, batch_size)
    image_B = read_data(data_Y_path, img_size, batch_size)

    fake_A = model_gBA(image_B)
    fake_B = model_gAB(image_A)

    real_dAmse(np.ones((batch_size, img_size[0] // 16, img_size[1] // 16, 1)), model_dA(image_A))
    fake_dAmse(np.zeros((batch_size, img_size[0] // 16, img_size[1] // 16, 1)), model_dA(fake_A))
    real_dBmse(np.ones((batch_size, img_size[0] // 16, img_size[1] // 16, 1)), model_dB(image_B))
    fake_dBmse(np.zeros((batch_size, img_size[0] // 16, img_size[1] // 16, 1)), model_dB(fake_B))
    gAmse(np.ones((batch_size, img_size[0] // 16, img_size[1] // 16, 1)), model([image_A, image_B])[0])
    gBmse(np.ones((batch_size, img_size[0] // 16, img_size[1] // 16, 1)), model([image_A, image_B])[1])

    real_dAloss = model_dA.train_on_batch(image_A, np.ones((batch_size, img_size[0] // 16, img_size[1] // 16, 1)))
    fake_dAloss = model_dA.train_on_batch(fake_A, np.zeros((batch_size, img_size[0] // 16, img_size[1] // 16, 1)))
    real_dBloss = model_dB.train_on_batch(image_B, np.ones((batch_size, img_size[0] // 16, img_size[1] // 16, 1)))
    fake_dBloss = model_dB.train_on_batch(fake_B, np.zeros((batch_size, img_size[0] // 16, img_size[1] // 16, 1)))

    gloss = model.train_on_batch([image_A, image_B], [np.ones((batch_size, img_size[0] // 16, img_size[1] // 16, 1)), np.ones((batch_size, img_size[0] // 16, img_size[1] // 16, 1)), image_A, image_B, image_A, image_B])
    gBmse_history.append(gBmse.result())
    if epoch % 20 == 0:
        print('epoch = {}, real_dAmse = {}, fake_dAmse = {}, real_dBmse = {}, fake_dBmse = {}, gAmse = {}, gBmse = {}'.format(epoch, real_dAmse.result(), fake_dAmse.result(), real_dBmse.result(), fake_dBmse.result(), gAmse.result(), gBmse.result()))
        real_dAmse.reset_states()
        fake_dAmse.reset_states()
        real_dBmse.reset_states()
        fake_dBmse.reset_states()
        gAmse.reset_states()
        gBmse.reset_states()
        image_A = read_data(data_X_path, img_size, batch_size)
        image_B = read_data(data_Y_path, img_size, batch_size)
        fake_A = ((model_gBA(image_B).numpy().squeeze() + 1) * 127.5).astype(np.uint8)
        fake_B = ((model_gAB(image_A).numpy().squeeze() + 1) * 127.5).astype(np.uint8)
        image_A = ((image_A.squeeze() + 1) * 127.5).astype(np.uint8)
        image_B = ((image_B.squeeze() + 1) * 127.5).astype(np.uint8)
        cv.imwrite(save_path + '\\epoch{}.jpg'.format(epoch), np.concatenate([np.concatenate([image_B, fake_A], axis=1), np.concatenate([image_A, fake_B], axis=1)], axis=0))

Model: "DiscoGAN-GeneratorAB"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input (InputLayer)              [(None, 256, 256, 3) 0                                            
__________________________________________________________________________________________________
conv_leakyrelu1 (Conv_Relu_In)  (None, 128, 128, 64) 3136        input[0][0]                      
__________________________________________________________________________________________________
conv_leakyrelu_in2 (Conv_Relu_I (None, 64, 64, 128)  131456      conv_leakyrelu1[0][0]            
__________________________________________________________________________________________________
conv_leakyrelu_in3 (Conv_Relu_I (None, 32, 32, 256)  525056      conv_leakyrelu_in2[0][0]         
_______________________________________________________________________________

In [None]:
#generate translated pictures with trained models, and save them into folders
for i in range(100):
  image_A = read_data(data_X_path, img_size, batch_size)
  image_B = read_data(data_Y_path, img_size, batch_size)
  fake_A = ((model_gBA(image_B).numpy().squeeze() + 1) * 127.5).astype(np.uint8)
  fake_B = ((model_gAB(image_A).numpy().squeeze() + 1) * 127.5).astype(np.uint8)
  cv.imwrite(save_A_path + '\\outA{}.jpg'.format(i), fake_A)
  cv.imwrite(save_B_path + '\\outB{}.jpg'.format(i), fake_B)