# Global Settings and Data Importation

In [1]:
%%configure
{"conf": {
    "spark.app.name": "PIA_final"
}}

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
9508,application_1589299642358_4102,pyspark,idle,Link,Link,
9519,application_1589299642358_4113,pyspark,idle,Link,Link,
9524,application_1589299642358_4120,pyspark,idle,Link,Link,
9527,application_1589299642358_4123,pyspark,idle,Link,Link,
9528,application_1589299642358_4124,pyspark,idle,Link,Link,
9529,application_1589299642358_4125,pyspark,idle,Link,Link,
9530,application_1589299642358_4126,pyspark,idle,Link,Link,
9532,application_1589299642358_4128,pyspark,idle,Link,Link,
9534,application_1589299642358_4130,pyspark,idle,Link,Link,
9536,application_1589299642358_4132,pyspark,idle,Link,Link,


### Start Spark

In [2]:
# Initialization

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
9553,application_1589299642358_4152,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Import Data

In [3]:
stop_times = spark.read.orc("/data/sbb/timetables/orc/stop_times")
routes = spark.read.orc("/data/sbb/timetables/orc/routes")
stops = spark.read.orc("/data/sbb/timetables/orc/stops")
calendar = spark.read.orc("/data/sbb/timetables/orc/calendar")
trips = spark.read.orc("/data/sbb/timetables/orc/trips")
transfers = spark.read.orc("/data/sbb/timetables/orc/transfers")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
from geopy import distance
import pyspark.sql.functions as functions
import networkx as nx
from pyspark.sql import Window
from pyspark.sql.functions import col
from pyspark.sql.functions import from_unixtime, to_date, to_timestamp
from pyspark.sql.functions import unix_timestamp
from pyspark.sql.functions import udf
from pyspark.sql.types import TimestampType
import datetime
import pyspark.sql.functions as F
import pandas as pd
from math import radians, cos, sin, asin, sqrt
from collections import deque
from heapq import heappush, heappop
import numpy as np
import ipywidgets as widgets
from ipywidgets import interact, interactive, fixed, interact_manual
from __future__ import print_function
from IPython.display import display
import itertools
#from pyspark.sql.functions import *
import time
import math

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Data Cleaning

We are going to join each timetable. Each one contains information that we want to filter. 
Hence for example, here we keep only business days during the week and also we filter non - casual hours.

In [5]:
stop_times = stop_times.filter((stop_times.arrival_time <= "17:00:00") & (stop_times.departure_time >= "09:00:00"))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
calendar = calendar.filter((calendar.monday == 1) & 
                           (calendar.tuesday == 1) &
                          (calendar.wednesday == 1) &
                          (calendar.thursday == 1) &
                          (calendar.friday == 1)).drop(*["monday", "tuesday", "wednesday", "thursday", 
                                                         "friday", "saturday", "sunday", "start_date", "end_date"])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Once we kept only the data that interests us, we can join.

In [7]:
calendar_trips = calendar.join(trips, on = "service_id")
calendar_trips = calendar_trips.drop(*['service_id', 'route_id'])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Filtering stops in Zurich

We need to retrieve all stations that are inside a 15km radius from Zurich HB.

In [8]:
lat_zurich = 47.378177 #Lat and lon for zurich HB
long_zurich = 8.540192
@functions.udf
def is_in_zurich(lat, long):
    pos_zurich = (lat_zurich, long_zurich)
    pos_to_calculate = (lat, long)
    return (distance.distance(pos_zurich, pos_to_calculate).km <= 15.0)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
