### Load CSV file

In [0]:
# Where the input lives and which Spark reader format to use
file_location = "/FileStore/tables/Data_Engineer_exercise-1.csv"
file_type = "csv"

# Reader settings; these only matter for CSV input and are ignored by other formats.
infer_schema = "false"  # schema inference is off; later cells slice the timestamp columns as strings
first_row_is_header = "true"
delimiter = ";"

# Collect the reader options in one place, then apply them one by one.
csv_options = {
    "inferSchema": infer_schema,
    "header": first_row_is_header,
    "sep": delimiter,
}

reader = spark.read.format(file_type)
for option_name, option_value in csv_options.items():
    reader = reader.option(option_name, option_value)

# Raw flights DataFrame used by every later cell.
df = reader.load(file_location)

display(df)

orig,dest,id,actl_dep_lcl_tms,actl_arr_lcl_tms,flight_num,flights,acft_regs_cde,airborne_lcl_tms,landing_lcl_tms
YYZ,YVR,1,2022-12-31T20:36:00,2022-12-31T22:28:00,127,1,737,2022-12-31T21:02:00,2022-12-31T22:17:00
YYZ,YVR,2,2022-12-31T19:39:00,2022-12-31T21:22:00,185,1,451,2022-12-31T20:05:00,2022-12-31T21:14:00
YYZ,YVR,3,2022-12-31T18:53:00,2022-12-31T20:33:00,123,1,843,2022-12-31T19:10:00,2022-12-31T20:22:00
YYZ,YVR,4,2022-12-31T17:27:00,2022-12-31T19:00:00,121,1,747,2022-12-31T17:43:00,2022-12-31T18:53:00
YYZ,YVR,5,2022-12-31T16:44:00,2022-12-31T18:31:00,119,1,464,2022-12-31T16:56:00,2022-12-31T18:23:00
YYZ,YVR,6,2022-12-31T14:35:00,2022-12-31T16:43:00,113,1,743,2022-12-31T15:04:00,2022-12-31T16:35:00
YYZ,YVR,7,2022-12-31T13:08:00,2022-12-31T14:53:00,111,1,462,2022-12-31T13:20:00,2022-12-31T14:46:00
YYZ,YVR,8,2022-12-31T11:23:00,2022-12-31T13:03:00,105,1,735,2022-12-31T11:40:00,2022-12-31T12:54:00
YYZ,YVR,9,2022-12-31T10:18:00,2022-12-31T12:27:00,107,1,457,2022-12-31T10:39:00,2022-12-31T12:08:00
YYZ,YVR,10,2022-12-31T08:50:00,2022-12-31T10:38:00,103,1,451,2022-12-31T09:13:00,2022-12-31T10:28:00


### Imports

In [0]:
from pyspark.sql.functions import *
from datetime import datetime
import datetime
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.sql.functions import *

### Exercise 1
#### The takeoff order is computed from the `airborne_lcl_tms` field: the dataset is sorted by this field, and each flight's next flight is obtained with the `lead` window function.

In [0]:
# Window that orders flights by their airborne (takeoff) timestamp.
# NOTE(review): the partition key `flights` is 1 on every row of the displayed
# sample, so the ordering is effectively global - confirm this is intended.
wind = Window.partitionBy('flights').orderBy('airborne_lcl_tms')

# One row per flight: its position in the takeoff sequence, its key timestamps
# under friendlier names, and the id of the flight that takes off right after it
# (null for the last flight of the window).
df_string_flights = df.select(
    row_number().over(wind).alias('takeoff_order'),
    col('airborne_lcl_tms').alias('takeoff_time'),
    'id',
    'flight_num',
    'orig',
    'dest',
    col('actl_dep_lcl_tms').alias('departure_time'),
    col('actl_arr_lcl_tms').alias('arrival_time'),
    col('landing_lcl_tms').alias('landing_time'),
    lead('id', 1).over(wind).alias('next_flight'),
)
display(df_string_flights)

