<a href="https://colab.research.google.com/github/mattberg88/Colab/blob/develop/SimpleGANS.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
! pip install kaggle
! mkdir ~/.kaggle
! cp kaggle.json ~/.kaggle/
! chmod 600 ~/.kaggle/kaggle.json


In [None]:
# Download the CelebA dataset archive from Kaggle (needs the API token set up
# in ~/.kaggle/kaggle.json).
! kaggle datasets download jessicali9530/celeba-dataset


In [None]:
# Extract the downloaded dataset archive into /content/.
!unzip "/content/celeba-dataset.zip" -d "/content/"

In [None]:
import abc
import tensorflow as tf
from tensorflow import keras
from keras import Sequential, Input
from keras.layers import Conv2D, LeakyReLU, Flatten, Dropout, Dense, Reshape, Conv2DTranspose
from keras.losses import BinaryCrossentropy
from tensorflow.keras.optimizers import Adam
from PIL import Image
import matplotlib.pyplot as plt
import numpy as np


def load_from_directory(filename, resize, batch_size=32):
    """Load every image found under a directory as an unlabeled, batched dataset.

    Args:
        filename: Path of the directory that contains the images.
        resize: Size passed as ``image_size``; every image is resized to it.
        batch_size: Number of images per batch.

    Returns:
        The mapped dataset, yielding image batches with pixel values scaled
        from ``[0, 255]`` to ``[-1, 1]``.
    """
    dataset = keras.preprocessing.image_dataset_from_directory(
        filename, label_mode=None, image_size=resize, batch_size=batch_size
    )
    # Scale [0, 255] to exactly [-1, 1], the range of a tanh output layer.
    # Using 127.0 here would map white (255) to ~1.008, a value tanh can
    # never produce.
    dataset = dataset.map(lambda x: (x - 127.5) / 127.5)
    return dataset


def load_from_npz(filename, resize, data_parm='arr_0', batch_size=32, shuffle_buffer_size=100):
    """Load RGB images stored in an ``.npz`` archive as a shuffled, batched dataset.

    Args:
        filename: Path of the ``.npz`` file.
        resize: Target size handed to ``PIL.Image.resize``.
        data_parm: Key of the image array inside the archive.
        batch_size: Number of images per batch.
        shuffle_buffer_size: Buffer size used by ``Dataset.shuffle``.

    Returns:
        A ``tf.data.Dataset`` of float32 image batches with pixel values
        scaled from ``[0, 255]`` to ``[-1, 1]``.
    """
    with np.load(filename) as data:
        images = data[data_parm]

    # Resize each image with PIL, then convert back to a float32 array.
    resized = [
        np.array(Image.fromarray(image, 'RGB').resize(size=resize)).astype('float32')
        for image in images
    ]
    dataset = tf.data.Dataset.from_tensor_slices(resized)
    # Scale [0, 255] to [-1, 1]. The previous (x - 255) / 255 squeezed every
    # pixel into [-1, 0], which disagrees with load_from_directory.
    dataset = dataset.map(lambda x: (x - 127.5) / 127.5)

    # Shuffle first, then group the shuffled images into batches.
    return dataset.shuffle(shuffle_buffer_size).batch(batch_size)


def display_sample(dataset):
    """Show the first image of the first batch of ``dataset``.

    Assumes the batches hold pixel values scaled to ``[-1, 1]`` (the scaling
    the loaders in this file aim for); values are mapped back to ``[0, 255]``
    before plotting. Does nothing when ``dataset`` yields no batch.

    Args:
        dataset: Iterable of image batches whose elements expose ``.numpy()``.
    """
    first_batch = next(iter(dataset), None)
    if first_batch is None:
        return

    # Undo the [-1, 1] scaling; clip so rounding can never leave the valid
    # uint8 range.
    pixels = np.clip((first_batch.numpy()[0] + 1.0) * 127.5, 0, 255).astype("uint8")
    plt.axis("off")
    plt.imshow(pixels)


