In [None]:
import os

# Switch the working directory to the "utils" folder that sits next to the
# directory the kernel was started in. The relative '../...' paths used below
# are resolved against this location; presumably this is also what makes the
# `new_grid` module importable -- confirm.
current_dir = os.getcwd()
parent_dir = os.path.split(current_dir)[0]
analyze_path = os.path.join(parent_dir, "utils")
os.chdir(analyze_path)

from new_grid import TaiwanBaseGridGenerator, AccidentHotspotAnalyzer

# --- Run configuration --------------------------------------------------------
# Relative paths are resolved against the working directory set by os.chdir above.

# Passed to generate_grid(radius_meters=m) and embedded in the output file names.
m = 1000

# Inputs. OSM_DIR is written project-relative like the other inputs instead of
# pointing into one user's home directory.
# NOTE(review): assumes the project root is the "ST-RTA-GIS" folder that the
# previous absolute path pointed into -- confirm.
OSM_DIR = '../Data/road_new.shp/'
SHP_PATH = '../Data/OFiles_9e222fea-bafb-4436-9b17-10921abc6ef2/TOWN_MOI_1140318.shp'
ACCIDENT_PATH = '../ComputedDataV2/Accident/combined_data_in_taiwan.csv'

# Not referenced by the cells visible in this notebook; kept for compatibility.
OUTPUT_PATH = f'../ComputedDataV5/ForModel/full_hex_grid_{m}.csv'

# Handed to generator.add_local_features(); keys look like the output column
# names and values are the CSV files they are derived from -- TODO confirm
# against new_grid.
local_tasks = {
    'count_mrt': '../ComputedData/MRT/full_mrt.csv',
    'count_youbike': '../ComputedData/YouBike/full_youbike.csv',
    'count_parking_official': '../ComputedData/Parkinglot/full_parkinglot.csv'
}

# Outputs of the grid-generation and hotspot-analysis cells.
BASE_GRID_PATH = f'../ComputedDataV5/ForModel/base_hex_grid_{m}.csv'
ANALYSIS_OUTPUT_PATH = f'../ComputedDataV5/ForModel/full_hotspots_{m}.csv'

In [None]:
# Build the base grid and write it to BASE_GRID_PATH.
# TaiwanBaseGridGenerator comes from the project's `new_grid` module; its
# internals are not shown here, so the steps are kept in the order written.
generator = TaiwanBaseGridGenerator(osm_dir=OSM_DIR, boundary_shp_path=SHP_PATH)
# `m` is supplied as the grid radius in metres.
generator.generate_grid(radius_meters=m)
# Presumably derives per-cell features from the data under OSM_DIR -- TODO confirm.
generator.calculate_osm_features()
# Adds the features described by `local_tasks` (MRT, YouBike, parking CSVs).
generator.add_local_features(local_tasks)
generator.save_base_grid(BASE_GRID_PATH)

In [None]:
# Attach accident data to the saved base grid, compute hotspots, and write the
# result to ANALYSIS_OUTPUT_PATH. AccidentHotspotAnalyzer comes from the
# project's `new_grid` module; its internals are not shown here.
analyzer = AccidentHotspotAnalyzer(base_grid_path=BASE_GRID_PATH)
# filter_query=None: no filter expression is passed
# (presumably all accident rows are used -- TODO confirm).
analyzer.integrate_accident_data(ACCIDENT_PATH, filter_query=None)
analyzer.calculate_hotspots()
analyzer.save_result(ANALYSIS_OUTPUT_PATH)
analyzer.plot_hotspots()

In [None]:
import geopandas as gpd
import pandas as pd
from shapely.geometry import Point
from shapely import wkt

# Count road junctions per grid cell and store them as `count_intersection`.
#
# A junction is approximated as a coordinate that occurs at least twice among
# the vertices of the selected road geometries. Caveat: a vertex where one way
# simply continues into the next (no branching) is counted as well.
# (An earlier, unfiltered variant of this computation that was kept here as
# commented-out code has been removed; the version below supersedes it.)

# Reuse the configured OSM directory instead of repeating the path.
roads_path = os.path.join(OSM_DIR, 'gis_osm_roads_free_1.shp')
gdf_roads = gpd.read_file(roads_path)

# Restrict to the road classes listed in `car_roads`.
car_roads = ['motorway', 'motorway_link', 'trunk', 'trunk_link', 'primary', 'primary_link',
             'secondary', 'secondary_link', 'tertiary', 'tertiary_link', 'unclassified',
             'residential', 'living_street', 'service']
gdf_roads = gdf_roads[gdf_roads['fclass'].isin(car_roads)]


