# Project Title
### Data Engineering Capstone Project

#### Project Summary
To build a data warehouse on the immigration data of the United States.

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [105]:
# Do all imports and installs here
import pandas as pd
!pip install pandas_redshift
import pandas_redshift as pr
import datetime
import re



### Step 1: Scope the Project and Gather Data

#### Scope 
*Explain what you plan to do in the project in more detail. What data do you use? What is your end solution look like? What tools did you use? etc*

I plan to build a data warehouse on AWS, the end use cases I would like to prepare the data for are analytic tables.
The main dataset will include data on immigration to the United States, and supplementary datasets will include data on airport codes, U.S. city demographics, and temperature data.

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 
- **I94 Immigration Data:** This data comes from the US National Tourism and Trade Office. It contains international visitor arrival statistics by world regions and select countries (including top 20), type of visa, mode of transportation, age groups, states visited (first intended address only), and the top ports of entry (for select countries). [This](https://travel.trade.gov/research/reports/i94/historical/2016.html) is where the data comes from. The 12 datasets have got more than 40 million rows (40,790,529) and 28 columns. For most of the work we used only the month of April of 2016 which has more than three million records (3,096,313).

In [106]:
# Read in the data here
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")

In [107]:
df.shape

(3096313, 28)

In [108]:
df.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


##### Data Dictionary
Column Name|Description
---|---
CICID*|ID that uniquely identify one record in the dataset
I94YR|4 digit year
I94MON|Numeric month
I94CIT|3 digit code of source city for immigration (Born country)
I94RES|3 digit code of source country for immigration (Residence country)
I94PORT|Port addmitted through
ARRDATE|Arrival date in the USA
I94MODE|Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported)
I94ADDR|State of arrival
DEPDATE|Departure date
I94BIR|Age of Respondent in Years
I94VISA|Visa codes collapsed into three categories: (1 = Business; 2 = Pleasure; 3 = Student)
COUNT|Used for summary statistics
DTADFILE|Character Date Field
VISAPOST|Department of State where where Visa was issued
OCCUP|Occupation that will be performed in U.S.
ENTDEPA|Arrival Flag. Whether admitted or paroled into the US
ENTDEPD|Departure Flag. Whether departed, lost visa, or deceased
ENTDEPU|Update Flag. Update of visa, either apprehended, overstayed, or updated to PR
MATFLAG|Match flag
BIRYEAR|4 digit year of birth
DTADDTO|Character date field to when admitted in the US
GENDER|Gender
INSNUM|INS number
AIRLINE|Airline used to arrive in U.S.
ADMNUM|Admission number, should be unique and not nullable
FLTNO|Flight number of Airline used to arrive in U.S.
VISATYPE|Class of admission legally admitting the non-immigrant to temporarily stay in U.S.

- **U.S. City Demographic Data:** This dataset contains information about the demographics of all US cities and census-designated places with a population greater or equal to 65,000 from the US Census Bureau's 2015 American Community Survey. You can read more about it [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/). 

In [109]:
city_demo_df = pd.read_csv('us-cities-demographics.csv', delimiter=';')
print(city_demo_df.shape)
city_demo_df.head()

(2891, 12)


Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.


- #### I94 Immigration Data
i94mode, i94bir, dtadfile, visapost, occup, entdepa, entdepd, entdepu, matflag, biryear, dtaddto, gender, insnum, airline, fltno have missing data. We will drop all these columns. We keep i94addr and depdate because we are interested in those data.

Columns we will keep:
- cicid, i94yr, i94mon, i94cit, i94res, i94port, arrdate, i94addr, depdate, i94visa, count, admnum, visatype

In [110]:
df.count()

cicid       3096313
i94yr       3096313
i94mon      3096313
i94cit      3096313
i94res      3096313
i94port     3096313
arrdate     3096313
i94mode     3096074
i94addr     2943941
depdate     2953856
i94bir      3095511
i94visa     3096313
count       3096313
dtadfile    3096312
visapost    1215063
occup          8126
entdepa     3096075
entdepd     2957884
entdepu         392
matflag     2957884
biryear     3095511
dtaddto     3095836
gender      2682044
insnum       113708
airline     3012686
admnum      3096313
fltno       3076764
visatype    3096313
dtype: int64

- #### U.S. City Demographic Data
Male Population, Female Population, Number of Veterans, Foreign-born, Average Household Size in U.S. City Demographic Data have missing values. We only interested in Total Population of the states.

In [111]:
city_demo_df.count()

