In [None]:
import matplotlib.pyplot as plt
import mpld3
import numpy as np
from pydrake.all import (
    BezierCurve,
    MultilayerPerceptron,
    PerceptronActivationType,
    RandomGenerator,
)

from manipulation import running_as_notebook

if running_as_notebook:
    mpld3.enable_notebook()

rng = np.random.default_rng()
generator = RandomGenerator()

# Denoising diffusion as simple projection onto a manifold.

In [None]:
# Note: use domain [-1, 1]
traj = BezierCurve(
    0, 1, 2 * np.mat("0.1, 0.2; 0.4, 0.2; 0.5, 0.9; 0.7, 0.5; 0.9, 0.9").T - 1
)


def plot_trajectory(traj):
    plt.figure()
    plt.axis("square")
    plt.xlim([-1, 1])
    plt.ylim([-1, 1])
    ts = np.linspace(0, 1, 100)
    xs = traj.vector_values(ts)
    plt.plot(xs[0, :], xs[1, :])


plot_trajectory(traj)

In [None]:
# inputs: x_0[k], x_1[k], sigma_k
# outputs: x_0[k+1], x_1[k+1]
denoiser = MultilayerPerceptron(
    [4, 255, 255, 2]
)  # , [PerceptronActivationType.kTanh, PerceptronActivationType.kTanh, PerceptronActivationType.kIdentity])
context = denoiser.CreateDefaultContext()
denoiser.SetRandomContext(context, generator)
params = denoiser.GetMutableParameters(context)
dloss_dparams = 0 * params

# discrete noise levels (following Permenter23, Section 3.3.2)
K = 100
beta = 0.05  # choose 0 <= beta < 1
sigma = [1]
for i in range(K - 1, 0, -1):
    sigma.append((1 - beta) * sigma[-1])
sigma.reverse()
sigma = np.asarray(sigma)
print(f"sigma ∈ [{sigma.min()}, {sigma.max()}]")

# learning rate
eta = 0.001

err = []
for iter in range(500):
    N = 1000
    # generate N random samples from the manifold
    xs = traj.vector_values(rng.uniform(0, 1, N))
    # generate N random noise values
    eps = rng.normal(0, 1, (2, N))
    # generate N random noise levels
    ks = rng.integers(0, K, N)
    sigmas = sigma[ks]

    X = np.vstack((xs + sigmas * eps, np.sin(ks), np.cos(ks)))
    # plt.plot(np.vstack((X[0,:200], xs[0,:200])),np.vstack((X[1,:200], xs[1,:200])),color=str(0.5))

    # Take a few steps of gradient descent.
    # TODO(russt): Use ADAM.
    this_err = 0
    for step in range(1):
        this_err += denoiser.BackpropagationMeanSquaredError(
            context, X=X, Y_desired=eps, dloss_dparams=dloss_dparams
        )
        params -= eta * dloss_dparams
    err.append(this_err)

plt.figure()
plt.plot(err)
plt.xlabel("iter")
plt.ylabel("loss");

In [None]:
plot_trajectory(traj)

# Plot the denoiser "projection" back to the manifold for a particular noise level
k_plot = 50
assert k_plot >= 0 and k_plot < K
x0s, x1s = np.meshgrid(np.linspace(-1, 1, 11), np.linspace(-1, 1, 11), indexing="ij")
X = np.vstack(
    (
        x0s.flatten(),
        x1s.flatten(),
        0 * x0s.flatten() + np.sin(k_plot),
        0 * x0s.flatten() + np.cos(k_plot),
    )
)
Y = np.asfortranarray(np.zeros((2, X.shape[1])))
denoiser.BatchOutput(context, X, Y)
plt.plot(
    np.vstack((X[0, :], X[0, :] - sigma[k_plot] * Y[0, :])),
    np.vstack((X[1, :], X[1, :] - sigma[k_plot] * Y[1, :])),
    color=str(0.5),
);

In [None]:
plot_trajectory(traj)

for i in range(20):
    # Denoise a random initial sample
    xk = rng.normal(0, 1, [2, 1])
    plt.plot(xk[0], xk[1], "x", color=str(0.5))
    for k in range(K - 1, 0, -1):
        for j in range(2):
            denoiser.get_input_port().FixValue(
                context, np.vstack((xk, np.sin(k), np.cos(k)))
            )
            eps = denoiser.get_output_port().Eval(context).reshape([2, 1])
            xk = xk + (sigma[k - 1] - sigma[k]) * eps
        plt.plot(xk[0], xk[1], "x", color=str(0.5 * k / K))