### Data Engineering Capstone Project

#### Project Summary
This project aims to analyze immigration to the United States.
The main data source is the I94 immigration data. The airport codes, US cities demographics, and world temperature datasets are supplementary sources.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import psycopg2
from sql_queries import fact_table_insert, people_table_insert, port_table_insert, demo_table_insert

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

## Step 1: Original Data from different sources

##### I94 Immigration Data
This data comes from the US National Travel and Tourism Office.
https://www.trade.gov/national-travel-and-tourism-office

In [3]:
df_spark=spark.read.parquet("sas_data")
df_spark.head(1)

[Row(cicid=5748517.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='CA', depdate=20582.0, i94bir=40.0, i94visa=1.0, count=1.0, dtadfile='20160430', visapost='SYD', occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear=1976.0, dtaddto='10292016', gender='F', insnum=None, airline='QF', admnum=94953870030.0, fltno='00011', visatype='B1')]

In [4]:
df_immi = pd.read_csv("./immigration_data_sample.csv")
df_immi.head(1)

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,782,WT


##### US-Cities_Demographics
This data comes from OpenDataSoft.
https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/

In [5]:
df_demo = pd.read_csv("us-cities-demographics.csv", delimiter=';')
df_demo.columns=["city","state","median_age","male_pop","female_pop","tot_pop","veterans","foreign_born","avg_household_size","state_code","race","count"]
df_demo.head(1)

Unnamed: 0,city,state,median_age,male_pop,female_pop,tot_pop,veterans,foreign_born,avg_household_size,state_code,race,count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924


##### Airport-Codes
This is a simple table of airport codes and corresponding cities.  
https://datahub.io/core/airport-codes#data

In [6]:
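# The continent column appears empty for US airports, most likely because their code is "NA"
# (North America), which pd.read_csv parses as NaN by default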
df_airport = pd.read_csv("airport-codes_csv.csv")
df_airport.head(3)

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
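The empty `continent` values above are most likely a parsing artifact rather than missing data, assuming (as seems likely) that North American airports carry the continent code `NA`, which `pd.read_csv` treats as a missing-value marker by default. A minimal sketch of reading the column while keeping that code; the inline CSV is a toy sample, not the real file.

```python
import io

import pandas as pd

# Toy sample in the same shape as airport-codes_csv.csv (values are illustrative)
sample = io.StringIO(
    "ident,type,continent,iso_country\n"
    "00A,heliport,NA,US\n"
    "EGLL,large_airport,EU,GB\n"
    "XXXX,closed,,ZZ\n"
)

# Default parsing: the string "NA" is one of pandas' default NaN markers
default = pd.read_csv(sample)
sample.seek(0)

# Keep "NA" as a real value and treat only truly empty fields as missing
kept = pd.read_csv(sample, keep_default_na=False, na_values=[""])

print(default["continent"].tolist())  # [nan, 'EU', nan]
print(kept["continent"].tolist())     # ['NA', 'EU', nan]
```

This matters downstream: with default parsing, a `dropna` over the `continent` column would discard every North American airport.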


##### World Temperature Data
This dataset came from Kaggle.              
https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data

## Step 2: Explore and Assess the Data

#### Explore and Clean the Data 
Replace the coded values in the df_immi dataframe with readable labels from I94_SAS_Labels_Descriptions.SAS.

In [7]:
# Rename the df_immi columns for readability
df_immi = df_immi.rename(columns={
    'i94yr': 'year', 'i94mon': 'month', 'i94cit': 'citizen', 'i94res': 'resident',
    'i94port': 'port', 'arrdate': 'arrive_date', 'i94mode': 'mode', 'i94addr': 'state',
    'depdate': 'departure_date', 'i94bir': 'age', 'i94visa': 'visa'})
df_immi.head(3)

Unnamed: 0.1,Unnamed: 0,cicid,year,month,citizen,resident,port,arrive_date,mode,state,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT


In [8]:
# Replace coded df_immi values with labels from I94_SAS_Labels_Descriptions.SAS
# for readability: citizen, resident, port, mode, address, visa
with open("I94_SAS_Labels_Descriptions.SAS","r") as f:
    context = f.readlines()

In [9]:
# Citizen/resident codes: parse the code-to-country pairs from the SAS label file into a table
data_store_ci = {}
for line in context[10:298]:
    code, country = line.split("=")
    country = country.strip().strip("'")
    code = code.strip()
    data_store_ci[code] = country     

# This variable contains code and country data    
city_immi = pd.DataFrame(list(data_store_ci.items()), columns = ["code","country"])
city_immi.head(5)

Unnamed: 0,code,country
0,236,AFGHANISTAN
1,101,ALBANIA
2,316,ALGERIA
3,102,ANDORRA
4,324,ANGOLA


In [10]:
# Replace the citizen data in df_immi table
city_immi.code = city_immi.code.astype(int)
dict_country = dict(zip(city_immi.code, city_immi.country))

In [11]:
df_immi.citizen = df_immi.citizen.map(dict_country)
df_immi.resident= df_immi.resident.map(dict_country)
df_immi['country']='United States'

In [12]:
# Port codes: parse the code-to-location pairs from the SAS label file into a table
context = [x.strip() for x in context]
ports = context[302:962]
split_port = [port.split("=") for port in ports]
port_code = [x[0].replace("'","").strip() for x in split_port]
port_location = [x[1].replace("'","").strip() for x in split_port]
port_city = [x.split(",")[0] for x in port_location]
port_state = [x.split(",")[-1] for x in port_location]
df_port = pd.DataFrame({"port_code" : port_code, "port_city" : port_city})
df_port.head(5)

Unnamed: 0,port_code,port_city
0,ALC,ALCAN
1,ANC,ANCHORAGE
2,BAR,BAKER AAF - BAKER ISLAND
3,DAC,DALTONS CACHE
4,PIZ,DEW STATION PT LAY DEW


In [13]:
# Replace the port data in df_immi table
dict_country = dict(sorted(df_port.values.tolist()))
df_immi.port = df_immi.port.map(dict_country)
df_immi.head(1)

Unnamed: 0.1,Unnamed: 0,cicid,year,month,citizen,resident,port,arrive_date,mode,state,...,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype,country
0,2027561,4084316.0,2016.0,4.0,JAPAN,JAPAN,HONOLULU,20566.0,1.0,HI,...,M,1955.0,7202016,F,,JL,56582670000.0,782,WT,United States


In [14]:
# SAS stores dates as the number of days since 1960-01-01; convert them to datetimes
# (pd.datetime is deprecated and removed in pandas 2.0, so use pd.Timestamp)
def SAS_to_datetime(date):
    return pd.to_timedelta(date, unit='D') + pd.Timestamp('1960-01-01')
# Replace the date data in df_immi
df_immi['arrive_date'] = SAS_to_datetime(df_immi['arrive_date'])
df_immi['departure_date'] = SAS_to_datetime(df_immi['departure_date'])
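`SAS_to_datetime` adds a timedelta to the SAS epoch by hand. `pd.to_datetime` can do the same in one call through its `unit` and `origin` parameters, and missing departure dates come back as `NaT`. A minimal sketch; the sample values are two `arrdate` values from the outputs above plus a missing date.

```python
import pandas as pd


def sas_days_to_datetime(days):
    """Convert SAS date numbers (days since 1960-01-01) to datetimes; NaN becomes NaT."""
    return pd.to_datetime(days, unit="D", origin=pd.Timestamp("1960-01-01"))


# 20566.0 -> 2016-04-22, 20574.0 -> 2016-04-30, NaN -> NaT
dates = sas_days_to_datetime(pd.Series([20566.0, 20574.0, float("nan")]))
print(dates.tolist())
```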

In [15]:
# Mode codes: parse the code-to-travel-mode pairs from the SAS label file into a table
context = [x.strip() for x in context]
modes = context[972:976]
split_mode = [mode.split("=") for mode in modes]
mo_code = [code[0].strip() for code in split_mode]
mo_way = [code[1].replace("'","").strip() for code in split_mode]
df_mode = pd.DataFrame({"code":mo_code,"way":mo_way})
df_mode.head()

Unnamed: 0,code,way
0,1,Air
1,2,Sea
2,3,Land
3,9,Not reported ;


In [16]:
# Replace the mode data in df_immi table
df_mode.code = df_mode.code.apply(lambda x: float(x))
dict_country = dict(sorted(df_mode.values.tolist()))
df_immi["mode"] = df_immi["mode"].map(dict_country)

In [17]:
# Address codes: parse the code-to-state pairs from the SAS label file into a table
context = [x.strip() for x in context]
addrs = context[981:1036]
split_addr = [addr.split("=") for addr in addrs]
addr_code = [code[0].replace("'","").strip() for code in split_addr]
addr_state = [state[1].replace("'","").strip() for state in split_addr]
df_addr = pd.DataFrame({"code":addr_code,"state":addr_state})
df_addr.head(1)

Unnamed: 0,code,state
0,AL,ALABAMA


In [18]:
# Replace the address (state) codes in df_immi with state names
dict_country = dict(sorted(df_addr.values.tolist()))
df_immi.state = df_immi.state.map(dict_country)
df_immi.head(1)

Unnamed: 0.1,Unnamed: 0,cicid,year,month,citizen,resident,port,arrive_date,mode,state,...,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype,country
0,2027561,4084316.0,2016.0,4.0,JAPAN,JAPAN,HONOLULU,2016-04-22,Air,HAWAII,...,M,1955.0,7202016,F,,JL,56582670000.0,782,WT,United States


In [19]:
# Visa codes: parse the code-to-visa-category pairs from the SAS label file into a table
context = [x.strip() for x in context]
visas = context[1046:1049]
split_visa = [visa.split("=") for visa in visas]
visa_code = [code[0].replace("'","").strip() for code in split_visa]
visa_status = [status[1].replace("'","").strip() for status in split_visa]
df_visa = pd.DataFrame({"code":visa_code,"status":visa_status})
df_visa.head(3)

Unnamed: 0,code,status
0,1,Business
1,2,Pleasure
2,3,Student


In [20]:
# Replace the visa data in df_immi table
df_visa.code = df_visa.code.apply(lambda x : float(x))
dict_country = dict(sorted(df_visa.values.tolist()))
df_immi.visa = df_immi.visa.map(dict_country)
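Each `map` call above silently turns any code missing from the label file into `NaN`, and those rows are later removed by `dropna`. A small helper, hypothetical and not part of the original pipeline, that maps the codes and also returns the codes that had no label, so the loss can be checked before it happens. The toy mode codes are illustrative.

```python
import pandas as pd


def map_codes(series, mapping):
    """Map codes to labels; also return the codes that have no label (hypothetical helper)."""
    mapped = series.map(mapping)
    # A code is "unmapped" if it was present in the source but got no label
    unmapped = sorted(series[mapped.isna() & series.notna()].unique().tolist())
    return mapped, unmapped


raw_mode = pd.Series([1.0, 2.0, 9.0, 7.0, None])
labels = {1.0: "Air", 2.0: "Sea", 3.0: "Land", 9.0: "Not reported"}
mode, missing = map_codes(raw_mode, labels)
print(mode.tolist())  # ['Air', 'Sea', 'Not reported', nan, nan]
print(missing)        # [7.0]
```

In the notebook it could stand in for a bare `.map(...)`, e.g. `df_immi["visa"], missing_visa = map_codes(df_immi["visa"], dict_country)`.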

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
This data model is intended to be stored in a Redshift warehouse for OLAP.
Therefore, I chose a star schema, which keeps queries simple and fast.
The purpose is to find insights about immigrants arriving in the United States in 2016.


#### 3.2 Mapping Out Data Pipelines


##### 1. fact_immigration_table columns
fact_id, cicid, state, country, year, month                        
- cicid, state, and country are used as foreign keys to join with the dimension tables

##### 2. dim_people_immi_table columns
people_id, cicid, citizen, resident, age, gender
- This dimension table contains personal information

##### 3. dim_port_immi_table columns
port_id, cicid, port, arrive_date, mode, departure_date, visa
- This dimension table records when and how immigrants arrived

##### 4. dim_temp_table columns
temp_id, dt, avg_temp, avg_temp_uncer, city, country, latitude, longitude

##### 5. dim_demo_table columns
demo_id, city, state, male_pop, female_pop, tot_pop, foreign_born, race



In [21]:
# These dataframes become the tables of the data model
fact_immi = df_immi[["cicid", "state", "country", "year","month"]]
dim_people = df_immi[["cicid","citizen","resident","age","gender"]]
dim_port = df_immi[["cicid","port","arrive_date","mode","departure_date","visa"]]
dim_airport = df_airport[["ident","name","continent","iso_country"]]
dim_demo = df_demo[["city","state","male_pop","female_pop","tot_pop","foreign_born","race"]]

In [22]:
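# Drop rows that contain NaN values; reassigning the result instead of calling
# dropna(inplace=True) on a column slice avoids pandas' SettingWithCopyWarning
fact_immi = fact_immi.dropna()
dim_people = dim_people.dropna()
dim_port = dim_port.dropna()
dim_airport = dim_airport.dropna()
dim_demo = dim_demo.dropna()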

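Because each table calls `dropna` on its own, a `cicid` can survive in `fact_immi` but be dropped from `dim_port` (for example, when `departure_date` is missing), which breaks the join between the fact and dimension tables. A sketch of one alternative: drop incomplete rows once, then take explicit copies of the column slices. The trade-off is that it is stricter, since a record missing any column is removed from every table. The toy records are illustrative.

```python
import pandas as pd

FACT_COLS = ["cicid", "state", "country", "year", "month"]
PEOPLE_COLS = ["cicid", "citizen", "resident", "age", "gender"]
PORT_COLS = ["cicid", "port", "arrive_date", "mode", "departure_date", "visa"]


def split_star_tables(df):
    """Drop incomplete rows once so every table keeps the same set of cicid values."""
    needed = list(dict.fromkeys(FACT_COLS + PEOPLE_COLS + PORT_COLS))  # de-duplicated
    clean = df.dropna(subset=needed)
    # .copy() makes each table independent, so later edits raise no SettingWithCopyWarning
    return clean[FACT_COLS].copy(), clean[PEOPLE_COLS].copy(), clean[PORT_COLS].copy()


row_ok = {"cicid": 1.0, "state": "HAWAII", "country": "United States", "year": 2016.0,
          "month": 4.0, "citizen": "JAPAN", "resident": "JAPAN", "age": 40.0,
          "gender": "F", "port": "HONOLULU", "arrive_date": pd.Timestamp("2016-04-22"),
          "mode": "Air", "departure_date": pd.Timestamp("2016-04-30"), "visa": "Pleasure"}
# The second record lacks a departure date, so it is removed from every table
row_missing = dict(row_ok, cicid=2.0, departure_date=pd.NaT)

fact, people, port = split_star_tables(pd.DataFrame([row_ok, row_missing]))
print(fact["cicid"].tolist(), people["cicid"].tolist(), port["cicid"].tolist())
# [1.0] [1.0] [1.0]
```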

In [23]:
# The temperature data (dim_temp) is too large to process with pandas, so PySpark is used instead
df_temp_spark = spark.read.format("csv").option("header", "true").load("../../data2/GlobalLandTemperaturesByCity.csv")

In [24]:
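# Keep rows that have a temperature reading (empty CSV fields are loaded as null, not the string 'null')
df_temp_spark = df_temp_spark.filter(df_temp_spark.AverageTemperature.isNotNull())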
df_temp_spark = df_temp_spark.filter(df_temp_spark.Country=='United States')


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.
The pipeline follows the same approach as data modeling with PostgreSQL.

In [25]:
# After running create_tables.py, insert the data into the database
conn = psycopg2.connect("host=127.0.0.1 dbname=immigrationdb user=student password=student")
cur = conn.cursor()
conn.autocommit = True

In [26]:
for index, row in fact_immi.iterrows():
    cur.execute(fact_table_insert, list(row.values))

In [27]:
for index, row in dim_people.iterrows():
    cur.execute(people_table_insert, list(row.values))

In [28]:
for index, row in dim_port.iterrows():
    cur.execute(port_table_insert, list(row.values))

In [29]:
for index, row in dim_demo.iterrows():
    cur.execute(demo_table_insert, list(row.values))
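The loops above send one `INSERT` per row through `iterrows`, which is slow for large tables. A sketch, under the assumption that the `*_table_insert` statements in `sql_queries` use one `%s` placeholder per column: convert each dataframe into a list of plain Python tuples, with `NaN`/`NaT` turned into `None` (which psycopg2 sends as SQL `NULL`), so it can be passed to a batch call. `to_native_rows` is a hypothetical helper name.

```python
import pandas as pd


def to_native_rows(df):
    """Return df rows as tuples of Python scalars, with missing values as None."""
    rows = []
    # itertuples(name=None) yields plain tuples; numeric cells come back as Python int/float
    for record in df.itertuples(index=False, name=None):
        rows.append(tuple(None if pd.isna(value) else value for value in record))
    return rows


sample = pd.DataFrame({"cicid": [985523, 4084316],
                       "state": ["NEW YORK", None],
                       "age": [19.0, float("nan")]})
print(to_native_rows(sample))
# [(985523, 'NEW YORK', 19.0), (4084316, None, None)]
```

The rows could then be sent with `cur.executemany(fact_table_insert, to_native_rows(fact_immi))`; `psycopg2.extras.execute_batch(cur, fact_table_insert, rows)` accepts the same statement and usually needs fewer server round trips.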


In [30]:
# The temperature table stays in the PySpark dataframe instead of being loaded into Postgres
df_temp_spark.show(1)

+----------+------------------+-----------------------------+-------+-------------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|   City|      Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-------+-------------+--------+---------+
|1820-01-01|2.1010000000000004|                        3.217|Abilene|United States|  32.95N|  100.53W|
+----------+------------------+-----------------------------+-------+-------------+--------+---------+
only showing top 1 row



#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [31]:
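# Perform quality checks
# SELECT COUNT(*) returns exactly one row, so cur.rowcount is always 1;
# read the count value itself to detect an empty table
cur.execute("SELECT COUNT(*) FROM fact_immi")
if cur.fetchone()[0] < 1:
    print("Data quality check failed, Debug the error")
else:
    print("Data quality check pass")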

Data quality check pass


In [32]:
cur.execute("SELECT COUNT(*) FROM people")
if cur.fetchone()[0] < 1:
    print("Data quality check failed, Debug the error")
else:
    print("Data quality check pass")

Data quality check pass


In [33]:
cur.execute("SELECT COUNT(*) FROM port")
if cur.fetchone()[0] < 1:
    print("Data quality check failed, Debug the error")
else:
    print("Data quality check pass")

Data quality check pass


In [34]:
cur.execute("SELECT COUNT(*) FROM demo")
if cur.fetchone()[0] < 1:
    print("Data quality check failed, Debug the error")
else:
    print("Data quality check pass")

Data quality check pass


In [35]:
# Check the temperature table; counting the Spark dataframe takes a few seconds
if df_temp_spark.count() < 1:
    print("Data quality check failed, Debug the error")
else: 
    print("Data quality check pass")

Data quality check pass
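The same count check can be written once as a reusable function. A minimal sketch: it uses Python's built-in `sqlite3` in-memory database as a stand-in for the Postgres connection (both follow the DB-API cursor interface), and the table layouts in the demo are hypothetical. The count is read from the single result row, and an empty table raises an error instead of only printing a message.

```python
import sqlite3


def check_row_count(cur, table):
    """Return the number of rows in `table`; raise ValueError if it is empty.

    `table` is interpolated into the SQL, so it must come from a fixed,
    trusted list of table names, never from user input.
    """
    cur.execute(f"SELECT COUNT(*) FROM {table}")
    (count,) = cur.fetchone()  # COUNT(*) always yields exactly one row
    if count < 1:
        raise ValueError(f"Data quality check failed: {table} is empty")
    return count


# In-memory stand-in database with hypothetical table layouts
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE people (cicid INTEGER, age INTEGER)")
cur.execute("CREATE TABLE demo (city TEXT)")
cur.execute("INSERT INTO people VALUES (985523, 19)")

print(check_row_count(cur, "people"))  # 1
try:
    check_row_count(cur, "demo")
except ValueError as err:
    print(err)  # Data quality check failed: demo is empty
```

With psycopg2, `psycopg2.sql.SQL("SELECT COUNT(*) FROM {}").format(psycopg2.sql.Identifier(table))` quotes the table name safely instead of an f-string.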


#### The data model is appropriate for the identified purpose.

In [44]:
# Answer "Find the group of people who are under 25"
# The star schema lets us join the fact table with the people dimension
cur.execute("""
SELECT *
FROM fact_immi as f
JOIN people as p
ON f.cicid = p.cicid
WHERE p.age < 25
LIMIT 1
""")
data = cur.fetchall()
print(data)

[(5, 985523, 'NEW YORK', 'United States', 2016, 4, 3, 985523, 'FRANCE', 'FRANCE', 19, 'F')]


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

See Data Dictionary.md for the data dictionary.

#### Description
With this data model, we can find insights about immigrants to the United States in 2016.
I believe this data could be useful to people responsible for population and demographic policy.
For example, by joining the demo and port tables, we can find the day in April when the most people arrived.

- Who might use this data model:
Immigration officers, business analysts, and sales managers can use this data model.


- What types of questions can this data model answer:
1) How many men and women became residents of New York?
2) Which state has the most immigrants?
3) On which date did the most people aged 20 to 30 arrive?
4) What is the total number of immigrants?
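The questions above map onto simple joins and aggregations. A sketch in pandas on a tiny hypothetical sample shaped like the fact, people, and port tables; the same logic would be a `JOIN` plus `GROUP BY` in SQL. Here `state` is the I94 address state, as in `fact_immi`.

```python
import pandas as pd

# Tiny hypothetical sample shaped like fact_immi, dim_people and dim_port
fact = pd.DataFrame({"cicid": [1, 2, 3, 4],
                     "state": ["NEW YORK", "NEW YORK", "HAWAII", "NEW YORK"]})
people = pd.DataFrame({"cicid": [1, 2, 3, 4],
                       "gender": ["F", "M", "F", "F"],
                       "age": [19, 24, 40, 27]})
port = pd.DataFrame({"cicid": [1, 2, 3, 4],
                     "arrive_date": pd.to_datetime(["2016-04-21", "2016-04-22",
                                                    "2016-04-23", "2016-04-22"])})

joined = fact.merge(people, on="cicid").merge(port, on="cicid")

# Q1: men and women whose address state is New York
ny_by_gender = joined.loc[joined["state"] == "NEW YORK", "gender"].value_counts()

# Q2: state with the most immigrants
top_state = fact["state"].value_counts().idxmax()

# Q3: arrival date with the most people aged 20 to 30 (inclusive)
in_range = joined[joined["age"].between(20, 30)]
busiest_date = in_range["arrive_date"].value_counts().idxmax()

# Q4: total number of distinct immigrants
total = fact["cicid"].nunique()

print(ny_by_gender.to_dict(), top_state, busiest_date.date(), total)
# {'F': 2, 'M': 1} NEW YORK 2016-04-22 4
```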

## Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.                           
I used pandas to process and analyze the data, because pandas is well suited to data that fits in memory. The temperature data, which is too large for pandas, is processed with PySpark.
SQL and Python are both used in the pipeline. SQL is used for the data quality checks, because it is a convenient language for selecting particular data from a table.
Python is used for data modeling.



* Propose how often the data should be updated and why.
In the immigration data file, all the data is from April: the I94 data is delivered as one file per month (for example, i94_apr16_sub),
so the data should be updated every month.

* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.                                                                            
     I would use Spark to process the data, because data of that size would not fit in memory and could cause thrashing.
     By using Amazon EMR, I would not have to install or update Spark myself and could use it right away.
     If there were more diverse data sources with various data types, I would consider a NoSQL database such as
     Cassandra. It may require duplicated (denormalized) data, but it handles large data volumes quickly and efficiently.
     ***
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.                                
     I would use Airflow to schedule the data processing and set an SLA on the tasks to make sure
     they all finish before 7am.
     ***
 * The database needed to be accessed by 100+ people.                                                               
     I would move the database to a Redshift data warehouse for accessibility.
     Redshift's Concurrency Scaling feature can automatically add cluster capacity when many users run queries at once.
    