# SENSOR MODELS

## IR 3

In [None]:
import numpy as np
from matplotlib.pyplot import subplots, show

In [None]:
# Load data
# Sensor model
filename = 'D:/428/assignment 1/enmt482-assignment1-source_code/partA/calibration.csv'
data = np.loadtxt(filename, delimiter=',', skiprows=1)

In [None]:
# Split into columns
index, time, dist, velocity_command, raw_ir1, raw_ir2, raw_ir3, raw_ir4, \
    sonar1, sonar2 = data.T

In [None]:
from scipy.optimize import curve_fit

# Define non linear sensor model
def model(x, a, b, c, d):
    return a + b/(x + c) + d*x

params_ir3, cov = curve_fit(model, dist, raw_ir3)
zfit_ir3 = model(dist, *params_ir3)
zerror_ir3 = raw_ir3 - zfit_ir3

In [None]:
# Plot the dataset and its best fit line
from matplotlib import pyplot as plt
plt.plot(dist,raw_ir3,'.',label='Data',alpha=0.2)
plt.plot(dist,zfit_ir3,label='Fit')
plt.grid()
plt.xlabel('Range x')
plt.ylabel('Voltage')
plt.legend()
plt.show()

In [None]:
# Error plot
plt.plot(dist,zerror_ir3,'.', alpha=0.2)
plt.grid()
plt.xlabel('Range x')
plt.ylabel('Measurement error v')
plt.show()

In [None]:
# Remove outliers
#ir1: [-0.3 0.3]
#ir3: [-0.18 0.18]
#ir4+2: [-0.4 0.4]
#sonar1: [0 0.5]
#sonar2: [-2.4 0.2]
f_dist_ir3 = []
f_data_ir3 = []
f_dist_ir3 = np.array(f_dist_ir3, dtype=np.float64)
f_data_ir3 = np.array(f_data_ir3, dtype=np.float64)
for i in range(len(zerror_ir3)):
    if zerror_ir3[i] < 0.2 and zerror_ir3[i] > -0.2:
        f_dist_ir3 = np.append(f_dist_ir3, dist[i])
        f_data_ir3 = np.append(f_data_ir3, raw_ir3[i])
        
f_params_ir3, cov = curve_fit(model, f_dist_ir3, f_data_ir3)
f_zfit_ir3 = model(f_dist_ir3, *f_params_ir3)
f_zerror_ir3 = f_data_ir3 - f_zfit_ir3

plt.plot(f_dist_ir3,f_data_ir3,'.',label='Data',alpha=0.2)
plt.plot(f_dist_ir3,f_zfit_ir3,label='Fit')
plt.xlabel('Range x')
plt.ylabel('Measurement z')
plt.grid()
plt.legend()
plt.show()

In [None]:
plt.plot(f_dist_ir3,f_zerror_ir3,'.',alpha=0.2)
plt.xlabel('Range x')
plt.ylabel('Measurement error v')
plt.grid()
plt.show()

In [None]:
plt.hist(f_zerror_ir3, bins=40, density=True)
plt.xlabel('Error v')
plt.ylabel('Histogram')
plt.grid()
plt.show()


In [None]:
#Fit Gaussian to histogram
import scipy.stats as sstat
from scipy.stats import norm
import statistics as stat
#Best fit
(mu, sigma) = norm.fit(f_zerror_ir3)
std = stat.stdev(f_zerror_ir3)
n, bins, patches = plt.hist(f_zerror_ir3, bins=40, density=True)
y = norm.pdf(bins, mu, sigma)
plt.plot(bins, y, color='black',linewidth=2)
plt.grid()
plt.show()


## IR 1

In [None]:
# Fit the best fit line
params_ir1, cov = curve_fit(model, dist, raw_ir1)
zfit_ir1 = model(dist, *params_ir1)
zerror_ir1 = raw_ir1 - zfit_ir1

In [None]:
# Plot the dataset with its best fit line
from matplotlib import pyplot as plt
plt.plot(dist,raw_ir1,'.',label='Data',alpha=0.2)
plt.plot(dist,zfit_ir1,label='Fit')
plt.grid()
plt.xlabel('Range x')
plt.ylabel('Voltage')
plt.legend()
plt.show()

