In [1]:
from pathlib import Path
import numpy as np
import pandas as pd
import hdbscan
import matplotlib.pyplot as plt

from sklearn.cluster import KMeans, DBSCAN
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA

from sklearn.metrics import silhouette_score, adjusted_rand_score
import seaborn as sns

import folium
import numpy as np
from IPython.display import display

In [2]:
# Locate the project root by walking upward from the current working directory
# until a folder containing data/processed is found, so the notebook can be run
# from any subdirectory of the project.
here = Path.cwd()
root = next(
    (p for p in [here, *here.parents] if (p / "data" / "processed").exists()),
    None,
)
if root is None:
    # Without the default above, next() would raise a bare StopIteration with
    # no hint about what was being searched for.
    raise FileNotFoundError(
        f"Could not find a 'data/processed' directory in {here} or any of its parents"
    )

full_df = pd.read_csv(root / "data" / "processed" / "state1" / "ios_left.csv", header=0)

# Independent copy of the start/end coordinates; these columns are NOT dropped
# below, so they also remain available in full_df.
geo_cols = full_df[["eventStartLatitude", "eventStartLongitude",
                        "eventEndLatitude", "eventEndLongitude"]].copy()

# Remove these four columns from the working frame.
full_df = full_df.drop(columns=['is_ios', 'eventType', 'eventStart', 'eventEnd'])

def assign_manual_label_3class(row):
    """Map a row's ``gyroAngleChange`` value to one of three manual classes.

    Parameters
    ----------
    row : mapping-like
        Anything indexable by the key ``'gyroAngleChange'`` (e.g. a DataFrame
        row passed by ``DataFrame.apply(..., axis=1)``).

    Returns
    -------
    int
        0 (Lane Change) when the angle is below 60,
        1 (Left Turn) when it is below 140,
        2 (U-Turn) otherwise.

    Notes
    -----
    The angle is presumably in degrees -- TODO confirm against the data source.
    A value that compares False against both thresholds (e.g. NaN) falls
    through to class 2.
    """
    turn_angle = row['gyroAngleChange']
    # Exclusive upper bounds; the position in the tuple is the class id:
    # index 0 -> Lane Change, index 1 -> Left Turn.
    upper_bounds = (60, 140)
    for class_id, bound in enumerate(upper_bounds):
        if turn_angle < bound:
            return class_id
    # Not below any bound -> U-Turn.
    return len(upper_bounds)

# Attach the rule-based 3-class label to every event (evaluated row by row).
full_df['manual_label'] = full_df.apply(assign_manual_label_3class, axis="columns")

# Show how many events fall into each class, ordered by class id,
# followed by one blank line.
print(
    "Manual label distribution:",
    full_df['manual_label'].value_counts().sort_index(),
    "",
    sep="\n",
)

# All columns currently available in the working frame.
print(list(full_df.columns))

# Subset of columns carried forward in `df`
# (presumably the clustering inputs -- TODO confirm against later cells).
features = [
    'angle_norm',
    'radius_log',
    'turn_intensity',
    'gyro_angular_change_per_second',
    'eventDurationSeconds',
]
df = full_df.loc[:, features].copy()
print(list(df.columns))
print(df.shape)

Manual label distribution:
manual_label
0     809
1    2555
2     160
Name: count, dtype: int64

['eventSampleSpeed', 'eventDurationSeconds', 'eventMilesDriven', 'eventStartLatitude', 'eventStartLongitude', 'eventEndLatitude', 'eventEndLongitude', 'eventGPSSignalStrength', 'eventStartSpeed', 'eventEndSpeed', 'memsMedianHorizontalNorm', 'gyro_angular_change_per_second', 'gyroAngleChange', 'mems_radius', 'dv', 'turn_intensity', 'angle_norm', 'radius_log', 'manual_label']
['angle_norm', 'radius_log', 'turn_intensity', 'gyro_angular_change_per_second', 'eventDurationSeconds']
(3524, 5)


In [3]:
def calculate_gini_score(labels) -> float:
    """Return the normalized Gini coefficient of the cluster-size distribution.

    Parameters
    ----------
    labels : array-like
        One cluster label per sample. Entries equal to ``-1`` (the noise
        marker used by DBSCAN/HDBSCAN) are ignored. Labels may be of any
        dtype that ``np.unique`` can handle (ints, floats, ...); they do not
        have to be non-negative integers.

    Returns
    -------
    float
        ``0.0`` when all clusters have the same size, growing towards ``1.0``
        as the sizes become more unequal. Also ``0.0`` when there are no
        non-noise labels or only a single cluster.
    """
    labels = np.asarray(labels)
    # Drop noise points before counting.
    clustered = labels[labels != -1]
    if clustered.size == 0:
        return 0.0

    # np.unique counts occurrences for arbitrary label values; np.bincount
    # would raise on float-typed or negative labels and allocates an array
    # as long as the largest label id.
    _, counts = np.unique(clustered, return_counts=True)
    counts = counts.astype(float)
    k = counts.size
    if k <= 1:
        return 0.0

    # Pairwise Gini: sum_ij |c_i - c_j| / (2 * k * sum_i c_i)
    G = np.abs(counts[:, None] - counts[None, :]).sum() / (2 * k * counts.sum())
    # Normalize by (k - 1) / k, the Gini value obtained when one of k groups
    # holds everything; k >= 2 here, so the divisor is never zero.
    Gmax = (k - 1) / k
    return float(G / Gmax)
    