In [4]:
import os
import sys
import pandas as pd
import datetime as dt
import pickle
import importlib
from pyspark import SparkContext,SparkConf
from pyspark.sql import SQLContext
from pyspark.sql import Row, SparkSession
from pyspark.sql.window import Window
from pyspark import SparkConf
from pyspark.sql.types import TimestampType, DateType,DoubleType,FloatType,IntegerType,StringType,StructType,ArrayType,StructField
from pyspark.sql import functions as F
from pyspark.sql import DataFrame
from scipy.stats import zscore
from pyspark.sql.types import IntegerType, StringType, FloatType
import numpy as np
import gtfs_kit as gk
import time
import math

# Show every column when a pandas DataFrame is displayed (no column limit).
pd.set_option('display.max_columns', None)

In [2]:
# Create (or reuse) the local Spark session shared by every cell below.
# The SQL session and both JVMs (driver and executor) are pinned to UTC.
spark = (
    SparkSession.builder
    .master("local[26]")
    .appName("wego-daily")
    .config('spark.executor.cores', '8')
    .config('spark.executor.memory', '40g')
    .config('spark.driver.memory', '20g')
    .config("spark.sql.session.timeZone", "UTC")
    .config('spark.driver.extraJavaOptions', '-Duser.timezone=UTC')
    .config('spark.executor.extraJavaOptions', '-Duser.timezone=UTC')
    .config("spark.sql.datetime.java8API.enabled", "true")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .getOrCreate()
)

22/08/04 02:43:03 WARN Utils: Your hostname, scope-vanderbilt resolves to a loopback address: 127.0.1.1; using 10.2.218.69 instead (on interface enp8s0)
22/08/04 02:43:03 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/08/04 02:43:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/08/04 02:43:04 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/08/04 02:43:04 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [3]:
# recalculate load
def get_derived_load(stops_in):
    """Recompute the passenger load along a trip from boardings and alightings.

    The rows are ordered by ``scheduled_time`` and the first and last rows are
    dropped. The recorded ``load`` of the first remaining row seeds a running
    total; every later row adds its ``ons`` and subtracts its ``offs``.

    Args:
        stops_in: pandas DataFrame with ``scheduled_time``, ``load``, ``ons``
            and ``offs`` columns. The input frame is left untouched.

    Returns:
        A new DataFrame holding the sorted, trimmed rows plus a
        ``derived_load`` column. If nothing is left after trimming (fewer than
        three input rows) an empty frame that still carries the
        ``derived_load`` column is returned.
    """
    stops = stops_in.sort_values(by=['scheduled_time'])
    # Copy the slice so the column assignment below writes to an independent
    # frame instead of a slice of the sorted data (no SettingWithCopyWarning).
    stops = stops.iloc[1:len(stops)-1].copy()
    if stops.empty:
        # Nothing to accumulate; indexing row 0 here used to raise IndexError.
        stops['derived_load'] = stops['load']
        return stops
    last_load = stops.iloc[0]['load']
    derived_load = [last_load]
    for k in range(1, len(stops)):
        cur_stop = stops.iloc[k]
        # Running total: previous load plus boardings minus alightings.
        cur_load = last_load + cur_stop['ons'] - cur_stop['offs']
        derived_load.append(cur_load)
        last_load = cur_load
    stops['derived_load'] = derived_load
    return stops

def timestr_to_seconds(timestr):
    """Convert an ``H:M:S`` clock string into a number of seconds.

    The string is split on ":" and every piece is parsed as an integer; the
    first three pieces are read as hours, minutes and seconds. The hour is not
    capped at 24, so "25:00:00" yields 90000.
    """
    parts = list(map(int, timestr.split(":")))
    return parts[2] + parts[1]*60 + parts[0]*3600

def timestr_to_hour(timestr):
    """Return the hour field of an ``H:M:S`` clock string as an integer.

    Only the text before the first ":" is parsed; the minute and second
    fields are not needed and are no longer converted (they were previously
    parsed and bound to unused locals). The hour is returned as written, so
    "25:30:00" yields 25.
    """
    hour_field = timestr.split(":", 1)[0]
    return int(hour_field)

def get_days_of_week(week_arr):
    """Return the 1-based positions of the entries equal to 1 in ``week_arr``.

    Exactly the first seven entries are inspected. The numbering follows the
    order of ``week_arr``: its first entry is day 1 and its seventh is day 7,
    so the caller decides which weekday each number stands for.
    """
    return [position + 1 for position in range(7) if week_arr[position] == 1]

