# Classification of roads

We're going to look into the success of classifying algorithms on the LiDAR data we have with a couple of different techniques

### The following are the list of classifications defined by IFP
- 0 not yet classified (nothing done yet)
- 1 unclassified (actively marked as nothing)
- 2 ground, sidewalk
- 3,4,5 vegetation, low(gras) medium(shrubbery) high (trees)
- 6 buildings
- 8 street furniture
- 10 street markings
- 11 street, pavement
- 12 bike lanes
- 13 temporary things(bicycles, trashcans)
- 15 cars, trucks

In [3]:
import laspy
import numpy as np
import matplotlib.pyplot as plt     
import seaborn as sns
import open3d as o3d
import pandas as pd
from dotenv import load_dotenv
import json
from upath import UPath
import os
# Load environment variables from .env file if it exists
load_dotenv()

sns.set_theme(style="whitegrid")

In [4]:
#Set up the B2Drop pathway
B2D_DIR = UPath(os.getenv("DATA_DIR_FSSPEC_URI"),
                 base_url= os.getenv("DATA_DIR_FSSPEC_BASE_URL"),
                 auth=(os.getenv("DATA_DIR_FSSPEC_USER"),
                       os.getenv("DATA_DIR_FSSPEC_PASS"))
                    )

In [5]:
def describe_las(las):
    print(f"Point Format: {las.header.point_format}")
    print(f"Number of Points: {las.header.point_count}")
    print("Available Dimensions:", list(las.point_format.dimension_names))
    print("Bounding Box:")
    print(f"  X: {las.header.mins[0]} to {las.header.maxs[0]}")
    print(f"  Y: {las.header.mins[1]} to {las.header.maxs[1]}")
    print(f"  Z: {las.header.mins[2]} to {las.header.maxs[2]}")
    print("Scale:", las.header.scales)
    print("Offset:", las.header.offsets)
    try:
        print("CRS:", las.header.parse_crs())
    except:
        print("CRS: Not defined")

In [None]:
import os

file_path = B2D_DIR / "bologna.laz"
#Load in bologna.laz file (may take a while, make sure your VPN/B2DROP connection is active)
# Check if file exists
if not file_path.exists():
    raise FileNotFoundError(f"File not found: {file_path}")

# Check file size
file_size = os.path.getsize(file_path)
if file_size == 0:
    raise IOError(f"File exists but is empty — is your VPN/B2DROP connection active?")

print(f"File found: {file_path} ({file_size:,} bytes)")

# If it passes both checks, safe to load
with file_path.open("rb") as f:
    las = laspy.read(f)


In [8]:

with (B2D_DIR / "bologna.laz").open("rb") as f:
    las = laspy.read(f)

LazrsError: IoError: Failed to use readinto to read bytes

In [9]:
max_points = 1_000_000

indices = np.random.choice(len(las.x), max_points, replace=False)
points = np.vstack((las.x[indices], las.y[indices], las.z[indices])).T

In [10]:
#las = laspy.read(LAZ_FILE)
x, y, z = las.x, las.y, las.z

In [11]:
describe_las(las)

Point Format: <PointFormat(3, 4 bytes of extra dims)>
Number of Points: 112787346
Available Dimensions: ['X', 'Y', 'Z', 'intensity', 'return_number', 'number_of_returns', 'scan_direction_flag', 'edge_of_flight_line', 'classification', 'synthetic', 'key_point', 'withheld', 'scan_angle_rank', 'user_data', 'point_source_id', 'gps_time', 'red', 'green', 'blue', 'height_division']
Bounding Box:
  X: -319.2701 to -107.22720000000001
  Y: -315.29970000000003 to 26.5196
  Z: -8.614600000000001 to 34.2946
Scale: [0.0001 0.0001 0.0001]
Offset: [0. 0. 0.]
CRS: None


In [12]:
from laspy import read
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler

# Fields of interest
fields = ['X', 'Y', 'Z', 'intensity', 'red', 'green', 'blue', 'classification']

