# FINAL PROJECT

## Useful imports

In [1]:
import math
import numpy as np
import pandas as pd
import time
from bokeh.io import push_notebook, show, output_notebook
from bokeh.plotting import figure, show, output_file
from bokeh.tile_providers import CARTODBPOSITRON
from bokeh.models import ColumnDataSource, HoverTool
from collections import defaultdict
from IPython.display import clear_output
from geopy.distance import geodesic
from datetime import datetime, timedelta

## Initialize the `SparkSession`

In [2]:
import getpass
import pyspark
import pyspark.sql.functions as functions
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import struct, udf
from pyspark.sql.types import *

conf = pyspark.conf.SparkConf()
conf.setMaster('yarn')
conf.setAppName('final_project-{0}'.format(getpass.getuser()))
conf.set('spark.executor.memory', '2g')
conf.set('spark.executor.instances', '10')
conf.set('spark.executor.cores', '8')
conf.set('spark.port.maxRetries', '100')
sc = pyspark.SparkContext.getOrCreate(conf)
conf = sc.getConf()
spark = SparkSession(sc)
sc

## Import the SBB data:

Note: we only consider the stops starting from December 10th 2017 (new schedule)

In [3]:
# 2017 Stops
df_12_17 = spark.read.format("csv").option("header", "true").load("/datasets/project/istdaten/2017/12", delimiter = ';')
df_12_17 = df_12_17.filter(df_12_17.BETRIEBSTAG >= "11.12.2017")
df_12_17 = df_12_17.filter(df_12_17.FAELLT_AUS_TF == "false").filter(df_12_17.DURCHFAHRT_TF == "false").filter(df_12_17.FAELLT_AUS_TF == "false")
df_01_18 = spark.read.format("csv").option("header", "true").load("/datasets/project/istdaten/2018/01", delimiter = ';')
df_01_18 = df_01_18.filter(df_01_18.FAELLT_AUS_TF == "false").filter(df_01_18.DURCHFAHRT_TF == "false").filter(df_01_18.FAELLT_AUS_TF == "false")
df_02_18 = spark.read.format("csv").option("header", "true").load("/datasets/project/istdaten/2018/02", delimiter = ';')
df_02_18 = df_02_18.filter(df_02_18.FAELLT_AUS_TF == "false").filter(df_02_18.DURCHFAHRT_TF == "false").filter(df_02_18.FAELLT_AUS_TF == "false")
df_03_18 = spark.read.format("csv").option("header", "true").load("/datasets/project/istdaten/2018/03", delimiter = ';')
df_03_18 = df_03_18.filter(df_03_18.FAELLT_AUS_TF == "false").filter(df_03_18.DURCHFAHRT_TF == "false").filter(df_03_18.FAELLT_AUS_TF == "false")
df_04_18 = spark.read.format("csv").option("header", "true").load("/datasets/project/istdaten/2018/04", delimiter = ';')
df_04_18 = df_04_18.filter(df_04_18.FAELLT_AUS_TF == "false").filter(df_04_18.DURCHFAHRT_TF == "false").filter(df_04_18.FAELLT_AUS_TF == "false")

# Example for the December dataframe:
df_12_17.show(3)

+-----------+-------------------+------------+-------------+--------------+----------+---------+-----------+---------+-------------------+--------------+-------------+-------+-----------------+----------------+-------------------+------------------+----------------+-------------------+------------------+-------------+
|BETRIEBSTAG|   FAHRT_BEZEICHNER|BETREIBER_ID|BETREIBER_ABK|BETREIBER_NAME|PRODUKT_ID|LINIEN_ID|LINIEN_TEXT|UMLAUF_ID|VERKEHRSMITTEL_TEXT|ZUSATZFAHRT_TF|FAELLT_AUS_TF|  BPUIC|HALTESTELLEN_NAME|    ANKUNFTSZEIT|        AN_PROGNOSE|AN_PROGNOSE_STATUS|    ABFAHRTSZEIT|        AB_PROGNOSE|AB_PROGNOSE_STATUS|DURCHFAHRT_TF|
+-----------+-------------------+------------+-------------+--------------+----------+---------+-----------+---------+-------------------+--------------+-------------+-------+-----------------+----------------+-------------------+------------------+----------------+-------------------+------------------+-------------+
| 11.12.2017|80:06____:17010:000|   80:0

We only keep the columns that we think might be useful for the rest of the project:

In [4]:
useful_columns =  ["BETRIEBSTAG", "FAHRT_BEZEICHNER", "LINIEN_ID", "BPUIC", "HALTESTELLEN_NAME", "ANKUNFTSZEIT", "AN_PROGNOSE", "ABFAHRTSZEIT", "AB_PROGNOSE"]
df_12_17 = df_12_17.select(useful_columns)
df_01_18 = df_01_18.select(useful_columns)
df_02_18 = df_02_18.select(useful_columns)
df_03_18 = df_03_18.select(useful_columns)
df_04_18 = df_04_18.select(useful_columns)

all_df = [df_12_17, df_01_18, df_02_18, df_03_18,df_04_18]

Import the metadata:

In [5]:
df_metadata = sc.textFile("/datasets/project/metadata")
df_metadata.take(5)

['0000002  26.074412  44.446770 0      % Bucuresti',
 '0000003   1.811446  50.901549 0      % Calais',
 '0000004   1.075329  51.284212 0      % Canterbury',
 '0000005  -3.543547  50.729172 0      % Exeter',
 '0000007   9.733756  46.922368 744    % Fideris, Bahnhof']

The data in df_metadata is in bad shape, we will process it to keep the useful information such as the id, longitudes, latitudes, name of station and put it in a pandas dataframe. The ids will be useful to match the stations between metadata and the train stops dataframe.

