# USA Immigration in 2016
### Data Engineering Capstone Project

#### Project Summary

This project builds an ETL pipeline with Spark that combines data related to immigration to the USA in 2016. The data used here are:  
- I94 Immigration Data
- World Temperature Data  
- U.S. City Demographic Data  
- Airport Code Table
- Country Code Table

**The project follows these steps:**
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [2]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from datetime import datetime, timedelta
from pyspark.sql.types import StringType
import numpy as np

### Step 1: Scope the Project and Gather Data

#### Scope 

The goal of this project is to build a fact table containing the immigration information and three dimension tables that describe it. The ETL pipeline is implemented with Spark; it could also be run on AWS (S3, EMR, Airflow).

#### Datasets

1. **I94 Immigration Data**: This data comes from the US National Tourism and Trade Office and is available [here](https://travel.trade.gov/research/reports/i94/historical/2016.html). The table contains the immigration records.   
2. **World Temperature Data**: This dataset comes from Kaggle. You can read more about it [here](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data). This table contains monthly average temperatures of cities in different countries.  
3. **U.S. City Demographic Data**: This data comes from OpenDataSoft. You can read more about it [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/). This table contains population information for each city.  
4. **Airport Code Table**: This is a simple table of airport codes and corresponding cities. It comes from [here](https://datahub.io/core/airport-codes#data).  
5. **Country codes and three-letter airport codes**: These two tables are extracted from the label descriptions file of the immigration data, I94_SAS_Labels_Descriptions.SAS.  

#### 1.1 Create a Spark session

In [3]:
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

#### 1.2 Load in the immigration table

In [4]:
df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [5]:
immigration_df = spark.read.parquet('./sas_data')
immigration = immigration_df.limit(5).toPandas()
immigration

,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,5748517.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,CA,20582.0,...,,M,1976.0,10292016,F,,QF,94953870000.0,11,B1
1,5748518.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,NV,20591.0,...,,M,1984.0,10292016,F,,VA,94955620000.0,7,B1
2,5748519.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20582.0,...,,M,1987.0,10292016,M,,DL,94956410000.0,40,B1
3,5748520.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,...,,M,1987.0,10292016,F,,DL,94956450000.0,40,B1
4,5748521.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,...,,M,1988.0,10292016,M,,DL,94956390000.0,40,B1


#### 1.3 Load in the airport table

In [6]:
airport = spark.read.format('csv').options(header='true').option("sep", ',').load('input/airport-codes_csv.csv')
airport.limit(5).toPandas()

,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


#### 1.4 Load in the demographics table

In [8]:
demogra_df = spark.read.format('csv').options(header='true').option("sep", ';').load('input/us-cities-demographics.csv')

demogra_df.limit(5).toPandas()

,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


#### 1.5 Load in the Global Temperatures table

In [7]:
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
# temperature = pd.read_csv(fname)
temperature = spark.read.format('csv').options(header='true').option("sep", ',').load(fname)

temperature.limit(5).toPandas()

,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


#### 1.6 Load in the country codes and three-letter airport codes

In [9]:
countries = spark.read.format('csv').options(header='true').option("sep", ',').load('input/countries.csv')
countries.limit(5).toPandas()

,Code,Countries
0,582,MEXICO
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA


In [10]:
# airport_code = pd.read_csv('airport.csv')
airport_code = spark.read.format('csv').options(header='true').option("sep", ',').load('input/airport.csv')
airport_code.limit(5).toPandas()

,Code,Airport
0,ALC,"ALCAN, AK"
1,ANC,"ANCHORAGE, AK"
2,BAR,"BAKER AAF - BAKER ISLAND, AK"
3,DAC,"DALTONS CACHE, AK"
4,PIZ,"DEW STATION PT LAY DEW, AK"


### Step 2: Cleaning tables
Check every table for missing and unreasonable values.
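The PySpark expression used below, `count(when(isnan(c) | col(c).isNull(), c))`, counts for each column the rows whose value is null or NaN. As a minimal stdlib sketch of the same logic, assuming rows are plain Python dicts (the helpers `is_missing` and `count_missing` and the sample rows are hypothetical, not part of the project):

```python
import math
from typing import Any, Dict, Iterable, List


def is_missing(value: Any) -> bool:
    """True for None or a float NaN, mirroring col(c).isNull() | isnan(c)."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def count_missing(rows: Iterable[Dict[str, Any]], columns: List[str]) -> Dict[str, int]:
    """Count missing values per column, like one Spark aggregation over all columns."""
    counts = {c: 0 for c in columns}
    for row in rows:
        for c in columns:
            if is_missing(row.get(c)):  # an absent key counts as missing too
                counts[c] += 1
    return counts


# Invented sample rows
rows = [
    {"ident": "A", "elevation_ft": "11", "iata_code": None},
    {"ident": "B", "elevation_ft": None, "iata_code": None},
    {"ident": "C", "elevation_ft": float("nan"), "iata_code": "AAA"},
]
print(count_missing(rows, ["ident", "elevation_ft", "iata_code"]))
# {'ident': 0, 'elevation_ft': 2, 'iata_code': 2}
```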

#### 2.1 Clean the airport table

In [11]:
airport.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in airport.columns]).show()

+-----+----+----+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+
|ident|type|name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|coordinates|
+-----+----+----+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+
|    0|   0|   0|        7006|        0|          0|         0|        5676|   14045|    45886|     26389|          0|
+-----+----+----+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+



The table contains airports from many countries, but we only need those in the US, so we drop all non-US rows.

In [12]:
airport = airport.where(col('iso_country') == 'US')
airport.select('iso_country').distinct().show()
airport.limit(5).toPandas()

+-----------+
|iso_country|
+-----------+
|         US|
+-----------+



,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


Check the type column in this table:

In [13]:
airport.select('type').distinct().show()

+--------------+
|          type|
+--------------+
| large_airport|
|   balloonport|
| seaplane_base|
|      heliport|
|        closed|
|medium_airport|
| small_airport|
+--------------+



A balloonport cannot appear in the immigration port list, so we drop rows of type balloonport. Closed airports are no longer in use, so we drop those rows as well.
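The country filter and the type filter can be thought of as one row predicate. A minimal stdlib sketch of that predicate over dict rows (the helper `keep_us_ports` and the sample rows are hypothetical); in PySpark the same condition could also be written as `(col('iso_country') == 'US') & ~col('type').isin('balloonport', 'closed')`:

```python
from typing import Dict, List

# Airport types that cannot serve as an I94 port of entry
EXCLUDED_TYPES = {"balloonport", "closed"}


def keep_us_ports(rows: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """Keep US rows whose type can appear as a port of entry."""
    return [
        r for r in rows
        if r["iso_country"] == "US" and r["type"] not in EXCLUDED_TYPES
    ]


# Invented sample rows
rows = [
    {"ident": "A", "iso_country": "US", "type": "small_airport"},
    {"ident": "B", "iso_country": "US", "type": "closed"},
    {"ident": "C", "iso_country": "US", "type": "balloonport"},
    {"ident": "D", "iso_country": "CA", "type": "large_airport"},
]
print([r["ident"] for r in keep_us_ports(rows)])  # ['A']
```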

In [14]:
airport = airport.where((col('type') != 'balloonport') & (col('type') != 'closed'))
airport.select('type').distinct().show()

+--------------+
|          type|
+--------------+
| large_airport|
| seaplane_base|
|      heliport|
|medium_airport|
| small_airport|
+--------------+



Because local_code corresponds to the i94port column of the immigration table, we drop all rows where local_code is missing.

In [15]:
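# Drop null local codes explicitly (and any literal 'None' strings)
airport = airport.where(col('local_code').isNotNull() & (col('local_code') != 'None'))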
airport.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in airport.columns]).show()

+-----+----+----+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+
|ident|type|name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|coordinates|
+-----+----+----+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+
|    0|   0|   0|         149|        0|          0|         0|          13|     290|    19160|         0|          0|
+-----+----+----+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+



Check the iso_region column for unexpected values.

In [16]:
airport.groupby('iso_region').count().show(200)

+----------+-----+
|iso_region|count|
+----------+-----+
|     US-TN|  343|
|     US-OK|  464|
|     US-VT|  100|
|     US-SD|  201|
|     US-WA|  556|
|     US-IN|  633|
|     US-AL|  336|
|     US-NY|  600|
|     US-MS|  270|
|     US-NC|  455|
|     US-CT|  157|
|     US-MT|  298|
|     US-OR|  473|
|     US-IA|  321|
|     US-NV|  146|
|     US-RI|   32|
|     US-AK|  783|
|     US-WV|  137|
|     US-MA|  242|
|     US-MD|  240|
|     US-AZ|  332|
|     US-NH|  170|
|     US-DC|   20|
|     US-ME|  196|
|     US-DE|   54|
|     US-OH|  724|
|     US-CO|  475|
|     US-MN|  512|
|     US-GA|  489|
|     US-NJ|  401|
|     US-CA|  992|
|     US-PA|  864|
|     US-ID|  293|
|     US-ND|  313|
|     US-IL|  848|
|     US-FL|  899|
|     US-KY|  235|
|     US-WY|  122|
|     US-NM|  186|
|     US-MO|  547|
|     US-UT|  157|
|     US-SC|  211|
|     US-KS|  410|
|     US-TX| 2052|
|     US-WI|  581|
|     US-VA|  472|
|     US-MI|  505|
|     US-AR|  368|
|     US-NE|  297|
|     US-HI|

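The region code 'US-U-A' is not a valid state code and is probably an error, so we drop those rows. We then strip the 'US-' prefix so that iso_region holds only the state code.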
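The order of the two steps matters: stripping the prefix first would turn 'US-U-A' into 'U-A', which no longer looks like the bad value being filtered. A minimal stdlib sketch of the per-value logic (the helper `clean_region` is hypothetical; `str.replace` stands in for `regexp_replace`, and both remove every occurrence of 'US-'):

```python
from typing import Optional

# Region codes treated as errors, as in the filter above
INVALID_REGIONS = {"US-U-A"}


def clean_region(iso_region: str) -> Optional[str]:
    """Return the bare state code, or None if the row should be dropped."""
    if iso_region in INVALID_REGIONS:
        return None
    # Like regexp_replace(col, 'US-', ''): remove every occurrence of 'US-'
    return iso_region.replace("US-", "")


print(clean_region("US-PA"))    # PA
print(clean_region("US-U-A"))   # None
print("US-U-A".replace("US-", ""))  # U-A, what stripping alone would leave
```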

In [17]:
airport = airport.where(col('iso_region') != 'US-U-A')
airport = airport.withColumn('iso_region', regexp_replace('iso_region', 'US-', ''))
airport.groupby('iso_region').count().show(5)

+----------+-----+
|iso_region|count|
+----------+-----+
|        AZ|  332|
|        SC|  211|
|        LA|  542|
|        MN|  512|
|        NJ|  401|
+----------+-----+
only showing top 5 rows



In [18]:
airport.limit(5).toPandas()

,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11,,US,PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450,,US,AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820,,US,AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AS,small_airport,Fulton Airport,1100,,US,OK,Alex,00AS,,00AS,"-97.8180194, 34.9428028"


#### 2.2 Clean the demographics table

In [19]:
demogra_df.limit(5).toPandas()

,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


Cast the Count column to int.

In [20]:
demogra_df = demogra_df.withColumn("Count",col("Count").cast('int'))

Each City/State pair appears in several rows, one per value of Race, so we pivot the Race column to make each City/State combination unique.
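To make the long-to-wide reshaping concrete, here is a minimal stdlib sketch of what `groupBy(...).pivot('Race').sum('Count')` does, simplified to group on City and State only (the real code groups on all the index columns). The helper `pivot_race` and the rows are invented. It also shows where the nulls in the race columns come from: a race that never occurs for a city gets no value.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def pivot_race(rows: List[dict]) -> Dict[Tuple[str, str], Dict[str, int]]:
    """Collapse one-row-per-race records into one record per (City, State).

    Each distinct Race becomes a key holding the summed Count. A race that
    never occurs for a city is absent here; Spark puts null in that column.
    """
    wide: Dict[Tuple[str, str], Dict[str, int]] = defaultdict(dict)
    for r in rows:
        cell = wide[(r["City"], r["State"])]
        cell[r["Race"]] = cell.get(r["Race"], 0) + r["Count"]
    return dict(wide)


# Invented rows in the same long format as the demographics CSV
rows = [
    {"City": "Town A", "State": "State X", "Race": "White", "Count": 100},
    {"City": "Town A", "State": "State X", "Race": "Asian", "Count": 20},
    {"City": "Town B", "State": "State X", "Race": "White", "Count": 50},
]
print(pivot_race(rows))
# {('Town A', 'State X'): {'White': 100, 'Asian': 20}, ('Town B', 'State X'): {'White': 50}}
```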

In [21]:
index=['City', 'State', 'State Code', 'Median Age', 'Female Population', 'Male Population', 'Total Population', \
              'Number of Veterans', 'Foreign-born', 'Average Household Size']
pivot_df = demogra_df.groupBy(index).pivot('Race').sum('Count')
pivot_df.limit(5).toPandas()

,City,State,State Code,Median Age,Female Population,Male Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,American Indian and Alaska Native,Asian,Black or African-American,Hispanic or Latino,White
0,Union City,California,CA,38.5,35911,38599,74510,1440,32752,3.46,243,44849,5508,13207,16845
1,Kenner,Louisiana,LA,36.9,33113,33993,67106,2485,12352,2.57,158,2729,17172,17769,44863
2,Murfreesboro,Tennessee,TN,30.2,65417,60704,126121,5199,8948,2.6,1339,7265,22651,8840,97270
3,El Cajon,California,CA,32.7,49238,54450,103688,7103,31865,3.14,1891,4561,7534,31542,84703
4,Brooklyn Park,Minnesota,MN,35.1,41305,37845,79150,3506,17490,2.85,1173,14443,22550,4138,41760


Count the missing values in the pivoted table.

In [22]:
pivot_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in pivot_df.columns]).show()

+----+-----+----------+----------+-----------------+---------------+----------------+------------------+------------+----------------------+---------------------------------+-----+-------------------------+------------------+-----+
|City|State|State Code|Median Age|Female Population|Male Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|American Indian and Alaska Native|Asian|Black or African-American|Hispanic or Latino|White|
+----+-----+----------+----------+-----------------+---------------+----------------+------------------+------------+----------------------+---------------------------------+-----+-------------------------+------------------+-----+
|   0|    0|         0|         0|                1|              1|               0|                 7|           7|                     8|                               57|   13|                       12|                 0|    7|
+----+-----+----------+----------+-----------------+---------------+----

There are only a few missing values, so we fill them with 0. We also need to cast the numeric columns to their correct types.
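One pitfall here: PySpark's `DataFrame.fillna` ignores columns whose type does not match the fill value, so `fillna(0)` only touches numeric columns. Because the CSV was read without a schema, columns such as Male Population are still strings at this point; only the race columns produced by the pivot (sums) are numeric. Casting before filling avoids the problem. A minimal stdlib sketch of cast-then-fill for one row (the helper `cast_and_fill` is hypothetical; unlike Spark's `cast`, Python's `int()` raises on non-numeric text instead of returning null):

```python
from typing import Dict, Iterable, Optional


def cast_and_fill(row: Dict[str, Optional[str]],
                  int_cols: Iterable[str],
                  float_cols: Iterable[str]) -> Dict[str, object]:
    """Cast string fields to numbers first, then replace missing values with 0."""
    out: Dict[str, object] = dict(row)
    for c in int_cols:
        value = row.get(c)
        out[c] = int(value) if value is not None else 0
    for c in float_cols:
        value = row.get(c)
        out[c] = float(value) if value is not None else 0.0
    return out


# Invented row: values are strings, as they are after reading the CSV
row = {"City": "Town A", "Male Population": "40601", "Number of Veterans": None,
       "Average Household Size": None}
print(cast_and_fill(row, ["Male Population", "Number of Veterans"], ["Average Household Size"]))
# {'City': 'Town A', 'Male Population': 40601, 'Number of Veterans': 0, 'Average Household Size': 0.0}
```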

In [23]:
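fill_list = ['Male Population', 'Female Population', 'Number of Veterans', 'Foreign-born', \
        'Average Household Size', 'American Indian and Alaska Native', 'Asian', 'White', \
            'Black or African-American', 'Hispanic or Latino']
# fillna only fills columns whose type matches the value: fill the columns that are
# still strings with '0' and the numeric race columns produced by the pivot with 0
demogra_df = pivot_df.fillna('0', subset=fill_list).fillna(0, subset=fill_list)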

In [24]:
demogra_df.limit(5).toPandas()

,City,State,State Code,Median Age,Female Population,Male Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,American Indian and Alaska Native,Asian,Black or African-American,Hispanic or Latino,White
0,Union City,California,CA,38.5,35911,38599,74510,1440,32752,3.46,243,44849,5508,13207,16845
1,Kenner,Louisiana,LA,36.9,33113,33993,67106,2485,12352,2.57,158,2729,17172,17769,44863
2,Murfreesboro,Tennessee,TN,30.2,65417,60704,126121,5199,8948,2.6,1339,7265,22651,8840,97270
3,El Cajon,California,CA,32.7,49238,54450,103688,7103,31865,3.14,1891,4561,7534,31542,84703
4,Brooklyn Park,Minnesota,MN,35.1,41305,37845,79150,3506,17490,2.85,1173,14443,22550,4138,41760


Cast the columns in int_list to int and those in float_list to float.

In [25]:
int_list = ['Male Population', 'Female Population', 'Number of Veterans', 'Foreign-born', 'Total Population', \
           'American Indian and Alaska Native', 'Asian', 'Black or African-American', 'Hispanic or Latino', 'White']
float_list = ['Median Age', 'Average Household Size']

for i in int_list:
    demogra_df = demogra_df.withColumn(i, col(i).cast('int'))
    
for j in float_list:
    demogra_df = demogra_df.withColumn(j, col(j).cast('float'))

In [26]:
demogra_df.limit(5).toPandas()

,City,State,State Code,Median Age,Female Population,Male Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,American Indian and Alaska Native,Asian,Black or African-American,Hispanic or Latino,White
0,Union City,California,CA,38.5,35911,38599,74510,1440,32752,3.46,243,44849,5508,13207,16845
1,Kenner,Louisiana,LA,36.900002,33113,33993,67106,2485,12352,2.57,158,2729,17172,17769,44863
2,Murfreesboro,Tennessee,TN,30.200001,65417,60704,126121,5199,8948,2.6,1339,7265,22651,8840,97270
3,El Cajon,California,CA,32.700001,49238,54450,103688,7103,31865,3.14,1891,4561,7534,31542,84703
4,Brooklyn Park,Minnesota,MN,35.099998,41305,37845,79150,3506,17490,2.85,1173,14443,22550,4138,41760
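Values such as 36.900002 in Median Age come from the cast itself: Spark's `float` type is 32-bit single precision, so 36.9 is stored as the nearest single-precision number. Casting to `'double'` instead would keep such values displayed as 36.9. A minimal stdlib sketch reproducing the rounding (the helper `to_float32` is hypothetical):

```python
import struct


def to_float32(x: float) -> float:
    """Round a Python float (64-bit) to the nearest 32-bit float, like cast('float')."""
    return struct.unpack("f", struct.pack("f", x))[0]


for age in [36.9, 30.2, 35.1, 38.5]:
    # Rounded to 6 decimals, the way the pandas output above displays it
    print(age, "->", round(to_float32(age), 6))
# 36.9 -> 36.900002
# 30.2 -> 30.200001
# 35.1 -> 35.099998
# 38.5 -> 38.5
```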


#### 2.3 Clean the temperature table

In [27]:
temperature.head(5)

[Row(dt='1743-11-01', AverageTemperature='6.068', AverageTemperatureUncertainty='1.7369999999999999', City='Århus', Country='Denmark', Latitude='57.05N', Longitude='10.33E'),
 Row(dt='1743-12-01', AverageTemperature=None, AverageTemperatureUncertainty=None, City='Århus', Country='Denmark', Latitude='57.05N', Longitude='10.33E'),
 Row(dt='1744-01-01', AverageTemperature=None, AverageTemperatureUncertainty=None, City='Århus', Country='Denmark', Latitude='57.05N', Longitude='10.33E'),
 Row(dt='1744-02-01', AverageTemperature=None, AverageTemperatureUncertainty=None, City='Århus', Country='Denmark', Latitude='57.05N', Longitude='10.33E'),
 Row(dt='1744-03-01', AverageTemperature=None, AverageTemperatureUncertainty=None, City='Århus', Country='Denmark', Latitude='57.05N', Longitude='10.33E')]

The records go back as far as 1743. Let's check the latest date in the table.

In [28]:
temperature.agg({"dt": "max"}).collect()[0][0]

'2013-09-01'

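The latest record is from 2013-09-01, while our immigration data is from 2016, so the temperatures cannot be matched by date. Instead, we compute the average temperature of each country over all available records and use it in the analysis. The latitude and longitude kept for each country are the first values Spark encounters for that country, i.e. the coordinates of one of its cities.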
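A minimal stdlib sketch of this aggregation, assuming dict rows with string temperatures as read from the CSV (the helper `country_averages` and the rows are invented). Missing readings are skipped, as Spark's `avg` skips nulls. Note that Spark's `first` is documented as non-deterministic after a shuffle, so which city's coordinates are kept may vary between runs; the sketch simply keeps the first row it sees.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple


def country_averages(rows: List[dict]) -> Dict[str, dict]:
    """Average AverageTemperature per country over all records."""
    total: Dict[str, float] = defaultdict(float)
    n: Dict[str, int] = defaultdict(int)
    coords: Dict[str, Tuple[str, str]] = {}
    for r in rows:
        country = r["Country"]
        # Keep the coordinates of the first row seen for the country, like first()
        coords.setdefault(country, (r["Latitude"], r["Longitude"]))
        if r["AverageTemperature"] is not None:  # skip missing readings, like avg()
            total[country] += float(r["AverageTemperature"])
            n[country] += 1
    result = {}
    for country, (lat, lon) in coords.items():
        avg: Optional[float] = total[country] / n[country] if n[country] else None
        # Upper-case the name so it matches the country names in countries.csv
        result[country.upper()] = {"avg_temperature": avg, "latitude": lat, "longitude": lon}
    return result


rows = [  # invented readings for two cities of one country
    {"Country": "Country X", "AverageTemperature": "6.0", "Latitude": "1.00N", "Longitude": "2.00E"},
    {"Country": "Country X", "AverageTemperature": None, "Latitude": "1.00N", "Longitude": "2.00E"},
    {"Country": "Country X", "AverageTemperature": "8.0", "Latitude": "3.00N", "Longitude": "4.00E"},
]
print(country_averages(rows))
# {'COUNTRY X': {'avg_temperature': 7.0, 'latitude': '1.00N', 'longitude': '2.00E'}}
```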

In [29]:
temperature = temperature.groupby(["Country"]).agg(
        {"AverageTemperature": "avg", "Latitude": "first", "Longitude": "first"})\
        .withColumnRenamed('avg(AverageTemperature)', 'avg_temperature')\
        .withColumnRenamed('first(Latitude)', 'latitude')\
        .withColumnRenamed('first(Longitude)', 'longitude')

Convert the country names to upper case so that they match the country names in the countries table.

In [30]:
temperature = temperature.withColumn('Country', upper(col('Country')))
temperature.limit(5).toPandas()

,Country,avg_temperature,latitude,longitude
0,CHAD,27.189829,8.84N,15.41E
1,PARAGUAY,22.784014,24.92S,58.52W
2,RUSSIA,3.347268,53.84N,91.36E
3,YEMEN,25.768408,13.66N,45.41E
4,SENEGAL,25.984177,15.27N,17.50W


#### 2.4 Clean the country code and airport code tables

In [31]:
countries.limit(5).toPandas()

,Code,Countries
0,582,MEXICO
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA


Cast the columns of the country code table and the three-letter airport code table to their proper types.

In [32]:
countries = countries.withColumn('Code', col('Code').cast('int'))
countries = countries.withColumn('Countries', col('Countries').cast('string'))

In [33]:
airport_code = airport_code.withColumn('Code', col('Code').cast('string'))
airport_code = airport_code.withColumn('Airport', col('Airport').cast('string'))
airport_code.limit(5).toPandas()

,Code,Airport
0,ALC,"ALCAN, AK"
1,ANC,"ANCHORAGE, AK"
2,BAR,"BAKER AAF - BAKER ISLAND, AK"
3,DAC,"DALTONS CACHE, AK"
4,PIZ,"DEW STATION PT LAY DEW, AK"


#### 2.5 Clean the immigration table

In [34]:
immigration_df.limit(5).toPandas()

,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,5748517.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,CA,20582.0,...,,M,1976.0,10292016,F,,QF,94953870000.0,11,B1
1,5748518.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,NV,20591.0,...,,M,1984.0,10292016,F,,VA,94955620000.0,7,B1
2,5748519.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20582.0,...,,M,1987.0,10292016,M,,DL,94956410000.0,40,B1
3,5748520.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,...,,M,1987.0,10292016,F,,DL,94956450000.0,40,B1
4,5748521.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,...,,M,1988.0,10292016,M,,DL,94956390000.0,40,B1


Drop rows that have a missing value in any of the columns i94port, i94addr or gender.
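With `how="any"`, a row is dropped as soon as one of the listed columns is missing; `how="all"` would drop it only if all of them were missing. A minimal stdlib sketch of the `how="any"` rule over dict rows (the helper `dropna_any` and the rows are invented):

```python
from typing import Dict, Iterable, List, Optional

Row = Dict[str, Optional[str]]


def dropna_any(rows: List[Row], subset: Iterable[str]) -> List[Row]:
    """Keep only rows in which every column of subset has a value (how='any')."""
    cols = list(subset)
    return [r for r in rows if all(r.get(c) is not None for c in cols)]


rows = [
    {"cicid": "1", "i94port": "LOS", "i94addr": "CA", "gender": "F"},
    {"cicid": "2", "i94port": "LOS", "i94addr": None, "gender": "M"},
    {"cicid": "3", "i94port": "LOS", "i94addr": "WA", "gender": None},
]
print([r["cicid"] for r in dropna_any(rows, ["i94port", "i94addr", "gender"])])  # ['1']
```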

In [35]:
immigration_df = immigration_df.dropna(how="any", subset=["i94port", "i94addr", "gender"])
immigration_df

DataFrame[cicid: double, i94yr: double, i94mon: double, i94cit: double, i94res: double, i94port: string, arrdate: double, i94mode: double, i94addr: string, depdate: double, i94bir: double, i94visa: double, count: double, dtadfile: string, visapost: string, occup: string, entdepa: string, entdepd: string, entdepu: string, matflag: string, biryear: double, dtaddto: string, gender: string, insnum: string, airline: string, admnum: double, fltno: string, visatype: string]

The immigration table has many columns, so we drop the ones we do not need.

In [36]:
immigration_df = immigration_df.drop('entdepu', 'entdepd', 'count', 'dtadfile', 'visapost', 'matflag', \
                                    'dtaddto', 'insnum', 'admnum', 'occup')
immigration_df

DataFrame[cicid: double, i94yr: double, i94mon: double, i94cit: double, i94res: double, i94port: string, arrdate: double, i94mode: double, i94addr: string, depdate: double, i94bir: double, i94visa: double, entdepa: string, biryear: double, gender: string, airline: string, fltno: string, visatype: string]

The cicid column is unique for each row, so it identifies each immigration record.

In [37]:
immigration_df.limit(5).toPandas()

,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,entdepa,biryear,gender,airline,fltno,visatype
0,5748517.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,CA,20582.0,40.0,1.0,G,1976.0,F,QF,11,B1
1,5748518.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,NV,20591.0,32.0,1.0,G,1984.0,F,VA,7,B1
2,5748519.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20582.0,29.0,1.0,G,1987.0,M,DL,40,B1
3,5748520.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,29.0,1.0,G,1987.0,F,DL,40,B1
4,5748521.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,28.0,1.0,G,1988.0,M,DL,40,B1


Several columns hold whole numbers but are stored as doubles, so we cast them to int.

In [38]:
immigration_df = immigration_df.withColumn("cicid",col("cicid").cast('int'))\
                                .withColumn("i94yr",col("i94yr").cast('int'))\
                                .withColumn("i94mon",col("i94mon").cast('int'))\
                                .withColumn("i94cit",col("i94cit").cast('int'))\
                                .withColumn("i94res",col("i94res").cast('int'))\
                                .withColumn("arrdate",col("arrdate").cast('int'))\
                                .withColumn("i94mode",col("i94mode").cast('int'))\
                                .withColumn("depdate",col("depdate").cast('int'))\
                                .withColumn("i94bir",col("i94bir").cast('int'))\
                                .withColumn("i94visa",col("i94visa").cast('int'))\
                                .withColumn("biryear",col("biryear").cast('int'))
immigration_df

DataFrame[cicid: int, i94yr: int, i94mon: int, i94cit: int, i94res: int, i94port: string, arrdate: int, i94mode: int, i94addr: string, depdate: int, i94bir: int, i94visa: int, entdepa: string, biryear: int, gender: string, airline: string, fltno: string, visatype: string]

In [39]:
immigration_df.limit(5).toPandas()

,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,entdepa,biryear,gender,airline,fltno,visatype
0,5748517,2016,4,245,438,LOS,20574,1,CA,20582,40,1,G,1976,F,QF,11,B1
1,5748518,2016,4,245,438,LOS,20574,1,NV,20591,32,1,G,1984,F,VA,7,B1
2,5748519,2016,4,245,438,LOS,20574,1,WA,20582,29,1,G,1987,M,DL,40,B1
3,5748520,2016,4,245,438,LOS,20574,1,WA,20588,29,1,G,1987,F,DL,40,B1
4,5748521,2016,4,245,438,LOS,20574,1,WA,20588,28,1,G,1988,M,DL,40,B1


The arrdate and depdate columns are SAS dates, i.e. the number of days since 1960-01-01. Convert them to ISO date strings (YYYY-MM-DD) with a UDF.
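A minimal stdlib sketch of the conversion, which can be checked against the output further down (arrdate 20574 becomes 2016-04-30). The helper `sas_to_iso` is hypothetical. Spark SQL's built-in `date_add` function could probably replace the Python UDF and avoid its serialization overhead, but that alternative is not shown or tested here.

```python
from datetime import date, timedelta
from typing import Optional

SAS_EPOCH = date(1960, 1, 1)  # SAS dates count days since 1960-01-01


def sas_to_iso(days: Optional[int]) -> Optional[str]:
    """Convert a SAS date (days since 1960-01-01) to an ISO 'YYYY-MM-DD' string."""
    if days is None:  # test for None, not falsiness, so day 0 is kept
        return None
    return (SAS_EPOCH + timedelta(days=int(days))).isoformat()


print(sas_to_iso(20574))  # 2016-04-30
print(sas_to_iso(20582))  # 2016-05-08
print(sas_to_iso(0))      # 1960-01-01
```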

In [40]:
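@udf(StringType())
def convert_datetime(x):
    """Convert a SAS date (days since 1960-01-01) to an ISO date string."""
    # Compare with None explicitly so that day 0 (1960-01-01) is not treated as missing
    if x is not None:
        return (datetime(1960, 1, 1).date() + timedelta(days=x)).isoformat()
    return None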

In [41]:
immigration_df = immigration_df.withColumn("arrdate", convert_datetime(immigration_df.arrdate))

In [42]:
immigration_df = immigration_df.withColumn("depdate", convert_datetime(immigration_df.depdate))

In [43]:
immigration_df.limit(5).toPandas()

,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,entdepa,biryear,gender,airline,fltno,visatype
0,5748517,2016,4,245,438,LOS,2016-04-30,1,CA,2016-05-08,40,1,G,1976,F,QF,11,B1
1,5748518,2016,4,245,438,LOS,2016-04-30,1,NV,2016-05-17,32,1,G,1984,F,VA,7,B1
2,5748519,2016,4,245,438,LOS,2016-04-30,1,WA,2016-05-08,29,1,G,1987,M,DL,40,B1
3,5748520,2016,4,245,438,LOS,2016-04-30,1,WA,2016-05-14,29,1,G,1987,F,DL,40,B1
4,5748521,2016,4,245,438,LOS,2016-04-30,1,WA,2016-05-14,28,1,G,1988,M,DL,40,B1


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The immigration table contains most of the information, and the other tables provide details about some of its columns. We therefore model the data as a star schema.  

This star schema includes one fact table, **fact_immigration**, and three dimension tables: **dim_countries**, **dim_demographics** and **dim_airports**.   

**Fact table: fact_immigration**  
- cicid  
- i94yr  
- i94mon  
- i94cit (Foreign Key - countries)  
- i94res  
- i94port (Foreign Key - airports)  
- arrdate  
- i94mode  
- i94addr (Foreign Key - demographics)
- depdate  
- i94bir  
- i94visa  
- entdepa  
- biryear
- gender  
- airline
- fltno  
- visatype  

**Dimension Tables**  
**dim_countries**  
- Code (Foreign Key)
- Country  
- avg_temperature  
- latitude  
- longitude  

**dim_demographics**  
- City  
- State  
- Median_Age  
- Female_Population  
- Male_Population
- Total_Population  
- Number_Veterans  
- Foreign-born  
- AverageHouseholdSize  
- State_Code  (Foreign Key)
- AmericanIndian_AlaskaNative  
- Black_AfricanAmerican  
- Hispanic_Latino  
- Asian  
- White    

**dim_airports**
- ident  
- type  
- name  
- elevation_ft  
- continent  
- iso_country  
- iso_region  
- municipality  
- gps_code  
- iata_code  
- local_code (Foreign Key) 
- coordinates   
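In a star schema, queries reach dimension attributes by joining the fact table on its foreign keys. A minimal stdlib sketch of that lookup, with Python dicts standing in for the dimension tables keyed by Code and local_code. The country codes, names and the airport name are taken from the sample outputs above; the fact rows and the helpers `lookup` and `denormalize` are invented. In Spark this would be a `join` on i94cit = Code and i94port = local_code.

```python
from typing import Any, Dict, List, Optional

# Toy dimension tables keyed by the columns the fact table references
dim_countries: Dict[int, Dict[str, Any]] = {582: {"Country": "MEXICO"}, 101: {"Country": "ALBANIA"}}
dim_airports: Dict[str, Dict[str, Any]] = {"00A": {"name": "Total Rf Heliport", "iso_region": "PA"}}

# Invented fact rows; 999 deliberately has no matching country
fact_immigration: List[Dict[str, Any]] = [
    {"cicid": 1, "i94cit": 582, "i94port": "00A", "visatype": "B1"},
    {"cicid": 2, "i94cit": 999, "i94port": "00A", "visatype": "B2"},
]


def lookup(dim: Dict[Any, Dict[str, Any]], key: Any, field: str) -> Optional[Any]:
    """Return one attribute of the dimension row for key, or None if the key is unknown."""
    row = dim.get(key)
    return row[field] if row is not None else None


def denormalize(facts: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Left-join each fact row to dim_countries and dim_airports through its foreign keys."""
    return [
        {**f,
         "country": lookup(dim_countries, f["i94cit"], "Country"),
         "port_name": lookup(dim_airports, f["i94port"], "name")}
        for f in facts
    ]


for row in denormalize(fact_immigration):
    print(row["cicid"], row["country"], row["port_name"])
# 1 MEXICO Total Rf Heliport
# 2 None Total Rf Heliport
```

An unknown key yields None here, like a left join; an inner join would drop the second fact row entirely.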

#### 3.2 Mapping Out Data Pipelines
1. Load all the tables we will use.  
2. Clean all tables by handling missing and duplicate data and checking for unreasonable values.
3. Create the 3 dimension tables: dim_countries, dim_demographics and dim_airports.  
4. Create the fact table, fact_immigration, which references the dimension tables through its foreign keys.
5. Save the fact and dimension tables as parquet files. 
6. Check the data quality.
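Step 6 could consist of simple checks such as "the table is not empty" and "the key column is unique". A minimal stdlib sketch of such checks (the helper `quality_failures` and the sample tables are hypothetical, not the project's actual checks); with Spark DataFrames the same conditions could be expressed as `df.count() == 0` and `df.count() != df.select(key).distinct().count()`.

```python
from typing import List, Optional


def quality_failures(name: str, rows: List[dict], key: Optional[str] = None) -> List[str]:
    """Return a list of failed checks for one table (empty list means all passed)."""
    failures = []
    if not rows:
        failures.append(f"{name}: table is empty")
    if key is not None:
        keys = [r[key] for r in rows]
        if len(keys) != len(set(keys)):
            failures.append(f"{name}: column {key} is not unique")
    return failures


print(quality_failures("fact_immigration", [{"cicid": 1}, {"cicid": 2}], key="cicid"))  # []
print(quality_failures("dim_airports", []))  # ['dim_airports: table is empty']
print(quality_failures("dim_countries", [{"Code": 1}, {"Code": 1}], key="Code"))
# ['dim_countries: column Code is not unique']
```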

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
**Create the dimension table: dim_demographics**

In [47]:
def create_dim_demographics(spark, demogra_df):
    """Clean the demogra_df table and create the dim_demographics table
    
    Keyword arguments:
    * spark --       Spark session
    * demogra_df --  input table

    
    Output:
    * demographics.parquet 
    
    """
    # Cast Count to int so that it can be summed when pivoting the Race column
    demogra_df = demogra_df.withColumn("Count", col("Count").cast('int'))
    
    # Pivot the Race column so that each (City, State) combination is unique
    index = ['City', 'State', 'State Code', 'Median Age', 'Female Population', 'Male Population', 'Total Population', \
             'Number of Veterans', 'Foreign-born', 'Average Household Size']
    pivot_df = demogra_df.groupBy(index).pivot('Race').sum('Count')
    
    # Correct the column types first: fillna(0) ignores string columns, and every
    # column read from the CSV is still a string at this point
    demogra_df = pivot_df.withColumn('State Code', col('State Code').cast('string'))
    
    int_list = ['Male Population', 'Female Population', 'Number of Veterans', 'Foreign-born', 'Total Population', \
           'American Indian and Alaska Native', 'Asian', 'Black or African-American', 'Hispanic or Latino', 'White']
    float_list = ['Median Age', 'Average Household Size']

    for i in int_list:
        demogra_df = demogra_df.withColumn(i, col(i).cast('int'))

    for j in float_list:
        demogra_df = demogra_df.withColumn(j, col(j).cast('float'))
    
    # Fill the remaining missing values in the numeric columns with 0
    fill_list = ['Male Population', 'Female Population', 'Number of Veterans', 'Foreign-born', \
        'Average Household Size', 'American Indian and Alaska Native', 'Asian', 'White', \
            'Black or African-American', 'Hispanic or Latino']
    demogra_df = demogra_df.fillna(0, subset=fill_list)
    
    # Rename the columns to names without spaces
    dim_demographics = demogra_df.withColumnRenamed('State Code', 'State_Code')\
                           .withColumnRenamed('Median Age', 'Median_Age')\
                           .withColumnRenamed('Female Population', 'Female_Population')\
                           .withColumnRenamed('Male Population', 'Male_Population')\
                           .withColumnRenamed('Total Population', 'Total_Population')\
                           .withColumnRenamed('Number of Veterans', 'Number_Veterans')\
                           .withColumnRenamed('Average Household Size', 'AverageHouseholdSize')\
                           .withColumnRenamed('American Indian and Alaska Native', 'AmericanIndian_AlaskaNative')\
                           .withColumnRenamed('Black or African-American', 'Black_AfricanAmerican')\
                           .withColumnRenamed('Hispanic or Latino', 'Hispanic_Latino')
    print(dim_demographics.limit(5).toPandas())
    
    # Write the table to a parquet file
    dim_demographics.write.mode('overwrite').parquet('output/demographics.parquet')
    

In [48]:
demogra_df = spark.read.format('csv').options(header='true').option("sep", ';').load('input/us-cities-demographics.csv')
create_dim_demographics(spark, demogra_df)

            City       State State_Code  Median_Age  Female_Population  \
0     Union City  California         CA   38.500000              35911   
1         Kenner   Louisiana         LA   36.900002              33113   
2   Murfreesboro   Tennessee         TN   30.200001              65417   
3       El Cajon  California         CA   32.700001              49238   
4  Brooklyn Park   Minnesota         MN   35.099998              41305   

   Male_Population  Total_Population  Number_Veterans  Foreign-born  \
0            38599             74510             1440         32752   
1            33993             67106             2485         12352   
2            60704            126121             5199          8948   
3            54450            103688             7103         31865   
4            37845             79150             3506         17490   

   AverageHouseholdSize  AmericanIndian_AlaskaNative  Asian  \
0                  3.46                          243  44849   
1 

**Create the dimension table: dim_countries**

In [49]:
def create_dim_countries(spark, temperature, countries):
    """Clean the temperature and countries tables and create dim_countries.
    
    Keyword arguments:
    * spark --       Spark working session
    * temperature -- city-level temperature input table
    * countries --   country code input table
    
    Output:
    * output/countries.parquet
    """
    # Calculate the average temperature for each country
    temperature = temperature.groupby(["Country"]).agg(
        {"AverageTemperature": "avg", "Latitude": "first", "Longitude": "first"})\
        .withColumnRenamed('avg(AverageTemperature)', 'avg_temperature')\
        .withColumnRenamed('first(Latitude)', 'latitude')\
        .withColumnRenamed('first(Longitude)', 'longitude')
    temperature = temperature.withColumn('Country', upper(col('Country')))
    
    # clean the countries table
    countries = countries.withColumn('Code', col('Code').cast('int'))
    countries = countries.withColumn('Countries', col('Countries').cast('string'))
    
    # Inner-join the temperature and countries tables on the country name
    dim_countries = temperature.join(countries, col('Country') == col('Countries'), 'inner').drop(col('Countries'))
    
    print(dim_countries.limit(5).toPandas())
    # Write dim_countries to a parquet file
    dim_countries.write.mode('overwrite').parquet('output/countries.parquet')

In [50]:
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
temperature = spark.read.format('csv').options(header='true').option("sep", ',').load(fname)
countries = spark.read.format('csv').options(header='true').option("sep", ',').load('input/countries.csv')
create_dim_countries(spark, temperature, countries)

    Country  avg_temperature latitude longitude  Code
0      CHAD        27.189829    8.84N    15.41E   384
1  PARAGUAY        22.784014   24.92S    58.52W   693
2    RUSSIA         3.347268   53.84N    91.36E   158
3     YEMEN        25.768408   13.66N    45.41E   216
4   SENEGAL        25.984177   15.27N    17.50W   391
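The aggregation above reduces city-level readings to one average per country. It then keeps only countries that also appear in `countries.csv`. The plain-Python sketch below mirrors that logic on small lists of dicts, so the semantics are easy to check. The function names are hypothetical, and latitude/longitude are omitted. The sketch upper-cases country names before grouping, whereas the cell above groups first and upper-cases afterwards. The two agree as long as each country is spelled with consistent casing in the source.

```python
from collections import defaultdict


def avg_temperature_by_country(rows):
    """Average AverageTemperature per upper-cased country.

    Missing readings (None or empty string) are skipped, like nulls in Spark's avg;
    a country with no readings at all gets None.
    """
    totals = defaultdict(float)
    counts = defaultdict(int)
    seen = set()
    for row in rows:
        country = row["Country"].upper()
        seen.add(country)
        temp = row.get("AverageTemperature")
        if temp in (None, ""):
            continue
        totals[country] += float(temp)
        counts[country] += 1
    return {c: totals[c] / counts[c] if counts[c] else None for c in seen}


def join_country_codes(averages, codes):
    """Inner join on country name: countries missing from either side are dropped."""
    return {
        country: {"avg_temperature": avg, "Code": codes[country]}
        for country, avg in averages.items()
        if country in codes
    }


readings = [
    {"Country": "Chad", "AverageTemperature": "27.0"},
    {"Country": "Chad", "AverageTemperature": "29.0"},
    {"Country": "Chad", "AverageTemperature": ""},
]
print(join_country_codes(avg_temperature_by_country(readings), {"CHAD": 384}))
# {'CHAD': {'avg_temperature': 28.0, 'Code': 384}}
```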


**Create the dimension table: dim_airports**

In [51]:
def create_dim_airports(spark, airport, airport_code):
    """Clean the airport and airport_code tables and create dim_airports.
    
    Keyword arguments:
    * spark --        Spark working session
    * airport --      airport-codes input table
    * airport_code -- lookup table with Code and Airport columns
    
    Output:
    * output/airports.parquet
    """
    # Clean the airport table
    airport = airport.where(col('iso_country') == 'US')
    airport = airport.where((col('type') != 'balloonport') & (col('type') != 'closed'))
    airport = airport.where(col('local_code') != 'None')
    airport = airport.where(col('iso_region') != 'US-U-A')
    airport = airport.withColumn('iso_region', regexp_replace('iso_region', 'US-', ''))
    
    # Clean the airport_code table
    airport_code = airport_code.withColumn('Code', col('Code').cast('string'))
    airport_code = airport_code.withColumn('Airport', col('Airport').cast('string'))
    
    # Join the airport_code and airport tables together
    dim_airports = airport.join(airport_code, col('local_code') == col('Code'), 'inner').drop(col('Code'))\
                    .withColumnRenamed('Airport', 'short_name')
    print(dim_airports.limit(5).toPandas())
    # Write dim_airports to a parquet file
    dim_airports.write.mode('overwrite').parquet('output/airports.parquet')

In [52]:
airport = spark.read.format('csv').options(header='true').option("sep", ',').load('input/airport-codes_csv.csv')
airport_code = spark.read.format('csv').options(header='true').option("sep", ',').load('input/airport.csv')
create_dim_airports(spark, airport, airport_code)

  ident           type                                    name elevation_ft  \
0   48Y  small_airport          Piney Pinecreek Border Airport         1078   
1   5KE  seaplane_base          Ketchikan Harbor Seaplane Base         None   
2   CDD  seaplane_base                    Scotts Seaplane Base         1119   
3   CSP       heliport              Cape Spencer C.G. Heliport           84   
4  K5T6  small_airport   Doña Ana County International Jetport         4112   

  continent iso_country iso_region  municipality gps_code iata_code  \
0        NA          US         MN     Pinecreek      48Y      None   
1        NA          US         AK     Ketchikan     None       WFB   
2        NA          US         MN    Crane Lake     KCDD      None   
3        NA          US         AK  Cape Spencer     PACA      None   
4        NA          US         NM  Santa Teresa     KDNA      None   

  local_code                            coordinates  \
0        48Y  -95.98259735107422, 48.999599
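The filters in `create_dim_airports` can be restated as a single row-level predicate. This makes it easier to see which records survive. Below is a minimal plain-Python sketch that operates on dicts; the names `keep_airport` and `clean_region` are hypothetical. It treats a missing value in any compared column as a rejection, because a Spark `where` drops rows whose condition evaluates to null.

```python
EXCLUDED_TYPES = {"balloonport", "closed"}


def keep_airport(row):
    """Return True if a record passes the same filters as create_dim_airports."""
    airport_type = row.get("type")
    local_code = row.get("local_code")
    region = row.get("iso_region")
    # A comparison against null is null in Spark, and where() drops such rows
    if None in (airport_type, local_code, region):
        return False
    return (
        row.get("iso_country") == "US"
        and airport_type not in EXCLUDED_TYPES
        and local_code != "None"
        and region != "US-U-A"
    )


def clean_region(region):
    """Remove every 'US-' occurrence (as regexp_replace does), e.g. 'US-MN' -> 'MN'."""
    return region.replace("US-", "")


record = {"iso_country": "US", "type": "small_airport",
          "local_code": "48Y", "iso_region": "US-MN"}
if keep_airport(record):
    print(clean_region(record["iso_region"]))  # MN
```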

**Create the fact table: fact_immigration**

In [53]:
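@udf(StringType())
def convert_datetime(x):
    """Convert a SAS date (days since 1960-01-01) to an ISO 'YYYY-MM-DD' string."""
    # Compare with None explicitly so that day 0 (1960-01-01) is not treated as missing
    if x is not None:
        return (datetime(1960, 1, 1).date() + timedelta(days=x)).isoformat()
    return None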
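`convert_datetime` treats `arrdate` and `depdate` as SAS dates, that is, day counts from 1960-01-01. The same arithmetic in plain Python is handy for checking values by hand. In this sketch, `sas_to_iso` is a hypothetical name. It applies `int()` so that float inputs such as `20574.0` are also accepted.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS dates count days from this day


def sas_to_iso(days):
    """Convert a SAS day count to an ISO 'YYYY-MM-DD' string; None stays None."""
    if days is None:
        return None
    return (SAS_EPOCH + timedelta(days=int(days))).isoformat()


print(sas_to_iso(20574))  # 2016-04-30
print(sas_to_iso(0))      # 1960-01-01
```

A UDF-free option that may be worth benchmarking is Spark SQL's `date_add`, for example `expr("date_add('1960-01-01', arrdate)")`. It returns a date column rather than a string. Python UDFs move every row between the JVM and Python, so built-in functions are usually faster.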

In [54]:
def create_fact_immigration(spark, immigration_df):
    """Clean the immigration data and create fact_immigration.
    
    Keyword arguments:
    * spark --          Spark working session
    * immigration_df -- I94 immigration input table
    
    Output:
    * output/immigration.parquet (partitioned by arrdate)
    """
    # Clean and create the fact_immigration table
    immigration_df = immigration_df.dropna(how="any", subset=["i94port", "i94addr", "gender"])
    immigration_df = immigration_df.drop('entdepu', 'entdepd', 'count', 'dtadfile', 'visapost', 'matflag', \
                                    'dtaddto', 'insnum', 'admnum', 'occup')
    immigration_df = immigration_df.withColumn("cicid",col("cicid").cast('int'))\
                                .withColumn("i94yr",col("i94yr").cast('int'))\
                                .withColumn("i94mon",col("i94mon").cast('int'))\
                                .withColumn("i94cit",col("i94cit").cast('int'))\
                                .withColumn("i94res",col("i94res").cast('int'))\
                                .withColumn("arrdate",col("arrdate").cast('int'))\
                                .withColumn("i94mode",col("i94mode").cast('int'))\
                                .withColumn("depdate",col("depdate").cast('int'))\
                                .withColumn("i94bir",col("i94bir").cast('int'))\
                                .withColumn("i94visa",col("i94visa").cast('int'))\
                                .withColumn("biryear",col("biryear").cast('int'))
    immigration_df = immigration_df.withColumn("arrdate", convert_datetime(immigration_df.arrdate))
    fact_immigration = immigration_df.withColumn("depdate", convert_datetime(immigration_df.depdate))
    
    print(fact_immigration.limit(5).toPandas())
    # Write the fact_immigration to parquet file
    fact_immigration.write.partitionBy('arrdate').mode('overwrite').parquet('output/immigration.parquet')

In [55]:
immigration_df = spark.read.parquet('./sas_data')
create_fact_immigration(spark, immigration_df)

     cicid  i94yr  i94mon  i94cit  i94res i94port     arrdate  i94mode  \
0  5748517   2016       4     245     438     LOS  2016-04-30        1   
1  5748518   2016       4     245     438     LOS  2016-04-30        1   
2  5748519   2016       4     245     438     LOS  2016-04-30        1   
3  5748520   2016       4     245     438     LOS  2016-04-30        1   
4  5748521   2016       4     245     438     LOS  2016-04-30        1   

  i94addr     depdate  i94bir  i94visa entdepa  biryear gender airline  fltno  \
0      CA  2016-05-08      40        1       G     1976      F      QF  00011   
1      NV  2016-05-17      32        1       G     1984      F      VA  00007   
2      WA  2016-05-08      29        1       G     1987      M      DL  00040   
3      WA  2016-05-14      29        1       G     1987      F      DL  00040   
4      WA  2016-05-14      28        1       G     1988      M      DL  00040   

  visatype  
0       B1  
1       B1  
2       B1  
3       B1  
4  
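The fact table is written with `partitionBy('arrdate')`. Spark therefore stores it as one Hive-style subdirectory per distinct arrival date, for example `output/immigration.parquet/arrdate=2016-04-30/`. Rows with a null date go to `arrdate=__HIVE_DEFAULT_PARTITION__`. The sketch below uses hypothetical helper names. It predicts which directories a set of rows would produce and how many rows each would hold. It assumes the values need no path escaping, which holds for ISO dates.

```python
from collections import Counter

HIVE_DEFAULT = "__HIVE_DEFAULT_PARTITION__"  # directory name Spark uses for null values


def partition_dir(value, column="arrdate"):
    """Directory name for one partition value (ISO dates need no escaping)."""
    return f"{column}={HIVE_DEFAULT if value is None else value}"


def partition_counts(rows, column="arrdate"):
    """Map each partition directory to the number of rows it would receive."""
    return dict(Counter(partition_dir(row[column], column) for row in rows))


rows = [{"arrdate": "2016-04-30"}, {"arrdate": "2016-04-30"}, {"arrdate": "2016-04-01"}]
print(partition_counts(rows))
# {'arrdate=2016-04-30': 2, 'arrdate=2016-04-01': 1}
```

This layout lets a daily job filter on `arrdate`, for example `.where(col('arrdate') == '2016-04-30')`. Spark then reads only the matching directory through partition pruning.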

#### 4.2 Data Quality Checks
The data quality checks have two steps:  
- Check that the model creates the parquet files successfully.  
- Check that each parquet file created by the model has at least one row (is not empty).  

 
**Run Quality Checks**

In [56]:
# Read in the dimension tables
dim_demographics = spark.read.parquet("output/demographics.parquet")
dim_countries = spark.read.parquet("output/countries.parquet")
dim_airports = spark.read.parquet("output/airports.parquet")
# Read in the fact table
fact_immigration = spark.read.parquet("output/immigration.parquet")

In [57]:
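def table_exists(table, name):
    """Return True if the table was loaded; otherwise report the missing table by name."""
    # Note: spark.read.parquet raises an exception for a missing path, so this mainly
    # guards against variables that were never assigned a DataFrame
    if table is not None:
        return True
    print('There is no table %s' % name)
    return False

tables = {'dim_demographics': dim_demographics, 'dim_countries': dim_countries,
          'dim_airports': dim_airports, 'fact_immigration': fact_immigration}
# Use a list (not a generator) so every missing table is reported, not just the first
if all([table_exists(table, name) for name, table in tables.items()]):
    print('All tables exist.')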

All tables exist.


In [58]:
if dim_demographics.count() > 0 and dim_countries.count() > 0 and dim_airports.count() > 0 and \
    fact_immigration.count() > 0:
    print('All tables are not empty')
else:
    print('Some tables are empty')

All tables are not empty
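The two checks above can also be written so that they name the offending table. The first function below checks the files on disk rather than the loaded DataFrame objects. It looks for the `_SUCCESS` marker file that Spark's default Hadoop output committer writes into an output directory when a job completes. This sketch works on a local filesystem only: S3 output would need a different listing call, and some committers may not write the marker. `missing_outputs` and `empty_tables` are hypothetical names.

```python
import os

OUTPUT_TABLES = ["demographics", "countries", "airports", "immigration"]


def missing_outputs(base_dir="output", tables=OUTPUT_TABLES):
    """Return the tables whose parquet directory has no _SUCCESS marker."""
    missing = []
    for name in tables:
        marker = os.path.join(base_dir, f"{name}.parquet", "_SUCCESS")
        if not os.path.isfile(marker):
            missing.append(name)
    return missing


def empty_tables(row_counts):
    """Return the names of tables whose row count is zero."""
    return [name for name, count in row_counts.items() if count == 0]


# In the notebook, the counts would come from the loaded DataFrames, e.g.
#   empty_tables({"dim_airports": dim_airports.count(), ...})
print(empty_tables({"dim_airports": 12, "dim_countries": 0}))  # ['dim_countries']
```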


#### 4.3 Data dictionary 
In this model, we create one fact table **fact_immigration**, and three dimension tables: **dim_demographics, dim_countries, dim_airports**.

- **Fact table -- fact_immigration**

| Column Name   | Description                                            |
| :------------ | -----------------------------------------------------: |
| cicid         | Record ID                                              |
| i94yr         | 4 digit year                                           |
| i94mon        | Numeric month                                          |
| i94cit        | Country of citizenship (numeric code)                  |
| i94res        | Country of residence (numeric code)                    |
| i94port       | Three-character port of entry code                     |
| arrdate       | Arrival date in the USA                                |
| i94mode       | Mode of transportation (1 = Air, 2 = Sea, 3 = Land, 9 = Not reported) |
| i94addr       | State code                                             |
| depdate       | Departure date from the USA                            |
| i94bir        | Age of respondent in years                             |
| i94visa       | Visa codes (1 = Business, 2 = Pleasure, 3 = Student)   |
| entdepa       | Arrival Flag                                           |
| biryear       | 4 digit year of birth                                  |
| gender        | Non-immigrant sex                                      |
| airline       | Airline used to arrive in U.S.                         |
| fltno         | Flight number of Airline used to arrive in U.S.        |
| visatype      | Class of admission legally admitting the non-immigrant |
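Several fact columns hold codes rather than labels. For `i94visa`, the mapping in the table above can be applied with a small lookup. This is a sketch with a hypothetical `visa_category` helper. In Spark, the same dict could instead become a tiny lookup DataFrame, for example `spark.createDataFrame(list(VISA_CATEGORIES.items()), ['i94visa', 'visa_category'])`, joined on `i94visa`.

```python
VISA_CATEGORIES = {1: "Business", 2: "Pleasure", 3: "Student"}  # from the i94visa row


def visa_category(code):
    """Translate an i94visa code to its label; missing or unknown codes give None."""
    if code is None:
        return None
    return VISA_CATEGORIES.get(int(code))


print([visa_category(c) for c in (1, 2.0, 3, 9, None)])
# ['Business', 'Pleasure', 'Student', None, None]
```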

- **Dimension table -- dim_countries**  

| Column Name     | Description                                                         |
| :-------------- | ------------------------------------------------------------------: |
| Country         | Country name (upper case)                                           |
| avg_temperature | Mean of all city-level AverageTemperature readings for the country (°C) |
| latitude        | Latitude of one city in the country (first value seen)             |
| longitude       | Longitude of one city in the country (first value seen)            |
| Code            | Numeric country code from countries.csv                             |

- **Dimension table -- dim_demographics**  

| Column Name                 | Description                                            |
| :-------------------------- | -----------------------------------------------------: |
| City                        | City name                                              |
| State                       | State name                                             |
| Median_Age                  | Median age                                             |
| Female_Population           | Female population                                      |
| Male_Population             | Male population                                        |
| Total_Population            | Total population                                       |
| Number_Veterans             | Number of veterans                                     |
| Foreign-born                | Number of residents born outside the US                |
| AverageHouseholdSize        | Average household size                                 |
| State_Code                  | Two-letter state code                                  |
| AmericanIndian_AlaskaNative | Number of American Indian and Alaska Native residents  |
| Black_AfricanAmerican       | Number of Black or African-American residents          |
| Hispanic_Latino             | Number of Hispanic or Latino residents                 |
| Asian                       | Number of Asian residents                              |
| White                       | Number of White residents                              |

- **Dimension table -- dim_airports**  

| Column Name  | Description                                                             |
| :------------| ----------------------------------------------------------------------: |
| ident        | Airport identifier                                                      |
| type         | Type of airport                                                         |
| name         | Name of airport                                                         |
| elevation_ft | Elevation above sea level in feet                                       |
| continent    | Continent code                                                          |
| iso_country  | ISO country code                                                        |
| iso_region   | State code (ISO region code with the `US-` prefix removed)              |
| municipality | Municipality (city or town)                                             |
| gps_code     | GPS code                                                                |
| iata_code    | Code of the airport assigned by the International Air Transport Association |
| local_code   | Local code                                                              |
| coordinates  | Coordinates (longitude, latitude)                                       |
| short_name   | Airport name from the airport code table (`Airport` column)             |

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.

  - **We use Spark to build the ETL pipeline because it processes large datasets quickly, offers low-latency in-memory data processing, and handles many different data formats. The same ETL could also run on AWS EMR clusters**.  
  
* Propose how often the data should be updated and why.  
  - **The immigration data is organized by month and day (the fact table is partitioned by arrival date), so the dataset could be updated every day or every month**.   
   
* Write a description of how you would approach the problem differently under the following scenarios:
  * The data was increased by 100x.  
    - **We could move the pipeline to an Amazon EMR cluster. EMR clusters can be resized as computing needs change, so instances can be added as the dataset grows**.  
  * The data populates a dashboard that must be updated on a daily basis by 7am every day.  
    - **We could orchestrate this ETL pipeline with Airflow, scheduling the DAG to start early enough each day that the tables are refreshed by 7am**.    
  * The database needed to be accessed by 100+ people.
    - **We could load the output data into an Amazon Redshift cluster. Redshift distributes and parallelizes queries across multiple nodes, and also offers Dense Compute node types**.