In [None]:
import numpy as np
import matplotlib.pylab as plt
import pandas as pd
%reload_ext autoreload
%autoreload 2

plt.rcParams.update({
    "text.usetex": True,
    "font.family": "DejaVu Sans",
    "font.size": 12,
})
plt.rc('text.latex', preamble=r'\usepackage{bm}')
figsize = 7

#%matplotlib inline 
#%matplotlib widget
%matplotlib notebook

### Study distance distributions

In [None]:
# compare distances of reflected vs. original
from poly_certificate.problem import generate_distances, Problem
from _scripts.evaluate_real import ANCHOR_CHOICE

dataset = "trial5"
use_anchors = ANCHOR_CHOICE["top"]

prob = Problem.init_from_dataset(dataset, 
                                 time_range=None, 
                                 use_anchors=use_anchors,
                                 use_gt=False)

def plot_distribution(prob):
    D_real = generate_distances(prob.trajectory,  prob.anchors)
    D_noisy = prob.D_noisy.copy()
    D_noisy[D_noisy > 0] = np.sqrt(D_noisy[D_noisy > 0])

    fig_hist, axs_hist = plt.subplots(prob.anchors.shape[0], sharey=True, sharex=True)
    fig, axs = plt.subplots(prob.anchors.shape[0], sharey=True, sharex=True)
    fig.set_size_inches(10, 10)
    fig_hist.set_size_inches(10, 10)
    
    var_biased = []
    var_unbiased = []
    for i in range(prob.anchors.shape[0]):
        indices = np.where(prob.D_noisy[i, :] > 0)[0]
        axs[i].plot(prob.times[indices],D_real[i, indices], ls="-", label="real", color="k")
        axs[i].scatter(prob.times[indices],D_noisy[i, indices], s=1.0, label="noisy", color="C1")

        error = D_noisy[i, indices] - D_real[i, indices]
        var_biased.append(np.mean(error**2))
        var_unbiased.append(np.mean((error - np.mean(error))**2))
        axs_hist[i].hist(error)
        axs_hist[i].set_ylabel(f"a{i}")

        axs[i].set_ylabel(f"a{i}")
        axs[i].grid()
        
    print("biased variance:", np.round(var_biased, 2))
    print("unbiased variance:", np.round(var_unbiased, 2))
        
    fig.tight_layout()
    axs[0].set_title("distances over time")
    axs[i].set_xlabel("time [s]")
    axs[i].legend()

    fig_hist.tight_layout()
    axs_hist[0].set_title("error histograms")
    axs_hist[i].set_xlabel("distance error [m]")
    
plot_distribution(prob)

In [None]:
prob = Problem.init_from_dataset(dataset, 
                                 time_range=None, 
                                 use_anchors=use_anchors,
                                 use_gt=False, 
                                 calibrate=True)
plot_distribution(prob)

D_real = generate_distances(prob.trajectory, prob.anchors)

In [None]:
# dataset statistics
for i in range(1, 17):
    dataset = f"trial{i}"
    print(f"\n\n dataset {dataset}")
    prob = Problem.init_from_dataset(dataset, traj_only=False, use_anchors=use_anchors)
    print(prob.E)
    print(prob.anchors.shape)
    ttot =  prob.times[-1]-prob.times[0]
    print("total time and N:", ttot, prob.N)
    print("average frequency:", prob.N / ttot)
    print("frequency per anchor:")
    for a in range(prob.W.shape[0]):
        print(f"a{a} : {np.sum(prob.W[a, :]) / ttot:.2f}", end=", ")

### Study velocity profiles

In [None]:
# compare distances of reflected vs. original
from poly_certificate.problem import generate_distances, Problem

time_range = None #[10, 20]
datasets = [f"trial{i}" for i in range(1, 2)]
fig, axs = plt.subplots(len(datasets), sharex=True, squeeze=False)
axs=axs.flatten()
fig.set_size_inches(10, 2*(len(datasets)))

