In [None]:
# Import all of PySpark
import pyspark

# Import garbage collection module so we don't run out of memory while running the script
import gc

from pyspark.sql.functions import udf

In [None]:
###################################################################
# Bring in all necessary tables to run the script

In [None]:
# GTFS Trips: read the trips CSV (header row, Spark-inferred column types) and expose it to SQL
gtfs_trips = spark.read.csv(
    "s3://massdot-test-bucket/od_freq_calculation/trips_2020_spring_recap.csv",
    header=True,
    inferSchema=True,
)
gtfs_trips.createOrReplaceTempView("gtfs_trips")

In [None]:
# GTFS Routes: read the routes CSV (header row, Spark-inferred column types) and expose it to SQL
gtfs_routes = spark.read.csv(
    "s3://massdot-test-bucket/od_freq_calculation/routes_2020_spring_recap.csv",
    header=True,
    inferSchema=True,
)
gtfs_routes.createOrReplaceTempView("gtfs_routes")

In [None]:
# GTFS Stop Times
gtfs_stop_times_import = spark\
    .read.format("csv")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .load("s3://massdot-test-bucket/od_freq_calculation/stop_times_2020_spring_recap.csv")

# Parse the arrival time column into hours, minutes, and seconds
# split() yields string parts (e.g. "25:10:00" -> "25", "10", "00"); they are CAST to integers in the next step
split_col = pyspark.sql.functions.split(gtfs_stop_times_import['arrival_time'], ':')
gtfs_stop_times_parse = gtfs_stop_times_import.withColumn('hour', split_col.getItem(0)) \
       .withColumn('minute', split_col.getItem(1)) \
       .withColumn('second', split_col.getItem(2))

# Save the table with parsing to a SQL table
gtfs_stop_times_parse.createOrReplaceTempView("gtfs_stop_times_parse")

# Now check that times before 3AM have been converted to service day notation (>24h). If not, adjust them
# Only hour_adjust (and arrival_time_sec below) carries the +24 adjustment; the original string column
# `hour` is passed through unchanged by SELECT *.
# NOTE(review): later cells bucket events by `st.hour` (the unadjusted string), so an arrival written as
# "01:30:00" gets an arrival_time_sec in the 25th hour but keeps hour "01" - confirm this is intended.
gtfs_stop_times_adjust = spark.sql("""
    SELECT
         *
        ,CASE 
         WHEN CAST(hour AS integer) < 3 
         THEN CAST(hour AS integer) + 24 
         ELSE CAST(hour AS integer) END AS hour_adjust
        ,CAST(minute AS integer) AS minute_adjust
        ,CAST(second AS integer) AS second_adjust
    FROM gtfs_stop_times_parse
    """)

gtfs_stop_times_adjust.createOrReplaceTempView("gtfs_stop_times_adjust")

# Now convert the parsed values into seconds after midnight (of the service day, given the adjustment above)
gtfs_stop_times = spark.sql("""
    SELECT
         *
        ,(hour_adjust * 3600) + (minute_adjust * 60) + second_adjust AS arrival_time_sec
    FROM gtfs_stop_times_adjust
    """)

# Save the final, clean table to a SQL table
gtfs_stop_times.createOrReplaceTempView("gtfs_stop_times")

# Show a preview that the conversion worked
spark.sql("SELECT MAX(arrival_time), MIN(arrival_time), MAX(arrival_time_sec), MIN(arrival_time_sec) FROM gtfs_stop_times").show()

In [None]:
# GTFS Stops: read the stops CSV (header row, Spark-inferred column types) and expose it to SQL
gtfs_stops = spark.read.csv(
    "s3://massdot-test-bucket/od_freq_calculation/stops_2020_spring_recap.csv",
    header=True,
    inferSchema=True,
)
gtfs_stops.createOrReplaceTempView("gtfs_stops")

In [None]:
# GTFS Calendar Attributes: read the CSV (header row, Spark-inferred column types) and expose it to SQL
gtfs_calendar_attributes = spark.read.csv(
    "s3://massdot-test-bucket/od_freq_calculation/calendar_attributes_2020_spring_recap.csv",
    header=True,
    inferSchema=True,
)
gtfs_calendar_attributes.createOrReplaceTempView("gtfs_calendar_attributes")

In [None]:
# GTFS Calendar: read the calendar CSV (header row, Spark-inferred column types) and expose it to SQL
gtfs_calendar = spark.read.csv(
    "s3://massdot-test-bucket/od_freq_calculation/calendar_2020_spring_recap.csv",
    header=True,
    inferSchema=True,
)
gtfs_calendar.createOrReplaceTempView("gtfs_calendar")

In [None]:
# Hours to Bands: lookup of (day_type, start_hour, end_hour) -> band, used to label each stop event
bands_hours = spark.read.csv(
    "s3://massdot-test-bucket/od_freq_calculation/bands_hours.csv",
    header=True,
    inferSchema=True,
)
bands_hours.createOrReplaceTempView("bands_hours")

