In [None]:
### Find lane changes and extract a window of ±20 time units around each one

import pandas as pd
import os

# Input trajectory files, keyed by the label that is written to the `dataset`
# column of the output. Commented-out entries are excluded from this run.
datasets = {
    #"df395": "C:\\Users\\Pedram\\Desktop\\PEDRAM - DO NOT TOUCH\\PT_IDM\\lateral_align\\I395-final-run-index.csv",
    "df294l1": "C:\\Users\\Pedram\\Desktop\\PEDRAM - DO NOT TOUCH\\PT_IDM\\lateral_align\\df294l1-aligned.csv",
    #"df9094": "C:\\Users\\Pedram\\Desktop\\PEDRAM - DO NOT TOUCH\\PT_IDM\\lateral_align\\df9094-aligned.csv",
    "df294l2": "C:\\Users\\Pedram\\Desktop\\PEDRAM - DO NOT TOUCH\\PT_IDM\\lateral_align\\df294l2-aligned.csv"
}

save_dir = "C:\\Users\\Pedram\\Desktop\\PEDRAM - DO NOT TOUCH\\PT_IDM\\lateral_align\\"
os.makedirs(save_dir, exist_ok=True)  # Ensure save directory exists

# Combined output file
output_csv_path = os.path.join(save_dir, "combined_lane_change_data.csv")

# Number of decimals the `time` column is rounded to before any comparison.
TIME_DECIMALS = 1

# Half-width of the window kept on each side of a lane change, in the units of
# the `time` column (presumably seconds -- TODO confirm).
LANE_CHANGE_HALF_WINDOW = 20

# Columns written to the combined CSV, in output order.
OUTPUT_COLUMNS = [
    'dataset', 'run-index', 'ID', 'LC_ID', 'time', 'time-relative',
    'xloc-kf', 'yloc-kf', 'speed-kf', 'acceleration-kf', 'lane-kf', 'type-most-common'
]

all_lane_change_data = []  # One DataFrame per detected lane change, across all datasets
lane_change_counter = 0  # Running counter used as the unique lane change ID (LC_ID)

# Iterate through datasets
for df_key, df_path in datasets.items():
    # Load the dataset, order it by run and time, and snap time to a fixed grid
    df = pd.read_csv(df_path)
    df = df.sort_values(by=['run-index', 'time'])
    df['time'] = df['time'].round(TIME_DECIMALS)

    # Process each unique run-index
    for run_index in df['run-index'].unique():
        run_data = df[df['run-index'] == run_index].copy()  # Independent copy so a column can be added

        # A lane change is any row whose `lane-kf` differs from the previous row
        # of the same vehicle; the first row of each vehicle has no predecessor
        # (diff is NaN -> 0) and therefore never counts as a change.
        run_data['lane-change'] = run_data.groupby('ID')['lane-kf'].diff().fillna(0).ne(0)

        # Get rows where a lane change occurred
        lane_change_events = run_data[run_data['lane-change']]

        for _, event in lane_change_events.iterrows():
            lane_change_counter += 1  # Increment lane change ID counter
            vehicle_id = event['ID']  # ID of the vehicle performing the lane change
            lane_change_time = event['time']  # Time of lane change

            # Trajectory of this vehicle within the current run
            vehicle_data = run_data[run_data['ID'] == vehicle_id]

            # Time relative to the lane change, rounded back onto the time grid.
            # Comparing the rounded difference (instead of `time` against
            # `lane_change_time -/+ window`) keeps samples that sit exactly on
            # the window boundary from being dropped by floating-point error
            # (e.g. 30.1 - 20 evaluates to slightly more than 10.1).
            time_relative = (vehicle_data['time'] - lane_change_time).round(TIME_DECIMALS)
            in_window = time_relative.abs() <= LANE_CHANGE_HALF_WINDOW
            surrounding_data = vehicle_data[in_window].copy()  # Copy to avoid chained-assignment warnings

            # Add dataset, ID, and run-index columns to the surrounding data
            surrounding_data.loc[:, 'dataset'] = df_key
            surrounding_data.loc[:, 'run-index'] = run_index
            surrounding_data.loc[:, 'ID'] = vehicle_id
            surrounding_data.loc[:, 'LC_ID'] = lane_change_counter  # Assign lane change ID

            # Relative time (0 at the lane change row)
            surrounding_data.loc[:, 'time-relative'] = time_relative[in_window]

            # Keep only the columns that go into the combined file
            relevant_data = surrounding_data[OUTPUT_COLUMNS]

            # Append to the combined list
            all_lane_change_data.append(relevant_data)

