In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
import time

In [12]:
spark = SparkSession.builder.appName("MyApp").config("spark.sql.catalogImplementation", "hive").getOrCreate()

In [3]:
departureDelays = spark.read.csv("data/departuredelays.csv", header=True)
airportsna = spark.read.csv("data/airport-codes-na.txt", header=True, sep="\t", inferSchema=True)
airportsna.createOrReplaceTempView("airports_na")
airportsna.printSchema()
departureDelays.printSchema()

[Stage 2:>                                                          (0 + 1) / 1]

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- IATA: string (nullable = true)

root
 |-- date: string (nullable = true)
 |-- delay: string (nullable = true)
 |-- distance: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- destination: string (nullable = true)



                                                                                

In [4]:
departureDelays = departureDelays.withColumn(
    "delay", expr("CAST(delay as INT) as delay")
).withColumn("distance", expr("CAST(distance as INT) as distance"))
departureDelays.createOrReplaceTempView("departureDelays")
departureDelays.printSchema()

root
 |-- date: string (nullable = true)
 |-- delay: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- destination: string (nullable = true)



In [5]:
foo = departureDelays.filter(
    expr("origin = 'SEA' and destination = 'SFO' and delay > 0 and date like '01010%'")
)
foo.createOrReplaceTempView("foo")


In [6]:
spark.sql("select * from airports_na limit 10").show()

+-----------+-----+-------+----+
|       City|State|Country|IATA|
+-----------+-----+-------+----+
| Abbotsford|   BC| Canada| YXX|
|   Aberdeen|   SD|    USA| ABR|
|    Abilene|   TX|    USA| ABI|
|      Akron|   OH|    USA| CAK|
|    Alamosa|   CO|    USA| ALS|
|     Albany|   GA|    USA| ABY|
|     Albany|   NY|    USA| ALB|
|Albuquerque|   NM|    USA| ABQ|
| Alexandria|   LA|    USA| AEX|
|  Allentown|   PA|    USA| ABE|
+-----------+-----+-------+----+



In [7]:
spark.sql("SELECT * FROM departureDelays LIMIT 10").show()

[Stage 4:>                                                          (0 + 8) / 8]

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
+--------+-----+--------+------+-----------+



                                                                                

In [8]:
spark.sql("SELECT * FROM foo").show()

                                                                                

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+



In [9]:
bar = departureDelays.union(foo)
bar.createOrReplaceTempView("bar")
bar.filter(expr("origin = 'SEA' and destination = 'SFO' and delay > 0 and date like '01010%'")).show()
spark.sql("SELECT * FROM bar WHERE origin = 'SEA' and destination = 'SFO' and delay > 0 and date like '01010%'").show()

                                                                                

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+



                                                                                

In [10]:
foo.join(airportsna, airportsna.IATA == foo.origin).select(
    "City", "State", "date", "delay", "distance", "destination"
).show()
spark.sql(
    "SELECT a.City, a.State, f.date, f.delay, f.distance, f.destination FROM foo f JOIN airports_na a ON a.IATA = f.origin"
).show()

+-------+-----+--------+-----+--------+-----------+
|   City|State|    date|delay|distance|destination|
+-------+-----+--------+-----+--------+-----------+
|Seattle|   WA|01010710|   31|     590|        SFO|
|Seattle|   WA|01010955|  104|     590|        SFO|
|Seattle|   WA|01010730|    5|     590|        SFO|
+-------+-----+--------+-----+--------+-----------+

+-------+-----+--------+-----+--------+-----------+
|   City|State|    date|delay|distance|destination|
+-------+-----+--------+-----+--------+-----------+
|Seattle|   WA|01010710|   31|     590|        SFO|
|Seattle|   WA|01010955|  104|     590|        SFO|
|Seattle|   WA|01010730|    5|     590|        SFO|
+-------+-----+--------+-----+--------+-----------+



In [29]:
start_time = time.time()
departureDelays.join(airportsna, airportsna.IATA == departureDelays.origin).select(
    "City", "State", "date", "delay", "distance", "destination"
)
print(f"Time taken: {(time.time() - start_time) * 1000} ms")

Time taken: 73.36020469665527 ms


In [30]:
start_time = time.time()
departureDelays.createOrReplaceTempView("departureDelays")
spark.sql(
    "SELECT a.City, a.State, d.date, d.delay, d.distance, d.destination FROM departureDelays d JOIN airports_na a ON a.IATA = d.origin"
)
print(f"Time taken: {(time.time() - start_time) * 1000} ms")

Time taken: 31.081199645996094 ms


In [37]:
spark.sql("DROP TABLE IF EXISTS departureDelaysWindow")
spark.sql(
    """CREATE TABLE departureDelaysWindow AS
SELECT
  origin,
  destination,
  SUM(delay) as TotalDelays
FROM
  departureDelays
WHERE
  origin in ('SEA', 'SFO', 'JFK')
  AND destination in ('SEA', 'SFO', 'JFK', 'DEN', 'ORD', 'LAX', 'ATL')
GROUP BY
  origin,
  destination"""
)


