# Immigration Data by City Population and Racial Demography
### Data Engineering Capstone Project

#### Project Summary
In this project, we will explore immigration data by utilizing Star Schema as the chosen data model. We want to compare the effects between population density and weather 

The project follows the following steps:
* Step 1: Scope of the Project
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [32]:
# Do all imports and installs here
import pandas as pd
import os
import re
import shutil
import pyspark.sql.functions as F
from pyspark.sql.types import NumericType, StringType

### Step 1: Scope the Project

#### Scope 
We will explore I94 immigration data from 2016, US demographies, airport data, and temperature data.


In [3]:
# Create a Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

In [4]:
# Read in data
i94_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
cities_spark = spark.read.csv('us-cities-demographics.csv', sep=';', header=True)
airport_spark = spark.read.option("header", "true").csv("airport-codes_csv.csv")
temperature_spark = spark.read.csv('../../data2/GlobalLandTemperaturesByCity.csv', header=True)
i94_valid_airport_spark = spark.read.csv('Valid_airport.csv', header=True)

### Step 2: Explore and Assess the Data

#### Exploring and Cleaning Data

##### A) Immigration Data

We first look at immigration data's headers to have a overall understanding about immigration data.

This data comes from the US National Tourism and Trade Office. Click [here](https://travel.trade.gov/research/reports/i94/historical/2016.html) for the original source.

In [5]:
# Grab I94 SAS Labels description
i94_helper = 'I94_SAS_Labels_Descriptions.SAS'

with open(i94_helper) as f:
    lines = f.readlines()

i94_regexp = re.compile(r'/\*\s(.*?)\*/', re.DOTALL)
re.findall(i94_regexp, str(lines))

['I94YR - 4 digit year ',
 'I94MON - Numeric month ',
 'I94CIT & I94RES - This format shows all the valid and invalid codes for processing ',
 'I94PORT - This format shows all the valid and invalid codes for processing ',
 "ARRDATE is the Arrival Date in the USA. It is a SAS date numeric field that a \\n', '   permament format has not been applied.  Please apply whichever date format \\n', '   works for you. ",
 'I94MODE - There are missing values as well as not reported (9) ',
 'I94ADDR - There is lots of invalid codes in this variable and the list below \\n\', "   shows what we have found to be valid, everything else goes into \'other\' ',
 "DEPDATE is the Departure Date from the USA. It is a SAS date numeric field that \\n', '   a permament format has not been applied.  Please apply whichever date format \\n', '   works for you. ",
 'I94BIR - Age of Respondent in Years ',
 "I94VISA - Visa codes collapsed into three categories:\\n', '   1 = Business\\n', '   2 = Pleasure\\n', '   3 =

##### A.1) Cleaning Valid Airport data

A pre-processed list of I94 valid airports are extracted from I94_SAS_Labels_Description file. The list excludes invalid codes (888 or XXX) as well as collapsed ports.

In [6]:
i94_valid_airport_spark.show()

+------------+--------------------+-----+
|Airport Code|                City|State|
+------------+--------------------+-----+
|         ALC|               ALCAN|  AK |
|         ANC|           ANCHORAGE|  AK |
|         BAR|BAKER AAF - BAKER...|   AK|
|         DAC|       DALTONS CACHE|  AK |
|         PIZ|DEW STATION PT LA...|   AK|
|         DTH|        DUTCH HARBOR|  AK |
|         EGL|               EAGLE|  AK |
|         FRB|           FAIRBANKS|  AK |
|         HOM|               HOMER|  AK |
|         HYD|               HYDER|  AK |
|         JUN|              JUNEAU|  AK |
|         5KE|           KETCHIKAN|   AK|
|         KET|           KETCHIKAN|  AK |
|         MOS|MOSES POINT INTER...|   AK|
|         NIK|             NIKISKI|  AK |
|         NOM|                 NOM|  AK |
|         PKC|         POKER CREEK|  AK |
|         ORI|      PORT LIONS SPB|   AK|
|         SKA|             SKAGWAY|  AK |
|         SNP|     ST. PAUL ISLAND|   AK|
+------------+--------------------

In [7]:
# Need to trim space in each column
i94_valid_port = i94_valid_airport_spark.withColumn('i94validport', F.trim(i94_valid_airport_spark['Airport Code']))
i94_valid_port = i94_valid_port.withColumn('City', F.trim(i94_valid_airport_spark['City']))
i94_valid_port = i94_valid_port.withColumn('State', F.trim(i94_valid_airport_spark['State']))

In [8]:
i94_valid_port = i94_valid_port.drop('Airport Code')

In [9]:
i94_valid_port.show()

+--------------------+-----+------------+
|                City|State|i94validport|
+--------------------+-----+------------+
|               ALCAN|   AK|         ALC|
|           ANCHORAGE|   AK|         ANC|
|BAKER AAF - BAKER...|   AK|         BAR|
|       DALTONS CACHE|   AK|         DAC|
|DEW STATION PT LA...|   AK|         PIZ|
|        DUTCH HARBOR|   AK|         DTH|
|               EAGLE|   AK|         EGL|
|           FAIRBANKS|   AK|         FRB|
|               HOMER|   AK|         HOM|
|               HYDER|   AK|         HYD|
|              JUNEAU|   AK|         JUN|
|           KETCHIKAN|   AK|         5KE|
|           KETCHIKAN|   AK|         KET|
|MOSES POINT INTER...|   AK|         MOS|
|             NIKISKI|   AK|         NIK|
|                 NOM|   AK|         NOM|
|         POKER CREEK|   AK|         PKC|
|      PORT LIONS SPB|   AK|         ORI|
|             SKAGWAY|   AK|         SKA|
|     ST. PAUL ISLAND|   AK|         SNP|
+--------------------+-----+------

##### A.2) Immigration data

We are interested in the following columns in I94 data:

| Field | Description   |
|-------|---------------|
|I94YR  | 4 digit year |
|I94MON | Numeric month|
|I94PORT| Port code|
|I94ADDR| U.S State|
|ARRDATE| Arrival date|
|DEPDATE| Departure date|
|VISATYPE| Visa type|
|BIRYEAR| 4 digit year of birth|
|I94BIR| Age according to i94yr|
|GENDER| Sex|

In [10]:
i94_explre_df = i94_spark.select('i94yr', 'i94mon', 'i94port', 'i94addr', 'arrdate', 'depdate', 'visatype', 'biryear', 'i94bir', 'gender')
i94_explre_df.show()

+------+------+-------+-------+-------+-------+--------+-------+------+------+
| i94yr|i94mon|i94port|i94addr|arrdate|depdate|visatype|biryear|i94bir|gender|
+------+------+-------+-------+-------+-------+--------+-------+------+------+
|2016.0|   4.0|    XXX|   null|20573.0|   null|      B2| 1979.0|  37.0|  null|
|2016.0|   4.0|    ATL|     AL|20551.0|   null|      F1| 1991.0|  25.0|     M|
|2016.0|   4.0|    WAS|     MI|20545.0|20691.0|      B2| 1961.0|  55.0|     M|
|2016.0|   4.0|    NYC|     MA|20545.0|20567.0|      B2| 1988.0|  28.0|  null|
|2016.0|   4.0|    NYC|     MA|20545.0|20567.0|      B2| 2012.0|   4.0|  null|
|2016.0|   4.0|    NYC|     MI|20545.0|20555.0|      B1| 1959.0|  57.0|  null|
|2016.0|   4.0|    NYC|     NJ|20545.0|20558.0|      B2| 1953.0|  63.0|  null|
|2016.0|   4.0|    NYC|     NJ|20545.0|20558.0|      B2| 1959.0|  57.0|  null|
|2016.0|   4.0|    NYC|     NY|20545.0|20553.0|      B2| 1970.0|  46.0|  null|
|2016.0|   4.0|    NYC|     NY|20545.0|20562.0|     

In [11]:
# i94 data overview
i94_explre_df.describe('i94yr', 'i94mon', 'biryear').show()

+-------+--------------------+-------+------------------+
|summary|               i94yr| i94mon|           biryear|
+-------+--------------------+-------+------------------+
|  count|             3096313|3096313|           3095511|
|   mean|              2016.0|    4.0|1974.2323855415148|
| stddev|4.282829613261096...|    0.0|17.420260534588262|
|    min|              2016.0|    4.0|            1902.0|
|    max|              2016.0|    4.0|            2019.0|
+-------+--------------------+-------+------------------+



Immigration data observation:

- Immigration data was in 2016. But there is biryear data > 2016.
- Null values in state field (i94addr)
- Invalid ports in i94port
- Some null values in gender and departure date, but missing data does not have any impact.

To clean up immigration data:
- Step 1: I94mode = 1. Note that I94mode have 4 possible values (1 = Air, 2 = Sea, 3 = Land, 9 = Not Reported). We want to filter for people coming to the U.S by Air only.

- Step 1: biryear: Remove invalid rows that have birth year > i94yr (2016)

- Step 2: i94port: Only filter for valid ports

- Step 3: i94addr: Remove null

In [12]:
def clean_i94(df):
    df_filter = df.filter((df.biryear <= df.i94yr) & (df.i94mode == 1))
    df_selected_column = df_filter.select('i94yr', 'i94mon', 'i94port', 'i94addr', 'arrdate', 'depdate', 'i94visa', 'visatype', 'biryear', 'gender')
    # Join i94port column with i94 valid airports
    df_valid_air = df_selected_column.join(i94_valid_port, df_selected_column.i94port == i94_valid_port.i94validport)
    
    df_null_rmv = df_valid_air.dropna(subset='i94addr')
    return df_null_rmv

In [13]:
check = clean_i94(i94_spark)
check.show()

+------+------+-------+-------+-------+-------+-------+--------+-------+------+-------------+------+------------+
| i94yr|i94mon|i94port|i94addr|arrdate|depdate|i94visa|visatype|biryear|gender|         City| State|i94validport|
+------+------+-------+-------+-------+-------+-------+--------+-------+------+-------------+------+------------+
|2016.0|   4.0|    ATL|     AL|20551.0|   null|    3.0|      F1| 1991.0|     M|      ATLANTA|    GA|         ATL|
|2016.0|   4.0|    WAS|     MI|20545.0|20691.0|    2.0|      B2| 1961.0|     M|WASHINGTON DC|  null|         WAS|
|2016.0|   4.0|    NYC|     MA|20545.0|20567.0|    2.0|      B2| 1988.0|  null|     NEW YORK|    NY|         NYC|
|2016.0|   4.0|    NYC|     MA|20545.0|20567.0|    2.0|      B2| 2012.0|  null|     NEW YORK|    NY|         NYC|
|2016.0|   4.0|    NYC|     MI|20545.0|20555.0|    1.0|      B1| 1959.0|  null|     NEW YORK|    NY|         NYC|
|2016.0|   4.0|    NYC|     NJ|20545.0|20558.0|    2.0|      B2| 1953.0|  null|     NEW 

Now we are seeing some issues with the Immigration data.
For example, i94port = NYC but there are multiple states (i94addr): MA, MI, NJ, NY.
We need to slightly modify our initial clean_i94 function.

In [14]:
def clean_i94_v2(df):
    pre_clean_df = clean_i94(df)
    final_df = pre_clean_df.filter('i94addr == State').drop('i94port', 'i94addr')
    return final_df

##### Exploring and Cleaning City Dimension tables

The dataset comes from OpenSoft. You can read more about it [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)

In [15]:
cities_spark.columns

['City',
 'State',
 'Median Age',
 'Male Population',
 'Female Population',
 'Total Population',
 'Number of Veterans',
 'Foreign-born',
 'Average Household Size',
 'State Code',
 'Race',
 'Count']

In [16]:
cities_df = cities_spark.orderBy('city').toPandas()
cities_df

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Abilene,Texas,31.3,65212,60664,125876,9367,8129,2.64,TX,American Indian and Alaska Native,1813
1,Abilene,Texas,31.3,65212,60664,125876,9367,8129,2.64,TX,Hispanic or Latino,33222
2,Abilene,Texas,31.3,65212,60664,125876,9367,8129,2.64,TX,White,95487
3,Abilene,Texas,31.3,65212,60664,125876,9367,8129,2.64,TX,Asian,2929
4,Abilene,Texas,31.3,65212,60664,125876,9367,8129,2.64,TX,Black or African-American,14449
5,Akron,Ohio,38.1,96886,100667,197553,12878,10024,2.24,OH,Black or African-American,66551
6,Akron,Ohio,38.1,96886,100667,197553,12878,10024,2.24,OH,White,129192
7,Akron,Ohio,38.1,96886,100667,197553,12878,10024,2.24,OH,American Indian and Alaska Native,1845
8,Akron,Ohio,38.1,96886,100667,197553,12878,10024,2.24,OH,Asian,9033
9,Akron,Ohio,38.1,96886,100667,197553,12878,10024,2.24,OH,Hispanic or Latino,3684


We can normalize original City data into 2 dimension tables:

    1) Overall population of city

    2) Racial demography by city

