## Joining Datasets and Performing Aggregations on Grouped Data

### Import Libraries and Start Spark Session

In [1]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as psf

# Choose the Python executables PySpark should launch (PYSPARK_PYTHON and
# PYSPARK_DRIVER_PYTHON); this runs before the SparkSession is created below.
os.environ.update({
    'PYSPARK_PYTHON': '/usr/bin/python3',
    'PYSPARK_DRIVER_PYTHON': 'ipython3',
})

In [2]:
# Obtain the SparkSession for this notebook (getOrCreate reuses an existing
# session when there is one).
spark = (
    SparkSession.builder
    .appName("Week 4 Assignment")
    .getOrCreate()
)

### 1) Flight Data

In [3]:
# Load the domestic flight records from the Parquet file.
df_flights = spark.read.format("parquet").load("data/domestic-flights/flights.parquet")

In [4]:
# Load the airport reference CSV; the first row is treated as the header and
# column types are inferred from the data.
df_airport_codes = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("data/airport-codes/airport-codes.csv")
)

In [5]:
# Inspect the column names and types of the flight data.
df_flights.printSchema()

root
 |-- origin_airport_code: string (nullable = true)
 |-- destination_airport_code: string (nullable = true)
 |-- origin_city: string (nullable = true)
 |-- destination_city: string (nullable = true)
 |-- passengers: long (nullable = true)
 |-- seats: long (nullable = true)
 |-- flights: long (nullable = true)
 |-- distance: double (nullable = true)
 |-- origin_population: long (nullable = true)
 |-- destination_population: long (nullable = true)
 |-- flight_year: long (nullable = true)
 |-- flight_month: long (nullable = true)
 |-- __index_level_0__: long (nullable = true)



