# 6a. Graph Analytics - Preprocessing

# 6a.1 Import Libraries

In [0]:
from pyspark.sql.functions import lit, col, desc, abs, isnan, to_date, rand, length, count, when, hour, dayofweek, round, explode, lower, udf, mean, avg, stddev, min, max, coalesce, concat_ws, row_number, monotonically_increasing_id, floor, round as spark_round

from pyspark.sql.types import ArrayType, StringType
from pyspark.ml.linalg import VectorUDT
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, BucketedRandomProjectionLSH
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.window import Window
from pyspark.sql.types import NumericType
from pyspark.sql.functions import countDistinct

import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
import plotly.express as px
from sklearn.impute import KNNImputer
from functools import reduce

import re

In [0]:
# Initialize the Spark session (getOrCreate returns an existing session if one is already running)
spark = (
    SparkSession.builder
    .appName("Graph_Analytics_Preprocessing")
    .getOrCreate()
)

# 6a.2 Import Df

In [0]:
# Load the EDA-stage dataset (df_eda), stored in Delta format
EDA_TABLE_PATH = "/dbfs/FileStore/tables/df_eda"
df = spark.read.load(EDA_TABLE_PATH, format="delta")

# Preview the first rows
df.limit(10).display()

YEAR,MONTH,DAY,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,SCHEDULED_DEPARTURE,DEPARTURE_TIME,DEPARTURE_DELAY,TAXI_OUT,WHEELS_OFF,SCHEDULED_TIME,ELAPSED_TIME,AIR_TIME,DISTANCE,WHEELS_ON,TAXI_IN,SCHEDULED_ARRIVAL,ARRIVAL_TIME,ARRIVAL_DELAY,DIVERTED,CANCELLED,CANCELLATION_REASON,AIR_SYSTEM_DELAY,SECURITY_DELAY,AIRLINE_DELAY,LATE_AIRCRAFT_DELAY,WEATHER_DELAY
2015,1,1,4,NK,597,N528NK,MSP,FLL,115,127,12,14,141,207,220,166,1487,527,40,542,607,25,0,0,,25,0,0,0,0
2015,1,1,4,NK,168,N629NK,PHX,ORD,125,237,72,9,246,204,175,156,1440,622,10,549,632,43,0,0,,43,0,0,0,0
2015,1,1,4,HA,17,N389HA,LAS,HNL,145,145,0,16,201,370,385,361,2762,602,8,555,610,15,0,0,,0,0,15,0,0
2015,1,1,4,B6,1030,N239JB,BQN,MCO,307,304,-3,25,329,173,196,160,1129,509,11,500,520,20,0,0,,20,0,0,0,0
2015,1,1,4,B6,2134,N307JB,SJU,MCO,400,535,95,9,544,185,175,163,1189,727,3,605,730,85,0,0,,0,0,85,0,0
2015,1,1,4,B6,2276,N646JB,SJU,BDL,438,550,72,15,605,241,258,237,1666,902,6,739,908,89,0,0,,17,0,72,0,0
2015,1,1,4,AA,1057,N3ASAA,DFW,MIA,515,703,108,15,718,161,155,133,1121,1031,7,856,1038,102,0,0,,0,0,0,0,102
2015,1,1,4,US,425,N174US,PDX,PHX,520,620,60,13,633,150,150,132,1009,945,5,850,950,60,0,0,,0,0,60,0,0
2015,1,1,4,AA,89,N3KVAA,IAH,MIA,520,618,58,19,637,141,137,111,964,928,7,841,935,54,0,0,,0,0,54,0,0
2015,1,1,4,AA,328,N4XKAA,DEN,DFW,530,623,53,32,655,125,138,96,641,931,10,835,941,66,0,0,,13,0,53,0,0


In [0]:
print("Total rows:", df.count())

Total rows: 1063439


# 6a.3 Preprocessing From Notebook 2

We applied all the preprocessing from the notebook "2_Preprocessing", excluding:
- Train-Val-Test Split
- Encoding

## 6a.3.1 Sampling

In [0]:
# 5% random sample without replacement; the fixed seed keeps the sample repeatable for the same input
df_sampled = df.sample(fraction=0.05, seed=42, withReplacement=False)

In [0]:
print("Total rows of the sampled dataset:", df_sampled.count())

Total rows of the sampled dataset: 53429


## 6a.3.2 Treatment of numerical airport codes

### 6a.3.2.1 Map numeric airport codes to IATA codes

We map the numerical airport codes in the dataset to their corresponding IATA codes. This is achieved by importing two auxiliary datasets:

- One with the numerical airport codes and their descriptions.
- Another containing the IATA codes and their descriptions.

We perform a join on the airport description field in order to link the numerical code with the corresponding IATA code.

In [0]:
# Lookup table: alphanumeric airport code (e.g. 01A, 1B1) -> "City, ST: Airport Name" description
l_airport_file_location = "/FileStore/tables/L_AIRPORT.csv"
file_type = "csv"

# CSV reader options
csv_options = {
    "inferSchema": "true",
    "header": "true",        # first row holds the column names
    "sep": ",",
    "quote": '"',            # descriptions contain commas, so they are quoted
    "escape": '"',           # a doubled quote inside a quoted field is a literal quote
    "multiLine": "true",     # tolerate quoted fields that span several lines
    "mode": "PERMISSIVE",    # keep reading on malformed records instead of failing
}

airport = (
    spark.read.format(file_type)
    .options(**csv_options)
    .load(l_airport_file_location)
)

# Display result
airport.limit(10).display()

Code,Description
01A,"Afognak Lake, AK: Afognak Lake Airport"
03A,"Granite Mountain, AK: Bear Creek Mining Strip"
04A,"Lik, AK: Lik Mining Camp"
05A,"Little Squaw, AK: Little Squaw Airport"
06A,"Kizhuyak, AK: Kizhuyak Bay"
07A,"Klawock, AK: Klawock Seaplane Base"
08A,"Elizabeth Island, AK: Elizabeth Island Airport"
09A,"Homer, AK: Augustin Island"
1AK,"Mertarvik, AK: Mertarvik Quarry Road Landing Strip"
1B1,"Hudson, NY: Columbia County"


In [0]:
# Lookup table: numeric airport id (e.g. 10001) -> "City, ST: Airport Name" description
l_airport_id_file_location = "/FileStore/tables/L_AIRPORT_ID.csv"
file_type = "csv"

# CSV reader options
id_csv_options = {
    "inferSchema": "true",
    "header": "true",        # first row holds the column names
    "sep": ",",
    "quote": '"',            # descriptions contain commas, so they are quoted
    "escape": '"',           # a doubled quote inside a quoted field is a literal quote
    "multiLine": "true",     # tolerate quoted fields that span several lines
    "mode": "PERMISSIVE",    # keep reading on malformed records instead of failing
}

airport_id = (
    spark.read.format(file_type)
    .options(**id_csv_options)
    .load(l_airport_id_file_location)
)

# Display result
airport_id.limit(10).display()

Code,Description
10001,"Afognak Lake, AK: Afognak Lake Airport"
10003,"Granite Mountain, AK: Bear Creek Mining Strip"
10004,"Lik, AK: Lik Mining Camp"
10005,"Little Squaw, AK: Little Squaw Airport"
10006,"Kizhuyak, AK: Kizhuyak Bay"
10007,"Klawock, AK: Klawock Seaplane Base"
10008,"Elizabeth Island, AK: Elizabeth Island Airport"
10009,"Homer, AK: Augustin Island"
10010,"Hudson, NY: Columbia County"
10011,"Peach Springs, AZ: Grand Canyon West"


In [0]:
# Rename both "Code" columns so the two lookup tables stay distinguishable after the join
airport_id = airport_id.withColumnRenamed("Code", "airport_id")
airport = airport.withColumnRenamed("Code", "iata_code")

# Link numeric airport ids to IATA codes through their shared Description text.
# NOTE: an inner join on Description pairs every matching row on both sides, so a
# description that appears more than once makes one airport_id map to several
# iata_codes. Consumers must de-duplicate.
airport_full = (
    airport_id.join(airport, on="Description", how="inner")
    # Final selection: assigned, so the column projection actually takes effect
    .select("iata_code", "airport_id", "Description")
)

airport_full.limit(10).display()

Description,airport_id,iata_code
"Afognak Lake, AK: Afognak Lake Airport",10001,01A
"Granite Mountain, AK: Bear Creek Mining Strip",10003,03A
"Lik, AK: Lik Mining Camp",10004,04A
"Little Squaw, AK: Little Squaw Airport",10005,05A
"Kizhuyak, AK: Kizhuyak Bay",10006,06A
"Klawock, AK: Klawock Seaplane Base",10007,07A
"Elizabeth Island, AK: Elizabeth Island Airport",10008,08A
"Homer, AK: Augustin Island",10009,09A
"Hudson, NY: Columbia County",10010,1B1
"Peach Springs, AZ: Grand Canyon West",10011,DQR


### 6a.3.2.2 Function to replace numerical airport codes

We define a function to identify and replace numerical airport codes in the ORIGIN_AIRPORT and DESTINATION_AIRPORT columns with their corresponding IATA codes using the mapping dataframe. The function returns a cleaned dataframe with two new columns: ORIGIN_AIRPORT_CLEAN and DESTINATION_AIRPORT_CLEAN.

In [0]:
def replace_numeric_airports(df, airport_mapping_df):
    """
    Replace numeric airport codes in ORIGIN_AIRPORT and DESTINATION_AIRPORT
    with IATA codes.

    Parameters:
    df (DataFrame): input dataframe with ORIGIN_AIRPORT and DESTINATION_AIRPORT
        columns.
    airport_mapping_df (DataFrame): must contain columns ['airport_id', 'iata_code'].

    Returns:
    DataFrame: df plus ORIGIN_AIRPORT_CLEAN and DESTINATION_AIRPORT_CLEAN.
    Numeric codes that have a mapping are replaced by their IATA code. Every
    other value is kept as is. When the mapping lists several IATA codes for the
    same id, the alphabetically smallest one is used, so the result has exactly
    one row per input row.
    """
    from pyspark.sql import functions as F

    numeric_pattern = "^[0-9]+"

    # Numeric codes present in the data (counts are printed for diagnostics)
    numeric_origin_df = df.filter(F.col("ORIGIN_AIRPORT").rlike(numeric_pattern)) \
        .select(F.col("ORIGIN_AIRPORT").cast("string").alias("airport_code"))
    numeric_dest_df = df.filter(F.col("DESTINATION_AIRPORT").rlike(numeric_pattern)) \
        .select(F.col("DESTINATION_AIRPORT").cast("string").alias("airport_code"))

    print("Unique numeric ORIGIN_AIRPORT codes:", numeric_origin_df.distinct().count())
    print("Unique numeric DESTINATION_AIRPORT codes:", numeric_dest_df.distinct().count())

    # Unique codes from both origin and destination
    numeric_airports_df = numeric_origin_df.union(numeric_dest_df).distinct()
    print("Total unique numeric airport codes:", numeric_airports_df.count())

    # Map each numeric code to ONE IATA code using the caller-supplied mapping.
    # The key is cast to string to match the string airport codes.
    # Collapsing to a single code per id matters: a left join on a non-unique
    # key would duplicate flight rows.
    mapping = airport_mapping_df.select(
        F.col("airport_id").cast("string").alias("airport_code"),
        F.col("iata_code"),
    )
    mapped_airports = (
        numeric_airports_df
        .join(mapping, on="airport_code", how="left")
        .groupBy("airport_code")
        .agg(F.min("iata_code").alias("iata_code"))
    )

    origin_lookup = mapped_airports.select(
        F.col("airport_code").alias("origin_id"), F.col("iata_code").alias("origin_iata")
    )
    dest_lookup = mapped_airports.select(
        F.col("airport_code").alias("dest_id"), F.col("iata_code").alias("dest_iata")
    )

    # Join with mapping for ORIGIN and DESTINATION
    joined_df = (
        df
        .join(origin_lookup, F.col("ORIGIN_AIRPORT") == F.col("origin_id"), how="left")
        .join(dest_lookup, F.col("DESTINATION_AIRPORT") == F.col("dest_id"), how="left")
    )

    # Prefer the mapped IATA code; fall back to the original value
    replaced_df = (
        joined_df
        .withColumn("ORIGIN_AIRPORT_CLEAN",
                    F.coalesce(F.col("origin_iata"), F.col("ORIGIN_AIRPORT")))
        .withColumn("DESTINATION_AIRPORT_CLEAN",
                    F.coalesce(F.col("dest_iata"), F.col("DESTINATION_AIRPORT")))
    )

    # Sanity check: rows whose cleaned code is still numeric (codes with no mapping)
    print("Numeric ORIGIN_AIRPORT_CLEAN count:",
          replaced_df.filter(F.col("ORIGIN_AIRPORT_CLEAN").rlike(numeric_pattern)).count())
    print("Numeric DESTINATION_AIRPORT_CLEAN count:",
          replaced_df.filter(F.col("DESTINATION_AIRPORT_CLEAN").rlike(numeric_pattern)).count())

    # Clean up helper columns
    return replaced_df.drop("origin_id", "dest_id", "origin_iata", "dest_iata")


In [0]:
cleaned_sampled_df = replace_numeric_airports(df=df_sampled, airport_mapping_df=airport_full)

Unique numeric ORIGIN_AIRPORT codes: 206
Unique numeric DESTINATION_AIRPORT codes: 211
Total unique numeric airport codes: 253
Numeric ORIGIN_AIRPORT_CLEAN count: 0
Numeric DESTINATION_AIRPORT_CLEAN count: 0


## 6a.3.3 Missing values

As seen in 1_EDA, CANCELLATION_REASON contains missing values. Since this feature is not relevant for the analysis, we drop it.

In [0]:
# CANCELLATION_REASON has missing values and is not used in this analysis
unused_columns = ["CANCELLATION_REASON"]
cleaned_sampled_df = cleaned_sampled_df.drop(*unused_columns)

## 6a.3.4 Feature Engineering

### 6a.3.4.1 Functions

#### Seasonality from Month
To capture seasonal patterns, we created a new categorical feature SEASON based on MONTH:

| Season  | Months                     |
|---------|----------------------------|
| Winter  | December, January, February |
| Spring  | March, April, May           |
| Summer  | June, July, August          |
| Autumn  | September, October, November |


In [0]:
def add_season_column(df):
    """
    Add a categorical SEASON column derived from MONTH.

    Dec-Feb -> Winter, Mar-May -> Spring, Jun-Aug -> Summer, Sep-Nov -> Autumn.
    Any other MONTH value (or a null MONTH) yields a null SEASON.
    """
    season_months = [
        ("Winter", (12, 1, 2)),
        ("Spring", (3, 4, 5)),
        ("Summer", (6, 7, 8)),
        ("Autumn", (9, 10, 11)),
    ]
    first_season, first_months = season_months[0]
    season_expr = when(col("MONTH").isin(*first_months), first_season)
    for season_name, months in season_months[1:]:
        season_expr = season_expr.when(col("MONTH").isin(*months), season_name)
    return df.withColumn("SEASON", season_expr)

#### Departure Period from Scheduled Time

We created a new feature, SCHEDULED_DEPARTURE_PERIOD, based on the SCHEDULED_DEPARTURE time. The mapping is as follows:

| Period        | Time Range         |
|---------------|--------------------|
| Early Morning | 04:00 - 07:59      |
| Morning       | 08:00 - 11:59      |
| Midday        | 12:00 - 13:59      |
| Afternoon     | 14:00 - 17:59      |
| Evening       | 18:00 - 20:59      |
| Night         | 21:00 - 23:59      |
| Late Night    | 00:00 - 03:59      |


In [0]:
def add_scheduled_dep_period_column(df):
    """
    Add SCHEDULED_DEPARTURE_PERIOD, a time-of-day bucket of SCHEDULED_DEPARTURE (HHMM).

    The hour is HHMM // 100. Buckets: 4-7 Early Morning, 8-11 Morning,
    12-13 Midday, 14-17 Afternoon, 18-20 Evening, 21-23 Night. Every other
    value, including hours 0-3 and a null departure, becomes "Late Night".
    """
    dep_hour = floor(col("SCHEDULED_DEPARTURE") / 100)
    period = (
        when(dep_hour.between(4, 7), "Early Morning")
        .when(dep_hour.between(8, 11), "Morning")
        .when(dep_hour.between(12, 13), "Midday")
        .when(dep_hour.between(14, 17), "Afternoon")
        .when(dep_hour.between(18, 20), "Evening")
        .when(dep_hour.between(21, 23), "Night")
        .otherwise("Late Night")  # 00:00–03:59
    )
    return df.withColumn("SCHEDULED_DEPARTURE_PERIOD", period)


#### IS_WEEKEND


In [0]:
def add_is_weekend_column(df):
    """
    Add IS_WEEKEND: 1 when DAY_OF_WEEK >= 6, else 0 (null when DAY_OF_WEEK is null).

    Presumably 6 = Saturday and 7 = Sunday: the data encodes 2015-01-01 (a Thursday) as 4.
    """
    is_weekend = col("DAY_OF_WEEK") >= 6
    return df.withColumn("IS_WEEKEND", is_weekend.cast("int"))

#### DELAYED_DEPARTURE_FLAG

A flight is officially considered “delayed” if its departure is more than 15 minutes late:
https://www.oag.com/airline-on-time-performance-defining-late#fifteenmins

In [0]:
def add_delayed_departure_flag_column(df):
    """
    Add DELAYED_DEPARTURE_FLAG: 1 when DEPARTURE_DELAY is strictly greater than
    15 minutes, else 0 (null when DEPARTURE_DELAY is null).

    NOTE(review): on-time definitions usually count a flight as late from
    15 minutes on (>= 15); this uses > 15. Confirm the intended threshold.
    """
    late_threshold_minutes = 15
    is_delayed = col("DEPARTURE_DELAY") > late_threshold_minutes
    return df.withColumn("DELAYED_DEPARTURE_FLAG", is_delayed.cast("int"))

#### ROUTE

In [0]:
def add_route_column(df):
    """
    Add ROUTE as "<ORIGIN>_<DESTINATION>" from the cleaned airport codes.

    concat_ws skips null inputs, so a missing code yields just the other code.
    """
    endpoints = [col(name) for name in ("ORIGIN_AIRPORT_CLEAN", "DESTINATION_AIRPORT_CLEAN")]
    return df.withColumn("ROUTE", concat_ws("_", *endpoints))

#### TOTAL_KNOWN_DELAY

In the "Coherence Checking" section, TOTAL_KNOWN_DELAY was shown to be exactly equal to ARRIVAL_DELAY.

In [0]:
def add_total_known_delay(df):
    """
    Add TOTAL_KNOWN_DELAY: the sum of the five delay-cause columns,
    treating a null cause as 0 minutes.
    """
    cause_columns = [
        "AIR_SYSTEM_DELAY",
        "SECURITY_DELAY",
        "AIRLINE_DELAY",
        "LATE_AIRCRAFT_DELAY",
        "WEATHER_DELAY",
    ]
    filled_terms = [coalesce(col(name), lit(0)) for name in cause_columns]
    total = reduce(lambda running, term: running + term, filled_terms)
    return df.withColumn("TOTAL_KNOWN_DELAY", total)

#### HHMM columns to minutes from midnight

In [0]:
def hhmm_to_minutes(colname):
    """
    Convert an HHMM-encoded clock column (e.g. 1305 = 13:05) to minutes after midnight.

    Returns a Column aliased "<colname>_min". A value of 2400 maps to 1440.
    """
    hhmm = col(colname)
    hours_part = floor(hhmm / 100)
    minutes_part = hhmm % 100
    return (hours_part * 60 + minutes_part).alias(f"{colname}_min")

### 6a.3.4.2 Applying functions

In [0]:
# Apply each feature-engineering step in order
feature_steps = [
    add_season_column,
    add_scheduled_dep_period_column,
    add_is_weekend_column,
    add_delayed_departure_flag_column,
    add_route_column,
    add_total_known_delay,
]
for add_feature in feature_steps:
    cleaned_sampled_df = add_feature(cleaned_sampled_df)

In [0]:
# Add a "<COL>_min" (minutes after midnight) column for each HHMM clock column.
# Each column is converted exactly once.
hhmm_columns = [
    "SCHEDULED_DEPARTURE",
    "SCHEDULED_ARRIVAL",
    "DEPARTURE_TIME",
    "WHEELS_OFF",
    "ARRIVAL_TIME",
]
for hhmm_col in hhmm_columns:
    cleaned_sampled_df = cleaned_sampled_df.withColumn(f"{hhmm_col}_min", hhmm_to_minutes(hhmm_col))

In [0]:
def plot_categorical_distribution(df, column_name, title=None, xlabel=None, ylabel="Count", figsize=(8, 4)):
    """
    Plots the distribution of a categorical column from a PySpark DataFrame.

    Bars are ordered by descending count.

    Parameters:
        df (PySpark DataFrame): The input DataFrame.
        column_name (str): The name of the categorical column to analyze.
        title (str): Plot title (default: "<column_name> Distribution").
        xlabel (str): X-axis label (default: column_name).
        ylabel (str): Y-axis label (default: "Count").
        figsize (tuple): Size of the plot.

    Returns:
        matplotlib.axes.Axes: the axes holding the bar chart.
    """
    dist_df = (
        df.groupBy(column_name)
        .count()
        .toPandas()
        .sort_values("count", ascending=False)
    )

    # Plot categories as strings. matplotlib's categorical axis rejects None
    # (e.g. rows with a null category). Numeric categories would instead be
    # placed on a numeric axis, losing the count-sorted order.
    labels = dist_df[column_name].astype(str)

    fig, ax = plt.subplots(figsize=figsize)
    ax.bar(labels, dist_df["count"])
    ax.set_title(title if title else f"{column_name} Distribution")
    ax.set_xlabel(xlabel if xlabel else column_name)
    ax.set_ylabel(ylabel)
    ax.tick_params(axis="x", labelrotation=45)
    fig.tight_layout()
    plt.show()
    return ax

## 6a.3.5 Outliers

As observed in the boxplot, DISTANCE contains outliers. To reduce their impact on the model, we cap the values at the 97.5th percentile.


In [0]:
def cap_at_percentile(df, column, percentile, relative_error=0.01):
    """
    Cap a numeric column from above at an approximate percentile.

    Parameters:
    df (DataFrame): input dataframe.
    column (str): numeric column to cap.
    percentile (float): quantile in [0, 1], e.g. 0.975.
    relative_error (float): precision passed to approxQuantile
        (0 = exact but more expensive; default 0.01).

    Returns:
    DataFrame: df with values of `column` above the threshold replaced by the
    threshold. Null values stay null. If no threshold can be computed
    (approxQuantile returns no value, e.g. an empty or all-null column), df is
    returned unchanged instead of raising IndexError.
    """
    quantiles = df.approxQuantile(column, [percentile], relative_error)
    if not quantiles:
        return df
    threshold = quantiles[0]
    return df.withColumn(column, when(col(column) > threshold, threshold).otherwise(col(column)))


In [0]:
cleaned_sampled_df = cap_at_percentile(cleaned_sampled_df, column="DISTANCE", percentile=0.975)

## 6a.3.6 Coherence Checking

### 6a.3.6.1 Duplicated rows (after mapping airports)

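The description-based airport mapping can link one numeric airport code to several IATA codes, so the same original ORIGIN_AIRPORT / DESTINATION_AIRPORT value may end up with more than one cleaned code. For each original code we keep only the alphabetically smallest cleaned code and drop the other rows.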

In [0]:
# For each raw airport code, choose one cleaned code: the alphabetically smallest.
# (`min` here is pyspark.sql.functions.min, imported at the top of the notebook.)
origin_train_mapping = cleaned_sampled_df.groupBy("ORIGIN_AIRPORT").agg(
    min("ORIGIN_AIRPORT_CLEAN").alias("PREFERRED_ORIGIN_AIRPORT_CLEAN")
)
destination_train_mapping = cleaned_sampled_df.groupBy("DESTINATION_AIRPORT").agg(
    min("DESTINATION_AIRPORT_CLEAN").alias("PREFERRED_DESTINATION_AIRPORT_CLEAN")
)

# Attach the preferred codes: origin first, then destination
cleaned_sampled_df = (
    cleaned_sampled_df
    .join(origin_train_mapping, on="ORIGIN_AIRPORT", how="left")
    .join(destination_train_mapping, on="DESTINATION_AIRPORT", how="left")
)

In [0]:
# Keep only the row whose cleaned codes equal the preferred ones, then drop the helper columns.
# A plain `==` with a null on either side is not true, so such rows are dropped too.
keeps_preferred_codes = (
    (col("ORIGIN_AIRPORT_CLEAN") == col("PREFERRED_ORIGIN_AIRPORT_CLEAN"))
    & (col("DESTINATION_AIRPORT_CLEAN") == col("PREFERRED_DESTINATION_AIRPORT_CLEAN"))
)
cleaned_sampled_df = (
    cleaned_sampled_df
    .filter(keeps_preferred_codes)
    .drop("PREFERRED_ORIGIN_AIRPORT_CLEAN", "PREFERRED_DESTINATION_AIRPORT_CLEAN")
)

### 6a.3.6.2 Other coherence checkings

In [0]:
# TOTAL_FLIGHT_MIDNIGHT_min = actual departure (minutes after midnight) + AIR_TIME.
# A value of 1440 or more means the flight ends after midnight. That explains an
# arrival-side clock value smaller than the departure-side one, since the
# *_min columns restart at 0 each day.
# NOTE(review): TAXI_OUT/TAXI_IN are not included. The clock columns also look
# like local times of two different airports (time zones). Treat the flag as an
# approximation and confirm this is intended.
cleaned_sampled_df = cleaned_sampled_df.withColumn(
    'TOTAL_FLIGHT_MIDNIGHT_min',
    col('DEPARTURE_TIME_min') + col('AIR_TIME')
)


def add_crosses_midnight_flag_column(df):
    """Add CROSSES_MIDNIGHT_FLAG: 1 when TOTAL_FLIGHT_MIDNIGHT_min >= 1440 (24 h), else 0."""
    minutes_per_day = 24 * 60
    past_midnight = col('TOTAL_FLIGHT_MIDNIGHT_min') >= minutes_per_day
    return df.withColumn('CROSSES_MIDNIGHT_FLAG', past_midnight.cast("int"))


cleaned_sampled_df = add_crosses_midnight_flag_column(cleaned_sampled_df)

In [0]:
# Keep only flights that left on time or late (DEPARTURE_DELAY >= 0). Early
# departures are removed. Rows with a null DEPARTURE_DELAY are removed as well,
# because a comparison with null never passes a filter.
cleaned_sampled_df = cleaned_sampled_df.where(col("DEPARTURE_DELAY") >= 0)

In [0]:
# Drop flights whose actual departure clock time (minutes after midnight) is
# earlier than the scheduled one.
# NOTE(review): no midnight exception is applied here. A flight scheduled late
# in the evening that actually left after midnight is therefore also dropped.
# Confirm this is intended.
departed_on_or_after_schedule = col("DEPARTURE_TIME_min") >= col("SCHEDULED_DEPARTURE_min")
cleaned_sampled_df = cleaned_sampled_df.filter(departed_on_or_after_schedule)

In [0]:
# Remove rows whose wheels-off / wheels-on / arrival ordering is impossible,
# except for flights flagged as crossing midnight. For those flights a later
# event legitimately has a smaller clock value.
# In Python `&` binds tighter than `|`. The parentheses below make the midnight
# guard apply to ALL ordering checks, not only to the first one.
not_crossing_midnight = col('CROSSES_MIDNIGHT_FLAG') != 1
time_order_violated = (
    (col("WHEELS_OFF_min") < col("DEPARTURE_TIME_min")) |
    (col("WHEELS_ON") < col("WHEELS_OFF")) |        # raw HHMM values order correctly within one day
    (col("ARRIVAL_TIME") < col("WHEELS_ON")) |
    (col("SCHEDULED_ARRIVAL_min") < col("SCHEDULED_DEPARTURE_min"))
)
cleaned_sampled_df = cleaned_sampled_df.filter(~(not_crossing_midnight & time_order_violated))

## 6a.3.7 Drop and Rename Features

In [0]:
# Inspect column names and types of the cleaned dataset before dropping/renaming columns
cleaned_sampled_df.printSchema()

root
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: integer (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: integer (nullable = true)
 |-- DEPARTURE_TIME: integer (nullable = true)
 |-- DEPARTURE_DELAY: integer (nullable = true)
 |-- TAXI_OUT: integer (nullable = true)
 |-- WHEELS_OFF: integer (nullable = true)
 |-- SCHEDULED_TIME: integer (nullable = true)
 |-- ELAPSED_TIME: integer (nullable = true)
 |-- AIR_TIME: integer (nullable = true)
 |-- DISTANCE: double (nullable = true)
 |-- WHEELS_ON: integer (nullable = true)
 |-- TAXI_IN: integer (nullable = true)
 |-- SCHEDULED_ARRIVAL: integer (nullable = true)
 |-- ARRIVAL_TIME: integer (nullable = true)
 |-- ARRIVAL_DELAY: integer (nulla

In [0]:
# Preview a few rows after the coherence filters
coherence_preview = cleaned_sampled_df.limit(10)
coherence_preview.display()

DESTINATION_AIRPORT,ORIGIN_AIRPORT,YEAR,MONTH,DAY,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,SCHEDULED_DEPARTURE,DEPARTURE_TIME,DEPARTURE_DELAY,TAXI_OUT,WHEELS_OFF,SCHEDULED_TIME,ELAPSED_TIME,AIR_TIME,DISTANCE,WHEELS_ON,TAXI_IN,SCHEDULED_ARRIVAL,ARRIVAL_TIME,ARRIVAL_DELAY,DIVERTED,CANCELLED,AIR_SYSTEM_DELAY,SECURITY_DELAY,AIRLINE_DELAY,LATE_AIRCRAFT_DELAY,WEATHER_DELAY,ORIGIN_AIRPORT_CLEAN,DESTINATION_AIRPORT_CLEAN,SEASON,SCHEDULED_DEPARTURE_PERIOD,IS_WEEKEND,DELAYED_DEPARTURE_FLAG,ROUTE,TOTAL_KNOWN_DELAY,SCHEDULED_DEPARTURE_min,SCHEDULED_ARRIVAL_min,DEPARTURE_TIME_min,WHEELS_OFF_min,ARRIVAL_TIME_min,TOTAL_FLIGHT_MIDNIGHT_min,CROSSES_MIDNIGHT_FLAG
ATL,ILM,2015,12,19,6,EV,5042,N850AS,1046,1108,22,15,1123,87,86,63,377.0,1226,8,1213,1234,21,0,0,0,0,5,16,0,ILM,ATL,Winter,Morning,1,1,ILM_ATL,21,646,733,668,683,754,731,0
CLT,ILM,2015,12,10,4,AA,1941,N752US,725,833,68,12,845,70,55,39,185.0,924,4,835,928,53,0,0,0,0,53,0,0,ILM,CLT,Winter,Early Morning,0,1,ILM_CLT,53,445,515,513,525,568,552,0
ATL,ILM,2015,12,3,4,EV,5106,N923EV,1440,1820,220,16,1836,90,96,67,377.0,1943,13,1610,1956,226,0,0,6,0,220,0,0,ILM,ATL,Winter,Afternoon,0,1,ILM_ATL,226,880,970,1100,1116,1196,1167,0
ATL,ILM,2015,11,29,7,EV,5239,N843AS,1805,1820,15,13,1833,91,93,68,377.0,1941,12,1936,1953,17,0,0,2,0,0,15,0,ILM,ATL,Autumn,Evening,1,0,ILM_ATL,17,1085,1176,1100,1113,1193,1168,0
ATL,ILM,2015,11,9,1,EV,4870,N858AS,1805,1815,10,8,1823,91,97,69,377.0,1932,20,1936,1952,16,0,0,6,0,1,9,0,ILM,ATL,Autumn,Evening,0,0,ILM_ATL,16,1085,1176,1095,1103,1192,1164,0
CLT,ILM,2015,11,6,5,AA,1821,N754UW,1430,1430,0,30,1500,70,87,50,185.0,1550,7,1540,1557,17,0,0,17,0,0,0,0,ILM,CLT,Autumn,Afternoon,0,0,ILM_CLT,17,870,940,870,900,957,920,0
ATL,ILM,2015,5,18,1,EV,5245,N936EV,1250,1436,106,9,1445,89,81,62,377.0,1547,10,1419,1557,98,0,0,0,0,60,38,0,ILM,ATL,Spring,Midday,0,1,ILM_ATL,98,770,859,876,885,957,938,0
ATL,ILM,2015,4,10,5,EV,4955,N852AS,1355,1507,72,13,1520,88,84,63,377.0,1623,8,1523,1631,68,0,0,0,0,0,68,0,ILM,ATL,Spring,Midday,0,1,ILM_ATL,68,835,923,907,920,991,970,0
ATL,ILM,2015,3,29,7,DL,1969,N944DL,725,823,58,9,832,94,78,59,377.0,931,10,859,941,42,0,0,0,0,42,0,0,ILM,ATL,Spring,Early Morning,1,1,ILM_ATL,42,445,539,503,512,581,562,0
DFW,LAW,2015,9,24,4,EV,2608,N687JS,1025,1028,3,12,1040,57,88,30,140.0,1110,46,1122,1156,34,0,0,31,0,3,0,0,LAW,DFW,Autumn,Morning,0,0,LAW_DFW,34,625,682,628,640,716,658,0


In [0]:
# Columns removed before export:
#  - ORIGIN_AIRPORT / DESTINATION_AIRPORT: raw codes, superseded by the *_CLEAN columns
#  - HHMM clock columns that already have a *_min counterpart
#  - DELAYED_DEPARTURE_FLAG: not used for graph analytics
#  - TOTAL_KNOWN_DELAY: identical to ARRIVAL_DELAY
# YEAR, MONTH and DAY are intentionally kept.
# NOTE(review): WHEELS_ON and ARRIVAL_TIME are also HHMM-encoded but are kept. Confirm.
raw_airport_cols = ['ORIGIN_AIRPORT', 'DESTINATION_AIRPORT']
hhmm_cols = ["SCHEDULED_DEPARTURE", "SCHEDULED_ARRIVAL", "DEPARTURE_TIME", "WHEELS_OFF"]
redundant_cols = ["DELAYED_DEPARTURE_FLAG", "TOTAL_KNOWN_DELAY"]
cols_to_drop = raw_airport_cols + hhmm_cols + redundant_cols

# Drop from DataFrames
cleaned_sampled_df = cleaned_sampled_df.drop(*cols_to_drop)

In [0]:
# Give the cleaned airport codes the canonical column names
clean_to_final_names = {
    "ORIGIN_AIRPORT_CLEAN": "ORIGIN_AIRPORT",
    "DESTINATION_AIRPORT_CLEAN": "DESTINATION_AIRPORT",
}
for clean_name, final_name in clean_to_final_names.items():
    cleaned_sampled_df = cleaned_sampled_df.withColumnRenamed(clean_name, final_name)

# 6a.4 Add Airports Data

# 6a.5 Export Dataframe

In [0]:
# Persist the graph-ready dataset as Delta, replacing any previous data and schema at this path
(
    cleaned_sampled_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save("/dbfs/FileStore/tables/df_graph")
)

In [0]:
# Preview the exported dataset
export_preview = cleaned_sampled_df.limit(10)
export_preview.display()

YEAR,MONTH,DAY,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,DEPARTURE_DELAY,TAXI_OUT,SCHEDULED_TIME,ELAPSED_TIME,AIR_TIME,DISTANCE,WHEELS_ON,TAXI_IN,ARRIVAL_TIME,ARRIVAL_DELAY,DIVERTED,CANCELLED,AIR_SYSTEM_DELAY,SECURITY_DELAY,AIRLINE_DELAY,LATE_AIRCRAFT_DELAY,WEATHER_DELAY,ORIGIN_AIRPORT,DESTINATION_AIRPORT,SEASON,SCHEDULED_DEPARTURE_PERIOD,IS_WEEKEND,ROUTE,SCHEDULED_DEPARTURE_min,SCHEDULED_ARRIVAL_min,DEPARTURE_TIME_min,WHEELS_OFF_min,ARRIVAL_TIME_min,TOTAL_FLIGHT_MIDNIGHT_min,CROSSES_MIDNIGHT_FLAG
2015,10,17,6,EV,5144,N921EV,9,33,102,132,91,425.0,1128,8,1136,39,0,0,30,0,9,0,0,ABE,DTW,Autumn,Morning,1,ABE_DTW,555,657,564,597,696,655,0
2015,10,8,4,EV,5317,N850AS,195,14,128,135,105,692.0,2114,16,2130,202,0,0,7,0,2,193,0,ABE,ATL,Autumn,Afternoon,0,ABE_ATL,960,1088,1155,1169,1290,1260,0
2015,10,26,1,MQ,3176,N813MQ,323,7,63,45,33,158.0,1416,5,1421,305,0,0,0,0,305,0,0,ABI,DFW,Autumn,Morning,0,ABI_DFW,493,556,816,823,861,849,0
2015,10,23,5,MQ,3110,N824MQ,141,11,62,115,40,158.0,1507,64,1611,194,0,0,53,0,0,0,141,ABI,DFW,Autumn,Morning,0,ABI_DFW,715,777,856,867,971,896,0
2015,10,5,1,MQ,3441,N668MQ,68,9,62,47,31,158.0,2014,7,2021,53,0,0,0,0,0,53,0,ABI,DFW,Autumn,Evening,0,ABI_DFW,1106,1168,1174,1183,1221,1205,0
2015,10,30,5,AS,775,N528AS,0,14,189,206,183,1180.0,1857,9,1906,17,0,0,17,0,0,0,0,ABQ,SEA,Autumn,Afternoon,0,ABQ_SEA,1000,1129,1000,1014,1146,1183,0
2015,10,23,5,WN,477,N7827A,324,8,120,108,89,677.0,2311,11,2322,312,0,0,0,0,218,94,0,ABQ,LAX,Autumn,Afternoon,0,ABQ_LAX,1030,1090,1354,1362,1402,1443,1
2015,10,15,4,WN,1703,N7844A,32,7,75,69,54,349.0,938,8,946,26,0,0,0,0,26,0,0,ABQ,DEN,Autumn,Morning,0,ABQ_DEN,485,560,517,524,586,571,0
2015,10,7,3,WN,1873,N384SW,50,9,110,108,93,718.0,1642,6,1648,48,0,0,0,0,19,29,0,ABQ,MCI,Autumn,Midday,0,ABQ_MCI,790,960,840,849,1008,933,0
2015,10,6,2,UA,740,N845UA,29,27,82,87,56,349.0,922,4,926,34,0,0,5,0,29,0,0,ABQ,DEN,Autumn,Early Morning,0,ABQ_DEN,450,532,479,506,566,535,0
