# Project Title
### Data Engineering Capstone Project

#### Project Summary
The objective of this project is to build an ETL pipeline that combines the US Immigration data with other data sources to create data models that can be analyzed.

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [5]:
# Do all imports and installs here
import pandas as pd

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What is your end solution look like? What tools did you use? etc>

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

I94 Immigration Data

In [6]:
# Read in the data here
df_immigration = pd.read_parquet('./sas_data/part-00000-b9542815-7a8d-45fc-9c67-c9c5007ad0d4-c000.snappy.parquet', engine='pyarrow')
df_immigration.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


In [7]:
	
# from pyspark.sql import SparkSession
# spark = SparkSession.builder.\
# config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
# .enableHiveSupport().getOrCreate()
# df_spark =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [8]:
# #write to parquet
# df_spark.write.parquet("sas_data")
# df_spark=spark.read.parquet("sas_data")

##### World Temperature Data

In [9]:
df_temp = pd.read_csv("./GlobalLandTemperaturesByCity.csv")
df_temp.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


##### Airport Code Data

In [10]:
df_airport_codes = pd.read_csv("./airport-codes_csv.csv")
df_airport_codes.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


##### US City Demographic Data

In [11]:
df_demographics = pd.read_csv("./us-cities-demographics.csv", delimiter=";")
df_demographics

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.60,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402
...,...,...,...,...,...,...,...,...,...,...,...,...
2886,Stockton,California,32.5,150976.0,154674.0,305650,12822.0,79583.0,3.16,CA,American Indian and Alaska Native,19834
2887,Southfield,Michigan,41.6,31369.0,41808.0,73177,4035.0,4011.0,2.27,MI,American Indian and Alaska Native,983
2888,Indianapolis,Indiana,34.1,410615.0,437808.0,848423,42186.0,72456.0,2.53,IN,White,553665
2889,Somerville,Massachusetts,31.0,41028.0,39306.0,80334,2103.0,22292.0,2.43,MA,American Indian and Alaska Native,374


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

##### I94 Immigration Data

In [12]:
# Performing cleaning tasks here
df_immigration.columns

Index(['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port', 'arrdate',
       'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa', 'count',
       'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd', 'entdepu',
       'matflag', 'biryear', 'dtaddto', 'gender', 'insnum', 'airline',
       'admnum', 'fltno', 'visatype'],
      dtype='object')

In [13]:
df_immigration.describe()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,arrdate,i94mode,depdate,i94bir,i94visa,count,biryear,admnum
count,219268.0,219268.0,219268.0,219268.0,219268.0,219268.0,219267.0,211405.0,219268.0,219268.0,219268.0,219268.0,219268.0
mean,211413.42791,2016.0,4.0,280.641407,280.726522,20545.637225,1.006462,20561.003926,39.853672,1.869502,1.0,1976.146328,68942110000.0
std,143310.481225,0.0,0.0,209.791615,207.898439,0.656603,0.084776,22.820768,17.441783,0.405161,0.0,17.441783,20595540000.0
min,6.0,2016.0,4.0,101.0,101.0,20545.0,1.0,20479.0,0.0,1.0,1.0,1916.0,8204285.0
25%,66552.75,2016.0,4.0,129.0,123.0,20545.0,1.0,20551.0,28.0,2.0,1.0,1964.0,55459800000.0
50%,229331.5,2016.0,4.0,148.0,207.0,20546.0,1.0,20554.0,40.0,2.0,1.0,1976.0,55521200000.0
75%,298809.25,2016.0,4.0,438.0,438.0,20546.0,1.0,20560.0,52.0,2.0,1.0,1988.0,92493620000.0
max,459650.0,2016.0,4.0,999.0,749.0,20573.0,9.0,20716.0,100.0,3.0,1.0,2016.0,92693270000.0


In [14]:
# checking the data types of the columns
df_immigration.dtypes

cicid       float64
i94yr       float64
i94mon      float64
i94cit      float64
i94res      float64
i94port      object
arrdate     float64
i94mode     float64
i94addr      object
depdate     float64
i94bir      float64
i94visa     float64
count       float64
dtadfile     object
visapost     object
occup        object
entdepa      object
entdepd      object
entdepu      object
matflag      object
biryear     float64
dtaddto      object
gender       object
insnum       object
airline      object
admnum      float64
fltno        object
visatype     object
dtype: object

In [15]:
df_immigration.isnull().sum()

