# Normalising flows and bounded distributions

Michael J. Williams 2023


**Note:** this notebooks uses the current 'main' branch of `glasflow`.

In [None]:
import matplotlib.pyplot as plt
import numpy as np
from glasflow import CouplingNSF, RealNVP
import corner
from copy import deepcopy
from scipy.stats import chi2

import torch
from torch import optim

In [None]:
corner_kwargs = dict(
    bins=32,
    smooth=0.9,
    color="teal",
    quantiles=[0.16, 0.84],
    levels=(1 - np.exp(-0.5), 1 - np.exp(-2), 1 - np.exp(-9 / 2.0)),
    plot_density=True,
    plot_datapoints=True,
    fill_contours=True,
    show_titles=True,
    hist_kwargs=dict(density=True),
)

In [None]:
def make_batch(n):
    x = torch.cat(
        [4 * torch.randn(n // 2, 2), torch.rand(n // 2, 2)],
        axis=1
    )
    return x

In [None]:
kwargs = deepcopy(corner_kwargs)
kwargs["color"] = "C1"
kwargs["hist_kwargs"]["color"] = "C1"
example = make_batch(10000)
fig_data = corner.corner(example.numpy(), **kwargs)

In [None]:
def train(flow, n_iter, batch_size=500, logit=False, uniform=False):
    train_loss = []
    optimizer = optim.Adam(flow.parameters())
    for i in range(n_iter):
        t_loss = 0

        x = make_batch(batch_size)
        if logit:
            x[:, 2:] = torch.logit(x[:, 2:])
        if uniform:
            x[:, :2] = torch.sigmoid(x[:, :2])
        optimizer.zero_grad()
        loss = -flow.log_prob(inputs=x).mean()
        loss.backward()
        optimizer.step()
        t_loss += loss.item()

        train_loss.append(t_loss)
    return flow, train_loss

In [None]:
realnvp = RealNVP(4, 4, n_neurons=128)
realnvp, loss = train(realnvp, 1000)
plt.plot(loss)
plt.show()

In [None]:
with torch.inference_mode():
    samples = realnvp.sample(1000).numpy()

In [None]:
z = torch.randn(1000, 4)
with torch.no_grad():
    x_inv, _ = realnvp.inverse(z)
x_inv = x_inv.numpy()

In [None]:
in_bounds = np.logical_and((x_inv[:, 2:] > 0).all(axis=1), (x_inv[:, 2:] < 1).all(axis=1))

In [None]:
plt.plot(x_inv[:, 2][in_bounds], x_inv[:, 3][in_bounds], '.', label="In bounds")
plt.plot(x_inv[:, 2][~in_bounds], x_inv[:, 3][~in_bounds], '.', label="Out of bounds")
plt.legend()
plt.show()

In [None]:
plt.plot(z[:, 2][in_bounds], z[:, 3][in_bounds], '.', label="In bounds")
plt.plot(z[:, 2][~in_bounds], z[:, 3][~in_bounds], '.', label="Out of bounds")
plt.legend()
plt.show()

In [None]:
r = np.sum(z.numpy() ** 2,axis=1)

In [None]:
x = np.linspace(0, 20, 1000)

In [None]:
plt.hist(r[in_bounds], 20, density=True, histtype="step", label="In bounds")
plt.hist(r[~in_bounds], 20, density=True, histtype="step", label="Out of bounds")
plt.plot(x, chi2(4).pdf(x), label="Gaussian")
plt.xlabel("Radius")
plt.legend()
plt.show()

In [None]:
fig = corner.corner(x_inv, **corner_kwargs)


In [None]:
y = torch.linspace(0, 1, 100000)
plt.plot(y, torch.logit(y).numpy())
plt.title("Logit")
plt.show()

## With logit

In [None]:
example_logit = make_batch(2000)
example_logit[:, 2:] = torch.logit(example_logit[:, 2:])
fig = corner.corner(example_logit.numpy(), **kwargs)

In [None]:
realnvp = RealNVP(4, 4, n_neurons=128)
realnvp, loss = train(realnvp, 1000, logit=True)
plt.plot(loss)
plt.show()

In [None]:
realnvp.eval()
with torch.no_grad():
    samples = realnvp.sample(10_000)
# Apply inverse
samples[:, 2:] = torch.sigmoid(samples[:, 2:])
samples = samples.numpy()

In [None]:
fig = corner.corner(samples, **corner_kwargs)

## Neural spline

In [None]:
nsf = CouplingNSF(4, 4, distribution="uniform")
nsf, loss = train(nsf, 1000, uniform=True)
plt.plot(loss)
plt.show()

In [None]:
with torch.no_grad():
    samples = nsf.sample(5000)
# Inverse
samples[:, :2] = torch.logit(samples[:, :2])
samples = samples.numpy()

In [None]:
fig = corner.corner(example.numpy(), label="Truth", **kwargs)
fig = corner.corner(samples, fig=fig, label="NSF", **corner_kwargs)