# Project Title
### Data Engineering Capstone Project

#### Project Summary
Integrate different data sources to build a data warehouse in parquet format using a star schema to facilitate data analysis.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import numpy as np

### Step 1: Scope the Project and Gather Data

#### Scope 
What data do I use? 

I'm using the data provided by Udacity for this project.

What does my end solution look like?

We'll be able to analyze immigration data (fact) from different perspectives (dimensions).

What tools did I use?

1. Pandas: exploratory data analysis;
2. PySpark: extract, transform, and load (ETL) the raw data into a data lake structure.

#### Describe and Gather Data 

* I94 Immigration Data: This data comes from the US National Tourism and Trade Office. A data dictionary is included in the workspace. [This](https://travel.trade.gov/research/reports/i94/historical/2016.html) is where the data comes from. There's a sample file so I could take a look at the data in csv format before reading it all in. 
* U.S. City Demographic Data: This data comes from OpenDataSoft. You can read more about it [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).

In [2]:
# Read in the data here
immigration_df = pd.read_csv('immigration_data_sample.csv')

In [3]:
print(immigration_df.columns)
immigration_df.head()

Index(['Unnamed: 0', 'cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port',
       'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa',
       'count', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd',
       'entdepu', 'matflag', 'biryear', 'dtaddto', 'gender', 'insnum',
       'airline', 'admnum', 'fltno', 'visatype'],
      dtype='object')


Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [5]:
#write to parquet
df_spark.write.mode('overwrite').parquet("sas_data")
df_spark=spark.read.parquet("sas_data")

### Step 2: Explore and Assess the Data
#### Exploring and cleaning immigration data 


In [6]:
#Renaming columns
fact_immigration = immigration_df[['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port',
       'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa',
       'count', 'biryear', 'dtaddto', 'gender',
       'airline', 'admnum', 'fltno', 'visatype']].rename(columns={
    'cicid':'id_cic',
    'i94yr': 'year',
    'i94mon': 'month',
    'i94cit': 'cod_country_cit',
    'i94res': 'cod_country_res',
    'i94port': 'cod_port',
    'arrdate': 'arrival_date',
    'i94mode': 'cod_mode',
    'i94addr': 'cod_state',
    'depdate' : 'departure_date',
    'i94bir' : 'age',
    'i94visa': 'cod_visa',
    'dtadfile': 'date_added',
    'biryear': 'birth_year',
    'dtaddto': 'date_admitted',
    'admnum': 'adm_num',
    'fltno': 'flight_number',
    'visatype': 'visa_type'
})

fact_immigration.head()

Unnamed: 0,id_cic,year,month,cod_country_cit,cod_country_res,cod_port,arrival_date,cod_mode,cod_state,departure_date,age,cod_visa,count,birth_year,date_admitted,gender,airline,adm_num,flight_number,visa_type
0,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,61.0,2.0,1.0,1955.0,7202016,F,JL,56582670000.0,00782,WT
1,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,26.0,2.0,1.0,1990.0,10222016,M,*GA,94362000000.0,XBLNG,B2
2,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,20571.0,76.0,2.0,1.0,1940.0,7052016,M,LH,55780470000.0,00464,WT
3,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,20581.0,25.0,2.0,1.0,1991.0,10272016,M,QR,94789700000.0,00739,B2
4,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,20553.0,19.0,2.0,1.0,1997.0,7042016,F,,42322570000.0,LAND,WT


In [7]:
#Adjusting types

fact_immigration['id_cic'] = fact_immigration['id_cic'].apply(np.int64)
fact_immigration['year'] = fact_immigration['year'].apply(np.int16)
fact_immigration['month'] = fact_immigration['month'].apply(np.int16)
fact_immigration['cod_country_cit'] = fact_immigration['cod_country_cit'].apply(np.int16)
fact_immigration['cod_country_res'] = fact_immigration['cod_country_res'].apply(np.int16)
fact_immigration['arrival_date'] = pd.to_timedelta(fact_immigration['arrival_date'], unit='D') + pd.Timestamp('1960-1-1')
fact_immigration['departure_date'] = pd.to_timedelta(fact_immigration['departure_date'], unit='D') + pd.Timestamp('1960-1-1')
fact_immigration['cod_mode'] = fact_immigration['cod_mode'].apply(np.int8)
fact_immigration['age'] = fact_immigration['age'].apply(np.int8)
fact_immigration['cod_visa'] = fact_immigration['cod_visa'].apply(np.int8)
fact_immigration['count'] = fact_immigration['count'].apply(np.int8)
fact_immigration['birth_year'] = fact_immigration['birth_year'].apply(np.int16)
fact_immigration['date_admitted'] = pd.to_datetime(fact_immigration['date_admitted'], format='%m%d%Y', errors='coerce')

fact_immigration.head()

Unnamed: 0,id_cic,year,month,cod_country_cit,cod_country_res,cod_port,arrival_date,cod_mode,cod_state,departure_date,age,cod_visa,count,birth_year,date_admitted,gender,airline,adm_num,flight_number,visa_type
0,4084316,2016,4,209,209,HHW,2016-04-22,1,HI,2016-04-29,61,2,1,1955,2016-07-20,F,JL,56582670000.0,00782,WT
1,4422636,2016,4,582,582,MCA,2016-04-23,1,TX,2016-04-24,26,2,1,1990,2016-10-22,M,*GA,94362000000.0,XBLNG,B2
2,1195600,2016,4,148,112,OGG,2016-04-07,1,FL,2016-04-27,76,2,1,1940,2016-07-05,M,LH,55780470000.0,00464,WT
3,5291768,2016,4,297,297,LOS,2016-04-28,1,CA,2016-05-07,25,2,1,1991,2016-10-27,M,QR,94789700000.0,00739,B2
4,985523,2016,4,111,111,CHM,2016-04-06,3,NY,2016-04-09,19,2,1,1997,2016-07-04,F,,42322570000.0,LAND,WT
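
The `arrdate` and `depdate` values are SAS dates, that is, day counts starting at 1960-01-01, which is why the cell above adds a timedelta to that epoch. A minimal sketch of the same conversion on the first sample row (the helper name `sas_days_to_date` is made up for illustration and is not part of the project code):

```python
import pandas as pd

SAS_EPOCH = pd.Timestamp("1960-01-01")  # day 0 on the SAS date scale

def sas_days_to_date(days):
    """Convert a Series of SAS day counts to pandas timestamps (NaN becomes NaT)."""
    return pd.to_timedelta(days, unit="D") + SAS_EPOCH

# arrdate and depdate of the first row in the sample file
sample = pd.Series([20566.0, 20573.0])
print(sas_days_to_date(sample).dt.strftime("%Y-%m-%d").tolist())
# ['2016-04-22', '2016-04-29']
```

These match the `arrival_date` and `departure_date` values shown for row 0 above.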


In [8]:
fact_immigration

Unnamed: 0,id_cic,year,month,cod_country_cit,cod_country_res,cod_port,arrival_date,cod_mode,cod_state,departure_date,age,cod_visa,count,birth_year,date_admitted,gender,airline,adm_num,flight_number,visa_type
0,4084316,2016,4,209,209,HHW,2016-04-22,1,HI,2016-04-29,61,2,1,1955,2016-07-20,F,JL,5.658267e+10,00782,WT
1,4422636,2016,4,582,582,MCA,2016-04-23,1,TX,2016-04-24,26,2,1,1990,2016-10-22,M,*GA,9.436200e+10,XBLNG,B2
2,1195600,2016,4,148,112,OGG,2016-04-07,1,FL,2016-04-27,76,2,1,1940,2016-07-05,M,LH,5.578047e+10,00464,WT
3,5291768,2016,4,297,297,LOS,2016-04-28,1,CA,2016-05-07,25,2,1,1991,2016-10-27,M,QR,9.478970e+10,00739,B2
4,985523,2016,4,111,111,CHM,2016-04-06,3,NY,2016-04-09,19,2,1,1997,2016-07-04,F,,4.232257e+10,LAND,WT
5,1481650,2016,4,577,577,ATL,2016-04-08,1,GA,2016-06-01,51,2,1,1965,2016-10-07,M,DL,7.368526e+08,910,B2
6,2197173,2016,4,245,245,SFR,2016-04-12,1,CA,2016-06-30,48,2,1,1968,2016-10-11,F,CX,7.863122e+08,870,B2
7,232708,2016,4,113,135,NYC,2016-04-02,1,NY,2016-04-10,33,2,1,1983,2016-06-30,F,BA,5.547449e+10,00117,WT
8,5227851,2016,4,131,131,CHI,2016-04-28,1,IL,2016-05-01,39,2,1,1977,2016-07-26,,LX,5.941342e+10,00008,WT
9,13213,2016,4,116,116,LOS,2016-04-01,1,CA,2016-04-09,35,2,1,1981,2016-06-29,,AA,5.544979e+10,00109,WT


In [9]:
country_df = pd.read_csv('I94_SAS_Labels_Descriptions.SAS', sep = '=', header=None, names=['cod_country', 'country'], skiprows=9, nrows=289, index_col=False)
country_df['country'] = country_df['country'].str.strip().str.replace("'", "")
country_df.head()

Unnamed: 0,cod_country,country
0,582,"MEXICO Air Sea, and Not Reported (I-94, no lan..."
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA


In [10]:
port_df = pd.read_csv('I94_SAS_Labels_Descriptions.SAS', sep = '=', header=None, names=['cod_port', 'port'], skiprows=302, nrows=660, index_col=False)
port_df['cod_port'] = port_df['cod_port'].str.strip().str.replace("'", "")
port_df['port'] = port_df['port'].str.strip().str.strip("'")
port_df.head()

Unnamed: 0,cod_port,port
0,ALC,"ALCAN, AK"
1,ANC,"ANCHORAGE, AK"
2,BAR,"BAKER AAF - BAKER ISLAND, AK"
3,DAC,"DALTONS CACHE, AK"
4,PIZ,"DEW STATION PT LAY DEW, AK"


In [11]:
mode_df = pd.read_csv('I94_SAS_Labels_Descriptions.SAS', sep = '=', header=None, names=['cod_mode', 'mode'], skiprows=972, nrows=4, index_col=False)
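# "'|;" is a regular expression; pandas >= 2.0 treats patterns as literal text unless regex=True
mode_df['mode'] = mode_df['mode'].str.strip().str.replace("'|;", "", regex=True)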
mode_df.head()

Unnamed: 0,cod_mode,mode
0,1,Air
1,2,Sea
2,3,Land
3,9,Not reported


In [12]:
visa_df = pd.read_csv('I94_SAS_Labels_Descriptions.SAS', sep = '=', header=None, names=['cod_visa', 'visa'], skiprows=1046, nrows=3, index_col=False)
visa_df['visa'] = visa_df['visa'].str.strip()
visa_df.head()

Unnamed: 0,cod_visa,visa
0,1,Business
1,2,Pleasure
2,3,Student
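
The four lookup cells above depend on fixed `skiprows` and `nrows` offsets, so they would need adjusting if the labels file ever changed. As a hedged alternative, the sketch below picks out `code = 'label'` lines by pattern instead of by position. The sample lines are an assumption about how the file is laid out, and `LABEL_LINE` and `parse_labels` are hypothetical names, not part of `etl.py`:

```python
import re
import pandas as pd

# Matches lines such as:  236 =  'AFGHANISTAN'   or   'ALC' = 'ALCAN, AK   '
LABEL_LINE = re.compile(r"^\s*'?(?P<code>[^'=\s]+)'?\s*=\s*'(?P<label>[^']*)'")

def parse_labels(lines, code_name, label_name):
    """Collect (code, label) pairs from lines that look like code = 'label'."""
    rows = []
    for line in lines:
        match = LABEL_LINE.match(line)
        if match:  # comments and other statements are skipped
            rows.append((match.group("code"), match.group("label").strip()))
    return pd.DataFrame(rows, columns=[code_name, label_name])

# Assumed examples of the line shapes found in the labels file
sample = [
    "   236 =  'AFGHANISTAN'",
    "   'ALC'\t=\t'ALCAN, AK             '",
    "\t9 = 'Not reported' ;",
    "/* a comment line that should be skipped */",
]
print(parse_labels(sample, "code", "label"))
```

Codes come back as strings, so numeric codes would still need a cast. Unquoted labels, which the visa section appears to use, would need a looser pattern.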


#### Exploring and cleaning U.S. City Demographic Data

In [13]:
demographics_df = pd.read_csv('us-cities-demographics.csv', sep=';')
demographics_df.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [14]:
dim_demographics = demographics_df[['State Code', 'State', 'Male Population', 'Female Population', 'Total Population', 'Number of Veterans', 'Foreign-born']].groupby(['State Code', 'State'], as_index=False).sum()
dim_demographics.columns = ['state_code', 'state', 'male_population', 'female_population', 'total_population', 'number_veterans', 'foreign_born']
dim_demographics.head()

Unnamed: 0,state_code,state,male_population,female_population,total_population,number_veterans,foreign_born
0,AK,Alaska,764725.0,728750.0,1493475,137460.0,166290.0
1,AL,Alabama,2448200.0,2715106.0,5163306,352896.0,252541.0
2,AR,Arkansas,1400724.0,1482165.0,2882889,154390.0,307753.0
3,AZ,Arizona,11137275.0,11360435.0,22497710,1322525.0,3411565.0
4,CA,California,61055672.0,62388681.0,123444353,4617022.0,37059662.0
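
One caveat on this aggregation: the sample rows carry `Race` and `Count` columns, which suggests the file holds one row per city and race. If that is the case, summing the population columns directly counts each city once per race, which would explain totals such as 123 million for California. A hedged sketch, on made-up numbers, of dropping the repeated city rows before grouping:

```python
import pandas as pd

# Made-up rows: one city repeated once per race, as the source file appears to do
demo = pd.DataFrame({
    "City": ["Springfield", "Springfield", "Shelbyville"],
    "State Code": ["XX", "XX", "XX"],
    "Total Population": [100, 100, 50],
    "Race": ["White", "Asian", "White"],
    "Count": [70, 30, 50],
})

# Summing every row counts Springfield twice
naive = demo.groupby("State Code")["Total Population"].sum()

# Keep one row per city before summing the city-level columns
per_city = demo.drop_duplicates(subset=["City", "State Code"])
deduped = per_city.groupby("State Code")["Total Population"].sum()

print(naive["XX"], deduped["XX"])  # 250 150
```

Whether this applies to the real file is worth checking, for example by counting rows per `City` and `State Code`.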


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

The purpose of this model is to facilitate OLAP queries. Therefore, I've used a star schema.

![schema.png](./images/schema.png)

#### 3.2 Data Pipeline steps

1. Assumptions
    * I94_SAS_Labels_Descriptions.SAS file must be present in the root directory of the project;
    * us-cities-demographics.csv file must be present in the root directory of the project;
    * ../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat file must be present.
2. `python etl.py` to create parquet files in the datalake folder;
3. `python data_quality.py` to run data quality checks.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model

Run the etl script.

In [15]:
%run etl.py

Creating lookup dims...
Countries...
Ports...
Modes...
1 Air
2 Sea
3 Land
9 Not reported 
Visa...
Creating US demographic dim...
Creating immigration fact...
Done.


#### 4.2 Data Quality Checks

 * Count checks to ensure completeness.
 
Run Quality Checks

In [16]:
%run data_quality.py

Checking lookup dims...
country...
Ok. Found 289 records.
port...
Ok. Found 358 records.
mode...
Ok. Found 4 records.
visa...
Ok. Found 3 records.
us_demographic...
Ok. Found 49 records.
immigration...
Ok. Found 3096313 records.
Done.
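
`data_quality.py` itself is not shown here. As a rough idea of what a count check of this kind might look like, here is a minimal sketch that only mirrors the output format above. The function name `check_row_count` and its `minimum` parameter are assumptions, not the script's actual interface:

```python
def check_row_count(name, row_count, minimum=1):
    """Raise if a table has fewer rows than expected, otherwise report the count."""
    if row_count < minimum:
        raise ValueError(
            f"{name}: expected at least {minimum} records, found {row_count}"
        )
    message = f"Ok. Found {row_count} records."
    print(f"{name}...")
    print(message)
    return message

# Counts copied from the output above; with Spark they could come from df.count()
for table, count in [("mode", 4), ("visa", 3)]:
    check_row_count(table, count)
```

Raising an exception instead of printing a warning makes an empty table stop the run, which is usually what a completeness check is for.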


#### 4.3 Data dictionary 

Dimensions

* country
    * **cod_country (PK)** INT
    * country TEXT

* port
    * **cod_port (PK)** TEXT
    * port TEXT
    
* mode
    * **cod_mode (PK)** INT
    * mode TEXT
    
* visa
    * **cod_visa (PK)** INT
    * visa TEXT

* us_demographic
    * **state_code (PK)** TEXT
    * state TEXT
    * male_population INT - SUM of the male population grouped by state
    * female_population INT - SUM of the female population grouped by state
    * total_population INT - SUM of the total population grouped by state
    * number_veterans INT - SUM of the number of veterans grouped by state
    * foreign_born INT - SUM of the foreign born grouped by state
    
Fact

* immigration
    * **id_cic (PK)** INT - ID CIC
    * year INT - 4 digit year
    * month INT - month of immigration
    * **cod_country_cit (FK)** INT - Country of citizenship
    * **cod_country_res (FK)** INT - Country of residence
    * **cod_port (FK)** TEXT - Port of entry code
    * arrival_date DATE - Arrival date
    * **cod_mode (FK)** INT - Mode code
    * **cod_state (FK)** TEXT - State code
    * departure_date DATE - Departure date
    * age INT - Age
    * **cod_visa (FK)** INT - Visa code
    * count INT - Used for summary statistics 
    * birth_year INT - Birth year
    * date_admitted DATE - Admission date
    * gender TEXT - Gender
    * airline TEXT - Airline
    * adm_num INT - Admission number
    * flight_number TEXT - Flight Number
    * visa_type TEXT - Visa type

The fact table was partitioned by year and month to scale up better if necessary.

### Step 5: Complete Project Write Up

##### Target Audience

* Students learning data pipeline concepts.
* Data professionals who want to practice their skills.

##### Some example questions

* Which state receives the most new immigrants?
* How do they arrive (air, sea, etc.)?
* What kind of visa do they have (student, pleasure, etc.)?

##### Tools and Technologies

* Pandas: exploratory data analysis.
* PySpark: distributed data processing framework.
* Parquet: columnar storage format that works well with distributed processing.

##### Data update frequency

* We can update our fact table monthly because it is partitioned by year and month. The other tables rarely change, so we can update them once a year.

##### Some scenarios

* The data was increased by 100x.

 We can migrate our local datalake to AWS S3 and our data pipeline to AWS EMR.

* The data populates a dashboard that must be updated on a daily basis by 7am every day.

 We can use Apache Airflow to schedule our data pipeline and use an SLA to check this constraint.

* The database needed to be accessed by 100+ people.

 We can create a data pipeline to load our S3 data lake into a Redshift cluster. Redshift can handle this level of concurrent access.

### Step 6: Answering the question: How do they arrive (air, sea, etc.)?

In [17]:
immigration = spark.read.parquet("./datalake/immigration")
immigration.createOrReplaceTempView("immigration")

mode = spark.read.parquet("./datalake/mode")
mode.createOrReplaceTempView("mode")

spark.sql("""
    SELECT m.mode, count(*) as total 
    FROM immigration i 
    INNER JOIN mode m 
    ON i.cod_mode = m.cod_mode
    GROUP BY m.mode
    ORDER BY 2 desc
""").toPandas()

Unnamed: 0,mode,total
0,Air,2994505
1,Land,66660
2,Sea,26349
3,Not reported,8560


As expected, most immigrants came by plane.
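
The four totals add up to 3,096,074, which is 239 fewer than the 3,096,313 records reported by the quality check. One plausible explanation, which is an assumption and has not been verified against the data, is that rows with a missing or unmatched `cod_mode` are dropped by the `INNER JOIN`. The pandas sketch below uses made-up rows to show the effect and how a left join keeps such rows:

```python
import pandas as pd

# Made-up fact rows; the last one has no mode code
immigration = pd.DataFrame({
    "id_cic": [1, 2, 3, 4],
    "cod_mode": [1.0, 1.0, 3.0, None],
})
mode = pd.DataFrame({
    "cod_mode": [1.0, 2.0, 3.0, 9.0],
    "mode": ["Air", "Sea", "Land", "Not reported"],
})

# An inner join silently drops the row without a matching code
inner = immigration.merge(mode, on="cod_mode", how="inner")

# A left join keeps it, so it can be labelled explicitly
left = immigration.merge(mode, on="cod_mode", how="left")
left["mode"] = left["mode"].fillna("Unknown")

print(len(inner), len(left))  # 3 4
```

In the Spark SQL query, the equivalent change would be a `LEFT JOIN` with `COALESCE(m.mode, 'Unknown')`.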