# Project Title
### Data Engineering Capstone Project

#### Project Summary
This project analyses US immigration data to provide insights into the types of visa issued to immigrants and into the demographics of immigrants.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import os 
import glob

import datetime
from pprint import pprint
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, isnull
from pyspark.sql.types import StringType, DateType, IntegerType


In [2]:
# Create a Spark session
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 


This project will extract the following data from various sources and create fact and dimension tables.

<ol> 
<li> I94 Immigration Data: This data comes from the US National Tourism and Trade Office. A data dictionary is included in the workspace. A sample file in CSV format is available for inspecting the data before reading it all in. The entire dataset does not have to be used, only what is needed to accomplish the project goal. </li>
<li> World Temperature Data: This dataset comes from Kaggle. </li>
<li> U.S. City Demographic Data: This data comes from OpenSoft. </li>
<li> Airport Code Table: This is a simple table of airport codes and corresponding cities. </li>
<li> Visa Type: This is a simple table of visa codes and corresponding visa descriptions. It comes from I94_SAS_Labels_Description.SAS. </li>
</ol> 

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

####  Configure data sources

In [3]:
# Read in the data here
airport_data           = './data_sources/airport-codes_csv.csv'
immigration_data       = "./data_sources/immigration_data_sample.csv"
us_cities_demographics = "./data_sources/us-cities-demographics.csv"
visa_type              = "./data_sources/visa.csv"

df_immigration_data       = spark.read.format('csv').options(header='true').load( immigration_data )
df_airport_codes          = spark.read.format('csv').options(header='true').load( airport_data )
df_us_cities_demographics = spark.read.format('csv').options(header='true' , delimiter=';').load( us_cities_demographics )
df_visa_type              = spark.read.format('csv').options(header='true').load( visa_type )
 
#airport_data_pd           = pd.read_csv( airport_data )
#immigration_data_pd       = pd.read_csv( immigration_data )
#us_cities_demographics_pd = pd.read_csv( us_cities_demographics , delimiter=";")
#visa_type_pd = pd.read_csv( visa_type )

#### Display schema

In [4]:
df_airport_codes.printSchema()
df_immigration_data.printSchema()
df_us_cities_demographics.printSchema()
df_visa_type.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)

root
 |-- _c0: string (nullable = true)
 |-- cicid: string (nullable = true)
 |-- i94yr: string (nullable = true)
 |-- i94mon: string (nullable = true)
 |-- i94cit: string (nullable = true)
 |-- i94res: string (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: string (nullable = true)
 |-- i94mode: string (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: string (nullable = true)
 |-- i94bir: string (nullable = true)
 |-- i94visa: string (nullable = true)
 |-- 

In [5]:
#### Read immigration data  
#i94_2016_df =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

#write to parquet
#i94_2016_df.write.parquet("sas_data")
#i94_2016_df=spark.read.parquet("sas_data")

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

##### Explore and clean immigrants data 

In [14]:
df_immigration_data.limit(10).toPandas()

Unnamed: 0,_c0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582674633.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94361995930.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780468433.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789696030.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322572633.0,LAND,WT
5,721257,1481650.0,2016.0,4.0,577.0,577.0,ATL,20552.0,1.0,GA,...,,M,1965.0,10072016,M,,DL,736852585.0,910,B2
6,1072780,2197173.0,2016.0,4.0,245.0,245.0,SFR,20556.0,1.0,CA,...,,M,1968.0,10112016,F,,CX,786312185.0,870,B2
7,112205,232708.0,2016.0,4.0,113.0,135.0,NYC,20546.0,1.0,NY,...,,M,1983.0,6302016,F,,BA,55474485033.0,00117,WT
8,617174,1230572.0,2016.0,4.0,438.0,438.0,LOS,20551.0,1.0,CA,...,,M,2012.0,7052016,F,,QF,55743814633.0,00015,WT
9,2497156,5056736.0,2016.0,4.0,209.0,209.0,PHI,20571.0,1.0,HI,...,,M,1944.0,7252016,M,,DL,59336618033.0,00598,WT


##### Remove rows with missing values in i94port, i94addr, i94visa, visatype and gender

In [15]:
# immigration_data_pd = immigration_data_pd.dropna(how="any", subset=["i94port", "i94addr", "i94visa", "visatype" , "gender"]) 
df_immigration_data = df_immigration_data.dropna(how="any", subset=["i94port", "i94addr", "i94visa", "visatype" , "gender"])

In [16]:
#test = df_immigration_data.select(['i94addr']).drop_duplicates()
#test.show() 
#df_immigration_data.limit(10).toPandas()
#immigration_data_pd.head()

In [9]:
# Create list of valid states from df_us_cities_demographics
valid_states = df_us_cities_demographics.toPandas()["State Code"].unique()
print(valid_states)

# Create user defined function to validate 'state' data
@udf(StringType())
def validate_state(n): 
    if n in valid_states: 
        return n
    return 'other'

['MD' 'MA' 'AL' 'CA' 'NJ' 'IL' 'AZ' 'MO' 'NC' 'PA' 'KS' 'FL' 'TX' 'VA' 'NV'
 'CO' 'MI' 'CT' 'MN' 'UT' 'AR' 'TN' 'OK' 'WA' 'NY' 'GA' 'NE' 'KY' 'SC' 'LA'
 'NM' 'IA' 'RI' 'PR' 'DC' 'WI' 'OR' 'NH' 'ND' 'DE' 'OH' 'ID' 'IN' 'AK' 'MS'
 'HI' 'SD' 'ME' 'MT']
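
The membership rule inside `validate_state` can be exercised without a Spark session. The following is a minimal plain-Python sketch of the same rule. `VALID_STATES` is a small hypothetical subset of the codes printed above, and `validate_state_py` is an illustrative name that does not appear elsewhere in the notebook.

```python
# Hypothetical subset of the state codes; the notebook derives the full list
# from the "State Code" column of the demographics data.
VALID_STATES = frozenset({"CA", "FL", "HI", "NY", "TX"})


def validate_state_py(code, valid_states=VALID_STATES):
    """Return the code unchanged if it is a known state, otherwise 'other'."""
    # Missing values (None) are not in the set, so they also map to 'other'.
    return code if code in valid_states else "other"


print(validate_state_py("HI"))
print(validate_state_py("XX"))
print(validate_state_py(None))
```

Holding the codes in a set keeps each lookup cheap, and the same function body could be wrapped with `udf(StringType())` in the way the cell above does.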


In [10]:
# Extract data with valid states
df_immigration_data = df_immigration_data.withColumn("i94addr" , validate_state(df_immigration_data.i94addr))
df_immigration_data.limit(5).toPandas()


Unnamed: 0,_c0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582674633.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94361995930.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780468433.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789696030.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322572633.0,LAND,WT


In [21]:
# Keep US state data ( state != 'other')
df_immigration_data = df_immigration_data.filter(df_immigration_data.i94addr != 'other') 
df_immigration_data.limit(5).toPandas()

Unnamed: 0,_c0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582674633.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94361995930.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780468433.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789696030.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322572633.0,LAND,WT


In [22]:
# Create user defined function to convert date to PySpark date
#@udf(StringType())
#def convert_datetime(x):
 #   if x:
  #      #return (datetime(1960, 1, 1).date() + timedelta(x)).isoformat()
   #     return datetime.datetime(1960, 1, 1) + datetime.timedelta(seconds=x)
    #return None

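@udf(StringType())
def convert_datetime(x):
    # arrdate is a SAS date (days since 1960-01-01), read from the CSV as a string such as "20566.0"
    if x:
        return (datetime.date(1960, 1, 1) + datetime.timedelta(days=int(float(x)))).isoformat()
    return None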

In [23]:
# Convert arrival_date  (NOT WORKING)
#df_immigration_data_clean1 = df_immigration_data_clean.withColumn("date", convert_datetime(df_immigration_data_clean.arrdate))
#df_immigration_data_clean1.limit(5).toPandas()
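
The conversion logic can be checked on its own before it is applied to the DataFrame. The sketch below assumes that `arrdate` holds SAS date values (days since 1960-01-01) and that the CSV reader delivers them as strings such as `"20566.0"`. The names `SAS_EPOCH` and `sas_days_to_iso` are illustrative and not part of the notebook.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS dates count days from this day


def sas_days_to_iso(value):
    """Convert a SAS day count such as '20566.0' to an ISO date string."""
    if value is None or value == "":
        return None
    # The value arrives as text with a trailing ".0", so go through float first.
    days = int(float(value))
    return (SAS_EPOCH + timedelta(days=days)).isoformat()


print(sas_days_to_iso("20566.0"))  # 2016-04-22
```

The result falls in April 2016, which agrees with the `i94yr` and `i94mon` values of the sample rows.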

In [25]:
# Select and rename the columns needed for staging
df_immigration_data_clean = df_immigration_data.select(col("cicid").alias("id"), 
                                       col("arrdate").alias("arrdate"),
                                       col("i94port").alias("city_code"),
                                       col("i94addr").alias("state_code"),
                                       col("i94bir").alias("age"),
                                       col("gender").alias("gender"),
                                       col("i94visa").alias("visa_type"),
                                       "count").drop_duplicates()
 

In [26]:
df_immigration_data_clean.limit(5).toPandas()

Unnamed: 0,id,arrdate,city_code,state_code,age,gender,visa_type,count
0,1643294.0,20553.0,NYC,MN,33.0,M,1.0,1.0
1,699077.0,20548.0,BOS,TX,43.0,M,2.0,1.0
2,3108923.0,20561.0,NEW,NY,38.0,M,1.0,1.0
3,982461.0,20550.0,MIA,FL,9.0,F,2.0,1.0
4,2735489.0,20559.0,PSP,CA,5.0,F,2.0,1.0


In [45]:
# store in staging 
staging_immigration_df = df_immigration_data_clean.select("id", "arrdate" ,"state_code", "city_code", "gender", "age", "visa_type", "count").drop_duplicates()


##### Explore and clean airport data 

In [28]:
#airport_data_pd.head()
df_airport_codes.limit(10).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237,,US,US-AR,Newport,,,,"-91.254898, 35.6087"
5,00AS,small_airport,Fulton Airport,1100,,US,US-OK,Alex,00AS,,00AS,"-97.8180194, 34.9428028"
6,00AZ,small_airport,Cordes Airport,3810,,US,US-AZ,Cordes,00AZ,,00AZ,"-112.16500091552734, 34.305599212646484"
7,00CA,small_airport,Goldstone /Gts/ Airport,3038,,US,US-CA,Barstow,00CA,,00CA,"-116.888000488, 35.350498199499995"
8,00CL,small_airport,Williams Ag Airport,87,,US,US-CA,Biggs,00CL,,00CL,"-121.763427, 39.427188"
9,00CN,heliport,Kitchen Creek Helibase Heliport,3350,,US,US-CA,Pine Valley,00CN,,00CN,"-116.4597417, 32.7273736"


#### Clean the airport dataset by keeping only rows where iso_country = US

#print(airport_data_pd["type"].unique())

In [29]:
#airport_data_pd = airport_data_pd.query('iso_country == "US"')
df_airport_codes_cleaned = df_airport_codes.filter(df_airport_codes["iso_country"] == "US")



#### Clean the airport dataset by keeping only type = (small / medium / large) airports


In [30]:
#airport_data_pd = airport_data_pd.query('type in ("small_airport","medium_airport","large_airport")')
df_airport_codes_cleaned = df_airport_codes_cleaned.filter(col("type").isin("small_airport", "medium_airport", "large_airport"))

##### Verify records

In [31]:
df_airport_codes_cleaned.createOrReplaceTempView("df_airport_codes_table")
spark.sql("SELECT distinct type  FROM df_airport_codes_table").show()

+--------------+
|          type|
+--------------+
| large_airport|
|medium_airport|
| small_airport|
+--------------+



In [32]:
staging_airport_df = df_airport_codes_cleaned.select("ident", "type", "name", "continent", "iso_country", "iso_region" , "local_code" ,"coordinates" ).drop_duplicates()


#### Verify US cities demographics data

In [35]:
df_us_cities_demographics.limit(5).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


In [37]:
staging_demographics_df = df_us_cities_demographics.select("City", "State", "Median Age" , "Male Population" , "Female Population" , "Total Population" , "State Code" ,"Race" ,"Count").drop_duplicates()

In [49]:
# staging_demographics_df.limit(10).toPandas()

#### Verify Visa type data

In [39]:
df_visa_type.limit(5).toPandas()

Unnamed: 0,visa_code,visa
0,1,Business
1,2,Pleasure
2,3,Student


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

##### Staging Table
<ul>
<li> staging_immigration_df 
     <ul><li> id, arrdate, state_code, city_code, gender, age, visa_type, count </li></ul></li>
<li> staging_airport_df 
     <ul><li> ident, type, name, continent, iso_country, iso_region, local_code, coordinates</li></ul></li>
<li>staging_demographics_df
    <ul><li> City, State, Median Age, Male Population, Female Population, Total Population, State Code, Race, Count</li></ul></li>
</ul>     
       
##### Star Schema
###### Fact Table
<ul>
<li>immigration_fact
    <ul><li>arrdate, state_code, city_code, visa_type, count</li></ul>
</li>
</ul>     
    
###### Dimension Table
<ul>
<li> airport    
     <ul><li> ident, type, name, continent, iso_country, iso_region, local_code, coordinates </li></ul></li>  
<li> demographics
    <ul> <li> City, State, Median Age, Male Population, Female Population, Total Population, State Code, Race</li> </ul> </li> 
<li> visa    
    <ul><li>visa_code, visa </li> </ul></li>   
<li> immigrant
    <ul><li>id, gender, age, visa_type</li></ul></li>
</ul>    
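
For the fact table's `visa_type` to line up with the visa dimension's `visa_code`, both need the same representation. In the previews above, the immigration values appear as `1.0` and `2.0`, while `visa.csv` shows `1`, `2` and `3`. The sketch below shows one way to normalise the key before a lookup. `VISA_DIM` copies the three rows from the preview, and the function names are hypothetical.

```python
# Visa dimension as shown in the visa.csv preview: code -> description
VISA_DIM = {1: "Business", 2: "Pleasure", 3: "Student"}


def normalize_visa_code(raw):
    """Turn values such as '1.0', '2' or 3.0 into an int key; None if unusable."""
    try:
        return int(float(raw))
    except (TypeError, ValueError, OverflowError):
        return None


def visa_description(raw, dim=VISA_DIM):
    """Look up the description for a raw visa_type value, or None if unmatched."""
    return dim.get(normalize_visa_code(raw))


print(visa_description("1.0"))
print(visa_description("9.0"))
```

Unmatched or malformed codes return `None` instead of raising, so they can be counted or inspected separately.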

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model
<ol>
    <li> Data cleaning</li>
    <li> Load data into staging tables </li>
    <li> Create and load data into fact and dimension tables</li></ol>

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [93]:
# Fact table

In [94]:
immigration_fact = staging_immigration_df.select("arrdate", "state_code", "city_code", "visa_type", "count").drop_duplicates()
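
Every staged row in the sample carries a `count` of 1.0, so dropping duplicates leaves a single row per combination of columns and does not add the counts up. If the fact table is meant to hold the number of arrivals per group, the counts would need to be summed within each group. The plain-Python sketch below illustrates that aggregation on hypothetical rows. In Spark, the equivalent would be a `groupBy` on the grouping columns followed by a sum of `count`.

```python
from collections import Counter


def aggregate_counts(rows, keys):
    """Sum the 'count' field of rows (dicts) for each combination of keys."""
    totals = Counter()
    for row in rows:
        group = tuple(row[k] for k in keys)
        # Counts are strings such as "1.0" because the CSV is read untyped.
        totals[group] += float(row["count"])
    return dict(totals)


# Hypothetical staged rows, shaped like staging_immigration_df
rows = [
    {"state_code": "CA", "city_code": "LOS", "visa_type": "2.0", "count": "1.0"},
    {"state_code": "CA", "city_code": "LOS", "visa_type": "2.0", "count": "1.0"},
    {"state_code": "NY", "city_code": "NYC", "visa_type": "1.0", "count": "1.0"},
]
print(aggregate_counts(rows, ["state_code", "city_code", "visa_type"]))
```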

In [95]:
# Dimensions table

In [96]:
airport = staging_airport_df.select("ident", "type", "name", "continent", "iso_country", "iso_region" , "local_code" ,"coordinates" ).drop_duplicates()

In [97]:
demographics = staging_demographics_df.select("City", "State", "Median Age" , "Male Population" , "Female Population" , "Total Population" , "State Code" ,"Race").drop_duplicates()

In [98]:
visa = df_visa_type.select("visa_code", "visa").drop_duplicates()

In [99]:
immigrant = staging_immigration_df.select("id", "gender", "age", "visa_type").drop_duplicates()

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
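
The unique-key integrity check listed above can be sketched in plain Python as follows. The helper name `duplicate_keys` and the sample rows are hypothetical. On a Spark DataFrame, the same idea amounts to comparing `df.count()` with `df.select("id").distinct().count()`.

```python
from collections import Counter


def duplicate_keys(rows, key):
    """Return the key values that occur more than once in rows (dicts)."""
    counts = Counter(row[key] for row in rows)
    return sorted(k for k, n in counts.items() if n > 1)


# Hypothetical rows standing in for the immigrant dimension
sample = [
    {"id": "1643294.0", "gender": "M"},
    {"id": "699077.0", "gender": "M"},
    {"id": "1643294.0", "gender": "F"},
]
print(duplicate_keys(sample, "id"))
```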
 
Run Quality Checks

In [100]:
# Create user defined function to check for number of data rows

In [101]:
def check_rows(df):
    return df.count() != 0 

In [102]:
# check immigration (fact)

In [103]:
if check_rows(immigration_fact):
    print("data quality checks (number of rows) passed for immigration_fact")
    print()
else:
    print("data quality checks (number of rows) not passed due to 0 records found in immigration_fact table")

data quality checks (number of rows) passed for immigration_fact



In [104]:
# check dimensions

In [105]:
if all(check_rows(df) for df in (airport, demographics, visa, immigrant)):
    print("data quality checks (number of rows) passed for all dimension tables")
else:
    print("data quality checks (number of rows) not passed due to 0 records found in a dimension table")

data quality checks (number of rows) passed for all dimension tables
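
The dimension check above reports a failure without saying which table is empty. A small variation, sketched here, returns the names of the empty tables. It relies only on each table exposing a `.count()` method, as a Spark DataFrame does. `empty_tables` and `FakeTable` are hypothetical names, and `FakeTable` exists only so the sketch runs without Spark.

```python
def empty_tables(tables):
    """Return the names of tables whose row count is zero.

    `tables` maps a name to any object exposing a .count() method,
    for example a Spark DataFrame.
    """
    return [name for name, table in tables.items() if table.count() == 0]


class FakeTable:
    """Stand-in for a DataFrame so the sketch runs without Spark."""

    def __init__(self, n_rows):
        self.n_rows = n_rows

    def count(self):
        return self.n_rows


print(empty_tables({"airport": FakeTable(3), "visa": FakeTable(0)}))
```

In the notebook, this could be called with a dictionary such as `{"airport": airport, "demographics": demographics, "visa": visa, "immigrant": immigrant}`.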


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

##### Fact Table
 
immigration_fact
<ul> 
    <li>arrdate: arrival date </li>
    <li>state_code: state code of arrival city</li>
    <li>city_code: city code of arrival city</li>
    <li>visa_type: immigrant's visa type</li>
    <li>count: number of immigrants entering the US</li>
</ul>


##### Dimension Table

airport
<ul> 
    <li>ident: id of airport </li>
    <li>type : airport type </li>
    <li>name : name of airport </li>
    <li>continent: continent of airport </li>
    <li>iso_country: country of airport </li>
    <li>iso_region: region of airport </li>
    <li>local_code: local code of airport </li>
    <li>coordinates : coordinates of airport </li>
</ul>

  
demographics
<ul> 
    <li>City : city name </li>
    <li>State : state name </li>
    <li>Median Age : median age of city</li>
    <li>Male Population : male population in city  </li>
    <li>Female Population : female population in city </li> 
    <li>Total Population : total population in city </li>
    <li>State Code : state code </li>
    <li>Race: race of the population </li>
</ul>

        
visa
<ul> 
    <li>visa_code: code of the visa issued</li>
    <li>visa: name of the visa issued </li>
</ul>
    
    
immigrant
<ul> 
    <li>id: id of immigrant</li>
    <li>gender: gender of immigrant </li>
    <li>age: age of immigrant </li>
    <li>visa_type: visa type of immigrant  </li>
</ul>


### Step 5: Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

##### Rationale for the choice of tools and technologies for the project

Apache Spark was chosen for its ability to process big data in a distributed manner; it also uses in-memory caching to optimize query execution. In addition, its libraries let users perform further data transformations to serve other needs.

##### Data update frequency
Data should be updated according to users' reporting needs. If users analyse the data on a daily basis, the data should be updated daily. Otherwise, a monthly update is recommended.

##### Consider the following scenarios
<ul> 
    <li>
If the data were increased by 100x, there might be performance issues when running on a user's own machine, which may not have sufficient RAM to process the data. Users should consider setting up Amazon instances to host Apache Spark.  </li>
  <li>
If the data populates a dashboard that must be updated by 7am every day, users should consider using Airflow DAGs to schedule and automate the data pipeline, and set rules to notify users whether the dashboard has been updated with the latest data by 7am. If the data is not available by 7am, another rule should trigger the pipeline to run again so that the dashboard can be refreshed.  </li>
  <li>
If the database needs to be accessed by 100+ people, users should consider hosting a data warehouse (e.g. an Amazon Redshift cluster) in the cloud, as data availability and accessibility are covered by the provider's service level agreement.  </li>
 </ul> 