# 🎉 Party Time!  
This notebook provides an overview of autoencoders and their applications in deep learning, followed by a conditional GAN example. You will learn how to build and train autoencoders for dimensionality reduction, image denoising, and anomaly detection, and how to set up a pix2pix model for image-to-image translation. The workflow includes:

- [**Basic Autoencoder**](#Autoencoders) 🧠: Learn the fundamentals by compressing and reconstructing Fashion MNIST images using dense layers.
- [**Image Denoising**](#Second-example-Image-denoising) 🧹🖼️: Apply convolutional autoencoders to remove noise from images, demonstrating practical data cleaning techniques.
- [**Anomaly Detection**](#Third-example-Anomaly-detection) 🚨📈: Use autoencoders to identify abnormal patterns in ECG time series data, illustrating unsupervised anomaly detection.
- [**Image-to-Image Translation (pix2pix)**](#pix2pix-Image-to-image-translation-with-a-conditional-GAN) 🏢➡️🏙️: Explore conditional GANs for translating architectural label images into realistic building facades.

Each section includes code, explanations, and visualizations to help you understand the concepts and implementation details.


## Autoencoders

We will explore autoencoders with three examples: the basics, image denoising, and anomaly detection.

An autoencoder is a special type of neural network that is trained to copy its input to its output. For example, given an image of a handwritten digit, an autoencoder first encodes the image into a lower dimensional latent representation, then decodes the latent representation back to an image. An autoencoder learns to compress the data while minimizing the reconstruction error.

### Import TensorFlow and other libraries

In [None]:
!pip install tensorflow

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf

from sklearn.metrics import accuracy_score, precision_score, recall_score
from sklearn.model_selection import train_test_split
from tensorflow.keras import layers, losses
from tensorflow.keras.datasets import fashion_mnist
from tensorflow.keras.models import Model

### Load the dataset
To start, you will train the basic autoencoder using the Fashion MNIST dataset. Each image in this dataset is 28x28 pixels.

In [None]:
(x_train, _), (x_test, _) = fashion_mnist.load_data()

x_train = x_train.astype('float32') / 255.
x_test = x_test.astype('float32') / 255.

print(x_train.shape)
print(x_test.shape)

### First example: Basic autoencoder
![Basic autoencoder results](https://www.tensorflow.org/static/tutorials/generative/images/intro_autoencoder_result.png)

Define an autoencoder with two Dense layers: an `encoder`, which compresses the images into a 64-dimensional latent vector, and a `decoder`, which reconstructs the original image from the latent space.

To define your model, use the [Keras Model Subclassing API](https://www.tensorflow.org/guide/keras/custom_layers_and_models).


In [None]:
# Define an Autoencoder class inheriting from tf.keras.Model
class Autoencoder(Model):
  def __init__(self, latent_dim, shape):
    super(Autoencoder, self).__init__()  # Initialize the base class
    self.latent_dim = latent_dim         # Store the size of the latent space
    self.shape = shape                   # Store the original input shape

    # Encoder: flattens input and encodes to latent_dim
    self.encoder = tf.keras.Sequential([
      layers.Flatten(),                  # Flatten input to 1D
      layers.Dense(latent_dim, activation='relu'),  # Dense layer for encoding
    ])

    # Decoder: reconstructs original shape from latent vector
    self.decoder = tf.keras.Sequential([
      layers.Dense(tf.math.reduce_prod(shape).numpy(), activation='sigmoid'),  # Dense layer to expand back to original size
      layers.Reshape(shape)               # Reshape output to original input shape
    ])

  # Forward pass: encode then decode
  def call(self, x):
    encoded = self.encoder(x)             # Encode input
    decoded = self.decoder(encoded)       # Decode latent vector
    return decoded                        # Return reconstruction

# Set the shape and latent dimension for the autoencoder
shape = x_test.shape[1:]                  # Get shape of input images (e.g., (28, 28))
latent_dim = 64                           # Set size of latent space

# Instantiate the Autoencoder model
autoencoder = Autoencoder(latent_dim, shape)
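
As a rough sanity check on the sizes involved, the sketch below counts the trainable parameters implied by the two `Dense` layers above, assuming 28x28 inputs and `latent_dim = 64`. The helper `dense_params` is a hypothetical name introduced for this illustration, not a Keras function.

```python
def dense_params(n_in, n_out):
    """Weights plus biases of a fully connected layer."""
    return n_in * n_out + n_out

input_dim = 28 * 28   # a flattened Fashion MNIST image has 784 values
latent_dim = 64       # size of the latent vector used in this notebook

encoder_params = dense_params(input_dim, latent_dim)   # 784 -> 64
decoder_params = dense_params(latent_dim, input_dim)   # 64 -> 784
compression = input_dim / latent_dim                   # values in / values kept

print(encoder_params, decoder_params, compression)  # 50240 50960 12.25
```

Each image is therefore squeezed to roughly one twelfth of its original number of values before the decoder tries to rebuild it.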


Mean Squared Error (MSE) is commonly used as the loss function for autoencoders working with images because it measures the average squared difference between the original and reconstructed pixel values. This is appropriate for images because:

- **Pixel-wise similarity:** MSE penalizes large differences between corresponding pixels, encouraging the autoencoder to produce reconstructions that are visually similar to the input.
- **Continuous values:** Image data is often represented as continuous values (e.g., pixel intensities between 0 and 1), making MSE a natural choice.
- **Smooth gradients:** MSE provides smooth and stable gradients, which helps neural networks learn effectively during training.

In summary, MSE is simple, effective, and aligns well with the goal of minimizing reconstruction error in image autoencoders.
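
To make the definition concrete, here is a minimal NumPy sketch of the pixel-wise MSE. The arrays are random stand-ins for images and reconstructions (an assumption for illustration, not Fashion MNIST data), and the code is independent of the Keras loss used in the next cell.

```python
import numpy as np

rng = np.random.default_rng(0)

# Stand-in "images" with pixel values in [0, 1] (random data, for illustration only)
original = rng.random((4, 28, 28)).astype("float32")
# Stand-in "reconstructions": the originals plus a small perturbation, clipped to [0, 1]
noise = 0.1 * rng.standard_normal(original.shape).astype("float32")
reconstructed = np.clip(original + noise, 0.0, 1.0)

# Squared difference per pixel, averaged over the two pixel axes of each image
per_image_mse = np.mean((original - reconstructed) ** 2, axis=(1, 2))
# Averaging the per-image values gives a single number for the batch
batch_mse = per_image_mse.mean()

print(per_image_mse.shape, float(batch_mse))
```

Because every image has the same number of pixels, averaging per image and then over the batch gives the same value as averaging over all pixels at once.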

In [None]:
autoencoder.compile(optimizer='adam', loss=losses.MeanSquaredError())

Train the model using `x_train` as both the input and the target. The `encoder` will learn to compress the dataset from 784 dimensions to the latent space, and the `decoder` will learn to reconstruct the original images.

In [None]:
autoencoder.fit(x_train, x_train,
                epochs=10,
                shuffle=True,
                validation_data=(x_test, x_test))

Now that the model is trained, let's test it by encoding and decoding images from the test set.

In [None]:
encoded_imgs = autoencoder.encoder(x_test).numpy()
decoded_imgs = autoencoder.decoder(encoded_imgs).numpy()

In [None]:
n = 10
plt.figure(figsize=(20, 4))
for i in range(n):
  # display original
  ax = plt.subplot(2, n, i + 1)
  plt.imshow(x_test[i])
  plt.title("original")
  plt.gray()
  ax.get_xaxis().set_visible(False)
  ax.get_yaxis().set_visible(False)

  # display reconstruction
  ax = plt.subplot(2, n, i + 1 + n)
  plt.imshow(decoded_imgs[i])
  plt.title("reconstructed")
  plt.gray()
  ax.get_xaxis().set_visible(False)
  ax.get_yaxis().set_visible(False)
plt.show()

### Second example: Image denoising


![Image denoising results](https://www.tensorflow.org/static/tutorials/generative/images/image_denoise_fmnist_results.png)

An autoencoder can also be trained to remove noise from images. In the following section, you will create a noisy version of the Fashion MNIST dataset by applying random noise to each image. You will then train an autoencoder using the noisy image as input, and the original image as the target.

Let's reimport the dataset to omit the modifications made earlier.

In [None]:
(x_train, _), (x_test, _) = fashion_mnist.load_data()

In [None]:
x_train = x_train.astype('float32') / 255.
x_test = x_test.astype('float32') / 255.

x_train = x_train[..., tf.newaxis]
x_test = x_test[..., tf.newaxis]

print(x_train.shape)

Adding random noise to the images

In [None]:
noise_factor = 0.2  # Set the amount of noise to add

# Add random Gaussian noise to the training images
x_train_noisy = x_train + noise_factor * tf.random.normal(shape=x_train.shape)
# Add random Gaussian noise to the test images
x_test_noisy = x_test + noise_factor * tf.random.normal(shape=x_test.shape)

# Clip the noisy training images to be between 0 and 1
x_train_noisy = tf.clip_by_value(x_train_noisy, clip_value_min=0., clip_value_max=1.)
# Clip the noisy test images to be between 0 and 1
x_test_noisy = tf.clip_by_value(x_test_noisy, clip_value_min=0., clip_value_max=1.)

Plot the noisy images.


In [None]:
n = 10
plt.figure(figsize=(20, 2))
for i in range(n):
    ax = plt.subplot(1, n, i + 1)
    plt.title("original + noise")
    plt.imshow(tf.squeeze(x_test_noisy[i]))
    plt.gray()
plt.show()

#### Define a convolutional autoencoder

In this example, you will train a convolutional autoencoder using  [Conv2D](https://www.tensorflow.org/api_docs/python/tf/keras/layers/Conv2D) layers in the `encoder`, and [Conv2DTranspose](https://www.tensorflow.org/api_docs/python/tf/keras/layers/Conv2DTranspose) layers in the `decoder`.

`Conv2DTranspose` is a transposed convolution, sometimes loosely called a "deconvolution". With a stride greater than 1 it acts as a learned upsampling layer: where a strided `Conv2D` layer reduces the spatial dimensions (height and width) of its input, a strided `Conv2DTranspose` layer increases them.

- **Conv2D**: Used with `strides=2` in the encoder part of an autoencoder to extract features and reduce the spatial size of the input (downsampling).
- **Conv2DTranspose**: Used with `strides=2` in the decoder part to restore the original image size from the compressed representation (upsampling).

**Why not use only Conv2D in the decoder?**

- A `Conv2D` layer can keep the spatial dimensions (`strides=1`, `padding='same'`) or reduce them (`strides>1`), but it cannot increase them.
- The decoder needs to upsample (increase) the spatial dimensions to reconstruct the original image, which is what a strided `Conv2DTranspose` does. A final `Conv2D` layer with `strides=1` is still useful for mapping the feature maps to a single output channel.

In summary, use `Conv2D` for downsampling (encoder) and `Conv2DTranspose` for upsampling (decoder) in convolutional autoencoders.
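
The size changes can be worked out by hand. The sketch below assumes `padding='same'` and the strides used by the model in the next cell; `conv_same_out` and `conv_transpose_same_out` are hypothetical helper names that only reproduce the size arithmetic, not the layers themselves.

```python
import math

def conv_same_out(size, stride):
    """Output height/width of Conv2D with padding='same'."""
    return math.ceil(size / stride)

def conv_transpose_same_out(size, stride):
    """Output height/width of Conv2DTranspose with padding='same'."""
    return size * stride

size = 28
trace = [size]

# Encoder: two Conv2D layers with strides=2
for stride in (2, 2):
    size = conv_same_out(size, stride)
    trace.append(size)

# Decoder: two Conv2DTranspose layers with strides=2 ...
for stride in (2, 2):
    size = conv_transpose_same_out(size, stride)
    trace.append(size)

# ... followed by a Conv2D output layer with strides=1, which keeps the size
size = conv_same_out(size, 1)
trace.append(size)

print(trace)  # [28, 14, 7, 14, 28, 28]
```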

In [None]:
# Define a convolutional autoencoder model for image denoising
class Denoise(Model):
  def __init__(self):
    super(Denoise, self).__init__()  # Initialize the base Model class

    # Define the encoder as a Sequential model
    self.encoder = tf.keras.Sequential([
      layers.Input(shape=(28, 28, 1)),  # Input layer for 28x28 grayscale images
      layers.Conv2D(16, (3, 3), activation='relu', padding='same', strides=2),  # Downsample with 16 filters
      layers.Conv2D(8, (3, 3), activation='relu', padding='same', strides=2)    # Further downsample with 8 filters
    ])

    # Define the decoder as a Sequential model
    self.decoder = tf.keras.Sequential([
      layers.Conv2DTranspose(8, kernel_size=3, strides=2, activation='relu', padding='same'),   # Upsample with 8 filters
      layers.Conv2DTranspose(16, kernel_size=3, strides=2, activation='relu', padding='same'),  # Further upsample with 16 filters
      layers.Conv2D(1, kernel_size=(3, 3), activation='sigmoid', padding='same')                # Output layer to reconstruct the image
    ])

  # Define the forward pass
  def call(self, x):
    encoded = self.encoder(x)   # Pass input through encoder
    decoded = self.decoder(encoded)  # Pass encoded output through decoder
    return decoded             # Return the reconstructed image

# Instantiate the Denoise autoencoder model
autoencoder = Denoise()

In [None]:
autoencoder.compile(optimizer='adam', loss=losses.MeanSquaredError())

In [None]:
autoencoder.fit(x_train_noisy, x_train,
                epochs=10,
                shuffle=True,
                validation_data=(x_test_noisy, x_test))

Let's take a look at a summary of the encoder. Notice how the images are downsampled from 28x28 to 7x7.

In [None]:
autoencoder.encoder.summary()

The decoder upsamples the images back from 7x7 to 28x28.

In [None]:
autoencoder.decoder.summary()

Plotting both the noisy images and the denoised images produced by the autoencoder.

In [None]:
encoded_imgs = autoencoder.encoder(x_test_noisy).numpy()
decoded_imgs = autoencoder.decoder(encoded_imgs).numpy()

In [None]:
n = 10
plt.figure(figsize=(20, 4))
for i in range(n):

    # display original + noise
    ax = plt.subplot(2, n, i + 1)
    plt.title("original + noise")
    plt.imshow(tf.squeeze(x_test_noisy[i]))
    plt.gray()
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)

    # display reconstruction
    bx = plt.subplot(2, n, i + n + 1)
    plt.title("reconstructed")
    plt.imshow(tf.squeeze(decoded_imgs[i]))
    plt.gray()
    bx.get_xaxis().set_visible(False)
    bx.get_yaxis().set_visible(False)
plt.show()

### Third example: Anomaly detection

#### Overview


In this example, you will train an autoencoder to detect anomalies on the [ECG5000 dataset](http://www.timeseriesclassification.com/description.php?Dataset=ECG5000). This dataset contains 5,000 [Electrocardiograms](https://en.wikipedia.org/wiki/Electrocardiography), each with 140 data points. You will use a simplified version of the dataset, where each example has been labeled either `0` (corresponding to an abnormal rhythm), or `1` (corresponding to a normal rhythm). You are interested in identifying the abnormal rhythms.

Note: This is a labeled dataset, so you could phrase this as a supervised learning problem. The goal of this example is to illustrate anomaly detection concepts you can apply to larger datasets, where you do not have labels available (for example, if you had many thousands of normal rhythms, and only a small number of abnormal rhythms).

How will you detect anomalies using an autoencoder? Recall that an autoencoder is trained to minimize reconstruction error. You will train an autoencoder on the normal rhythms only, then use it to reconstruct all the data. Our hypothesis is that the abnormal rhythms will have higher reconstruction error. You will then classify a rhythm as an anomaly if the reconstruction error surpasses a fixed threshold.

#### Load ECG data

The dataset you will use is based on one from [timeseriesclassification.com](http://www.timeseriesclassification.com/description.php?Dataset=ECG5000).


**What is ECG data?**

ECG (Electrocardiogram) data is a time series recording of the electrical activity of the heart. Each ECG sample typically consists of a sequence of voltage measurements taken at regular intervals, representing the heart's rhythm and electrical conduction patterns. These signals are used by clinicians to detect and diagnose various cardiac conditions, such as arrhythmias, heart attacks, and other abnormalities.

In machine learning, ECG data is often represented as a 1D array (or vector) of numerical values, where each value corresponds to the electrical potential measured at a specific time point. For example, in the ECG5000 dataset used in this example, each ECG record contains 140 data points. The dataset also includes labels indicating whether the rhythm is normal or abnormal, which can be used for supervised learning or, as here, to evaluate unsupervised anomaly detection.

In [None]:
# Download the dataset
dataframe = pd.read_csv('http://storage.googleapis.com/download.tensorflow.org/data/ecg.csv', header=None)
raw_data = dataframe.values
dataframe.head()

In [None]:
# The last column contains the labels
labels = raw_data[:, -1]

# The other columns are the electrocardiogram data points
data = raw_data[:, 0:-1]

train_data, test_data, train_labels, test_labels = train_test_split(
    data, labels, test_size=0.2, random_state=21
)

Normalize the data to `[0,1]`.


In [None]:
min_val = tf.reduce_min(train_data)
max_val = tf.reduce_max(train_data)

train_data = (train_data - min_val) / (max_val - min_val)
test_data = (test_data - min_val) / (max_val - min_val)

train_data = tf.cast(train_data, tf.float32)
test_data = tf.cast(test_data, tf.float32)
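
One consequence of taking `min_val` and `max_val` from the training split only is that test values can land slightly outside `[0, 1]`. The NumPy sketch below uses small made-up arrays (not ECG data) to show the effect.

```python
import numpy as np

# Tiny made-up arrays standing in for the train and test splits
train = np.array([[0.0, 2.0, 4.0],
                  [1.0, 3.0, 5.0]])
test = np.array([[-1.0, 2.5, 6.0]])

# Statistics come from the training split only, as in the cell above
min_val, max_val = train.min(), train.max()

train_scaled = (train - min_val) / (max_val - min_val)
test_scaled = (test - min_val) / (max_val - min_val)

print(train_scaled.min(), train_scaled.max())  # 0.0 1.0
print(test_scaled)                             # values below 0 and above 1 are possible
```

This is usually harmless, but it is worth remembering that a `sigmoid` output layer cannot reproduce values outside `(0, 1)` exactly.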

You will train the autoencoder using only the normal rhythms, which are labeled in this dataset as `1`. Separate the normal rhythms from the abnormal rhythms.

In [None]:
train_labels = train_labels.astype(bool)
test_labels = test_labels.astype(bool)

normal_train_data = train_data[train_labels]
normal_test_data = test_data[test_labels]

anomalous_train_data = train_data[~train_labels]
anomalous_test_data = test_data[~test_labels]

Plot a normal ECG.

In [None]:
plt.grid()
plt.plot(np.arange(140), normal_train_data[0])
plt.title("A Normal ECG")
plt.show()

Plot an anomalous ECG.

In [None]:
plt.grid()
plt.plot(np.arange(140), anomalous_train_data[0])
plt.title("An Anomalous ECG")
plt.show()

#### Build the model

In [None]:
class AnomalyDetector(Model):
  def __init__(self):
    super(AnomalyDetector, self).__init__()
    self.encoder = tf.keras.Sequential([
      layers.Dense(32, activation="relu"),
      layers.Dense(16, activation="relu"),
      layers.Dense(8, activation="relu")])

    self.decoder = tf.keras.Sequential([
      layers.Dense(16, activation="relu"),
      layers.Dense(32, activation="relu"),
      layers.Dense(140, activation="sigmoid")])

  def call(self, x):
    encoded = self.encoder(x)
    decoded = self.decoder(encoded)
    return decoded

autoencoder = AnomalyDetector()

In [None]:
autoencoder.compile(optimizer='adam', loss='mae')

Notice that the autoencoder is trained using only the normal ECGs, but is evaluated using the full test set.

In [None]:
history = autoencoder.fit(normal_train_data, normal_train_data,
          epochs=20,
          batch_size=512,
          validation_data=(test_data, test_data),
          shuffle=True)

In [None]:
plt.plot(history.history["loss"], label="Training Loss")
plt.plot(history.history["val_loss"], label="Validation Loss")
plt.legend()

You will soon classify an ECG as anomalous if its reconstruction error is more than one standard deviation above the mean error of the normal training examples. First, let's plot a normal ECG from the test set, its reconstruction after it's encoded and decoded by the autoencoder, and the reconstruction error.

In [None]:
encoded_data = autoencoder.encoder(normal_test_data).numpy()
decoded_data = autoencoder.decoder(encoded_data).numpy()

plt.plot(normal_test_data[0], 'b')
plt.plot(decoded_data[0], 'r')
plt.fill_between(np.arange(140), decoded_data[0], normal_test_data[0], color='lightcoral')
plt.legend(labels=["Input", "Reconstruction", "Error"])
plt.show()

Create a similar plot, this time for an anomalous test example.

In [None]:
encoded_data = autoencoder.encoder(anomalous_test_data).numpy()
decoded_data = autoencoder.decoder(encoded_data).numpy()

plt.plot(anomalous_test_data[0], 'b')
plt.plot(decoded_data[0], 'r')
plt.fill_between(np.arange(140), decoded_data[0], anomalous_test_data[0], color='lightcoral')
plt.legend(labels=["Input", "Reconstruction", "Error"])
plt.show()

#### Detect anomalies

Detect anomalies by calculating whether the reconstruction loss is greater than a fixed threshold. In this tutorial, you will calculate the mean absolute error for normal examples from the training set, then classify future examples as anomalous if their reconstruction error is more than one standard deviation above the mean training error.


Plot the reconstruction error on normal ECGs from the training set

In [None]:
reconstructions = autoencoder.predict(normal_train_data)
train_loss = tf.keras.losses.mae(reconstructions, normal_train_data)

plt.hist(np.asarray(train_loss), bins=50)  # pass a flat array so all errors form one histogram
plt.xlabel("Train loss")
plt.ylabel("No of examples")
plt.show()

Choose a threshold value that is one standard deviation above the mean.

In [None]:
threshold = np.mean(train_loss) + np.std(train_loss)
print("Threshold: ", threshold)

Note: There are other strategies you could use to select a threshold value above which test examples should be classified as anomalous; the right approach depends on your dataset.

If you examine the reconstruction error for the anomalous examples in the test set, you'll notice most have a greater reconstruction error than the threshold. By varying the threshold, you can adjust the [precision](https://developers.google.com/machine-learning/glossary#precision) and [recall](https://developers.google.com/machine-learning/glossary#recall) of your classifier.

In [None]:
reconstructions = autoencoder.predict(anomalous_test_data)
test_loss = tf.keras.losses.mae(reconstructions, anomalous_test_data)

plt.hist(np.asarray(test_loss), bins=50)  # pass a flat array so all errors form one histogram
plt.xlabel("Test loss")
plt.ylabel("No of examples")
plt.show()
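
To see how the threshold trades precision against recall, the sketch below sweeps a few thresholds over synthetic reconstruction errors. The error distributions are invented for illustration (they are not taken from the ECG model), and "normal" is treated as the positive class, matching the label convention of this dataset.

```python
import numpy as np

rng = np.random.default_rng(0)

# Synthetic reconstruction errors (assumption): anomalies tend to have larger errors
normal_err = rng.normal(0.02, 0.005, size=500)
anomalous_err = rng.normal(0.05, 0.010, size=500)

errors = np.concatenate([normal_err, anomalous_err])
is_normal = np.concatenate([np.ones(500, dtype=bool), np.zeros(500, dtype=bool)])

def precision_recall(threshold):
    """Precision and recall for the 'normal' class at a given error threshold."""
    pred_normal = errors < threshold                 # below threshold -> predicted normal
    true_pos = np.sum(pred_normal & is_normal)
    precision = true_pos / max(np.sum(pred_normal), 1)  # guard against division by zero
    recall = true_pos / np.sum(is_normal)
    return float(precision), float(recall)

results = [(t, *precision_recall(t)) for t in (0.02, 0.03, 0.04)]
for t, p, r in results:
    print(f"threshold={t:.2f}  precision={p:.3f}  recall={r:.3f}")
```

Raising the threshold can only add examples to the "predicted normal" set, so recall for the normal class never decreases, while precision tends to fall once anomalous examples start slipping under the threshold.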

Classify an ECG as an anomaly if the reconstruction error is greater than the threshold.

In [None]:
def predict(model, data, threshold):
  # Use the model to reconstruct the input data
  reconstructions = model(data)
  # Calculate the mean absolute error between the reconstructions and the original data
  loss = tf.keras.losses.mae(reconstructions, data)
  # Return True if the loss is less than the threshold (i.e., normal), otherwise False (anomaly)
  return tf.math.less(loss, threshold)

def print_stats(predictions, labels):
  # Print the accuracy of the predictions compared to the true labels
  print("Accuracy = {}".format(accuracy_score(labels, predictions)))
  # Print the precision of the predictions
  print("Precision = {}".format(precision_score(labels, predictions)))
  # Print the recall of the predictions
  print("Recall = {}".format(recall_score(labels, predictions)))

In [None]:
preds = predict(autoencoder, test_data, threshold)
print_stats(preds, test_labels)

## pix2pix: Image-to-image translation with a conditional GAN

This tutorial demonstrates how to build and train a conditional generative adversarial network (cGAN) called pix2pix that learns a mapping from input images to output images, as described in [Image-to-image translation with conditional adversarial networks](https://arxiv.org/abs/1611.07004) by Isola et al. (2017). pix2pix is not application specific—it can be applied to a wide range of tasks, including synthesizing photos from label maps, generating colorized photos from black and white images, turning Google Maps photos into aerial images, and even transforming sketches into photos.

In this example, your network will generate images of building facades using the [CMP Facade Database](http://cmp.felk.cvut.cz/~tylecr1/facade/) provided by the [Center for Machine Perception](http://cmp.felk.cvut.cz/) at the [Czech Technical University in Prague](https://www.cvut.cz/). To keep it short, you will use a [preprocessed copy](https://efrosgans.eecs.berkeley.edu/pix2pix/datasets/) of this dataset created by the pix2pix authors.

In the pix2pix cGAN, you condition on input images and generate corresponding output images. cGANs were first proposed in [Conditional Generative Adversarial Nets](https://arxiv.org/abs/1411.1784) (Mirza and Osindero, 2014).

The architecture of your network will contain:

- A generator with a [U-Net](https://arxiv.org/abs/1505.04597)-based architecture.
- A discriminator represented by a convolutional PatchGAN classifier (proposed in the [pix2pix paper](https://arxiv.org/abs/1611.07004)).

Note that each epoch can take around 15 seconds on a single V100 GPU.

Below are some examples of the output generated by the pix2pix cGAN after training for 200 epochs on the facades dataset (80k steps).

![sample output_1](https://www.tensorflow.org/images/gan/pix2pix_1.png)
![sample output_2](https://www.tensorflow.org/images/gan/pix2pix_2.png)
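
The "80k steps" figure follows from the training-set size and batch size given later in this notebook (400 training images, batch size 1). A quick check of the arithmetic:

```python
import math

train_images = 400   # size of the facades training set, as stated later in this notebook
batch_size = 1       # batch size used later in this notebook
epochs = 200

steps_per_epoch = math.ceil(train_images / batch_size)
total_steps = steps_per_epoch * epochs

print(total_steps)  # 80000
```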

### Import TensorFlow and other libraries

In [None]:
import tensorflow as tf

import os
import pathlib
import time
import datetime

from matplotlib import pyplot as plt
from IPython import display

### Load the dataset

Download the CMP Facade Database data (30MB). Additional datasets are available in the same format [here](http://efrosgans.eecs.berkeley.edu/pix2pix/datasets/). In Colab you can select other datasets from the drop-down menu. Note that some of the other datasets are significantly larger (`edges2handbags` is 8GB in size).

In [None]:
dataset_name = "facades" #@param ["cityscapes", "edges2handbags", "edges2shoes", "facades", "maps", "night2day"]


In [None]:
_URL = f'http://efrosgans.eecs.berkeley.edu/pix2pix/datasets/{dataset_name}.tar.gz'

path_to_zip = tf.keras.utils.get_file(
    fname=f"{dataset_name}.tar.gz",
    origin=_URL,
    extract=True)

path_to_zip  = pathlib.Path(path_to_zip)

PATH = path_to_zip/dataset_name

In [None]:
list(PATH.parent.iterdir())

Each original image is of size `256 x 512` containing two `256 x 256` images:

In [None]:
sample_img_path = str(PATH / os.path.join('train', '1.jpg'))
print(sample_img_path)
sample_image = tf.io.read_file(sample_img_path)
sample_image = tf.io.decode_jpeg(sample_image)
print(sample_image.shape)

In [None]:
plt.figure()
plt.imshow(sample_image)

You need to separate real building facade images from the architecture label images—all of which will be of size `256 x 256`.

Define a function that loads image files and outputs two image tensors:

In [None]:
def load(image_file):
  # Read and decode an image file to a uint8 tensor
  image = tf.io.read_file(image_file)
  image = tf.io.decode_jpeg(image)

  # Split each image tensor into two tensors:
  # - one with a real building facade image
  # - one with an architecture label image
  w = tf.shape(image)[1]
  w = w // 2
  input_image = image[:, w:, :]
  real_image = image[:, :w, :]

  # Convert both images to float32 tensors
  input_image = tf.cast(input_image, tf.float32)
  real_image = tf.cast(real_image, tf.float32)

  return input_image, real_image

Plot a sample of the input (architecture label image) and real (building facade photo) images:

In [None]:
inp, re = load(str(PATH / 'train/100.jpg'))
# Scale pixel values to [0, 1] so matplotlib can display the float images
plt.figure()
plt.imshow(inp / 255.0)
plt.figure()
plt.imshow(re / 255.0)

As described in the [pix2pix paper](https://arxiv.org/abs/1611.07004), you need to apply random jittering and mirroring to preprocess the training set.

Define several functions that:

1. Resize each `256 x 256` image to a larger height and width—`286 x 286`.
2. Randomly crop it back to `256 x 256`.
3. Randomly flip the image horizontally i.e., left to right (random mirroring).
4. Normalize the images to the `[-1, 1]` range.

**Jittering in pix2pix**

Jittering is a data augmentation technique used in pix2pix to improve the robustness and generalization of the model. It involves randomly resizing the input images to a slightly larger size (e.g., from 256×256 to 286×286), then randomly cropping them back to the original size (256×256), and randomly flipping them horizontally.

This process helps the model learn to handle small spatial variations and prevents overfitting by exposing it to more diverse training examples. Jittering is especially important in image-to-image translation tasks, where the model needs to generalize well to unseen data and not just memorize the training set.
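
The key detail is that the label image and the photo must receive the same crop and the same flip, otherwise the pair no longer lines up. The NumPy sketch below illustrates this on random arrays. It is a simplified stand-in, not the TensorFlow pipeline defined in the following cells: `resize_nearest` and `jitter_pair` are hypothetical helpers, and the nearest-neighbor index mapping is an approximation of what an image library would do.

```python
import numpy as np

rng = np.random.default_rng(0)

def resize_nearest(img, height, width):
    """Nearest-neighbor resize by index lookup (simplified stand-in)."""
    rows = np.arange(height) * img.shape[0] // height
    cols = np.arange(width) * img.shape[1] // width
    return img[rows][:, cols]

def jitter_pair(inp, real, out=256, enlarged=286):
    # 1. Enlarge both images to 286x286
    inp = resize_nearest(inp, enlarged, enlarged)
    real = resize_nearest(real, enlarged, enlarged)
    # 2. Draw ONE crop offset and apply it to both images
    top = rng.integers(0, enlarged - out + 1)
    left = rng.integers(0, enlarged - out + 1)
    inp = inp[top:top + out, left:left + out]
    real = real[top:top + out, left:left + out]
    # 3. Flip both images together with probability 0.5
    if rng.random() > 0.5:
        inp, real = inp[:, ::-1], real[:, ::-1]
    return inp, real

# Use an identical pair so that alignment is easy to verify
inp = rng.integers(0, 256, size=(256, 256, 3)).astype("float32")
real = inp.copy()

j_inp, j_real = jitter_pair(inp, real)
normalized = j_inp / 127.5 - 1   # 4. Map [0, 255] to [-1, 1]

print(j_inp.shape, np.array_equal(j_inp, j_real))
print(normalized.min(), normalized.max())
```

If the crop offsets or the flip decision were drawn separately for each image, the two outputs would differ even for this identical pair.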

In [None]:
# The facade training set consists of 400 images
BUFFER_SIZE = 400
# The batch size of 1 produced better results for the U-Net in the original pix2pix experiment
BATCH_SIZE = 1
# Each image is 256x256 in size
IMG_WIDTH = 256
IMG_HEIGHT = 256

In [None]:
def resize(input_image, real_image, height, width):
  input_image = tf.image.resize(input_image, [height, width],
                                method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)
  real_image = tf.image.resize(real_image, [height, width],
                               method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)

  return input_image, real_image

In [None]:
def random_crop(input_image, real_image):
  stacked_image = tf.stack([input_image, real_image], axis=0)
  cropped_image = tf.image.random_crop(
      stacked_image, size=[2, IMG_HEIGHT, IMG_WIDTH, 3])

  return cropped_image[0], cropped_image[1]

In [None]:
# Normalizing the images to [-1, 1]
def normalize(input_image, real_image):
  input_image = (input_image / 127.5) - 1
  real_image = (real_image / 127.5) - 1

  return input_image, real_image

In [None]:
@tf.function()
def random_jitter(input_image, real_image):
  # Resizing to 286x286
  input_image, real_image = resize(input_image, real_image, 286, 286)

  # Random cropping back to 256x256
  input_image, real_image = random_crop(input_image, real_image)

  if tf.random.uniform(()) > 0.5:
    # Random mirroring
    input_image = tf.image.flip_left_right(input_image)
    real_image = tf.image.flip_left_right(real_image)

  return input_image, real_image

You can inspect some of the preprocessed output:

In [None]:
plt.figure(figsize=(6, 6))
for i in range(4):
  rj_inp, rj_re = random_jitter(inp, re)
  plt.subplot(2, 2, i + 1)
  plt.imshow(rj_inp / 255.0)
  plt.axis('off')
plt.show()

Having checked that the loading and preprocessing works, let's define a couple of helper functions that load and preprocess the training and test sets:

In [None]:
def load_image_train(image_file):
  input_image, real_image = load(image_file)
  input_image, real_image = random_jitter(input_image, real_image)
  input_image, real_image = normalize(input_image, real_image)

  return input_image, real_image

In [None]:
def load_image_test(image_file):
  input_image, real_image = load(image_file)
  input_image, real_image = resize(input_image, real_image,
                                   IMG_HEIGHT, IMG_WIDTH)
  input_image, real_image = normalize(input_image, real_image)

  return input_image, real_image

### Build an input pipeline with `tf.data`

In [None]:
train_dataset = tf.data.Dataset.list_files(str(PATH / 'train/*.jpg'))
train_dataset = train_dataset.map(load_image_train,
                                  num_parallel_calls=tf.data.AUTOTUNE)
train_dataset = train_dataset.shuffle(BUFFER_SIZE)
train_dataset = train_dataset.batch(BATCH_SIZE)

In [None]:
try:
  test_dataset = tf.data.Dataset.list_files(str(PATH / 'test/*.jpg'))
except tf.errors.InvalidArgumentError:
  test_dataset = tf.data.Dataset.list_files(str(PATH / 'val/*.jpg'))
test_dataset = test_dataset.map(load_image_test)
test_dataset = test_dataset.batch(BATCH_SIZE)

### Build the generator

The generator of your pix2pix cGAN is a _modified_ [U-Net](https://arxiv.org/abs/1505.04597). A U-Net consists of an encoder (downsampler) and decoder (upsampler). (You can find out more about it in the [Image segmentation](../images/segmentation.ipynb) tutorial and on the [U-Net project website](https://lmb.informatik.uni-freiburg.de/people/ronneber/u-net/).)

- Each block in the encoder is: Convolution -> Batch normalization -> Leaky ReLU
- Each block in the decoder is: Transposed convolution -> Batch normalization -> Dropout (applied to the first 3 blocks) -> ReLU
- There are skip connections between the encoder and decoder (as in the U-Net).

Define the downsampler (encoder):

In [None]:
OUTPUT_CHANNELS = 3

In [None]:
def downsample(filters, size, apply_batchnorm=True):
  # Initialize the weights with a normal distribution
  initializer = tf.random_normal_initializer(0., 0.02)

  # Create a Sequential model
  result = tf.keras.Sequential()
  # Add a Conv2D layer with the given number of filters and kernel size
  result.add(
      tf.keras.layers.Conv2D(filters, size, strides=2, padding='same',
                             kernel_initializer=initializer, use_bias=False))

  # Optionally add BatchNormalization for faster and more stable training
  if apply_batchnorm:
    result.add(tf.keras.layers.BatchNormalization())

  # Add LeakyReLU activation for non-linearity
  result.add(tf.keras.layers.LeakyReLU())

  return result

In [None]:
down_model = downsample(3, 4)
down_result = down_model(tf.expand_dims(inp, 0))
print(down_result.shape)

Define the upsampler (decoder):

In [None]:
def upsample(filters, size, apply_dropout=False):
  # Initialize the weights with a normal distribution
  initializer = tf.random_normal_initializer(0., 0.02)

  # Create a Sequential model for the upsampling block
  result = tf.keras.Sequential()
  # Add a Conv2DTranspose layer to upsample the input
  result.add(
    tf.keras.layers.Conv2DTranspose(filters, size, strides=2,
                                    padding='same',
                                    kernel_initializer=initializer,
                                    use_bias=False))

  # Add BatchNormalization for faster and more stable training
  result.add(tf.keras.layers.BatchNormalization())

  # Optionally add Dropout for regularization (only in first 3 blocks of decoder)
  if apply_dropout:
      result.add(tf.keras.layers.Dropout(0.5))

  # Add ReLU activation for non-linearity
  result.add(tf.keras.layers.ReLU())

  return result

In [None]:
up_model = upsample(3, 4)
up_result = up_model(down_result)
print(up_result.shape)

Define the generator with the downsampler and the upsampler:

In [None]:
def Generator():
  # Define the input layer with shape (256, 256, 3)
  inputs = tf.keras.layers.Input(shape=[256, 256, 3])

  # Create the encoder (downsampling stack) using downsample blocks
  down_stack = [
    downsample(64, 4, apply_batchnorm=False),  # First block, no batchnorm
    downsample(128, 4),  # Second block
    downsample(256, 4),  # Third block
    downsample(512, 4),  # Fourth block
    downsample(512, 4),  # Fifth block
    downsample(512, 4),  # Sixth block
    downsample(512, 4),  # Seventh block
    downsample(512, 4),  # Eighth block
  ]

  # Create the decoder (upsampling stack) using upsample blocks
  up_stack = [
    upsample(512, 4, apply_dropout=True),  # First block, with dropout
    upsample(512, 4, apply_dropout=True),  # Second block, with dropout
    upsample(512, 4, apply_dropout=True),  # Third block, with dropout
    upsample(512, 4),  # Fourth block
    upsample(256, 4),  # Fifth block
    upsample(128, 4),  # Sixth block
    upsample(64, 4),   # Seventh block
  ]

  # Initialize the weights for the last layer
  initializer = tf.random_normal_initializer(0., 0.02)
  # Define the last layer to get the output image with tanh activation
  last = tf.keras.layers.Conv2DTranspose(
      OUTPUT_CHANNELS, 4,
      strides=2,
      padding='same',
      kernel_initializer=initializer,
      activation='tanh')  # Output shape: (batch_size, 256, 256, 3)

  x = inputs  # Start with the input

  # Downsampling through the encoder, saving skip connections
  skips = []
  for down in down_stack:
    x = down(x)      # Apply downsampling block
    skips.append(x)  # Save output for skip connection

  # Reverse all but the last skip for use in upsampling
  skips = reversed(skips[:-1])

  # Upsampling and adding skip connections from encoder
  for up, skip in zip(up_stack, skips):
    x = up(x)                              # Apply upsampling block
    x = tf.keras.layers.Concatenate()([x, skip])  # Add skip connection

  x = last(x)  # Apply the last layer to get the final output

  # Return the Keras Model
  return tf.keras.Model(inputs=inputs, outputs=x)
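
To see how the skip connections line up, the sketch below traces spatial size and channel count through the generator using plain Python. It only mirrors the filter lists from `Generator()` above; the variable names are illustrative, and no Keras layers are built.

```python
# Filter counts copied from the down_stack and up_stack in Generator().
down_filters = [64, 128, 256, 512, 512, 512, 512, 512]
up_filters = [512, 512, 512, 512, 256, 128, 64]

size = 256
skips = []
for filters in down_filters:
    size //= 2                     # each stride-2 convolution halves the size
    skips.append((size, filters))

bottleneck = skips[-1]             # the last encoder output is not used as a skip

trace = []
for filters, (skip_size, skip_filters) in zip(up_filters, reversed(skips[:-1])):
    size *= 2                      # each stride-2 transposed convolution doubles the size
    assert size == skip_size       # the skip must match spatially to be concatenated
    trace.append((size, filters + skip_filters))  # channels after Concatenate

size *= 2                          # the final Conv2DTranspose restores the input size
print(bottleneck, trace, size)
```

The bottleneck is 1 x 1 with 512 channels, and every decoder block's output is concatenated with the encoder output of the same spatial size, which is why the last encoder output is dropped from `skips` before reversing.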

Visualize the generator model architecture:

In [None]:
generator = Generator()
tf.keras.utils.plot_model(generator, show_shapes=True, dpi=64)

Test the generator:

In [None]:
gen_output = generator(inp[tf.newaxis, ...], training=False)
plt.imshow(gen_output[0, ...])

#### Define the generator loss

GANs learn a loss that adapts to the data, while cGANs learn a structured loss that penalizes structural differences between the network output and the target image, as described in the [pix2pix paper](https://arxiv.org/abs/1611.07004).

- The generator loss is a sigmoid cross-entropy loss of the generated images and an **array of ones**.
- The pix2pix paper also mentions the L1 loss, which is a MAE (mean absolute error) between the generated image and the target image.
- This allows the generated image to become structurally similar to the target image.
- The formula to calculate the total generator loss is `gan_loss + LAMBDA * l1_loss`, where `LAMBDA = 100`. This value was decided by the authors of the paper.
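
The formula in the list above can be worked through on a few numbers. This is a minimal pure-Python sketch, not the notebook's TensorFlow implementation: the logits and pixel values are made up for illustration, and `bce_with_logits` and `mean` are hypothetical helpers.

```python
import math

LAMBDA = 100  # weight of the L1 term, as stated in the list above

def bce_with_logits(label, logit):
    """Numerically stable sigmoid cross-entropy for a single logit."""
    return max(logit, 0.0) - logit * label + math.log1p(math.exp(-abs(logit)))

def mean(values):
    values = list(values)
    return sum(values) / len(values)

# Hypothetical discriminator logits for four patches of a generated image.
disc_generated_logits = [0.0, 2.0, -1.0, 0.5]
# Hypothetical pixel values in [-1, 1].
generated_pixels = [0.10, -0.40, 0.90]
target_pixels = [0.20, -0.50, 0.70]

# GAN term: cross-entropy against an array of ones ("real").
gan_loss = mean(bce_with_logits(1.0, z) for z in disc_generated_logits)
# L1 term: mean absolute error between target and generated pixels.
l1_loss = mean(abs(t - g) for t, g in zip(target_pixels, generated_pixels))
total_gen_loss = gan_loss + LAMBDA * l1_loss
print(gan_loss, l1_loss, total_gen_loss)
```

With `LAMBDA = 100`, even a small L1 error contributes a large share of the total, which is what pushes the output toward the target's structure.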

In [None]:
LAMBDA = 100

#### Why BinaryCrossentropy Loss is Needed in pix2pix

In pix2pix, a conditional GAN (cGAN) is used for image-to-image translation. The GAN framework consists of two neural networks: the **generator** and the **discriminator**. The generator tries to produce realistic images from input data, while the discriminator tries to distinguish between real images and those produced by the generator.

##### BinaryCrossentropy Loss

The **BinaryCrossentropy** loss is used because the discriminator's task is a binary classification: it must decide whether each image patch is real (from the dataset) or fake (generated). In pix2pix, the discriminator is a PatchGAN, which outputs a matrix where each element corresponds to a small patch of the input image. Each patch is classified as real or fake, so BinaryCrossentropy is applied to every patch.

##### Does the Discriminator Classify the Whole Image or Each Pixel?

The discriminator does **not** classify the whole image as real or fake, nor does it classify each pixel individually. Instead, it classifies overlapping patches (e.g., 70x70 pixels) within the image. The output is a grid (e.g., 30x30) of predictions, each representing the probability that a corresponding patch is real. This encourages the generator to produce realistic details at the patch level.
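
The 70x70 figure can be derived from the layer configuration. The sketch below assumes the configuration used by the `Discriminator` defined later in this notebook: three 4x4 convolutions with stride 2, followed by two 4x4 convolutions with stride 1. The variable names are illustrative.

```python
# (kernel_size, stride) for each convolution, in order.
conv_layers = [(4, 2), (4, 2), (4, 2), (4, 1), (4, 1)]

receptive_field = 1  # one output element initially "sees" one pixel
jump = 1             # distance in input pixels between adjacent elements

for kernel, stride in conv_layers:
    receptive_field += (kernel - 1) * jump
    jump *= stride

print(receptive_field, jump)
```

Each output element therefore depends on a 70 x 70 window of the input, and neighbouring elements are 8 input pixels apart, so the windows overlap heavily.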

##### Generator Loss

The generator's loss has two components:
- **GAN loss**: Encourages the generator to produce images that the discriminator classifies as real (for all patches). This is computed as BinaryCrossentropy between an array of ones (targeting "real") and the discriminator's output for generated images.
- **L1 loss**: Measures the mean absolute error between the generated image and the target image, encouraging structural similarity.

The total generator loss is `gan_loss + LAMBDA * l1_loss`, with `LAMBDA = 100` as defined above.


In [None]:
loss_object = tf.keras.losses.BinaryCrossentropy(from_logits=True)

In [None]:
def generator_loss(disc_generated_output, gen_output, target):
  # GAN loss: how well the generator fools the discriminator
  gan_loss = loss_object(tf.ones_like(disc_generated_output), disc_generated_output)

  # L1 loss: mean absolute error between generated image and target image
  l1_loss = tf.reduce_mean(tf.abs(target - gen_output))

  # Total generator loss: GAN loss + weighted L1 loss
  total_gen_loss = gan_loss + (LAMBDA * l1_loss)

  # Return all losses for logging and optimization
  return total_gen_loss, gan_loss, l1_loss

The training procedure for the generator is as follows:

![Generator Update Image](https://github.com/tensorflow/docs/blob/master/site/en/tutorials/generative/images/gen.png?raw=1)


### Build the discriminator

The discriminator in the pix2pix cGAN is a convolutional PatchGAN classifier—it tries to classify if each image _patch_ is real or not real, as described in the [pix2pix paper](https://arxiv.org/abs/1611.07004).

- Each block in the discriminator is: Convolution -> Batch normalization -> Leaky ReLU.
- The shape of the output after the last layer is `(batch_size, 30, 30, 1)`.
- Each element of the `30 x 30` output classifies a `70 x 70` patch of the input image.
- The discriminator receives 2 inputs:
    - The input image and the target image, which it should classify as real.
    - The input image and the generated image (the output of the generator), which it should classify as fake.
    - Use `tf.concat([inp, tar], axis=-1)` to concatenate these 2 inputs together.
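
The `(batch_size, 30, 30, 1)` output shape follows from simple size arithmetic. The sketch below assumes a 256 x 256 input and the layer sequence used in this notebook's `Discriminator`: three stride-2 `'same'` convolutions, then two rounds of `ZeroPadding2D` (default padding of 1 per side) followed by a 4x4 stride-1 convolution with the default `'valid'` padding.

```python
import math

size = 256
sizes = [size]

# Three downsample blocks: padding='same', stride 2.
for _ in range(3):
    size = math.ceil(size / 2)
    sizes.append(size)

# Two rounds of zero padding followed by a 4x4 'valid' convolution, stride 1.
for _ in range(2):
    size += 2          # ZeroPadding2D adds one pixel on each side
    sizes.append(size)
    size = size - 4 + 1
    sizes.append(size)

print(sizes)
```

The printed sequence matches the shape comments in the `Discriminator` code.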

In [None]:
def Discriminator():
  initializer = tf.random_normal_initializer(0., 0.02)

  # Input layers for the input image and the target image
  inp = tf.keras.layers.Input(shape=[256, 256, 3], name='input_image')
  tar = tf.keras.layers.Input(shape=[256, 256, 3], name='target_image')

  # Concatenate the input and target images along the channel axis
  # Shape: (batch_size, 256, 256, 6)
  x = tf.keras.layers.concatenate([inp, tar])

  # First downsampling block: Conv2D -> (optional) BatchNorm -> LeakyReLU
  # Reduces spatial size, increases channels
  down1 = downsample(64, 4, False)(x)  # (batch_size, 128, 128, 64)

  # Second downsampling block
  down2 = downsample(128, 4)(down1)    # (batch_size, 64, 64, 128)

  # Third downsampling block
  down3 = downsample(256, 4)(down2)    # (batch_size, 32, 32, 256)

  # Zero padding to increase spatial dimensions
  zero_pad1 = tf.keras.layers.ZeroPadding2D()(down3)  # (batch_size, 34, 34, 256)

  # Convolution to extract features; stride 1 with the default 'valid' padding shrinks 34 -> 31
  conv = tf.keras.layers.Conv2D(512, 4, strides=1,
                                kernel_initializer=initializer,
                                use_bias=False)(zero_pad1)  # (batch_size, 31, 31, 512)

  # Batch normalization for stable training
  batchnorm1 = tf.keras.layers.BatchNormalization()(conv)

  # LeakyReLU activation for non-linearity
  # LeakyReLU is used instead of ReLU to avoid the "dying ReLU" problem,
  # where neurons can become inactive and only output zero. LeakyReLU allows
  # a small, non-zero gradient when the unit is not active, which helps gradients
  # flow through the network and improves training stability for GANs.
  leaky_relu = tf.keras.layers.LeakyReLU()(batchnorm1)

  # Zero padding before the last layer
  zero_pad2 = tf.keras.layers.ZeroPadding2D()(leaky_relu)  # (batch_size, 33, 33, 512)

  # Final convolution: outputs a single-channel prediction map
  # Each value represents real/fake for a patch
  last = tf.keras.layers.Conv2D(1, 4, strides=1,
                                kernel_initializer=initializer)(zero_pad2)  # (batch_size, 30, 30, 1)

  # Return the Keras Model
  return tf.keras.Model(inputs=[inp, tar], outputs=last)

Visualize the discriminator model architecture:

In [None]:
discriminator = Discriminator()
tf.keras.utils.plot_model(discriminator, show_shapes=True, dpi=64)

Test the discriminator:

In [None]:
disc_out = discriminator([inp[tf.newaxis, ...], gen_output], training=False)
plt.imshow(disc_out[0, ..., -1], vmin=-20, vmax=20, cmap='RdBu_r')
plt.colorbar()

#### Define the discriminator loss

- The `discriminator_loss` function takes 2 inputs: the discriminator's output for **real images** and its output for **generated images**.
- `real_loss` is a sigmoid cross-entropy loss of the **real images** and an **array of ones (since these are the real images)**.
- `generated_loss` is a sigmoid cross-entropy loss of the **generated images** and an **array of zeros (since these are the fake images)**.
- The `total_loss` is the sum of `real_loss` and `generated_loss`.
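
As a rough numeric illustration of the list above, the sketch below computes the discriminator loss in plain Python for made-up per-patch logits. The helper functions and the logit values are hypothetical and stand in for `loss_object` and the discriminator outputs.

```python
import math

def bce_with_logits(label, logit):
    """Numerically stable sigmoid cross-entropy for a single logit."""
    return max(logit, 0.0) - logit * label + math.log1p(math.exp(-abs(logit)))

def mean_bce(label, logits):
    return sum(bce_with_logits(label, z) for z in logits) / len(logits)

# Hypothetical logits from a discriminator that separates the two sets well.
disc_real_logits = [3.0, 2.0, 4.0]          # confidently "real"
disc_generated_logits = [-3.0, -2.0, -4.0]  # confidently "fake"

real_loss = mean_bce(1.0, disc_real_logits)            # compared with ones
generated_loss = mean_bce(0.0, disc_generated_logits)  # compared with zeros
total_disc_loss = real_loss + generated_loss

# The same logits with the roles swapped: a discriminator that is fooled.
fooled_loss = mean_bce(1.0, disc_generated_logits) + mean_bce(0.0, disc_real_logits)
print(total_disc_loss, fooled_loss)
```

A discriminator that is confidently right gets a small total loss, while the same confidence in the wrong direction is penalized heavily.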

In [None]:
def discriminator_loss(disc_real_output, disc_generated_output):
  # Calculate loss for real images (should be classified as real/ones)
  real_loss = loss_object(tf.ones_like(disc_real_output), disc_real_output)

  # Calculate loss for generated images (should be classified as fake/zeros)
  generated_loss = loss_object(tf.zeros_like(disc_generated_output), disc_generated_output)

  # Total discriminator loss is the sum of real and generated losses
  total_disc_loss = real_loss + generated_loss

  return total_disc_loss

The training procedure for the discriminator is shown below.

To learn more about the architecture and the hyperparameters you can refer to the [pix2pix paper](https://arxiv.org/abs/1611.07004).

![Discriminator Update Image](https://github.com/tensorflow/docs/blob/master/site/en/tutorials/generative/images/dis.png?raw=1)


### Define the optimizers and a checkpoint-saver


In [None]:
generator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
discriminator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)

In [None]:
# Directory where checkpoints will be saved during training
checkpoint_dir = './training_checkpoints'

# Prefix for checkpoint files
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")

# Create a TensorFlow checkpoint object to manage saving and restoring models and optimizers
checkpoint = tf.train.Checkpoint(
    generator_optimizer=generator_optimizer,      # Save generator optimizer state
    discriminator_optimizer=discriminator_optimizer,  # Save discriminator optimizer state
    generator=generator,                         # Save generator model weights
    discriminator=discriminator                  # Save discriminator model weights
)

### Generate images

Write a function to plot some images during training.

- Pass images from the test set to the generator.
- The generator will then translate the input image into the output.
- The last step is to plot the predictions and _voila_!

Note: The `training=True` is intentional here since you want the batch statistics, while running the model on the test dataset. If you use `training=False`, you get the accumulated statistics learned from the training dataset (which you don't want).
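
The difference between the two modes can be illustrated with a simplified batch-normalization calculation. This sketch ignores the learned scale and offset and treats one channel as a flat list of numbers; the activations and moving statistics are made up, and `normalize` is a hypothetical helper. The `epsilon` value matches the Keras `BatchNormalization` default.

```python
def normalize(values, mean, variance, epsilon=1e-3):
    """Normalize values with the given statistics (no learned scale/offset)."""
    return [(v - mean) / (variance + epsilon) ** 0.5 for v in values]

# Hypothetical activations of one channel for the current test batch.
activations = [2.0, 4.0, 6.0]

# training=True: statistics come from the current batch.
batch_mean = sum(activations) / len(activations)
batch_var = sum((v - batch_mean) ** 2 for v in activations) / len(activations)
with_batch_stats = normalize(activations, batch_mean, batch_var)

# training=False: statistics are moving averages accumulated during training
# (hypothetical values here).
moving_mean, moving_var = 1.0, 4.0
with_moving_stats = normalize(activations, moving_mean, moving_var)

print(with_batch_stats)
print(with_moving_stats)
```

With batch statistics the normalized values are centred on zero for the batch at hand; with moving statistics they are only centred if the test batch resembles the training data.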

In [None]:
def generate_images(model, test_input, tar):
  prediction = model(test_input, training=True)
  plt.figure(figsize=(15, 15))

  display_list = [test_input[0], tar[0], prediction[0]]
  title = ['Input Image', 'Ground Truth', 'Predicted Image']

  for i in range(3):
    plt.subplot(1, 3, i+1)
    plt.title(title[i])
    # Getting the pixel values in the [0, 1] range to plot.
    plt.imshow(display_list[i] * 0.5 + 0.5)
    plt.axis('off')
  plt.show()

Test the function:

In [None]:
for example_input, example_target in test_dataset.take(1):
  generate_images(generator, example_input, example_target)

### Training

- For each example input, the generator produces an output.
- The discriminator receives the `input_image` and the generated image as the first input. The second input is the `input_image` and the `target_image`.
- Next, calculate the generator and the discriminator loss.
- Then, calculate the gradients of each loss with respect to the generator's and the discriminator's trainable variables, and apply them with the corresponding optimizer.
- Finally, log the losses to TensorBoard.

In [None]:
# Set the directory for TensorBoard logs
log_dir = "logs/"

# Create a summary writer for TensorBoard.
# The logs will be saved in a subdirectory named with the current date and time.
# This allows you to visualize training metrics in TensorBoard.
summary_writer = tf.summary.create_file_writer(
  log_dir + "fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))

In [None]:
@tf.function
def train_step(input_image, target, step):
  # Record operations for automatic differentiation for generator and discriminator
  with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
    # Generate an output image from the input using the generator (forward pass)
    gen_output = generator(input_image, training=True)

    # Get discriminator's output for real image pairs (input and ground truth)
    disc_real_output = discriminator([input_image, target], training=True)
    # Get discriminator's output for fake image pairs (input and generated output)
    disc_generated_output = discriminator([input_image, gen_output], training=True)

    # Compute generator losses: total loss, GAN loss, and L1 loss
    gen_total_loss, gen_gan_loss, gen_l1_loss = generator_loss(
        disc_generated_output, gen_output, target)
    # Compute discriminator loss (real vs. fake)
    disc_loss = discriminator_loss(disc_real_output, disc_generated_output)

  # Calculate gradients of generator loss w.r.t. generator's trainable variables
  generator_gradients = gen_tape.gradient(gen_total_loss,
                                          generator.trainable_variables)
  # Calculate gradients of discriminator loss w.r.t. discriminator's trainable variables
  discriminator_gradients = disc_tape.gradient(disc_loss,
                                               discriminator.trainable_variables)

  # Apply gradients to update generator weights
  generator_optimizer.apply_gradients(zip(generator_gradients,
                                          generator.trainable_variables))
  # Apply gradients to update discriminator weights
  discriminator_optimizer.apply_gradients(zip(discriminator_gradients,
                                              discriminator.trainable_variables))

  # Write loss values to TensorBoard for visualization
  with summary_writer.as_default():
    tf.summary.scalar('gen_total_loss', gen_total_loss, step=step//1000)
    tf.summary.scalar('gen_gan_loss', gen_gan_loss, step=step//1000)
    tf.summary.scalar('gen_l1_loss', gen_l1_loss, step=step//1000)
    tf.summary.scalar('disc_loss', disc_loss, step=step//1000)

The actual training loop. Since this tutorial can run on more than one dataset, and the datasets vary greatly in size, the training loop is set up to work in steps instead of epochs.

- Iterates over the number of steps.
- Every 10 steps print a dot (`.`).
- Every 1k steps: clear the display and run `generate_images` to show the progress.
- Every 5k steps: save a checkpoint.
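
The schedule in the list above can be checked without running any training. The sketch below assumes `steps = 10000`, as in the `fit` call used in this notebook, and applies the same modulo conditions as the training loop.

```python
steps = 10000  # assumption: matches the fit(..., steps=10000) call

# Steps at which the display is cleared and sample images are generated.
preview_steps = [s for s in range(steps) if s % 1000 == 0]
# Number of progress dots printed.
dot_count = sum(1 for s in range(steps) if (s + 1) % 10 == 0)
# Number of completed steps at which a checkpoint is saved.
checkpoint_steps = [s + 1 for s in range(steps) if (s + 1) % 5000 == 0]

print(len(preview_steps), dot_count, checkpoint_steps)
```

Note that previews are triggered before the training step (starting at step 0), while dots and checkpoints are triggered after it, which is why those conditions use `step + 1`.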

In [None]:
def fit(train_ds, test_ds, steps):
  # Get one example input and target from the test dataset for visualization
  example_input, example_target = next(iter(test_ds.take(1)))
  # Record the start time for timing training steps
  start = time.time()

  # Iterate over the training dataset for the specified number of steps
  for step, (input_image, target) in train_ds.repeat().take(steps).enumerate():
    # Every 1000 steps, clear the output and display progress
    if (step) % 1000 == 0:
      # Clear previous output in the notebook for a cleaner display
      display.clear_output(wait=True)

      # If not the first step, print the time taken for the last 1000 steps
      if step != 0:
        print(f'Time taken for 1000 steps: {time.time()-start:.2f} sec\n')

      # Reset the timer for the next 1000 steps
      start = time.time()

      # Generate and display images using the generator for visual progress
      generate_images(generator, example_input, example_target)
      # Print the current step in thousands (k)
      print(f"Step: {step//1000}k")

    # Perform one training step (update generator and discriminator)
    train_step(input_image, target, step)

    # Print a dot every 10 steps to indicate progress
    if (step+1) % 10 == 0:
      print('.', end='', flush=True)

    # Save a checkpoint every 5000 steps to preserve model state
    if (step + 1) % 5000 == 0:
      checkpoint.save(file_prefix=checkpoint_prefix)

This training loop saves logs that you can view in TensorBoard to monitor the training progress.

If you work on a local machine, you would launch a separate TensorBoard process. When working in a notebook, launch the viewer before starting the training to monitor with TensorBoard.

Launch the TensorBoard viewer:

In [None]:
%load_ext tensorboard
%tensorboard --logdir {log_dir}

In [None]:
fit(train_dataset, test_dataset, steps=10000)

Interpreting the logs is more subtle when training a GAN (or a cGAN like pix2pix) compared to a simple classification or regression model. Things to look for:

- Check that neither the generator nor the discriminator model has "won". If either the `gen_gan_loss` or the `disc_loss` gets very low, it's an indicator that this model is dominating the other, and you are not successfully training the combined model.
- The value `log(2) = 0.69` is a good reference point for these losses, as it indicates a perplexity of 2 - the discriminator is, on average, equally uncertain about the two options.
- For the `disc_loss`, a value below `0.69` means the discriminator is doing better than random on the combined set of real and generated images.
- For the `gen_gan_loss`, a value below `0.69` means the generator is doing better than random at fooling the discriminator.
- As training progresses, the `gen_l1_loss` should go down.
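
The `log(2)` reference point can be reproduced directly. The sketch below assumes a discriminator that outputs a probability of 0.5 for every patch. It also shows one consequence of how `discriminator_loss` is defined in this notebook: because it sums the real and generated terms, a fully undecided discriminator gives `2 * log(2)` for `disc_loss`, so the 0.69 reference applies to each term separately.

```python
import math

p_real = 0.5  # an undecided discriminator: sigmoid(logit) = 0.5 for every patch

# Cross-entropy against ones, as used for gen_gan_loss and real_loss.
gen_gan_reference = -math.log(p_real)
# Sum of the "real" term (target 1) and the "generated" term (target 0).
disc_reference = -math.log(p_real) + -math.log(1 - p_real)
# Perplexity is the exponential of the cross-entropy.
perplexity = math.exp(gen_gan_reference)

print(gen_gan_reference, disc_reference, perplexity)
```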

## Restore the latest checkpoint and test the network

In [None]:
!ls {checkpoint_dir}

In [None]:
# Restoring the latest checkpoint in checkpoint_dir
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

## Generate some images using the test set

In [None]:
# Run the trained model on a few examples from the test set
for inp, tar in test_dataset.take(5):
  generate_images(generator, inp, tar)