# Project Title
### Data Engineering Capstone Project

#### Project Summary
* The goal of this project is to enable data analysts and similar parties to analyze various aspects of immigration data by combining four data sets: immigration, airport codes, US city demographics, and global temperature. The project builds a schema from these data sets that lets analysts answer questions such as which origin country sends the most visitors to the US, how long visitors stay in the US, what the demographics are of the state where they arrive, and what the average temperature of the country is.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# import necessary libraries and user defined utilities
import configparser
import datetime as dt
import os
import time

import pandas as pd
from pyspark.sql import Row, SparkSession
from pyspark.sql import functions as F
# NOTE: later cells rely on names pulled in by the star imports below
# (round, mean, sum, upper, StructType, ...); they shadow Python built-ins.
from pyspark.sql.functions import *
from pyspark.sql.functions import date_add as d_add
from pyspark.sql.types import *

import util
import table_helper as helper

In [2]:
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
df_spark.show(2)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|       admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null|1.897628485E9| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SEO| n

In [4]:
print(df_spark.count(),len(df_spark.columns))

3096313 28


In [5]:
# dt = util.convert_sas_to_date(20573.0)
# dt.minute

0

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use?

* The purpose of this project is to facilitate meaningful analysis of US immigration data by parties such as data analysts.

* Clean the raw data obtained from the above sources
* Write them to parquet files
* Perform ETL using a Spark cluster
* Create the necessary fact and dimension tables in a star schema
* Load data into the above tables

#### Describe and Gather Data 
Describe the data sets you're using. Where did they come from? What type of information is included?

* I94 Immigration Sample Data <br>
    * This data set includes information about people entering and leaving the US, such as origin country, arrival date, departure date, port of entry, and visa type.
* US City Demographic Data from https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/ _(provided in the Udacity workspace)_<br>
    * This data set contains total population, median age, race, average household size, number of veterans, etc.
* Airport Codes from https://datahub.io/core/airport-codes#data _(provided in the Udacity workspace)_<br>
    * This data set includes the country code, airport code, corresponding city code, elevation, and type of airport (small, heliport, large, etc.).
* Global Temperature Data, available in the Udacity workspace _(GlobalLandTemperaturesByCity.csv)_

In [3]:
immigration_data = 'immigration_data_sample.csv'
immigration_df = pd.read_csv(immigration_data)
immigration_df.head(1)

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,782,WT


#### Global Temperature Data

In [4]:
file_name = '../../data2/GlobalLandTemperaturesByCity.csv'
global_temperature_df = pd.read_csv(file_name)

In [6]:
global_temperature_df.shape

(8599212, 7)

In [5]:
global_temperature_df.head(1)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E


In [6]:
dt_begin = "2000-01-01"
dt_end = "2019-01-01"
after_dt_begin = global_temperature_df["dt"] >= dt_begin
before_dt_end = global_temperature_df["dt"] < dt_end
dt_range = after_dt_begin & before_dt_end
global_temperature_df = global_temperature_df.loc[dt_range]
global_temperature_df.shape

(579150, 7)

In [8]:
global_temperature_df.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
3074,2000-01-01,3.065,0.372,Århus,Denmark,57.05N,10.33E
3075,2000-02-01,3.724,0.241,Århus,Denmark,57.05N,10.33E
3076,2000-03-01,3.976,0.296,Århus,Denmark,57.05N,10.33E
3077,2000-04-01,8.321,0.221,Århus,Denmark,57.05N,10.33E
3078,2000-05-01,13.567,0.253,Århus,Denmark,57.05N,10.33E


In [7]:
global_temperature_df = global_temperature_df.dropna()

In [8]:
for column in global_temperature_df:
    values = global_temperature_df[column].unique()
    if(True in pd.isnull(values)):
        print(f"column {column} has null value")
print("finished checking for null")

finished checking for null


In [9]:
global_temperature_df.dtypes

dt                                object
AverageTemperature               float64
AverageTemperatureUncertainty    float64
City                              object
Country                           object
Latitude                          object
Longitude                         object
dtype: object

#### Airport Codes

In [10]:
airport_codes_csv = 'airport-codes_csv.csv'
airport_codes_df = pd.read_csv(airport_codes_csv)
airport_codes_df.shape

(55075, 12)

In [11]:
column_list = ['iata_code', 'local_code']
airport_codes_df = util.cleanup_missing_column_values(airport_codes_df,column_list)

removing rows with null values for ['iata_code', 'local_code']
total rows before clean up 55075
total rows after clean up 2987


In [12]:
airport_codes_df.head(1)

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
223,03N,small_airport,Utirik Airport,4.0,OC,MH,MH-UTI,Utirik Island,K03N,UTK,03N,"169.852005, 11.222"


In [15]:
airport_codes_df.shape

(2987, 12)

#### US City Demographics

In [13]:
us_cities_dem_csv = "us-cities-demographics.csv"
demographics_df = pd.read_csv(us_cities_dem_csv, delimiter=';')
demographics_df.head(1)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924


In [17]:
demographics_df.shape

(2891, 12)

In [14]:
demographics_df.shape

for column in demographics_df:
    values = demographics_df[column].unique()
    if(True in pd.isnull(values)):
        print(f"column {column} has null value")
print("finished checking for null")        


column Male Population has null value
column Female Population has null value
column Number of Veterans has null value
column Foreign-born has null value
column Average Household Size has null value
finished checking for null


#### Build country code to country name data frame 
_(The following code is based on guidance from a mentor.)_

In [15]:
#extract country name and country code from I94_SAS_Labels_Descriptions.SAS

with open("I94_SAS_Labels_Descriptions.SAS") as f:
    contents = f.readlines()
    country_code = {}
    for countries in contents[10:298]:
        pair = countries.split('=')
        code,country = pair[0].strip(), pair[1].strip().strip("'")
        country_code[code] = country
country_code_df = pd.DataFrame(list(country_code.items()),columns=['code','country'])
country_code_df.head(5)

Unnamed: 0,code,country
0,236,AFGHANISTAN
1,101,ALBANIA
2,316,ALGERIA
3,102,ANDORRA
4,324,ANGOLA
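
The hard-coded slice `contents[10:298]` ties the cell above to the exact line layout of the labels file. As a hedged alternative, the sketch below parses any run of `code = 'NAME'` lines with a regular expression and skips lines that do not match. The function name `parse_code_labels` and the sample lines are illustrative assumptions and are not part of the project's `util` module; the line format is inferred from the `split('=')` and `strip("'")` logic above.

```python
import re

# Matches lines such as:   236 =  'AFGHANISTAN'
# (format assumed from the split('=') / strip("'") logic in the notebook cell)
_LABEL_RE = re.compile(r"^\s*(\d+)\s*=\s*'(.*?)'\s*;?\s*$")


def parse_code_labels(lines):
    """Return {code: label} for every line shaped like  <digits> = '<label>'."""
    labels = {}
    for line in lines:
        match = _LABEL_RE.match(line)
        if match:  # comments, blank lines and headers are skipped
            code, label = match.groups()
            labels[code] = label.strip()
    return labels


# Made-up sample lines standing in for a slice of the labels file.
sample = [
    "/* illustrative header line */",
    "   236 =  'AFGHANISTAN'",
    "   101 =  'ALBANIA'",
    "",
]
country_code = parse_code_labels(sample)
print(country_code)
```

The caller would still need to pass only the country section of the file, since other sections may use the same `code = 'label'` shape.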


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
* Clean up data with null values in important columns
* Drop duplicate rows

In [16]:
# clean up Immigration data
column_list = ["arrdate","depdate"]
immigration_df = util.cleanup_missing_column_values(immigration_df,column_list)
immigration_df = immigration_df.drop_duplicates()

#dropping columns insnum and occup as there are null values for majority of the rows
# immigration_df = immigration_df.drop('insnum', axis=1)
# immigration_df = immigration_df.drop('occup', axis=1)
util.report_unique_info(immigration_df, "immigration_df", ["visapost"])

removing rows with null values for ['arrdate', 'depdate']
total rows before clean up 1000
total rows after clean up 951
Displaying number of unique values for given columns in  immigration_df dataframe
There are 97 unique values in column visapost


In [17]:
# clean up Global Temperature data
column_list = ["AverageTemperature", "Country"]
global_temperature_df = util.cleanup_missing_column_values(global_temperature_df,column_list)
global_temperature_df = global_temperature_df.dropna()
global_temperature_df.shape

removing rows with null values for ['AverageTemperature', 'Country']
total rows before clean up 576080
total rows after clean up 576080


(576080, 7)

In [18]:
#discard historical data and work with data for past 20 years
print("Discarding historical data and work with data for recent 20 years")
dt_begin = "2000-01-01"
dt_end = "2019-01-01"
after_dt_begin = global_temperature_df["dt"] >= dt_begin
before_dt_end = global_temperature_df["dt"] < dt_end
dt_range = after_dt_begin & before_dt_end
global_temperature_df = global_temperature_df.loc[dt_range]
global_temperature_df.shape

Discarding historical data and work with data for recent 20 years


(576080, 7)

In [19]:
global_temperature_df.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
3074,2000-01-01,3.065,0.372,Århus,Denmark,57.05N,10.33E
3075,2000-02-01,3.724,0.241,Århus,Denmark,57.05N,10.33E
3076,2000-03-01,3.976,0.296,Århus,Denmark,57.05N,10.33E
3077,2000-04-01,8.321,0.221,Århus,Denmark,57.05N,10.33E
3078,2000-05-01,13.567,0.253,Århus,Denmark,57.05N,10.33E


In [20]:
# clean up Airport codes
column_list = ['iata_code', 'local_code']
airport_codes_df = util.cleanup_missing_column_values(airport_codes_df,column_list)
airport_codes_df.shape

removing rows with null values for ['iata_code', 'local_code']
total rows before clean up 2987
total rows after clean up 2987


(2987, 12)

In [21]:
demographics_df = demographics_df.drop_duplicates()
demographics_df = demographics_df.dropna()
# need to remove rows that have null values for the following columns
# Male Population, Female Population
# column_list = ['Male Population', 'Female Population',]
# demographics_df = util.cleanup_missing_column_values(demographics_df,column_list)
demographics_df.shape

(2875, 12)

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

* The process of admitting a visitor to the US triggers events that can be classified as facts. In this project we create an immigration fact table.
* The following dimension tables are derived:
    * airport
    * time
    * status
    * visa
    * temperature
    * country
    * state
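
As a rough illustration of the star schema, the sketch below maps fact-table columns to the dimension table each one appears to reference. The column names are read from the table previews in section 4.2; treating them as the join keys is an assumption, and `FACT_TO_DIMENSION` and `missing_foreign_keys` are hypothetical names, not project code.

```python
# Fact-table column -> (dimension table, dimension key).
# Assumption: these pairs are inferred from the column names shown in the
# table previews, not taken from table_helper.py.
FACT_TO_DIMENSION = {
    "ident": ("airport", "ident"),
    "arrdate": ("time", "arrdate"),
    "status_flag_id": ("status", "status_flag_id"),
    "visa_id": ("visa", "visa_id"),
    "temperature_id": ("temperature", "temperature_id"),
    "code": ("country", "code"),
    "state_code": ("state", "state_code"),
}


def missing_foreign_keys(fact_columns, mapping=FACT_TO_DIMENSION):
    """Return the foreign-key columns the mapping expects but the fact table lacks."""
    return sorted(set(mapping) - set(fact_columns))


# Columns of the immigration fact table as shown in its preview.
immigration_columns = [
    "cicid", "i94res", "depdate", "i94mode", "i94port", "i94cit", "i94addr",
    "airline", "fltno", "ident", "code", "temperature_id", "status_flag_id",
    "visa_id", "state_code", "country", "arrdate",
]
print(missing_foreign_keys(immigration_columns))
```

A check like this could run before the fact table is written, so that a renamed or dropped key column is caught early.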


#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

* Load data into staging environment
* Create fact and dimension tables
* Write table data into parquet files
* Run data quality tests

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [22]:

# not including insnum, occup columns as majority of rows contain null value for these columns
# also these columns are dropped from the data frame
immigration_df.drop(["insnum","occup"], inplace=True, axis=1)
immig_schema = StructType([StructField("0", IntegerType(), True)\
                          ,StructField("cicid", FloatType(), True)\
                          ,StructField("i94yr", FloatType(), True)\
                          ,StructField("i94mon", FloatType(), True)\
                          ,StructField("i94cit", FloatType(), True)\
                          ,StructField("i94res", FloatType(), True)\
                          ,StructField("i94port", StringType(), True)\
                          ,StructField("arrdate", FloatType(), True)\
                          ,StructField("i94mode", FloatType(), True)\
                          ,StructField("i94addr", StringType(), True)\
                          ,StructField("depdate", FloatType(), True)\
                          ,StructField("i94bir", FloatType(), True)\
                          ,StructField("i94visa", FloatType(), True)\
                          ,StructField("count", FloatType(), True)\
                          ,StructField("dtadfile", StringType(), True)\
                          ,StructField("visapost", StringType(), True)\
                          ,StructField("entdepa", StringType(), True)\
                          ,StructField("entdepd", StringType(), True)\
                          ,StructField("entdepu", StringType(), True)\
                          ,StructField("matflag", StringType(), True)\
                          ,StructField("biryear", FloatType(), True)\
                          ,StructField("dtaddto", StringType(), True)\
                          ,StructField("gender", StringType(), True)\
                          ,StructField("airline", StringType(), True)\
                          ,StructField("admnum", FloatType(), True)\
                          ,StructField("fltno", StringType(), True)\
                          ,StructField("visatype", StringType(), True)])

In [23]:
immigration_spark = spark.createDataFrame(immigration_df, schema=immig_schema)
print(immigration_spark.count(),len(immigration_spark.columns))

951 27


In [24]:
immigration_spark.toPandas().head(3)

Unnamed: 0,0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepd,entdepu,matflag,biryear,dtaddto,gender,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,O,,M,1955.0,7202016,F,JL,56582680000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,R,,M,1990.0,10222016,M,*GA,94361990000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,O,,M,1940.0,7052016,M,LH,55780470000.0,00464,WT


In [25]:
globaltemp_schema = StructType([StructField("dt", StringType(), True)\
                          ,StructField("average_temperature", FloatType(), True)\
                          ,StructField("average_temperature_uncertainty", FloatType(), True)\
                          ,StructField("city", StringType(), True)\
                          ,StructField("country", StringType(), True)\
                          ,StructField("latitude", StringType(), True)\
                          ,StructField("longitude", StringType(), True)])
global_temperature_df.rename(columns={'AverageTemperature':'average_temperature'}, inplace=True)
global_temperature_df.rename(columns={'AverageTemperatureUncertainty':'average_temperature_uncertainty'}, inplace=True)
global_temperature_df.rename(columns={'City':'city'}, inplace=True)
global_temperature_df.rename(columns={'Country':'country'}, inplace=True)
global_temperature_df.rename(columns={'Latitude':'latitude'}, inplace=True)
global_temperature_df.rename(columns={'Longitude':'longitude'}, inplace=True)

temp_spark = spark.createDataFrame(global_temperature_df, schema=globaltemp_schema)

temp_spark.toPandas().head()

Unnamed: 0,dt,average_temperature,average_temperature_uncertainty,city,country,latitude,longitude
0,2000-01-01,3.065,0.372,Århus,Denmark,57.05N,10.33E
1,2000-02-01,3.724,0.241,Århus,Denmark,57.05N,10.33E
2,2000-03-01,3.976,0.296,Århus,Denmark,57.05N,10.33E
3,2000-04-01,8.321,0.221,Århus,Denmark,57.05N,10.33E
4,2000-05-01,13.567,0.253,Århus,Denmark,57.05N,10.33E


In [26]:
temp_spark.toPandas().shape

(576080, 7)

In [27]:
dem_schema = StructType([StructField("City", StringType(), True)\
                        ,StructField("State", StringType(), True)\
                        ,StructField("Median Age", FloatType(), True)\
                        ,StructField("Male Population", FloatType(), True)\
                        ,StructField("Female Population", FloatType(), True)\
                        ,StructField("Total Population", IntegerType(), True)\
                        ,StructField("Number of Veterans", FloatType(), True)\
                        ,StructField("Foreign-born", FloatType(), True)\
                        ,StructField("Average Household Size", FloatType(), True)\
                        ,StructField("State Code", StringType(), True)\
                        ,StructField("Race", StringType(), True)\
                        ,StructField("Count", IntegerType(), True)])

dem_spark = spark.createDataFrame(demographics_df, schema=dem_schema)

dem_spark.toPandas().head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.799999,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.599998,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [28]:
airport_schema =  StructType([StructField("ident", StringType(), True)\
                        ,StructField("type", StringType(), True)\
                        ,StructField("name", StringType(), True)\
                        ,StructField("elevation_ft", FloatType(), True)\
                        ,StructField("continent", StringType(), True)\
                        ,StructField("iso_country", StringType(), True)\
                        ,StructField("iso_region", StringType(), True)\
                        ,StructField("municipality", StringType(), True)\
                        ,StructField("gps_code", StringType(), True)\
                        ,StructField("iata_code", StringType(), True)\
                        ,StructField("local_code", StringType(), True)\
                        ,StructField("coordinates", StringType(), True)])
airport_spark = spark.createDataFrame(airport_codes_df, schema=airport_schema)

airport_spark.toPandas().head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,03N,small_airport,Utirik Airport,4.0,OC,MH,MH-UTI,Utirik Island,K03N,UTK,03N,"169.852005, 11.222"
1,07FA,small_airport,Ocean Reef Club Airport,8.0,,US,US-FL,Key Largo,07FA,OCA,07FA,"-80.274803161621, 25.325399398804"
2,0AK,small_airport,Pilot Station Airport,305.0,,US,US-AK,Pilot Station,,PQS,0AK,"-162.899994, 61.934601"
3,0CO2,small_airport,Crested Butte Airpark,8980.0,,US,US-CO,Crested Butte,0CO2,CSE,0CO2,"-106.928341, 38.851918"
4,0TE7,small_airport,LBJ Ranch Airport,1515.0,,US,US-TX,Johnson City,0TE7,JCY,0TE7,"-98.62249755859999, 30.251800537100003"


##### Define output path

In [29]:
output_path="table_data/"

##### 1.Create airport dimension

In [30]:
airport_spark = helper.create_airport(airport_spark, output_path)

Writing table airport to table_data/airport
Write complete!


##### 2.Create time dimension

In [31]:
time = helper.create_time(immigration_spark,output_path)

Writing table time to table_data/time
Write complete!


##### 3.Create status dimension

In [32]:
status = helper.create_status(immigration_spark,output_path)

Writing table status to table_data/status
Write complete!


##### 4.Create visa dimension

In [33]:
visa = helper.create_visa(immigration_spark,output_path)

Writing table visa to table_data/visa
Write complete!


In [34]:
def create_temperature(input_df, output_path):
    """
        Gather temperature data, create dataframe and write data into parquet files.
        
        :param input_df: dataframe of input data.
        :param output_path: path to write data to.
        :return: dataframe representing temperature dimension
    """
    print("creating temperature table data")
    output_df = input_df.groupBy("country").agg(
                round(mean('average_temperature'),1).alias("average_temperature"),\
                round(mean("average_temperature_uncertainty"),1).alias("average_temperature_uncertainty")
            ).dropna()\
            .withColumn("temperature_id", monotonically_increasing_id()) \
            .select(["temperature_id", "country", "average_temperature", "average_temperature_uncertainty"])
    
    util.output_to_parquet_file(output_df, output_path, "temperature")
    
    return output_df


##### 5.Create temperature dimension

In [35]:
temperature = create_temperature(temp_spark,output_path)

creating temperature table data
Writing table temperature to table_data/temperature
Write complete!


##### 6.Create country dimension

In [36]:
country_spark = spark.createDataFrame(country_code_df)

In [37]:
country = helper.create_country(country_spark,output_path)

Writing table country to table_data/country
Write complete!


##### 7.Create state dimension

In [38]:
def create_state(input_df, output_path):
    """
        Get state specific data and create dataframe and write data into parquet files.
        Here we will group the information by state code
        Rename the columns from Xxxx Yyyy to xxx_yyy
        Drop rows with null values
        
        :param input_df: dataframe of input data.
        :param output_path: path to write data to.
        :return: dataframe representing state dimension
    """
    
    output_df = input_df.select(["State Code", "State", "Median Age", "Male Population", "Female Population", "Total Population", "Average Household Size",\
                          "Foreign-born", "Race", "Count"])\
                .withColumnRenamed("State","state")\
                .withColumnRenamed("State Code", "state_code")\
                .withColumnRenamed("Median Age", "median_age")\
                .withColumnRenamed("Male Population", "male_population")\
                .withColumnRenamed("Female Population", "female_population")\
                .withColumnRenamed("Total Population", "total_population")\
                .withColumnRenamed("Average Household Size", "avg_household_size")\
                .withColumnRenamed("Foreign-born", "foreign_born")\
                .withColumnRenamed("Race", "race")\
                .withColumnRenamed("Count", "count")
    print(output_df.show(2))
    output_df = output_df.groupBy("state_code","state").agg(\
                round(mean('median_age'),0).alias("median_age"),\
                sum("total_population").alias("total_population"),\
                sum("male_population").alias("male_population"),\
                sum("female_population").alias("female_population"),\
                sum("foreign_born").alias("foreign_born"), \
                sum("avg_household_size").alias("average_household_size")
                ).dropna()
    
    util.output_to_parquet_file(output_df, output_path, "state")
    
    return output_df

In [39]:
state = create_state(dem_spark,output_path)

+----------+-------------+----------+---------------+-----------------+----------------+------------------+------------+------------------+-----+
|state_code|        state|median_age|male_population|female_population|total_population|avg_household_size|foreign_born|              race|count|
+----------+-------------+----------+---------------+-----------------+----------------+------------------+------------+------------------+-----+
|        MD|     Maryland|      33.8|        40601.0|          41862.0|           82463|               2.6|     30908.0|Hispanic or Latino|25924|
|        MA|Massachusetts|      41.0|        44129.0|          49500.0|           93629|              2.39|     32935.0|             White|58723|
+----------+-------------+----------+---------------+-----------------+----------------+------------------+------------+------------------+-----+
only showing top 2 rows

None
Writing table state to table_data/state
Write complete!


##### Create Immigration Fact

In [40]:
# join country dimension and temperature dimension on the upper-cased country name
country_temperature = country.select(["*"])\
            .join(temperature, (country.country == upper(temperature.country)), how='full')\
            .select([country.code, country.country, temperature.temperature_id, temperature.average_temperature, temperature.average_temperature_uncertainty])

country_temperature.write.mode("overwrite").parquet(output_path+"country_temperature_mapping")
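
Because the join above is a full outer join on the country name, any name that is spelled differently in the two tables ends up as a row with nulls on one side. A minimal sketch of how such mismatches could be listed before joining is shown below; `unmatched_names` is a hypothetical helper and the sample names are made up for illustration.

```python
def unmatched_names(country_names, temperature_names):
    """Compare two name lists case-insensitively; return names found on one side only."""
    left = {name.strip().upper() for name in country_names}
    right = {name.strip().upper() for name in temperature_names}
    return sorted(left - right), sorted(right - left)


# Illustrative inputs: upper-case names as in the country table,
# title-case names as in the temperature data.
only_in_country, only_in_temperature = unmatched_names(
    ["AFGHANISTAN", "ALBANIA", "EAST TIMOR"],
    ["Afghanistan", "Albania", "Denmark"],
)
print(only_in_country, only_in_temperature)
```

Reviewing the two lists would show which names need a manual mapping before the dimensions can be joined cleanly.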

In [41]:
immigration = helper.create_immigration(immigration_spark, output_path, spark)

Writing table immigration to table_data/immigration
Write complete!


#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 


##### 1. Generic data check

_In this section, the top 5 rows are printed from each of the dimension and fact tables_

In [42]:
airport = spark.read.parquet(output_path+"airport")
airport.toPandas().head()

Unnamed: 0,ident,type,iata_code,name,iso_country,iso_region,municipality,gps_code,coordinates,elevation_ft
0,BNM,small_airport,BNM,Bodinumu Airport,PG,PG-CPM,Bodinumu,AYBD,"147.666722222, -9.107777777779999",3700.0
1,CFF4,small_airport,DAS,Great Bear Lake Airport,CA,CA-NT,Great Bear Lake,CFF4,"-119.707000732, 66.7031021118",562.0
2,CFQ4,small_airport,TIL,Cheadle Airport,CA,CA-AB,Cheadle,CFQ4,"-113.62400054932, 51.057498931885",3300.0
3,KAPN,medium_airport,APN,Alpena County Regional Airport,US,US-MI,Alpena,KAPN,"-83.56030273, 45.0780983",690.0
4,KBFI,large_airport,BFI,Boeing Field King County International Airport,US,US-WA,Seattle,KBFI,"-122.302001953125, 47.529998779296875",21.0


In [43]:
time = spark.read.parquet(output_path+"time")
time.toPandas().head()

Unnamed: 0,arrdate,arrival_date,day,month,year,week,weekday
0,20566.0,2016-04-22,22,4,2016,16,6
1,20567.0,2016-04-23,23,4,2016,16,7
2,20551.0,2016-04-07,7,4,2016,14,5
3,20572.0,2016-04-28,28,4,2016,17,5
4,20550.0,2016-04-06,6,4,2016,14,4
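
The `arrdate` values are SAS dates, which count days from 1960-01-01. The project's own conversion helper lives in `util` and is not shown here, so the following is only a sketch of the arithmetic; `sas_to_date` is a hypothetical name.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS date values count days from this day


def sas_to_date(sas_days):
    """Convert a SAS numeric date (days since 1960-01-01) to a datetime.date."""
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


print(sas_to_date(20566.0))  # 2016-04-22, matching the first row of the time table
```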


In [44]:
status = spark.read.parquet(output_path+"status")
status.toPandas().head()

Unnamed: 0,status_flag_id,arrival_flag,departure_flag,match_flag
0,8,O,O,M
1,35,A,D,M
2,101,Z,O,M
3,45,G,I,M
4,15,P,D,M


In [45]:
visa = spark.read.parquet(output_path+"visa")
visa.toPandas().head()

Unnamed: 0,visa_id,i94visa,visatype,visapost
0,32,2.0,B2,CPT
1,78,2.0,B2,BGT
2,223,2.0,B2,KEV
3,425,2.0,B2,ABD
4,154,2.0,B2,BRA


In [46]:
temperature = spark.read.parquet(output_path+"temperature")
temperature.toPandas().head()

Unnamed: 0,temperature_id,country,average_temperature,average_temperature_uncertainty
0,1675037245440,United Kingdom,10.1,0.3
1,1675037245441,Moldova,10.1,0.4
2,1675037245442,Congo (Democratic Republic Of The),23.9,0.6
3,1649267441664,Bosnia And Herzegovina,11.6,0.4
4,1236950581248,United Arab Emirates,28.0,0.4


In [47]:
country = spark.read.parquet(output_path+"country")
country.toPandas().head()

Unnamed: 0,code,country
0,236,AFGHANISTAN
1,101,ALBANIA
2,316,ALGERIA
3,102,ANDORRA
4,324,ANGOLA


In [48]:
state = spark.read.parquet(output_path+"state")
state.toPandas().head()

Unnamed: 0,state_code,state,median_age,total_population,male_population,female_population,foreign_born,average_household_size
0,DC,District of Columbia,34.0,3361140,1598525.0,1762615.0,475585.0,11.2
1,AZ,Arizona,35.0,22497710,11137275.0,11360435.0,3411565.0,221.950004
2,MO,Missouri,35.0,7595970,3666310.0,3929660.0,495475.0,108.449998
3,KS,Kansas,35.0,5741370,2820725.0,2920645.0,593225.0,90.700001
4,AR,Arkansas,33.0,2882889,1400724.0,1482165.0,307753.0,73.279999
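
The demographics source has one row per city and race, with the city-level columns repeated on every row, so summing `total_population` across all rows counts each city once per race. The District of Columbia total of 3,361,140 equals 5 × 672,228, which would be consistent with a single city listed under five race categories. Likewise, `average_household_size` is a sum of per-row averages (221.95 for Arizona) rather than an average. A minimal sketch of counting each city once is given below; the function name and the sample rows are made up for illustration.

```python
from collections import defaultdict


def population_by_state(rows):
    """Sum total_population per state, counting each (state, city) pair once."""
    seen = set()
    totals = defaultdict(int)
    for row in rows:
        key = (row["state_code"], row["city"])
        if key in seen:  # same city repeated for another race
            continue
        seen.add(key)
        totals[row["state_code"]] += row["total_population"]
    return dict(totals)


# Made-up rows: city A appears once per race, as in the source file layout.
rows = [
    {"state_code": "XX", "city": "A", "race": "White", "total_population": 100},
    {"state_code": "XX", "city": "A", "race": "Asian", "total_population": 100},
    {"state_code": "XX", "city": "B", "race": "White", "total_population": 50},
]
print(population_by_state(rows))
```

In Spark, calling `dropDuplicates(["State Code", "City"])` on the demographics data frame before the `groupBy` could play the same role, assuming the `City` column is kept until that point.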


In [49]:
country_temperature = spark.read.parquet(output_path+"country_temperature_mapping")
country_temperature.toPandas().head()

Unnamed: 0,code,country,temperature_id,average_temperature,average_temperature_uncertainty
0,383.0,CENTRAL AFRICAN REPUBLIC,773094100000.0,25.7,0.6
1,,,575525600000.0,26.2,0.3
2,104.0,BELGIUM,386547100000.0,10.9,0.2
3,688.0,BOLIVIA,644245100000.0,11.8,0.7
4,240.0,EAST TIMOR,,,


In [50]:
immigration = spark.read.parquet(output_path+"immigration")
immigration.toPandas().head()

Unnamed: 0,cicid,i94res,depdate,i94mode,i94port,i94cit,i94addr,airline,fltno,ident,code,temperature_id,status_flag_id,visa_id,state_code,country,arrdate
0,4084316.0,209.0,20573.0,1.0,HHW,209.0,HI,JL,782,,209,1417339000000.0,0,0,HI,JAPAN,20566.0
1,4084316.0,209.0,20573.0,1.0,HHW,209.0,HI,JL,782,,209,1417339000000.0,0,0,HI,JAPAN,20566.0
2,4084316.0,209.0,20573.0,1.0,HHW,209.0,HI,JL,782,,209,1417339000000.0,0,0,HI,JAPAN,20566.0
3,4084316.0,209.0,20573.0,1.0,HHW,209.0,HI,JL,782,,209,1417339000000.0,0,0,HI,JAPAN,20566.0
4,4084316.0,209.0,20573.0,1.0,HHW,209.0,HI,JL,782,,209,1417339000000.0,0,0,HI,JAPAN,20566.0
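
The preview above shows the same `cicid` five times, and the fact table holds 31425 records although only 951 cleaned rows were staged. This suggests that one or more joins inside `helper.create_immigration` (not shown here) multiply rows; that reading is an assumption. A unique-key check along the following lines would flag the problem; `duplicated_keys` is a hypothetical helper.

```python
from collections import Counter


def duplicated_keys(keys):
    """Return {key: occurrences} for every key that appears more than once."""
    return {key: n for key, n in Counter(keys).items() if n > 1}


# Illustrative key column: one cicid repeated five times, one unique.
cicids = [4084316.0] * 5 + [4422636.0]
duplicates = duplicated_keys(cicids)
print(duplicates)
```

On a Spark data frame, a similar result could be obtained with `immigration.groupBy("cicid").count().filter("count > 1")`.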


##### 2. Record count check

_In this section, a generic record count is done for each table_

In [51]:
util.run_record_count_check(airport, "airport")
util.run_record_count_check(time, "time")
util.run_record_count_check(status, "status")
util.run_record_count_check(visa, "visa")
util.run_record_count_check(state, "state")
util.run_record_count_check(temperature, "temperature")
util.run_record_count_check(immigration, "immigration")

Record count check passed for airport table with record count: 2987 records.
Record count check passed for time table with record count: 951 records.
Record count check passed for status table with record count: 28 records.
Record count check passed for visa table with record count: 141 records.
Record count check passed for state table with record count: 48 records.
Record count check passed for temperature table with record count: 159 records.
Record count check passed for immigration table with record count: 31425 records.


0
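
The implementation of `util.run_record_count_check` is not shown in this notebook. The sketch below is one way such a check might look, assuming it prints a status line and returns `0` on success (inferred from the `0` printed above); the failure message and the `FakeTable` stand-in are illustrative.

```python
def run_record_count_check(table, table_name):
    """Return 0 if the table has rows, 1 otherwise, and print a status line."""
    record_count = table.count()  # Spark DataFrames expose count()
    if record_count == 0:
        print(f"Record count check failed for {table_name} table: zero records.")
        return 1
    print(f"Record count check passed for {table_name} table "
          f"with record count: {record_count} records.")
    return 0


class FakeTable:
    """Stand-in for a DataFrame so the sketch runs without Spark."""

    def __init__(self, n):
        self._n = n

    def count(self):
        return self._n


status_code = run_record_count_check(FakeTable(2987), "airport")
```

Note that a non-zero count alone does not guarantee completeness; comparing the count against the staged source rows gives a stronger check.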

##### 3. Custom data check
_In this section, results for custom queries are printed_

* Get total count of immigrants for each country

In [52]:
imm_df = immigration.toPandas()
imm_df = imm_df.groupby(['country'])['country'].count()
imm_df.head()

country
ANGOLA               35
ANTIGUA-BARBUDA      29
ARGENTINA           685
AUSTRALIA          1503
AUSTRIA             179
Name: country, dtype: int64

#### Data Dictionary

__Please refer to the Data Dictionary.ipynb file.__

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

##### 5.1 Rationale for the choice of tools and technologies for the project.

* Apache Spark is used for its in-memory caching and parallel processing.
* Pandas data frames are used for convenient data exploration and manipulation.

##### 5.2 How often the data should be updated/refreshed, why?

* From the project's perspective, this depends on how often the immigration data set is updated, since the data in the other dimension tables depends on the immigration data. If the immigration data is updated monthly, then all other data should be refreshed monthly as well.

##### 5.3 Approach in different scenarios

###### a. If the data is increased by 100x
* I would use services such as Amazon Redshift or an Amazon EMR cluster to handle data of that magnitude.

###### b. If the data populates a dashboard that needs to be updated by 7am daily
* In this case, I would use an Apache Airflow DAG to schedule and manage the data pipeline so that it completes before 7am each day.

###### c. If 100+ people are going to access the system
* In this case, I would use a tried and tested setup such as Amazon Redshift, Snowflake, or a similar service known to handle high database traffic.