In [None]:
# Stop Grouping: child stop -> buffer_center mapping used to expand events to cluster centers
stop_grouping_2020_spring = spark.read.csv(
    "s3://massdot-test-bucket/od_freq_calculation/stop_grouping_2020_spring.csv",
    header=True,
    inferSchema=True,
)
stop_grouping_2020_spring.createOrReplaceTempView("stop_grouping_2020_spring")

In [None]:
###################################################################
# Let's try running OD Frequency from the beginning, bringing the original SQL script into PySpark

In [None]:
# For assigning services to stops
# For each master stop ID, find the routes/variants that serve it during each time period
# master_stop_id is the stop's parent_station when it has one, otherwise the stop_id itself.
# Only 'Weekday schedule' services whose calendar start_date is on or before 2020-03-21 are kept
# (NOTE(review): there is no end_date filter - confirm that is intended).
# `st.hour` is the unadjusted hour string produced by the stop-times parsing cell.

weekday_stop_events = spark.sql("""
    SELECT DISTINCT
         t.route_id
        ,t.direction_id
        ,t.trip_id
        ,t.shape_id
        ,st.stop_sequence
        ,CASE
         WHEN s.parent_station IS NULL THEN st.stop_id
         ELSE s.parent_station END AS master_stop_id
        ,st.hour
        ,st.arrival_time_sec
        ,ca.service_description
    FROM gtfs_trips t
        JOIN gtfs_routes r ON t.route_id = r.route_id
        JOIN gtfs_stop_times st ON t.trip_id = st.trip_id
        JOIN gtfs_stops s ON st.stop_id = s.stop_id
        JOIN gtfs_calendar_attributes ca ON t.service_id = ca.service_id
        JOIN gtfs_calendar c ON t.service_id = c.service_id
    WHERE
        ca.service_description = 'Weekday schedule'
        AND TO_DATE(CAST(c.start_date AS STRING), 'yyyyMMdd') <= '2020-03-21'
        """)

# Run some previews & checks
weekday_stop_events.createOrReplaceTempView("weekday_stop_events")
spark.sql("SELECT * FROM weekday_stop_events LIMIT 10").show()
spark.sql("SELECT COUNT(*) FROM weekday_stop_events").show()

# Check that ferry worked (ferry route_ids are assumed to start with 'Boat')
spark.sql("SELECT DISTINCT route_id FROM weekday_stop_events WHERE route_id LIKE 'Boat%' LIMIT 10").show()

In [None]:
# Do the same, but for Saturday
# Identical to the weekday query except for the 'Saturday schedule' service_description filter.
saturday_stop_events = spark.sql("""
    SELECT DISTINCT
         t.route_id
        ,t.direction_id
        ,t.trip_id
        ,t.shape_id
        ,st.stop_sequence
        ,CASE
         WHEN s.parent_station IS NULL THEN st.stop_id
         ELSE s.parent_station END AS master_stop_id
        ,st.hour
        ,st.arrival_time_sec
        ,ca.service_description
    FROM gtfs_trips t
        JOIN gtfs_routes r ON t.route_id = r.route_id
        JOIN gtfs_stop_times st ON t.trip_id = st.trip_id
        JOIN gtfs_stops s ON st.stop_id = s.stop_id
        JOIN gtfs_calendar_attributes ca ON t.service_id = ca.service_id
        JOIN gtfs_calendar c ON t.service_id = c.service_id
    WHERE
        ca.service_description = 'Saturday schedule'
        AND TO_DATE(CAST(c.start_date AS STRING), 'yyyyMMdd') <= '2020-03-21'
        """)

# Run some previews & checks
saturday_stop_events.createOrReplaceTempView("saturday_stop_events")
spark.sql("SELECT * FROM saturday_stop_events LIMIT 10").show()
spark.sql("SELECT COUNT(*) FROM saturday_stop_events").show()

# Check that CR worked (commuter rail route_ids are assumed to start with 'CR')
spark.sql("SELECT DISTINCT route_id FROM saturday_stop_events WHERE route_id LIKE 'CR%'").show()

In [None]:
# Do the same, but for Sunday
# Identical to the weekday query except for the 'Sunday schedule' service_description filter.
sunday_stop_events = spark.sql("""
    SELECT DISTINCT
         t.route_id
        ,t.direction_id
        ,t.trip_id
        ,t.shape_id
        ,st.stop_sequence
        ,CASE
         WHEN s.parent_station IS NULL THEN st.stop_id
         ELSE s.parent_station END AS master_stop_id
        ,st.hour
        ,st.arrival_time_sec
        ,ca.service_description
    FROM gtfs_trips t
        JOIN gtfs_routes r ON t.route_id = r.route_id
        JOIN gtfs_stop_times st ON t.trip_id = st.trip_id
        JOIN gtfs_stops s ON st.stop_id = s.stop_id
        JOIN gtfs_calendar_attributes ca ON t.service_id = ca.service_id
        JOIN gtfs_calendar c ON t.service_id = c.service_id
    WHERE
        ca.service_description = 'Sunday schedule'
        AND TO_DATE(CAST(c.start_date AS STRING), 'yyyyMMdd') <= '2020-03-21'
        """)

