In [1]:
import getpass
import pyspark
from pyspark.sql import SparkSession


spark = (SparkSession
         .builder
         .appName('final-proj')
         .master('yarn')
         .config('spark.port.maxRetries', 100)
         .config('spark.executor.memory', '1g')
         .config('spark.executor.instances', '2')
         .config('spark.executor.cores', '2')
         .config('spark.jars.packages', 'org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0')
         .getOrCreate())

sc = spark.sparkContext
conf = sc.getConf()

spark

In [3]:
# Metadata file contains all the stations and gps coordinates
metadata = spark.read.text('/datasets/project/metadata/BFKOORD_GEO')

In [4]:
# Splitting and adding new columns to the spark dataframe
split_col = pyspark.sql.functions.split(metadata['value'], " % ")
split_left = pyspark.sql.functions.split(split_col.getItem(0), " +")
metadata = metadata.withColumn('longitude', split_left.getItem(2))
metadata = metadata.withColumn('latitude', split_left.getItem(1))
metadata = metadata.withColumn('stop', split_col.getItem(1))
metadata = metadata.drop('value')

metadataPandas = metadata.toPandas()
metadataPandas.head()

Unnamed: 0,longitude,latitude,stop
0,44.44677,26.074412,Bucuresti
1,50.901549,1.811446,Calais
2,51.284212,1.075329,Canterbury
3,50.729172,-3.543547,Exeter
4,46.922368,9.733756,"Fideris, Bahnhof"


In [5]:
# Zürich HB is the starting point
zurich_coord = metadataPandas[metadataPandas['stop'] == 'Zürich HB']
zurich_coord

Unnamed: 0,longitude,latitude,stop
2379,47.378177,8.540192,Zürich HB


In [57]:
from math import sin, cos, sqrt, atan2, radians

# Function to calculate the distance of gps coordinates
def compute_distance(lat1, lon1, lat2, lon2):
    # approximate radius of earth in km
    R = 6373.0 
    
    lat1 = radians(lat1)
    lon1 = radians(lon1)
    lat2 = radians(lat2)
    lon2 = radians(lon2)
    
    dlon = lon2 - lon1
    dlat = lat2 - lat1

    a = sin(dlat / 2)**2 + cos(lat1) * cos(lat2) * sin(dlon / 2)**2
    c = 2 * atan2(sqrt(a), sqrt(1 - a))

    distance = R * c
    return distance*1000


In [58]:
# keep only stations inside 10km
mask = metadataPandas.apply(lambda x: compute_distance(
    float(zurich_coord['latitude']), 
    float(zurich_coord['longitude']),
    float(x['latitude']), 
    float(x['longitude'])) <= 10000, axis=1)
metadataPandas = metadataPandas[mask]

In [11]:
df = spark.read.option("delimiter", ";").option("header", "true").csv('/datasets/project/istdaten/*/*')

In [12]:
# Keep only the needed information
df = df.select(
          df['BETRIEBSTAG'].alias('date'), 
          df['FAHRT_BEZEICHNER'].alias('id'), 
          df['PRODUKT_ID'].alias('transport_type'), 
          df['LINIEN_ID'].alias('train_number'), 
          df['HALTESTELLEN_NAME'].alias('stop_name'), 
          df['ANKUNFTSZEIT'].alias('arrival_time'), 
          df['AN_PROGNOSE'].alias('actual_arrival_time'),
          df['AN_PROGNOSE_STATUS'].alias('status_arrival_time'),
          df['ABFAHRTSZEIT'].alias('departure_time'),
          df['AB_PROGNOSE'].alias('actual_departure_time'),
          df['AB_PROGNOSE_STATUS'].alias('status_departure_time'),
          df['DURCHFAHRT_TF'].alias('stop_here'))

In [13]:
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from pandas import Series
import datetime
import time

formatTS1 = "dd.MM.yyyy HH:mm:ss"
formatTS2 = "dd.MM.yyyy HH:mm"

# compute the departure and arrival delays
departure_delay = (F.unix_timestamp('actual_departure_time', formatTS1) - 
        F.unix_timestamp('departure_time', formatTS2))
arrival_delay = (F.unix_timestamp('actual_arrival_time', formatTS1) - 
        F.unix_timestamp('arrival_time', formatTS2))