In [17]:
# Overall population of city table
city_overall_pop = cities_spark.select('City', 'State Code', 'Male Population', 'Female Population',
                                      'Total Population', 'Number of Veterans', 'Foreign-born', 'Average Household Size').dropDuplicates()
city_overall_pop = city_overall_pop.withColumnRenamed('City','city').\
 withColumnRenamed('State Code', 'state').\
 withColumnRenamed('Male Population', 'malepop').\
 withColumnRenamed('Female Population', 'femalepop').\
 withColumnRenamed('Total Population', 'totpop').\
 withColumnRenamed('Number of Veterans', 'numofveterans').\
 withColumnRenamed('Foreign-born', 'foreignborn').\
 withColumnRenamed('Average Household Size', 'avghousesize')

city_overall_df = city_overall_pop.toPandas()
city_overall_df

Unnamed: 0,city,state,malepop,femalepop,totpop,numofveterans,foreignborn,avghousesize
0,Cedar Rapids,IA,63109,67296,130405,7823,5404,2.32
1,Camarillo,CA,31941,35682,67623,4384,9735,2.77
2,Flint,MI,48984,49313,98297,3757,2138,2.38
3,Brandon,FL,55679,58289,113968,9417,16390,2.64
4,Deerfield Beach,FL,37155,42614,79769,3882,23642,2.46
5,El Cajon,CA,54450,49238,103688,7103,31865,3.14
6,Waco,TX,63452,68890,132342,10716,14235,2.57
7,Vallejo,CA,58379,62890,121269,8103,30592,2.83
8,Atascocita,TX,37424,40816,78240,4416,8657,3.03
9,Boynton Beach,FL,33701,40271,73972,4727,15431,2.45


