In [1]:
from pyspark.sql import SparkSession
from sedona.register import SedonaRegistrator
from sedona.utils import SedonaKryoRegistrator, KryoSerializer
import geopandas as gpd

spark = SparkSession. \
    builder. \
    appName('Milestone2 Loading data'). \
    master('local[*]'). \
    config("spark.serializer", KryoSerializer.getName). \
    config("spark.kryo.registrator", SedonaKryoRegistrator.getName). \
    config('spark.jars.packages',
           'org.apache.sedona:sedona-python-adapter-3.0_2.12:1.0.1-incubating,'
           'org.datasyslab:geotools-wrapper:geotools-24.1'). \
    getOrCreate()

SedonaRegistrator.registerAll(spark)

21/10/12 09:24:42 WARN Utils: Your hostname, DESKTOP-RV59F4C resolves to a loopback address: 127.0.1.1; using 192.168.20.1 instead (on interface eth0)
21/10/12 09:24:42 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/root/spark-3.0.3-bin-hadoop3.2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.sedona#sedona-python-adapter-3.0_2.12 added as a dependency
org.datasyslab#geotools-wrapper added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-eaf8be56-07de-4721-a50d-7e299b2ae465;1.0
	confs: [default]
	found org.apache.sedona#sedona-python-adapter-3.0_2.12;1.0.1-incubating in central
	found org.locationtech.jts#jts-core;1.18.0 in central
	found org.wololo#jts2geojson;0.16.1 in central
	found com.fasterxml.jackson.core#jackson-databind;2.12.2 in central
	found com.fasterxml.j

True

# Importing State and convert it to geospacial

In [2]:
states_wkt = spark.read.option("delimiter", "\t").option("header", "false").csv("boundary-each-state.tsv").toDF("s_name","s_bound")
states_wkt.show()
states_wkt.printSchema()

states = states_wkt.selectExpr("s_name", "ST_GeomFromWKT(s_bound) as s_bound")
states.show()
states.printSchema()
states.createOrReplaceTempView("states")

                                                                                

