# **Neural steering controller for autonomous parking**

This tutorial implements the physics-driven neural steering controller of the paper titled:

*Fast Planning and Tracking of Complex Autonomous Parking Maneuvers With Optimal Control and Pseudo-Neural Networks*

(available at https://ieeexplore.ieee.org/abstract/document/10309845)

## Initialization

### Import packages

In [1]:
import sys
import os
import torch
import pandas as pd
import scipy as sp

print('Current working directory: ',os.getcwd())
sys.path.append(os.path.join(os.getcwd(),'..'))
from neu4mes import *

# import a library for plots
import matplotlib as mpl
mpl.rcParams.update(mpl.rcParamsDefault)
#mpl.rcParams['text.usetex'] = True
import matplotlib.pyplot as plt
plt.close('all')
SMALL_SIZE = 14
MEDIUM_SIZE = 22
BIGGER_SIZE = 26
plt.rc('font', size=SMALL_SIZE)          # controls default text sizes
plt.rc('axes', titlesize=MEDIUM_SIZE)    # fontsize of the axes title
plt.rc('axes', labelsize=SMALL_SIZE)     # fontsize of the x and y labels
plt.rc('xtick', labelsize=SMALL_SIZE)    # fontsize of the tick labels
plt.rc('ytick', labelsize=SMALL_SIZE)    # fontsize of the tick labels
plt.rc('legend', fontsize=SMALL_SIZE)    # legend fontsize
plt.rc('figure', titlesize=BIGGER_SIZE)  # fontsize of the figure title
plt.rc('grid', linestyle="--", color='grey')

# enable zooming on the plots
%matplotlib inline
import mpld3
mpld3.enable_notebook()


Current working directory:  /Users/mattiapiccinini/Documents/Research/Neu4Mes/tutorials
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>---- neu4mes ----<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<


### Configurations, known constant parameters and initial guesses

In [2]:
# Path to the data folder
data_folder = os.path.join(os.getcwd(),'datasets','control_steer_car_parking')

# Import the file with the vehicle data
vehicle_data_csv = os.path.join(data_folder,'other_data','vehicle_data.csv')
# Extract the vehicle data
L = pd.read_csv(vehicle_data_csv)['L'][0]  # [m] wheelbase

# Import the file with the steering maps
steer_maps_file = os.path.join(data_folder,'other_data','steer_map.txt')
steer_map_load  = np.loadtxt(steer_maps_file, delimiter='\t', skiprows=1)
delta_w_avg_map = torch.tensor(np.float64(np.deg2rad(steer_map_load[:,0])))  # [rad] average steering angle at the front wheels
delta_sw_map    = torch.tensor(np.float64(np.deg2rad(steer_map_load[:,1])))  # [rad] steering wheel angle

# Initial guesses
# Load the initial guesses for the curvature diagram approximation, computed in Matlab with a 2nd order optimizer
initial_guesses_curv_diagr = sp.io.loadmat(os.path.join(data_folder,'other_data','initial_guesses_curv_diagram','fit_curv_diagr_5th_ord_poly.mat'))
curv_diagram_params_guess = initial_guesses_curv_diagr['optim_params_poly'][0].astype('float64')

## NN model

### Custom parametric functions

In [3]:
# Curvature diagram
def curvat_diagram(curv,h_1,h_2,h_3):
  L = 2.6   # [m] wheelbase
  return torch.atan(h_1*curv + h_2*torch.pow(curv,3) + h_3*torch.pow(curv,5) + curv*L)

# Steering maps
def steer_map_spline(x):
  # Inputs: 
  # x: average steering angle at the front wheels [rad]
  # x_data: map of average steering angles at the front wheels (delta_w_avg_map) [rad]
  # y_data: map of steering wheel angles (delta_sw_map) [rad]
  # Output:
  # y: steering wheel angle [rad]
  x_data = torch.tensor([-0.6417, -0.6288, -0.6158, -0.6028, -0.5899, -0.5769, -0.5639, -0.5510,
                         -0.5380, -0.5251, -0.5121, -0.4991, -0.4862, -0.4732, -0.4602, -0.4473,
                         -0.4343, -0.4213, -0.4084, -0.3954, -0.3824, -0.3695, -0.3565, -0.3436,
                         -0.3306, -0.3176, -0.3047, -0.2917, -0.2787, -0.2658, -0.2528, -0.2398,
                         -0.2269, -0.2139, -0.2009, -0.1880, -0.1750, -0.1621, -0.1491, -0.1361,
                         -0.1232, -0.1102, -0.0972, -0.0843, -0.0713, -0.0583, -0.0454, -0.0324,
                         -0.0194, -0.0065,  0.0065,  0.0194,  0.0324,  0.0454,  0.0583,  0.0713,
                         0.0843,  0.0972,  0.1102,  0.1232,  0.1361,  0.1491,  0.1621,  0.1750,
                         0.1880,  0.2009,  0.2139,  0.2269,  0.2398,  0.2528,  0.2658,  0.2787,
                         0.2917,  0.3047,  0.3176,  0.3306,  0.3436,  0.3565,  0.3695,  0.3824,
                         0.3954,  0.4084,  0.4213,  0.4343,  0.4473,  0.4602,  0.4732,  0.4862,
                         0.4991,  0.5121,  0.5251,  0.5380,  0.5510,  0.5639,  0.5769,  0.5899,
                         0.6028,  0.6158,  0.6288,  0.6417])
  y_data = torch.tensor([-12.5664, -12.3177, -12.0690, -11.8203, -11.5716, -11.3229, -11.0742,
                         -10.8255, -10.5768, -10.3281, -10.0794,  -9.8307,  -9.5799,  -9.3286,
                          -9.0772,  -8.8258,  -8.5745,  -8.3231,  -8.0718,  -7.8204,  -7.5691,
                          -7.3177,  -7.0663,  -6.8126,  -6.5576,  -6.3026,  -6.0476,  -5.7926,
                          -5.5376,  -5.2826,  -5.0276,  -4.7727,  -4.5177,  -4.2627,  -4.0055,
                          -3.7475,  -3.4895,  -3.2315,  -2.9735,  -2.7155,  -2.4574,  -2.1994,
                          -1.9414,  -1.6834,  -1.4254,  -1.1663,  -0.9071,  -0.6480,  -0.3888,
                          -0.1296,   0.1296,   0.3888,   0.6480,   0.9071,   1.1663,   1.4254,
                           1.6834,   1.9414,   2.1994,   2.4574,   2.7155,   2.9735,   3.2315,
                           3.4895,   3.7475,   4.0055,   4.2627,   4.5177,   4.7727,   5.0276,
                           5.2826,   5.5376,   5.7926,   6.0476,   6.3026,   6.5576,   6.8126,
                           7.0663,   7.3177,   7.5691,   7.8204,   8.0718,   8.3231,   8.5745,
                           8.8258,   9.0772,   9.3286,   9.5799,   9.8307,  10.0794,  10.3281,
                          10.5768,  10.8255,  11.0742,  11.3229,  11.5716,  11.8203,  12.0690,
                          12.3177,  12.5664])

  # Linear interpolation of the steering map:
  # Find the indices of the intervals containing each x
  indices = torch.searchsorted(x_data, x, right=True).clamp(1, len(x_data) - 1)
  
  # Get the values for the intervals
  x1 = x_data[indices - 1]
  x2 = x_data[indices]
  y1 = y_data[indices - 1]
  y2 = y_data[indices]
  
  # Linear interpolation formula
  y = y1 + (y2 - y1) * (x - x1) / (x2 - x1)
  
  # Saturate the output if x is out of bounds
  y = torch.where(x < x_data[0], y_data[0], y)    # Saturate to minimum y_data
  y = torch.where(x > x_data[-1], y_data[-1], y)  # Saturate to maximum y_data
  return y

### Internal architecture

In [4]:
# Neural model inputs and outputs
curv         = Input('curv')          # [1/m] path curvature
steer        = Input('steer')         # [rad] steering wheel angle 
steer_target = Input('steer_target')  # [rad] steering wheel angle --> this is used only as a training target

num_samples_future_curv = 15   # number of samples in the future for the curvature prediction
num_samples_past_steer  = 15   # number of samples in the past for the steering wheel angle prediction

# Trainable parameters:
# Curvature diagram parameters
h_1_guess = Parameter('h_1',values=[[curv_diagram_params_guess[0]]])  # initial guess
h_2_guess = Parameter('h_2',values=[[curv_diagram_params_guess[1]]])  # initial guess
h_3_guess = Parameter('h_3',values=[[curv_diagram_params_guess[2]]])  # initial guess

# Parametric function to learn the curvature diagram 
out_curv_diagr = ParamFun(curvat_diagram,parameters=[h_1_guess,h_2_guess,h_3_guess])(curv.sw([0,num_samples_future_curv]))

# FIR layer to weigh the future predictions of the curvature diagram
out_fir        = Fir(parameter_init=init_negexp, parameter_init_params={'size_index':0, 'first_value':0.1, 'lambda':5})(out_curv_diagr)   

# Parametric function to model the steering map (i.e., the relation between the average steering angle at the front wheels and the steering wheel angle)
out_steer_map  = ParamFun(steer_map_spline)(out_fir)     

# FIR layer to weigh the past steering wheel angles computed by the NN (auto-regressive model)
out_arx        = Fir(parameter_init=init_negexp, parameter_init_params={'size_index':0, 'first_value':1e-3, 'lambda':5})(steer.sw([-num_samples_past_steer,0]))  

# Output of the neural model
out = Output('steering_angle', out_steer_map + out_arx)

### Neu4Mes framework

In [5]:
# Create a neu4mes model
steer_controller_park = Neu4mes(visualizer='Standard',seed=0,workspace=os.path.join(os.getcwd(),'trained_models'))  #visualizer=MPLVisulizer()

# Add the neural model to the neu4mes structure and neuralization of the model
steer_controller_park.addModel('steer_ctrl',[out])
steer_controller_park.addMinimize('steer_error', 
                                  steer_target.next(),  # next means the first value in the "future"
                                  out, 
                                  loss_function='mse')
steer_controller_park.neuralizeModel()

[32m{'Functions': {'FParamFun6': {'code': 'def curvat_diagram(curv,h_1,h_2,h_3):\n'
                                      '  L = 2.6   # [m] wheelbase\n'
                                      '  return torch.atan(h_1*curv + '
                                      'h_2*torch.pow(curv,3) + '
                                      'h_3*torch.pow(curv,5) + curv*L)\n',
                              'n_input': 1,
                              'name': 'curvat_diagram',
                              'parameters': ['h_1', 'h_2', 'h_3']},
               'FParamFun8': {'code': 'def steer_map_spline(x):\n'
                                      '  # Inputs: \n'
                                      '  # x: average steering angle at the '
                                      'front wheels [rad]\n'
                                      '  # x_data: map of average steering '
                                      'angles at the front wheels '
                                      '(delta_w_avg_map) [r

## Training and validation datasets

In [6]:
# Load the training and the validation dataset
data_struct = ['curv',('steer','steer_target')]  # both steer and steer_target are read from the same column of the csv file
data_folder_train = os.path.join(data_folder,'training')
data_folder_valid = os.path.join(data_folder,'validation')
data_folder_test  = os.path.join(data_folder,'test')
steer_controller_park.loadData(name='training_set', source=data_folder_train, format=data_struct, skiplines=1)
steer_controller_park.loadData(name='validation_set', source=data_folder_valid, format=data_struct, skiplines=1)
steer_controller_park.loadData(name='test_set', source=data_folder_test, format=data_struct, skiplines=1)

# check the definition of the windows in the inputs and outputs
#samples_test_set = steer_controller_park.get_samples('training_set', index=100, window=1) 
#print(samples_test_set)

[32mDataset Name:                 training_set[0m
[32mNumber of files:              1[0m
[32mTotal number of samples:      3092[0m
[32mShape of steer:               (3092, 15, 1)[0m
[32mShape of curv:                (3092, 15, 1)[0m
[32mShape of steer_target:        (3092, 1, 1)[0m
[32mDataset Name:                 validation_set[0m
[32mNumber of files:              1[0m
[32mTotal number of samples:      832[0m
[32mShape of steer:               (832, 15, 1)[0m
[32mShape of curv:                (832, 15, 1)[0m
[32mShape of steer_target:        (832, 1, 1)[0m
[32mDataset Name:                 test_set[0m
[32mNumber of files:              1[0m
[32mTotal number of samples:      691[0m
[32mShape of steer:               (691, 15, 1)[0m
[32mShape of curv:                (691, 15, 1)[0m
[32mShape of steer_target:        (691, 1, 1)[0m


## Training

### Custom training functions

In [7]:
# Early stopping function
def early_stopping_custom(train_losses, val_losses, params):
  # The function returns True if the training should stop
  patience = params['patience']
  val_losses_steer = val_losses['steer_error']
  if len(val_losses_steer) > patience:
    # Index of the minimum validation loss
    min_val_loss_index = val_losses_steer.index(min(val_losses_steer))
    # Check if the patience has been exceeded
    if min_val_loss_index < len(val_losses_steer) - patience:
      return True
  return False

### Train the NN in open-loop (no auto-regressive term)

In [8]:
num_epochs = 4000
batch_size = 100
learn_rate = 1e-3  # learning rate
early_stop_patience = 100
training_pars_open_loop = {'num_of_epochs':num_epochs, 
                           'val_batch_size':batch_size, 
                           'train_batch_size':batch_size, 
                           'lr':learn_rate}

steer_controller_park.trainModel(train_dataset='training_set', validation_dataset='validation_set', 
                                 training_params=training_pars_open_loop, optimizer='Adam', shuffle_data=True,
                                 early_stopping=early_stopping_custom, early_stopping_params={'patience':early_stop_patience})  

[32mmodels:                       ['steer_ctrl'][0m
[32mtrain dataset:                training_set[0m
[32mtrain {batch size, samples}:  {100, 3092}[0m
[32mval dataset:                  validation_set[0m
[32mval {batch size, samples}:    {100, 832}[0m
[32mnum of epochs:                30[0m
[32mshuffle data:                 True[0m
[32mearly stopping:               early_stopping_custom[0m
[32mearly stopping params:        {'patience': 100}[0m
[32mminimize:                     {'steer_error': {'A': 'SamplePart17',
                                               'B': 'steering_angle',
                                               'loss': 'mse'}}[0m
[32moptimizer:                    Adam[0m
[32moptimizer defaults:           {'lr': 0.001}[0m
[32moptimizer params:             [{'params': 'PFir7p'},
                               {'params': 'PFir9p'},
                               {'params': 'h_1'},
                               {'params': 'h_2'},
                 

({'steer_error': [7.250860691070557,
   0.49330461025238037,
   0.26531797647476196,
   0.18019069731235504,
   0.12069321423768997,
   0.08212213218212128,
   0.05870248004794121,
   0.045916732400655746,
   0.03972342237830162,
   0.036538299173116684,
   0.03485159948468208,
   0.03416163846850395,
   0.033828821033239365,
   0.0336938314139843,
   0.033548545092344284,
   0.03296806290745735,
   0.03232007846236229,
   0.03204939141869545,
   0.03255182132124901,
   0.03226262331008911,
   0.03147595375776291,
   0.031592775136232376,
   0.031481534242630005,
   0.03146570920944214,
   0.03032674826681614,
   0.031012576073408127,
   0.03019380383193493,
   0.02912471629679203,
   0.02969312109053135,
   0.029162900522351265]},
 {'steer_error': [0.6663155555725098,
   0.5206828117370605,
   0.3964029550552368,
   0.2847863435745239,
   0.20540021359920502,
   0.15774770081043243,
   0.1311737447977066,
   0.11842428147792816,
   0.10943460464477539,
   0.106635183095932,
   0.10636

In [9]:
# Print the trained NN parameters
steer_controller_park.neuralizeModel()

[32m{'Functions': {'FParamFun6': {'code': 'def curvat_diagram(curv,h_1,h_2,h_3):\n'
                                      '  L = 2.6   # [m] wheelbase\n'
                                      '  return torch.atan(h_1*curv + '
                                      'h_2*torch.pow(curv,3) + '
                                      'h_3*torch.pow(curv,5) + curv*L)\n',
                              'n_input': 1,
                              'name': 'curvat_diagram',
                              'parameters': ['h_1', 'h_2', 'h_3']},
               'FParamFun8': {'code': 'def steer_map_spline(x):\n'
                                      '  # Inputs: \n'
                                      '  # x: average steering angle at the '
                                      'front wheels [rad]\n'
                                      '  # x_data: map of average steering '
                                      'angles at the front wheels '
                                      '(delta_w_avg_map) [r

### Re-train the NN in auto-regressive mode (closed-loop training)

In [10]:
num_epochs = 1500
batch_size = 100
learn_rate = 1e-3  # learning rate
early_stop_patience = 60
training_pars_closed_loop = {'num_of_epochs':num_epochs, 
                             'val_batch_size':batch_size, 
                             'train_batch_size':batch_size, 
                             'lr':learn_rate}

predict_samples = 50  # number of samples after which the internal state is reset
steps_skip = 1  # number of samples to skip when going to a new window. The default is 1, meaning the size of a batch. If steps_skip = predict_samples, then the whole window size is skipped

# NOTE: by default, the next batch skips a full length of a batch
# NOTE: shuffle = True shuffles only the order of the batches, so it's ok with the autoregression

steer_controller_park.trainModel(train_dataset='training_set', validation_dataset='validation_set', 
                                 training_params=training_pars_closed_loop, optimizer='Adam', shuffle_data=True,
                                 early_stopping=early_stopping_custom, early_stopping_params={'patience':early_stop_patience},
                                 prediction_samples=predict_samples, step=steps_skip, closed_loop={'steer':'steering_angle'})  

[33mRecurrent train: closing the loop between the the input ports steer and the output ports steering_angle for 50 samples[0m
[32mmodels:                       ['steer_ctrl'][0m
[32mtrain dataset:                training_set[0m
[32mtrain {batch size, samples}:  {100, 3092}[0m
[32mval dataset:                  validation_set[0m
[32mval {batch size, samples}:    {100, 832}[0m
[32mnum of epochs:                30[0m
[32mshuffle data:                 True[0m
[32mearly stopping:               early_stopping_custom[0m
[32mearly stopping params:        {'patience': 50}[0m
[32mminimize:                     {'steer_error': {'A': 'SamplePart17',
                                               'B': 'steering_angle',
                                               'loss': 'mse'}}[0m
[32mprediction samples:           50[0m
[32mstep:                         1[0m
[32mclosed loop:                  {'steer': 'steering_angle'}[0m
[32mconnect:                      {}[0m
[32mo

({'steer_error': [0.11423499137163162,
   0.0670536682009697,
   0.060909923166036606,
   0.05894617363810539,
   0.05702662467956543,
   0.055458296090364456,
   0.05536168813705444,
   0.06354054808616638,
   0.07584198564291,
   0.05363806337118149,
   0.04552225396037102,
   0.04425302520394325,
   0.04408230632543564,
   0.04752145707607269,
   0.05864229425787926,
   0.057137858122587204,
   0.041318316012620926,
   0.035896413028240204,
   0.03472783416509628,
   0.034774299710989,
   0.03717236593365669,
   0.04404274746775627,
   0.04859016463160515,
   0.03888050094246864,
   0.030252359807491302,
   0.027593575417995453,
   0.02694084867835045,
   0.027508767321705818,
   0.03017466329038143,
   0.035587526857852936]},
 {'steer_error': [0.20416124165058136,
   0.16526280343532562,
   0.14962820708751678,
   0.1491066962480545,
   0.1423526108264923,
   0.13430635631084442,
   0.12264928221702576,
   0.1108960509300232,
   0.12002100050449371,
   0.11935408413410187,
   0.112

### Print the parameters of the trained NN

In [11]:
# Print the trained NN parameters
steer_controller_park.neuralizeModel()

[32m{'Functions': {'FParamFun6': {'code': 'def curvat_diagram(curv,h_1,h_2,h_3):\n'
                                      '  L = 2.6   # [m] wheelbase\n'
                                      '  return torch.atan(h_1*curv + '
                                      'h_2*torch.pow(curv,3) + '
                                      'h_3*torch.pow(curv,5) + curv*L)\n',
                              'n_input': 1,
                              'name': 'curvat_diagram',
                              'parameters': ['h_1', 'h_2', 'h_3']},
               'FParamFun8': {'code': 'def steer_map_spline(x):\n'
                                      '  # Inputs: \n'
                                      '  # x: average steering angle at the '
                                      'front wheels [rad]\n'
                                      '  # x_data: map of average steering '
                                      'angles at the front wheels '
                                      '(delta_w_avg_map) [r

## Test on a new dataset

In [12]:
# Test on a new dataset
samples_test_set = steer_controller_park.get_samples('validation_set', index=0, window=50) 
steer_controller_park.resetStates()  # reset the internal state
out_nn_test_set  = steer_controller_park(samples_test_set)

# Test with custom data
#steer_controller_park({'curv':[0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9,1.0],'steer':[0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9,1.0]})

[33mDifferent number of samples between inputs [MAX steer_target = 50; MIN curv = 36][0m


## Export the trained NN

In [13]:
# Export the model
steer_controller_park.neuralizeModel()
steer_controller_park.exportJSON()

flag_load_trained_model = False
if flag_load_trained_model:
  # Reload the trained model:
  # Load the json file with the model
  json_folder = os.path.join(os.getcwd(),'tutorials','trained_models','neu4mes_2024_10_07_17_50')
  json_file = os.path.join(json_folder,'model.json')
  import json
  # Open and read the JSON file
  with open(json_file, 'r') as file:
      model_trained_json = json.load(file)

  steer_controller_park.model_def = model_trained_json
  # steer_controller_park.trainModel(train_dataset='training_set', validation_dataset='validation_set', 
  #                                  training_params=training_pars_closed_loop, optimizer='Adam', shuffle_data=True,
  #                                  prediction_samples=predict_samples, step=steps_skip,
  #                                  early_stopping=early_stopping_custom, early_stopping_params={'exit_tol':1-3})  

[32m{'Functions': {'FParamFun6': {'code': 'def curvat_diagram(curv,h_1,h_2,h_3):\n'
                                      '  L = 2.6   # [m] wheelbase\n'
                                      '  return torch.atan(h_1*curv + '
                                      'h_2*torch.pow(curv,3) + '
                                      'h_3*torch.pow(curv,5) + curv*L)\n',
                              'n_input': 1,
                              'name': 'curvat_diagram',
                              'parameters': ['h_1', 'h_2', 'h_3']},
               'FParamFun8': {'code': 'def steer_map_spline(x):\n'
                                      '  # Inputs: \n'
                                      '  # x: average steering angle at the '
                                      'front wheels [rad]\n'
                                      '  # x_data: map of average steering '
                                      'angles at the front wheels '
                                      '(delta_w_avg_map) [r