In [0]:
# Notebook parameters consumed by the task_2 call at the bottom of the notebook:
#   dbfs_fileStore_prefix - DBFS directory holding the uploaded CSV files
#   prefix                - common filename prefix of the dataset files
#   size                  - flights file variant to load, e.g. "small"
#   year                  - flight year whose delays are reported
dbfs_fileStore_prefix, prefix, size, year = (
    "/FileStore/tables",
    "ontimeperformance",
    "small",
    2000,
)

In [0]:
#Dataframe - Original -> Unoptimized method (No caching, no broadcast join, no repartitioning or coalescing)
def task_2(spark_session, flights_path, airlines_path, year):
  """Display delay statistics per US airline for one year (unoptimized baseline).

  Reads the flights and airlines CSV files, keeps the flights of ``year``
  operated by airlines whose ``country`` is 'United States', computes each
  flight's departure delay in minutes, keeps only delayed flights, and displays
  per airline the number of delayed flights and the average / minimum /
  maximum delay, ordered by airline name.

  This variant deliberately uses no caching, no broadcast join and no
  repartitioning, and computes each statistic in its own aggregation, so it
  can serve as the performance baseline.

  Args:
    spark_session: active SparkSession used to read both CSV files.
    flights_path: path to the flights CSV (header row; several column names
      carry a leading space, e.g. ' flight_date').
    airlines_path: path to the airlines CSV (header row).
    year: flight year to keep.

  Returns:
    None; the result DataFrame is rendered with ``display``.
  """
  from pyspark.sql import functions as func
  from pyspark.sql.functions import col, format_string, when
  from pyspark.sql.functions import year as yr

  # Creation of dataframes through the session supplied by the caller.
  flight_path_df = (spark_session.read.format("csv")
                    .option("header", "true")
                    .option("inferSchema", "true")
                    .load(flights_path))
  air_path_df = (spark_session.read.format("csv")
                 .option("header", "true")
                 .option("inferSchema", "true")
                 .load(airlines_path))

  # Cleaning dataframes and specifying columns. Several flight headers carry a
  # leading space, and the scheduled-time header is spelled 'depature'.
  flight_clean_df = flight_path_df.select(func.col('carrier_code'), func.col(' flight_date').alias('flight_date'), func.col(' scheduled_depature_time').alias('scheduled_departure'), func.col(' actual_departure_time').alias('actual_departure'))
  air_clean_df = air_path_df.select(func.col('carrier_code'), func.col('name'), func.col('country'))

  # Extract the year each flight occurred and keep only the requested year.
  flight_clean_df = flight_clean_df.withColumn('flight_year', yr(flight_clean_df.flight_date))
  flight_clean_df = flight_clean_df.select(func.col('carrier_code'), func.col('flight_year'), func.col('scheduled_departure'), func.col('actual_departure'))
  flights = flight_clean_df.filter(flight_clean_df.flight_year == year)

  # Keep airlines based in the United States, as specified in the assessment.
  airlines = air_clean_df.filter(air_clean_df.country == 'United States')

  # Left outer join: flights whose carrier is not a US airline get a null
  # name and are removed by the isNotNull filter (a comparison with the
  # literal string 'null' would only drop them as a side effect of SQL null
  # semantics). Final ordering by name is applied once, at the end.
  flight_departure = flights.join(airlines, 'carrier_code', 'left_outer')
  flight_departure = flight_departure.filter(flight_departure.name.isNotNull())
  # Drops missing (null) and zero actual departure times.
  flight_departure = flight_departure.filter(flight_departure.actual_departure > 0)

  # Left-pad the HHMM times with zeros to four characters (e.g. 905 -> '0905')
  # so hours and minutes can be sliced by position.
  flight_departure = flight_departure.withColumn('scheduled_departure', format_string("%04d", col('scheduled_departure').cast('int')))
  flight_departure = flight_departure.withColumn('actual_departure', format_string("%04d", col('actual_departure').cast('int')))

  # Split hours (characters 1-2) and minutes (characters 3-4; Column.substr
  # takes (start, length)) and map hour '00' to '24'.
  # NOTE(review): with this mapping a flight scheduled at 00:30 that leaves at
  # 01:15 gets 75 - 1470 < 0 minutes and is not counted as delayed - confirm.
  flight_departure = flight_departure.withColumn("scheduled_hour", flight_departure.scheduled_departure.substr(1, 2))
  flight_departure = flight_departure.withColumn("scheduled_minute", flight_departure.scheduled_departure.substr(3, 2))
  flight_departure = flight_departure.withColumn("actual_hour", flight_departure.actual_departure.substr(1, 2))
  flight_departure = flight_departure.withColumn("actual_minute", flight_departure.actual_departure.substr(3, 2))
  flight_departure = flight_departure.withColumn('scheduled_hour_24', when(flight_departure.scheduled_hour == '00', '24')
                                                                      .otherwise(flight_departure.scheduled_hour))
  flight_departure = flight_departure.withColumn('actual_hour_24', when(flight_departure.actual_hour == '00', '24')
                                                                   .otherwise(flight_departure.actual_hour))

  # Convert to minutes of the day; the string hour/minute columns are
  # implicitly cast to numbers by the arithmetic.
  flight_departure = flight_departure.withColumn('scheduled_hour_to_minutes', col('scheduled_hour_24') * 60)
  flight_departure = flight_departure.withColumn('actual_hour_to_minutes', col('actual_hour_24') * 60)
  flight_departure = flight_departure.withColumn('total_scheduled_minutes', col('scheduled_hour_to_minutes') + col('scheduled_minute'))
  flight_departure = flight_departure.withColumn('total_actual_minutes', col('actual_hour_to_minutes') + col('actual_minute'))

  # Delay in minutes; only flights that left later than scheduled are kept.
  flight_departure = flight_departure.withColumn('delayed_minutes', col('total_actual_minutes') - col('total_scheduled_minutes'))
  flight_departure = flight_departure.filter(func.col('delayed_minutes') > 0)

  # Deliberately unoptimized: one aggregation per statistic, re-joined on name.
  min_delay = flight_departure.groupBy('name').agg({'delayed_minutes': 'min'})
  max_delay = flight_departure.groupBy('name').agg({'delayed_minutes': 'max'})
  avg_delay = flight_departure.groupBy('name').agg({'delayed_minutes': 'avg'})
  count_delay = flight_departure.groupBy('name').count()

  total_delay = (count_delay
                 .join(avg_delay, 'name', 'left_outer')
                 .join(min_delay, 'name', 'left_outer')
                 .join(max_delay, 'name', 'left_outer'))
  delays = total_delay.select(func.col('name').alias('airline_name'),
                              func.col('count').alias('num_delays'),
                              func.col('avg(delayed_minutes)').alias('average_delay'),
                              func.col('min(delayed_minutes)').alias('min_delay'),
                              func.col('max(delayed_minutes)').alias('max_delay'))
  delays = delays.orderBy(func.col('airline_name').asc())

  display(delays)