# Combine all data into a single DataFrame and save to a CSV file
if all_lane_change_data:
    combined_df = pd.concat(all_lane_change_data, ignore_index=True)
    combined_df.to_csv(output_csv_path, index=False)
    print(f"Saved combined lane change data to {output_csv_path}")
else:
    print("No lane changes were detected in the datasets.")


In [None]:
## plotting

import pandas as pd
import matplotlib.pyplot as plt
import os

# Path to the combined lane change data
input_csv_path = "C:\\Users\\Pedram\\Desktop\\PEDRAM - DO NOT TOUCH\\PT_IDM\\lateral_align\\combined_lane_change_data.csv"
plot_save_dir = "C:\\Users\\Pedram\\Desktop\\PEDRAM - DO NOT TOUCH\\PT_IDM\\Plots\\"
os.makedirs(plot_save_dir, exist_ok=True)  # Ensure save directory exists

# Load the data
lane_change_data = pd.read_csv(input_csv_path)

# One figure per (dataset, run-index, vehicle ID): lateral position over time
for dataset in lane_change_data['dataset'].unique():
    dataset_data = lane_change_data[lane_change_data['dataset'] == dataset]

    for run_index in dataset_data['run-index'].unique():
        run_data = dataset_data[dataset_data['run-index'] == run_index]

        for vehicle_id in run_data['ID'].unique():
            vehicle_data = run_data[run_data['ID'] == vehicle_id]

            # A vehicle with several lane changes contributes one window per
            # LC_ID, and those windows can overlap. Collapse repeated time
            # stamps and order by time so the line does not double back.
            track = vehicle_data.drop_duplicates(subset='time').sort_values('time')

            plt.figure(figsize=(8, 6))
            plt.plot(track['time'], track['yloc-kf'], marker='o', linestyle='-', label='Y Location')
            # The x-axis is the absolute `time` column (not `time-relative`),
            # so the label says so and no marker is drawn at x=0.
            plt.xlabel('Time (s)')
            plt.ylabel('Y Location')
            plt.title(f'Dataset: {dataset} | Run Index: {run_index} | ID: {vehicle_id}')
            plt.legend()
            plt.grid()

            # Save before showing: the figure may be empty once it has been shown
            plot_filename = f"{dataset}_run{run_index}_ID{vehicle_id}.png"
            plt.savefig(os.path.join(plot_save_dir, plot_filename))
            plt.show()
            plt.close()

print(f"Plots saved to {plot_save_dir}")

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
from scipy.signal import savgol_filter
from scipy.optimize import curve_fit
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

# Output locations for the fitted-curve figures and the per-lane-change summary table
plot_save_dir = "C:\\Users\\Pedram\\Desktop\\PEDRAM - DO NOT TOUCH\\PT_IDM\\Fitted_Curves\\"
results_csv_path = "C:\\Users\\Pedram\\Desktop\\PEDRAM - DO NOT TOUCH\\PT_IDM\\Fitted_Curves\\Fitted_Models_Results.csv"
os.makedirs(plot_save_dir, exist_ok=True)  # Create the output folder if it is missing

# Input: the combined lane-change windows, read into memory once
input_csv_path = "C:\\Users\\Pedram\\Desktop\\PEDRAM - DO NOT TOUCH\\PT_IDM\\lateral_align\\combined_lane_change_data.csv"
lane_change_data = pd.read_csv(input_csv_path)

