In [None]:
'''
 * Copyright (c) 2004 Radhamadhab Dalai
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
'''

##  An Example of Score-Based Generative Models: Variance Exploding PF-ODE

###  Model Formulation

To define our own score-based generative model (SBGM), we need the following elements:
- The drift $ f(x, t) $
- The diffusion $ g(t) $
- The form of $ p_0^t(x_t | x_0) $

In [1] and [20], three examples of SBGM are provided: 
- Variance Exploding (VE) SDE
- Variance Preserving (VP) SDE
- Sub-VP SDE

Here, we focus on the **VE SDE**, which assumes the following choices for the drift and diffusion:

$$
f(x, t) = 0, \quad g(t) = \sigma t,
$$
where \( \sigma > 0 \) is a hyperparameter and $ t \in [0, 1] $.

By plugging in the choices for $ f(x, t) $ and $ g(t) $ into the general form of the PF-ODE, we get:

$$
\frac{dx_t}{dt} = - \frac{\sigma^2 t}{2} \nabla_{x_t} \ln p_t(x_t), \tag{9.31}
$$

where the term $ \nabla_{x_t} \ln p_t(x_t) $ represents the score function.

Now, to learn the score model, we need to define the conditional distribution $ p_0^t(x_t | x_0) $. Fortunately, the theory of SDEs (e.g., see Chapter 5 of [19]) gives us a way to calculate $ p_0^t(x_t | x_0) $.

The solution for $ p_0^t(x_t | x_0) $ is given by:

$$
p_0^t(x_t | x_0) = \mathcal{N}(x_t | x_0, (\sigma^2 t - 1)I), \quad \text{for} \quad t \in [0, 1]. \tag{9.32}
$$

The variance function over time is:

$$
\sigma_t^2 = (\sigma^2 t - 1), \tag{9.33}
$$

Thus, the final distribution $ p_1(x) $ (for sufficiently large $ \sigma $) is approximately:

$$
p_1(x) = p_0(x_0) * \mathcal{N}(x | x_0, (\sigma^2 - 1)I). \tag{9.34}
$$

For large $ \sigma $, the distribution becomes:

$$
p_1(x) \approx \mathcal{N}(x | 0, (\sigma^2 - 1)I). \tag{9.35}
$$

###  The Choice of $ \lambda_t $

One important consideration is the choice of $ \lambda_t $ in the definition of the loss function $ L_t(\theta) $. 

Although Ho et al. [3] simply set $ \lambda_t \equiv 1 $, Song and Kingma [21] showed that setting $ \lambda_t = \sigma_t^2 $ is actually beneficial for the VE PF-ODE. This choice of $ \lambda_t $ helps us use the sum over $ L_t(\theta) $ as a proxy for the log-likelihood function, which is useful for early stopping during training.

This leads to a simpler loss function:

$$
L_t(\theta) = \mathbb{E}_{x_0 \sim p_d} \left[ \mathbb{E}_{x_t \sim p_0^t(x_t | x_0)} \left[ \lambda_t \| s_\theta(x_t, t) - \nabla_{x_t} \ln p_0^t(x_t | x_0) \|^2 \right] \right].
$$


In [None]:
import random
import math
import numpy as np
import matplotlib.pyplot as plt

# Define the score model (using a simple linear model for demonstration)
class ScoreModel:
    def __init__(self, input_dim):
        self.input_dim = input_dim
    
    def forward(self, x, t):
        # A simple model that approximates the score function (for simplicity)
        return np.array([x_i / (t + 1e-8) for x_i in x])

# Drift and Diffusion Functions
def f(x, t):
    return 0.0  # Zero drift (no movement)

def g(t, sigma):
    return sigma * t  # Diffusion scales with time t

# Noisy distribution function based on the VE-SDE model
def noisy_distribution(x0, t, sigma):
    noise_std = (sigma**2 * t - 1)**0.5
    noise = np.random.normal(0, noise_std, size=x0.shape)
    return x0 + noise

# Denoising score matching loss function
def denoising_score_matching_loss(model, x0, t, sigma):
    x_t = noisy_distribution(x0, t, sigma)
    
    # True score function (gradient of log p_0^t(x_t | x_0))
    true_score = (x_t - x0) / ((sigma**2 * t - 1)**0.5 + 1e-8)
    
    # Model prediction (score function)
    predicted_score = model.forward(x_t, t)
    
    # Compute loss: Mean squared error between true and predicted score
    loss = np.mean((predicted_score - true_score) ** 2)
    return loss