In [None]:
# Plot error
plt.plot(dist,zerror_ir1,'.', alpha=0.2)
plt.grid()
plt.xlabel('Range x')
plt.ylabel('Measurement error v')
plt.show()

In [None]:
# Remove outliers
#ir1: [-0.3 0.3]
#ir3: [-0.18 0.18]
#ir4+2: [-0.4 0.4]
#sonar1: [0 0.5]
#sonar2: [-2.4 0.2]
f_dist_ir1 = []
f_data_ir1 = []
f_dist_ir1 = np.array(f_dist_ir1, dtype=np.float64)
f_data_ir1 = np.array(f_data_ir1, dtype=np.float64)
for i in range(len(zerror_ir1)):
    if zerror_ir1[i] < 0.3 and zerror_ir1[i] > -0.3:
        f_dist_ir1 = np.append(f_dist_ir1, dist[i])
        f_data_ir1 = np.append(f_data_ir1, raw_ir1[i])
        
f_params_ir1, cov = curve_fit(model, f_dist_ir1, f_data_ir1)
f_zfit_ir1 = model(f_dist_ir1, *f_params_ir1)
f_zerror_ir1 = f_data_ir1- f_zfit_ir1

plt.plot(f_dist_ir1,f_data_ir1,'.',label='Data',alpha=0.2)
plt.plot(f_dist_ir1,f_zfit_ir1,label='Fit')
plt.xlabel('Range x')
plt.ylabel('Measurement z')
plt.grid()
plt.legend()
plt.show()

In [None]:
plt.plot(f_dist_ir1,f_zerror_ir1,'.',alpha=0.2)
plt.xlabel('Range x')
plt.ylabel('Measurement error v')
plt.grid()
plt.show()

In [None]:
plt.hist(f_zerror_ir1, bins=40, density=True)
plt.xlabel('Error v')
plt.ylabel('Histogram')
plt.grid()
plt.show()

In [None]:
#Fit Gaussian to histogram
import scipy.stats as sstat
from scipy.stats import norm

#Best fit
(mu, sigma) = norm.fit(f_zerror_ir1)
std = stat.stdev(f_zerror_ir1)
n, bins, patches = plt.hist(f_zerror_ir1, bins=40, density=True)
y = norm.pdf(bins, mu, sigma)
plt.plot(bins, y, color='black',linewidth=2)
plt.grid()
plt.show()

## Sonar 1

In [None]:
# Fit the parametric mmodel
coeff = np.polyfit(dist,sonar1, 9)
fit = np.poly1d(coeff)

from matplotlib import pyplot as plt
plt.plot(dist,sonar1,'.',label='Data',alpha=0.2)
plt.plot(dist,fit(dist),label='Fit')
plt.grid()
plt.xlabel('Range x')
plt.ylabel('Measurement z')
plt.legend()
plt.show()

In [None]:
# Error plot
import statistics as stat
error = sonar1 - fit(dist)
mean = stat.mean(error)
variance = stat.variance(error)

plt.plot(dist,error,'.', alpha=0.2)
plt.grid()
plt.xlabel('Range x')
plt.ylabel('Measurement error v')
plt.show()

In [None]:
# Remove outliers
#ir1: [-0.3 0.3]
#ir3: [-0.18 0.18]
#ir4+2: [-0.4 0.4]
#sonar1: [0 0.5]
#sonar2: [-2.4 0.2]
f_dist_s1 = []
f_data_s1 = []
for i in range(len(error)):
    if error[i] < 0.5 and error[i] > 0:
        f_dist_s1.append(dist[i])
        f_data_s1.append(sonar1[i])

new_coeff = np.polyfit(f_dist_s1,f_data_s1, 2)
new_fit = np.poly1d(new_coeff)

plt.plot(f_dist_s1,f_data_s1,'.',label='Data',alpha=0.2)
plt.plot(f_dist_s1,new_fit(f_dist_s1),label='Fit')
plt.xlabel('Range x')
plt.ylabel('Measurement z')
plt.grid()
plt.legend()
plt.show()

