# Project Title
### Data Engineering Capstone Project

#### Project Summary
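This project explores the 2016 I94 immigration data together with global city temperatures, US city demographics, and airport codes, and builds a Spark ETL pipeline that models the immigration, temperature, and demography data as a star schema for analysis.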

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import os
import configparser

from pyspark.sql import SparkSession

from etl import process_immigration

In [2]:
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()



In [3]:
process_immigration(spark, "source-bucket-capstone")

INFO:root:Start loading immigration data
INFO:root:Start processing fact_im
INFO:root:Start processing dim_im_person
INFO:root:Start processing dim_im_airline


In [None]:
df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
#df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/*.sas7bdat')

In [3]:
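# Write the SAS data to Parquet once (commented out after the first run),
# then read the Parquet copy back for the rest of the notebook
#df_spark.write.parquet("sas_data")
df_spark=spark.read.parquet("sas_data")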

In [4]:
df_spark.head()

Row(cicid=5748517.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='CA', depdate=20582.0, i94bir=40.0, i94visa=1.0, count=1.0, dtadfile='20160430', visapost='SYD', occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear=1976.0, dtaddto='10292016', gender='F', insnum=None, airline='QF', admnum=94953870030.0, fltno='00011', visatype='B1')

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use? etc.

#### Describe and Gather Data 
Describe the data sets you're using. Where did they come from? What type of information is included? 

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

In [8]:
# Read in the data here

fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df_im = pd.read_sas(filepath_or_buffer= fname, format='sas7bdat', encoding="ISO-8859-1")


In [9]:
df_im.shape

(3096313, 28)

In [10]:
df_im.head()

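|   | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6.0 | 2016.0 | 4.0 | 692.0 | 692.0 | XXX | 20573.0 |  |  |  | ... | U |  | 1979.0 | 10282016 |  |  |  | 1897628000.0 |  | B2 |
| 1 | 7.0 | 2016.0 | 4.0 | 254.0 | 276.0 | ATL | 20551.0 | 1.0 | AL |  | ... | Y |  | 1991.0 | D/S | M |  |  | 3736796000.0 | 296.0 | F1 |
| 2 | 15.0 | 2016.0 | 4.0 | 101.0 | 101.0 | WAS | 20545.0 | 1.0 | MI | 20691.0 | ... |  | M | 1961.0 | 09302016 | M |  | OS | 666643200.0 | 93.0 | B2 |
| 3 | 16.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... |  | M | 1988.0 | 09302016 |  |  | AA | 92468460000.0 | 199.0 | B2 |
| 4 | 17.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... |  | M | 2012.0 | 09302016 |  |  | AA | 92468460000.0 | 199.0 | B2 |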


In [11]:
df_im.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3096313 entries, 0 to 3096312
Data columns (total 28 columns):
cicid       float64
i94yr       float64
i94mon      float64
i94cit      float64
i94res      float64
i94port     object
arrdate     float64
i94mode     float64
i94addr     object
depdate     float64
i94bir      float64
i94visa     float64
count       float64
dtadfile    object
visapost    object
occup       object
entdepa     object
entdepd     object
entdepu     object
matflag     object
biryear     float64
dtaddto     object
gender      object
insnum      object
airline     object
admnum      float64
fltno       object
visatype    object
dtypes: float64(13), object(15)
memory usage: 661.4+ MB


In [12]:
df_im.columns

Index(['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port', 'arrdate',
       'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa', 'count',
       'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd', 'entdepu',
       'matflag', 'biryear', 'dtaddto', 'gender', 'insnum', 'airline',
       'admnum', 'fltno', 'visatype'],
      dtype='object')

In [13]:
df_im["cicid"].nunique()

3096313

In [14]:
# Select the fact columns in their final order and rename them in one step;
# rename() without inplace returns a new DataFrame, so no SettingWithCopyWarning
fact_im = df_im[['cicid', 'i94yr', 'i94mon', 'i94port', 'i94addr', 'arrdate', 'depdate', 'i94mode', 'i94visa']].rename(
    columns={"cicid": "immigration_id", "i94yr": "year", "i94mon": "month", "i94port": "city_code",
             "i94addr": "state_code", "arrdate": "arrival_date", "depdate": "departure_date",
             "i94mode": "transportation_mode", "i94visa": "visa"})


In [15]:
fact_im.head()

|   | immigration_id | year | month | city_code | state_code | arrival_date | departure_date | transportation_mode | visa |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 6.0 | 2016.0 | 4.0 | XXX |  | 20573.0 |  |  | 2.0 |
| 1 | 7.0 | 2016.0 | 4.0 | ATL | AL | 20551.0 |  | 1.0 | 3.0 |
| 2 | 15.0 | 2016.0 | 4.0 | WAS | MI | 20545.0 | 20691.0 | 1.0 | 2.0 |
| 3 | 16.0 | 2016.0 | 4.0 | NYC | MA | 20545.0 | 20567.0 | 1.0 | 2.0 |
| 4 | 17.0 | 2016.0 | 4.0 | NYC | MA | 20545.0 | 20567.0 | 1.0 | 2.0 |


In [16]:
# Person dimension: demographic attributes per immigration record
dim_im_person = df_im[["cicid", "i94cit", "i94res", "biryear", "gender"]].rename(
    columns={"cicid": "immigration_id", "i94cit": "citizen_country",
             "i94res": "residence_country", "biryear": "birth_year"})


In [17]:
dim_im_person.head()

|   | immigration_id | citizen_country | residence_country | birth_year | gender |
|---|---|---|---|---|---|
| 0 | 6.0 | 692.0 | 692.0 | 1979.0 |  |
| 1 | 7.0 | 254.0 | 276.0 | 1991.0 | M |
| 2 | 15.0 | 101.0 | 101.0 | 1961.0 | M |
| 3 | 16.0 | 101.0 | 101.0 | 1988.0 |  |
| 4 | 17.0 | 101.0 | 101.0 | 2012.0 |  |


In [18]:
# Airline dimension: airline, flight number, and visa type per immigration record
dim_im_airline = df_im[["cicid", "airline", "fltno", "visatype"]].rename(
    columns={"cicid": "immigration_id", "fltno": "flight_number", "visatype": "visa_type"})


In [19]:
dim_im_airline.head()

|   | immigration_id | airline | flight_number | visa_type |
|---|---|---|---|---|
| 0 | 6.0 |  |  | B2 |
| 1 | 7.0 |  | 296.0 | F1 |
| 2 | 15.0 | OS | 93.0 | B2 |
| 3 | 16.0 | AA | 199.0 | B2 |
| 4 | 17.0 | AA | 199.0 | B2 |


### Temperature Data Set

In [8]:
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temp = pd.read_csv(fname)

In [37]:
df_temp.columns

Index(['dt', 'AverageTemperature', 'AverageTemperatureUncertainty', 'City',
       'Country', 'Latitude', 'Longitude'],
      dtype='object')

In [38]:
df_temp.head()

|   | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 1 | 1743-12-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 2 | 1744-01-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 3 | 1744-02-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 4 | 1744-03-01 |  |  | Århus | Denmark | 57.05N | 10.33E |


In [39]:
# Keep only US cities; .copy() makes the filtered frame independent so the
# column assignments below don't trigger SettingWithCopyWarning
df_temp = df_temp[df_temp['Country'] == 'United States'].copy()
df_temp['dt'] = pd.to_datetime(df_temp['dt'])
# The .dt accessor extracts year and month vectorised, without a Python-level apply
df_temp['year'] = df_temp['dt'].dt.year
df_temp['month'] = df_temp['dt'].dt.month

dim_temp = df_temp.rename(columns={"dt": "date", "AverageTemperature": "avg_temperature", "AverageTemperatureUncertainty": "avg_temp_uncertainty"})
dim_temp.head()

|   | date | avg_temperature | avg_temp_uncertainty | City | Country | Latitude | Longitude | year | month |
|---|---|---|---|---|---|---|---|---|---|
| 47555 | 1820-01-01 | 2.101 | 3.217 | Abilene | United States | 32.95N | 100.53W | 1820 | 1 |
| 47556 | 1820-02-01 | 6.926 | 2.853 | Abilene | United States | 32.95N | 100.53W | 1820 | 2 |
| 47557 | 1820-03-01 | 10.767 | 2.395 | Abilene | United States | 32.95N | 100.53W | 1820 | 3 |
| 47558 | 1820-04-01 | 17.989 | 2.202 | Abilene | United States | 32.95N | 100.53W | 1820 | 4 |
| 47559 | 1820-05-01 | 21.809 | 2.036 | Abilene | United States | 32.95N | 100.53W | 1820 | 5 |


### Demography Data Set

In [24]:
df_demog = pd.read_csv('us-cities-demographics.csv', delimiter=';')
df_demog.head(5)

|   | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127.0 | 87105.0 | 175232 | 5821.0 | 33878.0 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040.0 | 143873.0 | 281913 | 5829.0 | 86253.0 | 2.73 | NJ | White | 76402 |


In [25]:
dim_demog = df_demog
dim_demog.head()

|   | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127.0 | 87105.0 | 175232 | 5821.0 | 33878.0 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040.0 | 143873.0 | 281913 | 5829.0 | 86253.0 | 2.73 | NJ | White | 76402 |


In [26]:
dim_demog.loc[dim_demog["City"] == "Silver Spring"]

|   | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Hispanic or Latino | 25924 |
| 592 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | White | 37756 |
| 1678 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Black or African-American | 21330 |
| 2123 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | American Indian and Alaska Native | 1084 |
| 2162 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Asian | 8841 |
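
The five Silver Spring rows above differ only in `Race` and `Count`: the file repeats every city-level attribute once per race, so a table built from those attributes alone (such as `dim_demog_state` in the next cell) contains repeated rows. A minimal pandas sketch, assuming the column names shown above, separates the two: one row per city for the attributes, and the race counts pivoted into one column per race. `split_demography` is a hypothetical helper, not part of the project's `etl` module.

```python
import pandas as pd

# Columns that describe the city itself and are repeated on every Race row
CITY_COLUMNS = ["City", "State", "State Code", "Median Age", "Male Population",
                "Female Population", "Total Population", "Number of Veterans",
                "Foreign-born", "Average Household Size"]


def split_demography(df: pd.DataFrame):
    """Hypothetical helper: split per-race demography rows into a
    one-row-per-city table and a city x race count table."""
    # Drop the race-specific columns, then collapse the repeated city rows
    cities = df[CITY_COLUMNS].drop_duplicates().reset_index(drop=True)
    # One row per (City, State Code), one column per Race value
    race_counts = df.pivot_table(index=["City", "State Code"], columns="Race",
                                 values="Count", aggfunc="sum").reset_index()
    return cities, race_counts


# Usage on two of the Silver Spring rows shown above
sample = pd.DataFrame({
    "City": ["Silver Spring", "Silver Spring"],
    "State": ["Maryland", "Maryland"],
    "State Code": ["MD", "MD"],
    "Median Age": [33.8, 33.8],
    "Male Population": [40601.0, 40601.0],
    "Female Population": [41862.0, 41862.0],
    "Total Population": [82463, 82463],
    "Number of Veterans": [1562.0, 1562.0],
    "Foreign-born": [30908.0, 30908.0],
    "Average Household Size": [2.6, 2.6],
    "Race": ["Hispanic or Latino", "White"],
    "Count": [25924, 37756],
})
cities, race_counts = split_demography(sample)
print(len(cities))  # 1
print(race_counts)
```

Keeping the race breakdown in its own table avoids double counting when population figures are summed per state.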


In [27]:
dim_demog_state = dim_demog[["State Code", "State", "Median Age", "Male Population", "Female Population", "Total Population", "Number of Veterans", "Foreign-born", "Average Household Size"]]
dim_demog_state.head()

|   | State Code | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size |
|---|---|---|---|---|---|---|---|---|---|
| 0 | MD | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 |
| 1 | MA | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 |
| 2 | AL | Alabama | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 |
| 3 | CA | California | 34.5 | 88127.0 | 87105.0 | 175232 | 5821.0 | 33878.0 | 3.18 |
| 4 | NJ | New Jersey | 34.6 | 138040.0 | 143873.0 | 281913 | 5829.0 | 86253.0 | 2.73 |


### Airport Codes Data Set

In [28]:
df_airport = pd.read_csv("airport-codes_csv.csv")
df_airport.head()

dim_airport = df_airport.rename(columns= {"ident": "airport_id", "type": "airport_type", "name": "airport_name"})
dim_airport.head()

|   | airport_id | airport_type | airport_name | elevation_ft | continent | iso_country | iso_region | municipality | gps_code | iata_code | local_code | coordinates |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 00A | heliport | Total Rf Heliport | 11.0 |  | US | US-PA | Bensalem | 00A |  | 00A | -74.93360137939453, 40.07080078125 |
| 1 | 00AA | small_airport | Aero B Ranch Airport | 3435.0 |  | US | US-KS | Leoti | 00AA |  | 00AA | -101.473911, 38.704022 |
| 2 | 00AK | small_airport | Lowell Field | 450.0 |  | US | US-AK | Anchor Point | 00AK |  | 00AK | -151.695999146, 59.94919968 |
| 3 | 00AL | small_airport | Epps Airpark | 820.0 |  | US | US-AL | Harvest | 00AL |  | 00AL | -86.77030181884766, 34.86479949951172 |
| 4 | 00AR | closed | Newport Hospital & Clinic Heliport | 237.0 |  | US | US-AR | Newport |  |  |  | -91.254898, 35.6087 |
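
In the output above the `continent` column is empty for every US airport. A likely cause (an inference, not verified against the file here) is that North America is stored as the code `NA`, which `pd.read_csv` treats as a missing value by default; the same would affect `iso_country` values of `NA` (Namibia). The self-contained demonstration below uses made-up rows and shows the `keep_default_na` / `na_values` options that keep `NA` as text while still treating empty fields as missing.

```python
import io

import pandas as pd

# Made-up rows in the airport-codes layout; the third row has no continent
csv_text = "ident,continent,iso_country\n00A,NA,US\nEGLL,EU,GB\nX1,,US\n"

# Default parsing: the string 'NA' is in pandas' default missing-value list
default = pd.read_csv(io.StringIO(csv_text))
print(default["continent"].tolist())  # [nan, 'EU', nan]

# Only truly empty fields become NaN, so 'NA' survives as a continent code
fixed = pd.read_csv(io.StringIO(csv_text), keep_default_na=False, na_values=[""])
print(fixed["continent"].tolist())  # ['NA', 'EU', nan]
```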


## Perform Data Cleaning Steps

### Transform dates into pandas datetime objects

In [29]:
def transform_datetime(date):
    return pd.to_timedelta(date, unit='D') + pd.Timestamp('1960-1-1')

fact_im['arrival_date'] = transform_datetime(fact_im['arrival_date'])
fact_im['departure_date'] = transform_datetime(fact_im['departure_date'])
fact_im.head(5)

|   | immigration_id | year | month | city_code | state_code | arrival_date | departure_date | transportation_mode | visa |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 6.0 | 2016.0 | 4.0 | XXX |  | 2016-04-29 | NaT |  | 2.0 |
| 1 | 7.0 | 2016.0 | 4.0 | ATL | AL | 2016-04-07 | NaT | 1.0 | 3.0 |
| 2 | 15.0 | 2016.0 | 4.0 | WAS | MI | 2016-04-01 | 2016-08-25 | 1.0 | 2.0 |
| 3 | 16.0 | 2016.0 | 4.0 | NYC | MA | 2016-04-01 | 2016-04-23 | 1.0 | 2.0 |
| 4 | 17.0 | 2016.0 | 4.0 | NYC | MA | 2016-04-01 | 2016-04-23 | 1.0 | 2.0 |
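
`transform_datetime` treats `arrdate` and `depdate` as SAS date values, i.e. whole days counted from the SAS epoch 1960-01-01. The standard-library sketch below reproduces the conversion for values from the tables above (20545 becomes 2016-04-01, 20691 becomes 2016-08-25), which is a cheap way to confirm the epoch. `sas_to_date` is a hypothetical helper for illustration only.

```python
import math
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # day 0 of the SAS date scale


def sas_to_date(days):
    """Convert a SAS date (days since 1960-01-01) to a date; NaN becomes None."""
    if days is None or (isinstance(days, float) and math.isnan(days)):
        return None  # mirrors pandas turning NaN into NaT
    return SAS_EPOCH + timedelta(days=int(days))


# arrdate/depdate values taken from the rows above
print(sas_to_date(20545.0))       # 2016-04-01
print(sas_to_date(20691.0))       # 2016-08-25
print(sas_to_date(float("nan")))  # None
```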


### Get information for codes (country, city, state)

In [30]:
with open("I94_SAS_Labels_Descriptions.SAS") as f:
    contents = f.readlines()

In [31]:
country_code = {}
for countries in contents[10:298]:
    pair = countries.split('=')
    code, country = pair[0].strip(), pair[1].strip().strip("'")
    country_code[code] = country
    
df_country_code = pd.DataFrame(list(country_code.items()), columns=['code', 'country'])
df_country_code.head(5)

|   | code | country |
|---|---|---|
| 0 | 236 | AFGHANISTAN |
| 1 | 101 | ALBANIA |
| 2 | 316 | ALGERIA |
| 3 | 102 | ANDORRA |
| 4 | 324 | ANGOLA |


In [32]:
city_code = {}
for cities in contents[303:962]:
    pair = cities.split('=')
    code, city = pair[0].strip().strip("'"), pair[1].strip().strip("'")
    city_code[code] = city
    
df_city_code = pd.DataFrame(list(city_code.items()), columns=['code', 'city'])
df_city_code.head(5)

|   | code | city |
|---|---|---|
| 0 | ANC | ANCHORAGE, AK |
| 1 | BAR | BAKER AAF - BAKER ISLAND, AK |
| 2 | DAC | DALTONS CACHE, AK |
| 3 | PIZ | DEW STATION PT LAY DEW, AK |
| 4 | DTH | DUTCH HARBOR, AK |


In [33]:
state_code = {}
for states in contents[982:1036]:
    pair = states.split('=')
    code, state = pair[0].strip('\t').strip("'"), pair[1].strip().strip("'")
    state_code[code] = state
    
df_state_code = pd.DataFrame(list(state_code.items()), columns=['code', 'state'])
df_state_code.head(5)

|   | code | state |
|---|---|---|
| 0 | AK | ALASKA |
| 1 | AZ | ARIZONA |
| 2 | AR | ARKANSAS |
| 3 | CA | CALIFORNIA |
| 4 | CO | COLORADO |
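
The three cells above slice `contents` at fixed line ranges (`[10:298]`, `[303:962]`, `[982:1036]`), which silently picks up the wrong lines if the label file changes. A sketch of an alternative, assuming each code list in `I94_SAS_Labels_Descriptions.SAS` is a SAS `value <name>` block with one `code = 'label'` entry per line, ending at a line that ends with `;`. The format names used below (`i94cntyl`, `i94prtl`) and the sample excerpt are assumptions to check against the real file; `parse_sas_format` is a hypothetical helper.

```python
import re

# One entry per line, e.g.   236 =  'AFGHANISTAN'   or   'ANC' = 'ANCHORAGE, AK'
ENTRY = re.compile(r"^\s*'?(?P<code>[^'=\s]+)'?\s*=\s*'(?P<label>[^']*)'")


def parse_sas_format(lines, format_name):
    """Return {code: label} for the `value <format_name>` block in `lines`.

    Assumes the block header sits on its own line and that the block ends
    at the first line ending with ';'."""
    header = re.compile(rf"^\s*value\s+\$?{re.escape(format_name)}\b", re.IGNORECASE)
    mapping = {}
    inside = False
    for line in lines:
        if not inside:
            inside = header.match(line) is not None
            continue
        entry = ENTRY.match(line)
        if entry:
            # Labels may carry padding inside the quotes, so trim it
            mapping[entry.group("code")] = entry.group("label").strip()
        if line.rstrip().endswith(";"):
            break
    return mapping


# Illustrative excerpt in the assumed layout of the label file
sample = """  value i94cntyl
   236 =  'AFGHANISTAN'
   101 =  'ALBANIA'
;
  value $i94prtl
\t'ANC'\t=\t'ANCHORAGE, AK    '
\t'BAR'\t=\t'BAKER AAF - BAKER ISLAND, AK'
;""".splitlines()

print(parse_sas_format(sample, "i94cntyl"))  # {'236': 'AFGHANISTAN', '101': 'ALBANIA'}
print(parse_sas_format(sample, "i94prtl"))   # {'ANC': 'ANCHORAGE, AK', 'BAR': 'BAKER AAF - BAKER ISLAND, AK'}
```

Against the real file this would be called as `parse_sas_format(contents, "i94cntyl")`, provided that format name matches the file.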


### Convert state and city names to upper case in the demography and temperature data sets 

In [34]:
# assign() returns a new DataFrame, which avoids the SettingWithCopyWarning
# raised when assigning into the column subset created earlier
dim_demog_state = dim_demog_state.assign(State=dim_demog_state["State"].str.upper())
dim_demog_state.head()


|   | State Code | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size |
|---|---|---|---|---|---|---|---|---|---|
| 0 | MD | MARYLAND | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 |
| 1 | MA | MASSACHUSETTS | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 |
| 2 | AL | ALABAMA | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 |
| 3 | CA | CALIFORNIA | 34.5 | 88127.0 | 87105.0 | 175232 | 5821.0 | 33878.0 | 3.18 |
| 4 | NJ | NEW JERSEY | 34.6 | 138040.0 | 143873.0 | 281913 | 5829.0 | 86253.0 | 2.73 |


In [41]:
dim_temp["City"] = dim_temp["City"].str.upper()
dim_temp.head()

|   | date | avg_temperature | avg_temp_uncertainty | City | Country | Latitude | Longitude | year | month |
|---|---|---|---|---|---|---|---|---|---|
| 47555 | 1820-01-01 | 2.101 | 3.217 | ABILENE | United States | 32.95N | 100.53W | 1820 | 1 |
| 47556 | 1820-02-01 | 6.926 | 2.853 | ABILENE | United States | 32.95N | 100.53W | 1820 | 2 |
| 47557 | 1820-03-01 | 10.767 | 2.395 | ABILENE | United States | 32.95N | 100.53W | 1820 | 3 |
| 47558 | 1820-04-01 | 17.989 | 2.202 | ABILENE | United States | 32.95N | 100.53W | 1820 | 4 |
| 47559 | 1820-05-01 | 21.809 | 2.036 | ABILENE | United States | 32.95N | 100.53W | 1820 | 5 |
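
Upper-casing presumably makes the city names comparable with the port labels parsed from the label file, which look like `ANCHORAGE, AK`. A minimal sketch, assuming that `CITY, ST` pattern holds for the ports of interest, splits each label on its last comma so the city part can be matched against `dim_temp["City"]`. `split_port_label` is a hypothetical helper, and `temp_cities` is an illustrative stand-in, not the real list of temperature cities.

```python
import pandas as pd


def split_port_label(label):
    """Split a port label like 'ANCHORAGE, AK' into ('ANCHORAGE', 'AK').

    Splits on the last comma; labels without a comma return (label, None)."""
    city, sep, state = label.rpartition(",")
    if not sep:
        return label.strip(), None
    return city.strip(), state.strip()


# First rows of the port code table shown earlier
ports = pd.DataFrame({"code": ["ANC", "DAC"],
                      "city": ["ANCHORAGE, AK", "DALTONS CACHE, AK"]})
parts = [split_port_label(label) for label in ports["city"]]
ports["port_city"] = [city for city, _ in parts]
ports["port_state"] = [state for _, state in parts]

# Illustrative stand-in for the unique values of dim_temp["City"]
temp_cities = pd.Series(["ABILENE", "ANCHORAGE"])
ports["has_temperature"] = ports["port_city"].isin(temp_cities)
print(ports[["code", "port_city", "port_state", "has_temperature"]])
```

City names repeat across states, and the temperature data has no state column (only `Latitude`/`Longitude`), so a match on the city name alone can be ambiguous.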


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

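The chosen data model is a star schema, because a star schema is designed for BI and analytical workloads: a central immigration fact table is surrounded by dimension tables (person, airline, temperature, demography) and code lookup tables, which keeps analytical queries to simple joins and aggregations.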
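
A typical query against this schema aggregates the fact table and then joins a dimension or lookup table on its key. A minimal pandas sketch using a few rows shaped like `fact_im` (values taken from the tables above) and a small state-code lookup like `df_state_code`; counting arrivals per state is only an example query.

```python
import pandas as pd

# Rows shaped like fact_im (values from the tables above)
fact_im = pd.DataFrame({
    "immigration_id": [6.0, 7.0, 15.0, 16.0, 17.0],
    "state_code": [None, "AL", "MI", "MA", "MA"],
    "visa": [2.0, 3.0, 2.0, 2.0, 2.0],
})
# Small lookup shaped like df_state_code
state_lookup = pd.DataFrame({"code": ["AL", "MA", "MI"],
                             "state": ["ALABAMA", "MASSACHUSETTS", "MICHIGAN"]})

# Aggregate the fact table (missing state codes are dropped by groupby),
# then join the lookup table on its key
arrivals = (fact_im.groupby("state_code").size().rename("arrivals").reset_index()
            .merge(state_lookup, left_on="state_code", right_on="code", how="left")
            .sort_values("arrivals", ascending=False))
print(arrivals[["state", "arrivals"]])
```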

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

1. The data sets already available in the workspace are used as the sources.

2. The immigration data is loaded into a Spark DataFrame
    - Created tables: 
        - fact table immigration
        - dim table person
        - dim table airline
    - The final tables are stored in the "output_data" directory
3. The temperature data is loaded into a Spark DataFrame
    - dim table temperature
    - The final table is stored in the "output_data" directory
4. The demography data is loaded into a Spark DataFrame
    - dim table demography
    - The final table is stored in the "output_data" directory
5. Helper tables are extracted from the immigration label descriptions; the labels file is loaded into a Spark DataFrame
    - Created helper tables: 
        - Country Code
        - City Code
        - State Code
    - The final tables are stored in the "output_data" directory
        
Note: to use S3 instead of the workspace as the data source, the following steps are required:
- define the source bucket in the `config.cfg` file
- upload the immigration data set and its label descriptions
- upload the temperature data set
- upload the demography data set
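
A minimal sketch of reading the source bucket from `config.cfg` with `configparser` (already imported at the top of the notebook). The section and key names `S3` / `SOURCE_BUCKET` are hypothetical placeholders; use whatever names `config.cfg` actually defines. The bucket name in the demo is the one passed to `process_immigration` above.

```python
import configparser
import os
import tempfile


def read_source_bucket(path="config.cfg", section="S3", key="SOURCE_BUCKET"):
    """Return the source bucket name from an INI-style config file.

    The section and key names are hypothetical placeholders."""
    config = configparser.ConfigParser()
    if not config.read(path):  # read() returns the list of files it parsed
        raise FileNotFoundError(path)
    return config.get(section, key)


# Demo with a throwaway file in the assumed layout
with tempfile.TemporaryDirectory() as tmp:
    cfg_path = os.path.join(tmp, "config.cfg")
    with open(cfg_path, "w") as f:
        f.write("[S3]\nSOURCE_BUCKET = source-bucket-capstone\n")
    print(read_source_bucket(cfg_path))  # source-bucket-capstone
```

In the notebook, this could replace the hard-coded bucket name: `process_immigration(spark, read_source_bucket())`.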

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [None]:
# Write code here
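
The pipeline stores its final tables in the "output_data" directory. A minimal pandas sketch of that last step writes each table to `output_data/<name>.csv`. CSV is an assumption chosen to keep the sketch free of extra dependencies; the Spark pipeline would more naturally use `DataFrame.write.parquet`, as in the commented-out cell near the top. `write_tables` is a hypothetical helper.

```python
import os
import tempfile

import pandas as pd


def write_tables(tables, out_dir="output_data"):
    """Write each named DataFrame to <out_dir>/<name>.csv; return {name: path}."""
    os.makedirs(out_dir, exist_ok=True)
    paths = {}
    for name, df in tables.items():
        path = os.path.join(out_dir, f"{name}.csv")
        df.to_csv(path, index=False)
        paths[name] = path
    return paths


# Demo in a temporary directory with the first state codes shown above
with tempfile.TemporaryDirectory() as tmp:
    states = pd.DataFrame({"code": ["AK", "AZ"], "state": ["ALASKA", "ARIZONA"]})
    written = write_tables({"state_code": states}, os.path.join(tmp, "output_data"))
    print(pd.read_csv(written["state_code"]).shape)  # (2, 2)
```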

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [None]:
# Perform quality checks here
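
The kinds of checks listed above (integrity constraints, source/count checks for completeness) can be sketched as small pandas functions that raise on failure. The function names are hypothetical; the demo uses `immigration_id` values from the earlier tables.

```python
import pandas as pd


def check_not_empty(df, name):
    """Count check: the table must contain at least one row."""
    if df.empty:
        raise ValueError(f"{name}: table is empty")


def check_row_count(source, target, name):
    """Completeness check: the target keeps every source row."""
    if len(source) != len(target):
        raise ValueError(f"{name}: expected {len(source)} rows, got {len(target)}")


def check_unique_key(df, key, name):
    """Integrity check: the key column is non-null and unique."""
    if df[key].isnull().any():
        raise ValueError(f"{name}: null values in key column {key!r}")
    if not df[key].is_unique:
        raise ValueError(f"{name}: duplicate values in key column {key!r}")


# Demo on a few cicid / immigration_id values from the tables above
source = pd.DataFrame({"cicid": [6.0, 7.0, 15.0]})
fact = source.rename(columns={"cicid": "immigration_id"})
check_not_empty(fact, "fact_im")
check_row_count(source, fact, "fact_im")
check_unique_key(fact, "immigration_id", "fact_im")
print("all checks passed")
```

In the notebook, the same calls would take `df_im` and `fact_im`; the earlier `nunique` result (3096313 distinct `cicid` values for 3096313 rows) suggests the uniqueness check should pass for `immigration_id`.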

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.
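
A data dictionary can start from the column mappings already used in this notebook, so each target field records its table and source column; the description text still has to be written by hand. A sketch with the immigration mappings copied from the `rename` calls above (columns kept unchanged map to themselves); `build_dictionary` is a hypothetical helper.

```python
import pandas as pd

# Source column -> target column per table, copied from the rename() calls above
MAPPINGS = {
    "fact_im": {"cicid": "immigration_id", "i94yr": "year", "i94mon": "month",
                "i94port": "city_code", "i94addr": "state_code", "arrdate": "arrival_date",
                "depdate": "departure_date", "i94mode": "transportation_mode", "i94visa": "visa"},
    "dim_im_person": {"cicid": "immigration_id", "i94cit": "citizen_country",
                      "i94res": "residence_country", "biryear": "birth_year", "gender": "gender"},
    "dim_im_airline": {"cicid": "immigration_id", "airline": "airline",
                       "fltno": "flight_number", "visatype": "visa_type"},
}


def build_dictionary(mappings, source="I94 immigration data (i94_apr16_sub.sas7bdat)"):
    """One row per target field; 'description' is left blank to fill in by hand."""
    rows = [{"table": table, "field": target, "source": source,
             "source_column": src, "description": ""}
            for table, columns in mappings.items()
            for src, target in columns.items()]
    return pd.DataFrame(rows)


data_dictionary = build_dictionary(MAPPINGS)
print(data_dictionary.head())
```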

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
    * The data was increased by 100x.
    * The data populates a dashboard that must be updated on a daily basis by 7am every day.
    * The database needed to be accessed by 100+ people.