# Project Title
### Data Engineering Capstone Project

#### Project Summary
In our company, Data Scientists have been asked to identify tourists behaviors and called on the Data Engineers to clean, process and develop data model (star schema) that would be the starting point of long-term project that will allow them to establish patterns between the cities that non-immigrants visited. We will create dimension and fact tables and save them as parquet files for star schema model.

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Importing the required packages
import pandas as pd
import re
from pyspark.sql import SparkSession
import os
import glob
import configparser
from datetime import datetime, timedelta, date
from pyspark.sql import types as t
from pyspark.sql.functions import udf, col, monotonically_increasing_id,count,when,date_format
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear,first,lower
from pyspark.sql import *
from pyspark.sql.types import IntegerType,LongType,StructType,StructField,StringType
import datetime

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What is your end solution look like? What tools did you use? etc>

#### Describe and Gather Data 
I94 Immigration Data: This data comes from the US National Tourism and Trade Office. Though this data has many fields but we will use only below fields:

1) I94YR - Year of admission

2) I94RES - 3 digit code of nationality

3) I94PORT - 3 character code of destination USA city

4) DEPDATE - Departure data in US

5) I94BIR - Age of non-immigrant (in years)

6) I94VISA - Visa code (1=Business,2=Pleasure,3=Student)

7) GENDER - Gender of non-immigrant

8) COUNT - Used for summary statistics

9) I94MODE - Mode of travel

10) VISTYPE - Class of admission

11) ADMNUM - Admission number

U.S. City Demographic Data: This data comes from OpenSoft

1) City

2) Male Population

3) Female Population

4) Median Age

5) Total Population

6) Foreign-Born

7) State Code

8) Race

9) Count

The airport codes: This may refer to either IATA airport code, a three-letter code which is used in passenger reservation, ticketing and baggage-handling systems, or the ICAO airport code which is a four letter code used by ATC systems and for airports that do not have an IATA airport code

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identifying data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Documenting steps necessary to clean the data

In [2]:
#Reading the config file
config = configparser.ConfigParser()
config.read_file(open('dwh.cfg'))
AWS_ACCESS_KEY_ID     = config.get('AWS', 'AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = config.get('AWS', 'AWS_SECRET_ACCESS_KEY')

In [3]:
#Creating the Spark Session
spark = SparkSession \
        .builder \
        .config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11,org.apache.hadoop:hadoop-aws:2.7.2") \
        .config("fs.s3a.access.key", AWS_ACCESS_KEY_ID) \
        .config("fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY) \
        .getOrCreate()

In [4]:
# Reading City data
df_city_temp=spark.read.csv("./us-cities-demographics.csv", sep=';', header=True)

In [5]:
# Displaying the first record of City Dataset
df_city_temp.take(1)

[Row(City='Silver Spring', State='Maryland', Median Age='33.8', Male Population='40601', Female Population='41862', Total Population='82463', Number of Veterans='1562', Foreign-born='30908', Average Household Size='2.6', State Code='MD', Race='Hispanic or Latino', Count='25924')]

In [6]:
#Displaying the number of records
print(df_city_temp.count())

2891


In [7]:
#Lets explore further and check the records for one city. We see that there are duplicates due to Race Column
df_city_temp.filter(df_city_temp.City=='Saint George').show()

+------------+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|        City|State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+------------+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|Saint George| Utah|      37.3|          38732|            41475|           80207|              4443|        4824|                  2.81|        UT|Black or African-...| 1376|
|Saint George| Utah|      37.3|          38732|            41475|           80207|              4443|        4824|                  2.81|        UT|  Hispanic or Latino|10829|
|Saint George| Utah|      37.3|          38732|            41475|           80207|              4443|        4824|      

In [8]:
# Printing the schema of city dataframe
df_city_temp.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



In [9]:
# Finding the number of duplicate rows (if any)
duplicates = df_city_temp.count() - df_city_temp.dropDuplicates().count()
print(f"no of duplicates rows = {duplicates}")

no of duplicates rows = 0


