In [None]:
# GitHub URL: https://github.com/punampaul04/Deep_Learning_Week05_FINAL

In [2]:
#!/usr/bin/env python
# coding: utf-8

"""
## Generative Dog Images Using a GAN with a Custom Training Loop

**Objective:**
Train a Generative Adversarial Network (GAN) to generate realistic dog images from a dataset of dog images.

**Approach:**
- Utilize a custom training loop with `tf.GradientTape()` to manually control forward and backward passes, avoiding internal Keras trainer issues.
- The generator creates images from random noise.
- The discriminator classifies images as real or fake.
- The GAN (generator + frozen discriminator) trains the generator to produce images that the discriminator deems real.

**Dataset:**
- **Path Structure:** `/Users/vedangmehta/Downloads/dog_images/working/Images/<breed_name>/<image_file>.jpg`
- **Preprocessing:** Images are resized to 64x64 pixels and normalized to the range [0, 1].

**Evaluation:**
- **Metric:** MiFID (Memorization-informed Fréchet Inception Distance). Lower values indicate better performance.

**Note:**
Ensure that TensorFlow 2.x is installed in your environment. This code is designed to run in environments where eager execution is enabled.
"""

import os
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras.layers import (
    Dense, LeakyReLU, BatchNormalization, Reshape, Flatten, 
    Conv2D, Conv2DTranspose, Dropout, Input
)
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import load_img, img_to_array

# ----------------------------
# 1. Enable Eager Execution
# ----------------------------

tf.config.run_functions_eagerly(True)
print("Eager execution enabled.")

# ----------------------------
# 2. Data Loading and Preprocessing
# ----------------------------

DATA_DIR = '/Users/vedangmehta/Downloads/dog_images'
WORKING_DIR = os.path.join(DATA_DIR, 'working/Images')

def load_dog_images(image_dir, target_size=(64, 64)):
    print(f"Loading images from: {image_dir}")
    image_paths = []
    for root, _, files in os.walk(image_dir):
        for file in files:
            if file.lower().endswith('.jpg'):
                image_paths.append(os.path.join(root, file))
    
    images = []
    for i, path in enumerate(image_paths):
        try:
            img = load_img(path, target_size=target_size)
            img_array = img_to_array(img) / 255.0
            images.append(img_array)
        except Exception as e:
            print(f"Error loading image {path}: {e}")
        if (i + 1) % 1000 == 0:
            print(f"{i + 1}/{len(image_paths)} images loaded.")
    
    return np.array(images)

dog_images = load_dog_images(WORKING_DIR)
print(f"Number of images loaded: {len(dog_images)}")
print(f"Dataset shape: {dog_images.shape}")

if len(dog_images) == 0:
    raise ValueError("No images found. Please check the dataset extraction and paths.")

# ----------------------------
# 3. Building the GAN Components
# ----------------------------

def build_generator():
    print("Building generator...")
    model = Sequential()
    model.add(Dense(8 * 8 * 256, input_dim=100))
    model.add(LeakyReLU(negative_slope=0.2))
    model.add(Reshape((8, 8, 256)))

    model.add(Conv2DTranspose(128, kernel_size=4, strides=2, padding="same"))
    model.add(LeakyReLU(negative_slope=0.2))
    model.add(BatchNormalization())

    model.add(Conv2DTranspose(64, kernel_size=4, strides=2, padding="same"))
    model.add(LeakyReLU(negative_slope=0.2))
    model.add(BatchNormalization())

    model.add(Conv2DTranspose(3, kernel_size=4, strides=2, padding="same", activation="tanh"))
    print("Generator built successfully.")
    return model

def build_discriminator(input_shape=(64, 64, 3)):
    print("Building discriminator...")
    model = Sequential()
    model.add(Conv2D(64, kernel_size=4, strides=2, padding="same", input_shape=input_shape))
    model.add(LeakyReLU(negative_slope=0.2))
    model.add(Dropout(0.3))

    model.add(Conv2D(128, kernel_size=4, strides=2, padding="same"))
    model.add(LeakyReLU(negative_slope=0.2))
    model.add(Dropout(0.3))

    model.add(Conv2D(256, kernel_size=4, strides=2, padding="same"))
    model.add(LeakyReLU(negative_slope=0.2))
    model.add(Dropout(0.3))

    model.add(Flatten())
    model.add(Dense(1, activation="sigmoid"))
    print("Discriminator built successfully.")
    return model