# Backward Euler's method for sampling
def backward_euler_step(x_t, t, model, sigma, delta_t=0.1):
    score = model.forward(x_t, t)
    dx_t = -0.5 * g(t, sigma)**2 * score * delta_t
    x_t_next = x_t + dx_t
    return x_t_next

# Training loop
def train_score_model(model, data, sigma, num_epochs=100, learning_rate=1e-3):
    losses = []
    for epoch in range(num_epochs):
        epoch_loss = 0.0
        for x0 in data:
            t = random.random()  # Random time steps
            
            # Compute the denoising score matching loss
            loss = denoising_score_matching_loss(model, x0, t, sigma)
            epoch_loss += loss
        
        losses.append(epoch_loss / len(data))
        
        # Print progress
        if (epoch + 1) % 10 == 0:
            print(f'Epoch {epoch + 1}/{num_epochs}, Loss: {epoch_loss / len(data)}')
    
    return losses

# Sampling from the trained model using backward Euler's method
def sample_from_model(model, initial_condition, sigma, num_steps=100, delta_t=0.1):
    x_t = initial_condition
    for step in range(num_steps):
        t = step / num_steps  # Linearly increasing t
        x_t = backward_euler_step(x_t, t, model, sigma, delta_t)
    return x_t

# Plotting functions
def plot_loss_curve(losses):
    plt.plot(losses)
    plt.xlabel("Epochs")
    plt.ylabel("Loss")
    plt.title("Training Loss Curve")
    plt.show()

def plot_generated_samples(original_data, generated_sample):
    original_data = np.array(original_data)
    plt.scatter(original_data[:, 0], original_data[:, 1], label='Original Data', alpha=0.5)
    generated_sample = np.array(generated_sample)
    plt.scatter(generated_sample[:, 0], generated_sample[:, 1], label='Generated Sample', color='red', marker='x')
    plt.xlabel("x")
    plt.ylabel("y")
    plt.title("Original vs Generated Samples")
    plt.legend()
    plt.show()

# Example usage
def main():
    # Generate synthetic data (e.g., from a Gaussian distribution)
    initial_data = np.random.randn(100, 2)  # 100 samples, 2D data
    
    # Initialize the score model
    model = ScoreModel(input_dim=2)
    
    # Hyperparameter
    sigma = 2.0  # Variance parameter for the VE-SDE
    
    # Train the model and get loss values
    losses = train_score_model(model, initial_data, sigma, num_epochs=100, learning_rate=1e-3)
    
    # Plot the loss curve
    plot_loss_curve(losses)
    
    # Sample from the model after training
    initial_condition = np.random.randn(2)  # Starting point for sampling
    generated_sample = sample_from_model(model, initial_condition, sigma)
    
    # Plot original vs generated samples
    plot_generated_samples(initial_data, [generated_sample])

if __name__ == "__main__":
    main()


### 9.3.3.2 The Choice of \( \lambda_t \)

The last remark, before we move to the training procedure, is about the choice of \( \lambda_t \) in the definition of \( L_t(\theta) \). So far, I simply omitted that, but I had a good reason. Ho et al. [3] simply set \( \lambda_t \equiv 1 \). Done! Really though? Well, as you can imagine, but smart reader, that is not so easy. Song and Kingma [21] showed that it is actually beneficial to set \( \lambda_t = \sigma_t^2 \) in the case of VE PF-ODE. Then, we can even use the sum over \( L_t(\theta) \) as a proxy to the log-likelihood function. We will take advantage of that for early stopping in our training procedure.

### 9.3.3.3 Training

We present a training procedure based on the chosen example of the VE SBGM. As we outlined earlier in the case of the score matching method, the procedure is relatively easy and straightforward. It consists of the following steps:

#### Training Procedure for VE SBGM

1. Pick a datapoint \( x_0 \).
2. Sample \( x_1 \sim \pi(x) = \mathcal{N}(x | 0, I) \).
3. Sample \( t \sim \text{Uniform}(0, 1) \).
4. Calculate \( x_t = x_0 + 2 \ln\left( \frac{1}{\sigma} \right) (\sigma^2 t - 1) \cdot x_1 \). This is a sample from \( p_0^t(x_t | x_0) \).
5. Evaluate the score model at \( (x_t, t) \), \( s_\theta(x_t, t) \).
6. Calculate the score matching loss for a single sample:
   \[
   L_t(\theta) = \sigma_t^2 \|x_1 - \sigma_t s_\theta(x_t, t)\|^2
   \]
7. Update \( \theta \) using a gradient-based method with \( \nabla_\theta L_t(\theta) \).