In [18]:
# Racial demography by city
city_race = (cities_spark.select('City', 'State Code', 'Race', 'Count')
    .groupby('City', 'State Code')
    .pivot('Race')
    .agg(F.first('Count')))

# Rename for parquets files
city_race = city_race.withColumnRenamed('State Code', 'State').\
 withColumnRenamed('American Indian and Alaska Native', 'AmericanIndianandAlaskaNative').\
 withColumnRenamed('Black or African-American', 'BlackorAfricanAmerican').\
 withColumnRenamed('Hispanic or Latino', 'HispanicLatino')
city_race_pd = city_race.toPandas()
city_race_pd

Unnamed: 0,City,State,AmericanIndianandAlaskaNative,Asian,BlackorAfricanAmerican,HispanicLatino,White
0,Asheville,NC,496,1939,9078,4146,77979
1,Mesa,AZ,16044,14608,22699,131425,413010
2,Norwalk,CA,1165,16189,5195,76020,46998
3,Arvada,CO,2370,2922,2985,16419,107639
4,Minneapolis,MN,9238,33854,89996,39981,277862
5,Mount Pleasant,SC,,1827,3730,2569,75908
6,Mountain View,CA,577,23884,1974,16397,51400
7,Waukegan,IL,677,4933,11697,54214,65629
8,Deltona,FL,651,1292,12056,31357,75686
9,Fontana,CA,2656,14281,16986,156134,85044


