# Flight Delays and Cancellations in the USA, 2015
### Data Engineering Capstone Project

#### Project Summary


A new airline wants to expand into the United States and needs to understand the state of US air traffic. To support that, this project builds a data model of canceled and delayed flights at US airports for the airlines active in 2015. The data is provided by the U.S. Department of Transportation (DOT).

This project takes the *2015 Flight Delays and Cancellations* dataset and uses an ETL process to convert it into a dimensional model that supports analysis of flight delays and cancellation reasons. The data is reduced to the simplest model that still supports this analysis, so that users can get the most out of it.

Link to the data: https://www.kaggle.com/datasets/usdot/flight-delays

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [6]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
import datetime
import psycopg2
from pyspark.sql.functions import *
import functools

### Step 1: Scope the Project and Gather Data

#### Scope 

In this project I build an ETL pipeline that loads data into a data warehouse. The data is about aviation (2015 Flight Delays and Cancellations).
I chose Spark to process the data. The output is saved locally as Parquet files, which keeps the warehouse flexible when it needs to scale and makes it easy to move to the cloud.
Tools used in this project: Python, Spark

#### Describe and Gather Data 

The data used in this project is about aviation. It contains information about flights, including several time fields for each flight and whether the flight was canceled. The dataset has nearly 6 million rows spread across 3 CSV files, from which a dimensional model can be built.

The U.S. Department of Transportation's (DOT) Bureau of Transportation Statistics tracks the on-time performance of domestic flights operated by large air carriers. Summary information on the number of on-time, delayed, canceled, and diverted flights is published in DOT's monthly Air Travel Consumer Report and in this dataset of 2015 flight delays and cancellations.

In [7]:
# Read in the data here
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Spark Flight ETL") \
    .master('local[*]') \
    .config('spark.sql.execution.arrow.pyspark.enabled', True) \
    .config('spark.sql.session.timeZone', 'UTC') \
    .config('spark.driver.memory','32G') \
    .config('spark.ui.showConsoleProgress', True) \
    .config('spark.sql.repl.eagerEval.enabled', True) \
    .getOrCreate()

In [8]:
df_airlines = spark.read.option("inferSchema",True) \
                .option("delimiter",",") \
                .option("header",True) \
                .csv("airlines.csv")

In [9]:
df_airlines.show(5)

+---------+--------------------+
|IATA_CODE|             AIRLINE|
+---------+--------------------+
|       UA|United Air Lines ...|
|       AA|American Airlines...|
|       US|     US Airways Inc.|
|       F9|Frontier Airlines...|
|       B6|     JetBlue Airways|
+---------+--------------------+
only showing top 5 rows



In [10]:
df_airports = spark.read.option("inferSchema",True) \
                .option("delimiter",",") \
                .option("header",True) \
                .csv("airports.csv")

In [11]:
df_airports.show(5)

+---------+--------------------+-----------+-----+-------+--------+----------+
|IATA_CODE|             AIRPORT|       CITY|STATE|COUNTRY|LATITUDE| LONGITUDE|
+---------+--------------------+-----------+-----+-------+--------+----------+
|      ABE|Lehigh Valley Int...|  Allentown|   PA|    USA|40.65236|  -75.4404|
|      ABI|Abilene Regional ...|    Abilene|   TX|    USA|32.41132|  -99.6819|
|      ABQ|Albuquerque Inter...|Albuquerque|   NM|    USA|35.04022|-106.60919|
|      ABR|Aberdeen Regional...|   Aberdeen|   SD|    USA|45.44906| -98.42183|
|      ABY|Southwest Georgia...|     Albany|   GA|    USA|31.53552| -84.19447|
+---------+--------------------+-----------+-----+-------+--------+----------+
only showing top 5 rows



In [12]:
df_flights = spark.read.option("inferSchema",True) \
                .option("delimiter",",") \
                .option("header",True) \
                .csv("flights.csv")

### Step 2: Explore and Assess the Data
#### Explore the Data 
Several time columns in this data are meant to be 4-character HHMM values (hours and minutes), but after loading they have lost their leading zeros. They need to be padded and converted back to a proper time format. Some column names also need to be changed to clearer ones.
#### Cleaning Steps

1. Pad the time columns with leading zeros to 4 characters

2. Convert them to a proper date/timestamp format (some columns hold only hours and minutes, without a date)

3. Rename some columns to clearer names
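
The first two cleaning steps can be sketched in plain Python, without Spark, to make the intended conversion explicit. This is only an illustration, not the notebook's implementation: `hhmm_to_time_string` is a hypothetical helper, and returning `None` for missing, negative, or out-of-range values (such as `2400`) is an assumption about how such rows should be treated.

```python
from typing import Optional


def hhmm_to_time_string(value: Optional[int]) -> Optional[str]:
    """Convert an HHMM value such as 5 or 2354 to an 'HH:MM:SS' string."""
    if value is None or value < 0:
        return None  # missing times stay missing
    padded = str(int(value)).zfill(4)  # 5 -> "0005"
    hours, minutes = int(padded[:2]), int(padded[2:])
    if hours > 23 or minutes > 59:
        return None  # assumption: treat out-of-range values as missing
    return f"{hours:02d}:{minutes:02d}:00"


print(hhmm_to_time_string(5))     # 00:05:00
print(hhmm_to_time_string(2354))  # 23:54:00
```
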

In [13]:
def richTime(x):
    """
    Left-pad an HHMM time value with zeros to 4 characters (e.g. 5 -> "0005").
    Returns None for missing values instead of the string "None".
    """
    if x is None:
        return None
    return str(x).zfill(4)

richTimeUDF = udf(richTime)

In [14]:
df_flights = df_flights .withColumn("SCHEDULED_DEPARTURE",richTimeUDF(df_flights.SCHEDULED_DEPARTURE))\
                        .withColumn("DEPARTURE_TIME",richTimeUDF(df_flights.DEPARTURE_TIME))\
                        .withColumn("WHEELS_OFF",richTimeUDF(df_flights.WHEELS_OFF))\
                        .withColumn("WHEELS_ON",richTimeUDF(df_flights.WHEELS_ON))\
                        .withColumn("SCHEDULED_ARRIVAL",richTimeUDF(df_flights.SCHEDULED_ARRIVAL))\
                        .withColumn("ARRIVAL_TIME",richTimeUDF(df_flights.ARRIVAL_TIME))

In [15]:
df_flights = df_flights.withColumn('schedule_dep_ts',
                                    concat(df_flights.DAY,
                                    lit('/'),df_flights.MONTH,
                                    lit('/'),df_flights.YEAR,
                                    lit(' '),
                                    substring(df_flights.SCHEDULED_DEPARTURE,1,2),
                                    lit(':'),
                                    substring(df_flights.SCHEDULED_DEPARTURE,3,2),
                                    lit(':00')))\
                        .withColumn('departure_hour',
                                    concat(substring(df_flights.DEPARTURE_TIME,1,2),
                                    lit(':'),
                                    substring(df_flights.DEPARTURE_TIME,3,2),
                                    lit(':00')))\
                        .withColumn('wheels_off_hour',
                                    concat(substring(df_flights.WHEELS_OFF,1,2),
                                    lit(':'),
                                    substring(df_flights.WHEELS_OFF,3,2),
                                    lit(':00')))\
                        .withColumn('wheels_on_hour',
                                    concat(substring(df_flights.WHEELS_ON,1,2),
                                    lit(':'),
                                    substring(df_flights.WHEELS_ON,3,2),
                                    lit(':00')))\
                        .withColumn('schedule_arr_hour',
                                    concat(substring(df_flights.SCHEDULED_ARRIVAL,1,2),
                                    lit(':'),
                                    substring(df_flights.SCHEDULED_ARRIVAL,3,2),
                                    lit(':00')))\
                        .withColumn('arrival_hour',
                                    concat(substring(df_flights.ARRIVAL_TIME,1,2),
                                    lit(':'),
                                    substring(df_flights.ARRIVAL_TIME,3,2),
                                    lit(':00')))

In [16]:
df_flights = df_flights.withColumn('SCHEDULED_DEP_TS',to_timestamp('schedule_dep_ts', format='dd/MM/yyyy HH:mm:ss'))\
                        .withColumn('DEPARTURE_HOUR',date_format('departure_hour', format='HH:mm:ss'))\
                        .withColumn('WHEELS_OFF_HOUR',date_format('wheels_off_hour', format='HH:mm:ss'))\
                        .withColumn('WHEELS_ON_HOUR',date_format('wheels_on_hour', format='HH:mm:ss'))\
                        .withColumn('SCHEDULED_ARR_HOUR',date_format('schedule_arr_hour', format='HH:mm:ss'))\
                        .withColumn('ARRIVAL_HOUR',date_format('arrival_hour', format='HH:mm:ss'))

In [17]:
df_flights.select('SCHEDULED_DEP_TS','SCHEDULED_DEPARTURE','DEPARTURE_HOUR','WHEELS_OFF_HOUR','WHEELS_ON_HOUR','SCHEDULED_ARR_HOUR','ARRIVAL_HOUR').show(1)

+-------------------+-------------------+--------------+---------------+--------------+------------------+------------+
|   SCHEDULED_DEP_TS|SCHEDULED_DEPARTURE|DEPARTURE_HOUR|WHEELS_OFF_HOUR|WHEELS_ON_HOUR|SCHEDULED_ARR_HOUR|ARRIVAL_HOUR|
+-------------------+-------------------+--------------+---------------+--------------+------------------+------------+
|2015-01-01 00:05:00|               0005|      23:54:00|       00:15:00|      04:04:00|          04:30:00|    04:08:00|
+-------------------+-------------------+--------------+---------------+--------------+------------------+------------+
only showing top 1 row



In [18]:
df_flights = df_flights.withColumnRenamed('AIRLINE','AIRLINE_IATA')\
                        .withColumnRenamed('SCHEDULED_TIME','SCHEDULED_AIR_TIME')

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

I use a star schema. It suits this data and makes the data easy to process.

![alt text](schema.png "Database Schema")

This is the dimensional model for this project. It has one fact table, Flight, which holds the data about each flight (times, delays, cancellation, and other information).

The fact table is linked to 3 dimension tables:

* Airline: information about each airline, keyed by its IATA code

* Time: time of the flight (day, month, year, day of week)

* Airport: information about airports in the USA

#### 3.2 Mapping Out Data Pipelines
The steps to load data into the data warehouse:
1. Join flight data with airport data to load data into airport_table

2. Join flight data with airline data to load data into airline_table

3. Load data into time_table from flight data

4. Load data into fact table (flight) from flight data
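
The idea behind steps 1 and 2 can be sketched in plain Python: collect every key the flight data uses, then left-join those keys with the reference data. This is a rough illustration under stated assumptions, not the Spark pipeline itself. The flight rows are hypothetical, `"XXX"` is a made-up airport code, and `build_airport_dimension` is an illustrative helper name.

```python
# Hypothetical flight rows; only the two airport columns matter here.
flights = [
    {"ORIGIN_AIRPORT": "ABE", "DESTINATION_AIRPORT": "ABQ"},
    {"ORIGIN_AIRPORT": "ABQ", "DESTINATION_AIRPORT": "XXX"},  # "XXX" is made up
]

# Reference data keyed by IATA code (two rows from the airports.csv preview).
airports = {
    "ABE": {"CITY": "Allentown", "STATE": "PA"},
    "ABQ": {"CITY": "Albuquerque", "STATE": "NM"},
}


def build_airport_dimension(flights, airports):
    """Left-join every airport code used by a flight with the reference data."""
    codes = {f["ORIGIN_AIRPORT"] for f in flights} | {
        f["DESTINATION_AIRPORT"] for f in flights
    }
    empty = {"CITY": None, "STATE": None}  # attributes for unmatched codes
    return [
        {"AIRPORT_IATA": code, **airports.get(code, empty)}
        for code in sorted(codes)
    ]


airport_dimension = build_airport_dimension(flights, airports)
for row in airport_dimension:
    print(row)
```

Because the dimension is driven by the codes found in the flight data, every flight key gets a dimension row, but a code with no match in the reference data carries empty attributes.
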
              

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [19]:
# Join flight data with airport data to load data into airport_table
# Build the distinct list of airport codes inside Spark (origin + destination)
# instead of collecting the rows to the driver first
airport_df = df_flights.selectExpr('ORIGIN_AIRPORT as AIRPORT_IATA')\
                       .union(df_flights.selectExpr('DESTINATION_AIRPORT as AIRPORT_IATA'))\
                       .distinct()

airport_table = airport_df.join(df_airports ,(airport_df.AIRPORT_IATA==df_airports.IATA_CODE) , how='left_outer')\
                          .select( 'AIRPORT_IATA',
                                   'AIRPORT',
                                   'CITY',
                                   'STATE',
                                   'COUNTRY',
                                   'LATITUDE',
                                   'LONGITUDE').distinct()
airport_table.write.mode('overwrite').parquet('DIMENTION_TABLE/' + 'AIRPORT')

In [20]:
# Join flight data with airline data to load data into airline_table
df_airlines = df_airlines.withColumnRenamed('AIRLINE', 'AIRLINE_NAME')

airline_table = df_flights.select('AIRLINE_IATA').distinct()\
                          .join(df_airlines ,(df_flights.AIRLINE_IATA==df_airlines.IATA_CODE) , how='left_outer')\
                          .select( 'AIRLINE_IATA',
                                   'AIRLINE_NAME'
                                 ).distinct()
airline_table.write.mode('overwrite').parquet('DIMENTION_TABLE/' + 'AIRLINE')

In [21]:
# Load data into time_table from flight data
time_table = df_flights.select('SCHEDULED_DEP_TS','DAY','MONTH','YEAR','DAY_OF_WEEK').distinct()
time_table.write.mode('overwrite').parquet('DIMENTION_TABLE/' + 'TIME')

In [22]:
# Load data into fact table (flight) from flight data
flight_Table = df_flights.select('SCHEDULED_DEP_TS','ORIGIN_AIRPORT','DESTINATION_AIRPORT','AIRLINE_IATA','FLIGHT_NUMBER','TAIL_NUMBER'\
                                 ,'SCHEDULED_AIR_TIME','SCHEDULED_ARR_HOUR','DEPARTURE_HOUR','AIR_TIME','ARRIVAL_HOUR','ELAPSED_TIME'\
                                 ,'DEPARTURE_DELAY','ARRIVAL_DELAY','TAXI_OUT','WHEELS_OFF_HOUR','WHEELS_ON_HOUR','TAXI_IN'\
                                ,'DISTANCE','CANCELLED','DIVERTED','CANCELLATION_REASON').distinct()\
                                .withColumn('FLIGHT_ID', monotonically_increasing_id())
flight_Table.write.mode('overwrite').parquet('FACT_TABLE/' + 'FLIGHT')

#### 4.2 Data Quality Checks

The data quality checks are:

- has_rows: checks that each table has more than 0 rows, to ensure data was loaded into the data warehouse.

- check_integrity: checks referential integrity between the tables in the data warehouse, to make sure every key in the fact table exists in the corresponding dimension table.
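
The integrity check is a left anti join: it keeps the fact keys that have no partner in the dimension table, and the check passes when nothing is left. A minimal plain-Python sketch of that idea follows. The rows are hypothetical, `"ZZ"` is a made-up airline code, and `missing_keys` is an illustrative helper, not part of the notebook.

```python
def missing_keys(fact_rows, fact_column, dimension_rows, dimension_column):
    """Return fact keys that have no matching dimension row (a left anti join)."""
    dimension_keys = {row[dimension_column] for row in dimension_rows}
    fact_keys = {row[fact_column] for row in fact_rows}
    return fact_keys - dimension_keys


# Hypothetical rows; "ZZ" is a made-up code with no airline record.
flight_rows = [{"AIRLINE_IATA": "UA"}, {"AIRLINE_IATA": "AA"}, {"AIRLINE_IATA": "ZZ"}]
airline_rows = [{"AIRLINE_IATA": "UA"}, {"AIRLINE_IATA": "AA"}]

orphans = missing_keys(flight_rows, "AIRLINE_IATA", airline_rows, "AIRLINE_IATA")
print(orphans)  # {'ZZ'}
```

An empty result means every key in the fact table resolves to a dimension row.
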
 
Run Quality Checks

In [23]:
def has_rows(dataframe):
    """
    Checks that the dataframe contains at least one row
    """
    record_num = dataframe.count()
    if record_num <= 0:
        raise ValueError('This table is empty!!!')
    print('This table has '+str(record_num)+' records')

    
def check_integrity(flight, airport, airline, time):
    """
    Checks referential integrity between the fact and dimension tables
    input: fact table and 3 dimension tables
    """
    check_airport_destination = flight.select('DESTINATION_AIRPORT').distinct() \
                     .join(airport, flight.DESTINATION_AIRPORT == airport.AIRPORT_IATA, "left_anti") \
                     .count() == 0
    check_airport_origin = flight.select('ORIGIN_AIRPORT').distinct() \
                     .join(airport, flight.ORIGIN_AIRPORT == airport.AIRPORT_IATA, "left_anti") \
                     .count() == 0
    check_time = flight.select('SCHEDULED_DEP_TS').distinct() \
                     .join(time, flight.SCHEDULED_DEP_TS == time.SCHEDULED_DEP_TS, "left_anti") \
                     .count() == 0
    check_airline = flight.select('AIRLINE_IATA').distinct() \
                     .join(airline, flight.AIRLINE_IATA == airline.AIRLINE_IATA, "left_anti") \
                     .count() == 0
    # Use boolean `and` with explicit parentheses rather than bitwise `&`
    if not (check_airport_destination and check_time and check_airline and check_airport_origin):
        raise ValueError('Error at integrity!!!')
    print('Check Integrity passed')

In [24]:
# Load the tables back from the data warehouse
flight = spark.read.parquet('FACT_TABLE/' + 'FLIGHT')
airport = spark.read.parquet('DIMENTION_TABLE/' + 'AIRPORT')
airline = spark.read.parquet('DIMENTION_TABLE/' + 'AIRLINE')
time = spark.read.parquet('DIMENTION_TABLE/' + 'TIME')

In [25]:
has_rows(flight)

This table has 5819079 records


In [26]:
has_rows(airport)

This table has 629 records


In [27]:
has_rows(airline)

This table has 14 records


In [28]:
has_rows(time)

This table has 397568 records


In [29]:
check_integrity(flight,airport,airline,time)

Check Integrity passed


#### 4.3 Data dictionary 

##### Flight_table

* FLIGHT_ID: Flight record identifier
* SCHEDULED_DEP_TS: Planned departure time (timestamp)
* ORIGIN_AIRPORT: IATA code of the origin airport
* DESTINATION_AIRPORT: IATA code of the destination airport
* AIRLINE_IATA: IATA code of the airline
* FLIGHT_NUMBER: Flight identifier
* TAIL_NUMBER: Aircraft identifier
* SCHEDULED_AIR_TIME: Planned duration of the flight
* SCHEDULED_ARR_HOUR: Planned arrival time
* DEPARTURE_HOUR: WHEELS_OFF - TAXI_OUT; the time the aircraft leaves the gate and starts taxiing
* AIR_TIME: Duration between wheels-off and wheels-on; the time spent in the air
* ARRIVAL_HOUR: WHEELS_ON + TAXI_IN; the time taxi-in ends and the aircraft stops
* ELAPSED_TIME: AIR_TIME + TAXI_IN + TAXI_OUT
* DEPARTURE_DELAY: Total delay on departure
* ARRIVAL_DELAY: ARRIVAL_HOUR - SCHEDULED_ARR_HOUR
* TAXI_OUT: Duration between departure from the origin airport gate and wheels-off
* WHEELS_OFF_HOUR: The time the aircraft's wheels leave the ground
* WHEELS_ON_HOUR: The time the aircraft's wheels touch the ground
* TAXI_IN: Duration between wheels-on and gate arrival at the destination airport
* DISTANCE: Distance between the two airports
* CANCELLED: Flight cancelled (1 = cancelled)
* DIVERTED: The aircraft landed at an airport other than the scheduled destination
* CANCELLATION_REASON: Reason for the cancellation of the flight: A - Airline/Carrier; B - Weather; C - National Air System; D - Security
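
The relationships between the time columns can be checked by hand on the sample row printed in Step 2 (departure 23:54:00, wheels off 00:15:00, wheels on 04:04:00, scheduled arrival 04:30:00, arrival 04:08:00). The sketch below is an illustration only. The helper names are hypothetical, it assumes a duration is shorter than 24 hours and a delay lies within 12 hours either way, and it ignores time-zone differences between airports.

```python
MINUTES_PER_DAY = 24 * 60


def to_minutes(hhmmss: str) -> int:
    """Minutes since midnight for an 'HH:MM:SS' string."""
    hours, minutes, _seconds = hhmmss.split(":")
    return int(hours) * 60 + int(minutes)


def duration_minutes(start: str, end: str) -> int:
    """Minutes from start to end, wrapping past midnight if needed."""
    return (to_minutes(end) - to_minutes(start)) % MINUTES_PER_DAY


def delay_minutes(scheduled: str, actual: str) -> int:
    """Signed delay in minutes; negative means earlier than scheduled."""
    half_day = MINUTES_PER_DAY // 2
    diff = to_minutes(actual) - to_minutes(scheduled)
    return (diff + half_day) % MINUTES_PER_DAY - half_day


taxi_out = duration_minutes("23:54:00", "00:15:00")    # DEPARTURE_HOUR -> WHEELS_OFF_HOUR
taxi_in = duration_minutes("04:04:00", "04:08:00")     # WHEELS_ON_HOUR -> ARRIVAL_HOUR
arrival_delay = delay_minutes("04:30:00", "04:08:00")  # ARRIVAL_HOUR - SCHEDULED_ARR_HOUR
print(taxi_out, taxi_in, arrival_delay)  # 21 4 -22
```
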

##### Airline_table

* AIRLINE_IATA: IATA code of the airline
* AIRLINE_NAME: Name of the airline

##### Airport_table

* AIRPORT_IATA: IATA code of the airport
* AIRPORT: Name of the airport
* CITY: City where the airport is located
* STATE: State where the airport is located
* COUNTRY: Country where the airport is located
* LATITUDE: Latitude of the airport
* LONGITUDE: Longitude of the airport

##### Time_table

* SCHEDULED_DEP_TS: Planned Departure Time (timestamp)
* DAY: Day of the flight
* MONTH: Month of the flight
* YEAR: Year of the flight
* DAY_OF_WEEK: Day of the week of the flight

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

1. In this project, I use Apache Spark to process the data and save it as Parquet files. Spark is a powerful tool for big data processing and meets all my file-processing needs. Saving the data as Parquet reduces storage space and scales more easily to larger volumes. Parquet files are also easy to move to another storage system (e.g. Amazon S3).

2. The data should be updated daily. It can be partitioned by date, so each daily load adds one partition and the warehouse stays up to date. The volume processed in one day is bounded by the number of flights in the US (or the world) that day, so a daily update will not overload the pipeline; monthly or annual updates would also be feasible.
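
As a rough sketch of the date-partition idea, each daily load could target its own directory. The layout below follows the `column=value` directory convention that Spark uses when writing with `partitionBy`. The helper name `daily_partition_path` and the choice of `YEAR`, `MONTH`, and `DAY` as partition columns are assumptions for illustration, not something the notebook currently does.

```python
from datetime import date


def daily_partition_path(base: str, day: date) -> str:
    """Directory that would hold one day's flights under date partitioning."""
    return f"{base}/YEAR={day.year}/MONTH={day.month}/DAY={day.day}"


path = daily_partition_path("FACT_TABLE/FLIGHT", date(2015, 1, 1))
print(path)  # FACT_TABLE/FLIGHT/YEAR=2015/MONTH=1/DAY=1
```

With this layout, a daily run only has to write one new directory and leaves earlier days untouched.
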

* The data was increased by 100x -> With 100 times the data, Spark can still process it, and Parquet still significantly reduces the storage required. It may be worth moving the data to a cloud service such as Amazon S3, which would not add much cost.

* The data populates a dashboard that must be updated on a daily basis by 7am every day => This is straightforward with Apache Airflow: a DAG can run the whole pipeline on each day's data, including the data quality checks, and send an email on success or failure so the workflow can be monitored.

* The database needed to be accessed by 100+ people => Amazon Redshift can handle this case, since it can scale automatically and offers strong read performance.