Install and import the required packages.

In [None]:
!pip install -r requirements.txt

import os
if not os.path.exists('results'):
    os.mkdir('results')

if not os.path.exists('results/MODEL'):
    os.chdir('results')
    os.mkdir('MODEL')
    os.chdir('..')
    
if not os.path.exists('results/best'):
    os.chdir('results')
    os.mkdir('best')
    os.chdir('..')
    
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import pickle
from bayes_opt import BayesianOptimization 
import json

import functions

# A Deep-Learning Framework to Predict the Dynamics of a Human-Driven Vehicle based on the Road Geometry

## Parameters definition

In this section, the main parameters are defined (for information, read the reference paper):
- Track dataframe
- Driver/Vehicle dataframe
- List of features (primary, secondary and road features)
- The parameters stride, dR, pR, tF
- The batch size used during the Neural Network training
- The number of laps covered by the training, validation and test sets

In [None]:
# Name of the track dataset
track_name = "calabogie"

# Name of the driver/vehicle dataset
dataset_name = "Calabogie-Sportcar-Antonio-20_12_03.pkl"

# Load the track dataset
trackMap = functions.load_track(track_name)

# Load the driver/vehicle dataset
df = functions.load_dataset(dataset_name)

# All of the driver/vehicle/road features needed in the prediction framework
driver_vehicle_road_features = ['chassis_accelerations.longitudinal',
           'chassis_accelerations.lateral',
           'chassis_velocities.yaw',
           'chassis_accelerations.vertical',
           'chassis_velocities.pitch',
           'chassis_velocities.roll', 
           'driver_demands.brake',
           'driver_demands.steering', 
           'driver_demands.steering_velocity',
           'driver_demands.throttle', 
           'transmission.gear',
           'relativeDistance',
           'relativeYaw',
           'chassis_velocities.lateral',
           'chassis_velocities.longitudinal',
           'chassis_velocities.vertical',
           'Angle_banking', 
           'FirstDerZ', 
           'SecondDerZ', 
           'curvature2d', 
           'roadWidth']

driver_vehicle_road_features.append('Session-Lap') # Session-Lap is needed to manage the dataset

# All of the driver/vehicle features
driver_vehicle_features = ['chassis_accelerations.longitudinal',
           'chassis_accelerations.lateral',
            'chassis_velocities.yaw',
           'chassis_accelerations.vertical',
           'chassis_velocities.pitch',
           'chassis_velocities.roll', 
           'driver_demands.brake',
           'driver_demands.steering', 
           'driver_demands.steering_velocity',
           'driver_demands.throttle', 
           'transmission.gear',
           'relativeDistance',
           'relativeYaw',
           'chassis_velocities.lateral',
           'chassis_velocities.longitudinal',
           'chassis_velocities.vertical']

# The driver/vehicle features to predict (primary and secondary)
prediction_features = ['chassis_accelerations.longitudinal',
       'chassis_accelerations.lateral',
       'chassis_velocities.yaw',
       'chassis_accelerations.vertical',
       'chassis_velocities.pitch',
       'chassis_velocities.roll',
       'driver_demands.brake',
       'driver_demands.steering', 
       'driver_demands.steering_velocity',
       'driver_demands.throttle',
       'transmission.gear',
       'relativeDistance',
       'relativeYaw',
       'chassis_velocities.lateral',
       'chassis_velocities.longitudinal',
       'chassis_velocities.vertical']

# The primary features (the features of interest in the prediction problem)
primary_features = ['chassis_accelerations.longitudinal',
       'chassis_accelerations.lateral',
       'chassis_velocities.yaw']

# The road geometry features
road_features = ['Angle_banking',
                 'FirstDerZ',
                 'SecondDerZ',
                 'curvature2d',
                 'roadWidth']

# The stride in the windowing process
stride = 1

# The parameter d^R
dR = 150 # in meters

# The parameter p^R
pR = 50

# The parameter t^F (Prediction steps)
tF = 30

# The batch size used during training of the Neural Network
batch_size = 64

