# US Immigration Data Engineering Project
### Data Engineering Capstone Project

#### Project Summary
The purpose of this project is to build a data lake for the Analytics team so that they can carry out their data analysis tasks quickly and efficiently. The data used in this project comes from a variety of sources. Four main datasets are used:
*  I94 Immigration Data
*  World Temperature Data
*  U.S. City Demographic Data
*  Airport Code Data

A detailed project overview can be found in Step 1.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [160]:
# Importing all the necessary libraries
import pandas as pd
from pyspark.sql import SparkSession
from datetime import datetime,timedelta
from pyspark.sql.functions import from_unixtime, unix_timestamp, to_date, expr,\
                                  date_add,udf,col,avg,mean,year,month,split
from pyspark.sql.types import StringType, DateType
import csv

### Step 1: Scope the Project and Gather Data

#### 1.1 Scope: 
**Why use a data lake instead of a data warehouse?**
> *The idea is to use the raw data files to build refined data lake tables. A data lake offers more flexibility than a data warehouse in terms of data availability, storage and data management. It presents the data in a more manageable form without modifying the raw data too much, which gives the analytics team the flexibility to mold the data to their own requirements.*  

**What does the end solution look like?**
> *The final product is a data lake of Parquet files containing the following tables:*
> 1. Population Table
    * Columns: city, state_code, state, median_age, male, female, total, avg_household_size.
> 2. Airport Table
    * Columns: type, name, elevation_ft, continent, iso_country, iso_region, municipality, 
                  gps_code, iata_code, local_code, coordinates.
> 3. Temperature Table
    * Columns: month, city, avg_temp, avg_temp_uncertainty.
> 4. Immigration Table
    * Columns: i94port, i94mon, i94yr, i94addr, i94bir, count.    

**What tools are used?**
> *I used the following tools in this project:*
  * Apache Spark
  * PySpark


#### 1.2 Describe and Gather Data 
This section describes the datasets used in the project: where each one comes from and what type of information it contains.

The four datasets are:
*  I94 Immigration Data
*  World Temperature Data
*  U.S. City Demographic Data
*  Airport Code Table

Let's take a detailed look at each dataset:
*  **I94 Immigration Data:** This data comes from the US National Tourism and Trade Office. It contains international visitor arrival statistics by world regions and selected countries, type of visa, mode of transportation, age groups, states visited (first intended address only), and the top ports of entry. Details of the columns like their names and meanings can be seen in the data dictionary included at the end of this document.

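*  **World Temperature Data:**  This dataset comes from Kaggle. It contains monthly average land temperatures for cities around the world, with records up to 2013 (the first record in the file, shown below, dates from 1743); only the United States data is used in this project. A detailed data dictionary is provided at the end of this document.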

* **U.S. City Demographic Data:** This data comes from Opensoft and is derived from the US Census Bureau's 2015 American Community Survey. It contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000. A detailed data dictionary is provided at the end of this document.

* **Airport Code Table:** This is a simple table of airport codes and corresponding cities. The airport codes may refer either to the IATA airport code, a three-letter code used in passenger reservation, ticketing and baggage-handling systems, or to the ICAO airport code, a four-letter code used by ATC systems and for airports that do not have an IATA airport code. A detailed data dictionary is provided at the end of this document.

##### 1.2.1 Importing Immigration dataset:

In [84]:
# Creating Spark Session
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

In [85]:
# Checking spark version for compatibility issue
print("Spark Version is :: {}".format(spark.version))

Spark Version is :: 2.4.3


In [86]:
%%time
# Loading immigration dataset
# The Jun'16 file is loaded first: it has extra columns (validres, delete_*) that the
# other monthly files lack, so they are dropped before the other files are appended
df_immig_merged =spark.read.format('com.github.saurfang.sas.spark').option("inferSchema", "true").\
                 option("dateFormat", "yyyyMMdd").\
                 load('../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat')
df_immig_merged=df_immig_merged.drop('validres','delete_days','delete_mexl','delete_dup','delete_recdup','delete_visa')
# Appending the remaining monthly files of 2016; union() matches columns by position
files=['i94_jan16_sub','i94_feb16_sub','i94_mar16_sub','i94_apr16_sub','i94_may16_sub','i94_jul16_sub','i94_aug16_sub',\
      'i94_sep16_sub','i94_oct16_sub','i94_nov16_sub','i94_dec16_sub']
for file in files:
    path='../../data/18-83510-I94-Data-2016/{}.{}'.format(file,'sas7bdat')
    df_immig =spark.read.format('com.github.saurfang.sas.spark').option("inferSchema", "true").\
              option("dateFormat", "yyyyMMdd").load(path)
    df_immig_merged = df_immig_merged.union(df_immig)

CPU times: user 20.5 ms, sys: 5 ms, total: 25.5 ms
Wall time: 837 ms
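`union()` matches columns by position rather than by name, so the loop above relies on every monthly file having the same column order as the trimmed June file. The pure-Python sketch below shows one way to check this before each union; the helper `union_mismatches` is hypothetical, and in the notebook its inputs would be the `.columns` lists of `df_immig_merged` and `df_immig`. Spark 2.3 and later also provide `DataFrame.unionByName`, which matches columns by name instead.

```python
from typing import List, Tuple


def union_mismatches(base_cols: List[str], other_cols: List[str]) -> List[Tuple[int, str, str]]:
    """Return (position, base_name, other_name) wherever two column lists disagree.

    An empty result means a positional union() lines the columns up correctly.
    """
    mismatches = []
    # Compare position by position; the shorter list is padded so extra columns are reported
    for i in range(max(len(base_cols), len(other_cols))):
        a = base_cols[i] if i < len(base_cols) else "<missing>"
        b = other_cols[i] if i < len(other_cols) else "<missing>"
        if a != b:
            mismatches.append((i, a, b))
    return mismatches


# Illustrative column lists (hypothetical usage in the loop:
# assert not union_mismatches(df_immig_merged.columns, df_immig.columns), file)
print(union_mismatches(["cicid", "i94yr"], ["cicid", "i94yr"]))
print(union_mismatches(["cicid", "i94yr"], ["cicid", "i94mon", "validres"]))
```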


In [87]:
# Inspecting a few records
df_immig_merged.take(1)

[Row(cicid=4.0, i94yr=2016.0, i94mon=6.0, i94cit=135.0, i94res=135.0, i94port='XXX', arrdate=20612.0, i94mode=None, i94addr=None, depdate=None, i94bir=59.0, i94visa=2.0, count=1.0, dtadfile=None, visapost=None, occup=None, entdepa='Z', entdepd=None, entdepu='U', matflag=None, biryear=1957.0, dtaddto='10032016', gender=None, insnum=None, airline=None, admnum=14938462027.0, fltno=None, visatype='WT')]
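The `arrdate` value in the record above (20612.0) is a SAS numeric date, i.e. the number of days since 1960-01-01, and `dtaddto` ('10032016') appears to be an MMDDYYYY string. This notebook keeps both in their raw form; a minimal pure-Python sketch of the conversion, assuming those two encodings, is:

```python
from datetime import date, datetime, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS stores dates as a count of days since this day


def sas_to_date(days):
    """Convert a SAS numeric date (possibly a float such as 20612.0) to a date."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))


def mmddyyyy_to_date(text):
    """Parse a string such as '10032016' (assumed to be MMDDYYYY) into a date."""
    if not text:
        return None
    return datetime.strptime(text, "%m%d%Y").date()


print(sas_to_date(20612.0))          # 2016-06-07
print(mmddyyyy_to_date("10032016"))  # 2016-10-03
```

Inside Spark, the same shift could be expressed without a Python UDF through a SQL expression such as `expr("date_add(to_date('1960-01-01'), cast(arrdate as int))")`.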

##### 1.2.2 Importing world_temperature dataset:

In [88]:
%%time
# Importing world temperature dataset
df_temp=spark.read.format('csv').option('header','True').\
load('../../data2/GlobalLandTemperaturesByCity.csv')

CPU times: user 2.39 ms, sys: 0 ns, total: 2.39 ms
Wall time: 442 ms


In [89]:
# Inspecting a few records
df_temp.show(1)

+----------+------------------+-----------------------------+-----+-------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 1 row



##### 1.2.3 Importing airport_code dataset:

In [90]:
%%time
# Importing airport code dataset
df_port=spark.read.format('csv').option('header','True').load('airport-codes_csv.csv')

CPU times: user 2.6 ms, sys: 0 ns, total: 2.6 ms
Wall time: 351 ms


In [91]:
# Inspecting a few records
df_port.take(1)

[Row(ident='00A', type='heliport', name='Total Rf Heliport', elevation_ft='11', continent='NA', iso_country='US', iso_region='US-PA', municipality='Bensalem', gps_code='00A', iata_code=None, local_code='00A', coordinates='-74.93360137939453, 40.07080078125')]

##### 1.2.4 Importing us_cities_demographics dataset:

In [92]:
# Importing US cities demographic dataset
df_demog=spark.read.format('csv').option('header','True').option('delimiter',';').\
         load('us-cities-demographics.csv')

In [93]:
# Inspecting a few records
df_demog.take(1)

[Row(City='Silver Spring', State='Maryland', Median Age='33.8', Male Population='40601', Female Population='41862', Total Population='82463', Number of Veterans='1562', Foreign-born='30908', Average Household Size='2.6', State Code='MD', Race='Hispanic or Latino', Count='25924')]

### Step 2: Explore and Assess the Data




#### 2.1 Explore the Data 
*In this section, data anomalies such as missing values and duplicate records are identified in each dataset, one dataset at a time; the issues found are fixed in the cleaning step.*

##### 2.1.1 Exploring Immigration dataset:
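The exploration cells check shapes, schemas and duplicates but do not count missing values explicitly. The sketch below computes the fraction of missing values per column in plain Python, with a list of dicts standing in for DataFrame rows; the first sample row mirrors the record inspected in 1.2.1 and the second is illustrative. In PySpark the same per-column count is commonly written with `pyspark.sql.functions` as `count(when(col(c).isNull(), c))` inside a single `select`.

```python
def null_ratios(rows, columns):
    """Return {column: fraction of rows where the value is None} (a stand-in for null)."""
    total = len(rows)
    ratios = {}
    for c in columns:
        missing = sum(1 for r in rows if r.get(c) is None)
        ratios[c] = missing / total if total else 0.0
    return ratios


sample = [
    {"i94port": "XXX", "i94mode": None, "i94addr": None},  # like the record in 1.2.1
    {"i94port": "NYC", "i94mode": 1, "i94addr": "NY"},     # illustrative
]
print(null_ratios(sample, ["i94port", "i94mode", "i94addr"]))
# {'i94port': 0.0, 'i94mode': 0.5, 'i94addr': 0.5}
```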

In [94]:
# Printing shape of the dataset
print("Number of Columns: {}".format(len(df_immig_merged.columns)))
print("Number of Rows: {}".format(df_immig_merged.count()))

Number of Columns: 28
Number of Rows: 40790529


In [95]:
# Printing schema of the dataset
df_immig_merged.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

##### 2.1.2 Exploring world_temperature dataset:

In [96]:
# Printing Shape of the Dataset
print("Number of Columns: {}".format(len(df_temp.columns)))
print("Number of Rows: {}".format(df_temp.count()))

Number of Columns: 7
Number of Rows: 8599212


In [97]:
# Printing schema of the dataset
df_temp.printSchema()

root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- AverageTemperatureUncertainty: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [159]:
# Dropping Duplicates Values
df_temp=df_temp.drop_duplicates()

##### 2.1.3 Exploring airport_code dataset:

In [101]:
# Printing Schema of the dataset
df_port.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



In [102]:
# Dropping Duplicates Values
df_port=df_port.drop_duplicates()

##### 2.1.4 Exploring us_cities_demographics dataset:

In [104]:
# Printing schema of the dataset
df_demog.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



In [105]:
# Dropping duplicate values
df_demog=df_demog.drop_duplicates()

#### 2.2 Cleaning Steps
In this section I discuss the steps taken to clean the raw data, aggregate it and make it ready for the data pipeline step. 

##### 2.2.1 Cleaning Immigration dataset:
*The following steps clean the immigration dataset, with explanatory remarks:*

In [106]:
# Correcting data types: the SAS reader loads numeric columns as doubles
int_cols = ['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94mode', 'i94bir',
            'i94visa', 'biryear', 'count', 'arrdate', 'depdate']
for c in int_cols:
    df_immig_merged = df_immig_merged.withColumn(c, df_immig_merged[c].cast('integer'))
# admnum values such as 14938462027 exceed the 32-bit integer range, so cast to long
df_immig_merged = df_immig_merged.withColumn("admnum", df_immig_merged["admnum"].cast('long'))

In [107]:
# Keeping only arrivals by air (i94mode=1); sea (2), land (3), not reported (9) and missing modes are removed
df_immig_merged=df_immig_merged.filter('i94mode=1')
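In the I94 label descriptions, `i94mode` is coded as 1 = Air, 2 = Sea, 3 = Land and 9 = Not reported, so this filter keeps air arrivals only. Because a SQL comparison with null is not true, rows with a missing mode (such as the record inspected in 1.2.1) are dropped as well. A pure-Python sketch of the same filter, with illustrative rows:

```python
# Mode codes from the I94 label descriptions, kept here for reference
I94_MODES = {1: "Air", 2: "Sea", 3: "Land", 9: "Not reported"}


def keep_air_arrivals(rows):
    """Mirror filter('i94mode=1'): rows with any other mode, or with a missing
    mode (None, i.e. null in Spark), are dropped."""
    return [r for r in rows if r.get("i94mode") == 1]


# Illustrative rows; cicid 4 mirrors the record inspected in 1.2.1 (mode missing)
rows = [{"cicid": 4, "i94mode": None}, {"cicid": 5, "i94mode": 1}, {"cicid": 6, "i94mode": 3}]
print([r["cicid"] for r in keep_air_arrivals(rows)])  # [5]
```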

In [108]:
# Aggregating data by airport by month
df_immig_agg=df_immig_merged.groupBy('i94port','i94mon').agg({'i94yr':'first','i94addr':'first',\
                                        'i94bir':'avg','count':'sum'})

In [109]:
# Renaming columns' names
df_immig_agg=df_immig_agg.select(col("i94port").alias("i94port"),col("i94mon").alias("i94mon"),
                col("sum(count)").alias("count"),col("first(i94addr)").alias("i94addr"),\
                col("avg(i94bir)").alias("i94bir"),col("first(i94yr)").alias("i94yr"))
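Passing a dict to `agg()` names each output column after its function, e.g. `sum(count)` or `avg(i94bir)`, which is why the cell above renames them. The pure-Python sketch below mirrors what is computed for each (`i94port`, `i94mon`) group; the rows are illustrative. Note that Spark documents `first` as non-deterministic after a shuffle, so the `i94addr` kept for a group is one state from that group rather than a representative one.

```python
def aggregate_by_port_month(rows):
    """Group rows by (i94port, i94mon) like the groupBy/agg cell:
    sum(count), avg(i94bir) and first(i94addr). None values are skipped by
    sum and avg, as in Spark, while first keeps whatever value comes first."""
    groups = {}  # insertion-ordered in Python 3.7+
    for r in rows:
        key = (r["i94port"], r["i94mon"])
        if key not in groups:
            groups[key] = {"count": None, "ages": [], "i94addr": r["i94addr"]}
        g = groups[key]
        if r["count"] is not None:
            g["count"] = (g["count"] or 0) + r["count"]
        if r["i94bir"] is not None:
            g["ages"].append(r["i94bir"])
    result = []
    for (port, mon), g in groups.items():
        avg_age = sum(g["ages"]) / len(g["ages"]) if g["ages"] else None
        result.append({"i94port": port, "i94mon": mon, "count": g["count"],
                       "i94addr": g["i94addr"], "i94bir": avg_age})
    return result


# Illustrative rows (not taken from the dataset)
rows = [
    {"i94port": "NYC", "i94mon": 6, "count": 1, "i94bir": 30, "i94addr": "NY"},
    {"i94port": "NYC", "i94mon": 6, "count": 1, "i94bir": 40, "i94addr": "NJ"},
    {"i94port": "LOS", "i94mon": 6, "count": 1, "i94bir": None, "i94addr": None},
]
for row in aggregate_by_port_month(rows):
    print(row)
```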

In [110]:
# Printing dataset size
print("Number of Columns: {}".format(len(df_immig_agg.columns)))
print("Number of Rows: {}".format(df_immig_agg.count()))

Number of Columns: 6
Number of Rows: 2273


In [111]:
# Creating a dictionary that maps airport (port) codes to airport/city names
port_to_city = {}
with open('city_code.csv') as f:
    reader = csv.DictReader(f, delimiter=',')
    for line in reader:
        port_to_city[line['code']] = line['city_name']

In [112]:
# Converting airport code into airport name/city
convert_udf = udf(lambda code: port_to_city[code], StringType())
df_immig_agg = df_immig_agg.withColumn('i94port', convert_udf('i94port'))

In [113]:
# Inspecting a few records
df_immig_agg.show(5)

+-------------+------+-----+-------+------------------+-----+
|      i94port|i94mon|count|i94addr|            i94bir|i94yr|
+-------------+------+-----+-------+------------------+-----+
|    Champlain|     2|   89|     NY|30.103448275862068| 2016|
|Christiansted|     3|  276|   null|              43.0| 2016|
|    Cleveland|     5|  327|     OH|44.764525993883794| 2016|
|       Calais|    10|    1|     ME|              61.0| 2016|
|    Charlotte|     4|16085|     FL| 43.73229717127759| 2016|
+-------------+------+-----+-------+------------------+-----+
only showing top 5 rows
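The lookup indexes the dictionary directly, so any port code missing from `city_code.csv` would raise a `KeyError` inside a Spark task and fail the job. A hedged pure-Python sketch of a lookup with an explicit fallback follows. The CSV columns `code` and `city_name` come from the cell above, but the sample contents are illustrative, and the `'other'` default is an assumption (the value `other` does appear as a port in the immigration table read back in 4.2.4).

```python
import csv
import io


def load_port_map(fileobj):
    """Build {code: city_name} from a CSV with 'code' and 'city_name' headers."""
    return {row["code"]: row["city_name"] for row in csv.DictReader(fileobj, delimiter=",")}


def port_to_city(code, port_map, default="other"):
    """Map a port code to a city name, falling back to `default` instead of raising KeyError."""
    return port_map.get(code, default)


# Illustrative stand-in for city_code.csv (its real contents are not shown in the notebook)
sample_csv = io.StringIO("code,city_name\nNYC,New York\nCHM,Champlain\n")
port_map = load_port_map(sample_csv)
print(port_to_city("NYC", port_map))  # New York
print(port_to_city("ZZZ", port_map))  # other
```

With Spark, the same function could be wrapped as `udf(lambda c: port_to_city(c, port_map), StringType())`.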



##### 2.2.2 Cleaning world_temperature dataset:
*The following steps clean the temperature dataset, with explanatory remarks:*

In [114]:
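# Correcting data types
df_temp=df_temp.withColumn("dt", df_temp["dt"].cast('date'))
df_temp=df_temp.withColumn("AverageTemperature", df_temp["AverageTemperature"].cast('float'))
df_temp=df_temp.withColumn("AverageTemperatureUncertainty", df_temp["AverageTemperatureUncertainty"].cast('float'))
# Latitude/Longitude are strings such as '57.05N' and '10.33E', which a plain cast turns into null:
# extract the numeric part and make southern and western values negative
from pyspark.sql.functions import regexp_extract, when
for c, negative_suffix in [("Latitude", "S"), ("Longitude", "W")]:
    value = regexp_extract(col(c), r'^([0-9.]+)', 1).cast('float')
    df_temp = df_temp.withColumn(c, when(col(c).endswith(negative_suffix), -value).otherwise(value))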

In [115]:
# Printing Schema
df_temp.printSchema()

root
 |-- dt: date (nullable = true)
 |-- AverageTemperature: float (nullable = true)
 |-- AverageTemperatureUncertainty: float (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: float (nullable = true)
 |-- Longitude: float (nullable = true)
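The sample record in 1.2.2 stores coordinates as strings with a hemisphere suffix (`57.05N`, `10.33E`), so casting them directly to `float` is expected to produce nulls. A pure-Python sketch of parsing such values, assuming N and E are positive and S and W negative:

```python
def parse_coordinate(text):
    """Convert '57.05N' / '10.33W' style strings to signed floats; None if malformed."""
    if not text:
        return None
    value, hemisphere = text[:-1], text[-1].upper()
    if hemisphere not in "NSEW":
        return None
    try:
        number = float(value)
    except ValueError:
        return None
    # Southern latitudes and western longitudes are negative
    return -number if hemisphere in "SW" else number


print(parse_coordinate("57.05N"))  # 57.05
print(parse_coordinate("10.33W"))  # -10.33
print(parse_coordinate("bad"))     # None
```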



In [116]:
# Filtering data of US only
df_temp=df_temp.filter('Country="United States"')

In [117]:
# Renaming columns' names
df_temp=df_temp.select(col("dt").alias("date"),year("dt").alias("year"),month("dt").alias("month"),\
                col("AverageTemperature").alias("avg_temp"),col("AverageTemperatureUncertainty").\
                alias("avg_temp_uncertainty"),col("City").alias("city"),col("Latitude").alias("latitude"),\
                col("Longitude").alias("longitude"))

In [118]:
# Aggregating data by city by month
df_temp_agg=df_temp.groupBy('city','month').agg({'avg_temp':'avg','avg_temp_uncertainty':'avg'})

In [119]:
# Renaming columns' names
df_temp_agg=df_temp_agg.select(col("city").alias("city"),col("month").alias("month"),
                col("avg(avg_temp)").alias("avg_temp"),\
                col("avg(avg_temp_uncertainty)").alias("avg_temp_uncertainty"))

In [120]:
# Printing Shape of the Dataset
print("Number of Columns: {}".format(len(df_temp_agg.columns)))
print("Number of Rows: {}".format(df_temp_agg.count()))

Number of Columns: 4
Number of Rows: 2976


In [121]:
# Inspecting a few records
df_temp_agg.filter('city="New York"').show(5)

+--------+-----+------------------+--------------------+
|    city|month|          avg_temp|avg_temp_uncertainty|
+--------+-----+------------------+--------------------+
|New York|    8| 21.30470931622409|   1.154484500401011|
|New York|    5|14.827175792306662|    1.16700780892279|
|New York|    4| 8.641388474060939|   1.275153846007127|
|New York|   11|4.5656346096442295|  1.3679269235008038|
|New York|    6|19.867753850496733|   1.256369226339918|
+--------+-----+------------------+--------------------+
only showing top 5 rows



##### 2.2.3 Cleaning airport_code dataset:
*The following steps clean the airport dataset, with explanatory remarks:*

In [122]:
# Printing dataset's schema
df_port.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



In [123]:
# Correcting data types
df_port=df_port.withColumn("elevation_ft", df_port["elevation_ft"].cast('float'))

In [124]:
# Keeping only airports, i.e. types 'small_airport', 'medium_airport' and 'large_airport'
df_port=df_port.where(col("type").isin({"small_airport", "medium_airport","large_airport"}))

In [125]:
# Removing extraneous columns
df_port=df_port.drop('ident')

In [126]:
# Dropping duplicates rows
df_port=df_port.drop_duplicates()

In [127]:
# Splitting column 'iso_region' to extract state code
df_port = df_port.withColumn('iso_region', split(df_port['iso_region'], '-').getItem(1))
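`iso_region` values have the form `<country>-<region>` (`US-PA` in the sample record), so `getItem(1)` keeps the part after the hyphen. No filter on `iso_country` is applied, so for any non-US airports this is a foreign region code rather than a US state code. A pure-Python sketch of the same split, returning `None` when there is no hyphen (Spark's `getItem(1)` yields null in that case); the `GB-ENG` value is illustrative:

```python
def region_code(iso_region):
    """Return the part after the first '-' in an ISO region such as 'US-PA'."""
    if iso_region is None:
        return None
    parts = iso_region.split("-")
    return parts[1] if len(parts) > 1 else None


print(region_code("US-PA"))   # PA
print(region_code("GB-ENG"))  # ENG
print(region_code("US"))      # None
```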

In [128]:
# Inspecting a few records
df_port.show(1)

+-------------+----------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|         type|            name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-------------+----------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|small_airport|The Farm Airport|       375.0|       NA|         US|        GA|Wrightsville|    01GE|     null|      01GE|-82.7711029052734...|
+-------------+----------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
only showing top 1 row



In [129]:
# Printing Shape of the Dataset
print("Number of Columns: {}".format(len(df_port.columns)))
print("Number of Rows: {}".format(df_port.count()))

Number of Columns: 11
Number of Rows: 39132


##### 2.2.4 Cleaning us_demographic dataset:
*The following steps clean the us_demographic dataset, with explanatory remarks:*

In [130]:
# Dropping extraneous columns
df_demog=df_demog.drop('Number of Veterans','Foreign-born','Race','Count')

In [131]:
# Correcting data types
df_demog=df_demog.withColumn("Median Age", df_demog["Median Age"].cast('float'))
df_demog=df_demog.withColumn("Male Population", df_demog["Male Population"].cast('integer'))
df_demog=df_demog.withColumn("Female Population", df_demog["Female Population"].cast('integer'))
df_demog=df_demog.withColumn("Total Population", df_demog["Total Population"].cast('integer'))
df_demog=df_demog.withColumn("Average Household Size", df_demog["Average Household Size"].cast('float'))

In [132]:
# Renaming columns
df_demog=df_demog.select(col("Median Age").alias("median_age"),col("Male Population").alias("male"),\
                col("Female Population").alias("female"),col("Total Population").alias("total"),\
                col("Average Household Size").alias("avg_household_size"),\
                col("City").alias("city"),col("State").alias("state"),\
                col("State Code").alias("state_code"))

In [133]:
# Dropping Duplicate rows
df_demog=df_demog.drop_duplicates()
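The demographics file appears to hold one row per city and `Race` value, with the city-level figures repeated on each row (the sample record has `Race='Hispanic or Latino'` and its own `Count`). Dropping `Race` and `Count` and then removing duplicates should therefore leave one row per city and state. A pure-Python sketch of that drop-then-deduplicate step; the second sample row is hypothetical:

```python
def dedupe_after_drop(rows, drop):
    """Remove the keys in `drop` from each row, then keep the first occurrence of
    each remaining combination, like DataFrame.drop(...).drop_duplicates()."""
    seen, result = set(), []
    for r in rows:
        trimmed = {k: v for k, v in r.items() if k not in drop}
        key = tuple(sorted(trimmed.items()))  # keys are unique, so only names are compared
        if key not in seen:
            seen.add(key)
            result.append(trimmed)
    return result


rows = [
    {"City": "Silver Spring", "State Code": "MD", "Total Population": 82463,
     "Race": "Hispanic or Latino", "Count": 25924},
    {"City": "Silver Spring", "State Code": "MD", "Total Population": 82463,
     "Race": "Asian", "Count": 1000},  # hypothetical second race row
]
print(len(dedupe_after_drop(rows, {"Race", "Count"})))  # 1
```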

In [134]:
# Printing schema
df_demog.printSchema()

root
 |-- median_age: float (nullable = true)
 |-- male: integer (nullable = true)
 |-- female: integer (nullable = true)
 |-- total: integer (nullable = true)
 |-- avg_household_size: float (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- state_code: string (nullable = true)



In [135]:
# Aggregating US demographic data by city
df_demog_agg=df_demog.groupBy('city').agg({'median_age':'first','male':'first','female':'first',\
            'total':'first','avg_household_size':'first','state_code':'first','state':'first'})

In [136]:
# Renaming columns' names
df_demog_agg=df_demog_agg.select(col("city").alias("city"),col("first(median_age)").alias("median_age"),\
                col("first(male)").alias("male"),\
                col("first(female)").alias("female"),col("first(total)").alias("total"),\
                col("first(avg_household_size)").alias("avg_household_size"),\
                col("first(state)").alias("state"),\
                col("first(state_code)").alias("state_code"))
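This aggregation groups by `city` alone, whereas the population table in Step 4 declares `PRIMARY KEY(state_code, city)`. If the same city name occurs in two states, `first()` keeps the figures of only one of them. A pure-Python sketch that lists such ambiguous city names, with illustrative rows; grouping by `['state_code', 'city']` in Spark would keep them apart.

```python
from collections import defaultdict


def ambiguous_cities(rows):
    """Return, sorted, the city names that appear with more than one state_code."""
    states = defaultdict(set)
    for r in rows:
        states[r["city"]].add(r["state_code"])
    return sorted(city for city, codes in states.items() if len(codes) > 1)


rows = [
    {"city": "Columbia", "state_code": "SC"},
    {"city": "Columbia", "state_code": "MO"},
    {"city": "Yonkers", "state_code": "NY"},
]
print(ambiguous_cities(rows))  # ['Columbia']
```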

In [137]:
# Printing a few records for sanity check
df_demog_agg.filter('state_code="NY"').show(5)

+------------+----------+-----+------+------+------------------+--------+----------+
|        city|median_age| male|female| total|avg_household_size|   state|state_code|
+------------+----------+-----+------+------+------------------+--------+----------+
|   Brentwood|      34.2|31395| 32397| 63792|              4.98|New York|        NY|
|New Rochelle|      40.6|38871| 40967| 79838|              2.85|New York|        NY|
| Cheektowaga|      40.7|37476| 38599| 76075|               2.3|New York|        NY|
|Mount Vernon|      38.5|31876| 36745| 68621|              2.85|New York|        NY|
|     Yonkers|      38.0|96580|104538|201118|               2.8|New York|        NY|
+------------+----------+-----+------+------+------------------+--------+----------+
only showing top 5 rows



In [None]:
# Printing shape of the dataset
print("Number of Columns: {}".format(len(df_demog_agg.columns)))
print("Number of Rows: {}".format(df_demog_agg.count()))

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
To map out the conceptual data model, I chose a star schema. Star schemas are widely used in data warehouse modeling because they are easy to understand, flexible and scalable, which are the main reasons for this choice. 
##### 3.1.1 Data modeling using Star Schema: 
The data model consists of the following dimension and fact tables:
##### Dimension Tables:
> 1. Population Table
    * Columns: city, state_code, state, median_age, male, female, total, avg_household_size.
> 2. Airport Table
    * Columns: type, name, elevation_ft, continent, iso_country, iso_region, municipality, 
                  gps_code, iata_code, local_code, coordinates.
> 3. Temperature Table
    * Columns: month, city, avg_temp, avg_temp_uncertainty.
               
##### Fact Table:
> 1. Immigration Table
    * Columns: i94port, i94mon, i94yr, i94addr, i94bir, count.

*A diagrammatic representation of the schema and its relationships (Ref: [dbdiagram.io](https://dbdiagram.io/d)):*
![Image Not Found!](Schema.png "Schema")
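To illustrate how the model might be queried, the sketch below joins immigration facts to the temperature dimension by matching `i94port` (already mapped to a city name) and `i94mon` with `city` and `month`. This join key is an assumption read from the column names, not something the text states, and the rows are illustrative. In PySpark the equivalent would be a left `join` on `(df_fact.i94port == df_temp.city) & (df_fact.i94mon == df_temp.month)`.

```python
def join_fact_with_temperature(fact_rows, temp_rows):
    """Left-join immigration facts to the temperature table on (city, month)."""
    temps = {(t["city"], t["month"]): t["avg_temp"] for t in temp_rows}
    joined = []
    for f in fact_rows:
        row = dict(f)
        # None when the port city has no temperature record for that month
        row["avg_temp"] = temps.get((f["i94port"], f["i94mon"]))
        joined.append(row)
    return joined


facts = [{"i94port": "New York", "i94mon": 8, "count": 100},
         {"i94port": "Calais", "i94mon": 10, "count": 1}]
temps = [{"city": "New York", "month": 8, "avg_temp": 21.3}]
for r in join_fact_with_temperature(facts, temps):
    print(r)
```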

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
*The following steps are involved in designing and implementing the data pipelines:*

##### 4.1.1. Implementing data pipelines for Population Table:

**Population Table Schema:**
> *CREATE TABLE IF NOT EXISTS 
      population( city VARCHAR NOT NULL, 
                  state_code VARCHAR NOT NULL,
                  state VARCHAR,
                  median_age FLOAT, 
                  male INTEGER,
                  female INTEGER,
                  total INTEGER,
                  avg_household_size FLOAT,
                  PRIMARY KEY(state_code,city)
                  )*
                          


In [139]:
# Printing dataset schema
df_demog_agg.printSchema()

root
 |-- city: string (nullable = true)
 |-- median_age: float (nullable = true)
 |-- male: integer (nullable = true)
 |-- female: integer (nullable = true)
 |-- total: integer (nullable = true)
 |-- avg_household_size: float (nullable = true)
 |-- state: string (nullable = true)
 |-- state_code: string (nullable = true)



In [140]:
# Writing the population table as Parquet files, partitioned by state and then city
df_demog_agg.write.partitionBy('state','city').option('compression','snappy').parquet("population",mode='overwrite')
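`partitionBy('state', 'city')` writes Hive-style directories, one level per partition column, and stores those columns in the directory names rather than inside the Parquet files; on read, Spark restores them from the paths, which is why partition columns appear last in the tables read back in 4.2. A pure-Python sketch of the resulting directory names (Spark additionally escapes some special characters in partition values, which is not reproduced here):

```python
import posixpath


def partition_dir(base, partitions):
    """Build a Hive-style path such as 'population/state=New York/city=Yonkers'
    from (column, value) pairs, in partitionBy order."""
    parts = ["{}={}".format(name, value) for name, value in partitions]
    return posixpath.join(base, *parts)


print(partition_dir("population", [("state", "New York"), ("city", "Yonkers")]))
# population/state=New York/city=Yonkers
```

Partitioning by a high-cardinality column such as `city` creates one directory, and at least one small file, per city; partitioning by `state` alone may be a reasonable alternative for a table of this size.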

##### 4.1.2 Implementing data pipelines for Temperature Table:

**Temperature Table Schema:**
> *CREATE TABLE IF NOT EXISTS 
      temperature(month INTEGER NOT NULL, 
                  city VARCHAR NOT NULL,
                  avg_temp FLOAT, 
                  avg_temp_uncertainty FLOAT,
                  PRIMARY KEY (month,city))*
                          


In [141]:
# Printing dataset schema
df_temp_agg.printSchema()

root
 |-- city: string (nullable = true)
 |-- month: integer (nullable = true)
 |-- avg_temp: double (nullable = true)
 |-- avg_temp_uncertainty: double (nullable = true)



In [142]:
# Writing the temperature table as Parquet files, partitioned by city and then month
df_temp_agg.write.partitionBy('city','month').option('compression','snappy').parquet("temperature",mode='overwrite')

##### 4.1.3. Implementing data pipelines for Airport Table:

**Airport Table Schema:**
> *CREATE TABLE IF NOT EXISTS 
      airport(    type VARCHAR NOT NULL,
                  name VARCHAR NOT NULL, 
                  elevation_ft FLOAT, 
                  continent VARCHAR,
                  iso_country VARCHAR,
                  iso_region VARCHAR NOT NULL,
                  municipality VARCHAR,
                  gps_code VARCHAR,
                  iata_code VARCHAR,
                  local_code VARCHAR,
                  coordinates VARCHAR,
                  PRIMARY KEY(name, iso_region))*
                          


In [143]:
# Printing dataset schema
df_port.printSchema()

root
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: float (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



In [144]:
# Writing the airport table as Parquet files, partitioned by iso_region
df_port.write.partitionBy('iso_region').option('compression','snappy').parquet("airport",mode='overwrite')

##### 4.1.4. Implementing data pipelines for Immigration Table:

**Immigration Table Schema:**
> *CREATE TABLE IF NOT EXISTS 
      i94( i94port VARCHAR NOT NULL,
           i94mon INTEGER NOT NULL,
           i94yr INTEGER NOT NULL,
           i94addr VARCHAR,
           i94bir FLOAT,
           count BIGINT,
           PRIMARY KEY(i94port, i94mon, i94yr))*

In [145]:
# Printing dataset schema
df_immig_agg.printSchema()

root
 |-- i94port: string (nullable = true)
 |-- i94mon: integer (nullable = true)
 |-- count: long (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94yr: integer (nullable = true)



In [146]:
# Writing the immigration table as Parquet files, partitioned by port (i94port) and then month
df_immig_agg.write.partitionBy('i94port','i94mon').option('compression','snappy').parquet("immigration",mode='overwrite')

#### 4.2 Data Quality Checks
Data quality checks help ensure that the pipeline ran as expected. Checks of this kind include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/count checks to ensure completeness

In this project, each table is read back from its Parquet files and its row and column counts are compared with those of the DataFrame before it was written (a source/count check).

##### 4.2.1. Running quality checks for Population Table:

In [147]:
# Reading population table which is in parquet file format 
df_pop=spark.read.option('compression','snappy').parquet("population")

In [148]:
# Printing number of records and columns in the population table
print("Number of Columns: {}".format(len(df_pop.columns)))
print("Number of Rows: {}".format(df_pop.count()))

Number of Columns: 8
Number of Rows: 567
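The read-back counts are compared by eye with those printed before writing (e.g. 2976 rows for the temperature table and 39132 for the airport table). A small pure-Python helper, hypothetical and not part of the original notebook, can make that comparison fail loudly; its inputs would be the `.count()` results before writing and after reading back.

```python
def count_problems(expected, actual):
    """Compare row counts before writing (`expected`) with counts read back
    (`actual`); return a list of human-readable problems (empty if all match)."""
    problems = []
    for table, n_expected in expected.items():
        n_actual = actual.get(table)
        if not n_actual:
            problems.append("{}: table is empty or was not read back".format(table))
        elif n_actual != n_expected:
            problems.append("{}: expected {} rows, got {}".format(table, n_expected, n_actual))
    return problems


def check_counts(expected, actual):
    """Raise ValueError if any count check fails."""
    problems = count_problems(expected, actual)
    if problems:
        raise ValueError("Row count check failed: " + "; ".join(problems))


# Counts printed in this notebook before writing and after reading back
before = {"temperature": 2976, "airport": 39132, "immigration": 2273}
after = {"temperature": 2976, "airport": 39132, "immigration": 2273}
check_counts(before, after)  # passes silently
```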


##### 4.2.2. Running quality checks for Temperature Table:

In [149]:
# Reading temperature table which is in parquet file format 
df_temp=spark.read.option('compression','snappy').parquet("temperature")

In [150]:
# Printing row and column counts of the table read back from Parquet
print("Number of Columns: {}".format(len(df_temp.columns)))
print("Number of Rows: {}".format(df_temp.count()))

Number of Columns: 4
Number of Rows: 2976


In [151]:
df_temp.show(5)

+------------------+--------------------+--------+-----+
|          avg_temp|avg_temp_uncertainty|    city|month|
+------------------+--------------------+--------+-----+
|6.4173079751743565|  1.2440000039599695| Buffalo|    4|
|11.944357548337994|  0.5757030320890022| Antioch|    3|
|12.716219483352289|  0.7552743885756993| Norwalk|   11|
|14.243999977701717|  0.8946804154241822| Killeen|    3|
| 7.688056985948988|  0.9391036319686341|Mesquite|   12|
+------------------+--------------------+--------+-----+
only showing top 5 rows



##### 4.2.3. Running quality checks for Airport Table:

In [152]:
# Reading airport table which is in parquet file format 
df_port=spark.read.option('compression','snappy').parquet("airport")

In [153]:
# Printing row and column counts of the table read back from Parquet
print("Number of Columns: {}".format(len(df_port.columns)))
print("Number of Rows: {}".format(df_port.count()))

Number of Columns: 11
Number of Rows: 39132


In [154]:
df_port.show(5)

+-------------+--------------------+------------+---------+-----------+-------------------+--------+---------+----------+--------------------+----------+
|         type|                name|elevation_ft|continent|iso_country|       municipality|gps_code|iata_code|local_code|         coordinates|iso_region|
+-------------+--------------------+------------+---------+-----------+-------------------+--------+---------+----------+--------------------+----------+
|small_airport|Lagrone Ranch Air...|       567.0|       NA|         US|Mc Clendon-Chisholm|    19TA|     null|      19TA|-96.4169006347656...|        TX|
|small_airport|Temple Ranch Airport|       490.0|       NA|         US|              Freer|    42XS|     null|      42XS|-98.403889, 27.95...|        TX|
|small_airport|Anchorage Farm Field|       440.0|       NA|         US|          Warrenton|    56TX|     null|      56TX|-96.7593994140625...|        TX|
|small_airport|Kubecka Flying Se...|        65.0|       NA|         US|     

##### 4.2.4. Running quality checks for Immigration Table:

In [155]:
# Reading immigration table which is in parquet file format 
df_fact=spark.read.option('compression','snappy').parquet("immigration")

In [156]:
# Checking the number of columns and rows
print("Number of Columns: {}".format(len(df_fact.columns)))
print("Number of Rows: {}".format(df_fact.count()))

Number of Columns: 6
Number of Rows: 2273


In [157]:
df_fact.show(5)

+-----+-------+------------------+-----+-------+------+
|count|i94addr|            i94bir|i94yr|i94port|i94mon|
+-----+-------+------------------+-----+-------+------+
|   12|     TX|43.833333333333336| 2016|  other|     1|
|   56|     TX|27.785714285714285| 2016|  other|     1|
|    4|     PR|              54.0| 2016|  other|     5|
|   25|   null|             39.96| 2016|  other|     5|
|    1|     TX|              39.0| 2016|  other|    11|
+-----+-------+------------------+-----+-------+------+
only showing top 5 rows
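The cells above only print column and row counts, so they do not actually detect missing values or fail when something is wrong. Below is a minimal sketch of a stricter check. It is written in plain Python over a list of row dictionaries so that it runs without a Spark session. The function name `run_quality_checks` and the row format are hypothetical. In PySpark the same numbers would come from `df.count()`, `len(df.columns)` and a per-column `count(when(col(c).isNull(), c))` aggregation.

```python
from typing import Any, Dict, List


def run_quality_checks(
    rows: List[Dict[str, Any]], expected_columns: int, table: str
) -> Dict[str, int]:
    """Fail on an empty or wrongly shaped table; return null counts per column."""
    # Check 1: an empty table usually means an ETL step silently failed.
    if not rows:
        raise ValueError(f"{table}: quality check failed, table is empty")

    # Check 2: the table must have the expected number of columns.
    columns = list(rows[0].keys())
    if len(columns) != expected_columns:
        raise ValueError(
            f"{table}: expected {expected_columns} columns, found {len(columns)}"
        )

    # Check 3: count missing (None) values per column so they can be reported.
    return {c: sum(1 for r in rows if r.get(c) is None) for c in columns}


# Two rows copied from the df_fact.show(5) output above.
sample = [
    {"count": 25, "i94addr": None, "i94bir": 39.96, "i94yr": 2016, "i94port": "other", "i94mon": 5},
    {"count": 1, "i94addr": "TX", "i94bir": 39.0, "i94yr": 2016, "i94port": "other", "i94mon": 11},
]
print(run_quality_checks(sample, expected_columns=6, table="immigration"))
# {'count': 0, 'i94addr': 1, 'i94bir': 0, 'i94yr': 0, 'i94port': 0, 'i94mon': 0}
```

For the tables in this notebook, the expected column counts would be 4, 11 and 6 for the temperature, airport and immigration tables, matching the outputs above.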



#### 4.3 Data dictionary 
*A data dictionary is an important tool for understanding the underlying data: it is data about the data. Before starting work on the data model, it is essential to know the context of the data, its use case and where it came from. So, let's dive into the details of the data:*

##### 4.3.1 Immigration Table:
*The following are the fields of the immigration table:*
~~~
  * i94yr    - This field contains the year in 'YYYY' format, e.g. '2016'.
  * i94mon   - This field contains the month in numeric format ('1' to '12').
  * i94port  - This field contains the airport name (or the city name), e.g. 'New York'.
  * i94addr  - This field contains the US state code, e.g. 'NY', 'AL'.
  * i94bir   - This field contains the average age of respondents in years
               (grouped by airport and month), e.g. '44.50'.
  * count    - This field contains the number of respondents (grouped by airport and month).
~~~
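Because `i94bir` is already an average per group, rolling several rows up to a coarser level (for example, one figure per state and month) should weight each average by its `count`; a plain mean of the averages over-weights small groups. Below is a minimal plain-Python sketch using the two Texas rows for January 2016 from the `df_fact.show(5)` output. The function name `weighted_mean_age` is hypothetical. In PySpark the same idea would be `sum(col('i94bir') * col('count')) / sum(col('count'))` inside a `groupBy(...).agg(...)`.

```python
from typing import Iterable, Tuple


def weighted_mean_age(groups: Iterable[Tuple[int, float]]) -> float:
    """Combine (count, average_age) pairs into one count-weighted average age."""
    total_count = 0
    total_age = 0.0
    for count, avg_age in groups:
        total_count += count
        # count * avg_age recovers the sum of ages inside that group.
        total_age += count * avg_age
    if total_count == 0:
        raise ValueError("cannot average over zero respondents")
    return total_age / total_count


# The two TX / January 2016 rows from the df_fact.show(5) output.
tx_january = [(12, 43.833333333333336), (56, 27.785714285714285)]
print(round(weighted_mean_age(tx_january), 2))  # 30.62
# Unweighted mean of the two averages, for comparison.
print(round((43.833333333333336 + 27.785714285714285) / 2, 2))  # 35.81
```

The gap between 30.62 and 35.81 shows how much a naive average of averages can drift when group sizes differ.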

##### 4.3.2 Temperature Table:
*The following are the fields of the temperature table:*
```
* month                - This field contains the month in numeric format ('1' to '12').
* city                 - This field contains US city names, e.g. 'New York', 'Chicago'.
* avg_temp             - This field contains the average temperature in degrees Celsius
                         (summarized by city and month), e.g. '23.44'.
* avg_temp_uncertainty - This field contains the average temperature uncertainty
                         (summarized by city and month), e.g. '3.44'.
```
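Two temperature fields have natural domains: the month must lie between 1 and 12, and an uncertainty cannot be negative. Below is a minimal validation sketch. It assumes rows are available as Python dictionaries keyed by the Parquet column names shown in the `df_temp.show(5)` output (`month`, `avg_temp_uncertainty`) and that `month` is stored as an integer. `invalid_temperature_rows` is a hypothetical helper. In PySpark the equivalent would be a `filter` on `~col('month').between(1, 12)` followed by `count()`.

```python
from typing import Any, Dict, List


def invalid_temperature_rows(rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return rows whose month or uncertainty lies outside its valid domain."""
    bad = []
    for row in rows:
        month = row.get("month")
        uncertainty = row.get("avg_temp_uncertainty")
        # Month must be an integer from 1 to 12.
        month_ok = isinstance(month, int) and 1 <= month <= 12
        # Treating a missing uncertainty as invalid is an assumption of this sketch.
        uncertainty_ok = uncertainty is not None and uncertainty >= 0
        if not (month_ok and uncertainty_ok):
            bad.append(row)
    return bad


# Three rows copied from the df_temp.show(5) output above.
sample = [
    {"avg_temp": 6.4173079751743565, "avg_temp_uncertainty": 1.2440000039599695, "city": "Buffalo", "month": 4},
    {"avg_temp": 11.944357548337994, "avg_temp_uncertainty": 0.5757030320890022, "city": "Antioch", "month": 3},
    {"avg_temp": 7.688056985948988, "avg_temp_uncertainty": 0.9391036319686341, "city": "Mesquite", "month": 12},
]
print(len(invalid_temperature_rows(sample)))  # 0
```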

##### 4.3.3 Population Table:
*The following are the fields of the population table:*
```
* City       - This field contains US city names, e.g. 'New York', 'Chicago'.
* State      - This field contains US state names, e.g. 'Arizona', 'California'.
* State Code - This field contains US state codes, e.g. 'AZ', 'NY'.
* Median Age - This field contains the median age (by city).
* Male       - This field contains the male population (by city).
* Female     - This field contains the female population (by city).
* Total      - This field contains the total population (by city).
* Average Household Size - This field contains the average household size (by city), e.g. '2.54'.
```
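A simple cross-field check for the population table is that the male and female counts add up to the total. That this always holds in the source data is an assumption, not something verified here, so the sketch below only reports mismatching cities instead of failing. It assumes the snake_case column names listed in Step 1 (`city`, `male`, `female`, `total`). The helper name `population_mismatches` and the sample rows are hypothetical.

```python
from typing import Any, Dict, List


def population_mismatches(rows: List[Dict[str, Any]]) -> List[str]:
    """Return the cities whose male + female count differs from the total."""
    mismatches = []
    for row in rows:
        male, female, total = row.get("male"), row.get("female"), row.get("total")
        # Rows with a missing count are skipped; nulls are a separate check.
        if male is None or female is None or total is None:
            continue
        if male + female != total:
            mismatches.append(row["city"])
    return mismatches


# Hypothetical rows for illustration only; they are not taken from the dataset.
sample = [
    {"city": "City A", "male": 100, "female": 120, "total": 220},
    {"city": "City B", "male": 50, "female": 40, "total": 95},
    {"city": "City C", "male": None, "female": 30, "total": 60},
]
print(population_mismatches(sample))  # ['City B']
```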

##### 4.3.4 Airport Table:
*The following are the fields of the airport table:*
```
* type          - This field contains the type of airport, e.g. 'small_airport',
                  'medium_airport', 'large_airport'.
* name          - This field contains the airport's full name.
* elevation_ft  - This field contains the elevation of the airport in feet.
* continent     - This field contains the continent code, e.g. 'AS' for Asia, 'NA' for North America.
* iso_country   - This field contains the two-letter ISO country code assigned by the
                  International Organization for Standardization, e.g. 'US'.
* iso_region    - This field contains the ISO region code, e.g. 'US-AL', modified into
                  the US state code, e.g. 'AL'.
* municipality  - This field contains the local municipality name.
* gps_code      - This field contains the GPS code of the airport.
* iata_code     - This field contains the IATA (International Air Transport Association) airport
                  code, also known as the IATA location identifier: a three-letter code assigned
                  to airports around the world.
* local_code    - This field contains the local airport code.
* coordinates   - This field contains the (longitude, latitude) coordinates of the airport.
```
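Two airport fields come from simple string manipulation: `iso_region` values such as 'US-AL' are reduced to the state code 'AL', and `coordinates` is a single "longitude, latitude" string, as in the '-98.403889, 27.95...' value shown in the `df_port.show(5)` output. Below is a plain-Python sketch of both transformations. The function names are hypothetical, and this is not necessarily how the project's ETL implemented them. In PySpark, `split(col('iso_region'), '-').getItem(1)` would perform the first step.

```python
from typing import Tuple


def to_state_code(iso_region: str) -> str:
    """Reduce an ISO 3166-2 region such as 'US-AL' to its subdivision part 'AL'."""
    # Split on the first hyphen; if there is no hyphen, return the input unchanged.
    country, _, subdivision = iso_region.partition("-")
    return subdivision if subdivision else country


def parse_coordinates(coordinates: str) -> Tuple[float, float]:
    """Split a 'longitude, latitude' string into a (longitude, latitude) float pair."""
    lon_text, lat_text = coordinates.split(",")
    return float(lon_text.strip()), float(lat_text.strip())


print(to_state_code("US-AL"))  # AL
# A shortened version of the Temple Ranch Airport value shown earlier.
print(parse_coordinates("-98.403889, 27.95"))  # (-98.403889, 27.95)
```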

### Step 5: Complete Project Write Up


#### 5.1 Tools and Technologies:
The main challenges I see in this project are:
1. Handling a large amount of data quickly and efficiently.
2. Integrating data from various heterogeneous sources, which makes establishing relationships between them a real challenge.

> *Given point 1, Apache Spark is the clear choice here. Even though I am more comfortable with Pandas, Pandas processes data in memory on a single machine and would not cope with this volume of data, whereas Spark handles large datasets quickly and efficiently.
Second, Spark offers APIs in several programming languages, such as Scala, Java and Python.
I chose the Python API, PySpark, because it is quite similar to Pandas and makes things easy to implement.*

#### 5.2 Data Update Policy:
*As the number of fliers has increased significantly, I reckon a daily update of the data would be the appropriate choice.*

#### 5.3 My approach in following cases:

##### 5.3.1 If the data were increased by 100x.
*If the data were increased by 100x, Apache Spark would still be a fast and reliable choice, and the solution above should keep working, provided the Spark cluster is scaled out to match the larger data volume.*

##### 5.3.2 The data populates a dashboard that must be updated by 7am every day.
*In this case, I would use a tool like Apache Airflow to programmatically schedule and monitor the whole data pipeline.*

##### 5.3.3 The database needed to be accessed by 100+ people.
*If the database needed to be accessed by 100+ people, I think a cloud-based data warehouse (e.g. Amazon Redshift) would be a more efficient data modeling option.*

### References:
1. https://travel.trade.gov/research/reports/i94/historical/2016.html
2. https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data
3. https://public.opendatasoft.com/explore/dataset/us-cities-demographics/information/
4. https://stackoverflow.com/questions/51830697/convert-date-from-integer-to-date-format
5. https://stackoverflow.com/questions/16253060/how-to-convert-country-names-to-iso-3166-1-alpha-2-values-using-python