# Project Title
### Data Engineering Capstone Project
## Sebastian Baeza October - 2019
#### Project Summary

The data sources will be aggregated with Spark SQL (an instance is available by project template), and pandas will be used to analyze the data.

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import os
import glob
from us_state_abbrev import state_udf, abbrev_state, abbrev_state_udf,city_code_udf,city_codes
from immigration_codes import country_udf
from pyspark.sql import SparkSession, SQLContext, GroupedData
from pyspark.sql.functions import *

In [2]:
#Build spark session
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

In [3]:
#Build SQL context object
sqlContext = SQLContext(spark)

### Step 1: Scope the Project and Gather Data

#### Scope 
This project will get data from all available sources in the project template and create dimension and fact tables to show some insights of immigration in the USA.

Explain what you plan to do in the project in more detail. What data do you use? What is your end solution look like? What tools did you use? etc>

#### Describe and Gather Data 




#### Loading data in dataframes

In [8]:
df_demog = pd.read_csv("us-cities-demographics.csv", sep=";", header=0)

df_airport = pd.read_csv("airport-codes_csv.csv", sep=",", header=0)

df_temp = pd.read_csv("GlobalLandTemperaturesByState.csv", sep=",", header=0)

df_immi = pd.read_sas("../../data/18-83510-I94-Data-2016/i94_dec16_sub.sas7bdat")

#### Loading data in Spark

In [5]:
# Read in the data here
spark_demographics=spark.read.format("csv").option("header", "true").option("delimiter", ";").load("us-cities-demographics.csv")

spark_airport=spark.read.format("csv").option("header", "true").load("airport-codes_csv.csv")

spark_temperature=spark.read.format("csv").option("header", "true").load("GlobalLandTemperaturesByState.csv")

spark_sas_data=spark.read.format('com.github.saurfang.sas.spark').load("../../data/18-83510-I94-Data-2016/i94_dec16_sub.sas7bdat")

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

First, let's take a view about data sources

#### U.S cities demographics

In [6]:
df_demog.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [7]:
df_demog.describe()

Unnamed: 0,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,Count
count,2891.0,2888.0,2888.0,2891.0,2878.0,2878.0,2875.0,2891.0
mean,35.494881,97328.43,101769.6,198966.8,9367.832523,40653.6,2.742543,48963.77
std,4.401617,216299.9,231564.6,447555.9,13211.219924,155749.1,0.433291,144385.6
min,22.9,29281.0,27348.0,63215.0,416.0,861.0,2.0,98.0
25%,32.8,39289.0,41227.0,80429.0,3739.0,9224.0,2.43,3435.0
50%,35.3,52341.0,53809.0,106782.0,5397.0,18822.0,2.65,13780.0
75%,38.0,86641.75,89604.0,175232.0,9368.0,33971.75,2.95,54447.0
max,70.5,4081698.0,4468707.0,8550405.0,156961.0,3212500.0,4.98,3835726.0


#### comments about demographic data


#### Airport codes

In [16]:
df_airport.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [17]:
df_airport.describe()

Unnamed: 0,elevation_ft
count,48069.0
mean,1240.789677
std,1602.363459
min,-1266.0
25%,205.0
50%,718.0
75%,1497.0
max,22000.0


comments about airplane data

In [9]:
df_immi.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,46.0,2016.0,12.0,129.0,129.0,b'HOU',20789.0,1.0,b'TX',20802.0,...,,b'M',1970.0,b'05262018',b'M',,b'RS',97554140000.0,b'7715',b'E2'
1,56.0,2016.0,12.0,245.0,245.0,b'NEW',20789.0,1.0,b'OH',20835.0,...,,b'M',1988.0,b'D/S',b'F',,b'CA',90623720000.0,b'819',b'F1'
2,67.0,2016.0,12.0,512.0,512.0,b'PEV',20789.0,2.0,b'MD',20794.0,...,,b'M',1968.0,b'06012017',b'M',b'5920',,80105030000.0,,b'B2'
3,68.0,2016.0,12.0,512.0,512.0,b'PEV',20789.0,2.0,b'FL',20792.0,...,,b'M',1970.0,b'06012017',b'F',b'5920',,80105110000.0,,b'B2'
4,69.0,2016.0,12.0,512.0,512.0,b'PEV',20789.0,2.0,b'HI',20792.0,...,,b'M',1968.0,b'06012017',b'M',b'5920',,80105110000.0,,b'B2'


