# Imports & Configuration

## Libraries

In [1]:
from pyspark.sql.functions import *
import pandas as pd
from operator import itemgetter
import numpy as np
from datetime import *
import ast
from pyspark.sql.types import *
import itertools
from functools import reduce
import math
from collections import Counter
from dateutil.parser import parse
from geopy import distance
from pyspark.sql.functions import lit
from pyspark.sql import functions as F

from collections import defaultdict

import warnings
warnings.filterwarnings("ignore")

import networkx as nx
import os
import getpass
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace, col


## Spark Config

In [2]:
conf = pyspark.conf.SparkConf()
conf.setMaster('yarn')
conf.setAppName('scheduler-{0}'.format(getpass.getuser()))
conf.set('spark.executor.memory', '8g')
conf.set('spark.executor.instances', '16')
conf.set('spark.port.maxRetries', '100')
sc = pyspark.SparkContext.getOrCreate(conf)
conf = sc.getConf()
sc

In [3]:
spark = SparkSession(sc)

## Parameters

In [4]:
stop_name = 'Zürich HB'

## Read Data

In this section we read the sbb data and rename all columns in english for the sake of readability. We also filter out trips that do not contain actual time of arrival or departure.

NB : We chose to build our model based on the schedule of April 2018. When trying more recent months, we figured out that there was often no measure of actual arrival or departure times (prognose_status != 'GESCHAETZT')

In [5]:
#df_all = spark.read.option("header", "true").csv("/datasets/sbb/2019/01/2019-01-23istdaten.csv.bz2", sep= ';')

# Using only april 2018 dataset, to get the full dataset uncomment first line
df = spark.read.option("header", "true").csv("/datasets/sbb/2018/04/*", sep= ';')

In [6]:
df = df.withColumnRenamed('BETRIEBSTAG', 'date').\
withColumnRenamed('FAHRT_BEZEICHNER', 'trip_id').\
withColumnRenamed('LINIEN_ID', 'train_id').\
withColumnRenamed('BPUIC', 'BPUIC').\
withColumnRenamed('HALTESTELLEN_NAME', 'stop_name').\
withColumnRenamed('ANKUNFTSZEIT', 'arrival_time').\
withColumnRenamed('AN_PROGNOSE', 'actual_arrival_time').\
withColumnRenamed('AN_PROGNOSE_STATUS', 'an_prognose_status').\
withColumnRenamed('ABFAHRTSZEIT', 'departure_time').\
withColumnRenamed('AB_PROGNOSE', 'actual_departure_time').\
withColumnRenamed('AB_PROGNOSE_STATUS', 'ab_prognose_status').\
drop('BETREIBER_ID','LINIEN_TEXT','ZUSATZFAHRT_TF','FAELLT_AUS_TF', 'BETREIBER_ABK', 'BETREIBER_NAME', 'PRODUKT_ID', 'UMLAUF_ID', 'VERKEHRSMITTEL_TEXT','DURCHFAHRT_TF')

In [7]:
#Filter out trips where there is no measure of actual time of arrival
df = df.filter(df.an_prognose_status=="GESCHAETZT")
df = df.filter(df.ab_prognose_status=="GESCHAETZT")

In [8]:
df = df.drop('an_prognose_status','ab_prognose_status')

In [9]:
df.limit(5).toPandas()

Unnamed: 0,date,trip_id,train_id,BPUIC,stop_name,arrival_time,actual_arrival_time,departure_time,actual_departure_time
0,20.04.2018,80:80DBR_:3040:000,3040,8503424,Schaffhausen,20.04.2018 08:14,20.04.2018 08:15:48,20.04.2018 08:16,20.04.2018 08:17:31
1,20.04.2018,80:80DBR_:3041:000,3041,8503424,Schaffhausen,20.04.2018 07:42,20.04.2018 07:41:49,20.04.2018 07:43,20.04.2018 07:43:51
2,20.04.2018,80:80DBR_:3042:000,3042,8503424,Schaffhausen,20.04.2018 10:14,20.04.2018 10:17:37,20.04.2018 10:16,20.04.2018 10:18:49
3,20.04.2018,80:80DBR_:3043:000,3043,8503424,Schaffhausen,20.04.2018 09:42,20.04.2018 09:41:42,20.04.2018 09:43,20.04.2018 09:43:41
4,20.04.2018,80:80DBR_:3044:000,3044,8503424,Schaffhausen,20.04.2018 12:14,20.04.2018 12:15:26,20.04.2018 12:16,20.04.2018 12:16:51