#Filter stops according to our function above and drop unnecessary columns.
stops = stops.filter(is_in_zurich(stops.stop_lat, stops.stop_lon) == True)\
.drop(*['location_type', 'parent_station'])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
stops_stops = stops.join(stop_times, on="stop_id")
joined_data = stops_stops.join(calendar_trips, on="trip_id")
joined_data = joined_data.drop(*['pickup_type', 'drop_off_type', 'trip_short_name'])
joined_data = joined_data.withColumn('stop_sequence', joined_data.stop_sequence.cast('int'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Public Transport Model

From here, we aim at having a schedule table. Where one row will contain two stop ID, with a departure time from stop 1 and an arrival time to stop 2.
This table will be converted into a graph where an edge is a row.

In [11]:
#We use the stop sequence to retrieve the previous station for a trip on the same row as the current one.
two_stops_window = Window.partitionBy('trip_id').orderBy('stop_sequence')
from_stop = functions.lag('stop_id', 1).over(two_stops_window).alias('from')
deptime_stop = functions.lag('departure_time', 1).over(two_stops_window).alias('departure_time_stop')
from_name = functions.lag('stop_name', 1).over(two_stops_window).alias('from_name')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
#This function computes the time between two dates in datetime format.
@functions.udf
def time_between(x, y):
    if not x or not y:
        return 0
    t1 = datetime.datetime.strptime(x, "%H:%M:%S")
    t2 = datetime.datetime.strptime(y, "%H:%M:%S")
    return (t1 - t2).seconds/60

#Create new columns with hour lag windows.
trip_schedule = joined_data.withColumn("from", from_stop)
trip_schedule = trip_schedule.withColumn("departure_time_stop", deptime_stop)
trip_schedule = trip_schedule.withColumn("from_name", from_name)
trip_schedule = trip_schedule.withColumn('distance', time_between(trip_schedule.arrival_time, trip_schedule.departure_time_stop))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
#remove unnecessary columns, rearrange, rename columns and filter out first of each since each first line have no previous station, no information.
trip_schedule = trip_schedule.drop(*["trip_headsign", "direction_id", "departure_time", "stop_sequence", "stop_lat", "stop_lon"])

trip_schedule = trip_schedule.select("trip_id", "from", "from_name", "departure_time_stop",
                                     "stop_id", "stop_name", "arrival_time", "distance")\
.withColumnRenamed("departure_time_stop", "departure_time").withColumnRenamed("stop_id", "to").withColumnRenamed("stop_name", "to_name")

trip_schedule = trip_schedule.where(col("from").isNotNull())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Here's trip_schedule. Now we need to complete the graph by adding walks and transfers.

In [14]:
trip_schedule.show(10, False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------------+-----------+--------------------------+--------------+-------+------------------------------+------------+--------+
|trip_id                  |from       |from_name                 |departure_time|to     |to_name                       |arrival_time|distance|
+-------------------------+-----------+--------------------------+--------------+-------+------------------------------+------------+--------+
|1005.TA.26-131-j19-1.9.H |8503855:0:F|Horgen, Bahnhof           |16:14:00      |8589111|Horgen, Gumelenstrasse        |16:15:00    |1       |
|1005.TA.26-131-j19-1.9.H |8589111    |Horgen, Gumelenstrasse    |16:15:00      |8573553|Horgen, Stocker               |16:16:00    |1       |
|1005.TA.26-131-j19-1.9.H |8573553    |Horgen, Stocker           |16:16:00      |8573554|Horgen Oberdorf, Bahnhof      |16:18:00    |2       |
|1005.TA.26-131-j19-1.9.H |8573554    |Horgen Oberdorf, Bahnhof  |16:18:00      |8573555|Horgen, Bergli                |16:19:00    |1       |

## Adding walks between each station

We focus on the stops table now.
We want to compute the distance from each station to each other one in the table and keep only the one that are not 500m away.
Since we would need to basically do a double for loop, we need to use spark and cross_join.

In [15]:
crossed_join = stops.crossJoin(stops.select(stops.stop_id.alias("stop_id2"), stops.stop_name.alias("stop_name2"), 
                                            stops.stop_lat.alias("stop_lat2"), stops.stop_lon.alias("stop_lon2")))
#Remove the diagonal
crossed_join = crossed_join.filter(crossed_join.stop_name != crossed_join.stop_name2)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
#Necessary to compute the haversine
crossed_join = crossed_join.withColumn("stop_lat", crossed_join.stop_lat.cast("float"))
crossed_join = crossed_join.withColumn("stop_lon", crossed_join.stop_lon.cast("float"))
crossed_join = crossed_join.withColumn("stop_lat2", crossed_join.stop_lat2.cast("float"))
crossed_join = crossed_join.withColumn("stop_lon2", crossed_join.stop_lon2.cast("float"))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In order to find out distances between stops, we use the haversine function which is faster than distances from the distance package.

In [17]:
@functions.udf
def haversine(lon1, lat1, lon2, lat2):
    """Calculate the great circle distance between two points
        on the earth (specified in decimal degrees)
    """
    # convert decimal degrees to radians
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])
    # haversine formula
    dlon = lon2 - lon1 
    dlat = lat2 - lat1
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    c = 2 * asin(sqrt(a))
    r = 6371 # Radius of earth in kilometers
    return c * r

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Now we can compute the haversine distance and filter the table.

In [18]:
crossed_join = crossed_join.withColumn("distance_km", haversine(col("stop_lon"), col("stop_lat"), col("stop_lon2"), col("stop_lat2")))
crossed_join = crossed_join.filter(crossed_join.distance_km < 0.5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Convert into pandas to make our walking_distances dataframe. We can do it because now we have remove a lot of data.

In [19]:
stops_df = crossed_join.toPandas()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

We make it in the same format as our schedules (our graph). Once this will be computed, we will simply append all the walking edges to the rest of the graph.

In [20]:
walking_distances = []
for _, row in stops_df.iterrows():
    #retrieve important informations
    current_id, current_name = row["stop_id"], row["stop_name"] 
    other_id, other_name = row["stop_id2"], row["stop_name2"]
    t = float(row["distance_km"]) * 20 #put it into minutes
    trip_id = "w_"+current_id+"_"+other_id
    #Keep the same format as the original graph skeletton
    walking_distances.append([trip_id, current_id, current_name, None, other_id, other_name, None, t])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
df_walking = pd.DataFrame(data = walking_distances, columns = ["trip_id", "from", "from_name", "departure_time", "to", "to_name", "arrival_time", "distance"])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [22]:
df_walking["from_name"] = df_walking["from_name"].apply(lambda x: x.encode('utf-8'))
df_walking["to_name"] = df_walking["to_name"].apply(lambda x: x.encode('utf-8'))
df_walking = df_walking.astype({'trip_id': 'str', 'from': 'str', 'from_name': 'str',
                 'to': 'str', 'to_name': 'str', 
                 'distance': 'float'})

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [23]:
df_walking.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                 trip_id     from  ... arrival_time  distance
0      w_8500926_8590616  8500926  ...         None  2.447927
1      w_8500926_8590737  8500926  ...         None  6.008431
2      w_8502186_8502270  8502186  ...         None  9.580292
3  w_8502186_8502270:0:1  8502186  ...         None  9.715994
4      w_8502186_8590200  8502186  ...         None  9.646272

[5 rows x 8 columns]

## Adding transfers

Transfers are more easy to deal with since we have a table transfers.txt that already contains two stop ID and the time between them. All we need to do is to adapt it to our graph's format.

In [24]:
transfers_df = transfers.toPandas()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [25]:
transfer_times = []
for _, row in transfers_df.iterrows():
    #Keep important information
    from_id, to_id, distance = row["from_stop_id"], row["to_stop_id"], float(row["min_transfer_time"])/60
    #distinguish the trip id so that transfers have a unique one
    trip_id = "t_"+from_id+"_"+to_id
    transfer_times.append([trip_id, from_id, None, None, to_id, None, None, distance])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [26]:
df_transfers = pd.DataFrame(data = transfer_times, columns = ["trip_id", "from", "from_name", "departure_time", "to", "to_name", "arrival_time", "distance"])
df_transfers.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                      trip_id         from  ... arrival_time distance
0   t_8500309:0:2_8500309:0:4  8500309:0:2  ...         None      3.0
1   t_8500309:0:2_8500309:0:5  8500309:0:2  ...         None      3.0
2   t_8500309:0:2_8500309:0:3  8500309:0:2  ...         None      3.0
3   t_8500309:0:2_8500309:0:1  8500309:0:2  ...         None      3.0
4  t_8500309:0:2_8500309:0:5B  8500309:0:2  ...         None      3.0

[5 rows x 8 columns]

## Build a graph

Now we have walkings and transfers, let's append them to our final graph.

In [27]:
schedule = trip_schedule.toPandas()
schedule.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                    trip_id         from  ... arrival_time distance
0  1005.TA.26-131-j19-1.9.H  8503855:0:F  ...     16:15:00        1
1  1005.TA.26-131-j19-1.9.H      8589111  ...     16:16:00        1
2  1005.TA.26-131-j19-1.9.H      8573553  ...     16:18:00        2
3  1005.TA.26-131-j19-1.9.H      8573554  ...     16:19:00        1
4  1005.TA.26-131-j19-1.9.H      8573555  ...     16:20:00        1

[5 rows x 8 columns]

In the following cell we simply re arrange each column type so that we indeed have dates, strings etc.

In [28]:
schedule["from_name"] = schedule["from_name"].apply(lambda x: x.encode('utf-8'))
schedule["to_name"] = schedule["to_name"].apply(lambda x: x.encode('utf-8'))

schedule = schedule.astype({'trip_id': 'str', 'from': 'str', 'from_name': 'str',
                 'to': 'str', 'to_name': 'str', 
                 'distance': 'float'})

schedule["departure_time"] = schedule["departure_time"].apply(lambda x: datetime.datetime.strptime(x, "%H:%M:%S"))
schedule["arrival_time"] = schedule["arrival_time"].apply(lambda x: datetime.datetime.strptime(x, "%H:%M:%S"))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [29]:
temp_graph = schedule.append(df_walking)
final_graph = temp_graph.append(df_transfers)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [30]:
final_graph = final_graph.reset_index().drop(columns = ["index"])
final_graph["departure_time"] = pd.to_datetime(final_graph.departure_time, format ='%H:%M:%S', errors = 'coerce').dt.time
final_graph["arrival_time"] = pd.to_datetime(final_graph.arrival_time, format ='%H:%M:%S', errors = 'coerce').dt.time

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [31]:
final_graph.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                    trip_id         from  ... arrival_time distance
0  1005.TA.26-131-j19-1.9.H  8503855:0:F  ...     16:15:00      1.0
1  1005.TA.26-131-j19-1.9.H      8589111  ...     16:16:00      1.0
2  1005.TA.26-131-j19-1.9.H      8573553  ...     16:18:00      2.0
3  1005.TA.26-131-j19-1.9.H      8573554  ...     16:19:00      1.0
4  1005.TA.26-131-j19-1.9.H      8573555  ...     16:20:00      1.0

[5 rows x 8 columns]

Now we convert the graph skeletton to a MultiDiGraph since between two stations (vertex), we have a lot of trip.

In [32]:
g_ = nx.MultiDiGraph()
min_time = datetime.time(0, 0, 0)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

**Final Graph**

We model the public transport infrastructure with a graph:
* Each stop is a vertex. 
* Between two stops, we add an edge if any trip goes from one to another. 
* We consider walkings and transfers. From one vertex, we add an edge to all other vertex that are reachable by walking. 
* On each edge, we add all the informtion that we have. Departure time, arrival time, weight and trip_id

In [None]:
for _, row in final_graph.iterrows():
    g_.add_node(row["from"], max_arrival_time = min_time) #First stop is the first node
    g_.add_node(row["to"], max_arrival_time = min_time) #Second node
    #Each edges will have all the following information.
    #In order to differenciate between all edges between two stations, we add the key as the arrival_time (unique)
    g_.add_edge(row["from"], row["to"],
               weight = row["distance"],
               dep_time = row["departure_time"],
               arr_time = row["arrival_time"],
               trip_id = row["trip_id"],
               key = str(row["arrival_time"]))

# Routing Algorithm

In order to find the best route from a point A to a point B we need to first work on the graph.
The only information we have is the desired arrival time (and for later the probability of success), the source and the target. We want to output the latest departure time. 
For the moment our graph is incomplete because some edges (walkings and transfers) do not have any departure / arrival time. 

* First we do a backward BFS, starting from the target. At each node, we are going to store the maximal_arrival_time at which we should arrive at this node, if we want to be able to catch any trip that will lead us to the target. To do that, we analyze all incoming edges at the visited node and remove edges that have an arrival time bigger than the maximal_arrival_time. After this first filtering, we aim at keeping only 1 edge between two nodes so we keep only the one whose arrival time is the closest to maximal_arrival_time. If we are left with a walking and an trip edge, we keep the one with the smallest weight and update the departure/arrival time of the walking edge, if it is the chosen.

* The backward BFS is not enough. It allows us to filter the graph with interesting edges and only 1 edge between two nodes. But we can have edges that have a departure time before the latest arrival time at a node. So we need to do a forward BFS starting from the source. Hence we do another BFS to remove incoherent edges from the source to the target. 

* To find the shortest path, we apply the djikstra algorithm. Since we are using our graph with networkx, we have to adapt their implementation to ours. Indeed, we needed to keep into consideration the arrival_time and departure time. During the djikstra algorithm we always keep into memory the incoming arrival time and remove edges that could not be taken so for example the ones with a bad departure time. We also penalize waitings at a station by increasing the weight.

In the following cells, you will find each algorithm.


In [34]:
#Takes a datetime.time and returns a datetime.time where we add or substracct the given number of minutes.
def substract_times(t1, mins):
    t2 = datetime.datetime.combine(datetime.date.today(), t1) - datetime.timedelta(minutes=mins)
    return t2.time().replace(microsecond=0)

def add_times(t1, mins):
    t2 = datetime.datetime.combine(datetime.date.today(), t1) + datetime.timedelta(minutes=mins)
    return t2.time().replace(microsecond=0)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [35]:
"""
At a given node, this function filters incoherent edges based on the max_arrival_time attributes. It also compute the departure / arrival time for walking edges.
"""
def filter_edges_forward(g, node):
    #Retrieve unique outgoing edges
    set_edges = set()
    for edge in g.out_edges(node):
        set_edges.add(edge)
    
    for e in set_edges:
        unique_edge = list(g.get_edge_data(e[0], e[1]).values())[0]
        best_time = datetime.time(23, 0, 0)
        best_edge = None
        #We need to find the lastest arrival_time to the neighbor e[1]
        for edge in g.in_edges(e[1]):
            ed = g.get_edge_data(edge[0], edge[1]).values()[0]
            
            if ed.get('arr_time') < best_time:
                best_time = ed.get('arr_time')
                best_edge = ed
        
        #Update the new max_arrival_time        
        g.nodes[e[1]]['max_arrival_time'] = best_time
        
        if node == source: #Don't exclude possibilities to arrive earlier
            g.nodes[e[0]]['max_arrival_time'] = min_time
         
        # If we have a walking edge, we assume we can walk as soon as we arrive to the node. 
        # Hence, departure time is the max_arrival_time at the source walking node.
        # Here we also add the weight of 2: a transfer from a station to the walking edge
        if unique_edge.get('trip_id')[0] == 'w':
            g[e[0]][e[1]]['NaT']["dep_time"] = g.nodes[e[0]]['max_arrival_time']
            new_arrival_time = add_times(g.nodes[e[0]]['max_arrival_time'], g[e[0]][e[1]]['NaT']["weight"] + 2)
            g[e[0]][e[1]]['NaT']["arr_time"] = new_arrival_time    
            #Compute the new weight
            g[e[0]][e[1]]['NaT']["weight"] = (datetime.datetime.combine(datetime.date.min, new_arrival_time) 
                                              - datetime.datetime.combine(datetime.date.min, g[e[0]][e[1]]['NaT']["dep_time"])).seconds / 60
        
        #Once we updated the max_arrival_time, we can remove trips that departs before the max_arrival_time to the node.
        if unique_edge.get('dep_time') < g.nodes[e[0]]['max_arrival_time']:
            try:
                g.remove_edge(e[0], e[1], str(unique_edge.get('arr_time')))
            except:
                #Walking have no key.
                g.remove_edge(e[0], e[1], 'NaT')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [36]:
"""
The second BFS algorithm that we apply. This one starts from the source and visit neighbors.
"""
def forward_bfs(graph, target):
    visited = []
    queue = deque()
    visited.append(target)
    queue.append(target)
    while queue:
        s = queue.popleft()
        filter_edges_forward(graph, s)
        for p in graph.neighbors(s):
            if p not in visited:
                visited.append(p)
                queue.append(p)
    return graph

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [37]:
"""
This function updates only walking and transfers edges. It also updates the max_arrival_time of the source node.
"""
def update_unique_path(g, e, arrival_time):
    g[e[0]][e[1]]['NaT']["arr_time"] = arrival_time
    new_departure_time = substract_times(arrival_time, g[e[0]][e[1]]['NaT']["weight"])
    g[e[0]][e[1]]['NaT']["dep_time"] = new_departure_time
    
    if new_departure_time > g.nodes[e[0]]["max_arrival_time"]:
        g.nodes[e[0]]["max_arrival_time"] =  new_departure_time
    
"""
Function linked to the backward BFS. 
Filter incoherent and invalid edges incoming to node for a given arrival_time.
"""
def filter_edges(g, node, arrival_time):
    #For a given node, retrieve its incoming edges.
    set_edges = set()
    for edge in g.in_edges(node):
        set_edges.add(edge)

    for e in set_edges:
        #Retrieve all edges between two nodes
        current = g.get_edge_data(e[0], e[1])
        #Make it a list
        list_current = list(current.values())
        trip_category = list_current[0].get('trip_id')[0]
        
        if (len(list_current) == 1 and (trip_category == "w" or trip_category == "t")):
            update_unique_path(g, e, arrival_time)
        else:
            #First we need to find the closest edge to the max_arrival_time
            closest = min_time
            best_edge = None
            best_time = float('inf')
            best_time_edge = None
            #Here, if an edge have an arrival_time > than max_time_arrival, then this edge is no longer valid. remove it.
            for i in list(current.values()):
                if i.get('arr_time') > arrival_time:
                    g.remove_edge(e[0], e[1], str(i.get('arr_time')))
                else:
                    #Keep only the closest
                    if i.get("arr_time") > closest:
                        closest = i.get("arr_time")
                        best_edge = i
            
            # Once we removed all edges that are above the arrival time and defined the closest time
            # we repass between all edges remaining: Remove all edges that are not the closest one.
            for i in list(current.values()):
                if i != best_edge and i.get('trip_id')[0] != "w":
                    g.remove_edge(e[0], e[1], str(i.get('arr_time')))
            
            #Since walkings are not filtered above, we may have at most 2 edges (one walking and one trip edge)
            if len(list(current.values())) > 1:
                # Select the one with the smallest weight
                for i in list(current.values()):
                    if i.get('weight') < best_time:
                        best_time = i.get("weight")
                        best_time_edge = i
                # Remove the other
                for i in list(current.values()):
                    if i != best_time_edge:
                        if i.get('trip_id')[0] != "w":
                            g.remove_edge(e[0], e[1], str(i.get('arr_time')))
                        else:
                            g.remove_edge(e[0], e[1], 'NaT')
                    else:
                        #If the best edge is a walking edge, update its arrival and departure time that are missing at the beggining.
                        if i.get('trip_id')[0] == "w":
                            g[e[0]][e[1]]['NaT']["arr_time"] = arrival_time
                            new_departure_time = substract_times(arrival_time, i["weight"])
                            g[e[0]][e[1]]['NaT']["dep_time"] = new_departure_time
                            g.nodes[e[0]]["max_arrival_time"] =  new_departure_time
            
            # Update node information: max_arrival_time                
            if (len(list(current.values())) != 0) and (list(current.values())[0].get('dep_time') > g.nodes[e[0]]["max_arrival_time"]):
                g.nodes[e[0]]["max_arrival_time"] = list(current.values())[0].get('dep_time')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [38]:
"""
The first BFS algorithm that we apply. This one starts from the target and visit predecessors.
"""

def backward_bfs(graph, target, time):
    #Initialy the max arrival time is the query time
    graph.nodes[target]["max_arrival_time"] = time
    visited = []
    queue = deque()
    visited.append(target)
    queue.append(target)
    while queue:
        s = queue.popleft()
        #propagate the info of max_arrival_time
        time = graph.nodes[s]["max_arrival_time"]
        filter_edges(graph, s, time)
        for p in graph.predecessors(s):
            if p not in visited:
                visited.append(p)
                queue.append(p)
    return graph

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Now we present algorithm that are useful to apply djikstra algorithm to find the shortest path.

Our strategy is to modify weights and if we are using a walking edges, we also need to update its departure and arrival time. 
Now before looking into neighbors of a node, we look at the incoming trip id and arrival time and we filtered even better the graph
to remove edges that are not valid.

In [39]:
def filter_with_current_time(graph, node, time, t_id):
    for e in graph.out_edges(node):
        unique_edge = list(graph.get_edge_data(e[0], e[1]).values())[0]
        #Add a transfer time if we change bus
        if unique_edge.get('trip_id') != t_id:
            if t_id[0] != "w" and t_id[0] != "t":
                if unique_edge.get('trip_id')[0] != "w" and unique_edge.get('trip_id')[0] != "t":
                    graph[e[0]][e[1]][str(unique_edge.get('arr_time'))]["weight"] = graph[e[0]][e[1]][str(unique_edge.get('arr_time'))]["weight"] + 2
        
        #Also penalize waiting at a stop
        if unique_edge.get('trip_id')[0] != "w" and unique_edge.get('trip_id')[0] != "t":
            w = graph[e[0]][e[1]][str(unique_edge.get('arr_time'))]["weight"]
            graph[e[0]][e[1]][str(unique_edge.get('arr_time'))]["weight"] = (datetime.datetime.combine(datetime.date.min, unique_edge.get('dep_time')) 
                                              - datetime.datetime.combine(datetime.date.min, time)).seconds / 60 + w

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [40]:
"""
Modified Djikstra algorithm
"""

def _dijkstra(G, source, get_weight, target, pred=None, paths=None):
    G_succ = G.succ if G.is_directed() else G.adj

    push = heappush
    pop = heappop
    dist = {}  # dictionary of final distances
    seen = {source: 0}
    fringe = []
    # Fringe keep in memory the incoming trip_id and arrival_time so that we can add time for transfers and filter edges that do not correspond to 
    # the arrival time.
    push(fringe, (0, source, None, None))
    while fringe:
        (d_to_current, currentNode, incoming_t_id, incoming_arr_time) = pop(fringe)
        if currentNode in dist:
            continue  # already searched this node.
        dist[currentNode] = d_to_current
        if currentNode == target:
            break
            
        if incoming_arr_time is not None: #if we are not at the source
            #Filter out going edges before finding the shortest path between them
            filter_with_current_time(G, currentNode, incoming_arr_time, incoming_t_id) 
            # Don't forget to reupdate the G_succ variable.
            G_succ = G.succ if G.is_directed() else G.adj

        # Deal with each filtered neighbors
        for u, e in G_succ[currentNode].items():
            #Retrieve information for current outgoing edge
            (weight, t_id, dep_time, arr_time) = get_weight(currentNode, u, e)
            vu_dist = dist[currentNode] + weight
            
            if u in dist:
                if vu_dist < dist[u]:
                    raise ValueError('Contradictory paths found:',
                                     'negative weights?')
            
            elif u not in seen or vu_dist < seen[u]:
                seen[u] = vu_dist
                
                # If we have a transfer or a walking, update the times
                if (t_id[0] == "w" or t_id[0] == "t") and incoming_arr_time is not None:
                    arr_time = add_times(incoming_arr_time, weight)
                    G[currentNode][u]['NaT']["arr_time"] = arr_time
                    G[currentNode][u]['NaT']["dep_time"] = incoming_arr_time
                
                push(fringe, (vu_dist, u, t_id, arr_time))
                paths[u] = paths[currentNode] + [u]
        
    return (dist, paths)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [41]:
"""
Finds the shortest path using dijkstra algorithm from a source to a target.
"""
def single_source_dijkstra(G, source, target, weight='weight'):
    if source == target:
        return ({source: 0}, {source: [source]})

    #The function used to retrieve informations from an edge.
    if G.is_multigraph():
        get_weight = lambda u, v, data: min(
            (eattr.get('weight', 1), eattr.get('trip_id', None), eattr.get('dep_time', None), eattr.get('arr_time', None))  for eattr in data.values())
    else:
        get_weight = lambda u, v, data: data.get('trip_id', 1)

    paths = {source: [source]}  # dictionary of paths
    return _dijkstra(G, source, get_weight, target=target, paths=paths)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Now we use all our algorithms to work on the graph and apply the shortest path algorithm on it.

In [42]:
"""
Finds the shortest path between source and target in graph.
"""
def find_my_route(source, target, time, graph):
    # Apply first the backward bfs
    # Then the forward bfs
    # On the final graph, apply dijkstra
    temp_graph = backward_bfs(graph, target, time)
    final_graph = forward_bfs(temp_graph, source)
    shortest_path = single_source_dijkstra(final_graph, source=source, target=target)
    return shortest_path

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Predictive Model using Delays

In [43]:
from pyspark.sql.functions import hour, minute, when, array_contains, collect_list
import time

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Please uncomment the following cell if you want to create the parquet for the sbb filtered table.

In [44]:
#We filtered the sbb dataset to keep only the data with stations around zurich and trains between 9am and 5pm

#sbb = spark.read.orc('/data/sbb/orc/istdaten')

#We create a list containing all teh stops around zurich using previously generated DF "stops"

#all_stops = stops.select("stop_id").rdd.map(lambda x : x.stop_id)
#all_stops = all_stops.map(lambda x :x).collect()
#arr = [stop[:7] for stop in all_stops]
#arr = list(np.unique(arr).astype('string'))

#We selct only the relevant columns.

#sbb = sbb.select('betriebstag','fahrt_bezeichner','haltestellen_name','bpuic' , 'ankunftszeit', 'an_prognose', 'abfahrtszeit','ab_prognose', 'an_prognose_status', 'ab_prognose_status')

#We translate the column names to facilitate its use

#trad = [("betriebstag", "date"), ('fahrt_bezeichner', 'trip'), ('haltestellen_name', 'stop_name'), ('ankunftszeit', 'sched_arrival'), ('an_prognose', 'real_arrival'), ('abfahrtszeit','sched_departure'), ('ab_prognose', 'real_departure'), ('bpuic', 'stop_id')]
#for al, en in trad:
#    sbb = sbb.withColumnRenamed(al,en)
    
    
#sbb = sbb.filter(col('stop_id').isin(arr))
#sbb = sbb.filter((sbb.an_prognose_status == "PROGNOSE") | (sbb.an_prognose_status == "GESCHEATZT"))
#sbb = sbb.filter((sbb.ab_prognose_status == "PROGNOSE") | (sbb.ab_prognose_status == "GESCHEATZT"))

#sbb = sbb.withColumn('sched_arr',to_timestamp(sbb.sched_arrival, 'dd.MM.yyyy HH:mm'))
#sbb = sbb.withColumn('real_arr',to_timestamp(sbb.real_arrival, 'dd.MM.yyyy HH:mm'))
#sbb = sbb.withColumn('sched_dep',to_timestamp(sbb.sched_departure, 'dd.MM.yyyy HH:mm'))
#sbb = sbb.withColumn('real_dep',to_timestamp(sbb.real_departure, 'dd.MM.yyyy HH:mm'))
#sbb = sbb.drop(*['sched_arrival', 'real_arrival', 'sched_departure', 'real_departure'])

#We keep only rows with non-empty scheduled departure hour or non-empty scheduled arrival hours and filter train out of the work hours (9am-5pm)

#sbb = sbb.filter(((col('sched_dep').isNotNull()) & (hour(col('sched_dep')) >= 9) & (hour(col('sched_dep')) <= 17)) | ((col('sched_arr').isNotNull()) & (hour(col('sched_arr')) >= 9) & (hour(col('sched_arr')) <=17)))

#We use partition filters to spped up the filtering queries un get_similar and we store it as a parquet file to acces it more quickly 

#sbb.repartition("stop_id").write.partitionBy("stop_id").parquet('/user/274900/sbb_filtered_partitioned.parquet')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [45]:
#We read the filtered and partitioned file
sbb = spark.read.parquet('/user/274900/sbb_filtered_partitioned.parquet')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [46]:
sbb.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+--------------------+---------------+------------------+------------------+-------------------+-------------------+-------------------+-------------------+-------+
|      date|                trip|      stop_name|an_prognose_status|ab_prognose_status|          sched_arr|           real_arr|          sched_dep|           real_dep|stop_id|
+----------+--------------------+---------------+------------------+------------------+-------------------+-------------------+-------------------+-------------------+-------+
|18.06.2018|85:3849:190363-02...|Zürich, Central|          PROGNOSE|          PROGNOSE|2018-06-18 09:03:00|2018-06-18 09:05:00|2018-06-18 09:03:00|2018-06-18 09:05:00|8588078|
|21.05.2019|85:3849:85919-020...|Zürich, Central|          PROGNOSE|          PROGNOSE|2019-05-21 09:03:00|2019-05-21 09:03:00|2019-05-21 09:03:00|2019-05-21 09:04:00|8588078|
|18.06.2018|85:3849:190365-02...|Zürich, Central|          PROGNOSE|          PROGNOSE|2018-06-18 10:18:00|2018-06-18 10

In [47]:
"""
Given two stops ids and two times, this function returns all the historical "real" times of departure from 
the first station and all the historical "real" times of arrival to the second station.
"""
def get_similar(stop_id_1,hour_1,minute_1, stop_id_2,hour_2,minute_2, time_travel, list_stops_in_trip = None):
    
    #Filter with only rows concerning the two stops.
    df = sbb.filter((col('stop_id') ==stop_id_1 ) | (col('stop_id') == stop_id_2))
    
    #keep only rows with non-null values for the departure time for the departure stop and similarly for the arrival stop
    df = df.filter(((col('stop_id') ==stop_id_1) & (col('real_dep').isNotNull())) | (col('stop_id') ==stop_id_2) & col('real_arr').isNotNull())
    
    #add columns to differentiate hours and minutes from the scheduled times
    df = df.withColumn('hour_arr',hour(df.sched_arr))
    df = df.withColumn('minute_arr', minute(df.sched_arr))
    df = df.withColumn('hour_dep',hour(df.sched_dep))
    df = df.withColumn('minute_dep', minute(df.sched_dep))
    
    #keep rows that correspond to departure from the stop1 at hour1 and minute1 (time departure) and similarly for the arrival times and stop2.
    df = df.filter(((df.hour_dep == hour_1) & (df.minute_dep == minute_1) & (df.stop_id == stop_id_1)) | ((df.hour_arr == hour_2) & (df.minute_arr >= minute_2) & (df.minute_arr <= minute_2) & (df.stop_id == stop_id_2)))
    df = df.withColumn('date', when(col('stop_id') == stop_id_1, df.sched_dep.cast('date')).otherwise(df.sched_arr.cast("date")))
    df = df.drop(*['sched_dep', 'sched_arr'])
    
    #keep only inetersting data wich is the "real" departure and arrival times respectively for departure stop and arrival stop
    df = df.withColumn('real', when((col('stop_id') == stop_id_1), col('real_dep')).otherwise(col("real_arr"))) 
    df = df.drop(*['real_arr', 'real_dep'])
    
    #We pass it to pandas as the dataset is now considerably reduced 
    df = df.toPandas()
    
    # We know want to identify the valid trips_id from the sbb dataset that at a given hour link those two stations directly or undireclty.
    df_p = df.groupby(['trip', "date"]).agg({"stop_id": lambda x: set(x)}).reset_index()
    df_count = df_p["stop_id"].apply(lambda x : len(x))

    trips_df = pd.DataFrame({'trip': df_p["trip"] , 'stops': df_count.values})

    #We filter to keep only trips that link the two stations.
    valid_trips = trips_df["trip"][trips_df["stops"] == 2]

    #We keep a list of those trips
    valid_trips = list(np.unique(valid_trips))

    #We filter to keep only the rows that correspond to a "valid_trip"
    df =df[df['trip'].isin(valid_trips)]

    df = df.drop(["stop_id"], axis = 1)
    
    #We groupby trip and by date to merge in a list [departure time0, arrival time0] for all the historical data
    all_data = df.groupby(['trip', 'date'])["real"].apply(list).values 
    
    #We collect and flatten the data
    all_data = list(itertools.chain.from_iterable(all_data))
    
    size = int(math.floor(len(all_data)/2)*2)
    dep = []
    arr =[]
    
    #And separate it into two distinct lists, one for the departure times, the other for the arrival times.
    for i in range(0,size, 2):
        res = all_data[i + 1] - all_data[i]
        if (((res.days) == 0) & ((res.seconds/60) >= time_travel/2) & ((res.seconds/60) <= 3 * time_travel)):
            dep.append(all_data[i])
            arr.append(all_data[i+1])
    
    return dep, arr

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

The function below takes the two lists generated above and computes a probability. We decompose this computation into three cases :
* The first case is where we do not change trip ids e.g a trip that consists only of one and unique trip id. \
For this first case, we need the initial stop, the destination stop and the corresponding hours of departure and arrival. \
With get_similar, we retrieve the historical data corresponding the time of departure and time of arrival.
The "weight" computed is the scheduled time of travel. query_time - hour_2 (scheduled arrival time) corresponds to the margin available for a delay.
For the historical data, we compute for each pair, the effective travel time by zipping the arr and dep previously generated and computing the difference.
To determine if there is a delay, we compute the difference of the total available time (weight + query_time - hour_2) and substract the effective time that took a journey.
If it is < 0, the user will arrive too late. We count such events and divide by the total number of events considered to get a probability
  
* The second case is a transfer in the same stop from a platform to another platform.
We operate in a similar way for the second case. we get all the historical times of arrivals to the "transfer station" and the departure times of the train the will be taken afterwards. The available time to do the transfer is therefore the difference between those two. The time needed to do the transfer is "transfer" and can be 0 if there is a change of trip ids but not of plateform.
* The third case is a transfer by walking between two different stops.
This case is similar to the above one except that now, retrieve the data that corresponds to the arrival time to the station where the user will begin his walking and the data that corresponds to the departure times of the train he is willing to take afterwards. The available time again the difference between those two and the needed time to not miss his transfer is "transfer" which correspond to the walking time between the two stations.


In [48]:
"""
Computes the probablitiy as explained above
"""
def get_proba(stop_id_1, hour_1, stop_id_2, hour_2 , query_time, transfer, stop_id_3=None, hour_3= None, stop_id_4= None ,hour_4=None):
    
    if(transfer == 0 and stop_id_3 == None):
        weight = (hour_2 - hour_1).seconds/60
        dep, arr = get_similar(stop_id_1, hour_1.hour, hour_1.minute, stop_id_2, hour_2.hour, hour_2.minute, weight)
        diff = []
        zipped = zip(arr, dep)
        for (el1, el2) in zipped:
            diff.append((el1 - el2).seconds/60)
        delay = []
        for time in diff:
            delay.append(weight + (query_time - hour_2).seconds/60 - time)
            
        total_nb = len(delay)
        
        if total_nb > 0:
            proba = float(len([l for l in delay if l > 0])) / total_nb
        else: 
            proba= 1
        
        return proba
        
    else:
        weight_1 = (hour_2 - hour_1).seconds/60
        if (stop_id_3 == stop_id_4):
            weight_2 = (hour_3 - hour_2).seconds/60
            
            _, arr1 = get_similar(stop_id_1, hour_1.hour, hour_1.minute, stop_id_2, hour_2.hour, hour_2.minute, weight_1)
            dep1, _ = get_similar(stop_id_2, hour_2.hour, hour_2.minute, stop_id_3,hour_3.hour, hour_3.minute, weight_2)
            
        else:
            weight_3 = (hour_4 - hour_3).seconds/60
            _, arr1 = get_similar(stop_id_1, hour_1.hour, hour_1.minute, stop_id_2,hour_2.hour, hour_2.minute, weight_1)
            dep1, _ = get_similar(stop_id_3, hour_3.hour, hour_3.minute, stop_id_4,hour_4.hour, hour_4.minute, weight_3)
            
        diff = []
        zipped = zip(dep1, arr1)
        for (el1, el2) in zipped:
            diff.append((el1 - el2).seconds/60)
        delay = []
        for time in diff:
            delay.append(time-transfer)
        
        total_nb = len(delay)
        
        if total_nb > 0:
            proba = float(len([l for l in delay if l >0]))/total_nb
        else: 
            proba = 1
        
        return proba

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Robust Route Planning Algorithm

Now we need to merge the routing algorithm with the predictive model. To do that, here's our strategy:

* Since our routing algorithm returns only the latest departure time we need to run it multiple times. This is a valid choice because we filter a lot the graph to obtain the optimal path, hence running it multiple times is quite fast.
* For a given time, we define the minimum query time which is by default query_time - 30 minutes. 
* We aim at generating all paths that arrive between the minimum query time and the default query time, using the routing algorithm.
* Then, for each generated paths, according to the success rate that the user wants, we compute its probability of success. And we return the first one that is above the wanted success rate.





In [49]:
"""
Generate all path from source to target with a query time between query_time and query_time - 30 min = min_query_time in the graph.
"""
def generate_multiple_paths(source, target, query_time, min_query_time, graph):
    # nodes will contain only the stop ids
    # details will contain all the detail for each edge
    nodes = []
    details = []
    while query_time >= min_query_time:
        gr = graph.copy()
        l = find_my_route(source, target, query_time, gr)
        
        #append in the two lists
        if l[1][target] not in nodes:
            nodes.append(l[1][target])
            details_temp = []
            for i in range(1, len(l[1][target])):
                details_temp.append(gr.get_edge_data(l[1][target][i - 1], l[1][target][i]).values()[0])
            details.append(details_temp)
        
        # The new query time will be the last arrival time - 1 so that we don't have always the same path generated.
        last_arrival = gr.get_edge_data(l[1][target][-2], l[1][target][-1]).values()[0].get('arr_time')
        query_time = substract_times(last_arrival, 1)
    
    return nodes, details

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In order to get the path probability, we need to differenciate multiple cases:

* If we are on the portion of the path where we don't change the trip id until the end of the trip, then we can simply compute the probability of arriving late at the target using that trip id. 

* While the trip_id is the same, we continue until we have a change. Now we have two cases: either we change with a walk between two stations, or we arrive at a station, wait there and continue our trip (no walking). In the first one, we compute if even with a bit of delay, we can still catch the next trip knowing that we have to walk. In the second one, we simply compute the probability of missing the next trip, knowing that we have some time to wait at the station meaning that a trip is able to be late if we are still able to catch the rest of the trip.


In [50]:
"""
Retrieve the total path probability for each edges in the path, by looking at different portion of the path (all changes between trips).
"""
def get_path_probability(nodes, edges, query_time):
    previous_trip_id = ""
    result_proba = 1.0
    prev_stop_dep = None
    prev_stop_arr = None
    for i in range(len(edges)):
        current_edge = edges[i]
        current_trip_id, current_dep_time = current_edge.get('trip_id'), current_edge.get('dep_time')
        current_arr_time, current_weight = current_edge.get('arr_time'), current_edge.get('weight')

        final_edge = edges[-1]
        final_trip_id = final_edge.get('trip_id')
        
        #First case: If we do not change our trip id until the end
        # 
        if current_trip_id == final_trip_id:
            final_arr_time = final_edge.get('arr_time')
            current_dep_time = datetime.datetime.combine(date, current_dep_time)
            final_arr_time = datetime.datetime.combine(date, final_arr_time)
            query_time1 = datetime.datetime.combine(date, query_time)
            
            #Since the sbb data do not consider plateform, we need to cut our data.
            current_stop_dep = nodes[i][:7]
            final_stop = nodes[-1][:7]
            proba = get_proba(current_stop_dep, current_dep_time, final_stop, final_arr_time, query_time1, 0)
            result_proba = result_proba * proba
        
        if i == 0: # If we are at the first edge, initialize variables
            previous_trip_id = current_trip_id
            prev_stop_dep = nodes[i]
            prev_stop_arr = nodes[i + 1]
        else:
            # While we have the same trip id, find the first change.
            if current_trip_id == previous_trip_id:
                prev_stop_dep = nodes[i]
                prev_stop_arr = nodes[i + 1]
                continue
            # If we change with walking
            elif (current_trip_id[0] == "w" or current_trip_id[0] == "t") and i != len(edges) - 1:
                stop_dep = nodes[i] #Walk departure
                stop_arr = nodes[i + 1] #walk arrival
                #need to look at the edge incoming inside the walk, and the one leaving the station after the walk.
                past_edge = edges[i - 1]
                past_hour_dep = past_edge.get('dep_time')
                past_hour_arr = past_edge.get('arr_time')
                
                next_edge = edges[i + 1]
                next_hour_dep = next_edge.get('dep_time')
                next_hour_arr = next_edge.get('arr_time')
                next_stop_dep = nodes[i + 1]
                next_stop_arr = nodes[i + 2]
                
                #convert into datetime.datetime for the get_proba function
                date = datetime.date(1, 1, 1)
                past_hour_dep = datetime.datetime.combine(date, past_hour_dep)
                past_hour_arr = datetime.datetime.combine(date, past_hour_arr)
                next_hour_dep = datetime.datetime.combine(date, next_hour_dep)
                next_hour_arr = datetime.datetime.combine(date, next_hour_arr)
                
                #Cut station id's name for the sbb data
                prev_stop_dep_cut = prev_stop_dep[:7]
                prev_stop_arr_cut = prev_stop_arr[:7]
                next_stop_dep_cut = next_stop_dep[:7]
                next_stop_arr_cut = next_stop_arr[:7]
                
                proba = get_proba(prev_stop_dep_cut, past_hour_dep, prev_stop_arr_cut, past_hour_arr, None, 
                              current_weight, next_stop_dep_cut, next_hour_dep, next_stop_arr_cut, next_hour_arr)
                result_proba = result_proba * proba
                
                previous_trip_id = current_trip_id
                prev_stop_dep = nodes[i]
                prev_stop_arr = nodes[i + 1]
            
            #If we change at a station
            elif current_trip_id[0] != "w" and current_trip_id[0] != "t" and previous_trip_id[0] != "w" and previous_trip_id[0] != "t":                
                past_edge = edges[i - 1]
                past_hour_dep = past_edge.get('dep_time')
                past_hour_arr = past_edge.get('arr_time')
                
                date = datetime.date(1, 1, 1)
                past_hour_dep = datetime.datetime.combine(date, past_hour_dep)
                past_hour_arr = datetime.datetime.combine(date, past_hour_arr)
                
                
                current_arr_time = datetime.datetime.combine(date, current_arr_time)
                
                prev_stop_arr_c = prev_stop_arr[:7]
                prev_stop_dep_c = prev_stop_dep[:7]
                
                proba = get_proba(prev_stop_dep_c, past_hour_dep, prev_stop_arr_c, past_hour_arr, None, 0, nodes[i + 1][:7], current_arr_time, nodes[i + 1][:7], None)
                result_proba = result_proba * proba
                
                previous_trip_id = current_trip_id
                prev_stop_dep = nodes[i]
                prev_stop_arr = nodes[i + 1]
                
            else:
                previous_trip_id = current_trip_id
                prev_stop_dep = nodes[i]
                prev_stop_arr = nodes[i + 1]
    return result_proba

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [51]:
"""
Returns the optimal path with a given success rate.
"""
def get_optimal_path(all_paths, details_path, success_rate, query_time):
    for (nodes, edges) in zip(all_paths, details_path):
        proba = get_path_probability(nodes, edges, query_time)
        if proba > success_rate:
            return nodes, edges, proba

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [52]:
"""
Our final function. Give all the parameter you want for you perfect trip. Returns the optimal path and the probability of success.
"""
def robust_journey_planner(source, target, query_time, success_rate):
    min_query_time = substract_times(query_time, 30)
    
    g = g_.copy()
    nodes, details = generate_multiple_paths(source, target, query_time, min_query_time, g)
    optimal_nodes, optimal_path, success_probability = get_optimal_path(nodes, details, success_rate, query_time)
    return optimal_nodes, optimal_path, success_probability

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Test it!

We build a little IpyWidgets User friendly interface to try robust journey planner. It takes around 1 minute to find the best path.

In [53]:
import datetime
"""
Function that takes argument coming from local into spark and computes the robust route. It returns a dataFrame that will be shown in the console.
"""
def spark_magic(source, target, hour_time, minute_time, success_rate):
    query_time = datetime.time(int(hour_time), int(minute_time))
    optimal_nodes, optimal_path, success_probability = robust_journey_planner(source, target, query_time, float(success_rate))
    list_edges = []
    for i in range(len(optimal_path)):
        list_edges.append([optimal_path[i].get('trip_id'), optimal_nodes[i], optimal_path[i].get('dep_time'), optimal_nodes[i + 1], optimal_path[i].get('arr_time')])
    shortest_path = pd.DataFrame(list_edges, columns = ["trip_id", "from", "departure_time", "to", "arrival_time"])
    shortest_path = spark.createDataFrame(shortest_path.astype(str)) 
    return shortest_path

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

The following cell builds the user-friendly interface to test our robust journey planner. When you click on the button, it will propagate informations to spark, compute the optimal path and returns a dataframe in the console showing your trip!

In [54]:
%%local
import ipywidgets as widgets
from IPython.display import display, HTML
import datetime
import pandas as pd

#Source
source_label = widgets.Label("Source station")
display(source_label)
source_textField = widgets.Text(value='', placeholder = "Source station ID")
display(source_textField)

#Target
target_label = widgets.Label("Target station")
display(target_label)
target_textField = widgets.Text(value='', placeholder = "Target station ID")
display(target_textField)

#Arrival time
arrival_label = widgets.Label("Desired arrival time (Hour between 9 and 17)")
display(arrival_label)
arrival_time_hour = widgets.IntText(value=12,description='Hour:', disabled=False)
display(arrival_time_hour)
arrival_time_minute = widgets.IntText(value=30,description='Minute:', disabled=False)
display(arrival_time_minute)

#Success rate
success_label = widgets.Label("Choose your confidence level")
display(success_label)
success_rate_slider = widgets.IntSlider()
success_slider = widgets.IntSlider()
display(success_slider)

#Button
btn = widgets.Button(description = "Find your route!")
display(btn)

loading = widgets.Label()
display(loading)
end = widgets.Label()
display(end)

info = widgets.Label()
display(info)

def print_result(p):
    source = str(source_textField.value)
    target = str(target_textField.value)
    hour_time = str(arrival_time_hour.value)
    minute_time = str(arrival_time_minute.value)
    success_rate = str(success_slider.value / 100)
    
    loading.value = "Finding your best route..."
    
    get_ipython().push("source")
    get_ipython().push("target")
    get_ipython().push("hour_time")
    get_ipython().push("minute_time")
    get_ipython().push("success_rate")
    
    get_ipython().run_cell_magic('send_to_spark', '-i source -n source', ' ')
    get_ipython().run_cell_magic('send_to_spark', '-i target -n target', ' ')
    get_ipython().run_cell_magic('send_to_spark', '-i hour_time -n hour_time', ' ')
    get_ipython().run_cell_magic('send_to_spark', '-i minute_time -n minute_time', ' ')
    get_ipython().run_cell_magic('send_to_spark', '-i success_rate -n success_rate', ' ')
    
    get_ipython().run_cell_magic("spark", "-o df", "df=spark_magic(source, target, hour_time, minute_time, success_rate)")
    df1 = pd.DataFrame(df)
    end.value = "Here it is! (Console)"
    
    display(HTML(df1.to_html()))
    
    dep_hour = str(df1["departure_time"][0].hour)
    dep_min = str(df1["departure_time"][0].minute)

    arr_hour = str(df1["arrival_time"].iloc[-1].hour)
    arr_min = str(df1["arrival_time"].iloc[-1].minute)
    info.value = "You should leave at "+dep_hour+":"+dep_min+" to arrive at "+arr_hour +":"+arr_min+" with " +success_rate+ " probability of success."
    
btn.on_click(print_result)

Label(value='Source station')

Text(value='', placeholder='Source station ID')

Label(value='Target station')

Text(value='', placeholder='Target station ID')

Label(value='Desired arrival time (Hour between 9 and 17)')

IntText(value=12, description='Hour:')

IntText(value=30, description='Minute:')

Label(value='Choose your confidence level')

IntSlider(value=0)

Button(description='Find your route!', style=ButtonStyle())

Label(value='')

Label(value='')

Label(value='')

# Map visualization

Run the following cells if you want to see the trip on a map!

We need to load the data from spark so that we can plot using bokeh

In [55]:
%%spark -o stops

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [56]:
#We need to recompute it since we had it inside the user interface only.
shortest_path = spark_magic(source, target, hour_time, minute_time, success_rate)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [57]:
%%spark -o shortest_path

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [58]:
%%local
from pyproj import Transformer
transformer = Transformer.from_proj("EPSG:4326", "EPSG:3857", always_xy=True)

## Map building

In [59]:
%%local
from bokeh.io import push_notebook, show, output_notebook, curdoc
from bokeh.plotting import figure
from bokeh.layouts import layout
from bokeh.models import ColumnDataSource, Range1d
from bokeh.tile_providers import get_provider, OSM
from bokeh.models import Arrow, VeeHead
import time

output_notebook()

In [60]:
%%local
lat_zurich = 47.378177
long_zurich = 8.540192
x = []
y = []
x_end = []
y_end = []
station_name = []
source_bokeh = ColumnDataSource(data = dict(x=x, y=y, x_end=x_end, y_end=y_end, station_name=station_name))

TOOLTIPS = [
    ("(x,y)", "($x, $y)"), 
    ("Station name", "@station_name")
]

# create the map
tile_provider = get_provider(OSM)
p = figure(x_axis_type="mercator", y_axis_type="mercator", tooltips=TOOLTIPS)
p.add_tile(tile_provider)

#Add a circle for each station and a segment between pairs of stations
p.circle('x', 'y', source=source_bokeh, size=10, line_color="navy", fill_color="orange", alpha=0.8)
p.segment(x0='x', y0='y', x1='x_end', y1='y_end',source=source_bokeh,
          color="blue", line_width=4)


# make the plot centered at Amsterdam
x_min, y_min = transformer.transform(long_zurich - 0.065, lat_zurich - 0.065)
x_max, y_max = transformer.transform(long_zurich + 0.065, lat_zurich + 0.065)
p.x_range = Range1d(x_min, x_max)
p.y_range = Range1d(y_min, y_max)

t=show(p, notebook_handle=True)

In [61]:
%%local
#shortest_path_1 = shortest_path.toPandas()
for i, row in shortest_path.iterrows():
    old_x = stops[stops['stop_id'] == str(row['from'])]['stop_lon'].values[0]
    old_y = stops[stops['stop_id'] == str(row['from'])]['stop_lat'].values[0]
    old_x_end = stops[stops['stop_id'] == str(row['to'])]['stop_lon'].values[0]
    old_y_end = stops[stops['stop_id'] == str(row['to'])]['stop_lat'].values[0]
    new_x, new_y = transformer.transform(old_x, old_y)
    new_x_end, new_y_end = transformer.transform(old_x_end, old_y_end)
    x.append(new_x)
    y.append(new_y)
    x_end.append(new_x_end)
    y_end.append(new_y_end)
    station_name.append(row["from"])


old_last_x = stops[stops['stop_id'] == str(shortest_path.iloc[-1]['to'])]['stop_lon'].values[0]
old_last_y = stops[stops['stop_id'] == str(shortest_path.iloc[-1]['to'])]['stop_lat'].values[0]
new_last_x, new_last_y = transformer.transform(old_last_x, old_last_y)
x.append(new_last_x)
y.append(new_last_y)
x_end.append(new_last_x)
y_end.append(new_last_y)
station_name.append(shortest_path.iloc[-1]["to"])

source_bokeh.data = dict(x=x, y=y,x_end=x_end, y_end=y_end, station_name=station_name)
push_notebook(handle=t)

# Validation

We decided to use the SBB online journey planner to check our results as they probably use the same data as us.

## SBB online journey planner for trip 1

![title](trip1_im.png)

### Sbb planning

In [76]:
%%local
from IPython.core.display import display, HTML
display(HTML('<img src="trip1_ls.png" alt="Drawing" style="width: 1000px;"/>'))

## Our model for trip 1 (Robust)

![title](trip1_our.jpeg)

### Our robust planning

In [78]:
%%local
from IPython.core.display import display, HTML
display(HTML('<img src="trip1_us.png" alt="Drawing" style="width: 1000px;"/>'))

We can see that for the first trip, our model shows the same stops as the SBB planner. We have the exact same times also

## SBB online journey planner for trip 2

![title](trip2_im.png)

## Our model for trip 2

![title](trip2_our.jpeg)

We can see a slight difference between our model and the SBB planner. Indeed our model use a different path at the start. 

![title](trip2_ls.png)

![title](trip2_ls_our.png)

But if we look at the departure time we leave at the same time. 