# extract the day of the week
def get_week_day(date):
    converted_date = datetime.datetime.strptime(str(date), "%d.%m.%Y")
    return converted_date.weekday()

week_day_udf = F.udf(get_week_day, IntegerType())

# udf to fill NaN departure (arrival) values with the arrival (departure) time
@udf('string')
def fillWithOther(a, b):
    if (a is None):
        return b
    return a
    
# date parser
date = F.to_date('date', 'dd.MM.yyyy')

# First filtering. arrival_delay and departure_delay columns
df_filtered = (df.filter((df.status_arrival_time == 'GESCHAETZT') | (df.status_departure_time == 'GESCHAETZT'))
     .withColumn("arrival_delay", arrival_delay)
     .withColumn("departure_delay", departure_delay)
                      )

# Handling the NaN values
df_with_delays = df_filtered.select('id', 'train_number', 'stop_name', 'arrival_time', 'arrival_delay', 'departure_time', 'departure_delay')
df_with_delays = df_with_delays.fillna(0, ['departure_delay', 'arrival_delay'])
df_with_delays = (df_with_delays
                  .withColumn('arrival_time2', fillWithOther(df_with_delays.arrival_time, df_with_delays.departure_time))
                  .withColumn('departure_time2', fillWithOther(df_with_delays.departure_time, df_with_delays.arrival_time))
                  .drop(df_with_delays.departure_time).drop(df_with_delays.arrival_time)
                 )
df_with_delays = df_with_delays.withColumnRenamed('departure_time2', 'departure_time').withColumnRenamed('arrival_time2', 'arrival_time')
# Adding the day of the week
df_with_delays = df_with_delays.withColumn("day_week", week_day_udf(F.split(df_with_delays.arrival_time, " ")[0]))

# Filtering outliers
df_with_delays = df_with_delays.filter((df_with_delays['arrival_delay'] > -1000) &
                                       (df_with_delays['arrival_delay'] < 1000) &
                                       (df_with_delays['departure_delay'] > -1000) &
                                       (df_with_delays['departure_delay'] < 1000)
                                      )

In [14]:
#df_with_delays.take(100)

In [15]:
# Filtering the stops that are not in the metadata
# Adding a new column 'info' containing all the needed information
# Grouping by trip id
# Collect the list of the 'info' rows in the group
df_grouped = (df_with_delays
    .filter(df_with_delays.stop_name.isin(metadataPandas.stop.tolist()))
    .withColumn('info', F.struct('stop_name', F.split('arrival_time', " ")[1], 'arrival_delay', F.split('departure_time', " ")[1], 'departure_delay', 'day_week'))
    .groupby('id')
    .agg(F.collect_list('info').alias('info'))
              )

In [16]:
# Collecting the groups
groups = df_grouped.collect()

In [157]:
import pandas as pd
import numpy as np
import time
import datetime

In [158]:
# Function to compute the delta between departur and arrival time. If the delta is too high, it will be filtered
def compute_delta(arrival, departure, day):
    d1 = datetime.datetime.strptime(arrival, "%H:%M")
    d2 = datetime.datetime.strptime(departure, "%H:%M")
    d1_ts = time.mktime(d1.timetuple())
    d2_ts = time.mktime(d2.timetuple())
    res = int(d2_ts-d1_ts) / 60
    if (day > 0):
        res = res + 60 * 24 * day
    return res

# DataFrame containing the final schedule
result = pd.DataFrame(columns= ['trip_id', 'start', 'stop', 
                               'arrival_time_1', 'departure_time_1', 
                               'arrival_time_2', 'departure_time_2',
                               'mean_delay', 'std_delay', 'day_1', 'day_2', 'delta'])

