# Project Title
### Data Engineering Capstone Project

#### Project Summary
*--describe your project at a high level--*

This project uses datasets on US Immigration, Demographics and Global Temperatures. The goal is to combine the skills learned throughout the Udacity Data Engineering Nanodegree to prepare an ETL pipeline and a Data Model to be used for analytics.
We do this by gathering and exploring the source data in order to determine a Data Model that suits our purposes. Then we build a data pipeline that extracts the source information, transforms it through various clean-up steps, then loads it into the final tables of our defined data model.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [83]:
# Do all imports and installs here
import pandas as pd
import os
import configparser

import datetime as dt

from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import isnan, when, count, avg, col, udf, dayofmonth, dayofweek, month, year, weekofyear, monotonically_increasing_id, desc
from pyspark.sql.types import *

In [2]:
config = configparser.ConfigParser()
config.read('credentials.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']
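
The cell above expects a `credentials.cfg` file with an `[AWS]` section holding both keys. As a minimal sketch of that layout, the snippet below parses the same structure from an in-memory string. The placeholder values and the helper name `load_aws_credentials` are assumptions made for illustration; real keys should stay out of version control.

```python
import configparser

# Hypothetical contents of credentials.cfg; the values are placeholders.
SAMPLE_CFG = """
[AWS]
AWS_ACCESS_KEY_ID = YOUR_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY = YOUR_SECRET_ACCESS_KEY
"""


def load_aws_credentials(cfg_text):
    """Parse the [AWS] section and return both credentials as a dict."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    return {
        "AWS_ACCESS_KEY_ID": parser["AWS"]["AWS_ACCESS_KEY_ID"],
        "AWS_SECRET_ACCESS_KEY": parser["AWS"]["AWS_SECRET_ACCESS_KEY"],
    }


creds = load_aws_credentials(SAMPLE_CFG)
print(sorted(creds))
```

A missing section or key raises `KeyError`, which surfaces a misconfigured file before any Spark job starts.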

In [3]:
# spark = SparkSession.builder.config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11").enableHiveSupport().getOrCreate()

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = "/opt/conda/bin:/opt/spark-2.4.3-bin-hadoop2.7/bin:/opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin"
os.environ["SPARK_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
os.environ["HADOOP_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"

spark = SparkSession.builder.getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 
*Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use? etc.*

In this project we use the datasets on US Immigration, Demographics and Global Temperatures as source files. We develop a data pipeline that prepares them for loading into a data lake, stored as Parquet files on AWS S3, using a Data Model that we define along the way.

With this scope in hand, the main phases of the development will be the following:
1. **Load** the datasets and do some **Exploratory Data Analysis**.
    
    This does two things. First, it lets us understand the data, which builds our intuition for what the final Data Model will be. Second, it starts revealing the necessary steps in our Data Pipeline (e.g., how to load each dataset, what clean-up steps are needed, what flaws we find, and what data quality checks might be needed).


2. Perform **clean-up steps** on each of the datasets.
    
    With the knowledge gathered in the previous phase, we implement the cleaning steps and check each dataset further for potential quality issues. This will be the basis for some of the functions used in our Data Pipeline.
    

3. **Define our Data Model**.
    
    In this phase, we think more closely about what the final Data Model will look like: what its goal is, what our fact and dimension tables are, and which schema we use.
    As shown below, a slightly modified variant of the classic star schema best suits our need for a simple, easy-to-understand model that also allows efficient analytical queries.
    

4. Build the **Data Pipeline** and perform **Data Quality Checks**.
    
    In this phase, we use the knowledge and steps identified earlier in order to build functions that will extract data from the source files, transform it by various clean-up steps, then load it into our fact and dimension tables previously defined.
    Finally, we perform some simple data quality checks in order to ensure that our data pipeline ran as expected.

To this end, the technologies we used are Python, Spark, AWS and Jupyter Notebooks.

#### Describe and Gather Data 
*Describe the data sets you're using. Where did it come from? What type of information is included?*

##### 1. I94 Immigration Data

This data comes from the US National Tourism and Trade Office ([source](https://travel.trade.gov/research/reports/i94/historical/2016.html)). A data dictionary is included in the workspace, along with a sample file in CSV format for inspecting the data before reading it all in. You do not have to use the entire dataset; use only what you need to accomplish the goal set at the beginning of the project.

Data Dictionary:

* **cicid**: Unique record ID
* **i94yr**: 4 digit year
* **i94mon**: Numeric month
* **i94cit**: 3 digit code for immigrant country of birth
* **i94res**: 3 digit code for immigrant country of residence
* **i94port**: Port of admission
* **arrdate**: Arrival Date in the USA
* **i94mode**: Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported)
* **i94addr**: USA State of arrival
* **depdate**: Departure Date from the USA
* **i94bir**: Age of Respondent in Years
* **i94visa**: Visa codes collapsed into three categories
* **count**: Field used for summary statistics
* **dtadfile**: Character Date Field - Date added to I-94 Files
* **visapost**: Department of State post where the Visa was issued
* **occup**: Occupation that will be performed in U.S
* **entdepa**: Arrival Flag - admitted or paroled into the U.S.
* **entdepd**: Departure Flag - Departed, lost I-94 or is deceased
* **entdepu**: Update Flag - Either apprehended, overstayed, adjusted to perm residence
* **matflag**: Match flag - Match of arrival and departure records
* **biryear**: 4 digit year of birth
* **dtaddto**: Character Date Field - Date to which admitted to U.S. (allowed to stay until)
* **gender**: Non-immigrant sex
* **insnum**: INS number
* **airline**: Airline used to arrive in U.S.
* **admnum**: Admission Number
* **fltno**: Flight number of Airline used to arrive in U.S.
* **visatype**: Class of admission legally admitting the non-immigrant to temporarily stay in U.S.

In [4]:
# Read in the data here
i94_data = spark.read.load('./sas_data')

In [5]:
i94_data.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [7]:
i94_data.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,5748517.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,CA,20582.0,...,,M,1976.0,10292016,F,,QF,94953870000.0,11,B1
1,5748518.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,NV,20591.0,...,,M,1984.0,10292016,F,,VA,94955620000.0,7,B1
2,5748519.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20582.0,...,,M,1987.0,10292016,M,,DL,94956410000.0,40,B1
3,5748520.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,...,,M,1987.0,10292016,F,,DL,94956450000.0,40,B1
4,5748521.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,...,,M,1988.0,10292016,M,,DL,94956390000.0,40,B1


In [10]:
# df.head()

In [7]:
# from pyspark.sql import SparkSession

# spark = SparkSession.builder.\
# config("spark.jars.repositories", "https://repos.spark-packages.org/").\
# config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
# enableHiveSupport().getOrCreate()

# df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [8]:
# #write to parquet
# df_spark.write.parquet("sas_data")
# df_spark=spark.read.parquet("sas_data")

##### 2. Temperature Data 

This dataset came from Kaggle. From their dataset description:
*We have repackaged the data from a newer compilation put together by the [Berkeley Earth](http://berkeleyearth.org/about/), which is affiliated with Lawrence Berkeley National Laboratory. The Berkeley Earth Surface Temperature Study combines 1.6 billion temperature reports from 16 pre-existing archives. It is nicely packaged and allows for slicing into interesting subsets (for example by country). They publish the source data and the code for the transformations they applied. They also use methods that allow weather observations from shorter time series to be included, meaning fewer observations need to be thrown away.*

The raw data comes from the [Berkeley Earth data page](http://berkeleyearth.org/data/).

Data Dictionary:

* **dt**: Date
* **AverageTemperature**: Average land temperature in degrees Celsius
* **AverageTemperatureUncertainty**: 95% confidence interval around the average
* **City**: Name of City
* **Country**: Name of Country
* **Latitude**: City Latitude
* **Longitude**: City Longitude

In [6]:
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
temp_df = spark.read.csv(fname, header=True, inferSchema=True)

In [7]:
temp_df.printSchema()

root
 |-- dt: timestamp (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [14]:
temp_df.limit(5).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


##### 3. U.S. City Demographic Data

This data comes from OpenDataSoft.

This dataset contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000.

This data comes from the US Census Bureau's 2015 American Community Survey.

Reference: https://www.census.gov/data/developers/about/terms-of-service.html

Data Dictionary:

* **City**: City Name
* **State**: US State where city is located
* **Median Age**: Median age of the population
* **Male Population**: Count of male population
* **Female Population**: Count of female population
* **Total Population**: Count of total population
* **Number of Veterans**: Count of total Veterans
* **Foreign-born**: Count of residents of the city who were born outside the United States
* **Average Household Size**: Average city household size
* **State Code**: Code of the US state
* **Race**: Respondent race
* **Count**: Count of the city's residents per race

In [8]:
fname = 'us-cities-demographics.csv'
demog_df = spark.read.csv(fname, header=True, inferSchema=True, sep=';')

In [9]:
demog_df.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)



In [6]:
demog_df.limit(5).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


### Step 2: Explore and Assess the Data
#### Explore the Data 
*Identify data quality issues, like missing values, duplicate data, etc.*

#### Cleaning Steps
*Document steps necessary to clean the data:*
* drop columns with over 70% missing values;
* drop rows with missing data in the columns that uniquely identify a row in each dataset;
* drop duplicates.
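
The first cleaning step can be illustrated without Spark. The sketch below is a plain-Python stand-in for the DataFrame logic used later in this notebook: it computes the share of missing values per column and lists the columns above the 70% threshold. The function names and the four sample rows are made up for illustration.

```python
def null_ratios(rows, columns):
    """Return {column: share of rows whose value is None}."""
    total = len(rows)
    if total == 0:
        return {c: 0.0 for c in columns}
    return {c: sum(1 for r in rows if r.get(c) is None) / total for c in columns}


def columns_to_drop(rows, columns, thresh=0.7):
    """List the columns whose share of missing values exceeds thresh."""
    ratios = null_ratios(rows, columns)
    return [c for c in columns if ratios[c] > thresh]


# Illustrative rows only; they are not taken from the real dataset.
rows = [
    {"cicid": 1.0, "occup": None, "gender": "F"},
    {"cicid": 2.0, "occup": None, "gender": None},
    {"cicid": 3.0, "occup": None, "gender": "M"},
    {"cicid": 4.0, "occup": "STU", "gender": "M"},
]
columns = ["cicid", "occup", "gender"]

print(null_ratios(rows, columns))
print(columns_to_drop(rows, columns))
```

Here `occup` is missing in 3 of 4 rows (0.75), so it is the only column above the threshold.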

##### 1. I94 Immigration Data

In [11]:
i94_data.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [11]:
# Performing cleaning tasks here

In [7]:
total_row_count = i94_data.count()

In [8]:
col_nulls_df = i94_data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in i94_data.columns]).toPandas()

In [9]:
col_nulls_df = pd.melt(col_nulls_df, var_name='Column Name', value_name='Null Count')

In [10]:
col_nulls_df['Ratio to total'] = (col_nulls_df['Null Count']/total_row_count).round(3)

In [11]:
col_nulls_df.loc[col_nulls_df['Ratio to total'] > 0.7]

Unnamed: 0,Column Name,Null Count,Ratio to total
15,occup,3088187,0.997
18,entdepu,3095921,1.0
23,insnum,2982605,0.963


In [12]:
cols = ['occup', 'entdepu', 'insnum']

i94_data = i94_data.drop(*cols)

In [13]:
i94_data = i94_data.dropDuplicates(['cicid'])

In [14]:
i94_data = i94_data.dropna(how='all', subset=['cicid'])

##### 2. Temperature Data

In [12]:
temp_df.printSchema()

root
 |-- dt: timestamp (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [17]:
temp_df = temp_df.dropna(how='all', subset=['AverageTemperature'])

In [18]:
temp_df = temp_df.dropDuplicates(['dt', 'City', 'Country'])

##### 3. U.S. City Demographic Data

In [19]:
demog_df.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)



In [20]:
demog_df = demog_df.dropna(how='all', subset=['City', 'Male Population', 'Female Population', 'Foreign-born', 'Average Household Size'])

In [21]:
demog_df = demog_df.dropDuplicates(['City', 'Male Population', 'Female Population', 'Foreign-born', 'Average Household Size'])

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
*Map out the conceptual data model and explain why you chose that model*

To optimize for simple, easy-to-understand and efficient analytical queries, we chose a star schema. We also applied a small change to the classic star schema so that the Temperatures data is included through a relationship to the US Cities.

![Alt text](./Udacity-DEND-Capstone-ERD.svg)
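
To illustrate the kind of query a star schema keeps simple, the sketch below joins the fact table to the calendar dimension and counts arrivals per state and month. It is only an approximation of the model: SQLite stands in for Spark SQL over the Parquet files, only a few columns are included, and joining on `arrdate` is an assumption. The five fact rows reuse the sample records shown elsewhere in this notebook.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dim_calendar (arrdate TEXT PRIMARY KEY, year INTEGER, month INTEGER);
    CREATE TABLE fact_i94_immigration (
        cicid INTEGER PRIMARY KEY, arrdate TEXT, i94addr TEXT, visatype TEXT
    );
    """
)
conn.execute("INSERT INTO dim_calendar VALUES ('2016-04-30', 2016, 4)")
conn.executemany(
    "INSERT INTO fact_i94_immigration VALUES (?, ?, ?, ?)",
    [
        (5748517, "2016-04-30", "CA", "B1"),
        (5748518, "2016-04-30", "NV", "B1"),
        (5748519, "2016-04-30", "WA", "B1"),
        (5748520, "2016-04-30", "WA", "B1"),
        (5748521, "2016-04-30", "WA", "B1"),
    ],
)

# One join from the fact table to a dimension, then an aggregate.
query = """
    SELECT f.i94addr, c.year, c.month, COUNT(*) AS arrivals
    FROM fact_i94_immigration AS f
    JOIN dim_calendar AS c ON c.arrdate = f.arrdate
    GROUP BY f.i94addr, c.year, c.month
    ORDER BY f.i94addr
"""
arrivals = conn.execute(query).fetchall()
print(arrivals)
```

Each analytical question needs at most one join per dimension, which is the main reason for preferring this layout over a more normalized one.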

#### 3.2 Mapping Out Data Pipelines
*List the steps necessary to pipeline the data into the chosen data model*

The pipeline has the following steps:
1. Extract the data from the CSV or Parquet source files.
2. Transform (clean) the data by dropping duplicates, rows with nulls and, optionally, columns that are mostly or entirely null.
3. Create the 4 dimension tables from the I94 Immigration, USA Demographics & Temperatures datasets by selecting the required columns, adjusting names and data types, and extracting other information (e.g., date/time parts).
4. Create the i94 immigration fact table.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

##### Create functions needed in processing the source data

In [18]:
# Write code here

In [10]:
def create_spark_session():
    """
    Creates Spark Session to be used for data processing.
    """
    
#     spark = SparkSession.builder.config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11").enableHiveSupport().getOrCreate()

    os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
    os.environ["PATH"] = "/opt/conda/bin:/opt/spark-2.4.3-bin-hadoop2.7/bin:/opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin"
    os.environ["SPARK_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
    os.environ["HADOOP_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"

    spark = SparkSession.builder.getOrCreate()
    
    return spark

In [11]:
spark = create_spark_session()

In [12]:
def clean_dataset(df, subset=None, drop_cols=False, thresh=0.7):
    """
    Cleans-up dataset by dropping duplicates and NaN's/Nulls.
    Optionally, it drops columns with high % of nulls.
    Input:
        df: Spark DataFrame
        subset: default None. List of column names to take into account.
        drop_cols: default False. Whether to drop columns with high % of Nulls.
        thresh: default 0.7. Threshold for deciding % of nulls of column to drop.
    """
    
    df = df.dropna(how='all', subset=subset)
    df = df.dropDuplicates(subset)
    
    if drop_cols:
        total_row_count = df.count()
        col_nulls_df = df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).toPandas()
        col_nulls_df = pd.melt(col_nulls_df, var_name='Column Name', value_name='Null Count')
        col_nulls_df['Ratio to total'] = (col_nulls_df['Null Count']/total_row_count).round(3)
        # select the names of the columns whose null ratio exceeds the threshold
        cols = col_nulls_df.loc[col_nulls_df['Ratio to total'] > thresh, 'Column Name'].tolist()
        df = df.drop(*cols)
    
    return df

##### Create the dimension tables

In [13]:
def create_dim_calendar_table(df, output_location):
    """
    Creates the dim_calendar table for the data model.
    Input:
        df: Spark DataFrame.
        output_location: String. Directory or S3 location to write the parquet file.
    """
    
    # udf to convert a SAS date (days since 1960-01-01) into an ISO date string.
    get_dt = udf(lambda x: (dt.datetime(1960, 1, 1).date() + dt.timedelta(x)).isoformat() if x else None)
    
    # create initial dataFrame, extract required columns, date & time formats.
    calendar = df.select(['arrdate']).withColumn("arrdate", get_dt(df.arrdate)).distinct()
    calendar = calendar.withColumn("id", monotonically_increasing_id())
    calendar = calendar.withColumn("year", year("arrdate"))
    calendar = calendar.withColumn("month", month("arrdate"))
    calendar = calendar.withColumn("day", dayofmonth("arrdate"))
    calendar = calendar.withColumn("week", weekofyear("arrdate"))
    calendar = calendar.withColumn("weekday", dayofweek("arrdate"))
    
    # write to parquet file
    partition_cols = ["year", "month"]
    calendar.write.parquet(output_location + "dim_calendar", partitionBy=partition_cols, mode="overwrite")
    
    return calendar
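
The date handling in `create_dim_calendar_table` can be checked outside Spark. The sketch below assumes, as the udf does, that `arrdate` counts days from 1960-01-01. It also reproduces the calendar attributes with the standard library: Spark's `weekofyear` follows ISO 8601 weeks and `dayofweek` numbers days from 1 (Sunday) to 7 (Saturday). The helper names are hypothetical.

```python
import datetime as dt

SAS_EPOCH = dt.date(1960, 1, 1)


def sas_days_to_date(days):
    """Convert a day offset from 1960-01-01 into a date; None stays None."""
    if days is None:
        return None
    return SAS_EPOCH + dt.timedelta(days=int(days))


def calendar_attributes(d):
    """Derive the dim_calendar attributes for a single date."""
    return {
        "year": d.year,
        "month": d.month,
        "day": d.day,
        "week": d.isocalendar()[1],          # ISO 8601 week number
        "weekday": d.isoweekday() % 7 + 1,   # 1 = Sunday ... 7 = Saturday
    }


arrival = sas_days_to_date(20574.0)
print(arrival.isoformat())  # 2016-04-30
print(calendar_attributes(dt.date(2016, 4, 22)))
```

One difference from the udf: `if x else None` also maps an offset of `0` to `None`, whereas this sketch only treats `None` as missing.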

In [14]:
output_location = "output/"

In [15]:
calendar = create_dim_calendar_table(i94_data, output_location=output_location)

In [16]:
calendar.show(5)

+----------+-----------+----+-----+---+----+-------+
|   arrdate|         id|year|month|day|week|weekday|
+----------+-----------+----+-----+---+----+-------+
|2016-04-22| 8589934592|2016|    4| 22|  16|      6|
|2016-04-15|25769803776|2016|    4| 15|  15|      6|
|2016-04-18|42949672960|2016|    4| 18|  16|      2|
|2016-04-09|68719476736|2016|    4|  9|  14|      7|
|2016-04-11|85899345920|2016|    4| 11|  15|      2|
+----------+-----------+----+-----+---+----+-------+
only showing top 5 rows
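
The `id` values above are unique but not consecutive. Spark documents that `monotonically_increasing_id` places the partition ID in the upper 31 bits and the record number within the partition in the lower 33 bits. A small sketch that decodes the values shown, with a hypothetical helper name:

```python
def split_spark_id(generated_id):
    """Split a monotonically_increasing_id value into (partition, record)."""
    partition_id = generated_id >> 33
    record_number = generated_id & ((1 << 33) - 1)
    return partition_id, record_number


for value in (8589934592, 25769803776, 42949672960):
    print(value, split_spark_id(value))
```

Because the values depend on how the data is partitioned, they can change between runs. They are best treated as surrogate keys that are only meaningful once the table has been written.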



In [33]:
def create_dim_usa_demographics_table(df, output_location):
    """
    Creates the dim_usa_demographics table for the data model.
    Input:
        df: Spark DataFrame.
        output_location: String. Directory or S3 location to write the parquet file.
    """
    
    # create DataFrame with required column names
    usdemog_df = df.withColumnRenamed("City", "city")\
                .withColumnRenamed("State", "state") \
                .withColumnRenamed("Median Age", "median_age") \
                .withColumnRenamed("Male Population", "male_pop") \
                .withColumnRenamed("Female Population", "female_pop") \
                .withColumnRenamed("Total Population", "total_pop") \
                .withColumnRenamed("Number of Veterans", "veteran_number") \
                .withColumnRenamed("Foreign-born", "foreign_born") \
                .withColumnRenamed("Average Household Size", "avg_household_size") \
                .withColumnRenamed("State Code", "state_code") \
                .withColumnRenamed("Race", "race") \
                .withColumnRenamed("Count", "count")
    
    # add id column
    usdemog_df = usdemog_df.withColumn("id", monotonically_increasing_id())
    
    # write parquet file
    usdemog_df.write.parquet(output_location + "dim_usa_demographics", mode="overwrite")
    
    return usdemog_df

In [34]:
us_demographics = create_dim_usa_demographics_table(demog_df, output_location=output_location)

In [35]:
us_demographics.show(5)

+----------------+-------------+----------+--------+----------+---------+--------------+------------+------------------+----------+--------------------+-----+---+
|            city|        state|median_age|male_pop|female_pop|total_pop|veteran_number|foreign_born|avg_household_size|state_code|                race|count| id|
+----------------+-------------+----------+--------+----------+---------+--------------+------------+------------------+----------+--------------------+-----+---+
|   Silver Spring|     Maryland|      33.8|   40601|     41862|    82463|          1562|       30908|               2.6|        MD|  Hispanic or Latino|25924|  0|
|          Quincy|Massachusetts|      41.0|   44129|     49500|    93629|          4147|       32935|              2.39|        MA|               White|58723|  1|
|          Hoover|      Alabama|      38.5|   38040|     46799|    84839|          4819|        8229|              2.58|        AL|               Asian| 4759|  2|
|Rancho Cucamonga|   C

In [44]:
def create_dim_immigrant_table(df, output_location):
    """
    Creates the dim_immigrant table for the data model.
    Input:
        df: Spark DataFrame.
        output_location: String. Directory or S3 location to write the parquet file. 
    """
    
    # create a DataFrame with required columns.
    cols = [
        "cicid"
        , "i94cit"
        , "i94res"
        , "i94mode"
        , "depdate"
        , "i94bir"
        , "gender"
    ]
    im_df = df.select(cols)
    
    # write parquet file
    im_df.write.parquet(output_location + "dim_immigrant", mode="overwrite")
    
    return im_df

In [45]:
immig_df = create_dim_immigrant_table(i94_data, output_location)

In [46]:
immig_df.show(5)

+---------+------+------+-------+-------+------+------+
|    cicid|i94cit|i94res|i94mode|depdate|i94bir|gender|
+---------+------+------+-------+-------+------+------+
|5748517.0| 245.0| 438.0|    1.0|20582.0|  40.0|     F|
|5748518.0| 245.0| 438.0|    1.0|20591.0|  32.0|     F|
|5748519.0| 245.0| 438.0|    1.0|20582.0|  29.0|     M|
|5748520.0| 245.0| 438.0|    1.0|20588.0|  29.0|     F|
|5748521.0| 245.0| 438.0|    1.0|20588.0|  28.0|     M|
+---------+------+------+-------+-------+------+------+
only showing top 5 rows



In [47]:
def create_dim_temperatures_table(df, output_location):
    """
    Creates the dim_temperatures table for the data model.
    Input:
        df: Spark DataFrame.
        output_location: String. Directory or S3 location to write the parquet file. 
    """
    
    # create a DataFrame with required columns.
    temp_df = df.withColumnRenamed("AverageTemperature", "avg_temp") \
                .withColumnRenamed("AverageTemperatureUncertainty", "avg_temp_uncertainty") \
                .withColumnRenamed("City", "city") \
                .withColumnRenamed("Country", "country") \
                .withColumnRenamed("Latitude", "latitude") \
                .withColumnRenamed("Longitude", "longitude")
    
    temp_df.write.parquet(output_location + "dim_temperatures", mode="overwrite")
    
    return temp_df

In [48]:
dimTemp_df = create_dim_temperatures_table(temp_df, output_location)

In [49]:
dimTemp_df.show(5)

+-------------------+--------+--------------------+-----+-------+--------+---------+
|                 dt|avg_temp|avg_temp_uncertainty| city|country|latitude|longitude|
+-------------------+--------+--------------------+-----+-------+--------+---------+
|1743-11-01 00:00:00|   6.068|  1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01 00:00:00|    null|                null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01 00:00:00|    null|                null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01 00:00:00|    null|                null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01 00:00:00|    null|                null|Århus|Denmark|  57.05N|   10.33E|
+-------------------+--------+--------------------+-----+-------+--------+---------+
only showing top 5 rows



##### Create I94 Immigration Fact table

In [50]:
def create_fact_i94_immigration_table(df, output_location):
    """
    Creates the fact_i94_immigration table for the data model.
    Input:
        df: Spark DataFrame.
        output_location: String. Directory or S3 location to write the parquet file. 
    """
    
    # udf to convert a SAS date (days since 1960-01-01) into an ISO date string.
    get_dt = udf(lambda x: (dt.datetime(1960, 1, 1).date() + dt.timedelta(x)).isoformat() if x else None)
        
    # create a DataFrame with required columns.
    cols = [
        "cicid"
        , "arrdate"
        , "count"
        , "visapost"
        , "entdepa"
        , "entdepd"
        , "biryear"
        , "dtaddto"
        , "airline"
        , "fltno"
        , "visatype"
        , "i94addr"
        , "i94visa"
    ]
    i94_df = df.select(cols)
    
    # convert arrdate column from a SAS day offset into an ISO date string
    i94_df = i94_df.withColumn("arrdate", get_dt(df.arrdate))
    
    # write parquet file
    i94_df.write.parquet(output_location + "fact_i94_immigration", mode="overwrite")
    
    return i94_df

In [51]:
factI94_df = create_fact_i94_immigration_table(i94_data, output_location)

In [52]:
factI94_df.show(5)

+---------+----------+-----+--------+-------+-------+-------+--------+-------+-----+--------+-------+-------+
|    cicid|   arrdate|count|visapost|entdepa|entdepd|biryear| dtaddto|airline|fltno|visatype|i94addr|i94visa|
+---------+----------+-----+--------+-------+-------+-------+--------+-------+-----+--------+-------+-------+
|5748517.0|2016-04-30|  1.0|     SYD|      G|      O| 1976.0|10292016|     QF|00011|      B1|     CA|    1.0|
|5748518.0|2016-04-30|  1.0|     SYD|      G|      O| 1984.0|10292016|     VA|00007|      B1|     NV|    1.0|
|5748519.0|2016-04-30|  1.0|     SYD|      G|      O| 1987.0|10292016|     DL|00040|      B1|     WA|    1.0|
|5748520.0|2016-04-30|  1.0|     SYD|      G|      O| 1987.0|10292016|     DL|00040|      B1|     WA|    1.0|
|5748521.0|2016-04-30|  1.0|     SYD|      G|      O| 1988.0|10292016|     DL|00040|      B1|     WA|    1.0|
+---------+----------+-----+--------+-------+-------+-------+--------+-------+-----+--------+-------+-------+
only showi

In [39]:
def process_i94_data(spark, input_location, output_location, datadir):
    """
    ETL for the i94 immigration data. 
    Creates the dim_calendar, dim_immigrant and fact_i94_immigration tables.
    Input:
        spark: Spark Session.
        input_location: String. Directory or S3 location to read file.
        output_location: String. Directory or S3 location to write the parquet file. 
        datadir: String. Directory with parquet files with source data.
    """
    
    # read files into Spark DataFrame
    i94_data = spark.read.load(datadir)
    
    # clean-up i94 data
    i94_df = clean_dataset(i94_data, subset=None, drop_cols=False, thresh=0.7)
    
    # create dim_calendar table and save to parquet file in output location
    calendar_table = create_dim_calendar_table(i94_df, output_location)
    
    # create dim_immigrant table and save to parquet file in output location
    im_table = create_dim_immigrant_table(i94_df, output_location)
    
    # create fact_i94_immigration table and save to parquet file in output location
    i94_table = create_fact_i94_immigration_table(i94_df, output_location)

In [40]:
def process_usa_demog_data(spark, input_location, output_location, fname):
    """
    ETL for the USA Demographics data. 
    Creates the dim_usa_demographics table.
    Input:
        spark: Spark Session.
        input_location: String. Directory or S3 location to read file.
        output_location: String. Directory or S3 location to write the parquet file. 
        fname: String. Name of files with source data.
    """
    
    # read file into Spark DataFrame
    demog_data = spark.read.csv(fname, header=True, inferSchema=True, sep=';')
    
    # clean-up demographics data
    demog_df = clean_dataset(demog_data, subset=None, drop_cols=False, thresh=0.7)
    
    # create dim_usa_demographics table and save to parquet file in output location
    demog_table = create_dim_usa_demographics_table(demog_df, output_location)

In [41]:
def process_temperatures_data(spark, input_location, output_location, fname):
    """
    ETL for the Temperatures data. 
    Creates the dim_temperatures table.
    Input:
        spark: Spark Session.
        input_location: String. Directory or S3 location to read file.
        output_location: String. Directory or S3 location to write the parquet file. 
        fname: String. Name of files with source data.
    """
    
    # read file into Spark DataFrame
    temp_data = spark.read.csv(fname, header=True, inferSchema=True, sep=';')
    
    # clean-up temperatures data
    temp_df = clean_dataset(temp_data, subset=None, drop_cols=False, thresh=0.7)
    
    # create dim_temperatures table and save to parquet file in output location
    temp_table = create_dim_temperatures_table(temp_df, output_location)

#### 4.2 Data Quality Checks
*Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:*
 * *Integrity constraints on the relational database (e.g., unique key, data type, etc.)*
 * *Unit tests for the scripts to ensure they are doing the right thing*
 * *Source/Count checks to ensure completeness*

The following data quality checks are run on the final tables to confirm that the pipeline ran as expected:
* Check the total number of rows in each table. If no rows were loaded, the pipeline failed.
* Check the number of duplicate rows. If it is greater than 0, the data clean-up did not work correctly and the table contains duplicate data.

Run Quality Checks

In [47]:
# Perform quality checks here

In [44]:
def data_quality_checks(df):
    """
    Runs data quality checks against newly created table in the data model.
    Input:
        df: Spark DataFrame.
    """
    
    total_rows = df.count()
    # dropDuplicates() with no arguments compares all columns
    no_duplicate_rows = df.dropDuplicates().count()
    total_duplicates = total_rows - no_duplicate_rows
    
    if total_rows == 0:
        return "Data quality check failed. Table has 0 records."
    else:
        if total_duplicates > 0:
            return f"Data quality check failed. Table has {total_duplicates} duplicates."
        else:
            return f"Data quality check passed. Table has {total_rows} rows and no duplicates."
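
Because `data_quality_checks` returns a message string, a failed check would not by itself stop a scheduled run. The sketch below shows one possible way to turn the same two checks (non-empty table, no duplicates) into a pass/fail result that can raise an error. The names `QualityResult`, `evaluate_counts` and `assert_all_passed` are hypothetical, and the row counts in the example are illustrative only; in the notebook they would come from `df.count()` and `df.dropDuplicates().count()`.

```python
from typing import NamedTuple


class QualityResult(NamedTuple):
    """Outcome of the row-count and duplicate checks for one table."""
    table: str
    passed: bool
    message: str


def evaluate_counts(table, total_rows, distinct_rows):
    """Apply the two checks to row counts that were computed beforehand."""
    if total_rows == 0:
        return QualityResult(table, False, f"{table}: failed, table has 0 records.")
    duplicates = total_rows - distinct_rows
    if duplicates > 0:
        return QualityResult(table, False, f"{table}: failed, table has {duplicates} duplicates.")
    return QualityResult(table, True, f"{table}: passed, {total_rows} rows and no duplicates.")


def assert_all_passed(results):
    """Raise ValueError if any table failed, so the run stops instead of continuing."""
    failures = [r.message for r in results if not r.passed]
    if failures:
        raise ValueError("Data quality checks failed: " + "; ".join(failures))


# Illustrative counts only (not taken from the real tables).
results = [
    evaluate_counts("dim_calendar", 30, 30),
    evaluate_counts("dim_immigrant", 100, 100),
]
assert_all_passed(results)
for r in results:
    print(r.message)
```

Keeping the decision logic separate from Spark makes it possible to unit test the checks without starting a Spark session.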

#### 4.3 Data dictionary 
*Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.*

**Dimension Tables**:

1. **dim_immigrant**:
    * ccid: Unique record ID
    * i94cit: 3 digit code for immigrant country of birth
    * i94res: 3 digit code for immigrant country of residence
    * i94mode: Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported)
    * depdate: Departure Date from the USA
    * i94bir: Age of Respondent in Years
    * gender: Non-immigrant sex
2. **dim_usa_demographics**:
    * id: Unique record ID
    * city: City Name
    * state: US State where city is located
    * median_age: Median age of the population
    * male_pop: Count of male population
    * female_pop: Count of female population
    * total_pop: Count of total population
    * veteran_number: Count of total Veterans
    * foreign_born: Count of residents of the city who were born outside the USA
    * avg_household_size: Average city household size
    * race: Respondent race
    * count: Count of the city's residents per race
3. **dim_temperatures**:
    * dt: Date
    * avg_temp: Average land temperature of the city in degrees Celsius
    * avg_temp_uncertainty: 95% confidence interval around the average
    * city: Name of City
    * country: Name of Country
    * latitude: City Latitude
    * longitude: City Longitude
4. **dim_calendar**:
    * arrdate: Arrival Date in the USA
    * year: 4 digit year
    * month: Numeric month of year
    * day: Numeric day of month
    * week: Numeric week of year
    * weekday: Day of week
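
The `i94mode` codes listed under dim_immigrant can be turned into readable labels with a small lookup. The following is a minimal sketch: the function name `decode_i94mode` and the fallback label `"Unknown"` are assumptions made for illustration, and the handling of float inputs assumes the codes may be stored as doubles (for example `1.0`).

```python
# Code-to-label mapping, copied from the i94mode entry of the data dictionary.
I94_MODE_LABELS = {1: "Air", 2: "Sea", 3: "Land", 9: "Not reported"}


def decode_i94mode(code):
    """Return the transport-mode label for an i94mode code.

    Missing, non-numeric, non-integer or unlisted codes map to "Unknown"
    (a fallback label chosen for this sketch).
    """
    try:
        value = float(code)
    except (TypeError, ValueError):
        return "Unknown"
    if not value.is_integer():  # also False for NaN and infinity
        return "Unknown"
    return I94_MODE_LABELS.get(int(value), "Unknown")


for raw in (1, 2.0, 9, None, 4):
    print(raw, "->", decode_i94mode(raw))
```

In the pipeline, a function like this could be wrapped with the `udf` helper already imported in the notebook, or replaced by a join against a small lookup table.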

**Fact Table**:
1. **fact_i94_immigration**:
    * ccid: Unique record ID
    * arrdate: Arrival Date in the USA
    * count: Field used for summary statistics
    * visapost: Department of State where Visa was issued
    * entdepa: Arrival Flag - admitted or paroled into the U.S.
    * entdepd: Departure Flag - Departed, lost I-94 or is deceased
    * biryear: 4 digit year of birth
    * dtaddto: Character Date Field - Date to which admitted to U.S. (allowed to stay until)
    * airline: Airline used to arrive in U.S.
    * fltno: Flight number of Airline used to arrive in U.S.
    * visatype: Class of admission legally admitting the non-immigrant to temporarily stay in U.S.
    * i94addr: USA State of arrival
    * i94visa: Visa codes collapsed into three categories
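
The data dictionary can also serve as an integrity check on the output tables. The sketch below compares a table's actual column names against the fields listed above. `EXPECTED_COLUMNS` and `compare_columns` are hypothetical names introduced for illustration; in the notebook, the actual columns would come from the `columns` attribute of each Spark DataFrame.

```python
# Expected columns per table, copied from the data dictionary.
EXPECTED_COLUMNS = {
    "dim_immigrant": ["ccid", "i94cit", "i94res", "i94mode", "depdate", "i94bir", "gender"],
    "dim_usa_demographics": [
        "id", "city", "state", "median_age", "male_pop", "female_pop", "total_pop",
        "veteran_number", "foreign_born", "avg_household_size", "race", "count",
    ],
    "dim_temperatures": [
        "dt", "avg_temp", "avg_temp_uncertainty", "city", "country", "latitude", "longitude",
    ],
    "dim_calendar": ["arrdate", "year", "month", "day", "week", "weekday"],
    "fact_i94_immigration": [
        "ccid", "arrdate", "count", "visapost", "entdepa", "entdepd", "biryear",
        "dtaddto", "airline", "fltno", "visatype", "i94addr", "i94visa",
    ],
}


def compare_columns(table, actual_columns):
    """Return (missing, unexpected) column names for the given table."""
    expected = EXPECTED_COLUMNS[table]
    missing = [c for c in expected if c not in actual_columns]
    unexpected = [c for c in actual_columns if c not in expected]
    return missing, unexpected


# Example with a hand-written column list that lacks "weekday".
missing, unexpected = compare_columns(
    "dim_calendar", ["arrdate", "year", "month", "day", "week"]
)
print("missing:", missing, "unexpected:", unexpected)
```

A non-empty `missing` or `unexpected` list would indicate that the table written by the pipeline no longer matches the documented data model.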

#### Step 5: Complete Project Write Up
* *Who is the target audience, and who is going to use the final data model?*
    
    The final data lake is best suited to an analytics team that is familiar with the technologies used and has the skills to connect to the data, analyze it, and visualize the resulting insights.
    
    
* *What are some of the types of questions we can handle using the proposed data model? Please be specific, as we also need to show the output in the notebook at the end to validate that the data model can answer the questions.*
    
    This Data Model will allow us to answer questions such as: 
        - What are the 10 US states with the most immigrants arriving?
        - What are the 10 countries of birth with the most immigrants to the US?
        - What is the gender distribution? 
        - What is the average age?


* *Clearly state the rationale for the choice of tools and technologies for the project.*
    
    Apache Spark was chosen for its scalability, and its PySpark API makes data manipulation straightforward. It allows us to build the pipeline on the initial, smaller datasets and keep the same code as the data grows.
    Jupyter Notebooks are used for interactivity and fast data exploration, and to build the initial version of the pipeline.
    AWS S3 is a widely used storage service and, like Spark, it scales easily. We use S3 to store the final data lake as parquet files.
    
    
* *Propose how often the data should be updated and why.*
    
    The main factor that determines the refresh rate is how often the source datasets are updated. Since the Temperatures and I94 Immigration datasets are updated monthly, the data model should also be refreshed monthly.
    
    
* *Write a description of how you would approach the problem differently under the following scenarios:*
 * *The data was increased by 100x.* 
     We could use a cluster manager like YARN or spin up an EMR cluster to scale our compute resources; Spark is designed to handle data at that scale. On the storage side, S3 scales up or down with the size of the data lake.
     
 * *The data populates a dashboard that must be updated on a daily basis by 7am every day.*
     The ETL process would be orchestrated and scheduled by a tool like Apache Airflow. This would allow us to schedule a daily refresh that completes before 7am.  
 * *The database needed to be accessed by 100+ people.*
     We can add nodes using YARN or EMR or, alternatively, move our data lake to a managed cloud database like Redshift, which is designed for analytical workloads, has very fast read times, and can handle many users querying the database concurrently.

##### What are the 10 US states with the most immigrants arriving?

In [76]:
factI94_df.groupBy("i94addr").sum("count").sort(desc("sum(count)")).show(10)

+-------+----------+
|i94addr|sum(count)|
+-------+----------+
|     FL|  621701.0|
|     NY|  553677.0|
|     CA|  470386.0|
|     HI|  168764.0|
|   null|  152592.0|
|     TX|  134321.0|
|     NV|  114609.0|
|     GU|   94107.0|
|     IL|   82126.0|
|     NJ|   76531.0|
+-------+----------+
only showing top 10 rows



In [80]:
factI94_df.groupBy("i94addr").count().sort(desc("count")).show(10)

+-------+------+
|i94addr| count|
+-------+------+
|     FL|621701|
|     NY|553677|
|     CA|470386|
|     HI|168764|
|   null|152592|
|     TX|134321|
|     NV|114609|
|     GU| 94107|
|     IL| 82126|
|     NJ| 76531|
+-------+------+
only showing top 10 rows



##### What are the 10 countries of birth with the most immigrants to the US?

In [79]:
immig_df.groupBy("i94cit").count().sort(desc("count")).show(10)

+------+------+
|i94cit| count|
+------+------+
| 135.0|360157|
| 209.0|206873|
| 245.0|191425|
| 111.0|188766|
| 582.0|175781|
| 148.0|157806|
| 254.0|137735|
| 689.0|129833|
| 213.0|110691|
| 438.0|109884|
+------+------+
only showing top 10 rows



##### What is the gender distribution?

In [82]:
immig_df.groupBy("gender").count().sort(desc("count")).show(10)

+------+-------+
|gender|  count|
+------+-------+
|     M|1377224|
|     F|1302743|
|  null| 414269|
|     X|   1610|
|     U|    467|
+------+-------+



##### What is the average age?

In [95]:
immig_df.select(avg("i94bir")).show()

+------------------+
|       avg(i94bir)|
+------------------+
|41.767614458485205|
+------------------+

