# FlightAware Statistics
The notebook intends to leverage flightaware raw data and generate the turnaround statisitcs by building data engineering pipelines.

In [1]:
import sys
from pyspark.sql import functions as F
from pyspark.sql.functions import acos, cos, sin, lit, toRadians
from pyspark.sql import SparkSession, Window
import numpy as np
import functools

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1582572919676_0001,pyspark3,idle,,,✔


SparkSession available as 'spark'.


In [2]:
# prepare airline code schema
df_airline = spark.read.format("csv").option("header", "true").option("delimiter", ",").option("encoding", "utf-8").load("s3://sita-coe-ds-dev-v1/jupyter/jovyan/airlines_capa_wicao_20200218.csv")
df_airline = df_airline.dropna(how = 'any',  subset = ['ICAO'])
df_airline = df_airline.dropDuplicates(subset = ['ICAO'])

In [3]:
# prepare airport code schema
df_airport = spark.read.format("csv").option("header", "true").option("delimiter", ",").option("encoding", "utf-8").load("s3://sita-coe-ds-dev-v1/jupyter/jovyan/airports_openflight_wgeo_updated_20200212.csv")
df_airport = df_airport.dropna(how = 'any',  subset = ['ICAO'])
df_airport = df_airport.dropDuplicates(subset = ['ICAO'])

In [4]:
df_airport = df_airport.withColumn('geo_seg', F.when((F.col('country') == 'Canada') | (F.col('country') == 'United States'), 'NA(CA&US)').when(F.col('Continent/Region') == 'Africa', 'Africa').when(F.col('country') == 'China', 'China').when(F.col('country') == 'India', 'India').otherwise(F.col('GEO'))).drop('GEO')
df_airport = df_airport.withColumn('geo_seg', F.when(F.col('geo_seg') == 'AMER', 'AMER(Excl US&CA)').when(F.col('geo_seg') == 'APAC', 'APAC(Excl China)').when(F.col('geo_seg') == 'MEIA', 'MEIA(Excl Africa&India)').otherwise(F.col('geo_seg')) )

In [5]:
# prepare aircraft model schema
df_aircraft = spark.read.format("csv").option("header", "true").option("delimiter", ",").option("encoding", "utf-8").load("s3://sita-coe-ds-dev-v1/jupyter/jovyan/aircrafts_openflight.csv")
df_aircraft = df_aircraft.dropna(how = 'any',  subset = ['icao'])
df_aircraft = df_aircraft.dropDuplicates(subset = ['icao'])

In [6]:
def dist(lat_x, long_x, lat_y, long_y):
    """
    Inspired from https://stackoverflow.com/questions/38994903/how-to-sum-distances-between-data-points-in-a-dataset-using-pyspark
    """
    return acos(
        sin(toRadians(lat_x)) * sin(toRadians(lat_y)) +
        cos(toRadians(lat_x)) * cos(toRadians(lat_y)) *
            cos(toRadians(long_x) - toRadians(long_y))
    ) * lit(6371.0)

In [7]:
def unionAll(dfs):
    return functools.reduce(lambda df1,df2: df1.union(df2.select(df1.columns)), dfs)

## Data Preparation

In [None]:
# Load FA data from predifined table and create TempView
spark = SparkSession.builder.getOrCreate()
df_pos = spark.read.load("s3://sita-coe-ds-prod-v1/mart/flightaware/position")
df_pos.createOrReplaceTempView("pos")

In [None]:
# filter der table by ident (i.e., null, helicopter, private jet etc.)
df_departure_pos = spark.sql("""
               SELECT id, id_timestamp, reg, aircrafttype, ident, orig, dest FROM pos
               WHERE id is not null
               """)
df_departure_pos = df_departure_pos.dropDuplicates()

In [None]:
# filter der table by ident (i.e., null, helicopter, private jet etc.)
df_departure_pos = spark.sql("""
               SELECT id, id_timestamp, reg, aircrafttype, ident, orig, dest FROM pos
               WHERE id is not null
               AND (orig RLIKE '^[A-Z][A-Z][A-Z][A-Z]$' OR dest RLIKE '^[A-Z][A-Z][A-Z][A-Z]$')
               """)
