# Data preparation in PySpark

In this notebook, we explore and clean the data so that the resulting dataframes are easier to handle in the later tasks. We also compute the statistics of the delays, which are used later to compute the probability of each route.

# Imports and Configuration

In [None]:
%%local
import os
import json


# Identity of the Spark application: the user name plus the CI namespace and
# project, both read from the environment (the variables must be set).
username = 'olam'
namespace = os.environ['CI_NAMESPACE']
project = os.environ['CI_PROJECT']

# Extra Spark settings: where to find and which version of graphframes to load
spark_packages = {
    "spark.jars.repositories": "https://repos.spark-packages.org",
    "spark.jars.packages": "graphframes:graphframes:0.7.0-spark2.3-s_2.11"
}

# Session settings handed to the `configure` cell magic as JSON
configuration = {
    "name": f"{username}-{namespace}-{project}",
    "executorMemory": "4G",
    "executorCores": 4,
    "numExecutors": 10,
    "conf": spark_packages,
}

# The application name is "<username>-<namespace>-<project>"
get_ipython().run_cell_magic('configure', line="-f", cell=json.dumps(configuration))

In [None]:
%%send_to_spark -i username -t str -n username

In [None]:
print('We are using Spark %s' % spark.version)

In [None]:
sc.addPyFile('graphframes_graphframes-0.7.0-spark2.3-s_2.11.jar')

In [None]:
username

# Load Data and Data Exploration

In [None]:
import pyspark.sql.functions as F

from math import radians, cos, sin, asin, sqrt
from pyspark.sql.functions import col, length, lit, min, date_format, udf
from pyspark.sql.types import *

import datetime

import numpy as np

from graphframes import * 

## Actual data

