# Capstone Project
### Data Engineering Capstone Project

#### Project Summary
--describe your project at a high level--

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

### Step 1: Scope the Project and Gather Data

#### Scope 
The purpose of the project is to build a data warehouse for analyzing the factors that influence where people from around the world choose to live when they immigrate to the US. The analysis covers only immigrants who arrived by air.

#### Describe and Gather Data 
|Data Set|Format|Description|
|--------|------|-----------|
|I94 Immigration Data|SAS|Contains international visitor arrival statistics by world region and selected countries, visa type, mode of transportation, age, arrival state, and port of entry.|
|Airport Code Data|CSV|Contains airport codes and their corresponding cities.|
|U.S. City Demographic Data|CSV|Contains demographic information about US cities.|

In [1]:
# import all needed libraries 
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col, isnan, when, count, udf, monotonically_increasing_id

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.
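
The checks run on each file below (row count, missing values, duplicated rows, duplicated key values) could also be gathered into one helper. The following is only a sketch: the name `profile_frame` and the toy data are hypothetical and are not part of the project files.

```python
import pandas as pd


def profile_frame(df, key_columns):
    """Summarise missing values and duplicates for a pandas DataFrame."""
    return {
        "rows": len(df),
        # number of NULL/NaN cells per column, keeping only columns that have any
        "nulls_per_column": {c: int(n) for c, n in df.isnull().sum().items() if n > 0},
        # rows that are identical to an earlier row in every column
        "duplicate_rows": int(df.duplicated().sum()),
        # rows that repeat an earlier value of the candidate key
        "duplicate_keys": int(df.duplicated(subset=key_columns).sum()),
    }


# Hypothetical toy data, only to show the shape of the report
toy = pd.DataFrame({
    "ident": ["00A", "00AA", "00AA"],
    "iata_code": [None, "AAA", "AAA"],
})
report = profile_frame(toy, key_columns=["ident"])
print(report)
```

Calling the helper once per file would give the same information as the separate `info()` and `duplicated()` cells, in a form that is easy to compare across datasets.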

##### 1. airport-codes_csv.csv file

In [2]:
# Use pandas to read csv file
airport_codes = pd.read_csv('airport-codes_csv.csv')

# Overview data
print('First 5 rows of airport_codes:')
airport_codes.head(5)


First 5 rows of airport_codes:


Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [3]:
# get some basic information about airport_codes (the # of rows, the # of cols, null count, col's datatype)
airport_codes.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 55075 entries, 0 to 55074
Data columns (total 12 columns):
ident           55075 non-null object
type            55075 non-null object
name            55075 non-null object
elevation_ft    48069 non-null float64
continent       27356 non-null object
iso_country     54828 non-null object
iso_region      55075 non-null object
municipality    49399 non-null object
gps_code        41030 non-null object
iata_code       9189 non-null object
local_code      28686 non-null object
coordinates     55075 non-null object
dtypes: float64(1), object(11)
memory usage: 5.0+ MB


In [4]:
# check if it has any duplicated rows
print('Count duplicated rows:')
airport_codes.duplicated().sum()

Count duplicated rows:


0

In [5]:
# check if it has any duplicated values in the column that will be primary key
print('Count duplicated of primary key --ident--')
airport_codes.duplicated(subset = ['ident']).sum()

Count duplicated of primary key --ident--


0

##### 2. us-cities-demographics.csv file

In [6]:
# Use pandas to read csv file
demographics = pd.read_csv('us-cities-demographics.csv', delimiter = ';')

# Overview data
print('First 5 rows of demographics:')
demographics.head(5)

First 5 rows of demographics:


Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [7]:
# get some basic information about demographics (the # of rows, the # of cols, null count, col's datatype)
demographics.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2891 entries, 0 to 2890
Data columns (total 12 columns):
City                      2891 non-null object
State                     2891 non-null object
Median Age                2891 non-null float64
Male Population           2888 non-null float64
Female Population         2888 non-null float64
Total Population          2891 non-null int64
Number of Veterans        2878 non-null float64
Foreign-born              2878 non-null float64
Average Household Size    2875 non-null float64
State Code                2891 non-null object
Race                      2891 non-null object
Count                     2891 non-null int64
dtypes: float64(6), int64(2), object(4)
memory usage: 271.1+ KB


In [8]:
# check if it has any duplicated rows
print('Count duplicated rows:')
demographics.duplicated().sum()

Count duplicated rows:


0

In [9]:
# check if it has any duplicated values in the columns
print("Count duplicated of primary key --'City', 'State Code', 'Race'--")
demographics.duplicated(subset = ['City', 'State Code', 'Race']).sum()

Count duplicated of primary key --'City', 'State Code', 'Race'--


0

In [10]:
# check if it has any duplicated values in the columns
print("Count duplicated of primary key --'City', 'State Code', 'Median Age', 'Male Population', 'Female Population', 'Total Population', 'Number of Veterans', 'Foreign-born', 'Average Household Size', 'Race'--")
demographics.duplicated(subset = ['City', 'State Code', 'Median Age', 'Male Population', 'Female Population', 'Total Population', 'Number of Veterans', 'Foreign-born', 'Average Household Size', 'Race']).sum()

Count duplicated of primary key --'City', 'State Code', 'Median Age', 'Male Population', 'Female Population', 'Total Population', 'Number of Veterans', 'Foreign-born', 'Average Household Size', 'Race'--


0

##### 3. immigration_data_sample.csv file

In [11]:
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()  

In [12]:
# use pyspark to read the SAS file
immigrations = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
# write to parquet
immigrations.write.mode('overwrite').parquet("sas_data")
# read from parquet
immigrations = spark.read.parquet("sas_data")

In [13]:
# Overview data
print('First 5 rows of immigrations:')
immigrations.head(5)

First 5 rows of immigrations:


[Row(cicid=5748517.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='CA', depdate=20582.0, i94bir=40.0, i94visa=1.0, count=1.0, dtadfile='20160430', visapost='SYD', occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear=1976.0, dtaddto='10292016', gender='F', insnum=None, airline='QF', admnum=94953870030.0, fltno='00011', visatype='B1'),
 Row(cicid=5748518.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='NV', depdate=20591.0, i94bir=32.0, i94visa=1.0, count=1.0, dtadfile='20160430', visapost='SYD', occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear=1984.0, dtaddto='10292016', gender='F', insnum=None, airline='VA', admnum=94955622830.0, fltno='00007', visatype='B1'),
 Row(cicid=5748519.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='WA', depdate=20582.0, i94bir=29.

In [14]:
# Row count
immigrations.count()

3096313

In [15]:
# Datatype of columns
immigrations.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [16]:
# Check if it has any duplicated values in the column that will be primary key
# refer to https://www.datasciencemadesimple.com/get-duplicate-rows-in-pyspark/
print("Count duplicated of primary key --cicid--")
duplicates = immigrations.groupBy('cicid').count().filter('count > 1')
duplicates.drop('count').show()

Count duplicated of primary key --cicid--
+-----+
|cicid|
+-----+
+-----+



#### Cleaning Data
Document steps necessary to clean the data

##### 1. Drop duplicated rows
There are no duplicates in airport_codes, demographics, or immigrations, so no rows need to be dropped.

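##### 2. Drop rows that have NULL
The `info()` output above shows NULLs in several non-key columns (for example, `iata_code` in airport_codes has 9,189 non-null values out of 55,075 rows, and `occup` is None in the sample immigration rows). The candidate key columns of airport_codes (`ident`) and demographics (`City`, `State Code`, `Race`) are fully populated, so no rows are dropped.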

##### 3. Transform data
The arrdate and depdate columns in immigrations hold SAS-formatted date values, so they need to be converted to a date type.
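
A SAS date is a count of days since 1960-01-01. As a minimal standard-library sketch of that arithmetic (the name `sas_days_to_date` is hypothetical and separate from the pandas-based function used in this notebook):

```python
from datetime import date, timedelta

# SAS counts dates as days elapsed since 1 January 1960
SAS_EPOCH = date(1960, 1, 1)


def sas_days_to_date(sas_num):
    """Convert a SAS day count to a datetime.date; None and NaN map to None."""
    # NaN is the only value that is not equal to itself
    if sas_num is None or sas_num != sas_num:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_num))


# arrdate of the first sample immigration row
print(sas_days_to_date(20574.0))  # 2016-04-30
```

The result agrees with the `dtadfile='20160430'` value in the same sample row.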

In [17]:
def convert_sasformat_to_date(sas_num):
    '''
        Description:
            Convert a SAS-formatted date value (days since 1960-01-01) to a date.
            Params: 
                - sas_num: a SAS-formatted date value
            Return: the corresponding date value, or None if sas_num is None
            The solution is adapted from https://stackoverflow.com/questions/36412864/convert-numeric-sas-date-to-datetime-in-pandas
    '''
    if sas_num is not None:
        return pd.to_timedelta(sas_num, unit='D') + pd.Timestamp('1960-1-1')

In [18]:
#  Convert to udf to use with spark
# refer to https://spark.apache.org/docs/latest/sql-ref-functions-udf-scalar.html
udf_convert_sasformat_to_date = udf(convert_sasformat_to_date, DateType())

In [19]:
# transform sas format of arrdate to date type
immigrations = immigrations.withColumn('arrdate', udf_convert_sasformat_to_date(col('arrdate')))

# transform sas format of depdate to date type
immigrations = immigrations.withColumn('depdate', udf_convert_sasformat_to_date(col('depdate')))

# i94bir is not a SAS date: it holds the traveller's age in years
# (e.g. i94bir=40.0 alongside biryear=1976.0 in the sample rows above), so it is left unconverted

In [20]:
# keep only immigrants who arrived by air (i94mode == 1)
immigrations_air = immigrations[immigrations['i94mode'] == 1]

In [21]:
# get US airports only (copy so the column assignment below does not act on a slice of airport_codes)
airport_codes_us = airport_codes[airport_codes['iso_country'] == 'US'].copy()

In [22]:
# get state code only from iso_region field
airport_codes_us['iso_region'] = airport_codes_us['iso_region'].str[3:]


In [23]:
def mapping_city_code():
    with open("I94_SAS_Labels_Descriptions.SAS") as f:
        contents = f.readlines()
    
    cities = {}
    cicos = []
    cinas = []
    stcos = []
    
    for row in contents[303:817]:
        # city_code = pair[0], city_name & state_code = pair[1]
        pair = row.split('=')
        # to split city_name and state_code
        cist_pair = pair[1].split(',')
        
        cico, cina = pair[0].strip("\t").strip().strip("'"), cist_pair[0].strip("\t").strip().strip("'")
        if len(cist_pair) == 2:
            stco = cist_pair[1].strip("\n").strip().strip("'")
        else:
            stco = ''
            
        cicos.append(cico)
        cinas.append(cina)
        stcos.append(stco)

    # build the DataFrame once, after all label rows have been parsed
    df_cities = pd.DataFrame({'city_code' : cicos,
                              'city_name' : cinas,
                              'state_code' : stcos },
                             columns=['city_code', 'city_name', 'state_code'])

    return df_cities

In [24]:
df_cities = mapping_city_code()
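
The per-line parsing done inside `mapping_city_code` can be illustrated on a single label line. This is a sketch under assumptions: `parse_port_label` is a hypothetical helper, and the sample line only imitates the assumed `'CODE' = 'CITY, ST'` layout of the labels file. Unlike the function above, it strips whitespace again after removing the quotes, which avoids trailing spaces in `state_code` that may otherwise prevent a match when joining on it.

```python
def parse_port_label(line):
    """Split one label line into (city_code, city_name, state_code)."""
    code_part, _, label_part = line.partition("=")
    # remove surrounding whitespace, then the quotes, then any padding inside the quotes
    city_code = code_part.strip().strip("'").strip()
    label = label_part.strip().strip("'").strip()
    # the text after the first comma, if any, is treated as the state code
    city_name, sep, state_code = label.partition(",")
    return city_code, city_name.strip(), state_code.strip() if sep else ""


# Assumed line layout, with padding inside the quotes
sample = "\t'ALC'\t=\t'ALCAN, AK             '\n"
print(parse_port_label(sample))  # ('ALC', 'ALCAN', 'AK')
```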

In [25]:
df_states = demographics[['State Code', 'State']].drop_duplicates()
df_states['State Code'] = df_states['State Code'].str.strip()

### Step 3: Define the Data Model
![data model](schema.png)

#### 3.1 Conceptual Data Model
The data model consists of 4 tables (1 fact & 3 dims):
- FACT_IMMIGRATION: contains all immigration events and their dates
- DIM_CITY: contains city codes, city names, and their states
- DIM_AIRPORT: contains airport information
- DIM_DEMOGRAPHIC: contains US city demographic information

#### 3.2 Mapping Out Data Pipelines
Load the data into the dimension and fact tables, renaming columns for clarity.
- FACT_IMMIGRATION: get fields from files in sas_data folder
- DIM_CITY: get city codes from I94_SAS_Labels_Descriptions.SAS and state names from us-cities-demographics.csv
- DIM_AIRPORT: get fields from airport-codes_csv.csv file
- DIM_DEMOGRAPHIC: get fields from us-cities-demographics.csv

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [26]:
# DIM_CITY
def create_dim_city():
    dim_city = df_cities.join(df_states.set_index('State Code'), on='state_code', how = 'left')
    dim_city.columns = ['city_code', 'city_name', 'state_code', 'state_name']
    
    # make sure primary key is unique
    dim_city = dim_city.drop_duplicates(['city_code', 'state_code'])
    
    return dim_city

In [27]:
# prepare a temp dim_city to get the corresponding city_code for dim_airport
dim_city_tojoin_airport = create_dim_city().drop_duplicates(['city_code', 'city_name'])
dim_city_tojoin_demog = create_dim_city().drop_duplicates(['city_code', 'city_name'])

In [28]:
# DIM_AIRPORT
def create_dim_airport():
    # get needed fields from airport-codes_csv.csv file
    dim_airport = airport_codes_us[['ident', 'iso_region', 'municipality', 'name', 'type', 'elevation_ft', 'gps_code', 'local_code', 'coordinates']].copy()
    
    # setup clearer column name
    dim_airport.columns = ['airport_ident', 'state_code', 'city_name', 'name', 'type', 'elevation_ft', 'gps_code', 'local_code', 'coordinates']
    
    dim_airport['city_name'] = dim_airport['city_name'].str.upper()
    
    # join to temp dim_city to extract city_code
    dim_airport = dim_airport.merge(dim_city_tojoin_airport.set_index('city_name'), on='city_name', how = 'inner')
    dim_airport = dim_airport[['airport_ident', 'city_code', 'state_code_x', 'name', 'type', 'elevation_ft', 'gps_code', 'local_code', 'coordinates']]
    dim_airport.columns = ['airport_ident', 'city_code', 'state_code', 'name', 'type', 'elevation_ft', 'gps_code', 'local_code', 'coordinates']
    
    # make sure primary key is unique
    dim_airport = dim_airport.drop_duplicates(['airport_ident'])
    
    return dim_airport
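
The `state_code_x` column selected in `create_dim_airport` comes from how pandas resolves name clashes in a merge: when both frames carry a non-key column with the same name, the left one gets the suffix `_x` and the right one `_y`. A small sketch with hypothetical toy frames (not project data):

```python
import pandas as pd

# Hypothetical toy frames that both carry a state_code column
airports = pd.DataFrame({
    "airport_ident": ["X1"],
    "city_name": ["ANYTOWN"],
    "state_code": ["ZZ"],
})
cities = pd.DataFrame({
    "city_code": ["ANY"],
    "city_name": ["ANYTOWN"],
    "state_code": ["ZZ"],
})

# Joining on city_name only: the clashing state_code columns get _x / _y suffixes
merged = airports.merge(cities, on="city_name", how="inner")
print(list(merged.columns))

# Joining on both columns keeps a single state_code and only pairs rows from the same state
by_city_and_state = airports.merge(cities, on=["city_name", "state_code"], how="inner")
print(list(by_city_and_state.columns))
```

Joining on both `city_name` and `state_code` is one possible alternative to the name-only join; whether it suits this pipeline depends on how clean the state codes are in both sources.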

In [29]:
# DIM_DEMOGRAPHIC
def create_dim_demographic():
    # includes only US demographic    
    # get needed fields from us-cities-demographics.csv
    dim_demographic = demographics[['City', 'State Code', 'State', 'Race', 'Count', 'Median Age', 'Male Population', 'Female Population', 'Total Population', 'Number of Veterans', 'Foreign-born', 'Average Household Size']].copy()
    dim_demographic.columns = ['city_name', 'state_code', 'state', 'race', 'count_each_race', 'median_age', 'male_population', 'female_population', 'total_population', 'num_veterans', 'foreign_born', 'avg_household_size']
    
    dim_demographic['city_name'] = dim_demographic['city_name'].str.upper()
    
    # join to temp dim_city to extract city_code
    dim_demographic = dim_demographic.merge(dim_city_tojoin_demog.set_index('city_name'), on='city_name', how = 'inner')
    dim_demographic = dim_demographic[['city_code', 'state_code_x', 'race', 'count_each_race', 'median_age', 'male_population', 'female_population', 'total_population', 'num_veterans', 'foreign_born', 'avg_household_size']]
    dim_demographic.columns = ['city_code', 'state_code', 'race', 'count_each_race', 'median_age', 'male_population', 'female_population', 'total_population', 'num_veterans', 'foreign_born', 'avg_household_size']
    
    # make sure primary key is unique
    dim_demographic = dim_demographic.drop_duplicates(['city_code', 'state_code', 'race'])
    
    return dim_demographic

In [30]:
# FACT_IMMIGRATION
def create_fact_immigration():
    fact_immigration = immigrations_air.select(
        (immigrations_air.cicid.cast(IntegerType())).alias('cic_id'),
        immigrations_air.i94port.alias('city_code'),
        immigrations_air.i94addr.alias('state_code'),
        immigrations_air.depdate.alias('departure_date'),
        immigrations_air.arrdate.alias('arrive_date'),
        (immigrations_air.i94mode.cast(IntegerType())).alias('travel_mode'),
        (immigrations_air.i94visa.cast(IntegerType())).alias('visa_type_id'),
        immigrations_air.visatype.alias('visa_type'),
        immigrations_air.airline,
        # admnum values (e.g. 94953870030) exceed the 32-bit integer range, so cast to LongType
        (immigrations_air.admnum.cast(LongType())).alias('admin_num'),
        immigrations_air.fltno.alias('flight_no'),
        immigrations_air.i94bir.alias('birthday'),
        immigrations_air.gender
    )
    
    return fact_immigration

Run the create functions to build and load the dimension and fact tables.

In [31]:
dim_city = create_dim_city()
dim_airport = create_dim_airport()
dim_demographic = create_dim_demographic()
fact_immigration = create_fact_immigration()


In [32]:
dim_city.head(5)

Unnamed: 0,city_code,city_name,state_code,state_name
0,BAR,BAKER AAF - BAKER ISLAND,AK,Alaska
1,DAC,DALTONS CACHE,AK,
2,PIZ,DEW STATION PT LAY DEW,AK,Alaska
3,DTH,DUTCH HARBOR,AK,
4,EGL,EAGLE,AK,


#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
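
For the pandas tables, the kinds of checks listed above could be wrapped in a helper that raises an error instead of printing a message, so that a failed check stops the pipeline. This is a sketch only; `check_table` is a hypothetical name, and the Spark fact table would need an equivalent written with Spark operations.

```python
import pandas as pd


def check_table(df, name, key_columns):
    """Raise ValueError if the table is empty or its key is NULL or duplicated."""
    # completeness: the table must contain rows
    if len(df) == 0:
        raise ValueError(f"{name}: table is empty")
    # integrity: key columns must not contain NULLs
    if df[key_columns].isnull().any().any():
        raise ValueError(f"{name}: NULL found in key columns {key_columns}")
    # integrity: the key must be unique
    dupes = int(df.duplicated(subset=key_columns).sum())
    if dupes:
        raise ValueError(f"{name}: {dupes} rows repeat an existing key")
    return len(df)


# Hypothetical toy table
toy = pd.DataFrame({"city_code": ["AAA", "BBB"], "state_code": ["ZZ", "ZZ"]})
print(check_table(toy, "toy", ["city_code", "state_code"]))  # 2
```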
 
Run Quality Checks

In [33]:
# Check if there're empty tables
dc_all = len(dim_city)
da_all = len(dim_airport)
dd_all = len(dim_demographic)
fi_all = fact_immigration.count()

print(f'- dim_city: {dc_all} rows.')
print(f'- dim_airport: {da_all} rows.')
print(f'- dim_demographic: {dd_all} rows.')
print(f'- fact_immigration: {fi_all} rows.')

- dim_city: 514 rows.
- dim_airport: 2800 rows.
- dim_demographic: 691 rows.
- fact_immigration: 2994505 rows.


In [34]:
# Check the primary key is unique
dc_unique = len(dim_city[['city_code', 'state_code']].drop_duplicates())
da_unique = len(dim_airport.drop_duplicates(['airport_ident']))
dd_unique = len(dim_demographic.drop_duplicates(['city_code', 'state_code', 'race']))
fi_unique = fact_immigration.dropDuplicates(['cic_id']).count()

if dc_all == dc_unique:
    print('The primary key of dim_city is unique.')
else:
    print('dim_city: Fail TC.')

if da_all == da_unique:
    print('The primary key of dim_airport is unique.')
else:
    print('dim_airport: Fail TC.')
    
if dd_all == dd_unique:
    print('The primary key of dim_demographic is unique.')
else:
    print('dim_demographic: Fail TC.')
    
if fi_all == fi_unique:
    print('The primary key of fact_immigration is unique.')
else: 
    print('fact_immigration: Fail TC.')

The primary key of dim_city is unique.
The primary key of dim_airport is unique.
The primary key of dim_demographic is unique.
The primary key of fact_immigration is unique.


Check The ETL Process

In [35]:
# Query the data to answer: where is the Police Heliport airport located, and what are the demographics of the Asian population there?
output = pd.merge(dim_airport, dim_demographic, on=['city_code', 'state_code'], how='inner')
output = output[(output['name'] == 'Police Heliport') & (output['race'] == 'Asian')]
output = output[['airport_ident', 'city_code', 'state_code', 'name', 'race', 'count_each_race', 'median_age', 'total_population', 'avg_household_size']]
output

Unnamed: 0,airport_ident,city_code,state_code,name,race,count_each_race,median_age,total_population,avg_household_size
2,IN78,GAR,IN,Police Heliport,Asian,537,38.1,77354,2.35


#### 4.3. Data Dictionary

- DIM_CITY

|COLUMN NAME|DESCRIPTION|
|-----------|-----------|
|city_code|3-character code of a city in the United States. city_code and state_code together form the composite primary key of DIM_CITY|
|state_code|2-letter code of a state in the United States. city_code and state_code together form the composite primary key of DIM_CITY|
|city_name|city name|
|state_name|state name|


- DIM_AIRPORT

|COLUMN NAME|DESCRIPTION|
|-----------|-----------|
|airport_ident|3-4 character airport identifier, for airports in the United States only. Primary key of DIM_AIRPORT|
|city_code|3-character code of a city in the United States. city_code and state_code together form a composite foreign key referencing DIM_CITY|
|state_code|2-letter code of a state in the United States. city_code and state_code together form a composite foreign key referencing DIM_CITY|
|name|airport name|
|type|airport type, e.g. small_airport, heliport, closed|
|elevation_ft|elevation in feet of the highest point of the landing area|
|gps_code|gps code is unique|
|local_code|local code is unique|
|coordinates|a pair of values: longitude (east-west position) followed by latitude (north-south position)|


- DIM_DEMOGRAPHIC

|COLUMN NAME|DESCRIPTION|
|-----------|-----------|
|city_code|3-character code of a city in the United States. city_code, state_code, and race together form the composite primary key of DIM_DEMOGRAPHIC|
|state_code|2-letter code of a state in the United States. city_code, state_code, and race together form the composite primary key of DIM_DEMOGRAPHIC|
|race|race: Asian, White, Hispanic or Latino, etc. city_code, state_code, and race together form the composite primary key of DIM_DEMOGRAPHIC|
|count_each_race|number of people of this race in the city|
|median_age|median age of the city in the state|
|male_population|male population of the city in the state|
|female_population|female population of the city in the state|
|total_population|total population of the city in the state|
|num_veterans|number of veterans in the city in the state|
|foreign_born|number of foreign-born residents of the city in the state|
|avg_household_size|average household size of the city in the state|

- FACT_IMMIGRATION

|COLUMN NAME|DESCRIPTION|
|-----------|-----------|
|cic_id|CIC ID, the primary key of FACT_IMMIGRATION|
|city_code|3-character code of a city in the United States. city_code and state_code together form a composite foreign key referencing DIM_CITY|
|state_code|2-letter code of a state in the United States. city_code and state_code together form a composite foreign key referencing DIM_CITY|
|departure_date|departure date, converted from the SAS numeric date|
|arrive_date|arrival date, converted from the SAS numeric date|
|travel_mode|mode of travel (air, sea, land); this table only includes air|
|visa_type_id|an integer code|
|visa_type|visa type, e.g. WT, B2, CP|
|airline|2-character airline code|
|admin_num|admission number (from admnum), stored as a number|
|flight_no|flight number|
|birthday|taken from i94bir, which records the immigrant's age in years in the source data|
|gender|a single character (F or M)|

### Step 5: Complete Project Write Up

- The data model is designed as a snowflake schema with 1 fact table and 3 dimension tables. This schema fits the relationships in the project's data, makes the business logic easier for developers to read and understand, and is straightforward to extend.

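- The project uses both pandas and PySpark: pandas for the small CSV files, and PySpark for the I94 immigration data, which is large (about 3.1 million rows for April 2016 alone).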

- The dataset should be rebuilt yearly so that year-over-year changes are clearly visible in the analysis and predictions.

- If the data increased by 100x, we would first repartition the data on the join key to improve performance. If that does not help enough, we would consider adding nodes to the Spark cluster.

- If the dashboard had to be updated daily, we would create an Airflow DAG for the pipeline, run the Spark cluster on AWS EMR, and use Apache Livy to submit Spark commands.

- If the database needed to be accessed by 100+ people, we would scale up read capacity by using Redshift and use AWS IAM to manage users' access to the clusters. Additionally, Amazon Cognito is a good option for a customized login page and user management system.