# Lab in Data Science: Final Project

Pierre Fouche, Matthias Leroy and Raphaël Steinmann

## Imports

In [1]:
%matplotlib inline
import matplotlib.pylab as plt
plt.rcParams['figure.figsize'] = (10,6)
plt.rcParams['font.size'] = 18
plt.style.use('fivethirtyeight')

In [125]:
import getpass
import pyspark
from datetime import datetime
from pyspark.sql import SparkSession
import pyspark.sql.functions as functions
from pyspark.sql.types import BooleanType
from pyspark.sql.window import Window
import math
import helpers
import pickle
import numpy as np

%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


## Initialize the `SparkSession`

In [3]:
conf = pyspark.conf.SparkConf()
conf.setMaster('yarn')
conf.setAppName('project-{0}'.format(getpass.getuser()))
conf.set('spark.executor.memory', '6g')
conf.set('spark.executor.instances', '6')
conf.set('spark.port.maxRetries', '100')
sc = pyspark.SparkContext.getOrCreate(conf)
conf = sc.getConf()
sc

In [4]:
# init spark session
spark = SparkSession(sc)

## Loading the data

## Data Processing

### Cleaning metadata
First, let's clean the metadata dataframe:

In [5]:
# load metadata
raw_metadata = spark.read.load('/datasets/project/metadata', format='com.databricks.spark.csv', header='false', sep='\\t')

In [6]:
# remove multiple spaces
metadata = raw_metadata.withColumn('_c0', functions.regexp_replace(raw_metadata._c0, '\s+', ' '))
# split into columns
metadata = metadata.withColumn('name', functions.split(metadata._c0, '%')[1])
for (name, index, type_) in [('station_ID',0, 'int'), ('long',1, 'double'), ('lat',2, 'double'), ('height',3, 'int')]:
    metadata = metadata.withColumn(name, functions.split(metadata._c0, ' ')[index].cast(type_))
# remove useless column
metadata = metadata.drop('_c0')
# trim name column to remove left/right blank
metadata = metadata.withColumn('name', functions.trim(metadata.name))

In [7]:
metadata.show(5)

+----------------+----------+---------+---------+------+
|            name|station_ID|     long|      lat|height|
+----------------+----------+---------+---------+------+
|       Bucuresti|         2|26.074412| 44.44677|     0|
|          Calais|         3| 1.811446|50.901549|     0|
|      Canterbury|         4| 1.075329|51.284212|     0|
|          Exeter|         5|-3.543547|50.729172|     0|
|Fideris, Bahnhof|         7| 9.733756|46.922368|   744|
+----------------+----------+---------+---------+------+
only showing top 5 rows



We will use the SBB data limited around the Zurich area. We will focus on all the stops within 10km of the Zurich train station. Let's get rid of all the stations that are too far away from Zurich:

In [8]:
metadata.count()

25935

In [9]:
# coordinates of Zürich main train station
lat_zurich = 47.3782
long_zurich = 8.5402

In [10]:
# convert to pandas dataframe
pandas_df = metadata.toPandas()
# keep only the stops that are located < 10km from Zurich HB
pandas_df['distance_to_zh'] = pandas_df.apply(lambda x: helpers.distance(x['long'], x['lat'], long_zurich, lat_zurich), axis=1)
pandas_df = pandas_df[pandas_df['distance_to_zh'] < 10]

In [11]:
# pandas_df.distance_to_zh.max()

In [12]:
# recreate spark dataframe from pandas dataframe
metadata = spark.createDataFrame(pandas_df)
# create dict of stations from pandas dataframe
stations = pandas_df.set_index('station_ID').to_dict('index')

### Cleaning main dataset

In [13]:
# load full data
# raw_df = spark.read.load('/datasets/project/istdaten/*/*', format='csv', header='true', inferSchema='true', sep=';')
# load sample data
raw_df = spark.read.load('/datasets/project/istdaten/2018/01', format='csv', header='true', inferSchema='true', sep=';')

In [14]:
# rename the fields german -> english
fields = {
    'BETRIEBSTAG':'date',
    'FAHRT_BEZEICHNER':'trip_id',
    'PRODUKT_ID':'transport_type',
    'LINIEN_ID':'train_id',
    'VERKEHRSMITTEL_TEXT':'train_type',
    'ZUSATZFAHRT_TF':'additional_trip',
    'FAELLT_AUS_TF':'trip_failed',
    'HALTESTELLEN_NAME':'stop_name',
    'BPUIC':'stop_id',
    'ANKUNFTSZEIT':'schedule_arrival',
    'AN_PROGNOSE':'real_arrival',
    'AN_PROGNOSE_STATUS':'arr_forecast_status',
    'ABFAHRTSZEIT':'schedule_dep',
    'AB_PROGNOSE':'real_dep',
    'AB_PROGNOSE_STATUS':'dep_forecast_status',
    'DURCHFAHRT_TF':'no_stop_here'
}

df = raw_df.selectExpr([k + ' as ' + fields[k] for k in fields])

