In [1]:
import pyspark
from pyspark.sql import SparkSession
from sedona.register import SedonaRegistrator
from sedona.utils import SedonaKryoRegistrator, KryoSerializer

import geopandas as gpd
spark = SparkSession. \
    builder. \
    appName('Milestone 2 Data Prep'). \
    master("spark://nwalton-ubuntu:7077"). \
    config("spark.serializer", KryoSerializer.getName). \
    config("spark.kryo.registrator", SedonaKryoRegistrator.getName). \
    config('spark.jars.packages',
           'org.apache.sedona:sedona-python-adapter-3.0_2.12:1.0.1-incubating,'
           'org.datasyslab:geotools-wrapper:geotools-24.1'). \
    getOrCreate()

SedonaRegistrator.registerAll(spark)

21/11/14 00:51:04 WARN Utils: Your hostname, parallels-Parallels-Virtual-Platform resolves to a loopback address: 127.0.1.1; using 10.211.55.3 instead (on interface enp0s5)
21/11/14 00:51:04 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /home/parallels/.ivy2/cache
The jars for the packages stored in: /home/parallels/.ivy2/jars
:: loading settings :: url = jar:file:/home/parallels/.local/lib/python3.8/site-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.sedona#sedona-python-adapter-3.0_2.12 added as a dependency
org.datasyslab#geotools-wrapper added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-975782e2-c10f-4b7e-ae1b-4fc0c0350822;1.0
	confs: [default]
	found org.apache.sedona#sedona-python-adapter-3.0_2.12;1.0.1-incubating in central
	found org.locationtech.jts#jts-core;1.18.0 in central
	found org.wololo#jts2geojson;0.16.1 in central
	found com.fasterxml.j

True

# Data Prep

In [2]:
#Airport DF
airports_dat = spark.read.format("csv").\
    option("header", "true").\
    option("delimiter", ",").\
    load("./Data/airports.dat").\
    toDF("Airport_ID", "Name", "City", "Country", "IATA", "ICAO", 
         "Latitude", "Longitude", "Altitude", "Timezone", "DST", "Tz Database time zone", 
         "Type", "Source")
    
airports_dat.show()
airports_dat.printSchema()

+----------+--------------------+------------------+----------------+----+----+------------------+-------------------+--------+--------+---+---------------------+-------+-----------+
|Airport_ID|                Name|              City|         Country|IATA|ICAO|          Latitude|          Longitude|Altitude|Timezone|DST|Tz Database time zone|   Type|     Source|
+----------+--------------------+------------------+----------------+----+----+------------------+-------------------+--------+--------+---+---------------------+-------+-----------+
|         2|      Madang Airport|            Madang|Papua New Guinea| MAG|AYMD|    -5.20707988739|      145.789001465|      20|      10|  U| Pacific/Port_Moresby|airport|OurAirports|
|         3|Mount Hagen Kagam...|       Mount Hagen|Papua New Guinea| HGU|AYMH|-5.826789855957031| 144.29600524902344|    5388|      10|  U| Pacific/Port_Moresby|airport|OurAirports|
|         4|      Nadzab Airport|            Nadzab|Papua New Guinea| LAE|AYNZ|      

In [3]:
state_boundary_dat = spark.read.option("delimiter", "\t").\
option("header", "false").\
csv("Data/boundary-each-state.tsv").\
toDF("s_name","s_bound")

state_boundary_dat.show()

