# Data Engineering Capstone Project

## Step 1: Scope the Project and Gather Data

### Scope 

#### Overview
In this project, I want to build a data lake containing the data needed to better understand the flow of immigrants into the United States. It could help answer questions such as:
- Where do most immigrants come from?
- Which US states receive the most immigrants?
- Does temperature play a major role in how immigrants are distributed across the US?
- Which US airports have the most immigrant arrivals?
- Which airlines do immigrants prefer for getting to the US?
- Are US airlines benefiting from immigrants moving to the US?

Further, this data lake can be used as a starting point for building a well-designed and comprehensive data warehouse (DW).


#### Architecture and Technology
All our data sources will be placed in an ***AWS S3 bucket***; I'll then use ***AWS EMR*** running ***Apache Spark*** to move, transform, and clean the data. Finally, the processed data will be written back to an ***AWS S3 bucket***, where the clean data will be placed in separate directories according to our data model. The data pipeline will be orchestrated using ***Apache Airflow***.


<img src="images/DEND-Porject-Page-2.png" align="center"/>

### Describe and Gather Data 

***I94 Immigration Data*** 
- **Source** : US National Tourism and Trade Office.
- **Link** : <a href="https://travel.trade.gov/research/reports/i94/historical/2016.html">Click here</a>
- **Description** : dataset contains international visitor arrival statistics by world regions and select countries (including top 20), type of visa, mode of transportation, age groups, states visited (first intended address only), and the top ports of entry (for select countries).
- **Format** : originally in *sas7bdat* but will be converted to *parquet*

***World Temperature Data***
- **Source** : Kaggle
- **Link** : <a href="https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data">Click here</a>
- **Description** : dataset contains monthly average land temperatures by country, state, and city, up to 2013 (the by-state file explored below goes back to at least 1855).
- **Format** : csv

***U.S. City Demographic Data***
- **Source** : OpenDataSoft
- **Link** : <a href="https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/">Click here</a>
- **Description** : dataset contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000.  This data comes from the US Census Bureau's 2015 American Community Survey.
- **Format** : csv/JSON

***Airport Code Table*** 
- **Source** : datahub
- **Link** : <a href="https://datahub.io/core/airport-codes#data">Click here</a>
- **Description** : dataset contains worldwide airport codes and their corresponding cities
- **Format** : csv/JSON

## Step 2: Explore and Assess the Data

For more detailed data exploration code, please refer to the notebook **dev/playground.ipynb**.
I first built my exploration and final code in that notebook, then moved it into better-organized Python files.

```python
# All imports here
import os
import configparser
from datetime import datetime

import pandas as pd
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
# note: importing `sum` shadows Python's built-in sum() in this notebook
from pyspark.sql.functions import udf, col, sum, to_timestamp, datediff, month, year
from pyspark.sql.types import TimestampType, DateType, IntegerType, DoubleType

# read AWS credentials from the local config file and export them as environment variables
config = configparser.ConfigParser()
config.read('dl.cfg')

aws_key = config.get('CREDENTIALS', 'AWS_ACCESS_KEY_ID')
aws_secret = config.get('CREDENTIALS', 'AWS_SECRET_ACCESS_KEY')
os.environ['AWS_ACCESS_KEY_ID'] = aws_key
os.environ['AWS_SECRET_ACCESS_KEY'] = aws_secret

# Spark session with the SAS reader (spark-sas7bdat) and S3 (hadoop-aws) packages
spark = SparkSession.builder \
    .config("spark.jars.packages",
            "saurfang:spark-sas7bdat:2.0.0-s_2.11,org.apache.hadoop:hadoop-aws:2.7.0") \
    .enableHiveSupport() \
    .getOrCreate()
```

***I94 Immigration Data*** 

****Exploring****

```python
df_spark = spark.read.format('com.github.saurfang.sas.spark') \
    .load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
```

```python
# row counts
df_spark.count()
```

```
3096313
```

```python
df_spark.show(5)
```

```
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null| 1.897628485E9| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SE
```

```python
# let's get more details about the data types
df_spark.printSchema()
```

```
root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 
```

```python
# let's see null stats about our data
# source: https://stackoverflow.com/questions/44413132/count-the-number-of-missing-values-in-a-dataframe-spark
# count nulls per column by casting isNull() to int and summing (sum is pyspark.sql.functions.sum)
Null_stats = df_spark.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in df_spark.columns))

Null_stats.show()
```

```
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+------+-----+--------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|  occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender| insnum|airline|admnum|fltno|visatype|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+------+-----+--------+
|    0|    0|     0|     0|     0|      0|      0|    239| 152592| 142457|   802|      0|    0|       1| 1881250|3088187|    238| 138429|3095921| 138429|    802|    477|414269|2982605|  83627|     0|19549|       0|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+---
```

```python
# list of columns that we're interested in
valid_cols = ['cicid', 'i94yr', 'i94mon', 'i94res', 'i94port', 'arrdate',
              'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa', 'gender',
              'airline', 'visatype']

df_spark = df_spark.select(valid_cols)
```

```python
df_spark.show(5)
```

```
+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+------+-------+--------+
|cicid| i94yr|i94mon|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|gender|airline|visatype|
+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+------+-------+--------+
|  6.0|2016.0|   4.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  null|   null|      B2|
|  7.0|2016.0|   4.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|     M|   null|      F1|
| 15.0|2016.0|   4.0| 101.0|    WAS|20545.0|    1.0|     MI|20691.0|  55.0|    2.0|     M|     OS|      B2|
| 16.0|2016.0|   4.0| 101.0|    NYC|20545.0|    1.0|     MA|20567.0|  28.0|    2.0|  null|     AA|      B2|
| 17.0|2016.0|   4.0| 101.0|    NYC|20545.0|    1.0|     MA|20567.0|   4.0|    2.0|  null|     AA|      B2|
+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+------+-------+--------+
only showing top 5 rows
```

```python
df_spark.printSchema()
```

```
root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- visatype: string (nullable = true)
```

```python
# let's check data quality for some columns
df_spark.select('i94addr').distinct().show(10)
```

```
+-------+
|i94addr|
+-------+
|     .N|
|     RG|
|     YH|
|     RF|
|     FT|
|     CI|
|     TC|
|     SC|
|     AZ|
|     FI|
+-------+
only showing top 10 rows
```

From the exploration above, I found several issues with the data:
- extra columns that don't add much value.
- some columns have a high number of nulls.
- inaccurate data types for some columns.
- some columns hold mapping codes rather than the actual values.
- some columns contain incorrect data, for example the US states column "i94addr".
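
To make "a high number of nulls" concrete, the counts from the `Null_stats` output can be turned into percentages. The sketch below is plain Python (no Spark needed), with the non-zero null counts copied from that output and the row total from `df_spark.count()`; the 50% cut-off is an assumption for illustration, not the rule the project used.

```python
# Non-zero null counts copied from the Null_stats output above.
TOTAL_ROWS = 3096313

null_counts = {
    "i94mode": 239, "i94addr": 152592, "depdate": 142457, "i94bir": 802,
    "dtadfile": 1, "visapost": 1881250, "occup": 3088187, "entdepa": 238,
    "entdepd": 138429, "entdepu": 3095921, "matflag": 138429, "biryear": 802,
    "dtaddto": 477, "gender": 414269, "insnum": 2982605, "airline": 83627,
    "fltno": 19549,
}


def null_ratios(counts, total):
    """Return {column: fraction of rows that are null}, highest first."""
    ratios = {column: n / total for column, n in counts.items()}
    return dict(sorted(ratios.items(), key=lambda kv: kv[1], reverse=True))


def mostly_null(counts, total, threshold=0.5):
    """Columns whose null fraction exceeds `threshold` (an assumed cut-off)."""
    return {c for c, r in null_ratios(counts, total).items() if r > threshold}


if __name__ == "__main__":
    for column, ratio in null_ratios(null_counts, TOTAL_ROWS).items():
        print(f"{column:10s} {ratio:7.2%}")
    print(sorted(mostly_null(null_counts, TOTAL_ROWS)))
```

With a 50% cut-off this flags `visapost`, `occup`, `entdepu` and `insnum`. Null share is only one input to the column selection: `i94cit`, for example, has no nulls but is still left out of `valid_cols`.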

****Cleansing****

1. After understanding the dataset, I came up with the list of columns that will be used.
2. All undesired columns will be removed.
3. All rows containing null values will be dropped.
4. Columns will be cast to the appropriate data types.
5. SAS dates will be converted to proper date values.
6. Any incorrect value in the "i94addr" column will be replaced with "99".
7. For the column "i94mode", the codes will be replaced with their values.
8. For the column "i94visa", the codes will be replaced with their values.
9. I'll create a new column "stay_length" holding the length of stay (days between arrival and departure).
10. The cleansed data will be written to Parquet files partitioned by state, year, and month.
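
Steps 3 to 9 can be sketched in plain Python on a single record, which makes the logic easy to test outside Spark. SAS stores dates as a day count from 1960-01-01, so an `arrdate` of 20545.0 is 2016-04-01. The code labels for `i94mode` and `i94visa` and the set of valid state codes are assumptions (labels as given in the I94 SAS labels description file that usually accompanies the dataset; states taken as the 50 states plus DC), so verify them against your copy. In Spark, the date conversion should be expressible with the SQL `date_add` function, e.g. `expr("date_add('1960-01-01', CAST(arrdate AS INT))")`, and the stay length with the already imported `datediff`.

```python
from datetime import date, timedelta

# SAS stores dates as the number of days since 1960-01-01.
SAS_EPOCH = date(1960, 1, 1)

# ASSUMPTION: labels from the I94 SAS labels description; check your copy.
I94MODE_LABELS = {1: "Air", 2: "Sea", 3: "Land", 9: "Not reported"}
I94VISA_LABELS = {1: "Business", 2: "Pleasure", 3: "Student"}

# ASSUMPTION: the 50 states plus DC are the only valid i94addr values.
VALID_STATES = {
    "AL", "AK", "AZ", "AR", "CA", "CO", "CT", "DE", "DC", "FL", "GA", "HI", "ID",
    "IL", "IN", "IA", "KS", "KY", "LA", "ME", "MD", "MA", "MI", "MN", "MS", "MO",
    "MT", "NE", "NV", "NH", "NJ", "NM", "NY", "NC", "ND", "OH", "OK", "OR", "PA",
    "RI", "SC", "SD", "TN", "TX", "UT", "VT", "VA", "WA", "WV", "WI", "WY",
}


def sas_to_date(days):
    """Convert a SAS day count (int or float) to a datetime.date."""
    return SAS_EPOCH + timedelta(days=int(days))


def clean_i94_row(row):
    """Clean one record that has already been reduced to valid_cols.

    Returns None when any value is missing (step 3).
    """
    if any(value is None for value in row.values()):
        return None
    arrival = sas_to_date(row["arrdate"])
    departure = sas_to_date(row["depdate"])
    return {
        "cicid": int(row["cicid"]),
        "i94yr": int(row["i94yr"]),
        "i94mon": int(row["i94mon"]),
        "i94res": int(row["i94res"]),
        "i94port": row["i94port"],
        "arrdate": arrival,
        "i94mode": I94MODE_LABELS.get(int(row["i94mode"]), "Not reported"),
        # step 6: anything that is not a known state code becomes "99"
        "i94addr": row["i94addr"] if row["i94addr"] in VALID_STATES else "99",
        "depdate": departure,
        "i94bir": int(row["i94bir"]),
        "i94visa": I94VISA_LABELS.get(int(row["i94visa"])),
        "gender": row["gender"],
        "airline": row["airline"],
        "visatype": row["visatype"],
        # step 9: length of stay in days
        "stay_length": (departure - arrival).days,
    }
```

For example, the record with `cicid` 16 shown earlier (arrdate 20545, depdate 20567) would get a `stay_length` of 22 days.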

***World Temperature Data***

****Exploring****

```python
spark_tmp = spark.read.csv('raw_data/GlobalLandTemperaturesByState.csv', header=True)
```

```python
spark_tmp.count()
```

```
645675
```

```python
spark_tmp.show(5)
```

```
+----------+------------------+-----------------------------+-----+-------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|State|Country|
+----------+------------------+-----------------------------+-----+-------+
|1855-05-01|            25.544|                        1.171| Acre| Brazil|
|1855-06-01|            24.228|                        1.103| Acre| Brazil|
|1855-07-01|            24.371|                        1.044| Acre| Brazil|
|1855-08-01|            25.427|                        1.073| Acre| Brazil|
|1855-09-01|            25.675|                        1.014| Acre| Brazil|
+----------+------------------+-----------------------------+-----+-------+
only showing top 5 rows
```

```python
# let's get more details about the data types
spark_tmp.printSchema()
```

```
root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- AverageTemperatureUncertainty: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
```

```python
# keep only US rows from 2011 onward (dt is an ISO date string, so string comparison works),
# drop duplicate rows, then check data quality of the state names
spark_tmp = spark_tmp.filter((spark_tmp.Country == 'United States') & (spark_tmp.dt >= '2011-01-01')).dropDuplicates()
spark_tmp.select('State').distinct().show(10)
```

```
+--------------------+
|               State|
+--------------------+
|                Utah|
|              Hawaii|
|           Minnesota|
|                Ohio|
|            Arkansas|
|              Oregon|
|District Of Columbia|
|     Georgia (State)|
|               Texas|
|        North Dakota|
+--------------------+
only showing top 10 rows
```

From the exploration above, I found several issues with the data:
- extra columns that don't add much value.
- the date range goes well beyond my project scope.
- I'm only interested in US data.
- inaccurate data types for the columns.
- some columns contain incorrect data, for example the US states column ("Georgia (State)").

****Cleansing****
1. I'll filter the dataset to include only US data.
2. I'll filter the dataset to include only data from 2011 onward.
3. I'll drop unwanted columns.
4. Column names will be changed where necessary, for example "dt".
5. Two new columns, month and year, will be extracted from the date.
6. I'll check that state names match the standard naming, for example "Georgia (State)" should become "Georgia".
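
A plain-Python sketch of these steps, operating on rows as read from `GlobalLandTemperaturesByState.csv` (dicts of strings). It mirrors the notebook's `dropDuplicates()` by skipping exact duplicate rows. The output key names and the `STATE_NAME_FIXES` mapping are hypothetical; in the Spark job, the year and month extraction would use the `year` and `month` functions already imported in the notebook.

```python
from datetime import date

# HYPOTHETICAL mapping: "Georgia (State)" is the only mismatch seen above;
# extend it if other suffixed names turn up.
STATE_NAME_FIXES = {"Georgia (State)": "Georgia"}


def clean_temperature_rows(rows, country="United States", start="2011-01-01"):
    """Return cleaned records from GlobalLandTemperaturesByState-style rows.

    Output keys (date, year, month, state, avg_temperature) are hypothetical;
    AverageTemperatureUncertainty and Country are dropped.
    """
    seen = set()
    cleaned = []
    for row in rows:
        # dt is an ISO "YYYY-MM-DD" string, so string comparison orders by date
        if row["Country"] != country or row["dt"] < start:
            continue
        key = tuple(sorted(row.items()))  # skip exact duplicate rows
        if key in seen:
            continue
        seen.add(key)
        day = date.fromisoformat(row["dt"])
        temp = row["AverageTemperature"]
        cleaned.append({
            "date": day,
            "year": day.year,
            "month": day.month,
            "state": STATE_NAME_FIXES.get(row["State"], row["State"]),
            "avg_temperature": float(temp) if temp not in (None, "") else None,
        })
    return cleaned
```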


***U.S. City Demographic Data***

****Exploring****

```python
dm_spark = spark.read.csv('raw_data/us-cities-demographics.csv', sep=';', header=True)
```

```python
dm_spark.count()
```

```
2891
```

```python
# let's see what the data looks like
dm_spark.show(5)
```

```
+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|            City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|   Silver Spring|     Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino|25924|
|          Quincy|Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White|58723|
|          Hoover|      Alabama|      38.5|          38040| 
```

```python
# data types
dm_spark.printSchema()
```

```
root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)
```

From the exploration above, I found several issues with the data:
- extra columns that don't add much value.
- many column names need to be corrected and shortened, for example "Black or African-American" -> "Black".
- the data is at city level, with one row per city and race.
- inaccurate data types for the columns (everything is read as a string).

****Cleansing****
1. I'll drop unwanted columns (Number of Veterans, Average Household Size).
2. I'll correct column names.
3. Appropriate data types will be imposed.
4. I'll pivot the Race column and sum its Count values.
5. My final data will be grouped by state code.
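
The pivot-then-group step has one trap worth showing: with one row per city and race, city-level columns such as Total Population repeat on every race row, so summing them directly would count each city several times. The plain-Python sketch below keeps one copy of the city-level values per city, pivots the Race counts into columns, and sums both up to the state code. The renamed column names and every race short name except "Black" are hypothetical; Median Age is left out because medians cannot be added across cities. In Spark, the race part maps onto `groupBy(...).pivot('Race').agg(sum('Count'))`.

```python
from collections import defaultdict

# "Black or African-American" -> "Black" follows the text above; the other
# short names are HYPOTHETICAL choices for this sketch.
RACE_SHORT = {
    "White": "White",
    "Black or African-American": "Black",
    "Hispanic or Latino": "Hispanic",
    "Asian": "Asian",
    "American Indian and Alaska Native": "Native",
}

# City-level columns kept and (hypothetically) renamed; Number of Veterans and
# Average Household Size are dropped simply by not listing them here.
CITY_COLUMNS = {
    "Male Population": "male_population",
    "Female Population": "female_population",
    "Total Population": "total_population",
    "Foreign-born": "foreign_born",
}


def to_int(value):
    """Cast a CSV string to int; missing values count as 0 in this sketch."""
    return int(float(value)) if value not in (None, "") else 0


def demographics_by_state(rows):
    """Pivot Race counts and aggregate city rows up to state code."""
    cities = {}  # (state code, city) -> city-level numbers, taken once per city
    races = defaultdict(lambda: defaultdict(int))  # state code -> race -> count
    for row in rows:
        code = row["State Code"]
        city_key = (code, row["City"])
        # City columns repeat on every race row: keep only the first copy.
        if city_key not in cities:
            cities[city_key] = {new: to_int(row[old]) for old, new in CITY_COLUMNS.items()}
        race = RACE_SHORT.get(row["Race"], row["Race"])
        races[code][race] += to_int(row["Count"])

    states = defaultdict(lambda: defaultdict(int))
    for (code, _city), values in cities.items():
        for column, value in values.items():
            states[code][column] += value
    for code, counts in races.items():
        for race, count in counts.items():
            states[code][race] += count
    return {code: dict(values) for code, values in states.items()}
```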

***Airport Code Table***

****Exploring****

```python
spark_air = spark.read.csv('raw_data/airport-codes_csv.csv', header=True)
```

```python
spark_air.count()
```

```
55075
```

```python
# let's see what our data looks like
spark_air.show(5)
```

```
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport|          11|       NA|         US|     US-PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
| 00AK|small_airport|        Lowell Field|         450|       NA|         US|     US-AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|
| 00AL|small_airport|        Epps Airpark|         820|       NA|         US|     
```

```python
spark_air.printSchema()
```

```
root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)
```

```python
# US data and columns
spark_air = spark_air.filter(spark_air.iso_country == 'US') \
    .select(['ident', 'type', 'name', 'elevation_ft', 'iso_region',
             'municipality', 'iata_code', 'local_code'])
```

```python
spark_air.show(5)
```

```
+-----+-------------+--------------------+------------+----------+------------+---------+----------+
|ident|         type|                name|elevation_ft|iso_region|municipality|iata_code|local_code|
+-----+-------------+--------------------+------------+----------+------------+---------+----------+
|  00A|     heliport|   Total Rf Heliport|          11|     US-PA|    Bensalem|     null|       00A|
| 00AA|small_airport|Aero B Ranch Airport|        3435|     US-KS|       Leoti|     null|      00AA|
| 00AK|small_airport|        Lowell Field|         450|     US-AK|Anchor Point|     null|      00AK|
| 00AL|small_airport|        Epps Airpark|         820|     US-AL|     Harvest|     null|      00AL|
| 00AR|       closed|Newport Hospital ...|         237|     US-AR|     Newport|     null|      null|
+-----+-------------+--------------------+------------+----------+------------+---------+----------+
only showing top 5 rows
```

```python
# let's see Null stats
spark_air_stats = spark_air.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in spark_air.columns))

spark_air_stats.show()
```

```
+-----+----+----+------------+----------+------------+---------+----------+
|ident|type|name|elevation_ft|iso_region|municipality|iata_code|local_code|
+-----+----+----+------------+----------+------------+---------+----------+
|    0|   0|   0|         239|         0|         102|    20738|      1521|
+-----+----+----+------------+----------+------------+---------+----------+
```

```python
# airports type
spark_air.select('type').distinct().show(10)
```

```
+--------------+
|          type|
+--------------+
| large_airport|
|   balloonport|
| seaplane_base|
|      heliport|
|        closed|
|medium_airport|
| small_airport|
+--------------+
```

```python
# can we extract US states code from iso_region ?
spark_air.select('iso_region').distinct().show(5)
```

```
+----------+
|iso_region|
+----------+
|     US-TN|
|     US-OK|
|     US-VT|
|     US-SD|
|     US-WA|
+----------+
only showing top 5 rows
```

From the exploration above, I found several issues with the data:
- extra columns that don't add much value.
- we only need US data.
- US state codes need to be extracted.
- inaccurate data types for the columns.

****Cleansing****
1. I'll drop unwanted columns (continent, coordinates).
2. I'll extract the US state code from the iso_region column.
3. Appropriate data types will be imposed.
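
Extracting the state code only needs the part of `iso_region` after the `US-` prefix. The plain-Python sketch below does that and casts `elevation_ft`, which is read as a string and has nulls; the `state_code` name is hypothetical. In PySpark, `split(col('iso_region'), '-').getItem(1)` should give the same state code.

```python
def state_from_iso_region(iso_region):
    """Return the state code of an ISO region such as 'US-PA' ('PA'), else None."""
    if iso_region and iso_region.startswith("US-"):
        return iso_region[len("US-"):]
    return None


def clean_airport_row(row):
    """Cast types and add a state code to one US airport record (dict of strings)."""
    elevation = row.get("elevation_ft")
    return {
        "ident": row["ident"],
        "type": row["type"],
        "name": row["name"],
        # elevation_ft is read as a string and can be null
        "elevation_ft": int(float(elevation)) if elevation not in (None, "") else None,
        "state_code": state_from_iso_region(row["iso_region"]),  # hypothetical name
        "municipality": row["municipality"],
        "iata_code": row["iata_code"],
        "local_code": row["local_code"],
    }
```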

## Step 3: Define the Data Model

Our data model is shown below:

<img src="images/data_model_updated.png" align="center"/>

I have 8 tables in my data lake:
<img src="images/table_desc.png" align="center"/>

## Step 4: Run ETL to Model the Data

### ETL/Pipeline 

I used Python, Apache Spark, and Airflow to build and run my data pipelines, which translate my conceptual data model into code.

<img src="images/Airflow - DAGs.png" align="center"/>

- *`airflow/dasg/data_lake_dag.py`*: contains all the Airflow tasks that run our data lake pipelines.
- *`airflow/plugins/helpers/data_lake_etl.py`*: the actual data lake pipeline code, written in Python using PySpark to create Spark jobs.


<img src="images/airflow_etl_desc.png" align="center"/>

- To run the pipeline, trigger the DAG defined in *`airflow/dasg/data_lake_dag.py`* from Airflow.

### Data Dictionary 

Please refer to the file **`Data-Dictionary.xlsx`** for the data dictionary.

## Step 5: Complete Project Write Up

****Tools and Technologies****

- **`AWS S3`**: highly scalable, reliable, fast, and inexpensive data storage, which makes it a good fit for our data lake from both a cost and a performance perspective.
- **`AWS EMR Spark cluster`**: Apache Spark is a powerful, distributed, in-memory data processing engine. AWS EMR makes setting up a Spark cluster easy and removes most of the time-consuming configuration.
- **`Apache Airflow`**: a simple yet powerful open-source tool to automate the scheduling, running, and monitoring of our data pipelines.
- **`PySpark`** - Python: PySpark is the official Python API for Spark. It abstracts away much of Spark's complexity and provides an intuitive API to interact with Spark components.
- **`Pandas`**: a full-featured Python library for working with data that simplifies data discovery and exploration.
- **`Jupyter Notebook`**: a great platform for data discovery and exploration, and for working with data interactively.
- **`Apache Parquet`**: a very efficient columnar storage format. It provides efficient compression, lower storage costs, and fast queries.

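****Data Update Frequency****
- **`Monthly`**: matches the monthly granularity of the I94 source files (for example, `i94_apr16_sub.sas7bdat` holds April 2016 arrivals).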

****Scenarios****

- **`The data was increased by 100x`**
    - Cloud services such as AWS S3 and AWS EMR make it easy to scale up our infrastructure to meet customer demand and expected or unexpected data volumes.

- **`The data populates a dashboard that must be updated on a daily basis by 7am every day`**
    - We can schedule the data pipeline in Airflow to run overnight, and set alerts and SLAs to ensure the data is updated according to business needs.

- **`The database needed to be accessed by 100+ people`**
    - Again, since our solution is cloud-based, it's easy to meet expected or unexpected demand. For the database, we can add more nodes to scale it out.

## What's Included
- **`dend.ipynb`**: my project details and write-up
- **`airflow`**: contains my data lake code and Airflow DAGs
- **`dev`**: my development folder, containing all my starting code and experiments
- **`images`**: all images used in my project
- **`raw_data`**: the initial raw data before it is moved to the S3 bucket
- **`sas_data`**: processed SAS data in Parquet format
- **`Data-Dictionary.xlsx`**: our data dictionary in Excel format