+-------------+--------------------+
|       s_name|             s_bound|
+-------------+--------------------+
|       Alaska|POLYGON((-141.020...|
|      Alabama|POLYGON((-88.1955...|
|     Arkansas|POLYGON((-94.0416...|
|      Arizona|POLYGON((-112.598...|
|   California|POLYGON((-124.400...|
|     Colorado|POLYGON((-109.044...|
|  Connecticut|POLYGON((-73.4875...|
|     Delaware|POLYGON((-75.7919...|
|      Florida|POLYGON((-87.6050...|
|      Georgia|POLYGON((-85.6082...|
|       Hawaii|POLYGON((-154.628...|
|         Iowa|POLYGON((-95.7623...|
|        Idaho|POLYGON((-117.031...|
|     Illinois|POLYGON((-90.6290...|
|      Indiana|POLYGON((-87.5253...|
|       Kansas|POLYGON((-102.050...|
|     Kentucky|POLYGON((-89.5372...|
|    Louisiana|POLYGON((-94.0430...|
|Massachusetts|POLYGON((-72.7789...|
|     Maryland|POLYGON((-79.4778...|
+-------------+--------------------+
only showing top 20 rows

root
 |-- s_name: string (nullable = true)
 |-- s_bound: string (nullable = true)



                                                                                

+-------------+--------------------+
|       s_name|             s_bound|
+-------------+--------------------+
|       Alaska|POLYGON ((-141.02...|
|      Alabama|POLYGON ((-88.195...|
|     Arkansas|POLYGON ((-94.041...|
|      Arizona|POLYGON ((-112.59...|
|   California|POLYGON ((-124.40...|
|     Colorado|POLYGON ((-109.04...|
|  Connecticut|POLYGON ((-73.487...|
|     Delaware|POLYGON ((-75.791...|
|      Florida|POLYGON ((-87.605...|
|      Georgia|POLYGON ((-85.608...|
|       Hawaii|POLYGON ((-154.62...|
|         Iowa|POLYGON ((-95.762...|
|        Idaho|POLYGON ((-117.03...|
|     Illinois|POLYGON ((-90.629...|
|      Indiana|POLYGON ((-87.525...|
|       Kansas|POLYGON ((-102.05...|
|     Kentucky|POLYGON ((-89.537...|
|    Louisiana|POLYGON ((-94.043...|
|Massachusetts|POLYGON ((-72.778...|
|     Maryland|POLYGON ((-79.477...|
+-------------+--------------------+
only showing top 20 rows

root
 |-- s_name: string (nullable = true)
 |-- s_bound: geometry (nullable = false)



# Importing airports.dat and perform data cleaning

In [3]:
airport_dat = spark.read.option("delimiter", ",").option("header", "false").csv("airports.dat").toDF("Airport ID","Name"
,"City","Country","IATA","ICAO","Latitude","Longitude","Altitude","Timezone","DST","Database Time Zone","Type","Source")
airport_dat = airport_dat.drop("Source")
airport_dat = airport_dat.drop("Type")
airport_dat = airport_dat.drop("Database Time Zone")
airport_dat = airport_dat.drop("DST")
airport_dat = airport_dat.drop("Timezone")
airport_dat.createOrReplaceTempView("airport")

from pyspark.sql import functions
airport_dat = airport_dat.select("Airport ID", "Name", "City", "Country","IATA","ICAO",
                                 functions.concat_ws("_",airport_dat.Latitude,airport_dat.Longitude,airport_dat.Altitude
                                                    ).alias("Location"))
airport_dat.show()
airport_dat.printSchema()

airport = airport_dat.selectExpr("*", "ST_PointFromText(Location, \'_\') as geospacial_Location")
airport = airport.drop("Location")
airport.show()
airport.printSchema()

[Stage 6:>                                                          (0 + 1) / 1]                                                                                

+----------+--------------------+--------------+----------------+----+----+--------------------+
|Airport ID|                Name|          City|         Country|IATA|ICAO|            Location|
+----------+--------------------+--------------+----------------+----+----+--------------------+
|         1|      Goroka Airport|        Goroka|Papua New Guinea| GKA|AYGA|-6.08168983459000...|
|         2|      Madang Airport|        Madang|Papua New Guinea| MAG|AYMD|-5.20707988739_14...|
|         3|Mount Hagen Kagam...|   Mount Hagen|Papua New Guinea| HGU|AYMH|-5.82678985595703...|
|         4|      Nadzab Airport|        Nadzab|Papua New Guinea| LAE|AYNZ|-6.569803_146.725...|
|         5|Port Moresby Jack...|  Port Moresby|Papua New Guinea| POM|AYPY|-9.44338035583496...|
|         6|Wewak Internation...|         Wewak|Papua New Guinea| WWK|AYWK|-3.58383011818_14...|
|         7|  Narsarsuaq Airport|  Narssarssuaq|       Greenland| UAK|BGBW|61.1604995728_-45...|
|         8|Godthaab / Nuuk A.

[Stage 7:>                                                          (0 + 1) / 1]

+----------+--------------------+--------------+----------------+----+----+--------------------+
|Airport ID|                Name|          City|         Country|IATA|ICAO| geospacial_Location|
+----------+--------------------+--------------+----------------+----+----+--------------------+
|         1|      Goroka Airport|        Goroka|Papua New Guinea| GKA|AYGA|POINT (-6.0816898...|
|         2|      Madang Airport|        Madang|Papua New Guinea| MAG|AYMD|POINT (-5.2070798...|
|         3|Mount Hagen Kagam...|   Mount Hagen|Papua New Guinea| HGU|AYMH|POINT (-5.8267898...|
|         4|      Nadzab Airport|        Nadzab|Papua New Guinea| LAE|AYNZ|POINT (-6.569803 ...|
|         5|Port Moresby Jack...|  Port Moresby|Papua New Guinea| POM|AYPY|POINT (-9.4433803...|
|         6|Wewak Internation...|         Wewak|Papua New Guinea| WWK|AYWK|POINT (-3.5838301...|
|         7|  Narsarsuaq Airport|  Narssarssuaq|       Greenland| UAK|BGBW|POINT (61.1604995...|
|         8|Godthaab / Nuuk A.

                                                                                

# Importing airline.dat and perform data cleaning

In [4]:
airline_dat = spark.read.option("delimiter", ",").option("header", "false").csv("airlines.dat").toDF("AirlineID","Name"
,"Alias","IATA","ICAO","Callsign","Country","Active")
airline_dat.createOrReplaceTempView("airline")
airline_dat = spark.sql("select * from airline where AirlineID > 0")
airline_dat = airline_dat.fillna('N/A')
airline_dat.show()
airline_dat.printSchema()

[Stage 9:>                                                          (0 + 1) / 1]

+---------+--------------------+-----+----+----+---------------+--------------+------+
|AirlineID|                Name|Alias|IATA|ICAO|       Callsign|       Country|Active|
+---------+--------------------+-----+----+----+---------------+--------------+------+
|        1|      Private flight|   \N|   -| N/A|            N/A|           N/A|     Y|
|        2|         135 Airways|   \N| N/A| GNL|        GENERAL| United States|     N|
|        3|       1Time Airline|   \N|  1T| RNX|        NEXTIME|  South Africa|     Y|
|        4|2 Sqn No 1 Elemen...|   \N| N/A| WYT|            N/A|United Kingdom|     N|
|        5|     213 Flight Unit|   \N| N/A| TFU|            N/A|        Russia|     N|
|        6|223 Flight Unit S...|   \N| N/A| CHD| CHKALOVSK-AVIA|        Russia|     N|
|        7|   224th Flight Unit|   \N| N/A| TTF|     CARGO UNIT|        Russia|     N|
|        8|         247 Jet Ltd|   \N| N/A| TWF|   CLOUD RUNNER|United Kingdom|     N|
|        9|         3D Aviation|   \N| N/A|

                                                                                

# Importing routes.dat

In [12]:
route_dat = spark.read.option("delimiter", ",").option("header", "false").csv("routes.dat").toDF("Airline","AirlineID"
,"SourceAirport","SourceAirportID","DestinationAirport","DestinationAirportID","Codeshare","Stops","Equipment")
route_dat.createOrReplaceTempView("route")
route_dat = route_dat.drop("Equipment")

#route_dat = route_dat.na.replace("\\N","NULL")
route_dat.show()
#route_dat.printSchema()

route = spark.sql(
'''
    SELECT *
    FROM route, airline
    WHERE (route.Airline = airline.IATA) or (route.Airline = airline.ICAO)
''')
route.show()

+-------+---------+-------------+---------------+------------------+--------------------+---------+-----+
|Airline|AirlineID|SourceAirport|SourceAirportID|DestinationAirport|DestinationAirportID|Codeshare|Stops|
+-------+---------+-------------+---------------+------------------+--------------------+---------+-----+
|     2B|      410|          AER|           2965|               KZN|                2990|     null|    0|
|     2B|      410|          ASF|           2966|               KZN|                2990|     null|    0|
|     2B|      410|          ASF|           2966|               MRV|                2962|     null|    0|
|     2B|      410|          CEK|           2968|               KZN|                2990|     null|    0|
|     2B|      410|          CEK|           2968|               OVB|                4078|     null|    0|
|     2B|      410|          DME|           4029|               KZN|                2990|     null|    0|
|     2B|      410|          DME|           40