# Project Title
### Data Engineering Capstone Project

#### Project Summary
--describe your project at a high level--

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [75]:
# Do all imports and installs here
import pandas as pd
import os
import glob
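from us_state_abbrev import state_udf, abbrev_state, abbrev_state_udf, city_code_udf, city_codes
from immigration_codes import country_udf
from pyspark.sql import SparkSession, SQLContext, GroupedData
# Explicit imports instead of `import *`. Note that round and sum intentionally
# shadow the Python built-ins with their Spark column equivalents.
from pyspark.sql.functions import col, isnull, month, round, substring, sum, year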

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use?

#### Describe and Gather Data 
Describe the data sets you're using. Where did they come from? What type of information is included?

In [76]:
"""
 Build spark session
"""
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

In [77]:
"""
  Build SQL context object
"""
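# SQLContext expects a SparkContext as its first argument; the session is passed second.
sqlContext = SQLContext(spark.sparkContext, spark)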

In [78]:
"""
 Reading in the data
"""
demog=spark.read.format("csv").option("header", "true").option("delimiter", ";").load("us-cities-demographics.csv")
airport=spark.read.format("csv").option("header", "true").load("airport-codes_csv.csv")
temperatureData=spark.read.format("csv").option("header", "true").load("GlobalLandTemperaturesByState.csv")
df_spark =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

In [79]:
"""
  Perform cleaning tasks and data aggregation
"""
usTemperatures=temperatureData.filter(temperatureData["country"]=="United States")\
    .filter(year(temperatureData["dt"])==2013)\
    .withColumn("year",year(temperatureData["dt"]))\
    .withColumn("month",month(temperatureData["dt"]))\
    .withColumn("avg_temp_fahrenheit",temperatureData["AverageTemperature"]*9/5+32)\
    .withColumn("state_abbrev",state_udf(temperatureData["State"]))

clean_Temperatures=usTemperatures.select("year","month",round(col("AverageTemperature"),1).alias("avg_temp_celcius"),\
                                       round(col("avg_temp_fahrenheit"),1).alias("avg_temp_fahrenheit"),
                                       "state_abbrev","State","Country").dropDuplicates()


In [80]:
clean_Temperatures.show(10)

+----+-----+----------------+-------------------+------------+-------------+-------------+
|year|month|avg_temp_celcius|avg_temp_fahrenheit|state_abbrev|        State|      Country|
+----+-----+----------------+-------------------+------------+-------------+-------------+
|2013|    7|            23.4|               74.1|          MA|Massachusetts|United States|
|2013|    3|            -1.9|               28.5|          SD| South Dakota|United States|
|2013|    9|            14.1|               57.4|          ME|        Maine|United States|
|2013|    1|            -1.3|               29.7|          PA| Pennsylvania|United States|
|2013|    9|            25.1|               77.2|          AL|      Alabama|United States|
|2013|    9|            21.0|               69.8|          IL|     Illinois|United States|
|2013|    3|            10.8|               51.4|          MS|  Mississippi|United States|
|2013|    8|            20.5|               68.9|          RI| Rhode Island|United States|
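
The conversion and rounding used for the temperature table can be checked without Spark. The sketch below is plain Python; `to_fahrenheit`, `clean_row`, and the two-entry `STATE_ABBREV` dictionary are hypothetical stand-ins (the real mapping lives in `us_state_abbrev`, which is not shown here).

```python
# Hypothetical stand-in for the mapping behind state_udf (not the real module).
STATE_ABBREV = {"Massachusetts": "MA", "South Dakota": "SD"}


def to_fahrenheit(celsius):
    """Convert Celsius to Fahrenheit, as in AverageTemperature * 9 / 5 + 32."""
    return celsius * 9 / 5 + 32


def clean_row(state, avg_temp_celsius):
    """Return (abbreviation, celsius, fahrenheit), temperatures rounded to one decimal."""
    return (
        STATE_ABBREV.get(state),  # None when the state is not in the mapping
        round(avg_temp_celsius, 1),
        round(to_fahrenheit(avg_temp_celsius), 1),
    )


print(clean_row("Massachusetts", 23.4))
```

As in the notebook, the Fahrenheit value is computed from the unrounded Celsius value and only then rounded, so converting an already rounded Celsius figure can differ in the last digit.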

In [81]:
"""
  Drop rows with a null i94addr or i94res and keep only known state and port codes.
  Then map i94res to the country of origin (country_udf), i94addr to the state name (abbrev_state_udf)
  and i94port to the port city name (city_code_udf).
  country_udf, abbrev_state_udf and city_code_udf were created with data from the SAS label descriptions file.

"""
i94_data=df_spark.filter(df_spark.i94addr.isNotNull())\
    .filter(df_spark.i94res.isNotNull())\
    .filter(col("i94addr").isin(list(abbrev_state.keys())))\
    .filter(col("i94port").isin(list(city_codes.keys())))\
    .withColumn("origin_country",country_udf(df_spark["i94res"]))\
    .withColumn("dest_state_name",abbrev_state_udf(df_spark["i94addr"]))\
    .withColumn("i94yr",col("i94yr").cast("integer"))\
    .withColumn("i94mon",col("i94mon").cast("integer"))\
    .withColumn("city_port_name",city_code_udf(df_spark["i94port"]))

clean_I94_Data=i94_data.select("cicid",col("i94yr").alias("year"),col("i94mon").alias("month"),\
                             "origin_country","i94port","city_port_name",col("i94addr").alias("state_code"),"dest_state_name")

In [82]:
clean_I94_Data.show(10)

+-----+----+-----+--------------+-------+--------------------+----------+---------------+
|cicid|year|month|origin_country|i94port|      city_port_name|state_code|dest_state_name|
+-----+----+-----+--------------+-------+--------------------+----------+---------------+
|  7.0|2016|    4|   SOUTH KOREA|    ATL|  ATLANTA           |        AL|        Alabama|
| 15.0|2016|    4|       ALBANIA|    WAS|WASHINGTON DC    ...|        MI|       Michigan|
| 16.0|2016|    4|       ALBANIA|    NYC|  NEW YORK          |        MA|  Massachusetts|
| 17.0|2016|    4|       ALBANIA|    NYC|  NEW YORK          |        MA|  Massachusetts|
| 18.0|2016|    4|       ALBANIA|    NYC|  NEW YORK          |        MI|       Michigan|
| 19.0|2016|    4|       ALBANIA|    NYC|  NEW YORK          |        NJ|     New Jersey|
| 20.0|2016|    4|       ALBANIA|    NYC|  NEW YORK          |        NJ|     New Jersey|
| 21.0|2016|    4|       ALBANIA|    NYC|  NEW YORK          |        NY|       New York|
| 22.0|201
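
The filter-then-lookup logic of the I94 cleaning cell can be sketched in plain Python. Everything named here is an assumption for illustration: `clean_record` is a hypothetical helper, and the three dictionaries are tiny made-up excerpts (the notebook builds the real ones from the SAS label descriptions file).

```python
# Hypothetical excerpts; the numeric country codes are placeholders, not real I94 codes.
ABBREV_STATE = {"AL": "Alabama", "MI": "Michigan"}
CITY_CODES = {"ATL": "ATLANTA", "NYC": "NEW YORK"}
COUNTRY_CODES = {1: "COUNTRY A", 2: "COUNTRY B"}


def clean_record(record):
    """Return a cleaned record, or None if it fails any of the filters."""
    state = record.get("i94addr")
    port = record.get("i94port")
    res = record.get("i94res")
    if state is None or res is None:
        return None  # mirrors the two isNotNull() filters
    if state not in ABBREV_STATE or port not in CITY_CODES:
        return None  # mirrors the two isin() filters
    return {
        "cicid": record["cicid"],
        "year": int(record["i94yr"]),
        "month": int(record["i94mon"]),
        "origin_country": COUNTRY_CODES.get(int(res)),
        "i94port": port,
        "city_port_name": CITY_CODES[port],
        "state_code": state,
        "dest_state_name": ABBREV_STATE[state],
    }


records = [
    {"cicid": 7.0, "i94yr": 2016.0, "i94mon": 4.0, "i94res": 1.0, "i94port": "ATL", "i94addr": "AL"},
    {"cicid": 8.0, "i94yr": 2016.0, "i94mon": 4.0, "i94res": 1.0, "i94port": "ATL", "i94addr": None},
    {"cicid": 9.0, "i94yr": 2016.0, "i94mon": 4.0, "i94res": 2.0, "i94port": "XXX", "i94addr": "MI"},
]
cleaned = [c for c in map(clean_record, records) if c is not None]
print(cleaned)
```

Only the first toy record survives: the second has no destination state and the third uses a port code that is not in the lookup.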

In [83]:
"""
 Calculate percentages of each numeric column and create new columns.
"""
us_demographic_data=demog\
    .withColumn("Median Age",col("Median Age").cast("float"))\
    .withColumn("pct_male_pop",demog["Male Population"]/demog["Total Population"]*100)\
    .withColumn("pct_female_pop",demog["Female Population"]/demog["Total Population"]*100)\
    .withColumn("pct_veterans",demog["Number of Veterans"]/demog["Total Population"]*100)\
    .withColumn("pct_foreign_born",demog["Foreign-born"]/demog["Total Population"]*100)\
    .withColumn("pct_race",demog["Count"]/demog["Total Population"]*100)\
    .orderBy("State")

In [84]:
"""
  Select columns with new calculated percentages and state names.
"""
us_demographic_data_with_percentage=us_demographic_data.select("State",col("State Code").alias("state_code"),\
     col("Median Age").alias("median_age"),\
     "pct_male_pop",\
     "pct_female_pop",\
     "pct_veterans",\
     "pct_foreign_born",\
     "Race",\
     "pct_race")

In [85]:
"""
 Pivoting the Race column
"""
pivot_us_demographic_data=us_demographic_data_with_percentage.groupBy("State","state_code","median_age","pct_male_pop",\
                                    "pct_female_pop","pct_veterans",\
                                    "pct_foreign_born").pivot("Race").avg("pct_race")

pivot_us_demographic_data=pivot_us_demographic_data.select("State","state_code","median_age","pct_male_pop","pct_female_pop","pct_veterans","pct_foreign_born",\
                                         col("American Indian and Alaska Native").alias("native_american"),\
                                         col("Asian"),col("Black or African-American").alias("Black"),\
                                         col("Hispanic or Latino").alias("hispanic_or_latino"),"White")

In [86]:
"""
 Find the average of each column per state. 
"""
pivot=pivot_us_demographic_data.groupBy("State","state_code").avg("median_age","pct_male_pop","pct_female_pop",\
                                                       "pct_veterans","pct_foreign_born","native_american",\
                                                       "Asian","Black","hispanic_or_latino","White").orderBy("State")

In [87]:
"""
  Round the percentages and fix column names
"""
pivot=pivot.select("State","state_code",round(col("avg(median_age)"),1).alias("median_age"),\
                  round(col("avg(pct_male_pop)"),1).alias("pct_male_pop"),\
                   round(col("avg(pct_female_pop)"),1).alias("pct_female_pop"),\
                   round(col("avg(pct_veterans)"),1).alias("pct_veterans"),\
                   round(col("avg(pct_foreign_born)"),1).alias("pct_foreign_born"),\
                   round(col("avg(native_american)"),1).alias("native_american"),\
                   round(col("avg(Asian)"),1).alias("Asian"),\
                   round(col("avg(hispanic_or_latino)"),1).alias("hispanic_or_latino"),\
                   round(col("avg(Black)"),1).alias("Black"),\
                   round(col('avg(White)'),1).alias('White')
                  )
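
The percentage, pivot, and per-state averaging steps above can be condensed into a small plain-Python sketch. The rows are made up, `pivot_race_by_state` is a hypothetical helper, and the two Spark aggregation stages are collapsed into one pass, which gives the same result only when each city contributes one row per race.

```python
from collections import defaultdict

# Toy rows in the shape of us-cities-demographics.csv (values are made up).
rows = [
    {"State": "Alabama", "City": "A", "Total Population": 1000, "Race": "White", "Count": 500},
    {"State": "Alabama", "City": "A", "Total Population": 1000, "Race": "Asian", "Count": 100},
    {"State": "Alabama", "City": "B", "Total Population": 2000, "Race": "White", "Count": 1500},
]


def pivot_race_by_state(rows):
    """Average pct_race per (state, race), returned as {state: {race: pct}}."""
    shares = defaultdict(list)
    for row in rows:
        pct_race = row["Count"] / row["Total Population"] * 100
        shares[(row["State"], row["Race"])].append(pct_race)

    pivoted = defaultdict(dict)
    for (state, race), values in shares.items():
        # Unweighted mean over cities; cities without the race are skipped.
        pivoted[state][race] = round(sum(values) / len(values), 1)
    return dict(pivoted)


print(pivot_race_by_state(rows))
```

Because the mean is taken over city-level shares without weighting by population, the per-state race columns are not guaranteed to sum to 100.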

In [88]:
pivot.show(10)

+--------------------+----------+----------+------------+--------------+------------+----------------+---------------+-----+------------------+-----+-----+
|               State|state_code|median_age|pct_male_pop|pct_female_pop|pct_veterans|pct_foreign_born|native_american|Asian|hispanic_or_latino|Black|White|
+--------------------+----------+----------+------------+--------------+------------+----------------+---------------+-----+------------------+-----+-----+
|             Alabama|        AL|      36.2|        47.2|          52.8|         6.8|             5.1|            0.8|  2.9|               3.6| 45.0| 52.0|
|              Alaska|        AK|      32.2|        51.2|          48.8|         9.2|            11.1|           12.2| 12.3|               9.1|  7.7| 71.2|
|             Arizona|        AZ|      35.0|        48.8|          51.2|         6.6|            12.6|            2.8|  5.1|              28.8|  6.0| 82.7|
|            Arkansas|        AR|      32.8|        48.4|       

In [89]:
"""
 Filtering the airport data for 'small_airport' in the U.S. and use substring to show state
 Find average elevation per state, select relevant columns and drop duplicates.
"""
airport_data=airport.filter(airport["type"]=="small_airport")\
.filter(airport["iso_country"]=="US")\
.withColumn("iso_region",substring(airport["iso_region"],4,2))\
.withColumn("elevation_ft",col("elevation_ft").cast("float"))


airport_data_elevation=airport_data.groupBy("iso_country","iso_region").avg("elevation_ft")


clean_airport_data=airport_data_elevation.select(col("iso_country").alias("country"),\
                                               col("iso_region").alias("state"),\
                                               round(col("avg(elevation_ft)"),1).alias("avg_elevation_ft")).orderBy("iso_region")

In [90]:
clean_airport_data.show(10)

+-------+-----+----------------+
|country|state|avg_elevation_ft|
+-------+-----+----------------+
|     US|   AK|           545.1|
|     US|   AL|           414.6|
|     US|   AR|           488.4|
|     US|   AZ|          3098.0|
|     US|   CA|          1261.4|
|     US|   CO|          5912.8|
|     US|   CT|           490.3|
|     US|   DE|            47.4|
|     US|   FL|            77.7|
|     US|   GA|           649.5|
+-------+-----+----------------+
only showing top 10 rows
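
The airport aggregation can be reproduced on toy data in plain Python. The rows and the helper name `avg_elevation_by_state` are assumptions for illustration. Spark's `substring(col, 4, 2)` is 1-based, so it corresponds to the Python slice `[3:5]`.

```python
from collections import defaultdict

# Toy rows in the shape of airport-codes_csv.csv (values are made up).
airports = [
    {"type": "small_airport", "iso_country": "US", "iso_region": "US-AK", "elevation_ft": "100"},
    {"type": "small_airport", "iso_country": "US", "iso_region": "US-AK", "elevation_ft": "301"},
    {"type": "heliport", "iso_country": "US", "iso_region": "US-AK", "elevation_ft": "50"},
    {"type": "small_airport", "iso_country": "CA", "iso_region": "CA-BC", "elevation_ft": "10"},
    {"type": "small_airport", "iso_country": "US", "iso_region": "US-AL", "elevation_ft": None},
]


def avg_elevation_by_state(airports):
    """Average elevation of US small airports per state, rounded to one decimal."""
    elevations = defaultdict(list)
    for airport in airports:
        if airport["type"] != "small_airport" or airport["iso_country"] != "US":
            continue
        state = airport["iso_region"][3:5]  # "US-AK" -> "AK"
        values = elevations[state]  # registers the state even if elevation is missing
        if airport["elevation_ft"] is not None:  # like Spark's avg, skip missing values
            values.append(float(airport["elevation_ft"]))
    return {
        state: round(sum(values) / len(values), 1) if values else None
        for state, values in elevations.items()
    }


print(avg_elevation_by_state(airports))
```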



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [91]:
"""
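 Register the cleaned DataFrames as temp views (the dimension tables) and set
 spark.sql.autoBroadcastJoinThreshold to 0, which in practice keeps Spark from
 broadcasting tables during the joins (the documented value for disabling broadcasts is -1).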
"""
clean_I94_Data.createOrReplaceTempView("immigration")
pivot.createOrReplaceTempView("demographics")
clean_airport_data.createOrReplaceTempView("airport")
clean_Temperatures.createOrReplaceTempView("temperature")
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "0")

In [92]:
"""
 Creating the fact table by joining to the dimension tables above and counting how many people immigrated to each state in the U.S.
"""
immigration_to_states=spark.sql("""SELECT 
                                    m.year,
                                    m.month AS immig_month,
                                    m.origin_country AS immig_origin,
                                    m.dest_state_name AS to_immig_state,
                                    COUNT(m.state_code) AS to_immig_state_count,
                                    t.avg_temp_fahrenheit,
                                    a.avg_elevation_ft,
                                    d.pct_foreign_born,
                                    d.native_american,
                                    d.Asian,
                                    d.hispanic_or_latino,
                                    d.Black,
                                    d.White
                                    
                                    FROM immigration m JOIN temperature t ON m.state_code=t.state_abbrev AND m.month=t.month
                                    JOIN demographics d ON d.state_code=t.state_abbrev
                                    JOIN airport a ON a.state=t.state_abbrev
                                    
                                    GROUP BY m.year, m.month, m.origin_country,
                                    m.dest_state_name, m.state_code, t.avg_temp_fahrenheit, a.avg_elevation_ft,
                                    d.pct_foreign_born, d.native_american,
                                    d.Asian, d.hispanic_or_latino,
                                    d.White, d.Black
                                    
                                    ORDER BY m.origin_country,m.state_code
                                    """)

In [93]:
immigration_to_states.toDF('year', 'immig_month', 'immig_origin', 'to_immig_state', \
          'to_immig_state_count', 'avg_temp_fahrenheit', 'avg_elevation_ft',\
          'pct_foreign_born', 'native_american', 'Asian', 'hispanic_or_latino', 'Black', 'White').show(10)

+----+-----------+------------+--------------+--------------------+-------------------+----------------+----------------+---------------+-----+------------------+-----+-----+
|year|immig_month|immig_origin|to_immig_state|to_immig_state_count|avg_temp_fahrenheit|avg_elevation_ft|pct_foreign_born|native_american|Asian|hispanic_or_latino|Black|White|
+----+-----------+------------+--------------+--------------------+-------------------+----------------+----------------+---------------+-----+------------------+-----+-----+
|2016|          4| AFGHANISTAN|       Arizona|                   1|               59.3|          3098.0|            12.6|            2.8|  5.1|              28.8|  6.0| 82.7|
|2016|          4| AFGHANISTAN|    California|                  34|               58.5|          1261.4|            27.6|            1.7| 17.9|              37.8|  7.5| 62.7|
|2016|          4| AFGHANISTAN|      Colorado|                   3|               39.7|          5912.8|             9.6|    
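
The join-and-count pattern of the fact-table query can be tried on toy data with the standard-library `sqlite3` module. This is only a sketch under stated assumptions: the table contents are made up, only the temperature dimension is joined, and SQLite stands in for Spark SQL.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE immigration (cicid REAL, month INTEGER, origin_country TEXT, state_code TEXT);
CREATE TABLE temperature (state_abbrev TEXT, month INTEGER, avg_temp_fahrenheit REAL);
INSERT INTO immigration VALUES
    (1, 4, 'ALBANIA', 'NY'), (2, 4, 'ALBANIA', 'NY'),
    (3, 4, 'ALBANIA', 'NJ'), (4, 4, 'ALBANIA', 'GU');
INSERT INTO temperature VALUES ('NY', 4, 45.0), ('NJ', 4, 50.0), ('NY', 5, 58.0);
""")

# Inner join on state and month, then count arrivals per origin and state.
rows = conn.execute("""
    SELECT m.origin_country, m.state_code,
           COUNT(m.state_code) AS arrivals, t.avg_temp_fahrenheit
    FROM immigration m
    JOIN temperature t ON m.state_code = t.state_abbrev AND m.month = t.month
    GROUP BY m.origin_country, m.state_code, t.avg_temp_fahrenheit
    ORDER BY m.origin_country, m.state_code
""").fetchall()
conn.close()
print(rows)
```

The toy row with state code `GU` has no temperature match, so the inner join drops it and the counts add up to 3 rather than 4. An inner join against each dimension table behaves the same way in Spark SQL.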

In [95]:
"""
 Write fact table to parquet
"""
immigration_to_states.write.parquet("immigration_to_states")

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
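
The first two kinds of check listed above can be sketched as small plain-Python helpers. `check_no_nulls` and `check_unique` are hypothetical names, and the rows are toy data in the shape of the fact table's grouping columns.

```python
def check_no_nulls(rows, columns):
    """Return the columns that contain at least one None value."""
    return [c for c in columns if any(r.get(c) is None for r in rows)]


def check_unique(rows, key_columns):
    """Return the key tuples that occur more than once."""
    seen, dupes = set(), set()
    for row in rows:
        key = tuple(row[c] for c in key_columns)
        (dupes if key in seen else seen).add(key)
    return sorted(dupes)


KEY = ["year", "immig_month", "immig_origin", "to_immig_state"]
fact_rows = [
    {"year": 2016, "immig_month": 4, "immig_origin": "ALBANIA", "to_immig_state": "New York"},
    {"year": 2016, "immig_month": 4, "immig_origin": "ALBANIA", "to_immig_state": "New Jersey"},
]

print(check_no_nulls(fact_rows, KEY))
print(check_unique(fact_rows, KEY))
```

Both helpers return an empty list when the check passes, so a pipeline step could fail fast on any non-empty result.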
 
Run Quality Checks

In [96]:
"""
 Perform quality checks here
 Check for NULL values in year, month, origin_country, to_immig_state
"""
immigration_to_states.select(isnull('year').alias('year'),\
                             isnull('immig_month').alias('month'),\
                             isnull('immig_origin').alias('country'),\
                             isnull('to_immig_state').alias('state')).dropDuplicates().show()

+-----+-----+-------+-----+
| year|month|country|state|
+-----+-----+-------+-----+
|false|false|  false|false|
+-----+-----+-------+-----+



In [97]:
"""
 Count the total number of immigrants to the U.S. in the fact table.
"""
immigration_to_states.select(sum('to_immig_state_count').alias('fact_table_count')).show()

+----------------+
|fact_table_count|
+----------------+
|         2780777|
+----------------+



In [98]:
"""
 Count the total number of immigrants from the source data
"""
spark.sql('SELECT COUNT(*) FROM immigration').show()

+--------+
|count(1)|
+--------+
| 2783521|
+--------+
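
The fact-table total (2780777) and the source total (2783521) printed above are not equal. A likely but unverified explanation is that the inner joins drop immigration rows with no match in a dimension table. The sketch below quantifies the gap; `reconcile_counts` and its 0.1% tolerance are hypothetical choices, not part of the original checks.

```python
def reconcile_counts(source_count, fact_count, max_loss=0.001):
    """Compare row counts and flag losses above max_loss (a hypothetical tolerance)."""
    missing = source_count - fact_count
    loss = missing / source_count
    return {"missing": missing, "loss": loss, "ok": 0 <= loss <= max_loss}


result = reconcile_counts(source_count=2783521, fact_count=2780777)
print(result["missing"], result["ok"])  # prints: 2744 True
```

A stricter check would list the state codes of the dropped rows instead of only counting them.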



#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

# **Data Dictionary Dimension Tables**
## Airport Data by State
 * country: string (nullable = true)-United States
 * state: string (nullable = true)-State of the U.S.
 * avg_elevation_ft: double (nullable = true)-Average elevation of the small airports in each state.
 
## U.S. Demographic by State
 * State: string (nullable = true)-Full state name
 * state_code: string (nullable = true)-Abbreviated state code
 * median_age: double (nullable = true)-Median Age per state
 * pct_male_pop: double (nullable = true)-% Avg Male population per state
 * pct_female_pop: double (nullable = true)-% Avg Female population per state
 * pct_veterans: double (nullable = true)-% Avg Veteran population per state
 * pct_foreign_born: double (nullable = true)-% Avg Foreign-Born population per state
 * native_american: double (nullable = true)-% Avg Native American population per state
 * Asian: double (nullable = true)-% Avg Asian population per state
 * hispanic_or_latino: double (nullable = true)-% Avg Hispanic or Latino population per state
 * Black: double (nullable = true)-% Avg Black population per state
 * White: double (nullable = true)-% Avg White population per state
 
## Immigration Data by State with Origin
 * cicid: double (nullable = true)-ID number of each individual
 * year: integer (nullable = true)-Year of Immigration
 * month: integer (nullable = true)-Month of Immigration
 * origin_country: string (nullable = true)-Country of Origin
 * i94port: string (nullable = true)-City Port Code where Immigrant entered
 * city_port_name: string (nullable = true)-City Port Name
 * state_code: string (nullable = true)-Abbreviated State code
 * dest_state_name: string (nullable = true)-State Name

## Temperature Data by State
 * year: integer (nullable = true)-Temperature Year
 * month: integer (nullable = true)-Temperature Month
 * avg_temp_celcius: double (nullable = true)-Avg Temperature in Celsius per State
 * avg_temp_fahrenheit: double (nullable = true)-Avg Temperature in Fahrenheit
 * state_abbrev: string (nullable = true)-Abbreviated State Code
 * State: string (nullable = true)-State Name
 * Country: string (nullable = true)-United States

# Fact Table
 * year: integer (nullable = true)-Year from immigration table
 * immig_month: integer (nullable = true)-Month from immigration table
 * immig_origin: string (nullable = true)-Country of Origin from immigration table
 * to_immig_state: string (nullable = true)-State immigrated to from immigration table
 * to_immig_state_count: long (nullable = false)-Total count of people immigrated per state from immigration table
 * avg_temp_fahrenheit: double (nullable = true)-Avg temperature per state from Temperature table
 * avg_elevation_ft: double (nullable = true)-Avg elevation per state from Airport table
 * pct_foreign_born: double (nullable = true)-Avg % foreign born from Demographic table
 * native_american: double (nullable = true)-Avg % Native American population from Demographic table
 * Asian: double (nullable = true)-Avg % Asian population from Demographic table
 * hispanic_or_latino: double (nullable = true)-% Avg Hispanic or Latino population per state from Demographic table
 * Black: double (nullable = true)-% Avg Black population per state from Demographic table
 * White: double (nullable = true)-% Avg White population per state from Demographic table

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

1. I used Apache Spark to prepare the data for analysis.

2. The data should be updated quarterly.

3. Under the scenarios above, I would approach the problem differently:

* If the data grew by 100x, I would run Apache Spark on top of HDFS to parallelize the data processing tasks.
* For a dashboard that must be updated by 7am every day, I suggest Apache Airflow for orchestration.
* To serve 100+ users as the project gets bigger, I suggest a cloud infrastructure (IBM, AWS, GCP or Azure) to handle the demand.

