<a href="https://colab.research.google.com/github/manyasha-n-m/OT-tutorials/blob/main/Tutorial_4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Hands-on Tutorials on Computational Optimal Transport
### Instructor: Nataliia Monina
-----

### 4. Dynamics and Sampling problems
In this tutorial:
-  We will take a look at a particular example of what was discussed in gradient flows course.
- Suppose you have a dataset of samples `x_real` from some distribution (e.g. pictures of cats, dogs, digits or some other type of data in general) and you want to find a way to somehow generate fake samples `x_fake` that very much resemble the true distribution.
    1. Trying to teach a neural network the distribution by comraring the empirical measures of real and generated samples via (entropic) Wasserstein distance
    2. Using theory of Wasserstein gradient flows: Flowing a source measure $\rho_0$ to the target measure $\pi$.


In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import norm
from ipywidgets import interact, FloatSlider, IntSlider
from scipy.stats import gaussian_kde
%matplotlib inline

import tensorflow as tf
from tensorflow.keras.layers import Dense, Input, Lambda, Flatten, Concatenate, Dropout
from tensorflow.keras.layers import Conv2D, Flatten, MaxPooling2D, Reshape
from tensorflow.keras import Model

In [None]:
from tensorflow.keras.datasets import mnist
(mnist_digits, mnist_labels), (_, _) = mnist.load_data()

### Displacement interpolation, Wasserstein geodesics:
Example: Think that you want to track the evolution of the particles in the system: at time $t=0$ you see that they are distributed according to some $\mu_0$ and then you track the particles at time $t=1$ and now you see distribution $\mu_1$. You may think: how exactly did my particles move to end up like this?

Mathematically, you take a look at the flow of initial measure $\mu_0$ towards $\mu_1$.

#### First, we will take a simple example in 1D.

**Recall:**

Use Brenier’s theorem in 1D: In 1D, the optimal transport (Monge) map is explicitly
$$T(x) = F^{-1}_1(F_0(X))$$
where $F_0$ and $F_1$ are, respectively, the CDFs of source  and target measures $\mu_0$ and $\mu_1$.

We now already know that the geodesic curve is given by
$$\mu_t = ((1-t)Id + t T)_\sharp \mu_0$$

In [None]:
# Setup
n = 500
x = np.linspace(-5, 10, 500)

# Source and target distributions
mu0_pdf = norm(loc=0, scale=1)
mu1_pdf = norm(loc=5, scale=2)

eps = 1e-6
u = np.linspace(eps, 1 - eps, n)

# Inverse CDF sampling (via quantile function)
x0_samples = mu0_pdf.ppf(u)
x1_samples = mu1_pdf.ppf(u)

# Interpolation function using samples
def plot_displacement_interpolation(t=0.5):
    xt = (1 - t) * x0_samples + t * x1_samples  # displacement interpolation

    # Estimate density using KDE
    kde = gaussian_kde(xt)
    mu_t = kde(x)

    plt.figure(figsize=(8, 5))
    plt.plot(x, mu0_pdf.pdf(x), label=r'$\mu_0$ (Source)', color='blue')
    plt.plot(x, mu1_pdf.pdf(x), label=r'$\mu_1$ (Target)', color='green')
    plt.plot(x, mu_t, label=fr'$\mu_t$,  t = {t:.2f}', color='red')
    plt.title("Displacement Interpolation (via OT map)")
    plt.xlabel("x")
    plt.ylabel("Density")
    plt.grid(True)
    plt.legend()
    plt.show()

# 3. Interactive slider
interact(plot_displacement_interpolation, t=FloatSlider(value=0.5, min=0.0, max=1.0, step=0.01));

#### Now let's look at 2D example

In [None]:
# Generate two 2D point clouds
np.random.seed(42)
n_points = 50
X0 = np.random.randn(n_points, 2) * 0.5 + np.array([0, 0])    # source
X1 = np.random.randn(n_points, 2) * 0.5 + np.array([3, 3])    # target

