# Data Engineering Capstone Project

#### Project Summary
In this project, 3 data sources are processed that show information about:
- the count of immigration in April of 2016 to the United States Of America
- the demographic ratio of female, male and veterans in the respective state in the month of immigration
- the average airport elevation in feet of the respective state

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# installing pyspark when using a new Udacity workspace
#!pip install pyspark

In [1]:
import configparser
import pandas as pd
import os
import glob
from pyspark.sql import SparkSession, SQLContext, GroupedData
from pyspark.sql.functions import *
from I94_codes import I94COUNTRY_codes, I94COUNTRY_udf, I94PORT_codes, I94PORT_udf
from US_cities import state_udf, cities_states

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

['dl.cfg']

#### creating spark session

In [3]:
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()
print('Spark session created ...')

Spark session created ...


In [4]:
#Build SQL context object
sqlContext = SQLContext(spark)

### Step 1: Scope the Project and Gather Data

#### Scope 
Data from various sources are used also fact and dimension tables are created to show the development of immigration.
#### Describe and Gather Data 

1 - I94 Immigration Data: comes from the U.S. National Tourism and Trade Office and contains various statistics on international visitor arrival in USA and comes from the US National Tourism and Trade Office. 
    The dataset contains data from 2016.
2 - World Temperature Data: comes from Kaggle and contains average weather temperatures by city.
3 - U.S. City Demographic Data: comes from OpenSoft and contains information about the demographics of all US cities such as average age, male and female population.
4 - Airport Code Table: This is a simple table of airport codes and corresponding cities.

In [6]:
# Reads all Data from all Sources
I94_df = spark.read.format('com.github.saurfang.sas.spark').load(config['SOURCE_PATH']['I94_DATA'])
TEMP_df = spark.read.format("csv").option("header", "true").load(config['SOURCE_PATH']['TEMP_DATA'])
DEMO_df = spark.read.format("csv").option("header", "true").option("delimiter", ";").load(config['SOURCE_PATH']['DEMO_DATA'])
AIRPORT_df = spark.read.format("csv").option("header", "true").load(config['SOURCE_PATH']['AIRPORT_DATA'])

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
I94 Immigration Data:
- filtering nulls of state codes and aiport codes
- transform country codes to country of origin
- select important columns and drop duplicates

World Temperature Data:
- ordering year descending
- no data from 2016 are available. 
  Further processing is not possible with the immigration data from 2016.
  The temperature data will not be processed further.

U.S. City Demographic Data:
- ordering demographic data
- calculating percentages 
- drop duplicates

Airport Code Data:
- filtering US airports !=closed
- splitting iso-region into state codes
- calculating percanteges of airport elevation in feet

### I94 Immigration Data

In [7]:
I94_df.show(5)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null| 1.897628485E9| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SE

In [8]:
I94_df.orderBy("i94yr", ascending=False).show(5)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null| 1.897628485E9| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SE

In [9]:
IE94Filter=I94_df.filter(I94_df.i94addr.isNotNull())\
                .filter(I94_df.i94res.isNotNull())\
                .filter(col("i94port").isin(list(I94PORT_codes.keys())))\
                .withColumn("origin_country",I94COUNTRY_udf(I94_df["i94res"]))\
                .withColumn("i94yr",col("i94yr").cast("integer"))\
                .withColumn("i94mon",col("i94mon").cast("integer"))\
                .withColumn("city_dest_airport",I94PORT_udf(I94_df["i94port"]))

In [10]:
CLEAN_I94_df=IE94Filter.select('cicid',col("i94yr").alias("year"),col("i94mon").alias("month"),\
                             "origin_country",col("i94addr").alias("state_code"),"i94port","city_dest_airport")

In [11]:
CLEAN_I94_df.show(5)

