In [4]:
import warnings
from pathlib import Path

import pandas as pd
import geopandas as gpd
import fiona
warnings.filterwarnings('ignore')

In [6]:
# Load Files
# All three extracts are read from one folder; repoint DATA_DIR to run the
# notebook against a copy stored somewhere else.
DATA_DIR = Path(r"C:\Users\agarbier\Downloads")

ramp = pd.read_csv(DATA_DIR / "D12_RAMP_20250602.csv")
intx = pd.read_csv(DATA_DIR / "D12_INTX_20250602.csv")
hwy = pd.read_csv(DATA_DIR / "D12_HWY_20250604.csv")


In [8]:
# Weight Network Components
# Create a function that adds a score column based on field characteristics
# Repeat for ramps, intersections, and highways
# Sum the scores for each component

In [134]:
# List the distinct access-control codes present in the highway table;
# score_add tests this column against 'Freeway' / 'Expressway'.
hwy['THY_HIGHWAY_ACCESS_CODE'].unique()

array(['Freeway', 'Conventional', 'Expressway', nan], dtype=object)

In [135]:
def score_add(df, type, name, var1, input1, var2, input2, var3, input3, value):
    """Add column ``name`` to ``df``: ``value`` where every filter matches, else 0.

    Parameters
    ----------
    df : pandas.DataFrame
        Table to score. Modified in place: column ``name`` is (re)created.
    type : str
        Which extra access-control filter to apply on top of the three
        field filters:

        * ``'intersection'`` or ``'ramp'`` - no extra filter.
        * ``'separated'`` - ``THY_HIGHWAY_ACCESS_CODE`` must be
          ``'Freeway'`` or ``'Expressway'``.
        * ``'atgrage'`` (spelling kept for existing callers) or
          ``'atgrade'`` - ``THY_HIGHWAY_ACCESS_CODE`` must NOT be
          ``'Freeway'`` or ``'Expressway'``.
    name : str
        Name of the score column to write.
    var1, var2, var3 : str
        Column names to filter on.
    input1, input2, input3 : list-like
        Accepted values for ``var1``, ``var2`` and ``var3`` respectively.
    value : number
        Score assigned to rows where all filters match.

    Raises
    ------
    ValueError
        If ``type`` is not one of the recognised values. Previously an
        unknown type silently produced an all-zero column.
    """
    matches = df[var1].isin(input1) & df[var2].isin(input2) & df[var3].isin(input3)

    if type in ('separated', 'atgrage', 'atgrade'):
        separated = df['THY_HIGHWAY_ACCESS_CODE'].isin(['Freeway', 'Expressway'])
        if type == 'separated':
            matches = matches & separated
        else:
            # The mask has to be negated with ~; comparing a boolean mask
            # to -1 is always False, so at-grade rules never scored.
            # NOTE(review): rows with a missing access code count as
            # at-grade here - confirm that is the intended treatment.
            matches = matches & ~separated
    elif type not in ('intersection', 'ramp'):
        raise ValueError(
            "type must be 'intersection', 'ramp', 'separated' or 'atgrage'/'atgrade'; got %r" % (type,)
        )

    df[name] = 0
    # A single .loc write instead of chained df[name][mask] = value, which
    # does not update df when pandas Copy-on-Write is active.
    df.loc[matches, name] = value

In [None]:
# Create a CSV of scoring rules: for each rule, the output column name, the fields and accepted values to filter on, and the point value to assign when the filters match
# 

In [139]:
# Intersection scoring rules, one per row:
# (score column, field 1, accepted values, field 2, accepted values, field 3, accepted values, points)
intx_rules = [
    ("calc1", "INX_DESIGN_CODE", ['Tee'], "INX_MAIN_LEFT_CHANNEL_CODE", ["Painted Channelization"], "INX_MAIN_LANES_AMT", [1, 2, 3], 5),
    ("calc2", "INX_DESIGN_CODE", ['Four-Legged'], "INX_MAIN_LEFT_CHANNEL_CODE", ["Painted Channelization"], "INX_MAIN_LANES_AMT", [1, 2, 3], 2),
    ("calc3", "INX_DESIGN_CODE", ['Tee'], "INX_MAIN_LEFT_CHANNEL_CODE", ["Painted Channelization"], "INX_MAIN_SIGNAL_MAST_ARM_IND", ['Y'], 1),
]

for intx_rule in intx_rules:
    score_add(intx, 'intersection', *intx_rule)

# Total intersection points = sum of the individual rule columns
intx['intx_points'] = intx[[intx_rule[0] for intx_rule in intx_rules]].sum(axis = 1)

In [140]:
# Ramp scoring rules, one per row:
# (score column, field 1, accepted values, field 2, accepted values, field 3, accepted values, points)
# The highway-group filter is passed twice because score_add always takes three filters.
ramp_rules = [
    ("calc1", "RAM_DESIGN_DESC", ['Direct or Semi-direct Connector (Right)'], "RAM_HIGHWAY_GROUP", ["Divided Highway"], "RAM_HIGHWAY_GROUP", ["Divided Highway"], 1),
    ("calc2", "RAM_ON_OFF_CODE", ['OFF'], "RAM_HIGHWAY_GROUP", ["Divided Highway"], "RAM_HIGHWAY_GROUP", ["Divided Highway"], 2),
    ("calc3", "RAM_ON_OFF_CODE", ['ON'], "RAM_HIGHWAY_GROUP", ["Divided Highway"], "RAM_HIGHWAY_GROUP", ["Divided Highway"], 10),
]

