# To Begin With...

### Name your spark application as `GASPAR_final` or `GROUP_NAME_final`.

<div class='alert alert-info'><b>Any application without a proper name would be promptly killed.</b></div>

In [1]:
%%configure
{
    "executorMemory": "40g",
    "driverMemory":"16g",
    "driverCores":16,
    "executorCores": 8,
    "conf": {
        "spark.app.name": "lgyy_final"
    }
}

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
9463,application_1589299642358_4053,pyspark,idle,Link,Link,
9470,application_1589299642358_4061,pyspark,dead,Link,Link,
9472,application_1589299642358_4064,pyspark,idle,Link,Link,
9473,application_1589299642358_4065,pyspark,idle,Link,Link,
9475,application_1589299642358_4067,pyspark,idle,Link,Link,
9476,application_1589299642358_4068,pyspark,idle,Link,Link,
9479,application_1589299642358_4071,pyspark,idle,Link,Link,
9480,application_1589299642358_4072,pyspark,busy,Link,Link,
9481,application_1589299642358_4073,pyspark,idle,Link,Link,
9484,application_1589299642358_4076,pyspark,busy,Link,Link,


In [2]:
from datetime import datetime, timedelta
import pyspark.sql.functions as functions
from pyspark.sql.types import BooleanType, FloatType, IntegerType, ArrayType, LongType
from pyspark.sql.window import Window
import datetime
import networkx as nx
import math
import pickle
import random
import numpy as np
import pandas as pd
import functools
import copy

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
9499,application_1589299642358_4092,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Start Spark

