# Project Title
### Data Engineering Capstone Project

#### Project Summary
This project's main function is to put into practice all the concepts learned in this course. For this, I94 immigration data will be integrated with US Cities demographics data and airport data.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [2]:
# Do all imports and installs here
import pandas as pd
import pyspark
from pyspark.sql.functions import col, when, count, udf, isnan, year, month, date_format
from pyspark.sql.types import StringType

### Step 1: Scope the Project and Gather Data

#### Scope 
This project will integrate I94 immigration data, airport codes and US cities demographics. For this integration, the star schema model will be used.

#### Describe and Gather Data 
Data Sets:
- I94 Immigration Data
- Airport Codes
- US Cities Demographics

AWS S3 will be used for data storage and PySpark for data processing and exploration.

- I94 Immigration Data: This data comes from the US National Tourism and Trade Office. 

- US Cities Demographics: This data comes from OpenSoft.

- Airport Codes: This is a simple table of airport codes and corresponding cities.

Immigration Data Set

In [3]:
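from pyspark.sql import SparkSession

# Spark session with the spark-sas7bdat package, which lets Spark read SAS files
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

# Read the raw April 2016 I94 file in SAS format
df_immigration = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

# Use the Parquet extract stored in "sas_data" instead (Parquet is faster to read than SAS);
# this is the DataFrame used in the rest of the notebook
df_immigration=spark.read.parquet("sas_data")
df_immigration.head()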

Row(cicid=5748517.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='CA', depdate=20582.0, i94bir=40.0, i94visa=1.0, count=1.0, dtadfile='20160430', visapost='SYD', occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear=1976.0, dtaddto='10292016', gender='F', insnum=None, airline='QF', admnum=94953870030.0, fltno='00011', visatype='B1')

Demographics Data Set

In [4]:
df_demographics=spark.read.csv('us-cities-demographics.csv', inferSchema=True, header=True, sep=';')
df_demographics.head()

Row(City='Silver Spring', State='Maryland', Median Age=33.8, Male Population=40601, Female Population=41862, Total Population=82463, Number of Veterans=1562, Foreign-born=30908, Average Household Size=2.6, State Code='MD', Race='Hispanic or Latino', Count=25924)

Airport Codes Data Set

In [5]:
df_airport = spark.read.csv('airport-codes_csv.csv', inferSchema=True, header=True)
df_airport.head()

Row(ident='00A', type='heliport', name='Total Rf Heliport', elevation_ft=11, continent='NA', iso_country='US', iso_region='US-PA', municipality='Bensalem', gps_code='00A', iata_code=None, local_code='00A', coordinates='-74.93360137939453, 40.07080078125')

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

Print Schema

In [6]:
df_immigration.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)

In [7]:
df_demographics.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)



In [8]:
df_airport.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



Missing Values

In [9]:
total_immigration = df_immigration.count()  # count once instead of once per column
df_immigration.select([(count(when(isnan(c) | col(c).isNull(), c))*100/total_immigration).alias(c) for c in df_immigration.columns]).show()

+-----+-----+------+------+------+-------+-------+--------------------+-----------------+---------------+--------------------+-------+-----+--------------------+-----------------+-----------------+--------------------+-----------------+-----------------+-----------------+--------------------+--------------------+------------------+-----------------+-----------------+------+------------------+--------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|             i94mode|          i94addr|        depdate|              i94bir|i94visa|count|            dtadfile|         visapost|            occup|             entdepa|          entdepd|          entdepu|          matflag|             biryear|             dtaddto|            gender|           insnum|          airline|admnum|             fltno|visatype|
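+-----+-----+------+------+------+-------+-------+--------------------+-----------------+---------------+--------------------+-------+-----+--------------------+-----------------+-----------------+--------------------+-----------------+-----------------+-----------------+--------------------+--------------------+------------------+-----------------+-----------------+------+------------------+--------+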

In [10]:
total_demographics = df_demographics.count()  # count once instead of once per column
df_demographics.select([(count(when(isnan(c) | col(c).isNull(), c))*100/total_demographics).alias(c) for c in df_demographics.columns]).show()

+----+-----+----------+-------------------+-------------------+----------------+-------------------+-------------------+----------------------+----------+----+-----+
|City|State|Median Age|    Male Population|  Female Population|Total Population| Number of Veterans|       Foreign-born|Average Household Size|State Code|Race|Count|
+----+-----+----------+-------------------+-------------------+----------------+-------------------+-------------------+----------------------+----------+----+-----+
| 0.0|  0.0|       0.0|0.10377032168799723|0.10377032168799723|             0.0|0.44967139398132133|0.44967139398132133|    0.5534417156693185|       0.0| 0.0|  0.0|
+----+-----+----------+-------------------+-------------------+----------------+-------------------+-------------------+----------------------+----------+----+-----+



In [11]:
total_airport = df_airport.count()  # count once instead of once per column
df_airport.select([(count(when(isnan(c) | col(c).isNull(), c))*100/total_airport).alias(c) for c in df_airport.columns]).show()

+-----+----+----+----------------+---------+-----------+----------+------------------+------------------+-----------------+-----------------+-----------+
|ident|type|name|    elevation_ft|continent|iso_country|iso_region|      municipality|          gps_code|        iata_code|       local_code|coordinates|
+-----+----+----+----------------+---------+-----------+----------+------------------+------------------+-----------------+-----------------+-----------+
|  0.0| 0.0| 0.0|12.7208352246936|      0.0|        0.0|       0.0|10.305946436677258|25.501588742623696|83.31547889241943|47.91466182478438|        0.0|
+-----+----+----+----------------+---------+-----------+----------+------------------+------------------+-----------------+-----------------+-----------+
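Each Spark expression above counts, for every column, the rows where the value is null or NaN and divides by the total row count. A minimal plain-Python sketch of the same calculation, assuming the rows are given as a list of dictionaries (the function names and the sample rows are hypothetical, not taken from the data sets):

```python
import math
from typing import Any, Dict, List


def is_missing(value: Any) -> bool:
    """True for None and for float NaN, like col(c).isNull() | isnan(c) in Spark."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def null_percentages(rows: List[Dict[str, Any]]) -> Dict[str, float]:
    """Percentage of missing values per column."""
    total = len(rows)  # computed once, like a single df.count()
    if total == 0:
        return {}
    return {
        column: sum(is_missing(row.get(column)) for row in rows) * 100 / total
        for column in rows[0]
    }


# Hypothetical sample rows shaped like the airport data
sample = [
    {"ident": "00A", "elevation_ft": 11, "iata_code": None},
    {"ident": "00AA", "elevation_ft": float("nan"), "iata_code": None},
    {"ident": "00AK", "elevation_ft": 450, "iata_code": "ABC"},
    {"ident": "00AL", "elevation_ft": 820, "iata_code": None},
]
print(null_percentages(sample))  # {'ident': 0.0, 'elevation_ft': 25.0, 'iata_code': 75.0}
```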



#### Cleaning Steps
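Duplicate rows and columns with more than 80% null values will be dropped: occup, entdepu and insnum from the immigration data and iata_code from the airport data. The SAS arrival date is also converted to a readable date, and columns are renamed to descriptive names.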
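The 80% rule can be written as a small filter over the per-column percentages. The sketch below reuses the airport percentages printed above (the columns at 0.0 are omitted); with them, only iata_code is above the threshold, which matches the column dropped from df_airport. The function name columns_to_drop is hypothetical.

```python
from typing import Dict, List


def columns_to_drop(null_pct: Dict[str, float], threshold: float = 80.0) -> List[str]:
    """Return the columns whose share of missing values is above threshold (in %)."""
    return [column for column, pct in null_pct.items() if pct > threshold]


# Percentages taken from the airport missing-value output above
airport_null_pct = {
    "elevation_ft": 12.7208352246936,
    "municipality": 10.305946436677258,
    "gps_code": 25.501588742623696,
    "iata_code": 83.31547889241943,
    "local_code": 47.91466182478438,
}
print(columns_to_drop(airport_null_pct))  # ['iata_code']
```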

In [12]:
from datetime import datetime, timedelta

# Drop duplicate rows and the mostly-null columns
df_immigration=df_immigration.dropDuplicates().drop('occup', 'entdepu', 'insnum')

# SAS stores dates as the number of days since 1960-01-01
@udf(StringType())
def convert_datetime(x):
    if x is not None:
        return (datetime(1960, 1, 1).date() + timedelta(days=x)).isoformat()
    return None

df_immigration=df_immigration.withColumn("arrdate", convert_datetime(df_immigration.arrdate))

# Rename the I94 columns to descriptive names
immigration_columns = {
    "cicid": "immigration_id",
    "i94yr": "year",
    "i94mon": "month",
    "i94cit": "country_code_1",      # country of citizenship
    "i94res": "country_code_2",      # country of residence
    "i94port": "city_code",          # port of entry
    "arrdate": "arrival_date",
    "i94mode": "transportation_type",
    "i94addr": "state_code",
    "depdate": "departure_date",
    "i94bir": "respondent_age",
    "i94visa": "visa_code",
    "dtadfile": "file_date",
    "visapost": "state_department",  # Department of State office that issued the visa
    "entdepa": "arrival_flag",
    "entdepd": "departure_flag",
    "matflag": "match_flag",
    "biryear": "birth_year",
    "dtaddto": "admitted_date",      # date to which admitted (MMDDYYYY)
    "admnum": "admission_number",
    "fltno": "flight_number",
    "visatype": "admission_class",
}
for old_name, new_name in immigration_columns.items():
    df_immigration=df_immigration.withColumnRenamed(old_name, new_name)
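SAS stores dates as a number of days since 1960-01-01, which is what convert_datetime relies on. The same arithmetic can be checked in plain Python, without Spark, using the arrdate and depdate values from the sample row shown in Step 1 (20574.0 and 20582.0). The function name sas_date_to_iso is hypothetical, and the NaN handling is an addition not present in the notebook's UDF.

```python
import math
from datetime import date, timedelta
from typing import Optional

SAS_EPOCH = date(1960, 1, 1)  # day 0 of the SAS date system


def sas_date_to_iso(days: Optional[float]) -> Optional[str]:
    """Convert a SAS date (days since 1960-01-01) to a yyyy-mm-dd string."""
    if days is None or math.isnan(days):
        return None
    # Adding a timedelta to a date ignores any fractional part of a day
    return (SAS_EPOCH + timedelta(days=days)).isoformat()


print(sas_date_to_iso(20574.0))  # 2016-04-30 (arrdate of the sample row)
print(sas_date_to_iso(20582.0))  # 2016-05-08 (depdate of the sample row)
```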

In [13]:
df_airport=df_airport.dropDuplicates().drop(*['iata_code'])

In [14]:
df_demographics=df_demographics.dropDuplicates()

# Rename the columns to snake_case names without spaces
# (Spark's Parquet writer rejects column names that contain spaces)
demographics_columns = {
    "Median Age": "median_age",
    "Male Population": "male_population",
    "Female Population": "female_population",
    "Total Population": "total_population",
    "Number of Veterans": "number_veterans",
    "Foreign-born": "foreign_born",
    "Average Household Size": "average_household_size",
    "State Code": "state_code",
}
for old_name, new_name in demographics_columns.items():
    df_demographics=df_demographics.withColumnRenamed(old_name, new_name)

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The data model used will be the star schema.

Fact Table
- Fact_Immigration\
immigration_id (PK)\
country_code_1\
country_code_2\
city_code\
state_code(FK)\
count\
arrival_date(FK)\
departure_date\
admission_number\
flight_number\
admission_class

Dimension Tables

- Dim_Demographics\
city (composite PK)\
State (composite PK)\
median_age\
male_population\
female_population\
total_population\
number_veterans\
foreign_born\
average_household_size\
state_code\
race (composite PK)\
count


- Dim_Airport\
ident(PK)\
type\
name\
elevation_ft\
continent\
iso_country\
iso_region\
municipality\
gps_code\
local_code\
coordinates


- Dim_Person\
immigration_id(PK)\
gender\
birth_year\
visa_code


- Dim_arrival_calendar\
date(PK)\
year\
month\
day\
week
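To make the relationships concrete, the sketch below creates part of the star schema in an in-memory SQLite database and joins the fact table to two dimensions. SQLite is used only for illustration (the project itself writes Parquet with Spark). The values come from the sample immigration row shown in Step 1; the week value assumes ISO-8601 week numbering. Dim_Demographics and Dim_Airport are omitted for brevity.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE dim_person (
    immigration_id REAL PRIMARY KEY,
    gender TEXT,
    birth_year REAL,
    visa_code REAL
);
CREATE TABLE dim_arrival_calendar (
    "date" TEXT PRIMARY KEY,
    year INTEGER,
    month INTEGER,
    day INTEGER,
    week INTEGER
);
CREATE TABLE fact_immigration (
    immigration_id REAL PRIMARY KEY REFERENCES dim_person (immigration_id),
    country_code_1 REAL,
    country_code_2 REAL,
    city_code TEXT,
    state_code TEXT,
    "count" REAL,
    arrival_date TEXT REFERENCES dim_arrival_calendar ("date"),
    departure_date REAL,
    admission_number REAL,
    flight_number TEXT,
    admission_class TEXT
);
""")

# Sample row from Step 1 (cicid 5748517), split across the tables
conn.execute("INSERT INTO dim_person VALUES (5748517.0, 'F', 1976.0, 1.0)")
conn.execute("INSERT INTO dim_arrival_calendar VALUES ('2016-04-30', 2016, 4, 30, 17)")
conn.execute(
    "INSERT INTO fact_immigration VALUES "
    "(5748517.0, 245.0, 438.0, 'LOS', 'CA', 1.0, '2016-04-30', 20582.0, 94953870030.0, '00011', 'B1')"
)

# A typical star-schema query: facts grouped by attributes of the dimensions
rows = conn.execute("""
SELECT p.gender, c.month, f.city_code, COUNT(*)
FROM fact_immigration AS f
JOIN dim_person AS p ON f.immigration_id = p.immigration_id
JOIN dim_arrival_calendar AS c ON f.arrival_date = c."date"
GROUP BY p.gender, c.month, f.city_code
""").fetchall()
print(rows)  # [('F', 4, 'LOS', 1)]
```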


#### 3.2 Mapping Out Data Pipelines
1- Use the cleaned immigration DataFrame to populate the immigration fact table and the arrival calendar and person dimension tables\
2- Write Parquet files for the demographics and airport dimension tables\
3- Select the data for the arrival calendar table and write it to Parquet\
4- Select the data for the person table and write it to Parquet\
5- Select the data for the immigration fact table and write it to Parquet


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

Create Dimension Table

In [15]:
# mode("overwrite") lets the cell be re-run without a "path already exists" error
df_demographics.write.mode("overwrite").parquet("dim_demographics")
df_airport.write.mode("overwrite").parquet("dim_airport")

In [None]:
from pyspark.sql.functions import dayofmonth, weekofyear

# One row per distinct arrival date; rows without an arrival date are skipped
df_calendar=df_immigration.select(col("arrival_date").alias("date")).where(col("date").isNotNull())
df_calendar=df_calendar.withColumn("year",year(col("date")))
df_calendar=df_calendar.withColumn("month",month(col("date")))
df_calendar=df_calendar.withColumn("day",dayofmonth(col("date")))
df_calendar=df_calendar.withColumn("week",weekofyear(col("date")))
df_calendar=df_calendar.dropDuplicates()
df_calendar.write.mode("overwrite").partitionBy("year","month").parquet("dim_arrival_calendar")
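The calendar dimension keeps one row per distinct arrival date with its year, month, day and week. A plain-Python sketch of that derivation, assuming ISO-8601 week numbers (the convention Spark's weekofyear follows; date_format with the "w" pattern may number weeks differently depending on Spark version and locale). The function name build_calendar and the input dates are illustrative.

```python
from datetime import date
from typing import Iterable, List, Optional, Tuple

CalendarRow = Tuple[str, int, int, int, int]  # (date, year, month, day, week)


def build_calendar(arrival_dates: Iterable[Optional[str]]) -> List[CalendarRow]:
    """One row per distinct, non-null ISO date string (yyyy-mm-dd)."""
    rows = set()  # the set removes duplicates, like dropDuplicates()
    for value in arrival_dates:
        if value is None:
            continue  # a null date cannot be the primary key
        d = date.fromisoformat(value)
        rows.add((value, d.year, d.month, d.day, d.isocalendar()[1]))
    return sorted(rows)


print(build_calendar(["2016-04-30", "2016-04-30", None, "2016-05-08"]))
# [('2016-04-30', 2016, 4, 30, 17), ('2016-05-08', 2016, 5, 8, 18)]
```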

In [None]:
df_person=df_immigration.select("immigration_id","gender","birth_year","visa_code").dropDuplicates()
df_person.write.mode("overwrite").parquet("dim_person")

Create Fact Table

In [None]:
df_immigration=df_immigration.select("immigration_id","country_code_1","country_code_2","city_code","state_code","count", "arrival_date", "departure_date", "admission_number", "flight_number", "admission_class").dropDuplicates()
df_immigration.write.mode("overwrite").partitionBy("country_code_1","arrival_date").parquet("fact_immigration")
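partitionBy writes one sub-directory per distinct combination of the partition columns, nested in the order given and named key=value (Hive style). The simplified sketch below lists the directories such a write would produce for a few hypothetical rows; it ignores Spark's escaping of special characters and its handling of null values, and the function name partition_dirs is hypothetical. Because each (country_code_1, arrival_date) pair gets its own directory, the directory count can grow up to the product of the two columns' distinct counts, which tends to produce many small files.

```python
from typing import Any, Dict, Iterable, List, Set


def partition_dirs(rows: Iterable[Dict[str, Any]], keys: List[str]) -> Set[str]:
    """Hive-style partition paths (key=value/...) produced for the given rows."""
    return {"/".join(f"{key}={row[key]}" for key in keys) for row in rows}


# Hypothetical fact rows; only the partition columns matter here
fact_rows = [
    {"country_code_1": 245.0, "arrival_date": "2016-04-30"},
    {"country_code_1": 245.0, "arrival_date": "2016-04-30"},
    {"country_code_1": 438.0, "arrival_date": "2016-04-30"},
    {"country_code_1": 245.0, "arrival_date": "2016-04-01"},
]
for path in sorted(partition_dirs(fact_rows, ["country_code_1", "arrival_date"])):
    print(path)
```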

#### 4.2 Data Quality Checks
To check data quality, each table is queried for count(*), count(distinct PK) and count(PK): count(*) > 0 confirms that the table has data, count(PK) = count(*) confirms that no key is null, and count(distinct PK) = count(PK) confirms that there are no duplicate keys.

Run Quality Checks

Create Temp Tables for checks

In [None]:
df_demographics.createOrReplaceTempView("dim_demographics")
df_airport.createOrReplaceTempView("dim_airport")
df_calendar.createOrReplaceTempView("dim_arrival_calendar")
df_person.createOrReplaceTempView("dim_person")
df_immigration.createOrReplaceTempView("fact_immigration")

In [None]:
spark.sql("""
select 
count(*)
, count(distinct City, State, Race)
from dim_demographics
""").show()

In [None]:
spark.sql("""
select count(*)
, count(distinct ident)
, count(ident)
from dim_airport
""").show()

In [None]:
spark.sql("""
select count(*)
, count(distinct date)
, count(date)
from dim_arrival_calendar
""").show()

In [None]:
spark.sql("""
select count(*)
, count(distinct immigration_id)
, count(immigration_id)
from dim_person
""").show()

In [None]:
spark.sql("""
select count(*)
, count(distinct immigration_id)
, count(immigration_id)
from fact_immigration
""").show()
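Each query above returns three numbers for one table. A small helper can turn them into pass/fail messages following the rules described in 4.2: the table must not be empty, count(PK) must equal count(*), and count(distinct PK) must equal count(PK). The helper name check_key is hypothetical; in the notebook the three values could be read with spark.sql(...).first() instead of .show().

```python
from typing import List


def check_key(table: str, total: int, distinct_keys: int, non_null_keys: int) -> List[str]:
    """Return a list of data-quality problems; an empty list means the checks passed."""
    problems = []
    if total == 0:
        problems.append(f"{table}: table is empty")
    if non_null_keys != total:
        problems.append(f"{table}: {total - non_null_keys} row(s) with a null key")
    if distinct_keys != non_null_keys:
        problems.append(f"{table}: {non_null_keys - distinct_keys} duplicate key value(s)")
    return problems


# Hypothetical results: a clean table, and a table with one null and one duplicate key
print(check_key("dim_person", 1000, 1000, 1000))  # []
print(check_key("fact_immigration", 10, 8, 9))
```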

#### 4.3 Data dictionary 
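Fact_Immigration (from the I94 data)
- immigration_id (cicid): unique record identifier
- country_code_1 (i94cit): code of the country of citizenship
- country_code_2 (i94res): code of the country of residence
- city_code (i94port): code of the port of entry
- state_code (i94addr): US state code
- count: used for summary statistics
- arrival_date (arrdate): arrival date, converted from a SAS date to yyyy-MM-dd
- departure_date (depdate): departure date as a SAS date (days since 1960-01-01)
- admission_number (admnum): admission number
- flight_number (fltno): flight number
- admission_class (visatype): class of admission (visa type, e.g. B1)

Dim_Person
- immigration_id: links to Fact_Immigration
- gender: gender of the traveler
- birth_year (biryear): year of birth
- visa_code (i94visa): 1 = Business, 2 = Pleasure, 3 = Student

Dim_Arrival_Calendar
- date: arrival date (yyyy-MM-dd)
- year, month, day, week: parts of the arrival date

Dim_Demographics and Dim_Airport keep the columns of us-cities-demographics.csv (renamed to snake_case) and airport-codes_csv.csv (without iata_code).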

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.

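Spark was chosen because the I94 data is large (millions of rows per month) and historical: Spark processes data in parallel, scales by adding nodes, and, with the spark-sas7bdat package, can read the raw SAS files directly. The output tables are written as Parquet, a columnar format that is compact and fast to query.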

* Propose how often the data should be updated and why.

How often the pipeline runs should follow how often the source data is published and how often the target tables need to be refreshed. The I94 data is provided as one file per month, so a monthly run is suggested, scheduled on the day the new month's data becomes available and at least one day before the data is reported, so that any failure can be fixed in time.

* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.


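Spark already scales horizontally, so the pipeline code would not need to change. With 100x the data, the job would either take longer, requiring more processing time before the data is reported, or need a larger cluster (more worker nodes) to keep the same run time.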

 * The data populates a dashboard that must be updated on a daily basis by 7am every day.

In this case, we would need to know at what time the source data is updated each day and schedule the pipeline to run right after that, early enough before 7am to leave time to fix possible errors.

 * The database needed to be accessed by 100+ people.

In this case, the tables should be loaded into a data warehouse (for example, Amazon Redshift) or served from a data lake on S3, so that many users can query the star schema concurrently.