# Run some previews & checks
sunday_stop_events.createOrReplaceTempView("sunday_stop_events")
spark.sql("SELECT * FROM sunday_stop_events LIMIT 10").show()
spark.sql("SELECT COUNT(*) FROM sunday_stop_events").show()

# Check that ferry worked (ferry route_ids are assumed to start with 'Boat')
spark.sql("SELECT DISTINCT route_id FROM sunday_stop_events WHERE route_id LIKE 'Boat%' LIMIT 10").show()

In [None]:
# Now union all the day types
stop_events = spark.sql("""
    SELECT * FROM weekday_stop_events
    UNION ALL
    SELECT * FROM saturday_stop_events
    UNION ALL
    SELECT * FROM sunday_stop_events
    """)
# Each input view is already SELECT DISTINCT and each carries a different, non-NULL
# service_description (fixed by its WHERE clause), so the three row sets cannot overlap.
# UNION ALL therefore returns exactly the rows UNION would, without the extra
# de-duplication shuffle over the full stop-event set.

# Preview
stop_events.createOrReplaceTempView("stop_events")
spark.sql("SELECT DISTINCT service_description FROM stop_events").show()
spark.sql("SELECT COUNT(*) FROM stop_events").show()

In [None]:
# Show the Python garbage collector's per-generation object counts for this notebook process
# (this does not reflect Spark executor memory)
gc.get_count()

In [None]:
# Now we clear out the GTFS dataframes and the individual daytypes, to free memory
# NOTE(review): `del` only removes the Python names. Every one of these DataFrames was registered with
# createOrReplaceTempView, and those views stay queryable after `del` (later cells query views whose
# Python variables were already deleted). None of them was .cache()'d, so this probably frees little or
# no Spark memory. gc.collect() acts only on this Python process.
del gtfs_trips
del gtfs_routes
del gtfs_stop_times
del gtfs_stops
del gtfs_calendar_attributes
del gtfs_calendar
del weekday_stop_events
del saturday_stop_events
del sunday_stop_events
gc.collect()
gc.get_count()

In [None]:
# Now split up into hours and rename the schedules to day types
# This keeps only the columns used downstream (shape_id and service_description are dropped) and maps each
# service_description to a short day_type. The CASE has no ELSE, so any other description would yield a NULL
# day_type; upstream only these three schedules are selected.
all_events_format = spark.sql("""
    SELECT
         CASE
         WHEN service_description = 'Weekday schedule' THEN 'Weekday'
         WHEN service_description = 'Saturday schedule' THEN 'Saturday'
         WHEN service_description = 'Sunday schedule' THEN 'Sunday'
         END AS day_type
        ,route_id
        ,direction_id
        ,trip_id
        ,stop_sequence
        ,master_stop_id
        ,arrival_time_sec
        ,hour
    FROM stop_events
    """)

# Preview
all_events_format.createOrReplaceTempView("all_events_format")
spark.sql("SELECT * FROM all_events_format LIMIT 100").show()
# Drops only the Python reference; the "stop_events" temp view stays registered
del stop_events

In [None]:
# Now we join in the band labels so we can aggregate properly by hour, band, or MBTA time period moving forward
# day_type is taken from the event side (r), not the band side (b). On matched rows the two are equal by the
# join condition. On an event whose hour falls in no band, the LEFT JOIN leaves b.day_type NULL, and a NULL
# day_type would silently drop that event from the later origin/destination self-join (NULL never equals NULL).
all_events_bands = spark.sql("""
    SELECT
         b.band
        ,r.day_type
        ,r.route_id
        ,r.direction_id
        ,r.trip_id
        ,r.stop_sequence
        ,r.master_stop_id
        ,r.arrival_time_sec
        ,r.hour
    FROM all_events_format r
    LEFT JOIN bands_hours b ON
         r.day_type = b.day_type
         AND r.hour >= b.start_hour
         AND r.hour < b.end_hour
    """)

# Preview
all_events_bands.createOrReplaceTempView("all_events_bands")
spark.sql("SELECT * FROM all_events_bands LIMIT 100").show()

# Make sure you didn't lose or duplicate any records in the join
spark.sql("SELECT COUNT(*) FROM all_events_format").show() # 1057723
spark.sql("SELECT COUNT(*) FROM all_events_bands").show() # 1057723

# Events whose hour fell into no band (band IS NULL); expected 0 if bands_hours covers every hour in the feed
spark.sql("SELECT COUNT(*) FROM all_events_bands WHERE band IS NULL").show()

# Drops only the Python reference; the "all_events_format" temp view used above stays registered
del all_events_format

In [None]:
# You currently have all stop events that occur at the master ID (parent) level
# You now need to create the basis for linking this to the cluster level
# To get here, take the services we just pegged to each master ID and assign it to all cluster centers that encompass that master ID
# Each event is joined to every stop_grouping row whose child matches its master_stop_id, so one event can fan
# out to several buffer_centers. The LEFT JOIN leaves buffer_center NULL for a master stop with no grouping row;
# the NULL check below lists those.
cluster_service = spark.sql("""
    SELECT DISTINCT 
         g.buffer_center
        ,r.day_type
        ,r.band
        ,r.hour
        ,r.arrival_time_sec
        ,r.route_id
        ,r.direction_id
        ,r.trip_id
        ,r.master_stop_id
        ,r.stop_sequence
    FROM all_events_bands r
    LEFT JOIN stop_grouping_2020_spring g ON g.child = r.master_stop_id
    """)

