# Project Title
### Data Engineering Capstone Project

#### Project Summary


This project works with I94 immigration data and combines it with other sources to analyze immigration traffic to different parts of the US and to look for relationships between that traffic and state-level demographics, temperature, and elevation. The sources are cleaned and transformed into a star schema.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use?

I plan to clean and transform the source files, write them to Parquet, and build tables describing the demographics, temperature, and elevation of each state. These tables, together with the country, state, and visa code tables, will be the dimension tables. The immigration table created from the I94 immigration data will be the fact table. I used Apache Spark because it handles large data and makes transformations easy.

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

1. U.S. City Demographic Data: comes from OpenSoft and contains demographic details for U.S. cities.
2. World Temperature Data: comes from Kaggle and contains monthly average land temperatures by state; the U.S. records used here go back as far as 1743.
3. Airport Code Table: comes from datahub.io and contains details of airports, including their region and elevation.
4. I94 Immigration Data: comes from the U.S. National Tourism and Trade Office and contains details of immigrants arriving in the U.S.

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

In [1]:
import pandas as pd
pd.set_option('display.max_columns', None)
from pyspark.sql import SparkSession
# Note: importing pyspark's round shadows Python's built-in round in this notebook
from pyspark.sql.functions import col, udf, year, month, round, regexp_replace
from pyspark.sql.types import IntegerType, FloatType
from data_dictionary import i94_state_codes, i94_visa_codes, i94_country_codes

In [2]:
spark = SparkSession.builder.\
    config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11").\
    enableHiveSupport().getOrCreate()

## Demographics Data

#### Cleaning Steps
1. Convert the columns that need to be aggregated to integer type
2. Create the population dataframe (sums per state)
3. Create the race dataframe (race counts pivoted per state)
4. Join the dataframes on state code
5. Drop duplicates
6. Rename columns so they can be written to Parquet

In [3]:
df_demographics = spark.read.csv("us-cities-demographics.csv", sep=';', header=True)

In [4]:
df_demographics.show(5)

+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|            City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|   Silver Spring|     Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino|25924|
|          Quincy|Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White|58723|
|          Hoover|      Alabama|      38.5|          38040| 

In [5]:
df_demographics.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



In [6]:
# Convert the specified columns to integer type so they can be summed
df_demographics = df_demographics.withColumn("Count", col("Count").cast(IntegerType())) \
.withColumn("Total Population", col("Total Population").cast(IntegerType())) \
.withColumn("Male Population", col("Male Population").cast(IntegerType())) \
.withColumn("Female Population", col("Female Population").cast(IntegerType())) \
.withColumn("Foreign-born", col("Foreign-born").cast(IntegerType()))

In [7]:
# Create dataframe containing population sums
df_population_sum = df_demographics.groupBy("State Code").sum()

In [8]:
df_population_sum.show(5)

+----------+--------------------+----------------------+---------------------+-----------------+----------+
|State Code|sum(Male Population)|sum(Female Population)|sum(Total Population)|sum(Foreign-born)|sum(Count)|
+----------+--------------------+----------------------+---------------------+-----------------+----------+
|        AZ|            11137275|              11360435|             22497710|          3411565|   5754881|
|        SC|             1265291|               1321685|              2586976|           134019|    565751|
|        LA|             3134990|               3367985|              6502975|           417095|   1391090|
|        MN|             3478803|               3565362|              7044165|          1069888|   1546985|
|        NJ|             3423033|               3507991|              6931024|          2327750|   1795916|
+----------+--------------------+----------------------+---------------------+-----------------+----------+
only showing top 5 rows
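Each row of the demographics file is a (city, race) pair: `Race` and `Count` change from row to row, while the city-level columns (`Male Population`, `Total Population`, `Foreign-born`, ...) are repeated for every race row of the same city. Summing those columns directly per state therefore counts each city several times, which is consistent with the Arizona total above (22,497,710) being far above the state's actual population. The plain-Python sketch below, on hypothetical rows, contrasts the direct sum with keeping one row per (City, State Code) first. In Spark the analogous fix would be something like `df_demographics.dropDuplicates(["City", "State Code"]).groupBy("State Code").sum("Male Population", "Female Population", "Total Population", "Foreign-born")`; the race `Count` column is genuinely per row, so the race pivot is not affected.

```python
from collections import defaultdict

# Hypothetical rows shaped like us-cities-demographics.csv:
# the city-level "Total Population" repeats once per race row.
rows = [
    {"City": "City A", "State Code": "ZZ", "Total Population": 100, "Race": "White", "Count": 60},
    {"City": "City A", "State Code": "ZZ", "Total Population": 100, "Race": "Asian", "Count": 40},
    {"City": "City B", "State Code": "ZZ", "Total Population": 50, "Race": "White", "Count": 50},
]

def naive_state_totals(rows):
    """Sum Total Population per state over every row (counts a city once per race row)."""
    totals = defaultdict(int)
    for r in rows:
        totals[r["State Code"]] += r["Total Population"]
    return dict(totals)

def dedup_state_totals(rows):
    """Keep one row per (City, State Code) before summing, like dropDuplicates."""
    seen = set()
    totals = defaultdict(int)
    for r in rows:
        key = (r["City"], r["State Code"])
        if key in seen:
            continue  # this city's population was already counted
        seen.add(key)
        totals[r["State Code"]] += r["Total Population"]
    return dict(totals)

print(naive_state_totals(rows))  # {'ZZ': 250}: City A counted twice
print(dedup_state_totals(rows))  # {'ZZ': 150}: each city counted once
```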



In [9]:
# Show distinct races
df_demographics.select('Race').distinct().show()

+--------------------+
|                Race|
+--------------------+
|Black or African-...|
|  Hispanic or Latino|
|               White|
|               Asian|
|American Indian a...|
+--------------------+



In [10]:
# Pivot the Race column so that each race value becomes its own column
# Sum the race counts for each state code
df_race = df_demographics.groupBy("State Code").pivot("Race").sum("Count").orderBy("State Code")

In [11]:
df_race.show(5)

+----------+---------------------------------+-------+-------------------------+------------------+--------+
|State Code|American Indian and Alaska Native|  Asian|Black or African-American|Hispanic or Latino|   White|
+----------+---------------------------------+-------+-------------------------+------------------+--------+
|        AK|                            36339|  36825|                    23107|             27261|  212696|
|        AL|                             8084|  28769|                   521068|             39313|  498920|
|        AR|                             9381|  22062|                   149608|             77813|  384733|
|        AZ|                           129708| 229183|                   296222|           1508157| 3591611|
|        CA|                           401386|4543730|                  2047009|           9856464|14905129|
+----------+---------------------------------+-------+-------------------------+------------------+--------+
only showing top 5 rows
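`groupBy("State Code").pivot("Race").sum("Count")` reshapes long (state, race, count) rows into one row per state with one column per race. The plain-Python sketch below, on hypothetical rows, shows what that computation produces; `pivot_sum` is an illustrative helper, not a Spark API. A state with no rows for some race gets `None`, matching the nulls Spark leaves in missing pivot cells, and the race columns are sorted, as they appear in the output above.

```python
from collections import defaultdict

def pivot_sum(rows, group_key, pivot_key, value_key):
    """Return {group: {pivot_value: sum}}, with None for missing combinations."""
    pivot_values = sorted({r[pivot_key] for r in rows})
    sums = defaultdict(dict)
    for r in rows:
        cell = sums[r[group_key]]
        cell[r[pivot_key]] = cell.get(r[pivot_key], 0) + r[value_key]
    # Give every group the same set of pivot columns
    return {
        g: {p: cells.get(p) for p in pivot_values}
        for g, cells in sorted(sums.items())
    }

rows = [
    {"State Code": "AA", "Race": "Asian", "Count": 10},
    {"State Code": "AA", "Race": "White", "Count": 5},
    {"State Code": "AA", "Race": "White", "Count": 7},
    {"State Code": "BB", "Race": "White", "Count": 3},
]
print(pivot_sum(rows, "State Code", "Race", "Count"))
# {'AA': {'Asian': 10, 'White': 12}, 'BB': {'Asian': None, 'White': 3}}
```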

In [12]:
# Join race and population dataframes to create final US demographics by state dataframe
# Drop duplicates
df_us_demographics = df_population_sum.join(df_race, "State Code").dropDuplicates()

In [13]:
df_us_demographics.show(5)

+----------+--------------------+----------------------+---------------------+-----------------+----------+---------------------------------+------+-------------------------+------------------+-------+
|State Code|sum(Male Population)|sum(Female Population)|sum(Total Population)|sum(Foreign-born)|sum(Count)|American Indian and Alaska Native| Asian|Black or African-American|Hispanic or Latino|  White|
+----------+--------------------+----------------------+---------------------+-----------------+----------+---------------------------------+------+-------------------------+------------------+-------+
|        AZ|            11137275|              11360435|             22497710|          3411565|   5754881|                           129708|229183|                   296222|           1508157|3591611|
|        SC|             1265291|               1321685|              2586976|           134019|    565751|                             3705| 13355|                   175064|             29863

In [14]:
# Rename columns: Parquet column names cannot contain spaces or characters such as '(' and ')'
df_us_demographics = df_us_demographics.select(col('State Code').alias('state_code'),
                                               col('sum(Male Population)').alias('male_population'),
                                               col('sum(Female Population)').alias('female_population'), 
                                               col('sum(Total Population)').alias('total_population'),
                                               col('sum(Foreign-born)').alias('foreign_born'),
                                               col('American Indian and Alaska Native').alias('american_indian_or_alaska_native'),
                                               col('Asian').alias('asian'),
                                               col('Black or African-American').alias('black_or_african_american'),
                                               col('Hispanic or Latino').alias('hispanic_or_latino'),
                                               col('White').alias('white')
                                            )
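The renaming above is written out by hand. As a sketch of a more generic alternative, the hypothetical helper below unwraps aggregate names such as `sum(...)` and turns everything that is not a letter or digit into underscores. In Spark it could be applied to a freshly aggregated dataframe with `df.toDF(*[to_parquet_name(c) for c in df.columns])`. It does not reproduce every hand-picked name: for example it yields `american_indian_and_alaska_native` rather than `american_indian_or_alaska_native`, and `sum(Count)` becomes `count` unless that column is dropped.

```python
import re

def to_parquet_name(name):
    """Turn e.g. 'sum(Male Population)' into 'male_population' (hypothetical helper)."""
    # Unwrap aggregate names such as sum(...) produced by groupBy().sum()
    match = re.fullmatch(r"\w+\((.*)\)", name)
    if match:
        name = match.group(1)
    # Replace each run of characters other than letters/digits with one underscore
    name = re.sub(r"[^0-9A-Za-z]+", "_", name)
    return name.strip("_").lower()

for column in ["State Code", "sum(Male Population)", "sum(Foreign-born)", "Black or African-American"]:
    print(column, "->", to_parquet_name(column))
```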

In [15]:
df_us_demographics.show(5)

+----------+---------------+-----------------+----------------+------------+--------------------------------+------+-------------------------+------------------+-------+
|state_code|male_population|female_population|total_population|foreign_born|american_indian_or_alaska_native| asian|black_or_african_american|hispanic_or_latino|  white|
+----------+---------------+-----------------+----------------+------------+--------------------------------+------+-------------------------+------------------+-------+
|        AZ|       11137275|         11360435|        22497710|     3411565|                          129708|229183|                   296222|           1508157|3591611|
|        SC|        1265291|          1321685|         2586976|      134019|                            3705| 13355|                   175064|             29863| 343764|
|        LA|        3134990|          3367985|         6502975|      417095|                            8263| 38739|                   602377|        

In [16]:
# Parquet demographics
df_us_demographics.write.mode('overwrite').parquet("us_demographics_by_state")

## Temperature Data 

#### Cleaning Steps
1. Filter to US rows only
2. Replace "Georgia (State)" with "Georgia"
3. Create a udf to convert state names to state codes
4. Break down the date (dt) into year and month
5. Convert Celsius to Fahrenheit
6. Filter to 2013, the most recent year in the data
7. Drop duplicates
8. Select and rename columns so they can be written to Parquet

In [17]:
df_temperature = spark.read.csv("GlobalLandTemperaturesByState.csv", header=True)

In [18]:
df_temperature.show(5)

+----------+------------------+-----------------------------+-----+-------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|State|Country|
+----------+------------------+-----------------------------+-----+-------+
|1855-05-01|            25.544|                        1.171| Acre| Brazil|
|1855-06-01|            24.228|                        1.103| Acre| Brazil|
|1855-07-01|            24.371|                        1.044| Acre| Brazil|
|1855-08-01|            25.427|                        1.073| Acre| Brazil|
|1855-09-01|            25.675|                        1.014| Acre| Brazil|
+----------+------------------+-----------------------------+-----+-------+
only showing top 5 rows



In [19]:
# Filter the temperature by US only  
df_us_temperature = df_temperature.filter(df_temperature["Country"] == "United States")

In [20]:
df_us_temperature.select('State').distinct().show(55)

+--------------------+
|               State|
+--------------------+
|                Utah|
|              Hawaii|
|           Minnesota|
|                Ohio|
|            Arkansas|
|              Oregon|
|District Of Columbia|
|     Georgia (State)|
|               Texas|
|        North Dakota|
|        Pennsylvania|
|         Connecticut|
|            Nebraska|
|             Vermont|
|              Nevada|
|          Washington|
|            Illinois|
|            Oklahoma|
|            Delaware|
|              Alaska|
|          New Mexico|
|       West Virginia|
|            Missouri|
|        Rhode Island|
|             Montana|
|            Michigan|
|            Virginia|
|      North Carolina|
|             Wyoming|
|              Kansas|
|          New Jersey|
|            Maryland|
|             Alabama|
|             Arizona|
|                Iowa|
|       Massachusetts|
|            Kentucky|
|           Louisiana|
|         Mississippi|
|       New Hampshire|
|          

In [21]:
# Reverse i94_state_codes so that key = full state name and value = state code
state_code_dict = dict((v, k) for k, v in i94_state_codes.items())

In [22]:
# Replace 'Georgia (State)' with 'Georgia'; the raw string keeps the regex escapes intact
df_us_temperature = df_us_temperature.withColumn('State', regexp_replace('State', r'Georgia \(State\)', 'Georgia'))

In [23]:
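# Define a udf that converts a full state name to its abbreviation
# .get returns None (null in Spark) for unknown names instead of raising KeyError
state_abbrev_udf = udf(lambda x: state_code_dict.get(x))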
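The state-code conversion relies on reversing `i94_state_codes` (code to name) into a name-to-code dictionary. A udf written as `lambda x: state_code_dict[x]` raises `KeyError` on the executor for any name missing from the dictionary, which fails the whole Spark job, while `dict.get` returns `None`, stored as null. The plain-Python sketch below walks through the Georgia fix, the reversal, and the safe lookup; the two-entry `i94_state_codes` is a hypothetical stand-in for the real one in `data_dictionary.py`.

```python
import re

# Hypothetical subset of i94_state_codes (code -> full name)
i94_state_codes = {"AL": "Alabama", "GA": "Georgia"}

# Reverse it: full name -> code
state_code_dict = {name: code for code, name in i94_state_codes.items()}

def clean_state_name(name):
    """Mirror the regexp_replace step: 'Georgia (State)' -> 'Georgia'."""
    return re.sub(r"Georgia \(State\)", "Georgia", name)

def state_abbrev(name):
    """Return the state code, or None when the name is unknown."""
    return state_code_dict.get(clean_state_name(name))

print(state_abbrev("Georgia (State)"))  # GA
print(state_abbrev("Atlantis"))         # None rather than KeyError
```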

In [24]:
# Break down datetime to year, month
# Convert state names to state codes
# Convert Celsius to Fahrenheit
df_us_temperature = df_us_temperature.withColumn("year", year(df_us_temperature["dt"])) \
.withColumn("month", month(df_us_temperature["dt"])) \
.withColumn("state_code", state_abbrev_udf(df_us_temperature["State"])) \
.withColumn("average_temperature_fahrenheit",round(df_us_temperature["AverageTemperature"]*(9/5)+32,1))
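The Fahrenheit column applies F = C × 9/5 + 32 and rounds to one decimal. The plain-Python sketch below reproduces the 2013 Alabama values shown further down (10.284 °C → 50.5 °F). A missing Celsius value stays missing, just as arithmetic on a null yields null in Spark. One caveat: Python's built-in `round` rounds exact ties to even, while Spark's `round` rounds half up, so values sitting exactly on a .x5 boundary could differ.

```python
def celsius_to_fahrenheit(celsius, digits=1):
    """Convert Celsius to Fahrenheit, rounded; None passes through like a Spark null."""
    if celsius is None:
        return None
    return round(celsius * 9 / 5 + 32, digits)

print(celsius_to_fahrenheit(10.284))  # 50.5
print(celsius_to_fahrenheit(None))    # None
```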

In [25]:
df_us_temperature.show(5)

+----------+------------------+-----------------------------+-------+-------------+----+-----+----------+------------------------------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|  State|      Country|year|month|state_code|average_temperature_fahrenheit|
+----------+------------------+-----------------------------+-------+-------------+----+-----+----------+------------------------------+
|1743-11-01|10.722000000000001|                        2.898|Alabama|United States|1743|   11|        AL|                          51.3|
|1743-12-01|              null|                         null|Alabama|United States|1743|   12|        AL|                          null|
|1744-01-01|              null|                         null|Alabama|United States|1744|    1|        AL|                          null|
|1744-02-01|              null|                         null|Alabama|United States|1744|    2|        AL|                          null|
|1744-03-01|              null|          

In [26]:
# Filter by year 2013, the most recent data
df_us_temperature_2013 = df_us_temperature.filter(year(df_us_temperature["dt"]) == 2013)

In [27]:
df_us_temperature_2013.show(5)

+----------+------------------+-----------------------------+-------+-------------+----+-----+----------+------------------------------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|  State|      Country|year|month|state_code|average_temperature_fahrenheit|
+----------+------------------+-----------------------------+-------+-------------+----+-----+----------+------------------------------+
|2013-01-01|            10.284|                        0.241|Alabama|United States|2013|    1|        AL|                          50.5|
|2013-02-01|             9.161|                        0.213|Alabama|United States|2013|    2|        AL|                          48.5|
|2013-03-01|            10.226|                        0.158|Alabama|United States|2013|    3|        AL|                          50.4|
|2013-04-01|            17.067|                        0.221|Alabama|United States|2013|    4|        AL|                          62.7|
|2013-05-01|            20.619|          

In [28]:
# Drop duplicates
# Pick columns and format for parquet
df_us_temperature_2013 = df_us_temperature_2013.select("year", "month", "state_code", "average_temperature_fahrenheit").dropDuplicates()

In [29]:
df_us_temperature_2013.show(5)

+----+-----+----------+------------------------------+
|year|month|state_code|average_temperature_fahrenheit|
+----+-----+----------+------------------------------+
|2013|    3|        FL|                          60.0|
|2013|    6|        KY|                          74.0|
|2013|    5|        ID|                          51.1|
|2013|    6|        VA|                          73.0|
|2013|    9|        WI|                          61.8|
+----+-----+----------+------------------------------+
only showing top 5 rows



In [30]:
df_us_temperature_2013.write.mode('overwrite').parquet("us_temperature_2013")


## Airport Data

#### Cleaning Steps
1. Filter iso_region to US regions only
2. Remove the "US-U-A" region, which is not a state
3. Create a udf to remove "US-" from iso_region, turning it into a state code
4. Convert elevation to float type
5. Group by state code and average the elevation
6. Round to two decimals and rename the columns
7. Write to Parquet

In [31]:
df_airport_codes = spark.read.csv("airport-codes_csv.csv", header=True)

In [32]:
df_airport_codes.show(5)

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport|          11|       NA|         US|     US-PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
| 00AK|small_airport|        Lowell Field|         450|       NA|         US|     US-AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|
| 00AL|small_airport|        Epps Airpark|         820|       NA|         US|     

In [33]:
# Keep US regions only (iso_region codes look like "US-PA")
df_airport_codes = df_airport_codes.filter(df_airport_codes["iso_region"].startswith("US-"))

In [34]:
df_airport_codes.select('iso_region').distinct().orderBy("iso_region").show(55)

+----------+
|iso_region|
+----------+
|     US-AK|
|     US-AL|
|     US-AR|
|     US-AZ|
|     US-CA|
|     US-CO|
|     US-CT|
|     US-DC|
|     US-DE|
|     US-FL|
|     US-GA|
|     US-HI|
|     US-IA|
|     US-ID|
|     US-IL|
|     US-IN|
|     US-KS|
|     US-KY|
|     US-LA|
|     US-MA|
|     US-MD|
|     US-ME|
|     US-MI|
|     US-MN|
|     US-MO|
|     US-MS|
|     US-MT|
|     US-NC|
|     US-ND|
|     US-NE|
|     US-NH|
|     US-NJ|
|     US-NM|
|     US-NV|
|     US-NY|
|     US-OH|
|     US-OK|
|     US-OR|
|     US-PA|
|     US-RI|
|     US-SC|
|     US-SD|
|     US-TN|
|     US-TX|
|    US-U-A|
|     US-UT|
|     US-VA|
|     US-VT|
|     US-WA|
|     US-WI|
|     US-WV|
|     US-WY|
+----------+



In [35]:
# Remove iso region 'US-U-A'
df_airport_codes = df_airport_codes.filter(df_airport_codes["iso_region"] != "US-U-A")

In [36]:
# Create udf to remove "US-" from iso_region to transform to "state_code"
get_state = udf(lambda x: x.replace("US-",""))

In [37]:
# Convert elevation to float and strip "US-" from iso_region
df_airport_codes = df_airport_codes.withColumn("elevation_ft", col("elevation_ft").cast(FloatType()))\
.withColumn("iso_region", get_state(col("iso_region")))

In [38]:
# Average elevation
df_us_elevations = df_airport_codes.groupBy("iso_region").avg("elevation_ft").orderBy("iso_region")

In [39]:
# Round values to two decimal places and rename iso_region to state_code
df_us_elevations = df_us_elevations.select(col("iso_region").alias("state_code"),\
                                        round(col("avg(elevation_ft)"),2).alias("avg_elevation_ft"))

In [40]:
df_us_elevations.show()

+----------+----------------+
|state_code|avg_elevation_ft|
+----------+----------------+
|        AK|          447.09|
|        AL|          437.11|
|        AR|          517.48|
|        AZ|         2710.69|
|        CA|         1043.41|
|        CO|         6200.13|
|        CT|          329.78|
|        DC|          176.38|
|        DE|           83.36|
|        FL|            70.2|
|        GA|          675.69|
|        HI|          668.85|
|        IA|         1025.26|
|        ID|         3750.29|
|        IL|          677.32|
|        IN|          759.16|
|        KS|         1565.94|
|        KY|          763.11|
|        LA|           66.73|
|        MA|           255.2|
+----------+----------------+
only showing top 20 rows
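The plain-Python sketch below condenses the whole airport step: keep US regions, drop "US-U-A", strip the "US-" prefix, cast to float, average per state, and round to two decimals. The sample tuples are hypothetical. As with Spark's `avg`, null elevations are ignored, and a state whose elevations are all null averages to `None`. A design note: in Spark the Python udf could also be replaced with the built-in `regexp_replace(col("iso_region"), "^US-", "")`, which runs inside the JVM and avoids the cost of shipping each row to a Python worker.

```python
from collections import defaultdict

def avg_elevation_by_state(airports):
    """Average elevation_ft per state from (iso_region, elevation_ft) pairs."""
    sums = defaultdict(float)
    counts = defaultdict(int)
    states = set()
    for iso_region, elevation in airports:
        if not iso_region.startswith("US-") or iso_region == "US-U-A":
            continue  # not a US state region
        state = iso_region[len("US-"):]
        states.add(state)
        if elevation is None:
            continue  # like Spark's avg(), skip nulls
        sums[state] += float(elevation)
        counts[state] += 1
    # A state whose elevations are all null averages to None (null in Spark)
    return {
        s: round(sums[s] / counts[s], 2) if counts[s] else None
        for s in sorted(states)
    }

# Hypothetical sample rows; elevation_ft arrives from the CSV as a string
airports = [
    ("US-PA", "11"),
    ("US-KS", "3435"),
    ("US-KS", "3000"),
    ("US-U-A", "5"),
    ("CA-ON", "200"),
    ("US-AK", None),
]
print(avg_elevation_by_state(airports))
```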



In [41]:
df_us_elevations.write.mode('overwrite').parquet("us_elevations_by_state")

## Immigration Data

#### Cleaning Steps
1. Drop duplicates in the original immigration dataframe
2. Select and rename the columns for the immigration dataframe
3. Create state, country, and visa code dataframes based on the I94 SAS labels
4. Write all dataframes to Parquet

In [42]:
df_immigration = spark.read.parquet("sas_data")

In [43]:
df_immigration.show(5)

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     NV|20591.0|  32.0|    1.0|  

In [44]:
df_immigration.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [45]:
df_immigration.count()

3096313

In [46]:
# Drop duplicates
df_immigration = df_immigration.dropDuplicates()

In [47]:
df_immigration.count()

3096313

In [48]:
# Select columns for dataframe 
df_immigration = df_immigration.select(col("i94yr").alias("i94_year"),
                                       col("i94mon").alias("i94_month"),
                                       col("i94res").alias("i94_residence"),
                                       col("i94addr").alias("i94_address"),
                                       col("i94visa").alias("i94_visa"),
                                       "gender",
                                       col("biryear").alias("birth_year")
                                      )

In [49]:
df_immigration.show(5)

+--------+---------+-------------+-----------+--------+------+----------+
|i94_year|i94_month|i94_residence|i94_address|i94_visa|gender|birth_year|
+--------+---------+-------------+-----------+--------+------+----------+
|  2016.0|      4.0|        103.0|       null|     2.0|     F|    1991.0|
|  2016.0|      4.0|        104.0|         NY|     2.0|     F|    2000.0|
|  2016.0|      4.0|        104.0|         FL|     2.0|     M|    1959.0|
|  2016.0|      4.0|        104.0|         NY|     2.0|  null|    1965.0|
|  2016.0|      4.0|        104.0|         FL|     2.0|  null|    2013.0|
+--------+---------+-------------+-----------+--------+------+----------+
only showing top 5 rows



In [50]:
# Create state code abbreviations dataframe
i94_state_codes_list = list(map(list, i94_state_codes.items()))
df_i94_state_codes = spark.createDataFrame(i94_state_codes_list, ["state_code", "state_name"])

In [51]:
df_i94_state_codes.show(5)

+----------+----------+
|state_code|state_name|
+----------+----------+
|        AL|   Alabama|
|        AK|    Alaska|
|        AZ|   Arizona|
|        AR|  Arkansas|
|        CA|California|
+----------+----------+
only showing top 5 rows



In [52]:
# Create country code abbreviations dataframe
i94_country_codes_list = list(map(list, i94_country_codes.items()))
df_i94_country_codes = spark.createDataFrame(i94_country_codes_list, ["country_code", "country_name"])

In [53]:
df_i94_country_codes.show(5)

+------------+--------------------+
|country_code|        country_name|
+------------+--------------------+
|         582|MEXICO Air Sea, a...|
|         236|         AFGHANISTAN|
|         101|             ALBANIA|
|         316|             ALGERIA|
|         102|             ANDORRA|
+------------+--------------------+
only showing top 5 rows



In [54]:
# Create visa code abbreviations dataframe
i94_visa_codes_list = list(map(list, i94_visa_codes.items()))
df_i94_visa_codes = spark.createDataFrame(i94_visa_codes_list, ["visa_code", "visa_type"])

In [55]:
df_i94_visa_codes.show()

+---------+---------+
|visa_code|visa_type|
+---------+---------+
|        1| Business|
|        2| Pleasure|
|        3|  Student|
+---------+---------+



In [56]:
# Parquet all immigration, state, country, and visa dataframes 
df_immigration.write.mode('overwrite').partitionBy("i94_year", "i94_month").parquet("immigration")
df_i94_state_codes.write.mode('overwrite').parquet("state_codes")
df_i94_country_codes.write.mode('overwrite').parquet("country_codes")
df_i94_visa_codes.write.mode('overwrite').parquet("visa_codes")

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

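The data model is a star schema: a central immigration fact table surrounded by small dimension tables keyed by state, country, and visa codes. I chose it because each analysis needs only one join per dimension, which keeps queries flexible and the analysis simple.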


1. Dimension Tables
 - demographics
     - state_code
     - male_population
     - female_population
     - total_population
     - foreign_born
     - american_indian_or_alaska_native
     - asian
     - black_or_african_american
     - hispanic_or_latino
     - white
 - temperature
     - year
     - month
     - state_code
     - average_temperature_fahrenheit
 - elevation
     - state_code
     - avg_elevation_ft
 - state
     - state_code
     - state_name
 - country
     - country_code
     - country_name
 - visa
     - visa_code
     - visa_type


2. Fact Table
 - immigration 
     - i94_year
     - i94_month
     - i94_residence (code of the immigrant's country of residence)
     - i94_address (The state that the immigrant is visiting)
     - i94_visa
     - gender
     - birth_year
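In this model, `i94_address` in the fact table matches `state_code` in the state, demographics, elevation, and temperature dimensions (temperature is additionally keyed by year and month), `i94_residence` matches `country_code`, and `i94_visa` matches `visa_code`. A typical query joins the fact table to one dimension and aggregates. The plain-Python sketch below counts arrivals by visa type over hypothetical fact rows; the fact columns are doubles such as `2.0`, so the sketch casts them before the lookup. In Spark the equivalent would be roughly `df_immigration.join(df_i94_visa_codes, df_immigration.i94_visa == df_i94_visa_codes.visa_code).groupBy("visa_type").count()`.

```python
from collections import Counter

# Visa dimension as shown in the notebook output (visa_code -> visa_type)
visa_dim = {1: "Business", 2: "Pleasure", 3: "Student"}

# Hypothetical fact rows; the real i94_visa column is a double such as 2.0
fact_rows = [
    {"i94_address": "NY", "i94_visa": 2.0},
    {"i94_address": "FL", "i94_visa": 2.0},
    {"i94_address": "CA", "i94_visa": 1.0},
    {"i94_address": "NY", "i94_visa": 3.0},
]

def arrivals_by_visa_type(facts, visa_dim):
    """Join each fact row to the visa dimension and count rows per visa_type."""
    counts = Counter()
    for row in facts:
        visa_type = visa_dim.get(int(row["i94_visa"]))  # cast the double key to int
        if visa_type is not None:  # inner join: drop rows with no match
            counts[visa_type] += 1
    return dict(counts)

print(arrivals_by_visa_type(fact_rows, visa_dim))
```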
     

     
#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

1. Create the Spark dataframes for the dimension tables
2. Create the immigration fact table, whose columns reference the dimension tables through state, country, and visa codes
3. Write the tables to Parquet

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

- Data Pipelines can be run with etl.py

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [61]:
# Perform quality checks here

# Count test
def row_count(df, name):
    """
    Checks that a dataframe is not empty and prints the result.

    Keyword arguments:
    df -- Spark dataframe
    name -- table name used in the message
    """
    count = df.count()
    if count > 0:
        print("Passed. There are {} rows in {}.".format(count, name))
    else:
        print("Failed. There are 0 rows in {}.".format(name))

row_count(df_us_demographics, "demographics")
row_count(df_us_elevations, "elevation")
row_count(df_us_temperature_2013, "temperature")
row_count(df_immigration, "immigration")
row_count(df_i94_state_codes, "state")
row_count(df_i94_country_codes, "country")
row_count(df_i94_visa_codes, "visa")



Passed. There are 49 rows in demographics. 
Passed. There are 51 rows in elevation. 
Passed. There are 459 rows in temperature. 
Passed. There are 3096313 rows in immigration. 
Passed. There are 52 rows in state. 
Passed. There are 289 rows in country. 
Passed. There are 3 rows in visa. 
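`row_count` only confirms that each table is non-empty. The checklist above also lists integrity constraints such as unique keys. Below is a minimal sketch of such a check in plain Python over a list of key values; `unique_key_check` is a hypothetical helper, not part of the project. It raises instead of printing, so a failing check would stop a pipeline run. In Spark the same test could compare `df.count()` with `df.select("state_code").distinct().count()` on the demographics or elevation table.

```python
from collections import Counter

def unique_key_check(keys, name):
    """Raise ValueError if any key is null or repeated; otherwise return the key count."""
    keys = list(keys)
    if any(k is None for k in keys):
        raise ValueError(f"Failed. {name} has null keys.")
    duplicates = sorted(k for k, n in Counter(keys).items() if n > 1)
    if duplicates:
        raise ValueError(f"Failed. {name} has duplicate keys: {duplicates}")
    print(f"Passed. {len(keys)} unique keys in {name}.")
    return len(keys)

unique_key_check(["AL", "AK", "AZ"], "state")
```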


In [None]:
# Integrity Test between dimension and fact table

In [64]:
df_immigration.join(df_i94_state_codes,df_immigration.i94_address == df_i94_state_codes.state_code).drop(df_i94_state_codes.state_code).show(5)

+--------+---------+-------------+-----------+--------+------+----------+----------+
|i94_year|i94_month|i94_residence|i94_address|i94_visa|gender|birth_year|state_name|
+--------+---------+-------------+-----------+--------+------+----------+----------+
|  2016.0|      4.0|        111.0|         AL|     2.0|     F|    1961.0|   Alabama|
|  2016.0|      4.0|        687.0|         AL|     2.0|     M|    1964.0|   Alabama|
|  2016.0|      4.0|        112.0|         AL|     1.0|     M|    1969.0|   Alabama|
|  2016.0|      4.0|        245.0|         AL|     2.0|     F|    1969.0|   Alabama|
|  2016.0|      4.0|        213.0|         AL|     1.0|     M|    1962.0|   Alabama|
+--------+---------+-------------+-----------+--------+------+----------+----------+
only showing top 5 rows



In [65]:
df_immigration.join(df_us_demographics,df_immigration.i94_address == df_us_demographics.state_code).drop(df_us_demographics.state_code).show(5)

+--------+---------+-------------+-----------+--------+------+----------+---------------+-----------------+----------------+------------+--------------------------------+------+-------------------------+------------------+-------+
|i94_year|i94_month|i94_residence|i94_address|i94_visa|gender|birth_year|male_population|female_population|total_population|foreign_born|american_indian_or_alaska_native| asian|black_or_african_american|hispanic_or_latino|  white|
+--------+---------+-------------+-----------+--------+------+----------+---------------+-----------------+----------------+------------+--------------------------------+------+-------------------------+------------------+-------+
|  2016.0|      4.0|        129.0|         AZ|     2.0|     M|    1977.0|       11137275|         11360435|        22497710|     3411565|                          129708|229183|                   296222|           1508157|3591611|
|  2016.0|      4.0|        323.0|         AZ|     2.0|     M|    1960.0|   
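The joins above show that matching rows exist, but not whether any fact rows lack a match in the dimension. Counting those orphan rows is a stricter integrity test; in Spark it is a left anti join, e.g. `df_immigration.join(df_i94_state_codes, df_immigration.i94_address == df_i94_state_codes.state_code, "left_anti").count()`. Null keys never satisfy an equality join, so rows with a null `i94_address` (like the first row in the immigration sample) would show up in that result. The plain-Python sketch below applies the same logic to hypothetical key values.

```python
def orphan_keys(fact_keys, dim_keys):
    """Return fact key values with no matching dimension key (a left anti join)."""
    dim = set(dim_keys)
    # A null key never equals anything in a SQL join, so it is always an orphan
    return [k for k in fact_keys if k is None or k not in dim]

print(orphan_keys(["NY", "FL", None, "XX"], ["NY", "FL", "CA"]))  # [None, 'XX']
```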

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

 - The data dictionaries are in data_dictionary.py


### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

1. I chose Apache Spark because it can clean, transform, and write out large datasets, distributing the work across a cluster when needed.
2. The data should be updated monthly because the I94 immigration data is updated monthly. Monthly updates let us keep extending the analysis and find more trends.
3. Approaching different scenarios:
- If the data increased by 100x, I would run Spark on an Amazon EMR cluster on AWS to handle the larger data, and scale out by adding more nodes if needed.
- If the data had to populate a dashboard daily by 7am, I would use Apache Airflow to schedule the pipelines to run every morning early enough to finish before 7am.
- If the database needed to be accessed by 100+ people, I would load the data into Amazon Redshift so that the database can handle big data queries as well as concurrent queries from many users.