In [9]:
import os.path
import math

import click
import tensorflow as tf
from matplotlib import pyplot as plt
from returns.curry import partial
from tqdm import tqdm

In [10]:
# --- Run configuration ---------------------------------------------------
# Location the training PNGs are globbed from, and the directory that
# receives preview images and checkpoints.
input_dir = "gs://flickr-faces-mini/resized"
output_dir = "./output"

# Side length handed to the image loader and to both model builders.
img_size = 28
# Length of each random noise vector fed to the generator.
noise_dim = 100

# Number of images per training batch.
batch_size = 256
# The shuffle buffer holds exactly one batch worth of elements.
shuffle_buffer_size = 1 * batch_size

In [11]:
def process_path(file_path: tf.Tensor, img_size: int) -> tf.Tensor:
    """Load one PNG file as a normalized single-channel image tensor.

    Args:
        file_path: Scalar string tensor holding the path of a PNG file.
        img_size: Side length, in pixels, of the square image to return.

    Returns:
        A float32 tensor of shape ``(img_size, img_size, 1)`` whose values
        are mapped from the decoded 0..255 range to [-1, 1].
    """
    file_contents = tf.io.read_file(file_path)
    # channels=1 asks the decoder for a single (grayscale) channel.
    img = tf.cast(tf.io.decode_png(file_contents, channels=1), tf.float32)
    # `img_size` used to be accepted but ignored, which left the spatial
    # shape of the result unknown. Resizing honors the parameter and pins
    # the static shape; images already at that size pass through unchanged.
    img = tf.image.resize(img, [img_size, img_size])
    # Map 0..255 -> -1..1.
    return (img - 127.5) / 127.5

In [12]:
# Glob for every PNG sitting exactly one directory below `input_dir`.
files_pattern = os.path.join(input_dir, "*", "*.png")

# Input pipeline, built one stage at a time:
#   list files (fixed order) -> decode/normalize -> cache decoded images
#   -> shuffle -> batch -> prefetch.
train_ds = tf.data.Dataset.list_files(files_pattern, shuffle=False)
train_ds = train_ds.map(
    partial(process_path, img_size=img_size),
    num_parallel_calls=tf.data.experimental.AUTOTUNE,
)
# Cache sits before shuffle so that each pass over the dataset is
# reshuffled while decoding happens only once.
train_ds = train_ds.cache()
train_ds = train_ds.shuffle(shuffle_buffer_size)
train_ds = train_ds.batch(batch_size)
train_ds = train_ds.prefetch(tf.data.experimental.AUTOTUNE)

2022-08-22 15:08:10.661675: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory
2022-08-22 15:08:10.661717: W tensorflow/stream_executor/cuda/cuda_driver.cc:269] failed call to cuInit: UNKNOWN ERROR (303)
2022-08-22 15:08:10.661748: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (lapdog): /proc/driver/nvidia/version does not exist
2022-08-22 15:08:10.663914: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2022-08-22 15:08:10.691944: W tensorflow/core/platform/cloud/google_auth_provider.cc:184] All attempts to get a Goog

In [13]:
# Number of batches in `train_ds` as a plain Python int; used later as the
# progress-bar total.
cardinality = int(tf.data.experimental.cardinality(train_ds).numpy())
print(cardinality)

264


In [14]:
# plt.figure(figsize=(10, 10))
# for batch in train_ds.take(1):
#     batch_size, *_ = batch.shape
#     for i in range(batch_size):
#       plt.subplot(math.ceil(batch_size / 16), 16, i + 1)
#       plt.imshow(batch[i, :, :, 0] * 127.5 + 127.5, cmap='gray')
#       plt.axis('off')