# Sort for consistent visualization
# will NOT give an OT-optimal map, but a possibility to see!!!!
X0_sorted = X0[np.argsort(X0[:, 0])]
X1_sorted = X1[np.argsort(X1[:, 0])]

def plot_interp(t):
    X_t = (1 - t) * X0_sorted + t * X1_sorted # test plan T_t(x)!

    plt.figure(figsize=(6, 6))
    plt.scatter(X0_sorted[:, 0], X0_sorted[:, 1], color='blue', label=r'$\mu_0$')
    plt.scatter(X1_sorted[:, 0], X1_sorted[:, 1], color='red', label=r'$\mu_1$')
    plt.scatter(X_t[:, 0], X_t[:, 1], color='green', label=fr'$\mu_t$, $t={t:.2f}$')

    for i in range(n_points):
        plt.plot([X0_sorted[i, 0], X1_sorted[i, 0]],
                 [X0_sorted[i, 1], X1_sorted[i, 1]], 'k--', alpha=0.1)

    plt.title("Displacement Interpolation between Point Clouds")
    plt.legend()
    plt.axis('equal')
    plt.grid(True)
    plt.show()

# Interactive widget
interact(plot_interp, t=FloatSlider(value=0.5, min=0.0, max=1.0, step=0.01));

### Exercises:
- Try to solve OT between Unif(X_0) and Unif(X_1) (i.e. construct a map T from the Kantorovich plan P (output of ot.emd and then max $x_i$ to $y = argmax_j(P_{ij})$ and then do the interpolation with an optimal map
- You have free will, you can change distributions


In [None]:
"""Free space"""

## Using Wasserstein distance as a loss for a Generator:

Often, `x_fake = G(z)` are generated from some lower dimensional data $z\sim Law(z)$, e.g., some sample from a Gaussian distribution $z\sim N(0, I)$



### Core idea for today:

In comparison to a GAN, where the generator is training to fool a discriminator network, we train it to minimize the regularized OT distance between
- Generated samples $G_\theta(z)$ from noise $z \sim \mathcal{N}(0, I)$
- Real data samples $x \sim \mu$

The OT cost serves as the loss function:
$$Loss = OT_\varepsilon(\mu, Law(G_\theta(z)))$$

In practice, computing the OT distance between entire distributions (say, full datasets or batches) is expensive, thus for training, at each step we may sample some batches of them and create empirical distributions from batches.

### Workflow:
1. Sample mini-batch from real data: $x_1, ..., x_n$
2. Sample mini-batch from generator: $G(z_1), ..., G(z_n)$
3. Compute cost matrix $C$ between generated and real samples
4. Compute Sinkhorn distance
5. Backpropagate and update generator weights


**Important comment:** In this implementation, we will essentially have OT between measures $Unif(X)$ and $Unif(Y)$ where $X = \{x_1, ..., x_n\}$ and $Y=\{G(z_1), ..., G(z_n)\}$ with a Cost composed of pairwise distances between samples.
$$C[i, j] = d(x_{real}[i], x_{fake}[j])^2$$
The idea of this is to try to sort of "match" the domain $Y$ to $X$.

*However, for examples with images, we will not be computing pairwise Wasserstein distance between "images" themself (i.e., normalized to be probability distributions) like we did in the barycenter example as it is a bit more expensive. Instead, we will use a cost which is $L_2$ comparison of grayscale images.*
$$C[i, j] = ||x_{real}[i] - x_{fake}[j]||^2$$


In [None]:
# rewriting Sinkhorn in a tensorflow language
@tf.function
def sinkhorn_tf(mu, nu, C, eps=0.01, max_iter=100, tol=1e-6):
    u = tf.zeros_like(mu)
    v = tf.zeros_like(nu)
    log_mu = tf.math.log(mu + 1e-8)
    log_nu = tf.math.log(nu + 1e-8)

    def cond(it, u, v, converged):
        return tf.logical_and(it < max_iter, tf.logical_not(converged))

    def body(it, u, v, converged):
        log_sum_u = tf.math.reduce_logsumexp((v[None, :] - C) / eps, axis=1)
        u = eps * log_mu - eps * log_sum_u

        log_sum = tf.math.reduce_logsumexp((u[:, None] - C) / eps, axis=0)
        v = eps * log_nu - eps * log_sum

        P = tf.exp((u[:, None]+ v[None, :] - C) / eps)
        # Convergence check
        converged =  tf.reduce_sum(tf.abs(tf.reduce_sum(P, axis=1) - mu)) < tol
        return it+1, u, v, converged

    it0 = tf.constant(0)
    converged0 = tf.constant(False)

    it, u_final, v_final, _ = tf.while_loop(cond, body, [it0, u, v, converged0])

    P = tf.exp((u_final[:, None] + v_final[None, :] - C) / eps)
    return P, u_final, v_final

@tf.function
def sinkhorn_loss(x, y, eps=0.05):
    # uniform measures on the sampled batches
    n = tf.shape(x)[0]
    m = tf.shape(y)[0]
    mu = tf.fill([n], 1.0 / tf.cast(n, tf.float32))
    nu = tf.fill([m], 1.0 / tf.cast(m, tf.float32))

    C = tf.reduce_sum((tf.expand_dims(x, 1) - tf.expand_dims(y, 0)) ** 2, axis=2)
    P, U, V = sinkhorn_tf(mu, nu, C, eps=eps)

    loss = - eps*tf.reduce_sum(P)
    loss += tf.reduce_sum(U*mu)
    loss += tf.reduce_sum(V*nu)

    # equivalently, could have used
    # loss = tf.reduce_sum(P * C) + eps * tf.reduce_sum(P * tf.math.log(P))
    # or even
    # loss = tf.reduce_sum(P * C)

    return loss

First, let's see an example where `x_real` are some points clouds in $\mathbb R^2$

In [None]:
# Parameters
latent_dim = 2
lr = 1e-3
epsilon = 0.1

# Real samples (2 blobs)
def sample_blobs(batch_size):
    centers = np.array([[2, 2], [-2, -2]])
    ids = np.random.randint(0, 2, size=batch_size)
    noise = 0.3 * np.random.randn(batch_size, 2)
    return centers[ids] + noise


def build_blobs_generator():
    model = tf.keras.Sequential([
        Dense(64, activation='relu'),
        Dense(64, activation='relu'),
        Dense(2)
    ])
    return model

# Training loop
blobs_generator = build_blobs_generator()
optimizer = tf.keras.optimizers.Adam(learning_rate=lr)
losses = []


In [None]:
epochs = 1000
batch_size = 128

for epoch in range(epochs):
    x_real = sample_blobs(batch_size).astype(np.float32)
    z = np.random.randn(batch_size, latent_dim).astype(np.float32)

    with tf.GradientTape() as tape:
        x_fake = blobs_generator(z)
        loss = sinkhorn_loss(tf.convert_to_tensor(x_real), x_fake, epsilon)

    grads = tape.gradient(loss, blobs_generator.trainable_variables)
    optimizer.apply_gradients(zip(grads, blobs_generator.trainable_variables))
    losses.append(loss)

    if (epoch+1) % 100 == 0:
        print(f"Epoch {epoch+1}: Sinkhorn Loss = {loss:.4f}")



In [None]:
"""run this cell many times to see many examples"""

x_real = sample_blobs(batch_size)
z = np.random.randn(batch_size // 2, latent_dim).astype(np.float32)
x_fake = blobs_generator(z).numpy()

plt.figure(figsize=(6, 6))
plt.scatter(x_real[:, 0], x_real[:, 1], color='blue', alpha=0.5, label='Real')
plt.scatter(x_fake[:, 0], x_fake[:, 1], color='red', alpha=0.5, label='Generated')
plt.legend()
plt.title("Real vs fake with Sinkhorn Loss")
plt.axis('equal')
plt.grid(True)
plt.show()

### Exercises:
- Experiment with network architecture (deepness, activations, etc.) and also latent dimensions of `z`
- Try to change values of $\varepsilon$, and also try to change the loss into the 2 other options as indicated in a comment inside `sinkhorn_loss`. Do you see any difference? Why? (Talk to me)
- Try to fake some other distributions and learn them by analogy (for instance take a higher dimension for `x_real`). How big the latent dimension of `z` must be to produce some meaningful results?

In [None]:
"""Free space for experiments"""

### Now let's try to generate some images of digits!

In [None]:
from tensorflow.keras.datasets import mnist

def build_mnist_generator():
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dense(256, activation='relu'),
        tf.keras.layers.Dense(28*28, activation='sigmoid')
    ])
    return model

(mnist_digits, mnist_labels), (_, _) = mnist.load_data()

In [None]:
latent_dim = 64
batch_size = 32
epsilon = 0.01

digit = 3

number_generator = build_mnist_generator()
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)