+-------------+--------------------+
|       s_name|             s_bound|
+-------------+--------------------+
|       Alaska|POLYGON((-141.020...|
|      Alabama|POLYGON((-88.1955...|
|     Arkansas|POLYGON((-94.0416...|
|      Arizona|POLYGON((-112.598...|
|   California|POLYGON((-124.400...|
|     Colorado|POLYGON((-109.044...|
|  Connecticut|POLYGON((-73.4875...|
|     Delaware|POLYGON((-75.7919...|
|      Florida|POLYGON((-87.6050...|
|      Georgia|POLYGON((-85.6082...|
|       Hawaii|POLYGON((-154.628...|
|         Iowa|POLYGON((-95.7623...|
|        Idaho|POLYGON((-117.031...|
|     Illinois|POLYGON((-90.6290...|
|      Indiana|POLYGON((-87.5253...|
|       Kansas|POLYGON((-102.050...|
|     Kentucky|POLYGON((-89.5372...|
|    Louisiana|POLYGON((-94.0430...|
|Massachusetts|POLYGON((-72.7789...|
|     Maryland|POLYGON((-79.4778...|
+-------------+--------------------+
only showing top 20 rows



In [4]:
airlines_dat = spark.read.format("csv").\
option("header", "true").\
option("delimiter", ",").\
load("Data/airlines.dat").\
toDF("Airline_ID", "Name", "Alias", "IATA", "ICAO", "Callsign", 
    "Country", "Active")

airlines_dat.show()
airlines_dat.printSchema()


+----------+--------------------+-----+----+----+---------------+--------------+------+
|Airline_ID|                Name|Alias|IATA|ICAO|       Callsign|       Country|Active|
+----------+--------------------+-----+----+----+---------------+--------------+------+
|         1|      Private flight|   \N|   -| N/A|           null|          null|     Y|
|         2|         135 Airways|   \N|null| GNL|        GENERAL| United States|     N|
|         3|       1Time Airline|   \N|  1T| RNX|        NEXTIME|  South Africa|     Y|
|         4|2 Sqn No 1 Elemen...|   \N|null| WYT|           null|United Kingdom|     N|
|         5|     213 Flight Unit|   \N|null| TFU|           null|        Russia|     N|
|         6|223 Flight Unit S...|   \N|null| CHD| CHKALOVSK-AVIA|        Russia|     N|
|         7|   224th Flight Unit|   \N|null| TTF|     CARGO UNIT|        Russia|     N|
|         8|         247 Jet Ltd|   \N|null| TWF|   CLOUD RUNNER|United Kingdom|     N|
|         9|         3D Aviation

In [5]:
routes_dat = spark.read.format("csv").\
option("header", "true").\
option("delimiter", ",").\
load("Data/routes.dat").\
toDF("Airline", "Airline_ID", "Source_Airport", "Source_Airport_ID", 
     "Destination_Airport", "Destination_Airport_ID", "Codeshare", 
     "Stops", "Equipment")

routes_dat.show()
routes_dat.printSchema()



[Stage 9:>                                                          (0 + 1) / 1]                                                                                

+-------+----------+--------------+-----------------+-------------------+----------------------+---------+-----+---------+
|Airline|Airline_ID|Source_Airport|Source_Airport_ID|Destination_Airport|Destination_Airport_ID|Codeshare|Stops|Equipment|
+-------+----------+--------------+-----------------+-------------------+----------------------+---------+-----+---------+
|     2B|       410|           ASF|             2966|                KZN|                  2990|     null|    0|      CR2|
|     2B|       410|           ASF|             2966|                MRV|                  2962|     null|    0|      CR2|
|     2B|       410|           CEK|             2968|                KZN|                  2990|     null|    0|      CR2|
|     2B|       410|           CEK|             2968|                OVB|                  4078|     null|    0|      CR2|
|     2B|       410|           DME|             4029|                KZN|                  2990|     null|    0|      CR2|
|     2B|       

# Milestone 3 Tasks

## Airport and Airline search
### List of airports operating in country x

In [6]:
#filter by desired country, united states for example
United_States_Airports = airports_dat.filter(airports_dat["Country"] == "United States")
United_States_Airports.show()




[Stage 10:>                                                         (0 + 1) / 1]

+----------+--------------------+----------------+-------------+----+----+------------------+-------------------+--------+--------+---+---------------------+-------+-----------+
|Airport_ID|                Name|            City|      Country|IATA|ICAO|          Latitude|          Longitude|Altitude|Timezone|DST|Tz Database time zone|   Type|     Source|
+----------+--------------------+----------------+-------------+----+----+------------------+-------------------+--------+--------+---+---------------------+-------+-----------+
|      3411|Barter Island LRR...|   Barter Island|United States| BTI|PABA|     70.1340026855|     -143.582000732|       2|      -9|  A|    America/Anchorage|airport|OurAirports|
|      3412|Wainwright Air St...| Fort Wainwright|United States|  \N|PAWT|       70.61340332|       -159.8600006|      35|      -9|  A|    America/Anchorage|airport|OurAirports|
|      3413|Cape Lisburne LRR...|   Cape Lisburne|United States| LUR|PALU|       68.87509918|       -166.11000

                                                                                

### List of airlines having x stops

In [7]:
X_Stops = "1"
#Filter routes_dat by routes with only one stop
X_Stop_Flight_Routes = routes_dat.filter(routes_dat["Stops"] == X_Stops)

# Create a list of airline_IDs from routes with one stop
Airline_ID_List = [row[0] for row in X_Stop_Flight_Routes.select("Airline_ID").distinct().collect()]

# Filter airline_dat by airline_IDs from the list above
X_Stop_Airlines = airlines_dat.filter(airlines_dat["Airline_ID"].isin(Airline_ID_List))
X_Stop_Airlines.show()

                                                                                

+----------+--------------------+--------------------+----+----+------------+-------------+------+
|Airline_ID|                Name|               Alias|IATA|ICAO|    Callsign|      Country|Active|
+----------+--------------------+--------------------+----+----+------------+-------------+------+
|       330|          Air Canada|                  \N|  AC| ACA|  AIR CANADA|       Canada|     Y|
|      1316|     AirTran Airways|                  \N|  FL| TRS|      CITRUS|United States|     Y|
|      1623|      Canadian North|                  \N|  5T| MPE|     EMPRESS|       Canada|     Y|
|      1936|  Cubana de Aviación|                  \N|  CU| CUB|      CUBANA|         Cuba|     Y|
|      4319|Scandinavian Airl...|SAS Scandinavian ...|  SK| SAS|SCANDINAVIAN|       Sweden|     Y|
|      4547|  Southwest Airlines|                  \N|  WN| SWA|   SOUTHWEST|United States|     Y|
+----------+--------------------+--------------------+----+----+------------+-------------+------+



### List of airlines operating with code share

In [8]:
# Filter routes_dat by routes with codeshare
Codeshare_Routes = routes_dat.filter(routes_dat["Codeshare"] == "Y")

# Create a list of airline_IDs marked Y for codeshare
Codeshare_List = [row[0] for row in Codeshare_Routes.select("Airline_ID").distinct().collect()]

# Filter airline_dat by airline_IDs from the above list
Airlines_With_Codeshare = airlines_dat.filter(airlines_dat["Airline_ID"].isin(Codeshare_List))
Airlines_With_Codeshare.show()

                                                                                

+----------+--------------------+--------------------+----+----+---------------+-----------------+------+
|Airline_ID|                Name|               Alias|IATA|ICAO|       Callsign|          Country|Active|
+----------+--------------------+--------------------+----+----+---------------+-----------------+------+
|        24|   American Airlines|                  \N|  AA| AAL|       AMERICAN|    United States|     Y|
|        28|     Asiana Airlines|                  \N|  OZ| AAR|         ASIANA|Republic of Korea|     Y|
|        90|          Air Europa|                  \N|  UX| AEA|         EUROPA|            Spain|     Y|
|        96|     Aegean Airlines|                  \N|  A3| AEE|         AEGEAN|           Greece|     Y|
|       116|           Air Italy|                  \N|  I9| AEY|      AIR ITALY|            Italy|     Y|
|       130|Aeroflot Russian ...|                  \N|  SU| AFL|       AEROFLOT|           Russia|     Y|
|       137|          Air France|             

### List of active airlines in the United States

In [9]:
US_Airlines = airlines_dat.filter(airlines_dat["Country"] == "United States")
Active_US_Airlines = US_Airlines.filter(US_Airlines["Active"] == "Y")

Active_US_Airlines.show()

+----------+--------------------+-----+----+----+-------------+-------------+------+
|Airline_ID|                Name|Alias|IATA|ICAO|     Callsign|      Country|Active|
+----------+--------------------+-----+----+----+-------------+-------------+------+
|        10|         40-Mile Air|   \N|  Q5| MLA|     MILE-AIR|United States|     Y|
|        22|      Aloha Airlines|   \N|  AQ| AAH|        ALOHA|United States|     Y|
|        24|   American Airlines|   \N|  AA| AAL|     AMERICAN|United States|     Y|
|        35|       Allegiant Air|   \N|  G4| AAY|    ALLEGIANT|United States|     Y|
|       109|Alaska Central Ex...|   \N|  KO| AER|      ACE AIR|United States|     Y|
|       149|  Air Cargo Carriers|   \N|  2Q| SNC|  NIGHT CARGO|United States|     Y|
|       210|Airlift Internati...|   \N|null| AIR|      AIRLIFT|United States|     Y|
|       281|America West Airl...|   \N|  HP| AWE|       CACTUS|United States|     Y|
|       282|       Air Wisconsin|   \N|  ZW| AWI|AIR WISCONSIN|Un

### Country or territory with highest number of airports

In [10]:
airports_dat.groupBy("Country").count().orderBy("count", ascending=False).show()

                                                                                

+--------------+-----+
|       Country|count|
+--------------+-----+
| United States| 1512|
|        Canada|  430|
|     Australia|  334|
|        Russia|  264|
|        Brazil|  264|
|       Germany|  249|
|         China|  241|
|        France|  217|
|United Kingdom|  167|
|         India|  148|
|     Indonesia|  145|
|         Japan|  123|
|  South Africa|   99|
|     Argentina|   96|
|        Mexico|   84|
|         Italy|   83|
|          Iran|   82|
|        Sweden|   77|
|        Turkey|   76|
|      Colombia|   75|
+--------------+-----+
only showing top 20 rows



### Top K cities with most incoming airlines

In [15]:
# Count number of incoming flights based on routes
K = 5
Incoming_Flight_Count = routes_dat.groupby("Destination_Airport_ID").count()
Top_K_Destination = Incoming_Flight_Count.orderBy("count", ascending=False).limit(K)

# Create list of top K airports_IDs
Top_K_ID_List = [row[0] for row in Top_K_Destination.select("Destination_Airport_ID").collect()]

# Filter top K airports from airports_dat
Top_K_Airports = airports_dat.filter(airports_dat["Airport_ID"].isin(Top_K_ID_List))

# Sort top K airports by city
Top_K_Cities = [row[0] for row in Top_K_Airports.select("City").collect()]
print("Top " + str(K) + " cities with most incoming airlines:")
i = 1
for city in Top_K_Cities:
    print(i, end=". ")
    print(city)
    i += 1


                                                                                

Top 5 cities with most incoming airlines:
1. London
2. Paris
3. Beijing
4. Atlanta
5. Chicago


## Geospatial Analytics
### Closest airport to a city X's geospatial coordinate

### Airport in each US states geospatial boundary