for ramp_rule in ramp_rules:
    score_add(ramp, 'ramp', *ramp_rule)

# Total ramp points = sum of the individual rule columns
ramp['ramp_points'] = ramp[[ramp_rule[0] for ramp_rule in ramp_rules]].sum(axis = 1)

In [142]:
# Highway scoring rules, one per row:
# (score_add type, score column, field 1, accepted values, field 2, accepted values, field 3, accepted values, points)
# The terrain filter is passed twice because score_add always takes three filters.
hwy_rules = [
    ('separated', "calc1", "THY_POPULATION_CODE", ['Urbanized'], "THY_TERRAIN_CODE", ['Flat', 'Rolling'], "THY_TERRAIN_CODE", ['Flat', 'Rolling'], 1),
    ('separated', "calc2", "THY_POPULATION_CODE", ['Urbanized'], "THY_TERRAIN_CODE", ['Mountainous'], "THY_TERRAIN_CODE", ['Mountainous'], 2),
    ('atgrage', "calc3", "THY_POPULATION_CODE", ['Rural'], "THY_TERRAIN_CODE", ['Mountainous'], "THY_TERRAIN_CODE", ['Mountainous'], 10),
]

for hwy_rule in hwy_rules:
    score_add(hwy, *hwy_rule)

# Total roadway points = sum of the individual rule columns
hwy['road_points'] = hwy[[hwy_rule[1] for hwy_rule in hwy_rules]].sum(axis = 1)

In [143]:
# Create sliding window to give raw RTE BM EM
# Merge windows with scored segments
# Add points associated with intersections and ramps
# Add roadway points scaled by the percent of the window that each segment overlaps

In [144]:
# Order segments along each route so that every row can be compared with the one before it
hwy_sort = hwy[['PMRouteID_Fix','begPM_Fix','endPM_Fix']].sort_values(by = ['PMRouteID_Fix','begPM_Fix'], ascending = True)
hwy_sort['prior_endPM'] = hwy_sort['endPM_Fix'].shift(1)
hwy_sort['prior_RID'] = hwy_sort['PMRouteID_Fix'].shift(1)

# merge = 1 when a segment starts exactly where the previous segment on the same
# route ended, i.e. it continues the same stretch of road.
# Computed as one boolean expression instead of chained hwy_sort['merge'][mask] = 1,
# which leaves the column at 0 when pandas Copy-on-Write is active.
continues_prior = (
    (hwy_sort['prior_endPM'] == hwy_sort['begPM_Fix'])
    & (hwy_sort['prior_RID'] == hwy_sort['PMRouteID_Fix'])
)
hwy_sort['merge'] = continues_prior.astype(int)

# Each row that does not continue the previous one starts a new dissolved road;
# the running count of those starts is the dissolve id (1, 2, 3, ...).
hwy_sort['diss_id'] = (hwy_sort['merge'] == 0).cumsum()

In [145]:
# Collapse each dissolved road to one row spanning its smallest begin
# postmile to its largest end postmile.
agg_routes = (
    hwy_sort
    .groupby(['PMRouteID_Fix', 'diss_id'])
    .agg(begPM_Fix=('begPM_Fix', 'min'), endPM_Fix=('endPM_Fix', 'max'))
    .reset_index()
)

In [146]:
SlidingDist = 2   # length of each analysis window
SInterval = 1     # offset between the starts of consecutive windows


def _window_bounds(beg_pm, end_pm, window, step):
    """Return the (begin, end) pairs of the analysis windows on one dissolved road.

    * Road no longer than ``window``: one window covering the whole road.
    * Road longer than ``window`` but no longer than ``window + step``: one
      window measured forward from the start and one measured backward from
      the end.
    * Longer road: windows of length ``window`` advanced by ``step`` while the
      window end is still before the road end, plus a final window measured
      backward from the road end.

    A road whose length is NaN yields no windows.

    Raises
    ------
    ValueError
        If ``window`` or ``step`` is not positive (a non-positive step would
        make the sliding loop run forever).
    """
    if window <= 0 or step <= 0:
        raise ValueError("window and step must both be positive")

    road_length = end_pm - beg_pm

    if road_length <= window:
        return [(beg_pm, end_pm)]

    if road_length <= window + step:
        return [(beg_pm, beg_pm + window), (end_pm - window, end_pm)]

    if road_length > window + step:
        bounds = []
        slide_start = beg_pm
        slide_end = beg_pm + window
        while slide_end < end_pm:
            bounds.append((slide_start, slide_end))
            slide_start = slide_start + step
            slide_end = slide_end + step
        # Last window is anchored to the end of the road so the tail is always covered
        bounds.append((end_pm - window, end_pm))
        return bounds

    # Only reached when road_length is NaN: every comparison above is False
    return []