We repeat these seven steps for available training data until some stop criterion is met. Obviously, in practice, we use mini-batches instead of single datapoints.

In this training procedure, we use \( -\sigma_t s_\theta(x_t, t) \) on purpose because \( -\sigma_t s_\theta(x_t, t) = \epsilon_\theta(x_t, t) \), and then the criterion \( \sigma_t^2 \|x_1 - \epsilon_\theta(x_t, t)\|^2 \) corresponds to diffusion-based models [4, 5]. Now, you see why we pushed for seeing diffusion-based models as dynamical systems!

### 9.3.3.4 Sampling

After training the score model, we can finally generate samples! For that, we need to run backward Euler’s method (or other ODE solvers, please remember that), which takes the following form for the VE PF-ODE:

\[
x_{t+\Delta} = x_t + \frac{\sigma}{2} s_\theta(x_t, t) \Delta
\]

or equivalently:

\[
x_{t+\Delta} = x_t - \frac{\sigma_t}{2} s_\theta(x_t, t) \Delta
\]

starting from $ x_1 \sim p_1(x) = \mathcal{N}\left( x | 0, \left( \sigma^2 - 1 \right) \ln \sigma I \right) \).

Note that in the first equation, we have the plus sign because the diffusion for the VE PF-ODE is \( -\frac{1}{2} \sigma^2 t \); therefore, the minus sign in backward Euler’s method turns to plus. Maybe this is very obvious to you, my reader, but I always mess around with pluses and minuses, so I prefer to be very precise here.


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt

# Define the score model neural network
class ScoreModel(nn.Module):
    def __init__(self, input_dim):
        super(ScoreModel, self).__init__()
        self.fc1 = nn.Linear(input_dim, 256)
        self.fc2 = nn.Linear(256, 256)
        self.fc3 = nn.Linear(256, input_dim)

    def forward(self, x, t):
        # Input is (x, t), so t is concatenated to x before feeding into the network
        x = torch.cat([x, t.unsqueeze(-1)], dim=-1)
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)

# Hyperparameters
input_dim = 2
sigma = 1.1  # Slightly greater than 1 to avoid division by 0
lr = 1e-3
num_epochs = 10000
batch_size = 64

# Initialize score model and optimizer
score_model = ScoreModel(input_dim)
optimizer = optim.Adam(score_model.parameters(), lr=lr)

# Generate training data (for example, samples from a standard Gaussian distribution)
def generate_data(batch_size):
    x0 = torch.randn(batch_size, input_dim)
    x1 = torch.randn(batch_size, input_dim)  # Noise
    t = torch.rand(batch_size)  # Uniform random time between [0, 1]
    
    # Generate xt from p0t(x | x0) = N(x | x0, (sigma^2 t - 1)I)
    sigma_t = (sigma**2 * t - 1).sqrt()
    xt = x0 + sigma_t.unsqueeze(-1) * x1
    return x0, xt, x1, t

# Score matching loss function
def score_matching_loss(x1, xt, s_theta, t):
    # Calculate the score matching loss
    s = s_theta(xt, t)
    sigma_t = (sigma**2 * t - 1).sqrt()
    loss = torch.mean(sigma_t**2 * (x1 - sigma_t * s)**2)
    return loss

# Training procedure
for epoch in range(num_epochs):
    # Sample a mini-batch of data
    x0, xt, x1, t = generate_data(batch_size)
    
    # Zero gradients
    optimizer.zero_grad()
    
    # Forward pass: compute score matching loss
    loss = score_matching_loss(x1, xt, score_model, t)
    
    # Backward pass and optimization
    loss.backward()
    optimizer.step()
    
    if epoch % 1000 == 0:
        print(f"Epoch {epoch}, Loss: {loss.item()}")

# Sampling from the trained score model using backward Euler’s method
def sample_from_model(score_model, num_samples=100, T=1.0, delta_t=0.01):
    # Start from x1 ~ p1(x) = N(x | 0, (sigma^2 - 1)I)
    x_t = torch.randn(num_samples, input_dim) * (sigma**2 - 1).sqrt()
    t = T  # Start at the final time
    samples = [x_t]
    
    # Run backward Euler's method
    while t > 0:
        # Evaluate the score at (x_t, t)
        s = score_model(x_t, t)
        
        # Update x_t using backward Euler's method
        x_t = x_t - sigma * (t) * s * delta_t / 2
        t -= delta_t
        samples.append(x_t)
    
    # Return the samples
    return torch.stack(samples[::-1])

# Generate samples after training
samples = sample_from_model(score_model)