In [10]:
# Finding the number of Null values in all the fields in the city dataset
df_city_null = df_city_temp.select([count(when(col(c).isNull(),c)).alias(c) for c in df_city_temp.columns]).show()

+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+
|City|State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|Race|Count|
+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+
|   0|    0|         0|              3|                3|               0|                13|          13|                    16|         0|   0|    0|
+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+



In [11]:
# Displaying some high level statistics of the columns with Null Values
df_city_temp.describe('Male Population','Female Population','Number of Veterans').show()

+-------+------------------+------------------+------------------+
|summary|   Male Population| Female Population|Number of Veterans|
+-------+------------------+------------------+------------------+
|  count|              2888|              2888|              2878|
|   mean| 97328.42624653739|101769.63088642659| 9367.832522585128|
| stddev|216299.93692873296|231564.57257148277| 13211.21992386408|
|    min|            100135|            100260|             10001|
|    max|             99967|             99430|              9988|
+-------+------------------+------------------+------------------+



In [12]:
# Selecting columns of our interest from df_city
city_columns=['City','State','Male Population','Female Population','Total Population','Race','Count']
# Creating a new data frame from city dataframe
df_city = df_city_temp.select(city_columns)
#Displaying a sample record
df_city.take(1)

[Row(City='Silver Spring', State='Maryland', Male Population='40601', Female Population='41862', Total Population='82463', Race='Hispanic or Latino', Count='25924')]

In [13]:
# We will clean this dataframe further and try to remove the duplicates with loosing any information
df_city_race = df_city.select('City','State','Race','Count').groupBy('City','State').pivot('Race').agg(first('Count'))

In [14]:
# Dropping unnecessary columns from df_city_temp
df_city_temp = df_city_temp.drop("Number of Veterans").drop("Race").drop("Count").drop('Median Age').drop("Foreign-born").dropDuplicates()

In [15]:
# We will join df_city_temp dataframe with df_city_race to denormalize the data
city = df_city_temp.join(df_city_race,["City","State"])

In [16]:
# We will join df_city_temp dataframe with df_city_race to denormalize the data
city = city.select("City","State",col("Male Population").alias("Male_Population"),
                     col("Female Population").alias("Female_Population"),col("Total Population").alias("Total_Population"),
                     col("State Code").alias("State_Code"), 
                     col("American Indian and Alaska Native").alias("Native_P"),
                     col("Asian").alias("Asian_P"),
                     col("Black or African-American").alias("African_P"),
                     col("Hispanic or Latino").alias("Latino_P"),
                     col("White").alias("White_P"))

In [17]:
#Displaying city dataframe
city.show()

+--------------------+--------------+---------------+-----------------+----------------+----------+--------+-------+---------+--------+-------+
|                City|         State|Male_Population|Female_Population|Total_Population|State_Code|Native_P|Asian_P|African_P|Latino_P|White_P|
+--------------------+--------------+---------------+-----------------+----------------+----------+--------+-------+---------+--------+-------+
|          Cincinnati|          Ohio|         143654|           154883|          298537|        OH|    3362|   7633|   133430|    9121| 162245|
|         Kansas City|        Kansas|          74606|            76655|          151261|        KS|    2749|   7301|    40177|   44342|  96113|
|           Lynchburg|      Virginia|          38614|            41198|           79812|        VA|    1024|   2910|    23271|    2689|  53727|
|              Auburn|    Washington|          36837|            39743|           76580|        WA|    3042|  12341|     4032|   10836| 

In [18]:
# We will join df_city_temp dataframe with df_city_race to denormalize the data
city = city.withColumn("Male_Pop",city.Male_Population.cast(IntegerType()))\
                      .withColumn("Female_Pop",city.Female_Population.cast(IntegerType()))\
                      .withColumn("Total_Pop",city.Total_Population.cast(IntegerType()))\
                      .withColumn("Native_Pop",city.Native_P.cast(IntegerType()))\
                      .withColumn("Asian_Pop",city.Asian_P.cast(IntegerType()))\
                      .withColumn("African_Pop",city.African_P.cast(IntegerType()))\
                      .withColumn("Latino_Pop",city.Latino_P.cast(IntegerType()))\
                      .withColumn("White_Pop",city.White_P.cast(IntegerType()))