df_departure_pos = df_departure_pos.dropDuplicates()

In [None]:
# filter der table by ident (i.e., null, helicopter, private jet etc.)
df_departure_pos = spark.sql("""
               SELECT id, id_timestamp, reg, aircrafttype, ident, orig, dest FROM pos
               WHERE ident is not null
               AND ident != reg
               AND id is not null
               AND ident RLIKE '^[A-Z]{3}[0-9]'
               AND (orig RLIKE '^[A-Z][A-Z][A-Z][A-Z]$' OR dest RLIKE '[A-Z][A-Z][A-Z][A-Z]$')
               """)
df_departure_pos = df_departure_pos.dropDuplicates()

In [None]:
df_departure_pos = df_departure_pos.withColumn('dep_arr', F.lit(True))

In [8]:
# Load FA data from predifined table and create TempView
df = spark.read.load("s3://sita-coe-ds-prod-v1/mart/flightaware/der_fadooce_latest")
df.createOrReplaceTempView("der_dep")

In [None]:
# filter der table by ident (i.e., null, helicopter, private jet etc.)
df_departure_fad = spark.sql("""
               SELECT id, id_timestamp, reg, aircrafttype, ident, orig, dest, aat_arr, adt_dep FROM der_dep
               WHERE id is not null
               """)
df_departure_fad = df_departure_fad.dropDuplicates()

In [None]:
# filter der table by ident (i.e., null, helicopter, private jet etc.)
df_departure_fad = spark.sql("""
               SELECT id, id_timestamp, reg, aircrafttype, ident, orig, dest, aat_arr, adt_dep FROM der_dep
               WHERE id is not null
               AND (orig RLIKE '^[A-Z][A-Z][A-Z][A-Z]$' OR dest RLIKE '^[A-Z][A-Z][A-Z][A-Z]$')
               """)
df_departure_fad = df_departure_fad.dropDuplicates()

In [9]:
# filter der table by ident (i.e., null, helicopter, private jet etc.)
df_departure_fad = spark.sql("""
               SELECT id, id_timestamp, reg, aircrafttype, ident, orig, dest, aat_arr, adt_dep FROM der_dep
               WHERE ident is not null
               AND ident != reg
               AND id is not null
               AND ident RLIKE '^[A-Z]{3}[0-9]'
               AND (orig RLIKE '^[A-Z][A-Z][A-Z][A-Z]$' OR dest RLIKE '^[A-Z][A-Z][A-Z][A-Z]$')
               """)
df_departure_fad = df_departure_fad.dropDuplicates()

In [None]:
df_departure_fad = df_departure_fad.withColumn('dep_arr', F.col('aat_arr').isNotNull() | F.col('adt_dep').isNotNull()) 
df_departure_fad = df_departure_fad.drop('aat_arr', 'adt_dep')

In [None]:
df_departure = unionAll([df_departure_pos, df_departure_fad])

In [10]:
# create year column (i.e., year 2017, 2018, 2019)
df_departure_fad = df_departure_fad.withColumn('year', F.year(F.date_trunc('year', df_departure_fad.id_timestamp))).withColumn('quarter', F.quarter(F.date_trunc('quarter', df_departure_fad.id_timestamp))).withColumn('month', F.month(F.date_trunc('mon', df_departure_fad.id_timestamp)))
#df_departure = df_departure.withColumn('year', F.year(F.date_trunc('year', df_departure.id_timestamp))).withColumn('quarter', F.quarter(F.date_trunc('quarter', df_departure.id_timestamp))).withColumn('month', F.month(F.date_trunc('mon', df_departure.id_timestamp)))

In [11]:
# filter perticular year-date coverage
df_departure_fad = df_departure_fad.filter(" (year == 2017 and quarter > 1) or (year == 2018) or (year == 2019) or (year == 2020)  ")
#df_departure = df_departure.filter(" (year == 2019)   ")