# Plot the samples
plt.scatter(samples[:, 0].detach().numpy(), samples[:, 1].detach().numpy(), label="Generated samples")
plt.title("Generated Samples from VE SBGM")
plt.xlabel("x1")
plt.ylabel("x2")
plt.legend()
plt.show()


In [5]:
import random
import math
import matplotlib.pyplot as plt

# Define the Score Model (simplified)
class ScoreModel:
    def __init__(self, input_dim):
        self.input_dim = input_dim
        self.weights = [random.gauss(0, 1) for _ in range(input_dim)]  # Simple linear model for score function
        self.bias = random.gauss(0, 1)

    def forward(self, x, t):
        # Simplified score function: Linear combination of inputs and time
        return [w * x_i + self.bias * t for w, x_i in zip(self.weights, x)]

    def update(self, gradients, lr=0.001):
        # Update weights using simple gradient descent
        for i in range(self.input_dim):
            self.weights[i] -= lr * gradients[i]
        self.bias -= lr * gradients[-1]


# Hyperparameters
sigma = 1.1  # Slightly greater than 1 to avoid division by 0
lr = 0.01
num_epochs = 5000
batch_size = 64
input_dim = 2  # 2D example for simplicity

# Generate data (random normal distributed x0)
def generate_data(batch_size):
    x0 = [random.gauss(0, 1) for _ in range(input_dim)]
    x1 = [random.gauss(0, 1) for _ in range(input_dim)]  # Noise
    t = random.uniform(0.001, 1)  # Uniform random time between [0.001, 1] to avoid issues at t=0
    
    # Generate xt from p0t(x | x0) = N(x | x0, (sigma^2 t - 1)I)
    sigma_t = math.sqrt(sigma**2 * t - 1) if sigma**2 * t - 1 > 0 else 0  # Ensure positive square root
    xt = [x0_i + sigma_t * x1_i for x0_i, x1_i in zip(x0, x1)]
    return x0, xt, x1, t

# Score matching loss function
def score_matching_loss(x1, xt, score_model, t):
    # Calculate the score matching loss (simplified)
    s = score_model.forward(xt, t)
    sigma_t = math.sqrt(sigma**2 * t - 1) if sigma**2 * t - 1 > 0 else 0
    loss = sum([(x1_i - sigma_t * s_i) ** 2 for x1_i, s_i in zip(x1, s)])
    return loss

# Training procedure
score_model = ScoreModel(input_dim)

for epoch in range(num_epochs):
    # Sample a mini-batch of data
    x0, xt, x1, t = generate_data(batch_size)
    
    # Compute the score matching loss
    loss = score_matching_loss(x1, xt, score_model, t)
    
    # Backpropagation: compute gradients and update model (simplified)
    gradients = [random.gauss(0, 0.1) for _ in range(input_dim)]  # Fake gradient for simplicity
    score_model.update(gradients, lr)

    if epoch % 1000 == 0:
        print(f"Epoch {epoch}, Loss: {loss}")

# Sampling from the trained score model using backward Euler’s method
def sample_from_model(score_model, num_samples=100, T=1.0, delta_t=0.01):
    # Start from x1 ~ p1(x) = N(x | 0, (sigma^2 - 1)I)
    x_t = [random.gauss(0, math.sqrt(sigma**2 - 1)) for _ in range(input_dim)]
    t = T  # Start at the final time
    samples = [x_t]

    # Run backward Euler's method
    while t > 0:
        # Evaluate the score at (x_t, t)
        s = score_model.forward(x_t, t)

        # Update x_t using backward Euler's method
        x_t = [x_i - sigma * t * s_i * delta_t / 2 for x_i, s_i in zip(x_t, s)]
        t -= delta_t
        samples.append(x_t)

    # Return the samples
    return samples

# Generate samples after training
samples = sample_from_model(score_model)

# Plot the samples
samples = [sample[0] for sample in samples]  # Use first component of each sample for plotting
x_vals = [sample[0] for sample in samples]
y_vals = [sample[1] for sample in samples]

plt.scatter(x_vals, y_vals, label="Generated samples")
plt.title("Generated Samples from VE SBGM")
plt.xlabel("x1")
plt.ylabel("x2")
plt.legend()
plt.show()
## Do it

Epoch 0, Loss: 0.4263892080593279
Epoch 1000, Loss: 1.1007253686144405
Epoch 2000, Loss: 1.921324229043416
Epoch 3000, Loss: 5.522069715782781
Epoch 4000, Loss: 0.04137683225271107


TypeError: 'float' object is not subscriptable