*Technical University of Munich<br>
Professorship of Environmental Sensing and Modeling<br><br>*
**Author:**  Daniel Kühbacher<br>
**Date:**  07.05.2024

--- 

# Detector Emission Calculation

<!--Notebook description and usage information-->
HBEFA provides different emission factors for different traffic situations, e.g. Freeflow, Saturated or Stop&Go. These traffic situations are characterized by different average vehicle speeds, which are also measured by the traffic detectors.


In [25]:
import sys

import pandas as pd
import numpy as np
import geopandas as gpd
import matplotlib.pyplot as plt
import seaborn as sns

from datetime import time

sys.path.append('../utils')
import data_paths
from hbefa_hot_emissions import HbefaHotEmissions

# Notebook Settings

In [26]:
# year of investigation
year = 2019

# Define VISUM filename
visum_filename = "visum_links.GPKG"

# Define counting data filename
cnt_data_filename = 'preprocessed_lhm_counting_data_until2023.parquet'

# Define vehicle classes and emission components to evaluate
vehicle_classes = ['PC', 'LCV', 'HGV', 'BUS', 'MOT']
components = ['CO2(rep)', 'CO2(total)', 'NOx', 'CO']

###
#
# Save data as feather file
#
###

# The output filename is derived from `year`, so changing the year of
# investigation does not overwrite the results stored for another year.
save_results = True
save_filepath = data_paths.INVENTORY_FOLDER_PATH + f'DetectorEmissions_{year}.feather'

# Notebook Functions

In [27]:
# METHOD 1: Use road type independent speed thresholds as proposed by Cox & Notter 2019

# Threshold table (source: Cox & Notter 2019). It is built once at definition
# time instead of on every call, because get_speed_los is applied row by row.
# Rows are aligned with _LOS_SPEED_LIMITS, columns with _LOS_LABELS.
_LOS_SPEED_LIMITS = np.array([30, 40, 50, 60, 70, 80, 90, 100, 110, 120, 130, 140])
_LOS_LABELS = ('Freeflow', 'Heavy', 'Satur.', 'St+Go')
_LOS_MIN_SPEEDS = np.column_stack([
    [28.0, 34.0, 41.0, 54.0, 62.0, 72.0, 83.0, 92.0, 106.0, 117.0, 127.0, 135.0],  # Freeflow
    [20.2, 25.3, 29.0, 37.9, 45.1, 52.2, 60.0, 66.3, 77.1, 87.1, 95.3, 108.6],     # Heavy
    [12.4, 15.5, 17.1, 19.8, 23.0, 25.5, 26.0, 26.0, 26.0, 26.0, 26.0, 26.0],      # Satur.
    [7.6, 8.7, 9.2, 9.2, 10.1, 10.5, 11.0, 11.0, 11.0, 11.0, 11.0, 11.0],          # St+Go
])


def get_speed_los(allowed_speed: float,
                  actual_speed: float) -> str:
    """Returns traffic condition based on allowed and measured speed

    The allowed speed is mapped to the closest tabulated speed limit (on a tie
    the lower limit is used). The traffic condition is the first one, in the
    order Freeflow, Heavy, Satur., St+Go, whose threshold is not above the
    measured speed.

    Args:
        allowed_speed (float): Signaled speed at the road link
        actual_speed (float): True measured speed at the road link

    Returns:
        str: 'Freeflow', 'Heavy', 'Satur.' or 'St+Go'; 'St+Go2' if the
            measured speed is below all thresholds. A NaN speed fails every
            comparison and therefore also yields 'St+Go2', so missing
            speeds should be removed before calling this function.
    """
    # row of the tabulated speed limit closest to the allowed speed
    nearest_row = np.argmin(np.abs(_LOS_SPEED_LIMITS - allowed_speed))

    # thresholds decrease from Freeflow to St+Go, so the first hit is the answer
    for label, min_speed in zip(_LOS_LABELS, _LOS_MIN_SPEEDS[nearest_row]):
        if actual_speed >= min_speed:
            return label
    return 'St+Go2'

In [28]:
# METHOD 2: exact road and speed dependent emission factors
# Data source: HBEFA 4.1

# read the speed thresholds per road type from the xls file exported from HBEFA
path = data_paths.EF_path + 'ef_eval/v_per_TraSit.XLS'
ef_speed_thresholds = pd.read_excel(path)

# 'TS' is split on '/' once; its first four parts become separate columns
_ts_parts = [ts.split('/') for ts in ef_speed_thresholds['TS']]
for _position, _column in enumerate(('area_type', 'road_type', 'speed', 'TraSit')):
    ef_speed_thresholds[_column] = [parts[_position] for parts in _ts_parts]