In [None]:
from keras.layers.normalization.batch_normalization import BatchNormalization


class CustomModel(abc.ABC):
    """Base class for objects that build and hold a model.

    Subclasses implement ``build_model``; its return value is stored in
    ``self.model`` when the instance is created.
    """

    def __init__(self):
        self.model = self.build_model()

    # Deriving from abc.ABC is what makes @abstractmethod effective: without
    # it the base class could be instantiated and would silently store None.
    @abc.abstractmethod
    def build_model(self):
        """Build and return the model held by this object."""
        pass


class Discriminator(CustomModel):
    """Convolutional binary classifier over 256x256 RGB images."""

    def build_model(self):
        """Return the Sequential discriminator ending in a single sigmoid unit."""
        # Every convolution halves the spatial resolution.
        downsample = dict(kernel_size=4, strides=2, padding="same")
        layers = [
            # 256x256 input -> 128x128
            # NOTE(review): there is no activation between the first two
            # convolutions - confirm this is intended.
            Conv2D(32, input_shape=(256, 256, 3), **downsample),
            # -> 64x64
            Conv2D(64, **downsample),
            LeakyReLU(alpha=0.2),
            BatchNormalization(),
            # -> 32x32
            Conv2D(128, **downsample),
            LeakyReLU(alpha=0.2),
            BatchNormalization(),
            # -> 16x16
            Conv2D(256, **downsample),
            LeakyReLU(alpha=0.2),
            Flatten(),  # feature maps to a single vector
            Dropout(0.5),  # drop rate of 0.5
            Dense(1, activation="sigmoid"),
        ]
        return Sequential(layers, name="discriminator")


class Generator(CustomModel):
    """Transposed-convolution network mapping a latent vector to an RGB image."""

    def __init__(self, latent_dim):
        # latent_dim has to be set first: the base constructor calls
        # build_model(), which reads it.
        self.latent_dim = latent_dim
        super().__init__()

    def build_model(self):
        """Return the Sequential generator with a tanh-activated 3-channel output."""
        layers = [
            Input(shape=(self.latent_dim,)),
            Dense(16 * 16 * 512),
            Reshape((16, 16, 512)),
        ]
        # Four stride-2 upsampling stages: 16 -> 32 -> 64 -> 128 -> 256.
        for filters in (256, 128, 64, 32):
            layers.append(
                Conv2DTranspose(filters, kernel_size=4, strides=2, padding="same")
            )
            layers.append(LeakyReLU(alpha=0.2))
        layers.append(BatchNormalization())
        # Project to 3 channels; tanh keeps the output in [-1, 1].
        layers.append(Conv2D(3, kernel_size=4, padding="same", activation="tanh"))
        return keras.Sequential(layers, name="generator")