In [None]:
f_error_s1 = f_data_s1 - new_fit(f_dist_s1)
new_mean = stat.mean(f_error_s1)
new_variance = stat.variance(f_error_s1)

# Remove outliers second time
ff_dist_s1 = []
ff_data_s1 = []
for i in range(len(f_error_s1)):
    #sonar1: >-0.03
    #sonar2: <1
    if f_error_s1[i] >-0.03:       
        ff_dist_s1.append(f_dist_s1[i])
        ff_data_s1.append(f_data_s1[i])

f_params = np.polyfit(ff_dist_s1,ff_data_s1, 2)
f_fit = np.poly1d(f_params)

plt.plot(ff_dist_s1,ff_data_s1,'.',label='Data', alpha=0.2)
plt.plot(ff_dist_s1,f_fit(ff_dist_s1),label='Fit')
plt.legend()
plt.grid()
plt.xlabel('Range x')
plt.ylabel('Measurement z')
plt.show()

In [None]:
ff_error_s1 = ff_data_s1 - f_fit(ff_dist_s1)
ff_mean_s1 = stat.mean(ff_error_s1)
ff_variance_s1 = stat.variance(ff_error_s1)
plt.plot(ff_dist_s1,ff_error_s1,'.',alpha=0.2)
plt.grid()
plt.ylim(bottom=-0.02, top=0.04)
plt.xlabel('Range x')
plt.ylabel('Measurement error v')
plt.show()

In [None]:
plt.hist(ff_error_s1, bins=100, density=True)
plt.xlim(left=-0.04, right=0.04)
plt.xlabel('Error v')
plt.ylabel('Histogram')
plt.grid()
plt.show()

In [None]:
#Fit Gaussian to histogram
import scipy.stats as sstat
from scipy.stats import norm

#Best fit
(mu, sigma) = norm.fit(ff_error_s1)
std = stat.stdev(ff_error_s1)
n, bins, patches = plt.hist(ff_error_s1, bins=100, density=True)
y = norm.pdf(bins, mu, sigma)
plt.plot(bins, y, color='black',linewidth=2)
plt.grid()
plt.show()

# SENSOR FUSION

In [None]:
filename = 'D:/428/assignment 1/enmt482-assignment1-source_code/partA/training1.csv'
data = np.loadtxt(filename, delimiter=',', skiprows=1)

# Split into columns (for training1.cvs and training2.csv)
index, time, dist, velocity_command, raw_ir1, raw_ir2, raw_ir3, raw_ir4, \
  sonar1, sonar2 = data.T

# Split into columns (for test.cvs)
#index, time, velocity_command, raw_ir1, raw_ir2, raw_ir3, raw_ir4, \
#   sonar1, sonar2 = data.T

## IR3

In [None]:
# Test the sensor model
a = f_params_ir3[0]
b = f_params_ir3[1]
c = f_params_ir3[2]
d = f_params_ir3[3] 

# Define non linear function
def test_func(x):
    return a + b/(x+c) + d*x

In [None]:
z_val_ir3 = []
z_val_ir3 = np.array(z_val_ir3, dtype=np.float64)
for i in range (len(raw_ir3)):
    z_ir3= test_func(dist[i])
    z_val_ir3 = np.append(z_val_ir3, z_ir3)

In [None]:
#plt.plot(dist, z_val_ir3)

In [None]:
#Create inverse function of h(x)
from __future__ import print_function
import sympy as sy
from sympy import *
y, x, a, b, c, d = sy.symbols('y x a b c d', real=True)

#Define equation
fwd_equation = sy.Eq(y, a + b/(c+x) + d*x)

#Inverse equation
inverse = sy.solve(fwd_equation, x)
print('found {} solutions for x:'.format(len(inverse)))
print('\n'.join([str(s) for s in inverse]))
print('x = ', sy.latex(inverse[0]))

In [None]:
# Inverse function for IR__ sensor
# Just need to change parameters corresponding to the chosen sensor
a = f_params_ir3[0]
b = f_params_ir3[1]
c = f_params_ir3[2]
d = f_params_ir3[3]

