In [1]:
import numpy as np
import pandas as pd
import scipy.linalg

import matplotlib.pyplot as plt
from matplotlib.patches import Ellipse
import matplotlib.transforms as transforms

import sys
print(sys.version)
sys.path.append('../')

from models.powertrain.bounded_powertrain import Bounded_powertrain
from models.kinematic.ideal_diff_drive import Ideal_diff_drive
from models.kinematic.ICR_based import *
from models.kinematic.Perturbed_unicycle import *
from models.kinematic.enhanced_kinematic import *

from pylgmath import Transformation
from pysteam.trajectory import Time
from pysteam.trajectory.const_vel import Interface as TrajectoryInterface
from pysteam.problem import OptimizationProblem, StaticNoiseModel, L2LossFunc, WeightedLeastSquareCostTerm
from pysteam.solver import GaussNewtonSolver, DoglegGaussNewtonSolver
from pysteam.evaluable import vspace as vspaceev, se3 as se3ev
from pysteam.evaluable.se3 import SE3StateVar
from pysteam.evaluable.vspace import VSpaceStateVar

import scipy.interpolate
from scipy.interpolate import UnivariateSpline
from scipy.interpolate import make_smoothing_spline

3.10.6 (main, Nov 14 2022, 16:10:14) [GCC 11.3.0]


In [2]:
#import training dataset

dataset = pd.read_pickle('/home/dominic/repos/norlab_WMRD/data/marmotte/grand_salon_20_01_a/torch_dataset_all.pkl')

print(dataset)
print(len(dataset))

cmd_left_str_list = []
cmd_right_str_list = []
for i in range(0, 40):
    str_cmd_left_i = 'cmd_left_' + str(i)
    str_cmd_right_i = 'cmd_right_' + str(i)
    cmd_left_str_list.append(str_cmd_left_i)
    cmd_right_str_list.append(str_cmd_right_i)
cmd_left_vels_array = dataset[cmd_left_str_list].to_numpy()
cmd_right_vels_array = dataset[cmd_right_str_list].to_numpy()

encoder_left_str_list = []
encoder_right_str_list = []
for i in range(0, 40):
    str_encoder_left_i = 'left_wheel_vel_' + str(i)
    str_encoder_right_i = 'right_wheel_vel_' + str(i)
    encoder_left_str_list.append(str_encoder_left_i)
    encoder_right_str_list.append(str_encoder_right_i)
encoder_left_vels_array = dataset[encoder_left_str_list].to_numpy()
encoder_right_vels_array = dataset[encoder_right_str_list].to_numpy()

icp_x_str_list = []
icp_y_str_list = []
icp_z_str_list = []
icp_roll_str_list = []
icp_pitch_str_list = []
icp_yaw_str_list = []
for i in range(0, 40):
    str_icp_x_i = 'icp_x_' + str(i)
    str_icp_y_i = 'icp_y_' + str(i)
    str_icp_z_i = 'icp_z_' + str(i)
    str_icp_roll_i = 'icp_roll_' + str(i)
    str_icp_pitch_i = 'icp_pitch_' + str(i)
    str_icp_yaw_i = 'icp_yaw_' + str(i)
    icp_x_str_list.append(str_icp_x_i)
    icp_y_str_list.append(str_icp_y_i)
    icp_z_str_list.append(str_icp_z_i)
    icp_roll_str_list.append(str_icp_roll_i)
    icp_pitch_str_list.append(str_icp_pitch_i)
    icp_yaw_str_list.append(str_icp_yaw_i)
icp_x_array = dataset[icp_x_str_list].to_numpy()
icp_y_array = dataset[icp_y_str_list].to_numpy()
icp_z_array = dataset[icp_z_str_list].to_numpy()
icp_roll_array = dataset[icp_roll_str_list].to_numpy()
icp_pitch_array = dataset[icp_pitch_str_list].to_numpy()
icp_yaw_array = dataset[icp_yaw_str_list].to_numpy()

# import input space info
input_space_dataframe = pd.read_pickle('/home/dominic/repos/norlab_WMRD/data/marmotte/input_space/input_space_data.pkl')
ideal_diff_drive = Ideal_diff_drive(input_space_dataframe['calibrated_radius [m]'].to_numpy()[0], input_space_dataframe['calibrated baseline [m]'].to_numpy()[0], 0.05)

# import model error
ideal_diff_drive_errors_dataframe = pd.read_pickle('../data/marmotte/eval_results/grand_salon_b/ideal_diff_drive_full_eval_metrics.pkl')
enhanced_kinematic_errors_dataframe = pd.read_pickle('../data/marmotte/eval_results/grand_salon_b/enhanced_kinematic_full_eval_metrics.pkl')
print(ideal_diff_drive_errors_dataframe)
print(len(ideal_diff_drive_errors_dataframe))

# import powertrain model
time_constant = 1.5
dt = 0.05
bounded_powertrain = Bounded_powertrain(-11.5, -11.5, time_constant, dt)

     init_icp_x  init_icp_y  init_icp_z  init_icp_roll  init_icp_pitch  \
0           0.0         0.0         0.0            0.0             0.0   
1           0.0         0.0         0.0            0.0             0.0   
2           0.0         0.0         0.0            0.0             0.0   
3           0.0         0.0         0.0            0.0             0.0   
4           0.0         0.0         0.0            0.0             0.0   
..          ...         ...         ...            ...             ...   
814         0.0         0.0         0.0            0.0             0.0   
815         0.0         0.0         0.0            0.0             0.0   
816         0.0         0.0         0.0            0.0             0.0   
817         0.0         0.0         0.0            0.0             0.0   
818         0.0         0.0         0.0            0.0             0.0   

     init_icp_yaw  calib_step  cmd_left_0  cmd_right_0  cmd_left_1  ...  \