def get_true_los(allowed_speed: float,
                 measured_speed: float,
                 road_type: str,
                 area_type: str = 'URB') -> str:
    """Returns the traffic situation whose speed is closest to the measured speed

    Candidate rows of ``ef_speed_thresholds`` are those with ``RoadCat ==
    'Urban'`` that match the given road type, allowed speed and area type.
    Among them, the row whose ``'pass_ car'`` value is closest to
    ``measured_speed`` is selected.

    Args:
        allowed_speed (float): Signaled speed at the road link
        measured_speed (float): True measured speed at the road link
        road_type (str): HBEFA Road Type (e.g. 'Distributor/Secondary')
        area_type (str, optional): Defaults to 'URB'.

    Returns:
        str: Value of the ``'TS'`` column of the selected row

    Raises:
        ValueError: If no row matches the given road type, allowed speed and
            area type
    """
    # The 'speed' column holds strings split from 'TS'. str(50.0) is '50.0',
    # so a float input is also matched in its integral spelling ('50').
    # NOTE(review): presumably 'TS' spells speeds without decimals - confirm.
    speed_keys = {str(allowed_speed)}
    try:
        speed_value = float(allowed_speed)
    except (TypeError, ValueError):
        speed_value = None
    if speed_value is not None and speed_value.is_integer():
        speed_keys.add(str(int(speed_value)))

    df = ef_speed_thresholds
    # NOTE(review): 'RoadCat' is fixed to 'Urban' regardless of area_type - confirm.
    candidates = df[(df['RoadCat'] == 'Urban') &
                    (df['road_type'] == road_type) &
                    (df['speed'].isin(speed_keys)) &
                    (df['area_type'] == area_type)]
    if candidates.empty:
        # idxmin on an empty selection would raise a ValueError as well,
        # but without telling which combination was not found
        raise ValueError(
            'No traffic situation found for '
            f'road_type={road_type!r}, allowed_speed={allowed_speed!r}, '
            f'area_type={area_type!r}')

    absolute_diff = (candidates['pass_ car'] - measured_speed).abs()
    return df.loc[absolute_diff.idxmin(), 'TS']



In [17]:
def calculate_emissions(year: int,
                        vehicle_class: str,
                        vehicle_volume: int,
                        TraSit: str,
                        hbefa_gradient: str,
                        component: str,
                        hbefa_class: HbefaHotEmissions) -> float:
    """Emission of one vehicle class: traffic volume times weighted emission factor

    The factor is read from ``hbefa_class.ef_dict[vehicle_class]['EFA_weighted']``
    with the key ``(year, TraSit, hbefa_gradient, component)``. If that key is
    missing, the lookup is repeated with the gradient ``'0%'``.

    Args:
        year (int): Year of investigation
        vehicle_class (str): Vehicle Class
        vehicle_volume (int): Traffic volume of the respective vehicle class
        TraSit (str): Traffic situation
        hbefa_gradient (str): Road gradient
        component (str): Emission component
        hbefa_class (HbefaHotEmissions): pre-initialized HBEFA object

    Returns:
        float: Emission estimate for the given input parameters

    Raises:
        KeyError: If the lookup with gradient ``'0%'`` fails as well
    """

    def _emission_factor(gradient):
        weighted = hbefa_class.ef_dict[vehicle_class]['EFA_weighted']
        return weighted[year, TraSit, gradient, component]

    try:
        factor = _emission_factor(hbefa_gradient)
    except KeyError:
        # no factor for this gradient: use the one for a flat road
        factor = _emission_factor('0%')

    return vehicle_volume * factor


## Import Data

In [29]:
# road links of the VISUM network
visum_links = gpd.read_file(data_paths.VISUM_FOLDER_PATH + visum_filename)
# raw speed and counting data from LHM counting stations
cnt_lhm = pd.read_parquet(data_paths.MST_COUNTING_PATH + cnt_data_filename)

# keep only records on links of the VISUM network that fall into the year of investigation
_known_links = visum_links['road_link_id'].unique()
_on_network = cnt_lhm['road_link_id'].isin(_known_links)
_in_year = cnt_lhm['date'].between(f'{year}-01-01', f'{year}-12-31')
cnt_lhm = cnt_lhm[_on_network & _in_year].copy()

# import hbefa emission module
hbefa = HbefaHotEmissions()

