In [1]:
%matplotlib inline
import matplotlib.pylab as plt
plt.rcParams['figure.figsize'] = (10,6)
plt.rcParams['font.size'] = 18
plt.style.use('fivethirtyeight')

In [2]:
import getpass
import pyspark
from pyspark.sql import SparkSession

conf = pyspark.conf.SparkConf()
conf.setMaster('yarn')
conf.setAppName('final_proj-{0}'.format(getpass.getuser()))
conf.set('spark.executor.memory', '4g')
conf.set('spark.executor.instances', '10')
conf.set('spark.port.maxRetries', '100')
sc = pyspark.SparkContext.getOrCreate(conf)
conf = sc.getConf()
sc

In [3]:
spark = SparkSession(sc)

In [4]:
df = spark.read.csv('/datasets/project/istdaten/*/*/*', sep=';', header=True)

First, we rename the columns in English language:

In [5]:
columns = 'TripDate string, TripId string, OperatorId string, OperatorAbbrv string, OperatorName string, ProductId string, LineId string, LineType string, UmlaufId string, TransportType string, AdditionalTrip boolean, FailedTrip boolean, BPUIC string, StopName string, ArrivalTimeScheduled string, ArrivalTimeActual string, ArrivalTimeActualStatus string,     DepartureTimeScheduled string, DepartureTimeActual string, DepartureTimeActualStatus string, SkipStation boolean'
columns = list(map(lambda x: x.split()[0],columns.split(',')))

for old, new in zip(df.columns, columns):
    df = df.withColumnRenamed(old, new)

# Computing the quality of a transfer

## Assumptions: 

   * everytime when making a transfer in a station, the traveler needs one minute for actually changing transport.
   * even though a train departs late all the time in a specific station, the trip planner will never use the fact that it does so, so we will only take into consideration the early departures and the correct ones. 

## Main idea: 

The idea behind computing the quality of a specific transfer given the *expected arrival hour* in the station and the *expected departure hour* from that same station, and some *extra information* regarding the trip before the transfer and the one after the transfer:

   * First, we compute the **discrete distribution of arrival delays $\mathcal{D}_a$** in that station, given the information of the trip before the transfer.
   * Then, we compute the **discrete distribution of negative departure delays $\mathcal{D}_d$** in that station, given the information of the trip after the transfer.
   * Next, we compute the probability of successfully realizing the transfer, by computing a convolution between the two given distributions. Therefore, assuming that the time of transfer is $k$ minutes, then we would simply compute:
      
      $\sum\limits_{t_a }\Pr[\mathcal{D}_a = t_a] \cdot \Pr[\mathcal{D}_d = k-1+t_a]$,
      
       where we have taken into consideration the minute needed by the traveler for changing the transport. 
       
---
       
Therefore, we first need to decide what are the features which will decide the distributions of the delays. For that, we will use a **Decision Tree Regressor**, selecting several features which might be important from the data, and the target label will be the delay for each datapoint, expressed in seconds. Then, we will train the regressor on both departures and arrivals data, and will look into which are the most important features in each case, for making a good prediction of the delay time. 

We have to emphasize that we considered this method, because of the way that Decision Trees decide which are the most important feature, i.e. the one which have the most variance of delays between the different values for the specific feature. 

After constructing the Decision Tree and deciding which are the most important features, we will construct the distributions of the delays from the **actual data**, by grouping the datapoints with the same value for the decisive features, and making the distribution of delays for each group.

We decided to use the actual data instead of modelling the distribution of delays using a fixed distribution family (e.g. Log-normal or Gamma distributions), because we consider that the actual data is more relevant, then considering just an estimator or to assume that it follows a distribution in a family of distributions.

## Constructing the Decision Tree Regressor

The first step in constructing the Decision Tree Regressor is to construct some potential important features from the given data, and also to compute the delays for each datapoint:

In [6]:
from pyspark.sql.functions import unix_timestamp, to_timestamp

DATE_FORMAT_SCHEDULED = 'dd.MM.yyyy HH:mm' 
DATE_FORMAT_ACTUAL = 'dd.MM.yyyy HH:mm:ss' # both formats are used