In [15]:
def generate_and_save_images(
    output_dir: str, model: tf.keras.Model, epoch: int, test_input: tf.Tensor
) -> None:
    """Render the model's output for ``test_input`` as an image grid and save it.

    The grid is written to ``<output_dir>/image_at_epoch_<epoch:04d>.png``.

    Args:
        output_dir: Directory the PNG is written into.
        model: Called as ``model(test_input, training=False)``; its result
            is indexed as ``[i, :, :, 0]``, i.e. one single-channel image
            per batch entry.
        epoch: Epoch number embedded (zero-padded to 4 digits) in the file name.
        test_input: Input batch passed to ``model``.
    """
    # Notice `training` is set to False.
    # This is so all layers run in inference mode (batchnorm).
    predictions = model(test_input, training=False)

    num_images = predictions.shape[0]
    # Smallest square grid that fits every image: 4x4 for 16 images, as
    # before, but no longer failing when more than 16 are supplied.
    grid_side = math.ceil(math.sqrt(num_images))

    fig = plt.figure(figsize=(4, 4))
    try:
        for i in range(num_images):
            ax = fig.add_subplot(grid_side, grid_side, i + 1)
            # Undo the [-1, 1] scaling so pixel values are back in 0..255.
            ax.imshow(predictions[i, :, :, 0] * 127.5 + 127.5, cmap="gray")
            ax.axis("off")

        # Save through the figure object rather than pyplot's implicit
        # "current figure" state.
        fig.savefig(
            os.path.join(output_dir, f"image_at_epoch_{epoch:04d}.png")
        )
    finally:
        # One figure is created per call; without closing it, figures pile
        # up (and hold memory) for every epoch of training.
        plt.close(fig)

In [16]:
def make_generator_model(*, img_size: int, noise_dim: int) -> tf.keras.Model:
    """Build the generator: noise vector -> ``(img_size, img_size, 1)`` image.

    A dense projection is reshaped into a ``(img_size // 4)``-sided feature
    map with 256 channels, then passed through three transposed
    convolutions (strides 1, 2, 2), which upsample it by a factor of 4.
    The last layer uses ``tanh``, so outputs lie in [-1, 1].

    The model summary is printed as a side effect.

    Args:
        img_size: Side length of the generated square images. Must be a
            positive multiple of 4 (two stride-2 upsampling steps).
        noise_dim: Length of the input noise vector.

    Returns:
        The assembled ``tf.keras.Sequential`` model.

    Raises:
        ValueError: If ``img_size`` is not a positive multiple of 4.
    """
    # The base feature map used to be hard-coded to 7x7, so the final shape
    # check could only pass for img_size == 28. Derive it instead.
    if img_size <= 0 or img_size % 4 != 0:
        raise ValueError(
            f"img_size must be a positive multiple of 4, got {img_size}"
        )
    base = img_size // 4

    model = tf.keras.Sequential()
    model.add(
        tf.keras.layers.Dense(
            base * base * 256, use_bias=False, input_shape=(noise_dim,)
        )
    )
    model.add(tf.keras.layers.BatchNormalization())
    model.add(tf.keras.layers.LeakyReLU())

    model.add(tf.keras.layers.Reshape((base, base, 256)))
    # Note: None is the batch size
    assert model.output_shape == (None, base, base, 256)

    # Stride 1: spatial size unchanged, channels 256 -> 128.
    model.add(
        tf.keras.layers.Conv2DTranspose(
            128, (5, 5), strides=(1, 1), padding="same", use_bias=False
        )
    )
    assert model.output_shape == (None, base, base, 128)
    model.add(tf.keras.layers.BatchNormalization())
    model.add(tf.keras.layers.LeakyReLU())

    # Stride 2: spatial size doubles, channels 128 -> 64.
    model.add(
        tf.keras.layers.Conv2DTranspose(
            64, (5, 5), strides=(2, 2), padding="same", use_bias=False
        )
    )
    assert model.output_shape == (None, 2 * base, 2 * base, 64)
    model.add(tf.keras.layers.BatchNormalization())
    model.add(tf.keras.layers.LeakyReLU())

    # Stride 2 again: reaches the full image size with a single channel.
    model.add(
        tf.keras.layers.Conv2DTranspose(
            1,
            (5, 5),
            strides=(2, 2),
            padding="same",
            use_bias=False,
            activation="tanh",
        )
    )
    assert model.output_shape == (None, img_size, img_size, 1)

    model.summary()
    return model

