In [22]:
# Import Spark modules
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Start a Spark session for this notebook, or attach to one that already exists
spark = (
    SparkSession.builder
    .appName("Explore_Airlines_dat")
    .getOrCreate()
)

In [2]:
# Load the airlines data file as CSV; no schema is supplied at this point
df_airlines = spark.read.format("csv").load('airlines.dat')

In [4]:
# Preview the data (20 rows with long values truncated, i.e. show()'s defaults)
df_airlines.show(n=20, truncate=True)

+---+--------------------+---+----+----+--------------+--------------+---+
|_c0|                 _c1|_c2| _c3| _c4|           _c5|           _c6|_c7|
+---+--------------------+---+----+----+--------------+--------------+---+
| -1|             Unknown| \N|   -| N/A|            \N|            \N|  Y|
|  1|      Private flight| \N|   -| N/A|          null|          null|  Y|
|  2|         135 Airways| \N|null| GNL|       GENERAL| United States|  N|
|  3|       1Time Airline| \N|  1T| RNX|       NEXTIME|  South Africa|  Y|
|  4|2 Sqn No 1 Elemen...| \N|null| WYT|          null|United Kingdom|  N|
|  5|     213 Flight Unit| \N|null| TFU|          null|        Russia|  N|
|  6|223 Flight Unit S...| \N|null| CHD|CHKALOVSK-AVIA|        Russia|  N|
|  7|   224th Flight Unit| \N|null| TTF|    CARGO UNIT|        Russia|  N|
|  8|         247 Jet Ltd| \N|null| TWF|  CLOUD RUNNER|United Kingdom|  N|
|  9|         3D Aviation| \N|null| SEC|       SECUREX| United States|  N|
| 10|         40-Mile Air

In [7]:
from pyspark.sql.types import *
# Explicit schema keyed on Spark's positional column names: the first column is
# an integer, the remaining seven are strings; every column is nullable.
schema = StructType(
    [StructField("_c0", IntegerType(), True)]
    + [
        StructField(column_name, StringType(), True)
        for column_name in ("_c1", "_c2", "_c3", "_c4", "_c5", "_c6", "_c7")
    ]
)

In [8]:
# Re-read the file, this time applying the explicit schema
df_airlines = spark.read.schema(schema).csv('airlines.dat')

In [14]:
# Replace the positional column names with the headers documented on
# openflights.org/data.html
renamed_columns = [
    ("_c0", "id"),
    ("_c1", "name"),
    ("_c2", "Alias"),
    ("_c3", "IATA"),
    ("_c4", "ICAO"),
    ("_c5", "Callsign"),
    ("_c6", "Country"),
    ("_c7", "Active"),
]
# withColumnRenamed leaves the frame unchanged when the old name is absent,
# so re-running this cell is harmless.
for positional_name, header in renamed_columns:
    df_airlines = df_airlines.withColumnRenamed(positional_name, header)

In [15]:
# Print each column's name, type and nullability
df_airlines.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- Alias: string (nullable = true)
 |-- IATA: string (nullable = true)
 |-- ICAO: string (nullable = true)
 |-- Callsign: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Active: string (nullable = true)



In [17]:
# Preview the data again (20 rows with long values truncated, i.e. show()'s defaults)
df_airlines.show(n=20, truncate=True)

+---+--------------------+-----+----+----+--------------+--------------+------+
| id|                name|Alias|IATA|ICAO|      Callsign|       Country|Active|
+---+--------------------+-----+----+----+--------------+--------------+------+
| -1|             Unknown|   \N|   -| N/A|            \N|            \N|     Y|
|  1|      Private flight|   \N|   -| N/A|          null|          null|     Y|
|  2|         135 Airways|   \N|null| GNL|       GENERAL| United States|     N|
|  3|       1Time Airline|   \N|  1T| RNX|       NEXTIME|  South Africa|     Y|
|  4|2 Sqn No 1 Elemen...|   \N|null| WYT|          null|United Kingdom|     N|
|  5|     213 Flight Unit|   \N|null| TFU|          null|        Russia|     N|
|  6|223 Flight Unit S...|   \N|null| CHD|CHKALOVSK-AVIA|        Russia|     N|
|  7|   224th Flight Unit|   \N|null| TTF|    CARGO UNIT|        Russia|     N|
|  8|         247 Jet Ltd|   \N|null| TWF|  CLOUD RUNNER|United Kingdom|     N|
|  9|         3D Aviation|   \N|null| SE