In [15]:
# refactor dates
df = df.withColumn('date', functions.from_unixtime(functions.unix_timestamp('date', 'dd.MM.yyyy')))
df = df.withColumn('schedule_arrival', functions.from_unixtime(functions.unix_timestamp('schedule_arrival', 'dd.MM.yyyy HH:mm')))
df = df.withColumn('real_arrival', functions.from_unixtime(functions.unix_timestamp('real_arrival', 'dd.MM.yyyy HH:mm')))
df = df.withColumn('schedule_dep', functions.from_unixtime(functions.unix_timestamp('schedule_dep', 'dd.MM.yyyy HH:mm')))
df = df.withColumn('real_dep', functions.from_unixtime(functions.unix_timestamp('real_dep', 'dd.MM.yyyy HH:mm')))

In [16]:
# add a column containing the weekday (monday=1, sunday=6)
df = df.withColumn('weekday', helpers.get_weekday(df.date))

In [17]:
# keep only the rows with stops near zurich
df = df.where(df.stop_id.isin([int(x) for x in list(pandas_df.station_ID.unique())]))

In [18]:
# there is still 51'571'541 rows in zurich area
# df.count()

In [19]:
# keep only date after the 10th of december, because the schedule changed
df = df.where(df.date > '2017-12-10 00:00:00')

In [20]:
# discard the rows when there is no stop here
df2 = df.where(df.no_stop_here == 'false')

In [21]:
# discard ill-formated rows where the train leaves a station before arriving in it
df2 = df2.where((df2.schedule_dep >= df2.schedule_arrival) | functions.col('schedule_arrival').isNull() | functions.col('schedule_dep').isNull())

## Modeling the network

### From stops to trips

In [22]:
# create a column with the schedule time that will be used to build the network
df2 = df2.withColumn('schedule_time', helpers.date_choice(df2.schedule_arrival, df2.schedule_dep))
#df2 = df2.withColumn('schedule_time', functions.from_unixtime(functions.unix_timestamp('schedule_time', 'dd.MM.yyyy HH:mm')))

# create a column that tells if a stop is the first/last one of its trip or in the middle
df2 = df2.withColumn('stop_type', helpers.stop_type(df2.schedule_dep, df2.schedule_arrival))

In [23]:
trips = df2.select(['trip_id', 'date', 'schedule_time', 'stop_id', 'stop_type', 'schedule_arrival', 'schedule_dep', 'transport_type', 'train_type', 'arr_forecast_status', 'weekday', 'real_arrival']).orderBy(['trip_id', 'schedule_time'], ascending=[0,0])

In [24]:
# duplicate the dataframe, shift the copy of one row and append it to the original
# this way, we have for each row the current stop and the next stop
w = Window().partitionBy(functions.col('trip_id')).orderBy(functions.col('trip_id'))
trips2 = trips.select("*", functions.lag("trip_id").over(w).alias("next_tid"))
trips2 = trips2.select("*", functions.lag("schedule_time").over(w).alias("next_time"))
trips2 = trips2.select("*", functions.lag("stop_id").over(w).alias("next_sid"))
trips2 = trips2.select("*", functions.lag("stop_type").over(w).alias("next_type"))
trips2 = trips2.select("*", functions.lag("schedule_arrival").over(w).alias("next_sched_arr"))
trips2 = trips2.select("*", functions.lag("schedule_dep").over(w).alias("next_sched_dep"))
trips2 = trips2.select("*", functions.lag("arr_forecast_status").over(w).alias("next_arr_forecast_status"))
trips2 = trips2.select("*", functions.lag("real_arrival").over(w).alias("next_real_arrival"))

trips2 = trips2.where(trips2.next_time.isNotNull())

In [25]:
#trips2.where(trips2.stop_type=='first').count()
#trips2.where(trips2.stop_type=='last').count()
#trips2.where(trips2.stop_type=='mid').count()

In [26]:
# create a new column telling if the edge is valid or not
# (i.e. if the stop and next stop are really part of the same ride)
trips3 = trips2.withColumn('is_valid', helpers.edge_is_valid(trips2.trip_id, trips2.schedule_time, trips2.stop_id, trips2.stop_type, trips2.next_tid, trips2.next_time, trips2.next_sid, trips2.next_type, trips2.schedule_dep,trips2.next_sched_arr))

In [27]:
# keep only valid edges
trips4 = trips3.filter(trips3.is_valid=='true')

In [28]:
# trips4.select('stop_id', 'next_sid').distinct().count()
# gives 6606

# trips3.select('stop_id', 'next_sid').distinct().count()
# gives 8557

In [29]:
trips4.where(trips4.schedule_dep > trips4.next_sched_arr).count()

0

### For each day of the week, model the network
Get the edges of the network and the departure/arrival times for each trip (edge=trip)
We assume the schedule repeat every week, and we generate one schedule per weekday.
Days off have the same schedules as sundays.