# Preview
cluster_service.createOrReplaceTempView("cluster_service")
spark.sql("SELECT * FROM cluster_service LIMIT 100").show()
del all_events_bands

# Make sure every stop event got assigned to a buffer center properly
spark.sql("SELECT * FROM cluster_service WHERE buffer_center IS NULL").show()

# Preview the data, notice that the stop events get duplicated and assigned to multiple buffer centers
# This allows you to be flexible with your OD and get nearby service as well as service at the queried stop itself
spark.sql("SELECT * FROM cluster_service WHERE day_type = 'Weekday' ORDER BY master_stop_id, day_type, hour, arrival_time_sec, buffer_center LIMIT 10000").show()


In [None]:
# Now take the relevant columns and save them as the universe of potential origins
# Same rows as cluster_service, minus master_stop_id (no DISTINCT here, so row multiplicity is unchanged)
origins = spark.sql("""
    SELECT
         buffer_center
        ,day_type
        ,band
        ,hour
        ,arrival_time_sec
        ,route_id
        ,direction_id
        ,trip_id
        ,stop_sequence
    FROM cluster_service
    """)

# Preview
origins.createOrReplaceTempView("origins")
spark.sql("SELECT * FROM origins LIMIT 100").show()

In [None]:
# Do the same, saving as the universe of potential destinations
# The query is identical to the origins query; a second view name lets the self-join below read as origins o / destinations d
destinations = spark.sql("""
    SELECT
         buffer_center
        ,day_type
        ,band
        ,hour
        ,arrival_time_sec
        ,route_id
        ,direction_id
        ,trip_id
        ,stop_sequence
    FROM cluster_service
    """)

# Preview
destinations.createOrReplaceTempView("destinations")
spark.sql("SELECT * FROM destinations LIMIT 100").show()

In [None]:
# Now take these two tables and join all possible combinations to one another
# A combination is valid if:
### The origin and the destination row correspond to the same trip, direction, and day type
### The origin stop comes before the destination stop on that match
# You can also add an elapsed time check here, if you'd like (see original SQL query - v7)
# elapsed_time is in seconds (difference of arrival_time_sec values).
# NOTE(review): nothing excludes buffer_center_origin = buffer_center_dest, so a trip stopping twice inside
# one cluster yields a self-OD edge - confirm that is intended.
od_edges = spark.sql("""
    SELECT
         o.buffer_center AS buffer_center_origin
         ,o.day_type AS day_type_origin
         ,o.band AS band_origin
         ,o.hour AS hour_origin
         ,o.arrival_time_sec AS arrival_time_origin
         ,o.route_id AS route_id_origin
         ,o.direction_id AS direction_id_origin
         ,o.trip_id AS trip_id_origin
         ,o.stop_sequence AS stop_sequence_origin
         ,d.buffer_center AS buffer_center_dest
         ,d.day_type AS day_type_dest
         ,d.band AS band_dest
         ,d.hour AS hour_dest
         ,d.arrival_time_sec AS arrival_time_dest
         ,d.route_id AS route_id_dest
         ,d.direction_id AS direction_id_dest
         ,d.trip_id AS trip_id_dest
         ,d.stop_sequence AS stop_sequence_dest
         ,d.arrival_time_sec - o.arrival_time_sec AS elapsed_time
    FROM origins o
    JOIN destinations d 
        ON o.day_type = d.day_type 
        AND o.route_id = d.route_id 
        AND o.trip_id = d.trip_id 
        AND o.direction_id = d.direction_id
    WHERE o.stop_sequence < d.stop_sequence
    """)

# Preview
od_edges.createOrReplaceTempView("od_edges")
spark.sql("SELECT * FROM od_edges LIMIT 100").show()
# These only drop Python references; the underlying temp views stay registered
del cluster_service
del origins
del destinations

# Test it out
spark.sql("""
    SELECT * 
    FROM od_edges
    WHERE buffer_center_origin = 'place-coecl' AND buffer_center_dest = 'place-nuniv'
    ORDER BY buffer_center_origin, buffer_center_dest, arrival_time_origin LIMIT 1000
    """).show()

In [None]:
# Show the Python garbage collector's per-generation object counts for this notebook process
gc.get_count()

In [None]:
# Run a full Python garbage collection on this notebook process, then show the per-generation counts
gc.collect()
gc.get_count()

