In [1]:
%load_ext sparkmagic.magics

In [2]:
import os
from IPython import get_ipython
import json

# Identify the current user and the Livy endpoint that hosts the remote Spark sessions.
username = os.environ['RENKU_USERNAME']
server = "http://iccluster029.iccluster.epfl.ch:8998"

# Session settings; the application name is "<your_gaspar_id>-homework3".
# Serialising a dict with json.dumps (instead of hand-writing JSON inside a
# format string) keeps the payload valid JSON even if the username contains
# quotes or backslashes.
spark_session_config = {
    "name": "{0}-homework3".format(username),
    "executorMemory": "4G",
    "executorCores": 4,
    "numExecutors": 10,
    "driverMemory": "4G",
}
get_ipython().run_cell_magic(
    'spark',
    line='config',
    cell=json.dumps(spark_session_config),
)

In [3]:
# Register a Python session named "<username>-homework3" on the server;
# -k skips the creation when a session with that name already exists.
add_session_command = "add -s {0}-homework3 -l python -u {1} -k".format(username, server)
get_ipython().run_line_magic("spark", add_session_command)

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
5351,application_1649870447656_1958,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


In [4]:
%%spark
# Report the Spark version of the remote session.
version_message = 'We are using Spark %s' % spark.version
print(version_message)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

We are using Spark 2.3.2.3.1.4.0-315

In [5]:
%%spark?