# Iterating through groups
for group in groups:
    trip_id = group[0]
    info = group[1]
    p = pd.DataFrame(info, columns=['stop', 'arrival_time', 'arrival_delay','departure_time', 'departure_delay', 'week_day'])
    
    # Computing the mean of arrival and departure delays
    c = p.groupby(['week_day', 'arrival_time', 'departure_time', 'stop']).agg([np.mean, np.std])

    c = c.reset_index()
    c.columns = [' '.join(col).strip() for col in c.columns.values]
    
    # Creating all the couples of (start_station, arrival_station) for each trip. The order of the schedule 
    # is manteined by the groupby operation. We can have the same schedule for different days of the week, 
    # or the schedule can even change on different days. The compute_delta function calculates the time interval
    # between each couple: if the interval is too high, it means that a new trip started and the couple has to be
    # discarded. We also consider trips that are scheduled between midnight (in this case the week_day changes). 
    partialResult = pd.DataFrame([(trip_id,
                        c.loc[i, 'stop'], 
                        c.loc[i+1, 'stop'], 
                        c.loc[i, 'arrival_time'],
                        c.loc[i, 'departure_time'], 
                        c.loc[i+1, 'arrival_time'], 
                        c.loc[i+1, 'departure_time'], 
                        c.loc[i+1, 'arrival_delay mean'],
                        c.loc[i+1, 'arrival_delay std'], 
                        c.loc[i, 'week_day'], 
                        c.loc[i+1, 'week_day'], 
                        compute_delta(c.loc[i, 'departure_time'], c.loc[i+1, 'arrival_time'], c.loc[i+1, 'week_day']-c.loc[i, 'week_day'])) 
                                  for i in range(len(c)-1)],
                     columns= ['trip_id', 'start', 'stop', 
                               'arrival_time_1', 'departure_time_1', 
                               'arrival_time_2', 'departure_time_2',
                               'mean_delay', 'std_delay', 'day_1', 'day_2', 'delta'])
    # Need to merge the last station with the first, in case of a trip that starts on sunday night and finisches
    # on monday morning
    m = len(c)-1
    partialResult = partialResult.append([{'trip_id': trip_id,
                'start': c.loc[m, 'stop'], 
                'stop': c.loc[0, 'stop'], 
                'arrival_time_1': c.loc[m, 'arrival_time'],
                'departure_time_1': c.loc[m, 'departure_time'], 
                'arrival_time_2': c.loc[0, 'arrival_time'],
                'departure_time_2': c.loc[0, 'departure_time'],
                'mean_delay': c.loc[m, 'arrival_delay mean'], 
                'std_delay': c.loc[m, 'arrival_delay std'], 
                'day_1': c.loc[m, 'week_day'], 
                'day_2': c.loc[0, 'week_day'],
                'delta': compute_delta(c.loc[m, 'departure_time'], c.loc[0, 'arrival_time'], c.loc[0, 'week_day']+7-c.loc[m, 'week_day']),
                 }], ignore_index=True
               )
    # Filtering the wrong couples and couples where the start and stop stations are the same
    partialResult = partialResult[partialResult.delta < 180]
    partialResult = partialResult[partialResult.start != partialResult.stop]
    #partialResult = partialResult.drop(['delta'], axis=1)
    result = result.append(partialResult)

result.head()

Unnamed: 0,trip_id,start,stop,arrival_time_1,departure_time_1,arrival_time_2,departure_time_2,mean_delay,std_delay,day_1,day_2,delta
1,85:11:18388:001,Dietlikon,Stettbach,23:13,23:14,23:17,23:18,60.151515,105.014023,0,0,3.0
2,85:11:18388:001,Stettbach,Zürich Stadelhofen,23:17,23:18,23:22,23:23,67.666667,97.935459,0,0,4.0
3,85:11:18388:001,Zürich Stadelhofen,Zürich HB,23:22,23:23,23:26,23:29,14.606061,92.191695,0,0,3.0
4,85:11:18388:001,Zürich HB,Zürich Hardbrücke,23:26,23:29,23:31,23:31,40.606061,38.48615,0,0,2.0
5,85:11:18388:001,Zürich Hardbrücke,Zürich Altstetten,23:31,23:31,23:35,23:36,3.545455,40.070322,0,0,4.0


In [159]:
# We found some wrong schedules, always between Zurich HB and Zurich Oerlikon. 
# We suppose that these are errors, because in other days the same schedule doesn't include Zurich Oerlikon
result[(result['departure_time_1'] > result['arrival_time_2']) & (result['day_1'] == result['day_2'])].head()

