# Udacity Data Engineer Nanodegree - Capstone Project
### Data Engineering Capstone Project

#### Project Summary

* The project is to build an ETL pipeline that extracts data from I94 Immigration Data set and World Temperature Data set, processes them using Spark, and loads data into data lake as a set of dimensional tables. The I94 Immigration Data comes from the US National Tourism and Trade Office Source. The World Temperature Dat dataset comes from Kaggle Source. 


* The I94 Immigration Data data lake will allow analysts to find insightes in how many, and what type of visitors are coming to the US. 

* The World Temperature Data lake will allow anlaysts to compare the temporary change month by month, or year by year acrooss multiple major cities. 

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [17]:
# Do all imports and installs here
import pandas as pd
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import desc
from pyspark.sql.functions import asc
from pyspark.sql.functions import sum as Fsum
import datetime
from  pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import *

### Step 1: Scope the Project and Gather Data

#### Scope 


##### I94 Immigration Data (Data Source is in SAS format) 
* This data comes from the US National Tourism and Trade Office. Data contains international visitor arrival statistics by world regions and select countries (including top 20), type of visa, mode of transportation, age groups, states visited, and the top ports of entry. 
* The data set contains 3,096,313 Rows

##### World Temperature Data (Data sour is in CSV format) 
* This dataset is from Kaggle and contains monthly average temperature data at different country in the world wide.
* https://www.kaggle.com/datasets/berkeleyearth/climate-change-earth-surface-temperature-data
* The data set contains 8,599,212 Rows

#### Tools
* Python
* Pandas - exploratory data analysis on small data set
* PySpark - data processing on large data set


### Step 2: Explore and Assess the Data
#### Explore the Data 
1. Use pandas for exploratory data analysis to get an overview on these data sets
2. Split data sets to dimensional tables and change column names for better understanding
3. Utilize PySpark on one of the SAS data sets to test ETL data pipeline logic



#### 1. Explore I94 Immigration Data

In [2]:
# Read in the data here
df_immi = pd.read_csv("immigration_data_sample.csv")
df_immi.head(5)

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [3]:
df_City = pd.read_csv("Port_City.csv")
df_City.head(5)

Unnamed: 0,ALC,ALCAN,AK
0,ANC,ANCHORAGE,AK
1,BAR,BAKER AAF - BAKER ISLAND,AK
2,DAC,DALTONS CACHE,AK
3,PIZ,DEW STATION PT LAY DEW,AK
4,DTH,DUTCH HARBOR,AK


#### 2. Explore temperature data set

In [4]:
# Read in the data here
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temp = pd.read_csv(fname)
df_temp.head(5)
#df_temp.count()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


#### Cleaning Steps
Document steps necessary to clean the data
# Performing cleaning tasks





Data Cleaning is combined in the modeling steps. Below are the cleaniing steps that will be performed. 
* SAS format need to transfer to standard date format.
* Since the Port City is Upper Case in the source table, we are going to convert the City to Uppder case in the dimentional table and Fact Table
* Use Distinct in sql to remove duplicate in the dimension table.
* Remove Null from the temperature data soure. 

In [18]:
#Connect to Data Source

#Read SAS file into df_immigration
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()
df_immigration = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

#Create the view to run SQL quries
df_immigration.createOrReplaceTempView("df_spark")


In [6]:
#Connect to Data Source
#Read CSV file into  df_temp
df_temp=spark.read.format("csv").option("header","true").option("inferSchema","true").csv( '../../data2/GlobalLandTemperaturesByCity.csv')
#Create the view to run SQL quries
df_temp.createOrReplaceTempView("df_temp_tb")

In [7]:
#Connect to Data Source
#Read CSV file into df_City
Dim_PortCity = StructType([
    StructField("PortCode",StringType(),True),
    StructField("City",StringType(),True),
    StructField("States",StringType(),True),
])
df_City=spark.read.format("csv").schema(Dim_PortCity).csv("Port_City.csv")
df_City.createOrReplaceTempView("df_City")


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

Since the purpose of this data warehouse is for OLAP and BI app usage, we will model these data sets with star schema data modeling.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

* Data Model.jpg in the project







In [8]:
#Define the data model 

Fact_immigration_Schema = StructType([
    StructField("CICID",FloatType(),True),
    StructField("ArrivalDate",DateType(),True),
    StructField("I94DepartureDate",DateType(),True),
    StructField("i94modeID",StringType(),True),
    StructField("ArrivalFlag",StringType(),True),
    StructField("DepartureFlag",StringType(),True),
    StructField("UpdateFlag",StringType(),True),
    StructField("I94ArrivalMode",StringType(),True),
    StructField("I94Port",StringType(),True),
    StructField("I94Month",StringType(),True),
    StructField("I94Year",StringType(),True)])