# Load MNIST and filter only digit, e.g. "3"
x_train = mnist_digits[mnist_labels == digit]
x_train = x_train.astype(np.float32) / 255.0
x_train = x_train.reshape((-1, 28*28))  # Flatten images

# Select subset for training
x_train = x_train[:1000]

losses = []

In [None]:
# train
n_epochs = 800

for epoch in range(n_epochs):
    idx = np.random.choice(len(x_train), batch_size, replace=False)
    x_real = tf.convert_to_tensor(x_train[idx])
    z = tf.random.normal((batch_size, latent_dim))

    with tf.GradientTape() as tape:
        x_fake = number_generator(z)
        loss = sinkhorn_loss(tf.convert_to_tensor(x_real), x_fake, epsilon)

    grads = tape.gradient(loss, number_generator.trainable_variables)
    optimizer.apply_gradients(zip(grads, number_generator.trainable_variables))
    losses.append(loss)

    if (epoch+1) % 100 == 0:
        print(f"Epoch {epoch+1}: Sinkhorn Loss = {loss:.4f}")

In [None]:
"""run this cell many times to see many examples"""

z = tf.random.normal((1, latent_dim))
generated_image = number_generator(z).numpy().reshape(28, 28)

plt.imshow(generated_image, cmap='gray')
plt.title(f"Generated digit '{digit}'")
plt.axis('off')
plt.show()