In [None]:
# creating a model for each day of the week
# this code needs to be run only once
typical_monday = '2018-01-15 00:00:00'
typical_tuesday = '2018-01-16 00:00:00'
typical_wednesday = '2018-01-17 00:00:00'
typical_thursday = '2018-01-18 00:00:00'
typical_friday = '2018-01-19 00:00:00'
typical_saturday = '2018-01-20 00:00:00'
typical_sunday = '2018-01-21 00:00:00'
typical_week = [typical_monday,typical_tuesday,typical_wednesday,typical_thursday,typical_friday,typical_saturday,typical_sunday]

In [None]:
regenerate_models = False
models = []
days_names = ['monday','tuesday','wednesday','thursday','friday','saturday','sunday']

# generate one network for each weekday and store them in pickles
if regenerate_models:
    for (date, day_name) in zip(typical_week, days_names):
        network = helpers.model_network(trips4, date)
        with open('./data/'+day_name+'.pickle', 'wb') as handle:
            pickle.dump(network, handle, protocol=pickle.HIGHEST_PROTOCOL)
        print(str(day_name) + ' done')

In [None]:
# load the networks from the pickles
for day in days_names:
    with open('./data/'+ day +'.pickle', 'rb') as handle:
        network = pickle.load(handle)
    models.append(network)

In [None]:
models[0][list(models[0].keys())[0]]

## Prediction / Regression

Deux idées : Le but c'est de prévoir le retard ou l'avance d'un départ ou d'une arrivée pour un arrêt précis à un moment précis et un trip précis.

- Faire une vraie regression ou n'importe quel algo pour déterminer le retard. On utilise comme features : l'arrêt (catégorie), latitude longitude (peut être mettre ensemble en tuple), jour precis ou jour de la semaine ?, le type de transport (bus-train ...), le trip id ou redondant ???, 

- Faire une moyenne, à un arrêt, un trip, un jour, une heure la moyenne de retard qu'il a eu dans ses conditions ... 

On va avoir besoin des trips id dans le dico histoire de pouvoir lier le graph des arrêts avec le réél trajet, quel bus/train fait cet edge.

### Preprocessing :

In [29]:
trips5 = trips4.drop('stop_type', 'next_type', 'is_valid', 'arr_forcast_status', 
                     'schedule_time', 'schedule_arrival', 'next_sched_dep', 
                     'next_time', 'next_tid', 'real_arrival', 'arr_forecast_status')

In [31]:
trips5.limit(10).toPandas()

Unnamed: 0,trip_id,date,stop_id,schedule_dep,transport_type,train_type,weekday,next_sid,next_sched_arr,next_arr_forecast_status,next_real_arrival
0,85:11:13752:001,2018-01-28 00:00:00,8502221,2018-01-28 02:12:00,Zug,SN,6,8502222,2018-01-28 02:16:00,GESCHAETZT,2018-01-28 02:18:00
1,85:11:13752:001,2018-01-28 00:00:00,8502229,2018-01-28 02:07:00,Zug,SN,6,8502221,2018-01-28 02:12:00,GESCHAETZT,2018-01-28 02:13:00
2,85:11:13752:001,2018-01-28 00:00:00,8502220,2018-01-28 02:06:00,Zug,SN,6,8502229,2018-01-28 02:07:00,GESCHAETZT,2018-01-28 02:10:00
3,85:11:13752:001,2018-01-28 00:00:00,8503001,2018-01-28 02:01:00,Zug,SN,6,8502220,2018-01-28 02:06:00,GESCHAETZT,2018-01-28 02:08:00
4,85:11:13752:001,2018-01-28 00:00:00,8503020,2018-01-28 01:59:00,Zug,SN,6,8503001,2018-01-28 02:01:00,GESCHAETZT,2018-01-28 02:02:00
5,85:11:13752:001,2018-01-28 00:00:00,8503000,2018-01-28 01:57:00,Zug,SN,6,8503020,2018-01-28 01:59:00,GESCHAETZT,2018-01-28 01:59:00
6,85:11:13752:001,2018-01-28 00:00:00,8503003,2018-01-28 01:52:00,Zug,SN,6,8503000,2018-01-28 01:55:00,GESCHAETZT,2018-01-28 01:54:00
7,85:11:13752:001,2018-01-27 00:00:00,8502221,2018-01-27 02:12:00,Zug,SN,5,8502222,2018-01-27 02:16:00,GESCHAETZT,2018-01-27 02:18:00
8,85:11:13752:001,2018-01-27 00:00:00,8502229,2018-01-27 02:07:00,Zug,SN,5,8502221,2018-01-27 02:12:00,GESCHAETZT,2018-01-27 02:13:00
9,85:11:13752:001,2018-01-27 00:00:00,8502220,2018-01-27 02:06:00,Zug,SN,5,8502229,2018-01-27 02:07:00,GESCHAETZT,2018-01-27 02:10:00


In [30]:
@functions.udf
def keep_time(date):
    return date.split(' ')[1]

In [31]:
trips5 = trips5.withColumn('schedule_dep', keep_time(trips5.schedule_dep))
trips5 = trips5.withColumn('next_sched_arr', keep_time(trips5.next_sched_arr))
trips5 = trips5.withColumn('next_real_arrival', keep_time(trips5.next_real_arrival))

