<a href="https://colab.research.google.com/github/shubham0204/Bayes_Text_Classifier_with_Kotlin/blob/master/Pix2Pix_ImageColorization.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:

!unzip -q img_color_data.zip


In [None]:

import tensorflow as tf
import datetime


In [None]:

input_size = 256

# Normalize the given images ( mean = 127.5 and std_dev = 127.6 )
def normalize_images( input_image , output_image ):
    normalized_input_image = ( input_image - 127.5 ) / 127.5
    normalized_output_image = ( output_image - 127.5 ) / 127.5
    return normalized_input_image , normalized_output_image

# Resize the given images to 256 * 256
def resize_images( input_image , output_image ):
    resized_input_image = tf.image.resize( input_image , size=[ input_size , input_size ] )
    resized_output_image = tf.image.resize( output_image , size=[ input_size , input_size ] )
    return resized_input_image , resized_output_image

# Load the image from the given filepath
def load_image( image_filepath ):
    input_image = tf.io.read_file( image_filepath )
    input_image = tf.image.decode_jpeg( input_image , channels=3 )
    output_image = tf.image.rgb_to_grayscale( input_image )

    # resize and normalize
    input_image , output_image = resize_images( input_image , output_image )
    input_image , output_image = normalize_images( input_image , output_image )

    return input_image , output_image


In [None]:

# Directory where the images are stored
master_dir = 'data'
# Num examples to be included in the test dataset
test_dataset_size = 50
# Batch size for training the model
batch_size = 32

# Load the images from their filepaths
dataset = tf.data.Dataset.list_files( master_dir + '/*.jpg' )
dataset = dataset.map( load_image )

# Splitting the dataset into train and test datasets ( See https://stackoverflow.com/questions/48213766/split-a-dataset-created-by-tensorflow-dataset-api-in-to-train-and-test )
test_dataset = dataset.take( test_dataset_size )
test_dataset = test_dataset.shuffle( 1000 ).batch( batch_size )

train_dataset = dataset.skip( test_dataset_size )
train_dataset = train_dataset.shuffle( 1000 ).batch( batch_size )


In [None]:

# Downsampling block. Implements Conv -> Batch Norm ( if required ) -> LeakyReLU
def block_down( x , filters , size , apply_batchnorm=True ):
    x = tf.keras.layers.Conv2D( 
        filters , 
        size ,
        strides=2 , 
        kernel_initializer=tf.random_normal_initializer( 0.0 , 0.02 ) , 
        padding='same',
        use_bias=False) ( x )
    if apply_batchnorm:
        x = tf.keras.layers.BatchNormalization()( x )
    x = tf.keras.layers.LeakyReLU()( x )
    return x

# Upsampling block. Implements Transposed Conv -> Batch Norm -> Dropout ( if required ) -> LeakyReLU
def block_up( x , filters , size , apply_dropout=True ):
    x = tf.keras.layers.Conv2DTranspose( 
        filters,
        size ,
        strides=2 , 
        kernel_initializer=tf.random_normal_initializer( 0.0 , 0.02 ) , 
        padding='same',
        use_bias=False) ( x )
    x = tf.keras.layers.BatchNormalization()( x )
    if apply_dropout:
        x = tf.keras.layers.Dropout( rate=0.5 )( x )
    x = tf.keras.layers.ReLU()( x )
    return x

def get_generator():
    inputs = tf.keras.layers.Input( shape=( 256 , 256 , 3 ) )

    d1 = block_down( inputs , 64 , 4 , apply_batchnorm=False ) 
    d2 = block_down( d1 , 128 , 4 ) 
    d3 = block_down( d2 , 256 , 4 ) 
    d4 = block_down( d3 , 512 , 4 ) 
    d5 = block_down( d4 , 512 , 4 ) 
    d6 = block_down( d5 , 512 , 4 )

    u1 = block_up( d6 , 512 , 4 )

    concat1 = tf.keras.layers.Concatenate()( [ u1 , d5 ] )
    u2 = block_up( concat1 , 512 , 4 )

    concat2 = tf.keras.layers.Concatenate()( [ u2 , d4 ] )
    u3 = block_up( concat2 , 512 , 4 )

    concat3 = tf.keras.layers.Concatenate()( [ u3 , d3 ] )
    u4 = block_up( concat3 , 256 , 4 )

    concat4 = tf.keras.layers.Concatenate()( [ u4 , d2 ] )
    u5 = block_up( concat4 , 128 , 4 )

    outputs = tf.keras.layers.Conv2DTranspose( 3 , 4,
                                         strides=2,
                                         padding='same',
                                         kernel_initializer=tf.random_normal_initializer(0., 0.02),
                                         activation='tanh')( u5 )

    return tf.keras.Model( inputs , outputs )