In [10]:
df_immi.describe()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,arrdate,i94mode,depdate,i94bir,i94visa,count,biryear,admnum
count,3432990.0,3432990.0,3432990.0,3432021.0,3432990.0,3432990.0,3432990.0,3218202.0,3432297.0,3432990.0,3432990.0,3432297.0,3432990.0
mean,3661998.0,2016.0,12.0,345.9631,341.0846,20805.12,1.065441,20822.03,37.81565,1.949797,1.0,1978.184,21915410000.0
std,2064380.0,0.0,0.0,218.8854,216.6048,8.643925,0.4140615,37.35567,18.69041,0.3046412,0.0,18.69041,13383260000.0
min,6.0,2016.0,12.0,0.0,101.0,20789.0,1.0,-11972.0,-1.0,1.0,1.0,1907.0,0.0
25%,1873762.0,2016.0,12.0,135.0,135.0,20798.0,1.0,20807.0,24.0,2.0,1.0,1965.0,16656010000.0
50%,3745504.0,2016.0,12.0,251.0,245.0,20806.0,1.0,20819.0,37.0,2.0,1.0,1979.0,18033440000.0
75%,5414167.0,2016.0,12.0,582.0,582.0,20812.0,1.0,20826.0,51.0,2.0,1.0,1992.0,19430260000.0
max,7318723.0,2016.0,12.0,999.0,749.0,20819.0,9.0,20984.0,109.0,3.0,1.0,2017.0,99997520000.0


comments about immi

#### US Temperature

#### Cleaning Steps
Document steps necessary to clean the data

In [7]:
# Performing cleaning tasks here





Immigration Data by State with Origin

In [9]:
# 1.- Remove nulls values
# 2.- Convert i94res codes to country of origin
# 3.- Filter out NULLS
# 4.- Run country_udf function to show state names
# country_udf, abbrev_state_udf and city_code_udf were created with data from i94 SAS labels Descriptions file.
i94_sas_data=spark_sas_data.filter(spark_sas_data.i94addr.isNotNull())\
.filter(spark_sas_data.i94res.isNotNull())\
.filter(col("i94addr").isin(list(abbrev_state.keys())))\
.filter(col("i94port").isin(list(city_codes.keys())))\
.withColumn("origin_country",country_udf(spark_sas_data["i94res"]))\
.withColumn("dest_state_name",abbrev_state_udf(spark_sas_data["i94addr"]))\
.withColumn("i94yr",col("i94yr").cast("integer"))\
.withColumn("i94mon",col("i94mon").cast("integer"))\
.withColumn("city_port_name",city_code_udf(spark_sas_data["i94port"]))

new_i94_sas_data=i94_sas_data.select("cicid",col("i94yr").alias("year"),col("i94mon").alias("month"),\
                             "origin_country","i94port","city_port_name",col("i94addr").alias("state_code"),"dest_state_name")

In [10]:
new_i94_sas_data.show(5)

+-----+----+-----+--------------+-------+------------------+----------+---------------+
|cicid|year|month|origin_country|i94port|    city_port_name|state_code|dest_state_name|
+-----+----+-----+--------------+-------+------------------+----------+---------------+
| 46.0|2016|   12|         SPAIN|    HOU|HOUSTON           |        TX|          Texas|
| 56.0|2016|   12|    CHINA, PRC|    NEW|NEWARK/TETERBORO  |        OH|           Ohio|
| 67.0|2016|   12|       BAHAMAS|    PEV|PORT EVERGLADES   |        MD|       Maryland|
| 68.0|2016|   12|       BAHAMAS|    PEV|PORT EVERGLADES   |        FL|        Florida|
| 69.0|2016|   12|       BAHAMAS|    PEV|PORT EVERGLADES   |        HI|         Hawaii|
+-----+----+-----+--------------+-------+------------------+----------+---------------+
only showing top 5 rows



In [11]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()
df_spark =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [12]:
#write to parquet
df_spark.write.parquet("sas_data")
df_spark=spark.read.parquet("sas_data")

AnalysisException: 'path file:/home/workspace/sas_data already exists.;'

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [None]:
# Write code here

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [None]:
# Perform quality checks here

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.