# 2016 Immigration Sources And Destination Model
### Data Engineering Capstone Project

#### Project Summary
The goal of this project is to use the Udacity-provided I94 Immigration Dataset and the U.S. City Demographic Dataset to create a database. This database can be used to answer questions about which countries immigrants were coming from and which U.S. cities they were arriving in over different periods of time.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [8]:
# Do all imports and installs here
import pandas as pd
import re

### Step 1: Scope the Project and Gather Data

#### Scope 

The objective of this project is to gather information about which countries immigrants are coming from, their modes of travel, their types of visa, and which U.S. cities they are arriving in, in order to analyze whether the demographics of cities influence immigration patterns.

The datasets used are the I94 immigration dataset for April 2016, along with its data dictionary, and the city demographics dataset provided by Udacity.

The end solution is a number of dimension tables and a fact table arranged in a star schema as detailed below in the conceptual data model section. 

Spark was used to process the data. 

#### Describe and Gather Data 
The I94 immigration dataset comes from the US National Tourism and Trade Office and is available [here](https://travel.trade.gov/research/reports/i94/historical/2016.html).
For the purpose of the present analysis, the following columns are relevant:

- i94cit - 3 digit code of the origin country
- i94port - 3 character code of port of entry
- arrdate - arrival date in the U.S.
- i94mode - 1 digit code indicating mode of travel
- i94bir - age of immigrant
- i94visa - 1 digit code indicating reason for immigration
- count

The city demographics data comes from OpenDataSoft and can be explored further [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/information/). For the purpose of the present analysis, the following columns are relevant:
- City
- State Code
- Median Age
- Male Population
- Female Population
- Total Population
- Foreign-born
- Race

In [3]:
#Read in sample immigration data
df_imm_sample = pd.read_csv('immigration_data_sample.csv')
df_imm_sample.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,...,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,61.0,2.0,1.0,...,,,G,O,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,26.0,2.0,1.0,...,MTR,,G,R,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,20571.0,76.0,2.0,1.0,...,,,G,O,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,20581.0,25.0,2.0,1.0,...,DOH,,G,O,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,20553.0,19.0,2.0,1.0,...,,,Z,K,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [4]:
#Read in city demographics data set
df_cities = pd.read_csv('us-cities-demographics.csv', sep = ';')
print(df_cities.shape)
df_cities.head()

(2891, 12)


Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


**For the purpose of the present analysis we will only use the immigration data pertaining to April 2016, which contains over 3 million rows, and the city demographics data.**

In [3]:
	
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()
df_imm =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
#write to parquet
#df_imm.write.parquet("sas_data")
#df_imm =spark.read.parquet("sas_data")

In [3]:
print(df_imm.count())
df_imm.head(5)

3096313


[Row(cicid=6.0, i94yr=2016.0, i94mon=4.0, i94cit=692.0, i94res=692.0, i94port='XXX', arrdate=20573.0, i94mode=None, i94addr=None, depdate=None, i94bir=37.0, i94visa=2.0, count=1.0, dtadfile=None, visapost=None, occup=None, entdepa='T', entdepd=None, entdepu='U', matflag=None, biryear=1979.0, dtaddto='10282016', gender=None, insnum=None, airline=None, admnum=1897628485.0, fltno=None, visatype='B2'),
 Row(cicid=7.0, i94yr=2016.0, i94mon=4.0, i94cit=254.0, i94res=276.0, i94port='ATL', arrdate=20551.0, i94mode=1.0, i94addr='AL', depdate=None, i94bir=25.0, i94visa=3.0, count=1.0, dtadfile='20130811', visapost='SEO', occup=None, entdepa='G', entdepd=None, entdepu='Y', matflag=None, biryear=1991.0, dtaddto='D/S', gender='M', insnum=None, airline=None, admnum=3736796330.0, fltno='00296', visatype='F1'),
 Row(cicid=15.0, i94yr=2016.0, i94mon=4.0, i94cit=101.0, i94res=101.0, i94port='WAS', arrdate=20545.0, i94mode=1.0, i94addr='MI', depdate=20691.0, i94bir=55.0, i94visa=2.0, count=1.0, dtadfile=

In [4]:
df_cities = spark.read.format("csv").option("header", "true").load("us-cities-demographics.csv", sep = ';')

In [6]:
print(df_cities.count())
df_cities.head(5)

2891


[Row(City='Silver Spring', State='Maryland', Median Age='33.8', Male Population='40601', Female Population='41862', Total Population='82463', Number of Veterans='1562', Foreign-born='30908', Average Household Size='2.6', State Code='MD', Race='Hispanic or Latino', Count='25924'),
 Row(City='Quincy', State='Massachusetts', Median Age='41.0', Male Population='44129', Female Population='49500', Total Population='93629', Number of Veterans='4147', Foreign-born='32935', Average Household Size='2.39', State Code='MA', Race='White', Count='58723'),
 Row(City='Hoover', State='Alabama', Median Age='38.5', Male Population='38040', Female Population='46799', Total Population='84839', Number of Veterans='4819', Foreign-born='8229', Average Household Size='2.58', State Code='AL', Race='Asian', Count='4759'),
 Row(City='Rancho Cucamonga', State='California', Median Age='34.5', Male Population='88127', Female Population='87105', Total Population='175232', Number of Veterans='5821', Foreign-born='3387

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.


**Immigration Dataset**

In [5]:
df  = df_imm.select(['i94cit', 'i94port', 'arrdate', 'i94mode', 'i94bir', 'i94visa', 'count'])
df.printSchema()

root
 |-- i94cit: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)



In [9]:
# Check for missing values in the immigration dataframe

df.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in df.columns]).show()


+------+-------+-------+-------+------+-------+-----+
|i94cit|i94port|arrdate|i94mode|i94bir|i94visa|count|
+------+-------+-------+-------+------+-------+-----+
|     0|      0|      0|    239|   802|      0|    0|
+------+-------+-------+-------+------+-------+-----+



Let us consider the columns relevant to our analysis :


- i94cit : According to the data dictionary, this column has invalid codes like 'INVALID: INTERNATIONAL WATERS' and 'No Country Code (100)'. 
- i94port : This column has invalid codes like 'XXX : NOT REPORTED/UNKNOWN' or 'GAC : No PORT Code (GAC)'.
- arrdate : We can see from the schema printed above that this variable is stored as a number (a count of days since 1960-01-01, as used in the conversion below) and will have to be converted to a date.
- i94mode : There are missing values as well as 'Not reported'.
- i94bir : There are missing values here
- i94visa : No missing values
- count : No missing values

**City Demographics Dataset**

In [6]:
df_cit = df_cities.select(['City', 'State Code', 'Median Age', 'Male Population', 'Female Population', 'Total Population', 'Foreign-born', 'Race'])
df_cit.printSchema()

root
 |-- City: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Race: string (nullable = true)



In [9]:
# Check for missing values in the city demographics dataframe

df_cit.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in df_cit.columns]).show()

+----+----------+----------+---------------+-----------------+----------------+------------+----+
|City|State Code|Median Age|Male Population|Female Population|Total Population|Foreign-born|Race|
+----+----------+----------+---------------+-----------------+----------------+------------+----+
|   0|         0|         0|              3|                3|               0|          13|   0|
+----+----------+----------+---------------+-----------------+----------------+------------+----+



Let us consider the columns relevant to our analysis :

- City : No missing values
- State Code : No missing values
- Median Age : No missing values
- Male Population : Missing values, variable is string
- Female Population : Missing values, variable is string
- Total Population : No missing values, variable is string
- Foreign-born : Missing values, variable is string
- Race : No missing values


#### Cleaning Steps
Document steps necessary to clean the data

**Immigration Dataset**

- i94cit : We will remove all invalid codes as a first step towards creating a country dimension 
- i94port : We will remove all invalid codes as a first step towards creating a city dimension 
- arrdate : We will convert this to timestamp format prior to creating a time dimension
- i94mode : We will group missing values together with 'not reported' (code 9)
- i94bir : We will impute missing values with the average for the dataset


**City Demographics Dataset**
- We will group the data by city and state code
- We will add a new column mapped to 'i94port' in the immigration table as a step towards making this a dimension table
- Median Age, Total Population : We will convert to numeric and aggregate
- Male Population, Female population, Foreign-born : We will convert to numeric, impute missing values with mean value and aggregate 


**Immigration Dataset**

In [9]:
#Create dictionary of valid i94cit codes
patt = re.compile(r"^\s*(?P<code>\d+)\s*=\s*'(?P<country>.+)'.*$")
with open('valid_country_code.txt') as f:
    country_lines = f.readlines()
    
matches = [patt.match(line) for line in country_lines]
country_codes_valid = {int(match.group('code')): match.group('country') for match in matches}
assert len(country_lines) == len(country_codes_valid)
#country_codes_valid
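
The dictionary comprehension above raises an `AttributeError` as soon as one line of the file does not match the pattern. A minimal sketch of a more forgiving parser follows, reusing the same regular expression. The helper name `parse_code_lines` and the sample lines are hypothetical and only illustrate the assumed `code = 'name'` format; they are not the real file contents.

```python
import re

# Same pattern as the cell above: a numeric code, "=", then a quoted name.
PATTERN = re.compile(r"^\s*(?P<code>\d+)\s*=\s*'(?P<country>.+)'.*$")


def parse_code_lines(lines):
    """Return (codes, rejected): a code->name dict and the lines that did not match."""
    codes, rejected = {}, []
    for line in lines:
        match = PATTERN.match(line)
        if match is None:
            # Keep unparseable, non-blank lines for inspection instead of crashing.
            if line.strip():
                rejected.append(line.rstrip("\n"))
            continue
        codes[int(match.group("code"))] = match.group("country").strip()
    return codes, rejected


# Hypothetical sample lines in the assumed format (not the real file contents).
sample_lines = [
    "   101 =  'COUNTRY A'\n",
    "   102 =  'COUNTRY B'  \n",
    "\n",
    "not a mapping line\n",
]
codes, rejected = parse_code_lines(sample_lines)
print(codes)
print(rejected)
```

Returning the rejected lines makes it possible to decide whether a mismatch is a harmless blank line or a formatting problem in the dictionary file.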


In [10]:
#Remove invalid i94cit codes
df = df.filter(df.i94cit.isin(list(country_codes_valid.keys())))

In [11]:
#Create dictionary of valid i94port codes
patt = re.compile(r"^\s*'(?P<code>...?)'\s*=\s*'(?P<name>.+)'.*$")
with open('valid_port_code.txt') as f:
    port_lines = f.readlines()
matches = [patt.match(line) for line in port_lines]
port_codes_valid = {match.group('code'): match.group('name').strip() for match in matches}
assert len(port_codes_valid) == len(port_lines)
#port_codes_valid

In [12]:
#Split city and state
for key, value in port_codes_valid.items():
    port_codes_valid[key] = value.split(",")
#port_codes_valid
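
Splitting on every comma leaves leading whitespace on the state part and yields a single-element list when a port description has no comma. The sketch below shows one possible alternative that splits on the last comma only and strips both parts. The function name `split_port_name` and the example strings are hypothetical, and it returns a tuple rather than the list used in the cell above.

```python
def split_port_name(name):
    """Split 'CITY, ST' into (city, state); state is None when no comma is present."""
    city, sep, state = name.rpartition(",")
    if not sep:
        # No comma at all: treat the whole value as the city.
        return name.strip(), None
    return city.strip(), state.strip()


# Hypothetical port descriptions in the assumed 'CITY, ST' format.
examples = ["ANCHORAGE, AK", "WASHINGTON DC", "SAINT PAUL, MINN, MN "]
for value in examples:
    print(split_port_name(value))
```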

In [13]:
#Remove invalid i94port codes
df = df.filter(df.i94port.isin(list(port_codes_valid.keys())))

In [16]:
df.count()

2587323

In [14]:
#Create date column from arrdate
import datetime

get_ts = F.udf(lambda x: (datetime.timedelta(days=x) + datetime.datetime(1960,1,1)).strftime('%Y-%m-%d')) 
df = df.withColumn('arrival_date', F.to_date(get_ts(df.arrdate)))

# create year, month, day, day of week columns from timestamp column
df = df.withColumn("arr_year", F.year(F.col('arrival_date')))\
                .withColumn("arr_month", F.month(F.col("arrival_date")))\
                .withColumn("arr_day", F.dayofmonth(F.col("arrival_date")))\
                .withColumn("arr_day_of_week", F.dayofweek(F.col("arrival_date")))
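
The conversion above treats `arrdate` as a number of days counted from 1960-01-01. The same arithmetic can be checked outside Spark with a small sketch in plain Python. The helper names `sas_to_date` and `spark_day_of_week` are hypothetical; the second one mirrors Spark's `dayofweek` numbering, where 1 is Sunday and 7 is Saturday.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # arrdate counts days from this date


def sas_to_date(days):
    """Convert a day count (possibly a float or None) to a date."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))


def spark_day_of_week(d):
    """Mirror Spark's dayofweek numbering: 1 = Sunday ... 7 = Saturday."""
    return d.isoweekday() % 7 + 1


arrival = sas_to_date(20545.0)
print(arrival, spark_day_of_week(arrival))  # 2016-04-01 6
```

Unlike the lambda in the UDF, this version returns `None` for a missing value instead of raising. Within Spark, it may also be possible to avoid the Python UDF altogether with an expression such as `F.expr("date_add(to_date('1960-01-01'), CAST(arrdate AS INT))")`, although that variant has not been run against this dataset.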

In [15]:
# Code null values in the i94mode column as 9 ('not reported')

df = df.na.fill(9.0, subset = ['i94mode'])


In [16]:
#Impute missing values in i94bir with mean

mean_val = df.select(F.mean(df['i94bir'])).collect()
mean_age = mean_val[0][0]
df = df.na.fill(mean_age, ['i94bir'])
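
Mean imputation replaces each missing age with the average of the observed ages, and the missing entries themselves do not contribute to that average. The following plain-Python sketch illustrates the idea on a made-up list; `impute_with_mean` is a hypothetical helper, not part of the pipeline.

```python
def impute_with_mean(values):
    """Replace None entries with the mean of the observed values."""
    observed = [v for v in values if v is not None]
    if not observed:
        return list(values)  # nothing to average
    mean = sum(observed) / len(observed)
    return [mean if v is None else v for v in values]


ages = [55.0, None, 28.0, 4.0, None]  # made-up sample
print(impute_with_mean(ages))  # [55.0, 29.0, 28.0, 4.0, 29.0]
```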

In [17]:
df_final = df.select(['i94cit', 'i94port', 'i94mode', 'i94bir', 'i94visa', 'arrival_date', 'arr_year', 'arr_month', 'arr_day', 'arr_day_of_week', 'count'])
df_final.show()

+------+-------+-------+------+-------+------------+--------+---------+-------+---------------+-----+
|i94cit|i94port|i94mode|i94bir|i94visa|arrival_date|arr_year|arr_month|arr_day|arr_day_of_week|count|
+------+-------+-------+------+-------+------------+--------+---------+-------+---------------+-----+
| 101.0|    WAS|    1.0|  55.0|    2.0|  2016-04-01|    2016|        4|      1|              6|  1.0|
| 101.0|    NYC|    1.0|  28.0|    2.0|  2016-04-01|    2016|        4|      1|              6|  1.0|
| 101.0|    NYC|    1.0|   4.0|    2.0|  2016-04-01|    2016|        4|      1|              6|  1.0|
| 101.0|    NYC|    1.0|  57.0|    1.0|  2016-04-01|    2016|        4|      1|              6|  1.0|
| 101.0|    NYC|    1.0|  63.0|    2.0|  2016-04-01|    2016|        4|      1|              6|  1.0|
| 101.0|    NYC|    1.0|  57.0|    2.0|  2016-04-01|    2016|        4|      1|              6|  1.0|
| 101.0|    NYC|    1.0|  46.0|    2.0|  2016-04-01|    2016|        4|      1|   

In [21]:
#Check there are no more missing values
df_final.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df_final.columns]).show()

+------+-------+-------+------+-------+------------+--------+---------+-------+---------------+-----+
|i94cit|i94port|i94mode|i94bir|i94visa|arrival_date|arr_year|arr_month|arr_day|arr_day_of_week|count|
+------+-------+-------+------+-------+------------+--------+---------+-------+---------------+-----+
|     0|      0|      0|     0|      0|           0|       0|        0|      0|              0|    0|
+------+-------+-------+------+-------+------------+--------+---------+-------+---------------+-----+



In [18]:
df_final.printSchema()

root
 |-- i94cit: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- i94mode: double (nullable = false)
 |-- i94bir: double (nullable = false)
 |-- i94visa: double (nullable = true)
 |-- arrival_date: date (nullable = true)
 |-- arr_year: integer (nullable = true)
 |-- arr_month: integer (nullable = true)
 |-- arr_day: integer (nullable = true)
 |-- arr_day_of_week: integer (nullable = true)
 |-- count: double (nullable = true)



**City Demographics Dataset**

In [19]:
# Convert relevant columns to numeric
from pyspark.sql.types import IntegerType
df_cit = df_cit.withColumn('MedianAge', df_cit['Median Age'].cast(IntegerType())).drop('Median Age')
df_cit = df_cit.withColumn('Male_Population', df_cit['Male Population'].cast(IntegerType())).drop('Male Population')
df_cit = df_cit.withColumn('Female_Population', df_cit['Female Population'].cast(IntegerType())).drop('Female Population')
df_cit = df_cit.withColumn('Total_Population', df_cit['Total Population'].cast(IntegerType())).drop('Total Population')
df_cit = df_cit.withColumn('Foreign_Born', df_cit['Foreign-born'].cast(IntegerType())).drop('Foreign-born')


In [20]:
#Impute missing values with mean
#Male Population
mean_val = df_cit.select(F.mean(df_cit['Male_Population'])).collect()
mean_male_pop = mean_val[0][0]
df_cit = df_cit.na.fill(mean_male_pop, ['Male_Population'])
#Female Population
mean_val = df_cit.select(F.mean(df_cit['Female_Population'])).collect()
mean_female_pop = mean_val[0][0]
df_cit = df_cit.na.fill(mean_female_pop, ['Female_Population'])
#Foreign Born
mean_val = df_cit.select(F.mean(df_cit['Foreign_Born'])).collect()
mean_foreign_born = mean_val[0][0]
df_cit = df_cit.na.fill(mean_foreign_born, ['Foreign_Born'])



In [21]:
#Create a function to aggregate 'MedianAge' and 'Race' by most frequent value when grouping by city and state code
import pyspark.sql.functions as F
@F.udf
def mode(x):    
    '''
    This function returns the most frequent value
    '''
    from collections import Counter
    return Counter(x).most_common(1)[0][0]

cols = ['MedianAge', 'Race']
agg_expr = [mode(F.collect_list(col)).alias(col) for col in cols]
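
The behaviour of the `mode` helper can be illustrated in plain Python. In this sketch, `most_frequent` is a hypothetical stand-in for the UDF body and the list of races is made up.

```python
from collections import Counter


def most_frequent(values):
    """Return the most common value; ties go to the value seen first."""
    if not values:
        return None
    return Counter(values).most_common(1)[0][0]


races = ["White", "Asian", "White", "Asian", "Hispanic or Latino"]  # made-up
print(most_frequent(races))  # White
```

When every value in a group occurs exactly once, which may be the case for `Race` if the source holds one row per city and race, the result is simply the first value collected, and the order of `collect_list` is not guaranteed to be stable across runs.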

In [22]:
#Create dataset grouped by city and state code
grouped = df_cit.groupBy(F.col('City'), F.col('State Code'))
df_cit_grouped_1 = grouped.agg({'Male_Population' : 'sum', 'Female_Population' : 'sum', 'Total_Population' : 'sum','Foreign_Born' : 'sum'})
df_cit_grouped_2 = grouped.agg(*agg_expr)

df_cit_final = df_cit_grouped_1.join(df_cit_grouped_2, ['City', 'State Code'])
df_cit_final = df_cit_final.withColumnRenamed('State Code', 'State_Code')
df_cit_final = df_cit_final.withColumnRenamed('sum(Total_Population)', 'Total_Population')
df_cit_final = df_cit_final.withColumnRenamed('sum(Male_Population)', 'Male_Population')
df_cit_final = df_cit_final.withColumnRenamed('sum(Female_Population)', 'Female_Population')
df_cit_final = df_cit_final.withColumnRenamed('sum(Foreign_Born)', 'Foreign_born')
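
One point worth checking in the aggregation above: if the source holds one row per city and race, with the city-level figures repeated on every row, then summing the population columns counts each figure once per race row. The sketch below uses made-up rows for a fictional city to show the difference between summing and keeping a single copy of the repeated value.

```python
from collections import defaultdict

# Made-up rows: one per (city, race); Total_Population repeats on every row.
rows = [
    {"City": "Sampleville", "State Code": "XX", "Race": "White", "Total_Population": 1000},
    {"City": "Sampleville", "State Code": "XX", "Race": "Asian", "Total_Population": 1000},
    {"City": "Sampleville", "State Code": "XX", "Race": "Black or African-American", "Total_Population": 1000},
]

summed = defaultdict(int)
first_seen = {}
for row in rows:
    key = (row["City"], row["State Code"])
    # Summing counts the repeated total once per race row.
    summed[key] += row["Total_Population"]
    # Keeping the first value preserves a single copy of the total.
    first_seen.setdefault(key, row["Total_Population"])

print(dict(summed))  # {('Sampleville', 'XX'): 3000}
print(first_seen)    # {('Sampleville', 'XX'): 1000}
```

If that assumption about the source holds, replacing `'sum'` with an aggregate such as `'first'` or `'max'` in the Spark `agg` call would be one way to keep city-level totals at their original scale.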


In [27]:
df_cit_final.count()

596

In [23]:
#Add i94 port code to the city dataset


@F.udf()
def get_i94port(city, state_code):
    '''
    Input: City name, State code
    
    Output: Corresponding i94port
    
    '''
    
    for key in port_codes_valid:
        if (city.lower() in port_codes_valid[key][0].lower()) and (state_code in port_codes_valid[key][1]):
            return key
    # No match: the UDF returns None, which becomes null in the dataframe
    return None

# Add i94port code based on city name
df_cities_final = df_cit_final.withColumn("i94port", get_i94port(df_cit_final.City, df_cit_final.State_Code))
# Remove entries with no i94port code
df_cities_final = df_cities_final.filter(df_cities_final.i94port.isNotNull())


In [24]:
#Check if i94port column is a unique key
df_cities_final.groupBy('i94port').count().filter('count > 1').show()

+-------+-----+
|i94port|count|
+-------+-----+
|    CID|    2|
|    MCA|    2|
|    RDU|    2|
+-------+-----+



We see that there are duplicates for 3 i94port values above. Let us examine them in detail

In [31]:
df_cities_final.filter(df_cities_final['i94port'] == 'CID').show()

+------------+----------+----------------+-----------------+---------------+------------+---------+--------------------+-------+
|        City|State_Code|Total_Population|Female_Population|Male_Population|Foreign_born|MedianAge|                Race|i94port|
+------------+----------+----------------+-----------------+---------------+------------+---------+--------------------+-------+
|   Iowa City|        IA|          371135|           185690|         185445|       46010|       25|Black or African-...|    CID|
|Cedar Rapids|        IA|          652025|           336480|         315545|       27020|       36|               White|    CID|
+------------+----------+----------------+-----------------+---------------+------------+---------+--------------------+-------+



The issue here is that the data dictionary shows the value for key 'CID' as 'Cedar Rapids/Iowa City'. To avoid the conflict we will keep the row for the bigger city (in terms of total population), 'Cedar Rapids', and drop the other row.

In [30]:
df_cities_final.filter(df_cities_final['i94port'] == 'RDU').show()

+-------+----------+----------------+-----------------+---------------+------------+---------+--------------------+-------+
|   City|State_Code|Total_Population|Female_Population|Male_Population|Foreign_born|MedianAge|                Race|i94port|
+-------+----------+----------------+-----------------+---------------+------------+---------+--------------------+-------+
| Durham|        NC|         1287990|           682225|         605765|      193210|       33|               White|    RDU|
|Raleigh|        NC|         2259745|          1163825|        1095920|      325625|       32|Black or African-...|    RDU|
+-------+----------+----------------+-----------------+---------------+------------+---------+--------------------+-------+



Same problem as above. We will keep the record for the bigger city, Raleigh, and remove the record for Durham.

In [32]:
df_cities_final.filter(df_cities_final['i94port'] == 'MCA').show()

+-------+----------+----------------+-----------------+---------------+------------+---------+--------------------+-------+
|   City|State_Code|Total_Population|Female_Population|Male_Population|Foreign_born|MedianAge|                Race|i94port|
+-------+----------+----------------+-----------------+---------------+------------+---------+--------------------+-------+
|  Allen|        TX|          490690|           234070|         256620|       98245|       37|American Indian a...|    MCA|
|McAllen|        TX|          701265|           359605|         341660|      188455|       32|Black or African-...|    MCA|
+-------+----------+----------------+-----------------+---------------+------------+---------+--------------------+-------+



The conflict here arises because 'Allen' is part of the string 'McAllen'. We will drop the record for Allen.
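
The false match comes from the substring test (`in`) used when mapping cities to ports. A minimal sketch of an exact-match lookup is shown below. The dictionary excerpt is hypothetical (its casing and spacing are assumed), and `build_lookup` and `get_port` are illustrative names rather than part of the pipeline.

```python
# Hypothetical excerpt of the port dictionary after the split step.
port_codes = {
    "MCA": ["MCALLEN", " TX"],
    "CID": ["CEDAR RAPIDS/IOWA CITY", " IA"],
}


def build_lookup(codes):
    """Index ports by exact (city, state); names joined by '/' are indexed separately."""
    lookup = {}
    for code, parts in codes.items():
        if len(parts) < 2:
            continue  # no state part to match on
        state = parts[1].strip().upper()
        for city in parts[0].split("/"):
            lookup[(city.strip().upper(), state)] = code
    return lookup


lookup = build_lookup(port_codes)


def get_port(city, state_code):
    """Return the port code for an exact (city, state) match, else None."""
    return lookup.get((city.upper(), state_code.upper()))


print("allen" in "mcallen")         # True: the substring test matches Allen
print(get_port("Allen", "TX"))      # None: the exact match does not
print(get_port("McAllen", "TX"))    # MCA
print(get_port("Iowa City", "IA"))  # CID
```

Exact matching removes the Allen/McAllen collision, but ports that serve two cities (such as 'CID') still map to both, so the choice of which city to keep remains necessary.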

In [25]:
df_cities_final = df_cities_final.filter(df_cities_final['City'] != 'Iowa City')
df_cities_final = df_cities_final.filter(df_cities_final['City'] != 'Durham')
df_cities_final = df_cities_final.filter(df_cities_final['City'] != 'Allen')

In [26]:
print(df_cities_final.count())
df_cities_final.printSchema()

135
root
 |-- City: string (nullable = true)
 |-- State_Code: string (nullable = true)
 |-- Total_Population: long (nullable = true)
 |-- Female_Population: long (nullable = true)
 |-- Male_Population: long (nullable = true)
 |-- Foreign_born: long (nullable = true)
 |-- MedianAge: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- i94port: string (nullable = true)



In [27]:
df_cities_final = df_cities_final.withColumn('Median_Age', df_cities_final['MedianAge'].cast(IntegerType())).drop('MedianAge')

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

The objective of the exercise is to find out which countries immigrants are coming from, their mode of travel, their type of visa, and which U.S. cities they are arriving in, in order to see whether the demographics of cities influence immigration patterns. Accordingly, the following dimension tables and fact table (arranged in a star schema) were created to facilitate the analysis.

- **Dimension Table City_of_Arrival**

- **Dimension Table Country_of_Origin**

- **Dimension Table Mode_of_Travel**

- **Dimension Table Type_of_Visa**

- **Dimension Table Time**

- **Fact Table Immigration_Fact**

The details of the fields in each table and source of data are provided in the Data Dictionary below


#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

1. Clean I94 data as described in the 'Cleaning Steps' section to create spark dataframe df_final
2. Clean city demographics data as described in the 'Cleaning Steps' section to create spark dataframe df_cities_final
3. Create city_of_arrival dimension table from the df_cities_final dataframe and write to parquet file partitioned by i94port
4. Create country_of_origin dimension table from the country_codes_valid dictionary and write to parquet file partitioned by i94cit
5. Create mode_of_travel dimension table from data dictionary and write to parquet file
6. Create type_of_visa dimension table from data dictionary and write to parquet file
7. Create time dimension table from df_final and write to parquet file partitioned by year and month
8. Create immigration_fact table from df_final and write to parquet file partitioned by i94cit and i94port


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [37]:
#Create city_of_arrival dimension table
city_of_arrival = df_cities_final.select(['i94port', 'City', 'State_Code', 'Male_Population', 'Female_Population', 'Total_Population', \
                                          'Foreign_born', 'Median_Age', 'Race']) 

city_of_arrival.write.partitionBy('i94port').parquet('tables/city_of_arrival', "overwrite")


In [38]:
#Create country of origin dimension table
#Convert dictionary to pandas dataframe
country_codes_df = pd.DataFrame.from_dict(country_codes_valid, orient = 'index', columns = ['Country_Name'])
country_codes_df.reset_index(level = 0, inplace = True)
country_codes_df = country_codes_df.rename(columns = {"index" : "i94cit"})
#Create spark dataframe
country_of_origin = spark.createDataFrame(country_codes_df)
#Write to parquet
country_of_origin.write.partitionBy('i94cit').parquet('tables/country_of_origin', "overwrite")

In [39]:
#Create mode of travel dimension table
mode_dict = {1.0 : 'Air', 2.0 : 'Sea', 3.0 : 'Land', 9.0 : 'Not reported' }
mode_df = pd.DataFrame.from_dict(mode_dict, orient = 'index', columns = ['Mode_of_Arrival'])
mode_df.reset_index(level = 0, inplace = True)
mode_df = mode_df.rename(columns = {'index' : 'i94mode'})

#Create spark dataframe
mode_of_travel = spark.createDataFrame(mode_df)
#Write to parquet
mode_of_travel.write.parquet('tables/mode_of_travel', "overwrite")


In [43]:
#Create type of visa dimension table
visa_dict = {1 : 'Business', 2 : 'Pleasure', 3 : 'Student'}
visa_df = pd.DataFrame.from_dict(visa_dict, orient = 'index', columns = ['Visa_Type'])
visa_df.reset_index(level = 0, inplace = True)
visa_df = visa_df.rename(columns = {'index' : 'i94visa'})

#Create spark dataframe
type_of_visa = spark.createDataFrame(visa_df)
#Write to parquet
type_of_visa.write.parquet('tables/type_of_visa', "overwrite")


In [40]:
#Create time dimension table
time_table = df_final.select(F.col('arrival_date').alias('date'), F.col('arr_year').alias('year'), F.col('arr_month').alias('month'), F.col('arr_day').alias('day'), F.col('arr_day_of_week').alias('dayofweek')).dropDuplicates(["date"])
# write time table to parquet files partitioned by year and month
time_table.write.partitionBy("year", "month").parquet("tables/time", "overwrite")

In [41]:
#Create immigration fact table

immigration_fact = df_final.select(['i94cit', 'i94port', 'i94mode', 'i94bir', 'i94visa', 'arrival_date', 'count'])\
                .withColumn("immigration_id", F.monotonically_increasing_id())
immigration_fact.write.partitionBy('i94cit', 'i94port').parquet('tables/immigration_fact', 'overwrite')

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

**Quality Check 1: Ensure there are adequate entries in each table**

In [44]:
# Perform quality check 1 here
def quality_check_count(df, description):
    '''
    Input: Spark dataframe, description of Spark dataframe
    
    Output: Print outcome of data quality check
    '''
    
    result = df.count()
    if result == 0:
        print("Data count check failed for {} with zero records".format(description))
    else:
        print("Data count check passed for {} with {} records".format(description, result))
        
quality_check_count(immigration_fact, "immigration fact table")
quality_check_count(time_table, "time table")
quality_check_count(type_of_visa, "type of visa table")
quality_check_count(mode_of_travel, "mode of travel table")
quality_check_count(country_of_origin, "country of origin table")
quality_check_count(city_of_arrival, "city of arrival table")

Data count check passed for immigration fact table with 2587323 records
Data count check passed for time table with 30 records
Data count check passed for type of visa table with 3 records
Data count check passed for mode of travel table with 4 records
Data count check passed for country of origin table with 236 records
Data count check passed for city of arrival table with 135 records


**Quality Check 2: Ensure that primary keys have unique values in each table**

In [45]:
def quality_check_unique_key(df, column, description):
    '''
    Input: Spark dataframe, column name, description
    Output: Print outcome of data quality check
    '''
    if df.select(column).distinct().count() == df.count():
        print("Unique key constraint check passed for column {} in {}".format(column, description))
    else:
        print("Unique key constraint check failed for column {} in {}".format(column, description))
        
quality_check_unique_key(immigration_fact, 'immigration_id', "immigration fact table")
quality_check_unique_key(time_table, 'date', "time table")
quality_check_unique_key(type_of_visa, 'i94visa', "type of visa table")
quality_check_unique_key(mode_of_travel, 'i94mode', "mode of travel table")
quality_check_unique_key(country_of_origin, 'i94cit', "country of origin table")
quality_check_unique_key(city_of_arrival, 'i94port', "city of arrival table")

Unique key constraint check passed for column immigration_id in immigration fact table
Unique key constraint check passed for column date in time table
Unique key constraint check passed for column i94visa in type of visa table
Unique key constraint check passed for column i94mode in mode of travel table
Unique key constraint check passed for column i94cit in country of origin table
Unique key constraint check passed for column i94port in city of arrival table
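
The checks above cover row counts and key uniqueness. Since the data dictionary describes foreign keys on the fact table, a referential check is a natural third test. The sketch below shows the idea in plain Python on made-up key values; `missing_foreign_keys` is a hypothetical helper and the port lists only stand in for the `i94port` columns of the fact and dimension tables.

```python
def missing_foreign_keys(fact_keys, dimension_keys):
    """Return fact-table key values that have no matching row in the dimension."""
    return sorted(set(fact_keys) - set(dimension_keys))


# Made-up key values standing in for immigration_fact.i94port and city_of_arrival.i94port.
fact_ports = ["NYC", "WAS", "NYC", "HHW"]
dimension_ports = ["NYC", "WAS"]

orphans = missing_foreign_keys(fact_ports, dimension_ports)
if orphans:
    print("Foreign key check failed; unmatched values: {}".format(orphans))
else:
    print("Foreign key check passed")
```

On the Spark dataframes, a left anti join of the fact table against the dimension table on the key column would presumably serve the same purpose.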


#### 4.3 Data dictionary 

**Dimension Table City_of_Arrival**

This is essentially the df_cities_final dataframe created above with the following columns:
- i94port : 3 character code, string, primary key
- City : Name of city, string
- State_Code : 2 character state code, string
- Male_Population : population, numeric
- Female_Population : population, numeric
- Total_Population : population, numeric
- Foreign_born : population, numeric
- Median_Age : median age, numeric
- Race : race, string

**Dimension Table Country_of_Origin**

This is created from the 'country_codes_valid' dictionary created above with the following columns:
- i94cit : 3 digit country code, numeric, primary key
- Country_Name : Name of country, string

**Dimension Table Mode_of_Travel**

This is created from a dictionary of i94mode values in the data dictionary with the following columns:
- i94mode : code, numeric, primary key
- Mode_of_Arrival : mode of travel, string

**Dimension Table Type_of_Visa**

This is created from a dictionary of i94visa values in the data dictionary with the following columns:
- i94visa : code, numeric, primary key
- Visa_Type : type of visa, string

**Dimension Table Time**

This is created from the df_final dataframe with the following columns:

- date : date, primary key
- year : year, numeric
- month : month, numeric
- day : day, numeric
- dayofweek : day of week, numeric

**Fact Table Immigration_Fact**

This is created from the df_final dataframe with the following columns:
- immigration_id : numeric, primary key
- i94cit : numeric, foreign key
- i94port : string, foreign key
- i94mode : numeric, foreign key
- i94bir : numeric
- i94visa : numeric, foreign key
- arrival_date : date, foreign key
- count : numeric



### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.

Spark was chosen since it can easily handle multiple file formats (including SAS) containing large amounts of data. Spark SQL was chosen to process the large input files into dataframes and manipulate them via standard select and create dataframe operations to form additional tables.

* Propose how often the data should be updated and why.

1. The immigration_fact table should be updated on a monthly basis when each new dataset is available
2. With a data dictionary as provided here, all dimension tables sourced from the immigration data can be recreated 
3. The city_of_arrival dimension table currently does not have a time component. If new data is available in the future the table should be updated at that time. 

* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 
If the data was increased by 100x, we could consider moving Spark to cluster mode using a cluster manager such as YARN.
 
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 

If the data needs to populate a dashboard that updates daily by 7 am, then we could use a scheduling tool such as Airflow to run the ETL pipeline overnight.
 
 * The database needed to be accessed by 100+ people.
 
 
If the database needs to be accessed by 100+ people, we could replicate the data across multiple nodes so that different users are served by different nodes.
 