In [0]:
#Databricks note source
from pyspark.sql import SparkSession
spark=SparkSession.builder("Spark.DataFrames").getOrCreate()

In [0]:
fts = spark.read.options(inferSchema='True', header='True', delimiter=',').csv('s3://flightdata-cdb/2007.csv')

In [0]:
#Problem 1
#Find the most frequent tail number which is getting in destination by maximum

In [0]:
fts.groupBy("Dest", "TailNum") \
  .count() \
  .orderBy("count", ascending=False) \
  .show()

+----+-------+-----+
|Dest|TailNum|count|
+----+-------+-----+
| ORD|      0|11709|
| DFW|      0| 9361|
| EWR|      0| 5194|
| LGA|      0| 3935|
| ORD| 000000| 3201|
| JFK|      0| 3123|
| BOS|      0| 2736|
| LAX|      0| 2274|
| HNL| N655BR| 2241|
| HNL| N651BR| 2173|
| HNL| N654BR| 2138|
| DTW|      0| 2108|
| HNL| N693BR| 2067|
| DCA|      0| 2062|
| HNL| N479HA| 2038|
| HNL| N478HA| 2024|
| HNL| N485HA| 1984|
| HNL| N480HA| 1976|
| IAH|      0| 1967|
| HNL| N484HA| 1944|
+----+-------+-----+
only showing top 20 rows



In [0]:
#Problem 2
#Find out the cancelled flight  details for the last quarter of the year 2007

In [0]:
fts.filter(fts.Year == 2007).filter(fts.Month.isin([10,11,12])).filter(fts.Cancelled == 1).show()

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2007|   10|         2|        2|     NA|      1930|     NA|      2150|           WN|      195

In [0]:
#Problem 3
#Find out the average weather delays for a particular flight per month

In [0]:
from pyspark.sql.functions import avg, round
fts.groupBy('FlightNum', 'Month').agg(round(avg('WeatherDelay'),2).alias("Average Delay")).show()

+---------+-----+-------------+
|FlightNum|Month|Average Delay|
+---------+-----+-------------+
|     1504|    1|          0.0|
|     1384|    1|          0.0|
|     1161|    1|         1.04|
|      307|    1|         0.59|
|     2781|    1|         0.14|
|     2141|    1|          0.0|
|     2395|    1|         0.16|
|     2566|    1|         0.09|
|     2328|    1|         1.87|
|     2599|    1|          0.0|
|     2509|    1|         1.45|
|     1655|    1|          0.0|
|     3021|    1|          0.0|
|     3231|    1|         0.34|
|     7152|    1|          0.0|
|     7405|    1|          0.0|
|     5296|    1|         1.78|
|     5373|    1|         3.52|
|     5193|    1|         2.98|
|     5014|    1|          0.0|
+---------+-----+-------------+
only showing top 20 rows



In [0]:
#Problem 4
#Inspite of CarrierDelay, NASDelay, SecurityDelay, LateAircraftDelay,Weatherdelay which flight reached on time

In [0]:
fts.filter((fts.CarrierDelay > 0) | (fts.WeatherDelay > 0) | (fts.NASDelay > 0) | (fts.SecurityDelay > 0) | (fts.LateAircraftDelay > 0)).filter(fts.ArrDelay <= 0).show()

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------

In [0]:
#Problem 5
#Month wise total distance travelled by each flight number in every month

In [0]:
from pyspark.sql.functions import sum
fts.groupBy('FlightNum', 'Month').agg(sum('Distance').alias("Total Distance")).show()

+---------+-----+--------------+
|FlightNum|Month|Total Distance|
+---------+-----+--------------+
|     1504|    1|         85915|
|     1384|    1|         26070|
|     1161|    1|        172275|
|      307|    1|        218173|
|     2781|    1|         38181|
|     2141|    1|         44717|
|     2395|    1|         41289|
|     2566|    1|         22842|
|     2328|    1|         61944|
|     2599|    1|         12066|
|     2509|    1|         31729|
|     1655|    1|         58396|
|     3021|    1|         17631|
|     3231|    1|          5498|
|     7152|    1|         12519|
|     7405|    1|         14604|
|     5296|    1|          3627|
|     5373|    1|          8250|
|     5193|    1|         33124|
|     5014|    1|           834|
+---------+-----+--------------+
only showing top 20 rows



In [0]:
#Problem 6
#Month wise how many flights get diverted(origin to destination)

In [0]:
from pyspark.sql.functions import count
fts.filter(fts.Diverted == 1).groupBy('Month').agg(count('FlightNum').alias('Flights Diverted')).orderBy('Month').show()

+-----+----------------+
|Month|Flights Diverted|
+-----+----------------+
|    1|            1200|
|    2|            1261|
|    3|            1275|
|    4|            1193|
|    5|            1442|
|    6|            2199|
|    7|            2150|
|    8|            2101|
|    9|             962|
|   10|            1000|
|   11|             881|
|   12|            1515|
+-----+----------------+



In [0]:
#Problem 7
#Week and month wise number of trips in all the flights

