# I94 US Immigration Data Analysis
### Udacity Data Engineering Nanodegree Capstone Project

#### Project Summary
This project creates a data lake for US immigration data from 2016, to be used for data analytics and BI. The raw data is currently stored in S3. A star-schema data model is designed, and an ETL pipeline is built to extract the data from S3, clean it with Spark, and load it back to S3 in Parquet format. The processed data could then be used to build a SQL database.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [30]:
# Do all imports and installs here
import pandas as pd
import configparser
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, explode, isnan, sequence, split, to_date, udf, when
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import datetime, timedelta

In [31]:
# Read AWS credentials and S3 paths from the config file
config = configparser.ConfigParser()
config.read('config.cfg')

['config.cfg']

In [32]:
KEY = config['AWS']['AWS_ACCESS_KEY_ID']
SECRET = config['AWS']['AWS_SECRET_ACCESS_KEY']

IMMI_DATA = config['S3']['IMMI_DATA']
DEMO_DATA = config['S3']['DEMO_DATA']
AIRPORT_DATA = config['S3']['AIRPORT_DATA']
COUNTRY_DATA = config['S3']['COUNTRY_DATA']
REGION_DATA = config['S3']['REGION_DATA']
OUTPUT_PATH = config['S3-OUTPUT']['OUTPUT_PATH']
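The cells above expect a `config.cfg` with `AWS`, `S3` and `S3-OUTPUT` sections. A minimal sketch of that layout follows; the bucket names and paths are hypothetical placeholders, not the project's real locations, and `missing_keys` is an illustrative helper that is not part of the original notebook. Since the later cells build paths as `OUTPUT_PATH + 'airport.parquet'`, the output path needs a trailing slash.

```python
import configparser

# Placeholder values only (assumed layout); real keys belong in an
# untracked config.cfg and never in the notebook itself.
SAMPLE_CONFIG = """
[AWS]
AWS_ACCESS_KEY_ID = <your-access-key-id>
AWS_SECRET_ACCESS_KEY = <your-secret-access-key>

[S3]
IMMI_DATA = s3a://example-bucket/immigration
DEMO_DATA = s3a://example-bucket/us-cities-demographics.csv
AIRPORT_DATA = s3a://example-bucket/airport-codes.csv
COUNTRY_DATA = s3a://example-bucket/country-codes.csv
REGION_DATA = s3a://example-bucket/region-codes.csv

[S3-OUTPUT]
OUTPUT_PATH = s3a://example-bucket/output/
"""

# Keys the notebook reads, grouped by section.
REQUIRED = {
    "AWS": ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"],
    "S3": ["IMMI_DATA", "DEMO_DATA", "AIRPORT_DATA", "COUNTRY_DATA", "REGION_DATA"],
    "S3-OUTPUT": ["OUTPUT_PATH"],
}


def missing_keys(config):
    """Return 'SECTION.KEY' names that the notebook needs but the config lacks."""
    missing = []
    for section, keys in REQUIRED.items():
        for key in keys:
            if not config.has_option(section, key):
                missing.append(f"{section}.{key}")
    return missing


sample = configparser.ConfigParser()
sample.read_string(SAMPLE_CONFIG)
print(missing_keys(sample))
```

Checking for missing keys up front gives a clearer error than a `KeyError` halfway through the Spark session setup.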

In [33]:
# KEY and SECRET are already loaded from config.cfg above; access keys should not be hardcoded in the notebook

In [6]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.2,saurfang:spark-sas7bdat:3.0.0-s_2.12") \
        .config("spark.jars.repositories", "https://repos.spark-packages.org/") \
        .config('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider') \
        .config('spark.hadoop.fs.s3a.access.key', KEY) \
        .config('spark.hadoop.fs.s3a.secret.key', SECRET) \
        .getOrCreate()

### Step 1: Project Scope and Data Gathering

#### Scope 

This project creates a data lake: it cleans up immigration, airport and demographics data collected from various sources and loads the cleaned data back to S3, organized as a star schema. Data analysts could use it to explore the relationships among US cities, points of entry and immigrants from different countries.

Some example questions this data model can help answer:

* What is the relationship between immigration and demographics in different states? Does the point of entry determine the racial composition of a state?

* What is the busiest week or month for immigration officers, and at which airport?

* Which country contributes the most visitors?

The main tools used in this project are Python, Pandas, PySpark, SQL, S3, Airflow and Redshift.

#### Data Intro


* [I94 Immigration Data](https://www.trade.gov/national-travel-and-tourism-office): This data comes from the US National Travel and Tourism Office.  

    * Description:  
    A detailed description can be found in [I94_SAS_Labels_Descriptions.SAS](./I94_SAS_Labels_Descriptions.SAS), which describes the I94 metadata. The data consists of 12 files, one for each month of 2016.
    Each file is about 500 MB and has roughly 3 million rows.
    
    * Sample:
|         |       cicid |   i94yr |   i94mon |   i94cit |   i94res | i94port   |   arrdate |   i94mode | i94addr   |   depdate |   i94bir |   i94visa |   count |   dtadfile | visapost   |   occup | entdepa   | entdepd   |   entdepu | matflag   |   biryear |   dtaddto | gender   |   insnum | airline   |      admnum |   fltno | visatype   |
|--------:|------------:|--------:|---------:|---------:|---------:|:----------|----------:|----------:|:----------|----------:|---------:|----------:|--------:|-----------:|:-----------|--------:|:----------|:----------|----------:|:----------|----------:|----------:|:---------|---------:|:----------|------------:|--------:|:-----------|
| 1397736 | 2.86358e+06 |    2016 |        4 |      689 |      689 | MIA       |     20559 |         1 | FL        |     20569 |       10 |         2 |       1 |   20160415 | SPL        |     nan | G         | O         |       nan | M         |      2006 |  10142016 | F        |      nan | JJ        | 9.36397e+10 |   08094 | B2         |
|  997880 | 2.03818e+06 |    2016 |        4 |      245 |      245 | SFR       |     20555 |         1 | CA        |     20570 |       50 |         2 |       1 |   20160411 | SHG        |     nan | G         | O         |       nan | M         |      1966 |  10102016 | F        |      nan | MU        | 9.32671e+10 |   00589 | B2         |


* [U.S. City Demographic Data](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)
    * Description:
    This dataset contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000. This data comes from the US Census Bureau's 2015 American Community Survey.
    * Sample:
|      | City         | State      |   Median Age |   Male Population |   Female Population |   Total Population |   Number of Veterans |   Foreign-born |   Average Household Size | State Code   | Race               |   Count |
|-----:|:-------------|:-----------|-------------:|------------------:|--------------------:|-------------------:|---------------------:|---------------:|-------------------------:|:-------------|:-------------------|--------:|
|  629 | Arden-Arcade | California |         41.5 |             47596 |               48680 |              96276 |                 6511 |          13458 |                     2.18 | CA           | White              |   69369 |
| 1438 | Edmond       | Oklahoma   |         32.5 |             45191 |               44899 |              90090 |                 5006 |           5585 |  
    


* [Airport Code Table](https://datahub.io/core/airport-codes#data) 

    * Description:
    This is a simple table of airport codes and corresponding cities. 
    * Sample:
|       | ident   | type          | name                   |   elevation_ft |   continent | iso_country   | iso_region   | municipality   | gps_code   |   iata_code | local_code   | coordinates                           |
|------:|:--------|:--------------|:-----------------------|---------------:|------------:|:--------------|:-------------|:---------------|:-----------|------------:|:-------------|:--------------------------------------|
|  1613 | 16Z     | seaplane_base | Mc Grath Seaplane Base |            325 |         nan | US            | US-AK        | Mcgrath        | 16Z        |         nan | 16Z          | -155.593002319, 62.9580001831         |
| 10090 | 9NC7    | small_airport | Willow Creek Airport   |            572 |         nan | US            | US-NC        | Mt Pleasant    | 9NC7       |         nan | 9NC7         | -80.44000244140625, 35.36970138549805 |

* [ISO Country Code](https://datahub.io/core/country-list)

    * Description:
    ISO 3166-1-alpha-2 English country names and code elements. This list states the country names (official short names in English) in alphabetical order as given in ISO 3166-1 and the corresponding ISO 3166-1-alpha-2 code elements. [ISO 3166-1].
    * Sample:

|     | Name                                         | Code   |
|----:|:---------------------------------------------|:-------|
|   0 | Afghanistan                                  | AF     |
|   1 | Åland Islands                                | AX     |


### Step 2: Explore and Assess the Data
#### 2.1 Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

* Airport Data

In [7]:
## read airport data from s3
airport_data = spark.read.format('csv').load(AIRPORT_DATA,header=True)

                                                                                

In [86]:
## change spark df to pandas df
ap_df = airport_data.toPandas()

                                                                                

In [133]:
ap_df.head(2)

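|    | ident | type          | name                 | elevation_ft | continent | iso_country | iso_region | municipality | gps_code | iata_code | local_code | coordinates                        |
|---:|:------|:--------------|:---------------------|-------------:|:----------|:------------|:-----------|:-------------|:---------|:----------|:-----------|:-----------------------------------|
|  0 | 00A   | heliport      | Total Rf Heliport    |           11 |           | US          | US-PA      | Bensalem     | 00A      |           | 00A        | -74.93360137939453, 40.07080078125 |
|  1 | 00AA  | small_airport | Aero B Ranch Airport |         3435 |           | US          | US-KS      | Leoti        | 00AA     |           | 00AA       | -101.473911, 38.704022             |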


In [89]:
## null value count in each column
ap_df.isnull().sum()

ident               0
type                0
name                0
elevation_ft     7006
continent           0
iso_country         0
iso_region          0
municipality     5676
gps_code        14045
iata_code       45886
local_code      26389
coordinates         0
dtype: int64

In [91]:
## check if there is duplicated rows
ap_df.duplicated().sum()

0

In [94]:
len(ap_df)

55075

The airport data has 55,075 rows in total. The columns `gps_code`, `iata_code` and `local_code` have many missing values, while `elevation_ft` and `municipality` have fewer. There are no duplicate rows. In addition, the `coordinates` column contains both latitude and longitude, so it could be divided into two columns.
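As a small illustration of dividing `coordinates` into two columns, here is a plain-Python sketch. The helper name `split_coordinates` is hypothetical, and the reading of the pair as longitude first, latitude second is an assumption based on the sample values (a Pennsylvania heliport at `-74.93, 40.07`).

```python
from typing import Tuple


def split_coordinates(coordinates: str) -> Tuple[float, float]:
    """Split a 'first, second' coordinate string into two floats.

    Assumption: the first value is the longitude and the second the latitude,
    as the sample rows for US airports suggest.
    """
    first, second = (part.strip() for part in coordinates.split(","))
    return float(first), float(second)


longitude, latitude = split_coordinates("-74.93360137939453, 40.07080078125")
print(longitude, latitude)
```

Stripping whitespace before converting matters here because the second value in the raw data is preceded by a space.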

* Demographics Data

In [117]:
## read demographics data from s3
demo_data = spark.read.format('csv').load(DEMO_DATA,header=True,sep=';')

## change spark df to pandas df
demo_df = demo_data.toPandas()

In [134]:
demo_df.head(2)

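|    | City          | State         | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race               | Count |
|---:|:--------------|:--------------|-----------:|----------------:|------------------:|-----------------:|-------------------:|-------------:|-----------------------:|:-----------|:-------------------|------:|
|  0 | Silver Spring | Maryland      |       33.8 |           40601 |             41862 |            82463 |               1562 |        30908 |                    2.6 | MD         | Hispanic or Latino | 25924 |
|  1 | Quincy        | Massachusetts |       41.0 |           44129 |             49500 |            93629 |               4147 |        32935 |                   2.39 | MA         | White              | 58723 |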


In [132]:
print(f'There are {len(demo_df)} rows in total in the demographics data frame.\n')

## null value count in each column
print('NULL Value counts:\n \n',demo_df.isnull().sum())

## check if there are duplicated rows
print('\nDuplicates count: ', demo_df.duplicated().sum())

There are 2891 rows in total in the demographics data frame.

NULL Value counts:
 
 City                       0
State                      0
Median Age                 0
Male Population            3
Female Population          3
Total Population           0
Number of Veterans        13
Foreign-born              13
Average Household Size    16
State Code                 0
Race                       0
Count                      0
dtype: int64

Duplicates count:  0


The demographics data has 2,891 rows in total. The columns `Male Population`, `Female Population`, `Number of Veterans`, `Foreign-born` and `Average Household Size` each have a few missing values. There are no duplicate rows.

* Immigration Data

Since the immigration data is very large, this exploration reads only one month of data.

In [166]:
## read one month (June) of immigration data from s3
immi_data = spark.read.format("com.github.saurfang.sas.spark").load(IMMI_DATA+'/i94_jun16_sub.sas7bdat')

In [8]:
print(f'There are total {immi_data.count()} rows in the data of June.') 



There are total 3574989 rows in the data of June.


                                                                                

In [138]:
immi_data.show(5)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+-----------+-----------+----------+-----------+-------------+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|validres|delete_days|delete_mexl|delete_dup|delete_visa|delete_recdup|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|         admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+-----------+-----------+----------+-----------+-------------+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
|  4.0|2016.0|   6.0| 135.0| 135.0|    XXX|20612.0|   null|   null|   null|  59.0|    2.0|  1.0|     1.0|        0.0|    

In [140]:
## check null values
immi_data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in immi_data.columns]).show()



+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+-----------+-----------+----------+-----------+-------------+--------+--------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+------+-----+--------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|validres|delete_days|delete_mexl|delete_dup|delete_visa|delete_recdup|dtadfile|visapost|  occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender| insnum|airline|admnum|fltno|visatype|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+-----------+-----------+----------+-----------+-------------+--------+--------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+------+-----+--------+
|    0|    0|     0|   520|     0|      0|      0|  61187| 186064| 287071|   639|      0|    0|       0|          0|          0|         0|      

                                                                                

In [143]:
## check duplicates
if immi_data.count() > immi_data.dropDuplicates().count():
    raise ValueError('Data has duplicates')

                                                                                

The June immigration data has 3,574,989 rows in total, and quite a few columns have missing values. There are no duplicate rows, since the check above did not raise an error.

#### 2.2 Data Cleaning

Clean and transform the data so that it is ready for the downstream extract and save steps.

* Airport Data

In [17]:
## read airport data from s3
airport_data = spark.read.format('csv').load(AIRPORT_DATA,header=True)

In [8]:
airport_data.show(5)

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport|          11|       NA|         US|     US-PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
| 00AK|small_airport|        Lowell Field|         450|       NA|         US|     US-AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|
| 00AL|small_airport|        Epps Airpark|         820|       NA|         US|     

In [7]:
airport_data = airport_data.na.fill({'municipality': 'NA'})

* Country Data

In [13]:
## read country data from s3
country_data = spark.read.format('csv').load(COUNTRY_DATA,header=True)

In [23]:
country_data.show(5)

+--------------+----+
|          Name|Code|
+--------------+----+
|   Afghanistan|  AF|
| Åland Islands|  AX|
|       Albania|  AL|
|       Algeria|  DZ|
|American Samoa|  AS|
+--------------+----+
only showing top 5 rows



* Region Data

In [17]:
## read region data from s3
region_data = spark.read.format('csv').load(REGION_DATA,header=True)

In [85]:
region_data.show(5)

+------------+----------------+-----+----------+
|country_code|subdivision_name| code|state_code|
+------------+----------------+-----+----------+
|          US|         Alabama|US-AL|        AL|
|          US|          Alaska|US-AK|        AK|
|          US|         Arizona|US-AZ|        AZ|
|          US|        Arkansas|US-AR|        AR|
|          US|      California|US-CA|        CA|
+------------+----------------+-----+----------+
only showing top 5 rows



* Demographics Data

In [22]:
## read demographics data from s3
demo_data = spark.read.format('csv').load(DEMO_DATA,header=True,sep=';')

In [23]:
demo_data = demo_data.na.fill({
    'City': 'NA',
    'State': 'NA',
    'Median Age': 0.0,
    'Male Population': 0.0,
    'Female Population': 0.0,
    'Total Population': 0.0,
    'Number of Veterans': 0.0,
    'Foreign-born': 0.0,
    'Average Household Size': 0.0,
    'State Code': 'NA',
    'Race': 'NA',
    'Count': 0.0})

In [84]:
demo_data.show(5)

+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|            City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|   Silver Spring|     Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino|25924|
|          Quincy|Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White|58723|
|          Hoover|      Alabama|      38.5|          38040| 

* Immigration Data

In [125]:
## fill nulls in string columns with 'NA' (numeric columns are left unchanged)
immi_data = immi_data.na.fill('NA')

In [124]:
immi_data.show(5)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+-----------+-----------+----------+-----------+-------------+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|validres|delete_days|delete_mexl|delete_dup|delete_visa|delete_recdup|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|         admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+-----------+-----------+----------+-----------+-------------+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
|  4.0|2016.0|   6.0| 135.0| 135.0|    XXX|20612.0|   null|     NA|   null|  59.0|    2.0|  1.0|     1.0|        0.0|    

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

The main focus of this project is the immigration and demographics information, so a star schema is a natural fit for the data model. The immigration and demographics tables serve as the fact tables, with dimension tables such as time, city, country, and airport.

![](schema_diagram.png)

#### 3.2 Mapping Out Data Pipelines

* Read in the configuration settings (`config.cfg`).
* Use Spark data frames to read the raw data from S3, clean and transform it, select the corresponding columns for each fact and dimension table, and save the tables to S3 for future use.
* Finally, run data quality checks on each table to validate the output (key columns have no nulls, each table has at least one record, etc.).

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

#### Airport Table

In [8]:
airport_data.filter(col('iso_country')=='US').createOrReplaceTempView("staging_airport_table")
airport_table = spark.sql("""SELECT
                                ident        AS airport_id,
                                name         AS airport_name,
                                type         AS airport_type,
                                iso_region   AS region,
                                municipality AS municipality,
                                CAST(SPLIT(coordinates,',')[0] AS double)  AS coordinate_x,
                                CAST(SPLIT(coordinates,',')[1] AS double)  AS coordinate_y
                         FROM staging_airport_table ORDER BY region
                        """).dropDuplicates()

In [9]:
airport_table.show(5)



+----------+--------------------+-------------+------+------------+------------------+------------------+
|airport_id|        airport_name| airport_type|region|municipality|      coordinate_x|      coordinate_y|
+----------+--------------------+-------------+------+------------+------------------+------------------+
|      01AR|Community Hospita...|     heliport| US-AR|    De Queen|-94.35489654541016|34.047298431396484|
|      01WT|    Odyssey Heliport|     heliport| US-WA|      Renton|       -122.210908|         47.518178|
|      02NJ|     Penske Heliport|     heliport| US-NJ|  Piscataway|-74.46710205078125| 40.55730056762695|
|      07CT|        Tnt Heliport|       closed| US-CT|       Salem|-72.24960327148438| 41.45289993286133|
|      07FA|Ocean Reef Club A...|small_airport| US-FL|   Key Largo|  -80.274803161621|   25.325399398804|
+----------+--------------------+-------------+------+------------+------------------+------------------+
only showing top 5 rows



                                                                                

In [11]:
## write table to parquet format

airport_table.write.mode('overwrite').partitionBy('region').parquet(OUTPUT_PATH + 'airport.parquet')

                                                                                

#### Country Table

In [14]:
country_data.createOrReplaceTempView("staging_country_table")
country_table = spark.sql("""SELECT
                                Name        AS country_name,
                                Code         AS country_code
                         FROM staging_country_table ORDER BY country_name
                        """).dropDuplicates()

In [15]:
country_table.show(5)

+--------------------+------------+
|        country_name|country_code|
+--------------------+------------+
|          Antarctica|          AQ|
|              Sweden|          SE|
|           Indonesia|          ID|
|United Arab Emirates|          AE|
|          Azerbaijan|          AZ|
+--------------------+------------+
only showing top 5 rows



In [16]:
## write table to parquet format

country_table.write.mode('overwrite').parquet(OUTPUT_PATH + 'country.parquet')

                                                                                

#### Region Table

In [18]:
region_data = region_data.filter(col('country_code')=='US').withColumn('state_code',split(col('code'),'-')[1])

In [19]:
region_data.createOrReplaceTempView("staging_region_table")
region_table = spark.sql("""SELECT
                                subdivision_name        AS state_name,
                                state_code              AS state_code,
                                country_code            AS country_code,
                                code                    AS country_state
                         FROM staging_region_table ORDER BY state_name
                        """).dropDuplicates()

In [20]:
region_table.show(5)

+-------------+----------+------------+-------------+
|   state_name|state_code|country_code|country_state|
+-------------+----------+------------+-------------+
|      Montana|        MT|          US|        US-MT|
|West Virginia|        WV|          US|        US-WV|
|       Alaska|        AK|          US|        US-AK|
| Pennsylvania|        PA|          US|        US-PA|
|       Nevada|        NV|          US|        US-NV|
+-------------+----------+------------+-------------+
only showing top 5 rows



In [21]:
## write table to parquet format

region_table.write.mode('overwrite').parquet(OUTPUT_PATH + 'region.parquet')

                                                                                

#### Demographics Table

In [24]:
demo_data.createOrReplaceTempView("staging_demo_table")
demo_table = spark.sql("""SELECT
                                City            AS city_name,
                                `State Code`    AS state_code,
                                `Median Age`    AS median_age,
                                `Male Population` AS male_population,
                                `Female Population` AS female_population,
                                `Total Population`  AS total_population,
                                `Foreign-born`      AS foreign_born,
                                `Average Household Size` AS avg_household_size,
                                `Race`          AS race,
                                `Count`         AS race_population
                         FROM staging_demo_table
                        """).dropDuplicates()

In [25]:
demo_table.show(5)

+----------+----------+----------+---------------+-----------------+----------------+------------+------------------+--------------------+---------------+
| city_name|state_code|median_age|male_population|female_population|total_population|foreign_born|avg_household_size|                race|race_population|
+----------+----------+----------+---------------+-----------------+----------------+------------+------------------+--------------------+---------------+
|  O'Fallon|        MO|      36.0|          41762|            43270|           85032|        3269|              2.77|  Hispanic or Latino|           2583|
|   Buffalo|        NY|      33.1|         124537|           133529|          258066|       24630|              2.27|  Hispanic or Latino|          29656|
|Costa Mesa|        CA|      34.8|          59097|            54089|          113186|       26645|              2.59|               Asian|           9165|
|  Columbia|        MO|      26.8|          56544|            62554|  

In [26]:
## write table to parquet format

demo_table.write.mode('overwrite').partitionBy('state_code').parquet(OUTPUT_PATH + 'demographics.parquet')

                                                                                

#### Immigration Table

In [34]:
## read immigration data from s3
immi_data = spark.read.format("com.github.saurfang.sas.spark").load(IMMI_DATA)

In [35]:
immi_data = immi_data.na.fill({'depdate':0})

In [37]:
## SAS dates count days since 1960-01-01; convert them to calendar dates
convertTimeUDF = udf(lambda z: None if z is None else datetime(1960,1,1)+timedelta(days=z),DateType())

In [38]:
immi_data = immi_data.withColumn('arrdate',convertTimeUDF(col('arrdate')))

In [39]:
immi_data = immi_data.withColumn('depdate',convertTimeUDF(col('depdate')))
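The `arrdate` and `depdate` columns hold SAS dates, i.e. the number of days since 1960-01-01, which is what the UDF above relies on. The conversion can be checked outside Spark with a plain-Python sketch; the function name `sas_days_to_date` is hypothetical. Returning `None` for a missing input is one possible design choice, whereas the notebook fills missing `depdate` values with 0, which maps them to 1960-01-01.

```python
from datetime import date, timedelta
from typing import Optional

# SAS date values count days from this epoch.
SAS_EPOCH = date(1960, 1, 1)


def sas_days_to_date(days: Optional[float]) -> Optional[date]:
    """Convert a SAS day count (possibly a float such as 20559.0) to a date."""
    if days is None:
        return None  # keep missing values missing instead of mapping them to the epoch
    return SAS_EPOCH + timedelta(days=int(days))


# 20559 is the arrdate of the first sample row in the Data Intro (i94mon = 4).
print(sas_days_to_date(20559.0))  # 2016-04-15
```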

In [40]:
immi_data.createOrReplaceTempView("staging_immi_table")
immi_table = spark.sql("""SELECT
                                cicid            AS immi_id,
                                i94res           AS residency,
                                i94port          AS entry_port,
                                arrdate          AS arrival_date,
                                i94mode          AS transportation,
                                i94addr          AS arrival_state,
                                depdate          AS departure_date,
                                i94bir           AS age,
                                i94visa          AS travel_purpose,
                                biryear          AS birth_year,
                                gender           AS gender,
                                airline          AS airline,
                                fltno            AS flight_number,
                                visatype         AS visa_type
                         FROM staging_immi_table
                        """).dropDuplicates()



In [41]:
immi_table.show(5)


+-------+---------+----------+------------+--------------+-------------+--------------+----+--------------+----------+------+-------+-------------+---------+
|immi_id|residency|entry_port|arrival_date|transportation|arrival_state|departure_date| age|travel_purpose|birth_year|gender|airline|flight_number|visa_type|
+-------+---------+----------+------------+--------------+-------------+--------------+----+--------------+----------+------+-------+-------------+---------+
|  158.0|    103.0|       NEW|  2016-06-01|           1.0|           NV|    2016-06-05|38.0|           2.0|    1978.0|     M|     LX|        00018|       WT|
|  293.0|    103.0|       LOS|  2016-06-01|           1.0|           CA|    2016-06-21|68.0|           2.0|    1948.0|     M|     AA|        00109|       WT|
|  424.0|    103.0|       SFR|  2016-06-01|           1.0|           CA|    2016-06-15|68.0|           2.0|    1948.0|     F|     UA|        00059|       WT|
|  601.0|    104.0|       ATL|  2016-06-01|         

                                                                                

#### Time Table

In [42]:
date_data = spark.sql("SELECT sequence(to_date('2015-12-01'), to_date('2022-01-01'), interval 1 day) as date").withColumn("date", explode(col("date")))

In [43]:
date_data.show(5)

+----------+
|      date|
+----------+
|2015-12-01|
|2015-12-02|
|2015-12-03|
|2015-12-04|
|2015-12-05|
+----------+
only showing top 5 rows



In [44]:
date_data.createOrReplaceTempView("staging_date_table")
date_table = spark.sql("""SELECT           
                                 date              AS date,
                                 day(date)         AS day,
                                 weekofyear(date)  AS week,
                                 month(date)       AS month,
                                 year(date)        AS year,
                                 dayofweek(date)   AS weekday
                          FROM staging_date_table
                        """)

In [45]:
date_table.sample(0.05).show(5)

+----------+---+----+-----+----+-------+
|      date|day|week|month|year|weekday|
+----------+---+----+-----+----+-------+
|2015-12-05|  5|  49|   12|2015|      7|
|2016-01-25| 25|   4|    1|2016|      2|
|2016-02-17| 17|   7|    2|2016|      4|
|2016-04-08|  8|  14|    4|2016|      6|
|2016-05-01|  1|  17|    5|2016|      1|
+----------+---+----+-----+----+-------+
only showing top 5 rows
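To make the column semantics of the time table explicit, here is a plain-Python sketch that reproduces the same fields for a date. It assumes Spark's conventions as reflected in the sample output above: `weekofyear` is the ISO 8601 week number and `dayofweek` runs from 1 (Sunday) to 7 (Saturday). The helper names `date_row` and `date_dimension` are hypothetical.

```python
from datetime import date, timedelta
from typing import Dict, List


def date_row(d: date) -> Dict[str, object]:
    """Build one time-table row with the same columns as date_table."""
    return {
        "date": d,
        "day": d.day,
        "week": d.isocalendar()[1],        # ISO 8601 week number
        "month": d.month,
        "year": d.year,
        "weekday": d.isoweekday() % 7 + 1,  # 1 = Sunday ... 7 = Saturday
    }


def date_dimension(start: date, end: date) -> List[Dict[str, object]]:
    """Generate one row per day from start to end, inclusive."""
    rows = []
    current = start
    while current <= end:
        rows.append(date_row(current))
        current += timedelta(days=1)
    return rows


print(date_row(date(2016, 5, 1)))
```

For 2016-05-01 this gives week 17 and weekday 1, matching the corresponding row in the sample output.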



In [46]:
## write table to parquet format
date_table.write.mode('overwrite').parquet(OUTPUT_PATH + 'date.parquet')

                                                                                

#### 4.2 Data Quality Checks

Data quality checks ensure the pipeline ran as expected. Several checks were already performed earlier, when missing values were replaced and duplicates removed. Another important check is to make sure that the primary key of each table is unique and contains no nulls before the table is written back to S3.

In [362]:
def data_null_checks(table,column):
    ## the key column must not contain any nulls
    null_count = table.filter(col(column).isNull()).count()

    if null_count != 0:
        print(f'There are {null_count} null values in column {column}!')
    else:
        print('Data Quality checks passed')

In [363]:
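def data_unique_checks(table, column):
    """Report whether every value in `column` of the Spark DataFrame `table` is unique."""
    # A unique column has as many distinct values as the table has rows.
    unique_count = table.select(column).distinct().count()
    row_count = table.count()

    if unique_count != row_count:
        print(f'There are {row_count - unique_count} duplicated values in column {column}')
    else:
        print('Data Quality checks passed')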

In [365]:
data_null_checks(immi_table,'immi_id')



Data Quality checks passed


                                                                                

In [366]:
data_unique_checks(immi_table,'immi_id')



Data Quality checks passed
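
The checks above only print a message, so a scheduled run would keep going even when a check fails. A possible variant, sketched here under assumptions, raises an exception instead so that the failure stops the pipeline. The function `check_primary_key` and its parameters are hypothetical. It works on plain counts so that it can be tried without a Spark session, and the comments show one way the counts might be obtained from `immi_table`.

```python
def check_primary_key(row_count, non_null_count, distinct_count, table_name="table"):
    """Raise ValueError if a key column has nulls or duplicates; return True otherwise."""
    if row_count == 0:
        raise ValueError(f"{table_name}: table is empty")
    if non_null_count != row_count:
        raise ValueError(f"{table_name}: {row_count - non_null_count} null key value(s)")
    if distinct_count != row_count:
        raise ValueError(f"{table_name}: {row_count - distinct_count} surplus duplicate row(s)")
    return True


# Hypothetical wiring with the Spark DataFrame from this notebook:
#   rows = immi_table.count()
#   non_null = immi_table.filter(col("immi_id").isNotNull()).count()
#   distinct = immi_table.select("immi_id").distinct().count()
#   check_primary_key(rows, non_null, distinct, "immi_table")

# Stand-alone demonstration with made-up counts.
print(check_primary_key(100, 100, 100, "demo"))
```

Raising rather than printing lets an orchestrator mark the task as failed before anything is written back to S3.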


                                                                                

#### 4.3 ETL process result sample

The data lake created could be used for analyzing immigration patterns. For example:

*Which week has the most visitors in June?*

In [48]:
immi_table.join(date_table,immi_table.arrival_date == date_table.date,'left').groupby(date_table.week).count().show()

21/11/24 05:42:30 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.

+----+------+
|week| count|
+----+------+
|  26|520705|
|  22|596482|
|  23|797060|
|  25|859081|
|  24|801661|
+----+------+



                                                                                

We can see that June 2016 spans five weeks of the year, from week 22 to week 26. Weeks 22 and 26 have relatively few visitors, in part because only some of their days fall in June (June 1, 2016 was a Wednesday and June 30 a Thursday), while weeks 23, 24, and 25 have high volumes, with week 25 the highest at about 859k visitors.
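
The weekly totals above are not directly comparable when a week is only partly inside the month. The sketch below counts how many June 2016 days fall in each ISO week and divides each total by that number to get a rough daily average. It assumes that the counts cover arrivals from 2016-06-01 through 2016-06-30 only, which the notebook does not state explicitly. The variable names are illustrative.

```python
from collections import Counter
from datetime import date, timedelta

# Weekly counts copied from the query output above.
weekly_counts = {22: 596482, 23: 797060, 24: 801661, 25: 859081, 26: 520705}

# Assumption: the counts cover arrivals from 2016-06-01 through 2016-06-30.
start, end = date(2016, 6, 1), date(2016, 6, 30)

# Number of June days that belong to each ISO week.
days_in_week = Counter(
    (start + timedelta(days=i)).isocalendar()[1]
    for i in range((end - start).days + 1)
)

# Rough visitors-per-day figure for each week.
daily_average = {
    week: weekly_counts[week] / days_in_week[week] for week in sorted(weekly_counts)
}

for week, avg in daily_average.items():
    print(f"week {week}: {days_in_week[week]} June days, about {avg:,.0f} visitors per day")
```

Under this assumption, weeks 22 and 26 contain only 5 and 4 June days, so their lower totals do not by themselves indicate quieter days.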

#### 4.4 Data dictionary 
The data dictionary below covers each table in the data model. For each field, it gives an example value, the field type, and a brief description of what the data is and where it came from.

**Airport Table**

|    field_name          |example    | field_type                   | description               |
|:-------------|:-----------------------|:-------------------|:-----------------------------|
| airport_id   | 01NM                   | varchar               | identifier                         |
| airport_name | Champion Ranch Airport | varchar    | name of the airport |
| airport_type | small_airport          | varchar      | type of the airport                    |
| region       | US-NM                  | varchar              | iso region code                        |
| municipality  | Artesia                | varchar       | municipality of the airport                      |
| coordinate_x | -104.540278            | double | longitude |
| coordinate_y | 33.008611              | double | latitude  |


**Country Table**

|    field_name          |example    | field_type                   | description               |
|:-------------|:-----------------------|:---------------------------|:--------|
| country_name | Cayman Islands | varchar | name of country |
| country_code | KY             | varchar     | iso country alpha-2 code   |

**Region Table**

|    field_name          |example    | field_type                   | description               |
|:-------------|:-----------------------|:---------------------------|:--------|
| state_name    | Iowa  | varchar | name of US states |
| state_code    | IA    | varchar     | code of the state         |
| country_code  | US    | varchar     | iso country code         |
| country_state | US-IA | varchar  | iso region code      |


**Demographics Table**

|    field_name          |example    | field_type                   | description               |
|:-------------|:-----------------------|:---------------------------|:--------|
| city_name          | Brownsville | varchar                  | city name |
| state_code         | TX          | varchar                        | code of the state where city is         |
| median_age         | 30.6        | double                      | median age of the city       |
| male_population    | 87689       | int                    | male population of the city      |
| female_population  | 96199       | int                    | female population of the city      |
| total_population   | 183888      | int                    | total population of the city     |
| foreign_born       | 53301       | int                     | foreign born population of the city       |
| avg_household_size | 3.48        | double                      | average household size of the city       |
| race               | Asian       | varchar                  | races      |
| race_population    | 1589        | int                    | population of the race       |


**Immigration Table**

|    field_name          |example    | field_type                   | description               |
|:-------------|:-----------------------|:---------------------------|:--------|
| immi_id        | 4554704  | int  | identifier  |
| residency      | 112      | int      | country code in SAS description file      |
| entry_port     | SEA        | varchar        | entry port code in SAS description file        |
| arrival_date   | 2016-06-23 | datetime | arrival date |
| transportation | 1.0        | int        | means of transportation to US code in the SAS description file         |
| arrival_state  | OR         | varchar         | state code of US         |
| departure_date | 2016-06-30 | datetime | departure date |
| age            | 15       | int       | age of the visitor       |
| travel_purpose  | 2.0        | int        | travel purpose code in the SAS description file        |
| birth_year     | 2001     | int     | birth year of the visitor     |
| gender         | F          | char          | gender of the visitor          |
| airline        | DL         | varchar         | airline taken by visitor         |
| flight_number  | 00145      | varchar      | flight number of the flight      |
| visa_type      | WT         | varchar         | visa type of the visitor         |

**Date Table**

|    field_name          |example    | field_type                   | description               |
|:-------------|:-----------------------|:---------------------------|:--------|
| date    | 2019-05-02 | datetime | calendar date |
| day     | 2          | int      | day of the month |
| week    | 18         | int      | week of the year in which the date falls |
| month   | 5          | int      | month of the year |
| year    | 2019       | int      | year |
| weekday | 5          | int      | day of the week (1 = Sunday, 7 = Saturday) |

### Step 5: Project Write Up

#### 5.1 Technology Choice

This project builds a data lake that Analytics and BI teams can use to study immigration patterns, countries of origin, airlines, and their relationship to state demographics. S3 is an appropriate place to host the data lake because it is flexible, efficient, and cost-effective. Data scientists can use the data directly from S3, and data engineers can use it as a staging area for building dimensional tables in Redshift or RDS.

Python, Spark, and PySpark are appropriate choices for the implementation because of their library support. The immigration data can contain millions of records each month, so a distributed system such as Spark is well suited to processing it. As the data grows each month, the need for a distributed system becomes more pressing.

The ETL pipeline is scripted in etl.py. It reads the immigration SAS files from S3 using a Spark session, cleans and transforms them, joins them with several other data sources, and writes the results back to S3 in Parquet format.



#### 5.2 Data Update

The I94 immigration data could be updated daily, weekly, or monthly, depending on the analytics or BI purpose. For research analysis, monthly or yearly updates would probably be enough. For BI monitoring, the data might need to be updated daily.

The other data sets (for example the airport and demographics tables) are relatively static, so they can be updated yearly or even once every several years.

#### 5.3 Different Scenarios Discussion:

* The data was increased by 100x.

Right now there are around 6 gigabytes of data; if the data increased by 100x, there would be about 600 GB. That could still be handled by a distributed Spark cluster on EMR. If speed is a concern, the number of nodes or clusters can be increased easily with EMR.

* The data populates a dashboard that must be updated on a daily basis by 7am every day.

Apache Airflow would be a suitable tool for scheduling the script runs, for example by starting the ETL early enough each morning that it finishes before 7am.

* The database needed to be accessed by 100+ people.

Building the database in Redshift would allow hundreds of people to access it at the same time.
Reportedly, a Redshift cluster can handle 500 concurrent connections and run a maximum of 15 queries at the same time. If more concurrent queries are needed, Redshift provides a Concurrency Scaling feature, which is described as supporting virtually unlimited concurrent users and concurrent queries with consistently fast query performance.