In [1]:
from pyspark.sql import SparkSession
from sedona.register import SedonaRegistrator
from sedona.utils import SedonaKryoRegistrator, KryoSerializer

class AirlineDataQuerier:
    """Run queries against an apache spark instance to get Airline info"""
    def __init__(self):
        # Create spark session
        self.spark = SparkSession. \
            builder. \
            appName('Milestone3'). \
            master('local[*]'). \
            config("spark.serializer", KryoSerializer.getName). \
            config("spark.kryo.registrator", SedonaKryoRegistrator.getName). \
            config('spark.jars.packages',
                   'org.apache.sedona:sedona-python-adapter-3.0_2.12:1.0.1-incubating,'
                   'org.datasyslab:geotools-wrapper:geotools-24.1'). \
            getOrCreate()

        SedonaRegistrator.registerAll(self.spark)
        
        self.load_tables()
        
    def load_tables(self):
        """
        Read in tables from data files.
        
        Tables:
        - Airports
        - Airlines
        - states
        - Cities
        - Routes
        """
        # read in airports
        airports = self.spark.read.option("delimiter", ",").option("header", "false").csv("clean_data/airports.csv") \
            .toDF("IDX", "Airport_ID","Name","City","Country","IATA","ICAO","Latitude","Longitude","Altitude","Timezone","DST","Tz_database_timezone","Type","Source")

        airports.createOrReplaceTempView("Airports")
        
        # read in airlines
        airlines = self.spark.read.option("delimiter", ",").option("header", "false").csv("clean_data/airlines.csv") \
            .toDF("IDX","Airline_ID","Name","Alias","IATA","ICAO","Callsign","Country","Active")

        airlines.createOrReplaceTempView("Airlines")
        
        # read in states
        boundary_each_state = self.spark.read.option("delimiter", "\t").option("header", "false").csv("clean_data/boundary-each-state.tsv") \
            .toDF("s_name","s_bound")
        boundary_each_state = boundary_each_state.selectExpr("s_name", "ST_GeomFromWKT(s_bound) as s_bound")
        boundary_each_state.createOrReplaceTempView("states")
        
        # read in cities
        cities = self.spark.read.option("delimiter", ",").option("header", "false").csv("clean_data/cities.csv") \
            .toDF("Name","Longitude","Latitude")

        cities.createOrReplaceTempView("Cities")
        
        # read in routes
        routes = self.spark.read.option("delimiter", ",").option("header", "false").csv("clean_data/routes.csv") \
            .toDF('IDX', 'Airline', 'Airline_ID', 'Source_airport',
               'Source_airport_ID', 'Destination_airport', 'Destination_airport_ID',
               'Codeshare', 'Stops', 'Equipment')

        routes.createOrReplaceTempView("Routes")
    
    def run_query(self, query):
        """Run a query against apache spark instance"""
        return self.spark.sql(query).collect()
    
    def get_airports_in_country(self, country_name):
        """Get a list of Airport_IDs and Names of the airports in country `country_name`"""
        query = f"SELECT Airport_ID, Name FROM airports WHERE Country LIKE '%{country_name}%'"
        return self.run_query(query)
    
    def get_airlines_with_x_stops(self, stop_value):
        """Get a list of Airport_IDs and Names that have `stop_value` stops"""
        query = f"""
        SELECT DISTINCT a.Airline_ID, a.Name, r.stops
        FROM Airlines AS a, Routes AS r
        INNER JOIN Routes ON a.Airline_ID = r.Airline_ID
        WHERE r.Stops = {stop_value}"""
        return self.run_query(query)
    
    def get_airlines_with_code_share(self):
        """Get airlines that have codeshare"""
        query = f"""
        SELECT DISTINCT Airline
        FROM routes
        WHERE Codeshare = 'Y'
        """
        return self.run_query(query)
    
    def get_active_airlines_in_country(self, country_name):
        """Get the airlines that are active in country `country_name`"""
        query = f"""
        SELECT Airline_ID, Name
        FROM Airlines
        WHERE Airlines.Country LIKE '%{country_name}%' AND Airlines.active = 'Y'
        """
        return self.run_query(query)

    def get_active_airlines_in_United_States(self):
        """Get the airlines that are active in United States"""
        return self.get_active_airlines_in_country("United States")
    
    def get_country_or_teritory_with_most_airports(self):
        """Get the country or teritory with the most airports"""
        query = f"""
        SELECT * FROM (
        SELECT COUNT(Airlines.Airline_ID) as number, Airlines.Country
        FROM Airlines
        GROUP BY Airlines.Country)
        ORDER BY number DESC
        LIMIT 1
        """
        return self.run_query(query)
    
    def get_top_k_cities_with_most_incoming_airlines(self, k):
        """Get the top `k` cities with the most incoming airlines"""
        query = f"""
        SELECT count(Routes.airline) as number, Airports.city
        FROM Routes JOIN Airports ON Routes.Destination_airport_ID = Airports.Airport_ID
        GROUP BY Airports.city
        ORDER BY number DESC
        LIMIT {k}
        """
        return self.run_query(query)
    
    def get_closest_airport_to_city(self, city):
        """
        Find the closest airport to `city` and get the Airport_Name, 
        Airport_ID and distance between the city and the airport
        """
        query = f"""
        SELECT SQRT(POW(a_lat - c_lat, 2) + POW(a_long - c_long, 2)) as dist, Airport_Name, Airport_ID
        FROM
        (SELECT a.Latitude as a_lat, a.Longitude as a_long, a.Name as Airport_Name, a.Airport_ID, c.Latitude as c_lat, c.Longitude as c_long
        FROM
        (SELECT Airports.Latitude, Airports.Longitude, Airports.Name, Airports.Airport_ID
        FROM Airports
        WHERE Latitude NOT LIKE 'Latitude' AND Longitude NOT LIKE 'Longitude'
        ) as a 
        INNER JOIN
        (SELECT Cities.Latitude, Cities.Longitude 
        FROM Cities  WHERE Name='{city}' LIMIT 1) as c)
        ORDER BY dist ASC
        LIMIT 1
        """

        return self.run_query(query)
    
    def get_k_closest_airport_to_city(self, city, k):
        """
        Find the k closest airport to `city` and get the Airport_Name, 
        Airport_ID and distance between the city and the airport
        """
        query = f"""
        SELECT POW(a_lat - c_lat, 2) + POW(a_long - c_long, 2) as dist, Airport_Name, Airport_ID
        FROM
        (SELECT a.Latitude as a_lat, a.Longitude as a_long, a.Name as Airport_Name, a.Airport_ID, c.Latitude as c_lat, c.Longitude as c_long
        FROM
        (SELECT Airports.Latitude, Airports.Longitude, Airports.Name, Airports.Airport_ID
        FROM Airports
        WHERE Latitude NOT LIKE 'Latitude' AND Longitude NOT LIKE 'Longitude'
        ) as a 
        INNER JOIN
        (SELECT Cities.Latitude, Cities.Longitude 
        FROM Cities  WHERE Name='{city}' LIMIT 1) as c)
        ORDER BY dist ASC
        LIMIT {k}
        """

        return self.run_query(query)
    
    def get_airport_in_each_state(self):
        """Get the airport in each state, selecting the Airport_ID, Airport_name, and s_name (State name)"""
        query = f"""
        SELECT _contains, Airport_name, Airport_ID, s_name as state_name
        FROM
        (
            SELECT
            ST_Contains(s_bound, c_point) as _contains, Name as Airport_name, Airport_ID, s_name
            FROM
            (
                SELECT * FROM states
            ) AS s
            INNER JOIN
            (
                SELECT ST_GeomFromWKT(CONCAT('POINT(', Longitude, ' ', Latitude, ')')) as c_point, Name, Airport_ID
                FROM Airports 
                WHERE Longitude NOT LIKE 'Longitude' AND Latitude NOT LIKE 'Latitude'
            ) AS c
        )
        WHERE _contains=True
        ORDER BY state_name
        """

        return self.run_query(query)