drop_cols=['Male_Population','Female_Population','Total Population','Native_P','Asian_P','African_P',
          'Latino_P','White_P']
city = city.drop(*drop_cols)
city.printSchema()
city.show()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Total_Population: string (nullable = true)
 |-- State_Code: string (nullable = true)
 |-- Male_Pop: integer (nullable = true)
 |-- Female_Pop: integer (nullable = true)
 |-- Total_Pop: integer (nullable = true)
 |-- Native_Pop: integer (nullable = true)
 |-- Asian_Pop: integer (nullable = true)
 |-- African_Pop: integer (nullable = true)
 |-- Latino_Pop: integer (nullable = true)
 |-- White_Pop: integer (nullable = true)

+--------------------+--------------+----------------+----------+--------+----------+---------+----------+---------+-----------+----------+---------+
|                City|         State|Total_Population|State_Code|Male_Pop|Female_Pop|Total_Pop|Native_Pop|Asian_Pop|African_Pop|Latino_Pop|White_Pop|
+--------------------+--------------+----------------+----------+--------+----------+---------+----------+---------+-----------+----------+---------+
|          Cincinnati|          Ohio|    

In [19]:
#Checking a sample record of a random city
city.filter(city.City=='Ontario').show()

+-------+----------+----------------+----------+--------+----------+---------+----------+---------+-----------+----------+---------+
|   City|     State|Total_Population|State_Code|Male_Pop|Female_Pop|Total_Pop|Native_Pop|Asian_Pop|African_Pop|Latino_Pop|White_Pop|
+-------+----------+----------------+----------+--------+----------+---------+----------+---------+-----------+----------+---------+
|Ontario|California|          171200|        CA|   85059|     86141|   171200|      4304|    14313|      12900|    118292|    74765|
+-------+----------+----------------+----------+--------+----------+---------+----------+---------+-----------+----------+---------+



In [20]:
#Writing this transformed dataframe to Parquet files partitioned by State
city.write.partitionBy('State').parquet(os.path.join('city_data','cities.parquet'),'overwrite')

In [21]:
#Reading airport data
df_airport=spark.read.csv("airport-codes_csv.csv",header=True)

In [22]:
# Displaying the first record of Airport Dataset
df_airport.take(1)

[Row(ident='00A', type='heliport', name='Total Rf Heliport', elevation_ft='11', continent='NA', iso_country='US', iso_region='US-PA', municipality='Bensalem', gps_code='00A', iata_code=None, local_code='00A', coordinates='-74.93360137939453, 40.07080078125')]

In [23]:
#Displaying the number of records
print(df_airport.count())

55075


In [24]:
# Printing the schema of airport dataframe
df_airport.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



In [25]:
# Finding the number of duplicate rows (if any)
duplicates = df_airport.count() - df_airport.dropDuplicates().count()
print(f"no of duplicates rows = {duplicates}")

no of duplicates rows = 0


In [26]:
# Finding the number of Null values in all the fields in the airport dataset
df_airport_null = df_airport.select([count(when(col(c).isNull(),c)).alias(c) for c in df_airport.columns]).show()

+-----+----+----+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+
|ident|type|name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|coordinates|
+-----+----+----+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+
|    0|   0|   0|        7006|        0|          0|         0|        5676|   14045|    45886|     26389|          0|
+-----+----+----+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+



In [27]:
# Selecting columns of our interest from df_airport
airport_columns=['type','name','iso_country','iso_region','municipality']
# Creating a new data frame from airport dataframe
df_airport = df_airport.select(airport_columns)
#Displaying sample record from airport dataframe
df_airport.take(1)

[Row(type='heliport', name='Total Rf Heliport', iso_country='US', iso_region='US-PA', municipality='Bensalem')]

In [28]:
#Writing this transformed dataframe to Parquet files partitioned by type
df_airport.write.partitionBy('type').parquet(os.path.join('airport_data','airport.parquet'),'overwrite')