# The number of laps constituting the training, validation and test sets
total_laps = df['Session-Lap'].nunique() # total laps in the dataset
n_train = int(total_laps*0.85) # number of training laps
n_test = 1 # number of test laps
n_dev = total_laps-n_train-n_test # number of validation laps

train_dev_test_laps = [n_train, n_dev, n_test]

## Execute some utilities

In this section, some minor utilities are run.

In [None]:
# Reorder lists, so that the primary features are always first, and the order of the features is always the same in the different lists
driver_vehicle_road_features = primary_features + [el for el in driver_vehicle_road_features if (primary_features.count(el)==0)]
prediction_features = [el for el in driver_vehicle_road_features if (prediction_features.count(el)>0)]
road_features = [el for el in driver_vehicle_road_features if (road_features.count(el)>0)]

# Define the indices of the features
driver_vehicle_indices = []
for i in driver_vehicle_features:
    driver_vehicle_indices.append(driver_vehicle_road_features.index(i))

prediction_indices = []
for i in prediction_features:
    prediction_indices.append(driver_vehicle_road_features.index(i))
 
road_features_extended = []
for i in range(pR):
    for j in road_features:
        road_features_extended.append('future_' + j + '_' + str(i))
        
driver_vehicle_road_features.remove("Session-Lap") 
driver_vehicle_road_features = driver_vehicle_road_features + road_features_extended + ["Session-Lap"] # I put "Session-Lap" field at the very end

road_indices = []
for i in road_features_extended:
    road_indices.append(driver_vehicle_road_features.index(i))

In [None]:
# Store config parameters in a dictionary
config_parameters = {}
config_parameters['tF'] = tF
config_parameters['df'] = df
config_parameters['trackMap'] = trackMap
config_parameters['road_features'] = road_features
config_parameters['dR'] = dR
config_parameters['pR'] = pR
config_parameters['driver_vehicle_road_features'] = driver_vehicle_road_features
config_parameters['stride'] = stride
config_parameters['train_dev_test_laps'] = train_dev_test_laps
config_parameters['driver_vehicle_indices'] = driver_vehicle_indices
config_parameters['road_indices'] = road_indices
config_parameters['prediction_indices'] = prediction_indices
config_parameters['batch_size'] = batch_size
config_parameters['primary_features'] = primary_features

## Bayesian optimisation

In this section, Bayesian optimisation is launched. The routine loads the hyperparameters defined at each iteration by the Bayesian optimisation algorithm (read the reference paper), and the previously defined parameters. Then, it builds the input matrices to be fed to the Neural Network, trains the Neural Network and saves the model weights. At the end of Bayesian optimisation, the best hyperparameter combination is saved in a dedicated file.

In [None]:
# Define prefix name of the files (the same must be defined in function functions.train_model)
fileName = 'Data30red'

# Configure   
def train_model_launcher(r, w, tP, u_ed, xi, save_results=False, i=''):
    bayes_opt_score = functions.train_model(r, w, tP, u_ed, xi, save_results=save_results, i=i, config_parameters=config_parameters)
    return bayes_opt_score
    
optimizer = BayesianOptimization(f = train_model_launcher,
                                 pbounds={'r': (0.25, 0.5),
                                          'w': (0.0, 1.0), 
                                          'tP': (15, 40),
                                          'u_ed': (70, 110), 
                                          'xi': (0.3, 0.5)
                                         },
                                  verbose=2)

# Run
optimizer.maximize(init_points=2, n_iter=10)

# Save the optimiser
pickle.dump(optimizer, open('results/best/optimizer_{}.pkl'.format(fileName), 'wb'))

# Save the best hyperparameters in a dedicated file
targets = [e['target'] for e in optimizer.res]
best_index = targets.index(max(targets))
params = optimizer.res[best_index]['params']

params_fname = '{}_results_best_params.json'.format(fileName)
with open(os.path.join('results/best', params_fname), 'w') as f:
     json.dump(params, f, indent=2)

## Cross-validation

In this section, Cross-validation is used to assess the performance of the optimal model found with Bayesian optimisation. A CSV file reporting the scores on the different sets is generated.

In [None]:
# Load the best hyperparameters of the Bayesian optimisation
fileName = 'Data30red'
params_fname = '{}_results_best_params.json'.format(fileName)
with open(os.path.join('results/best', params_fname), 'r') as f:
    params = json.load(f)
    
