# Data Engineering Capstone Project
### Author: Tung Pham Duc

#### Project Summary
This project builds a data warehouse that integrates immigration data with demographics data, providing a single source of truth that covers a wider range of attributes.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

### Step 1: Scope the Project and Gather Data

#### Scope
* With the final solution, I can answer questions such as "Which age groups from which countries tend to move to the US, and in which season?" or "How often do people from Vietnam move to the US each year?". The output can be used for both machine learning and ordinary analysis.

* To do this, I'll take data from 3 reliable sources, design ETL pipelines with test scripts, and document the configurable parameters so the project can run in both local and cloud environments. Running in the cloud lets the project scale.
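A minimal sketch of how a run could switch between local and cloud inputs through configuration. The section and key names (`[IO]`, `MODE`, `LOCAL_INPUT_DIR`, `S3_INPUT_DIR`) and the bucket name are hypothetical, not the project's actual config.ini; the notebook below only shows that input paths are built from an `INPUT_DIR` environment variable.

```python
import configparser

# Hypothetical config.ini contents; the real project's sections and keys may differ.
SAMPLE_CONFIG = """
[IO]
MODE = local
LOCAL_INPUT_DIR = ./data
S3_INPUT_DIR = s3a://my-bucket/capstone
"""


def resolve_input_dir(config_text: str) -> str:
    """Return the input directory for the configured run mode ('local' or 's3')."""
    parser = configparser.ConfigParser()
    parser.read_string(config_text)
    mode = parser.get("IO", "MODE").strip().lower()
    if mode == "local":
        return parser.get("IO", "LOCAL_INPUT_DIR")
    if mode == "s3":
        return parser.get("IO", "S3_INPUT_DIR")
    raise ValueError(f"unknown MODE: {mode!r}")


input_dir = resolve_input_dir(SAMPLE_CONFIG)
print(f"{input_dir}/GlobalLandTemperaturesByState.csv")
```

Keeping the choice in one config value means the pipelines only ever see a base path, whether it points at a local folder or an S3 prefix.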

#### Data

* The Climate Change: Earth Surface Temperature Data was taken from [Kaggle](https://www.kaggle.com/datasets/berkeleyearth/climate-change-earth-surface-temperature-data) and is provided in CSV format. I chose to explore GlobalLandTemperaturesByState.csv because its State column can be linked to i94addr.

| attribute | desc |
|--|--|
| Country | country |
| State | state |
| dt | recorded date |
| AverageTemperature | calculated average temperature |

* The US Cities Demographics Data comes from OpenSoft and contains demographic information about all US cities and census-designated places with a population greater than or equal to 65,000. It is provided in CSV format. The attribute names are self-explanatory.

| attribute | desc |
|--|--|
| City |  |
| State Code |  |
| State |  |
| Median Age |  |
| Male Population |  |
| Female Population |  |
| Number of Veterans |  |
| Foreign-born |  |
| Average Household Size |  |

* The I94 Immigration Data comes from the US National Tourism and Trade Office. It is provided in SAS7BDAT format, a binary database storage format.

| attribute | desc |
|--|--|
| i94yr | year of migration |
| i94mon | month of migration |
| i94cit | original country which migrant travels from |
| i94port | original port which migrant travels from |
| i94mode | travel method |
| i94addr | destination State |
| biryear | migrant year of birth |
| i94visa | VISA type |
| visatype | class of admission legally admitting the non-immigrant to temporarily stay in USA |
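Several I94 attributes are numeric codes (read as doubles, per the schema in Step 2.3) whose labels are listed in I94_SAS_Labels_Description.SAS. A minimal plain-Python sketch of decoding `i94mode` and `i94visa`; the tables below reproduce the labels from that file, while the `decode` helper is illustrative only. In PySpark the same lookup could be done with a small join against a mapping DataFrame.

```python
from typing import Dict, Optional

# Code tables as listed in I94_SAS_Labels_Description.SAS.
I94MODE: Dict[int, str] = {1: "Air", 2: "Sea", 3: "Land", 9: "Not reported"}
I94VISA: Dict[int, str] = {1: "Business", 2: "Pleasure", 3: "Student"}


def decode(code: Optional[float], table: Dict[int, str]) -> Optional[str]:
    """Map a raw I94 code (stored as a double, e.g. 1.0) to its label, or None if unknown."""
    if code is None:
        return None
    return table.get(int(code))


print(decode(1.0, I94MODE), decode(2.0, I94VISA))  # Air Pleasure
```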

```python
from os import environ, getcwd
from os.path import dirname
from sys import path

# Make the project package importable when the notebook runs from its own subfolder.
path.append(dirname(getcwd()))

from udacity_capstone.pipelines.pipeline_dim_temperatures import DimTemperaturesPipeline
from udacity_capstone.pipelines.pipeline_dim_temperatures_test import DimTemperaturesTestPipeline
from udacity_capstone.pipelines.pipeline_dim_demographics import DimDemographicsPipeline
from udacity_capstone.pipelines.pipeline_dim_demographics_test import DimDemographicsTestPipeline
from udacity_capstone.pipelines.pipeline_dim_immigrations import DimImmigrationsPipeline
from udacity_capstone.pipelines.pipeline_dim_immigrations_test import DimImmigrationsTestPipeline
from udacity_capstone.pipelines.pipeline_fact_immigrations import FactImmigrationsPipeline
from udacity_capstone.pipelines.pipeline_fact_immigrations_test import FactImmigrationsTestPipeline

from udacity_capstone.pipelines import get_spark

# Shared SparkSession used by all pipelines.
spark = get_spark()

# Input files are resolved relative to the INPUT_DIR environment variable.
dim_temperatures_pipeline = DimTemperaturesPipeline(
    f"{environ['INPUT_DIR']}/GlobalLandTemperaturesByState.csv"
)
dim_demographics_pipeline = DimDemographicsPipeline(
    f"{environ['INPUT_DIR']}/us-cities-demographics.csv"
)
dim_immigrations_pipeline = DimImmigrationsPipeline(
    f"{environ['INPUT_DIR']}/immigration_data_sample.csv"
)
# The fact pipeline takes no input path; it is built from the dimension tables.
fact_immigrations_pipeline = FactImmigrationsPipeline()
```




### Step 2: Explore, Assess & Clean the Data

#### Step 2.1: Climate Change: Earth Surface Temperature By State

We only work with the USA, so the rows are filtered by Country.

```python
dim_temperatures_pipeline.extract()
dim_temperatures_pipeline.df.printSchema()
dim_temperatures_pipeline.df.show(10)
```

```
root
 |-- dt: timestamp (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)

+-------------------+------------------+-----------------------------+-------+-------------+
|                 dt|AverageTemperature|AverageTemperatureUncertainty|  State|      Country|
+-------------------+------------------+-----------------------------+-------+-------------+
|1743-11-01 00:00:00|10.722000000000001|                        2.898|Alabama|United States|
|1744-04-01 00:00:00|            19.075|                        2.902|Alabama|United States|
|1744-05-01 00:00:00|            21.197|                        2.844|Alabama|United States|
|1744-06-01 00:00:00|             25.29|                        2.879|Alabama|United States|
|1744-07-01 00:00:00|             26.42|                        2.841|Alabama|United States|
|1744-09-01 00:00:00|
```

#### Step 2.2: US Cities Demographics

```python
dim_demographics_pipeline.extract()
dim_demographics_pipeline.df.printSchema()
dim_demographics_pipeline.df.show(10)
```

```
root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)

+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|            City|         State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+----------------+--------------+----------+---------------+-----------------+----------
```

#### Step 2.3: I94 Immigration from US National Tourism and Trade Office

I94_SAS_Labels_Description.SAS contains the lists of US states and visa types, so I first exported them by hand into two files, us-states.txt and visa.txt, which are used to map codes to names.
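A minimal sketch of turning such label lines into a code-to-name map. It assumes that us-states.txt and visa.txt keep the line format of the SAS label file (for example `'AL'='ALABAMA'` or `1 = Business`); the real files may be formatted differently, and `parse_label_lines` is a hypothetical helper, not part of the project.

```python
import re
from typing import Dict, Iterable

# Matches lines such as  'AL'='ALABAMA'  or  1 = Business  (optionally ending in ';').
LABEL_LINE = re.compile(
    r"^\s*'?(?P<code>[^'=]+?)'?\s*=\s*'?(?P<label>[^']+?)'?\s*;?\s*$"
)


def parse_label_lines(lines: Iterable[str]) -> Dict[str, str]:
    """Build a code -> label map, skipping lines that are not code/label pairs."""
    mapping: Dict[str, str] = {}
    for line in lines:
        match = LABEL_LINE.match(line)
        if match:
            mapping[match.group("code").strip()] = match.group("label").strip()
    return mapping


sample = ["value i94visa", "\t'AL'='ALABAMA'", "   1 = Business", "\t'WY'='WYOMING' ;"]
print(parse_label_lines(sample))
```

Lines without an `=` (such as the `value ...` declarations) are simply ignored, so a file copied by hand from the SAS description can be parsed without further cleaning.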

```python
dim_immigrations_pipeline.extract()
dim_immigrations_pipeline.df.printSchema()
dim_immigrations_pipeline.df.show(10)
```

```
root
 |-- _c0: integer (nullable = true)
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: integer (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: integer (nullable = true)
 |-- airline: string (nullable = 
```

### Step 3: Define the Data Model

#### 3.1 Conceptual Data Model
Every field is expected to contain data. The schemas below do not declare NOT NULL constraints; NULLs are caught by the data quality checks instead.

##### 3.1.1: I'll transform the Climate Change data into this schema
```
dim_temperatures
 |-- state_id: string
 |-- year: integer
 |-- month: integer
 |-- avg_tmp: double
```

##### 3.1.2: I'll transform the Cities Demographics data into this schema
```
dim_demographics
 |-- id: string
 |-- name: string
 |-- male_population: long
 |-- female_population: long
 |-- veterans_count: long
 |-- foreigners_count: long
 |-- avg_household_size: double
```

##### 3.1.3: I'll transform the Immigration data into this schema
```
dim_immigrations
 |-- cicid: integer
 |-- year: integer
 |-- month: integer
 |-- travel_method: string
 |-- origin_country: integer
 |-- origin_port: string
 |-- terminus_state_id: string
 |-- migrant_yob: integer
 |-- visa_type: string
 |-- visa_class: string
```

##### Finally, I create the fact table from the dimensions above
```
fact_immigrations
 |-- cicid: integer
 |-- year: integer
 |-- month: integer
 |-- travel_method: string
 |-- origin_country: integer
 |-- terminus_state_id: string
 |-- terminus_state_name: string
 |-- terminus_temperature: double
 |-- terminus_avg_household_size: double
 |-- terminus_male_population: long
 |-- terminus_female_population: long
 |-- visa_type: string
 |-- visa_class: string
```
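A minimal plain-Python sketch of how fact rows could be assembled from the three dimensions: demographics are looked up by state code, and temperatures by `(state_id, year, month)`. It assumes an inner join (rows without a matching dimension record are dropped), which is consistent with the no-NULL quality check but is not confirmed by the source; `build_fact_rows` is a hypothetical name.

```python
from typing import Dict, List, Tuple


def build_fact_rows(
    immigrations: List[dict],
    demographics: Dict[str, dict],
    temperatures: Dict[Tuple[str, int, int], float],
) -> List[dict]:
    """Inner-join immigration rows with state demographics and monthly temperatures."""
    facts = []
    for row in immigrations:
        state = row["terminus_state_id"]
        demo = demographics.get(state)
        temp = temperatures.get((state, row["year"], row["month"]))
        if demo is None or temp is None:
            continue  # inner join: drop rows without a matching dimension record
        facts.append({
            "cicid": row["cicid"],
            "year": row["year"],
            "month": row["month"],
            "travel_method": row["travel_method"],
            "origin_country": row["origin_country"],
            "terminus_state_id": state,
            "terminus_state_name": demo["name"],
            "terminus_temperature": temp,
            "terminus_avg_household_size": demo["avg_household_size"],
            "terminus_male_population": demo["male_population"],
            "terminus_female_population": demo["female_population"],
            "visa_type": row["visa_type"],
            "visa_class": row["visa_class"],
        })
    return facts


# Sample values copied from the notebook outputs in Step 4.
demographics = {"NC": {"name": "North Carolina", "avg_household_size": 2.4750000000000014,
                       "male_population": 7330525, "female_population": 7970470}}
temperatures = {("NC", 2013, 4): 15.002}
immigrations = [
    {"cicid": 5762766.0, "year": 2013, "month": 4, "travel_method": "Air", "origin_country": 343,
     "terminus_state_id": "NC", "visa_type": "Pleasure", "visa_class": "B2"},
    {"cicid": 4084316.0, "year": 2013, "month": 4, "travel_method": "Air", "origin_country": 209,
     "terminus_state_id": "HI", "visa_type": "Pleasure", "visa_class": "WT"},
]
facts = build_fact_rows(immigrations, demographics, temperatures)
print(len(facts), facts[0]["terminus_state_name"])  # 1 North Carolina
```

The HI row is dropped because the toy dimensions have no HI record; with a left join it would instead be kept with NULL dimension columns.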

#### 3.2 Mapping Out Data Pipelines
![erd](../erd.jpg)

For each table, I created a Pipeline that runs the ETL process and a TestPipeline that checks the result of its Transform/Load steps.
```
- src/pipelines/pipeline_dim_temperatures.py
- src/pipelines/pipeline_dim_temperatures_test.py
- src/pipelines/pipeline_dim_demographics.py
- src/pipelines/pipeline_dim_demographics_test.py
- src/pipelines/pipeline_dim_immigrations.py
- src/pipelines/pipeline_dim_immigrations_test.py
- src/pipelines/pipeline_fact_immigrations.py
- src/pipelines/pipeline_fact_immigrations_test.py
```

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Each Pipeline has three functions:
- Extract: uses the PySpark reader to ingest data from the input files (local or S3)
- Transform: reshapes the output of Extract with PySpark DataFrame operations (map/reduce style)
- Load: uses the PySpark writer to save the data (local CSV files or a Redshift database)
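The notebook calls `extract()`, `transform()` and `load()` on every pipeline and reads the intermediate result from `.df`. A minimal sketch of a shared base class with that interface, using a toy in-memory pipeline instead of PySpark readers and writers; the class names and the `run()` helper are hypothetical, not the project's actual code.

```python
from abc import ABC, abstractmethod
from typing import Any, List


class Pipeline(ABC):
    """Extract/transform/load skeleton; each step stores its result in self.df."""

    def __init__(self) -> None:
        self.df: Any = None

    @abstractmethod
    def extract(self) -> None: ...

    @abstractmethod
    def transform(self) -> None: ...

    @abstractmethod
    def load(self) -> None: ...

    def run(self) -> None:
        """Run the three steps in order."""
        self.extract()
        self.transform()
        self.load()


class UpperCasePipeline(Pipeline):
    """Toy pipeline: reads an in-memory list, upper-cases it, appends it to a sink list."""

    def __init__(self, source: List[str], sink: List[str]) -> None:
        super().__init__()
        self.source = source
        self.sink = sink

    def extract(self) -> None:
        self.df = list(self.source)

    def transform(self) -> None:
        self.df = [value.upper() for value in self.df]

    def load(self) -> None:
        self.sink.extend(self.df)


sink: List[str] = []
UpperCasePipeline(["al", "nc"], sink).run()
print(sink)  # ['AL', 'NC']
```

Because every pipeline exposes the same three methods and `.df`, a test pipeline can wrap any of them and inspect `.df` after `transform()` or `load()`.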

##### 4.1.1 dim_temperatures

First, I extract the data and build a map of US states as documented in I94_SAS_Labels_Description.

Then I standardize the column names and aggregate the temperatures by month rather than by day, since the analysis works at monthly granularity.
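A plain-Python sketch of the transform described above: keep US rows, map full state names to codes, and average per `(state_id, year, month)`. It assumes a name-to-code map keyed by upper-case state names (as in the SAS label file) and uses toy readings; in PySpark the same shape would be a `filter`, `F.year`/`F.month` columns and `groupBy(...).agg(F.avg(...))`.

```python
from collections import defaultdict
from datetime import date
from typing import Dict, Iterable, List, Tuple


def monthly_state_temperatures(
    rows: Iterable[dict], state_codes: Dict[str, str]
) -> List[dict]:
    """Average US temperatures per (state_id, year, month), skipping missing readings."""
    sums: Dict[Tuple[str, int, int], float] = defaultdict(float)
    counts: Dict[Tuple[str, int, int], int] = defaultdict(int)
    for row in rows:
        # Keep only US rows that actually have a temperature reading.
        if row["Country"] != "United States" or row["AverageTemperature"] is None:
            continue
        # Map the full state name to its code, e.g. "Alabama" -> "AL".
        state_id = state_codes.get(row["State"].upper())
        if state_id is None:
            continue
        recorded = date.fromisoformat(row["dt"])
        key = (state_id, recorded.year, recorded.month)
        sums[key] += row["AverageTemperature"]
        counts[key] += 1
    return [
        {"state_id": s, "year": y, "month": m, "avg_tmp": sums[(s, y, m)] / counts[(s, y, m)]}
        for (s, y, m) in sorted(sums)
    ]


# Toy rows; the mid-month reading and the Canada row are hypothetical.
rows = [
    {"dt": "1744-04-01", "AverageTemperature": 19.075, "State": "Alabama", "Country": "United States"},
    {"dt": "1744-05-01", "AverageTemperature": 21.0, "State": "Alabama", "Country": "United States"},
    {"dt": "1744-05-15", "AverageTemperature": 23.0, "State": "Alabama", "Country": "United States"},
    {"dt": "1744-05-01", "AverageTemperature": 5.0, "State": "Ontario", "Country": "Canada"},
    {"dt": "1744-06-01", "AverageTemperature": None, "State": "Alabama", "Country": "United States"},
]
result = monthly_state_temperatures(rows, {"ALABAMA": "AL"})
print(result)
```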

```python
dim_temperatures_pipeline.transform()
dim_temperatures_pipeline.df.printSchema()
```

```
root
 |-- state_id: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- avg_tmp: double (nullable = true)
```

```python
dim_temperatures_pipeline.df.show(10)
dim_temperatures_pipeline.load()
```

```
INFO:udacity_capstone.pipelines:saving data...

+--------+----+-----+------------------+
|state_id|year|month|           avg_tmp|
+--------+----+-----+------------------+
|      AL|1755|    3|            11.918|
|      AL|1766|    4|            17.227|
|      AL|1776|    2|             10.31|
|      AL|1793|   11|11.392000000000001|
|      AL|1800|    7|            26.764|
|      AL|1803|    1|             6.121|
|      AL|1807|    8|            26.704|
|      AL|1808|    5|            20.177|
|      AL|1866|    2|             7.459|
|      AL|1934|    5|            21.494|
+--------+----+-----+------------------+
only showing top 10 rows
```

##### 4.1.2 dim_demographics

I standardize the column names and aggregate by state: since I don't use the City column, the analysis is done at state level.
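In this CSV each city appears once per `Race` value (note the `Race` and `Count` columns in the schema in Step 2.2), and the city-level columns such as `Male Population` repeat on each of those rows. Summing them per state without deduplicating would count every city several times. A plain-Python sketch that keeps one row per `(City, State Code)` before aggregating; the toy rows are hypothetical, and whether the project's PySpark transform already deduplicates is not shown here. In PySpark the equivalent would be `dropDuplicates(["City", "State Code"])` before `groupBy("State Code")`.

```python
from typing import Dict, Iterable, List

# City-level columns that repeat on every Race row of the same city.
SUMMED_COLUMNS = ("Male Population", "Female Population", "Number of Veterans", "Foreign-born")


def aggregate_by_state(rows: Iterable[dict]) -> Dict[str, dict]:
    """Aggregate city rows per State Code, counting each (City, State Code) only once."""
    seen = set()
    totals: Dict[str, dict] = {}
    sizes: Dict[str, List[float]] = {}
    for row in rows:
        city_key = (row["City"], row["State Code"])
        if city_key in seen:
            continue  # same city repeated for another Race value
        seen.add(city_key)
        code = row["State Code"]
        state = totals.setdefault(code, {col: 0 for col in SUMMED_COLUMNS})
        for col in SUMMED_COLUMNS:
            state[col] += row[col] or 0  # treat missing counts as 0
        if row["Average Household Size"] is not None:
            sizes.setdefault(code, []).append(row["Average Household Size"])
    for code, state in totals.items():
        values = sizes.get(code, [])
        state["Average Household Size"] = sum(values) / len(values) if values else None
    return totals


def toy_row(city: str, race: str, male: int, female: int, size: float) -> dict:
    # Hypothetical values; veterans and foreign-born counts are left missing.
    return {"City": city, "State Code": "MT", "Race": race,
            "Male Population": male, "Female Population": female,
            "Number of Veterans": None, "Foreign-born": None,
            "Average Household Size": size}


rows = [
    toy_row("Town A", "White", 100, 110, 2.5),
    toy_row("Town A", "Asian", 100, 110, 2.5),  # duplicate city, different Race
    toy_row("Town B", "White", 50, 60, 2.0),
]
montana = aggregate_by_state(rows)["MT"]
print(montana["Male Population"], montana["Average Household Size"])  # 150 2.25
```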

```python
dim_demographics_pipeline.transform()
dim_demographics_pipeline.df.printSchema()
```

```
root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- male_population: long (nullable = true)
 |-- female_population: long (nullable = true)
 |-- veterans_count: long (nullable = true)
 |-- foreigners_count: long (nullable = true)
 |-- avg_household_size: double (nullable = true)
```

```python
dim_demographics_pipeline.df.show(10)
dim_demographics_pipeline.load()
```

```
INFO:udacity_capstone.pipelines:saving data...

+---+--------------------+---------------+-----------------+--------------+----------------+------------------+
| id|                name|male_population|female_population|veterans_count|foreigners_count|avg_household_size|
+---+--------------------+---------------+-----------------+--------------+----------------+------------------+
| MT|             Montana|         438535|           467935|         69270|           29885|2.2749999999999995|
| NC|      North Carolina|        7330525|          7970470|        830730|         1896635|2.4750000000000014|
| MD|            Maryland|        3139755|          3420890|        320715|         1148970|             2.655|
| CO|            Colorado|        7273095|          7405250|        939480|         1688155|2.5599999999999996|
| CT|         Connecticut|        2123435|          2231661|        122546|         1114250|2.6661538461538465|
| IL|            Illinois|       10943864|         11570526|        723049|         4632600|2.7318681318
```

##### 4.1.3 dim_immigrations

I standardize the column names and select the relevant columns.
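A plain-Python sketch of the rename-and-select step, mapping the raw I94 columns from Step 2.3 onto the `dim_immigrations` names from Step 3.1.3 and casting whole-number doubles to integers. The rename table follows the column descriptions in this document; decoding `travel_method` and `visa_type` codes into labels is a separate step not shown, and the helper name is hypothetical.

```python
from typing import Dict

# Raw I94 column -> dim_immigrations column (names taken from the schemas in this document).
RENAMES: Dict[str, str] = {
    "cicid": "cicid",
    "i94yr": "year",
    "i94mon": "month",
    "i94mode": "travel_method",
    "i94cit": "origin_country",
    "i94port": "origin_port",
    "i94addr": "terminus_state_id",
    "biryear": "migrant_yob",
    "i94visa": "visa_type",
    "visatype": "visa_class",
}
INT_COLUMNS = {"year", "month", "origin_country", "migrant_yob"}


def standardize(raw: Dict[str, object]) -> Dict[str, object]:
    """Keep only the mapped columns, rename them, and cast whole-number doubles to int."""
    out: Dict[str, object] = {}
    for src, dst in RENAMES.items():
        value = raw.get(src)
        if dst in INT_COLUMNS and value is not None:
            value = int(value)
        out[dst] = value
    return out


# Sample raw row built from the first dim_immigrations output row below, plus an unused column.
raw = {"cicid": 4084316.0, "i94yr": 2013.0, "i94mon": 4.0, "i94cit": 209.0, "i94port": "HHW",
       "i94mode": 1.0, "i94addr": "HI", "biryear": 1955.0, "i94visa": 2.0, "visatype": "WT",
       "gender": "M"}
print(standardize(raw))
```

In PySpark the same step would typically be a `select` of `col(src).alias(dst)` expressions with `.cast("int")` on the integer columns.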

```python
dim_immigrations_pipeline.transform()
dim_immigrations_pipeline.df.printSchema()
```

```
root
 |-- cicid: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- travel_method: string (nullable = true)
 |-- origin_country: integer (nullable = true)
 |-- origin_port: string (nullable = true)
 |-- terminus_state_id: string (nullable = true)
 |-- migrant_yob: integer (nullable = true)
 |-- visa_type: string (nullable = true)
 |-- visa_class: string (nullable = true)
```

```python
dim_immigrations_pipeline.df.show(10)
dim_immigrations_pipeline.load()
```

```
INFO:udacity_capstone.pipelines:saving data...

+---------+----+-----+-------------+--------------+-----------+-----------------+-----------+---------+----------+
|    cicid|year|month|travel_method|origin_country|origin_port|terminus_state_id|migrant_yob|visa_type|visa_class|
+---------+----+-----+-------------+--------------+-----------+-----------------+-----------+---------+----------+
|4084316.0|2013|    4|          Air|           209|        HHW|               HI|       1955| Pleasure|        WT|
|4422636.0|2013|    4|          Air|           582|        MCA|               TX|       1990| Pleasure|        B2|
|1195600.0|2013|    4|          Air|           148|        OGG|               FL|       1940| Pleasure|        WT|
|5291768.0|2013|    4|          Air|           297|        LOS|               CA|       1991| Pleasure|        B2|
| 985523.0|2013|    4|         Land|           111|        CHM|               NY|       1997| Pleasure|        WT|
|1481650.0|2013|    4|          Air|           577|        ATL|               GA
```

##### 4.1.4 fact_immigrations

```python
fact_immigrations_pipeline.extract()
fact_immigrations_pipeline.transform()
fact_immigrations_pipeline.df.printSchema()
```

```
root
 |-- cicid: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- travel_method: string (nullable = true)
 |-- origin_country: integer (nullable = true)
 |-- terminus_state_id: string (nullable = true)
 |-- terminus_state_name: string (nullable = true)
 |-- terminus_temperature: double (nullable = true)
 |-- terminus_avg_household_size: double (nullable = true)
 |-- terminus_male_population: long (nullable = true)
 |-- terminus_female_population: long (nullable = true)
 |-- visa_type: string (nullable = true)
 |-- visa_class: string (nullable = true)
```

```python
fact_immigrations_pipeline.df.show(10)
fact_immigrations_pipeline.load()
```

```
INFO:udacity_capstone.pipelines:saving data...

+---------+----+-----+-------------+--------------+-----------------+-------------------+--------------------+---------------------------+------------------------+--------------------------+---------+----------+
|    cicid|year|month|travel_method|origin_country|terminus_state_id|terminus_state_name|terminus_temperature|terminus_avg_household_size|terminus_male_population|terminus_female_population|visa_type|visa_class|
+---------+----+-----+-------------+--------------+-----------------+-------------------+--------------------+---------------------------+------------------------+--------------------------+---------+----------+
|5762766.0|2013|    4|          Air|           343|               NC|     North Carolina|              15.002|         2.4750000000000014|                 7330525|                   7970470| Pleasure|        B2|
|1214788.0|2013|    4|          Air|           245|               NC|     North Carolina|              15.002|         2.4750000000000014|
```

#### 4.2 Data Quality Checks
Data quality is ensured by the pipeline test scripts, which check that:
  * no field is NULL
  * the number of records is not 0
  * records are not duplicated; uniqueness depends on how each table defines its primary key
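A plain-Python sketch of the three checks listed above, returning the names of failed checks. The primary key passed in is an assumption (for example `(state_id, year, month)` for dim_temperatures); the test pipelines' actual keys are not shown. In PySpark the same checks map onto `df.count() == 0`, `df.filter(F.col(c).isNull()).count()` per column, and `df.groupBy(*key).count().filter("count > 1")`.

```python
from typing import List, Sequence


def check_quality(rows: List[dict], primary_key: Sequence[str]) -> List[str]:
    """Return the failed checks; an empty list means all checks passed."""
    failures = []
    # Check 1: the table must not be empty.
    if not rows:
        failures.append("table is empty")
    # Check 2: no field may be NULL.
    if any(value is None for row in rows for value in row.values()):
        failures.append("NULL values found")
    # Check 3: the primary key must be unique.
    keys = [tuple(row[col] for col in primary_key) for row in rows]
    if len(keys) != len(set(keys)):
        failures.append(f"duplicate primary keys on {tuple(primary_key)}")
    return failures


pk = ("state_id", "year", "month")
good = [{"state_id": "AL", "year": 1744, "month": 4, "avg_tmp": 19.075}]
print(check_quality(good, pk))  # []
```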

```python
DimTemperaturesTestPipeline(dim_temperatures_pipeline).run()
DimDemographicsTestPipeline(dim_demographics_pipeline).run()
DimImmigrationsTestPipeline(dim_immigrations_pipeline).run()
FactImmigrationsTestPipeline(fact_immigrations_pipeline).run()
```

```
INFO:udacity_capstone.pipelines.pipeline_dim_temperatures_test:check passed!
INFO:udacity_capstone.pipelines.pipeline_dim_temperatures_test:check passed!
INFO:udacity_capstone.pipelines.pipeline_dim_temperatures_test:check passed!
INFO:udacity_capstone.pipelines.pipeline_dim_demographics_test:check passed!
INFO:udacity_capstone.pipelines.pipeline_dim_demographics_test:check passed!
INFO:udacity_capstone.pipelines.pipeline_dim_demographics_test:check passed!
INFO:udacity_capstone.pipelines.pipeline_dim_demographics_test:check passed!
INFO:udacity_capstone.pipelines.pipeline_dim_demographics_test:check passed!
INFO:udacity_capstone.pipelines.pipeline_dim_immigrations_test:check passed!
INFO:udacity_capstone.pipelines.pipeline_dim_immigrations_test:check passed!
INFO:udacity_capstone.pipelines.pipeline_dim_immigrations_test:check passed!
INFO:udacity_capstone.pipelines.pipeline_fact_immigrations_test:check passed!
INFO:udacity_capstone.pip
```

#### 4.3 Data dictionary 
* dim_temperatures

| attribute | desc |
|--|--|
| state_id  | state code, documented in us-states.txt |
| year      | recorded year |
| month     | recorded month |
| avg_tmp   | calculated average temperature |


* dim_demographics

| attribute | desc |
|--|--|
| id                 | state code, documented in us-states.txt |
| name               | state name, documented in us-states.txt |
| male_population    | male population |
| female_population  | female population |
| veterans_count     | number of military veterans |
| foreigners_count   | number of foreign-born residents |
| avg_household_size | average household size |


* dim_immigrations

| attribute | desc |
|--|--|
| cicid             | unique ID from the SAS file, assigned by the I94 data provider |
| year              | year of migration |
| month             | month of migration |
| travel_method     | travel method |
| origin_country    | original country the migrant travels from |
| origin_port       | original port the migrant travels from |
| terminus_state_id | destination state |
| migrant_yob       | migrant year of birth |
| visa_type         | VISA type |
| visa_class        | class of admission legally admitting the non-immigrant to temporarily stay in USA |

* fact_immigrations

| attribute | desc |
|--|--|
| cicid                       | unique ID from the SAS file, assigned by the I94 data provider |
| year                        | year of migration |
| month                       | month of migration |
| travel_method               | travel method |
| origin_country              | original country the migrant travels from |
| terminus_state_id           | destination state code |
| terminus_state_name         | destination state name, linked from dim_demographics |
| terminus_temperature        | destination state average temperature by month, linked from dim_temperatures |
| terminus_avg_household_size | destination state average household size, linked from dim_demographics |
| terminus_male_population    | destination state male population, linked from dim_demographics |
| terminus_female_population  | destination state female population, linked from dim_demographics |
| visa_type                   | VISA type |
| visa_class                  | class of admission legally admitting the non-immigrant to temporarily stay in USA |

### Step 5: Complete Project Write Up
#### 5.1 Tools / Libraries
* **Poetry** \
I feel safer and more confident using a proper dependency manager rather than plain pip. Poetry makes installation simpler and lets me pick versions interactively. It generates a lockfile and clearly declares version ranges and dependencies.

* **Black & Pylint** for Styling / Linting \
I protect code quality with a formatter and a linter: Black formats the code automatically and Pylint flags style and correctness issues, so I no longer have to format code by hand.

* **PySpark** \
PySpark is a good choice for batch ETL pipelines. Its Spark SQL module is powerful and makes exploring and investigating data enjoyable.

* **Amazon S3** (optional, configurable via config.ini) \
A cloud-based storage service, so the ETL can run either in the cloud or locally.

* **Amazon Redshift** (optional, configurable via config.ini) \
A cloud-based, highly scalable data warehouse. The output can be saved either as CSV files or to a relational database such as Redshift. For now, the project runs in the local environment.

* **Amazon EMR** (optional) \
I can choose which platform runs the data pipelines. Amazon EMR can scale automatically when data velocity or volume increases.

#### 5.2 How often the data should be updated
- The data comes from external sources that are not updated regularly, except for I94, which is updated monthly. ***However, the right update frequency depends entirely on the analysis.***

- If I want to answer "Which age groups from which countries tend to move to the US, and in which season?", it is more of a prediction (machine learning) problem, and the current data is already sufficient.

- If I want to answer "How often do people from Vietnam move to the US each year?", the data must be updated at least through the previous year.
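A plain-Python sketch of both questions over `dim_immigrations`-shaped rows, since `migrant_yob` is kept in dim_immigrations but not in fact_immigrations. Age is approximated as `year - migrant_yob`, seasons are Northern Hemisphere meteorological seasons (an assumption), and the country codes below are placeholders: the real code for Vietnam would have to be looked up in I94_SAS_Labels_Description.

```python
from collections import Counter
from typing import Dict, Iterable

# Meteorological seasons for the Northern Hemisphere (an assumption for US destinations).
SEASONS: Dict[int, str] = {
    12: "winter", 1: "winter", 2: "winter",
    3: "spring", 4: "spring", 5: "spring",
    6: "summer", 7: "summer", 8: "summer",
    9: "autumn", 10: "autumn", 11: "autumn",
}


def yearly_arrivals(rows: Iterable[dict], origin_country: int) -> Dict[int, int]:
    """Count arrivals per year for one origin country code."""
    return dict(Counter(r["year"] for r in rows if r["origin_country"] == origin_country))


def arrivals_by_age_band_and_season(rows: Iterable[dict]) -> Counter:
    """Count arrivals per (origin_country, age band, season); age = year - migrant_yob."""
    counts: Counter = Counter()
    for r in rows:
        age = r["year"] - r["migrant_yob"]
        band = f"{(age // 10) * 10}s"  # e.g. 58 -> "50s"
        counts[(r["origin_country"], band, SEASONS[r["month"]])] += 1
    return counts


# Toy rows with placeholder country codes.
rows = [
    {"year": 2013, "month": 4, "origin_country": 209, "migrant_yob": 1955},
    {"year": 2013, "month": 7, "origin_country": 209, "migrant_yob": 1990},
    {"year": 2012, "month": 1, "origin_country": 582, "migrant_yob": 1990},
]
print(yearly_arrivals(rows, 209))  # {2013: 2}
print(arrivals_by_age_band_and_season(rows).most_common(1))
```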


#### 5.3 Corner scenarios
* The data was increased by 100x \
I'll use ***Amazon EMR*** and ***Amazon Redshift***. These two services run the ETL much more efficiently than a personal computer, at a cost.

* The data populates a dashboard that must be updated on a daily basis by 7am every day \
I'll design an ***Apache Airflow pipeline*** and use ***Amazon Managed Workflows for Apache Airflow*** to schedule a daily run that finishes before 7AM.

* The database needed to be accessed by 100+ people \
Instead of using **Amazon S3** or local CSV files, I'll use ***Amazon Redshift***. I'll create IAM accounts for the people in my organization and share the Redshift connection with them; Redshift handles the concurrent access from there.

```python
spark.stop()
```