# Data Engineering Capstone Project

#### Project Summary
The project builds an ETL pipeline that loads and cleans the source data and models it as a star schema for easier analysis.
In addition to the I94 immigration and US city demographic data provided by Udacity, the project includes a small dataset of world Gini indices. The goal is to see whether income inequality in the source countries shows an obvious trend in relation to immigration into the US.

The project consists of the following steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

```python
# import modules and libraries
import os

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, count, dayofmonth, dayofweek, isnan, month, udf, weekofyear, when, year

# project helper modules
from tools.data_cleaning import (
    check_duplicate_pk,
    clean_country_mapping,
    data_quality_check,
    drop_duplicate_rows,
    remove_invalid_data,
    remove_invalid_immigration,
)
import tools.create_tables as create_tables
```

```python
# Create Spark session with the spark-sas7bdat package for reading SAS files
spark = (
    SparkSession.builder
    .config("spark.jars.repositories", "https://repos.spark-packages.org/")
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
    .config("spark.driver.memory", "15g")
    .appName("my-cool-app")
    .enableHiveSupport()
    .getOrCreate()
)
```

## Step 1: Scope the Project and Gather Data
-----------------------------------------
### 1.1 Scope

This project gathers data from three datasets.

First, I load the data into staging DataFrames and clean the raw data. Second, I write the data into fact and dimension tables, which are stored as Parquet files in the `tables/` folder of this workspace. Finally, I run data quality checks on the output.

The dimension tables and the immigration fact table form a star schema. The immigration data is the source for four tables: the visa type, origin country and date dimension tables, and the immigration fact table. The US state dimension table comes from the US city demographics data. The second fact table, Gini index, is loaded from a CSV file of world Gini indices.

The immigration fact table records each arrival into the US at daily granularity. The Gini index fact table, by contrast, holds one measurement per country per year. Analysts should account for this difference in grain when combining the two tables.
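The sketch below shows one way to align the two grains without any libraries. It rolls the daily arrival records up to (country, year) and only then attaches the yearly Gini value. The row layout, field names and values are hypothetical and for illustration only. In the pipeline itself, this would correspond to a Spark `groupBy` on `dim_date.year` and the country columns.

```python
from collections import Counter

# Toy rows standing in for fact_immigration (one row per arrival, daily grain)
# and fact_gini_index (one value per country and year). Values are illustrative.
arrivals = [
    {"country": "BRAZIL", "arrival_date": "2016-04-01"},
    {"country": "BRAZIL", "arrival_date": "2016-04-02"},
    {"country": "FRANCE", "arrival_date": "2016-04-01"},
]
gini = {("BRAZIL", 2016): 53.3, ("FRANCE", 2016): 31.9}


def yearly_arrivals_with_gini(arrivals, gini):
    """Aggregate daily arrivals to (country, year), then attach the yearly Gini value."""
    counts = Counter((row["country"], int(row["arrival_date"][:4])) for row in arrivals)
    return {
        key: {"arrivals": n, "gini": gini.get(key)}  # None when no Gini value exists
        for key, n in counts.items()
    }


result = yearly_arrivals_with_gini(arrivals, gini)
```

Aggregating first means each yearly Gini value is attached to one yearly count, rather than being repeated on every daily arrival row.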

### 1.2 Describe and Gather Data
This project uses three datasets.
* I94 immigration data comes from the US National Tourism and Trade Office. It contains the I-94 arrival records of visitors entering the US. This dataset is the source of the fact table and of dimension tables such as country and visa type.
* US city demographic data is the source of the US state dimension table.
* The world Gini index measures the degree of income inequality. The coefficient ranges from 0, indicating perfect equality (everyone receives an equal share), to 1, indicating perfect inequality. In this dataset it is expressed on a 0 to 100 scale (e.g. 52.0 for Angola in 2000). The dataset was downloaded from https://www.kaggle.com/datasets/mannmann2/world-income-inequality-database. It originates from the World Income Inequality Database (WIID), which is maintained by the United Nations University-World Institute for Development Economics Research (UNU-WIDER).
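To make the 0 (equality) to 1 (inequality) range concrete, the sketch below computes the Gini coefficient as the relative mean absolute difference. The formula is G = (sum over all pairs i, j of |x_i - x_j|) / (2 * n^2 * mean(x)). This only illustrates the definition. WIID publishes values from its own estimation methods, and this pipeline does not recompute them.

```python
def gini(incomes):
    """Gini coefficient: mean absolute difference over all ordered pairs,
    normalized by twice the mean. 0 means perfect equality."""
    n = len(incomes)
    mean = sum(incomes) / n
    if mean == 0:
        return 0.0
    total_diff = sum(abs(a - b) for a in incomes for b in incomes)
    return total_diff / (2 * n * n * mean)
```

For a finite population of n people, the maximum value is (n - 1) / n: `gini([0, 0, 0, 100])` gives 0.75. This maximum approaches 1 as n grows. Multiplying by 100 gives the 0 to 100 scale used in the Gini dataset.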

#### Load I94 immigration data

```python
# list all files in the data directory
files = os.listdir('../../data/18-83510-I94-Data-2016/')
files
```

```
['i94_apr16_sub.sas7bdat',
 'i94_sep16_sub.sas7bdat',
 'i94_nov16_sub.sas7bdat',
 'i94_mar16_sub.sas7bdat',
 'i94_jun16_sub.sas7bdat',
 'i94_aug16_sub.sas7bdat',
 'i94_may16_sub.sas7bdat',
 'i94_jan16_sub.sas7bdat',
 'i94_oct16_sub.sas7bdat',
 'i94_jul16_sub.sas7bdat',
 'i94_feb16_sub.sas7bdat',
 'i94_dec16_sub.sas7bdat']
```
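As the listing above shows, `os.listdir` returns names in arbitrary order. If more months than April were loaded, a sketch like the following could put the monthly files in calendar order. It assumes every file follows the `i94_<mon><yy>_sub.sas7bdat` naming seen above, and the helper name is hypothetical.

```python
import re

MONTHS = ["jan", "feb", "mar", "apr", "may", "jun", "jul", "aug", "sep", "oct", "nov", "dec"]
# Matches names like 'i94_apr16_sub.sas7bdat' and captures the month and 2-digit year.
PATTERN = re.compile(r"i94_([a-z]{3})(\d{2})_sub\.sas7bdat$")


def sort_i94_files(names):
    """Return the I94 SAS files ordered by (year, month); other names are skipped."""
    keyed = []
    for name in names:
        m = PATTERN.match(name)
        if m and m.group(1) in MONTHS:
            keyed.append(((int(m.group(2)), MONTHS.index(m.group(1))), name))
    return [name for _, name in sorted(keyed)]
```

Each path could then be read with the same `spark.read.format('com.github.saurfang.sas.spark').load(...)` call used below. The results could be combined with `DataFrame.unionByName`, provided the monthly files share the same columns.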

```python
immigration_df = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
immigration_df.count()
immigration_df.limit(5).toPandas()
```

| | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6.0 | 2016.0 | 4.0 | 692.0 | 692.0 | XXX | 20573.0 | | | | ... | U | | 1979.0 | 10282016 | | | | 1897628000.0 | | B2 |
| 1 | 7.0 | 2016.0 | 4.0 | 254.0 | 276.0 | ATL | 20551.0 | 1.0 | AL | | ... | Y | | 1991.0 | D/S | M | | | 3736796000.0 | 296.0 | F1 |
| 2 | 15.0 | 2016.0 | 4.0 | 101.0 | 101.0 | WAS | 20545.0 | 1.0 | MI | 20691.0 | ... | | M | 1961.0 | 09302016 | M | | OS | 666643200.0 | 93.0 | B2 |
| 3 | 16.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... | | M | 1988.0 | 09302016 | | | AA | 92468460000.0 | 199.0 | B2 |
| 4 | 17.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... | | M | 2012.0 | 09302016 | | | AA | 92468460000.0 | 199.0 | B2 |


```python
immigration_df.printSchema()
```

```
root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 
```

##### Data Dictionary of Immigration
* I94YR - 4 digit year 
* I94MON - Numeric month 
* I94CIT & I94RES - 3 digit code for immigrant country of birth/residence 
* ARRDATE - Arrival date in the USA. It is a SAS date numeric field to which a permanent format has not been applied. Please apply whichever date format works for you.
* I94MODE - Mode of arrival. There are missing values as well as not reported (9)
* I94ADDR - US state of the address in the USA. There are lots of invalid codes in this variable
* DEPDATE - Departure date from the USA. Like ARRDATE, it is a SAS date numeric field
* I94BIR - Age of Respondent in Years 
* I94VISA - Visa codes collapsed into three categories:
    - 1 = Business
    - 2 = Pleasure
    - 3 = Student
* COUNT - Used for summary statistics 
* DTADFILE - Character Date Field - Date added to I-94 Files - CIC does not use 
* VISAPOST - Department of State where Visa was issued - CIC does not use 
* OCCUP - Occupation that will be performed in U.S. - CIC does not use 
* ENTDEPA - Arrival Flag - admitted or paroled into the U.S. - CIC does not use 
* ENTDEPD - Departure Flag - Departed, lost I-94 or is deceased - CIC does not use 
* ENTDEPU - Update Flag - Either apprehended, overstayed, adjusted to perm residence - CIC does not use 
* MATFLAG - Match flag - Match of arrival and departure records 
* BIRYEAR - 4 digit year of birth 
* DTADDTO - Character Date Field - Date to which admitted to U.S. (allowed to stay until) - CIC does not use 
* GENDER - Non-immigrant sex 
* INSNUM - INS number 
* AIRLINE - Airline used to arrive in U.S. 
* ADMNUM - Admission Number 
* FLTNO - Flight number of Airline used to arrive in U.S. 
* VISATYPE - Class of admission legally admitting the non-immigrant to temporarily stay in U.S. 
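ARRDATE and DEPDATE count days since the SAS epoch, 1960-01-01. For example, the `dim_date` table built in Step 4 maps `arrdate` 20550.0 to 2016-04-06. A minimal stdlib sketch of the conversion follows; the function name is hypothetical.

```python
from datetime import date, timedelta

# SAS stores dates as the number of days since 1960-01-01.
SAS_EPOCH = date(1960, 1, 1)


def sas_to_date(sas_days):
    """Convert a SAS numeric date (e.g. ARRDATE, DEPDATE) to datetime.date; None stays None."""
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))
```

In Spark, the same offset could be applied with `F.expr("date_add('1960-01-01', cast(arrdate as int))")`. Missing DEPDATE values should be handled before casting.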

#### Load US cities data

```python
fname = 'us-cities-demographics.csv'
city_df = spark.read.csv(fname, inferSchema=True, header=True, sep=';')
print(city_df.count())
city_df.limit(5).toPandas()
```

```
2891
```

| | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601 | 41862 | 82463 | 1562 | 30908 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129 | 49500 | 93629 | 4147 | 32935 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040 | 46799 | 84839 | 4819 | 8229 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127 | 87105 | 175232 | 5821 | 33878 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040 | 143873 | 281913 | 5829 | 86253 | 2.73 | NJ | White | 76402 |


#### Load Gini index

```python
fname_gini = 'gini_by_country_by_year.csv'
giniIndex_df = spark.read.csv(fname_gini, inferSchema=True, header=True, sep=',')
giniIndex_df.toPandas().head()
```

| | country_code | country_name | year | value |
|---|---|---|---|---|
| 0 | AGO | Angola | 2000 | 52.0 |
| 1 | AGO | Angola | 2008 | 42.7 |
| 2 | AGO | Angola | 2018 | 51.3 |
| 3 | ALB | Albania | 1996 | 27.0 |
| 4 | ALB | Albania | 2002 | 31.7 |


#### Load I94 SAS country mapping

```python
fname_country_mapping = 'country_mapping.csv'
country_mapping = spark.read.csv(fname_country_mapping, inferSchema=True, header=True, sep=',')
print(country_mapping.count())
country_mapping.limit(5).toPandas()
```

```
289
```

| | code | country_name |
|---|---|---|
| 0 | 582 | MEXICO Air Sea |
| 1 | 236 | AFGHANISTAN |
| 2 | 101 | ALBANIA |
| 3 | 316 | ALGERIA |
| 4 | 102 | ANDORRA |


## Step 2: Explore and Assess the Data
### 2.1 Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.
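The imported `isnan`, `when`, `count` and `col` functions are the usual PySpark building blocks for per-column null counts. As a library-free sketch of the same two checks, missing values per column and exact duplicate rows, consider the following. The `profile` helper and the row-of-dicts layout are hypothetical and not part of `tools.data_cleaning`.

```python
from collections import Counter


def profile(rows, columns):
    """Count missing values per column and exact duplicate rows in a list of dicts.

    A value counts as missing if it is None, an empty string, or a float NaN.
    """
    def is_missing(value):
        return value is None or value == "" or (isinstance(value, float) and value != value)

    missing = {c: sum(is_missing(row.get(c)) for row in rows) for c in columns}
    row_counts = Counter(tuple(row.get(c) for c in columns) for row in rows)
    duplicates = sum(n - 1 for n in row_counts.values() if n > 1)
    return missing, duplicates
```

In PySpark, the missing-value count is commonly written as `df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns])`. The expression `df.count() - df.dropDuplicates().count()` gives the number of duplicate rows.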

```python
# calculate number of distinct immigration source residence country codes.
immigration_df.createOrReplaceTempView("immigration_table")
spark.sql("SELECT count(distinct i94res) FROM immigration_table").show()
```

```
+----------------------+
|count(DISTINCT i94res)|
+----------------------+
|                   229|
+----------------------+
```



```python
spark.sql("SELECT distinct i94visa, visatype FROM immigration_table order by visatype").show()
```

```
+-------+--------+
|i94visa|visatype|
+-------+--------+
|    1.0|      B1|
|    2.0|      B2|
|    2.0|      CP|
|    2.0|     CPL|
|    1.0|      E1|
|    1.0|      E2|
|    3.0|      F1|
|    3.0|      F2|
|    1.0|     GMB|
|    2.0|     GMT|
|    1.0|       I|
|    1.0|      I1|
|    3.0|      M1|
|    3.0|      M2|
|    2.0|     SBP|
|    1.0|      WB|
|    2.0|      WT|
+-------+--------+
```
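In the output above, each `visatype` appears with exactly one `i94visa` code, which is what allows `dim_visa_type` to carry both. The sketch below checks this dependency and attaches the category labels from the data dictionary. The helper name is hypothetical, and the input pairs would come from the `SELECT DISTINCT` query above, e.g. via `collect()`.

```python
# Category labels for I94VISA, as listed in the data dictionary above.
VISA_CATEGORIES = {1: "Business", 2: "Pleasure", 3: "Student"}


def visa_category_by_type(pairs):
    """Map each visatype to its I94VISA category label.

    Raises ValueError if a visatype appears with more than one category,
    i.e. if visatype does not functionally determine i94visa.
    """
    mapping = {}
    for i94visa, visatype in pairs:
        label = VISA_CATEGORIES[int(i94visa)]
        if mapping.setdefault(visatype, label) != label:
            raise ValueError(f"visatype {visatype!r} maps to several categories")
    return mapping
```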



```python
# check the range of year and month values
spark.sql("SELECT min(i94yr), max(i94yr) FROM immigration_table").show()
spark.sql("SELECT min(i94mon), max(i94mon) FROM immigration_table").show()
```

```
+----------+----------+
|min(i94yr)|max(i94yr)|
+----------+----------+
|    2016.0|    2016.0|
+----------+----------+

+-----------+-----------+
|min(i94mon)|max(i94mon)|
+-----------+-----------+
|        4.0|        4.0|
+-----------+-----------+
```



```python
# calculate distinct state values in US cities data.
city_df.createOrReplaceTempView("state_table")
spark.sql("SELECT count(distinct `state code`),count(distinct state) FROM state_table").show()
```

```
+--------------------------+---------------------+
|count(DISTINCT state code)|count(DISTINCT state)|
+--------------------------+---------------------+
|                        49|                   49|
+--------------------------+---------------------+
```



```python
# list the countries with the largest Gini index values in 2016
giniIndex_df.createOrReplaceTempView("gini_table")
spark.sql("select * from (SELECT year,country_name,avg(value) as value FROM gini_table where year=2016 group by year,country_name) t order by value desc").show(15)
```

```
+----+------------------+-----+
|year|      country_name|value|
+----+------------------+-----+
|2016|          Eswatini| 54.6|
|2016|            Brazil| 53.3|
|2016|         St. Lucia| 51.2|
|2016|          Colombia| 50.6|
|2016|            Panama| 50.4|
|2016|          Honduras| 49.8|
|2016|        Costa Rica| 48.7|
|2016|          Paraguay| 47.9|
|2016|            Mexico| 47.7|
|2016|Dominican Republic| 45.7|
|2016|           Bolivia| 45.3|
|2016|           Ecuador| 45.0|
|2016|            Malawi| 44.7|
|2016|       South Sudan| 44.1|
|2016|            Rwanda| 43.7|
+----+------------------+-----+
only showing top 15 rows
```



```python
spark.sql("SELECT year,country_name, avg(value) FROM gini_table where year=2016 group by year, country_name order by avg(value) desc").show(15)
```

```
+----+------------------+----------+
|year|      country_name|avg(value)|
+----+------------------+----------+
|2016|          Eswatini|      54.6|
|2016|            Brazil|      53.3|
|2016|         St. Lucia|      51.2|
|2016|          Colombia|      50.6|
|2016|            Panama|      50.4|
|2016|          Honduras|      49.8|
|2016|        Costa Rica|      48.7|
|2016|          Paraguay|      47.9|
|2016|            Mexico|      47.7|
|2016|Dominican Republic|      45.7|
|2016|           Bolivia|      45.3|
|2016|           Ecuador|      45.0|
|2016|            Malawi|      44.7|
|2016|       South Sudan|      44.1|
|2016|            Rwanda|      43.7|
+----+------------------+----------+
only showing top 15 rows
```



```python
# alternative pandas check: number of distinct state codes in US cities data (not run)
# city_df_pd = pd.read_csv('us-cities-demographics.csv', sep=';')
# len(list(city_df_pd['State Code'].unique()))
```

### 2.2 Cleaning Steps
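The cleaning helpers live in the `tools.data_cleaning` module:
* `remove_invalid_immigration` drops columns with missing data from the immigration data. Comparing the schemas printed before and after cleaning, `occup`, `entdepu` and `insnum` no longer appear, while the row count stays at 3,096,313.
* `remove_invalid_data` and `drop_duplicate_rows` remove missing data and duplicate rows from the US cities and Gini index data. Neither dataset loses any rows or columns.
* `clean_country_mapping` removes invalid entries from the I94 country code mapping, which reduces it from 289 to 236 rows.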
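The source of the cleaning helpers is not shown in this notebook. The sketch below shows one way to drop sparsely populated columns. It is an assumption, not the actual implementation: the helper name and the `max_missing_ratio` threshold are hypothetical, and the real helper may use a different rule.

```python
def drop_sparse_columns(rows, columns, max_missing_ratio=0.9):
    """Return (kept_columns, cleaned_rows), dropping every column whose share of
    missing values (None or '') exceeds max_missing_ratio.

    The 0.9 default is an assumed threshold, chosen for illustration only.
    """
    n = len(rows)
    kept = [
        c for c in columns
        if n == 0 or sum(row.get(c) in (None, "") for row in rows) / n <= max_missing_ratio
    ]
    cleaned = [{c: row.get(c) for c in kept} for row in rows]
    return kept, cleaned
```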

```python
immigration_df = remove_invalid_immigration(spark, immigration_df)
immigration_df.printSchema()
```

```
Initial row number in dataframe: 3,096,313
Drop missing data...
Row number after: 3,096,313
Cleaning complete!

- - - - - - - - 

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: dou
```

```python
# remove_invalid_data was first tested on the sample CSV file (not run)
# immigration_df_csv = pd.read_csv('immigration_data_sample.csv')
# immigration_df_csv.head()
```

```python
# drop duplicate rows and invalid data from the sample (not run)
# immigration_df_csv = drop_duplicate_rows(immigration_df_csv)
# immigration_df_csv = remove_invalid_data(immigration_df_csv)
```

```python
# Clean US cities data
state_df = remove_invalid_data(city_df.toPandas())
state_df = drop_duplicate_rows(state_df)
state_df.head()
```

```
Initial row number in dataframe: 2,891
Drop missing data...
Columns dropped: 
[]
Row number after: 2,891
Cleaning complete!

- - - - - - - -

Dropping duplicate rows...
Initial row number: 
2891
0
 rows dropped.

- - - - - - - - 
```

| | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127.0 | 87105.0 | 175232 | 5821.0 | 33878.0 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040.0 | 143873.0 | 281913 | 5829.0 | 86253.0 | 2.73 | NJ | White | 76402 |


```python
# Clean gini index data
gini_df = remove_invalid_data(giniIndex_df.toPandas())
gini_df = drop_duplicate_rows(gini_df)
gini_df.head()
```

```
Initial row number in dataframe: 1,896
Drop missing data...
Columns dropped: 
[]
Row number after: 1,896
Cleaning complete!

- - - - - - - -

Dropping duplicate rows...
Initial row number: 
1896
0
 rows dropped.

- - - - - - - - 
```

| | country_code | country_name | year | value |
|---|---|---|---|---|
| 0 | AGO | Angola | 2000 | 52.0 |
| 1 | AGO | Angola | 2008 | 42.7 |
| 2 | AGO | Angola | 2018 | 51.3 |
| 3 | ALB | Albania | 1996 | 27.0 |
| 4 | ALB | Albania | 2002 | 31.7 |


```python
# clean country mapping csv
country_mapping = clean_country_mapping(spark, country_mapping)
country_mapping.limit(5).toPandas()
```

```
Initial row number in dataframe: 289
Drop missing data...
Row number after: 236
Cleaning complete!

- - - - - - - - 
```

| | code | country_name |
|---|---|---|
| 0 | 582 | MEXICO Air Sea |
| 1 | 236 | AFGHANISTAN |
| 2 | 101 | ALBANIA |
| 3 | 316 | ALGERIA |
| 4 | 102 | ANDORRA |


```python
country_mapping.createOrReplaceTempView("country_mapping_table")
spark.sql("SELECT code, country_name FROM country_mapping_table").show()
```

```
+----+---------------+
|code|   country_name|
+----+---------------+
| 582| MEXICO Air Sea|
| 236|    AFGHANISTAN|
| 101|        ALBANIA|
| 316|        ALGERIA|
| 102|        ANDORRA|
| 324|         ANGOLA|
| 529|       ANGUILLA|
| 518|ANTIGUA-BARBUDA|
| 687|     ARGENTINA |
| 151|        ARMENIA|
| 532|          ARUBA|
| 438|      AUSTRALIA|
| 103|        AUSTRIA|
| 152|     AZERBAIJAN|
| 512|        BAHAMAS|
| 298|        BAHRAIN|
| 274|     BANGLADESH|
| 513|       BARBADOS|
| 104|        BELGIUM|
| 581|         BELIZE|
+----+---------------+
only showing top 20 rows
```
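The mapping contains formatting noise that can break name-based joins, such as the trailing space in `'ARGENTINA '` and the `'MEXICO Air Sea'` label. This matters because the example query at the end joins the Gini data on `upper(country_name)`. The sketch below normalizes names before such a join. The alias table is a hypothetical example and would need to be built by comparing the I94 and Gini name lists.

```python
# Hypothetical aliases from I94 labels to the country names used in the Gini data;
# these entries are illustrative assumptions, not a verified mapping.
ALIASES = {
    "MEXICO AIR SEA": "MEXICO",
}


def normalize_country(name):
    """Trim, collapse internal whitespace and upper-case a country label, then apply aliases."""
    key = " ".join(name.split()).upper()
    return ALIASES.get(key, key)
```

In Spark, `F.upper(F.trim(col('country_name')))` covers the trimming and upper-casing part. The alias lookup would be an extra join against a small mapping table.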



## Step 3: Define the Data Model
### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

## Step 4: Run Pipelines to Model the Data
### 4.1 Create the data model
Build the data pipelines to create the data model.

```python
output_path = "tables/"
```

```python
# create dimension table dim_us_state from the uncleaned city_df;
# step 2.2 showed that the US cities data needs no cleaning
create_tables.create_dim_us_state(spark, city_df, output_path)
dim_us_state = spark.read.parquet("tables/dim_us_state")
dim_us_state.limit(5).toPandas()
```

```
Writing table dim_us_state to tables/dim_us_state
Write complete!

- - - - - - - - -
```

| | state | state_code | median_age | total_population | average_household_size | id |
|---|---|---|---|---|---|---|
| 0 | District of Columbia | DC | 33.8 | 672228.0 | 2.24 | 1108101562368 |
| 1 | South Carolina | SC | 33.825 | 107790.666667 | 2.469583 | 1047972020224 |
| 2 | New Hampshire | NH | 37.8 | 99099.0 | 2.43 | 171798691840 |
| 3 | North Dakota | ND | 34.35 | 94745.0 | 2.145 | 936302870528 |
| 4 | Pennsylvania | PA | 33.951515 | 348569.727273 | 2.507576 | 910533066752 |
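Two observations about this table. First, the fractional values above (for example, a `total_population` of 107790.666667 for South Carolina) suggest that `dim_us_state` averages the city rows of each state rather than summing them. Second, the `Race` and `Count` columns of the source file suggest that each city appears once per race group. If state totals are wanted, the city rows would need to be deduplicated before aggregating. The sketch below illustrates this without any libraries. The field names are simplified, the Newark and Quincy populations come from the sample rows shown earlier, and `Example City` is made up.

```python
from collections import defaultdict


def state_population(rows):
    """Aggregate city rows to one record per state code.

    Rows are deduplicated on (city, state_code) first, because the demographics
    file appears to repeat each city once per race group (Race/Count columns).
    """
    cities = {}
    for row in rows:
        cities[(row["city"], row["state_code"])] = row["total_population"]

    per_state = defaultdict(list)
    for (_, state_code), population in cities.items():
        per_state[state_code].append(population)

    return {
        state: {
            "city_count": len(pops),
            "total_population": sum(pops),
            "avg_city_population": sum(pops) / len(pops),
        }
        for state, pops in per_state.items()
    }


rows = [
    {"city": "Newark", "state_code": "NJ", "total_population": 281913},
    {"city": "Newark", "state_code": "NJ", "total_population": 281913},  # second race row
    {"city": "Example City", "state_code": "NJ", "total_population": 100000},
    {"city": "Quincy", "state_code": "MA", "total_population": 93629},
]
summary = state_population(rows)
```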


```python
# create dimension table dim_visa_type
create_tables.create_dim_visa_type(immigration_df, output_path)
dim_visa_type = spark.read.parquet("tables/dim_visa_type")
dim_visa_type.toPandas().head()
```

```
Writing table dim_visa_type to tables/dim_visa_type
Write complete!

- - - - - - - - -
```

| | visa_type | visa_type_code | id |
|---|---|---|---|
| 0 | SBP | 2.0 | 541165879296 |
| 1 | CPL | 2.0 | 335007449088 |
| 2 | GMB | 1.0 | 395136991232 |
| 3 | M1 | 3.0 | 1314259992576 |
| 4 | GMT | 2.0 | 1314259992577 |
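The `id` values in these tables (e.g. 1314259992576 and 1314259992577 above) are consistent with Spark's `monotonically_increasing_id()`. That function puts the partition index in the upper 31 bits and the record number within the partition in the lower 33 bits. Whether `tools.create_tables` actually uses it is an assumption here. Decoding an id shows why such keys are unique but neither consecutive nor stable across runs. They therefore work as surrogate keys within one load, but should not be relied on to match rows between reruns. The function name below is hypothetical.

```python
def decode_spark_id(row_id):
    """Split an id generated by monotonically_increasing_id() into
    (partition_index, record_number_within_partition)."""
    return row_id >> 33, row_id & ((1 << 33) - 1)
```

For example, `decode_spark_id(1314259992577)` returns `(153, 1)`: the second record of partition 153.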


```python
# create dimension table dim_from_country
create_tables.create_dim_from_country(spark, immigration_df, country_mapping, output_path)
dim_from_country = spark.read.parquet("tables/dim_from_country")
dim_from_country.limit(5).toPandas()
```

```
Writing table dim_from_country to tables/dim_from_country
Write complete!

- - - - - - - - -
```

| | country_code | country_name | id |
|---|---|---|---|
| 0 | 723.0 | FAROE ISLANDS (PART OF DENMARK) | 1546188226560 |
| 1 | 760.0 | MAYOTTE (AFRICA - FRENCH) | 1176821039104 |
| 2 | 748.0 | REPUBLIC OF SOUTH SUDAN | 1494648619008 |
| 3 | 296.0 | UNITED ARAB EMIRATES | 738734374912 |
| 4 | 508.0 | NETHERLANDS ANTILLES | 317827579904 |


```python
# create date dimension table
dim_date = create_tables.create_dim_date(immigration_df, output_path)
dim_date.limit(5).toPandas()
```

```
Writing table dim_date to tables/dim_date
Write complete!

- - - - - - - - -
```

| | arrdate | arrival_date | day | month | year | week | weekday | id |
|---|---|---|---|---|---|---|---|---|
| 0 | 20550.0 | 2016-04-06 | 6 | 4 | 2016 | 14 | 4 | 77309411328 |
| 1 | 20556.0 | 2016-04-12 | 12 | 4 | 2016 | 15 | 3 | 137438953472 |
| 2 | 20553.0 | 2016-04-09 | 9 | 4 | 2016 | 14 | 7 | 146028888064 |
| 3 | 20551.0 | 2016-04-07 | 7 | 4 | 2016 | 14 | 5 | 257698037760 |
| 4 | 20565.0 | 2016-04-21 | 21 | 4 | 2016 | 16 | 5 | 266287972352 |
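The rows above follow Spark's conventions. `week` is the ISO week number (`weekofyear`), and `weekday` uses `dayofweek`, where Sunday is 1. So 2016-04-06, a Wednesday, gets 4. The stdlib sketch below reproduces one row and could be used to unit-test the date logic outside Spark; the function name is hypothetical.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS dates count days from this epoch


def date_dimension_row(arrdate):
    """Build one dim_date-style row from a SAS arrival date.

    week follows ISO 8601 (like Spark's weekofyear); weekday follows Spark's
    dayofweek convention, where Sunday = 1 and Saturday = 7.
    """
    d = SAS_EPOCH + timedelta(days=int(arrdate))
    return {
        "arrdate": arrdate,
        "arrival_date": d,
        "day": d.day,
        "month": d.month,
        "year": d.year,
        "week": d.isocalendar()[1],
        "weekday": d.isoweekday() % 7 + 1,  # isoweekday: Monday = 1 ... Sunday = 7
    }
```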


```python
# create immigration fact table
create_tables.create_fact_immigration(spark, immigration_df, output_path)
fact_immigration = spark.read.parquet("tables/fact_immigration")
fact_immigration.limit(5).toPandas()
```

```
Writing table fact_immigration to tables/fact_immigration
Write complete!

- - - - - - - - -
```

| | file_id | residence_country_code | birth_country_code | admission_port_code | arrdate | arrival_state_code | departure_date_sas | applicant_age | applicant_birth_year | applicant_gender | visa_type_code | arrival_flag | departure_flag | airline | admission_number | flight_number | id |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 5748517.0 | 438.0 | 245.0 | LOS | 20574.0 | CA | 20582.0 | 40.0 | 1976.0 | F | 1.0 | G | O | QF | 94953870000.0 | 11 | 111669149696 |
| 1 | 5748518.0 | 438.0 | 245.0 | LOS | 20574.0 | NV | 20591.0 | 32.0 | 1984.0 | F | 1.0 | G | O | VA | 94955620000.0 | 7 | 111669149697 |
| 2 | 5748519.0 | 438.0 | 245.0 | LOS | 20574.0 | WA | 20582.0 | 29.0 | 1987.0 | M | 1.0 | G | O | DL | 94956410000.0 | 40 | 111669149698 |
| 3 | 5748520.0 | 438.0 | 245.0 | LOS | 20574.0 | WA | 20588.0 | 29.0 | 1987.0 | F | 1.0 | G | O | DL | 94956450000.0 | 40 | 111669149699 |
| 4 | 5748521.0 | 438.0 | 245.0 | LOS | 20574.0 | WA | 20588.0 | 28.0 | 1988.0 | M | 1.0 | G | O | DL | 94956390000.0 | 40 | 111669149700 |


```python
# create Gini index fact table
create_tables.create_fact_gini_index(spark, giniIndex_df, output_path)
fact_gini_index = spark.read.parquet("tables/fact_gini_index")
fact_gini_index.limit(5).toPandas()
```

| | country_name | year | index_value | id |
|---|---|---|---|---|
| 0 | BELGIUM | 2015 | 27.7 | 1288490188800 |
| 1 | CZECHIA | 2008 | 26.3 | 1288490188801 |
| 2 | FINLAND | 2005 | 27.6 | 1288490188802 |
| 3 | FINLAND | 2009 | 27.5 | 1288490188803 |
| 4 | FINLAND | 2011 | 27.6 | 1288490188804 |


### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
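The checks run below confirm that each table is non-empty and that `fact_gini_index` has no duplicate primary key. A referential-integrity check would extend the integrity constraints listed above by verifying that every foreign key in a fact table resolves to a dimension row. The sketch below implements both checks without any libraries; the helper names are hypothetical. In Spark, the orphan check maps naturally to a `left_anti` join of the fact table against the dimension table.

```python
from collections import Counter


def duplicate_keys(rows, key):
    """Return key values that occur more than once (primary-key uniqueness check)."""
    counts = Counter(row[key] for row in rows)
    return sorted(k for k, n in counts.items() if n > 1)


def orphan_keys(fact_rows, fk, dim_rows, pk):
    """Return foreign-key values in the fact rows with no matching dimension row.
    Missing (None) foreign keys are ignored."""
    dim_keys = {row[pk] for row in dim_rows}
    return sorted({row[fk] for row in fact_rows if row[fk] is not None and row[fk] not in dim_keys})
```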
 
Run Quality Checks

```python
# check that the dimension tables are not empty
data_quality_check(dim_from_country,'origin_country')
data_quality_check(dim_us_state,'dim_us_state')
data_quality_check(dim_visa_type,'dim_visa_type')
data_quality_check(dim_date,'dim_date')
```

```
229
Data quality check passed for origin_country with record_count: 229 records.
49
Data quality check passed for dim_us_state with record_count: 49 records.
17
Data quality check passed for dim_visa_type with record_count: 17 records.
30
Data quality check passed for dim_date with record_count: 30 records.
```


```python
# check that the fact tables are not empty
data_quality_check(fact_immigration, 'fact_immigration')
data_quality_check(fact_gini_index, 'fact_gini_index')
```

```
3096313
Data quality check passed for fact_immigration with record_count: 3096313 records.
1896
Data quality check passed for fact_gini_index with record_count: 1896 records.
```
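`data_quality_check` appears to verify only that a table is non-empty. The counts above match the staging data (3,096,313 immigration rows and 1,896 Gini rows), so a stricter source/count check could compare the two explicitly. In the sketch below, the function name and message format are hypothetical.

```python
def reconcile_counts(source_count, target_count, table_name):
    """Source/count completeness check: fail if rows were lost or added between
    the staging data and the written table."""
    if source_count != target_count:
        raise ValueError(
            f"{table_name}: expected {source_count:,} rows from source, found {target_count:,}"
        )
    return f"Count check passed for {table_name}: {target_count:,} rows"
```

In the notebook, this could be called as `reconcile_counts(immigration_df.count(), fact_immigration.count(), 'fact_immigration')`.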


```python
# check if there is duplicate primary key in fact table Gini index
check_duplicate_pk(spark, fact_gini_index, 'fact_gini_index')
```

```
Data quality check passed for fact_gini_index with zero duplicate primary key!
```


### 4.3 Data dictionary
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

```python
# refer to Data_Dictionary.txt in the home directory.
```

## Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
  * The data was increased by 100x.
  * The data populates a dashboard that must be updated on a daily basis by 7am every day.
  * The database needed to be accessed by 100+ people.

```python
# refer to README.md in the home directory
```

## Example Queries

```python
fact_immigration.createOrReplaceTempView("immigration")
dim_from_country.createOrReplaceTempView("country")
dim_date.createOrReplaceTempView("date")
fact_gini_index.createOrReplaceTempView("gini_index")

spark.sql("""
    select 
        date.year,
        c.country_name, 
        count(f.file_id) as application_time,
        round(avg(g.index_value),1) as gini_index
    from immigration f 
    left join country c on 
        f.residence_country_code = c.country_code
    left join date on date.arrdate = f.arrdate
    left join gini_index g 
        on upper(g.country_name) = upper(c.country_name)
        and date.year =g.year
    
    group by 
        date.year,
        c.country_name 
    order by count(f.file_id) desc
    """).toPandas().head(15)
```

| | year | country_name | application_time | gini_index |
|---|---|---|---|---|
| 0 | 2016 | UNITED KINGDOM | 368421 | 34.8 |
| 1 | 2016 | JAPAN | 249167 | |
| 2 | 2016 | CHINA | 185609 | 38.5 |
| 3 | 2016 | FRANCE | 185339 | 31.9 |
| 4 | 2016 | MEXICO Air Sea | 179603 | |
| 5 | 2016 | GERMANY | 156613 | 31.6 |
| 6 | 2016 | SOUTH KOREA | 136312 | |
| 7 | 2016 | BRAZIL | 134907 | 53.3 |
| 8 | 2016 | AUSTRALIA | 112407 | 33.7 |
| 9 | 2016 | INDIA | 107193 | 34.8 |
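The empty `gini_index` cells above are rows where the left join found no Gini value for that country name and year. A second caveat concerns duplicates. The exploration queries in Step 2 average `value` per country and year, which suggests `fact_gini_index` may hold more than one row for some country-year pairs. In that case, the left join to `gini_index` repeats each immigration row once per matching Gini row, which inflates `count(f.file_id)` while leaving the average unchanged. Pre-aggregating the Gini data to one row per country and year before joining avoids this. The sketch below shows the effect without any libraries; the function and sample numbers are hypothetical.

```python
from collections import defaultdict


def join_with_gini(arrival_counts, gini_rows, preaggregate=True):
    """Attach Gini values to per-country arrival counts.

    arrival_counts: {country: number_of_arrivals}
    gini_rows: list of (country, value); a country may appear more than once.
    """
    gini_by_country = defaultdict(list)
    for country, value in gini_rows:
        gini_by_country[country].append(value)

    result = {}
    for country, arrivals in arrival_counts.items():
        values = gini_by_country.get(country, [])
        avg_gini = round(sum(values) / len(values), 1) if values else None
        if preaggregate:
            # one Gini row per country: arrivals are counted once
            result[country] = (arrivals, avg_gini)
        else:
            # plain join: each matching Gini row repeats the arrivals;
            # a left join still keeps an unmatched country once
            result[country] = (arrivals * max(len(values), 1), avg_gini)
    return result
```

In the Spark SQL query, this would mean joining to a subquery instead of the raw table, for example `select upper(country_name) as country_name, year, avg(index_value) as index_value from gini_index group by upper(country_name), year`.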
