# Data Engineering Capstone Project

##### PROJECT SUMMARY: 
The objective of this project was to create an Elt pipeline for I94 immigration, global land temperatures and happiness and Human development index datsets to form an analytics database on immigration events. 

A use case for this analytics database is to find immigration patterns for the US immigration department.

For example, they could try to find answears to questions such as,

- Do people from countries with warmer or cold climate immigrate to the US in large numbers?

- Do people come from developed countries?

- Does Freedom and Human development imply in the number of people coming in to the us?

In [None]:
import pandas as pd
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.functions import from_unixtime, to_timestamp
from pyspark.sql.functions import *
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import monotonically_increasing_id
import datetime as dt
from datetime import timedelta, datetime
date_format = "%Y-%m-%d"

In [3]:
config = configparser.ConfigParser()
config.read('credentials.cfg')

os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']

In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

In [5]:
input_data = "s3://capstoneprojectsiva/"
output_data = "s3://capstoneprojectsiva/"

immigration_file_name = 'i94_apr16_sub.sas7bdat'
temperature_file_name = 'GlobalLandTemperaturesByCity.csv'
happiness_development_file_name = 'happiness_and_development.csv'

country_code_file = input_data + "i94cit.csv"
visa_code_file = input_data + "i94visa.csv"

In [6]:
df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [7]:
def clean_spark_immigration_data(df):
    """Clean immigration dataframe
    :param df: spark dataframe with monthly immigration data
    :return: clean dataframe
    """
    total_records_count = df.count()
    print(f'Total records in dataframe: {total_records_count:,}')
    
    # EDA has shown these columns to exhibit over 90% missing values, and hence we drop them
    drop_columns = ["visapost", "occup", "entdepu", "insnum","count", "entdepa", "entdepd", "matflag", "dtaddto", "biryear", "admnum","fltno", "airline"]

    df = df.drop(*drop_columns)    
    # drop rows where all elements are missing
    df = df.dropna(how='all')
    
    
    return df

In [8]:
df=clean_spark_immigration_data(df_spark)

Total records in dataframe: 3,096,313


In [9]:
df.show(3)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+--------+------+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|dtadfile|gender|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+--------+------+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|    null|  null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|20130811|     M|      F1|
| 15.0|2016.0|   4.0| 101.0| 101.0|    WAS|20545.0|    1.0|     MI|20691.0|  55.0|    2.0|20160401|     M|      B2|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+--------+------+--------+
only showing top 3 rows



In [10]:
def create_immigration_fact_table(spark, df):
    """
     This Function Creates visa dimension table
    
    :param df: spark dataframe of the immigration data
    :output: path to write visa dimension table
    :return: Visa dataframe
    """
    
    @udf()
    def date_diff(date1, date2):
    #Calculates the difference in days between two dates
    
        if date2 is None:
            return None
        else:
            a = datetime.strptime(date1, date_format)
            b = datetime.strptime(date2, date_format)
            delta = b - a
            return delta.days

    date_format = "%Y-%m-%d"
    
    #User defined functions using Spark udf wrapper function to convert SAS dates into string dates in the format YYYY-MM-DD
    convert_sas_udf = udf(lambda x: x if x is None else (timedelta(days=x) + datetime(1960, 1, 1)).strftime(date_format))
    
    df = df.withColumn("Arrival_date", convert_sas_udf(col("arrdate"))) 
    df = df.withColumn("Departure_date", convert_sas_udf(col("depdate")))
    df = df.withColumn('Stay_period', date_diff(col('Arrival_date'), col('Departure_date')))
    df = df.drop(*drop_columns) 
    
    df.show(3)
    #df.write.parquet(output_data + "Immigration_Fact", mode="overwrite")
    
    return df

In [12]:
fact=create_immigration_fact_table(spark,df)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+--------+------+--------+------------+--------------+-----------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|dtadfile|gender|visatype|Arrival_date|Departure_date|Stay_period|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+--------+------+--------+------------+--------------+-----------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|    null|  null|      B2|  2016-04-29|          null|       null|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|20130811|     M|      F1|  2016-04-07|          null|       null|
| 15.0|2016.0|   4.0| 101.0| 101.0|    WAS|20545.0|    1.0|     MI|20691.0|  55.0|    2.0|20160401|     M|      B2|  2016-04-01|    2016-08-25|        146|
+-----+------+------+------+------+-------+-------+-------+-----