+-----+----+-----+--------------+----------+-------+--------------------+
|cicid|year|month|origin_country|state_code|i94port|   city_dest_airport|
+-----+----+-----+--------------+----------+-------+--------------------+
|  7.0|2016|    4|   SOUTH KOREA|        AL|    ATL|  ATLANTA           |
| 15.0|2016|    4|       ALBANIA|        MI|    WAS|WASHINGTON DC    ...|
| 16.0|2016|    4|       ALBANIA|        MA|    NYC|  NEW YORK          |
| 17.0|2016|    4|       ALBANIA|        MA|    NYC|  NEW YORK          |
| 18.0|2016|    4|       ALBANIA|        MI|    NYC|  NEW YORK          |
+-----+----+-----+--------------+----------+-------+--------------------+
only showing top 5 rows



### World Temperature Data

In [12]:
TEMP_df.show(5)

+----------+------------------+-----------------------------+-----+-------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 5 rows



In [13]:
# filter World Temperature Data for the U.S. only
# drop duplicates
TEMPFilter=TEMP_df.filter(TEMP_df['country']=='United States')\
                  .filter(col("City").isin(list(cities_states.keys())))\
                .withColumn('year',year(TEMP_df['dt']))\
                .withColumn('month',month(TEMP_df['dt']))\
                .withColumn("State",state_udf(TEMP_df["City"]))\
                .orderBy("State")

In [14]:
CLEAN_TEMP_df=TEMPFilter.select('year','month',round(col('AverageTemperature'),1).alias('avg_temp'),\
                                   'City','State','Country').dropDuplicates()

In [15]:
CLEAN_TEMP_df.show(5)

+----+-----+--------+----------+-------+-------------+
|year|month|avg_temp|      City|  State|      Country|
+----+-----+--------+----------+-------+-------------+
|1743|   11|    10.6|Birmingham|Alabama|United States|
|1743|   12|    null|Birmingham|Alabama|United States|
|1744|    1|    null|Birmingham|Alabama|United States|
|1744|    2|    null|Birmingham|Alabama|United States|
|1744|    3|    null|Birmingham|Alabama|United States|
+----+-----+--------+----------+-------+-------------+
only showing top 5 rows



In [16]:
NEW_TEMP_df=CLEAN_TEMP_df.groupBy('year','month', 'State').avg('avg_temp').orderBy('State')

In [17]:
NEW_TEMP_df.show(5)

+----+-----+-------+-----------------+
|year|month|  State|    avg(avg_temp)|
+----+-----+-------+-----------------+
|1743|   11|Alabama|9.866666666666667|
|1743|   12|Alabama|             null|
|1744|    1|Alabama|             null|
|1744|    2|Alabama|             null|
|1744|    3|Alabama|             null|
+----+-----+-------+-----------------+
only showing top 5 rows



In [18]:
RESULT_TEMP_df=NEW_TEMP_df.select('year', 'month', 'State', \
                                    round(col('avg(avg_temp)'),1).alias('avg_temp'))

In [19]:
RESULT_TEMP_df.orderBy("year", ascending=False).show(5)

+----+-----+-------+--------+
|year|month|  State|avg_temp|
+----+-----+-------+--------+
|2013|    1|Alabama|     9.8|
|2013|    6|Alabama|    26.1|
|2013|    2|Alabama|     8.8|
|2013|    3|Alabama|    10.0|
|2013|    4|Alabama|    16.9|
+----+-----+-------+--------+
only showing top 5 rows



##### Weather data are only available up to the year 2013. 
##### A processing for the immigration data from the year 2016 is therefore not possible. 
##### The data source is therefore excluded from the integration.

### U.S. City Demographic Data

##### Please note: Due to the genetic diversity of people I disagree with the division of people into races. 
##### I do not consider the column "races" in the table of demographic data for further processing for this reason. Thank you for your understanding.

In [20]:
DEMO_df.show(5)

+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|            City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|   Silver Spring|     Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino|25924|
|          Quincy|Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White|58723|
|          Hoover|      Alabama|      38.5|          38040| 

