# Joining Datasets and Performing Aggregations on Grouped Data

## Xander Hieken
***
### 1. Load both files into Spark and print the schema

In [1]:
from pyspark.sql import SparkSession
import pandas as pd

# Starting the SparkSession
spark = SparkSession.builder.appName("FlightData").getOrCreate()

# Loading both files into Spark
df_flights = spark.read.parquet("flights.parquet")
df_airport_codes = spark.read.load("airport-codes.csv", format = "csv", sep = ",", inferSchema = True, header = True)

# Printing the schema
df_flights.printSchema()
df_airport_codes.printSchema()

root
 |-- origin_airport_code: string (nullable = true)
 |-- destination_airport_code: string (nullable = true)
 |-- origin_city: string (nullable = true)
 |-- destination_city: string (nullable = true)
 |-- passengers: long (nullable = true)
 |-- seats: long (nullable = true)
 |-- flights: long (nullable = true)
 |-- distance: double (nullable = true)
 |-- origin_population: long (nullable = true)
 |-- destination_population: long (nullable = true)
 |-- flight_year: long (nullable = true)
 |-- flight_month: long (nullable = true)
 |-- __index_level_0__: long (nullable = true)

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: double (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- l

***
### 2. Join the flight data to airport codes data by matching the IATA code of the originating flight to the IATA code in the airport codes file, then print the schema.

In [2]:
left_join = df_flights.join(df_airport_codes, 
                            df_flights.origin_airport_code == df_airport_codes.iata_code,
                            how = "left")
left_join.printSchema()

root
 |-- origin_airport_code: string (nullable = true)
 |-- destination_airport_code: string (nullable = true)
 |-- origin_city: string (nullable = true)
 |-- destination_city: string (nullable = true)
 |-- passengers: long (nullable = true)
 |-- seats: long (nullable = true)
 |-- flights: long (nullable = true)
 |-- distance: double (nullable = true)
 |-- origin_population: long (nullable = true)
 |-- destination_population: long (nullable = true)
 |-- flight_year: long (nullable = true)
 |-- flight_month: long (nullable = true)
 |-- __index_level_0__: long (nullable = true)
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: double (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_c

***
### 3. Remove the following columns from the joined dataframe:
* \_\_index_level_0\_\_
* ident
* local_code  
* continent
* iso_country
* iata_code

In [3]:
df_dropped = left_join.drop("__index_level_0__",
                            "ident",
                            "local_code",
                            "continent",
                            "iso_country",
                            "iata_code")
df_dropped.printSchema()

root
 |-- origin_airport_code: string (nullable = true)
 |-- destination_airport_code: string (nullable = true)
 |-- origin_city: string (nullable = true)
 |-- destination_city: string (nullable = true)
 |-- passengers: long (nullable = true)
 |-- seats: long (nullable = true)
 |-- flights: long (nullable = true)
 |-- distance: double (nullable = true)
 |-- origin_population: long (nullable = true)
 |-- destination_population: long (nullable = true)
 |-- flight_year: long (nullable = true)
 |-- flight_month: long (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: double (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



***
### 4. Rename the following columns:
* type: origin_airport_type
* name: origin_airport_name
* elevation_ft: origin_airport_elevation_ft
* iso_region: origin_airport_region
* municipality: origin_airport_municipality
* gps_code: origin_airport_gps_code
* coordinates: origin_airport_coordinates

In [4]:
df_renamed = df_dropped.withColumnRenamed("type", "origin_airport_type")\
                       .withColumnRenamed("name", "origin_airport_name")\
                       .withColumnRenamed("elevation_ft", "origin_airport_elevation_ft")\
                       .withColumnRenamed("iso_region", "origin_airport_region")\
                       .withColumnRenamed("municipality", "origin_airport_municipality")\
                       .withColumnRenamed("gps_code", "origin_airport_gps_code")\
                       .withColumnRenamed("coordinates", "origin_airport_coordinates")
df_renamed.printSchema()

root
 |-- origin_airport_code: string (nullable = true)
 |-- destination_airport_code: string (nullable = true)
 |-- origin_city: string (nullable = true)
 |-- destination_city: string (nullable = true)
 |-- passengers: long (nullable = true)
 |-- seats: long (nullable = true)
 |-- flights: long (nullable = true)
 |-- distance: double (nullable = true)
 |-- origin_population: long (nullable = true)
 |-- destination_population: long (nullable = true)
 |-- flight_year: long (nullable = true)
 |-- flight_month: long (nullable = true)
 |-- origin_airport_type: string (nullable = true)
 |-- origin_airport_name: string (nullable = true)
 |-- origin_airport_elevation_ft: double (nullable = true)
 |-- origin_airport_region: string (nullable = true)
 |-- origin_airport_municipality: string (nullable = true)
 |-- origin_airport_gps_code: string (nullable = true)
 |-- origin_airport_coordinates: string (nullable = true)



***
### 5. Repeat steps 2-4 for destinations

Join the data on destination airport instead of origin airport

In [5]:
left_join_dest = df_renamed.join(df_airport_codes, 
                                 df_renamed.destination_airport_code == df_airport_codes.iata_code,
                                 how = "left")
left_join_dest.printSchema()

root
 |-- origin_airport_code: string (nullable = true)
 |-- destination_airport_code: string (nullable = true)
 |-- origin_city: string (nullable = true)
 |-- destination_city: string (nullable = true)
 |-- passengers: long (nullable = true)
 |-- seats: long (nullable = true)
 |-- flights: long (nullable = true)
 |-- distance: double (nullable = true)
 |-- origin_population: long (nullable = true)
 |-- destination_population: long (nullable = true)
 |-- flight_year: long (nullable = true)
 |-- flight_month: long (nullable = true)
 |-- origin_airport_type: string (nullable = true)
 |-- origin_airport_name: string (nullable = true)
 |-- origin_airport_elevation_ft: double (nullable = true)
 |-- origin_airport_region: string (nullable = true)
 |-- origin_airport_municipality: string (nullable = true)
 |-- origin_airport_gps_code: string (nullable = true)
 |-- origin_airport_coordinates: string (nullable = true)
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |--

Drop the same columns from step 3, then rename the same columns from step 4 using the prefix `destination_airport_` instead of `origin_airport_` and print the schema

In [6]:
df_dropped_dest = left_join_dest.drop("__index_level_0__",
                            "ident",
                            "local_code",
                            "continent",
                            "iso_country",
                            "iata_code")

df_renamed_dest = df_dropped_dest.withColumnRenamed("type", "destination_airport_type")\
                                 .withColumnRenamed("name", "destination_airport_name")\
                                 .withColumnRenamed("elevation_ft", "destination_airport_elevation_ft")\
                                 .withColumnRenamed("iso_region", "destination_airport_region")\
                                 .withColumnRenamed("municipality", "destination_airport_municipality")\
                                 .withColumnRenamed("gps_code", "destination_airport_gps_code")\
                                 .withColumnRenamed("coordinates", "destination_airport_coordinates")

df_renamed_dest.printSchema()

root
 |-- origin_airport_code: string (nullable = true)
 |-- destination_airport_code: string (nullable = true)
 |-- origin_city: string (nullable = true)
 |-- destination_city: string (nullable = true)
 |-- passengers: long (nullable = true)
 |-- seats: long (nullable = true)
 |-- flights: long (nullable = true)
 |-- distance: double (nullable = true)
 |-- origin_population: long (nullable = true)
 |-- destination_population: long (nullable = true)
 |-- flight_year: long (nullable = true)
 |-- flight_month: long (nullable = true)
 |-- origin_airport_type: string (nullable = true)
 |-- origin_airport_name: string (nullable = true)
 |-- origin_airport_elevation_ft: double (nullable = true)
 |-- origin_airport_region: string (nullable = true)
 |-- origin_airport_municipality: string (nullable = true)
 |-- origin_airport_gps_code: string (nullable = true)
 |-- origin_airport_coordinates: string (nullable = true)
 |-- destination_airport_type: string (nullable = true)
 |-- destination_airp

***
### 6. Create a dataframe using only data from 2008. This dataframe will be a report of the top ten airports by the number of inbound passengers. 

**This dataframe should contain the following fields:**
* Rank (1-10)
* Name
* IATA code
* Total Inbound Passengers
* Total Inbound Flights
* Average Daily Passengers
* Average Inbound Flights

In [7]:
from pyspark.sql.functions import col, desc
from pyspark.sql import Window
import pyspark.sql.functions as psf

# Create a dataframe for just 2008 data
df08 = df_renamed_dest.filter(col("flight_year") == 2008)

# Create a dataframe for total inbound passengers of each airport
ranked = df08.groupBy("destination_airport_name").sum("passengers")

# Use Window to order and rank by the number of inbound passengers
windowA = Window.orderBy(psf.desc("sum(passengers)"))
dfr = ranked.withColumn("Rank", psf.dense_rank().over(windowA))

dfr = dfr.select("Rank", "destination_airport_name", "sum(passengers)")\
         .withColumnRenamed("destination_airport_name", "NameA")\
         .withColumnRenamed("sum(passengers)", "Total_Inbound_Passengers")\
         .withColumn("Average_Daily_Passengers", ((col('Total_Inbound_Passengers')/365)))

# Create dataframe for total inbound flights of each airport
flights = df08.groupBy("destination_airport_name").sum("flights")

dff = flights.withColumnRenamed("destination_airport_name", "NameB")\
             .withColumnRenamed("sum(flights)", "Total_Inbound_Flights")\
             .withColumn("Average_Inbound_Flights", ((col('Total_Inbound_Flights')/365)))

# Join the last two dataframes 
df_joined = dfr.join(dff, dfr.NameA == dff.NameB, how = "left")
df_joined = df_joined.drop("NameB").orderBy("Rank")

# Join the previous dataframe with df_airport_codes, to get the iata_code column
final_df = df_joined.join(df_airport_codes, df_joined.NameA == df_airport_codes.name)

# Create the final dataframe for Top Ten Airports
final_df = final_df.select("Rank", "NameA", "iata_code", "Total_Inbound_Passengers", 
                           "Total_Inbound_Flights", "Average_Daily_Passengers", "Average_Inbound_Flights")\
                   .withColumnRenamed("NameA", "Name")\
                   .withColumnRenamed("iata_code", "IATA_Code")\
                   .limit(10)

final_df.toPandas() # Displaying results in Pandas, so they're easier to read

Unnamed: 0,Rank,Name,IATA_Code,Total_Inbound_Passengers,Total_Inbound_Flights,Average_Daily_Passengers,Average_Inbound_Flights
0,1,Hartsfield Jackson Atlanta International Airport,ATL,35561795,395192,97429.575342,1082.717808
1,2,Chicago O'Hare International Airport,ORD,26398793,356570,72325.460274,976.90411
2,3,Dallas Fort Worth International Airport,DFW,22883558,270243,62694.679452,740.391781
3,4,Los Angeles International Airport,LAX,19741782,215000,54087.073973,589.041096
4,5,McCarran International Airport,LAS,18262263,164123,50033.59726,449.652055
5,6,Phoenix Sky Harbor International Airport,PHX,17305718,181259,47412.926027,496.6
6,7,Charlotte Douglas International Airport,CLT,15038489,205040,41201.339726,561.753425
7,8,George Bush Intercontinental Houston Airport,IAH,14870717,214245,40741.690411,586.972603
8,9,Orlando International Airport,MCO,14581086,131710,39948.180822,360.849315
9,10,Detroit Metropolitan Wayne County Airport,DTW,14228887,191910,38983.252055,525.780822


***
### 7. Create a user-defined function in Python that will convert the string coordinates into numeric coordinates

In [8]:
from pyspark.sql.functions import udf

@udf('double')
def get_latitude(coordinates):
    split_coords = coordinates.split(',')
    if len(split_coords) != 2:
        return None
    return float(split_coords[0].strip())

@udf('double')
def get_longitude(coordinates):
    split_coords = coordinates.split(',')
    if len(split_coords) != 2:
        return None
    return float(split_coords[1].strip())

***
### 8. Add new columns for:
* destination_airport_longitude
* destination_airport_latitude
* origin_airport_longitude
* origin_airport_latitude 

In [9]:
dfUDF = df_renamed_dest.withColumn('destination_airport_longitude', get_longitude(df_renamed_dest['destination_airport_coordinates']))\
                       .withColumn('destination_airport_latitude', get_latitude(df_renamed_dest['destination_airport_coordinates']))\
                       .withColumn('origin_airport_longitude', get_longitude(df_renamed_dest['origin_airport_coordinates']))\
                       .withColumn('origin_airport_latitude', get_latitude(df_renamed_dest['origin_airport_coordinates']))

dfUDF.printSchema()

spark.stop()

root
 |-- origin_airport_code: string (nullable = true)
 |-- destination_airport_code: string (nullable = true)
 |-- origin_city: string (nullable = true)
 |-- destination_city: string (nullable = true)
 |-- passengers: long (nullable = true)
 |-- seats: long (nullable = true)
 |-- flights: long (nullable = true)
 |-- distance: double (nullable = true)
 |-- origin_population: long (nullable = true)
 |-- destination_population: long (nullable = true)
 |-- flight_year: long (nullable = true)
 |-- flight_month: long (nullable = true)
 |-- origin_airport_type: string (nullable = true)
 |-- origin_airport_name: string (nullable = true)
 |-- origin_airport_elevation_ft: double (nullable = true)
 |-- origin_airport_region: string (nullable = true)
 |-- origin_airport_municipality: string (nullable = true)
 |-- origin_airport_gps_code: string (nullable = true)
 |-- origin_airport_coordinates: string (nullable = true)
 |-- destination_airport_type: string (nullable = true)
 |-- destination_airp