In [1]:
import numpy as np
import time
# from filterpy.common import Q_discrete_white_noise
from bokeh.io import show, output_notebook
from bokeh.plotting import figure 
from bokeh.layouts import row, column
output_notebook()

#### The predict and update steps of the Kalman Filter are defined in the class below

In [20]:
class kalman_filter:
    def __init__(self, dim_x, dim_z):
        self.x = np.zeros([dim_x, 1])
        self.F = np.zeros([dim_x, dim_x])
        self.H = np.zeros([1, dim_x])
        self.P = np.zeros([dim_x, dim_x])
        self.R = np.zeros([dim_z])
        self.Q = np.zeros([dim_x, dim_x])
        self.B = np.zeros([dim_x, 1])
        self.u = np.zeros([1])
        
    def predict(self):
        # x_k = A_k * x_k-1 + B_k * u_k-1 (predicted next state)
        # P_k = A_k * P_k-1 * (A_k)^T + Q_k (predicted next state covariance)
        self.x = np.matmul(self.A, self.x) + (self.B * self.u)
        self.P = np.matmul(self.A, np.matmul(self.P, self.A.transpose())) + self.Q
        
    def update(self, z):
        # K_k = P_k-1 * (H_k)^T * (H_k * P_k-1 * (H_k)^T + R_k)^-1 (Kalman Gain)
        # x_k = x_k-1 + K_k * (z_k - H_k * x_k-1) (updated next state)
        # P_k = P_k-1 * (I - K_k * H_k) (updated next state covariance)
        self.K = (np.matmul(self.P, self.H.transpose()) / 
             (np.matmul(self.H, np.matmul(self.P, self.H.transpose())) + self.R))
#         self.K = np.array([[0], [0]])
#         self.K = np.array([[1], [1]])
        self.x = self.x + self.K * (z - np.matmul(self.H, self.x))
        self.P = np.matmul((np.identity(2) - np.matmul(self.K, self.H)), self.P)

#### Change the initial values in the cell below to simulate different conditions

In [21]:
# Initial values
car1_initpos = 4  # starting position of the car in the front
car2_initpos = 0  # starting position of the following cars (keep at 0)
car1_vel = 8  # velocity of the car in the front
car2_initvel = 8  # starting velocity of the following cars
car_size = 2  # size of the car 
dist_sep = 2 * car_size  # desired ideal separation between the car in the front and the cars following
dist_size = 50  # length of the track in arb. units
car1_acc = 0  # accelaration of the car in the front (keep at 0)
car2_initacc = 0 # starting accelaration of the following cars
car2_maxacc = 1.5  # maximum possible accelaration for the following cars
car2_maxbrake = 10  # maximum possible decelaration for the following cars
noise_type = 'gauss'  # type of measurement noise (can use 'gauss', 'power', and 'uniform', if anything else is entered measurement noise will be 0
measurement_noise = 2  # measurement noise std. dev.
process_noise = 2  # process noise std. dev.
dt = 0.01  # time step 
timesteps = int((dist_size - car1_initpos) / car1_vel / dt)  # total number of timesteps 

#### Initializing the arrays that will be used for plotting

In [22]:
# Initializing arrays for plotting
timepoints = np.zeros(timesteps)
poscar1 = np.zeros(timesteps)
poscar1_est = np.zeros(timesteps)
poscar1_meas = np.zeros(timesteps)
poscar2 = np.zeros(timesteps)
car_sep = np.zeros(timesteps)
car2_acc = np.zeros(timesteps)
poscar2_kf = np.zeros(timesteps)
car_sep_kf = np.zeros(timesteps)
car2_acc_kf = np.zeros(timesteps)
noiserange = np.zeros(timesteps)
pdf_proc = np.zeros(timesteps)
pdf_meas = np.zeros(timesteps)
kf_gain = np.zeros([timesteps, 2])
def InitPlotArrays():    
    timepoints.fill(np.nan)
    poscar1.fill(np.nan)
    poscar1_est.fill(np.nan)
    poscar1_meas.fill(np.nan)
    poscar2.fill(np.nan)
    car_sep.fill(np.nan)
    car2_acc.fill(np.nan)
    poscar2_kf.fill(np.nan)
    car_sep_kf.fill(np.nan)
    car2_acc_kf.fill(np.nan)
    noiserange.fill(np.nan)
    pdf_proc.fill(np.nan)
    pdf_meas.fill(np.nan)