Unnamed: 0,trip_id,start,stop,arrival_time_1,departure_time_1,arrival_time_2,departure_time_2,mean_delay,std_delay,day_1,day_2,delta
2,85:11:19256:001,Zürich HB,Zürich Oerlikon,15:11,15:14,15:12,15:12,-66.0,,0,0,-2.0
2,85:11:19254:001,Zürich HB,Zürich Oerlikon,14:41,14:44,14:42,14:42,-139.0,,0,0,-2.0
2,85:11:19253:001,Zürich HB,Zürich Oerlikon,14:15,14:17,14:16,14:16,54.0,,0,0,-1.0
17,85:11:19285:001,Zürich HB,Zürich Oerlikon,22:15,22:17,22:16,22:16,156.0,,3,3,-1.0
2,85:11:19247:001,Zürich HB,Zürich Oerlikon,12:45,12:47,12:46,12:46,94.0,,0,0,-1.0


In [160]:
# These are the trips that have to be modified
trip_id_to_modify = result[(result['departure_time_1'] > result['arrival_time_2']) & (result['day_1'] == result['day_2'])]['trip_id']

In [161]:
result = result.reset_index().drop(['index'], axis=1)
result.head()

Unnamed: 0,trip_id,start,stop,arrival_time_1,departure_time_1,arrival_time_2,departure_time_2,mean_delay,std_delay,day_1,day_2,delta
0,85:11:18388:001,Dietlikon,Stettbach,23:13,23:14,23:17,23:18,60.151515,105.014023,0,0,3.0
1,85:11:18388:001,Stettbach,Zürich Stadelhofen,23:17,23:18,23:22,23:23,67.666667,97.935459,0,0,4.0
2,85:11:18388:001,Zürich Stadelhofen,Zürich HB,23:22,23:23,23:26,23:29,14.606061,92.191695,0,0,3.0
3,85:11:18388:001,Zürich HB,Zürich Hardbrücke,23:26,23:29,23:31,23:31,40.606061,38.48615,0,0,2.0
4,85:11:18388:001,Zürich Hardbrücke,Zürich Altstetten,23:31,23:31,23:35,23:36,3.545455,40.070322,0,0,4.0


In [162]:
# Modifying the wrong rows
for i, row in result.iterrows():
    if ((result.loc[i, 'trip_id'] in list(trip_id_to_modify.values)) &
       (result.loc[i, 'stop'] == 'Zürich Oerlikon')):
        result.loc[i, 'stop'] = result.loc[i+1, 'stop']
        result.loc[i, 'arrival_time_2'] = result.loc[i+1, 'arrival_time_2']
        result.loc[i, 'departure_time_2'] = result.loc[i+1, 'departure_time_2']
        result.loc[i, 'mean_delay'] = result.loc[i+1, 'mean_delay']
        result.loc[i, 'std_delay'] = result.loc[i+1, 'std_delay']

In [163]:
# Eliminating the wrong rows
result = result[ (~result['trip_id'].isin(list(trip_id_to_modify.values)))
        | ((result['trip_id'].isin(list(trip_id_to_modify.values)))
           & (result['start'] != 'Zürich Oerlikon'  ) )]

In [164]:
result

Unnamed: 0,trip_id,start,stop,arrival_time_1,departure_time_1,arrival_time_2,departure_time_2,mean_delay,std_delay,day_1,day_2,delta
0,85:11:18388:001,Dietlikon,Stettbach,23:13,23:14,23:17,23:18,60.151515,105.014023,0,0,3.0
1,85:11:18388:001,Stettbach,Zürich Stadelhofen,23:17,23:18,23:22,23:23,67.666667,97.935459,0,0,4.0
2,85:11:18388:001,Zürich Stadelhofen,Zürich HB,23:22,23:23,23:26,23:29,14.606061,92.191695,0,0,3.0
3,85:11:18388:001,Zürich HB,Zürich Hardbrücke,23:26,23:29,23:31,23:31,40.606061,38.486150,0,0,2.0
4,85:11:18388:001,Zürich Hardbrücke,Zürich Altstetten,23:31,23:31,23:35,23:36,3.545455,40.070322,0,0,4.0
5,85:11:18388:001,Dietlikon,Stettbach,23:13,23:14,23:17,23:18,85.812500,133.801354,1,1,3.0
6,85:11:18388:001,Stettbach,Zürich Stadelhofen,23:17,23:18,23:22,23:23,82.312500,138.159897,1,1,4.0
7,85:11:18388:001,Zürich Stadelhofen,Zürich HB,23:22,23:23,23:26,23:29,30.218750,130.796008,1,1,3.0
8,85:11:18388:001,Zürich HB,Zürich Hardbrücke,23:26,23:29,23:31,23:31,59.531250,78.816645,1,1,2.0
9,85:11:18388:001,Zürich Hardbrücke,Zürich Altstetten,23:31,23:31,23:35,23:36,23.906250,77.567602,1,1,4.0