In [29]:
# Reading i94 sample data
df_i94=spark.read.parquet("sas_data")

In [30]:
# Printing i94 dataframe schema
df_i94.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [31]:
# Finding the number of duplicate rows (if any)
duplicates = df_i94.count() - df_i94.dropDuplicates().count()
print(f"no of duplicates rows = {duplicates}")
print(f"no of rows = {df_i94.count()}")

no of duplicates rows = 0
no of rows = 3096313


In [32]:
# Finding the number of Null values in all the fields in the city dataset
df_i94.select([count(when(col(c).isNull(),c)).alias(c) for c in df_i94.columns]).show()

+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+------+-----+--------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|  occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender| insnum|airline|admnum|fltno|visatype|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+------+-----+--------+
|    0|    0|     0|     0|     0|      0|      0|    239| 152592| 142457|   802|      0|    0|       1| 1881250|3088187|    238| 138429|3095921| 138429|    802|    477|414269|2982605|  83627|     0|19549|       0|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+---

In [33]:
#Casting the values appropriately
df_i94=df_i94.select(col("i94res").cast(IntegerType()),col("i94port"),
                           col("arrdate").cast(IntegerType()),
                           col("i94mode").cast(IntegerType()),col("depdate").cast(IntegerType()),
                           col("i94bir").cast(IntegerType()),col("i94visa").cast(IntegerType()), 
                           col("count").cast(IntegerType()),
                           col("gender"),col("admnum").cast(LongType()))

In [34]:
# Displaying a sample record from i94 dataframe
df_i94.take(1)

[Row(i94res=438, i94port='LOS', arrdate=20574, i94mode=1, depdate=20582, i94bir=40, i94visa=1, count=1, gender='F', admnum=94953870030)]

In [35]:
#Admission number is supposed to be unique
df_i94 = df_i94.dropDuplicates(['admnum'])

In [36]:
# No of rows in i94 dataframe now
df_i94.count()

3075579

In [37]:
# Read port_list text file
i94_port_df = pd.read_csv('./data/port_list.txt',sep='=',names=['id','port'])

# Remove whitespaces and single quotes
i94_port_df['id']=i94_port_df['id'].str.strip().str.replace("'",'')

# Create two columns from i94port string: port_city and port_addr
# also remove whitespaces and single quotes
i94_port_df['portcity'], i94_port_df['portstate']=i94_port_df['port'].str.strip().str.replace("'",'').str.strip().str.split(',',1).str

# Remove more whitespace from port_addr
i94_port_df['portstate']=i94_port_df['portstate'].str.strip()

# Drop port column and keep the two new columns: port_city and port_addr
i94_port_df.drop(columns =['port'], inplace = True)

# Convert pandas dataframe to list (objects which had single quotes removed automatically become string again with single quotes)
i94_port_data=i94_port_df.values.tolist()
i94_port_df.head()
i94port_schema = StructType([
    StructField('id', StringType(), True),
    StructField('portcity', StringType(), True),
    StructField('portstate', StringType(), True)
])
i94port=spark.createDataFrame(i94_port_data, i94port_schema)
i94port.write.mode('overwrite').parquet('data/i94port.parquet')

In [38]:
#Creating list for mode of entry
i94_mode=[[1,'Air'],[2,'Sea'],[3,'Land'],[9,'Not Reported']]

In [39]:
#Creating list for mode of entry
i94_mode_df=spark.createDataFrame(i94_mode)
i94_mode_df.write.mode('overwrite').parquet('data/i94_mode.parquet')

In [40]:
# Reading text file for countries
i94res_df = pd.read_csv('data/country_list.txt',sep='=',names=['id','country'])
# Remove whitespaces and single quotes
i94res_df['country']=i94res_df['country'].str.replace("'",'').str.strip()
# Convert pandas dataframe to list
i94res_data=i94res_df.values.tolist()

In [41]:
# Converting list to spark dataframe
i94res_schema = StructType([
    StructField('id', StringType(), True),
    StructField('country', StringType(), True)
])
i94res=spark.createDataFrame(i94res_data, i94res_schema)