for i, dataset in enumerate(datasets):
    prob = Problem.init_from_dataset(dataset, 
                                     time_range=time_range, 
                                     use_anchors=use_anchors,
                                     use_gt=False)
    velocities = (prob.trajectory[1:] - prob.trajectory[:-1]) / (prob.times[1:] - prob.times[:-1])[:, None]
    velocities_mag = np.linalg.norm(velocities, axis=1)
    axs[i].plot(prob.times[1:],velocities_mag)
    axs[i].axvline(prob.times[0] + 8, color="C1")
    axs[i].axvline(prob.times[-1] - 8, color="C1")
    axs[i].set_ylabel(f"v [m/s], {dataset}")
    axs[i].set_ylim(0, 2)
    axs[i].grid()
fig.tight_layout()
axs[0].set_title("velocities over time")
axs[i].set_xlabel("time [s]")

if len(datasets) == 16:
    from poly_certificate.utils.plotting_tools import savefig
    savefig(fig, "_plots/velocity-profiles.pdf")

## Finding local minima

### compute reflections

In [None]:
from poly_certificate.problem import Problem, generate_distances

prob = Problem.init_from_dataset("trial1", sigma_dist_est=0.1, sigma_acc_est=0.2, 
                                 time_range=[10, 20], 
                                 #use_anchors=None)
                                 #use_gt=True)
                                 use_anchors=use_anchors) # top
                                 #use_anchors=[0, 1, 3, 4]) #diagonal)

traj = prob.trajectory.copy() 

def plot_xy_xz(points, axs=None, **kwargs):
    if axs is None:
        fig, axs = plt.subplots(1 ,2)
    axs[0].plot(points[:, 0], points[:, 1], **kwargs)
    axs[0].axis("equal")
    axs[0].set_xlabel("x")
    axs[0].set_ylabel("y")
    axs[1].plot(points[:, 0], points[:, 2], **kwargs)
    axs[1].axis("equal")
    axs[1].set_xlabel("x")
    axs[1].set_ylabel("z")
    return axs

axs = plot_xy_xz(traj)

axs[0].scatter(prob.anchors[:, 0], prob.anchors[:, 1])
axs[1].scatter(prob.anchors[:, 0], prob.anchors[:, 2])

#traj_refl = reflect_points(prob.trajectory.T, prob.anchors.T, verbose=True).T
traj_refl = prob.reflected_init(fraction=1.0)
plot_xy_xz(traj_refl, axs=axs)

traj_part = prob.reflected_init(fraction=0.5)
plot_xy_xz(traj_part, axs=axs, ls=":")

In [None]:
from poly_certificate.utils.plotting_tools import plot_3d_curves
plot_3d_curves({"original": traj, "reflected":traj_refl}, anchors=prob.anchors)

### evaluation

In [None]:
from poly_certificate.gauss_newton import gauss_newton
from poly_certificate.certificate import get_certificate, get_rho_and_lambdas
regularization = "constant-velocity"
#time_range = [10, 20]
time_range = None
#use_anchors = range(6) # all anchors
use_anchors = [0, 2, 3, 5] # all top anchors
#use_anchors = [0, 1, 3, 4] # diagonal anchors

#sigma_dist_est = 0.05
#calibrate = False

sigma_dist_est = 0.05
calibrate = True

REG = 1e-3
use_gt = False
sigma_acc_est = 1e-3
extra_noise = 0 # only split between no convergence / convergence

In [None]:
dataset = "trial1"

np.random.seed(0)
prob = Problem.init_from_dataset(
    dataset,
    time_range=time_range,
    sigma_dist_est=sigma_dist_est,
    sigma_acc_est=sigma_acc_est,
    use_gt=use_gt,
    use_anchors=use_anchors,
    calibrate=calibrate
)
traj = prob.trajectory

# yield local:
#theta_0 = prob.random_init(regularization) 
#theta_0 = prob.reflected_init(regularization, fraction=0.5)
#theta_0 = prob.reflected_init(regularization, fraction=1.0)
#prob.D_noisy += np.random.normal(loc=0, scale=extra_noise, size=prob.D_noisy.shape)

# yields global:
theta_0 = prob.gt_init(regularization, extra_noise=0)#0.2)

theta_hat, stats = gauss_newton(
    theta_0, prob, regularization=regularization, max_iter=100, verbose=False,
    tol=1e-5, progressbar=True
)
if stats["success"] is False:
    print("Gauss-Newton did not converge.")