In [32]:
@functions.udf
def create_rush(date):
    # rush_hour = 6/9 17/19 = 0
    # not_rush_hour = 0/6 9/17 19/24  = 1
    if (date >= '06:00:00' and date <= '09:00:00') or (date >= '17:00:00' and date <= '19:00:00'):
        return 0
    else:
        return 1
    
@functions.udf
def create_interval(date):
    if (date >= '00:00:00' and date < '06:00:00'):
        return 0
    elif (date >= '06:00:00' and date < '09:00:00'):
        return 1
    elif (date >= '09:00:00' and date < '13:00:00'):
        return 2
    elif (date >= '13:00:00' and date < '16:00:00'):
        return 3
    elif (date >= '16:00:00' and date < '19:00:00'):
        return 4
    else:
        return 5
    
@functions.udf
def group_weekday(weekday):
    # Wednesday
    if weekday == 2:
        return 1
    # Saturday
    elif weekday == 5:
        return 2
    # Sunday
    elif weekday == 6:
        return 3
    # Weekday
    else:
        return 0

In [33]:
# We compute the arrival delay of each stop station
df3 = trips5.withColumn("delay_arrival", 
                     functions.unix_timestamp('next_real_arrival', 'HH:mm:ss') -
                     functions.unix_timestamp('next_sched_arr', 'HH:mm:ss'))

# We create time interval in order to differentiate rush hour and other time of the day
# We have to try different combinations of interval time (that gave best prediction)
df3 = df3.withColumn('arrival_interval', create_rush(df3.next_sched_arr))
df3 = df3.withColumn('weekday', group_weekday(df3.next_sched_arr))

In [34]:
# We create a dataframe where we only keep the rows where there is a real arrival time
arrival_df = df3.filter(df3.next_arr_forecast_status == "GESCHAETZT")
arrival_df = arrival_df.drop('next_arr_forecast_status')

# we have to check to distribution of the entier delay arrival in order to drop outlayers
#arrival_df = arrival_df.filter(arrival_df['delay_arrival'] <= 600)

In [85]:
arrival_df.limit(10).toPandas()

Unnamed: 0,trip_id,date,stop_id,schedule_dep,transport_type,train_type,weekday,next_sid,next_sched_arr,next_real_arrival,delay_arrival,arrival_interval
0,85:11:13752:001,2018-01-28 00:00:00,8502221,02:12:00,Zug,SN,0,8502222,02:16:00,02:18:00,120,1
1,85:11:13752:001,2018-01-28 00:00:00,8502229,02:07:00,Zug,SN,0,8502221,02:12:00,02:13:00,60,1
2,85:11:13752:001,2018-01-28 00:00:00,8502220,02:06:00,Zug,SN,0,8502229,02:07:00,02:10:00,180,1
3,85:11:13752:001,2018-01-28 00:00:00,8503001,02:01:00,Zug,SN,0,8502220,02:06:00,02:08:00,120,1
4,85:11:13752:001,2018-01-28 00:00:00,8503020,01:59:00,Zug,SN,0,8503001,02:01:00,02:02:00,60,1
5,85:11:13752:001,2018-01-28 00:00:00,8503000,01:57:00,Zug,SN,0,8503020,01:59:00,01:59:00,0,1
6,85:11:13752:001,2018-01-28 00:00:00,8503003,01:52:00,Zug,SN,0,8503000,01:55:00,01:54:00,-60,1
7,85:11:13752:001,2018-01-27 00:00:00,8502221,02:12:00,Zug,SN,0,8502222,02:16:00,02:18:00,120,1
8,85:11:13752:001,2018-01-27 00:00:00,8502229,02:07:00,Zug,SN,0,8502221,02:12:00,02:13:00,60,1
9,85:11:13752:001,2018-01-27 00:00:00,8502220,02:06:00,Zug,SN,0,8502229,02:07:00,02:10:00,180,1


## Time Buckets

It was an attempt of finding time buckets directly according to the delay.
However Kmeans does not return meaningful clusters. That is why we decide to create our own interval based on main daily period. 

In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import OneHotEncoder, StringIndexer

train_type_Indexer = StringIndexer(inputCol="train_type", outputCol="train_type_cat")
train_type_model = train_type_Indexer.fit(arrival_df)
arrival_df = train_type_model.transform(arrival_df)

arrival_df = arrival_df.withColumn('weekday', arrival_df['weekday'].cast('int'))
arrival_df = arrival_df.withColumn('train_type_cat', arrival_df['train_type_cat'].cast('int'))

arrival_df_test = arrival_df.filter(arrival_df.weekday < 5)
arrival_df_test = arrival_df_test.withColumn('delay_arrival', arrival_df_test.delay_arrival/60)

vec_ass = VectorAssembler(inputCols=['delay_arrival', 'train_type_cat'], outputCol="features")
kmeans_df = vec_ass.transform(arrival_df_test)
kmean = KMeans(k=7)
model_km = kmean.fit(kmeans_df)
kmeans_df = model_km.transform(kmeans_df)

