GAN: https://vbarra.github.io/DLbook/gan.html
VAE: https://vbarra.github.io/DLbook/vae.html

# 1. Setup and Data Handling

## 1.1. Environment Setup
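Libraries: Install the required libraries: tensorflow, numpy, matplotlib, and optionally tqdm for progress bars. Keras ships with TensorFlow, and urllib is part of the Python standard library, so neither needs a separate install. The separate tensorflow-gpu package is deprecated; GPU support now comes through the standard tensorflow package on supported platforms.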

Reproducibility: Set random seeds for TensorFlow and NumPy to ensure reproducibility.

Notebook Setup: Start your Jupyter Notebook and import all the necessary libraries in the first cell.

Constants: Define constants like IMAGE_SIZE, BATCH_SIZE, CLASSES, DATA_DIR, etc. at the beginning of your notebook.

Using Python 3.9.21

In [9]:
!pip install tensorflow numpy matplotlib jupyter tqdm

In [10]:
# General imports
import numpy as np
import os
import time
import matplotlib.pyplot as plt

# TensorFlow and Keras
import tensorflow as tf
from tensorflow import keras
from keras import layers

# For downloading Quick, Draw! data (if not already downloaded)
import urllib.request

# For progress bars (optional)
from tqdm import tqdm

print("TensorFlow version:", tf.__version__)
print("NumPy version:", np.__version__)
print(f"Keras version: {tf.keras.__version__}")

TensorFlow version: 2.18.0
NumPy version: 2.0.2
Keras version: 3.8.0


**Setting Random Seeds for Reproducibility**: 42 is a common choice (it's the "Answer to the Ultimate Question of Life, the Universe, and Everything" from The Hitchhiker's Guide to the Galaxy). The important thing is to use the same seed consistently to get the same results.

In [11]:
tf.random.set_seed(42)
np.random.seed(42)

In [12]:
# --- Constants ---
IMAGE_SIZE = 28       # Height and width of the images
CHANNELS = 1          # Number of channels in the images (grayscale)
BATCH_SIZE = 64       # Batch size for training
LATENT_DIM = 128      # Dimension of the VAE latent space
LATENT_DIM_GAN = 128  # Dimension of the GAN latent space (noise vector)
EPOCHS = 50           # Number of training epochs (you might need more)
CLASSES = ['cat', 'dog', 'bird', 'tree', 'house']  # Chosen classes
DATA_DIR = "quickdraw_data"  # Directory to store the data
MAX_ITEMS_PER_CLASS = 5000 # Limit the number of samples per class

## 1.2. Data Download and Preparation

Class Selection: Choose your five classes: cat, dog, bird, tree, house.

Download: Use the download_quickdraw_data function defined in the cell below to download the .npy files for the chosen classes. It skips any file that already exists locally.

Load Data: Use the load_quickdraw_data function to load the data into memory.

Important: Limit the number of samples per class using max_items_per_class (e.g., 5000 or 10000) to manage memory usage, especially if you're not using a GPU.

Reshape and Normalize: Reshape the data to (num_samples, 28, 28, 1) and normalize pixel values to the range [0, 1] (or [-1, 1] if you're using tanh in the GAN generator).

Split Data: Divide the data into training and testing sets (e.g., 80% train, 20% test).

One-Hot Encode Labels (Optional): If you plan to implement a Conditional GAN (cGAN) or explore class-conditional generation with the VAE, one-hot encode the labels using to_categorical.
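The normalization and one-hot steps above can be sketched with plain NumPy. This is a minimal illustration on synthetic data, not the notebook's pipeline: the helper names `to_tanh_range` and `to_unit_range` are hypothetical, and `np.eye` is used here as a stand-in for `to_categorical`.

```python
import numpy as np

def to_tanh_range(images_01):
    """Map pixel values from [0, 1] to [-1, 1] (the range of tanh)."""
    return images_01 * 2.0 - 1.0

def to_unit_range(images_pm1):
    """Map pixel values from [-1, 1] back to [0, 1], e.g. for plotting."""
    return (images_pm1 + 1.0) / 2.0

# Synthetic stand-in for a batch of 28x28 grayscale bitmaps (uint8, 0..255)
rng = np.random.default_rng(0)
raw = rng.integers(0, 256, size=(4, 28, 28, 1), dtype=np.uint8)

images_01 = raw.astype("float32") / 255.0   # range [0, 1], suits a sigmoid output
images_pm1 = to_tanh_range(images_01)       # range [-1, 1], suits a tanh output
restored = to_unit_range(images_pm1)

# One-hot encoding of integer labels for 5 classes
labels = np.array([0, 3, 4, 1])
one_hot = np.eye(5, dtype="float32")[labels]

print(images_pm1.min(), images_pm1.max())
print(one_hot)
```

Whichever range is chosen, the real images fed to a model should use the same range as that model's output activation.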

In [13]:
# --- Data Download and Preparation ---
import os
import urllib.request
import numpy as np
from tqdm import tqdm  # Import tqdm

def download_quickdraw_data(classes, base_url="https://storage.googleapis.com/quickdraw_dataset/full/numpy_bitmap/"):
    """Downloads Quick, Draw! data if not already present."""
    os.makedirs(DATA_DIR, exist_ok=True)  # Create the data directory if it does not exist yet
    for class_name in classes:
        file_name = f"{class_name.replace(' ', '_')}.npy"
        file_path = os.path.join(DATA_DIR, file_name)  # Use DATA_DIR constant
        if not os.path.exists(file_path):
            print(f"Downloading {file_name}...")
            with tqdm(unit="B", unit_scale=True, unit_divisor=1024, miniters=1) as t:
                urllib.request.urlretrieve(base_url + file_name, file_path, reporthook=lambda b, bsize, tsize: t.update(bsize))
        else:
            print(f"{file_name} already downloaded.")

def load_quickdraw_data(classes, max_items_per_class, data_dir=DATA_DIR):
    """Loads Quick, Draw! data into memory."""
    all_data = []
    all_labels = []
    for idx, class_name in enumerate(classes):
        file_name = f"{class_name.replace(' ', '_')}.npy"
        file_path = os.path.join(data_dir, file_name)
        try:
            data = np.load(file_path)
            if data.shape[0] > max_items_per_class:
                data = data[:max_items_per_class]
            all_data.append(data)
            all_labels.append(np.full((data.shape[0],), idx))
        except FileNotFoundError:
            print(f"Error: File not found: {file_path}")
            print("Please ensure the data has been downloaded correctly.")
            return None, None

    return np.concatenate(all_data), np.concatenate(all_labels)

# Download and load data
download_quickdraw_data(CLASSES)  # Use CLASSES constant
x_data, labels = load_quickdraw_data(CLASSES, MAX_ITEMS_PER_CLASS)  # Use CLASSES and MAX_ITEMS_PER_CLASS constants

# Skip if data loading failed
if x_data is None or labels is None:
    print("Data loading failed. Please check previous steps.")
else:
    # Reshape and normalize data
    x_data = x_data.reshape(-1, IMAGE_SIZE, IMAGE_SIZE, CHANNELS).astype('float32') / 255.

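    # Shuffle before splitting: the classes are concatenated in order, so an
    # unshuffled 80/20 split would leave one whole class out of the training set
    shuffle_idx = np.random.permutation(len(x_data))
    x_data, labels = x_data[shuffle_idx], labels[shuffle_idx]

    # Split into training and testing sets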
    num_samples = len(x_data)
    train_size = int(0.8 * num_samples)
    x_train, x_test = x_data[:train_size], x_data[train_size:]
    y_train, y_test = labels[:train_size], labels[train_size:]

    # One-hot encode labels (optional, for conditional models or classification tasks)
    num_classes = len(CLASSES)
    y_train_oh = tf.keras.utils.to_categorical(y_train, num_classes)
    y_test_oh = tf.keras.utils.to_categorical(y_test, num_classes)

    # Print shapes
    print("x_train shape:", x_train.shape)
    print("y_train shape:", y_train.shape)
    print("x_test shape:", x_test.shape)
    print("y_test shape:", y_test.shape)

cat.npy already downloaded.
dog.npy already downloaded.
bird.npy already downloaded.
tree.npy already downloaded.
house.npy already downloaded.
x_train shape: (20000, 28, 28, 1)
y_train shape: (20000,)
x_test shape: (5000, 28, 28, 1)
y_test shape: (5000,)
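Because load_quickdraw_data concatenates the classes in order, an 80/20 split is only representative if the samples are shuffled first. The sketch below shows this on synthetic labels laid out the same way. The helper `shuffled_split` and the toy sizes are assumptions for illustration only.

```python
import numpy as np

def shuffled_split(x, y, train_fraction=0.8, seed=42):
    """Shuffle x and y with one shared permutation, then split them."""
    rng = np.random.default_rng(seed)
    order = rng.permutation(len(x))
    x, y = x[order], y[order]
    cut = int(train_fraction * len(x))
    return (x[:cut], y[:cut]), (x[cut:], y[cut:])

# Synthetic labels laid out like the loader's output: 5 classes, in order
num_classes, per_class = 5, 100
labels = np.repeat(np.arange(num_classes), per_class)
features = np.arange(len(labels)).reshape(-1, 1)  # placeholder features

# Unshuffled split: the last class never reaches the training set
naive_train_counts = np.bincount(labels[:400], minlength=num_classes)

(x_tr, y_tr), (x_te, y_te) = shuffled_split(features, labels)
train_counts = np.bincount(y_tr, minlength=num_classes)
test_counts = np.bincount(y_te, minlength=num_classes)

print("naive train counts:   ", naive_train_counts)
print("shuffled train counts:", train_counts)
print("shuffled test counts: ", test_counts)
```

Running `np.bincount` on y_train and y_test in the notebook is a quick way to confirm that every class appears in both sets.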


# 2. Model Building

In [31]:
import tensorflow as tf
from tensorflow import keras
from keras import layers
from keras import ops

# Define the latent dimension
LATENT_DIM = 128

class Sampling(layers.Layer):
    """Sampling layer for VAE"""
    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch = ops.shape(z_mean)[0]
        dim = ops.shape(z_mean)[1]
        epsilon = keras.random.normal(shape=(batch, dim))
        return z_mean + ops.exp(0.5 * z_log_var) * epsilon

# --- VAE encoder and decoder ---
def build_improved_vae_encoder(latent_dim):
    """Builds an improved encoder for the VAE."""
    encoder_inputs = keras.Input(shape=(28, 28, 1))
    
    x = layers.Conv2D(32, 3, activation="relu", strides=2, padding="same")(encoder_inputs)
    x = layers.BatchNormalization()(x)
    x = layers.Conv2D(64, 3, activation="relu", strides=2, padding="same")(x)
    x = layers.BatchNormalization()(x)
    x = layers.Conv2D(128, 3, activation="relu", strides=1, padding="same")(x)
    x = layers.BatchNormalization()(x)
    x = layers.Flatten()(x)
    x = layers.Dense(32, activation="relu")(x)
    x = layers.BatchNormalization()(x)
    
    z_mean = layers.Dense(latent_dim, name="z_mean")(x)
    z_log_var = layers.Dense(latent_dim, name="z_log_var")(x)
    
    return keras.Model(encoder_inputs, [z_mean, z_log_var], name="encoder")

def build_improved_vae_decoder(latent_dim):
    """Builds an improved decoder for the VAE."""
    latent_inputs = keras.Input(shape=(latent_dim,))
    
    x = layers.Dense(7 * 7 * 128, activation="relu")(latent_inputs)
    x = layers.BatchNormalization()(x)
    x = layers.Reshape((7, 7, 128))(x)
    
    x = layers.Conv2DTranspose(128, 3, activation="relu", strides=1, padding="same")(x)
    x = layers.BatchNormalization()(x)
    x = layers.Conv2DTranspose(64, 3, activation="relu", strides=2, padding="same")(x)
    x = layers.BatchNormalization()(x)
    x = layers.Conv2DTranspose(32, 3, activation="relu", strides=2, padding="same")(x)
    x = layers.BatchNormalization()(x)
    
    decoder_outputs = layers.Conv2DTranspose(1, 3, activation="sigmoid", padding="same")(x)
    
    return keras.Model(latent_inputs, decoder_outputs, name="decoder")

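class VAE(keras.Model):
    """VAE that computes its losses inside train_step and test_step."""
    def __init__(self, encoder, decoder, **kwargs):
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.sampling = Sampling()
        self.total_loss_tracker = keras.metrics.Mean(name="loss")
        self.reconstruction_loss_tracker = keras.metrics.Mean(name="reconstruction_loss")
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")

    @property
    def metrics(self):
        # Listing the trackers here lets Keras reset them at the start of each epoch
        return [self.total_loss_tracker, self.reconstruction_loss_tracker, self.kl_loss_tracker]

    def build(self, input_shape):
        # The encoder and decoder are functional models and are already built
        self.built = True

    def call(self, inputs, training=None):
        # Encode, sample, decode
        z_mean, z_log_var = self.encoder(inputs, training=training)
        z = self.sampling([z_mean, z_log_var])
        return self.decoder(z, training=training)

    def compute_losses(self, images, training):
        z_mean, z_log_var = self.encoder(images, training=training)
        z = self.sampling([z_mean, z_log_var])
        reconstruction = self.decoder(z, training=training)

        # Per-pixel binary cross-entropy, summed over each image, averaged over the batch
        reconstruction_loss = ops.mean(
            ops.sum(keras.losses.binary_crossentropy(images, reconstruction), axis=(1, 2))
        )
        # KL divergence between N(z_mean, exp(z_log_var)) and N(0, I)
        kl_loss = -0.5 * ops.mean(
            ops.sum(1 + z_log_var - ops.square(z_mean) - ops.exp(z_log_var), axis=1)
        )
        return reconstruction_loss, kl_loss

    def log_losses(self, reconstruction_loss, kl_loss):
        self.total_loss_tracker.update_state(reconstruction_loss + kl_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)
        return {m.name: m.result() for m in self.metrics}

    def train_step(self, data):
        # fit() is called without labels, so only the images are used
        images, _, _ = keras.utils.unpack_x_y_sample_weight(data)
        with tf.GradientTape() as tape:
            reconstruction_loss, kl_loss = self.compute_losses(images, training=True)
            total_loss = reconstruction_loss + kl_loss

        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        return self.log_losses(reconstruction_loss, kl_loss)

    def test_step(self, data):
        # Used for validation_data: same losses, no gradient update
        images, _, _ = keras.utils.unpack_x_y_sample_weight(data)
        reconstruction_loss, kl_loss = self.compute_losses(images, training=False)
        return self.log_losses(reconstruction_loss, kl_loss)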

# Build and compile the VAE
encoder = build_improved_vae_encoder(LATENT_DIM)
decoder = build_improved_vae_decoder(LATENT_DIM)
vae = VAE(encoder, decoder)
vae.compile(optimizer=keras.optimizers.Adam(learning_rate=0.0005))

# Print model summary
vae.build((None, 28, 28, 1))
vae.summary()
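The Sampling layer and the KL term in the VAE loss can be checked in isolation with NumPy. The sketch below mirrors the two formulas used in the cell above (z = mean + exp(0.5 * log_var) * epsilon, and the closed-form KL divergence to a standard normal). The function names and test values are chosen for illustration and are not part of the notebook.

```python
import numpy as np

def sample_latent(z_mean, z_log_var, rng):
    """Reparameterization trick: z = mean + std * epsilon, epsilon ~ N(0, I)."""
    epsilon = rng.standard_normal(z_mean.shape)
    return z_mean + np.exp(0.5 * z_log_var) * epsilon

def kl_to_standard_normal(z_mean, z_log_var):
    """Closed-form KL(N(mean, exp(log_var)) || N(0, I)), one value per sample."""
    return -0.5 * np.sum(1 + z_log_var - np.square(z_mean) - np.exp(z_log_var), axis=1)

rng = np.random.default_rng(0)

# Row 0 is already a standard normal; row 1 has mean (1, -2) and std (1, 2)
z_mean = np.array([[0.0, 0.0], [1.0, -2.0]])
z_log_var = np.array([[0.0, 0.0], [0.0, np.log(4.0)]])

kl = kl_to_standard_normal(z_mean, z_log_var)
print("KL per sample:", kl)  # row 0 is 0, row 1 equals 4 - ln(2)

# Draw many samples for row 1 and compare empirical statistics
n = 200_000
samples = sample_latent(np.tile(z_mean[1], (n, 1)), np.tile(z_log_var[1], (n, 1)), rng)
print("empirical mean:", samples.mean(axis=0))
print("empirical std: ", samples.std(axis=0))
```

The KL term is zero only when the encoder outputs a standard normal, which is why it pulls the latent codes toward the prior that is later sampled from for generation.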

### GAN model

In [34]:
# --- Improved GAN Model ---
import tensorflow as tf
from tensorflow import keras
from keras import layers

LATENT_DIM_GAN = 128  # Dimension of the random noise vector for the generator

def build_improved_gan_generator(latent_dim_gan):
    """Builds an improved generator for the GAN."""
    model = keras.Sequential(name="improved_generator")
    model.add(keras.Input(shape=(latent_dim_gan,)))  # Keras 3 prefers an explicit Input over input_dim
    model.add(layers.Dense(7 * 7 * 256, use_bias=False))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())
    model.add(layers.Reshape((7, 7, 256)))
    model.add(layers.Conv2DTranspose(128, 5, strides=1, padding="same", use_bias=False))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())
    model.add(layers.Conv2DTranspose(64, 5, strides=2, padding="same", use_bias=False))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())
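    # tanh outputs lie in [-1, 1], so the real images must be rescaled from [0, 1] to [-1, 1] before GAN training
    model.add(layers.Conv2DTranspose(1, 5, strides=2, padding="same", use_bias=False, activation="tanh"))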
    return model

def build_improved_gan_discriminator():
    """Builds an improved discriminator for the GAN."""
    model = keras.Sequential(name="improved_discriminator")
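    model.add(keras.Input(shape=(28, 28, 1)))  # Keras 3 prefers an explicit Input over input_shape
    model.add(layers.Conv2D(64, 5, strides=2, padding="same"))
    model.add(layers.LeakyReLU(negative_slope=0.2))  # 'alpha' was renamed to 'negative_slope' in Keras 3
    model.add(layers.Dropout(0.3))
    model.add(layers.Conv2D(128, 5, strides=2, padding="same"))
    model.add(layers.LeakyReLU(negative_slope=0.2))
    model.add(layers.Dropout(0.3))
    model.add(layers.Flatten())
    # Sigmoid output: the 'binary_crossentropy' loss used at compile time expects probabilities
    model.add(layers.Dense(1, activation="sigmoid"))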
    return model

def build_gan(generator, discriminator):
    """Combines the generator and discriminator into a GAN."""
    discriminator.trainable = False  # Freeze discriminator during GAN training
    gan_input = keras.Input(shape=(LATENT_DIM_GAN,))
    gan_output = discriminator(generator(gan_input))
    gan = keras.Model(gan_input, gan_output, name="gan")
    return gan

# Build the generator and discriminator
improved_generator = build_improved_gan_generator(LATENT_DIM_GAN)
improved_discriminator = build_improved_gan_discriminator()

# Compile the discriminator while it is still trainable; build_gan() freezes it afterwards
improved_discriminator.compile(optimizer=keras.optimizers.Adam(learning_rate=0.0002, beta_1=0.5),
                               loss='binary_crossentropy', metrics=['accuracy'])

# Build and compile the combined model (generator followed by the frozen discriminator)
improved_gan = build_gan(improved_generator, improved_discriminator)
improved_gan.compile(optimizer=keras.optimizers.Adam(learning_rate=0.0002, beta_1=0.5),
                     loss='binary_crossentropy')

# Print model summaries
improved_generator.summary()
improved_discriminator.summary()
improved_gan.summary()
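Both GAN models are compiled with binary cross-entropy, which by default treats predictions as probabilities in (0, 1). A layer with no activation emits unbounded logits instead, and the two must not be mixed. The NumPy sketch below illustrates the difference. The function names are hypothetical and the clipping constant is an assumption, not a value taken from Keras.

```python
import numpy as np

def sigmoid(logits):
    return 1.0 / (1.0 + np.exp(-logits))

def bce_from_probs(y_true, probs, eps=1e-7):
    """Binary cross-entropy on probabilities (clipped to avoid log(0))."""
    probs = np.clip(probs, eps, 1.0 - eps)
    return -np.mean(y_true * np.log(probs) + (1 - y_true) * np.log(1 - probs))

def bce_from_logits(y_true, logits):
    """Numerically stable form: max(x, 0) - x * y + log(1 + exp(-|x|))."""
    return np.mean(
        np.maximum(logits, 0) - logits * y_true + np.log1p(np.exp(-np.abs(logits)))
    )

y_true = np.array([1.0, 0.0, 1.0, 0.0])
logits = np.array([2.0, -1.0, -0.5, 3.0])

loss_logits = bce_from_logits(y_true, logits)          # loss computed directly on logits
loss_probs = bce_from_probs(y_true, sigmoid(logits))   # same loss after a sigmoid
loss_mismatch = bce_from_probs(y_true, logits)         # logits wrongly treated as probabilities

print("from logits:       ", loss_logits)
print("from probabilities:", loss_probs)
print("mismatched inputs: ", loss_mismatch)
```

The first two values agree, while the third is distorted by clipping. The discriminator therefore needs either a sigmoid output with the default loss or a logit output with a loss configured for logits.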

# 3. Training

## 3.1. VAE Training

In [36]:
# --- Training the VAE ---

epochs = 50  # Adjust the number of epochs as needed
batch_size = 64  # Adjust the batch size as needed

# Train the VAE built above ('vae'); training is unsupervised, so no labels are passed
history = vae.fit(x_train, epochs=epochs, batch_size=batch_size, validation_data=(x_test, None))

# --- Save the Trained VAE Model ---

vae.save("improved_vae_model.h5")
print("Improved VAE model saved as improved_vae_model.h5")

Epoch 1/50


ValueError: Attr 'Toutput_types' of 'OptionalFromValue' Op passed list of length 0 less than minimum 1.

## 3.2. GAN Training