# Data Engineering Capstone Project

## Project Summary

This is the capstone project for the Udacity Data Engineering Nanodegree. In this project, I explore I94 immigration data and create a database that is optimized for querying and analyzing immigration events. An ETL pipeline is built on these data sources to create the database that stores the immigrants' information.

The project follows these steps:

Step 1: Scope the Project and Gather Data  
Step 2: Explore and Assess the Data  
Step 3: Define the Data Model  
Step 4: Run ETL to Model the Data  
Step 5: Complete Project Write Up  

In [1]:
# Do all imports and installs here
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

## Step 1: Scope the Project and Gather Data


### The scope of this project:
For this project, I would like to explore the profiles of new immigrants and what they value when choosing which city to land in. I will create an immigrant table as the first dimension table, which shows the profile of each immigrant. I will combine the U.S. city demographic data with the city temperature data, aggregated by city, to create the city table as the second dimension table. For the fact table, I will create a table that records the immigration events in the I94 immigration data. Lastly, I will create a database for querying immigration events. All processing is done in Spark.

### Describe and Gather Data
•	I94 Immigration Data: This data comes from the US National Tourism and Trade Office. 

 |-- cicid: double (nullable = true)  
 |-- i94yr: double (nullable = true)  
 |-- i94mon: double (nullable = true)  
 |-- i94cit: double (nullable = true)  
 |-- i94res: double (nullable = true)  
 |-- i94port: string (nullable = true)  
 |-- arrdate: double (nullable = true)  
 |-- i94mode: double (nullable = true)  
 |-- i94addr: string (nullable = true)  
 |-- depdate: double (nullable = true)  
 |-- i94bir: double (nullable = true)  
 |-- i94visa: double (nullable = true)  
 |-- count: double (nullable = true)  
 |-- dtadfile: string (nullable = true)  
 |-- visapost: string (nullable = true)  
 |-- occup: string (nullable = true)  
 |-- entdepa: string (nullable = true)  
 |-- entdepd: string (nullable = true)  
 |-- entdepu: string (nullable = true)  
 |-- matflag: string (nullable = true)  
 |-- biryear: double (nullable = true)  
 |-- dtaddto: string (nullable = true)  
 |-- gender: string (nullable = true)  
 |-- insnum: string (nullable = true)  
 |-- airline: string (nullable = true)  
 |-- admnum: double (nullable = true)  
 |-- fltno: string (nullable = true)  
 |-- visatype: string (nullable = true)  

•	World Temperature Data: This dataset came from Kaggle. 
    dt  
    AverageTemperature  
    AverageTemperatureUncertainty  
    City  
    Country  
    Latitude  
    Longitude  

•	U.S. City Demographic Data:
    City  
    State  
    Median Age  
    Male Population  
    Female Population  
    Total Population  
    Number of Veterans  
    Foreign-born  
    Average Household Size  
    State Code  
    Race  
    Count  


In [2]:
# Read in the data here
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")

In [3]:
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temp = pd.read_csv(fname)
df_temp.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [4]:
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()
df_spark =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
df_spark.show(10)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null| 1.897628485E9| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SE

In [5]:
df_airport = pd.read_csv('airport-codes_csv.csv' )
df_airport.head(5)

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [6]:
df_city = pd.read_csv('us-cities-demographics.csv',delimiter = ';' )
df_city.head(5)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [7]:
df_citycode = pd.read_csv('city_code.txt',sep="=")
df_citycode.columns

Index(['code', 'city'], dtype='object')

## Step 2: Explore and Assess the Data


### Explore the Data
I94 immigration data - We will drop records whose city code i94cit is not a valid value. The valid values are described in I94_SAS_Labels_Description.SAS.

U.S. City Demographic Data - We will drop cities that are not in the list of city codes for i94cit.

Temperature Data - We will filter the country to the United States, drop records with a null AverageTemperature, and average AverageTemperature by city.

### Clean Data

In [8]:
# Explore Immigrants data
df_spark.count()

3096313

In [9]:
# No empty-string values in i94cit (this comparison does not catch nulls; isNull() would be needed for that)
df_spark.where(df_spark[ 'i94cit' ] == '').count()

0

In [10]:
citycode = df_citycode['code'].tolist()
len(citycode)

236

In [11]:
df_spark= df_spark.withColumn('i94cit', df_spark[ 'i94cit' ].cast('integer'))

In [12]:
# 2702245 of the 3096313 records have a valid city code
df_immigration = df_spark.filter(df_spark.i94cit.isin(citycode))
df_immigration.count()

2702245

In [13]:
df_immigration.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: integer (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable =

In [14]:
# Explore temperature data
# There are 8599212 records
df_temp = spark.read.format("csv").option("header", "true").load("../../data2/GlobalLandTemperaturesByCity.csv")
df_temp.count()

8599212

In [15]:
# After filtering out data points with NaN average temperature
# There are 8235082 records
df_temp = df_temp.filter(df_temp.AverageTemperature != 'NaN')
df_temp.count()

8235082

In [16]:
# Filter to US cities and average AverageTemperature by city
df_temp = df_temp.where(F.lower(F.col('Country')) == 'united states')\
                    .groupBy('City')\
                    .agg(F.mean('AverageTemperature'))
df_temp.show()

+---------------+-----------------------+
|           City|avg(AverageTemperature)|
+---------------+-----------------------+
|      Worcester|      7.341440525809558|
|     Charleston|     18.696557871112546|
|         Corona|      16.12483712696008|
|    Springfield|     10.647931343609901|
|          Tempe|       21.0487690509584|
|North Las Vegas|      17.45498153547133|
|       Thornton|      8.777836262323191|
|        Phoenix|       21.0487690509584|
|      Hollywood|      23.06892444289695|
| Pembroke Pines|      23.06892444289695|
|       Savannah|     19.406439563962774|
|     Toms River|     11.855868547611408|
|  Coral Springs|      23.06892444289695|
|          Omaha|     10.051168201978832|
|      Anchorage|    -2.3016456107756667|
|       Paradise|      17.45498153547133|
|      Allentown|      9.523295607566514|
|   Fort Collins|       8.18163890045814|
|        Anaheim|      16.12483712696008|
|     Greensboro|      14.41886117345303|
+---------------+-----------------

In [17]:
# There are 248 US cities with temperature information
df_temp= df_temp.withColumn('AverageTemperature',F.round(df_temp["avg(AverageTemperature)"], 2))\
                .drop('avg(AverageTemperature)')
                
df_temp.count()

248
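
To make the aggregation logic above concrete, here is a minimal plain-Python sketch on a few toy rows. The rows and the helper name `average_temperature_by_city` are made up for illustration; the notebook itself does this work in Spark. The sketch drops rows with a missing temperature, keeps only United States rows, and averages by city, rounded to two decimals.

```python
from collections import defaultdict

# Toy rows shaped like GlobalLandTemperaturesByCity.csv (values are invented).
rows = [
    {"City": "Omaha", "Country": "United States", "AverageTemperature": "10.0"},
    {"City": "Omaha", "Country": "United States", "AverageTemperature": "11.0"},
    {"City": "Omaha", "Country": "United States", "AverageTemperature": ""},
    {"City": "Århus", "Country": "Denmark", "AverageTemperature": "6.068"},
    {"City": "Tempe", "Country": "United States", "AverageTemperature": "21.5"},
]

def average_temperature_by_city(rows, country="united states"):
    """Average the non-missing temperatures per city for one country."""
    sums = defaultdict(float)
    counts = defaultdict(int)
    for row in rows:
        value = row["AverageTemperature"]
        # Skip missing readings, mirroring the NaN/null filter.
        if value in ("", "NaN", None):
            continue
        # Case-insensitive country match, mirroring F.lower(...).
        if row["Country"].lower() != country:
            continue
        sums[row["City"]] += float(value)
        counts[row["City"]] += 1
    # Round to two decimals, mirroring F.round(..., 2).
    return {city: round(sums[city] / counts[city], 2) for city in sums}

city_avg = average_temperature_by_city(rows)
print(city_avg)
```

Each city appears once in the result, which is what lets the temperature table be joined to the city table without duplicating rows.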

In [18]:
# Clean city demo table
# There are 2891 records
df_city = spark.read.format("csv").option("header", "true").option("delimiter", ";").load("us-cities-demographics.csv")
df_city.count()

2891

In [19]:
df_city.columns

['City',
 'State',
 'Median Age',
 'Male Population',
 'Female Population',
 'Total Population',
 'Number of Veterans',
 'Foreign-born',
 'Average Household Size',
 'State Code',
 'Race',
 'Count']

In [20]:
# There are 596 unique (City, State) pairs but 2891 records in the city table
# Each city has multiple rows, one per race, as the Yonkers example shows
df_city.filter(df_city.City == 'Yonkers').show()

+-------+--------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+------+
|   City|   State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race| Count|
+-------+--------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+------+
|Yonkers|New York|      38.0|          96580|           104538|          201118|              4801|       61247|                   2.8|        NY|  Hispanic or Latino| 73608|
|Yonkers|New York|      38.0|          96580|           104538|          201118|              4801|       61247|                   2.8|        NY|Black or African-...| 38731|
|Yonkers|New York|      38.0|          96580|           104538|          201118|              4801|       61247|             

In [21]:
df_city.select('City','State').dropDuplicates().count()

596

In [22]:
# Pivot Race and Count into columns so that (City, State) is a unique key in this table
df_city = df_city.withColumn('Count',df_city.Count.cast('integer'))
df_city = df_city.groupby('City',
 'State',
 'Median Age',
 'Male Population',
 'Female Population',
 'Total Population',
 'Number of Veterans',
 'Foreign-born',
 'Average Household Size',
 'State Code').pivot('Race').max('Count').fillna(0)

In [23]:
df_city.count()

596
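
The pivot above turns one row per (city, race) into one row per city with one column per race. A minimal plain-Python sketch of the same reshaping is shown below. The Yonkers counts are taken from the output shown earlier; the "Sampleville" row and the helper name `pivot_race` are invented for illustration.

```python
# Long format: one row per (City, State, Race).
long_rows = [
    ("Yonkers", "New York", "Hispanic or Latino", 73608),
    ("Yonkers", "New York", "Black or African-American", 38731),
    ("Sampleville", "New York", "Asian", 100),  # invented row
]

def pivot_race(long_rows):
    """Return {(city, state): {race: count}} with 0 for missing races."""
    races = sorted({race for _, _, race, _ in long_rows})
    wide = {}
    for city, state, race, count in long_rows:
        # Start each city with 0 for every race, mirroring fillna(0).
        row = wide.setdefault((city, state), {r: 0 for r in races})
        # Keep the largest count per cell, mirroring max('Count').
        row[race] = max(row[race], count)
    return wide

wide = pivot_race(long_rows)
for key, counts in wide.items():
    print(key, counts)
```

After the reshaping, the number of rows equals the number of distinct (City, State) pairs, which matches the count of 596 above.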

In [24]:
# Change data type
df_city= df_city.withColumn('Median Age', F.round(F.col('Median Age').cast('float'),2)) \
                .withColumn('Male Population', F.col('Male Population').cast('integer')) \
                .withColumn('Female Population', F.col('Female Population').cast('integer')) \
                .withColumn('Total Population', F.col('Total Population').cast('integer')) \
                .withColumn('Number of Veterans', F.col('Number of Veterans').cast('integer')) \
                .withColumn('Average Household Size', F.round(F.col('Average Household Size').cast('float'),2)) 

In [25]:
df_city.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: float (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: integer (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: float (nullable = true)
 |-- State Code: string (nullable = true)
 |-- American Indian and Alaska Native: integer (nullable = true)
 |-- Asian: integer (nullable = true)
 |-- Black or African-American: integer (nullable = true)
 |-- Hispanic or Latino: integer (nullable = true)
 |-- White: integer (nullable = true)



## Step 3: Define the Data Model


### 3.1 Conceptual Data Model
#### Fact Table - Contains information from the I94 immigration data describing the immigration events  

Columns:    
cicid -- primary key  
i94yr  
i94mon  
i94cit   
i94port  
arrdate  
i94mode   
depdate   
i94visa   
airline  
visatype  

#### Dimension Table 1 - Contains the immigrants' profiles. The columns are selected from the I94 immigration data

Columns:  
cicid --primary key    
i94cit  
i94res  
i94addr   
i94bir  
occup  
biryear  
gender  


#### Dimension Table 2 - Contains information about the cities where immigrants arrived. The table is created by joining the city temperature and city demographic tables

Columns:  
City -- primary key  
State -- primary key   
Median Age    
Total Population   
Foreign-born   
Average Household Size   
State Code   
American Indian and Alaska Native   
Asian   
Black or African-American   
Hispanic or Latino   
White   
AverageTemperature   


### 3.2 Mapping Out Data Pipelines
Pipeline Steps:

1. Clean I94 data to create Spark dataframe df_immigration.  
2. Clean city demographic data to create Spark dataframe df_city.  
3. Clean temperature data to create Spark dataframe df_temp.  
4. Create the fact table and write it to parquet files partitioned by i94port.  
5. Create the immigrants profile table by selecting relevant columns from df_immigration and write it to parquet files partitioned by i94cit.  
6. Create the city dimension table by joining df_temp to df_city and write it to a parquet file.  


## Step 4: Run Pipelines to Model the Data 
### 4.1 Create the data model
Build the data pipelines to create the data model.

#### STEP 1 Create Fact Table

In [26]:
# Read Spark dataframe of immigrant data 
df_spark =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
df_citycode = pd.read_csv('city_code.txt',sep="=")
citycode = df_citycode['code'].tolist()

In [27]:
# Clean I94 immigration data and store as Spark dataframe
df_spark= df_spark.withColumn('i94cit', df_spark[ 'i94cit' ].cast('integer'))
df_immigration = df_spark.filter(df_spark.i94cit.isin(citycode))

In [28]:
# Extract columns for immigration fact table
fact_table = df_immigration.select(["cicid","i94yr", "i94mon", "i94cit", "i94port", "arrdate", "i94mode", "depdate", "i94visa","airline","visatype"])

In [29]:
# Write immigration fact table to parquet files partitioned by i94port
fact_table.write.mode("append").partitionBy("i94port").parquet("/results/fact.parquet")

#### STEP 2 Create Dimension Table - Immigrants Profile

In [30]:
# Extract columns for immigrants profile dimension table
immigrants_table = df_immigration.select(["cicid", "i94cit", "i94res","i94addr","i94bir","occup","biryear","gender"])

In [31]:
# Write immigrants profile dimension table to parquet files partitioned by i94cit
immigrants_table.write.mode("append").partitionBy("i94cit").parquet("/results/immigrats.parquet")

#### STEP 3 Create Dimension Table - City 

In [32]:
# Create the city dimension table by joining the city demographic and temperature dataframes
city_table = df_city.join(df_temp, F.lower(df_city.City) == F.lower(df_temp.City), how='left').drop(df_temp.City)
city_table = city_table.select('City', 
'State',
'Median Age',
'Total Population',
'Foreign-born',
'Average Household Size',
'State Code',
'American Indian and Alaska Native',
'Asian',
'Black or African-American',
'Hispanic or Latino',
'White',
'AverageTemperature')
# Rename columns so that no name contains spaces or hyphens
city_table = city_table.withColumnRenamed('Median Age','Median_Age') \
                        .withColumnRenamed('Total Population','Total_Population') \
                        .withColumnRenamed('Foreign-born','Foreign_Born') \
                        .withColumnRenamed('Average Household Size','Average_Household_Size') \
                        .withColumnRenamed('State Code','State_Code') \
                        .withColumnRenamed('American Indian and Alaska Native','American_Native') \
                        .withColumnRenamed('Black or African-American','African_American') \
                        .withColumnRenamed('Hispanic or Latino','Hispanic_Latino') 


In [33]:
# Write city dimension table to parquet files (not partitioned)
city_table.write.mode("append").parquet("/results/city.parquet")

### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 

In [34]:
# Perform quality checks here
def quality_check(df, table_name, primary_key):
    '''
    Input: Spark dataframe, table name, list of primary key columns
    Output: Prints the outcome of the data quality checks and returns 0
    '''
    
    # Total rows and number of distinct primary key values
    row_count = df.count()
    primary_key_count = df.select(*primary_key).distinct().count()
    
    # Check 1: the table must not be empty
    if row_count == 0:
        print("Data quality check failed for {} with zero records".format(table_name))
    else:
        print("Data quality check passed for {} with {} records".format(table_name, row_count))
        
    # Check 2: the primary key must be unique
    if row_count == primary_key_count:
        print("Data quality check passed for {} with unique primary key {}".format(table_name, primary_key))
    else:
        print("Data quality check failed for {} with primary key {}".format(table_name, primary_key))
        
    return 0

In [35]:
# Perform data quality check
quality_check(fact_table, "fact table",["cicid"])
quality_check(immigrants_table, "immigrants profile table",["cicid"])
quality_check(city_table, "city table",["city","state"])


Data quality check passed for fact table with 2702245 records
Data quality check passed for fact table with unique primary key ['cicid']
Data quality check passed for immigrants profile table with 2702245 records
Data quality check passed for immigrants profile table with unique primary key ['cicid']
Data quality check passed for city table with 596 records
Data quality check passed for city table with unique primary key ['city', 'state']


0
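
The check above only prints its outcome. As a hedged sketch, the same two rules (non-empty table, unique primary key) plus a null-key rule can be written as a function that returns a list of failures, so a pipeline could stop when the list is not empty. This version works on plain Python dictionaries rather than Spark dataframes; the name `find_quality_issues` and the null-key rule are assumptions added for illustration.

```python
def find_quality_issues(rows, table_name, primary_key):
    """Return a list of problems found; an empty list means every check passed."""
    issues = []
    # Rule 1: the table must not be empty.
    if not rows:
        issues.append("{}: table has zero records".format(table_name))
        return issues
    keys = [tuple(row.get(col) for col in primary_key) for row in rows]
    # Rule 2 (assumed extra rule): no primary key column may be null.
    if any(value is None for key in keys for value in key):
        issues.append("{}: null value in primary key {}".format(table_name, primary_key))
    # Rule 3: the primary key must be unique.
    if len(set(keys)) != len(keys):
        issues.append("{}: duplicate primary key {}".format(table_name, primary_key))
    return issues

good = [{"cicid": 6.0}, {"cicid": 7.0}]
bad = [{"cicid": 6.0}, {"cicid": 6.0}, {"cicid": None}]
print(find_quality_issues(good, "fact table", ["cicid"]))
print(find_quality_issues(bad, "fact table", ["cicid"]))
```

Returning the failures instead of printing them makes it easy to raise an exception or fail a scheduled task when any check does not pass.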

### 4.3 Data dictionary 

#### Fact Table - This will contain information from the I94 immigration data showing the immigrant events

Columns:    
cicid -- primary key  
i94yr = 4-digit year  
i94mon = numeric month  
i94cit = 3-digit code of origin city  
i94port = 3-character code of destination city  
arrdate = arrival date (SAS numeric date, days since 1960-01-01)  
i94mode = 1-digit code for mode of travel  
depdate = departure date (SAS numeric date, days since 1960-01-01)  
i94visa = reason for immigration  
airline = airline taken  
visatype = type of visa  
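
The arrdate and depdate values are SAS numeric dates, which count days from 1960-01-01. A small sketch for turning them into calendar dates follows. The helper name `sas_to_date` is hypothetical; the sample value 20573.0 is the arrdate of the first row in the df_spark.show output earlier.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS numeric dates count days from this day

def sas_to_date(sas_days):
    """Convert a SAS numeric date (float, int, or None) to a datetime.date."""
    if sas_days is None:
        return None  # depdate is null for travelers with no recorded departure
    return SAS_EPOCH + timedelta(days=int(sas_days))

print(sas_to_date(20573.0))  # 2016-04-29
print(sas_to_date(None))     # None
```

The result falls in April 2016, which agrees with the i94yr and i94mon values of the same row.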

#### 1st Dimension Table - This will contain the immigrants' profiles. The columns are selected from the I94 immigration data

Columns:  
cicid -- primary key  
i94cit = origin city of immigrants  
i94res = residence of immigrants  
i94addr = address of immigrants (U.S. state code)  
i94bir = age of immigrants in years  
occup = occupation of immigrants  
biryear = the year of birth of immigrants  
gender = immigrants' gender  


#### 2nd Dimension Table - This will contain information about the cities where immigrants arrived. The table is created by joining the city temperature and city demographic tables

Columns:  
City -- primary key  
State -- primary key  
Median Age = the median age of the city's population  
Total Population = total population of the city  
Foreign-born = the number of residents born in a foreign country  
Average Household Size = average household size  
State Code = the code of the state  
American Indian and Alaska Native = the population count for race American Indian and Alaska Native  
Asian = the population count for race Asian  
Black or African-American = the population count for race Black or African-American  
Hispanic or Latino = the population count for race Hispanic or Latino  
White = the population count for race White  
AverageTemperature = the average temperature of the city  


## Step 5: Complete Project Write Up
#### The rationale for the choice of tools and technologies for the project.
For this project, Spark was used because it handles multiple file formats (SAS, CSV) and large amounts of data efficiently. To build the database of immigrant information, Spark SQL was used to read the raw input files into dataframes and to manipulate the data with Spark functions.

Spark is useful for cleaning and aggregating large volumes of raw data. For example, the command 'df_immigration = df_spark.filter(df_spark.i94cit.isin(citycode))' filtered out the records with invalid city codes.   
The command "df_temp = df_temp.where(F.lower(F.col('Country')) == 'united states').groupBy('City').agg(F.mean('AverageTemperature'))" computed the average temperature of each city in the United States. With these functions, we can carry out the data cleaning needed to prepare the fact and dimension tables.

Spark is also very important when it comes to creating tables. The select and join functions make it possible to pick the right columns for each table. The withColumn and withColumnRenamed functions helped put the columns into an appropriate format.

Last but not least, Spark plays an important role when writing the final tables. For example, the command "immigrants_table.write.mode("append").partitionBy("i94cit").parquet("/results/immigrats.parquet")" writes the large immigrants table as parquet files partitioned by city code, so analysts can retrieve the data more efficiently when filtering on the city code. This improves efficiency and saves time in future work.
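
To illustrate why partitioning helps, the sketch below mimics the Hive-style directory layout that partitionBy produces (one `i94cit=<value>` folder per code), using plain CSV files in a temporary directory instead of parquet. Reading one code then touches only one folder. The file name `part-0.csv`, the toy rows, and the helper names are assumptions; Spark's actual parquet part files are named differently.

```python
import csv
import os
import tempfile

# Toy rows: (cicid, i94cit, gender); values are invented.
rows = [(6, 692, "M"), (7, 254, "F"), (8, 254, "M"), (9, 101, "F")]

def write_partitioned(rows, root):
    """Write rows into <root>/i94cit=<code>/part-0.csv, one folder per code."""
    for cicid, i94cit, gender in rows:
        folder = os.path.join(root, "i94cit={}".format(i94cit))
        os.makedirs(folder, exist_ok=True)
        # The partition value lives in the folder name, not in the file.
        with open(os.path.join(folder, "part-0.csv"), "a", newline="") as f:
            csv.writer(f).writerow([cicid, gender])

def read_partition(root, i94cit):
    """Read only the folder for one code; other folders are never opened."""
    path = os.path.join(root, "i94cit={}".format(i94cit), "part-0.csv")
    with open(path, newline="") as f:
        return [(int(cicid), gender) for cicid, gender in csv.reader(f)]

with tempfile.TemporaryDirectory() as root:
    write_partitioned(rows, root)
    partitions = sorted(os.listdir(root))
    selected = read_partition(root, 254)

print(partitions)
print(selected)
```

A query engine that understands this layout can skip every folder whose name does not match the filter, which is the source of the speed-up described above.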


#### How often the data should be updated and why
Since the raw I94 files come in monthly batches, we should continue pulling the data monthly.
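
A monthly update could derive the input file name from the year and month. The sketch below assumes that every monthly file follows the pattern of the April 2016 file used in this notebook (i94_apr16_sub.sas7bdat). That pattern for other months and the helper name `monthly_filename` are assumptions, not something confirmed by the data source.

```python
# Lowercase month abbreviations, hard-coded to avoid locale differences.
MONTH_ABBR = ["jan", "feb", "mar", "apr", "may", "jun",
              "jul", "aug", "sep", "oct", "nov", "dec"]

def monthly_filename(year, month):
    """Build a file name such as i94_apr16_sub.sas7bdat for the given month."""
    if not 1 <= month <= 12:
        raise ValueError("month must be between 1 and 12")
    # Two-digit year suffix, e.g. 2016 -> "16".
    return "i94_{}{:02d}_sub.sas7bdat".format(MONTH_ABBR[month - 1], year % 100)

print(monthly_filename(2016, 4))
```

A scheduler can call this helper once per month and pass the result to the Spark reader used earlier in the notebook.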

#### Write a description of how you would approach the problem differently under the following scenarios:
1. If the data was increased by 100x:  
We can store the raw data in S3 and load it into Redshift each month for analysis, because Redshift is optimized for aggregation and read-heavy workloads.  

2. The data populates a dashboard that must be updated on a daily basis by 7am every day:  
Apache Airflow is a good fit for scheduling the data pipeline so that it finishes before 7am each day. It can retry failed DAG tasks and send emails when a failure happens, so that a data engineer can fix the problem in time.  

3. The database needed to be accessed by 100+ people:  
The more people access the database, the more CPU resources are needed to keep queries fast. A distributed database lets you use replication and partitioning to return faster query results for each user.