Dim_Admission_Schema = StructType([
    StructField("CICID",FloatType(),True),
    StructField("AddmissionNumber",FloatType(),True),
    StructField("BirthYear",FloatType(),True),
    StructField("Gender",StringType(),True),
    StructField("PersonOccupation",StringType(),True),
    StructField("PersonAddress",StringType(),True),
    StructField("PersonOriginCity",StringType(),True),
    StructField("FlightNumber",StringType(),True),
    StructField("VisaType",StringType(),True),
    StructField("VisaIssuePlace",StringType(),True)])

Dim_Date_Schema = StructType([
    StructField("DateTime",DateType(),True),
    StructField("Year",IntegerType(),True),
    StructField("Month",IntegerType(),True),
    StructField("Day",IntegerType(),True)])

Dim_PortTemperature_Schema = StructType([
    StructField("PortCode",StringType(),True),
    StructField("City",StringType(),True),
    StructField("State",StringType(),True),
    StructField("Country",StringType(),True),
    StructField("AverageTemperature",FloatType(),True)])


In [9]:
# Conver the arrdate and depdate from SAS format to date format
# Translate the i94mode to readable format
Fact_immigration=Fact_immigration=spark.sql("""
SELECT distinct 
    cicid,
    date_add('1960-01-01', arrdate),
    date_add('1960-01-01', depdate),
    CASE 
    WHEN i94visa=1 THEN "Air"
    WHEN i94visa=2 THEN "Sea"
    WHEN i94visa=3 THEN "Land"
    ELSE "Not reported" END as i94mode,
    entdepa,
    entdepd,
    entdepd,
    i94mode,
    i94port,
    i94mon,
    i94yr
FROM df_spark 
where i94mode is not null
""")

In [10]:
# Translate the i94visa type to readable format
Dim_Admission=Dim_Admission=spark.sql("""
SELECT distinct 
    cicid,
    admnum,
    biryear,
    gender,
    occup,
    i94addr,
    i94cit,
    fltno,
CASE 
    WHEN i94visa=1 THEN "Business"
    WHEN i94visa=2 THEN "Pleasure"
    WHEN i94visa=3 THEN "Student"
    ELSE "Other" END as i94visa,
    visapost

FROM df_spark where i94mode is not null
""")

In [11]:
Dim_Date=Dim_Date=spark.sql("""
(SELECT distinct 
    date_add('1960-01-01', arrdate),
    Year(date_add('1960-01-01', arrdate)),
    Month(date_add('1960-01-01', arrdate)),
    Day(date_add('1960-01-01', arrdate))
FROM df_spark
where arrdate is not null)
UNION
(SELECT distinct 
    date_add('1960-01-01', depdate),
    Year(date_add('1960-01-01', depdate)),
    Month(date_add('1960-01-01', depdate)),
    Day(date_add('1960-01-01', depdate))
FROM df_spark
where arrdate is not null)

""")

In [12]:
Dim_PortTemperature=spark.sql("""
SELECT distinct 
df_City.PortCode,
upper(df_temp_tb.City), 
df_City.States,
df_temp_tb.Country,
avg(AverageTemperature) as AverageTemperature

FROM df_temp_tb 

join df_City
on upper(df_temp_tb.City)=upper(df_City.City)

where df_temp_tb.Country='United States' and AverageTemperature is not null
Group by 
df_City.PortCode,
df_temp_tb.City, 
df_City.States,
df_temp_tb.Country
""")

In [13]:

def readDataUsingSchema(df,schema):
    df=spark.createDataFrame(df.rdd, schema=schema)
    return df
    
Fact_immigration=readDataUsingSchema(Fact_immigration,Fact_immigration_Schema)
Dim_Admission=readDataUsingSchema(Dim_Admission,Dim_Admission_Schema)
Dim_Date=readDataUsingSchema(Dim_Date,Dim_Date_Schema)
Dim_PortTemperature=readDataUsingSchema(Dim_PortTemperature,Dim_PortTemperature_Schema)

#Fact_immigration = spark.createDataFrame(Fact_immigration.rdd, schema=Fact_immigration_Schema)
#Dim_Admission = spark.createDataFrame(Dim_Admission.rdd, schema=Dim_Admission_Schema)
#Dim_Date = spark.createDataFrame(Dim_Date.rdd, schema=Dim_Date_Schema)
#Dim_PortTemperature = spark.createDataFrame(Dim_PortTemperature.rdd, schema=Dim_PortTemperature_Schema)

