In [None]:
!pip install kaggle


In [None]:
from google.colab import drive
drive.mount('content')

In [None]:
!mkdir ~/.kaggle

In [None]:
!cp /content/content/MyDrive/kaggle.json ~/.kaggle/kaggle.json

In [None]:
!chmod 600 ~/.kaggle/kaggle.json

In [None]:
# 2 giga dataset
!kaggle datasets download -d vikramtiwari/pix2pix-dataset

In [None]:
!unzip /content/pix2pix-dataset.zip

In [None]:
import tensorflow as tf
import os
import numpy as np
import matplotlib.pyplot as plt
import time

from keras.models import Model, Sequential
from keras.layers import Dense, Conv2D, Flatten, BatchNormalization, LeakyReLU
from keras.layers import Conv2DTranspose, Dropout, ReLU, Input, Concatenate, ZeroPadding2D
from keras.optimizers import Adam
from keras.utils import plot_model

In [None]:
BATCH_SIZE = 1
IMAGE_SIZE = 256

In [None]:
def load(image_file):                                      # like /content/train/*jpg
    image = tf.io.read_file(image_file)                    #save this data but encoded
    image = tf.image.decode_jpeg(image, channels=3)        #decode this saved img
    w = tf.shape(image)[1]
    w = w//2
    real_image = image[:, :w, :]
    input_image = image[:, w:, :]

    input_image = tf.cast(input_image, tf.float32)         #turn it to float from int using cast
    real_image = tf.cast(real_image, tf.float32)
    return input_image, real_image




In [None]:
#test for load data

path = "cityscapes/cityscapes/"
x,y = load(os.path.join(path, "train/1.jpg")) #join mean add train/1.jpg after path cityscapes/cityscapes/
print(x.shape, y.shape)


In [None]:
# visualize
fig, axes = plt.subplots(1,2, figsize = (16,5))
axes[0].imshow(x/255.0)                            #axes[0] mean first img
axes[1].imshow(y/255.0)

In [None]:
#normilize to -1 to 1 as we use tansh and lekyrelu which take range -1:1

def normalize(input_image, real_image):
    input_image = (input_image / 127.5) - 1
    real_image = (real_image / 127.5) - 1
    return input_image, real_image




#resize img to IMAGE_SIZE = 256
def resize(input_image, real_image):
    input_image = tf.image.resize(input_image, [IMAGE_SIZE, IMAGE_SIZE], method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)
    real_image = tf.image.resize(real_image, [IMAGE_SIZE, IMAGE_SIZE], method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)
    return input_image, real_image




# replace some img with same img but fliping horrizontal
def random_jitter(input_image, real_image):
    #if tf.random.uniform(()) > 0.5:
    input_image = tf.image.flip_left_right(input_image)
    real_image = tf.image.flip_left_right(real_image)
    return input_image, real_image


In [None]:
x_jit, y_jit = random_jitter(x, y)
fig, axes = plt.subplots(1,2, figsize = (16,5))
axes[0].imshow(x_jit/255.0)
axes[1].imshow(y_jit/255.0)

In [None]:
# take images path "cityscapes/cityscapes/train/*jpg"   * mean take any img in train end by jpg
#after take path load data and resize and flip some of them then normalize

def load_train_images(image_path):
    input_image, real_image = load(image_path)
    input_image, real_image = resize(input_image, real_image)
    input_image, real_image = random_jitter(input_image, real_image)
    input_image, real_image = normalize(input_image, real_image)
    return input_image,real_image



def load_test_image(image_path):
    input_image, real_image = load(image_path)
    input_image, real_image = resize(input_image, real_image)
    input_image, real_image = normalize(input_image, real_image)
    return input_image, real_image

In [None]:
# create input pipeline
train_dataset = tf.data.Dataset.list_files(path + "train/*.jpg")  #load img in this folder
train_dataset = train_dataset.map(load_train_images)       #using map apply load_train_images fun on prev line