df = pd.DataFrame({field: np.asarray(getattr(las, field)) for field in fields})

# Drop non-ground points (keep classification == 2 or 11 for now)
df = df[df['classification'].isin([2, 11])]  # 2 = ground, 11 = road surface

# Feature engineering placeholders (e.g., flatness, curvature can go here)
# Later...

# Optional: Normalize RGB
df[['red', 'green', 'blue']] /= 65535.0

# Optional: Standard scale numerical features
scaler = StandardScaler()
features = ['Z', 'intensity', 'red', 'green', 'blue']
df[features] = scaler.fit_transform(df[features])


We're dealing with a huge pointcloud here, so let's significantly downsample for the purpose of building the pipeline.

In [13]:
from sklearn.utils import resample

# Get sidewalk and road separately
df_sidewalk = df[df['classification'] == 2] #sidewalks
df_road = df[df['classification'] == 11] # streets

# Sample a balanced subset
N = 200_000  # 100k each
sidewalk_sample = resample(df_sidewalk, n_samples=N//2, random_state=42)
road_sample = resample(df_road, n_samples=N//2, random_state=42)

df_sampled = pd.concat([sidewalk_sample, road_sample]).sample(frac=1, random_state=42).reset_index(drop=True)


In [None]:
#provide a new column for labels, and map classifications to labels
df_sampled['label'] = df_sampled['classification'].map({2: 'sidewalk', 11: 'road'})

In [29]:
from sklearn.model_selection import train_test_split

X = df_sampled[features]
y = df_sampled['label']

X_train, X_test, y_train, y_test = train_test_split(X, y, stratify=y, test_size=0.3, random_state=42)

from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, confusion_matrix

model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

y_pred = model.predict(X_test)

print(confusion_matrix(y_test, y_pred))
print(classification_report(y_test, y_pred))

[[25685  4315]
 [ 4037 25963]]
              precision    recall  f1-score   support

        road       0.86      0.86      0.86     30000
    sidewalk       0.86      0.87      0.86     30000

    accuracy                           0.86     60000
   macro avg       0.86      0.86      0.86     60000
weighted avg       0.86      0.86      0.86     60000



We can also add features to the lidar to improve the model efficacy, such as planes, roughness, etc.

In [17]:
import open3d as o3d

# Convert subset to Open3D point cloud
pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(df_sampled[['X','Y','Z']].values)

# Estimate normals
pcd.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamHybrid(radius=0.5, max_nn=30))

# Extract normal z-component to detect flatness (e.g. horizontal surfaces)
normals = np.asarray(pcd.normals)
df_sampled['normal_z'] = normals[:,2]


The Z-component of the normal tells you how “vertical” or “horizontal” the surface is at that point.
> If normal_z ≈ 1 → point is on a horizontal surface facing up.

> If normal_z ≈ 0 → point is on a vertical surface (wall, tree trunk, etc.).

This is a feature that can help the classifier distinguish ground, roofs, walls, etc.

In [18]:
print(y_test.value_counts())
print(np.unique(y_pred, return_counts=True))


label
road        30000
sidewalk    30000
Name: count, dtype: int64
(array(['road', 'sidewalk'], dtype=object), array([29722, 30278]))


In [27]:
import open3d as o3d
import numpy as np

# Convert coordinates to numpy array
points = df_sampled[['X', 'Y', 'Z']].to_numpy()

# Create point cloud
pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(points)

# Add RGB colors
colors = df_sampled[['red', 'green', 'blue']].to_numpy().astype(np.float64)
pcd.colors = o3d.utility.Vector3dVector(colors)


# OPTIONAL: if you want to visualize by classification label instead of RGB
# Map each label to a distinct color
import matplotlib.pyplot as plt
unique_labels = df_sampled['label'].unique()
label_to_color = {lbl: plt.cm.tab20(i / len(unique_labels))[:3] for i, lbl in enumerate(unique_labels)}
colors_by_label = df_sampled['label'].map(label_to_color).to_numpy()
#pcd.colors = o3d.utility.Vector3dVector(np.vstack(colors_by_label)) #if you want to use label colors instead of RGB

# Save to file
#o3d.io.write_point_cloud("classified_output.ply", pcd)

# View in Open3D visualizer
o3d.visualization.draw_geometries([pcd])


In [28]:
# Perform cross-validation to evaluate the model's performance
from sklearn.model_selection import StratifiedKFold, cross_val_score
from sklearn.ensemble import RandomForestClassifier
import numpy as np

X = df_sampled[features]
y = df_sampled['label']

rf = RandomForestClassifier(n_estimators=200, random_state=42)

cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
scores = cross_val_score(rf, X, y, cv=cv, scoring='f1_macro')

print("F1 Macro per fold:", scores)
print("Mean F1 Macro:", np.mean(scores))


F1 Macro per fold: [0.8640439  0.86549906 0.8622708  0.86406885 0.8632986 ]
Mean F1 Macro: 0.8638362401468338


In [34]:
import open3d as o3d
import numpy as np
import matplotlib.pyplot as plt
#Let's plot the point cloud with colors based on match/mismatch of predicted labels

# 🔹 1. Predict for all points
df_sampled['pred_label'] = model.predict(df_sampled[features])

# 🔹 2. Points array
points = df_sampled[['X', 'Y', 'Z']].to_numpy()

# 🔹 3. Colormap for predicted labels
unique_pred_labels = np.unique(df_sampled['pred_label'])
pred_label_to_color = {lbl: plt.cm.tab20(i / len(unique_pred_labels))[:3] for i, lbl in enumerate(unique_pred_labels)}

# 🔹 4. Build colors based on match/mismatch
colors = []
for actual, pred in zip(df_sampled['label'], df_sampled['pred_label']):
    if actual == pred:
        # Correct → grey
        colors.append([0.6, 0.6, 0.6])
    else:
        # Incorrect → color by predicted label
        colors.append(pred_label_to_color[pred])

colors = np.array(colors)

# 🔹 5. Create point cloud
pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(points)
pcd.colors = o3d.utility.Vector3dVector(colors)

# 🔹 6. Visualize
o3d.visualization.draw_geometries([pcd])


Looks like we're generally maintaining a good level of accuracy. However - there is potentially a single problem here... In our training dataset we have ONLY data that is either labelled as a sidewalk or a carriageway. If we are to introduce unseen data here, there will be more classes than "sidewalk" and "road". We have to introduce another more classes. If we just want a sidewalk and carriageway classifier, we can just use 3 classes - "Sidewalk", "carriageway", and "other"

### Unsupervised (another notebook?)

In [None]:
# -------------------------
# 0. Imports
# -------------------------
import pandas as pd
import numpy as np
import open3d as o3d
import matplotlib.pyplot as plt

from sklearn.model_selection import StratifiedKFold, cross_val_score, train_test_split, GridSearchCV
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans, DBSCAN
from sklearn.decomposition import PCA

#import tensorflow as tf
#from tensorflow.keras import layers, models
#import joblib


















In [35]:
# -------------------------
# 1. Load data
# -------------------------
# df should already contain: X, Y, Z, intensity, red, green, blue, label (12 classes originally)
df['label'] = df['label'].astype(str)

# -------------------------
# 2. Collapse into 3 classes
# -------------------------
# Classes of interest
target_map = {
    2: 'sidewalk',
    11: 'carriageway'
}

# Map, with "other" fallback
df['label_3class'] = df['classification'].map(target_map).fillna('other')

# Check balance
print(df['label_3class'].value_counts())


KeyError: 'label'

In [None]:
# -------------------------
# 3. Geometric feature engineering
# -------------------------

# Subsample for normals
N_SUBSAMPLE = 50000
df_sub = df.sample(min(N_SUBSAMPLE, len(df)), random_state=42)

pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(df_sub[['X','Y','Z']].to_numpy())
pcd.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamHybrid(radius=0.5, max_nn=30))

