# Overview:
In this notebook, we filter and preprocess timetable data to later build a connection graph. We do the following steps:
1. Filter stops consider only 15km radius around  Zürich's train station.
2. Filter trips, routes etc that are not related to filtered stops.
3. Convert string ids to int ids to reduce space (dictionary encoding).
4. Saved obtained files on HDFS.

These steps allowed us to get small enough data (~100MB) to fit into operational memory, so now we can build connection graph without Spark.

In [1]:
%%local
%matplotlib inline
import matplotlib.pylab as plt

import plotly
import plotly.graph_objs as go
import plotly.express as px

import pyproj

import os
username = os.environ['JUPYTERHUB_USER']

plt.rcParams['figure.figsize'] = (20,15)
plt.rcParams['font.size'] = 18
plt.style.use('fivethirtyeight')

In [2]:
%%local
get_ipython().run_cell_magic('configure', line="-f", cell='{ "name":"%s-final", "executorMemory":"4G", "executorCores":4, "numExecutors":10, "driverMemory": "4G" }' % username)

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
6358,application_1618324153128_5874,pyspark,idle,Link,Link,,
6462,application_1618324153128_6023,pyspark,idle,Link,Link,,
6533,application_1618324153128_6110,pyspark,busy,Link,Link,,
6574,application_1618324153128_6156,pyspark,busy,Link,Link,,
6575,application_1618324153128_6161,pyspark,idle,Link,Link,,
6656,application_1618324153128_6258,pyspark,idle,Link,Link,,
6659,application_1618324153128_6261,pyspark,dead,Link,Link,,
6671,application_1618324153128_6273,pyspark,idle,Link,Link,,
6681,application_1618324153128_6283,pyspark,busy,Link,Link,,
6682,application_1618324153128_6284,pyspark,idle,Link,Link,,


In [3]:
import pyspark.sql.functions as F
from pyspark import SparkConf, SparkContext

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
6740,application_1618324153128_6345,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Loading Spark

In [4]:
%%send_to_spark -i username -t str -n username

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Successfully passed 'username' as 'username' to Spark kernel

## Data Preparation

In [5]:
# Read stop data
sbb_stops = spark.read.orc('/data/sbb/orc/geostops')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
sbb_stops.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- stop_id: string (nullable = true)
 |-- stop_name: string (nullable = true)
 |-- stop_lat: double (nullable = true)
 |-- stop_lon: double (nullable = true)
 |-- location_type: integer (nullable = true)
 |-- parent_station: string (nullable = true)

In [7]:
%%local
def distance_km(lat1, lon1, lat2, lon2):
    geod = pyproj.Geod(ellps='WGS84')
    _, _, distance = geod.inv(lon1, lat1, lon2, lat2)
    return distance

In [8]:
%%local
distance_km(47.3781762039461, 8.54019357578468, 48.3781762039461, 8.54019357578468)

111187.94975797477

In [9]:
def distance_between_coordinates_km(lat1, lon1, lat2, lon2):
    # Returns distance in km from Zurich HB
    from math import sin, cos, sqrt, atan2, radians

    # approximate radius of earth in km
    R = 6373.0

    lat1 = radians(float(lat1))
    lon1 = radians(float(lon1))
    lat2 = radians(float(lat2))
    lon2 = radians(float(lon2))

    dlon = lon2 - lon1
    dlat = lat2 - lat1

    a = sin(dlat / 2)**2 + cos(lat1) * cos(lat2) * sin(dlon / 2)**2
    c = 2 * atan2(sqrt(a), sqrt(1 - a))

    distance = R * c

    return distance

distance_between_coordinates_km = F.udf(distance_between_coordinates_km)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
def distance_between_coordinates_degrees(lat1, lon1, lat2, lon2):
    from math import sin, cos, sqrt, atan2, radians

    lat1 = float(lat1)
    lon1 = float(lon1)
    lat2 = float(lat2)
    lon2 = float(lon2)

    dlon = lon2 - lon1
    dlat = lat2 - lat1
    
    return dlon**2 + dlat**2

distance_between_coordinates_degrees = F.udf(distance_between_coordinates_degrees)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
# Filter stops based on distance from Zurich.
filtered_stops = sbb_stops \
    .withColumn("distance",distance_between_coordinates_km(F.lit(47.3781762039461), F.lit(8.54019357578468), sbb_stops.stop_lat,sbb_stops.stop_lon)) \
    .filter(F.col('distance')<=15.5)

filt_stops = set([i['stop_id'] for i in filtered_stops.select(F.col('stop_id')).collect()])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
filtered_stops = sbb_stops \
stops = filtered_stops.toPandas()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
len(stops)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2016

### GTFS Data

In [1]:
# Read data about stops of each trip.
sbb_stop_times = (spark.read      
  .option("header", "true")
  .option("inferSchema", "true")
  .csv('/data/sbb/csv/timetable/stop_times/2019/05/07/stop_times.csv')
)
sbb_stop_times.show(5)

Starting Spark application


KeyboardInterrupt: 