In [None]:
# Now, for each OD in each hour, we want to include only the first occurrence of a trip in the events table
# This will prevent multiple stop events within the origin on the same trip (e.g. loops or nearby sequential stops) from getting counted as a headway
# Note that if the trip stops twice within the cluster at the turn of the time period, these events will be counted separately in each respective period
# Trip ID should be unique on its own, but we include route ID and direction ID just to be safe
# Result: one row per (day type, OD pair, origin hour, trip) holding the earliest origin arrival (MIN, in seconds).
no_duplicates_intermediate = spark.sql("""
    SELECT
         day_type_origin
        ,buffer_center_origin
        ,buffer_center_dest
        ,hour_origin
        ,trip_id_origin
        ,route_id_origin
        ,direction_id_origin
        ,MIN(arrival_time_origin) AS arrival_time_origin
    FROM od_edges
    GROUP BY
         day_type_origin
        ,buffer_center_origin
        ,buffer_center_dest
        ,hour_origin
        ,trip_id_origin
        ,route_id_origin
        ,direction_id_origin
        """)

# Preview
no_duplicates_intermediate.createOrReplaceTempView("no_duplicates_intermediate")
spark.sql("SELECT * FROM no_duplicates_intermediate LIMIT 100").show()
del od_edges

In [None]:
# Now that we have made sure we have only meaningful stop events, we further filter
# We no longer care about specific trip/route IDs
# Now we just want the distinct stop events that happen at each time (simultaneous service provides no additional value; no 0-minute headways)
# Two trips departing the same origin cluster at the same second for the same destination collapse into one row.
distinct_events = spark.sql("""
    SELECT DISTINCT
         day_type_origin
        ,buffer_center_origin
        ,buffer_center_dest
        ,hour_origin
        ,arrival_time_origin
    FROM no_duplicates_intermediate
    """)

# Preview
distinct_events.createOrReplaceTempView("distinct_events")
spark.sql("SELECT * FROM distinct_events LIMIT 100").show()
del no_duplicates_intermediate

In [None]:
# Now we count up the number of edges between each OD, post cleaning
# We do this here instead of as a partition in the distinct_events table because distinct_events is still cleaning out dups we don't want counted
# connection_count = number of distinct origin arrival times per (day type, OD pair, origin hour)
od_instance_count = spark.sql("""
    SELECT
         day_type_origin
        ,buffer_center_origin
        ,buffer_center_dest
        ,hour_origin
        ,COUNT(*) AS connection_count
    FROM distinct_events
    GROUP BY
         day_type_origin
        ,buffer_center_origin
        ,buffer_center_dest
        ,hour_origin
    """)

# Preview
od_instance_count.createOrReplaceTempView("od_instance_count")
spark.sql("SELECT * FROM od_instance_count LIMIT 100").show()

In [None]:
# Attach each event's group connection_count. od_instance_count is grouped on exactly these four keys from
# distinct_events, so each event should match one count row (the row-count check below confirms nothing is duplicated).
distinct_events_w_counts = spark.sql("""
    SELECT 
         d.day_type_origin
        ,d.buffer_center_origin
        ,d.buffer_center_dest
        ,d.hour_origin
        ,d.arrival_time_origin
        ,c.connection_count
    FROM distinct_events d
    LEFT JOIN od_instance_count c
        ON d.day_type_origin = c.day_type_origin
        AND d.buffer_center_origin = c.buffer_center_origin
        AND d.buffer_center_dest = c.buffer_center_dest
        AND d.hour_origin = c.hour_origin
    """)

# Preview
distinct_events_w_counts.createOrReplaceTempView("distinct_events_w_counts")
spark.sql("SELECT * FROM distinct_events_w_counts LIMIT 100").show()

# Check that counts match
spark.sql("SELECT COUNT(*) FROM distinct_events_w_counts").show() #118,891,660
spark.sql("SELECT COUNT(*) FROM distinct_events").show() #118,891,660
del distinct_events
del od_instance_count

In [None]:
# Show the Python garbage collector's per-generation object counts for this notebook process
gc.get_count()

In [None]:
# Run a full Python garbage collection on this notebook process, then show the per-generation counts
gc.collect()
gc.get_count()

In [None]:
# Checkpoint distinct_events_w_counts to S3 (CSV with a header row) so the pipeline can resume from here.
# The writer uses Spark's default save mode, so re-running this cell fails if the path already exists.
distinct_events_w_counts.write \
    .option("header", "true") \
    .csv("s3://massdot-test-bucket/checkpoint_v04_12_a_distinct_events_w_counts_2020_spring_recap.csv")

In [None]:
# Filter to where there was only one connection
# These OD/hour groups have a single departure, so no headway can be computed; they get a fixed wait later.
single_connection_intermediate = spark.sql("""
    SELECT * 
    FROM distinct_events_w_counts
    WHERE connection_count = 1
    """)

# Preview
single_connection_intermediate.createOrReplaceTempView("single_connection_intermediate")
spark.sql("SELECT * FROM single_connection_intermediate LIMIT 100").show()

# Check how many single connection OD pairs there were on weekdays
spark.sql("SELECT COUNT(*) FROM single_connection_intermediate WHERE day_type_origin = 'Weekday'").show()

In [None]:
# Checkpoint single_connection_intermediate to S3 (CSV with a header row) so the pipeline can resume from here.
# The writer uses Spark's default save mode, so re-running this cell fails if the path already exists.
single_connection_intermediate.write \
    .option("header", "true") \
    .csv("s3://massdot-test-bucket/checkpoint_v04_12_a_single_connection_intermediate_2020_spring_recap.csv")

