In [2]:
import os
import numpy as np
import itertools
import time
from scipy.spatial import distance
from scipy.optimize import fsolve
from sklearn.linear_model import RANSACRegressor
from sklearn.metrics import mean_squared_error
import matplotlib.pyplot as plt

In [None]:
class PolynomialRegression(object):
    def __init__(self, degree=3, coeffs=None):
        self.degree = degree
        self.coeffs = coeffs

    def fit(self, X, y):
        self.coeffs = np.polyfit(X.ravel(), y, self.degree)

    def get_params(self, deep=False):
        return {'coeffs': self.coeffs}

    def set_params(self, coeffs=None, random_state=None):
        self.coeffs = coeffs

    def predict(self, X):
        poly_eqn = np.poly1d(self.coeffs)
        y_hat = poly_eqn(X.ravel())
        return y_hat

    def score(self, X, y):
        return mean_squared_error(y, self.predict(X))

In [None]:
grid_dict = {}
for y in (-1,1):
    grid_list = [] 

    for x in range(5, -5, -1):
        if y == -1:
            grid_x_up_bound = x*8
            grid_x_low_bound = (x-1)*8
            grid_y_up_bound = (y+1)*3.9
            grid_y_low_bound = y*3.9
            grid = [grid_x_up_bound, grid_x_low_bound, grid_y_up_bound, grid_y_low_bound]
            coord = (y, x)
            grid_dict[coord] = grid
        else:
            grid_x_up_bound = x*8
            grid_x_low_bound = (x-1)*8
            grid_y_up_bound = y*3.9
            grid_y_low_bound = (y-1)*3.9
            grid = [grid_x_up_bound, grid_x_low_bound, grid_y_up_bound, grid_y_low_bound]
            coord = (y, x)
            grid_dict[coord] = grid
        

In [None]:
data_folder = 'C:/Users/User/Desktop/Lidar_data/pointclouds_filtered'
lidar_files = sorted(os.listdir(data_folder))
lidar_fpath = [os.path.join(data_folder, f) for f in lidar_files]
lidar_bin = lidar_fpath[0]
lidar_data = np.fromfile(lidar_bin, dtype=np.float32).reshape(-1, 5)
print(lidar_bin)

ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ

In [None]:
lidar_data[:, 2] = 0
data_in_grid = {}
for grid_cell_index, grid_cell_coord in enumerate(grid_dict):
    x_upper_bound, x_lower_bound, y_upper_bound, y_lower_bound = grid_dict[grid_cell_coord]
    inGrid_lidar_data = np.delete(lidar_data, np.where(
    (lidar_data[:, 0] > x_upper_bound) |
    (lidar_data[:, 0] <= x_lower_bound) |
    (lidar_data[:, 1] > y_upper_bound) |
    (lidar_data[:, 1] <= y_lower_bound))[0], axis = 0)
    
    data_in_grid[grid_cell_coord] = inGrid_lidar_data

ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ

In [None]:
min_sample_threshold = 5
data_repres_up = np.empty((0, 5)) 
data_repres_down = np.empty((0, 5)) 
iteration = 0
max_iter = 1000
prev_error = 1000

start = time.time()
while iteration <= max_iter:
    for y in (-1, 1):
        for x in range(5, -5, -1):
            if y == -1:
                if len(data_in_grid[y, x]) >= min_sample_threshold:
                    idx = np.random.randint(len(data_in_grid[y, x]), size = 3)
                    data_repres_up = np.append(data_repres_up, data_in_grid[y, x][idx, :], axis = 0)
                elif len(data_in_grid[y, x]) == 0:
                    pass
                else:
                    idx = np.random.randint(len(data_in_grid[y, x]), size = len(data_in_grid[y,x]))
                    data_repres_up = np.append(data_repres_up, data_in_grid[y, x][idx, :], axis = 0)

            elif y == 1:
                if len(data_in_grid[y, x]) >= min_sample_threshold:
                    idx = np.random.randint(len(data_in_grid[y, x]), size = 3)
                    data_repres_down = np.append(data_repres_down, data_in_grid[y, x][idx, :], axis = 0)
                elif len(data_in_grid[y, x]) == 0:
                    pass
                else:
                    idx = np.random.randint(len(data_in_grid[y, x]), size = len(data_in_grid[y,x]))
                    data_repres_up = np.append(data_repres_up, data_in_grid[y, x][idx, :], axis = 0)
                    
    up_grids_x = np.sort(data_repres_up, axis = 0)[:, 0] 
    up_grids_y = np.sort(data_repres_up, axis = 0)[:, 1]
    down_grids_x = np.sort(data_repres_down, axis = 0)[:, 0] 
    down_grids_y = np.sort(data_repres_down, axis = 0)[:, 1]

    ransac_up = RANSACRegressor(PolynomialRegression(degree=3),
                             min_samples = 7,    
                             max_trials = 10000,
                             random_state=0)
    ransac_up.fit(np.expand_dims(up_grids_x, axis=1), up_grids_y)
    up_grids_y_pred = ransac_up.predict(np.expand_dims(up_grids_x, axis=1))
    up_lane_coeffs= ransac_up.estimator_.get_params(deep = True)["coeffs"]

    ransac_down = RANSACRegressor(PolynomialRegression(degree=3), 
                             min_samples = 7,    
                             max_trials = 10000,
                             random_state=0)
    ransac_down.fit(np.expand_dims(down_grids_x, axis=1), down_grids_y)
    down_grids_y_pred = ransac_down.predict(np.expand_dims(down_grids_x, axis=1))
    down_lane_coeffs = ransac_down.estimator_.get_params(deep = True)["coeffs"]
    
    ego_lane_coeffs_pair = np.append(down_lane_coeffs, up_lane_coeffs, axis = 0)       
    curr_error = cost(up_lane_coeffs, down_lane_coeffs)
    
    if curr_error < prev_error:
        prev_error = curr_error 
        best_coeffs_pair = ego_lane_coeffs_pair

    iteration += 1
    end = time.time()
    print(end - start)
    print(iteration)

print("time spent: {:.2f} sec".format(time.time() - start)) 
print(best_coeffs_pair)
print(prev_error)


ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ

# cost function


In [None]:
def cost(up_lane_coeffs, down_lane_coeffs):
    
    x_list = list(range(-40, 41, 1))
    interval_truth = list(itertools.repeat(3, 81)) 
    interval_measured = [] 

    for x_test in x_list:
        y_down = down_lane_coeffs[0]*x_test**3 + down_lane_coeffs[1]*x_test**2 + down_lane_coeffs[2]*x_test + down_lane_coeffs[3]
        deriv_y_down = 3*down_lane_coeffs[0]*x_test**2 + 2*down_lane_coeffs[1]*x_test+down_lane_coeffs[2]

        y_down_perp_line = lambda x: -1/deriv_y_down*(x-x_test) + y_down
        y_up = lambda x: up_lane_coeffs[0]*x**3 + up_lane_coeffs[1]*x**2 + up_lane_coeffs[2]*x + up_lane_coeffs[3]

        intersection_x = findIntersection(y_down_perp_line, y_up, x_test)[0]
        intersection_y = y_down_perp_line(intersection_x)

        dist = distance.euclidean([intersection_x, intersection_y], [x_test, y_down])
        interval_measured.append(dist)
    
    cost = mean_squared_error(interval_truth, interval_measured)
    return cost 

def findIntersection(fun1, fun2, x0):
    return fsolve(lambda x: fun1(x) - fun2(x), x0)