In [0]:
from pyspark.sql.functions import when
fts_1 = fts.withColumn("Week",when((fts.DayofMonth >= 1) & (fts.DayofMonth <= 7), 1).when((fts.DayofMonth >= 8) & (fts.DayofMonth <= 14), 2).when((fts.DayofMonth >= 15) & (fts.DayofMonth <= 21), 3).when((fts.DayofMonth >= 22) & (fts.DayofMonth <= 28), 4).when((fts.DayofMonth >= 29) & (fts.DayofMonth <= 31), 5).otherwise('other'))
fts_1.filter(fts_1.Cancelled == 0).groupBy('Month', 'Week').agg(count('FlightNum').alias("No_Of_Flights")).orderBy('Month', 'Week').show()

+-----+----+-------------+
|Month|Week|No_Of_Flights|
+-----+----+-------------+
|    1|   1|       139114|
|    1|   2|       134874|
|    1|   3|       133690|
|    1|   4|       137875|
|    1|   5|        60229|
|    2|   1|       134924|
|    2|   2|       129914|
|    2|   3|       138929|
|    2|   4|       136372|
|    3|   1|       139401|
|    3|   2|       142861|
|    3|   3|       138140|
|    3|   4|       142412|
|    3|   5|        59518|
|    4|   1|       142217|
|    4|   2|       140864|
|    4|   3|       140149|
|    4|   4|       139503|
|    4|   5|        40777|
|    5|   1|       141346|
+-----+----+-------------+
only showing top 20 rows



In [0]:
#Problem 8
#Which flights covered maximum origin and destination by month wise

In [0]:
from pyspark.sql.functions import countDistinct, desc
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

In [0]:
fts_2 = fts.filter(fts.Cancelled == 0).groupBy('Month', 'FlightNum').agg(countDistinct('Origin').alias("Max_Origins")).sort('Month', desc('Max_Origins'))
windowDept = Window.partitionBy('Month').orderBy(col("Max_Origins").desc())
fts_2.withColumn("row",row_number().over(windowDept)).filter(col("row") == 1).drop("row").show()

+-----+---------+-----------+
|Month|FlightNum|Max_Origins|
+-----+---------+-----------+
|    1|      433|         18|
|    2|      500|         18|
|    3|      644|         18|
|    4|      644|         17|
|    5|      644|         17|
|    6|      226|         18|
|    7|      425|         17|
|    8|       67|         18|
|    9|       62|         20|
|   10|       66|         20|
|   11|      303|         21|
|   12|      151|         22|
+-----+---------+-----------+



In [0]:
fts_2 = fts.filter(fts.Cancelled == 0).groupBy('Month', 'FlightNum').agg(countDistinct('Dest').alias("Max_Dest")).sort('Month', desc('Max_Dest'))
windowDept = Window.partitionBy('Month').orderBy(col("Max_Dest").desc())
fts_2.withColumn("row",row_number().over(windowDept)).filter(col("row") == 1).drop("row").show()

+-----+---------+--------+
|Month|FlightNum|Max_Dest|
+-----+---------+--------+
|    1|      372|      18|
|    2|      432|      17|
|    3|      432|      17|
|    4|      473|      17|
|    5|      644|      19|
|    6|      308|      18|
|    7|      425|      17|
|    8|       67|      18|
|    9|      385|      18|
|   10|       66|      20|
|   11|      303|      21|
|   12|      151|      20|
+-----+---------+--------+



In [0]:
#Problem 9
#Average month wise arrival delay (flightnum wise)

In [0]:
fts.groupBy('FlightNum', 'Month').agg(round(avg('ArrDelay'), 2).alias('Average Arrival Delay')).orderBy('FlightNum', 'Month').show()

+---------+-----+---------------------+
|FlightNum|Month|Average Arrival Delay|
+---------+-----+---------------------+
|        1|    1|                10.72|
|        1|    2|                18.31|
|        1|    3|                 8.58|
|        1|    4|                 1.68|
|        1|    5|                 3.22|
|        1|    6|                 9.65|
|        1|    7|                 6.44|
|        1|    8|                 8.57|
|        1|    9|                 1.27|
|        1|   10|                 7.06|
|        1|   11|                -1.47|
|        1|   12|                 8.18|
|        2|    1|                10.94|
|        2|    2|                 6.04|
|        2|    3|                 9.85|
|        2|    4|                 2.57|
|        2|    5|                 5.28|
|        2|    6|                15.55|
|        2|    7|                10.87|
|        2|    8|                 9.39|
+---------+-----+---------------------+
only showing top 20 rows



In [0]:
#Problem 10
#Average month wise departure delay (flightnum wise)

In [0]:
fts.groupBy('FlightNum', 'Month').agg(round(avg('DepDelay'), 2).alias('Average Departure Delay')).orderBy('FlightNum', 'Month').show()

+---------+-----+-----------------------+
|FlightNum|Month|Average Departure Delay|
+---------+-----+-----------------------+
|        1|    1|                   9.85|
|        1|    2|                  16.93|
|        1|    3|                  11.17|
|        1|    4|                   5.62|
|        1|    5|                   3.71|
|        1|    6|                  12.01|
|        1|    7|                   9.79|
|        1|    8|                  10.46|
|        1|    9|                   4.58|
|        1|   10|                   9.89|
|        1|   11|                   3.94|
|        1|   12|                  10.61|
|        2|    1|                  12.22|
|        2|    2|                   8.56|
|        2|    3|                   7.49|
|        2|    4|                    3.6|
|        2|    5|                   3.96|
|        2|    6|                  11.51|
|        2|    7|                   8.07|
|        2|    8|                   9.33|
+---------+-----+-----------------