# Assignment 1: Two-dimensional Conditional Density Estimation

Deadline: Monday, 1/15/2024 9pm


## Your name: 



In [None]:
import math

import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from math import pi, log
from torch.utils import data
import numpy as np
from torch import tensor
from torch.distributions import Normal
import matplotlib as mpl

import jupyter_black

jupyter_black.load()


import os

os.environ["KMP_DUPLICATE_LIB_OK"] = "True"

_ = torch.manual_seed(0)

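# The data-generating model
We generate data from $x = \begin{pmatrix} 2 \sin(\theta)\\ \cos(\theta) \end{pmatrix} + \begin{pmatrix} \epsilon_1 \\ \epsilon_2 \end{pmatrix}$. \
This gives $x \in \mathbb{R}^2$ with two independent noise terms \
$\epsilon_1 \sim \mathcal{N}(0, \text{scale}_1^2)$ \
$\epsilon_2 \sim \mathcal{N}(0, \text{scale}_2^2)$, \
where $\text{scale}_1$ and $\text{scale}_2$ are the noise standard deviations (the `noise_scale` argument of the simulator below).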

**Our goal is to learn the conditional distribution $p(x| \theta, \mathcal{D})$, for some data $\mathcal{D}$.**
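Because the noise is additive, Gaussian and independent across dimensions, the true conditional distribution is itself a Normal distribution with diagonal covariance (zero correlation): $p(x \mid \theta) = \mathcal{N}\big(x; (2\sin\theta, \cos\theta)^T, \operatorname{diag}(\text{scale}_1^2, \text{scale}_2^2)\big)$. The sketch below builds this ground truth with `torch.distributions.MultivariateNormal` as a reference for what the network in Exercise 2 should approximately recover. The helper `true_conditional` is hypothetical (not part of the assignment), and its default noise scale simply mirrors the value used for the training data below.

```python
import torch
from torch.distributions import MultivariateNormal


def true_conditional(theta, noise_scale=(0.4, 0.1)):
    """Hypothetical helper: the exact p(x | theta) of the toy model.

    theta: tensor of shape (n, 1); returns a batch of n 2D Normal distributions.
    """
    # the mean is the noiseless simulator output, shape (n, 2)
    mean = torch.cat([2 * torch.sin(theta), torch.cos(theta)], dim=-1)
    # independent noise -> diagonal covariance with variances scale**2
    var = torch.tensor(noise_scale, dtype=mean.dtype) ** 2
    cov = torch.diag_embed(var.expand_as(mean))  # shape (n, 2, 2)
    return MultivariateNormal(mean, covariance_matrix=cov)


theta_demo = torch.tensor([[0.0], [torch.pi / 2], [torch.pi]])
dist = true_conditional(theta_demo)
print(dist.mean)  # approximately [[0, 1], [2, 0], [0, -1]]
print(dist.covariance_matrix[0])  # diag(0.4**2, 0.1**2)
```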

In [None]:
def simulator(theta, noise_scale=(0.05, 0.2)):
    """
    Simulator for the 2D toy example.
    args:
        theta: torch.Tensor of shape (n, 1), or (1,) for a single value
        noise_scale: sequence of length 2, noise standard deviation for each dimension
    returns:
        x: torch.Tensor of shape (n, 2), or (2,) for a single value
    """

    # noiseless simulator
    x1 = 2 * torch.sin(theta)
    x2 = torch.cos(theta)
    # join along the last dimension: (n, 1) -> (n, 2) and (1,) -> (2,)
    x = torch.cat([x1, x2], dim=-1)

    # create Gaussian noise with a separate standard deviation per dimension
    noise = torch.randn(x.shape) * torch.tensor(noise_scale)

    return x + noise

Let's generate the training data $\mathcal{D}$.


In [None]:
# generate data
n = 10_000  # number of datapoints
d = 1  # dimensionality of parameters theta

# define noise scale
noise_scale = [0.4, 0.1]

# sample random thetas in [0, 1.5*pi]
theta = torch.zeros((n, d))
theta[:, 0] = torch.rand(n) * (np.pi * 1.5)

samples = simulator(theta, noise_scale)

Let's have a look at the data:

In [None]:
# plot training data
sc = plt.scatter(samples[:, 0], samples[:, 1], c=theta[:, 0], vmin=0, vmax=1.5 * np.pi)

# plot an example point
example_theta = torch.tensor(np.pi).unsqueeze(0)
example_x = simulator(example_theta, noise_scale=[0, 0])

plt.plot(
    example_x[0],
    example_x[1],
    "o",
    color="r",
    label=f"simulator({np.round(example_theta.item(),2)})",
)
plt.plot(
    [example_x[0] - noise_scale[0], example_x[0] + noise_scale[0]],
    [example_x[1], example_x[1]],
    color="r",
)
plt.plot(
    [example_x[0], example_x[0]],
    [example_x[1] - noise_scale[1], example_x[1] + noise_scale[1]],
    color="r",
    label="+-std",
)


plt.xlabel("$x_1$")
plt.ylabel("$x_2$")
plt.colorbar(sc, label="$\\theta$")
plt.legend()

We now want to learn a conditional distribution $\hat{p}(x|\theta)$ such that, for every input $\theta$, it approximates the true (two-dimensional) conditional data distribution.\
This homework is divided into 3 exercises:
1. implement (the log probability of) a two-dimensional Normal distribution (3 points),
2. implement a conditional network to learn $p(x|\theta)$ (5 points),
3. evaluate the trained network (2 points).

# Exercise 1 
(3 points)

### Learning the mean and covariance matrix of a two-dimensional Normal distribution conditional on inputs

**1. PDF of a two-dimensional Normal distribution**

Before we define the conditioning network, we need a function that evaluates the probability density of a two-dimensional Normal distribution.
For numerical reasons (can you explain why?) we do not evaluate the density itself, but directly its logarithm, the log-probability:
$$\log \mathcal{N}(x; \mu, \Sigma),$$
for $x \in \mathbb{R}^2$, mean $\mu \in \mathbb{R}^2$ and covariance matrix $\Sigma \in \mathbb{R}^{2 \times 2}$.
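A quick illustration of the numerical issue, assuming 1D standard-Normal samples as a stand-in for real data: the likelihood of a dataset is a product of many densities, which underflows to zero in single precision, while the sum of log-densities stays a finite, well-scaled number.

```python
import torch
from torch.distributions import Normal

torch.manual_seed(0)
dist = Normal(0.0, 1.0)
x = dist.sample((1000,))  # 1000 i.i.d. samples

# product of densities: each factor is at most ~0.4, so the product underflows
likelihood = dist.log_prob(x).exp().prod()
# sum of log-densities: finite, about -1.42 per sample on average
log_likelihood = dist.log_prob(x).sum()

print(likelihood)  # tensor(0.)
print(log_likelihood)
```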

*Task:*\
Implement the log-pdf so that it also works for batched inputs (i.e. it computes the log-density for *n* points at a time).




*Hint:* \
The general formula for the probability density function (pdf) for a k-dimensional Normal distribution is
$$ f(x) = \frac{\exp(-0.5(x-\mu)^T\Sigma^{-1}(x-\mu))}{\sqrt{(2\pi)^k \det(\Sigma )}}$$

What happens if you take the log of this?
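As a warm-up for the hint, the same idea in one dimension ($k=1$, $\Sigma = \sigma^2$): taking the log turns the quotient into a sum of two simple terms, a quadratic term and a normalization term. The sketch below evaluates this 1D log-density by hand and checks it against `torch.distributions.Normal.log_prob`; the helper name `log_prob_normal_1d` is illustrative, and the two-dimensional, batched version is left to the exercise.

```python
import math

import torch
from torch.distributions import Normal


def log_prob_normal_1d(x, mu, sigma):
    """Log-density of N(mu, sigma^2) at x, obtained by taking the log of the pdf.

    log f(x) = -0.5 * (x - mu)^2 / sigma^2 - 0.5 * log(2 * pi * sigma^2)
    """
    return -0.5 * ((x - mu) / sigma) ** 2 - 0.5 * torch.log(2 * math.pi * sigma**2)


x = torch.linspace(-3.0, 3.0, 7)
mu = torch.tensor(0.5)
sigma = torch.tensor(1.5)

own = log_prob_normal_1d(x, mu, sigma)
reference = Normal(mu, sigma).log_prob(x)
print(torch.allclose(own, reference, atol=1e-6))  # True
```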

In [None]:
def get_log_prob_Gauss(x, mean, sigma):
    """evaluates the log probability density of a 2d Normal distribution

    Args:
        x (tensor): the points to evaluate, shape (batch, 2)
        mean (tensor): mean of the distribution, shape (batch, 2),
            or (2,) to use the same mean for all points
        sigma (tensor): covariance matrix, shape (batch, 2, 2),
            or (2, 2) to use the same matrix for all points

    Returns:
        tensor: log-densities log p(x | mean, sigma), shape (batch,)
    """

    ###
    # your code goes here
    #
    ###



    return log_prob

## Validation of Solution

PyTorch implements Gaussian distributions (e.g. `torch.distributions.MultivariateNormal`) with a `.log_prob()` method. You can use this to validate your solution.
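The PyTorch distribution also handles batches: if the mean has shape (batch, 2) and `covariance_matrix` has shape (batch, 2, 2), then `log_prob` on points of shape (batch, 2) returns one value per row, shape (batch,). The validation cell below only checks a single mean and covariance matrix; a batched reference like this sketch (the random parameters are an arbitrary choice for illustration) can be used to test the batched case as well.

```python
import torch
from torch.distributions import MultivariateNormal

torch.manual_seed(0)
batch = 5

# a batch of random means and symmetric positive definite covariance matrices
means = torch.randn(batch, 2)
A = torch.randn(batch, 2, 2)
sigmas = A @ A.transpose(1, 2) + 0.1 * torch.eye(2)

dist = MultivariateNormal(means, covariance_matrix=sigmas)
x = torch.randn(batch, 2)

log_probs = dist.log_prob(x)  # one log-density per row of x
print(dist.batch_shape, dist.event_shape, log_probs.shape)
```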

In [None]:
# Validation of log_prob_Gauss

from torch.distributions import MultivariateNormal

# create random means and cov matrices
means = torch.randn((2,))
sigma_raw = torch.randn((2, 2))
sigmas = sigma_raw @ sigma_raw.transpose(0, 1) + 0.01 * torch.eye(2)

# initialize the distribution
true_dist = MultivariateNormal(means, sigmas)

# sample from the distribution
x = true_dist.sample((10,))

# calculate the log prob from the pytorch distribution
log_prob_pytorch = true_dist.log_prob(x)
log_prob_own = get_log_prob_Gauss(x, means, sigmas)

# print the logprobs
print(log_prob_pytorch, "\n", log_prob_own)
# check if they are equal
print("These log probs are equal: ", torch.allclose(log_prob_own, log_prob_pytorch))

# Exercise 2

(1 + 4 points)

### Conditioning network


As we have seen in the lecture, we can define a neural network (NN) that takes the value of $\theta$ as input and predicts the parameters of the conditional distribution:
$NN(\theta) = (\mu, \Sigma)$.
For a one-dimensional distribution, the only constraint is $\sigma > 0$.
For a two-dimensional distribution it is already a bit more tricky: the covariance matrix $\Sigma$ needs to be symmetric and positive definite. However, we can express $\Sigma$ in terms of the marginal standard deviations $\sigma_1, \sigma_2 > 0$ and the correlation $\rho \in (-1, 1)$ in the following way:
$$\Sigma = \begin{bmatrix} \sigma_1^2 & \rho \sigma_1 \sigma_2 \\ \rho \sigma_1 \sigma_2 & \sigma_2^2  \end{bmatrix}.$$
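A short check of why this parameterization works: a symmetric $2 \times 2$ matrix is positive definite exactly when its top-left entry and its determinant are both positive (Sylvester's criterion). For $\Sigma$ as defined above,
$$\Sigma_{11} = \sigma_1^2 > 0, \qquad \det(\Sigma) = \sigma_1^2 \sigma_2^2 - \rho^2 \sigma_1^2 \sigma_2^2 = \sigma_1^2 \sigma_2^2 \,(1 - \rho^2),$$
so $\Sigma$ is positive definite whenever $\sigma_1, \sigma_2 > 0$ and $|\rho| < 1$. At $\rho = \pm 1$ the determinant is zero, $\Sigma$ is singular, and both $\Sigma^{-1}$ and $\log\det(\Sigma)$ in the Normal log-density are undefined.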

*Tasks:*

a) Implement a neural network that takes $\theta$ as input and returns some values (how many do you need?) that are converted to the parameters of the Normal distribution in b). The network should be fully connected and have one hidden layer with a reasonable number of units. Don't forget to include an activation function.

b) Implement two functions: `get_conditional_params(nn_output)`, which converts the output of the neural network to the parameters (mean, var, rho), and `get_sigma(var, rho)`, which returns the corresponding covariance matrix $\Sigma$.
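For the one-dimensional case mentioned above, such a conversion could look like the sketch below: the raw network output has two entries per row, the first used directly as the mean and the second passed through a softplus to obtain a strictly positive standard deviation. The helper `get_params_1d`, the choice of softplus (exp is a common alternative) and the small offset `1e-6` are illustrative assumptions, not requirements of the assignment; extending the idea to (mean, var, rho) is task b).

```python
import torch
import torch.nn.functional as F


def get_params_1d(nn_output):
    """Hypothetical 1D analog of get_conditional_params.

    nn_output: tensor of shape (batch, 2) with unconstrained values.
    Returns mean (batch, 1) and std (batch, 1) with std > 0.
    """
    mean = nn_output[:, :1]  # any real value is a valid mean
    std = F.softplus(nn_output[:, 1:]) + 1e-6  # softplus maps R -> (0, inf)
    return mean, std


raw = torch.tensor([[0.3, -5.0], [-1.2, 0.0], [2.0, 4.0]])
mean, std = get_params_1d(raw)
print(mean.squeeze(1))  # first column of raw, unchanged
print(std.squeeze(1))  # all entries strictly positive
```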





In [None]:
# a)

#net = ????

In [None]:
# b)


def get_conditional_params(nn_output):
    """converts NN output to mean, var, rho
    such that all constraints are fulfilled.

    Args:
        nn_output (tensor): (batch, ????)

    Returns:
        tensor: mean, var, rho
            with shapes: (batch, 2), (batch, 2), (batch, 1)
    """

    # YOUR CODE GOES HERE

    return mean, var, rho


def get_sigma(var, rho):
    """returns the covariance matrix

    Args:
        var (tensor): marginal variances. shape (batch, 2)
        rho (tensor): correlation. shape (batch, 1)

    Returns:
        tensor: sigma (batch, 2, 2)
    """

    batch = var.shape[0]
    sigma = torch.zeros(batch, 2, 2)

    # YOUR CODE GOES HERE
    # sigma[:, 0, 0] = ???
    # ...


    return sigma

Let's put all this together and see if it worked:

In [None]:
# define the dataset
dataset = data.TensorDataset(theta, samples)
# shuffle so that every epoch visits the minibatches in a new order
train_loader = data.DataLoader(dataset, batch_size=100, shuffle=True)

# specify the optimizer
opt = optim.Adam(net.parameters(), lr=0.001)

# initialize loss to store
store_loss = []

epochs = 20
for e in range(epochs):
    for theta_batch, x_batch in train_loader:
        opt.zero_grad()

        # run the forward model to get the raw NN output
        nn_output = net(theta_batch)

        # convert the output to the corresponding parameters for a Normal distribution
        mean, var, rho = get_conditional_params(nn_output)
        sigma = get_sigma(var, rho)

        # evaluate the Normal distribution
        log_prob_Gauss = get_log_prob_Gauss(x_batch, mean, sigma)

        # calculate the loss
        loss = -(log_prob_Gauss).sum()

        # take a gradient step
        loss.backward()
        opt.step()

        # store the loss
        store_loss.append(loss.detach().item())

In [None]:
# Let's see what the loss looks like
plt.plot(np.linspace(0, epochs, len(store_loss)), store_loss)
plt.ylabel("loss")
plt.xlabel("epochs")

We now have a neural network that outputs the parameters of a probability distribution given its inputs.
Here, it is a two-dimensional Normal distribution whose mean and covariance matrix we have learned.
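The training loop above minimizes the negative log-likelihood `-(log_prob_Gauss).sum()` of the training data under the predicted Gaussians. Stripped of the network, the same principle fits a single 1D Normal: optimizing the mean and the log-standard deviation on the NLL recovers the maximum-likelihood estimates, i.e. the sample mean and the (biased) sample standard deviation. This is an illustrative sketch; the data (mean 3, standard deviation 0.5), the use of plain full-batch `torch.optim.SGD` instead of Adam, the learning rate and the step count are arbitrary choices. Optimizing the log of the standard deviation is one simple way to keep $\sigma > 0$.

```python
import torch
from torch.distributions import Normal

torch.manual_seed(0)
data = 3.0 + 0.5 * torch.randn(1000)  # samples from N(3, 0.5^2)

# unconstrained parameters: the mean and the log of the standard deviation
mu = torch.zeros(1, requires_grad=True)
log_std = torch.zeros(1, requires_grad=True)
opt = torch.optim.SGD([mu, log_std], lr=0.1)

for _ in range(2000):
    opt.zero_grad()
    # negative log-likelihood, averaged over the data
    loss = -Normal(mu, log_std.exp()).log_prob(data).mean()
    loss.backward()
    opt.step()

# the maximum-likelihood solution: sample mean and biased sample std
mle_mean = data.mean()
mle_std = ((data - mle_mean) ** 2).mean().sqrt()
print(mu.item(), mle_mean.item())
print(log_std.exp().item(), mle_std.item())
```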


### Inspect the distribution for test points
Let's inspect the learned distribution for some test points *theta_test*.


In [None]:
theta_test = torch.tensor([0, torch.pi / 2, torch.pi, torch.pi * 1.5]).unsqueeze(1)
nn_output = net(theta_test).detach()
conditional_mean, var, rho = get_conditional_params(nn_output)
conditional_sigma = get_sigma(var, rho)

In [None]:
print(
    "conditional sigmas:",
    conditional_sigma,
    ", \nconditional std: ",
    conditional_sigma[:, 0, 0] ** 0.5,
    conditional_sigma[:, 1, 1] ** 0.5,
)
print(f"The standard deviations should be close to the noise scale of {noise_scale}.")

Let's also compare the predicted mean to the output of the noiseless simulator:

In [None]:
# get x_test with no noise
x_test_noiseless = simulator(theta_test, noise_scale=[0, 0])

print(
    "Predicted conditional mean:\n ",
    conditional_mean,
    ", \n noiseless_x:\n",
    x_test_noiseless,
)

mse = ((conditional_mean - x_test_noiseless) ** 2).mean()

print("MSE on test points:", mse.item())

## Visualizing the Distribution

It is hard to judge the fit by looking at the learned distribution parameters alone. Let's visualize the learned distribution.

In [None]:
def visualize_2d_gaussian(
    mean, sigma, true_x=None, x_range=(-2, 2), y_range=(-2, 2), resolution=100
):
    """
    Visualize a 2D Normal distribution by evaluating its density on a grid.

    Args:
        mean (tensor): mean of the distribution, shape (2,).
        sigma (tensor): covariance matrix, shape (2, 2).
        true_x (tensor or None): true (noiseless) point to mark, shape (2,).
        x_range (tuple): Range of x_1 values (e.g., (-1, 1)).
        y_range (tuple): Range of x_2 values (e.g., (-1, 1)).
        resolution (int): Number of points in each dimension of the grid.

    Returns:
        None
    """
    x = torch.linspace(x_range[0], x_range[1], resolution)
    y = torch.linspace(y_range[0], y_range[1], resolution)
    X, Y = torch.meshgrid(x, y, indexing="ij")

    # evaluate the log probabilities on the flattened grid, then reshape
    grid = torch.stack([X.flatten(), Y.flatten()], dim=1)
    log_probs = get_log_prob_Gauss(grid, mean, sigma).reshape(X.shape)

    plt.contourf(
        X.numpy(), Y.numpy(), torch.exp(log_probs).numpy(), levels=20, cmap="viridis"
    )
    if true_x is not None:
        plt.plot(true_x[0], true_x[1], "o", color="r", label="true mean")
        plt.legend()
    plt.colorbar(label="density")
    plt.xlabel("$x_1$")
    plt.ylabel("$x_2$")
    plt.show()


visualize_2d_gaussian(
    conditional_mean[0], conditional_sigma[0], true_x=x_test_noiseless[0]
)

# Exercise 3

(2 points)

### Alternative (visual) evaluation: for every $\theta$, sample from the predicted Gaussian distribution

We can visually assess the learned conditional distributions by comparing samples drawn from them with our data samples.

*Task:* \
Complete the code below to sample points $x\sim p(\cdot |\theta,\mathcal{D})$ from the inferred conditional distribution for the first 500 values of $\theta$.

*Remarks:*
- You can use the PyTorch implementation to sample from a multivariate Normal distribution (`torch.distributions.multivariate_normal.MultivariateNormal`)

- You can compare these samples with the training data in the plotting cell below. 
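For reference, sampling from a batched `MultivariateNormal` follows the shape rule `sample_shape + batch_shape + event_shape`: `sample()` draws one point per batch entry, while `sample((k,))` prepends a dimension of size k. The sketch uses arbitrary placeholder means and diagonal covariances, not the trained network's output.

```python
import torch
from torch.distributions import MultivariateNormal

torch.manual_seed(0)
n = 4

# placeholder batch of means and diagonal covariance matrices
loc = torch.randn(n, 2)
cov = torch.diag_embed(torch.rand(n, 2) + 0.1)

dist = MultivariateNormal(loc, covariance_matrix=cov)

one_per_row = dist.sample()  # shape (n, 2): one sample for each (loc, cov) pair
three_per_row = dist.sample((3,))  # shape (3, n, 2)
print(one_per_row.shape, three_per_row.shape)
```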

In [None]:
# choose the thetas to test on
n = 500
theta_test = theta[:n]

# predict mean and sigma for theta_test

#####
# YOUR CODE 
# ###

# initialize MultivariateNormal and sample

#####
# YOUR CODE 
#
# samples_test = ...
# ###

In [None]:
plt.scatter(samples[:500, 0], samples[:500, 1], c="grey", label="data")
sc = plt.scatter(
    samples_test[:, 0],
    samples_test[:, 1],
    c=theta_test[:, 0],
    vmin=0,
    vmax=1.5 * np.pi,
    label="predicted samples",
)
plt.xlabel("$x_1$")
plt.ylabel("$x_2$")
plt.colorbar(sc, label="$\\theta$")

plt.legend()

Congratulations! You have completed the first assignment!