In [15]:
sbb_stop_times.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- trip_id: string (nullable = true)
 |-- arrival_time: string (nullable = true)
 |-- departure_time: string (nullable = true)
 |-- stop_id: string (nullable = true)
 |-- stop_sequence: integer (nullable = true)
 |-- pickup_type: integer (nullable = true)
 |-- drop_off_type: integer (nullable = true)

In [16]:
# Filter stops that are not close enough to Zurich.
filtered_stop_times = sbb_stop_times\
    .filter(F.col("stop_id").isin(filt_stops))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
filtered_stop_times.select("trip_id").count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2184777

In [18]:
stop_times = filtered_stop_times.select("trip_id").distinct().toPandas()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
len(stop_times)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

160325

In [20]:
# Define relevant trips - trips that include stops that are close enough to Zurich.
valid_trips = set(stop_times["trip_id"].values)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
print(len(valid_trips))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

160325

In [22]:
# Read trip information for HDFS.
sbb_trips = (spark.read      
  .option("header", "true")
  .option("inferSchema", "true")
  .csv('/data/sbb/csv/timetable/trips/2019/05/07/trips.csv')
)
sbb_trips.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+----------+--------------------+---------------+------------+
|   route_id|service_id|             trip_id|trip_short_name|direction_id|
+-----------+----------+--------------------+---------------+------------+
| 1-85-j19-1|  TA+b0001| 2.TA.1-85-j19-1.1.H|          85003|           0|
|1-1-C-j19-1|  TA+b0001|5.TA.1-1-C-j19-1.3.R|            108|           1|
|1-1-C-j19-1|  TA+b0001|7.TA.1-1-C-j19-1.3.R|            112|           1|
|1-1-C-j19-1|  TA+b0001|9.TA.1-1-C-j19-1.3.R|            116|           1|
|1-1-C-j19-1|  TA+b0001|11.TA.1-1-C-j19-1...|            120|           1|
+-----------+----------+--------------------+---------------+------------+
only showing top 5 rows

In [23]:
print(sbb_trips.count())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

998456

In [24]:
# Filter relevant trips.
filtered_sbb_trips = sbb_trips\
    .join(filtered_stop_times, "trip_id", "inner")

filtered_sbb_trips = filtered_sbb_trips.select(F.col("route_id"), F.col("service_id"), F.col("trip_id"), F.col("direction_id")).distinct()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [25]:
filtered_sbb_trips.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

160325

In [26]:
trips = filtered_sbb_trips.toPandas()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [27]:
# Read schedule information: it says days when service is working.
sbb_calendar = (spark.read      
  .option("header", "true")
  .option("inferSchema", "true")
  .csv('/data/sbb/csv/timetable/calendar/2019/05/07/calendar.csv')
)
sbb_calendar.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+------+-------+---------+--------+------+--------+------+----------+--------+
|service_id|monday|tuesday|wednesday|thursday|friday|saturday|sunday|start_date|end_date|
+----------+------+-------+---------+--------+------+--------+------+----------+--------+
|  TA+b0006|     1|      1|        1|       1|     1|       0|     0|  20181209|20191214|
|  TA+b0ch2|     0|      0|        0|       0|     0|       1|     1|  20181209|20191214|
|  TA+b0014|     0|      0|        0|       0|     0|       0|     1|  20181209|20191214|
|  TA+b000w|     0|      0|        0|       0|     0|       1|     0|  20181209|20191214|
|  TA+b001b|     1|      1|        1|       1|     1|       1|     0|  20181209|20191214|
+----------+------+-------+---------+--------+------+--------+------+----------+--------+
only showing top 5 rows

In [29]:
# Filter schedule to remove irrelevant services that operates outside of Zurich area.
filtered_sbb_calendar = sbb_calendar\
    .join(filtered_sbb_trips, "service_id", "inner")

filtered_sbb_calendar = filtered_sbb_calendar.select(
*[F.col(x) for x in sbb_calendar.columns]).distinct()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [30]:
filtered_sbb_calendar.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

5620

In [31]:
calendar = filtered_sbb_calendar.toPandas()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [32]:
# Read route information.
sbb_routes = (spark.read      
  .option("header", "true")
  .option("inferSchema", "true")
  .csv('/data/sbb/csv/timetable/routes/2019/05/07/routes.csv')
)
sbb_routes.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+---------+----------------+---------------+----------+----------+
|   route_id|agency_id|route_short_name|route_long_name|route_desc|route_type|
+-----------+---------+----------------+---------------+----------+----------+
|11-40-j19-1|      801|             040|           null|       Bus|       700|
|11-61-j19-1|     7031|             061|           null|       Bus|       700|
|11-62-j19-1|     7031|             062|           null|       Bus|       700|
|24-64-j19-1|      801|             064|           null|       Bus|       700|
|11-83-j19-1|      801|             083|           null|       Bus|       700|
+-----------+---------+----------------+---------------+----------+----------+
only showing top 5 rows