# Define polynomial fitting functions
def cubic_fit(x, a, b, c, d):
    """Evaluate the cubic polynomial ``a*x**3 + b*x**2 + c*x + d``.

    ``x`` may be a scalar or an array (the arithmetic is applied elementwise);
    ``a`` .. ``d`` are the coefficients from the highest power down to the
    constant term.
    """
    cubic_term = a * x**3
    quadratic_term = b * x**2
    linear_term = c * x
    return cubic_term + quadratic_term + linear_term + d

def quintic_fit(x, a, b, c, d, e, f):
    """Evaluate the quintic polynomial ``a*x**5 + b*x**4 + c*x**3 + d*x**2 + e*x + f``.

    ``x`` may be a scalar or an array (the arithmetic is applied elementwise);
    ``a`` .. ``f`` are the coefficients from the highest power down to the
    constant term.
    """
    fifth_term = a * x**5
    fourth_term = b * x**4
    third_term = c * x**3
    second_term = d * x**2
    first_term = e * x
    return fifth_term + fourth_term + third_term + second_term + first_term + f

# Tunable analysis constants
SAVGOL_WINDOW = 7                   # Savitzky-Golay window length (samples)
SAVGOL_POLYORDER = 2                # Savitzky-Golay polynomial order
VELOCITY_THRESHOLD_FRACTION = 0.25  # Fraction of peak |lateral velocity| that marks "moving"
MIN_LC_DURATION = 1                 # Shortest accepted manoeuvre, in `time-relative` units
MAX_LC_DURATION = 11                # Longest accepted manoeuvre, in `time-relative` units
MIN_FIT_POINTS = 6                  # The quintic has six coefficients, so it needs >= 6 samples

# One dict per successfully fitted lane change; turned into a DataFrame at the end
results = []