##### Airport Dimension table

This is a simple airport data with corresponing codes.
Here is the 
[Source](https://datahub.io/core/airport-codes#data)

We clean the data using the following steps:

1) Filter country = US

2) Convert iso_region column into us_state

In [19]:
# Convert iso_region into state lambda function
region_to_state_udf = F.udf(lambda z: z[-2:], StringType()) 

In [20]:
# Filter for US airports only
airport_us = airport_spark.filter("iso_country == 'US'").\
 select('ident', 
        'name',
        region_to_state_udf('iso_region').alias('state'),
        F.col('municipality').alias('city'),
        'coordinates').\
    show()

+-----+--------------------+-----+------------+--------------------+
|ident|                name|state|        city|         coordinates|
+-----+--------------------+-----+------------+--------------------+
|  00A|   Total Rf Heliport|   PA|    Bensalem|-74.9336013793945...|
| 00AA|Aero B Ranch Airport|   KS|       Leoti|-101.473911, 38.7...|
| 00AK|        Lowell Field|   AK|Anchor Point|-151.695999146, 5...|
| 00AL|        Epps Airpark|   AL|     Harvest|-86.7703018188476...|
| 00AR|Newport Hospital ...|   AR|     Newport| -91.254898, 35.6087|
| 00AS|      Fulton Airport|   OK|        Alex|-97.8180194, 34.9...|
| 00AZ|      Cordes Airport|   AZ|      Cordes|-112.165000915527...|
| 00CA|Goldstone /Gts/ A...|   CA|     Barstow|-116.888000488, 3...|
| 00CL| Williams Ag Airport|   CA|       Biggs|-121.763427, 39.4...|
| 00CN|Kitchen Creek Hel...|   CA| Pine Valley|-116.4597417, 32....|
| 00CO|          Cass Field|   CO|  Briggsdale|-104.344002, 40.6...|
| 00FA| Grass Patch Airport|   FL|