#take only 10 img from dataset and choose 1(BATCH_SIZE) image randomly then take another one randomly till 10 img
#end then take another 10 img and repat process this is good for memory
# tajecare that load_train_images should return two var but we save this 2 var in one called train_dataset
train_dataset = train_dataset.shuffle(10).batch(BATCH_SIZE)
train_dataset

In [None]:
test_dataset = tf.data.Dataset.list_files(path + "val/*.jpg")
test_dataset = test_dataset.map(load_test_image)
test_dataset = test_dataset.batch(BATCH_SIZE)
test_dataset

In [None]:
# downsample block
def downsample(filters, size, batchnorm = True):
    init = tf.random_normal_initializer(0.,0.02)   #intilize weight of neoural network with mean=0 and std = 0.02
    result = Sequential()
    result.add(Conv2D(filters, size, strides = 2, padding = "same", kernel_initializer = init, use_bias = False))
    if batchnorm == True:
        result.add(BatchNormalization())

    result.add(LeakyReLU())
    return result
down_model = downsample(3,4)
down_result = down_model(tf.expand_dims(x, axis = 0)) # axis = 0 mean put number of img you do down sample => (1,256,256,3)
print(down_result.shape)

In [None]:
# upsample block
def upsample(filters, size, dropout = False):
    init = tf.random_normal_initializer(0, 0.02)
    result = Sequential()
    result.add(Conv2DTranspose(filters, size, strides = 2, padding = "same", kernel_initializer = init, use_bias = False))
    result.add(BatchNormalization())
    if dropout == True:
        result.add(Dropout(0.5))
    result.add(ReLU())
    return result
up_model = upsample(3,4)
up_result = up_model(down_result)
print(up_result.shape)

In [None]:
def generator():
    inputs = Input(shape = [IMAGE_SIZE, IMAGE_SIZE, 3])
    down_stack = [
        downsample(64, 4, batchnorm=False),
        downsample(128, 4),
        downsample(256, 4),
        downsample(512, 4),
        downsample(512, 4),
        downsample(512, 4),
        downsample(512, 4),
        downsample(512, 4)
    ]


    up_stack = [
        upsample(512, 4, dropout=True),
        upsample(512, 4, dropout=True),
        upsample(512, 4),
        upsample(512, 4),
        upsample(256, 4),
        upsample(128, 4),
        upsample(64, 4),
    ]
    init = tf.random_normal_initializer(0., 0.02)
    last = Conv2DTranspose(3, 4, strides = 2, padding = "same", kernel_initializer = init, activation ="tanh")
    x = inputs
    skips = []
    for down in down_stack:   #make skip connection
        x = down(x)
        skips.append(x)
    skips = reversed(skips[:-1])   #skip list now have[d8,d7,d6,...,d1]

    for up, skip in zip(up_stack, skips):
        x = up(x)
        x = Concatenate()([x, skip])  #concatenate list have [u1&d8 , u2&d7 , ... , u8&d1]

    x = last(x)
    return Model(inputs = inputs, outputs = x)

In [None]:

gen = generator()
gen.summary()

In [None]:
plot_model(gen, show_shapes=True, dpi = 64)


In [None]:
from keras.losses import BinaryCrossentropy
loss_function = BinaryCrossentropy(from_logits=True)


In [None]:
# take o/p of discremnator on fake img and o/p of generator and real img
def generator_loss(disc_generated_output, gen_output, target):
    #we fill matrix with one as we try to cheat discremnator to make it predict fake img(0) as real img(1)

    gan_loss = loss_function(tf.ones_like(disc_generated_output), disc_generated_output)  # gan_loss is univeral loss
    l1_loss = tf.reduce_mean(tf.abs(target - gen_output))
    total_gen_loss = gan_loss + (LAMBDA * l1_loss) # total_gen_loss is custom loss for generator
    return total_gen_loss, gan_loss, l1_loss