In [0]:
#Enhanced Dataframe Method - Optimized Dataframe (broadcast join of the filtered airlines table, early year filtering and a single aggregation pass for optimal operation runtime.)
def task_2(spark_session, flights_path, airlines_path, year):
  """Display delay statistics per US airline for one year (optimized variant).

  Keeps the flights of ``year`` that departed later than scheduled, joins them
  with the airlines whose ``country`` is 'United States' (broadcasting the
  filtered airlines side), and displays per airline the number of delayed
  flights and the average / minimum / maximum delay in minutes, ordered by
  airline name.

  Args:
    spark_session: active SparkSession used to read both CSV files.
    flights_path: path to the flights CSV (header row; several column names
      carry a leading space, e.g. ' flight_date').
    airlines_path: path to the airlines CSV (header row).
    year: flight year to keep.

  Returns:
    None; the partition count of the joined frame is printed and the result
    is rendered with ``display``.
  """
  from pyspark.sql import functions as func
  from pyspark.sql.functions import broadcast, col, format_string, when
  from pyspark.sql.functions import year as yr

  def departure_minutes(hhmm):
    # Minutes for a zero-padded 'HHMM' string column; hour '00' is mapped to
    # '24'. NOTE(review): a flight scheduled at 00:30 that leaves at 01:15
    # gets 75 - 1470 < 0 minutes and is not counted as delayed - confirm intent.
    hour = hhmm.substr(1, 2)
    hour_24 = when(hour == '00', '24').otherwise(hour)
    # Column.substr takes (start, length): characters 3-4 are the minutes.
    return hour_24 * 60 + hhmm.substr(3, 2)

  # Read through the caller's session (rather than building a local
  # SparkContext / deprecated SQLContext) and keep only the needed columns.
  # Without inferSchema every value is read as a string.
  flights_raw = spark_session.read.csv(flights_path, header=True).select(
      col('carrier_code'),
      col(' flight_date').alias('flight_date'),
      col(' scheduled_depature_time').alias('scheduled_departure'),
      col(' actual_departure_time').alias('actual_departure'))

  # Filter on the year first so the later steps process fewer rows.
  flights_year = flights_raw.filter(yr(col('flight_date')) == year)

  # Zero-pad HHMM times to four characters (e.g. 905 -> '0905'); format_string
  # yields the text 'null' for a missing time, and those rows are dropped.
  # No cache: each intermediate frame is consumed exactly once.
  flights = (flights_year
             .withColumn('scheduled_departure', format_string("%04d", col('scheduled_departure').cast('int')))
             .withColumn('actual_departure', format_string("%04d", col('actual_departure').cast('int')))
             .filter((col('actual_departure') != 'null') & (col('scheduled_departure') != 'null'))
             .withColumn('delayed_minutes',
                         departure_minutes(col('actual_departure')) - departure_minutes(col('scheduled_departure')))
             .select('carrier_code', 'delayed_minutes')
             .filter(col('delayed_minutes') > 0))

  # US airlines only; rows without a name cannot form a named airline group.
  airlines = (spark_session.read.csv(airlines_path, header=True)
              .select('carrier_code', 'name', 'country')
              .filter((col('country') == 'United States') & col('name').isNotNull()))

  # Broadcast the (assumed small) filtered airlines table so the flights side
  # is not shuffled for the join.
  airline_delays = flights.join(broadcast(airlines), 'carrier_code')
  print("Number of partitions: ", airline_delays.rdd.getNumPartitions())

  # One groupBy computes all four statistics in a single pass, so no caching
  # or re-joining of per-statistic aggregates is needed.
  delays = (airline_delays
            .groupBy('name')
            .agg(func.count(func.lit(1)).alias('num_delays'),
                 func.avg('delayed_minutes').alias('average_delay'),
                 func.min('delayed_minutes').alias('min_delay'),
                 func.max('delayed_minutes').alias('max_delay'))
            .withColumnRenamed('name', 'airline_name')
            .orderBy(col('airline_name').asc()))

  display(delays)
  
  

