### Analyzing airline data with Spark SQL

In [5]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Analyzing airline data") \
    .getOrCreate()

### Exploring SQL query options

In [1]:
from pyspark.sql.types import Row
from datetime import datetime

#### Creating a dataframe with different data types

In [23]:
record = sc.parallelize([Row(id = 1,
                             name = "Jill",
                             active = True,
                             clubs = ['chess', 'hockey'],
                             subjects = {"math": 80, 'english': 56},
                             enrolled = datetime(2014, 8, 1, 14, 1, 5)),
                         Row(id = 2,
                             name = "George",
                             active = False,
                             clubs = ['chess', 'soccer'],
                             subjects = {"math": 60, 'english': 96},
                             enrolled = datetime(2015, 3, 21, 8, 2, 5))
])

In [24]:
record_df = record.toDF()
record_df.show()

+------+---------------+-------------------+---+------+--------------------+
|active|          clubs|           enrolled| id|  name|            subjects|
+------+---------------+-------------------+---+------+--------------------+
|  true|[chess, hockey]|2014-08-01 14:01:05|  1|  Jill|[english -> 56, m...|
| false|[chess, soccer]|2015-03-21 08:02:05|  2|George|[english -> 96, m...|
+------+---------------+-------------------+---+------+--------------------+



#### Register the dataframe as a temporary view

* The view is valid for one session
* This is required to run SQL commands on the dataframe

In [25]:
record_df.createOrReplaceTempView("records")

In [26]:
all_records_df = sqlContext.sql('SELECT * FROM records')

all_records_df.show()

+------+---------------+-------------------+---+------+--------------------+
|active|          clubs|           enrolled| id|  name|            subjects|
+------+---------------+-------------------+---+------+--------------------+
|  true|[chess, hockey]|2014-08-01 14:01:05|  1|  Jill|[english -> 56, m...|
| false|[chess, soccer]|2015-03-21 08:02:05|  2|George|[english -> 96, m...|
+------+---------------+-------------------+---+------+--------------------+



In [27]:
sqlContext.sql('SELECT id, clubs[1], subjects["english"] FROM records').show()

+---+--------+-----------------+
| id|clubs[1]|subjects[english]|
+---+--------+-----------------+
|  1|  hockey|               56|
|  2|  soccer|               96|
+---+--------+-----------------+



In [28]:
sqlContext.sql('SELECT id, NOT active FROM records').show()

+---+------------+
| id|(NOT active)|
+---+------------+
|  1|       false|
|  2|        true|
+---+------------+



### Conditional statements in SQL 

In [29]:
sqlContext.sql('SELECT * FROM records where active').show()

+------+---------------+-------------------+---+----+--------------------+
|active|          clubs|           enrolled| id|name|            subjects|
+------+---------------+-------------------+---+----+--------------------+
|  true|[chess, hockey]|2014-08-01 14:01:05|  1|Jill|[english -> 56, m...|
+------+---------------+-------------------+---+----+--------------------+



In [30]:
sqlContext.sql('SELECT * FROM records where subjects["english"] > 90').show()

+------+---------------+-------------------+---+------+--------------------+
|active|          clubs|           enrolled| id|  name|            subjects|
+------+---------------+-------------------+---+------+--------------------+
| false|[chess, soccer]|2015-03-21 08:02:05|  2|George|[english -> 96, m...|
+------+---------------+-------------------+---+------+--------------------+



#### Global temporary view

* Temporary view shared across multiple sessions
* Kept alive till the Spark application terminates

In [32]:
record_df.createGlobalTempView("global_records")

In [35]:
sqlContext.sql('SELECT * FROM global_temp.global_records').show()

+------+---------------+-------------------+---+------+--------------------+
|active|          clubs|           enrolled| id|  name|            subjects|
+------+---------------+-------------------+---+------+--------------------+
|  true|[chess, hockey]|2014-08-01 14:01:05|  1|  Jill|[english -> 56, m...|
| false|[chess, soccer]|2015-03-21 08:02:05|  2|George|[english -> 96, m...|
+------+---------------+-------------------+---+------+--------------------+



## Working on external dataset with SQL queries

Data Collection

In [21]:
airlinesPath = "datasets/airlines.csv"
flightsPath = "datasets/flights.csv"
airportsPath = "datasets/airports.csv"

Loading External Data As DataFrames

In [22]:
airlines = spark.read\
                .format("csv")\
                .option("header", "true")\
                .load(airlinesPath)

In [23]:
airlines.createOrReplaceTempView("airlines")

In [24]:
df = spark.sql("SELECT * FROM airlines")
df.show(5)

+-----+--------------------+
| Code|         Description|
+-----+--------------------+
|19031|Mackey Internatio...|
|19032|Munz Northern Air...|
|19033|Cochise Airlines ...|
|19034|Golden Gate Airli...|
|19035|  Aeromech Inc.: RZZ|
+-----+--------------------+
only showing top 5 rows



In [25]:
flights = spark.read\
               .format("csv")\
               .option("header", "true")\
               .load(flightsPath)

In [26]:
flights.createOrReplaceTempView("flights")

In [27]:
flights.columns

['date',
 'airlines',
 'flight_number',
 'origin',
 'destination',
 'departure',
 'departure_delay',
 'arrival',
 'arrival_delay',
 'air_time',
 'distance']

### Using count function with python commands

In [28]:
flights_count_py = flights.count()
airlines_count_py = airlines.count()

In [29]:
flights_count_py, airlines_count_py

(476881, 1579)

### Using count function with SQL query

In [30]:
flights_count = spark.sql("SELECT COUNT(*) FROM flights")
airlines_count = spark.sql("SELECT COUNT(*) FROM airlines")

**NOTE: Count( ) Using SQL Query Doesnt Return Integer, It Returns DataFrame **

In [31]:
flights_count, airlines_count

(DataFrame[count(1): bigint], DataFrame[count(1): bigint])

In [32]:
flights_count.collect()[0][0], airlines_count.collect()[0][0]

(476881, 1579)

In [33]:
flights_count = flights_count.collect()[0][0]
airlines_count = airlines_count.collect()[0][0]

### Finding average distance of flights

Summation Of All The Distance

In [34]:
total_distance_DF = spark.sql("SELECT distance FROM flights")\
                         .agg({"distance":"sum"})\
                         .withColumnRenamed("sum(distance)","total_distance")

In [35]:
total_distance_DF.show()

+--------------+
|total_distance|
+--------------+
|  3.79052917E8|
+--------------+



In [36]:
total_distance = total_distance_DF.collect()[0][0] 

Finding Average Distance

In [37]:
avg_distance = total_distance/flights.count()
avg_distance

794.8585013871385

### How many flights got delayed

Fligts With Delay

In [38]:
all_delay = spark.sql("select airlines, flight_number, departure_delay from flights where departure_delay > 0")

In [39]:
all_delay.createOrReplaceTempView("all_delay")

In [40]:
all_delay.orderBy(all_delay.departure_delay.desc()).show(5)

+--------+-------------+---------------+
|airlines|flight_number|departure_delay|
+--------+-------------+---------------+
|   20366|         5246|          99.00|
|   19393|         2948|          99.00|
|   20366|         5365|          99.00|
|   19977|          616|          99.00|
|   20366|         6030|          99.00|
+--------+-------------+---------------+
only showing top 5 rows



Total Number Of Flights Got Delayed

In [41]:
delay_count = spark.sql("SELECT COUNT(departure_delay) FROM all_delay")

In [42]:
delay_count.show()

+----------------------+
|count(departure_delay)|
+----------------------+
|                179015|
+----------------------+



In [43]:
delay_count.collect()[0][0]

179015

### How many percent flights got delayed

In [44]:
delay_percent = delay_count.collect()[0][0] / flights_count * 100
delay_percent

37.53871510922012

### Finding delay per aIrlines

In [45]:
delay_per_airline = spark.sql("SELECT airlines, departure_delay FROM flights")\
                        .groupBy("airlines")\
                        .agg({"departure_delay":"avg"})\
                        .withColumnRenamed("avg(departure_delay)",
                                           "departure_delay")

In [46]:
delay_per_airline.orderBy(delay_per_airline.departure_delay.desc()).show(5)

+--------+------------------+
|airlines|   departure_delay|
+--------+------------------+
|   19393|13.429567657134724|
|   20366|12.296210112379818|
|   19977| 8.818392620527979|
|   20436| 8.716275167785234|
|   20409|  8.31110357194785|
+--------+------------------+
only showing top 5 rows



In [47]:
delay_per_airline.createOrReplaceTempView("delay_per_airline")

In [48]:
delay_per_airline = spark.sql("SELECT * FROM delay_per_airline ORDER BY departure_delay DESC")

In [49]:
delay_per_airline.show(5)

+--------+------------------+
|airlines|   departure_delay|
+--------+------------------+
|   19393|13.429567657134724|
|   20366|12.296210112379818|
|   19977| 8.818392620527979|
|   20436| 8.716275167785234|
|   20409|  8.31110357194785|
+--------+------------------+
only showing top 5 rows



## Join Operation in SQL Commands

Now we know the number of airlines which have delay but we dont know thier names<br>
We need to get their names from different DataFrame

In [50]:
spark.sql("SELECT * FROM delay_per_airline JOIN airlines ON airlines.code = delay_per_airline.airlines").show(5)

+--------+-------------------+-----+--------------------+
|airlines|    departure_delay| Code|         Description|
+--------+-------------------+-----+--------------------+
|   19690|-2.1981308411214955|19690|Hawaiian Airlines...|
|   19930|-0.6991515343747522|19930|Alaska Airlines I...|
|   20437|  5.110621095185594|20437|AirTran Airways C...|
|   19393| 13.429567657134724|19393|Southwest Airline...|
|   19977|  8.818392620527979|19977|United Air Lines ...|
+--------+-------------------+-----+--------------------+
only showing top 5 rows



**Note: We are getting duplicate columns with different names** <br>
      For this we need to drop unwanted column

In [51]:
spark.sql("SELECT * FROM delay_per_airline JOIN airlines ON airlines.code = delay_per_airline.airlines")\
     .drop("code")\
     .show(5)

+--------+-------------------+--------------------+
|airlines|    departure_delay|         Description|
+--------+-------------------+--------------------+
|   19690|-2.1981308411214955|Hawaiian Airlines...|
|   19930|-0.6991515343747522|Alaska Airlines I...|
|   20437|  5.110621095185594|AirTran Airways C...|
|   19393| 13.429567657134724|Southwest Airline...|
|   19977|  8.818392620527979|United Air Lines ...|
+--------+-------------------+--------------------+
only showing top 5 rows



* TODO: How many flights got delayed per airport
* Find the airports with the worst and best delays
* Average distance a flight travels from an airport

### Finding average distance travelled by flights per airport

In [52]:
#Loading data
airports = spark.read\
                .format("csv")\
                .option("header", "true")\
                .load(airportsPath) 

In [53]:
#Creating Temp view
airports.createOrReplaceTempView("airports") 

In [54]:
dist_airport = spark.sql("SELECT origin,distance FROM flights")\
                    .groupBy("origin")\
                    .agg({"distance":"avg"})\
                    .withColumnRenamed("avg(distance)","distance")

In [55]:
dist_airport.createOrReplaceTempView("dist_airport")

In [56]:
spark.sql("SELECT * FROM dist_airport ORDER BY distance DESC").show(5)

+------+------------------+
|origin|          distance|
+------+------------------+
|   GUM|            3801.0|
|   PPG|            2599.0|
|   BQN|1469.3361344537816|
|   BLI|1455.5238095238096|
|   JFK|1422.3096654275093|
+------+------------------+
only showing top 5 rows



In [57]:
spark.sql("SELECT * FROM dist_airport JOIN airports ON airports.code = dist_airport.origin ORDER BY distance DESC")\
     .drop('origin')\
     .show(5)

+------------------+----+--------------------+
|          distance|Code|         Description|
+------------------+----+--------------------+
|            3801.0| GUM|Guam, TT: Guam In...|
|            2599.0| PPG|Pago Pago, TT: Pa...|
|1469.3361344537816| BQN|Aguadilla, PR: Ra...|
|1455.5238095238096| BLI|Bellingham, WA: B...|
|1422.3096654275093| JFK|New York, NY: Joh...|
+------------------+----+--------------------+
only showing top 5 rows



### How many Flights got Delayed per Airport 

TODO
* add percent

In [58]:
total_delay = spark.sql("SELECT origin, departure_delay FROM flights where departure_delay > 0")\
                      .groupBy("origin")\
                      .agg({"departure_delay":"sum"})\
                      .withColumnRenamed("sum(departure_delay)","departure_delay")

In [59]:
import pyspark.sql.functions as func

In [60]:
total = flights.agg({"departure_delay":"sum"})\
               .collect()[0][0]

In [61]:
total_delay = total_delay.withColumn("percent",
                                     func.round(total_delay['departure_delay']/total * 100, 2))

In [62]:
total_delay.createOrReplaceTempView("total_delay")

In [63]:
spark.sql("SELECT * FROM total_delay ORDER BY percent DESC").show(5)

+------+---------------+-------+
|origin|departure_delay|percent|
+------+---------------+-------+
|   ORD|       326080.0|   8.22|
|   ATL|       316740.0|   7.99|
|   DFW|       235129.0|   5.93|
|   DEN|       218359.0|   5.51|
|   LAX|       194285.0|    4.9|
+------+---------------+-------+
only showing top 5 rows



In [64]:
spark.sql("SELECT * FROM total_delay ORDER BY percent ASC").show(5)

+------+---------------+-------+
|origin|departure_delay|percent|
+------+---------------+-------+
|   ADK|          143.0|    0.0|
|   PSG|          140.0|    0.0|
|   CDV|           69.0|    0.0|
|   INL|          131.0|    0.0|
|   WRG|           95.0|    0.0|
+------+---------------+-------+
only showing top 5 rows



### Average Delays of Flight  from Airport

In [65]:
import pyspark.sql.functions as func

In [66]:
delays_airport = spark.sql("SELECT origin, departure_delay FROM flights where departure_delay > 0")\
                      .groupBy("origin")\
                      .agg(func.avg('departure_delay')\
                      .alias('departure_delay'))

In [67]:
delays_airport.show(5)

+------+------------------+
|origin|   departure_delay|
+------+------------------+
|   INL|              26.2|
|   PSE|             43.75|
|   MSY|29.088963210702342|
|   PPG|             116.5|
|   GEG| 18.51497005988024|
+------+------------------+
only showing top 5 rows



In [68]:
delays_airport.createOrReplaceTempView("delays_airport")

In [69]:
delays_airport = spark.sql("SELECT * FROM delays_airport ORDER BY departure_delay DESC")
delays_airport.show(5)

+------+-----------------+
|origin|  departure_delay|
+------+-----------------+
|   LMT|            123.5|
|   PPG|            116.5|
|   PAH|91.38461538461539|
|   FAI|81.76923076923077|
|   TWF|77.11111111111111|
+------+-----------------+
only showing top 5 rows