df_processed = df.withColumn('ArrivalTimeScheduledDate', to_timestamp(df.ArrivalTimeScheduled, DATE_FORMAT_SCHEDULED))
df_processed = df_processed.withColumn('DepartureTimeScheduledDate', to_timestamp(df_processed.DepartureTimeScheduled, DATE_FORMAT_SCHEDULED))

df_processed = df_processed.withColumn('ArrivalTimeScheduled', unix_timestamp(df_processed.ArrivalTimeScheduled, DATE_FORMAT_SCHEDULED))
df_processed = df_processed.withColumn('ArrivalTimeActual', unix_timestamp(df_processed.ArrivalTimeActual, DATE_FORMAT_ACTUAL))
df_processed = df_processed.withColumn('DepartureTimeScheduled', unix_timestamp(df_processed.DepartureTimeScheduled, DATE_FORMAT_SCHEDULED))
df_processed = df_processed.withColumn('DepartureTimeActual', unix_timestamp(df_processed.DepartureTimeActual, DATE_FORMAT_ACTUAL))

Let's look into how the data looks so far:

In [7]:
df_processed.head()

Row(TripDate='13.09.2017', TripId='80:06____:17010:000', OperatorId='80:06____', OperatorAbbrv='DB', OperatorName='DB Regio AG', ProductId='Zug', LineId='17010', LineType='RE', UmlaufId=None, TransportType='RE', AdditionalTrip='false', FailedTrip='false', BPUIC='8500090', StopName='Basel Bad Bf', ArrivalTimeScheduled=None, ArrivalTimeActual=None, ArrivalTimeActualStatus='PROGNOSE', DepartureTimeScheduled=1505274300, DepartureTimeActual=1505274300, DepartureTimeActualStatus='PROGNOSE', SkipStation='false', ArrivalTimeScheduledDate=None, DepartureTimeScheduledDate=datetime.datetime(2017, 9, 13, 5, 45))

Next, we also add the hour of departure and of the arrival to the dataset:

In [8]:
from pyspark.sql.types import FloatType, StringType
from pyspark.sql.functions import hour, to_date, date_format, month

df_to_classify = df_processed.select(
    df_processed.LineId.alias('line_id'), 
    df_processed.ProductId.alias('product_id'), 
    df_processed.StopName.alias('stop_name'),
    df_processed.AdditionalTrip.alias('additional_trip'), 
    hour(df_processed.ArrivalTimeScheduledDate).alias("arrival_hour").astype(StringType()),
    hour(df_processed.DepartureTimeScheduledDate).alias("departure_hour").astype(StringType()),
    date_format(to_date(df_processed.TripDate, 'dd.MM.yyyy'), 'u').alias("day_of_week"),
    ((df_processed.ArrivalTimeActual - df_processed.ArrivalTimeScheduled)).alias("delta_arrival").astype(FloatType()),
    ((df_processed.DepartureTimeActual - df_processed.DepartureTimeScheduled)).alias("delta_departure").astype(FloatType()))

df_to_classify.cache()

DataFrame[line_id: string, product_id: string, stop_name: string, additional_trip: string, arrival_hour: string, departure_hour: string, day_of_week: string, delta_arrival: float, delta_departure: float]

In [9]:
df_to_classify.head(5)