## Coordinates

In this section we read the metadata file and keep only the columns we are interested in. This will be used it in the next section.

In [10]:
cols = ['BPUIC','long','lat','altitude','name']

coordinates = pd.read_csv('BFKOORD_GEO',sep='\t', header=None)

coordinates[0] = coordinates[0].apply(lambda x : x.split())
for i in range(4):
    if i == 0:
        coordinates[cols[i]] = coordinates[0].apply(lambda x : x[i])
    else :
        coordinates[cols[i]] = coordinates[0].apply(lambda x : float(x[i]))

coordinates[cols[-1]] = coordinates[0].apply(lambda x : " ".join(x[5:]))

coordinates = coordinates.drop(columns=[0])
coordinates.head()

Unnamed: 0,BPUIC,long,lat,altitude,name
0,2,26.074412,44.44677,0.0,Bucuresti
1,3,1.811446,50.901549,0.0,Calais
2,4,1.075329,51.284212,0.0,Canterbury
3,5,-3.543547,50.729172,0.0,Exeter
4,7,9.733756,46.922368,744.0,"Fideris, Bahnhof"


# Filter Data

In this section we will filter out the stops that are outside the radius of 10 km from the starting station Zürich.

In [11]:
def calculate_distance(row):
    point = (row['lat'],row['long'],row['altitude'])
    return distance.distance(point,start_coord).km

In [12]:
coordinates[coordinates['name'] == stop_name]

Unnamed: 0,BPUIC,long,lat,altitude,name
2379,8503000,8.540192,47.378177,408.0,Zürich HB


In [13]:
start_coord = coordinates[coordinates['name'] == stop_name]
start_coord = (start_coord['lat'].values[0],start_coord['long'].values[0],start_coord['altitude'].values[0])

# Get the distance from each stop to start
coordinates['distance_to_start_km'] = coordinates.apply(calculate_distance, axis=1)

# Keep only stops that are within 10 km of start
coordinates = coordinates[coordinates.distance_to_start_km <= 10]
reachable_stops_id = list(coordinates.BPUIC)


In [14]:
# Keep only stops that are within a radius of 10 km of start
df = df.filter(df.BPUIC.isin(reachable_stops_id))

In [15]:
df.show(5,False)

+----------+--------------+--------+-------+----------------+----------------+-------------------+----------------+---------------------+
|date      |trip_id       |train_id|BPUIC  |stop_name       |arrival_time    |actual_arrival_time|departure_time  |actual_departure_time|
+----------+--------------+--------+-------+----------------+----------------+-------------------+----------------+---------------------+
|20.04.2018|85:11:1255:001|1255    |8503000|Zürich HB       |20.04.2018 08:26|20.04.2018 08:27:41|20.04.2018 08:37|20.04.2018 08:39:43  |
|20.04.2018|85:11:1258:001|1258    |8503000|Zürich HB       |20.04.2018 21:23|20.04.2018 21:25:19|20.04.2018 21:34|20.04.2018 21:34:51  |
|20.04.2018|85:11:1507:002|1507    |8503000|Zürich HB       |20.04.2018 06:30|20.04.2018 06:30:14|20.04.2018 06:39|20.04.2018 06:39:52  |
|20.04.2018|85:11:1507:002|1507    |8503016|Zürich Flughafen|20.04.2018 06:49|20.04.2018 06:49:43|20.04.2018 06:51|20.04.2018 06:52:01  |
|20.04.2018|85:11:1509:003|1509   

## Group by Trip id 

**In this section we will try to recover all the itineraries by grouping all rows over Trip ID. Since we are working with a whole month (for the sake of completeness), we will get the same patterns again and again. We will then filter the duplicates and keep only single daily itineraries.**


In [16]:
def remove_date(date):
    """This function keeps only time from a given date"""
    if date:
        return date.split()[1]
    else:
        return None
remove_date_udf = F.udf(remove_date)

**In the next cell, we added a column stop_dt_at (stop, departure time, arrival time) to have data in a more concise form.**