In [3]:
# Initialization

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Read the [SBB actual data](https://opentransportdata.swiss/en/dataset/istdaten) in ORC format

In [4]:
sbb = spark.read.orc('/data/sbb/orc/istdaten')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
sbb.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- betriebstag: string (nullable = true)
 |-- fahrt_bezeichner: string (nullable = true)
 |-- betreiber_id: string (nullable = true)
 |-- betreiber_abk: string (nullable = true)
 |-- betreiber_name: string (nullable = true)
 |-- produkt_id: string (nullable = true)
 |-- linien_id: string (nullable = true)
 |-- linien_text: string (nullable = true)
 |-- umlauf_id: string (nullable = true)
 |-- verkehrsmittel_text: string (nullable = true)
 |-- zusatzfahrt_tf: string (nullable = true)
 |-- faellt_aus_tf: string (nullable = true)
 |-- bpuic: string (nullable = true)
 |-- haltestellen_name: string (nullable = true)
 |-- ankunftszeit: string (nullable = true)
 |-- an_prognose: string (nullable = true)
 |-- an_prognose_status: string (nullable = true)
 |-- abfahrtszeit: string (nullable = true)
 |-- ab_prognose: string (nullable = true)
 |-- ab_prognose_status: string (nullable = true)
 |-- durchfahrt_tf: string (nullable = true)

# Summary of project

**The link of our video:** https://www.youtube.com/watch?v=9uN3iFn2rzU&feature=youtu.be

### 1, Team members: Lei Yan, Yueran Liang, Saibo Geng, Chun-Hung Yeh

### 2. Work distributions:

Part I Data preprocessing:                                         Yeh & Yan

Part II Obtaining historical delay data:                           Yan

Part III Modelling transport infrastructure:                       Liang

Part IV Basic Route Planning Algorithm:                            Geng & Liang

Part V Evaluate the route uncertainty based on historical delay:   Yan & Geng

Code review:                                                       Yeh

## [Part I] Data preprocessing

### Summary

In this part, we do the following data cleaning operations:

- timetable:
    - Construct the timetable dataset by joining the stops, stop_times, calendar, trips, routes tables
    - Keep only rows with stops inside Zurich neighborhood (< 15km from Zurich HB)
    - Only keep services that run at least from Mon to Fri
    - Only keep the schedule in the reasonable hours (7:00 to 20:00)
    
- sbb:
    - Keep only rows with stops inside Zurich neighborhood (< 15km from Zurich HB)
    - Reformat ill-formatted stop names to match it with those in timetable 

### Cleaning timetable

In [6]:
# helper functions

# Calculate distance using (long, lat)
def hav(x):
    """haversine function """
    return (1 - math.cos(x)) / 2

def distance(long1, lat1, long2, lat2):
    """
    Compute the distance in kms between two locations given their coordinates (longitude, latitude).
    """
    # convert decimal degrees to radians 
    long1, long2, lat1, lat2 = [math.radians(x) for x in [long1, long2, lat1, lat2]]
    
    # earth radius
    r = 6371 
    
    # haversine formula
    return 2*r*math.asin(math.sqrt(hav(lat2 - lat1) + math.cos(lat1) * math.cos(lat2) * hav(long2-long1)))

# coordinate of Zurich HB
lat_zurich = 47.378177
lon_zurich = 8.540192

# Calulate distance to Zurich HB
def get_distance(lon, lat):
    return distance(lon, lat, lon_zurich, lat_zurich)

distance_getter = functions.udf(lambda x,y: get_distance(x,y), FloatType())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
### load and clean stops
stops = spark.read.orc("hdfs:///data/sbb/timetables/orc/stops")

# keep only the stops that are located < 15km from Zurich HB
stops = stops.withColumn('distance', distance_getter('stop_lon', 'stop_lat'))
stops = stops.where(stops.distance < 15)


### Construct station table
# Construct a table of stations(stations can have single or multiple stops/platforms),
# will be used to construct the "walking distance between stations" network
# Get list of stops of each station
stations = stops.groupBy('stop_name').agg(functions.collect_list('stop_id').alias('stops'),
                                          functions.mean('stop_lat').alias('station_lat'),
                                          functions.mean('stop_lon').alias('station_lon'))
stations = stations.withColumnRenamed('stop_name', 'station_name')

### For fellow developers:
# The stations table can be then used to construct the "walking distance between stations" network
# In route planning algorithm the table can be indexed by "station_name" (same as "stop_name" in stops table)
# and the list of all stops of the same station can be looked up.

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
# Test
stations.where(stations.station_name == 'Zufikon Belvédère').show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------------+--------------------+------------------+----------------+
|     station_name|               stops|       station_lat|     station_lon|
+-----------------+--------------------+------------------+----------------+
|Zufikon Belvédère|[8502268, 8502268...|47.357611660093596|8.35923694492646|
+-----------------+--------------------+------------------+----------------+

In [9]:
### Create and clean timetable
stop_times = spark.read.orc("hdfs:///data/sbb/timetables/orc/stop_times")
calendar = spark.read.orc("hdfs:///data/sbb/timetables/orc/calendar")
trips = spark.read.orc("hdfs:///data/sbb/timetables/orc/trips")
routes = spark.read.orc("hdfs:///data/sbb/timetables/orc/routes")

# Only keep services that run at least from Mon to Fri
calendar = calendar.where(calendar.monday & calendar.tuesday & calendar.wednesday
                                          & calendar.thursday & calendar.friday)

# merge to a suitable timetable,
# used to construct the "zurich transport" network
timetable = stop_times.join(stops, on = ['stop_id'], how = 'inner')\
                      .join(trips, on = ['trip_id'], how = 'inner')\
                      .join(calendar, on = ['service_id'], how = 'inner')\
                      .join(routes, on = ['route_id'], how = 'inner')

# Only keep the schedule in the reasonable hours (7:00 to 20:00)
timetable = timetable.where(((functions.hour(timetable.departure_time) >= 7) & (functions.hour(timetable.departure_time) <= 19)) |
                ((functions.hour(timetable.arrival_time) >= 7) & (functions.hour(timetable.arrival_time) <= 19)))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Cleaning istdaten(sbb) dataset

In [10]:
### rename attributes from German to English
fields = {
    'betriebstag':'date',
    'fahrt_bezeichner':'trip_id',
    'produkt_id':'transport_type',
    'linien_id':'train_id',
    'linien_text':'line',
    'verkehrsmittel_text':'train_type',
    'zusatzfahrt_tf':'additional_trip',
    'faellt_aus_tf':'trip_failed',
    'haltestellen_name':'stop_name',
    'bpuic':'stop_id',
    'ankunftszeit':'schedule_arrival',
    'an_prognose':'real_arrival',
    'an_prognose_status':'arr_forecast_status',
    'abfahrtszeit':'schedule_dep',
    'ab_prognose':'real_dep',
    'ab_prognose_status':'dep_forecast_status',
    'durchfahrt_tf':'no_stop_here'
}

df = sbb.selectExpr([k + ' as ' + fields[k] for k in fields])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
### Clean up table sbb
# keep only the rows with stops near Zurich:
#     Join table sbb with table stops to filter out sbb data outside the zurich region (>15 km from Zurich HB)
#     Both table have column "stop_name"
#     In sbb, "stop_name" is not clean, it has duplicates like "Zurich, Balgrist" and "Zurich Balgrist"
#     In stops, it is clean. Thus we remove the dirty "stop_name" from sbb before join

# Construct a table containing stop ids in the format of the stop ids in sbb
def get_sbb_stop_id(id):
    return id.split(':')[0]

sbb_stop_id_getter = functions.udf(lambda x: get_sbb_stop_id(x))
temp_stops = stops.withColumn('stop_id', sbb_stop_id_getter('stop_id'))
temp_stops = temp_stops.groupBy('stop_id', 'stop_name').count().withColumnRenamed('count', 'num')
temp_stops = temp_stops.drop('num')

df_no_name = df.drop('stop_name')
df1 = df_no_name.join(temp_stops, 'stop_id', 'inner')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
#df1.write.format("orc").save(path="hdfs:///user/lyan/temp_data/joined_sbb_stops.orc", mode="overwrite")

### This process takes a couple of minutes, use the following line to read the results directly from HDFS
sbb = spark.read.orc("hdfs:///user/lyan/temp_data/joined_sbb_stops.orc")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## [Part II] Get delay info

### Summary

We assume arrival delays occured at the same stop, of the same transport line, at the same arrival time follow the same
probability distribution. A "transport line" refers to a certain line of a certain type of transport of a certain transport agency.

In this part, we retrieve the list of historical arrival delays for each {transport line, stop, arrival time} triple based on
historical data from the sbb dataset. This information will be used to build the predictive model for predicting arrival delays of a stop of a trip (i.e, a row in timetable).

#### Step 1: Obtaining the triple

"transport line" is not directly available in both of the sbb and timetable datasets.

We observed that in the sbb dataset, the column "train_id" has the following format:
- "85:agency_id:line_number" for bus/tram, where agency_id and line_number are the columns agency_id, route_short_name in the timetable, respectively.
- "line_number" for trains, where line_number is the column trip_short_name in the timetable.

This "train_id" column can uniquely identify transport lines except for trains. We observed that for trains "train_id" is only unqiue for one type of train, thus we need to combine it with type of trains (corresponds to "route_desc" in timetable and "train_type" in sbb) to make it unique.

Based on the discussion above, we construct a "lineID" column (i.e., "transport line") for both of the tables in the following manner:

- sbb:
    - "train_id" for buses and trams
    - "train_type:train_id" for trains

- timetable:
    - "85:agency_id:route_short_name" for buses and trams
    - "route_desc:trip_short_name" for trains
    
"stop" and "arrival_time" is directly available in both datasets.

#### Step 2: Obtaining historical delay lists

In this step we join the sbb and timetable datasets on the triple to get a list of historical delay data for each row in the timetable (if available).

We found that the triple is not unique within timetable. There are some corner cases leading to this:
- There are duplicate trips in the timetable, their rows only differ in the trip_id, all other columns are identical. After checking the corresponding records in the sbb table, we found that these trips are identical in all aspects, only that they run on different dates.
- In some cases, two trips belonging to the same transport line running in opposite direction can stop at the same station at the same time.

This means we cannot directly join the sbb and timetable on the triple. But this can be easily worked-around. We first extract a table containing all the unique triple values found in timetable. Then we join the sbb dataset with this triple table, this way we get the historical delays for each of the triple. In the last step, we join the timetable to the triple table, such that we get the list of historical delays for each row of the timetable.

#### [Code] Construct the triple in timetable

In [13]:
### Print transport types in both tables
#timetable.groupBy('route_desc').count().show()
#sbb.groupBy('transport_type', 'train_type').count().orderBy('transport_type').show(100)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [14]:
### convert transport type in timetable to the format used in sbb 
def map_timetable_transport_type(orig):
    map = {
        "TGV": "TGV",
        "Eurocity": "EC",
        "RegioExpress": "RE",
        "S-Bahn": "S",
        "Tram": "Tram",
        "ICE": "ICE",
        "Bus": "Bus",
        "Intercity": "IC",
        "InterRegio": "IR"
    }
    return map.get(orig)
transport_type_mapper = functions.udf(lambda x: map_timetable_transport_type(x))
timetable = timetable.withColumn('route_desc', transport_type_mapper('route_desc'))

# remove null transport type (i.e., those do not have a match in sbb)
timetable = timetable.where(timetable.route_desc.isNotNull())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
### TEST
#timetable.groupBy('route_desc').count().show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
### Construct the lineID for timetable
def timetable_get_uid(route_desc, agency_id, trip_short_name, route_short_name):
    # 85 is found in the train_id (linien_id) column of sbb
    # whenever the transport is tram or bus
    if (route_desc == 'Bus'):
        lineID = u"85:{}:{}".format(agency_id, route_short_name)
    elif (route_desc == 'Tram'):
        # The tram line ID in sbb always has 3 digits
        # format the tram line ID here to match that representation
        num_zero_prefix = 3 - len(route_short_name)
        zero_prefix = '0' * num_zero_prefix
        route_short_name = zero_prefix + route_short_name
        lineID = u"85:{}:{}".format(agency_id, route_short_name)
    else:
        lineID = u"{}:{}".format(route_desc, trip_short_name)
    return lineID
timetable_uid_getter = functions.udf(lambda a,b,c,d: timetable_get_uid(a, b, c, d))

timetable = timetable.withColumn('lineID', timetable_uid_getter('route_desc', 'agency_id', 'trip_short_name', 'route_short_name'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
### Select relevant columns
timetable = timetable.select('trip_id', 'stop_id', 'arrival_time', 'departure_time', 'stop_sequence', 'stop_name',
                             'stop_lat', 'stop_lon', 'location_type', 'parent_station', 'trip_headsign', 'trip_short_name',
                             'route_short_name', 'direction_id', 'agency_id', 'route_desc', 'lineID')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
#timetable.write.format("orc").save(path="hdfs:///user/lyan/temp_data/timetable_temp.orc", mode="overwrite")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
### Use pre-saved data
timetable = spark.read.orc("hdfs:///user/lyan/temp_data/timetable_temp.orc")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
### Confirming for starting stop (depart = arrival)
#timetable.where(timetable.stop_sequence == 1).count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
#timetable.where(timetable.stop_sequence == 1).where(timetable.departure_time == timetable.arrival_time).count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### [Code] Construct the triple table

In [22]:
### Construct the triple table

timetable_uid = timetable.groupBy('lineID', 'stop_name', 'arrival_time').count()
timetable_uid = timetable_uid.drop('count')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### [Code] Construct the triple in sbb

In [23]:
### Further cleaning of sbb data

# discard rows belonging to cancelled trips
# discard the rows when there is no stop here
sbb = sbb.where((sbb.trip_failed == 'false') & (sbb.no_stop_here == 'false'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
# Select relevant columns
sbb = sbb.select('trip_id', 'train_type', 'transport_type', 'train_id', 'stop_name',
                 'schedule_dep', 'schedule_arrival', 'real_arrival', 'real_dep')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [25]:
###Test
#sbb.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [26]:
### Temp
# sbb.groupBy('transport_type', 'train_type').count().orderBy('transport_type').show(100)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [27]:
# Calculate arrival delay
# Remove negative or "null" arrival delay, except for starting station
sbb = sbb.withColumn('arr_delay',  functions.unix_timestamp('real_arrival', 'dd.MM.yyyy HH:mm:ss')
                                 - functions.unix_timestamp('schedule_arrival', 'dd.MM.yyyy HH:mm'))
sbb = sbb.where((sbb.arr_delay >= 0) | (sbb.real_arrival == ''))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [28]:
### Construct lineID field for sbb

def sbb_get_uid(transport_type, train_type, train_id):
    # Use "train_id" as lineID for Bus or Tram
    if ((transport_type == 'Bus') | (transport_type == 'Tram')):
        lineID = train_id
    # train_id is only unique for one train_type
    # Use train_type + train_id for trains
    elif (transport_type == 'Zug'):   
        lineID = u"{}:{}".format(train_type, train_id)
    # No lineID for rows without clear transport_type
    else:
        lineID = None
    return lineID
sbb_uid_getter = functions.udf(lambda a,b,c: sbb_get_uid(a, b, c))

sbb = sbb.withColumn('lineID', sbb_uid_getter('transport_type', 'train_type', 'train_id'))

sbb = sbb.where(sbb.lineID.isNotNull())

sbb = sbb.drop('transport_type', 'train_type', 'train_id')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [29]:
#sbb.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [30]:
### Reformat & rename "schedule_arrival" column to match the "arrival_time" column in timetable

def ts_convert(arr, depart):
    # Check if starting station
    if arr == '':
        arr = depart
    return arr.split(' ')[-1] + ':00'
ts_converter = functions.udf(lambda x,y: ts_convert(x,y))

sbb = sbb.withColumn('arrival_time', ts_converter('schedule_arrival', 'schedule_dep'))

sbb = sbb.drop('trip_id', 'schedule_dep', 'schedule_arrival', 'real_arrival', 'real_dep')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [31]:
#sbb.write.format("orc").save(path="hdfs:///user/lyan/temp_data/sbb_ready_to_join.orc", mode="overwrite")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### [Code] Obtain historical delay lists

In [32]:
# load pre-saved data
sbb = spark.read.orc("hdfs:///user/lyan/temp_data/sbb_ready_to_join.orc")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [33]:
### Join sbb with triple table
joined_sbb = sbb.join(timetable_uid, on = ['lineID', 'stop_name', 'arrival_time'], how = 'inner')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [34]:
#joined_sbb.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [35]:
### Get historical delay lists of each triple
delay_list = joined_sbb.groupBy('lineID', 'stop_name', 'arrival_time').agg(functions.collect_list('arr_delay').alias('delays'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [36]:
#delay_list.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [37]:
### Append corresponding historical delay list to each row of timetable
timetable_w_delays = timetable.join(delay_list, on = ['lineID', 'stop_name', 'arrival_time'], how = 'inner')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [38]:
#delay_list.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [39]:
#timetable_w_delays.write.format("orc").save(path="hdfs:///user/lyan/temp_data/timetable_w_delay.orc", mode="overwrite")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## [Part III] Model the public transport infrastructure

In this part, we need to build three network as follows to serve the algorithm in partIV:
1. Network according to trip_id, stop_id, stop_sequence; Because there are some stops which are skipped in some trips even they are in the same routes.
2. Network for transfer among stops in the same station
3. Walking Network. This network is built according to pairwise distance of stops which are less than 500m. Some stops which have been included in the same station transfer network are exclude of this network.

### Network based on trip id, stop id, stop sequence

In [40]:
#load pre-saved data
timetable_w_delays = spark.read.orc("hdfs:///user/lyan/temp_data/timetable_w_delay.orc")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [41]:
from pyspark.sql.functions import *

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [42]:
# change to date time
new_timetable = timetable_w_delays.withColumn('arrival_time', functions.from_unixtime(functions.unix_timestamp('arrival_time', 'HH:mm:ss'), 'HH:mm:ss'))
new_timetable = new_timetable.withColumn('departure_time', functions.from_unixtime(functions.unix_timestamp('departure_time', 'HH:mm:ss'), 'HH:mm:ss'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [43]:
# select related columns construct network
trip_info = new_timetable.select('trip_id','stop_id','stop_name','stop_lat','stop_lon','arrival_time', 'departure_time','stop_sequence')
# group all stops by trip_id
info_df = trip_info.groupBy('trip_id').agg(collect_list(array("stop_id", "stop_sequence"))).withColumnRenamed("collect_list(array(stop_id, stop_sequence))", "stops")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [44]:
def sort_stops_by_arr(x):
    """
    sort (stop,sequence) array according to sequence in the trip
    """
    xx = map(lambda d: [d[0],int(d[1])],x)
    return sorted(xx,key=lambda s:s[1])
sort_stops_udf = udf(sort_stops_by_arr)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [45]:
# sort stop according to sequence for each trip
trip = info_df.withColumn('sorted_stops',sort_stops_udf('stops')).select('trip_id','sorted_stops')
trip.show(10,False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|trip_id                 |sorted_stops                                                                                                                                                                                                                                                                          |
+------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|104.TA.26-733-j19-1.2.R |[[8573205:0:D, 1], [8580301, 2], [8588553, 3], [8573211,

In [46]:
trip_df = trip.toPandas()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [47]:
# constuct netowrk
# because in one trip, some stops can be repeated more than once or be skipped
# so we preserve the sequence NO. of them
import re
nw = {'trip_id':[],'src':[],'dst':[],'src_sequence':[],'dst_sequence':[]}
for record in trip_df.values:
    if ',' not in record[1]:
        continue
    tmp = re.sub(r"^\[{1}", "", record[1])
    tmp = re.sub(r"\]{1}$", "", tmp)
    tmp = re.sub(r"], ", "]-", tmp)
    stops = tmp.split('-')
    stop_sequence = [s.lstrip('[').rstrip(']').split(',') for s in stops ]
    for i in range(len(stop_sequence)):
        if (i+1)==len(stop_sequence):
            break
        nw['src'].append(stop_sequence[i][0].strip().encode('ascii', 'ignore'))
        nw['src_sequence'].append(int(stop_sequence[i][1]))
        nw['dst'].append(stop_sequence[i+1][0].strip().encode('ascii', 'ignore'))
        nw['dst_sequence'].append(int(stop_sequence[i+1][1]))
        nw['trip_id'].append(record[0].strip().encode('ascii', 'ignore'))
nw_df = pd.DataFrame(nw)
nw_df.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

       dst  dst_sequence          src  src_sequence                  trip_id
0  8580301             2  8573205:0:D             1  104.TA.26-733-j19-1.2.R
1  8588553             3      8580301             2  104.TA.26-733-j19-1.2.R
2  8573211             4      8588553             3  104.TA.26-733-j19-1.2.R
3  8590699             5      8573211             4  104.TA.26-733-j19-1.2.R
4  8587420             6      8590699             5  104.TA.26-733-j19-1.2.R

In [48]:
nw_spdf = spark.createDataFrame(nw_df)
nw_spdf.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+------------+-----------+------------+--------------------+
|        dst|dst_sequence|        src|src_sequence|             trip_id|
+-----------+------------+-----------+------------+--------------------+
|    8580301|           2|8573205:0:D|           1|104.TA.26-733-j19...|
|    8588553|           3|    8580301|           2|104.TA.26-733-j19...|
|    8573211|           4|    8588553|           3|104.TA.26-733-j19...|
|    8590699|           5|    8573211|           4|104.TA.26-733-j19...|
|    8587420|           6|    8590699|           5|104.TA.26-733-j19...|
|    8580434|           7|    8587420|           6|104.TA.26-733-j19...|
|    8576153|           8|    8580434|           7|104.TA.26-733-j19...|
|    8580433|           9|    8576153|           8|104.TA.26-733-j19...|
|    8588553|           2|    8591031|           1|111.TA.79-736-j19...|
|    8580301|           3|    8588553|           2|111.TA.79-736-j19...|
|8573205:0:L|           4|    8580301|           3|

In [49]:
nw_spdf.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

187032

In [50]:
# join arrival time and departure time to network dataframe
# again, as one stop may be repeated more than once, sequence NO. should also be considered to maintain their arrival & departure time
tripmeta = trip_info.select(col("trip_id").alias("meta_trip_id"),'stop_id','arrival_time', 'departure_time','stop_sequence')
tmp = nw_spdf.join(tripmeta, [nw_spdf.trip_id == tripmeta.meta_trip_id,nw_spdf.src == tripmeta.stop_id, nw_spdf.src_sequence == tripmeta.stop_sequence], how = 'inner')\
                    .select('trip_id',col('departure_time').alias('src_dep'),'src','dst','src_sequence','dst_sequence')
edges_info = tmp.join(tripmeta, [tmp.trip_id == tripmeta.meta_trip_id,tmp.dst == tripmeta.stop_id,nw_spdf.dst_sequence == tripmeta.stop_sequence], how = 'inner')\
                    .select('trip_id','src_dep','src','dst',col('arrival_time').alias('dst_arr'),'dst_sequence','src_sequence')
# change to datetime
edges_info = edges_info.withColumn('unix_dep',unix_timestamp('src_dep', 'HH:mm:ss'))
edges_info = edges_info.withColumn('unix_arr',unix_timestamp('dst_arr', 'HH:mm:ss'))
# compute time cost for each edge
edges_info = edges_info.withColumn('time_cost',(col('unix_arr') - col('unix_dep')))
edges_info = edges_info.select('trip_id','src_dep','src','dst','dst_arr','time_cost')
edges_info.cache()
edges_info.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------+-----------+-----------+--------+---------+
|             trip_id| src_dep|        src|        dst| dst_arr|time_cost|
+--------------------+--------+-----------+-----------+--------+---------+
|104.TA.26-733-j19...|07:33:00|8573205:0:D|    8580301|07:33:00|        0|
|104.TA.26-733-j19...|07:33:00|    8580301|    8588553|07:34:00|       60|
|104.TA.26-733-j19...|07:34:00|    8588553|    8573211|07:36:00|      120|
|104.TA.26-733-j19...|07:36:00|    8573211|    8590699|07:37:00|       60|
|104.TA.26-733-j19...|07:37:00|    8590699|    8587420|07:40:00|      180|
|104.TA.26-733-j19...|07:40:00|    8587420|    8580434|07:41:00|       60|
|104.TA.26-733-j19...|07:41:00|    8580434|    8576153|07:42:00|       60|
|104.TA.26-733-j19...|07:42:00|    8576153|    8580433|07:44:00|      120|
|111.TA.79-736-j19...|18:39:00|    8591031|    8588553|18:41:00|      120|
|111.TA.79-736-j19...|18:41:00|    8588553|    8580301|18:42:00|       60|
|111.TA.79-736-j19...|18:

In [51]:
######################################################

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [52]:
#join edges_info with new_timetable to get delay: dst, dst_arr, trip_id, (dst_sq)
temp_timetable_to_join = new_timetable.select('trip_id', functions.col('arrival_time').alias('dst_arr'),
                                                         functions.col('stop_id').alias('dst'), 'delays')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [53]:
#delay_sorter = functions.udf(lambda x: sorted(x, reverse=True))
#temp_timetable_to_join = temp_timetable_to_join.withColumn('delays', delay_sorter('delays'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [54]:
edges_info_w_delay = edges_info.join(temp_timetable_to_join, on = ['trip_id', 'dst_arr', 'dst'], how = 'inner')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [55]:
edges_info_w_delay.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------+-------+--------+-------+---------+--------------------+
|             trip_id| dst_arr|    dst| src_dep|    src|time_cost|              delays|
+--------------------+--------+-------+--------+-------+---------+--------------------+
|312.TA.26-2-A-j19...|19:34:00|8591093|19:33:00|8591299|       60|[83, 3, 0, 82, 10...|
|1548.TA.26-2-A-j1...|19:34:00|8591093|19:32:00|8591105|      120|[83, 3, 0, 82, 10...|
|1473.TA.26-2-A-j1...|09:02:00|8590318|09:00:00|8591051|      120|[45, 20, 74, 112,...|
|1502.TA.26-2-A-j1...|14:39:00|8590318|14:38:00|8591051|       60|[93, 155, 15, 51,...|
|229.TA.26-2-A-j19...|08:18:00|8576197|08:17:00|8576196|       60|[67, 34, 6, 13, 6...|
|1475.TA.26-2-A-j1...|08:18:00|8576197|08:17:00|8576198|       60|[67, 34, 6, 13, 6...|
|462.TA.26-2-A-j19...|09:43:00|8576199|09:42:00|8576198|       60|[163, 72, 42, 42,...|
|1541.TA.26-2-A-j1...|13:29:00|8591165|13:27:00|8591222|      120|[157, 38, 142, 52...|
|421.TA.26-2-A-j19...|11:20:00|8

In [56]:
######################################################

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [57]:
# check if any time_cost smaller than 0 
edges_info.filter(col('time_cost') < 0).show(20,False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+-------+---+---+-------+---------+
|trip_id|src_dep|src|dst|dst_arr|time_cost|
+-------+-------+---+---+-------+---------+
+-------+-------+---+---+-------+---------+

### Same station transfer network

In [58]:
def one_more_stops(x):
    """
    filter stations with more stops
    """
    return len(x) > 1
one_more_stops_udf = udf(one_more_stops)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [59]:
# filter stations with more stops
more_stops_stations = stations.withColumn('more_stops',one_more_stops_udf('stops')).filter(col('more_stops') == 'true')
more_stops_stations.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

86

In [60]:
more_stops_stations.show(1)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------+--------------------+-----------------+----------------+----------+
|       station_name|               stops|      station_lat|     station_lon|more_stops|
+-------------------+--------------------+-----------------+----------------+----------+
|Winkel am Zürichsee|[8503111, 8503111...|47.29724984915355|8.59897932798565|      true|
+-------------------+--------------------+-----------------+----------------+----------+
only showing top 1 row

In [61]:
same_stations_df = more_stops_stations.select('station_name','stops').toPandas()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [62]:
# build double direction edge between two stops in one stations
# this means stops have a edge can be transferred in one stations
from itertools import combinations 
same_stations_transfer_nw = {'src':[],'dst':[]}
for record in same_stations_df.values:
    stops = [x.strip().encode('ascii', 'ignore') for x in record[1]]
    comb = combinations(stops, 2) 
    for c in comb:
        same_stations_transfer_nw['src'].append(c[0])
        same_stations_transfer_nw['src'].append(c[1])
        same_stations_transfer_nw['dst'].append(c[1])
        same_stations_transfer_nw['dst'].append(c[0])
same_stations_transfer_nw_df = pd.DataFrame(same_stations_transfer_nw)
same_stations_transfer_nw_df.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

           dst          src
0  8503111:0:1      8503111
1      8503111  8503111:0:1
2  8503111:0:2      8503111
3      8503111  8503111:0:2
4     8503111P      8503111

In [63]:
# add infomation and columns need to be the same with the network built by time table
same_stations_transfer_nw_df['trip_id'] = "sameStationsTransfer"
same_stations_transfer_nw_df['time_cost'] = 120
same_stations_transfer_nw_df['src_dep'] = "00:00:00"
same_stations_transfer_nw_df['dst_arr'] = "00:00:00"
same_stations_transfer_nw_df.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

           dst          src  ...   src_dep   dst_arr
0  8503111:0:1      8503111  ...  00:00:00  00:00:00
1      8503111  8503111:0:1  ...  00:00:00  00:00:00
2  8503111:0:2      8503111  ...  00:00:00  00:00:00
3      8503111  8503111:0:2  ...  00:00:00  00:00:00
4     8503111P      8503111  ...  00:00:00  00:00:00

[5 rows x 6 columns]

In [64]:
same_stations_transfer_spdf = spark.createDataFrame(same_stations_transfer_nw_df)
same_stations_transfer_spdf = same_stations_transfer_spdf.withColumn('src_dep',from_unixtime(unix_timestamp('src_dep', 'HH:mm:ss'), 'HH:mm:ss'))
same_stations_transfer_spdf = same_stations_transfer_spdf.withColumn('dst_arr',from_unixtime(unix_timestamp('dst_arr', 'HH:mm:ss'), 'HH:mm:ss'))
same_stations_transfer_spdf.show()
### TODO: add fake delay column

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+-----------+--------------------+---------+--------+--------+
|        dst|        src|             trip_id|time_cost| src_dep| dst_arr|
+-----------+-----------+--------------------+---------+--------+--------+
|8503111:0:1|    8503111|sameStationsTransfer|      120|00:00:00|00:00:00|
|    8503111|8503111:0:1|sameStationsTransfer|      120|00:00:00|00:00:00|
|8503111:0:2|    8503111|sameStationsTransfer|      120|00:00:00|00:00:00|
|    8503111|8503111:0:2|sameStationsTransfer|      120|00:00:00|00:00:00|
|   8503111P|    8503111|sameStationsTransfer|      120|00:00:00|00:00:00|
|    8503111|   8503111P|sameStationsTransfer|      120|00:00:00|00:00:00|
|8503111:0:2|8503111:0:1|sameStationsTransfer|      120|00:00:00|00:00:00|
|8503111:0:1|8503111:0:2|sameStationsTransfer|      120|00:00:00|00:00:00|
|   8503111P|8503111:0:1|sameStationsTransfer|      120|00:00:00|00:00:00|
|8503111:0:1|   8503111P|sameStationsTransfer|      120|00:00:00|00:00:00|
|   8503111P|8503111:0:2|

In [65]:
#################

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [66]:
def dummy(x):
    return []

dummier = functions.udf(lambda x: dummy(x), ArrayType(LongType()))

same_stations_transfer_spdf_w_delay = same_stations_transfer_spdf.withColumn('delays', dummier('time_cost'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [67]:
# union all to one dataframe
edges_with_sameStationTransfer_w_delay = edges_info_w_delay.unionByName(same_stations_transfer_spdf_w_delay)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [68]:
#edges_with_sameStationTransfer_w_delay.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [69]:
#################

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [70]:
# union all to one dataframe
edges_with_sameStationTransfer = edges_info.unionByName(same_stations_transfer_spdf)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [71]:
edges_info.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

187032

In [72]:
same_stations_transfer_spdf.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2082

In [73]:
edges_with_sameStationTransfer.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

189114

### Walking network

In [74]:
# to construct walking network, we need to construct vertex information dataframe
all_info = trip_info.select('trip_id','stop_id','stop_name','stop_lat','stop_lon','arrival_time', 'departure_time')
vertex_info = all_info.select('stop_id','stop_name','stop_lat','stop_lon').distinct()
vertex_info.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+--------------------+----------------+----------------+
|    stop_id|           stop_name|        stop_lat|        stop_lon|
+-----------+--------------------+----------------+----------------+
|    8575921|Effretikon, Linde...|47.4260215925169|8.69948284198693|
|    8590727|Oberengstringen, ...|47.4104852703573|8.45874332896223|
|    8503091|   Zürich Giesshübel|47.3624553927874|8.52184997768041|
|    8502270|         Bergfrieden|47.3977111751049|8.39908621093555|
|    8573234|   Kloten, Hohrainli|47.4596193089001|8.58166879245825|
|    8590759|Regensdorf, Stras...|47.4397123662621|8.46168081994171|
|    8575941|    Lindau, Eschikon|47.4480782093562|8.68228908744649|
|    8591181|Zürich, Heerenwiesen|47.4047157270536|8.57633279966984|
|    8576248|     Watt, Geerenweg|47.4390014816685|8.47723964066482|
|    8591136|  Zürich, Frankental|47.4057006674825|8.48137189097235|
|    8591832|Pfaffhausen, Müseren|47.3626987847054|8.61754750491098|
|8573213:0:C|Zürich Flughafen,...|

In [75]:
def convert_to_hour_trip_dict(x):
    """
    for each vertex, record the time of the trip it has
    """
    hour_trip = dict()
    for tmp in x:
        if tmp[0] in hour_trip:
            hour_trip[tmp[0]].append(tmp[1])
        else:
            hour_trip[tmp[0]] = [tmp[1]]
    return hour_trip
hour_trip_dict = udf(convert_to_hour_trip_dict)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [76]:
# add stop infomation according to timetable
vertex_arrhour = all_info.withColumn('arr_hour',functions.hour(all_info.arrival_time)).groupBy('stop_id').agg(collect_list(array('arr_hour','trip_id')))\
                         .withColumnRenamed("collect_list(array(arr_hour, trip_id))", "arr_hour").withColumn('arr_hourtrip',hour_trip_dict('arr_hour')).select('stop_id','arr_hourtrip')
vertex_dephour = all_info.withColumn('dep_hour',functions.hour(all_info.departure_time)).groupBy('stop_id').agg(collect_list(array('dep_hour','trip_id')))\
                         .withColumnRenamed("collect_list(array(dep_hour, trip_id))", "dep_hour").withColumn('dep_hourtrip',hour_trip_dict('dep_hour')).select('stop_id','dep_hourtrip')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [77]:
# add arrival & departure hour information to vertex
vertex_info = vertex_info.join(vertex_arrhour,on =['stop_id'],how='left')
vertex_info = vertex_info.join(vertex_dephour,on =['stop_id'],how='left')
vertex_info.cache()
vertex_info.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+--------------------+----------------+----------------+--------------------+--------------------+
|    stop_id|           stop_name|        stop_lat|        stop_lon|        arr_hourtrip|        dep_hourtrip|
+-----------+--------------------+----------------+----------------+--------------------+--------------------+
|    8575921|Effretikon, Linde...|47.4260215925169|8.69948284198693|{11=[618.TA.26-65...|{11=[618.TA.26-65...|
|    8590727|Oberengstringen, ...|47.4104852703573|8.45874332896223|{11=[259.TA.26-30...|{11=[259.TA.26-30...|
|    8503091|   Zürich Giesshübel|47.3624553927874|8.52184997768041|{11=[73.TA.26-4-B...|{11=[73.TA.26-4-B...|
|    8502270|         Bergfrieden|47.3977111751049|8.39908621093555|{11=[115.TA.1-17-...|{11=[115.TA.1-17-...|
|    8573234|   Kloten, Hohrainli|47.4596193089001|8.58166879245825|{11=[120.TA.26-73...|{11=[120.TA.26-73...|
|    8590759|Regensdorf, Stras...|47.4397123662621|8.46168081994171|{11=[21.TA.26-452...|{11=[21.TA.26-452...|
|

In [78]:
# rename stop_id as id
vertex_info = vertex_info.select(col('stop_id').alias('id'),'stop_name','stop_lat','stop_lon','arr_hourtrip','dep_hourtrip')
vertex_info.show(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+--------------------+----------------+----------------+--------------------+--------------------+
|     id|           stop_name|        stop_lat|        stop_lon|        arr_hourtrip|        dep_hourtrip|
+-------+--------------------+----------------+----------------+--------------------+--------------------+
|8575921|Effretikon, Linde...|47.4260215925169|8.69948284198693|{11=[618.TA.26-65...|{11=[618.TA.26-65...|
|8590727|Oberengstringen, ...|47.4104852703573|8.45874332896223|{11=[259.TA.26-30...|{11=[259.TA.26-30...|
|8503091|   Zürich Giesshübel|47.3624553927874|8.52184997768041|{11=[73.TA.26-4-B...|{11=[73.TA.26-4-B...|
|8502270|         Bergfrieden|47.3977111751049|8.39908621093555|{11=[115.TA.1-17-...|{11=[115.TA.1-17-...|
|8573234|   Kloten, Hohrainli|47.4596193089001|8.58166879245825|{11=[120.TA.26-73...|{11=[120.TA.26-73...|
|8590759|Regensdorf, Stras...|47.4397123662621|8.46168081994171|{11=[21.TA.26-452...|{11=[21.TA.26-452...|
|8575941|    Lindau, Eschikon|47.4480

In [79]:
vertex_info.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

1243

In [80]:
# cross product of vertex list to build stops pair 
vertex_id = vertex_info.select('id')
vertex_loc1 = vertex_info.select(col('id').alias('id1'),col('stop_lat').alias('stop_lat1'),col('stop_lon').alias('stop_lon1'))
vertex_loc2 = vertex_info.select(col('id').alias('id2'),col('stop_lat').alias('stop_lat2'),col('stop_lon').alias('stop_lon2'))
vid_crossproduct = vertex_loc1.crossJoin(vertex_loc2)
vid_crossproduct.cache()
vid_crossproduct.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+----------------+----------------+-----------+----------------+----------------+
|    id1|       stop_lat1|       stop_lon1|        id2|       stop_lat2|       stop_lon2|
+-------+----------------+----------------+-----------+----------------+----------------+
|8575921|47.4260215925169|8.69948284198693|    8575921|47.4260215925169|8.69948284198693|
|8575921|47.4260215925169|8.69948284198693|    8590727|47.4104852703573|8.45874332896223|
|8575921|47.4260215925169|8.69948284198693|    8503091|47.3624553927874|8.52184997768041|
|8575921|47.4260215925169|8.69948284198693|    8502270|47.3977111751049|8.39908621093555|
|8575921|47.4260215925169|8.69948284198693|    8573234|47.4596193089001|8.58166879245825|
|8575921|47.4260215925169|8.69948284198693|    8590759|47.4397123662621|8.46168081994171|
|8575921|47.4260215925169|8.69948284198693|    8575941|47.4480782093562|8.68228908744649|
|8575921|47.4260215925169|8.69948284198693|    8591181|47.4047157270536|8.57633279966984|
|8575921|4

In [81]:
# compute distance between two stops to filter out stops which can be reachable to each other on foot
# we consider stops with distance < 500m are reachable by walk 
distance_twostops_udf = udf(distance)
stops_can_walk = vid_crossproduct.filter(col('id1')!=col('id2')).withColumn('distance',distance_twostops_udf('stop_lon1','stop_lat1','stop_lon2','stop_lat2'))\
                      .filter(col('distance')<0.5).withColumn('time_cost',col('distance')*1000/50*60)
stops_can_walk.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+----------------+----------------+-----------+----------------+----------------+-------------------+------------------+
|    id1|       stop_lat1|       stop_lon1|        id2|       stop_lat2|       stop_lon2|           distance|         time_cost|
+-------+----------------+----------------+-----------+----------------+----------------+-------------------+------------------+
|8575921|47.4260215925169|8.69948284198693|    8575920|47.4290480888166|8.69659924992451|0.40038442297354065| 480.4613075682488|
|8575921|47.4260215925169|8.69948284198693|    8503370|47.4238215000378| 8.7010638768872| 0.2720205125095378| 326.4246150114454|
|8575921|47.4260215925169|8.69948284198693|    8503372|47.4271216042743|8.69411989973999|0.42157359412504647|505.88831295005576|
|8590727|47.4104852703573|8.45874332896223|    8590728|47.4091295756792|8.46260608468448| 0.3274408398465013| 392.9290078158016|
|8590727|47.4104852703573|8.45874332896223|    8590833|47.4122360710415|8.45316479104707|0.462725

In [82]:
# construct walking network
walking_nw = stops_can_walk.select(col('id1').alias('src'),col('id2').alias('dst'),'time_cost')
walking_nw.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+-----------+------------------+
|    src|        dst|         time_cost|
+-------+-----------+------------------+
|8575921|    8575920| 480.4613075682488|
|8575921|    8503370| 326.4246150114454|
|8575921|    8503372|505.88831295005576|
|8590727|    8590728| 392.9290078158016|
|8590727|    8590833| 555.2705340956833|
|8503091|    8591415| 375.5602605880987|
|8503091|    8503051| 310.2358988536467|
|8503091|    8591366|336.61001760671263|
|8502270|    8590220|465.90537207068706|
|8502270|8502186:0:1| 566.5131006861697|
|8502270|    8517376| 594.4587083886204|
|8502270|8502186:0:2| 558.4008325136786|
|8502270|    8590521| 590.3615083862259|
|8573234|    8573232|489.34341226862466|
|8573234|    8573233|284.46861281858094|
|8590759|    8576279|457.35014239267895|
|8590759|    8590755| 466.0443754524091|
|8590759|    8590756| 550.4043863268454|
|8590759|    8576270|297.30565225585434|
|8590759|    8576269| 328.9750708891323|
+-------+-----------+------------------+
only showing top

In [83]:
# check if exists two same edge in walking network
walking_nw.groupBy('src','dst').count().filter(col('count')>1).show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---+---+-----+
|src|dst|count|
+---+---+-----+
+---+---+-----+

In [84]:
walking_nw.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

6914

In [85]:
# stops can be transfer in one station doesn't include in this case
# drop them out of the walking network
not_in_one_stations = walking_nw.select('src','dst').join(same_stations_transfer_spdf,on=['src','dst'],how="leftanti")
not_in_one_stations.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+------------+
|         src|         dst|
+------------+------------+
| 8502208:0:4|     8573552|
|8503000:0:13|     8591262|
|8503000:0:34|8503088:0:21|
| 8503000:0:9|     8587349|
| 8503011:0:1|     8573710|
| 8503508:0:5|     8595511|
|     8573504|     8588055|
|     8576195|     8530813|
|     8576259|     8576258|
|     8576283| 8503315:0:3|
|     8580438|     8580435|
|     8587348|     8591367|
|     8587349|8503000:0:34|
|     8587420|     8576152|
|     8588003|     8588001|
|     8588005|     8588007|
|     8588078|8503000:0:31|
|     8588736|     8580434|
|     8590519|     8590520|
|     8590529|     8590217|
+------------+------------+
only showing top 20 rows

In [86]:
not_in_one_stations.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

5872

In [87]:
walking_nw = walking_nw.join(not_in_one_stations, on=['src','dst'],how='inner')
walking_nw.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+------------+------------------+
|         src|         dst|         time_cost|
+------------+------------+------------------+
| 8502208:0:4|     8573552| 560.0945691828715|
|8503000:0:13|     8591262|  433.763953695952|
|8503000:0:34|8503088:0:21|217.12913884881664|
| 8503000:0:9|     8587349|227.30588964219046|
| 8503011:0:1|     8573710| 69.31110061338293|
| 8503508:0:5|     8595511| 345.5078520541618|
|     8573504|     8588055| 434.3485394526424|
|     8576195|     8530813| 585.6793077043894|
|     8576259|     8576258| 339.2625057956586|
|     8576283| 8503315:0:3| 540.3562358840627|
|     8580438|     8580435|484.39368059408264|
|     8587348|     8591367| 488.7278973709142|
|     8587349|8503000:0:34|240.32341513504582|
|     8587420|     8576152| 528.4816300244935|
|     8588003|     8588001| 464.9911865299479|
|     8588005|     8588007|436.77478536669275|
|     8588078|8503000:0:31|457.38065027456025|
|     8588736|     8580434| 345.2813900640017|
|     8590519

In [88]:
walking_nw.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

5872

In [89]:
# final walking network
### TODO: add delay list column
walking_nw = walking_nw.withColumn('trip_id',lit('walk'))
walking_nw = walking_nw.withColumn('src_dep',from_unixtime(unix_timestamp(lit('00:00:00'), 'HH:mm:ss'), 'HH:mm:ss'))
walking_nw = walking_nw.withColumn('dst_arr',from_unixtime(unix_timestamp(lit('00:00:00'), 'HH:mm:ss'), 'HH:mm:ss'))
walking_nw.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+------------+------------------+-------+--------+--------+
|         src|         dst|         time_cost|trip_id| src_dep| dst_arr|
+------------+------------+------------------+-------+--------+--------+
| 8502208:0:4|     8573552| 560.0945691828715|   walk|00:00:00|00:00:00|
|8503000:0:13|     8591262|  433.763953695952|   walk|00:00:00|00:00:00|
|8503000:0:34|8503088:0:21|217.12913884881664|   walk|00:00:00|00:00:00|
| 8503000:0:9|     8587349|227.30588964219046|   walk|00:00:00|00:00:00|
| 8503011:0:1|     8573710| 69.31110061338293|   walk|00:00:00|00:00:00|
| 8503508:0:5|     8595511| 345.5078520541618|   walk|00:00:00|00:00:00|
|     8573504|     8588055| 434.3485394526424|   walk|00:00:00|00:00:00|
|     8576195|     8530813| 585.6793077043894|   walk|00:00:00|00:00:00|
|     8576259|     8576258| 339.2625057956586|   walk|00:00:00|00:00:00|
|     8576283| 8503315:0:3| 540.3562358840627|   walk|00:00:00|00:00:00|
|     8580438|     8580435|484.39368059408264|   wa

In [90]:
##############

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [91]:
def dummy2(x):
    return []

dummier2 = functions.udf(lambda x: dummy2(x), ArrayType(LongType()))

walking_nw_w_delay = walking_nw.withColumn('delays', dummier2('time_cost'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [92]:
# union all to one dataframe
all_edges_info_w_delay = edges_with_sameStationTransfer_w_delay.unionByName(walking_nw_w_delay)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [93]:
##############

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [94]:
# union all
all_edges_info = edges_with_sameStationTransfer.unionByName(walking_nw)
all_edges_info.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

194986

In [95]:
### Save/Load output of this part
#all_edges_info.write.format("orc").save(path="hdfs:///user/lyan/temp_data/all_edges_info.orc", mode="overwrite")
#all_edges_info_w_delay.write.format("orc").save(path="hdfs:///user/lyan/temp_data/all_edges_info_w_delay.orc", mode="overwrite")
#vertex_info.write.format("orc").save(path="hdfs:///user/lyan/temp_data/vertex_info.orc", mode="overwrite")

### pre-load saved data, will be used in part IV & V
all_edges_info = spark.read.orc("hdfs:///user/lyan/temp_data/all_edges_info.orc")
all_edges_info_w_delay = spark.read.orc("hdfs:///user/lyan/temp_data/all_edges_info_w_delay.orc")
vertex_info = spark.read.orc("hdfs:///user/lyan/temp_data/vertex_info.orc")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [96]:
# convert spark dataframe to pandas dataframe for convenience to feed networkx
all_edges_info_df =  all_edges_info.toPandas()
all_edges_info_w_delay_df = all_edges_info_w_delay.toPandas()
vertex_info_df = vertex_info.toPandas()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [97]:
all_edges_info_df.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                   trip_id   src_dep  ...   dst_arr time_cost
0    60.TA.26-2-j19-1.10.R  15:19:00  ...  15:24:00     300.0
1   62.TA.26-133-j19-1.2.R  12:00:00  ...  12:02:00     120.0
2   62.TA.26-133-j19-1.2.R  12:02:00  ...  12:04:00     120.0
3   62.TA.26-133-j19-1.2.R  12:04:00  ...  12:06:00     120.0
4  736.TA.26-131-j19-1.7.R  17:07:00  ...  17:08:00      60.0

[5 rows x 6 columns]

In [98]:
all_edges_info_w_delay_df.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                     trip_id  ...                                             delays
0    312.TA.26-2-A-j19-1.1.H  ...  [83, 3, 0, 82, 10, 111, 40, 0, 6, 55, 1, 84, 1...
1  1548.TA.26-2-A-j19-1.24.R  ...  [83, 3, 0, 82, 10, 111, 40, 0, 6, 55, 1, 84, 1...
2  1473.TA.26-2-A-j19-1.24.R  ...  [45, 20, 74, 112, 248, 114, 42, 42, 6, 0, 16, ...
3  1502.TA.26-2-A-j19-1.24.R  ...  [93, 155, 15, 51, 12, 10, 76, 91, 126, 96, 174...
4    229.TA.26-2-A-j19-1.1.H  ...  [67, 34, 6, 13, 62, 38, 107, 36, 47, 110, 6, 2...

[5 rows x 7 columns]

In [99]:
vertex_info_df.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

        id  ...                                       dep_hourtrip
0  8576193  ...  {11=[1139.TA.26-5-B-j19-1.23.R, 660.TA.26-15-A...
1  8591891  ...  {11=[634.TA.26-303-j19-1.10.R, 418.TA.26-303-j...
2  8590223  ...  {11=[323.TA.26-306-j19-1.2.H, 13.TA.1-305-j19-...
3  8591283  ...  {11=[2504.TA.26-7-B-j19-1.17.H, 2505.TA.26-7-B...
4  8502776  ...  {11=[856.TA.26-140-j19-1.6.H, 855.TA.26-140-j1...

[5 rows x 6 columns]

In [100]:
def time2timestamps(time):
    """
    convert time string to time, np.datetime64 need to feed a date
    """
    # time string like "12:30:00"
    arr_time = np.datetime64(time)
    return arr_time

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [101]:
#convert string to datetime
# This parameter is hardcoded and only used to workaround a numpy limitation 
# np.datetime64 need to feed a date
date = '2000-01-01T'
all_edges_info_df['src_dep'] =all_edges_info_df['src_dep'].apply(lambda x: time2timestamps(date + x))
all_edges_info_df['dst_arr'] =all_edges_info_df['dst_arr'].apply(lambda x: time2timestamps(date + x))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## [Part IV] Basic Route Planning Algorithm (without uncertainty)

In our route planning process, we establish several assumptions to limit the search space of our algorithm:
1. As the problem aims to find the paths under an area, which is 15km from Zurich HB the farrest, the diameter of this area is 30km, thus we assume the longest distance of a trip is 40km. In this case, the total time cost will not exceed 3 hours according to reality. So we can assume the last stops of expected trip can be reached in 3 hours.
2. As the problem ask us to find the latest departure time of a trip, and the latest arrival at the last stops is highly possble to obtain the latest departure time. However, it is not always the case and there are also situations that we arrive a bit earlier but we can still depart later. Therefore, we assume we can get the latest departure time if we just consider the trip which contains the last stop with arrival time 30 minutes before expected one. 
3. We assume that the total number of transfer will not exceed 4 times including taking other transportations, walk to the other stop(not in the same station) and same station transfer.
4. We only consider routes that have historical delay data.

### Step 1: filter out edges satisfied the assumption 1

In [102]:
def select_edges(all_edges_info_df, arr_time, start_time):
    return all_edges_info_df[(((all_edges_info_df['dst_arr']<=arr_time) & (all_edges_info_df['src_dep']<=arr_time)&
                            (all_edges_info_df['dst_arr']>=start_time) & (all_edges_info_df['src_dep']>=start_time) & 
                            (all_edges_info_df['trip_id'] != "walk") & (all_edges_info_df['trip_id'] != "sameStationsTransfer"))) |
                            ((all_edges_info_df['trip_id'] == "walk") | (all_edges_info_df['trip_id'] == "sameStationsTransfer"))]

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Step 2: filter out edges satisfied the assumption 2

In [103]:
def drop_edges(selected_edges,arr_stop, arg3):
    drop_earlyarr = selected_edges[(selected_edges['dst'] == arr_stop) & (selected_edges['dst_arr']<=arg3) & 
                                (selected_edges['trip_id'] != "sameStationsTransfer") &(selected_edges['trip_id'] != "walk")]
    drop_relatedtrip = drop_earlyarr[drop_earlyarr['trip_id'].isin(drop_earlyarr['trip_id'].unique())]
    return selected_edges.drop(list(drop_relatedtrip.index))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Step 3: build edges in each trip

Next, add edges if they are in one trip (A->B->C => A->C) according to their order  as follows. This will make algorithm more easy to find a path.

<img src="./DSLab-1.png"/>

In this case, if we go from  D2 to D1, then once we meet A1, we can know that this trip can take us to D1 and the stops in the middle will no longer important. By this way, the algo directly know that D2 to D1 have a path D2-A1-D1, rather than D2-A1-B1-C1-D1, which reduce the size of output and the time of the algo to search for paths.

<img  src="./DSLab-2.png"/>

In [104]:
# 7～8 minutes
def add_edges_in_one_trip(selected_edges):
    new = {"trip_id":[],"src_dep":[],"src":[],"dst":[],"dst_arr":[],"time_cost":[]}
    for t_id in selected_edges['trip_id'].unique():
        if t_id == "sameStationsTransfer" or t_id == "walk":
            continue
        tmp = selected_edges[selected_edges['trip_id'] == t_id].sort_values('src_dep')
        if len(tmp) == 1:
            continue
        trip_id = t_id
        for i in range(len(tmp.values)):
            if i+1 == (len(tmp.values) - 1):
                break
            src_dep = tmp.values[i][1]
            src = tmp.values[i][2]
            time_cost = tmp.values[i][-1]
            for j in range(i+1,len(tmp.values)):
                dst = tmp.values[j][-3]
                dst_arr = tmp.values[j][-2]
                time_cost += tmp.values[j][-1]
                new['trip_id'].append(trip_id)
                new['src_dep'].append(src_dep)
                new['src'].append(src)
                new['dst'].append(dst)
                new['dst_arr'].append(dst_arr)
                new['time_cost'].append(time_cost)
    return pd.DataFrame(new)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Step 4: apply assumption 3

feed to networkx, use n-depth (cutoff) DFS to find paths with length <=n.

Based on the assumption, cutoff here set to be 4, based on the assumption 3

In [105]:
def find_connected_path(new_graph_edges,dep_stop,arr_stop):
    G = nx.from_pandas_edgelist(
            new_graph_edges,
            'src',
            'dst', ['src_dep', 'dst_arr', 'time_cost', "trip_id"],
            create_using=nx.DiGraph)
    candidates = nx.all_simple_paths(G, dep_stop, arr_stop, cutoff=4)
    paths = []
    for x in candidates:
    #     print(x)
        paths.append(x)
    return paths

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Step 5: Rebuild path using greedy

After getting the possible paths from the n-depth DFS, we need to consider the connection in terms of time because the algo just consider the connection of space. We need to deduce the departure time of the first stop starting from the last stop. Therefore, we rebuild the path by greedy, choose the  trip with the latest departure but can also catch the next trip. We deduce backwards until it reach the first stop or it cannot find path because it cannot find a previous trip that can ensure us to catch this trip. (e.g. A->B->C, B->C we deduce we depart from B at 12:13, but A->B, we can just arrive at B at 12:15, then this path fail). The followin graph shows the process of deduction.

<img src="./DSLab-3.png"/>

In [106]:
# rebuild path by greedily, start from the last stop in the path, choose the trip with the latest arrival and deduce backwards 
def build_path_greedy(paths, new_graph_edges, arr_time):
    build_path = {"src_dep":[],"paths_info":[],"dst_arr":[],"time_cost":[]}
    for specific_path in paths:
        current_dep = arr_time
        real_arr_time = arr_time
        paths_info = []
        catch=True
        total_time_cost = 0
        for i in range(len(specific_path)-1,0,-1):
            src = specific_path[i-1]
            dst = specific_path[i]

            tmp_df = new_graph_edges[(new_graph_edges['src'] == src) & (new_graph_edges['dst'] == dst)]
            if "sameStationsTransfer" in tmp_df['trip_id'].values:
                same_dep = current_dep - np.timedelta64(120, 's')
                info = {"src":src,"src_dep":same_dep,"dst":dst,"dst_arr":current_dep,"trip_id":"sameStationTransfer"}
                paths_info.append(info)
                current_dep = same_dep
                total_time_cost += 120
            elif 'walk' not in tmp_df['trip_id'].values:
                try:
                    tmp = tmp_df[tmp_df['dst_arr'] <= current_dep].sort_values('src_dep',ascending = False).iloc[0]
                    if i == len(paths[0]) - 1:
                        real_arr_time = tmp['dst_arr']
                    current_dep = tmp['src_dep']
                    info = {"src":src,"src_dep":tmp['src_dep'],"dst":dst,"dst_arr":tmp['dst_arr'],"trip_id":tmp['trip_id']}
                    paths_info.append(info)
                    total_time_cost += tmp['time_cost']
                except:
                    # cannot catch the transportation from the last stop
                    catch=False
                    print(specific_path)
                    break
            else: 
                time_cost = tmp_df[tmp_df['trip_id'] == 'walk']['time_cost'].values[0]
                walk_current_dep = current_dep - np.timedelta64(int(np.ceil(time_cost)), 's')
                if len(tmp_df) > 1:
                    try:
                        tmp = tmp_df[(tmp_df['trip_id'] != 'walk') &
                                     (tmp_df['dst_arr'] <= current_dep)].sort_values('src_dep',ascending = False).iloc[0]
                        tp_current_dep = tmp['src_dep']
                    except:
                        # cannot catch the transportation from the last stop
                        catch=False
                        print(specific_path)
                        break
                    if walk_current_dep < tp_current_dep:
                        if i == len(paths[0]) - 1:
                            real_arr_time = tmp['dst_arr']
                        info = {"src":src,"src_dep":tp_current_dep,"dst":dst,"dst_arr":tmp['dst_arr'],"trip_id":tmp['trip_id']}
                        paths_info.append(info)
                        current_dep = tp_current_dep
                        total_time_cost += tmp['time_cost']
                    else:
                        info = {"src":src,"src_dep":walk_current_dep,"dst":dst,"dst_arr":current_dep,"trip_id":'walk'}
                        paths_info.append(info)
                        current_dep = walk_current_dep
                        total_time_cost += time_cost
                else:
                    info = {"src":src,"src_dep":walk_current_dep,"dst":dst,"dst_arr":current_dep,"trip_id":'walk'}
                    paths_info.append(info)
                    current_dep = walk_current_dep
                    total_time_cost += time_cost
        if(catch):
            build_path["src_dep"].append(current_dep)
            build_path["paths_info"].append(paths_info[::-1])
            build_path["dst_arr"].append(real_arr_time)
            build_path["time_cost"].append(total_time_cost)
    return build_path

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [107]:
# get all routes
def get_routes(all_edges_info_df, dep_stop, arr_stop, arr_time):
    ### Ignore it
    # This parameter is hardcoded and only used to workaround a numpy limitation 
    # np.datetime64 need to feed a date
    date = '2000-01-01T'

    arr_time = time2timestamps(date + arr_time)
    delta = np.timedelta64(3, 'h')
    start_time = arr_time - delta

    # step 1
    selected_edges_tmp = select_edges(all_edges_info_df, arr_time, start_time)

    # step 2
    arg3 = arr_time - np.timedelta64(30, 'm')
    selected_edges = drop_edges(selected_edges_tmp, arr_stop, arg3)

    # 7～8minutes
    # step 3
    new_selected = add_edges_in_one_trip(selected_edges)
    new_graph_edges = pd.concat([selected_edges,new_selected], sort=True)

    # step 4
    paths = find_connected_path(new_graph_edges,dep_stop,arr_stop)

    # 15minutes
    # step 5
    build_path = build_path_greedy(paths, new_graph_edges, arr_time)
    build_path_df = pd.DataFrame(build_path)
    
    return build_path_df

# print top N routes
def print_topN_routes(n, build_path_df):
    # choose the first N route as final result (latest depature time, if there are several candidates which have the same depature time , choose the one use less time cost)
    #### Ignore the date in the output ####
    firstN = build_path_df.sort_values(['src_dep','time_cost'],ascending=[False,True]).iloc[:n]
    for path in firstN['paths_info'].values:
        print("Route:")
        # for walk to correct time
        next_dep = path[0]['src_dep']
        for partial_path in path:
            src = partial_path['src']
            try:
                src_name = vertex_info_df[vertex_info_df['id'] == src]['stop_name'].values[0].encode('utf-8')
            except:
                continue
            dst = partial_path['dst']
            dst_name = vertex_info_df[vertex_info_df['id'] == dst]['stop_name'].values[0].encode('utf-8')
            if partial_path['trip_id'] != 'walk' or partial_path['trip_id'] != 'sameStationTransfer':
                next_dep = partial_path['dst_arr']
                src_dep = partial_path['src_dep'].strftime("%H:%M:%S")
                dst_arr = partial_path['dst_arr'].strftime("%H:%M:%S")
            else: # if walk or same station transfer are in the later part of the path, need to be correct
                time_cost = partial_path['dst_arr'] - partial_path['src_dep']
                src_dep = next_dep.strftime("%H:%M:%S")
                dst_arr = next_dep + time_cost
                next_dep = dst_arr
                dst_arr = dst_arr.strftime("%H:%M:%S")
            trip_id = partial_path['trip_id']
            print("[{}] From {}(id: {}) to {}(id: {}): {} ~ {}".format(trip_id,src_name,src,dst_name,dst, src_dep,dst_arr))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# @TAs The next blocks is for you to run your querys

Parameters:
- dep_stop: stop id of the departure stop
- arr_stop: stop id of the arrival stop
- arr_time: expected arrival time

**Ignore the date in the output, it is artifact of a workaround**

In [108]:
### NOTE: heavy step, don't rerun this cell unless you changed your parameter

# Change your query parameters here
dep_stop = '8503000'
arr_stop = '8591049'
arr_time = '12:30:00'

build_path_df = get_routes(all_edges_info_df, dep_stop, arr_stop, arr_time)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['8503000', u'8503000:0:14', u'8503006:0:6', u'8580449', '8591049']
['8503000', u'8503000:0:14', u'8503006:0:6', u'8591314', '8591049']
['8503000', u'8503000:0:14', u'8503006:0:6', u'8591382', '8591049']

In [109]:
### Show top N routes
n = 10
print_topN_routes(n, build_path_df)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Route:
[32.TA.80-159-Y-j19-1.8.H] From Zürich HB(id: 8503000:0:5) to Zürich Oerlikon(id: 8503006:0:6): 12:05:00 ~ 12:11:00
[walk] From Zürich Oerlikon(id: 8503006:0:6) to Zürich Oerlikon, Bahnhof(id: 8580449): 12:13:38 ~ 12:15:00
[1914.TA.26-11-A-j19-1.27.R] From Zürich Oerlikon, Bahnhof(id: 8580449) to Zürich, Auzelg(id: 8591049): 12:15:00 ~ 12:24:00
Route:
[32.TA.80-159-Y-j19-1.8.H] From Zürich HB(id: 8503000:0:5) to Zürich Oerlikon(id: 8503006:0:6): 12:05:00 ~ 12:11:00
[walk] From Zürich Oerlikon(id: 8503006:0:6) to Zürich, Sternen Oerlikon(id: 8591382): 12:11:50 ~ 12:17:00
[1914.TA.26-11-A-j19-1.27.R] From Zürich, Sternen Oerlikon(id: 8591382) to Zürich, Auzelg(id: 8591049): 12:17:00 ~ 12:24:00
Route:
[248.TA.26-6-A-j19-1.45.H] From Zürich HB(id: 8503000:0:41/42) to Zürich Oerlikon(id: 8503006:0:8): 12:01:00 ~ 12:08:00
[walk] From Zürich Oerlikon(id: 8503006:0:8) to Zürich Oerlikon, Bahnhof(id: 8580449): 12:13:57 ~ 12:15:00
[1914.TA.26-11-A-j19-1.27.R] From Zürich Oerlikon, Bahnhof

### Result interpretation

#### 1. For the query (dep_stop = '8503000', arr_stop = '8591049', arr_time = '12:30:00'), this is the result we obtained:

Route:

[32.TA.80-159-Y-j19-1.8.H] From Zürich HB(id: 8503000:0:5) to Zürich Oerlikon(id: 8503006:0:6): 12:05:00 ~ 12:11:00

[walk] From Zürich Oerlikon(id: 8503006:0:6) to Zürich Oerlikon, Bahnhof(id: 8580449): 12:13:38 ~ 12:15:00

[1914.TA.26-11-A-j19-1.27.R] From Zürich Oerlikon, Bahnhof(id: 8580449) to Zürich, Auzelg(id: 8591049): 12:15:00 ~ 12:24:00

Route:

[32.TA.80-159-Y-j19-1.8.H] From Zürich HB(id: 8503000:0:5) to Zürich Oerlikon(id: 8503006:0:6): 12:05:00 ~ 12:11:00

[walk] From Zürich Oerlikon(id: 8503006:0:6) to Zürich, Sternen Oerlikon(id: 8591382): 12:11:50 ~ 12:17:00

[1914.TA.26-11-A-j19-1.27.R] From Zürich, Sternen Oerlikon(id: 8591382) to Zürich, Auzelg(id: 8591049): 12:17:00 ~ 12:24:00

Route:

[248.TA.26-6-A-j19-1.45.H] From Zürich HB(id: 8503000:0:41/42) to Zürich Oerlikon(id: 8503006:0:8): 12:01:00 ~ 12:08:00

[walk] From Zürich Oerlikon(id: 8503006:0:8) to Zürich Oerlikon, Bahnhof(id: 8580449): 12:13:57 ~ 12:15:00

[1914.TA.26-11-A-j19-1.27.R] From Zürich Oerlikon, Bahnhof(id: 8580449) to Zürich, Auzelg(id: 8591049): 12:15:00 ~ 12:24:00

Route:

[248.TA.26-6-A-j19-1.45.H] From Zürich HB(id: 8503000:0:41/42) to Zürich Oerlikon(id: 8503006:0:8): 12:01:00 ~ 12:08:00

[walk] From Zürich Oerlikon(id: 8503006:0:8) to Zürich, Sternen Oerlikon(id: 8591382): 12:12:21 ~ 12:17:00

[1914.TA.26-11-A-j19-1.27.R] From Zürich, Sternen Oerlikon(id: 8591382) to Zürich, Auzelg(id: 8591049): 12:17:00 ~ 12:24:00


#### 2. We captured the second-best route (departure at 12:05:00) and missed the best one:

Route:

[20.TA.26-9-A-j19-1.2.H] From Zürich HB(id: 8503000:0:41/42) to Glattbrugg(id: 8503310:0:3): 12:07:00 ~ 12:17:00

[walk] From Glattbrugg(id: 8503310:0:3) to Glattbrugg, Bahnhof(id: 8590620): 12:21:56 ~ 12:23:00

[168.TA.26-12-A-j19-1.2.H] From Glattbrugg, Bahnhof(id: 8590620) to Zürich, Auzelg(id: 8591049): 12:23:00 ~ 12:29:00

After investigation in the dataset, we found that some entries in timetable do not have data in sbb dataset (They could be pruned from sbb during the preprocessing).
And this is the reason we didn't find the best route.
For the route planning algorithm, we only consider routes that have historical delay data.

## [Part V] Evaluate the route uncertainty based on historical delay

### Idea

Here we model the certainty of a route using the cdf of the historical delay distribution at each transfer station.

The certainty is calculated as the product of the probability of catching the change at each transfer station.

The probability of catching the change at each transfer station is looked up directly from the the cdf of the historical delay distribution.

In [110]:
from scipy import stats
import math

def time_diff(time_early,time_late):
    #input should be datetime.datetime
    diff=(time_early-time_late).total_seconds()
    return diff

def calc_delay_cdf(build_path_df,historical_delay):
    success_rate=[]
    build_path_df=build_path_df.reset_index(drop=True)
    # workaround numpy limitation
    date = '2000-01-01T'
    arr_time_w_date = date + arr_time 
    for path_n in range(len(build_path_df)):
        #we will calculate the proba of successfully catching transport inversely
        src_dep=(pd.to_datetime(pd.Timestamp(arr_time_w_date)))
        trip_len=len(build_path_df['paths_info'][path_n])
        p_catch_accum=1
        for i in list(range(trip_len))[::-1]:
            dst=build_path_df['paths_info'][path_n][i]['dst']
            trip_id=build_path_df['paths_info'][path_n][i]['trip_id']
            if trip_id == "sameStationTransfer" or trip_id == "walk":
                p_catch=1
                src_dep=build_path_df['paths_info'][path_n][i]['src_dep']
            else:
                delay_for_trip = historical_delay[historical_delay['trip_id'] == trip_id].sort_values('src_dep')
                dst_arr=build_path_df['paths_info'][path_n][i]['dst_arr']
                delays=delay_for_trip[delay_for_trip['dst'] ==dst]['delays'].values[0]
                tolerance_time=time_diff(src_dep,dst_arr)
                src_dep=build_path_df['paths_info'][path_n][i]['src_dep']
                delays = np.array(delays)
                # number of elements in historical delays samller than tolerance time
                if len(delays) == 0:
                    p_catch = 1
                else:
                    p_catch=np.sum(delays<tolerance_time)/float(len(delays))
            p_catch_accum*=p_catch
        success_rate.append(p_catch_accum)
    build_path_df['success_rate']=success_rate
    return build_path_df


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [111]:
# Print top N routes with given certainty threshold

def print_topN_routes_w_certainty(n, certainty, build_path_df, all_edges_info_w_delay_df):
    # Get historical delay data
    historical_delay = all_edges_info_w_delay_df[['trip_id','src_dep','src','dst','dst_arr','time_cost', 'delays']]
    temp = build_path_df.sort_values(['src_dep','time_cost'],ascending=[False,True])
    firstN_w_certainty = calc_delay_cdf(temp, historical_delay)
    firstN_w_certainty = firstN_w_certainty[firstN_w_certainty['success_rate'] > certainty].iloc[:n]

    # Print top N routes with given certainty threshold 
    for path, probability in zip(firstN_w_certainty['paths_info'].values, firstN_w_certainty['success_rate'].values):
        print("Route:")
        print("Probablity: {}".format(probability))
        # for walk to correct time
        next_dep = path[0]['src_dep']
        for partial_path in path:
            src = partial_path['src']
            try:
                src_name = vertex_info_df[vertex_info_df['id'] == src]['stop_name'].values[0].encode('utf-8')
            except:
                continue
            dst = partial_path['dst']
            dst_name = vertex_info_df[vertex_info_df['id'] == dst]['stop_name'].values[0].encode('utf-8')
            if partial_path['trip_id'] != 'walk' or partial_path['trip_id'] != 'sameStationTransfer':
                next_dep = partial_path['dst_arr']
                src_dep = partial_path['src_dep'].strftime("%H:%M:%S")
                dst_arr = partial_path['dst_arr'].strftime("%H:%M:%S")
            else: # if walk or same station transfer are in the later part of the path, need to be correct
                time_cost = partial_path['dst_arr'] - partial_path['src_dep']
                src_dep = next_dep.strftime("%H:%M:%S")
                dst_arr = next_dep + time_cost
                next_dep = dst_arr
                dst_arr = dst_arr.strftime("%H:%M:%S")
            trip_id = partial_path['trip_id']
            print("[{}] From {}(id: {}) to {}(id: {}): {} ~ {}".format(trip_id,src_name,src,dst_name,dst, src_dep,dst_arr))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [112]:
# Print top N routes with given certainty threshold

# will take 1 or 2 minutes
# Change the following parameters for your experiment
n = 10
certainty = 0.8
print_topN_routes_w_certainty(n, certainty, build_path_df, all_edges_info_w_delay_df)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Route:
Probablity: 0.970904584882
[32.TA.80-159-Y-j19-1.8.H] From Zürich HB(id: 8503000:0:5) to Zürich Oerlikon(id: 8503006:0:6): 12:05:00 ~ 12:11:00
[walk] From Zürich Oerlikon(id: 8503006:0:6) to Zürich Oerlikon, Bahnhof(id: 8580449): 12:13:38 ~ 12:15:00
[1914.TA.26-11-A-j19-1.27.R] From Zürich Oerlikon, Bahnhof(id: 8580449) to Zürich, Auzelg(id: 8591049): 12:15:00 ~ 12:24:00
Route:
Probablity: 0.982893458175
[248.TA.26-6-A-j19-1.45.H] From Zürich HB(id: 8503000:0:41/42) to Zürich Oerlikon(id: 8503006:0:8): 12:01:00 ~ 12:08:00
[walk] From Zürich Oerlikon(id: 8503006:0:8) to Zürich Oerlikon, Bahnhof(id: 8580449): 12:13:57 ~ 12:15:00
[1914.TA.26-11-A-j19-1.27.R] From Zürich Oerlikon, Bahnhof(id: 8580449) to Zürich, Auzelg(id: 8591049): 12:15:00 ~ 12:24:00
Route:
Probablity: 0.974702679356
[248.TA.26-6-A-j19-1.45.H] From Zürich HB(id: 8503000:0:41/42) to Zürich Oerlikon(id: 8503006:0:8): 12:01:00 ~ 12:08:00
[walk] From Zürich Oerlikon(id: 8503006:0:8) to Zürich, Sternen Oerlikon(id: 859

## Pros:
1. This algorithm can find paths within small number of n of the n-depth DFS (nx.all_simple_path) because we connect all the path in one trip so that the algo can know all the possible destination if we get on the trip and thus find out the path. And the number of cutoff here is actually the number of transfer. Furthermore, based on our assumption 3, the number of transfer time will not exceed 4. So it is highly possible to find a path given small n.
2. Use greedy deduce previous path is easy to find the latest departure time. We always take the trip with latest departure time in the partial path. In this case, we can just check if the latest departure one in the previous partial path arrive before this latest departure time. If yes, then we can continue to move forwards the deduction, otherwise, it means this path fail and it is not possible during this period.

## Cons:
1. This algorithm need to consider all the possible path from the n-depth DFS. And the number of paths are usually very large, which cost much time to rebuild all the path. But this can be improved by building the all paths once and pickle it for future use. Thus this is not a huge concern.
2. Furthermore, this algorithm can just get the result after rebuilding all the possible paths (using sorting).
3. There is still possibility to find a real latest departure path because we just consider transfer within 4 times, although we think this probability is very small.
4. Here we model the certainty of a route using the cdf of the delay distribution. A better way would be to model some common distributions, e.g., lognorm, such that the model generalizes.

## Possible Improvement:
1. The algorithm have very large space to improve as what we designed here is basically brute force. For this problem, it can actually be constructed in an constraint shortest path problem. We had tried for few days but still cannot figure out the way how to add constraint to the problem. Apart from this, randomization algorithm may also work.
2. We model the certainty of a route using the CDF of the delay distribution. A better way would be to fit the data into some common distributions, e.g., lognorm, such that the model generalizes.
3. In pratical aspect, travaler can start at any platform inside one station, such as Zurich HB. But for now we only support input a sinle platform(not Zurich HB as a whole). This remains to be imrpoved for better user experience.