def seconds_to_timestr(seconds, format='%H:%M:%S'):
    """Format a number of seconds as a clock string.

    ``seconds`` is converted with ``time.gmtime`` and rendered with
    ``time.strftime`` using ``format``. With the default format, values of a
    day or more wrap around (90000 renders as "01:00:00").
    """
    broken_down = time.gmtime(seconds)
    return time.strftime(format, broken_down)

# APC data
* Don't try to load the entire dataset to pandas unless you are on a device with ~100GB RAM.
* You can select columns and limit the dates just to see the data:
    * `apcdata.columns`: to see column names
    * To select certain columns/filter dates
    ```
        query=f"""SELECT transit_date, trip_id, vehicle_id,
                    block_abbr, arrival_time, 
                    scheduled_time, departure_time,
                    block_stop_order, load, load_factor,
                    map_latitude, map_longitude, offs, ons, overload_id,
                    pattern_num, route_direction_name, route_id,
                    stop_id, stop_name, stop_sequence, year, month,
                    day, hour, dayofweek
                FROM apc
                WHERE (transit_date >= '{date_range[0]} 00:00:00') AND (transit_date < '{date_range[1]} 00:00:00')"""
    ```
* Column information (a blank description means the column has not been used so far):
    * `transit_date`: Trip date %Y-%m-%d
    * `trip_id`: Trip id
    * `vehicle_id`: Vehicle id, one vehicle can travel many blocks and trip
    * `block_abbr`: (str) Block
    * `activation_date`:
    * `activation_date_str`: %Y-%m-%d
    * `arrival_time`: (datetime) Time the bus arrived at the stop
    * `arrival_time_str`: (str) %Y-%m-%d
    * `block_stop_order`:(int) Order of blocks
    * `deactivation_date`:
    * `deactivation_date_str`:
    * `departure_time`: (datetime) Time the bus left the stop
    * `departure_time_str`: (str) %Y-%m-%d
    * `load`: (int) Total passengers in the bus (ons - offs)
    * `load_factor`: (float)
    * `map_latitude`: (float)
    * `map_longitude`: (float)
    * `offs`: (int) People alighting
    * `ons`: (int) People boarding
    * `overload_id`: (int) 0 or 1, Whether the current vehicle was used as an overload bus
    * `pattern_num`:
    * `route_direction_name`: (str) Whether it goes TO or FROM Downtown
    * `route_id`: (int)
    * `scheduled_time`: (datetime) Time the bus was scheduled to arrive at the stop
    * `scheduled_time_str`: (str)  %Y-%m-%d
    * `source_pattern_id`:
    * `stop_id`: (str) Stop name abbreviation
    * `stop_id_list`: (list: str) Stop names (abbr) in order
    * `stop_id_original`: (str) Stop name, longer than stop_id but still shortened
    * `stop_name`: (str) complete stop name
    * `stop_sequence`: (int) Number of the stop in the stop sequence
    * `stop_sequence_list`: (list: int)
    * `transit_date_str`: (str) %m/%d/%y (08/01/21)
    * `update_date`:
    * `vehicle_capacity`: (int) Vehicle capacity, although not too reliable as many vehicles have NaN values
    * `zero_load_at_trip_end`:
    * `year`: (int)
    * `month`: (int)

In [None]:
# load the APC data
filepath = os.path.join("../data", "apc", "cleaned-wego-daily.apc.parquet")
apcdata = spark.read.load(filepath)

# add day and hour of day
apcdata = apcdata.withColumn('day', F.dayofmonth(apcdata.transit_date))
apcdata = apcdata.withColumn('hour', F.hour(apcdata.arrival_time))
apcdata = apcdata.withColumn('dayofweek', F.dayofweek(apcdata.transit_date)) # 1=Sunday, 2=Monday ... 7=Saturday
# Register the frame as the "apc" view so it can also be queried with SQL.
apcdata.createOrReplaceTempView("apc")

query = """
SELECT *
FROM apc
"""
apcdata = spark.sql(query)

# A trip instance is identified by these three columns in the joins below.
trip_keys = ['transit_date', 'trip_id', 'overload_id']

# remove bad trips: any trip with at least one row whose ons, offs or load is NULL
todelete = apcdata.filter('(ons IS NULL) OR (offs IS NULL) OR (load IS NULL)').select(*trip_keys).distinct()
# left_anti keeps only the rows of apcdata that have no match in todelete.
apcdata = apcdata.join(todelete, on=trip_keys, how='left_anti')