In [None]:
kmeans_df = kmeans_df.withColumn('timestamp', functions.unix_timestamp('next_sched_arr',  'HH:mm:ss'))

In [None]:
mean_ = kmeans_df.groupBy('prediction').mean('timestamp').collect()
print(mean_)

In [None]:
for i in mean_:
    print(i['prediction'])
    print(datetime.datetime.fromtimestamp(i['avg(timestamp)']).strftime("%H:%M:%S"))

## Interquartiles and uncertainty

In [35]:
#arrival_df.groupby(['trip_id']).approxQuantile('delay_arrival', [0.25,0.50,0.75], 0.1).limit(50).toPandas()
from pyspark.sql import Window

trip_window = Window.partitionBy(['trip_id', 'arrival_interval', 'weekday']).orderBy('delay_arrival')
trip_quant = functions.ntile(4).over(trip_window).alias('quantiles')

In [36]:
trip_quartile = (arrival_df.select('trip_id', 'schedule_dep', 'weekday', 
                                   'next_sched_arr', 'delay_arrival','arrival_interval', trip_quant)
                 .sort('trip_id', 'arrival_interval', 'weekday', 'quantiles'))

In [37]:
trip_quartile = trip_quartile.where(functions.col('quantiles') % 2 != 0)

In [38]:
trip_quartile = trip_quartile.groupby(['trip_id', 'arrival_interval', 'weekday', 'quantiles']).max('delay_arrival')

In [39]:
trip_quartile.limit(50).toPandas()

Unnamed: 0,trip_id,arrival_interval,weekday,quantiles,max(delay_arrival)
0,85:11:13710:001,1,0,1,0
1,85:11:13710:001,1,0,3,120
2,85:11:13711:001,1,0,1,0
3,85:11:13711:001,1,0,3,60
4,85:11:13712:001,1,0,1,-60
5,85:11:13712:001,1,0,3,60
6,85:11:13713:001,1,0,1,60
7,85:11:13713:001,1,0,3,240
8,85:11:13714:001,1,0,1,60
9,85:11:13714:001,1,0,3,420


In [40]:
trip_quartile = trip_quartile.groupby(['trip_id', 'arrival_interval', 'weekday']).agg(functions.collect_list(functions.col('max(delay_arrival)')))

In [108]:
# check 85:78:24565:001
trip_quartile.limit(50).toPandas()

Unnamed: 0,trip_id,arrival_interval,weekday,collect_list(max(delay_arrival))
0,85:11:18330:001,1,0,"[0, 60]"
1,85:11:2684:001,1,0,"[-60, 0]"
2,85:11:18231:002,0,0,"[0, 60]"
3,85:11:19536:001,1,0,"[0, 0]"
4,85:11:18283:001,1,0,"[0, 60]"
5,85:11:20495:001,1,0,"[-60, 0]"
6,85:11:829:001,1,0,"[0, 120]"
7,85:11:89278:001,1,0,"[-60, 0]"
8,85:78:24854:001,0,0,"[0, 60]"
9,85:11:18362:001,0,0,"[0, 60]"


In [41]:
trip_quartile = trip_quartile.withColumn('interquartile_range', 
                                         functions.col('collect_list(max(delay_arrival))')[1] - 
                                         functions.col('collect_list(max(delay_arrival))')[0])

In [42]:
test_pd = trip_quartile.toPandas()

In [45]:
test_pd

Unnamed: 0,trip_id,arrival_interval,weekday,collect_list(max(delay_arrival)),interquartile_range
0,85:11:18231:002,0,0,"[0, 60]",60.0
1,85:11:18330:001,1,0,"[0, 60]",60.0
2,85:11:19536:001,1,0,"[0, 0]",0.0
3,85:11:2684:001,1,0,"[-60, 0]",60.0
4,85:11:19422:001,0,0,"[0, 60]",60.0
5,85:11:30901:001,1,0,"[-60, 0]",60.0
6,85:78:24854:001,0,0,"[0, 60]",60.0
7,85:78:24924:001,1,0,"[0, 60]",60.0
8,85:11:18283:001,1,0,"[0, 60]",60.0
9,85:11:18362:001,0,0,"[0, 60]",60.0


# interquartile mean + interquarile

In [240]:
delay_distribution = arrival_df.groupby(['trip_id', 'arrival_interval', 'weekday']).agg(functions.collect_list(functions.col('delay_arrival'))).alias('distri')

In [241]:
@functions.udf
def delete_neg(distri):
    return [0 if d < 0 else d for d in distri]

In [242]:
delay_distribution = delay_distribution.withColumn('distri', delete_neg(functions.col('collect_list(delay_arrival)')))

In [243]:
@functions.udf
def iqm(distri):
    s = len(distri)
    lp = s/4 + 1
    lm = 3*s/4
    f = 0
    for i,x in enumerate(sorted(distri)):
        if lp <= i <= lm:
            f += x
    return 2/s * f

@functions.udf
def interquartile(distri):
    q1, q3 = np.percentile(distri, [25, 75])
    return q3 - q1