In [17]:
df_schedule = df.withColumn("arrival_time",remove_date_udf(df.arrival_time))
df_schedule = df_schedule.withColumn("departure_time",remove_date_udf("departure_time"))
df_schedule = df_schedule.withColumn("stop_dt_at",struct(df_schedule.stop_name,df_schedule.arrival_time,df_schedule.departure_time))

df_schedule.show(5,False)


+----------+--------------+--------+-------+----------------+------------+-------------------+--------------+---------------------+--------------------------------+
|date      |trip_id       |train_id|BPUIC  |stop_name       |arrival_time|actual_arrival_time|departure_time|actual_departure_time|stop_dt_at                      |
+----------+--------------+--------+-------+----------------+------------+-------------------+--------------+---------------------+--------------------------------+
|20.04.2018|85:11:1255:001|1255    |8503000|Zürich HB       |08:26       |20.04.2018 08:27:41|08:37         |20.04.2018 08:39:43  |[Zürich HB, 08:26, 08:37]       |
|20.04.2018|85:11:1258:001|1258    |8503000|Zürich HB       |21:23       |20.04.2018 21:25:19|21:34         |20.04.2018 21:34:51  |[Zürich HB, 21:23, 21:34]       |
|20.04.2018|85:11:1507:002|1507    |8503000|Zürich HB       |06:30       |20.04.2018 06:30:14|06:39         |20.04.2018 06:39:52  |[Zürich HB, 06:30, 06:39]       |
|20.04.201

**We now group all rows by trip ID to try to reconstruct the different trips**

In [18]:
# Group trips by Trip ID and collect all the itineraries
df_grouped_trips = df_schedule.select(["date", "trip_id", "stop_dt_at"]) \
                              .groupBy(["date", "trip_id"]) \
                              .agg(F.collect_list("stop_dt_at") \
                              .alias("stations")) \
                              .groupBy("trip_id") \
                              .agg(F.collect_list("stations").alias("stations"))
#df_grouped_trips_week_day.show(10, False)

**After grouping all the itineraries by trip ID, the next step is to build single typical journeys. We will start by removing duplicates.**

In [46]:
def remove_duplicates(l):
    """This function removes duplicates from a given list """
    elements = []
    for i in l:
        if i not in elements:
            elements.append(i)
    return elements

setudf = F.udf(remove_duplicates, ArrayType(ArrayType(StructType([
                    StructField("stop_name", StringType(), True),
                    StructField("arrival_time", StringType(), True),
                    StructField("departure_time", StringType(), True)
                    ]))))
trips = df_grouped_trips.withColumn("stations", setudf(df_grouped_trips.stations)).toPandas()

In [49]:
trips['stations'][103]

[[Row(stop_name='Dietlikon', arrival_time='06:43', departure_time='06:44'),
  Row(stop_name='Stettbach', arrival_time='06:47', departure_time='06:48'),
  Row(stop_name='Zürich Stadelhofen', arrival_time='06:52', departure_time='06:53'),
  Row(stop_name='Zürich HB', arrival_time='06:56', departure_time='06:59'),
  Row(stop_name='Zürich Hardbrücke', arrival_time='07:01', departure_time='07:01'),
  Row(stop_name='Zürich Altstetten', arrival_time='07:05', departure_time='07:06'),
  Row(stop_name='Schlieren', arrival_time='07:08', departure_time='07:08'),
  Row(stop_name='Glanzenberg', arrival_time='07:10', departure_time='07:10')],
 [Row(stop_name='Dietlikon', arrival_time='06:43', departure_time='06:44'),
  Row(stop_name='Stettbach', arrival_time='06:47', departure_time='06:48'),
  Row(stop_name='Zürich Stadelhofen', arrival_time='06:52', departure_time='06:53')]]

**We then transform the list of lists into list of tuples**

In [50]:
def transform_to_tuples(list_list):
    """This function transform a list of lists to a list of tuples"""
    total_list = []
    for l in list_list:
        total_list.append([tuple(y) for y in l ])
    return total_list

trips["stations"] = trips.stations.apply(transform_to_tuples)

In [51]:
trips['stations'][103]