In [2]:
querier = AirlineDataQuerier()

Ivy Default Cache set to: /home/sparky/.ivy2/cache
The jars for the packages stored in: /home/sparky/.ivy2/jars
:: loading settings :: url = jar:file:/home/sparky/spark-3.0.3-bin-hadoop3.2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.sedona#sedona-python-adapter-3.0_2.12 added as a dependency
org.datasyslab#geotools-wrapper added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-fa8fdd28-148e-4bcd-8905-71cd8065c7f8;1.0
	confs: [default]
	found org.apache.sedona#sedona-python-adapter-3.0_2.12;1.0.1-incubating in central
	found org.locationtech.jts#jts-core;1.18.0 in central
	found org.wololo#jts2geojson;0.16.1 in central
	found com.fasterxml.jackson.core#jackson-databind;2.12.2 in central
	found com.fasterxml.jackson.core#jackson-annotations;2.12.2 in central
	found com.fasterxml.jackson.core#jackson-core;2.12.2 in central
	found org.apache.sedona#sedona-core-3.0_2.12;1.0.1-incubating in central
	found org.apache.sedona#sedo

In [3]:
querier.get_airlines_with_x_stops(stop_value=1)

                                                                                

[Row(Airline_ID='1316', Name='AirTran Airways', stops='1'),
 Row(Airline_ID='1936', Name='Cubana de Aviación', stops='1'),
 Row(Airline_ID='4319', Name='Scandinavian Airlines System', stops='1'),
 Row(Airline_ID='1623', Name='Canadian North', stops='1'),
 Row(Airline_ID='330', Name='Air Canada', stops='1'),
 Row(Airline_ID='4547', Name='Southwest Airlines', stops='1')]