In [68]:
# Build the carrier/code lookup view from the relevant columns only:
#   * keep rows flagged active ('Y')
#   * drop rows whose IATA is the placeholder "-" or the text "null"
# Rows whose IATA is a real NULL are dropped as well, because a comparison
# against NULL never evaluates to true.
active_with_iata = df_airlines.filter(
    ~col("iata").isin("-", "null") & (col("active") == 'Y')
)
(
    active_with_iata
    .select(col("name").alias("carrier"), col("iata").alias("code"))
    .dropDuplicates()
    .createOrReplaceTempView('dm_arlines_name')
)

In [69]:
# Inspect the columns and types exposed by the temp view
view_description = spark.sql("""
    DESCRIBE dm_arlines_name
""")
view_description.show()

+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
| carrier|   string|   null|
|    code|   string|   null|
+--------+---------+-------+



In [70]:
# List the codes that are still attached to more than one carrier.
# Notes from earlier iterations of this check:
#   - the distinct code count went from 6000+ down to 1535, but duplicates remained
#   - removing 2 duplicate entries brought it to 1533
#   - grouping by code still showed duplicates
#   - filtering on active airlines only still leaves many duplicates
duplicate_code_counts = spark.sql("""
    SELECT COUNT(code), code
    FROM dm_arlines_name
    GROUP BY code
    HAVING COUNT(code) > 1
    ORDER BY 1 DESC
""")
duplicate_code_counts.show(100)

+-----------+----+
|count(code)|code|
+-----------+----+
|          3|  G3|
|          2|  RA|
|          2|  VY|
|          2|  5D|
|          2|  C3|
|          2|  8Q|
|          2|  ZA|
|          2|  WA|
|          2|  8M|
|          2|  LH|
|          2|  CC|
|          2|  TL|
|          2|  JL|
|          2|  ZB|
|          2|  CP|
|          2|  1I|
|          2|  CO|
|          2|  I9|
|          2|  SQ|
+-----------+----+



In [71]:
# Show the carriers behind every code that occurs more than once
duplicate_carriers = spark.sql("""
    SELECT carrier, code
    FROM dm_arlines_name
    WHERE code IN (SELECT code
                    FROM dm_arlines_name
                    GROUP BY code
                    HAVING COUNT(code) > 1
                    ORDER BY 1 DESC
    )
    ORDER BY 2
""")
duplicate_carriers.show(100)

+--------------------+----+
|             carrier|code|
+--------------------+----+
|             NetJets|  1I|
|              Novair|  1I|
|         Aerolitoral|  5D|
|         DonbassAero|  5D|
|              Maxair|  8M|
|Myanmar Airways I...|  8M|
|            Onur Air|  8Q|
|  Maldivian Air Taxi|  8Q|
|         Contact Air|  C3|
|           QatXpress|  C3|
|     Macair Airlines|  CC|
|Air Atlanta Icela...|  CC|
|Continental Airlines|  CO|
| Continental Express|  CO|
|   Canadian Airlines|  CP|
|    Compass Airlines|  CP|
|         Sky Express|  G3|
|City Connexion Ai...|  G3|
|Gol Transportes A...|  G3|
|           Air Italy|  I9|
|              Indigo|  I9|
|      Japan Airlines|  JL|
|Japan Airlines Do...|  JL|
|     Lufthansa Cargo|  LH|
|           Lufthansa|  LH|
|Royal Nepal Airlines|  RA|
|      Nepal Airlines|  RA|
|Singapore Airline...|  SQ|
|  Singapore Airlines|  SQ|
|            Airnorth|  TL|
|Trans Mediterrane...|  TL|
|    Vueling Airlines|  VY|
|    Formosa Airline

In [None]:
# The duplicates above seem to occur mostly across different geographic areas; some are the same airline under a different spelling or a different service (e.g. Cargo or Royal).

In [64]:
# Rebuild the lookup view, this time carrying the country along.
# Same row filter as before: active airlines only, IATA neither "-" nor the
# text "null" (rows with a real NULL IATA fail the comparison and drop out too).
keep_row = ~col("iata").isin("-", "null") & (col("active") == 'Y')
view_columns = [
    col("name").alias("carrier"),
    col("iata").alias("code"),
    col("country"),
]
(
    df_airlines
    .filter(keep_row)
    .select(*view_columns)
    .dropDuplicates()
    .createOrReplaceTempView('dm_arlines_name')
)

In [63]:
# Show carrier and country for every code that occurs more than once
duplicates_with_country = spark.sql("""
    SELECT carrier, code, country
    FROM dm_arlines_name
    WHERE code IN (SELECT code
                    FROM dm_arlines_name
                    GROUP BY code
                    HAVING COUNT(code) > 1
                    ORDER BY 1 DESC
    )
    ORDER BY 2
""")
duplicates_with_country.show(100)