In [6]:
def get_lat_long_station(str_array):
    """
    Retrieves the id, longitude, latitude and name of a station
    from the metadata.
    """
    if len(str_array) == 6:
        station = str_array[-1]
    else:
        station = ""
        for i, word in enumerate(str_array[5:]):
            if i == 0:
                station += word
            else:
                station += " " + word
            
    return [str_array[0], str_array[1], str_array[2], station]

cleaned_metadata = df_metadata.map(lambda x: get_lat_long_station(x.split()))

df_metadata = cleaned_metadata.toDF()
df_metadata = df_metadata.toPandas()
df_metadata.columns = ['id', 'longitude', 'latitude', 'stop_name']
df_metadata[['longitude','latitude']] = df_metadata[['longitude','latitude']].apply(pd.to_numeric)
# Let's take a look !
df_metadata.head()

Unnamed: 0,id,longitude,latitude,stop_name
0,2,26.074412,44.44677,Bucuresti
1,3,1.811446,50.901549,Calais
2,4,1.075329,51.284212,Canterbury
3,5,-3.543547,50.729172,Exeter
4,7,9.733756,46.922368,"Fideris, Bahnhof"


We want to process df_metadata more by only keeping the stations / places which are nearby (in a 10km radius) from our starting point: Zürich HB (central station):

In [7]:
START_STOP = "Zürich HB"
zurichHB_row = df_metadata[df_metadata["stop_name"] == START_STOP]
zurichHB_long = float(zurichHB_row["longitude"])
zurichHB_lat = float(zurichHB_row["latitude"])
zurichHB_row.head()

Unnamed: 0,id,longitude,latitude,stop_name
2379,8503000,8.540192,47.378177,Zürich HB


In [8]:
# Compute a new column with distance to Zurich HB in kms and then filter stops within a 12km range. We use this as a basis for next steps

ZURICH_HB_GPS = (zurichHB_lat, zurichHB_long)

def distance_from_to(from_lat, from_long, to_lat, to_long):
    """Given a metadata datafraame, 2 names of stations / places in the dataframe, compute the distance in km between these 2 places using geopy"""
    distance = geodesic((from_lat, from_long), (to_lat, to_long)).km
    return distance


df_metadata['distance_to_ZurichHB'] = df_metadata.apply(lambda row : distance_from_to(ZURICH_HB_GPS[0], ZURICH_HB_GPS[1], row.latitude, row.longitude), axis=1)
# Keep all stations that are in a 10km radius of Zurich HB 
final_df_metadata = df_metadata[df_metadata['distance_to_ZurichHB'] <= 10]
final_df_metadata.head()

Unnamed: 0,id,longitude,latitude,stop_name,distance_to_ZurichHB
74,176,8.521961,47.351679,Zimmerberg-Basistunnel,3.251969
2084,8502220,8.434713,47.390882,Urdorf,8.088858
2085,8502221,8.437543,47.357432,Birmensdorf ZH,8.089105
2086,8502222,8.468175,47.325896,Bonstetten-Wettswil,7.961914
2092,8502229,8.43033,47.380971,Urdorf Weihermatt,8.302117


From now on, we will work with this metadata dataframe for the rest of the project (contains all the stations in a 10km radius of Zurich HB). From the train-stops dataframes, we filter out the stops (rows) that are out of our range by matching the BPUIC in the train-stops dataframe to the station ids (id) in the final metadata dataframe.

In [9]:
# Keep only the stations in the 10 km range of Zurich HB
list_stations_id = list(final_df_metadata["id"])
df_12_17_zurich = df_12_17.filter(df_12_17.BPUIC.isin(list_stations_id))
df_01_18_zurich = df_01_18.filter(df_01_18.BPUIC.isin(list_stations_id))
df_02_18_zurich = df_02_18.filter(df_02_18.BPUIC.isin(list_stations_id))
df_03_18_zurich = df_03_18.filter(df_03_18.BPUIC.isin(list_stations_id))
df_04_18_zurich = df_04_18.filter(df_04_18.BPUIC.isin(list_stations_id))

# Get the new total count:
total_count_zurich = df_12_17_zurich.count() + df_01_18_zurich.count() + df_02_18_zurich.count() + df_03_18_zurich.count() + df_04_18_zurich.count()
print("Total count of rows (near Zurich): {}".format(total_count_zurich))

# Get the number of unique stations:
all_df = [df_12_17_zurich, df_01_18_zurich, df_02_18_zurich, df_03_18_zurich, df_04_18_zurich]
stations = all_df[0].select("HALTESTELLEN_NAME").distinct().rdd.map(lambda halt: halt[0])
for df in all_df[1:]:
    stations = stations.union(df.select("HALTESTELLEN_NAME").distinct().rdd.map(lambda halt: halt[0]))
stations_list = stations.collect()

distinct_stations = set(stations_list)
print("Total number of stations near Zurich: {}".format(len(distinct_stations)))

Total count of rows (near Zurich): 34326261
Total number of stations near Zurich: 997


## Modelization of the transport infrastructure

We will recreate the paths of all possible trains by looking at the trip_ids and all the stops they do on a same trip (same day). In order to be efficient, we considered that all the possible trips were computed within one week. We decided to choose the week from monday 21th January 2018 to sunday 27th January 2018 (this should be a typical week). However we only had around 47000 number of distinc trips during this week whereas there is a total number of 107000 over the 5 months. Thus we considered the full dataset to compute the trips

In [10]:
# We consider the week from 21.01.18 (monday) to 27.01.18
week = ["21.01.2018", "22.01.2018", "23.01.2018", "24.01.2018", "25.01.2018", "26.01.2018", "27.01.2018"]
df_week = df_01_18_zurich.filter(df_01_18_zurich.BETRIEBSTAG.isin(week))
all_trip_ids = df_week.select("FAHRT_BEZEICHNER").distinct().rdd.map(lambda halt: halt[0])
print("Total number of distinct trips in the chosen week: {}".format(all_trip_ids.count()))

# Every possible schedule
schedule_df = all_df[0].union(all_df[1]).union(all_df[2]).union(all_df[3]).union(all_df[4])