In [21]:
DEMOFilter=DEMO_df\
        .withColumn("Median Age",col("Median Age").cast("float"))\
        .withColumn("pct_male_pop",DEMO_df["Male Population"]/DEMO_df["Total Population"]*100)\
        .withColumn("pct_female_pop",DEMO_df["Female Population"]/DEMO_df["Total Population"]*100)\
        .withColumn("pct_veterans",DEMO_df["Number of Veterans"]/DEMO_df["Total Population"]*100)\
        .withColumn("pct_foreign_born",DEMO_df["Foreign-born"]/DEMO_df["Total Population"]*100)\
        .orderBy("State")

In [22]:
CLEAN_DEMO_df=DEMOFilter.select('State',\
                                col('State Code').alias('state_code'),\
                               
                                col('Total Population').alias('total_pop'),\
                                 col('Median Age').alias('median_age'),\
                                'pct_male_pop','pct_female_pop', 'pct_veterans','pct_foreign_born')

In [23]:
CLEAN_DEMO_df.show(5)

+-------+----------+---------+----------+------------------+------------------+-----------------+------------------+
|  State|state_code|total_pop|median_age|      pct_male_pop|    pct_female_pop|     pct_veterans|  pct_foreign_born|
+-------+----------+---------+----------+------------------+------------------+-----------------+------------------+
|Alabama|        AL|   189114|      38.1| 48.52311304292649| 51.47688695707351|8.797339171081992| 6.710767050562094|
|Alabama|        AL|    98338|      29.1| 48.09229392503407| 51.90770607496593|3.708637556183774| 4.785535601700258|
|Alabama|        AL|    67536|      38.9|47.636815920398014|52.363184079601986|9.378701729447998| 2.515695332859512|
|Alabama|        AL|   200586|      35.4| 47.15284217243477| 52.84715782756524|7.455654931052018|4.6548612565184015|
|Alabama|        AL|    67536|      38.9|47.636815920398014|52.363184079601986|9.378701729447998| 2.515695332859512|
+-------+----------+---------+----------+------------------+----

In [24]:
NEW_CLEAN_DEMO_df=CLEAN_DEMO_df.groupBy("State",'state_code').avg('median_age',"pct_male_pop","pct_female_pop", "pct_veterans","pct_foreign_born").orderBy('State')

In [25]:
NEW_CLEAN_DEMO_df.show(5)

+----------+----------+-----------------+------------------+-------------------+-----------------+---------------------+
|     State|state_code|  avg(median_age)| avg(pct_male_pop)|avg(pct_female_pop)|avg(pct_veterans)|avg(pct_foreign_born)|
+----------+----------+-----------------+------------------+-------------------+-----------------+---------------------+
|   Alabama|        AL|36.16176476198084| 47.31893411899914|  52.68106588100086| 6.79067568450305|      4.9988281961214|
|    Alaska|        AK|32.20000076293945|51.204405832036024| 48.795594167963976|9.204037563400794|   11.134434791342338|
|   Arizona|        AZ|35.03750014305115| 48.80588472481988| 51.194115275180124|6.605226419604703|   12.641983065857145|
|  Arkansas|        AR|32.73793049516349| 48.43811311257044|  51.56188688742957|5.172062842819985|   10.948156422255321|
|California|        CA|36.17396435935117| 49.35197598832696|  50.64802401167306|4.134256546024143|    27.57416715942217|
+----------+----------+---------

In [26]:
RESULT_DEMO_df=NEW_CLEAN_DEMO_df.select('State','state_code',\
                                    round(col('avg(median_age)'),1).alias('avg_median_age'),\
                                    round(col('avg(pct_male_pop)'),1).alias('avg_male_pop'),\
                                    round(col('avg(pct_female_pop)'),1).alias('avg_female_pop'),\
                                    round(col('avg(pct_veterans)'),1).alias('avg_veterans'),\
                                    round(col('avg(pct_foreign_born)'),1).alias('avg_foreign_born'))

In [27]:
RESULT_DEMO_df.show(5)

