# Non-immigrants' Statistics

### Data Engineering Capstone Project

#### Project Summary

The project "Non-immigrants' Statistics" collects data from several sources to identify non-immigrant trends across U.S. states.    
The data makes it possible to analyze these trends and to track the inflow and outflow of individuals across American states.       

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, regexp_extract
from pyspark.sql.types import StringType, IntegerType, DateType
import datetime
import configparser
import os

import numpy as np
import pandas as pd
%matplotlib inline
import matplotlib.pyplot as plt

In [2]:


config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['SECRET_ACCESS_KEY']
os.environ['S3_OUTPUT_DATA']=config['S3']['OUTPUT_DATA']
output_data = config['S3']['OUTPUT_DATA']
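
The cell above reads its settings from `dl.cfg`, which is not included in the project. Below is a sketch of the layout those lookups imply. The values are placeholders, the bucket `my-output-bucket` is hypothetical, and the `s3a://` scheme is an assumption based on the `hadoop-aws` package loaded into the Spark session:

```python
import configparser

# Hypothetical dl.cfg contents, inferred from the keys the notebook reads.
# Placeholder values only; real credentials should never be committed.
SAMPLE_CFG = """
[AWS]
ACCESS_KEY_ID = <your-access-key-id>
SECRET_ACCESS_KEY = <your-secret-access-key>

[S3]
OUTPUT_DATA = s3a://my-output-bucket/
"""

sample_config = configparser.ConfigParser()
sample_config.read_string(SAMPLE_CFG)

# Table names are appended directly to this prefix, so it should end with "/"
sample_output = sample_config['S3']['OUTPUT_DATA']
print(sample_output)                          # s3a://my-output-bucket/
print(sample_output + "immigration.parquet")  # s3a://my-output-bucket/immigration.parquet
```

Because the write cells build paths as `output_data + "table.parquet"`, a missing trailing slash in `OUTPUT_DATA` would fuse the prefix and the table name.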


In [3]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .appName("Non-immigrants' Trends") \
        .getOrCreate()

### Step 1: Scope the Project and Gather Data

<!--#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What is your end solution look like? What tools did you use? etc> -->


##### Problem Statement

To analyze non-immigrant trends, the data should answer the following questions:    
1. Where do most U.S. non-immigrants visit/live?
2. Who is arriving today? (By race/ethnicity/gender)
3. Where do non-immigrants come from?
4. How many people in the U.S. are non-immigrants? 
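
Question 1 comes down to counting fact-table rows per `state_code` (taken from `i94addr`). The sketch below shows that query. It uses Python's built-in `sqlite3` as a stand-in for Athena or Spark SQL, and the rows are hypothetical:

```python
import sqlite3

# In-memory stand-in for the immigration fact table (hypothetical rows)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE immigration (cicid INTEGER, state_code TEXT, city_code TEXT)")
conn.executemany(
    "INSERT INTO immigration VALUES (?, ?, ?)",
    [(1, "NY", "NYC"), (2, "NY", "NYC"), (3, "FL", "MIA"), (4, "CA", "LOS"), (5, "FL", "ORL")],
)

# Question 1: which states appear most often as the non-immigrant's U.S. address?
top_states = conn.execute(
    """
    SELECT state_code, COUNT(*) AS arrivals
    FROM immigration
    GROUP BY state_code
    ORDER BY arrivals DESC, state_code
    """
).fetchall()
print(top_states)  # [('FL', 2), ('NY', 2), ('CA', 1)]
```

The SQL uses only standard `GROUP BY` and `ORDER BY`, so it should carry over to Athena against the partitioned `immigration` table with little change.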

##### Choice of tools and technologies

1. Spark (on an AWS EMR cluster) - To extract and transform the data collected from the sources and write it to an S3 bucket.
2. S3 bucket - To store the processed data model as Parquet files for the dimension and fact tables.
3. Airflow (deployed on AWS EC2 in a Docker container) - To create and run the ETL/ELT pipeline.
4. AWS Athena - To query the processed dimension and fact tables stored as Parquet files in S3 for analysis.

<!--#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? -->

##### Data Sources

- I94 Immigration Data: This data comes from the US National Tourism and Trade Office. A data dictionary (I94_SAS_Labels_Descriptions.SAS) is included in the workspace. [This](https://travel.trade.gov/research/reports/i94/historical/2016.html) is where the data comes from. A sample file in CSV format allows a look at the data before reading it all in. The project does not use the entire dataset, only what is needed to answer the questions above.    
- World Temperature Data: This dataset came from Kaggle. You can read more about it [here](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data).    
- U.S. City Demographic Data: This data comes from OpenDataSoft. You can read more about it [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).    
- Airport Code Table: This is a simple table of airport codes and corresponding cities. It comes from [here](https://datahub.io/core/airport-codes#data).

##### Data Files

- airport-codes_csv.csv - Airport Code Table    
- I94_SAS_Labels_Descriptions.SAS - I94 Immigration Data (Labels Descriptions)    
- us-cities-demographics.csv - U.S. City Demographic Data    
- immigration_data_sample.csv - I94 Immigration Data (Data Sample with 1000 records)    
- sas_data - I94 Immigration Data (In Parquet files) (Over 3 million records)    




In [4]:
airport_codes_df=spark.read.csv("airport-codes_csv.csv", header='true', inferSchema='true')

In [5]:
us_cities_demographics_df=spark.read.option("delimiter", ';').csv("us-cities-demographics.csv", header='true', inferSchema='true')

In [6]:
immigration_sample_df=spark.read.csv("immigration_data_sample.csv", header='true', inferSchema='true')

In [7]:
immigration_df=spark.read.parquet("sas_data")

### Step 2: Explore and Assess the Data
<!-- #### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc. -->

#### Cleaning Steps
<!-- Document steps necessary to clean the data -->

 - The source data comes in different formats: comma-delimited CSV, semicolon-delimited CSV and Parquet.    
 - The data is transformed to match the model fields, e.g. extracting the state code from another column.    
 - Each field/column is cast to the data type required by the model.    
 - Null values are removed, and only unique rows are kept, where required.
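
The cleaning steps above can be illustrated without Spark. The sketch below uses only the standard library: it parses a semicolon-delimited extract, renames and casts the fields, drops rows with null or empty values and keeps unique rows. The sample rows are hypothetical, shaped like `us-cities-demographics.csv`, and the sketch is not the notebook's actual code path:

```python
import csv
import io

# Hypothetical semicolon-delimited extract shaped like us-cities-demographics.csv
RAW = """City;State;Median Age;Total Population;State Code;Race
Flint;Michigan;35.4;98297;MI;White
Flint;Michigan;35.4;98297;MI;White
Fort Worth;Texas;32.6;836969;TX;White
Nowhere;;40.0;;;White
"""

def clean(raw_text):
    """Parse, rename/cast, drop rows with empty fields, and deduplicate."""
    reader = csv.DictReader(io.StringIO(raw_text), delimiter=";")
    seen, rows = set(), []
    for r in reader:
        row = (
            r["City"],
            r["State"],
            # Truncate to int, like CAST(double AS INT) in Spark SQL
            int(float(r["Median Age"])) if r["Median Age"] else None,
            int(r["Total Population"]) if r["Total Population"] else None,
            r["State Code"],
            r["Race"],
        )
        # Drop rows with a null or empty field
        if any(v is None or v == "" for v in row):
            continue
        # Keep only unique rows (like SQL DISTINCT over the whole row)
        if row not in seen:
            seen.add(row)
            rows.append(row)
    return rows

print(clean(RAW))
```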


In [8]:
#Extract States from airport codes
airport_codes_df_ = airport_codes_df.withColumn('state_code', regexp_extract(col('iso_region'), r'(US-)(\w+)', 2))
#Filter out rows with no state
airport_codes_df_ = airport_codes_df_.filter((airport_codes_df_['state_code'].isNotNull())&(airport_codes_df_['state_code']!=''))
airport_codes_df_.createOrReplaceTempView("airport_codes")
#Select columns for usage in model
airport_codes = spark.sql("""
        SELECT 
            DISTINCT(ident),
            type,
            name,
            continent,
            iso_country,
            iso_region,
            coordinates,
            state_code            
        FROM airport_codes
        """)

#Confirm no columns contain nulls or empty strings
for c in airport_codes.schema.names:
    print(c, ': ',airport_codes.filter((airport_codes[c].isNull())|(airport_codes[c]=='')).count())
    
airport_codes.printSchema()
print(airport_codes.count())    
airport_codes.show(2)

ident :  0
type :  0
name :  0
continent :  0
iso_country :  0
iso_region :  0
coordinates :  0
state_code :  0
root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- coordinates: string (nullable = true)
 |-- state_code: string (nullable = true)

22757
+-----+-------------+-----------------+---------+-----------+----------+--------------------+----------+
|ident|         type|             name|continent|iso_country|iso_region|         coordinates|state_code|
+-----+-------------+-----------------+---------+-----------+----------+--------------------+----------+
| 04LA|     heliport|St James Heliport|       NA|         US|     US-LA|-90.702222, 30.05...|        LA|
| 05IA|small_airport|     Spotts Field|       NA|         US|     US-IA|-93.0682983398437...|        IA|
+-----+-------------+-----------------+---------+-----------+----------+--------------------+----------+
only showing top 2 rows
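
The airport cell filters on both `isNotNull()` and `!= ''` because Spark's `regexp_extract` returns an empty string when the pattern does not match and null when the input is null. The sketch below mimics that behaviour with Python's `re` module so the three cases are easy to see. It is an illustration only, not the Spark implementation:

```python
import re

# Same pattern as the regexp_extract call, written as a raw string
STATE_PATTERN = re.compile(r"(US-)(\w+)")

def extract_state(iso_region):
    """Mimic regexp_extract(col, pattern, 2): None for null input, '' when no match."""
    if iso_region is None:
        return None
    m = STATE_PATTERN.search(iso_region)
    return m.group(2) if m else ""

for region in ["US-LA", "US-IA", "PG-NIK", None]:
    print(repr(region), "->", repr(extract_state(region)))
```

Non-US regions therefore come through as `''` rather than null, which is why the empty-string filter is the one that removes them.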

In [11]:
#Select Columns for usage in model
us_cities_demographics_df_ = us_cities_demographics_df.select(col("City").alias("city"),
  col("State").alias("state"),
  col("Median Age").alias("med_age"),
  col("Total Population").alias("total_pop"),
  col("State Code").alias("state_code"),
  col("Race").alias("race"))
us_cities_demographics_df_.createOrReplaceTempView("us_cities_demographics")
#Select columns for usage in model
us_cities_demographics = spark.sql("""
        SELECT 
            DISTINCT(city),
            state,
            CAST(med_age AS INT),
            CAST(total_pop AS INT),
            state_code,
            race
        FROM us_cities_demographics
        """)

for c in us_cities_demographics.schema.names:
    print(c, ': ',us_cities_demographics.filter((us_cities_demographics[c].isNull())|(us_cities_demographics[c]=='')).count())

us_cities_demographics.printSchema()
print(us_cities_demographics.count())
us_cities_demographics.show(2)

city :  0
state :  0
med_age :  0
total_pop :  0
state_code :  0
race :  0
root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- med_age: integer (nullable = true)
 |-- total_pop: integer (nullable = true)
 |-- state_code: string (nullable = true)
 |-- race: string (nullable = true)

2891
+----------+--------+-------+---------+----------+-----+
|      city|   state|med_age|total_pop|state_code| race|
+----------+--------+-------+---------+----------+-----+
|     Flint|Michigan|     35|    98297|        MI|White|
|Fort Worth|   Texas|     32|   836969|        TX|White|
+----------+--------+-------+---------+----------+-----+
only showing top 2 rows
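
`DISTINCT` applies to the entire selected row, even when it is written as `DISTINCT(city)` or `DISTINCT(ident)`. The parentheses only wrap an expression. A city therefore keeps one row for each distinct combination of the other selected columns, such as `race`. The check below uses Python's `sqlite3` and hypothetical rows to show this. Spark SQL follows the same standard semantics:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE demo (city TEXT, state_code TEXT, race TEXT)")
# Hypothetical rows: one city reported once per race group, plus an exact duplicate
conn.executemany(
    "INSERT INTO demo VALUES (?, ?, ?)",
    [("Flint", "MI", "White"), ("Flint", "MI", "Asian"), ("Flint", "MI", "White")],
)

# DISTINCT(city) still de-duplicates on every selected column, not on city alone
rows = conn.execute(
    "SELECT DISTINCT(city), state_code, race FROM demo ORDER BY race"
).fetchall()
print(rows)  # [('Flint', 'MI', 'Asian'), ('Flint', 'MI', 'White')]
```

If the model needs one row per city, the per-race columns would have to be dropped or aggregated instead.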



In [None]:
#Capture US cities (ports). Because i94addr, which tags cities to states, contains nulls, capture every port that is tagged to a state; these pairs can then be used to fill in records where i94addr is missing
notnull_cities_df = immigration_df.select(['i94port','i94addr']).filter(immigration_df['i94addr'].isNotNull()).groupBy("i94port", "i94addr").agg(count('i94port').alias('count'))
notnull_cities_df.createOrReplaceTempView("us_cities")
#Select columns for usage in model
us_cities = spark.sql("""
        SELECT 
            DISTINCT(i94port) as city_code,
            i94addr as state_code
        FROM us_cities
        """)

for c in us_cities.schema.names:
    print(c, ': ',us_cities.filter((us_cities[c].isNull())|(us_cities[c]=='')).count())

us_cities.printSchema()
print(us_cities.count())
us_cities.show(2)
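
The comment in the cell above describes reusing the port-to-state pairs to fill records with a missing `i94addr`, but the cell stops after building the pairs. Because the grouping uses both columns, the same port can appear with several states. The sketch below resolves that by picking the most frequent state per port. That rule is an assumption, not the author's stated method, and the sample pairs are hypothetical:

```python
from collections import Counter, defaultdict

def build_port_to_state(records):
    """Map each port (i94port) to the state (i94addr) it is most often paired with."""
    counts = defaultdict(Counter)
    for port, state in records:
        if state:  # only rows where i94addr is present
            counts[port][state] += 1
    return {port: c.most_common(1)[0][0] for port, c in counts.items()}

def fill_missing_state(records, port_to_state):
    """Replace a missing i94addr with the mapped state for the same port, if known."""
    return [(port, state or port_to_state.get(port)) for port, state in records]

# Hypothetical (i94port, i94addr) pairs
records = [("NYC", "NY"), ("NYC", "NY"), ("NYC", "NJ"), ("MIA", "FL"), ("NYC", None), ("XXX", None)]
mapping = build_port_to_state(records)
print(mapping)                                    # {'NYC': 'NY', 'MIA': 'FL'}
print(fill_missing_state(records, mapping)[-2:])  # [('NYC', 'NY'), ('XXX', None)]
```

Ports that never appear with a state (`XXX` here) stay unresolved, so a null check is still needed after filling.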

In [None]:
#non_immigrant_df_ = immigration_sample_df.select(['cicid','i94yr','i94mon','i94cit','i94res','i94port','arrdate','i94visa','count','admnum','visatype'])
non_immigrant_df_ = immigration_sample_df.select(['admnum','i94bir','i94visa','dtadfile','visapost','occup','biryear','gender','insnum','visatype'])
non_immigrant_df_.createOrReplaceTempView("non_immigrant")
#Select columns for usage in model
non_immigrant = spark.sql("""
        SELECT 
            DISTINCT CAST(admnum AS BIGINT),
            CAST(i94bir AS INT),
            CAST(i94visa AS INT),
            CAST(dtadfile AS INT),
            visapost,
            occup,
            CAST(biryear AS INT),
            gender,
            insnum,
            visatype           
        FROM non_immigrant
        """)

#for c in non_immigrant.schema.names:
#    print(c, ': ',non_immigrant.filter((non_immigrant[c].isNull())|(non_immigrant[c]=='')).count())

non_immigrant.printSchema()
print(non_immigrant.count())
non_immigrant.show(5)
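
Casting `dtadfile` to `INT` keeps it as a plain number, so date arithmetic still needs a conversion step. The sketch below assumes the field holds dates laid out as `yyyymmdd` (for example `20160422`) and converts them in plain Python. In Spark, `to_date` with a `'yyyyMMdd'` format on the string value would be the analogous step:

```python
from datetime import datetime

def yyyymmdd_to_date(value):
    """Convert a yyyymmdd value (string or number, e.g. dtadfile) to a date; None if missing or invalid."""
    if value is None or value == "":
        return None
    try:
        return datetime.strptime(str(int(value)), "%Y%m%d").date()
    except ValueError:
        return None

print(yyyymmdd_to_date("20160422"))  # 2016-04-22
print(yyyymmdd_to_date(20160422))    # 2016-04-22
print(yyyymmdd_to_date(None))        # None
```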

In [None]:
immigration_df_ = immigration_sample_df.select(['cicid','i94yr','i94mon','i94cit','i94res','i94port','arrdate','i94mode','i94addr','depdate','entdepa','entdepd','entdepu','matflag','dtaddto','airline','admnum','fltno'])

immigration_df_.createOrReplaceTempView("immigration")
#Select columns for usage in model

immigration = spark.sql("""
        SELECT 
            DISTINCT CAST(cicid AS INT),
            CAST(i94yr AS INT),
            CAST(i94mon AS INT),
            CAST(i94cit AS INT),
            CAST(i94res AS INT),
            i94port as city_code,
            CAST(arrdate AS INT),
            CAST(i94mode AS INT),
            i94addr as state_code,
            CAST(depdate AS INT),
            entdepa,
            entdepd,
            entdepu,
            matflag,
            CAST(dtaddto AS INT),
            airline,
            CAST(admnum AS BIGINT),
            fltno
        FROM immigration
        """)

immigration.printSchema()
print(immigration.count())
immigration.show(2)
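
In the I94 data, `arrdate` and `depdate` are commonly documented as SAS numeric dates, i.e. day counts from 1960-01-01. After `CAST(... AS INT)` they are still day numbers rather than calendar dates, and that also applies to the `arrdate` partition values. A minimal conversion sketch:

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS dates count days from this day

def sas_to_date(days):
    """Convert a SAS numeric date (days since 1960-01-01) to a datetime.date."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))

print(sas_to_date(0))      # 1960-01-01
print(sas_to_date(20545))  # 2016-04-01
```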

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
<!--Map out the conceptual data model and explain why you chose that model-->
##### Dimension Tables

    Table: airport_codes    
     - Columns: ident,type,name,continent,iso_country,iso_region,coordinates,state_code    

    Table: us_cities_demographics    
     - Columns: city,state,med_age,total_pop,state_code,race

    Table: us_cities    
     - Columns: city_code,state_code

    Table: non_immigrant    
     - Columns: admnum,i94bir,i94visa,dtadfile,visapost,occup,biryear,gender,insnum,visatype

##### Fact Tables

    Table: immigration    
     - Columns: cicid,i94yr,i94mon,i94cit,i94res,city_code,arrdate,i94mode,state_code,depdate,entdepa,entdepd,entdepu,matflag,dtaddto,airline,admnum,fltno
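
This is a star schema with `immigration` as the fact table. From the column lists, the dimensions appear to attach to it through shared columns: `admnum` links to `non_immigrant`, `state_code` to `us_cities_demographics` and `airport_codes`, and `city_code` to `us_cities`. The sketch below answers part of question 2 (arrivals by gender) by joining the fact table to `non_immigrant`. It uses `sqlite3` as a stand-in engine with hypothetical rows:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Minimal versions of the fact table and one dimension table (hypothetical rows)
conn.execute("CREATE TABLE immigration (cicid INTEGER, state_code TEXT, admnum INTEGER)")
conn.execute("CREATE TABLE non_immigrant (admnum INTEGER, gender TEXT, visatype TEXT)")
conn.executemany("INSERT INTO immigration VALUES (?, ?, ?)",
                 [(1, "NY", 101), (2, "NY", 102), (3, "FL", 103)])
conn.executemany("INSERT INTO non_immigrant VALUES (?, ?, ?)",
                 [(101, "F", "B2"), (102, "M", "WT"), (103, "F", "B1")])

# Join the fact table to the dimension on admnum, then count by state and gender
rows = conn.execute(
    """
    SELECT i.state_code, n.gender, COUNT(*) AS arrivals
    FROM immigration AS i
    JOIN non_immigrant AS n ON i.admnum = n.admnum
    GROUP BY i.state_code, n.gender
    ORDER BY i.state_code, n.gender
    """
).fetchall()
print(rows)  # [('FL', 'F', 1), ('NY', 'F', 1), ('NY', 'M', 1)]
```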

#### 3.2 Mapping Out Data Pipelines
<!--List the steps necessary to pipeline the data into the chosen data model-->

After the data is transformed into the dimension and fact tables, each table is written to the S3 bucket as Parquet files, partitioned as follows:

 - Table: airport_codes is partitioned by state (state_code)
 - Table: us_cities_demographics is partitioned by state (state_code)
 - Table: us_cities is partitioned by state (state_code)
 - Table: non_immigrant is partitioned by visa type (visatype)
 - Table: immigration is partitioned by state (state_code, from i94addr), city (city_code, from i94port) and arrival date (arrdate)        
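
Spark's `partitionBy` writes Hive-style directories, one `column=value` level per partition column in the order given, and leaves those columns out of the Parquet files themselves. The helper below is hypothetical and only builds the directory a given row would land in. The bucket name is also hypothetical:

```python
def partition_path(base, table, **partitions):
    """Build the Hive-style directory that partitionBy would write a row into."""
    parts = [f"{key}={value}" for key, value in partitions.items()]
    return "/".join([base.rstrip("/"), table] + parts)

print(partition_path("s3a://my-output-bucket/", "immigration.parquet",
                     state_code="NY", city_code="NYC", arrdate=20545))
# s3a://my-output-bucket/immigration.parquet/state_code=NY/city_code=NYC/arrdate=20545
```

Each distinct (state, city, arrival date) combination becomes its own directory. Partitioning the immigration table on all three columns can therefore produce many small files, which is worth weighing against the query pruning it enables in Athena.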

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
<!--Build the data pipelines to create the data model.-->

In [None]:
airport_codes.write.partitionBy("state_code").parquet(output_data+"airport_codes.parquet",mode="overwrite")

In [None]:
us_cities_demographics.write.partitionBy("state_code").parquet(output_data+"us_cities_demographics.parquet",mode="overwrite")

In [None]:
us_cities.write.partitionBy("state_code").parquet(output_data+"us_cities.parquet",mode="overwrite")

In [None]:
non_immigrant.write.partitionBy("visatype").parquet(output_data+"non_immigrant.parquet",mode="overwrite")

In [None]:
immigration.write.partitionBy("state_code","city_code","arrdate").parquet(output_data+"immigration.parquet",mode="overwrite")

#### 4.2 Data Quality Checks
<!--
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness

Run Quality Checks
 -->
 1. Load each model table from its Parquet output into Spark and confirm it contains data
 2. Check the change in size of the Parquet files in the S3 bucket
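
The second check could sum the object sizes under a table's prefix and compare the total against the one recorded on the previous run. boto3's `ObjectSummary` items, as returned by `bucket.objects.filter(Prefix=...)`, expose `key` and `size`. The namedtuple below stands in for them so the sketch runs without AWS. Where the previous total is stored is left open, and the keys and sizes are hypothetical:

```python
from collections import namedtuple

def total_size(objects):
    """Sum the sizes (bytes) of S3 object summaries under a table prefix."""
    return sum(obj.size for obj in objects)

def size_check(previous_bytes, objects):
    """Return (current_bytes, changed); changed is False for empty or unchanged output."""
    current = total_size(objects)
    return current, current > 0 and current != previous_bytes

# Stand-in for boto3 ObjectSummary items (hypothetical keys and sizes)
Obj = namedtuple("Obj", ["key", "size"])
objs = [Obj("immigration.parquet/_SUCCESS", 0),
        Obj("immigration.parquet/state_code=NY/part-0000.parquet", 2048)]
print(size_check(1024, objs))  # (2048, True)
```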

In [None]:
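#Below is an example data quality check that confirms a table was written to S3.
#Spark writes each table as a directory of part files plus (by default) a _SUCCESS marker,
#so the check looks for that marker under the table's prefix instead of a single object.

import boto3
from urllib.parse import urlparse

parsed = urlparse(output_data)                      # e.g. s3a://bucket/optional/prefix/
bucket = boto3.resource('s3').Bucket(parsed.netloc)
prefix = parsed.path.lstrip('/') + 'immigration.parquet/'
objs = list(bucket.objects.filter(Prefix=prefix))
if any(obj.key.endswith('_SUCCESS') for obj in objs):
    print("Exists!")
else:
    print("Doesn't exist")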

In [None]:
#Below is an example data quality check that loads a model table back into Spark and confirms it contains data

checking_immigration = spark.read.parquet(output_data+"immigration.parquet")

checking_immigration.createOrReplaceTempView("checking_immigration")
#Fetch at most one row; a count of 0 means the table is empty

checking_immigration = spark.sql("""
        SELECT 
           state_code
        FROM checking_immigration LIMIT 1
        """)
row_count = checking_immigration.count()
print(row_count)
if row_count == 0:
    raise ValueError("Data quality check failed: immigration.parquet contains no rows")

#### 4.3 Data dictionary 
<!--Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.-->
[Data Dictionary](data_dictionary.csv)

### Step 5: Complete Project Write Up
<!--* Clearly state the rationale for the choice of tools and technologies for the project. -->

The tools were chosen so that the solution can scale as more users access the data and as updates become more frequent: the EMR cluster can be resized, S3 storage grows on demand, and Athena queries the Parquet files in S3 directly without a database server to manage.    

##### Choice of tools and technologies

1. Spark (on an AWS EMR cluster) - To extract and transform the data collected from the sources and write it to an S3 bucket.
2. S3 bucket - To store the processed data model as Parquet files for the dimension and fact tables.
3. Airflow (deployed on AWS EC2 in a Docker container) - To create and run the ETL/ELT pipeline.
4. AWS Athena - To query the processed dimension and fact tables stored as Parquet files in S3 for analysis.

<!--* Propose how often the data should be updated and why.-->

The data can be updated daily, which lets a substantial amount of new data accumulate from the sources between runs.      

<!--* Write a description of how you would approach the problem differently under the following scenarios:-->
<!-- * The data was increased by 100x.-->

##### Problem scenarios

 - If the data increased by 100x, scaling would be straightforward because the setup is already cloud-based: the EMR cluster running Spark can be resized with more nodes, and S3 storage grows without any provisioning.    
<!-- * The data populates a dashboard that must be updated on a daily basis by 7am every day.-->
 - If the data populates a dashboard that must be updated by 7am every day, an Airflow pipeline would be created with a DAG scheduled to run regularly, say every 6 hours. That gives at least 3 runs in any 24-hour period, so a failed load can be retried and the data populated before the deadline.    
<!-- * The database needed to be accessed by 100+ people. -->
 - If the database needed to be accessed by 100+ people, scaling would also be straightforward: the tables live in S3 and are queried through Athena, which is serverless, so there is no database server to resize.    
 