# Immigration Cases in the US
### Data Engineering Capstone Project

#### Project Summary
This project combines data from several sources into a single data model that enables fast and efficient access. The data describes immigration cases in the US.


The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import re

In [2]:
# Create the Spark session (SparkSession is already imported in the first cell)
spark = (
    SparkSession.builder
    .config("spark.jars.repositories", "https://repos.spark-packages.org/")
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
    .enableHiveSupport()
    .getOrCreate()
)

df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use?

* This project builds a data pipeline that loads the data sets listed below into a database with a star schema. The data is first analyzed for quality issues and then cleaned. The cleaned data is used to create one fact table and multiple dimension tables. Finally, data quality checks are run on the resulting tables.

* The project uses Spark to extract and transform the data, create the tables, and write them to parquet files.

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included?

The following data sets are used in this project:
* [I94 Immigration Data](https://www.trade.gov/national-travel-and-tourism-office): The data set comes from the US National Travel and Tourism Office and includes information about immigrants in the United States.
* [World Temperature Data](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data): The data set comes from Kaggle and includes data from the Berkeley Earth Surface Temperature Study. It includes historical data about the temperature in over 3000 cities worldwide.
* [U.S. City Demographic Data](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/table/): This data set comes from opendatasoft and contains demographic information about various US cities.
* [Airport Code Table](https://datahub.io/core/airport-codes#data): The data set comes from datahub.io and includes the IATA/ICAO airport codes from airports around the world. 
* Additionally, the information from the I94 Immigration Label file is separated into two CSV files:
  * Country codes: map the numeric code used in the immigration data to a country
  * Airport codes: map the codes used in the i94port field, which do not appear in the airport data, to a city and state
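
The step that splits the label file into CSV files is not part of this notebook. The following is only a minimal sketch of how the country codes could be extracted, assuming the label file lists them as lines of the form `236 =  'AFGHANISTAN'`. The line format, the sample lines and the function name `parse_label_lines` are assumptions made for illustration.

```python
import re

# Assumed line format: <numeric code> = '<country name>'
LABEL_LINE = re.compile(r"^\s*(\d+)\s*=\s*'(.*)'\s*;?\s*$")


def parse_label_lines(lines):
    """Return (code, country) pairs for every line that matches the assumed format."""
    pairs = []
    for line in lines:
        match = LABEL_LINE.match(line)
        if match:
            pairs.append((match.group(1), match.group(2).strip()))
    return pairs


# Hypothetical sample lines; codes and names appear in the country code table of this notebook
sample = [
    "   236 =  'AFGHANISTAN'",
    "   101 =  'ALBANIA'",
    "   316 =  'ALGERIA'",
]

# Print the pairs with ';' as delimiter, the delimiter the CSV files are read with
for code, country in parse_label_lines(sample):
    print(f"{code};{country}")
```

Lines that do not match the assumed format are skipped rather than raising an error, so headers and comments in the label file would simply be ignored.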

##### Temperature data

In [3]:
# Read in the temperature data 
temperature_data = '../../data2/GlobalLandTemperaturesByCity.csv'

spark_temperature_data = spark.read.format("csv").option("header", "true").load(temperature_data)
spark_temperature_data.printSchema()


root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- AverageTemperatureUncertainty: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [4]:
spark_temperature_data.show(10)

+----------+------------------+-----------------------------+-----+-------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-04-01|5.7879999999999985|           3.6239999999999997|Århus|Denmark|  57.05N|   10.33E|
|1744-05-01|            10.644|           1.2830000000000001|Århus|Denmark|  57.05N|   10.33E|
|1744-06-01|14.050999999999998|                   

In [5]:
# Get row count
rows_temperature = spark_temperature_data.count()
print(f"Temperature Rows count : {rows_temperature}")

# Get columns count
cols_temperature = len(spark_temperature_data.columns)
print(f"Temperature Columns count : {cols_temperature}")

Temperature Rows count : 8599212
Temperature Columns count : 7


##### Immigration data

In [6]:
# Read in the immigration data 
immigration_data = 'immigration_data_sample.csv'

spark_immigration_data = spark.read.format("csv").option("header", "true").load(immigration_data)
spark_immigration_data.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- cicid: string (nullable = true)
 |-- i94yr: string (nullable = true)
 |-- i94mon: string (nullable = true)
 |-- i94cit: string (nullable = true)
 |-- i94res: string (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: string (nullable = true)
 |-- i94mode: string (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: string (nullable = true)
 |-- i94bir: string (nullable = true)
 |-- i94visa: string (nullable = true)
 |-- count: string (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: string (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = tru

In [7]:
spark_immigration_data.show(10)

+-------+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+
|    _c0|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|       admnum|fltno|visatype|
+-------+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+
|2027561|4084316.0|2016.0|   4.0| 209.0| 209.0|    HHW|20566.0|    1.0|     HI|20573.0|  61.0|    2.0|  1.0|20160422|    null| null|      G|      O|   null|      M| 1955.0|07202016|     F|  null|     JL|56582674633.0|00782|      WT|
|2171295|4422636.0|2016.0|   4.0| 582.0| 582.0|    MCA|20567.0|    1

In [8]:
# Get row count
rows_immigration = spark_immigration_data.count()
print(f"Immigration Rows count : {rows_immigration}")

# Get columns count
cols_immigration = len(spark_immigration_data.columns)
print(f"Immigration Columns count : {cols_immigration}")

Immigration Rows count : 1000
Immigration Columns count : 29


##### Demographic data

In [9]:
# Read in the demographic data 
demographic_data = 'us-cities-demographics.csv'

spark_demographic_data = spark.read.format("csv").option("header", "true").option("delimiter", ";").load(demographic_data)
spark_demographic_data.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



In [10]:
spark_demographic_data.show(10)

+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|            City|         State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|   Silver Spring|      Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino|25924|
|          Quincy| Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White|58723|
|          Hoover|       Alabama|      38.5|          3

In [11]:
# Get row count
rows_demo = spark_demographic_data.count()
print(f"Demographic Rows count : {rows_demo}")

# Get columns count
cols_demo = len(spark_demographic_data.columns)
print(f"Demographic Columns count : {cols_demo}")

Demographic Rows count : 2891
Demographic Columns count : 12


##### Airport data

In [12]:
# Read in the airport data (JSON version of the data set)
airport_data = 'airport-codes_json.json'

# The "header" option only applies to CSV files, so it is omitted for JSON
spark_airport_data = spark.read.format("json").load(airport_data)

spark_airport_data.printSchema()

root
 |-- continent: string (nullable = true)
 |-- coordinates: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- ident: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- name: string (nullable = true)
 |-- type: string (nullable = true)



In [13]:
spark_airport_data.show(10)

+---------+--------------------+------------+--------+---------+-----+-----------+----------+----------+------------+--------------------+-------------+
|continent|         coordinates|elevation_ft|gps_code|iata_code|ident|iso_country|iso_region|local_code|municipality|                name|         type|
+---------+--------------------+------------+--------+---------+-----+-----------+----------+----------+------------+--------------------+-------------+
|       NA|-74.9336013793945...|          11|     00A|     null|  00A|         US|     US-PA|       00A|    Bensalem|   Total Rf Heliport|     heliport|
|       NA|-101.473911, 38.7...|        3435|    00AA|     null| 00AA|         US|     US-KS|      00AA|       Leoti|Aero B Ranch Airport|small_airport|
|       NA|-151.695999146, 5...|         450|    00AK|     null| 00AK|         US|     US-AK|      00AK|Anchor Point|        Lowell Field|small_airport|
|       NA|-86.7703018188476...|         820|    00AL|     null| 00AL|         US|

In [14]:
# Get row count
rows_airport = spark_airport_data.count()
print(f"Airport Rows count : {rows_airport}")

# Get columns count
cols_airport = len(spark_airport_data.columns)
print(f"Airport Columns count : {cols_airport}")

Airport Rows count : 57421
Airport Columns count : 12


##### Airport codes

In [15]:
# Read in the airport codes (from label file)
airport_codes = 'airport_codes.csv'

spark_airport_codes = spark.read.format("csv").option("header", "true").option("delimiter", ";").load(airport_codes)
spark_airport_codes.printSchema()

root
 |-- airport_code: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state_code: string (nullable = true)



In [16]:
spark_airport_codes.show(5)
spark_airport_codes.createOrReplaceTempView('airport_codes')

+------------+--------------------+---------------+
|airport_code|                city|     state_code|
+------------+--------------------+---------------+
|         ALC|               ALCAN|AK             |
|         ANC|           ANCHORAGE|    AK         |
|         BAR|BAKER AAF - BAKER...|             AK|
|         DAC|       DALTONS CACHE|        AK     |
|         PIZ|DEW STATION PT LA...|             AK|
+------------+--------------------+---------------+
only showing top 5 rows



##### Country codes

In [17]:
# Read in the country codes (from label file)
country_codes = 'country_codes.csv'

spark_country_codes = spark.read.format("csv").option("header", "true").option("delimiter", ";").load(country_codes)
spark_country_codes.printSchema()
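# i94res holds codes such as "582.0". Spark evaluates `Code + ".0"` as a numeric addition,
# not as a string concatenation, so the cast to double is written out explicitly here.
spark_country_codes = spark_country_codes.withColumn("Code", spark_country_codes.Code.cast("double"))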

root
 |-- Code: string (nullable = true)
 |-- Country: string (nullable = true)



In [18]:
spark_country_codes.show(5)
spark_country_codes.createOrReplaceTempView('country_codes')


+-----+--------------------+
| Code|             Country|
+-----+--------------------+
|582.0|  MEXICO Air Sea,...|
|236.0|         AFGHANISTAN|
|101.0|             ALBANIA|
|316.0|             ALGERIA|
|102.0|             ANDORRA|
+-----+--------------------+
only showing top 5 rows



In [19]:
#write to parquet
#df_spark.write.parquet("sas_data")
#df_spark=spark.read.parquet("sas_data")

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data
* Check how many null/nan values are in the tables
 

In [20]:
spark_immigration_data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in spark_immigration_data.columns]).show()


+---+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------+-----+--------+
|_c0|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender|insnum|airline|admnum|fltno|visatype|
+---+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------+-----+--------+
|  0|    0|    0|     0|     0|     0|      0|      0|      0|     59|     49|     0|      0|    0|       0|     618|  996|      0|     46|   1000|     46|      0|      0|   141|   965|     33|     0|    8|       0|
+---+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-----

In [21]:
spark_temperature_data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in spark_temperature_data.columns]).show()


+---+------------------+-----------------------------+----+-------+--------+---------+
| dt|AverageTemperature|AverageTemperatureUncertainty|City|Country|Latitude|Longitude|
+---+------------------+-----------------------------+----+-------+--------+---------+
|  0|            364130|                       364130|   0|      0|       0|        0|
+---+------------------+-----------------------------+----+-------+--------+---------+



In [22]:
spark_airport_data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in spark_airport_data.columns]).show()


+---------+-----------+------------+--------+---------+-----+-----------+----------+----------+------------+----+----+
|continent|coordinates|elevation_ft|gps_code|iata_code|ident|iso_country|iso_region|local_code|municipality|name|type|
+---------+-----------+------------+--------+---------+-----+-----------+----------+----------+------------+----+----+
|        0|          0|        7813|   15860|    48196|    0|          0|         0|     27391|        5894|   0|   0|
+---------+-----------+------------+--------+---------+-----+-----------+----------+----------+------------+----+----+



In [23]:
spark_demographic_data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in spark_demographic_data.columns]).show()


+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+
|City|State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|Race|Count|
+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+
|   0|    0|         0|              3|                3|               0|                13|          13|                    16|         0|   0|    0|
+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+



##### Immigration data cleaning:
* Remove column 'entdepu', because it contains only null values
* Remove columns 'occup' and 'insnum', because they contain almost only null values
* Remove column i94cit, because it is a duplicate of the column i94res
* Replace null values in gender and i94addr with n/a
* Replace the columns arrdate and depdate with actual dates (the raw values count days since 1960-01-01)
* Remove rows without an arrival or departure date
* Create column residence by mapping i94res to the actual country
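
The date conversion can be illustrated outside Spark. The sketch below is plain Python and mirrors the `date_add(to_date('1960-01-01'), arrdate)` expression used in this notebook. The function name `sas_days_to_date` is hypothetical, and the two input values are taken from the first row of the immigration sample.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # reference date used in the notebook's SQL query


def sas_days_to_date(value):
    """Convert a day count such as '20566.0' into a calendar date; None stays None."""
    if value is None:
        return None
    return SAS_EPOCH + timedelta(days=int(float(value)))


print(sas_days_to_date("20566.0"))  # 2016-04-22
print(sas_days_to_date("20573.0"))  # 2016-04-29
```

Returning `None` for missing values matches the behaviour relied on later, where rows without an arrival or departure date are filtered out.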

In [24]:
#remove entdepu, occup, insnum (all or almost all null) and i94cit (duplicate of i94res) from immigration data
clean_imgr_data = spark_immigration_data.drop("entdepu", "occup", "i94cit", "insnum")
clean_imgr_data.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- cicid: string (nullable = true)
 |-- i94yr: string (nullable = true)
 |-- i94mon: string (nullable = true)
 |-- i94res: string (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: string (nullable = true)
 |-- i94mode: string (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: string (nullable = true)
 |-- i94bir: string (nullable = true)
 |-- i94visa: string (nullable = true)
 |-- count: string (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: string (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: string (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)



In [25]:
clean_imgr_data.show(5)

+-------+---------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+--------+------+-------+-------------+-----+--------+
|    _c0|    cicid| i94yr|i94mon|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|entdepa|entdepd|matflag|biryear| dtaddto|gender|airline|       admnum|fltno|visatype|
+-------+---------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+--------+------+-------+-------------+-----+--------+
|2027561|4084316.0|2016.0|   4.0| 209.0|    HHW|20566.0|    1.0|     HI|20573.0|  61.0|    2.0|  1.0|20160422|    null|      G|      O|      M| 1955.0|07202016|     F|     JL|56582674633.0|00782|      WT|
|2171295|4422636.0|2016.0|   4.0| 582.0|    MCA|20567.0|    1.0|     TX|20568.0|  26.0|    2.0|  1.0|20160423|     MTR|      G|      R|      M| 1990.0|10222016|     M|    *GA|94361

In [26]:
clean_imgr_data.createOrReplaceTempView("imgr_data")

In [27]:
#create new columns arrival_date and departure_date that consist of the actual dates 
clean_imgr_data_2 = spark.sql("SELECT *, date_add(to_date('1960-01-01'), arrdate) AS arrival_date FROM imgr_data")
clean_imgr_data_2.createOrReplaceTempView("imgr_data")
clean_imgr_data_3 = spark.sql("SELECT *, date_add(to_date('1960-01-01'), depdate) AS departure_date FROM imgr_data")

clean_imgr_data_3.show(5)

+-------+---------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+--------+------+-------+-------------+-----+--------+------------+--------------+
|    _c0|    cicid| i94yr|i94mon|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|entdepa|entdepd|matflag|biryear| dtaddto|gender|airline|       admnum|fltno|visatype|arrival_date|departure_date|
+-------+---------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+--------+------+-------+-------------+-----+--------+------------+--------------+
|2027561|4084316.0|2016.0|   4.0| 209.0|    HHW|20566.0|    1.0|     HI|20573.0|  61.0|    2.0|  1.0|20160422|    null|      G|      O|      M| 1955.0|07202016|     F|     JL|56582674633.0|00782|      WT|  2016-04-22|    2016-04-29|
|2171295|4422636.0|2016.0|   4.0| 582.0|    MCA|20567.0|    1.0|    

In [28]:
#Replace null values in gender and i94addr with 'n/a'
clean_imgr_data_3= clean_imgr_data_3.na.fill(value='n/a',subset=["gender", "i94addr"])

In [29]:
clean_imgr_data_3.createOrReplaceTempView("imgr_data")

clean_imgr_data_4 = spark.sql("SELECT *  FROM imgr_data WHERE arrival_date IS NOT NULL AND departure_date IS NOT NULL")


In [30]:
#drop the columns arrdate and depdate
clean_imgr_data = clean_imgr_data_4.drop(col("arrdate"))

In [31]:
cl_imgr_data = clean_imgr_data.drop(col("depdate"))
cl_imgr_data.printSchema()
cl_imgr_data.createOrReplaceTempView("imgr_data")

root
 |-- _c0: string (nullable = true)
 |-- cicid: string (nullable = true)
 |-- i94yr: string (nullable = true)
 |-- i94mon: string (nullable = true)
 |-- i94res: string (nullable = true)
 |-- i94port: string (nullable = true)
 |-- i94mode: string (nullable = true)
 |-- i94addr: string (nullable = false)
 |-- i94bir: string (nullable = true)
 |-- i94visa: string (nullable = true)
 |-- count: string (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: string (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = false)
 |-- airline: string (nullable = true)
 |-- admnum: string (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)
 |-- arrival_date: date (nullable = true)
 |-- departure_date: date (nullable = true)



In [32]:
cl_imgr_data_final = spark.sql("""
SELECT i.*, cc.Country as residence
FROM imgr_data as i
JOIN country_codes as cc
ON i.i94res  = cc.Code  
""")

In [33]:
cl_imgr_data_final.show(10)

+-------+---------+------+------+------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+--------+------+-------+-------------+-----+--------+------------+--------------+--------------------+
|    _c0|    cicid| i94yr|i94mon|i94res|i94port|i94mode|i94addr|i94bir|i94visa|count|dtadfile|visapost|entdepa|entdepd|matflag|biryear| dtaddto|gender|airline|       admnum|fltno|visatype|arrival_date|departure_date|           residence|
+-------+---------+------+------+------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+--------+------+-------+-------------+-----+--------+------------+--------------+--------------------+
|2027561|4084316.0|2016.0|   4.0| 209.0|    HHW|    1.0|     HI|  61.0|    2.0|  1.0|20160422|    null|      G|      O|      M| 1955.0|07202016|     F|     JL|56582674633.0|00782|      WT|  2016-04-22|    2016-04-29|               JAPAN|
|2171295|4422636.0|2016.0|   4.0| 582.0|    MCA|

In [34]:
cl_imgr_data_final.createOrReplaceTempView('imgr_data')

##### Temperature data cleaning:
* Remove all entries that are not in the US
* Remove entries where AverageTemperature is null

In [35]:
spark_temperature_data.createOrReplaceTempView('temp_data')

clean_temp_data = spark.sql("""
SELECT * 
FROM temp_data
WHERE Country = 'United States' AND AverageTemperature IS NOT NULL

""")

In [36]:
clean_temp_data.show(5)
clean_temp_data.createOrReplaceTempView('temp_data')

+----------+------------------+-----------------------------+-------+-------------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|   City|      Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-------+-------------+--------+---------+
|1820-01-01|2.1010000000000004|                        3.217|Abilene|United States|  32.95N|  100.53W|
|1820-02-01|             6.926|                        2.853|Abilene|United States|  32.95N|  100.53W|
|1820-03-01|            10.767|                        2.395|Abilene|United States|  32.95N|  100.53W|
|1820-04-01|17.988999999999994|                        2.202|Abilene|United States|  32.95N|  100.53W|
|1820-05-01|            21.809|                        2.036|Abilene|United States|  32.95N|  100.53W|
+----------+------------------+-----------------------------+-------+-------------+--------+---------+
only showing top 5 rows



##### Airport data cleaning:
* Remove all entries that are not in the US
* Remove entries where elevation_ft is null
* Split coordinates into longitude and latitude
* Extract the state code from iso_region
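
The two splitting steps can be sketched in plain Python on single values. The helper names `split_coordinates` and `state_from_region` are hypothetical, and the input values come from the airport sample shown in this notebook. Unlike the Spark version, this sketch converts the coordinates to floats, which also discards the space after the comma.

```python
def split_coordinates(coordinates):
    """Split 'longitude, latitude' into two floats (longitude comes first in this data set)."""
    longitude, latitude = (float(part) for part in coordinates.split(","))
    return longitude, latitude


def state_from_region(iso_region):
    """Return the part after the first '-' of a region code such as 'US-KS'."""
    return iso_region.split("-", 1)[1]


print(split_coordinates("-101.473911, 38.704022"))  # (-101.473911, 38.704022)
print(state_from_region("US-KS"))  # KS
```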

In [37]:
#Remove airports that are not in the US
spark_airport_data.createOrReplaceTempView('airport_data')

clean_airport_data = spark.sql("""
SELECT * 
FROM airport_data
WHERE iso_country = 'US' AND continent = 'NA' AND elevation_ft IS NOT NULL

""")

In [38]:
#Create columns for longitude and latitude of the coordinates
split_col = split(clean_airport_data['coordinates'], ',')
clean_airport_data = clean_airport_data.withColumn('longitude', split_col[0])
clean_airport_data = clean_airport_data.withColumn('latitude', split_col[1])

clean_airport_data.show(2)

+---------+--------------------+------------+--------+---------+-----+-----------+----------+----------+------------+--------------------+-------------+------------------+---------------+
|continent|         coordinates|elevation_ft|gps_code|iata_code|ident|iso_country|iso_region|local_code|municipality|                name|         type|         longitude|       latitude|
+---------+--------------------+------------+--------+---------+-----+-----------+----------+----------+------------+--------------------+-------------+------------------+---------------+
|       NA|-74.9336013793945...|          11|     00A|     null|  00A|         US|     US-PA|       00A|    Bensalem|   Total Rf Heliport|     heliport|-74.93360137939453| 40.07080078125|
|       NA|-101.473911, 38.7...|        3435|    00AA|     null| 00AA|         US|     US-KS|      00AA|       Leoti|Aero B Ranch Airport|small_airport|       -101.473911|      38.704022|
+---------+--------------------+------------+--------+------

In [39]:
#Create column with state code
split_col = split(clean_airport_data['iso_region'], '-')
clean_airport_data = clean_airport_data.withColumn('state_code', split_col[1])

clean_airport_data.show(2)

+---------+--------------------+------------+--------+---------+-----+-----------+----------+----------+------------+--------------------+-------------+------------------+---------------+----------+
|continent|         coordinates|elevation_ft|gps_code|iata_code|ident|iso_country|iso_region|local_code|municipality|                name|         type|         longitude|       latitude|state_code|
+---------+--------------------+------------+--------+---------+-----+-----------+----------+----------+------------+--------------------+-------------+------------------+---------------+----------+
|       NA|-74.9336013793945...|          11|     00A|     null|  00A|         US|     US-PA|       00A|    Bensalem|   Total Rf Heliport|     heliport|-74.93360137939453| 40.07080078125|        PA|
|       NA|-101.473911, 38.7...|        3435|    00AA|     null| 00AA|         US|     US-KS|      00AA|       Leoti|Aero B Ranch Airport|small_airport|       -101.473911|      38.704022|        KS|
+----

In [40]:
#Replace null values in elevation_ft (a safeguard: rows with a null elevation_ft were already filtered out above)
#na.fill returns a new DataFrame, so the result has to be assigned
clean_airport_data = clean_airport_data.na.fill(value='n/a', subset=["elevation_ft"])

In [41]:
clean_airport_data.createOrReplaceTempView('airport_data')

##### Demographic data cleaning:
* Remove the columns Race and Count, because the breakdown by race is not needed, and drop the duplicate rows that result
* Replace null values in Foreign-born, Male Population and Female Population with n/a
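
Dropping Race and Count produces duplicates because the source holds one row per city and race, and all other columns repeat. The sketch below shows the effect in plain Python. The rows are entirely made up for illustration and do not come from the data set.

```python
# Entirely made-up rows: (city, state_code, total_population, race, count)
rows = [
    ("Example City", "XX", 1000, "Race A", 600),
    ("Example City", "XX", 1000, "Race B", 400),
    ("Other City", "YY", 500, "Race A", 500),
]

# Drop the last two fields (race, count), then keep each remaining row once
without_race = [row[:3] for row in rows]
unique_rows = list(dict.fromkeys(without_race))  # keeps the first occurrence of each row

print(len(without_race), "->", len(unique_rows))  # 3 -> 2
```

This is the same effect that reduces the demographic table from 2891 to 596 rows in the cells that follow.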

In [42]:
spark_demographic_data.createOrReplaceTempView('demog_data')

In [43]:
#Removing the columns Race and Count
clean_demog_data = spark_demographic_data.drop("Race", "Count")

In [44]:
#Replace null values in foreign-born, male/female population
clean_demog_data=clean_demog_data.na.fill(value='n/a',subset=["Foreign-born", "Male Population", "Female Population"])

In [45]:
clean_demog_data.show(5)

+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+
|            City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|
+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+
|   Silver Spring|     Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|
|          Quincy|Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|
|          Hoover|      Alabama|      38.5|          38040|            46799|           84839|              4819|        8229|                  2.58|        AL|
|Rancho Cucamonga|   California|  

In [46]:
clean_demog_data.count()

2891

In [47]:
#Dropping duplicates
clean_demog_data = clean_demog_data.dropDuplicates()

In [48]:
clean_demog_data.count()

596

In [49]:
clean_demog_data.createOrReplaceTempView('demog_data')

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model
* The data model is a star schema: a central fact table stores the immigration cases and references dimension tables that hold the descriptive attributes (for example airport, demographics and visa). The star schema keeps queries simple and makes the information accessible to many types of users.
* The fact table includes the field duration_of_stay, which holds the length of the immigrant's stay in the US in days.
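
The creation of the fact table is not shown in this part of the notebook. As a minimal sketch, assuming duration_of_stay is the difference between departure_date and arrival_date, the calculation looks like this in plain Python. The function name is hypothetical, and the dates are those of the first sample row.

```python
from datetime import date


def duration_of_stay(arrival_date, departure_date):
    """Number of days between arrival and departure."""
    return (departure_date - arrival_date).days


print(duration_of_stay(date(2016, 4, 22), date(2016, 4, 29)))  # 7
```

In Spark SQL the same difference could be computed with `datediff(departure_date, arrival_date)`.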

Conceptual Data Model: 

![Conceptual Data Model](data_model.png)


#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model
* Load the data into four staging tables
* Create dimension table airport 
* Create dimension table temperature 
* Create dimension table demographics 
* Create dimension table immigrant
* Create dimension table state
* Create dimension table visa
* Create fact table fact_immigration_case
* Write all tables into parquet files
* Run data quality checks
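
The steps above can be sketched as an ordered list that a small runner executes one after another. This is only an illustration under stated assumptions: `run_pipeline` and `placeholder` are hypothetical names, and each action stands in for the Spark code in the cells below.

```python
from typing import Callable, List, Tuple

Step = Tuple[str, Callable[[], None]]


def run_pipeline(steps: List[Step]) -> List[str]:
    """Run each step in order and return the names of the completed steps.

    An exception in any step propagates and stops the run, so later steps
    (for example the quality checks) never run against half-built tables.
    """
    completed = []
    for name, action in steps:
        action()
        completed.append(name)
    return completed


def placeholder() -> None:
    """Stand-in for the real Spark work of a step (hypothetical)."""


# Step names follow the list above; every action is a placeholder here.
PIPELINE: List[Step] = [
    ("load staging tables", placeholder),
    ("create dim_airport", placeholder),
    ("create dim_temperature", placeholder),
    ("create dim_demographics", placeholder),
    ("create dim_immigrant", placeholder),
    ("create dim_state", placeholder),
    ("create dim_visa", placeholder),
    ("create fact_immigration_case", placeholder),
    ("write parquet files", placeholder),
    ("run data quality checks", placeholder),
]

for step_name in run_pipeline(PIPELINE):
    print("done:", step_name)
```

Keeping the order explicit in one place makes it easier to move the notebook into a scheduler later.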

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

##### Create dimension table visa

In [50]:
#Create dimension table visa
dim_visa = spark.sql("""
SELECT DISTINCT i94visa as visa_type, 
    CASE
    WHEN i94visa= 1.0 THEN 'Business'
    WHEN i94visa= 2.0 THEN 'Pleasure'
    WHEN i94visa= 3.0 THEN 'Student' END as visa_description
FROM imgr_data 

""")

In [51]:
dim_visa.show(5)

+---------+----------------+
|visa_type|visa_description|
+---------+----------------+
|      3.0|         Student|
|      1.0|        Business|
|      2.0|        Pleasure|
+---------+----------------+
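
The CASE expression above is a fixed lookup from code to description. The following sketch shows the same mapping in plain Python; the names `VISA_DESCRIPTIONS` and `visa_description` are hypothetical and not part of the notebook.

```python
from typing import Optional

# Same three codes as in the CASE expression above.
VISA_DESCRIPTIONS = {1.0: "Business", 2.0: "Pleasure", 3.0: "Student"}


def visa_description(code: float) -> Optional[str]:
    """Return the description for a visa code, or None for an unknown code.

    A SQL CASE without an ELSE branch yields NULL when no WHEN matches;
    returning None mirrors that behaviour.
    """
    return VISA_DESCRIPTIONS.get(code)


print(visa_description(2.0))  # Pleasure
print(visa_description(9.0))  # None
```

An unexpected code in the source data would therefore surface as a null visa_description, which the null checks in section 4.2 would catch.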



In [52]:
#Write to parquet files
dim_visa.write.mode("overwrite").parquet("dim_visa")

##### Create dimension table immigrant

In [53]:
#Create dimension table immigrant
dim_immigrant = spark.sql("""
SELECT DISTINCT cicid as cic_id, 
   gender, 
   biryear as birthyear, 
   i94bir as age
FROM imgr_data

""")

In [54]:
dim_immigrant.show(5)

+---------+------+---------+----+
|   cic_id|gender|birthyear| age|
+---------+------+---------+----+
|3426228.0|     F|   1998.0|18.0|
|2758683.0|     F|   1985.0|31.0|
|5471104.0|     M|   1939.0|77.0|
|3085571.0|     F|   1984.0|32.0|
| 211301.0|     F|   1973.0|43.0|
+---------+------+---------+----+
only showing top 5 rows



In [55]:
#Write to parquet files
dim_immigrant.write.mode("overwrite").parquet("dim_immigrant")

##### Create dimension table state

In [56]:
#Create dimension table state
dim_state = spark.sql("""
SELECT DISTINCT d.`State Code` as state_code, 
    d.State as state_name, 
    a.iso_country as country,
    a.continent as continent
FROM airport_data AS a
JOIN demog_data AS d
    ON a.state_code = d.`State Code`
""")

In [57]:
dim_state.show(20)

+----------+------------+-------+---------+
|state_code|  state_name|country|continent|
+----------+------------+-------+---------+
|        AK|      Alaska|     US|       NA|
|        MI|    Michigan|     US|       NA|
|        OH|        Ohio|     US|       NA|
|        TN|   Tennessee|     US|       NA|
|        UT|        Utah|     US|       NA|
|        CT| Connecticut|     US|       NA|
|        NM|  New Mexico|     US|       NA|
|        AR|    Arkansas|     US|       NA|
|        ND|North Dakota|     US|       NA|
|        NE|    Nebraska|     US|       NA|
|        NY|    New York|     US|       NA|
|        MS| Mississippi|     US|       NA|
|        ID|       Idaho|     US|       NA|
|        IN|     Indiana|     US|       NA|
|        IL|    Illinois|     US|       NA|
|        OK|    Oklahoma|     US|       NA|
|        GA|     Georgia|     US|       NA|
|        NV|      Nevada|     US|       NA|
|        IA|        Iowa|     US|       NA|
|        KS|      Kansas|     US

In [58]:
#Write to parquet files
dim_state.write.mode("overwrite").parquet("dim_state")

##### Create dimension table demographics

In [59]:
#Create dimension table demographics
dim_demographics = spark.sql("""
SELECT City as city, 
    State as state, 
    `Total Population` as total_population, 
    `Male Population` as male_population, 
    `Female Population` as female_population, 
    `Foreign-Born` as foreign_born, 
    `Median Age` as median_age
FROM demog_data 

""")

In [60]:
dim_demographics.show(20)

+---------------+--------------+----------------+---------------+-----------------+------------+----------+
|           city|         state|total_population|male_population|female_population|foreign_born|median_age|
+---------------+--------------+----------------+---------------+-----------------+------------+----------+
|   Johnson City|     Tennessee|           65369|          31019|            34350|        2878|      38.2|
|   San Clemente|    California|           65532|          34076|            31456|        8109|      45.2|
|   Redwood City|    California|           85300|          42676|            42624|       27652|      37.1|
|     Fort Myers|       Florida|           74015|          36850|            37165|       15365|      37.3|
|West Palm Beach|       Florida|          106782|          49262|            57520|       30675|      39.6|
|      San Diego|    California|         1394907|         693826|           701081|      373842|      34.5|
|         German|      Maryl

In [61]:
#Write to parquet files
dim_demographics.write.mode("overwrite").parquet("dim_demographics")

##### Create dimension table airport

In [62]:
#Create dimension table airport
dim_airport = spark.sql("""
SELECT a.ident as airport_code, 
    a.state_code as state_code,
    a.type as airport_type,
    a.elevation_ft,
    a.name as airport_name, 
    a.longitude, 
    a.latitude
FROM airport_data as a
WHERE a.type IN ('small_airport', 'medium_airport', 'large_airport')
SORT BY airport_code asc
""")

In [63]:
dim_airport.show(10)

+------------+----------+-------------+------------+--------------------+-------------------+-------------------+
|airport_code|state_code| airport_type|elevation_ft|        airport_name|          longitude|           latitude|
+------------+----------+-------------+------------+--------------------+-------------------+-------------------+
|        00AA|        KS|small_airport|        3435|Aero B Ranch Airport|        -101.473911|          38.704022|
|        00AK|        AK|small_airport|         450|        Lowell Field|     -151.695999146|        59.94919968|
|        00AL|        AL|small_airport|         820|        Epps Airpark| -86.77030181884766|  34.86479949951172|
|        00AS|        OK|small_airport|        1100|      Fulton Airport|        -97.8180194|         34.9428028|
|        00AZ|        AZ|small_airport|        3810|      Cordes Airport|-112.16500091552734| 34.305599212646484|
|        00CA|        CA|small_airport|        3038|Goldstone /Gts/ A...|     -116.88800

In [64]:
#Write to parquet files
dim_airport.write.mode("overwrite").parquet("dim_airport")

##### Create dimension table temperature

In [65]:
#Create dimension table temperature
dim_temperature = spark.sql("""
SELECT dt as datetime,
    City, 
    AverageTemperature as average_temperature, 
    AverageTemperatureUncertainty as average_temperature_uncertainty

FROM temp_data

""")

In [66]:
dim_temperature.show(10)

+----------+-------+-------------------+-------------------------------+
|  datetime|   City|average_temperature|average_temperature_uncertainty|
+----------+-------+-------------------+-------------------------------+
|1820-01-01|Abilene| 2.1010000000000004|                          3.217|
|1820-02-01|Abilene|              6.926|                          2.853|
|1820-03-01|Abilene|             10.767|                          2.395|
|1820-04-01|Abilene| 17.988999999999994|                          2.202|
|1820-05-01|Abilene|             21.809|                          2.036|
|1820-06-01|Abilene|             25.682|                          2.008|
|1820-07-01|Abilene|             26.268|             1.8019999999999998|
|1820-08-01|Abilene|             25.048|                          1.895|
|1820-09-01|Abilene|             22.435|             2.2159999999999997|
|1820-10-01|Abilene|              15.83|                          2.169|
+----------+-------+-------------------+-----------

In [67]:
#Write to parquet files
dim_temperature.write.mode("overwrite").parquet("dim_temperature")

##### Create fact table immigration_case

In [68]:
#Create fact table immigration_case
fact_immigration_case = spark.sql("""
SELECT cicid as cic_id,
    arrival_date, 
    i94addr as state_code,
    i94mode as immigration_mode, 
    departure_date, 
    i94port as airport_code,
    residence as residence, 
    i94visa as visa_code, 
    visatype as visa_type, 
    DATEDIFF(departure_date, arrival_date) as duration_of_stay

FROM imgr_data

""")

In [69]:
fact_immigration_case.show(10)

+---------+------------+----------+----------------+--------------+------------+--------------------+---------+---------+----------------+
|   cic_id|arrival_date|state_code|immigration_mode|departure_date|airport_code|           residence|visa_code|visa_type|duration_of_stay|
+---------+------------+----------+----------------+--------------+------------+--------------------+---------+---------+----------------+
|4084316.0|  2016-04-22|        HI|             1.0|    2016-04-29|         HHW|               JAPAN|      2.0|       WT|               7|
|4422636.0|  2016-04-23|        TX|             1.0|    2016-04-24|         MCA|  MEXICO Air Sea,...|      2.0|       B2|               1|
|1195600.0|  2016-04-07|        FL|             1.0|    2016-04-27|         OGG|             GERMANY|      2.0|       WT|              20|
|5291768.0|  2016-04-28|        CA|             1.0|    2016-05-07|         LOS|               QATAR|      2.0|       B2|               9|
| 985523.0|  2016-04-06|   

In [70]:
#Write to parquet files
fact_immigration_case.write.mode("overwrite").parquet("fact_immigration_case")

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness

 
Run Quality Checks
* Get the count of entries for each table 
* Check each column in each table for null values
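
One way to make the null check reusable across tables is to generate one counting expression per column from the column types. The sketch below is an assumption-laden variant, not the code used in the cells that follow: `build_null_check_exprs` is a hypothetical helper that takes pairs shaped like a Spark DataFrame's `dtypes` attribute and only applies `isnan` to float and double columns.

```python
from typing import List, Tuple


def build_null_check_exprs(dtypes: List[Tuple[str, str]]) -> List[str]:
    """Build one Spark SQL counting expression per column.

    `dtypes` has the shape of DataFrame.dtypes: (column name, type name).
    NaN is only tested on float/double columns; all other columns are
    tested for NULL only.
    """
    exprs = []
    for name, dtype in dtypes:
        condition = f"`{name}` IS NULL"
        if dtype in ("float", "double"):
            condition += f" OR isnan(`{name}`)"
        exprs.append(f"count(CASE WHEN {condition} THEN 1 END) AS `{name}`")
    return exprs


# Two columns of the fact table; the types are assumed for illustration.
for expr in build_null_check_exprs([("cic_id", "double"), ("arrival_date", "date")]):
    print(expr)
```

With a live Spark session, the expressions could be passed to `df.selectExpr(*build_null_check_exprs(df.dtypes))`, so that date columns are checked for nulls as well instead of being left out.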

In [71]:
# Perform quality checks here

##### Fact immigration table

In [72]:
fact_immigration_case.count()

951

In [73]:
fact_immigration_case.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in list(set(fact_immigration_case.columns) - {'arrival_date','departure_date'})]).show()

+---------+------+----------------+----------------+---------+----------+---------+------------+
|visa_type|cic_id|duration_of_stay|immigration_mode|residence|state_code|visa_code|airport_code|
+---------+------+----------------+----------------+---------+----------+---------+------------+
|        0|     0|               0|               0|        0|         0|        0|           0|
+---------+------+----------------+----------------+---------+----------+---------+------------+



##### Dimension temperature table

In [74]:
dim_temperature.count()

661524

In [75]:
dim_temperature.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in dim_temperature.columns]).show()


+--------+----+-------------------+-------------------------------+
|datetime|City|average_temperature|average_temperature_uncertainty|
+--------+----+-------------------+-------------------------------+
|       0|   0|                  0|                              0|
+--------+----+-------------------+-------------------------------+



##### Dimension airport table

In [76]:
dim_airport.count()

14343

In [77]:
dim_airport.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in dim_airport.columns]).show()


+------------+----------+------------+------------+------------+---------+--------+
|airport_code|state_code|airport_type|elevation_ft|airport_name|longitude|latitude|
+------------+----------+------------+------------+------------+---------+--------+
|           0|         0|           0|           0|           0|        0|       0|
+------------+----------+------------+------------+------------+---------+--------+



##### Dimension visa table

In [78]:
dim_visa.count()

3

In [79]:
dim_visa.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in dim_visa.columns]).show()


+---------+----------------+
|visa_type|visa_description|
+---------+----------------+
|        0|               0|
+---------+----------------+



##### Dimension state table

In [80]:
dim_state.count()

48

In [81]:
dim_state.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in dim_state.columns]).show()


+----------+----------+-------+---------+
|state_code|state_name|country|continent|
+----------+----------+-------+---------+
|         0|         0|      0|        0|
+----------+----------+-------+---------+



##### Dimension demographics table

In [82]:
dim_demographics.count()

596

In [83]:
dim_demographics.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in dim_demographics.columns]).show()


+----+-----+----------------+---------------+-----------------+------------+----------+
|city|state|total_population|male_population|female_population|foreign_born|median_age|
+----+-----+----------------+---------------+-----------------+------------+----------+
|   0|    0|               0|              0|                0|           0|         0|
+----+-----+----------------+---------------+-----------------+------------+----------+



##### Dimension immigrant table

In [84]:
dim_immigrant.count()

951

In [85]:
dim_immigrant.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in dim_immigrant.columns]).show()


+------+------+---------+---+
|cic_id|gender|birthyear|age|
+------+------+---------+---+
|     0|     0|        0|  0|
+------+------+---------+---+



No column contains null or NaN values, so the data quality checks passed. Note that arrival_date and departure_date were excluded from the null check on the fact table.
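
The checks above cover row counts and missing values. A unique-key check, one of the integrity constraints listed at the start of this section, could look like the following sketch. It works on a plain Python list for brevity, and `find_duplicate_keys` is a hypothetical helper; on a Spark DataFrame the same idea is commonly expressed by comparing `df.count()` with `df.select("cic_id").distinct().count()`.

```python
from collections import Counter
from typing import Dict, Hashable, Iterable


def find_duplicate_keys(keys: Iterable[Hashable]) -> Dict[Hashable, int]:
    """Return every key that occurs more than once, with its count."""
    return {key: n for key, n in Counter(keys).items() if n > 1}


# cic_id values copied from the dim_immigrant sample output above.
sample_ids = [3426228.0, 2758683.0, 5471104.0, 3085571.0, 211301.0]
assert find_duplicate_keys(sample_ids) == {}

# A repeated key is reported together with how often it occurs.
print(find_duplicate_keys([1.0, 2.0, 2.0]))  # {2.0: 2}
```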

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.


Sources: 
* opendatasoft (ods): Demographic data
* Kaggle (k): Temperature data
* datahub.io (dh): Airport data
* National Travel and Tourism Office (NTTO): Immigration data

<center>dim_airport</center>

Column name | Description | Source |
-------- | -------- | -------- |
airport_code | Code of the airport | dh |
state_code | Code of the state the airport is in | dh |
airport_type | Type of the airport | dh |
elevation_ft | Elevation of the airport in feet | dh |
airport_name | Name of the airport | dh |
longitude | Longitude of the airport | dh |
latitude | Latitude of the airport | dh |

<center>dim_temperature</center>

Column name | Description | Source |
-------- | -------- | -------- |
datetime | Date the temperature was measured | k |
City | City that the temperature was measured in | k |
average_temperature | Average temperature | k |
average_temperature_uncertainty | Uncertainty of the measured temperatures | k |

<center>dim_immigrant</center>

Column name | Description | Source |
-------- | -------- | -------- |
cic_id | Id of the immigrant | NTTO |
gender | Gender of the immigrant | NTTO |
birthyear | Year the immigrant was born | NTTO |
age | Age of the immigrant at the time of immigration | NTTO |

<center>dim_state</center>

Column name | Description | Source |
-------- | -------- | -------- |
state_code | Code of the state | ods |
state_name | Name of the state | ods |
country | ISO code of the country that the state is in | dh |
continent | Continent that the state is in | dh |

<center>dim_visa</center>

Column name | Description | Source |
-------- | -------- | -------- |
visa_type | Numeric code of the visa (corresponds to visa_code in fact_immigration_case) | NTTO |
visa_description | Description of the code | NTTO |

<center>dim_demographics</center>

Column name | Description | Source |
-------- | -------- | -------- |
city | Name of the city | ods |
state | Name of the state the city lies in | ods |
total_population | Total population of the city | ods |
male_population | Male population of the city | ods |
female_population | Female population of the city | ods |
foreign_born | Foreign-born people living in the city | ods |
median_age | Median age of the population | ods |

<center>fact_immigration_case</center>

Column name | Description | Source |
-------- | -------- | -------- |
cic_id | Identifier for the immigration case | NTTO |
arrival_date | Arrival date of the immigrant | NTTO |
state_code | Code of the state the immigrant came to | NTTO |
immigration_mode | Mode by which the immigrant came to the US | NTTO |
departure_date | Departure date of the immigrant | NTTO |
airport_code | Code of the airport the immigrant came to | NTTO |
residence | Home of the immigrant | NTTO |
visa_code | Code of the visa the immigrant has | NTTO |
visa_type | Type of the visa the immigrant has | NTTO |
duration_of_stay | Duration of the immigrant's stay in days | Calculated |

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
 * Spark was used to transform the data because it is designed for working with large datasets.
* Propose how often the data should be updated and why.
 * The data should be updated monthly, because the immigration data it relies on is partitioned by month.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
   * If the data were increased by 100x, distributed methods for the ETL process would be useful. The process could therefore be ported to Amazon Redshift.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
   * For this purpose, one could use Apache Airflow to create a DAG that runs on a daily schedule.
 * The database needed to be accessed by 100+ people.
   * If the database needed to be accessed by many users, a data store such as Amazon S3 would be useful, because it can handle access by multiple parties.