In [150]:
#pandas version: 1.3.4
import pandas as pd
#geopandas version: 0.10.2
import geopandas as gpd
#folium version: 0.12.1
import folium
#shapely version: 1.8.0
from shapely import wkt
from shapely.geometry import LineString

## Create network geodataframe

In [91]:
def create_network_geodataframe(file_path,crs=2230):
    """
    Creates network geodataframe from csv with geometry in well-known text format.
    Required columns = ['A','B','FT','GEOMETRY']. Column names are upper-cased after
    reading, so the csv header is matched case-insensitively.

    Parameters
    -------------------
    file_path (String):
    Path to csv

    crs (Int or String):
    Coordinate reference system assigned to the parsed geometry, either as an EPSG
    code (eg 2230, the default) or as an authority string (eg "EPSG:4326")

    Returns
    -------------------
    Geodataframe object with columns ['A','B','FT','LINK_ID','geometry'], or None
    (after printing a message) when a required column is missing from the csv.
    """
    req_cols = ['A','B','FT','GEOMETRY']
    #Read the csv supplied by the caller
    nw_df = pd.read_csv(file_path)

    #Capitalize all column names
    nw_df.columns = [name.upper() for name in nw_df.columns]

    #Check if required columns exist in csv
    missing_cols = [col for col in req_cols if col not in nw_df.columns]
    if missing_cols:
        print(f"Required columns not present. \nRequired columns: {req_cols}")
        return None

    #Keep only required columns; copy so the column added below does not write into a view
    nw_df = nw_df[req_cols].copy()
    #Create LINK_ID column. Links are uniquely identified by A and B nodes.
    nw_df['LINK_ID'] = nw_df['A'].astype(str) + '-' + nw_df['B'].astype(str)
    #Create coordinates geoseries from wkt, using the crs requested by the caller
    coordinates = gpd.GeoSeries.from_wkt(nw_df['GEOMETRY'],crs=crs)
    #Create geodataframe using geoseries coordinates as geometry
    nw_gdf = gpd.GeoDataFrame(nw_df,geometry=coordinates)
    #Drop wkt representation of geometry
    return nw_gdf.drop(columns='GEOMETRY')

In [92]:
#Build the network geodataframe from simple_network.csv (path is relative to the working directory)
nw_gdf = create_network_geodataframe(file_path='simple_network.csv')

In [93]:
#nw_gdf.to_file('simple_network_revised.geojson',driver='GeoJSON')

## Find intersections using geopandas overlay

In [151]:
def find_freeway_network_interchanges(freeway_nw_gdf,ramp_nw_gdf):
    """
    Creates an interchange geodataframe from a freeway network geodataframe and a ramp
    network geodataframe.

    Freeway links are overlaid with ramp links and only point intersections are kept
    where a ramp endpoint coincides with a freeway node: the freeway link's A node
    equals the ramp's B node ("on" ramp) or the freeway link's B node equals the
    ramp's A node ("off" ramp). If both conditions hold the ramp is flagged "on".

    Parameters
    -------------------
    freeway_nw_gdf (Geodataframe):
    Geodataframe must only contain freeway links; freeway_nw_gdf['FT'] == 1

    ramp_nw_gdf (Geodataframe):
    Geodataframe must only contain ramp links; ramp_nw_gdf['FT'] == 3

    Returns
    -------------------
    Geodataframe object with one row per interchange point and an added 'RAMP_TYPE'
    column ("on" or "off"). Freeway attributes carry the suffix _1 and ramp
    attributes the suffix _2. The result is empty when no interchange is found.
    """
    #Intersect freeway links with ramp links. keep_geom_type=False is required:
    #otherwise only line geometries would be returned and the point intersections lost.
    intersection = gpd.overlay(freeway_nw_gdf,ramp_nw_gdf,
                               how='intersection',
                               keep_geom_type=False)
    #Filter for point geometries
    intersection_pts = intersection[intersection.geom_type == 'Point'].copy()

    #Filter dataset for ramp start and ramp end points only
    fwy_starts_at_ramp_end = intersection_pts['A_1'] == intersection_pts['B_2']
    fwy_ends_at_ramp_start = intersection_pts['B_1'] == intersection_pts['A_2']
    interchanges = intersection_pts[fwy_starts_at_ramp_end | fwy_ends_at_ramp_start].copy()

    #Flag on-ramp and off-ramp
    #Ramps are classified as on ramp if A_1 = B_2, otherwise (B_1 = A_2) as off ramp.
    #Vectorised so that an empty interchange set still gets a RAMP_TYPE column
    #(a row-wise apply on an empty frame returns a frame, which cannot be assigned).
    is_on_ramp = interchanges['A_1'] == interchanges['B_2']
    interchanges['RAMP_TYPE'] = is_on_ramp.map({True:'on',False:'off'})

    return interchanges