In [42]:
# Creating parquet file
i94res.write.mode('overwrite').parquet('data/i94res.parquet')

In [43]:
#Creating dataframe for visa type
i94visa_data = [[1, 'Business'], [2, 'Pleasure'], [3, 'Student']]
i94visa=spark.createDataFrame(i94visa_data)
i94visa.write.mode('overwrite').parquet('./data/i94visa.parquet')


In [44]:
#Joining i94 dataframe with i94port dataframe
df_i94=df_i94.join(i94port,df_i94.i94port==i94port.id,how='left')

In [45]:
# Displaying a record after join
df_i94.show()

+------+-------+-------+-------+-------+------+-------+-----+------+-----------+---+--------+---------+
|i94res|i94port|arrdate|i94mode|depdate|i94bir|i94visa|count|gender|     admnum| id|portcity|portstate|
+------+-------+-------+-------+-------+------+-------+-----+------+-----------+---+--------+---------+
|   135|    BGM|  20556|      1|  20585|    68|      1|    1|     M|93334314430|BGM|  BANGOR|       ME|
|   117|    BGM|  20563|      1|  20604|    57|      1|    1|     M|93957147230|BGM|  BANGOR|       ME|
|   135|    BGM|  20565|      1|  20569|    58|      1|    1|     M|56546314033|BGM|  BANGOR|       ME|
|   111|    BGM|  20563|      1|  20567|    47|      1|    1|     M|93951228330|BGM|  BANGOR|       ME|
|   111|    BGM|  20574|      1|  20577|    34|      1|    1|     M|95002968630|BGM|  BANGOR|       ME|
|   116|    BGM|  20545|      1|  20547|    68|      1|    1|     M|92504949230|BGM|  BANGOR|       ME|
|   117|    BGM|  20560|      1|  20561|    49|      1|    1|   

In [46]:
# Dropping off id column
df_i94 = df_i94.drop('id')

In [47]:
# Displaying i94 dataframe
df_i94.show()

+------+-------+-------+-------+-------+------+-------+-----+------+-----------+--------+---------+
|i94res|i94port|arrdate|i94mode|depdate|i94bir|i94visa|count|gender|     admnum|portcity|portstate|
+------+-------+-------+-------+-------+------+-------+-----+------+-----------+--------+---------+
|   135|    BGM|  20556|      1|  20585|    68|      1|    1|     M|93334314430|  BANGOR|       ME|
|   117|    BGM|  20563|      1|  20604|    57|      1|    1|     M|93957147230|  BANGOR|       ME|
|   135|    BGM|  20565|      1|  20569|    58|      1|    1|     M|56546314033|  BANGOR|       ME|
|   111|    BGM|  20563|      1|  20567|    47|      1|    1|     M|93951228330|  BANGOR|       ME|
|   111|    BGM|  20574|      1|  20577|    34|      1|    1|     M|95002968630|  BANGOR|       ME|
|   116|    BGM|  20545|      1|  20547|    68|      1|    1|     M|92504949230|  BANGOR|       ME|
|   117|    BGM|  20560|      1|  20561|    49|      1|    1|     M|93697916730|  BANGOR|       ME|


In [48]:
#Joining i94 dataframe with city dataframe
i94_city= df_i94.join(city,(lower(df_i94.portcity)==lower(city.City)) & (lower(df_i94.portstate)==lower(city.State)),how='left')

In [49]:
# Dropping off City and State columuns from the joined dataframe
i94_city=i94_city.drop("City","State")

In [50]:
# Displaying the joined dataframe
i94_city.show()

+------+-------+-------+-------+-------+------+-------+-----+------+-----------+--------+---------+----------------+----------+--------+----------+---------+----------+---------+-----------+----------+---------+
|i94res|i94port|arrdate|i94mode|depdate|i94bir|i94visa|count|gender|     admnum|portcity|portstate|Total_Population|State_Code|Male_Pop|Female_Pop|Total_Pop|Native_Pop|Asian_Pop|African_Pop|Latino_Pop|White_Pop|
+------+-------+-------+-------+-------+------+-------+-----+------+-----------+--------+---------+----------------+----------+--------+----------+---------+----------+---------+-----------+----------+---------+
|   582|    ONT|  20553|      1|  20591|    69|      2|    1|     M|  748039585| ONTARIO|       CA|            null|      null|    null|      null|     null|      null|     null|       null|      null|     null|
|   582|    ONT|  20546|      1|  20549|    30|      2|    1|     F|92521757230| ONTARIO|       CA|            null|      null|    null|      null|     