else:
    print("Gauss-Newton converged in", stats["n it"])
print("rho, lambdas")
rho, lambdas = get_rho_and_lambdas(
    theta_hat, prob, regularization=regularization
)

traj_hat = theta_hat[:, : prob.d]
error = np.linalg.norm(traj_hat - prob.trajectory) / np.sqrt(
    prob.N * prob.d
)
print("RMSE:", error)
cert = get_certificate(
    prob,
    rho,
    lambdas,
    regularization=regularization,
    reg=REG,
    verbose=False
)
print("Certificate:", cert)

traj_0 = theta_0[:, :prob.d]
plot_3d_curves({"original": traj, "estimate":traj_hat}, anchors=prob.anchors)
#plot_3d_curves({"original": traj, "estimate":traj_hat}, anchors=anchors)

In [None]:
grad = stats["grad"].reshape((prob.N, -1))
mask = np.abs(grad) > 1e-6
print("maximum gradient", np.max(grad[mask]))
I, J = np.where(mask > 0)
fig, ax = plt.subplots()
ax.pcolorfast(mask)

traj_problem = traj_hat[I, :]

plot_3d_curves({"original": traj, "estimate":traj_hat, "problem":traj_problem}, anchors=prob.anchors)

In [None]:
# comparison of two terms
from poly_certificate.sdp_setup import get_f
from poly_certificate.sdp_setup import get_prob_matrices
Q, A_0, A_list, R = get_prob_matrices(prob, regularization)
f_hat = get_f(theta_hat)
print(f_hat.T @ Q @ f_hat)
print(f_hat.T @ R @ f_hat)

In [None]:
from poly_certificate.sdp_setup import get_prob_matrices, get_H
from poly_certificate.certificate import get_block_matrices
from poly_certificate.decompositions import get_matrix_from_blocks

if prob.N < 500:
    print("Comparing certificate with eigenvalues of H, size:", prob.N)
    Q, A_0, A_list, R = get_prob_matrices(prob, regularization)
    H = get_H(Q + R, A_0, A_list, rho, lambdas)
    H_ii, H_ij, h_i_list, h = get_block_matrices(prob, rho, lambdas)

    H_blocks = get_matrix_from_blocks(H_ii, H_ij, h_i_list, h)

    np.testing.assert_allclose(H_blocks, H.toarray())
    print("Eigenvalues of H:", np.linalg.eigvalsh(H_blocks))

    np.seterr(divide='ignore')

    fig, axs = plt.subplots(1, 4)
    fig.set_size_inches(8, 3)
    fig.tight_layout()
    axs[0].matshow(np.log10(np.abs(Q[:20, :20].toarray())))
    axs[1].matshow(np.log10(np.abs(R[:20, :20].toarray())))
    axs[2].matshow(np.log10(np.abs(H[:20, :20].toarray())))
    axs[2].set_title("sum of Q and R")
    axs[3].matshow(np.log10(np.abs(H_blocks[:20, :20])))
    axs[3].set_title("from blocks")

    fig, axs = plt.subplots(1, 4)
    fig.set_size_inches(8, 3)
    fig.tight_layout()
    axs[0].matshow(np.log10(np.abs(Q[-20:, -20:].toarray())))
    axs[1].matshow(np.log10(np.abs(R[-20:, -20:].toarray())))
    axs[2].matshow(np.log10(np.abs(H[-20:, -20:].toarray())))
    axs[2].set_title("sum of Q and R")
    axs[3].matshow(np.log10(np.abs(H_blocks[-20:, -20:])))
    axs[3].set_title("from blocks")
else:
    print("N is too big for eigenvalue computation")

### calibration

determine the sigma to minimize rmse

In [None]:
from poly_certificate.certificate import get_certificate

