In [None]:
import os
import sys
import matplotlib.pyplot as plt
# Make the project root importable so that the `src.*` imports used by the
# following cells resolve. The root is taken to be two directory levels above
# the current working directory; adjust the '../..' hop if the notebook is
# started from a different location.
current_dir = os.getcwd()
parent_parent_dir = os.path.abspath(os.path.join(current_dir, '../..'))

# Guarded append: re-running this cell must not keep adding the same entry
# to sys.path over and over.
if parent_parent_dir not in sys.path:
    sys.path.append(parent_parent_dir)

In [None]:
import json
import numpy as np
import pandas as pd
import ast
from sklearn.cluster import OPTICS
from src.features.get_first_and_last_x_y_coordinates import get_first_and_last_x_y_coordinates
from src.features.get_x_y_tuple_list import get_x_y_tuple_list
from src.models.DISTANCE_METRICS_WITH_ADDITIONAL_ARGS import DISTANCE_METRICS_WITH_ADDITIONAL_ARGS
from scipy.spatial.distance import cdist
from scipy.optimize import linprog
from scipy.spatial import ConvexHull
from sklearn.metrics import silhouette_score
import re
%matplotlib qt5

In [None]:
def load_dtw_matrices_from_json(file_path):
    """
    Read DTW distance matrices that were stored as nested lists in a JSON file.

    The JSON document is expected to be an object; every value is passed
    through ``np.array`` so callers get NumPy arrays instead of nested lists.

    :param file_path: Path of the JSON file to read.
    :return: Dictionary with the same keys (and key order) as the JSON object,
             each mapped to the corresponding value converted to a NumPy array.
    """
    with open(file_path, 'r') as handle:
        raw_matrices = json.load(handle)

    # Rebuild the mapping entry by entry, turning each nested list into an array.
    matrices_by_key = {}
    for param_name, nested_rows in raw_matrices.items():
        matrices_by_key[param_name] = np.array(nested_rows)

    return matrices_by_key

In [None]:
# Search grid for the OPTICS optimisation runs.
#
# Earlier, coarser grid (kept for reference):
#   max_eps_range     = np.arange(20, 500, 30)      # 16 values
#   min_samples_range = np.arange(3, 10, 1)         # 7 values
#   xi_range          = np.arange(0.01, 0.5, 0.05)  # 10 values
#   cluster_methods   = ['dbscan', 'xi']            # 2 values

cluster_methods = ['dbscan', 'xi']  # 2 values
xi_range = np.arange(start=0.01, stop=0.2, step=0.02)  # 0.01, 0.03, ..., 0.19 -> 10 values
min_samples_range = np.arange(start=3, stop=10, step=1)  # 3, 4, ..., 9 -> 7 values
max_eps_range = np.arange(start=20, stop=510, step=20)  # 20, 40, ..., 500 -> 25 values

In [None]:
# Names of the intersections to process. Each name is interpolated into the
# input and output file names under data/processed/ by the optimisation cells.
# Alternative selection kept for reference:
# intersection_names = ['k733_2020', 'k733_2018']
intersection_names = ['k729_2022']
# Optional extra entry when the alternative selection above is active:
# intersection_names.append('k729_2022')
print(intersection_names)

In [None]:
def create_three_letter_uid(max_eps_range, min_samples_range, xi_range):
    """
    Create a 3-letter ID derived from the optimization parameter ranges.

    Each letter is ``chr(65 + value % 26)`` (an upper-case letter 'A'..'Z'),
    where ``value`` is, in order:

    1. the largest value of ``max_eps_range``, truncated to an integer,
    2. the smallest value of ``min_samples_range``, truncated to an integer,
    3. the smallest value of ``xi_range`` expressed as a whole-number percentage.

    Only these three summary values are encoded and each one is reduced
    modulo 26, so different parameter grids can map to the same ID.

    :param max_eps_range: Non-empty iterable of numeric max_eps values.
    :param min_samples_range: Non-empty iterable of numeric min_samples values.
    :param xi_range: Non-empty iterable of numeric xi values (fractions, e.g. 0.05).
    :return: A string of three upper-case ASCII letters.
    :raises ValueError: If any of the ranges is empty (raised by ``min``/``max``).
    """
    max_eps_id = chr(65 + int(max(max_eps_range)) % 26)  # Largest max_eps -> letter
    min_samples_id = chr(65 + int(min(min_samples_range)) % 26)  # Smallest min_samples -> letter

    # Round before converting: binary floating point makes e.g. 0.29 * 100
    # evaluate to 28.999999999999996, which a bare int() would truncate to 28.
    xi_percent = int(round(min(xi_range) * 100))
    xi_id = chr(65 + xi_percent % 26)  # Smallest xi (in percent) -> letter

    # Combine them to form a 3-letter ID
    uid = f"{max_eps_id}{min_samples_id}{xi_id}"
    return uid

In [None]:
# optimize for each score individually
#
# For every intersection: load the processed trajectory tables and the
# precomputed DTW distance matrices, run the OPTICS parameter search over the
# configured grid, and save the result together with the searched grid as a
# JSON file under data/processed/.

from src.models.optics.optimize_optics_for_precomputed_dtw import optimize_optics_for_precomputed_dtw
from src.data.save_optimization_parameters_in_json_file import save_optimization_parameters_in_json_file

# Loop-invariant: all inputs and outputs of this cell live in this directory.
processed_dir = f'{parent_parent_dir}/data/processed'