# Define inverse function
def inv_ir(z):
    return (-a -c*d + z - sy.sqrt(a**2 - 2*a*c*d - 2*a*z - 4*b*d + c**2*d**2 + 2*c*d*z + z**2))/(2*d)


In [None]:
# Create x_hat values based on noiseless z values
x_hat_ir3 = []
x_hat_ir3 = np.array(x_hat_ir3, dtype=np.float64)
for i in range(len(z_val_ir3)):
    x_hat_val_ir3 = inv_ir(z_val_ir3[i])
    x_hat_ir3 = np.append(x_hat_ir3, x_hat_val_ir3)

In [None]:
# Plot the inverse function
plt.plot(z_val_ir3, x_hat_ir3)

In [None]:
# Function to calculate standard deviation and variance by window
def stats_calc(data, dist, window):
    lower = 0
    stdev_noise = []
    upper = []
    pos= []
    var_noise = []
    n_data = len(data)
    while lower <= n_data:
        if lower + window < n_data:
            total = 0
            mean = sum(data[lower: lower+window])/len(data[lower:lower+window])
            for value in data[lower: lower+window]:
                total += (value - mean)**2
            stdev_noise += [(total/(len(data[lower:lower+window])-1))**0.5]
            upper += [lower+window]
        else:
            total = 0
            for value in data[lower:n_data]:
                mean = sum(data[lower:n_data])/len(data[lower:n_data])
                total += (value - mean)**2
            stdev_noise += [(total/(len(data[lower:+n_data])-1))**0.5]
            upper += [n_data]
        lower += window

    for i in range (len(upper)):
        pos.append(dist[upper[i]-1])

    for i in range (len(stdev_noise)):
        var_noise.append(stdev_noise[i] ** 2)
    
    return (stdev_noise, var_noise, pos)

In [None]:
# Calculate standard deviation and variance by window
stdev_noise_ir3 = []
var_noise_ir3 = []
pos_ir3 = []
stdev_noise_ir3, var_noise_ir3, pos_ir3 = stats_calc(f_data_ir3, f_dist_ir3,window=20)


In [None]:
# Plot variance and standard deviation
plt.plot(pos_ir3, stdev_noise_ir3)
plt.plot(pos_ir3, var_noise_ir3)
plt.show()

In [None]:
# Convert noise variance to x_hat variance. Linearize model  
h = a + b/(c+x) + d*x   #function h(n)
diff_h = lambdify(x,h)  #inverse of function h(x)

In [None]:
# Calculate x_hat variance
est_var_ir3 = []

for i in range (len(pos_ir3)):  
    diff = diff_h(pos_ir3[i])
    n_var = var_noise_ir3[i] / (diff ** 2)
    est_var_ir3.append(n_var) 

In [None]:
# Plot x_hat variance versus position
plt.plot(pos_ir3, est_var_ir3)

## IR1

In [None]:
# Test sensor model
a = f_params_ir1[0]
b = f_params_ir1[1]
c = f_params_ir1[2]
d = f_params_ir1[3]

z_val_ir1 = []
z_val_ir1 = np.array(z_val_ir1, dtype=np.float64)
for i in range (len(raw_ir1)):
    z_ir1 = test_func(dist[i])
    z_val_ir1 = np.append(z_val_ir1, z_ir1)

In [None]:
#plt.plot(dist, z_val_ir1)

In [None]:
# Test inervse function. Create x_hat values from noiseless z values
x_hat_ir1 = []
x_hat_ir1 = np.array(x_hat_ir1, dtype=np.float64)
for i in range(len(z_val_ir1)):
    x_hat_val_ir1 = inv_ir(z_val_ir1[i])
    x_hat_ir1 = np.append(x_hat_ir1, x_hat_val_ir1)

In [None]:
plt.plot(z_val_ir1, x_hat_ir1)

In [None]:
# Calculate noise variance and standard deviation by window method
stdev_noise_ir1 = []
var_noise_ir1 = []
pos_ir1 = []
stdev_noise_ir1, var_noise_ir1, pos_ir1 = stats_calc(f_data_ir1, f_dist_ir1, window=20)