[[('Dietlikon', '06:43', '06:44'),
  ('Stettbach', '06:47', '06:48'),
  ('Zürich Stadelhofen', '06:52', '06:53'),
  ('Zürich HB', '06:56', '06:59'),
  ('Zürich Hardbrücke', '07:01', '07:01'),
  ('Zürich Altstetten', '07:05', '07:06'),
  ('Schlieren', '07:08', '07:08'),
  ('Glanzenberg', '07:10', '07:10')],
 [('Dietlikon', '06:43', '06:44'),
  ('Stettbach', '06:47', '06:48'),
  ('Zürich Stadelhofen', '06:52', '06:53')]]

**To keep only a single itinerary, we keep the longest sublist of stops only. This gets more clear when looking at the example.**

In [52]:
def keep_only_longest(list_stations):
    """ This function keeps the longest sublist of all the sublists of a given list"""
    if len(list_stations) > 1:
        if len(set([len(i) for i in list_stations])) > 1:
            return sorted(list_stations, key=len, reverse=True)[0]
        else:
            nones = [ len([ x for i in l_tup for x in i if not x]) for l_tup in list_stations ]
            return list_stations[np.argmin(nones)]
    else:
        return list_stations[0]
    
trips["stations"] = trips["stations"].apply(keep_only_longest)

In [53]:
trips['stations'][103]

[('Dietlikon', '06:43', '06:44'),
 ('Stettbach', '06:47', '06:48'),
 ('Zürich Stadelhofen', '06:52', '06:53'),
 ('Zürich HB', '06:56', '06:59'),
 ('Zürich Hardbrücke', '07:01', '07:01'),
 ('Zürich Altstetten', '07:05', '07:06'),
 ('Schlieren', '07:08', '07:08'),
 ('Glanzenberg', '07:10', '07:10')]

## Find the edges

In the next cell, we will consider a new graph with nodes being the departure and arrival stations and the edge the connection between them.

In [25]:
# Array of dictionaries containing the connections between stations

edges = []

for _, row in trips.iterrows():
    t_id, stations = row
    for i in range(len(stations) - 1):
        start = stations[i]
        arrival = stations[i + 1]
        
            #Name of the station of departure
        departure_name = start[0]
        
        if start[-1] is not None:
            departure_time = datetime.strptime(start[-1], "%H:%M")
        else:
            departure_time = None
        
        #Name of the station of arrival
        arrival_name = arrival[0]
        
        if arrival[1] is not None:
            arrival_time = datetime.strptime(arrival[1], "%H:%M")
        else:
            arrival_time = None
        
        if start[-1] is not None and arrival[1] is not None:
            delta = int((arrival_time - departure_time).seconds/60)
        else:
            delta = None
#         fill the array
        edges.append({"trip_id":t_id,
                      "departure":departure_name,
                      "departure_time":departure_time,
                      "arrival":arrival_name,
                      "arrival_time":arrival_time,
                      "time":delta})

In [26]:
# Drop it into a pandas dataframe
edges_df = pd.DataFrame(edges, columns=["trip_id", "departure", "departure_time",
                      "arrival", "arrival_time", "time"])
edges_df = edges_df.dropna()
edges_df = edges_df[edges_df.departure != edges_df.arrival]
edges_df.head()

Unnamed: 0,trip_id,departure,departure_time,arrival,arrival_time,time
0,85:11:18388:001,Dietlikon,1900-01-01 23:14:00,Stettbach,1900-01-01 23:17:00,3
1,85:11:18388:001,Stettbach,1900-01-01 23:18:00,Zürich Stadelhofen,1900-01-01 23:22:00,4
2,85:11:18388:001,Zürich Stadelhofen,1900-01-01 23:23:00,Zürich HB,1900-01-01 23:26:00,3
3,85:11:18388:001,Zürich HB,1900-01-01 23:29:00,Zürich Hardbrücke,1900-01-01 23:31:00,2
4,85:11:18388:001,Zürich Hardbrücke,1900-01-01 23:31:00,Zürich Altstetten,1900-01-01 23:35:00,4


In [27]:
# This is to construct a dictionary of walking connection i.e if the distance between 2 stations is less than
# DIST_THRESHOLD then no need to take another bus --> just walk

DIST_THRESHOLD = 0.5 # in kilometers

def time_to_walk(dist):
    """
    Google Maps assumes a 5 kms an hour walking speed, 
    which is not that realistic for most people especially when you are new to an area 
    and keeping a lookout around you.

    We will use 4km an hour 
    """
    
    return np.ceil(dist * 60 / 4 )

