In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the Kaggle input directory tree and print the full path of every file in it.
for current_dir, _, file_names in os.walk('/kaggle/input'):
    for file_name in file_names:
        full_path = os.path.join(current_dir, file_name)
        print(full_path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

# Import Libraries

In [None]:
import os
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"

import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
%matplotlib inline
from tqdm import tqdm
from PIL import Image
from IPython import display

import tensorflow as tf
from tensorflow.keras import models, layers, initializers, losses, optimizers
from tensorflow.keras.utils import plot_model
from tensorflow.keras.preprocessing.image import img_to_array, load_img

# Load Data

In [None]:
# Directory that contains the dataset split folders. The path is hard-coded to
# the Kaggle input mount, so it has to be adjusted when running elsewhere.
root_dir = "/kaggle/input/pix2pix-dataset/maps/maps"

In [None]:
# List the entries directly under root_dir; the bare name on the last line
# makes the notebook display the result.
folders = os.listdir(root_dir)
folders

In [None]:
# Fix the split order explicitly: folders[0] is used for the training paths and
# folders[1] for the validation paths (os.listdir gives no ordering guarantee).
folders = ["train", "val"]

In [None]:
# Collect the full path of every file in the training split.
# os.listdir returns entries in arbitrary order, so the names are sorted to keep
# the sample order (and everything indexed by it) reproducible between runs.
train_dir = os.path.join(root_dir, folders[0])
train_image_paths = [
    os.path.join(train_dir, file_name)
    for file_name in tqdm(sorted(os.listdir(train_dir)))
]

In [None]:
# Collect the full path of every file in the validation split.
# os.listdir returns entries in arbitrary order, so the names are sorted to keep
# the sample order reproducible between runs.
valid_dir = os.path.join(root_dir, folders[1])
valid_image_paths = [
    os.path.join(valid_dir, file_name)
    for file_name in tqdm(sorted(os.listdir(valid_dir)))
]

In [None]:
# Report how many files were found for each split.
for split_label, split_paths in (
    ("Train Images:", train_image_paths),
    ("Valid Images:", valid_image_paths),
):
    print(split_label, len(split_paths))

In [None]:
def load_image(path):
    """Load one side-by-side picture and split it into its two halves.

    The file at ``path`` is read and cut down the middle along axis 1: the
    first half is returned as ``image`` and the second half as ``mask``.
    Each half is resized to 256x256 and divided by 255.

    Args:
        path: Location of the picture file to load.

    Returns:
        An ``(image, mask)`` tuple of float32 tensors.
    """
    pixels = tf.cast(img_to_array(load_img(path)), tf.float32)

    # Index along axis 1 at which the picture is cut in two.
    half_width = tf.shape(pixels)[1] // 2
    first_half = pixels[:, :half_width, :]
    second_half = pixels[:, half_width:, :]

    def _resize_and_scale(part):
        # Same resize + /255 scaling for both halves.
        return (tf.image.resize(part, (256, 256))) / 255

    return _resize_and_scale(first_half), _resize_and_scale(second_half)

In [None]:
train_images = []
train_masks = []

# Iterate over the list itself rather than enumerate(...): tqdm can then read
# the length and show a real progress bar with a total and an ETA. The loop
# index was never used.
for path in tqdm(train_image_paths, desc="Loading train images"):
    image, mask = load_image(path)

    train_images.append(image)
    train_masks.append(mask)

In [None]:
# Turn the per-sample lists into single NumPy arrays (one array per list).
train_images, train_masks = np.array(train_images), np.array(train_masks)

In [None]:
valid_images = []
valid_masks = []

# Iterate over the list itself rather than enumerate(...): tqdm can then read
# the length and show a real progress bar with a total and an ETA. The loop
# index was never used.
for path in tqdm(valid_image_paths, desc="Loading valid images"):
    image, mask = load_image(path)

    valid_images.append(image)
    valid_masks.append(mask)

In [None]:
# Turn the per-sample lists into single NumPy arrays (one array per list).
valid_images, valid_masks = np.array(valid_images), np.array(valid_masks)

# Visualization

In [None]:
def plot_image_and_mask(images, masks, paths=None, num_samples=8):
    """Show randomly chosen images (top row) above their masks (bottom row).

    Args:
        images: Indexable collection of images accepted by ``plt.imshow``.
        masks: Collection aligned with ``images`` holding the matching masks.
        paths: Optional sequence of file paths aligned with ``images``; the
            file name of each drawn sample is used as its title. When omitted,
            the notebook-level ``train_image_paths`` is used, which keeps the
            previous behaviour for the training set. Pass the matching path
            list when plotting any other split, otherwise the titles are wrong.
        num_samples: Number of random image/mask pairs to draw.
    """
    if paths is None:
        paths = train_image_paths

    plt.figure(figsize=(10, 6))

    # Two rows only (images, masks); the grid width follows num_samples.
    for col in range(num_samples):
        idx = np.random.randint(len(images))

        plt.subplot(2, num_samples, col + 1)
        plt.imshow(images[idx], cmap="gray")
        plt.axis("off")
        plt.title(os.path.basename(paths[idx]))

        plt.subplot(2, num_samples, col + 1 + num_samples)
        plt.imshow(masks[idx], cmap="gray")
        plt.axis("off")
        plt.title("Mask")

    plt.show()

In [None]:
# Preview random training image/mask pairs as a visual check of the loading step.
plot_image_and_mask(train_images, train_masks)

# Model

### Generator

In [None]:
def encoder_block(input_layer, num_filters):
    """Downsampling block: strided convolution, LeakyReLU, then batch norm.

    Args:
        input_layer: Keras tensor fed into the block.
        num_filters: Number of output channels of the convolution.

    Returns:
        The block output. The 3x3 convolution uses stride 2 with "same"
        padding, so height and width are halved (rounded up).
    """
    features = layers.Conv2D(
        num_filters,
        kernel_size=(3, 3),
        strides=(2, 2),
        padding="same",
        kernel_initializer="he_normal",
        use_bias=False,  # no bias term on the convolution
    )(input_layer)
    features = layers.LeakyReLU(0.2)(features)
    return layers.BatchNormalization()(features)

In [None]:
def decoder_block(input_layer, skip_features, num_filters):
    """Upsampling block whose output is concatenated with a skip connection.

    Applies a stride-2 4x4 transposed convolution, LeakyReLU(0.2), batch
    normalization and ReLU, then concatenates the result with
    ``skip_features``.

    NOTE(review): two activations are applied (LeakyReLU before the batch
    norm, ReLU after it) - confirm that the double activation is intended.

    Args:
        input_layer: Keras tensor to upsample.
        skip_features: Encoder output to concatenate with the upsampled tensor.
        num_filters: Number of output channels of the transposed convolution.

    Returns:
        The concatenation of the upsampled features and ``skip_features``.
    """
    upsampled = layers.Conv2DTranspose(
        num_filters,
        kernel_size=(4, 4),
        strides=(2, 2),
        padding="same",
        kernel_initializer="he_normal",
        use_bias=False,
    )(input_layer)
    upsampled = layers.LeakyReLU(0.2)(upsampled)
    upsampled = layers.BatchNormalization()(upsampled)
    upsampled = layers.ReLU()(upsampled)
    return layers.concatenate([upsampled, skip_features])

In [None]:
# Encoder-decoder generator with skip connections: seven encoder blocks plus a
# bottleneck block, seven decoder blocks, and a final transposed convolution.
g_input_layer = layers.Input(shape=(256, 256, 3))

# Encoder path; every output is kept so the decoder can concatenate it later.
g_encoder_filters = (64, 128, 256, 512, 512, 512, 512)
g_skips = []
g_features = g_input_layer
for g_filters in g_encoder_filters:
    g_features = encoder_block(g_features, g_filters)
    g_skips.append(g_features)

g_bottle = encoder_block(g_features, 512)

# Decoder path; skip connections are consumed from the deepest to the shallowest.
g_decoder_filters = (512, 512, 512, 512, 256, 128, 64)
g_features = g_bottle
for g_filters, g_skip in zip(g_decoder_filters, reversed(g_skips)):
    g_features = decoder_block(g_features, g_skip, g_filters)

g_initializer = initializers.RandomNormal(stddev=0.02, seed=42)

# Final upsampling to three channels. tanh bounds the output to [-1, 1].
# NOTE(review): load_image scales the targets to [0, 1] - confirm the output
# activation matches the intended target range.
g_output_layer = layers.Conv2DTranspose(
    3,
    kernel_size=(4, 4),
    strides=(2, 2),
    padding="same",
    kernel_initializer=g_initializer,
    activation="tanh",
)(g_features)

generator = models.Model(inputs=g_input_layer, outputs=g_output_layer)

In [None]:
# Give the diagram its own file name: plot_model writes to "model.png" by
# default, so two calls without to_file would overwrite each other's output.
plot_model(generator, to_file="generator.png")

### Discriminator

In [None]:
# Discriminator: takes two 256x256x3 images, concatenates them along the
# channel axis and reduces them to a single-channel map of un-activated scores.
d_input_layer1 = layers.Input(shape=(256, 256, 3))
d_input_layer2 = layers.Input(shape=(256, 256, 3))

d_cnt = layers.concatenate([d_input_layer1, d_input_layer2])

# Three downsampling blocks shared with the generator's encoder definition.
d_features = d_cnt
for d_filters in (64, 128, 512):
    d_features = encoder_block(d_features, d_filters)

d_initializer = initializers.RandomNormal(stddev=0.02, seed=42)

d_features = layers.Conv2D(
    512, kernel_size=(4, 4), kernel_initializer=d_initializer, use_bias=False
)(d_features)
d_features = layers.BatchNormalization()(d_features)
# NOTE(review): LeakyReLU() uses the library default slope here, whereas the
# encoder/decoder blocks pass 0.2 explicitly - confirm this is intended.
d_features = layers.LeakyReLU()(d_features)

# No activation on the last layer: the loss is configured with from_logits=True.
d_output_layer = layers.Conv2D(1, kernel_size=4, kernel_initializer=d_initializer)(d_features)

discriminator = models.Model(inputs=[d_input_layer1, d_input_layer2], outputs=d_output_layer)

In [None]:
# Give the diagram its own file name: plot_model writes to "model.png" by
# default, so two calls without to_file would overwrite each other's output.
plot_model(discriminator, to_file="discriminator.png")

# Train

In [None]:
# Training configuration: hyperparameters, optimizers and the adversarial loss.
EPOCHS = 100  # number of passes over the training dataset
# NOTE(review): NUM_TESTS and NOISE_DIM are not referenced elsewhere in the
# visible notebook - confirm whether they are still needed.
NUM_TESTS = 16
NOISE_DIM = 100
BUFFER_SIZE = len(train_images)  # number of training samples; presumably meant as a shuffle-buffer size - TODO confirm
BATCH_SIZE = 32  # samples per training/validation batch
LR_G = 0.0004  # generator learning rate
LR_D = 0.0004  # discriminator learning rate
BETA_G = 0.5  # Adam beta_1 for the generator
BETA_D = 0.5  # Adam beta_1 for the discriminator
LAMBDA = 100  # weight of the L1 term in generator_loss

generator_optimizer = optimizers.Adam(learning_rate=LR_G, beta_1=BETA_G)
discriminator_optimizer = optimizers.Adam(learning_rate=LR_D, beta_1=BETA_D)
# from_logits=True: the loss receives raw scores and applies the sigmoid itself.
cross_entropy = losses.BinaryCrossentropy(from_logits=True)

In [None]:
# Pair every input image with its target and serve them in fixed-size batches.
# The samples are shuffled with a buffer covering the whole training set
# (BUFFER_SIZE), so every epoch sees the batches in a different composition
# instead of the same fixed order. drop_remainder keeps the batch shape constant.
dataset = (tf.cast(train_images, tf.float32), tf.cast(train_masks, tf.float32))
data = (
    tf.data.Dataset.from_tensor_slices(dataset)
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE, drop_remainder=True)
)