In [51]:
#Converting SAS date to datetime
get_date = udf(lambda x: (datetime.datetime(1960, 1, 1).date() + datetime.timedelta(x)).isoformat() if x else None)
i94_city = i94_city.withColumn("arrival_date", get_date(i94_city.arrdate))

In [52]:
# Displaying sample record
i94_city.take(1)

[Row(i94res=582, i94port='ONT', arrdate=20553, i94mode=1, depdate=20591, i94bir=69, i94visa=2, count=1, gender='M', admnum=748039585, portcity='ONTARIO', portstate='CA', Total_Population=None, State_Code=None, Male_Pop=None, Female_Pop=None, Total_Pop=None, Native_Pop=None, Asian_Pop=None, African_Pop=None, Latino_Pop=None, White_Pop=None, arrival_date='2016-04-09')]

In [53]:
# Creating dataframe for date dimensions
i94date=i94_city.select(col('arrdate').alias('arrival_sasdate'),
                                   col('arrival_date').alias('arrivaldate'),
                                   date_format('arrival_date','M').alias('arrival_month'),
                                   date_format('arrival_date','E').alias('arrival_dayofweek'), 
                                   date_format('arrival_date', 'y').alias('arrival_year'), 
                                   date_format('arrival_date', 'd').alias('arrival_day'),
                                  date_format('arrival_date','w').alias('arrival_weekofyear')).dropDuplicates()

In [54]:
# Displaying date dataframe
i94date.show()

+---------------+-----------+-------------+-----------------+------------+-----------+------------------+
|arrival_sasdate|arrivaldate|arrival_month|arrival_dayofweek|arrival_year|arrival_day|arrival_weekofyear|
+---------------+-----------+-------------+-----------------+------------+-----------+------------------+
|          20558| 2016-04-14|            4|              Thu|        2016|         14|                16|
|          20563| 2016-04-19|            4|              Tue|        2016|         19|                17|
|          20547| 2016-04-03|            4|              Sun|        2016|          3|                15|
|          20559| 2016-04-15|            4|              Fri|        2016|         15|                16|
|          20554| 2016-04-10|            4|              Sun|        2016|         10|                16|
|          20557| 2016-04-13|            4|              Wed|        2016|         13|                16|
|          20551| 2016-04-07|            4|   

In [55]:
# Joining date dataframe with i94 dataframe
i94_full = i94_city.join(i94date,i94_city.arrdate==i94date.arrival_sasdate,how='left')

In [56]:
# Displaying i94_full dataframe
#i94_full.show(1)

In [57]:
# Writing i94 data to parquet files partitioned by year and month
i94_full.write.partitionBy('arrival_year','arrival_month').parquet(os.path.join('i94_data','i94.parquet'),'overwrite')

In [58]:
# Displaying schema of i94_full dataframe
#i94_full.printSchema()

### Step 3: Defining the Data Model
#### 3.1 Conceptual Data Model
We have saved the dimension tables as parquet files and we can implement them on any columnar database in Star Schema model. Star Schema model is a popular model widely used for Data Analysis.

![data model](img/i94_data_model.PNG)

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

