In [None]:
# Python Script: Gaussian Process Regression for Learning Dynamics

import numpy as np
import matplotlib.pyplot as plt
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF, ConstantKernel as C
from tqdm import trange
import scipy.io

# Load simulation data
mat_file_path = 'vehicle_dynamics_data.mat'
mat_data = scipy.io.loadmat(mat_file_path)
results = mat_data['results'][0, 0]  # Extract the outer structure

# Terrain keys
terrains = ["Snow", "Mud", "SteepSlope"]

# Initialize storage for training data
data_X, data_Y = [], []

for terrain in terrains:
    # Extract terrain-specific data
    terrain_data = results[terrain][0, 0]
    x = terrain_data['x'][:, 0]    # 200x1 -> flatten to 1D
    y = terrain_data['y'][:, 0]
    theta = terrain_data['theta'][:, 0]
    v = terrain_data['v'][:, 0]
    delta = terrain_data['delta'][0, :]  # 1x200 -> flatten to 1D
    a = terrain_data['a'][0, :]

    # Calculate dynamics errors (ground truth - nominal)
    dx = np.diff(x)
    dy = np.diff(y)
    dv = np.diff(v)

    # Feature matrix: [v, delta, a]
    inputs = np.vstack((v[:-1], delta[:-1], a[:-1])).T
    data_X.append(inputs)

    # Targets: [dx, dy, dv]
    targets = np.vstack((dx, dy, dv)).T
    data_Y.append(targets)

# Combine data from all terrains
data_X = np.vstack(data_X)
data_Y = np.vstack(data_Y)

# Check shapes for debugging
print(f"Final data_X shape: {data_X.shape}")
print(f"Final data_Y shape: {data_Y.shape}")


# Train Gaussian Process Regressors (one for each output)
kernel = C(1.0, (1e-3, 1e3)) * RBF(10, (1e-2, 1e2))
gpr_models = []

for i in trange(data_Y.shape[1]):
    gpr = GaussianProcessRegressor(kernel=kernel, n_restarts_optimizer=10, alpha=1e-2)
    gpr.fit(data_X, data_Y[:, i])
    gpr_models.append(gpr)

# Validation and Visualization
for i, gpr in enumerate(gpr_models):
    # Predict on training data for simplicity (or use a test split)
    y_pred, sigma = gpr.predict(data_X, return_std=True)

    # Plot predictions vs actual
    plt.figure(figsize=(8, 4))
    plt.plot(data_Y[:, i], label="True", alpha=0.7)
    plt.plot(y_pred, label="Predicted", linestyle="--")
    plt.fill_between(
        np.arange(len(y_pred)), y_pred - 1.96 * sigma, y_pred + 1.96 * sigma,
        alpha=0.2, label="Confidence Interval"
    )
    plt.title(f"Output Dimension {i + 1}: Prediction vs Actual")
    plt.xlabel("Sample Index")
    plt.ylabel("Dynamics Error")
    plt.legend()
    plt.grid()
    plt.show()

# Save models and evaluation metrics
import joblib

for i, gpr in enumerate(gpr_models):
    joblib.dump(gpr, f"gpr_model_output_{i}.joblib")

print("GPR training and validation complete. Models saved.")


In [None]:
import jax.numpy as np
import numpy as onp
from jax import vmap, jit, vjp, random
from jax.scipy.linalg import cholesky, solve_triangular

from jax import config
config.update("jax_enable_x64", True)

from pyDOE import lhs
from functools import partial
from tqdm import trange
from scipy.optimize import minimize
import matplotlib.pyplot as plt
from matplotlib import rc

onp.random.seed(1234)
# Neural network \(\phi(x)\)
def neural_net_phi(nn_params, x):
    W1 = (np.array([nn_params[0], nn_params[1]]).reshape(-1,1)).T
    b1 = np.array([nn_params[2], nn_params[3]])
    W2 = np.array([nn_params[4], nn_params[5]]).reshape(-1,1)
    b2 = np.array(nn_params[6])
    h = np.tanh(np.dot(x, W1) + b1)  # Hidden layer
    return np.dot(h, W2) + b2  # Output layer
# A wrapper to call SciPy's L-BFGS-B optimizer
def minimize_lbfgs(objective, x0, bnds = None):
    result = minimize(objective, x0, jac=True,
                      method='L-BFGS-B', bounds = bnds,
                      callback=None)
    return result.x, result.fun
# A vectorized RBF kernel function
def RBF(x1, x2, params):
    output_scale = params[0]
    lengthscales = params[1:]
    diffs = np.expand_dims(x1 / lengthscales, 1) - \
            np.expand_dims(x2 / lengthscales, 0)
    r2 = np.sum(diffs**2, axis=2)
    return output_scale * np.exp(-0.5 * r2)