In [None]:
def generator_loss(real_output, fake_output, image):
    """Adversarial loss plus LAMBDA-weighted L1 loss for the generator.

    Args:
        real_output: Discriminator logits that are scored against an all-ones
            target. ``train_step`` passes the logits obtained for the
            generated images here.
        fake_output: Tensor compared with ``image`` in the L1 term;
            ``train_step`` passes the generated images.
        image: Reference tensor for the L1 term; ``train_step`` passes the
            target images.

    Returns:
        ``cross_entropy(ones, real_output) + LAMBDA * mean(|image - fake_output|)``.
    """
    adversarial_term = cross_entropy(tf.ones_like(real_output), real_output)
    reconstruction_term = tf.reduce_mean(tf.abs(image - fake_output))
    return adversarial_term + (LAMBDA * reconstruction_term)

In [None]:
def discriminator_loss(real_output, fake_output):
    """Sum of the cross-entropy losses on real and generated predictions.

    Args:
        real_output: Discriminator logits scored against an all-ones target.
        fake_output: Discriminator logits scored against an all-zeros target.

    Returns:
        The sum of both cross-entropy terms.
    """
    loss_on_real = cross_entropy(tf.ones_like(real_output), real_output)
    loss_on_fake = cross_entropy(tf.zeros_like(fake_output), fake_output)
    return loss_on_real + loss_on_fake

