# Project Title
### Data Engineering Capstone Project

#### Project Summary
This project builds a data warehouse using Spark and parquet files. The data is modelled as a star schema for US airport immigration data, with a fact table and dimension tables.


The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
import datetime as dt
import pandas as pd
pd.set_option("display.max_columns", None)
from pyspark.sql import SparkSession, SQLContext, GroupedData
from pyspark.sql.functions import *
#from pyspark.sql.functions import isnan, when, count, col

### Step 1: Scope the Project and Gather Data

#### Scope 
We will use several file sources and Spark to create a star schema model of immigration data to the United States from April 2016.

#### Describe and Gather Data 

Here we provide a brief description of all input files:
- **Dataframe "inmigration"** created from file "immigration_data_sample" or from parquet files in "sas_data" folder

- **Dataframe "cities"** created from file "us-cities-demographics".
- **Dataframe "visa"** created from file "visa". File was built from "https://www.trade.gov/i-94-arrivals-program" and file "I94_SAS_Labels_Descriptions.SAS".
- **Dataframe "airports"** created from file "airports". File was built from file "I94_SAS_Labels_Descriptions.SAS".
- **Dataframe "airport_data"** created from file "airport-codes_csv".
- **Dataframe "states"** created from file "states". File was built from file "I94_SAS_Labels_Descriptions.SAS".
- **Dataframe "countries"** created from file "countries". File was built from file "I94_SAS_Labels_Descriptions.SAS".
- **Dataframe "modes"** created from file "modes". File was built from file "I94_SAS_Labels_Descriptions.SAS".
- **Dataframe "temperatures"** created from file "GlobalLandTemperaturesByCity.csv". This file is available in another folder due to its large size (508MB).
- **Dataframe "cities_coordinates"** created from file "uscities.csv". This file is available online (https://simplemaps.com/data/us-cities). It would be needed to determine, from the latitude and longitude data, which state each city in the temperature dataframe belongs to, because the temperature file has no state column and contains several cities that share a name across different states. **This match WILL NOT BE IMPLEMENTED in this project**, so the temperature table is not related to the other tables in our model.
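
For reference, one way such a match could work is sketched here. It is not part of this project: the helper names and the reference rows are hypothetical, the coordinates are approximate, and it assumes that picking the closest same-named city by great-circle (haversine) distance is good enough.

```python
import math


def haversine_km(lat1, lng1, lat2, lng2):
    """Great-circle distance in kilometres between two (lat, lng) points."""
    earth_radius_km = 6371.0
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    d_phi = phi2 - phi1
    d_lambda = math.radians(lng2 - lng1)
    a = (math.sin(d_phi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(d_lambda / 2) ** 2)
    return 2 * earth_radius_km * math.asin(math.sqrt(a))


def nearest_state(city, lat, lng, reference_cities):
    """Return the state_id of the closest same-named reference city, or None."""
    candidates = [c for c in reference_cities if c["city"] == city]
    if not candidates:
        return None
    best = min(candidates,
               key=lambda c: haversine_km(lat, lng, c["lat"], c["lng"]))
    return best["state_id"]


# Hypothetical reference rows shaped like uscities.csv (approximate coordinates).
reference_cities = [
    {"city": "Springfield", "state_id": "IL", "lat": 39.78, "lng": -89.65},
    {"city": "Springfield", "state_id": "MA", "lat": 42.10, "lng": -72.59},
    {"city": "Springfield", "state_id": "MO", "lat": 37.21, "lng": -93.29},
]

# A hypothetical temperature record for "Springfield" at (42.59, -72.00).
state = nearest_state("Springfield", 42.59, -72.00, reference_cities)
print(state)
```

At full scale the same idea would need a distance threshold and a join strategy that avoids comparing every temperature row with every city.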

Please refer to files **Initial Data Dictionary.md** and **Data Dictionary.md** for more details on each table.


In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = "/opt/conda/bin:/opt/spark-2.4.3-bin-hadoop2.7/bin:/opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin"
os.environ["SPARK_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
os.environ["HADOOP_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"

In [3]:
from tools_source import Sources
from tools_clean_transform import Cleaner_and_Transformer
from tools_validation import Validator
#Modelizer, Validator

#spark = SparkSession.builder.\
#config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11").enableHiveSupport().getOrCreate()

spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    .getOrCreate()

In [4]:
# dictionary with file paths to all input files - csv and parquet
source_paths={
    'inmigration_path'   : './sas_data',
    'temperature_path'   : '../../data2/GlobalLandTemperaturesByCity.csv',
    'modes_path'         : './sources/modes.csv',
    'cities_path'        : './sources/us-cities-demographics.csv',
    'states_path'        : './sources/states.csv',
    'countries_path'     : './sources/countries.csv',
    'visa_path'          : './sources/visa.csv',
    'airports_path'      : './sources/airports.csv',
    'airport_data_path'  : './sources/airport-codes_csv.csv',
    'cities_coordinates' : './sources/uscities.csv'
}

sources_df = Sources(spark,source_paths)

In [5]:
# import initial dataframes
df_inmigration = sources_df.get_inmigration_data()
df_temp = sources_df.get_temperature_data()
df_modes = sources_df.get_modes_of_arrival()
df_cities = sources_df.get_cities_data()
df_states = sources_df.get_states_data()
df_countries = sources_df.get_countries_data()
df_visa = sources_df.get_visa_data()
df_airports = sources_df.get_airports()
df_airport_data = sources_df.get_airports_data()
df_cities_coord = sources_df.get_cities_coord_data()

### Step 2: Explore and Assess the Data - Clean and Transform

#### Here we will clean and transform our datasets to address the following items:
- Column names and formats
- Duplicated rows
- Null values
- Standardize date columns format and create new columns: year, month, day 
- Standardize and convert coordinate columns (latitude and longitude to float) 
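
The last two items can be illustrated with a small sketch. The real logic lives in `Cleaner_and_Transformer`, which is not shown in this notebook, so the helper names `parse_coordinate` and `split_date` are hypothetical and the code only assumes the input formats visible in the temperature data (for example `32.95N`, `100.53W` and ISO dates).

```python
from datetime import date


def parse_coordinate(value):
    """Convert a '32.95N' / '100.53W' style string to a signed float.

    North and East are positive, South and West are negative.
    """
    value = value.strip().upper()
    if not value or value[-1] not in "NSEW":
        raise ValueError(f"unexpected coordinate format: {value!r}")
    magnitude = float(value[:-1])
    return -magnitude if value[-1] in "SW" else magnitude


def split_date(iso_date):
    """Split a 'YYYY-MM-DD' string into (year, month, day) integers."""
    parsed = date.fromisoformat(iso_date)
    return parsed.year, parsed.month, parsed.day


lat = parse_coordinate("32.95N")
lng = parse_coordinate("100.53W")
year, month, day = split_date("2016-04-30")
print(lat, lng, year, month, day)
```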

In [6]:
cleaning_dfs = Cleaner_and_Transformer()

In [7]:
df_temp_clean = cleaning_dfs.clean_and_transform_temperature(df_temp)
df_modes_clean = cleaning_dfs.clean_and_transform_modes(df_modes)
df_countries_clean = cleaning_dfs.clean_and_transform_countries(df_countries)
###########################################################################################
# enrich the states dataset with data from cities
df_cities_clean = cleaning_dfs.clean_and_transform_cities(df_cities)
df_states_clean = cleaning_dfs.clean_and_transform_states(df_states,df_cities_clean)
del df_cities_clean
###########################################################################################
df_visa_clean = cleaning_dfs.clean_and_transform_visa(df_visa)
df_airports_clean =  cleaning_dfs.clean_and_transform_airports(df_airports)
df_airport_data_clean =  cleaning_dfs.clean_and_transform_airport_data(df_airport_data)
df_cities_coord_clean =  cleaning_dfs.clean_and_transform_cities_coord(df_cities_coord)
###########################################################################################
df_inmigration_clean_all = cleaning_dfs.clean_and_transform_inmigration_data(df_inmigration)


In [8]:
dict_all_df = {
    'inmigration' : df_inmigration_clean_all,
    'temperature' : df_temp_clean,
    'modes'       : df_modes_clean,
    'countries'   : df_countries_clean,
    'states'      : df_states_clean,
    'visa'        : df_visa_clean,
    'airports'    : df_airports_clean,
    'airport_data': df_airport_data_clean,
    'cities'      : df_cities_coord_clean,
}

In [9]:
# Keep only the inmigration rows whose keys are present in the DIM tables
df_inmigration_clean = cleaning_dfs.integrity_inmigration_data(dict_all_df)
all_entries = df_inmigration_clean_all.count()
valid_entries = df_inmigration_clean.count()
percentage_entries = int(valid_entries/all_entries*100)
print(f'There are {all_entries} entries/rows but only {valid_entries} are valid ({percentage_entries}%), meaning, represented in current DIM Tables')

There are 3096313 entries/rows but only 1141596 are valid (36%), meaning, represented in current DIM Tables
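
The filter above can be pictured as a semi-join: a fact row is kept only if every foreign key it carries exists in the matching dimension. The sketch below is plain Python with hypothetical rows and a hypothetical helper name; the actual `integrity_inmigration_data` implementation is not shown here and may differ (in Spark, a `left_semi` join is one common way to express the same idea).

```python
def keep_rows_with_known_keys(fact_rows, dimension_keys):
    """Keep fact rows whose foreign keys all appear in the matching dimension."""
    return [
        row for row in fact_rows
        if all(row[column] in keys for column, keys in dimension_keys.items())
    ]


# Hypothetical fact rows using column names from the inmigration schema.
fact_rows = [
    {"cic_id": 1, "airport_id": "MIA", "arrival_mode_id": 1, "state_name_code": "FL"},
    {"cic_id": 2, "airport_id": "XXX", "arrival_mode_id": 1, "state_name_code": "FL"},
    {"cic_id": 3, "airport_id": "HOU", "arrival_mode_id": 7, "state_name_code": "TX"},
]

# Hypothetical key sets, one per dimension referenced by the fact table.
dimension_keys = {
    "airport_id": {"MIA", "HOU"},
    "arrival_mode_id": {1, 2, 3, 9},
    "state_name_code": {"FL", "TX"},
}

valid_rows = keep_rows_with_known_keys(fact_rows, dimension_keys)
percentage = int(len(valid_rows) / len(fact_rows) * 100)
print(f"{len(valid_rows)} of {len(fact_rows)} rows are valid ({percentage}%)")
```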


In [10]:
# update dictionary
dict_all_df.update({'inmigration' : df_inmigration_clean})

In [11]:
# Tables after the cleaning and transformation steps
for df_name, df in dict_all_df.items():
    row_count = df.count()
    print('------------------------------------------------------------')
    print(f'{df_name} has {row_count} rows')
    df.printSchema()
    if row_count > 10:
        # large table: show only the first 3 rows
        display(df.limit(3).toPandas())
    else:
        display(df.toPandas())
    

------------------------------------------------------------
inmigration has 1141596 rows
root
 |-- cic_id: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- birth_country: integer (nullable = true)
 |-- residence_country: integer (nullable = true)
 |-- airport_id: string (nullable = true)
 |-- arrival_date: date (nullable = true)
 |-- arrival_mode_id: integer (nullable = false)
 |-- state_name_code: string (nullable = false)
 |-- departure_date: date (nullable = true)
 |-- repondent_age: integer (nullable = true)
 |-- count: integer (nullable = true)
 |-- visa_issued_department: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- arrival_flag: string (nullable = true)
 |-- departure_flag: string (nullable = true)
 |-- update_flag: string (nullable = true)
 |-- match_arrival_departure_flag: string (nullable = true)
 |-- birth_year: integer (nullable = true)
 |-- allowed_date: date (nullable = true)
 |-- gender:

Unnamed: 0,cic_id,year,month,birth_country,residence_country,airport_id,arrival_date,arrival_mode_id,state_name_code,departure_date,repondent_age,count,visa_issued_department,occupation,arrival_flag,departure_flag,update_flag,match_arrival_departure_flag,birth_year,allowed_date,gender,ins_number,airline,admission_number,flight_number,visa_type,arrival_year,arrival_month,arrival_day,departure_year,departure_month,departure_day,date_added_year,date_added_month,date_added_day,date_added,allowed_date_year,allowed_date_month,allowed_date_day
0,5748525,2016,4,245,464,HOU,2016-04-30,1,FL,2016-05-07,27,1,ACK,,G,O,,M,1989,2016-10-29,M,,NZ,2147483647,28,B2,2016,4,30,2016,5,7,2016,4,30,2016-04-30,2016,10,29
1,5748527,2016,4,245,504,NEW,2016-04-30,1,MA,2016-05-02,44,1,GUZ,,G,O,,M,1972,2016-10-29,M,,UA,2147483647,1215,B2,2016,4,30,2016,5,2,2016,4,30,2016-04-30,2016,10,29
2,5748532,2016,4,245,504,MIA,2016-04-30,1,FL,2016-05-07,53,1,PNM,,G,O,,M,1963,2016-10-29,F,,CM,2147483647,430,B2,2016,4,30,2016,5,7,2016,4,30,2016-04-30,2016,10,29


------------------------------------------------------------
temperature has 661524 rows
root
 |-- date: date (nullable = true)
 |-- average_temperature: float (nullable = true)
 |-- average_temperature_uncertainty: float (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- lat: float (nullable = true)
 |-- lng: float (nullable = true)



Unnamed: 0,date,average_temperature,average_temperature_uncertainty,city,country,latitude,longitude,year,month,day,lat,lng
0,1820-01-01,2.101,3.217,Abilene,United States,32.95N,100.53W,1820,1,1,32.950001,-100.529999
1,1820-02-01,6.926,2.853,Abilene,United States,32.95N,100.53W,1820,2,1,32.950001,-100.529999
2,1820-03-01,10.767,2.395,Abilene,United States,32.95N,100.53W,1820,3,1,32.950001,-100.529999


------------------------------------------------------------
modes has 4 rows
root
 |-- mode_id: integer (nullable = true)
 |-- mode_name: string (nullable = true)



Unnamed: 0,mode_id,mode_name
0,1,Air
1,2,Sea
2,3,Land
3,9,Not reported


------------------------------------------------------------
countries has 289 rows
root
 |-- country_id: integer (nullable = true)
 |-- country_name: string (nullable = true)



Unnamed: 0,country_id,country_name
0,582,MEXICO Air Sea
1,236,AFGHANISTAN
2,101,ALBANIA


------------------------------------------------------------
states has 55 rows
root
 |-- state_code: string (nullable = true)
 |-- state_name: string (nullable = true)
 |-- male_population: long (nullable = true)
 |-- female_population: long (nullable = true)
 |-- total_population: long (nullable = true)
 |-- veterans: long (nullable = true)
 |-- foreign_born: long (nullable = true)
 |-- american_indian_and_alaska_native: long (nullable = true)
 |-- asian: long (nullable = true)
 |-- black_or_african_american: long (nullable = true)
 |-- hispanic_or_latino: long (nullable = true)
 |-- white: long (nullable = true)



Unnamed: 0,state_code,state_name,male_population,female_population,total_population,veterans,foreign_born,american_indian_and_alaska_native,asian,black_or_african_american,hispanic_or_latino,white
0,AL,ALABAMA,497248,552381,1049629,71543,52154,8084,28769,521068,39313,498920
1,AK,ALASKA,152945,145750,298695,27492,33258,36339,36825,23107,27261,212696
2,AZ,ARIZONA,2227455,2272087,4499542,264505,682313,129708,229183,296222,1508157,3591611


------------------------------------------------------------
visa has 19 rows
root
 |-- visa_code: integer (nullable = true)
 |-- visa_categorie: string (nullable = true)
 |-- visa_type: string (nullable = true)
 |-- visa_type_description: string (nullable = true)



Unnamed: 0,visa_code,visa_categorie,visa_type,visa_type_description
0,1,Business,B1,Visa Holder: Non-Immigrant Temporary Visitor f...
1,1,Business,WB,Visa Waiver Program: Temporary Visitor for Bus...
2,1,Business,GB,Visa Waiver Program: Guam Visa Waiver Business


------------------------------------------------------------
airports has 660 rows
root
 |-- airport_id: string (nullable = true)
 |-- airport_name: string (nullable = true)



Unnamed: 0,airport_id,airport_name
0,ALC,ALCAN
1,ANC,ANCHORAGE
2,BAR,BAKER AAF - BAKER ISLAND


------------------------------------------------------------
airport_data has 1956 rows
root
 |-- ent_id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)
 |-- lat: float (nullable = true)
 |-- lng: float (nullable = true)



Unnamed: 0,ent_id,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates,lat,lng
0,07FA,small_airport,Ocean Reef Club Airport,8,,US,US-FL,Key Largo,07FA,OCA,07FA,"-80.274803161621, 25.325399398804",-80.274803,25.325399
1,0AK,small_airport,Pilot Station Airport,305,,US,US-AK,Pilot Station,,PQS,0AK,"-162.899994, 61.934601",-162.899994,61.934601
2,0CO2,small_airport,Crested Butte Airpark,8980,,US,US-CO,Crested Butte,0CO2,CSE,0CO2,"-106.928341, 38.851918",-106.928345,38.851917


------------------------------------------------------------
cities has 28338 rows
root
 |-- city: string (nullable = true)
 |-- city_ascii: string (nullable = true)
 |-- state_id: string (nullable = true)
 |-- state_name: string (nullable = true)
 |-- county_fips: integer (nullable = true)
 |-- county_name: string (nullable = true)
 |-- lat: float (nullable = true)
 |-- lng: float (nullable = true)
 |-- population: integer (nullable = true)
 |-- density: integer (nullable = true)
 |-- source: string (nullable = true)
 |-- military: string (nullable = true)
 |-- incorporated: string (nullable = true)
 |-- timezone: string (nullable = true)
 |-- ranking: integer (nullable = true)
 |-- zips: string (nullable = true)
 |-- id: integer (nullable = true)



Unnamed: 0,city,city_ascii,state_id,state_name,county_fips,county_name,lat,lng,population,density,source,military,incorporated,timezone,ranking,zips,id
0,New York,New York,NY,New York,36061,New York,40.694302,-73.924896,18713220,10715,polygon,False,True,America/New_York,1,11229 11226 11225 11224 11222 11221 11220 1138...,1840034016
1,Los Angeles,Los Angeles,CA,California,6037,Los Angeles,34.113899,-118.406799,12750807,3276,polygon,False,True,America/Los_Angeles,1,90291 90293 90292 91316 91311 90037 90031 9000...,1840020491
2,Chicago,Chicago,IL,Illinois,17031,Cook,41.837299,-87.686203,8604203,4574,polygon,False,True,America/Chicago,1,60018 60649 60641 60640 60643 60642 60645 6064...,1840000494


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
This model has a FACT table for inmigration data. DIM tables are available to enrich the available information, e.g. **airport** details and **state** statistics.
<br>
Examples of how to query the current model can be found in a separate file:
- **Query Examples.ipynb**
<br><br><br>

As *future work*:
- *airport* and *airport_data* can be easily merged to simplify the model.
- we prepared the *cities* table with relevant information for each city. In the future, this information may be aggregated and converted to statistics for each state.
- the *temperature* table was also created. This table has information about each city but does not identify the state of that city, so the city locations (lat, lng) will need to be matched against a reference table before this data can be used in our model.
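
As a rough idea of what the per-state aggregation of *cities* could look like, here is a minimal sketch in plain Python. The helper name is hypothetical, the first three rows reuse values from the *cities* sample shown earlier, and the last row is invented purely to show two cities rolling up into one state.

```python
from collections import defaultdict


def population_by_state(city_rows):
    """Sum the city populations for each state_id."""
    totals = defaultdict(int)
    for row in city_rows:
        totals[row["state_id"]] += row["population"]
    return dict(totals)


city_rows = [
    {"city": "New York", "state_id": "NY", "population": 18713220},
    {"city": "Los Angeles", "state_id": "CA", "population": 12750807},
    {"city": "Chicago", "state_id": "IL", "population": 8604203},
    # Hypothetical extra row, not taken from the source file.
    {"city": "Example Town", "state_id": "NY", "population": 1000},
]

state_totals = population_by_state(city_rows)
print(state_totals)
```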

<img src="model.png" width="1250"/>

#### 3.2 Mapping Out Data Pipelines
After cleaning and transforming all input files into dataframes/tables, we will write our model to parquet files:
- DIM files
- FACT table partitioned by arrival_year, arrival_month and arrival_day
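
To make the partitioning concrete: Spark's `partitionBy` writes Hive-style `column=value` subdirectories, one level per partition column. The sketch below only builds the expected directory name for one arrival date. The helper name is hypothetical, and the base path is the one used for the FACT table later in this notebook.

```python
from datetime import date


def partition_path(base_path, arrival_date):
    """Build the Hive-style partition directory for one arrival date."""
    return (
        f"{base_path}/arrival_year={arrival_date.year}"
        f"/arrival_month={arrival_date.month}"
        f"/arrival_day={arrival_date.day}"
    )


path = partition_path("./model/facts_inmigration.parquet", date(2016, 4, 30))
print(path)
```

Queries that filter on these three columns can then read only the matching directories instead of scanning the whole table.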



### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [12]:
dict_all_df = {
    'inmigration' : {'df'  : df_inmigration_clean,
                     'file':'./model/facts_inmigration.parquet'},
    'temperature' : {'df'  : df_temp_clean,
                     'file': './model/temperature.parquet'},
    'modes'       : {'df'  : df_modes_clean,
                     'file': './model/mode.parquet'},
    'countries'   : {'df'  : df_countries_clean,
                     'file': './model/countrie.parquet'},
    'states'      : {'df'  : df_states_clean,
                     'file': './model/state.parquet'},
    'visa'        : {'df'  : df_visa_clean,
                     'file': './model/visa.parquet'},
    'airports'    : {'df'  : df_airports_clean,
                     'file': './model/airport.parquet'},
    'airport_data': {'df'  : df_airport_data_clean,
                     'file': './model/airport_data.parquet'},
    'cities'      : {'df'  : df_cities_coord_clean,
                     'file': './model/city.parquet'},
}

In [13]:
for name, _dict in dict_all_df.items():
    _df = _dict['df']
    _path = _dict['file']
    if name == 'inmigration':
        print(f'Export {name} FACT Table to parquet files')
        _df.write.partitionBy("arrival_year", "arrival_month", "arrival_day").mode('overwrite').parquet(_path)
    else:
        print(f'Export {name} DIM Table to parquet files')
        _df.write.mode('overwrite').parquet(_path)


Export inmigration FACT Table to parquet files
Export temperature DIM Table to parquet files
Export modes DIM Table to parquet files
Export countries DIM Table to parquet files
Export states DIM Table to parquet files
Export visa DIM Table to parquet files
Export airports DIM Table to parquet files
Export airport_data DIM Table to parquet files
Export cities DIM Table to parquet files


#### 4.2 Data Quality Checks

We will perform the following data validations:
- Each table has at least one row and its unique key contains no duplicate values
- All entries in the FACT table (inmigration) are present in the DIM tables

Run Quality Checks<br>
- **We will not run quality checks on *cities* and *temperature* because these checks only become useful once the *future work* described above is implemented.**
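
The first validation can be sketched in plain Python as follows. This is not the `Validator` class used in this notebook, whose code is not shown; the helper name and every status string other than `'OK'` are hypothetical. The rows reuse the *visa* sample shown earlier, which also illustrates why `visa_type` rather than `visa_code` serves as that table's unique key.

```python
from collections import Counter


def check_table(rows, unique_key):
    """Return 'OK' when the table has rows and unique_key has no duplicates."""
    if not rows:
        return "EMPTY"
    counts = Counter(row[unique_key] for row in rows)
    duplicated = sorted(key for key, n in counts.items() if n > 1)
    return "OK" if not duplicated else f"DUPLICATED: {duplicated}"


# Rows taken from the visa sample output earlier in this notebook.
visa_rows = [
    {"visa_code": 1, "visa_categorie": "Business", "visa_type": "B1"},
    {"visa_code": 1, "visa_categorie": "Business", "visa_type": "WB"},
    {"visa_code": 1, "visa_categorie": "Business", "visa_type": "GB"},
]

status_by_type = check_table(visa_rows, "visa_type")
status_by_code = check_table(visa_rows, "visa_code")
print(status_by_type, "|", status_by_code)
```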
 

In [14]:
dict_all_df = {
    'inmigration' : {'df'  : df_inmigration_clean,
                     'file':'./model/facts_inmigration.parquet',
                     'unique_key': 'cic_id'},
    'modes'       : {'df'  : df_modes_clean,
                     'file': './model/mode.parquet',
                     'unique_key': 'mode_id'},
    'countries'   : {'df'  : df_countries_clean,
                     'file': './model/countrie.parquet',
                     'unique_key': 'country_id'},
    'states'      : {'df'  : df_states_clean,
                     'file': './model/state.parquet',
                     'unique_key': 'state_code'},
    'visa'        : {'df'  : df_visa_clean,
                     'file': './model/visa.parquet',
                     'unique_key': 'visa_type'},
    'airports'    : {'df'  : df_airports_clean,
                     'file': './model/airport.parquet',
                     'unique_key': 'airport_id'},
    'airport_data': {'df'  : df_airport_data_clean,
                     'file': './model/airport_data.parquet',
                     'unique_key': 'iata_code'},
}

##### Check rows

In [15]:
validation_dfs = Validator(spark,dict_all_df)

In [16]:
row_status = validation_dfs.check_rows()
row_status

{'inmigration': 'OK',
 'modes': 'OK',
 'countries': 'OK',
 'states': 'OK',
 'visa': 'OK',
 'airports': 'OK',
 'airport_data': 'OK'}

##### Check FACT table integrity

In [17]:
fact_integrity, rows_integrity = validation_dfs.check_integrity()
fact_integrity

{'modes': True,
 'countries': True,
 'states': True,
 'visa': True,
 'airports': True,
 'airport_data': True}

In [18]:
# a count of 0 is the expected (good) result
rows_integrity

{'modes': 0,
 'countries': 0,
 'states': 0,
 'visa': 0,
 'airports': 0,
 'airport_data': 0}

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

Please refer to files **Initial Data Dictionary.md** and **Data Dictionary.md** for more details on each table.

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

**Final Comments**
- In this project we used Apache Spark to process the data and create the model. Spark handles large files well, which is how we were able to analyze more than 3 million rows. After processing, the data persists in parquet files.
- Data can be updated on a daily basis because the FACT table is partitioned by arrival date.
- Hypothetical scenarios:
    - If the data increases 100x, we can continue using Spark, or we can evaluate migrating the entire infrastructure to AWS, using S3 for storage and Redshift for processing (great scalability).
    - To update the data on a fixed schedule we could use Apache Airflow. As an intermediate step, we can evaluate the performance of a cron job running on AWS before implementing Airflow.
    - If the database needs to be accessed by 100+ people, making the data available through S3 (files) and/or giving access to a Redshift cluster (database) would be good options.

#### Reminder:
Examples of how to query the current model can be found in a separate file:
- **Query Examples.ipynb**
    - Example 1: immigration data for a specific arrival day
    - Example 2: immigration data for a specific airport
    - Example 3: immigration data for people who were born in BRAZIL and crossed the LAND border