# Analytics on Tourists Visiting the US
### Data Engineering Capstone Project

#### Project Summary
An end-to-end data pipeline that ingests, transforms, and loads the data needed for analytics on US immigration.

The project proceeds in the following steps:
* Step 1: Scope of the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

### Step 1: Scope the Project and Gather Data

#### Scope 

The scope of this project is to enable data analysts to do analytics on US immigration. The project focuses on ingesting, cleaning, transforming, and storing data related to US immigration so that analysts can derive intelligence from it. Data quality checks are also performed on the data.

US I94 immigration data and US demographics data are used for this purpose, along with lookup data such as ISO country codes. The final output is cleaned I94 data and aggregated US demographics data, and the two tables can be joined on the US state.

Apache Spark will be used for cleaning and transforming purposes. The final data will be stored in partitioned Parquet format.

Data analysts can use the output of this project to answer questions such as:
1. Which states does each age group prefer?
2. How are visitors distributed by age group, visa type, and origin country?
3. Which states receive the most visitors?
4. Does the number of airports in a state affect immigration to that state?

#### Data 
The following datasets are used:
##### 1. I94 Immigration data 2016:
Source: https://travel.trade.gov/research/reports/i94/historical/2016.html <br />
Format: SAS format <br />
This data comes from the US National Tourism and Trade Office. Each report contains international visitor arrival statistics by world regions and select countries (including top 20), type of visa, mode of transportation, age groups, states visited (first intended address only), and the top ports of entry (for select countries).

##### 2. US Cities Demographics:
Source: https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/ <br />
Format: CSV Text format (separator is ";") <br />
This data comes from the US Census Bureau's 2015 American Community Survey. It contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000. The data is imported from OpenDataSoft.

##### 3. ISO 3166-2 Subdivision Codes:
Source: https://www.ip2location.com/free/iso3166-2 <br />
Format: CSV Text format (separator is ",") <br />
The IP2Location™ ISO 3166-2 Subdivision Code dataset is free to download. It contains the ISO 3166-2 codes for the states/regions used in the IP2Location geolocation database. A code can be looked up by mapping the country code and subdivision name.

##### 4. US Airport codes:
Source: https://datahub.io/core/airport-codes <br />
Format: CSV Text format (separator is ",") <br />
This is a simple table of airport codes and corresponding cities.

In [113]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf,when,length
from pyspark.sql.types import StringType


spark = SparkSession\
    .builder \
    .appName("DEND - US Immigration Analysis") \
    .getOrCreate()

In [36]:
# READ ALL DATASETS

df_i94  = spark.read.parquet('data/i94_2016_data_parquet/')
df_demo = spark.read.csv('data/us_cities_demographics.csv', sep=";", header=True)
df_loc  = spark.read.csv('data/IP2LOCATION_ISO3166.csv', sep=",", header=True)
df_air  = spark.read.csv('data/airport_codes_csv.csv', sep=",", header=True)

df_port  = spark.read.csv('data/i94port_codes.csv', sep=",", header=True)
df_country = spark.read.csv('data/i94cntyl_codes.csv', sep=",", header=True)

---

### Step 2: Explore and Assess the Data

##### DATASET 1:   I94 Immigration Dataset

In [118]:
# --- TRANSFORMATIONS ---

# Select only the necessary columns from original data
df_i94_select = df_i94.select("i94yr", "i94mon", "i94port", "i94addr", "i94visa", "i94res", "biryear", "gender")

# i94port -> US port code. Replace with port name
# i94res -> Origin country code. Replace with country name
# i94visa -> Visa codes collapsed into three categories [1 = Business, 2 = Pleasure, 3 = Student]

# Join on the columns of df_i94_select (the DataFrame actually being joined),
# then drop its code columns so only the lookup tables' copies remain
df_i94_transform = df_i94_select \
                    .join(df_port, df_i94_select.i94port == df_port.i94port, "left") \
                    .join(df_country, df_i94_select.i94res == df_country.i94res, "left") \
                    .withColumn("visa_type", when(df_i94_select.i94visa == 1.0, 'Business') \
                                            .when(df_i94_select.i94visa == 2.0, 'Pleasure') \
                                            .otherwise('Student')) \
                    .drop(df_i94_select.i94visa) \
                    .drop(df_i94_select.i94res) \
                    .drop(df_i94_select.i94port) \
                    .withColumnRenamed("description", "port_name") \
                    .withColumnRenamed("country", "origin_country")

# --- CLEANING ---

# 1. Some i94addr are NULL even though i94port is not NULL

print("BEFORE CLEANING: ")
df_i94_transform \
    .filter("i94addr is null") \
    .show(10)

# Replace null i94addr values with the state parsed from port_name (e.g. "LOS ANGELES, CA" -> "CA")
def get_state(i94addr, port_name):
    if i94addr is not None:
        return i94addr
    if port_name is None:
        return None  # no port name to fall back on
    parts = port_name.split(',')
    # Guard against port names without a comma (parts[1] would raise IndexError)
    if len(parts) > 1:
        return parts[1].strip()
    return parts[0].strip()

get_state_udf = udf(get_state, StringType())

df_i94_transform = df_i94_transform \
                        .withColumn("dest_state", get_state_udf(df_i94_transform.i94addr, df_i94_transform.port_name)) \
                        .drop(df_i94_select.i94addr)

print("CLEANING: After replacing null i94addr with dest_state: ")
# i94addr has been dropped at this point, so check dest_state for remaining nulls
df_i94_transform.filter("dest_state is null").show(10)


# 2. Keep only US states and remove all other non-US places
#    (two-character dest_state values are treated as US state codes)

df_i94_transform = df_i94_transform \
                        .filter(length(df_i94_transform.dest_state) == 2)

print("CLEANING: After removing all non-US states: ")
df_i94_transform.show(10)

BEFORE CLEANING: 
+------+------+-------+-------+------+-------+--------------------+------+--------------+---------+
| i94yr|i94mon|i94addr|biryear|gender|i94port|           port_name|i94res|origin_country|visa_type|
+------+------+-------+-------+------+-------+--------------------+------+--------------+---------+
|2016.0|   4.0|   null| 1977.0|     M|    LOS|LOS ANGELES, CA  ...|   504|        PANAMA| Pleasure|
|2016.0|   4.0|   null| 1985.0|     F|    TOR|TORONTO, CANADA  ...|   249|          IRAN| Pleasure|
|2016.0|   4.0|   null| 1963.0|     M|    NYC|NEW YORK, NY     ...|   249|          IRAN| Pleasure|
|2016.0|   4.0|   null| 1945.0|  null|    NYC|NEW YORK, NY     ...|   251|        ISRAEL| Pleasure|
|2016.0|   4.0|   null| 1950.0|     M|    NAS|NASSAU, BAHAMAS  ...|   251|        ISRAEL| Pleasure|
|2016.0|   4.0|   null| 1980.0|     F|    ATL|ATLANTA, GA      ...|   251|        ISRAEL| Pleasure|
|2016.0|   4.0|   null| 1990.0|     F|    NEW|NEWARK/TETERBORO,...|   251|        

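The fallback rule for null i94addr values can be checked outside Spark. The following is a minimal pure-Python sketch, assuming port names follow the "CITY, STATE" pattern seen in the output above; the function name resolve_dest_state is illustrative and not part of the notebook.

```python
from typing import Optional


def resolve_dest_state(i94addr: Optional[str], port_name: Optional[str]) -> Optional[str]:
    """Return i94addr if present, else the text after the first comma in port_name."""
    if i94addr is not None:
        return i94addr
    if port_name is None:
        return None  # nothing to fall back on
    parts = [p.strip() for p in port_name.split(",")]
    # "LOS ANGELES, CA" -> "CA"; a name without a comma is returned unchanged
    return parts[1] if len(parts) > 1 and parts[1] else parts[0]


print(resolve_dest_state(None, "LOS ANGELES, CA    "))  # CA
print(resolve_dest_state("NY", "NEW YORK, NY"))          # NY
print(resolve_dest_state(None, "TORONTO, CANADA"))       # CANADA
```

Values such as "CANADA" are not US state codes; in the notebook they are removed afterwards by the two-character length filter.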
##### DATASET 2:   US Demographics Dataset

In [184]:
# --- TRANSFORMATIONS ---

# Select only the necessary columns from original data
df_demo_select = df_demo \
                    .drop("State") \
                    .drop("Average Household Size") \
                    .drop("Number of Veterans") \
                    .withColumnRenamed("State Code", "state_code") \
                    .withColumn("median_age", df_demo["Median Age"].cast('float')) \
                    .withColumn("pop_male", df_demo["Male Population"].cast('int')) \
                    .withColumn("pop_female", df_demo["Female Population"].cast('int')) \
                    .withColumn("pop_total", df_demo["Total Population"].cast('int')) \
                    .withColumn("no_immigrants", df_demo["Foreign-born"].cast('int')) \
                    .withColumn("count", df_demo["Count"].cast('int')) \
                    .drop("Median Age") \
                    .drop("Male Population") \
                    .drop("Female Population") \
                    .drop("Total Population") \
                    .drop("Foreign-born")

# --- CLEANING ---

# 1. Aggregate the data by US state code and race

print("BEFORE AGGREGATION for Texas:")
df_demo_select.filter("state_code == 'TX'").show()

df_demo_transform = df_demo_select \
                            .drop("City") \
                            .groupBy("state_code", "Race") \
                            .agg({'median_age': 'mean',
                                  'pop_male':'sum', 
                                  'pop_female':'sum', 
                                  'pop_total':'sum', 
                                  'no_immigrants':'sum'}) \
                            .withColumnRenamed("avg(median_age)", "median_age") \
                            .withColumnRenamed("sum(pop_male)", "pop_male") \
                            .withColumnRenamed("sum(pop_female)", "pop_female") \
                            .withColumnRenamed("sum(pop_total)", "pop_total") \
                            .withColumnRenamed("sum(no_immigrants)", "no_immigrants")
        
print("AFTER AGGREGATION for Texas:")
df_demo_transform.filter("state_code == 'TX'").show()

BEFORE AGGREGATION for Texas:
+--------------+----------+--------------------+------+----------+--------+----------+---------+-------------+
|          City|state_code|                Race| count|median_age|pop_male|pop_female|pop_total|no_immigrants|
+--------------+----------+--------------------+------+----------+--------+----------+---------+-------------+
|        Laredo|        TX|American Indian a...|  1253|      28.8|  124305|    131484|   255789|        68427|
|  Flower Mound|        TX|  Hispanic or Latino|  6149|      40.2|   35200|     35824|    71024|         6860|
|Corpus Christi|        TX|               White|292663|      35.0|  160488|    163594|   324082|        30834|
|         Bryan|        TX|Black or African-...| 11914|      29.4|   41761|     40345|    82106|        12014|
|       Killeen|        TX|American Indian a...|  2362|      29.2|   69442|     71367|   140809|        15769|
|       El Paso|        TX|American Indian a...|  7359|      33.1|  332797|    348

##### DATASET 3:   ISO Subdivision Codes Dataset

In [187]:
# --- TRANSFORMATION ---

# From this ISO lookup table filter only US state codes

df_loc \
    .filter("country_code = 'US'") \
    .show(10)

+------------+--------------------+-----+
|country_code|    subdivision_name| code|
+------------+--------------------+-----+
|          US|             Alabama|US-AL|
|          US|              Alaska|US-AK|
|          US|             Arizona|US-AZ|
|          US|            Arkansas|US-AR|
|          US|          California|US-CA|
|          US|            Colorado|US-CO|
|          US|         Connecticut|US-CT|
|          US|            Delaware|US-DE|
|          US|District of Columbia|US-DC|
|          US|             Florida|US-FL|
+------------+--------------------+-----+
only showing top 10 rows



##### DATASET 4:   US Airport Codes Dataset

In [199]:
# --- TRANSFORMATION ---

# Select only the necessary columns from original data
df_air_select = df_air.select("type", "iso_country", "iso_region")

# Aggregate total number of airports in each US state
df_air_transform = df_air_select \
                        .filter("iso_country == 'US'") \
                        .filter("type like '%airport%'") \
                        .groupBy("iso_region") \
                        .count()

df_air_transform.show(10)

+----------+-----+
|iso_region|count|
+----------+-----+
|     US-TN|  228|
|     US-OK|  372|
|     US-VT|   66|
|     US-SD|  162|
|     US-WA|  382|
|     US-IN|  487|
|     US-AL|  198|
|     US-NY|  404|
|     US-MS|  212|
|     US-NC|  349|
+----------+-----+
only showing top 10 rows

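The airport counts are keyed by ISO 3166-2 codes such as US-TN, while the I94 and demographics tables use two-letter state codes. Before joining, the country prefix has to be removed. A minimal pure-Python sketch of that normalisation follows; the helper name iso_region_to_state is hypothetical, and in Spark the same effect could presumably be achieved with a string function on the iso_region column.

```python
def iso_region_to_state(iso_region, country="US"):
    """Strip the country prefix from an ISO 3166-2 code, e.g. 'US-TN' -> 'TN'."""
    prefix = country + "-"
    if iso_region is None or not iso_region.startswith(prefix):
        return None  # not a code for the requested country
    return iso_region[len(prefix):]


# A few counts copied from the output above
airport_counts = {"US-TN": 228, "US-OK": 372, "US-VT": 66}
by_state = {iso_region_to_state(code): n for code, n in airport_counts.items()}
print(by_state)  # {'TN': 228, 'OK': 372, 'VT': 66}
```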


---

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

Entities:
- i94_table
- us_demo_table
- airport_count_table

The three tables above can be joined on the US state (dest_state in i94_table, state_code in us_demo_table, and the state part of iso_region in airport_count_table). With these entities, data analysts can answer questions such as:
1. Which states does each age group prefer?
2. How are visitors distributed by age group, visa type, and origin country?
3. Which states receive the most visitors?
4. Does the number of airports in a state affect immigration to that state?

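The age-group questions require an age derived from i94yr and biryear, since the I94 columns kept here store only the birth year. A minimal sketch of one way to bucket ages is shown below; the function name age_group and the bucket boundaries are assumptions chosen for illustration, not part of the project.

```python
def age_group(arrival_year, birth_year):
    """Bucket a visitor's age at arrival; the bucket edges are illustrative only."""
    if arrival_year is None or birth_year is None:
        return None
    age = int(arrival_year) - int(birth_year)
    if age < 0:
        return None  # inconsistent record
    if age < 18:
        return "0-17"
    if age < 35:
        return "18-34"
    if age < 55:
        return "35-54"
    return "55+"


# First row of the I94 output above: i94yr = 2016.0, biryear = 1977.0
print(age_group(2016.0, 1977.0))  # 35-54
```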
#### 3.2 Mapping Out Data Pipelines

* Ingest all data
    * Ingest i94 dataset
    * Ingest US demographics dataset
    * Ingest ISO location code lookup dataset
    * Ingest Airport code lookup dataset
    

* Cleansing and Transformation
    * Select only the columns that are needed for further analysis
    * Use lookup tables to map codes to descriptions
    
* Joining tables
    * Join key: US state code (dest_state in i94_table, state_code in us_demo_table, iso_region in airport_count_table)
    * Build the final data model by joining the tables on this key to get a denormalised form

---

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [None]:
# Write code here

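The scope section states that the final data is stored as partitioned Parquet. A minimal sketch of that write step follows, assuming the I94 table is partitioned by i94yr and i94mon; the function name write_partitioned, the choice of partition columns, and the output path in the usage note are all assumptions. The function expects a Spark DataFrame and uses only its DataFrameWriter methods mode, partitionBy, and parquet.

```python
def write_partitioned(df, output_path, partition_cols=("i94yr", "i94mon")):
    """Write a Spark DataFrame as Parquet, one directory per partition value."""
    (df.write
       .mode("overwrite")             # replace the output of any previous run
       .partitionBy(*partition_cols)  # e.g. .../i94yr=2016.0/i94mon=4.0/
       .parquet(output_path))
```

With the notebook's DataFrames this might be called as write_partitioned(df_i94_transform, "output/i94_table"), where the path is a placeholder.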
#### 4.2 Data Quality Checks

In [201]:
# Data Quality Check 1: Row Count is non-zero


# Data Quality Check 2: No Duplicate rows

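The two checks named above could be sketched as follows. This is one possible implementation under stated assumptions, not the project's actual code: the function names are hypothetical, and the functions rely only on the count() and dropDuplicates() methods of a Spark DataFrame.

```python
def check_row_count(df, table_name):
    """Fail if the DataFrame has no rows; return the row count otherwise."""
    n_rows = df.count()
    if n_rows == 0:
        raise ValueError(f"Data quality check failed: {table_name} is empty")
    return n_rows


def check_no_duplicates(df, table_name):
    """Fail if dropping duplicate rows changes the row count."""
    n_rows = df.count()
    n_distinct = df.dropDuplicates().count()
    if n_rows != n_distinct:
        raise ValueError(
            f"Data quality check failed: {table_name} has "
            f"{n_rows - n_distinct} duplicate rows"
        )
    return n_rows
```

For example, check_row_count(df_demo_transform, "us_demo_table") followed by check_no_duplicates(df_demo_transform, "us_demo_table"). The selected I94 columns carry no unique identifier, so identical rows there may be legitimate; the duplicate check is probably best suited to the aggregated tables.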



#### 4.3 Data dictionary 

* i94_table:
    * i94yr -> 4 digit Year 
    * i94mon -> Numeric Month
    * biryear -> Birth year of immigrant
    * gender -> Gender ['M', 'F', Null]
    * i94port -> Arrival Port code
    * port_name -> Arrival Port name
    * origin_country -> Origin country of immigrant
    * visa_type -> VISA Type [Business, Pleasure, Student]
    * dest_state -> Destination State in US

* US_demographics_table
   * state_code -> State Code
   * Race -> Race of people [White, Black or African-American, Asian, Hispanic or Latino, American Indian and Alaska Native]
   * pop_male -> Total Male Population
   * pop_female -> Total Female Population
   * pop_total -> Total Population
   * no_immigrants -> Total foreign-born population
   * median_age -> Mean of the city-level median ages
   
   
* Airport_count_table
   * iso_region -> ISO US State code
   * count -> Total number of airports

---

### Step 5: Project Write Up
* Rationale for the choice of tools and technologies for the project.

  - Computation Tool: Apache Spark
  - Data Storage: Currently the data resides on the local system (single node). As the data grows, it can be moved to distributed storage such as Amazon S3 or HDFS, partitioned by year, for fast parallel processing with Apache Spark.


* Propose how often the data should be updated and why.
  - A single day adds relatively few arrival records, so a weekly update of the immigration data is sufficient, for example every Sunday at midnight.


* Approach for handling the following scenarios:
  * The data was increased by 100x.
    - Soln: Distributed storage systems such as Amazon S3 or HDFS can handle the additional data by scaling out and storing it in a distributed manner.

  * The data populates a dashboard that must be updated on a daily basis by 7am every day.
    - Soln: Schedule the ETL job with a scheduler such as Apache Airflow or Apache Oozie to run daily at 6:30 AM, so that the data on the dashboard is refreshed by 7 AM.

  * The database needed to be accessed by 100+ people.
    - Soln: Create an access group with READ access to the database and add the members who require access to this group.

---