In [95]:
#Filter nw gdf to a gdf containing only freeways (FT == 1).
#.copy() gives an independent frame rather than a slice of nw_gdf.
fw_nw_gdf = nw_gdf[nw_gdf['FT'] == 1].copy()

In [96]:
#Filter nw gdf to a gdf containing only ramps (FT == 3).
#.copy() gives an independent frame rather than a slice of nw_gdf.
rmp_nw_gdf = nw_gdf[nw_gdf['FT'] == 3].copy()

In [97]:
#Overlay freeway links (first argument) with ramp links (second argument) to get interchange points
interchanges = find_freeway_network_interchanges(fw_nw_gdf,rmp_nw_gdf)

## Determine if node is a freeway to freeway node

In [98]:
def is_node_connected_to_freeway(row,nw_gdf):
    """
    This function uses recursion to traverse the ramp network, starting from the ramp link of
    an interchange row. The function accepts a row from the interchange dataset as a parameter,
    using the 'RAMP_TYPE' attribute to determine the direction of the ramp.

    The network is traversed backwards or forwards depending on ramp direction. If "on", the
    function looks backwards at the links whose 'B' node is the ramp's 'A' node. For any other
    ramp type ("off"), the function looks forward at the links whose 'A' node is the ramp's
    'B' node.

    Base cases include the following:
    1. If any adjacent link has 'FT' == 1, return True meaning that the ramp link is a
    freeway interchange.
    2. If no adjacent link has 'FT' == 3 (this includes the case where the ramp is not connected
    to any other link), return False meaning that the ramp link is not a freeway interchange.

    Recursive case includes the following:
    1. If the ramp is connected to other ramp links, check those ramp links (backwards if on
    ramp, forward if off) using recursion and return True as soon as one of them is connected
    to a freeway link. A set remembers which links were already looked at. This solves for
    cases where a ramp link is both an on ramp and an off ramp and cases where off ramp and on
    ramp links intersect, which would otherwise cause infinite looping backwards or forwards.

    Parameters
    -------------------
    row (Series):
    Single row from interchanges geodataframe. Must provide 'LINK_ID_2' and 'RAMP_TYPE'.

    nw_gdf (Geodataframe):
    Full network geodataframe containing all network feature types, with columns
    'LINK_ID', 'A', 'B' and 'FT'. nw_gdf['FT'].isin([1,2,3,4,5,6,7])

    Returns
    -------------------
    Boolean

    Raises
    -------------------
    IndexError if nw_gdf has no link whose 'LINK_ID' equals row['LINK_ID_2'].
    """
    #Ramp link that produced this interchange point
    nw_ramp_gdf = nw_gdf[nw_gdf['LINK_ID'] == row['LINK_ID_2']].iloc[0]
    ramp_type = row['RAMP_TYPE']
    #LINK_IDs already visited; shared by every level of the recursion
    inspected_ramps = set()

    def validator(nw_ramp_gdf,ramp_type,inspected_ramps):
        inspected_ramps.add(nw_ramp_gdf['LINK_ID'])

        #Links adjacent to the ramp in the direction of travel being traced:
        #upstream links (ending at the ramp start) for on ramps,
        #downstream links (starting at the ramp end) otherwise.
        if ramp_type == 'on':
            adjacent_links = nw_gdf[nw_gdf['B'] == nw_ramp_gdf['A']]
        else:
            adjacent_links = nw_gdf[nw_gdf['A'] == nw_ramp_gdf['B']]
        adjacent_fts = adjacent_links['FT'].values

        #Connected to a freeway link (FT = 1): this is a freeway interchange
        if 1 in adjacent_fts:
            return True
        #Not connected to anything, or not connected to another ramp link: not an interchange
        if 3 not in adjacent_fts:
            return False

        #Connected to other ramp links: follow each one not looked at yet. The visited check
        #runs lazily, so links inspected while following an earlier sibling are skipped, and
        #any() stops at the first ramp found to be connected to a freeway.
        connecting_ramps = adjacent_links[adjacent_links['FT'] == 3]
        return any(validator(link,ramp_type,inspected_ramps)
                   for _,link in connecting_ramps.iterrows()
                   if link['LINK_ID'] not in inspected_ramps)

    return validator(nw_ramp_gdf,ramp_type,inspected_ramps)

