# Data Engineering Capstone Project

#### Project Summary

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

###  First data frame
SAS data

In [1]:
# All imports and installs

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = "/opt/conda/bin:/opt/spark-2.4.3-bin-hadoop2.7/bin:/opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin"
os.environ["SPARK_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
os.environ["HADOOP_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
print("Completed, ran successfully")

Completed, ran successfully


In [2]:
import pandas as pd
pd.set_option('display.max_columns', None)  # full option name; the short form is ambiguous in newer pandas

In [3]:
from pyspark.sql.functions import isnan, when, count, col

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use?

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

# Project overview

<img src="Images/over.PNG">

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

In [None]:
# Load the immigration data (spark.read.load defaults to the Parquet format)
immigrations_df_spark = spark.read.load('./sas_data')

In [4]:
print((immigrations_df_spark.count(), len(immigrations_df_spark.columns)))

(3096313, 28)


In [5]:
immigrations_df_spark.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)

In [6]:
immigrations_df_spark.limit(10).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,5748517.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,CA,20582.0,40.0,1.0,1.0,20160430,SYD,,G,O,,M,1976.0,10292016,F,,QF,94953870000.0,11,B1
1,5748518.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,NV,20591.0,32.0,1.0,1.0,20160430,SYD,,G,O,,M,1984.0,10292016,F,,VA,94955620000.0,7,B1
2,5748519.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20582.0,29.0,1.0,1.0,20160430,SYD,,G,O,,M,1987.0,10292016,M,,DL,94956410000.0,40,B1
3,5748520.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,29.0,1.0,1.0,20160430,SYD,,G,O,,M,1987.0,10292016,F,,DL,94956450000.0,40,B1
4,5748521.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,28.0,1.0,1.0,20160430,SYD,,G,O,,M,1988.0,10292016,M,,DL,94956390000.0,40,B1
5,5748522.0,2016.0,4.0,245.0,464.0,HHW,20574.0,1.0,HI,20579.0,57.0,2.0,1.0,20160430,ACK,,G,O,,M,1959.0,10292016,M,,NZ,94981800000.0,10,B2
6,5748523.0,2016.0,4.0,245.0,464.0,HHW,20574.0,1.0,HI,20586.0,66.0,2.0,1.0,20160430,ACK,,G,O,,M,1950.0,10292016,F,,NZ,94979690000.0,10,B2
7,5748524.0,2016.0,4.0,245.0,464.0,HHW,20574.0,1.0,HI,20586.0,41.0,2.0,1.0,20160430,ACK,,G,O,,M,1975.0,10292016,F,,NZ,94979750000.0,10,B2
8,5748525.0,2016.0,4.0,245.0,464.0,HOU,20574.0,1.0,FL,20581.0,27.0,2.0,1.0,20160430,ACK,,G,O,,M,1989.0,10292016,M,,NZ,94973250000.0,28,B2
9,5748526.0,2016.0,4.0,245.0,464.0,LOS,20574.0,1.0,CA,20581.0,26.0,2.0,1.0,20160430,ACK,,G,O,,M,1990.0,10292016,F,,NZ,95013550000.0,2,B2


In [7]:
immigrations_df_spark.select([count(when(col(c).isNull(), c)).alias(c) for c in immigrations_df_spark.columns]).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,0,0,0,0,0,0,0,239,152592,142457,802,0,0,1,1881250,3088187,238,138429,3095921,138429,802,477,414269,2982605,83627,0,19549,0


Dropping the columns in which more than 50% of the values are null:

    * visapost
    * occup
    * entdepu
    * insnum

In [8]:
columns_to_drop = ['visapost', 'occup', 'entdepu', 'insnum']
immigrations_df_spark = immigrations_df_spark.drop(*columns_to_drop)
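
The 50% rule can be checked directly against the null counts printed above. The following is a minimal sketch in plain Python and is not part of the original notebook: the dictionary copies the non-zero counts from the In [7] output, and the helper name `columns_above_null_ratio` is hypothetical.

```python
# Total row count and non-zero null counts, copied from the notebook output above.
TOTAL_ROWS = 3096313
null_counts = {
    "i94mode": 239, "i94addr": 152592, "depdate": 142457, "i94bir": 802,
    "dtadfile": 1, "visapost": 1881250, "occup": 3088187, "entdepa": 238,
    "entdepd": 138429, "entdepu": 3095921, "matflag": 138429, "biryear": 802,
    "dtaddto": 477, "gender": 414269, "insnum": 2982605, "airline": 83627,
    "fltno": 19549,
}


def columns_above_null_ratio(counts, total_rows, threshold=0.5):
    """Return the columns whose share of null values exceeds `threshold`."""
    return [name for name, nulls in counts.items() if nulls / total_rows > threshold]


columns_to_drop = columns_above_null_ratio(null_counts, TOTAL_ROWS)
print(columns_to_drop)
```

Deriving the list from the counts keeps the threshold explicit and avoids typing column names by hand.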


Dropping the columns that carry no useful information for this project:

    * entdepa
    * entdepd
    * count

In [9]:
# admnum is kept: it still appears in the schema and in the data dictionary below
columns_to_drop = ['entdepa', 'entdepd', 'count']
immigrations_df_spark = immigrations_df_spark.drop(*columns_to_drop)

In [10]:
immigrations_df_spark.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)



Only the first 1000000 of the 3096313 rows will be taken for the "Immigrations data" in order to reduce the size of the file.

In [None]:
# save data as csv to upload it to a S3 instance
immigrations_df_spark.limit(1000000).toPandas().to_csv('immigrations_data.csv')

## Tables from i94 labels

* Extracting lookup tables from the I94 label descriptions so that the immigration data can be joined with the demographics and temperatures tables

In [21]:
import pandas as pd
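def sas_value_parser(value, columns):
    """Parse one value block of the SAS labels file into a pandas DataFrame.

    Args:
        value (str): name of the SAS value block to extract (e.g. 'i94cntyl').
        columns (list): the two column names for the resulting DataFrame.
    Returns:
        pandas.DataFrame: one row per code/description pair.
    """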
    file = 'I94_SAS_Labels_Descriptions.SAS'
    
    file_string = ''
    
    with open(file) as f:
        file_string = f.read()
    
    file_string = file_string[file_string.index(value):]
    file_string = file_string[:file_string.index(';')]
    
    line_list = file_string.split('\n')[1:]
    codes = []
    values = []
    
    for line in line_list:
        
        if '=' in line:
            code, val = line.split('=', 1)  # split on the first '=' only
            code = code.strip()
            val = val.strip()

            if code[0] == "'":
                code = code[1:-1]

            if val[0] == "'":
                val = val[1:-1]

            codes.append(code)
            values.append(val)
        
            
    return pd.DataFrame(list(zip(codes, values)), columns=columns)
    

In [22]:
# This table shows all the valid and invalid codes for processing

i94_residence = sas_value_parser('i94cntyl', ['i94cit_res', 'country'])
#i94_residence.to_csv('i94_residence.csv')
i94_residence.head()

Unnamed: 0,i94cit_res,country
0,582,"MEXICO Air Sea, and Not Reported (I-94, no lan..."
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA


In [23]:
i94_port_of_admission = sas_value_parser('i94prtl', ['i94port', 'port'])
#i94_port_of_admission.to_csv('i94_port_of_admission.csv')
i94_port_of_admission.head()

Unnamed: 0,i94port,port
0,ALC,"ALCAN, AK"
1,ANC,"ANCHORAGE, AK"
2,BAR,"BAKER AAF - BAKER ISLAND, AK"
3,DAC,"DALTONS CACHE, AK"
4,PIZ,"DEW STATION PT LAY DEW, AK"


In [24]:
i94_usa_state_arrival = sas_value_parser('i94addrl', ['i94addr', 'state'])
#i94_usa_state_arrival.to_csv('i94_usa_state_arrival.csv')
i94_usa_state_arrival.head()

Unnamed: 0,i94addr,state
0,AL,ALABAMA
1,AK,ALASKA
2,AZ,ARIZONA
3,AR,ARKANSAS
4,CA,CALIFORNIA


## Working on demographics dataframe

Demographic data

In [28]:
demographics_path = 'us-cities-demographics.csv'
demographics_sp_df = spark.read.option("delimiter",";").option("header", True).csv(demographics_path)
demographics_sp_df.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



In [30]:
print((demographics_sp_df.count(), len(demographics_sp_df.columns)))

(2891, 12)


In [29]:
demographics_sp_df.limit(5).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


In [31]:
demographics_sp_df.select([count(when(col(c).isNull(), c)).alias(c) for c in demographics_sp_df.columns]).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,0,0,0,3,3,0,13,13,16,0,0,0


Because there are few nulls and no irrelevant columns,
the data is kept exactly as it is in the source.

In [32]:
# save data as csv to upload it to a S3 instance
demographics_sp_df.toPandas().to_csv('demographics_data.csv')

## Working on the temperatures dataframe

Temperature data

In [33]:
temp_path = '../../data2/GlobalLandTemperaturesByCity.csv'
temp_sp_df = spark.read.option("header", True).csv(temp_path)
temp_sp_df.printSchema()

root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- AverageTemperatureUncertainty: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [34]:
print((temp_sp_df.count(), len(temp_sp_df.columns)))

(8599212, 7)


In [35]:
temp_sp_df.limit(5).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [36]:
temp_sp_df.select([count(when(col(c).isNull(), c)).alias(c) for c in temp_sp_df.columns]).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,0,364130,364130,0,0,0,0


Let's drop the rows with null values.

In [39]:
temp_sp_df = temp_sp_df.na.drop(subset=["AverageTemperature","AverageTemperatureUncertainty"])

In [40]:
temp_sp_df.select([count(when(col(c).isNull(), c)).alias(c) for c in temp_sp_df.columns]).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,0,0,0,0,0,0,0


In [41]:
temp_sp_df.limit(5).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1744-04-01,5.787999999999999,3.624,Århus,Denmark,57.05N,10.33E
2,1744-05-01,10.644,1.283,Århus,Denmark,57.05N,10.33E
3,1744-06-01,14.050999999999998,1.347,Århus,Denmark,57.05N,10.33E
4,1744-07-01,16.082,1.396,Århus,Denmark,57.05N,10.33E


Only the first 1000000 of the 8599212 rows will be taken for the "Temperature data" in order to reduce the size of the file.

In [42]:
# save data as csv to upload it to a S3 instance
temp_sp_df.limit(1000000).toPandas().to_csv('Temperature_data.csv')

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

Data model explanation

This model was chosen to be able to join the immigrations table with any of the other 5 tables.


![Image_2](Images/data_m.PNG)
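
As an illustration of the kind of join the model is meant to support, here is a minimal pandas sketch (a stand-in for the SQL join that would run in the database, not code from the project). The rows are copied from the sample outputs shown earlier, and the column names `i94addr` and `code` follow the data dictionary further down.

```python
import pandas as pd

# A few immigration rows from the sample output (cicid and i94addr only).
immigrations = pd.DataFrame({
    "cicid": [5748517.0, 5748518.0, 5748526.0],
    "i94addr": ["CA", "NV", "CA"],
})

# The first rows of the i94_usa_state_arrival lookup table shown earlier.
state_arrival = pd.DataFrame({
    "code": ["AL", "AK", "AZ", "AR", "CA"],
    "state": ["ALABAMA", "ALASKA", "ARIZONA", "ARKANSAS", "CALIFORNIA"],
})

# A left join keeps every immigration row; codes missing from the lookup get NaN.
joined = immigrations.merge(
    state_arrival, how="left", left_on="i94addr", right_on="code"
)
print(joined[["cicid", "i94addr", "state"]])
```

Only the five lookup rows printed in the notebook are used here, so the `NV` row stays unmatched; with the full lookup table it would presumably resolve to its state name.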

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

### Uploading CSV files to AWS S3.

These are the steps needed to upload the CSV files from the local machine to an AWS S3 bucket.
The program below condenses Step 2: it reads the CSV files and selects the necessary columns before uploading the CSVs to S3.

<pre>python get_csv_to_s3.py</pre>

### Creating DB tables

<pre>python create_tables.py</pre>

![Image_2](Images/create_tables.PNG)


### Populating tables in the DB.

Data is loaded into the tables from the S3 files using the COPY command.

<pre>python etl.py</pre>

![Image_3](Images/copy_csv.png)
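
For readers unfamiliar with COPY, the sketch below shows roughly what such a statement looks like. It is not the content of `etl.py`: the helper `build_copy_statement`, the bucket name, the IAM role ARN and the region are all hypothetical placeholders, and it assumes the CSV files keep the header row that `to_csv` writes.

```python
def build_copy_statement(table, s3_path, iam_role_arn, region="us-west-2"):
    """Return a Redshift COPY statement that loads a CSV file with a header row."""
    return (
        f"COPY {table}\n"
        f"FROM '{s3_path}'\n"
        f"IAM_ROLE '{iam_role_arn}'\n"
        f"REGION '{region}'\n"
        "CSV\n"
        "IGNOREHEADER 1;"
    )


# Hypothetical bucket and role; real values would come from the cluster configuration.
sql = build_copy_statement(
    table="immigrations",
    s3_path="s3://example-bucket/immigrations_data.csv",
    iam_role_arn="arn:aws:iam::123456789012:role/example-redshift-role",
)
print(sql)
```

The resulting string could then be passed to a database cursor's `execute` method, for example one obtained through psycopg2.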


#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

* 1st data quality check

The first check verifies that each table contains records.

* 2nd data quality check

The second check displays the first 5 rows of each table.

Run the data quality checks to confirm that data was loaded and to display the first 5 rows.

<pre>python data_quality.py</pre>

![Image_x](Images/data_q.PNG)
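
The two checks can be sketched as follows. This is an assumption about their shape, not the content of `data_quality.py`: the function `run_quality_checks` is hypothetical, and an in-memory SQLite database stands in for the Redshift connection so that the example runs anywhere.

```python
import sqlite3


def run_quality_checks(conn, tables, preview_rows=5):
    """Check that each table has rows and collect a small preview per table."""
    report = {}
    cur = conn.cursor()
    for table in tables:
        # Table names come from a fixed list in this sketch, never from user input.
        cur.execute(f"SELECT COUNT(*) FROM {table}")
        row_count = cur.fetchone()[0]
        if row_count == 0:
            raise ValueError(f"Data quality check failed: {table} is empty")
        cur.execute(f"SELECT * FROM {table} LIMIT {int(preview_rows)}")
        report[table] = {"rows": row_count, "preview": cur.fetchall()}
    return report


# In-memory SQLite database standing in for the Redshift connection.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE i94_usa_state_arrival (code TEXT, state TEXT)")
conn.executemany(
    "INSERT INTO i94_usa_state_arrival VALUES (?, ?)",
    [("AL", "ALABAMA"), ("AK", "ALASKA"), ("AZ", "ARIZONA")],
)
report = run_quality_checks(conn, ["i94_usa_state_arrival"])
print(report["i94_usa_state_arrival"]["rows"])
```

Raising an exception on an empty table makes a failed load stop the pipeline instead of passing silently.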

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

### Immigrations table

| Feature | Description |
| --- | --- |
| imm_idx | Row number, relative to the table |
| cicid | Unique record ID |
| i94yr | 4 digit year |
| i94mon | Numeric month |
| i94cit | 3 digit code for immigrant country of citizenship |
| i94res | 3 digit code for immigrant country of residence |
| i94port | Port of admission |
| arrdate | Arrival Date in the USA |
| i94mode | Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported) |
| i94addr | USA State of arrival |
| depdate | Departure Date from the USA |
| i94bir | Age of Respondent in Years |
| i94visa | Visa codes collapsed into three categories |
| dtadfile | Character Date Field - Date added to I-94 Files |
| matflag | Match flag - Match of arrival and departure records |
| biryear | 4 digit year of birth |
| dtaddto | Character Date Field - Date to which admitted to U.S. (allowed to stay until) |
| gender | Non-immigrant sex |
| airline | airline code 2 letters |
| admnum | Admission number (stored as a double) |
| fltno | Flight number (string) |
| visatype | Visa type code (for example B1, B2) |


### i94_residence table

| Feature | Description |
| --- | --- |
| res_idx | Row number, relative to the table |
| code | i94cit & i94res digit codes |
| country | Country for each code |

### i94_port_of_admission table

| Feature | Description |
| --- | --- |
| por_idx | Row number, relative to the table |
| code | i94port 3 letter codes |
| port | port of admission for each code |

### i94_usa_state_arrival table

| Feature | Description |
| --- | --- |
| arr_idx | Row number, relative to the table |
| code | i94addr 2 letter state codes |
| state | USA state of arrival for each code |


### Demographics table

| Feature | Description |
| --- | --- |
| dem_idx | Row number, relative to the table |
| city | City Name |
| state | US State where city is located |
| median_age | Median age of the population |
| male_population | Count of male population |
| female_population | Count of female population |
| total_population | Count of total population |
| number_of_veterans | Count of veteran population |
| foreign_born | Count of residents of the city who were born outside the United States |
| average_household_size | Average city household size |
| state_code | Two-letter state code |
| race | Respondent race |
| count | Number of city residents of the given race |


### Temperatures table

| Feature | Description |
| --- | --- |
| temp_idx | Row number, relative to the table |
| dt | Record date |
| AverageTemperature | Average land temperature in degrees Celsius |
| AverageTemperatureUncertainty | The 95% confidence interval around the average |
| City | Measurement city |
| Country | Country where the measurement was taken |
| Latitude | Coordinate |
| Longitude | Coordinate |


### Step 5: Complete Project Write Up

* What's the goal?

Integrate the knowledge gained through the lessons to create this project.

* What queries will you want to run? / Why did you choose the model you chose?

The DB provides different kinds of information about US immigration, airports, demographics and temperatures, and these can be joined by location (state, country ...). The exception is the airports data, whose coordinates would have to be converted to a state or city before joining by place; that step takes too much time, so I skipped it.

* How would Spark or Airflow be incorporated?

For this project Spark is used to work with the dataframes because it processes the multi-million-row source files faster than pandas.

With Airflow we would have 4 nodes in our DAG (begin_execution, S3 to Redshift, data quality checks, end_execution), and the DAG would be scheduled to run monthly.

* Clearly state the rationale for the choice of tools and technologies for the project.

Python ► Provides libraries for working with databases, files and data (configparser, psycopg2, os, pandas...).

PySpark ► PySpark is an interface for Apache Spark in Python. It not only allows you to write Spark applications using Python APIs, but also provides the PySpark shell for interactively analyzing your data in a distributed environment.

AWS_S3 ► is storage for the Internet. It is designed to make web-scale computing easier.

AWS_Redshift ► is a fully managed, petabyte-scale data warehouse service in the cloud. You can start with just a few hundred gigabytes of data and scale to a petabyte or more. This enables you to use your data to acquire new insights for your business and customers.

* Propose how often the data should be updated and why.

Tables should be updated each time the source data changes; for example, the immigration data set is updated monthly.

* Post your write-up and final data model in a GitHub repo.

<a href= "https://github.com/juan-ivan-NV/Data_Engineering_Nanodegree/tree/main/16_Project_5_Capstone_Project">Github project</a> 

* Include a description of how you would approach the problem differently under the following scenarios:

    * If the data was increased by 100x.
    
    This is why the data is stored in a Redshift warehouse: the cluster capacity can be scaled up as the data keeps growing.
    
    Another option would be to use Cassandra to write online transactions as they come in, and later aggregate them into analytics tables in Redshift.

    * If the pipelines were run on a daily basis by 7am.

    Airflow would be a good choice here: it can schedule the pipeline hourly or daily (for example, every day at 7am), run the data quality checks, and send emails when something fails so that we stay aware of the pipeline's performance.
    
    * If the database needed to be accessed by 100+ people.

    With the DB stored in Redshift or another warehouse, we can grant access to those users and size the cluster to handle that amount of requests.