mu = lambda x: np.cos(4.0*np.pi*x)
def RBF_with_phi(nn_params, x1, x2, theta):
    phi_x1 = vmap(lambda x: neural_net_phi(nn_params, x))(x1)
    phi_x2 = vmap(lambda x: neural_net_phi(nn_params, x))(x2)
    return RBF(phi_x1, phi_x2, theta)  # RBF kernel on transformed inputs
# A minimal Gaussian process class
class GPRegression:
    # Initialize the class
    def __init__(self, kernel_fn = RBF_with_phi):
        self.kernel = kernel_fn

    def random_init_GP(self, rng_key, dim):
        # GPR Parameters
        logsigma_f = np.log(50.0*random.uniform(rng_key, (1,)))
        loglength  = np.log(random.uniform(rng_key, (dim,)) + 1e-8)
        logsigma_n = np.array([-4.0]) + random.normal(rng_key, (1,))
        # Neural Network Parameters
        input_dim=1 
        hidden_dim=2 # ONE HIDDEN LAYER WITH 2 NEURONS
        output_dim=1
        k1, k2 = random.split(rng_key)
        W1 = random.normal(k1, (input_dim, hidden_dim)) * 0.1
        W1_1 = np.array(W1[0,0]).reshape(-1)  
        W1_2 = np.array(W1[0,1]).reshape(-1) 
        b1 = random.normal(k1, (hidden_dim,)) * 0.1
        b1_1 = b1[0].reshape(-1)
        b1_2 = b1[1].reshape(-1)
        W2 = random.normal(k2, (hidden_dim, output_dim)) * 0.1
        W2_1 = W2[0, 0].reshape(-1) 
        W2_2 = W2[1, 0].reshape(-1)
        b2 = random.normal(k2, (output_dim,)) * 0.1
        b2 = b2.reshape(-1)
        hyp = np.concatenate([W1_1,W1_2,b1_1,b1_2,W2_1,W2_2,b2,logsigma_f, loglength, logsigma_n])
        return hyp

    def compute_cholesky(self, params, batch):
        X, _ = batch
        N, D = X.shape
        # Fetch params
        sigma_n = np.exp(params[-1])
        theta = np.exp(params[-3:-1])
        nn_params = params[:7]
        # Compute kernel
        K = self.kernel(nn_params, X, X, theta) + np.eye(N)*(sigma_n + 1e-8)
        L = cholesky(K, lower=True)
        return L

    def likelihood(self, params, batch):
        _, y = batch
        N = y.shape[0]
        # Compute Cholesky
        L = self.compute_cholesky(params, batch)
        # Compute negative log-marginal likelihood
        alpha = solve_triangular(L.T,solve_triangular(L, y, lower=True))
        NLML = 0.5*np.matmul(np.transpose(y),alpha) + \
               np.sum(np.log(np.diag(L))) + 0.5*N*np.log(2.0*np.pi)
        return NLML

    @partial(jit, static_argnums=(0,))
    def likelihood_value_and_grad(self, params, batch):
        fun = lambda params: self.likelihood(params, batch)
        primals, f_vjp = vjp(fun, params)
        grads = f_vjp(np.ones_like(primals))[0]
        return primals, grads

    def train(self, batch, rng_key, num_restarts = 10):
        # Define objective that returns NumPy arrays (to be minimized with SciPy)
        def objective(params):
            value, grads = self.likelihood_value_and_grad(params, batch)
            out = (onp.array(value), onp.array(grads))
            return out
        # Optimize with random restarts
        params = []
        likelihood = []
        X, _ = batch
        dim = X.shape[1]
        rng_key = random.split(rng_key, num_restarts)
        for i in trange(num_restarts):
            init = self.random_init_GP(rng_key[i], dim)
            p, val = minimize_lbfgs(objective, init)
            params.append(p)
            likelihood.append(val)
        params = np.vstack(params)
        likelihood = np.vstack(likelihood)
        #### find the best likelihood (excluding any NaNs) ####
        bestlikelihood = np.nanmin(likelihood)
        idx_best = np.where(likelihood == bestlikelihood)
        idx_best = idx_best[0][0]
        best_params = params[idx_best,:]
        return best_params

    @partial(jit, static_argnums=(0,))
    def predict(self, params, batch, X_star):
        X, y = batch
        # Fetch params
        sigma_n = np.exp(params[-1])
        theta = np.exp(params[-3:-1])
        nn_params = params[:7]
        # Compute kernels
        k_pp = self.kernel(nn_params,X_star, X_star, theta) + np.eye(X_star.shape[0])*(sigma_n + 1e-8)
        k_pX = self.kernel(nn_params,X_star, X, theta)
        L = self.compute_cholesky(params, batch)
        alpha = solve_triangular(L.T,solve_triangular(L, y, lower=True))
        beta  = solve_triangular(L.T,solve_triangular(L, k_pX.T, lower=True))
        # Compute predictive mean, std
        mu = np.matmul(k_pX, alpha)
        cov = k_pp - np.matmul(k_pX, beta)
        std = np.sqrt(np.clip(np.diag(cov), a_min=0.))
        return mu, std

    @partial(jit, static_argnums=(0,))
    def draw_posterior_sample(self, rng_key, params, batch, X_star):
        X, y = batch
        N, D = X.shape
        # Fetch params
        sigma_n = np.exp(params[-1])
        theta = np.exp(params[:-1])
        nn_params = params[:7]
        # Compute kernels
        k_pp = self.kernel(nn_params,X_star, X_star, theta) + np.eye(X_star.shape[0])*(sigma_n + 1e-8)
        k_pX = self.kernel(nn_params,X_star, X, theta)
        L = self.compute_cholesky(params, batch)
        alpha = solve_triangular(L.T,solve_triangular(L, y, lower=True))
        beta  = solve_triangular(L.T,solve_triangular(L, k_pX.T, lower=True))
        # Compute predictive mean
        mu = np.matmul(k_pX, alpha).reshape(-1)
        cov = k_pp - np.matmul(k_pX, beta)
        sample = random.multivariate_normal(rng_key, mu, cov)
        return sample