Loaded emission factors from /Users/daniel_tum/Documents/projects/traffic inventory v2/traffic-emission-inventory/data/restricted_input/hbefa/EFA_HOT_Vehcat_PC.XLS
Loaded emission factors from /Users/daniel_tum/Documents/projects/traffic inventory v2/traffic-emission-inventory/data/restricted_input/hbefa/EFA_HOT_Vehcat_LCV.XLS
Loaded emission factors from /Users/daniel_tum/Documents/projects/traffic inventory v2/traffic-emission-inventory/data/restricted_input/hbefa/EFA_HOT_Vehcat_HGV.XLS
Loaded emission factors from /Users/daniel_tum/Documents/projects/traffic inventory v2/traffic-emission-inventory/data/restricted_input/hbefa/EFA_HOT_Vehcat_Coach.XLS
Loaded emission factors from /Users/daniel_tum/Documents/projects/traffic inventory v2/traffic-emission-inventory/data/restricted_input/hbefa/EFA_HOT_Vehcat_MOT.XLS
Loaded emission factors from /Users/daniel_tum/Documents/projects/traffic inventory v2/traffic-emission-inventory/data/restricted_input/hbefa/ef_aggregated_los/EFA_HOT_Vehcat

## Prepare dataframe with information on each detector.

In [36]:
# unique (detector, road link) pairs found in the counting data
_det = cnt_lhm[['detector_id', 'road_link_id']]\
    .reset_index(drop=True)\
    .drop_duplicates()

# attach the link attributes of the VISUM network to each detector
_link_columns = ['road_type', 'hbefa_gradient', 'hbefa_speed', 'speed', 'road_link_id']
det_info = _det.merge(visum_links[_link_columns],
                      on='road_link_id',
                      how='inner').drop_duplicates()


def _single_gradient_or_flat(gradients):
    """Return the gradient if the detector has exactly one row, otherwise '0%'."""
    if len(gradients) == 1:
        return gradients.iloc[0]
    return '0%'


# one row per detector; detectors with several rows get the gradient '0%'
det_info = det_info.groupby('detector_id').agg({'road_link_id': 'first',
                                                'road_type': 'first',
                                                'speed': 'first',
                                                'hbefa_speed': 'first',
                                                'hbefa_gradient': _single_gradient_or_flat})
det_info.head()

Unnamed: 0_level_0,road_link_id,road_type,speed,hbefa_speed,hbefa_gradient
detector_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
4010011,80645.0,TrunkRoad/Primary-City,50,50,0%
4010012,80645.0,TrunkRoad/Primary-City,50,50,0%
4010013,80645.0,TrunkRoad/Primary-City,50,50,0%
4010014,80645.0,TrunkRoad/Primary-City,50,50,0%
4010021,80645.0,TrunkRoad/Primary-City,50,50,0%


# Prepare speed and volume dataframe

In [31]:
# the hourly values are stored in columns named '0' ... '23'
hour_value_columns = [str(x) for x in range(0, 24)]

# long format: one row per date, detector, vehicle class, metric and hour
_cnt = cnt_lhm.melt(id_vars=['date', 'detector_id', 'vehicle_class', 'metric'],
                    value_vars=hour_value_columns)

# drop the 'SUM' rows first, so no timestamps are computed for discarded rows
_cnt = _cnt[_cnt['vehicle_class'] != 'SUM'].copy()

# timestamp = calendar day of 'date' + hour taken from the melted column name.
# Vectorized replacement for a row-wise pd.Timestamp.combine(date, time(hour)).
# NOTE(review): assumes 'date' is timezone-naive - confirm.
_cnt['timestamp'] = pd.to_datetime(_cnt['date']).dt.normalize() \
    + pd.to_timedelta(_cnt['variable'].astype(int), unit='h')

In [32]:
# prepare speed dataset
# Zero AND missing speeds are discarded: a NaN speed fails every threshold
# comparison in get_speed_los and would be classified as 'St+Go2'.
_speed_rows = _cnt[_cnt['metric'] == 'speed']
_valid_speed = _speed_rows['value'].notna() & (_speed_rows['value'] != 0)
_cnt_speed = _speed_rows.loc[_valid_speed, ['detector_id',
                                            'timestamp',
                                            'value']]
_cnt_speed = _cnt_speed.rename(columns={'value': 'measured_speed'})

# prepare volume dataset: one column per vehicle class
_cnt_volume = _cnt[_cnt['metric'] == 'volume']
_cnt_volume = _cnt_volume.pivot(index=['detector_id', 'timestamp'],
                                columns='vehicle_class',
                                values='value')
# total volume: class volumes weighted with hbefa.car_unit_factors
_cnt_volume['SUM_PCU'] = _cnt_volume.mul(pd.Series(hbefa.car_unit_factors)).sum(axis=1)

# combine speed and volume dataset (only hours that have both)
cnt_base = pd.merge(_cnt_speed, _cnt_volume,
                    left_on=['detector_id', 'timestamp'],
                    right_index=True,
                    how='inner')