#### The car following function is defined below. The car in the front is moving with constant velocity. The car following has a proportional controller.

In [23]:
def RunCarFollowing(PN, MN):
    
    # Setup initial values
    InitPlotArrays()
    pos1real = car1_initpos
    pos2 = car2_initpos
    pos2_kf = car2_initpos
    vel2 = car2_initvel
    vel2_kf = car2_initvel
    acc2 = car2_initacc
    acc2_kf = car2_initacc
    tt = 0
    realt = 0
    resp_flag = False
    resp_kf_flag = False
    
    process_noise_level = PN
    measurement_noise_level = MN
    
    # Update noise distributions
#     if noise_type == 'gauss':
#         nlim = max(process_noise_level, measurement_noise_level)
#         global noiserange, pdf_proc, pdf_meas
#         noiserange = np.linspace(-4 * nlim, 4 * nlim, timesteps)
#         # PDFs of process and measurement noise
#         if process_noise_level == 0:
#             pdf_proc = np.zeros(np.shape(noiserange))
#         else:
#             pdf_proc = (1 / (process_noise_level * np.sqrt(2 * np.pi)) 
#                         * np.exp(-noiserange**2 / (2 * process_noise_level**2)))
#         if measurement_noise_level == 0:
#             pdf_meas = np.zeros(np.shape(noiserange))
#         else:
#             pdf_meas = (1 / (measurement_noise_level * np.sqrt(2 * np.pi)) 
#                         * np.exp(-noiserange**2 / (2 * measurement_noise_level**2)))
    
    # Initialize the Kalman Filter
    # initialize Kalman filter object
    f = kalman_filter(dim_x=2, dim_z=1)
    # Initial state
    f.x = np.array([[car1_initpos], [car1_vel]])
    # State transition matrix
    f.A = np.array([[1.0, dt],  
                    [0.0, 1.0]])
    # Control vector
    f.u = np.array([[acc2_kf]])
    # Control transition matrix
    f.B = np.array([[0.5 * dt**2], [dt]])
    # Measurement function
    f.H = np.array([[1.0, 0.0]]) 
    # Apriori state covariance
    f.P = np.array([[1000.0, 0.0],  
                    [0.0, 1000.0]])
    # Measurement noise covariance
    f.R = np.array([[measurement_noise_level**2]])  
    # Process noise
    f.Q = np.matmul(f.B, f.B.transpose()) * process_noise_level**2