# Store normal_z
df_sub['normal_z'] = np.asarray(pcd.normals)[:,2]

# Nearest-neighbor interpolation of normal_z
from sklearn.neighbors import NearestNeighbors
nbrs = NearestNeighbors(n_neighbors=1).fit(df_sub[['X','Y','Z']])
_, indices = nbrs.kneighbors(df[['X','Y','Z']])
df['normal_z'] = df_sub.iloc[indices[:,0]]['normal_z'].values

# Add height (relative to min Z = ground approx.)
df['height'] = df['Z'] - df['Z'].min()

In [None]:
# -------------------------
# 4. Prepare features & labels
# -------------------------
features = ['intensity','red','green','blue','normal_z','height']
X = df[features].to_numpy()
y = df['label_3class']

# Scale features
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

In [None]:
# -------------------------
# 5. Check for "only one label" issue
# -------------------------
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, stratify=y, test_size=0.3, random_state=42)

rf = RandomForestClassifier(n_estimators=100, random_state=42, n_jobs=-1)
rf.fit(X_train, y_train)
y_pred = rf.predict(X_test)

print("Unique predictions:", np.unique(y_pred, return_counts=True))
print(confusion_matrix(y_test, y_pred))
print(classification_report(y_test, y_pred))

In [None]:
# -------------------------
# 6. Cross-validated RandomForest with tuning
# -------------------------
param_grid = {
    'n_estimators': [100, 200],
    'max_depth': [None, 10, 20]
}
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
grid = GridSearchCV(rf, param_grid, cv=cv, scoring='f1_macro', n_jobs=-1)
grid.fit(X_scaled, y)