# remove trips with fewer than 4 rows
# NOTE(review): the earlier comment said "less then 5 stops" while the filter
# has always been `count < 4` -- confirm which threshold is intended.
todelete = apcdata.groupby(*trip_keys).count().filter("count < 4").select(*trip_keys)
# The anti join also stops the helper `count` column from leaking into apcdata
# (the previous left join + drop('marker') left an all-null `count` column behind).
apcdata = apcdata.join(todelete, on=trip_keys, how='left_anti')

# Weather - darksky

In [11]:
# Peek at the first raw Dark Sky record to see which columns are available.
filepath = os.path.join("../data", "weather", "darksky_nashville_20220406.csv")
darksky = pd.read_csv(filepath)
darksky.head(1)

Unnamed: 0,agency,apparent_temperature,cloud_cover,dew_point,humidity,icon,nearest_storm_distance,ozone,precipitation_intensity,precipitation_probability,pressure,summary,temperature,time,uv_index,visibility,wind_bearing,wind_gust,wind_speed
0,darksky,75.99,0.96,58.35,0.54,cloudy,0,328.1,0.0,0.0,1020.7,Overcast,75.99,1621367859,4,10.0,155,22.15,11.4


In [None]:
filepath = os.path.join("../data", "weather", "darksky_nashville_20220406.csv")
darksky = pd.read_csv(filepath)
# `time` holds epoch seconds; shift by a fixed 5 hours (GMT-5) before converting.
# NOTE(review): a constant offset ignores daylight-saving changes -- confirm
# that a fixed GMT-5 shift is what the APC timestamps expect.
# `unit='s'` fully describes the input, so the deprecated
# `infer_datetime_format` argument is no longer passed.
darksky['datetime'] = pd.to_datetime(darksky['time'] - 18000, unit='s')
darksky['year'] = darksky['datetime'].dt.year
darksky['month'] = darksky['datetime'].dt.month
darksky['day'] = darksky['datetime'].dt.day
darksky['hour'] = darksky['datetime'].dt.hour
val_cols= ['temperature', 'humidity', 'nearest_storm_distance', 'precipitation_intensity', 'precipitation_probability', 'pressure', 'wind_gust', 'wind_speed']
join_cols = ['year', 'month', 'day', 'hour']
darksky = darksky[val_cols+join_cols]
# Prefix the measurement columns so their source stays visible after the join.
renamed_cols = {k: f"darksky_{k}" for k in val_cols}
darksky = darksky.rename(columns=renamed_cols)
# One row per hour: average all readings that fall in the same hour.
darksky = darksky.groupby(join_cols).mean().reset_index()
# From here on `darksky` is a Spark DataFrame, not a pandas one.
darksky=spark.createDataFrame(darksky)
darksky.createOrReplaceTempView("darksky")

# join apc and darksky
apcdata = apcdata.join(darksky,on=join_cols, how='left')

# Weather - weatherbit

In [12]:
# Peek at the first raw Weatherbit record (one field per line, untruncated).
filepath = os.path.join("../data", "weather", "weatherbit_weather_2010_2022.parquet")
weatherbit = spark.read.load(filepath)
weatherbit.show(1, vertical=True, truncate=False)



-RECORD 0---------------------------------------
 station_id               | 720259-63844        
 start_date_st            | 2016-02-05          
 end_date_st              | 2016-02-12          
 timestamp_local          | 2010-01-01 00:00:00 
 rh                       | 75.0                
 wind_spd                 | 2.1                 
 timestamp_utc            | 2010-01-01 05:00:00 
 pod                      | n                   
 slp                      | 1019.0              
 app_temp                 | 4.4                 
 elev_angle               | -75.42              
 solar_rad                | 0.0                 
 pres                     | 935.0005            
 h_angle                  | null                
 dewpt                    | 1.0                 
 snow                     | 0.0                 
 uv                       | 0.0                 
 azimuth                  | 324.18              
 wind_dir                 | 350.0               
 ghi                

                                                                                

In [None]:
# load weatherbit
filepath = os.path.join("../data", "weather", "weatherbit_weather_2010_2022.parquet")
weatherbit = spark.read.load(filepath)

weatherbit = weatherbit.filter("(spatial_id = 'Berry Hill') OR (spatial_id = 'Belle Meade')")
weatherbit.createOrReplaceTempView("weatherbit")
query = f"""
SELECT *
FROM weatherbit
"""
# WHERE (timestamp_local >= '{date_range[0]} 23:00:00') AND (timestamp_local < '{date_range[1]} 00:00:00')
weatherbit = spark.sql(query)