1. Install the required packages and dependencies like pandas, sql functions, os, configparser, Sparksession
2. Read US Cities Demo dataset file df_city_temp dataframe
3. Create df_city from df_city_temp after selecting the column of interest
4. Drop duplicate rows from df_city
5. Rename the columns and reduce the number of rows by pivoting using Race column
6. Drop the columns which are not required
7. Write (and overwrite) transformed City dataset into parquet file partitioned by State
8. Read Airport dataset into df_airport
9. Select the columns of interest
10. Drop the duplicates
11. Write transformed airport dataset into parquet files partitioned by airport type
12. Read i94 non-immigration dataset to df_i94 dataframe
13. Convert numbers to longtype and integertype
14. Drop rows which have duplicate admission number as this is suppose to be a unique value
15. Read i94port dimension parquet file so we can use it to join with df_i94. This will add i94port city and state columns to df_i94 dataframe
16. Drop id column from i94 dataframe
17. Join city with df_i94 to get fact table i94_city
18. Add iso date format column arrival_date inside the i94_city dataframe by using udf.
19. Join i94_city with i94date to create i94_full dataframe.
20. Save i94_full to parquet file partitioned by year and month of arrival

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Creating the data model
Building the data pipelines to create the data model.

In [59]:
# etl.py will be used to build the data pipeline. Please uncomment line below to execute
#%run etl.py

#### 4.2 Data Quality Checks
Below data quality checks are performed to ensure the pipeline ran as expected.

In [None]:
# Data quality checks

if df_city.count() > 0:
    print('File reading successful')
else:
    print('Problem with reading file')

if city.count() == df_city_race.count():
    print('Transformation went successful')
else:
    print('Some problem with transformation')

if df_i94.count() > 0:
    print('File reading successful')
else:
    print('Problem with reading file')
    
if df_i94.count() == i94_full.count():
    print('Transformation went successful')
else:
    print('Some problem with transformation')

File reading successful
Transformation went successful
File reading successful


#### 4.3 Data dictionary 

#### Fact Table - I94 immigration data joined with the city data on i94port Columns:

- admnum = admission no coming from i94 data,
- i94res = 3 digit code of nationality from i94 non-immigration data,
- i94port = 3 character code of destination USA city from i94 non-immigration data,
- arrdate = arrival date in the USA from i94 non-immigration data,
- i94mode = 1 digit mode of travel from i94 non-immigration data,
- depdate = Departure Date from the USA,
- i94bir = year of birth,
- i94visa = reason for immigration from i94 non-immigration data,
- count = used for statistical metrics from i94 non-immigration data,
- gender = Non-immigrant sex from i94 non-immigration data,
- total_population = Total population of city and state from US cities demographics data,
- state_code = State Code from US cities demographics data,
- male_pop = Male population of city and state from US cities demographics data,
- female_pop = Female population of city and state from US cities demographics data,
- native_pop = Aerican Indian population of city and state from US cities demographics data,
- asian_pop = Asian population of city and state from US cities demographics data,
- african_pop = African population of city and state from US cities demographics data,
- latino_pop = Latino population of city and state from US cities demographics data,
- white_pop = White population of city and state from US cities demographics data
- arrival_date = Arrival date in datetime format coming from i94 data

#### Dimension Table - i94date:
- arrival_sasdate = Arrival date in the USA (SAS date field) from i94 non-immigration data,
- arrival_date = Arrival date in datetime format derived from from i94 non-immigration data,
- arrival_month = Month of arrival coming from i94 non-immigration data,
- arrival_year = Year of arrival coming from i94 non-immigration data,
- arrival_day = Day of arrival coming from i94 non-immigration data,
- arrival_week = Week of arrival coming from i94 non-immigration data,

#### Dimension Table - i94res:
- id = 3 digit code of nationality from i94 non-immigration data,
- country = Country coming from SAS Labels

#### Dimension Table - i94mode:
- id = 1 digit mode of travel code from i94 non-immigration data
- mode_transport = String value coming from SAS Labels

#### Dimension Table - i94port:
- id = Admission 3 character code of destination USA city,
- city = String value of the city code
- state = String value of the state code

#### Step 5: Project Conclusion
* We used opensource Apache PySpark and Python tools that can be easily ported over to cloud such as AWS
* If the data were to increase by 100X we will use EMR Cluster on AWS along with S3 for storage.
* To run the pipleline on a schedule basis we can leverage the cover of Apache Airflow.
* If the database need to be accessed by large number of users then we can load the parquet files to petabyte scale AWS data warehouse solution Redshift.