In [None]:
plt.plot(pos_ir1, stdev_noise_ir1)
plt.plot(pos_ir1, var_noise_ir1)
plt.show()

In [None]:
# Convert noise variance to x_hat variance. Linearlize sensor model
est_var_ir1 = []

for i in range (len(pos_ir1)):  
    diff = diff_h(pos_ir1[i])
    n_var = var_noise_ir1[i] / (diff ** 2)
    est_var_ir1.append(n_var) 

In [None]:
plt.plot(pos_ir1, est_var_ir1)

## Sonar 1

In [None]:
# Test sensor model
a = f_params[0]
b = f_params[1]
c = f_params[2]

# Define function of linear model
def s1_test_func(x):  
    return a*x**2 + b*x + c 

In [None]:
z_val_s1 = []
z_val_s1 = np.array(z_val_s1, dtype=np.float64)
for i in range (len(sonar1)):
    z = s1_test_func(dist[i])
    z_val_s1 = np.append(z_val_s1, z)

#plt.plot(dist, z_val_s1)

In [None]:
# Generate inverse function for linear model
# Define equation
fwd_equation_s1 = sy.Eq(y, a*x**2 + b*x + c)

# Inverse equation
inverse_s1 = sy.solve(fwd_equation_s1, x)
print('found {} solutions for x:'.format(len(inverse_s1)))
print('\n'.join([str(s) for s in inverse]))
print('x = ', sy.latex(inverse[0]))

In [None]:
# Test inverse function of linear model
a = f_params[0]
b = f_params[1]
c = f_params[2]

# Define inverse function of linear model
def inv_s1(z):
    return (-b + sy.sqrt(-4*a*c + 4*a*z + b**2))/(2*a)

In [None]:
# Generate x_hat values
x_hat_s1 = []
x_hat_s1 = np.array(x_hat_s1, dtype=np.float64)
for i in range(len(sonar1)):
    x_hat_val_s1 = inv_s1(sonar1[i])
    x_hat_s1 = np.append(x_hat_s1, x_hat_val_s1)

plt.plot(sonar1, x_hat_s1)

In [None]:
# Calculate noise variance of h(x)
# x_hat variance is assumed to be equal to noise variance
stdev_noise_s1 = []
var_noise_s1 = []
pos_s1 = []
stdev_noise_s1, var_noise_s1, pos_s1 = stats_calc(ff_data_s1, ff_dist_s1, window=20)

plt.plot(pos_s1, stdev_noise_s1)
plt.plot(pos_s1, var_noise_s1)
plt.show()

# MOTION MODEL

In [None]:
# Define function g(x)
def motion_model(v_command, deltaT): 
    return v_command*deltaT

# Define motion model
def X_now(prior_pos, v_command, deltaT):
    return prior_pos + motion_model(v_command, deltaT)

In [None]:
# Estimated range from motion model only
dt = 0.06
pos_mm = []
pos_mm = np.array(pos_mm, dtype=np.float64)
pos_mm = np.append(pos_mm, 0)
for i in range (1, len(velocity_command)):
    next_p = X_now(pos_mm[i-1], velocity_command[i-1], dt)
    pos_mm = np.append(pos_mm, next_p)

In [None]:
#plt.plot(time, pos_mm, label = 'Estimated Range')
#plt.plot(time, dist, '--', label = 'True Range')
#plt.legend()
#plt.show()

In [None]:
# Motion model noise variance and standard deviation
stdev_noise_mm = []
var_noise_mm = []
fpos_mm = []
stdev_noise_mm, var_noise_mm, fpos_mm = stats_calc(pos_mm, dist, window=20)

In [None]:
# Determine index of the x_hat variance value in the x_hat variance array based on the provided x_hat value 
def findIndex(x_hat, pos_array):
    for i in range (len(pos_array)):
        if x_hat < pos_array[i]:
            return i
        elif x_hat > pos_array[-1]:
            return len(pos_array) - 1


# KALMAN FILTER

In [None]:
x_est = []
x_est = np.array(x_est, dtype=np.float64)
x_est = np.append(x_est, 0)