def get_name_coordinates(row):
    name = row["name"]
    lat = row["lat"]
    long = row["long"]
    alt = row["altitude"]
    return name, (lat,long,alt)

# init
walking_connections = {}

for _, row in coordinates.iterrows():
    station,from_coords = get_name_coordinates(row)
    station_dist = []
    for _, row2 in coordinates.iterrows():
        station2,to_coords = get_name_coordinates(row2)
        if station != station2:
            km_dist = distance.distance(from_coords,to_coords).km
            if km_dist < DIST_THRESHOLD:
                station_dist.append((station2, km_dist, time_to_walk(km_dist)))
    walking_connections[station] = station_dist

We now build a dataframe of all close stations (less than 500m) where we assume that the rider gains time by simply walking.

In [28]:
df_array_walk= []
the_stations = set(edges_df.arrival).union(set(edges_df.departure))

for from_station in  the_stations:
    if from_station in walking_connections.keys():
        connections = walking_connections[from_station]
        for connection in connections:
            # connection here is a tuple if the form (name_of_station, dist_to_stations, time_to_walk_to_station)
            to_station = connection[0]
            if to_station in the_stations:
                row = ["walk", from_station, "null", to_station, "null", int(connection[2])]
                df_array_walk.append(row)      
        
df_walk = pd.DataFrame(data = df_array_walk, columns = ["trip_id", "departure", "departure_time", "arrival", "arrival_time", "time"])


In [29]:
df_walk.sample(5)

Unnamed: 0,trip_id,departure,departure_time,arrival,arrival_time,time
4,walk,Zürich Binz,,Zürich Giesshübel,,4
5,walk,Glattbrugg,,Opfikon,,4
0,walk,Opfikon,,Glattbrugg,,4
2,walk,Zürich Manegg,,Zürich Leimbach,,7
3,walk,Zürich Leimbach,,Zürich Manegg,,7


# Routing Algorithm

Our routing algorithm works as follows :

    - The first step is to reduce the data by filtering out the nodes that are not reachable from Zurich. This is done by discovering the graph using bfs.
    
    - Next apply djisktra algorithm to find the shortest path starting from Zurich to the desired arrival_station

The next cells contain the different implementations. We will then test the model.

In [30]:
def time_addition(time, hours=0, minutes=0):
    """
    time is a String that is of the format "HH:MM",
    hours are the hours to add
    minutes are the minutes to add
    
    Returns the time after addition
    """
    return (datetime.strptime(time, "%H:%M") + timedelta(hours=hours, minutes=minutes)).strftime('%H:%M')

In [31]:
def neighbours(node, time,edges_max_hours, t_id=None):
    """
    This auxilary function returns the different reachable nodes from a given node starting from time
    
    """
    available = edges_max_hours[edges_max_hours.departure == node] \
                    .between_time(time, time_addition(time, minutes=waiting_time))
    
    walkable = df_walk[df_walk.departure == node]
    for i, row in walkable.iterrows():

        walk_time = row["time"]
        walkable.set_value(i,'departure_time', datetime.strptime(time, "%H:%M"))
        walkable.set_value(i,'arrival_time', datetime.strptime(time_addition(time, minutes = walk_time), "%H:%M"))
    
    walkable.set_index("departure_time", inplace=True)
        
    if t_id is not None:
        
        # We considered that two minutes are necessary when the rider has to change from means of transportation
        t_id_time = time_addition(time, minutes=2)
        
        diff_t_id = available[available.trip_id != t_id].between_time(t_id_time, time_addition(t_id_time, minutes=waiting_time))
        available = pd.concat([available[available.trip_id == t_id], diff_t_id, walkable])
             
    result = []
    
    for node in available.arrival.unique():
        result.append(available[available.arrival == node].sort_values("arrival_time").iloc[0])
        
    return sorted(result, key=lambda x: x.arrival_time)