print("Best RF params:", grid.best_params_)
print("Best CV score:", grid.best_score_)

rf_best = grid.best_estimator_


In [None]:
# -------------------------
# 7. Unsupervised clustering
# -------------------------

# PCA for speed
X_pca = PCA(n_components=3).fit_transform(X_scaled)

# KMeans
kmeans = KMeans(n_clusters=3, random_state=42).fit(X_pca)
print("KMeans cluster counts:", np.unique(kmeans.labels_, return_counts=True))

# DBSCAN
dbscan = DBSCAN(eps=0.5, min_samples=50).fit(X_pca)
print("DBSCAN cluster counts:", np.unique(dbscan.labels_, return_counts=True))

In [None]:
# -------------------------
# 8. Deep learning baseline (MLP)
# -------------------------
n_classes = len(np.unique(y))
y_encoded = pd.Categorical(y).codes  # convert to int labels

model = models.Sequential([
    layers.Input(shape=(X_scaled.shape[1],)),
    layers.Dense(64, activation='relu'),
    layers.Dense(64, activation='relu'),
    layers.Dense(n_classes, activation='softmax')
])

model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
history = model.fit(X_train, pd.Categorical(y_train).codes,
                    validation_data=(X_test, pd.Categorical(y_test).codes),
                    epochs=20, batch_size=32)

In [None]:
# -------------------------
# 9. Save models
# -------------------------
joblib.dump(rf_best, "rf_3class.joblib")
joblib.dump(scaler, "scaler.joblib")
model.save("mlp_3class.h5")

print("Models saved.")

In [None]:
# -------------------------
# 10. Visualisation helper
# -------------------------
def visualize_predictions(df, y_pred, out_file="predictions.ply"):
    pcd = o3d.geometry.PointCloud()
    pcd.points = o3d.utility.Vector3dVector(df[['X','Y','Z']].to_numpy())
    
    # Map labels to colors
    unique_labels = np.unique(y_pred)
    cmap = plt.cm.get_cmap("tab20", len(unique_labels))
    label_to_color = {lbl: cmap(i)[:3] for i,lbl in enumerate(unique_labels)}
    colors = np.vstack([label_to_color[lbl] for lbl in y_pred])
    
    pcd.colors = o3d.utility.Vector3dVector(colors)
    o3d.io.write_point_cloud(out_file, pcd)
    o3d.visualization.draw_geometries([pcd])

# Example: visualize RF predictions
visualize_predictions(df, rf_best.predict(X_scaled), out_file="rf_predictions.ply")