23/03/31 11:03:22 WARN ResolveSessionCatalog: A Hive serde table will be created as there is no table provider specified. You can set spark.sql.legacy.createHiveTableByDefault to false so that native data source table will be created instead.


DataFrame[]

In [38]:
spark.sql("SELECT * FROM departureDelaysWindow").show()

+------+-----------+-----------+
|origin|destination|TotalDelays|
+------+-----------+-----------+
|   JFK|        ORD|       5608|
|   JFK|        SFO|      35619|
|   JFK|        DEN|       4315|
|   JFK|        ATL|      12141|
|   JFK|        SEA|       7856|
|   JFK|        LAX|      35755|
|   SEA|        LAX|       9359|
|   SFO|        ORD|      27412|
|   SFO|        DEN|      18688|
|   SFO|        SEA|      17080|
|   SEA|        SFO|      22293|
|   SFO|        ATL|       5091|
|   SEA|        DEN|      13645|
|   SEA|        ATL|       4535|
|   SEA|        ORD|      10041|
|   SFO|        JFK|      24100|
|   SFO|        LAX|      40798|
|   SEA|        JFK|       4667|
+------+-----------+-----------+



In [18]:
spark.sql(
    """
SELECT origin, destination, TotalDelays, rank
FROM (
SELECT origin, destination, TotalDelays, dense_rank()
OVER (PARTITION BY origin ORDER BY TotalDelays DESC) as rank
FROM departureDelaysWindow
) t
WHERE rank <= 3
"""
).show()


+------+-----------+-----------+----+
|origin|destination|TotalDelays|rank|
+------+-----------+-----------+----+
|   JFK|        LAX|      35755|   1|
|   JFK|        SFO|      35619|   2|
|   JFK|        ATL|      12141|   3|
|   SEA|        SFO|      22293|   1|
|   SEA|        DEN|      13645|   2|
|   SEA|        ORD|      10041|   3|
|   SFO|        LAX|      40798|   1|
|   SFO|        ORD|      27412|   2|
|   SFO|        JFK|      24100|   3|
+------+-----------+-----------+----+



In [32]:
foo2 = foo.withColumn("status", expr("CASE WHEN delay <= 10 THEN 'On-time' ELSE 'Delayed' END"))
foo2.show()
foo3 = foo2.drop("delay")
foo3.show()
foo4 = foo3.withColumnRenamed("status", "flight_status")
foo4.show()

+--------+-----+--------+------+-----------+-------+
|    date|delay|distance|origin|destination| status|
+--------+-----+--------+------+-----------+-------+
|01010710|   31|     590|   SEA|        SFO|Delayed|
|01010955|  104|     590|   SEA|        SFO|Delayed|
|01010730|    5|     590|   SEA|        SFO|On-time|
+--------+-----+--------+------+-----------+-------+

+--------+--------+------+-----------+-------+
|    date|distance|origin|destination| status|
+--------+--------+------+-----------+-------+
|01010710|     590|   SEA|        SFO|Delayed|
|01010955|     590|   SEA|        SFO|Delayed|
|01010730|     590|   SEA|        SFO|On-time|
+--------+--------+------+-----------+-------+

+--------+--------+------+-----------+-------------+
|    date|distance|origin|destination|flight_status|
+--------+--------+------+-----------+-------------+
|01010710|     590|   SEA|        SFO|      Delayed|
|01010955|     590|   SEA|        SFO|      Delayed|
|01010730|     590|   SEA|       

In [34]:
spark.sql(
    """SELECT * FROM (
        SELECT destination, CAST(SUBSTRING(date, 0, 2) AS int) AS month, delay
        FROM departureDelays WHERE origin = 'SEA'
       )PIVOT (
            CAST(AVG(delay) AS DECIMAL(4, 2)) AS AvgDelay, MAX(delay) AS MaxDelay
            FOR month IN (1 JAN, 2 FEB)
         )
       ORDER BY destination"""
).show()


[Stage 67:>                                                         (0 + 8) / 8]

+-----------+------------+------------+------------+------------+
|destination|JAN_AvgDelay|JAN_MaxDelay|FEB_AvgDelay|FEB_MaxDelay|
+-----------+------------+------------+------------+------------+
|        ABQ|       19.86|         316|       11.42|          69|
|        ANC|        4.44|         149|        7.90|         141|
|        ATL|       11.98|         397|        7.73|         145|
|        AUS|        3.48|          50|       -0.21|          18|
|        BOS|        7.84|         110|       14.58|         152|
|        BUR|       -2.03|          56|       -1.89|          78|
|        CLE|       16.00|          27|        null|        null|
|        CLT|        2.53|          41|       12.96|         228|
|        COS|        5.32|          82|       12.18|         203|
|        CVG|       -0.50|           4|        null|        null|
|        DCA|       -1.15|          50|        0.07|          34|
|        DEN|       13.13|         425|       12.95|         625|
|        D

                                                                                