# US National Tourism Data Warehouse from Scratch

### Project Summary

This project was built to provide comprehensive, analysis-ready information about US immigration data. The goal is to build a data warehouse from scratch with clean, structured data to which you can apply machine learning models and BI tools, for example to find out which visa type each immigrant holds, which states or ports of entry receive the most immigrants, or the average length of stay in the US.

With this information, you can run an exhaustive analysis of, for example, which state receives the most immigrants, which season is best for travelers based on each state's temperatures, how many people live in each state or city, and which airports each state has. You can even apply machine learning models and use this information to promote events or hotel room deals.

#### Project Life Cycle 

![img](datasets/cycle_1.png)

In [1]:
import pandas as pd
import numpy as np
import os
import psycopg2
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T

# PostgreSQL connection settings
# Writing over JDBC requires the PostgreSQL JDBC driver: download the jar from https://jdbc.postgresql.org/download.html and copy it into PySpark's jars directory.
url_db = "jdbc:postgresql://127.0.0.1:5432/imm_dwh"
properties = {"user": "student", "password": "student", "driver": "org.postgresql.Driver"}

## Introduction

The scope of this project is to gather valuable information about immigrants and build a structured data warehouse that supports business analytics and machine learning models.

All tables were designed to be joined with each other on the state column. This means that you can select all immigrants for New York State and, at the same time, find out which airports are available in that state or what the average temperature was on a particular day in New York.
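
A minimal sketch of that kind of query, using Python's built-in `sqlite3` module as a stand-in for Postgres. The column names `state_addr` (fact table) and `state` (`dim_airport`) follow the tables described below; the rows are hypothetical toy data, not taken from the datasets.

```python
import sqlite3

# In-memory SQLite database standing in for Postgres; all rows are hypothetical
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE immigration_us (id INTEGER, state_addr TEXT)")
conn.execute("CREATE TABLE dim_airport (id_airport TEXT, name TEXT, state TEXT)")
conn.executemany("INSERT INTO immigration_us VALUES (?, ?)",
                 [(1, "NY"), (2, "NY"), (3, "CA")])
conn.executemany("INSERT INTO dim_airport VALUES (?, ?, ?)",
                 [("KJFK", "John F Kennedy International Airport", "NY"),
                  ("KLGA", "La Guardia Airport", "NY"),
                  ("KLAX", "Los Angeles International Airport", "CA")])

# Immigrants heading to New York, together with the airports available there
ny_rows = conn.execute("""
    SELECT i.id, a.id_airport
    FROM immigration_us AS i
    JOIN dim_airport AS a ON a.state = i.state_addr
    WHERE i.state_addr = 'NY'
    ORDER BY i.id, a.id_airport
""").fetchall()
print(ny_rows)  # [(1, 'KJFK'), (1, 'KLGA'), (2, 'KJFK'), (2, 'KLGA')]
```

Joining on state produces one row per immigrant and airport pair, so for real analyses you will usually aggregate (for example, count immigrants per state) before or after the join.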

To build this project we're going to use Spark and Postgres. 

Spark shines with large datasets such as the US National Immigration data, which is why we use Spark DataFrames to build all tables. The other reason is that it connects to Postgres very easily.
Spark can write directly into Postgres without creating the tables beforehand; if a table does not exist, Spark creates it. To replace the existing data in a dimension table before inserting, specify "overwrite" mode in the write statement; to add rows to the existing data, specify "append" mode.

Postgres lets us build the data warehouse as simply as possible, and it makes a future migration to Amazon Redshift easier, since Amazon Redshift is based on PostgreSQL 8.0.2.

## Datasets

List of datasets used in this project: 

- I94 Immigration Data: This data comes from the US National Tourism and Trade Office. [This dataset](https://travel.trade.gov/research/reports/i94/historical/2016.html) contains information about immigrants who travel to the US, such as port of entry, airline, flight number, visa type, date of entry, and the date until which they are allowed to stay in the US.

- Historical Hourly Weather Data: This dataset contains more than 5 years of hourly weather information with various attributes, such as temperature, humidity, and air pressure. [Dataset link](https://www.kaggle.com/selfishgene/historical-hourly-weather-data)

- U.S. City Demographic Data: This dataset, from OpenDataSoft, provides the total population of US cities and states. You can download the entire dataset here: [link](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)

- Airport Code Table: Airport codes covering the airports of every city and state of the US (the table also contains non-US airports, which we filter out later). Link to download: [here](https://datahub.io/core/airport-codes#data)



### Dimension Tables

- `dim_us_city`
- `dim_city_temp`
- `dim_airport`
- `dim_country`

### Fact Table

- `immigration_us`

### Database Model


![title](datasets/model.png)


## Getting Started

To execute this project, we have two options.
The first is to execute this notebook cell by cell. The second is to execute a series of scripts that accomplish the same as this notebook, but without the exploratory analysis done here.

### Jupyter Notebook option 

- This option is pretty straightforward: we execute this notebook cell by cell, but first we need to run a script that creates the database and tables for this project.
I recommend following this notebook, because it explains why we chose certain cleaning methods and how we handled null values, columns, pivoting, etc.
This will give you more insight into how we approached every step of this project.

1. Execute `create_tables.py`: This file creates a database called `imm_dwh` and all tables used in this project. 
2. After that, we can execute all cells of this notebook without a problem.

### Python script option

- As mentioned before, we can accomplish the same goal by executing a series of Python files. The only difference from the Jupyter notebook option is that the scripts skip the analysis steps, such as printing a DataFrame after every cleaning step.

1. Execute `create_tables.py`: This file creates a database called `imm_dwh` and all tables used in this project. 
2. Execute `etl.py`: This file executes the ETL pipeline to build and write into all tables of the Datawarehouse. 

### Additional Information

- Data Dictionary: The file `data_dictionary.md` (best viewed on GitHub) contains a dictionary explaining every column of this data warehouse.
- SQL Queries: All queries, such as the CREATE TABLE and SELECT statements, are in the file `sql_queries.py`.
- AWS Credentials: AWS credentials are already set up in `etl.py` in case we want to migrate this project to the cloud.


## 1. Dimension Table: `dim_us_city` 

Let's create a Spark session and clean and transform the dataset; after that, we can write the resulting DataFrame to the database.

In [2]:
# Create a Spark Session 
spark = SparkSession.builder\
    .appName('national_tourism')\
    .getOrCreate()

In [3]:
us_city_path  = os.getcwd() + '/datasets/us-cities-demographics.csv'
df_city = spark.read.format("csv").option("header", "true").option("delimiter", ";").load(us_city_path)
df_city.limit(5).toPandas()

|   | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601 | 41862 | 82463 | 1562 | 30908 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129 | 49500 | 93629 | 4147 | 32935 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040 | 46799 | 84839 | 4819 | 8229 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127 | 87105 | 175232 | 5821 | 33878 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040 | 143873 | 281913 | 5829 | 86253 | 2.73 | NJ | White | 76402 |


For the purpose of this project, we select only a few columns and transform some others. The source has one row per city and race, with the city-level population figures repeated on each row, so selecting the city-level columns and dropping duplicates leaves one row per city. There's no need to sum the male and female populations because the source already provides the total. You can verify this by picking a random city and comparing it with https://suburbanstats.org/

Some columns, such as race, number of veterans, and average household size, are discarded because they don't fit the purpose of the project.
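
To see why selecting the city-level columns and dropping duplicates works like grouping by city, here is a plain-Python sketch of the same idea. The rows are hypothetical and only mimic the source layout (one row per city and race).

```python
# Hypothetical rows mimicking the source: one row per (city, race), with the
# city-level population columns repeated on every row of the same city
sample_rows = [
    {"City": "Quincy", "State": "Massachusetts", "Total Population": "93629", "Race": "White"},
    {"City": "Quincy", "State": "Massachusetts", "Total Population": "93629", "Race": "Asian"},
    {"City": "Hoover", "State": "Alabama", "Total Population": "84839", "Race": "Asian"},
    {"City": "Hoover", "State": "Alabama", "Total Population": "84839", "Race": "White"},
]
city_columns = ["City", "State", "Total Population"]


def distinct_projection(rows, columns):
    """Keep only `columns` and drop duplicate rows, preserving first-seen order
    (Spark's dropDuplicates() makes no ordering guarantee)."""
    seen = set()
    result = []
    for row in rows:
        key = tuple(row[c] for c in columns)
        if key not in seen:
            seen.add(key)
            result.append(key)
    return result


print(distinct_projection(sample_rows, city_columns))
# [('Quincy', 'Massachusetts', '93629'), ('Hoover', 'Alabama', '84839')]
```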


Columns of `dim_us_city`: 
* __id_city__: Serial, primary key
* __city__: Name of the city 
* __state__: Name of the State of US without abbreviation
* __male_population__: Male total population for a city
* __female_population__: Female total population for a city
* __total_population__: Male + Female population
* __state_prefix__: Abbreviation of State name, more information [here](https://en.wikipedia.org/wiki/List_of_U.S._state_abbreviations)


In [4]:
columns  = ['City', 'State', 'Male Population', 'Female Population', 'Total Population']
new_columns = [column.replace(" ", "_").lower() for column in columns]
df_city = df_city.select(*columns).dropDuplicates().toDF(*new_columns)
df_city.limit(5).toPandas()

|   | city | state | male_population | female_population | total_population |
|---|---|---|---|---|---|
| 0 | Saint Cloud | Minnesota | 34311 | 33942 | 68253 |
| 1 | West Covina | California | 51629 | 56860 | 108489 |
| 2 | Saint Joseph | Missouri | 37688 | 38408 | 76096 |
| 3 | Troy | Michigan | 42371 | 40905 | 83276 |
| 4 | Fayetteville | Arkansas | 41959 | 40873 | 82832 |


### Data Cleaning for `dim_us_city` table

First of all, it's good practice to check the dataset for null values.

In [7]:
df_city.filter(F.col("male_population").isNull() | F.col("female_population").isNull()).toPandas()

|   | city | state | male_population | female_population | total_population |
|---|---|---|---|---|---|
| 0 | The Villages | Florida |  |  | 72590 |


In [8]:
df_city.filter(F.col("city").isNull() | F.col("state").isNull()).toPandas()

|   | city | state | male_population | female_population | total_population |
|---|---|---|---|---|---|


In [10]:
df_city.filter(F.col("total_population").isNull()).toPandas()

|   | city | state | male_population | female_population | total_population |
|---|---|---|---|---|---|


The columns `male_population` and `female_population` contain null values (only for The Villages, Florida).

Let's replace those null values with 0 using the `fillna` method.

In [5]:
df_cityFill = df_city.fillna({'male_population': 0, 'female_population': 0})
df_cityFill.filter('city = "The Villages"').toPandas()

|   | city | state | male_population | female_population | total_population |
|---|---|---|---|---|---|
| 0 | The Villages | Florida | 0 | 0 | 72590 |


As described before, the entire data warehouse is designed to be joined on the state prefix column. That's why every table needs a state prefix column.

Let's add a state prefix column to this DataFrame.

In [6]:
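from datasets.data import Data
from pyspark.sql.functions import udf

@udf
def state_prefix(name):
    # Return the abbreviation whose full name matches, or None if there is no match
    # (indexing [0] on an empty list would raise an IndexError and fail the job)
    return next((key for key, value in Data.states.items() if value == name), None)


df_cityState = df_cityFill.withColumn("state_prefix", state_prefix(df_cityFill.state))
df_cityState.limit(5).toPandas()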

|   | city | state | male_population | female_population | total_population | state_prefix |
|---|---|---|---|---|---|---|
| 0 | Saint Cloud | Minnesota | 34311 | 33942 | 68253 | MN |
| 1 | West Covina | California | 51629 | 56860 | 108489 | CA |
| 2 | Saint Joseph | Missouri | 37688 | 38408 | 76096 | MO |
| 3 | Troy | Michigan | 42371 | 40905 | 83276 | MI |
| 4 | Fayetteville | Arkansas | 41959 | 40873 | 82832 | AR |
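
The UDF above scans `Data.states` once per row. A minimal alternative sketch, assuming `Data.states` maps abbreviations to full state names (which is what the lookup implies), inverts the dictionary once and uses `dict.get`, so unknown names yield `None` instead of an exception. The three-entry `states_excerpt` dict and the function name `lookup_state_prefix` are hypothetical.

```python
# Hypothetical excerpt; assumes Data.states maps abbreviation -> full state name
states_excerpt = {"MN": "Minnesota", "CA": "California", "MO": "Missouri"}

# Invert the mapping once: full state name -> abbreviation
name_to_prefix = {name: prefix for prefix, name in states_excerpt.items()}


def lookup_state_prefix(name):
    """Return the abbreviation for a full state name, or None if it is unknown."""
    return name_to_prefix.get(name)


print(lookup_state_prefix("California"))  # CA
print(lookup_state_prefix("Atlantis"))    # None
```

In Spark, the inverted dictionary could also be turned into a map column with `F.create_map` to avoid a Python UDF; that is an option, not what this notebook does.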


Check the new column for null values (state names with no match)

In [13]:
df_cityState.filter(F.col("state_prefix").isNull()).toPandas()

|   | city | state | male_population | female_population | total_population | state_prefix |
|---|---|---|---|---|---|---|


Now that we have cleaned the DataFrame and added the new column, we can write the data to the database.

Write DataFrame into Postgres

In [7]:
# uncomment to execute
# df_cityState.write.mode("append").jdbc(url=url_db, table="dim_us_city", properties=properties)

Collect the distinct (state prefix, city) pairs into a Python list. We'll use it to add a state column to `dim_city_temp`, the next dimension table.

In [8]:
state_city = df_cityState.select("state_prefix", "city").dropDuplicates().rdd.map(lambda x: (x[0],x[1])).collect()
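
Note that a lookup keyed only by city name can be ambiguous: some city names exist in several states (Portland, for example, is a city in both Oregon and Maine). The sketch below, with hypothetical pairs shaped like `state_city`, builds a city-to-state dictionary that keeps the first match, the same behavior as the linear-scan `state` UDF used later in this notebook. Because the order of a collected Spark result is not guaranteed, which state counts as "first" can vary between runs.

```python
# Hypothetical (state_prefix, city) pairs, shaped like the state_city list above
sample_pairs = [("OR", "Portland"), ("ME", "Portland"), ("GA", "Atlanta")]

city_to_state = {}
for prefix, city in sample_pairs:
    # setdefault keeps the first state seen for each city, like the linear-scan UDF
    city_to_state.setdefault(city, prefix)

print(city_to_state)  # {'Portland': 'OR', 'Atlanta': 'GA'}
```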

## 2. Dimension Table: `dim_city_temp` 

Our main dataset (immigration) only covers the United States, so we keep only US cities from this dataset. Because the original CSV is pivoted (one column per city), this means selecting the US city columns rather than filtering rows.

Columns of `dim_city_temp`: 

* __id_weather__: Serial, primary key
* __datetime__: Date column in format YYYY-MM-DD  
* __city__: City of US
* __temp__: Average temperature (°F) for a US city, aggregated by day and city.
* __state_prefix__: Abbreviation of State name, more information [here](https://en.wikipedia.org/wiki/List_of_U.S._state_abbreviations)

Build a list of US cities from the dataset's `city_attributes.csv` file.

In [5]:
df_cities  = pd.read_csv(os.getcwd() + "/datasets/historical-hourly-weather-data/city_attributes.csv")
us_cities = df_cities[df_cities.Country == "United States"]['City'].values.tolist()

Load the temperature data into a Spark DataFrame and select only the columns for US cities.

In [6]:
# path dataset
path_temperature = os.getcwd() + "/datasets/historical-hourly-weather-data/temperature.csv"
# dataframe
df = spark.read.format("csv").option("header", "true").load(path_temperature)
df_city = df.select('datetime', *us_cities)
df_city.limit(5).toPandas()

|   | datetime | Portland | San Francisco | Seattle | Los Angeles | San Diego | Las Vegas | Phoenix | Albuquerque | Denver | ... | Indianapolis | Atlanta | Detroit | Jacksonville | Charlotte | Miami | Pittsburgh | Philadelphia | New York | Boston |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2012-10-01 12:00:00 |  |  |  |  |  |  |  |  |  | ... |  |  |  |  |  |  |  |  |  |  |
| 1 | 2012-10-01 13:00:00 | 282.08 | 289.48 | 281.8 | 291.87 | 291.53 | 293.41 | 296.6 | 285.12 | 284.61 | ... | 283.85 | 294.03 | 284.03 | 298.17 | 288.65 | 299.72 | 281.0 | 285.63 | 288.22 | 287.17 |
| 2 | 2012-10-01 14:00:00 | 282.083251974 | 289.474992813 | 281.797216632 | 291.868185522 | 291.533500952 | 293.403141271 | 296.608508543 | 285.154558187 | 284.607305531 | ... | 283.889393939 | 294.03534141 | 284.069789234 | 298.205229759 | 288.650172214 | 299.732517698 | 281.024767377 | 285.663207797 | 288.24767617 | 287.186092094 |
| 3 | 2012-10-01 15:00:00 | 282.091866475 | 289.460618112 | 281.789832606 | 291.862844459 | 291.543355079 | 293.392177052 | 296.631487354 | 285.233951595 | 284.5999178 | ... | 283.941919192 | 294.049702185 | 284.173964682 | 298.299595186 | 288.650581705 | 299.76657946 | 281.088318736 | 285.756824139 | 288.326939663 | 287.23167159 |
| 4 | 2012-10-01 16:00:00 | 282.100480976 | 289.446243412 | 281.78244858 | 291.857503395 | 291.553209206 | 293.381212832 | 296.654466164 | 285.313345004 | 284.59253007 | ... | 283.994444444 | 294.064062959 | 284.278140131 | 298.393960613 | 288.650991196 | 299.800641223 | 281.151870096 | 285.85044048 | 288.406203155 | 287.277251086 |


The dataset has one column per city, which is not a convenient shape for this data. That's why we unpivot the DataFrame into two new columns, `City` and `Temp`: `City` holds the name of each original city column, and `Temp` holds that city's temperature for every hour.

Unpivot Dataframe

In [9]:
# replace spaces in columns
df_newcolumn = df_city.toDF(*[column.replace(" ", "") for column in df_city.columns])
# unpivot 
stack_statement = "stack(27, 'Portland', Portland, 'SanFrancisco', SanFrancisco, 'Seattle', Seattle, 'LosAngeles', LosAngeles, 'SanDiego', SanDiego, 'LasVegas', LasVegas, 'Phoenix', Phoenix, 'Albuquerque', Albuquerque, 'Denver', Denver, 'SanAntonio', SanAntonio, 'Dallas', Dallas, 'Houston', Houston, 'KansasCity', KansasCity, 'Minneapolis', Minneapolis, 'SaintLouis', SaintLouis, 'Chicago', Chicago, 'Nashville', Nashville, 'Indianapolis', Indianapolis, 'Atlanta', Atlanta, 'Detroit', Detroit, 'Jacksonville', Jacksonville, 'Charlotte', Charlotte, 'Miami', Miami, 'Pittsburgh', Pittsburgh, 'Philadelphia', Philadelphia, 'NewYork', NewYork, 'Boston', Boston) as (City, Temp)"
df_weather = df_newcolumn.selectExpr("Datetime", stack_statement).where("Temp is not null")
df_weather.limit(5).toPandas()

|   | Datetime | City | Temp |
|---|---|---|---|
| 0 | 2012-10-01 13:00:00 | Portland | 282.08 |
| 1 | 2012-10-01 13:00:00 | SanFrancisco | 289.48 |
| 2 | 2012-10-01 13:00:00 | Seattle | 281.8 |
| 3 | 2012-10-01 13:00:00 | LosAngeles | 291.87 |
| 4 | 2012-10-01 13:00:00 | SanDiego | 291.53 |
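
The `stack(27, ...)` expression above is written out by hand. A small helper, shown here as a hypothetical sketch (`build_stack_expr` is not part of the project), can generate it from the list of city names instead, applying the same space removal as the `toDF` rename:

```python
def build_stack_expr(cities, name_col="City", value_col="Temp"):
    """Build a Spark SQL stack() expression that unpivots one column per city
    into (name_col, value_col) rows. Spaces are removed from the column names,
    matching the toDF(...) rename above."""
    cols = [city.replace(" ", "") for city in cities]
    pairs = ", ".join(f"'{c}', {c}" for c in cols)
    return f"stack({len(cols)}, {pairs}) as ({name_col}, {value_col})"


print(build_stack_expr(["Portland", "San Francisco"]))
# stack(2, 'Portland', Portland, 'SanFrancisco', SanFrancisco) as (City, Temp)
```

With the `us_cities` list from earlier, `df_newcolumn.selectExpr("Datetime", build_stack_expr(us_cities))` should produce the same unpivot; the order of the cities only affects row order, not the result.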


We don't need the temperature split by hour and city; instead, we group the data by day and city and calculate the average temperature for each day. Let's do that.

Convert the timestamps to dates (YYYY-MM-DD) and order by date and city

In [10]:
from dateutil.parser import parse

datetime_udf = F.udf(lambda x: parse(x), T.DateType())

df_weatherDate = df_weather.withColumn("Datetime", datetime_udf(df_weather.Datetime))\
                .orderBy("Datetime", "City")
df_weatherDate.limit(5).toPandas()

|   | Datetime | City | Temp |
|---|---|---|---|
| 0 | 2012-10-01 | Albuquerque | 285.154558187 |
| 1 | 2012-10-01 | Albuquerque | 285.63091864 |
| 2 | 2012-10-01 | Albuquerque | 285.392738413 |
| 3 | 2012-10-01 | Albuquerque | 285.313345004 |
| 4 | 2012-10-01 | Albuquerque | 285.472131822 |


Average temperature grouped by day (YYYY-MM-DD) and city

In [11]:
df_avg_weather =  df_weatherDate.groupBy("Datetime", "City").agg({"Temp": "avg"})
df_avg_weather.limit(5).toPandas()

|   | Datetime | City | avg(Temp) |
|---|---|---|---|
| 0 | 2012-10-01 | Albuquerque | 285.476208 |
| 1 | 2012-10-01 | Atlanta | 294.093604 |
| 2 | 2012-10-01 | Boston | 287.371091 |
| 3 | 2012-10-01 | Charlotte | 288.651832 |
| 4 | 2012-10-01 | Chicago | 284.552669 |
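
For reference, here is the same daily average computed in plain Python over a few hypothetical hourly readings. Like Spark's `avg`, it averages the readings per (day, city) key; null temperatures were already removed by the `Temp is not null` filter, so none appear here.

```python
from collections import defaultdict
from datetime import date, datetime

# Hypothetical hourly readings in Kelvin: (timestamp, city, temperature)
readings = [
    ("2012-10-01 13:00:00", "Boston", 287.0),
    ("2012-10-01 14:00:00", "Boston", 288.0),
    ("2012-10-02 13:00:00", "Boston", 285.0),
    ("2012-10-01 13:00:00", "Atlanta", 294.0),
]

# (day, city) -> [running sum, number of readings]
totals = defaultdict(lambda: [0.0, 0])
for ts, city, temp in readings:
    day = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S").date()
    totals[(day, city)][0] += temp
    totals[(day, city)][1] += 1

daily_avg = {key: total / count for key, (total, count) in totals.items()}
print(daily_avg[(date(2012, 10, 1), "Boston")])  # 287.5
```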


Restore the original city names (with spaces between words)

In [40]:
from pyspark.sql.functions import udf

replace_cities = {
'SanFrancisco': 'San Francisco',
'LosAngeles': 'Los Angeles',
'SanDiego': 'San Diego',
'LasVegas': 'Las Vegas', 
'SanAntonio': 'San Antonio',
'KansasCity': 'Kansas City',
'SaintLouis': 'Saint Louis',
'NewYork': 'New York',
} 

@udf
def replace_city(name):
    # Map the space-less column name back to the original city name; other names are returned unchanged
    return replace_cities.get(name, name)


df_weatherReplace = df_avg_weather.withColumn('City', replace_city(df_avg_weather.City))
df_weatherReplace.limit(5).toPandas()

|   | Datetime | City | avg(Temp) |
|---|---|---|---|
| 0 | 2012-10-01 | Albuquerque | 285.476208 |
| 1 | 2012-10-01 | Atlanta | 294.093604 |
| 2 | 2012-10-01 | Boston | 287.371091 |
| 3 | 2012-10-01 | Charlotte | 288.651832 |
| 4 | 2012-10-01 | Chicago | 284.552669 |


Let's add the state prefix column to the DataFrame

In [41]:
@udf
def state(string):
    for value in state_city:
        if string == value[1]:
            return value[0] # value[0] is equal to state prefix
    return None # if there's no match return None 
    
df_weatherState = df_weatherReplace.withColumn('State', state(df_weatherReplace.City))
df_weatherState.limit(5).toPandas()

|   | Datetime | City | avg(Temp) | State |
|---|---|---|---|---|
| 0 | 2012-10-01 | Albuquerque | 285.476208 | NM |
| 1 | 2012-10-01 | Atlanta | 294.093604 | GA |
| 2 | 2012-10-01 | Boston | 287.371091 | MA |
| 3 | 2012-10-01 | Charlotte | 288.651832 | NC |
| 4 | 2012-10-01 | Chicago | 284.552669 | IL |


Write the DataFrame to Parquet files, partitioned by state

In [43]:
df_weatherState.withColumnRenamed("avg(Temp)", "Temp")\
                    .write.partitionBy("State")\
                    .parquet("weather.parquet")

In [9]:
df_weatherParquet = spark.read.parquet("weather.parquet")
df_weatherParquet.limit(5).toPandas()

|   | Datetime | City | Temp | State |
|---|---|---|---|---|
| 0 | 2015-01-31 | San Diego | 283.752563 | CA |
| 1 | 2015-01-31 | San Francisco | 286.552042 | CA |
| 2 | 2015-02-01 | Los Angeles | 284.660312 | CA |
| 3 | 2015-02-01 | San Diego | 284.296771 | CA |
| 4 | 2015-02-01 | San Francisco | 285.360312 | CA |


Convert the temperature from Kelvin to Fahrenheit with the formula °F = (K - 273.15) * 1.8 + 32 (see https://www.metric-conversions.org/temperature/kelvin-to-fahrenheit.htm).

In [10]:
# Convert Kelvin to Fahrenheit with a native column expression, rounded to three decimals (the result stays numeric)
df_weatherFahrenheit = df_weatherParquet.withColumn("Temp", F.round((F.col("Temp") - 273.15) * 1.8 + 32.0, 3))
df_weatherFahrenheit.limit(5).toPandas()

|   | Datetime | City | Temp | State |
|---|---|---|---|---|
| 0 | 2015-01-31 | San Diego | 51.085 | CA |
| 1 | 2015-01-31 | San Francisco | 56.124 | CA |
| 2 | 2015-02-01 | Los Angeles | 52.719 | CA |
| 3 | 2015-02-01 | San Diego | 52.064 | CA |
| 4 | 2015-02-01 | San Francisco | 53.979 | CA |
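
A quick plain-Python check of the Kelvin-to-Fahrenheit formula, rounded to three decimals. It reproduces the first rows of the output above (283.752563 K becomes 51.085 °F and 286.552042 K becomes 56.124 °F).

```python
def kelvin_to_fahrenheit(kelvin, digits=3):
    """Convert Kelvin to Fahrenheit, F = (K - 273.15) * 1.8 + 32, rounded to `digits`."""
    return round((kelvin - 273.15) * 1.8 + 32.0, digits)


print(kelvin_to_fahrenheit(283.752563))  # 51.085
print(kelvin_to_fahrenheit(286.552042))  # 56.124
```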


Our DataFrame is almost ready; all we have to do now is convert every column name to lowercase, since Postgres folds unquoted identifiers to lowercase.

Convert column names to lowercase

In [12]:
columns = [column.lower() for column in df_weatherFahrenheit.columns]
df_weatherLower = df_weatherFahrenheit.toDF(*columns)
df_weatherLower.limit(5).toPandas()

|   | datetime | city | temp | state |
|---|---|---|---|---|
| 0 | 2015-01-31 | San Diego | 51.085 | CA |
| 1 | 2015-01-31 | San Francisco | 56.124 | CA |
| 2 | 2015-02-01 | Los Angeles | 52.719 | CA |
| 3 | 2015-02-01 | San Diego | 52.064 | CA |
| 4 | 2015-02-01 | San Francisco | 53.979 | CA |


The DataFrame looks good. Now we can write the data to Postgres.

Write into Postgres

In [14]:
# uncomment to execute (writes the DataFrame with lowercase column names)
# df_weatherLower.write.mode("append").jdbc(url=url_db, table="dim_us_weather", properties=properties)

## 3. Dimension Table: `dim_airport` 

This dataset contains airport codes, types, and locations, including every airport in the cities of the US.
For the purpose of this project, we only need a few columns, such as type, country, region, and municipality.

Columns for `dim_airport`: 
- __id_airport__: ID code for each US airport
- __type__: Type of airport, like small airport, large airport, heliport, etc.
- __name__: Name of the airport
- __country__: Country where the airport is located
- __state__: State where the airport is located
- __city__: City where the airport is located

First, read the dataset and analyze which columns we have. 

In [15]:
df = spark.read.format("csv").option("header", "true").load(os.getcwd() + "/datasets/airport-codes_csv.csv")
df.limit(5).toPandas()

|   | ident | type | name | elevation_ft | continent | iso_country | iso_region | municipality | gps_code | iata_code | local_code | coordinates |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 00A | heliport | Total Rf Heliport | 11 |  | US | US-PA | Bensalem | 00A |  | 00A | -74.93360137939453, 40.07080078125 |
| 1 | 00AA | small_airport | Aero B Ranch Airport | 3435 |  | US | US-KS | Leoti | 00AA |  | 00AA | -101.473911, 38.704022 |
| 2 | 00AK | small_airport | Lowell Field | 450 |  | US | US-AK | Anchor Point | 00AK |  | 00AK | -151.695999146, 59.94919968 |
| 3 | 00AL | small_airport | Epps Airpark | 820 |  | US | US-AL | Harvest | 00AL |  | 00AL | -86.77030181884766, 34.86479949951172 |
| 4 | 00AR | closed | Newport Hospital & Clinic Heliport | 237 |  | US | US-AR | Newport |  |  |  | -91.254898, 35.6087 |


Since our main dataset is about US immigration, the `dim_airport` table will only contain US airports.
We filter this DataFrame by `iso_country = 'US'` and drop rows without a municipality.

In [16]:
df_airport = df.filter('iso_country = "US" and municipality is not null').select('ident', 'type', 'name', 'iso_country', 'iso_region', 'municipality')
df_airport.limit(5).toPandas()

|   | ident | type | name | iso_country | iso_region | municipality |
|---|---|---|---|---|---|---|
| 0 | 00A | heliport | Total Rf Heliport | US | US-PA | Bensalem |
| 1 | 00AA | small_airport | Aero B Ranch Airport | US | US-KS | Leoti |
| 2 | 00AK | small_airport | Lowell Field | US | US-AK | Anchor Point |
| 3 | 00AL | small_airport | Epps Airpark | US | US-AL | Harvest |
| 4 | 00AR | closed | Newport Hospital & Clinic Heliport | US | US-AR | Newport |


Derive the state from `iso_region` (e.g. `US-PA` becomes `PA`) and rename the column to `state`

In [17]:
udf_state = F.udf(lambda x: x[3:] if x else None)  # 'US-PA' -> 'PA'; nulls stay null
df_airport_state = df_airport.withColumn('iso_region', udf_state(df_airport.iso_region))
df_airport_state = df_airport_state.withColumnRenamed('iso_region', 'state')
df_airport_state.limit(5).toPandas()

|   | ident | type | name | iso_country | state | municipality |
|---|---|---|---|---|---|---|
| 0 | 00A | heliport | Total Rf Heliport | US | PA | Bensalem |
| 1 | 00AA | small_airport | Aero B Ranch Airport | US | KS | Leoti |
| 2 | 00AK | small_airport | Lowell Field | US | AK | Anchor Point |
| 3 | 00AL | small_airport | Epps Airpark | US | AL | Harvest |
| 4 | 00AR | closed | Newport Hospital & Clinic Heliport | US | AR | Newport |


Rename the columns

In [18]:
columns = ['id_airport', 'type', 'name', 'country', 'state', 'city']
df_airportNew = df_airport_state.toDF(*columns)
df_airportNew.limit(5).toPandas()

|   | id_airport | type | name | country | state | city |
|---|---|---|---|---|---|---|
| 0 | 00A | heliport | Total Rf Heliport | US | PA | Bensalem |
| 1 | 00AA | small_airport | Aero B Ranch Airport | US | KS | Leoti |
| 2 | 00AK | small_airport | Lowell Field | US | AK | Anchor Point |
| 3 | 00AL | small_airport | Epps Airpark | US | AL | Harvest |
| 4 | 00AR | closed | Newport Hospital & Clinic Heliport | US | AR | Newport |


Write into Postgres

In [19]:
# uncomment to execute
# df_airportNew.write.mode("overwrite").jdbc(url=url_db, table="dim_airport", properties=properties)


## 4. Dimension Table: `dim_country` 

This table resolves the `citizen` and `resident` country codes in the `immigration_us` fact table.

Its data comes from the `I94_SAS_Labels_Descriptions.SAS` file, which serves as the data dictionary for the main dataset.

We created a `.py` file with all the key/value pairs for these columns; it can be found in `datasets/data.py`.

Columns for `dim_country`: 
- __id_country__: Country ID from the US immigration data dictionary. It corresponds to the `citizen` and `resident` columns of each immigrant in the main dataset.
- __country__: Name of the country where the immigrant is a citizen or resident.


In [20]:
from datasets.data import Data

columns = ['id_country', 'country']

df_country = spark.createDataFrame([(key,value) for key, value in Data.countries.items()], schema=columns)
df_country.limit(5).toPandas()

|   | id_country | country |
|---|---|---|
| 0 | 582 | MEXICO |
| 1 | 236 | AFGHANISTAN |
| 2 | 101 | ALBANIA |
| 3 | 316 | ALGERIA |
| 4 | 102 | ANDORRA |


Write into Postgres

In [21]:
# uncomment to execute
# df_country.write.mode("overwrite").jdbc(url=url_db, table="dim_country", properties=properties)

## 5. US Immigration Fact Table

This is the main table. As mentioned before, the dimension tables can be matched with our fact table (`immigration_us`) by state prefix, while `dim_country` joins on the `citizen` and `resident` country codes.


This data came from US National Tourism and Trade Office: https://travel.trade.gov/research/reports/i94/historical/2016.html 


Columns for `immigration_us`:

* __id__: Unique ID for every row in the dataset.
* __year__: Year of entry into US.
* __month__: Month of entry into US.
* __citizen__: Country code where the immigrant is an official citizen.
* __resident__: Country code where the immigrant is an official resident.
* __port_entry__: Port of entry into US.
* __mode_entry__: Mode of entry, like air, sea, land, etc. 
* __arrival_date__: Date of arrival.
* __dep_date__: Departure date.
* __dateadd_to__: Date to which admitted to U.S. (allowed to stay until).
* __state_addr__: State address where the immigrant will stay. 
* __birth_year__: Year of birth.
* __age__: Age of immigrant in years.
* __gender__: Immigrant sex.
* __visa_code__: Visa category code: 1- Business, 2- Pleasure, 3- Student.
* __visa_type__: Class of admission legally admitting the non-immigrant to temporarily stay in the U.S.
* __airline__: Airline used to arrive in the U.S., if applicable.

This dataset is stored in SAS format (`.sas7bdat`), so we read it with the `spark-sas7bdat` package and save a Parquet copy.

In [23]:
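from pyspark.sql import SparkSession

# spark.jars.packages only takes effect when the session is first created;
# if a session already exists (e.g. `spark` above), restart the kernel before running this cell
ss = SparkSession\
        .builder\
        .config("spark.jars.packages","saurfang:spark-sas7bdat:2.1.0-s_2.11")\
        .enableHiveSupport()\
        .getOrCreate()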

In [None]:
path = '../../data/18-83510-I94-Data-2016/*.sas7bdat'
df = ss.read.format("com.github.saurfang.sas.spark").load(path)
# write parquet
df.write.parquet("datasets/sas_data")

In [24]:
# read parquet file
df = ss.read.parquet("datasets/sas_data")
df.limit(5).toPandas()

|   | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 5748517.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | CA | 20582.0 | ... |  | M | 1976.0 | 10292016 | F |  | QF | 94953870000.0 | 11 | B1 |
| 1 | 5748518.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | NV | 20591.0 | ... |  | M | 1984.0 | 10292016 | F |  | VA | 94955620000.0 | 7 | B1 |
| 2 | 5748519.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | WA | 20582.0 | ... |  | M | 1987.0 | 10292016 | M |  | DL | 94956410000.0 | 40 | B1 |
| 3 | 5748520.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | WA | 20588.0 | ... |  | M | 1987.0 | 10292016 | F |  | DL | 94956450000.0 | 40 | B1 |
| 4 | 5748521.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | WA | 20588.0 | ... |  | M | 1988.0 | 10292016 | M |  | DL | 94956390000.0 | 40 | B1 |


In this section, we need to convert the columns `arrdate` and `depdate`. These columns are stored as SAS numeric dates (the number of days since 1960-01-01).

We need a UDF to convert these columns. Because `depdate` has some null values, the UDF needs an if/else to handle them.

In [61]:
df.filter("depdate is null").count()

142457

In [25]:
from datetime import datetime
from datetime import timedelta

epoch = datetime(1960,1,1)

# SAS dates count days since 1960-01-01; nulls (e.g. missing depdate) stay null
sas_days = F.udf(lambda x: (timedelta(days=int(x)) + epoch) if x is not None else None, T.DateType())

df_dateParse = df.withColumn("arrdate", sas_days(df.arrdate))
df_dateParse2 = df_dateParse.withColumn("depdate", sas_days(df_dateParse.depdate))
df_dateParse2.limit(5).toPandas()

|   | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 5748517.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 2016-04-30 | 1.0 | CA | 2016-05-08 | ... |  | M | 1976.0 | 10292016 | F |  | QF | 94953870000.0 | 11 | B1 |
| 1 | 5748518.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 2016-04-30 | 1.0 | NV | 2016-05-17 | ... |  | M | 1984.0 | 10292016 | F |  | VA | 94955620000.0 | 7 | B1 |
| 2 | 5748519.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 2016-04-30 | 1.0 | WA | 2016-05-08 | ... |  | M | 1987.0 | 10292016 | M |  | DL | 94956410000.0 | 40 | B1 |
| 3 | 5748520.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 2016-04-30 | 1.0 | WA | 2016-05-14 | ... |  | M | 1987.0 | 10292016 | F |  | DL | 94956450000.0 | 40 | B1 |
| 4 | 5748521.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 2016-04-30 | 1.0 | WA | 2016-05-14 | ... |  | M | 1988.0 | 10292016 | M |  | DL | 94956390000.0 | 40 | B1 |
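
To double-check the SAS date conversion outside Spark, here is the same arithmetic in plain Python. It reproduces the first row above (`arrdate` 20574 becomes 2016-04-30 and `depdate` 20582 becomes 2016-05-08). It tests `days is None` rather than truthiness, so a value of 0 maps to 1960-01-01 instead of null.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS numeric dates count days since this day


def sas_to_date(days):
    """Convert a SAS numeric date such as 20574.0 to a date; keep nulls as None."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))


print(sas_to_date(20574.0))  # 2016-04-30
print(sas_to_date(20582.0))  # 2016-05-08
```

Spark SQL's built-in `date_add` function is a possible alternative that avoids a Python UDF, e.g. `F.expr("date_add(to_date('1960-01-01'), cast(arrdate as int))")`; it is not what this notebook uses.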


There's another date column to convert, `dtaddto`, which holds the date until which the immigrant is allowed to stay in the U.S.

This is a character field in MMDDYYYY format, but some rows contain malformed values, so not every row can be converted. The function we create needs to handle this.

In [112]:
df_dateParse2.select('dtaddto').where('(length(dtaddto) < 8) or (dtaddto is null) or dtaddto rlike "[a-zA-Z]|[ ]"').distinct().collect()

[Row(dtaddto='183'),
 Row(dtaddto=None),
 Row(dtaddto='10 02003'),
 Row(dtaddto='D/S'),
 Row(dtaddto='06 02002'),
 Row(dtaddto='/   183D')]

In [117]:
df_dateParse2.filter('(length(dtaddto) < 8) or (dtaddto is null) or dtaddto rlike "[a-zA-Z]|[ ]"').count()

45824

Create a function that converts `dtaddto` to a date and returns null when a row is malformed.

In [26]:
from pyspark.sql.functions import udf

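def char_date(string):
    # Parse MMDDYYYY strings such as '10292016'; malformed values ('D/S', '183', None, ...) become null
    try:
        return datetime.strptime(str(string), '%m%d%Y').date()
    except ValueError:
        return None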
    
udf_charDate = udf(char_date, T.DateType())

df_charDate =  df_dateParse2.withColumn("dtaddto", udf_charDate(df_dateParse2.dtaddto))
df_charDate.limit(5).toPandas()    

|   | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 5748517.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 2016-04-30 | 1.0 | CA | 2016-05-08 | ... |  | M | 1976.0 | 2016-10-29 | F |  | QF | 94953870000.0 | 11 | B1 |
| 1 | 5748518.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 2016-04-30 | 1.0 | NV | 2016-05-17 | ... |  | M | 1984.0 | 2016-10-29 | F |  | VA | 94955620000.0 | 7 | B1 |
| 2 | 5748519.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 2016-04-30 | 1.0 | WA | 2016-05-08 | ... |  | M | 1987.0 | 2016-10-29 | M |  | DL | 94956410000.0 | 40 | B1 |
| 3 | 5748520.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 2016-04-30 | 1.0 | WA | 2016-05-14 | ... |  | M | 1987.0 | 2016-10-29 | F |  | DL | 94956450000.0 | 40 | B1 |
| 4 | 5748521.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 2016-04-30 | 1.0 | WA | 2016-05-14 | ... |  | M | 1988.0 | 2016-10-29 | M |  | DL | 94956390000.0 | 40 | B1 |
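
The parsing rule can be checked against the malformed values found earlier with a plain-Python version of the function (named `parse_mmddyyyy` here so it does not shadow the notebook's `char_date`). This sketch catches only `ValueError`, which is what `strptime` raises for a non-matching string or an impossible date, so unexpected errors are not silently swallowed:

```python
from datetime import date, datetime


def parse_mmddyyyy(value):
    """Parse an MMDDYYYY string such as '10292016'; return None if it is malformed."""
    try:
        return datetime.strptime(str(value), "%m%d%Y").date()
    except ValueError:
        return None


for raw in ["10292016", "D/S", "183", "/   183D", None]:
    print(repr(raw), "->", parse_mmddyyyy(raw))
```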


In [128]:
df_charDate.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: date (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: date (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: date (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = true)


Cast the double columns to integers and build the final table from these columns: `cicid`, `i94yr`, `i94mon`, `i94cit`, `i94res`, `i94port`, `i94mode`, `arrdate`, `depdate`, `dtaddto`, `i94addr`, `biryear`, `i94bir`, `gender`, `i94visa`, `visatype`, `airline`.


In [27]:
columns = ['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port', 'i94mode', 'arrdate', 'depdate', 'dtaddto', 'i94addr', 'biryear', 'i94bir', 'gender', 'i94visa', 'visatype', 'airline']
int_columns = ['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94mode', 'biryear', 'i94bir', 'i94visa']

# Cast the double columns to integers natively (nulls stay null, 0 stays 0), then keep only the final columns
df_immigration = df_charDate
for column in int_columns:
    df_immigration = df_immigration.withColumn(column, F.col(column).cast(T.IntegerType()))
df_immigration = df_immigration.select(*columns)

df_immigration.printSchema()

root
 |-- cicid: integer (nullable = true)
 |-- i94yr: integer (nullable = true)
 |-- i94mon: integer (nullable = true)
 |-- i94cit: integer (nullable = true)
 |-- i94res: integer (nullable = true)
 |-- i94port: string (nullable = true)
 |-- i94mode: integer (nullable = true)
 |-- arrdate: date (nullable = true)
 |-- depdate: date (nullable = true)
 |-- dtaddto: date (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- biryear: integer (nullable = true)
 |-- i94bir: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- i94visa: integer (nullable = true)
 |-- visatype: string (nullable = true)
 |-- airline: string (nullable = true)



In [28]:
df_immigration.limit(5).toPandas()

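| | cicid | i94yr | i94mon | i94cit | i94res | i94port | i94mode | arrdate | depdate | dtaddto | i94addr | biryear | i94bir | gender | i94visa | visatype | airline |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 5748517 | 2016 | 4 | 245 | 438 | LOS | 1 | 2016-04-30 | 2016-05-08 | 2016-10-29 | CA | 1976 | 40 | F | 1 | B1 | QF |
| 1 | 5748518 | 2016 | 4 | 245 | 438 | LOS | 1 | 2016-04-30 | 2016-05-17 | 2016-10-29 | NV | 1984 | 32 | F | 1 | B1 | VA |
| 2 | 5748519 | 2016 | 4 | 245 | 438 | LOS | 1 | 2016-04-30 | 2016-05-08 | 2016-10-29 | WA | 1987 | 29 | M | 1 | B1 | DL |
| 3 | 5748520 | 2016 | 4 | 245 | 438 | LOS | 1 | 2016-04-30 | 2016-05-14 | 2016-10-29 | WA | 1987 | 29 | F | 1 | B1 | DL |
| 4 | 5748521 | 2016 | 4 | 245 | 438 | LOS | 1 | 2016-04-30 | 2016-05-14 | 2016-10-29 | WA | 1988 | 28 | M | 1 | B1 | DL |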


Rename the columns of the immigration DataFrame/table

In [29]:
# toDF renames positionally, so new_columns must follow the order of `columns`
new_columns = ['id', 'year', 'month', 'citizen', 'resident', 'port_entry', 'mode_entry', 'arrival_date', 'dep_date', 'dateadd_to', 'state_addr', 'birth_year', 'age', 'gender', 'visa_code', 'visa_type', 'airline']

df_imm = df_immigration.toDF(*new_columns)
df_imm.limit(5).toPandas()

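| | id | year | month | citizen | resident | port_entry | mode_entry | arrival_date | dep_date | dateadd_to | state_addr | birth_year | age | gender | visa_code | visa_type | airline |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 5748517 | 2016 | 4 | 245 | 438 | LOS | 1 | 2016-04-30 | 2016-05-08 | 2016-10-29 | CA | 1976 | 40 | F | 1 | B1 | QF |
| 1 | 5748518 | 2016 | 4 | 245 | 438 | LOS | 1 | 2016-04-30 | 2016-05-17 | 2016-10-29 | NV | 1984 | 32 | F | 1 | B1 | VA |
| 2 | 5748519 | 2016 | 4 | 245 | 438 | LOS | 1 | 2016-04-30 | 2016-05-08 | 2016-10-29 | WA | 1987 | 29 | M | 1 | B1 | DL |
| 3 | 5748520 | 2016 | 4 | 245 | 438 | LOS | 1 | 2016-04-30 | 2016-05-14 | 2016-10-29 | WA | 1987 | 29 | F | 1 | B1 | DL |
| 4 | 5748521 | 2016 | 4 | 245 | 438 | LOS | 1 | 2016-04-30 | 2016-05-14 | 2016-10-29 | WA | 1988 | 28 | M | 1 | B1 | DL |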


Store the total number of rows in the DataFrame in a variable; we will use it later in the Data Quality process.

In [30]:
total_dataframe = df_imm.count()
total_dataframe

3096313

Write into Postgres

In [31]:
# uncomment to execute
# df_imm.write.mode("append").jdbc(url=url_db, table="immigration_us", properties=properties)

## Data Quality

For the purpose of this project, we check two main tables, immigration_us and dim_us_weather, and run three data quality checks to make sure the data was written without problems.

1. The first check compares the row count of the Spark DataFrame with the row count of the immigration_us table after the write. We count the DataFrame with Spark's `count()` method and run `SELECT COUNT(1)` against immigration_us. The check passes if both counts are equal.

2. The second check looks for problems with the arrival date. This is a very important column because every immigrant must have an arrival date; if for some reason the column were entirely null, the Datawarehouse would not be useful for Analytics and Machine Learning. The check passes when the column has non-null values and they outnumber the null ones.

3. The third check validates the dim_us_weather table. We always want a temperature available when we join the immigration_us and dim_us_weather tables, so we select every distinct year in the main dataset and confirm that the weather table has data for each of those years.
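The three rules above can also be written as plain Python functions that work on the values the queries return, which makes them easy to test without a database connection. This is a minimal sketch; the function names are hypothetical and not part of the project's code.

```python
from typing import Iterable, List, Optional


def row_counts_match(dataframe_count: int, table_count: int) -> bool:
    """Check 1: the Spark DataFrame and the Postgres table hold the same number of rows."""
    return dataframe_count == table_count


def arrival_dates_ok(null_count: Optional[int], not_null_count: Optional[int]) -> bool:
    """Check 2: arrival_date has values, and they outnumber the nulls.

    SUM() over an empty table returns NULL, which psycopg2 returns as None,
    so None is treated as 0 here.
    """
    null_count = null_count or 0
    not_null_count = not_null_count or 0
    return not_null_count > 0 and not_null_count > null_count


def missing_weather_years(imm_years: Iterable[int], weather_years: Iterable[int]) -> List[int]:
    """Check 3: every immigration year with no weather data (an empty list means pass)."""
    available = set(weather_years)
    return sorted({year for year in imm_years if year not in available})


print(row_counts_match(3096313, 3096313))           # True
print(arrival_dates_ok(0, 3096313))                 # True
print(missing_weather_years([2016], [2012, 2013]))  # [2016]
```

Returning the list of missing years, rather than a bare boolean, tells you which join would come back without temperatures.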


Create a connection to PostgreSQL

In [32]:
conn = psycopg2.connect("host=127.0.0.1 dbname=imm_dwh user=student password=student")
cur = conn.cursor()

SQL queries for Data Quality

In [33]:
select_imm_count = """
    SELECT COUNT(1) FROM IMMIGRATION_US;
"""

select_arrival_date = """
    SELECT 
    SUM(CASE WHEN ARRIVAL_DATE IS NULL THEN 1 ELSE 0 END) NULL_VALUES,
    SUM(CASE WHEN ARRIVAL_DATE IS NOT NULL THEN 1 ELSE 0 END) NOT_NULL_VALUES  
    FROM IMMIGRATION_US;
"""

select_year_imm = """
    SELECT DISTINCT YEAR
    FROM IMMIGRATION_US
    ORDER BY YEAR;
"""

select_year_weather = """
    SELECT DISTINCT CAST(EXTRACT(YEAR FROM datetime) AS INTEGER) AS YEAR 
    FROM DIM_US_WEATHER 
    ORDER BY YEAR;
"""

### 1. First Data Quality check

In [34]:
cur.execute(select_imm_count)
# COUNT(1) returns a single row with a single value
table_count = cur.fetchone()[0]

if total_dataframe == table_count:
    print(f"Data Quality check passed Successfully!\n\nTotal values in dataframe: {total_dataframe}\nTotal values in table: {table_count}")
else:
    print(f"Data Quality check failed!\n\nTotal values in dataframe: {total_dataframe}\nTotal values in table: {table_count}")

Data Quality check passed Successfully!

Total values in dataframe: 3096313
Total values in table: 3096313


### 2. Second Data Quality check

In [35]:
cur.execute(select_arrival_date)
# The query returns one row: (NULL_VALUES, NOT_NULL_VALUES)
null_values, not_null_values = cur.fetchone()

if not_null_values > 0 and not_null_values > null_values:
    print(f"Data Quality check passed Successfully!\n\nTotal null values: {null_values}\nTotal not null values: {not_null_values}")
else:
    print(f"Data Quality check failed!\n\nTotal null values: {null_values}\nTotal not null values: {not_null_values}")

Data Quality check passed Successfully!

Total null values: 0
Total not null values: 3096313


### 3. Third Data Quality check

In [36]:
# distinct years in the immigration_us table
cur.execute(select_year_imm)
imm_years = [row[0] for row in cur.fetchall()]

# distinct years in the dim_us_weather table
cur.execute(select_year_weather)
weather_years = {row[0] for row in cur.fetchall()}

# every immigration year needs weather data, not just the first one
missing_years = [year for year in imm_years if year not in weather_years]

if not missing_years:
    print("Data Quality check passed Successfully!")
else:
    print(f"Data Quality check failed! Years without weather data: {missing_years}")

Data Quality check passed Successfully!


Close connection

In [37]:
conn.commit()
conn.close()

### FAQs about the project

* __What is the rationale for the choice of tools and technologies for the project?__

__Apache Spark__

We chose Spark with the potential growth of our dataset in mind and for its ability to transform data quickly at scale. Pandas is an amazing library, and you could accomplish pretty much all of this code with it, but it is best suited to data that fits in the memory of a single machine. What happens if we want to process data for 2016, 2017, 2018, and 2019? That is not a good fit for Pandas, and this is where Spark shines: it distributes the work across a cluster, so tasks run in parallel instead of hitting a single-machine bottleneck. At the moment we're running the Spark job locally, but we can deploy it on EC2 and take advantage of AWS.

Another reason we chose Spark is its Machine Learning library, MLlib. This project was built to provide a structured Datawarehouse suitable for analysis and Machine Learning, so if the company wants to apply Machine Learning models to this data, Spark is a natural choice: the same engine handles both the ETL and model training.

Cleaning the data for Data Analysis is a crucial step, and Spark helps here too: we can load the data into DataFrames, explore and clean it, and only then insert it into the database. This keeps the data in the Datawarehouse consistent.

__PostgreSQL__

Postgres works well with Spark: through the Postgres JDBC driver, Spark can write a DataFrame straight into a Postgres table, and it creates the table if it doesn't exist yet, so there's no need to create it beforehand.

Postgres is a relational database, which fits the scope of the project. If in the future we want to migrate the entire database to AWS, Amazon Redshift is based on PostgreSQL 8.0.2, so much of the query syntax carries over and the migration takes less work. Some features still need adjustments: Redshift does not support triggers, and it treats primary key, foreign key, and unique constraints as informational rather than enforcing them.


* __How often should the data be updated, and why?__

Our main dataset, immigration, contains daily records, but it isn't necessary to update the warehouse for every new row. Keep in mind that this dataset can be joined with weather, city population, airport, and state information.

The immigration and weather tables can be updated once a day, because the weather table provides temperature information grouped by day and city. Each time the ETL runs, it processes the previous day's data and calculates the average temperature for every city, and at the same time it loads the immigration data for that day.
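The daily weather step can be sketched in plain Python. This is a minimal sketch, assuming a hypothetical reading shape of `(timestamp, city, temperature)`; the real job would do the same filtering and grouping in Spark before writing to dim_us_weather.

```python
from collections import defaultdict
from datetime import date, datetime, timedelta
from typing import Dict, Iterable, Optional, Tuple

# Hypothetical reading shape: (timestamp, city, temperature)
Reading = Tuple[datetime, str, Optional[float]]


def previous_day_averages(readings: Iterable[Reading], run_date: date) -> Dict[str, float]:
    """Average temperature per city for the day before run_date."""
    target_day = run_date - timedelta(days=1)
    totals: Dict[str, float] = defaultdict(float)
    counts: Dict[str, int] = defaultdict(int)
    for ts, city, temperature in readings:
        # Only the previous day's readings are processed; missing temperatures are skipped
        if ts.date() == target_day and temperature is not None:
            totals[city] += temperature
            counts[city] += 1
    return {city: totals[city] / counts[city] for city in totals}


readings = [
    (datetime(2016, 4, 29, 9), "New York", 10.0),
    (datetime(2016, 4, 29, 15), "New York", 20.0),
    (datetime(2016, 4, 29, 12), "Seattle", 12.0),
    (datetime(2016, 4, 30, 9), "New York", 30.0),  # run day: left for tomorrow's run
]
print(previous_day_averages(readings, date(2016, 4, 30)))
# {'New York': 15.0, 'Seattle': 12.0}
```

Processing only the completed previous day means each run sees a full day of readings, so the daily averages don't change after they are written.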

The other datasets, such as city population, airports, and states, can be updated once or twice a year, because they don't change for long periods. The next Census is scheduled for 2020, so their update period is relatively long compared to the other datasets.


* __How to approach the problem differently under the following scenarios:__
 * __The data was increased by 100x.__
 
This project currently runs on a single machine. If the data increased by 100x, we would first need to move the data to the cloud. A good approach is to use Amazon EMR: run this notebook's ETL on an EMR cluster, save the output as Parquet files in an S3 bucket, and serve that data to BI tools and machine learning models.

One of the benefits of EMR is its pricing: we pay a per-instance rate billed by the second. This means we don't have to keep EC2 machines running 24 hours a day to execute a job that takes 1 hour a day; we only pay for the time the cluster is actually in use.
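A quick calculation shows why this matters. This is a sketch under stated assumptions: per-second billing with a one-minute minimum, and a placeholder rate of 1.0 per instance-hour rather than a real AWS price; check current EMR pricing before relying on any numbers.

```python
def job_cost(hourly_rate: float, runtime_seconds: float, instances: int = 1,
             minimum_seconds: float = 60) -> float:
    """Cost of one run billed per second, with an assumed one-minute minimum."""
    billed_seconds = max(runtime_seconds, minimum_seconds)
    return hourly_rate * billed_seconds / 3600 * instances


def always_on_cost(hourly_rate: float, hours: float, instances: int = 1) -> float:
    """Cost of keeping the same instances running for a fixed number of hours."""
    return hourly_rate * hours * instances


rate = 1.0   # placeholder price per instance-hour, not a real AWS price
days = 30
daily_job = job_cost(rate, runtime_seconds=3600, instances=4) * days
always_on = always_on_cost(rate, hours=24 * days, instances=4)
print(daily_job, always_on, always_on / daily_job)
# 120.0 2880.0 24.0
```

Whatever the real rate is, a 1-hour daily job on a transient cluster costs 1/24 of keeping the same cluster up around the clock.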

Another benefit is auto-scaling. We can provision anywhere from one to thousands of compute instances to process the data, and increase or decrease the number of nodes manually or automatically. This is useful when we aren't sure how many nodes we need, and it also means we spend less time tuning and monitoring the clusters.


![img](datasets/cycle_2.png)

 
 * __The data populates a dashboard that must be updated on a daily basis by 7am every day.__
 
To schedule the job, a good approach is to use Apache Airflow. We could accomplish much the same with the cron scheduler, but Apache Airflow provides a more robust system: it lets us monitor and schedule runs and retry failed tasks.

To run the Spark job from Apache Airflow, we can use our `etl.py` file. It runs the same process as this notebook without the exploratory analysis: it just reads, cleans, and writes the data into Postgres.

With a BashOperator, we can create a task whose `bash_command` parameter submits the .py file (for example with `spark-submit`), and schedule it early enough that the run finishes before 7am.

A good tutorial for this task can be found here: [Scheduling Spark jobs with Airflow](https://blog.insightdatascience.com/scheduling-spark-jobs-with-airflow-4c66f3144660)

 * __The database needed to be accessed by 100+ people.__

For this purpose, the database should be migrated to the cloud. A good approach would be to use Amazon Redshift.

We can create groups to control access across the entire organization, e.g. one group for each department of the company, each granted only the privileges it needs. This way, we don't compromise the information.
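Following the notebook's habit of keeping SQL in Python strings, the per-department groups could be generated like this. This is a minimal sketch: the `<department>_readers` naming, the department names, and the `public` schema are hypothetical choices, while `CREATE GROUP` and `GRANT SELECT ... TO GROUP` are standard Redshift statements.

```python
import re
from typing import Iterable, List

# Only plain lowercase identifiers are allowed, since they are formatted into SQL
IDENTIFIER = re.compile(r"^[a-z_][a-z0-9_]*$")


def department_grants(departments: Iterable[str], tables: Iterable[str],
                      schema: str = "public") -> List[str]:
    """Build Redshift statements giving each department a read-only group."""
    tables = list(tables)
    statements = []
    for department in departments:
        group = f"{department.lower()}_readers"
        for name in [group, schema, *tables]:
            if not IDENTIFIER.match(name):
                raise ValueError(f"unsafe identifier: {name!r}")
        statements.append(f"CREATE GROUP {group};")
        for table in tables:
            statements.append(f"GRANT SELECT ON TABLE {schema}.{table} TO GROUP {group};")
    return statements


for statement in department_grants(["marketing", "finance"], ["immigration_us", "dim_us_weather"]):
    print(statement)
```

Users are then added to a group with `ALTER GROUP marketing_readers ADD USER some_user;`, so access is managed per department rather than per person.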

Amazon Redshift is based on PostgreSQL 8.0.2, so before migrating the data into Redshift we may need to adjust constraints and SQL syntax. For example, if a table is loaded with `INSERT ... ON CONFLICT` (an upsert clause introduced in PostgreSQL 9.5), we have to implement that logic with other methods, such as SELECT DISTINCT or filtering the data before inserting it, because Redshift doesn't support ON CONFLICT the way Postgres 11 does.
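The "filter the data before insert" option can be sketched in plain Python. This is a minimal sketch, assuming rows arrive as dictionaries keyed by the table's `id` column and that the existing keys have already been read from the target table; `rows_to_insert` is a hypothetical helper. It approximates `INSERT ... ON CONFLICT (id) DO NOTHING`: the first row seen for a key wins.

```python
from typing import Dict, Hashable, Iterable, List, Set

Row = Dict[str, object]


def rows_to_insert(incoming: Iterable[Row], existing_keys: Iterable[Hashable],
                   key: str = "id") -> List[Row]:
    """Keep only rows whose key is not already in the table or earlier in the batch."""
    seen: Set[Hashable] = set(existing_keys)
    result: List[Row] = []
    for row in incoming:
        row_key = row[key]
        if row_key in seen:
            continue  # already in the table, or a duplicate inside this batch
        seen.add(row_key)
        result.append(row)
    return result


batch = [
    {"id": 5748517, "state_addr": "CA"},  # already loaded
    {"id": 5748518, "state_addr": "NV"},
    {"id": 5748518, "state_addr": "NV"},  # duplicate within the batch
]
print(rows_to_insert(batch, existing_keys=[5748517]))
# [{'id': 5748518, 'state_addr': 'NV'}]
```

At Redshift scale the same idea is usually expressed in SQL against a staging table, but the rule is identical: drop keys that already exist before the insert.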