In [244]:
delay_distribution = delay_distribution.withColumn('IQM', iqm(delay_distribution.distri))

In [236]:
delay_distri_pd = delay_distribution.toPandas()

In [237]:
delay_distri_pd

Unnamed: 0,trip_id,arrival_interval,weekday,collect_list(delay_arrival),distri,IQM
0,85:11:18330:001,1,0,"[120, 60, 0, 0, 60, 0, 0, 0, 0, 0, -60, 0, 60,...","[120, 60, 0, 0, 60, 0, 0, 0, 0, 0, 0, 0, 60, 0...",17.419354838709676
1,85:11:19536:001,1,0,"[0, 0, -60, 0, 0, 0, 0, -60, 0, 0, 0, 60, -60,...","[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 60, 0, 0, 60...",0.0
2,85:11:18231:002,0,0,"[-60, -60, 0, 60, 0, 0, 0, 60, 60, 0, -60, -60...","[0, 0, 0, 60, 0, 0, 0, 60, 60, 0, 0, 0, 0, 60,...",12.0
3,85:11:2684:001,1,0,"[-120, -60, -60, 0, 0, 0, -60, -60, 60, 60, 60...","[0, 0, 0, 0, 0, 0, 0, 0, 60, 60, 60, 0, 0, 180...",0.0
4,85:11:18762:001,1,0,"[180, 180, 180, 180, 60, 60, 60, 120, 0, 0, 60...","[180, 180, 180, 180, 60, 60, 60, 120, 0, 0, 60...",33.87096774193548
5,85:11:30692:005,1,0,"[0, 0, 0, 0, 0, 60, 60, 60, 0, 0, 0, 0, 0, 0, ...","[0, 0, 0, 0, 0, 60, 60, 60, 0, 0, 0, 0, 0, 0, ...",62.278481012658226
6,85:11:30501:001,1,0,"[120, 240, 240, 180, 360, 360, 300]","[120, 240, 240, 180, 360, 360, 300]",257.1428571428571
7,85:11:20480:002,1,0,"[60, 60, 180, 120, 60, 120, 60, 180, 120, 120,...","[60, 60, 180, 120, 60, 120, 60, 180, 120, 120,...",32.516129032258064
8,85:11:18362:001,0,0,"[180, 120, 0, 0, 60, 0, 0, 0, 120, 60, 0, 0, 6...","[180, 120, 0, 0, 60, 0, 0, 0, 120, 60, 0, 0, 6...",35.806451612903224
9,85:78:24924:001,1,0,"[0, 60, 60, 0, 0, 60, 0, -60, -60, 0, 0, 0, -6...","[0, 60, 60, 0, 0, 60, 0, 0, 0, 0, 0, 0, 0, 60,...",6.8789808917197455


In [245]:
delay_distribution_df = delay_distribution.withColumn('interquartile', interquartile(delay_distribution.distri))

In [246]:
delay_distribution_df.limit(10).toPandas()

Py4JJavaError: An error occurred while calling o3171.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 190.0 failed 4 times, most recent failure: Lost task 0.3 in stage 190.0 (TID 16508, iccluster079.iccluster.epfl.ch, executor 6): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype)
	at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
	at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
	at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
	at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
	at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1$$anonfun$apply$6.apply(BatchEvalPythonExec.scala:156)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1$$anonfun$apply$6.apply(BatchEvalPythonExec.scala:155)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2069)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:336)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2808)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2805)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2805)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2828)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2805)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)
Caused by: net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype)
	at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
	at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
	at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
	at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
	at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1$$anonfun$apply$6.apply(BatchEvalPythonExec.scala:156)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1$$anonfun$apply$6.apply(BatchEvalPythonExec.scala:155)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


In [127]:
delay_distribution = delay_distribution.withColumn('worst_case', delay_distribution.IQM + delay_distribution.interquartile)

In [None]:
delay_distribution.limit(50).toPandas()

## After shortest path, predict % of uncertainty