After exploring the airport data, we decide not to use it because there is no useful field to link with other tables.

##### Temperature Dimension table

In [21]:
# Filter for U.S only, create Year column
us_temp_df = temperature_spark.select('dt', 'AverageTemperature', 'City', 'Country', 'Latitude', 'Longitude').\
  filter("Country == 'United States'").\
  withColumn('Year', F.year('dt'))

In [22]:
# Now let's explore the time frame of temperature data
us_temp_df.describe('year').show()

+-------+------------------+
|summary|              year|
+-------+------------------+
|  count|            687289|
|   mean|1897.2013359736588|
| stddev| 71.60162484679064|
|    min|              1743|
|    max|              2013|
+-------+------------------+



We only have temperature of USA until 2013. I94 data, however, was in 2016. Our assumption is 2016 temperature is not too far off from the average temperature within 5 years (2009 - 2013).

In [23]:
# Convert AverageTemperature from StringType to Double
us_temp_df = us_temp_df.withColumn('AverageTemperature', us_temp_df.AverageTemperature.cast('float'))

In [24]:
us_temp_clean = us_temp_df.filter("Year >= 2009").dropna().\
  groupby('City').\
  agg(F.avg('AverageTemperature').alias('avgtemp')).\
  orderBy('City')
us_temp_clean.show()

+-----------+-------------------+
|       City|            avgtemp|
+-----------+-------------------+
|    Abilene|   18.3217543635452|
|      Akron| 11.104298256468354|
|Albuquerque| 12.230438614623589|
| Alexandria| 13.455386004688448|
|  Allentown| 11.340228084540158|
|   Amarillo| 16.446508804957073|
|    Anaheim|  16.75122813174599|
|  Anchorage|-1.4094106943479605|
|  Ann Arbor| 10.160122826826154|
|    Antioch| 15.034543907433225|
|  Arlington| 16.425877208118898|
|     Arvada|  3.486508764718708|
|    Atlanta| 15.511701803458365|
|     Aurora| 10.662000040736114|
|     Austin|  21.26559645669502|
|Bakersfield|  16.65456143596716|
|  Baltimore| 13.455386004688448|
|Baton Rouge| 21.270561402304132|
|   Beaumont| 21.095666801720334|
|   Bellevue|  8.243122764846735|
+-----------+-------------------+
only showing top 20 rows