#     if process_noise_level == 0:
#         f.Q = np.zeros([2, 2])
#     else:
#         f.Q = Q_discrete_white_noise(dim=2, dt=dt, var=process_noise_level**2)

    
    # Loop for updating positions
    for tt in range(timesteps):

        # Real position of leading car
        pos1real = pos1real + car1_vel * dt

        # Measurement of position of leading car with noise
        if noise_type == 'gauss':
            z = np.random.normal(pos1real, measurement_noise_level)
        elif noise_type == 'power':
            z = (pos1real + 2.0 * measurement_noise_level 
                 * np.sign((np.random.uniform() - 0.5)) * np.random.power(5))
        elif noise_type == 'uniform':
            z = pos1real + 4.0 * measurement_noise_level * (np.random.uniform() - 0.5)
        else:
            z = pos1real

        # Kalman Filter predict and update steps
        
        # predict next state using Kalman filter state propagation equation
        # x_k = F_k * x_k-1 + B_k * u_k-1 + w_k (predicted next state)
        # P_k = F_k * P_k-1 * (F_k)^T + Q_k (predicted next state covariance)
        f.predict()  
        
        # find updated state using Kalman gain and new measurement (z)
        # K_k = P_k-1 * (H_k)^T * (H_k * P_k-1 * (H_k)^T + R_k)^-1 (Kalman Gain)
        # x_k = x_k-1 + K_k * (z_k - H_k * x_k-1) (updated next state)
        # P_k = P_k-1 * (I - K_k * H_k) (updated next state covariance)
        f.update(z)  

        # Following car update without Kalman Filter
        dx = z - pos2
        err = (dx - dist_sep) / dist_sep
        if err > 1.0: err = 1.0
        # Acceleration model
        if err > 0:
            acc2 = err * car2_maxacc
        else:
            acc2 = err * car2_maxbrake
        vel2 = vel2 + acc2 * dt
        # crash protection 
        if vel2 < 0:
            vel2 = 0
            acc2 = 0
        # update position
        pos2 = pos2 + vel2 * dt + 0.5 * acc2 * dt**2
        
        # Calculate how much time to get to ideal separation
        if tt > int(timesteps / 5) and resp_flag == False:
            if err < 0.03 and err > -0.03:
                global resp
                resp = tt * dt
                resp_flag = True

        # Following car update with Kalman Filter
        dx_kf = f.x[0, 0] - pos2_kf
        err_kf = (dx_kf - dist_sep) / dist_sep
        if err_kf > 1.0: err_kf = 1.0
        # Acceleration model
        if err_kf > 0:
            acc2_kf = err_kf * car2_maxacc
        else:
            acc2_kf = err_kf * car2_maxbrake
        vel2_kf = vel2_kf + acc2_kf * dt
        # crash protection
        if vel2_kf < 0:
            vel2_kf = 0
            acc2_kf = 0
        # update position
        pos2_kf = pos2_kf + vel2_kf * dt + 0.5 * acc2_kf * dt**2
        
        # Calculate how much time to get to ideal separation
        if tt > int(timesteps / 5) and resp_kf_flag == False:
            if err_kf < 0.03 and err > -0.03:
                global resp_kf
                resp_kf = tt * dt
                resp_kf_flag = True

        # Update arrays for plotting
        global timepoints, poscar1, poscar2, poscar1_est, car_sep
        timepoints[tt] = tt * dt
        poscar1[tt] = pos1real
        poscar1_est[tt] = f.x[0, 0]
        poscar1_meas[tt] = z
        poscar2[tt] = pos2
        poscar2_kf[tt] = pos2_kf
        car_sep[tt] = pos1real - pos2
        car_sep_kf[tt] = pos1real - pos2_kf  
        car2_acc[tt] = acc2
        car2_acc_kf[tt] = acc2_kf
        kf_gain[tt, :] = f.K.T

#### Run the car following function

In [24]:
RunCarFollowing(process_noise, measurement_noise)

#### Plot the results

In [25]:
# Setup plots

# Figure 1: Noise distributions
# p5 = figure(plot_width = 400, plot_height = 200, title='Noise Distributions')
# r11 = p5.line(noiserange, pdf_proc, line_color='seagreen', 
#               line_width=4, legend='Process')
# r12 = p5.line(noiserange, pdf_meas, line_color='greenyellow', 
#               line_width=4, legend='Measured')

p5 = figure(plot_width = 400, plot_height = 300, y_range=[0, 1], 
            title='Kalman Gain')
r11 = p5.line(timepoints, kf_gain[:, 0], line_color='seagreen', 
              line_width=4, legend='Position', alpha=0.6)
r12 = p5.line(timepoints, kf_gain[:, 1], line_color='greenyellow', 
              line_width=4, legend='Velocity', alpha=0.6)
p5.toolbar.logo = None
p5.toolbar_location = None
p5.legend.location = 'top_right'

# Figure 2: Car positions
p1 = figure(plot_width=400, plot_height=300, x_axis_label='time', y_axis_label='position',
            title='Car Positions', x_range=[0, timesteps * dt], y_range=[0, dist_size])
r1 = p1.line(timepoints, poscar1, color='darkslategray', line_width=4)
r2 = p1.line(timepoints, poscar1_meas, color='royalblue', 
             legend='Measured', line_width=2, alpha=0.6)