+----------+----------+--------------+------------+--------------+------------+----------------+
|     State|state_code|avg_median_age|avg_male_pop|avg_female_pop|avg_veterans|avg_foreign_born|
+----------+----------+--------------+------------+--------------+------------+----------------+
|   Alabama|        AL|          36.2|        47.3|          52.7|         6.8|             5.0|
|    Alaska|        AK|          32.2|        51.2|          48.8|         9.2|            11.1|
|   Arizona|        AZ|          35.0|        48.8|          51.2|         6.6|            12.6|
|  Arkansas|        AR|          32.7|        48.4|          51.6|         5.2|            10.9|
|California|        CA|          36.2|        49.4|          50.6|         4.1|            27.6|
+----------+----------+--------------+------------+--------------+------------+----------------+
only showing top 5 rows



### Airport Code Table

In [28]:
AIRPORT_df.show(5)

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport|          11|       NA|         US|     US-PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
| 00AK|small_airport|        Lowell Field|         450|       NA|         US|     US-AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|
| 00AL|small_airport|        Epps Airpark|         820|       NA|         US|     

In [29]:
AIRPORTFilter=AIRPORT_df.filter(AIRPORT_df['type']!='closed')\
                        .filter(AIRPORT_df['iso_country']=='US')\
                        .withColumn('iso_region',substring(AIRPORT_df['iso_region'],4,2))\
                        .withColumn('elevation_ft',col('elevation_ft').cast('float'))

In [30]:
NEW_AIRPORT_df=AIRPORTFilter.groupBy('iso_country','iso_region').avg('elevation_ft')

In [31]:
CLEAN_AIRPORT_df=NEW_AIRPORT_df.select(col("iso_country").alias("country"),\
                                               col("iso_region").alias("state_code"),\
                                               round(col("avg(elevation_ft)"),1).alias("avg_elevation_ft")).orderBy("iso_region")

In [32]:
CLEAN_AIRPORT_df.show(20)

+-------+----------+----------------+
|country|state_code|avg_elevation_ft|
+-------+----------+----------------+
|     US|        AK|           449.7|
|     US|        AL|           434.2|
|     US|        AR|           511.6|
|     US|        AZ|          2722.1|
|     US|        CA|          1051.1|
|     US|        CO|          6220.5|
|     US|        CT|           333.5|
|     US|        DC|           185.2|
|     US|        DE|            84.0|
|     US|        FL|            70.1|
|     US|        GA|           677.0|
|     US|        HI|           714.1|
|     US|        IA|          1020.8|
|     US|        ID|          3780.1|
|     US|        IL|           676.2|
|     US|        IN|           760.7|
|     US|        KS|          1564.8|
|     US|        KY|           759.4|
|     US|        LA|            67.9|
|     US|        MA|           259.3|
+-------+----------+----------------+
only showing top 20 rows



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Due to simplicity, the following star scheme is used

![title](Capstone_Schema.png)

#### I94 Immigration Data

In [33]:
CLEAN_I94_df.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- origin_country: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- i94port: string (nullable = true)
 |-- city_dest_airport: string (nullable = true)



#### U.S. City Demographic Data

In [34]:
RESULT_DEMO_df.printSchema()

root
 |-- State: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- avg_median_age: double (nullable = true)
 |-- avg_male_pop: double (nullable = true)
 |-- avg_female_pop: double (nullable = true)
 |-- avg_veterans: double (nullable = true)
 |-- avg_foreign_born: double (nullable = true)



#### Airport Code Table

In [35]:
CLEAN_AIRPORT_df.printSchema()

root
 |-- country: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- avg_elevation_ft: double (nullable = true)



#### 3.2 Mapping Out Data Pipelines
These following steps are necessary to pipeline the data into the chosen data model:

1. Creating three dimension tables I94_Data, DEMO_Data, AIRPORT_Data
2. Creating the fact table I94_IMMIGRATION_DATA using SQL (SELECT and JOIN)
3. Creating a new Spark Dataframe
4. Writing the Dataframe into parquet files

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [36]:
# creating the dimension tables
CLEAN_I94_df.createOrReplaceTempView("I94_Data")
RESULT_DEMO_df.createOrReplaceTempView("DEMO_Data")
CLEAN_AIRPORT_df.createOrReplaceTempView("AIRPORT_Data")

In [37]:
# creating the fact table
I94_IMMIGRATION_DATA=spark.sql("""
                                SELECT 
                                    i.year AS year_of_immig, 
                                    i.month AS month_of_immig,
                                    i.origin_country AS immig_from,
                                    d.state AS immig_to_state, 
                                    COUNT(i.state_code) AS count_immigration,
                                    i.state_code,
                                    d.avg_median_age,
                                    d.avg_female_pop,
                                    d.avg_male_pop, 
                                    d.avg_veterans,
                                    d.avg_foreign_born,
                                    a.avg_elevation_ft AS avg_airport_elevation_ft  
                                    
                                   
                                    FROM I94_Data i 
                                    JOIN DEMO_Data d ON d.state_code=i.state_code
                                    JOIN AIRPORT_Data a ON a.state_code=d.state_code
                                    
                                                            
                                    GROUP BY i.year,i.month,\
                                         i.origin_country,  d.state, i.state_code,\
                                         d.avg_median_age, d.avg_female_pop, d.avg_male_pop,\
                                         d.avg_veterans, d.avg_foreign_born, a.avg_elevation_ft
                                         
                                         
                                    ORDER BY i.origin_country, i.state_code      
                                """)

In [38]:
I94_IMMIGRATION_DATA.printSchema()

root
 |-- year_of_immig: integer (nullable = true)
 |-- month_of_immig: integer (nullable = true)
 |-- immig_from: string (nullable = true)
 |-- immig_to_state: string (nullable = true)
 |-- count_immigration: long (nullable = false)
 |-- state_code: string (nullable = true)
 |-- avg_median_age: double (nullable = true)
 |-- avg_female_pop: double (nullable = true)
 |-- avg_male_pop: double (nullable = true)
 |-- avg_veterans: double (nullable = true)
 |-- avg_foreign_born: double (nullable = true)
 |-- avg_airport_elevation_ft: double (nullable = true)



In [39]:
# creating a new DataFrame
I94_IMMIGRATION_DATA.toDF('year_of_immig', 'month_of_immig', 'immig_from',\
                          'immig_to_state', 'count_immigration', 'state_code', 'avg_median_age','avg_female_pop',\
                          'avg_male_pop', 'avg_veterans','avg_foreign_born','airport_avg_elevation_ft').show(5)

+-------------+--------------+-----------+--------------------+-----------------+----------+--------------+--------------+------------+------------+----------------+------------------------+
|year_of_immig|month_of_immig| immig_from|      immig_to_state|count_immigration|state_code|avg_median_age|avg_female_pop|avg_male_pop|avg_veterans|avg_foreign_born|airport_avg_elevation_ft|
+-------------+--------------+-----------+--------------------+-----------------+----------+--------------+--------------+------------+------------+----------------+------------------------+
|         2016|             4|AFGHANISTAN|             Arizona|                1|        AZ|          35.0|          51.2|        48.8|         6.6|            12.6|                  2722.1|
|         2016|             4|AFGHANISTAN|          California|               34|        CA|          36.2|          50.6|        49.4|         4.1|            27.6|                  1051.1|
|         2016|             4|AFGHANISTAN|   

In [40]:
# writing fact table to parquet files
I94_IMMIGRATION_DATA.write.parquet("I94_IMMIGRATION_DATA")

#### 4.2 Data Quality Checks
- searching for nulls in year_of_immig, month_of_immig, immig_from, immig_to_state
- for the correctness of all 4 datasets a false must be returned
- counting all immigrations in the fact table
- counting all immigrations in the dimension table of I94 Immigration

