<table align="left">
  <td>
    <a href="https://colab.research.google.com/github/ufidon/ml/blob/main/mod6/gen.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>
  </td>
  <td>
    <a target="_blank" href="https://kaggle.com/kernels/welcome?src=https://github.com/ufidon/ml/blob/main/mod6/gen.ipynb"><img src="https://kaggle.com/static/images/open-in-kaggle.svg" /></a>
  </td>
</table>
<br>

Autoencoders, GANs, and Diffusion Models
---
_homl3 ch17_

- `Autoencoders` are ANNs capable of learning `dense representations with a much lower dimensionality` of the input data, called `latent representations or codings`, without any supervision
  - useful for visualization, feature detection, and unsupervised pretraining of DNNs
  - some autoencoders are generative models capable of `randomly generating new data` that looks very similar to the training data
- `Generative adversarial networks (GANs)` are also ANNs capable of generating data
  - widely used for super resolution, colorization, and powerful image editing
  - turning simple sketches into photorealistic images
  - predicting the next frames in a video
  - augmenting a dataset and generating other types of data
- `Diffusion models` can generate more diverse and higher-quality images than GANs
  - they are also much easier to train
  - however, they are much slower to run

In [None]:
# Colab: Go to Runtime > Change runtime type and select a GPU hardware accelerator
# Kaggle: Go to Settings > Accelerator and select GPU
# ⚠️ It may take more than one day to run the whole notebook without GPU
import sys, os, math, copy
from pathlib import Path



if "google.colab" in sys.modules:
    %pip install -q -U transformers
    %pip install -q -U datasets
else:
    os.environ["TF_USE_LEGACY_KERAS"] = "1"

from functools import partial
import numpy as np, pandas as pd, matplotlib.pyplot as plt, matplotlib as mpl
import sklearn as skl, sklearn.datasets as skds
import tensorflow as tf, tensorflow_datasets as tfds

💡 Demo
---
- [This person does not exist](https://thispersondoesnotexist.com/)
  - refresh the webpage to see a newly generated face each time

- Autoencoders, GANs, and diffusion models are all `unsupervised`, and all learn `latent representations`
  - they share many applications, for example as `generative` models
- But they work very differently
  - Autoencoders learn to copy their inputs to their outputs under constraints, which forces them to learn efficient representations of the data instead of the trivial identity function
  - GANs are composed of two neural networks: a `generator` that tries to generate data that looks similar to the training data, and a `discriminator` that tries to tell real data from fake data
  - A `denoising diffusion probabilistic model (DDPM)` is trained to remove a tiny bit of noise from an image
    - starting from an image full of Gaussian noise, run the diffusion model on it repeatedly
      - a high-quality image will gradually emerge

# Autoencoder
- (p1) always composed of two parts:
  - an `encoder (or recognition network)` that converts the inputs to a latent representation
  - a `decoder (or generative network)` that converts the internal representation to the outputs
- typically has the same architecture as a multilayer perceptron (MLP)
  - except that the number of neurons in the output layer must be equal to the number of inputs
- The outputs are often called the `reconstructions` 
  - because the autoencoder tries to reconstruct the inputs
- The cost function contains a `reconstruction loss` that penalizes the model when the reconstructions are different from the inputs
- The autoencoder is said to be `undercomplete` if 
  - its internal representation has a lower dimensionality than the input data
  - so it is forced to learn the `most important features` in the input data
    - and drop the unimportant ones
- If the coding layer is equal to or larger than the inputs
  - the autoencoder is called `overcomplete`


🍎 Example 1
---
Given the two number sequences below, which one is easier to remember?
- 40, 27, 25, 36, 81, 57, 10, 73, 19, 68
  - all 10 random 2-digit numbers must be memorized
- 50, 48, 46, 44, 42, 40, 38, 36, 34, 32, 30, 28, 26, 24, 22, 20, 18, 16, 14
  - `even numbers from 50 to 14`
  - once the pattern is recognized, only the two endpoints, 50 and 14, need to be remembered
  - the pattern plus its endpoints is a condensed and effective `latent representation`
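The idea behind Example 1 can be sketched in a few lines of Python. This is only an illustration: `encode` and `decode` are hypothetical helper names, and the sketch stores the step (-2) next to the two endpoints, which the example treats as part of the recognized pattern.

```python
# Illustrative sketch: compress a patterned sequence into a tiny "coding".
# encode/decode are hypothetical helper names, not part of any library.

def encode(sequence):
    """Return (first, last, step) if the sequence has a constant step."""
    step = sequence[1] - sequence[0]
    if any(b - a != step for a, b in zip(sequence, sequence[1:])):
        raise ValueError("no simple pattern: every number must be stored")
    return sequence[0], sequence[-1], step

def decode(coding):
    """Rebuild the full sequence from its (first, last, step) coding."""
    first, last, step = coding
    return list(range(first, last + step, step))

patterned = list(range(50, 12, -2))  # 50, 48, ..., 14 (19 numbers)
coding = encode(patterned)           # 3 numbers instead of 19
reconstruction = decode(coding)
print(coding, reconstruction == patterned)
```

The random sequence has no such pattern, so `encode` would raise an error for it: there is nothing shorter to store than the numbers themselves.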

In [None]:
# 1. Performing PCA with an Undercomplete Linear Autoencoder
# 1) An autoencoder ends up performing principal component analysis (PCA) if 
#   it uses only linear activations 
#   and the cost function is the mean squared error (MSE)

import tensorflow as tf

encoder = tf.keras.Sequential([tf.keras.layers.Dense(2)])
decoder = tf.keras.Sequential([tf.keras.layers.Dense(3)])
autoencoder = tf.keras.Sequential([encoder, decoder])

optimizer = tf.keras.optimizers.SGD(learning_rate=0.5)
autoencoder.compile(loss="mse", optimizer=optimizer)

In [None]:
# 2) (p2) builds the same 3D dataset as in Chapter 8 

from scipy.spatial.transform import Rotation

m = 60
X = np.zeros((m, 3))  # initialize 3D dataset

angles = (np.random.rand(m) ** 3 + 0.5) * 2 * np.pi  # uneven distribution
X[:, 0], X[:, 1] = np.cos(angles), np.sin(angles) * 0.5  # oval
X += 0.28 * np.random.randn(m, 3)  # add more noise
X = Rotation.from_rotvec([np.pi / 29, -np.pi / 20, np.pi / 4]).apply(X)
X_train = X + [0.2, 0, 0.2]  # shift a bit

In [None]:
# 3) train the model, then encode the training set
# X_train is used as both the inputs and the targets

history = autoencoder.fit(X_train, X_train, epochs=500, verbose=False)
codings = encoder.predict(X_train)

In [None]:
# 4) (p2) visualize the latent space
fig1, ax1 = plt.subplots(figsize=(4,3))
ax1.plot(codings[:,0], codings[:, 1], "b.")
ax1.set_xlabel("$z_1$", fontsize=18)
ax1.set_ylabel("$z_2$", fontsize=18, rotation=0)
ax1.grid(True)
ax1.set_title("2D codings of the 3D training set");

- (p2) shows the original 3D dataset and the output of the autoencoder’s hidden layer
  - i.e., the coding layer
- the autoencoder found the `best 2D plane` to project the data onto
  - `preserving as much variance` in the data as it could (just like PCA)
- this is considered a form of `self-supervised` learning
  - since it relies on a supervised learning technique with automatically generated labels
    - in this case, the labels are simply equal to the inputs
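For comparison, the projection that PCA itself would find can be computed directly with NumPy. The sketch below is not the notebook's dataset: it uses a hypothetical toy dataset lying close to a plane in 3D, and all variable names are assumptions made for illustration.

```python
import numpy as np

rng = np.random.default_rng(42)

# Hypothetical toy data: 200 points close to a 2D plane embedded in 3D
latent = rng.normal(size=(200, 2)) * [3.0, 1.0]
plane = np.array([[1.0, 0.0, 0.5],
                  [0.0, 1.0, -0.5]])
X = latent @ plane + 0.1 * rng.normal(size=(200, 3))

# PCA via SVD of the centered data
X_centered = X - X.mean(axis=0)
U, s, Vt = np.linalg.svd(X_centered, full_matrices=False)
W2 = Vt[:2].T                      # 3x2 matrix: the two principal directions

codings = X_centered @ W2          # "encoder": project onto the best 2D plane
reconstructions = codings @ W2.T   # "decoder": map the codings back to 3D

explained = s ** 2 / np.sum(s ** 2)
mse = np.mean((X_centered - reconstructions) ** 2)
print("explained variance ratio:", explained.round(4))
print("reconstruction MSE:", mse)
```

The reconstruction error comes entirely from the discarded third component, which is why minimizing the MSE with linear layers leads to the same plane as PCA (the codings themselves may differ by a rotation or scaling within that plane).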

# Stacked Autoencoders
- have `multiple hidden` layers
  - also called `deep autoencoders`
  - more layers help the autoencoder learn more complex codings
    - ⚠️ but an overly powerful autoencoder may overfit: it can reconstruct the training data perfectly without learning any useful patterns
- (p3) typically symmetrical with regard to the central hidden layer (the coding layer)

In [None]:
# 1. Build a Stacked Autoencoder Using Keras
#  
# 1) loads, scales, and splits the fashion MNIST dataset
fashion_mnist = tf.keras.datasets.fashion_mnist.load_data()
(X_train_full, y_train_full), (X_test, y_test) = fashion_mnist
X_train_full = X_train_full.astype(np.float32) / 255
X_test = X_test.astype(np.float32) / 255
X_train, X_valid = X_train_full[:-5000], X_train_full[-5000:]
y_train, y_valid = y_train_full[:-5000], y_train_full[-5000:]

In [None]:
# 2) build a stacked autoencoder
# with 3 hidden layers and 1 output layer (i.e., 2 stacked autoencoders),
# much like a regular deep MLP
# a) The encoder takes 28 × 28-pixel grayscale images as inputs
stacked_encoder = tf.keras.Sequential([
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(100, activation="relu"),
    tf.keras.layers.Dense(30, activation="relu"),
])

# b) The decoder takes codings of size 30 (output by the encoder) as inputs
# and reshapes the final vectors into 28 × 28 arrays,
# so the decoder’s outputs have the same shape as the encoder’s inputs

stacked_decoder = tf.keras.Sequential([
    tf.keras.layers.Dense(100, activation="relu"),
    tf.keras.layers.Dense(28 * 28),
    tf.keras.layers.Reshape([28, 28])
])
stacked_ae = tf.keras.Sequential([stacked_encoder, stacked_decoder])

stacked_ae.compile(loss="mse", optimizer="nadam")

In [None]:
# 3) train the stacked Autoencoder                   
history = stacked_ae.fit(X_train, X_train, epochs=20,
                         validation_data=(X_valid, X_valid))

In [None]:
# 4) Visualizing the Reconstructions
# The reconstructions are recognizable but a bit too lossy;
# remember, though, that the images were compressed down
#   to just 30 numbers instead of 784
def plot_reconstructions(model, images=X_valid, n_images=5,
                         title="Original images (top) and their reconstructions (bottom)"):
    reconstructions = np.clip(model.predict(images[:n_images]), 0, 1)
    fig = plt.figure(figsize=(n_images * 1.5, 3))
    fig.suptitle(title)  # figure-level title (plt.title would add an extra Axes)
    for image_index in range(n_images):
        plt.subplot(2, n_images, 1 + image_index)
        plt.imshow(images[image_index], cmap="binary")
        plt.axis("off")
        plt.subplot(2, n_images, 1 + n_images + image_index)
        plt.imshow(reconstructions[image_index], cmap="binary")
        plt.axis("off")

plot_reconstructions(stacked_ae);

In [None]:
# 5) Visualizing the Fashion MNIST Dataset
# compared to other dimensionality reduction algorithms
#   autoencoders can handle large datasets 
#    with many instances and many features
# A strategy of visualization
# a) use an autoencoder to reduce the dimensionality down to a reasonable level
# b) then use another dimensionality reduction algorithm for visualization

from sklearn.manifold import TSNE

# a) use the stacked autoencoder to reduce the dimensionality down to 30
X_valid_compressed = stacked_encoder.predict(X_valid)
# b) then use t-SNE to reduce the dimensionality down to 2 for visualization
tsne = TSNE(init="pca", learning_rate="auto")
X_valid_2D = tsne.fit_transform(X_valid_compressed)

In [None]:
# c) visualize
plt.scatter(X_valid_2D[:, 0], X_valid_2D[:, 1], c=y_valid, s=10, cmap="tab10");

In [None]:
# d) better visualization
plt.figure(figsize=(10, 8))
cmap = plt.cm.tab10
Z = X_valid_2D
Z = (Z - Z.min()) / (Z.max() - Z.min())  # normalize to the 0-1 range
plt.scatter(Z[:, 0], Z[:, 1], c=y_valid, s=10, cmap=cmap)
image_positions = np.array([[1., 1.]])
for index, position in enumerate(Z):
    dist = ((position - image_positions) ** 2).sum(axis=1)
    if dist.min() > 0.02: # if far enough from other images
        image_positions = np.r_[image_positions, [position]]
        imagebox = mpl.offsetbox.AnnotationBbox(
            mpl.offsetbox.OffsetImage(X_valid[index], cmap="binary"),
            position, bboxprops={"edgecolor": cmap(y_valid[index]), "lw": 2})
        plt.gca().add_artist(imagebox)

plt.axis("off");

Unsupervised Pretraining Using Stacked Autoencoders
---
- (p4) first train a stacked autoencoder using all the data, labeled and unlabeled
  - useful when the dataset is large but most of it is `unlabeled`
- then `reuse the lower layers` to create a neural network for the actual task
  - and train it using the labeled data
  - if labeled data is scarce, freeze the pretrained layers (at least the lower ones)

Two techniques for training stacked autoencoders
---
- Tying weights
  - Tying the weights of the decoder layers to the weights of the encoder layers
    - halves the number of weights in the model
    - speeds up training and limits the risk of overfitting
- (p5) Training one autoencoder at a time
  - train one shallow autoencoder at a time
    - encode the whole training set using this autoencoder to get a new (compressed) training set
    - then train the next autoencoder on this new dataset and so on
  - finally stack all these autoencoders into a single stacked autoencoder
    - first stack the hidden layers of each autoencoder
    - then the output layers in reverse order
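To make the "halves the number of weights" claim concrete, the parameters can be counted by hand. The sketch below assumes a 784-100-30-100-784 dense autoencoder (the layer sizes used for Fashion MNIST in this notebook) and assumes that tied decoder layers keep their own bias vectors.

```python
# Parameter bookkeeping for a dense autoencoder, with and without tied weights.
# Assumed architecture: 784 -> 100 -> 30 -> 100 -> 784.
layer_sizes = [784, 100, 30]
encoder_shapes = list(zip(layer_sizes[:-1], layer_sizes[1:]))  # (n_in, n_out)

encoder_kernels = sum(n_in * n_out for n_in, n_out in encoder_shapes)
encoder_biases = sum(n_out for _, n_out in encoder_shapes)

# The decoder mirrors the encoder: each kernel has the transposed shape,
# and each decoder layer outputs as many values as the encoder layer took in.
decoder_kernels = encoder_kernels
decoder_biases = sum(n_in for n_in, _ in encoder_shapes)

untied_total = encoder_kernels + encoder_biases + decoder_kernels + decoder_biases
tied_total = encoder_kernels + encoder_biases + decoder_biases  # kernels shared

print("kernel weights, untied:", encoder_kernels + decoder_kernels)
print("kernel weights, tied:  ", encoder_kernels)
print("all parameters, untied:", untied_total)
print("all parameters, tied:  ", tied_total)
```

The kernel weights are halved exactly; the total parameter count is only roughly halved because the biases are not shared.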

In [None]:
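# 1. Tying weights
# 1) define a custom layer that acts like a Dense layer but reuses the
#    transposed kernel of another Dense layer; only its bias vector is new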
class DenseTranspose(tf.keras.layers.Layer):
    def __init__(self, dense, activation=None, **kwargs):
        super().__init__(**kwargs)
        self.dense = dense
        self.activation = tf.keras.activations.get(activation)

    def build(self, batch_input_shape):
        self.biases = self.add_weight(name="bias",
                                      shape=self.dense.input_shape[-1],
                                      initializer="zeros")
        super().build(batch_input_shape)

    def call(self, inputs):
        Z = tf.matmul(inputs, self.dense.weights[0], transpose_b=True)
        return self.activation(Z + self.biases)


dense_1 = tf.keras.layers.Dense(100, activation="relu")
dense_2 = tf.keras.layers.Dense(30, activation="relu")

tied_encoder = tf.keras.Sequential([
    tf.keras.layers.Flatten(),
    dense_1,
    dense_2
])

tied_decoder = tf.keras.Sequential([
    DenseTranspose(dense_2, activation="relu"),
    DenseTranspose(dense_1),
    tf.keras.layers.Reshape([28, 28])
])

tied_ae = tf.keras.Sequential([tied_encoder, tied_decoder])

# compiles the model
tied_ae.compile(loss="mse", optimizer="nadam")

In [None]:
# 2) train the model
history = tied_ae.fit(X_train, X_train, epochs=10,
                      validation_data=(X_valid, X_valid)) 

In [None]:
# 3) show the reconstructions
plot_reconstructions(tied_ae)     

In [None]:
# 2. Training one autoencoder at a time
# 1) train autoencoder
def train_autoencoder(n_neurons, X_train, X_valid, n_epochs=10,
                      output_activation=None):
    n_inputs = X_train.shape[-1]
    encoder = tf.keras.layers.Dense(n_neurons, activation="relu")
    decoder = tf.keras.layers.Dense(n_inputs, activation=output_activation)
    autoencoder = tf.keras.Sequential([encoder, decoder])
    autoencoder.compile(loss="mse", optimizer="nadam")
    autoencoder.fit(X_train, X_train, epochs=n_epochs,
                    validation_data=(X_valid, X_valid))
    return encoder, decoder, encoder(X_train), encoder(X_valid)

In [None]:
# 2) train 2 autoencoders
X_train_flat = tf.keras.layers.Flatten()(X_train)
X_valid_flat = tf.keras.layers.Flatten()(X_valid)
enc1, dec1, X_train_enc1, X_valid_enc1 = train_autoencoder(
    100, X_train_flat, X_valid_flat)
enc2, dec2, _, _ = train_autoencoder(
    30, X_train_enc1, X_valid_enc1, output_activation="relu")

In [None]:
# 3) stack the two autoencoders
stacked_ae_1_by_1 = tf.keras.Sequential([
    tf.keras.layers.Flatten(),
    enc1, enc2, dec2, dec1,
    tf.keras.layers.Reshape([28, 28])
])

In [None]:
# 4) show the reconstructions of the stacked autoencoder
plot_reconstructions(stacked_ae_1_by_1)

# Convolutional Autoencoders
- the previous autoencoders are built with dense layers
  - these only work well with very small images
- for larger images, it is natural to build [autoencoders with CNNs](https://homl.info/convae) (convolutional autoencoders)
  - The encoder is a regular CNN composed of convolutional layers and pooling layers
    - reduces the spatial dimensionality of the inputs (i.e., height and width)
    - increases the depth (i.e., the number of feature maps)
  - The decoder must do the reverse
    - upscale the image and reduce its depth back to the original dimensions
      - with transpose convolutional layers
      - or by combining upsampling layers with convolutional layers
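The spatial sizes can be tracked by hand before building such a model. The sketch below assumes a 28 × 28 input (as in Fashion MNIST), three 2 × 2 pooling layers in the encoder, and three stride-2 transpose convolutions with 3 × 3 kernels in the decoder, the first one with `"valid"` padding. The helper names are hypothetical, and the size formulas are the ones Keras is understood to use for these layers.

```python
# Shape bookkeeping for a convolutional autoencoder (illustrative sketch).

def pooled_size(size, pool=2):
    """Output size of a pooling layer whose stride equals its pool size."""
    return size // pool

def transpose_conv_size(size, kernel, stride, padding):
    """Output size of a transpose convolution along one spatial dimension."""
    if padding == "same":
        return size * stride
    if padding == "valid":
        return size * stride + max(kernel - stride, 0)
    raise ValueError(f"unknown padding: {padding}")

size = 28
encoder_sizes = [size]
for _ in range(3):                      # three pooling layers
    size = pooled_size(size)
    encoder_sizes.append(size)

decoder_sizes = [size]
for padding in ("valid", "same", "same"):
    size = transpose_conv_size(size, kernel=3, stride=2, padding=padding)
    decoder_sizes.append(size)

print("encoder:", encoder_sizes)
print("decoder:", decoder_sizes)
```

Because 7 is odd, pooling it gives 3 rather than 3.5, so simply doubling three times would yield 24 instead of 28. Using `"valid"` padding in the first transpose convolution is one way to get from 3 back to 7.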

In [None]:
# 1. build a convolutional autoencoder for Fashion MNIST
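# 1) the encoder stacks convolutional and pooling layers down to 30 codings;
#    the decoder upscales back to 28 × 28 with transpose convolutional layers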

conv_encoder = tf.keras.Sequential([
    tf.keras.layers.Reshape([28, 28, 1]),
    tf.keras.layers.Conv2D(16, 3, padding="same", activation="relu"),
    tf.keras.layers.MaxPool2D(pool_size=2),  # output: 14 × 14 x 16
    tf.keras.layers.Conv2D(32, 3, padding="same", activation="relu"),
    tf.keras.layers.MaxPool2D(pool_size=2),  # output: 7 × 7 x 32
    tf.keras.layers.Conv2D(64, 3, padding="same", activation="relu"),
    tf.keras.layers.MaxPool2D(pool_size=2),  # output: 3 × 3 x 64
    
    tf.keras.layers.Conv2D(30, 3, padding="same", activation="relu"),
    tf.keras.layers.GlobalAvgPool2D()  # output: 30
])
conv_decoder = tf.keras.Sequential([
    tf.keras.layers.Dense(3 * 3 * 16),
    tf.keras.layers.Reshape((3, 3, 16)),
    tf.keras.layers.Conv2DTranspose(32, 3, strides=2, activation="relu"),
    tf.keras.layers.Conv2DTranspose(16, 3, strides=2, padding="same",
                                    activation="relu"),
    tf.keras.layers.Conv2DTranspose(1, 3, strides=2, padding="same"),
    tf.keras.layers.Reshape([28, 28])
])
conv_ae = tf.keras.Sequential([conv_encoder, conv_decoder])

# compiles the model
conv_ae.compile(loss="mse", optimizer="nadam")

In [None]:
# 2) train the model
history = conv_ae.fit(X_train, X_train, epochs=10,
                      validation_data=(X_valid, X_valid))

In [None]:
# 3) shows the reconstructions
plot_reconstructions(conv_ae)

Recurrent autoencoders
---
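- built with RNNs, for sequences such as time series or text
  - the encoder is typically a sequence-to-vector RNN that compresses the input sequence down to a single vector
  - the decoder is a vector-to-sequence RNN that does the reverse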

In [None]:
# 1. build a recurrent autoencoder
# treat each Fashion MNIST image as a sequence of 28 vectors, 
#   each with 28 dimensions:

recurrent_encoder = tf.keras.Sequential([
    tf.keras.layers.LSTM(100, return_sequences=True),
    tf.keras.layers.LSTM(30)
])
recurrent_decoder = tf.keras.Sequential([
    tf.keras.layers.RepeatVector(28),
    tf.keras.layers.LSTM(100, return_sequences=True),
    tf.keras.layers.Dense(28)
])
recurrent_ae = tf.keras.Sequential([recurrent_encoder, recurrent_decoder])
recurrent_ae.compile(loss="mse", optimizer="nadam")

In [None]:
# 2) train the model
history = recurrent_ae.fit(X_train, X_train, epochs=10,
                           validation_data=(X_valid, X_valid))

In [None]:
# 3) show the reconstructions
plot_reconstructions(recurrent_ae)

More types of autoencoders
---
- Denoising autoencoders
- Sparse autoencoders
- Variational autoencoders

[Denoising Autoencoders](https://homl.info/114)
---
- (p6) noise is added to the inputs, and the autoencoder is trained to recover the original, noise-free inputs; the noise can be
  - pure `Gaussian noise`
  - or `randomly switched-off` inputs just like in dropout
- (p7) implemented by applying to the encoder’s inputs
  - a GaussianNoise layer
  - or an additional Dropout layer
  - both layers are only active during training to add noise
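The two kinds of corruption can be sketched with NumPy alone. This is an illustration rather than the Keras implementation: the function names are hypothetical, the image is random data, and the dropout version rescales the surviving pixels by `1 / (1 - rate)`, as inverted dropout does.

```python
import numpy as np

rng = np.random.default_rng(0)
image = rng.random((28, 28))  # hypothetical image with pixel values in [0, 1)

def add_gaussian_noise(x, stddev, rng):
    """Corrupt x with zero-mean Gaussian noise."""
    return x + rng.normal(0.0, stddev, size=x.shape)

def apply_dropout(x, rate, rng):
    """Randomly switch off a fraction `rate` of the inputs, rescale the rest."""
    keep = rng.random(x.shape) >= rate
    return x * keep / (1.0 - rate)

noisy = add_gaussian_noise(image, stddev=0.2, rng=rng)
dropped = apply_dropout(image, rate=0.5, rng=rng)

print("fraction of pixels switched off:", np.mean(dropped == 0.0))
print("std of the added Gaussian noise:", np.std(noisy - image))
```

During training, the corrupted array plays the role of the input while the clean `image` remains the target.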

In [None]:
# 1. build a denoising autoencoder with dropout
dropout_encoder = tf.keras.Sequential([
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dropout(0.5),
    tf.keras.layers.Dense(100, activation="relu"),
    tf.keras.layers.Dense(30, activation="relu")
])
dropout_decoder = tf.keras.Sequential([
    tf.keras.layers.Dense(100, activation="relu"),
    tf.keras.layers.Dense(28 * 28),
    tf.keras.layers.Reshape([28, 28])
])
dropout_ae = tf.keras.Sequential([dropout_encoder, dropout_decoder])

# 2) compiles the model
dropout_ae.compile(loss="mse", optimizer="nadam")

In [None]:
# 3) train the model
history = dropout_ae.fit(X_train, X_train, epochs=10,
                         validation_data=(X_valid, X_valid))

In [None]:
# 4) denoise inputs corrupted by dropout
dropout = tf.keras.layers.Dropout(0.5)
plot_reconstructions(dropout_ae, dropout(X_valid, training=True),
                     title="Noisy images (top) and their cleaned versions (bottom)")

In [None]:
# 5) denoise inputs corrupted by Gaussian noise
noise = tf.keras.layers.GaussianNoise(0.2)
plot_reconstructions(dropout_ae, noise(X_valid, training=True),
                     title="Noisy images (top) and their cleaned versions (bottom)")

Sparse autoencoders (SAEs)
---
- often extract good features thanks to the constraint of `sparsity`
- reduce the number of active neurons in the coding layer by adding an appropriate term to the cost function
  - each neuron in the coding layer typically ends up representing a useful feature
- A simple approach to introducing sparsity
  - the encoder
    - uses the `sigmoid` activation function in the coding layer
      - to constrain the codings to the range (0, 1)
    - uses a large coding layer with some ℓ₁ regularization on its activations
      - to preserve the most important codings while eliminating unimportant ones
  - the decoder is a regular decoder
- Another approach to introducing sparsity
  - measure the actual sparsity of the coding layer at each training iteration
    - by computing the average activation of each neuron in the coding layer over the whole training batch
  - penalize the model when the measured sparsity differs from a target sparsity
    - (p8) by adding to the cost function a `sparsity loss` such as
      - the mean squared error (MSE)
      - the mean absolute error (MAE)
      - or the Kullback–Leibler (KL) divergence
        - which has much stronger gradients than MSE

The KL divergence
---
- ${D_{KL}(P \Vert Q) }$ between two discrete probability distributions P and Q can be computed
  - ${\displaystyle D_{KL}(P \Vert Q) = \sum_{i}P(i)\log\dfrac{P(i)}{Q(i)} }$
- Measuring the divergence between the `target probability p` that a neuron in the coding layer will activate and the `actual probability q` estimated by measuring the `mean activation over the training batch` by:
  - ${\displaystyle D_{KL}(p \Vert q) = p\log\dfrac{p}{q} + (1-p)\log\dfrac{(1-p)}{(1-q)} }$
  - then sum the losses of all the neurons in the coding layer and add the result to the cost function
- Multiplying the sparsity loss by a sparsity weight hyperparameter can control the `relative importance` of the sparsity loss and the reconstruction loss
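The formula above can be checked numerically. The sketch below uses plain Python; the mean activations and the sparsity weight are made-up values chosen only for illustration, and `kl_sparsity` is a hypothetical helper name.

```python
import math

def kl_sparsity(p, q):
    """KL divergence between a target activation probability p and an actual one q."""
    return p * math.log(p / q) + (1 - p) * math.log((1 - p) / (1 - q))

target = 0.1
mean_activations = [0.1, 0.3, 0.6, 0.9]  # hypothetical batch-mean activations

for q in mean_activations:
    print(f"q={q:.1f}  KL={kl_sparsity(target, q):.4f}  MSE={(target - q) ** 2:.4f}")

# Sum the per-neuron losses, then scale by a sparsity weight hyperparameter
sparsity_weight = 5e-3  # assumed value, for illustration only
sparsity_loss = sparsity_weight * sum(kl_sparsity(target, q) for q in mean_activations)
print("weighted sparsity loss:", sparsity_loss)
```

The KL term is zero when the actual activation matches the target and grows faster than the squared error as the two move apart, which is consistent with the stronger gradients mentioned earlier.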

In [None]:
# 1. build a SAE with ℓ₁ regularization
# 1) use the sigmoid activation function in the coding layer. 
# also add ℓ₁ regularization to it by adding an `ActivityRegularization` layer 
# after the coding layer. Alternatively, 
# we could add `activity_regularizer=tf.keras.regularizers.l1(1e-4)` 
# to the coding layer itself.

sparse_l1_encoder = tf.keras.Sequential([
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(100, activation="relu"),
    tf.keras.layers.Dense(300, activation="sigmoid"),
    tf.keras.layers.ActivityRegularization(l1=1e-4)
])
sparse_l1_decoder = tf.keras.Sequential([
    tf.keras.layers.Dense(100, activation="relu"),
    tf.keras.layers.Dense(28 * 28),
    tf.keras.layers.Reshape([28, 28])
])
sparse_l1_ae = tf.keras.Sequential([sparse_l1_encoder, sparse_l1_decoder])

#  compiles the model
sparse_l1_ae.compile(loss="mse", optimizer="nadam")

In [None]:
# 2) train the model
history = sparse_l1_ae.fit(X_train, X_train, epochs=10,
                           validation_data=(X_valid, X_valid))

In [None]:
# 3) shows the reconstructions
plot_reconstructions(sparse_l1_ae)

In [None]:
# 2. build a SAE with KL-Divergence regularization
# 1) define a custom regularizer for KL-Divergence regularization:

kl_divergence = tf.keras.losses.kullback_leibler_divergence

class KLDivergenceRegularizer(tf.keras.regularizers.Regularizer):
    def __init__(self, weight, target):
        self.weight = weight
        self.target = target

    def __call__(self, inputs):
        mean_activities = tf.reduce_mean(inputs, axis=0)
        return self.weight * (
            kl_divergence(self.target, mean_activities) +
            kl_divergence(1. - self.target, 1. - mean_activities))

In [None]:
# 2) use this regularizer to push the model to have 
# about 10% sparsity in the coding layer:
kld_reg = KLDivergenceRegularizer(weight=5e-3, target=0.1)
sparse_kl_encoder = tf.keras.Sequential([
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(100, activation="relu"),
    tf.keras.layers.Dense(300, activation="sigmoid",
                          activity_regularizer=kld_reg)
])
sparse_kl_decoder = tf.keras.Sequential([
    tf.keras.layers.Dense(100, activation="relu"),
    tf.keras.layers.Dense(28 * 28),
    tf.keras.layers.Reshape([28, 28])
])
sparse_kl_ae = tf.keras.Sequential([sparse_kl_encoder, sparse_kl_decoder])

# compiles the model
sparse_kl_ae.compile(loss="mse", optimizer="nadam")

In [None]:
# 3) train the model
history = sparse_kl_ae.fit(X_train, X_train, epochs=10,
                           validation_data=(X_valid, X_valid))

In [None]:
# 4) shows the reconstructions
plot_reconstructions(sparse_kl_ae)

[Variational autoencoders (VAEs)](https://homl.info/115)
---
- `probabilistic` autoencoders whose outputs are partly determined by chance
- `generative` autoencoders that can generate new instances that look like training instances
- (p9) perform `variational Bayesian inference`
  - the encoder learns a mean coding μ and a standard deviation σ
  - The actual coding is then sampled randomly from a Gaussian distribution ${\displaystyle {\mathcal {N}}(\mu ,\sigma ^{2})}$
  - the decoder decodes the `sampled coding` normally
  - the final output `resembles` the training instance

VAE cost function
---
composed of two parts:
- ❶ the usual `reconstruction loss` that pushes the autoencoder to reproduce its inputs
  - can be measured with `MSE`
- ❷ the `latent loss` that pushes the autoencoder to have codings that look sampled from a simple Gaussian distribution
  - can be measured with the `KL divergence` between the target distribution and the actual distribution of the codings
  - computed by ${\displaystyle ℒ = -\dfrac{1}{2}\sum_{i=1}^{n}\left( 1+\log(σ_i^2) - σ_i^2 - μ_i^2 \right) }$
    - `n` is the codings’ `dimensionality`
    - `μᵢ and σᵢ` are the `mean and standard deviation` of the iᵗʰ component of the codings
    - The vectors $\boldsymbol{μ}$ and $\boldsymbol{σ}$ (which contain all the μᵢ and σᵢ) are output by the encoder
  - A common tweak to the VAE’s architecture is to make the encoder output ${γ = \log(\boldsymbol{σ}^2)}$ rather than $\boldsymbol{σ}$
    - ${\displaystyle ℒ = -\dfrac{1}{2}\sum_{i=1}^{n}\left( 1+ γ_i - e^{γ_i} - μ_i^2 \right) }$
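As a quick sanity check, the two forms of the latent loss can be evaluated side by side in plain Python. The function names and the sample values of μ and σ are assumptions made for this sketch.

```python
import math

def latent_loss_sigma(mu, sigma):
    """Latent loss written in terms of the standard deviations σ."""
    return -0.5 * sum(1 + math.log(s ** 2) - s ** 2 - m ** 2
                      for m, s in zip(mu, sigma))

def latent_loss_gamma(mu, gamma):
    """Same loss written in terms of γ = log(σ²)."""
    return -0.5 * sum(1 + g - math.exp(g) - m ** 2
                      for m, g in zip(mu, gamma))

mu = [0.5, -1.0, 0.0]      # hypothetical mean codings
sigma = [1.0, 0.5, 2.0]    # hypothetical standard deviations
gamma = [math.log(s ** 2) for s in sigma]

loss_sigma = latent_loss_sigma(mu, sigma)
loss_gamma = latent_loss_gamma(mu, gamma)
loss_at_target = latent_loss_gamma([0.0, 0.0, 0.0], [0.0, 0.0, 0.0])

print(loss_sigma, loss_gamma, loss_at_target)
```

Both forms give the same value, and the loss vanishes when every μᵢ is 0 and every σᵢ is 1, i.e., when the codings already follow a standard Gaussian distribution. Working with γ also avoids taking the logarithm of a value that could approach zero.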

In [None]:
# 1. build a VAE for Fashion MNIST
# 1) define a custom layer to sample the codings, given μ and γ
# This samples a codings vector from the Gaussian distribution N(μ,σ²)

class Sampling(tf.keras.layers.Layer):
    def call(self, inputs):
        mean, log_var = inputs
        return tf.random.normal(tf.shape(log_var)) * tf.exp(log_var / 2) + mean 

In [None]:
# 2) create the encoder using the functional API
codings_size = 10

inputs = tf.keras.layers.Input(shape=[28, 28])
Z = tf.keras.layers.Flatten()(inputs)
Z = tf.keras.layers.Dense(150, activation="relu")(Z)
Z = tf.keras.layers.Dense(100, activation="relu")(Z)
codings_mean = tf.keras.layers.Dense(codings_size)(Z)  # μ
codings_log_var = tf.keras.layers.Dense(codings_size)(Z)  # γ
codings = Sampling()([codings_mean, codings_log_var])
variational_encoder = tf.keras.Model(
    inputs=[inputs], outputs=[codings_mean, codings_log_var, codings])

# 3) create the decoder
decoder_inputs = tf.keras.layers.Input(shape=[codings_size])
x = tf.keras.layers.Dense(100, activation="relu")(decoder_inputs)
x = tf.keras.layers.Dense(150, activation="relu")(x)
x = tf.keras.layers.Dense(28 * 28)(x)
outputs = tf.keras.layers.Reshape([28, 28])(x)
variational_decoder = tf.keras.Model(inputs=[decoder_inputs], outputs=[outputs])

In [None]:
# 4) build the VAE
_, _, codings = variational_encoder(inputs)
reconstructions = variational_decoder(codings)
variational_ae = tf.keras.Model(inputs=[inputs], outputs=[reconstructions])

# add the latent loss
latent_loss = -0.5 * tf.reduce_sum(
    1 + codings_log_var - tf.exp(codings_log_var) - tf.square(codings_mean),
    axis=-1)
variational_ae.add_loss(tf.reduce_mean(latent_loss) / 784.)

# compile the model
variational_ae.compile(loss="mse", optimizer="nadam")

In [None]:
# 5) train the model
history = variational_ae.fit(X_train, X_train, epochs=25, batch_size=128,
                             validation_data=(X_valid, X_valid))

In [None]:
# 6) show reconstructions
plot_reconstructions(variational_ae)

In [None]:
# 2. Generating Fashion MNIST Images with VAE
# 1) use the VAE above to generate images that look like fashion items
# by sampling random codings from a Gaussian distribution then decoding them

codings = tf.random.normal(shape=[3 * 7, codings_size])
images = variational_decoder(codings).numpy()

In [None]:
# 2) show the generated fashions
def plot_multiple_images(images, n_cols=None):
    n_cols = n_cols or len(images)
    n_rows = (len(images) - 1) // n_cols + 1
    if images.shape[-1] == 1:
        images = images.squeeze(axis=-1)
    plt.figure(figsize=(n_cols, n_rows))
    for index, image in enumerate(images):
        plt.subplot(n_rows, n_cols, index + 1)
        plt.imshow(image, cmap="binary")
        plt.axis("off")

plot_multiple_images(images, 7)

Semantic interpolation
---
- can be done with VAEs at the codings level instead of at the pixel level
- e.g., take a few codings along an `arbitrary line in latent space` and decode them
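A line in latent space can also be drawn between two specific codings. The sketch below only builds the codings; decoding them would require a trained decoder. The function name, the random endpoints, and the codings size of 10 are assumptions made for illustration.

```python
import numpy as np

def interpolate_codings(z_start, z_end, n_steps):
    """Return n_steps codings evenly spaced on the segment from z_start to z_end."""
    t = np.linspace(0.0, 1.0, n_steps)[:, np.newaxis]  # column of mixing weights
    return (1.0 - t) * z_start + t * z_end

codings_size = 10  # assumed codings dimensionality
rng = np.random.default_rng(1)
z_a = rng.normal(size=codings_size)  # hypothetical coding of a first item
z_b = rng.normal(size=codings_size)  # hypothetical coding of a second item

path = interpolate_codings(z_a, z_b, n_steps=7)
print(path.shape)
```

Feeding `path` to a VAE decoder would be expected to produce a sequence of images that morph from one item into the other, which is usually more meaningful than blending the two images pixel by pixel.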

In [None]:
# 1. Semantic interpolation
# get a sequence of images that gradually go from one fashion to another
codings = np.zeros([7, codings_size])
codings[:, 6] = np.linspace(0.8, -0.8, 7)  # vary only coding axis 6 (pick whichever axis looks best)
images = variational_decoder(codings).numpy()

plot_multiple_images(images)

[Generative Adversarial Networks](https://homl.info/gan)
---
- Idea:  
  - make neural networks compete against each other 
  - in the hope that this competition will push them to excel
- Structure: (p10) a GAN is composed of two neural networks:
  - Generator
    - offers the same functionality as a decoder in a VAE
    - takes random noise as input (typically sampled from a Gaussian distribution) and outputs some data, typically an image
      - the random inputs are the `latent representation` (i.e., codings) of the image to be generated
  - Discriminator
    - takes as input either a fake image from the generator 
      - or a real image from the training set
    - then guesses whether the input image is fake or real
- Training:
  - the generator and the discriminator have opposite goals
    - the discriminator tries to tell fake images from real images
    - the generator tries to produce images that look real enough to trick the discriminator
  - ∴ cannot be trained like a regular neural network
- Each training iteration is divided into two phases:
  - ❶ train the `discriminator` as a `binary classifier`
    - the inputs are real images from the training set and fake images produced by the generator
    - the labels are set to 0 for fake images and 1 for real images
    - backpropagation only optimizes the `weights of the discriminator`
      - the generator's weights are frozen during this phase
  - ❷ train the generator
    - first the generator is used to produce `another` batch of fake images
    - then the discriminator is used to tell whether these `new` images are fake or real
      - no real images are added this time, and all the labels are set to 1 (real): the generator is rewarded for images that the discriminator wrongly believes to be real
    - this time, backpropagation only updates the weights of the generator
      - the weights of the discriminator are frozen during this phase
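The targets used in the two phases can be written out explicitly. The sketch below uses NumPy only and does not train anything: the discriminator outputs are made-up values, and `binary_crossentropy` is a small helper written for this illustration.

```python
import numpy as np

def binary_crossentropy(y_true, y_pred, eps=1e-7):
    """Mean binary cross-entropy between targets and predicted probabilities."""
    y_pred = np.clip(y_pred, eps, 1 - eps)
    return float(np.mean(-(y_true * np.log(y_pred)
                           + (1 - y_true) * np.log(1 - y_pred))))

batch_size = 4

# Phase 1: fake images first (label 0), then real images (label 1)
y_phase1 = np.concatenate([np.zeros((batch_size, 1)), np.ones((batch_size, 1))])

# Phase 2: only fake images, all labeled 1 (real)
y_phase2 = np.ones((batch_size, 1))

# Hypothetical discriminator outputs on a batch of fake images
fooled = np.full((batch_size, 1), 0.9)      # discriminator believes they are real
not_fooled = np.full((batch_size, 1), 0.1)  # discriminator spots the fakes

loss_fooled = binary_crossentropy(y_phase2, fooled)
loss_not_fooled = binary_crossentropy(y_phase2, not_fooled)
print(loss_fooled, loss_not_fooled)
```

With the labels set to 1 in phase 2, the loss is small exactly when the discriminator is fooled, so minimizing it pushes the generator toward more convincing images.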

In [None]:
# 1. build a simple GAN for Fashion MNIST
# 1) build the generator and the discriminator. 

codings_size = 30

Dense = tf.keras.layers.Dense
# The generator is similar to an autoencoder’s decoder, 
generator = tf.keras.Sequential([
    Dense(100, activation="relu", kernel_initializer="he_normal"),
    Dense(150, activation="relu", kernel_initializer="he_normal"),
    Dense(28 * 28, activation="sigmoid"),
    tf.keras.layers.Reshape([28, 28])
])

# the discriminator is a regular binary classifier
discriminator = tf.keras.Sequential([
    tf.keras.layers.Flatten(),
    Dense(150, activation="relu", kernel_initializer="he_normal"),
    Dense(100, activation="relu", kernel_initializer="he_normal"),
    Dense(1, activation="sigmoid")
])
gan = tf.keras.Sequential([generator, discriminator])

In [None]:
# 2) The gan model is also a binary classifier
# the generator will only be trained through the gan model, 
# so we do not need to compile it at all.

# the discriminator is trainable by itself
discriminator.compile(loss="binary_crossentropy", optimizer="rmsprop")

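# the discriminator should not be trained during the second phase;
# the `trainable` attribute is only taken into account when compiling,
# so the already-compiled discriminator still trains on its own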
discriminator.trainable = False
gan.compile(loss="binary_crossentropy", optimizer="rmsprop")

In [None]:
# 3) create the training scheme

# a) create a Dataset to iterate through the images
batch_size = 32
dataset = tf.data.Dataset.from_tensor_slices(X_train).shuffle(1000)
dataset = dataset.batch(batch_size, drop_remainder=True).prefetch(1)

# b) repeat the two training phases for every batch
def train_gan(gan, dataset, batch_size, codings_size, n_epochs):
    generator, discriminator = gan.layers
    for epoch in range(n_epochs):
        print(f"Epoch {epoch + 1}/{n_epochs}") 
        for X_batch in dataset:
            # phase 1 - training the discriminator
            noise = tf.random.normal(shape=[batch_size, codings_size])
            generated_images = generator(noise)
            
            X_fake_and_real = tf.concat([generated_images, X_batch], axis=0)
            y1 = tf.constant([[0.]] * batch_size + [[1.]] * batch_size)
            discriminator.train_on_batch(X_fake_and_real, y1)
            
            # phase 2 - training the generator
            noise = tf.random.normal(shape=[batch_size, codings_size])
            y2 = tf.constant([[1.]] * batch_size)
            gan.train_on_batch(noise, y2)
        # plot images during training
        plot_multiple_images(generated_images.numpy(), 8)
        plt.show()

In [None]:
# 4) train the GAN
train_gan(gan, dataset, batch_size, codings_size, n_epochs=50)

In [None]:
# 5) generate new images by feeding the generator 
# with randomly sampled codings from a Gaussian distribution
codings = tf.random.normal(shape=[batch_size, codings_size])
generated_images = generator.predict(codings)

# show the image
plot_multiple_images(generated_images, 8)

The Difficulties of Training GANs
---
- the training is a `zero-sum` game in which the generator and the discriminator constantly try to outsmart each other
- the game may end up in a state that game theorists call a `Nash equilibrium`
  - when no player would be better off changing their own strategy
    - assuming the other players do not change theirs
- In general, a game can have several Nash equilibria, and different initial states and dynamics may lead to one or another
  - a GAN, however, can only reach a single Nash equilibrium
    - when the generator produces perfectly realistic images 
    - and the discriminator is forced to guess randomly
  - However, nothing guarantees the equilibrium will ever be reached
- The biggest difficulty is called `mode collapse`
  - when the generator’s outputs gradually become less diverse
  - `ex`. Suppose that the generator gets better at producing convincing shoes than any other class
    - It will fool the discriminator a bit more with shoes, and this will encourage it to produce even more images of shoes
    - Gradually, it will `forget` how to produce anything else
  - Meanwhile, the only fake images that the discriminator will see will be shoes
    - so it will also forget how to discriminate fake images of other classes
  - Eventually, when the discriminator manages to discriminate the fake shoes from the real ones
    - the generator will be forced to move to another class
    - It may then become good at shirts, forgetting about shoes, and the discriminator will follow
  - The GAN may gradually `cycle across a few classes`, never really becoming very good at any of them
- Moreover, a GAN's parameters may end up `oscillating and becoming unstable`
  - because the generator and the discriminator are constantly pushing against each other
- GANs are `very sensitive` to the hyperparameters
  - you may have to spend a lot of effort fine-tuning them
  - ex. optimizer `RMSProp` is better than `Nadam` in the GAN above
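
The equilibrium described above can be checked with simple arithmetic: if the discriminator is reduced to guessing, it outputs 0.5 for every image, and the binary cross-entropy of both training phases equals ln 2 ≈ 0.693. The sketch below uses plain NumPy and a hand-written helper, `binary_crossentropy`, which is an illustrative assumption and not the Keras loss itself. A loss hovering near this value is only a rough hint that the game has settled, not proof of it.

```python
import numpy as np

def binary_crossentropy(y_true, y_pred, eps=1e-7):
    """Mean binary cross-entropy, written out by hand for illustration."""
    y_pred = np.clip(y_pred, eps, 1 - eps)
    return float(np.mean(-(y_true * np.log(y_pred)
                           + (1 - y_true) * np.log(1 - y_pred))))

batch_size = 32
# phase 1 targets: fake images are labeled 0, real images are labeled 1
y1 = np.array([0.] * batch_size + [1.] * batch_size)
# phase 2 targets: the generator wants its fakes to be labeled 1
y2 = np.ones(batch_size)

# a discriminator that can only guess outputs 0.5 for every image
loss_d = binary_crossentropy(y1, np.full_like(y1, 0.5))
loss_g = binary_crossentropy(y2, np.full_like(y2, 0.5))
print(loss_d, loss_g, np.log(2))
```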

Approaches for training GANs
---
- [experience replay](https://homl.info/gansequal) stores the images produced by the generator at each iteration in a circular replay buffer (the oldest images are gradually dropped)
  - training the discriminator using real images plus fake images drawn from this buffer rather than just fake images produced by the current generator
  - This reduces the chances that the discriminator will overfit the latest generator’s outputs
- `mini-batch discrimination` measures how similar images are across the batch and provides this statistic to the discriminator 
  - so it can easily reject a whole batch of fake images that lack diversity
  - This encourages the generator to produce a greater `variety` of images
    - reducing the chance of mode collapse
- this is still a very active field of research 
   - the dynamics of GANs are still not perfectly understood
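
A minimal sketch of the experience replay idea, assuming a fixed-capacity buffer that drops its oldest images first. The `ReplayBuffer` class and its capacity are hypothetical and not part of the notebook; the constant-valued arrays merely stand in for `generator(noise)` so that the age of each sampled image is easy to see.

```python
import numpy as np

class ReplayBuffer:
    """Fixed-size store of past generated images (hypothetical helper)."""
    def __init__(self, capacity, seed=42):
        self.capacity = capacity
        self.images = []
        self.rng = np.random.default_rng(seed)

    def add(self, batch):
        # store the newest fakes, then drop the oldest ones beyond capacity
        self.images.extend(batch)
        self.images = self.images[-self.capacity:]

    def sample(self, n):
        # draw n stored fakes uniformly at random, without replacement
        idx = self.rng.choice(len(self.images), size=n, replace=False)
        return np.stack([self.images[i] for i in idx])

buffer = ReplayBuffer(capacity=256)
for step in range(20):
    # stand-in for generator(noise): every pixel holds the step number
    fake_batch = np.full((32, 28, 28), float(step))
    buffer.add(fake_batch)
    old_and_new_fakes = buffer.sample(32)

print(sorted(set(old_and_new_fakes[:, 0, 0])))  # steps the sampled fakes came from
```

In a training loop like `train_gan()`, `old_and_new_fakes` would take the place of `generated_images` in phase 1, so the discriminator keeps seeing outputs from earlier versions of the generator.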

Two more powerful and more complex GANs
---
- Deep Convolutional GANs
- StyleGANs

Deep Convolutional GANs
---
- abbreviated as [DCGANs](https://homl.info/dcgan)
- The main guidelines for building stable convolutional GANs:
  - Replace any pooling layers with `strided` convolutions in the `discriminator`
    - and `transposed` convolutions in the `generator`
  - Use `batch normalization` in both the generator and the discriminator
    - except in the generator’s output layer and the discriminator’s input layer
  - Remove fully connected hidden layers for deeper architectures
  - Use `ReLU` activation in the generator for all layers 
    - except the output layer which should use `tanh`
  - Use leaky `ReLU` activation in the discriminator for all layers
- These guidelines will work in many cases but not always
  - you may still need to experiment with different hyperparameters
  - even just changing the random seed and training the exact same model again will sometimes work
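
The first guideline replaces pooling with strides, so it helps to know how strides change the image size. With `padding="same"`, a Keras `Conv2D` layer outputs `ceil(size / stride)` and a `Conv2DTranspose` layer outputs `size * stride`. The sketch below works through that arithmetic for a hypothetical generator that starts from 7 × 7 feature maps and a discriminator that starts from 28 × 28 images; the helper names are illustrative.

```python
import math

def conv_same_out(size, stride):
    # Conv2D with padding="same": output size = ceil(input size / stride)
    return math.ceil(size / stride)

def conv_transpose_same_out(size, stride):
    # Conv2DTranspose with padding="same": output size = input size * stride
    return size * stride

gen_sizes = [7]       # the generator upsamples with transposed convolutions
for stride in (2, 2):
    gen_sizes.append(conv_transpose_same_out(gen_sizes[-1], stride))

disc_sizes = [28]     # the discriminator downsamples with strided convolutions
for stride in (2, 2):
    disc_sizes.append(conv_same_out(disc_sizes[-1], stride))

print(gen_sizes)   # [7, 14, 28]
print(disc_sizes)  # [28, 14, 7]
```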

In [None]:
# 1. build a small DCGAN with Fashion MNIST
# Feel free to tweak this architecture: 
#   you will see how sensitive it is to the hyperparameters
#   especially the relative learning rates of the two networks

codings_size = 100

# image size: 7x7 → 14x14 → 28x28
# depth: 128 → 64 → 1
generator = tf.keras.Sequential([
    tf.keras.layers.Dense(7 * 7 * 128),
    tf.keras.layers.Reshape([7, 7, 128]),
    tf.keras.layers.BatchNormalization(),
    tf.keras.layers.Conv2DTranspose(64, kernel_size=5, strides=2,
                                    padding="same", activation="relu"),
    tf.keras.layers.BatchNormalization(),
    tf.keras.layers.Conv2DTranspose(1, kernel_size=5, strides=2,
                                    padding="same", activation="tanh"),
])
# The discriminator looks much like a regular CNN 
#   for binary classification, except that it uses strided
#   convolutions (strides=2) instead of max pooling layers
#   to downsample the image

discriminator = tf.keras.Sequential([
    tf.keras.layers.Conv2D(64, kernel_size=5, strides=2, padding="same",
                        activation=tf.keras.layers.LeakyReLU(0.2)),
    tf.keras.layers.Dropout(0.4),
    tf.keras.layers.Conv2D(128, kernel_size=5, strides=2, padding="same",
                        activation=tf.keras.layers.LeakyReLU(0.2)),
    tf.keras.layers.Dropout(0.4),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(1, activation="sigmoid")
])
gan = tf.keras.Sequential([generator, discriminator])

In [None]:
# 2) compile the discriminator and the gan, as earlier
discriminator.compile(loss="binary_crossentropy", optimizer="rmsprop")
discriminator.trainable = False
gan.compile(loss="binary_crossentropy", optimizer="rmsprop")

In [None]:
# 3) reshape to add the channel dimension
# and rescale from [0,1] to [-1,1] to match the generator's tanh output range
X_train_dcgan = X_train.reshape(-1, 28, 28, 1) * 2. - 1. 

In [None]:
# 4) generates the dataset and trains the GAN, just like earlier
batch_size = 32
dataset = tf.data.Dataset.from_tensor_slices(X_train_dcgan)
dataset = dataset.shuffle(1000)
dataset = dataset.batch(batch_size, drop_remainder=True).prefetch(1)
train_gan(gan, dataset, batch_size, codings_size, n_epochs=50)

In [None]:
# 5) generate some fashions with the DCGAN above
noise = tf.random.normal(shape=[batch_size, codings_size])
generated_images = generator.predict(noise)
plot_multiple_images(generated_images, 8)

🏃 Exercise
---
- Scale up the GAN above and train it on a large dataset of faces, flowers, or animals
  - Can you get fairly realistic images?

DCGAN semantic operation
---
- (p11) DCGANs can learn quite `meaningful latent representations`
  - each image is generated from a `coding vector`
  - each column: the bottom image is generated from the average of the codings of the images above it
  - bottom row: arithmetic operations on the codings imply semantic operations on the images they represent
    - The eight other images around it were generated based on the same vector plus a bit of `noise`
- [Conditional GANs (CGANs)](https://homl.info/cgan) can generate classes of images with image class as an extra input
  - trained by adding each image's class as an extra input to both the generator and the discriminator
    - they will both learn what each class looks like
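
One common way to feed the class to both networks of a CGAN is to one-hot encode it, concatenate it to the codings for the generator, and broadcast it to extra image channels for the discriminator. This is a NumPy sketch of the input shapes only, under that assumption; the CGAN paper and other implementations may inject the class differently (for example through an embedding layer).

```python
import numpy as np

rng = np.random.default_rng(42)
n_classes, codings_size, batch_size = 10, 100, 4

def one_hot(labels, n_classes):
    return np.eye(n_classes, dtype=np.float32)[labels]

labels = np.array([0, 3, 3, 9])
noise = rng.normal(size=(batch_size, codings_size)).astype(np.float32)

# generator input: random codings concatenated with the one-hot class
gen_input = np.concatenate([noise, one_hot(labels, n_classes)], axis=1)

# discriminator input: the image plus one constant channel per class
images = rng.uniform(-1, 1, size=(batch_size, 28, 28, 1)).astype(np.float32)
class_maps = (np.ones((batch_size, 28, 28, 1), dtype=np.float32)
              * one_hot(labels, n_classes)[:, None, None, :])
disc_input = np.concatenate([images, class_maps], axis=-1)

print(gen_input.shape, disc_input.shape)
```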

[Progressive Growing of GANs](https://homl.info/progan)
---
- gradually `adds convolutional layers` to both the generator and the discriminator to produce larger and larger images
  - The extra layers get added at the end of the generator 
    - and at the beginning of the discriminator
  - previously trained layers remain trainable
- ex. (p12) grows the generator’s outputs from `4 × 4` to `8 × 8`
  - The final outputs are a weighted sum = `the new outputs×α` + `the original outputs×(1 – α)` 
  - slowly increasing α from 0 to 1 to fade in the new outputs and fade out the original
- Other techniques improving GANs
  - `Mini-batch standard deviation layer`
    - computes the standard deviation across all channels and all instances in each batch
    - added near the end of the discriminator to help it detect a lack of diversity in the generator's outputs
    - This will encourage the generator to produce more diverse outputs
      - reducing the risk of mode collapse
  - `Equalized learning rate`
    - initializes all weights using the normal distribution $\mathcal{N}(0,1)$ rather than using He initialization
    - meanwhile, the weights are scaled down at runtime by the same factor as in He initialization, i.e. multiplied by ${\displaystyle \sqrt{\dfrac{2}{n_{layerInputs}}}}$
    - This ensures that the `dynamic range` is the same for all parameters throughout training
      - This both speeds up and stabilizes training
  - `Pixelwise normalization layer`
    - added after each convolutional layer in the generator
    - normalizes each activation based on all the activations in the same image and at the same location but across all channels
      - by dividing by the square root of the mean squared activation
    - This avoids explosions in the activations due to excessive competition between the generator and the discriminator
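
The techniques above are small tensor operations, so they can be sketched in a few lines of NumPy. The function names below are illustrative, channels-last tensors of shape `[batch, height, width, channels]` are assumed, and the 4 × 4 outputs are upsampled with nearest-neighbor repetition before the fade-in. For the equalized learning rate, the sketch applies the He factor √(2/n) as a multiplier at runtime, which is what brings 𝒩(0,1) weights down to He scale.

```python
import numpy as np

def fade_in(old_outputs, new_outputs, alpha):
    # weighted sum of the new and the original outputs (alpha goes 0 -> 1)
    return alpha * new_outputs + (1 - alpha) * old_outputs

def minibatch_stddev(inputs):
    s = inputs.std(axis=(0, -1))      # std over instances and channels
    v = s.mean()                      # averaged into a single value
    extra = np.full(inputs.shape[:-1] + (1,), v)
    return np.concatenate([inputs, extra], axis=-1)  # one extra feature map

def pixelwise_norm(inputs, eps=1e-8):
    # divide by the root mean square of the activations across channels
    mean_sq = np.mean(inputs ** 2, axis=-1, keepdims=True)
    return inputs / np.sqrt(mean_sq + eps)

rng = np.random.default_rng(0)

old = rng.normal(size=(2, 4, 4, 3))
old_up = old.repeat(2, axis=1).repeat(2, axis=2)   # 4x4 -> 8x8
new = rng.normal(size=(2, 8, 8, 3))
mixed = fade_in(old_up, new, alpha=0.25)

feats = rng.normal(size=(16, 4, 4, 8))
with_std = minibatch_stddev(feats)
normed = pixelwise_norm(feats)

# equalized learning rate: parameters stay N(0, 1), scaling happens at runtime
n_inputs, n_outputs = 512, 256
w = rng.normal(size=(n_inputs, n_outputs))
w_runtime = w * np.sqrt(2 / n_inputs)

print(mixed.shape, with_std.shape, w.std(), w_runtime.std())
```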

[StyleGANs](https://homl.info/stylegan)
---
- able to generate state-of-the-art high-resolution images
  - with the same local structure as the training images at every scale
- (p13) A StyleGAN generator is composed of two networks
  - `Mapping network` maps the codings to multiple style vectors
    - An eight-layer MLP that maps the latent representations ${\mathbf{z}}$ to a vector ${\mathbf{w}}$
    - this ${\mathbf{w}}$ is then sent through multiple `affine transformations` (i.e., Dense layers with no activation functions, represented by the "A" boxes) to produce multiple style vectors
    - These style vectors control the style of the generated image at different levels
      - from fine-grained texture  (e.g., hair color) 
      - to high-level features (e.g., adult or child)
  - `Synthesis network` is responsible for generating the images
    - It processes a constant learned input through multiple convolutional and upsampling layers with two twists
      - First, some noise is added to the input and to all the outputs of the convolutional layers  (before the activation function)
      - Second, each noise layer is followed by an `adaptive instance normalization (AdaIN)` layer
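
An AdaIN layer standardizes each feature map independently (per instance and per channel, across the spatial dimensions), then rescales and shifts it using the style vector. The NumPy sketch below assumes channels-last tensors and a style vector laid out as `[scales, biases]`; that layout, the helper name `adain`, and the random stand-in for the "A" affine transformation are all illustrative choices, and real implementations differ in such details.

```python
import numpy as np

def adain(feature_maps, style, eps=1e-8):
    """feature_maps: [batch, height, width, channels]
    style: [batch, 2 * channels], one scale and one bias per feature map."""
    mean = feature_maps.mean(axis=(1, 2), keepdims=True)
    std = feature_maps.std(axis=(1, 2), keepdims=True)
    standardized = (feature_maps - mean) / (std + eps)
    scale, bias = np.split(style, 2, axis=-1)
    return scale[:, None, None, :] * standardized + bias[:, None, None, :]

rng = np.random.default_rng(0)
x = rng.normal(loc=3.0, scale=5.0, size=(2, 8, 8, 4))  # conv outputs (+ noise)
w = rng.normal(size=(2, 16))                           # mapping network output

# stand-in for one "A" box: a Dense layer with no activation function
kernel, offset = rng.normal(size=(16, 8)), rng.normal(size=8)
style = w @ kernel + offset

out = adain(x, style)
scale, bias = np.split(style, 2, axis=-1)
print(out.shape)
```

After the layer, each feature map has a mean equal to its bias term and a standard deviation equal to the absolute value of its scale term, regardless of the statistics it had before.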

💡Demo
---
- [Alias-Free Generative Adversarial Networks (StyleGAN3)](https://nvlabs.github.io/stylegan3/)
- [CycleGAN](https://github.com/junyanz/CycleGAN)

Diffusion Models
---
- [Original idea appeared in 2015](https://homl.info/diffusion)
  - train a model to learn the reverse process of diffusion
  - unfortunately overshadowed by GANs back then
- In 2020, the [denoising diffusion probabilistic model (DDPM)](https://homl.info/ddpm) showed that diffusion models can generate highly realistic images
  - improved [in 2021](https://homl.info/ddpm2) and finally beat GANs
- Compared to GANs and VAEs
  - much easier to train
  - the generated images are more diverse and of even higher quality
  - but slower to generate images

How does a DDPM work?
---
- (p14) `Forward process` gradually drowns the input image in `Gaussian noise 𝒩(0,βₜ)`, added at each time step t
  - this noise is independent for each pixel, which is called `isotropic`
  - the input image fades linearly between time steps 0 and T (=2000∼4000)
  - the `mean` of the pixel values gradually approaches 0
    - since the pixel values get rescaled slightly at each step by a factor of ${\sqrt{1-β_t}}$
    - the image signal ${ \bar{α}_t = α_1 α_2 α_3⋯ α_t = ∏_{i=1}^t α_i }$ → 0
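  - the `variance` will gradually converge to 1
    - the standard deviation of the pixel values gets scaled by ${\sqrt{α_t}}$ at each step, so the existing variance gets scaled by αₜ
      - where αₜ is the remaining signal variance `αₜ = 1 - βₜ`
    - the added noise contributes a variance of βₜ, and variances add up, so the total can only converge to `(1 - βₜ) + βₜ = 1`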
    - (p15) The `variance schedule` for the forward diffusion process is specified as
      - ${ β_t = 1 - \dfrac{\bar{α}_t}{\bar{α}_{t-1}} }$ with ${ \bar{α}_t = \dfrac{f(t)}{f(0)} }$ and ${ f(t)=\cos\left(\dfrac{t/T+s}{1+s}⋅\dfrac{π}{2} \right)^2 }$
      - s is a tiny value 0.008 by default which prevents βₜ from being too small near t = 0
      - βₜ is clipped to be no larger than 0.999 to avoid instabilities near t = T
  - the `probability distribution q` of the forward diffusion process is given by
    - ${\displaystyle q(\mathbf{x}_t | \mathbf{x}_{t-1}) = \mathcal{N} (\sqrt{1-β_t} \mathbf{x}_{t-1}, β_t\mathbf{I} ) }$
  - all the noise can be added to ${\mathbf{x}_0}$ in just one shot to get ${\mathbf{x}_t}$, since the sum of multiple Gaussian distributions is also a Gaussian distribution:
    - ${ \displaystyle q(\mathbf{x}_t | \mathbf{x}_0) = \mathcal{N} \left(\sqrt{\bar{α}_t} \mathbf{x}_0, (1-\bar{α}_t)\mathbf{I} \right)  }$
    - this is a shortcut, so no need to calculate ${\mathbf{x}_1, \mathbf{x}_2, ⋯, \mathbf{x}_{t-1}}$ first
- `Reverse process` goes from ${\mathbf{x}_t}$ to ${\mathbf{x}_{t-1}}$, removing a tiny bit of noise from the image:
  - ${\displaystyle \mathbf{x}_{t-1} = \dfrac{1}{\sqrt{α_t}}\left(\mathbf{x}_t - \dfrac{β_t}{\sqrt{1-\bar{α}_t}}\boldsymbol{ε_θ}\left(\mathbf{x}_t,t \right) \right) + \sqrt{β_t}\mathbf{z} }$
    - ${\boldsymbol{ε_θ}\left(\mathbf{x}_t,t \right)}$ represents the noise predicted by the model given the input image ${\mathbf{x}_t}$ and the time step t
    - The $\boldsymbol{θ}$ represents the model parameters
    - ${\mathbf{z}}$ is Gaussian noise with mean 0 and variance 1 which makes the reverse process stochastic: 
      - if you run it multiple times, you will get different images
  - there is no shortcut to get ${\mathbf{x}_0}$
    - so the operation is repeated many times until all the noise is gone
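
The forward process can be checked numerically. The NumPy sketch below builds a cosine schedule, applies the per-step update `x_t = sqrt(1 - β_t) * x_{t-1} + sqrt(β_t) * noise` step by step, and compares the result at an intermediate step with the one-shot shortcut that uses the cumulative product ᾱₜ. The helper `cosine_schedule`, the choice T = 1000, and the uniform stand-in for pixel values are assumptions made for this illustration only.

```python
import numpy as np

def cosine_schedule(T, s=0.008, max_beta=0.999):
    t = np.arange(T + 1)
    f = np.cos((t / T + s) / (1 + s) * np.pi / 2) ** 2
    beta = np.clip(1 - f[1:] / f[:-1], 0, max_beta)
    beta = np.append(0.0, beta)           # β₀ = 0, so that α₀ = 1
    alpha = 1 - beta
    return alpha, np.cumprod(alpha), beta  # αₜ, ᾱₜ, βₜ for t = 0 to T

T = 1000
alpha, alpha_bar, beta = cosine_schedule(T)

rng = np.random.default_rng(42)
x0 = rng.uniform(0.0, 1.0, size=50_000)   # stand-in pixels with mean 0.5

# step-by-step forward process, keeping a copy at the midpoint
t_mid = T // 2
x = x0.copy()
for t in range(1, T + 1):
    x = np.sqrt(alpha[t]) * x + np.sqrt(beta[t]) * rng.normal(size=x.shape)
    if t == t_mid:
        x_mid = x.copy()
x_T = x

# one-shot shortcut straight from x0 to the midpoint
x_shortcut = (np.sqrt(alpha_bar[t_mid]) * x0
              + np.sqrt(1 - alpha_bar[t_mid]) * rng.normal(size=x0.shape))

print("midpoint, step by step:", x_mid.mean(), x_mid.var())
print("midpoint, one shot:    ", x_shortcut.mean(), x_shortcut.var())
print("final step:            ", x_T.mean(), x_T.var())
```

The two midpoint samples should agree in mean and variance up to sampling error, and at the final step the mean should be close to 0 and the variance close to 1.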


In [None]:
# 1. create a variance schedule
def variance_schedule(T, s=0.008, max_beta=0.999):
    t = np.arange(T + 1)
    f = np.cos((t / T + s) / (1 + s) * np.pi / 2) ** 2
    alpha = np.clip(f[1:] / f[:-1], 1 - max_beta, 1)
    alpha = np.append(1, alpha).astype(np.float32)  # add α₀ = 1
    beta = 1 - alpha
    alpha_cumprod = np.cumprod(alpha)
    return alpha, alpha_cumprod, beta  # αₜ , α̅ₜ , βₜ for t = 0 to T

T = 4000
alpha, alpha_cumprod, beta = variance_schedule(T)

In [None]:
# 2. add noise to image
#  take a batch of clean images from the dataset and prepare them
def prepare_batch(X):
    X = tf.cast(X[..., tf.newaxis], tf.float32) * 2 - 1  # scale from –1 to +1
    X_shape = tf.shape(X)
    t = tf.random.uniform([X_shape[0]], minval=1, maxval=T + 1, dtype=tf.int32)
    alpha_cm = tf.gather(alpha_cumprod, t)
    alpha_cm = tf.reshape(alpha_cm, [X_shape[0]] + [1] * (len(X_shape) - 1))
    noise = tf.random.normal(X_shape)
    return {
        "X_noisy": alpha_cm ** 0.5 * X + (1 - alpha_cm) ** 0.5 * noise,
        "time": t,
    }, noise

In [None]:
# 3. prepare datasets for training and validation
#  apply the `prepare_batch()` function to every batch
def prepare_dataset(X, batch_size=32, shuffle=False):
    ds = tf.data.Dataset.from_tensor_slices(X)
    if shuffle:
        ds = ds.shuffle(10_000)
    return ds.batch(batch_size).map(prepare_batch).prefetch(1)

train_set = prepare_dataset(X_train, batch_size=32, shuffle=True)
valid_set = prepare_dataset(X_valid, batch_size=32)

In [None]:
# 4. just a quick sanity check
# take a look at a few training samples, along with the corresponding 
# noise to predict, and the original images (which we get by subtracting 
# the appropriately scaled noise from the appropriately scaled noisy image):

def subtract_noise(X_noisy, time, noise):
    X_shape = tf.shape(X_noisy)
    alpha_cm = tf.gather(alpha_cumprod, time)
    alpha_cm = tf.reshape(alpha_cm, [X_shape[0]] + [1] * (len(X_shape) - 1))
    return (X_noisy - (1 - alpha_cm) ** 0.5 * noise) / alpha_cm ** 0.5

X_dict, Y_noise = list(train_set.take(1))[0]  # get the first batch
X_original = subtract_noise(X_dict["X_noisy"], X_dict["time"], Y_noise)

print("Original images")
plot_multiple_images(X_original[:8].numpy())
plt.show()
print("Time steps:", X_dict["time"].numpy()[:8])
print("Noisy images")
plot_multiple_images(X_dict["X_noisy"][:8].numpy())
plt.show()
print("Noise to predict")
plot_multiple_images(Y_noise[:8].numpy())
plt.show()

In [None]:
# 5. implements a custom time encoding layer
# It will need to process both images and times. We will encode 
# the times using a sinusoidal encoding, as suggested in the DDPM paper, 
# just like in the [Attention is all you need](https://arxiv.org/abs/1706.03762) paper. 
# Given a vector of _m_ integers representing time indices, 
# the layer returns an _m_ × _d_ matrix, where _d_ is the chosen embedding size.

embed_size = 64

class TimeEncoding(tf.keras.layers.Layer):
    def __init__(self, T, embed_size, dtype=tf.float32, **kwargs):
        super().__init__(dtype=dtype, **kwargs)
        assert embed_size % 2 == 0, "embed_size must be even"
        p, i = np.meshgrid(np.arange(T + 1), 2 * np.arange(embed_size // 2))
        t_emb = np.empty((T + 1, embed_size))
        t_emb[:, ::2] = np.sin(p / 10_000 ** (i / embed_size)).T
        t_emb[:, 1::2] = np.cos(p / 10_000 ** (i / embed_size)).T
        self.time_encodings = tf.constant(t_emb.astype(self.dtype))

    def call(self, inputs):
        return tf.gather(self.time_encodings, inputs)

In [None]:
# 6. utility function used to build the diffusion model
def build_diffusion_model():
    X_noisy = tf.keras.layers.Input(shape=[28, 28, 1], name="X_noisy")
    time_input = tf.keras.layers.Input(shape=[], dtype=tf.int32, name="time")
    time_enc = TimeEncoding(T, embed_size)(time_input)

    dim = 16
    Z = tf.keras.layers.ZeroPadding2D((3, 3))(X_noisy)
    Z = tf.keras.layers.Conv2D(dim, 3)(Z)
    Z = tf.keras.layers.BatchNormalization()(Z)
    Z = tf.keras.layers.Activation("relu")(Z)

    time = tf.keras.layers.Dense(dim)(time_enc)  # adapt time encoding
    Z = time[:, tf.newaxis, tf.newaxis, :] + Z  # add time data to every pixel

    skip = Z
    cross_skips = []  # skip connections across the down & up parts of the UNet

    for dim in (32, 64, 128):
        Z = tf.keras.layers.Activation("relu")(Z)
        Z = tf.keras.layers.SeparableConv2D(dim, 3, padding="same")(Z)
        Z = tf.keras.layers.BatchNormalization()(Z)

        Z = tf.keras.layers.Activation("relu")(Z)
        Z = tf.keras.layers.SeparableConv2D(dim, 3, padding="same")(Z)
        Z = tf.keras.layers.BatchNormalization()(Z)

        cross_skips.append(Z)
        Z = tf.keras.layers.MaxPooling2D(3, strides=2, padding="same")(Z)
        skip_link = tf.keras.layers.Conv2D(dim, 1, strides=2,
                                           padding="same")(skip)
        Z = tf.keras.layers.add([Z, skip_link])

        time = tf.keras.layers.Dense(dim)(time_enc)
        Z = time[:, tf.newaxis, tf.newaxis, :] + Z
        skip = Z

    for dim in (64, 32, 16):
        Z = tf.keras.layers.Activation("relu")(Z)
        Z = tf.keras.layers.Conv2DTranspose(dim, 3, padding="same")(Z)
        Z = tf.keras.layers.BatchNormalization()(Z)

        Z = tf.keras.layers.Activation("relu")(Z)
        Z = tf.keras.layers.Conv2DTranspose(dim, 3, padding="same")(Z)
        Z = tf.keras.layers.BatchNormalization()(Z)

        Z = tf.keras.layers.UpSampling2D(2)(Z)

        skip_link = tf.keras.layers.UpSampling2D(2)(skip)
        skip_link = tf.keras.layers.Conv2D(dim, 1, padding="same")(skip_link)
        Z = tf.keras.layers.add([Z, skip_link])

        time = tf.keras.layers.Dense(dim)(time_enc)
        Z = time[:, tf.newaxis, tf.newaxis, :] + Z
        Z = tf.keras.layers.concatenate([Z, cross_skips.pop()], axis=-1)
        skip = Z

    outputs = tf.keras.layers.Conv2D(1, 3, padding="same")(Z)[:, 2:-2, 2:-2]
    return tf.keras.Model(inputs=[X_noisy, time_input], outputs=[outputs])

In [None]:
# 7. build the diffusion model
model = build_diffusion_model()
model.compile(loss=tf.keras.losses.Huber(), optimizer="nadam")

# adds a ModelCheckpoint callback
checkpoint_cb = tf.keras.callbacks.ModelCheckpoint("my_diffusion_model",
                                                   save_best_only=True)

In [None]:
# 8. train the model
history = model.fit(train_set, validation_data=valid_set, epochs=100,
                    callbacks=[checkpoint_cb])  

In [None]:
# 9. generate images with the trained model
def generate(model, batch_size=32):
    X = tf.random.normal([batch_size, 28, 28, 1])
    for t in range(T - 1, 0, -1):
        print(f"\rt = {t}", end=" ")  # extra code – show progress
        noise = (tf.random.normal if t > 1 else tf.zeros)(tf.shape(X))
        X_noise = model({"X_noisy": X, "time": tf.constant([t] * batch_size)})
        X = (
            1 / alpha[t] ** 0.5
            * (X - beta[t] / (1 - alpha_cumprod[t]) ** 0.5 * X_noise)
            + (1 - alpha[t]) ** 0.5 * noise
        )
    return X

X_gen = generate(model)  # generated images

In [None]:
# 10. show the generated fashions
plot_multiple_images(X_gen.numpy(), 8)

(Optional) Semantic hashing using a binary autoencoder
---
- a `semantic hash` is a short binary code that summarizes an image's content
- images that look alike should get the same hash
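
The hashing step itself is just rounding and bit packing. The sketch below uses made-up 8-bit sigmoid outputs for three images (the values and the helper names `to_hash` and `hamming_distance` are illustrative assumptions) and packs the rounded bits least significant bit first. The Hamming distance is shown as one possible way to compare hashes that are not identical.

```python
import numpy as np

def to_hash(bits):
    """Pack an array of 0/1 values (least significant bit first) into an int."""
    return int(sum(int(b) << i for i, b in enumerate(bits)))

def hamming_distance(h1, h2):
    # number of bit positions where the two hashes differ
    return bin(h1 ^ h2).count("1")

# made-up encoder outputs (sigmoid activations) for three images
codings = np.array([
    [0.9, 0.1, 0.8, 0.2, 0.7, 0.3, 0.9, 0.1],
    [0.8, 0.2, 0.9, 0.1, 0.6, 0.4, 0.8, 0.4],   # similar to the first image
    [0.1, 0.9, 0.2, 0.8, 0.3, 0.7, 0.9, 0.1],   # quite different
])
hashes = [to_hash(row) for row in codings.round()]

for h in hashes:
    print(f"{h:08b}")
print(hamming_distance(hashes[0], hashes[1]))  # 0
print(hamming_distance(hashes[0], hashes[2]))  # 6
```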

In [None]:
# Build a hashing autoencoder (HAE) with a 16-unit sigmoid coding layer
hashing_encoder = tf.keras.Sequential([
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(100, activation="relu"),
    tf.keras.layers.GaussianNoise(15.),
    tf.keras.layers.Dense(16, activation="sigmoid"),
])
hashing_decoder = tf.keras.Sequential([
    tf.keras.layers.Dense(100, activation="relu"),
    tf.keras.layers.Dense(28 * 28),
    tf.keras.layers.Reshape([28, 28])
])
hashing_ae = tf.keras.Sequential([hashing_encoder, hashing_decoder])
hashing_ae.compile(loss="mse", optimizer="nadam")

history = hashing_ae.fit(X_train, X_train, epochs=10,
                         validation_data=(X_valid, X_valid))

In [None]:
plot_reconstructions(hashing_ae)
plt.show()

In [None]:
hashes = hashing_encoder.predict(X_valid).round().astype(np.int32)
hashes *= np.array([[2 ** bit for bit in range(16)]])
hashes = hashes.sum(axis=1)
for h in hashes[:5]:
    print(f"{h:016b}")
print("...")

In [None]:
from collections import Counter

n_hashes = 10
n_images = 8

top_hashes = Counter(hashes).most_common(n_hashes)

plt.figure(figsize=(n_images, n_hashes))
for hash_index, (image_hash, hash_count) in enumerate(top_hashes):
    indices = (hashes == image_hash)
    for index, image in enumerate(X_valid[indices][:n_images]):
        plt.subplot(n_hashes, n_images, hash_index * n_images + index + 1)
        plt.imshow(image, cmap="binary")
        plt.axis("off")

plt.show()