Write data to Parquest

In [None]:
def loadToParquest(df,outpath):
    try:
        df.write.mode("overwrite").parquet("data/"+outpath)
    except:
        raise ValueError("Error exporting the data")
    print(" dataframe is loaded to" +outpath)

loadToParquest(Fact_immigration,"Fact_immigration")


In [None]:
loadToParquest(Dim_Date,"Dim_Date") 
loadToParquest(Dim_Admission,"Dim_Admission")
loadToParquest(Dim_PortTemperature,"Dim_PortTemperature")

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

##### 4.2.1 Data Quality Checks - Unit Tests for the script to ensure they are doing the right thing. 

In [53]:
def readFromParquet(table):
    df=spark.read.parquet("data/"+table)
    ##df.createOrReplaceTempView(table)
    return df

Fact_immigration=readFromParquet("Fact_immigration")
Dim_Date=readFromParquet("Dim_Date")
Dim_Admission=readFromParquet("Dim_Admission")
Dim_PortTemperature=readFromParquet("Dim_PortTemperature")

##### 4.2.1 Integrity constraints quality check

In [67]:
# Integrity constraints quality check
#Expected result: duplicate primary key does not exist 

def IntegrityConstraintsCheck(table,PK):
    result=spark.sql(f"""
    SELECT {PK} , count( {PK} )
       from {table}
       group by  {PK} 
       having  count( {PK} )>1
        """) 
    if result.count() == 0:
        print(f"Table {table} passed.")
    else:
        raise ValueError("Integrity ConstraintsCheck failed! Duplicated primary Key exist")
            
        
IntegrityConstraintsCheck( "Fact_immigration", "CICID")
IntegrityConstraintsCheck( "Dim_Admission", "CICID")
IntegrityConstraintsCheck( "Dim_Date", "DateTime")
IntegrityConstraintsCheck( "Dim_PortTemperature", "PortCode")

Table Fact_immigration passed.
Table Dim_Admission passed.
Table Dim_Date passed.
Table Dim_PortTemperature passed.


##### 4.2.2 Data Quality Checks No empty table

In [19]:
# Perform quality checks here

def row_count(table):
        try:
            df_spark=spark.read.parquet("data/"+table)
            RowCount = df_spark.count()
        except:
            RowCount=0
        
        
        if RowCount <= 0:
            raise ValueError("No data in the table!")
        else:
            print("Table: " + table + " has total " + str(RowCount) +" records.") 
    
row_count("Fact_immigration") 
row_count("Dim_Date") 
row_count("Dim_Admission") 
row_count("Dim_PortTemperature") 

Table: Fact_immigration has total 3096074 records.
Table: Dim_Date has total 236 records.
Table: Dim_Admission has total 3096074 records.
Table: Dim_PortTemperature has total 96 records.


#### 4.3 Data dictionary 
* See the Data dictionary in the project folder.
* Below is the is the ecidence tha the ETL has processed the result into the final data model

In [20]:
#Demo: Does visitor to United States like to travel to warmer or cooler cities? 

popular_cities=spark.sql("""
SELECT 
I94Port,
City,
State,
Country,
AverageTemperature,
Count(distinct Fact_immigration.CICID)
FROM Fact_immigration
left join Dim_PortTemperature
on Fact_immigration.I94Port= Dim_PortTemperature.PortCode

Group by 
I94Port,
City,
State,
Country,
AverageTemperature

Order by Count(distinct Fact_immigration.CICID) desc
""")
popular_cities.show(5)


+-------+-------------+-----+-------------+------------------+---------------------+
|I94Port|         City|State|      Country|AverageTemperature|count(DISTINCT CICID)|
+-------+-------------+-----+-------------+------------------+---------------------+
|    NYC|     NEW YORK|   NY|United States|          9.523295|               485913|
|    MIA|        MIAMI|   FL|United States|         23.068924|               343939|
|    LOS|  LOS ANGELES|   CA|United States|         15.878038|               310162|
|    SFR|SAN FRANCISCO|   CA|United States|         14.447988|               152583|
|    ORL|      ORLANDO|   FL|United States|         22.302603|               149194|
+-------+-------------+-----+-------------+------------------+---------------------+
only showing top 5 rows



In [21]:
# Demo the Fact_immigration table 
Fact_immigration_Out=spark.sql("""
SELECT *
FROM Fact_immigration
""")
Fact_immigration_Out.show(5)

