###1. Set file paths

In [0]:
from pyspark.sql.functions import expr

departure_delays_file_path = "dbfs:/FileStore/shared_uploads/sergio.salgado@n.world/departuredelays-1.csv"
airport_codes_file_path = "dbfs:/FileStore/shared_uploads/sergio.salgado@n.world/airport_codes_na-1.txt"

###2. Obtain the datasets

#####2.1 Obtain the departure delays dataset

In [0]:
departure_delays = (spark
                    .read
                    .format("csv")
                    .options(header = "true")
                    .load(departure_delays_file_path)
                   )

departure_delays = (departure_delays
                    .withColumn("delay", expr("CAST(delay as INT) as delay"))
                    .withColumn("distance", expr("CAST(distance as INT) as distance"))
                   )

departure_delays.createOrReplaceTempView("departure_delays")

#####2.2 Obtain the airports dataset

In [0]:
airports = (spark
            .read
            .format("csv")
            .options(header="true", inferSchema= "true", sep= "\t")
            .load(airport_codes_file_path)
             )

airports.createOrReplaceTempView("airports")

###3. Create a temporary small table

In [0]:
table_temp = (departure_delays
              .filter(expr("""origin == 'SEA' and destination == 'SFO' and date like '01010%' and delay > 0"""))
             )
table_temp.createOrReplaceTempView("table_temp")

###4. Show the datasets

#####4.1 Show the airports dataset

In [0]:
spark.sql("SELECT * FROM airports LIMIT 10").show()

+-----------+-----+-------+----+
|       City|State|Country|IATA|
+-----------+-----+-------+----+
| Abbotsford|   BC| Canada| YXX|
|   Aberdeen|   SD|    USA| ABR|
|    Abilene|   TX|    USA| ABI|
|      Akron|   OH|    USA| CAK|
|    Alamosa|   CO|    USA| ALS|
|     Albany|   GA|    USA| ABY|
|     Albany|   NY|    USA| ALB|
|Albuquerque|   NM|    USA| ABQ|
| Alexandria|   LA|    USA| AEX|
|  Allentown|   PA|    USA| ABE|
+-----------+-----+-------+----+



#####4.2 Show the departure delays dataset

In [0]:
spark.sql("SELECT * FROM departure_delays LIMIT 10").show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
+--------+-----+--------+------+-----------+



#####4.3 Show the temporary table

In [0]:
spark.sql("SELECT * FROM table_temp LIMIT 10").show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+



###5. Unions

#####5.1 Union two tables

In [0]:
bar = departure_delays.union(table_temp)
bar.createOrReplaceTempView("bar")

#####5.2 Show the union

In [0]:
# filtering for SEA and SFO in a specifict time range
bar.filter(expr("""origin == 'SEA' AND destination == 'SFO' AND date LIKE '01010%' AND delay > 0""")).show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+



###6. Joins

In [0]:
# Join departure delays data (foo) with airport info
table_temp.join(
    airports,
    airports.IATA == table_temp.origin
).select("City", "State", "date", "delay", "distance", "destination").show()

+-------+-----+--------+-----+--------+-----------+
|   City|State|    date|delay|distance|destination|
+-------+-----+--------+-----+--------+-----------+
|Seattle|   WA|01010710|   31|     590|        SFO|
|Seattle|   WA|01010955|  104|     590|        SFO|
|Seattle|   WA|01010730|    5|     590|        SFO|
+-------+-----+--------+-----+--------+-----------+



###7. Windowing

#####7.1 Total review of delays experienced in flights from Seattle(SEA), San Francisco(SFO) and New York City(JFK)

In [0]:
%sql
DROP TABLE IF EXISTS departure_delays_window;

CREATE TABLE departure_delays_window AS
SELECT origin, destination, SUM(delay) AS TotalDelays
FROM departure_delays
WHERE origin IN ('SEA', 'SFO', 'JFK')
AND destination IN ('SEA', 'SFO', 'JFK', 'DEN', 'ORD', 'LAX', 'ATL')
GROUP BY origin, destination;

SELECT * FROM departure_delays_window

origin,destination,TotalDelays
JFK,ORD,5608
JFK,SFO,35619
JFK,DEN,4315
JFK,ATL,12141
JFK,SEA,7856
JFK,LAX,35755
SEA,LAX,9359
SFO,ORD,27412
SFO,DEN,18688
SFO,SEA,17080


#####7.2 Find the three destinations that experienced the most delays with those three ariports as origin