In [41]:
# searching for nulls in year_of_immig, month_of_immig, immig_from, immig_to_state
# for the correctness of all 4 datasets a false must be returned
I94_IMMIGRATION_DATA.select(isnull('year_of_immig').alias('year'),\
                            isnull('month_of_immig').alias('month'),\
                            isnull('immig_from').alias('country'),\
                            isnull('immig_to_state').alias('state')).dropDuplicates().show()

+-----+-----+-------+-----+
| year|month|country|state|
+-----+-----+-------+-----+
|false|false|  false|false|
+-----+-----+-------+-----+



In [42]:
# counting all immigrations in the fact table
spark.sql('SELECT COUNT(*) FROM I94_Data').show()

+--------+
|count(1)|
+--------+
| 2940284|
+--------+



In [43]:
# counting all immigrations in the dimension table of I94 Immigration
CLEAN_I94_df.select(count('origin_country').alias('CLEAN_I94_df')).show()

+------------+
|CLEAN_I94_df|
+------------+
|     2940284|
+------------+



#### 4.3 Data dictionary

##### Fact Table

I94_IMMIGRATION_DATA
 |-- year_of_immig: integer (nullable = true) | year of immigration from Immigration Data table
 |-- month_of_immig: integer (nullable = true) | month of immigration from Immigration Data table
 |-- immig_from: string (nullable = true) | country of origin from Immigration Data table
 |-- immig_to_state: string (nullable = true) | US state Code of immigration from Immigration Data table
 |-- count_immigration: long (nullable = false) | Count of immigrations
 |-- state_code: string (nullable = true) | US state code of immigration from Immigration Data table
 |-- avg_median_age: double (nullable = true) | average of median age in US state from Demographic Data table
 |-- avg_female_pop: double (nullable = true) | average % of female population of US state  from Demographic Data table
 |-- avg_male_pop: double (nullable = true) | average % of male population of US state from Demographic Data table
 |-- avg_veterans: double (nullable = true) | average % of veterans of US state from Demographic Data table
 |-- avg_foreign_born: double (nullable = true) | average % of foreign born population of US state from Demographic Data table
 |-- avg_airport_elevation_ft: double (nullable = true) | average elevation in feet of airport location of US state from Aiport Data table

##### Dimension tables

I94 Immigration Data table
 |-- cicid: double (nullable = true)
 |-- year: integer (nullable = true) | year of immigration
 |-- month: integer (nullable = true) | month of immigration
 |-- origin_country: string (nullable = true) | country of origin
 |-- state_code: string (nullable = true) | US state Code
 |-- i94port: string (nullable = true) | City Port Code of Immigration
 |-- city_dest_airport: string (nullable = true) | City of destination Airport

U.S. City Demographic Data table
 |-- State: string (nullable = true) | US state Name
 |-- state_code: string (nullable = true) | US state Code
 |-- avg_median_age: double (nullable = true) | average of median age
 |-- avg_male_pop: double (nullable = true) | average % of male population
 |-- avg_female_pop: double (nullable = true) | average % of female population
 |-- avg_veterans: double (nullable = true) | average % of veterans
 |-- avg_foreign_born: double (nullable = true) | average % of foreign born

Airport Code table
 |-- country: string (nullable = true) | Airports in the United States
 |-- state_code: string (nullable = true) | US state Code
 |-- avg_elevation_ft: double (nullable = true) average elevation in feet of airport location

#### Step 5: Complete Project Write Up
1. Clearly state the rationale for the choice of tools and technologies for the project.
2. Propose how often the data should be updated and why.
3. Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

1.) For a small amount of data, Spark was used for fast reading, processing, output and analysis.
2.) All data should be provided monthly by agencies and organizations.
3.) - For a 100x data set I would use Hadoop to create a distributed processing system for faster processing.
    - For a daily update I would use Airflow. For a structured update for all data, this can be used to create a schedule.
    - For more than 100 data access I would use a web application (GUI). Amazon AWS is suitable for this.