# Drive and GPU Connection

Run to connect with google drive to access data

In [0]:
from google.colab import drive
drive.mount('/content/drive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive


Verify GPU

In [0]:
# import tensorflow as tf 
# tf.test.gpu_device_name()

from tensorflow.python.client import device_lib
device_lib.list_local_devices()

In [0]:
!nvcc --version

# Data preprocesing and visualization

In [0]:
%tensorflow_version 2.x
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt

from tensorflow.keras import Sequential, Input, Model
from tensorflow.keras.layers import Dense, Activation, Flatten, Reshape, Lambda
from tensorflow.keras.layers import Conv2D, Conv2DTranspose, UpSampling2D, MaxPooling2D
from tensorflow.keras.layers import LeakyReLU, Dropout
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.optimizers import Adam, RMSprop
from keras import backend as K

def load_data(filename):
    with np.load(filename) as f:
        x_train, x_test = f['arr_0'], f['arr_1']
        y_train, y_test = f['arr_2'], f['arr_3']
        return (x_train, y_train), (x_test, y_test)

def preprocess_data(data, feature_range=(-1.0, 1.0)):
    (x_train, y_train), (x_test, y_test) = data
    min, max = feature_range

    x_train = x_train.reshape(-1, 160, 160, 3).astype(np.float32)
    x_train = x_train/255.0
    x_train = x_train * (max - min) + min

    x_test = x_test.reshape(-1, 160, 160, 3).astype(np.float32)
    x_test = x_test/255.0
    x_test = x_test * (max - min) + min

    y_train = y_train.reshape(-1, 160, 160, 3).astype(np.float32)
    y_train = y_train/255.0
    y_train = y_train * (max - min) + min

    y_test = y_test.reshape(-1, 160, 160, 3).astype(np.float32)
    y_test = y_test/255.0
    y_test = y_test * (max - min) + min

    print("Values after preprocessing, min: " + str(x_train.min()) + ", max: " + str(x_train.max()))
    return (x_train, y_train), (x_test, y_test)

def add_mirrored_data(data):
    (x_train, y_train), (x_test, y_test) = data

    x_train_mirrored = np.flip(x_train, 2)
    y_train_mirrored = np.flip(y_train, 2)
    x_test_mirrored = np.flip(x_test, 2)
    y_test_mirrored = np.flip(y_test, 2)

    x_train = np.concatenate((x_train, x_train_mirrored))
    y_train = np.concatenate((y_train, y_train_mirrored))
    x_test = np.concatenate((x_test, x_test_mirrored))
    y_test = np.concatenate((y_test, y_test_mirrored))

    return (x_train, y_train), (x_test, y_test)

def get_random_batch(data, batch_size):
    idx = np.random.choice(data.shape[0], batch_size)
    batch = data[idx].reshape((batch_size, 160, 160, 3))
    return batch

def show_images(images, feature_range=(-1.0, 1.0)):
    min, max = feature_range
    plt.figure(figsize=(10,10))
    
    for im in range(images.shape[0]):
      plt.subplot(4, 4, im+1)
      image = images[im, :, :, :]
      image = np.reshape(image, [160,160,3])
      image = (image - min) / (max - min)
      plt.imshow(image)
      plt.axis('off')
    plt.tight_layout()
    plt.show()

def visualize(sample_X, sample_Y, encoder, decoder, feature_range=(-1.0, 1.0)):
    min, max = feature_range
    count = sample_X.shape[0]

    reconstructed_X = []
    reconstructed_Y = []
    accumulated = []

    for n in range(count):
        code = encoder.predict(sample_X[n][None])[0]
        reco = decoder.predict(code[None])[0]

        reconstructed_X.append(reco)

        code = encoder.predict(sample_Y[n][None])[0]
        reco = decoder.predict(code[None])[0]

        reconstructed_Y.append(reco)

    for i in range(4):
        accumulated.append(sample_X[i])
        accumulated.append(reconstructed_X[i])
        accumulated.append(sample_Y[i])
        accumulated.append(reconstructed_Y[i])

    count = len(accumulated)

    plt.figure(figsize=(8,8))

    for i in range(count):
        plt.subplot(4, 4, i + 1)
        image = (accumulated[i] - min) / (max - min)
        plt.imshow(image)
        plt.axis('off')
    plt.tight_layout()
    plt.show()

Using TensorFlow backend.


# VAEGAN models

In [0]:
def sampling(args):
    mean_mu, log_var = args
    epsilon = tf.random.normal(shape=K.shape(mean_mu), mean=0., stddev=1.) 
    return mean_mu + K.exp(log_var/2)*epsilon

def build_encoder(img_shape, code_size):
    encoder_input = Input(shape=img_shape, name='encoder_input')

    x = encoder_input

    x = Conv2D(filters=64, kernel_size=(5, 5), strides=(2, 2), padding='same', name='encoder_conv_1')(x)
    x = BatchNormalization()(x)
    x = LeakyReLU()(x)

    x = Conv2D(filters=128, kernel_size=(5, 5), strides=(2, 2), padding='same', name='encoder_conv_2')(x)
    x = BatchNormalization()(x)
    x = LeakyReLU()(x)

    x = Conv2D(filters=256, kernel_size=(5, 5), strides=(2, 2), padding='same', name='encoder_conv_3')(x)
    x = BatchNormalization()(x)
    x = LeakyReLU()(x)

    x = Flatten()(x)

    mean_mu = Dense(code_size, name='mean_mu')(x)
    log_var = Dense(code_size, name='log_var')(x)
    encoder_output = Lambda(sampling, name='encoder_output')([mean_mu, log_var])

    return mean_mu, log_var, Model(encoder_input, encoder_output)

def build_generator(img_shape, code_size):
    size1 = int(img_shape[0]/8) # 8 is 2^number_of_UpSampling2D_layers in generator
    size2 = int(img_shape[1]/8)

    generator_input = Input(shape=(int(code_size,)), name='generator_input')
    x = generator_input

    x = Dense(units=(size1*size2*256))(x)
    x = BatchNormalization()(x)
    x = LeakyReLU()(x)
    x = Reshape(target_shape=(size1, size2, 256))(x)

    x = Conv2DTranspose(filters=256, kernel_size=(5, 5), strides=(1, 1), padding='same', name='generator_conv_1')(x)
    x = BatchNormalization()(x)
    x = LeakyReLU()(x)

    x = Conv2DTranspose(filters=128, kernel_size=(5, 5), strides=(2, 2), padding='same', name='generator_conv_2')(x)
    x = BatchNormalization()(x)
    x = LeakyReLU()(x)

    x = Conv2DTranspose(filters=32, kernel_size=(5, 5), strides=(2, 2), padding='same', name='generator_conv_3')(x)
    x = BatchNormalization()(x)
    x = LeakyReLU()(x)

    x = Conv2DTranspose(filters=3, kernel_size=(5, 5), strides=(2, 2), padding='same', name='generator_conv_4')(x)
    x = Activation('tanh')(x)

    generator_output = x

    model = Model(generator_input, generator_output)

    return model

def build_discriminator(img_shape):

    discriminator_input = Input(shape=img_shape, name='discriminator_input')
    x = discriminator_input

    x = Conv2D(filters=32, kernel_size=(5, 5), strides=(1, 1), padding='same', name='discriminator_conv_0')(x)
    x = LeakyReLU()(x)

    x = Conv2D(filters=128, kernel_size=(5, 5), strides=(2, 2), padding='same', name='discriminator_conv_1')(x)
    x = BatchNormalization()(x)
    x = LeakyReLU()(x)

    x = Conv2D(filters=256, kernel_size=(5, 5), strides=(2, 2), padding='same', name='discriminator_conv_2')(x)
    x = BatchNormalization()(x)
    x = LeakyReLU()(x)

    x = Conv2D(filters=256, kernel_size=(5, 5), strides=(2, 2), padding='same', name='discriminator_conv_3')(x)
    x = BatchNormalization()(x)
    x = LeakyReLU()(x)

    x = Flatten()(x)
    x = Dense(1, activation='sigmoid')(x)

    discriminator_output = x

    model = Model(discriminator_input, discriminator_output)

    return model


# Training XY

In [0]:
def discriminator_loss(real_output, fake_output):
    real_loss = cross_entropy(tf.ones_like(real_output), real_output)
    fake_loss = cross_entropy(tf.zeros_like(fake_output), fake_output)
    total_loss = real_loss + fake_loss
    return total_loss

def generator_loss(fake_output):
    return cross_entropy(tf.ones_like(fake_output), fake_output)

def encoder_loss(original, faked):
    tf.debugging.check_numerics(faked, "Nan or Inf in input occurred", name=None)
    return mean_squared_error(original, faked)

def kl_loss(mean_mu, log_var):
    kl_loss =  -0.5 * tf.reduce_sum(1 + log_var - tf.square(mean_mu) - tf.exp(log_var), axis = 1)
    return kl_loss

cross_entropy = tf.keras.losses.BinaryCrossentropy(from_logits=True)
mean_squared_error = tf.keras.losses.MeanSquaredError()

# Load and prepare data
npz_path = './drive/My Drive/ColabStuff/DiCaprioToDowneyJr_Big_VAE.npz'
(x_train, y_train), (x_test, y_test) = load_data(npz_path)

value_range = (-1.0, 1.0)
print("Imported data shape: " + str(x_train.shape))

(x_train, y_train), (x_test, y_test) = preprocess_data(((x_train, y_train), (x_test, y_test)), value_range)

# imported_data_sample = get_random_batch(x_train, 16)
# show_images(imported_data_sample, value_range)

(x_train, y_train), (x_test, y_test) = add_mirrored_data(((x_train, y_train), (x_test, y_test)))

# imported_data_sample = get_random_batch(x_train, 16)
# show_images(imported_data_sample, value_range)

save_path = './drive/My Drive/ColabStuff/VAEGAN/'

train_x = np.concatenate((x_train, x_test))
train_y = np.concatenate((y_train, y_test))
train_z = np.concatenate((train_x, train_y))

batch_size = 32
code_size = 200
epochs_number_1 =20000
epochs_number_2 =500
LEARNING_RATE = 0.0005
IMG_SHAPE = train_x.shape[1:]

enc_loss_plot = []
gen_loss_plot = []
disc_loss_plot = []
plot_iteration = []

mean, logvar, encoder = build_encoder(IMG_SHAPE, code_size)
discriminator = build_discriminator(IMG_SHAPE)
generator = build_generator(IMG_SHAPE, code_size)

encoder_optimizer = tf.keras.optimizers.Adam(LEARNING_RATE)
generator_optimizer = tf.keras.optimizers.Adam(LEARNING_RATE)
discriminator_optimizer = tf.keras.optimizers.Adam(LEARNING_RATE)

for i in range(1, epochs_number_1 + 1):
  images_train = get_random_batch(train_z, batch_size)

  with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape, tf.GradientTape() as enc_tape:
      latent = encoder(images_train)
      generated_images = generator(latent, training=True)

      real_output = discriminator(images_train, training=True)
      fake_output = discriminator(generated_images, training=True)

      enc_loss = encoder_loss(images_train, generated_images) * 100
      gen_loss = generator_loss(fake_output) + enc_loss
      disc_loss = discriminator_loss(real_output, fake_output)
  
  gradients_of_encoder = enc_tape.gradient(enc_loss, encoder.trainable_variables)
  gradients_of_generator = gen_tape.gradient(gen_loss, generator.trainable_variables)
  gradients_of_discriminator = disc_tape.gradient(disc_loss, discriminator.trainable_variables)

  encoder_optimizer.apply_gradients(zip(gradients_of_encoder, encoder.trainable_variables))
  generator_optimizer.apply_gradients(zip(gradients_of_generator, generator.trainable_variables))
  discriminator_optimizer.apply_gradients(zip(gradients_of_discriminator, discriminator.trainable_variables))

  enc_loss_plot.append(enc_loss)
  gen_loss_plot.append(gen_loss)
  disc_loss_plot.append(disc_loss)
  plot_iteration.append(i)
  
  if (i != 1) and (i % 10 == 0):
    print('setp: ' + str(i) + ", gen_loss: " + str(gen_loss.numpy()) + ", disc_loss: " + str(disc_loss.numpy()) + ", enc_loss: " + str(enc_loss.numpy()))

  if (i != 1)and (i % 100 == 0):

      (fig, (ax1)) = plt.subplots(1)
      fig.set_size_inches(8, 8)

      ax1.plot(plot_iteration, enc_loss_plot, label="enccoder loss")
      ax1.legend()

      ax1.plot(plot_iteration, gen_loss_plot, "C1", label="generator loss")
      ax1.legend()

      ax1.plot(plot_iteration, disc_loss_plot, "C2", label="discriminator loss")
      ax1.legend()

      plt.show()
  
  if (i != 1) and (i % 100 == 0):
    test_sample_X = get_random_batch(train_x, 4)
    test_sample_Y = get_random_batch(train_y, 4)

    visualize(test_sample_X, test_sample_Y, encoder, generator, value_range)

  if (i != 1) and (i % 100 == 0):
    encoder.save(save_path + "encoder_" + str(i) + ".h5")
    generator.save(save_path + "generator_XY_" + str(i) + ".h5")
    discriminator.save(save_path + "discriminator_XY_" + str(i) + ".h5")

print("Program finished successfully!")

Output hidden; open in https://colab.research.google.com to view.

#Training X

In [0]:
# Load model
encoder_path = './drive/My Drive/ColabStuff/VAEGAN/encoder_200.h5'
loaded_enocder = tf.keras.models.load_model(encoder_path, compile=False)

def discriminator_loss(real_output, fake_output):
    real_loss = cross_entropy(tf.ones_like(real_output), real_output)
    fake_loss = cross_entropy(tf.zeros_like(fake_output), fake_output)
    total_loss = real_loss + fake_loss
    return total_loss

def generator_loss(fake_output):
    return cross_entropy(tf.ones_like(fake_output), fake_output)

def encoder_loss(original, faked):
    tf.debugging.check_numerics(faked, "Nan or Inf in input occurred", name=None)
    return mean_squared_error(original, faked)

def kl_loss(mean_mu, log_var):
    kl_loss =  -0.5 * tf.reduce_sum(1 + log_var - tf.square(mean_mu) - tf.exp(log_var), axis = 1)
    return kl_loss

cross_entropy = tf.keras.losses.BinaryCrossentropy(from_logits=True)
mean_squared_error = tf.keras.losses.MeanSquaredError()

# Load and prepare data
npz_path = './drive/My Drive/ColabStuff/DiCaprioToDowneyJr_Big_VAE.npz'
(x_train, y_train), (x_test, y_test) = load_data(npz_path)

value_range = (-1.0, 1.0)
print("Imported data shape: " + str(x_train.shape))

(x_train, y_train), (x_test, y_test) = preprocess_data(((x_train, y_train), (x_test, y_test)), value_range)

# imported_data_sample = get_random_batch(x_train, 16)
# show_images(imported_data_sample, value_range)

(x_train, y_train), (x_test, y_test) = add_mirrored_data(((x_train, y_train), (x_test, y_test)))

# imported_data_sample = get_random_batch(x_train, 16)
# show_images(imported_data_sample, value_range)

save_path = './drive/My Drive/ColabStuff/VAEGAN/'

train_x = np.concatenate((x_train, x_test))
train_y = np.concatenate((y_train, y_test))
train_z = np.concatenate((train_x, train_y))

batch_size = 32
code_size = 200
epochs_number_1 =20000
epochs_number_2 =500
LEARNING_RATE = 0.0005
IMG_SHAPE = train_x.shape[1:]

gen_loss_plot = []
disc_loss_plot = []
plot_iteration = []

discriminator = build_discriminator(IMG_SHAPE)
generator = build_generator(IMG_SHAPE, code_size)

generator_optimizer = tf.keras.optimizers.Adam(LEARNING_RATE)
discriminator_optimizer = tf.keras.optimizers.Adam(LEARNING_RATE)

for i in range(1, epochs_number_1 + 1):
  images_train = get_random_batch(train_x, batch_size)
  latent = loaded_enocder.predict(images_train)

  with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
      generated_images = generator(latent, training=True)

      real_output = discriminator(images_train, training=True)
      fake_output = discriminator(generated_images, training=True)

      enc_loss = encoder_loss(images_train, generated_images) * 100
      gen_loss = generator_loss(fake_output) + enc_loss
      disc_loss = discriminator_loss(real_output, fake_output)
  
  gradients_of_generator = gen_tape.gradient(gen_loss, generator.trainable_variables)
  gradients_of_discriminator = disc_tape.gradient(disc_loss, discriminator.trainable_variables)

  generator_optimizer.apply_gradients(zip(gradients_of_generator, generator.trainable_variables))
  discriminator_optimizer.apply_gradients(zip(gradients_of_discriminator, discriminator.trainable_variables))

  gen_loss_plot.append(gen_loss)
  disc_loss_plot.append(disc_loss)
  plot_iteration.append(i)
  
  if (i != 1) and (i % 10 == 0):
    print('setp: ' + str(i) + ", gen_loss: " + str(gen_loss.numpy()) + ", disc_loss: " + str(disc_loss.numpy()))

  if (i != 1)and (i % 100 == 0):

      (fig, (ax1)) = plt.subplots(1)
      fig.set_size_inches(8, 8)

      ax1.plot(plot_iteration, gen_loss_plot, "C1", label="generator loss")
      ax1.legend()

      ax1.plot(plot_iteration, disc_loss_plot, "C2", label="discriminator loss")
      ax1.legend()

      plt.show()
  
  if (i != 1) and (i % 100 == 0):
    test_sample_X = get_random_batch(train_x, 4)
    test_sample_Y = get_random_batch(train_y, 4)

    visualize(test_sample_X, test_sample_Y, loaded_enocder, generator, value_range)

  if (i != 1) and (i % 100 == 0):
    generator.save(save_path + "generator_X_" + str(i) + ".h5")
    discriminator.save(save_path + "discriminator_X_" + str(i) + ".h5")

print("Program finished successfully!")

Output hidden; open in https://colab.research.google.com to view.

# Training Y

In [0]:
# Load model
encoder_path = './drive/My Drive/ColabStuff/VAEGAN/encoder_200.h5'
loaded_enocder = tf.keras.models.load_model(encoder_path, compile=False)

def discriminator_loss(real_output, fake_output):
    real_loss = cross_entropy(tf.ones_like(real_output), real_output)
    fake_loss = cross_entropy(tf.zeros_like(fake_output), fake_output)
    total_loss = real_loss + fake_loss
    return total_loss

def generator_loss(fake_output):
    return cross_entropy(tf.ones_like(fake_output), fake_output)

def encoder_loss(original, faked):
    tf.debugging.check_numerics(faked, "Nan or Inf in input occurred", name=None)
    return mean_squared_error(original, faked)

def kl_loss(mean_mu, log_var):
    kl_loss =  -0.5 * tf.reduce_sum(1 + log_var - tf.square(mean_mu) - tf.exp(log_var), axis = 1)
    return kl_loss

cross_entropy = tf.keras.losses.BinaryCrossentropy(from_logits=True)
mean_squared_error = tf.keras.losses.MeanSquaredError()

# Load and prepare data
npz_path = './drive/My Drive/ColabStuff/DiCaprioToDowneyJr_Big_VAE.npz'
(x_train, y_train), (x_test, y_test) = load_data(npz_path)

value_range = (-1.0, 1.0)
print("Imported data shape: " + str(x_train.shape))

(x_train, y_train), (x_test, y_test) = preprocess_data(((x_train, y_train), (x_test, y_test)), value_range)

# imported_data_sample = get_random_batch(x_train, 16)
# show_images(imported_data_sample, value_range)

(x_train, y_train), (x_test, y_test) = add_mirrored_data(((x_train, y_train), (x_test, y_test)))

# imported_data_sample = get_random_batch(x_train, 16)
# show_images(imported_data_sample, value_range)

save_path = './drive/My Drive/ColabStuff/VAEGAN/'

train_x = np.concatenate((x_train, x_test))
train_y = np.concatenate((y_train, y_test))
train_z = np.concatenate((train_x, train_y))

batch_size = 32
code_size = 200
epochs_number_1 =20000
epochs_number_2 =500
LEARNING_RATE = 0.0005
IMG_SHAPE = train_x.shape[1:]

gen_loss_plot = []
disc_loss_plot = []
plot_iteration = []

discriminator = build_discriminator(IMG_SHAPE)
generator = build_generator(IMG_SHAPE, code_size)

generator_optimizer = tf.keras.optimizers.Adam(LEARNING_RATE)
discriminator_optimizer = tf.keras.optimizers.Adam(LEARNING_RATE)

for i in range(1, epochs_number_1 + 1):
  images_train = get_random_batch(train_y, batch_size)
  latent = loaded_enocder.predict(images_train)

  with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
      generated_images = generator(latent, training=True)

      real_output = discriminator(images_train, training=True)
      fake_output = discriminator(generated_images, training=True)

      enc_loss = encoder_loss(images_train, generated_images) * 100
      gen_loss = generator_loss(fake_output) + enc_loss
      disc_loss = discriminator_loss(real_output, fake_output)
  
  gradients_of_generator = gen_tape.gradient(gen_loss, generator.trainable_variables)
  gradients_of_discriminator = disc_tape.gradient(disc_loss, discriminator.trainable_variables)

  generator_optimizer.apply_gradients(zip(gradients_of_generator, generator.trainable_variables))
  discriminator_optimizer.apply_gradients(zip(gradients_of_discriminator, discriminator.trainable_variables))

  gen_loss_plot.append(gen_loss)
  disc_loss_plot.append(disc_loss)
  plot_iteration.append(i)
  
  if (i != 1) and (i % 10 == 0):
    print('setp: ' + str(i) + ", gen_loss: " + str(gen_loss.numpy()) + ", disc_loss: " + str(disc_loss.numpy()))

  if (i != 1)and (i % 100 == 0):

      (fig, (ax1)) = plt.subplots(1)
      fig.set_size_inches(8, 8)

      ax1.plot(plot_iteration, gen_loss_plot, "C1", label="generator loss")
      ax1.legend()

      ax1.plot(plot_iteration, disc_loss_plot, "C2", label="discriminator loss")
      ax1.legend()

      plt.show()
  
  if (i != 1) and (i % 100 == 0):
    test_sample_X = get_random_batch(train_x, 4)
    test_sample_Y = get_random_batch(train_y, 4)

    visualize(test_sample_X, test_sample_Y, loaded_enocder, generator, value_range)

  if (i != 1) and (i % 100 == 0):
    generator.save(save_path + "generator_Y_" + str(i) + ".h5")
    discriminator.save(save_path + "discriminator_Y_" + str(i) + ".h5")

print("Program finished successfully!")

Output hidden; open in https://colab.research.google.com to view.