# US Immigration Data Model
### Data Engineering Capstone Project

#### Project Summary
This capstone project combines data from multiple sources: I94 immigration records for US ports of entry, world temperature data, airport codes and US city demographics.
Within the ETL pipeline, the data is cleaned, filtered and prepared to fit the defined data model.
Data processing is done with pandas.
PostgreSQL has been chosen as the database.

The project consists of the following steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import psycopg2
import datetime
from sql_queries import airport_insert, demographic_insert, immigration_insert, temperature_insert

### Step 1: Scope the Project and Gather Data

#### Scope 
The plan is to first gather the data from several web sources.
The data is then processed with Python and pandas.
The processed data is shaped to fit the defined data model, ready to be loaded by the ETL pipeline described later in this project.

#### Describe and Gather Data 
The following sections describe each data source and its origin.

#### I94 Immigration Data
This data comes from the US National Travel and Tourism Office and can be found [here](https://www.trade.gov/national-travel-and-tourism-office).

In [2]:
df_i94 = pd.read_sas('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat', 'sas7bdat', encoding="ISO-8859-1")

#### U.S. City Demographic Data
This data comes from Open Data Soft and can be found [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).

In [3]:
df_demographics = pd.read_csv("./us-cities-demographics.csv", delimiter=";")

#### World Temperature Data
This data comes from Kaggle and can be found [here](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data).

In [4]:
df_temperature = pd.read_csv('../../data2/GlobalLandTemperaturesByCity.csv')

#### Airport Codes Table
This data comes from datahub and can be found [here](https://datahub.io/core/airport-codes#data).

In [5]:
df_airport_codes = pd.read_csv("./airport-codes_csv.csv")

In [6]:
# Data description part - showing 5 rows for each dataset
df_i94.head()

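| | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6.0 | 2016.0 | 4.0 | 692.0 | 692.0 | XXX | 20573.0 |  |  |  | ... | U |  | 1979.0 | 10282016 |  |  |  | 1897628000.0 |  | B2 |
| 1 | 7.0 | 2016.0 | 4.0 | 254.0 | 276.0 | ATL | 20551.0 | 1.0 | AL |  | ... | Y |  | 1991.0 | D/S | M |  |  | 3736796000.0 | 296.0 | F1 |
| 2 | 15.0 | 2016.0 | 4.0 | 101.0 | 101.0 | WAS | 20545.0 | 1.0 | MI | 20691.0 | ... |  | M | 1961.0 | 09302016 | M |  | OS | 666643200.0 | 93.0 | B2 |
| 3 | 16.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... |  | M | 1988.0 | 09302016 |  |  | AA | 92468460000.0 | 199.0 | B2 |
| 4 | 17.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... |  | M | 2012.0 | 09302016 |  |  | AA | 92468460000.0 | 199.0 | B2 |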


In [7]:
df_temperature.head()

| | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 1 | 1743-12-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 2 | 1744-01-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 3 | 1744-02-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 4 | 1744-03-01 |  |  | Århus | Denmark | 57.05N | 10.33E |


In [8]:
df_demographics.head()

| | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127.0 | 87105.0 | 175232 | 5821.0 | 33878.0 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040.0 | 143873.0 | 281913 | 5829.0 | 86253.0 | 2.73 | NJ | White | 76402 |
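Each demographics row pairs city-level figures (population, median age and so on) with a single `Race`/`Count` pair, which suggests a city is repeated once per race group. If one row per city is ever needed, the race counts can be pivoted into columns with `DataFrame.pivot_table`. This is a minimal sketch under that assumption; the column names come from the dataset, but the values below are toy numbers for illustration only.

```python
import pandas as pd

# Toy rows shaped like us-cities-demographics.csv (values are illustrative only)
df_demo = pd.DataFrame({
    "City": ["Quincy", "Quincy", "Hoover"],
    "State Code": ["MA", "MA", "AL"],
    "Total Population": [100, 100, 50],
    "Race": ["White", "Asian", "Asian"],
    "Count": [60, 30, 10],
})

# One row per city: city-level columns form the index, each Race value becomes a column
df_race = df_demo.pivot_table(
    index=["City", "State Code", "Total Population"],
    columns="Race",
    values="Count",
    aggfunc="sum",
    fill_value=0,  # a city with no row for a race group gets 0 instead of NaN
).reset_index()

print(df_race)
```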


In [9]:
df_airport_codes.head()

| | ident | type | name | elevation_ft | continent | iso_country | iso_region | municipality | gps_code | iata_code | local_code | coordinates |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 00A | heliport | Total Rf Heliport | 11.0 |  | US | US-PA | Bensalem | 00A |  | 00A | -74.93360137939453, 40.07080078125 |
| 1 | 00AA | small_airport | Aero B Ranch Airport | 3435.0 |  | US | US-KS | Leoti | 00AA |  | 00AA | -101.473911, 38.704022 |
| 2 | 00AK | small_airport | Lowell Field | 450.0 |  | US | US-AK | Anchor Point | 00AK |  | 00AK | -151.695999146, 59.94919968 |
| 3 | 00AL | small_airport | Epps Airpark | 820.0 |  | US | US-AL | Harvest | 00AL |  | 00AL | -86.77030181884766, 34.86479949951172 |
| 4 | 00AR | closed | Newport Hospital & Clinic Heliport | 237.0 |  | US | US-AR | Newport |  |  |  | -91.254898, 35.6087 |


### Step 2: Explore and Assess the Data
#### Explore the Data 
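The data previews above reveal several quality issues: missing values (visible in the I94, temperature and airport previews), dates stored as SAS day counts in the I94 data, temperature records from countries other than the US, and airport records without an IATA code.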

#### Cleaning Steps
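The cleaning steps implemented below are:

* convert SAS dates (days since 1960-01-01) into dates
* parse port codes, cities and states out of the I94 SAS labels file
* keep only US temperature records and drop rows with missing values
* in the I94 data, drop unused columns and rows with missing values, discard records that depart before they arrive, and keep only the columns used by the data model
* in the airport data, drop rows with missing values, join the airports to the port locations by matching `iata_code` to the I94 port code, and remove duplicate rows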

In [10]:
# SAS stores dates as the number of days since 1960-01-01; convert them to Python dates

def transform_sas_date(day_cnt):
    """
    Transforms a SAS date stored as days since 1960-01-01 into a datetime.date
    :param day_cnt: number of days since 1960-01-01 (may be None or NaN)
    :return: datetime.date, or None for missing values
    """
    # pd.isna covers both None and float NaN, which is how pandas stores missing SAS dates
    if pd.isna(day_cnt):
        return None
    return datetime.date(1960, 1, 1) + datetime.timedelta(days=day_cnt)
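`transform_sas_date` is applied to each value in a Python loop. pandas can do the same conversion in one vectorized call: `pd.to_datetime` with `unit="D"` and an `origin` of 1960-01-01 interprets the numbers as day offsets from the SAS epoch and turns missing values into `NaT`. A sketch using two `arrdate` values from the I94 preview:

```python
import pandas as pd

# arrdate values as stored in the raw I94 data (days since 1960-01-01); None marks a missing date
sas_days = pd.Series([20545.0, 20567.0, None])

# Vectorized conversion: each number is treated as a day offset from the SAS epoch
dates = pd.to_datetime(sas_days, unit="D", origin="1960-01-01")

print(dates)
```

Appending `.dt.date` yields plain `datetime.date` objects, matching what `transform_sas_date` returns, if the database insert expects them.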

In [11]:
def process_port_locations_data(file_path):
    """
    Perform cleaning and filtering process for I94_SAS_Labels_Descriptions data
    :param file_path: path to the SAS labels file
    :return: df_port_locations for data model
    """

    # Read port locations from the SAS labels file
    with open(file_path) as i94f:
        i94_sas_label_desc = i94f.readlines()

    i94_sas_label_desc = [x.strip() for x in i94_sas_label_desc]
    # the i94port code list occupies this fixed line range of the labels file
    ports_unformatted  = i94_sas_label_desc[302:962]
    ports_formatted    = [x.split("=") for x in ports_unformatted]

    # extract port codes from the string array
    port_codes = [x[0].replace("'", "").strip() for x in ports_formatted]

    # extract location in "city, state" format
    port_locations = [x[1].replace("'", "").strip() for x in ports_formatted]
    # extract city as the first element of the split string
    port_cities = [x.split(",")[0].strip() for x in port_locations]
    # extract state as the last element of the split string (strip the space after the comma)
    port_states = [x.split(",")[-1].strip() for x in port_locations]

    # define port locations data frame
    df_port_locations = pd.DataFrame({"port_code": port_codes, "port_city": port_cities, "port_state": port_states})

    return df_port_locations
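The slice `[302:962]` ties the parser to one exact layout of the labels file. A hedged alternative is to select port entries by pattern instead of by position. The sketch below assumes each entry looks like `'ALC' = 'ALCAN, AK'` (a quoted three-character code, an equals sign, a quoted location); the helper `parse_port_lines`, the regex and the sample lines are hypothetical. If other value lists in the file share that shape, the search should still be limited to the port block.

```python
import re

import pandas as pd

# Hypothetical pattern for one port entry: quoted 3-character code = quoted "CITY, STATE"
PORT_LINE = re.compile(r"^\s*'(?P<code>[A-Z0-9]{3})'\s*=\s*'(?P<location>[^']*)'")


def parse_port_lines(lines):
    """Build a port-locations frame from label lines, skipping lines that do not match."""
    rows = []
    for line in lines:
        match = PORT_LINE.match(line)
        if match is None:
            continue
        location = match.group("location").strip()
        # city is the text before the first comma, state the text after the last comma
        city = location.split(",")[0].strip()
        state = location.split(",")[-1].strip()
        rows.append({"port_code": match.group("code"), "port_city": city, "port_state": state})
    return pd.DataFrame(rows, columns=["port_code", "port_city", "port_state"])


# Illustrative sample lines; only the two quoted entries match the pattern
sample = [
    "value $i94prtl",
    "   'ALC'\t=\t'ALCAN, AK             '",
    "   'ATL'\t=\t'ATLANTA, GA'",
    ";",
]
df_ports = parse_port_lines(sample)
print(df_ports)
```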

In [17]:
def process_temperature_data(df_temperature):
    """
    Perform cleaning and filtering process for temperature data
    :param df_temperature: Raw data frame to be processed
    :return: df_temperature for data model
    """
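    # Filter temperature data for US only; .copy() creates an independent frame,
    # which avoids pandas' SettingWithCopyWarning on the in-place dropna below
    df_temperature = df_temperature[df_temperature["Country"] == "United States"].copy()
    # clear records with missing values
    df_temperature.dropna(inplace=True)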

    return df_temperature

In [13]:
def process_i94_data(df_i94):
    """
    Perform cleaning and filtering process for df_i94 data
    :param df_i94: Raw data frame to be processed
    :return: df_i94
    """
    # exclude columns not needed for further processing
    df_i94.drop(columns=["insnum", "entdepu", "occup", "visapost"], inplace=True)
    # clear records with missing values
    df_i94.dropna(inplace=True)
    # process SAS date into readable date format
    df_i94["arrdate"] = [transform_sas_date(x) for x in df_i94["arrdate"]]
    df_i94["depdate"] = [transform_sas_date(x) for x in df_i94["depdate"]]

    # keep only records whose departure date is not earlier than the arrival date
    df_i94 = df_i94[df_i94.depdate >= df_i94.arrdate]
    # variable selection for data model
    df_i94 = df_i94[
        ["cicid", "i94yr", "i94mon", "i94port", "arrdate", "depdate", "i94visa", "biryear", "gender", "airline",
         "fltno", "visatype"]]

    return df_i94


In [14]:
def process_airport_codes_data(df_airport_codes, df_port_locations):
    """
    Perform cleaning and filtering process for airport_codes data
    :param airport_codes: Raw data frame to be processed
    :return: airport_codes
    """
    # clear records with missing values
    df_airport_codes.dropna(inplace=True)
    # merge datasets
    df_airport_codes = df_airport_codes.merge(df_port_locations, left_on="iata_code", right_on="port_code")
    df_airport_codes.drop(columns=["port_code"], inplace=True)
    # remove duplicated rows
    df_airport_codes = df_airport_codes.drop_duplicates()
    # define set of columns for final dataset
    df_airport_codes = df_airport_codes[["iata_code", "name", "type", "local_code", "coordinates", "port_city", "elevation_ft", "continent", "iso_country", "iso_region", "municipality", "gps_code"]]
    
    return df_airport_codes
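In the airport preview, the `continent` column is empty for every US airport. A likely cause (an assumption worth verifying against the raw file) is that North America is coded as the string `NA`, which `pd.read_csv` converts to NaN by default; the `dropna` call above would then discard every North American airport. Reading with `keep_default_na=False` and treating only empty fields as missing keeps those rows. A sketch on an in-memory sample shaped like airport-codes_csv.csv:

```python
import io

import pandas as pd

# Illustrative sample shaped like airport-codes_csv.csv; "NA" here means North America
csv_text = """ident,continent,iso_country,iata_code
00A,NA,US,
KATL,NA,US,ATL
SNYE,SA,BR,
"""

# Default parsing: the string "NA" is in pandas' default missing-value list
df_default = pd.read_csv(io.StringIO(csv_text))

# Only truly empty fields become NaN; "NA" stays a string
df_explicit = pd.read_csv(io.StringIO(csv_text), keep_default_na=False, na_values=[""])

print(df_default["continent"].isna().sum())   # North American rows read as missing
print(df_explicit["continent"].isna().sum())  # no continent values lost
```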

In [15]:
df_port_locations = process_port_locations_data("./I94_SAS_Labels_Descriptions.SAS")


In [18]:
df_temperature    = process_temperature_data(df_temperature)



In [19]:
df_i94            = process_i94_data(df_i94)


In [20]:
df_airport_codes  = process_airport_codes_data(df_airport_codes, df_port_locations)

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The chosen data model contains the following tables available for analysis: airports, demographics, temperature and immigrations.
It implements a snowflake schema, in which some dimension tables reference other dimension tables.
The fact table (immigrations) references the dimension tables through the IATA code as a foreign key.
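A minimal sketch of the fact-to-dimension link, using Python's built-in sqlite3 so it runs without a PostgreSQL server. The column lists are trimmed to a few fields and are assumptions modelled on the columns in the example query output later in this notebook; the real DDL lives in create_tables.py and sql_queries.py, which are not shown here. The inserted rows are toy values.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("PRAGMA foreign_keys = ON")  # sqlite3 only enforces REFERENCES when this is on

# Dimension table: one row per airport, keyed by IATA code (trimmed, hypothetical column set)
cur.execute("""
    CREATE TABLE airports (
        iata_code TEXT PRIMARY KEY,
        name      TEXT,
        city      TEXT
    )
""")

# Fact table: one row per I94 record, pointing at the airports dimension via iata
cur.execute("""
    CREATE TABLE immigrations (
        cicid    REAL PRIMARY KEY,
        iata     TEXT REFERENCES airports (iata_code),
        arrdate  TEXT,
        depdate  TEXT,
        visatype TEXT,
        CHECK (depdate >= arrdate)
    )
""")

cur.execute("INSERT INTO airports VALUES ('ATL', 'Example Airport', 'ATLANTA')")
cur.execute("INSERT INTO immigrations VALUES (1.0, 'ATL', '2016-04-07', '2016-04-20', 'F1')")

# Analysts join facts to the dimension on the foreign key, as extract_example_data() does
rows = cur.execute("""
    SELECT i.cicid, i.visatype, a.city
    FROM immigrations i
    LEFT JOIN airports a ON i.iata = a.iata_code
""").fetchall()
print(rows)
```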


#### 3.2 Mapping Out Data Pipelines
Steps necessary to pipeline the data into the chosen data model:
1. Read data from the SAS and .csv source files
2. Process data using pandas (filtering, cleaning, merging)
3. Create final datasets ready to load into the PostgreSQL database
4. Load datasets into the data model defined in PostgreSQL
5. Run quality checks on loaded row counts and date validity.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
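The cells below connect to PostgreSQL and load each processed DataFrame into its table. The tables themselves are created beforehand by create_tables.py.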

In [21]:
# Open a connection to PostgreSQL on the local machine
def connect_to_postgres():
    conn = psycopg2.connect("host=127.0.0.1 dbname=udacitydb user=student password=student")
    cur = conn.cursor()
    return conn, cur

In [22]:
def insert_data(sql_statement, dframe):
    """
    Executes a SQL insert statement on PostgreSQL for every row of a data frame
    :param sql_statement: parameterized SQL insert statement to be executed
    :param dframe: data to be inserted
    """
    conn, cur = connect_to_postgres()
    try:
        for _, row in dframe.iterrows():
            cur.execute(sql_statement, list(row.values))
        conn.commit()
        print("Data insert was successful")
    except Exception as e:
        # undo the partial load and report why it failed
        conn.rollback()
        print("Data insert failed:", e)
    finally:
        conn.close()
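`insert_data` sends one `INSERT` per row via `iterrows`, which is slow for the roughly 2.4 million immigration rows. A sketch of a batched alternative: build the parameter list once with `itertuples(index=False, name=None)` and hand it to `executemany`. Python's sqlite3 stands in for PostgreSQL so the example runs without a server; with psycopg2, `psycopg2.extras.execute_batch(cur, sql_statement, rows)` accepts the same `%s`-style statement and row list and groups the statements into fewer round trips. The table and column names here are illustrative.

```python
import sqlite3

import pandas as pd

# Illustrative frame with two temperature-like columns
df = pd.DataFrame({"dt": ["1850-01-01", "1850-02-01"], "AverageTemperature": [1.5, 2.25]})

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE temperature (dt TEXT, avg_temp REAL)")

# One list of plain tuples instead of one execute() call per DataFrame row
rows = list(df.itertuples(index=False, name=None))
try:
    cur.executemany("INSERT INTO temperature VALUES (?, ?)", rows)  # sqlite3 uses ? placeholders
    conn.commit()
except sqlite3.Error as e:
    conn.rollback()
    print("Data insert failed:", e)

count = cur.execute("SELECT COUNT(*) FROM temperature").fetchone()[0]
print(count)
```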

In [23]:
# Load data from pandas DataFrames into the PostgreSQL database

# load data into airports table
insert_data(airport_insert, df_airport_codes)

Data insert was successful


In [24]:
# load data into demographics table
insert_data(demographic_insert, df_demographics)

Data insert was successful


In [26]:
# load data into immigrations table
insert_data(immigration_insert, df_i94)

Data insert was successful


In [25]:
# load data into temperature table
insert_data(temperature_insert, df_temperature)

Data insert was successful


#### 4.2 Data Quality Checks
The following checks verify that the pipeline ran as expected:
 * Volume check: `data_volume_check` counts the rows in each table and reports tables that contain no data.
 * Integrity constraint: `valid_date_immigrations_check` adds a CHECK constraint requiring that `depdate` is not earlier than `arrdate` in the immigrations table.

Run Quality Checks

In [27]:
from psycopg2 import sql

def data_volume_check(table_name):
    """
    Checks how many rows were inserted in a table.
    :param table_name: name of the table to check
    """
    conn, cur = connect_to_postgres()
    try:
        # COUNT(*) avoids transferring every row; sql.Identifier quotes the table name safely
        cur.execute(sql.SQL("SELECT COUNT(*) FROM {}").format(sql.Identifier(table_name)))
        row_count = cur.fetchone()[0]
        if row_count < 1:
            print(table_name, "table contains no data")
        else:
            print(table_name, "table contains rows: ", row_count)
    except Exception as e:
        print("Data check failed for table:", table_name, e)
    finally:
        conn.close()
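`data_volume_check` confirms that each table is non-empty, but not that it received every row. A source/count check compares the number of rows in the processed DataFrame with the number in the table. A sketch, again with sqlite3 standing in for PostgreSQL; the helper name `source_count_check` is hypothetical. Against PostgreSQL, the table name would be composed with `psycopg2.sql.Identifier` as in `data_volume_check`.

```python
import sqlite3

import pandas as pd


def source_count_check(cur, table_name, dframe):
    """Return True when the table holds exactly as many rows as the source DataFrame."""
    # table_name must come from trusted code: identifiers cannot be bound as parameters
    loaded = cur.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()[0]
    expected = len(dframe)
    if loaded != expected:
        print(f"{table_name}: expected {expected} rows, found {loaded}")
    return loaded == expected


conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE airports (iata_code TEXT)")
df_airports = pd.DataFrame({"iata_code": ["ATL", "NYC", "WAS"]})
cur.executemany("INSERT INTO airports VALUES (?)", [("ATL",), ("NYC",)])  # one row never loaded

ok = source_count_check(cur, "airports", df_airports)
```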

In [28]:
def valid_date_immigrations_check():
    """
    Add constraint check on arrdate and depdate logic.
    """
    conn, cur = connect_to_postgres()
    try:    
        sql_statement = """ ALTER TABLE immigrations 
                            ADD CONSTRAINT valid_range_check 
                            CHECK (depdate >= arrdate)"""
        cur.execute(sql_statement)
        conn.commit()
        print("Valid date check success for table immigrations.")
    except:
        print("Valid date check failed for table immigrations.")
    finally:
        conn.close()
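Adding the constraint validates the rows already loaded and blocks future bad rows, but a second run fails simply because `valid_range_check` already exists. A read-only alternative counts the violating rows directly and can be rerun safely. A sketch with sqlite3 and toy rows; in PostgreSQL the date columns would typically be of type DATE rather than text.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE immigrations (cicid REAL, arrdate TEXT, depdate TEXT)")
cur.executemany(
    "INSERT INTO immigrations VALUES (?, ?, ?)",
    [
        (1.0, "2016-04-01", "2016-04-15"),
        (2.0, "2016-04-10", "2016-04-02"),  # departs before it arrives
    ],
)

# ISO-formatted date strings compare correctly as text
violations = cur.execute(
    "SELECT COUNT(*) FROM immigrations WHERE depdate < arrdate"
).fetchone()[0]

if violations:
    print(f"{violations} immigration record(s) depart before they arrive")
else:
    print("Valid date check success for table immigrations.")
```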

In [29]:
# data volume check on immigrations table
data_volume_check("immigrations")

# data volume check on demographics table
data_volume_check("demographics")

# data volume check on temperature table
data_volume_check("temperature")

# data volume check on airports table 
data_volume_check("airports")

immigrations table contains rows:  2384325
demographics table contains rows:  2891
temperature table contains rows:  661524
airports table contains rows:  34


In [30]:
# constraint check on whether departure dates happened after arrival dates
valid_date_immigrations_check()

Valid date check success for table immigrations.


In [47]:
def extract_example_data():
    """
    Extract example data from database and join two tables
    """
    conn, cur = connect_to_postgres()
    try:
        df = pd.read_sql_query("SELECT a.*, b.* FROM immigrations a left join airports b on a.iata=b.iata_code", con=conn)
        return df
    except Exception as e:
        print("Data read failed:", e)
        return None
    finally:
        conn.close()

In [48]:
# extract example data from postgres database
example_data = extract_example_data()

In [42]:
example_data.head()

| | cicid | year | month | iata | arrdate | depdate | visa | biryear | gender | airline | ... | type | local_code | coordinates | city | elevation_ft | continent | iso_country | iso_region | municipality | gps_code |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 524.0 | 2016.0 | 4.0 | PHI | 2016-04-01 | 2016-04-15 | 2.0 | 1963.0 | M | AA | ... | small_airport | SNYE | -45.067222595214844, -2.4836111068725586 | PHILADELPHIA | 125.0 | SA | BR | BR-MA | Pinheiro | SNYE |
| 1 | 525.0 | 2016.0 | 4.0 | PHI | 2016-04-01 | 2016-04-15 | 2.0 | 1964.0 | F | AA | ... | small_airport | SNYE | -45.067222595214844, -2.4836111068725586 | PHILADELPHIA | 125.0 | SA | BR | BR-MA | Pinheiro | SNYE |
| 2 | 526.0 | 2016.0 | 4.0 | PHI | 2016-04-01 | 2016-04-22 | 2.0 | 1963.0 | M | AA | ... | small_airport | SNYE | -45.067222595214844, -2.4836111068725586 | PHILADELPHIA | 125.0 | SA | BR | BR-MA | Pinheiro | SNYE |
| 3 | 527.0 | 2016.0 | 4.0 | PHI | 2016-04-01 | 2016-04-06 | 2.0 | 1985.0 | M | AA | ... | small_airport | SNYE | -45.067222595214844, -2.4836111068725586 | PHILADELPHIA | 125.0 | SA | BR | BR-MA | Pinheiro | SNYE |
| 4 | 528.0 | 2016.0 | 4.0 | PHI | 2016-04-01 | 2016-04-06 | 2.0 | 1991.0 | M | AA | ... | small_airport | SNYE | -45.067222595214844, -2.4836111068725586 | PHILADELPHIA | 125.0 | SA | BR | BR-MA | Pinheiro | SNYE |


In [49]:
example_data_ = example_data[["iata_code", "arrdate", "depdate", "gender", "visatype", "type", "name", "airline", "city", "municipality"]]

In [50]:
counts = example_data_[["iata_code",'visatype', 'gender']].groupby(['visatype', 'gender']).agg('count')

In [52]:
# display visa types per gender
counts

| visatype | gender | iata_code |
|---|---|---|
| B1 | F | 334 |
| B1 | M | 1240 |
| B1 | U | 0 |
| B2 | F | 2383 |
| B2 | M | 2112 |
| B2 | U | 0 |
| CP | F | 0 |
| CP | M | 0 |
| CP | X | 0 |
| CPL | M | 0 |
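The same visa-type-by-gender breakdown can be laid out as a two-way table with `pd.crosstab`, which puts genders in columns and is easier to scan than the stacked group index. A sketch on toy rows using the `visatype` and `gender` columns of `example_data_`:

```python
import pandas as pd

# Toy rows using the visatype and gender columns of example_data_
df = pd.DataFrame({
    "visatype": ["B1", "B1", "B2", "B2", "B2"],
    "gender": ["M", "F", "F", "F", "M"],
})

# Rows: visa types, columns: genders, cells: number of records
table = pd.crosstab(df["visatype"], df["gender"])
print(table)
```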


#### 4.3 Data dictionary 
Data dictionary is available in a separate .MD file.

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
  * I used pandas for data preprocessing because the data volumes are modest. For larger volumes, Spark DataFrames running on Amazon EMR (Elastic MapReduce) would allow distributed processing. For scheduled updates, wrapping the ETL pipeline in an Airflow DAG would be beneficial. PostgreSQL serves as the database.
  * The purpose of the data model is to give analysts a source of US immigration data for further analysis. Analysts can combine the immigration records with weather, demographic and airport data to build a richer picture of immigration patterns across the US.
  * The project is written and documented step by step in this Jupyter notebook, covering all the phases described above. Before running the notebook, run create_tables.py to initialize the database and drop any previously loaded data.



* Propose how often the data should be updated and why.
  * Since the immigration dataset contains monthly snapshots, the updates should run on a monthly basis.

* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
   * In this case, running Spark on Amazon EMR, with Apache Cassandra as a distributed database, would be a better fit than pandas and a single PostgreSQL instance.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
   * In this case, Airflow would be the way to go: a scheduled DAG can refresh the dashboard data before 7am and notify end users by email.
 * The database needed to be accessed by 100+ people.
   * Amazon Redshift is an option for handling the higher access volume.
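For the monthly update proposed above, each run only needs to locate the new I94 extract. Going by the one file name used in this notebook, `i94_apr16_sub.sas7bdat`, the extracts appear to follow an `i94_<mon><yy>_sub.sas7bdat` pattern. That pattern is an assumption generalised from a single example, and the helper `i94_file_name` is hypothetical.

```python
import calendar


def i94_file_name(year, month):
    """Build the assumed I94 extract name for a given year and month."""
    # calendar.month_abbr follows the LC_TIME locale; under the default C locale it gives "Apr"
    mon = calendar.month_abbr[month].lower()
    yy = f"{year % 100:02d}"
    return f"i94_{mon}{yy}_sub.sas7bdat"


print(i94_file_name(2016, 4))
```

A scheduler such as Airflow could pass the run's year and month into a helper like this, so each monthly run reads only the new extract.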