In [None]:
def discriminator():
    init = tf.random_normal_initializer(0., 0.02)

    inp = Input(shape = [IMAGE_SIZE, IMAGE_SIZE, 3], name = "input_image")
    tar = Input(shape = [IMAGE_SIZE, IMAGE_SIZE, 3], name = "target_image")
    x = Concatenate()([inp, tar])
    down1 = downsample(64,4,False)(x)
    down2 = downsample(128, 4)(down1)
    down3 = downsample(256, 4)(down2)

    zero_pad1 = ZeroPadding2D()(down3)
    conv = Conv2D(256, 4, strides = 1, kernel_initializer = init, use_bias = False)(zero_pad1)
    leaky_relu = LeakyReLU()(conv)
    zero_pad2 = ZeroPadding2D()(leaky_relu)
    last = Conv2D(1, 4, strides = 1, kernel_initializer=init)(zero_pad2)
    return Model(inputs = [inp, tar], outputs = last)

In [None]:
disc = discriminator()
disc.summary()

In [None]:
plot_model(disc, show_shapes=True, dpi = 64)


In [None]:
def discriminator_loss(disc_real_output, disc_generated_output):
    real_loss = loss_function(tf.ones_like(disc_real_output), disc_real_output)
    generated_loss = loss_function(tf.zeros_like(disc_generated_output), disc_generated_output)
    total_disc_loss = real_loss + generated_loss
    return total_disc_loss

In [None]:
generator_optimizer = Adam(lr= 2e-4, beta_1=0.5)
discriminator_optimizer = Adam(lr = 2e-4, beta_1=0.5)

In [None]:
# after each epoch take one validation img and let generator predict it to see acc of generator

def save_images(model, test_input, target, epoch):
    prediction = model(test_input, training= True)
    plt.figure(figsize = (15,15))
    display_list= [test_input[0], target[0], prediction[0]]
    title = ["Input Image", "Ground Truth", "Predicton Image"]
    for i in range(3):
        plt.subplot(1, 3, i+1)
        plt.title(title[i])
        plt.imshow(display_list[i] * 0.5 + 0.5)
        plt.axis("off")
    plt.savefig(f"output/epoch_{epoch}.jpg")
    plt.close()


In [None]:
# make sure output directory exists to save images
if not os.path.exists("output"):
    os.mkdir("output")

In [None]:
epochs = 50
@tf.function
def train_step(input_image, target, epoch):
    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        gen_output = gen(input_image, training = True)

        disc_real_output = disc([input_image, target], training = True)
        disc_generated_output = disc([input_image, gen_output], training = True)
        gen_total_loss, gen_gan_loss, gen_l1_loss = generator_loss(disc_generated_output, gen_output, target)
        disc_loss = discriminator_loss(disc_real_output, disc_generated_output)
        generator_gradients = gen_tape.gradient(gen_total_loss, gen.trainable_variables)
        discriminator_gradients = disc_tape.gradient(disc_loss, disc.trainable_variables)
        generator_optimizer.apply_gradients(zip(generator_gradients, gen.trainable_variables))
        discriminator_optimizer.apply_gradients(zip(discriminator_gradients, disc.trainable_variables))
        return gen_total_loss, disc_loss

In [None]:
def fit(train_ds, epochs, test_ds):
    for epoch in range(epochs):
        start = time.time()
        for input_, target in test_ds.take(1):
            save_images(gen, input_, target, epoch)
        # Train
        print(f"Epoch {epoch}")
        for n, (input_, target) in train_ds.enumerate():
            gen_loss, disc_loss = train_step(input_, target, epoch)
        print("Generator loss {:.2f} Discriminator loss {:.2f}".format(gen_loss, disc_loss))
        print("Time take for epoch {} is {} sec\n".format(epoch+1, time.time() - start))

In [None]:
import keras
keras.backend.clear_session()

In [None]:
fit(train_dataset, epochs, test_dataset)