[Row(line_id='17010', product_id='Zug', stop_name='Basel Bad Bf', additional_trip='false', arrival_hour=None, departure_hour='5', day_of_week='3', delta_arrival=None, delta_departure=0.0),
 Row(line_id='17012', product_id='Zug', stop_name='Basel Bad Bf', additional_trip='false', arrival_hour=None, departure_hour='6', day_of_week='3', delta_arrival=None, delta_departure=0.0),
 Row(line_id='17013', product_id='Zug', stop_name='Basel Bad Bf', additional_trip='false', arrival_hour='6', departure_hour=None, day_of_week='3', delta_arrival=180.0, delta_departure=None),
 Row(line_id='17014', product_id='Zug', stop_name='Basel Bad Bf', additional_trip='false', arrival_hour=None, departure_hour='9', day_of_week='3', delta_arrival=None, delta_departure=0.0),
 Row(line_id='17015', product_id='Zug', stop_name='Basel Bad Bf', additional_trip='false', arrival_hour='8', departure_hour=None, day_of_week='3', delta_arrival=300.0, delta_departure=None),
 Row(line_id='17016', product_id='Zug', stop_name='

Next, for using the Decision Tree Regressor, and because each feature is in fact categorial, we must each one of them using a *StringIndexer*:

In [10]:
from pyspark.ml.feature import StringIndexer

def transform_dataset(dataset, departure):
    '''
    Function that transforms a dataset, adding for each categorial feature a column, which represents the output of the 
    StringIndexer applied to that column. 
    
    Parameters:
        - dataset: the dataset to be processed
        - departure: True if the dataset is for departures, False otherwise
    '''
    
    line_id_indexer = StringIndexer(inputCol="line_id", outputCol="line_id_cat", handleInvalid='keep') # keep nulls 
    product_id_indexer = StringIndexer(inputCol="product_id", outputCol="product_id_cat", handleInvalid='skip')
    stop_name_indexer = StringIndexer(inputCol="stop_name", outputCol="stop_name_cat", handleInvalid='skip')
    additional_trip_indexer = StringIndexer(inputCol="additional_trip", outputCol="additional_trip_cat", handleInvalid='skip')
    day_of_week_indexer = StringIndexer(inputCol="day_of_week", outputCol="day_of_week_cat", handleInvalid='skip')
    departure_hour_indexer = StringIndexer(inputCol="departure_hour", outputCol="departure_hour_cat", handleInvalid='skip')
    arrival_hour_indexer = StringIndexer(inputCol="arrival_hour", outputCol="arrival_hour_cat", handleInvalid='skip')

    indexers = [line_id_indexer, product_id_indexer, stop_name_indexer, additional_trip_indexer,day_of_week_indexer]
    
    if departure:
        indexers.append(departure_hour_indexer)
    else:
        indexers.append(arrival_hour_indexer)

    indexed = dataset

    for indexer in indexers:
        indexed = indexer.fit(indexed).transform(indexed) # add columns to dataset
        
    return indexed

Next, we use the *VectorAssembler* to construct the column for features, which will be used by the Decision Tree:

In [12]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import VectorIndexer

def compute_features_column(dataset, is_departure):
    '''
    Function that computes the features column for the given dataset.
    
    Parameters:
        - dataset: the dataset to compute the features column for
        - is_departure: True is dataset is used for departures, False otherwise.
    '''
    input_cols = ['line_id_cat', 'product_id_cat', 'stop_name_cat', 'additional_trip_cat', 'day_of_week_cat']
    
    if is_departure:
        input_cols.append('departure_hour_cat') # departure dataset
    else:
        input_cols.append('arrival_hour_cat') # arrival dataset
        
    vector_assembler = VectorAssembler(inputCols = input_cols, outputCol = 'features')
    dataset = transform_dataset(dataset, is_departure) # add categorial features
    
    df_features = vector_assembler.transform(dataset) # add features column
    # Use VectorIndexer to make sure that the added features are recognized as categorical
    
    featureIndexer = \
        VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=100000000).fit(df_features)
    
    df_features = featureIndexer.transform(df_features) # transform features to categorical
    
    if is_departure:
        df_final = df_features.select(df_features.indexedFeatures, df_features.delta_departure.alias("delta"))
    else:
        df_final = df_features.select(df_features.indexedFeatures, df_features.delta_arrival.alias("delta"))
    
    return df_final

Finally, we construct our datasets to input to the Decision Tree:

In [15]:
# Construct departures dataset
df_departure_to_regress = df_to_classify.filter(
    df_to_classify.departure_hour.isNotNull() & # filter only departures
    df_to_classify.delta_departure.isNotNull())

df_departure = compute_features_column(df_departure_to_regress, is_departure=True)

# Construct arrivals dataset
df_arrival_to_regress = df_to_classify.filter(
    df_to_classify.arrival_hour.isNotNull() & # filter only arrivals
    df_to_classify.delta_arrival.isNotNull())

df_arrival = compute_features_column(df_arrival_to_regress, is_departure=False)

Let's check the generated dataframes:

In [16]:
df_departure.head(5)

[Row(indexedFeatures=DenseVector([14483.0, 2.0, 2333.0, 0.0, 2.0, 18.0]), delta=0.0),
 Row(indexedFeatures=DenseVector([10292.0, 2.0, 2333.0, 0.0, 2.0, 11.0]), delta=0.0),
 Row(indexedFeatures=DenseVector([10275.0, 2.0, 2333.0, 0.0, 2.0, 13.0]), delta=0.0),
 Row(indexedFeatures=DenseVector([13971.0, 2.0, 2333.0, 0.0, 2.0, 12.0]), delta=0.0),
 Row(indexedFeatures=DenseVector([14059.0, 2.0, 2333.0, 0.0, 2.0, 8.0]), delta=780.0)]

In [17]:
df_arrival.head(5)

[Row(indexedFeatures=DenseVector([10223.0, 2.0, 2006.0, 0.0, 2.0, 11.0]), delta=180.0),
 Row(indexedFeatures=DenseVector([9633.0, 2.0, 2006.0, 0.0, 2.0, 4.0]), delta=300.0),
 Row(indexedFeatures=DenseVector([12909.0, 2.0, 2006.0, 0.0, 2.0, 12.0]), delta=60.0),
 Row(indexedFeatures=DenseVector([12924.0, 2.0, 2006.0, 0.0, 2.0, 8.0]), delta=300.0),
 Row(indexedFeatures=DenseVector([10172.0, 2.0, 2006.0, 0.0, 2.0, 6.0]), delta=300.0)]

Next, we write the function for training the Decision Tree Regressor:

In [18]:
from pyspark.ml.regression import DecisionTreeRegressor

def train_regressor(dataset):
    dt = DecisionTreeRegressor(featuresCol ='indexedFeatures', labelCol = 'delta', maxBins=100000000, maxDepth=3)
    dt_model = dt.fit(dataset)
    
    return dt_model

Finally, we train the decision trees for both datasets and we extract the most important features:

In [20]:
# Get most important features for departures dataset
regressor_departures = train_regressor(df_departure)
print("Feature importances departures: {}".format(regressor_departures.featureImportances))

# Get most important features for arrivals dataset
regressor_arrivals = train_regressor(df_arrival)
print("Feature importances arrivals: {}".format(regressor_arrivals.featureImportances))

Feature importances departures: (6,[0,2,5],[0.284510290083,0.0974031881251,0.618086521792])
Feature importances arrivals: (6,[0,2,3,5],[0.334593468,0.0606024087098,0.0354578885646,0.569346234726])


So, we can see that the 3 most important features are, in both cases, the *hour*, the *line_id* and the *stop_name*. We can see that everything makes very much sense, because we have big differences of delays between normal hours and rush hours, for example, and also specific stops and routes have usually more delays than the others.

Therefore, we continue by constructing the probability distributions for each possible value of the three most important features.

## Computing the probability distributions 

First, we only consider the three most important features in the two initial datasets. We will consider the unity of time to be the minute from now on, instead of seconds: 

In [24]:
from pyspark.sql.types import IntegerType

df_best_feat_departures = df_departure_to_regress.select(
                df_departure_to_regress.departure_hour,
                df_departure_to_regress.stop_name,
                df_departure_to_regress.line_id,
                (df_departure_to_regress.delta_departure / 60).astype(IntegerType()).alias("delta_minutes"))

df_best_feat_departures = df_best_feat_departures.filter(df_best_feat_departures.delta_minutes <= 0) 
# only keep departures which left on time or earlier, we do not want to base our recommendation on assumption
# that a train or bus leaves with a delay.

df_best_feat_arrival = df_arrival_to_regress.select(
                df_arrival_to_regress.arrival_hour,
                df_arrival_to_regress.stop_name,
                df_arrival_to_regress.line_id,
                (df_arrival_to_regress.delta_arrival / 60).astype(IntegerType()).alias("delta_minutes"))

In [25]:
df_best_feat_departures.head(5)

[Row(departure_hour='5', stop_name='Basel Bad Bf', line_id='17010', delta_minutes=0),
 Row(departure_hour='6', stop_name='Basel Bad Bf', line_id='17012', delta_minutes=0),
 Row(departure_hour='9', stop_name='Basel Bad Bf', line_id='17014', delta_minutes=0),
 Row(departure_hour='10', stop_name='Basel Bad Bf', line_id='17016', delta_minutes=0),
 Row(departure_hour='14', stop_name='Basel Bad Bf', line_id='17024', delta_minutes=0)]

Finally, we want to make the distribution of delays for each possible value of the features, for both departures and arrivals:

In [126]:
from pyspark.sql.functions import collect_list, struct, count, lit

df_departures_grouped_count = df_best_feat_departures.groupby( 
                df_best_feat_departures.departure_hour,
                df_best_feat_departures.stop_name,
                df_best_feat_departures.line_id,
                df_best_feat_departures.delta_minutes).agg(count(lit(1)).alias("count_min")) # add a count for each possible value
        
df_departures_distribution = df_departures_grouped_count.\
                                    groupby('departure_hour', 'stop_name', 'line_id').\
                                    agg(collect_list(struct('delta_minutes', 'count_min')).alias('counts'))

# for each value of (departure_hour, stop_name, line_id), we have a list of the form [(delay_minutes, count)]

In [127]:
df_departures_grouped_count.head(3)

[Row(departure_hour='13', stop_name='Payerne', line_id='12950', delta_minutes=0, count_min=164),
 Row(departure_hour='15', stop_name='Lausanne', line_id='12955', delta_minutes=0, count_min=141),
 Row(departure_hour='7', stop_name='Yverdon-les-Bains', line_id='14518', delta_minutes=0, count_min=137)]

In [128]:
df_departures_distribution.show(2)

+--------------+-----------+-------+--------+
|departure_hour|  stop_name|line_id|  counts|
+--------------+-----------+-------+--------+
|             0|        Bex|   3591|[[0,50]]|
|             0|Biel/Bienne|   7845|[[0,65]]|
+--------------+-----------+-------+--------+
only showing top 2 rows



In [130]:
def compute_key_for_feature_values(hour, line_id, stop_name):
    return '{}#{}#{}'.format(hour, line_id, stop_name)

In [131]:
collected = df_departures_distribution.collect()

distribution_departures = {
    compute_key_for_feature_values(x.departure_hour, x.line_id, x.stop_name) : 
    list(sorted(x.counts, key=lambda y: y[0])) for x in collected}

We do the same now for the arrivals: 

In [132]:
df_arrivals_grouped_count = df_best_feat_arrival.groupby( 
                df_best_feat_arrival.arrival_hour,
                df_best_feat_arrival.stop_name,
                df_best_feat_arrival.line_id,
                df_best_feat_arrival.delta_minutes).agg(count(lit(1)).alias("count_min")) # add a count for each possible value
        
df_arrivals_distribution = df_arrivals_grouped_count.\
                                    groupby('arrival_hour', 'stop_name', 'line_id').\
                                    agg(collect_list(struct('delta_minutes', 'count_min')).alias('counts'))
        
collected = df_arrivals_distribution.collect()

distribution_arrivals = {
    compute_key_for_feature_values(x.arrival_hour, x.line_id, x.stop_name) : 
    list(sorted(x.counts, key=lambda y: y[0])) for x in collected}

We also want to include a default distribution, for the case we have new data, which was not encountered anymore. We will compute it as the distribution of all the data:

In [134]:
df_default_distrib_departures = df_best_feat_departures.groupby('delta_minutes').agg(count(lit(1)).alias("count_min")) # add a count for each possible value
collected_default = df_default_distrib_departures.collect()
default_departures = list(sorted(collected_default, key=lambda x: x[0]))

df_default_distrib_arrivals = df_best_feat_arrival.groupby('delta_minutes').agg(count(lit(1)).alias("count_min")) # add a count for each possible value
collected_default = df_default_distrib_arrivals.collect()
default_arrivals = list(sorted(collected_default, key=lambda x: x[0]))

Next, we add the default values to the dictionary of distributions:

In [135]:
distribution_departures['default'] = default_departures
distribution_arrivals['default'] = default_arrivals

Finally, we transform the counts to probabilities, to be able to compute the final quality faster:

In [147]:
def transform_to_proba(counts_list):
    total_sum = 0
    final_proba = []
    
    for row in counts_list:
        total_sum += row.count_min
        
    for row in counts_list:
        final_proba.append((row.delta_minutes, row.count_min / total_sum))
        
    return final_proba

In [148]:
distribution_departures = {k : transform_to_proba(v) for k, v in distribution_departures.items()}
distribution_arrivals = {k : transform_to_proba(v) for k, v in distribution_arrivals.items()}

We finally write the computed dictionaries to file, to be able to load them later:

In [2]:
import pickle
import os

FILE_DISTRIBUTION_DEPARTURES = 'distrib_departures.pic'
FILE_DISTRIBUTION_ARRIVALS = 'distrib_arrivals.pic'

In [None]:
pickle.dump(distribution_departures, open(FILE_DISTRIBUTION_DEPARTURES, 'wb'))
pickle.dump(distribution_arrivals, open(FILE_DISTRIBUTION_ARRIVALS, 'wb'))

## The exposed API for computing distributions

Finally, the last part is to write a function which receives the features of a specific transfer, and it returns the quality of the transfer, by performing the convolution of the corresponding distributions, using the formula:

$\sum\limits_{t_a }\Pr[\mathcal{D}_a = t_a] \cdot \Pr[\mathcal{D}_d = k-1+t_a]$,
      
where we have taken into consideration the minute needed by the traveler for changing the transport. 
       
Here, we considered $\mathcal{D}_a$ to be the distribution of arrivals and $\mathcal{D}_d$ the distribution of departures.


In [9]:
from datetime import datetime
import time
DATE_FORMAT = '%b %d %Y %H:%M:%S'

class TransferQualityComputer:
    def __init__(self):
        self.distribution_departures = pickle.load(open(FILE_DISTRIBUTION_DEPARTURES, 'rb'))
        self.distribution_arrivals = pickle.load(open(FILE_DISTRIBUTION_ARRIVALS, 'rb'))
        
    def compute_key_for_feature_values(self, hour, line_id, stop_name): # same as before
        return '{}#{}#{}'.format(hour, line_id, stop_name)
    
    def compute_quality(self, arrival_timestamp, departure_timestamp, stop_name, line_id):
        # timestamp in the format: Dec 31 2017 20:40:49,01

        arrival_time = datetime.strptime(arrival_timestamp[:-3], DATE_FORMAT)
        departure_time = datetime.strptime(departure_timestamp[:-3], DATE_FORMAT)

        arrival_hour = arrival_time.hour
        departure_hour = departure_time.hour
        delta_minutes = int((time.mktime(departure_time.timetuple()) - time.mktime(arrival_time.timetuple())) / 60)

        if delta_minutes < 0:
            return 0 # impossible to complete the transfer

        departure_key = self.compute_key_for_feature_values(departure_hour, line_id, stop_name)
        if departure_key in self.distribution_departures:
            departure_dist = self.distribution_departures[departure_key]
        else: 
            departure_dist = self.distribution_departures['default'] # default distribution

        arrival_key = self.compute_key_for_feature_values(arrival_hour, line_id, stop_name)
        if arrival_key in self.distribution_arrivals:
            arrival_dist = self.distribution_arrivals[arrival_key]
        else: 
            arrival_dist = self.distribution_arrivals['default'] # default distribution

        total_proba = 0

        for dep_delay, dep_proba in departure_dist:
            for arr_delay, arr_proba in arrival_dist:

                delta_minutes = ((departure_time - arrival_time).seconds // 60) % 60
                if delta_minutes >= dep_delay + arr_delay + 1:
                    total_proba += (dep_proba * arr_proba)
                else:
                    break

        return total_proba

Testing the code:

In [10]:
computer = TransferQualityComputer()

print(computer.compute_quality('Dec 31 2017 00:40:49,01', 'Dec 31 2017 00:41:58,01', 'Dietikon, Birmensdorferstrasse', '85:849:303'))

0.8011350894443582