In [4]:
querier.get_airports_in_country("United States")

[Row(Airport_ID='3411', Name='Barter Island LRRS Airport'),
 Row(Airport_ID='3412', Name='Wainwright Air Station'),
 Row(Airport_ID='3413', Name='Cape Lisburne LRRS Airport'),
 Row(Airport_ID='3414', Name='Point Lay LRRS Airport'),
 Row(Airport_ID='3415', Name='Hilo International Airport'),
 Row(Airport_ID='3416', Name='Orlando Executive Airport'),
 Row(Airport_ID='3417', Name='Bettles Airport'),
 Row(Airport_ID='3418', Name='Clear Airport'),
 Row(Airport_ID='3419', Name='Indian Mountain LRRS Airport'),
 Row(Airport_ID='3420', Name='Fort Yukon Airport'),
 Row(Airport_ID='3421', Name='Sparrevohn LRRS Airport'),
 Row(Airport_ID='3422', Name='Bryant Army Heliport'),
 Row(Airport_ID='3423', Name='Tatalina LRRS Airport'),
 Row(Airport_ID='3424', Name='Cape Romanzof LRRS Airport'),
 Row(Airport_ID='3425', Name='Laurence G Hanscom Field'),
 Row(Airport_ID='3426', Name='St Paul Island Airport'),
 Row(Airport_ID='3427', Name='Cape Newenham LRRS Airport'),
 Row(Airport_ID='3428', Name='St George

In [5]:
querier.get_airlines_with_code_share()

                                                                                

[Row(Airline='CI'),
 Row(Airline='AZ'),
 Row(Airline='SC'),
 Row(Airline='UA'),
 Row(Airline='RO'),
 Row(Airline='LA'),
 Row(Airline='XL'),
 Row(Airline='AM'),
 Row(Airline='PS'),
 Row(Airline='AA'),
 Row(Airline='MK'),
 Row(Airline='AY'),
 Row(Airline='IB'),
 Row(Airline='DC'),
 Row(Airline='NX'),
 Row(Airline='T3'),
 Row(Airline='AT'),
 Row(Airline='UL'),
 Row(Airline='AV'),
 Row(Airline='S7'),
 Row(Airline='HY'),
 Row(Airline='JJ'),
 Row(Airline='QF'),
 Row(Airline='AD'),
 Row(Airline='NT'),
 Row(Airline='SV'),
 Row(Airline='CZ'),
 Row(Airline='VA'),
 Row(Airline='B2'),
 Row(Airline='PG'),
 Row(Airline='CX'),
 Row(Airline='S4'),
 Row(Airline='WY'),
 Row(Airline='NH'),
 Row(Airline='KL'),
 Row(Airline='LR'),
 Row(Airline='W2'),
 Row(Airline='OA'),
 Row(Airline='XY'),
 Row(Airline='SU'),
 Row(Airline='LX'),
 Row(Airline='F7'),
 Row(Airline='TP'),
 Row(Airline='7H'),
 Row(Airline='GZ'),
 Row(Airline='SA'),
 Row(Airline='CA'),
 Row(Airline='DL'),
 Row(Airline='BR'),
 Row(Airline='Z6'),


In [6]:
querier.get_active_airlines_in_United_States()

[Row(Airline_ID='10', Name='40-Mile Air'),
 Row(Airline_ID='22', Name='Aloha Airlines'),
 Row(Airline_ID='24', Name='American Airlines'),
 Row(Airline_ID='35', Name='Allegiant Air'),
 Row(Airline_ID='109', Name='Alaska Central Express'),
 Row(Airline_ID='149', Name='Air Cargo Carriers'),
 Row(Airline_ID='210', Name='Airlift International'),
 Row(Airline_ID='281', Name='America West Airlines'),
 Row(Airline_ID='282', Name='Air Wisconsin'),
 Row(Airline_ID='287', Name='Allegheny Commuter Airlines'),
 Row(Airline_ID='295', Name='Air Sunshine'),
 Row(Airline_ID='315', Name='ATA Airlines'),
 Row(Airline_ID='397', Name='Arrow Air'),
 Row(Airline_ID='452', Name='Atlantic Southeast Airlines'),
 Row(Airline_ID='659', Name='American Eagle Airlines'),
 Row(Airline_ID='792', Name='Access Air'),
 Row(Airline_ID='882', Name='Air Florida'),
 Row(Airline_ID='928', Name='Atlas Air'),
 Row(Airline_ID='1316', Name='AirTran Airways'),
 Row(Airline_ID='1442', Name='Bemidji Airlines'),
 Row(Airline_ID='1472

In [7]:
querier.get_country_or_teritory_with_most_airports()

                                                                                

[Row(number=1099, Country='United States')]

In [8]:
querier.get_top_k_cities_with_most_incoming_airlines(10)

                                                                                

[Row(number=1224, city='London'),
 Row(number=911, city='Atlanta'),
 Row(number=720, city='Paris'),
 Row(number=682, city='Chicago'),
 Row(number=620, city='Shanghai'),
 Row(number=614, city='Beijing'),
 Row(number=613, city='New York'),
 Row(number=608, city='Moscow'),
 Row(number=506, city='Istanbul'),
 Row(number=498, city='Los Angeles')]

In [9]:
querier.get_closest_airport_to_city(city="Phoenix")

[Row(dist=40.13331360830392, Airport_Name='South Pole Station Airport', Airport_ID='2033')]

In [10]:
querier.get_airport_in_each_state()

                                                                                

[Row(_contains=True, Airport_name='Mobile Downtown Airport', Airport_ID='3452', state_name='Alabama'),
 Row(_contains=True, Airport_name='Redstone Army Air Field', Airport_ID='3480', state_name='Alabama'),
 Row(_contains=True, Airport_name='Anniston Regional Airport', Airport_ID='3485', state_name='Alabama'),
 Row(_contains=True, Airport_name='Maxwell Air Force Base', Airport_ID='3500', state_name='Alabama'),
 Row(_contains=True, Airport_name='Lawson Army Air Field (Fort Benning)', Airport_ID='3539', state_name='Alabama'),
 Row(_contains=True, Airport_name='Dothan Regional Airport', Airport_ID='3739', state_name='Alabama'),
 Row(_contains=True, Airport_name='Mobile Regional Airport', Airport_ID='3782', state_name='Alabama'),
 Row(_contains=True, Airport_name='Birmingham-Shuttlesworth International Airport', Airport_ID='3811', state_name='Alabama'),
 Row(_contains=True, Airport_name='Craig Field', Airport_ID='3874', state_name='Alabama'),
 Row(_contains=True, Airport_name='Huntsville In