# Iterate through each unique LC_ID
for lc_id in lane_change_data['LC_ID'].unique():
    lc_data = lane_change_data[lane_change_data['LC_ID'] == lc_id]

    # Extract metadata (constant within one LC_ID, so the first row is enough)
    dataset = lc_data['dataset'].iloc[0]
    run_index = lc_data['run-index'].iloc[0]
    veh_id = lc_data['ID'].iloc[0]

    # Extract time, lateral position, and speed
    time_relative = lc_data['time-relative'].values
    yloc = lc_data['yloc-kf'].values
    speed = lc_data['speed-kf'].values  # Extract speed values

    # The smoothing filter cannot run on fewer samples than its window length
    if len(time_relative) < SAVGOL_WINDOW:
        print(f"Skipping LC_ID={lc_id}: only {len(time_relative)} samples, "
              f"at least {SAVGOL_WINDOW} are needed for smoothing.")
        continue

    # Lateral velocity and acceleration by numerical differentiation of yloc
    velocity = np.gradient(yloc, time_relative)
    acceleration = np.gradient(velocity, time_relative)

    # Smooth velocity and acceleration
    velocity_smooth = savgol_filter(velocity, SAVGOL_WINDOW, SAVGOL_POLYORDER)
    acceleration_smooth = savgol_filter(acceleration, SAVGOL_WINDOW, SAVGOL_POLYORDER)

    # The manoeuvre spans from the first to the last sample whose smoothed
    # |velocity| exceeds a fraction of the peak value.
    vel_threshold = VELOCITY_THRESHOLD_FRACTION * np.max(np.abs(velocity_smooth))
    moving_idx = np.where(np.abs(velocity_smooth) > vel_threshold)[0]

    # No sample exceeds the threshold when the velocity is identically zero or
    # contains NaN (e.g. repeated time stamps); nothing can be fitted then.
    if moving_idx.size == 0:
        print(f"Skipping LC_ID={lc_id}: no lateral motion detected in the velocity profile.")
        continue

    start_idx = moving_idx[0]
    end_idx = moving_idx[-1]

    # Extract lane change duration data
    lane_change_time = time_relative[start_idx:end_idx+1]

    lane_change_duration = time_relative[end_idx] - time_relative[start_idx]

    # Skip manoeuvres that are shorter than the minimum accepted duration
    if lane_change_duration < MIN_LC_DURATION:
        print(f"Skipping LC_ID={lc_id} due to insufficient duration: {lane_change_duration:.2f} seconds.")
        continue

    # Skip manoeuvres that are longer than the maximum accepted duration
    if lane_change_duration > MAX_LC_DURATION:
        print(f"Skipping LC_ID={lc_id} due to excessive duration: {lane_change_duration:.2f} seconds.")
        continue

    # A least-squares fit needs at least as many samples as coefficients
    if lane_change_time.size < MIN_FIT_POINTS:
        print(f"Skipping LC_ID={lc_id}: only {lane_change_time.size} samples in the manoeuvre, "
              f"at least {MIN_FIT_POINTS} are needed for the quintic fit.")
        continue

    lane_change_yloc = yloc[start_idx:end_idx+1]

    # Fit cubic and quintic curves
    popt_cubic, _ = curve_fit(cubic_fit, lane_change_time, lane_change_yloc)
    popt_quintic, _ = curve_fit(quintic_fit, lane_change_time, lane_change_yloc)

    # Generate predictions for error calculation
    y_cubic_pred = cubic_fit(lane_change_time, *popt_cubic)
    y_quintic_pred = quintic_fit(lane_change_time, *popt_quintic)

    # Compute error metrics
    rmse_cubic = np.sqrt(mean_squared_error(lane_change_yloc, y_cubic_pred))
    mae_cubic = mean_absolute_error(lane_change_yloc, y_cubic_pred)
    r2_cubic = r2_score(lane_change_yloc, y_cubic_pred)

    rmse_quintic = np.sqrt(mean_squared_error(lane_change_yloc, y_quintic_pred))
    mae_quintic = mean_absolute_error(lane_change_yloc, y_quintic_pred)
    r2_quintic = r2_score(lane_change_yloc, y_quintic_pred)

    # Extract speed at the start and end of the lane change
    speed_start = speed[start_idx]
    speed_end = speed[end_idx]

    # Speed at the sample whose time-relative is closest to zero, i.e. at the
    # row where the lane index changed
    closest_to_zero_idx = np.argmin(np.abs(time_relative))
    speed_at_lane_change = speed[closest_to_zero_idx]

    # Save results including speed at the start, end, and at time-relative = 0
    results.append({
        "dataset": dataset,
        "run-index": run_index,
        "ID": veh_id,
        "LC_ID": lc_id,
        "start_time": time_relative[start_idx],
        "end_time": time_relative[end_idx],
        "duration": lane_change_duration,
        "cubic_a": popt_cubic[0],
        "cubic_b": popt_cubic[1],
        "cubic_c": popt_cubic[2],
        "cubic_d": popt_cubic[3],
        "cubic_rmse": rmse_cubic,
        "cubic_mae": mae_cubic,
        "cubic_r2": r2_cubic,
        "quintic_a": popt_quintic[0],
        "quintic_b": popt_quintic[1],
        "quintic_c": popt_quintic[2],
        "quintic_d": popt_quintic[3],
        "quintic_e": popt_quintic[4],
        "quintic_f": popt_quintic[5],
        "quintic_rmse": rmse_quintic,
        "quintic_mae": mae_quintic,
        "quintic_r2": r2_quintic,
        "speed_start": speed_start,
        "speed_end": speed_end,
        "speed_at_lane_change": speed_at_lane_change
    })

    # Create a 3x1 subplot: position with fits, velocity, acceleration
    fig, axes = plt.subplots(3, 1, figsize=(8, 12))

    # Dense time grid over the manoeuvre so the fitted curves plot smoothly
    time_smooth = np.linspace(lane_change_time.min(), lane_change_time.max(), 100)
    yloc_cubic_fit = cubic_fit(time_smooth, *popt_cubic)
    yloc_quintic_fit = quintic_fit(time_smooth, *popt_quintic)

    # Plot lateral position with fits
    axes[0].scatter(time_relative, yloc, label='Original Data', color='gray', alpha=0.6)
    axes[0].plot(time_smooth, yloc_cubic_fit, label=f'Cubic Fit (R²={r2_cubic:.2f})', linestyle='--')
    axes[0].plot(time_smooth, yloc_quintic_fit, label=f'Quintic Fit (R²={r2_quintic:.2f})', linestyle='-')
    axes[0].axvline(time_relative[start_idx], color='r', linestyle='--', label='Start of Lane Change')
    axes[0].axvline(time_relative[end_idx], color='g', linestyle='--', label='End of Lane Change')
    axes[0].set_ylabel('Lateral Position')
    axes[0].legend()
    axes[0].grid()

    # Plot velocity
    axes[1].plot(time_relative, velocity, label='Velocity (Raw)', linestyle='--', alpha=0.6)
    axes[1].plot(time_relative, velocity_smooth, label='Velocity (Smoothed)', linestyle='-')
    axes[1].axvline(time_relative[start_idx], color='r', linestyle='--')
    axes[1].axvline(time_relative[end_idx], color='g', linestyle='--')
    axes[1].set_ylabel('Lateral Velocity')
    axes[1].legend()
    axes[1].grid()

    # Plot acceleration
    axes[2].plot(time_relative, acceleration, label='Acceleration (Raw)', linestyle='--', alpha=0.6)
    axes[2].plot(time_relative, acceleration_smooth, label='Acceleration (Smoothed)', linestyle='-')
    axes[2].axvline(time_relative[start_idx], color='r', linestyle='--')
    axes[2].axvline(time_relative[end_idx], color='g', linestyle='--')
    axes[2].set_xlabel('Time Relative to Lane Change (s)')
    axes[2].set_ylabel('Lateral Acceleration')
    axes[2].legend()
    axes[2].grid()

    # Construct filename
    filename = f"LC_{dataset}_Run{run_index}_ID{veh_id}_LC{lc_id}.png"
    filepath = os.path.join(plot_save_dir, filename)

    # Save first, then show, then release the figure so memory does not grow
    # with the number of lane changes
    plt.tight_layout()
    plt.savefig(filepath)
    plt.show()
    plt.close(fig)

    print(f"Plot saved as: {filename}")