In [165]:
# change time type and drop/rename columns
result = (result.drop(['arrival_time_1', 'departure_time_2', 'delta'], axis=1))

def make_time(entry):
    h, m = entry.split(':')
    return datetime.timedelta(hours=int(h), minutes=int(m), seconds=0)

result['departure_time_1'] = result['departure_time_1'].apply(lambda x: make_time(x))
result['arrival_time_2'] = result['arrival_time_2'].apply(lambda x: make_time(x))
result = result2.rename(columns={'departure_time_1': 'departure_time', 'arrival_time_2': 'arrival_time'})
          
result.head()

Unnamed: 0,trip_id,start,stop,departure_time,arrival_time,mean_delay,std_delay,day_1,day_2
0,85:11:18388:001,Dietlikon,Stettbach,23:14:00,23:17:00,60.151515,105.014023,0,0
1,85:11:18388:001,Stettbach,Zürich Stadelhofen,23:18:00,23:22:00,67.666667,97.935459,0,0
2,85:11:18388:001,Zürich Stadelhofen,Zürich HB,23:23:00,23:26:00,14.606061,92.191695,0,0
3,85:11:18388:001,Zürich HB,Zürich Hardbrücke,23:29:00,23:31:00,40.606061,38.48615,0,0
4,85:11:18388:001,Zürich Hardbrücke,Zürich Altstetten,23:31:00,23:35:00,3.545455,40.070322,0,0


In [166]:
# Compute the walking times between stations. Keeping only stations within 300m
walk = pd.DataFrame(columns= ['trip_id', 'start', 'stop', 
                               'departure_time', 
                               'arrival_time',
                               'mean_delay', 'std_delay', 'day_1', 'day_2'])
for index1, stop1 in metadataPandas.iterrows():
    for index2, stop2 in metadataPandas.iterrows():
        if (stop1.stop != stop2.stop):
            distance = compute_distance(float(stop1.latitude), float(stop1.longitude), 
                                       float(stop2.latitude), float(stop2.longitude))
            if (distance <= 300):
                walk = walk.append({'trip_id': '0000', 
                                    'start': stop1.stop, 
                                    'stop': stop2.stop, 
                                    'mean_delay': distance, # considering 1m/s
                                    'std_delay': 0.0}, ignore_index=True)

In [167]:
walk.head()

Unnamed: 0,trip_id,start,stop,departure_time,arrival_time,mean_delay,std_delay,day_1,day_2
0,0,Bonstetten-Wettswil,"Bonstetten-Wettswil, Bahnhof",,,57.121084,0.0,,
1,0,"Waldegg, Birmensdorferstrasse","Waldegg, Post",,,163.432216,0.0,,
2,0,"Waldegg, Birmensdorferstrasse","Uitikon Waldegg, Bahnhof",,,287.582974,0.0,,
3,0,"Zürich, Goldbrunnenplatz","Zürich, Zwinglihaus",,,262.946946,0.0,,
4,0,Zürich HB,Zürich HB SZU,,,140.199392,0.0,,


In [168]:
result = result.append(walk)

In [169]:
result.head()

Unnamed: 0,trip_id,start,stop,departure_time,arrival_time,mean_delay,std_delay,day_1,day_2
0,85:11:18388:001,Dietlikon,Stettbach,23:14:00,23:17:00,60.151515,105.014023,0,0
1,85:11:18388:001,Stettbach,Zürich Stadelhofen,23:18:00,23:22:00,67.666667,97.935459,0,0
2,85:11:18388:001,Zürich Stadelhofen,Zürich HB,23:23:00,23:26:00,14.606061,92.191695,0,0
3,85:11:18388:001,Zürich HB,Zürich Hardbrücke,23:29:00,23:31:00,40.606061,38.48615,0,0
4,85:11:18388:001,Zürich Hardbrücke,Zürich Altstetten,23:31:00,23:35:00,3.545455,40.070322,0,0