cicid            0
i94yr            0
i94mon           0
i94cit           0
i94res           0
i94port          0
arrdate          0
i94mode          1
i94addr       7911
depdate       7863
i94bir           0
i94visa          0
count            0
dtadfile         1
visapost    138521
occup       218310
entdepa          0
entdepd       7863
entdepu     219241
matflag       7863
biryear          0
dtaddto          6
gender       27744
insnum      219268
airline          2
admnum           0
fltno            1
visatype         0
dtype: int64

In [16]:
len(df_immigration)

219268

In [17]:
# Dropping all columns with over 60% missing values as they won't be useful for analytics
df_immigration.drop(columns=["insnum", "entdepu", "occup", "visapost"], inplace=True)
df_immigration.columns

Index(['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port', 'arrdate',
       'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa', 'count',
       'dtadfile', 'entdepa', 'entdepd', 'matflag', 'biryear', 'dtaddto',
       'gender', 'airline', 'admnum', 'fltno', 'visatype'],
      dtype='object')

In [18]:
# Dropping all rows with missing values.
df_immigration.dropna(inplace=True)

In [19]:
F"Count of immigration data {len(df_immigration)} after cleaning"

'Count of immigration data 177975 after cleaning'

##### World Temperature Data

In [20]:
df_temp.describe()

Unnamed: 0,AverageTemperature,AverageTemperatureUncertainty
count,8235082.0,8235082.0
mean,16.72743,1.028575
std,10.35344,1.129733
min,-42.704,0.034
25%,10.299,0.337
50%,18.831,0.591
75%,25.21,1.349
max,39.651,15.396


In [21]:
df_temp.columns

Index(['dt', 'AverageTemperature', 'AverageTemperatureUncertainty', 'City',
       'Country', 'Latitude', 'Longitude'],
      dtype='object')

Since the datsets and analysis is on the United States, we filter down the world temperature data to only United States data.

In [22]:
df_temp['Country'].unique()