def _road_vertices(geom):
    """Yield the (x, y) vertices of one road geometry.

    Missing or empty geometries yield nothing, multi-part geometries are
    walked part by part, and any Z value is dropped. For a closed part
    (first vertex == last vertex) the repeated closing vertex is skipped so
    that a ring does not register as a junction with itself.
    """
    if geom is None or geom.is_empty:
        return
    # Multi-part geometries expose `.geoms`; single lines are treated as one part.
    for part in getattr(geom, 'geoms', [geom]):
        coords = [(c[0], c[1]) for c in part.coords]
        if len(coords) > 1 and coords[0] == coords[-1]:
            coords = coords[:-1]
        yield from coords


all_coords = [pt for geom in gdf_roads.geometry for pt in _road_vertices(geom)]
df_points = pd.DataFrame(all_coords, columns=['x', 'y'])
point_counts = df_points.value_counts()

# Coordinates seen two or more times become junction points.
way = point_counts[point_counts >= 2].index.tolist()
gdf_way = gpd.GeoDataFrame(geometry=[Point(x, y) for x, y in way], crs=gdf_roads.crs)

# The grid CSV stores geometry as WKT and carries no CRS of its own;
# EPSG:3826 is assigned here -- confirm it matches how the grid was generated.
df_grid = pd.read_csv(ANALYSIS_OUTPUT_PATH)
df_grid['geometry'] = df_grid['geometry'].apply(wkt.loads)
gdf_grid = gpd.GeoDataFrame(df_grid, geometry='geometry', crs="EPSG:3826")

if gdf_way.crs != gdf_grid.crs:
    gdf_way = gdf_way.to_crs(gdf_grid.crs)

# Points lying exactly on a cell boundary do not satisfy 'within' and are not counted.
joined = gpd.sjoin(gdf_way, gdf_grid[['grid_id', 'geometry']], predicate='within')
count = joined.groupby('grid_id').size().rename('count_intersection')

# Left merge keeps every grid cell; cells without junctions get 0.
gdf_grid = gdf_grid.merge(count, on='grid_id', how='left')
gdf_grid['count_intersection'] = gdf_grid['count_intersection'].fillna(0).astype(int)

OUTPUT_PATH2 = '../ComputedDataV5/ForModel/base_hex_grid_final.csv'
gdf_grid.to_csv(OUTPUT_PATH2, index=False)

In [None]:
# Count the features of the SPD_DLT_PATH shapefile that fall within each grid
# cell and store the result as `count_spd_points`.

# Written project-relative like the other data paths.
# NOTE(review): assumes the project root is the "ST-RTA-GIS" folder that the
# previous absolute path pointed into -- confirm.
SPD_DLT_PATH = '../CalculatedData/pairs_annot_all_cities.shp'
OUTPUT_PATH3 = '../ComputedDataV5/ForModel/base_hex_grid_spd_final.csv'

# Read the grid CSV with pandas so numeric columns keep their dtypes. Reading
# a CSV through gpd.read_file yields string columns, which is why grid_id
# previously had to be cast back to int before merging.
grid_table = pd.read_csv(OUTPUT_PATH2)
grid_table['geometry'] = grid_table['geometry'].apply(wkt.loads)
# The CSV carries no CRS; EPSG:3826 is assigned as in the cell that wrote it.
gdf_grid = gpd.GeoDataFrame(grid_table, geometry='geometry', crs="EPSG:3826")

gdf_spd = gpd.read_file(SPD_DLT_PATH)

if gdf_spd.crs != gdf_grid.crs:
    gdf_spd = gdf_spd.to_crs(gdf_grid.crs)

# Only features lying entirely within a cell are matched.
joined_spd = gpd.sjoin(gdf_spd, gdf_grid[['grid_id', 'geometry']], predicate='within')
count_spd = joined_spd.groupby('grid_id').size().reset_index(name='count_spd_points')

# Remove leftovers of a previous merge so the new column is not duplicated
# with _x/_y suffixes.
cols_to_drop = ['count_spd_points', 'count_spd_points_x', 'count_spd_points_y']
gdf_grid = gdf_grid.drop(columns=[c for c in cols_to_drop if c in gdf_grid.columns])

# Left merge keeps every grid cell; cells without matches get 0.
gdf_grid = gdf_grid.merge(count_spd, on='grid_id', how='left')
gdf_grid['count_spd_points'] = gdf_grid['count_spd_points'].fillna(0).astype(int)

gdf_grid.to_csv(OUTPUT_PATH3, index=False)

## Moran's I (global and local spatial autocorrelation)

In [None]:
import libpysal
from esda.moran import Moran, Moran_Local
import matplotlib.pyplot as plt

