### Data Engineering Capstone Project

#### Project Summary
The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import sys
!{sys.executable} -m pip install xlsxwriter
!{sys.executable} -m pip install ijson
!{sys.executable} -m pip install pandas==0.24.0

import pandas as pd
import json
import os
from os import listdir
import xlsxwriter
import configparser
import psycopg2
from sqlalchemy import create_engine
import pprint
import csv
import logging 

# Create and configure logger 
logging.basicConfig(filename="capstone_project_template.log", 
                    format='%(asctime)s %(message)s', 
                    filemode='w') 
  
# Creating logging object 
logger=logging.getLogger() 
  
# Setting the threshold of logger to DEBUG 
logger.setLevel(logging.DEBUG) 

print("Initialization Complete!")


Initialization Complete!


In [2]:
# Create tables for extracting data to
%run -i 'create_tables.py'

### Step 1: Scope the Project and Gather Data

#### Scope
The goal of this project is to combine several sources of data related to immigration to the United States (US) and build an ETL pipeline that loads them into a single database for further analysis. The database can then be queried to answer questions such as whether immigrants tend to migrate to warmer destinations or to larger cities.

#### Describe and Gather Data 
The data sources include:
1. Immigration data - includes detailed information about immigrants to the United States.
2. Temperature data - includes temperature information. I filtered this to only include information about US cities.
3. Airport data - includes basic information about location and type of airport. I filtered this to only include information about US cities.
4. Demographics data - includes information about household size and types of members, including foreign born statistics. The original dataset only includes information for US cities.

All of the data was provided in the project template. I converted the original us-cities-demographics.csv file to a JSON file so that I could practice working with different types of data.

I used the Immigration dataset as my fact table, with the remaining tables being dimension tables. I used Pandas and Psycopg to read in the data, display the first rows, and then insert the data into the Postgres database.

The Immigration dataset is quite large. On my initial passes, I used much smaller subsets of the data to develop the ETL processes. I left the code that creates these smaller subsets in place, but commented it out for the final project.



### Step 2: Explore and Assess the Data
#### Explore the Data 
The following cells read in the various data files and display the first 5 rows of each.

In looking at the first few rows, I determined that the following issues should be addressed:  
1. Null data in certain columns.
2. Data that was not about the 50 "primary" US states.
3. For immigration data, the month of June had a number of extra columns that were not in any of the other monthly files.  

#### Cleaning Steps
1. In cases where the null data was deemed critical I simply skipped that row of data so it is not included in the final database.
2. I also filtered out any data that was not relevant to the US.
3. For immigration data, I only read columns that were in all monthly files.
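The cleaning steps above can be sketched with the standard library alone. This is a minimal illustration, not the code used in the project: the helper names `common_columns` and `clean_rows`, the in-memory sample files, and the choice of `i94port` as the critical column are all assumptions made for the example.

```python
import csv
import io


def common_columns(headers):
    """Return the columns present in every header, in the order of the first header."""
    shared = set(headers[0]).intersection(*headers[1:])
    return [name for name in headers[0] if name in shared]


def clean_rows(rows, columns, critical):
    """Yield rows restricted to `columns`, skipping rows with an empty critical field."""
    for row in rows:
        if any(row.get(name) in (None, "") for name in critical):
            continue
        yield {name: row[name] for name in columns}


# Two tiny in-memory "monthly files" (made-up data); the second has an extra column.
april = "cicid,i94port,i94addr\n1,NYC,NY\n2,,CA\n"
june = "cicid,i94port,i94addr,extra_col\n3,LOS,CA,x\n"

readers = [csv.DictReader(io.StringIO(text)) for text in (april, june)]
columns = common_columns([reader.fieldnames for reader in readers])

cleaned = []
for reader in readers:
    cleaned.extend(clean_rows(reader, columns, critical=["i94port"]))

print(columns)
print(cleaned)
```

The row with the empty `i94port` is dropped, and `extra_col` never reaches the output because it is not shared by both files.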

In [3]:
# Read config file for connecting to database
config = configparser.ConfigParser()
config.read('dwh.cfg')
conn = config['CLUSTER']['HOST_POS']

# Below is for DataFrame.to_sql
engine = create_engine(conn) 

In [4]:
# Read airport code data and display first 5 rows
#df_airport = pd.read_csv('airport-codes_csv.csv', nrows=10000)
df_airport = pd.read_csv('airport-codes_csv.csv')
df_airport.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [5]:
# Read demographics data and output the first 5 rows
# Note that I converted original project csv file to json in order to test working with different types of data
counter = 0

# Read demographics data and display first 5 rows
with open('us-cities-demographics.json') as data_file:    
    data = json.load(data_file)
    for row in data:
        counter += 1
        if counter > 5:
            break
        pprint.pprint(row)

{'datasetid': 'us-cities-demographics',
 'fields': {'average_household_size': 2.73,
            'city': 'Newark',
            'count': 76402,
            'female_population': 143873,
            'foreign_born': 86253,
            'male_population': 138040,
            'median_age': 34.6,
            'number_of_veterans': 5829,
            'race': 'White',
            'state': 'New Jersey',
            'state_code': 'NJ',
            'total_population': 281913},
 'record_timestamp': '1969-12-31T17:00:00-07:00',
 'recordid': '85458783ecf5da6572ee00e7120f68eff4fd0d61'}
{'datasetid': 'us-cities-demographics',
 'fields': {'average_household_size': 2.4,
            'city': 'Peoria',
            'count': 1343,
            'female_population': 62432,
            'foreign_born': 7517,
            'male_population': 56229,
            'median_age': 33.1,
            'number_of_veterans': 6634,
            'race': 'American Indian and Alaska Native',
            'state': 'Illinois',
            '

In [6]:
# Read temperature data and display first 5 rows
#df_temperature = pd.read_csv("../../data2/GlobalLandTemperaturesByCity.csv", nrows=48000)
df_temperature = pd.read_csv("../../data2/GlobalLandTemperaturesByCity.csv")

# Display first 5 rows
df_temperature.head()


Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [7]:
# The first rows only show Denmark, so apply our first filter to keep only US cities
df_temperature = df_temperature.query("Country == 'United States'")

# Display first 5 rows
df_temperature.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
47555,1820-01-01,2.101,3.217,Abilene,United States,32.95N,100.53W
47556,1820-02-01,6.926,2.853,Abilene,United States,32.95N,100.53W
47557,1820-03-01,10.767,2.395,Abilene,United States,32.95N,100.53W
47558,1820-04-01,17.989,2.202,Abilene,United States,32.95N,100.53W
47559,1820-05-01,21.809,2.036,Abilene,United States,32.95N,100.53W


### Step 3: Define the Data Model
The primary goal is to assess immigration events, so the immigration table is the center of my star schema, with the remaining tables being dimension tables.

The following are tables that were created:  
"""CREATE TABLE fact_immigration(  
    immigration_id int,  
    cicid int,  
    i94yr int NOT NULL,  
    i94mon int NOT NULL,  
    i94cit int,  
    i94res int,  
    i94port char(3),  
    arrdate int,  
    i94mode int,  
    i94addr char(3),  
    depdate int,  
    i94bir int,  
    i94visa int,  
    count int,  
    dtadfile varchar,  
    visapost char(3),  
    occup char(3),  
    entdepa char(1),  
    entdepd char(1),  
    entdepu char(1),  
    matflag char(1),  
    biryear int,  
    dtaddto varchar,  
    gender char(1),  
    insnum varchar,  
    airline char(3),  
    admnum varchar,  
    fltno varchar,  
    visatype char(3))  
"""  

"""CREATE TABLE dim_temperature(  
    dt varchar,  
    averageTemperature int,  
    averageTemperatureUncertainty int,  
    city varchar NOT NULL,  
    country varchar NOT NULL,  
    longitude char(10) NOT NULL,  
    latitude char(10) NOT NULL,  
    PRIMARY KEY (dt))  
"""  
   
"""CREATE TABLE dim_demographics(  
    count int NOT NULL,  
    city varchar NOT NULL,  
    number_of_veterans int NOT NULL,  
    male_population int NOT NULL,  
    foreign_born int NOT NULL,  
    average_household_size int NOT NULL,  
    median_age int NOT NULL,  
    state varchar NOT NULL,  
    race varchar NOT NULL,  
    total_population int NOT NULL,  
    state_code char(2) NOT NULL,  
    female_population int NOT NULL)  
"""  

"""CREATE TABLE dim_airport(  
    ident char(10),  
    type varchar,  
    name varchar,  
    elevation_ft int,  
    continent char(2),  
    iso_country char(2),  
    iso_region char(10),  
    municipality varchar,  
    gps_code char(10),  
    iata_code char(3),  
    local_code char(10),  
    coordinates varchar)  
"""  

"""CREATE TABLE dim_port(  
    abrev varchar,  
    port varchar)  
"""
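To illustrate how the star schema is meant to be queried, here is a small sketch that joins the fact table to one dimension table. It uses an in-memory SQLite database as a stand-in for Postgres, reduced versions of `fact_immigration` and `dim_port`, and made-up rows. The join key (`fact_immigration.i94port` to `dim_port.abrev`) is an assumption based on the column descriptions, not something the project code confirms.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# Reduced versions of two tables from the model above (only a few columns kept).
cur.execute("CREATE TABLE fact_immigration (cicid int, i94port char(3), i94addr char(3))")
cur.execute("CREATE TABLE dim_port (abrev varchar, port varchar)")

# Made-up rows, for illustration only.
cur.executemany(
    "INSERT INTO fact_immigration VALUES (?, ?, ?)",
    [(1, "NYC", "NY"), (2, "NYC", "NJ"), (3, "LOS", "CA")],
)
cur.executemany(
    "INSERT INTO dim_port VALUES (?, ?)",
    [("NYC", "NEW YORK, NY"), ("LOS", "LOS ANGELES, CA")],
)

# Count arrivals per port by joining the fact table to the port dimension.
arrivals_by_port = cur.execute(
    """
    SELECT p.port, COUNT(*) AS arrivals
    FROM fact_immigration AS f
    JOIN dim_port AS p ON p.abrev = f.i94port
    GROUP BY p.port
    ORDER BY arrivals DESC, p.port
    """
).fetchall()

print(arrivals_by_port)
conn.close()
```

The same pattern would apply to the other dimension tables once a suitable join key (for example, city or state) is chosen.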

### Step 4.1: Run Pipelines to Model the Data 
I filtered the data before inserting it into the database. The primary job was to make sure that all data related only to the United States.

I also filtered out rows with null values in what I considered to be critical columns, such as temperatures.

For the demographics data, I filtered out bad data, which was primarily related to cities in Puerto Rico. I chose to limit my analysis to the 50 primary states in the US, so I filtered that data out.

Finally, I filtered the immigration data so that it only included valid ports. I built a valid set of ports by pulling the information from I94_SAS_Lables_Descriptions.SAS and creating a csv file. I then read that file to build temporary variables for filtering the immigration data, and also inserted its contents into a database table for analysis purposes.
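The port filtering described above might look roughly like the following. This is a sketch under assumptions: the line shape `'ALC' = 'ALCAN, AK'` is assumed for the port entries in the labels file, and `parse_ports`, `filter_valid_ports`, and the sample rows are hypothetical names and data introduced for the example.

```python
import re

# Assumed shape of a port line in the labels file: 'ALC' = 'ALCAN, AK'
PORT_LINE = re.compile(r"^\s*'(?P<code>[^']+)'\s*=\s*'(?P<name>[^']*)'")


def parse_ports(lines):
    """Build a {port_code: port_name} dict from label lines, ignoring other lines."""
    ports = {}
    for line in lines:
        match = PORT_LINE.match(line)
        if match:
            ports[match.group("code").strip()] = match.group("name").strip()
    return ports


def filter_valid_ports(rows, valid_ports):
    """Keep only rows whose i94port value is a known port code."""
    return [row for row in rows if row.get("i94port") in valid_ports]


sample_lines = [
    "   'ALC'\t=\t'ALCAN, AK             '",
    "   'NYC'\t=\t'NEW YORK, NY          '",
    "/* a comment line that should be ignored */",
]
ports = parse_ports(sample_lines)

# Made-up immigration rows; only the first has a valid port.
rows = [
    {"cicid": 1, "i94port": "NYC"},
    {"cicid": 2, "i94port": "XXX"},
    {"cicid": 3, "i94port": None},
]
valid_rows = filter_valid_ports(rows, ports)

print(ports)
print(valid_rows)
```

Keeping the parsed ports in a dict gives constant-time membership checks while filtering, and the same dict can be written out to the port table.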


In [8]:
# Clean the airport dataset created earlier by filtering to only use US airports
df_airport = df_airport.query("iso_country == 'US'")

print("Begin Data Insert")

# Read in the airport file and insert to database
df_airport.to_sql(
    'dim_airport',
    engine,
    schema='public',
    if_exists='append',
    index=False,
)
    
print("Complete Data Insert")

Begin Data Insert
Complete Data Insert


In [9]:
# Create database connections
conn_psy = psycopg2.connect(conn)
conn_psy.set_session(autocommit=True)
cur = conn_psy.cursor()

# Create the insert statement
dim_demographics_insert = """INSERT INTO dim_demographics
(average_household_size, city, count, female_population, foreign_born, male_population, median_age, number_of_veterans, race, state, state_code, total_population)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);"""

print("Begin Data Insert")

# Read the demographics file and insert to database
with open('us-cities-demographics.json') as data_file:    
    data = json.load(data_file)
    for row in data:
        try:
            cur.execute(dim_demographics_insert, 
                [row['fields']['average_household_size'], 
                row['fields']['city'],
                row['fields']['count'],
                row['fields']['female_population'],
                row['fields']['foreign_born'],
                row['fields']['male_population'],
                row['fields']['median_age'],
                row['fields']['number_of_veterans'],
                row['fields']['race'],
                row['fields']['state'],
                row['fields']['state_code'], 
                row['fields']['total_population']]) 
        # Certain cities do not have all fields required for insert. Log to file for further analysis.
        except (KeyError, psycopg2.Error):
            logger.info("Skip City: {}".format(row['fields'].get('city')))

print("Complete Data Insert")
        

Begin Data Insert
Complete Data Insert


In [10]:
# Clean the temperature dataset created earlier by dropping entries with a missing average temperature
# (comparing against the string 'NaN' does not remove missing values, so use dropna instead)
df_temperature = df_temperature.dropna(subset=['AverageTemperature'])

# Sort the data by descending date so that we can pull the 120,000 most recent rows
df_temperature = df_temperature.sort_values(by='dt', ascending=False)

# Keep only the first 120,000 rows
df_temperature = df_temperature.head(120000)
    
print("Begin Data Insert")

df_temperature.to_sql(
    'dim_temperature',
    engine,
    schema='public',
    if_exists='append',
    index=False,
)
    
print("Complete Data Insert")


Begin Data Insert
Complete Data Insert


#### 4.2 Data Quality Checks
I ran queries after data loading to verify that each table contained data and also displayed the number of rows in each table.


In [11]:
# Perform quality checks here
def check_db_count(table, description):
    '''
    Input: Table name and description
    
    Output: Print whether the table passed the row-count check
    
    '''
    count_records = "SELECT count(*) FROM {}".format(table)
    cur.execute(count_records)
    # count(*) returns a single row with a single column
    record_count = cur.fetchone()[0]
    
    if record_count == 0:
        print("Data quality check failed for {} with zero records".format(description))
    else:
        print("Data quality check passed for {} with {} records".format(description, record_count))
    return

# Perform data count check
check_db_count("dim_temperature", "Temperature table")
check_db_count("dim_demographics", "Demographics table")
check_db_count("dim_airport", "Airport table")

Data quality check passed for Temperature table with 120000 records
Data quality check passed for Demographics table with 2875 records
Data quality check passed for Airport table with 22757 records


#### 4.3 Data dictionary 
fact_immigration

    immigration_id: Primary key
    cicid: Unique key within a month
    i94yr: 4 digit year
    i94mon: Numeric month, 1-12
    i94cit: Immigrant's country of citizenship
    i94res: Immigrant's country of residence outside US
    i94port: Port of entry
    arrdate: Arrival date of immigrant
    i94mode: Mode of arrival
    i94addr: US state of the immigrant's address
    depdate: Departure date of immigrant
    i94bir: Immigrant's age in years
    i94visa: Visa type
    count: Used for summary statistics
    dtadfile: Dates in the format YYYYMMDD
    visapost: Three-letter codes corresponding to where visa was issued
    occup: Occupation of the immigrant in the US
    entdepa: One-letter arrival code
    entdepd: One-letter departure code
    entdepu: One-letter update code
    matflag: M if the arrival and departure records match
    biryear: Year of birth
    dtaddto: Date field for when the immigrant is admitted until
    gender: Gender
    insnum: Immigration and Naturalization Services number
    airline: Airline of entry for immigrant
    admnum: Admission number
    fltno: Flight number
    visatype: Visa codes
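The `arrdate` and `depdate` columns are stored as integers. Assuming they are SAS numeric dates (a count of days since 1960-01-01, which is how SAS represents dates), a small helper can turn them into calendar dates for analysis. The function name `sas_to_date` and the sample value are illustrative and not part of the project code.

```python
from datetime import date, timedelta

# SAS counts dates as days elapsed since 1960-01-01.
SAS_EPOCH = date(1960, 1, 1)


def sas_to_date(sas_days):
    """Convert a SAS numeric date (days since 1960-01-01) to a datetime.date.

    Returns None when the value is missing.
    """
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


print(sas_to_date(20573))  # 2016-04-29
```

Converting at query or load time keeps the stored integers unchanged while making date arithmetic (for example, length of stay as `depdate - arrdate`) easier to read.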

dim_temperature: Provides temperature data. The original dataset covers the whole world, but only US temperatures are used for this project.

    dt: Date of temperature recording
    averageTemperature: Average temperature for the month beginning on dt
    averageTemperatureUncertainty: The amount by which the temperature recording may be wrong
    city: City where temperature was recorded
    country: Country, always the US for this project
    longitude: Longitude of the city where temperature was recorded
    latitude: Latitude of the city where temperature was recorded


dim_demographics: Provides population statistics on cities in the US. Grain is city/state/race.

    city: City name
    state: State city is in
    median_age: Median age of residents of the city
    male_population: Number of men in the city
    female_population: Number of women in the city
    total_population: Total population of the city
    number_of_veterans: Number of veterans in the city
    foreign_born: Number of foreign-born people in the city
    average_household_size: Average household size
    state_code: Two-letter code for state
    race: Race category for this row: White, Hispanic or Latino, Asian, Black or African-American, or American Indian and Alaska Native
    count: Number of people of that race in the city
    
dim_airport: Provides information about various airports in the US.

    ident: Primary key
    type: Type of airport
    name: Name of airport
    elevation_ft: Elevation of airport in feet
    continent: Continent where the airport resides (all airports in this project are in the US)
    iso_country: The ISO country code
    iso_region: The ISO region code
    municipality: Municipality where airport resides
    gps_code: The GPS code for where the airport resides
    iata_code: The IATA code
    local_code: The local code
    coordinates: Geographic coordinates of where the airport resides

dim_port: A list of the ports of arrival

    abrev: Short code for the port
    port: Name of the port; there are some No PORT Code ([code]) values too


### Step 5: Complete Project Write Up
I used Python and Pandas because they provide simple functionality for reading csv files and inserting the data into a relational database. I also used Psycopg for inserting the data read from json files. I chose this approach because it was the simplest one I could find for reading a json file, and also because I wanted to use more than one tool for reading and inserting data as part of this project.

The data was structured and formatted well enough to make using a SQL relational database a good fit. I chose Postgres because it is fast and robust.

The data should be updated once a month to coincide with the monthly immigration reports. Some of the dimension tables could perhaps be updated less frequently. For instance, new airports are not added very often, and the characteristics of an airport rarely change. However, since these are fairly quick SQL processes, the monthly update would not put much stress on the overall system.

If the data were increased by 100 times, we would need to make a fairly major overhaul to the approach. Instead of inserting data directly into tables, I would first convert the data into a format that could be staged in S3. I would then do a bulk copy from S3 into the database tables. Additional analysis would also be needed on both the size and the number of processors.
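As a rough illustration of the staging step, the sketch below splits rows into gzip-compressed CSV part files that could then be uploaded to S3 and bulk loaded. The upload and the bulk copy themselves are not shown. The helper name `write_chunks`, the file naming pattern, and the tiny chunk size are assumptions chosen for the example.

```python
import csv
import gzip
import os
import tempfile


def write_chunks(rows, fieldnames, out_dir, chunk_size):
    """Write rows into gzip-compressed CSV part files of at most chunk_size rows each.

    Returns the list of file paths that were written.
    """
    paths = []
    handle = None
    writer = None
    for index, row in enumerate(rows):
        if index % chunk_size == 0:
            # Start a new part file once the previous one is full.
            if handle:
                handle.close()
            path = os.path.join(out_dir, "part-{:05d}.csv.gz".format(len(paths)))
            handle = gzip.open(path, "wt", newline="")
            writer = csv.DictWriter(handle, fieldnames=fieldnames)
            writer.writeheader()
            paths.append(path)
        writer.writerow(row)
    if handle:
        handle.close()
    return paths


# Made-up rows; a real run would stream the immigration data instead.
rows = [{"cicid": i, "i94port": "NYC"} for i in range(5)]
out_dir = tempfile.mkdtemp()
part_files = write_chunks(rows, ["cicid", "i94port"], out_dir, chunk_size=2)

print([os.path.basename(path) for path in part_files])
```

Splitting the data into several compressed parts lets a bulk loader read them in parallel, which is the main reason to stage files rather than insert row by row.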

If the data were to populate a dashboard that must be updated every day, I would move the system to Airflow and modify the appropriate processes.

There should not be a problem with 100 or so people accessing the data. However, if performance issues arose with many users accessing the data, we could implement Concurrency Scaling in AWS. According to Amazon, "With the Concurrency Scaling feature, you can support virtually unlimited concurrent users and concurrent queries, with consistently fast query performance." Details can be found here: https://docs.aws.amazon.com/redshift/latest/dg/concurrency-scaling.html

In [2]:
# Load the immigration data into the database
!python load_immigration_data.py