# Project Title
### Data Engineering Capstone Project

#### Project Summary
The purpose of this project is to create a data model for US immigration by building an ETL pipeline for the I94 immigration, global land temperature and US demographics datasets. With this data model (an analytics database), data analysts can dig deeper to find insights that help them understand the underlying patterns of US immigration.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

### Step 1: Scope the Project and Gather Data

#### Scope 
In this project, I will use Apache Spark for the data analysis, which includes the following actions:
- Data preprocessing and exploring for I94 immigration dataset
- Data preprocessing and exploring for us-cities-demographic dataset
- Data preprocessing and exploring for global temperature dataset
- Create dimension tables
- Create fact table

#### Describe and Gather Data 
Three datasets are used in this project.
- **I94 Immigration Data**: This data comes from the US National Tourism and Trade Office. Non-U.S. citizens from overseas countries and Mexico (traveling by air and sea) must complete an I-94 form to enter the United States, and these forms are recorded in the I94 Immigration dataset.
You can access the immigration data in a folder with the following path: `../../data/18-83510-I94-Data-2016/`.
- **World Temperature Data**: This dataset came from Kaggle. You can read more about this [here](https://www.kaggle.com/datasets/berkeleyearth/climate-change-earth-surface-temperature-data).
You can access the temperature data in a folder with the following path: `../../data2/`. There's just one file in that folder, called `GlobalLandTemperaturesByCity.csv`.
- **U.S. City Demographic Data**: This data comes from OpenDataSoft. You can read more about it [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/). The file used in this notebook is `us-cities-demographics.csv`.

### Step 2: Explore and Assess the Data


### Import Spark


In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg
from pyspark.sql import SQLContext
from pyspark.sql.functions import isnan, when, count, col, udf, dayofmonth, dayofweek, month, year, weekofyear
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.types import *

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

In [2]:
# import libraries
import pandas as pd
import os
import datetime as dt

### I94 Immigration


#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.


In [3]:
# list all files in the I94 immigration data directory
files = os.listdir('../../data/18-83510-I94-Data-2016/')
files

['i94_apr16_sub.sas7bdat',
 'i94_sep16_sub.sas7bdat',
 'i94_nov16_sub.sas7bdat',
 'i94_mar16_sub.sas7bdat',
 'i94_jun16_sub.sas7bdat',
 'i94_aug16_sub.sas7bdat',
 'i94_may16_sub.sas7bdat',
 'i94_jan16_sub.sas7bdat',
 'i94_oct16_sub.sas7bdat',
 'i94_jul16_sub.sas7bdat',
 'i94_feb16_sub.sas7bdat',
 'i94_dec16_sub.sas7bdat']

There are many files in `18-83510-I94-Data-2016`; in this project template we analyze one file (`i94_apr16_sub.sas7bdat`). The analysis flow is the same for the other files.
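Because `os.listdir` returns the files in no particular order (as the listing above shows), extending the analysis to all twelve months would be easier to follow if the files were processed chronologically. The sketch below assumes every file name follows the `i94_<mon><yy>_sub.sas7bdat` pattern seen above; `sort_i94_files` is a hypothetical helper, not part of the original project.

```python
import re

# Three-letter month abbreviations as they appear in the I94 file names.
MONTHS = ["jan", "feb", "mar", "apr", "may", "jun",
          "jul", "aug", "sep", "oct", "nov", "dec"]

# Assumed naming pattern, e.g. 'i94_apr16_sub.sas7bdat'.
FILE_PATTERN = re.compile(r"^i94_([a-z]{3})\d{2}_sub\.sas7bdat$")


def sort_i94_files(filenames):
    """Hypothetical helper: return the I94 file names ordered January to December."""
    def month_index(name):
        match = FILE_PATTERN.match(name)
        if match is None or match.group(1) not in MONTHS:
            raise ValueError(f"unexpected I94 file name: {name}")
        return MONTHS.index(match.group(1))

    return sorted(filenames, key=month_index)


print(sort_i94_files(["i94_apr16_sub.sas7bdat", "i94_jan16_sub.sas7bdat"]))
```

Each sorted name could then be joined onto the data directory and loaded with `spark.read.format('com.github.saurfang.sas.spark').load(...)`, exactly as the next cell does for April.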

In [4]:
immigration_df = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [5]:
# display the first five records
immigration_df.limit(5).toPandas()

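|   | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6.0 | 2016.0 | 4.0 | 692.0 | 692.0 | XXX | 20573.0 |  |  |  | ... | U |  | 1979.0 | 10282016 |  |  |  | 1897628000.0 |  | B2 |
| 1 | 7.0 | 2016.0 | 4.0 | 254.0 | 276.0 | ATL | 20551.0 | 1.0 | AL |  | ... | Y |  | 1991.0 | D/S | M |  |  | 3736796000.0 | 296.0 | F1 |
| 2 | 15.0 | 2016.0 | 4.0 | 101.0 | 101.0 | WAS | 20545.0 | 1.0 | MI | 20691.0 | ... |  | M | 1961.0 | 09302016 | M |  | OS | 666643200.0 | 93.0 | B2 |
| 3 | 16.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... |  | M | 1988.0 | 09302016 |  |  | AA | 92468460000.0 | 199.0 | B2 |
| 4 | 17.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... |  | M | 2012.0 | 09302016 |  |  | AA | 92468460000.0 | 199.0 | B2 |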


In [6]:
# count the total number of records
total_immigration_row = immigration_df.count()
print('Total number of rows in immigration dataset: {}'.format(total_immigration_row))

Total number of rows in immigration dataset: 3096313


In [7]:
# let's see the dataframe schema
immigration_df.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)

In [8]:
immigration_nan_count_df = immigration_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in immigration_df.columns]).toPandas()

In [9]:
immigration_nan_count_df = pd.melt(immigration_nan_count_df, var_name='cols', value_name='values')

In [10]:
immigration_nan_count_df['% missing values'] = 100*immigration_nan_count_df['values']/total_immigration_row
print('Missing values summary:\n {}'.format(immigration_nan_count_df))

Missing values summary:
         cols   values  % missing values
0      cicid        0          0.000000
1      i94yr        0          0.000000
2     i94mon        0          0.000000
3     i94cit        0          0.000000
4     i94res        0          0.000000
5    i94port        0          0.000000
6    arrdate        0          0.000000
7    i94mode      239          0.007719
8    i94addr   152592          4.928184
9    depdate   142457          4.600859
10    i94bir      802          0.025902
11   i94visa        0          0.000000
12     count        0          0.000000
13  dtadfile        1          0.000032
14  visapost  1881250         60.757746
15     occup  3088187         99.737559
16   entdepa      238          0.007687
17   entdepd   138429          4.470769
18   entdepu  3095921         99.987340
19   matflag   138429          4.470769
20   biryear      802          0.025902
21   dtaddto      477          0.015405
22    gender   414269         13.379429
23    insnum  2
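The Spark expression `count(when(isnan(c) | col(c).isNull(), c))` counts a value as missing when it is either null or a floating-point NaN, and the pandas step then turns each count into a percentage of all rows. The same rule can be sketched in plain Python over a list of records, which is handy for unit-testing it on a few rows; `is_missing`, `missing_summary` and the sample records are hypothetical.

```python
import math


def is_missing(value):
    """Mirror of `isnan(c) | col(c).isNull()`: None or a float NaN counts as missing."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def missing_summary(records, columns):
    """Return {column: (missing_count, percent_missing)} for a list of dict records."""
    total = len(records)
    summary = {}
    for column in columns:
        # a key that is absent from a record is treated like a null
        missing = sum(1 for record in records if is_missing(record.get(column)))
        percent = 100 * missing / total if total else 0.0
        summary[column] = (missing, percent)
    return summary


rows = [
    {"cicid": 6.0, "i94mode": None, "gender": None},
    {"cicid": 7.0, "i94mode": 1.0, "gender": "M"},
]
print(missing_summary(rows, ["cicid", "i94mode", "gender"]))
```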

#### Cleaning Steps
Document steps necessary to clean the data

Four columns have very high percentages of missing values: `visapost`, `occup`, `entdepu` and `insnum`.
We will remove these columns from the immigration data table.
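Instead of listing the columns by hand as the next cell does, the drop list could be derived from the missing-value summary with a cut-off. The sketch below assumes a hypothetical threshold of 50% and uses percentages copied from the summary above (the `insnum` figure is cut off in the printed output, so it is left out); `columns_above_threshold` is an illustrative helper.

```python
def columns_above_threshold(missing_percent, threshold=50.0):
    """Return column names whose share of missing values exceeds `threshold` percent."""
    return sorted(name for name, pct in missing_percent.items() if pct > threshold)


# Percentages copied from the missing-values summary above.
summary = {
    "i94addr": 4.928184,
    "depdate": 4.600859,
    "visapost": 60.757746,
    "occup": 99.737559,
    "entdepu": 99.987340,
    "gender": 13.379429,
}

print(columns_above_threshold(summary))  # ['entdepu', 'occup', 'visapost']
```

In the notebook, the returned list could be passed straight to `immigration_df.drop(*cols)`.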

In [11]:
cols = ['occup', 'entdepu','insnum', 'visapost']

# drop these columns
immigration_df = immigration_df.drop(*cols)

We also remove rows that have missing values.

In [12]:
immigration_df = immigration_df.dropna()

In [13]:
print('Length of immigration data table after removing missing data: {}'.format(immigration_df.count()))

Length of immigration data table after removing missing data: 2384498
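`DataFrame.dropna()` defaults to `how='any'`, so a row is dropped if *any* remaining column is null; a column such as `gender` (about 13% missing) therefore removes whole records. A quick check of how much data this step discards, using the two row counts printed above:

```python
def percent_removed(rows_before, rows_after):
    """Share of rows (in percent) removed by a cleaning step."""
    return 100 * (rows_before - rows_after) / rows_before


removed = percent_removed(3096313, 2384498)
print(f"{removed:.2f}% of rows removed")  # 22.99% of rows removed
```

If that is too aggressive, a possible alternative is `dropna(subset=[...])`, which only checks the columns the data model actually needs.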


In [14]:
# display the new schema for i94 immigration
print('Immigration Schema: \n')
immigration_df.printSchema()

Immigration Schema: 

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)



We also want to make sure that there are no duplicate values in the `cicid` column.

In [15]:
immigration_df = immigration_df.dropDuplicates(['cicid'])
print('Length of immigration data table after removing duplicate cicid: {}'.format(immigration_df.count()))

Length of immigration data table after removing duplicate cicid: 2384498


### **World Temperature Data**

#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

In [16]:
temperature_df = spark.read.csv('../../data2/GlobalLandTemperaturesByCity.csv', header=True, inferSchema=True)

In [17]:
# display the first five records
temperature_df.show(n=5)

+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|                 dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01 00:00:00|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 5 rows



In [18]:
# check the total number of records
temperature_total_row = temperature_df.count()
print(temperature_total_row)

8599212


In [19]:
# print dataframe schema
temperature_df.printSchema()

root
 |-- dt: timestamp (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



To summarize missing values, we cast the `dt` column to string, because `isnan` cannot be applied to a timestamp column. We create a new dataframe for this missing-value summary only, so `temperature_df` keeps `dt` as a timestamp.

In [20]:
temperature_df_2 = temperature_df.withColumn("dt",col("dt").cast(StringType())) # convert dt column type to string

In [21]:
temp_nan_count_df = temperature_df_2.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in temperature_df_2.columns]).toPandas()
temp_nan_count_df = pd.melt(temp_nan_count_df, var_name='cols', value_name='values')
temp_nan_count_df['% missing values'] = 100*temp_nan_count_df['values']/temperature_total_row

Summary of missing values

In [22]:
print(temp_nan_count_df)

                            cols  values  % missing values
0                             dt       0          0.000000
1             AverageTemperature  364130          4.234458
2  AverageTemperatureUncertainty  364130          4.234458
3                           City       0          0.000000
4                        Country       0          0.000000
5                       Latitude       0          0.000000
6                      Longitude       0          0.000000


#### Cleaning Steps
Document steps necessary to clean the data

Drop rows with missing temperature values

In [23]:
temperature_df = temperature_df.dropna(subset=['AverageTemperature', 'AverageTemperatureUncertainty'])

In [24]:
print(temperature_df.count())

8235082


Drop duplicate rows (same `dt`, `City` and `Country`)

In [25]:
temperature_df = temperature_df.drop_duplicates(subset=['dt', 'City', 'Country'])

In [26]:
print(temperature_df.count())

8190783


### **U.S. City Demographic Data**

#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

In [27]:
demographics_df = spark.read.csv('us-cities-demographics.csv', inferSchema=True, header=True, sep=';')

In [28]:
# display the first five records
demographics_df.limit(5).toPandas()

|   | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601 | 41862 | 82463 | 1562 | 30908 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129 | 49500 | 93629 | 4147 | 32935 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040 | 46799 | 84839 | 4819 | 8229 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127 | 87105 | 175232 | 5821 | 33878 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040 | 143873 | 281913 | 5829 | 86253 | 2.73 | NJ | White | 76402 |


In [29]:
# let's see the dataframe schema
demographics_df.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)



In [30]:
# count the number of records in dataset
total_demographics_row = demographics_df.count()
print(total_demographics_row)

2891


In [31]:
demo_nan_count_df = demographics_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in demographics_df.columns]).toPandas()
demo_nan_count_df = pd.melt(demo_nan_count_df, var_name='cols', value_name='values')
demo_nan_count_df['% missing values'] = 100*demo_nan_count_df['values']/total_demographics_row

Summary of missing values

In [32]:
print(demo_nan_count_df)

                      cols  values  % missing values
0                     City       0          0.000000
1                    State       0          0.000000
2               Median Age       0          0.000000
3          Male Population       3          0.103770
4        Female Population       3          0.103770
5         Total Population       0          0.000000
6       Number of Veterans      13          0.449671
7             Foreign-born      13          0.449671
8   Average Household Size      16          0.553442
9               State Code       0          0.000000
10                    Race       0          0.000000
11                   Count       0          0.000000


#### Cleaning Steps
Document steps necessary to clean the data

Remove rows that have missing data

In [33]:
demographics_df = demographics_df.dropna()

In [34]:
print(demographics_df.count())

2875


Remove duplicates

In [35]:
demographics_df = demographics_df.dropDuplicates(subset=['City', 'State', 'State Code', 'Race'])

In [36]:
print(demographics_df.count())

2875


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The conceptual data model is shown in the image below.
We have one fact table, `i94immigration_fact`, and three dimension tables: `country_temperature_dim`, `calendar_dim` and `us_demographics_dim`.
These dimension tables link to the fact table through the `i94res`, `arrdate` and `i94addr` fields of `i94immigration_fact` (matching `country_code`, `arrdate` and `state_code` respectively).
With this database, data analysts can dig into patterns in the immigration data, for example whether there is a relationship between immigrants' country of residence and the US states they go to, i.e. whether immigrants from certain countries prefer some US states over others.


![data_model](data_model.png)
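To make these links concrete, the sketch below holds a Spark SQL query that joins the fact table to each dimension, assuming the tables have been registered as temporary views under the same names (for example with `createOrReplaceTempView`). One caveat: `us_demographics_dim` has one row per city and race, so `state_code` is not unique and a direct join on `i94addr` would multiply fact rows; the sketch therefore collapses demographics to one row per state first. The query and the `STAR_QUERY` name are illustrative, not part of the original pipeline.

```python
# Illustrative star-schema query; assumes the four tables are registered as temp views.
STAR_QUERY = """
WITH state_dim AS (
    -- one row per state: dedupe city-level values repeated for each race, then sum
    SELECT state_code, state, SUM(foreign_born) AS foreign_born
    FROM (SELECT DISTINCT city, state, state_code, foreign_born FROM us_demographics_dim) d
    GROUP BY state_code, state
)
SELECT f.cicid, c.country_name, cal.arrival_month, s.state, s.foreign_born
FROM i94immigration_fact f
LEFT JOIN country_temperature_dim c ON f.i94res = c.country_code
LEFT JOIN calendar_dim cal ON f.arrdate = cal.arrdate
LEFT JOIN state_dim s ON f.i94addr = s.state_code
"""

print(STAR_QUERY)
```

In the notebook it could be run with `spark.sql(STAR_QUERY).show(5)`.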

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

The data pipeline steps are as follows:
- Load the datasets with Spark dataframe
- Explore and clean immigration dataframe for each month
- Explore and clean temperature dataframe
- Explore and clean demographics dataframe
- Create calendar table
- Create temperature table
- Create demographics table
- Create immigration fact table

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

#### Create calendar_dim table

In [37]:
def create_calendar_dim(df):
    """
    Function to create calendar_dim table
    
    :param df: spark immigration dataframe
    :return: calendar_dim table
    """
    # UDF to convert a SAS date (days since 1960-01-01) to an ISO date string
    epoch = dt.datetime(1960, 1, 1)
    udf_datetime = udf(lambda x: (epoch.date() + dt.timedelta(x)).isoformat() if x else None)
    
    # create calendar table
    calendar_df = df.select(['arrdate']).withColumn("arrdate", udf_datetime(df.arrdate)).distinct()
    
    # expand df by adding other calendar columns
    calendar_df = calendar_df.withColumn('arrival_day', dayofmonth('arrdate'))
    calendar_df = calendar_df.withColumn('arrival_week', weekofyear('arrdate'))
    calendar_df = calendar_df.withColumn('arrival_month', month('arrdate'))
    calendar_df = calendar_df.withColumn('arrival_year', year('arrdate'))
    calendar_df = calendar_df.withColumn('arrival_weekday', dayofweek('arrdate'))

    calendar_df = calendar_df.withColumn('id', monotonically_increasing_id())
    
    return calendar_df

In [38]:
calendar_dim = create_calendar_dim(immigration_df)

Check top 5 rows of calendar_dim table

In [39]:
calendar_dim.limit(5).toPandas()

|   | arrdate | arrival_day | arrival_week | arrival_month | arrival_year | arrival_weekday | id |
|---|---|---|---|---|---|---|---|
| 0 | 2016-04-22 | 22 | 16 | 4 | 2016 | 6 | 8589934592 |
| 1 | 2016-04-15 | 15 | 15 | 4 | 2016 | 6 | 25769803776 |
| 2 | 2016-04-18 | 18 | 16 | 4 | 2016 | 2 | 42949672960 |
| 3 | 2016-04-09 | 9 | 14 | 4 | 2016 | 7 | 68719476736 |
| 4 | 2016-04-11 | 11 | 15 | 4 | 2016 | 2 | 85899345920 |
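The `arrdate` values are SAS date numbers, i.e. days since 1960-01-01, which is why the UDF adds `dt.timedelta(x)` to that epoch. The plain-Python sketch below reproduces the conversion and the calendar columns so they can be checked against the rows above. It relies on Spark's documented conventions that `dayofweek` numbers days from 1 (Sunday) to 7 (Saturday) and that `weekofyear` returns the ISO 8601 week; `sas_date_to_iso` and `calendar_row` are hypothetical helpers.

```python
import datetime as dt

SAS_EPOCH = dt.date(1960, 1, 1)  # SAS dates count days from this date


def sas_date_to_iso(sas_days):
    """Convert a SAS date number (e.g. 20545.0) to an ISO date string; None stays None."""
    if sas_days is None:
        return None
    return (SAS_EPOCH + dt.timedelta(days=int(sas_days))).isoformat()


def calendar_row(iso_date):
    """Compute the calendar_dim columns for one ISO date, following Spark's conventions."""
    day = dt.date.fromisoformat(iso_date)
    return {
        "arrdate": iso_date,
        "arrival_day": day.day,
        "arrival_week": day.isocalendar()[1],         # ISO week, like weekofyear
        "arrival_month": day.month,
        "arrival_year": day.year,
        "arrival_weekday": day.isoweekday() % 7 + 1,  # 1 = Sunday ... 7 = Saturday, like dayofweek
    }


print(sas_date_to_iso(20545.0))  # 2016-04-01
print(calendar_row("2016-04-22"))
```

As an untested alternative, Spark's SQL `date_add` function, e.g. `expr("date_add(to_date('1960-01-01'), cast(arrdate as int))")`, may be able to do the conversion without a Python UDF.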


#### Create country_temperature_dim table

We created a new file, `i94res.csv`, with data extracted from `I94_SAS_Labels_Descriptions.SAS` to map country codes to country names.
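The notebook does not show how `i94res.csv` was produced. Assuming the country section of `I94_SAS_Labels_Descriptions.SAS` lists one entry per line in the form `692 =  'ECUADOR'`, a small parser like the hypothetical one below could extract the `code`/`Name` pairs that `create_country_temperature_dim` reads. The sample lines are illustrative, not copied from the file, and the parser should only be fed the country section, since any other numerically coded section would match the same pattern.

```python
import csv
import io
import re

# Assumed line format: <code> = '<NAME>'  (whitespace around '=' may vary)
LABEL_LINE = re.compile(r"^\s*(\d+)\s*=\s*'([^']*)'")


def parse_country_labels(lines):
    """Extract (code, Name) pairs from SAS label lines, skipping anything else."""
    pairs = []
    for line in lines:
        match = LABEL_LINE.match(line)
        if match:
            pairs.append((int(match.group(1)), match.group(2).strip()))
    return pairs


def to_csv(pairs):
    """Write the pairs with the 'code' and 'Name' headers used by the notebook."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(["code", "Name"])
    writer.writerows(pairs)
    return buffer.getvalue()


sample = ["value i94cntyl", "   692 =  'ECUADOR'", "   299 =  'MONGOLIA'", ";"]
print(to_csv(parse_country_labels(sample)))
```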

In [40]:
def create_country_temperature_dim(immigration_df, temp_df):
    """
    Function to create country_temperature_dim
    
    :param immigration_df: spark immigration dataframe
    :param temp_df: spark global land temperature dataframe
    :return: country_temperature_dim dimension table
    """
    
    # load the i94res to country mapping data
    country_code_mapping = pd.read_csv('i94res.csv')
    
    land_temp_df = temp_df.select(['Country', 'AverageTemperature']).groupby('Country').avg()
    land_temp_df = land_temp_df.withColumnRenamed('avg(AverageTemperature)', 'average_temperature') \
                                       .withColumnRenamed('Country', 'country').toPandas()
    
    @udf()
    def get_country_average_temperature(country_name):
        avg_temp = land_temp_df[land_temp_df['country']==country_name]['average_temperature']
        
        if not avg_temp.empty:
            return str(avg_temp.iloc[0])
    
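    @udf()
    def get_country_name(country_code):
        # look up the country name; codes missing from the mapping return None
        # instead of raising an IndexError
        names = country_code_mapping[country_code_mapping['code'] == country_code]['Name']
        
        if not names.empty:
            return names.iloc[0].title()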

    # select and rename i94res column
    country_temperature_dim = immigration_df.select(['i94res']).distinct() \
                .withColumnRenamed('i94res', 'country_code')
    
    # create country_name column
    country_temperature_dim = country_temperature_dim.withColumn('country_name', get_country_name(country_temperature_dim.country_code))
    
    # create average_temperature column
    country_temperature_dim = country_temperature_dim.withColumn('average_temperature', get_country_average_temperature(country_temperature_dim.country_name))
    
    return country_temperature_dim

In [41]:
country_temperature_dim = create_country_temperature_dim(immigration_df, temperature_df)

In [42]:
country_temperature_dim.show(5)

+------------+------------+-------------------+
|country_code|country_name|average_temperature|
+------------+------------+-------------------+
|       692.0|     Ecuador|      20.5391705374|
|       299.0|    Mongolia|     -3.36548531952|
|       576.0| El Salvador|      25.2628525509|
|       735.0|  Montenegro|      10.2210401137|
|       206.0|   Hong Kong|      21.4236961538|
+------------+------------+-------------------+
only showing top 5 rows



In [43]:
print(country_temperature_dim.count())

228
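`create_country_temperature_dim` looks up names and temperatures inside Python UDFs that filter pandas DataFrames once per row. A lookup-table approach does the same work in one pass over each input; in Spark this would usually mean loading `i94res.csv` with `spark.read.csv` and using left joins instead of UDFs. The plain-Python sketch below shows that logic with dictionaries; the temperatures are made up for illustration, and the helper names are hypothetical.

```python
from collections import defaultdict


def average_by_country(temperature_rows):
    """Average AverageTemperature per Country, like groupby('Country').avg()."""
    totals = defaultdict(float)
    counts = defaultdict(int)
    for row in temperature_rows:
        totals[row["Country"]] += row["AverageTemperature"]
        counts[row["Country"]] += 1
    return {country: totals[country] / counts[country] for country in totals}


def build_country_dim(country_codes, code_to_name, temperature_rows):
    """Build (country_code, country_name, average_temperature) rows with dict lookups."""
    averages = average_by_country(temperature_rows)
    dim = []
    for code in sorted(set(country_codes)):       # like .distinct() on i94res
        raw_name = code_to_name.get(int(code))    # i94res is a double, e.g. 692.0
        name = raw_name.title() if raw_name else None
        dim.append((code, name, averages.get(name)))  # None when there is no match
    return dim


temps = [
    {"Country": "Ecuador", "AverageTemperature": 20.0},
    {"Country": "Ecuador", "AverageTemperature": 21.0},
    {"Country": "Mongolia", "AverageTemperature": -3.0},
]
print(build_country_dim([692.0, 299.0, 692.0, 999.0], {692: "ECUADOR", 299: "MONGOLIA"}, temps))
```

Unknown codes (999.0 above) simply get `None` for the name and temperature.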


#### Create us_demographics_dim table

In [44]:
def create_us_demographics_dim(df):
    """
    Function to create us_demographics_dim table

    :param df: spark demographics dataframe
    :return: us_demographics_dim table
    """
    us_demographics_dim = df.withColumnRenamed('Median Age','median_age') \
            .withColumnRenamed('City', 'city') \
            .withColumnRenamed('State', 'state') \
            .withColumnRenamed('Male Population', 'male_population') \
            .withColumnRenamed('Female Population', 'female_population') \
            .withColumnRenamed('Total Population', 'total_population') \
            .withColumnRenamed('Number of Veterans', 'number_of_veterans') \
            .withColumnRenamed('Foreign-born', 'foreign_born') \
            .withColumnRenamed('Average Household Size', 'average_household_size') \
            .withColumnRenamed('State Code', 'state_code') \
            .withColumnRenamed('Race', 'race') \
            .withColumnRenamed('Count', 'count')

    # add a surrogate id column
    us_demographics_dim = us_demographics_dim.withColumn('id', monotonically_increasing_id())
    
    return us_demographics_dim

In [45]:
us_demographics_dim = create_us_demographics_dim(demographics_df)

Check top 5 rows of us_demographics_dim

In [46]:
us_demographics_dim.limit(5).toPandas()

|   | city | state | median_age | male_population | female_population | total_population | number_of_veterans | foreign_born | average_household_size | state_code | race | count | id |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Quincy | Massachusetts | 41.0 | 44129 | 49500 | 93629 | 4147 | 32935 | 2.39 | MA | White | 58723 | 0 |
| 1 | Wilmington | North Carolina | 35.5 | 52346 | 63601 | 115947 | 5908 | 7401 | 2.24 | NC | Asian | 3152 | 1 |
| 2 | Tampa | Florida | 35.3 | 175517 | 193511 | 369028 | 20636 | 58795 | 2.47 | FL | Hispanic or Latino | 95154 | 2 |
| 3 | Gastonia | North Carolina | 36.9 | 35527 | 39023 | 74550 | 3537 | 5715 | 2.67 | NC | Asian | 2788 | 3 |
| 4 | Tyler | Texas | 33.9 | 50422 | 53283 | 103705 | 4813 | 8225 | 2.59 | TX | American Indian and Alaska Native | 1057 | 4 |


In [47]:
print(us_demographics_dim.count())

2875
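The twelve `withColumnRenamed` calls in `create_us_demographics_dim` all follow one rule: lower-case the name and replace spaces and hyphens with underscores. A sketch of that rule as a function is below; in PySpark, the result could be applied in one step with `df.toDF(*[to_snake_case(c) for c in df.columns])`. `to_snake_case` is a hypothetical helper.

```python
import re


def to_snake_case(name):
    """Lower-case a column name and replace runs of non-alphanumeric characters with '_'."""
    return re.sub(r"[^0-9a-zA-Z]+", "_", name).strip("_").lower()


source_columns = ["City", "State", "Median Age", "Male Population", "Female Population",
                  "Total Population", "Number of Veterans", "Foreign-born",
                  "Average Household Size", "State Code", "Race", "Count"]
print([to_snake_case(c) for c in source_columns])
```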


#### Create i94immigration_fact table

In [48]:
def create_i94immigration_fact(df):
    """
    Function to create i94 immigration fact
    
    :param df: spark immigration dataframe
    :return: i94immigration_fact table
    """

    # UDF to convert a SAS date (days since 1960-01-01) to an ISO date string
    epoch = dt.datetime(1960, 1, 1)
    udf_datetime = udf(lambda x: (epoch.date() + dt.timedelta(x)).isoformat() if x else None)
    
    # create i94immigration_fact
    i94immigration_fact = df.select("cicid", "i94res", "i94port", "arrdate", "i94mode", "i94addr", "depdate", "i94bir",
                      "count", "entdepa", "entdepd", "biryear", "dtaddto", "gender")
    
    # convert arrival date to datetime
    i94immigration_fact = i94immigration_fact.withColumn("arrdate", udf_datetime(i94immigration_fact.arrdate))
    
    return i94immigration_fact

In [49]:
i94immigration_fact = create_i94immigration_fact(immigration_df)

In [50]:
i94immigration_fact.limit(5).toPandas()

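|   | cicid | i94res | i94port | arrdate | i94mode | i94addr | depdate | i94bir | count | entdepa | entdepd | biryear | dtaddto | gender |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 558.0 | 103.0 | SFR | 2016-04-01 | 1.0 | CA | 20547.0 | 42.0 | 1.0 | G | O | 1974.0 | 6292016 | M |
| 1 | 596.0 | 103.0 | NAS | 2016-04-01 | 1.0 | FL | 20547.0 | 24.0 | 1.0 | G | N | 1992.0 | 6292016 | M |
| 2 | 934.0 | 104.0 | NEW | 2016-04-01 | 1.0 | NY | 20549.0 | 54.0 | 1.0 | G | O | 1962.0 | 6292016 | F |
| 3 | 1051.0 | 104.0 | NEW | 2016-04-01 | 1.0 | NY | 20551.0 | 28.0 | 1.0 | G | O | 1988.0 | 6292016 | M |
| 4 | 2734.0 | 107.0 | SAJ | 2016-04-01 | 1.0 | PR | 20552.0 | 6.0 | 1.0 | G | O | 2010.0 | 9302016 | M |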


In [51]:
print(i94immigration_fact.count())

2384498
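`create_i94immigration_fact` repeats the date UDF from `create_calendar_dim` and converts only `arrdate`, so `depdate` stays a SAS day number (e.g. `20547.0` in the output above). One way to keep the two consistent is a single shared conversion applied to every SAS date column. The record-level sketch below shows the idea in plain Python; `convert_sas_dates` is a hypothetical helper, and in the notebook the same conversion could be wrapped once with `udf` and reused by both builders.

```python
import datetime as dt

SAS_EPOCH = dt.date(1960, 1, 1)  # SAS dates count days from this date


def convert_sas_dates(record, date_columns=("arrdate", "depdate")):
    """Return a copy of `record` with SAS day numbers in `date_columns` turned into ISO dates."""
    converted = dict(record)
    for column in date_columns:
        value = converted.get(column)
        if value is not None:
            converted[column] = (SAS_EPOCH + dt.timedelta(days=int(value))).isoformat()
    return converted


row = {"cicid": 558.0, "arrdate": 20545.0, "depdate": 20547.0, "i94addr": "CA"}
print(convert_sas_dates(row))
# {'cicid': 558.0, 'arrdate': '2016-04-01', 'depdate': '2016-04-03', 'i94addr': 'CA'}
```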


#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [52]:
def check_no_empty_record(df_name, df):
    """
    Check that a data table is not empty (has at least one row).
    :param df_name: name of the data table
    :param df: Spark dataframe to check
    :return: None; prints the pass/fail result
    """
    total_count = df.count()
    if total_count == 0:
        print('Data quality checks failed (empty record) for table: {}'.format(df_name))
    else:
        print('Data quality checks passed (no empty record) for table: {}'.format(df_name))

In [53]:
def check_no_duplicate_id(df_name, df, unique_id):
    """
    Check that the unique id column of a table contains no duplicate values.
    :param df_name: name of the data table
    :param df: Spark dataframe to check
    :param unique_id: column name of the table's unique id
    :return: None; prints the pass/fail result
    """
    if df.count() == df.dropDuplicates([unique_id]).count():
        print('Data quality checks passed (no duplicate id) for table: {}'.format(df_name))
    else:
        print('Data quality checks failed (duplicate id) for table: {}'.format(df_name))
    

In [54]:
tables = {
    'i94immigration_fact': [i94immigration_fact, 'cicid'],
    'calendar_dim': [calendar_dim, 'id'],
    'usa_demographics_dim': [us_demographics_dim, 'id'],
    'country_temperature_dim': [country_temperature_dim, 'country_code']
}

for df_name, df_id_list in tables.items():
    check_no_empty_record(df_name, df_id_list[0])
    check_no_duplicate_id(df_name, df_id_list[0], df_id_list[1])

Data quality checks passed (no empty record) for table: i94immigration_fact
Data quality checks passed (no duplicate id) for table: i94immigration_fact
Data quality checks passed (no empty record) for table: calendar_dim
Data quality checks passed (no duplicate id) for table: calendar_dim
Data quality checks passed (no empty record) for table: usa_demographics_dim
Data quality checks passed (no duplicate id) for table: usa_demographics_dim
Data quality checks passed (no empty record) for table: country_temperature_dim
Data quality checks passed (no duplicate id) for table: country_temperature_dim
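The checks above cover emptiness and key uniqueness. The template also mentions integrity constraints, and one such check for a star schema is that every foreign key in the fact table has a matching dimension row, for example `i94addr` against `us_demographics_dim.state_code`, since that dimension comes from a separate source. In Spark the orphans could be counted with a `left_anti` join, e.g. `i94immigration_fact.join(us_demographics_dim, i94immigration_fact.i94addr == us_demographics_dim.state_code, 'left_anti').count()`. The plain-Python sketch below shows the same rule on key sets; `orphan_keys` and `check_referential_integrity` are hypothetical helpers.

```python
def orphan_keys(fact_keys, dimension_keys):
    """Return fact foreign-key values that have no matching dimension key."""
    return sorted(set(fact_keys) - set(dimension_keys))


def check_referential_integrity(table_name, fact_keys, dimension_keys):
    """Print a pass/fail message in the same style as the checks above and return the result."""
    missing = orphan_keys(fact_keys, dimension_keys)
    if missing:
        print(f"Data quality checks failed (orphan keys {missing}) for table: {table_name}")
        return False
    print(f"Data quality checks passed (referential integrity) for table: {table_name}")
    return True


check_referential_integrity("i94immigration_fact", ["CA", "FL", "NY"], ["CA", "FL", "NY", "TX"])
```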


#### Testing - Query the data model
To verify that the final data model can answer analytical questions, we run a sample query.
It might be interesting to see whether immigrants tend to go to states that already have more immigrants.

In [55]:
# count immigrants by arrival state
top_states_df = i94immigration_fact.groupBy("i94addr").count()
top_states_df = top_states_df.orderBy('count', ascending=False)
top_states_df.show(5)

+-------+------+
|i94addr| count|
+-------+------+
|     FL|515286|
|     NY|456061|
|     CA|388254|
|     HI|135723|
|     TX|105597|
+-------+------+
only showing top 5 rows



Now we join `top_states_df` with `us_demographics_dim` to map the state codes to state names.

In [56]:
top_states_df = top_states_df.alias('df1')
us_demographics_dim = us_demographics_dim.alias('df2')
top_states_df = top_states_df.join(us_demographics_dim, top_states_df.i94addr==us_demographics_dim.state_code).select(
    'df1.*', 'df2.state').distinct()
top_states_df.show(5)

+-------+------+----------+
|i94addr| count|     state|
+-------+------+----------+
|     FL|515286|   Florida|
|     NY|456061|  New York|
|     CA|388254|California|
|     HI|135723|    Hawaii|
|     TX|105597|     Texas|
+-------+------+----------+
only showing top 5 rows



The top 5 arrival states for immigrants are `Florida`, `New York`, `California`, `Hawaii` and `Texas`.
Next, we find the top 5 states with the highest foreign-born population.

In [57]:
# one row per city (the demographics table repeats foreign_born for each race)
top_foreign_born_cities = us_demographics_dim.select('city', 'state_code', 'state', 'foreign_born').distinct()
top_foreign_born_states = top_foreign_born_cities.select("state", "foreign_born")
top_foreign_born_states = top_foreign_born_states.groupBy("state").sum().orderBy("sum(foreign_born)", ascending=False)
top_foreign_born_states.show(5)

+----------+-----------------+
|     state|sum(foreign_born)|
+----------+-----------------+
|California|          7448257|
|  New York|          3438081|
|     Texas|          2942164|
|   Florida|          1684897|
|  Illinois|           941735|
+----------+-----------------+
only showing top 5 rows



From the results above, four of the top five arrival states for immigrants are also among the top five states by foreign-born population (summed over the cities in the demographics dataset): `California`, `New York`, `Texas` and `Florida`.
This first query suggests an association between immigrants' arrival states and the size of the foreign-born population in those states, although five states are too few to establish a correlation.
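The overlap can be checked directly from the two top-5 lists printed above:

```python
top_arrival_states = ["Florida", "New York", "California", "Hawaii", "Texas"]
top_foreign_born_states = ["California", "New York", "Texas", "Florida", "Illinois"]

# keep the arrival-state order while testing membership in the foreign-born list
shared = [state for state in top_arrival_states if state in top_foreign_born_states]
print(len(shared), shared)  # 4 ['Florida', 'New York', 'California', 'Texas']
```

An overlap of two top-5 lists is a coarse signal; a rank correlation over all states (for example Spearman's) would be a more direct test of the relationship.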

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.
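One way to start filling in these sections is to generate a skeleton from each table's schema and then write the descriptions by hand. PySpark's `DataFrame.dtypes` returns a list of `(column_name, type_name)` tuples, so a helper like the hypothetical `dictionary_skeleton` below could turn it into a Markdown table. The sample schema is an excerpt of the `calendar_dim` columns, with the type names assumed.

```python
def dictionary_skeleton(table_name, dtypes):
    """Render (column, type) pairs as a Markdown table with an empty description column."""
    lines = [
        f"#### {table_name}",
        "",
        "| Column | Type | Description |",
        "|---|---|---|",
    ]
    for column, type_name in dtypes:
        lines.append(f"| {column} | {type_name} |  |")
    return "\n".join(lines)


# Assumed excerpt; in the notebook this would be calendar_dim.dtypes.
calendar_dtypes = [("arrdate", "string"), ("arrival_day", "int"), ("id", "bigint")]
print(dictionary_skeleton("calendar_dim", calendar_dtypes))
```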

#### i94immigration_fact

#### country_temperature_dim

#### calendar_dim

#### us_demographics_dim

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
    - This project uses Apache Spark because it processes large datasets in parallel and, through the `spark-sas7bdat` package, can read the SAS-format I94 files directly.
* Propose how often the data should be updated and why.
    - The data should be updated monthly, since the I94 immigration data arrives as one file per month.
* Write a description of how you would approach the problem differently under the following scenarios:
    * The data was increased by 100x.
        - Currently Spark runs in local mode. If the data increased by 100x, moving the Spark processing to Amazon EMR would be a good choice because the cluster can be scaled out.
    * The data populates a dashboard that must be updated on a daily basis by 7am every day.
        - We can use Apache Airflow to schedule the pipeline so that it runs daily and finishes before 7am.
    * The database needed to be accessed by 100+ people.
        - We can move the analytics database to Amazon Redshift, which can scale to serve many concurrent users.