# Project Title
### Data Engineering Capstone Project

#### Project Summary

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import os
import glob

from pyspark.sql import SparkSession, SQLContext, GroupedData
from pyspark.sql.functions import *
from uscity_state_codes import us_state_abbrev, state_code_udf, abbrev_state, code_state_udf, city_codes, city_code_udf
from country_codes import country_udf

In [2]:
# Create Spark session with SAS7BDAT jar
spark = SparkSession\
.builder \
.config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11") \
.enableHiveSupport().getOrCreate()

#Build SQL context object
sqlContext = SQLContext(spark)

### Step 1: Scope the Project and Gather Data

#### Scope 
In this project, we clean the I94 immigration data (destination state, port-of-entry city, country of origin, and visa type) to form the first dimension table. The second dimension table holds monthly average temperatures for U.S. cities. The two tables are joined on port-of-entry city and month to form the fact table, which counts immigration records by origin, destination, and month. The final database is optimized for queries on immigration events, to help determine whether temperature affects the choice of destination city. Spark is used to process the data.

#### Describe and Gather Data 
Describe the data sets you're using. Where did they come from? What type of information is included?

**I94 Immigration Data:** comes from the [US National Tourism and Trade Office](https://travel.trade.gov/research/reports/i94/historical/2016.html) and includes details on incoming immigrants and their ports of entry.

**World Temperature Data:** comes from [Kaggle](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data) (Berkeley Earth) and includes monthly average land temperatures for cities around the world. This project uses only the U.S. records.

In [3]:
# Read in the data here
# Read April 2016 I94 immigration data into a Spark DataFrame with the spark-sas7bdat reader
df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

# Read the city temperature CSV; without inferSchema, every column is loaded as a string
temperatureData = spark.read.format("csv").option("header", "true").load("GlobalLandTemperaturesByCity.csv")

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.
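
A first pass at this exploration is to count missing values per column and exact duplicate rows. The plain-Python sketch below does this on a list of dicts, for example a small sample collected from a DataFrame, so it runs without Spark. The function name `profile_records`, the rule that `None`, `""`, and `"NaN"` count as missing, and the sample rows are all assumptions for illustration.

```python
from collections import Counter
from typing import Any, Dict, List

MISSING_MARKERS = (None, "", "NaN")  # assumed missing-value conventions


def profile_records(rows: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Count missing values per column and exact duplicate rows."""
    missing: Counter = Counter()
    for row in rows:
        for column, value in row.items():
            if value in MISSING_MARKERS:
                missing[column] += 1
    # Dicts are unhashable, so key each row by its sorted (column, value) pairs.
    row_counts = Counter(tuple(sorted(row.items())) for row in rows)
    duplicates = sum(count - 1 for count in row_counts.values())
    return {"rows": len(rows), "missing": dict(missing), "duplicates": duplicates}


# Hypothetical sample rows shaped like a few I94 columns.
sample = [
    {"cicid": 7.0, "i94addr": "AL", "i94res": 101.0},
    {"cicid": 8.0, "i94addr": None, "i94res": 102.0},
    {"cicid": 7.0, "i94addr": "AL", "i94res": 101.0},
]
print(profile_records(sample))
```

In Spark, the same profile would usually be computed with per-column aggregations and by comparing `df.count()` with `df.dropDuplicates().count()`.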

#### Cleaning Steps
Document steps necessary to clean the data

### Immigration Data by City with Origin

In [8]:
# Keep rows with a known destination state, residence country, and port of entry,
# cast year and month to integers, then map the codes to readable names.
# country_udf, code_state_udf, and city_code_udf were built from the I94 SAS Labels Descriptions file.
i94_data = df_spark.filter(col("i94addr").isNotNull())\
.filter(col("i94res").isNotNull())\
.withColumn("i94yr", col("i94yr").cast("integer"))\
.withColumn("i94mon", col("i94mon").cast("integer"))\
.filter(col("i94addr").isin(list(abbrev_state.keys())))\
.filter(col("i94port").isin(list(city_codes.keys())))\
.withColumn("city_port_name", city_code_udf(col("i94port")))\
.withColumn("dest_state_name", code_state_udf(col("i94addr")))\
.withColumn("origin_country", country_udf(col("i94res")))
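
The helper modules `uscity_state_codes` and `country_codes` are not shown in this notebook. If each holds a dictionary built from the I94 SAS Labels Descriptions file plus a lookup function, the cleaning cell above reduces to the plain-Python logic below. The two mappings are small illustrative subsets, and `lookup` and `clean_i94` are hypothetical names. The origin-country lookup would follow the same pattern. In PySpark, a function like `lookup` would be wrapped with `pyspark.sql.functions.udf(..., StringType())`.

```python
from typing import Dict, Iterable, Iterator, Optional

# Illustrative subsets only; the real mappings are assumed to come from
# the I94 SAS Labels Descriptions file.
abbrev_state: Dict[str, str] = {"AL": "Alabama", "MA": "Massachusetts"}
city_codes: Dict[str, str] = {"NYC": "NEW YORK", "ATL": "ATLANTA"}


def lookup(mapping: Dict[str, str], code: Optional[str]) -> Optional[str]:
    """Return the label for a code, or None (null in Spark) if missing or unknown."""
    if code is None:
        return None
    return mapping.get(code)


def clean_i94(rows: Iterable[dict]) -> Iterator[dict]:
    """Drop rows with null or unknown codes, cast year/month, attach names."""
    for row in rows:
        if row.get("i94res") is None:
            continue
        if row.get("i94addr") not in abbrev_state or row.get("i94port") not in city_codes:
            continue
        yield {
            **row,
            "i94yr": int(row["i94yr"]),
            "i94mon": int(row["i94mon"]),
            "dest_state_name": lookup(abbrev_state, row["i94addr"]),
            "city_port_name": lookup(city_codes, row["i94port"]),
        }
```

Filtering on the dictionary keys before the lookup, as the Spark cell does with `isin`, ensures that the name columns are never null for the rows that are kept.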


In [9]:
new_I94_Data=i94_data.select("cicid",col("i94yr").alias("year"),col("i94mon").alias("month"),\
                            "dest_state_name","city_port_name", "origin_country","visatype")

In [62]:
new_I94_Data.show(5, truncate=False)

+-----+----+-----+---------------+----------------------+--------------+--------+
|cicid|year|month|dest_state_name|city_port_name        |origin_country|visatype|
+-----+----+-----+---------------+----------------------+--------------+--------+
|7.0  |2016|4    |Alabama        |ATLANTA               |SOUTH KOREA   |F1      |
|15.0 |2016|4    |Michigan       |WASHINGTON DC         |ALBANIA       |B2      |
|16.0 |2016|4    |Massachusetts  |NEW YORK              |ALBANIA       |B2      |
|17.0 |2016|4    |Massachusetts  |NEW YORK              |ALBANIA       |B2      |
|18.0 |2016|4    |Michigan       |NEW YORK              |ALBANIA       |B1      |
+-----+----+-----+---------------+----------------------+--------------+--------+
only showing top 5 rows



### Temperature Data by City 

In [34]:
# Performing cleaning tasks here

# Keep only U.S. rows for 2013, extract year and month, and convert Celsius to Fahrenheit (F = C * 9/5 + 32).
# The 2013 temperatures stand in for the 2016 immigration period, so the fact table joins on city and month only.
# Duplicates are dropped in the next cell.
usTemperatures = temperatureData.filter(temperatureData["country"] == "United States")\
.filter(year(temperatureData["dt"]) == 2013)\
.withColumn("year", year(temperatureData["dt"]))\
.withColumn("month", month(temperatureData["dt"]))\
.withColumn("avg_temp_fahrenheit", temperatureData["AverageTemperature"] * 9 / 5 + 32)\
.withColumn("city", temperatureData["City"])
#.filter(temperatureData["AverageTemperature"] != 'NaN')\
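
The temperature cleaning can be checked by hand with the plain-Python sketch below. It filters to one country and year, converts with F = C × 9/5 + 32, rounds to one decimal, and de-duplicates. Unlike the Spark cell as written, it also skips blank readings, which is what the commented-out `NaN` filter appears to intend. The column names `dt`, `AverageTemperature`, `City`, and `Country` come from the notebook. The sample CSV text and the function names are made up.

```python
import csv
import io
from typing import Dict, List

COLUMNS = ("year", "month", "avg_temp_celcius", "avg_temp_fahrenheit", "city", "country")


def celsius_to_fahrenheit(celsius: float) -> float:
    """F = C * 9/5 + 32, the conversion used in the Spark cell."""
    return celsius * 9 / 5 + 32


def clean_temperatures(csv_text: str, country: str = "United States",
                       year: int = 2013) -> List[Dict[str, object]]:
    """Filter to one country and year, skip missing readings, add Fahrenheit, dedupe."""
    rows: List[Dict[str, object]] = []
    seen = set()
    for rec in csv.DictReader(io.StringIO(csv_text)):
        if rec["Country"] != country or not rec["dt"].startswith(f"{year}-"):
            continue
        if rec["AverageTemperature"] in ("", "NaN"):
            continue  # assumed encodings of a missing reading
        celsius = float(rec["AverageTemperature"])
        values = (
            year,
            int(rec["dt"][5:7]),  # dt is assumed to look like YYYY-MM-DD
            round(celsius, 1),
            round(celsius_to_fahrenheit(celsius), 1),
            rec["City"].upper(),
            rec["Country"],
        )
        if values not in seen:  # same effect as dropDuplicates()
            seen.add(values)
            rows.append(dict(zip(COLUMNS, values)))
    return rows


# Made-up sample in the column layout the notebook reads.
sample_csv = """dt,AverageTemperature,City,Country
2013-03-01,0.7,Aurora,United States
2013-03-01,,Bellevue,United States
2012-03-01,5.0,Aurora,United States
2013-03-01,10.0,Paris,France
2013-03-01,0.7,Aurora,United States
"""
print(clean_temperatures(sample_csv))
```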

In [47]:
new_Temperatures=usTemperatures.select("year","month",round(col("AverageTemperature"),1).alias("avg_temp_celcius"),\
                                       round(col("avg_temp_fahrenheit"),1).alias("avg_temp_fahrenheit"),
                                       upper(col("city")).alias("city"),"country").dropDuplicates()

In [63]:
new_Temperatures.show(5)

+----+-----+----------------+-------------------+-----------+-------------+
|year|month|avg_temp_celcius|avg_temp_fahrenheit|       City|      Country|
+----+-----+----------------+-------------------+-----------+-------------+
|2013|    3|             0.7|               33.3|     AURORA|United States|
|2013|    1|             1.0|               33.8|   BELLEVUE|United States|
|2013|    7|            24.7|               76.5|CHATTANOOGA|United States|
|2013|    9|            22.9|               73.3| CHESAPEAKE|United States|
|2013|    7|            25.1|               77.2|   EL MONTE|United States|
+----+-----+----------------+-------------------+-----------+-------------+
only showing top 5 rows



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

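### Star Schema
A star schema keeps the two cleaned sources as dimension tables and stores the joined, aggregated counts in a single fact table, so analysis queries on immigration and temperature can run against one table.

#### Dimension Tables
* immigration_table: year, month, dest_state_name, city_port_name, origin_country, visatype
* city_temperature_table: year, month, avg_temp_celcius, avg_temp_fahrenheit, city, country

#### Fact Table
* immigration_to_cities: year, immig_month, immig_origin, to_immig_city, to_immig_city_count, avg_temp_fahrenheit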
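
One way to pin down this star schema is to write each table as a typed record. The dataclasses below are an illustrative sketch and are not part of the pipeline. The class names and the `column_names` helper are hypothetical. The field types follow the data dictionary in section 4.3: integer and long map to `int`, double to `float`, and string to `str`. Nullable columns are typed `Optional`.

```python
from dataclasses import dataclass, fields
from typing import List, Optional


@dataclass(frozen=True)
class ImmigrationRow:
    """One row of immigration_table."""
    year: Optional[int]
    month: Optional[int]
    dest_state_name: Optional[str]
    city_port_name: Optional[str]
    origin_country: Optional[str]
    visatype: Optional[str]


@dataclass(frozen=True)
class CityTemperatureRow:
    """One row of city_temperature_table."""
    year: Optional[int]
    month: Optional[int]
    avg_temp_celcius: Optional[float]
    avg_temp_fahrenheit: Optional[float]
    city: Optional[str]
    country: Optional[str]


@dataclass(frozen=True)
class ImmigrationToCitiesRow:
    """One row of the immigration_to_cities fact table."""
    year: Optional[int]
    immig_month: Optional[int]
    immig_origin: Optional[str]
    to_immig_city: Optional[str]
    to_immig_city_count: int  # nullable = false in the data dictionary
    avg_temp_fahrenheit: Optional[float]


def column_names(table: type) -> List[str]:
    """Column order of a table, taken from its dataclass fields."""
    return [f.name for f in fields(table)]
```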
#### 3.2 Mapping Out Data Pipelines

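1. Dimension tables are created from the cleaned data and registered as temporary views.
2. The fact table is created with a SQL query that joins the dimension tables on port-of-entry city and month and counts immigration records per group.
3. The query result is returned as a Spark DataFrame.
4. The fact table is written out as Parquet files.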
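
Steps 1 to 3 amount to an inner join on (port city, month) followed by a grouped count. The plain-Python sketch below reproduces the logic of the SQL query on lists of dicts so it can be checked by hand. The function name is hypothetical, and the Spark job does this work with `spark.sql`. To mirror the query exactly, `to_immig_city` is filled from `dest_state_name`, and the port city is used as a grouping key but not returned.

```python
from collections import Counter, defaultdict
from typing import Dict, List


def build_fact_table(immigration: List[dict], temperature: List[dict]) -> List[dict]:
    """Join immigration to temperature on (port city, month) and count rows per group."""
    # Index temperature readings by the join key (the build side of a hash join).
    temps: Dict[tuple, List[float]] = defaultdict(list)
    for t in temperature:
        temps[(t["city"], t["month"])].append(t["avg_temp_fahrenheit"])

    # Inner join: immigration rows without a matching temperature are dropped.
    counts: Counter = Counter()
    for m in immigration:
        for temp_f in temps.get((m["city_port_name"], m["month"]), []):
            key = (m["year"], m["month"], m["origin_country"],
                   m["dest_state_name"], m["city_port_name"], temp_f)
            counts[key] += 1

    # ORDER BY origin_country, city_port_name
    ordered = sorted(counts.items(), key=lambda item: (item[0][2], item[0][4]))
    return [
        {
            "year": year,
            "immig_month": month,
            "immig_origin": origin,
            "to_immig_city": state,
            "to_immig_city_count": n,
            "avg_temp_fahrenheit": temp_f,
        }
        for (year, month, origin, state, _port, temp_f), n in ordered
    ]
```

If a city has more than one temperature row for a month, the join multiplies its immigration rows. That is one reason to de-duplicate the temperature table before the join.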

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [53]:

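# Register the cleaned DataFrames as temporary views (the dimension tables)
new_I94_Data.createOrReplaceTempView("immigration")
new_Temperatures.createOrReplaceTempView("temperature")

# Disable automatic broadcast joins (-1) so joins run as shuffle joins,
# which avoids broadcast timeouts during the SQL join and Parquet write.
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")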

In [59]:
# This query will build the fact table by joining to the dimension tables above.
immigration_to_cities = spark.sql("""
    SELECT
        m.year,
        m.month AS immig_month,
        m.origin_country AS immig_origin,
        m.dest_state_name AS to_immig_city,  -- holds the destination state name
        COUNT(m.city_port_name) AS to_immig_city_count,
        t.avg_temp_fahrenheit
    FROM immigration m
    JOIN temperature t
      ON m.city_port_name = t.city AND m.month = t.month
    GROUP BY m.year, m.month, m.origin_country,
             m.dest_state_name, m.city_port_name, t.avg_temp_fahrenheit
    ORDER BY m.origin_country, m.city_port_name
""")

In [60]:
immigration_to_cities.toDF('year', 'immig_month', 'immig_origin', 'to_immig_city', \
          'to_immig_city_count', 'avg_temp_fahrenheit').show(5)

+----+-----------+------------+--------------+-------------------+-------------------+
|year|immig_month|immig_origin| to_immig_city|to_immig_city_count|avg_temp_fahrenheit|
+----+-----------+------------+--------------+-------------------+-------------------+
|2016|          4|  ARGENTINA |     Tennessee|                  2|               59.7|
|2016|          4|  ARGENTINA |      New York|                 16|               59.7|
|2016|          4|  ARGENTINA | Massachusetts|                  2|               59.7|
|2016|          4|  ARGENTINA |    Washington|                  3|               59.7|
|2016|          4|  ARGENTINA |South Carolina|                  1|               59.7|
+----+-----------+------------+--------------+-------------------+-------------------+
only showing top 5 rows



In [61]:
# Write the fact table to Parquet; overwrite mode lets the cell be re-run
immigration_to_cities.write.mode("overwrite").parquet("immigration_to_cities")

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
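
The `quality_check` function in the next cell only verifies that a table is not empty. The three kinds of checks listed above can be expressed as small predicates, sketched below on lists of dicts so they run without Spark. All function names are hypothetical. In the notebook, each check would be a Spark aggregation; for example, uniqueness could compare `df.count()` with `df.select("cicid").distinct().count()`. The count check assumes `expected_total` is the number of joined immigration rows.

```python
from typing import Iterable, List, Sequence, Tuple


def check_not_empty(rows: Sequence[dict]) -> bool:
    """Completeness: the table has at least one row."""
    return len(rows) > 0


def check_unique(rows: Sequence[dict], key: str) -> bool:
    """Integrity: every value of `key` (for example cicid) appears once."""
    values = [row[key] for row in rows]
    return len(values) == len(set(values))


def check_not_null(rows: Sequence[dict], columns: Iterable[str]) -> bool:
    """Integrity: none of the listed columns is missing or None."""
    columns = list(columns)
    return all(row.get(column) is not None for row in rows for column in columns)


def check_count_reconciles(fact_rows: Sequence[dict], count_column: str,
                           expected_total: int) -> bool:
    """Source/count: grouped counts add up to the number of joined source rows."""
    return sum(row[count_column] for row in fact_rows) == expected_total


def failed_checks(results: Sequence[Tuple[str, bool]]) -> List[str]:
    """Return the descriptions of checks that did not pass."""
    return [description for description, passed in results if not passed]
```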
 
Run Quality Checks

In [64]:
# Perform quality checks here

def quality_check(df, description):
    '''
    Input: Spark DataFrame, description of the Spark DataFrame
    
    Output: Print outcome of data quality check
    
    '''
    
    result = df.count()
    if result == 0:
        print("Data quality check failed for {} with zero records".format(description))
    else:
        print("Data quality check passed for {} with {} records".format(description, result))
    return 0

# Perform data quality check
quality_check(new_I94_Data, "immigration table")
quality_check(new_Temperatures, "temperature table")

Data quality check passed for immigration table with 2783521 records
Data quality check passed for temperature table with 2313 records


0

#### 4.3 Data dictionary 

##### The first dimension table will contain events from the I94 immigration data:
year, month, dest_state_name, city_port_name, origin_country, visatype
* year: integer (nullable = true)-Year of Immigration
* month: integer (nullable = true)-Month of Immigration
* dest_state_name: string (nullable = true)-State Name
* city_port_name: string (nullable = true)-City Port Name
* origin_country: string (nullable = true)-Country of Origin
* visatype: string (nullable = true)-Type of visa

##### The second dimension table will contain city temperature data. The columns below will be extracted from the temperature dataframe:

* year: integer (nullable = true)-Temperature Year
* month: integer (nullable = true)-Temperature Month
* avg_temp_celcius: double (nullable = true)-Avg Temperature in Celsius per city and month
* avg_temp_fahrenheit: double (nullable = true)-Avg Temperature in Fahrenheit per city and month
* city: string (nullable = true)-City Name (upper case)
* country: string (nullable = true)-Country Name (United States only)

##### The fact table will contain information from the I94 immigration data joined with the city temperature data on port-of-entry city name and month:

* year: integer (nullable = true)-Year from immigration table
* immig_month: integer (nullable = true)-Month from immigration table
* immig_origin: string (nullable = true)-Country of Origin from immigration table
* to_immig_city: string (nullable = true)-Destination state name (dest_state_name) from immigration table
* to_immig_city_count: long (nullable = false)-Number of immigration records per year, month, origin country, destination state, and port-of-entry city
* avg_temp_fahrenheit: double (nullable = true)-Avg monthly temperature of the port-of-entry city from temperature table

AverageTemperature (source column in the temperature data) = average monthly temperature of the city, in Celsius
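
The data dictionary can also serve as a machine-checkable contract. The sketch below transcribes the fact-table types into a dict and reports every row value that is missing, unexpectedly null, or of the wrong type. The names `FACT_SCHEMA`, `NOT_NULLABLE`, and `schema_violations` are hypothetical. In Spark, the same comparison would normally be made against `df.dtypes` or `df.schema` instead of individual rows.

```python
from typing import Dict, List, Set

# Expected Python types, transcribed from the data dictionary above
# (integer/long -> int, double -> float, string -> str).
FACT_SCHEMA: Dict[str, type] = {
    "year": int,
    "immig_month": int,
    "immig_origin": str,
    "to_immig_city": str,
    "to_immig_city_count": int,
    "avg_temp_fahrenheit": float,
}
NOT_NULLABLE: Set[str] = {"to_immig_city_count"}


def schema_violations(rows: List[dict], schema: Dict[str, type],
                      not_nullable: Set[str]) -> List[str]:
    """List every column value that is missing, unexpectedly null, or mistyped."""
    problems = []
    for i, row in enumerate(rows):
        for column, expected in schema.items():
            if column not in row:
                problems.append(f"row {i}: missing column {column}")
            elif row[column] is None:
                if column in not_nullable:
                    problems.append(f"row {i}: {column} is null")
            elif not isinstance(row[column], expected):
                actual = type(row[column]).__name__
                problems.append(f"row {i}: {column} is {actual}, expected {expected.__name__}")
    return problems
```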

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
    * The data was increased by 100x.
    * The data populates a dashboard that must be updated on a daily basis by 7am every day.
    * The database needed to be accessed by 100+ people.

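1. For this project I used Apache Spark to read, transform, and write the data for further analysis. I chose it for its speed on a dataset of this size (about 2.8 million cleaned April 2016 immigration records) and because it reads the SAS7BDAT files directly through the spark-sas7bdat package.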

2. The data should be updated quarterly, which keeps it reasonably current for government agencies and other organizations.

3. Under the following scenarios, I would approach the problem differently:

* If the data increased by 100x, I would run Spark on a multi-node cluster (for example, Spark on Hadoop YARN through Amazon EMR), with the data stored in distributed storage such as HDFS or Amazon S3.
* If the data must populate a dashboard by 7 am every day, I would use a scheduler such as Apache Airflow to run the ETL pipeline overnight and alert when a run fails or finishes late.
* If the data needs to be accessed by 100+ people, I would serve it through a web app on Amazon AWS backed by a database that scales for many concurrent users, such as Amazon Redshift.
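
For the 7 am dashboard scenario, the schedule follows from simple arithmetic: the latest safe start time is the deadline minus the expected runtime minus a safety margin. The two-hour runtime and 30-minute margin below are hypothetical placeholders; real values would come from measured pipeline runs. The function name `latest_start` is also hypothetical.

```python
from datetime import datetime, time, timedelta


def latest_start(run_date: datetime, expected_runtime: timedelta,
                 safety_margin: timedelta, deadline: time = time(7, 0)) -> datetime:
    """Latest time the ETL can start and still finish before the deadline."""
    deadline_at = datetime.combine(run_date.date(), deadline)
    return deadline_at - expected_runtime - safety_margin


start = latest_start(datetime(2016, 4, 2), timedelta(hours=2), timedelta(minutes=30))
print(start)  # 2016-04-02 04:30:00
```

The resulting time would become the scheduler's start time. Re-checking it as runtimes grow keeps the 7 am deadline realistic.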