In [46]:
import datetime
test = [(8576218,
  8576217,
  datetime.datetime(2018, 1, 15, 14, 3),
  datetime.datetime(2018, 1, 15, 14, 4),
  '85:849:104861-01912-1',
  '916'),
 (8576217,
  8576216,
  datetime.datetime(2018, 1, 15, 14, 4),
  datetime.datetime(2018, 1, 15, 14, 5),
  '85:849:104861-01912-1',
  '916'),
 (8576216,
  8576209,
  datetime.datetime(2018, 1, 15, 14, 5),
  datetime.datetime(2018, 1, 15, 14, 6),
  '85:849:104861-01912-1',
  '916'),
 (8576209,
  8576208,
  datetime.datetime(2018, 1, 15, 14, 6),
  datetime.datetime(2018, 1, 15, 14, 7),
  '85:849:104861-01912-1',
  '916'),
 (8576208,
  8576207,
  datetime.datetime(2018, 1, 15, 14, 7),
  datetime.datetime(2018, 1, 15, 14, 8),
  '85:849:104861-01912-1',
  '916'),
 (8576207,
  8576206,
  datetime.datetime(2018, 1, 15, 14, 9),
  datetime.datetime(2018, 1, 15, 14, 9),
  '85:849:104861-01912-1',
  '916'),
 (8576206,
  8576205,
  datetime.datetime(2018, 1, 15, 14, 10),
  datetime.datetime(2018, 1, 15, 14, 10),
  '85:849:104861-01912-1',
  '916'),
 (8576205,
  8576204,
  datetime.datetime(2018, 1, 15, 14, 10),
  datetime.datetime(2018, 1, 15, 14, 11),
  '85:849:104861-01912-1',
  '916'),
 (8576204,
  8576203,
  datetime.datetime(2018, 1, 15, 14, 11),
  datetime.datetime(2018, 1, 15, 14, 13),
  '85:849:104861-01912-1',
  '916'),
 (8576203,
  8576202,
  datetime.datetime(2018, 1, 15, 14, 13),
  datetime.datetime(2018, 1, 15, 14, 14),
  '85:849:104861-01912-1',
  '916'),
 (8576202,
  8576201,
  datetime.datetime(2018, 1, 15, 14, 14),
  datetime.datetime(2018, 1, 15, 14, 15),
  '85:849:104861-01912-1',
  '916'),
 (8576201,
  8576182,
  datetime.datetime(2018, 1, 15, 14, 15),
  datetime.datetime(2018, 1, 15, 14, 16),
  '85:849:104861-01912-1',
  '916'),
 (8576182,
  8576200,
  datetime.datetime(2018, 1, 15, 14, 17),
  datetime.datetime(2018, 1, 15, 14, 18),
  '85:3849:80930-02004-1',
  '4'),
 (8576200,
  8576199,
  datetime.datetime(2018, 1, 15, 14, 18),
  datetime.datetime(2018, 1, 15, 14, 19),
  '85:3849:80930-02004-1',
  '4'),
 (8576199,
  8576198,
  datetime.datetime(2018, 1, 15, 14, 19),
  datetime.datetime(2018, 1, 15, 14, 20),
  '85:3849:80930-02004-1',
  '4'),
 (8576198,
  8576197,
  datetime.datetime(2018, 1, 15, 14, 20),
  datetime.datetime(2018, 1, 15, 14, 21),
  '85:3849:80930-02004-1',
  '4'),
 (8576197,
  8576196,
  datetime.datetime(2018, 1, 15, 14, 21),
  datetime.datetime(2018, 1, 15, 14, 22),
  '85:3849:80930-02004-1',
  '4'),
 (8576196,
  8576195,
  datetime.datetime(2018, 1, 15, 14, 23),
  datetime.datetime(2018, 1, 15, 14, 24),
  '85:3849:80930-02004-1',
  '4'),
 (8576195,
  8503003,
  datetime.datetime(2018, 1, 15, 14, 24),
  datetime.datetime(2018, 1, 15, 14, 25, 44, 784423),
  'walk',
  'walk'),
 (8503003,
  8503000,
  datetime.datetime(2018, 1, 15, 14, 27),
  datetime.datetime(2018, 1, 15, 14, 29),
  '85:11:18654:001',
  'S6'),
 (8503000,
  8503020,
  datetime.datetime(2018, 1, 15, 14, 29),
  datetime.datetime(2018, 1, 15, 14, 31),
  '85:11:18352:001',
  'S3'),
 (8503020,
  8503001,
  datetime.datetime(2018, 1, 15, 14, 31),
  datetime.datetime(2018, 1, 15, 14, 35),
  '85:11:18352:001',
  'S3'),
 (8503001,
  8591057,
  datetime.datetime(2018, 1, 15, 14, 35),
  datetime.datetime(2018, 1, 15, 14, 36, 9, 650259),
  'walk',
  'walk'),
 (8591057,
  8591434,
  datetime.datetime(2018, 1, 15, 14, 39),
  datetime.datetime(2018, 1, 15, 14, 41),
  '85:849:302080-32301-1',
  '304'),
 (8591434,
  8591402,
  datetime.datetime(2018, 1, 15, 14, 41),
  datetime.datetime(2018, 1, 15, 14, 41),
  '85:849:91676-18089-1',
  '89'),
 (8591402,
  8591197,
  datetime.datetime(2018, 1, 15, 14, 41),
  datetime.datetime(2018, 1, 15, 14, 42),
  '85:849:302080-32301-1',
  '304'),
 (8591197,
  8591436,
  datetime.datetime(2018, 1, 15, 14, 42),
  datetime.datetime(2018, 1, 15, 14, 43),
  '85:849:302080-32301-1',
  '304'),
 (8591436,
  8591136,
  datetime.datetime(2018, 1, 15, 14, 43),
  datetime.datetime(2018, 1, 15, 14, 45),
  '85:849:302080-32301-1',
  '304'),
 (8591136,
  8590725,
  datetime.datetime(2018, 1, 15, 14, 46),
  datetime.datetime(2018, 1, 15, 14, 47),
  '85:849:302080-32301-1',
  '304'),
 (8590725,
  8590726,
  datetime.datetime(2018, 1, 15, 14, 47),
  datetime.datetime(2018, 1, 15, 14, 48),
  '85:849:302080-32301-1',
  '304'),
 (8590726,
  8590728,
  datetime.datetime(2018, 1, 15, 14, 48),
  datetime.datetime(2018, 1, 15, 14, 49),
  '85:849:302080-32301-1',
  '304'),
 (8590728,
  8590727,
  datetime.datetime(2018, 1, 15, 14, 49),
  datetime.datetime(2018, 1, 15, 14, 50),
  '85:849:302080-32301-1',
  '304')]