class GAN(keras.Model):
    """Adversarial training wrapper around a generator and a discriminator.

    Label convention used by ``train_step``: generated images are labelled 1
    and real images are labelled 0.
    """

    def __init__(self, discriminator, generator, latent_dim):
        """Store the two sub-models and the size of the latent vector."""
        super().__init__()
        self.discriminator = discriminator
        self.generator = generator
        self.latent_dim = latent_dim

    def compile(self, d_optimizer, g_optimizer, loss_fn):
        """Register one optimizer per sub-model, the loss and the loss trackers.

        Args:
            d_optimizer: Optimizer applied to the discriminator weights.
            g_optimizer: Optimizer applied to the generator weights.
            loss_fn: Callable ``loss_fn(labels, predictions)``.
        """
        super().compile()
        self.d_optimizer = d_optimizer
        self.g_optimizer = g_optimizer
        self.loss_fn = loss_fn
        self.d_loss_metric = keras.metrics.Mean(name="d_loss")
        self.g_loss_metric = keras.metrics.Mean(name="g_loss")

    @property
    def metrics(self):
        """Return the running-mean trackers for both losses."""
        return [self.d_loss_metric, self.g_loss_metric]

    def train_step(self, real_images):
        """Run one discriminator update followed by one generator update.

        Both sub-models are called with ``training=True``. Without the flag
        Keras runs them in inference mode inside a custom ``train_step``, so
        the discriminator's Dropout would be disabled and BatchNormalization
        would use its moving statistics instead of batch statistics.

        Args:
            real_images: One batch of real training images.

        Returns:
            Dict with the running means of ``d_loss`` and ``g_loss``.
        """
        # Sample random points in the latent space
        batch_size = tf.shape(real_images)[0]
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))

        # Decode them to fake images
        generated_images = self.generator(random_latent_vectors, training=True)

        # Combine them with real images (fakes first, then reals)
        combined_images = tf.concat([generated_images, real_images], axis=0)

        # Assemble labels discriminating real from fake images:
        # 1 for the generated half, 0 for the real half.
        labels = tf.concat(
            [tf.ones((batch_size, 1)), tf.zeros((batch_size, 1))], axis=0
        )
        # Add random noise to the labels - important trick!
        labels += 0.05 * tf.random.uniform(tf.shape(labels))

        # Train the discriminator
        with tf.GradientTape() as tape:
            predictions = self.discriminator(combined_images, training=True)
            d_loss = self.loss_fn(labels, predictions)
        grads = tape.gradient(d_loss, self.discriminator.trainable_weights)
        self.d_optimizer.apply_gradients(
            zip(grads, self.discriminator.trainable_weights)
        )

        # Sample random points in the latent space
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))

        # Assemble labels that say "all real images" (real is 0 here)
        misleading_labels = tf.zeros((batch_size, 1))

        # Train the generator (note that we should *not* update the weights
        # of the discriminator)! Only generator gradients are applied below.
        with tf.GradientTape() as tape:
            fake_images = self.generator(random_latent_vectors, training=True)
            predictions = self.discriminator(fake_images, training=True)
            g_loss = self.loss_fn(misleading_labels, predictions)
        grads = tape.gradient(g_loss, self.generator.trainable_weights)
        self.g_optimizer.apply_gradients(zip(grads, self.generator.trainable_weights))

        # Update metrics
        self.d_loss_metric.update_state(d_loss)
        self.g_loss_metric.update_state(g_loss)
        return {
            "d_loss": self.d_loss_metric.result(),
            "g_loss": self.g_loss_metric.result(),
        }


class GANMonitor(keras.callbacks.Callback):
    """Callback that saves a few generator samples as PNG files after every epoch.

    Args:
        num_img: Number of images to generate and save per epoch.
        latent_dim: Size of the latent vectors fed to the generator.
        output_dir: Directory the PNG files are written to; created on first
            use if it does not exist.
    """

    def __init__(self, num_img=3, latent_dim=256,
                 output_dir="/content/gdrive/My Drive/SimpleGANS/generated_images_7"):
        super().__init__()
        self.num_img = num_img
        self.latent_dim = latent_dim
        self.output_dir = output_dir

    def on_epoch_end(self, epoch, logs=None):
        """Generate ``num_img`` samples and save them as ``generated_img_<epoch>_<i>.png``."""
        import os

        # Saving would otherwise fail after a full epoch if the folder is missing.
        os.makedirs(self.output_dir, exist_ok=True)

        random_latent_vectors = tf.random.normal(shape=(self.num_img, self.latent_dim))
        # self.model is expected to be the GAN being trained (needs `.generator`).
        generated_images = self.model.generator(random_latent_vectors, training=False)
        # The generator ends in tanh, so map [-1, 1] to [0, 255] explicitly.
        # Multiplying by 255 gave negative pixels and only looked right because
        # array_to_img re-stretched every image to its own min/max.
        generated_images = tf.clip_by_value(
            (generated_images + 1.0) * 127.5, 0.0, 255.0
        ).numpy()
        for i in range(self.num_img):
            img = keras.preprocessing.image.array_to_img(generated_images[i], scale=False)
            img.save(os.path.join(self.output_dir, "generated_img_%03d_%d.png" % (epoch, i)))