In [0]:
# Drop every DataFrame and table cached in this Spark session so the next timed
# task_2 run does not reuse data cached by an earlier run. Run this before
# timing any task_2 variant that caches.
spark.catalog.clearCache()

In [0]:
#Enhanced Dataframe Method - Optimized Dataframe (broadcast join, configurable repartition/coalesce, early year filtering and a single aggregation pass for optimal operation runtime.)
def task_2(spark_session, flights_path, airlines_path, year, num_partitions=2, coalesce_partitions=1):
  """Display delay statistics per US airline for one year (repartition/coalesce variant).

  Keeps the flights of ``year`` that departed later than scheduled, joins them
  with the airlines whose ``country`` is 'United States' (broadcasting the
  filtered airlines side), and displays per airline the number of delayed
  flights and the average / minimum / maximum delay in minutes, ordered by
  airline name. Partitioning around the join is configurable.

  Args:
    spark_session: active SparkSession used to read both CSV files.
    flights_path: path to the flights CSV (header row; several column names
      carry a leading space, e.g. ' flight_date').
    airlines_path: path to the airlines CSV (header row).
    year: flight year to keep.
    num_partitions: partitions for the delayed-flights frame before the join
      (default 2; e.g. 12 for the large and 36 for the massive dataset).
      ``None`` skips the repartition.
    coalesce_partitions: partitions the joined frame is coalesced to before
      aggregating (default 1). ``None`` skips the coalesce.

  Returns:
    None; the partition counts are printed and the result is rendered with
    ``display``.
  """
  from pyspark.sql import functions as func
  from pyspark.sql.functions import broadcast, col, format_string, when
  from pyspark.sql.functions import year as yr

  def departure_minutes(hhmm):
    # Minutes for a zero-padded 'HHMM' string column; hour '00' is mapped to
    # '24'. NOTE(review): a flight scheduled at 00:30 that leaves at 01:15
    # gets 75 - 1470 < 0 minutes and is not counted as delayed - confirm intent.
    hour = hhmm.substr(1, 2)
    hour_24 = when(hour == '00', '24').otherwise(hour)
    # Column.substr takes (start, length): characters 3-4 are the minutes.
    return hour_24 * 60 + hhmm.substr(3, 2)

  # Read through the caller's session (rather than building a local
  # SparkContext / deprecated SQLContext) and keep only the needed columns.
  # Without inferSchema every value is read as a string.
  flights_raw = spark_session.read.csv(flights_path, header=True).select(
      col('carrier_code'),
      col(' flight_date').alias('flight_date'),
      col(' scheduled_depature_time').alias('scheduled_departure'),
      col(' actual_departure_time').alias('actual_departure'))

  # Filter on the year first so the later steps process fewer rows.
  flights_year = flights_raw.filter(yr(col('flight_date')) == year)

  # Zero-pad HHMM times to four characters (e.g. 905 -> '0905'); format_string
  # yields the text 'null' for a missing time, and those rows are dropped.
  # No cache: each intermediate frame is consumed exactly once.
  flights = (flights_year
             .withColumn('scheduled_departure', format_string("%04d", col('scheduled_departure').cast('int')))
             .withColumn('actual_departure', format_string("%04d", col('actual_departure').cast('int')))
             .filter((col('actual_departure') != 'null') & (col('scheduled_departure') != 'null'))
             .withColumn('delayed_minutes',
                         departure_minutes(col('actual_departure')) - departure_minutes(col('scheduled_departure')))
             .select('carrier_code', 'delayed_minutes')
             .filter(col('delayed_minutes') > 0))

  if num_partitions is not None:
    flights = flights.repartition(num_partitions)
  print("Number of partitions: ", flights.rdd.getNumPartitions())

  # US airlines only; rows without a name cannot form a named airline group.
  airlines = (spark_session.read.csv(airlines_path, header=True)
              .select('carrier_code', 'name', 'country')
              .filter((col('country') == 'United States') & col('name').isNotNull()))

  # Broadcast the (assumed small) filtered airlines table so the flights side
  # is not shuffled for the join.
  airline_delays = flights.join(broadcast(airlines), 'carrier_code')
  if coalesce_partitions is not None:
    # NOTE(review): coalesce is a narrow transformation, so the join stage that
    # follows the repartition also runs with this many tasks.
    airline_delays = airline_delays.coalesce(coalesce_partitions)
  print("Number of partitions: ", airline_delays.rdd.getNumPartitions())

  # One groupBy computes all four statistics in a single pass, so no caching
  # or re-joining of per-statistic aggregates is needed.
  delays = (airline_delays
            .groupBy('name')
            .agg(func.count(func.lit(1)).alias('num_delays'),
                 func.avg('delayed_minutes').alias('average_delay'),
                 func.min('delayed_minutes').alias('min_delay'),
                 func.max('delayed_minutes').alias('max_delay'))
            .withColumnRenamed('name', 'airline_name')
            .orderBy(col('airline_name').asc()))

  display(delays)
  
  

