# Project Title
### Data Engineering Capstone Project

#### Project Summary
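This project builds an ETL pipeline with Apache Spark that combines I-94 immigration, global land temperature and US demographics data into a data model made up of one fact table and several dimension tables, which can be used to analyze immigration patterns to the US.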

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import os
import configparser
import datetime as dt

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg
from pyspark.sql import SQLContext
from pyspark.sql.functions import isnan, when, count, col, udf, dayofmonth, dayofweek, month, year, weekofyear
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.types import *

import requests
requests.packages.urllib3.disable_warnings()

import utility
import functions

import importlib
importlib.reload(utility)
from utility import clean_spark_immigration_data, clean_spark_temperature_data
from utility import clean_spark_demographics_data, print_formatted_float

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['KEYS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['KEYS']['AWS_SECRET_ACCESS_KEY']

In [None]:
spark = SparkSession.builder.\
    config("spark.jars.repositories", "https://repos.spark-packages.org/").\
    config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11").\
    enableHiveSupport().getOrCreate()



### Step 1: Scope the Project and Gather Data

#### Scope 
The objective of this project is to analyze immigration events using several datasets. An ETL pipeline is created for the I-94 immigration, global land temperature and US demographics datasets, and the resulting data model is used to study immigration patterns to the US.
#### Immigration Dataset
This data comes from the US National Tourism and Trade Office. In the past, all foreign visitors to the U.S. arriving via air or sea were required to complete the paper Customs and Border Protection Form I-94 Arrival/Departure Record or Form I-94W Nonimmigrant Visa Waiver Arrival/Departure Record, and this dataset is derived from those forms. The dataset forms the core of the data warehouse. The customer repository holds a year's worth of data for 2016, divided by month, in the folder ../../data/18-83510-I94-Data-2016/. Each month's data is stored in the SAS binary storage format sas7bdat. This project works with the data for April, but the extraction, transformation and loading utility functions have been designed to work with any month's data.
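Since the utilities are meant to work with any month, the file path can be built from the month abbreviation instead of being hard-coded. The following is a minimal sketch: `immigration_file_path` is a hypothetical helper, and it assumes every monthly file follows the same naming pattern as the April file read in the next cell, which has not been verified here.

```python
# Folder taken from the description above.
I94_DATA_DIR = "../../data/18-83510-I94-Data-2016/"


def immigration_file_path(month_abbr, year_suffix="16", data_dir=I94_DATA_DIR):
    """Hypothetical helper: build the path of one monthly I-94 file.

    Assumes all files are named like the April file,
    i.e. i94_<mon><yy>_sub.sas7bdat.
    """
    return f"{data_dir}i94_{month_abbr.lower()}{year_suffix}_sub.sas7bdat"


print(immigration_file_path("apr"))
```

The returned string can be passed to `spark.read.format('com.github.saurfang.sas.spark').load(...)` in place of the literal path.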

In [None]:
# Read in the data here
immig_name = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
immigration_df = spark.read.format('com.github.saurfang.sas.spark').load(immig_name)



In [None]:
immigration_df.head()

In [None]:
print_formatted_float(immigration_df.count())

In [None]:
immigration_df.select("visapost").dropDuplicates().show(5)

#### World Temperature Data
This dataset comes from Kaggle and is read from '../../data2/GlobalLandTemperaturesByCity.csv'. It provides global land temperatures by city.

In [None]:
file_name = '../../data2/GlobalLandTemperaturesByCity.csv'
temperature_df = spark.read.csv(file_name, header=True, inferSchema=True)
# display the first five records
temperature_df.limit(5).toPandas()

In [None]:
# check the total number of records
print_formatted_float(temperature_df.count())

#### US Demographic Data
This data comes from OpenDataSoft. It contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000. The original data source is the US Census Bureau's 2015 American Community Survey.

In [None]:
file_name = "us-cities-demographics.csv"
demographics_df = spark.read.csv(file_name, inferSchema=True, header=True, sep=';')
# display the first five records
demographics_df.limit(5).toPandas()

In [None]:
# check the total number of records
print_formatted_float(demographics_df.count())

In [None]:
# from pyspark.sql import SparkSession

# spark = SparkSession.builder.\
# config("spark.jars.repositories", "https://repos.spark-packages.org/").\
# config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
# enableHiveSupport().getOrCreate()

# df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [None]:
# #write to parquet
# df_spark.write.parquet("sas_data")
# df_spark=spark.read.parquet("sas_data")

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

### 1. Immigration Dataset

In [None]:
# Performing cleaning tasks here

files = os.listdir('../../data/18-83510-I94-Data-2016/')
files


In [None]:
immigration_df.printSchema()

#### Data Dictionary - Immigration Dataset

In [None]:
imm_dict = pd.read_csv('Data dictionaries/immigration-data-dictionary.csv')
print(imm_dict)

In [None]:
# columns with a large share of missing values, as found in the exploratory analysis
cols = ['occup', 'entdepu','insnum']

# drop these columns
new_immig_df = immigration_df.drop(*cols)
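The exploratory step that identified these columns is not shown in the notebook. One way it could be reproduced is sketched below: count the nulls per column in a single Spark pass, then flag the columns whose missing share exceeds a cutoff. Both function names and the 0.9 threshold are assumptions made for illustration, and the sample numbers are invented rather than measured from the I-94 data.

```python
def null_counts(df):
    """Count null values per column of a Spark DataFrame in one pass."""
    # Imported lazily so the threshold helper below also runs without Spark.
    from pyspark.sql.functions import col, count, when

    row = df.select(
        [count(when(col(c).isNull(), c)).alias(c) for c in df.columns]
    ).first()
    return row.asDict()


def columns_above_threshold(nulls, total_rows, threshold=0.9):
    """Return the columns whose share of missing values exceeds `threshold`."""
    if total_rows == 0:
        return []
    return sorted(c for c, n in nulls.items() if n / total_rows > threshold)


# Invented counts, for illustration only.
example = {"occup": 98, "entdepu": 100, "insnum": 96, "cicid": 0}
print(columns_above_threshold(example, total_rows=100))
```

With Spark available, `columns_above_threshold(null_counts(immigration_df), immigration_df.count())` would produce the list of candidate columns to drop.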

In [None]:
# display the new schema
new_immig_df.printSchema()

In [None]:
# drop duplicate entries
new_immig_df = new_immig_df.dropDuplicates(['cicid'])

In [None]:
# get a count after dropping duplicates
print_formatted_float(new_immig_df.count())

In [None]:
# drop rows where the record id (cicid) is missing
new_immig_df = new_immig_df.dropna(how='all', subset=['cicid'])

In [None]:
# get a count after dropping rows with missing values
print_formatted_float(new_immig_df.count())

In [None]:
# clean the immigration dataframe
new_immigration_df = utility.clean_spark_immigration_data(immigration_df)

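# Spark DataFrames have no .shape attribute, so report (rows, columns) explicitly
print('The shape of the new immigration dataset:')
print((new_immigration_df.count(), len(new_immigration_df.columns)))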

### 2. Temperature Dataset

In [None]:
# print temperature dataframe schema
temperature_df.printSchema()

#### Data Dictionary - Temperature Dataset

In [None]:
temp_dict = pd.read_csv('Data dictionaries/Temperature-data-dictionary.csv')

In [None]:
print(temp_dict)

**Exploratory analysis showed some missing average temperature values. The global temperature data is therefore cleaned by dropping the rows that have missing average temperature values and dropping duplicates.**

In [None]:
# clean the data by dropping rows with missing average temperature values and dropping duplicates
new_temperature_df = utility.clean_spark_temperature_data(temperature_df)

In [None]:
# count the number of records in the cleaned temperature dataset
print_formatted_float(new_temperature_df.count())

### 3. Demographics Dataset

In [None]:
# print demographics database schema
demographics_df.printSchema()

#### Data Dictionary - Demographics Dataset

In [None]:
demo_dict = pd.read_csv('Data dictionaries/Demographic data dictionary.csv')
print(demo_dict)

**Exploratory analysis showed very few missing values in the demographics dataset. The demographics data is therefore cleaned by dropping the rows that contain missing values and dropping duplicates.**

In [None]:
# clean demographics data by dropping the rows that contain missing values and dropping duplicates
new_demographics_df = utility.clean_spark_demographics_data(demographics_df)

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model



### Database schema
#### Fact Table
- The *immigration* table is the fact table of the data model. Its data comes from the immigration dataset and it contains keys that link to the dimension tables.

#### Dimension Tables
- The *country* dimension table is made up of data from the global land temperatures by city and the immigration datasets. The combination of these two datasets allows analysts to study correlations between global land temperatures and immigration patterns to the US.

- The *us_demographics* dimension table comes from the demographics dataset and links to the immigration fact table at US state level. This dimension allows analysts to get insights into migration patterns into the US based on demographics as well as the overall population of states. It also shows which states attract the most immigrants.

- The *immigration_calendar* dimension table is built from the immigration dataset and describes the arrival dates of the immigrants.

- The *visa_type* dimension table comes from the immigration dataset and links to the immigration fact table through the visa_type_key.

#### 3.2 Mapping Out Data Pipelines
 **The pipeline steps are as follows:**

- Run functions.py and utility.py in the console to enable the functions for cleaning the datasets and creating the tables
- Load the datasets
- Clean the I94 immigration data to create a Spark dataframe for each month
- Create visa_type dimension table
- Create calendar dimension table
- Extract clean global temperatures data
- Create country dimension table
- Create immigration fact table
- Load demographics data
- Clean demographics data
- Create demographic dimension table

In [None]:
# import image module
from IPython.display import Image
  
# get the image
Image(url="Data model.png", width=700, height=700)

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

#### I. Create the immigration calendar dimension table

In [None]:
# Write code here

def create_immigration_calendar_dimension(df, output_data):
    """This function creates an immigration calendar based on arrival date
    
    :param df: spark dataframe of immigration events
    :param output_data: path to write dimension dataframe to
    :return: spark dataframe representing calendar dimension
    """
    # create a udf to convert arrival date in SAS format to datetime object
    get_datetime = udf(lambda x: (dt.datetime(1960, 1, 1).date() + dt.timedelta(x)).isoformat() if x else None)
    
    # create initial calendar df from arrdate column
    calendar_df = df.select(['arrdate']).withColumn("arrdate", get_datetime(df.arrdate)).distinct()
    
    # expand df by adding other calendar columns
    calendar_df = calendar_df.withColumn('arrival_day', dayofmonth('arrdate'))
    calendar_df = calendar_df.withColumn('arrival_week', weekofyear('arrdate'))
    calendar_df = calendar_df.withColumn('arrival_month', month('arrdate'))
    calendar_df = calendar_df.withColumn('arrival_year', year('arrdate'))
    calendar_df = calendar_df.withColumn('arrival_weekday', dayofweek('arrdate'))

    # create an id field in calendar df
    calendar_df = calendar_df.withColumn('id', monotonically_increasing_id())
    
    # write the calendar dimension to parquet file
    partition_columns = ['arrival_year', 'arrival_month', 'arrival_week']
    calendar_df.write.parquet(output_data + "immigration_calendar", partitionBy=partition_columns, mode="overwrite")
    
    return calendar_df

In [None]:
output_data = "tables/"

In [None]:
calendar_df = create_immigration_calendar_dimension(new_immigration_df, output_data)

In [None]:
calendar_df.limit(5).toPandas()
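The `get_datetime` UDF above relies on the fact that SAS stores dates as a number of days since 1960-01-01. The conversion can be checked outside Spark with a plain Python function. This is a minimal sketch; `sas_date_to_iso` is a name introduced here for illustration. Unlike the lambda in the notebook, which uses `if x` and therefore maps day 0 to `None`, this version only treats `None` as missing.

```python
import datetime as dt

# SAS counts dates as days elapsed since this epoch.
SAS_EPOCH = dt.date(1960, 1, 1)


def sas_date_to_iso(days):
    """Convert a SAS numeric date (days since 1960-01-01) to an ISO date string."""
    if days is None:
        return None
    # The arrdate column holds floating point values, so truncate to whole days.
    return (SAS_EPOCH + dt.timedelta(days=int(days))).isoformat()


print(sas_date_to_iso(20545.0))  # 2016-04-01
print(sas_date_to_iso(0))        # 1960-01-01
```

The same function could be wrapped with `udf(sas_date_to_iso)` if the day-0 behaviour matters for the data at hand.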

#### II. Create the country dimension table

In [None]:
def create_country_dimension_table(df, temp_df, output_data):
    """This function creates a country dimension from the immigration and global land temperatures data.
    
    :param df: spark dataframe of immigration events
    :param temp_df: spark dataframe of global land temperatures data.
    :param output_data: path to write dimension dataframe to
    :return: spark dataframe representing country dimension
    """
    # get the aggregated temperature data
    agg_temp = utility.aggregate_temperature_data(temp_df).toPandas()
    # load the i94res to country mapping data
    mapping_codes = pd.read_csv('i94.csv')
    
    @udf('string')
    def get_country_average_temperature(name):
        print("Processing: ", name)
        avg_temp = agg_temp[agg_temp['Country']==name]['average_temperature']
        
        if not avg_temp.empty:
            return str(avg_temp.iloc[0])
        
        return None
    
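    @udf('string')
    def get_country_name(code):
        # look up the country name; return None when the code is not in the mapping
        names = mapping_codes[mapping_codes['code']==code]['name']
        
        if not names.empty and names.iloc[0]:
            return names.iloc[0].title()
        return None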
        
    # select and rename i94res column
    dim_df = df.select(['i94res']).distinct() \
                .withColumnRenamed('i94res', 'country_code')
    
    # create country_name column
    dim_df = dim_df.withColumn('country_name', get_country_name(dim_df.country_code))
    
    # create average_temperature column
    dim_df = dim_df.withColumn('average_temperature', get_country_average_temperature(dim_df.country_name))
    
    # write the dimension to a parquet file
    dim_df.write.parquet(output_data + "country", mode="overwrite")
    
    return dim_df

In [None]:
country_dim_df = create_country_dimension_table(new_immigration_df, new_temperature_df, output_data)


In [None]:
country_dim_df.show(5)
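The `get_country_name` UDF filters a pandas DataFrame once per row. A possible alternative, sketched here under assumptions, is to turn the mapping into a dictionary once and look codes up with `dict.get`, which returns `None` for unknown codes instead of raising. The helper names are hypothetical and the sample rows are invented; in the notebook the pairs would come from the `code` and `name` columns of `i94.csv`.

```python
def build_code_lookup(rows):
    """Map each country code to a title-cased name.

    `rows` is an iterable of (code, name) pairs; pairs with an empty name are skipped.
    """
    return {code: name.title() for code, name in rows if name}


def country_name(lookup, code):
    """Return the country name for `code`, or None if the code is unknown."""
    return lookup.get(code)


# Invented sample rows, not taken from i94.csv.
sample_rows = [(1, "ALPHA LAND"), (2, "beta republic"), (3, "")]
lookup = build_code_lookup(sample_rows)

print(country_name(lookup, 1))    # Alpha Land
print(country_name(lookup, 999))  # None
```

Inside a UDF the body would then reduce to `return lookup.get(code)`.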

#### III. Create the visa_type dimension table

In [None]:
def create_visa_type_dimension_table(df, output_data):
    """This function creates a visa type dimension from the immigration data.
    
    :param df: spark dataframe of immigration events
    :param output_data: path to write dimension dataframe to
    :return: spark dataframe representing visa type dimension
    """
    # create visatype df from visatype column
    visatype_df = df.select(['visatype']).distinct()
    
    # add an id column
    visatype_df = visatype_df.withColumn('visa_type_key', monotonically_increasing_id())
    
    # write dimension to parquet file
    visatype_df.write.parquet(output_data + "visatype", mode="overwrite")
    
    return visatype_df

def get_visa_type_dimension(output_data):
    return spark.read.parquet(output_data + "visatype")

In [None]:
visatype_df = create_visa_type_dimension_table(new_immigration_df, output_data)
visatype_df.show(n=5)

#### IV. Create the demographics dimension table

In [None]:
def create_demographics_dimension_table(df, output_data):
    """This function creates a us demographics dimension table from the us cities demographics data.
    
    :param df: spark dataframe of us demographics survey data
    :param output_data: path to write dimension dataframe to
    :return: spark dataframe representing demographics dimension
    """
    dim_df = df.withColumnRenamed('Median Age','median_age') \
            .withColumnRenamed('Male Population', 'male_population') \
            .withColumnRenamed('Female Population', 'female_population') \
            .withColumnRenamed('Total Population', 'total_population') \
            .withColumnRenamed('Number of Veterans', 'number_of_veterans') \
            .withColumnRenamed('Foreign-born', 'foreign_born') \
            .withColumnRenamed('Average Household Size', 'average_household_size') \
            .withColumnRenamed('State Code', 'state_code')
    # lets add an id column
    dim_df = dim_df.withColumn('id', monotonically_increasing_id())
    
    # write dimension to parquet file
    dim_df.write.parquet(output_data + "demographics", mode="overwrite")
    
    return dim_df

In [None]:
demographics_dim_df = create_demographics_dimension_table(new_demographics_df, output_data)
demographics_dim_df.limit(5).toPandas()

#### V. Create the immigration Fact table

In [None]:
def create_immigration_fact_table(df, output_data):
    """This function creates the immigration fact table from the immigration data.
    
    :param df: spark dataframe of immigration events
    :param output_data: path to read the visa type dimension from and write the fact table to
    :return: spark dataframe representing the immigration fact table
    """
    # get visa_type dimension
    dim_df = get_visa_type_dimension(output_data).toPandas()
    
    @udf('string')
    def get_visa_key(visa_type):
        """user defined function to get visa key
        
        :param visa_type: US non-immigrant visa type
        :return: corresponding visa key
        """
        key_series = dim_df[dim_df['visatype']==visa_type]['visa_type_key']
        
        if not key_series.empty:
            return str(key_series.iloc[0])
        
        return None
    
    # create a udf to convert arrival date in SAS format to datetime object
    get_datetime = udf(lambda x: (dt.datetime(1960, 1, 1).date() + dt.timedelta(x)).isoformat() if x else None)
    
    # rename columns to align with data model
    df = df.withColumnRenamed('cicid','record_id') \
            .withColumnRenamed('i94res', 'country_residence_code') \
            .withColumnRenamed('i94addr', 'state_code') 
    
    # create visa_type key
    df = df.withColumn('visa_type_key', get_visa_key('visatype'))
    
    # convert arrival date into datetime object
    df = df.withColumn("arrdate", get_datetime(df.arrdate))
    
    # write fact table to parquet file
    df.write.parquet(output_data + "immigration_fact", mode="overwrite")
    
    return df

In [None]:
immigration_fact_df = create_immigration_fact_table(new_immigration_df, output_data)

In [None]:
immigration_fact_df.limit(5).toPandas()

### 4.2 Data Quality Checks
The data quality checks ensure that the ETL has created the fact table and the dimension tables and that each of them contains data.
 
#### Run Quality Checks

In [None]:
# Perform quality checks here

table_dfs = {
    'immigration_fact': immigration_fact_df,
    'visa_type_dim': visatype_df,
    'calendar_dim': calendar_df,
    'usa_demographics_dim': demographics_dim_df,
    'country_dim': country_dim_df
}
for table_name, table_df in table_dfs.items():
    # quality check for table
    functions.quality_checks(table_df, table_name)
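The body of `functions.quality_checks` is not shown in this notebook. As a rough idea of what a row-count check could look like, here is a minimal sketch; `check_table_not_empty` is a hypothetical function, not the project's implementation. It only needs an object with a `count()` method, so a small stand-in class is used to run it without Spark.

```python
def check_table_not_empty(table_df, table_name):
    """Return True if the table has at least one row, and print the outcome."""
    total = table_df.count()
    if total == 0:
        print(f"Data quality check failed for {table_name}: zero records")
        return False
    print(f"Data quality check passed for {table_name}: {total:,} records")
    return True


class FakeTable:
    """Stand-in for a Spark DataFrame so the sketch runs without Spark."""

    def __init__(self, rows):
        self._rows = rows

    def count(self):
        return self._rows


check_table_not_empty(FakeTable(3), "visa_type_dim")
check_table_not_empty(FakeTable(0), "empty_dim")
```

A stricter variant could raise an exception on failure so that a scheduled run stops instead of continuing with an empty table.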

### Step 5: Complete Project Write Up

#### Rationale for the choice of tools and technologies for the project
 
 Apache Spark was the main tool used because it can handle files in different formats and large amounts of data. It is a fast, unified analytics engine for big data.

#### Propose how often the data should be updated and why.
 
 The I94 immigration dataset is updated on a monthly basis, so our data should also be updated monthly.

#### Write a description of how you would approach the problem differently under the following scenarios:
 1. The data was increased by 100x.
     - Spark is highly scalable, so if the data grows we can handle it by increasing the number of cluster nodes.
 2. The data populates a dashboard that must be updated on a daily basis by 7am every day.
     - In this situation, we can use Apache Airflow to schedule and run the data pipelines.
 3. The database needed to be accessed by 100+ people.
     - Amazon Redshift can be used in this scenario: the analytics database can be moved to Amazon Redshift.