In [None]:
import pandas as pd
import os
import shutil
import datetime

In [None]:
def spark_table_gen(df, table_name, mode='append'):
    """Write a pandas DataFrame to the lakehouse as a Delta table and register it.

    The frame is converted to a Spark DataFrame, saved in Delta format under
    ``<lakehouse_path>/Tables/<table_name>``, and a table named ``table_name``
    is created (if it does not already exist) pointing at that same folder.

    Relies on the notebook-level globals ``spark`` and ``lakehouse_path``.

    Args:
        df: pandas DataFrame to persist.
        table_name: Name of the table; also used as the folder name under
            ``Tables/``.
        mode: Save mode handed to the Spark writer. Defaults to ``'append'``.
    """
    # Build the path once so the write and the table registration can never
    # point at different folders.
    table_path = lakehouse_path + '/Tables/' + table_name

    spark_df = spark.createDataFrame(df)
    spark_df.write.format('delta').mode(mode).save(table_path)

    # Register a table over the data just written. LOCATION must be the
    # table's own folder, not the lakehouse root.
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS {table_name}
        USING DELTA
        LOCATION '{table_path}'
    """)

def extract_movement(full_movement):
    """Pick the lane, approach and movement tokens out of a movement string.

    The string is split on single spaces; the tokens at positions 1, 3 and 4
    are returned, with commas removed from the approach token.
    (Presumably the input looks like ``'Lane 1 - NB, Through'`` — confirm
    against the data source.)

    Args:
        full_movement: Space-separated movement description.

    Returns:
        Tuple ``(lane, approach, movement)`` of strings.

    Raises:
        IndexError: If the string has fewer than five space-separated tokens.
    """
    tokens = full_movement.split(' ')
    lane_token, approach_token, movement_token = tokens[1], tokens[3], tokens[4]
    return lane_token, approach_token.replace(',', ''), movement_token

In [None]:
# Microsoft Fabric lakehouse configuration.
# NOTE(review): SparkSession is not imported in this notebook's import cell;
# presumably the Fabric runtime provides it — confirm, otherwise add
# `from pyspark.sql import SparkSession` to the imports.
app_name = "tahoe"

# Lakehouse root path taken from the environment; None when the variable is unset.
lakehouse_path = os.environ.get('tahoe_lakehouse_path')

# Obtain the Spark session registered under app_name.
session_builder = SparkSession.builder.appName(app_name)
spark = session_builder.getOrCreate()

In [None]:
# Lakehouse file areas; source folders are resolved relative to unprocessed_dir.
unprocessed_dir = '/lakehouse/default/Files/Unprocessed/'
processed_dir = '/lakehouse/default/Files/Processed/'

# Remaps the approach code of CROSSING rows in the volume data.
vru_approach_dict = dict(NB='S', SB='N', EB='W', WB='E')

# Class-label substitutions applied to the title-cased volume `class` column.
class_dict = dict([
    ('Mobility Aid', 'Pedestrian'),
    ('Motorcycle', 'Passenger Vehicle'),
    ('Articulated Truck', 'Semi Truck'),
    ('Single Unit Truck', 'Box Truck'),
    ('Person Mobility Device', 'Pedestrian'),
])

# Relabels the severity values of the event data.
severity_dict = dict(High='Severe', Low='Moderate')

# Placeholder date prefixed to time-of-day strings so they parse as full datetimes.
default_date = '1900-01-01'

### Volume Processing

In [None]:
# Gather the DERQ volume exports waiting in the unprocessed area.
source_dir = os.path.join(unprocessed_dir, 'DERQ/Volumes')

dfs = []

for file in os.listdir(source_dir):
    # Skip any entry whose name does not contain 'csv'.
    if 'csv' not in file:
        continue

    # The file name itself (extension included) is recorded as the intersection id.
    # NOTE(review): confirm whether the extension should be stripped.
    frame = pd.read_csv(os.path.join(source_dir, file)).assign(intersection_id=file)
    dfs.append(frame)

# Stack all files into a single frame with a fresh 0..n-1 index.
combined_volume = pd.concat(dfs, ignore_index=True)

In [None]:
# Split the interval text on ' - ' into a start ('time') and an 'end_time' part,
# then anchor the start on the placeholder date so it parses as a full datetime.
interval_bounds = combined_volume['timeInterval'].str.split(' - ', expand=True)
combined_volume[['time', 'end_time']] = interval_bounds
combined_volume['time'] = pd.to_datetime(default_date + ' ' + combined_volume['time'])

# Reduce the date column to plain date objects.
combined_volume['date'] = pd.to_datetime(combined_volume['date']).dt.date

# Remove the columns that are no longer needed, then give the remaining
# source columns their fact-table names.
combined_volume = (
    combined_volume
    .drop(columns=['timeInterval', 'movement', 'dayOfTheWeek', 'end_time'])
    .rename(columns={'movementType': 'movement', 'count': 'volume'})
)

# 'some_label' -> 'Some Label', followed by the class_dict substitutions.
normalised_class = combined_volume['class'].str.replace('_', ' ').str.title()
combined_volume['class'] = normalised_class.replace(class_dict)

# Only CROSSING rows have their approach remapped through vru_approach_dict.
is_crossing = combined_volume['movement'] == 'CROSSING'
combined_volume.loc[is_crossing, 'approach'] = (
    combined_volume.loc[is_crossing, 'approach'].map(vru_approach_dict)
)

combined_volume

In [None]:
# Persist the cleaned volume rows to the lakehouse fact table.
# NOTE(review): spark_table_gen defaults to mode='append', so re-running this
# cell writes the same rows again — confirm that is acceptable.
table_name = 'derq_volume_fact_table'
spark_table_gen(df=combined_volume, table_name=table_name)

### Event Processing

In [None]:
# Load every DERQ event export from the unprocessed area into one frame.
source_dir = os.path.join(unprocessed_dir, 'DERQ/Events')

event_dfs = []

# Sorted so the row order of the combined frame does not depend on the
# arbitrary order in which os.listdir returns entries.
for file_name in sorted(os.listdir(source_dir)):
    # Same filter as the volume loader: skip entries that are not CSV exports
    # (sub-folders, marker files, ...) instead of failing inside read_csv.
    if 'csv' not in file_name:
        continue

    file_path = os.path.join(source_dir, file_name)

    # read csv file to df
    df = pd.read_csv(file_path)
    event_dfs.append(df)

# Fail with an explicit message rather than pd.concat's generic
# "No objects to concatenate" when the folder holds no CSV files.
if not event_dfs:
    raise ValueError(f'No CSV files found in {source_dir}')

combined_events = pd.concat(event_dfs, ignore_index=True)
combined_events.head(5)

In [None]:
# Give the source columns their fact-table names.
combined_events = combined_events.rename(
    columns={'id': 'event_id', 'speed': 'speed_mph', 'isSevere': 'severity'}
)

# Parse the event timestamp once; the date/time columns below all derive from it.
event_ts = pd.to_datetime(combined_events['datetime'])
combined_events['date'] = event_ts.dt.date

# Time of day at second resolution and truncated to the minute, both anchored
# on the placeholder date so the columns hold full datetimes.
combined_events['time_to_second'] = pd.to_datetime(
    default_date + ' ' + event_ts.dt.strftime('%H:%M:%S')
)
combined_events['time'] = pd.to_datetime(
    default_date + ' ' + event_ts.dt.strftime('%H:%M:00')
)

# The raw timestamp is fully represented by date/time/time_to_second now.
combined_events = combined_events.drop(columns=['datetime'])

# Keep the first number found in the speed text. The optional fractional part
# is captured so a value such as '12.5 mph' yields 12.5 rather than 12.0;
# expand=False returns a Series, matching the single target column.
combined_events['speed_mph'] = (
    combined_events['speed_mph']
    .str.extract(r'(\d+(?:\.\d+)?)', expand=False)
    .astype(float)
)

# Relabel severity; values that are not keys of severity_dict become NaN.
combined_events['severity'] = combined_events['severity'].map(severity_dict)

# NOTE(review): 'Low' is not one of the labels severity_dict produces
# ('Severe' / 'Moderate') — confirm this third label is intended.
combined_events.loc[combined_events['event_type'] == 'Illegal Crossing', 'severity'] = 'Low'

In [None]:
# Persist the cleaned event rows to the lakehouse fact table.
# NOTE(review): spark_table_gen defaults to mode='append', so re-running this
# cell writes the same rows again — confirm that is acceptable.
table_name = 'derq_event_fact_table'
spark_table_gen(df=combined_events, table_name=table_name)