In [12]:
df_departure_fad =df_departure_fad.dropDuplicates(['id'])
df_departure_fad = df_departure_fad.groupBy('year', 'month').agg(F.countDistinct('id').alias('flight_count')).orderBy('year', 'month')

In [13]:
df_departure_fad.show(40)

+----+-----+------------+
|year|month|flight_count|
+----+-----+------------+
|2017|    4|     2707398|
|2017|    5|     2827287|
|2017|    6|     2850039|
|2017|    7|     3075442|
|2017|    8|     3083789|
|2017|    9|     2899611|
|2017|   10|     2970176|
|2017|   11|     2766476|
|2017|   12|     2847838|
|2018|    1|     2879051|
|2018|    2|     2666110|
|2018|    3|     3039107|
|2018|    4|     3037497|
|2018|    5|     3172727|
|2018|    6|     3214070|
|2018|    7|     3412391|
|2018|    8|     3400244|
|2018|    9|     3280521|
|2018|   10|     3262856|
|2018|   11|     2990129|
|2018|   12|     3047791|
|2019|    1|     3101611|
|2019|    2|     2854253|
|2019|    3|     3227074|
|2019|    4|     3221085|
|2019|    5|     3333827|
|2019|    6|     3374292|
|2019|    7|     3570188|
|2019|    8|     3539471|
|2019|    9|     3322166|
|2019|   10|     3361767|
|2019|   11|     3153208|
|2019|   12|     3316160|
|2020|    1|      662881|
+----+-----+------------+

In [None]:
# filter perticular year-date coverage
df_departure = df_departure.filter(" (year == 2017 and quarter > 1) or (year == 2018) or (year == 2019)   ")
#df_departure = df_departure.filter(" (year == 2018) or (year == 2019)   ")

In [None]:
df_departure =df_departure.dropDuplicates(['id'])

In [None]:
# join orign airport code
#df_dep_null = df_departure.join(F.broadcast(df_airport), df_departure.orig == df_airport.ICAO, 'left').where(df_airport['ICAO'].isNull()).drop('IATA', 'ICAO', 'Type', 'Source', 'Timezone', 'dst', 'tz_dataset', 'Continent/Region')
#df_arr_null = df_departure.join(F.broadcast(df_airport), df_departure.dest == df_airport.ICAO, 'left').where(df_airport['ICAO'].isNull()).drop('IATA', 'ICAO', 'Type', 'Source', 'Timezone', 'dst', 'tz_dataset', 'Continent/Region')
#df_departure = df_departure.withColumnRenamed('Airport Names', 'orig_airport').withColumnRenamed('geo_seg', 'orig_geo').withColumnRenamed('City', 'orig_city').withColumnRenamed('Country', 'orig_country').withColumnRenamed('Latitude', 'orig_lat').withColumnRenamed('Longitude', 'orig_lon').withColumnRenamed('Altitude', 'orig_alt').withColumnRenamed('Region Code', 'orig_region')

In [None]:
# derive and filter airline code 
df_departure = df_departure.withColumn("airline_code", df_departure.ident.substr(0,3))
# join airline code
df_departure = df_departure.join(F.broadcast(df_airline), df_departure.airline_code == df_airline.ICAO, 'left').drop('IATA', 'ICAO','Status Since', 'Main Hub', 'Hub IATA', 'Country/Territory', 'Country/Territory Code', 'Region')
df_departure = df_departure.withColumnRenamed('Name', 'airline_name').withColumnRenamed('Status', 'airline_status').withColumnRenamed('Business Model', 'airline_type')