r3 = p1.line(timepoints, poscar1_est, color='skyblue', 
             legend='KF estimated', line_width=2, alpha=0.6)
r4 = p1.line(timepoints, poscar2, color='gold', line_width=4, alpha=0.6)
r5 = p1.line(timepoints, poscar2_kf, color='crimson', line_width=4, alpha=0.6)
p1.legend.location = 'top_left'
p1.toolbar.logo = None
p1.toolbar_location = None

# Figure 3: Car separations
p2 = figure(plot_width=400, plot_height=300, x_axis_label='time', 
            y_axis_label='separation', title='Car Separations', 
            x_range=[0, timesteps * dt], y_range=[0, 4 * dist_sep])
r6 = p2.line(timepoints, car_sep, color='gold', line_width=4, alpha=0.6)
r7 = p2.line(timepoints, car_sep_kf, color='crimson', line_width=4, alpha=0.6)
p2.toolbar.logo = None
p2.toolbar_location = None

# Figure 4: Car accelarations
p3 = figure(plot_width=400, plot_height=300, x_axis_label='time', 
            y_axis_label='acceleration', title='Following Car Acceleration', 
            x_range=[0, timesteps * dt], y_range=[-car2_maxbrake, car2_maxacc])
r8 = p3.line(timepoints, car2_acc, color='gold', line_width=4, alpha=0.6)
r9 = p3.line(timepoints, car2_acc_kf, color='crimson', line_width=4, alpha=0.6)
p3.toolbar.logo = None
p3.toolbar_location = None

show(row(column(p5, p1), column(p2, p3)))

#### Energy and response time trade-off

In [26]:
# Define arrays for measurement and process noise
arr_len = 100
measurement_noise = np.linspace(0.1, 1, arr_len)
process_noise = np.linspace(0.1, 0.1, arr_len) 

# Initialize energy and time response arrays
car2_acc_int = np.zeros(arr_len)
car2_acc_kf_int = np.zeros(arr_len)
car2_resp = np.zeros(arr_len)
car2_kf_resp = np.zeros(arr_len)

# Run the "RunCarFollowing" in the loop to simulate all noise values
for n in range(arr_len):
    RunCarFollowing(process_noise[n], measurement_noise[n])
    global car2_acc_int, car2_acc_kf_int
    car2_acc_int[n] = np.sum(car2_acc**2)
    car2_acc_kf_int[n] = np.sum(car2_acc_kf**2)
    car2_resp[n] = resp
    car2_kf_resp[n] = resp_kf

# Calculate average across all noise values
print('Average energy (without KF)', np.mean(car2_acc_int), '[arb units]')
print('Average energy (with KF)', np.mean(car2_acc_kf_int), '[arb units]')
print('Average response time (without KF)', np.mean(car2_resp), '[s]')
print('Average response time (with KF)', np.mean(car2_kf_resp), '[s]')

# Plot results
# Energy trade-off 
p6 = figure(plot_width=400, plot_height=400, x_axis_label='Noise Index', 
            y_axis_label='energy (arb. units)', title='Energy trade off')
p6.line(range(arr_len), car2_acc_int, line_color='gold', 
        line_width=2, legend='without KF')
p6.line(range(arr_len), car2_acc_kf_int, line_color='crimson', 
        line_width=2, legend='with KF')
p6.legend.location = 'top_left'

# Response time trade-off
p7 = figure(plot_width=400, plot_height=400, x_axis_label='Noise Index', 
            y_axis_label='response (s)', title='Response time trade off')
p7.line(range(arr_len), car2_resp, line_color='gold', 
        line_width=2, legend='without KF')
p7.line(range(arr_len), car2_kf_resp, line_color='crimson', 
        line_width=2, legend='with KF')
p7.legend.location = 'top_left'
show(row(p6, p7))

Average energy (without KF) 226.1263395030959 [arb units]
Average energy (with KF) 14.307319599056516 [arb units]
Average response time (without KF) 1.2182 [s]
Average response time (with KF) 1.6420000000000001 [s]