In [0]:
# Drop every DataFrame and table cached in this Spark session so the next timed
# task_2 run does not reuse data cached by an earlier run. Run this before
# timing any task_2 variant that caches.
spark.catalog.clearCache()

In [0]:
  # Run whichever task_2 definition cell was executed most recently on the
  # dataset configured in the parameters cell.
  # NOTE(review): this cell is indented by two spaces and relies on the
  # notebook front-end stripping a cell's common leading indentation; dedent
  # it to column 0 so it also runs as plain Python.
  task_2(
      spark_session=spark,
      flights_path=f"{dbfs_fileStore_prefix}/{prefix}_flights_{size}.csv",
      airlines_path=f"{dbfs_fileStore_prefix}/{prefix}_airlines.csv",
      year=year,
  )

airline_name,num_delays,average_delay,min_delay,max_delay
Alaska Airlines Inc.,65,24.184615384615384,1.0,119.0
American Airlines Inc.,251,29.95219123505976,1.0,379.0
Continental Air Lines Inc.,127,21.874015748031496,1.0,216.0
Delta Air Lines Inc.,390,31.115384615384617,1.0,1438.0
Northwest Airlines Inc.,144,33.78472222222222,1.0,237.0
Southwest Airlines Co.,358,30.044692737430168,2.0,213.0
US Airways,236,27.944915254237287,1.0,496.0
United Airlines,375,32.16266666666667,1.0,380.0