In [None]:
# join orign airport code
df_departure = df_departure.join(F.broadcast(df_airport), df_departure.orig == df_airport.ICAO, 'left').drop('IATA', 'ICAO', 'Type', 'Source', 'Timezone', 'dst', 'tz_dataset', 'Continent/Region')
df_departure = df_departure.withColumnRenamed('Airport Names', 'orig_airport').withColumnRenamed('geo_seg', 'orig_geo').withColumnRenamed('City', 'orig_city').withColumnRenamed('Country', 'orig_country').withColumnRenamed('Latitude', 'orig_lat').withColumnRenamed('Longitude', 'orig_lon').withColumnRenamed('Altitude', 'orig_alt').withColumnRenamed('Region Code', 'orig_region')

In [None]:
# join destination airport code
df_departure = df_departure.join(F.broadcast(df_airport), df_departure.dest == df_airport.ICAO, 'left').drop('IATA', 'ICAO', 'Type', 'Source', 'Timezone', 'dst', 'tz_dataset', 'Continent/Region')
df_departure = df_departure.withColumnRenamed('Airport Names', 'dest_airport').withColumnRenamed('geo_seg', 'dest_geo').withColumnRenamed('City', 'dest_city').withColumnRenamed('Country', 'dest_country').withColumnRenamed('Latitude', 'dest_lat').withColumnRenamed('Longitude', 'dest_lon').withColumnRenamed('Altitude', 'dest_alt').withColumnRenamed('Region Code', 'dest_region')

In [None]:
# join aircraft model code
df_departure = df_departure.join(F.broadcast(df_aircraft), df_departure.aircrafttype == df_aircraft.icao, 'left').drop('iata', 'icao')

In [None]:
# post-join filtering
#df_departure = df_departure.filter('(orig_airport is not null) and (dest_airport is not null) and (aircrafttype is not null) and (orig_city is not null) and (dest_city is not null) and (orig is not null) and (dest is not null) and (orig_airport is not null) and (dest_airport is not null) and (orig_geo is not null) and (dest_geo is not null)')

In [None]:
# add distances
df_departure = df_departure.withColumn('dist', dist('orig_lat', 'orig_lon', 'dest_lat', 'dest_lon') )
df_departure = df_departure.fillna(0, subset = ['dist'])

In [None]:
df_departure = df_departure.withColumn('flight_type', F.when(F.col('dist') <= F.lit(1500), 'SH').when((F.col('dist') > F.lit(1500)) & (F.col('dist') <= F.lit(4000)), 'MH').when(F.col('dist') > F.lit(4000), 'LH'))

In [None]:
df_departure = df_departure.filter( " (orig_geo is not null) or (dest_geo is not null) " )

In [None]:
df_test = df_departure.filter(" airline_type is null ")

In [None]:
df_test = df_departure.agg(F.countDistinct('reg').alias('unique_regcount'))
df_test.show()

In [None]:
df_regcount2019 = df_departure.dropDuplicates(['reg'])

In [None]:
#df_departure.filter("(dist > 0) and ((orig_geo is null) or (dest_geo is null))").count()
#df_airline_EAY = df_departure.filter( F.col('airline_code') == 'EAY'  )
#df_airline_RSC = df_departure.filter( F.col('airline_code') == 'RSC'  )
#df_airline_OUA = df_departure.filter( F.col('airline_code') == 'OUA'  )
#df_airline_WIS = df_departure.filter( F.col('airline_code') == 'WIS'  )
#df_airline_NCN = df_departure.filter( F.col('airline_code') == 'NCN'  )
#df_airline_SIS = df_departure.filter( F.col('airline_code') == 'SIS'  )
#df_airline_CAT = df_departure.filter( F.col('airline_code') == 'CAT'  )
#df_airline_KDS = df_departure.filter( F.col('airline_code') == 'KDS'  )
#df_airline_LXG = df_departure.filter( F.col('airline_code') == 'LXG'  )
#df_airline_AVL = df_departure.filter( F.col('airline_code') == 'AVL'  )
#df_airline_AIB = df_departure.filter( F.col('airline_code') == 'AIB'  )
#df_airline_PVO = df_departure.filter( F.col('airline_code') == 'PVO'  )
#df_airline_CWG = df_departure.filter( F.col('airline_code') == 'CWG'  )
#df_airline_BLO = df_departure.filter( F.col('airline_code') == 'BLO'  )
df_airline_BLO = df_departure.filter( F.col('airline_code') == 'BLO'  )