[0;31mDocstring:[0m
::

  %spark [-c CONTEXT] [-s SESSION] [-o OUTPUT] [-q [QUIET]]
             [-m SAMPLEMETHOD] [-n MAXROWS] [-r SAMPLEFRACTION] [-u URL]
             [-a USER] [-p PASSWORD] [-t AUTH] [-l LANGUAGE] [-k [SKIP]]
             [-i ID] [-e COERCE]
             [command ...]

Magic to execute spark remotely.

This magic allows you to create a Livy Scala or Python session against a Livy endpoint. Every session can
be used to execute either Spark code or SparkSQL code by executing against the SQL context in the session.
When the SQL context is used, the result will be a Pandas dataframe of a sample of the results.

If invoked with no subcommand, the cell will be executed against the specified session.

Subcommands
-----------
info
    Display the available Livy sessions and other configurations for sessions.
add
    Add a Livy session given a session name (-s), language (-l), and endpoint credentials.
    The -k argument, if present, will skip adding this session if it al

## Confidence

### Import data

In [55]:
%%spark
# Load the pre-filtered stops (';'-separated CSV with a header row) and keep
# only the stop_id column.
stops_table = (
    spark.read
    .options(header=True, delimiter=';')
    .csv("/group/five-guys/stops_table")
    .select("stop_id")
)
stops_table.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- stop_id: string (nullable = true)

In [56]:
%%spark
# Load the istdaten table stored as ORC.
actual_data = spark.read.format('orc').load('/data/sbb/orc/istdaten')
#actual_data.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [57]:
%%spark
# Work on a 0.1% random subsample (fixed seed for reproducibility).
# Note: the result overwrites actual_data, so re-running this cell samples the sample again.
SAMPLE_FRACTION = 0.001
SAMPLE_SEED = 0
actual_data = actual_data.sample(fraction=SAMPLE_FRACTION, seed=SAMPLE_SEED)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Rename columns

The columns are renamed to English as follows:
- `betriebstag` to DATE
- `produkt_id` to PRODUCT_ID
- `fahrt_bezeichner` to TRIP_ID
- `bpuic` to STOP_ID
- `haltestellen_name` to STOP_NAME
- `ankunftszeit` to ARRIVAL_TIME
- `an_prognose_status` to ARRIVAL_TIME_STATUS
- `an_prognose` to ACTUAL_ARRIVAL_TIME
- `abfahrtszeit` to DEPARTURE_TIME
- `ab_prognose_status` to DEPARTURE_TIME_STATUS
- `ab_prognose` to ACTUAL_DEPARTURE_TIME

In [58]:
%%spark

# Keep only the columns used below and give them English names.
# Each entry is (original German column, English alias); the list order is the
# column order of the resulting DataFrame.
column_translation = [
    ('betriebstag', 'DATE'),
    ('produkt_id', 'PRODUCT_ID'),
    ('bpuic', 'STOP_ID'),
    ('haltestellen_name', 'STOP_NAME'),
    ('fahrt_bezeichner', 'TRIP_ID'),
    ('ankunftszeit', 'ARRIVAL_TIME'),
    ('an_prognose_status', 'ARRIVAL_TIME_STATUS'),
    ('an_prognose', 'ACTUAL_ARRIVAL_TIME'),
    ('abfahrtszeit', 'DEPARTURE_TIME'),
    ('ab_prognose_status', 'DEPARTURE_TIME_STATUS'),
    ('ab_prognose', 'ACTUAL_DEPARTURE_TIME'),
]
actual_data = actual_data.select(
    [actual_data[german].alias(english) for german, english in column_translation]
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Filter stop_id

In [59]:
%%spark
# Keep only the rows whose STOP_ID appears in stops_table.
# A left-semi join is a pure filter: it returns only actual_data's columns (so
# no explicit re-selection is needed and the STOP_ID / stop_id pair never ends up
# side by side in one frame), and a stop_id listed more than once in stops_table
# cannot duplicate rows the way an inner join would.
actual_data = actual_data.join(
    stops_table,
    actual_data['STOP_ID'] == stops_table['stop_id'],
    how='left_semi',
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Prepare table

In [60]:
%%spark
import pyspark.sql.functions as F

# Rewrite DATE from 'dd.MM.yyyy' text to ISO-style 'yyyy-MM-dd' text.
operating_day = F.to_timestamp(actual_data.DATE, 'dd.MM.yyyy')
df = actual_data.withColumn('DATE', F.date_format(operating_day, 'yyyy-MM-dd'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [61]:
%%spark
import pyspark.sql.functions as F

# F.dayofweek counts from Sunday (1) to Saturday (7); shift it so that
# Monday becomes 1 and Sunday becomes 7.
spark_weekday = F.dayofweek(df['DATE'])
monday_based_weekday = (spark_weekday + 5) % 7 + 1
df = df.withColumn('DAY_OF_WEEK', monday_based_weekday)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [62]:
%%spark
import pyspark.sql.functions as F

# Drop rows whose arrival or departure status is unknown ('UNBEKANNT').
for status_column in ('ARRIVAL_TIME_STATUS', 'DEPARTURE_TIME_STATUS'):
    df = df.filter(df[status_column] != 'UNBEKANNT')

# Drop rows where any of these fields is an empty string.
non_empty_columns = (
    'ARRIVAL_TIME_STATUS',
    'DEPARTURE_TIME_STATUS',
    'PRODUCT_ID',
    'STOP_ID',
    'STOP_NAME',
    'TRIP_ID',
)
for required_column in non_empty_columns:
    df = df.filter(df[required_column] != '')

# Parse the four time columns into Unix timestamps (seconds).
time_columns = ['ARRIVAL_TIME', 'ACTUAL_ARRIVAL_TIME', 'DEPARTURE_TIME', 'ACTUAL_DEPARTURE_TIME']
for time_column in time_columns:
    df = df.withColumn(time_column, F.unix_timestamp(F.col(time_column), 'dd.MM.yyyy HH:mm'))

# Replace missing timestamps with 0. When the stop is the start or end of a
# journey, the corresponding columns are empty (ANKUNFTSZEIT/ABFAHRTSZEIT).
# NOTE(review): 0 is the Unix epoch, so later hour-of-day logic on these columns
# sees an epoch time for missing values - confirm this is intended.
df = df.na.fill(dict.fromkeys(time_columns, 0))

# Delay in seconds = actual - scheduled, with early events clamped to 0.
arrival_gap = F.col('ACTUAL_ARRIVAL_TIME') - F.col('ARRIVAL_TIME')
df = df.withColumn('ARRIVAL_DELAY', F.when(arrival_gap < 0, 0).otherwise(arrival_gap))
departure_gap = F.col('ACTUAL_DEPARTURE_TIME') - F.col('DEPARTURE_TIME')
df = df.withColumn('DEPARTURE_DELAY', F.when(departure_gap < 0, 0).otherwise(departure_gap))

# Express both delays in whole minutes.
for delay_column in ('ARRIVAL_DELAY', 'DEPARTURE_DELAY'):
    df = df.withColumn(delay_column, F.round(F.col(delay_column) / 60))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [63]:
%%spark

# Hour of day of the scheduled arrival and of the scheduled departure.
hour_columns = (
    ('HOUR_ARRIVAL', 'ARRIVAL_TIME'),
    ('HOUR_DEPARTURE', 'DEPARTURE_TIME'),
)
for hour_column, timestamp_column in hour_columns:
    df = df.withColumn(hour_column, F.hour(F.from_unixtime(F.col(timestamp_column))))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Filter data based on simplifying assumptions

We only consider journeys at reasonable hours of the day, and on a typical business day.
- `day`: from Monday to Friday
- `hours`: first departure at 9:00, last arrival at 17:59

In [64]:
%%spark

# Keep Monday-Friday rows (DAY_OF_WEEK 1..5) that depart at 09:00 or later and
# arrive before 18:00. HOUR_DEPARTURE / HOUR_ARRIVAL already hold
# hour(from_unixtime(...)) of the scheduled times, so they are reused here.
is_business_day = F.col('DAY_OF_WEEK') < 6
departs_late_enough = F.col('HOUR_DEPARTURE') >= 9
arrives_early_enough = F.col('HOUR_ARRIVAL') < 18
df = df.filter(is_business_day & departs_late_enough & arrives_early_enough)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [65]:
%%spark

# Mark the cleaned table for caching: it is selected from twice (df_A and df_B)
# when the stop pairs are built.
df.cache()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[DATE: string, PRODUCT_ID: string, STOP_ID: string, STOP_NAME: string, TRIP_ID: string, ARRIVAL_TIME: bigint, ARRIVAL_TIME_STATUS: string, ACTUAL_ARRIVAL_TIME: bigint, DEPARTURE_TIME: bigint, DEPARTURE_TIME_STATUS: string, ACTUAL_DEPARTURE_TIME: bigint, DAY_OF_WEEK: int, ARRIVAL_DELAY: double, DEPARTURE_DELAY: double, HOUR_ARRIVAL: int, HOUR_DEPARTURE: int]

In [66]:
%%spark

# Pair every departure-stop event with every arrival-stop event of the same date,
# for each (departure_id, arrival_id) listed in connections_table.
# NOTE(review): connections_table is not defined in any visible cell; it has to
# exist in the Spark session before this cell runs.
# NOTE(review): rows are matched on stop and date only, not on trip, so each
# arrival event is repeated once per matching departure event - confirm that
# this weighting is intended.
def _with_prefix(frame, prefix):
    """Return `frame` with every column renamed to `prefix` + original name."""
    return frame.select([F.col(name).alias(prefix + name) for name in frame.columns])

df_A = _with_prefix(df, "A_")
df_B = _with_prefix(df, "B_")

departure_events = connections_table.join(
    df_A,
    connections_table.departure_id == df_A.A_STOP_ID,
    how='inner',
)
same_day_arrival = (
    (departure_events.arrival_id == df_B.B_STOP_ID)
    & (departure_events.A_DATE == df_B.B_DATE)
)
df_1 = departure_events.join(df_B, same_day_arrival)

# Keep only the fields needed for the delay distribution.
df_2 = df_1.select(
    df_1['A_DAY_OF_WEEK'].alias('DAY_OF_WEEK'),
    df_1['A_STOP_ID'].alias('DEPARTURE_ID'),
    df_1['B_STOP_ID'].alias('ARRIVAL_ID'),
    df_1['B_HOUR_ARRIVAL'].alias('HOUR_ARRIVAL'),
    df_1['B_ARRIVAL_DELAY'].alias('ARRIVAL_DELAY'),
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+----------+--------------+------------+--------------------+------+-------+---------+--------+------+----------+------------+---------+--------------------+--------------------+--------------+---------------------+---------------------+----------------+-----------------------+-----------------------+-------------+---------------+-----------------+--------------+----------------+----------+------------+---------+--------------------+--------------------+--------------+---------------------+---------------------+----------------+-----------------------+-----------------------+-------------+---------------+-----------------+--------------+----------------+
|departure_id|arrival_id|departure_time|arrival_time|             trip_id|monday|tuesday|wednesday|thursday|friday|    A_DATE|A_PRODUCT_ID|A_STOP_ID|         A_STOP_NAME|           A_TRIP_ID|A_ARRIVAL_TIME|A_ARRIVAL_TIME_STATUS|A_ACTUAL_ARRIVAL_TIME|A_DEPARTURE_TIME|A_DEPARTURE_TIME_STATUS|A_ACTUAL_DEPARTURE_TIME|A_DAY_OF_

### Comparison of the number of distinct stop pairs in the two tables

In [71]:
%%spark

# Number of distinct (departure, arrival) stop pairs present in df_2.
observed_pairs = df_2.select('DEPARTURE_ID', 'ARRIVAL_ID').distinct()
observed_pairs.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

3145

In [72]:
%%spark

# Number of distinct (departure, arrival) stop pairs listed in connections_table.
listed_pairs = connections_table.select('departure_id', 'arrival_id').distinct()
listed_pairs.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

4235

In [98]:
# NOTE: disabled alternative. Every line of this cell is commented out, so
# running it does nothing. It builds the stop pairs with a full cross join of
# the event table with itself and only afterwards restricts them to the pairs
# listed in connections_table.
# %%spark

# #create all possible pairs between stops
# df_A = df.select([F.col(c).alias("A_"+c) for c in df.columns])
# df_B = df.select([F.col(c).alias("B_"+c) for c in df.columns])
# cp = df_A.crossJoin(df_B)
# #keep only pairs (A_STOP_ID, B_STOP_ID) that are in connections_table

# df_1 = cp.join(connections_table, (connections_table.departure_id == cp.A_STOP_ID) & (connections_table.arrival_id == cp.B_STOP_ID), how='inner')
# #select only interesting fields
# df_2 = df_1.select(df_1.A_DAY_OF_WEEK.alias('DAY_OF_WEEK'),\
#                    df_1.A_STOP_ID.alias('DEPARTURE_ID'),\
#                    df_1.B_STOP_ID.alias('ARRIVAL_ID'),\
#                    df_1.B_HOUR_ARRIVAL.alias('HOUR_ARRIVAL'),\
#                    df_1.trip_id.alias('TRIP_ID'),\
#                    df_1.B_ARRIVAL_DELAY.alias('ARRIVAL_DELAY')
#                   )
# df_2.cache()

### Obtain delay cumulative distribution

In [115]:
%%spark
from pyspark.sql import Window

# Empirical cumulative distribution of ARRIVAL_DELAY within each
# (DAY_OF_WEEK, DEPARTURE_ID, ARRIVAL_ID) group.
group_keys = ['DAY_OF_WEEK', 'DEPARTURE_ID', 'ARRIVAL_ID']

# count_window spans the whole group; cumulative_function_window spans the rows
# of the group whose delay is <= the current row's delay.
count_window = Window.partitionBy(*group_keys)
cumulative_function_window = (
    Window.partitionBy(*group_keys)
    .orderBy('ARRIVAL_DELAY')
    .rangeBetween(Window.unboundedPreceding, Window.currentRow)
)

# One row per (group, delay value) with the number of observations.
df2 = df_2.groupBy(*(group_keys + ['ARRIVAL_DELAY'])).count()

group_total = F.sum("count").over(count_window)
running_total = F.sum("count").over(cumulative_function_window)
df2 = (
    df2
    .withColumn("TOTAL_SUM", group_total)
    .withColumn("PARTIAL_SUM", running_total)
    .withColumn("CUMULATIVE", running_total / group_total)
)

# Spot check: rows matching any one of the three conditions.
spot_check = (
    (df2.DEPARTURE_ID == '8500926')
    | (df2.ARRIVAL_ID == '8590737')
    | (df2.DAY_OF_WEEK == 3)
)
df2.filter(spot_check).show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+------------+----------+-------------+-----+---------+-----------+-------------------+
|DAY_OF_WEEK|DEPARTURE_ID|ARRIVAL_ID|ARRIVAL_DELAY|count|TOTAL_SUM|PARTIAL_SUM|         CUMULATIVE|
+-----------+------------+----------+-------------+-----+---------+-----------+-------------------+
|          3|     8503083|   8591199|          0.0| 4870|     8766|       4870| 0.5555555555555556|
|          3|     8503083|   8591199|          1.0| 3409|     8766|       8279| 0.9444444444444444|
|          3|     8503083|   8591199|          2.0|  487|     8766|       8766|                1.0|
|          3|     8576239|   8576238|          0.0|   18|       36|         18|                0.5|
|          3|     8576239|   8576238|          1.0|   18|       36|         36|                1.0|
|          3|     8591136|   8591320|          0.0| 1200|     1950|       1200| 0.6153846153846154|
|          3|     8591136|   8591320|          1.0|  300|     1950|       1500| 0.7692307692307693|


In [128]:
%%spark

# Keep only the exported columns, in output order; the delay column is
# published under the name MAX_ARRIVAL_DELAY.
df2 = df2.select(
    "DEPARTURE_ID",
    "ARRIVAL_ID",
    "DAY_OF_WEEK",
    df2["ARRIVAL_DELAY"].alias("MAX_ARRIVAL_DELAY"),
    "CUMULATIVE",
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Split based on day

In [129]:
%%spark
# Preview the first 10 rows sorted by stop pair, weekday and delay.
preview_order = ['DEPARTURE_ID', 'ARRIVAL_ID', 'DAY_OF_WEEK', 'MAX_ARRIVAL_DELAY']
df2.sort(*preview_order).show(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+----------+-----------+-----------------+------------------+
|DEPARTURE_ID|ARRIVAL_ID|DAY_OF_WEEK|MAX_ARRIVAL_DELAY|        CUMULATIVE|
+------------+----------+-----------+-----------------+------------------+
|     8500926|   8590737|          1|              0.0|               0.5|
|     8500926|   8590737|          1|              1.0|               1.0|
|     8500926|   8590737|          3|              2.0|               1.0|
|     8502185|   8502248|          1|              0.0|               1.0|
|     8502185|   8502248|          2|              0.0|               1.0|
|     8502185|   8502248|          4|              0.0|               1.0|
|     8502185|   8502248|          5|              0.0|               1.0|
|     8502208|   8502209|          1|              2.0|               1.0|
|     8502208|   8502209|          5|              0.0|               1.0|
|     8502209|   8502208|          1|              0.0|0.6666666666666666|
+------------+----------+

In [130]:
%%spark
# All weekdays: export as ';'-separated CSV with a header, replacing any previous export.
(
    df2.write
    .format("csv")
    .options(delimiter=";", header="true")
    .mode("overwrite")
    .save("/group/five-guys/confidence")
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [71]:
%%spark
# Monday only (DAY_OF_WEEK == 1): export as ';'-separated CSV with a header.
df2_monday = df2.where(df2["DAY_OF_WEEK"] == 1)
(
    df2_monday.write
    .format("csv")
    .options(delimiter=";", header="true")
    .mode("overwrite")
    .save("/group/five-guys/confidence_monday")
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
%%spark
# Tuesday only (DAY_OF_WEEK == 2): export as ';'-separated CSV with a header.
df2_tuesday = df2.where(df2["DAY_OF_WEEK"] == 2)
(
    df2_tuesday.write
    .format("csv")
    .options(delimiter=";", header="true")
    .mode("overwrite")
    .save("/group/five-guys/confidence_tuesday")
)

In [None]:
%%spark
# Wednesday only (DAY_OF_WEEK == 3): export as ';'-separated CSV with a header.
df2_wednesday = df2.where(df2["DAY_OF_WEEK"] == 3)
(
    df2_wednesday.write
    .format("csv")
    .options(delimiter=";", header="true")
    .mode("overwrite")
    .save("/group/five-guys/confidence_wednesday")
)

In [None]:
%%spark
# Thursday only (DAY_OF_WEEK == 4): export as ';'-separated CSV with a header.
df2_thursday = df2.where(df2["DAY_OF_WEEK"] == 4)
(
    df2_thursday.write
    .format("csv")
    .options(delimiter=";", header="true")
    .mode("overwrite")
    .save("/group/five-guys/confidence_thursday")
)

In [None]:
%%spark
# Friday only (DAY_OF_WEEK == 5): export as ';'-separated CSV with a header.
df2_friday = df2.where(df2["DAY_OF_WEEK"] == 5)
(
    df2_friday.write
    .format("csv")
    .options(delimiter=";", header="true")
    .mode("overwrite")
    .save("/group/five-guys/confidence_friday")
)