In [None]:
def show_generated_images(epoch, test_input, real_input, output):
    """Plot input, target and generator output side by side and save the figure.

    Only the first sample of the batch is shown. The figure is written to
    ``{output}/cgan_{epoch}.png``; the directory is created when missing.

    Args:
        epoch: Value used in the file name of the saved figure.
        test_input: Batch of generator inputs; pixel values are expected in
            [0, 1], as produced by ``load_image``.
        real_input: Batch of target images matching ``test_input``, same range.
        output: Directory in which the figure is saved.
    """
    generated_images = generator(test_input, training=False)

    # The data is already in [0, 1], so input and target are displayed as they
    # are (rescaling with * 0.5 + 0.5 would wash them out). The generator ends
    # in tanh and may produce values below 0, hence the clip before plotting.
    panels = (
        ("Input", test_input[0]),
        ("Target", real_input[0]),
        ("Generated", tf.clip_by_value(generated_images[0], 0.0, 1.0)),
    )

    fig, axes = plt.subplots(1, 3, figsize=(12, 8))
    for ax, (title, picture) in zip(axes, panels):
        ax.imshow(picture)
        ax.set_title(title)
        ax.axis("off")
    fig.tight_layout()

    os.makedirs(output, exist_ok=True)

    fig.savefig(f"{output}/cgan_{epoch}.png")
    plt.show()
    # This function is called once per epoch; release the figure explicitly.
    plt.close(fig)