In [None]:

model = get_generator()
tf.keras.utils.plot_model(model, show_shapes=True, dpi=64)


In [None]:

def get_discriminator():
  initializer = tf.random_normal_initializer(0., 0.02)

  inp = tf.keras.layers.Input(shape=[256, 256, 3], name='generated_image')
  tar = tf.keras.layers.Input(shape=[256, 256, 3], name='target_image')

  x = tf.keras.layers.concatenate([inp, tar])

  down1 = block_down(64, 4, False)(x)
  down2 = block_down(128, 4)(down1)
  down3 = block_down(256, 4)(down2)

  zero_pad1 = tf.keras.layers.ZeroPadding2D()(down3)
  conv = tf.keras.layers.Conv2D(512, 4, strides=1,
                                kernel_initializer=initializer,
                                use_bias=False)(zero_pad1)

  batchnorm1 = tf.keras.layers.BatchNormalization()(conv)
  leaky_relu = tf.keras.layers.LeakyReLU()(batchnorm1)
  zero_pad2 = tf.keras.layers.ZeroPadding2D()(leaky_relu)
  last = tf.keras.layers.Conv2D(1, 4, strides=1, kernel_initializer=initializer)(zero_pad2)

  return tf.keras.Model(inputs=[inp, tar], outputs=last)
  

In [None]:

binary_crossentropy = tf.keras.losses.BinaryCrossentropy()

def generator_loss( discriminator_output , generated_image , target_image ):
    # Minimize -log( D( G( x ) ))
    crossentropy_loss = binary_crossentropy( tf.ones_like( discriminator_output ) , discriminator_output )
    # L1 Loss
    l1_loss = tf.reduce_mean( tf.abs( generated_image - target_images ) ) 
    total loss = crossentropy_loss + ( 100 * l1_loss )
    return total_loss , crossentropy_loss , total_loss

def discriminator_loss( disc_real_output , disc_generated_output ):
    real_image_loss = loss_object(tf.ones_like(disc_real_output), disc_real_output)
    generated_image_loss = loss_object(tf.zeros_like(disc_generated_output), disc_generated_output)
    total_loss = real_image_loss + generated_image_loss
    return total_loss


In [None]:

generator_optimizer = tf.keras.optimizers.Adam( 2e-4 , beta_1=0.5 )
discriminator_optimizer = tf.keras.optimizers.Adam( 2e-4 , beta_1=0.5 )

generator = get_generator()
discriminator = get_discriminator()

summary_writer = tf.summary.create_file_writer( "logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S") )

@tf.function
def train_step( input_image , target_image  num_epoch ):

    with tf.GradientTape() as gen_tape , tf.GradientTape() as disc_tape:

        generator_output = generator( input_image , training=True )

        disc_real_output = discriminator( [ input_image , target_image ] , training=True )
        disc_generated_output = discriminator( [ input_image , generator_output ] , training=True )

        total_loss , crossentropy_loss , l1_loss = generator_loss( disc_generated_output , generated_output , target_image )
        disc_loss = discriminator_loss( disc_real_output , disc_generated_output )

    generator_optimizer.minimize( total_loss , variables=generator.trainable_variables )
    discriminator_optimizer.minimize( disc_loss , variables=discriminator.trainable_variables )

    with summary_writer.as_default():
        tf.summary.scalar( 'gen_total_loss', total_loss, step=epoch )
        tf.summary.scalar( 'gen_gan_loss', crossentropy_loss, step=epoch )
        tf.summary.scalar( 'gen_l1_loss', l1_loss, step=epoch )
        tf.summary.scalar( 'disc_loss', disc_loss, step=epoch )


In [None]:

for epoch in 100:
    for n , ( input_images , target_images ) in train_dataset.enumerate():
        train_step( input_images , target_images , n )