weatherbit = weatherbit.withColumn('year', F.year(weatherbit.timestamp_local))
weatherbit = weatherbit.withColumn('month', F.month(weatherbit.timestamp_local))
weatherbit = weatherbit.withColumn('day', F.dayofmonth(weatherbit.timestamp_local))
weatherbit = weatherbit.withColumn('hour', F.hour(weatherbit.timestamp_local))
weatherbit = weatherbit.select('year', 'month', 'day', 'hour', 'rh', 'wind_spd', 'slp', 'app_temp', 'temp', 'snow', 'precip')
weatherbit = weatherbit.groupBy('year', 'month', 'day', 'hour').agg(F.mean('rh').alias('weatherbit_rh'), \
                                                                    F.mean('wind_spd').alias('weatherbit_wind_spd'), \
                                                                    F.mean('app_temp').alias('weatherbit_app_temp'), \
                                                                    F.mean('temp').alias('weatherbit_temp'), \
                                                                    F.mean('snow').alias('weatherbit_snow'), \
                                                                    F.mean('precip').alias('weatherbit_precip')
                                                                   )
weatherbit = weatherbit.sort(['year', 'month', 'day', 'hour'])

# join apc and weatherbit
apcdata=apcdata.join(weatherbit,on=['year', 'month', 'day', 'hour'], how='left')

# Join with GTFS

In [13]:
# Peek at the first raw GTFS trip record (one field per line, untruncated).
# NOTE(review): this reads ../data/gtfs/..., while the next cell loads the same
# file name from <cwd>/data/static_gtfs/... -- confirm which location is current.
filepath = os.path.join("../data", "gtfs", "alltrips_mta_wego.parquet")
alltrips = spark.read.load(filepath)
alltrips.show(1, vertical=True, truncate=False)



-RECORD 0-----------------------------------------------
 trip_id               | 132152                         
 arrival_time          | 17:10:00                       
 bikes_allowed         | 0                              
 block_id              | b_14613                        
 departure_time        | 17:10:00                       
 direction_id          | 0                              
 drop_off_type         | null                           
 gtfs_file             | 30-March-2017.zip              
 location_type         | 0.0                            
 parent_station        | null                           
 pickup_type           | 1.0                            
 route_id              | 19                             
 route_long_name       | HERMAN                         
 service_id            | 3_merged_136582                
 shape_dist_traveled   | 9.0812                         
 shape_id              | 10625                          
 stop_code             | MCC5_8

                                                                                

In [None]:
filepath = os.path.join(os.getcwd(), "data", "static_gtfs", "alltrips_mta_wego.parquet")
alltrips = spark.read.load(filepath)

# One row per distinct (trip, feed) combination, with the GTFS attributes
# prefixed by gtfs_ so they cannot clash with the APC columns of the same name.
gtfs_renames = {
    'shape_id': 'gtfs_shape_id',
    'route_id': 'gtfs_route_id',
    'direction_id': 'gtfs_direction_id',
    'start_date': 'gtfs_start_date',
    'end_date': 'gtfs_end_date',
}
gtfstrips = alltrips.select('trip_id','date','gtfs_file', 'shape_id', 'route_id', 'direction_id', 'start_date', 'end_date').distinct()
for gtfs_old_name, gtfs_new_name in gtfs_renames.items():
    gtfstrips = gtfstrips.withColumnRenamed(gtfs_old_name, gtfs_new_name)

# Pair every observed (transit_date, trip_id) with the GTFS rows for that trip
# whose `date` is not after the transit date. The filter also discards trips
# that found no GTFS row at all (their `date` is NULL).
rantrips = (
    apcdata.select('transit_date','trip_id').distinct()
    .join(gtfstrips, on='trip_id', how='left')
    .filter('transit_date >= date')
)
# Of the remaining candidates keep the most recent GTFS `date` per trip instance.
rantrips_best_gtfs_file = rantrips.groupby('transit_date','trip_id').agg(F.max('date').alias('date'))
# The inner join guarantees every surviving row has a GTFS match.
rantrips = (
    rantrips
    .join(rantrips_best_gtfs_file, on=['transit_date','trip_id','date'], how='inner')
    .withColumnRenamed('date', 'gtfs_date')
)
# rantrips is now the GTFS data keyed by the (transit_date, trip_id) pairs seen in apcdata.
apcdata = apcdata.join(rantrips,on=['transit_date','trip_id'], how='left')