- `BPUIC` is the stop_id (from https://opentransportdata.swiss/de/cookbook/ist-daten/)

In [None]:
# Loading the ORC data into a Spark DataFrame
df = spark.read.orc("/data/sbb/orc/istdaten/")

In [None]:
df.printSchema()

In [None]:
df.show(n=2, vertical=True)

## Timetable data

### Stop Times

In [None]:
df_stop_times = spark.read.csv("/data/sbb/csv/timetable/stop_times/2019/05/07/stop_times.csv", header=True)

In [None]:
df_stop_times.printSchema()

In [None]:
df_stop_times.show(n=5)

### Trips

In [None]:
df_trips = spark.read.csv("/data/sbb/csv/timetable/trips/2019/05/07/trips.csv", header=True)

In [None]:
df_trips.printSchema()

In [None]:
df_trips.show(n=5)

### Calendar

In [None]:
df_calendar = spark.read.csv("/data/sbb/csv/timetable/calendar/2019/05/07/calendar.csv", header=True)

In [None]:
df_calendar.printSchema()

In [None]:
df_calendar.show(n=5)

In [None]:
# Sum the (integer-cast) service flags of Monday and Tuesday, one table per day
for day in ("monday", "tuesday"):
    day_as_int = df_calendar.select(col(day).cast(IntegerType()).alias(day))
    day_as_int.select(F.sum(day)).show()

From these sums, we cannot conclude that the timetable is independent of the day of the week.

In [None]:
# Write to HDFS the processed data
df_calendar.write.parquet('/user/{0}/calendar.parquet'.format(username))

### Routes

In [None]:
df_routes = spark.read.csv("/data/sbb/csv/timetable/routes/2019/05/07/routes.csv", header=True)

In [None]:
df_routes.printSchema()

In [None]:
df_routes.show(n=5)

In [None]:
df_routes.select('route_id', 'route_desc').show()

In [None]:
# Count the number of distinct transport types (distinct route_desc values)
df_routes.select('route_desc').distinct().count()

In [None]:
# Check all the possible value for route_desc
df_routes.select('route_desc').distinct().show(n=23)

## Stations data

- Multiple `stop_id` values exist for the same stop (there is always a parent, whose `stop_id` ends with `P`) => keep only the first 7 characters and drop duplicates

In [None]:
df_stations = spark.read.orc('/data/sbb/orc/geostops')

In [None]:
df_stations.printSchema()

In [None]:
df_stations.show(n=5)

In [None]:
df_stations.where(length(col('stop_id')) > 7).show()

In [None]:
# Check Albbruck station
df_stations.filter(col('stop_name') == 'Albbruck').show()

In [None]:
# Check Gaggiolo station
df_stations.filter(col('stop_name') == 'Gaggiolo').show()

# Data Cleaning

In [None]:
# Global constants

# Coordinates of Zurich HB as (latitude, longitude), in decimal degrees
COORD_ZURICH_HB = (47.378177, 8.540192)

## Helper functions

In [None]:
@F.udf(returnType=DoubleType())
def compute_distance(lat1, lon1, lat2, lon2):
    """
    Compute the great-circle distance between two coordinates on earth.

    The haversine formula is used with an earth radius of 6371 km.

    Args:
        lat1, lon1: latitude and longitude of the first point, in decimal degrees.
        lat2, lon2: latitude and longitude of the second point, in decimal degrees.

    Returns:
        The distance in kilometers as a float (a DoubleType column), or None
        when any of the four coordinates is missing.
    """

    coords = (lat1, lon1, lat2, lon2)

    # A null coordinate yields a null distance instead of failing the Spark task
    if any(value is None for value in coords):
        return None

    # convert decimal degrees to radians (float() also accepts numeric strings)
    lat1, lon1, lat2, lon2 = (radians(float(value)) for value in coords)

    # haversine formula
    dlon = lon2 - lon1
    dlat = lat2 - lat1

    a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
    c = 2 * asin(sqrt(a))
    r = 6371  # Radius of earth in kilometers

    return c * r

In [None]:
@F.udf
def rename_stop_id(name):
    """
    Keep only the first 7 characters of the stop_id.

    Args:
        name: the stop_id as a string, or None.

    Returns:
        The first 7 characters of `name` (the whole value when it is shorter),
        or None when `name` is None.
    """
    # A null stop_id stays null instead of raising a TypeError on slicing
    if name is None:
        return None

    return name[:7]

In [None]:
@F.udf(returnType=DoubleType())
def distance2walkTime(dist):
    """
    Compute the walking time given the distance.
    We assume that the walking speed is 50m/min

    Args:
        dist: distance in kilometers, as a number or a numeric string.

    Returns:
        The walking time in seconds as a float (a DoubleType column), or None
        when `dist` is None.
    """

    if dist is None:
        return None

    # Transform distance in km to m. float() matters: with a string input,
    # `1000 * dist` would repeat the string 1000 times instead of scaling it.
    dist_m = 1000 * float(dist)

    # Walking speed in m/s
    walk_speed = 50.0 / 60.0

    # Walk time in seconds
    return dist_m / walk_speed

In [None]:
@F.udf
def get_type_transport(type_value):
    """
    Get the general transport type, especially for trains.

    Args:
        type_value: the route description as a string, or None.

    Returns:
        'zug' when `type_value` is one of the listed train categories,
        otherwise `type_value` in lower case; None when `type_value` is None.
    """

    # A null description stays null instead of raising on .lower()
    if type_value is None:
        return None

    train_categories = ('TGV', 'Eurocity', 'Standseilbahn', 'Regionalzug', 'RegioExpress', 'S-Bahn',
                        'ICE', 'Nacht-Zug', 'Eurostar', 'Schnellzug', 'Intercity', 'InterRegio', 'Extrazug')

    if type_value in train_categories:
        return 'zug'

    return type_value.lower()

## Actual data

- Filter data when the date is not in the format of `__.__.____`
- Filter when `PRODUKT_ID` is NULL or empty
- Filter `DURCHFAHRT_TF`, `ZUSATZFAHRT_TF`, `FAELLT_AUS_TF`
- Convert string to timestamp for time data

In [None]:
# As in HW 2: keep only the rows whose betriebstag looks like `__.__.____`,
# whose produkt_id is neither NULL nor empty, and whose durchfahrt_tf,
# zusatzfahrt_tf and faellt_aus_tf flags are all 'false'
df = df.filter((col('betriebstag').like('__.__.____')) &
               (col('produkt_id').isNotNull()) &
               (col('produkt_id') != '') &
               (col('durchfahrt_tf') == 'false') &
               (col('zusatzfahrt_tf') == 'false') &
               (col('faellt_aus_tf') == 'false'))

# The three flags are constant after the filter, so they carry no information
df = df.drop('durchfahrt_tf', 'zusatzfahrt_tf', 'faellt_aus_tf')

In [None]:
# Replace the four time columns (stored as strings) by timestamps.
# NOTE(review): every column is parsed with the same pattern, which has no
# seconds component - confirm that this matches the raw values of all four.
time_pattern = 'dd.MM.yyyy HH:mm'
for time_column in ('ankunftszeit', 'abfahrtszeit', 'an_prognose', 'ab_prognose'):
    df = df.withColumn(time_column, F.to_timestamp(time_column, time_pattern))

In [None]:
# Rename bpuic to stop_id, the column name used by the timetable and stations data
df = df.withColumnRenamed('bpuic', 'stop_id')

In [None]:
# Normalise the spelling of the bus product: "BUS" becomes "Bus",
# every other produkt_id value is kept as it is
produkt = F.col("produkt_id")
produkt_fixed = F.when(produkt == "BUS", "Bus").otherwise(produkt)
df = df.withColumn("produkt_id", produkt_fixed)

## Timetable data

### Stop Times

- Convert string to timestamp for time data

In [None]:
# Parse the arrival and departure times (strings) into timestamps.
# NOTE(review): confirm how values that do not fit 'HH:mm:ss' (e.g. hours of
# 24 or more, if the timetable contains any) should be handled.
for time_column in ('arrival_time', 'departure_time'):
    df_stop_times = df_stop_times.withColumn(time_column, F.to_timestamp(time_column, 'HH:mm:ss'))

# These two columns are not used afterwards
df_stop_times = df_stop_times.drop('pickup_type', 'drop_off_type')

In [None]:
# Write to HDFS the processed data
df_stop_times.write.parquet('/user/{0}/stop_times.parquet'.format(username))

## Stations data

- Filter out stations outside a 15 km radius of Zurich HB
- Keep one `stop_id` for each station
- Filter the actual data, keeping only the rows for stations present in the stations data

In [None]:
df_stations.printSchema()

In [None]:
# Keep the stations within a 15 km radius of Zurich HB
zurich_lat, zurich_lon = COORD_ZURICH_HB
distance_to_hb = compute_distance(col('stop_lat'), col('stop_lon'), lit(zurich_lat), lit(zurich_lon))
df_stations = df_stations.filter(distance_to_hb < 15)

In [None]:
# Shorten every stop_id with rename_stop_id, keep a single row per shortened
# id, then remove the two columns that are no longer needed
df_stations = df_stations.withColumn('stop_id', rename_stop_id('stop_id'))
df_stations = df_stations.dropDuplicates(['stop_id'])
df_stations = df_stations.drop('location_type', 'parent_station')

In [None]:
df_stations.show(n=5)

In [None]:
# Write to HDFS the processed data
df_stations.write.parquet('/user/{0}/stations_data.parquet'.format(username))

In [None]:
# Keep actual data for stations inside 15km rayon of Zurich HB
df = df.join(df_stations, on='stop_id', how='inner')

In [None]:
df.count()

In [None]:
# Write to HDFS the processed data
df.write.parquet('/user/{0}/actual_data.parquet'.format(username))

## Routes

In [None]:
# Reduce the routes to (route_id, type), where type is the general transport
# type derived from route_desc
transport_type = get_type_transport('route_desc').alias('type')
df_routes = df_routes.select('route_id', transport_type)
df_routes.show(n=5)

## Trips + Type of transport

In [None]:
# Attach the route of each trip, then keep only the trip_id and its transport type
trips_with_routes = df_trips.join(df_routes, on='route_id')
df_trips_type = trips_with_routes.select('trip_id', 'type')

In [None]:
# Write to HDFS the processed data
df_trips_type.write.parquet('/user/{0}/trips_type.parquet'.format(username))

## Stop Times + Trips Data

In [None]:
# Build a dataframe linking each stop of a trip (stop_id, times, sequence)
# to the service_id of that trip
stop_times_subset = df_stop_times.select('trip_id', 'arrival_time', 'departure_time',
                                         'stop_id', 'stop_sequence')
stop_times_trips = df_trips.join(stop_times_subset, on='trip_id', how='inner')\
                           .select('service_id', 'trip_id', 'arrival_time', 'departure_time',
                                   'stop_id', 'stop_sequence')

In [None]:
stop_times_trips.show(5)

In [None]:
# Write to HDFS the processed data
stop_times_trips.write.parquet('/user/{0}/stop_times_and_trips.parquet'.format(username))

# Network of stops

In [None]:
# Global variable: maximum walking distance between two stops, in km (0.5 km = 500 m)
MAX_DIST_WALK = 0.5

## Network by walk

We first compute the rows of a static network connecting all the stations that can be reached from one another on foot.

In [None]:
# Build the walking edges: one row per ordered pair of distinct stops that are
# less than 500m apart, with the walking distance and time between them.

# Second copy of the stations, with the id and coordinate columns suffixed by "2"
df_stations_other = df_stations
for column in ('stop_id', 'stop_lat', 'stop_lon'):
    df_stations_other = df_stations_other.withColumnRenamed(column, column + '2')

# Every pair of stations, together with the distance between them
station_pairs = df_stations.crossJoin(df_stations_other)
station_pairs = station_pairs.withColumn('distance',
                                         compute_distance(col('stop_lat'), col('stop_lon'),
                                                          col('stop_lat2'), col('stop_lon2')))

# Keep the pairs of different stops that are within walking distance
is_other_stop = col('stop_id') != col('stop_id2')
is_walkable = col('distance') < MAX_DIST_WALK
df_edges_walk = station_pairs.filter(is_other_stop & is_walkable)\
                             .withColumn('travel_time', distance2walkTime(col('distance')))\
                             .select('stop_id', 'stop_id2', 'distance', 'travel_time')

In [None]:
df_edges_walk.printSchema()
df_edges_walk.show(5)

In [None]:
# Write to HDFS the processed data
df_edges_walk.write.parquet('/user/{0}/edges_walk.parquet'.format(username))

## Network by public transportations

We then compute the rows of a static network connecting all the stations that can be reached from one another by public transport.

In [None]:
df_stop_times.show()

In [None]:
# Keep the stop times of the stations within a 15 km radius of Zurich HB.
# NOTE(review): the stop_id values of df_stations were shortened to 7
# characters - confirm that the stop_times ids use the same form, otherwise
# the non-matching rows are dropped by this inner join.
zurich_stop_ids = df_stations.select("stop_id").distinct()
df_stop_times_zurich = df_stop_times.join(zurich_stop_ids, on='stop_id', how='inner')
df_stop_times_zurich.show(5)

In [None]:
# Assuming that the time between two stops is always the same for the same type of transport

# Copy of the Zurich stop times with its columns suffixed by "2"
df_next_stops = df_stop_times_zurich
for column in ('stop_id', 'trip_id', 'arrival_time', 'departure_time', 'stop_sequence'):
    df_next_stops = df_next_stops.withColumnRenamed(column, column + '2')

# Pair every stop with the stop that follows it (stop_sequence + 1) in the same trip
is_next_stop = col('stop_sequence2') == col('stop_sequence') + 1
is_same_trip = col('trip_id') == col('trip_id2')
consecutive_stops = df_stop_times_zurich.crossJoin(df_next_stops)\
                                        .filter(is_next_stop & is_same_trip)

# Keep one row per (stop_id, stop_id2); the travel time is the arrival at the
# second stop minus the departure from the first one, on long-cast timestamps
travel_time = col('arrival_time2').cast("long") - col('departure_time').cast("long")
df_edges_public_transportations = consecutive_stops.dropDuplicates(['stop_id', 'stop_id2'])\
                                                   .withColumn('travel_time', travel_time)\
                                                   .select('stop_id', 'stop_id2', 'travel_time')
df_edges_public_transportations.show(10)

In [None]:
# Write to HDFS the processed data
df_edges_public_transportations.write.parquet('/user/{0}/edges_transport.parquet'.format(username))

# Create statistics Dataframes

In [None]:
df.show(2, vertical=True)

In this part, we compute the statistics of the delays for each route, each time and each weekday.

In [None]:
df_stats = df.select("stop_id", "produkt_id", "ankunftszeit", "an_prognose")
df_stats.show(1)
df_stats.printSchema()

In [None]:
@F.udf(returnType=IntegerType())
def weekday(timestamp):
    """
    Return the weekday of the timestamp: 0 == Monday, 1 == Tuesday,...

    Args:
        timestamp: a value exposing a `weekday()` method, or None.

    Returns:
        The weekday index as an integer (an IntegerType column), or None when
        `timestamp` is None.
    """
    if timestamp is None:
        return None

    return timestamp.weekday()

In [None]:
# We add the weekday to the dataframe.
df_stats = df_stats.withColumn("weekday", weekday(col("ankunftszeit")))
df_stats.show(10)

In [None]:
# We compute the delay of each row: the difference between an_prognose and
# ankunftszeit (both cast to long), divided by 60 and truncated to an integer
arrival_long = col("ankunftszeit").cast("long")
prognose_long = col("an_prognose").cast("long")
delay = ((prognose_long - arrival_long) / 60).cast("int")

# Rows with a null in any column are removed first; the result is cached
# because it is reused for every weekday / transport type combination
df_stats = df_stats.dropna().withColumn("delay", delay).drop("an_prognose").cache()
df_stats.show(5)

In [None]:
# We have now everything to compute the statistics for each weekday and transport type.

idx2weekday = {0: 'monday', 1: 'tuesday', 2: 'wednesday', 3: 'thursday', 4: 'friday', 5: 'saturday', 6: 'sunday'}
transport_types = ["Zug", "Tram", "Bus"]

# Only Monday to Friday (indices 0 to 4) are exported.
# The loop variable is deliberately not called `weekday`: that name belongs to
# the weekday UDF, and rebinding it would break any later use of the UDF.
for weekday_idx in range(5):
    weekday_name = idx2weekday[weekday_idx]

    for transport_type in transport_types:

        # For one weekday and one transport type: the list of observed delays
        # for each (arrival time of day, stop) pair
        df_stats_temp = df_stats.filter((col("weekday") == weekday_idx) & (col("produkt_id") == transport_type))\
                                .withColumn("arrival_time", date_format('ankunftszeit', 'HH:mm:ss'))\
                                .groupBy("arrival_time", "stop_id")\
                                .agg(F.collect_list("delay").alias("delay_list"))

        # Write to HDFS the data
        df_stats_temp.write.parquet('/user/{0}/stats_delays_{1}_{2}.parquet'.format(username, weekday_name, transport_type))

In [None]:
# Preview of the statistics for one combination: Monday (weekday 0) and "Zug"
monday_zug = df_stats.filter((col("weekday") == 0) & (col("produkt_id") == 'Zug'))
monday_zug = monday_zug.drop("weekday", "produkt_id")

# Replace the full arrival timestamp by its time of day
monday_zug = monday_zug.withColumn("arrival_time", date_format('ankunftszeit', 'HH:mm:ss'))
monday_zug = monday_zug.drop("ankunftszeit")

# Collect the delays of each (arrival time, stop) pair into a list
monday_zug_delays = monday_zug.groupBy("arrival_time", "stop_id").agg(F.collect_list("delay"))
monday_zug_delays.withColumnRenamed("collect_list(delay)", "delay_list").show()
        