<a href="https://colab.research.google.com/github/yonz2/geocoding/blob/master/MangaGAN2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip list | grep tensorflow
!pip install ffmpeg-python

In [None]:
print("Start!")

# Make sure the GDrive where the Images are stored is mounted....
import os.path
import shutil
from google.colab import drive

if not os.path.exists('/content/gdrive/MyDrive'):
   drive.mount('/content/gdrive')
   
print("Done!")

In [None]:
print("Start!")

# Import various support modules
import os
from sklearn.utils import shuffle
import numpy as np
import time
import cv2
import tqdm
import glob
import scipy
import imageio
import matplotlib.pyplot as plt
import ffmpeg
from datetime import datetime
from PIL import Image
import matplotlib.gridspec as gridspec
import matplotlib.pyplot as plt
from collections import deque
from datetime import datetime

# Import required tensforflow and keras modules
import tensorflow as tf
import keras.backend as K

from keras.initializers import RandomNormal
from keras.layers import Dense
from keras.layers import Reshape
from keras.layers.core import Activation
from keras.layers.normalization import BatchNormalization
from keras.layers.convolutional import UpSampling2D
from keras.layers.core import Flatten, Dropout
from keras.layers import Input, merge
from keras.layers.pooling import MaxPooling2D
from keras.layers.convolutional import Conv2D, Conv2DTranspose
from keras.models import Model
from keras.optimizers import SGD, Adam, RMSprop
from keras.layers.advanced_activations import LeakyReLU
from keras.models import load_model, save_model

print("Done!")

In [None]:
print("Start!")

from tensorflow.keras.layers import InputSpec, Layer