x_post = []
x_post = np.array(x_post, dtype=np.float64)
x_post = np.append(x_post, 0)
tooFar = 0.3  #use to remove outliers
dt = 0.06
current_pos = 0 #for sensor models only
K = 1
#K = 0.8   #for training_.csv to avoid bias
K_array = []
K_array = np.array(K_array, dtype=np.float64)
K_array = np.append(K_array, K)
x_hat_pos = 0

var_ff = []
var_ff = np.array(var_ff, dtype=np.float64)
var_ff = np.append(var_ff, 0)
for i in range(1, len(time)):
    topsum = 0
    totalVar = 0
    est_x = X_now(x_est[i-1], velocity_command[i-1], dt)
    #p_minus = 0.00007  #for training_.cvs file to avoid bias
    p_minus = var_noise_mm[findIndex(est_x, fpos_mm)]

    #IR 1
    x1_hat = inv_ir(raw_ir1[i])
    if x1_hat.is_real:
        if(np.abs(current_pos - x1_hat) < tooFar):   #if not satisfy, treat as outlier
            var_ir1 = est_var_ir1[findIndex(x1_hat, pos_ir1)]
            topsum += x1_hat/var_ir1
            totalVar += 1/var_ir1
    #IR 3
    x2_hat = inv_ir(raw_ir3[i])
    if x2_hat.is_real:
        if(np.abs(current_pos - x2_hat) < tooFar):   #if not satisfy, treat as outlier
            var_ir3 = est_var_ir3[findIndex(x2_hat, pos_ir3)]
            topsum += x2_hat/var_ir3
            totalVar += 1/var_ir3
            
    #Sonar 1
    x3_hat = inv_s1(sonar1[i])
    if(np.abs(current_pos - x3_hat) < tooFar):      #if not satisfy, treat as outlier
        var_s1 = var_noise_s1[findIndex(x3_hat, pos_s1)]
        topsum += x3_hat/var_s1
        totalVar += 1/var_s1
        


    if totalVar != 0:
        x_hat_pos = topsum/totalVar  #estimated postion
        current_pos = x_hat_pos      #Use to compare with the next x_hat values to reject the outliers from sensor data
        p_plus = 1/totalVar
        if p_plus != 0.0000 and p_minus != 0.0000:
            K = (1/p_plus) / (1/p_plus + 1/p_minus)    #Update Kalman gain
            #v = 1/((1/p_plus) + (1/p_minus))           #Posterior variance
            
            
    K_array = np.append(K_array, K)
    #var_ff = np.append(var_ff,v)
    x_final = (1-K)*x_hat_pos + (K)*est_x
    x_est = np.append(x_est, x_final)
    x_post = np.append(x_post, x_final)


In [None]:
#Calculated the predicted range from motion model only 
m_pos = []
m_pos = np.array(m_pos, dtype=np.float64)
m_pos = np.append(m_pos, 0)
for i in range(1, len(time)):
    c_pos = X_now(m_pos[i-1], velocity_command[i-1], dt)
    m_pos = np.append(m_pos, c_pos)
    

In [None]:
# Estimated range from Kalman filter
new_post = np.array(x_post, dtype=np.int)
std_v = np.std(new_post)
plt.plot(time, x_post, label='Estimated range')
plt.plot(time, m_pos, "--")
#plt.legend(bbox_to_anchor=(1.05, 1), loc="center right", borderaxespad=0.)
plt.legend()
plt.title(r'$\sigma^{2} = %.5f$' % std_v)
plt.show()



In [None]:
# Kalman gain versus time
plt.plot(time, K_array,".", label='Kalman gain', alpha = 0.2)
plt.legend(bbox_to_anchor=(1.05, 1), loc="center right", borderaxespad=0.)
plt.show()
print(len(K_array))

In [None]:
# Estimated range from Kalman filter vs True range (training_.csv only)
plt.plot(time, x_post, label='Estimated range')
plt.plot(time, dist, '--', label='True range')
plt.legend(bbox_to_anchor=(1.05, 1), loc="center right", borderaxespad=0.)
plt.show()