In [17]:
def make_discriminator_model(*, img_size: int) -> tf.keras.Model:
    """Build the discriminator for ``(img_size, img_size, 1)`` images.

    Two stride-2 convolutions (64 then 128 filters), each followed by
    LeakyReLU and 30% dropout, feed a flattened single-unit dense layer
    with no activation.

    Args:
        img_size: Side length of the square single-channel input images.

    Returns:
        The assembled ``tf.keras.Sequential`` model.
    """
    layers = tf.keras.layers
    return tf.keras.Sequential(
        [
            # Downsampling stage 1.
            layers.Conv2D(
                64,
                (5, 5),
                strides=(2, 2),
                padding="same",
                input_shape=(img_size, img_size, 1),
            ),
            layers.LeakyReLU(),
            layers.Dropout(0.3),
            # Downsampling stage 2.
            layers.Conv2D(128, (5, 5), strides=(2, 2), padding="same"),
            layers.LeakyReLU(),
            layers.Dropout(0.3),
            # Single unactivated score per image.
            layers.Flatten(),
            layers.Dense(1),
        ]
    )

In [18]:
# Build both networks (the generator builder also prints its summary).
generator = make_generator_model(noise_dim=noise_dim, img_size=img_size)
discriminator = make_discriminator_model(img_size=img_size)

# The discriminator's last layer has no activation, so the loss is told to
# treat its outputs as logits.
cross_entropy = tf.keras.losses.BinaryCrossentropy(from_logits=True)

# Independent Adam optimizers, one per network, same learning rate.
generator_optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)
discriminator_optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)