## By Geo

In [None]:
# total number of (unique) flight per SITA GEO
df_bygeo = df_departure.groupBy('year', 'quarter', 'month', 'dep_arr', 'aircrafttype', 'flight_type', 'airline_type', 'orig_geo', 'dest_geo').agg(F.countDistinct('id').alias('flight_count'),
                                                                                               F.countDistinct('reg').alias('reg_count'),
                                                                                               F.countDistinct('aircrafttype').alias('aircraftmodel_count'), 
                                                                                               F.sum('dist').alias('total_distance')).orderBy('year', 'quarter', 'month', 'dep_arr', 'aircrafttype', 'flight_type', 'airline_type', 'orig_geo', 'dest_geo')

In [None]:
# total number of (unique) flight per SITA GEO
df_bygeo01 = df_departure.groupBy('year', 'month', 'airline_type').agg(F.countDistinct('id').alias('flight_count'),
                                                                                               F.countDistinct('reg').alias('reg_count'),
                                                                                               F.countDistinct('aircrafttype').alias('aircraftmodel_count'), 
                                                                                               F.sum('dist').alias('total_distance')).orderBy('year', 'month', 'airline_type')

In [None]:
# total number of (unique) flight per SITA GEO
df_bygeo = df_departure.groupBy('year', 'airline_type', 'flight_type').agg(F.countDistinct('reg').alias('reg_count')).orderBy('year', 'airline_type', 'flight_type')

In [None]:
#df_bygeo.show(100)

## By Country

In [None]:
# total number of (unique) flight per country
df_bycountry = df_departure.groupBy('year', 'quarter', 'month', 'dep_arr', 'aircrafttype', 'flight_type', 'airline_type', 'orig_country', 'dest_country').agg(F.countDistinct('id').alias('flight_count'),
                                                                                                           F.countDistinct('reg').alias('reg_count'),
                                                                                                           F.countDistinct('aircrafttype').alias('aircraftmodel_count'), 
                                                                                                           F.sum('dist').alias('total_distance')).orderBy(  'year', 'quarter', 'month', 'dep_arr', 'aircrafttype', 'flight_type', 'airline_type', 'orig_country', 'dest_country') 

# By Region

In [None]:
# total number of (unique) flight per region
df_byregion = df_departure.groupBy( 'year', 'quarter', 'month', 'dep_arr', 'aircrafttype', 'flight_type', 'airline_type', 'orig_region', 'dest_region').agg(F.countDistinct('id').alias('flight_count'),
                                                                                                           F.countDistinct('reg').alias('reg_count'),
                                                                                                           F.countDistinct('aircrafttype').alias('aircraftmodel_count'), 
                                                                                                           F.sum('dist').alias('total_distance')).orderBy(  'year', 'quarter', 'month', 'dep_arr', 'aircrafttype', 'flight_type', 'airline_type', 'orig_region', 'dest_region') 

# By Airline

In [None]:
# total number of (unique) flight per airline
df_byairline1 = df_departure.groupBy('year', 'airline_name', 'airline_type', 'orig_country').agg(F.countDistinct('id').alias('flight_count'),
                                          F.countDistinct('reg').alias('reg_count'),
                                          F.countDistinct('aircrafttype').alias('aircraftmodel_count'), 
                                          F.sum('dist').alias('total_distance')).orderBy('year', 'airline_name', 'airline_type', 'orig_country')

## By Airport

In [None]:
# total number of (unique) flight per country
#df_byairport = df_departure.groupBy('year', 'orig_airport', 'dest_airport').agg(F.countDistinct('id').alias('flight_count'),F.countDistinct('aircrafttype').alias('aircraft_count'), F.sum('dist').alias('total_distance')).orderBy( 'year', 'orig_airport','dest_airport')

## Export

