## Ingest flight_data.csv file

In [0]:
# Upstream system this file came from; its value is stamped onto every output row
# as the `source` column in Step 4. Defaults to an empty string.
DATA_SOURCE_WIDGET = "data_source"
dbutils.widgets.text(DATA_SOURCE_WIDGET, "")
data_source = dbutils.widgets.get(DATA_SOURCE_WIDGET)

In [0]:
# Sub-folder of raw_folder_path that holds this run's flight_data.csv; its value is
# also stamped onto every output row as the `file_date` column. Defaults to empty.
dbutils.widgets.text("file_date", "")
file_date = dbutils.widgets.get("file_date")

# Fail fast: with an empty value Step 1 would build the path
# f"{raw_folder_path}//flight_data.csv" and Step 4 would write blank file_date values.
if not file_date.strip():
    raise ValueError(
        "The 'file_date' widget is empty; set it to the raw folder date to ingest."
    )

In [0]:
%run "../includes/configurations"

In [0]:
%run "../src/utils"

##### Imports

In [0]:
import os

from pyspark.sql.types import (
    StructType,
    StructField,
    IntegerType,
    StringType,
    BooleanType
)
from pyspark.sql.functions import col, concat, concat_ws, current_timestamp, lit, lpad

##### Step 1 - Read the CSV file using the Spark DataFrame reader

In [0]:
# Expected columns of flight_data.csv with their target Spark type and nullability.
# Only the name and type of each field are used by the cast in Step 1; the nullable
# flags are declarative and are not enforced by that cast.
_flight_column_specs = [
    # (column name,          type,          nullable)
    ('ActualElapsedTime',    IntegerType(), True),
    ('AirTime',              IntegerType(), True),
    ('ArrDelay',             IntegerType(), True),
    ('ArrTime',              IntegerType(), True),
    ('CRSArrTime',           IntegerType(), False),
    ('CRSDepTime',           IntegerType(), False),
    ('CRSElapsedTime',       IntegerType(), False),
    ('CancellationCode',     StringType(),  True),
    ('Cancelled',            IntegerType(), True),
    ('CarrierDelay',         IntegerType(), True),
    ('DayOfWeek',            IntegerType(), False),
    ('DayofMonth',           IntegerType(), False),
    ('DepDelay',             IntegerType(), True),
    ('DepTime',              IntegerType(), True),
    ('Dest',                 StringType(),  False),
    ('Distance',             IntegerType(), True),
    ('Diverted',             IntegerType(), False),
    ('FlightNum',            IntegerType(), False),
    ('LateAircraftDelay',    IntegerType(), True),
    ('Month',                IntegerType(), False),
    ('NASDelay',             IntegerType(), True),
    ('Origin',               StringType(),  False),
    ('SecurityDelay',        IntegerType(), True),
    ('TailNum',              StringType(),  False),
    ('TaxiIn',               IntegerType(), True),
    ('TaxiOut',              IntegerType(), True),
    ('UniqueCarrier',        StringType(),  False),
    ('WeatherDelay',         IntegerType(), True),
    ('Year',                 IntegerType(), False),
]

flights_schema = StructType(fields=[
    StructField(name, data_type, nullable)
    for name, data_type, nullable in _flight_column_specs
])

In [0]:
# Read the raw CSV using its header row for column names. No schema is passed, so with
# Spark's default (inferSchema off) every column arrives as a string; typing happens in
# the next cell by column name.
# NOTE(review): presumably flights_schema is not passed here because a CSV reader schema
# is matched by position and flights_schema is in alphabetical order — confirm.
flights_csv_path = f"{raw_folder_path}/{file_date}/flight_data.csv"
flights_df = spark.read.csv(flights_csv_path, header=True)

In [0]:
# Cast every column to the type declared in flights_schema and emit the columns in
# schema order, all in a single projection. This replaces a withColumn-per-field loop;
# the PySpark docs warn that such loops build large plans. Columns not listed in the
# schema are dropped, and a schema column missing from the file raises an
# AnalysisException.
# NOTE(review): values that cannot be converted become null unless ANSI mode
# (spark.sql.ansi.enabled) is on, and the nullable flags are not checked here.
flights_df = flights_df.select([
    col(field.name).cast(field.dataType).alias(field.name)
    for field in flights_schema.fields
])

**Note:**

- With *inferSchema set to True*, Spark has to scan the data itself to work out what the schema should be, and only then applies that schema to the DataFrame. That extra pass over the data is inefficient, and in a production environment with large volumes of data it can noticeably slow down reads. This notebook therefore reads every column as a string and casts each one to the type declared in `flights_schema`.

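- Also, if the incoming data doesn't conform to what you expect, you want the process to fail and tell you that something is wrong, rather than infer a schema and carry on. Note that a plain `cast` does not do this on its own: unless ANSI mode (`spark.sql.ansi.enabled`) is on, values that cannot be converted become `null`, and the `nullable` flags in the schema are not enforced.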

In [0]:
# Sanity check: the reader should have produced a Spark DataFrame, not a pandas one.
flights_df.__class__

In [0]:
# Print the column names and types produced by the cast step so they can be checked
# against flights_schema.
flights_df.printSchema()

##### Step 2 - Drop the unwanted columns

In [0]:
# Columns that are read and typed but not carried into the processed table.
unwanted_columns = ["TailNum"]
flights_dropped_df = flights_df.drop(*unwanted_columns)

##### Step 3 - Rename the columns as required

In [0]:
# Load the ingestion config (via load_yml from ../src/utils) and pull out the
# raw-name -> processed-name mapping for the flights columns. A missing key raises KeyError.
INGEST_CONFIG_FILE = "ingest_conf.yml"
ingest_cfg = load_yml(INGEST_CONFIG_FILE)
column_name_map = ingest_cfg["flights_column_map"]

In [0]:
def _rename_columns(df, name_map):
    """Return ``df`` with columns renamed according to ``name_map`` (old name -> new name).

    Renames are applied one at a time, in the mapping's iteration order, through
    ``DataFrame.withColumnRenamed``. That method is a no-op for a key that is not an
    existing column name.
    """
    for old_name, new_name in name_map.items():
        df = df.withColumnRenamed(old_name, new_name)
    return df


flights_renamed_df = _rename_columns(flights_dropped_df, column_name_map)

##### Step 4 - Add new columns

In [0]:
# Derive flight_date as a zero-padded "yyyy-MM-dd" string from the renamed date parts.
# concat (unlike concat_ws) returns null if any part is null, instead of silently
# producing a shortened value such as "2008-05".
# NOTE(review): assumes column_name_map renames Year/Month/DayofMonth to
# year/month/day_of_month — confirm against ingest_conf.yml.
flights_final_df = flights_renamed_df.withColumn(
    "flight_date",
    concat(
        col("year").cast("string"),
        lit("-"),
        lpad(col("month"), 2, "0"),
        lit("-"),
        lpad(col("day_of_month"), 2, "0"),
    ),
)

# add_ingestion_date comes from ../src/utils (presumably appends an ingestion timestamp column).
flights_final_df = add_ingestion_date(flights_final_df)

# Lineage columns taken from the notebook widgets.
flights_final_df = (
    flights_final_df
    .withColumn("source", lit(data_source))
    .withColumn("file_date", lit(file_date))
)

In [0]:
# Interactive preview of the final DataFrame before it is written.
display(flights_final_df)

##### Step 5 - Write to the data lake **incrementally** as Parquet

In [0]:
# Persist via overwrite_partition from ../src/utils, partitioned by the renamed `year` column.
# NOTE(review): the name suggests only the partitions present in flights_final_df are
# overwritten — confirm in ../src/utils. The database name is hard-coded with a "dev_"
# prefix; confirm whether it should come from ../includes/configurations instead.
target_database = "dev_air_travel_processed"
target_table = "flights"
partition_column = "year"
overwrite_partition(flights_final_df, target_database, target_table, partition_column)

In [0]:
# (flights_final_df.
#  write.mode("overwrite").
#  partitionBy("year").
#  format("parquet").
#  saveAsTable("dev_air_travel_processed.flights")
# )

In [0]:
# End the notebook and return "Success" as its exit value; presumably read by a caller
# that runs this notebook via dbutils.notebook.run — confirm.
dbutils.notebook.exit("Success")