In [104]:
def search_inter(trip_id, weekday, time_interval):
    interquartile = test_pd[(test_pd.trip_id == trip_id) & 
                            (test_pd.arrival_interval == time_interval) & 
                            (test_pd.weekday == weekday)].interquartile_range
    print(interquartile)
    return float(interquartile) if interquartile.size != 0 else float(0)

def rush_inter(time):
    if (time >= '06:00:00' and time <= '09:00:00') or (time >= '17:00:00' and time <= '19:00:00'):
        return 0
    else:
        return 1    

def routing_algo(path):
    prev_edge = path[0]
    certainty = {}
    for i, edge in enumerate(path):
        if (edge[4] != prev_edge[4]) and (edge[4] != 'walk') and (prev_edge[4] != 'walk'):
            time_for_change = edge[2] - prev_edge[3]
            time_inter = rush_inter(prev_edge[3].time().isoformat())
            inter = search_inter(prev_edge[4], prev_edge[3].weekday(), time_inter)
            if inter:
                certainty[i-1] = min(time_for_change / inter, 1)
            else:
                certainty[i-1] = 1
        prev_edge = edge
    return certainty

In [105]:
routing_algo(test)

Series([], Name: interquartile_range, dtype: float64)
Series([], Name: interquartile_range, dtype: float64)
Series([], Name: interquartile_range, dtype: float64)
Series([], Name: interquartile_range, dtype: float64)


{11: 1, 19: 1, 23: 1, 24: 1}

## PREDICTION / pandas and pyspark

In [None]:
from sklearn import preprocessing
from sklearn.linear_model import LinearRegression
import pandas as pd

In [None]:
le_train_type = preprocessing.LabelEncoder()
le_train_type.fit(arrival_pd_df['train_type'])
# see the class
#list(le_train_type.classes_)

In [None]:
arrival_pd_df['train_type_cat'] = le_train_type.transform(arrival_pd_df['train_type'])
# we can do the inverse
# arrival_pd_df['train_type'] = le_train_type.inverse_transform(arrival_pd_df['train_type_cat'])

In [None]:
arrival_pd_df

In [None]:
arrival_pd_df.weekday = arrival_pd_df.weekday.astype(int)
arrival_pd_df.arrival_interval = arrival_pd_df.arrival_interval.astype(int)
arrival_pd_df.arrival_interval = arrival_pd_df.arrival_interval.astype(int)

arrival_pd_df.dtypes

In [None]:
week_df = arrival_pd_df[arrival_pd_df.weekday == 0]
week_df.delay_arrival = week_df.delay_arrival/60
week_df

In [None]:
lin_regr = LinearRegression()
lin_regr.fit(week_df[['stop_id', 'next_sid', 'arrival_interval', 'train_type_cat']], week_df.delay_arrival)
pred = lin_regr.predict(week_df[['stop_id', 'next_sid', 'arrival_interval', 'train_type_cat']])
lin_regr.score(week_df[['stop_id', 'next_sid', 'arrival_interval', 'train_type_cat']],  week_df.delay_arrival)

In [None]:
week_df['pred'] = pred
week_df

In [None]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

trip_Indexer = StringIndexer(inputCol="trip_id", outputCol="trip_id_index")
trip_model = trip_Indexer.fit(arrival_df)

arrival_df = trip_model.transform(arrival_df)

In [None]:
arrival_df = arrival_df.drop('trip_id', 'transport_type')

In [None]:
arrival_df = arrival_df.withColumn('weekday', arrival_df['weekday'].cast('int'))
arrival_df = arrival_df.withColumn('arrival_interval', arrival_df['arrival_interval'].cast('int'))

In [None]:
arrival_df = arrival_df.withColumn('delay_arrival', arrival_df['delay_arrival'] / 60)

In [None]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler


vec_ass = VectorAssembler(inputCols=['stop_id','weekday','arrival_interval','trip_id_index','transport_type_index'], outputCol="features")

#arrival_df = vec_ass.transform(arrival_df)
lr = LinearRegression(maxIter=5, regParam=0.0, solver="normal", 
                      labelCol='delay_arrival')
model = lr.fit(arrival_df)

test = model.transform(arrival_df)

In [None]:
test = test.withColumn('evaluation', test['delay_arrival'] - test['prediction'])

In [None]:
test.describe('evaluation').show()

In [None]:
test.describe('delay_arrival').show()

In [None]:
test.describe('prediction').show()

In [None]:
test_df = arrival_df.limit(50).toPandas()
test_df