City                      2891
State                     2891
Median Age                2891
Male Population           2888
Female Population         2888
Total Population          2891
Number of Veterans        2878
Foreign-born              2878
Average Household Size    2875
State Code                2891
Race                      2891
Count                     2891
dtype: int64

#### Cleaning Steps
Document steps necessary to clean the data

- #### I94 Immigration Data
We select the relevant columns and remove the missing values.

In [112]:
# Performing cleaning tasks here
immigration_df = df[['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port', 'arrdate', 'i94addr', 'depdate', 'i94visa', 'count', 'admnum', 'visatype']].dropna()

In [113]:
# remove escape charachter from i94addr
immigration_df['i94addr'] = immigration_df['i94addr'].map(lambda x: re.sub(r'\W+', '', x))

In [114]:
immigration_df.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94addr,depdate,i94visa,count,admnum,visatype
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,MI,20691.0,2.0,1.0,666643200.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,MA,20567.0,2.0,1.0,92468460000.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,MA,20567.0,2.0,1.0,92468460000.0,B2
5,18.0,2016.0,4.0,101.0,101.0,NYC,20545.0,MI,20555.0,1.0,1.0,92471040000.0,B1
6,19.0,2016.0,4.0,101.0,101.0,NYC,20545.0,NJ,20558.0,2.0,1.0,92471400000.0,B2


- #### U.S. City Demographic Data
We select the relevant columns and drop the duplicates.

In [115]:
city_demo_df = city_demo_df[['State Code', 'City', 'State', 'Total Population']].drop_duplicates()
city_demo_df.head()

Unnamed: 0,State Code,City,State,Total Population
0,MD,Silver Spring,Maryland,82463
1,MA,Quincy,Massachusetts,93629
2,AL,Hoover,Alabama,84839
3,CA,Rancho Cucamonga,California,175232
4,NJ,Newark,New Jersey,281913


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

We chose star schema and select immigration data as fact while time and state are dimension.
Using this model, we can easily aggregate our immigration data based on month or year and also by different states.

**Fact Table**

`immigration` 
- **cicid***, i94yr, i94mon, i94cit, i94res, i94port, arrdate, i94addr, depdate, i94visa, count, admnum, visatype

**Dimension Tables**

`time` - arrival and departure date in immigration broken down into specific units 
- **sas_date***, date, day, month, year, weekday

`state` - total population of the states
- **state_code***, state, total_population

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

#### Create the state table

In [116]:
# We group all city population by state and sum them up.
state_population = city_demo_df.groupby('State').sum().reset_index()
state_population.head()

Unnamed: 0,State,Total Population
0,Alabama,1049629
1,Alaska,298695
2,Arizona,4499542
3,Arkansas,589879
4,California,24822460


In [117]:
# And we merge to get the state code and state name relationship.
state_df = pd.merge(city_demo_df[['State Code', 'State']].drop_duplicates(), state_population, how='right', on='State')
state_df.head()

Unnamed: 0,State Code,State,Total Population
0,MD,Maryland,1312129
1,MA,Massachusetts,2015457
2,AL,Alabama,1049629
3,CA,California,24822460
4,NJ,New Jersey,1428908


#### Create the date table
SAS date value
is a value that represents the number of days between January 1, 1960, and a specified date. We extract all values from arrdate and depdate and convert them into readable date format.

In [118]:
arrdate = immigration_df[['cicid', 'arrdate']].copy()

In [119]:
arrdate['date'] = arrdate['arrdate'].apply(lambda x: datetime.date(1960,1,1)+datetime.timedelta(days=x))

In [120]:
arrdate['day'] = arrdate['date'].apply(lambda x: x.day)
arrdate['month'] = arrdate['date'].apply(lambda x: x.month)
arrdate['year'] = arrdate['date'].apply(lambda x: x.year)
arrdate['weekday'] = arrdate['date'].apply(lambda x: x.weekday())

In [121]:
arrdate.drop(columns='cicid', inplace=True)

In [122]:
arrdate.rename(columns={"arrdate": "sas_date"}, inplace=True)

In [123]:
arrdate.drop_duplicates(inplace=True)

In [124]:
arrdate.head()

Unnamed: 0,sas_date,date,day,month,year,weekday
2,20545.0,2016-04-01,1,4,2016,4
101084,20546.0,2016-04-02,2,4,2016,5
197740,20547.0,2016-04-03,3,4,2016,6
290895,20548.0,2016-04-04,4,4,2016,0
384303,20549.0,2016-04-05,5,4,2016,1