### Exercises
- Obviously, try different digits, experiment with network design, latent dimension of `z`, batch_size, values of $\varepsilon$, etc.
- Try some maybe different datasets other than `mnist`, for example `fashion-mnist`, `cifar-10`
- What happens if we don't restrict to a single label in a dataset, and just try to generate any sample from a chosen dataset, e.g. `mnist`. Does it generate an actual and meaningful "digit"?

In [None]:
"""Free space for experiments"""

# Sampling with Langevin Dynamics

Here, we want to first assume that out goal is to sample from a complex target distribution $\pi$ with density
$p(x) \propto e^{-V(x)} $ where $V(x)$ is some potential
(e.g., $\pi$ is a Bayesian posterior, Gibbs distribution). However, we might not be able to sample directly.

### Strategy
We simulate the evolution of a probability distribution $\rho_t$ with densitied $p_t(x)$
under a **Wasserstein gradient flow** that minimizes the KL divergence to the target distribution
$$\frac{\partial p_t}{\partial t} = \nabla\cdot (p_t \nabla \log (\frac{p_t}{p}))$$
This is equivalent to the Fokker-Planck equation, and it corresponds to the Langevin dynamics SDE:
$$d X_t = -\nabla (V(X_t)) dt +\sqrt{2} d B_t$$
where $(B_t)_{t>0}$ is a standard Brownian motion.