We are aware that there many cities share the same name but belong to different states. Grouping by only city is a problem that needs to be addressed when the codes comes to production stage. One improvement opportunity is determining location based on Latitude and Longitude. 

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
We will employ Star Schema as it provides performance and flexibility for data analysts and data scientists in data analysis. For demonstration, our data model will employ 2 fact tables: 

      1) Immigration and overall population
      2) Immigration and racial demography

Both fact tables will utilize Immigration and Temperature data. The difference lies at demography data:

- The first fact table uses overall population by city. Accompanied with the temperature data, the fact table may reveal, at a high level, if immigrants/ visitors prefer cities with high density population or favorable weather. 
- The second table uses racial demography by city. This fact table may reveal, at a more granular level, cities that are more favorable towards a certain group of immigrants/ visitors with certain backgrounds.

#### 3.2 Mapping Out Data Pipelines
- Load cleaned data from the previous step into parquet files.
- I94 immigration data is expected to change. So we will implement a function to automate the cleaning process.
- Join all tables together.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model

In [25]:
# Load data into parquet files
# 1) I94 data
i94_final_df = clean_i94_v2(i94_spark)
i94_final_df.write.mode('append').partitionBy('i94validport')\
  .parquet("tables/i94/i94.parquet")

# 2) Overall population
city_overall_pop.write.mode('append').partitionBy('state', 'City')\
  .parquet("tables/city/city.parquet")

# 3) Racial demography
city_race.write.mode('append').partitionBy('State', 'City')\
  .parquet("tables/racial/race.parquet")

# 4) Temperature data
us_temp_clean.write.mode('append').partitionBy('City')\
  .parquet("tables/temperature/temperature.parquet")

In [26]:
# Join tables
# Fact table #1: Immigration and overall population
fact_overall = i94_final_df.join(city_overall_pop,
                                 (F.lower(i94_final_df['City']) == F.lower(city_overall_pop['City'])) & 
                                 (F.lower(i94_final_df['State']) == F.lower(city_overall_pop['State']))).\
                            join(us_temp_clean,
                                F.lower(i94_final_df['City']) == F.lower(us_temp_clean['City']))

In [27]:
# Fact table #2: Immigration and racial demography
fact_racial = i94_final_df.join(city_race,
                               (F.lower(i94_final_df['City']) == F.lower(city_race['City'])) & 
                               (F.lower(i94_final_df['State']) == F.lower(city_race['State']))).\
                           join(us_temp_clean,
                               F.lower(i94_final_df['City']) == F.lower(us_temp_clean['City']))

In [29]:
fact_overall.show()

+------+------+-------+-------+-------+--------+-------+------+-------+-----+------------+-------+-----+-------+---------+------+-------------+-----------+------------+-------+------------------+
| i94yr|i94mon|arrdate|depdate|i94visa|visatype|biryear|gender|   City|State|i94validport|   city|state|malepop|femalepop|totpop|numofveterans|foreignborn|avghousesize|   City|           avgtemp|
+------+------+-------+-------+-------+--------+-------+------+-------+-----+------------+-------+-----+-------+---------+------+-------------+-----------+------------+-------+------------------+
|2016.0|   4.0|20545.0|20556.0|    2.0|      WT| 1944.0|     F|OAKLAND|   CA|         OAK|Oakland|   CA| 203827|   215451|419278|        12159|     113896|        2.56|Oakland|15.034543907433225|
|2016.0|   4.0|20545.0|20557.0|    2.0|      WT| 1948.0|     F|OAKLAND|   CA|         OAK|Oakland|   CA| 203827|   215451|419278|        12159|     113896|        2.56|Oakland|15.034543907433225|
|2016.0|   4.0|20545

In [30]:
fact_racial.show()