In [125]:
depdate = immigration_df[['cicid', 'depdate']].copy()

In [126]:
depdate['date'] = depdate['depdate'].apply(lambda x: datetime.date(1960,1,1)+datetime.timedelta(days=x))

In [127]:
depdate['day'] = depdate['date'].apply(lambda x: x.day)
depdate['month'] = depdate['date'].apply(lambda x: x.month)
depdate['year'] = depdate['date'].apply(lambda x: x.year)
depdate['weekday'] = depdate['date'].apply(lambda x: x.weekday())

In [128]:
depdate.drop(columns='cicid', inplace=True)

In [129]:
depdate.rename(columns={"depdate": "sas_date"}, inplace=True)

In [130]:
depdate.drop_duplicates(inplace=True)

In [131]:
depdate.head()

Unnamed: 0,sas_date,date,day,month,year,weekday
2,20691.0,2016-08-25,25,8,2016,3
3,20567.0,2016-04-23,23,4,2016,5
5,20555.0,2016-04-11,11,4,2016,0
6,20558.0,2016-04-14,14,4,2016,3
8,20553.0,2016-04-09,9,4,2016,5


In [132]:
date = pd.merge(arrdate, depdate, how='outer')

We can now load our data into redshift by library pandas_redshift

In [133]:
# redshift and s3 config
pr.connect_to_redshift(dbname = dbname,
                        host = host,
                        port = 5439,
                        user = awsuser,
                        password = password)

pr.connect_to_s3(aws_access_key_id = aws_access_key_id,
                aws_secret_access_key = aws_secret_access_key,
                bucket = bucket,
                subdirectory = subdirectory)

In [134]:
# Write code here
# create immigration table
pr.pandas_to_redshift(data_frame = immigration_df,
                      redshift_table_name = 'public.immigration',
                      index = False)

Pandas Redshift | 2020-07-11 08:03:08,196 | pandas_redshift.core | INFO | saved file public.immigration-e4cd6404-d1ec-4718-96a1-436678f9eb22.csv in bucket capstone/public.immigration-e4cd6404-d1ec-4718-96a1-436678f9eb22.csv
Pandas Redshift | 2020-07-11 08:03:08,214 | pandas_redshift.core | INFO | create table public.immigration (cicid REAL, i94yr REAL, i94mon REAL, i94cit REAL, i94res REAL, i94port VARCHAR(256), arrdate REAL, i94addr VARCHAR(256), depdate REAL, i94visa REAL, count REAL, admnum REAL, visatype VARCHAR(256)) diststyle even
Pandas Redshift | 2020-07-11 08:03:08,218 | pandas_redshift.core | INFO | CREATING A TABLE IN REDSHIFT
Pandas Redshift | 2020-07-11 08:03:08,465 | pandas_redshift.core | INFO | 
    copy public.immigration
    from 's3://udacity-data-lake-joshuayeung/capstone/public.immigration-e4cd6404-d1ec-4718-96a1-436678f9eb22.csv'
    delimiter ','
    ignoreheader 1
    csv quote as '"'
    dateformat 'auto'
    timeformat 'auto'
    
        access_key_id '******

In [135]:
# create state table
state_df.columns = ['state_code', 'state', 'population']
# Write the DataFrame to S3 and then to redshift
pr.pandas_to_redshift(data_frame = state_df,
                        redshift_table_name = 'public.state')

Pandas Redshift | 2020-07-11 08:03:21,866 | pandas_redshift.core | INFO | saved file public.state-e73eec74-0423-4615-bb15-6eab02325a84.csv in bucket capstone/public.state-e73eec74-0423-4615-bb15-6eab02325a84.csv
Pandas Redshift | 2020-07-11 08:03:21,871 | pandas_redshift.core | INFO | create table public.state (state_code VARCHAR(256), state VARCHAR(256), population BIGINT) diststyle even
Pandas Redshift | 2020-07-11 08:03:21,874 | pandas_redshift.core | INFO | CREATING A TABLE IN REDSHIFT
Pandas Redshift | 2020-07-11 08:03:22,115 | pandas_redshift.core | INFO | 
    copy public.state
    from 's3://udacity-data-lake-joshuayeung/capstone/public.state-e73eec74-0423-4615-bb15-6eab02325a84.csv'
    delimiter ','
    ignoreheader 1
    csv quote as '"'
    dateformat 'auto'
    timeformat 'auto'
    
        access_key_id '********'
        secret_access_key '********'
        
    
    ;