In [None]:
# Size of the random vector fed to the generator.
latent_dim = 256

# Wrap freshly built discriminator and generator Keras models in the GAN trainer.
gan = GAN(discriminator=Discriminator().model, generator=Generator(latent_dim).model, latent_dim=latent_dim)
# Each sub-model gets its own Adam optimizer; both share one binary
# cross-entropy loss.
gan.compile(
    d_optimizer=Adam(learning_rate=0.0001),
    g_optimizer=Adam(learning_rate=0.0001),
    loss_fn=BinaryCrossentropy(),
)

In [None]:
# Load the extracted images, resized to 256x256 to match the discriminator's
# input shape.
dataset = load_from_directory('../content/img_align_celeba',(256,256))


In [None]:
# Mount Google Drive so checkpoints and generated images can be written to it.
from google.colab import drive
drive.mount('/content/gdrive')
# Location the ModelCheckpoint callback writes the weights of this run to.
checkpoint_path = "/content/gdrive/My Drive/SimpleGANS/Checkpoint7/"


In [None]:
# Load weights from the Checkpoint6 folder (presumably written by an earlier
# run - confirm) so training resumes instead of starting from scratch.
gan.load_weights("/content/gdrive/My Drive/SimpleGANS/Checkpoint6/")

In [None]:
# Save only the model weights to checkpoint_path during training.
cp_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_path, 
    verbose=1, 
    save_weights_only=True)

epochs = 100
# NOTE(review): Keras documents `workers` / `use_multiprocessing` as applying
# to generator or keras.utils.Sequence input only; `dataset` comes from
# load_from_directory, so they likely have no effect here - confirm.
gan.fit(dataset, 
        epochs=epochs,
        workers=8,
        use_multiprocessing=True,
        callbacks=[GANMonitor(num_img=10, latent_dim=latent_dim), cp_callback]
)

In [None]:
# super-resolution set up: install the nnabla CUDA extension package, clone the
# nnabla example repository and download the pretrained ESRGAN weights file.
!pip install nnabla-ext-cuda100
!git clone https://github.com/sony/nnabla-examples.git
# %cd changes the working directory for all following cells, so the weights
# file below lands next to inference.py.
%cd nnabla-examples/image-superresolution/esrgan
!wget https://nnabla.org/pretrained-models/nnabla-examples/esrgan/esrgan_latest_g.h5

In [None]:
# super-resolution from input directory
input_image = "/content/gdrive/MyDrive/SimpleGANS/generated_images_5/generated_img_001_9.png"
!python inference.py --loadmodel esrgan_latest_g.h5 --input_image $input_img

In [None]:
# Packages used by the latent-space interpolation cells below: imageio writes
# the GIF, scikit-image provides `transform`, tensorflow_docs provides `embed`.
!pip -q install imageio
!pip -q install scikit-image
!pip install git+https://github.com/tensorflow/docs

In [None]:
#@title  { display-mode: "code" }
from absl import logging
import imageio

tf.random.set_seed(0)

import tensorflow_hub as hub
from tensorflow_docs.vis import embed
import time

try:
  from google.colab import files
except ImportError:
  pass

from IPython import display
from skimage import transform

# We could retrieve this value from module.get_input_shapes() if we didn't know
# beforehand which module we will be using.



