# Project Title
### Data Engineering Capstone Project

#### Project Summary
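This project builds a data warehouse on AWS S3 from I94 immigration data, world temperature data, and U.S. city demographic data. The data sets are explored with pandas, processed with PySpark, and modeled as a star schema of fact and dimension tables for use by an analytics team and BI applications.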

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import pyspark

### Step 1: Scope the Project and Gather Data

#### Scope
This project integrates I94 immigration data, world temperature data, and U.S. demographic data to set up a data warehouse with fact and dimension tables.

* Data Sets 
    1. [I94 Immigration Data](https://travel.trade.gov/research/reports/i94/historical/2016.html)
    2. [World Temperature Data](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data)
    3. [U.S. City Demographic Data](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)

* Tools
    * AWS S3: data storage
    * Python for data processing
        * Pandas - exploratory data analysis on small data set
        * PySpark - data processing on large data set

#### Describe and Gather Data 

| Data Set | Format | Description |
| ---      | ---    | ---         |
|[I94 Immigration Data](https://travel.trade.gov/research/reports/i94/historical/2016.html)| SAS | Data contains international visitor arrival statistics by world regions and select countries (including top 20), type of visa, mode of transportation, age groups, states visited (first intended address only), and the top ports of entry (for select countries).|
|[World Temperature Data](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data)| CSV | This Kaggle dataset contains monthly average land temperatures for cities around the world.|
|[U.S. City Demographic Data](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)| CSV | This dataset contains information about the demographics of all U.S. cities and census-designated places with a population greater than or equal to 65,000.|


### Step 2: Explore and Assess the Data

#### Explore the data

1. Use pandas for exploratory data analysis to get an overview of these data sets
2. Split the data sets into dimension tables and rename columns for clarity
3. Use PySpark on one of the SAS data sets to test the ETL pipeline logic

##### Explore Immigration Dataset

In [2]:
# Read in the data
df_immigration = pd.read_csv("immigration_data_sample.csv")

In [3]:
df_immigration.head()

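| | Unnamed: 0 | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | 2027561 | 4084316.0 | 2016.0 | 4.0 | 209.0 | 209.0 | HHW | 20566.0 | 1.0 | HI | ... | | M | 1955.0 | 7202016 | F | | JL | 56582670000.0 | 00782 | WT |
| 1 | 2171295 | 4422636.0 | 2016.0 | 4.0 | 582.0 | 582.0 | MCA | 20567.0 | 1.0 | TX | ... | | M | 1990.0 | 10222016 | M | | \*GA | 94362000000.0 | XBLNG | B2 |
| 2 | 589494 | 1195600.0 | 2016.0 | 4.0 | 148.0 | 112.0 | OGG | 20551.0 | 1.0 | FL | ... | | M | 1940.0 | 7052016 | M | | LH | 55780470000.0 | 00464 | WT |
| 3 | 2631158 | 5291768.0 | 2016.0 | 4.0 | 297.0 | 297.0 | LOS | 20572.0 | 1.0 | CA | ... | | M | 1991.0 | 10272016 | M | | QR | 94789700000.0 | 00739 | B2 |
| 4 | 3032257 | 985523.0 | 2016.0 | 4.0 | 111.0 | 111.0 | CHM | 20550.0 | 3.0 | NY | ... | | M | 1997.0 | 7042016 | F | | | 42322570000.0 | LAND | WT |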


In [4]:
# List columns
df_immigration.columns

Index(['Unnamed: 0', 'cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port',
       'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa',
       'count', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd',
       'entdepu', 'matflag', 'biryear', 'dtaddto', 'gender', 'insnum',
       'airline', 'admnum', 'fltno', 'visatype'],
      dtype='object')

In [5]:
# Define `fact_immigration` from `df_immigration`
fact_immigration = df_immigration[['cicid', 'i94yr', 'i94mon',
                                   'i94port', 'i94addr', 'arrdate', 'depdate', 'i94mode', 'i94visa']]

# Rename `fact_immigration` columns
fact_immigration.columns = ['cic_id', 'year', 'month', 'city_code',
                            'state_code', 'arrive_date', 'departure_date', 'mode', 'visa']

fact_immigration.head()

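| | cic_id | year | month | city_code | state_code | arrive_date | departure_date | mode | visa |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | 4084316.0 | 2016.0 | 4.0 | HHW | HI | 20566.0 | 20573.0 | 1.0 | 2.0 |
| 1 | 4422636.0 | 2016.0 | 4.0 | MCA | TX | 20567.0 | 20568.0 | 1.0 | 2.0 |
| 2 | 1195600.0 | 2016.0 | 4.0 | OGG | FL | 20551.0 | 20571.0 | 1.0 | 2.0 |
| 3 | 5291768.0 | 2016.0 | 4.0 | LOS | CA | 20572.0 | 20581.0 | 1.0 | 2.0 |
| 4 | 985523.0 | 2016.0 | 4.0 | CHM | NY | 20550.0 | 20553.0 | 3.0 | 2.0 |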
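Assigning `.columns` by position works, but the source-to-target mapping is easy to get out of sync with the column selection. A minimal sketch, assuming the same I94 column names as above, keeps the mapping in one dict and calls `.copy()` so later column assignments act on an independent DataFrame rather than on a slice of `df_immigration`. The names `FACT_COLUMNS` and `build_fact_immigration` are hypothetical.

```python
import pandas as pd

# Hypothetical mapping: source column in the I94 sample -> fact table column
FACT_COLUMNS = {
    'cicid': 'cic_id', 'i94yr': 'year', 'i94mon': 'month',
    'i94port': 'city_code', 'i94addr': 'state_code',
    'arrdate': 'arrive_date', 'depdate': 'departure_date',
    'i94mode': 'mode', 'i94visa': 'visa',
}

def build_fact_immigration(df_immigration: pd.DataFrame) -> pd.DataFrame:
    """Select and rename the fact columns, returning an independent copy."""
    return (df_immigration[list(FACT_COLUMNS)]
            .rename(columns=FACT_COLUMNS)
            .copy())

# Tiny input built from the first two rows of the sample above (plus an unused column)
sample = pd.DataFrame({
    'cicid': [4084316.0, 4422636.0], 'i94yr': [2016.0, 2016.0],
    'i94mon': [4.0, 4.0], 'i94port': ['HHW', 'MCA'], 'i94addr': ['HI', 'TX'],
    'arrdate': [20566.0, 20567.0], 'depdate': [20573.0, 20568.0],
    'i94mode': [1.0, 1.0], 'i94visa': [2.0, 2.0], 'gender': ['F', 'M'],
})
fact = build_fact_immigration(sample)
fact['country'] = 'United States'  # fact is a copy, so sample is left untouched
print(fact)
```

Keeping the mapping in one place also means the same dict can later feed documentation such as the data dictionary.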


In [6]:
# assign() returns a new DataFrame, which avoids pandas' SettingWithCopyWarning
fact_immigration = fact_immigration.assign(country="United States")


In [7]:
fact_immigration.head()

| | cic_id | year | month | city_code | state_code | arrive_date | departure_date | mode | visa | country |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | 4084316.0 | 2016.0 | 4.0 | HHW | HI | 20566.0 | 20573.0 | 1.0 | 2.0 | United States |
| 1 | 4422636.0 | 2016.0 | 4.0 | MCA | TX | 20567.0 | 20568.0 | 1.0 | 2.0 | United States |
| 2 | 1195600.0 | 2016.0 | 4.0 | OGG | FL | 20551.0 | 20571.0 | 1.0 | 2.0 | United States |
| 3 | 5291768.0 | 2016.0 | 4.0 | LOS | CA | 20572.0 | 20581.0 | 1.0 | 2.0 | United States |
| 4 | 985523.0 | 2016.0 | 4.0 | CHM | NY | 20550.0 | 20553.0 | 3.0 | 2.0 | United States |


In [8]:
dim_person_immigration = df_immigration[[
    'cicid', 'i94cit', 'i94res', 'biryear', 'gender', 'insnum']]
# A flat list of names; a nested list ([[...]]) would create a one-level MultiIndex
dim_person_immigration.columns = [
    'cic_id', 'citizen_country', 'residence_country', 'birth_year', 'gender', 'ins_num']

dim_person_immigration.head()

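| | cic_id | citizen_country | residence_country | birth_year | gender | ins_num |
| --- | --- | --- | --- | --- | --- | --- |
| 0 | 4084316.0 | 209.0 | 209.0 | 1955.0 | F | |
| 1 | 4422636.0 | 582.0 | 582.0 | 1990.0 | M | |
| 2 | 1195600.0 | 148.0 | 112.0 | 1940.0 | M | |
| 3 | 5291768.0 | 297.0 | 297.0 | 1991.0 | M | |
| 4 | 985523.0 | 111.0 | 111.0 | 1997.0 | F | |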
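`.columns` should be given a flat list of names. Passing a list that contains one list makes pandas build a one-level `MultiIndex`, which prints almost like ordinary headers but yields tuples instead of strings as column labels, so later steps that expect plain string names may misbehave. A small check on a made-up two-column frame:

```python
import pandas as pd

df = pd.DataFrame([[4084316.0, 209.0]], columns=['cicid', 'i94cit'])

nested = df.copy()
nested.columns = [['cic_id', 'citizen_country']]   # list of lists -> MultiIndex

flat = df.copy()
flat.columns = ['cic_id', 'citizen_country']        # flat list -> ordinary Index

print(type(nested.columns).__name__, nested.columns.nlevels)  # MultiIndex 1
print(nested.columns.tolist())  # [('cic_id',), ('citizen_country',)]
print(flat.columns.tolist())    # ['cic_id', 'citizen_country']
```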


In [9]:
dim_airline_immigration = df_immigration[[
    'cicid', 'airline', 'admnum', 'fltno', 'visatype']]
dim_airline_immigration.columns = [
    'cic_id', 'airline', 'admin_num', 'flight_number', 'visa_type']
dim_airline_immigration.head()

| | cic_id | airline | admin_num | flight_number | visa_type |
| --- | --- | --- | --- | --- | --- |
| 0 | 4084316.0 | JL | 56582670000.0 | 00782 | WT |
| 1 | 4422636.0 | \*GA | 94362000000.0 | XBLNG | B2 |
| 2 | 1195600.0 | LH | 55780470000.0 | 00464 | WT |
| 3 | 5291768.0 | QR | 94789700000.0 | 00739 | B2 |
| 4 | 985523.0 | | 42322570000.0 | LAND | WT |


##### Explore Temperature Dataset

In [10]:
df_temperature = pd.read_csv("GlobalLandTemperaturesByCity.csv")
df_temperature.head()

| | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
| --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 1 | 1743-12-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 2 | 1744-01-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 3 | 1744-02-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 4 | 1744-03-01 | | | Århus | Denmark | 57.05N | 10.33E |


In [11]:
df_temperature = df_temperature[df_temperature['Country'] == 'United States']
df_temperature = df_temperature[[
    'dt', 'AverageTemperature', 'AverageTemperatureUncertainty', 'City', 'Country']]
df_temperature.columns = ['dt', 'average_temp', 'average_temp_uncertainty', 'city', 'country']
df_temperature.head()

| | dt | average_temp | average_temp_uncertainty | city | country |
| --- | --- | --- | --- | --- | --- |
| 47555 | 1820-01-01 | 2.101 | 3.217 | Abilene | United States |
| 47556 | 1820-02-01 | 6.926 | 2.853 | Abilene | United States |
| 47557 | 1820-03-01 | 10.767 | 2.395 | Abilene | United States |
| 47558 | 1820-04-01 | 17.989 | 2.202 | Abilene | United States |
| 47559 | 1820-05-01 | 21.809 | 2.036 | Abilene | United States |


In [12]:
df_temperature['dt'] = pd.to_datetime(df_temperature['dt'])
# Vectorized datetime accessors instead of a per-row lambda
df_temperature['year'] = df_temperature['dt'].dt.year
df_temperature['month'] = df_temperature['dt'].dt.month
df_temperature.head()

| | dt | average_temp | average_temp_uncertainty | city | country | year | month |
| --- | --- | --- | --- | --- | --- | --- | --- |
| 47555 | 1820-01-01 | 2.101 | 3.217 | Abilene | United States | 1820 | 1 |
| 47556 | 1820-02-01 | 6.926 | 2.853 | Abilene | United States | 1820 | 2 |
| 47557 | 1820-03-01 | 10.767 | 2.395 | Abilene | United States | 1820 | 3 |
| 47558 | 1820-04-01 | 17.989 | 2.202 | Abilene | United States | 1820 | 4 |
| 47559 | 1820-05-01 | 21.809 | 2.036 | Abilene | United States | 1820 | 5 |


In [13]:
# filter by year
df_temperature_year = df_temperature[df_temperature['year'] == 2016]
df_temperature_year.head()

| | dt | average_temp | average_temp_uncertainty | city | country | year | month |
| --- | --- | --- | --- | --- | --- | --- | --- |


The temperature dataset contains no records for 2016, so temperatures cannot be matched to the 2016 immigration records by year and month.
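Before modeling, it can help to check which (year, month) periods in the immigration data have any temperature records at all. A minimal sketch, assuming both frames carry numeric `year` and `month` columns as in the cells above; the function name `missing_periods` and the toy frames are hypothetical.

```python
import pandas as pd

def missing_periods(immigration: pd.DataFrame, temperature: pd.DataFrame) -> set:
    """Return (year, month) pairs present in `immigration` but absent from `temperature`."""
    def periods(df: pd.DataFrame) -> set:
        # Drop incomplete rows and cast floats such as 2016.0 to plain ints
        ym = df[['year', 'month']].dropna().astype(int)
        return set(zip(ym['year'].tolist(), ym['month'].tolist()))
    return periods(immigration) - periods(temperature)

# Made-up frames: immigration uses float year/month, as the I94 data does
immigration = pd.DataFrame({'year': [2016.0, 2016.0], 'month': [4.0, 4.0]})
temperature = pd.DataFrame({'year': [2012, 2013], 'month': [12, 9]})
print(missing_periods(immigration, temperature))
```

With the real data this would be called as `missing_periods(fact_immigration, df_temperature)`.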

##### Explore Demographic Data

In [15]:
df_demographics = pd.read_csv("us-cities-demographics.csv", delimiter=';')
df_demographics.head()

| | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127.0 | 87105.0 | 175232 | 5821.0 | 33878.0 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040.0 | 143873.0 | 281913 | 5829.0 | 86253.0 | 2.73 | NJ | White | 76402 |


In [16]:
# Define dim_city_demographics
dim_city_demographics = df_demographics[[
    'City', 'State', 'Male Population', 'Female Population', 'Number of Veterans', 'Foreign-born', 'Race']]
dim_city_demographics.columns = ['city', 'state', 'male_pop', 'female_pop', 'num_veterans', 'foreign_born', 'race']
dim_city_demographics.head()

| | city | state | male_pop | female_pop | num_veterans | foreign_born | race |
| --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | Silver Spring | Maryland | 40601.0 | 41862.0 | 1562.0 | 30908.0 | Hispanic or Latino |
| 1 | Quincy | Massachusetts | 44129.0 | 49500.0 | 4147.0 | 32935.0 | White |
| 2 | Hoover | Alabama | 38040.0 | 46799.0 | 4819.0 | 8229.0 | Asian |
| 3 | Rancho Cucamonga | California | 88127.0 | 87105.0 | 5821.0 | 33878.0 | Black or African-American |
| 4 | Newark | New Jersey | 138040.0 | 143873.0 | 5829.0 | 86253.0 | White |


In [17]:
dim_city_statistics = df_demographics[[
    'City', 'State', 'Median Age', 'Average Household Size']]
dim_city_statistics.columns = ['city', 'state', 'median_age', 'average_household_size']
dim_city_statistics.head()

| | city | state | median_age | average_household_size |
| --- | --- | --- | --- | --- |
| 0 | Silver Spring | Maryland | 33.8 | 2.6 |
| 1 | Quincy | Massachusetts | 41.0 | 2.39 |
| 2 | Hoover | Alabama | 38.5 | 2.58 |
| 3 | Rancho Cucamonga | California | 34.5 | 3.18 |
| 4 | Newark | New Jersey | 34.6 | 2.73 |
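The `Race` and `Count` columns suggest the demographics file holds one row per (city, race) pair, in which case city-level attributes such as `median_age` repeat once per race in `dim_city_statistics`. A minimal sketch of de-duplicating on (`city`, `state`) before loading, assuming that row layout; the input rows are made up, and the sanity check verifies the assumption rather than taking it for granted.

```python
import pandas as pd

# Made-up rows mimicking one city repeated once per race
raw = pd.DataFrame({
    'city': ['Quincy', 'Quincy', 'Hoover'],
    'state': ['Massachusetts', 'Massachusetts', 'Alabama'],
    'median_age': [41.0, 41.0, 38.5],
    'average_household_size': [2.39, 2.39, 2.58],
})

# Sanity check of the assumption: no attribute varies within a city/state pair
assert (raw.groupby(['city', 'state']).nunique() <= 1).all().all()

# Keep one row per city/state pair
dim_city_statistics = (raw.drop_duplicates(subset=['city', 'state'])
                       .reset_index(drop=True))
print(dim_city_statistics)
```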


##### Explore with Spark

In [18]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [20]:
# Write to Parquet; overwrite so the cell can be re-run
df_spark.write.mode("overwrite").parquet("sas_data")

In [21]:
# Read in the parquet file
df_spark = spark.read.parquet("sas_data")
df_spark.show(5)

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     NV|20591.0|  32.0|    1.0|  

#### Cleaning Steps

1. Convert arrive_date and departure_date in the immigration data from SAS date values (days since 1960-01-01) to pandas datetimes
2. Parse the I94_SAS_Labels_Descriptions.SAS file to get auxiliary dimension tables: country_code, city_code, state_code
3. Convert city and state in the demographics data to upper case to match the city_code and state_code tables

##### 1. Transform `arrdate`, `depdate` from SAS date format to pandas datetime

In [22]:
def SAS_to_datetime(date):
    # SAS dates count days since 1960-01-01; missing values (NaN) become NaT
    return pd.to_timedelta(date, unit='D') + pd.Timestamp('1960-1-1')

In [23]:
# assign() returns a new DataFrame, which avoids pandas' SettingWithCopyWarning
fact_immigration = fact_immigration.assign(
    arrive_date=SAS_to_datetime(fact_immigration['arrive_date']),
    departure_date=SAS_to_datetime(fact_immigration['departure_date']))


In [24]:
fact_immigration.head()

| | cic_id | year | month | city_code | state_code | arrive_date | departure_date | mode | visa | country |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | 4084316.0 | 2016.0 | 4.0 | HHW | HI | 2016-04-22 | 2016-04-29 | 1.0 | 2.0 | United States |
| 1 | 4422636.0 | 2016.0 | 4.0 | MCA | TX | 2016-04-23 | 2016-04-24 | 1.0 | 2.0 | United States |
| 2 | 1195600.0 | 2016.0 | 4.0 | OGG | FL | 2016-04-07 | 2016-04-27 | 1.0 | 2.0 | United States |
| 3 | 5291768.0 | 2016.0 | 4.0 | LOS | CA | 2016-04-28 | 2016-05-07 | 1.0 | 2.0 | United States |
| 4 | 985523.0 | 2016.0 | 4.0 | CHM | NY | 2016-04-06 | 2016-04-09 | 3.0 | 2.0 | United States |
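SAS stores dates as a count of days since 1960-01-01, so `arrdate` 20566 in the first row is 20566 days after that epoch, i.e. 2016-04-22 as shown above. A small self-contained check of the same conversion, including a missing departure date, which becomes `NaT`:

```python
import numpy as np
import pandas as pd

def SAS_to_datetime(date):
    # Same conversion as above: days since the SAS epoch 1960-01-01
    return pd.to_timedelta(date, unit='D') + pd.Timestamp('1960-1-1')

# arrdate and depdate of the first row, plus a missing value
sas_days = pd.Series([20566.0, 20573.0, np.nan])
converted = SAS_to_datetime(sas_days)
print(converted)
```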


##### 2. Parse description file to get auxiliary dimension tables - country_code, city_code, state_code

In [25]:
with open("I94_SAS_Labels_Descriptions.SAS") as f:
    contents = f.readlines()

In [26]:
country_codes = {}
for countries in contents[10:298]:
    pair = countries.split('=')
    code, country = pair[0].strip(), pair[1].strip().strip("'")
    country_codes[code] = country


In [27]:
df_country_codes = pd.DataFrame(list(country_codes.items()), columns=['code', 'country'])
df_country_codes.head()

| | code | country |
| --- | --- | --- |
| 0 | 236 | AFGHANISTAN |
| 1 | 101 | ALBANIA |
| 2 | 316 | ALGERIA |
| 3 | 102 | ANDORRA |
| 4 | 324 | ANGOLA |


In [28]:
city_codes = {}
for cities in contents[303:962]:
    pair = cities.split('=')
    code, city = pair[0].strip("\t").strip().strip(
        "'"), pair[1].strip('\t').strip().strip("''")
    city_codes[code] = city

In [29]:
df_city_code = pd.DataFrame(list(city_codes.items()), columns=['code', 'city'])
df_city_code.head()

| | code | city |
| --- | --- | --- |
| 0 | ANC | ANCHORAGE, AK |
| 1 | BAR | BAKER AAF - BAKER ISLAND, AK |
| 2 | DAC | DALTONS CACHE, AK |
| 3 | PIZ | DEW STATION PT LAY DEW, AK |
| 4 | DTH | DUTCH HARBOR, AK |


In [30]:
state_codes = {}
for states in contents[982:1036]:
    pair = states.split('=')
    code, state = pair[0].strip("\t").strip().strip(
        "'"), pair[1].strip('\t').strip().strip("''")
    state_codes[code] = state

In [31]:
df_state_code = pd.DataFrame(list(state_codes.items()), columns=['code', 'state'])
df_state_code.head()

| | code | state |
| --- | --- | --- |
| 0 | AK | ALASKA |
| 1 | AZ | ARIZONA |
| 2 | AR | ARKANSAS |
| 3 | CA | CALIFORNIA |
| 4 | CO | COLORADO |
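The slices `contents[10:298]`, `contents[303:962]` and `contents[982:1036]` depend on exact line positions in the labels file. A minimal alternative sketch locates each block by name instead. It assumes each block starts on a line like `value i94cntyl` and ends on the first line ending in `;`, following SAS `PROC FORMAT` syntax. The block names `i94cntyl`, `$i94prtl` and `i94addrl` are assumptions to verify against your copy of `I94_SAS_Labels_Descriptions.SAS`, and `parse_sas_value_block` is a hypothetical helper; the sample lines only mimic the file's format.

```python
import re

# A "code = 'label'" pair; the code may or may not be quoted, and the
# line may end with the ';' that closes the block.
PAIR_RE = re.compile(r"^\s*'?([^'=]+?)'?\s*=\s*'(.*)'\s*;?\s*$")

def parse_sas_value_block(lines, block_name):
    """Return {code: label} for the `value <block_name>` block.

    Assumes the block starts on a line containing `value <block_name>` and
    ends on the first following line whose stripped text ends with ';'.
    """
    header = re.compile(rf"\bvalue\s+{re.escape(block_name)}(?!\w)", re.IGNORECASE)
    codes = {}
    in_block = False
    for line in lines:
        if not in_block:
            # Skip everything up to and including the block's header line
            in_block = bool(header.search(line))
            continue
        match = PAIR_RE.match(line)
        if match:
            codes[match.group(1).strip()] = match.group(2).strip()
        if line.strip().endswith(';'):
            break
    return codes

sample = [
    "/* I94CIT & I94RES */\n",
    "  value i94cntyl\n",
    "   236 =  'AFGHANISTAN'\n",
    "   101 =  'ALBANIA'\n",
    ";\n",
    "  value $i94prtl\n",
    "\t'ANC'\t=\t'ANCHORAGE, AK'\n",
    "\t'DAC'\t=\t'DALTONS CACHE, AK'  ;\n",
    "  value i94addrl\n",
    "\t'AK'='ALASKA'\n",
    ";\n",
]

country_codes = parse_sas_value_block(sample, 'i94cntyl')
city_codes = parse_sas_value_block(sample, '$i94prtl')
state_codes = parse_sas_value_block(sample, 'i94addrl')
print(country_codes, city_codes, state_codes)
```

With the real file, `parse_sas_value_block(contents, 'i94cntyl')` would replace the hard-coded slice, so the parser keeps working if lines are added to or removed from the file.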


##### 3. Transform city, state in dimension table to upper case to match city_code and state_code table

In [32]:
# assign() returns a new DataFrame, which avoids pandas' SettingWithCopyWarning
dim_city_statistics = dim_city_statistics.assign(
    city=dim_city_statistics['city'].str.upper(),
    state=dim_city_statistics['state'].str.upper())
dim_city_statistics.head()


| | city | state | median_age | average_household_size |
| --- | --- | --- | --- | --- |
| 0 | SILVER SPRING | MARYLAND | 33.8 | 2.6 |
| 1 | QUINCY | MASSACHUSETTS | 41.0 | 2.39 |
| 2 | HOOVER | ALABAMA | 38.5 | 2.58 |
| 3 | RANCHO CUCAMONGA | CALIFORNIA | 34.5 | 3.18 |
| 4 | NEWARK | NEW JERSEY | 34.6 | 2.73 |
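Upper-casing alone may not make the tables joinable: `df_city_code` stores port names as `"CITY, ST"` (for example `ANCHORAGE, AK`), while the demographics data keeps the city and `State Code` in separate columns. A minimal sketch, assuming that label format, splits each port label on its last comma and joins on both parts. The demographics row is made up and only its join keys matter; ports whose labels do not follow the pattern simply won't match.

```python
import pandas as pd

# Port labels in the "CITY, ST" format seen in df_city_code above
df_city_code = pd.DataFrame({
    'code': ['ANC', 'DAC'],
    'city': ['ANCHORAGE, AK', 'DALTONS CACHE, AK'],
})
# Made-up demographics row; only City and State Code are used for the join
df_demographics = pd.DataFrame({
    'City': ['Anchorage'], 'State Code': ['AK'], 'Median Age': [30.0],
})

# Split "CITY, ST" on the last comma into a city name and a state code
parts = df_city_code['city'].str.rsplit(',', n=1, expand=True)
ports = df_city_code.assign(port_city=parts[0].str.strip(),
                            port_state=parts[1].str.strip())

demo = df_demographics.assign(city_upper=df_demographics['City'].str.upper())
matched = ports.merge(demo, left_on=['port_city', 'port_state'],
                      right_on=['city_upper', 'State Code'], how='inner')
print(matched[['code', 'City', 'State Code']])
```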


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The data warehouse is intended for the analytics team and BI applications. The data sets are modeled as a star schema consisting of one fact table and five dimension tables.
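To illustrate how a star schema is typically queried, a minimal sketch joins the fact table to the state-code auxiliary table and counts arrivals per state. The frames are tiny made-up stand-ins shaped like `fact_immigration` and `df_state_code` above.

```python
import pandas as pd

# Made-up stand-ins for the fact table and the state dimension
fact_immigration = pd.DataFrame({
    'cic_id': [1.0, 2.0, 3.0],
    'state_code': ['CA', 'TX', 'CA'],
})
dim_state = pd.DataFrame({'code': ['CA', 'TX'], 'state': ['CALIFORNIA', 'TEXAS']})

# Fact rows join to the dimension on its key, then aggregate by a dimension attribute
arrivals_by_state = (fact_immigration
                     .merge(dim_state, left_on='state_code', right_on='code', how='left')
                     .groupby('state')
                     .size()
                     .sort_values(ascending=False))
print(arrivals_by_state)
```

The same pattern applies to the other dimensions: the fact table holds keys such as `cic_id`, `state_code` and `city_code`, and descriptive attributes live in the dimension tables.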

#### 3.2 Mapping Out Data Pipelines
All data sets are transformed to a common data model with the following steps:
   1. Transform the immigration data into 1 fact table and 2 dimension tables; the fact table is partitioned by state
   2. Parse the label description file to get auxiliary tables
   3. Transform the temperature data into a dimension table
   4. Split the demographics data into two dimension tables
   5. Store these tables in an AWS S3 bucket
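Partitioning by state means one directory per state value. As a local, pandas-only sketch of that layout, using the Hive-style `state_code=<value>` naming that Spark's `DataFrameWriter.partitionBy` also produces, and assuming CSV files in a temporary directory rather than Parquet on S3 (`write_partitioned` is a hypothetical helper):

```python
import os
import tempfile

import pandas as pd

def write_partitioned(df: pd.DataFrame, root: str, column: str) -> list:
    """Write one CSV per distinct value of `column` under root/<column>=<value>/."""
    paths = []
    for value, part in df.groupby(column):
        part_dir = os.path.join(root, f"{column}={value}")
        os.makedirs(part_dir, exist_ok=True)
        path = os.path.join(part_dir, "part-0000.csv")
        # The partition value is encoded in the directory name, so drop the column
        part.drop(columns=[column]).to_csv(path, index=False)
        paths.append(path)
    return paths

fact = pd.DataFrame({'cic_id': [1.0, 2.0, 3.0], 'state_code': ['CA', 'TX', 'CA']})
root = tempfile.mkdtemp()
written = write_partitioned(fact, root, 'state_code')
print(sorted(os.path.relpath(p, root) for p in written))
```

In PySpark the equivalent call would be along the lines of `df.write.partitionBy("state_code").parquet(path)`, which lets queries that filter on a state read only that state's directory.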

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Refer to [ETL](etl.py) for the data model.

#### 4.2 Data Quality Checks
These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Refer to [Data Quality Checks](data_quality_checks.ipynb) for the data quality checks.
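Checks of this kind can be sketched in a few lines. A minimal example, assuming `cic_id` is the fact table's unique key; the function `run_quality_checks` and its check names are hypothetical and not necessarily what `data_quality_checks.ipynb` does.

```python
import pandas as pd

def run_quality_checks(df: pd.DataFrame, key: str, required: list) -> dict:
    """Return {check_name: passed} for a few basic table checks."""
    return {
        'non_empty': len(df) > 0,                                 # count check
        'key_unique': bool(df[key].is_unique),                    # unique key constraint
        'key_not_null': bool(df[key].notna().all()),
        'required_not_null': bool(df[required].notna().all().all()),
    }

# Made-up table with a duplicate key and a missing state code
fact = pd.DataFrame({'cic_id': [4084316.0, 4422636.0, 4422636.0],
                     'state_code': ['HI', 'TX', None]})
results = run_quality_checks(fact, key='cic_id', required=['state_code'])
failed = [name for name, ok in results.items() if not ok]
print(failed)  # ['key_unique', 'required_not_null']
```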

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.
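Part of the dictionary can be generated from the column renames in Step 2, so the field name, source column and source data set stay in sync with the code; the descriptions still have to be written by hand. A minimal sketch; the `RENAMES` structure is hypothetical and covers only two of the tables.

```python
import pandas as pd

# (table, source data set) -> {source column: target column}, taken from the Step 2 cells
RENAMES = {
    ('fact_immigration', 'I94 Immigration Data'): {
        'cicid': 'cic_id', 'i94yr': 'year', 'i94mon': 'month',
        'i94port': 'city_code', 'i94addr': 'state_code',
        'arrdate': 'arrive_date', 'depdate': 'departure_date',
        'i94mode': 'mode', 'i94visa': 'visa',
    },
    ('dim_city_statistics', 'U.S. City Demographic Data'): {
        'City': 'city', 'State': 'state', 'Median Age': 'median_age',
        'Average Household Size': 'average_household_size',
    },
}

rows = [
    {'table': table, 'field': target, 'source_dataset': dataset,
     'source_column': source, 'description': ''}  # description: fill in by hand
    for (table, dataset), mapping in RENAMES.items()
    for source, target in mapping.items()
]
data_dictionary = pd.DataFrame(rows)
print(data_dictionary.head())
```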

### Step 5: Complete Project Write Up

#### Tools and Technologies
1. AWS S3: for data storage
2. Pandas: for sample data set exploratory data analysis
3. PySpark: for processing the large data sets and transforming staging tables into dimension tables


#### Data Update Frequency
1. Tables created from the immigration and temperature data sets can be updated monthly, since the raw data is organized by month.
2. Tables created from the demographics data set can be updated annually, for the following reasons:
   1. Demographic data collection takes time and is typically published annually.
   2. A more frequent collection schedule would be costly.
3. All tables should be updated in append-only mode, adding new data to the existing tables.


#### Design Considerations for Future Scenarios
1. The data is increased by 100x.
	
	If Spark in standalone mode cannot process a data set 100x larger, we could move the data and processing to [AWS EMR](https://aws.amazon.com/tw/emr/?nc2=h_ql_prod_an_emr&whats-new-cards.sort-by=item.additionalFields.postDateTime&whats-new-cards.sort-order=desc), a managed cluster platform for running Spark on large data sets in the cloud.

2. The data populates a dashboard that must be updated by 7am every day.

	A scheduling and monitoring tool such as [Apache Airflow](https://airflow.apache.org) could be used to build an ETL pipeline that runs on a configured schedule, for example daily before 7am.

3. The database needs to be accessed by 100+ people.

	[AWS Redshift](https://aws.amazon.com/tw/redshift/?nc2=h_ql_prod_db_rs&whats-new-cards.sort-by=item.additionalFields.postDateTime&whats-new-cards.sort-order=desc) can handle up to 500 connections and seems appropriate for this scenario.