In [33]:
# Filter relevant routes (again, ones that operate inside of Zurich area)
filtered_sbb_routes = sbb_routes\
    .join(filtered_sbb_trips, "route_id", "inner")

filtered_sbb_routes = filtered_sbb_routes.select(
*[F.col(x) for x in sbb_routes.columns]).distinct()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [34]:
filtered_sbb_routes.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

513

In [35]:
routes = filtered_sbb_routes.toPandas()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [36]:
def get_parent_station(x):
    """Returns proper station id.
    
    For some big stations (like Zurich HB), stop_id includes platform.
    This function allows to get one stop id for all platforms of the same station.
    """
    return int(x.replace('P', '').split(':')[0])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Now we use dictionary encoding: we transform big string ids into small int ids to reduce memory consumption. Also we transform string time into hour and minute numbers.

In [37]:
trip_map = {
    v: i for i, v in enumerate(sorted(list(valid_trips)))
}

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [38]:
stop_map = {
    v: i for i, v in enumerate(sorted(list(set(stops["stop_id"].map(get_parent_station).values))))
}

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [39]:
service_map = {
    v: i for i, v in enumerate(sorted(list(set(calendar["service_id"].values))))
}

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [40]:
route_map = {
    v: i for i, v in enumerate(sorted(list(set(routes["route_id"].values))))
}

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [41]:
def get_new_columns(x):
    """Creates new columns for hours and minutes."""
    result = []
    for name in x.__fields__:
        if name.endswith("time"):
            subname = name.split("_")[0]
            result.extend(["{subname}_hour".format(subname=subname), "{subname}_minute".format(subname=subname)])
        else:
            result.append(name)
    return result

def optimize(x, stop_map, trip_map, service_map, route_map):
    """Does dictionary encoding of different ids and time for 1 RDD row."""
    result = []
    for name in x.__fields__:
        value = x[name]
        if name == "trip_id":
            result.append(trip_map[value])
        elif name == "stop_id":
            result.append(stop_map[get_parent_station(value)])
        elif name == "service_id":
            result.append(service_map[value])
        elif name == "route_id":
            result.append(route_map[value])
        elif name.endswith("time"):
            subname = name.split("_")[0]
            result.extend([int(value[:2]), int(value[3:5])])
        else:
            result.append(value)
    return result

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [42]:
def optimize_df(df, schema=None):
    """Does dictionary encoding of different ids and time for spark DataFrame."""
    if schema is None:
        schema = get_new_columns(df.take(1)[0])
    return df.rdd\
        .map(lambda x: optimize(x, stop_map, trip_map, service_map, route_map))\
        .toDF(schema)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Now we run optimization function on all obtained datasets.

In [43]:
stops_opt = optimize_df(filtered_stops)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [44]:
stops_opt = stops_opt.select('stop_id', 'stop_name', 'stop_lat', 'stop_lon', 'distance').groupby(['stop_id', 'stop_name'])\
    .agg({'stop_lat':'avg', 'stop_lon':'avg', 'distance':'avg'})
    

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [45]:
stops_opt = stops_opt\
    .withColumn("stop_lon", F.col('avg(stop_lon)'))\
    .withColumn("stop_lat", F.col('avg(stop_lat)'))\
    .withColumn("distance", F.col('avg(distance)'))\
    .select('stop_id', 'stop_name', 'stop_lat', 'stop_lon', 'distance')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [46]:
stops_opt.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

1660

In [47]:
stop_times_opt = optimize_df(filtered_stop_times)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [48]:
trips_opt = optimize_df(filtered_sbb_trips)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [49]:
calendar_opt = optimize_df(filtered_sbb_calendar)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [50]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType
schema = StructType([
    StructField("route_id", LongType(), True),
    StructField("agency_id", StringType(), True),
    StructField("route_short_name", StringType(), True),
    StructField("route_long_name", StringType(), True),
    StructField("route_desc", StringType(), True),
    StructField("route-type", IntegerType(), True)
])
routes_opt = optimize_df(filtered_sbb_routes, schema)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [51]:
# Save optimized datasets on HDFS.

stops_opt.write.mode("overwrite").parquet("/user/{}/stops.parquet".format(username))
stop_times_opt.write.mode("overwrite").parquet("/user/{}/stop_times.parquet".format(username))
trips_opt.write.mode("overwrite").parquet("/user/{}/trips.parquet".format(username))
calendar_opt.write.mode("overwrite").parquet("/user/{}/calendar.parquet".format(username))
routes_opt.write.mode("overwrite").parquet("/user/{}/routes.parquet".format(username))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [68]:
import pandas as pd

def save_map(d, name):
    pdf = pd.DataFrame([(v, i) for v, i in d.items()], columns=[name + '_id', 'new_id'])
    df = spark.createDataFrame(pdf)
    df.write.mode("overwrite").parquet("/user/{}/{}_map.parquet".format(username, name))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [69]:
# Save mapping from string ids to int ids on HDFS.

save_map(stop_map, "stop")
save_map(trip_map, "trip")
save_map(service_map, "service")
save_map(route_map, "route")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…