In [1]:
import pandas as pd
import pyspark.sql.functions as functions
import math
import getpass
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master("yarn") \
    .appName('journey_planner-{0}'.format(getpass.getuser())) \
    .config('spark.executor.memory', '4g') \
    .config('spark.executor.instances', '5') \
    .config('spark.port.maxRetries', '100') \
    .getOrCreate()

In [2]:
# load the data
df = spark.read.csv('/datasets/sbb/2018/*/*istdaten.csv.bz2', sep=';', header=True)

In [3]:
stations = pd.read_csv('data/filtered_stations.csv')
valid_stations = set(stations['Remark'])

In [4]:
stations = stations[['Longitude', 'Latitude', 'Remark']];
stations['key'] = 0

earth_radius = 6371e3

def haversine(row):
    phi1         = 2 * math.pi * float(row['Latitude_x']) / 360
    phi2         = 2 * math.pi * float(row['Latitude_y']) / 360
    delta_phi    = 2 * math.pi * (float(row['Latitude_y']) - float(row['Latitude_x'])) / 360
    delta_lambda = 2 * math.pi * (float(row['Longitude_y']) - float(row['Longitude_x'])) / 360
    
    a = (math.sin(delta_phi/2) ** 2) + \
        math.cos(phi1) * math.cos(phi2) * (math.sin(delta_lambda/2) ** 2)
    
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    
    d = earth_radius * c
    
    return d / 1000

prod = pd.merge(stations, stations, on='key')
prod['dist'] = prod.apply(lambda row: haversine(row), axis=1)

In [5]:
# We don't consider walking to stops that are more than 3 kilometers away
max_walking_distance = 3
walk_df = prod[prod['dist'] <= max_walking_distance]
walk_df = walk_df[walk_df['Remark_x'] != walk_df['Remark_y']]

walk_df = walk_df[['Remark_x', 'Remark_y', 'dist']]
walk_df['type'] = 'walk'
walk_df['line'] = 'walk'
walk_df['departure_day']  = 'null'
walk_df['departure_time'] = 'null'
walk_df['arrival_time']   = 'null'
# We assume an average walking speed of 5 kilometers per hour
walk_df['lateAvg'] = walk_df.apply(lambda row: 3600 * float(row['dist']) / 5, axis=1)
walk_df['lateStd'] = 0.0
walk_df.drop('dist', axis=1, inplace=True)
walk_df.columns = ['src', 'dst', 'type', 'line', 'departure_day', 'departure_time', 'arrival_time', 'lateAvg', 'lateStd']

In [6]:
walk_edges = spark.createDataFrame(walk_df)

In [7]:
dateFormat = 'dd.MM.yyyy HH:mm'
timeLate = (functions.unix_timestamp('AN_PROGNOSE', format=dateFormat)
            - functions.unix_timestamp('ANKUNFTSZEIT', format=dateFormat))

@functions.udf
def clamp(late):
    return 0 if late < 0 else late

valid_stops = df.filter((df.DURCHFAHRT_TF=='false') & 
                        (df.FAELLT_AUS_TF=='false') & 
                        (df.ZUSATZFAHRT_TF=='false') &
                        (df.AN_PROGNOSE_STATUS=='GESCHAETZT') &
                        (df.HALTESTELLEN_NAME.isin(valid_stations))) \
                .select('BETRIEBSTAG',
                        'FAHRT_BEZEICHNER', 
                        'PRODUKT_ID', 
                        'LINIEN_TEXT', 
                        'HALTESTELLEN_NAME', 
                        'AN_PROGNOSE',
                        'ANKUNFTSZEIT', 
                        'ABFAHRTSZEIT') \
                .withColumn('AN_PROGNOSE',  functions.to_timestamp(df.AN_PROGNOSE, dateFormat))  \
                .withColumn('ANKUNFTSZEIT', functions.to_timestamp(df.ANKUNFTSZEIT, dateFormat)) \
                .withColumn('ABFAHRTSZEIT', functions.to_timestamp(df.ABFAHRTSZEIT, dateFormat)) \
                .withColumn('late', clamp(timeLate)) \
                .drop('AN_PROGNOSE')

In [8]:
departures = valid_stops.filter(valid_stops.ABFAHRTSZEIT.isNotNull())\
                        .drop('ANKUNFTSZEIT', 'late')
arrivals   = valid_stops.filter(valid_stops.ANKUNFTSZEIT.isNotNull())\
                        .drop('ABFAHRTSZEIT')

In [9]:
arrivals.createOrReplaceTempView('arrivals')
departures.createOrReplaceTempView('departures')

joinQuery = 'SELECT d.HALTESTELLEN_NAME AS src, a.HALTESTELLEN_NAME AS dst,              \
                    d.PRODUKT_ID AS type, d.LINIEN_TEXT AS line,                         \
                    date_format(d.ABFAHRTSZEIT, \'EEEE\') AS departure_day,              \
                    SUBSTRING(d.ABFAHRTSZEIT, 12, 8) AS departure_time,                  \
                    SUBSTRING(a.ANKUNFTSZEIT, 12, 8) AS arrival_time,                    \
                    a.late                                                               \
             FROM arrivals AS a INNER JOIN departures AS d                               \
             ON a.BETRIEBSTAG == d.BETRIEBSTAG                                           \
             AND a.FAHRT_BEZEICHNER == d.FAHRT_BEZEICHNER                                \
             WHERE a.HALTESTELLEN_NAME != d.HALTESTELLEN_NAME                            \
             AND d.ABFAHRTSZEIT < a.ANKUNFTSZEIT'

edges = spark.sql(joinQuery)

In [10]:
edges.createOrReplaceTempView('edges')

query = 'SELECT src, dst, type, line, departure_day, departure_time, arrival_time,              \
         AVG(late) AS lateAvg, STD(late) AS lateStd                                             \
         FROM edges GROUP BY src, dst, type, line, departure_day, departure_time, arrival_time'

aggregated = spark.sql(query)
aggregated_edges = aggregated.na.fill(0.0)

all_edges = aggregated_edges.union(walk_edges)

In [11]:
all_edges.write.parquet('/homes/schmutz/edges', mode='overwrite')

In [None]:
#grouped = edges.groupBy([edges.src, edges.dst, edges.type, edges.subtype])
#grouped_edges = grouped.agg({'departure_time': 'collect_list',
#                             'arrival_time'  : 'collect_list'})\
#                        .withColumnRenamed('collect_list(arrival_time)', 'arrival_times')\
#                        .withColumnRenamed('collect_list(departure_time)', 'departure_times')