# Route planning algorithm


We decided to use the [Connection Scan Algorithm](https://transport.okfn.org/index-60.html).

This algorithm is described in the following [pdf](https://link.springer.com/content/pdf/10.1007%2F978-3-642-38527-8.pdf), starting at page 54 (page numbering of the pdf, not of the book).

This algorithm computes the earliest arrival time at the final stop given a departure stop and a departure time.

This is not exactly what we are asked for (the user inputs the desired arrival time at the arrival stop), but once we understand this algorithm properly and have a working solution, we can reverse it.
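
As a rough illustration of what the reversal could look like, here is a minimal sketch (the function `latest_departures` is hypothetical, not part of this notebook). It assumes connexions are plain `(dep_stop, dep_time, arr_stop, arr_time, trip_id)` tuples with times in minutes and ignores transfer and walking times. Connexions are scanned by descending arrival time, and each stop keeps the latest departure that still reaches the target by the deadline.

```python
import math

def latest_departures(connexions, target, deadline):
    """Latest time one can leave each stop and still reach `target` by `deadline`.

    Sketch only: connexions are (dep_stop, dep_time, arr_stop, arr_time, trip_id)
    tuples with times in minutes; transfer and walking times are ignored.
    """
    latest = {target: deadline}
    # Reverse scan: connexions sorted by descending arrival time.
    for dep_stop, dep_time, arr_stop, arr_time, _trip in sorted(
            connexions, key=lambda c: c[3], reverse=True):
        # Usable only if it arrives no later than the latest allowed time at arr_stop.
        if arr_time <= latest.get(arr_stop, -math.inf):
            # Keep the latest departure found so far for dep_stop.
            if dep_time > latest.get(dep_stop, -math.inf):
                latest[dep_stop] = dep_time
    return latest
```

For example, with `A→B` (10 to 15), `B→C` (20 to 30) and a deadline of 30 at `C`, the latest departure from `A` is 10; a connexion reaching `C` at 40 is ignored. The `departure=False` option of `sort_connexions` and the `positive_infinite=False` option of `construct_times_dict` further down appear to prepare for this reversed direction.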

The main building block of our algorithm is a connexion:

_It is a tuple consisting of:_
- _a departure stop_id_
- _a departure time_
- _an arrival stop_id_
- _an arrival time_
- _a trip_id_
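
The notebook later stores each connexion as a plain dict (see `sort_connexions`). Purely as an illustration, the same tuple could be typed with a `NamedTuple`. The class name `Connexion` and its field names are assumptions; the sample values come from the first row of the connexions DataFrame shown further down, with its truncated trip_id replaced by a placeholder.

```python
from datetime import datetime
from typing import NamedTuple

class Connexion(NamedTuple):
    """One vehicle hop between two consecutive stops of the same trip (illustrative)."""
    departure_stop_id: str
    departure_time: datetime
    arrival_stop_id: str
    arrival_time: datetime
    trip_id: str

# Sample values from the first row of the connexions DataFrame;
# the trip_id is a hypothetical placeholder because the original is truncated.
first_row = Connexion("8503051", datetime(2020, 1, 1, 5, 9),
                      "8503052", datetime(2020, 1, 1, 5, 11),
                      "placeholder-trip-id")
```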


_Once we have all the connexions, we can build a 1D array of connexions sorted by ascending departure time._
_We also build a dictionary mapping every stop to the earliest arrival time found so far._



_With these two simple data structures (an array and a dictionary), we iterate once over the array, and for each reachable connexion (one the user can take given their current state), we update the arrival time at its arrival stop_id if the connexion improves it._

_Once the pass over the array is finished, the dictionary holds the earliest arrival time of every reachable stop._
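
A minimal sketch of this single pass, assuming times are integers (minutes) and ignoring trips, transfer times and footpaths, which the notebook's implementation adds on top of this (`earliest_arrivals` is a hypothetical name):

```python
import math

def earliest_arrivals(connexions, source, departure_time):
    """Single pass of the Connection Scan Algorithm (no transfers, no footpaths).

    connexions: (dep_stop, dep_time, arr_stop, arr_time, trip_id) tuples, times in minutes.
    Returns a dict stop -> earliest arrival time (math.inf if unreachable).
    """
    # Every stop starts at "infinity", except the source.
    times = {}
    for dep_stop, _, arr_stop, _, _ in connexions:
        times.setdefault(dep_stop, math.inf)
        times.setdefault(arr_stop, math.inf)
    times[source] = departure_time

    # One scan over the connexions sorted by ascending departure time.
    for dep_stop, dep_time, arr_stop, arr_time, _trip in sorted(connexions, key=lambda c: c[1]):
        # Reachable if we are at dep_stop by its departure; keep it only if it improves arr_stop.
        if times[dep_stop] <= dep_time and arr_time < times[arr_stop]:
            times[arr_stop] = arr_time
    return times
```

Leaving `A` at 8, the connexion leaving `A` at 5 is already gone, so `C` is reached at 30 through `B`.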

_Finally, we reconstruct the path taken by the user to obtain the best route._
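
A sketch of this backward walk, assuming each improved stop stores the connexion that improved it (the notebook keeps it as the second element of each `times` entry). Here `parent` is a hypothetical plain dict from a stop to a `(dep_stop, dep_time, arr_stop, arr_time, trip_id)` tuple, and `reconstruct` is an illustrative name.

```python
def reconstruct(parent, source, target):
    """Follow the stored connexions backward from target to source.

    parent maps a stop to the connexion that gave it its earliest arrival time.
    Returns the legs in travel order, or [] if target was never reached.
    """
    if target == source or target not in parent:
        return []
    legs = []
    stop = target
    while stop != source:
        connexion = parent[stop]
        legs.append(connexion)
        stop = connexion[0]  # continue from the departure stop of this leg
    return legs[::-1]  # legs were collected from the end, so reverse them
```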

In [1]:
%%configure
{"conf": {
    "spark.app.name": "group100_final"
}}

### Start Spark

In [2]:
# Initialization

Starting Spark application

SparkSession available as 'spark'.

In [3]:
username = 'mjouve'

In [4]:
from pyspark.sql.functions import udf
import pyspark.sql.functions as F
from datetime import time, datetime, timedelta
from collections import defaultdict
import numpy as np

# Loading filtered dataframes obtained previously

_Those dataframes were computed in the `to_begin_with.ipynb` notebook. We now use them for the first task of the project, i.e., building a network structure and computing the best routes._

In [5]:
stops = spark.read.orc("/user/{}/zurich_stops.orc".format(username))

In [6]:
reachable_pair_grouped = spark.read.orc("/user/{}/reachable_pair_grouped.orc".format(username))

In [7]:
stop_times = spark.read.orc("/user/{}/stop_times_filtered.orc".format(username))

In [8]:
connexions = spark.read.orc("/user/{}/connexions.orc".format(username))

# 1 - Computing connexions (skip to the end of this section to load them directly)

_Our algorithm needs access to the connexions._
_A connexion is a tuple consisting of:_
- _a departure stop_id_
- _a departure time_
- _an arrival stop_id_
- _an arrival time_
- _a trip_id_


_Once we have all the connexions, we can build a 1D array of connexions sorted by ascending departure time._

In [9]:
stop_times_1 = stop_times.select(stop_times.trip_id.alias('trip_id'),
                                 stop_times.stop_id.alias('stop_id_1'),
                                 stop_times.arrival_time.alias('arrival_time_1'),
                                 stop_times.departure_time.alias('departure_time_1'),
                                 stop_times.stop_sequence.alias('stop_sequence_1'))

stop_times_2 = stop_times.select(stop_times.trip_id.alias('trip_id'),
                                 stop_times.stop_id.alias('stop_id_2'),
                                 stop_times.arrival_time.alias('arrival_time_2'),
                                 stop_times.departure_time.alias('departure_time_2'),
                                 stop_times.stop_sequence.alias('stop_sequence_2'))

_We now self-join the stop times on `trip_id`, keeping only the pairs where `stop_sequence_1 == stop_sequence_2 - 1`, i.e., two consecutive stops of the same trip._

_We realized that a sequence can contain gaps, such as 1, 2, 4, 5. Some trips may skip a stop, but the missing stops may also have been filtered out because they lie outside the 15 km circle around Zürich station. Since we do not want to leave that area, such a trip yields two separate connexions: one between stops 1 and 2 and another between stops 4 and 5._
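
The same pairing can be sketched in plain Python on a toy list to see how gaps behave. This is an illustration, not the Spark code; `build_connexions` is a hypothetical name, and it assumes each `(trip_id, stop_sequence)` pair is unique, as the self-join also implicitly does.

```python
from itertools import groupby

def build_connexions(stop_times):
    """Pair each stop with the next one of the same trip.

    stop_times: (trip_id, stop_id, arrival_time, departure_time, stop_sequence) tuples.
    Like the self-join, only pairs with stop_sequence_2 == stop_sequence_1 + 1 are kept,
    so a gap in the sequence (e.g. 1, 2, 4, 5) splits the trip.
    """
    rows = sorted(stop_times, key=lambda r: (r[0], r[4]))
    connexions = []
    for trip_id, group in groupby(rows, key=lambda r: r[0]):
        group = list(group)
        for first, second in zip(group, group[1:]):
            if second[4] == first[4] + 1:
                # (stop_id_1, departure_time_1, stop_id_2, arrival_time_2, trip_id)
                connexions.append((first[1], first[3], second[1], second[2], trip_id))
    return connexions
```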

In [10]:
connexions = stop_times_1.join(stop_times_2, on='trip_id').where(stop_times_1.stop_sequence_1 == (stop_times_2.stop_sequence_2 - 1))\
                        .select('stop_id_1', 'departure_time_1', 'stop_id_2', 'arrival_time_2', 'trip_id')

_As indicated, we can assume that the algorithm will be used during common hours (neither too late nor too early). We therefore keep only the connexions with a departure_time between '01:00:00' and '23:00:00'._
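
The filter compares the times as plain strings. A small sketch of why this works, assuming every time is a zero-padded 'HH:MM:SS' string (`in_service_window` is a hypothetical helper mirroring the bounds used in the filter):

```python
def in_service_window(departure_time, start="00:59:00", end="23:01:00"):
    """String version of the departure_time_1 filter.

    Zero-padded 'HH:MM:SS' strings sort lexicographically in chronological order,
    so string comparison is enough as long as every value is zero-padded.
    """
    return start < departure_time < end
```

A GTFS-style time past midnight such as '24:10:00' compares greater than '23:01:00' and is dropped, as intended. A non-padded value such as '9:00:00' would also compare greater (since '9' > '2') and be dropped by mistake.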

In [11]:
connexions_filtered = connexions.filter((connexions.departure_time_1 < '23:01:00') 
                                        & (connexions.departure_time_1 > '00:59:00')) 

In [None]:
connexions_filtered.show(5)

In [12]:
# already saved
# connexions_filtered.write.format("orc").mode("overwrite").save("/user/{}/connexions.orc".format(username))

### Load Obtained DataFrame: connexions

In [15]:
connexions = spark.read.orc("/user/{}/connexions.orc".format(username))

In [16]:
connexions.show(1)

+---------+----------------+---------+--------------+--------------------+
|stop_id_1|departure_time_1|stop_id_2|arrival_time_2|             trip_id|
+---------+----------------+---------+--------------+--------------------+
|  8503051|        05:09:00|  8503052|      05:11:00|1.TA.26-10-B-j19-...|
+---------+----------------+---------+--------------+--------------------+
only showing top 1 row

# 2 - Obtaining the footpaths dictionary (skip to the end of this section to load the DataFrame directly)

In [21]:
reachable_pair = spark.read.orc("/user/{}/zurich_reachable_pair.orc".format(username))

In [17]:
@udf
def from_distance_to_duration(distance):
    """
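    Given a walking distance in meters as input, compute the duration of the walking trip in seconds,
    assuming a walking speed of 50 m/min.
    """
    distance = float(distance)
    seconds = distance / 50 * 60
    
    # the 2-minute transfer time is not added here: the algorithm adds it before boarding a connexion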
    return round(seconds)
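
As a worked check of the conversion, assuming the 50 m/min walking speed encoded by `distance / 50 * 60` (i.e. 1.2 s per meter), here is a plain-Python equivalent (`walking_seconds` is a hypothetical helper):

```python
WALKING_SPEED_M_PER_MIN = 50  # speed implied by the UDF above

def walking_seconds(distance_m):
    """Plain-Python version of from_distance_to_duration: minutes = distance / 50, then x 60."""
    return round(distance_m / WALKING_SPEED_M_PER_MIN * 60)
```

Under this formula a 500 m walk takes 600 s, and the 72-second walk in the final output corresponds to roughly 60 m.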

In [22]:
from pyspark.sql.types import ArrayType, StringType

@udf(ArrayType(StringType()))
def from_columns_to_tuple(column_1, column_2):
    """
    Takes as input two values found in two columns and returns an array containing both values (as strings)
    """
    
    return [column_1, column_2]

In [23]:
reachable_pair_time = reachable_pair.withColumn('duration_s', from_distance_to_duration(reachable_pair.distance))

In [24]:
reachable_pair_dest = reachable_pair_time.select(reachable_pair_time.id_1,
                                                from_columns_to_tuple(reachable_pair_time.id_2,  
                                                                      reachable_pair_time.duration_s).alias('destination'))

_We now group the rows by `id_1` to obtain, for each stop_id, the list of stops reachable on foot together with the walking duration in seconds._
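
What `groupBy` + `collect_list` followed by `compute_footpaths_dict` produce can be sketched on a list of tuples (`group_footpaths` is a hypothetical name). `from_distance_to_duration` is declared with a bare `@udf`, so its return type defaults to `StringType`. As a result the durations end up as strings, which is why the algorithm later converts them with `float(...)`. Note also that `collect_list` does not guarantee the order of the collected elements.

```python
from collections import defaultdict

def group_footpaths(pairs):
    """Mimic groupBy('id_1') + collect_list + compute_footpaths_dict on plain tuples.

    pairs: (id_1, id_2, duration_s) tuples. Returns {id_1: [[id_2, duration_s], ...]}.
    """
    footpaths = defaultdict(list)
    for id_1, id_2, duration_s in pairs:
        footpaths[id_1].append([id_2, duration_s])
    return dict(footpaths)
```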

In [25]:
reachable_pair_grouped = reachable_pair_dest.groupBy(['id_1']).agg(F.collect_list('destination').alias('destinations'))

In [26]:
# already saved
# reachable_pair_grouped.write.format("orc").mode("overwrite").save("/user/{}/reachable_pair_grouped.orc".format(username))

### Load Obtained DataFrame: reachable_pair_grouped

In [27]:
reachable_pair_grouped = spark.read.orc("/user/{}/reachable_pair_grouped.orc".format(username))

In [28]:
reachable_pair_grouped.show(1)

+-------+--------------------+
|   id_1|        destinations|
+-------+--------------------+
|8576223|[[8590704, 448.0]...|
+-------+--------------------+
only showing top 1 row

In [29]:
def compute_footpaths_dict(reachable_pair_df):
    """
    Given a pyspark Dataframe of reachable pairs grouped,
    returns the footpaths dictionary used by our algorithm
    """
    return dict(((row.id_1, row.destinations) for row in reachable_pair_df.collect()))

# 3 - Route planning algorithm

## A. Obtain the array of connexions

In [30]:
def to_datetime(str_time):
    """
    Given a string representing a time (format 'HH:MM:SS'), convert it to a datetime object
    """
    hour, minute, second = str_time.split(':')
    
    # convert to int and wrap out-of-range values with a modulo
    # (e.g. a GTFS time past midnight such as '24:05:00' becomes 00:05 on the same dummy day)
    hour = int(hour) % 24
    minute = int(minute) % 60
    second = int(second) % 60
    
    # the year, month and day are dummies here
    return datetime(year=2020, month=1, day=1, hour=hour, minute=minute, second=second)
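
Because of the `% 24`, a time past midnight lands at the start of the same dummy day and sorts before the morning. The departure filter above keeps only departures between 01:00 and 23:00, so this could only affect arrival times just after midnight. A swapped-in alternative, not what the notebook does, is to convert times to seconds since midnight, which keeps such times after 23:59:59 (`to_seconds` is a hypothetical helper):

```python
def to_seconds(str_time):
    """'HH:MM:SS' -> seconds since midnight; hours >= 24 are kept as-is (no wrap-around)."""
    hour, minute, second = (int(part) for part in str_time.split(':'))
    return hour * 3600 + minute * 60 + second
```

With integers, adding a walking or transfer duration also becomes a plain addition.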

In [31]:
def sort_connexions(connexions_df, departure = True):
    """
    Given a pyspark DataFrame of connexions, returns an array of sorted connexions in ascending order of departure
    if departure = True, else in descending order of arrival
    """

    connexions_array = [{'departure_location': row.stop_id_1, 
                         'departure_time': to_datetime(row.departure_time_1), 
                         'arrival_location': row.stop_id_2, 
                         'arrival_time': to_datetime(row.arrival_time_2), 
                         'trip_id': row.trip_id} for row in connexions_df.collect()]
    
    if departure:
        sorted_connexions = sorted(connexions_array, key = (lambda tup: tup['departure_time']))
    
    else:
        sorted_connexions = sorted(connexions_array, key = (lambda tup: tup['arrival_time']), reverse = True)
        
    return sorted_connexions

## B. Obtain the arrival times dictionary

_We map each `stop_id` to its earliest known arrival time and the connexion used to reach it. At initialization, every stop is mapped to a "positive infinite" date if `positive_infinite` is True, else to a "negative infinite" date, with no connexion (`None`)._
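
The "infinite" dates are finite sentinels. A short sketch of why that matters, assuming the 2-minute transfer time that the algorithm adds to the arrival time at a stop before boarding. `safe_to_offset` and `POS_INF` are hypothetical names; `POS_INF` uses the same date as `construct_times_dict`.

```python
from datetime import datetime, timedelta

TRANSFER = timedelta(seconds=120)  # the 2-minute transfer time used by the algorithm

def safe_to_offset(sentinel, delta=TRANSFER):
    """Return True if `sentinel + delta` can be computed without overflow."""
    try:
        sentinel + delta
        return True
    except OverflowError:
        return False

# Finite "positive infinity": after every time on the dummy day 2020-01-01,
# and still safe to add the transfer time to.
POS_INF = datetime(2020, 1, 6, 23, 59, 59)
```

`datetime.max` could not play that role: adding any positive `timedelta` to it raises `OverflowError`, which would happen for every stop not reached yet.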

In [32]:
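def construct_times_dict(stops_df, positive_infinite = True):
    """
    Given a pyspark DataFrame of stops, construct the times dictionary used by the algorithm:
    each stop_id is mapped to a tuple (earliest arrival time, connexion used), initialized to (sentinel date, None)
    """
    
    # All times live on the dummy day 2020-01-01, so these finite dates act as +/- infinity
    # (unlike datetime.max, they can still be shifted by the transfer time without overflowing)
    if positive_infinite:
        sentinel = datetime(year=2020, month=1, day=6, hour=23, minute=59, second=59)
    else:
        sentinel = datetime(year=2019, month=1, day=1, hour=23, minute=59, second=59)
    
    # read the stop ids from the stops_df argument
    return dict(((row.stop_id, (sentinel, None))
                 for row in stops_df.select(stops_df.stop_id).collect()))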

## C. Running the algorithm and printing the route

_We can now run the algorithm:_

In [33]:
def updates_times_dict_given_departure(times, sorted_connexions, footpaths, departure_location, departure_time):
    """
    Given an initialized times dictionary, the array of sorted connexions, the footpaths dictionary,
    a departure_location given as a stop_id (str),
    and a departure time (datetime object),
    updates the times dictionary in place
    """
    
    times[departure_location] = (departure_time, None)

    # Initialize a dictionary of trips taken. Each trip already taken is mapped
    # to the first departure location and departure time where we could have boarded it.
    # Thanks to defaultdict, looking up a trip not taken yet returns None.
    trips_taken = defaultdict(lambda: None)

    # Iterate over connexions in sorted order
    for c in sorted_connexions:
    
        # trip_id of the current connexion
        trip_id = c['trip_id']
    
        # departure location of the current connexion
        departure_location = c['departure_location']
    
        # departure time of the current connexion
        departure_time = c['departure_time']
    
        # arrival location of the current connexion
        arrival_location = c['arrival_location']
    
        # arrival time of the current connexion
        arrival_time = c['arrival_time']
    
        # If we are already on this trip (it was boarded at an earlier connexion)
        if trips_taken[trip_id]:
        
            # obtain data about this current trip (where we could have taken it and when)
            trip_data = trips_taken[trip_id]
        
            # if arrival_time is earlier than the current best time assigned for this location
            if arrival_time < times[arrival_location][0]:
            
                # update arrival time as well as connexion data for this arrival location
                times[arrival_location] = (arrival_time, {'departure_location': trip_data[0],
                                                              'departure_time': trip_data[1],
                                                              'arrival_location':arrival_location,
                                                              'arrival_time': arrival_time,
                                                              'trip_id': trip_id})
            
                # obtain the stops reachable by walking
                reachable_stops_walking = footpaths.get(arrival_location, None)
            
            
                if reachable_stops_walking:
                
                    # for each possible destination
                    for destination in reachable_stops_walking:
                    
                        # obtain the stop_id
                        location = destination[0]
                    
                        # obtain the current arrival time
                        curr_arrival_time = times[location][0]    
      
                        # obtain the walk duration from arrival_location (convert it to float)
                        walking_time = float(destination[1])
                    
                        # compute the new arrival time if using this path
                        new_arrival_time = arrival_time + timedelta(seconds = walking_time)
                    
                    
                        # if it improves the current best arrival time, we update our dictionary
                        if new_arrival_time < curr_arrival_time:
                            times[location] = (new_arrival_time, {'departure_location': arrival_location,
                                                              'departure_time': arrival_time,
                                                              'arrival_location':location,
                                                              'arrival_time': new_arrival_time,
                                                              'trip_id': 'Walking during {s} seconds'.format(s = walking_time)})
        
    
        # if we can take this connexion: we must reach its departure stop at least 2 minutes (transfer time) before it leaves
        elif (times[departure_location][0] + timedelta(seconds = 120)) <= departure_time:

            # update trips taken with this new trip
            trips_taken[trip_id] = (departure_location, departure_time)
        
            # if the arrival time is better than the current best
            if arrival_time < times[arrival_location][0]:
            
                # update the best time for the arrival location
                times[arrival_location] = (arrival_time, c)           
            
            
                # obtain the stops reachable by walking
                reachable_stops_walking = footpaths.get(arrival_location, None)
            
                if reachable_stops_walking:
                
                    # for each possible destination
                    for destination in reachable_stops_walking:
                    
                        # obtain the stop_id
                        location = destination[0]
                    
                        # obtain the current arrival time
                        curr_arrival_time = times[location][0]    
                      
                        # obtain the walk duration from arrival_location (convert it to float)
                        walking_time = float(destination[1])
                    
                        # compute the new arrival time if using this path
                        new_arrival_time = arrival_time + timedelta(seconds = walking_time)
                    
                    
                        # if it improves the current best arrival time, we update our dictionary
                        if new_arrival_time < curr_arrival_time:
                            times[location] = (new_arrival_time, {'departure_location': arrival_location,
                                                              'departure_time': arrival_time,
                                                              'arrival_location':location,
                                                              'arrival_time': new_arrival_time,
                                                              'trip_id': 'Walking during {s} seconds'.format(s = walking_time)})            
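
The boarding rule spread over the two branches above can be isolated in a small predicate. This is a sketch only: `can_take` is a hypothetical helper (the notebook inlines the logic), using the same dict layout as `sort_connexions` and the same `(time, connexion)` entries as the times dictionary.

```python
from datetime import datetime, timedelta

TRANSFER = timedelta(seconds=120)  # the 2-minute transfer time used above

def can_take(connexion, times, trips_taken, transfer=TRANSFER):
    """Decide whether a connexion is usable, mirroring the two branches above."""
    # Already on this trip: staying on board needs no transfer time.
    if trips_taken.get(connexion['trip_id']):
        return True
    # Otherwise we must be at the departure stop `transfer` before the departure.
    arrival_at_stop = times[connexion['departure_location']][0]
    return arrival_at_stop + transfer <= connexion['departure_time']

# Toy state: stop 'A' reached at 12:00, stop 'B' reached at 12:09.
toy_times = {'A': (datetime(2020, 1, 1, 12, 0), None),
             'B': (datetime(2020, 1, 1, 12, 9), None)}
leave_a = {'departure_location': 'A', 'departure_time': datetime(2020, 1, 1, 12, 2),
           'arrival_location': 'B', 'arrival_time': datetime(2020, 1, 1, 12, 9), 'trip_id': 'x'}
leave_b = {'departure_location': 'B', 'departure_time': datetime(2020, 1, 1, 12, 10),
           'arrival_location': 'C', 'arrival_time': datetime(2020, 1, 1, 12, 20), 'trip_id': 'y'}
```

`leave_a` is catchable (12:00 plus 2 minutes is 12:02), `leave_b` is not for a passenger changing vehicles at `B`, but it stays usable for someone already on trip `y`. Using `.get` works for both a plain dict and the `defaultdict` of the notebook.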

In [34]:
def backward(departure_stop, arrival_stop, times):
    """
    Returns a list of paths taken from departure_stop to arrival_stop given the times dictionary computed
    """
    
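    paths = []
    
    current_connexion = times[arrival_stop][1]
    
    # arrival_stop was never reached (or is the departure stop itself): no route to return
    if current_connexion is None:
        return paths
    
    current_stop = None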
    
    while current_stop != departure_stop:
        
        current_stop = current_connexion['departure_location']
        arrival_location = current_connexion['arrival_location']
        trip = current_connexion['trip_id']
        
        if 'Walking' in trip:
            path = trip + ' from {d} to {a}'.format(d = current_stop, a = arrival_location)
        
        else:
            path = 'From {d_l} (at {d_t}) to {a_l} (at {a_t}) using trip: {t}'.format(d_l = current_stop,
                                                                                      d_t = current_connexion['departure_time'].time(),
                                                                                      a_l = arrival_location,
                                                                                      a_t = current_connexion['arrival_time'].time(),
                                                                                      t = current_connexion['trip_id'])
        
        paths.append(path)
        
        
        current_connexion = times[current_stop][1]
        
    
    return paths[::-1] 

In [35]:
def compute_best_path_from_departure(departure_stop, departure_time, arrival_stop, times, sorted_connexions, footpaths):
    """
    Given a departure_stop (str), a departure_time (str in the format HH:MM:SS with HH: hour, MM: minute, SS: second),
    an arrival_stop (str), a freshly initialized times dictionary, the sorted connexions array, and the footpaths dictionary,
    returns the best route possible as a list of paths
    """
    # same parsing as for the connexions: the time is placed on the dummy day 2020-01-01
    departure_time_datetime = to_datetime(departure_time)
    
    updates_times_dict_given_departure(times, sorted_connexions, footpaths, departure_stop, departure_time_datetime)
    
    paths = backward(departure_stop, arrival_stop, times)
    
    return paths

# Running the algorithm

In [36]:
sorted_connexions = sort_connexions(connexions)

In [37]:
footpaths = compute_footpaths_dict(reachable_pair_grouped)

In [38]:
# must be re-initialized before each run, since the algorithm updates it in place
times = construct_times_dict(stops)

departure_stop = '8503000'
departure_time = '12:01:00'
arrival_stop = '8591049'

In [39]:
paths = compute_best_path_from_departure(departure_stop,
                                         departure_time,
                                         arrival_stop,
                                         times,
                                         sorted_connexions,
                                         footpaths)

In [40]:
for path in paths:
    print(path)

From 8503000 (at 12:05:00) to 8503006 (at 12:11:00) using trip: 32.TA.80-159-Y-j19-1.8.H
Walking during 72.0 seconds from 8503006 to 8580449
From 8580449 (at 12:15:00) to 8591049 (at 12:24:00) using trip: 1914.TA.26-11-A-j19-1.27.R

# Next step:

_Now that the basic algorithm is implemented, we want to improve it to return multiple routes. The next implementation is shown in the `route_planning_top_k.ipynb` notebook._