# Immigration Data Project
### Data Engineering Capstone Project

#### Project Summary
In this project I work with the I94 immigration dataset, which records arrivals at U.S. ports of entry, to find immigration patterns to the US and to answer questions such as:

- In which month, and from which country, do the most people visit the US?
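The aggregation behind this question is shown below as a minimal standard-library sketch on a few toy records. It assumes that each record carries the raw I94 fields i94mon (the month) and i94cit (a numeric country-of-citizenship code, left undecoded here). The helper name busiest_month_and_country is hypothetical. The fact table built in Step 4 does not select i94cit, so that column would have to be added to the fact table before this query could run against it.

```python
from collections import Counter


def busiest_month_and_country(records):
    """Return ((month, country_code), visitor_count) for the most frequent pair.

    Assumes each record is a dict with the raw I94 keys 'i94mon' (month)
    and 'i94cit' (numeric country-of-citizenship code). Returns None if empty.
    """
    counts = Counter((int(r["i94mon"]), int(r["i94cit"])) for r in records)
    if not counts:
        return None
    return counts.most_common(1)[0]


# Toy records; the codes 209 and 582 are borrowed from the sample rows in Step 1
sample = [
    {"i94mon": 4.0, "i94cit": 209.0},
    {"i94mon": 4.0, "i94cit": 209.0},
    {"i94mon": 4.0, "i94cit": 582.0},
    {"i94mon": 5.0, "i94cit": 209.0},
]
print(busiest_month_and_country(sample))  # ((4, 209), 2)
```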

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

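```python
# Do all imports and installs here
import pandas as pd
import pyspark
# The star import provides count, when, isnan, col, lit, udf, dayofmonth, month, year,
# monotonically_increasing_id, ... (note: it shadows Python built-ins such as sum and max)
from pyspark.sql.functions import *
import numpy as np

import os
import datetime as dt
```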

### Step 1: Scope the Project and Gather Data

#### Scope 
The project is built mainly on the I94 immigration data, which is integrated with world temperature data and U.S. city demographic data.

#### Describe and Gather Data 
There are three datasets:
- I94 Immigration Data
    + This data comes from the US National Tourism and Trade Office. It contains information on entries to the United States.
- World Temperature Data
    + This dataset comes from Kaggle and includes the temperatures of various cities in the world.
- U.S. City Demographic Data
    + This dataset contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000.

##### I94 Immigration dataset sample

```python
# Read in the sample data here
df_immigration_sample = pd.read_csv("immigration_data_sample.csv")
```

```python
df_immigration_sample.head()
```

| | Unnamed: 0 | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2027561 | 4084316.0 | 2016.0 | 4.0 | 209.0 | 209.0 | HHW | 20566.0 | 1.0 | HI | ... | | M | 1955.0 | 7202016 | F | | JL | 56582670000.0 | 00782 | WT |
| 1 | 2171295 | 4422636.0 | 2016.0 | 4.0 | 582.0 | 582.0 | MCA | 20567.0 | 1.0 | TX | ... | | M | 1990.0 | 10222016 | M | | \*GA | 94362000000.0 | XBLNG | B2 |
| 2 | 589494 | 1195600.0 | 2016.0 | 4.0 | 148.0 | 112.0 | OGG | 20551.0 | 1.0 | FL | ... | | M | 1940.0 | 7052016 | M | | LH | 55780470000.0 | 00464 | WT |
| 3 | 2631158 | 5291768.0 | 2016.0 | 4.0 | 297.0 | 297.0 | LOS | 20572.0 | 1.0 | CA | ... | | M | 1991.0 | 10272016 | M | | QR | 94789700000.0 | 00739 | B2 |
| 4 | 3032257 | 985523.0 | 2016.0 | 4.0 | 111.0 | 111.0 | CHM | 20550.0 | 3.0 | NY | ... | | M | 1997.0 | 7042016 | F | | | 42322570000.0 | LAND | WT |


```python
df_immigration_sample.columns
```

```
Index(['Unnamed: 0', 'cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port',
       'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa',
       'count', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd',
       'entdepu', 'matflag', 'biryear', 'dtaddto', 'gender', 'insnum',
       'airline', 'admnum', 'fltno', 'visatype'],
      dtype='object')
```

##### World Temperature Data

```python
df_worldtemp_sample = pd.read_csv("../../data2/GlobalLandTemperaturesByCity.csv")
```

```python
df_worldtemp_sample.head()
```

| | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 1 | 1743-12-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 2 | 1744-01-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 3 | 1744-02-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 4 | 1744-03-01 | | | Århus | Denmark | 57.05N | 10.33E |


##### U.S. City Demographic Data

```python
df_demography_sample = pd.read_csv('us-cities-demographics.csv', delimiter=';')
df_demography_sample.head(5)
```

| | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127.0 | 87105.0 | 175232 | 5821.0 | 33878.0 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040.0 | 143873.0 | 281913 | 5829.0 | 86253.0 | 2.73 | NJ | White | 76402 |


```python
df_demography_sample[df_demography_sample.City=='Hoover']
```

| | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 2 | Hoover | Alabama | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 | AL | Asian | 4759 |
| 960 | Hoover | Alabama | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 | AL | Black or African-American | 18191 |
| 1410 | Hoover | Alabama | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 | AL | Hispanic or Latino | 3430 |
| 1541 | Hoover | Alabama | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 | AL | White | 61869 |


#### Full Immigration Dataset

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.jars.repositories", "https://repos.spark-packages.org/") \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport() \
    .getOrCreate()

df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
```


```python
# Write the SAS data to Parquet (commented out here), then read the Parquet copy
# df_spark.write.parquet("sas_data")
df_spark = spark.read.parquet("sas_data")
df_spark.head(1)
```

```
[Row(cicid=6.0, i94yr=2016.0, i94mon=4.0, i94cit=692.0, i94res=692.0, i94port='XXX', arrdate=20573.0, i94mode=None, i94addr=None, depdate=None, i94bir=37.0, i94visa=2.0, count=1.0, dtadfile=None, visapost=None, occup=None, entdepa='T', entdepd=None, entdepu='U', matflag=None, biryear=1979.0, dtaddto='10282016', gender=None, insnum=None, airline=None, admnum=1897628485.0, fltno=None, visatype='B2')]
```

### Step 2: Explore and Assess the Data
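#### Explore the Data 
The main data quality issues are:
- Several immigration columns are almost entirely empty (90% or more missing values).
- Many temperature rows have no AverageTemperature value.
- The demographics data has one row per city and race, so the city-level values repeat across rows.

#### Cleaning Steps
- Drop the immigration columns with 90% or more missing values.
- Drop the temperature rows with a missing AverageTemperature, then remove duplicate temperature rows.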

```python
files = os.listdir('../../data/18-83510-I94-Data-2016/')
files
```

```
['i94_apr16_sub.sas7bdat',
 'i94_sep16_sub.sas7bdat',
 'i94_nov16_sub.sas7bdat',
 'i94_mar16_sub.sas7bdat',
 'i94_jun16_sub.sas7bdat',
 'i94_aug16_sub.sas7bdat',
 'i94_may16_sub.sas7bdat',
 'i94_jan16_sub.sas7bdat',
 'i94_oct16_sub.sas7bdat',
 'i94_jul16_sub.sas7bdat',
 'i94_feb16_sub.sas7bdat',
 'i94_dec16_sub.sas7bdat']
```
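The directory listing above is not in calendar order. The raw data arrives as one SAS file per month, so a monthly load needs to process the files chronologically. The small sketch below does this ordering. It assumes that every file follows the i94_<mon><yy>_sub.sas7bdat pattern shown and that yy is a 20xx year. The helpers file_period and sorted_by_period are hypothetical.

```python
# Month abbreviations as they appear in the I94 file names
MONTHS = {"jan": 1, "feb": 2, "mar": 3, "apr": 4, "may": 5, "jun": 6,
          "jul": 7, "aug": 8, "sep": 9, "oct": 10, "nov": 11, "dec": 12}


def file_period(filename):
    """Return (year, month) for a name like 'i94_apr16_sub.sas7bdat'.

    Assumes the 'i94_<mon><yy>_sub' naming and that yy means 20yy.
    """
    token = filename.split("_")[1]  # e.g. 'apr16'
    return 2000 + int(token[3:]), MONTHS[token[:3]]


def sorted_by_period(filenames):
    """Order monthly I94 files chronologically."""
    return sorted(filenames, key=file_period)


print(sorted_by_period(["i94_apr16_sub.sas7bdat", "i94_jan16_sub.sas7bdat",
                        "i94_dec16_sub.sas7bdat"]))
```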

To reduce complexity, drop the immigration columns in which 90% or more of the values are missing.


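```python
def get_cols_to_drop(df_in, threshold=0.9):
    """Return the names of columns whose fraction of missing (null or NaN) values is >= threshold."""
    # One-row DataFrame holding the fraction of missing values in each column
    fraction_missing = df_in.select([
        (count(when(isnan(c) | col(c).isNull(), c)) / count(lit(1))).alias(c)
        for c in df_in.columns
    ])
    # Collect that single row to pandas and flag the columns at or above the threshold
    to_drop = (fraction_missing.toPandas() >= threshold).any(axis=0)
    return to_drop[to_drop].index.tolist()
```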
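To make the threshold explicit, here is the same rule in plain Python on a toy table. A column is dropped when its share of missing values (None or NaN) is at least 0.9. This matches the >= comparison in get_cols_to_drop. The rows and the helper name cols_to_drop are illustrative only.

```python
import math


def cols_to_drop(rows, threshold=0.9):
    """Return column names whose share of missing values (None or NaN) is >= threshold.

    rows: list of dicts sharing the same keys (a toy stand-in for a DataFrame).
    """
    if not rows:
        return []

    def is_missing(value):
        return value is None or (isinstance(value, float) and math.isnan(value))

    result = []
    for column in rows[0].keys():
        missing = sum(1 for row in rows if is_missing(row[column]))
        if missing / len(rows) >= threshold:
            result.append(column)
    return result


# 10 toy rows: 'occup' is missing in 9 of them (90%), 'gender' in 1 (10%)
rows = [{"cicid": float(i),
         "occup": None if i < 9 else "STU",
         "gender": None if i == 0 else "F"} for i in range(10)]
print(cols_to_drop(rows))  # ['occup']
```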
```python
# Get the columns to drop from the immigration data
to_drop_col = get_cols_to_drop(df_spark)
to_drop_col
```

```
['occup', 'entdepu', 'insnum']
```

```python
# Drop the above columns from the immigration DataFrame
df_spark = df_spark.drop(*to_drop_col)
```

```python
df_spark.count()
df_spark.show()
```

```
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+--------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|entdepa|entdepd|matflag|biryear| dtaddto|gender|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+--------+------+-------+--------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null|      T|   null|   null| 1979.0|10282016|  null|   null| 1.897628485E9| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SEO|      G|   null|   null| 1991.0|     D/S|     M|   null|  3.73679633E9|00296|     
```

##### Clean Temperature Data

```python
# Drop rows with a missing average temperature
print('before count:', df_worldtemp_sample.shape[0])
df_worldtemp_sample['AverageTemperature'] = df_worldtemp_sample['AverageTemperature'].replace('', np.nan)
df_worldtemp = df_worldtemp_sample.dropna(subset=['AverageTemperature'])
print('after count:', df_worldtemp.shape[0])
```

```
before count: 8599212
after count: 8235082
```


```python
df_worldtemp = df_worldtemp.drop_duplicates()
df_worldtemp.shape[0]
```

```
8235082
```

```python
df_worldtemp.head(10)
```

| | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 5 | 1744-04-01 | 5.788 | 3.624 | Århus | Denmark | 57.05N | 10.33E |
| 6 | 1744-05-01 | 10.644 | 1.283 | Århus | Denmark | 57.05N | 10.33E |
| 7 | 1744-06-01 | 14.051 | 1.347 | Århus | Denmark | 57.05N | 10.33E |
| 8 | 1744-07-01 | 16.082 | 1.396 | Århus | Denmark | 57.05N | 10.33E |
| 10 | 1744-09-01 | 12.781 | 1.454 | Århus | Denmark | 57.05N | 10.33E |
| 11 | 1744-10-01 | 7.95 | 1.63 | Århus | Denmark | 57.05N | 10.33E |
| 12 | 1744-11-01 | 4.639 | 1.302 | Århus | Denmark | 57.05N | 10.33E |
| 13 | 1744-12-01 | 0.122 | 1.756 | Århus | Denmark | 57.05N | 10.33E |
| 14 | 1745-01-01 | -1.333 | 1.642 | Århus | Denmark | 57.05N | 10.33E |




### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

Because the purpose of this project is analytics, I chose a star schema to model the data. This model also works well with visualisation tools.

The immigration fact table is the central table of this star schema. Its data comes from the I94 immigration dataset.

immigration_fact
 -  record_id: double 
 -  year: double 
 -  month: double 
 -  city_code: string 
 -  state_code: string 
 -  arrive_date: double 
 -  departure_date: double 
 -  mode: double 
 -  visatype: string 


arrival_dm contains attributes derived from the arrival date.

arrival_dm 
 -  arrdate: string 
 -  arrival_day: integer 
 -  arrival_month: integer 
 -  arrival_year: integer 
 -  id: long (nullable = false)


df_USTemperature_dim contains the average temperature of each US city for each calendar month, computed from the records after 2000-01-01.

df_USTemperature_dim
 -  Country: string 
 -  City: string 
 -  month: long 
 -  AverageTemperature: double 


demographics_dim contains demographic data for US cities.

demographics_dim
 -  City: string 
 -  State: string 
 -  Median Age: double 
 -  Male Population: double 
 -  Female Population: double 
 -  Total Population: long 
 -  State Code: string 



#### 3.2 Mapping Out Data Pipelines
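- Read the data: the I94 immigration data (SAS, converted to Parquet) with Spark, and the world temperature and U.S. city demographic data (CSV) with pandas
- Clean the data: drop immigration columns with 90% or more missing values, drop temperature rows with a missing AverageTemperature, and remove duplicate temperature rows
- Create the dimension tables (arrival_dm, df_USTemperature_dim, demographics_dim and the visa type dimension); arrival_dm and the visa type dimension are also written to Parquet
- Create the immigration fact table
- Run the data quality checks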


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

#### Create immigration_arrival_dimension

```python
def create_immigration_arrival_dimension(df, outputfile):
    """Create the arrival date dimension from the arrdate column and write it to Parquet.

    arrdate is a SAS date: the number of days since 1960-01-01.
    """
    # Convert a SAS date number to an ISO date string (yyyy-mm-dd); nulls stay null
    get_datetime = udf(lambda x: (dt.datetime(1960, 1, 1).date() + dt.timedelta(days=x)).isoformat() if x is not None else None)

    # One row per distinct arrival date, plus its day, month and year parts
    arrival_dm = df.select(['arrdate']).withColumn("arrdate", get_datetime(df.arrdate)).distinct()
    arrival_dm = arrival_dm.withColumn('arrival_day', dayofmonth('arrdate'))
    arrival_dm = arrival_dm.withColumn('arrival_month', month('arrdate'))
    arrival_dm = arrival_dm.withColumn('arrival_year', year('arrdate'))

    # Add a unique (but not consecutive) id column
    arrival_dm = arrival_dm.withColumn('id', monotonically_increasing_id())

    # Write to Parquet, partitioned by year and month
    partition_columns = ['arrival_year', 'arrival_month']
    arrival_dm.write.parquet(outputfile + "immigration_arrival", partitionBy=partition_columns, mode="overwrite")

    return arrival_dm
```
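For reference, the sketch below shows the date arithmetic inside the UDF. A SAS date is the number of days since 1960-01-01, so the arrival date is that epoch plus arrdate days. For example, 20545 is 2016-04-01, the first day covered by the April 2016 file. This is a standard-library sketch: sas_to_parts is a hypothetical helper that also derives the day, month and year columns. It checks `is None` rather than truthiness, so a value of 0 (1960-01-01) is kept instead of being turned into null.

```python
import datetime as dt

SAS_EPOCH = dt.date(1960, 1, 1)  # SAS dates count days from this date


def sas_to_parts(sas_days):
    """Convert a SAS date number to (iso_string, day, month, year); None stays None."""
    if sas_days is None:
        return None
    d = SAS_EPOCH + dt.timedelta(days=int(sas_days))
    return d.isoformat(), d.day, d.month, d.year


print(sas_to_parts(20545.0))  # ('2016-04-01', 1, 4, 2016)
```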

```python
outputfile = "outtables/"
arrival_dm = create_immigration_arrival_dimension(df_spark, outputfile)
```

```python
arrival_dm.show(5)
```

```
+----------+-----------+-------------+------------+-----------+
|   arrdate|arrival_day|arrival_month|arrival_year|         id|
+----------+-----------+-------------+------------+-----------+
|2016-04-22|         22|            4|        2016| 8589934592|
|2016-04-15|         15|            4|        2016|25769803776|
|2016-04-18|         18|            4|        2016|42949672960|
|2016-04-09|          9|            4|        2016|68719476736|
|2016-04-11|         11|            4|        2016|85899345920|
+----------+-----------+-------------+------------+-----------+
only showing top 5 rows
```
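The id values above are not consecutive. The Spark documentation for monotonically_increasing_id says that the current implementation puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The ids are therefore unique but not dense. The helper below (split_monotonic_id, hypothetical and meant only for inspection) splits an id into those two parts. Each id shown above turns out to be the first record of a different partition.

```python
RECORD_BITS = 33  # lower bits hold the record number within a partition


def split_monotonic_id(row_id):
    """Split a monotonically_increasing_id value into (partition_id, record_number)."""
    partition_id = row_id >> RECORD_BITS
    record_number = row_id & ((1 << RECORD_BITS) - 1)
    return partition_id, record_number


for row_id in (8589934592, 25769803776, 42949672960):
    print(row_id, split_monotonic_id(row_id))
```

If consecutive keys were needed, a window function such as row_number() ordered by the date could be used instead, at the cost of a global sort.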



#### Create USTemperature_dim table

```python
def create_USTemperature_dim(df_t):
    """Create a US temperature dimension: mean AverageTemperature per country, city and month."""
    # Keep US records after 2000-01-01 (dt is a yyyy-mm-dd string, so string comparison works);
    # .copy() avoids pandas' SettingWithCopyWarning on the insert below
    df_us = df_t.loc[(df_t.Country == 'United States') & (df_t.dt > '2000-01-01')].copy()
    # Extract the calendar month from the dt column
    df_us.insert(0, 'month', pd.to_datetime(df_us['dt']).dt.month)
    df_us = df_us.loc[:, ['month', 'AverageTemperature', 'City', 'Country']]
    # Average each calendar month over all years, per city
    df_us = df_us.groupby(['Country', 'City', 'month']).AverageTemperature.agg('mean').reset_index()
    # Convert the pandas DataFrame to a Spark DataFrame
    return spark.createDataFrame(df_us)
```


```python
df_USTemperature_dim = create_USTemperature_dim(df_worldtemp)
```

```python
df_USTemperature_dim.show(5)
```

```
+-------------+-------+-----+------------------+
|      Country|   City|month|AverageTemperature|
+-------------+-------+-----+------------------+
|United States|Abilene|    1| 6.336615384615385|
|United States|Abilene|    2| 8.001142857142858|
|United States|Abilene|    3|13.184714285714287|
|United States|Abilene|    4|17.966785714285713|
|United States|Abilene|    5|22.858714285714285|
+-------------+-------+-----+------------------+
only showing top 5 rows
```
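The aggregation in create_USTemperature_dim averages each calendar month across all years after 2000-01-01. The plain-Python sketch below applies the same grouping to toy tuples and may help when checking the pandas result by hand. The helper name monthly_means and the sample values are hypothetical. Records dated exactly 2000-01-01 are excluded, which mirrors the strict > comparison in the function above.

```python
from collections import defaultdict


def monthly_means(records, since="2000-01-01"):
    """Average temperature per (city, month) over all years, for records after `since`.

    records: iterable of (dt, city, temperature) tuples with dt as 'yyyy-mm-dd';
    rows with a missing temperature (None) are skipped.
    """
    sums = defaultdict(float)
    counts = defaultdict(int)
    for date_str, city, temp in records:
        if temp is None or date_str <= since:
            continue
        key = (city, int(date_str[5:7]))  # month taken from the yyyy-mm-dd string
        sums[key] += temp
        counts[key] += 1
    return {key: sums[key] / counts[key] for key in sums}


records = [("2001-01-01", "Abilene", 6.0), ("2002-01-01", "Abilene", 8.0),
           ("2001-02-01", "Abilene", 9.0), ("1999-01-01", "Abilene", 100.0),
           ("2003-01-01", "Abilene", None)]
print(monthly_means(records))  # {('Abilene', 1): 7.0, ('Abilene', 2): 9.0}
```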



#### Create demographics_dim

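```python
def create_demographics_dim(df_d):
    """Create a demographics dimension table from the US cities demographics data."""
    # Keep the city-level columns (the columns at positions 0-5 and 9)
    columns = ['City', 'State', 'Median Age', 'Male Population',
               'Female Population', 'Total Population', 'State Code']
    return spark.createDataFrame(df_d[columns])
```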

```python
demographics_dim = create_demographics_dim(df_demography_sample)
demographics_dim.show(5)
```

```
+----------------+-------------+----------+---------------+-----------------+----------------+----------+
|            City|        State|Median Age|Male Population|Female Population|Total Population|State Code|
+----------------+-------------+----------+---------------+-----------------+----------------+----------+
|   Silver Spring|     Maryland|      33.8|        40601.0|          41862.0|           82463|        MD|
|          Quincy|Massachusetts|      41.0|        44129.0|          49500.0|           93629|        MA|
|          Hoover|      Alabama|      38.5|        38040.0|          46799.0|           84839|        AL|
|Rancho Cucamonga|   California|      34.5|        88127.0|          87105.0|          175232|        CA|
|          Newark|   New Jersey|      34.6|       138040.0|         143873.0|          281913|        NJ|
+----------------+-------------+----------+---------------+-----------------+----------------+----------+
only showing top 5 rows
```
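The raw demographics file has one row per city and race (see the four Hoover rows in Step 1). Once Race and Count are dropped, the columns kept in demographics_dim repeat once per race. The sketch below removes those repeats, using plain tuples as a stand-in for rows. In pandas, calling drop_duplicates() on the selected columns before createDataFrame should have the same effect. The helper name dedupe_rows is hypothetical.

```python
def dedupe_rows(rows):
    """Return rows with exact duplicates removed, keeping first-seen order."""
    seen = set()
    unique = []
    for row in rows:
        if row not in seen:
            seen.add(row)
            unique.append(row)
    return unique


# City-level columns of the four Hoover rows are identical once Race/Count are dropped
hoover = ("Hoover", "Alabama", 38.5, 38040.0, 46799.0, 84839, "AL")
rows = [hoover] * 4 + [("Quincy", "Massachusetts", 41.0, 44129.0, 49500.0, 93629, "MA")]
print(len(dedupe_rows(rows)))  # 2
```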



#### Create visatype_dim

```python
def create_visa_type_dim(df, outputfile):
    """Create a visa type dimension from the immigration data and write it to Parquet."""
    # One row per distinct visa type
    visa_type_df = df.select(['visatype']).distinct()

    # Add a unique (but not consecutive) surrogate key
    visa_type_df = visa_type_df.withColumn('visa_type_key', monotonically_increasing_id())

    # Write the dimension to Parquet
    visa_type_df.write.parquet(outputfile + "visatype", mode="overwrite")

    return visa_type_df


def get_visa_type_dim(outputfile):
    """Read the visa type dimension back from Parquet."""
    return spark.read.parquet(outputfile + "visatype")
```

```python
visatype_dim = create_visa_type_dim(df_spark, outputfile)
visatype_dim.show(n=5)
```

```
+--------+-------------+
|visatype|visa_type_key|
+--------+-------------+
|      F2| 103079215104|
|     GMB| 352187318272|
|      B2| 369367187456|
|      F1| 498216206336|
|     CPL| 601295421440|
+--------+-------------+
only showing top 5 rows
```



#### Create the immigration fact table

```python
def create_immigration_fact(df):
    """Create the immigration fact table from the cleaned immigration data."""
    # Source column -> fact table column, in the order of the data model
    column_map = {
        'cicid': 'record_id',
        'i94yr': 'year',
        'i94mon': 'month',
        'i94port': 'city_code',
        'i94addr': 'state_code',
        'arrdate': 'arrive_date',
        'depdate': 'departure_date',
        'i94mode': 'mode',
        'visatype': 'visatype',
    }
    return df.select([col(src).alias(dst) for src, dst in column_map.items()])
```

```python
immigration_fact = create_immigration_fact(df_spark)
```

```python
immigration_fact.show(5)
```

```
+---------+------+-----+---------+----------+-----------+--------------+----+--------+
|record_id|  year|month|city_code|state_code|arrive_date|departure_date|mode|visatype|
+---------+------+-----+---------+----------+-----------+--------------+----+--------+
|      6.0|2016.0|  4.0|      XXX|      null|    20573.0|          null|null|      B2|
|      7.0|2016.0|  4.0|      ATL|        AL|    20551.0|          null| 1.0|      F1|
|     15.0|2016.0|  4.0|      WAS|        MI|    20545.0|       20691.0| 1.0|      B2|
|     16.0|2016.0|  4.0|      NYC|        MA|    20545.0|       20567.0| 1.0|      B2|
|     17.0|2016.0|  4.0|      NYC|        MA|    20545.0|       20567.0| 1.0|      B2|
+---------+------+-----+---------+----------+-----------+--------------+----+--------+
only showing top 5 rows
```
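The sketch below illustrates how the star schema is meant to be queried. It uses Python's built-in sqlite3 as a stand-in for the analytics database. The rows reuse record_id, month, city_code and visatype values from the fact table output above, and the keys come from the visa type dimension output. The join is on visatype because the fact table stores the visa type code itself rather than visa_type_key.

```python
import sqlite3

# In-memory stand-in for the warehouse, loaded with rows from the outputs above
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE immigration_fact (record_id REAL, month REAL, city_code TEXT, visatype TEXT)")
conn.execute("CREATE TABLE visatype_dim (visatype TEXT, visa_type_key INTEGER)")
conn.executemany(
    "INSERT INTO immigration_fact VALUES (?, ?, ?, ?)",
    [(6.0, 4.0, "XXX", "B2"), (7.0, 4.0, "ATL", "F1"), (15.0, 4.0, "WAS", "B2"),
     (16.0, 4.0, "NYC", "B2"), (17.0, 4.0, "NYC", "B2")],
)
conn.executemany(
    "INSERT INTO visatype_dim VALUES (?, ?)",
    [("F2", 103079215104), ("B2", 369367187456), ("F1", 498216206336)],
)

# Star-schema query: join the fact table to a dimension, then aggregate
query = """
    SELECT v.visa_type_key, f.visatype, COUNT(*) AS arrivals
    FROM immigration_fact AS f
    JOIN visatype_dim AS v ON f.visatype = v.visatype
    GROUP BY v.visa_type_key, f.visatype
    ORDER BY arrivals DESC
"""
result = conn.execute(query).fetchall()
print(result)  # [(369367187456, 'B2', 4), (498216206336, 'F1', 1)]
```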



#### 4.2 Data Quality Checks
The data quality checks are:

- Check that the schema of each table matches the data model.
- Check that each table was loaded properly: it must not be empty, and its key column must not contain null, NaN or empty values.

```python
def data_quality_count_check(df, dfname):
    """Fail if the table has no rows."""
    row_count = df.count()
    print(row_count)
    if row_count == 0:
        print(f"Count check failed on ....: {dfname}")
    else:
        print(f"Count check successful on ....: {dfname}")


def data_quality_pk_check(df, pk, dfname):
    """Fail if the key column contains null, NaN or empty-string values."""
    null_count = df.filter((df[pk] == "") | df[pk].isNull() | isnan(df[pk])).count()
    if null_count == 0:
        print(f"PK check success on ....: {dfname}")
    else:
        print(f"PK check failed on ....: {dfname} ({null_count} bad keys)")


# (table, key column, table name) for each table in the model
checks = [
    (arrival_dm, 'arrdate', 'arrival_dm'),
    (df_USTemperature_dim, 'City', 'df_USTemperature_dim'),
    (demographics_dim, 'City', 'demographics_dim'),
    (immigration_fact, 'record_id', 'immigration_fact'),
]
for frame, pk, dfname in checks:
    data_quality_count_check(frame, dfname)
    data_quality_pk_check(frame, pk, dfname)
```

```
30
Count check successfull on ....: arrival_dm
PK check success on ....: arrival_dm
2976
Count check successfull on ....: df_USTemperature_dim
PK check success on ....: df_USTemperature_dim
2891
Count check successfull on ....: demographics_dim
PK check success on ....: demographics_dim
3096313
Count check successfull on ....: immigration_fact
PK check success on ....: immigration_fact
```
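The checks above cover empty tables and null keys. They do not cover the first item in the list, which compares each table's schema with the data model. The minimal sketch below adds that check. It assumes the actual schema is given as (column, type) pairs, which is the shape of a Spark DataFrame's dtypes attribute. check_schema and expected_arrival are hypothetical names. The types are taken from the arrival_dm model in Step 3; Spark reports integer as int and long as bigint.

```python
def check_schema(actual_dtypes, expected):
    """Compare (column, type) pairs against an expected {column: type} mapping.

    Returns a list of problem descriptions; an empty list means the schema matches.
    """
    actual = dict(actual_dtypes)
    problems = []
    for name, expected_type in expected.items():
        if name not in actual:
            problems.append(f"missing column: {name}")
        elif actual[name] != expected_type:
            problems.append(f"{name}: expected {expected_type}, got {actual[name]}")
    for name in actual:
        if name not in expected:
            problems.append(f"unexpected column: {name}")
    return problems


# Expected arrival_dm schema, following the data model in Step 3
expected_arrival = {"arrdate": "string", "arrival_day": "int", "arrival_month": "int",
                    "arrival_year": "int", "id": "bigint"}
good = [("arrdate", "string"), ("arrival_day", "int"), ("arrival_month", "int"),
        ("arrival_year", "int"), ("id", "bigint")]
print(check_schema(good, expected_arrival))  # []
```

With Spark, this could be called as check_schema(arrival_dm.dtypes, expected_arrival).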


#### 4.3 Data dictionary 
For each field in the data model, the data dictionary gives a brief description of the data and where it came from.

### Step 5: Complete Project Write Up
##### Tools and Technologies
- Pandas for analysing the sample datasets.
    + Small datasets are easy to load and explore with pandas.
- PySpark for processing the large dataset.
    + Spark scales out, so it can handle growth in the data.
    + It can read many file formats and has easy-to-use functions for operating on large datasets.
    
##### Data Update Frequency
The raw I94 data is available as one file per month, so the tables should be updated monthly.

* How I would approach the problem differently under the following scenarios:
    * The data was increased by 100x: store the raw files and the output tables in Amazon S3 and run the PySpark jobs on a cluster that can scale with the data.
    * The data populates a dashboard that must be updated on a daily basis by 7am every day: use Apache Airflow to schedule the pipeline as a daily DAG that finishes before 7am.
    * The database needed to be accessed by 100+ people: load the tables into an Amazon Redshift cluster, which can serve many concurrent analytical queries.

##### Data dictionary uploaded as separate file