# Walk through each dissolved road and record one row per window
window_columns = ['Seg_ID', 'PMRouteID_Fix', 'begPM_Fix', 'endPM_Fix', 'diss_id', 'BMP_full', 'EMP_full']
window_records = []

# Running id used to identify each window (starts at 1)
next_seg_id = 1

for _, road in agg_routes.iterrows():
    for win_beg, win_end in _window_bounds(road['begPM_Fix'], road['endPM_Fix'], SlidingDist, SInterval):
        window_records.append({
            'Seg_ID': next_seg_id,
            'PMRouteID_Fix': road['PMRouteID_Fix'],
            'begPM_Fix': win_beg,
            'endPM_Fix': win_end,
            'diss_id': road['diss_id'],
            # Full extent of the dissolved road this window belongs to
            'BMP_full': road['begPM_Fix'],
            'EMP_full': road['endPM_Fix'],
        })
        next_seg_id = next_seg_id + 1

# columns= keeps the expected columns (and their order) even when there are no windows
sl_winds = pd.DataFrame(window_records, columns=window_columns)

In [147]:
# sl_winds

In [148]:
# Locate each intersection and ramp at the mean of its begin/end postmiles
midpoint_fields = ['begPM_Fix', 'endPM_Fix']
intx['mp'] = intx[midpoint_fields].mean(axis = 'columns')
ramp['mp'] = ramp[midpoint_fields].mean(axis = 'columns')

# Copy the highway segment limits under names that differ from the window's begPM_Fix/endPM_Fix
hwy['start'], hwy['end'] = hwy['begPM_Fix'], hwy['endPM_Fix']

# Window length expressed as a fraction of the full sliding distance
window_span = sl_winds['endPM_Fix'] - sl_winds['begPM_Fix']
sl_winds['length'] = window_span / SlidingDist


In [149]:
# Pair every window with the intersections that share its merge key(s), then keep
# only those whose mid-postmile falls inside the window (both ends inclusive).
intx_candidates = sl_winds.merge(intx[['PMRouteID_Fix','mp','intx_points']], how = 'left')
intx_inside = intx_candidates['mp'].between(intx_candidates['begPM_Fix'], intx_candidates['endPM_Fix'])
sl_intx = intx_candidates[intx_inside]

# Same pairing and filter for ramps
ramp_candidates = sl_winds.merge(ramp[['PMRouteID_Fix','mp','ramp_points']], how = 'left')
ramp_inside = ramp_candidates['mp'].between(ramp_candidates['begPM_Fix'], ramp_candidates['endPM_Fix'])
sl_ramp = ramp_candidates[ramp_inside]

# Pair every window with every scored highway segment on the same route
sl_hwy = sl_winds.merge(hwy[['PMRouteID_Fix','start','end','road_points']], how = 'left', on = 'PMRouteID_Fix')

# Overlap between the segment [start, end] and the window [begPM_Fix, endPM_Fix].
# skipna=False keeps NaN for windows with no matching segment instead of
# silently falling back to the window's own limits.
overlap_beg = sl_hwy[['start', 'begPM_Fix']].max(axis = 1, skipna = False)
overlap_end = sl_hwy[['end', 'endPM_Fix']].min(axis = 1, skipna = False)
overlap_len = (overlap_end - overlap_beg).clip(lower = 0)   # 0 when they do not overlap

# Scale the segment's points by the share of the sliding distance it overlaps.
# One formula for every case, so that:
#   * a segment covering the whole window scores road_points * window length / SlidingDist,
#   * a partially overlapping segment is scaled by the same fraction rather than by raw miles,
#   * a segment lying strictly inside the window contributes instead of scoring 0.
sl_hwy['road_points_scale'] = (sl_hwy['road_points'] * overlap_len / SlidingDist).fillna(0)

In [150]:
# Total points per window (keyed by Seg_ID) from each component
intx_by_window = sl_intx.groupby('Seg_ID')['intx_points'].sum()
ramp_by_window = sl_ramp.groupby('Seg_ID')['ramp_points'].sum()
road_by_window = sl_hwy.groupby('Seg_ID')['road_points_scale'].sum()

# Attach the totals to the windows one component at a time; windows with no
# match get 0 (fillna(0) is applied to the whole merged frame each time).
sl_winds_wintx = sl_winds.merge(intx_by_window, how = 'left', on = 'Seg_ID').fillna(0)
sl_winds_wir = sl_winds_wintx.merge(ramp_by_window, how = 'left', on = 'Seg_ID').fillna(0)
sl_winds_wirh = sl_winds_wir.merge(road_by_window, how = 'left', on = 'Seg_ID').fillna(0)

In [151]:
# Window score = intersection points + ramp points + scaled roadway points
score_components = ['intx_points', 'ramp_points', 'road_points_scale']
sl_winds_wirh['score'] = sl_winds_wirh[score_components].sum(axis = 'columns')

0      0.0
1      0.0
2      0.0
3      0.0
4      0.0
      ... 
720    0.0
721    0.0
722    0.0
723    0.0
724    0.0
Name: intx_points, Length: 725, dtype: float64