# Compute statistical informations: mean, std and percentiles : 

This notebook computes the mean, std and needed percentiles of waiting time for the arrival time of each transport type, hour and station. It uses the historical data of the SBB. It will write the created data as an ORC file in the home of the user running the notebook.

Note, this notebook takes approximately 30 minutes to run.  

#### Set up spark:

In [None]:
%%configure
{"conf": {
    "spark.app.name": "dslab-group_final"
}}

#### Imports:

In [None]:
import networkx as nx
from geopy.distance import distance as geo_distance
from pyspark.sql import Row
import pyspark.sql.functions as f
from pyspark.sql.functions import *
from pyspark.sql.types import FloatType
from networkx.algorithms.shortest_paths.weighted import dijkstra_path
from scipy.stats import norm

In [None]:
# Load stops in Zürich area
stops_zurich = spark.read.format('orc').load("/user/gottraux/nodes.orc")\
                                        .select('stop_id').distinct().rdd.flatMap(lambda x:x).collect()

In [None]:
# Taken from find_train_type_correspondace.ipynb, to transform `verkehrsmittel_text` in the same format
replace_actual = {
    'BUS': 'Bus', # Buses
    'B': 'Bus',
    'NFB': 'Bus',
    'KB': 'Bus',
    'BAT': 'Bus',
    'Trm': 'Tram', # Trams
    'T': 'Tram',
    'TRAM': 'Tram',
    'ATZ': 'ARZ', #AutoZug
    'D': 'RE', # Regional
    'RB': 'R',
    'M': 'Metro', # Metro
    'ICE': 'IC', # InterCityExpress, but routes.txt doesn't have that category
    'IRE': 'IR', # InterRegioExpress, but routes.txt doesn't have that category
    'BN': '', # Night
    'TN': '',
    'SN': '',
    'BT': '',
    'VAE': '', # Panorama trains in the Alps
    'PE': '',
    'TER': '', # France
    'TE2': '',
    'RJX': '', # International
    'null': '', # Other
    '': ''
}

In [None]:
@udf("string")
def replace_verkehrsmittel_text(text):
    if text in replace_actual.keys():
        return replace_actual[text]
    else:
        return text

In [None]:
actual = spark.read.format('orc').load('/data/sbb/orc/istdaten/')\
                                 .where((col("ankunftszeit") != "") & (col("an_prognose") != ""))\
                                 .select(col('bpuic').alias('stop_id'), # Transform bpuic
                                         replace_verkehrsmittel_text(col('verkehrsmittel_text')).alias('verkehrsmittel_text'), # Translate `verkehrsmittel_text`
                                         from_unixtime(unix_timestamp('ankunftszeit', 'dd.MM.yyy HH:mm')).alias('ankunftszeit'),
                                         from_unixtime(unix_timestamp('an_prognose', 'dd.MM.yyy HH:mm:ss')).alias('an_prognose'),
                                         col('an_prognose_status'))\
                                 .where(col('stop_id').isin(stops_zurich))\
                                 .where(col("verkehrsmittel_text") != "")

In [None]:
actual.show()

In [None]:
actual = (actual.withColumn('hour', hour(col('ankunftszeit')))
               .withColumn('diff', unix_timestamp('an_prognose') - unix_timestamp('ankunftszeit')) # Compute delay
               .select(col('stop_id'), col('verkehrsmittel_text'), col('an_prognose_status'), col('hour'), col('diff')))

# Take only trains at reasonable hours
actual = actual.where((col('hour') >= 8) & (col('hour') <= 20))

# Count how many 'REAL' an_prognose status there are, if there are >= 10 we say that there are enough of them
enough_real_values = actual.where(col('an_prognose_status') == "REAL")\
                        .groupBy('stop_id', 'verkehrsmittel_text', 'hour')\
                        .agg(count('diff').alias('count'))\
                        .withColumn("enough_values", col("count") >= 10)\
                        .select(col('stop_id').alias('stop_id2'), 
                                col('verkehrsmittel_text').alias('verkehrsmittel_text2'), 
                                col('hour').alias('hour2'), col('enough_values'))

# Left join with enough_values
actual = actual.join(enough_real_values, (actual.stop_id == enough_real_values.stop_id2) &\
                                        (actual.verkehrsmittel_text == enough_real_values.verkehrsmittel_text2) &\
                                        (actual.hour == enough_real_values.hour2), "left")\
                    .select('stop_id', 'verkehrsmittel_text', 'an_prognose_status', 'hour', 'diff', 'enough_values')

# Set `enough_values` to False for trains that didn't have any 'REAL' `an_prognose_status`
actual = actual.na.fill(False)

# Take only REAL an_prognose_status if there are enough values
# If there aren't enough we use all the data available
actual = actual.where((~(col("enough_values"))) | (col("an_prognose_status") == "REAL"))

# Compute mean and std
actual = actual.groupBy('stop_id', 'verkehrsmittel_text', 'hour')\
                .agg(mean("diff").alias("mean"), stddev("diff").alias("std"), count("diff").alias('number_of_records'))

In [None]:
@udf("float")
def compute_delay_percentile(mean, std, percentile):
    X = norm(loc=mean, scale=std)
    return float(X.ppf(percentile))

In [None]:
# We now compute and add all the percentiles 90-99
# That means, given the delay distribution how much time do we have to wait to be 'percentile'% sure we'll be arrived by then
# This will be useful when validating paths to only take safe edges
actual = actual.withColumn('p_90', compute_delay_percentile(col('mean'), col('std'), lit(0.9)))\
                .withColumn('p_91', compute_delay_percentile(col('mean'), col('std'), lit(0.91)))\
                .withColumn('p_92', compute_delay_percentile(col('mean'), col('std'), lit(0.92)))\
                .withColumn('p_93', compute_delay_percentile(col('mean'), col('std'), lit(0.93)))\
                .withColumn('p_94', compute_delay_percentile(col('mean'), col('std'), lit(0.94)))\
                .withColumn('p_95', compute_delay_percentile(col('mean'), col('std'), lit(0.95)))\
                .withColumn('p_96', compute_delay_percentile(col('mean'), col('std'), lit(0.96)))\
                .withColumn('p_97', compute_delay_percentile(col('mean'), col('std'), lit(0.97)))\
                .withColumn('p_98', compute_delay_percentile(col('mean'), col('std'), lit(0.98)))\
                .withColumn('p_99', compute_delay_percentile(col('mean'), col('std'), lit(0.99)))

### Save to hdfs:

In [None]:
%%local
import os
username = os.environ['JUPYTERHUB_USER']

In [None]:
%%send_to_spark -i username -t str -n username

In [None]:
actual.write.format("orc").mode('overwrite').save("/user/{}/delay_distribution_percentiles.orc".format(username))