0             0.0         0.0    0.0

TypeError: Bounded_powertrain.__init__() takes 6 positional arguments but 7 were given

In [None]:
# extract icp poses through specific window

ideal_diff_drive = Ideal_diff_drive(input_space_dataframe['calibrated_radius [m]'].to_numpy()[0], input_space_dataframe['calibrated baseline [m]'].to_numpy()[0], 0.05)
input_space_dataframe = pd.read_pickle('/home/dominic/repos/norlab_WMRD/data/marmotte/input_space/input_space_data.pkl')
r = input_space_dataframe['calibrated_radius [m]'].to_numpy()[0]
baseline = input_space_dataframe['calibrated baseline [m]'].to_numpy()[0]
dt = 0.05

window_id = 512
# window_id = 303
# window_id = 231
print(cmd_left_vels_array.shape[1])
window_size = cmd_left_vels_array.shape[1]

predicted_states_idd = np.zeros((window_size, 6))
predicted_states_idd[0, 0] = dataset['init_icp_x'][window_id]
predicted_states_idd[0, 1] = dataset['init_icp_y'][window_id]
predicted_states_idd[0, 2] = dataset['init_icp_z'][window_id]
predicted_states_idd[0, 3] = dataset['init_icp_roll'][window_id]
predicted_states_idd[0, 4] = dataset['init_icp_pitch'][window_id]
predicted_states_idd[0, 5] = dataset['init_icp_yaw'][window_id]

icp_states = np.zeros((window_size, 6))
icp_states[0, 0] = dataset['init_icp_x'][window_id]
icp_states[0, 1] = dataset['init_icp_y'][window_id]
icp_states[0, 2] = dataset['init_icp_z'][window_id]
icp_states[0, 3] = dataset['init_icp_roll'][window_id]
icp_states[0, 4] = dataset['init_icp_pitch'][window_id]
icp_states[0, 5] = dataset['init_icp_yaw'][window_id]

for i in range(1, window_size):
    unbounded_input_array = np.array([cmd_left_vels_array[window_id, i-1], cmd_right_vels_array[window_id, i-1]])
    predicted_states_idd[i, :] = ideal_diff_drive.predict(predicted_states_idd[i-1, :], unbounded_input_array)
    icp_states[i,:] = np.array([icp_x_array[window_id, i], icp_y_array[window_id, i], icp_z_array[window_id, i], 
                                icp_roll_array[window_id, i], icp_pitch_array[window_id, i], icp_yaw_array[window_id, i]])
    # print([cmd_left_vels_array[window_id, i-1], cmd_right_vels_array[window_id, i-1]])
    
# print(icp_states)

In [None]:
# define spline

time_vector = np.linspace(0, 2, 40)

icp_x_spline = make_smoothing_spline(time_vector, icp_states[:, 0])
icp_y_spline = make_smoothing_spline(time_vector, icp_states[:, 1])
icp_z_spline = make_smoothing_spline(time_vector, icp_states[:, 2])
icp_roll_spline = make_smoothing_spline(time_vector, icp_states[:, 3])
icp_pitch_spline = make_smoothing_spline(time_vector, icp_states[:, 4])
icp_yaw_spline = make_smoothing_spline(time_vector, icp_states[:, 5])

# icp_x_spline = UnivariateSpline(time_vector, icp_states[:, 0])
# icp_y_spline = UnivariateSpline(time_vector, icp_states[:, 1])

print(icp_x_spline)

In [None]:
# plot uncertainty propagation

plt.figure(figsize=(10,10))
ax = plt.gca()
fig = plt.gcf()

alpha_plot = 1.0

int_states = ax.scatter(icp_states[:, 0], icp_states[:, 1], 
                  c = time_vector, 
                  cmap = 'hot', 
                  alpha = alpha_plot, 
                  lw=0, 
                  s=50,
                  label='end_state',
                 rasterized=True)

smoothed_spline = ax.plot(icp_x_spline(time_vector), icp_y_spline(time_vector), 
                  c = 'C8', 
                  alpha = alpha_plot, 
                  lw=1, 
                  label='end_state',
                 rasterized=True)

prediction_idd = ax.scatter(predicted_states_idd[1:, 0], predicted_states_idd[1:, 1], 
                  c = 'C3', 
                  cmap = 'hot', 
                  alpha = alpha_plot, 
                  lw=0, 
                  s=50,
                  label='end_state',
                 rasterized=True)

init_state = ax.scatter(dataset['init_icp_x'][window_id], dataset['init_icp_y'][window_id], 
                  c = 'C0', 
                  cmap = 'hot', 
                  alpha = alpha_plot, 
                  lw=0, 
                  s=50,
                  label='init_state',
                 rasterized=True)

end_state = ax.scatter(dataset['gt_icp_x'][window_id], dataset['gt_icp_y'][window_id], 
                  c = 'C1', 
                  cmap = 'hot', 
                  alpha = alpha_plot, 
                  lw=0, 
                  s=50,
                  label='end_state',
                 rasterized=True)


ax.set_xlim(-1,1)
ax.set_ylim(-1,1)

In [None]:
# compute prediction error
prediction_residual = 0

dt = 0.05

for i in range(1, window_size):
    timestamp = i * dt
    residual_x = icp_x_spline(timestamp) -  predicted_states_idd[i, 0]
    residual_y = icp_y_spline(timestamp) -  predicted_states_idd[i, 1]
    residual_yaw = icp_yaw_spline(timestamp) -  predicted_states_idd[i, 5]
    
    prediction_residual += residual_x + residual_y + residual_yaw