def calibrate_sigma(use_anchors, calibrate, sigma_dist_est, sigma_acc_est_list, use_gt=False):
    time_range = None #[10, 50]
    dataset = "trial1"
    regularization = "constant-velocity"
    
    #sigma_acc_est_list = [0.0001, 0.0003, 0.0005, 0.001, 0.003] #np.arange(0.001, 0.01, step=0.002)

    rmses = np.full(len(sigma_acc_est_list), np.nan)
    costs = np.full(len(sigma_acc_est_list), np.nan)
    rhos = np.full(len(sigma_acc_est_list), np.nan)
    certs = np.full(len(sigma_acc_est_list), np.nan)

    for i, sigma_acc_est in enumerate(sigma_acc_est_list):
        print(f" sigma = {sigma_acc_est:.1e}")
        prob = Problem.init_from_dataset(
            dataset,
            time_range=time_range,
            sigma_dist_est=sigma_dist_est,
            sigma_acc_est=sigma_acc_est,
            use_gt=use_gt,
            use_anchors=use_anchors, 
            calibrate=calibrate
        )
        traj = prob.trajectory
        if use_gt:
            prob.add_noise(sigma_dist_est)

        theta_0 = prob.gt_init(regularization)

        theta_hat, stats = gauss_newton(
            theta_0, prob, regularization=regularization, 
        )

        if theta_hat is None:
            print("unsuccessful")
            continue

        costs[i] = stats["cost"]

        rho, lambdas = get_rho_and_lambdas(
            theta_hat, prob, regularization=regularization
        )
        rhos[i] = -rho
        cert = get_certificate(prob, rho, lambdas, regularization=regularization, reg=1e-3)
        if cert > -np.inf:
            certs[i] = cert

        traj_hat = theta_hat[:, : prob.d]
        error = np.linalg.norm(traj_hat - prob.trajectory) / np.sqrt(prob.N)
        rmses[i] = error
        
    return rmses, costs, rhos, certs

In [None]:
def plot_results(x, rmses, costs, rhos, certs, log=False):
    fig, ax = plt.subplots()
    ax.plot(x, certs)
    ax.set_ylabel("certificate")
    ax.set_xlabel("sigma")
    if log:
        ax.set_xscale("log")
    ax.grid()
    
    fig, ax = plt.subplots()
    ax.plot(x, rmses)
    if log:
        ax.set_xscale("log")
    ax.set_ylabel("RMSE")
    ax.set_xlabel("sigma")
    ax.grid()

    fig, ax = plt.subplots()
    ax.plot(x, costs, label="cost")
    ax.plot(x, rhos, ls=":", label="rho")
    if log:
        ax.set_xscale("log")
    ax.set_ylabel("cost")
    ax.set_xlabel("sigma")
    ax.legend()
    ax.grid()
    return fig, ax 
    #return rmses, costs, rhos
    #plot_3d_curves({"original": traj, "estimate":traj_hat}, anchors=anchors)

In [None]:
# fix sigma_dist_est at 5cm. 
sigma_dist_est = 0.05
use_anchors = [0, 2, 3, 5] # top
calibrate = True
sigma_acc_est_list1 = np.logspace(-5, 1, 7) #np.arange(0.001, 0.01, step=0.002)
results1 = calibrate_sigma(use_anchors, calibrate, sigma_dist_est, sigma_acc_est_list1)

sigma_acc_est_list2 = np.arange(1e-3, 6e-3, step=1e-3) #np.arange(0.001, 0.01, step=0.002)
results2 = calibrate_sigma(use_anchors, calibrate, sigma_dist_est, sigma_acc_est_list2)
rmses, costs, rhos, certs = results2
print("best sigma:", sigma_acc_est_list2[np.argmin(rmses)])

In [None]:
plot_results(sigma_acc_est_list1, *results1, log=True)
plot_results(sigma_acc_est_list2, *results2, log=False)

In [None]:
# make sure the same parameters work with simulated data
sigma_dist_est = 0.05
use_anchors = [0, 2, 3, 5] # top
calibrate = False
results3 = calibrate_sigma(use_anchors, calibrate, sigma_dist_est, sigma_acc_est_list2, use_gt=True)

In [None]:
plot_results(sigma_acc_est_list2, *results3, log=False)

### WIP: study the  covariance estimates