#https://github.com/WeidiXie/New_Layers-Keras-Tensorflow/blob/master/src/subpix_upsampling.py
#OR 
#BELOW  (https://github.com/farizrahman4u/keras-contrib/blob/a7520c8f520bb5643ff9a62d1b3532fe72ab7283/keras_contrib/layers/convolutional.py)
class SubPixelUpscaling(Layer):

    def __init__(self, scale_factor=2, data_format=None, **kwargs):
        super(SubPixelUpscaling, self).__init__(**kwargs)

        self.scale_factor = scale_factor
        self.data_format = normalize_data_format(data_format)

    def build(self, input_shape):
        pass

    def call(self, x, mask=None):
        y = K.depth_to_space(x, self.scale_factor, self.data_format)
        return y

    def compute_output_shape(self, input_shape):
        if self.data_format == 'channels_first':
            b, k, r, c = input_shape
            return (b, k // (self.scale_factor ** 2), r * self.scale_factor, c * self.scale_factor)
        else:
            b, r, c, k = input_shape
            return (b, r * self.scale_factor, c * self.scale_factor, k // (self.scale_factor ** 2))

    def get_config(self):
        config = {'scale_factor': self.scale_factor,
                  'data_format': self.data_format}
        base_config = super(SubPixelUpscaling, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))

print("Done!")

In [None]:
print("Start!")

def normalize_data_format(value):
    if value is None:
        value = K.image_data_format()
    data_format = value.lower()
    if data_format not in {'channels_first', 'channels_last'}:
        raise ValueError('The `data_format` argument must be one of '
                         '"channels_first", "channels_last". Received: ' +
                         str(value))
    return data_format
    
print("Done!") 

In [None]:
print("Start!")

def get_gen_normal(noise_shape):
  noise_shape = noise_shape

  kernel_init = 'glorot_uniform'

  gen_input = Input(shape = noise_shape)
  generator = Conv2DTranspose(filters = 512, kernel_size = (4,4), strides = (1,1), padding = "valid", data_format = "channels_last", kernel_initializer = kernel_init)(gen_input)
  generator = BatchNormalization(momentum = 0.5)(generator)
  generator = LeakyReLU(0.2)(generator)
      
  generator = Conv2DTranspose(filters = 256, kernel_size = (4,4), strides = (2,2), padding = "same", data_format = "channels_last", kernel_initializer = kernel_init)(generator)
  generator = BatchNormalization(momentum = 0.5)(generator)
  generator = LeakyReLU(0.2)(generator)

  generator = Conv2DTranspose(filters = 128, kernel_size = (4,4), strides = (2,2), padding = "same", data_format = "channels_last", kernel_initializer = kernel_init)(generator)
  generator = BatchNormalization(momentum = 0.5)(generator)
  generator = LeakyReLU(0.2)(generator)

  generator = Conv2DTranspose(filters = 64, kernel_size = (4,4), strides = (2,2), padding = "same", data_format = "channels_last", kernel_initializer = kernel_init)(generator)
  generator = BatchNormalization(momentum = 0.5)(generator)
  generator = LeakyReLU(0.2)(generator)

  generator = Conv2D(filters = 64, kernel_size = (3,3), strides = (1,1), padding = "same", data_format = "channels_last", kernel_initializer = kernel_init)(generator)
  generator = BatchNormalization(momentum = 0.5)(generator)
  generator = LeakyReLU(0.2)(generator)

  generator = Conv2DTranspose(filters = 3, kernel_size = (4,4), strides = (2,2), padding = "same", data_format = "channels_last", kernel_initializer = kernel_init)(generator)
  generator = Activation('tanh')(generator)
      
  gen_opt = Adam(learning_rate=0.00015, beta_1=0.5)
  # Note: Change "input" to "inputs" and "output" to "outputs"
  generator_model = Model(inputs = gen_input, outputs = generator)
  generator_model.compile(loss='binary_crossentropy', optimizer=gen_opt, metrics=['accuracy'])
  generator_model.summary()

  return generator_model

    
print("Done!")

In [None]:
print("Start!")

def get_disc_normal(image_shape=(64,64,3)):
  image_shape = image_shape

  dropout_prob = 0.4
  kernel_init = 'glorot_uniform'
  dis_input = Input(shape = image_shape)
  discriminator = Conv2D(filters = 64, kernel_size = (4,4), strides = (2,2), padding = "same", data_format = "channels_last", kernel_initializer = kernel_init)(dis_input)
  discriminator = LeakyReLU(0.2)(discriminator)
  discriminator = Conv2D(filters = 128, kernel_size = (4,4), strides = (2,2), padding = "same", data_format = "channels_last", kernel_initializer = kernel_init)(discriminator)
  discriminator = BatchNormalization(momentum = 0.5)(discriminator)
  discriminator = LeakyReLU(0.2)(discriminator)

  discriminator = Conv2D(filters = 256, kernel_size = (4,4), strides = (2,2), padding = "same", data_format = "channels_last", kernel_initializer = kernel_init)(discriminator)
  discriminator = BatchNormalization(momentum = 0.5)(discriminator)
  discriminator = LeakyReLU(0.2)(discriminator)

  discriminator = Conv2D(filters = 512, kernel_size = (4,4), strides = (2,2), padding = "same", data_format = "channels_last", kernel_initializer = kernel_init)(discriminator)
  discriminator = BatchNormalization(momentum = 0.5)(discriminator)
  discriminator = LeakyReLU(0.2)(discriminator)

  discriminator = Flatten()(discriminator)

  discriminator = Dense(1)(discriminator)
  discriminator = Activation('sigmoid')(discriminator)
  #also try the SGD optimiser, might work better for a few learning rates.
  dis_opt = Adam(learning_rate=0.0002, beta_1=0.5)
  # Note: Change "input" to "inputs" and "output" to "outputs"
  discriminator_model = Model(inputs = dis_input, outputs = discriminator)
  discriminator_model.compile(loss='binary_crossentropy', optimizer=dis_opt, metrics=['accuracy'])
  discriminator_model.summary()
  return discriminator_model
print("Done!")


In [None]:
print("Start!")

def norm_img(img):
    img = (img / 127.5) - 1
    #image normalisation to keep values between -1 and 1 for stability
    return img

def denorm_img(img):
    #for output
    img = (img + 1) * 127.5
    return img.astype(np.uint8) 


def sample_from_dataset(batch_size, image_shape, data_dir=None, data = None):
    sample_dim = (batch_size,) + image_shape
    sample = np.empty(sample_dim, dtype=np.float32)
    all_data_dirlist = list(glob.glob(data_dir))
    sample_imgs_paths = np.random.choice(all_data_dirlist,batch_size)
    for index,img_filename in enumerate(sample_imgs_paths):
        image = Image.open(img_filename)
        image = image.resize(image_shape[:-1])
        image = image.convert('RGB')
        image = np.asarray(image)
        image = norm_img(image)
        sample[index,...] = image
    return sample

print("Done!")

In [None]:
print("Start!")

def gen_noise(batch_size, noise_shape):
    #input noise for the generator should follow a probability distribution, like in this case, the normal distributon.
    return np.random.normal(0, 1, size=(batch_size,)+noise_shape)

def generate_images(generator, save_dir):
    
    noise = gen_noise(batch_size,noise_shape)
    fake_data_X = generator.predict(noise)
    print("Displaying generated images")
    plt.figure(figsize=(4,4))
    gs1 = gridspec.GridSpec(4, 4)
    gs1.update(wspace=0, hspace=0)
    rand_indices = np.random.choice(fake_data_X.shape[0],16,replace=False)
    for i in range(16):
        #plt.subplot(4, 4, i+1)
        ax1 = plt.subplot(gs1[i])
        ax1.set_aspect('equal')
        rand_index = rand_indices[i]
        image = fake_data_X[rand_index, :,:,:]
        fig = plt.imshow(denorm_img(image))
        plt.axis('off')
        fig.axes.get_xaxis().set_visible(False)
        fig.axes.get_yaxis().set_visible(False)
    plt.tight_layout()
    plt.savefig(save_dir+str(time.time())+"_GENimage.png",bbox_inches='tight',pad_inches=0)
    # plt.show()
    plt.close()

print("Done!")

In [None]:
print("Start!")
# Define some helper methode to handle files from GDrive
def get_from_gdrive_unzip(gdrive_zip_file_path): 
  # copy a ZIP File from Gdrive and unzipp it

  dest_dir_path = '/content/datadir'
  # Clean up before re-importing the data
  if  os.path.exists(dest_dir_path):
    !rm -R -f {dest_dir_path}
  !mkdir {dest_dir_path}

  data_file_dest_name = '/content/' + os.path.basename(gdrive_zip_file_path)
  if os.path.exists(data_file_dest_name):
    !rm -f {data_file_dest_name}
  !cp {gdrive_zip_file_path} {data_file_dest_name}

  !unzip -qq {data_file_dest_name} -d {dest_dir_path}

  return dest_dir_path
#

def send_to_gdrive(source_path, gdrive_path, zipit=True):
  # Zip the generated output files and copy back to GDrive

  if zipit:
    # First ZIP the Output Images
    output_base = os.path.basename(source_path)
    temp_zip_path = '/content/' + output_base + '.zip'
    if  os.path.exists(temp_zip_path):
      !rm -f {temp_zip_path}
    !zip -q {temp_zip_path} {source_path} 
    !cp {temp_zip_path} {gdrive_path}
  else:
    # Copy the Models and Movie files as well
    !cp -r {source_path}  {gdrive_path}
  
  return gdrive_path


def gen_movie(image_dir, output_dir):
  # Create a video from the generated output files  
  date_time = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
  video_stream = ffmpeg.input(image_dir +'*.png', pattern_type='glob', framerate=25)
  video_stream = ffmpeg.output(video_stream, output_dir + 'output_' + date_time + '.mp4')
  ffmpeg.run(video_stream)

print("Done!")

In [None]:
print("Start!")

def save_img_batch(img_batch,img_save_dir):
    plt.figure(figsize=(4,4))
    gs1 = gridspec.GridSpec(4, 4)
    gs1.update(wspace=0, hspace=0)
    rand_indices = np.random.choice(img_batch.shape[0],16,replace=False)
    for i in range(16):
        #plt.subplot(4, 4, i+1)
        ax1 = plt.subplot(gs1[i])
        ax1.set_aspect('equal')
        rand_index = rand_indices[i]
        image = img_batch[rand_index, :,:,:]
        fig = plt.imshow(denorm_img(image))
        plt.axis('off')
        fig.axes.get_xaxis().set_visible(False)
        fig.axes.get_yaxis().set_visible(False)
    plt.tight_layout()
    plt.savefig(img_save_dir,bbox_inches='tight',pad_inches=0)
    #plt.show()
    plt.close()   

print("Done!")

In [None]:
print("Start Setting up some basic parameters")

os.environ["KERAS_BACKEND"] = "tensorflow"
K.set_image_data_format('channels_first')

noise_shape = (1,1,100)
num_steps = 10000
batch_size =  64
image_shape = None
save_model = True
image_shape = (64,64,3)

print("Fetch the training data from GDrive")

get_from_gdrive_unzip('/content/gdrive/MyDrive/Manga-GAN-master.zip') 

data_dir =  "/content/datadir/Manga-GAN-master/data/*.png"
img_save_dir = "/content/datadir/Manga-GAN/output_images/"
log_dir = "/content/datadir/Manga-GAN/logs/"
save_model_dir = "/content/datadir/Manga-GAN/models/"
movie_dir = "/content/datadir/Manga-GAN/movies/"

gdrive_results = '/content/gdrive/MyDrive/MangaGAN_Results'

# Make sure the directories exists:
!mkdir -p {img_save_dir}
!mkdir -p {log_dir}
!mkdir -p {save_model_dir}
!mkdir -p {movie_dir}

# Check for a GPU. If found set the device for Tensorflow to use it
%tensorflow_version 2.x
print(tf.__version__)
print(tf.config.list_physical_devices())
print(" ")

device_name = tf.test.gpu_device_name()
if device_name != '/device:GPU:0':
  device_name = '/cpu:0'

print('Tensorflow will use the ' + device_name + ' device for the heavy work')

print("Done!")

In [None]:
print("Start the real work...")

np.random.seed(7919) # Was 1337

with tf.device(device_name):
  discriminator = get_disc_normal(image_shape)
  generator = get_gen_normal(noise_shape)

print("Done!")

In [None]:
print("Start...")
with tf.device(device_name):
  discriminator.trainable = False

  opt = Adam(learning_rate=0.00015, beta_1=0.5) 
  gen_inp = Input(shape=noise_shape)
  GAN_inp = generator(gen_inp)
  GAN_opt = discriminator(GAN_inp)
  gan = Model(inputs = gen_inp, outputs = GAN_opt)
  gan.compile(loss = 'binary_crossentropy', optimizer = opt, metrics=['accuracy'])
  gan.summary()

print("Done!")

In [None]:
print("Start...")

with tf.device(device_name):
  avg_disc_fake_loss = deque([0], maxlen=250)     
  avg_disc_real_loss = deque([0], maxlen=250)
  avg_GAN_loss = deque([0], maxlen=250)

print("Done!")

In [None]:
print("Start....")

begin_time = time.time()
step_begin_time = time.time()
with tf.device(device_name):
  with open(log_dir+"training_log.txt", "w") as text_file:
    for step in range(num_steps): 
        
        real_data_X = sample_from_dataset(batch_size, image_shape, data_dir = data_dir)

        noise = gen_noise(batch_size,noise_shape)
        
        fake_data_X = generator.predict(noise)
        
        if (step % 10) == 0:
            step_num = str(step).zfill(5)
            save_img_batch(fake_data_X,img_save_dir+step_num+"_image.png")

        #concatenate real and fake data samples    
        data_X = np.concatenate([real_data_X,fake_data_X])
        #add noise to the label inputs
        real_data_Y = np.ones(batch_size) - np.random.random_sample(batch_size)*0.2
        
        
        fake_data_Y = np.random.random_sample(batch_size)*0.2
        
        data_Y = np.concatenate((real_data_Y,fake_data_Y))
            
        discriminator.trainable = True
        generator.trainable = False
        #training the discriminator on real and fake data can be done together, i.e., 
        #on the data_x and data_y, OR it can be done 
        #one by one as performed below. This is the safer choice and gives better results 
        #as compared to combining the real and generated samples.
        dis_metrics_real = discriminator.train_on_batch(real_data_X,real_data_Y)  
        dis_metrics_fake = discriminator.train_on_batch(fake_data_X,fake_data_Y)   
        
        avg_disc_fake_loss.append(dis_metrics_fake[0])
        avg_disc_real_loss.append(dis_metrics_real[0])
        
        generator.trainable = True

        GAN_X = gen_noise(batch_size,noise_shape)

        GAN_Y = real_data_Y
        
        discriminator.trainable = False
        
        gan_metrics = gan.train_on_batch(GAN_X,GAN_Y)
        
        text_file.write("Step: %d Disc: real loss: %f fake loss: %f GAN loss: %f\n" % (step, dis_metrics_real[0], dis_metrics_fake[0],gan_metrics[0]))

        if (step > 0) and (step % 10) == 0:
          end_time = time.time()
          diff_time = int(end_time - begin_time) / 60
          step_diff_time = int(end_time - step_begin_time) 
          print("Steps %d completed. Elapsed time: %s seconds. Total elapsed time: %s minutes." % (step, step_diff_time, diff_time))
          step_begin_time = time.time()

        if (step > 0) and (step % 1000) == 0:
            print("-----------------------------------------------------------------")
            print("Average Disc_fake loss: %f" % (np.mean(avg_disc_fake_loss)))    
            print("Average Disc_real loss: %f" % (np.mean(avg_disc_real_loss)))    
            print("Average GAN loss: %f" % (np.mean(avg_GAN_loss)))
            print("-----------------------------------------------------------------")
            discriminator.trainable = True
            generator.trainable = True
            step_num = str(step).zfill(5)
            generator.save(save_model_dir+step_num+"_GENERATOR_weights_and_arch.hdf5")
            discriminator.save(save_model_dir+step_num+"_DISCRIMINATOR_weights_and_arch.hdf5")
            gen_movie(img_save_dir,movie_dir)
            gpu_info = !nvidia-smi
            gpu_info = '\n'.join(gpu_info)
            print(gpu_info)
        # End For
      # End With (Open file)
    # End With (tf.device)
# Save the last image
step_num = str(step).zfill(5)
save_img_batch(fake_data_X,img_save_dir+step_num+"_image.png")
          
print("Done!")

In [None]:
print("Start....")

print("Save the Models for re-use")
with tf.device(device_name):
  discriminator.trainable = True
  generator.trainable = True
  generator.save(save_model_dir+"GENERATOR_weights_and_arch.hdf5")
  discriminator.save(save_model_dir+"DISCRIMINATOR_weights_and_arch.hdf5")
gen_movie(img_save_dir,movie_dir)


send_to_gdrive(img_save_dir, gdrive_results, zipit=True)
send_to_gdrive(save_model_dir, gdrive_results, zipit=False)
send_to_gdrive(log_dir, gdrive_results, zipit=False)
send_to_gdrive(movie_dir, gdrive_results, zipit=False)


print("Done!")