In [None]:
@tf.function
def train_step(inputs, target):
    """Run one optimisation step for both the generator and the discriminator.

    Compiled with ``tf.function`` so the step runs as a graph instead of being
    re-executed eagerly op by op for every batch.

    Args:
        inputs: Batch of generator input images.
        target: Batch of target images paired with ``inputs``.

    Returns:
        A ``(gen_loss, disc_loss)`` tuple with the losses of this step.
    """
    # Both tapes record the same forward pass; each one is then used to
    # differentiate its own loss with respect to its own network.
    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        fake_images = generator(inputs, training=True)
        real_preds = discriminator([inputs, target], training=True)
        fake_preds = discriminator([inputs, fake_images], training=True)
        # The generator is rewarded when the discriminator scores the
        # generated pair as real, and for staying close to the target (L1).
        gen_loss = generator_loss(fake_preds, fake_images, target)
        disc_loss = discriminator_loss(real_preds, fake_preds)

    generator_gradients = gen_tape.gradient(gen_loss, generator.trainable_variables)
    generator_optimizer.apply_gradients(zip(generator_gradients, generator.trainable_variables))

    discriminator_gradients = disc_tape.gradient(disc_loss, discriminator.trainable_variables)
    discriminator_optimizer.apply_gradients(zip(discriminator_gradients, discriminator.trainable_variables))
    return gen_loss, disc_loss