In [32]:
def bfs(starting_station, query_time,edges_max_hours):
    """
    Returns all reachable nodes of the graph from starting_station
    """
    remaining_nodes = [(starting_station, query_time, None)]
    
    set_ = set()
    edges = []
    
    while remaining_nodes:
        
        actual_node, query_time, t_id = remaining_nodes.pop(0)
        
        #Mark as visited
        set_.update({actual_node})
        
        #Look for the set of reachable nodes from node
        neighbors = neighbours(actual_node, query_time,edges_max_hours, t_id)
        
        for node in neighbors:
            
            if node.arrival not in set_:
                edges.append(node)
                remaining_nodes.append((node.arrival, node.arrival_time.strftime("%H:%M"), node.trip_id))

    return edges

# Measuring delays

The next step to build a robust model is to take into account the different departure and arrival delays of each of the trip_ids. Therefore, we computed their mean and used it to model the delay as a random variable following an exponential distribution. 

We also considered only trips happening during working days for the sake of simplicity. Only small changes are needed to test the model on a week end.

In [33]:
def add_zeros(date):
    """
    Adds the :00 as seconds
    """
    result = date
    split_date = date.split(':')
    if len(split_date) == 2:
        result += ':00'
    return result

def calc_time_difference(actual_time, scheduled_time):
    """
    Calculates the time difference between actual and scheduled times 
    
    """
    #Check the times are non empty
    if actual_time != "null" and scheduled_time != "null" :
    
      if (isinstance(actual_time, str) and isinstance(scheduled_time, str)):
            actual = add_zeros(actual_time)
            time_actual = datetime.strptime(actual, "%d.%m.%Y %H:%M:%S")

            scheduled = add_zeros(scheduled_time)
            time_scheduled = datetime.strptime(scheduled, "%d.%m.%Y %H:%M:%S")
          
            # Add one minute if number of seconds is non null 
            if time_actual.second != 0:
                time_actual = time_actual + timedelta(seconds = 60 - time_actual.second)

            if time_scheduled.second != 0:
                time_scheduled = time_schedule + timedelta(seconds = 60 - time_scheduled.second)

            if time_actual > time_scheduled:
                delta = time_actual - time_scheduled
                return int(delta.seconds / 60)
            else:
                return 0
    else: 
        return None
    
calc_time_difference_udf = udf(lambda row: calc_time_difference(row[0], row[1]), IntegerType())

df_delays = df.withColumn("arrival_delay", calc_time_difference_udf(struct([df['actual_arrival_time'], df['arrival_time']])))
df_delays = df_delays.withColumn("departure_delay", calc_time_difference_udf(struct([df['actual_departure_time'], df['departure_time']])))

#Add a column weekend to know if the trip is happening on a workin day or on week ends
is_weekend = udf(lambda row: True if datetime.strptime(row, "%d.%m.%Y").weekday() in [5,6] else False)
df_delays = df_delays.withColumn("weekend", is_weekend(df_delays["date"]))

# For the sake of simplicity we will only consider the trips happening during a workin day
df_delays = df_delays.filter(df_delays.weekend == False)
df_delays.show()


+----------+--------------+--------+-------+----------------+----------------+-------------------+----------------+---------------------+-------------+---------------+-------+
|      date|       trip_id|train_id|  BPUIC|       stop_name|    arrival_time|actual_arrival_time|  departure_time|actual_departure_time|arrival_delay|departure_delay|weekend|
+----------+--------------+--------+-------+----------------+----------------+-------------------+----------------+---------------------+-------------+---------------+-------+
|20.04.2018|85:11:1255:001|    1255|8503000|       Zürich HB|20.04.2018 08:26|20.04.2018 08:27:41|20.04.2018 08:37|  20.04.2018 08:39:43|            2|              3|  false|
|20.04.2018|85:11:1258:001|    1258|8503000|       Zürich HB|20.04.2018 21:23|20.04.2018 21:25:19|20.04.2018 21:34|  20.04.2018 21:34:51|            3|              1|  false|
|20.04.2018|85:11:1507:002|    1507|8503000|       Zürich HB|20.04.2018 06:30|20.04.2018 06:30:14|20.04.2018 06:39|  20.

In [34]:
def compute_mean_delay(list_delays):
    """
    Compute the mean of the delays of a list of delays
    """
    if isinstance(list_delays, (list,)):
        if len(list_delays) == 0:
            return 0.0
        else:
            delay = np.mean(list_delays)
            return float(delay)
    elif isinstance(list_delays, (int,)):
        return float(list_delays)
    else:
        return 0.0
    
