# Udacity Data Engineer Nanodegree - Capstone Project

#### Project Summary
The ultimate goal of this project is to build a data warehouse in BigQuery for analytical workloads.
The goal of this notebook is to perform exploratory data analysis on the given data sets.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import pyspark

### Step 1: Scope the Project and Gather Data

#### Scope
This project integrates I94 immigration data, world temperature data, and US demographic data, together with reference data about ports, countries, and US states, to set up a data warehouse with fact and dimension tables.

* Data Sets 
    1. [I94 Immigration Data](https://travel.trade.gov/research/reports/i94/historical/2016.html)
    2. [World Temperature Data](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data)
    3. [U.S. City Demographic Data](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)

* Tools
    * GCS bucket: Data Lake
    * Python for building DAG that orchestrates all the steps
        * Pandas - exploratory data analysis on small data set
        * PySpark - data processing on large data set

#### Describe and Gather Data 

| Data Set | Format | Description |
| ---      | ---    | ---         |
|[I94 Immigration Data](https://travel.trade.gov/research/reports/i94/historical/2016.html)| SAS | Data contains international visitor arrival statistics by world regions and select countries (including top 20), type of visa, mode of transportation, age groups, states visited (first intended address only), and the top ports of entry (for select countries).|
|[World Temperature Data](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data)| CSV | This dataset is from Kaggle and contains monthly average temperature data for different countries around the world.|
|[U.S. City Demographic Data](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)| CSV | This dataset contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000.|


### Step 2: Explore and Assess the Data

#### Explore the data

1. Use pandas for exploratory data analysis to get an overview of these data sets
2. Split the data sets into dimension tables and rename columns so they are easier to understand
3. Use PySpark on one of the SAS data sets to test the ETL data pipeline logic

#### Explore immigration data set

In [2]:
# Read in the data here
df_immi = pd.read_csv("immigration_data_sample.csv")

In [3]:
df_immi.head(5)

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [4]:
df_immi.columns

Index(['Unnamed: 0', 'cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port',
       'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa',
       'count', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd',
       'entdepu', 'matflag', 'biryear', 'dtaddto', 'gender', 'insnum',
       'airline', 'admnum', 'fltno', 'visatype'],
      dtype='object')

In [5]:
fact_immigration = df_immi[['cicid', 'i94yr', 'i94mon', 'i94port', 'i94addr', 'arrdate', 'depdate', 'i94mode', 'i94visa']]
fact_immigration.columns = ['cic_id', 'year', 'month', 'city_code', 'state_code', 'arrive_date', 'departure_date', 'mode', 'visa']
fact_immigration.head(5)

Unnamed: 0,cic_id,year,month,city_code,state_code,arrive_date,departure_date,mode,visa
0,4084316.0,2016.0,4.0,HHW,HI,20566.0,20573.0,1.0,2.0
1,4422636.0,2016.0,4.0,MCA,TX,20567.0,20568.0,1.0,2.0
2,1195600.0,2016.0,4.0,OGG,FL,20551.0,20571.0,1.0,2.0
3,5291768.0,2016.0,4.0,LOS,CA,20572.0,20581.0,1.0,2.0
4,985523.0,2016.0,4.0,CHM,NY,20550.0,20553.0,3.0,2.0


In [6]:
# assign() returns a new DataFrame, which avoids pandas' SettingWithCopyWarning
fact_immigration = fact_immigration.assign(country='United States')
fact_immigration.head(5)


Unnamed: 0,cic_id,year,month,city_code,state_code,arrive_date,departure_date,mode,visa,country
0,4084316.0,2016.0,4.0,HHW,HI,20566.0,20573.0,1.0,2.0,United States
1,4422636.0,2016.0,4.0,MCA,TX,20567.0,20568.0,1.0,2.0,United States
2,1195600.0,2016.0,4.0,OGG,FL,20551.0,20571.0,1.0,2.0,United States
3,5291768.0,2016.0,4.0,LOS,CA,20572.0,20581.0,1.0,2.0,United States
4,985523.0,2016.0,4.0,CHM,NY,20550.0,20553.0,3.0,2.0,United States


In [7]:
dim_immi_personal = df_immi[['cicid', 'i94cit', 'i94res', 'biryear', 'gender', 'insnum']]
dim_immi_personal.columns = ['cic_id', 'citizen_country', 'residence_country', 'birth_year', 'gender', 'ins_num']
dim_immi_personal.head(5)

Unnamed: 0,cic_id,citizen_country,residence_country,birth_year,gender,ins_num
0,4084316.0,209.0,209.0,1955.0,F,
1,4422636.0,582.0,582.0,1990.0,M,
2,1195600.0,148.0,112.0,1940.0,M,
3,5291768.0,297.0,297.0,1991.0,M,
4,985523.0,111.0,111.0,1997.0,F,


In [8]:
dim_immi_airline = df_immi[['cicid', 'airline', 'admnum', 'fltno', 'visatype']]
dim_immi_airline.columns = ['cic_id', 'airline', 'admin_num', 'flight_number', 'visa_type']
dim_immi_airline.head(5)

Unnamed: 0,cic_id,airline,admin_num,flight_number,visa_type
0,4084316.0,JL,56582670000.0,00782,WT
1,4422636.0,*GA,94362000000.0,XBLNG,B2
2,1195600.0,LH,55780470000.0,00464,WT
3,5291768.0,QR,94789700000.0,00739,B2
4,985523.0,,42322570000.0,LAND,WT


#### Explore temperature data set

In [9]:
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temp = pd.read_csv(fname)

In [10]:
df_temp.head(5)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [11]:
df_temp_usa = df_temp[df_temp['Country'] == 'United States']
df_temp_usa = df_temp_usa[['dt', 'AverageTemperature', 'AverageTemperatureUncertainty', 'City', 'Country']]
df_temp_usa.columns = ['dt', 'avg_temp', 'avg_temp_uncertnty', 'city', 'country']
df_temp_usa.head(5)

Unnamed: 0,dt,avg_temp,avg_temp_uncertnty,city,country
47555,1820-01-01,2.101,3.217,Abilene,United States
47556,1820-02-01,6.926,2.853,Abilene,United States
47557,1820-03-01,10.767,2.395,Abilene,United States
47558,1820-04-01,17.989,2.202,Abilene,United States
47559,1820-05-01,21.809,2.036,Abilene,United States


In [12]:
df_temp_usa['dt'] = pd.to_datetime(df_temp_usa['dt'])
df_temp_usa['year'] = df_temp_usa['dt'].dt.year
df_temp_usa['month'] = df_temp_usa['dt'].dt.month
df_temp_usa.head()

Unnamed: 0,dt,avg_temp,avg_temp_uncertnty,city,country,year,month
47555,1820-01-01,2.101,3.217,Abilene,United States,1820,1
47556,1820-02-01,6.926,2.853,Abilene,United States,1820,2
47557,1820-03-01,10.767,2.395,Abilene,United States,1820,3
47558,1820-04-01,17.989,2.202,Abilene,United States,1820,4
47559,1820-05-01,21.809,2.036,Abilene,United States,1820,5


##### This data set doesn't contain USA 2016 temperature data

In [13]:
df_temp_usa_2016 = df_temp_usa[df_temp_usa['year'] == 2016]
df_temp_usa_2016.head(5)

Unnamed: 0,dt,avg_temp,avg_temp_uncertnty,city,country,year,month


#### Explore demography data set

In [14]:
df_demog = pd.read_csv('us-cities-demographics.csv', delimiter=';')
df_demog.head(5)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [15]:
dim_city_population = df_demog[['City', 'State', 'Male Population', 'Female Population', 'Number of Veterans', 'Foreign-born', 'Race']]
dim_city_population.columns = ['city', 'state', 'male_pop', 'female_pop', 'num_veterans', 'foreign_born', 'race']
dim_city_population.head(5)

Unnamed: 0,city,state,male_pop,female_pop,num_veterans,foreign_born,race
0,Silver Spring,Maryland,40601.0,41862.0,1562.0,30908.0,Hispanic or Latino
1,Quincy,Massachusetts,44129.0,49500.0,4147.0,32935.0,White
2,Hoover,Alabama,38040.0,46799.0,4819.0,8229.0,Asian
3,Rancho Cucamonga,California,88127.0,87105.0,5821.0,33878.0,Black or African-American
4,Newark,New Jersey,138040.0,143873.0,5829.0,86253.0,White


In [16]:
dim_city_statistics = df_demog[['City', 'State', 'Median Age', 'Average Household Size']]
dim_city_statistics.columns = ['city', 'state', 'median_age', 'avg_household_size']
dim_city_statistics.head(5)

Unnamed: 0,city,state,median_age,avg_household_size
0,Silver Spring,Maryland,33.8,2.6
1,Quincy,Massachusetts,41.0,2.39
2,Hoover,Alabama,38.5,2.58
3,Rancho Cucamonga,California,34.5,3.18
4,Newark,New Jersey,34.6,2.73


#### Run data with Spark

In [17]:
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
    .enableHiveSupport()
    .getOrCreate()
)
df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [18]:
# Write to parquet once (uncomment on the first run), then read it back
# df_spark.write.parquet("sas_data")
df_spark=spark.read.parquet("sas_data")
df_spark.head(1)

[Row(cicid=5748517.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='CA', depdate=20582.0, i94bir=40.0, i94visa=1.0, count=1.0, dtadfile='20160430', visapost='SYD', occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear=1976.0, dtaddto='10292016', gender='F', insnum=None, airline='QF', admnum=94953870030.0, fltno='00011', visatype='B1')]

#### Cleaning Steps
1. Transform arrive_date and departure_date in the immigration data from SAS date format to pandas datetime format
2. Parse the I94_SAS_Labels_Descriptions.SAS file to get the auxiliary dimension tables: country_code, city_code, state_code
3. Transform city and state in the demography data to upper case to match the city_code and state_code tables

#### 1. Transform arrdate, depdate from SAS date format to pandas datetime

In [19]:
def SAS_to_datetime(date):
    return pd.to_timedelta(date, unit='D') + pd.Timestamp('1960-1-1')
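
SAS stores dates as the number of days since 1960-01-01, which is why the function above adds the value to that timestamp. As a minimal sanity check, the same arithmetic can be sketched with the standard library only. The helper name `sas_days_to_date` is hypothetical and not part of the project code; missing values are passed through as `None`, which is an assumption about how they should be handled.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS counts dates as days since this day


def sas_days_to_date(days):
    """Convert a SAS day count to a date; pass missing values through as None."""
    # NaN is the only float value that is not equal to itself
    if days is None or days != days:
        return None
    return SAS_EPOCH + timedelta(days=int(days))


# First arrdate value from the immigration sample shown earlier
print(sas_days_to_date(20566.0))  # 2016-04-22
```

The result agrees with the `arrive_date` that the pandas version produces for the first sample row.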

In [20]:
# assign() returns a new DataFrame, which avoids pandas' SettingWithCopyWarning
fact_immigration = fact_immigration.assign(
    arrive_date=SAS_to_datetime(fact_immigration['arrive_date']),
    departure_date=SAS_to_datetime(fact_immigration['departure_date']),
)
fact_immigration.head(5)


Unnamed: 0,cic_id,year,month,city_code,state_code,arrive_date,departure_date,mode,visa,country
0,4084316.0,2016.0,4.0,HHW,HI,2016-04-22,2016-04-29,1.0,2.0,United States
1,4422636.0,2016.0,4.0,MCA,TX,2016-04-23,2016-04-24,1.0,2.0,United States
2,1195600.0,2016.0,4.0,OGG,FL,2016-04-07,2016-04-27,1.0,2.0,United States
3,5291768.0,2016.0,4.0,LOS,CA,2016-04-28,2016-05-07,1.0,2.0,United States
4,985523.0,2016.0,4.0,CHM,NY,2016-04-06,2016-04-09,3.0,2.0,United States


#### 2. Parse description file to get auxiliary dimension table - country_code, city_code, state_code

In [21]:
with open("I94_SAS_Labels_Descriptions.SAS") as f:
    contents = f.readlines()

In [22]:
country_code = {}
for countries in contents[10:298]:
    pair = countries.split('=')
    code, country = pair[0].strip(), pair[1].strip().strip("'")
    country_code[code] = country

In [23]:
df_country_code = pd.DataFrame(list(country_code.items()), columns=['code', 'country'])
df_country_code.head(5)

Unnamed: 0,code,country
0,236,AFGHANISTAN
1,101,ALBANIA
2,316,ALGERIA
3,102,ANDORRA
4,324,ANGOLA


In [24]:
city_code = {}
for cities in contents[303:962]:
    pair = cities.split('=')
    code, city = pair[0].strip("\t").strip().strip("'"), pair[1].strip('\t').strip().strip("''")
    city_code[code] = city

In [25]:
df_city_code = pd.DataFrame(list(city_code.items()), columns=['code', 'city'])
df_city_code.head(5)

Unnamed: 0,code,city
0,ANC,"ANCHORAGE, AK"
1,BAR,"BAKER AAF - BAKER ISLAND, AK"
2,DAC,"DALTONS CACHE, AK"
3,PIZ,"DEW STATION PT LAY DEW, AK"
4,DTH,"DUTCH HARBOR, AK"


In [26]:
state_code = {}
for states in contents[982:1036]:
    pair = states.split('=')
    code, state = pair[0].strip('\t').strip("'"), pair[1].strip().strip("'")
    state_code[code] = state

In [27]:
df_state_code = pd.DataFrame(list(state_code.items()), columns=['code', 'state'])
df_state_code.head(5)

Unnamed: 0,code,state
0,AK,ALASKA
1,AZ,ARIZONA
2,AR,ARKANSAS
3,CA,CALIFORNIA
4,CO,COLORADO
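
The three parsing loops above depend on hard-coded line ranges and slightly different `strip` chains. As a possible alternative, one regular expression can pick up every `code = 'label'` line, whether or not the code is quoted. This is only a sketch: the name `parse_label_lines` is hypothetical, and the sample lines below are assumptions about the file layout rather than verbatim excerpts.

```python
import re

# Matches lines such as   582 =  'MEXICO'   or   'AL'='ALABAMA'
LABEL_LINE = re.compile(r"^\s*'?(?P<code>[^'=\s]+)'?\s*=\s*'(?P<label>[^']*)'")


def parse_label_lines(lines):
    """Return {code: label} for every line that looks like code = 'label'."""
    pairs = {}
    for line in lines:
        match = LABEL_LINE.match(line)
        if match:  # headers, comments and blank lines are skipped
            pairs[match.group("code")] = match.group("label").strip()
    return pairs


sample = [
    "/* I94ADDR - hypothetical comment line */\n",
    "   236 =  'AFGHANISTAN'\n",
    "   'ANC'\t=\t'ANCHORAGE, AK         '\n",
    "\t'AK'='ALASKA'\n",
]
print(parse_label_lines(sample))
```

Because a single call over the whole file would mix country, city and state codes in one dictionary, the caller would still pass each section's lines separately, for example the same slices of `contents` used above.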


#### 3. Transform city, state in dimension table to upper case to match city_code and state_code table

In [28]:
# assign() returns a new DataFrame, which avoids pandas' SettingWithCopyWarning
dim_city_statistics = dim_city_statistics.assign(
    city=dim_city_statistics['city'].str.upper(),
    state=dim_city_statistics['state'].str.upper(),
)
dim_city_statistics.head(5)


Unnamed: 0,city,state,median_age,avg_household_size
0,SILVER SPRING,MARYLAND,33.8,2.6
1,QUINCY,MASSACHUSETTS,41.0,2.39
2,HOOVER,ALABAMA,38.5,2.58
3,RANCHO CUCAMONGA,CALIFORNIA,34.5,3.18
4,NEWARK,NEW JERSEY,34.6,2.73
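
Upper-casing alone may not be enough to match the port labels, because the values in `df_city_code` combine city and state in one string such as "ANCHORAGE, AK". One way to get a joinable city name is to split off the trailing two-letter state code. The sketch below assumes that labels follow the "CITY, ST" pattern; `split_port_label` is a hypothetical helper, and labels that do not fit the pattern are returned with `None` as the state.

```python
def split_port_label(label):
    """Split 'CITY, ST' into ('CITY', 'ST'); return (label, None) if no state suffix is found."""
    city, sep, state = label.rpartition(",")
    state = state.strip()
    # Only treat the suffix as a state when it is a two-letter code
    if sep and len(state) == 2 and state.isalpha():
        return city.strip().upper(), state.upper()
    return label.strip().upper(), None


print(split_port_label("ANCHORAGE, AK"))
print(split_port_label("BAKER AAF - BAKER ISLAND, AK"))
```

The resulting city and state code could then be compared with the upper-cased `city` column and the `State Code` column of the demography data.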


### Step 3: Define the Data Model

#### Conceptual Data Model
Because this data warehouse serves OLAP and BI workloads, we model these data sets with a snowflake schema.

* Snowflake Schema

	![alt text](./img/snowflake_schema.png)

#### Data Pipeline Build Up Steps

![alt_text](./img/pipeline_structure.png)

1. Create the staging dataset.
2. Load raw data from the GCS bucket into the staging dataset in BigQuery.
3. Check the newly created tables.
4. Create the final dataset that will store our snowflake schema data model.
5. Run the queries from the sql_queries.py script against the staging tables to create the dimension and fact tables.
6. Check the tables.

### Step 4: Run Pipelines to Model the Data 

#### 4.1 Create the data model

The data processing and the data model were implemented with Cloud Composer.

Please refer to [dag.py](./dag.py).

#### 4.2 Data Quality Checks

Data quality checks are performed via BigQuery operators in Cloud Composer.
After creating a new table, we first check that the table exists and then check that it contains at least one row.

Please refer to [dag.py](./dag.py).
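
The logic of the two checks can be illustrated locally. The sketch below is not the code in dag.py: it uses Python's built-in `sqlite3` as a stand-in for BigQuery, and the function name `check_table_has_rows` is hypothetical. It only shows the order of the checks, existence first and row count second.

```python
import sqlite3


def check_table_has_rows(conn, table_name):
    """Raise ValueError if the table is missing or empty; otherwise return its row count."""
    # Check 1: the table exists (sqlite_master is SQLite's catalog table)
    found = conn.execute(
        "SELECT COUNT(*) FROM sqlite_master WHERE type = 'table' AND name = ?",
        (table_name,),
    ).fetchone()[0]
    if found == 0:
        raise ValueError(f"Data quality check failed: table {table_name!r} does not exist")

    # Check 2: the table contains at least one row
    row_count = conn.execute(f'SELECT COUNT(*) FROM "{table_name}"').fetchone()[0]
    if row_count == 0:
        raise ValueError(f"Data quality check failed: table {table_name!r} is empty")
    return row_count


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE facts_immigration (cicid REAL)")
conn.execute("INSERT INTO facts_immigration VALUES (4084316.0)")
print(check_table_has_rows(conn, "facts_immigration"))  # 1
```

Raising an exception mirrors how a failed check would typically fail the corresponding task in an orchestrated pipeline, so downstream steps do not run on missing or empty tables.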

#### 4.3 Data dictionary 

#### facts_immigration

In [1]:
im = pd.read_csv('./data_dict/facts_immigration.csv')
im

Unnamed: 0,column_name,description
0,CICID,immigrant id (immigration parquet files)
1,YEAR,i94yr (immigration parquet files)
2,MONTH,i94mon (immigration parquet files)
3,COUNTRY_ORIGIN,immigrant's origin country (countries.csv)
4,PORT_NAME,name of the arriving port (ports.csv)
5,ARRIVAL_DATE,date of the arrival (immigration parquet files)
6,ARRIVAL_MODE,how immigrant has arrived (air/land/water) (im...
7,DESTINATION_SATE_CODE,destination state code (us_states.csv)
8,DESTINATION_SATE,destination state name (us_states.csv)
9,DEPARTURE_DATE,date of the departure (immigration parquet files)


#### dim_weather

In [2]:
w = pd.read_csv('./data_dict/dim_weather.csv')
w

Unnamed: 0,column_name,description
0,YEAR,year of measurement (GlobalLandTemperaturesByC...
1,CITY,city of measurement (GlobalLandTemperaturesByC...
2,COUNTRY,country of measurement (GlobalLandTemperatures...
3,LATITUDE,latitude of measurement (GlobalLandTemperature...
4,LONGITUDE,longitude of measurement (GlobalLandTemperatur...
5,AVG_TEMP,average temperature (GlobalLandTemperaturesByC...


#### dim_time

In [3]:
t = pd.read_csv('./data_dict/dim_time.csv')
t

Unnamed: 0,column_name,description
0,DATE,full arrival date (immigration parquet files)
1,DAY,arrival day (immigration parquet files)
2,MONTH,arrival month (immigration parquet files)
3,YEAR,arrival year (immigration parquet files)
4,QUARTER,arrival quarter (immigration parquet files)
5,DAYOFWEEK,arrival day of the week (immigration parquet f...
6,WEEKOFYEAR,arrival week of the year (immigration parquet ...
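
To make the meaning of the `dim_time` columns concrete, here is a small sketch that derives them from a single arrival date using only the standard library. It is an illustration, not the query used in the pipeline: `time_dimension_row` is a hypothetical name, and the ISO conventions for `DAYOFWEEK` (Monday = 1) and `WEEKOFYEAR` are assumptions that may differ from the functions BigQuery applies.

```python
from datetime import date


def time_dimension_row(d):
    """Derive the dim_time attributes for one arrival date."""
    _, iso_week, iso_weekday = d.isocalendar()
    return {
        "DATE": d,
        "DAY": d.day,
        "MONTH": d.month,
        "YEAR": d.year,
        "QUARTER": (d.month - 1) // 3 + 1,
        "DAYOFWEEK": iso_weekday,  # assumption: ISO numbering, Monday = 1 ... Sunday = 7
        "WEEKOFYEAR": iso_week,    # assumption: ISO week number
    }


print(time_dimension_row(date(2016, 4, 22)))
```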


#### dim_cities

In [4]:
c = pd.read_csv('./data_dict/dim_cities.csv')
c

Unnamed: 0,column_name,description
0,CITY_NAME,name of the city (us-cities-demographics.csv)
1,STATE_NAME,name of the state (us-cities-demographics.csv)
2,STATE_ID,states' unique identifier (us-cities-demograph...
3,MEDIAN_AGE,most typical age in the city (us-cities-demogr...
4,MALE_POPULATION,number of males (us-cities-demographics.csv)
5,FEMALE_POPULATION,number of females (us-cities-demographics.csv)
6,TOTAL_POPULATION,total number of people (us-cities-demographics...
7,NUMBER_OF_VETERANS,number of veterans (us-cities-demographics.csv)
8,FOREIGN_BORN,was person born in US? (us-cities-demographics...
9,AVG_HOUSEHOLD_SIZE,average size of a house (us-cities-demographic...


#### dim_airports

In [6]:
a = pd.read_csv('./data_dict/dim_airports.csv')
a

Unnamed: 0,column_name,description
0,STATE_ID,unique identifier of the state where airport i...
1,AIRPORT_NAME,full name of the airport (airport-codes.csv)
2,IATA_CODE,International Air Transport Association code (...
3,LOCAL_CODE,local code of an airport (airport-codes.csv)
4,COORDINATES,latitude/longitude of an airport(airport-codes...


### Step 5: Complete Project Write Up

#### Tools and Technologies
1. GCS bucket as a Data Lake to store raw data.
2. Pandas and PySpark for exploratory data analysis on sample data sets.
3. Cloud Composer for orchestrating the pipeline.
4. BigQuery for storing staging models and final Snowflake Schema.

#### Data Update Frequency
1. Tables created from the immigration and temperature data sets should be updated monthly, since the raw data is published monthly.
2. Tables created from the demography data set can be updated annually: collecting demographic data takes time, and refreshing it more often would raise costs and might lead to wrong conclusions.
3. All tables should be updated in append-only mode.

#### Future Design Considerations
1. The data was increased by 100x.

    If Spark in standalone mode cannot process a data set that is 100x larger, we could move the processing to Cloud Dataproc, a managed Spark and Hadoop service that processes large data sets on a distributed cluster in the cloud.
    
2. The pipelines would be run on a daily basis by 7 am every day.

    We use Cloud Composer as the execution environment for Apache Airflow, an orchestration tool that schedules tasks across multiple GCP services. Running daily at 7 am is therefore a matter of setting the DAG's schedule, for example with the cron expression `0 7 * * *`. Airflow itself is not a data processing framework: we don't pass data in memory between the steps of the DAG, and Airflow only coordinates the movement of data between the storage and processing tools.
    
3. The database needed to be accessed by 100+ people.

    BigQuery is a petabyte-scale data warehouse designed for analytical workloads, so it can easily handle access by 100+ people.

### Future Improvements
These data sets have several gaps. We will need to collect more data to get a more complete single source of truth (SSOT) database.

1. The immigration data set covers 2016, but the temperature data set only runs through 2013, so we cannot see the temperatures for 2016.

2. Some states and cities are missing from the label description file, which makes it hard to join the immigration tables with the demography tables.