# Update kernel function in GP model


In [None]:
train_key, noise_key = random.split(random.PRNGKey(1))
for i in trange(data_Y.shape[1]):
    gp_model = GPRegression(kernel_fn=RBF_with_phi)
    X_train_nf = data_X[:,i].reshape(-1,1)
    X_test = data_X[:,i].reshape(-1,1)
    y_train_nf = data_Y[:,i].reshape(-1,1)
    y_true = data_Y[:,i]
    y_test = data_Y[:,i]
    mu_y_nf, sigma_y_nf = y_train_nf.mean(0), y_train_nf.std(0)
    #X_train_nf_norm = (X_train_nf - (-1)) / (1 - (-1))  # Normalize to [0, 1]
    #y_train_nf_norm = (y_train_nf - mu_y_nf) / sigma_y_nf

    # Train the GP model
    opt_params_nf = gp_model.train((X_train_nf, y_train_nf), train_key, num_restarts=10)
    mean_nf, std_nf = gp_model.predict(opt_params_nf, (X_train_nf, y_train_nf), X_test)
    # Test data and exact step function
    #X_test = np.linspace(0, 30, 597)[:, None]
   
    #X_test_norm = (X_test - (-1)) / (1 - (-1))  # Normalize to [0, 1]
    #mean_nf, std_nf = mean_nf * sigma_y_nf + mu_y_nf, std_nf * sigma_y_nf
    mean_nf = mean_nf.flatten()

    # Check accuracy
    error_nf = np.linalg.norm(mean_nf-y_test,2)/np.linalg.norm(y_test,2)
    print("Relative L2 error u_nf: %e" % (error_nf))

    # Draw Posterior Samples
    num_draws = 10
    test_keys = random.split(random.PRNGKey(0), num_draws)

    sample_fn_nf = lambda key: gp_model.draw_posterior_sample(key, opt_params_nf, (X_train_nf, y_train_nf), X_test)
    f_samples_nf = vmap(sample_fn_nf)(test_keys)
    f_samples_nf = f_samples_nf*sigma_y_nf + mu_y_nf


    plt.figure(figsize=(32, 8))

    # Noise-free data
    plt.subplot(1, 2, 1)
    plt.plot(y_true[:], label="True", alpha=0.7)
    plt.plot(y_train_nf[:], 'kx', label="Training data")
    plt.plot(mean_nf[:], 'r--', label="GP Mean")
    #plt.fill_between((mean_nf[:] - 2 * std_nf[:]).flatten(), (mean_nf[:] + 2 * std_nf[:]).flatten(),
    #                 color='orange', alpha=0.5, label="95% confidence interval")
    plt.fill_between(
            np.arange(len(mean_nf)), mean_nf - 1.96 * std_nf, mean_nf + 1.96 * std_nf,
            alpha=0.2, label="Confidence Interval"
        )
    #for i in range(num_draws):
    #     plt.plot(X_test, f_samples_nf[i,:], '-', color='gray', lw = 0.5)
    plt.title("Noise-free data")
    plt.legend()



    plt.show()
    