In [0]:
%sql
SELECT origin, destination, SUM(TotalDelays) AS TotalDelays
FROM departure_delays_window
WHERE origin = '[ORIGIN]'
GROUP BY origin, destination
ORDER BY SUM(TotalDelays) DESC
LIMIT 3


#####7.3 Do the same as before but with the dense_rank() window function

In [0]:
%sql
spark.sql("""
SELECT origin, destination, TotalDelays, rank
  FROM (
    SELECT origin, destination, TotalDelays, dense_rank()
      OVER (PARTITION BY origin ORDER BY TotalDelays DESC) as rank
      FROM departure_delays_window
  ) t
  WHERE rank <= 3
""").show()


###8. Modifications

#####8.1 Select the table_temp dataset

In [0]:
table_temp.show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+



#####8.2 Add a new column

In [0]:
from pyspark.sql.functions import expr

table_temp_2 = (table_temp
                .withColumn("satus", expr("CASE WHEN delay <= 10 THEN 'On-time' ELSE 'Delayed' 'END'"))
               )

[0;31m---------------------------------------------------------------------------[0m
[0;31mParseException[0m                            Traceback (most recent call last)
[0;32m<command-374121175550857>[0m in [0;36m<module>[0;34m[0m
[1;32m      2[0m [0;34m[0m[0m
[1;32m      3[0m table_temp_2 = (table_temp
[0;32m----> 4[0;31m                 [0;34m.[0m[0mwithColumn[0m[0;34m([0m[0;34m"satus"[0m[0;34m,[0m [0mexpr[0m[0;34m([0m[0;34m"CASE WHEN delay <= 10 THEN 'On-time' ELSE 'Delayed' 'END'"[0m[0;34m)[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m      5[0m                )

[0;32m/databricks/spark/python/pyspark/sql/functions.py[0m in [0;36mexpr[0;34m(str)[0m
[1;32m   1515[0m     """
[1;32m   1516[0m     [0msc[0m [0;34m=[0m [0mSparkContext[0m[0;34m.[0m[0m_active_spark_context[0m[0;34m[0m[0;34m[0m[0m
[0;32m-> 1517[0;31m     [0;32mreturn[0m [0mColumn[0m[0;34m([0m[0msc[0m[0;34m.[0m[0m_jvm[0m[0;34m.[0m[0mfunctio

#####8.3 Drop the delay column

In [0]:
table_temp_3 = table_temp_2.drop("delay")
table_temp_3.show()

[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
[0;32m<command-374121175550858>[0m in [0;36m<module>[0;34m[0m
[0;32m----> 1[0;31m [0mtable_temp_3[0m [0;34m=[0m [0mtable_temp_2[0m[0;34m.[0m[0mdrop[0m[0;34m([0m[0;34m"delay"[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m      2[0m [0mtable_temp_3[0m[0;34m.[0m[0mshow[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m

[0;31mNameError[0m: name 'table_temp_2' is not defined

#####8.4 Rename the column "status"

In [0]:
table_temp_4 = table_temp_3.withColumnRenamed("status", "flight_status")
table_temp_4.show()

[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
[0;32m<command-374121175550855>[0m in [0;36m<module>[0;34m[0m
[0;32m----> 1[0;31m [0mtable_temp_4[0m [0;34m=[0m [0mtable_temp_3[0m[0;34m.[0m[0mwithColumnRenamed[0m[0;34m([0m[0;34m"status"[0m[0;34m,[0m [0;34m"flight_status"[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m      2[0m [0mtable_temp_4[0m[0;34m.[0m[0mshow[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m

[0;31mNameError[0m: name 'table_temp_3' is not defined

###9. Pivoting

In [0]:
%sql
-- swap the columns for the rows

SELECT destination, CAST(SUBSTRING(date, 0, 2) AS int) AS month, delay
  FROM departure_delays
  WHERE origin = 'SEA'

destination,month,delay
ORD,1,92
JFK,1,-7
DFW,1,-5
MIA,1,-3
DFW,1,-3
DFW,1,1
ORD,1,-10
DFW,1,-6
DFW,1,-2
ORD,1,-3


In [0]:
%sql
-- pivoting to place names in the month column and aggregate calculations on the delays by destination and month

SELECT * FROM (
SELECT destination, CAST(SUBSTRING(date, 0, 2)AS int) AS month, delay
  FROM departure_delays  WHERE origin = 'SEA'
)
PIVOT (
  CAST(AVG(delay) AS DECIMAL(4, 2)) AS AvgDelay, MAX(delay) AS MaxDelay
  FOR month IN (1 JAN, 2 FEB)
)
ORDER BY destination