In [None]:
#df_bygeo01.repartition(1).write.format('csv').option('header', 'true').option("encoding", "utf-8").save('s3://sita-coe-ds-dev-v1/stats/regcount_20200221_year+deparr_filter3')
df_bygeo01.repartition(1).write.format('csv').option('header', 'true').option("encoding", "utf-8").save('s3://sita-coe-ds-dev-v1/stats/regcount_20200224_year+month_filter1')
#df_bygeo03.repartition(1).write.format('csv').option('header', 'true').option("encoding", "utf-8").save('s3://sita-coe-ds-dev-v1/stats/regcount_20200220_year_filter1')
#df_bygeo02.repartition(1).write.format('csv').option('header', 'true').option("encoding", "utf-8").save('s3://sita-coe-ds-dev-v1/stats/bygeo_dis_20200219_yeardestonly_filter1')
#df_bycountry.repartition(1).write.format('csv').option('header', 'true').option("encoding", "utf-8").save('s3://sita-coe-ds-dev-v1/stats/bycountry_co_20200219_wairlinetype_filter1')
#df_byregion.repartition(1).write.format('csv').option('header', 'true').option("encoding", "utf-8").save('s3://sita-coe-ds-dev-v1/stats/byregion_co_20200219_wairlinetype_filter1')
#df_byairline1.repartition(1).write.format('csv').option('header', 'true').option("encoding", "utf-8").save('s3://sita-coe-ds-dev-v1/stats/byairline_co_20200219_wcountry1_filter1')

In [None]:
df_bycountry.repartition(1).write.format('csv').option('header', 'true').option("encoding", "utf-8").save('s3://sita-coe-ds-dev-v1/stats/bycountry_co_20200213_wairlinetype3_filter1')

In [None]:
df_byregion.repartition(1).write.format('csv').option('header', 'true').option("encoding", "utf-8").save('s3://sita-coe-ds-dev-v1/stats/byregion_co_20200213_wairlinetype3_filter1')

In [None]:
df_origcounty_icao  = df_bycountry.filter( F.col('orig_country') == 'Netherlands'  )
df_destcounty_icao  = df_bycountry.filter( F.col('dest_country') == 'Netherlands'  )

In [None]:
df_byairline.repartition(1).write.format('csv').option('header', 'true').option("encoding", "utf-8").save('s3://sita-coe-ds-dev-v1/stats/byairline_co_20200213_wairlinetype3_filter1')

In [None]:
#df_origcounty_icao.repartition(1).write.format('csv').option('header', 'true').option("encoding", "utf-8").save('s3://sita-coe-ds-dev-v1/stats/orignetherlands_20200212_filter1')
#df_destcounty_icao.repartition(1).write.format('csv').option('header', 'true').option("encoding", "utf-8").save('s3://sita-coe-ds-dev-v1/stats/destnetherlands_20200212_filter1')
#df_airline_EAY.repartition(1).write.format('csv').option('header', 'true').option("encoding", "utf-8").save('s3://sita-coe-ds-dev-v1/stats/airline_EAY')
#df_airline_RSC.repartition(1).write.format('csv').option('header', 'true').option("encoding", "utf-8").save('s3://sita-coe-ds-dev-v1/stats/airline_RSC')
#df_airline_OUA.repartition(1).write.format('csv').option('header', 'true').option("encoding", "utf-8").save('s3://sita-coe-ds-dev-v1/stats/airline_OUA')
#df_airline_AIB.repartition(1).write.format('csv').option('header', 'true').option("encoding", "utf-8").save('s3://sita-coe-ds-dev-v1/stats/airline_AIB')
df_regcount2019.repartition(1).write.format('csv').option('header', 'true').option("encoding", "utf-8").save('s3://sita-coe-ds-dev-v1/stats/regcount_unique2019_20200221')
#df_airline_BLO.repartition(1).write.format('csv').option('header', 'true').option("encoding", "utf-8").save('s3://sita-coe-ds-dev-v1/stats/airline_BLO')