### USA-Immigration Data Warehouse 

#### Project Summary

Many people travel to the USA for different purposes. The TSA (Transportation Security Administration) wants an in-depth understanding of monthly immigration patterns per airport, based on different sources such as immigration data, temperature, US demographics and airport codes. This project provides a data warehouse that allows TSA members to access curated data for reports and deeper analytical insights into traveler patterns.

In [1]:
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.types import DateType
from pyspark.sql.functions import udf, rand
from pyspark.sql.functions import isnan, when, count, col
import pyspark.sql.functions as F
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
import configparser
import psycopg2

### Scope the Project and Datasets

#### Scope 
To build the data warehouse, I developed a data pipeline that extracts, transforms and loads (ETL) data into it. This allows data analysts to consume the data and derive deeper insights. The project combines several technologies: Amazon Redshift (data warehouse), PySpark (to read some of the datasets) and Apache Airflow (data pipeline orchestration).

#### Datasets

* I94 Immigration Data: This data comes from the US National Tourism and Trade Office; we use the 2016 data. Each report contains international visitor arrival statistics by world region and selected countries, broken down by type of visa, mode of transportation, age group, states visited, top ports of entry, etc. [Source](https://travel.trade.gov/research/reports/i94/historical/2016.html)

* World Temperature Data: This dataset comes from Kaggle and compiles global land temperatures going back to the 18th century (the earliest record in the sample below is from 1743). We focus on **GlobalLandTemperaturesByCity.csv**, which contains: dt, AverageTemperature, AverageTemperatureUncertainty, City, Country, Latitude, Longitude. [Source](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data).


* U.S. City Demographic Data: This dataset contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000. This data comes from the US Census Bureau's 2015 American Community Survey. [Source](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)

* Airport Code Table: An airport code is either an IATA code, a three-letter code used in passenger reservation, ticketing and baggage-handling systems, or an ICAO code, a four-letter code used by ATC systems and for airports that do not have an IATA code. The table covers airports from around the world and was downloaded from the public-domain source http://ourairports.com/data/, which compiles the data from multiple sources. [Source](https://datahub.io/core/airport-codes#data)



## Dataset Gathering, Exploration & Analysis


### I-94 Immigration Dataset

In [2]:
spark = SparkSession.builder \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport() \
    .getOrCreate()

In [3]:
imm_data = spark.read.parquet("dags/datasets/sas_data")
print("Size of the dataset", imm_data.count())
imm_data.limit(15).toPandas()

Size of the dataset 3096313


|   | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 5748517.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | CA | 20582.0 | ... |  | M | 1976.0 | 10292016 | F |  | QF | 94953870000.0 | 11 | B1 |
| 1 | 5748518.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | NV | 20591.0 | ... |  | M | 1984.0 | 10292016 | F |  | VA | 94955620000.0 | 7 | B1 |
| 2 | 5748519.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | WA | 20582.0 | ... |  | M | 1987.0 | 10292016 | M |  | DL | 94956410000.0 | 40 | B1 |
| 3 | 5748520.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | WA | 20588.0 | ... |  | M | 1987.0 | 10292016 | F |  | DL | 94956450000.0 | 40 | B1 |
| 4 | 5748521.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | WA | 20588.0 | ... |  | M | 1988.0 | 10292016 | M |  | DL | 94956390000.0 | 40 | B1 |
| 5 | 5748522.0 | 2016.0 | 4.0 | 245.0 | 464.0 | HHW | 20574.0 | 1.0 | HI | 20579.0 | ... |  | M | 1959.0 | 10292016 | M |  | NZ | 94981800000.0 | 10 | B2 |
| 6 | 5748523.0 | 2016.0 | 4.0 | 245.0 | 464.0 | HHW | 20574.0 | 1.0 | HI | 20586.0 | ... |  | M | 1950.0 | 10292016 | F |  | NZ | 94979690000.0 | 10 | B2 |
| 7 | 5748524.0 | 2016.0 | 4.0 | 245.0 | 464.0 | HHW | 20574.0 | 1.0 | HI | 20586.0 | ... |  | M | 1975.0 | 10292016 | F |  | NZ | 94979750000.0 | 10 | B2 |
| 8 | 5748525.0 | 2016.0 | 4.0 | 245.0 | 464.0 | HOU | 20574.0 | 1.0 | FL | 20581.0 | ... |  | M | 1989.0 | 10292016 | M |  | NZ | 94973250000.0 | 28 | B2 |
| 9 | 5748526.0 | 2016.0 | 4.0 | 245.0 | 464.0 | LOS | 20574.0 | 1.0 | CA | 20581.0 | ... |  | M | 1990.0 | 10292016 | F |  | NZ | 95013550000.0 | 2 | B2 |


<p style="color:blue;"> Each row of this I-94 data (originally provided in SAS format) represents one traveler record with the main details of their entry into and exit from the USA. This is very insightful, and it will become the main fact table of the project. I also print the schema to understand the data types and whether NULL values are allowed in each column. </p>

In [4]:
imm_data.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

<p style="color:blue;"> It is important to know the number of columns and their value types in order to map them to the SQL tables later. </p>

In [5]:
df_imm = imm_data.limit(15).toPandas()
df_imm.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 15 entries, 0 to 14
Data columns (total 28 columns):
cicid       15 non-null float64
i94yr       15 non-null float64
i94mon      15 non-null float64
i94cit      15 non-null float64
i94res      15 non-null float64
i94port     15 non-null object
arrdate     15 non-null float64
i94mode     15 non-null float64
i94addr     14 non-null object
depdate     15 non-null float64
i94bir      15 non-null float64
i94visa     15 non-null float64
count       15 non-null float64
dtadfile    15 non-null object
visapost    15 non-null object
occup       0 non-null object
entdepa     15 non-null object
entdepd     15 non-null object
entdepu     0 non-null object
matflag     15 non-null object
biryear     15 non-null float64
dtaddto     15 non-null object
gender      15 non-null object
insnum      0 non-null object
airline     15 non-null object
admnum      15 non-null float64
fltno       15 non-null object
visatype    15 non-null object
dtypes: float64(13)

<p style="color:blue;"> Given the size of this dataset and the limited memory in the workspace, I only estimated the percentage of null values per column on 10,000 records. Note that limit does not draw a random sample, so this estimate may not represent the whole dataset. </p>

In [6]:
df_imm_all = imm_data.limit(10000).toPandas()
df_imm_all.isnull().sum()/df_imm_all.shape[0]

cicid       0.0000
i94yr       0.0000
i94mon      0.0000
i94cit      0.0000
i94res      0.0000
i94port     0.0000
arrdate     0.0000
i94mode     0.0000
i94addr     0.0400
depdate     0.0517
i94bir      0.0000
i94visa     0.0000
count       0.0000
dtadfile    0.0000
visapost    0.4918
occup       0.9936
entdepa     0.0000
entdepd     0.0517
entdepu     0.9999
matflag     0.0517
biryear     0.0000
dtaddto     0.0000
gender      0.0422
insnum      1.0000
airline     0.0000
admnum      0.0000
fltno       0.0000
visatype    0.0000
dtype: float64

<p style="color:blue;"> Some columns are incomplete, for example depdate, occup and gender, while insnum and entdepu are almost entirely empty. However, these missing values are not critical for this particular dataset. </p>

#### I94_SAS_Labels_Descriptions.SAS
The I-94 dataset comes with an additional SAS file of labels and descriptions, which can be used to build dimension tables. However, the SAS file needs to be parsed in order to extract the codes. For this project, I am interested in the following codes:

- Country
- Port
- Mode
- Addr
- Type


In [7]:
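def sas_program_file_value_parser(sas_source_file, value, columns):
    """Parse one `value` block of a SAS program file into a pandas DataFrame.

    Args:
        sas_source_file (str): path to the SAS source code file.
        value (str): name of the SAS value block to extract, e.g. 'i94cntyl'.
        columns (list): list of 2 column names for the (code, label) pairs.
    Returns:
        pandas.DataFrame: one row per code/label pair.
    """
    with open(sas_source_file) as f:
        file_string = f.read()

    # Keep only the text from the block name up to the ';' that closes the block
    file_string = file_string[file_string.index(value):]
    file_string = file_string[:file_string.index(';')]

    # The first line holds the block name; the following lines are `code = label`
    line_list = file_string.split('\n')[1:]
    codes = []
    values = []

    for line in line_list:
        if '=' in line:
            # Split on the first '=' only, in case a label contains '='
            code, val = line.split('=', 1)
            code = code.strip()
            val = val.strip()

            # Remove the surrounding single quotes of quoted codes and labels
            if code.startswith("'"):
                code = code[1:-1]
            if val.startswith("'"):
                val = val[1:-1]

            codes.append(code)
            values.append(val)

    return pd.DataFrame(list(zip(codes, values)), columns=columns)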

In [8]:
SAS_LABELS_FILE = 'dags/datasets/I94_SAS_Labels_Descriptions.SAS'

i94cit_res = sas_program_file_value_parser(SAS_LABELS_FILE, 'i94cntyl', ['code', 'country'])
i94port = sas_program_file_value_parser(SAS_LABELS_FILE, 'i94prtl', ['code', 'port'])
i94mode = sas_program_file_value_parser(SAS_LABELS_FILE, 'i94model', ['code', 'mode'])
i94addr = sas_program_file_value_parser(SAS_LABELS_FILE, 'i94addrl', ['code', 'addr'])
i94visa = sas_program_file_value_parser(SAS_LABELS_FILE, 'I94VISA', ['code', 'type'])
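To check the parsing logic without the full SAS file, the same approach can be applied to an in-memory string. This is a minimal stdlib-only sketch: `parse_sas_value_block` is a hypothetical helper (not part of the project), and the sample text is a made-up excerpt in the `value <name> ... ;` layout the parser expects, not a copy of the real file.

```python
def parse_sas_value_block(sas_text, value):
    """Return (code, label) pairs from one `value <name> ... ;` block of SAS text.

    Same approach as sas_program_file_value_parser, but it works on a string
    and returns plain tuples instead of a pandas DataFrame.
    """
    # Cut from the block name to the ';' that closes the block
    block = sas_text[sas_text.index(value):]
    block = block[:block.index(';')]
    pairs = []
    # Skip the first line (the block name); keep only `code = label` lines
    for line in block.split('\n')[1:]:
        if '=' not in line:
            continue
        code, label = (part.strip() for part in line.split('=', 1))
        # Remove surrounding single quotes, if any
        pairs.append((code.strip("'"), label.strip("'")))
    return pairs


# Made-up excerpt in the same layout as I94_SAS_Labels_Descriptions.SAS
sample = """
  value i94model
  1 = 'Air'
  2 = 'Sea'
  3 = 'Land'
  9 = 'Not reported' ;
"""
print(parse_sas_value_block(sample, 'i94model'))
```

The codes come back as strings, while i94cit_res, i94mode and i94visa declare `code FLOAT`, so they would need casting (for example `float(code)`) before loading.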

### World Temperature Dataset

In [9]:
df_temp_data = pd.read_csv('dags/datasets/GlobalLandTemperaturesByCity.csv')
print("Size of the dataset: ", len(df_temp_data))
df_temp_data.head(10)

Size of the dataset:  8599212


|   | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 1 | 1743-12-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 2 | 1744-01-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 3 | 1744-02-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 4 | 1744-03-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 5 | 1744-04-01 | 5.788 | 3.624 | Århus | Denmark | 57.05N | 10.33E |
| 6 | 1744-05-01 | 10.644 | 1.283 | Århus | Denmark | 57.05N | 10.33E |
| 7 | 1744-06-01 | 14.051 | 1.347 | Århus | Denmark | 57.05N | 10.33E |
| 8 | 1744-07-01 | 16.082 | 1.396 | Århus | Denmark | 57.05N | 10.33E |
| 9 | 1744-08-01 |  |  | Århus | Denmark | 57.05N | 10.33E |


<p style="color:blue;"> For this dataset, I check whether any column contains only unique values, which would make it a candidate key. </p>

In [10]:
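# Use a loop variable other than `col`, which would shadow pyspark.sql.functions.col
for column_name in df_temp_data:
    print(column_name, df_temp_data[column_name].is_unique)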

dt False
AverageTemperature False
AverageTemperatureUncertainty False
City False
Country False
Latitude False
Longitude False


<p style="color:blue;"> The dt column is not unique (and it is stored as text rather than as a datetime), which makes sense, since the same month is recorded for many different cities. As a rule of thumb, since this dataset is about temperatures, rows where both AverageTemperature and AverageTemperatureUncertainty are NaN can be deleted, keeping only rows with temperature measurements. </p>
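The rule above keeps a row unless both temperature fields are missing. A minimal stdlib-only sketch of that filter, using two rows from the sample output; `is_missing` and `has_temperature` are hypothetical helpers. In pandas, the same filter would be `df_temp_data.dropna(subset=['AverageTemperature', 'AverageTemperatureUncertainty'], how='all')`.

```python
import math


def is_missing(value):
    """True for None or float NaN, the two ways a missing temperature can appear."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def has_temperature(row):
    """Keep a row unless both temperature fields are missing."""
    return not (is_missing(row["AverageTemperature"])
                and is_missing(row["AverageTemperatureUncertainty"]))


# Two rows from the sample output (NaN where the table cell is empty)
rows = [
    {"dt": "1743-11-01", "AverageTemperature": 6.068,
     "AverageTemperatureUncertainty": 1.737, "City": "Århus"},
    {"dt": "1743-12-01", "AverageTemperature": float("nan"),
     "AverageTemperatureUncertainty": float("nan"), "City": "Århus"},
]
kept = [row for row in rows if has_temperature(row)]
print(len(kept))  # 1
```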

In [11]:
df_temp_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8599212 entries, 0 to 8599211
Data columns (total 7 columns):
dt                               object
AverageTemperature               float64
AverageTemperatureUncertainty    float64
City                             object
Country                          object
Latitude                         object
Longitude                        object
dtypes: float64(2), object(5)
memory usage: 459.2+ MB


<p style="color:blue;"> This gives an idea of the data types in the dataset: apart from the two temperature columns, every column has the object type and holds text, including dt, Latitude and Longitude. </p>

In [12]:
df_temp_data.isnull().sum()/df_temp_data.shape[0]

dt                               0.000000
AverageTemperature               0.042345
AverageTemperatureUncertainty    0.042345
City                             0.000000
Country                          0.000000
Latitude                         0.000000
Longitude                        0.000000
dtype: float64

<p style="color:blue;"> The proportion of NULL temperature values is small compared with the size of the dataset (about 4.2%, roughly 364,000 of the 8.6 million rows), so those rows can be removed, since they do not add much value. </p>

### USA City Demographics Dataset

In [13]:
df_city_dem_data = pd.read_csv('dags/datasets/us-cities-demographics.csv', sep=';')
print("Size of dataset: ", len(df_city_dem_data))
df_city_dem_data.head(10)

Size of dataset:  2891


|   | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127.0 | 87105.0 | 175232 | 5821.0 | 33878.0 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040.0 | 143873.0 | 281913 | 5829.0 | 86253.0 | 2.73 | NJ | White | 76402 |
| 5 | Peoria | Illinois | 33.1 | 56229.0 | 62432.0 | 118661 | 6634.0 | 7517.0 | 2.4 | IL | American Indian and Alaska Native | 1343 |
| 6 | Avondale | Arizona | 29.1 | 38712.0 | 41971.0 | 80683 | 4815.0 | 8355.0 | 3.18 | AZ | Black or African-American | 11592 |
| 7 | West Covina | California | 39.8 | 51629.0 | 56860.0 | 108489 | 3800.0 | 37038.0 | 3.56 | CA | Asian | 32716 |
| 8 | O'Fallon | Missouri | 36.0 | 41762.0 | 43270.0 | 85032 | 5783.0 | 3269.0 | 2.77 | MO | Hispanic or Latino | 2583 |
| 9 | High Point | North Carolina | 35.5 | 51751.0 | 58077.0 | 109828 | 5204.0 | 16315.0 | 2.65 | NC | Asian | 11060 |


In [14]:
df_city_dem_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2891 entries, 0 to 2890
Data columns (total 12 columns):
City                      2891 non-null object
State                     2891 non-null object
Median Age                2891 non-null float64
Male Population           2888 non-null float64
Female Population         2888 non-null float64
Total Population          2891 non-null int64
Number of Veterans        2878 non-null float64
Foreign-born              2878 non-null float64
Average Household Size    2875 non-null float64
State Code                2891 non-null object
Race                      2891 non-null object
Count                     2891 non-null int64
dtypes: float64(6), int64(2), object(4)
memory usage: 271.1+ KB


<p style="color:blue;"> Again, I check the value types so that I can define them in the database tables later. </p>

In [15]:
df_city_dem_data.isnull().sum()/df_city_dem_data.shape[0]

City                      0.000000
State                     0.000000
Median Age                0.000000
Male Population           0.001038
Female Population         0.001038
Total Population          0.000000
Number of Veterans        0.004497
Foreign-born              0.004497
Average Household Size    0.005534
State Code                0.000000
Race                      0.000000
Count                     0.000000
dtype: float64

<p style="color:blue;"> This gives the proportion of NULL values per column, which shows which columns may need a data test. In this dataset, I would not remove the incomplete rows, since they still offer relevant information in other columns. </p>

### Airport Code Dataset

In [16]:
df_airport_code_data = pd.read_csv('dags/datasets/airport-codes_csv.csv')
df_airport_code_data.head(10)

|   | ident | type | name | elevation_ft | continent | iso_country | iso_region | municipality | gps_code | iata_code | local_code | coordinates |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 00A | heliport | Total Rf Heliport | 11.0 |  | US | US-PA | Bensalem | 00A |  | 00A | -74.93360137939453, 40.07080078125 |
| 1 | 00AA | small_airport | Aero B Ranch Airport | 3435.0 |  | US | US-KS | Leoti | 00AA |  | 00AA | -101.473911, 38.704022 |
| 2 | 00AK | small_airport | Lowell Field | 450.0 |  | US | US-AK | Anchor Point | 00AK |  | 00AK | -151.695999146, 59.94919968 |
| 3 | 00AL | small_airport | Epps Airpark | 820.0 |  | US | US-AL | Harvest | 00AL |  | 00AL | -86.77030181884766, 34.86479949951172 |
| 4 | 00AR | closed | Newport Hospital & Clinic Heliport | 237.0 |  | US | US-AR | Newport |  |  |  | -91.254898, 35.6087 |
| 5 | 00AS | small_airport | Fulton Airport | 1100.0 |  | US | US-OK | Alex | 00AS |  | 00AS | -97.8180194, 34.9428028 |
| 6 | 00AZ | small_airport | Cordes Airport | 3810.0 |  | US | US-AZ | Cordes | 00AZ |  | 00AZ | -112.16500091552734, 34.305599212646484 |
| 7 | 00CA | small_airport | Goldstone /Gts/ Airport | 3038.0 |  | US | US-CA | Barstow | 00CA |  | 00CA | -116.888000488, 35.350498199499995 |
| 8 | 00CL | small_airport | Williams Ag Airport | 87.0 |  | US | US-CA | Biggs | 00CL |  | 00CL | -121.763427, 39.427188 |
| 9 | 00CN | heliport | Kitchen Creek Helibase Heliport | 3350.0 |  | US | US-CA | Pine Valley | 00CN |  | 00CN | -116.4597417, 32.7273736 |


In [17]:
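# Use a loop variable other than `col`, which would shadow pyspark.sql.functions.col
for column_name in df_airport_code_data:
    print(column_name, df_airport_code_data[column_name].is_unique)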

ident True
type False
name False
elevation_ft False
continent False
iso_country False
iso_region False
municipality False
gps_code False
iata_code False
local_code False
coordinates False


<p style="color:blue;"> Here the ident column is unique, so it can serve as the primary key. </p>

In [18]:
df_airport_code_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 55075 entries, 0 to 55074
Data columns (total 12 columns):
ident           55075 non-null object
type            55075 non-null object
name            55075 non-null object
elevation_ft    48069 non-null float64
continent       27356 non-null object
iso_country     54828 non-null object
iso_region      55075 non-null object
municipality    49399 non-null object
gps_code        41030 non-null object
iata_code       9189 non-null object
local_code      28686 non-null object
coordinates     55075 non-null object
dtypes: float64(1), object(11)
memory usage: 5.0+ MB


<p style="color:blue;"> Apart from elevation_ft, all columns have the object type, i.e. they hold text. </p>

In [19]:
df_airport_code_data.isnull().sum()/df_airport_code_data.shape[0]

ident           0.000000
type            0.000000
name            0.000000
elevation_ft    0.127208
continent       0.503296
iso_country     0.004485
iso_region      0.000000
municipality    0.103059
gps_code        0.255016
iata_code       0.833155
local_code      0.479147
coordinates     0.000000
dtype: float64

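<p style="color:blue;"> The proportion of NULL values is high in some columns, such as iata_code. However, this table holds airport information, and every dropped row removes a possible airport match with other tables, so I would not drop rows with null values. Part of the missing data is most likely a loading artifact: by default, pandas.read_csv treats the string "NA" as missing, which turns the continent code for North America (and the ISO country code for Namibia) into NaN; passing keep_default_na=False, na_values=[''] to read_csv keeps those values. </p>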
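Because "NA" is also a legitimate code in this table (North America as a continent, Namibia as a country), it helps to count only truly empty fields as missing. A minimal stdlib-only sketch with a hypothetical `empty_fractions` helper and a few illustrative rows: Python's csv module keeps the literal string "NA", so only the blank iata_code field counts as missing.

```python
import csv
import io


def empty_fractions(csv_text):
    """Fraction of truly empty fields per column; literal strings like 'NA' are kept."""
    rows = list(csv.DictReader(io.StringIO(csv_text)))
    return {column: sum(1 for row in rows if row[column] == "") / len(rows)
            for column in rows[0]}


# Illustrative rows: 'NA' is the continent code for North America
# and the ISO country code for Namibia
sample = (
    "ident,continent,iso_country,iata_code\n"
    "00A,NA,US,\n"
    "FYWH,AF,NA,WDH\n"
    "EGLL,EU,GB,LHR\n"
)
print(empty_fractions(sample))
```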

### Data Assessment
After exploring the datasets above (checking for duplicates, unique values and NaN values), here is my assessment of each dataset:

#### I-94 Immigration Dataset

- Each row represents one visitor's entry into (and exit from) the USA. I would not remove any rows, even those with null values
- This will work as a fact table in our solution
- The following label values will become our dimension tables: Country (i94cntyl), Port (i94prtl), Mode (i94model), Addr (i94addrl), Type (I94VISA)

#### Airport Code Dataset

- This dataset contains all the airports; we will not drop any rows, in order to keep all the airport data. The ident column is unique, so it will serve as the primary key
- This will be a fact table

#### USA City Demographics Dataset

- This dataset will help us understand the demographics of the US cities our travelers visit, and it will be a fact table

#### World Temperature Dataset

- Here we could eliminate the rows that do not contain a temperature, to reduce the dataset size. However, since their proportion is low, I left this elimination as optional
- This will be another fact table



### Data Model
The data model consists of the following tables:

#### Fact Tables
- immigration
- us_cities_demographics
- airport_codes
- world_temperature

#### Dimensional Tables
- i94cit_res
- i94port
- i94mode
- i94addr
- i94visa

#### Considerations & Notes
- The following tables are distributed to all nodes (DISTSTYLE ALL): i94cit_res, i94port, i94mode, i94addr, i94visa, us_cities_demographics
- Redundancy: DISTSTYLE ALL stores a full copy of the table on every node, which avoids moving data across nodes during joins at the cost of extra storage. Before choosing it, compare the size of the table with the storage available on the Redshift nodes to check that you can afford one copy per node.
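The storage trade-off can be estimated with simple arithmetic: a DISTSTYLE ALL table occupies roughly its size multiplied by the number of nodes. A minimal sketch; the helper and the table sizes are hypothetical placeholders, and real sizes could be read, for example, from the `size` column (in 1 MB blocks) of Redshift's SVV_TABLE_INFO system view.

```python
def replicated_storage_mb(table_sizes_mb, node_count):
    """Estimate the storage used by DISTSTYLE ALL tables across the cluster.

    DISTSTYLE ALL keeps a full copy of each table on every node,
    so the footprint is the sum of the table sizes times the node count.
    """
    if node_count < 1:
        raise ValueError("node_count must be at least 1")
    return sum(table_sizes_mb.values()) * node_count


# Hypothetical table sizes in MB, for illustration only
sizes = {"i94cit_res": 1, "i94port": 1, "i94mode": 1, "i94addr": 1,
         "i94visa": 1, "us_cities_demographics": 5}
print(replicated_storage_mb(sizes, node_count=4))  # 40
```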


#### Conceptual Data Model Diagram


<img src="notebook_images/model.png" width="950" height="950">


#### Table Definitions (Details)

```python
create_table_immigration = """
CREATE TABLE IF NOT EXISTS public.immigration (
    cicid FLOAT PRIMARY KEY,
    i94yr FLOAT SORTKEY,
    i94mon FLOAT DISTKEY,
    i94cit FLOAT REFERENCES i94cit_res(code),
    i94res FLOAT REFERENCES i94cit_res(code),
    i94port CHAR(3) REFERENCES i94port(code),
    arrdate FLOAT,
    i94mode FLOAT REFERENCES i94mode(code),
    i94addr VARCHAR REFERENCES i94addr(code),
    depdate FLOAT,
    i94bir FLOAT,
    i94visa FLOAT REFERENCES i94visa(code),
    count FLOAT,
    dtadfile VARCHAR,
    visapost CHAR(3),
    occup CHAR(3),
    entdepa CHAR(1),
    entdepd CHAR(1),
    entdepu CHAR(1),
    matflag CHAR(1),
    biryear FLOAT,
    dtaddto VARCHAR,
    gender CHAR(1),
    insnum VARCHAR,
    airline VARCHAR,
    admnum FLOAT,
    fltno VARCHAR,
    visatype VARCHAR
);
"""

create_us_cities_demographics = """
CREATE TABLE IF NOT EXISTS public.us_cities_demographics (
    city VARCHAR,
    state VARCHAR,
    median_age FLOAT,
    male_population INT,
    female_population INT,
    total_population INT,
    number_of_veterans INT,
    foreign_born INT,
    average_household_size FLOAT,
    state_code CHAR(2) REFERENCES i94addr(code),
    race VARCHAR,
    count INT
)
DISTSTYLE ALL
"""

create_airport_codes = """
CREATE TABLE IF NOT EXISTS public.airport_codes (
    ident VARCHAR,
    type VARCHAR,
    name VARCHAR,
    elevation_ft FLOAT,
    continent VARCHAR,
    iso_country VARCHAR,
    iso_region VARCHAR,
    municipality VARCHAR,
    gps_code VARCHAR,
    iata_code VARCHAR,
    local_code VARCHAR,
    coordinates VARCHAR
);
"""

create_world_temperature = """
CREATE TABLE IF NOT EXISTS public.world_temperature (
    dt DATE,
    AverageTemperature FLOAT,
    AverageTemperatureUncertainty FLOAT,
    City VARCHAR,
    Country VARCHAR,
    Latitude VARCHAR,
    Longitude VARCHAR
);
"""

create_i94cit_res = """
CREATE TABLE IF NOT EXISTS public.i94cit_res (
    code FLOAT PRIMARY KEY,
    country VARCHAR
)
DISTSTYLE ALL
"""

create_i94port = """
CREATE TABLE IF NOT EXISTS public.i94port (
    code CHAR(3) PRIMARY KEY,
    port VARCHAR
)
DISTSTYLE ALL
"""

create_i94mode = """
CREATE TABLE IF NOT EXISTS public.i94mode (
    code FLOAT PRIMARY KEY,
    mode VARCHAR
)
DISTSTYLE ALL
"""

create_i94addr = """
CREATE TABLE IF NOT EXISTS public.i94addr (
    code CHAR(2) PRIMARY KEY,
    addr VARCHAR
)
DISTSTYLE ALL
"""

create_i94visa = """
CREATE TABLE IF NOT EXISTS public.i94visa (
    code FLOAT PRIMARY KEY,
    type VARCHAR
)
DISTSTYLE ALL
"""

```
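Because immigration and us_cities_demographics declare REFERENCES to the dimension tables, the dimension tables should be created before the tables that reference them. A minimal sketch of one way to enforce that order, assuming the statements are plain strings like the ones above; `order_create_statements` is a hypothetical helper, not the project's tables/create_tables.py, and it relies on the standard-library `graphlib` module (Python 3.9+).

```python
import re
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Table name after CREATE TABLE, with an optional "public." schema prefix
TABLE_RE = re.compile(r"CREATE TABLE IF NOT EXISTS\s+(?:public\.)?(\w+)", re.IGNORECASE)
# Every table named in a REFERENCES <table>(<column>) clause
REF_RE = re.compile(r"REFERENCES\s+(?:public\.)?(\w+)\s*\(", re.IGNORECASE)


def order_create_statements(statements):
    """Return CREATE TABLE statements so that referenced tables come first."""
    by_table = {}
    dependencies = {}
    for sql in statements:
        match = TABLE_RE.search(sql)
        if match is None:
            raise ValueError("not a CREATE TABLE statement: " + sql[:40])
        table = match.group(1).lower()
        by_table[table] = sql
        # A table depends on every other table it references
        dependencies[table] = {ref.lower() for ref in REF_RE.findall(sql)} - {table}
    # static_order() yields each table after all of its dependencies
    ordered = TopologicalSorter(dependencies).static_order()
    # Skip referenced tables that were not passed in (assumed to exist already)
    return [by_table[table] for table in ordered if table in by_table]


fact = """CREATE TABLE IF NOT EXISTS public.immigration (
    i94port CHAR(3) REFERENCES i94port(code),
    i94visa FLOAT REFERENCES i94visa(code)
);"""
dim_port = "CREATE TABLE IF NOT EXISTS public.i94port (code CHAR(3) PRIMARY KEY, port VARCHAR) DISTSTYLE ALL"
dim_visa = "CREATE TABLE IF NOT EXISTS public.i94visa (code FLOAT PRIMARY KEY, type VARCHAR) DISTSTYLE ALL"
for sql in order_create_statements([fact, dim_port, dim_visa]):
    print(TABLE_RE.search(sql).group(1))
```

With a psycopg2 cursor, the ordered statements could then be executed in a loop (`cur.execute(sql)` for each one) followed by `conn.commit()`.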

#### Mapping Out Data Pipelines and Data Quality checks
The graph below shows the DAG: its tasks and the order in which the tables are loaded. Every table (fact or dimension) has a COUNT-based data quality check.

<img src="notebook_images/dag.png" width="950" height="950">


#### Examples of data quality checks on the dimension tables


```python
[
    {'name': 'i94cit_res',
     'value': 'i94cntyl',
     'columns': ['code', 'country'],
     'dq_checks': [{'check_sql': "SELECT COUNT(*) FROM i94cit_res WHERE code is null", 'expected_result': 0}]
     },
    {'name': 'i94visa',
     'value': 'I94VISA',
     'columns': ['code', 'type'],
     'dq_checks': [{'check_sql': "SELECT COUNT(*) FROM i94visa WHERE code is null", 'expected_result': 0}]
     },
    {'name': 'i94port',
     'value': 'i94prtl',
     'columns': ['code', 'port'],
     'dq_checks': [{'check_sql': "SELECT COUNT(*) FROM i94port WHERE code is null", 'expected_result': 0}]
     },
    {'name': 'i94addr',
     'value': 'i94addrl',
     'columns': ['code', 'addr'],
     'dq_checks': [{'check_sql': "SELECT COUNT(*) FROM i94addr WHERE code is null", 'expected_result': 0}]
     },
    {'name': 'i94mode',
     'value': 'i94model',
     'columns': ['code', 'mode'],
     'dq_checks': [{'check_sql': "SELECT COUNT(*) FROM i94mode WHERE code is null", 'expected_result': 0}]
     },
]
```
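A minimal sketch of how entries like these could be evaluated. The project runs its checks from the Airflow DAG against Redshift, which is not reproduced here; this stand-in uses Python's built-in sqlite3 with a few sample rows, and `run_dq_checks` is a hypothetical helper that compares the first value returned by each `check_sql` with `expected_result`.

```python
import sqlite3


def run_dq_checks(conn, table_config):
    """Run every dq_check of one table config; raise ValueError if any result differs."""
    failures = []
    for check in table_config["dq_checks"]:
        # Each check_sql is expected to return a single value (e.g. a COUNT)
        result = conn.execute(check["check_sql"]).fetchone()[0]
        if result != check["expected_result"]:
            failures.append((check["check_sql"], check["expected_result"], result))
    if failures:
        raise ValueError(f"data quality checks failed for {table_config['name']}: {failures}")
    return True


# In-memory SQLite stand-in for Redshift, with a few sample rows
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE i94visa (code REAL, type TEXT)")
conn.executemany("INSERT INTO i94visa VALUES (?, ?)",
                 [(1.0, "Business"), (2.0, "Pleasure"), (3.0, "Student")])

i94visa_config = {
    'name': 'i94visa',
    'value': 'I94VISA',
    'columns': ['code', 'type'],
    'dq_checks': [{'check_sql': "SELECT COUNT(*) FROM i94visa WHERE code is null", 'expected_result': 0}],
}
print(run_dq_checks(conn, i94visa_config))  # True
```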

## How to run the ETL to model the data

1. Clone the repository and fill in the credentials in tables/dwh.cfg and dags/dw.cfg
2. Read the file dags/datasets/README.md, which lists the datasets needed
3. Upload those datasets to an S3 bucket
4. Follow the instructions to run Apache Airflow in a Docker container: [Instructions](https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html)
5. **The official documentation for Apache Airflow 2.0.1 contains an error; I recorded a video that shows how to fix it
(sharing is caring): [FIX THAT BUG](https://youtu.be/RVKRtgDIh8A)**
6. Go to the main project folder and, if Apache Airflow is not running, start it:

```shell
docker compose up     # start the services
docker compose down   # stop the services
```
7. Configure a connection in Apache Airflow to Amazon Redshift: [Detailed Steps](https://www.progress.com/tutorials/jdbc/connect-to-redshift-salesforce-and-others-from-apache-airflow)
8. You should now see the DAG immigration_etl_dag in Apache Airflow
9. **Run the script tables/create_tables.py**
10. Finally, trigger the DAG in Apache Airflow and wait for the results.

## Project Write Up, Questions & Assumptions

* Clearly state the rationale for the choice of tools and technologies for the project.

 <p style="color:blue;">In terms of technologies, I also wanted to use some tools not covered by the certification, such as Docker containers.
Amazon RDS and Redshift are ideal tools for loading data into databases, and S3 buckets are a convenient way to store large datasets in the cloud.
Finally, an orchestrator like Apache Airflow plays a central role: it not only executes all the ETL steps but also makes it possible to monitor performance and keep execution logs.</p>

* Propose how often the data should be updated and why.
 <p style="color:blue;">If the data sources are updated very often, a short update window should be used, for example every day at 5 am. However, given how I designed this ETL,
and since the source datasets are not updated in real time, monthly updates and reports would be ideal. In some cases, weekly reports may be convenient.

I would not let more than a month pass between updates, since staler data can lead to inaccurate results for data consumers building prediction models or any other analysis of immigration behavior.</p>


### Write a description of how you would approach the problem differently under the following scenarios:
 
 * The data was increased by 100x.
 
<p style="color:blue;">If the data is stored in the cloud, running Spark on an Amazon EMR cluster (a managed big-data platform) can handle the load; this approach is modular and scalable. We can also split the work in Airflow so that each DAG run or task processes only one partition of the data, for example one month of I-94 records (divide and conquer). </p>

 
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
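 <p style="color:blue;">This is a very common case in the industry. Fortunately, the Airflow scheduler accepts a cron expression in the DAG definition, so the DAG can run every day early enough to finish before 7 am (for example <code>0 5 * * *</code> if the pipeline needs less than two hours; <code>0 7 * * *</code> would only start it at 7 am). </p>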
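A cron expression has five fields: minute, hour, day of month, month and day of week. The deliberately minimal, stdlib-only matcher below (a hypothetical `cron_matches` that supports only `*` and single integers, not ranges, lists or steps) shows how an expression like `0 5 * * *` maps to 05:00 every day; Airflow itself parses the full cron syntax.

```python
from datetime import datetime


def cron_matches(expression, moment):
    """Return True if `moment` matches a five-field cron expression.

    Minimal sketch: each field must be '*' or a single integer. Ranges, lists,
    steps and cron's special rule for combining day-of-month with day-of-week
    are not supported (here every field must match).
    """
    fields = expression.split()
    if len(fields) != 5:
        raise ValueError("expected 5 fields: minute hour day-of-month month day-of-week")
    # cron counts weekdays from Sunday = 0; Python's weekday() has Monday = 0
    cron_weekday = (moment.weekday() + 1) % 7
    actual = [moment.minute, moment.hour, moment.day, moment.month, cron_weekday]
    for position, (field, value) in enumerate(zip(fields, actual)):
        if field == "*":
            continue
        expected = int(field)
        if position == 4 and expected == 7:
            expected = 0  # cron accepts both 0 and 7 for Sunday
        if expected != value:
            return False
    return True


# 05:00 on 1 March 2021 matches a daily 5 am schedule; 07:00 does not
print(cron_matches("0 5 * * *", datetime(2021, 3, 1, 5, 0)))
print(cron_matches("0 5 * * *", datetime(2021, 3, 1, 7, 0)))
```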
 
 
 * The database needed to be accessed by 100+ people.
 
 <p style="color:blue;">Fortunately, Amazon Redshift as a data warehouse solution is designed to serve many different data consumers (data analysts, marketers, etc.), and not everybody should have access to everything. A great feature here is the ability to create user groups in Redshift and grant each group access only to particular fact tables or particular operations. </p>
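A minimal sketch of that group-based setup: the hypothetical helper below only builds the SQL strings, and the group names and table assignments are placeholders. Users would then be added to a group with `ALTER GROUP <group> ADD USER <user>;`.

```python
def grant_statements(group_tables, schema="public"):
    """Build Redshift SQL giving each user group read-only access to its tables.

    group_tables maps a group name to the list of tables it may query.
    Names are interpolated as-is, so they must be trusted identifiers.
    """
    statements = []
    for group, tables in group_tables.items():
        statements.append(f"CREATE GROUP {group};")
        # USAGE on the schema is needed to reach its tables
        # (all users already have it on the public schema by default)
        statements.append(f"GRANT USAGE ON SCHEMA {schema} TO GROUP {group};")
        for table in tables:
            statements.append(f"GRANT SELECT ON TABLE {schema}.{table} TO GROUP {group};")
    return statements


# Hypothetical groups: analysts query the fact tables, marketing only the demographics
access = {
    "analysts": ["immigration", "airport_codes", "world_temperature"],
    "marketing": ["us_cities_demographics"],
}
for statement in grant_statements(access):
    print(statement)
```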

In [36]:
from subprocess import call
call(['python', '-m', 'nbconvert', 'Capstone_Project.ipynb'])

0