# Run cross-validation
cv_steps = 10 # steps of Cross-validation
for i in range(0, cv_steps):
    train_model_launcher(r=params['r'],
                    w=params['w'],
                    tP=params['tP'],
                    u_ed=params['u_ed'],
                    xi=params['xi'],
                    save_results=True,
                    i=i)

## Plot predictions with the best model

In this section, the optimal model is applied to a random test set. Some plots are generated to show its prediction capabilities.

In [None]:
# Load best model with a random test set
fileName = 'Data30red'
model, test_dataset, x_test, meanv, stdv = functions.best_model(fileName, config_parameters=config_parameters)

# Predict the behaviour of the model on the test set
X_true = np.array([np.squeeze(i[0][0], axis=0) for i in test_dataset])[:,:,prediction_indices] # I calculate the past only for the values I predict, for the plots
Y_true = np.array([np.squeeze(i[1], axis=0) for i in test_dataset])
Y_pred = model.predict(test_dataset)

In [None]:
# Plot two random prediction horizons
q = len(primary_features)
random_plots = np.random.randint(Y_pred.shape[0], size=2)
names = ["Primary feat " + str(el) for el in range(q)]
means = meanv.head(q).values
standards = stdv.head(q).values

fig, ax = plt.subplots(3,2,figsize=(6,6), )
for index,i in enumerate(random_plots):
    for j in range(q):
        current_ax = ax[j][index]

        X_true2 = X_true[i,:,j]*standards[j]+means[j]
        Y_true2 = Y_true[i,:,j]*standards[j]+means[j]
        Y_pred2 = Y_pred[i,:,j]*standards[j]+means[j]
        current_ax.plot(np.arange(X_true.shape[1]) ,X_true2, linewidth=2, linestyle='--', color='tab:green')
        current_ax.plot(X_true.shape[1] + np.arange(Y_true.shape[1]) ,Y_true2, linewidth=2, color='tab:blue')
        current_ax.plot(X_true.shape[1] + np.arange(Y_pred.shape[1]) ,Y_pred2, linewidth=2, color='tab:orange')
                
        if (index==0 and j==q-1):
            current_ax.legend(labels=["Past","Truth","Prediction"], fontsize=10)
        if j==q-1:
            current_ax.set_xlabel("Time steps", fontsize=10)
            
        current_ax.set_ylabel(names[j], fontsize=10)
        current_ax.grid(b=True, which='major', color='k', linestyle='-', alpha=0.1)
        plt.xticks(fontsize=8)
        plt.yticks(fontsize=8)
    fig.subplots_adjust(hspace=0.20, wspace=0.45)

In [None]:
# Plot the long-term prediction for the whole test set
q = len(primary_features)
random_plots = np.random.randint(Y_pred.shape[0], size=2)
names = ["Primary feat " + str(el) for el in range(q)]
means = meanv.head(q).values
standards = stdv.head(q).values

fig, ax = plt.subplots(3,1,figsize=(6,7))
for j in range(q):
    current_ax = ax[j]

    X_true2 = X_true[:,-1,j]*standards[j]+means[j]
    Y_true2 = Y_true[:,-1,j]*standards[j]+means[j]
    Y_pred2 = Y_pred[:,-1,j]*standards[j]+means[j]

    current_ax.plot(X_true.shape[1] + np.arange(Y_true2.shape[0]) ,Y_true2, linewidth=1.4)
    current_ax.plot(X_true.shape[1] + np.arange(Y_pred2.shape[0]) ,Y_pred2, linewidth=1.4)
   
    if j==q-1:
        current_ax.legend(labels=["Truth","Prediction"], fontsize=10)
        current_ax.set_xlabel("Time steps", fontsize=10)
        
    current_ax.set_ylabel(names[j], fontsize=10)
    current_ax.grid(b=True, which='major', color='k', linestyle='-', alpha=0.1)
    plt.xticks(fontsize=8)
    plt.yticks(fontsize=8)
fig.subplots_adjust(hspace=0.25, wspace=0.38)