# 2015 Flight Delays and Cancellations

## *<font color='orange'>Udacity Data Engineering Nanodegree - Capstone Project</font>*

### Project Summary
The goal of the project is to build a database for US air transport analysts in using US domestic air traffic data, enriched by aircraft registration data from the Federal Aviation Administration.

The database will be in a star-schema format, with the flights table as the fact table and accompanying information around airlines, airports, dates, times and aircraft as dimension tables.

This project has the following steps:
1. Scope and Project Steps
2. Data Sources and Data Gathering
3. Assessing and Cleaning Data
4. Defining the Data Model
5. ETL Process
6. Detailed Project Discussion

### Project Environment
This project is run through the conda environment. You might need to install specific packages (if you don't already have them) using `pip install` commands. You can uncomment the commands below if needed.

In [None]:
#!pip install ipython-sql
#!pip install pandas
#!pip install json
#!pip install os
#!pip install glob
#!pip install zipfile
#!pip install boto3
#!pip install botocore
#!pip install configparser 
#!pip install psycopg2
#!pip install requests
#!pip install pyspark

In [2]:
# Importing all packages needed for the project
import zipfile
import pandas as pd
import requests
import kaggle
import os
import io
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.functions import *
from pyspark.sql.functions import col
from pyspark.sql.types import StringType, BooleanType, IntegerType, DateType
import glob
import boto3
from botocore.client import ClientError 
import configparser
import psycopg2
from s3Functions import *
import json

# to display all columns and text
pd.options.display.max_columns = None
pd.options.display.max_colwidth = None

### <font color='green'>Step 1: Scope and Project Steps</font>

#### Scope
This project aims at creating an ETL pipeline that builds analytical tables for US air transport analysts with a focus on US domestic air travel. The tables will allow the analyst to investigate the links between US domestic flights, aircrafts and engine types, airports, airlines and other variables so they can produce meaningful insights and MI.

#### Project Steps
1. Source data from various online sources. Download the files onto your local storage under the capstone project folder.
2. Import, assess and clean the data and store the clean versions in a clean data folder that has only the clean data files.
3. Create an S3 bucket on AWS to store the clean files.
4. Create a Redshift cluster that will contain all of the staging tables loaded from S3. The staging tables will be transformed and loaded to analytical tables in a star schema format - which will then be used by US domestic air transport analysts.
5. Drop the staging tables.
6. Run data quality checks against the newly created analytical tables to determine if the data has loaded correctly.
7. Clean up resources (Redshift cluster, IAM role, S3 buckets/objects).

### <font color='green'>Step 2: Data Sources and Data Gathering</font>

#### Data Sources
1. 2015 Flight Delays and Cancellations Data (Kaggle)
2. Airports Data (Kaggle)
3. Airlines Data (Kaggle)
4. 2015 Aircraft Registration Database (Federal Aviation Administration)

#### Data Gathering

##### <font color='orange'>----- 2015 Flight Delays & Cancellations and Airlines Data-----</font>
The U.S. Department of Transportation's (DOT) Bureau of Transportation Statistics tracks the on-time performance of domestic flights operated by large air carriers. Summary information on the number of on-time, delayed, canceled, and diverted flights is published in DOT's monthly Air Travel Consumer Report and in this dataset of 2015 flight delays and cancellations. The data is sourced from Kaggle and was collected and published by the DOT's Bureau of Transportation Statistics.

Kaggle link to the data [HERE](https://www.kaggle.com/datasets/usdot/flight-delays?select=flights.csv&sort=votes).

In [5]:
# Use the kaggle API to download the 2015 Flight Delays and Cancellations file. The zip file includes the flights, airlines and airports data.
!kaggle datasets download --force -d usdot/flight-delays

Downloading flight-delays.zip to /mnt/c/users/moche/OneDrive/Documents/capstone-project
100%|████████████████████████████████████████| 191M/191M [00:27<00:00, 7.88MB/s]
100%|████████████████████████████████████████| 191M/191M [00:27<00:00, 7.23MB/s]


In [98]:
# extract the flights csv and airlines csv files from the downloaded zip file
zipfile_name = 'flight-delays.zip'
with zipfile.ZipFile(zipfile_name, 'r') as file:
    file.extractall(members=['flights.csv', 'airlines.csv'])

##### <font color='orange'>----- Airports Data -----</font>
The data contains information of airports from around the world and their geographical data. Kaggle link [HERE](https://www.kaggle.com/datasets/thoudamyoihenba/airports).

The key field in the airports data is the iata airline code - which is a sepcific 3 letter code given by IATA to the airports.

In [8]:
!kaggle datasets download -d thoudamyoihenba/airports

Downloading airports.zip to /mnt/c/users/moche/OneDrive/Documents/capstone-project
100%|████████████████████████████████████████| 303k/303k [00:03<00:00, 99.3kB/s]
100%|████████████████████████████████████████| 303k/303k [00:03<00:00, 99.3kB/s]


In [9]:
# extract the file from the downloaded zip file
zipfile_name = 'airports.zip'
with zipfile.ZipFile(zipfile_name, 'r') as file:
    file.extractall()

##### <font color='orange'>----- 2015 Aircraft Registration Database -----</font>
The database contains the records of all U.S. Civil Aircraft maintained by the FAA, Civil Aviation Registry, Aircraft
Registration Branch, AFS-750. Read more about the documentation [HERE](https://www.faa.gov/sites/faa.gov/files/licenses_certificates/aircraft_certification/aircraft_registry/releasable_aircraft_download/ardata.pdf).

The key field in the aircraft registration database is the N-NUMBER - which is the unique identification number assigned to aircrafts.

The files can be viewed and downloaded [HERE](https://www.faa.gov/licenses_certificates/aircraft_certification/aircraft_registry/releasable_aircraft_download).

In [5]:
# Download the 2017 database from the faa website and extract all the files from the downloaded zip file
url = 'https://registry.faa.gov/database/yearly/ReleasableAircraft.2015.zip'
r = requests.get(url)
z = zipfile.ZipFile(io.BytesIO(r.content))
z.extractall()

### <font color='green'>Step 3: Assessing and Cleaning Data</font>


##### <font color='orange'>----- Assessing Aircraft Registration Data -----</font>

In [336]:
# Read in the data
aircraft = pd.read_csv('MASTER.txt')

  aircraft = pd.read_csv('MASTER.txt')


In [337]:
# explore the data
aircraft.sample(3)

Unnamed: 0,N-Number,Serial Number,AircraftMFRModelCode,Engine MFR Code,Year Mfr,Type Registrant,Name,Street1,Street2,City,State,ZIP,Region,County,Country,LastActivity Date,CertIssueDate,Certification Requested,TypeAcft,Type Engine,StatusCode,Mode S Code,Fractional Ownership,Airworthiness Date,Other Name 1,Other Name 2,Other Name 3,Other Name 4,Other Name 5,Expiration Date,UniqueID,Kit MFR Code,Kit Model,Mode S Code Hex,Unnamed: 34
164467,1042A,18-616,7101812,42101,1951,,PETRIE MICHAEL,PO BOX 671731,,CHUGIAK,AK,995671731,5,20,US,20140718,20100701,6130,4,1,V,50013057,,20110829,,,,,,20171130,450090,,,,
412392,6016X,1650,5870204,41514,1961,,FERGUSON KRYSTYNA E,650 N ATLANTIC AVE APT 111,,COCOA BEACH,FL,329313134,7,9,US,20140502,20140502,1,4,1,V,51747066,,19600919,FERGUSON MARK A,,,,,20170531,20846,,,,
341777,37AK,6,05615V8,99999,2003,,VON KOSCHIER ANGELO,32 LINCOLN AVE,,LYNNFIELD,MA,19401816,E,9,US,20141214,20030804,42,6,1,V,51030574,,20111101,,,,,,20180430,60959,522.0,,,


In [343]:
# we're only interested in below columns
aircraft_cols = ['N-Number', 'Serial Number', 'AircraftMFRModelCode', 'Engine MFR Code', 'Year Mfr', 'TypeAcft', 'Type Engine']
aircraft[aircraft_cols].head(3)

Unnamed: 0,N-Number,Serial Number,AircraftMFRModelCode,Engine MFR Code,Year Mfr,TypeAcft,Type Engine
0,1AJ,156H,7220282,41514,1973,4,1
1,7D,D-1,4970150,41508,1951,4,1
2,1BH,1951034,13007S6,43101,1972,6,1


In [344]:
# Check for total rows in the dataset
print('The data has ' + str(len(aircraft)) + ' rows')
print("*****************************")
# check for null values
print("Checking for null values")
display(aircraft[aircraft_cols].isnull().sum())
print("*****************************")
# check for duplicated N-Numbers
print('The number of duplicated N-Numbers is: ' + str(aircraft.duplicated(subset=['N-Number']).sum()))
print("*****************************")
# checking the number of unique values in each column of interest
for col in aircraft_cols:
    unique_count = aircraft[col].nunique() 
    print(col + " has " + str(unique_count) + " unique values") 

The data has 437916 rows
*****************************
Checking for null values


N-Number                0
Serial Number           0
AircraftMFRModelCode    0
Engine MFR Code         0
Year Mfr                0
TypeAcft                0
Type Engine             0
dtype: int64

*****************************
The number of duplicated N-Numbers is: 0
*****************************
N-Number has 437916 unique values
Serial Number has 242112 unique values
AircraftMFRModelCode has 45598 unique values
Engine MFR Code has 2054 unique values
Year Mfr has 109 unique values
TypeAcft has 19 unique values
Type Engine has 23 unique values


In [345]:
# Checking for unique values
aircraft['Serial Number'].value_counts(dropna=False)

                                  123521
1                                   3690
2                                    519
3                                    298
101                                  208
                                   ...  
21062412                               1
34-8233179                             1
17263960                               1
172S9077                               1
50500139                               1
Name: Serial Number, Length: 242112, dtype: int64

In [346]:
# Checking for unique values
aircraft['AircraftMFRModelCode'].value_counts(dropna=False)

           123521
7102802      4887
2072418      3907
7100510      3805
7102808      3787
            ...  
05904W0         1
05615NQ         1
05604OC         1
05621FS         1
05632OA         1
Name: AircraftMFRModelCode, Length: 45598, dtype: int64

In [347]:
# Checking for unique values
aircraft['Engine MFR Code'].value_counts(dropna=False)

         151460
41508     30920
17026     13898
41514     13516
17032     10830
          ...  
67098         1
13048         1
52266         1
33844         1
33881         1
Name: Engine MFR Code, Length: 2054, dtype: int64

In [251]:
# Checking for unique values
aircraft['Year Mfr'].value_counts(dropna=False)

        161727
1946     12638
1978      8600
1979      7991
1976      7925
         ...  
1916         1
197          1
1911         1
196          1
1913         1
Name: Year Mfr, Length: 109, dtype: int64

In [239]:
# Checking the engine type values
aircraft['Type Engine'].value_counts(dropna=False)

1     207122
      123521
1      22851
5      19362
8      17226
2      11433
0       9369
3       8116
7       7025
4       3205
10      3175
8       1837
5       1012
0        983
3        671
2        653
7        203
4         77
11        37
10        28
6          5
9          4
9          1
Name: Type Engine, dtype: int64

In [240]:
# Checking the aircraft type values
aircraft['TypeAcft'].value_counts(dropna=False)

4    207372
     123521
5     46550
4     23328
6     17478
2      5206
1      4987
5      2864
8      2769
7      1383
6       959
2       602
1       436
9       302
8        59
7        51
3        32
3        13
9         4
Name: TypeAcft, dtype: int64

We can see that we have duplicate values returned when counting the values. We will fix this by trimming the engine type and aircraft type values.

We also have rows with null values. We will drop these.

<font color='red'>Cleaning Note 1: Trim the spaces from the dataset</font>

<font color='red'>Cleaning Note 2: Map the engine type and aircraft type numbers to the values below</font>

Engine Type:

0 - None
1 - Reciprocating
2 - Turbo-prop
3 - Turbo-shaft
4 - Turbo-jet
5 - Turbo-fan
6 - Ramjet
7 - 2 Cycle
8 - 4 Cycle
9 – Unknown
10 – Electric
11 - Rotary

Aircraft Type:

1 - Glider
2 - Balloon
3 - Blimp/Dirigible
4 - Fixed wing single engine
5 - Fixed wing multi engine
6 - Rotorcraft
7 - Weight-shift-control
8 - Powered Parachute
9 - Gyroplane
H - Hybrid Lift
O - Other 


<font color='red'>Cleaning Note 3: Drop rows with null values</font>


##### <font color='orange'>----- Cleaning Aircraft Data -----</font>

In [404]:
# create a copy of the original dataset using only the columns needed
aircraft_clean = aircraft[aircraft_cols].copy()

In [405]:
aircraft_clean

Unnamed: 0,N-Number,Serial Number,AircraftMFRModelCode,Engine MFR Code,Year Mfr,TypeAcft,Type Engine
0,1AJ,156H,7220282,41514,1973,4,1
1,7D,D-1,4970150,41508,1951,4,1
2,1BH,1951034,13007S6,43101,1972,6,1
3,721NM,11E-1001,7320100,17160,2008,4,1
4,6334,,,,,,
...,...,...,...,...,...,...,...
437911,837QS,560-0685,2076750,52134,2005,5,5
437912,130SL,1084,6960204,52032,2004,5,2
437913,357QS,680-0155,2076811,52169,2007,5,5
437914,355FX,50500139,326040C,52297,2013,5,5


<font color='red'>Cleaning Note 1: Trim the spaces from the manufacture year, engine type values and map the values</font>

<font color='red'>Cleaning Note 2: Map the engine type and aircraft type numbers to the values below</font>

In [417]:
# Trimming the values in the entire dataset
for i in range(len(aircraft_clean.columns)):
    aircraft_clean.iloc[:, i] = aircraft_clean.iloc[:,i].str.strip()


In [418]:
engine_mapping = {'0': 'None',
    '1':'Reciprocating',
    '2':'Turbo-prop',
    '3':'Turbo-shaft',
    '4':'Turbo-jet',
    '5':'Turbo-fan',
    '6':'Ramjet',
    '7':'2 Cycle',
    '8':'4 Cycle',
    '9':'Unknown',
    '10':'Electric',
    '11':'Rotary'
    }

aircraft_clean['Type Engine'] = aircraft_clean['Type Engine'].map(engine_mapping)

In [419]:
aircraft_mapping = {'1':'Glider',
    '2':'Balloon',
    '3':'Blimp/Dirigible',
    '4':'Fixed wing single engine',
    '5':'Fixed wing multi engine',
    '6':'Rotorcraft',
    '7':'Weight-shift-control',
    '8':'Powered Parachute',
    '9':'Gyroplane',
    'H':'Hybrid Lift',
    'O':'Other'
    }

aircraft_clean['TypeAcft'] = aircraft_clean['TypeAcft'].map(aircraft_mapping)

In [421]:
# Check to see if mapping worked
aircraft_clean['Type Engine'].value_counts(dropna=False)

Reciprocating    207122
NaN              151837
Turbo-fan         19362
4 Cycle           17226
Turbo-prop        11433
None               9369
Turbo-shaft        8116
2 Cycle            7025
Turbo-jet          3205
Electric           3175
Rotary               37
Ramjet                5
Unknown               4
Name: Type Engine, dtype: int64

In [265]:
# Check to see if mapping worked
aircraft_clean['TypeAcft'].value_counts(dropna=False)

Fixed wing single engine    207372
NaN                         151837
Fixed wing multi engine      46550
Rotorcraft                   17478
Balloon                       5206
Glider                        4987
Powered Parachute             2769
Weight-shift-control          1383
Gyroplane                      302
Blimp/Dirigible                 32
Name: TypeAcft, dtype: int64

<font color='red'>Cleaning Note 3: Remove rows with null values</font>

We will also remove rows where the manufacture year is incorrect (i.e. where year is 0, 195, 196, 197, 199)

In [422]:
aircraft_clean = aircraft_clean.dropna(how='any')

In [423]:
aircraft_clean['TypeAcft'].value_counts(dropna=False)

Fixed wing single engine    207372
Fixed wing multi engine      46550
Rotorcraft                   17478
Balloon                       5206
Glider                        4987
Powered Parachute             2769
Weight-shift-control          1383
Gyroplane                      302
Blimp/Dirigible                 32
Name: TypeAcft, dtype: int64

In [424]:
aircraft_clean['Type Engine'].value_counts(dropna=False)

Reciprocating    207122
Turbo-fan         19362
4 Cycle           17226
Turbo-prop        11433
None               9369
Turbo-shaft        8116
2 Cycle            7025
Turbo-jet          3205
Electric           3175
Rotary               37
Ramjet                5
Unknown               4
Name: Type Engine, dtype: int64

In [425]:
aircraft_clean['Year Mfr'].value_counts(dropna=False)

        35748
1946    11463
1978     7749
1979     7261
1977     7033
        ...  
1922        1
1923        1
1916        1
1913        1
196         1
Name: Year Mfr, Length: 109, dtype: int64

In [426]:
# Remove rows with missing manufacture year
aircraft_clean = aircraft_clean[~(aircraft_clean['Year Mfr'] == "")]

In [427]:
# Remove rows with incorrect manufacture years (0, 195, 196, 197, 199)
mask1 = aircraft_clean['Year Mfr'] == '0'
mask2 = aircraft_clean['Year Mfr'] == '195'
mask3 = aircraft_clean['Year Mfr'] == '196'
mask4 = aircraft_clean['Year Mfr'] == '197'
mask5 = aircraft_clean['Year Mfr'] == '199'

aircraft_clean = aircraft_clean[~(mask1 | mask2 | mask3 | mask4 | mask5)]

In [None]:
aircraft_clean['Year Mfr'].value_counts(dropna=False)

In [429]:
aircraft_clean[aircraft_clean['Serial Number'] == ""]

Unnamed: 0,N-Number,Serial Number,AircraftMFRModelCode,Engine MFR Code,Year Mfr,TypeAcft,Type Engine


In [431]:
aircraft_clean[aircraft_clean['AircraftMFRModelCode'] == ""]

Unnamed: 0,N-Number,Serial Number,AircraftMFRModelCode,Engine MFR Code,Year Mfr,TypeAcft,Type Engine


In [428]:
aircraft_clean['Engine MFR Code'].value_counts(dropna=False)

41508    26240
17026    11508
41514    11355
17032     9408
17020     8984
         ...  
3021         1
51100        1
20007        1
9493         1
3023         1
Name: Engine MFR Code, Length: 2008, dtype: int64

In [432]:
# write clean data back to CSV file in clean-data folder 
# check if the "clean-data" directory exists, if not, create it 
if not os.path.exists("clean-data"):
    os.makedirs("clean-data")

# write clean data back to clean file 
aircraft_clean.to_csv("clean-data/aircraft_clean.csv", sep=',', index=False)

##### <font color='orange'>----- Assessing Airports Data -----</font>

In [66]:
# Read in data
airports = pd.read_csv('Airports-Only.csv', encoding='latin-1') # using lating-1 encoding here as I had issues reading in the data using utf-8

In [67]:
# explore the data
airports.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 7698 entries, 0 to 7697
Data columns (total 14 columns):
 #   Column      Non-Null Count  Dtype  
---  ------      --------------  -----  
 0   Airport_ID  7698 non-null   int64  
 1   Name        7698 non-null   object 
 2   City        7649 non-null   object 
 3   Country     7698 non-null   object 
 4   IATA        7698 non-null   object 
 5   ICAO        7698 non-null   object 
 6   Latitude    7698 non-null   float64
 7   Longitude   7698 non-null   float64
 8   Altitude    7698 non-null   int64  
 9   Timezone    7698 non-null   object 
 10  DST         7698 non-null   object 
 11  TZ          7698 non-null   object 
 12  Type        7698 non-null   object 
 13  Source      7698 non-null   object 
dtypes: float64(2), int64(2), object(10)
memory usage: 842.1+ KB


In [68]:
# explore the data
airports.head()

Unnamed: 0,Airport_ID,Name,City,Country,IATA,ICAO,Latitude,Longitude,Altitude,Timezone,DST,TZ,Type,Source
0,1,Goroka Airport,Goroka,Papua New Guinea,GKA,AYGA,-6.08169,145.391998,5282,10,U,Pacific/Port_Moresby,airport,OurAirports
1,2,Madang Airport,Madang,Papua New Guinea,MAG,AYMD,-5.20708,145.789002,20,10,U,Pacific/Port_Moresby,airport,OurAirports
2,3,Mount Hagen Kagamuga Airport,Mount Hagen,Papua New Guinea,HGU,AYMH,-5.82679,144.296005,5388,10,U,Pacific/Port_Moresby,airport,OurAirports
3,4,Nadzab Airport,Nadzab,Papua New Guinea,LAE,AYNZ,-6.569803,146.725977,239,10,U,Pacific/Port_Moresby,airport,OurAirports
4,5,Port Moresby Jacksons International Airport,Port Moresby,Papua New Guinea,POM,AYPY,-9.44338,147.220001,146,10,U,Pacific/Port_Moresby,airport,OurAirports


In [69]:
# We're only interested in the below columns
airports_cols = ['IATA', 'ICAO', 'Name', 'City', 'Country', 'Latitude', 'Longitude', 'Altitude', 'Timezone', 'DST', 'TZ']
airports[airports_cols]

Unnamed: 0,IATA,ICAO,Name,City,Country,Latitude,Longitude,Altitude,Timezone,DST,TZ
0,GKA,AYGA,Goroka Airport,Goroka,Papua New Guinea,-6.081690,145.391998,5282,10,U,Pacific/Port_Moresby
1,MAG,AYMD,Madang Airport,Madang,Papua New Guinea,-5.207080,145.789002,20,10,U,Pacific/Port_Moresby
2,HGU,AYMH,Mount Hagen Kagamuga Airport,Mount Hagen,Papua New Guinea,-5.826790,144.296005,5388,10,U,Pacific/Port_Moresby
3,LAE,AYNZ,Nadzab Airport,Nadzab,Papua New Guinea,-6.569803,146.725977,239,10,U,Pacific/Port_Moresby
4,POM,AYPY,Port Moresby Jacksons International Airport,Port Moresby,Papua New Guinea,-9.443380,147.220001,146,10,U,Pacific/Port_Moresby
...,...,...,...,...,...,...,...,...,...,...,...
7693,\N,ULDA,Rogachyovo Air Base,Belaya,Russia,71.616699,52.478298,272,\N,\N,\N
7694,\N,XIUW,Ulan-Ude East Airport,Ulan Ude,Russia,51.849998,107.737999,1670,\N,\N,\N
7695,\N,ULLK,Krechevitsy Air Base,Novgorod,Russia,58.625000,31.385000,85,\N,\N,\N
7696,CPO,SCAT,Desierto de Atacama Airport,Copiapo,Chile,-27.261200,-70.779198,670,\N,\N,\N


In [85]:
# Check for total rows in the dataset
print('The data has ' + str(len(airports)) + ' rows')
print("*****************************")
# check for null values
print("Checking for null values")
display(airports[airports_cols].isnull().sum())
print("*****************************")
# check for duplicated IATA_CODE
print('The number of duplicated iata airport code is  is: ' + str(airports.duplicated(subset=['IATA']).sum()))
print("*****************************")
# checking the number of unique values in each column of interest
for col in airports_cols:
    unique_count = airports[col].nunique() 
    print(col + " has " + str(unique_count) + " unique values") 

The data has 7698 rows
*****************************
Checking for null values


IATA          0
ICAO          0
Name          0
City         49
Country       0
Latitude      0
Longitude     0
Altitude      0
Timezone      0
DST           0
TZ            0
dtype: int64

*****************************
The number of duplicated iata airport code is  is: 1625
*****************************
IATA has 6073 unique values
ICAO has 7698 unique values
Name has 7658 unique values
City has 6955 unique values
Country has 237 unique values
Latitude has 7672 unique values
Longitude has 7674 unique values
Altitude has 2522 unique values
Timezone has 41 unique values
DST has 8 unique values
TZ has 308 unique values


In [71]:
# Checking if the rows with missing cities have the longitude and latittude data - so that we can still locate the area even though no city is given
airports[airports.City.isnull()]

Unnamed: 0,Airport_ID,Name,City,Country,IATA,ICAO,Latitude,Longitude,Altitude,Timezone,DST,TZ,Type,Source
7031,11794,Minsk Mazowiecki Military Air Base,,Poland,\N,EPMM,52.195499,21.655899,604,\N,\N,\N,airport,OurAirports
7032,11795,Powidz Military Air Base,,Poland,\N,EPPW,52.379398,17.853901,371,\N,\N,\N,airport,OurAirports
7137,11900,King Salman Abdulaziz Airport,,Saudi Arabia,DWD,OEDM,24.4499,44.121201,3026,\N,\N,\N,airport,OurAirports
7138,11901,King Khaled Air Base,,Saudi Arabia,KMX,OEKM,18.2973,42.803501,6778,\N,\N,\N,airport,OurAirports
7158,11921,Asahikawa Airfield,,Japan,\N,RJCA,43.794734,142.365432,377,\N,\N,\N,airport,OurAirports
7160,11923,Utsunomiya Airport,,Japan,\N,RJTU,36.5145,139.87101,334,\N,\N,\N,airport,OurAirports
7161,11924,Jungwon Air Base/Chungju Airport,,South Korea,\N,RKTI,37.03024,127.886353,281,\N,\N,\N,airport,OurAirports
7164,11927,Bislig Airport,,Philippines,BPH,RPMF,8.19595,126.321999,12,\N,\N,\N,airport,OurAirports
7165,11928,Mati National Airport,,Philippines,MXI,RPMQ,6.949091,126.27368,156,\N,\N,\N,airport,OurAirports
7184,11947,Metropolitano Airport,,Venezuela,\N,SVMP,10.133169,-66.787827,574,\N,\N,\N,airport,OurAirports


In [72]:
# counting the number of unique values in the IATA colunmn
airports.IATA.value_counts(dropna=False)

\N     1626
GKA       1
REY       1
PUR       1
GYA       1
       ... 
GPS       1
CUE       1
OCC       1
ATF       1
CPO       1
Name: IATA, Length: 6073, dtype: int64

<font color='red'>Cleaning Note 1: Remove the duplicated data from the dataset - i.e. where the iata airline code is \N</font>

<font color='red'>Cleaning Note 2: Select only airports where the country is United States given we're building a database for to analyse US domestic flights</font>

##### <font color='orange'>----- Cleaning Airports Data -----</font>

<font color='red'>Cleaning Note 1: Remove the duplicated iata airline codes from the dataset - i.e. where the iata airline code is \N</font>

In [73]:
# create a copy of the original data
airports_clean = airports[airports_cols].copy()

In [74]:
airports_clean.head(2)

Unnamed: 0,IATA,ICAO,Name,City,Country,Latitude,Longitude,Altitude,Timezone,DST,TZ
0,GKA,AYGA,Goroka Airport,Goroka,Papua New Guinea,-6.08169,145.391998,5282,10,U,Pacific/Port_Moresby
1,MAG,AYMD,Madang Airport,Madang,Papua New Guinea,-5.20708,145.789002,20,10,U,Pacific/Port_Moresby


In [75]:
# Remove duplicated iata airlines code - i.e. where the airline code is \N
airports_clean = airports_clean[airports_clean.IATA != "\\N"]

In [76]:
# Check if it worked
airports_clean.IATA.value_counts(dropna=False)

GKA    1
ESM    1
PUR    1
GYA    1
EYP    1
      ..
GPS    1
CUE    1
OCC    1
ATF    1
CPO    1
Name: IATA, Length: 6072, dtype: int64

<font color='red'>Cleaning Note 2: Select only airports where the country is United States given we're building a database for to analyse US domestic flights</font>

In [78]:
airports_clean = airports_clean[airports_clean.Country == 'United States']

In [79]:
# Check to see if it worked
airports_clean.Country.value_counts(dropna=False)

United States    1251
Name: Country, dtype: int64

In [80]:
# write clean data back to CSV file in clean-data folder 
# check if the "clean-data" directory exists, if not, create it 
if not os.path.exists("clean-data"):
    os.makedirs("clean-data")

# write clean data back to clean file 
airports_clean.to_csv("clean-data/airports_clean.csv", sep=',', index=False)

##### <font color='orange'>----- Assessing Airlines Data -----</font>

In [99]:
# read in data
airlines = pd.read_csv('airlines.csv')

In [100]:
# explore data
airlines

Unnamed: 0,IATA_CODE,AIRLINE
0,UA,United Air Lines Inc.
1,AA,American Airlines Inc.
2,US,US Airways Inc.
3,F9,Frontier Airlines Inc.
4,B6,JetBlue Airways
5,OO,Skywest Airlines Inc.
6,AS,Alaska Airlines Inc.
7,NK,Spirit Air Lines
8,WN,Southwest Airlines Co.
9,DL,Delta Air Lines Inc.


Really simple dataset. Nothing to clean here.

In [101]:
# For consistency, we will write this file back to CSV file in clean-data folder as well 
# check if the "clean-data" directory exists, if not, create it 
if not os.path.exists("clean-data"):
    os.makedirs("clean-data")

# write data
airlines.to_csv("clean-data/airlines_clean.csv", sep=',', index=False)

##### <font color='orange'>----- Assessing Flights Data -----</font>

In [102]:
# Due to the size of the data (almost 6M rows), we'll use Spark to explore and clean the data
# Create a spark session
spark = SparkSession \
    .builder \
    .getOrCreate()

your 131072x1 screen size is bogus. expect trouble


23/01/01 10:36:47 WARN Utils: Your hostname, mo-chen-windows resolves to a loopback address: 127.0.1.1; using 172.24.88.6 instead (on interface eth0)
23/01/01 10:36:47 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/01/01 10:36:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [103]:
# Read in data using spark
flights = spark.read.csv('flights.csv', header='true')

In [104]:
# print the schema
flights.printSchema()

root
 |-- YEAR: string (nullable = true)
 |-- MONTH: string (nullable = true)
 |-- DAY: string (nullable = true)
 |-- DAY_OF_WEEK: string (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: string (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: string (nullable = true)
 |-- DEPARTURE_TIME: string (nullable = true)
 |-- DEPARTURE_DELAY: string (nullable = true)
 |-- TAXI_OUT: string (nullable = true)
 |-- WHEELS_OFF: string (nullable = true)
 |-- SCHEDULED_TIME: string (nullable = true)
 |-- ELAPSED_TIME: string (nullable = true)
 |-- AIR_TIME: string (nullable = true)
 |-- DISTANCE: string (nullable = true)
 |-- WHEELS_ON: string (nullable = true)
 |-- TAXI_IN: string (nullable = true)
 |-- SCHEDULED_ARRIVAL: string (nullable = true)
 |-- ARRIVAL_TIME: string (nullable = true)
 |-- ARRIVAL_DELAY: string (nullable = true)
 |-- D

We won't fix the schema here as spark helpfully filled the leading 0s for the time fields that are in hhmm format (i.e. we have 0005 which is 5 past midnight, rather than just 5).

In [7]:
# explore data
flights.limit(3).toPandas()

Unnamed: 0,YEAR,MONTH,DAY,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,SCHEDULED_DEPARTURE,DEPARTURE_TIME,DEPARTURE_DELAY,TAXI_OUT,WHEELS_OFF,SCHEDULED_TIME,ELAPSED_TIME,AIR_TIME,DISTANCE,WHEELS_ON,TAXI_IN,SCHEDULED_ARRIVAL,ARRIVAL_TIME,ARRIVAL_DELAY,DIVERTED,CANCELLED,CANCELLATION_REASON,AIR_SYSTEM_DELAY,SECURITY_DELAY,AIRLINE_DELAY,LATE_AIRCRAFT_DELAY,WEATHER_DELAY
0,2015,1,1,4,AS,98,N407AS,ANC,SEA,5,2354,-11,21,15,205,194,169,1448,404,4,430,408,-22,0,0,,,,,,
1,2015,1,1,4,AA,2336,N3KUAA,LAX,PBI,10,2,-8,12,14,280,279,263,2330,737,4,750,741,-9,0,0,,,,,,
2,2015,1,1,4,US,840,N171US,SFO,CLT,20,18,-2,16,34,286,293,266,2296,800,11,806,811,5,0,0,,,,,,


In [8]:
# We will not use the last 5 columns
flights.select(flights.columns[:-5]).limit(2).toPandas()

Unnamed: 0,YEAR,MONTH,DAY,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,SCHEDULED_DEPARTURE,DEPARTURE_TIME,DEPARTURE_DELAY,TAXI_OUT,WHEELS_OFF,SCHEDULED_TIME,ELAPSED_TIME,AIR_TIME,DISTANCE,WHEELS_ON,TAXI_IN,SCHEDULED_ARRIVAL,ARRIVAL_TIME,ARRIVAL_DELAY,DIVERTED,CANCELLED,CANCELLATION_REASON
0,2015,1,1,4,AS,98,N407AS,ANC,SEA,5,2354,-11,21,15,205,194,169,1448,404,4,430,408,-22,0,0,
1,2015,1,1,4,AA,2336,N3KUAA,LAX,PBI,10,2,-8,12,14,280,279,263,2330,737,4,750,741,-9,0,0,


In [9]:
# Check for null, none and nan values in the entire dataframe
print("Checking for null values")
flights.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in flights.columns[:-5]]).toPandas()

Checking for null values


                                                                                

Unnamed: 0,YEAR,MONTH,DAY,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,SCHEDULED_DEPARTURE,DEPARTURE_TIME,DEPARTURE_DELAY,TAXI_OUT,WHEELS_OFF,SCHEDULED_TIME,ELAPSED_TIME,AIR_TIME,DISTANCE,WHEELS_ON,TAXI_IN,SCHEDULED_ARRIVAL,ARRIVAL_TIME,ARRIVAL_DELAY,DIVERTED,CANCELLED,CANCELLATION_REASON
0,0,0,0,0,0,0,14721,0,0,0,86153,86153,89047,89047,6,105071,105071,0,92513,92513,0,92513,105071,0,0,5729195


There are many columns with missing values. Most of the CANCELLATION_REASON column values are missing, which makes sense as it should only have data if the flight was actually cancelled.

In [10]:
# if the flight is cancelled then all values relating to flight times (e.g. departure time, arrival time, etc) should be empty
flights.filter((f.col('CANCELLED') == '1') & (f.col('ARRIVAL_TIME').isNotNull())).limit(5).toPandas()

                                                                                

Unnamed: 0,YEAR,MONTH,DAY,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,SCHEDULED_DEPARTURE,DEPARTURE_TIME,DEPARTURE_DELAY,TAXI_OUT,WHEELS_OFF,SCHEDULED_TIME,ELAPSED_TIME,AIR_TIME,DISTANCE,WHEELS_ON,TAXI_IN,SCHEDULED_ARRIVAL,ARRIVAL_TIME,ARRIVAL_DELAY,DIVERTED,CANCELLED,CANCELLATION_REASON,AIR_SYSTEM_DELAY,SECURITY_DELAY,AIRLINE_DELAY,LATE_AIRCRAFT_DELAY,WEATHER_DELAY


In [38]:
# Check the unique values for diverted flag
flights.select('DIVERTED').distinct().toPandas()

                                                                                

Unnamed: 0,DIVERTED
0,0
1,1


In [39]:
# Check the unique values for cancelled flag
flights.select('CANCELLED').distinct().toPandas()

                                                                                

Unnamed: 0,CANCELLED
0,0
1,1


In [11]:
# Check the unique values for cancellation reason
flights.select('CANCELLATION_REASON').distinct().toPandas()

                                                                                

Unnamed: 0,CANCELLATION_REASON
0,
1,B
2,D
3,C
4,A


In [290]:
flights.groupby('CANCELLATION_REASON').count().show()

                                                                                

+-------------------+-------+
|CANCELLATION_REASON|  count|
+-------------------+-------+
|               null|5729195|
|                  B|  48851|
|                  D|     22|
|                  C|  15749|
|                  A|  25262|
+-------------------+-------+



<font color='red'>Cleaning Note 1: Map the Cancellation Reason letters to the easily understandable words</font>

A - Carrier; B - Weather; C - National Air System; D - Security. We will map the letters to the actual reason to make it easier to understand the data.


<font color='red'>Cleaning Note 2: Create a date column from the YEAR, MONTH, DAY columns and a time column from the departure and arrival (both scheduled and actual) and wheels off and on columns</font>

<font color='red'>Cleaning Note 3: Creating N-Number column from the TAIL_NUMBER column by dropping the first letter N. All US tail numbers start with an N.

In [12]:
print('The total number of rows in this dataset is: ' + str(flights.count()))
print('------------------------------------------------------------------------')
print('Checking for the number of unique values in each column')
flights.select([countDistinct(c) for c in flights.columns[:-5]]).toPandas()

                                                                                

The total number of rows in this dataset is: 5819079
------------------------------------------------------------------------
Checking for the number of unique values in each column


                                                                                

Unnamed: 0,count(DISTINCT YEAR),count(DISTINCT MONTH),count(DISTINCT DAY),count(DISTINCT DAY_OF_WEEK),count(DISTINCT AIRLINE),count(DISTINCT FLIGHT_NUMBER),count(DISTINCT TAIL_NUMBER),count(DISTINCT ORIGIN_AIRPORT),count(DISTINCT DESTINATION_AIRPORT),count(DISTINCT SCHEDULED_DEPARTURE),count(DISTINCT DEPARTURE_TIME),count(DISTINCT DEPARTURE_DELAY),count(DISTINCT TAXI_OUT),count(DISTINCT WHEELS_OFF),count(DISTINCT SCHEDULED_TIME),count(DISTINCT ELAPSED_TIME),count(DISTINCT AIR_TIME),count(DISTINCT DISTANCE),count(DISTINCT WHEELS_ON),count(DISTINCT TAXI_IN),count(DISTINCT SCHEDULED_ARRIVAL),count(DISTINCT ARRIVAL_TIME),count(DISTINCT ARRIVAL_DELAY),count(DISTINCT DIVERTED),count(DISTINCT CANCELLED),count(DISTINCT CANCELLATION_REASON)
0,1,12,31,7,14,6952,4897,628,629,1321,1440,1217,184,1440,550,712,675,1363,1440,185,1435,1440,1240,2,2,4


In [13]:
# Check for duplicates. Raise value error if there are duplicates, do nothing otherwise
if flights.count() > flights.dropDuplicates(flights.columns).count():
    raise ValueError('Data has duplicates')

                                                                                

##### <font color='orange'>----- Cleaning Flights Data -----</font>


In [105]:
# create a copy from the original dataset using only the columns that we need
flights_clean = flights.select(flights.columns[:-5]).alias('flights_clean')

In [106]:
# Confirm that the new dataframe is a copy of the original dataframe
id(flights_clean) == id(flights)

False

<font color='red'>Cleaning 1: Map the cancellation reason letters (A, B, C, D) to the actual written reasons</font>

In [107]:
cancellation_reason_mapping= {
        'A': 'carrier',
        'B': 'weather',
        'C': 'national air system',
        'D': 'security'
    }

flights_clean = flights_clean.replace(to_replace=cancellation_reason_mapping, subset=['CANCELLATION_REASON'])

In [108]:
# Confirm the mapping worked
flights_clean.groupBy('CANCELLATION_REASON').count().toPandas()

                                                                                

Unnamed: 0,CANCELLATION_REASON,count
0,security,22
1,,5729195
2,national air system,15749
3,weather,48851
4,carrier,25262


<font color='red'>Cleaning Note 2: Create a date column from the YEAR, MONTH, DAY columns and a time column from the departure and arrival (both scheduled and actual) and wheels off and on columns</font>

In [109]:
# Create DATE column from YEAR, MONTH, DAY columns
flights_clean = flights_clean.withColumn("DATE", expr("make_date(YEAR, MONTH, DAY)").cast('string'))

# Drop YEAR, MONTH, DAY columns
flights_clean = flights_clean.drop('YEAR', 'MONTH', 'DAY', 'DAY_OF_WEEK')


In [110]:
# Confirm the column has been successfully created
flights_clean.limit(2).toPandas()

Unnamed: 0,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,SCHEDULED_DEPARTURE,DEPARTURE_TIME,DEPARTURE_DELAY,TAXI_OUT,WHEELS_OFF,SCHEDULED_TIME,ELAPSED_TIME,AIR_TIME,DISTANCE,WHEELS_ON,TAXI_IN,SCHEDULED_ARRIVAL,ARRIVAL_TIME,ARRIVAL_DELAY,DIVERTED,CANCELLED,CANCELLATION_REASON,DATE
0,AS,98,N407AS,ANC,SEA,5,2354,-11,21,15,205,194,169,1448,404,4,430,408,-22,0,0,,2015-01-01
1,AA,2336,N3KUAA,LAX,PBI,10,2,-8,12,14,280,279,263,2330,737,4,750,741,-9,0,0,,2015-01-01


In [111]:
# Create time in HH:mm format from the departure, arrival (both scheduled and acutal) and wheels off and on columns (both scheduled and actual times)
flights_clean = flights_clean.withColumn('hour', substring('SCHEDULED_DEPARTURE', 1, 2)).withColumn('min', substring('SCHEDULED_DEPARTURE', 3, 2)).\
    withColumn('SCHED_DEP', concat_ws(':', 'hour', 'min')).drop('hour', 'min')

flights_clean = flights_clean.withColumn('hour', substring('DEPARTURE_TIME', 1, 2)).withColumn('min', substring('DEPARTURE_TIME', 3, 2)).\
    withColumn('DEP_TIME', concat_ws(':', 'hour', 'min')).drop('hour', 'min')

flights_clean = flights_clean.withColumn('hour', substring('WHEELS_OFF', 1, 2)).withColumn('min', substring('WHEELS_OFF', 3, 2)).\
    withColumn('WHEELSOFF', concat_ws(':', 'hour', 'min')).drop('hour', 'min')

flights_clean = flights_clean.withColumn('hour', substring('WHEELS_ON', 1, 2)).withColumn('min', substring('WHEELS_ON', 3, 2)).\
    withColumn('WHEELSON', concat_ws(':', 'hour', 'min')).drop('hour', 'min')

flights_clean = flights_clean.withColumn('hour', substring('SCHEDULED_ARRIVAL', 1, 2)).withColumn('min', substring('SCHEDULED_ARRIVAL', 3, 2)).\
    withColumn('SCHED_ARR', concat_ws(':', 'hour', 'min')).drop('hour', 'min')

flights_clean = flights_clean.withColumn('hour', substring('ARRIVAL_TIME', 1, 2)).withColumn('min', substring('ARRIVAL_TIME', 3, 2)).\
    withColumn('ARR_TIME', concat_ws(':', 'hour', 'min')).drop('hour', 'min')

In [112]:
# Drop the SCHEDULED_DEPARTURE, DEPARTURE_TIME, SCHEDULED_ARRIVAL, ARRIVAL_TIME columns
flights_clean = flights_clean.drop('SCHEDULED_DEPARTURE', 'DEPARTURE_TIME', 'WHEELS_OFF', 'WHEELS_ON', 'SCHEDULED_ARRIVAL', 'ARRIVAL_TIME')

In [113]:
# Confirm the column has been successfully created
flights_clean.limit(2).toPandas()

Unnamed: 0,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,DEPARTURE_DELAY,TAXI_OUT,SCHEDULED_TIME,ELAPSED_TIME,AIR_TIME,DISTANCE,TAXI_IN,ARRIVAL_DELAY,DIVERTED,CANCELLED,CANCELLATION_REASON,DATE,SCHED_DEP,DEP_TIME,WHEELSOFF,WHEELSON,SCHED_ARR,ARR_TIME
0,AS,98,N407AS,ANC,SEA,-11,21,205,194,169,1448,4,-22,0,0,,2015-01-01,00:05,23:54,00:15,04:04,04:30,04:08
1,AA,2336,N3KUAA,LAX,PBI,-8,12,280,279,263,2330,4,-9,0,0,,2015-01-01,00:10,00:02,00:14,07:37,07:50,07:41


<font color='red'>Cleaning Note 3: Creating N-Number column from the TAIL_NUMBER column by dropping the first letter N. All US tail numbers start with an N.

In [114]:
flights_clean = flights_clean.withColumn('N-Number', substring('TAIL_NUMBER', 2, 5)).drop('TAIL_NUMBER')

In [115]:
flights_clean.limit(2).toPandas()

Unnamed: 0,AIRLINE,FLIGHT_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,DEPARTURE_DELAY,TAXI_OUT,SCHEDULED_TIME,ELAPSED_TIME,AIR_TIME,DISTANCE,TAXI_IN,ARRIVAL_DELAY,DIVERTED,CANCELLED,CANCELLATION_REASON,DATE,SCHED_DEP,DEP_TIME,WHEELSOFF,WHEELSON,SCHED_ARR,ARR_TIME,N-Number
0,AS,98,ANC,SEA,-11,21,205,194,169,1448,4,-22,0,0,,2015-01-01,00:05,23:54,00:15,04:04,04:30,04:08,407AS
1,AA,2336,LAX,PBI,-8,12,280,279,263,2330,4,-9,0,0,,2015-01-01,00:10,00:02,00:14,07:37,07:50,07:41,3KUAA


In [116]:
# reordering columns
flights_clean = flights_clean.select('DATE', 'AIRLINE', 'FLIGHT_NUMBER', 'N-Number', 'ORIGIN_AIRPORT', 'DESTINATION_AIRPORT', 'SCHED_DEP', 'DEP_TIME', \
    'DEPARTURE_DELAY', 'TAXI_OUT', 'WHEELSOFF', 'SCHEDULED_TIME', 'ELAPSED_TIME', 'AIR_TIME', 'DISTANCE', 'WHEELSON', 'TAXI_IN', 'SCHED_ARR', 'ARR_TIME', \
        'ARRIVAL_DELAY', 'DIVERTED', 'CANCELLED', 'CANCELLATION_REASON')

flights_clean.limit(2).toPandas()

Unnamed: 0,DATE,AIRLINE,FLIGHT_NUMBER,N-Number,ORIGIN_AIRPORT,DESTINATION_AIRPORT,SCHED_DEP,DEP_TIME,DEPARTURE_DELAY,TAXI_OUT,WHEELSOFF,SCHEDULED_TIME,ELAPSED_TIME,AIR_TIME,DISTANCE,WHEELSON,TAXI_IN,SCHED_ARR,ARR_TIME,ARRIVAL_DELAY,DIVERTED,CANCELLED,CANCELLATION_REASON
0,2015-01-01,AS,98,407AS,ANC,SEA,00:05,23:54,-11,21,00:15,205,194,169,1448,04:04,4,04:30,04:08,-22,0,0,
1,2015-01-01,AA,2336,3KUAA,LAX,PBI,00:10,00:02,-8,12,00:14,280,279,263,2330,07:37,4,07:50,07:41,-9,0,0,


In [118]:
# write clean data back to clean file in clean-data folder
# check if the "clean-data" directory exists, if not, create it 
if not os.path.exists("clean-data"):
    os.makedirs("clean-data")

# write clean data back to clean file 
flights_clean.write.option('header', True).option('delimiter', ',').csv('clean-data/flights-data')

                                                                                

### <font color='green'>Step 4: Defining the Data Model</font>


##### 3.1 Data Model - Star Schema

![flights-star-schema](https://github.com/mochen862/udacity-nanodegree-capstone-flights/blob/main/flights-star-schema.png)

##### 3.2 ETL Pipeline

![ETL-pipeline](https://github.com/mochen862/udacity-nanodegree-capstone-flights/blob/main/etl_pipeline.png)

### <font color='green'>Step 5: ETL</font>

##### 5.1 Load Data from Local Drive to AWS S3
Use some imported functions from `s3Functions` which will set up an S3 bucket on AWS, then load our files to storage there.

In [3]:
# use the config parser to collect details from the AWS.cfg file which will feed into the creation of AWS S3 services 
config = configparser.ConfigParser() 
config.read_file(open('aws.cfg')) 

key = config.get('AWS','key')
secret = config.get('AWS','secret')
useRegion = config.get('BUCKET','region')
useBucketName = config.get('BUCKET','name')

# create S3 AWS client 
aws_s3_client = boto3.client('s3',
                            region_name = useRegion,
                            aws_access_key_id = key,
                            aws_secret_access_key = secret)

In [4]:
# check if the bucket already exists 
bucket_exists = checkS3bucket(s3Client=aws_s3_client, bucketName=useBucketName, clientError=ClientError)

# If the bucket does not currently exist, create it 
if bucket_exists == 'No':
    buildIt = buildS3bucket(s3Client=aws_s3_client, bucketName=useBucketName, clientError=ClientError, awsRegion=useRegion) 
    print(buildIt)

else: 
    print("Bucket already exists, no need to create it") 

The bucket does not exist
S3 bucket successfully created


In [5]:
# Loading csv files to the bucket
for file in glob.glob('clean-data/*.csv'):
    fileName = file.split('/')[1]
    loadedFile = loadToS3(s3Client=aws_s3_client, bucketName=useBucketName, clientError=ClientError, 
    bucketKey='capstone_project_data/', loadFile=file, fileName=fileName)
    print(loadedFile)

Successfully loaded aircraft_clean.csv to mc-capstone-flights
Successfully loaded airlines_clean.csv to mc-capstone-flights
Successfully loaded airports_clean.csv to mc-capstone-flights


In [6]:
# Loading flights files to the bucket (multi-part upload)
for i, file in enumerate(glob.glob('clean-data/flights-data/*.csv')):
    fileName = 'flights_clean-part-' + str(i) + '.csv'
    loadedFile = loadToS3(s3Client=aws_s3_client, bucketName=useBucketName, clientError=ClientError, 
    bucketKey='capstone_project_data/', loadFile=file, fileName=fileName)
    print(loadedFile)

Successfully loaded flights_clean-part-0.csv to mc-capstone-flights
Successfully loaded flights_clean-part-1.csv to mc-capstone-flights
Successfully loaded flights_clean-part-2.csv to mc-capstone-flights
Successfully loaded flights_clean-part-3.csv to mc-capstone-flights
Successfully loaded flights_clean-part-4.csv to mc-capstone-flights
Successfully loaded flights_clean-part-5.csv to mc-capstone-flights
Successfully loaded flights_clean-part-6.csv to mc-capstone-flights
Successfully loaded flights_clean-part-7.csv to mc-capstone-flights


##### 5.2 Create IAM Role
This IAM role will be used to create the Redshift cluster and will have read only access to our S3 bucket.

In [7]:
# We will first create Redshift, IAM and EC2 clients
aws_redshift_client = boto3.client('redshift',
                            region_name = useRegion,
                            aws_access_key_id = key,
                            aws_secret_access_key = secret) 

aws_iam_client = boto3.client('iam',
                            region_name = useRegion,
                            aws_access_key_id = key,
                            aws_secret_access_key = secret)

aws_ec2_resource = boto3.resource('ec2',
                            region_name=useRegion,
                            aws_access_key_id = key,
                            aws_secret_access_key = secret)

In [8]:
# Next we need to create a specified IAM role which will create the Redshift cluster and will have read only access to the S3 bucket
roleName = config.get('AWS','iam_role')
policy = "arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess" 

try:
    print("Creating a new IAM Role ...") 
    dwhRole = aws_iam_client.create_role(
        Path='/',
        RoleName=roleName,
        Description = "Allows Redshift clusters to call AWS services on your behalf",
        AssumeRolePolicyDocument=json.dumps(
            {'Statement': [{'Action': 'sts:AssumeRole',
               'Effect': 'Allow',
               'Principal': {'Service': 'redshift.amazonaws.com'}}],
             'Version': '2012-10-17'})
    )    
    print("IAM role " + str(roleName) + " successfully created")
except Exception as e:
    print("IAM role creation failed") 
    print(e) 

# Attach the policy to the IAM role 
try:
    print("Attaching Policy to IAM role ...")
    aws_iam_client.attach_role_policy(RoleName=roleName,
                        PolicyArn=policy)['ResponseMetadata']['HTTPStatusCode'] 
    print("Policy attached to IAM role") 
except Exception as e:
    print("IAM role policy attach failed") 
    print(e) 

# Collect the ARN details of the IAM role 
roleArn = aws_iam_client.get_role(RoleName=roleName)['Role']['Arn']

Creating a new IAM Role ...
IAM role etl-dev successfully created
Attaching Policy to IAM role ...
Policy attached to IAM role


##### 5.3 Create Redshift Cluster
The Redshift cluster and database will contain all of the staging and analytics tables.

In [9]:
try:
    print("Creating the Redhsift cluster " + str(config.get('DWH','dwh_cluster_identifier')) + " ...") 
    response = aws_redshift_client.create_cluster(        
        #HW
        ClusterType=config.get('DWH','dwh_cluster_type'),
        NodeType=config.get('DWH','dwh_node_type'),
        NumberOfNodes=int(config.get('DWH','dwh_num_nodes')),

        #Identifiers & Credentials
        DBName=config.get('DWH','dwh_db'),
        ClusterIdentifier=config.get('DWH','dwh_cluster_identifier'),
        MasterUsername=config.get('DWH','dwh_db_user'),
        MasterUserPassword=config.get('DWH','dwh_db_password'),
        
        #Roles (for s3 access)
        IamRoles=[roleArn]  
    )
    print("Cluster " + str(config.get('DWH','dwh_cluster_identifier')) + " has been successfully created") 
except Exception as e:
    print("Cluster creation has failed") 
    print(e) 

Creating the Redhsift cluster mccapstoneflights ...
Cluster mccapstoneflights has been successfully created


In [13]:
# we can use the following function to check for an active cluster - ClusterStatus must change from `creating` to `available` in order to be ready for use 
def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', None)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

myClusterProps = aws_redshift_client.describe_clusters(ClusterIdentifier=config.get('DWH','dwh_cluster_identifier'))['Clusters'][0]
prettyRedshiftProps(myClusterProps)

Unnamed: 0,Key,Value
0,ClusterIdentifier,mccapstoneflights
1,NodeType,dc2.large
2,ClusterStatus,available
3,MasterUsername,dbadmin
4,DBName,flights_cancellations_delays
5,Endpoint,"{'Address': 'mccapstoneflights.ci5hka3tp4fe.us-west-2.redshift.amazonaws.com', 'Port': 5439}"
6,VpcId,vpc-03e0a4ff45dcefe60
7,NumberOfNodes,4


In [128]:
# Collects the Endpoint & ARN 
DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']
print("DWH_ENDPOINT :: ", DWH_ENDPOINT)
print("DWH_ROLE_ARN :: ", DWH_ROLE_ARN) 

DWH_ENDPOINT ::  mccapstoneflights.ci5hka3tp4fe.us-west-2.redshift.amazonaws.com
DWH_ROLE_ARN ::  arn:aws:iam::574324166090:role/etl-dev


In [16]:
# write the endpoint (i.e. host) and role arn to the aws.cfg file
cfg_file = 'aws.cfg' 
config = configparser.ConfigParser()
config.read(cfg_file)

def set_config(sec, attr, value):
    config.set(sec, attr, value)
    with open(cfg_file, 'w') as configfile:
        config.write(configfile)

set_config('CLUSTER', 'host', DWH_ENDPOINT)
set_config('CLUSTER', 'role_arn', DWH_ROLE_ARN)

In [129]:
# Open an incoming TCP port to access the cluster 
try:
    vpc = aws_ec2_resource.Vpc(id=myClusterProps['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    defaultSg.authorize_ingress(
        GroupName=defaultSg.group_name,
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=int(config.get('DWH', 'dwh_port')),
        ToPort=int(config.get('DWH', 'dwh_port'))
    )
except Exception as e:
    print(e)

ec2.SecurityGroup(id='sg-045c15689baac7caf')
An error occurred (InvalidPermission.Duplicate) when calling the AuthorizeSecurityGroupIngress operation: the specified rule "peer: 0.0.0.0/0, TCP, from port: 5439, to port: 5439, ALLOW" already exists


Connect to the cluster

In [14]:
# load the magic sql module for the notebook 
%load_ext sql 
#%reload_ext sql

# set connection details to redshift db 
user = config.get('DWH','dwh_db_user')
password = config.get('DWH','dwh_db_password') 
endpoint = config.get('CLUSTER','host') 
port = config.get('DWH','dwh_port') 
db = config.get('DWH','dwh_db') 

conn_string="postgresql://{}:{}@{}:{}/{}".format(user, password, endpoint, port, db)
%sql $conn_string


##### 5.4 Main ETL Process
We will create staging tables, create empty tables and then load these empty tables with data from the staging tables.
1. Create staging tables and load data into staging tables
2. Create empty analytics tables and insert data into the analytics tables


In [64]:
%run "staging_tables.py"

In [65]:
# check for any loading errors in case you have some
%sql select * from stl_load_errors;

 * postgresql://dbadmin:***@mccapstoneflights.ci5hka3tp4fe.us-west-2.redshift.amazonaws.com:5439/flights_cancellations_delays
0 rows affected.


userid,slice,tbl,starttime,session,query,filename,line_number,colname,type,col_length,position,raw_line,raw_field_value,err_code,err_reason,is_partial,start_offset


In [66]:
%run "analytics_tables.py"

In [67]:
%run "drop_staging_tables"

##### 5.5 Data Quality Checks
1. Run check to ensure that there are no duplicate flights in the flights analytics table

In [68]:
%%sql
SELECT COUNT(a.*) as duplicate_records FROM 
(SELECT date, iata_airline_code, flight_number, scheduled_departure, scheduled_arrival, count(*) as count_ FROM flights 
GROUP BY date, iata_airline_code, flight_number, scheduled_departure, scheduled_arrival) as a
WHERE a.count_ > 1 ;

 * postgresql://dbadmin:***@mccapstoneflights.ci5hka3tp4fe.us-west-2.redshift.amazonaws.com:5439/flights_cancellations_delays
1 rows affected.


duplicate_records
0


It's okay to have duplicate date, iata_airline_code and flight_number records. These are return flights between two airports on the same day.

An example for AA flight number 1002 with date 2015-01-01 is shown below.

In [69]:
%%sql
SELECT date, iata_airline_code, flight_number, count(*) as count_ FROM flights 
GROUP BY date, iata_airline_code, flight_number
HAVING count_ > 1
ORDER BY date, iata_airline_code, flight_number
LIMIT 10;

 * postgresql://dbadmin:***@mccapstoneflights.ci5hka3tp4fe.us-west-2.redshift.amazonaws.com:5439/flights_cancellations_delays
10 rows affected.


date,iata_airline_code,flight_number,count_
2015-01-01,AA,1002,2
2015-01-01,AA,1012,2
2015-01-01,AA,1023,2
2015-01-01,AA,1024,2
2015-01-01,AA,1030,2
2015-01-01,AA,1046,2
2015-01-01,AA,1068,2
2015-01-01,AA,1086,2
2015-01-01,AA,1098,2
2015-01-01,AA,1115,2


In [70]:
%%sql
SELECT * from flights
WHERE date = '2015-01-01' AND flight_number = '1002' AND iata_airline_code = 'AA';

 * postgresql://dbadmin:***@mccapstoneflights.ci5hka3tp4fe.us-west-2.redshift.amazonaws.com:5439/flights_cancellations_delays
2 rows affected.


flight_id,date,iata_airline_code,flight_number,n_number,origin_airport,destination_airport,scheduled_departure,departure_time,departure_delay,taxi_out,wheels_off,scheduled_time,elapsed_time,air_time,distance,wheels_on,taxi_in,scheduled_arrival,arrival_time,arrival_delay,diverted,cancelled,cancellation_reason,aircraft_manufacture_year,aircraft_type,engine_type
12330,2015-01-01,AA,1002,475AA,DFW,IAH,10:30,,,,,65,,,224,,,11:35,,,0,1,weather,1988,Fixed wing multi engine,Turbo-fan
16715,2015-01-01,AA,1002,559AA,IAH,DFW,12:25,12:24,-1.0,11.0,12:35,75,65.0,37.0,224,13:12,17.0,13:40,13:29,-11.0,0,0,,1991,Fixed wing multi engine,Turbo-fan


2. Run check to ensure that there are no null values in the airlines table

In [71]:
%%sql 
SELECT COUNT(*) as NULL_Records
FROM airlines 
WHERE iata_airline_code IS NULL or airline_name IS NULL ;

 * postgresql://dbadmin:***@mccapstoneflights.ci5hka3tp4fe.us-west-2.redshift.amazonaws.com:5439/flights_cancellations_delays
1 rows affected.


null_records
0


3. Run check to ensure that there are no null values in the airports table for the iata_airport_code column

In [72]:
%%sql 
SELECT COUNT(*) as NULL_Records
FROM airports 
WHERE iata_airport_code IS NULL;

 * postgresql://dbadmin:***@mccapstoneflights.ci5hka3tp4fe.us-west-2.redshift.amazonaws.com:5439/flights_cancellations_delays
1 rows affected.


null_records
0


4. Run check to ensure that there are no null values in the aircraft table for the n_number column

In [73]:
%%sql 
SELECT COUNT(*) as NULL_Records
FROM aircraft 
WHERE n_number IS NULL;

 * postgresql://dbadmin:***@mccapstoneflights.ci5hka3tp4fe.us-west-2.redshift.amazonaws.com:5439/flights_cancellations_delays
1 rows affected.


null_records
0


5. Run check to ensure that there are no null values date table, and that we have all 365 days for the 2015 calendar year

In [74]:
%%sql 
SELECT COUNT(*) as NULL_Records
FROM date 
WHERE date IS NULL;

 * postgresql://dbadmin:***@mccapstoneflights.ci5hka3tp4fe.us-west-2.redshift.amazonaws.com:5439/flights_cancellations_delays
1 rows affected.


null_records
0


In [75]:
%sql SELECT count(*) FROM date;

 * postgresql://dbadmin:***@mccapstoneflights.ci5hka3tp4fe.us-west-2.redshift.amazonaws.com:5439/flights_cancellations_delays
1 rows affected.


count
365


6. Run check to ensure that there are no null values time table

In [76]:
%%sql 
SELECT COUNT(*) as NULL_Records
FROM time 
WHERE time IS NULL;

 * postgresql://dbadmin:***@mccapstoneflights.ci5hka3tp4fe.us-west-2.redshift.amazonaws.com:5439/flights_cancellations_delays
1 rows affected.


null_records
0


In [85]:
# close the connection
connections = %sql -l
[c.session.close() for c in connections.values()]

[None]

##### 5.6 Make sure to <font color='red'>CLEAN UP THE ETL RESOURCES</font> to avoid unnecessary costs

In [86]:
# delete cluster 
try:
    aws_redshift_client.delete_cluster(ClusterIdentifier=config.get('DWH','dwh_cluster_identifier'), SkipFinalClusterSnapshot=True) 
    print("Cluster Deleted")
except Exception as e:
    print("Cluster Deletion has failed")
    print(e) 

Cluster Deleted


In [90]:
# check the progress of deletion
myClusterProps = aws_redshift_client.describe_clusters(ClusterIdentifier=config.get('DWH','dwh_cluster_identifier'))['Clusters'][0]
prettyRedshiftProps(myClusterProps)

ClusterNotFoundFault: An error occurred (ClusterNotFound) when calling the DescribeClusters operation: Cluster mccapstoneflights not found.

In [91]:
# remove IAM role created 
try:
    # detach role policy
    aws_iam_client.detach_role_policy(RoleName=config.get('AWS','iam_role'), PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")
    print("IAM Policy removed") 
    print("*-------------------------*")
    # delete the role
    aws_iam_client.delete_role(RoleName=config.get('AWS','iam_role')) 
    print("IAM Role deleted") 
    print("*-------------------------*")

except Exception as e:
    print("Failed to delete IAM role") 
    print(e) 

IAM Policy removed
*-------------------------*
IAM Role deleted
*-------------------------*


In [92]:
# empty S3 bucket (i.e. delete all objects)
BUCKET = config.get('BUCKET', 'name')

aws_s3_resource = boto3.resource('s3',
                            region_name = useRegion,
                            aws_access_key_id = key,
                            aws_secret_access_key = secret) 

try:
    aws_s3_resource.Bucket(BUCKET).objects.all().delete() 
    print("S3 objects deleted") 

except Exception as e:
    print("S3 objects FAILED to delete") 
    print(e) 

S3 objects deleted


In [93]:
# delete the S3 bucket
try:
    aws_s3_resource.Bucket(BUCKET).delete() 
    print("S3 bucket has been deleted") 

except Exception as e:
    print("S3 bucket FAILED to delete") 
    print(e) 

S3 bucket has been deleted


##### 5.7 Data Dictionary

`flights` fact table

| Column | Type | Desription |
| ------ | ---- | ---------- |
| flight_id | INT | The unique flight id. This is the <font color='red'>PK</font>|
| date | DATE | Date of the flight |
| iata_airline_code | VARCHAR | 2-letter code of an airline |
| flight_number | VARCHAR | Flight number of the flight |
| n_number | VARCHAR | Identification number assigned to aircraft |
| origin_airport | VARCHAR | The airport from where the flight left from |
| destination_airport | VARCHAR | The airport where the flight was going to |
| scheduled_departure | VARCHAR | The scheduled departure time of the flight in HH:mm format |
| departure_time | VARCHAR | The actual departure time of the flight in HH:mm format (wheels_off - taxi_out) |
| departure_delay | VARCHAR | The number of minutes the flight departure is delayed (departure_time - scheduled_departure)|
| taxi_out | VARCHAR | The number of minutes the taxi out process took (i.e. the time duration elapsed between departure from the origin airport gate and wheels off) |
| wheels_off | VARCHAR | The actual time when the aircraft took off |
| scheduled_time | VARCHAR | The scheduled flight time in minutes |
| elapsed_time | VARCHAR | The actual flight time in minutes including taxi in and out (air_time + taxi_in + taxi_out) |
| air_time | VARCHAR | The number of minutes spent in the air ( wheels_on - wheels_off) |
| distance | VARCHAR | The distance between origin and destination in miles |
| wheels_on | VARCHAR | The actual time when the aircraft landed |
| taxi_in | VARCHAR | The number of minutes the taxi out process took (The time duration elapsed between wheels-on and gate arrival at the destination airport) |
| scheduled_arrival | VARCHAR | The scheduled arrival time of the flight in HH:mm format |
| arrival_time | VARCHAR | The actual arrival time of the flight in HH:mm format (wheels_on + taxi_in) |
| arrival_delay | VARCHAR | The number of minutes the flight arrival is delayed (arrival_time - scheduled_arrival) |
| diverted | VARCHAR | 0 if the flight was not diverted and 1 if the flight was diverted |
| cancelled | VARCHAR | 0 if the flgiht was not cancelled and 1 if the flight was diverted |
| cancellation_reason | VARCHAR | The reason for flight cancellation |
| aircraft_manufacture_year | VARCHAR | The year the aircraft that was used in the flight was manufactured |
| aircraft_type | VARCHAR | The type of the aircraft that was used for the flight |
| engine_type | VARCHAR | The engine type of the aircraft that was used for the flight |

`aircraft` dim table

| Column | Type | Desription |
| ------ | ---- | ---------- |
| n_number | VARCHAR |  Identification number assigned to aircraft. This is the <font color='red'>PK</font> |
| serial_number | VARCHAR | The complete aircraft serial number assigned to the aircraft by the manufacturer |
| aircraft_mfr_model_code | VARCHAR | A code assigned to the aircraft manufacturer, model and series |
| engine_mfr_code | VARCHAR | A code assigned to the engine manufacturer and model |

`airlines` dim table

| Column | Type | Desription |
| ------ | ---- | ---------- |
| iata_airline_code | VARCHAR |  2-letter code of an airline. This is the <font color='red'>PK</font>|
| airline_name | VARCHAR | The name of airline |

`airports` dim table

| Column | Type | Desription |
| ------ | ---- | ---------- |
| iata_airport_code | VARCHAR |  3-letter code of an airport. This is the <font color='red'>PK</font>|
| airport_name | VARCHAR | The name of the airport |
| city | VARCHAR | The city in which the airport is in|
| latitude | VARCHAR | The latitude details of the airport |
| longitude | VARCHAR | The longitude details of the airport |
| altitude | VARCHAR | The altitude details of the airport |

`date` dim table

| Column | Type | Desription |
| ------ | ---- | ---------- |
| date | DATE | Date of the flight. This is the <font color='red'>PK</font>|
| year | INT | Year of the flight |
| month | INT | Month of the flight |
| day | INT | Day of the flight |
| weekday | INT | Weekday of the flight |

`time` dim table

| Column | Type | Desription |
| ------ | ---- | ---------- |
| time | VARCHAR | Time of the flight. This is the <font color='red'>PK</font>|
| hour | VARCHAR | Hour of the flight |
| minute | VARCHAR | Minute of the flight |

### <font color='green'>Step 6: Detailed Project Discussion</font>

##### 6.1 Data Assessment and Cleaning Tools

The <font color='orange'>**Pandas**</font> library was used as the tool to explore and clean the smaller datasets (airports, airlines and aircraft data) due to its wide range of abilities when it comes to data wrangling.

Pandas can read in data from various formats such as txt, csv, json, excel and makes it easy to explore new data in a dataframe format. The package helps to find duplicates, count unique values, add / remove columns from the existing dataframe and write back the clean dataframe to our chosen file format (which was csv for this project).

<font color='orange'>**Spark**</font> was the chosen tool to explore and clean the flights data due to its size (6M rows). Spark is meant for big data sets that cannot fit on one computer. Spark is capable of handling several petabytes of data at a time, distributed across a cluster of thousands of cooperating physical or virtual servers.

##### 6.2 ETL Tools

The chosen storage for the source data was <font color='orange'>**AWS S3**</font>. It's cheap, easily accessible and allows storage of any file types. S3 has no storage limit, allows others (engineers, analysts, etc) to access the files and can also connect with other AWS services. To promote <font color='orange'>**infrastructure as code**</font>, the clean data files were uploaded to the S3 bucket using <font color='orange'>**Python AWS SDK**</font>.

Next, a specific <font color='orange'>**IAM role**</font> was created with S3 Read Only Access to access the data stored in our bucket. Then a <font color='orange'>**Redshift**</font> cluster was launched. Again, these steps were done using Python AWS SDK to automate as much of the process as possible.

Within the Redshift cluster, we loaded the S3 data to staging tables first as an intermediate step in the ETL process so that we can access the data from each of the staging tables when creating our analytics tables.



##### 6.3 Data Model - Star Schema

The star schema was the chosen data model as our key event data was the flights themselves. The dimension tables were created to link the flights data to airline details, airport details and aircraft information to help air transport analysts produce meaningful insights around US domestic air travel.

##### 6.4 Updating the Data

The data sources used have different intervals. 

The airports data was last updated in 2021, but it'd probably make sense to update this data every quarter to account for new / no longer existing airports. 

The aircraft dataset is refreshed daily at 11:30pm central time.

The airlines and flights cancellation & delays data from the US Department of Transportation is released on a monthly basis, but in a real production pipeline, it'd make sense to update the flights data daily to keep up with the new records. 

Of course, we should bear in mind that the flights fact table would change daily, and most of the dimension tables would channge a lot less frequently.

##### 6.5 Handling Different Scenarios

**If the data was increased by 100x**
AWS Redshift automatically and quickly scales to handle more data

**If the pipelines were run on a daily basis by 7am**
Apache Airflow could be used to schedule daily runs that would collect, assess and clean the data and push the clean data through the ETL process described in the project.

**If the database needed to be accessed by 100+ people**
We can easily scale up / out the Redshift clusters to handle larger loads so that hundreds of analysts can access the database. Of course, we need to ensure database security (say for example we could manage databaseb security through creating Groups)