In [33]:
def create_temperature_dimension_table(immigration_df):
    """
    This Function Creates visa dimension table
    
    :param df: spark dataframe of the immigration data
    :output: path to write visa dimension table
    :return: Visa dataframe
    """
    temp_df= spark.read.csv("../../data2/GlobalLandTemperaturesByCity.csv",header=True, inferSchema=True)
    temp_df = temp_df.groupby(["Country"])\
                            .agg({"AverageTemperature": "avg", "Latitude": "first", "Longitude": "first"})\
                            .withColumnRenamed('avg(AverageTemperature)', 'Temperature')\
                            .withColumnRenamed('first(Latitude)', 'Latitude')\
                            .withColumnRenamed('first(Longitude)', 'Longitude')
        
    temp_df = temp_df.withColumn('lower_country', lower(temp_df.Country))
    temp_df = temp_df.drop("Country")
    temp_df.createOrReplaceTempView("temp_view")

    country_code = spark.read.csv("Lookup/i94cit.csv", header=True, inferSchema=True)
    country_code.createOrReplaceTempView("countrycode_view")
    
    # create country dimension using SQL
    Final_df=spark.sql(
        """
        SELECT 
        c.code as Country_Code,
        t.lower_country as Country,
        t.Temperature,
        t.Latitude,
        t.Longitude
        from countrycode_view as c
        LEFT JOIN temp_view as t
        on c.country=t.lower_country
        """
    ).distinct()
    
    Final_df = Final_df.dropna(subset=['Temperature'])
    Final_df.show(3)
    

    return Final_df
    
    # write dimension to parquet file
    #temp_df.write.parquet(output_data + "visatype", mode="overwrite")
    

In [35]:
temp_df=create_temperature_dimension_table(df)

+------------+------------+------------------+--------+---------+
|Country_Code|     Country|       Temperature|Latitude|Longitude|
+------------+------------+------------------+--------+---------+
|         348|sierra leone|25.641010910058544|   8.84N|   13.78W|
|         576| el salvador| 25.26285255093977|  13.66N|   90.00W|
|         163|  uzbekistan|11.946573813309117|  40.99N|   72.43E|
+------------+------------+------------------+--------+---------+
only showing top 3 rows



In [12]:
fact_df=create_immigration_fact_table(spark, df)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+--------+------+--------+------------+--------------+-----------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|dtadfile|gender|visatype|Arrival_date|Departure_date|Stay_period|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+--------+------+--------+------------+--------------+-----------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|    null|  null|      B2|  2016-04-29|          null|       null|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|20130811|     M|      F1|  2016-04-07|          null|       null|
| 15.0|2016.0|   4.0| 101.0| 101.0|    WAS|20545.0|    1.0|     MI|20691.0|  55.0|    2.0|20160401|     M|      B2|  2016-04-01|    2016-08-25|        146|
+-----+------+------+------+------+-------+-------+-------+-----

In [13]:
def create_visa_dimension_table(immigration_df):
    """
    This Function Creates visa dimension table
    
    :param df: spark dataframe of the immigration data
    :output: path to write visa dimension table
    :return: Visa dataframe
    """
    visa_code_file="Lookup/i94visa.csv"
    visa_code = spark.read.csv(visa_code_file, header=True, inferSchema=True)
    # Create Visa code view
    visa_code.createOrReplaceTempView("visa_code_view")
    # Create Immiogration view
    immigration_df.createOrReplaceTempView("immigration_view")
    
    # create country dimension using SQL
    visa=spark.sql(
        """
        SELECT 
        v.Stay_Purpose,
        i.visatype,
        i.Stay_period
        from 
        immigration_view as i
        JOIN visa_code_view as v
        on i.i94visa = v.code
        """
    ).distinct()
        
    visa_df = visa.withColumn('visa_key', monotonically_increasing_id())
    visa_df.show(5)
    
    # write dimension to parquet file
    #visa_df.write.parquet(output_data + "visatype", mode="overwrite")

    return visa_df
    

In [14]:
visa_df=create_visa_dimension_table(fact_df)

+------------+--------+-----------+--------+
|Stay_Purpose|visatype|Stay_period|visa_key|
+------------+--------+-----------+--------+
|     Student|      F1|         70|       0|
|    Business|      E2|         23|       1|
|    Business|      B1|         65|       2|
|    Business|      E1|         67|       3|
|    Pleasure|      B2|        113|       4|
+------------+--------+-----------+--------+
only showing top 5 rows