In [None]:
def plot_with_cov(theta, Cov, scaling=1.0):
    #from poly_certificate.gauss_newton import get_hess
    #H = get_hess(theta_hat, prob, regularization=regularization)
    from poly_certificate.utils.plotting_tools import plot_covariance
    
    covariances = []
    dim = theta_hat.shape[1]
    for n in range(prob.N):
        covariances.append(Cov[n*dim:n*dim + 2,n*dim:n*dim+2].toarray())

    fig, ax = plt.subplots()
    ax.plot(prob.trajectory[:, 0], prob.trajectory[:, 1], color="k")
    ax.scatter(prob.anchors[:, 0], prob.anchors[:, 1], color="k", marker="x")
    ax.plot(theta_hat[:, 0], theta_hat[:, 1])
    for i in range(prob.N)[::10]:
        plot_covariance(Cov=covariances[i], centre=theta_hat[i, :2], scaling=scaling, ax=ax)
    ax.axis("equal")
    return fig, ax

In [None]:
# fix sigma_dist_est at 5cm. 
use_gt = False
time_range = [10, 50]
sigma_dist_est = 0.05

dataset = "trial1"
use_anchors = [0, 2, 3, 5] # top
sigma_acc_est = 1e-3

print(f" sigma = {sigma_acc_est:.1e}")
prob = Problem.init_from_dataset(
    dataset,
    time_range=time_range,
    sigma_dist_est=sigma_dist_est,
    sigma_acc_est=sigma_acc_est,
    use_gt=use_gt,
    use_anchors=use_anchors,
    calibrate=True
)

for regularization in ["constant-velocity", "zero-velocity"]:
    theta_0 = prob.gt_init(regularization)
    theta_hat, stats = gauss_newton(
        theta_0, prob, regularization=regularization, max_iter=200
    )

    if theta_hat is None:
        print("unsuccessful")
    print("converged")

    rho, lambdas = get_rho_and_lambdas(
        theta_hat, prob, regularization=regularization
    )

    traj = prob.trajectory
    traj_hat = theta_hat[:, : prob.d]
    error = np.linalg.norm(traj_hat - prob.trajectory) / np.sqrt(prob.N)

    cert = get_certificate(prob, rho, lambdas, regularization)
    print(regularization, "rmse", error, "cert", cert)
    
    fig1 = plot_3d_curves({"original": traj, "estimate":traj_hat}, anchors=prob.anchors)
    
    print("calculate inverse...")
    import scipy.sparse.linalg as spl
    Cov = spl.inv(stats["cov"])
    print("done")
    
    fig2, ax = plot_with_cov(traj_hat, Cov, scaling=1e1)
    fig2.set_size_inches(10, 10)

In [None]:
from poly_certificate.gaussian_process import get_posterior_covariances
from poly_certificate.problem import Problem
from poly_certificate.utils.plotting_tools import add_scalebar, savefig, plot_covariance
# for covariance plotting:
# find good scaling for covariance matrix

fname = "_results/real_top_calib/results.pkl"
results = pd.read_pickle(fname)

row = results.iloc[0]
prob = Problem.init_from_dataset(row.dataset, traj_only=True)
cov_inv = row["inverse covariance"]
covs = get_posterior_covariances(prob, row["inverse covariance"], regularization=row.regularization)

scaling = 10

fig, ax = plt.subplots()
fig.set_size_inches(figsize, figsize)
ax.plot(row["ground truth"][:, 0], row["ground truth"][:, 1], color="k", ls="-")
ax.scatter(row.estimate[:, 0], row.estimate[:, 1], color="C1", s=1.0)
select_ns = range(row.estimate.shape[0])[::10]
print(f"plotting {len(select_ns)} covariance estimates")
for n in select_ns:
    plot_covariance(covs[n], row.estimate[n, :2], scaling, ax,
                    facecolor="C1", alpha=0.4)
                
ax.grid("on", which="both")
ax.xaxis.set_ticks_position('none')
ax.yaxis.set_ticks_position('none')
ax.set_xticklabels([])
ax.set_yticklabels([])
ax.set_xlim(-4.5, 4.5)
ax.set_ylim(-4.5, 4.5)
ax.set_xlabel(f"RMSE: {row.RMSE:.2f} m", fontsize=12)