generator = build_generator()
discriminator = build_discriminator()

# ----------------------------
# 4. Defining Optimizers and Loss Functions
# ----------------------------

generator_optimizer = Adam(0.0002, 0.5)
discriminator_optimizer = Adam(0.0002, 0.5)
bce = tf.keras.losses.BinaryCrossentropy(from_logits=False)

print("Optimizers and loss functions defined.")

# ----------------------------
# 5. Custom Training Loop
# ----------------------------

@tf.function
def train_step(real_images):
    batch_size = tf.shape(real_images)[0]
    noise = tf.random.normal([batch_size, 100])
    real_labels = tf.ones((batch_size, 1))
    fake_labels = tf.zeros((batch_size, 1))

    with tf.GradientTape() as tape_d:
        real_preds = discriminator(real_images, training=True)
        d_loss_real = bce(real_labels, real_preds)

        fake_images = generator(noise, training=True)
        fake_preds = discriminator(fake_images, training=True)
        d_loss_fake = bce(fake_labels, fake_preds)

        d_loss = (d_loss_real + d_loss_fake) * 0.5

    grads_d = tape_d.gradient(d_loss, discriminator.trainable_variables)
    discriminator_optimizer.apply_gradients(zip(grads_d, discriminator.trainable_variables))

    with tf.GradientTape() as tape_g:
        gen_images = generator(noise, training=True)
        gen_preds = discriminator(gen_images, training=False)
        g_loss = bce(real_labels, gen_preds)

    grads_g = tape_g.gradient(g_loss, generator.trainable_variables)
    generator_optimizer.apply_gradients(zip(grads_g, generator.trainable_variables))

    return d_loss, g_loss

def train_gan(generator, discriminator, dataset, epochs=20, batch_size=128):
    print("Starting GAN training...")
    ds = tf.data.Dataset.from_tensor_slices(dataset).shuffle(buffer_size=10000).batch(batch_size, drop_remainder=True)

    for epoch in range(1, epochs + 1):
        print(f"Epoch {epoch}/{epochs} starting...")
        d_losses, g_losses = [], []
        for step, real_batch in enumerate(ds):
            d_loss, g_loss = train_step(real_batch)
            d_losses.append(d_loss)
            g_losses.append(g_loss)
            if step % 50 == 0:
                print(f"Step {step} | D Loss: {d_loss:.4f} | G Loss: {g_loss:.4f}")

        avg_d_loss = tf.reduce_mean(d_losses).numpy()
        avg_g_loss = tf.reduce_mean(g_losses).numpy()
        print(f"Epoch {epoch}/{epochs} completed | Avg D Loss: {avg_d_loss:.4f} | Avg G Loss: {avg_g_loss:.4f}")

train_gan(generator, discriminator, dog_images, epochs=5, batch_size=128)
print("GAN training completed.")


Eager execution enabled.
Loading images from: /Users/vedangmehta/Downloads/dog_images/working/Images
1000/20580 images loaded.
2000/20580 images loaded.
3000/20580 images loaded.
4000/20580 images loaded.
5000/20580 images loaded.
6000/20580 images loaded.
7000/20580 images loaded.
8000/20580 images loaded.
9000/20580 images loaded.
10000/20580 images loaded.
11000/20580 images loaded.
12000/20580 images loaded.
13000/20580 images loaded.
14000/20580 images loaded.
15000/20580 images loaded.
16000/20580 images loaded.
17000/20580 images loaded.
18000/20580 images loaded.
19000/20580 images loaded.
20000/20580 images loaded.
Number of images loaded: 20580
Dataset shape: (20580, 64, 64, 3)
Building generator...
Generator built successfully.
Building discriminator...
Discriminator built successfully.
Optimizers and loss functions defined.
Starting GAN training...
Epoch 1/5 starting...
Step 0 | D Loss: 0.7018 | G Loss: 0.7160
Step 50 | D Loss: 0.0038 | G Loss: 7.2601
Step 100 | D Loss: 0.2