# Immigration data warehousing project
### Data Engineering Capstone Project

#### Project Summary
In this project, we will demonstrate how to build a data warehouse that enriches the analysis of frequently occurring events by augmenting the event data with ancillary information. 

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import configparser
import psycopg2
from sql_queries import airport_insert, demographic_insert, immigration_insert, temperature_insert

### Step 1: Scope the Project and Gather Data

#### Scope 
We explain the scope of this project in detail:

- To aid in the analysis of U.S. immigration data, we expand the dataset with information on the ports through which immigration occurs.
  - This information includes data on airports, demographics, and temperatures.
- Finally, we construct the data warehouse as a snowflake schema by building ETL processes that convert the original data into these tables.
- With this data warehouse, various analyses can be performed, such as gaining insight into immigration patterns to a city based on the overall population of the state.
- We use pandas and PostgreSQL for local development.

#### Describe and Gather Data 
We use the following datasets:
- **I94 Immigration Data**: This data comes from [the U.S. National Travel and Tourism Office](https://www.trade.gov/national-travel-and-tourism-office).
  - A data dictionary exported from SAS is included in the repository: I94_SAS_Labels_Descriptions.SAS.
- **Airport Code Table**: This is a simple table of airport codes and corresponding cities published [here](https://datahub.io/core/airport-codes#data).
- **U.S. City Demographic Data**: This data comes from [OpenDataSoft](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).
- **World Temperature Data**: This dataset comes from the Kaggle dataset [Climate Change: Earth Surface Temperature Data](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data).


In [2]:
df_immigration = pd.read_csv('immigration_data_sample.csv')
print(df_immigration.shape)
df_immigration = df_immigration.set_index('Unnamed: 0').sort_index()
df_immigration.head()

(1000, 29)


Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
10925,13208.0,2016.0,4.0,116.0,116.0,LOS,20545.0,1.0,CA,20574.0,...,,M,1987.0,6292016,M,,VS,55442240000.0,7,WT
10930,13213.0,2016.0,4.0,116.0,116.0,LOS,20545.0,1.0,CA,20553.0,...,,M,1981.0,6292016,,,AA,55449790000.0,109,WT
11328,13826.0,2016.0,4.0,117.0,117.0,ATL,20545.0,1.0,SC,20553.0,...,,M,1972.0,6292016,M,,AF,55459080000.0,688,WB
14575,17786.0,2016.0,4.0,123.0,123.0,NYC,20545.0,1.0,NE,20556.0,...,,M,1985.0,6292016,,,VS,55455180000.0,9,WB
15053,18310.0,2016.0,4.0,123.0,123.0,SEA,20545.0,1.0,CA,20548.0,...,,M,1971.0,6292016,M,,DL,55421540000.0,143,WT


In [3]:
df_airport = pd.read_csv('airport-codes_csv.csv')
print(df_airport.shape)
df_airport.head()

(55075, 12)


Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [4]:
df_demographics = pd.read_csv('us-cities-demographics.csv', delimiter=';')
print(df_demographics.shape)
df_demographics.head()

(2891, 12)


Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [5]:
df_temperature = pd.read_csv('GlobalLandTemperaturesByCity.csv')
print(df_temperature.shape)
df_temperature_us = df_temperature[df_temperature["Country"] == "United States"]
print(df_temperature_us.shape)
df_temperature_us.head()

(8599212, 7)
(687289, 7)


Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
47555,1820-01-01,2.101,3.217,Abilene,United States,32.95N,100.53W
47556,1820-02-01,6.926,2.853,Abilene,United States,32.95N,100.53W
47557,1820-03-01,10.767,2.395,Abilene,United States,32.95N,100.53W
47558,1820-04-01,17.989,2.202,Abilene,United States,32.95N,100.53W
47559,1820-05-01,21.809,2.036,Abilene,United States,32.95N,100.53W


In [6]:
# Reading the full SAS dataset requires Spark with the spark-sas7bdat package;
# this cell is kept for reference, and the sample CSV loaded above is used instead.
# from pyspark.sql import SparkSession

# spark = SparkSession.builder.\
# config("spark.jars.repositories", "https://repos.spark-packages.org/").\
# config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
# enableHiveSupport().getOrCreate()

# df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
# # Write to parquet, then read it back.
# df_spark.write.parquet("sas_data")
# df_spark=spark.read.parquet("sas_data")

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

In [7]:
def read_sas_dictionary():
    """Read data dictionary of the immigration dataset in SAS format.
    Args:
    Returns:
        iata_codes: Valid IATA codes specified in the SAS dictionary.
        cities: Valid cities specified in the SAS dictionary.
        state_codes: Valid state codes specified in the SAS dictionary.
    Raises:
    """
    print("Read valid iata codes, cities and state codes from the SAS description file.")
    iata_codes, cities, state_codes = [], [], []

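    with open("./I94_SAS_Labels_Descriptions.SAS") as f:
        lines = [line.strip() for line in f]
    # Port entries of the "value $i94prtl" block occupy list indices 302-961;
    # print the neighboring lines to confirm the block boundaries.
    print(lines[300:302], lines[962:963])
    for port in lines[302:962]:
        # Each entry looks like 'CODE' = 'CITY, ST'; drop the quotes and padding.
        iata_code, city_state = (x.replace("'", "").strip() for x in port.split("="))
        parts = city_state.rsplit(',', 1)
        if len(parts) == 2:
            # Entries without a comma-separated suffix are skipped.
            city, state_code = (x.strip() for x in parts)
            iata_codes.append(iata_code)
            cities.append(city)
            state_codes.append(state_code)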
    return iata_codes, cities, state_codes


iata_codes, cities, state_codes = read_sas_dictionary()

Read valid iata codes, cities and state codes from the SAS description file.
['/* I94PORT - This format shows all the valid and invalid codes for processing */', 'value $i94prtl'] [';']
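The cell above depends on fixed list indices (302 to 961), which break if the label file changes. The printed boundary lines show that the port block starts at a `value $i94prtl` line and ends at a line containing only `;`, so a sketch can locate the block by those markers instead. `parse_i94_ports` is a hypothetical helper name; it keeps the same "CITY, ST" rule as `read_sas_dictionary`.

```python
def parse_i94_ports(lines):
    """Extract (IATA code, city, state code) triples from the SAS label file.

    Locates the port block by its "value $i94prtl" header instead of fixed
    line numbers and stops at the first line that is just ";".
    """
    iata_codes, cities, state_codes = [], [], []
    in_block = False
    for raw in lines:
        line = raw.strip()
        if line.startswith("value $i94prtl"):
            in_block = True  # the following lines are port entries
            continue
        if not in_block:
            continue
        if line == ";":
            break  # end of the port block
        if "=" not in line:
            continue  # skip blank or comment lines inside the block
        code, city_state = (part.replace("'", "").strip() for part in line.split("=", 1))
        parts = city_state.rsplit(",", 1)
        if len(parts) == 2:  # same rule as read_sas_dictionary: need "CITY, ST"
            iata_codes.append(code)
            cities.append(parts[0].strip())
            state_codes.append(parts[1].strip())
    return iata_codes, cities, state_codes
```

Because iterating an open file yields its lines, it can be called directly on the file handle, e.g. `parse_i94_ports(f)` inside `with open("./I94_SAS_Labels_Descriptions.SAS") as f:`.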


In [8]:
def print_na_counts(df):
    """Print NaN counts for each column of the given dataframe that contains any NaN.
    Args:
        df: dataframe to be checked.
    Returns:
    Raises:
    """
    print("Identify columns which have too many NaNs.")
    for k in [k for k,v in df.isnull().any().items() if v]:
        print(df[k].isna().value_counts().sort_index())
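`print_na_counts` prints raw True/False counts. Dividing by the row count makes the "too many NaNs" judgment explicit. The sketch below is illustrative only: `na_ratios` and the 0.5 threshold are assumptions, not values used by the project. Applied to the immigration counts printed in the next cell, a 0.5 threshold would flag exactly visapost (61.8%), occup (99.6%), entdepu (100%) and insnum (96.5%), the four columns that `clean_immigration` drops.

```python
import pandas as pd


def na_ratios(df, threshold=0.5):
    """Return NaN ratios of columns that have any NaN, plus those above threshold.

    The 0.5 default is an illustrative assumption, not a project setting.
    """
    ratios = df.isna().mean()  # fraction of NaNs per column
    ratios = ratios[ratios > 0].sort_values(ascending=False)
    return ratios, list(ratios[ratios > threshold].index)
```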

In [9]:
# Key columns: i94port, i94addr
# Mostly-null columns, dropped during cleaning: visapost, occup, entdepu, insnum
print_na_counts(df_immigration)

Identify columns which have too many NaNs.
False    941
True      59
Name: i94addr, dtype: int64
False    951
True      49
Name: depdate, dtype: int64
False    382
True     618
Name: visapost, dtype: int64
False      4
True     996
Name: occup, dtype: int64
False    954
True      46
Name: entdepd, dtype: int64
True    1000
Name: entdepu, dtype: int64
False    954
True      46
Name: matflag, dtype: int64
False    859
True     141
Name: gender, dtype: int64
False     35
True     965
Name: insnum, dtype: int64
False    967
True      33
Name: airline, dtype: int64
False    992
True       8
Name: fltno, dtype: int64


In [10]:
# Key column: iata_code
# Mostly-null columns, dropped during cleaning: continent, local_code
print_na_counts(df_airport)

Identify columns which have too many NaNs.
False    48069
True      7006
Name: elevation_ft, dtype: int64
False    27356
True     27719
Name: continent, dtype: int64
False    54828
True       247
Name: iso_country, dtype: int64
False    49399
True      5676
Name: municipality, dtype: int64
False    41030
True     14045
Name: gps_code, dtype: int64
False     9189
True     45886
Name: iata_code, dtype: int64
False    28686
True     26389
Name: local_code, dtype: int64


In [11]:
print("Identify duplicated key columns.")
_df_airport = df_airport.dropna(subset=['iata_code'])
_df_airport = _df_airport[_df_airport["iata_code"].isin(iata_codes)]
_df_airport = _df_airport[_df_airport['type']!='closed']
# _df_airport.drop_duplicates(subset=['iata_code']).shape
_df_airport[_df_airport.duplicated(subset=['iata_code'])]

Identify duplicated key columns.


Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
40590,RU-0493,small_airport,Dalnerechensk Airport,272.0,EU,RU,RU-PRI,Dalnerechensk,,DLR,,"133.7363, 45.8783"
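`duplicated()` with the default `keep='first'` marks only the second and later occurrences, so the output above shows just one of the DLR rows. Passing `keep=False` marks every row sharing the key, which helps decide which row should survive `drop_duplicates` (`clean_airport` keeps the first). `show_all_duplicates` is a hypothetical helper name.

```python
import pandas as pd


def show_all_duplicates(df, key):
    """Return every row whose `key` value occurs more than once, grouped by key."""
    mask = df.duplicated(subset=[key], keep=False)  # True for all copies
    # A stable sort keeps the original row order within each key.
    return df[mask].sort_values(key, kind="stable")
```

For example, `show_all_duplicates(_df_airport, "iata_code")` would list both DLR rows side by side.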


In [12]:
# key: City, State Code
print_na_counts(df_demographics)

Identify columns which have too many NaNs.
False    2888
True        3
Name: Male Population, dtype: int64
False    2888
True        3
Name: Female Population, dtype: int64
False    2878
True       13
Name: Number of Veterans, dtype: int64
False    2878
True       13
Name: Foreign-born, dtype: int64
False    2875
True       16
Name: Average Household Size, dtype: int64


In [13]:
print("Identify duplicated key columns.")
_df_demographics = df_demographics.dropna(subset=['City'])
_df_demographics = _df_demographics[_df_demographics["State Code"].isin(state_codes)]
print("Note that demographic data has duplicate rows on (city, state) with different race values.")
_df_demographics[_df_demographics.duplicated(subset=['City', 'State', 'Race'])]

Identify duplicated key columns.
Note that demographic data has duplicate rows on (city, state) with different race values.


Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count


In [14]:
# key: City
print("Note that temperature data has duplicate rows on city with different (latitude, longitude) values.")
df_temperature_us[df_temperature_us.duplicated(subset=['dt', 'City', 'Latitude', 'Longitude'])]

Note that temperature data has duplicate rows on city with different (latitude, longitude) values.


Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude


#### Cleaning Steps
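The cleaning functions below apply the following steps:
- Keep only rows whose key values (port IATA code, state code or city) appear in the SAS data dictionary.
- Airports: drop rows without an IATA code, drop closed airports, and de-duplicate on the IATA code.
- Temperatures: keep only the last five years of records.
- Drop columns that are mostly NaN, then drop any remaining rows that contain NaNs.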

In [15]:
# Common cleaning steps among tables.

def filter_valid_columnvalues(df, colname, valid_values):
    """Filter rows which have valid values specified in given list.
    Args:
        df: Dataframe to be cleansed.
        colname: The column name to be checked.
        valid_values: Valid values in the given column.
    Returns:
        df_filtered: Filtered dataframe.
    Raises:
    """
    print(f"Filter rows which have valid {colname} values.")
    df_filtered = df[df[colname].isin(valid_values)]
    return df_filtered

def filtercols_then_dropna(df, drop_columns):
    """Drop specified columns which have too many NaNs,
    then drop rows with any NaNs.
    Args:
        df: Dataframe to be cleansed.
        drop_columns: Columns which have too many NaNs.
    Returns:
        df_notnull: Cleansed dataframe.
    Raises:
    """
    print("Drop columns which have too many NaNs, drop rows with any NaNs.")
    if drop_columns:
        df = df.drop(columns=drop_columns)
    df_notnull = df.dropna()
    return df_notnull
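`filtercols_then_dropna` expects the caller to name the sparse columns by hand. As a sketch, assuming a single NaN-ratio cut-off is acceptable, pandas can derive them with `DataFrame.dropna(axis=1, thresh=...)`, which keeps only columns with at least `thresh` non-NaN values. `drop_sparse_then_dropna` and `max_na_ratio` are hypothetical names.

```python
import math

import pandas as pd


def drop_sparse_then_dropna(df, max_na_ratio=0.5):
    """Drop columns whose NaN ratio exceeds max_na_ratio, then rows with any NaN."""
    # thresh is the minimum number of non-NaN values a column needs to be kept.
    min_non_na = math.ceil(len(df) * (1 - max_na_ratio))
    df_cols = df.dropna(axis=1, thresh=min_non_na)
    return df_cols.dropna()
```

A single cut-off is a judgment call: in the raw airport table, local_code is missing in about 48% of rows (26389 of 55075), just under 0.5, yet `clean_airport` drops it explicitly.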

In [16]:
def clean_immigration(df, iata_codes, state_codes):
    """Cleanse immigration dataframe.
    Args:
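        df: Dataframe to be cleansed.
        iata_codes: Valid IATA codes for the i94port column.
        state_codes: Valid state codes for the i94addr column.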
    Returns:
        df_notnull: Cleansed dataframe.
    Raises:
    """
    print(df.shape)
    _df = filter_valid_columnvalues(df, "i94port", valid_values=iata_codes)
    df_filtered = filter_valid_columnvalues(_df, "i94addr", valid_values=state_codes)
    print(df_filtered.shape)
    df_notnull = filtercols_then_dropna(df_filtered, drop_columns=["insnum", "entdepu", "occup", "visapost"])
    print(df_notnull.shape)
    return df_notnull

df_immigration_notnull = clean_immigration(df_immigration, iata_codes, state_codes)
df_immigration_notnull.isnull().any()

(1000, 28)
Filter rows which have valid i94port values.
Filter rows which have valid i94addr values.
(904, 28)
Drop columns which have too many NaNs, drop rows with any NaNs.
(726, 24)


cicid       False
i94yr       False
i94mon      False
i94cit      False
i94res      False
i94port     False
arrdate     False
i94mode     False
i94addr     False
depdate     False
i94bir      False
i94visa     False
count       False
dtadfile    False
entdepa     False
entdepd     False
matflag     False
biryear     False
dtaddto     False
gender      False
airline     False
admnum      False
fltno       False
visatype    False
dtype: bool

In [17]:
def clean_airport(df, iata_codes):
    """Cleanse airport dataframe.
    Args:
        df: Dataframe to be cleansed.
        iata_codes: Valid IATA codes specified in the SAS dictionary.
    Returns:
        df_notnull: Cleansed dataframe.
    Raises:
    """
    print(df.shape)
    _df = df.dropna(subset=['iata_code'])
    _df = _df[_df['type']!='closed']
    _df = _df.drop_duplicates(subset=['iata_code'])
    print(_df.shape)
    df_filtered = filter_valid_columnvalues(_df, "iata_code", valid_values=iata_codes)
    print(df_filtered.shape)
    df_notnull = filtercols_then_dropna(df_filtered, drop_columns=["continent", "local_code", ])
    print(df_notnull.shape)
    return df_notnull

df_airport_notnull = clean_airport(df_airport, iata_codes)
df_airport_notnull.isnull().any()

(55075, 12)
(8798, 12)
Filter rows which have valid iata_code values.
(495, 12)
Drop columns which have too many NaNs, drop rows with any NaNs.
(440, 10)


ident           False
type            False
name            False
elevation_ft    False
iso_country     False
iso_region      False
municipality    False
gps_code        False
iata_code       False
coordinates     False
dtype: bool

In [18]:
def clean_demographics(df, state_codes):
    """Cleanse demographics dataframe.
    Args:
        df: Dataframe to be cleansed.
        state_codes: Valid state codes specified in the SAS dictionary.
    Returns:
        df_notnull: Cleansed dataframe.
    Raises:
    """
    print(df.shape)
    df_filtered = filter_valid_columnvalues(df, "State Code", valid_values=state_codes)
    print(df_filtered.shape)
    df_notnull = filtercols_then_dropna(df_filtered, drop_columns=[])
    print(df_notnull.shape)
    return df_notnull

df_demographics_notnull = clean_demographics(df_demographics, state_codes)
df_demographics_notnull.isnull().any()

(2891, 12)
Filter rows which have valid State Code values.
(2886, 12)
Drop columns which have too many NaNs, drop rows with any NaNs.
(2870, 12)


City                      False
State                     False
Median Age                False
Male Population           False
Female Population         False
Total Population          False
Number of Veterans        False
Foreign-born              False
Average Household Size    False
State Code                False
Race                      False
Count                     False
dtype: bool

In [19]:
def clean_temperature(df, cities):
    """Cleanse temperature dataframe.
    Args:
        df: Dataframe to be cleansed.
        cities: Valid cities specified in the SAS dictionary.
    Returns:
        df_notnull: Cleansed dataframe.
    Raises:
    """
    print(df.shape)
    print("Filter rows to the last five years.")
    df_copied = df.copy()
    df_copied["dt"] = pd.to_datetime(df_copied["dt"])
    # DataFrame.last needs a sorted DatetimeIndex; note it is deprecated in recent pandas.
    df_last = df_copied.set_index('dt').sort_index().last('5Y')
    df_last = df_last.reset_index()
    print(df_last.shape)
    _df = df_last.copy()
    cities_l = [city.lower() for city in cities]
    _df["City_lower"] = _df["City"].str.lower().values
    df_filtered = filter_valid_columnvalues(_df, "City_lower", valid_values=cities_l)
    print(df_filtered.shape)
    df_notnull = filtercols_then_dropna(df_filtered, drop_columns=[])
    print(df_notnull.shape)
    return df_notnull

df_temperature_us_notnull = clean_temperature(df_temperature_us, cities)
df_temperature_us_notnull.isnull().any()

(687289, 7)
Filter rows to the last five years.
(14649, 7)
Filter rows which have valid City_lower values.
(5244, 8)
Drop columns which have too many NaNs, drop rows with any NaNs.
(5243, 8)


dt                               False
AverageTemperature               False
AverageTemperatureUncertainty    False
City                             False
Country                          False
Latitude                         False
Longitude                        False
City_lower                       False
dtype: bool
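`clean_temperature` relies on `DataFrame.last('5Y')`, which needs a sorted DatetimeIndex and is deprecated in recent pandas releases (which also prefer the `'YE'` alias over `'Y'`). `last` keeps rows dated strictly after the latest date minus the offset; with a year-end offset, a latest date of, for example, 2013-09-01 gives a cut-off of 2008-12-31. The sketch below reproduces that cut-off with a plain boolean mask on a column; `filter_last_years` is a hypothetical name, and it should select the same rows as `last('5Y')` when the latest date is not itself a year end.

```python
import pandas as pd


def filter_last_years(df, date_col="dt", years=5):
    """Keep rows dated after (latest date - YearEnd(years)).

    Mirrors the cut-off DataFrame.last(f"{years}Y") computes, but works on an
    ordinary column, so no DatetimeIndex or sorting is required.
    """
    dates = pd.to_datetime(df[date_col])
    cutoff = dates.max() - pd.offsets.YearEnd(years)
    return df[dates > cutoff]
```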

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

We chose the data model shown in the table below for the following reasons:
- We take the immigration data as the fact table because it consists of frequently generated event data.
- We take the airports, demographics and temperature data as dimension tables because they can be joined to the facts via airport location keys: IATA code, city name and state code.


| table name | description | columns |
| ------- | ---------- | ----------- |
| immigrations | A fact table that stores immigration records. | \|cicid\|year\|month\|cit\|res\|iata\|arrdate\|mode\|addr\|depdate\|bir\|visa\|count\|dtadfile\|entdepa\|entdepd\|matflag\|biryear\|dtaddto\|gender\|airline\|admnum\|fltno\|visatype\| |
| airports | A dimension table that stores airport information. | \|iata_code\|name\|type\|coordinates\|city\|state_code\|iso_country\|iso_region\|gps_code\| |
| demographics | A dimension table that stores demographics of U.S. cities. | \|city\|state_code\|state\|race\|count\|male_population\|female_population\|total_population\|median_age\|num_veterans\|foreign_born\|average_household_size\| |
| temperatures | A dimension table that stores monthly average temperatures of U.S. cities. | \|timestamp\|average_temperature\|average_temperature_uncertainty\|city\|country\|latitude\|longitude\| |

#### 3.2 Mapping Out Data Pipelines
The steps needed to load the data into the chosen data model are:

1. Join the port locations from the immigration data dictionary (IATA code, city, state code) to the airports data.
2. Rename and reorder the columns of each table to match the schema.
3. Create the database and tables by running create_tables.py.
4. Insert the data into the created tables.

### Step 4: Run Pipelines to Model the Data 


#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [20]:
def make_airport_model(df_airport_notnull, iata_codes, cities, state_codes):
    """Make airport model dataframe to be inserted into airports table.
    Args:
        df_airport_notnull: DataFrame cleaned in the previous step.
        iata_codes: Valid IATA codes specified in the SAS dictionary.
        cities: Valid cities specified in the SAS dictionary.
        state_codes: Valid state codes specified in the SAS dictionary.
    Returns:
        df_airport_model: DataFrame to be inserted.
    Raises:
    """
    # IATA code, city and state code of each port listed in the SAS dictionary.
    df_immigration_port = pd.DataFrame(
        {"iata_code": iata_codes, "city": cities, "state_code": state_codes}
    )
    # Inner join: keep only airports whose IATA code appears in the SAS dictionary.
    df_airport_joined = df_airport_notnull.merge(df_immigration_port, on="iata_code")
    df_airport_model = df_airport_joined[
        [
            "iata_code",
            "name",
            "type",
            "coordinates",
            "city",
            "state_code",
            "iso_country",
            "iso_region",
            "gps_code",
        ]
    ]
    return df_airport_model


def make_demographics_model(df_demographics_notnull):
    """Make demographic model dataframe to be inserted into demographics table.
    Args:
        df_demographics_notnull: DataFrame cleaned in the previous step.
    Returns:
        df_demographics_model: DataFrame to be inserted.
    Raises:
    """
    df_demographics_rn = df_demographics_notnull.rename(
        columns={
            "City": "city",
            "State": "state",
            "Median Age": "median_age",
            "Male Population": "male_population",
            "Female Population": "female_population",
            "Total Population": "total_population",
            "Number of Veterans": "num_veterans",
            "Foreign-born": "foreign_born",
            "Average Household Size": "average_household_size",
            "State Code": "state_code",
            "Race": "race",
            "Count": "count",
        }
    )
    df_demographics_model = df_demographics_rn[
        [
            "city",
            "state_code",
            "state",
            "race",
            "count",
            "male_population",
            "female_population",
            "total_population",
            "median_age",
            "num_veterans",
            "foreign_born",
            "average_household_size",
        ]
    ]
    return df_demographics_model


def make_immigration_model(df_immigration_notnull):
    """Make immigration model dataframe to be inserted into immigrations table.
    Args:
        df_immigration_notnull: DataFrame cleaned in the previous step.
    Returns:
        df_immigration_model: DataFrame to be inserted.
    Raises:
    """
    # insnum, entdepu, occup and visapost were already dropped during cleaning.
    df_immigration_notnull_rn = df_immigration_notnull.rename(
        columns={
            "cicid": "cicid",
            "i94yr": "year",
            "i94mon": "month",
            "i94cit": "cit",
            "i94res": "res",
            "i94port": "iata",
            "arrdate": "arrdate",
            "i94mode": "mode",
            "i94addr": "addr",
            "depdate": "depdate",
            "i94bir": "bir",
            "i94visa": "visa",
            "count": "count",
            "dtadfile": "date_added",
            "entdepa": "entdepa",
            "entdepd": "entdepd",
            "matflag": "matflag",
            "biryear": "biryear",
            "dtaddto": "dtaddto",
            "gender": "gender",
            "airline": "airline",
            "admnum": "admnum",
            "fltno": "fltno",
            "visatype": "visatype",
        }
    )
    df_immigration_model = df_immigration_notnull_rn[
        [
            "cicid",
            "year",
            "month",
            "cit",
            "res",
            "iata",
            "arrdate",
            "mode",
            "addr",
            "depdate",
            "bir",
            "visa",
            "count",
            "date_added",
            "entdepa",
            "entdepd",
            "matflag",
            "biryear",
            "dtaddto",
            "gender",
            "airline",
            "admnum",
            "fltno",
            "visatype",
        ]
    ]
    return df_immigration_model


def make_temperature_model(df_temperature_us_notnull):
    """Make temperature model dataframe to be inserted into temperatures table.
    Args:
        df_temperature_us_notnull: DataFrame cleaned in the previous step.
    Returns:
        df_temperature_us_model: DataFrame to be inserted.
    Raises:
    """
    df_temperature_us_notnull_rn = df_temperature_us_notnull.rename(
        columns={
            "dt": "timestamp",
            "AverageTemperature": "average_temperature",
            "AverageTemperatureUncertainty": "average_temperature_uncertainty",
            "City": "city",
            "Country": "country",
            "Latitude": "latitude",
            "Longitude": "longitude",
        }
    )
    df_temperature_us_model = df_temperature_us_notnull_rn[
        [
            "timestamp",
            "average_temperature",
            "average_temperature_uncertainty",
            "city",
            "country",
            "latitude",
            "longitude",
        ]
    ]
    return df_temperature_us_model
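The four `make_*_model` functions repeat the same rename-then-select pattern. A possible refactoring, sketched under the assumption that each mapping is written in the target table's column order, drives both steps from one dict (Python dicts preserve insertion order). `make_model` and `TEMPERATURE_COLUMNS` are hypothetical names; `make_airport_model` would still need its merge before such a helper applies.

```python
import pandas as pd


def make_model(df, column_map):
    """Rename columns via column_map and keep only the mapped columns, in map order."""
    missing = [col for col in column_map if col not in df.columns]
    if missing:
        # Fail loudly instead of inserting a frame with the wrong columns.
        raise KeyError(f"missing source columns: {missing}")
    return df.rename(columns=column_map)[list(column_map.values())]


# Source column -> table column, listed in the table's column order.
TEMPERATURE_COLUMNS = {
    "dt": "timestamp",
    "AverageTemperature": "average_temperature",
    "AverageTemperatureUncertainty": "average_temperature_uncertainty",
    "City": "city",
    "Country": "country",
    "Latitude": "latitude",
    "Longitude": "longitude",
}
```

Helper columns such as `City_lower` are dropped automatically because only mapped columns are selected.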

In [21]:
df_airport_model = make_airport_model(df_airport_notnull, iata_codes, cities, state_codes)
df_demographics_model = make_demographics_model(df_demographics_notnull)
df_immigration_model = make_immigration_model(df_immigration_notnull)
df_temperature_us_model = make_temperature_model(df_temperature_us_notnull)

In [22]:
!python create_tables.py

connecting to postgres...
DROP EXISTING TABLES...
CREATE TABLES...


In [23]:
# Insert data
import configparser
import psycopg2
from sql_queries import (
    airport_insert,
    demographic_insert,
    immigration_insert,
    temperature_insert,
)


def insert_df_model(df_model, insert_sql, conn, cur):
    """Insert every row of the given DataFrame into a table.
    Args:
        df_model: DataFrame to be inserted. Its columns must match the
            table columns to be inserted into, in the same order.
        insert_sql: The actual INSERT SQL to be executed.
        conn: DB connection object.
        cur: SQL cursor object.
    Returns:
    Raises:
    """
    for _, row in df_model.iterrows():
        # tolist() turns NumPy scalars into plain Python values for psycopg2.
        cur.execute(insert_sql, row.tolist())
    # Commit once after all rows instead of after every row.
    conn.commit()


config = configparser.ConfigParser()
config.read("db.cfg")
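# Relies on the [LOCAL] section of db.cfg listing host, dbname, user, password and port in this order.
conn = psycopg2.connect(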
    "host={} dbname={} user={} password={} port={}".format(*config["LOCAL"].values())
)
cur = conn.cursor()

insert_df_model(df_airport_model, airport_insert, conn, cur)
insert_df_model(df_demographics_model, demographic_insert, conn, cur)
insert_df_model(df_immigration_model, immigration_insert, conn, cur)
insert_df_model(df_temperature_us_model, temperature_insert, conn, cur)
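`insert_df_model` issues one `execute` call per row. A minimal batched sketch passes all rows to the DB-API `cursor.executemany` and commits once; `insert_df_batch` is a hypothetical name. The test uses `sqlite3` with `?` placeholders purely so it runs without a server, whereas psycopg2 uses `%s` placeholders. Note that psycopg2's documentation says its `executemany` is not faster than looping over `execute`; `psycopg2.extras.execute_batch(cur, sql, argslist)` is the helper it offers for fewer round trips.

```python
def insert_df_batch(df_model, insert_sql, conn, cur):
    """Insert all rows of df_model with one executemany call and one commit."""
    # itertuples(index=False, name=None) yields plain tuples in column order.
    rows = list(df_model.itertuples(index=False, name=None))
    cur.executemany(insert_sql, rows)
    conn.commit()
    return len(rows)
```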


In [24]:
# conn.close()
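The connection string in the insert cell is filled positionally from `config["LOCAL"].values()`, so it silently breaks if db.cfg lists its keys in a different order. A sketch that reads the keys by name instead; `build_dsn` and `DSN_KEYS` are hypothetical names, and the key names are assumed to match the libpq parameters.

```python
import configparser

# Assumed key names in the [LOCAL] section of db.cfg.
DSN_KEYS = ("host", "dbname", "user", "password", "port")


def build_dsn(config, section="LOCAL"):
    """Build a libpq connection string from named keys, independent of file order."""
    values = config[section]
    return " ".join(f"{key}={values[key]}" for key in DSN_KEYS)
```

Usage would be `conn = psycopg2.connect(build_dsn(config))`. Alternatively, `psycopg2.connect(**{k: config["LOCAL"][k] for k in DSN_KEYS})` passes them as keyword arguments, which avoids quoting issues if a value contains spaces.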

#### 4.2 Data Quality Checks
We run the following checks to make sure the pipeline ran as expected:
 * Source/count checks: every table contains at least one row.
 * Integrity checks: the key columns (IATA codes, cities, state codes, immigration IDs and timestamps) contain no NULL values.

Run Quality Checks

In [27]:
# Perform quality checks here
def assert_count(cur, conn, table):
    """Assert that the given table has at least one row.
    Args:
        cur: SQL cursor object.
        conn: DB connection object.
        table: The name of table to check.
    Returns:
    Raises:
        AssertionError
    """
    cur.execute(f"select count(*) from {table}")
    conn.commit()
    assert cur.fetchone()[0] >= 1

assert_count(cur, conn, "airports")
assert_count(cur, conn, "demographics")
assert_count(cur, conn, "immigrations")
assert_count(cur, conn, "temperatures")

In [28]:
def assert_key_is_notnull(cur, conn, table, colname):
    """Assert that the given column of the table contains no NULL values.
    Args:
        cur: SQL cursor object.
        conn: DB connection object.
        table: The name of table to check.
        colname: The name of column of which values must not be null.
    Returns:
    Raises:
        AssertionError
    """
    sql = f"""
    select count(*) from {table}
    where {colname} is null
    """
    cur.execute(sql)
    conn.commit()
    assert cur.fetchone()[0] == 0

assert_key_is_notnull(cur, conn, "airports", "iata_code")
assert_key_is_notnull(cur, conn, "airports", "city")
assert_key_is_notnull(cur, conn, "airports", "state_code")
assert_key_is_notnull(cur, conn, "demographics", "city")
assert_key_is_notnull(cur, conn, "demographics", "state_code")
assert_key_is_notnull(cur, conn, "immigrations", "cicid")
assert_key_is_notnull(cur, conn, "immigrations", "iata")
assert_key_is_notnull(cur, conn, "temperatures", "timestamp")
assert_key_is_notnull(cur, conn, "temperatures", "city")
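The checks above confirm row counts and non-NULL keys, but not that every `immigrations.iata` value actually exists in `airports`. A referential-integrity sketch counts fact rows with no matching dimension row via a LEFT JOIN; `count_orphans` is a hypothetical name. It reports the count rather than asserting zero, because the cleaning steps do not guarantee that every valid port survives in the airports table. Table and column names are interpolated directly, so only trusted identifiers should be passed.

```python
def count_orphans(cur, fact_table, fact_key, dim_table, dim_key):
    """Count fact rows whose key has no matching row in the dimension table."""
    # Identifiers are interpolated directly; pass only trusted table/column names.
    sql = (
        f"SELECT count(*) FROM {fact_table} f "
        f"LEFT JOIN {dim_table} d ON f.{fact_key} = d.{dim_key} "
        f"WHERE d.{dim_key} IS NULL"
    )
    cur.execute(sql)
    return cur.fetchone()[0]
```

For this schema it would be called as `count_orphans(cur, "immigrations", "iata", "airports", "iata_code")`.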


In [29]:
# Show analysis on immigration patterns to a city based on the overall population of the state.

# config = configparser.ConfigParser()
# config.read("db.cfg")
# conn = psycopg2.connect(
#     "host={} dbname={} user={} password={} port={}".format(*config["LOCAL"].values())
# )
# cur = conn.cursor()

sql="""select (
  demographics.city,
  immigrations.year,
  immigrations.month,
  sum(immigrations.count),
  cast(avg(demographics.count) as integer) 
)
from airports
inner join immigrations on (airports.iata_code = immigrations.iata)
inner join demographics on (lower(airports.city) = lower(demographics.city))
group by (demographics.city, immigrations.year, immigrations.month);
"""
cur.execute(sql)
print(cur.fetchmany(10))
conn.commit()

[('(Charlotte,2016,4,30,185248)',), ('(Phoenix,2016,4,60,414492)',), ('(Miami,2016,4,440,150338)',), ('("Fort Myers",2016,4,20,22213)',), ('(Houston,2016,4,130,626196)',), ('(Denver,2016,4,10,174601)',), ('(McAllen,2016,4,5,46233)',), ('("San Diego",2016,4,5,354034)',), ('(Vancouver,2016,4,5,39198)',), ('(Tampa,2016,4,40,92718)',)]
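The query above wraps its select list in parentheses, which PostgreSQL treats as a single row constructor; that is why each result comes back as one string such as '(Charlotte,2016,4,30,185248)'. It also appears to join every race row of a city, so each immigration row is counted once per matching race row, and `avg(demographics.count)` averages race-group counts rather than the city population. A possible variant returns plain columns, collapses demographics to one row per (city, state_code) first, and also matches on state code. `ARRIVALS_BY_CITY_SQL` and `arrivals_by_city` are hypothetical names; the test runs against an in-memory SQLite database only so it needs no PostgreSQL server, and the SQL itself uses standard syntax that PostgreSQL also accepts.

```python
ARRIVALS_BY_CITY_SQL = """
SELECT d.city, i.year, i.month, sum(i.count) AS arrivals, d.total_population
FROM immigrations AS i
JOIN airports AS a ON a.iata_code = i.iata
JOIN (
    -- one row per city: total_population repeats on every race row
    SELECT city, state_code, max(total_population) AS total_population
    FROM demographics
    GROUP BY city, state_code
) AS d ON lower(a.city) = lower(d.city) AND a.state_code = d.state_code
GROUP BY d.city, d.state_code, i.year, i.month, d.total_population
ORDER BY arrivals DESC
"""


def arrivals_by_city(cur):
    """Run the per-city arrivals query and return all result rows as tuples."""
    cur.execute(ARRIVALS_BY_CITY_SQL)
    return cur.fetchall()
```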


In [30]:
conn.close()


##### immigrations (fact table)

Foreign keys:
- iata: references iata_code in the airports table


| Column   | Type             | Description                                                                        |
| -------- | ---------------- | ---------------------------------------------------------------------------------- |
| cicid    | int, primary key | ID of immigrants                                                                   |
| year     | int              | Year of immigration                                                                |
| month    | int              | Month of immigration                                                               |
| cit      | int              | Country code from which immigrants come                                            |
| res      | int              | Country code where immigrants live                                                 |
| iata     | varchar(3)       | IATA code of the port where immigration happens                                    |
| arrdate  | int              | Arrival date in the U.S.                                                           |
| mode     | int              | How immigrants come to the U.S.                                                    |
| addr     | varchar(2)       | State code of the address where immigrants will live                               |
| depdate  | int              | Departure date from the U.S.                                                       |
| bir      | int              | Age of immigrants                                                                  |
| visa     | int              | Visa code in three categories                                                      |
| count    | int              | Count of immigration                                                               |
| dtadfile | varchar          | Date added to I-94 Files                                                           |
| entdepa  | varchar(1)       | Arrival flag - admitted or paroled into the U.S.                                   |
| entdepd  | varchar(1)       | Departure flag - departed, lost I-94 or is deceased                                |
| matflag  | varchar(1)       | Match flag - match of arrival and departure records                                |
| biryear  | int              | 4-digit year of birth                                                              |
| dtaddto  | varchar          | Date to which admitted to U.S.                                                     |
| gender   | varchar(1)       | Non-immigrant sex                                                                  |
| airline  | varchar          | Airline used to arrive in the U.S.                                                 |
| admnum   | bigint           | Admission number of immigrants                                                     |
| fltno    | varchar          | Flight number of immigrants                                                        |
| visatype | varchar          | Class of admission legally admitting the non-immigrant to temporarily stay in U.S. |
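
The `arrdate` and `depdate` columns are stored as integers. A minimal sketch of turning them into calendar dates follows, assuming (as is usual for data exported from SAS) that they are SAS date values, i.e. day counts since 1960-01-01. The helper name `sas_date_to_date` is hypothetical and not part of the project's ETL code.

```python
import math
from datetime import date, timedelta
from typing import Optional

# Assumption: arrdate/depdate are SAS date values, counted in days from 1960-01-01.
SAS_EPOCH = date(1960, 1, 1)


def sas_date_to_date(value: Optional[float]) -> Optional[date]:
    """Convert a SAS day count (e.g. arrdate, depdate) to a datetime.date.

    Missing values (None, or NaN as pandas reads empty cells) map to None.
    """
    if value is None or (isinstance(value, float) and math.isnan(value)):
        return None
    return SAS_EPOCH + timedelta(days=int(value))


# Usage: 20545 days after the SAS epoch is 1 April 2016.
print(sas_date_to_date(20545))  # 2016-04-01
print(sas_date_to_date(None))   # None
```

For a whole DataFrame column, pandas can do the same conversion in one vectorized call, `pd.to_datetime(df["arrdate"], unit="D", origin="1960-01-01")`, which turns missing values into `NaT`.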


##### airports

Foreign keys:
- city: joins with the demographics and temperatures tables
- state_code: joins with the demographics table

| Column      | Type                    | Description               |
| ----------- | ----------------------- | ------------------------- |
| iata_code   | varchar(3), primary key | IATA codes of airports    |
| name        | varchar                 | Name of airports          |
| type        | varchar                 | Type of airports          |
| coordinates | varchar                 | Coordinates of airports   |
| city        | varchar                 | City where airports exist |
| state_code  | varchar                 | State code of airports    |
| iso_country | varchar                 | Country code of airports  |
| iso_region  | varchar                 | Region code of airports   |
| gps_code    | varchar                 | GPS code of airports      |
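
The state_code column is what links an airport to the demographics table. One plausible way to derive it, sketched below, is to strip the country prefix from iso_region (values such as `US-PA`), and the coordinates string can be split into numbers in the same pass. The helper names are hypothetical, and so is the assumption that coordinates holds `"longitude, latitude"` (the layout used by the datahub airport-codes file); neither is taken from the project code.

```python
from typing import Optional, Tuple


def state_code_from_region(iso_region: str) -> Optional[str]:
    """Derive a U.S. state code from an ISO 3166-2 region such as 'US-PA'.

    Returns None for regions outside the United States.
    """
    country, _, subdivision = iso_region.partition("-")
    if country != "US" or not subdivision:
        return None
    return subdivision


def parse_coordinates(coordinates: str) -> Tuple[float, float]:
    """Split a 'longitude, latitude' string into (latitude, longitude) floats.

    Assumption: the source column stores longitude first, as in the
    datahub airport-codes file.
    """
    lon_text, lat_text = coordinates.split(",")
    return float(lat_text), float(lon_text)


print(state_code_from_region("US-PA"))   # PA
print(state_code_from_region("GB-ENG"))  # None
print(parse_coordinates("-74.93360137939453, 40.07080078125"))
```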


##### demographics

| Column            | Type       | Description                              |
| ----------------- | ---------- | ---------------------------------------- |
| city              | varchar    | City name                                |
| state_code        | varchar(2) | State code                               |
| state             | varchar    | State name                               |
| race              | varchar    | Race group that count applies to         |
| count             | int        | Count of residents of specified race     |
| male_population   | int        | Total count of male residents            |
| female_population | int        | Total count of female residents          |
| total_population  | int        | Total count of residents                 |
| median_age        | int        | Median age of residents                  |
| num_veterans      | int        | Number of veterans in the city           |
| foreign_born      | int        | Number of foreign-born residents         |
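
The foreign keys listed for the airports table are what make the enrichment possible: immigration.iata joins to airports.iata_code, and airports.state_code joins to demographics. Because demographics stores one row per city and race, city-level columns such as total_population repeat across rows and must be deduplicated before summing. A minimal sketch of such a query follows, run against an in-memory SQLite database as a stand-in for PostgreSQL, with reduced columns and made-up rows (not real data):

```python
import sqlite3

# In-memory SQLite stands in for PostgreSQL; the tables keep only the columns
# needed for the join, and every row below is a made-up example.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE immigration (cicid INTEGER PRIMARY KEY, iata TEXT);
    CREATE TABLE airports (iata_code TEXT PRIMARY KEY, city TEXT, state_code TEXT);
    CREATE TABLE demographics (city TEXT, state_code TEXT, race TEXT,
                               count INTEGER, total_population INTEGER);

    INSERT INTO immigration VALUES (1, 'AAA'), (2, 'AAA'), (3, 'BBB');
    INSERT INTO airports VALUES ('AAA', 'Springfield', 'XX'),
                                ('BBB', 'Shelbyville', 'XX');
    -- One row per (city, race): total_population repeats within a city.
    INSERT INTO demographics VALUES
        ('Springfield', 'XX', 'Race A', 60000, 100000),
        ('Springfield', 'XX', 'Race B', 40000, 100000),
        ('Shelbyville', 'XX', 'Race A', 50000, 50000);
    """
)

query = """
WITH city_pop AS (
    -- Collapse the per-race rows to one row per city.
    SELECT DISTINCT city, state_code, total_population FROM demographics
),
state_pop AS (
    SELECT state_code, SUM(total_population) AS state_population
    FROM city_pop
    GROUP BY state_code
)
SELECT a.city, a.state_code, COUNT(*) AS arrivals, s.state_population
FROM immigration AS i
JOIN airports AS a ON a.iata_code = i.iata
JOIN state_pop AS s ON s.state_code = a.state_code
GROUP BY a.city, a.state_code, s.state_population
ORDER BY arrivals DESC
"""
rows = conn.execute(query).fetchall()
for row in rows:
    print(row)
# ('Springfield', 'XX', 2, 150000)
# ('Shelbyville', 'XX', 1, 150000)
```

Summing total_population directly over demographics would count Springfield twice here (once per race row), giving 250000 instead of 150000 for the state.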

##### temperatures

| Column                          | Type    | Description                                    |
| ------------------------------- | ------- | ---------------------------------------------- |
| timestamp                       | date    | Date on which the temperature was measured     |
| average_temperature             | float   | Average land temperature in degrees Celsius    |
| average_temperature_uncertainty | float   | The 95% confidence interval around the average |
| city                            | varchar | City name                                      |
| country                         | varchar | Country name                                   |
| latitude                        | varchar | Latitude value                                 |
| longitude                       | varchar | Longitude value                                |
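
latitude and longitude are presumably kept as varchar because the city-temperature source carries a hemisphere suffix on each value. Assuming values in the form `57.05N` / `10.33E` (the layout of the Berkeley Earth `GlobalLandTemperaturesByCity` file; the source of this table is not named in this section), a hypothetical helper can convert them to signed decimal degrees for distance or map queries:

```python
def to_signed_degrees(value: str) -> float:
    """Convert a coordinate such as '57.05N' or '74.56W' to signed degrees.

    North and East are positive; South and West are negative.
    Assumption: every value ends with exactly one hemisphere letter.
    """
    value = value.strip().upper()
    magnitude, hemisphere = float(value[:-1]), value[-1]
    if hemisphere in ("N", "E"):
        return magnitude
    if hemisphere in ("S", "W"):
        return -magnitude
    raise ValueError(f"unexpected hemisphere suffix in {value!r}")


print(to_signed_degrees("57.05N"))  # 57.05
print(to_signed_degrees("74.56W"))  # -74.56
```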

### Step 5: Complete Project Write Up

* Clearly state the rationale for the choice of tools and technologies for the project.
  - We use pandas and PostgreSQL for local development. If large-scale analysis is needed, the same design carries over naturally to Spark (in place of pandas) and Amazon Redshift (in place of PostgreSQL).


* Propose how often the data should be updated and why.
  - We should update the data monthly, because the immigration data is aggregated by month.


* Write a description of how you would approach the problem differently under the following scenarios:
  * The data was increased by 100x.
    - We would run the ETL on a Spark cluster (for example, on Amazon EMR) to scale it out. If SQL queries become slow, we would move the tables to a columnar database such as Amazon Redshift.
  * The data populates a dashboard that must be updated on a daily basis by 7am every day.
    - We should use a workflow scheduler such as Apache Airflow to run the ETL pipeline every day so that it finishes before 7am.
  * The database needed to be accessed by 100+ people.
    - We could scale out the Redshift cluster (for example, by adding nodes) to handle the larger number of concurrent user requests.
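
The tool choice in the first answer above extends naturally partly because Amazon Redshift is based on PostgreSQL and accepts connections over the PostgreSQL protocol, so the existing psycopg2 code can stay the same while only the connection settings change. A minimal sketch follows, assuming a hypothetical config with `[postgres]` and `[redshift]` sections; the section names, hosts, and credentials are illustrative, not the project's actual config file.

```python
import configparser

# Hypothetical config; in practice this would live in a file read with parser.read(path).
EXAMPLE_CONFIG = """
[postgres]
host = 127.0.0.1
port = 5432
dbname = immigration
user = student
password = student

[redshift]
host = example-cluster.abc123.us-west-2.redshift.amazonaws.com
port = 5439
dbname = immigration
user = awsuser
password = change-me
"""


def load_db_params(config_text: str, section: str) -> dict:
    """Return keyword arguments for psycopg2.connect() from one config section."""
    parser = configparser.ConfigParser()
    parser.read_string(config_text)
    settings = parser[section]
    return {
        "host": settings["host"],
        "port": settings.getint("port"),
        "dbname": settings["dbname"],
        "user": settings["user"],
        "password": settings["password"],
    }


# Switching targets only changes the section name; the ETL code is untouched:
#   conn = psycopg2.connect(**load_db_params(EXAMPLE_CONFIG, "redshift"))
print(load_db_params(EXAMPLE_CONFIG, "postgres")["port"])  # 5432
print(load_db_params(EXAMPLE_CONFIG, "redshift")["port"])  # 5439
```

Redshift is not a drop-in replacement in every respect: it does not enforce primary or foreign key constraints, and it favors bulk `COPY` from S3 over row-by-row `INSERT`, so the statements in `sql_queries` would likely need adjusting.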
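
With the monthly update cadence described above, re-running a month's load should not duplicate rows. One common pattern is to delete the target year/month slice and insert the new batch inside a single transaction. The sketch below uses SQLite as a stand-in for PostgreSQL; the function `reload_month` and the reduced column set are hypothetical, not part of the project's `sql_queries` module.

```python
import sqlite3
from typing import Iterable, Tuple

Row = Tuple[int, int, int, str]  # (cicid, year, month, iata)


def reload_month(conn: sqlite3.Connection, year: int, month: int,
                 rows: Iterable[Row]) -> None:
    """Replace one month of immigration rows so that re-runs are idempotent."""
    with conn:  # commits on success, rolls back on error
        conn.execute("DELETE FROM immigration WHERE year = ? AND month = ?",
                     (year, month))
        conn.executemany(
            "INSERT INTO immigration (cicid, year, month, iata) VALUES (?, ?, ?, ?)",
            rows,
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE immigration "
             "(cicid INTEGER PRIMARY KEY, year INTEGER, month INTEGER, iata TEXT)")
batch = [(1, 2016, 4, "AAA"), (2, 2016, 4, "BBB")]
reload_month(conn, 2016, 4, batch)
reload_month(conn, 2016, 4, batch)  # re-running the same month does not duplicate rows
print(conn.execute("SELECT COUNT(*) FROM immigration").fetchone()[0])  # 2
```

With psycopg2 the placeholders would be `%s` instead of `?`, and using the connection in a `with` block likewise wraps both statements in one transaction.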
