# Project Title
### Data Engineering Capstone Project

#### Project Summary
The project consists of developing a fact table from the data sources provided with the default project. The technologies involved are Pandas, Spark, fbprophet and Matplotlib.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import itertools
import warnings
warnings.filterwarnings("ignore")
import numpy as np
import matplotlib.pyplot as plt
plt.style.use('fivethirtyeight')
import json
import statsmodels.api as sm
from fbprophet import Prophet
import matplotlib
matplotlib.rcParams['axes.labelsize'] = 14
matplotlib.rcParams['xtick.labelsize'] = 12
matplotlib.rcParams['ytick.labelsize'] = 12
matplotlib.rcParams['text.color'] = 'k'

ERROR:fbprophet:Importing plotly failed. Interactive plots will not work.


In [2]:
# Load the lookup dictionaries; the context manager closes the file handle
with open('utils.json', 'r') as f:
    util_data = json.load(f)
us_state_code = util_data['us_state_code']
us_code_state = {v: k for k, v in us_state_code.items()}
city_codes = util_data['city_codes']
immigration_codes = util_data['immigration_codes']

In [3]:
# Write code here
from pyspark.sql import SparkSession, SQLContext, GroupedData
from pyspark.sql.functions import *
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

In [4]:
sqlContext = SQLContext(spark)
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "0")

In [5]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

state_codeUDF=udf(lambda x: us_state_code[x],StringType())
code_stateUDF=udf(lambda x: us_code_state[x],StringType())
city_codeUDF=udf(lambda x:city_codes[x],StringType())
country_codeUDF=udf(lambda x: immigration_codes[x],StringType())
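
The lambdas above raise a `KeyError` inside the Spark executor whenever a code is missing from its lookup dictionary. A minimal sketch of a more forgiving lookup, assuming that returning `None` (a SQL NULL) for unknown codes is acceptable. `make_lookup` and the sample dictionary are hypothetical names used for illustration, not part of the notebook:

```python
from typing import Callable, Dict, Optional


def make_lookup(mapping: Dict[str, str]) -> Callable[[Optional[str]], Optional[str]]:
    """Build a lookup function that returns None instead of raising KeyError."""
    def lookup(key: Optional[str]) -> Optional[str]:
        # None models a NULL column value; unknown keys also map to None.
        if key is None:
            return None
        return mapping.get(key)
    return lookup


# Hypothetical two-entry sample of the state-name -> state-code dictionary.
sample_state_code = {"Alabama": "AL", "Alaska": "AK"}
safe_state_code = make_lookup(sample_state_code)

print(safe_state_code("Alabama"))   # known key
print(safe_state_code("Atlantis"))  # unknown key, prints None
```

A function built this way could be wrapped with `udf(..., StringType())` in place of the bare lambdas, so that unmapped codes surface as nulls that a later quality check can count.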

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use?

This project pulls data from all four sources and builds fact and dimension tables that show immigration movement by U.S. state.


#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included?
* I94 Immigration Data: comes from the US National Tourism and Trade Office and includes details on incoming immigrants and their ports of entry.
* World Temperature Data: comes from Kaggle and includes data on temperature changes in the U.S. since 1850.
* U.S. City Demographic Data: comes from OpenDataSoft and includes data by city, state, age, population, veteran status and race.
* Airport Code Table: comes from datahub.io and includes airport codes and corresponding cities.

In [None]:
# Read in the data here
df_airport = pd.read_csv("airport-codes_csv.csv")
df_demographics = pd.read_csv("us-cities-demographics.csv",sep=';')
df_temperature = pd.read_csv("GlobalLandTemperaturesByState.csv")

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.
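
A minimal sketch of the two checks named above (missing values and duplicate rows), written with the standard library only. `profile_rows` and the sample rows are hypothetical; the values are invented and only mimic the shape of the temperature file:

```python
from collections import Counter
from typing import Dict, List, Optional, Tuple

Row = Dict[str, Optional[str]]


def profile_rows(rows: List[Row]) -> Tuple[Dict[str, int], int]:
    """Return (missing-value count per column, number of duplicate rows)."""
    missing: Counter = Counter()
    seen = set()
    duplicates = 0
    for row in rows:
        for column, value in row.items():
            # Treat None and empty strings as missing.
            if value is None or value == "":
                missing[column] += 1
        # Sort by column name so that key order does not affect equality.
        key = tuple(sorted(row.items(), key=lambda item: item[0]))
        if key in seen:
            duplicates += 1
        seen.add(key)
    return dict(missing), duplicates


# Invented rows shaped like the temperature file.
sample_rows = [
    {"dt": "2013-08-01", "AverageTemperature": "27.1", "State": "Alabama"},
    {"dt": "2013-09-01", "AverageTemperature": "", "State": "Alabama"},
    {"dt": "2013-08-01", "AverageTemperature": "27.1", "State": "Alabama"},
]
missing_counts, duplicate_count = profile_rows(sample_rows)
print(missing_counts, duplicate_count)
```

On the pandas DataFrames loaded in Step 1, `df.isnull().sum()` and `df.duplicated().sum()` report the same two figures.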

#### Cleaning Steps
Document steps necessary to clean the data

### Temperature Data by State

In [None]:
df_temperature.head()

In [None]:
# Performing cleaning tasks here
import datetime
df_temperature['dt'] = pd.to_datetime(df_temperature['dt'])
df_temperature['year'] = df_temperature['dt'].dt.year
df_temperature['month'] = df_temperature['dt'].dt.month
df_temperature['y'] = df_temperature['AverageTemperature']
df_temperature['ds'] = df_temperature['dt']

In [None]:
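# Keep U.S. rows after 1900; .copy() avoids SettingWithCopyWarning on later column assignments
us_df_temperature = df_temperature[(df_temperature["Country"]=="United States")&(df_temperature['year'] > 1900)].copy()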

In [None]:
us_df_temperature['state_code'] = us_df_temperature.apply(lambda row: us_state_code[row["State"]],axis=1)
us_df_temperature = us_df_temperature[['dt','AverageTemperature','State','Country','year','month','state_code','y','ds']]


In [None]:
us_df_temperature.head(20)

In [None]:
# Both masks must come from us_df_temperature so that their indexes align
alabama_temperature_df = us_df_temperature[(us_df_temperature["state_code"]=="AL")&(us_df_temperature['year'] > 2000)].copy()

In [None]:
alabama_temperature_df.head()

In [None]:
alabama_temperature_df['ds'] = alabama_temperature_df['dt']
alabama_temperature_df['y'] = alabama_temperature_df['AverageTemperature']

In [None]:
alabama_temperature_df.head(20)

In [None]:
temperature_model = Prophet(interval_width=0.95)
temperature_model.fit(alabama_temperature_df)

In [None]:
temperature_forecast = temperature_model.make_future_dataframe(periods=50, freq='MS')
temperature_forecast = temperature_model.predict(temperature_forecast)
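
`make_future_dataframe(periods=50, freq='MS')` extends the training dates by 50 month-start timestamps. The sketch below reproduces that date arithmetic with the standard library only, to show how far the horizon reaches. The last observed date used here (September 2013) is an assumption about the temperature file, and `future_month_starts` is a hypothetical helper, not a Prophet function:

```python
from datetime import date
from typing import List


def future_month_starts(last_observed: date, periods: int) -> List[date]:
    """Return the `periods` month-start dates that follow `last_observed`."""
    # Count months from year 0 so that rolling over December is plain arithmetic.
    base = last_observed.year * 12 + (last_observed.month - 1)
    result = []
    for offset in range(1, periods + 1):
        year, month_index = divmod(base + offset, 12)
        result.append(date(year, month_index + 1, 1))
    return result


# Assumed last observation: 2013-09-01.
horizon = future_month_starts(date(2013, 9, 1), 50)
print(horizon[0], horizon[-1])
```

Under that assumption the horizon covers all twelve months of 2016, the year of the immigration files.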

In [None]:
plt.figure(figsize=(18, 6))
temperature_model.plot(temperature_forecast, xlabel = 'Date', ylabel = 'AverageTemperature')
plt.title('Date AverageTemperature');

In [None]:
temperature_forecast

In [None]:
def get_temperature_prediction(df):
    temperature_model = Prophet(interval_width=0.95)
    temperature_model.fit(df)
    temperature_forecast = temperature_model.make_future_dataframe(periods=50, freq='MS')
    temperature_forecast = temperature_model.predict(temperature_forecast)
    temperature_forecast['year'] = temperature_forecast['ds'].dt.year
    temperature_forecast['month'] = temperature_forecast['ds'].dt.month
    temperature_forecast['AverageTemperature'] = temperature_forecast['yhat']
    return temperature_forecast[['year','month','AverageTemperature']]

In [None]:
us_temperature_pred_df = pd.DataFrame(columns=['year','month','AverageTemperature','state_code','state','country'])

In [None]:
for state_code,state in us_code_state.items():
    print(state_code,state)
    try:
        df = us_df_temperature[us_df_temperature["state_code"]==state_code].copy()
        df = get_temperature_prediction(df)
        df = df[df['year'] > 2010]
        df['state_code'] = state_code
        df['state'] = state
        df['country'] = 'United States'
        # pd.concat replaces DataFrame.append, which was removed in pandas 2.0
        us_temperature_pred_df = pd.concat([us_temperature_pred_df, df], ignore_index=True)
    except Exception as e:
        print(e)
        continue

In [None]:
us_temperature_pred_df[us_temperature_pred_df['year'] == 2016].head(20)

In [None]:
us_temperature_pred_df.to_csv("us_temperature.csv",index=False)

### Immigration Data by State with Origin

In [None]:
months = ["jan","feb","mar","apr","may","jun","jul","aug","sep","oct","nov","dec"]
fname = '../18-83510-I94-Data-2016/i94_{}16_sub.sas7bdat'

In [None]:
for month in months:
    filename = fname.format(month)
    print(filename)
    immigrationDF = spark.read.format('com.github.saurfang.sas.spark').load(filename)
    
    immigrationMonthDF=immigrationDF.filter(immigrationDF.i94addr.isNotNull())\
    .filter(immigrationDF.i94res.isNotNull())\
    .filter(col("i94addr").isin(list(us_code_state.keys())))\
    .filter(col("i94port").isin(list(city_codes.keys())))\
    .withColumn("i94res",col("i94res").cast("integer").cast("string"))\
    .withColumn("origin_country",country_codeUDF(col("i94res")))\
    .withColumn("State",code_stateUDF(col("i94addr")))\
    .withColumn("id",col("cicid").cast("integer"))\
    .withColumn("state_code",col("i94addr"))\
    .withColumn("city_code",col("i94port"))\
    .withColumn("year",col("i94yr").cast("integer"))\
    .withColumn("month",col("i94mon").cast("integer"))\
    .withColumn("city",city_codeUDF(col("i94port")))
                                  
    immigrationMonthDF.select('id','year','month','origin_country','city_code','city','state_code','State').write.mode('append').parquet('immigration_data')
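
The chained `.cast("integer").cast("string")` on `i94res` matters because the lookup dictionary is keyed by strings. Assuming the SAS reader delivers the code as a floating-point number (an assumption, though consistent with the need for the cast), a direct string conversion would produce keys that never match. A plain-Python sketch of the same normalisation, with `normalise_code` and the one-entry dictionary as hypothetical stand-ins:

```python
from typing import Dict, Optional


def normalise_code(raw: Optional[float]) -> Optional[str]:
    """Mimic cast('integer').cast('string'): 999.0 -> '999', None stays None."""
    if raw is None:
        return None
    return str(int(raw))


# Hypothetical one-entry stand-in for the immigration_codes dictionary.
sample_codes: Dict[str, str] = {"999": "Example Country"}

raw_value = 999.0
print(str(raw_value) in sample_codes)             # False: the key would be '999.0'
print(normalise_code(raw_value) in sample_codes)  # True: the key is '999'
```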
    

### U.S. Demographic Data by State

In [None]:
df_demographics.head()

In [None]:
df_demographics.loc[df_demographics["Race"] == "American Indian and Alaska Native","Race"] = "Native"
df_demographics.loc[df_demographics["Race"] == "Black or African-American","Race"] = "Afroamerican"
df_demographics.loc[df_demographics["Race"] == "Hispanic or Latino","Race"] = "Latino"

In [None]:
us_demographics_avg_df = df_demographics.groupby(['State','State Code','Race'])['Median Age'].mean()
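# Select several columns with a list (double brackets); tuple-style selection on a groupby fails in recent pandas
us_demographics_sum_df = df_demographics.groupby(['State','State Code','Race'])[['Total Population','Count','Male Population','Female Population','Number of Veterans','Foreign-born']].sum()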

In [None]:
us_demographics_df = pd.concat([us_demographics_sum_df,us_demographics_avg_df],axis=1)
us_demographics_df.reset_index(inplace=True)
us_demographics_df["state_code"] = us_demographics_df["State Code"]
us_demographics_df["median_age"] = us_demographics_df["Median Age"]

In [None]:
for state_code in us_code_state:
    print(state_code)
    df = us_demographics_df.loc[us_demographics_df['state_code'] == state_code]
    total_population = df['Total Population'].max()
    male_population = df['Male Population'].max()
    female_population = df['Female Population'].max()
    veterans_population = df['Number of Veterans'].max()
    foreign_population = df['Foreign-born'].max()
    median_age = df['median_age'].max()
    
    us_demographics_df.loc[us_demographics_df['state_code'] == state_code,'Total Population'] = total_population
    us_demographics_df.loc[us_demographics_df['state_code'] == state_code,'Male Population'] = male_population
    us_demographics_df.loc[us_demographics_df['state_code'] == state_code,'Female Population'] = female_population
    us_demographics_df.loc[us_demographics_df['state_code'] == state_code,'Number of Veterans'] = veterans_population
    us_demographics_df.loc[us_demographics_df['state_code'] == state_code,'Foreign-born'] = foreign_population
    us_demographics_df.loc[us_demographics_df['state_code'] == state_code,'median_age'] = median_age

In [None]:
us_demographics_df["percentage_male"] = us_demographics_df.apply(lambda row: float(row["Male Population"]/row["Total Population"])*100.0,axis=1)
us_demographics_df["percentage_female"] = us_demographics_df.apply(lambda row: float(row["Female Population"]/row["Total Population"])*100.0,axis=1)
us_demographics_df["percentage_veterans"] = us_demographics_df.apply(lambda row: float(row["Number of Veterans"]/row["Total Population"])*100.0,axis=1)
us_demographics_df["percentage_foreign_born"] = us_demographics_df.apply(lambda row: float(row["Foreign-born"]/row["Total Population"])*100.0,axis=1)
us_demographics_df["percentage_race"] = us_demographics_df.apply(lambda row: float(row["Count"]/row["Total Population"])*100.0,axis=1)


In [None]:
us_df_demographics = pd.pivot_table(us_demographics_df,values='percentage_race',index=["State","state_code","median_age","percentage_male","percentage_female","percentage_veterans","percentage_foreign_born"],columns=["Race"], aggfunc=np.mean, fill_value=0)
us_df_demographics = pd.DataFrame(us_df_demographics.to_records())

In [None]:
us_df_demographics

In [None]:
us_df_demographics.to_csv("us_demographics.csv",index=False)

### U.S. Airport Data by State 

In [None]:
df_airport.head()

In [None]:
us_df_airport = df_airport[df_airport["iso_country"]=="US"]
us_df_airport = us_df_airport[(us_df_airport["type"]=="small_airport")|(us_df_airport["type"]=="medium_airport")|(us_df_airport["type"]=="large_airport")]

In [None]:
us_df_airport["elevation_ft"] = us_df_airport.apply(lambda row: float(row["elevation_ft"]),axis=1)
us_df_airport["state_code"] = us_df_airport.apply(lambda row: row["iso_region"].split("-")[-1],axis=1)
us_df_airport["x_coordinate"] = us_df_airport.apply(lambda row: float(row["coordinates"].split(",")[0]),axis=1)
us_df_airport["y_coordinate"] = us_df_airport.apply(lambda row: float(row["coordinates"].split(",")[-1]),axis=1)
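
The string operations in the cell above can be checked in isolation. A small sketch, assuming `iso_region` values look like `US-AL` and `coordinates` holds two comma-separated numbers. The helper names and sample values are hypothetical:

```python
from typing import Tuple


def parse_state_code(iso_region: str) -> str:
    """Return the part after the last hyphen, e.g. 'US-AL' -> 'AL'."""
    return iso_region.split("-")[-1]


def parse_coordinates(coordinates: str) -> Tuple[float, float]:
    """Return the first and last comma-separated values as floats."""
    parts = coordinates.split(",")
    # float() ignores surrounding whitespace, so ' 33.56' parses cleanly.
    return float(parts[0]), float(parts[-1])


print(parse_state_code("US-AL"))
print(parse_coordinates("-86.75, 33.56"))
```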

In [None]:
us_df_airport["country"] = us_df_airport["iso_country"]
us_df_airport["city_code"] = us_df_airport["local_code"]
us_df_airport = us_df_airport[["ident","type","name","elevation_ft","country","state_code","city_code","municipality","x_coordinate","y_coordinate"]]

In [None]:
us_df_airport

In [None]:
us_df_airport.to_csv("us_airports.csv",index=False)

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [6]:
temperatureDF = spark.read.csv("us_temperature.csv",header=True)
demographicsDF = spark.read.csv("us_demographics.csv",header=True)
airportsDF = spark.read.csv("us_airports.csv",header=True)
immigrationDF = spark.read.parquet("immigration_data")

In [7]:
immigrationDF = immigrationDF.filter(col("month")<lit(7))

In [8]:
# Create Dimension tables
temperatureDF.createOrReplaceTempView("temperature")
immigrationDF.createOrReplaceTempView("immigration")
demographicsDF.createOrReplaceTempView("demographics")
airportsDF.createOrReplaceTempView("airports")

In [9]:
immigrationDF.count()

16895557

In [10]:
fact_table = spark.sql("""
    SELECT 
        immig.year,
        immig.month,
        immig.origin_country,
        immig.State,
        immig.state_code,
        COUNT(immig.state_code) as number_immigrants,
        temp.AverageTemperature as avg_temperature,
        demo.median_age,
        demo.percentage_male,
        demo.percentage_female,
        demo.percentage_veterans,
        demo.percentage_foreign_born,
        demo.Afroamerican as percentage_afroamerican,
        demo.Asian as percentage_asian,
        demo.Latino as percentage_latino,
        demo.Native as percentage_native,
        demo.White as percentage_white,
        air.name as airport_name,
        air.x_coordinate,
        air.y_coordinate
    FROM immigration immig
    JOIN temperature temp ON immig.state_code=temp.state_code AND immig.year=temp.year AND immig.month=temp.month
    JOIN demographics demo ON demo.state_code=immig.state_code
    JOIN airports air ON air.state_code=immig.state_code
    WHERE air.type='large_airport'
    GROUP BY 1,2,3,4,5,7,8,9,10,11,12,13,14,15,16,17,18,19,20
    ORDER BY 1,2,3,4
""").cache()

In [11]:
fact_table.count()

167513
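
One property of the `fact_table` query is worth keeping in mind when reading this row count. Because `airports` is joined on `state_code` alone and the airport columns are part of the `GROUP BY`, every immigration row is paired with each large airport in its state, so the result holds one row per airport, each carrying the full state count. A toy reproduction with the standard-library `sqlite3` module follows. The table contents are invented, the temperature and demographics joins are omitted, and the `GROUP BY` ordinals are spelled out as column names:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE immigration (id INTEGER, state_code TEXT, origin_country TEXT);
    CREATE TABLE airports (name TEXT, state_code TEXT, type TEXT);
    -- Three arrivals in one hypothetical state 'XX'.
    INSERT INTO immigration VALUES (1, 'XX', 'A'), (2, 'XX', 'A'), (3, 'XX', 'A');
    -- Two large airports and one small airport in the same state.
    INSERT INTO airports VALUES
        ('Airport One', 'XX', 'large_airport'),
        ('Airport Two', 'XX', 'large_airport'),
        ('Airstrip', 'XX', 'small_airport');
""")

rows = conn.execute("""
    SELECT immig.state_code,
           immig.origin_country,
           COUNT(immig.state_code) AS number_immigrants,
           air.name AS airport_name
    FROM immigration immig
    JOIN airports air ON air.state_code = immig.state_code
    WHERE air.type = 'large_airport'
    GROUP BY immig.state_code, immig.origin_country, air.name
    ORDER BY air.name
""").fetchall()
conn.close()

# Each large airport gets its own row, and each row repeats the state total of 3.
for row in rows:
    print(row)
```

Summing `number_immigrants` across such rows would count each arrival once per large airport, so aggregations on the fact table should group by state first or pick a single airport per state.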

In [12]:
fact_table.show()

+----+-----+--------------+----------+----------+-----------------+------------------+-----------------+------------------+------------------+-------------------+-----------------------+-----------------+------------------+------------------+------------------+------------------+--------------------+-------------------+------------------+
|year|month|origin_country|     State|state_code|number_immigrants|   avg_temperature|       median_age|   percentage_male| percentage_female|percentage_veterans|percentage_foreign_born|     Afroamerican|             Asian|            Latino|            Native|             White|        airport_name|       x_coordinate|      y_coordinate|
+----+-----+--------------+----------+----------+-----------------+------------------+-----------------+------------------+------------------+-------------------+-----------------------+-----------------+------------------+------------------+------------------+------------------+--------------------+-----------------

In [13]:
fact_table.write.mode('overwrite').parquet("fact_table")

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
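
Two of the checks listed above, sketched in plain Python on already-collected rows: a completeness check (the table is not empty and key columns are never null) and a uniqueness check on a candidate key. The function names, the choice of key columns, and the sample rows are hypothetical:

```python
from typing import Any, Dict, List, Sequence

Row = Dict[str, Any]


def check_not_null(rows: List[Row], columns: Sequence[str]) -> None:
    """Raise ValueError if the table is empty or a listed column holds None."""
    if not rows:
        raise ValueError("quality check failed: table is empty")
    for column in columns:
        nulls = sum(1 for row in rows if row.get(column) is None)
        if nulls:
            raise ValueError(f"quality check failed: {nulls} null(s) in {column!r}")


def check_unique(rows: List[Row], key: Sequence[str]) -> None:
    """Raise ValueError if two rows share the same values for `key`."""
    seen = set()
    for row in rows:
        value = tuple(row.get(column) for column in key)
        if value in seen:
            raise ValueError(f"quality check failed: duplicate key {value}")
        seen.add(value)


# Invented rows with a subset of the fact table's columns.
key_columns = ["year", "month", "origin_country", "state_code"]
sample = [
    {"year": 2016, "month": 1, "origin_country": "A", "state_code": "XX"},
    {"year": 2016, "month": 2, "origin_country": "A", "state_code": "XX"},
]
check_not_null(sample, key_columns)
check_unique(sample, key_columns)
print("all checks passed")
```

Raising an exception, rather than printing a table, lets a scheduled pipeline stop as soon as a check fails.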
 
Run Quality Checks

In [None]:
# Perform quality checks here

In [14]:
fact_table.select(isnull('year').alias('year'),\
                             isnull('month').alias('month'),\
                             isnull('origin_country').alias('country'),\
                             isnull('State').alias('state')).dropDuplicates().show()

+-----+-----+-------+-----+
| year|month|country|state|
+-----+-----+-------+-----+
|false|false|  false|false|
+-----+-----+-------+-----+



#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.
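
One possible starting point is a dictionary keyed by the columns of the `fact_table` query. The descriptions below are inferred from the transformations in this notebook rather than taken from official documentation of the source files, so they are assumptions to be verified against the raw data:

```python
# Field -> (description, source dataset). Descriptions are inferred from the
# notebook's transformations and should be verified before publication.
I94 = "I94 Immigration Data"
TEMP = "World Temperature Data"
DEMO = "U.S. City Demographic Data"
AIR = "Airport Code Table"

DATA_DICTIONARY = {
    "year": ("Arrival year, cast from i94yr", I94),
    "month": ("Arrival month, cast from i94mon", I94),
    "origin_country": ("Country decoded from i94res", I94),
    "State": ("State name decoded from i94addr", I94),
    "state_code": ("State code taken from i94addr", I94),
    "number_immigrants": ("Number of I94 records in the group", I94),
    "avg_temperature": ("Prophet forecast of the state's monthly average temperature", TEMP),
    "median_age": ("Median age aggregated from city level to state level", DEMO),
    "percentage_male": ("Male population as a percentage of total population", DEMO),
    "percentage_female": ("Female population as a percentage of total population", DEMO),
    "percentage_veterans": ("Veterans as a percentage of total population", DEMO),
    "percentage_foreign_born": ("Foreign-born residents as a percentage of total population", DEMO),
    "percentage_afroamerican": ("Race count 'Afroamerican' as a percentage of total population", DEMO),
    "percentage_asian": ("Race count 'Asian' as a percentage of total population", DEMO),
    "percentage_latino": ("Race count 'Latino' as a percentage of total population", DEMO),
    "percentage_native": ("Race count 'Native' as a percentage of total population", DEMO),
    "percentage_white": ("Race count 'White' as a percentage of total population", DEMO),
    "airport_name": ("Name of a large airport in the state", AIR),
    "x_coordinate": ("First value of the airport's coordinates field", AIR),
    "y_coordinate": ("Second value of the airport's coordinates field", AIR),
}

# Print the dictionary as a plain three-column listing.
for field, (description, source) in DATA_DICTIONARY.items():
    print(f"{field:<26}{source:<30}{description}")
```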

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.