In [None]:
# get scheduled number of vehicles on route at the given hour

# Prefix the GTFS columns so they line up with the gtfs_* columns in apcdata.
alltrips = alltrips.withColumnRenamed('route_id', 'gtfs_route_id')\
                   .withColumnRenamed('date', 'gtfs_date')\
                   .withColumnRenamed('direction_id', 'gtfs_direction_id')

timestrToSecondsUDF = F.udf(lambda x: timestr_to_seconds(x), IntegerType())
alltrips = alltrips.withColumn("time_seconds", timestrToSecondsUDF(F.col('arrival_time')))

# NOTE(review): the hour is taken as written in arrival_time; if the feed has
# times of 24:00:00 or later, those hours will not match apcdata.hour -- confirm.
timestrToHourUDF = F.udf(lambda x: timestr_to_hour(x), IntegerType())
alltrips = alltrips.withColumn("hour", timestrToHourUDF(F.col('arrival_time')))

# get_days_of_week numbers the flags by position (first flag -> 1 ... seventh -> 7).
# apcdata.dayofweek comes from F.dayofweek, where 1=Sunday ... 7=Saturday, so the
# flags must be passed Sunday-first; a Monday-first order shifts every schedule
# by one weekday and the joins on `dayofweek` pick up the wrong day's service.
getDaysOfWeekUDF = F.udf(lambda x: get_days_of_week(x), ArrayType(IntegerType()))
alltrips = alltrips.withColumn("dayofweek", getDaysOfWeekUDF(F.array('sunday', 'monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday')))
# One output row per weekday on which the row's service runs.
alltrips = alltrips.withColumn("dayofweek", F.explode("dayofweek"))

alltrips.createOrReplaceTempView('alltrips_1')

# NOTE(review): count(trip_id) counts rows, not distinct trips -- if alltrips
# has one row per stop visit this is larger than the number of trips; confirm.
query = """
SELECT gtfs_date, dayofweek, hour, gtfs_route_id, gtfs_direction_id, count(trip_id) AS gtfs_number_of_scheduled_trips
FROM alltrips_1
GROUP BY gtfs_date, dayofweek, hour, gtfs_route_id, gtfs_direction_id
"""
trips_per_route = spark.sql(query)
apcdata = apcdata.join(trips_per_route, on=['gtfs_date', 'dayofweek', 'hour', 'gtfs_route_id', 'gtfs_direction_id'], how='left')

In [None]:
# get scheduled trips per stop

# Same grouping as the per-route count, with stop_id added as an extra key.
# NOTE(review): assumes the GTFS stop_id uses the same identifiers as the APC
# stop_id -- confirm, otherwise the left join below yields only NULLs.
stop_level_keys = ['gtfs_date', 'dayofweek', 'hour', 'gtfs_route_id', 'gtfs_direction_id', 'stop_id']

query = """
SELECT gtfs_date, dayofweek, hour, gtfs_route_id, gtfs_direction_id, stop_id, count(trip_id) AS gtfs_number_of_scheduled_trips_at_stop
FROM alltrips_1
GROUP BY gtfs_date, dayofweek, hour, gtfs_route_id, gtfs_direction_id, stop_id
"""
trips_per_stop = spark.sql(query)
apcdata = apcdata.join(trips_per_stop, on=stop_level_keys, how='left')

## Headway
* Here we generate the actual and scheduled headways based on the arrival time and scheduled time in the APC data

In [None]:
# delay_time = scheduled_time - arrival_time, both cast to long (calculated in seconds).
# The value is positive when the arrival is earlier than scheduled.
scheduled_epoch = F.col("scheduled_time").cast("long")
arrival_epoch = F.col("arrival_time").cast("long")
apcdata = apcdata.withColumn("delay_time", scheduled_epoch - arrival_epoch)

In [None]:
# dwell_time = departure_time - arrival_time, both cast to long (calculated in seconds).
departure_epoch = F.col("departure_time").cast("long")
arrival_epoch = F.col("arrival_time").cast("long")
apcdata = apcdata.withColumn("dwell_time", departure_epoch - arrival_epoch)

In [None]:
# Scheduled headway: difference (in seconds) between a row's scheduled_time and
# the previous scheduled_time at the same stop, route, direction and date.
# The first row of each partition has no predecessor, so its headway is NULL.
sched_partition = ["transit_date", "route_id", "route_direction_name", "stop_id"]
windowSpec_sched = Window.partitionBy(*sched_partition).orderBy("scheduled_time")
apcdata = (
    apcdata
    .withColumn("prev_sched", F.lag("scheduled_time", 1).over(windowSpec_sched))
    .withColumn("sched_hdwy", F.col("scheduled_time").cast("long") - F.col("prev_sched").cast("long"))
)