compute_mean_delay_udf= udf(lambda row: compute_mean_delay(row))

#Group the delays by Trip ID and compute the mean delay of departure and arrival of each of them
dep_arr_delays = df_delays.groupBy("trip_id") \
                            .agg(F.collect_list("departure_delay").alias("list_departures"),
                                 F.collect_list("arrival_delay").alias("list_arrivals"))
dep_arr_delays = dep_arr_delays.withColumn("mean_departure", compute_mean_delay_udf(dep_arr_delays["list_departures"])) \
                                   .withColumn("mean_arrival", compute_mean_delay_udf(dep_arr_delays["list_arrivals"]))

trip_delays = dep_arr_delays.select(["trip_id", "mean_departure", "mean_arrival"]).toPandas()

#We consider no delays when walking
trip_delays = trip_delays.append({'trip_id':"walk",'mean_departure':0.0,'mean_arrival':0.0},ignore_index=True)

trip_delays.head()


Unnamed: 0,trip_id,mean_departure,mean_arrival
0,85:11:18388:001,2.159509202453988,1.3987730061349692
1,85:11:18718:001,1.7797619047619049,1.113095238095238
2,85:11:18982:002,1.6402116402116402,1.058201058201058
3,85:11:19439:001,1.8666666666666667,1.3714285714285714
4,85:11:19526:001,2.3278688524590163,1.9918032786885247


# Running the model

### Parameters

In [35]:
query_time = "12:00"
waiting_time = 15
starting_station = "Zürich HB"
arrival_station = "Dübendorf"
MAX_HOURS = 1

### Find reachable nodes 

In [36]:

edges_max_hours = edges_df.set_index("departure_time") \
                      .between_time(query_time, time_addition(query_time, hours=MAX_HOURS)) \
                      .sort_index()

reachable_nodes = bfs(starting_station, query_time,edges_max_hours)

edges = pd.DataFrame(reachable_nodes).drop_duplicates()
edges = edges.sort_values("arrival_time")[~edges.sort_values("arrival_time").duplicated(["arrival", "departure"])]
edges = edges.reset_index().rename(columns={'index':'departure_time'})
edges.head()

Unnamed: 0,departure_time,trip_id,departure,arrival,arrival_time,time
0,1900-01-01 12:00:00,85:11:18643:001,Zürich HB,Zürich Stadelhofen,1900-01-01 12:02:00,2
1,1900-01-01 12:01:00,85:11:18644:001,Zürich HB,Zürich Hardbrücke,1900-01-01 12:03:00,2
2,1900-01-01 12:03:00,85:11:18643:001,Zürich Stadelhofen,Zürich Tiefenbrunnen,1900-01-01 12:05:00,2
3,1900-01-01 12:01:00,85:11:2640:001,Zürich HB,Zürich Oerlikon,1900-01-01 12:07:00,6
4,1900-01-01 12:06:00,85:11:18643:001,Zürich Tiefenbrunnen,Zollikon,1900-01-01 12:08:00,2


### Find shortest path to destination

In [37]:
G = nx.DiGraph()

for i in range(len(edges)):
    entry = edges.iloc[i]
    node_in = entry.departure
    node_out = entry.arrival
    if (trip_delays[trip_delays.trip_id == entry.trip_id].shape[0] != 0):
        the_lambda = float(trip_delays[trip_delays.trip_id == entry.trip_id].mean_arrival)
    
    kind = 'train'
    if (entry.trip_id =='walk'):
        kind = 'walk'
    G.add_edge(node_in,node_out, weight = entry.time, departure = entry.departure_time, arrival = entry.arrival_time, lambda_ = the_lambda ,kind=kind,train =entry.trip_id )


In [38]:
G.nodes

NodeView(('Zürich HB', 'Zürich Stadelhofen', 'Zürich Hardbrücke', 'Zürich Tiefenbrunnen', 'Zürich Oerlikon', 'Zollikon', 'Zürich Wiedikon', 'Küsnacht Goldbach', 'Stettbach', 'Zürich Seebach', 'Küsnacht ZH', 'Zürich Enge', 'Dietlikon', 'Thalwil', 'Zürich Altstetten', 'Zürich Affoltern', 'Zürich Wollishofen', 'Erlenbach ZH', 'Zürich Flughafen', 'Kilchberg', 'Glattbrugg', 'Zürich Wipkingen', 'Urdorf', 'Regensdorf-Watt', 'Rümlang', 'Wallisellen', 'Urdorf Weihermatt', 'Opfikon', 'Schlieren', 'Rüschlikon', 'Dübendorf', 'Birmensdorf ZH', 'Glanzenberg', 'Schwerzenbach ZH', 'Bonstetten-Wettswil', 'Bassersdorf', 'Kloten Balsberg', 'Kloten'))