for intersection_name in intersection_names:
    data_path = f'{processed_dir}/{intersection_name}_cuid.csv'
    # Built directly instead of via data_path.replace('.csv', ...), which would
    # also rewrite a '.csv' occurring in a directory name of the path.
    df_cuid_grouped_path = f'{processed_dir}/{intersection_name}_cuid_grouped.csv'

    df_cuid = pd.read_csv(data_path)
    df_cuid_grouped = pd.read_csv(df_cuid_grouped_path)
    # The 'x' and 'y' columns are stored as Python-literal strings in the CSV;
    # parse them back into Python objects.
    df_cuid_grouped['x'] = df_cuid_grouped['x'].apply(ast.literal_eval)
    df_cuid_grouped['y'] = df_cuid_grouped['y'].apply(ast.literal_eval)
    # NOTE(review): df_cuid and list_x_y_tuples are not used further in this
    # cell; kept because later cells may rely on them.
    list_x_y_tuples = get_x_y_tuple_list(df_cuid_grouped, ['x', 'y'])

    dtw_matrices_dict = load_dtw_matrices_from_json(
        f'{processed_dir}/{intersection_name}_diff_itakura_slope_dtw_matrices.json'
    )

    optimization_results_optics = optimize_optics_for_precomputed_dtw(
        dtw_matrices_dict,
        max_eps_range=max_eps_range,
        min_samples_range=min_samples_range,
        cluster_methods=cluster_methods,
        xis=xi_range,
        n_jobs=-1,
    )
    print(optimization_results_optics)

    # Record the searched grid next to the results. .tolist() yields built-in
    # int/float values; list(ndarray) would keep NumPy scalars such as np.int64,
    # which the standard json encoder cannot serialize.
    optimization_results_optics['used_parameters'] = {
        "max_eps_range": np.asarray(max_eps_range).tolist(),
        "min_samples_range": np.asarray(min_samples_range).tolist(),
        "cluster_methods": cluster_methods,
        "xi_range": np.asarray(xi_range).tolist()
    }

    # The short ID in the file name is derived from the grid so that runs with
    # different grids normally end up in different files.
    unique_id = create_three_letter_uid(max_eps_range, min_samples_range, xi_range)
    unique_filename = f'{intersection_name}_optics_vehicle_paths_optimized_no_outliers_params_{unique_id}.json'

    save_optimization_parameters_in_json_file(f'{processed_dir}/{unique_filename}', **optimization_results_optics)

In [None]:
# optimize with accumulating scores
#
# For every intersection: load the processed trajectory tables and the
# precomputed DTW distance matrices, run the accumulated-score OPTICS
# parameter search over the configured grid, and save the result together
# with the searched grid as a JSON file under data/processed/.

from src.models.optics.optimize_optics_accum_scores_precomputed_dtw import optimize_optics_accum_scores_precomputed_dtw
from src.data.save_optimization_parameters_in_json_file import save_optimization_parameters_in_json_file

# Loop-invariant: all inputs and outputs of this cell live in this directory.
processed_dir = f'{parent_parent_dir}/data/processed'

for intersection_name in intersection_names:
    data_path = f'{processed_dir}/{intersection_name}_cuid.csv'
    # Built directly instead of via data_path.replace('.csv', ...), which would
    # also rewrite a '.csv' occurring in a directory name of the path.
    df_cuid_grouped_path = f'{processed_dir}/{intersection_name}_cuid_grouped.csv'

    # NOTE(review): df_cuid and df_cuid_grouped are not used further in this
    # cell; kept because later cells may rely on them.
    df_cuid = pd.read_csv(data_path)
    df_cuid_grouped = pd.read_csv(df_cuid_grouped_path)
    # The 'x' and 'y' columns are stored as Python-literal strings in the CSV;
    # parse them back into Python objects.
    df_cuid_grouped['x'] = df_cuid_grouped['x'].apply(ast.literal_eval)
    df_cuid_grouped['y'] = df_cuid_grouped['y'].apply(ast.literal_eval)

    dtw_matrices_dict = load_dtw_matrices_from_json(
        f'{processed_dir}/{intersection_name}_diff_itakura_slope_dtw_matrices.json'
    )

    optimization_results_optics = optimize_optics_accum_scores_precomputed_dtw(
        dtw_matrices_dict,
        max_eps_range=max_eps_range,
        min_samples_range=min_samples_range,
        cluster_methods=cluster_methods,
        xis=xi_range,
        n_jobs=-1,
    )
    print(optimization_results_optics)

    # Record the searched grid next to the results. .tolist() yields built-in
    # int/float values; list(ndarray) would keep NumPy scalars such as np.int64,
    # which the standard json encoder cannot serialize.
    optimization_results_optics['used_parameters'] = {
        "max_eps_range": np.asarray(max_eps_range).tolist(),
        "min_samples_range": np.asarray(min_samples_range).tolist(),
        "cluster_methods": cluster_methods,
        "xi_range": np.asarray(xi_range).tolist()
    }

    # The short ID in the file name is derived from the grid so that runs with
    # different grids normally end up in different files.
    unique_id = create_three_letter_uid(max_eps_range, min_samples_range, xi_range)
    unique_filename = f'{intersection_name}_optics_vehicle_paths_optimized_no_outliers_params_acc_score_{unique_id}.json'

    save_optimization_parameters_in_json_file(f'{processed_dir}/{unique_filename}', **optimization_results_optics)