In [None]:
# Now for where there were multiple connections, we get the lag for the headways
# previous_arrival is the prior distinct origin arrival within the same (day type, OD pair, origin hour);
# the earliest event in each group has no predecessor, so its previous_arrival is NULL.
all_events_lag = spark.sql("""
    SELECT
        day_type_origin
       ,buffer_center_origin
       ,buffer_center_dest
       ,hour_origin
       ,arrival_time_origin
       ,LAG(arrival_time_origin,1) OVER (
          PARTITION BY day_type_origin, buffer_center_origin, buffer_center_dest, hour_origin
          ORDER BY arrival_time_origin
       ) previous_arrival
    FROM distinct_events_w_counts
    WHERE connection_count > 1
    """)

# Preview
all_events_lag.createOrReplaceTempView("all_events_lag")
spark.sql("SELECT * FROM all_events_lag LIMIT 100").show()
#del distinct_events_w_counts

In [None]:
# Now we get the headway at each ODT where there remains more than one connection
# headway_minutes = gap in seconds between consecutive arrivals, divided by 60 (Spark SQL `/` returns a
# fractional value); it is NULL on each group's first event because previous_arrival is NULL there.
headways = spark.sql("""
    SELECT
        day_type_origin
       ,buffer_center_origin
       ,buffer_center_dest
       ,hour_origin
       ,arrival_time_origin
       ,previous_arrival
       ,(arrival_time_origin - previous_arrival)/60 AS headway_minutes
    FROM all_events_lag
    """)

# Preview
headways.createOrReplaceTempView("headways")
spark.sql("SELECT * FROM headways LIMIT 100").show()
#del all_events_lag

In [None]:
# Aggregate headways per (day type, OD pair, origin hour):
#   avg_expected_wait_time = SUM(h^2) / SUM(h), where h = headway_minutes
#   headway_count          = number of non-NULL headways (each group's first event has none)
# NOTE(review): the usual random-arrival expected wait is SUM(h^2) / (2 * SUM(h)); this expression omits the 1/2.
# Confirm that is intended (e.g. to stay comparable with the earlier SQL version this output is checked against).
headway_calculations_intermediate = spark.sql("""
    SELECT
         day_type_origin
        ,buffer_center_origin
        ,buffer_center_dest
        ,hour_origin
        ,CAST(SUM(headway_minutes * headway_minutes) AS DOUBLE) / CAST(SUM(headway_minutes) AS DOUBLE) AS avg_expected_wait_time
        ,COUNT(*) AS headway_count
    FROM headways
    WHERE headway_minutes IS NOT NULL
    GROUP BY 
         day_type_origin
        ,buffer_center_origin
        ,buffer_center_dest
        ,hour_origin
    """)

# Preview
headway_calculations_intermediate.createOrReplaceTempView("headway_calculations_intermediate")
spark.sql("SELECT * FROM headway_calculations_intermediate LIMIT 100").show()
#del headways

In [None]:
# Save to S3 for reference - then you can always just rerun the script from this point
# The path uses the same "checkpoint_v04_12_a_" prefix as the other two checkpoints from this run, so all
# three can be found (and reloaded) under one naming scheme.
# The writer uses Spark's default save mode, so re-running this cell fails if the path already exists.
headway_calculations_intermediate.write.csv("s3://massdot-test-bucket/checkpoint_v04_12_a_headway_calculations_intermediate_2020_spring_recap.csv", header = 'true')

In [None]:
# Now we have two reference tables calculated. We need to put them in the proper format
# This is the reference table for single-connection ODs: single_connection_intermediate
# This is the reference table for ODs with more than one connection per hour: headway_calculations_intermediate

In [None]:
# Now we need to edit the output tables we have to fit the format of the combined table
# First let's take care of the single connections table
# Single-connection ODs have no headway, so headway_count is 0 and expected_wait_time is a fixed 45.
# TODO(review): document where the 45 (minutes, by comparison with avg_expected_wait_time) comes from.
single_connection_format = spark.sql("""
    SELECT
         day_type_origin AS day_type
        ,buffer_center_origin AS origin
        ,buffer_center_dest AS destination
        ,hour_origin AS hour
        ,0 AS headway_count
        ,45 AS expected_wait_time
    FROM single_connection_intermediate
    """)

# Preview
single_connection_format.createOrReplaceTempView("single_connection_format")
spark.sql("SELECT * FROM single_connection_format LIMIT 100").show()
#del single_connection_intermediate

In [None]:
# Now we do the same as above for the other output table with headways
# Column order must match single_connection_format, because the next cell combines them with a positional UNION ALL.
headway_table_format = spark.sql("""
    SELECT
         day_type_origin AS day_type
        ,buffer_center_origin AS origin
        ,buffer_center_dest AS destination
        ,hour_origin AS hour
        ,headway_count
        ,avg_expected_wait_time AS expected_wait_time
    FROM headway_calculations_intermediate
    """)

# Preview
headway_table_format.createOrReplaceTempView("headway_table_format")
spark.sql("SELECT * FROM headway_table_format LIMIT 100").show()
#del headway_calculations_intermediate

