### Data Engineering Capstone Project

#### Project Summary, scope, and steps:
The project loads data from the I94 immigration and airport-information datasets and prepares it for analytics. For example, the result can be used to recommend additional immigration counters at the busiest airports.

The project follows these steps:
* Scope the Project and Gather Data
* Explore and Assess the Data
* Define the Data Model
* Run ETL to Model the Data
* Run data quality checks and create a data dictionary
* Project Write up

### Sample queries:
- Find the top 5 most visited US states by immigrants in April 2016.
- Find the top 5 most visited airports by immigrants.

### Data source:

#### I94 Immigration Data:
This data comes from the US National Tourism and Trade Office. [link](https://travel.trade.gov/research/reports/i94/historical/2016.html)

#### Airport Code Table:
This is a simple table of airport codes and corresponding cities. [link](https://datahub.io/core/airport-codes#data)


### Reading data


In [63]:
	
from pyspark.sql import SparkSession
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

In [64]:
df_immigration = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
df_airport_information = spark.read.csv('./airport-information.csv', header = 'true')

# Data below is extracted from I94_SAS_Labels_Descriptions.SAS
df_airport_code = spark.read.option("delimiter", "=").csv("./airport_codes.csv", header= True)
df_arrival_countries = spark.read.option("delimiter", ";").csv('./arrival_country_code.csv', header = 'true')
df_state_information = spark.read.option("delimiter", "=").csv('./state_information.csv', header = 'true')
df_arrival_type = spark.read.option("delimiter", "=").csv('./arrival_type_code.csv', header = 'true')
df_visa_type = spark.read.option("delimiter", "=").csv('./visa_type.csv', header = 'true')


### Assessing data
Check the schema, row count, and a sample of rows for each dataset.

#### Immigration data

In [65]:
df_immigration.printSchema()


root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = true)

In [66]:
df_immigration.take(1)

[Row(cicid=6.0, i94yr=2016.0, i94mon=4.0, i94cit=692.0, i94res=692.0, i94port='XXX', arrdate=20573.0, i94mode=None, i94addr=None, depdate=None, i94bir=37.0, i94visa=2.0, count=1.0, dtadfile=None, visapost=None, occup=None, entdepa='T', entdepd=None, entdepu='U', matflag=None, biryear=1979.0, dtaddto='10282016', gender=None, insnum=None, airline=None, admnum=1897628485.0, fltno=None, visatype='B2')]

In [67]:
df_immigration.count()

3096313

#### Airport Information

In [68]:
df_airport_information.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



In [69]:
df_airport_information.take(1)

[Row(ident='00A', type='heliport', name='Total Rf Heliport', elevation_ft='11', continent='NA', iso_country='US', iso_region='US-PA', municipality='Bensalem', gps_code='00A', iata_code=None, local_code='00A', coordinates='-74.93360137939453, 40.07080078125')]

In [70]:
df_airport_information.count()

55075

#### Airport Codes



In [71]:
df_airport_code.printSchema()

root
 |-- airport_code: string (nullable = true)
 |-- airport_name: string (nullable = true)



In [72]:
df_airport_code.take(3)

[Row(airport_code='ALC', airport_name='ALCAN, AK'),
 Row(airport_code='ANC', airport_name='ANCHORAGE, AK'),
 Row(airport_code='BAR', airport_name='BAKER AAF - BAKER ISLAND, AK')]

In [73]:
df_airport_code.count()

660

#### Arrival Countries


In [74]:
df_arrival_countries.printSchema()

root
 |-- country_code: string (nullable = true)
 |-- country_name: string (nullable = true)



In [75]:
df_arrival_countries.take(3)

[Row(country_code='582', country_name="'MEXICO Air Sea, and Not Reported (I-94, no land arrivals)'"),
 Row(country_code='236', country_name="'AFGHANISTAN'"),
 Row(country_code='101', country_name="'ALBANIA'")]

In [76]:
df_arrival_countries.count()

289
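The sample rows above show label values wrapped in literal single quotes (for example `"'AFGHANISTAN'"`), a leftover from extracting them out of the SAS labels file. A minimal sketch of how such values could be normalised before loading is given below in plain Python. The helper name `clean_label` is hypothetical and not part of the original notebook; in Spark, a similar effect could presumably be achieved with `regexp_replace` from `pyspark.sql.functions`.

```python
def clean_label(value):
    """Strip surrounding whitespace and literal single quotes from a label.

    Hypothetical helper for illustration; None is passed through unchanged.
    """
    if value is None:
        return None
    return value.strip().strip("'").strip()


# Sample values copied from the take(3) output above.
rows = [
    ("236", "'AFGHANISTAN'"),
    ("101", "'ALBANIA'"),
]
cleaned = [(code, clean_label(name)) for code, name in rows]
print(cleaned)
```

The same cleanup would apply to the arrival type values, which carry the same quoting.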

#### State Information

In [77]:
df_state_information.printSchema()

root
 |-- state_code: string (nullable = true)
 |-- state_name: string (nullable = true)



In [78]:
df_state_information.take(3)

[Row(state_code='AL', state_name='ALABAMA'),
 Row(state_code='AK', state_name='ALASKA'),
 Row(state_code='AZ', state_name='ARIZONA')]

In [79]:
df_state_information.count()

55

#### Arrival Type:



In [80]:
df_arrival_type.printSchema()

root
 |-- code: string (nullable = true)
 |-- value: string (nullable = true)



In [81]:
df_arrival_type.show()

+----+--------------+
|code|         value|
+----+--------------+
|   1|         'Air'|
|   2|         'Sea'|
|   3|        'Land'|
|   9|'Not reported'|
+----+--------------+



In [82]:
df_arrival_type.count()

4

#### Visa Type:

In [83]:

df_visa_type.printSchema()

root
 |-- visa_code: string (nullable = true)
 |-- value: string (nullable = true)



In [84]:
df_visa_type.show()

+---------+--------+
|visa_code|   value|
+---------+--------+
|        1|Business|
|        2|Pleasure|
|        3| Student|
+---------+--------+



In [85]:
df_visa_type.count()

3

### Data Model

#### Conceptual data model:
The data is arranged in a star schema: the immigration table is the fact table, and arrival country, airport details, visa type, state information, and arrival type are the dimension tables.
![data model](data-model.jpeg)


### ETL

#### Immigration table

In [86]:
df_immigration_cleaned = df_immigration.select('cicid', 'i94yr', 'i94mon', 'i94cit', 'i94port', 'arrdate', 'i94mode', 
                                       'i94addr', 'depdate', 'i94bir', 'i94visa', 'count', 'visapost', 'occup',
                                        'gender', 'airline', 'fltno', 'visatype'
                                       ).toDF('id', 'year', 'month', 'country_code', 'airport_code', 'arrival_date', 'arrival_mode_code', 
                                        'us_address', 'departure_date', 'age', 'visa_code', 'count', 'visa_issuing_state', 'us_occupation', 
                                        'gender', 'airline', 'flight_number', 'visa_type_code')

df_immigration_cleaned.take(1)


[Row(id=6.0, year=2016.0, month=4.0, country_code=692.0, airport_code='XXX', arrival_date=20573.0, arrival_mode_code=None, us_address=None, departure_date=None, age=37.0, visa_code=2.0, count=1.0, visa_issuing_state=None, us_occupation=None, gender=None, airline=None, flight_number=None, visa_type_code='B2')]
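The `arrival_date` and `departure_date` columns are still raw numbers (for example `20573.0`). Assuming they are SAS date values, which count days from 1960-01-01, the sketch below shows one way to convert them to calendar dates in plain Python. The helper name `sas_days_to_date` is hypothetical; the notebook itself stores the values unconverted.

```python
from datetime import date, timedelta

# SAS date values count days from this epoch (assumed to apply to arrdate/depdate).
SAS_EPOCH = date(1960, 1, 1)


def sas_days_to_date(days):
    """Convert a SAS day count (possibly a float, possibly None) to a date."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))


# Value taken from the sample row above.
print(sas_days_to_date(20573.0))  # 2016-04-29
```

The result falls in April 2016, which agrees with the `year` and `month` values of the same row.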

In [None]:
df_immigration_cleaned.write.parquet("table/immigration")

#### Airport details table
This table combines the airport codes from the immigration dataset (extracted from I94_SAS_Labels_Descriptions.SAS) with an independent airport-information dataset.

In [99]:
df_airport_information_cleaned = df_airport_information.dropna(how = "any", subset = ["iata_code"]).dropDuplicates(["iata_code"])


In [100]:
df_airport_information_cleaned.createOrReplaceTempView("airport_info")
df_airport_code.createOrReplaceTempView("airport_codes")

df_airport_table = spark.sql("""
    SELECT *
    FROM airport_codes AS ac
    LEFT JOIN airport_info as ai
    ON ac.airport_code = ai.iata_code
""")

In [109]:
df_airport_table.take(1)

[Row(airport_code='ALC', airport_name='ALCAN, AK', ident='LEAL', type='large_airport', name='Alicante International Airport', elevation_ft='142', continent='EU', iso_country='ES', iso_region='ES-V', municipality='Alicante', gps_code='LEAL', iata_code='ALC', local_code=None, coordinates='-0.5581560134887695, 38.28219985961914')]
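Note that the sample row pairs the I94 code `ALC` (ALCAN, AK) with Alicante International Airport in Spain, so a matching code does not guarantee that both sources describe the same airport. As a first, partial check on the join, the sketch below lists I94 codes that found no match at all. It assumes the `airport_codes` and `airport_info` temp views registered above; the constant and function names are hypothetical.

```python
# Left anti-style check: I94 airport codes with no IATA match in airport_info.
UNMATCHED_AIRPORT_CODES_SQL = """
    SELECT ac.airport_code, ac.airport_name
    FROM airport_codes AS ac
    LEFT JOIN airport_info AS ai
      ON ac.airport_code = ai.iata_code
    WHERE ai.iata_code IS NULL
    ORDER BY ac.airport_code
"""


def unmatched_airport_codes(spark):
    """Return a DataFrame of airport codes that the join could not enrich.

    `spark` is an active SparkSession with both temp views registered.
    """
    return spark.sql(UNMATCHED_AIRPORT_CODES_SQL)


# Example usage in the notebook (not executed here):
# unmatched_airport_codes(spark).show(10)
```

This only detects missing matches; wrong matches such as the `ALC` row would need an additional comparison, for example on state or country.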

In [29]:
df_airport_table.write.parquet("table/airport_details")

#### Arrival Countries

In [54]:
df_arrival_countries.write.parquet("table/arrival_countries")

#### State Information

In [55]:
df_state_information.write.parquet("table/state_information")

#### Arrival Type

In [56]:
df_arrival_type.write.parquet("table/arrival_type")


#### Visa Type

In [59]:
df_visa_type.write.parquet("table/visa_type")
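The write calls above use Spark's default save mode, which raises an error if the target path already exists, so re-running the notebook fails unless the `table/` directory is removed first. A minimal sketch of a re-runnable variant is shown below. The helper name `write_table` and the `base_dir` parameter are hypothetical; the sketch assumes that overwriting the previous output is acceptable.

```python
def write_table(df, name, base_dir="table"):
    """Write a DataFrame as Parquet, replacing any previous output.

    `df` is expected to be a Spark DataFrame; returns the path written to.
    """
    path = f"{base_dir}/{name}"
    # mode("overwrite") replaces existing files instead of raising an error.
    df.write.mode("overwrite").parquet(path)
    return path


# Example usage in the notebook (not executed here):
# write_table(df_visa_type, "visa_type")
# write_table(df_arrival_type, "arrival_type")
```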

### Analytics queries:

##### Find the top 5 most visited US states by immigrants in April 2016.

In [89]:
df_immigration_cleaned.createOrReplaceTempView("immigration")
spark.sql('''SELECT us_address, count(*) as count 
          FROM immigration
          WHERE us_address IS NOT NULL
          GROUP BY us_address
          ORDER BY count DESC
          ''').show(5)

+----------+------+
|us_address| count|
+----------+------+
|        FL|621701|
|        NY|553677|
|        CA|470386|
|        HI|168764|
|        TX|134321|
+----------+------+
only showing top 5 rows
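The second sample query, the top 5 most visited airports, can be sketched along the same lines. The version below assumes the `immigration` and `airport_codes` temp views registered earlier in the notebook; the constant and function names are hypothetical, and no output is shown because the query was not run as part of the original notebook.

```python
# Count arrivals per airport code and attach the I94 airport name.
TOP_AIRPORTS_SQL = """
    SELECT i.airport_code, ac.airport_name, COUNT(*) AS visitors
    FROM immigration AS i
    LEFT JOIN airport_codes AS ac
      ON i.airport_code = ac.airport_code
    WHERE i.airport_code IS NOT NULL
    GROUP BY i.airport_code, ac.airport_name
    ORDER BY visitors DESC
    LIMIT 5
"""


def top_airports(spark):
    """Return a DataFrame with the five airport codes that have the most rows.

    `spark` is an active SparkSession with both temp views registered.
    """
    return spark.sql(TOP_AIRPORTS_SQL)


# Example usage in the notebook (not executed here):
# top_airports(spark).show()
```

A `LEFT JOIN` is used so that codes without a label, such as placeholder values, still appear in the ranking.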



### Data Quality Check:

#### Check tables have unique keys

In [91]:
# The result should be 0
df_immigration_cleaned.count() - df_immigration_cleaned.dropDuplicates(["id"]).count()

0

In [106]:
# The result should be 0
df_airport_table.count() - df_airport_table.dropDuplicates(["airport_code"]).count()

0

#### Source/Count checks to ensure completeness

In [107]:
# The result should be >= 0
df_immigration.count() - df_immigration_cleaned.count()

0

In [108]:
# The result should be >= 0
df_airport_information.count() - df_airport_information_cleaned.count()

46033
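The checks above are evaluated by eye. A minimal sketch of how they could be turned into automated assertions is given below. The function names are hypothetical, and the code relies only on the `count()` and `dropDuplicates()` methods already used in the cells above.

```python
def duplicate_key_count(df, key_columns):
    """Number of rows that share a key with another row (0 means unique)."""
    return df.count() - df.dropDuplicates(key_columns).count()


def run_quality_checks(tables):
    """Validate a mapping of table name -> (DataFrame, key column list).

    Raises ValueError if a table is empty or its key is not unique;
    returns True when every table passes.
    """
    for name, (df, key_columns) in tables.items():
        if df.count() == 0:
            raise ValueError(f"{name}: table is empty")
        duplicates = duplicate_key_count(df, key_columns)
        if duplicates != 0:
            raise ValueError(f"{name}: {duplicates} duplicate key(s) on {key_columns}")
    return True


# Example usage in the notebook (not executed here):
# run_quality_checks({
#     "immigration": (df_immigration_cleaned, ["id"]),
#     "airport_details": (df_airport_table, ["airport_code"]),
# })
```

Raising an exception rather than printing a number makes the checks usable as a gate in a scheduled pipeline.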

#### Data dictionary

[Data dictionary file link](./data-dictionary.csv)

### Project write-up

#### Tools and technologies
Since my subscription expires in a couple of days, I chose the simplest stack that would let me complete the project in time. I used Spark for data processing and loaded and stored the data locally.

To scale the project, I would use the following stack:
* Data storage: S3. It is a standard choice for working with Spark and Redshift clusters on AWS.
* Data pipeline: Airflow. I would use Airflow to automate, schedule, and monitor the ETL pipelines.
* Data lake using an EMR cluster: with EMR, I would process the unstructured/semi-structured data and load it back to S3 for use with Redshift.
* Data warehouse using a Redshift cluster: finally, I would load the refined data into Redshift for business analytics.



#### Propose how often the data should be updated and why.

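Immigration data should be updated every month, because the I94 source files are organized by month (for example, `i94_apr16_sub.sas7bdat`). Airport-information data presumably changes far less often, so a quarterly update should be enough.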

#### If the data were increased by 100x and the database needed to be accessed by 100+ people.

Scaling memory or processing power is not an issue with EMR and Redshift. Both clusters can be scaled by adding nodes or by switching to instance types with more memory or compute.

#### The data populates a dashboard that must be updated on a daily basis by 7am every day.

We can schedule an Airflow pipeline to run every day, early enough to finish before 7 AM. Furthermore, we can configure custom email alerts in Airflow for failed or delayed runs.
