In [None]:
import pandas as pd
import geopandas as gpd
from pyproj.transformer import Transformer
from shapely.geometry import asLineString
import numpy as np
import shapely
import matplotlib.pyplot as plt
from shapely.geometry import Point, Polygon
from shapely.ops import unary_union, LineString
from sklearn.cluster import DBSCAN
import math
import statistics

import pptk

In [None]:
lidarData = pd.read_csv("HD_Run1(0).csv")
lidarGDF = gpd.GeoDataFrame(data=lidarData, crs=4326, geometry=gpd.points_from_xy(lidarData["X"], lidarData["Y"]))

In [None]:
gpsData = pd.read_csv("gps.csv")
gpsGDF = gpd.GeoDataFrame(data=gpsData,crs=4326, geometry=gpd.points_from_xy(gpsData["X"], gpsData["Y"]) )

In [None]:
lidarGDF.head()

In [None]:
gpsNP = gpsGDF.filter(['X','Y'])
#gpsNP = gpsData.to_numpy()
gpsNP = gpsNP.to_numpy()
gpsNP
# projectionTrans = Transformer.from_crs(4326, 'ESRI:102604', always_xy=True)
# projPoints = np.array(projectionTrans.transform(point_list[:,0], point_list[:, 1], direction="forward")).T
# projPoints

In [None]:
GPSline = asLineString(gpsNP)
GPSline

In [None]:
# DO NOT RUN

lidarNP = lidarData.filter(['X','Y'])
lidarNP = lidarNP.to_numpy()
lidarLine = asLineString(lidarNP)

lidarLine

In [None]:
# Creates a 5m buffer around the GPS tragetory, and a list of all the LiDAR points within that buffer

pointsWithinBuffer = lidarGDF.sindex.query(GPSline.buffer(5), predicate="contains")
print(pointsWithinBuffer.shape)

pointsWithinBuffer = np.asarray(pointsWithinBuffer)
offsetFilteredDf = lidarData.loc[pointsWithinBuffer]

In [None]:
# Elevation Filtering

for pt in gpsNP:
    sampleBuffer = Point(pt[0], pt[1]).buffer(0.5)
    pointsWithinBuffer = lidarGDF.sindex.query(sampleBuffer, predicate="contains")

    if len(pointsWithinBuffer) > 0:
        median_elevation = statistics.median(lidarData.loc[np.asarray(pointsWithinBuffer)]['Z'])
        offsetFilteredDf = offsetFilteredDf[(offsetFilteredDf['X'] > pt[0] + 5) | (offsetFilteredDf['X'] < pt[0] - 5)  | (offsetFilteredDf['Y'] > pt[1] + 5) | (offsetFilteredDf['Y'] < pt[1] - 5) | (offsetFilteredDf['Z'] < median_elevation + 0.4)]

        
print(offsetFilteredDf.shape)
    

In [None]:
## PARTITION GPS LINE

slice_frequency = 0.01
slice_depth = 0.1
whole_line = False

# Chose a range on the line. 2000-2200 works well
range_start = 2043
range_end = 2046
sample_starts = np.arange(range_start, range_end, slice_frequency)

if whole_line:
    sample_starts = np.arange(0, GPSline.length, slice_frequency)

start_points = [GPSline.interpolate(distance) for distance in sample_starts] + [GPSline.boundary.geoms[1]]
start_points = unary_union(start_points)
print(len(start_points.geoms))
start_points

In [None]:
## CREATE SLICES OF ROAD, BASED ON DIST

slices = []

for dist in sample_starts:
#for g in range(0, 150):
    #dist = sample_starts[g]

    sample_start = GPSline.interpolate(dist)
    sample_end = GPSline.interpolate(dist + slice_depth)
    sample_buffer = LineString([sample_start, sample_end]).buffer(5, cap_style=2)
    offset_line = LineString([sample_buffer.exterior.coords[3], sample_buffer.exterior.coords[4]])
    
    sample_df = lidarData.loc[np.asarray(lidarGDF.sindex.query(sample_buffer, predicate="contains"))]
    points_in_sample = []
    for i in sample_df['Id']:
        points_in_sample.append((i, sample_df.loc[i]['geometry']))
    

    points_in_sample = sorted(points_in_sample, key=lambda x: x[1].distance(offset_line))

    slice = []
    for pt in points_in_sample:
        slice.append((pt[1].distance(offset_line), sample_df.loc[pt[0]]['Z'], sample_df.loc[pt[0]]['geometry']))
    slices.append(slice)

    # Each "slice" in slices is a tuple of (dist_from_offset, z, POINT())
    # Slices are sorted by their distance from the offset line

    
    

In [None]:
## EDGE CREATION BASED ON DERIVATIVE

edges = []
h = 5
mg = 0.01 # minimum gradient
ma = 0.09 # minimum acceleration

for slice in slices:
    values = []
    for i in range((len(slice) - (len(slice) % h)) // 2, len(slice) - (len(slice) % h), h):
        pt = (statistics.mean([x[0] for x in slice[i:i+h]]), statistics.mean([x[1] for x in slice[i:i+h]]))
        ## Check last and sign, add if sign changes
        if len(values) < 5:
            values.append(pt[1])
        else:
            values = values[1:]
            values.append(pt[1])
            if (values[0] > values[1]+mg and values[1] > values[2]+mg and values[3] > values[2]+mg and values[4] > values[3]+mg):
                edges.append(Point(slice[i][2].x, slice[i][2].y, slice[i][1])) 
            elif (values[0]+mg < values[1] and values[1]+mg < values[2] and values[3]+mg < values[2] and values[4]+mg < values[3]):
                edges.append(Point(slice[i][2].x, slice[i][2].y, slice[i][1]))
            elif abs((values[4] - values[3]) - (values[3] - values[2])) > ma:
                edges.append(Point(slice[i][2].x, slice[i][2].y, slice[i][1]))

print(len(edges))

In [None]:
edge_plot_x = []
edge_plot_y = []
for pt in edges:
    edge_plot_x.append(pt.x)
    edge_plot_y.append(pt.y)

plt.scatter(edge_plot_x, edge_plot_y, marker='.', alpha=1)
plt.show()

In [None]:
dbscan_eplsilon = 0.085
dbscan_min_samples = 2

X = np.vstack((np.array(edge_plot_x), np.array(edge_plot_y))).T
clustering = DBSCAN(eps=dbscan_eplsilon, min_samples=dbscan_min_samples).fit(X)


color_labels = ["lime", "r", "g", "b", "m", "c", "y", "k", "blueviolet", "tomato", "gold", "darkblue", "lime", "wheat", "teal"]
clustering.labels_

clustered_x = []
clustered_y = []
cluster_colors = []
for i in range(len(edges)):
    if clustering.labels_[i] != -1:
        clustered_x.append(edges[i].x)
        clustered_y.append(edges[i].y)
        cluster_colors.append(color_labels[clustering.labels_[i]])

plt.scatter(clustered_x, clustered_y, marker='.', alpha=1, c=cluster_colors)
plt.show()

In [None]:
# Renders The filtered points in 3D, and their retroflectivity

filtered_xyz = offsetFilteredDf.filter(['X', 'Y', 'Z'])
filtered_intensity = offsetFilteredDf.filter(['Retro'])

filtered_intensity = np.array(filtered_intensity.Retro).transpose()
print(filtered_intensity.shape)

V = pptk.viewer(filtered_xyz)
V.attributes(filtered_intensity)

In [None]:
# Export to CSV

offsetFilteredDf.to_csv("filtered_points.csv")