takeoff_order,takeoff_time,id,flight_num,orig,dest,departure_time,arrival_time,landing_time,next_flight
1,2022-12-31T01:01:00,21,128,YVR,YYZ,2022-12-31T00:29:00,2022-12-31T08:27:00,2022-12-31T08:11:00,20.0
2,2022-12-31T06:23:00,20,100,YVR,YYZ,2022-12-31T06:04:00,2022-12-31T13:24:00,2022-12-31T13:16:00,19.0
3,2022-12-31T08:24:00,19,104,YVR,YYZ,2022-12-31T08:07:00,2022-12-31T15:19:00,2022-12-31T15:10:00,10.0
4,2022-12-31T09:13:00,10,103,YYZ,YVR,2022-12-31T08:50:00,2022-12-31T10:38:00,2022-12-31T10:28:00,18.0
5,2022-12-31T10:02:00,18,108,YVR,YYZ,2022-12-31T09:48:00,2022-12-31T16:58:00,2022-12-31T16:50:00,17.0
6,2022-12-31T10:23:00,17,110,YVR,YYZ,2022-12-31T10:08:00,2022-12-31T17:27:00,2022-12-31T17:15:00,9.0
7,2022-12-31T10:39:00,9,107,YYZ,YVR,2022-12-31T10:18:00,2022-12-31T12:27:00,2022-12-31T12:08:00,8.0
8,2022-12-31T11:40:00,8,105,YYZ,YVR,2022-12-31T11:23:00,2022-12-31T13:03:00,2022-12-31T12:54:00,16.0
9,2022-12-31T12:10:00,16,106,YVR,YYZ,2022-12-31T11:55:00,2022-12-31T19:10:00,2022-12-31T18:55:00,15.0
10,2022-12-31T13:05:00,15,114,YVR,YYZ,2022-12-31T12:47:00,2022-12-31T19:59:00,2022-12-31T19:48:00,7.0


### Exercise 2

In [0]:
# Keep the identifying columns as they are and turn the departure/arrival
# columns (read as strings) into real timestamps, preserving their names.
timestamp_columns = ['actl_dep_lcl_tms', 'actl_arr_lcl_tms']
df_cast = df.select(
    'orig', 'dest', 'id', 'flight_num', 'flights',
    *[col(name).cast(TimestampType()).alias(name) for name in timestamp_columns],
)


In [0]:
# Bounds of the reporting window: midnight of the earliest departure day up to
# the last second of the latest arrival day.
# NOTE: `min` / `max` below are the Spark aggregate functions brought in by the
# star import of pyspark.sql.functions, not the Python builtins; they run on the
# string columns, and the first 10 characters are the date part of each value.
timestamp_format = '%Y-%m-%d %H:%M:%S'
first_departure = df.agg(min('actl_dep_lcl_tms')).first()[0]
last_arrival = df.agg(max('actl_arr_lcl_tms')).first()[0]
min_date = datetime.datetime.strptime(first_departure[:10] + " 00:00:00", timestamp_format)
max_date = datetime.datetime.strptime(last_arrival[:10] + " 23:59:59", timestamp_format)

# Width of each reporting interval (15 minutes)
delta_minutes = datetime.timedelta(minutes = 15)

# Walk from min_date in 15-minute steps, keeping every interval start that
# falls before max_date.
datetime_list = []
interval_start = min_date
while interval_start < max_date:
    datetime_list.append(interval_start)
    interval_start = interval_start + delta_minutes

# Single-column DataFrame holding every interval start from the minimum date
# to the maximum date, 15 minutes apart.
df_dates = spark.createDataFrame(datetime_list, TimestampType())
df_dates = df_dates.withColumnRenamed('value', 'interval_date')

In [0]:

# Length of one reporting interval as a Spark interval expression, derived from
# `delta_minutes` so it always matches the step used to build `df_dates`.
interval_length = expr(f"INTERVAL {int(delta_minutes.total_seconds())} SECONDS")

# Cartesian product of origin airport codes and interval start timestamps.
# The upper bound of each interval is computed arithmetically instead of with
# lead(): lead() yields NULL for the last interval of every airport, and a NULL
# bound makes the range condition below unmatchable, so departures in the final
# interval of the window were silently dropped.
df_out_dates = df.select(col('orig').alias('orig1')).distinct()\
                .join(df_dates, on=None, how='cross')\
                .withColumn('next_interval', col('interval_date') + interval_length)

# A flight belongs to an interval when it leaves that airport at or after the
# interval start and strictly before the interval end.
departs_in_interval = (df_out_dates.orig1 == df_cast.orig) &\
                      (df_cast.actl_dep_lcl_tms >= df_out_dates.interval_date) &\
                      (df_cast.actl_dep_lcl_tms < df_out_dates.next_interval)

# Departures per airport and interval: the left join keeps intervals without any
# flight (flagged out = 0) and emits one row per departing flight (out = 1).
df_flights_out = df_out_dates\
                .join(df_cast, on = departs_in_interval, how = 'left')\
                .withColumn('out', when(col('orig').isNull(), lit(0)).otherwise(lit(1)))\
                .select('orig1', 'interval_date', 'next_interval', 'out')
            