In [None]:
# Now we need to bring these tables together into one
# The two inputs come from connection_count = 1 and connection_count > 1 respectively, so their keys should
# not overlap; the duplicate check further down verifies this.
od_frequency = spark.sql("""
    SELECT * FROM single_connection_format
    UNION ALL
    SELECT * FROM headway_table_format
    """)

# Preview
od_frequency.createOrReplaceTempView("od_frequency")
spark.sql("SELECT * FROM od_frequency LIMIT 100").show()
#del single_connection_format
#del headway_table_format

In [None]:
# Persist the final OD frequency table to S3 as CSV with a header row.
# The writer uses Spark's default save mode, so re-running this cell fails if the path already exists.
od_frequency.write \
    .option("header", "true") \
    .csv("s3://massdot-test-bucket/od_frequency_2020_spring_recap_20210413.csv")

In [None]:
# Make sure each OD has only one score
# Counts rows per (origin, destination, hour, day_type); the show() below should print an empty table.
test_no_dup = spark.sql("""
    SELECT 
         origin
        ,destination
        ,hour
        ,day_type
        ,COUNT(*) AS count
    FROM od_frequency
    GROUP BY origin, destination, hour, day_type
    """)

# Preview
test_no_dup.createOrReplaceTempView("test_no_dup")
spark.sql("SELECT * FROM test_no_dup WHERE count > 1").show()

In [None]:
# Check how many records are in the final table
# Expected value comes from the equivalent query against the earlier SQL (v7) output:
# SELECT COUNT(*) FROM jwoltal.od_frequency_spring_2020_v7 --45,378,131
spark.sql("SELECT COUNT(*) FROM od_frequency").show() # 45,378,131

In [None]:
# Check a sample OD: origin '81852' to three nearby destinations, weekday 8AM hour
spark.sql("""
    SELECT *
    FROM od_frequency
    WHERE
        origin = '81852'
        AND destination IN ('91852', 'place-NB-0120', 'Needham Junction')
        AND hour = 8
        AND day_type = 'Weekday'
    """).show()

In [None]:
# Check a sample OD: place-coecl to place-nuniv, weekday 8AM hour (compared with the old table below)
spark.sql("""
    SELECT *
    FROM od_frequency
    WHERE
        origin = 'place-coecl'
        AND destination = 'place-nuniv'
        AND hour = 8
        AND day_type = 'Weekday'
    """).show()

In [None]:
# Load the older frequency output computed on the research server, to spot-check it against the new results.
# Read as CSV with a header row and Spark-inferred column types, then exposed to SQL as old_freq_table.
df = spark.read.csv(
    "s3://massdot-test-bucket/frequency",
    header=True,
    inferSchema=True,
)
df.createOrReplaceTempView("old_freq_table")

In [None]:
# Check a sample OD in the old data
# It's a match.  So was the overall record count.
# Same filter as the new-table sample check for place-coecl -> place-nuniv.
spark.sql("""
    SELECT *
    FROM old_freq_table
    WHERE
        origin = 'place-coecl'
        AND destination = 'place-nuniv'
        AND hour = 8
        AND day_type = 'Weekday'
    """).show()

In [None]:
# Join everything together and find the difference between the values
# LEFT JOIN from the new table: a new OD/hour with no old counterpart gets NULL old values and a NULL delta
# (so the `delta > .1` checks do not count it), and rows present only in the old table are not shown here.
diff = spark.sql("""
    SELECT 
         a.day_type
        ,a.origin
        ,a.destination
        ,a.hour
        ,a.headway_count AS headway_count_new
        ,a.expected_wait_time AS expected_wait_new
        ,b.headway_count AS headway_count_old
        ,b.expected_wait_time AS expected_wait_old
        ,ABS(b.expected_wait_time - a.expected_wait_time) AS delta
    FROM od_frequency a 
    LEFT JOIN old_freq_table b 
        ON a.day_type = b.day_type
        AND a.origin = b.origin
        AND a.destination = b.destination
        AND a.hour = b.hour
    """)
    
diff.createOrReplaceTempView("diff")
spark.sql("SELECT * FROM diff").show()

In [None]:
# Check where the difference is not just a rounding error based on different versions of SQL
# Note that the output calculated here is very infrequently different from that calculated using v6 in the research server (which is what was loaded onto S3)
# I investigated the cause of these edge cases
# v6 did not properly fix the parent-child combinations more than 100m apart issue.  This was fixed in v7, so the dataframes should match there
# The values calculated here take this into account properly.  I have also confirmed an example using the raw GTFS schedule
# A file testing this is saved in "C:\Users\woltalj\Box\Ongoing Activities\SQL Queries\Service Delivery Policy 2020\Frequency\OD Frequency\OD Frequency S3\S3-MGHPCC Diff Check.xlsx"
# 0.1 is the tolerance on |old - new| expected wait; rows with a NULL delta (no old match) are not counted.
spark.sql("SELECT COUNT(*) FROM diff WHERE delta > .1").show()
spark.sql("SELECT * FROM diff WHERE delta > .1").show()

