# Analyze I94 Immigration Data and the U.S. City Demographic Data
### Data Engineering Capstone Project

#### Project Summary
In this project we analyze two datasets: the I94 Immigration Data and the U.S. City Demographic Data. Based on these data we can look for correlations between immigration and total population in certain states. Additionally, we can examine how much immigration influences the foreign-born statistic, or whether states with higher foreign-born values attract more immigration.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [188]:
# Do all imports and installs here
import pandas as pd
import configparser
import logging
import os
import datetime as dt
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import udf, dayofmonth, dayofweek, month, year, weekofyear
from pyspark.sql.types import StringType, IntegerType

### Step 1: Scope the Project and Gather Data

#### Scope 

For this project Apache Spark is used to analyze the datasets and prepare the data. The following datasets are used: the I94 Immigration Data and the U.S. City Demographic Data.
As the immigration dataset is very large, we use only the data for April 2016. This dataset alone contains over 3 million rows, which is enough to fulfill the project scope regarding the number of records. To enrich the data further, the demographic data is loaded so that correlations between the two datasets can be analyzed.

The data is loaded into dataframes and cleaned of NaN values and empty rows. Additional dimension tables are generated from the immigration data and the demographic data. In the Jupyter notebook we first analyze the data to find the best way to clean it and to set up the data model. The focus is on local execution, but the path variables in the configuration file `capstone.cfg` are also prepared so that the pipeline can be executed in the cloud with EMR. The ETL pipeline file is `etl.py`.



##### I94 Immigration Data Description
This data comes from the US National Tourism and Trade Office. A data dictionary is included in the workspace.
The data is stored locally in the workspace, separately per month, in the SAS format. As a test we use only the data file for April, `i94_apr16_sub.sas7bdat`. The path to the dataset is stored in the configuration file `capstone.cfg` under the key `['DATASETS']['IMMIGRATION_DATA_PATH_LOCAL']`. To be a valid dataset for the capstone project it needs to contain at least 1 million rows.
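
The content of `capstone.cfg` is not shown in this notebook, so the following is only a sketch of what its `[DATASETS]` section might look like. The key names are the ones read in the cells of this notebook; the values are inferred from the printed file paths and are assumptions, as are the names `SAMPLE_CFG` and `sample_config`.

```python
import configparser
import os

# Hypothetical content of capstone.cfg: key names come from the notebook,
# values are inferred from the printed file paths and may differ.
SAMPLE_CFG = """
[DATASETS]
IMMIGRATION_DATA_PATH_LOCAL = datasets/18-83510-I94-Data-2016
IMMIGRATION_DATA_FILE = i94_apr16_sub.sas7bdat
DEMOGRAPHIC_DATA_PATH_LOCAL = datasets
DEMOGRAPHIC_DATA_FILE = us-cities-demographics.csv
"""

# Parse the sample from a string instead of a file so the sketch is self-contained.
sample_config = configparser.ConfigParser()
sample_config.read_string(SAMPLE_CFG)

datasets = sample_config['DATASETS']
sample_immigration_file = os.path.join(datasets['IMMIGRATION_DATA_PATH_LOCAL'],
                                       datasets['IMMIGRATION_DATA_FILE'])
print('File path: {}'.format(sample_immigration_file))
```

Keeping the paths in a configuration file means the same code can point at local files or at cloud storage by changing only the file, not the notebook or `etl.py`.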

In [138]:
config = configparser.ConfigParser()
config.read_file(open('capstone.cfg'))
imigration_file_path= config['DATASETS']['IMMIGRATION_DATA_PATH_LOCAL']
imigration_data_files=config['DATASETS']['IMMIGRATION_DATA_FILE'] 
imigration_path_and_file = os.path.join(imigration_file_path, imigration_data_files)
print('File path: {}'.format(imigration_path_and_file))

# Loading the immigration data with Spark: downloading the package does not work, so we work with pandas dataframes instead
#spark = SparkSession.builder.\
#config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
#.enableHiveSupport().getOrCreate()
#df =spark.read.format('com.github.saurfang.sas.spark').load(imigration_path_and_file)

# Load the immigration dataset into pandas
df = pd.read_sas(imigration_path_and_file, format='sas7bdat', encoding='ISO-8859-1')

count_all_imigration_records = len(df)
print('Number of rows: {}'.format(count_all_imigration_records))

File path: datasets/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat
Number of rows: 3096313


In [110]:
# Validate that the dataset has more than 1 million rows
assert count_all_imigration_records > 1000000, 'Dataset must have more than 1 million rows'
print('Total number of imigration records: {}'.format(count_all_imigration_records))
num_of_columns_before_cleanup = df.shape[1]
print('Total number of columns: {}'.format(num_of_columns_before_cleanup))

Total number of imigration records: 3096313
Total number of columns: 28


In [111]:
# Check dataset structure
df.head(7)

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2
5,18.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MI,20555.0,...,,M,1959.0,09302016,,,AZ,92471040000.0,602.0,B1
6,19.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NJ,20558.0,...,,M,1953.0,09302016,,,AZ,92471400000.0,602.0,B2


In [112]:
# Print data types of the immigration dataset
# A detailed description is found in the SAS file in the dataset directory
df.dtypes

cicid       float64
i94yr       float64
i94mon      float64
i94cit      float64
i94res      float64
i94port      object
arrdate     float64
i94mode     float64
i94addr      object
depdate     float64
i94bir      float64
i94visa     float64
count       float64
dtadfile     object
visapost     object
occup        object
entdepa      object
entdepd      object
entdepu      object
matflag      object
biryear     float64
dtaddto      object
gender       object
insnum       object
airline      object
admnum      float64
fltno        object
visatype     object
dtype: object

##### U.S. City Demographic Data

In [113]:
config.read_file(open('capstone.cfg'))
demographic_file_path= config['DATASETS']['DEMOGRAPHIC_DATA_PATH_LOCAL']
demographic_file_name=config['DATASETS']['DEMOGRAPHIC_DATA_FILE'] 
demographic_path_and_file = os.path.join(demographic_file_path, demographic_file_name)
print('File path: {}'.format(demographic_path_and_file))

df_demographic = pd.read_csv(demographic_path_and_file,sep=';', header=0)
count_all_demographic_recrods = len(df_demographic)
print('Number of demographic rows: {}'.format(count_all_demographic_recrods))

File path: datasets/us-cities-demographics.csv
Number of demographic rows: 2891


In [114]:
df_demographic.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [115]:
# Print data types of demographic
df_demographic.dtypes

City                       object
State                      object
Median Age                float64
Male Population           float64
Female Population         float64
Total Population            int64
Number of Veterans        float64
Foreign-born              float64
Average Household Size    float64
State Code                 object
Race                       object
Count                       int64
dtype: object

### Step 2: Explore and Assess the Data



#### Explore the I94 Immigration Data
We first look for the columns with the most missing values. As a threshold, any column with more than 70% missing values is removed from the dataset.

In [116]:
percent_missing = df.isnull().sum() * 100 / len(df)

missing_value_df = pd.DataFrame({'column_name': df.columns,
                                 'percent_missing': percent_missing})

missing_value_df.sort_values('percent_missing', ascending=False, inplace=True)

# Show all columns that have more than 70% missing values
missing_value_df_above_70 = missing_value_df[missing_value_df.percent_missing > 70]

missing_value_df_above_70

Unnamed: 0,column_name,percent_missing
entdepu,entdepu,99.98734
occup,occup,99.737559
insnum,insnum,96.327632


###### Cleanup of I94 Immigration Data
For the immigration data we remove every column that has more than 70% missing values. We also remove rows with duplicate primary keys and drop all rows with a NaN primary key.

In [117]:
cleaned_df = df.drop(columns=missing_value_df_above_70['column_name'].tolist())
num_columns_after_cleanup = cleaned_df.shape[1]
print('Number of columns after cleanup: {}'.format(num_columns_after_cleanup))

Number of columns after cleanup: 25


Let's remove all duplicates and all rows with NaN values in the primary key `cicid`.

In [118]:
cleaned_df = cleaned_df.drop_duplicates(subset='cicid')
cleaned_df = cleaned_df.dropna(subset = ['cicid'])

print('Number of rows before cleanup: {}'.format(count_all_imigration_records))
print('Number of rows after cleanup: {}'.format(len(cleaned_df)))
print('Number of columns before cleanup: {}'.format(num_of_columns_before_cleanup))
print('Number of columns after cleanup: {}'.format(num_columns_after_cleanup))

Number of rows before cleanup: 3096313
Number of rows after cleanup: 3096313
Number of columns before cleanup: 28
Number of columns after cleanup: 25


#### Explore the U.S. City Demographic Data

We again look for the columns with the most missing values. Because this dataset is much more complete, we list every column with more than 0.1% missing values instead of applying the 70% threshold.

In [119]:
percent_missing_dem = df_demographic.isnull().sum() * 100 / len(df_demographic)

missing_value_df_dem = pd.DataFrame({'column_name': df_demographic.columns,
                                 'percent_missing': percent_missing_dem})

missing_value_df_dem.sort_values('percent_missing', ascending=False, inplace=True)

missing_value_df_above_01_dem = missing_value_df_dem[missing_value_df_dem.percent_missing > 0.1]
missing_value_df_above_01_dem

Unnamed: 0,column_name,percent_missing
Average Household Size,Average Household Size,0.553442
Number of Veterans,Number of Veterans,0.449671
Foreign-born,Foreign-born,0.449671
Male Population,Male Population,0.10377
Female Population,Female Population,0.10377


As no column has a significant share of NaN values, we continue by searching for other data inconsistencies.
Next we check for duplicate rows and rows with NaN values. The primary key must combine multiple columns: `'City','Race','State Code'`

In [122]:
clean_df_demographic = df_demographic.drop_duplicates(subset=['City','Race','State Code'])
clean_df_demographic = clean_df_demographic.dropna()

print('Number of rows before cleanup: {}'.format(count_all_demographic_recrods))
print('Number of rows after cleanup: {}'.format(len(clean_df_demographic)))

Number of rows before cleanup: 2891
Number of rows after cleanup: 2875


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
![Datamodel](img/data_model_3.png)

The fact table comes from the immigration dataset. In its raw form this dataset contains 28 columns. After removing the columns that consist mostly of NaN values, the fact table is reduced to 25 columns. The primary key is the cicid column, which we rename to *id*.

The dimension tables are generated from the immigration dataset and additionally from the demographics dataset.
The *dim_port_code* dimension table is generated from the description of the immigration dataset. The port codes are saved in a separate CSV file, [port_codes](datasets/port_codes.csv).

The *dim_arr_date* table is created from the arrival date of the immigration dataset. The data is additionally enriched with the day, week, month, year, and weekday derived from the arrival date. The *dim_visa_type* table contains the visa types from the immigration dataset. The *dim_demographics* table contains data from the demographics dataset at the US state level. The link to the immigration dataset is the arrival state in the immigration fact table.
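
The demographics file has one row per city and race, while *dim_demographics* is described at the state level. The following is a minimal sketch of one way to aggregate city rows to states with pandas. It uses made-up toy rows, and the output column names are taken from the data dictionary; whether `etl.py` aggregates in exactly this way is an assumption.

```python
import pandas as pd

# Toy rows in the shape of us-cities-demographics.csv (values are made up).
toy_demographics = pd.DataFrame({
    'City': ['A', 'A', 'B', 'C'],
    'State Code': ['NY', 'NY', 'NY', 'CA'],
    'Race': ['White', 'Asian', 'White', 'White'],
    'Total Population': [100, 100, 50, 200],
    'Foreign-born': [20.0, 20.0, 5.0, 80.0],
    'Count': [70, 30, 50, 200],
})

# City-level figures repeat on every race row, so keep one row per city first.
per_city = toy_demographics.drop_duplicates(subset=['City', 'State Code'])

# Sum the city-level figures to the state level.
per_state = (per_city
             .groupby('State Code', as_index=False)[['Total Population', 'Foreign-born']]
             .sum()
             .rename(columns={'State Code': 'state_code',
                              'Total Population': 'total_population',
                              'Foreign-born': 'foreign_born'}))
print(per_state)
```

Dropping the repeated race rows before summing matters: without that step, a city with several race rows would be counted several times in the state totals.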

Based on the correlations between the fact table and the demographics dimension, we can look for relations between states with a high population and immigration arrivals. Further relations could be discovered between immigration arrivals and the foreign-born values or household sizes in certain states.
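
As a rough illustration of such an analysis, the sketch below counts arrivals per state and correlates them with state-level demographics. All data is made up, and the frame names `toy_fact` and `toy_states` are hypothetical; the column names follow the data dictionary.

```python
import pandas as pd

# Toy fact rows: one row per arrival, 'state' is the arrival state (made-up data).
toy_fact = pd.DataFrame({'id': [1, 2, 3, 4, 5, 6],
                         'state': ['NY', 'NY', 'NY', 'CA', 'CA', 'TX']})

# Toy state-level demographics (made-up numbers).
toy_states = pd.DataFrame({'state_code': ['NY', 'CA', 'TX'],
                           'total_population': [300, 200, 100],
                           'foreign_born': [90, 70, 10]})

# Count arrivals per state and join them to the demographics.
arrivals = toy_fact.groupby('state').size().reset_index(name='arrivals')
joined = arrivals.merge(toy_states, left_on='state', right_on='state_code')

# Pearson correlation of arrivals with the two demographic columns.
print(joined[['arrivals', 'total_population', 'foreign_born']].corr()['arrivals'])
```

A correlation computed this way only describes the sampled month and says nothing about causation, so it is a starting point for the questions above rather than an answer.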


#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model

Let's first create the dimension tables from the cleaned datasets.

##### 4.1.1 Analyze content of visatype dimension table

In [162]:
# Get the content of the visatype dimension table
distinct_visatype = df['visatype'].unique()
visatype_dimensions = pd.DataFrame(distinct_visatype)
visatype_dimensions

Unnamed: 0,0
0,B2
1,F1
2,B1
3,WT
4,WB
5,E2
6,I
7,F2
8,E1
9,M1
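
The data dictionary lists a `visa_type_id` key for the visa type dimension. A minimal pandas sketch of how such a surrogate key could be generated and written back to the fact rows is shown here. The toy data and the frame names are made up, and the real pipeline may build the key differently.

```python
import pandas as pd

# Toy immigration rows; only the visatype column matters here (made-up data).
toy_immigration = pd.DataFrame({'cicid': [6.0, 7.0, 15.0, 16.0],
                                'visatype': ['B2', 'F1', 'B2', 'B1']})

# One row per distinct visa type, with a generated surrogate key.
toy_dim_visa_type = (toy_immigration[['visatype']]
                     .drop_duplicates()
                     .rename(columns={'visatype': 'visa_type'})
                     .reset_index(drop=True))
toy_dim_visa_type['visa_type_id'] = toy_dim_visa_type.index

# Replace the visa type text in the fact rows with the foreign key.
toy_fact = (toy_immigration
            .merge(toy_dim_visa_type, left_on='visatype', right_on='visa_type')
            .drop(columns=['visatype', 'visa_type']))
print(toy_dim_visa_type)
print(toy_fact)
```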


##### 4.1.2 Analyze port dimension table

In [161]:
config.read_file(open('capstone.cfg'))
port_file_path= config['DATASETS']['PORT_DATA_PATH_LOCAL']
port_file_name=config['DATASETS']['PORT_DATA_DATA_FILE'] 
port_path_and_file = os.path.join(port_file_path, port_file_name)
print('File path for port data: {}'.format(port_path_and_file))
df_port = pd.read_csv(port_path_and_file,sep=';', header=0)
df_port.head(10)

File path for port data: datasets/port_codes.csv


Unnamed: 0,port,city,state
0,ALC,ALCAN,AK
1,ANC,ANCHORAGE,AK
2,BAR,BAKERAAF-BAKERISLAND,AK
3,DAC,DALTONSCACHE,AK
4,PIZ,DEWSTATIONPTLAYDEW,AK
5,DTH,DUTCHHARBOR,AK
6,EGL,EAGLE,AK
7,FRB,FAIRBANKS,AK
8,HOM,HOMER,AK
9,HYD,HYDER,AK


##### 4.1.3 Analyze arrival date table

In [202]:
def create_arr_date_table(df):
    """
    Creates the arrival date table from the immigration dataframe
    :param df: Spark dataframe of immigration data
    :return: arr_date table with the arrival date broken down into single values
    """

    # A SAS date value is the number of days between January 1, 1960, and a specified date.
    # This UDF converts a SAS date value to an ISO date string.
    @udf(StringType())
    def convert_sas_to_datetime(x):
        if x is not None:
            # Add the day offset to the SAS epoch first, then format the resulting date
            return (dt.datetime(1960, 1, 1).date() + dt.timedelta(days=x)).isoformat()
        return None

    arr_date_table = df.select(['arrdate']).withColumn("arr_date", convert_sas_to_datetime(df.arrdate)).distinct()
    arr_date_table = arr_date_table.withColumn('arrival_day', dayofmonth('arr_date'))
    arr_date_table = arr_date_table.withColumn('arrival_week', weekofyear('arr_date'))
    arr_date_table = arr_date_table.withColumn('arrival_month', month('arr_date'))
    arr_date_table = arr_date_table.withColumn('arrival_year', year('arr_date'))
    arr_date_table = arr_date_table.withColumn('arrival_weekday', dayofweek('arr_date'))
    # Generate a unique (not consecutive) id for every row
    arr_date_table = arr_date_table.withColumn('id', F.monotonically_increasing_id())
    return arr_date_table
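
To make the SAS date conversion concrete, here is a small plain-Python sketch without Spark. The helper name `sas_days_to_iso` is hypothetical; the input `20545.0` is the `arrdate` value of several rows in the `df.head(7)` output.

```python
import datetime as dt
import math

SAS_EPOCH = dt.date(1960, 1, 1)  # day 0 of the SAS calendar


def sas_days_to_iso(days):
    """Convert a SAS date value (days since 1960-01-01) to an ISO date string."""
    # Missing values arrive as None or as NaN when the data comes from pandas.
    if days is None or math.isnan(days):
        return None
    return (SAS_EPOCH + dt.timedelta(days=int(days))).isoformat()


print(sas_days_to_iso(20545.0))  # 2016-04-01
```

The result falls in April 2016, which matches the month of the data file used in this notebook.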

#### 4.2 Data Quality Checks

The immigration dataset and the dimension datasets are validated to ensure that they do not contain any empty ids. Additionally, the generated dimension tables visa_type and arr_date are validated to ensure that they have the same number of distinct records as the fact table.
 
Example quality checks of immigration dataset:

In [204]:
def data_quality_check_immigration(spark, immigration_fact, dimension_port_code, dimension_arr_date,
                                   dimension_visa_type):
    """
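    Checks that no table contains null values in its primary key. Dimension table checks are also executed
    :param spark: spark session context
    :param immigration_fact: immigration dataframe
    :param dimension_port_code:  port_code dataframe
    :param dimension_arr_date:  arr_date dimension dataframe
    :param dimension_visa_type: visa_type dimension
    :return:
    """
    # Register the dataframes as temp views so that the SQL checks below can query them by name
    immigration_fact.createOrReplaceTempView('immigration_fact')
    dimension_port_code.createOrReplaceTempView('dimension_port_code')
    dimension_arr_date.createOrReplaceTempView('dimension_arr_date')
    dimension_visa_type.createOrReplaceTempView('dimension_visa_type')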
    sql_check = [
        ('SELECT COUNT(*) FROM immigration_fact WHERE id IS NULL', 0),
        ('SELECT COUNT(*) FROM dimension_port_code WHERE port_code IS NULL', 0),
        ('SELECT COUNT(*) FROM dimension_arr_date WHERE arr_date IS NULL', 0),
        ('SELECT COUNT(*) FROM dimension_visa_type WHERE visa_type IS NULL', 0)
    ]
    for query in sql_check:
        print('--------------------EXECUTING SQL TEST-------------------------')
        sql_query, expected_result = query
        print("Validating query:{} , Expecting result {}".format(sql_query, expected_result))
        result = spark.sql(sql_query)
        new_count = result.collect()[0][0]
        if new_count != expected_result:
            raise ValueError("ERROR Validating query:{} , Expected result: {}, Returned_result: {}".format(sql_query,
                                                                                                           expected_result,
                                                                                                           new_count))
        else:
            print("Validation OK! Validating query:{} , Expected result: {}, Returned_result: {}".format(sql_query,
                                                                                                         expected_result,
                                                                                                         new_count))

    # Compare the distinct arrival dates in the fact table with the arr_date dimension
    distinct_arr_date_fact = immigration_fact.select("arr_date").distinct().count()
    distinct_arr_date_dimension = dimension_arr_date.distinct().count()
    if distinct_arr_date_fact != distinct_arr_date_dimension:
        raise ValueError(
            "arr_date in fact table {} are not equal arr_date in dimension table {}".format(distinct_arr_date_fact,
                                                                                            distinct_arr_date_dimension))

    # Compare the distinct visa types in the fact table with the visa_type dimension
    distinct_visa_type_fact = immigration_fact.select("visa_type_id").distinct().count()
    distinct_visa_type_dimension = dimension_visa_type.distinct().count()
    if distinct_visa_type_fact != distinct_visa_type_dimension:
        raise ValueError(
            "visa_type in fact table {} are not equal visa_type in dimension table {}".format(distinct_visa_type_fact,
                                                                                              distinct_visa_type_dimension))
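
The same two kinds of check can be tried out quickly on small pandas frames before running the Spark pipeline. This is only a sketch with made-up data; the helper names `check_no_null_keys` and `check_same_distinct_count` are hypothetical and not part of `etl.py`.

```python
import pandas as pd


def check_no_null_keys(table, key, name):
    """Raise ValueError if the key column of a table contains null values."""
    null_count = int(table[key].isnull().sum())
    if null_count != 0:
        raise ValueError('{}: {} null values in key column {}'.format(name, null_count, key))


def check_same_distinct_count(fact, dimension, column):
    """Raise ValueError if fact and dimension disagree on the number of distinct values."""
    fact_count = fact[column].nunique()
    dimension_count = dimension[column].nunique()
    if fact_count != dimension_count:
        raise ValueError('{}: {} distinct values in fact table, {} in dimension table'
                         .format(column, fact_count, dimension_count))


# Toy tables (made-up data) that pass both checks.
toy_fact = pd.DataFrame({'id': [1, 2, 3], 'visa_type_id': [0, 1, 0]})
toy_dim_visa_type = pd.DataFrame({'visa_type_id': [0, 1], 'visa_type': ['B2', 'F1']})

check_no_null_keys(toy_fact, 'id', 'immigration_fact')
check_same_distinct_count(toy_fact, toy_dim_visa_type, 'visa_type_id')
print('All toy checks passed')
```

Raising an exception instead of only printing a message makes a failed check stop the pipeline, which is the behavior a scheduled run would need.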

#### 4.3 Data dictionary 

<center><b>fact_immigration</b></center>
<br/>
    
| Column        | Description   |
| :------------- |:--------------|
| id            | created from cicid, used as primary key |
| arr_date      | arrival date in the USA |
| arr_mode      | arrival mode, original field is i94mode |
| dep_date      | departure date |
| age           | age at arrival, from the I94BIR column |
| visa_code     | I94VISA - Visa codes collapsed into three categories: 1 = Business, 2 = Pleasure, 3 = Student |
| port_code     | port code from i94port |
| visa_post     | Department of State where the visa was issued |
| gender        | Gender |
| airline       | Airline used to arrive in the U.S. |
| flight_num    | Flight number |
| visa_type_id  | Foreign key of the visa type dimension table |
| birth_country_code | code of birth country |
| residence_code | residence code from I94RES |
| match_flag    | Match of arrival and departure records |
| birth_year    | birth year |
| stay_until    | Date to which admitted to the U.S. (allowed to stay until) |
| adm_num       | Admission Number |
| arr_flag      | Arrival Flag - admitted or paroled into the U.S. |
| dep_flag      | Departure Flag - Departed, lost I-94 or is deceased |
| dtadfile      | Character Date Field - Date added to I-94 Files |
| year          | 4 digit year |
| month         | Numeric month |
| state         | Address state |
    
<br/><br/>

<center><b>dim_port_code</b></center>

| Column        | Description   |
| :------------- |:-------------|
| port_code            | airport code |
| city      | airport city      |
| state      | airport state|
        
     
<br/><br/>
<center><b>dim_demographics</b></center>
        
| Column        | Description   |
| :-------------- |:-------------|
| median_age            | city median age |
| male_population      | male population of the city |
| number_of_veterans      | number of veterans |
| female_population            | female population of the city |
| total_population      | total population of the city |
| average_household_size      | average household size |
| foreign_born      | number of foreign-born residents in the city |
| city | city name |
| state_name | state name |
| state_code | state code |
| race | race name |
| count | population count for the given race |

<br/><br/>
<center><b>dim_visa_type</b></center>
    
| Column        | Description   |
| :------------- |:-------------|
| visa_type_id            | unique id of visa type|
| visa_type      | Class of admission legally admitting the non-immigrant to temporarily stay    |

<br/><br/>
<center><b>dim_arr_date</b></center>

| Column        | Description   |
| :------------- |:-------------|
| arrdate            | arrival date |
| arr_day      | arrival day      |
| arr_week      | arrival week|
| arr_month      | arrival month|    
| arr_year      | arrival year|
| arr_weekday      | arrival weekday|    
| id      | unique key|  
   

### Step 5: Complete Project Write Up

The following technologies have been used for the project:

- Python - Python is easy to learn because of its simple, readable syntax. This lets us focus on the insights in the data rather than on technical nuances of the language, which is one of the primary reasons to choose Python for big data work.

- Apache Spark (PySpark) - Provides several advantages over MapReduce: it is faster and easier to use, and it runs in many environments. It has built-in tools for SQL, machine learning, and streaming, which make it one of the most popular tools in the industry.

- Pandas - Mainly used for exploring and analyzing the immigration and demographic datasets. Pandas can import data from various formats such as comma-separated values, JSON, SQL, and Microsoft Excel. It supports data manipulation operations such as merging, reshaping, and selecting, as well as data cleaning and data wrangling.

The data can be updated on a monthly basis. As new datasets come out, trends can be analyzed month by month, especially for the immigration fact table, which is partitioned by month.

##### Questions:

*If the data was increased by 100x*
- The data can be partitioned at a finer granularity, for example on a weekly or daily basis, depending on the growth of the data. Additionally, EMR can be used in such cases by adding worker nodes with more compute power and memory.
    

*If the pipelines were run on a daily basis by 7am*
- Apache Airflow can be used to schedule the workflow, so that the pipeline runs every day and finishes before 7am.

*If the database needed to be accessed by 100+ people*
- Amazon Redshift can be used if the data needs certain access privileges. Additionally, adding more nodes to Redshift or using a bigger cluster helps to handle the additional load.