In [6]:
# Inspect the column names and inferred types of the airport reference data.
df_airport_codes.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: double (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



In [7]:
# Preview the first rows of the flight data.
df_flights.show()

+-------------------+------------------------+-------------+----------------+----------+-----+-------+--------+-----------------+----------------------+-----------+------------+-----------------+
|origin_airport_code|destination_airport_code|  origin_city|destination_city|passengers|seats|flights|distance|origin_population|destination_population|flight_year|flight_month|__index_level_0__|
+-------------------+------------------------+-------------+----------------+----------+-----+-------+--------+-----------------+----------------------+-----------+------------+-----------------+
|                MHK|                     AMW|Manhattan, KS|        Ames, IA|        21|   30|      1|   254.0|           122049|                 86219|       2008|          10|                0|
|                EUG|                     RDM|   Eugene, OR|        Bend, OR|        41|  396|     22|   103.0|           284093|                 76034|       1990|          11|                1|
|                EUG

In [8]:
# Preview the first rows of the airport reference data.
df_airport_codes.show()

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport|        11.0|     null|         US|     US-PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|
| 00AA|small_airport|Aero B Ranch Airport|      3435.0|     null|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
| 00AK|small_airport|        Lowell Field|       450.0|     null|         US|     US-AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|
| 00AL|small_airport|        Epps Airpark|       820.0|     null|         US|     

#### a) Join Data

In [9]:
# Attach airport attributes to every flight by matching the origin airport code
# against the airport table's IATA code. A left join keeps flights whose origin
# code has no match (their airport columns are null).
# NOTE(review): if iata_code is not unique in df_airport_codes, this join would
# duplicate flight rows and inflate later sums — confirm against the data.
df_combined = df_flights.join(
    df_airport_codes,
    on=df_flights["origin_airport_code"] == df_airport_codes["iata_code"],
    how="left",
)

In [10]:
# Confirm the joined frame carries both the flight and the airport columns.
df_combined.printSchema()

root
 |-- origin_airport_code: string (nullable = true)
 |-- destination_airport_code: string (nullable = true)
 |-- origin_city: string (nullable = true)
 |-- destination_city: string (nullable = true)
 |-- passengers: long (nullable = true)
 |-- seats: long (nullable = true)
 |-- flights: long (nullable = true)
 |-- distance: double (nullable = true)
 |-- origin_population: long (nullable = true)
 |-- destination_population: long (nullable = true)
 |-- flight_year: long (nullable = true)
 |-- flight_month: long (nullable = true)
 |-- __index_level_0__: long (nullable = true)
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: double (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_c

#### b) Rename and Remove Columns

In [11]:
# Remove the airport columns that are not needed downstream (plus the leftover
# pandas index column from the Parquet file).
columns_to_drop = ["__index_level_0__", "ident", "local_code", "continent", "iso_country", "iata_code"]
df_combined = df_combined.drop(*columns_to_drop)

# Prefix the remaining airport columns so they are clearly tied to the origin
# airport and do not clash with a later join against the same airport table.
origin_column_names = {
    "type": "origin_airport_type",
    "name": "origin_airport_name",
    "elevation_ft": "origin_airport_elevation_ft",
    "iso_region": "origin_airport_region",
    "municipality": "origin_airport_municipality",
    "gps_code": "origin_airport_gps_code",
    "coordinates": "origin_airport_coordinates",
}
for existing_name, origin_name in origin_column_names.items():
    df_combined = df_combined.withColumnRenamed(existing_name, origin_name)

In [12]:
# Verify the dropped columns are gone and the origin_airport_* names are in place.
df_combined.printSchema()

root
 |-- origin_airport_code: string (nullable = true)
 |-- destination_airport_code: string (nullable = true)
 |-- origin_city: string (nullable = true)
 |-- destination_city: string (nullable = true)
 |-- passengers: long (nullable = true)
 |-- seats: long (nullable = true)
 |-- flights: long (nullable = true)
 |-- distance: double (nullable = true)
 |-- origin_population: long (nullable = true)
 |-- destination_population: long (nullable = true)
 |-- flight_year: long (nullable = true)
 |-- flight_month: long (nullable = true)
 |-- origin_airport_type: string (nullable = true)
 |-- origin_airport_name: string (nullable = true)
 |-- origin_airport_elevation_ft: double (nullable = true)
 |-- origin_airport_region: string (nullable = true)
 |-- origin_airport_municipality: string (nullable = true)
 |-- origin_airport_gps_code: string (nullable = true)
 |-- origin_airport_coordinates: string (nullable = true)



#### c) Join to Destination Airport

In [13]:
# Second lookup against the airport table, this time keyed on the destination
# airport code. The condition is written against df_combined (the frame being
# joined); it refers to the same destination_airport_code column that
# df_combined inherited from df_flights.
df_combined = df_combined.join(
    df_airport_codes,
    on=df_combined["destination_airport_code"] == df_airport_codes["iata_code"],
    how="left",
)

In [14]:
# Confirm the un-prefixed airport columns from the destination join were appended.
df_combined.printSchema()

root
 |-- origin_airport_code: string (nullable = true)
 |-- destination_airport_code: string (nullable = true)
 |-- origin_city: string (nullable = true)
 |-- destination_city: string (nullable = true)
 |-- passengers: long (nullable = true)
 |-- seats: long (nullable = true)
 |-- flights: long (nullable = true)
 |-- distance: double (nullable = true)
 |-- origin_population: long (nullable = true)
 |-- destination_population: long (nullable = true)
 |-- flight_year: long (nullable = true)
 |-- flight_month: long (nullable = true)
 |-- origin_airport_type: string (nullable = true)
 |-- origin_airport_name: string (nullable = true)
 |-- origin_airport_elevation_ft: double (nullable = true)
 |-- origin_airport_region: string (nullable = true)
 |-- origin_airport_municipality: string (nullable = true)
 |-- origin_airport_gps_code: string (nullable = true)
 |-- origin_airport_coordinates: string (nullable = true)
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |--

In [15]:
# Remove the airport columns that are not needed downstream. drop() ignores
# names that are no longer present (e.g. "__index_level_0__").
columns_to_drop = ["__index_level_0__", "ident", "local_code", "continent", "iso_country", "iata_code"]
df_combined = df_combined.drop(*columns_to_drop)

# Prefix the freshly joined airport columns so they are tied to the destination
# airport, mirroring the origin_airport_* naming.
destination_column_names = {
    "type": "destination_airport_type",
    "name": "destination_airport_name",
    "elevation_ft": "destination_airport_elevation_ft",
    "iso_region": "destination_airport_region",
    "municipality": "destination_airport_municipality",
    "gps_code": "destination_airport_gps_code",
    "coordinates": "destination_airport_coordinates",
}
for existing_name, destination_name in destination_column_names.items():
    df_combined = df_combined.withColumnRenamed(existing_name, destination_name)

In [16]:
# Verify the final schema: flight columns plus origin_airport_* and destination_airport_* columns.
df_combined.printSchema()

root
 |-- origin_airport_code: string (nullable = true)
 |-- destination_airport_code: string (nullable = true)
 |-- origin_city: string (nullable = true)
 |-- destination_city: string (nullable = true)
 |-- passengers: long (nullable = true)
 |-- seats: long (nullable = true)
 |-- flights: long (nullable = true)
 |-- distance: double (nullable = true)
 |-- origin_population: long (nullable = true)
 |-- destination_population: long (nullable = true)
 |-- flight_year: long (nullable = true)
 |-- flight_month: long (nullable = true)
 |-- origin_airport_type: string (nullable = true)
 |-- origin_airport_name: string (nullable = true)
 |-- origin_airport_elevation_ft: double (nullable = true)
 |-- origin_airport_region: string (nullable = true)
 |-- origin_airport_municipality: string (nullable = true)
 |-- origin_airport_gps_code: string (nullable = true)
 |-- origin_airport_coordinates: string (nullable = true)
 |-- destination_airport_type: string (nullable = true)
 |-- destination_airp

#### d) Top Ten Airports

In [17]:
# Filter Data
# Keep only the 2008 records. flight_year is a long column, so compare it with
# an integer literal instead of the string "2008" (which relied on Spark's
# implicit string-to-number cast).
df_2008 = df_combined.filter(psf.col("flight_year") == 2008)

In [18]:
# Perform Aggregations
# Total and mean passengers/flights per airport for 2008, busiest airport first.
#
# The aggregates are aliased directly in agg() and the sort uses the output
# column name, so the ordering no longer refers to the intermediate
# `sum(passengers)` column that the projection has already renamed.
#
# NOTE(review): rows are grouped by the ORIGIN airport while the columns are
# labelled "inbound" — confirm whether destination_airport_code/destination_city
# was intended.
# NOTE(review): the "average_daily_*" columns are plain means over the grouped
# flight records; the data only exposes flight_year/flight_month, so these are
# presumably not true per-day figures — confirm.
df_top10 = (
    df_2008
    .groupBy("origin_airport_code", "origin_city")
    .agg(
        psf.sum("passengers").alias("total_inbound_passengers"),
        psf.sum("flights").alias("total_inbound_flights"),
        psf.avg("passengers").alias("average_daily_passengers"),
        psf.avg("flights").alias("average_daily_flights"),
    )
    .select(
        "origin_city",
        "origin_airport_code",
        "total_inbound_passengers",
        "total_inbound_flights",
        "average_daily_passengers",
        "average_daily_flights",
    )
    .orderBy(psf.desc("total_inbound_passengers"))
)

In [19]:
# Display the ten airports with the highest passenger totals.
df_top10.show(10)

+---------------+-------------------+------------------------+---------------------+------------------------+---------------------+
|    origin_city|origin_airport_code|total_inbound_passengers|total_inbound_flights|average_daily_passengers|average_daily_flights|
+---------------+-------------------+------------------------+---------------------+------------------------+---------------------+
|    Atlanta, GA|                ATL|                35435896|               395729|       4097.109029945658|    45.75430685628396|
|    Chicago, IL|                ORD|                26422032|               357181|      2799.5371900826444|   37.844988344988344|
|     Dallas, TX|                DFW|                22835496|               270055|       4659.354417465824|     55.1020199959192|
|Los Angeles, CA|                LAX|                19757561|               215359|       4029.688150112176|    43.92392412808485|
|  Las Vegas, NV|                LAS|                18315421|              

In [20]:
# Number the airports from highest to lowest passenger total; rank() gives tied
# totals the same rank.
# NOTE(review): the window has no partitionBy, so Spark is expected to evaluate
# it in a single partition — acceptable for this small aggregate, confirm.
passenger_rank_window = Window.orderBy(psf.col("total_inbound_passengers").desc())
df_top10 = df_top10.withColumn("rank", psf.rank().over(passenger_rank_window))

In [21]:
# Display the top ten again, now including the rank column.
df_top10.show(10)

+---------------+-------------------+------------------------+---------------------+------------------------+---------------------+----+
|    origin_city|origin_airport_code|total_inbound_passengers|total_inbound_flights|average_daily_passengers|average_daily_flights|rank|
+---------------+-------------------+------------------------+---------------------+------------------------+---------------------+----+
|    Atlanta, GA|                ATL|                35435896|               395729|       4097.109029945658|    45.75430685628396|   1|
|    Chicago, IL|                ORD|                26422032|               357181|      2799.5371900826444|   37.844988344988344|   2|
|     Dallas, TX|                DFW|                22835496|               270055|       4659.354417465824|     55.1020199959192|   3|
|Los Angeles, CA|                LAX|                19757561|               215359|       4029.688150112176|    43.92392412808485|   4|
|  Las Vegas, NV|                LAS|    

#### e) User Defined Functions

In [22]:
def _parse_coordinate(coordinates, index):
    """Return one component of a "longitude, latitude" string as a float.

    The airport table stores coordinates with the longitude first and the
    latitude second (e.g. "-101.473911, 38.7..." for an airport in Kansas).

    Args:
        coordinates: Text of the form "<longitude>, <latitude>", or None when
            the airport lookup found no match (left join).
        index: 0 for the longitude component, 1 for the latitude component.

    Returns:
        The requested component as a float, or None when the input is null,
        does not contain exactly two comma-separated parts, or the requested
        part is not a valid number.
    """
    if coordinates is None:
        return None

    parts = coordinates.split(',')
    if len(parts) != 2:
        return None

    try:
        return float(parts[index].strip())
    except ValueError:
        # Treat a non-numeric component like any other malformed value.
        return None


@psf.udf('double')
def get_latitude(coordinates):
    """Extract the latitude (second component) from a "longitude, latitude" string.

    Returns None for null or malformed input.
    """
    return _parse_coordinate(coordinates, 1)


@psf.udf('double')
def get_longitude(coordinates):
    """Extract the longitude (first component) from a "longitude, latitude" string.

    Returns None for null or malformed input.
    """
    return _parse_coordinate(coordinates, 0)

In [23]:
# Split each airport's coordinate string into separate numeric longitude and
# latitude columns, for the destination airport first and then the origin.
destination_coordinates = psf.col('destination_airport_coordinates')
origin_coordinates = psf.col('origin_airport_coordinates')

df_combined = (
    df_combined
    .withColumn('destination_airport_longitude', get_longitude(destination_coordinates))
    .withColumn('destination_airport_latitude', get_latitude(destination_coordinates))
    .withColumn('origin_airport_longitude', get_longitude(origin_coordinates))
    .withColumn('origin_airport_latitude', get_latitude(origin_coordinates))
)

In [24]:
# Preview the combined data, including the new longitude/latitude columns.
df_combined.show()

+-------------------+------------------------+-------------+----------------+----------+-----+-------+--------+-----------------+----------------------+-----------+------------+-------------------+--------------------+---------------------------+---------------------+---------------------------+-----------------------+--------------------------+------------------------+------------------------+--------------------------------+--------------------------+--------------------------------+----------------------------+-------------------------------+-----------------------------+----------------------------+------------------------+-----------------------+
|origin_airport_code|destination_airport_code|  origin_city|destination_city|passengers|seats|flights|distance|origin_population|destination_population|flight_year|flight_month|origin_airport_type| origin_airport_name|origin_airport_elevation_ft|origin_airport_region|origin_airport_municipality|origin_airport_gps_code|origin_airport_coord