# Tracks both networks and both optimizers so training can be resumed.
checkpoint = tf.train.Checkpoint(
    generator_optimizer=generator_optimizer,
    discriminator_optimizer=discriminator_optimizer,
    generator=generator,
    discriminator=discriminator,
)

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense (Dense)               (None, 12544)             1254400   
                                                                 
 batch_normalization (BatchN  (None, 12544)            50176     
 ormalization)                                                   
                                                                 
 leaky_re_lu (LeakyReLU)     (None, 12544)             0         
                                                                 
 reshape (Reshape)           (None, 7, 7, 256)         0         
                                                                 
 conv2d_transpose (Conv2DTra  (None, 7, 7, 128)        819200    
 nspose)                                                         
                                                                 
 batch_normalization_1 (Batc  (None, 7, 7, 128)        5

In [19]:
def discriminator_loss(
    cross_entropy, real_output: tf.Tensor, fake_output: tf.Tensor
) -> tf.Tensor:
    """Return the discriminator loss for one batch.

    Args:
        cross_entropy: Callable invoked as ``cross_entropy(targets, outputs)``.
        real_output: Discriminator outputs for real images; scored against
            all-ones targets.
        fake_output: Discriminator outputs for generated images; scored
            against all-zeros targets.

    Returns:
        The sum of the real-image loss and the generated-image loss.
    """
    loss_on_real = cross_entropy(tf.ones_like(real_output), real_output)
    loss_on_fake = cross_entropy(tf.zeros_like(fake_output), fake_output)
    return loss_on_real + loss_on_fake


def generator_loss(cross_entropy, fake_output: tf.Tensor) -> tf.Tensor:
    """Return the generator loss for one batch.

    Args:
        cross_entropy: Callable invoked as ``cross_entropy(targets, outputs)``.
        fake_output: Discriminator outputs for generated images.

    Returns:
        The loss of ``fake_output`` against all-ones targets, i.e. how far
        the discriminator is from labelling the generated images as real.
    """
    all_real_targets = tf.ones_like(fake_output)
    return cross_entropy(all_real_targets, fake_output)

In [20]:
# Preview images and checkpoints are written into `output_dir`; create it
# so a first run on a clean machine does not fail when saving.
os.makedirs(output_dir, exist_ok=True)

# `tf.train.latest_checkpoint` returns None when no checkpoint has been
# written yet. Previously that crashed on `.rsplit`; now training simply
# starts from epoch 0.
latest_checkpoint_dir = tf.train.latest_checkpoint(output_dir)
if latest_checkpoint_dir is None:
    start_epoch = 0
    print("No checkpoint found in", output_dir, "- starting from epoch 0")
else:
    # Checkpoint prefixes end in "-<save counter>" (e.g. "ckpt-103"); one
    # checkpoint is saved per epoch, so the counter is the epoch to resume at.
    _, epoch_str = latest_checkpoint_dir.rsplit("-", 1)
    start_epoch = int(epoch_str)
    _ = checkpoint.restore(latest_checkpoint_dir)
    print("Loaded checkoint from", latest_checkpoint_dir)

Loaded checkoint from ./output/ckpt-103


In [21]:
# Batch of noise vectors drawn once and reused for every preview image, so
# successive previews within this session come from the same inputs.
num_examples_to_generate = 16
seed = tf.random.normal(shape=(num_examples_to_generate, noise_dim))

In [22]:
@tf.function
def train_step(images: tf.Tensor) -> None:
    """Run one simultaneous update of the generator and the discriminator.

    Uses the module-level ``generator``, ``discriminator``, ``cross_entropy``
    and the two optimizers.

    Args:
        images: Batch of real images passed to the discriminator.
    """
    noise = tf.random.normal([batch_size, noise_dim])

    # Only the forward passes and the losses need to be recorded. The
    # gradient computation and optimizer updates used to live inside this
    # `with` block as well, which made both tapes record those ops for no
    # benefit.
    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        generated_images = generator(noise, training=True)

        real_output = discriminator(images, training=True)
        fake_output = discriminator(generated_images, training=True)

        gen_loss = generator_loss(cross_entropy, fake_output)
        disc_loss = discriminator_loss(
            cross_entropy, real_output, fake_output
        )

    # Compute both gradients before applying either, so neither update
    # sees weights already modified by the other.
    gradients_of_generator = gen_tape.gradient(
        gen_loss, generator.trainable_variables
    )
    gradients_of_discriminator = disc_tape.gradient(
        disc_loss, discriminator.trainable_variables
    )

    generator_optimizer.apply_gradients(
        zip(gradients_of_generator, generator.trainable_variables)
    )
    discriminator_optimizer.apply_gradients(
        zip(
            gradients_of_discriminator,
            discriminator.trainable_variables,
        )
    )

In [23]:
def train(dataset, epochs):
    """Train for ``epochs`` epochs, continuing after ``start_epoch``.

    A preview image is saved before training starts, and again after every
    epoch, followed by a checkpoint. Relies on the module-level
    ``generator``, ``seed``, ``checkpoint``, ``output_dir``, ``start_epoch``
    and ``cardinality``.

    Args:
        dataset: Iterable of image batches; each is passed to ``train_step``.
        epochs: Number of epochs to run in this call.
    """
    # Snapshot of the generator before any training in this call.
    generate_and_save_images(output_dir, generator, start_epoch, seed)

    checkpoint_prefix = os.path.join(output_dir, "ckpt")
    first_epoch = start_epoch + 1

    # Epoch numbers are 1-based and continue from the restored epoch.
    for epoch_number in range(first_epoch, first_epoch + epochs):
        progress = tqdm(
            dataset,
            total=cardinality,
            desc=f"Epoch {epoch_number}",
        )
        for image_batch in progress:
            train_step(image_batch)

        # Produce images for the GIF as you go
        generate_and_save_images(output_dir, generator, epoch_number, seed)

        checkpoint.save(file_prefix=checkpoint_prefix)

In [25]:
# Run 5 more epochs on top of whatever epoch was restored.
train(train_ds, epochs=5)

Epoch 104:  70%|███████   | 186/264 [07:23<03:05,  2.38s/it]