+------+------+-------+-------+-------+--------+-------+------+-------+-----+------------+-------+-----+-----------------------------+-----+----------------------+--------------+------+-------+------------------+
| i94yr|i94mon|arrdate|depdate|i94visa|visatype|biryear|gender|   City|State|i94validport|   City|State|AmericanIndianandAlaskaNative|Asian|BlackorAfricanAmerican|HispanicLatino| White|   City|           avgtemp|
+------+------+-------+-------+-------+--------+-------+------+-------+-----+------------+-------+-----+-----------------------------+-----+----------------------+--------------+------+-------+------------------+
|2016.0|   4.0|20545.0|20556.0|    2.0|      WT| 1944.0|     F|OAKLAND|   CA|         OAK|Oakland|   CA|                         8380|75446|                118228|        114054|171599|Oakland|15.034543907433225|
|2016.0|   4.0|20545.0|20557.0|    2.0|      WT| 1948.0|     F|OAKLAND|   CA|         OAK|Oakland|   CA|                         8380|75446|        

#### 4.2 Data Quality Checks

In [28]:
# Run some reading data check on the 2 fact tables
def count_check(df):
    return df.count() > 0

# Check if there is any repeated rows in the columnlist.
# Return null if all rows in the columnlist are unique.
def unique_check(df, columnlist):
    check = df.groupBy(columnlist)\
       .count()\
       .where(F.col('count') > 1)\
       .select(F.sum('count'))\
       .show()

In [60]:
count_check(fact_overall)
count_check(fact_racial)

True

In [67]:
# City and state should be unique key in overall population by city and racial demography data
unique_check(city_race, ['city', 'state'])
unique_check(city_overall_pop, ['city', 'state'])
# City should be unique key in temperature data
unique_check(us_temp_clean, ['city'])

+----------+
|sum(count)|
+----------+
|      null|
+----------+

+----------+
|sum(count)|
+----------+
|      null|
+----------+

+----------+
|sum(count)|
+----------+
|      null|
+----------+



#### 4.3 Data dictionary 
###### Dimension tables
    
   1) Overall population by city
   - City
   - State Code
   - Male Population
   - Female Population
   - Total Population
   - Number of Veterans
   - Foreign-born
   - Average household size
    
   2) Racial demography by city
   - City
   - State Code
   - American Indian and Alaska Native
   - Asian
   - Black or African-American
   - Hispanic or Latino
   - White
    
   3) Temperature table
   - City
   - Average temperature
  
###### 2 Fact tables
    A) Immigration and overall population:
   |Field|Description|
   |-----|-----------|
   |i94year|Year of immigration data|
   |i94mon|Month of immigration data|
   |arrdate|Arrival date|
   |deptdate|Departure date|
   |i94visa|Visa code categories|
   |visatype|Visa type based on Visa code|
   |biryear|Birth year|
   |gender|Sex|
   |city|Arrival city|
   |state|Arrival state|
   |i94validport|Arrival airport|
   |malepop|Male population at the arrival city|
   |femalepop|Female population at the city|
   |totpop|Total population at the city|
   |numberofveterans|Number of veterans at the city|
   |foreignborn|Number of foreign-born at the city|
   |avghousesize|Average household size at the city|
   |avgtemp|5-year Average temperature at the city|
   
    B) Immigration and racial demography:
    
   |Field|Description|
   |-----|-----------|
   |i94year|Year of immigration data|
   |i94mon|Month of immigration data|
   |arrdate|Arrival date|
   |deptdate|Departure date|
   |i94visa|Visa code categories|
   |visatype|Visa type based on Visa code|
   |biryear|Birth year|
   |gender|Sex|
   |city|Arrival city|
   |state|Arrival state|
   |i94validport|Arrival airport|
   |AmericanIndianandAlaskaNative|American Indian and Alaska Native population|
   |Asian|Asian population|
   |BlackAfricanAmerican|African American or Black population|
   |HispanicLatino|Hispanic or Latino population|
   |White|Caucasian population|
   |avgtemp|5-year Average temperature at the city|
   

#### Step 5: Complete Project Write Up

* Spark is used for this project because of its scalability. It's been a go-to tool for analysis due to its operation in memory.
* Should the data increased by 100x or needs to be accessed by 100+ people, we can still reuse most of the codes in this project and implement cloud computing such as EMR. We can employ more cloud workers to handle the data. For storage, we can utilize S3. 
* The data should be updated annually due to the immgration data is refreshed in the aforementioned interval.
* If the data populates on a daily basis in the morning, we should implement Airflow to run the pipeline at certain time.