# Interpolates between two vectors that are non-zero and don't both lie on a
# line going through origin.
def interpolate_hypersphere(v1, v2, num_steps):
  """Return `num_steps` vectors going from v1 towards v2 at v1's norm.

  v2 is first rescaled to the norm of v1. Each step then interpolates
  linearly between the two and rescales the result back to that norm, so
  every returned vector lies on the same hypersphere.

  Args:
    v1: Start vector (non-zero).
    v2: End vector (non-zero, not on the same line through the origin as v1).
    num_steps: Number of vectors to return; must be at least 1.

  Returns:
    Tensor stacking the `num_steps` interpolated vectors along axis 0.

  Raises:
    ValueError: If `num_steps` is smaller than 1.
  """
  if num_steps < 1:
    raise ValueError("num_steps must be at least 1")
  if num_steps == 1:
    # A single step would divide by (num_steps - 1) == 0 and produce NaNs;
    # the only sensible point is the start vector itself.
    return tf.stack([v1])

  v1_norm = tf.norm(v1)
  v2_norm = tf.norm(v2)
  v2_normalized = v2 * (v1_norm / v2_norm)

  vectors = []
  for step in range(num_steps):
    interpolated = v1 + (v2_normalized - v1) * step / (num_steps - 1)
    # Project the straight-line point back onto the sphere of radius |v1|.
    interpolated_norm = tf.norm(interpolated)
    interpolated_normalized = interpolated * (v1_norm / interpolated_norm)
    vectors.append(interpolated_normalized)
  return tf.stack(vectors)

# Simple way to display an image.
def display_image(image):
  """Convert `image` to uint8 and return it as a PIL image.

  Args:
    image: Image data accepted by `tf.constant`; its dtype is converted with
      `tf.image.convert_image_dtype`.

  Returns:
    A `PIL.Image.Image` built from the converted array.
  """
  image = tf.constant(image)
  image = tf.image.convert_image_dtype(image, tf.uint8)
  # Only `Image` is imported in this file (`from PIL import Image`); the bare
  # `PIL` name used before was never bound and raised NameError.
  return Image.fromarray(image.numpy())

# Given a set of images, show an animation.
def animate(images):
  """Write `images` to ./animation.gif and return the embed object for it.

  Pixel values are multiplied by 255 and clipped to [0, 255] before being
  cast to uint8.
  """
  gif_path = './animation.gif'
  frames = np.clip(np.array(images) * 255, 0, 255)
  frames = frames.astype(np.uint8)
  imageio.mimsave(gif_path, frames)
  return embed.embed_file(gif_path)

# Only show absl log messages at ERROR level or above.
logging.set_verbosity(logging.ERROR)

In [None]:
def interpolate_between_vectors(latent_dim=256, num_steps=40):
  """Generate images along a path between two random latent vectors.

  Args:
    latent_dim: Length of the random latent vectors; must match the input
      size of the generator.
    num_steps: Number of interpolation steps, i.e. number of images.

  Returns:
    The raw output of `gan.generator.predict` for the interpolated vectors.
  """
  v1 = tf.random.normal([latent_dim])
  v2 = tf.random.normal([latent_dim])

  # Creates a tensor with `num_steps` steps of interpolation between v1 and v2.
  vectors = interpolate_hypersphere(v1, v2, num_steps)

  # Uses the trained generator to produce images from the latent space.
  interpolated_images = gan.generator.predict(vectors)

  return interpolated_images

interpolated_images = interpolate_between_vectors()
# The generator ends in tanh, so its output lies in [-1, 1]. animate()
# multiplies by 255 and clips at 0, so map to [0, 1] first; otherwise every
# negative value would be rendered black.
animate((interpolated_images + 1.0) / 2.0)

In [None]:
# display random examples
plt.figure(figsize=(20,20)) # specifying the overall grid size

# Generate the whole 5x5 grid with a single predict call; the previous loop
# produced 10 images per cell and displayed only the first one.
generated_images = gan.generator.predict(tf.random.normal(shape=(25, latent_dim)))
# Map the tanh output from [-1, 1] to displayable uint8 pixels in [0, 255].
generated_images = np.clip((generated_images + 1.0) * 127.5, 0, 255).astype("uint8")

for i in range(25):
    plt.subplot(5,5,i+1)    # the number of images in the grid is 5*5 (25)
    plt.imshow(generated_images[i])

plt.show()

In [None]:
# Persist only the generator model to a .h5 file; the discriminator is not saved.
gan.generator.save('stable_20_epochs_tanh_celeba.h5')