In [39]:
path = list(nx.shortest_path(G,source='Zürich HB',target='Regensdorf-Watt',weight='weight'))

In [40]:
path

['Zürich HB',
 'Zürich Oerlikon',
 'Zürich Seebach',
 'Zürich Affoltern',
 'Regensdorf-Watt']

### Model the distribution of delays

In [41]:
def CDFExponential(lamb,x): #lamb = lambda
    if x<=0:
        cdf=0
    else:
        cdf=1-np.exp(-lamb*x)
    return cdf

In [42]:
def compute_probability(time_difference,kind,lamb,train_id_arrival,train_id_departure):
    if((kind == 'walk') or (train_id_arrival == train_id_departure)):
        return 1
    else:
        return CDFExponential(lamb,time_difference)

In [43]:
def compute_robustness (array_probas):
    return (1/m)

In [44]:
print('start at '+ 'Zürich HB ' + 'at time '+ query_time)
probabilities = []
for i in range(len(path)-1) :
    print('-'*20)
#     The time difference between the arrival of this train and the departure of the next
    probability = 1
    if (i == 0):
        actual_metadata = G.get_edge_data(path[i],path[i+1])
    if i >0 :
        previous_metadata = actual_metadata
        actual_metadata = G.get_edge_data(path[i],path[i+1])
        
        time_difference = (actual_metadata['departure'] - previous_metadata['arrival']).seconds //60
#         Compute lambda 
        lamb = 1/(previous_metadata['lambda_']*0.5 + actual_metadata['lambda_']*0.5)
#         Get the probability
        probability = compute_probability(time_difference,actual_metadata['kind'],lamb,previous_metadata['train'],actual_metadata['train'])
    
    probabilities.append( probability)
    print(actual_metadata['kind'] +' from '+path[i]+ ' at '+ actual_metadata['departure'].strftime('%Y-%m-%d %H:%M:%S')[11:] + ' arrives in '+ path[i+1]+' at '+ actual_metadata['arrival'].strftime('%Y-%m-%d %H:%M:%S')[11:] +' ' +actual_metadata['train'] +' with probability '+str(probability))
    
robustness = np.min(probabilities)
print('-'*20)
print('Robustness of this journey is ' + str(robustness))
    

start at Zürich HB at time 12:00
--------------------
train from Zürich HB at 12:01:00 arrives in Zürich Oerlikon at 12:07:00 85:11:2640:001 with probability 1
--------------------
train from Zürich Oerlikon at 12:09:00 arrives in Zürich Seebach at 12:11:00 85:11:18644:001 with probability 0.8309866845939339
--------------------
train from Zürich Seebach at 12:11:00 arrives in Zürich Affoltern at 12:14:00 85:11:18644:001 with probability 1
--------------------
train from Zürich Affoltern at 12:14:00 arrives in Regensdorf-Watt at 12:18:00 85:11:18644:001 with probability 1
--------------------
Robustness of this journey is 0.8309866845939339


# Visualization on a map

The last step of our work is a visualisation of the itinerary on a follium map. It perfectly fits with google maps' one with the difference that our journey planner is more robust as it gives probabilites of missing connections as seen above.

In [45]:
import folium

departure = coordinates[coordinates['name']==path[len(path)//2]][['lat','long']]
m = folium.Map(location=departure, zoom_start=12)

#Change list of stops by the output of Sami
stops = coordinates[coordinates['name'].isin(path)][['name','lat','long']]
stops_info = np.array(stops[['name','lat','long']])
for i, info in enumerate(stops_info):
    folium.Marker(
        location=[info[1], info[2]],
        popup=info[0]).add_to(m)
    
#add lines
points = [(elem[1],elem[2]) for elem in stops_info]
folium.PolyLine(points, color="red", weight=2.5, opacity=1).add_to(m)

m