# Save results to CSV
results_df = pd.DataFrame(results)
results_df.to_csv(results_csv_path, index=False)

print(f"Results saved to {results_csv_path}")


In [None]:
### Keep only lane changes whose quintic fit has R² >= 0.85
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.optimize import curve_fit

# Read the per-lane-change fit summary from disk
results_csv_path = "C:\\Users\\Pedram\\Desktop\\PEDRAM - DO NOT TOUCH\\PT_IDM\\Fitted_Curves\\Fitted_Models_Results.csv"
results_df = pd.read_csv(results_csv_path)

# Retain only the rows whose quintic fit reaches an R² of at least 0.85
filtered_results_df = results_df.loc[results_df["quintic_r2"].ge(0.85)]

In [None]:
### duration fit

# Extract relevant columns
# NOTE(review): the x variable is read from "speed_start", while the variable
# name, the printed model and the axis label all say "speed at lane change"
# (the results table also has a "speed_at_lane_change" column). Confirm which
# column is intended.
durations = filtered_results_df["duration"].to_numpy(dtype=float)
speeds_at_lane_change = filtered_results_df["speed_start"].to_numpy(dtype=float)

# curve_fit rejects NaN/inf inputs, so keep only pairs where both values are finite
finite_pairs = np.isfinite(durations) & np.isfinite(speeds_at_lane_change)
durations = durations[finite_pairs]
speeds_at_lane_change = speeds_at_lane_change[finite_pairs]

# Number of observations after filtering
num_observations = len(durations)
print(f"Number of observations after filtering: {num_observations}")