+--------------------+----+--------------+
|             carrier|code|       country|
+--------------------+----+--------------+
|             NetJets|  1I| United States|
|              Novair|  1I|        Sweden|
|         Aerolitoral|  5D|        Mexico|
|         DonbassAero|  5D|       Ukraine|
|Myanmar Airways I...|  8M|       Myanmar|
|              Maxair|  8M|        Sweden|
|  Maldivian Air Taxi|  8Q|      Maldives|
|            Onur Air|  8Q|        Turkey|
|           QatXpress|  C3|         Qatar|
|         Contact Air|  C3|       Germany|
|     Macair Airlines|  CC|     Australia|
|Air Atlanta Icela...|  CC|       Iceland|
| Continental Express|  CO| United States|
|Continental Airlines|  CO| United States|
|   Canadian Airlines|  CP|        Canada|
|    Compass Airlines|  CP| United States|
|Gol Transportes A...|  G3|        Brazil|
|City Connexion Ai...|  G3|       Burundi|
|         Sky Express|  G3|        Greece|
|              Indigo|  I9| United States|
|          

In [72]:
# Rebuild the lookup view with callsign and ICAO added.
# Row filter is unchanged: active airlines only, IATA neither "-" nor the text
# "null" (rows with a real NULL IATA fail the comparison and drop out too).
is_active = col("active") == 'Y'
has_usable_iata = ~col("iata").isin("-", "null")
(
    df_airlines
    .filter(has_usable_iata & is_active)
    .select(
        col("name").alias("carrier"),
        col("iata").alias("code"),
        col("country"),
        col("callsign"),
        col("icao"),
    )
    .dropDuplicates()
    .createOrReplaceTempView('dm_arlines_name')
)

In [75]:
# Show carrier, country, callsign and ICAO for every code that occurs more than once.
# NOTE: the temp view is registered as 'dm_arlines_name' (sic) by the
# createOrReplaceTempView call, so the outer query and the sub-query must both
# read from that exact name; 'dm_airlines_name' is never created in this notebook.
spark.sql("""
    SELECT carrier, code, country, callsign, icao
    FROM dm_arlines_name
    WHERE code IN (SELECT code
                    FROM dm_arlines_name
                    GROUP BY code
                    HAVING COUNT(code) > 1
                    ORDER BY 1 DESC
    )
    ORDER BY 2
""").show(100)

+--------------------+----+--------------+--------------------+----+
|             carrier|code|       country|            callsign|icao|
+--------------------+----+--------------+--------------------+----+
|             NetJets|  1I| United States|             EXECJET| EJA|
|              Novair|  1I|        Sweden|           NAVIGATOR| NVR|
|         Aerolitoral|  5D|        Mexico|             COSTERA| SLI|
|         DonbassAero|  5D|       Ukraine|        DONBASS AERO| UDC|
|              Maxair|  8M|        Sweden|              MAXAIR| MXL|
|Myanmar Airways I...|  8M|       Myanmar|assignment postponed| MMM|
|  Maldivian Air Taxi|  8Q|      Maldives|                  \N|  \N|
|            Onur Air|  8Q|        Turkey|            ONUR AIR| OHY|
|           QatXpress|  C3|         Qatar|                null| QAX|
|         Contact Air|  C3|       Germany|          CONTACTAIR| KIS|
|Air Atlanta Icela...|  CC|       Iceland|             ATLANTA| ABD|
|     Macair Airlines|  CC|     Au

In [78]:
# Narrow the check further: among the carriers that share an IATA code, which
# ICAO codes also occur more than once? Only Japan Airlines and Nepal Airlines
# show up, which is negligible.
# For this reason it would probably have been better to use ICAO codes for
# ft_i94, but it seems to have only IATA codes.
# Using flight number information might have been a better option.
#
# The outer SELECT lists only the grouped column and its count: any other
# column would have to be part of the GROUP BY to be selectable.
spark.sql("""
WITH t as (
    SELECT carrier, code, country, callsign, icao
    FROM dm_arlines_name
    WHERE code IN (SELECT code
                    FROM dm_arlines_name
                    GROUP BY code
                    HAVING COUNT(code) > 1
                    ORDER BY 1 DESC
    )
    ORDER BY 2
)
SELECT COUNT(icao), icao
FROM t
GROUP BY icao
HAVING COUNT(icao) > 1
""").show()

+-----------+----+
|count(icao)|icao|
+-----------+----+
|          2| JAL|
|          2| RNA|
+-----------+----+

