# Data Warehouse to Analyze US Immigration Data
### Data Engineering Capstone Project

#### Project Summary
The aim of this project is to model and create an analytics solution that helps Data Analysts and Data Scientists gather insights from the wealth of data collected during the US immigration process.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Import the required packages
import pandas as pd
import configparser
import os
from pyspark.sql import SparkSession
from datetime import datetime

### Step 1: Scope the Project and Gather Data

#### Scope 

The goal of this project is to create a data warehouse that Data Analysts can query using various tools and dashboards. The warehouse can also be opened up to independent vendors (such as travel agencies), who can gather insights about international visitors to the US.

Apache Spark is used to gather, assess, and clean the data. The transformed data is then saved as parquet files in Amazon S3 buckets, from where it can be loaded into various types of databases and used as input for analytical tools.

#### Describe and Gather Data 

The data sources are:

- I94 Immigration Data: This data comes from the US National Tourism and Trade Office.
- World Temperature Data: This dataset comes from Kaggle. It contains temperature records by city and country.
- U.S. City Demographic Data: This data comes from OpenSoft. It describes the population structure of US cities.
- Airport Code Table: This is a simple table of airport codes and corresponding cities.
- The I94_SAS_Labels_Descriptions.SAS file has also been provided. It describes the data in the I94 Immigration data file.

##### Gather Data using Spark

In [2]:
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

Immigration Data (I94 Immigration Data)

Data Dictionary

| Field | Description|
| --- | --- |
| cicid | Unique Identifier |
| i94yr | Four digit year |
| i94mon | Numeric month |
| i94res | Country code (country of residence) |
| i94cit | Country code (country of citizenship) |
| i94port | Port Code |
| arrdate | Arrival Date |
| i94mode | Mode of travel |
| i94addr | Address provided |
| depdate | Departure date |
| i94bir | Age |
| i94visa | Visa Code - 1 = Business / 2 = Pleasure / 3 = Student |
| count | Used for summary statistics |
| dtadfile | Date added to I-94 Files |
| visapost | Department of State where Visa was issued |
| occup | Occupation that will be performed in U.S. |
| entdepa | Arrival Flag - admitted or paroled into the U.S. |
| entdepd | Departure Flag - Departed, lost I-94 or is deceased |
| entdepu | Update Flag - Either apprehended, overstayed, adjusted to perm residence |
| matflag | Match flag - Match of arrival and departure records |
| biryear | Four digit year of birth |
| dtaddto | Date to which admitted to U.S. (allowed to stay until) |
| gender | Gender |
| insnum | INS number |
| airline | Airline used to arrive in U.S. |
| admnum | Admission Number |
| fltno | Flight number of Airline used to arrive in U.S. |
| visatype | Class of admission legally admitting the non-immigrant to temporarily stay in U.S. |
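
The `i94visa` codes in the dictionary above are loaded by Spark as floating-point values (for example `2.0`), so a small lookup can help when reading raw rows. The sketch below is illustrative only; `VISA_CATEGORIES` and `decode_visa` are hypothetical names, not part of the project code.

```python
# Mapping taken from the data dictionary: 1 = Business, 2 = Pleasure, 3 = Student
VISA_CATEGORIES = {1: "Business", 2: "Pleasure", 3: "Student"}


def decode_visa(code):
    """Return the visa category for a raw i94visa value, or None if unknown or missing."""
    if code is None:
        return None
    return VISA_CATEGORIES.get(int(code))


print(decode_visa(2.0))
print(decode_visa(None))
```

Codes outside the documented range map to `None` rather than raising, which keeps the helper safe to apply to unvalidated rows.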

In [3]:
immigration_df_spark =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
immigration_df_spark.createOrReplaceTempView("immigration_table")
immigration_df_spark.show(5)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null| 1.897628485E9| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SE

Temperature Data

Data Dictionary

| Field | Description|
| --- | --- |
| dt | Date |
| AverageTemperature | Average temperature for the day |
| AverageTemperatureUncertainty | Average temperature uncertainty for the day |
| City | City name |
| Country | Country name |
| Latitude | Latitude |
| Longitude | Longitude |

In [4]:
temperature_fname = '../../data2/GlobalLandTemperaturesByCity.csv'
temperature_df_spark = spark.read.csv(temperature_fname,header='true')
temperature_df_spark.createOrReplaceTempView("temperature_table")
temperature_df_spark.show(5)

+----------+------------------+-----------------------------+-----+-------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 5 rows



City Demographics Data

Data Dictionary

| Field | Description|
| --- | --- |
| City | City Name |
| State | State Name |
| Median Age | Median age of the population |
| Male Population | Male population in the city |
| Female Population | Female population in the city |
| Total Population | Total population in the city |
| Number of Veterans | Population of veterans |
| Foreign-born | Foreign born population |
| Average Household Size | Average household Size |
| State Code | State Code |
| Race | Race |
| Count | Count of Race |

In [5]:
city_demographic_fname = 'us-cities-demographics.csv'
demographics_df_spark = spark.read.csv(city_demographic_fname,sep=';',header='true')
demographics_df_spark.createOrReplaceTempView("demographics_table")
demographics_df_spark.show(5)

+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|            City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|   Silver Spring|     Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino|25924|
|          Quincy|Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White|58723|
|          Hoover|      Alabama|      38.5|          38040| 

Airport Codes Data

Data Dictionary

| Field | Description|
| --- | --- |
| ident | Airport code |
| type | Type of airport |
| name | Name of airport |
| elevation_ft | Elevation of airport |
| continent | Continent where airport is located |
| iso_country | Country Code |
| iso_region | Region Code |
| municipality | Municipality | 
| gps_code | GPS Code |
| iata_code | IATA Code |
| local_code| Local Code |
| coordinates | GPS Coordinates |

In [7]:
airport_codes_fname = 'airport-codes_csv.csv'
airport_codes_df_spark = spark.read.csv(airport_codes_fname,sep=',',header='true')
airport_codes_df_spark.createOrReplaceTempView("airport_codes_table")
airport_codes_df_spark.show(5)

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport|          11|       NA|         US|     US-PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
| 00AK|small_airport|        Lowell Field|         450|       NA|         US|     US-AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|
| 00AL|small_airport|        Epps Airpark|         820|       NA|         US|     

##### Gather Data using Pandas

In [8]:
# Define a function to get data using the 'I94_SAS_Labels_Descriptions.SAS' file
def get_i94_sas_labels(inputStr,column_1,column_2):
    """
        This function parses the file 'I94_SAS_Labels_Descriptions.SAS' and returns a Pandas dataframe based on the input provided.
        
        The inputs passed to this function are:
        1. inputStr - identifier of the block in the file from which the data is gathered. 
           e.g. if $i94prtl is passed, port information is retrieved from the file and stored as a pandas dataframe.
        2. column_1 - First field name for the dataframe returned.
        3. column_2 - Second field name for the dataframe returned.
        
        The output returned by this function is a Pandas dataframe.
        """    
    i=0
    startLine = 0
    endLine = 0
    keyWordFound = 0    
    keyWord = inputStr
    print('Searching for:'+keyWord)
    with open("I94_SAS_Labels_Descriptions.SAS") as f:
        for line in f:
            i = i + 1
            if(len((line.split(" "))) < 1 and (line != ';\n')):
                continue
            if(line.split(" ")[0] != '\n'):
                if(keyWord+'\n' in line.split(" ")):
                    startLine = i+1
                    keyWordFound = 1
                if((line.split(" ")[-1] == ';\n') and keyWordFound == 1):
                    endLine = i
                    keyWordFound = 0            
            if((line == ';\n') and keyWordFound == 1):
                print(line.split(" ")[0])
                endLine = i-1
                keyWordFound = 0
    
    # Second pass: collect the 'code = label' lines between startLine and endLine
    with open("I94_SAS_Labels_Descriptions.SAS") as f:
        i=0
        rows = []  # named 'rows' to avoid shadowing the built-in list
        for line in f:
            i = i + 1
            if(i>=startLine and i<=endLine):
                line = line.replace('\t', '').replace('\n', '').replace('\'','').replace(';','')
                rows.append(line.split('='))
    
    df = pd.DataFrame(rows, columns =[column_1, column_2])
    return df
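
The parsing idea behind the function above (find the block named by the keyword, then read `code = 'label'` lines until the closing `;`) can be sketched in a single pass. This is a simplified illustration, not a replacement for the function: `parse_label_block` is a hypothetical name, and `SAMPLE` is a hand-written excerpt that only assumes the layout the function expects.

```python
# Hand-written excerpt for illustration; the real file is not read here.
SAMPLE = "\n".join([
    "value i94model",
    "\t1 = 'Air'",
    "\t2 = 'Sea'",
    "\t3 = 'Land'",
    "\t9 = 'Not reported' ;",
])


def parse_label_block(text, keyword):
    """Return (code, label) pairs from the block that starts at `keyword`."""
    rows = []
    in_block = False
    for line in text.splitlines():
        if not in_block:
            # The block starts on the line that contains the keyword as a token
            in_block = keyword in line.split()
            continue
        if "=" in line:
            code, label = line.split("=", 1)
            label = label.replace(";", "").strip().strip("'")
            rows.append((code.strip().strip("'"), label))
        if line.rstrip().endswith(";"):
            break  # a trailing ';' closes the block
    return rows


print(parse_label_block(SAMPLE, "i94model"))
```

Splitting on the first `=` only keeps labels that themselves contain `=` intact, and stripping whitespace here avoids padded keys in later joins.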

Get country and code mapping data

Data Dictionary

| Field | Description|
| --- | --- |
| Country_Id | Country Identifier |
| Country | Country Name | 


In [9]:
country_mapping_df = get_i94_sas_labels('i94cntyl','Country_Id','Country')
country_mapping_df.head(5)

Searching for:i94cntyl


Unnamed: 0,Country_Id,Country
0,582,"MEXICO Air Sea, and Not Reported (I-94, no l..."
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA


Get port and code mapping data

Data Dictionary

| Field | Description|
| --- | --- |
| Port_Id | Port Identifier |
| Port | Port Name | 

In [None]:
port_mapping_df = get_i94_sas_labels('$i94prtl','Port_Id','Port')
port_mapping_df.head(5)

Get Immigration mode and code mapping data

Data Dictionary

| Field | Description|
| --- | --- |
| Mode_Id | Immigration mode Identifier |
| Mode | Immigration Mode | 

In [11]:
immigration_mode_df = get_i94_sas_labels('i94model','Mode_Id','Mode')
immigration_mode_df.head(5)

Searching for:i94model


Unnamed: 0,Mode_Id,Mode
0,1,Air
1,2,Sea
2,3,Land
3,9,Not reported


Get State and state code mapping data

Data Dictionary

| Field | Description|
| --- | --- |
| State_Id | State Identifier |
| State | State Name | 

In [12]:
state_code_df = get_i94_sas_labels('i94addrl','State_Id','State')
state_code_df.head(5)

Searching for:i94addrl


Unnamed: 0,State_Id,State
0,AL,ALABAMA
1,AK,ALASKA
2,AZ,ARIZONA
3,AR,ARKANSAS
4,CA,CALIFORNIA


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

##### Assess the Immigration Data

- The data type and format of the date fields need to be changed: arrdate, depdate, dtadfile, dtaddto
- The data source has 3,096,313 entries
- No duplicate entries found based on cicid
- All entries in the immigration data source have valid i94port identifiers
- All entries in the immigration data source have valid i94res identifiers
- There are 239 entries with i94mode as null.

In [13]:
#immigration_df_spark.printSchema()
immigration_df_spark.show(5, truncate = False)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid|i94yr |i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto |gender|insnum|airline|admnum        |fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|6.0  |2016.0|4.0   |692.0 |692.0 |XXX    |20573.0|null   |null   |null   |37.0  |2.0    |1.0  |null    |null    |null |T      |null   |U      |null   |1979.0 |10282016|null  |null  |null   |1.897628485E9 |null |B2      |
|7.0  |2016.0|4.0   |254.0 |276.0 |ATL    |20551.0|1.0    |AL     |null   |25.0  |3.0    |1.0  |20130811|SEO    

In [14]:
# check the number of entries in the immigration data source
spark.sql("""
    SELECT
    count(*) as Count
    FROM immigration_table
    """).show()

+-------+
|  Count|
+-------+
|3096313|
+-------+



In [15]:
# check for duplicate entries based on cicid
spark.sql("""
    SELECT cicid, count(*) as Count
    FROM immigration_table
    GROUP BY cicid
    HAVING Count>1    
    """).show()

+-----+-----+
|cicid|Count|
+-----+-----+
+-----+-----+



In [16]:
# check if entries in the immigration data source have null arrival mode identifiers

spark.sql("""
    SELECT
    count(*) as Count
    FROM immigration_table
    WHERE i94mode is null
    """).show()

+-----+
|Count|
+-----+
|  239|
+-----+
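
The duplicate and null checks above follow a pattern that can be tried without a Spark session. The sketch below runs the same style of query against an in-memory SQLite table; the table name `immigration_toy` and its four rows are made up for illustration.

```python
import sqlite3

# Toy stand-in for immigration_table: one null i94mode and one duplicated cicid
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE immigration_toy (cicid REAL, i94mode REAL)")
conn.executemany(
    "INSERT INTO immigration_toy VALUES (?, ?)",
    [(6.0, None), (7.0, 1.0), (15.0, 1.0), (15.0, 2.0)],
)

# Duplicate check: any cicid that appears more than once
duplicates = conn.execute(
    "SELECT cicid, COUNT(*) FROM immigration_toy "
    "GROUP BY cicid HAVING COUNT(*) > 1"
).fetchall()

# Null check: rows without an arrival mode
null_modes = conn.execute(
    "SELECT COUNT(*) FROM immigration_toy WHERE i94mode IS NULL"
).fetchone()[0]

print(duplicates, null_modes)
conn.close()
```

An empty result from the first query, as seen for the real data, means the key is unique.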



In [17]:
#immigration_df_spark.describe().show()

##### Assess the Temperature Data

- The data source has 8,599,212 entries
- 364,130 entries have 'null' entries for average temperature
- The data type of AverageTemperature and AverageTemperatureUncertainty is string.
- This dataset only has data up to 2013.

In [18]:
temperature_df_spark.printSchema()
#temperature_df_spark.show(10, truncate = False)

root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- AverageTemperatureUncertainty: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [19]:
# check the number of entries in the temperature data source
# temperature_df_spark.count()
spark.sql("""
    SELECT
    count(*) as Count
    FROM temperature_table
    """).show()

+-------+
|  Count|
+-------+
|8599212|
+-------+



In [20]:
# entries with null values for average temperature
# temperature_df_spark.filter(temperature_df_spark.AverageTemperature.isNull()).count()
spark.sql("""
    SELECT
    count(*) as Count
    FROM temperature_table
    WHERE averageTemperature is null
    """).show()

+------+
| Count|
+------+
|364130|
+------+



In [21]:
# Cities with the most null average temperature entries
spark.sql("""
    SELECT
    Country, City, count(*) as Count
    FROM temperature_table    
    WHERE averageTemperature is null
    GROUP BY Country, City
    ORDER BY Count DESC
    """).show(5)

+----------+------------+-----+
|   Country|        City|Count|
+----------+------------+-----+
| Mauritius|  Port Louis| 1039|
|   Reunion| Saint Denis| 1039|
|Madagascar|Fianarantsoa| 1036|
|Mozambique|      Nacala|  958|
|Madagascar|   Antsirabe|  958|
+----------+------------+-----+
only showing top 5 rows



In [22]:
# entries for different years
spark.sql("""
    SELECT
    substr(dt,1,4) as Year, count(*)
    FROM temperature_table
    WHERE AverageTemperature is not null
    group by Year
    order by Year desc
    """).show(5)

+----+--------+
|Year|count(1)|
+----+--------+
|2013|   28520|
|2012|   42120|
|2011|   42120|
|2010|   42120|
|2009|   42120|
+----+--------+
only showing top 5 rows



In [23]:
# Show a few entries
temperature_df_spark.show(5)

+----------+------------------+-----------------------------+-----+-------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 5 rows



##### Assess the City Demographics Data

- The data source has 2891 entries
- Field names have spaces
- A few rows have NULL values for 'Male Population', 'Female Population', 'Number of Veterans', 'Foreign-born', 'Average Household Size'.
- The dataset has multiple entries per city, one for each 'Race'

In [24]:
demographics_df_spark.printSchema()
#demographics_df_spark.show(10, truncate = False)

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



In [25]:
# check the number of entries in the city demographics data source
spark.sql("""
    SELECT
    count(*) as Count
    FROM demographics_table
    """).show()

+-----+
|Count|
+-----+
| 2891|
+-----+



In [26]:
# describe the demographics data source
demographics_df_spark.describe().show()

+-------+-------+---------+-----------------+------------------+------------------+------------------+------------------+------------------+----------------------+----------+--------------------+------------------+
|summary|   City|    State|       Median Age|   Male Population| Female Population|  Total Population|Number of Veterans|      Foreign-born|Average Household Size|State Code|                Race|             Count|
+-------+-------+---------+-----------------+------------------+------------------+------------------+------------------+------------------+----------------------+----------+--------------------+------------------+
|  count|   2891|     2891|             2891|              2888|              2888|              2891|              2878|              2878|                  2875|      2891|                2891|              2891|
|   mean|   null|     null|35.49488066413016| 97328.42624653739|101769.63088642659|198966.77931511588| 9367.832522585128|40653.598679638635|

In [27]:
# check whether any city has more than 5 entries (at most one entry per race is expected)
spark.sql("""
    SELECT 
    State, City, Count(*) as Count
    FROM demographics_table
    GROUP BY State, City
    HAVING Count >5
    """).show(truncate=False)

+-----+----+-----+
|State|City|Count|
+-----+----+-----+
+-----+----+-----+



##### Assess the Airport Codes Data

- The data source has 55075 entries
- There are seven distinct types of airports
- iso_region is a combination of iso_country and state/region code

In [28]:
airport_codes_df_spark.printSchema()
#airport_codes_df_spark.show(10, truncate = False)

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



In [29]:
airport_codes_df_spark.show(5, truncate = False)

+-----+-------------+----------------------------------+------------+---------+-----------+----------+------------+--------+---------+----------+-------------------------------------+
|ident|type         |name                              |elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|coordinates                          |
+-----+-------------+----------------------------------+------------+---------+-----------+----------+------------+--------+---------+----------+-------------------------------------+
|00A  |heliport     |Total Rf Heliport                 |11          |NA       |US         |US-PA     |Bensalem    |00A     |null     |00A       |-74.93360137939453, 40.07080078125   |
|00AA |small_airport|Aero B Ranch Airport              |3435        |NA       |US         |US-KS     |Leoti       |00AA    |null     |00AA      |-101.473911, 38.704022               |
|00AK |small_airport|Lowell Field                      |450         |NA       |U

In [30]:
airport_codes_df_spark.describe().show()

+-------+--------------------+-------------+--------------------+------------------+---------+-----------+----------+---------------+--------------------+---------+-------------------+--------------------+
|summary|               ident|         type|                name|      elevation_ft|continent|iso_country|iso_region|   municipality|            gps_code|iata_code|         local_code|         coordinates|
+-------+--------------------+-------------+--------------------+------------------+---------+-----------+----------+---------------+--------------------+---------+-------------------+--------------------+
|  count|               55075|        55075|               55075|             48069|    55075|      55075|     55075|          49399|               41030|     9189|              28686|               55075|
|   mean|2.3873375337777779E8|         null|                null|1240.7896773388254|     null|       null|      null|           null|2.1920446610204083E8|      0.0|8.5805561785

In [31]:
# check if the iso_region is a combination of iso_country and state/region code
spark.sql("""
    SELECT
    count(*) as Count
    FROM airport_codes_table
    WHERE iso_country = substr(iso_region,1,2)
    """).show(100, truncate=False)

+-----+
|Count|
+-----+
|55075|
+-----+
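
Since every row passes the check above, the region part can be separated from `iso_region` with a simple split. A minimal sketch, where `split_iso_region` is a hypothetical helper name:

```python
def split_iso_region(iso_region):
    """Split a value such as 'US-PA' into its country and region parts."""
    # partition splits on the first hyphen only
    country, _, region = iso_region.partition("-")
    return country, region


print(split_iso_region("US-PA"))
```

For US airports, the region part is the state code, which is what makes a join against the state mapping possible.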



##### Assess the Country Codes Data (source - I94_SAS_Labels_Descriptions.SAS)

- The data source has 289 entries
- Country_Id data type is not consistent with the immigration data

In [32]:
country_mapping_df.info()
country_mapping_df.head()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 289 entries, 0 to 288
Data columns (total 2 columns):
Country_Id    289 non-null object
Country       289 non-null object
dtypes: object(2)
memory usage: 4.6+ KB


Unnamed: 0,Country_Id,Country
0,582,"MEXICO Air Sea, and Not Reported (I-94, no l..."
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA
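
The type mismatch noted above comes from the two sources: the immigration data stores country codes as doubles (for example `692.0`), while `Country_Id` is parsed from text. One way to align them is to normalize both sides to integers before joining. The helper name `normalize_country_id` is hypothetical, and the handling of surrounding whitespace is an assumption about the parsed text.

```python
def normalize_country_id(value):
    """Convert '582', ' 582 ' or 582.0 to the integer 582; return None if missing."""
    if value is None:
        return None
    # Going through float accepts both '582' and '582.0' style inputs
    return int(float(str(value).strip()))


print(normalize_country_id(" 582 "), normalize_country_id(692.0))
```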


##### Assess the Port Codes Data (source - I94_SAS_Labels_Descriptions.SAS)

- The data source has 661 entries
- 1 entry has a null value
- Port comprises the port name and the state code

In [33]:
port_mapping_df.info()
port_mapping_df.head()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 661 entries, 0 to 660
Data columns (total 2 columns):
Port_Id    661 non-null object
Port       660 non-null object
dtypes: object(2)
memory usage: 10.4+ KB


Unnamed: 0,Port_Id,Port
0,ALC,"ALCAN, AK"
1,ANC,"ANCHORAGE, AK"
2,BAR,"BAKER AAF - BAKER ISLAND, AK"
3,DAC,"DALTONS CACHE, AK"
4,PIZ,"DEW STATION PT LAY DEW, AK"


In [34]:
port_mapping_df[port_mapping_df.Port.isnull()]

Unnamed: 0,Port_Id,Port
660,,
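
Because `Port` combines the port name and the state code, and one row is empty, a cleaning step would typically split the value and tolerate missing input. The sketch below assumes missing values arrive as `None`; `split_port` is a hypothetical helper name.

```python
def split_port(port):
    """Split 'ALCAN, AK' into ('ALCAN', 'AK'); return (None, None) if missing."""
    if not port:
        return None, None
    # Split on the last comma so that commas inside the port name are preserved
    name, sep, state = port.rpartition(",")
    if not sep:
        return port.strip(), None  # no state code present
    return name.strip(), state.strip()


print(split_port("BAKER AAF - BAKER ISLAND, AK"))
print(split_port(None))
```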


##### Assess the Immigration Mode (source - I94_SAS_Labels_Descriptions.SAS)

- The data source has 4 entries

In [35]:
immigration_mode_df.info()
immigration_mode_df.head()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4 entries, 0 to 3
Data columns (total 2 columns):
Mode_Id    4 non-null object
Mode       4 non-null object
dtypes: object(2)
memory usage: 144.0+ bytes


Unnamed: 0,Mode_Id,Mode
0,1,Air
1,2,Sea
2,3,Land
3,9,Not reported


##### Assess the State Code data (source - I94_SAS_Labels_Descriptions.SAS)

- The data source has 55 entries

In [36]:
state_code_df.info()
state_code_df.head()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 55 entries, 0 to 54
Data columns (total 2 columns):
State_Id    55 non-null object
State       55 non-null object
dtypes: object(2)
memory usage: 960.0+ bytes


Unnamed: 0,State_Id,State
0,AL,ALABAMA
1,AK,ALASKA
2,AZ,ARIZONA
3,AR,ARKANSAS
4,CA,CALIFORNIA


##### Cleaning Steps

##### Clean the Immigration Data

- Change the data type and format of the date fields: arrdate, depdate, dtadfile, dtaddto
- Remove entries where i94mode is null
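
The date fields use three different encodings: `arrdate` and `depdate` are day counts from 1960-01-01 (the SAS date epoch), `dtadfile` is text in `YYYYMMDD` form, and `dtaddto` is text in `MMDDYYYY` form. The plain-Python sketch below mirrors those conversions on values taken from the sample rows; the function names are hypothetical.

```python
from datetime import date, datetime, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS dates count days from this day


def sas_days_to_date(days):
    """Convert a SAS day count such as 20551.0 to a date; None stays None."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))


def parse_dtadfile(value):
    """Parse 'YYYYMMDD' text, e.g. '20130811'."""
    return datetime.strptime(value, "%Y%m%d").date() if value else None


def parse_dtaddto(value):
    """Parse 'MMDDYYYY' text, e.g. '10282016'."""
    return datetime.strptime(value, "%m%d%Y").date() if value else None


print(sas_days_to_date(20551.0))   # 2016-04-07
print(parse_dtadfile("20130811"))  # 2013-08-11
print(parse_dtaddto("10282016"))   # 2016-10-28
```

Text that does not match the expected pattern raises `ValueError` in this sketch, so a production version would need to decide how to treat such values.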

In [37]:
# Correct the date formats for the fields - arrdate, depdate, dtadfile and dtaddto
cleaned_immigration_df_spark = spark.sql("""
    SELECT
    cicid
    ,i94yr 
    ,i94mon
    ,i94cit
    ,i94res
    ,i94port
    ,date_add('1960-01-01',arrdate) as arrdate
    ,i94mode
    ,i94addr
    ,date_add('1960-01-01',depdate) as depdate
    ,i94bir
    ,i94visa
    ,count
    ,to_date(substring(dtadfile,1,4) || '-' || substring(dtadfile,5,2) || '-' || substring(dtadfile,7,2)) as dtadfile
    ,visapost
    ,occup
    ,entdepa
    ,entdepd
    ,entdepu
    ,matflag
    ,biryear
    ,to_date(substring(dtaddto,5,4) || '-' || substring(dtaddto,1,2) || '-' || substring(dtaddto,3,2)) as dtaddto 
    ,gender
    ,insnum
    ,airline
    ,admnum        
    ,fltno
    ,visatype
    FROM immigration_table
    WHERE i94mode IS NOT NULL
    """)
cleaned_immigration_df_spark.createOrReplaceTempView("cleaned_immigration_table")
cleaned_immigration_df_spark.show(5)

+-----+------+------+------+------+-------+----------+-------+-------+----------+------+-------+-----+----------+--------+-----+-------+-------+-------+-------+-------+----------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|   arrdate|i94mode|i94addr|   depdate|i94bir|i94visa|count|  dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|   dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+----------+-------+-------+----------+------+-------+-----+----------+--------+-----+-------+-------+-------+-------+-------+----------+------+------+-------+--------------+-----+--------+
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|2016-04-07|    1.0|     AL|      null|  25.0|    3.0|  1.0|2013-08-11|     SEO| null|      G|   null|      Y|   null| 1991.0|      null|     M|  null|   null|  3.73679633E9|00296|      F1|
| 15.0|2016.0|   4.0| 101.0| 101.0|    WAS|2016-04-01|    1.0|     MI|20

In [38]:
cleaned_immigration_df_spark.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: date (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: date (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: date (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: date (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = true)
 |

##### Clean the Temperature Data

- Change the data type for the temperature fields to float
- Remove entries with NULL values for AverageTemperature
- Aggregate the entries per City and Country
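
The three steps above (cast to float, drop nulls, average per city and country) can be illustrated on a handful of rows in plain Python. The temperature values below are made up for the example, and `average_by_city` is a hypothetical name.

```python
from collections import defaultdict

# Toy rows: (City, Country, AverageTemperature as text); values are illustrative
rows = [
    ("Århus", "Denmark", "6.0"),
    ("Århus", "Denmark", None),
    ("Århus", "Denmark", "8.0"),
    ("Aden", "Yemen", "25.5"),
]


def average_by_city(rows):
    """Average the non-null temperatures for each (City, Country) pair."""
    totals = defaultdict(lambda: [0.0, 0])  # key -> [sum, count]
    for city, country, temp in rows:
        if temp is None:
            continue  # drop entries without a temperature
        acc = totals[(city, country)]
        acc[0] += float(temp)  # cast the text value to float
        acc[1] += 1
    return {key: total / n for key, (total, n) in totals.items()}


result = average_by_city(rows)
print(result)
```

Averaging over all available years gives one row per city, which suits a dimension table but discards seasonal variation.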

In [39]:
# Change the data types
cleaned_temperature_df_spark = spark.sql("""
    SELECT
    City
    ,Country
    ,Latitude
    ,Longitude
    ,avg(cast(AverageTemperature as float)) AverageTemperature
    ,avg(cast(AverageTemperatureUncertainty as float)) AverageTemperatureUncertainty
    FROM temperature_table
    WHERE 
    AverageTemperature IS NOT NULL
    GROUP BY 
    City
    ,Country
    ,Latitude
    ,Longitude
    """)
cleaned_temperature_df_spark.createOrReplaceTempView("cleaned_temperature_table")

In [40]:
cleaned_temperature_df_spark.show(5)

+--------+-----------+--------+---------+------------------+-----------------------------+
|    City|    Country|Latitude|Longitude|AverageTemperature|AverageTemperatureUncertainty|
+--------+-----------+--------+---------+------------------+-----------------------------+
|  Abohar|      India|  29.74N|   73.85E|24.753021123943796|           0.7542563609732382|
|    Aden|      Yemen|  13.66N|   45.41E| 25.54778724600841|           0.9774613979558452|
| Apodaca|     Mexico|  26.52N|  100.30W|22.018180757560987|           0.8791619677371627|
|   Apopa|El Salvador|  13.66N|   90.00W| 25.36419148467728|           0.8325436019078251|
|Belgorod|     Russia|  50.63N|   36.76E|  6.08863708202085|           1.4257706879359857|
+--------+-----------+--------+---------+------------------+-----------------------------+
only showing top 5 rows



In [41]:
cleaned_temperature_df_spark.printSchema()

root
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)



##### Clean the City Demographics Data

- The dataset has multiple entries per city, one for each 'Race'. Separate columns holding the count for each race are created.
- The data types for numeric fields have been changed.

In [42]:
def createRaceDemographics(columnName, race):
    """
        This function reads data from the 'demographics_table' (demographics_df_spark) Spark Dataframe 
        and builds a Spark Dataframe with demographic data for a particular race. These race specific
        dataframes can then be used to transform the 'demographics' dataframe to have the counts for 
        the different races in a single row rather than multiple rows.
        
        The inputs passed to this function are:
        1. columnName - Field name which will store the number of people of a particular race.           
        2. race - The race for which the Dataframe is to be created.
        
        The function does not return a value. It registers the dataframe as a temporary table named
        '<columnName>_demographics_table', which can be queried using SQL.
        """ 
    raceDemographics_df = spark.sql("""
        SELECT 
        City,
        State,
        cast(Count as int) as {}
        FROM demographics_table
        WHERE Race = '{}'
        """.format(columnName,race))
    raceDemographics_df.createOrReplaceTempView("{}_demographics_table".format(columnName))
    

createRaceDemographics('Black','Black or African-American')
createRaceDemographics('Hispanic','Hispanic or Latino')
createRaceDemographics('White','White')
createRaceDemographics('Asian','Asian')
createRaceDemographics('Native','American Indian and Alaska Native')    

In [43]:
cleaned_demographics_df_spark = spark.sql("""
    SELECT DISTINCT
    D.City
    ,D.State
    ,D.`State Code` as State_Code    
    ,cast(D.`Median Age` as int) as Median_Age
    ,cast(D.`Male Population`as int) as Male_Population
    ,cast(D.`Female Population` as int) as Female_Population
    ,cast(D.`Total Population` as int) as Total_Population
    ,cast(D.`Number of Veterans` as int) as Number_Of_Veterans
    ,cast(D.`Foreign-born` as int) as Foreign_Born
    ,cast(D.`Average Household Size` as int) as Average_Household_Size
    ,B.Black as Black_Population
    ,H.Hispanic as Hispanic_Population
    ,W.White as White_Population
    ,A.Asian as Asian_Population
    ,N.Native as Native_Population
    FROM demographics_table D 
    LEFT JOIN black_demographics_table B
    ON (D.City = B.City AND D.State = B.State)
    LEFT JOIN hispanic_demographics_table H
    ON (D.City = H.City AND D.State = H.State)
    LEFT JOIN white_demographics_table W
    ON (D.City = W.City AND D.State = W.State)
    LEFT JOIN asian_demographics_table A
    ON (D.City = A.City AND D.State = A.State)
    LEFT JOIN native_demographics_table N
    ON (D.City = N.City AND D.State = N.State)
    """)
cleaned_demographics_df_spark.createOrReplaceTempView("cleaned_demographics_table")
cleaned_demographics_df_spark.show(5)

+-------------+-----------+----------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------------+-------------------+----------------+----------------+-----------------+
|         City|      State|State_Code|Median_Age|Male_Population|Female_Population|Total_Population|Number_Of_Veterans|Foreign_Born|Average_Household_Size|Black_Population|Hispanic_Population|White_Population|Asian_Population|Native_Population|
+-------------+-----------+----------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------------+-------------------+----------------+----------------+-----------------+
|     Gulfport|Mississippi|        MS|        35|          33108|            38764|           71872|              6646|        3072|                     2|           27799|               4247|           42870|            1723|             null|
|Ellicott City|   Ma

In [44]:
cleaned_demographics_df_spark.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- State_Code: string (nullable = true)
 |-- Median_Age: integer (nullable = true)
 |-- Male_Population: integer (nullable = true)
 |-- Female_Population: integer (nullable = true)
 |-- Total_Population: integer (nullable = true)
 |-- Number_Of_Veterans: integer (nullable = true)
 |-- Foreign_Born: integer (nullable = true)
 |-- Average_Household_Size: integer (nullable = true)
 |-- Black_Population: integer (nullable = true)
 |-- Hispanic_Population: integer (nullable = true)
 |-- White_Population: integer (nullable = true)
 |-- Asian_Population: integer (nullable = true)
 |-- Native_Population: integer (nullable = true)



##### Clean the i94 Labels - Country Codes Data

- Strip spaces from Country_Id and Country
- Convert Country_Id to numeric field
- Create a Spark Dataframe/Table from Pandas Dataframe

In [45]:
country_mapping_df['Country_Id'] = country_mapping_df['Country_Id'].str.strip()
country_mapping_df['Country'] = country_mapping_df['Country'].str.strip()
country_mapping_df["Country_Id"] = pd.to_numeric(country_mapping_df["Country_Id"],downcast='float')

cleaned_country_mapping_sdf = spark.createDataFrame(country_mapping_df)
cleaned_country_mapping_sdf.printSchema()
cleaned_country_mapping_sdf.createOrReplaceTempView("cleaned_country_mapping_table")

root
 |-- Country_Id: double (nullable = true)
 |-- Country: string (nullable = true)



##### Clean the i94 Labels - Port Data

- Strip spaces from Port_Id and Port
- Remove the entry with Null Value
- Create a Spark Dataframe/Table from Pandas Dataframe
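
The strip-and-filter steps listed above can be sketched in plain Python as follows. `clean_label_pairs` is a hypothetical helper used only for illustration; the notebook itself performs these steps with pandas. The padding and the null entry in the sample input are made up.

```python
def clean_label_pairs(pairs):
    """Strip padding from (id, label) pairs and drop pairs with a missing label.

    clean_label_pairs is a hypothetical helper used only for illustration.
    """
    cleaned = []
    for raw_id, raw_label in pairs:
        if raw_label is None:
            continue  # mirrors removing the entry whose Port is null
        cleaned.append((raw_id.strip(), raw_label.strip()))
    return cleaned


# Illustrative input: the padding and the null entry are assumptions.
raw_ports = [(" ALC ", "ALCAN, AK     "), ("ANC", " ANCHORAGE, AK"), ("XXX", None)]
print(clean_label_pairs(raw_ports))
```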

In [46]:
port_mapping_df['Port_Id'] = port_mapping_df['Port_Id'].str.strip()
port_mapping_df['Port'] = port_mapping_df['Port'].str.strip()
port_mapping_df = port_mapping_df[port_mapping_df.Port.notnull()]

cleaned_port_mapping_sdf = spark.createDataFrame(port_mapping_df)
cleaned_port_mapping_sdf.printSchema()
cleaned_port_mapping_sdf.createOrReplaceTempView("cleaned_port_mapping_table")

root
 |-- Port_Id: string (nullable = true)
 |-- Port: string (nullable = true)



##### Clean the i94 Labels - Immigration Mode Data

- Strip spaces from Mode_Id and Mode
- Convert Mode_Id to numeric field
- Create a Spark Dataframe/Table from Pandas Dataframe

In [47]:
immigration_mode_df['Mode_Id'] = immigration_mode_df['Mode_Id'].str.strip()
immigration_mode_df['Mode'] = immigration_mode_df['Mode'].str.strip()
immigration_mode_df["Mode_Id"] = pd.to_numeric(immigration_mode_df["Mode_Id"],downcast='float')

cleaned_immigration_mode_sdf = spark.createDataFrame(immigration_mode_df)
cleaned_immigration_mode_sdf.printSchema()
cleaned_immigration_mode_sdf.createOrReplaceTempView("cleaned_immigration_mode_table")

root
 |-- Mode_Id: double (nullable = true)
 |-- Mode: string (nullable = true)



##### Clean the i94 Labels - State Code Data

- Strip spaces from State_Id and State
- Create a Spark Dataframe/Table from Pandas Dataframe

In [48]:
state_code_df['State_Id'] = state_code_df['State_Id'].str.strip()
state_code_df['State'] = state_code_df['State'].str.strip()

cleaned_state_code_sdf = spark.createDataFrame(state_code_df)
cleaned_state_code_sdf.printSchema()
cleaned_state_code_sdf.createOrReplaceTempView("cleaned_state_mapping_table")

root
 |-- State_Id: string (nullable = true)
 |-- State: string (nullable = true)



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

A star schema has been designed which consists of the following Fact and Dimension tables. The advantages of using a star schema design are:
1. Query performance - Because a star schema database has a small number of tables and clear join paths, queries run faster than they do against an OLTP system.
2. Load performance and administration - Structural simplicity also reduces the time required to load large batches of data into a star schema database. By defining facts and dimensions and separating them into different tables, the impact of a load operation is reduced. Dimension tables can be populated once and occasionally refreshed. You can add new facts regularly and selectively by appending records to a fact table.
3. Built-in referential integrity - A star schema has referential integrity built in when data is loaded.
4. Easily understood - A star schema is easy to understand and navigate, with dimensions joined only through the fact table. 

Fact Table:
 - ImmigrationDetails
 
Dimension Tables:
 - CityDemographics
 - CountryDetails
 - PortCodes
 - ImmigrationModes


<img src="Images/Data Model.png">
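
To illustrate how the fact table joins to a dimension on its key, the sketch below builds a tiny version of two of the tables with the standard-library `sqlite3` module and counts arrivals per travel mode. The mode labels match the ImmigrationModes dimension; the three fact rows are made up, and only a subset of the fact columns is included.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE ImmigrationModes (mode_id INTEGER PRIMARY KEY, mode TEXT);
    CREATE TABLE ImmigrationDetails (
        cicid INTEGER PRIMARY KEY,
        travel_mode INTEGER,
        port TEXT
    );
    INSERT INTO ImmigrationModes VALUES
        (1, 'Air'), (2, 'Sea'), (3, 'Land'), (9, 'Not reported');
    -- Illustrative fact rows only.
    INSERT INTO ImmigrationDetails VALUES
        (1, 1, 'ANC'), (2, 1, 'ALC'), (3, 3, 'ALC');
""")

# The dimension is reached only through the fact table's foreign key.
arrivals_by_mode = conn.execute("""
    SELECT m.mode, COUNT(*) AS arrivals
    FROM ImmigrationDetails f
    JOIN ImmigrationModes m ON f.travel_mode = m.mode_id
    GROUP BY m.mode
    ORDER BY m.mode
""").fetchall()
print(arrivals_by_mode)
```

The same query shape applies to the other dimensions, for example joining `port` to PortCodes or `resident_country` to CountryDetails.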

#### 3.2 Mapping Out Data Pipelines
The Fact and Dimension tables mentioned above are created by transforming the original data sources. The resulting star schema is then written to parquet files using the function 'write_to_parquet_files'.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

##### Create the Data Models

##### ImmigrationDetails

In [49]:
ImmigrationDetails = spark.sql("""
    SELECT
    cast(cicid as bigint)    AS cicid
    ,arrdate                 AS arrival_date
    ,depdate                 AS departure_date
    ,i94addr                 AS us_address
    ,cast(i94res as int)     AS resident_country
    ,cast(i94cit as int)     AS resident_city
    ,cast(i94mode as int)    AS travel_mode
    ,i94port                 AS port
    ,cast(i94visa as int)    AS visa_code
    ,cast(i94bir as int)     AS age
    ,visatype                AS visa_type
    ,biryear                 AS birth_year
    ,gender                  AS gender
    ,cast(admnum as bigint)  AS admission_number
    ,airline                 AS airline
    ,fltno                   AS flight_number
    FROM cleaned_immigration_table
    """)
ImmigrationDetails.show(5)
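
In the raw I94 file, `arrdate` and `depdate` are SAS numeric dates, that is, day counts from 1960-01-01. The data dictionary lists both fields as dates, which suggests the conversion was done in the earlier cleaning step. If it were still needed, it could be sketched as below. `sas_to_date` is a hypothetical helper and the input value is chosen only for illustration.

```python
from datetime import date, timedelta

# SAS stores dates as the number of days since 1 January 1960.
SAS_EPOCH = date(1960, 1, 1)


def sas_to_date(sas_days):
    """Convert a SAS numeric date to a datetime.date, passing None through."""
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


print(sas_to_date(20573.0))  # prints 2016-04-29
```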

##### CityDemographics

In [51]:
CityDemographics = spark.sql("""
    SELECT
     State_Code               AS state_code
    ,State                    AS state
    ,City                     AS city
    ,Median_Age               AS median_age
    ,Average_Household_Size   AS average_household_size
    ,Total_Population         AS total_population
    ,Male_Population          AS male_population
    ,Female_Population        AS female_population
    ,Foreign_Born             AS foreign_born
    ,Number_of_Veterans       AS veteran_population
    ,Black_Population         AS black_population
    ,Hispanic_Population      AS hispanic_population
    ,White_Population         AS white_population
    ,Asian_Population         AS asian_population
    ,Native_Population        AS native_population              
    FROM cleaned_demographics_table
    """)
CityDemographics.show(5)

##### CountryDetails

In [53]:
# Create intermediate spark dataframe with aggregated values grouped by Country
intermediate_CountryDetails = spark.sql("""
    SELECT
     Country
    ,avg(cast(AverageTemperature as float)) AverageTemperature
    ,first_value(latitude) as latitude
    ,first_value(longitude) as longitude
    FROM cleaned_temperature_table
    GROUP BY 
    Country
    """)
intermediate_CountryDetails.createOrReplaceTempView("intermediate_CountryDetails_table")

In [54]:
# Create final Country Details dataframe after joining the data with Country code
CountryDetails = spark.sql("""
    SELECT 
     cast(C.Country_Id as int) AS country_code
    ,T.Country                 AS country
    ,T.AverageTemperature      AS average_temperature
    ,T.latitude                AS latitude
    ,T.longitude               AS longitude
    FROM 
    cleaned_country_mapping_table C
    ,intermediate_CountryDetails_table T
    WHERE 
    C.Country=upper(T.Country)
    """)
CountryDetails.printSchema()
CountryDetails.show(5)
CountryDetails.count()

root
 |-- country_code: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- average_temperature: double (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)

+------------+------------+-------------------+--------+---------+
|country_code|     country|average_temperature|latitude|longitude|
+------------+------------+-------------------+--------+---------+
|         151|     Armenia|   8.37559700012207|  40.99N|   44.73E|
|         512|     Bahamas| 24.786977767944336|  24.92N|   78.03W|
|         373|South Africa|  16.36072629928589|  26.52S|   26.87E|
|         243|       Burma|   26.0009747505188|  21.70N|   96.06E|
|         274|  Bangladesh|  25.05098473398309|  23.31N|   90.00E|
+------------+------------+-------------------+--------+---------+
only showing top 5 rows



149
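
Because the join above matches on upper-cased country names, any spelling difference between the two sources silently drops that country from CountryDetails. A sketch for listing the names that fail to match is shown below. `unmatched_countries` is a hypothetical helper, and the sample names are illustrative rather than the actual unmatched set.

```python
def unmatched_countries(mapping_names, temperature_names):
    """Return temperature-side names with no upper-cased match in the mapping.

    unmatched_countries is a hypothetical helper used only for illustration.
    """
    mapping_set = {name.strip().upper() for name in mapping_names}
    return sorted(
        name
        for name in set(temperature_names)
        if name.strip().upper() not in mapping_set
    )


# Illustrative inputs; 'Example Land' stands in for a name with no match.
mapping = ["ARMENIA", "BAHAMAS", "SOUTH AFRICA"]
temperature = ["Armenia", "Bahamas", "South Africa", "Example Land"]
print(unmatched_countries(mapping, temperature))
```

Reviewing such a list before writing the dimension makes it clear how many countries are lost to naming differences.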

##### PortCodes

In [55]:
PortCodes = spark.sql("""
    SELECT 
     Port_Id          AS port_code
    ,Port             AS port    
    FROM 
    cleaned_port_mapping_table
    """)
PortCodes.printSchema()
PortCodes.show(5,truncate=False)
PortCodes.count()

root
 |-- port_code: string (nullable = true)
 |-- port: string (nullable = true)

+---------+----------------------------+
|port_code|port                        |
+---------+----------------------------+
|ALC      |ALCAN, AK                   |
|ANC      |ANCHORAGE, AK               |
|BAR      |BAKER AAF - BAKER ISLAND, AK|
|DAC      |DALTONS CACHE, AK           |
|PIZ      |DEW STATION PT LAY DEW, AK  |
+---------+----------------------------+
only showing top 5 rows



660

##### ImmigrationModes

In [61]:
ImmigrationModes = spark.sql("""
    SELECT 
     cast(Mode_Id as int) AS mode_id
    ,Mode                 AS mode
    FROM 
    cleaned_immigration_mode_table
    """)
ImmigrationModes.printSchema()
ImmigrationModes.show(5,truncate=False)
ImmigrationModes.count()

root
 |-- mode_id: integer (nullable = true)
 |-- mode: string (nullable = true)

+-------+------------+
|mode_id|mode        |
+-------+------------+
|1      |Air         |
|2      |Sea         |
|3      |Land        |
|9      |Not reported|
+-------+------------+



4

##### Write to Parquet Files

In [57]:
config = configparser.ConfigParser()
config.read('dwh.cfg')

path                     = config['LOCAL']['PATH']
immigration_details_data = config['DATA']['IMMIGRATION_DATA']
city_demographics_data   = config['DATA']['CITY_DEMOGRAPHICS_DATA']
country_details_data     = config['DATA']['COUNTRY_DETAILS_DATA']
port_codes_data          = config['DATA']['PORT_CODES_DATA']
immigration_modes_data   = config['DATA']['IMMIGRATION_MODES_DATA']
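
The contents of `dwh.cfg` are not shown in this notebook. The sketch below shows one layout that would satisfy the lookups above. The section and key names come from the code, while the values are assumptions inferred from the paths printed when the parquet files are written; the exact split between `PATH` and each data key is a guess.

```python
import configparser

# Assumed contents of dwh.cfg; only the section and key names are known.
SAMPLE_CFG = """
[LOCAL]
PATH = ./udacity-dend-rp/

[DATA]
IMMIGRATION_DATA = immigration_data/
CITY_DEMOGRAPHICS_DATA = city_demographics_data/
COUNTRY_DETAILS_DATA = country_details_data/
PORT_CODES_DATA = port_codes_data/
IMMIGRATION_MODES_DATA = immigration_modes_data/
"""

sample_config = configparser.ConfigParser()
sample_config.read_string(SAMPLE_CFG)

# The notebook concatenates the two values, so PATH must end with a slash.
sample_full_path = sample_config['LOCAL']['PATH'] + sample_config['DATA']['IMMIGRATION_DATA']
print(sample_full_path)
```

Since the paths are built by plain string concatenation, a missing trailing slash in `PATH` would produce a wrong directory name rather than an error.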

In [58]:
def write_to_parquet_files(dataframe, name, full_path):
    """
        This function writes the data stored in a Spark Dataframe in parquet format.
        This function also logs the current Dataframe being processed, the path where
        the parquet files are being written and the time taken to write the data.
        
        The inputs passed to this function are:
        1. dataframe - Dataframe to be written out to parquet files.
        2. name - Name of the dataframe to display status.
        3. full_path - Path where the parquet files are to be stored.
        
        """     
    print("\nWriting {} to parquet files in {}".format(name,full_path))
    start_time = datetime.now()
    dataframe.write.mode("overwrite").parquet(full_path)
    end_time = datetime.now()
    print("{} written in {}.".format(name,end_time-start_time))

In [59]:
write_to_parquet_files(ImmigrationDetails, 'ImmigrationDetails', path+immigration_details_data) 
write_to_parquet_files(CityDemographics, 'CityDemographics', path+city_demographics_data) 
write_to_parquet_files(CountryDetails, 'CountryDetails', path+country_details_data) 
write_to_parquet_files(PortCodes, 'PortCodes', path+port_codes_data) 
write_to_parquet_files(ImmigrationModes, 'ImmigrationModes', path+immigration_modes_data) 


Writing ImmigrationDetails to parquet files in ./udacity-dend-rp/immigration_data/
ImmigrationDetails written in 0:00:55.592554.

Writing CityDemographics to parquet files in ./udacity-dend-rp/city_demographics_data/
CityDemographics written in 0:00:08.809060.

Writing CountryDetails to parquet files in ./udacity-dend-rp/country_details_data/
CountryDetails written in 0:00:40.555483.

Writing PortCodes to parquet files in ./udacity-dend-rp/port_codes_data/
PortCodes written in 0:00:00.232835.

Writing ImmigrationModes to parquet files in ./udacity-dend-rp/immigration_modes_data/
ImmigrationModes written in 0:00:00.195931.


#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [60]:
def quality_check(parquet_file, dataframe, primary_key):
    """
        This function performs quality checks on the parquet files which have been
        written. This function checks:
        i.  The number of entries in the parquet file.
        ii. The number of NULL values in the primary key field.
        
        This function also logs the current parquet file being processed, the number of 
        entries, the number of entries with NULL values for the primary key field, the 
        time taken to perform the validation and whether the quality check has passed.
        The quality check passes if:
        i.  There are more than zero entries.
        ii. There are no NULL values for the primary key.
        
        The inputs passed to this function are:
        1. parquet_file - The parquet file path.
        2. dataframe - The name of the dataframe, used only for logging.
        3. primary_key - The name of the primary key field.
        
        """    
    start_time = datetime.now()
    print("\nQuality check for {}:".format(dataframe))
    parquet_df = spark.read.parquet(parquet_file)
    row_count = parquet_df.count()
    parquet_df.createOrReplaceTempView("parquet_table")
    pk_null_count = spark.sql("SELECT count(*) FROM parquet_table WHERE {} IS NULL".format(primary_key)).collect()[0][0]
    print("No. of rows for {}:{}".format(dataframe,str(row_count)))
    print("No. of null values primary key {}:{}".format(primary_key,str(pk_null_count)))
    end_time = datetime.now()
    if(row_count>0 and pk_null_count==0):
        print('Quality check passed. Time taken is {}'.format(end_time-start_time))
    else:
        print('Quality check failed. Time taken is {}'.format(end_time-start_time))

quality_check(path+immigration_details_data,'ImmigrationDetails','cicid') 
quality_check(path+city_demographics_data,'CityDemographics','state_code') 
quality_check(path+country_details_data,'CountryDetails','country_code') 
quality_check(path+port_codes_data,'PortCodes','port_code')    
quality_check(path+immigration_modes_data,'ImmigrationModes','mode_id')


Quality check for ImmigrationDetails:
No. of rows for ImmigrationDetails:3096074
No. of null values primary key cicid:0
Quality check passed. Time taken is 0:00:01.161392

Quality check for CityDemographics:
No. of rows for CityDemographics:596
No. of null values primary key state_code:0
Quality check passed. Time taken is 0:00:02.383112

Quality check for CountryDetails:
No. of rows for CountryDetails:149
No. of null values primary key country_code:0
Quality check passed. Time taken is 0:00:01.170871

Quality check for PortCodes:
No. of rows for PortCodes:660
No. of null values primary key port_code:0
Quality check passed. Time taken is 0:00:00.391623

Quality check for ImmigrationModes:
No. of rows for ImmigrationModes:4
No. of null values primary key mode_id:0
Quality check passed. Time taken is 0:00:00.329444
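
The checks above cover row counts and NULL keys but not key uniqueness, which the list at the start of this section also mentions. A uniqueness check could be sketched as follows, using the standard-library `sqlite3` module so that it runs without Spark. `count_duplicate_keys` is a hypothetical helper, and all rows other than Gulfport, MS are made up.

```python
import sqlite3


def count_duplicate_keys(conn, table, key):
    """Return how many distinct key values occur more than once in the table.

    Table and column names cannot be bound as SQL parameters, so they are
    interpolated here; only pass trusted names.
    """
    sql = (
        "SELECT COUNT(*) FROM "
        "(SELECT {k} FROM {t} GROUP BY {k} HAVING COUNT(*) > 1)"
    ).format(k=key, t=table)
    return conn.execute(sql).fetchone()[0]


conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE CityDemographics (state_code TEXT, city TEXT);
    INSERT INTO CityDemographics VALUES
        ('MS', 'Gulfport'), ('MS', 'Example City'), ('AK', 'Other City');
""")

print(count_duplicate_keys(conn, "CityDemographics", "state_code"))
print(count_duplicate_keys(conn, "CityDemographics", "city"))
```

Applied to the real tables, a check like this would flag `state_code` in CityDemographics, since 596 city rows necessarily share state codes; a composite key of city and state may describe that table better.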


#### 4.3 Data dictionary 

##### ImmigrationDetails (Fact Table)
The main source of data for this table is the I94 Immigration Data. The data dictionary is provided below.

| Field | Datatype|
| --- | --- |
| cicid				| long 	   |
| arrival_date		| date 	   |
| departure_date	| date 	   |
| us_address		| string 	|
| resident_country	| integer  |
| resident_city		| integer  |
| travel_mode		| integer  |
| port				| string 	|
| visa_code			| integer  |
| age				| integer  |
| visa_type			| string 	|
| birth_year		| double 	|
| gender			| string 	|
| admission_number	| long 	   |
| airline			| string 	|
| flight_number		| string 	|


##### CityDemographics (Dimension Table)
The main source of data for this table is the U.S. City Demographic Data from OpenSoft. The data dictionary is provided below.

| Field | Datatype|
| --- | --- |
| state_code			 | string  |
| state					 | string  |
| city					 | string  |
| median_age			 | integer |
| average_household_size | integer |
| total_population		 | integer |
| male_population		 | integer |
| female_population		 | integer |
| foreign_born			 | integer |
| veteran_population	 | integer |
| black_population		 | integer |
| hispanic_population	 | integer |
| white_population		 | integer |
| asian_population		 | integer |
| native_population		 | integer |


##### CountryDetails (Dimension Table)
The main sources of data for this table are the Kaggle dataset for World Temperature Data and the country codes provided in the SAS file I94_SAS_Labels_Descriptions. The Kaggle dataset contains data for several years; it was aggregated by Country and then joined with the Country Codes from the SAS file. The data dictionary is provided below.

| Field | Datatype|
| --- | --- |
| country_code			| integer |
| country				| string  |
| average_temperature	| double  |
| latitude				| string  |
| longitude				| string  |


##### PortCodes (Dimension Table)
The main source of data for this table is the SAS file I94_SAS_Labels_Descriptions. The data dictionary is provided below.

| Field | Datatype|
| --- | --- |
| port_code		| string |
| port			| string |


##### ImmigrationModes (Dimension Table)
The main source of data for this table is the SAS file I94_SAS_Labels_Descriptions. The data dictionary is provided below.

| Field | Datatype|
| --- | --- |
| mode_id	| integer |
| mode		| string  |

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.

This project demonstrates how simply a Data Warehouse can be implemented with a few tools that have proven to be versatile and scalable. If required, it can be moved to a cloud-based infrastructure platform that is low cost, scalable and highly reliable. The tools used here and their advantages are listed below:

- Python:
    + Easy to learn, with plenty of support available
    + Extensive support libraries
    + Wide range of third-party modules
    + Open source and community developed
    + User-friendly data structures
    + Productivity and speed


- Spark:
    + Provides reliable, fast in-memory computation
    + Efficient for interactive queries and iterative algorithms
    + Highly efficient for near-real-time analytics using Spark Streaming and Spark SQL
    + Offers APIs for Java, Scala, Python and R, which makes programming easy


- Pandas:
    + Efficiently handles data that fits in memory, such as the label mappings used here
    + Made for Python
    + Makes data easy to reshape and customize


- Parquet files:
    + These can be read by tools across the Hadoop ecosystem, such as Hive, Impala, Pig and Spark.
    + Organizing by column allows for better compression, as data is more homogeneous.
    + The space savings are very noticeable at the scale of a Hadoop cluster.
    + I/O is reduced, as only a subset of the columns needs to be scanned while reading the data.
    + Better compression also reduces the bandwidth required to read the input.


* Write a description of how you would approach the problem differently under the following scenarios:
    * The data was increased by 100x.
        + This solution can be scaled by moving it to the Amazon cloud, where storage and compute can grow as needed. The parquet files can also be loaded into Amazon Redshift, which scales by adding nodes.
    * The data populates a dashboard that must be updated on a daily basis by 7am every day.
        + Airflow can be incorporated to orchestrate the pipelines, run them on a custom schedule and refresh the dashboards before the deadline.
    * The database needed to be accessed by 100+ people.
        + As mentioned above, the data can be loaded into Amazon Redshift, where nodes can be added and removed as required.