array(['Denmark', 'Turkey', 'Kazakhstan', 'China', 'Spain', 'Germany',
       'Nigeria', 'Iran', 'Russia', 'Canada', "Côte D'Ivoire",
       'United Kingdom', 'Saudi Arabia', 'Japan', 'United States',
       'India', 'Benin', 'United Arab Emirates', 'Mexico', 'Venezuela',
       'Ghana', 'Ethiopia', 'Australia', 'Yemen', 'Indonesia', 'Morocco',
       'Pakistan', 'France', 'Libya', 'Burma', 'Brazil', 'South Africa',
       'Syria', 'Egypt', 'Algeria', 'Netherlands', 'Malaysia', 'Portugal',
       'Ecuador', 'Italy', 'Uzbekistan', 'Philippines', 'Madagascar',
       'Chile', 'Belgium', 'El Salvador', 'Romania', 'Peru', 'Colombia',
       'Tanzania', 'Tunisia', 'Turkmenistan', 'Israel', 'Eritrea',
       'Paraguay', 'Greece', 'New Zealand', 'Vietnam', 'Cameroon', 'Iraq',
       'Afghanistan', 'Argentina', 'Azerbaijan', 'Moldova', 'Mali',
       'Congo (Democratic Republic Of The)', 'Thailand',
       'Central African Republic', 'Bosnia And Herzegovina', 'Bangladesh',
       'Switzerland'

In [23]:
df_temp_us = df_temp[df_temp['Country'] == 'United States']
df_temp_us

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
47555,1820-01-01,2.101,3.217,Abilene,United States,32.95N,100.53W
47556,1820-02-01,6.926,2.853,Abilene,United States,32.95N,100.53W
47557,1820-03-01,10.767,2.395,Abilene,United States,32.95N,100.53W
47558,1820-04-01,17.989,2.202,Abilene,United States,32.95N,100.53W
47559,1820-05-01,21.809,2.036,Abilene,United States,32.95N,100.53W
...,...,...,...,...,...,...,...
8439242,2013-05-01,15.544,0.281,Yonkers,United States,40.99N,74.56W
8439243,2013-06-01,20.892,0.273,Yonkers,United States,40.99N,74.56W
8439244,2013-07-01,24.722,0.279,Yonkers,United States,40.99N,74.56W
8439245,2013-08-01,21.001,0.323,Yonkers,United States,40.99N,74.56W


In [24]:
# Checking for missing values
df_temp_us.isnull().sum()

dt                                   0
AverageTemperature               25765
AverageTemperatureUncertainty    25765
City                                 0
Country                              0
Latitude                             0
Longitude                            0
dtype: int64

In [25]:
len(df_temp_us)

687289

In [26]:
# Clearing missing temperature values
df_temp_us.dropna(inplace=True)


A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_temp_us.dropna(inplace=True)


##### U.S City Demographics Data

In [27]:
df_demographics.isnull().sum()

City                       0
State                      0
Median Age                 0
Male Population            3
Female Population          3
Total Population           0
Number of Veterans        13
Foreign-born              13
Average Household Size    16
State Code                 0
Race                       0
Count                      0
dtype: int64

In [28]:
df_demographics.describe()

Unnamed: 0,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,Count
count,2891.0,2888.0,2888.0,2891.0,2878.0,2878.0,2875.0,2891.0
mean,35.494881,97328.43,101769.6,198966.8,9367.832523,40653.6,2.742543,48963.77
std,4.401617,216299.9,231564.6,447555.9,13211.219924,155749.1,0.433291,144385.6
min,22.9,29281.0,27348.0,63215.0,416.0,861.0,2.0,98.0
25%,32.8,39289.0,41227.0,80429.0,3739.0,9224.0,2.43,3435.0
50%,35.3,52341.0,53809.0,106782.0,5397.0,18822.0,2.65,13780.0
75%,38.0,86641.75,89604.0,175232.0,9368.0,33971.75,2.95,54447.0
max,70.5,4081698.0,4468707.0,8550405.0,156961.0,3212500.0,4.98,3835726.0


In [29]:
len(df_demographics)

2891

The U.S City Demographics Data looks clean enough.

##### Airport Code Data

In [30]:
df_airport_codes.describe()

Unnamed: 0,elevation_ft
count,48069.0
mean,1240.789677
std,1602.363459
min,-1266.0
25%,205.0
50%,718.0
75%,1497.0
max,22000.0


In [31]:
df_airport_codes.isnull().sum()

ident               0
type                0
name                0
elevation_ft     7006
continent       27719
iso_country       247
iso_region          0
municipality     5676
gps_code        14045
iata_code       45886
local_code      26389
coordinates         0
dtype: int64

In [32]:
len(df_airport_codes)

55075

In [33]:
df_airport_codes.dropna(subset=['iata_code'], inplace=True)
df_airport_codes.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
223,03N,small_airport,Utirik Airport,4.0,OC,MH,MH-UTI,Utirik Island,K03N,UTK,03N,"169.852005, 11.222"
440,07FA,small_airport,Ocean Reef Club Airport,8.0,,US,US-FL,Key Largo,07FA,OCA,07FA,"-80.274803161621, 25.325399398804"
594,0AK,small_airport,Pilot Station Airport,305.0,,US,US-AK,Pilot Station,,PQS,0AK,"-162.899994, 61.934601"
673,0CO2,small_airport,Crested Butte Airpark,8980.0,,US,US-CO,Crested Butte,0CO2,CSE,0CO2,"-106.928341, 38.851918"
1088,0TE7,small_airport,LBJ Ranch Airport,1515.0,,US,US-TX,Johnson City,0TE7,JCY,0TE7,"-98.62249755859999, 30.251800537100003"


##### Data Enrichment
Additonal transformation step to parse the `I94_SAS_Labels_Description.SAS` file to derive corresponding port city and state for the I94 Immigration data port field.
This will allow us to join the `I94 Immigration` data with the other datasets through the port code and city fields.

In [34]:
with open("./I94_SAS_Labels_Descriptions.SAS") as f:
    content = f.readlines()
splitted_content = [x.split('=') for x in content[303:962]]
port_codes = [x[0].strip('\t').strip().strip("'") for x in splitted_content]
port_locations = [x[1].strip().strip("'").strip() for x in splitted_content]
port_city = [x.split(",")[0] for x in port_locations]
port_states = [x.split(",")[-1] for x in port_locations]
df_port_locations = pd.DataFrame({"port_code" : port_codes, "port_city": port_city, "port_state": port_states})
df_port_locations.head()
df_port_locations.to_csv('I94_SAS_Labels_Descriptions.csv')

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

Please refer to `README.md` for solution

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

Please refer to `etl.py`

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

Please refer to `data_check.py`

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

Please refer to `README.md` for solution

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

Please refer to `README.md` for solution