**Theorem:** It is known that is $\nabla V$ is Lipshitz continuous, then there exists a unique solution for any initial conditions, and its stationary distribution is $\pi$.


### Idea:
So, by discretizing this SDE, we can evolve a set of particles toward the target distribution $\pi$.

$$X_{t+1} = X_t - \eta \nabla (V(X_t)) + \sqrt{2\eta} N(0, I) $$
where $\eta$ is a stepsize.



---
For the first example, let’s define a potential
$V(x) = \frac12 ||x - a||^2$, then $\pi$ will be a Gaussian with mean at $a$ and variance 1.

In [None]:
# Set seed for reproducibility
tf.random.set_seed(42)

# Target potential: V(x) = 0.5 * ||x - a||^2
def V(x, a):
    return 0.5 * tf.reduce_sum((x - a) ** 2, axis=1)

# Gradient of V
def grad_V(x, a):
    return x - a  # V'(x) = x - a

# Langevin dynamics sampler (ULA)
def langevin_sampler(a, mu_0, steps=1000, step_size=0.01, n_particles=100):
    d = a.shape[0]
    x = mu_0(n_particles, d)

    samples = []

    for _ in range(steps):
        noise = tf.random.normal(shape=tf.shape(x))
        grad = grad_V(x, a)
        x = x - step_size * grad + tf.sqrt(2.0 * step_size) * noise
        samples.append(x.numpy())

    return tf.stack(samples)  # shape: (steps, n_particles, d)

In [None]:
a = tf.constant([7.0, -4.0])
# Initial particles are uniform over (-10, 10)^2
mu_0 = lambda n_particles, d: tf.random.uniform((n_particles, d), minval=-10, maxval=10)

samples = langevin_sampler(a, mu_0, steps=500, step_size=0.01, n_particles=100)

In [None]:
# now let's see how it evolves

def plot_samples_at_step(step=0):
    data = samples[step]
    plt.figure(figsize=(6, 6))
    plt.scatter(data[:, 0], data[:, 1], alpha=0.5)
    plt.xlim(-10, 10)
    plt.ylim(-10, 10)
    plt.title(f"Step {step}")
    plt.xlabel("x1")
    plt.ylabel("x2")
    plt.grid(True)
    plt.show()

interact(plot_samples_at_step, step=IntSlider(min=0, max=len(samples)-1, step=10, value=0));

### Exercises
- Now, try to sample initial particles $X_0$ from other distributions. Do we still converge to a specified Gaussian $N(a, I)$?
- Also, you can try choosing different target distributions, as long as its density is of form $p(x)\propto e^{-V(x)}$.

In [None]:
"""Free space for experiments"""

### Now, what to do if you want other distributions?
Suppose we want to flow towards some other distributions, e.g. distributions of pictures of digits. What can we try doing?

We can try to assume that their distribution $d\pi = p(x)dx$ with has density $p(x)\propto e^{-V(x)}$. How do we choose $V(x)$?

#### Option 1: Use a Trained Classifier for ( $\nabla \log p(X_t)$)
Train a neural network to try to "learn" the distribution of data at time $t$ where we gradually add noice to the distribution:
$$s_\theta(x,t) \approx -\nabla\log p_t(x)$$
Then treat the log-probability output as the potential $V$ and then use it for the Langevin system.
$$X_{t+1} = X_t - \eta  (s_\theta(x,t)) + \sqrt{2\eta} N(0, I)$$

#### Option 2: Classical Langevin Dynamics / Wasserstein Gradient Flow

Choose an energy potential $V$ such that for real data we have low energy, and otherwise higher. Then $\nabla V$ pulls you toward high-probability regions (like digits), and noise helps you explore.

Key idea:
- You manually define an energy landscape $V$ (e.g., low energy near digit images, high elsewhere).
- Sampling = run Langevin dynamics to reach low-energy areas (realistic samples).

Particular algorithm: Langevin Monte Carlo

(Good implementation is slightly time-consuming, thus if you wish, you could try by yourself)






