# Timetable data wrangling

In this PySpark notebook, we wrangle the timetable data into the format we need downstream.

We want to model our data as a graph in which each node represents a stop within a 15 km radius of Zürich HB, and each edge represents a trip from one stop to another.

We therefore convert the timetable data to this graph representation. The end goal of this PySpark kernel is to produce 3 tables:
* *Graph nodes*: the nodes of our graph (stop_id, stop_name, stop_lat, stop_lon).
* *Graph edges 1*: one possible set of edges for the graph. It includes all possible trips from one stop to another: the public transportation trips, and also the walking trips between stops. In this edge set, each walking trip appears as a single row, with no departure time or arrival time.
* *Graph edges 2*: another possible set of edges for the graph. The public transportation trips are the same as in *Graph edges 1*. The key difference is that, for every arrival at a stop by public transportation, we add all possible walking trips from that stop. The departure time of these walking trips is the arrival time at the stop.

We then write the wrangled datasets to HDFS.
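
To make the target schema concrete, here is a minimal plain-Python sketch of the three kinds of rows described above. The `Node` and `Edge` class names are illustrative assumptions, and the walking pair and its 5-minute duration are made up for the example; only the column names follow the tables built later in this notebook.

```python
from typing import NamedTuple, Optional

class Node(NamedTuple):
    id: str
    stop_name: str
    lat: float
    lon: float

class Edge(NamedTuple):
    trip_id: str
    src: str
    dst: str
    departure_time: Optional[str]  # "HH:MM:SS", or None for untimed walks
    arrival_time: Optional[str]
    duration: float                # minutes
    type: str                      # e.g. "S-Bahn", "Bus", "transfer"

# A public transportation edge, shared by both edge sets.
ride = Edge("1.TA.26-18-j19-1.1.H", "8503064", "8503065",
            "10:41:00", "10:45:00", 4.0, "S-Bahn")

# Graph edges 1: a single untimed row per walking trip (hypothetical pair).
walk_untimed = Edge("-1", "8503065", "8503074", None, None, 5.0, "transfer")

# Graph edges 2: one walking row per arrival, departing at the arrival time.
walk_timed = Edge("-1", "8503065", "8503074",
                  ride.arrival_time, "10:50:00", 5.0, "transfer")
```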

## Creating the Spark application and importing libraries

In [1]:
%%local
import os
import json
from IPython import get_ipython

username = os.environ['JUPYTERHUB_USER']
namespace = os.environ['CI_NAMESPACE']
project = os.environ['CI_PROJECT']

configuration = dict(
    name = f"{username}-{namespace}-{project}",  # interpolate all three variables read above
    executorMemory = "4G",
    executorCores = 4,
    numExecutors = 10,
    conf = {
        # "spark.pyspark.python": "/opt/anaconda3/bin/python3", # Use python3
        "spark.jars.repositories": "https://repos.spark-packages.org",
        "spark.jars.packages": "graphframes:graphframes:0.7.0-spark2.3-s_2.11"
    }
)


get_ipython().run_cell_magic('configure', line="-f", 
                             cell=json.dumps(configuration))

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
7231,application_1618324153128_6962,pyspark,idle,Link,Link,,


In [2]:
spark

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
7232,application_1618324153128_6963,pyspark,idle,Link,Link,,✔


SparkSession available as 'spark'.


<pyspark.sql.session.SparkSession object at 0x7f45ab031810>

In [3]:
sc.addPyFile('graphframes_graphframes-0.7.0-spark2.3-s_2.11.jar')

In [4]:
import pandas
import datetime
import pyspark.sql.functions as F

from graphframes import *
from math import radians, cos, sin, asin, sqrt, floor
from pyspark.sql import Window
from pyspark.sql.types import StringType

## Data loading and exploration

In [5]:
istdaten =  spark.read.orc("/data/sbb/orc/istdaten")
stop = spark.read.orc("/data/sbb/orc/geostops")
calendar = spark.read.csv("/data/sbb/csv/timetable/calendar/2019/05/07/calendar.csv", header=True)
route = spark.read.csv("/data/sbb/csv/timetable/routes/2019/05/07/routes.csv", header=True)
stop_time = spark.read.csv("/data/sbb/csv/timetable/stop_times/2019/05/07/stop_times.csv", header=True)
trip = spark.read.csv("/data/sbb/csv/timetable/trips/2019/05/07/trips.csv", header=True)

In [6]:
istdaten.show(5)

+-----------+--------------------+------------+-------------+-------------------+----------+----------+-----------+---------+-------------------+--------------+-------------+-------+--------------------+----------------+-------------------+------------------+----------------+-------------------+------------------+-------------+
|betriebstag|    fahrt_bezeichner|betreiber_id|betreiber_abk|     betreiber_name|produkt_id| linien_id|linien_text|umlauf_id|verkehrsmittel_text|zusatzfahrt_tf|faellt_aus_tf|  bpuic|   haltestellen_name|    ankunftszeit|        an_prognose|an_prognose_status|    abfahrtszeit|        ab_prognose|ab_prognose_status|durchfahrt_tf|
+-----------+--------------------+------------+-------------+-------------------+----------+----------+-----------+---------+-------------------+--------------+-------------+-------+--------------------+----------------+-------------------+------------------+----------------+-------------------+------------------+-------------+
| 02.02.20

In [7]:
stop.show(5)

+-----------+--------------------+----------------+----------------+-------------+--------------+
|    stop_id|           stop_name|        stop_lat|        stop_lon|location_type|parent_station|
+-----------+--------------------+----------------+----------------+-------------+--------------+
|    8711790|      Bouilly Mairie|48.1917218939628|3.99698199202347|         null|              |
|    8573112|              Gwüest|46.6525768281602|8.51647805228063|         null|              |
|8014471:0:3|            Albbruck|47.5923598499222|8.13177453180289|         null|      8014471P|
|    8774562|          Perrignier| 46.303453968822|6.42497549173821|         null|              |
|    8592544|Schaffhausen, Gre...|47.6988945057898|8.64765005008602|         null|              |
+-----------+--------------------+----------------+----------------+-------------+--------------+
only showing top 5 rows

In [8]:
calendar.show(5)

+----------+------+-------+---------+--------+------+--------+------+----------+--------+
|service_id|monday|tuesday|wednesday|thursday|friday|saturday|sunday|start_date|end_date|
+----------+------+-------+---------+--------+------+--------+------+----------+--------+
|  TA+b0006|     1|      1|        1|       1|     1|       0|     0|  20181209|20191214|
|  TA+b0ch2|     0|      0|        0|       0|     0|       1|     1|  20181209|20191214|
|  TA+b0014|     0|      0|        0|       0|     0|       0|     1|  20181209|20191214|
|  TA+b000w|     0|      0|        0|       0|     0|       1|     0|  20181209|20191214|
|  TA+b001b|     1|      1|        1|       1|     1|       1|     0|  20181209|20191214|
+----------+------+-------+---------+--------+------+--------+------+----------+--------+
only showing top 5 rows

In [9]:
route.show(5)

+-----------+---------+----------------+---------------+----------+----------+
|   route_id|agency_id|route_short_name|route_long_name|route_desc|route_type|
+-----------+---------+----------------+---------------+----------+----------+
|11-40-j19-1|      801|             040|           null|       Bus|       700|
|11-61-j19-1|     7031|             061|           null|       Bus|       700|
|11-62-j19-1|     7031|             062|           null|       Bus|       700|
|24-64-j19-1|      801|             064|           null|       Bus|       700|
|11-83-j19-1|      801|             083|           null|       Bus|       700|
+-----------+---------+----------------+---------------+----------+----------+
only showing top 5 rows

In [10]:
stop_time.show(5)

+-------------------+------------+--------------+-------+-------------+-----------+-------------+
|            trip_id|arrival_time|departure_time|stop_id|stop_sequence|pickup_type|drop_off_type|
+-------------------+------------+--------------+-------+-------------+-----------+-------------+
|1.TA.1-84-j19-1.1.H|    06:13:00|      06:13:00|8572249|            1|          0|            0|
|1.TA.1-84-j19-1.1.H|    06:14:00|      06:14:00|8577508|            2|          0|            0|
|1.TA.1-84-j19-1.1.H|    06:15:00|      06:15:00|8581070|            3|          0|            0|
|1.TA.1-84-j19-1.1.H|    06:16:00|      06:16:00|8578360|            4|          0|            0|
|1.TA.1-84-j19-1.1.H|    06:17:00|      06:17:00|8583448|            5|          0|            0|
+-------------------+------------+--------------+-------+-------------+-----------+-------------+
only showing top 5 rows

In [11]:
trip.show(5)

+-----------+----------+--------------------+---------------+------------+
|   route_id|service_id|             trip_id|trip_short_name|direction_id|
+-----------+----------+--------------------+---------------+------------+
| 1-85-j19-1|  TA+b0001| 2.TA.1-85-j19-1.1.H|          85003|           0|
|1-1-C-j19-1|  TA+b0001|5.TA.1-1-C-j19-1.3.R|            108|           1|
|1-1-C-j19-1|  TA+b0001|7.TA.1-1-C-j19-1.3.R|            112|           1|
|1-1-C-j19-1|  TA+b0001|9.TA.1-1-C-j19-1.3.R|            116|           1|
|1-1-C-j19-1|  TA+b0001|11.TA.1-1-C-j19-1...|            120|           1|
+-----------+----------+--------------------+---------------+------------+
only showing top 5 rows

In [12]:
print("istdaten has " + str(istdaten.count())+ " rows")
print("stops has " + str(stop.count()) + " rows")
print("calendars has " + str(calendar.count()) +" rows")
print("routes has "+ str(route.count()) + " rows")
print("stop_times has " + str(stop_time.count()) + " rows")
print("trips has " + str(trip.count()) +" rows")

istdaten has 1415309386 rows
stops has 39027 rows
calendars has 21813 rows
routes has 4990 rows
stop_times has 10862563 rows
trips has 998456 rows

## Filtering

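We want to keep only stops within a 15 km radius of Zürich HB.

We also want to keep only trips that run during business hours (nominally 8-20; note that the filter below uses 07:00 as its lower bound) and that are scheduled on every business day, Monday to Friday.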
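
The two non-spatial filters can be sketched in plain Python as follows. This is only an illustration of the logic, not the Spark code used in this notebook: `in_business_hours` and `runs_every_weekday` are hypothetical helper names, the default bounds mirror the `07:00:00` and `20:00:00` literals of the filter cell, and the two sample services are copied from the `calendar.show(5)` output above.

```python
WEEKDAYS = ("monday", "tuesday", "wednesday", "thursday", "friday")

def in_business_hours(arrival_time, departure_time,
                      start="07:00:00", end="20:00:00"):
    # Zero-padded "HH:MM:SS" strings sort lexicographically in time order,
    # so plain string comparison is enough here.
    return arrival_time <= end and departure_time >= start

def runs_every_weekday(service):
    # The calendar is read from CSV, so the day flags are strings.
    return all(service[day] == "1" for day in WEEKDAYS)

weekday_service = {"service_id": "TA+b0006", "monday": "1", "tuesday": "1",
                   "wednesday": "1", "thursday": "1", "friday": "1",
                   "saturday": "0", "sunday": "0"}
weekend_service = {"service_id": "TA+b0ch2", "monday": "0", "tuesday": "0",
                   "wednesday": "0", "thursday": "0", "friday": "0",
                   "saturday": "1", "sunday": "1"}

print(in_business_hours("10:41:00", "10:41:00"))  # a mid-morning stop time
print(runs_every_weekday(weekday_service), runs_every_weekday(weekend_service))
```

A service that runs Monday to Saturday also passes this check, which matches the calendar filter used below: it only requires the five weekday flags to be set.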

In [13]:
def haversine_distance(lon1, lat1, lon2, lat2):
    """
    Calculate the great circle distance between two points 
    on the earth (specified in decimal degrees)
    """
    # convert decimal degrees to radians 
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])

    # haversine formula 
    dlon = lon2 - lon1 
    dlat = lat2 - lat1 
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    c = 2 * asin(sqrt(a)) 
    r = 6371 # Radius of earth in kilometers. Use 3956 for miles
    return c * r

lat_zurich = 47.378177
lon_zurich = 8.540192

@F.udf("boolean")  # declare the return type; a bare @F.udf returns strings
def in_zurich(lat, lon):
    return haversine_distance(lon, lat, lon_zurich, lat_zurich) < 15
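
As a quick sanity check of the radius filter, the same formula can be run in plain Python on two stops that appear in this notebook's outputs. The snippet restates the haversine formula so that it is self-contained; `haversine_km` and `within_radius` are hypothetical names, and the coordinates are copied from the `stop.show(5)` and graph node outputs.

```python
from math import radians, cos, sin, asin, sqrt

ZURICH_HB = (47.378177, 8.540192)  # (lat, lon), as defined in the cell above
RADIUS_KM = 15
EARTH_RADIUS_KM = 6371

def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in km between two points given in decimal degrees."""
    lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])
    a = (sin((lat2 - lat1) / 2) ** 2
         + cos(lat1) * cos(lat2) * sin((lon2 - lon1) / 2) ** 2)
    return 2 * EARTH_RADIUS_KM * asin(sqrt(a))

def within_radius(lat, lon):
    return haversine_km(lat, lon, *ZURICH_HB) < RADIUS_KM

bouilly_mairie = (48.1917218939628, 3.99698199202347)       # in France
oberhasli_industrie = (47.4592670391304, 8.49001368400678)  # north of Zürich

print(within_radius(*bouilly_mairie), within_radius(*oberhasli_industrie))
```

Bouilly Mairie lies several hundred kilometers away and is rejected, while Oberhasli, Industrie is roughly 10 km from Zürich HB and is kept.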

In [14]:
stop_business_time = stop_time.where((stop_time.arrival_time <= "20:00:00") & (stop_time.departure_time >= "07:00:00"))
print("DataFrame representing filtered stop times has " + str(stop_business_time.count()) + " rows")

stop_zurich_unwrangled = stop.filter(in_zurich(stop.stop_lat, stop.stop_lon) == True)
stop_zurich = stop_zurich_unwrangled.withColumn('parent_station_2', F.substring(stop_zurich_unwrangled.stop_id, 0, 7))
stop_zurich = stop_zurich.withColumn('stop_id_2', F.when(F.length(F.col("stop_id")) > 7, stop_zurich.parent_station_2).otherwise(stop_zurich.stop_id))
stop_zurich = stop_zurich.drop("stop_id").withColumnRenamed("stop_id_2", "stop_id")

print("DataFrame representing filtered stop has " + str(stop_zurich.count()) + " rows")

stop_time_zurich = stop_business_time.join(stop_zurich_unwrangled, on="stop_id")
stop_time_zurich_cleaned = stop_time_zurich.withColumn('parent_station_2', F.substring(stop_time_zurich.stop_id, 0, 7))
stop_time_zurich_cleaned = stop_time_zurich_cleaned.withColumn('stop_id_2', F.when(F.length(F.col("stop_id")) > 7, stop_time_zurich_cleaned.parent_station_2).otherwise(stop_time_zurich_cleaned.stop_id))
stop_time_zurich_cleaned = stop_time_zurich_cleaned.drop("stop_id").withColumnRenamed("stop_id_2", "stop_id")
print("DataFrame representing stop time in zurich in normal hours has " + str(stop_time_zurich_cleaned.count()) + " rows")

trip_in_zurich = trip.join(stop_time_zurich_cleaned, on="trip_id")
print("DataFrame representing trip in zurich in normal hours has " + str(trip_in_zurich.count()) + " rows")

calendar_business_day = calendar.filter((calendar.monday == 1) &
                            (calendar.tuesday == 1) &
                            (calendar.wednesday == 1) &
                            (calendar.thursday == 1) &
                            (calendar.friday == 1))

joined_data = trip_in_zurich.join(calendar_business_day, on="service_id")
print("DataFrame representing stop in zurich during normal hours on week days has " + str(joined_data.count()) + "rows ")

joined_data = joined_data.join(route, on="route_id")

joined_data_filter = joined_data.select(["trip_id", "stop_id", "stop_name", "arrival_time", "departure_time", "stop_sequence", "parent_station", "direction_id", "route_desc"])
joined_data_filter.show(5)

DataFrame representing filtered stop times has 8224334 rows
DataFrame representing filtered stop has 1950 rows
DataFrame representing stop time in zurich in normal hours has 1473757 rows
DataFrame representing trip in zurich in normal hours has 1473757 rows
DataFrame representing stop in zurich during normal hours on week days has 292697rows 
+--------------------+-------+----------+------------+--------------+-------------+--------------+------------+----------+
|             trip_id|stop_id| stop_name|arrival_time|departure_time|stop_sequence|parent_station|direction_id|route_desc|
+--------------------+-------+----------+------------+--------------+-------------+--------------+------------+----------+
|1.TA.26-18-j19-1.1.H|8503064|  Scheuren|    10:41:00|      10:41:00|            1|              |           0|    S-Bahn|
|1.TA.26-18-j19-1.1.H|8503065|     Forch|    10:45:00|      10:45:00|            2|      8503065P|           0|    S-Bahn|
|1.TA.26-18-j19-1.1.H|8503074|Neue Forch

## Switching from stops to journey

We currently have a data format where each row represents a stop of a trip.

We want to switch to a data format where each row represents a journey of a public transport vehicle from one stop to the next.

These rows will be the edges of our graph.

We make this switch using PySpark window functions.
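
The idea behind the window can be sketched in plain Python: group the stop rows by trip, order them by stop sequence, and pair each row with the one before it (a lag of 1). This is an illustrative equivalent, not the Spark implementation; `stops_to_journeys` is a hypothetical name and the sample rows are taken from the filtered output above.

```python
from itertools import groupby
from operator import itemgetter

def stops_to_journeys(stop_rows):
    """Pair each stop with the previous stop of the same trip."""
    # Sort numerically on stop_sequence: as strings, "10" would sort before "2".
    ordered = sorted(stop_rows,
                     key=lambda r: (r["trip_id"], int(r["stop_sequence"])))
    journeys = []
    for trip_id, rows in groupby(ordered, key=itemgetter("trip_id")):
        previous = None
        for row in rows:
            if previous is not None:  # the first stop of a trip has no origin
                journeys.append({
                    "trip_id": trip_id,
                    "stop_id_from": previous["stop_id"],
                    "stop_id_to": row["stop_id"],
                    "departure_time": previous["departure_time"],
                    "arrival_time": row["arrival_time"],
                })
            previous = row
    return journeys

stops = [
    {"trip_id": "1.TA.26-18-j19-1.1.H", "stop_id": "8503065", "stop_sequence": "2",
     "arrival_time": "10:45:00", "departure_time": "10:45:00"},
    {"trip_id": "1.TA.26-18-j19-1.1.H", "stop_id": "8503064", "stop_sequence": "1",
     "arrival_time": "10:41:00", "departure_time": "10:41:00"},
    {"trip_id": "1.TA.26-18-j19-1.1.H", "stop_id": "8503074", "stop_sequence": "3",
     "arrival_time": "10:46:00", "departure_time": "10:46:00"},
]
journeys = stops_to_journeys(stops)
```

Unlike the window version, this sketch drops the first stop of each trip immediately, whereas the notebook keeps it as a row with null origin columns and filters it out in a later cell.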

In [15]:
joined_data_filter = joined_data_filter.withColumn("stop_sequence", joined_data_filter.stop_sequence.cast('int'))
trip_window = Window.partitionBy('trip_id').orderBy('stop_sequence')
stop_id_from = F.lag('stop_id', 1).over(trip_window).alias('stop_id_from')
departure_time = F.lag('departure_time', 1).over(trip_window).alias('departure_time')
stop_name_from = F.lag('stop_name', 1).over(trip_window).alias('stop_name_from')

In [16]:
journey_df = joined_data_filter.select("trip_id", stop_id_from, "stop_id", stop_name_from, "stop_name", departure_time, "arrival_time", "route_desc")
journey_df = journey_df.withColumnRenamed("stop_id", "stop_id_to").withColumnRenamed("stop_name", "stop_name_to")
journey_df.show(5)

+--------------------+------------+----------+--------------+------------+--------------+------------+----------+
|             trip_id|stop_id_from|stop_id_to|stop_name_from|stop_name_to|departure_time|arrival_time|route_desc|
+--------------------+------------+----------+--------------+------------+--------------+------------+----------+
|1.TA.26-18-j19-1.1.H|        null|   8503064|          null|    Scheuren|          null|    10:41:00|    S-Bahn|
|1.TA.26-18-j19-1.1.H|     8503064|   8503065|      Scheuren|       Forch|      10:41:00|    10:45:00|    S-Bahn|
|1.TA.26-18-j19-1.1.H|     8503065|   8503074|         Forch|  Neue Forch|      10:45:00|    10:46:00|    S-Bahn|
|1.TA.26-18-j19-1.1.H|     8503074|   8503068|    Neue Forch|    Maiacher|      10:46:00|    10:47:00|    S-Bahn|
|1.TA.26-18-j19-1.1.H|     8503068|   8503066|      Maiacher|     Zumikon|      10:47:00|    10:48:00|    S-Bahn|
+--------------------+------------+----------+--------------+------------+--------------

In [17]:
@F.udf
def compute_duration(departure_time, arrival_time):
    if (departure_time is None):
        return 0
    dep_time = datetime.datetime.strptime(departure_time, "%H:%M:%S")
    arr_time = datetime.datetime.strptime(arrival_time, "%H:%M:%S")
    return (arr_time - dep_time).seconds / 60

journey_df = journey_df.withColumn("duration", compute_duration(journey_df.departure_time, journey_df.arrival_time))
journey_df = journey_df.filter(journey_df.departure_time.isNotNull())

print("We have " + str(journey_df.count())+ " possible journeys through public transportations")
journey_df.show(5)

We have 268778 possible journeys through public transportations
+--------------------+------------+----------+--------------+------------+--------------+------------+----------+--------+
|             trip_id|stop_id_from|stop_id_to|stop_name_from|stop_name_to|departure_time|arrival_time|route_desc|duration|
+--------------------+------------+----------+--------------+------------+--------------+------------+----------+--------+
|1.TA.26-18-j19-1.1.H|     8503064|   8503065|      Scheuren|       Forch|      10:41:00|    10:45:00|    S-Bahn|       4|
|1.TA.26-18-j19-1.1.H|     8503065|   8503074|         Forch|  Neue Forch|      10:45:00|    10:46:00|    S-Bahn|       1|
|1.TA.26-18-j19-1.1.H|     8503074|   8503068|    Neue Forch|    Maiacher|      10:46:00|    10:47:00|    S-Bahn|       1|
|1.TA.26-18-j19-1.1.H|     8503068|   8503066|      Maiacher|     Zumikon|      10:47:00|    10:48:00|    S-Bahn|       1|
|1.TA.26-18-j19-1.1.H|     8503066|   8503075|       Zumikon|    Waltikon| 

## Walking distance

Every user can walk up to 500 meters from one stop to another.

A transfer takes 2 min, plus 1 min for every 50 meters walked.

Since we need these transfers to build our graph correctly, we derive them from the stops: we cross join the stops with themselves, compute the distance for each pair, and keep only the pairs that are less than 500 meters apart.
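
The transfer rule above amounts to a linear function of the walking distance: with the distance expressed in kilometers, 1 min per 50 m is 20 min per km. A minimal sketch, assuming hypothetical constant and function names:

```python
MAX_WALK_KM = 0.5       # walking limit of 500 m
BASE_MINUTES = 2.0      # fixed part of every transfer
MINUTES_PER_KM = 20.0   # 1 minute per 50 m

def transfer_minutes(distance_km):
    """Transfer duration in minutes, or None when the stops are too far apart."""
    if distance_km >= MAX_WALK_KM:
        return None
    return BASE_MINUTES + MINUTES_PER_KM * distance_km

for d in (0.0, 0.05, 0.25, 0.5):
    print(d, transfer_minutes(d))
```

A distance of 0 corresponds to a transfer within the same stop, which therefore costs the base 2 min.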

In [18]:
print("We have " +  str(stop_zurich.count())+ " stops")

We have 1950 stops

In [19]:
stop_zurich_renamed = stop_zurich.select(stop_zurich.stop_id.alias("stop_id_2"), stop_zurich.stop_name.alias("stop_name_2"), stop_zurich.stop_lat.alias("stop_lat_2"), stop_zurich.stop_lon.alias("stop_lon_2"))
cross_join = stop_zurich.crossJoin(stop_zurich_renamed)
cross_join = cross_join.withColumn("stop_lat", cross_join.stop_lat.cast('float'))\
                       .withColumn("stop_lat_2", cross_join.stop_lat_2.cast('float'))\
                       .withColumn("stop_lon", cross_join.stop_lon.cast('float'))\
                       .withColumn("stop_lon_2", cross_join.stop_lon_2.cast('float'))

cross_join.show(5)

+--------------------+---------+--------+-------------+--------------+----------------+-------+---------+--------------------+----------+----------+
|           stop_name| stop_lat|stop_lon|location_type|parent_station|parent_station_2|stop_id|stop_id_2|         stop_name_2|stop_lat_2|stop_lon_2|
+--------------------+---------+--------+-------------+--------------+----------------+-------+---------+--------------------+----------+----------+
|Oberhasli, Industrie|47.459267|8.490014|         null|              |         8557033|8557033|  8557033|Oberhasli, Industrie| 47.459267|  8.490014|
|Oberhasli, Industrie|47.459267|8.490014|         null|              |         8557033|8557033|  8573711|   Zürich, Sädlenweg| 47.367756|   8.48748|
|Oberhasli, Industrie|47.459267|8.490014|         null|              |         8557033|8557033|  8591828|    Ebmatingen, Dorf|  47.35139|  8.641003|
|Oberhasli, Industrie|47.459267|8.490014|         null|              |         8557033|8557033|  8590610|F

In [20]:
@F.udf
def haversine_distance_udf(lon1, lat1, lon2, lat2):
    """
    Calculate the great circle distance between two points 
    on the earth (specified in decimal degrees)
    """
    # convert decimal degrees to radians 
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])

    # haversine formula 
    dlon = lon2 - lon1 
    dlat = lat2 - lat1 
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    c = 2 * asin(sqrt(a)) 
    r = 6371 # Radius of earth in kilometers. Use 3956 for miles
    return c * r

@F.udf
def transfer_time(distance):
    return (2 + distance * 20)

@F.udf 
def transfer_time_minute(transfer_time):
    return floor(transfer_time / 60)

@F.udf 
def transfer_time_second(transfer_time):
    return floor(transfer_time % 60)


cross_join = cross_join.withColumn("distance", haversine_distance_udf(cross_join.stop_lon, cross_join.stop_lat, cross_join.stop_lon_2, cross_join.stop_lat_2))
cross_join = cross_join.filter(cross_join.distance < 0.5)
cross_join = cross_join.withColumn("transfer_time", transfer_time(cross_join.distance))
cross_join.show(10)

+--------------------+---------+--------+-------------+--------------+----------------+-------+---------+--------------------+----------+----------+-------------------+------------------+
|           stop_name| stop_lat|stop_lon|location_type|parent_station|parent_station_2|stop_id|stop_id_2|         stop_name_2|stop_lat_2|stop_lon_2|           distance|     transfer_time|
+--------------------+---------+--------+-------------+--------------+----------------+-------+---------+--------------------+----------+----------+-------------------+------------------+
|Oberhasli, Industrie|47.459267|8.490014|         null|              |         8557033|8557033|  8557033|Oberhasli, Industrie| 47.459267|  8.490014|                0.0|               2.0|
|   Zürich, Sädlenweg|47.367756| 8.48748|         null|              |         8573711|8573711|  8573711|   Zürich, Sädlenweg| 47.367756|   8.48748|                0.0|               2.0|
|   Zürich, Sädlenweg|47.367756| 8.48748|         null|     

In [21]:
print("we have " + str(cross_join.count()) + " possible transfer by walking distance")

we have 16288 possible transfer by walking distance

In [22]:
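# Select the transfer columns and convert the transfer time to seconds, to be added to arrival times later.
# Parsing with the "mm" pattern keeps only the whole minutes (e.g. 9.07 min becomes 540 s).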
spark.conf.set("spark.sql.session.timeZone", "GMT")

stop_transfers = cross_join.select(cross_join.stop_id,cross_join.stop_name,cross_join.stop_id_2,cross_join.stop_name_2,cross_join.transfer_time)\
                            .withColumn("time_transfer_unix",F.unix_timestamp(F.col("transfer_time"),"mm"))

stop_transfers.show(5)

+-------+--------------------+---------+--------------------+------------------+------------------+
|stop_id|           stop_name|stop_id_2|         stop_name_2|     transfer_time|time_transfer_unix|
+-------+--------------------+---------+--------------------+------------------+------------------+
|8557033|Oberhasli, Industrie|  8557033|Oberhasli, Industrie|               2.0|               120|
|8573711|   Zürich, Sädlenweg|  8573711|   Zürich, Sädlenweg|               2.0|               120|
|8573711|   Zürich, Sädlenweg|  8591214|   Zürich, In der Ey| 9.068924210168964|               540|
|8573711|   Zürich, Sädlenweg|  8591163|Zürich, Goldackerweg|11.220965922946945|               660|
|8591828|    Ebmatingen, Dorf|  8591828|    Ebmatingen, Dorf|               2.0|               120|
+-------+--------------------+---------+--------------------+------------------+------------------+
only showing top 5 rows

## Creating our exhaustive graph edges

To build *Graph edges 2*, which carries a departure time for every walking trip, we left outer join the arrival stops of our public transportation DataFrame with our stop transfers.
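
The join can be sketched in plain Python: for each arrival at a stop, emit one walking edge per reachable stop, departing at the arrival time and arriving after the transfer duration. This is an illustration under stated assumptions, not the Spark code: `timed_walks`, `to_seconds` and `to_hms` are hypothetical helpers, the transfer duration is truncated to whole minutes as in the `time_transfer_unix` column above, and the sample values are copied from the outputs of this notebook.

```python
from math import floor

def to_seconds(hms):
    hours, minutes, seconds = (int(part) for part in hms.split(":"))
    return hours * 3600 + minutes * 60 + seconds

def to_hms(total_seconds):
    return "%02d:%02d:%02d" % (total_seconds // 3600,
                               (total_seconds % 3600) // 60,
                               total_seconds % 60)

def timed_walks(arrivals, transfers):
    """arrivals: [(stop_id, arrival_time)]; transfers: {stop_id: [(dst, minutes)]}."""
    edges = []
    for stop_id, arrival_time in arrivals:
        for dst, minutes in transfers.get(stop_id, []):
            walk_seconds = floor(minutes) * 60  # whole minutes only
            edges.append((stop_id, dst, arrival_time,
                          to_hms(to_seconds(arrival_time) + walk_seconds),
                          minutes))
    return edges

transfers = {"8502508": [("8590270", 11.482654229404913),
                         ("8502508", 2.0),
                         ("8590268", 6.46300812901671)]}
edges = timed_walks([("8502508", "08:00:00")], transfers)
```

Because every stop is within walking distance of itself, each arrival yields at least one row, so the left outer join used in the notebook behaves like the inner lookup in this sketch.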

In [23]:
# Selecting all stops with arrival times for left outer join 
# spark.conf.set("spark.sql.session.timeZone", "UTC")

stop_times = journey_df.select(["stop_id_to", "stop_name_to", "arrival_time"])\
                       .withColumn("departure_time_unix",F.unix_timestamp(F.col("arrival_time"),"H:mm:ss") )\
                       .withColumnRenamed("arrival_time","departure_time")

stop_times.show(5)

+----------+------------+--------------+-------------------+
|stop_id_to|stop_name_to|departure_time|departure_time_unix|
+----------+------------+--------------+-------------------+
|   8503065|       Forch|      10:45:00|              38700|
|   8503074|  Neue Forch|      10:46:00|              38760|
|   8503068|    Maiacher|      10:47:00|              38820|
|   8503066|     Zumikon|      10:48:00|              38880|
|   8503075|    Waltikon|      10:50:00|              39000|
+----------+------------+--------------+-------------------+
only showing top 5 rows

In [25]:
# Inferring arrival time based on travel time 
left_join = stop_times.join(stop_transfers, stop_times.stop_id_to == stop_transfers.stop_id, how='left')
stop_transfer_joined = left_join.withColumn("arrival_time",
                                            F.from_unixtime(F.col("departure_time_unix")+F.col("time_transfer_unix")\
                                                                            ,"HH:mm:ss"))

stop_transfer_joined = stop_transfer_joined.select(["stop_id", "stop_name", "departure_time", "stop_id_2", "stop_name_2", "arrival_time", "transfer_time"])\
                                           .withColumnRenamed("transfer_time","duration")

stop_transfer_joined.show(5)

+-------+--------------------+--------------+---------+--------------------+------------+------------------+
|stop_id|           stop_name|departure_time|stop_id_2|         stop_name_2|arrival_time|          duration|
+-------+--------------------+--------------+---------+--------------------+------------+------------------+
|8502508|Spreitenbach, Rai...|      08:00:00|  8590270| Spreitenbach, Brüel|    08:11:00|11.482654229404913|
|8502508|Spreitenbach, Rai...|      08:00:00|  8502508|Spreitenbach, Rai...|    08:02:00|               2.0|
|8502508|Spreitenbach, Rai...|      08:00:00|  8590268|   Spreitenbach, ASP|    08:06:00|  6.46300812901671|
|8502508|Spreitenbach, Rai...|      16:29:00|  8590270| Spreitenbach, Brüel|    16:40:00|11.482654229404913|
|8502508|Spreitenbach, Rai...|      16:29:00|  8502508|Spreitenbach, Rai...|    16:31:00|               2.0|
+-------+--------------------+--------------+---------+--------------------+------------+------------------+
only showing top 5 

In [26]:
print("we have " + str(stop_transfer_joined.count()) + " possible transfer by walking distance")

we have 5424038 possible transfer by walking distance

## Graph DataFrames creation

Now that all of our DataFrames are ready, we need to convert them to the correct data format for our graph.

To get our 2 final DataFrames representing edges, we combine the public transportation DataFrame with either the walking distance DataFrame or the `stop_transfer_joined` DataFrame created previously.
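
One detail matters when combining these DataFrames: Spark's `DataFrame.union` matches columns by position, not by name, so both inputs must list their columns in the same order. The plain-Python sketch below illustrates the alignment step; `EDGE_COLUMNS` and `align` are hypothetical names, and the rows are taken from the outputs of this notebook.

```python
EDGE_COLUMNS = ["trip_id", "src", "dst", "departure_time",
                "arrival_time", "duration", "type"]

def align(row, columns=EDGE_COLUMNS):
    """Return the row's values in the shared column order."""
    return tuple(row[column] for column in columns)

journey = {"trip_id": "1.TA.26-18-j19-1.1.H", "src": "8503064", "dst": "8503065",
           "departure_time": "10:41:00", "arrival_time": "10:45:00",
           "duration": 4.0, "type": "S-Bahn"}

# Built with a different key order, as happens when columns are added one by one.
transfer = {"trip_id": "-1", "src": "8557033", "dst": "8557033",
            "duration": 2.0, "departure_time": None, "arrival_time": None,
            "type": "transfer"}

edges = [align(journey), align(transfer)]
```

After alignment, the `duration` value of the transfer row sits in the same position as the `duration` of the journey row, instead of landing under `departure_time`.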

In [27]:
graph_nodes = stop_zurich.select(["stop_id", "stop_name", "stop_lat", "stop_lon"])\
                   .withColumnRenamed("stop_id", "id")\
                   .withColumnRenamed("stop_lat", "lat")\
                   .withColumnRenamed("stop_lon", "lon")
graph_nodes.show(5)


+-------+--------------------+----------------+----------------+
|     id|           stop_name|             lat|             lon|
+-------+--------------------+----------------+----------------+
|8557033|Oberhasli, Industrie|47.4592670391304|8.49001368400678|
|8573711|   Zürich, Sädlenweg|47.3677549978138|8.48748043490521|
|8591828|    Ebmatingen, Dorf|47.3513920447801|8.64100251698261|
|8590610|Fällanden, Schütz...| 47.368625031339|8.63247750493513|
|8580617|    Bülach, Engelwis|47.5111891951511|8.53713032066541|
+-------+--------------------+----------------+----------------+
only showing top 5 rows

In [28]:
graph_journey_edges = journey_df.select(["trip_id", "stop_id_from", "stop_id_to", "departure_time", "arrival_time", "duration", "route_desc"])\
                        .withColumnRenamed("stop_id_from", "src")\
                        .withColumnRenamed("stop_id_to", "dst")\
                        .withColumnRenamed("route_desc", "type")

graph_journey_edges.show(5)

+--------------------+-------+-------+--------------+------------+--------+------+
|             trip_id|    src|    dst|departure_time|arrival_time|duration|  type|
+--------------------+-------+-------+--------------+------------+--------+------+
|1.TA.26-18-j19-1.1.H|8503064|8503065|      10:41:00|    10:45:00|       4|S-Bahn|
|1.TA.26-18-j19-1.1.H|8503065|8503074|      10:45:00|    10:46:00|       1|S-Bahn|
|1.TA.26-18-j19-1.1.H|8503074|8503068|      10:46:00|    10:47:00|       1|S-Bahn|
|1.TA.26-18-j19-1.1.H|8503068|8503066|      10:47:00|    10:48:00|       1|S-Bahn|
|1.TA.26-18-j19-1.1.H|8503066|8503075|      10:48:00|    10:50:00|       2|S-Bahn|
+--------------------+-------+-------+--------------+------------+--------+------+
only showing top 5 rows

In [49]:
graph_transfer_edges = cross_join.withColumn("trip_id", F.lit(-1))\
                                 .select(["trip_id", "stop_id", "stop_id_2", "transfer_time"])\
                                 .withColumnRenamed("stop_id", "src")\
                                 .withColumnRenamed("stop_id_2", "dst")\
                                 .withColumnRenamed("transfer_time", "duration")\
                                 .withColumn("departure_time", F.lit(None).cast(StringType()))\
                                 .withColumn("arrival_time", F.lit(None).cast(StringType()))\
                                 .withColumn("type", F.lit("transfer"))

graph_transfer_edges.show(5)

+-------+-------+-------+------------------+--------------+------------+--------+
|trip_id|    src|    dst|          duration|departure_time|arrival_time|    type|
+-------+-------+-------+------------------+--------------+------------+--------+
|     -1|8557033|8557033|               2.0|          null|        null|transfer|
|     -1|8573711|8573711|               2.0|          null|        null|transfer|
|     -1|8573711|8591214| 9.068924210168964|          null|        null|transfer|
|     -1|8573711|8591163|11.220965922946945|          null|        null|transfer|
|     -1|8591828|8591828|               2.0|          null|        null|transfer|
+-------+-------+-------+------------------+--------------+------------+--------+
only showing top 5 rows

In [50]:
print("graph_journey_edges has " + str(graph_journey_edges.count()) + " rows")
print("graph_transfer_edges has " + str(graph_transfer_edges.count()) + " rows")

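# union() matches columns by position, not by name, so reorder the transfer columns to the journey layout first
graph_edges_1 = graph_journey_edges.union(graph_transfer_edges.select(graph_journey_edges.columns))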

print("graph_edges has " + str(graph_edges_1.count()) + " rows")

graph_edges_1.show(5)

graph_journey_edges has 268778 rows
graph_transfer_edges has 16288 rows
graph_edges has 285066 rows
+--------------------+-------+-------+--------------+------------+--------+------+
|             trip_id|    src|    dst|departure_time|arrival_time|duration|  type|
+--------------------+-------+-------+--------------+------------+--------+------+
|1.TA.26-18-j19-1.1.H|8503064|8503065|      10:41:00|    10:45:00|       4|S-Bahn|
|1.TA.26-18-j19-1.1.H|8503065|8503074|      10:45:00|    10:46:00|       1|S-Bahn|
|1.TA.26-18-j19-1.1.H|8503074|8503068|      10:46:00|    10:47:00|       1|S-Bahn|
|1.TA.26-18-j19-1.1.H|8503068|8503066|      10:47:00|    10:48:00|       1|S-Bahn|
|1.TA.26-18-j19-1.1.H|8503066|8503075|      10:48:00|    10:50:00|       2|S-Bahn|
+--------------------+-------+-------+--------------+------------+--------+------+
only showing top 5 rows
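
Spark's `DataFrame.union` pairs columns by position rather than by name, so it is worth confirming that both inputs list their columns in the same order before combining them. The sketch below is a plain Python illustration, not part of the pipeline: the two lists are copied from the headers displayed above, and `misaligned_columns` is a hypothetical helper introduced only for this check.

```python
# Column orders copied from the show() outputs of the two edge DataFrames above.
journey_cols = ["trip_id", "src", "dst", "departure_time", "arrival_time", "duration", "type"]
transfer_cols = ["trip_id", "src", "dst", "duration", "departure_time", "arrival_time", "type"]


def misaligned_columns(left, right):
    """Return (position, left_name, right_name) for every position where the names differ."""
    return [
        (position, left_name, right_name)
        for position, (left_name, right_name) in enumerate(zip(left, right))
        if left_name != right_name
    ]


mismatches = misaligned_columns(journey_cols, transfer_cols)
for position, left_name, right_name in mismatches:
    print(f"position {position}: {left_name} would be paired with {right_name}")
```

In the notebook, the same check could be run on `graph_journey_edges.columns` and `graph_transfer_edges.columns`. Selecting the transfer columns in the journey order before the union lines them up.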

In [46]:
graph_transfer_joined_edges = stop_transfer_joined.withColumn("trip_id", F.lit(-1))\
                                                  .select(["trip_id", "stop_id", "stop_id_2","departure_time", "arrival_time", "duration"])\
                                                  .withColumnRenamed("stop_id", "src")\
                                                  .withColumnRenamed("stop_id_2", "dst")\
                                                  .withColumn("type", F.lit("transfer"))

graph_transfer_joined_edges.show(5)

+-------+-------+-------+--------------+------------+------------------+--------+
|trip_id|    src|    dst|departure_time|arrival_time|          duration|    type|
+-------+-------+-------+--------------+------------+------------------+--------+
|     -1|8502508|8590270|      13:00:00|    13:11:00|11.482654229404913|transfer|
|     -1|8502508|8502508|      13:00:00|    13:02:00|               2.0|transfer|
|     -1|8502508|8590268|      13:00:00|    13:06:00|  6.46300812901671|transfer|
|     -1|8502508|8590270|      16:44:00|    16:55:00|11.482654229404913|transfer|
|     -1|8502508|8502508|      16:44:00|    16:46:00|               2.0|transfer|
+-------+-------+-------+--------------+------------+------------------+--------+
only showing top 5 rows
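
In these rows, each walking transfer's `arrival_time` is its `departure_time` shifted by the walking `duration` in minutes. The sketch below reproduces that relationship in plain Python. It is an assumption-laden illustration: `add_minutes` is a hypothetical helper, and truncating the duration to whole minutes is inferred from the sample rows above, not taken from the code that built `stop_transfer_joined`.

```python
def add_minutes(departure_time, minutes):
    """Shift an 'HH:MM:SS' string by a duration in minutes, truncated to whole minutes."""
    hours, mins, secs = (int(part) for part in departure_time.split(":"))
    # int() drops the fractional part of the walking duration (assumed from the sample rows).
    total_minutes = hours * 60 + mins + int(minutes)
    # Hours are deliberately not wrapped at 24, following the GTFS convention
    # that times after midnight keep counting (e.g. 25:10:00).
    return f"{total_minutes // 60:02d}:{total_minutes % 60:02d}:{secs:02d}"


# Durations taken from the sample rows above.
print(add_minutes("13:00:00", 11.482654229404913))
print(add_minutes("13:00:00", 6.46300812901671))
print(add_minutes("16:44:00", 2.0))
```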

In [48]:
print("graph_journey_edges has " + str(graph_journey_edges.count()) + " rows")
print("graph_transfer_joined_edges has " + str(graph_transfer_joined_edges.count()) + " rows")

graph_edges_2 = graph_journey_edges.union(graph_transfer_joined_edges)

print("graph_edges has " + str(graph_edges_2.count()) + " rows")

graph_edges_2.show(5)

graph_journey_edges has 268778 rows
graph_transfer_joined_edges has 5424038 rows
graph_edges has 5692816 rows
+--------------------+-------+-------+--------------+------------+--------+------+
|             trip_id|    src|    dst|departure_time|arrival_time|duration|  type|
+--------------------+-------+-------+--------------+------------+--------+------+
|1.TA.26-18-j19-1.1.H|8503064|8503065|      10:41:00|    10:45:00|       4|S-Bahn|
|1.TA.26-18-j19-1.1.H|8503065|8503074|      10:45:00|    10:46:00|       1|S-Bahn|
|1.TA.26-18-j19-1.1.H|8503074|8503068|      10:46:00|    10:47:00|       1|S-Bahn|
|1.TA.26-18-j19-1.1.H|8503068|8503066|      10:47:00|    10:48:00|       1|S-Bahn|
|1.TA.26-18-j19-1.1.H|8503066|8503075|      10:48:00|    10:50:00|       2|S-Bahn|
+--------------------+-------+-------+--------------+------------+--------+------+
only showing top 5 rows

## Exporting DataFrames to HDFS

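Because we want to process the graph with networkx, we need to build it locally in a Python kernel.

To make that possible, we write the graph DataFrames to HDFS as Parquet files.

The write cells below only need to run once: with Spark's default save mode, writing to a path that already exists fails, so they should be commented out once the files have been written.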

In [51]:
graph_nodes.write.parquet('/user/vyuan/final_6/graph_nodes.parquet')

In [52]:
graph_edges_1.write.parquet('/user/vyuan/final_6/graph_edges_1.parquet')

In [53]:
graph_edges_2.write.parquet('/user/vyuan/final_6/graph_edges_2.parquet')

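
Once the Parquet files have been read back in the local kernel, each edge row can be turned into a graph edge. The sketch below is only an illustration in plain Python: the rows are copied from the sample outputs above, `build_adjacency` is a hypothetical helper, and the adjacency dictionary stands in for the networkx graph, whose construction is not shown in this notebook.

```python
from collections import defaultdict

# Sample rows copied from the outputs above, in the column order
# (trip_id, src, dst, departure_time, arrival_time, duration, type).
edge_rows = [
    ("1.TA.26-18-j19-1.1.H", "8503064", "8503065", "10:41:00", "10:45:00", 4, "S-Bahn"),
    ("1.TA.26-18-j19-1.1.H", "8503065", "8503074", "10:45:00", "10:46:00", 1, "S-Bahn"),
    ("-1", "8502508", "8590268", "13:00:00", "13:06:00", 6.46300812901671, "transfer"),
]


def build_adjacency(rows):
    """Group edges by source stop; a list per stop allows parallel edges to the same target."""
    adjacency = defaultdict(list)
    for trip_id, src, dst, departure_time, arrival_time, duration, edge_type in rows:
        adjacency[src].append({
            "dst": dst,
            "trip_id": trip_id,
            "departure_time": departure_time,
            "arrival_time": arrival_time,
            "duration": duration,
            "type": edge_type,
        })
    return dict(adjacency)


adjacency = build_adjacency(edge_rows)
print(sorted(adjacency))
```

Because the same pair of stops is connected many times a day, a list of edges per source stop is used here. In networkx, `MultiDiGraph` is the class that allows such parallel directed edges.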