In [None]:
# Pull a single batch from the training dataset; it is reused as the fixed
# preview sample after every epoch.
preview_batch = iter(data)
test_input, real_input = next(preview_batch)

In [None]:
# Per-epoch loss history (plain floats) for the generator and discriminator.
g_loss = []
d_loss = []

for epoch in range(EPOCHS):
    # Collect the loss of every batch so the recorded value is the mean over
    # the epoch rather than whatever the last batch happened to produce.
    epoch_gen_losses = []
    epoch_disc_losses = []

    for image, mask in data:
        gen_loss, disc_loss = train_step(image, mask)
        epoch_gen_losses.append(float(gen_loss))
        epoch_disc_losses.append(float(disc_loss))

    # With drop_remainder=True a dataset smaller than one batch yields nothing.
    if not epoch_gen_losses:
        raise ValueError("Training dataset produced no batches; check BATCH_SIZE and the loaded data.")

    gen_loss = float(np.mean(epoch_gen_losses))
    disc_loss = float(np.mean(epoch_disc_losses))

    g_loss.append(gen_loss)
    d_loss.append(disc_loss)
    print(f"[+] Epoch {epoch + 1}, Generator Loss: {gen_loss}, Discriminator Loss: {disc_loss}")
    show_generated_images(epoch, test_input, real_input, "output")
    # wait=True defers the clearing until the next epoch produces output, so
    # the latest epoch stays visible while the following one is training.
    display.clear_output(wait=True)

# Test

In [None]:
# Plot the recorded generator and discriminator loss histories on one axes.
loss_fig, loss_ax = plt.subplots(figsize=(10, 8))

loss_ax.plot(g_loss, label="Generator Loss")
loss_ax.plot(d_loss, label="Discriminator Loss")
loss_ax.legend()
plt.show()

In [None]:
# Build the batched validation dataset from the validation images and masks.
valid_inputs_f32 = tf.cast(valid_images, tf.float32)
valid_targets_f32 = tf.cast(valid_masks, tf.float32)
dataset_test = (valid_inputs_f32, valid_targets_f32)

data_test = tf.data.Dataset.from_tensor_slices(dataset_test)
data_test = data_test.batch(BATCH_SIZE, drop_remainder=True)

In [None]:
def show_predictions(test_images):
    """Plot input, target and generator output for one sample of a dataset.

    The first batch of ``test_images`` is fetched and its first sample is shown.

    Args:
        test_images: Batched dataset yielding ``(input, target)`` pairs; pixel
            values are expected in [0, 1], as produced by ``load_image``.
    """
    # Only the first batch is used, so it is fetched directly.
    test_input, real_input = next(iter(test_images))

    generated_images = generator(test_input, training=False)

    # The data is already in [0, 1], so input and target are displayed as they
    # are (rescaling with * 0.5 + 0.5 would wash them out). The generator ends
    # in tanh and may produce values below 0, hence the clip before plotting.
    panels = (
        ("Input", test_input[0]),
        ("Target", real_input[0]),
        ("Generated", tf.clip_by_value(generated_images[0], 0.0, 1.0)),
    )

    fig, axes = plt.subplots(1, 3, figsize=(12, 8))
    for ax, (title, picture) in zip(axes, panels):
        ax.imshow(picture)
        ax.set_title(title)
        ax.axis("off")
    fig.tight_layout()

    plt.show()

In [None]:
# Plot the generator's prediction for a sample taken from the validation dataset.
show_predictions(data_test)