# add detector information
detector_dat = pd.merge(cnt_base, det_info,
                        left_on='detector_id',
                        right_on='detector_id')
detector_dat.head()

Unnamed: 0,detector_id,timestamp,measured_speed,BUS,HGV,LCV,MOT,PC,SUM_PCU,road_link_id,road_type,speed,hbefa_speed,hbefa_gradient
0,4010011,2019-01-03,48.0,0.0,2.0,1.0,0.0,37.0,43.0,80645.0,TrunkRoad/Primary-City,50,50,0%
1,4010011,2019-01-04,50.0,0.0,0.0,2.0,0.0,58.0,60.0,80645.0,TrunkRoad/Primary-City,50,50,0%
2,4010011,2019-01-05,48.0,0.0,4.0,3.0,0.0,84.0,97.0,80645.0,TrunkRoad/Primary-City,50,50,0%
3,4010011,2019-01-06,44.0,0.0,0.0,7.0,0.0,69.0,76.0,80645.0,TrunkRoad/Primary-City,50,50,0%
4,4010011,2019-01-07,49.0,0.0,2.0,2.0,0.0,33.0,40.0,80645.0,TrunkRoad/Primary-City,50,50,0%


# Calculate Traffic Situation by speed distribution

In [22]:
def _row_traffic_situation(row):
    """Traffic condition of one detector record (speed threshold method)."""
    return get_speed_los(allowed_speed=row['hbefa_speed'],
                         actual_speed=row['measured_speed'])


def _row_road_abbreviation(row):
    """HBEFA abbreviation of the road type of one detector record."""
    return hbefa.hbefa_road_abbreviations[row['road_type']]


detector_dat['TraSit'] = detector_dat.apply(_row_traffic_situation, axis=1)
detector_dat['hbefa_road_type'] = detector_dat.apply(_row_road_abbreviation, axis=1)

# traffic situation key: 'URB/<road type>/<speed>/<traffic condition>'
_speed_text = detector_dat['hbefa_speed'].astype('str')
detector_dat['hbefa_trasit_string'] = (
    'URB/' + detector_dat['hbefa_road_type']
    + '/' + _speed_text
    + '/' + detector_dat['TraSit']
)

# Calculate emissions for each detector


In [23]:
def _emission_column(component, veh_class):
    """Emissions of one vehicle class and component for every detector record."""
    def _per_row(row):
        return calculate_emissions(year=year,
                                   vehicle_class=veh_class,
                                   vehicle_volume=row[veh_class],
                                   TraSit=row['hbefa_trasit_string'],
                                   hbefa_gradient=row['hbefa_gradient'],
                                   component=component,
                                   hbefa_class=hbefa)
    return detector_dat.apply(_per_row, axis=1)


# one result column '<vehicle class>_<component>' per combination
for component in components:
    for veh_class in vehicle_classes:
        detector_dat[f'{veh_class}_{component}'] = _emission_column(component, veh_class)
detector_dat.head()

Unnamed: 0,detector_id,timestamp,measured_speed,BUS,HGV,LCV,MOT,PC,SUM_PCU,road_link_id,...,PC_NOx,LCV_NOx,HGV_NOx,BUS_NOx,MOT_NOx,PC_CO,LCV_CO,HGV_CO,BUS_CO,MOT_CO
0,4010011,2019-01-03,48.0,0.0,2.0,1.0,0.0,37.0,43.0,80645.0,...,11.278339,0.710967,3.999327,0.0,0.0,7.79913,0.200761,2.168188,0.0,0.0
1,4010011,2019-01-04,50.0,0.0,0.0,2.0,0.0,58.0,60.0,80645.0,...,17.679558,1.421935,0.0,0.0,0.0,12.225663,0.401522,0.0,0.0,0.0
2,4010011,2019-01-05,48.0,0.0,4.0,3.0,0.0,84.0,97.0,80645.0,...,25.604878,2.132902,7.998654,0.0,0.0,17.706133,0.602283,4.336376,0.0,0.0
3,4010011,2019-01-06,44.0,0.0,0.0,7.0,0.0,69.0,76.0,80645.0,...,21.032578,4.976772,0.0,0.0,0.0,14.544323,1.405327,0.0,0.0,0.0
4,4010011,2019-01-07,49.0,0.0,2.0,2.0,0.0,33.0,40.0,80645.0,...,10.059059,1.421935,3.999327,0.0,0.0,6.955981,0.401522,2.168188,0.0,0.0


## Save Results

In [24]:
# write the detector emission table to save_filepath (feather format)
# when saving is enabled in the notebook settings
if save_results: 
    detector_dat.to_feather(save_filepath)