### Step 1: Scope the Project and Gather Data

#### Scope 
For this capstone project, we aggregate the I94 immigration data by city to create the first dimension table, and aggregate the temperature data by city to create a city-level temperature dimension table. The fact table is built by joining the two dimension tables on city.
We use Spark for the ETL processing.

#### Describe and Gather Data 
The I94 immigration data comes from [the US National Tourism and Trade Office website](https://travel.trade.gov/research/reports/i94/historical/2016.html). It is provided in SAS7BDAT format, which is a binary database storage format.

**I94 Immigration Data (sas_data):** comes from the US National Tourism and Trade Office and includes details on incoming immigrants and their ports of entry.
- i94yr = 4 digit year
- i94mon = numeric month
- i94cit = 3 digit code of the country of origin
- i94port = 3 character code of destination USA city
- arrdate = arrival date in the USA
- i94mode = 1 digit travel code
- depdate = departure date from the USA
- i94visa = reason for immigration

**U.S. City Demographic Data (demog):** comes from OpenSoft and includes data by city, state, age, population, veteran status and race.

**Airport Code Table (airport):** comes from datahub.io and includes airport codes and corresponding cities.

**Temperature Data:** comes from Kaggle and is provided in CSV format.
- AverageTemperature = average temperature
- City = city name
- Country = country name
- Latitude = latitude
- Longitude = longitude

### Data Engineering Capstone Project

#### Project Summary
<p>The project's goal is to build an ETL pipeline that uses I94 immigration data and city temperature data to create a database optimized for queries that analyze immigration events. The database will then be used to answer questions relating immigration behavior to location temperatures, airport details, and city demographics.</p>

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports
from pyspark.sql import SparkSession, SQLContext, GroupedData
from pyspark.sql.functions import *
from glob import glob
import pandas as pd
from etl.extract import Extract
import re

In [2]:
# create Spark Session
from pyspark.sql import SparkSession
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11").\
config("spark.sql.broadcastTimeout", "36000").\
enableHiveSupport().getOrCreate()

#### file paths

In [3]:
paths = {
    "us_cities_demographics" : "us-cities-demographics.csv",
    "airport_codes" :  "airport-codes_csv.csv",
    "sas_data" : "sas_data/",
    "temperature_data": '/data2/GlobalLandTemperaturesByCity.csv'
}

### sas files column descriptions. 

In [4]:
SAS_Labels_Descriptions = 'I94_SAS_Labels_Descriptions.SAS'

with open(SAS_Labels_Descriptions) as f:
    lines = f.readlines()    

# Keep only single-line comments of the form "/* CODE - description */"
line_comments = [line for line in lines if '/*' in line and '*/\n' in line]
regexp = re.compile(r'^/\*\s+(?P<code>.+?)\s+-\s+(?P<description>.+)\s+\*/$')
matched_lines = [regexp.match(c) for c in line_comments]

for matched_phrase in matched_lines:
    if matched_phrase is None:  # skip comments that do not follow the pattern
        continue
    print(matched_phrase.group("code"), ":", matched_phrase.group('description'))

I94YR : 4 digit year
I94MON : Numeric month
I94CIT & I94RES : This format shows all the valid and invalid codes for processing
I94PORT : This format shows all the valid and invalid codes for processing
I94MODE : There are missing values as well as not reported (9)
I94BIR : Age of Respondent in Years
COUNT : Used for summary statistics
DTADFILE : Character Date Field - Date added to I-94 Files - CIC does not use
VISAPOST : Department of State where where Visa was issued - CIC does not use
OCCUP : Occupation that will be performed in U.S. - CIC does not use
ENTDEPA : Arrival Flag - admitted or paroled into the U.S. - CIC does not use
ENTDEPD : Departure Flag - Departed, lost I-94 or is deceased - CIC does not use
ENTDEPU : Update Flag - Either apprehended, overstayed, adjusted to perm residence - CIC does not use
MATFLAG : Match flag - Match of arrival and departure records
BIRYEAR : 4 digit year of birth
DTADDTO : Character Date Field - Date to which admitted to U.S. (allowed to stay un
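To make the parsing step easier to follow, here is a minimal sketch that applies the same regular expression to a few sample lines. The sample lines and the `parse_label` helper are hypothetical, written only to resemble the labels file; they are not taken from it.

```python
import re

# Same pattern as in the cell above: "/* CODE - description */"
LABEL_PATTERN = re.compile(r'^/\*\s+(?P<code>.+?)\s+-\s+(?P<description>.+)\s+\*/$')


def parse_label(line):
    """Return (code, description) for a matching comment line, else None."""
    match = LABEL_PATTERN.match(line)
    if match is None:
        return None
    return match.group("code"), match.group("description")


# Hypothetical sample lines, written to resemble the labels file
sample_lines = [
    "/* I94YR - 4 digit year */\n",
    "/* I94MON - Numeric month */\n",
    "value i94prtl\n",  # not a comment, so it yields None and is dropped
]

# filter(None, ...) drops the lines that did not match
labels = dict(filter(None, map(parse_label, sample_lines)))
print(labels)
```

Collecting the pairs into a dictionary, instead of only printing them, would make the descriptions reusable later, for example when writing the data dictionary.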

In [5]:
extract = Extract(spark, paths)

demographics = extract.get_cities_demographics()
airport_codes = extract.get_airports_codes()
immigration_data = extract.get_immigration()
temperature_data = extract.get_temperature_data()

### Print Schema of extracted data.

In [6]:
demographics.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



In [7]:
airport_codes.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



In [8]:
immigration_data.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [9]:
temperature_data.printSchema()

root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- AverageTemperatureUncertainty: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



### Step 2: Explore and Assess the Data

**Transformation Step**
<p>To clean the datasets, we import the Cleaner class from data_cleaning.py in the etl directory.</p>

**Main steps are:**
- Clean the demographics dataset: fill null values with 0, group by city and state, and pivot Race into separate columns.
- Clean the airports dataset: keep only US airports and discard anything that is not an airport. Extract ISO regions and cast elevation in feet to float.
- Clean the immigration dataset: rename columns to understandable names, convert dates to the correct format, and select only the important columns.
- Clean the temperature dataset: filter out rows with missing values and keep only rows for the United States.
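The date-formatting step can be illustrated with a small sketch. The raw schema stores `arrdate` and `depdate` as doubles, and SAS numeric dates count days from 1960-01-01. The `Cleaner` implementation is not shown in this notebook, so the helper below (`sas_days_to_date`, a hypothetical name) is only an assumption about how such a conversion could work, written in plain Python rather than Spark.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS numeric dates count days from this day


def sas_days_to_date(sas_days):
    """Convert a SAS numeric date (float or int, possibly None) to a date."""
    if sas_days is None:
        return None  # keep missing departure dates as missing
    return SAS_EPOCH + timedelta(days=int(sas_days))


print(sas_days_to_date(20574.0))  # 2016-04-30
```

In Spark, the same logic could be wrapped in a UDF or expressed with built-in date functions; which of these the project uses is not stated here.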

In [10]:
from etl.data_cleaning import Cleaner

In [11]:
demographics = Cleaner.get_cities_demographics(demographics)
airport_codes = Cleaner.get_airports(airport_codes)
immigration_data = Cleaner.get_immigration(immigration_data)
temperature_data = Cleaner.get_temperature(temperature_data)

### Top 5 rows of extracted data after the cleaning step

In [12]:
demographics.show(5)

+-------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+---------------------------------+-----+-------------------------+------------------+------+
|         City|         State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|American Indian and Alaska Native|Asian|Black or African-American|Hispanic or Latino| White|
+-------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+---------------------------------+-----+-------------------------+------------------+------+
|       Skokie|      Illinois|      43.4|          31382|            33437|           64819|              1066|       27424|                  2.78|        IL|                                0|20272|                     4937|              6590| 406

In [13]:
airport_codes.show(5)

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
| 00AA|small_airport|Aero B Ranch Airport|      3435.0|       NA|         US|        KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
| 00AK|small_airport|        Lowell Field|       450.0|       NA|         US|        AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|
| 00AL|small_airport|        Epps Airpark|       820.0|       NA|         US|        AL|     Harvest|    00AL|     null|      00AL|-86.7703018188476...|
| 00AS|small_airport|      Fulton Airport|      1100.0|       NA|         US|     

In [14]:
immigration_data.show(5)

+-------+--------+---------+--------+-------+--------+------+-------+--------------+-----+--------+--------+--------+------------------+---------------+----+-----+---------+---+-------+------------+--------------+
| cic_id|cod_port|cod_state|visapost|matflag| dtaddto|gender|airline|        admnum|fltno|visatype|cod_visa|cod_mode|cod_country_origin|cod_country_cit|year|month|bird_year|age|counter|arrival_date|departure_date|
+-------+--------+---------+--------+-------+--------+------+-------+--------------+-----+--------+--------+--------+------------------+---------------+----+-----+---------+---+-------+------------+--------------+
|5748517|     LOS|       CA|     SYD|      M|10292016|     F|     QF|9.495387003E10|00011|      B1|       1|       1|               438|            245|2016|    4|     1976| 40|      1|  2016-04-30|    2016-05-08|
|5748518|     LOS|       NV|     SYD|      M|10292016|     F|     VA|9.495562283E10|00007|      B1|       1|       1|               438|        

In [15]:
temperature_data.show(5)

+----------+------------------+-----------------------------+----------------+-------------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|            City|      Country|Latitude|Longitude|
+----------+------------------+-----------------------------+----------------+-------------+--------+---------+
|1743-11-01|             3.264|                        1.665|       Allentown|United States|  40.99N|   74.56W|
|1820-01-01|-5.303999999999999|                        2.795|          Pueblo|United States|  37.78N|  103.73W|
|1828-01-01|            -1.977|                        2.551|         Seattle|United States|  47.42N|  121.97W|
|1849-01-01|            13.116|           2.5860000000000003|    Garden Grove|United States|  32.95N|  117.77W|
|1849-01-01|            13.116|           2.5860000000000003|Huntington Beach|United States|  32.95N|  117.77W|
+----------+------------------+-----------------------------+----------------+-------------+--------+---

### Step 3: Define the Data Model
**3.1 Conceptual Data Model**
Map out the conceptual data model and explain why you chose that model

**Star Schema**
**Dimension Tables:**

- **demographics_dim**
  - State, **state_code**, Total_Population, Male_Population, Female_Population, American_Indian_and_Alaska_Native, Asian, Black_or_African-American, Hispanic_or_Latino, White, Male_Population_Ratio, Female_Population_Ratio, American_Indian_and_Alaska_Native_Ratio, Asian_Ratio, Black_or_African-American_Ratio, Hispanic_or_Latino_Ratio, White_Ratio.
- **airports_dim**
  - ident, type, name, elevation_ft, continent, iso_country, iso_region, municipality, gps_code, iata_code, **local_code**, coordinates.

- **temperature_dim**
    - dt, AverageTemperature, AverageTemperatureUncertainty, City, Country, Latitude, Longitude, **cod_port**

**Fact Table:**

- **immigration_fact**
    - **cic_id, cod_port, cod_state**, visapost, matflag, dtaddto, gender, airline, admnum, fltno, visatype, cod_visa, cod_mode, cod_country_origin, cod_country_cit, year, month, bird_year, age, counter, arrival_date, departure_date, arrival_year, arrival_month, arrival_day.

**3.2 Mapping Out Data Pipelines**

**There are two steps:**

- **Transform data**
  - Transform the demographics dataset by grouping by state and calculating the totals and ratios for every race in every state.
  - Transform the immigration dataset to split the arrival date into separate columns (year, month, day) for partitioning the dataset.

- **Generate Model (Star Schema):**

  - Create all dimensions in parquet.
  - Create the fact table in parquet, partitioned by year, month, and day of the arrival date.
  - Insert into the fact table only rows whose dimension keys exist in the dimension tables, for integrity and consistency.
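As an illustration of the demographics transformation described above, the following plain-Python sketch sums city-level rows per state and derives a ratio for each group. The input rows are made-up numbers, `aggregate_by_state` is a hypothetical helper, and the choice to express each ratio as a fraction of the state's total population is an assumption; the actual `Transformer` code may differ.

```python
from collections import defaultdict

# Hypothetical city-level rows (made-up numbers, not taken from the dataset)
city_rows = [
    {"State_code": "IL", "Total_Population": 1000, "Male_Population": 480, "Female_Population": 520},
    {"State_code": "IL", "Total_Population": 3000, "Male_Population": 1520, "Female_Population": 1480},
    {"State_code": "CA", "Total_Population": 2000, "Male_Population": 1000, "Female_Population": 1000},
]


def aggregate_by_state(rows, value_columns, total_column="Total_Population"):
    """Sum the columns per state, then add one <column>_Ratio per value column."""
    sums_by_state = defaultdict(lambda: defaultdict(float))
    for row in rows:
        state = row["State_code"]
        sums_by_state[state][total_column] += row[total_column]
        for col in value_columns:
            sums_by_state[state][col] += row[col]

    result = {}
    for state, sums in sums_by_state.items():
        record = dict(sums)
        total = sums[total_column]
        for col in value_columns:
            # Assumed definition: group count divided by the state's total population
            record[col + "_Ratio"] = sums[col] / total if total else 0.0
        result[state] = record
    return result


state_rows = aggregate_by_state(city_rows, ["Male_Population", "Female_Population"])
print(state_rows["IL"]["Male_Population_Ratio"])  # 0.5
```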

In [16]:
from etl.transform import Transformer

demographics = Transformer.demographics(demographics)
immigration_data = Transformer.immigrants(immigration_data)
temperature_data = Transformer.temperatue(temperature_data)

### Top 5 rows of extracted data after the transformation step

In [17]:
demographics.show(5)

+----------+--------------+----------------+---------------+-----------------+---------------------------------+------+-------------------------+------------------+-------+---------------------+-----------------------+---------------------------------------+-----------+-------------------------------+------------------------+-----------+
|State_code|         State|Total_Population|Male_Population|Female_Population|American_Indian_and_Alaska_Native| Asian|Black_or_African-American|Hispanic_or_Latino|  White|Male_Population_Ratio|Female_Population_Ratio|American_Indian_and_Alaska_Native_Ratio|Asian_Ratio|Black_or_African-American_Ratio|Hispanic_or_Latino_Ratio|White_Ratio|
+----------+--------------+----------------+---------------+-----------------+---------------------------------+------+-------------------------+------------------+-------+---------------------+-----------------------+---------------------------------------+-----------+-------------------------------+------------------

In [18]:
immigration_data.show(5)

+-------+--------+---------+--------+-------+--------+------+-------+--------------+-----+--------+--------+--------+------------------+---------------+----+-----+---------+---+-------+------------+--------------+------------+-------------+-----------+
| cic_id|cod_port|cod_state|visapost|matflag| dtaddto|gender|airline|        admnum|fltno|visatype|cod_visa|cod_mode|cod_country_origin|cod_country_cit|year|month|bird_year|age|counter|arrival_date|departure_date|arrival_year|arrival_month|arrival_day|
+-------+--------+---------+--------+-------+--------+------+-------+--------------+-----+--------+--------+--------+------------------+---------------+----+-----+---------+---+-------+------------+--------------+------------+-------------+-----------+
|5748517|     LOS|       CA|     SYD|      M|10292016|     F|     QF|9.495387003E10|00011|      B1|       1|       1|               438|            245|2016|    4|     1976| 40|      1|  2016-04-30|    2016-05-08|        2016|           04| 

In [19]:
temperature_data.show(5)

+----------+------------------+-----------------------------+-------+-------------+--------+---------+--------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|   City|      Country|Latitude|Longitude|cod_port|
+----------+------------------+-----------------------------+-------+-------------+--------+---------+--------+
|1828-01-01|            -1.977|                        2.551|Seattle|United States|  47.42N|  121.97W|     SEA|
|1849-01-01| 7.399999999999999|                        2.699|Ontario|United States|  34.56N|  116.76W|     ONT|
|1821-11-01|             2.322|                        2.375|Spokane|United States|  47.42N|  117.24W|     SPO|
|1835-01-01|             9.833|                        2.182|Nogales|United States|  31.35N|  111.20W|     NOG|
|1743-11-01| 8.129999999999999|                        2.245|Atlanta|United States|  34.56N|   83.68W|     ATL|
+----------+------------------+-----------------------------+-------+-------------+--------+---------+--

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [20]:
destination_paths = {
    'demographics': './output/dimensions/demographics.parquet',
    'airports': './output/dimensions/airports.parquet',
    'temperature': './output/dimensions/temperature.parquet',
    'facts': './output/fact/immigrations_fact.parquet'
}

In [21]:
from etl.models import Model

In [22]:
model = Model(spark, destination_paths)

model.modelize(immigration_data, demographics, airport_codes, temperature_data)

writing demographics parquet....
writing airports parquet....
writing temperature parquet....
writing facts parquet....


#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [23]:
from etl.quality_check import QualityCheck

In [24]:
checker = QualityCheck(spark, destination_paths)
immigration_fact = checker.get_facts()
dim_demographics, dim_airports, dim_temperature = checker.get_dimensions()

### validate that tables are not empty.

In [25]:
checker.row_count_check(dim_demographics)

True

In [26]:
checker.row_count_check(dim_airports)

True

In [27]:
checker.row_count_check(dim_temperature)

True

In [28]:
checker.row_count_check(immigration_fact)

True

### check integrity and consistency

In [29]:
checker.integrity_checker(immigration_fact, dim_demographics, dim_airports, dim_temperature)

True
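The idea behind an integrity check can be sketched in plain Python: every key value in the fact table should also appear in the matching dimension. The `QualityCheck` implementation is not shown here, so the helper `missing_keys`, the miniature tables, and the pairing of `cod_state` with `State_code` are all assumptions made for illustration.

```python
def missing_keys(fact_rows, dimension_rows, fact_key, dimension_key):
    """Return the fact key values that have no matching row in the dimension."""
    known = {row[dimension_key] for row in dimension_rows}
    return {row[fact_key] for row in fact_rows if row[fact_key] not in known}


# Hypothetical miniature tables
facts = [{"cic_id": 1, "cod_state": "CA"}, {"cic_id": 2, "cod_state": "NV"}]
demographics_dim = [{"State_code": "CA"}, {"State_code": "NV"}, {"State_code": "IL"}]

orphans = missing_keys(facts, demographics_dim, "cod_state", "State_code")
print(len(orphans) == 0)  # True when every fact row has a matching dimension row
```

In Spark, the equivalent check is commonly written as a `left_anti` join between the fact table and the dimension, followed by a count that is expected to be zero.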

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

In [30]:
dim_demographics.printSchema()

root
 |-- State_code: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Total_Population: double (nullable = true)
 |-- Male_Population: double (nullable = true)
 |-- Female_Population: double (nullable = true)
 |-- American_Indian_and_Alaska_Native: long (nullable = true)
 |-- Asian: long (nullable = true)
 |-- Black_or_African-American: long (nullable = true)
 |-- Hispanic_or_Latino: long (nullable = true)
 |-- White: long (nullable = true)
 |-- Male_Population_Ratio: double (nullable = true)
 |-- Female_Population_Ratio: double (nullable = true)
 |-- American_Indian_and_Alaska_Native_Ratio: double (nullable = true)
 |-- Asian_Ratio: double (nullable = true)
 |-- Black_or_African-American_Ratio: double (nullable = true)
 |-- Hispanic_or_Latino_Ratio: double (nullable = true)
 |-- White_Ratio: double (nullable = true)



In [31]:
dim_airports.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: float (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



In [32]:
dim_temperature.printSchema()

root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- AverageTemperatureUncertainty: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- cod_port: string (nullable = true)



In [33]:
immigration_fact.printSchema()

root
 |-- cic_id: integer (nullable = true)
 |-- cod_port: string (nullable = true)
 |-- cod_state: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)
 |-- cod_visa: integer (nullable = true)
 |-- cod_mode: integer (nullable = true)
 |-- cod_country_origin: integer (nullable = true)
 |-- cod_country_cit: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- bird_year: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- counter: integer (nullable = true)
 |-- arrival_date: date (nullable = true)
 |-- departure_date: date (nullable = true)
 |-- arrival_year: integer (nullable = true)
 |-- arrival_month: integer (nullable = true)
 |-- arrival_d

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

1. For this capstone project I used Apache Spark to process all the data and create the data model (star schema). I chose Spark because it scales to large volumes of data and the spark.sql library has many tools for transforming data. Data persisted in parquet files can scale to terabytes when best practices are followed.

2. The data should be updated every day. We can use Apache Airflow to ingest each day's data (by arrival date), because the fact table is partitioned by arrival date.
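To show why partitioning by arrival date suits a daily load, the sketch below builds the directory that one day's data would occupy. Spark's `partitionBy` writes one `column=value` directory per partition value. The helper `partition_path` is hypothetical, and the exact value formatting (for example `4` versus `04` for the month) depends on how the columns were written, so treat the output as illustrative.

```python
from datetime import date


def partition_path(base_path, arrival):
    """Build the column=value directory path for one arrival date."""
    return (
        f"{base_path}/arrival_year={arrival.year}"
        f"/arrival_month={arrival.month}"
        f"/arrival_day={arrival.day}"
    )


daily_path = partition_path("./output/fact/immigrations_fact.parquet", date(2016, 4, 30))
print(daily_path)
```

A daily scheduled task could then write or overwrite only that directory, leaving the partitions for other days untouched.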

Under the following scenarios, I would approach the problem differently:

If the data was increased by 100x, Spark could still handle it: processing is distributed, so the same pipeline can run on a larger cluster.

To update the dashboard on a daily basis, I would use Apache Airflow to schedule a run that updates all the data before 7am.

If the data needs to be accessed by 100+ people, we can store it in S3 and make it accessible through AWS Redshift or Spark SQL.