In [99]:
def flag_freeway_interchanges(interchanges,freeway_nw_gdf):
    """
    Adds a 'FWY_INTERCHANGE' column to the interchanges geodataframe, holding the result of
    is_node_connected_to_freeway for every row.

    Parameters
    -------------------
    interchanges (Geodataframe):
    Interchange geodataframe. It is modified in place: the 'FWY_INTERCHANGE' column is added
    to (or overwritten on) the object passed in.

    freeway_nw_gdf (Geodataframe):
    Network geodataframe handed to is_node_connected_to_freeway for traversal. Despite the
    parameter name, that function expects the full network (all facility types), not only
    freeway links.

    Returns
    -------------------
    The same interchanges geodataframe object, with the 'FWY_INTERCHANGE' column set.
    """
    #Evaluate row by row into a list instead of apply(axis=1): on an empty frame apply
    #returns a frame rather than a series, which cannot be assigned to a single column.
    interchanges['FWY_INTERCHANGE'] = [is_node_connected_to_freeway(row,freeway_nw_gdf)
                                       for _,row in interchanges.iterrows()]
    return interchanges
    

In [100]:
#Flag each interchange using the full network (nw_gdf), not only the freeway subset
interchanges_flagged = flag_freeway_interchanges(interchanges,nw_gdf)    

In [101]:
#Export interchange geometry to csv.
#Only rows flagged FWY_INTERCHANGE == True are written, and only their geometry column.
(interchanges_flagged.loc[interchanges_flagged['FWY_INTERCHANGE'] == True,'geometry']
 .to_csv('freeway_interchanges.csv',index=False))

## Visualize interchanges on freeway and ramp network interactively

In [148]:
#Base layer: freeway (FT == 1) and ramp (FT == 3) links, coloured by facility type
m = nw_gdf[nw_gdf['FT'].isin([1,3])].explore(column='FT',
                                                 cmap=['black','lightblue'],
                                                 categorical=True,
                                                 #categories=['Freeway','Ramp'],
                                                 style_kwds={'weight':3.5},
                                                 legend_kwds={'caption':'Facility Type'})

#Second layer: interchanges flagged FWY_INTERCHANGE == True, drawn onto the same map (m=m)
#as orange markers with a white outline
(interchanges_flagged.loc[interchanges_flagged['FWY_INTERCHANGE'] == True,
                          ['A_2','B_2','RAMP_TYPE','geometry']]
 .explore(m=m,
          style_kwds = {'opacity':1,
                        'color':'white',
                        'weight':.5,
                        'fillColor':'orange',
                        'fillOpacity':1},
          marker_kwds = {'radius':5}))
#Last expression of the cell: display the combined map
m

In [149]:
#Write the map to an html file under docs/
#NOTE(review): presumably the docs/ directory must already exist - confirm before a fresh run
m.save('docs/network_interactive_map.html')

In [123]:
#Export every flagged row (no FWY_INTERCHANGE filter is applied here) to GeoJSON
interchanges_flagged.to_file('freeway_interchanges.geojson',driver='GeoJSON')