Pandas Redshift | 2020-07-11 08:03:22,116 | pandas_redshift.core | INFO | FILLING THE TABLE IN REDSHIF

In [136]:
# create date table
# Write the DataFrame to S3 and then to redshift
pr.pandas_to_redshift(data_frame = date,
                        redshift_table_name = 'public.date')

Pandas Redshift | 2020-07-11 08:03:22,660 | pandas_redshift.core | INFO | saved file public.date-3161ebff-1de9-4f4f-b8e9-d12302bf9fbf.csv in bucket capstone/public.date-3161ebff-1de9-4f4f-b8e9-d12302bf9fbf.csv
Pandas Redshift | 2020-07-11 08:03:22,665 | pandas_redshift.core | INFO | create table public.date (sas_date REAL, date VARCHAR(256), day BIGINT, month BIGINT, year BIGINT, weekday BIGINT) diststyle even
Pandas Redshift | 2020-07-11 08:03:22,668 | pandas_redshift.core | INFO | CREATING A TABLE IN REDSHIFT
Pandas Redshift | 2020-07-11 08:03:22,910 | pandas_redshift.core | INFO | 
    copy public.date
    from 's3://udacity-data-lake-joshuayeung/capstone/public.date-3161ebff-1de9-4f4f-b8e9-d12302bf9fbf.csv'
    delimiter ','
    ignoreheader 1
    csv quote as '"'
    dateformat 'auto'
    timeformat 'auto'
    
        access_key_id '********'
        secret_access_key '********'
        
    
    ;
Pandas Redshift | 2020-07-11 08:03:22,912 | pandas_redshift.core | INFO | FILLING 

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [137]:
# Perform quality checks here
def data_quality_checks(table):
    data = pr.redshift_to_pandas(f"SELECT COUNT(*) FROM {table}")
    if len(data) > 0:
        return True
    else:
        return False

In [138]:
tables = ['immigration', 'state', 'date']
for table in tables:
    if not data_quality_checks(table):
        print(f"Data quality check failed. {table} returned no results")
    else:
        print(f"Data quality check succeed for {table}")

Data quality check succeed for immigration
Data quality check succeed for state
Data quality check succeed for date


In [139]:
# Finally close the cursor, commit and close the database connection, and remove variables from the environment.
pr.close_up_shop()

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

##### Data Dictionary
#### Immigration
Column Name|Description
---|---
CICID*|ID that uniquely identify one record in the dataset
I94YR|4 digit year
I94MON|Numeric month
I94CIT|3 digit code of source city for immigration (Born country)
I94RES|3 digit code of source country for immigration (Residence country)
I94PORT|Port addmitted through
ARRDATE|Arrival date in the USA
I94MODE|Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported)
I94ADDR|State of arrival
DEPDATE|Departure date
I94BIR|Age of Respondent in Years
I94VISA|Visa codes collapsed into three categories: (1 = Business; 2 = Pleasure; 3 = Student)
COUNT|Used for summary statistics
DTADFILE|Character Date Field
VISAPOST|Department of State where where Visa was issued
OCCUP|Occupation that will be performed in U.S.
ENTDEPA|Arrival Flag. Whether admitted or paroled into the US
ENTDEPD|Departure Flag. Whether departed, lost visa, or deceased
ENTDEPU|Update Flag. Update of visa, either apprehended, overstayed, or updated to PR
MATFLAG|Match flag
BIRYEAR|4 digit year of birth
DTADDTO|Character date field to when admitted in the US
GENDER|Gender
INSNUM|INS number
AIRLINE|Airline used to arrive in U.S.
ADMNUM|Admission number, should be unique and not nullable
FLTNO|Flight number of Airline used to arrive in U.S.
VISATYPE|Class of admission legally admitting the non-immigrant to temporarily stay in U.S.

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.

 We use a library called [pandas-redshift](https://github.com/agawronski/pandas_redshift), which is designed to make it easier to get data from redshift into a pandas DataFrame and vice versa. It transfers the data frame to S3 and then to Redshift.

* Propose how often the data should be updated and why.
 
 For I94 Immigration Data, it depends how often the immigration authorities update the data. 
 
 For U.S. City Demographic Data, the data will be updated when there is another survey held by the US Census Bureau.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 
 If the data was increased by 100x, pure pandas cannot handle it. We will need Spark to handle it.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 
 We can use Airflow to schedule a pipeline that run every day before 7am.
 * The database needed to be accessed by 100+ people.
 
 Redshift has the ability to be scalable, our current technology stack can support that.