In [1]:
def create_immigration_calendar_dimension(immigration_df):
     """
     This Function Creates visa dimension table
    
    :param df: spark dataframe of the immigration data
    :output: path to write visa dimension table
    :return: Visa dataframe
    """
            
        
     calendar_df=df.select(["Arrival_date","Departure_date","Stay_period"]).distinct()    

     # expand df by adding other calendar columns
     calendar_df = calendar_df.withColumn('arrival_day', dayofmonth('Arrival_date'))
     calendar_df = calendar_df.withColumn('arrival_week', weekofyear('Arrival_date'))
     calendar_df = calendar_df.withColumn('arrival_month', month('Arrival_date'))
     calendar_df = calendar_df.withColumn('arrival_year', year('Arrival_date'))
     calendar_df.show(4)
        
     # create an id field 
     calendar_df = calendar_df.withColumn('id_key', monotonically_increasing_id())

     # write dimension to parquet file
     #calendar_df.write.parquet(output_data + "visatype", mode="overwrite")

     return calendar_df

In [16]:
calendar_df=create_immigration_calendar_dimension(fact_df)

+----------+-----------+------------+-------------+------------+---------------+-----------+
|   arrdate|arrival_day|arrival_week|arrival_month|arrival_year|arrival_weekday|     id_key|
+----------+-----------+------------+-------------+------------+---------------+-----------+
|2016-04-22|         22|          16|            4|        2016|              6| 8589934592|
|2016-04-15|         15|          15|            4|        2016|              6|25769803776|
|2016-04-18|         18|          16|            4|        2016|              2|42949672960|
|2016-04-09|          9|          14|            4|        2016|              7|68719476736|
+----------+-----------+------------+-------------+------------+---------------+-----------+
only showing top 4 rows



In [17]:
def create_country_dimension_table(spark, temperature_df):
    
    country_code = spark.read.csv("Lookup/i94cit.csv", header=True, inferSchema=True)
    combined = country_code.join(temperature_df, country_code.country == temperature_df.lower_country, how="right")
    combined.createOrReplaceTempView("countrycode_temp_view")

    happiness_development = spark.read.csv("Lookup/happiness_and_development.csv", header=True, inferSchema=True)
    happiness_development.createOrReplaceTempView("H_D")
    
    # create country dimension using SQL
    Final_df=spark.sql(
        """
        SELECT 
        t.code Country_Code,
        h.*
        from countrycode_temp_view as t
        LEFT JOIN H_D as h
        on t.lower_country=h.country
        """
    ).distinct()
    
    Final_df.show(3)
    # write the dimension to a parquet file
    #Final_country_df.write.parquet(output_data + "country", mode="overwrite")

    return Final_df

In [18]:
Final_df=create_country_dimension_table(spark, temp_df)

+------------+---------+------------------+--------+---------+---------+--------------+-----------------------+----------------------------+-------------------------+---------------------+
|Country_Code|  Country|       Temperature|Latitude|Longitude|  Country|Social support|Healthy life expectancy|Freedom to make life choices|Perceptions of corruption|HumanDevelopmentIndex|
+------------+---------+------------------+--------+---------+---------+--------------+-----------------------+----------------------------+-------------------------+---------------------+
|         689|   brazil|21.902762979774167|  12.05S|   37.81W|   brazil|         0.882|                 66.601|                       0.804|                    0.756|                0.759|
|         438|australia| 16.70146214247643|  34.56S|  138.16E|australia|          0.94|                   73.9|                       0.914|                    0.442|                0.939|
|         369| ethiopia| 20.61152529673101|   8.84N|   

In [19]:
Final_df.show(20)

+------------+--------------------+------------------+--------+---------+------------+--------------+-----------------------+----------------------------+-------------------------+---------------------+
|Country_Code|             Country|       Temperature|Latitude|Longitude|     Country|Social support|Healthy life expectancy|Freedom to make life choices|Perceptions of corruption|HumanDevelopmentIndex|
+------------+--------------------+------------------+--------+---------+------------+--------------+-----------------------+----------------------------+-------------------------+---------------------+
|         689|              brazil|21.902762979774167|  12.05S|   37.81W|      brazil|         0.882|                 66.601|                       0.804|                    0.756|                0.759|
|         438|           australia| 16.70146214247643|  34.56S|  138.16E|   australia|          0.94|                   73.9|                       0.914|                    0.442|        