# Data Engineering Capstone Project

### Overview
The purpose of the data engineering capstone project is to combine what you've learned throughout the program. This project will be an important part of your portfolio that will help you achieve your data engineering-related career goals.

For this project, I assume the role of a data engineer at a company building a web platform for people interested in moving to the US.
My team is preparing statistical data that shows the correlation between the climate of US states and the origin of the expats who have moved there, and how this relationship changes over time.

I will prepare a data lake for the data scientists and analysts. The data should be flexible enough to support the different use cases within the scope of the project, and clean enough to avoid unnecessary work and resource use.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

### Step 1: Scope the Project and Gather Data

#### Scope 
The deliverable is a data lake for the data scientists and analysts, flexible enough to support the different use cases within the project and clean enough to avoid unnecessary work and resource use.

The data resides in Amazon S3, is staged and transformed with PySpark (Apache Spark on Amazon EMR clusters), and is then loaded into relational PostgreSQL tables. Apache Airflow can be added later to schedule the pipeline and rerun it whenever the sources are refreshed with more recent data.

#### Describe and Gather Data

Four data sources are used:
- i94 Immigration Data [https://travel.trade.gov/research/reports/i94/historical/2016.html]
- US Cities: Demographics [https://public.opendatasoft.com/explore/dataset/us-cities-demographics/table/]
- Climate Change: Earth Surface Temperature Data [https://www.kaggle.com/datasets/berkeleyearth/climate-change-earth-surface-temperature-data]
- US States and State Codes

Note: The data is assumed to live in S3 buckets. However, because of problems in the workspace, it is read directly from workspace folders instead.

In [13]:
# Upload the data on S3
# %run scripts/upload_to_s3.py

In [1]:
# Copy provided sas_data into a proper folder
!cp -R sas_data staging_parquet/immigration_data

In [3]:
!rm -rf staging_parquet/demographics_data
!rm -rf staging_parquet/temperature_data
!rm -rf staging_parquet/state_data
!rm -rf staging_parquet/immigration_data

!rm -rf output_parquet

In [4]:
# Write the source data to parquet, describe each table, and show its first 5 rows
%run scripts/read_from_s3_and_write_to_parquet.py

Creating Spark session..
Spark session is ready.
Reading data from data_sources/us-cities-demographics.csv.
Writing parquet file to staging_parquet/demographics_data.
Reading data from data_sources/GlobalLandTemperaturesByState.csv.
Writing parquet file to staging_parquet/temperature_data.
Reading data from data_sources/states.csv.
Writing parquet file to staging_parquet/state_data.
Reading from sas_data.
root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occu

### Step 2: Explore and Assess the Data
#### Explore the Data 
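The arrival and departure dates in the i94 Immigration Data are stored as numeric values (`arrdate` and `depdate` are doubles in the schema above), which are not human readable, so they need to be converted to dates. I have provided a clean-up script that converts them to Pandas datetimes. In the pipeline, however, I convert them on the fly during the transform stage with SQL functions.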
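
The conversion itself is plain date arithmetic. The sketch below assumes that `arrdate` and `depdate` are SAS numeric dates, that is, day counts starting at 1960-01-01. This is the usual encoding for data exported from SAS, but the source above does not confirm it. The helper name `sas_to_date` is hypothetical.

```python
from datetime import date, timedelta
from typing import Optional

# Assumption: SAS numeric dates count days from this epoch.
SAS_EPOCH = date(1960, 1, 1)


def sas_to_date(sas_days: Optional[float]) -> Optional[date]:
    """Convert a SAS day count (stored as a double) to a calendar date."""
    if sas_days is None:
        return None  # keep missing values (e.g. no departure yet) as None
    return SAS_EPOCH + timedelta(days=int(sas_days))


print(sas_to_date(20545.0))  # 2016-04-01
```

Under the same assumption, the equivalent Spark SQL expression would be along the lines of `date_add(to_date('1960-01-01'), CAST(arrdate AS INT))`.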

#### Cleaning Steps
- The describe step showed that some source CSV files have no header, have missing columns, or are badly formatted. The data must be properly formatted before continuing with the next steps.
- Columns that are planned as non-null should be checked for null values. In my data, these columns had no null values.
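
A null check of this kind can be sketched in plain Python as below. The function name `count_nulls` and the choice of `dt` and `state_code` as required columns are assumptions for illustration. The two sample rows are copied from the temperature output shown later in this notebook.

```python
from typing import Dict, Iterable, List, Mapping


def count_nulls(rows: Iterable[Mapping[str, object]],
                required_columns: List[str]) -> Dict[str, int]:
    """Count None values (or missing keys) per required column."""
    counts = {column: 0 for column in required_columns}
    for row in rows:
        for column in required_columns:
            if row.get(column) is None:
                counts[column] += 1
    return counts


# Two rows shaped like the temperature table.
sample_rows = [
    {"dt": "1743-11-01", "avg_temp": -0.831, "state_code": "WI"},
    {"dt": "1743-12-01", "avg_temp": None, "state_code": "WI"},
]

# Assumption: avg_temp may be null, so only the key columns are required.
print(count_nulls(sample_rows, ["dt", "state_code"]))
```

A column passes when its count is zero. Any non-zero count points at rows that need to be fixed or dropped before loading.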

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The conceptual data model diagram is added below.

I chose a relational data model because the data is fairly flat, few joins are needed, and the data is well structured and not expected to change frequently.

We have 1 fact table (immigration) and 3 dimension tables (temperature, demographics, and state).

- The immigration table is extracted from the i94 Immigration Data; unnecessary columns are left out.
- The temperature table is extracted from the Climate Change: Earth Surface Temperature Data, filtered to US states.
- The demographics table is extracted from the US Cities: Demographics data. The data is grouped by state, because city-level detail is not needed.
- The state table is a mapping between US state codes and names. It provides an easy-to-read representation of the state codes.
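
The state-level grouping of the demographics data could look like the sketch below. It assumes that counts are summed and that the median age is averaged without weighting. The actual aggregation in `etl/etl.py` is not shown here and may differ. The function name and the toy rows are hypothetical.

```python
from collections import defaultdict
from statistics import mean
from typing import Dict, List


def aggregate_by_state(city_rows: List[dict]) -> Dict[str, dict]:
    """Roll city-level rows up to one row per state.

    Assumption: counts are summed and the median age is averaged
    across cities (unweighted); the real pipeline may differ.
    """
    grouped = defaultdict(list)
    for row in city_rows:
        grouped[row["state_code"]].append(row)

    result = {}
    for state_code, rows in grouped.items():
        result[state_code] = {
            "population": sum(r["population"] for r in rows),
            "age_median": mean(r["age_median"] for r in rows),
        }
    return result


# Toy values for illustration only, not taken from the dataset.
toy_cities = [
    {"state_code": "NM", "population": 100, "age_median": 30.0},
    {"state_code": "NM", "population": 300, "age_median": 40.0},
    {"state_code": "CO", "population": 200, "age_median": 35.0},
]
print(aggregate_by_state(toy_cities))
```

An unweighted mean of city medians is only an approximation of a state median. Weighting each city by its population would bring the figure closer.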

#### 3.2 Mapping Out Data Pipelines
- Read the data from S3 and stage it in PySpark DataFrames.
- Create a temporary view for each staged table in PySpark.
- Extract and transform the staged data with SQL and write the resulting tables in parquet format.
- Optional: Load the parquet files into PostgreSQL tables with a Python database toolkit such as SQLAlchemy.

![Data_Model](img/data_model.png)

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [1]:
%run etl/etl.py

Creating Spark session..
Spark session is ready.
Starting ETL pipeline..
Processing state data..
Processing immigration data..
Processing temperature data..
Processing demographics data..
Done.


#### 4.2 Data Quality Checks
The cells below read each output table, print its schema and first 10 rows, and count its records.

In [2]:
# Create spark session
from pyspark.sql import SparkSession

spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [3]:
# Read and describe the relational tables 

In [4]:
# Immigration
df_fact_immigration=spark.read.parquet("output_parquet/fact_immigration/")
df_fact_immigration.printSchema()
df_fact_immigration.show(10)
df_fact_immigration.count()

root
 |-- cicid: integer (nullable = true)
 |-- arrival_date: date (nullable = true)
 |-- departure_date: date (nullable = true)
 |-- citizenship_code: integer (nullable = true)
 |-- residency_code: integer (nullable = true)
 |-- i94_visa_type: integer (nullable = true)
 |-- temp_visa_type: string (nullable = true)
 |-- state_code: string (nullable = true)

+-------+------------+--------------+----------------+--------------+-------------+--------------+----------+
|  cicid|arrival_date|departure_date|citizenship_code|residency_code|i94_visa_type|temp_visa_type|state_code|
+-------+------------+--------------+----------------+--------------+-------------+--------------+----------+
|5965389|  2016-04-03|    2016-04-07|             252|           209|            2|           GMT|        GU|
|5750212|  2016-04-30|    2016-05-02|             254|           209|            2|            WT|        GU|
|6028179|  2016-04-22|    2016-04-26|             245|           245|            2|       

3096313

In [5]:
# Temperature
df_dim_temperature=spark.read.parquet("output_parquet/dim_temperature/")
df_dim_temperature.printSchema()
df_dim_temperature.show(10)
df_dim_temperature.count()

root
 |-- dt: date (nullable = true)
 |-- avg_temp: float (nullable = true)
 |-- state_code: string (nullable = true)

+----------+--------+----------+
|        dt|avg_temp|state_code|
+----------+--------+----------+
|1743-11-01|  -0.831|        WI|
|1743-12-01|    null|        WI|
|1744-01-01|    null|        WI|
|1744-02-01|    null|        WI|
|1744-03-01|    null|        WI|
|1744-04-01|   7.602|        WI|
|1744-05-01|   12.48|        WI|
|1744-06-01|  18.478|        WI|
|1744-07-01|  20.284|        WI|
|1744-08-01|    null|        WI|
+----------+--------+----------+
only showing top 10 rows



143267

In [6]:
# Demographics
df_dim_demographics=spark.read.parquet("output_parquet/dim_demographics/")
df_dim_demographics.printSchema()
df_dim_demographics.show(10)
df_dim_demographics.count()

root
 |-- age_median: float (nullable = true)
 |-- m_population: long (nullable = true)
 |-- f_population: long (nullable = true)
 |-- population: long (nullable = true)
 |-- veterans: long (nullable = true)
 |-- foreign_born: long (nullable = true)
 |-- avg_household: double (nullable = true)
 |-- state_code: string (nullable = true)

+----------+------------+------------+----------+--------+------------+------------------+----------+
|age_median|m_population|f_population|population|veterans|foreign_born|     avg_household|state_code|
+----------+------------+------------+----------+--------+------------+------------------+----------+
|    37.775|     2045050|     2150160|   4195210|  302370|      445560| 2.584999978542328|        NM|
| 36.173965|    61055672|    62388681| 123444353| 4617022|    37059662| 3.095325444224318|        CA|
|  35.81875|     7273095|     7405250|  14678345|  939480|     1688155| 2.560000017285347|        CO|
|  33.21111|      527627|      613916|   1141543| 

49

In [7]:
# State
df_dim_state=spark.read.parquet("output_parquet/dim_state/")
df_dim_state.printSchema()
df_dim_state.show(10)
df_dim_state.count()

root
 |-- state_code: string (nullable = true)
 |-- state: string (nullable = true)

+----------+--------------------+
|state_code|               state|
+----------+--------------------+
|        DC|District of Columbia|
|        SC|      South Carolina|
|        NC|      North Carolina|
|        NH|       New Hampshire|
|        WV|       West Virginia|
|        MA|       Massachusetts|
|        RI|        Rhode Island|
|        ND|        North Dakota|
|        SD|        South Dakota|
|        PA|        Pennsylvania|
+----------+--------------------+
only showing top 10 rows



51
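
The counts printed above can also be turned into explicit pass/fail checks. The sketch below is plain Python with hypothetical helper names. With PySpark, the counts would come from `df.count()` and the keys from the collected key columns.

```python
from typing import Iterable, Tuple


def check_row_count(table: str, count: int, minimum: int = 1) -> None:
    """Fail if a table has fewer rows than expected."""
    if count < minimum:
        raise ValueError(f"{table}: expected at least {minimum} rows, got {count}")


def check_unique_key(table: str, keys: Iterable[Tuple]) -> None:
    """Fail if any key value occurs more than once."""
    seen = set()
    for key in keys:
        if key in seen:
            raise ValueError(f"{table}: duplicate key {key}")
        seen.add(key)


# Row counts as printed by the cells above.
for table, count in [("fact_immigration", 3096313), ("dim_temperature", 143267),
                     ("dim_demographics", 49), ("dim_state", 51)]:
    check_row_count(table, count)

# A few state codes from the dim_state output above.
check_unique_key("dim_state", [("DC",), ("SC",), ("NC",)])
print("All checks passed.")
```

Raising an exception instead of printing makes a failed check stop the pipeline, which is useful once the steps run unattended.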

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

#### fact_immigration

| Column Name      | Data Type | Description                                                |
|------------------|-----------|------------------------------------------------------------|
| cicid            | int       | Primary key; unique ID of the immigration record           |
| arrival_date     | date      | Date of arrival in the US                                  |
| departure_date   | date      | Date of departure from the US                              |
| citizenship_code | int       | Code representing the individual's country of citizenship  |
| residency_code   | int       | Code representing the individual's country of residence    |
| state_code       | char(2)   | Code representing the state within the US                  |
| i94_visa_type    | int       | Code representing the type of visa                         |
| temp_visa_type   | varchar   | Visa type code from the source data (for example `WT`, `GMT`) |

#### dim_demographics

| Column Name    | Data Type | Description                                               |
|----------------|-----------|-----------------------------------------------------------|
| state_code     | char(2)   | Primary key representing the state code                    |
| age_median     | float     | Median age of the population in the state                  |
| m_population   | int       | Total male population in the state                         |
| f_population   | int       | Total female population in the state                       |
| population     | int       | Total population in the state                              |
| veterans       | int       | Number of veterans in the state                            |
| foreign_born   | int       | Number of foreign-born residents in the state              |
| avg_household  | float     | Average household size in the state                        |

#### dim_temperature

| Column Name | Data Type | Description                                                     |
|-------------|-----------|-----------------------------------------------------------------|
| dt          | date      | Date of the measurement; primary key together with state_code   |
| avg_temp    | float     | Average temperature for the given date and state; may be null   |
| state_code  | char(2)   | Code representing the state; primary key together with dt       |

#### dim_state

| Column Name | Data Type | Description                                          |
|-------------|-----------|------------------------------------------------------|
| state_code  | char(2)   | Primary key representing the state code               |
| state       | varchar(255) | Name of the state                                    |

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

I chose an SQL-based approach because I have a better understanding of relational databases and because, as explained above, a relational model suits my data. The data is already large (the immigration table alone has about 3.1 million rows), so parallel processing with Spark was important.

The data can be updated yearly. The sources contain governmental information, and sources of this kind are typically published only once a year. In addition, the information we want to provide is not time-critical; it is mostly historical data.

The data was increased by 100x:
- I would use larger machines in the EMR cluster, look for better partitioning (for example, keep the year and month of the immigration data as partition columns), and use smaller, better-suited data types instead of the types PySpark infers automatically.
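
As a minimal sketch of the partitioning idea, the helper below builds a Hive-style year/month directory for a record from its arrival date. The function name `partition_path` and the base path are assumptions for illustration.

```python
from datetime import date


def partition_path(base: str, arrival_date: date) -> str:
    """Build a Hive-style year/month partition directory for a record."""
    return f"{base}/year={arrival_date.year}/month={arrival_date.month}"


print(partition_path("output_parquet/fact_immigration", date(2016, 4, 3)))
# output_parquet/fact_immigration/year=2016/month=4
```

With PySpark, a call such as `df.write.partitionBy("year", "month").parquet(path)` produces directories of this shape, assuming the DataFrame has `year` and `month` columns. Queries that filter on those columns can then skip the partitions they do not need.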

The data populates a dashboard that must be updated on a daily basis by 7am every day:
- We can use Apache Airflow to schedule the jobs daily, starting early enough that they finish before 7am. All the pipeline steps can be written as Airflow operators or hooks.

The database needed to be accessed by 100+ people:
- I would still use an RDBMS like PostgreSQL, which provides ACID transactions for data integrity.