def calculate_morans(gdf, target_col='accident_count', alpha=0.05, seed=None):
    """Compute global Moran's I and local Moran (LISA) cluster labels.

    Parameters
    ----------
    gdf : GeoDataFrame
        Layer to analyse. It is copied; the caller's frame is not modified.
        Queen contiguity is built from its geometries.
    target_col : str, default 'accident_count'
        Column holding the values to test. Missing values are replaced by 0.
    alpha : float, default 0.05
        Pseudo p-value threshold; rows with ``lisa_p < alpha`` receive a
        cluster label, all others stay 'Not Significant'.
    seed : int or None, default None
        When given, seeds the permutation tests so that p-values and cluster
        labels are repeatable between runs. ``None`` keeps the unseeded
        behaviour.

    Returns
    -------
    (GeoDataFrame, Moran)
        The working copy (rows without neighbours removed) with the added
        columns ``lisa_q``, ``lisa_p`` and ``cluster_type``, and the global
        Moran result object.
    """
    # Local import: NumPy is not imported by the cells visible in this notebook.
    import numpy as np

    work_gdf = gdf.copy()
    work_gdf[target_col] = work_gdf[target_col].fillna(0)

    w = libpysal.weights.Queen.from_dataframe(work_gdf, use_index=True)

    # Rows without any neighbour are dropped and the weights are rebuilt on
    # the remaining rows.
    if w.islands:
        print(f'found {w.islands} islands, removing from analysis')
        work_gdf = work_gdf.drop(index=w.islands)
        w = libpysal.weights.Queen.from_dataframe(work_gdf, use_index=True)

    # Row-standardise the weights.
    w.transform = 'r'

    y = work_gdf[target_col].values

    # Global Moran's I. Its permutations are drawn from NumPy's global random
    # state, so that state is seeded when reproducibility is requested.
    if seed is not None:
        np.random.seed(seed)
    moran = Moran(y, w)

    print('global moran result')
    print(f"   Moran's I : {moran.I:.4f}")
    print(f"   P-value : {moran.p_sim:.4f}")

    print('local morans')
    # Moran_Local takes its own seed for the conditional permutations.
    lisa = Moran_Local(y, w, seed=seed)

    work_gdf['lisa_q'] = lisa.q
    work_gdf['lisa_p'] = lisa.p_sim

    # Quadrant code -> label; only rows significant at `alpha` are labelled.
    quadrant_labels = {
        1: 'High-High (Hotspot)',
        2: 'Low-High (Outlier)',
        3: 'Low-Low (Coldspot)',
        4: 'High-Low (Outlier)',
    }
    work_gdf['cluster_type'] = 'Not Significant'
    sig = work_gdf['lisa_p'] < alpha
    for quadrant, label in quadrant_labels.items():
        work_gdf.loc[sig & (work_gdf['lisa_q'] == quadrant), 'cluster_type'] = label

    return work_gdf, moran

# NOTE(review): `hex_grid_final` is not assigned in any cell visible in this
# notebook; on a fresh kernel this line raises NameError unless it is defined
# elsewhere. Presumably it is the grid GeoDataFrame carrying an
# 'accident_count' column -- confirm and load it explicitly before this call.
hex_lisa, global_moran = calculate_morans(hex_grid_final, 'accident_count')

In [None]:
def plot_lisa_map(gdf):
    """Draw the LISA cluster map for a frame that has a `cluster_type` column.

    'Not Significant' rows are drawn first as a pale background; each cluster
    type that is present is then drawn in its own colour and listed in the
    legend.

    Parameters
    ----------
    gdf : GeoDataFrame
        Frame with a `cluster_type` column, e.g. the first value returned by
        `calculate_morans`.

    Returns
    -------
    None
        The figure is shown with ``plt.show()``.
    """
    # Local import: polygon layers are drawn as collections that the legend
    # cannot build entries for from `label=`, so proxy patches are used instead.
    from matplotlib.patches import Patch

    color_map = {
        'High-High (Hotspot)': '#d7191c',
        'Low-Low (Coldspot)': '#2c7bb6',
        'Low-High (Outlier)': '#83b9e2',
        'High-Low (Outlier)': '#fdae61',
        'Not Significant': '#eeeeee'
    }

    fig, ax = plt.subplots(figsize=(12, 12))

    # Background layer; skipped when empty to avoid plotting an empty frame.
    not_significant = gdf[gdf['cluster_type'] == 'Not Significant']
    if len(not_significant) > 0:
        not_significant.plot(
            ax=ax, color=color_map['Not Significant'], edgecolor='none', alpha=0.5
        )

    legend_handles = []
    for ctype, color in color_map.items():
        if ctype == 'Not Significant':
            continue
        subset = gdf[gdf['cluster_type'] == ctype]
        if len(subset) > 0:
            subset.plot(
                ax=ax,
                color=color,
                edgecolor='black',
                linewidth=0.1
            )
            # One legend entry per cluster type that is actually drawn.
            legend_handles.append(
                Patch(facecolor=color, edgecolor='black', label=ctype)
            )

    ax.set_title('LISA Cluster Map of Accidents (Moran\'s I)', fontsize=15)
    if legend_handles:
        ax.legend(handles=legend_handles, loc='lower right')
    ax.set_axis_off()
    plt.show()

# Render the cluster map from the labelled frame returned by calculate_morans.
plot_lisa_map(hex_lisa)