def remove_date(x):
    if x:
        return x.split()[1]
    else:
        return None
remove_date_udf = F.udf(remove_date)

# Typical week
df_week = schedule_df.filter(schedule_df.BETRIEBSTAG.isin(week))
df_week = df_week.withColumn("ANKUNFTSZEIT",remove_date_udf(df_week.ANKUNFTSZEIT))
df_week = df_week.withColumn("ABFAHRTSZEIT",remove_date_udf("ABFAHRTSZEIT"))
df_week = df_week.withColumn("tupled_data",struct(df_week.HALTESTELLEN_NAME,df_week.ANKUNFTSZEIT,df_week.ABFAHRTSZEIT))
df_week.show(10)

Total number of distinct trips in the chosen week: 47207
+-----------+----------------+---------+-------+-----------------+------------+-------------------+------------+-------------------+--------------------+
|BETRIEBSTAG|FAHRT_BEZEICHNER|LINIEN_ID|  BPUIC|HALTESTELLEN_NAME|ANKUNFTSZEIT|        AN_PROGNOSE|ABFAHRTSZEIT|        AB_PROGNOSE|         tupled_data|
+-----------+----------------+---------+-------+-----------------+------------+-------------------+------------+-------------------+--------------------+
| 21.01.2018|    85:11:10:002|       10|8503000|        Zürich HB|       21:51|21.01.2018 22:03:22|        null|               null|[Zürich HB,21:51,...|
| 21.01.2018| 85:11:10578:002|    10578|8503000|        Zürich HB|       17:19|21.01.2018 17:19:06|        null|               null|[Zürich HB,17:19,...|
| 21.01.2018| 85:11:10580:001|    10580|8503000|        Zürich HB|       18:19|21.01.2018 18:23:57|        null|               null|[Zürich HB,18:19,...|
| 21.01.2018| 85:11

We will rebuild the stations path of all the possible trips during the week considered. We do so by grouping the dataframe by trip_id and keeping one typical trip ! 

In [11]:
def set_(x):
    """
    Utility set casting for lists
    in order to apply it to a
    pyspark dataframe (transforming
    it to an udf function)
    """
    tmp = []
    for i in x:
        if i not in tmp:
            tmp.append(i)
    return tmp

df_grouped_trips = df_week.select(["BETRIEBSTAG", "FAHRT_BEZEICHNER", "tupled_data"]) \
                              .groupBy(["BETRIEBSTAG", "FAHRT_BEZEICHNER"]) \
                              .agg(F.collect_list("tupled_data") \
                              .alias("stations")) \
                              .groupBy("FAHRT_BEZEICHNER") \
                              .agg(F.collect_list("stations").alias("stations"))
df_grouped_trips.show(10, False)

setudf = F.udf(set_, ArrayType(ArrayType(StructType([
                    StructField("HALTESTELLEN_NAME", StringType(), True),
                    StructField("ANKUNFTSZEIT", StringType(), True),
                    StructField("ABFAHRTSZEIT", StringType(), True)
                    ]))))
trips = df_grouped_trips.withColumn("stations", setudf(df_grouped_trips.stations)).toPandas()

def tuple_it_up(x):
    """
    Utility function transforming into 
    list of tuples a list of lists
    """
    total_list = []
    for i in x:
        total_list.append([ tuple(y) for y in i ])
    return total_list

def count_none(list_of_tuples):
    """
    Returns the number of Nones in a list of tuples
    """
    return len([ x for i in list_of_tuples for x in i if not x])

def keep_only_longest(listed_stations):
    """
    Out of a list of stations, keep the longest sublist
    """
    if len(listed_stations) > 1:
        # Checks if the two intern lists are of same size
        if len(set([ len(i) for i in listed_stations])) > 1:
            return sorted(listed_stations, key=len, reverse=True)[0]
        else:
            nones = [ count_none(l_tup) for l_tup in listed_stations ]
            return listed_stations[np.argmin(nones)]
    else:
        return listed_stations[0]
    
trips["stations"] = trips.stations.apply(tuple_it_up).apply(keep_only_longest)
trips = trips[trips["stations"].apply(len) > 1].rename(columns={"FAHRT_BEZEICHNER":"trip_id"})

# Take a look at the trips !
trips.head()

+----------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

Unnamed: 0,trip_id,stations
0,85:11:13752:001,"[(Zürich Stadelhofen, 01:52, 01:52), (Zürich H..."
1,85:11:18388:001,"[(Dietlikon, 23:13, 23:14), (Stettbach, 23:17,..."
2,85:11:18718:001,"[(Zürich Tiefenbrunnen, None, 05:41), (Zürich ..."
3,85:11:18982:002,"[(Schwerzenbach ZH, 21:17, 21:17), (Dübendorf,..."
4,85:11:19439:001,"[(Bonstetten-Wettswil, 10:18, 10:18), (Birmens..."


Let us now select the possible edges for a graph out of all of the possible trips, an edge is inbetween two stations with a trip_id, the names of both sations, the time of arrival/departure and the delta of time in between both of them.

In [12]:
edges = []

for index, data in trips.iterrows():
    print("Loading edges: {:.2f}%".format(index/trips.iloc[-1].name*100), end="\r")
    t_id, stations = data
    for i in range(len(stations) - 1):
        dep_status = stations[i]
        arr_status = stations[i + 1]
        dep = dep_status[0]
        if dep_status[-1] is not None:
            departure_time = datetime.strptime(dep_status[-1], "%H:%M")
        else:
            departure_time = None
        arr = arr_status[0]
        if arr_status[1] is not None:
            arrival_time = datetime.strptime(arr_status[1], "%H:%M")
        else:
            arrival_time = None
        if dep_status[-1] is not None and arr_status[1] is not None:
            delta = int((arrival_time - departure_time).seconds/60)
        else:
            delta = None
        edges.append({"trip_id":t_id,
                      "departure":dep,
                      "departure_time":departure_time,
                      "arrival":arr,
                      "arrival_time":arrival_time,
                      "time":delta})

# Drop it into a pandas dataframe
edges_df = pd.DataFrame(edges, columns=["trip_id", "departure", "departure_time",
                      "arrival", "arrival_time", "time"])
edges_df = edges_df.dropna()
edges_df = edges_df[edges_df.departure != edges_df.arrival]
edges_df.head()

Loading edges: 100.00%

Unnamed: 0,trip_id,departure,departure_time,arrival,arrival_time,time
0,85:11:13752:001,Zürich Stadelhofen,1900-01-01 01:52:00,Zürich HB,1900-01-01 01:55:00,3.0
1,85:11:13752:001,Zürich HB,1900-01-01 01:57:00,Zürich Hardbrücke,1900-01-01 01:59:00,2.0
2,85:11:13752:001,Zürich Hardbrücke,1900-01-01 01:59:00,Zürich Altstetten,1900-01-01 02:01:00,2.0
3,85:11:13752:001,Zürich Altstetten,1900-01-01 02:01:00,Urdorf,1900-01-01 02:06:00,5.0
4,85:11:13752:001,Urdorf,1900-01-01 02:06:00,Urdorf Weihermatt,1900-01-01 02:07:00,1.0


At each station we have to consider the possibility that the user can possibly walk to reach a new connection if the 2 stops are not too far from each other (500 meters for instance). Thus, for each station, we have to find the others stations in a radius of this distance and build an edge between the 2 station nodes in our graph:

In [13]:
METERS_BTW_STATIONS = 500
#walking_connections is a dict of the form: 
# {stop: [(close_stop1, dist), (close_stop2, dist),...]}
walking_connections = {}

def time_to_walk(dist):
    return dist / 0.06 

for i, row in final_df_metadata.iterrows():
    station = row["stop_name"]
    from_lat = row["latitude"]
    from_long = row["longitude"]
    station_dist = []
    for j, row2 in final_df_metadata.iterrows():
        station2 = row2["stop_name"]
        to_lat = row2["latitude"]
        to_long = row2["longitude"]
        if station != station2:
            km_dist = distance_from_to(from_lat, from_long, to_lat, to_long)
            if km_dist * 1000 < METERS_BTW_STATIONS:
                # Append the station, distance in km and the time to walk to this destination 
                station_dist.append((station2, km_dist, np.ceil(time_to_walk(km_dist))))
    walking_connections[station] = station_dist 

Construct the dataframe of possible walking edges (same columns format as edges dataframe):

In [14]:
array_walking = []
unique_stations = set(edges_df.arrival).union(set(edges_df.departure))

for station in  unique_stations:
    if station in walking_connections.keys():
        connections = walking_connections[station]
        for connect in connections:
            # connect of the form (name, dist to stations, time to walk)
            station2 = connect[0]
            if station2 in unique_stations:
                time = connect[2]
                row = ["walk", station, "null", station2, "null", int(np.ceil(time))]
                array_walking.append(row)       
        
walking_df = pd.DataFrame(data = array_walking, columns = ["trip_id", "departure", "departure_time", "arrival", "arrival_time", "time"])

## Routing Algorithm

In this part we focus on building a routing algorithm that will try to find the best path to go from one point A to another B. We first focused on building a graph using Breadth First Search to find the weight (time) to put on the edges (route) separating the different nodes (stations) of the graph without considering the delays:

In [15]:
def add_time(current, hours=0, minutes=0):
    """
    Given a time as a String like so: "12:03",
    returns a new time, also as a String by
    adding hours or minutes to the given time.
    """
    new_time = datetime.strptime(current, "%H:%M") + timedelta(hours=hours, minutes=minutes)
    return new_time.strftime('%H:%M')

def get_neighs(node, time, t_id=None):
    """
    For a given node, and a time, returns
    the directly reachable nodes from the given one.
    """
    now = considering[considering.departure == node] \
                    .between_time(time, add_time(time, minutes=waiting_to_go_time_minutes))
    
    can_walk = walking_df[walking_df.departure == node]
    for i, row in can_walk.iterrows():
        # Compute the time in the dataframe from the time column
        walk_time = row["time"]
        can_walk.set_value(i,'departure_time', datetime.strptime(time, "%H:%M"))
        can_walk.set_value(i,'arrival_time', datetime.strptime(add_time(time, minutes = walk_time), "%H:%M"))
    
    can_walk.set_index("departure_time", inplace=True)
        
    if t_id is not None:
        # In case of change of trip, add 2 minutes to do the change
        t_id_time = add_time(time, minutes=2)
        diff_t_id = now[now.trip_id != t_id].between_time(t_id_time, add_time(t_id_time, minutes=waiting_to_go_time_minutes))
        now = pd.concat([now[now.trip_id == t_id], diff_t_id, can_walk])
             
    neighbors = now.arrival.unique()
    result = []
    for nei in neighbors:
        result.append(now[now.arrival == nei].sort_values("arrival_time").iloc[0])
    return sorted(result, key=lambda x: x.arrival_time)

def bfs(starting_station, query_time):
    """
    Simple implementation of a breadth first
    search to discover the network.
    """
    to_visit = [(starting_station, query_time, None)]
    edges = []
    closed_set = set()

    while to_visit:
        node, query_time, t_id = to_visit.pop(0)
        closed_set.update({node})
        neighbors = get_neighs(node, query_time, t_id)
        clear_output(wait=True)
        for nei in neighbors:
            if nei.arrival not in closed_set:
                edges.append(nei)
                to_visit.append((nei.arrival, nei.arrival_time.strftime("%H:%M"), nei.trip_id))

    return edges

In [16]:
class Graph:
    """
    A simple implementation of a graph,
    with specific variables tying into
    our own usecase.
    """
    
    def __init__(self):
        """
        Builds the graph
        """
        self.nodes = set()
        self.edges = defaultdict(list)
        self.distances = {} #(from_node, to_node) -> time to reach
        self.trips_dict = {} #(from_node, to_node) -> trip_id
        self.schedule = {} #(from_node, to_node) -> (departure time, arrival time)
        self.proba_connections = {} # node -> {(arriving_trip_id, departing_trip_id) -> proba to get the connection}

    def add_node(self, value):
        """
        Adds a node to the graph
        """
        self.nodes.add(value)

    def add_edge(self, from_node, to_node, departure_time, arrival_time, distance, trip_id, trip_delays_df = None):
        """
        Adds an edge to the graph, using the given information
        """
        self.edges[from_node].append(to_node)
        self.edges[to_node].append(from_node)
        self.distances[(from_node, to_node)] = distance
        self.schedule[(from_node, to_node)] = (departure_time, arrival_time)
        if trip_delays_df is not None:
            # In case we use the delay probabilities given by trip_delays_df
            median = float(trip_delays_df[trip_delays_df["trip_id"] == trip_id].median_arrival)
            lambda_arrival = 0.0
            if median != 0.0:
                lambda_arrival = 1 / median
            self.trips_dict[(from_node, to_node)] = (trip_id, lambda_arrival)
        else:
            self.trips_dict[(from_node, to_node)] = trip_id
            
    def build_proba(self, node):
        """
        Given a node, fills the proba_connections dict of
        the graph with the connections and probability of
        connections to the node.
        """
        inner_edges = [dep for dep, arrival in self.distances.keys() if arrival == node]
        outer_edges = [arrival for dep, arrival in self.distances.keys() if dep == node]
        
        connections = {} # it is a dictionary of tuple of possible connections happening at the node with the proba of catching the connection, format: (trip arriving, trip departing) -> proba to get the connections
        for from_node in inner_edges:
            trip_id_arriving, lambda_arrival = self.trips_dict[(from_node, node)]
            _, arriving_time = self.schedule[(from_node, node)]
            for to_node in outer_edges:
                trip_id_departing, _ = self.trips_dict[(node, to_node)]
                departure_time, _ = self.schedule[(node, to_node)] 
                
                # Compute time to do the connections 
                time_to_change = (departure_time - arriving_time).seconds / 60
                
                # Compute the probability to get the connection
                if trip_id_arriving == trip_id_departing or trip_id_arriving == "walk" or trip_id_departing == "walk" or lambda_arrival == 0.0:
                    # You stay in the train and or you walked to the next station or the arrival train has no delays -> You are sure to get the connection
                    p = 1.0 
                elif add_time(arriving_time.strftime('%H:%M'), minutes = 2) < departure_time.strftime("%H:%M"):
                    # You don't have time to change of train -> proba is 0
                    p = 0.0
                else:
                    p = proba_get_connection(lambda_arrival, time_to_change)
                connections[(trip_id_arriving, trip_id_departing)] = p
        self.proba_connections[node] = connections

                
                
def dijsktra(graph, initial):
    """
    Simple implementation of Dijkstra's algorithm.
    """
    visited = {initial: 0}
    path = {}

    nodes = set(graph.nodes)

    while nodes: 
        min_node = None
        for node in nodes:
            if node in visited:
                if min_node is None:
                    min_node = node
                elif visited[node] < visited[min_node]:
                    min_node = node

        if min_node is None:
            break

        nodes.remove(min_node)
        current_weight = visited[min_node]

        for edge in graph.edges[min_node]:
            try:
                weight = current_weight + graph.distances[(min_node, edge)]
            except:
                import math
                weight = current_weight + math.inf
                
            if (edge not in visited or weight < visited[edge]) and (edge, min_node) in graph.distances.keys():
                visited[edge] = weight
                path[edge] = min_node

    return visited, path

## Predictive model with delays 

We will use the same routing algorithm by including the probabilities of having the connection after computing on the graph that obtained the BFS. We keep the same edges

Compute difference between scheduled and actual arrival / departure for all rows in the DF:

In [17]:
def clean_date(date):
    """
    Cleans up some of the dates which are in 
    the format "13.09.2017 09:43", and 
    we want to convert it to "13.09.2017 09:43:00"
    
    Returns the new date
    """
    result = date
    split_date = date.split(':')
    # If there is no seconds
    if len(split_date) == 2:
        result += ':00'
    return result

def compute_time_difference(actual, scheduled):
    """
    Computes the difference in minutes between
    the actual and scheduled time
    
    Returns the difference if there is one to compute
    """
    # FMT is the format of the timestamp
    FMT = "%d.%m.%Y %H:%M:%S"
    if actual != "null" and scheduled != "null" and (isinstance(actual, str) and isinstance(scheduled, str)):
        # Make sure inputs have correct format: as FMT
        cleaned_actual = clean_date(actual)
        cleaned_scheduled = clean_date(scheduled)
        
        time_actual = datetime.strptime(cleaned_actual, FMT)
        time_scheduled = datetime.strptime(cleaned_scheduled, FMT)
        
        # Map the time to the next minute if the seconds are different from 0
        if time_scheduled.second != 0:
            time_scheduled = time_schedule + timedelta(seconds = 60 - time_scheduled.second)
        if time_actual.second != 0:
            time_actual = time_actual + timedelta(seconds = 60 - time_actual.second)
        
        
        if time_actual > time_scheduled:
            # In case of delay
            delta = time_actual - time_scheduled
            return int(delta.seconds / 60)
        else:
            # In case actual happened before scheduled time
            delta = time_scheduled - time_actual
            return - int(delta.seconds / 60)
    else: 
        return None
    
compute_delay = udf(lambda row: compute_time_difference(row[0], row[1]), IntegerType())

df_delays = schedule_df.withColumn("arrival_delay", compute_delay(struct([schedule_df['AN_PROGNOSE'], schedule_df['ANKUNFTSZEIT']])))
df_delays = df_delays.withColumn("departure_delay", compute_delay(struct([schedule_df['AB_PROGNOSE'], schedule_df['ABFAHRTSZEIT']])))
df_delays = df_delays.filter(df_delays["departure_delay"] >= -5)
df_delays = df_delays.filter(df_delays["arrival_delay"] >= -5)
df_delays.show(10)

+-----------+----------------+---------+-------+-----------------+----------------+-------------------+----------------+-------------------+-------------+---------------+
|BETRIEBSTAG|FAHRT_BEZEICHNER|LINIEN_ID|  BPUIC|HALTESTELLEN_NAME|    ANKUNFTSZEIT|        AN_PROGNOSE|    ABFAHRTSZEIT|        AB_PROGNOSE|arrival_delay|departure_delay|
+-----------+----------------+---------+-------+-----------------+----------------+-------------------+----------------+-------------------+-------------+---------------+
| 11.12.2017|  85:11:1252:001|     1252|8503000|        Zürich HB|11.12.2017 21:23|11.12.2017 21:23:09|11.12.2017 21:36|11.12.2017 21:37:05|            1|              2|
| 11.12.2017|  85:11:1255:001|     1255|8503000|        Zürich HB|11.12.2017 08:26|11.12.2017 08:26:46|11.12.2017 08:37|11.12.2017 08:38:59|            1|              2|
| 11.12.2017|  85:11:1507:002|     1507|8503000|        Zürich HB|11.12.2017 06:30|11.12.2017 06:30:49|11.12.2017 06:39|11.12.2017 06:39:27|     

Since we want to compare the delay based on different criteria (day of weekend or not, rush hour...), we need to add a new column to the dataframe for each criteria we want to check. We'll begin by adding to the df_delays dataframe one "weekend" column with each element set to True if it occurs during the weekend and False otherwise (based on "BETRIEBSTAG"):

In [18]:
is_weekend = udf(lambda row: True if datetime.strptime(row, "%d.%m.%Y").weekday() in [5,6] else False)
df_delays = df_delays.withColumn("weekend", is_weekend(df_delays["BETRIEBSTAG"]))
df_delays.show()

+-----------+----------------+---------+-------+-----------------+----------------+-------------------+----------------+-------------------+-------------+---------------+-------+
|BETRIEBSTAG|FAHRT_BEZEICHNER|LINIEN_ID|  BPUIC|HALTESTELLEN_NAME|    ANKUNFTSZEIT|        AN_PROGNOSE|    ABFAHRTSZEIT|        AB_PROGNOSE|arrival_delay|departure_delay|weekend|
+-----------+----------------+---------+-------+-----------------+----------------+-------------------+----------------+-------------------+-------------+---------------+-------+
| 11.12.2017|  85:11:1252:001|     1252|8503000|        Zürich HB|11.12.2017 21:23|11.12.2017 21:23:09|11.12.2017 21:36|11.12.2017 21:37:05|            1|              2|  false|
| 11.12.2017|  85:11:1255:001|     1255|8503000|        Zürich HB|11.12.2017 08:26|11.12.2017 08:26:46|11.12.2017 08:37|11.12.2017 08:38:59|            1|              2|  false|
| 11.12.2017|  85:11:1507:002|     1507|8503000|        Zürich HB|11.12.2017 06:30|11.12.2017 06:30:49|11

We will now compute the probability to miss a connection when the user change of train based on the delay of the train he takes to arrive. To do so, we use an exponential distribution with parameter $\lambda = \frac{1}{median \ of \ delay}$ for each trip id. If our user has $t$ minutes from the moment his train $T1$ arrives at the station (scheduled) and the moment the other train $T2$ leaves, the probability that the user gets the $T2$ is:

$$P(delay_{\ T1} \leq t) = 1 - e^{- \lambda_{T1} * t}$$

In [19]:
def compute_median_delay(list_delays):
    """
    Given a list of delays (in minutes) and 
    the certitude percentage of the user 
    (between 0 and 100), this function 
    computes the delay corresponding 
    to the certitude ratio
    """
    if isinstance(list_delays, (list,)):
        if len(list_delays) == 0:
            # No information on the delays -> assume there is none: 0 minute
            return 0.0
        else:
            delay = np.median(list_delays)
            return float(delay)
    elif isinstance(list_delays, (int,)):
        return float(list_delays)
    else:
        return 0.0
    
def proba_get_connection(lambda_, time_to_change):
    """
    Given a lambda corresponding to the parameter
    of an exponential function, and a time to change,
    returns the probability that the time changes
    """
    if lambda_ < 0:
        return 1
    else:
        return 1 - np.exp(- lambda_ * time_to_change)

compute_median = udf(lambda row: compute_median_delay(row))

# Group by trip id and weekend variables in order
# to get the list of delays for each line during
# the weekend or during the weekdays
dep_arr_delays = df_delays.groupBy("FAHRT_BEZEICHNER") \
                            .agg(F.collect_list("departure_delay").alias("list_departures"),
                                 F.collect_list("arrival_delay").alias("list_arrivals"))
dep_arr_delays = dep_arr_delays.withColumn("median_departure", compute_median(dep_arr_delays["list_departures"])) \
                                   .withColumn("median_arrival", compute_median(dep_arr_delays["list_arrivals"]))

dep_arr_delays.show(10)

+----------------+--------------------+--------------------+----------------+--------------+
|FAHRT_BEZEICHNER|     list_departures|       list_arrivals|median_departure|median_arrival|
+----------------+--------------------+--------------------+----------------+--------------+
| 85:11:13752:001|[1, 1, 2, 3, 3, 4...|[0, 0, 1, 3, 2, 3...|             3.0|           2.0|
| 85:11:18388:001|[1, 1, 1, 1, 2, 1...|[1, 1, 1, 0, 1, 1...|             1.5|           1.0|
| 85:11:18718:001|[1, 1, 2, 1, 2, 3...|[0, 1, 1, 1, 1, 2...|             2.0|           1.0|
| 85:11:18982:002|[3, 2, 1, 2, 1, 2...|[1, 2, 1, 1, 1, 1...|             2.0|           1.0|
| 85:11:19439:001|[3, 1, 3, 3, 2, 1...|[1, 1, 1, 2, 2, 1...|             2.0|           1.0|
| 85:11:19526:001|[3, 2, 4, 2, 2, 3...|[2, 3, 2, 2, 2, 2...|             2.0|           1.0|
| 85:11:19570:001|[1, 1, 2, 1, 1, 2...|[1, 1, 1, 1, 1, 2...|             2.0|           2.0|
| 85:11:20453:001|[1, 1, 1, 3, 2, 2...|[1, 1, 1, 2, 0, 2...|          

Keep only the interesting columns to reduce the size of the dataframe in order to convert it to a pandas DF:

In [20]:
trip_delays = dep_arr_delays.select(["FAHRT_BEZEICHNER", "median_departure", "median_arrival"]).toPandas()
trip_delays.rename(columns = {'FAHRT_BEZEICHNER':'trip_id'}, inplace = True)
# Append the walk delay in the dataframe
walk_delay = pd.DataFrame(data=[["walk", 0.0, 0.0]], columns = ["trip_id", "median_departure", "median_arrival"])
trip_delays = trip_delays.append(walk_delay)
trip_delays.tail()

Unnamed: 0,trip_id,median_departure,median_arrival
115958,85:882:127678-18101-1,2.5,2.0
115959,85:882:127750-18101-1,2.5,2.0
115960,85:882:127788-18101-1,2.5,2.5
115961,85:882:6610-36101-1,1.0,2.0
0,walk,0.0,0.0


## Test the system:
You want to travel from Zürich HB to Dübendorf for example, then with our system here is your trip !

In [22]:
# Let's test it out !
query_time = "18:00"
waiting_to_go_time_minutes = 15
starting_station = "Zürich HB"
arrival_station = "Dübendorf"

MAX_HOURS = 2

considering = edges_df.set_index("departure_time") \
                      .between_time(query_time, add_time(query_time, hours=MAX_HOURS)) \
                      .sort_index()
        
all_trips = bfs(starting_station, query_time)
edges = pd.DataFrame(all_trips).drop_duplicates()
edges = edges.sort_values("arrival_time")[~edges.sort_values("arrival_time").duplicated(["arrival", "departure"])]

# Building the graph
g = Graph()
for n in set(edges[["departure", "arrival"]].values.ravel()):
    g.add_node(n)
    
for i,e in edges.iterrows():
    # Don't need to add trip_delays if you don't want to consider the delays
    g.add_edge(e.departure, e.arrival, i, e.arrival_time, e.time, e.trip_id, trip_delays)

g.proba_connections = {}
for i, node in enumerate(g.nodes):
    g.build_proba(node)

    
# We run the algorithm and find 
# the shortest possible path !
vis, path = dijsktra(g, arrival_station)
total = []
n = starting_station
while True:
    total.append(n)
    if n == arrival_station:
        break
    else:
        n = path[n]

# User friendly printing of the path:
# follow the instructions !
print("Here is your trip:")
prev_trip_id = 0
path = []
for i, station in enumerate(total[:-1]):
    next_station = total[i+1]
    current = edges[(edges.departure == station) & (edges.arrival == next_station)]
    dep_time = pd.to_datetime(str(current.index.values[0])).strftime('%H:%M')
    arr_time = pd.to_datetime(str(current.arrival_time.values[0])).strftime('%H:%M')
    trip_id = current.trip_id.values[0]
    
    
    if i == 0 or i == len(total[:-1]):
        proba_next = 1.0
    else:
        proba_next = g.proba_connections[station][(prev_trip_id, trip_id)]
    print("from {} to {} with trip id: {}\nand dep time: {}, arr time: {}, proba with next connection: {} \n"
          .format(station, next_station, trip_id,
                  dep_time, arr_time, proba_next))
    
    path.append((station, dep_time, arr_time, trip_id))
    
    prev_trip_id = trip_id
path.append((total[len(total) - 1], "null", "null", "null"))
paths = [path]

Here is your trip:
from Zürich HB to Zürich Oerlikon with trip id: 85:11:2664:001
and dep time: 18:01, arr time: 18:07, proba with next connection: 1.0 

from Zürich Oerlikon to Wallisellen with trip id: 85:11:19469:001
and dep time: 18:18, arr time: 18:20, proba with next connection: 0.0 

from Wallisellen to Dübendorf with trip id: 85:11:19469:001
and dep time: 18:21, arr time: 18:24, proba with next connection: 1.0 



## Map visualization

Let's visualize it !

In [23]:
RADIUS = 6378137.0

# Functions useful to convert a latitude / longitude to the y / x coordinate of the map
def lat2y(a):
    return math.log(math.tan(math.pi / 4 + math.radians(a) / 2)) * RADIUS

def lon2x(a):
    return math.radians(a) * RADIUS

def gps_of(station_name):
    '''Get the latitude and longitude of a station name in the metadata dataframe'''
    temp = final_df_metadata.loc[final_df_metadata['stop_name'] == station_name]
    if len(temp) == 0:
        return (0,0)
    else:
        return (float(temp['latitude']), float(temp['longitude']))

def extend_path(input_list):
    '''Modify the input list by adding latitude and longitude'''
    data = []
    for li in input_list:
        temp_list = []
        for tupl in li:
            temp = tupl + gps_of(tupl[0])
            temp_list.append(temp)
        data.append(temp_list)
    return data

example = extend_path(paths)

def process_to_plot(data):
    '''Return a latitude, longitude and name list for each path'''
    lat = []
    lon = []
    name = []
    departure = []
    arrivals = []
    for l in data:
        lat1 = []
        lon1 = []
        name1 = []
        departure1 = []
        arrivals1 = []
        for elem in l:
            lat1.append(lat2y(elem[4]))
            lon1.append(lon2x(elem[5]))
            departure1.append(elem[1])
            arrivals1.append(elem[2])
            name1.append(elem[0])
        lat.append(lat1)
        lon.append(lon1)
        name.append(name1)
        departure.append(departure1)
        arrivals.append(arrivals1)
    return lat, lon, name, departure, arrivals

output_notebook()

hover = HoverTool(tooltips=[
    ('Station', '@name'),
    ('Departure', '@departure'),
    ('Arrival', '@arrival')
])

p = figure(x_range=(935000, 955000), y_range=(5980000,6025000), x_axis_type="mercator", y_axis_type="mercator", tools=[hover])
p.add_tile(CARTODBPOSITRON)

lat, lon, name, departures, arrivals = process_to_plot(example)
source1 = ColumnDataSource(data = dict(lat=lat[0],lon=lon[0],name=name[0],departure=departures[0],arrival=arrivals[0]))
p.circle(x='lon', y='lat', size=8, color="red", alpha = 0.8, source = source1)
p.line(x=lon[0], y=lat[0], color='red')

handle=show(p, notebook_handle=True)

## Isochronous Map

We apply the same method to create the isochronous map, we take a radius of 30 minutes around Zürich:

In [24]:
# Let's test it out !
query_time = "18:00"
waiting_to_go_time_minutes = 15
starting_station = "Zürich HB"

MAX_MINUTES = 30

considering = edges_df.set_index("departure_time") \
                      .between_time(query_time, add_time(query_time, minutes=MAX_MINUTES))\
                      .sort_index()
        
all_trips = bfs(starting_station, query_time)
edges = pd.DataFrame(all_trips).drop_duplicates()
edges = edges.sort_values("arrival_time")[~edges.sort_values("arrival_time").duplicated(["arrival", "departure"])]

# Building the graph
g = Graph()
for n in set(edges[["departure", "arrival"]].values.ravel()):
    g.add_node(n)
    
for i,e in edges.iterrows():
    # Don't need to add trip_delays if you don't want to consider the delays
    g.add_edge(e.departure, e.arrival, i, e.arrival_time, e.time, e.trip_id, trip_delays)
   
# We run the algorithm and find 
# the shortest possible path !
all_paths_found = []
for node_arrival in g.nodes:
    if node_arrival != starting_station:
        vis, path = dijsktra(g, node_arrival)
        total = []
        n = starting_station
        while True:
            total.append(n)
            if n == node_arrival:
                break
            else:
                n = path[n]

        # User friendly printing of the path:
        # follow the instructions !
        prev_trip_id = 0
        path = []
        for i, station in enumerate(total[:-1]):
            next_station = total[i+1]
            current = edges[(edges.departure == station) & (edges.arrival == next_station)]
            dep_time = pd.to_datetime(str(current.index.values[0])).strftime('%H:%M')
            arr_time = pd.to_datetime(str(current.arrival_time.values[0])).strftime('%H:%M')
            trip_id = current.trip_id.values[0]

            path.append((station, dep_time, arr_time, trip_id))

            prev_trip_id = trip_id
        path.append((total[len(total) - 1], "null", "null", "null"))
    all_paths_found.append(path)

# Build a list of tuples: each station
# and the time it takes to go there.
fixed_paths = []
for path in all_paths_found:
    fixed_path = []
    for station in path:
        if station[1] == "null" or station[2] == "null":
            fixed_path.append(station)
            break
        if int(station[1][-2:]) >= MAX_MINUTES or int(station[2][-2:]) > MAX_MINUTES:
            break
        fixed_path.append(station)
    
    if fixed_path[-1][1] == "null":
        time_found = MAX_MINUTES - int(fixed_path[-2][2][-2:])
    else:
        time_found = MAX_MINUTES - int(fixed_path[-1][1][-2:])
    data = (station[0], time_found)
    fixed_paths.append(data)
    
# Select only the fastest ways to get there
reachable = {}
for station, times in fixed_paths:
    reachable.setdefault(station, []).append(times)
reachable = [(i,min(j)) for i,j in reachable.items()]
print("This is an excerpt of the reachable list:")
reachable[:10]

This is an excerpt of the reachable list:


[('Zürich, Kirche Fluntern', 1),
 ('Kilchberg ZH, Auf Brunnen', 1),
 ('Rümlang', 10),
 ('Zürich, Luggwegstrasse', 9),
 ('Dietlikon, Brandbachstrasse', 8),
 ('Zürich, Höfliweg', 8),
 ('Dübendorf', 6),
 ('Glattbrugg, Lättenwiesen', 2),
 ('Dübendorf, Hochbord Süd', 13),
 ('Zürich, Lehenstrasse', 3)]

Let's see it:

In [25]:
def process_to_isochrone(data):
    result = []
    for elem in data:
        temp = gps_of(elem[0]) + (elem[1],)
        result.append(temp)
    
    lat = [] 
    lon = []
    radius = []
    for elem in result:
        lat.append(lat2y(elem[0]))
        lon.append(lon2x(elem[1]))
        radius.append(elem[2])
    return lat, lon, radius

output_notebook()

p = figure(x_range=(935000, 955000), y_range=(5980000,6025000), x_axis_type="mercator", y_axis_type="mercator")
p.add_tile(CARTODBPOSITRON)

lat, lon, radius = process_to_isochrone(reachable)
#Coefficient for radius to scale the walk
radius = [x*60 for x in radius]

source = ColumnDataSource(data = dict(lat=lat,lon=lon,radius=radius))
zurich = ColumnDataSource(data = dict(latZ=[lat2y(47.378177)],lonZ=[lon2x(8.540192)],radiusZ=[100]))

p.circle(x='lon', y='lat', radius='radius', color="blue", alpha = 0.5, source = source)
p.circle(x='lonZ', y='latZ', radius='radiusZ', color='red', alpha=1, source = zurich)

handle=show(p, notebook_handle=True)

### If you come from GitLab, checkout both png files to see the graphs !