In [157]:
# Standard library
import json
import os
import sys
from datetime import date, datetime  # used by the Spark smoke test in the next cell

# Third-party
from pyspark.sql import DataFrame, Row, SparkSession

# Single Spark session shared by every cell in this notebook.
spark = (
    SparkSession.builder
    .appName('airline_etl')
    .getOrCreate()
)

In [158]:
# Smoke test for the Spark environment: build a three-row DataFrame from
# hand-written Rows (int, float, string, date and datetime values) and show it.
sample_rows = [
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0)),
]
df = spark.createDataFrame(sample_rows)
df.show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  4|5.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



In [159]:
"""
We want to clean up the data swamp on my hard disk.
Let's list the files in the directory.
Maybe we even print a recursive file tree?
"""

start_path = '/Users/ryan/Data'
for root, dirs, files in os.walk(start_path):
    # Indent level = number of path separators in `root`. Because start_path is
    # absolute, the count starts at the filesystem root, so the top directory
    # itself is already printed with a dashed prefix.
    depth = root.count(os.sep)
    print('---' * depth, os.path.basename(root))
    # Files sit one level deeper than the directory that contains them.
    file_prefix = '---' * (depth + 1)
    for file_name in files:
        print(file_prefix, file_name)

--------- Data
------------ .DS_Store
------------ fhvhv_tripdata_2023-01.parquet
------------ aa-domestic-delays-2018.csv
------------ sample_data.csv
------------ eng-mlops-weather_dataset_raw.csv
------------ weather_data.json
------------ yellow_tripdata_2022-12.parquet
------------ fhv_tripdata_2022-12.parquet
------------ zoo.csv
------------ green_tripdata_2022-12.parquet
------------ db1b.public.202212.asc
------------ green_tripdata_2023-01.parquet
------------ fhv_tripdata_2023-01.parquet
------------ 256_sampledata.zip
------------ yellow_tripdata_2023-01.parquet
------------ weather_data_available.json
------------ CO2_Emissions_Canada.csv
------------ eng-mlops-weather_dataset_processed.csv
------------ fhvhv_tripdata_2022-12.parquet
------------ aa-domestic-delays-2018.csv.zip
------------ Archive.tar.bz2
------------ P2-Mispriced-Diamonds.csv
------------ flights
--------------- L_STATE_FIPS.csv
--------------- .DS_Store
--------------- L_DIVERSIONS.csv
--------------- L

In [160]:
# Load one BTS "On-Time Marketing Carrier On-Time Performance" CSV
# (presumably January 2023, judging by the `_2023_1` suffix in the file name).
# No schema is supplied and inferSchema is not enabled, so every column is read
# as a string; typing the columns is handled later.
ontime_file_path = "/Users/ryan/Data/flights/On_Time_Marketing_Carrier_On_Time_Performance_Beginning_January_2018_2023_1/"
ontime_file_name = "On_Time_Marketing_Carrier_On_Time_Performance_(Beginning_January_2018)_2023_1.csv"
# os.path.join produces the same path as plain `+` concatenation today, but it
# stays correct if the trailing slash is ever dropped from ontime_file_path.
df = spark.read.csv(os.path.join(ontime_file_path, ontime_file_name), sep=',', header=True)
df.show(4)

+----+-------+-----+----------+---------+----------+-------------------------+---------------------------------------+------------------------+---------------------------+-------------------------------+---------------------------------------+----------------------------------------------+-------------------------------------------------+--------------------------------------------------+------------------+------------------------+---------------------------+-----------+-------------------------------+---------------+------------------+------------------+------+-------------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+----+-------------------+---------+-------------+-------------+-------+----------+-------+--------+---------------+--------+--------------------+----------+-------+---------+--------+------+----------+-------+--------+---------------+--------+------------------+----------+---------+----------------+--------+-

23/11/15 16:40:08 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Year, Quarter, Month, DayofMonth, DayOfWeek, FlightDate, Marketing_Airline_Network, Operated_or_Branded_Code_Share_Partners, DOT_ID_Marketing_Airline, IATA_Code_Marketing_Airline, Flight_Number_Marketing_Airline, Originally_Scheduled_Code_Share_Airline, DOT_ID_Originally_Scheduled_Code_Share_Airline, IATA_Code_Originally_Scheduled_Code_Share_Airline, Flight_Num_Originally_Scheduled_Code_Share_Airline, Operating_Airline , DOT_ID_Operating_Airline, IATA_Code_Operating_Airline, Tail_Number, Flight_Number_Operating_Airline, OriginAirportID, OriginAirportSeqID, OriginCityMarketID, Origin, OriginCityName, OriginState, OriginStateFips, OriginStateName, OriginWac, DestAirportID, DestAirportSeqID, DestCityMarketID, Dest, DestCityName, DestState, DestStateFips, DestStateName, DestWac, CRSDepTime, DepTime, DepDelay, DepDelayMinutes, DepDel15, DepartureDelayGroups, DepTimeBlk, TaxiOut, WheelsOff, WheelsOn,

In [161]:
# The columns still need a name -> data type mapping.
# Field reference: https://www.transtats.bts.gov/Fields.asp?gnoyr_VQ=FGJ
import pyspark.sql.functions as fn

# `_c119` looks like the auto-named, header-less trailing column behind the
# CSVHeaderChecker warning printed by the load cell — TODO confirm. Drop it by name.
df = df.drop("_c119")
df.show(2)

