In [1]:
# Robert K Burns
# DSC650 Big Data
# Dec 18, 2019
# Assignment 4.2  Programming Exercise: Joining Datasets and Performing Aggregations on Grouped Data

In [3]:
# Load the two source tables: flights (Parquet) and airport codes (CSV).
df_flights = spark.read.parquet("/FileStore/tables/flights.parquet")
# The CSV has a header row; column types are inferred from the data.
df_airport_codes = (
    spark.read.format("csv")
    .option("sep", ",")
    .option("inferSchema", True)
    .option("header", True)
    .load("/FileStore/tables/airport_codes-e62ed.csv")
)


In [4]:
# Print the column names and types of the flights DataFrame.
df_flights.printSchema()

In [5]:
# Print the column names and (inferred) types of the airport-codes DataFrame.
df_airport_codes.printSchema()

In [6]:
# A. Join the data: attach airport details to every flight by matching the
# flight's origin airport code to the airport's IATA code. The left outer join
# keeps flights whose origin code has no matching airport row.
origin_join = df_flights.join(
    df_airport_codes,
    on=df_flights["origin_airport_code"] == df_airport_codes["iata_code"],
    how='left_outer',
)
origin_join.show()

In [7]:
# Inspect the schema of the joined result (flight columns plus airport columns).
origin_join.printSchema()

In [8]:
# B. Remove the columns that are not wanted in the joined result.
columns_to_drop = [
    '__index_level_0__',
    'ident',
    'local_code',
    'continent',
    'iso_country',
    'iata_code',
]
origin_join = origin_join.drop(*columns_to_drop)

In [9]:
# Confirm the listed columns are gone from the schema.
origin_join.printSchema()

In [10]:
# B, cont'd. Prefix the airport columns with "origin_airport_" so they are
# identifiable as describing the origin airport.
origin_join = (
    origin_join
    .withColumnRenamed("type", "origin_airport_type")
    .withColumnRenamed("name", "origin_airport_name")
    .withColumnRenamed("elevation_ft", "origin_airport_elevation_ft")
    .withColumnRenamed("iso_region", "origin_airport_region")
    .withColumnRenamed("municipality", "origin_airport_municipality")
    .withColumnRenamed("gps_code", "origin_airport_gps_code")
    .withColumnRenamed("coordinates", "origin_airport_coordinates")
)

In [11]:
# Confirm the renamed origin_airport_* columns appear in the schema.
origin_join.printSchema()

In [12]:
# C/A. Join to the destination airport: same join as for the origin, but
# matching on the flight's destination airport code. The left outer join keeps
# flights whose destination code has no matching airport row.
dest_join = df_flights.join(
    df_airport_codes,
    on=df_flights["destination_airport_code"] == df_airport_codes["iata_code"],
    how='left_outer',
)
dest_join.show()

In [14]:
# C/B. Remove the columns that are not wanted in the destination join.
columns_to_drop = [
    '__index_level_0__',
    'ident',
    'local_code',
    'continent',
    'iso_country',
    'iata_code',
]
dest_join = dest_join.drop(*columns_to_drop)

In [15]:
# C/B, cont'd. Prefix the airport columns with "destination_airport_" so they
# are identifiable as describing the destination airport.
dest_join = (
    dest_join
    .withColumnRenamed("type", "destination_airport_type")
    .withColumnRenamed("name", "destination_airport_name")
    .withColumnRenamed("elevation_ft", "destination_airport_elevation_ft")
    .withColumnRenamed("iso_region", "destination_airport_region")
    .withColumnRenamed("municipality", "destination_airport_municipality")
    .withColumnRenamed("gps_code", "destination_airport_gps_code")
    .withColumnRenamed("coordinates", "destination_airport_coordinates")
)

In [16]:
# Confirm the dropped columns are gone and the destination_airport_* names exist.
dest_join.printSchema()

In [18]:
# D. Top ten airports
# Keep only the rows of the destination join whose flight_year is 2008.
dest2008 = dest_join.where(dest_join.flight_year == '2008')


In [19]:
from pyspark.sql.functions import desc, asc
from pyspark.sql.functions import sum, count, avg, expr


In [20]:
from pyspark.sql import Window
import pyspark.sql.functions as psf

In [21]:
# Aggregate the 2008 rows per destination airport name: total and average of
# passengers and of flights. The aggregates keep Spark's default column names
# (e.g. "sum(passengers)"), which the ranking step refers to.
testdf = (
    dest2008
    .groupBy("destination_airport_name")
    .agg(
        psf.sum("passengers"),
        psf.sum("flights"),
        psf.avg("passengers"),
        psf.avg("flights"),
    )
)

In [22]:
# Build a summary-statistics DataFrame for the aggregated columns.
# NOTE(review): as the cell's last expression this presumably displays only the
# DataFrame's repr, not the statistics; chain .show() if the values are wanted.
testdf.describe()

In [23]:
# Rank airports by total passengers, highest first. dense_rank() assigns tied
# airports the same rank and leaves no gaps in the numbering.
rankme = Window.orderBy(psf.col("sum(passengers)").desc())

rankdf = testdf.withColumn("rank", psf.dense_rank().over(rankme))


In [24]:
# Keep the airports ranked 1 through 10. Because the rank comes from
# dense_rank(), tied airports share a rank, so more than ten rows may be kept.
# Compare against the integer 10 instead of the string '10' so the filter does
# not depend on implicit string-to-number casting, and sort by rank explicitly
# so the displayed order does not depend on how the window happened to emit rows.
top10 = rankdf.filter(rankdf['rank'] <= 10).orderBy('rank')
top10.show()

In [26]:
# E. User Defined Functions
from pyspark.sql.functions import udf

@udf('double')
def get_latitude(coordinates):
    """Extract the first comma-separated field of a coordinates string as a float.

    Args:
        coordinates: A string holding two comma-separated numbers, or None.

    Returns:
        The first field as a float, or None when the input is None, does not
        contain exactly two comma-separated fields, or the field is not numeric.

    NOTE(review): this assumes the first field is the latitude; confirm the
    field order against the airport-codes data.
    """
    # Rows with no matching airport in the left outer join carry a null
    # coordinates value, which arrives here as None.
    if coordinates is None:
        return None

    split_coords = coordinates.split(',')
    if len(split_coords) != 2:
        return None

    try:
        return float(split_coords[0].strip())
    except ValueError:
        # A malformed number is treated like any other unparsable value.
        return None


@udf('double')
def get_longitude(coordinates):
    """Extract the second comma-separated field of a coordinates string as a float.

    Args:
        coordinates: A string holding two comma-separated numbers, or None.

    Returns:
        The second field as a float, or None when the input is None, does not
        contain exactly two comma-separated fields, or the field is not numeric.

    NOTE(review): this assumes the second field is the longitude; confirm the
    field order against the airport-codes data.
    """
    # Rows with no matching airport in the left outer join carry a null
    # coordinates value, which arrives here as None.
    if coordinates is None:
        return None

    split_coords = coordinates.split(',')
    if len(split_coords) != 2:
        return None

    try:
        return float(split_coords[1].strip())
    except ValueError:
        # A malformed number is treated like any other unparsable value.
        return None

# Add numeric longitude and latitude columns, each parsed by its UDF from the
# destination airport's coordinates string.
destcoords = (
    dest_join
    .withColumn(
        'destination_airport_longitude',
        get_longitude(dest_join.destination_airport_coordinates),
    )
    .withColumn(
        'destination_airport_latitude',
        get_latitude(dest_join.destination_airport_coordinates),
    )
)


In [27]:
# Display the first rows, including the two new longitude/latitude columns.
destcoords.show()