+-----+-----------+----------------+---------+-----------+-------------+----------+--------------+-------+--------+-------+
|CICID|ArrivalDate|I94DepartureDate|i94modeID|ArrivalFlag|DepartureFlag|UpdateFlag|I94ArrivalMode|I94Port|I94Month|I94Year|
+-----+-----------+----------------+---------+-----------+-------------+----------+--------------+-------+--------+-------+
| 75.0| 2016-04-01|      2016-04-14|      Sea|          O|            I|         I|           1.0|    ATL|     4.0| 2016.0|
|174.0| 2016-04-01|            null|      Sea|          G|         null|      null|           1.0|    WAS|     4.0| 2016.0|
|267.0| 2016-04-01|      2016-04-15|      Sea|          G|            O|         O|           1.0|    NYC|     4.0| 2016.0|
|609.0| 2016-04-01|      2016-08-21|     Land|          G|            O|         O|           1.0|    BOS|     4.0| 2016.0|
|952.0| 2016-04-01|      2016-04-05|      Sea|          G|            O|         O|           1.0|    NEW|     4.0| 2016.0|
+-----+-

In [22]:
#Demo the Dim_Date table
Dim_Date_Out=spark.sql("""
SELECT *
FROM Dim_Date
""")
Dim_Date_Out.show(5)

+----------+----+-----+---+
|  DateTime|Year|Month|Day|
+----------+----+-----+---+
|2016-05-04|2016|    5|  4|
|2016-05-26|2016|    5| 26|
|2016-05-19|2016|    5| 19|
|2016-08-06|2016|    8|  6|
|2016-04-06|2016|    4|  6|
+----------+----+-----+---+
only showing top 5 rows



In [23]:
#Demo the Dim_Admission table
Dim_Admission_Out=spark.sql("""
SELECT *
FROM Dim_Admission
""")
Dim_Admission_Out.show(5)

+------+----------------+---------+------+----------------+-------------+----------------+------------+--------+--------------+
| CICID|AddmissionNumber|BirthYear|Gender|PersonOccupation|PersonAddress|PersonOriginCity|FlightNumber|VisaType|VisaIssuePlace|
+------+----------------+---------+------+----------------+-------------+----------------+------------+--------+--------------+
| 267.0|    5.5457133E10|   1957.0|     F|            null|           NY|           103.0|       00404|Pleasure|          null|
| 675.0|      5.54392E10|   1984.0|     F|            null|           CA|           103.0|       00066|Pleasure|          null|
|1371.0|    5.5420494E10|   1986.0|     M|            null|           NY|           104.0|       01401|Pleasure|          null|
|1548.0|     6.6757818E8|   1949.0|     F|            null|           NY|           104.0|           6|Pleasure|          null|
|1766.0|    5.5422444E10|   1995.0|     F|            null|           OR|           104.0|       00179|P

In [24]:
Dim_Temperature_Out=spark.sql("""
SELECT *
FROM Dim_PortTemperature
""")
Dim_Temperature_Out.show(5)

+--------+----------------+-----+-------------+------------------+
|PortCode|            City|State|      Country|AverageTemperature|
+--------+----------------+-----+-------------+------------------+
|     COS|COLORADO SPRINGS|   CO|United States|          8.777836|
|     SLC|  SALT LAKE CITY|   UT|United States|         10.177263|
|     FAY|    FAYETTEVILLE|   NC|United States|         16.415527|
|     PHI|    PHILADELPHIA|   PA|United States|         11.855868|
|     CRP|  CORPUS CHRISTI|   TX|United States|         21.533556|
+--------+----------------+-----+-------------+------------------+
only showing top 5 rows



#### Step 5: Complete Project Write Up


* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

##### Choice of tools and technologies for the project.
* Used Pandas to explore and assess the sample data. As the sample file is small and pandas data format is easy to read. 
* Used PySpark to process the large files and move the data to parquet in data lake. 

#####  Propose how often the data should be updated and why.

I94 Immigration Data 
World Temperature Data
* Depends on the frequence where source data is updated, the data lake can be updated accordeling. 


##### Future Design Considerations
* If the data was increased by 100x, I would process the data load in smaller batch. Also if buget allows, I would add more note to spark servers.


 * If data populates a dashboard that must be updated on a daily basis by 7am every day,  I would use Airflow to schedule the load. 

* If the database needed to be accessed by 100+ people, based on the users needs I will denomalized the data to so that users will not be to query multiple tables to reduce traffic. And if cost allow, move the data to AWS Redshipt database. 