In [None]:
# Actual headway: difference (in seconds) between a row's departure_time and
# the previous departure_time at the same stop, route, direction and date.
# The first row of each partition has no predecessor, so its headway is NULL.
actual_partition = ["transit_date", "route_id", "route_direction_name", "stop_id"]
windowSpec_actual = Window.partitionBy(*actual_partition).orderBy("departure_time")
apcdata = (
    apcdata
    .withColumn("prev_depart", F.lag("departure_time", 1).over(windowSpec_actual))
    .withColumn("actual_hdwy", F.col("departure_time").cast("long") - F.col("prev_depart").cast("long"))
)

In [None]:
# Classify every row by how the actual headway compares with the scheduled one.
# A flag is 1 only when its condition is true; otherwise (including when the
# condition evaluates to NULL) it is 0.
hdwy_diff = F.col('actual_hdwy') - F.col('sched_hdwy')
hdwy_ratio = F.col('actual_hdwy') / F.col('sched_hdwy')

# gapped: actual headway longer than scheduled and at least 1.5x as long
gapped_cond = (hdwy_diff > 0) & (hdwy_ratio >= 1.5)
# bunched: actual headway shorter than scheduled, ratio within [0, 0.5]
bunched_cond = (hdwy_diff < 0) & (hdwy_ratio >= 0) & (hdwy_ratio <= 0.5)
# target: ratio strictly between 0.5 and 1.5
target_cond = (hdwy_ratio > 0.5) & (hdwy_ratio < 1.5)

apcdata = (
    apcdata
    .withColumn('is_gapped', F.when(gapped_cond, 1).otherwise(0))
    .withColumn('is_bunched', F.when(bunched_cond, 1).otherwise(0))
    .withColumn('is_target', F.when(target_cond, 1).otherwise(0))
)

## Cleaning (note: I think the dataset has already been cleaned prior to this step)

In [None]:
# Count how many rows share the same trip / stop / vehicle identity.
duplicate_keys = ['transit_date','trip_id','route_id','route_direction_name','stop_id_original', 'stop_sequence','block_abbr','vehicle_id']
duplicates = apcdata.groupby(duplicate_keys).count()
# Blocks (per date) that contain at least one duplicated row, tagged with indicator=1.
todelete = (
    duplicates
    .filter('count >1')
    .select('transit_date','block_abbr')
    .distinct()
    .withColumn('indicator',F.lit(1))
)

In [None]:
# Print the `duplicates` DataFrame object, then the number of (date, block)
# pairs flagged in `todelete` (the count triggers a Spark job).
print(duplicates)
print(todelete.count())

In [None]:
# for null vehicle id -- remove the whole block
nullvehicleids=apcdata.filter('vehicle_id="NULL" or vehicle_id is null').select('transit_date','block_abbr').distinct()
nullvehicleids=nullvehicleids.withColumn('indicator',F.lit(1))
nullvehicleids.count()

In [None]:
# Find trips where every row has a NULL arrival_time, then reduce them to the
# distinct (date, block) pairs they belong to, tagged with indicator=1.
# Only arrival_time is inspected here, despite the variable name.
trip_vehicle_keys = ['transit_date', 'trip_id','vehicle_id','overload_id','block_abbr']
null_arrival_departure_times = (
    apcdata
    .groupBy(*trip_vehicle_keys)
    .agg(
        F.sum(F.col('arrival_time').isNull().cast("int")).alias('null_arrival_count'),
        F.count('*').alias('total_count'),
    )
    .filter('null_arrival_count = total_count')
    .select('transit_date','block_abbr')
    .distinct()
    .withColumn('indicator',F.lit(1))
)
null_arrival_departure_times.count()

In [None]:
# Display the first 5 rows of the merged frame as a final sanity check.
apcdata.show(5)

In [None]:
# Write the merged dataset as parquet, partitioned by year and month,
# replacing whatever already exists at the target path.
# NOTE(review): the output goes under <cwd>/data while the APC input was read
# from ../data -- confirm the intended output location.
fp = os.path.join(os.getcwd(), 'data', "cleaned-merged-wego-daily.apc.parquet")
(
    apcdata.write
    .partitionBy("year", 'month')
    .mode("overwrite")
    .parquet(fp)
)