# Project Title
### Data Engineering Capstone Project

#### Project Summary


The objective of this project is to use four data sets containing immigration data, airport codes, demographics of US cities and global temperature data. 
The primary purpose of the combination is to create an ETL pipeline which can be used to derive various correlations, trends and analytics. 
A use case for this analytics database is to find immigration patterns to the US.
For instance, one could attempt to correlate the influence of the average temperature of a migrant's resident country on their choice of US state; Which contries are attracting most of the immigrants etc.

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [4]:
# Do all imports and installs here
! pip install -U numpy
! pip install missingno

Requirement already up-to-date: numpy in /opt/conda/lib/python3.6/site-packages (1.19.5)


## Import Required Libraries

In [5]:
# Do all imports and installs here
import pandas as pd
import numpy as np
import os
import datetime as dt
from pyspark.sql.functions import isnan, when, count, col, udf, dayofmonth, dayofweek, month, year, weekofyear, avg
from pyspark.sql.types import *
import requests
from pyspark.sql.functions import year, month, dayofmonth, weekofyear, date_format
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType, StringType, IntegerType, FloatType
import datetime, time
import utilHelper as util

### Create a Spark Session

In [6]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .config("spark.python.worker.memory", "15g") \
        .getOrCreate()
    return spark
spark=create_spark_session()

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What is your end solution look like? What tools did you use? etc>

In this project I will gather the data using immigration data provided in the project resources along with data related to global temperature, airports, and demographics.
I will load this data into staging dataframes. 
I will clean the raw data and write it back to csv files. I will perform an ETL process using a Spark cluster. 
Clean files will be loaded and transformed to create star schema having fact and dimension tables and write it back to parquet files. 
The star schema can then be used by the relevant parties to perform data analytics, correlation and ad-hoc reporting in an effective and efficient manner.

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