# A straight line has two parameters, so at least two observations are needed
if num_observations > 1:
    # Define a linear function for fitting
    def linear_fit(x, m, b):
        """Straight line ``m * x + b`` (slope ``m``, intercept ``b``)."""
        return m * x + b

    # Fit the linear model
    popt, _ = curve_fit(linear_fit, speeds_at_lane_change, durations)
    slope, intercept = popt

    print(f"Fitted Linear Model: duration = {slope:.4f} * speed_at_lane_change + {intercept:.4f}")

    # Evaluate the fitted line across the observed speed range for plotting
    speed_range = np.linspace(min(speeds_at_lane_change), max(speeds_at_lane_change), 100)
    duration_fit = linear_fit(speed_range, *popt)

    # Plot the data and the fitted line
    plt.figure(figsize=(8, 6))
    plt.scatter(speeds_at_lane_change, durations, label="Data Points", alpha=0.6)
    plt.plot(speed_range, duration_fit, color="red", label="Fitted Line")
    plt.xlabel("Speed at Lane Change")
    plt.ylabel("Duration of Lane Change")
    plt.title("Linear Fit: Duration vs. Speed at Lane Change")
    plt.legend()
    plt.grid()

    # `os` and `plot_save_dir` are taken from the earlier cells of this notebook
    filename = "Duration_R2Plot.png"
    filepath = os.path.join(plot_save_dir, filename)
    plt.savefig(filepath)

    plt.show()
else:
    print("Not enough data points to fit a model.")

In [None]:

def _plot_percentile_hist(ax, data, title, xlabel, color, n_edges=12):
    """Draw a histogram of ``data`` on ``ax`` with bins spanning its 5th-95th percentiles.

    The bin edges are ``n_edges`` evenly spaced values between the 5th and the
    95th percentile of ``data`` (``n_edges - 1`` bins). Values outside that
    range fall outside every bin and are therefore not drawn.

    Parameters:
        ax: Matplotlib axes to draw on.
        data: One-dimensional numeric values (e.g. a DataFrame column).
        title: Axes title.
        xlabel: Label of the x-axis.
        color: Fill colour of the bars.
        n_edges: Number of bin edges.
    """
    ax.set_title(title)
    ax.set_xlabel(xlabel)
    ax.set_ylabel('Frequency')

    # Percentiles are undefined for an empty sample; leave the axes empty
    if len(data) == 0:
        return

    lower, upper = np.percentile(data, [5, 95])
    bin_edges = np.linspace(lower, upper, n_edges)
    ax.hist(data, bins=bin_edges, color=color, edgecolor='black', alpha=0.7)


# Reload the fit summary and keep only fits with quintic R² of at least 0.85
results_csv_path = "C:\\Users\\Pedram\\Desktop\\PEDRAM - DO NOT TOUCH\\PT_IDM\\Fitted_Curves\\Fitted_Models_Results.csv"
results_df = pd.read_csv(results_csv_path)
results_df = results_df[results_df["quintic_r2"] >= 0.85]

# Compute the mean of the quintic parameters
quintic_params = ['quintic_a', 'quintic_b', 'quintic_c', 'quintic_d', 'quintic_e', 'quintic_f']
mean_quintic_params = results_df[quintic_params].mean()

# Report the mean values of the quintic polynomial parameters
print("Mean Quintic Polynomial Parameters:")
print(mean_quintic_params)

# Distribution of each quintic coefficient, one panel per coefficient (2 x 3 grid)
fig, axes = plt.subplots(2, 3, figsize=(12, 8))
axes = axes.flatten()

for i, param in enumerate(quintic_params):
    _plot_percentile_hist(axes[i], results_df[param], f"Distribution of {param}", param, 'skyblue')

plt.tight_layout()
# `os` and `plot_save_dir` are taken from the earlier cells of this notebook
filename = "Parameter_R2Plot.png"
filepath = os.path.join(plot_save_dir, filename)
plt.savefig(filepath)

plt.show()

# Distribution of the quintic fit quality metrics (RMSE, MAE, R²), one panel each
fig, axes = plt.subplots(1, 3, figsize=(18, 6))

quality_metrics = [
    ('quintic_rmse', 'RMSE', 'lightgreen'),
    ('quintic_mae', 'MAE', 'lightcoral'),
    ('quintic_r2', 'R²', 'lightblue'),
]
for ax, (column, label, color) in zip(axes, quality_metrics):
    _plot_percentile_hist(ax, results_df[column], f"Distribution of {label}", label, color)

plt.tight_layout()

filename = "Manouver_R2Plot.png"
filepath = os.path.join(plot_save_dir, filename)
plt.savefig(filepath)

plt.show()