+----+-------+-----+----------+---------+----------+-------------------------+---------------------------------------+------------------------+---------------------------+-------------------------------+---------------------------------------+----------------------------------------------+-------------------------------------------------+--------------------------------------------------+------------------+------------------------+---------------------------+-----------+-------------------------------+---------------+------------------+------------------+------+----------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+----+------------+---------+-------------+-------------+-------+----------+-------+--------+---------------+--------+--------------------+----------+-------+---------+--------+------+----------+-------+--------+---------------+--------+------------------+----------+---------+----------------+--------+-----------

In [162]:
# Inspect the column names and types Spark assigned on read. No schema or
# inferSchema option was passed to spark.read.csv, and the columns listed in
# the output below are all typed as string.
df.printSchema()

root
 |-- Year: string (nullable = true)
 |-- Quarter: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- DayofMonth: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- FlightDate: string (nullable = true)
 |-- Marketing_Airline_Network: string (nullable = true)
 |-- Operated_or_Branded_Code_Share_Partners: string (nullable = true)
 |-- DOT_ID_Marketing_Airline: string (nullable = true)
 |-- IATA_Code_Marketing_Airline: string (nullable = true)
 |-- Flight_Number_Marketing_Airline: string (nullable = true)
 |-- Originally_Scheduled_Code_Share_Airline: string (nullable = true)
 |-- DOT_ID_Originally_Scheduled_Code_Share_Airline: string (nullable = true)
 |-- IATA_Code_Originally_Scheduled_Code_Share_Airline: string (nullable = true)
 |-- Flight_Num_Originally_Scheduled_Code_Share_Airline: string (nullable = true)
 |-- Operating_Airline : string (nullable = true)
 |-- DOT_ID_Operating_Airline: string (nullable = true)
 |-- IATA_Code_Operating_Airline: 

In [163]:
# Dump the DataFrame schema to a JSON file so it can be edited by hand and tested.
# This is a function because it is a recurring chore.
# Plan: put the data type in the csv and then add the data definition to the json.
path_to_schema = "./schema.json"
def write_schema_to_json(df: DataFrame, file_path: str, indent=None) -> None:
    """
    Write the schema of a Spark DataFrame to a JSON file.

    :param df: The Spark DataFrame whose schema is to be saved (only
        ``df.schema.json()`` is used).
    :param file_path: Path of the JSON file to create or overwrite.
    :param indent: Optional indentation width (int). ``None`` (the default)
        writes the string returned by ``df.schema.json()`` unchanged; an
        integer re-serializes that JSON with the given indentation so the
        file is easier to edit by hand.
    :return: None. Prints a confirmation line once the file is written.
    """
    schema_json = df.schema.json()

    if indent is not None:
        # Round-trip through the json module purely to pretty-print; the
        # parsed content is identical to the compact form.
        schema_json = json.dumps(json.loads(schema_json), indent=indent)

    # Explicit encoding so the file does not depend on the platform default.
    with open(file_path, 'w', encoding='utf-8') as file:
        file.write(schema_json)

    print(f"Schema written to {file_path}")

In [164]:
# Save the schema of the current `df` to ./schema.json (path_to_schema) so it
# can be edited by hand.
write_schema_to_json(df, path_to_schema)

Schema written to ./schema.json


In [165]:
# Columns kept for the ATL subset.
query_cols = [
    "Year",
    "Month",
    "DayofMonth",
    "DayOfWeek",
    "FlightDate",
    "DOT_ID_Marketing_Airline",
    "Flight_Number_Marketing_Airline",
    "OriginAirportID",
    "OriginAirportSeqID",
    "OriginCityMarketID",
    "Origin",
    "OriginCityName",
    "Dest",
    "Tail_Number",
]

# Keep only rows whose Origin or Dest is ATL.
touches_atl = (fn.col("Origin") == 'ATL') | (fn.col("Dest") == 'ATL')
df = df.select(query_cols).where(touches_atl)
df.show(32)

+----+-----+----------+---------+----------+------------------------+-------------------------------+---------------+------------------+------------------+------+-------------------+----+-----------+
|Year|Month|DayofMonth|DayOfWeek|FlightDate|DOT_ID_Marketing_Airline|Flight_Number_Marketing_Airline|OriginAirportID|OriginAirportSeqID|OriginCityMarketID|Origin|     OriginCityName|Dest|Tail_Number|
+----+-----+----------+---------+----------+------------------------+-------------------------------+---------------+------------------+------------------+------+-------------------+----+-----------+
|2023|    1|        22|        7|2023-01-22|                   20409|                           1719|          12478|           1247805|             31703|   JFK|       New York, NY| ATL|     N746JB|
|2023|    1|        22|        7|2023-01-22|                   20409|                           1967|          12478|           1247805|             31703|   JFK|       New York, NY| ATL|     N937JB|


## More tasks
- Add a column for source (Monthly bts.gov or daily flightaware.com)
- Add a column for observation_id
- Pick interesting origin-dest pairs; this also makes the data smaller.
- Write separate code for landing data from each source: bts, flightaware, and others.
- https://www.bts.gov/browse-statistical-products-and-data/bts-publications/airline-service-quality-performance-234-time
- If the asc file is from before a certain date, use form_234_schema_2018.json; otherwise use form_234_schema_2019.json (something like that).
- I want a function that converts column names to snake case. The snake-case name could be a field in the csv file or table that is converted to the json file.
- Can do "full" analysis locally, and then also do a "small" analysis locally. Then repeat the smaller analysis on cloud. Keep in mind there are several local strategies to do the same classification.