- **I94 Immigration Data**: This data comes from the US National Tourism and Trade Office. [This](https://www.trade.gov/national-travel-and-tourism-office) is where the data comes from. 
- **World Temperature Data**: This dataset came from Kaggle. You can read more about it [here](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data).
- **U.S. City Demographic Data**: This data comes from OpenSoft. You can read more about it [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).
- **Airport Code Table**: This is a simple table of airport codes and corresponding cities. It comes from [here](https://datahub.io/core/airport-codes#data).
- **ISO Codes**: The dataset contains a list of all states and their codes. It can downloaded from [this Kaggle dataset](https://www.kaggle.com/datasets/juanumusic/countries-iso-codes)
- **Country Codes** , **Modes**, **Visa Codes** tables are extracted from I94_SAS_Labels_Descriptions.SAS file


##### 1. Global Temperatures Data

In [7]:
gtemp_df = pd.read_csv('../../data2/GlobalLandTemperaturesByCity.csv')
gtemp_df.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [8]:
gtemp_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8599212 entries, 0 to 8599211
Data columns (total 7 columns):
dt                               object
AverageTemperature               float64
AverageTemperatureUncertainty    float64
City                             object
Country                          object
Latitude                         object
Longitude                        object
dtypes: float64(2), object(5)
memory usage: 459.2+ MB


##### Data Dictionary

Feature                       |Description
:-----------------------------|:-----------
dt                            |Date
AverageTemperature            |Average temperature in celsius
AverageTemperatureUncertainty |95% confidence interval around average temperature
City                          |Name of city
Country                       |Name of country
Latitude                      |Latitude of city
Longitude                     |Longitude of city

##### 2. US Cities Demographics Data

In [9]:
us_demographics = 'us-cities-demographics.csv'
demo_df = spark.read.csv(us_demographics, inferSchema=True, header=True, sep=';')

In [10]:
demo_df.limit(10).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402
5,Peoria,Illinois,33.1,56229,62432,118661,6634,7517,2.4,IL,American Indian and Alaska Native,1343
6,Avondale,Arizona,29.1,38712,41971,80683,4815,8355,3.18,AZ,Black or African-American,11592
7,West Covina,California,39.8,51629,56860,108489,3800,37038,3.56,CA,Asian,32716
8,O'Fallon,Missouri,36.0,41762,43270,85032,5783,3269,2.77,MO,Hispanic or Latino,2583
9,High Point,North Carolina,35.5,51751,58077,109828,5204,16315,2.65,NC,Asian,11060


In [11]:
demo_df.limit(10).toPandas().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10 entries, 0 to 9
Data columns (total 12 columns):
City                      10 non-null object
State                     10 non-null object
Median Age                10 non-null float64
Male Population           10 non-null int32
Female Population         10 non-null int32
Total Population          10 non-null int32
Number of Veterans        10 non-null int32
Foreign-born              10 non-null int32
Average Household Size    10 non-null float64
State Code                10 non-null object
Race                      10 non-null object
Count                     10 non-null int32
dtypes: float64(2), int32(6), object(4)
memory usage: 800.0+ bytes


##### Data Dictionary

Feature                       |Description
:-----------------------------|:-----------
City |City Name
State |US State
Median Age |Median population age
Male Population |Male population
Female Population |Female population
Total Population |Total population
Number of Veterans |Number of veterans living in the city
Foreign-born |Number of residents who were not born in the city
Average Household Size |Average size of houses in the city
State Code |Code of the state
Race |Race class
Count |Number of individuals in each race

##### 3. Airport Codes Data

In [12]:
airport_codes_file = 'airport-codes_csv.csv'
air_df = pd.read_csv(airport_codes_file)

In [13]:
pd.set_option('display.max_colwidth',100) 

In [14]:
air_df.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"



##### Data Dictionary

Feature       |Description
:-------------|:-----------
ident         |Unique identifier
type          |Airport type
name          |Airport name
elevation_ft  |Airport altitude
continent     |Continent
iso_country   |ISO Code of the airport's country
iso_region    |ISO Code for the airport's region
municipality  | City/Municipality where the airport is located
gps_code      |Airport GPS Code
iata_code     |Airport IATA Code
local_code    |Airport local code
coordinates   |Airport coordinates

##### 4. Immigration Data

In [15]:
immigration_sample_data = 'immigration_data_sample.csv'
immigration_sample_df = pd.read_csv(immigration_sample_data)

In [16]:
print(len(immigration_sample_df.index))

1000


In [17]:
immigration_i94_df = pd.read_sas('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat', 'sas7bdat', encoding="ISO-8859-1")
immigration_i94_df.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,37.0,2.0,1.0,,,,T,,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,25.0,3.0,1.0,20130811.0,SEO,,G,,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,55.0,2.0,1.0,20160401.0,,,T,O,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,28.0,2.0,1.0,20160401.0,,,O,O,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,4.0,2.0,1.0,20160401.0,,,O,O,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


In [18]:
print(len(immigration_i94_df.index))

3096313


In [19]:
immigration_i94_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3096313 entries, 0 to 3096312
Data columns (total 28 columns):
cicid       float64
i94yr       float64
i94mon      float64
i94cit      float64
i94res      float64
i94port     object
arrdate     float64
i94mode     float64
i94addr     object
depdate     float64
i94bir      float64
i94visa     float64
count       float64
dtadfile    object
visapost    object
occup       object
entdepa     object
entdepd     object
entdepu     object
matflag     object
biryear     float64
dtaddto     object
gender      object
insnum      object
airline     object
admnum      float64
fltno       object
visatype    object
dtypes: float64(13), object(15)
memory usage: 661.4+ MB


##### Data Dictionary

Feature  |Description
:--------|:-----------
cicid    |Unique ID
i94yr    |year
i94mon   |month
i94cit   |3 digit code for immigrant country of birth
i94res   |3 digit code for immigrant country of residence
i94port  |Port of admission
arrdate  |Arrival Date in the USA
i94mode  |Mode of transportation
i94addr  |USA State of arrival
depdate  |Departure Date from the USA
i94bir   |Age of Respondent in Years
i94visa  |Visa codes collapsed into three categories
count    |Field used for summary statistics
dtadfile |Character Date Field - Date added to I-94 Files
visapost |Department of State where where Visa was issued
occup    |Occupation that will be performed in U.S
entdepa  |Arrival Flag - admitted or paroled into the U.S.
entdepd  |Departure Flag - Departed, lost I-94 or is deceased
entdepu  |Update Flag - Either apprehended, overstayed, adjusted to perm residence
matflag  |Match flag - Match of arrival and departure records
biryear  |year of birth
dtaddto  |Date to which admitted to U.S.
gender   |Non-immigrant sex
insnum   |INS number
airline  |Airline used to arrive in U.S.
admnum   |Admission Number
fltno    |Flight number of Airline used to arrive in U.S.
visatype |Class of admission legally admitting the non-immigrant to temporarily stay in U.S.

#### Extract Tables from I94_SAS_Labels_Description

In [20]:
def extract_table(start, end, sep):
    codes = []
    keys = []
    values = []
    with open('I94_SAS_Labels_Descriptions.SAS', mode='r') as file:
        for i, line in enumerate(file.readlines()):
            if i<start: pass
            elif i==end: break
            else:
                line = line.replace("'","")
                codes.append(line.strip().replace("\t",""))
    for key in codes:
        keys.append(key.split(sep)[0])
        values.append(key.split(sep)[1])
    values[-1] = values[-1].replace(" ;","")
    df = pd.DataFrame({'Code':keys, 'Name':values})
    return df

##### 5. Extracting Country Codes


In [21]:
country_code_df= extract_table(9,298,' =  ')
country_code_df.to_csv('Data/extracted_country_codes.csv', index=False)
country_code_df

Unnamed: 0,Code,Name
0,582,"MEXICO Air Sea, and Not Reported (I-94, no land arrivals)"
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA
5,324,ANGOLA
6,529,ANGUILLA
7,518,ANTIGUA-BARBUDA
8,687,ARGENTINA
9,151,ARMENIA



##### Data Dictionary

Feature       |Description
:-------------|:-----------
Code          |Country Code
Name          |Country name

##### 6. Extracting Modes

In [22]:
modes_df = extract_table(972,976,' = ')
modes_df.to_csv('Data/Modes.csv', index=False)
modes_df

Unnamed: 0,Code,Name
0,1,Air
1,2,Sea
2,3,Land
3,9,Not reported



##### Data Dictionary

Feature       |Description
:-------------|:-----------
Code          |Code
Name          |Mode name

##### 7. Extracting Visa Codes

In [23]:
visa_df = extract_table(1046, 1049,'= ')
visa_df.to_csv('Data/visa_codes.csv', index=False)
visa_df

Unnamed: 0,Code,Name
0,1,Business
1,2,Pleasure
2,3,Student



##### Data Dictionary

Feature       |Description
:-------------|:-----------
Code          |Code
Name          |Visa Category

In [None]:
# from pyspark.sql import SparkSession

#spark = SparkSession.builder.\
#config("spark.jars.repositories", "https://repos.spark-packages.org/").\
#config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
#enableHiveSupport().getOrCreate()

#df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [11]:
#write to parquet
#df_spark.write.parquet("sas_data")
#df_spark=spark.read.parquet("sas_data")

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

 - Remove rows having null values for id
 - Check if primary keys are unique
 - Check we don't have any duplicate rows

#### Cleaning Steps
Document steps necessary to clean the data

##### 1. Global Temperatures Data

In [24]:
util.display_dataframe_info(gtemp_df)



---------------------
DF Info
---------------------
Dataframe shape: (8599212, 7) 

Column Headers: ['dt', 'AverageTemperature', 'AverageTemperatureUncertainty', 'City', 'Country', 'Latitude', 'Longitude'] 

dt                                object
AverageTemperature               float64
AverageTemperatureUncertainty    float64
City                              object
Country                           object
Latitude                          object
Longitude                         object
dtype: object


In [25]:
util.display_dataframe_report(gtemp_df)

dt has 3239 unique values
Listing up to 12 unique values:
['1743-11-01' '1743-12-01' '1744-01-01' '1744-02-01' '1744-03-01'
 '1744-04-01' '1744-05-01' '1744-06-01' '1744-07-01' '1744-08-01'
 '1744-09-01' '1744-10-01']

---------------------

AverageTemperature has 111995 unique values
Listing up to 12 unique values:
[ 6.068    nan  5.788 10.644 14.051 16.082 12.781  7.95   4.639  0.122
 -1.333 -2.732]

---------------------

AverageTemperatureUncertainty has 10903 unique values
Listing up to 12 unique values:
[1.737   nan 3.624 1.283 1.347 1.396 1.454 1.63  1.302 1.756 1.642 1.358]

---------------------

City has 3448 unique values
Listing up to 12 unique values:
['Århus' 'Çorlu' 'Çorum' 'Öskemen' 'Ürümqi' 'A Coruña' 'Aachen' 'Aalborg'
 'Aba' 'Abadan' 'Abakaliki' 'Abakan']

---------------------

Country has 159 unique values
Listing up to 12 unique values:
['Denmark' 'Turkey' 'Kazakhstan' 'China' 'Spain' 'Germany' 'Nigeria'
 'Iran' 'Russia' 'Canada' "Côte D'Ivoire" 'United Kingdom']


##### 2. US Cities Demographics Data

In [26]:
util.display_dataframe_info(demo_df.toPandas())



---------------------
DF Info
---------------------
Dataframe shape: (2891, 12) 

Column Headers: ['City', 'State', 'Median Age', 'Male Population', 'Female Population', 'Total Population', 'Number of Veterans', 'Foreign-born', 'Average Household Size', 'State Code', 'Race', 'Count'] 

City                       object
State                      object
Median Age                float64
Male Population           float64
Female Population         float64
Total Population            int32
Number of Veterans        float64
Foreign-born              float64
Average Household Size    float64
State Code                 object
Race                       object
Count                       int32
dtype: object


In [27]:
util.display_dataframe_report(demo_df.toPandas())

City has 567 unique values
Listing up to 12 unique values:
['Silver Spring' 'Quincy' 'Hoover' 'Rancho Cucamonga' 'Newark' 'Peoria'
 'Avondale' 'West Covina' "O'Fallon" 'High Point' 'Folsom' 'Philadelphia']

---------------------

State has 49 unique values
Listing up to 12 unique values:
['Maryland' 'Massachusetts' 'Alabama' 'California' 'New Jersey' 'Illinois'
 'Arizona' 'Missouri' 'North Carolina' 'Pennsylvania' 'Kansas' 'Florida']

---------------------

Median Age has 180 unique values
Listing up to 12 unique values:
[33.8 41.  38.5 34.5 34.6 33.1 29.1 39.8 36.  35.5 40.9 34.1]

---------------------

Male Population has 594 unique values
Listing up to 12 unique values:
[ 40601.  44129.  38040.  88127. 138040.  56229.  38712.  51629.  41762.
  51751.  41051. 741270.]

---------------------

Female Population has 595 unique values
Listing up to 12 unique values:
[ 41862.  49500.  46799.  87105. 143873.  62432.  41971.  56860.  43270.
  58077.  35317. 826172.]

---------------------


##### 3. Airport Codes Data

In [28]:
util.display_dataframe_info(air_df)



---------------------
DF Info
---------------------
Dataframe shape: (55075, 12) 

Column Headers: ['ident', 'type', 'name', 'elevation_ft', 'continent', 'iso_country', 'iso_region', 'municipality', 'gps_code', 'iata_code', 'local_code', 'coordinates'] 

ident            object
type             object
name             object
elevation_ft    float64
continent        object
iso_country      object
iso_region       object
municipality     object
gps_code         object
iata_code        object
local_code       object
coordinates      object
dtype: object


In [33]:
util.display_dataframe_report(air_df)

ident has 55075 unique values
Listing up to 12 unique values:
['00A' '00AA' '00AK' '00AL' '00AR' '00AS' '00AZ' '00CA' '00CL' '00CN'
 '00CO' '00FA']

---------------------

type has 7 unique values
name has 52144 unique values
Listing up to 12 unique values:
['Total Rf Heliport' 'Aero B Ranch Airport' 'Lowell Field' 'Epps Airpark'
 'Newport Hospital & Clinic Heliport' 'Fulton Airport' 'Cordes Airport'
 'Goldstone /Gts/ Airport' 'Williams Ag Airport'
 'Kitchen Creek Helibase Heliport' 'Cass Field' 'Grass Patch Airport']

---------------------

elevation_ft has 5450 unique values
Listing up to 12 unique values:
[  11. 3435.  450.  820.  237. 1100. 3810. 3038.   87. 3350. 4830.   53.]

---------------------

continent has 7 unique values
iso_country has 244 unique values
Listing up to 12 unique values:
['US' 'PR' 'MH' 'MP' 'GU' 'SO' 'AQ' 'GB' 'PG' 'AD' 'SD' 'SA']

---------------------

iso_region has 2810 unique values
Listing up to 12 unique values:
['US-PA' 'US-KS' 'US-AK' 'US-AL' 'US-A

##### 4. Immigration Data

In [34]:
util.display_dataframe_info(immigration_i94_df)



---------------------
DF Info
---------------------
Dataframe shape: (3096313, 28) 

Column Headers: ['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port', 'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa', 'count', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd', 'entdepu', 'matflag', 'biryear', 'dtaddto', 'gender', 'insnum', 'airline', 'admnum', 'fltno', 'visatype'] 

cicid       float64
i94yr       float64
i94mon      float64
i94cit      float64
i94res      float64
i94port      object
arrdate     float64
i94mode     float64
i94addr      object
depdate     float64
i94bir      float64
i94visa     float64
count       float64
dtadfile     object
visapost     object
occup        object
entdepa      object
entdepd      object
entdepu      object
matflag      object
biryear     float64
dtaddto      object
gender       object
insnum       object
airline      object
admnum      float64
fltno        object
visatype     object
dtype: object


In [35]:
util.display_dataframe_report(immigration_i94_df)

cicid has 3096313 unique values
Listing up to 12 unique values:
[ 6.  7. 15. 16. 17. 18. 19. 20. 21. 22. 23. 24.]

---------------------

i94yr has 1 unique values
i94mon has 1 unique values
i94cit has 243 unique values
Listing up to 12 unique values:
[692. 254. 101. 102. 103. 104. 105. 107. 108. 109. 110. 111.]

---------------------

i94res has 229 unique values
Listing up to 12 unique values:
[692. 276. 101. 110. 117. 112. 251. 102. 103. 104. 111. 119.]

---------------------

i94port has 299 unique values
Listing up to 12 unique values:
['XXX' 'ATL' 'WAS' 'NYC' 'TOR' 'BOS' 'HOU' 'MIA' 'CHI' 'LOS' 'CLT' 'DEN']

---------------------

arrdate has 30 unique values
Listing up to 12 unique values:
[20573. 20551. 20545. 20546. 20547. 20548. 20549. 20550. 20552. 20553.
 20554. 20555.]

---------------------

i94mode has 5 unique values
i94addr has 459 unique values
Listing up to 12 unique values:
[nan 'AL' 'MI' 'MA' 'NJ' 'NY' 'MO' 'TX' 'CT' 'FL' 'IL' 'CA']

---------------------

depdate 

# Performing cleaning tasks here

- **Drop columns**: drop columns containing over 90% missing values
- **Drop duplicate values**: Drop duplicate values from all dataframes
- **Change Data Types**: All columns should have approriate column data types
- **Immigration**: Conversion of arrdate, depdate, and dtaddto into datetime format
- **Temparature**: Remove Rows having null AverageTemperature
- **Airport Code**: Remove Rows having null iso_country

In [36]:
# Drop columns with over 90% missing values
clean_gtemp = util.remove_missing_data(gtemp_df)

Removing missing data...
Cleaning complete!


In [37]:
clean_gtemp = util.remove_duplicate_rows(clean_gtemp)

Removing duplicate rows...
0 rows removed.


In [38]:
start_date = "2010-01-01"
end_date = "2020-01-01"

after_start_date = clean_gtemp["dt"] >= start_date
before_end_date = clean_gtemp["dt"] <= end_date
between_two_dates = after_start_date & before_end_date
clean_gtemp = clean_gtemp.loc[between_two_dates]

In [39]:
clean_gtemp.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
3194,2010-01-01,-3.799,0.24,Århus,Denmark,57.05N,10.33E
3195,2010-02-01,-2.691,0.272,Århus,Denmark,57.05N,10.33E
3196,2010-03-01,2.429,0.427,Århus,Denmark,57.05N,10.33E
3197,2010-04-01,7.123,0.234,Århus,Denmark,57.05N,10.33E
3198,2010-05-01,10.657,0.314,Århus,Denmark,57.05N,10.33E


In [40]:
util.display_dataframe_info(clean_gtemp)



---------------------
DF Info
---------------------
Dataframe shape: (157950, 7) 

Column Headers: ['dt', 'AverageTemperature', 'AverageTemperatureUncertainty', 'City', 'Country', 'Latitude', 'Longitude'] 

dt                                object
AverageTemperature               float64
AverageTemperatureUncertainty    float64
City                              object
Country                           object
Latitude                          object
Longitude                         object
dtype: object


In [41]:
# Convert dt to datetime
clean_gtemp.dt = pd.to_datetime(clean_gtemp.dt)

In [42]:
util.display_dataframe_info(clean_gtemp)



---------------------
DF Info
---------------------
Dataframe shape: (157950, 7) 

Column Headers: ['dt', 'AverageTemperature', 'AverageTemperatureUncertainty', 'City', 'Country', 'Latitude', 'Longitude'] 

dt                               datetime64[ns]
AverageTemperature                      float64
AverageTemperatureUncertainty           float64
City                                     object
Country                                  object
Latitude                                 object
Longitude                                object
dtype: object


In [43]:
# Remove rows with missing Avg Temperature
clean_gtemp = clean_gtemp[clean_gtemp.AverageTemperature.isnull()==False]

In [44]:
# check if Missing values
clean_gtemp.isnull().sum()

dt                               0
AverageTemperature               0
AverageTemperatureUncertainty    0
City                             0
Country                          0
Latitude                         0
Longitude                        0
dtype: int64

In [45]:
clean_gtemp.to_csv('Data/CleanTemperature.csv', index=False)

In [46]:
# Drop columns with over 90% missing values
clean_demog = util.remove_missing_data(demo_df.toPandas())

Removing missing data...
Cleaning complete!


In [47]:
clean_demog = util.remove_duplicate_rows(clean_demog)

Removing duplicate rows...
0 rows removed.


In [48]:
clean_demog.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [49]:
# Remove null values
clean_demog = clean_demog[clean_demog['Male Population'].isnull()==False]
clean_demog = clean_demog[clean_demog['Female Population'].isnull()==False]
clean_demog = clean_demog[clean_demog['Number of Veterans'].isnull()==False]
clean_demog = clean_demog[clean_demog['Foreign-born'].isnull()==False].reset_index(drop=True)

In [50]:
# Change datatype
clean_demog['Male Population'] = clean_demog['Male Population'].astype(int)
clean_demog['Female Population'] = clean_demog['Female Population'].astype(int)
clean_demog['Number of Veterans'] = clean_demog['Number of Veterans'].astype(int)
clean_demog['Foreign-born'] = clean_demog['Foreign-born'].astype(int)

In [51]:
util.display_dataframe_info(clean_demog)



---------------------
DF Info
---------------------
Dataframe shape: (2875, 12) 

Column Headers: ['City', 'State', 'Median Age', 'Male Population', 'Female Population', 'Total Population', 'Number of Veterans', 'Foreign-born', 'Average Household Size', 'State Code', 'Race', 'Count'] 

City                       object
State                      object
Median Age                float64
Male Population             int64
Female Population           int64
Total Population            int32
Number of Veterans          int64
Foreign-born                int64
Average Household Size    float64
State Code                 object
Race                       object
Count                       int32
dtype: object


In [52]:
clean_demog.to_csv('Data/CleanDemographics.csv', index=False)

In [53]:
# Drop columns with over 90% missing values
clean_airdf = util.remove_missing_data(air_df)

Removing missing data...
Cleaning complete!


In [54]:
clean_airdf = util.remove_duplicate_rows(clean_airdf)

Removing duplicate rows...
0 rows removed.


In [55]:
clean_airdf.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [56]:
clean_airdf.isnull().sum()

ident               0
type                0
name                0
elevation_ft     7006
continent       27719
iso_country       247
iso_region          0
municipality     5676
gps_code        14045
iata_code       45886
local_code      26389
coordinates         0
dtype: int64

In [57]:
#Remove rows missing iso_country
clean_airdf = clean_airdf[clean_airdf.iso_country.isnull()==False].reset_index(drop=True)

In [58]:
clean_airdf.isnull().sum()

ident               0
type                0
name                0
elevation_ft     6990
continent       27719
iso_country         0
iso_region          0
municipality     5574
gps_code        13872
iata_code       45670
local_code      26142
coordinates         0
dtype: int64

In [59]:
clean_airdf.to_csv('Data/CleanAirportCode.csv', index=False)

In [60]:
# Drop columns with over 90% missing values
clean_immi94 = util.remove_missing_data(immigration_i94_df)

Removing missing data...
Cleaning complete!


In [61]:
clean_immi94 = util.remove_duplicate_rows(clean_immi94)

Removing duplicate rows...
0 rows removed.


In [62]:
clean_immi94.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,entdepa,entdepd,matflag,biryear,dtaddto,gender,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,37.0,2.0,1.0,,,T,,,1979.0,10282016,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,25.0,3.0,1.0,20130811.0,SEO,G,,,1991.0,D/S,M,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,55.0,2.0,1.0,20160401.0,,T,O,M,1961.0,09302016,M,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,28.0,2.0,1.0,20160401.0,,O,O,M,1988.0,09302016,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,4.0,2.0,1.0,20160401.0,,O,O,M,2012.0,09302016,,AA,92468460000.0,199.0,B2


In [63]:
# Change Datatypes
clean_immi94.i94cit = clean_immi94.i94cit.astype(int)
clean_immi94.i94res = clean_immi94.i94res.astype(int)
clean_immi94.i94yr = clean_immi94.i94yr.astype(int)
clean_immi94.i94mon = clean_immi94.i94mon.astype(int)
clean_immi94.i94mode = clean_immi94.i94mode.astype('category')
clean_immi94.i94visa = clean_immi94.i94visa.astype('category')

In [64]:
# Change arrdate,depdate, dtaddto, and dtaddfile to datetime
clean_immi94.arrdate=pd.to_timedelta(clean_immi94.arrdate, unit='D') + pd.Timestamp('1960-1-1')
clean_immi94.depdate=pd.to_timedelta(clean_immi94.depdate, unit='D') + pd.Timestamp('1960-1-1')
clean_immi94.dtaddto=pd.to_datetime(clean_immi94.dtaddto, format='%m%d%Y', errors = 'coerce')
clean_immi94.dtadfile=pd.to_datetime(clean_immi94.dtadfile, format='%Y%m%d')

In [65]:
util.display_dataframe_info(clean_immi94)



---------------------
DF Info
---------------------
Dataframe shape: (3096313, 25) 

Column Headers: ['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port', 'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa', 'count', 'dtadfile', 'visapost', 'entdepa', 'entdepd', 'matflag', 'biryear', 'dtaddto', 'gender', 'airline', 'admnum', 'fltno', 'visatype'] 

cicid              float64
i94yr                int64
i94mon               int64
i94cit               int64
i94res               int64
i94port             object
arrdate     datetime64[ns]
i94mode           category
i94addr             object
depdate     datetime64[ns]
i94bir             float64
i94visa           category
count              float64
dtadfile    datetime64[ns]
visapost            object
entdepa             object
entdepd             object
matflag             object
biryear            float64
dtaddto     datetime64[ns]
gender              object
airline             object
admnum             float64
fltno    

In [66]:
util.display_dataframe_info(immigration_i94_df)



---------------------
DF Info
---------------------
Dataframe shape: (3096313, 28) 

Column Headers: ['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port', 'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa', 'count', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd', 'entdepu', 'matflag', 'biryear', 'dtaddto', 'gender', 'insnum', 'airline', 'admnum', 'fltno', 'visatype'] 

cicid       float64
i94yr       float64
i94mon      float64
i94cit      float64
i94res      float64
i94port      object
arrdate     float64
i94mode     float64
i94addr      object
depdate     float64
i94bir      float64
i94visa     float64
count       float64
dtadfile     object
visapost     object
occup        object
entdepa      object
entdepd      object
entdepu      object
matflag      object
biryear     float64
dtaddto      object
gender       object
insnum       object
airline      object
admnum      float64
fltno        object
visatype     object
dtype: object


In [67]:
clean_immi94[clean_immi94.arrdate>clean_immi94.depdate]

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,entdepa,entdepd,matflag,biryear,dtaddto,gender,airline,admnum,fltno,visatype
84805,168392.0,2016,4,582,582,HOU,2016-04-01,1.0,,2016-03-31,59.0,2.0,1.0,2016-04-01,MTR,U,N,M,1957.0,2016-05-07,M,UA,7.882699e+10,5574,B2
121285,244218.0,2016,4,131,131,KOA,2016-04-02,1.0,,2016-03-19,33.0,2.0,1.0,2016-04-02,,T,O,M,1983.0,2016-06-09,F,LX,6.718949e+08,52,WT
164077,298072.0,2016,4,258,117,BOS,2016-04-02,1.0,NY,2016-01-26,32.0,1.0,1.0,2016-04-02,ISL,T,Q,M,1984.0,2016-04-03,M,EK,6.730104e+08,239,B1
179117,367773.0,2016,4,582,582,TUC,2016-04-02,1.0,,2016-04-01,37.0,1.0,1.0,2016-04-02,HER,U,R,M,1979.0,2016-08-28,M,*GA,8.938800e+10,XAJRS,B1
182346,414662.0,2016,4,585,585,MIA,2016-04-02,1.0,,2016-01-31,39.0,2.0,1.0,2016-04-02,,T,Q,M,1977.0,2016-07-22,F,B6,6.730086e+08,924,B2
191338,425452.0,2016,4,691,691,MIA,2016-04-02,1.0,,2016-04-01,48.0,2.0,1.0,2016-04-02,,T,O,M,1968.0,2016-10-01,M,4O,6.693548e+08,2950,B2
275275,575574.0,2016,4,582,582,MIA,2016-04-03,1.0,FL,2016-04-02,30.0,2.0,1.0,2016-04-03,MEX,T,O,M,1986.0,2016-10-02,F,4O,6.724898e+08,5964,B2
294625,630663.0,2016,4,111,111,BOS,2016-04-04,1.0,,2016-03-12,21.0,3.0,1.0,2016-04-04,,T,K,M,1995.0,NaT,F,FI,6.767670e+08,631,F1
395868,826051.0,2016,4,117,117,NOL,2016-04-05,2.0,TX,2016-04-04,32.0,2.0,1.0,2016-04-05,,U,O,M,1984.0,2016-06-17,M,VES,5.469735e+10,91388,WT
398137,828816.0,2016,4,124,124,HOU,2016-04-05,1.0,,2016-04-04,29.0,2.0,1.0,2016-04-05,,T,N,M,1987.0,2016-07-03,M,UA,6.808067e+08,47,WT


In [68]:
# Remove rows having arrdate>depdate
clean_immi94 =clean_immi94[clean_immi94.arrdate<=clean_immi94.depdate]
clean_immi94[clean_immi94.arrdate>clean_immi94.depdate]

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,entdepa,entdepd,matflag,biryear,dtaddto,gender,airline,admnum,fltno,visatype


In [69]:
clean_immi94.to_csv('Data/CleanImmigration.csv', index=False)

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

We will be studying the immigration data. With reference to Kimball Dimensional Modelling Techniques, laid out in this document (http://www.kimballgroup.com/wp-content/uploads/2013/08/2013.09-Kimball-Dimensional-Modeling-Techniques11.pdf), following Four-Step Dimensional Design Process is being considered:

1. Select the Business Process:
    The immigration department follows their business process of admitting migrants into the country. This process generates events which are captured and translated to facts in a fact table

2. Declare the Grain:
    The grain identifies exactly what is represented in a single fact table row.
    In this project, the grain is declared as a single occurrence of a migrant entering the USA.

3. Identify the Dimensions:
    Dimension tables provide context around an event or business process.        

4. Identify the Facts:
    Fact tables focus on the occurrences of a singular business process, and have a one-to-one relationship with the events described in the grain. i94 data will be used as fact table.     

For this application, I have created a set of Fact and Dimension tables in a Relational Database Management System to form a Star Schema which is represented in below image:

![Conceptual_data_model.PNG](attachment:Conceptual_data_model.PNG)
    

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

In this project, I will be performing below steps:
1. Extract the clean data
2. Create Dimension tables
3. Create Fact table
4. Write data into parquet files
5. Perform data quality checks

I will be using below **ETL** steps:
    
1. Extract
   - Create spark dataframe for all the clean data
2. Transform
   - **fact_immigration**
       - DROP Rows with null cicid
       - DROP Rows with null gender
       - DROP Rows with NULL arrival_date
       - DROP ROWS with Invalid/should not show /No COuntry Codes as citizens or resident
       - Replace citizen, resident, mode from the details given in I94_SAS_Labels_Descriptions.SAS
   - **dim_temperature**
       - DROP Rows with null city/country
       - Create Avg temperature field which is the average of data for all the years
   - **dim_aiport**
       - DROP Rows with Null id values
       - DROP Rows having aiport type other than small, medium and large as immigration is not possible on other airport types
   - **dim_demographics**
       - DROP Rows with no city or country data
   - **dim_time**
       - Created for all the dates in arrival_date and departure date (using i94 data)
   - **dim_iso**
       - Convert to upper case country names
       
3. Load
    - Write back data models in parquet format

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

##### Step-1. Extract

In [70]:
# Create Spark dataframe
gtemp_spark = spark.read.format('csv').option('header', 'true').load('Data/CleanTemperature.csv')
demo_spark = spark.read.format('csv').option('header', 'true').load('Data/CleanDemographics.csv')
aircodes_spark = spark.read.format('csv').option('header', 'true').load('Data/CleanAirportCode.csv')
i94_immi_spark = spark.read.format('csv').option('header', 'true').load('Data/CleanImmigration.csv')
countrycodes_spark = spark.read.format('csv').option('header', 'true').load('Data/extracted_country_codes.csv')
modes_spark = spark.read.format('csv').option('header', 'true').load('Data/Modes.csv')
visacodes_spark = spark.read.format('csv').option('header', 'true').load('Data/visa_codes.csv')
countryiso_spark = spark.read.format('csv').option('header', 'true').load('Data/wikipedia-iso-country-codes.csv')

In [71]:
gtemp_spark.createOrReplaceTempView("global_temperature")
gtemp_spark.printSchema()

root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- AverageTemperatureUncertainty: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [72]:
demo_spark.createOrReplaceTempView("demographics")
demo_spark.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



In [73]:
aircodes_spark.createOrReplaceTempView("airport_codes")
aircodes_spark.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



In [74]:
i94_immi_spark.createOrReplaceTempView("immigration_i94")
i94_immi_spark.printSchema()

root
 |-- cicid: string (nullable = true)
 |-- i94yr: string (nullable = true)
 |-- i94mon: string (nullable = true)
 |-- i94cit: string (nullable = true)
 |-- i94res: string (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: string (nullable = true)
 |-- i94mode: string (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: string (nullable = true)
 |-- i94bir: string (nullable = true)
 |-- i94visa: string (nullable = true)
 |-- count: string (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: string (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: string (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)



In [75]:
countrycodes_spark.createOrReplaceTempView("country_codes")
countrycodes_spark.printSchema()

root
 |-- Code: string (nullable = true)
 |-- Name: string (nullable = true)



In [76]:
modes_spark.createOrReplaceTempView("modes")
modes_spark.printSchema()

root
 |-- Code: string (nullable = true)
 |-- Name: string (nullable = true)



In [77]:
visacodes_spark.createOrReplaceTempView("visa_codes")
visacodes_spark.printSchema()

root
 |-- Code: string (nullable = true)
 |-- Name: string (nullable = true)



In [78]:
countryiso_spark.createOrReplaceTempView("iso_code")
countryiso_spark.printSchema()

root
 |-- English short name lower case: string (nullable = true)
 |-- Alpha-2 code: string (nullable = true)
 |-- Alpha-3 code: string (nullable = true)
 |-- Numeric code: string (nullable = true)
 |-- ISO 3166-2: string (nullable = true)



##### Step-2. Transform

In [79]:
# Create Fact table
fact_immigration = spark.sql('''
    SELECT 
        CAST(CAST(cicid AS int) AS string)   AS id,
        UPPER(c1.Name)                 AS citizen,
        UPPER(c2.Name)                 AS resident,  
        i94addr                        AS us_state,
        NVL(modes.Name,'Not reported') AS modes,
        visa.Name                      AS visa,
        arrdate                        AS arrival_date,
        depdate                        AS departure_date,
        i94bir                         AS age,
                                          gender,
        visatype                       AS visa_type
    FROM immigration_i94
    LEFT JOIN country_codes c1 on immigration_i94.i94cit = c1.Code
    LEFT JOIN country_codes c2 on immigration_i94.i94res = c2.Code
    LEFT JOIN modes on cast(immigration_i94.i94mode AS int) = modes.Code
    LEFT JOIN visa_codes visa on immigration_i94.i94visa = cast(visa.Code AS float)
    WHERE cicid IS NOT NULL
    AND gender IS NOT NULL
    AND arrdate IS NOT NULL
    AND c1.Name NOT LIKE 'INVALID%'
    AND c1.Name NOT LIKE '%(should not show)'
    AND c1.Name NOT LIKE 'No Country Code%'
    AND c2.Name NOT LIKE 'INVALID%'
    AND c2.Name NOT LIKE '%(should not show)'
    AND c2.Name NOT LIKE 'No Country Code%'
    
''')
fact_immigration.createOrReplaceTempView('fact_immigration')
fact_immigration.show()

+---+-------+--------+--------+-----+--------+------------+--------------+----+------+---------+
| id|citizen|resident|us_state|modes|    visa|arrival_date|departure_date| age|gender|visa_type|
+---+-------+--------+--------+-----+--------+------------+--------------+----+------+---------+
| 15|ALBANIA| ALBANIA|      MI|  Air|Pleasure|  2016-04-01|    2016-08-25|55.0|     M|       B2|
| 27|ALBANIA| ALBANIA|      MA|  Air|Business|  2016-04-01|    2016-04-05|58.0|     M|       B1|
| 28|ALBANIA| ALBANIA|      MA|  Air|Business|  2016-04-01|    2016-04-05|56.0|     F|       B1|
| 29|ALBANIA| ALBANIA|      MA|  Air|Pleasure|  2016-04-01|    2016-04-17|62.0|     M|       B2|
| 30|ALBANIA| ALBANIA|      NJ|  Air|Pleasure|  2016-04-01|    2016-05-04|49.0|     M|       B2|
| 31|ALBANIA| ALBANIA|      NY|  Air|Pleasure|  2016-04-01|    2016-06-06|43.0|     M|       B2|
| 33|ALBANIA| ALBANIA|      TX|  Air|Pleasure|  2016-04-01|    2016-04-10|53.0|     F|       B2|
| 36|ALBANIA| ALBANIA|      NJ

In [78]:
fact_immigration.describe()

DataFrame[summary: string, id: string, citizen: string, resident: string, us_state: string, modes: string, visa: string, arrival_date: string, departure_date: string, age: string, gender: string, visa_type: string]

In [80]:
# Create dim table-Tempertaure
dim_temperature = spark.sql('''
    SELECT city, 
        UPPER(country) AS country, 
        Round(AVG(AverageTemperature),2) AS all_time_avg_temperature
    FROM global_temperature
    WHERE city IS NOT NULL
    AND country IS NOT NULL
    GROUP BY city, country
''')
dim_temperature.createOrReplaceTempView('dim_temperature')
dim_temperature.show()

+------------+------------------+------------------------+
|        city|           country|all_time_avg_temperature|
+------------+------------------+------------------------+
|   Allentown|     UNITED STATES|                   11.66|
|      Atyrau|        KAZAKHSTAN|                   10.32|
|     Bintulu|          MALAYSIA|                   26.96|
| Butterworth|          MALAYSIA|                    28.2|
|      Cainta|       PHILIPPINES|                   27.33|
|      Ciamis|         INDONESIA|                   25.51|
|      Dodoma|          TANZANIA|                   22.91|
|      Fuling|             CHINA|                   17.67|
|      Fuyang|             CHINA|                    16.2|
|         Ife|           NIGERIA|                   27.22|
|  Jhunjhunun|             INDIA|                   26.21|
|      Maxixe|        MOZAMBIQUE|                   24.75|
|      Owerri|           NIGERIA|                   27.45|
|Puerto Plata|DOMINICAN REPUBLIC|                   26.7

In [81]:
# Create dim table- Airport codes
dim_airport_codes = spark.sql('''
    SELECT ident AS id,
           CASE
               WHEN type='small_airport' THEN 'Small'
               WHEN type='medium_airport' THEN 'Medium'
               WHEN type='large_aiport' THEN 'Large'
            END         AS type,
           name,
           iso_country AS country,
           iso_region  AS region
    FROM airport_codes
    WHERE ident IS NOT NULL 
    AND type IN ('small_aiport', 'medium_airport', 'large_aiport')
''')
dim_airport_codes.createOrReplaceTempView('dim_airport_codes')
dim_airport_codes.head()

Row(id='5A8', type='Medium', name='Aleknagik / New Airport', country='US', region='US-AK')

In [82]:
dim_airport_codes.show()

+-------+------+--------------------+-------+------+
|     id|  type|                name|country|region|
+-------+------+--------------------+-------+------+
|    5A8|Medium|Aleknagik / New A...|     US| US-AK|
|AE-0030|Medium|FIVE STAR FINANCE...|     AE| AE-DU|
|   AGGH|Medium|Honiara Internati...|     SB| SB-CT|
|   AGGM|Medium|       Munda Airport|     SB| SB-WE|
|    AHJ|Medium|    Hongyuan Airport|     CN| CN-51|
|AL-0004|Medium|Çá¸¾á¸á¸ á¸®á¸...|     AL|AL-U-A|
|   ANYN|Medium|Nauru Internation...|     NR| NR-14|
|AU-0118|Medium|                 LST|     AU|AU-TAS|
|AU-0120|Medium|                 Wbl|     AU|AU-VIC|
|AU-0121|Medium|                YKCY|     AU|AU-QLD|
|    AXF|Medium|Alxa Left Banner ...|     CN| CN-15|
|   AYBK|Medium|        Buka Airport|     PG|PG-NSB|
|   AYCH|Medium|      Chimbu Airport|     PG|PG-CPK|
|   AYDU|Medium|        Daru Airport|     PG|PG-WPD|
|   AYGA|Medium|      Goroka Airport|     PG|PG-EHG|
|   AYGN|Medium|      Gurney Airport|     PG|P

In [83]:
# Create dim table- Demographics
dim_demographics = spark.sql('''
    SELECT City              AS city,
    UPPER(State)             AS state,
    `Median Age`             AS median_age,
    `Male Population`        AS male_population,
    `Female Population`      AS female_population,
    `Total Population`       AS total_population,
    `Foreign-born`           AS foreign_Born,
    `Average Household Size` AS avg_household_size,
    `State Code`             AS state_code,
    Race                     AS race
    FROM demographics
    WHERE City IS NOT NULL
    AND State IS NOT NULL
''')
dim_demographics.createOrReplaceTempView('dim_demographics')
dim_demographics.describe()

DataFrame[summary: string, city: string, state: string, median_age: string, male_population: string, female_population: string, total_population: string, foreign_Born: string, avg_household_size: string, state_code: string, race: string]

In [84]:
dim_demographics.limit(10).show()

+----------------+--------------+----------+---------------+-----------------+----------------+------------+------------------+----------+--------------------+
|            city|         state|median_age|male_population|female_population|total_population|foreign_Born|avg_household_size|state_code|                race|
+----------------+--------------+----------+---------------+-----------------+----------------+------------+------------------+----------+--------------------+
|   Silver Spring|      MARYLAND|      33.8|          40601|            41862|           82463|       30908|               2.6|        MD|  Hispanic or Latino|
|          Quincy| MASSACHUSETTS|      41.0|          44129|            49500|           93629|       32935|              2.39|        MA|               White|
|          Hoover|       ALABAMA|      38.5|          38040|            46799|           84839|        8229|              2.58|        AL|               Asian|
|Rancho Cucamonga|    CALIFORNIA|      3

In [85]:
dim_time = spark.sql('''
    WITH date AS ( SELECT DISTINCT date FROM (
        SELECT arrdate AS date FROM immigration_i94
        UNION 
        SELECT depdate AS date FROM immigration_i94
        )
    )
    
    SELECT date,
        YEAR(date)                   AS year,
        DATE_FORMAT(date, 'MMM')     AS month,
        DAY(date)                    AS day,
        WEEKOFYEAR(date)             AS week,
        QUARTER(date)                AS quarter
    FROM date
    ORDER BY date
''')
dim_time.createOrReplaceTempView('dim_time')
dim_time.describe()

DataFrame[summary: string, date: string, year: string, month: string, day: string, week: string, quarter: string]

In [86]:
dim_time.show()

+----------+----+-----+---+----+-------+
|      date|year|month|day|week|quarter|
+----------+----+-----+---+----+-------+
|2016-04-01|2016|  Apr|  1|  13|      2|
|2016-04-02|2016|  Apr|  2|  13|      2|
|2016-04-03|2016|  Apr|  3|  13|      2|
|2016-04-04|2016|  Apr|  4|  14|      2|
|2016-04-05|2016|  Apr|  5|  14|      2|
|2016-04-06|2016|  Apr|  6|  14|      2|
|2016-04-07|2016|  Apr|  7|  14|      2|
|2016-04-08|2016|  Apr|  8|  14|      2|
|2016-04-09|2016|  Apr|  9|  14|      2|
|2016-04-10|2016|  Apr| 10|  14|      2|
|2016-04-11|2016|  Apr| 11|  15|      2|
|2016-04-12|2016|  Apr| 12|  15|      2|
|2016-04-13|2016|  Apr| 13|  15|      2|
|2016-04-14|2016|  Apr| 14|  15|      2|
|2016-04-15|2016|  Apr| 15|  15|      2|
|2016-04-16|2016|  Apr| 16|  15|      2|
|2016-04-17|2016|  Apr| 17|  15|      2|
|2016-04-18|2016|  Apr| 18|  16|      2|
|2016-04-19|2016|  Apr| 19|  16|      2|
|2016-04-20|2016|  Apr| 20|  16|      2|
+----------+----+-----+---+----+-------+
only showing top

In [87]:
dim_iso = spark.sql('''
SELECT UPPER(`English short name lower case`) AS country,
       `Alpha-2 code`                         AS iso_code
FROM iso_code
''')
dim_iso.createOrReplaceTempView('dim_iso_code')
dim_iso.describe()

DataFrame[summary: string, country: string, iso_code: string]

In [88]:
dim_iso.show()

+-------------------+--------+
|            country|iso_code|
+-------------------+--------+
|        AFGHANISTAN|      AF|
|      ÅLAND ISLANDS|      AX|
|            ALBANIA|      AL|
|            ALGERIA|      DZ|
|     AMERICAN SAMOA|      AS|
|            ANDORRA|      AD|
|             ANGOLA|      AO|
|           ANGUILLA|      AI|
|         ANTARCTICA|      AQ|
|ANTIGUA AND BARBUDA|      AG|
|          ARGENTINA|      AR|
|            ARMENIA|      AM|
|              ARUBA|      AW|
|          AUSTRALIA|      AU|
|            AUSTRIA|      AT|
|         AZERBAIJAN|      AZ|
|            BAHAMAS|      BS|
|            BAHRAIN|      BH|
|         BANGLADESH|      BD|
|           BARBADOS|      BB|
+-------------------+--------+
only showing top 20 rows



#### Analysis- Check Data points

1. Top 10 countries having most no. of immigrants

In [89]:
immigrants_count = spark.sql('''
    SELECT citizen AS country,
    COUNT(id) AS count_of_immigrants
    FROM fact_immigration
    GROUP by citizen
    ORDER BY COUNT(id) DESC
    LIMIT 10
''')
immigrants_count.show()

+--------------------+-------------------+
|             country|count_of_immigrants|
+--------------------+-------------------+
|      UNITED KINGDOM|             278459|
|               JAPAN|             162934|
|          CHINA, PRC|             162818|
|MEXICO AIR SEA, A...|             162288|
|              FRANCE|             153047|
|              BRAZIL|             113377|
|               INDIA|              83725|
|           AUSTRALIA|              81781|
|           ARGENTINA|              64362|
|         NETHERLANDS|              60038|
+--------------------+-------------------+



2. Common Visa Types

In [90]:
visa = spark.sql('''
    SELECT Visa, visa_type, COUNT(id) FROM fact_immigration
    GROUP BY visa, visa_type
    ORDER BY COUNT(id) DESC
''')
visa.show()

+--------+---------+---------+
|    Visa|visa_type|count(id)|
+--------+---------+---------+
|Pleasure|       B2|   931791|
|Pleasure|       WT|   911096|
|Business|       B1|   175476|
|Business|       WB|   150561|
| Student|       F1|    24670|
|Business|       E2|    13443|
|Pleasure|       CP|    11977|
|Business|        I|     2658|
|Business|       E1|     2336|
|Pleasure|      GMT|     2205|
| Student|       F2|     1489|
| Student|       M1|      584|
|Business|       I1|      196|
|Business|      GMB|       43|
| Student|       M2|       23|
|Pleasure|      CPL|        7|
|Pleasure|      SBP|        2|
+--------+---------+---------+



3. Immigrants by mode of transport

In [86]:
mode = spark.sql('''SELECT modes, count(id) FROM fact_immigration
    GROUP BY modes
''')
mode.show()

+------------+---------+
|       modes|count(id)|
+------------+---------+
|         Sea|    15917|
|        Land|    51663|
|         Air|  2158755|
|Not reported|     2222|
+------------+---------+



4. Immigrants by gender

In [87]:
gender = spark.sql('''SELECT gender, count(id) FROM fact_immigration
    GROUP BY gender
''')
gender.show()

+------+---------+
|gender|count(id)|
+------+---------+
|     F|  1078494|
|     M|  1149452|
|     U|      224|
|     X|      387|
+------+---------+



5. Countries with Highest Avg temperature

In [88]:
temp = spark.sql('''
    SELECT country, all_time_avg_temperature
    FROM dim_temperature
    ORDER BY all_time_avg_temperature DESC
    LIMIT 8
''')
temp.show()

+--------+------------------------+
| country|all_time_avg_temperature|
+--------+------------------------+
|DJIBOUTI|                   30.36|
|   SUDAN|                    30.3|
|   SUDAN|                    30.3|
|   NIGER|                   30.08|
|   SUDAN|                   30.07|
|    MALI|                   29.74|
|   SUDAN|                   29.68|
|   SUDAN|                   29.68|
+--------+------------------------+



6. count of immigrants month wise

In [89]:
count = spark.sql('''
    SELECT t.Month, COUNT(id) FROM fact_immigration f
    LEFT JOIN dim_time t ON f.departure_date=t.date
    GROUP BY t.month
    ORDER BY count(id) DESC
''')
count.show()

+-----+---------+
|Month|count(id)|
+-----+---------+
|  Apr|  1376430|
|  May|   693670|
|  Jun|    77672|
|  Jul|    42809|
|  Aug|    24378|
|  Sep|    13597|
|  Jan|        1|
+-----+---------+



 7. Population, Avg Household size for US States

In [90]:
population = spark.sql('''
    WITH immigrants AS (
    SELECT us_state, count(id) AS no_of_Immigrants
    FROM fact_immigration WHERE us_state <> '.N' 
    GROUP BY us_state
    )
    
    SELECT state_code, state, no_of_immigrants, 
        CAST(SUM(male_population) AS int) AS male_population,
        CAST(SUM(female_population) AS int) AS female_population, 
        CAST(SUM(total_population) AS int) AS total_population, 
        ROUND(AVG(avg_household_size),2) AS avg_household_size
    FROM immigrants i 
    LEFT JOIN dim_demographics d
        ON i.us_state=d.state_code
    GROUP BY state_code, no_of_immigrants, state
''')
population.show()

+----------+-------------+----------------+---------------+-----------------+----------------+------------------+
|state_code|        state|no_of_immigrants|male_population|female_population|total_population|avg_household_size|
+----------+-------------+----------------+---------------+-----------------+----------------+------------------+
|      null|         null|            2371|           null|             null|            null|              null|
|        OK|     OKLAHOMA|            2211|        3572865|          3672110|         7244975|               2.6|
|      null|         null|              22|           null|             null|            null|              null|
|      null|         null|               7|           null|             null|            null|              null|
|        HI|       HAWAII|          119665|         884035|           879795|         1763830|              2.69|
|      null|         null|           16010|           null|             null|           

##### Step-3. Load

In [91]:
util.write_to_parquet(fact_immigration,"transformed_data/","fact_immigration")

Writing table fact_immigration to transformed_data/fact_immigration
Write to parquet completed


In [92]:
util.write_to_parquet(dim_temperature,"transformed_data/","dim_temperature")

Writing table dim_temperature to transformed_data/dim_temperature
Write to parquet completed


In [93]:
util.write_to_parquet(dim_airport_codes,"transformed_data/","dim_airport_codes")

Writing table dim_airport_codes to transformed_data/dim_airport_codes
Write to parquet completed


In [94]:
util.write_to_parquet(dim_demographics,"transformed_data/","dim_demographics")

Writing table dim_demographics to transformed_data/dim_demographics
Write to parquet completed


In [95]:
util.write_to_parquet(dim_time,"transformed_data/","dim_time")

Writing table dim_time to transformed_data/dim_time
Write to parquet completed


In [96]:
util.write_to_parquet(dim_iso,"transformed_data/","dim_iso")

Writing table dim_iso to transformed_data/dim_iso
Write to parquet completed


#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

##### 1. Check all table columns

In [97]:
dim_iso_code = spark.read.parquet("transformed_data/dim_iso")
dim_iso_code.toPandas().head()

Unnamed: 0,country,iso_code
0,AFGHANISTAN,AF
1,ÅLAND ISLANDS,AX
2,ALBANIA,AL
3,ALGERIA,DZ
4,AMERICAN SAMOA,AS


In [98]:
dim_gtemp = spark.read.parquet("transformed_data/dim_temperature")
dim_gtemp.toPandas().head()

Unnamed: 0,city,country,all_time_avg_temperature
0,Altay,CHINA,1.04
1,Babakan,INDONESIA,25.51
2,Bac Lieu,VIETNAM,27.81
3,Bandar Penggaram,MALAYSIA,27.05
4,Chaozhou,CHINA,21.94


In [99]:
dim_time1 = spark.read.parquet("transformed_data/dim_time")
dim_time1.toPandas().head()

Unnamed: 0,date,year,month,day,week,quarter
0,2016-08-13,2016,Aug,13,32,3
1,2016-05-25,2016,May,25,21,2
2,2016-09-04,2016,Sep,4,35,3
3,2016-04-14,2016,Apr,14,15,2
4,2016-04-27,2016,Apr,27,17,2


In [100]:
dim_aircod = spark.read.parquet("transformed_data/dim_airport_codes")
dim_aircod.toPandas().head()

Unnamed: 0,id,type,name,country,region
0,5A8,Medium,Aleknagik / New Airport,US,US-AK
1,AE-0030,Medium,FIVE STAR FINANCE COMPANY,AE,AE-DU
2,AGGH,Medium,Honiara International Airport,SB,SB-CT
3,AGGM,Medium,Munda Airport,SB,SB-WE
4,AHJ,Medium,Hongyuan Airport,CN,CN-51


In [101]:
dim_dg = spark.read.parquet("transformed_data/dim_demographics")
dim_dg.toPandas().head()

Unnamed: 0,city,state,median_age,male_population,female_population,total_population,foreign_Born,avg_household_size,state_code,race
0,Silver Spring,MARYLAND,33.8,40601,41862,82463,30908,2.6,MD,Hispanic or Latino
1,Quincy,MASSACHUSETTS,41.0,44129,49500,93629,32935,2.39,MA,White
2,Hoover,ALABAMA,38.5,38040,46799,84839,8229,2.58,AL,Asian
3,Rancho Cucamonga,CALIFORNIA,34.5,88127,87105,175232,33878,3.18,CA,Black or African-American
4,Newark,NEW JERSEY,34.6,138040,143873,281913,86253,2.73,NJ,White


In [None]:
#Running below step is taking a bit of time.
fact_i94 = spark.read.parquet("transformed_data/fact_immigration")
fact_i94.toPandas().head()

##### 2. Check Record Count
Running below step is taking a bit of time.

In [52]:
all_tables = {
    "fact_immigration": fact_immigration,
    "dim_temperature": dim_temperature,
    "dim_airport_codes": dim_airport_codes,
    "dim_demographics": dim_demographics,
    "dim_time": dim_time,
    "dim_iso": dim_iso,
}

for table_name, table in all_tables.items():
    util.perform_quality_check(table, table_name)

Data quality check passed for fact_immigration with record_count: 2228557 records.
Data quality check passed for dim_temperature with record_count: 3490 records.
Data quality check passed for dim_airport_codes with record_count: 4539 records.
Data quality check passed for dim_demographics with record_count: 2875 records.
Data quality check passed for dim_time with record_count: 175 records.
Data quality check passed for dim_iso with record_count: 246 records.


#####  3. Validate fact_immigration table: 
- Citizen or resident should not have Invalid/No Country Code

In [53]:
DQ_fact_immigration = spark.sql('''
    SELECT * FROM fact_immigration
    WHERE citizen LIKE 'INVALID%'
    OR citizen LIKE '%(should not show)'
    OR citizen LIKE 'No Country Code%'
    OR resident LIKE 'INVALID%'
    OR resident LIKE '%(should not show)'
    OR resident LIKE 'No Country Code%'
''')
if len(DQ_fact_immigration.head(1))==0:
    print('DQ Check for fact_immigration Passed!')

DQ Check for fact_immigration Passed!


#####  4. Validate dimension airport table: 
    - Airport type should be small, medium or large

In [54]:
DQ_dim_aiport = spark.sql('''
    SELECT * FROM dim_airport_codes
    WHERE type NOT IN ('Small', 'Medium', 'Large')
''')
if len(DQ_dim_aiport.head(1))==0:
    print('DQ Check for dim_aiport Passed!')

DQ Check for dim_aiport Passed!


##### 5. Check non-null and unique keys

In [94]:
def non_null(spark, table_columns):
    flag = 0
    for table in table_columns:
        print(f'Non Null DQ Check for {table}')
        for column in table_columns[table]:
            DQ_NonNull = spark.sql(f'SELECT * FROM {table} WHERE {column} IS NULL')
            if len(DQ_NonNull.head(1))==0:
                print(f'\t- Non Null DQ check passed for {column}')
            else: 
                print(f'\t- Non Null DQ check failed for {column}')
                flag = 1
    if flag==0: print('Non Null DQ check passed for all tables')
    else: print('Non Null DQ check not passed for all tables')

In [98]:
table_columns = {'fact_immigration':['id', 'gender'], 
                     'dim_temperature': ['city', 'country'],
                     'dim_airport_codes': ['id'],
                     'dim_demographics': ['city', 'state'],
                     'dim_time': ['date'],
                     'dim_iso_code': ['country', 'iso_code']}

non_null(spark, table_columns)

Non Null DQ Check for fact_immigration
	- Non Null DQ check passed for id
	- Non Null DQ check passed for gender
Non Null DQ Check for dim_temperature
	- Non Null DQ check passed for city
	- Non Null DQ check passed for country
Non Null DQ Check for dim_airport_codes
	- Non Null DQ check passed for id
Non Null DQ Check for dim_demographics
	- Non Null DQ check passed for city
	- Non Null DQ check passed for state
Non Null DQ Check for dim_time
	- Non Null DQ check passed for date
Non Null DQ Check for dim_iso_code
	- Non Null DQ check passed for country
	- Non Null DQ check passed for iso_code
Non Null DQ check passed for all tables


In [99]:
def unique_check(spark, table_columns):
    flag = 0
    for table in table_columns:
        print(f'Unique DQ Check for {table}')
        column = ', '.join(table_columns[table])
        DQ_Unique = spark.sql(f'SELECT IF(COUNT({column})=COUNT(DISTINCT {column}),True,False) FROM {table}')
        if DQ_Unique.head()[0]==True:
            print(f'\t- Unique DQ check passed for {column}')
        else:
            flag = 1
            print(f'\t- Unique DQ check failed for {column}')
    if flag==0: print('Unique DQ check passed for all tables')
    else: print('Unique DQ check not passed for all tables')

In [100]:
table_columns = {'fact_immigration':['id'], 
                     'dim_temperature': ['city', 'country'],
                     'dim_airport_codes': ['id'],
                     'dim_demographics': ['city', 'state', 'race'],
                     'dim_time': ['date'],
                     'dim_iso_code': ['country']}

unique_check(spark, table_columns)

Unique DQ Check for fact_immigration
	- Unique DQ check passed for id
Unique DQ Check for dim_temperature
	- Unique DQ check passed for city, country
Unique DQ Check for dim_airport_codes
	- Unique DQ check passed for id
Unique DQ Check for dim_demographics
	- Unique DQ check passed for city, state, race
Unique DQ Check for dim_time
	- Unique DQ check passed for date
Unique DQ Check for dim_iso_code
	- Unique DQ check passed for country
Unique DQ check passed for all tables


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

##### - Please refer to Data_Dictionary.txt

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

##### 1. Clearly state the rationale for the choice of tools and technologies for the project:
- Our largest table i.e. fact_immigration contains ~3m rows with airport, demographics, and temperature data. For processing this in efficient manner, spark will work well here due to its ability to handle massive amount of data as well as the use of its unified analytics engine; Pandas, due to its convenient dataframe manipulation functions.

##### 2. Propose how often the data should be updated and why:

- The immigration (i94) data set is updated monthly, hence all relevant data can also be refreshed monthly.

##### 3. Write a description of how you would approach the problem differently under the following scenarios:

##### 3.1 The data was increased by 100x:
- If the data was increased by 100x I would use more sophisticated and appropriate frameworks such as Amazon Redshift, Amazon EMR for better processing and storage functions. 

##### 3.2 The data populates a dashboard that must be updated on a daily basis by 7am every day:
If the data populates a dashboard that must be updated on a daily basis by 7am every day, we can use Apache Airflow and create DAGs to schedule and monitor daily Data Quality checks and run ETL pipelines.

##### 3.3 The database needed to be accessed by 100+ people:
- If the database needed to be accessed by 100+ people, we can use cluster on Amzon Redshift which is scalable and can manage access for multiple users easily.