In [None]:
# Export the OD/hour rows whose new and old expected waits differ by more than 0.1, for offline review
error = spark.sql("SELECT * FROM diff WHERE delta > .1")
error.write \
    .option("header", "true") \
    .csv("s3://massdot-test-bucket/method_delta.csv")

In [None]:
######################## How to pick up at a checkpoint

In [None]:
# Resume from an S3 checkpoint: this cell reloads headway_calculations_intermediate.
#
# v2 checkpoints available on S3:
## "s3://massdot-test-bucket/checkpoint_v2_distinct_events_w_counts_2020_spring_recap.csv"
## "s3://massdot-test-bucket/checkpoint_v2_headway_calculations_intermediate_2020_spring_recap.csv"
## "s3://massdot-test-bucket/checkpoint_v2_single_connection_intermediate_2020_spring_recap.csv"
df = spark.read.csv(
    "s3://massdot-test-bucket/checkpoint_v2_headway_calculations_intermediate_2020_spring_recap.csv",
    header=True,
    inferSchema=True,
)

# Register the data under the view name the downstream cells query, then continue running above.
# Valid view names (one per checkpoint):
## distinct_events_w_counts
## headway_calculations_intermediate
## single_connection_intermediate
df.createOrReplaceTempView("headway_calculations_intermediate")

In [None]:
# Preview the reloaded headway_calculations_intermediate view (first 20 rows)
spark.table("headway_calculations_intermediate").show()

In [None]:
# Load the distinct_events_w_counts checkpoint from S3
#
# Checkpoints written by the save cells in this notebook (v04_12_a run):
## "s3://massdot-test-bucket/checkpoint_v04_12_a_distinct_events_w_counts_2020_spring_recap.csv"
## "s3://massdot-test-bucket/checkpoint_v04_12_a_single_connection_intermediate_2020_spring_recap.csv"
## (plus the headway_calculations_intermediate checkpoint written by its own save cell)
# Older v2 checkpoints:
## "s3://massdot-test-bucket/checkpoint_v2_distinct_events_w_counts_2020_spring_recap.csv"
## "s3://massdot-test-bucket/checkpoint_v2_headway_calculations_intermediate_2020_spring_recap.csv"
## "s3://massdot-test-bucket/checkpoint_v2_single_connection_intermediate_2020_spring_recap.csv"
# NOTE(review): inferSchema re-types columns on reload (e.g. an hour string like "08" may come back as the
# integer 8), so downstream output can be formatted differently than in a full run - confirm that is acceptable.
df = spark\
    .read.format("csv")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .load("s3://massdot-test-bucket/checkpoint_v04_12_a_distinct_events_w_counts_2020_spring_recap.csv")

# Register it under the view name the downstream cells query, then continue running above.
# This file holds distinct_events_w_counts, so it must be registered under that name
# (the other checkpoints map to headway_calculations_intermediate and single_connection_intermediate).
df.createOrReplaceTempView("distinct_events_w_counts")

In [None]:
# Preview the reloaded distinct_events_w_counts view (first 20 rows)
spark.table("distinct_events_w_counts").show()

In [None]:
# Load the single_connection_intermediate checkpoint from S3
#
# Checkpoints written by the save cells in this notebook (v04_12_a run):
## "s3://massdot-test-bucket/checkpoint_v04_12_a_distinct_events_w_counts_2020_spring_recap.csv"
## "s3://massdot-test-bucket/checkpoint_v04_12_a_single_connection_intermediate_2020_spring_recap.csv"
## (plus the headway_calculations_intermediate checkpoint written by its own save cell)
# Older v2 checkpoints:
## "s3://massdot-test-bucket/checkpoint_v2_distinct_events_w_counts_2020_spring_recap.csv"
## "s3://massdot-test-bucket/checkpoint_v2_headway_calculations_intermediate_2020_spring_recap.csv"
## "s3://massdot-test-bucket/checkpoint_v2_single_connection_intermediate_2020_spring_recap.csv"
# NOTE(review): inferSchema re-types columns on reload (e.g. an hour string like "08" may come back as the
# integer 8), so downstream output can be formatted differently than in a full run - confirm that is acceptable.
df = spark\
    .read.format("csv")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .load("s3://massdot-test-bucket/checkpoint_v04_12_a_single_connection_intermediate_2020_spring_recap.csv")

# Register it under the view name the downstream cells query, then continue running above.
# This file holds single_connection_intermediate, so it must be registered under that name
# (the other checkpoints map to distinct_events_w_counts and headway_calculations_intermediate).
df.createOrReplaceTempView("single_connection_intermediate")

In [None]:
# Preview the reloaded single_connection_intermediate view (first 20 rows)
spark.table("single_connection_intermediate").show()

In [None]:
# Check a sample OD directly in the (possibly reloaded) distinct_events_w_counts view:
# origin '81852' to three nearby destinations, weekday 8AM hour, using the pre-rename *_origin/_dest columns
spark.sql("""
    SELECT *
    FROM distinct_events_w_counts
    WHERE
        buffer_center_origin = '81852'
        AND buffer_center_dest IN ('91852', 'place-NB-0120', 'Needham Junction')
        AND hour_origin = 8
        AND day_type_origin = 'Weekday'
    """).show()