In [0]:
# Length of one reporting interval as a Spark interval expression, derived from
# `delta_minutes` so it always matches the step used to build `df_dates`.
interval_length = expr(f"INTERVAL {int(delta_minutes.total_seconds())} SECONDS")

# Cartesian product of destination airport codes and interval start timestamps.
# The upper bound of each interval is computed arithmetically instead of with
# lead(): lead() yields NULL for the last interval of every airport, and a NULL
# bound makes the range condition below unmatchable, so arrivals in the final
# interval of the window (e.g. late in the evening of the last arrival day)
# were silently dropped.
df_in_dates = df.select(col('dest').alias('dest1')).distinct()\
                .join(df_dates, on=None, how='cross')\
                .withColumn('next_interval', col('interval_date') + interval_length)

# A flight belongs to an interval when it arrives at that airport at or after
# the interval start and strictly before the interval end.
arrives_in_interval = (df_in_dates.dest1 == df_cast.dest) &\
                      (df_cast.actl_arr_lcl_tms >= df_in_dates.interval_date) &\
                      (df_cast.actl_arr_lcl_tms < df_in_dates.next_interval)

# Arrivals per airport and interval: the left join keeps intervals without any
# flight (flagged in = 0) and emits one row per arriving flight (in = 1).
df_flights_in = df_in_dates\
                .join(df_cast, on = arrives_in_interval, how = 'left')\
                .withColumn('in', when(col('dest').isNull(), lit(0)).otherwise(lit(1)))\
                .select('dest1', 'interval_date', 'next_interval', 'in')


In [0]:
# Final dataset with flights in and out per airport and 15-minute interval.
#
# df_flights_out / df_flights_in hold one row per flight (or a single zero row
# for an empty interval). Joining them directly multiplies the rows of busy
# intervals: N departures and M arrivals in the same interval become N x M rows,
# which inflates both sums. Each side is therefore collapsed to one row per
# (airport, interval) BEFORE the join.
# NOTE: `sum` is the Spark aggregate from the pyspark.sql.functions star import,
# not the Python builtin.
departures_per_interval = df_flights_out\
            .select(col('orig1').alias('airport_code'), 'interval_date', 'out')\
            .groupBy('airport_code', 'interval_date')\
            .agg(sum('out').alias('out'))

arrivals_per_interval = df_flights_in\
            .select(col('dest1').alias('airport_code'), 'interval_date', 'in')\
            .groupBy('airport_code', 'interval_date')\
            .agg(sum('in').alias('in'))

# Both sides now have exactly one row per key, so the join is one-to-one.
# NOTE(review): the inner join keeps only airports that appear both as an origin
# and as a destination, as before - confirm whether one-way airports should be
# reported too.
df_final = departures_per_interval\
            .join(arrivals_per_interval, on = ['airport_code', 'interval_date'], how = 'inner')\
            .withColumn('interval_date_minus_two_hours', col('interval_date') - expr("INTERVAL 2 HOURS"))\
            .select('airport_code', 'interval_date', 'interval_date_minus_two_hours', 'out', 'in')\
            .orderBy('airport_code', 'interval_date')

display(df_final)

airport_code,interval_date,interval_date_minus_two_hours,out,in
YVR,2022-12-31T00:00:00.000+0000,2022-12-30T22:00:00.000+0000,0,0
YVR,2022-12-31T00:15:00.000+0000,2022-12-30T22:15:00.000+0000,1,0
YVR,2022-12-31T00:30:00.000+0000,2022-12-30T22:30:00.000+0000,0,0
YVR,2022-12-31T00:45:00.000+0000,2022-12-30T22:45:00.000+0000,0,0
YVR,2022-12-31T01:00:00.000+0000,2022-12-30T23:00:00.000+0000,0,0
YVR,2022-12-31T01:15:00.000+0000,2022-12-30T23:15:00.000+0000,0,0
YVR,2022-12-31T01:30:00.000+0000,2022-12-30T23:30:00.000+0000,0,0
YVR,2022-12-31T01:45:00.000+0000,2022-12-30T23:45:00.000+0000,0,0
YVR,2022-12-31T02:00:00.000+0000,2022-12-31T00:00:00.000+0000,0,0
YVR,2022-12-31T02:15:00.000+0000,2022-12-31T00:15:00.000+0000,0,0
