# Project Title
### Data Engineering Capstone Project

  
#### Project Summary
In this project we analyze immigration into the United States. We try to answer the following questions:

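* Identify travel patterns across seasons. This will help improve tourism management.
* Find the busiest ports of entry during the year. This will help with capacity planning at airports.
* Analyze the relationship between port of entry, final destination and the demographics of various cities.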
    * Analyze relation between port of entry, final destination and demographics of various cities.
 
We use the following datasets for this project:

* I94 Immigration Dataset: This data comes from the US National Tourism and Trade Office. Among other information, it contains entry and exit records for each foreign national coming into the US, and it is the source of the fact table in our data model. The dataset comes with a data dictionary containing the lookup values for i94cit & i94res, i94port, i94addr, i94mode and i94visa.
* U.S. City Demographic Dataset: This data comes from OpenSoft. It contains population information for cities in each state, e.g. total population and median age. It is the source of the US demographics dimension table.
* Airport Codes Dataset: This dataset comes from DataHub. It contains airport information such as airport codes and location.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [61]:
# Do all imports and installs here
import pandas as pd

from pyspark.sql import SparkSession


### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use?

#### Describe and Gather Data 
Describe the datasets you're using. Where do they come from? What type of information do they include?

#### i94 Immigration Dataset
    The immigration dataset is the main dataset and has more than 3 million rows. It is the source of our fact table.


In [62]:
# Read in the data here
i94_sample_df=pd.read_csv("immigration_data_sample.csv")

In [63]:
i94_sample_df.columns

Index(['Unnamed: 0', 'cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port',
       'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa',
       'count', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd',
       'entdepu', 'matflag', 'biryear', 'dtaddto', 'gender', 'insnum',
       'airline', 'admnum', 'fltno', 'visatype'],
      dtype='object')

In [64]:
pd.set_option('display.max_columns',30)
i94_sample_df.head(10)

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,61.0,2.0,1.0,20160422,,,G,O,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,26.0,2.0,1.0,20160423,MTR,,G,R,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,20571.0,76.0,2.0,1.0,20160407,,,G,O,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,20581.0,25.0,2.0,1.0,20160428,DOH,,G,O,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,20553.0,19.0,2.0,1.0,20160406,,,Z,K,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT
5,721257,1481650.0,2016.0,4.0,577.0,577.0,ATL,20552.0,1.0,GA,20606.0,51.0,2.0,1.0,20160408,,,T,N,,M,1965.0,10072016,M,,DL,736852600.0,910,B2
6,1072780,2197173.0,2016.0,4.0,245.0,245.0,SFR,20556.0,1.0,CA,20635.0,48.0,2.0,1.0,20160412,,,T,O,,M,1968.0,10112016,F,,CX,786312200.0,870,B2
7,112205,232708.0,2016.0,4.0,113.0,135.0,NYC,20546.0,1.0,NY,20554.0,33.0,2.0,1.0,20160402,,,G,O,,M,1983.0,6302016,F,,BA,55474490000.0,00117,WT
8,2577162,5227851.0,2016.0,4.0,131.0,131.0,CHI,20572.0,1.0,IL,20575.0,39.0,2.0,1.0,20160428,,,O,O,,M,1977.0,7262016,,,LX,59413420000.0,00008,WT
9,10930,13213.0,2016.0,4.0,116.0,116.0,LOS,20545.0,1.0,CA,20553.0,35.0,2.0,1.0,20160401,,,O,O,,M,1981.0,6292016,,,AA,55449790000.0,00109,WT


#### Lookup tables
    Several columns of the immigration dataset hold codes instead of values. To resolve these codes, for example to get the country name
    for a country code, we use the lookup tables below.
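Resolving a code is a join between the fact data and a lookup table. The pandas sketch below uses hypothetical miniature frames: codes 582 and 236 appear in the i94citRes lookup shown later, but the country names are shortened for the example and code 999 is made up to show an unmatched code. A left join keeps every immigration row even when its code is missing from the lookup.

```python
import pandas as pd

# Hypothetical miniature fact rows; 999 deliberately has no entry in the lookup.
immigration = pd.DataFrame({"cicid": [1, 2, 3], "i94cit": [582.0, 236.0, 999.0]})
# Hypothetical lookup rows shaped like i94citRes.csv (CCODE, CNAME), names shortened.
countries = pd.DataFrame({"CCODE": [582, 236], "CNAME": ["MEXICO", "AFGHANISTAN"]})

# Codes are read as floats in the fact data, so cast them to int before joining.
immigration["i94cit"] = immigration["i94cit"].astype(int)

# A left join keeps every fact row; unknown codes end up with a missing name.
resolved = immigration.merge(countries, how="left", left_on="i94cit", right_on="CCODE")
resolved = resolved[["cicid", "i94cit", "CNAME"]]
print(resolved)
```

The same join can later be written in Spark SQL against the lookup views once both sides use integer codes.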


In [65]:
i94Addr_lookup_df=pd.read_csv("i94addr.csv",sep='=')
i94CitRes_lookup_df=pd.read_csv("i94citRes.csv",sep='=')
i94Prtl_lookup_df=pd.read_csv("i94prtl.csv",sep=',')
i94Mode_lookup_df=pd.read_csv("i94mode.csv",sep='=')
i94Visa_lookup_df=pd.read_csv("i94visa.csv",sep='=')



In [69]:
#i94Addr_lookup_df.columns
i94CitRes_lookup_df.columns
#i94Prtl_lookup_df
#i94Mode_lookup_df
#i94Visa_lookup_df

Index(['CCODE', 'CNAME'], dtype='object')

#### Airport Codes Dataset: 
    Contains data from the International Air Transport Association (IATA) about airport codes. This is the source for our Airport dimension table.

In [10]:
airports_df = pd.read_csv('airport-codes_csv.csv')

In [11]:
airports_df.columns

Index(['ident', 'type', 'name', 'elevation_ft', 'continent', 'iso_country',
       'iso_region', 'municipality', 'gps_code', 'iata_code', 'local_code',
       'coordinates'],
      dtype='object')

#### US Demographics Dataset: 
    Contains data about the demographics of US cities. This is the source for the us_demography_d table. Together with the immigration fact table, it gives insight into the cities where people arrive versus the cities where they choose to stay.

In [12]:
us_city_demographics_df = pd.read_csv('us-cities-demographics.csv',sep=';')

In [13]:
us_city_demographics_df.head(2)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723


In [14]:
from pyspark.sql import SparkSession

spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [16]:
mainDF_i94=spark.read.parquet("/home/workspace/sas_data")
mainDF_i94[['gender']].distinct().show(10)

+------+
|gender|
+------+
|     F|
|  null|
|     M|
|     U|
|     X|
+------+



In [18]:
#write to parquet
#df_spark.write.parquet("sas_data")
#df_spark=spark.read.parquet("sas_data")

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

The following cleaning tasks are performed:
* Remove records where primary key values are null.
* Convert date columns to proper dates.
* Convert lower-case values to upper case.
* Remove unnecessary columns.
* Strip spaces from the values in lookup tables.
* Remove records whose port of entry is outside the US.
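Several of these steps (stripping spaces, upper-casing) are repeated for every lookup table. A small helper can apply them uniformly. This is a minimal pandas sketch; `clean_lookup` is a hypothetical name, not a function used elsewhere in this notebook, and the rows are made up in the shape of the i94mode lookup.

```python
import pandas as pd

def clean_lookup(df: pd.DataFrame, upper: bool = True) -> pd.DataFrame:
    """Return a copy of a lookup table with text columns stripped (and upper-cased)."""
    out = df.copy()
    for col in out.columns:
        # Only touch text columns; numeric code columns such as MCODE stay as they are.
        if pd.api.types.is_string_dtype(out[col]):
            out[col] = out[col].str.strip()
            if upper:
                out[col] = out[col].str.upper()
    return out

# Hypothetical raw rows shaped like the i94mode lookup file.
raw_modes = pd.DataFrame({"MCODE": [1, 2, 3], "MNAME": [" Air ", "Sea ", " Land"]})
print(clean_lookup(raw_modes))
```

Returning a copy leaves the raw frame untouched, which makes it easy to compare before and after while exploring.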
    






#### Exploring Airport Data    

In [19]:
#Total of 55075 rows with 12 columns of which 247 rows have missing iso_country value.
airports_df.shape

(55075, 12)

In [20]:
# More than 50% of the values in the continent and local_code columns, and about 80% in iata_code, are missing, so we can ignore these columns.
# Filter out rows where the iso_country value is missing.

airports_df[airports_df['iso_country'].isna()].shape
airports_df[airports_df['iata_code'].isna()].shape

(45886, 12)
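The cell above checks one column at a time. The share of missing values for every column can be computed in one step with `isna().mean()`. A minimal sketch on hypothetical rows; `mostly_missing` is a hypothetical helper name, and in the notebook it would be called as `mostly_missing(airports_df)`.

```python
import pandas as pd

def mostly_missing(df: pd.DataFrame, threshold: float = 0.5) -> list:
    """Return, sorted, the columns whose share of missing values exceeds `threshold`."""
    missing_share = df.isna().mean()  # fraction of NaN/None per column
    return sorted(missing_share[missing_share > threshold].index)

# Hypothetical rows shaped like the airport codes data.
sample = pd.DataFrame({
    "ident": ["00A", "00AA", "00AK", "00AL"],
    "iata_code": [None, None, None, "XYZ"],
    "local_code": [None, None, "00AK", "00AL"],
})
print(mostly_missing(sample))       # columns with more than 50% missing
print(mostly_missing(sample, 0.4))  # a lower threshold also flags local_code
```

One thing worth verifying before dropping the continent column: `read_csv` treats the string "NA" as missing by default, so if the continent column uses "NA" for North America, those rows are counted as missing. Reading with `keep_default_na=False, na_values=[""]` keeps such values as text.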

In [21]:
# Since most airports have no IATA code, we will use the combination of municipality and country to map airports to the port codes in the i94prtl lookup table.
# We will ensure the iso_country and municipality fields are not null, and convert municipality to uppercase to join with the lookup table.

airports_df.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"
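The mapping planned above can be sketched in pandas on hypothetical rows. Because the i94prtl lookup holds a city and a state rather than a country, this sketch joins on city and state, which is a choice made for the example. The state is taken from iso_region after the "-" (the Spark transform in Step 4 does the same with `substr`/`instr`), municipality is upper-cased, and a left join attaches the port code. The airport idents and the SEA port row are illustrative.

```python
import pandas as pd

# Hypothetical rows shaped like the airport codes data and the i94prtl lookup.
airports = pd.DataFrame({
    "ident": ["KORD", "KSEA"],
    "iso_region": ["US-IL", "US-WA"],
    "municipality": ["Chicago", "Seattle"],
})
ports = pd.DataFrame({
    "PECODE": ["CHI", "SEA"],
    "PECITY": ["CHICAGO", "SEATTLE"],
    "PESTATE": ["IL", "WA"],
})

# "US-IL" -> "IL": keep the part of iso_region after the first "-".
airports["state"] = airports["iso_region"].str.split("-", n=1).str[1]
# The lookup stores city names in upper case.
airports["municipality"] = airports["municipality"].str.upper()

# Left join: airports in cities without a port of entry get a missing PECODE.
mapped = airports.merge(ports, how="left",
                        left_on=["municipality", "state"],
                        right_on=["PECITY", "PESTATE"])
print(mapped[["ident", "state", "PECODE"]])
```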


In [22]:
# Several airport types are either closed or used for recreational purposes and are unlikely to handle international flights, so we filter them out.

# Drop iata_code (mostly missing) and the rows without iso_country or municipality
airports_df=airports_df[['ident', 'type', 'name', 'elevation_ft', 'continent', 'iso_country',
       'iso_region', 'municipality', 'gps_code', 'local_code', 'coordinates']]
airports_df=airports_df.dropna(subset=['iso_country','municipality'])

# Exclude airport types that are unlikely to be ports of entry
typenotin=['balloonport','closed','seaplane_base','heliport']
airports_df_final=airports_df[~airports_df['type'].isin(typenotin)].copy()

# Upper-case municipality so it can be joined with the i94prtl lookup table
airports_df_final['municipality']=airports_df_final['municipality'].str.upper()

# Number of airports per type, before the type filter
airports_df.groupby('type')['type'].count()


type
balloonport          23
closed             2966
heliport          10927
large_airport       610
medium_airport     4007
seaplane_base       792
small_airport     29929
Name: type, dtype: int64

#### US Demographics Dataset   

In [23]:
#This dataset has a total of 2891 records with 12 columns.

us_city_demographics_df.shape

(2891, 12)

In [24]:
us_city_demographics_df.head(2)
#us_city_demographics_df.columns

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723


In [46]:
# There are no missing values in the City, State Code and Race columns; together they will form our primary key.

us_city_demographics_df[['City','State Code','Race']].isnull().values.any()

False
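The cell above confirms the key columns have no nulls; a primary key must also be unique across the three columns together. A minimal pandas sketch on hypothetical rows, using `DataFrame.duplicated` with the same column subset. In the notebook the frame would be `us_city_demographics_df`.

```python
import pandas as pd

# Hypothetical rows: the first and last share the same (City, State Code, Race).
demo = pd.DataFrame({
    "City": ["QUINCY", "QUINCY", "QUINCY"],
    "State Code": ["MA", "MA", "MA"],
    "Race": ["White", "Asian", "White"],
})
pk = ["City", "State Code", "Race"]

no_nulls = not demo[pk].isnull().values.any()
# duplicated() marks every repeat of an earlier key combination as True.
is_unique = not demo.duplicated(subset=pk).any()
print(no_nulls, is_unique)
```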

In [49]:
# Need to convert City and State columns values to uppercase.

us_city_demographics_df['City']=us_city_demographics_df['City'].str.upper()
us_city_demographics_df['State']=us_city_demographics_df['State'].str.upper()
us_city_demographics_df_final=us_city_demographics_df
us_city_demographics_df_final.head(2)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,SILVER SPRING,MARYLAND,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,QUINCY,MASSACHUSETTS,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723


#### Lookup Tables Dataset

    This data is part of the data dictionary that accompanies the SAS files. Several columns in the immigration dataset hold codes,
    and the details behind those codes come from lookup tables. These lookup values have been extracted from the data dictionary
    file 'I94_SAS_Labels_Descriptions.SAS'.
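The extraction step itself is not shown in this notebook. Below is a sketch of one way to do it, assuming the value lines in the labels file look like `582 = 'MEXICO ...'` or `'AL'='ALABAMA'` (a code, quoted or not, an `=`, and a quoted description). The sample text and `parse_lookup` are illustrative; the real file should be checked against this assumed format before relying on it.

```python
import re

# Illustrative lines in the shape assumed for the SAS labels file.
SAMPLE = (
    "  value i94cntyl\n"
    "   582 =  'MEXICO Air Sea, and Not Reported (I-94, no land arrivals)'\n"
    "   236 =  'AFGHANISTAN'\n"
    "\t'AL'='ALABAMA'\n"
    "\t'ALC'\t=\t'ALCAN, AK             '\n"
)

# Group 1: the code without quotes; group 2: the text between the quotes.
VALUE_LINE = re.compile(r"^\s*'?([^'=\s]+)'?\s*=\s*'([^']*)'")

def parse_lookup(text: str) -> dict:
    """Map each code to its stripped description; lines without 'code = text' are skipped."""
    pairs = {}
    for line in text.splitlines():
        match = VALUE_LINE.match(line)
        if match:
            pairs[match.group(1)] = match.group(2).strip()
    return pairs

lookup = parse_lookup(SAMPLE)
print(lookup["ALC"])
```

For the port lookup, a description such as "ALCAN, AK" can then be split on its last comma (for example with `str.rpartition(",")`) into the city and state columns.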
                         

In [70]:
#i94Addr_lookup_df : - Holds the codes referenced by the column 'i94addr', which is a state code, e.g. AZ.
#                    - Has no missing values.
#                    - i94addr corresponds to SCODE (state code) in this lookup, from which the state name can be obtained.
#                    - All invalid codes are grouped under the value 99.
#                    - *Need to trim the spaces.


i94Addr_lookup_df['SCODE']=i94Addr_lookup_df['SCODE'].str.strip()
i94Addr_lookup_df['SNAME']=i94Addr_lookup_df['SNAME'].str.strip()
i94Addr_lookup_df_final=i94Addr_lookup_df
i94Addr_lookup_df_final.head(2)

Unnamed: 0,SCODE,SNAME
0,AL,ALABAMA
1,AK,ALASKA


In [71]:
#i94CitRes_lookup_df : - Holds the codes referenced by the columns 'i94cit' and 'i94res', representing country of citizenship and country of residence.
#                      - Each country has a numeric code, e.g. 242 for Bhutan.
#                      - There are 289 values; a few country codes are invalid, but we keep them for now.
#                      - *Need to trim the leading and trailing spaces.

i94CitRes_lookup_df.info()
i94CitRes_lookup_df['CNAME']=i94CitRes_lookup_df['CNAME'].str.strip()
i94CitRes_lookup_df_final=i94CitRes_lookup_df
i94CitRes_lookup_df_final.head(2)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 289 entries, 0 to 288
Data columns (total 2 columns):
CCODE    289 non-null int64
CNAME    289 non-null object
dtypes: int64(1), object(1)
memory usage: 4.6+ KB


Unnamed: 0,CCODE,CNAME
0,582,"MEXICO Air Sea, and Not Reported (I-94, no lan..."
1,236,AFGHANISTAN


In [72]:
#i94Prtl_lookup_df   : -Holds the codes referenced by the column 'i94port', representing the port of entry into the US.
#                      -The code identifies a city, e.g. CHI for Chicago, IL.
#                      -This lookup table has three columns: code, city and state.
#                      -Some of the values are invalid.
#                      -*Need to trim the leading and trailing spaces.
#                      -*We will filter out rows where the state value is missing.

i94Prtl_lookup_df['PECODE']=i94Prtl_lookup_df['PECODE'].str.strip()
i94Prtl_lookup_df['PECITY']=i94Prtl_lookup_df['PECITY'].str.strip()
i94Prtl_lookup_df['PESTATE']=i94Prtl_lookup_df['PESTATE'].str.strip()

i94Prtl_lookup_df_final=i94Prtl_lookup_df.dropna(subset=['PESTATE'])
i94Prtl_lookup_df_final.head(2)

Unnamed: 0,PECODE,PECITY,PESTATE
0,ALC,ALCAN,AK
1,ANC,ANCHORAGE,AK
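The Step 4 pipeline keeps only ports whose state value is exactly two characters long, as a proxy for "port in the US". A pandas sketch of that heuristic on hypothetical rows, next to a stricter alternative (an assumption, not what the pipeline does) that keeps only states present in the i94addr lookup.

```python
import pandas as pd

# Hypothetical rows shaped like the cleaned i94prtl lookup (PECODE, PECITY, PESTATE).
ports = pd.DataFrame({
    "PECODE": ["ALC", "CHI", "XXX", "ZZZ"],
    "PECITY": ["ALCAN", "CHICAGO", "UNKNOWN", "SOMEWHERE"],
    "PESTATE": ["AK", "IL", None, "MEXICO"],
})

# Heuristic: a US port has a two-letter state code; missing states give NaN and are dropped.
us_ports = ports[ports["PESTATE"].str.len() == 2]

# Stricter alternative: keep only states listed in the i94addr lookup.
valid_states = {"AK", "IL"}  # in the notebook: set(i94Addr_lookup_df_final["SCODE"])
us_ports_strict = ports[ports["PESTATE"].isin(valid_states)]
print(us_ports["PECODE"].tolist(), us_ports_strict["PECODE"].tolist())
```

The length test would accept any two-character foreign region code, which the lookup-based filter would reject.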


In [73]:
#i94Mode_lookup_df   : -Holds the codes referenced by the column 'i94mode', representing the mode of entry into the US, e.g. land, sea, air.
#                      -The code is numeric, e.g. 1 for Air.
#                      -This lookup table has 2 columns: code and value.
#                      -*Need to trim the leading and trailing spaces.
#                      -*Convert the value to uppercase.



i94Mode_lookup_df['MNAME']=i94Mode_lookup_df['MNAME'].str.strip()
i94Mode_lookup_df['MNAME']=i94Mode_lookup_df['MNAME'].str.upper()
i94Mode_lookup_df_final=i94Mode_lookup_df
i94Mode_lookup_df_final


Unnamed: 0,MCODE,MNAME
0,1,AIR
1,2,SEA
2,3,LAND
3,9,NOT REPORTED


In [74]:
#i94Visa_lookup_df   : -Holds the codes referenced by the column 'i94visa', representing the traveler's visa category, e.g. business, pleasure, student.
#                      -The code is numeric, e.g. 1 for Business.
#                      -This lookup table has 2 columns: code and value.
#                      -*Need to trim the leading and trailing spaces.
#                      -*Convert the value to uppercase.

i94Visa_lookup_df['VNAME']=i94Visa_lookup_df['VNAME'].str.strip()
i94Visa_lookup_df['VNAME']=i94Visa_lookup_df['VNAME'].str.upper()
i94Visa_lookup_df_final=i94Visa_lookup_df
i94Visa_lookup_df_final


Unnamed: 0,VCODE,VNAME
0,1,BUSINESS
1,2,PLEASURE
2,3,STUDENT


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
    There is one fact table, 3 dimension tables and 5 lookup tables.
    
    US_DEMOGRAPHY_D
    ----------------
    Column                    Constraint   Relation
    ========================================================
    City                      PK           I94prtl_L(port_city)
    State_code                PK           I94prtl_L(port_state)
    Race                      PK
    Total_population
    Male_population
    Female_population
    Median_age
    Number_of_veterans
    Foreign_born
    Avg_Household_size
    State

    Airport_D
    ----------------
    Column                    Constraint   Relation
    ========================================================
    id                        PK
    Airport_type
    Airport_name
    Elevation_feet
    country
    state                                  Country prefix trimmed from iso_region
    municipality                           I94prtl_L(port_city)

    Immigration_F
    ----------------
    Column                    Constraint   Relation
    ==========================================================================
    cicid                     PK
    Citizenship_country_code  FK           I94cit_res_L(country_code)
    Residence_country_code    FK           I94cit_res_L(country_code)
    Port_of_entry             FK           I94prtl_L(port_code)
    Arrival_date              FK           Date_Time_D(Date_field)
    Departure_date            FK           Date_Time_D(Date_field)
    Entry_mode                FK           I94Mode_L(mode_code)
    Visa_type                 FK           I94visa_L(visa_code)
    Destination_state         FK           I94addr_L(state_code)
    age
    occupation
    gender

    Date_Time_D
    ------------------
    Date_field                PK
    year
    month
    day
    week
    weekday

    i94cit_res_L                    i94Mode_L
    ------------------              ------------------
    country_code    PK              mode_code    PK
    country_name                    mode_name

    i94visa_L                       i94prtl_L
    ------------------              ------------------
    visa_code       PK              port_code    PK
    visa_name                       port_city
                                    port_state

    i94Addr_L
    ------------------
    state_code      PK
    state_name
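To show how this star schema answers the "busiest ports of entry" question, here is a pandas sketch over tiny hypothetical rows shaped like immigration_f and time_d (column names follow the Step 4 pipeline). It joins the fact table to the date dimension, counts arrivals per month and port, and keeps the top port per month. In the pipeline the same query would run in Spark SQL against the views.

```python
import pandas as pd

# Hypothetical fact rows shaped like immigration_f.
immigration_f = pd.DataFrame({
    "cicid": [1, 2, 3, 4, 5],
    "Port_of_entry": ["NYC", "NYC", "LOS", "LOS", "LOS"],
    "arrival_date": pd.to_datetime(
        ["2016-04-01", "2016-04-02", "2016-04-02", "2016-05-01", "2016-05-03"]),
})
# Hypothetical date dimension shaped like time_d (one row per distinct date).
time_d = pd.DataFrame({"arr_dep_date": immigration_f["arrival_date"].drop_duplicates()})
time_d["month"] = time_d["arr_dep_date"].dt.month

# Fact-to-dimension join, then arrivals per (month, port).
arrivals = immigration_f.merge(time_d, left_on="arrival_date", right_on="arr_dep_date")
per_port = (arrivals.groupby(["month", "Port_of_entry"]).size()
            .rename("arrivals").reset_index())

# For each month keep the row with the highest arrival count.
busiest = per_port.loc[per_port.groupby("month")["arrivals"].idxmax()]
print(busiest)
```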

    

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

#### Data Extraction:
* Load all the data from files (CSV and parquet).
#### Data Cleanup:
#### IMMIGRATION_F
* Remove the rows where gender is null.
* Convert the arrival and departure dates into date format.
* Remove any non-US i94port (port of entry).
* Insert data into the immigration_f fact table.
* Write to a parquet file.
#### us_demography_D
* Convert the city and state fields to upper case.
* Insert data into the us_demography_d dimension table.
#### Airports_D
* Remove rows where the iso_country or municipality value is missing.
* Remove rows where airport_type is one of ('balloonport', 'closed', 'heliport', 'seaplane_base', 'small_airport').
* Separate the country from the region in the iso_region column.
* Insert data into the airports_d dimension table.
#### i94Addr_L
* Trim the leading and trailing spaces for all the columns.
* Convert the values into upper case.
* Insert data into the i94Addr_l lookup table.
#### i94CitRes_L
* Trim the leading and trailing spaces for all the columns.
* Convert the values into upper case.
* Insert data into the i94CitRes_l lookup table.
#### i94Prtl_L
* Trim the leading and trailing spaces for all the columns.
* Convert the values into upper case.
* Remove records where the state value is missing.
* Insert data into the i94Prtl_l lookup table.
#### i94Mode_L
* Trim the leading and trailing spaces for all the columns.
* Convert the values into upper case.
* Insert data into the i94Mode_l lookup table.
#### i94Visa_L
* Trim the leading and trailing spaces for all the columns.
* Convert the values into upper case.
* Insert data into the i94Visa_l lookup table.
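The date conversion step depends on how each column is encoded. Assuming the usual I94 conventions, `arrdate` and `depdate` are SAS numeric dates (days since 1960-01-01): the first sample row has arrdate 20566, which is 2016-04-22 and matches its dtadfile 20160422. `dtaddto` (the date the traveler is admitted until) appears as MMDDYYYY, e.g. 7202016 in the sample once pandas dropped the leading zero. A pure-Python sketch with hypothetical helper names; non-date markers such as 'D/S' are assumed and mapped to None.

```python
import math
from datetime import date, datetime, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS counts dates as days since 1960-01-01

def sas_to_date(days):
    """Convert a SAS numeric date such as arrdate or depdate; missing values give None."""
    if days is None or (isinstance(days, float) and math.isnan(days)):
        return None
    return SAS_EPOCH + timedelta(days=int(days))

def mmddyyyy_to_date(value):
    """Parse dtaddto-style values ('07202016', or 7202016 once the leading zero is lost)."""
    try:
        return datetime.strptime(str(value).zfill(8), "%m%d%Y").date()
    except ValueError:
        return None  # non-date markers (for example 'D/S') or malformed values

print(sas_to_date(20566.0), sas_to_date(20573.0), mmddyyyy_to_date(7202016))
```

In Spark SQL the equivalents are `date_add(to_date('1960-01-01'), int(arrdate))` for the SAS dates and `to_date(dtaddto, 'MMddyyyy')` for the admitted-until date.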
   
      
        

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [75]:
# Create a Spark Session
from pyspark.sql import SparkSession

spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()


#### Extract 


In [76]:
# Spark reads the data from the source files into their respective DataFrames
mainDF_i94=spark.read.parquet("/home/workspace/sas_data")
airports_df = spark.read.csv('airport-codes_csv.csv',inferSchema=True,header=True)
us_city_demographics_df = spark.read.csv('us-cities-demographics.csv',sep=';',inferSchema=True,header=True)
i94Addr_lookup_df=spark.read.csv("i94addr.csv",sep='=',inferSchema=True,header=True)
i94CitRes_lookup_df=spark.read.csv("i94citRes.csv",sep='=',inferSchema=True,header=True)
i94Prtl_lookup_df=spark.read.csv("i94prtl.csv",sep=',',inferSchema=True,header=True)
i94Mode_lookup_df=spark.read.csv("i94mode.csv",sep='=',inferSchema=True,header=True)
i94Visa_lookup_df=spark.read.csv("i94visa.csv",sep='=',inferSchema=True,header=True)

#### Transform

In [77]:
# Transform the airports DataFrame, apply filters and create a view for the dimension table AIRPORT_D

airports_df['ident',
'type',
'name',
'elevation_ft',
'iso_country',
'iso_region',
'municipality'].createOrReplaceTempView('airports_v')

airports_t=spark.sql("""select ident as id,type as airport_type,name as airport_name, iso_country as country, substr(iso_region,instr(iso_region,'-')+1) as state, upper(municipality) as municipality 
             from airports_v
             where (municipality is NOT NULL AND iso_country is NOT NULL AND type NOT IN ('heliport','small_airport','closed','seaplane_base','balloonport'))
           """)

airports_t.createOrReplaceTempView('airports_d')

In [78]:
# Transform the US city demographics DataFrame, apply filters and create a view for the dimension table US_DEMOGRAPHY_D

us_city_demographics_df[['City',
'Race',
'State Code',
'State',
'Median Age',
'Male Population',
'Female Population',
'Total Population',
'Number of Veterans',
'Foreign-born',
'Average Household Size']].createOrReplaceTempView('us_demography_v')

us_demography_t=spark.sql("""SELECT UPPER(City) as city,
                    UPPER(Race) as race,
                    UPPER(`State Code`) as state_code,
                    State as state,
                    `Median Age` as median_age,
                    `Male Population` as male_population,
                    `Female Population` as female_population,
                    `Total Population` as total_population,
                    `Number of Veterans` as number_of_veterans,
                    `Foreign-born` as number_of_foreign_born,
                    `Average Household Size` as avg_household_size 
            FROM us_demography_v
            WHERE city is NOT NULL and race is NOT NULL and `state code` is NOT NULL """)

us_demography_t.createOrReplaceTempView('us_demography_d')


In [79]:
# Transform the lookup DataFrames, apply clean-up routines and create a view for each lookup table.
# These lookup (data dictionary) tables decode the i94addr, i94cit/i94res, i94port, i94mode and i94visa columns of the immigration table.

i94Addr_lookup_df.createOrReplaceTempView('i94addr_v')
i94CitRes_lookup_df.createOrReplaceTempView('i94citres_v')
i94Prtl_lookup_df.createOrReplaceTempView('i94port_v')
i94Mode_lookup_df.createOrReplaceTempView('i94mode_v')
i94Visa_lookup_df.createOrReplaceTempView('i94visa_v')

i94addr_t=spark.sql("select trim(scode) state_code,trim(sname) as state_name from i94addr_v")
i94citres_t=spark.sql("select int(ccode) as country_code, UPPER(trim(cname)) as country_name from i94citres_v")
i94port_t=spark.sql("""select trim(PECODE) as port_of_entry_code,trim(PECITY) as port_of_entry_city,trim(PESTATE) as port_of_entry_state
                       from i94port_v 
                       """)
i94mode_t=spark.sql("select trim(MCODE) as mode_code,trim(upper(MNAME)) as mode_name from i94mode_v")
i94visa_t=spark.sql("select int(vcode) as visa_code,trim(upper(vname)) as visa_name from i94visa_v")
i94addr_t.createOrReplaceTempView('i94addr_l')
i94citres_t.createOrReplaceTempView('i94citres_l')
i94port_t.createOrReplaceTempView('i94port_l')
i94mode_t.createOrReplaceTempView('i94mode_l')
i94visa_t.createOrReplaceTempView('i94visa_l')



In [80]:
# Transform the data of immigration  DataFrame and apply clean up routines,filters  and create a corresponding view of the fact table.
mainDF_i94[ 'cicid',
 'i94yr',
 'i94mon',
 'i94cit',
 'i94res',
 'i94port',
 'arrdate',
 'dtadfile',
 'i94mode',
 'i94addr',
 'depdate',
 'dtaddto',
 'i94bir',
 'i94visa',
 'occup',
 'biryear',
 'gender',
 'visatype'].createOrReplaceTempView('immigration_v')


# arrdate and depdate are SAS numeric dates (days since 1960-01-01); dtaddto is the date the
# traveler is admitted until (MMddyyyy), not the departure date, so depdate is used instead.
immigration_t=spark.sql("""select int(cicid) as cicid ,int(i94cit) as Citizenship_country_code,int(i94res) as Residence_country_code,i94port as Port_of_entry,i94mode as Entry_mode ,
 i94addr final_dest_state,
 date_add(to_date('1960-01-01'),int(arrdate)) as arrival_date,
 date_add(to_date('1960-01-01'),int(depdate)) as departure_date,
 int(i94bir) as age, int(i94visa) Visa_type, occup as occupation, int(biryear) birth_year, gender, visatype admission_type
 from immigration_v
 where gender is NOT NULL
       and year(to_date(dtadfile,'yyyyMMdd')) != 2013
       and i94port IN (select port_of_entry_code from i94port_l where length(port_of_entry_state)=2 )""")

immigration_t.createOrReplaceTempView('immigration_f')

In [81]:
# Create the Date_Time_D dimension table from the distinct arrival and departure dates,
# then extract the date parts (year, month, day, weekday, week).
date_time_df=spark.sql("""select distinct arrival_date arr_dep_date from immigration_f
                     UNION
                     select distinct departure_date arr_dep_date from immigration_f""")
date_time_df.createOrReplaceTempView('time_temp_v')

# dayofweek() returns 1 for Sunday through 7 for Saturday
date_time_t=spark.sql("""select arr_dep_date,year(arr_dep_date) year,month(arr_dep_date) month,day(arr_dep_date) day,
              CASE dayofweek(arr_dep_date) 
              WHEN 1 THEN 'SUN' 
              WHEN 2 THEN 'MON'
              WHEN 3 THEN 'TUE'
              WHEN 4 THEN 'WED'
              WHEN 5 THEN 'THU'
              WHEN 6 THEN 'FRI'
              WHEN 7 THEN 'SAT'
              END weekday,weekofyear(arr_dep_date) week from time_temp_v""")

date_time_t.createOrReplaceTempView('time_d')
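Spark documents `dayofweek` as returning 1 for Sunday through 7 for Saturday, and `weekofyear` as the ISO 8601 week number. A pure-Python sketch that reproduces one time_d row under those conventions can serve as a unit-test oracle for the Spark output; the helper names are hypothetical.

```python
from datetime import date

# Spark's dayofweek() numbers days 1 = Sunday ... 7 = Saturday.
SPARK_DAY_NAMES = {1: "SUN", 2: "MON", 3: "TUE", 4: "WED", 5: "THU", 6: "FRI", 7: "SAT"}

def spark_dayofweek(d: date) -> int:
    """Mimic Spark's dayofweek(); Python's isoweekday() is 1 = Monday ... 7 = Sunday."""
    return d.isoweekday() % 7 + 1

def time_d_row(d: date) -> dict:
    """Build the expected time_d row for one arrival or departure date."""
    return {
        "arr_dep_date": d,
        "year": d.year,
        "month": d.month,
        "day": d.day,
        "weekday": SPARK_DAY_NAMES[spark_dayofweek(d)],
        "week": d.isocalendar()[1],  # ISO 8601 week, like Spark's weekofyear()
    }

print(time_d_row(date(2016, 4, 22)))
```

Comparing a handful of these rows with `date_time_t` filtered to the same dates would catch an off-by-one in the weekday mapping.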

#### Load

In [82]:
# Write all the tables of the data model to parquet files.
immigration_t.write.parquet("data_out/immigration_f.parquet",mode='overwrite')
airports_t.write.parquet("data_out/airports_d.parquet",mode='overwrite')
us_demography_t.write.parquet("data_out/us_demography_d.parquet",mode='overwrite')
date_time_t.write.parquet("data_out/date_time_d.parquet",mode='overwrite')
i94addr_t.write.parquet("data_out/i94addr_l.parquet",mode='overwrite')
i94citres_t.write.parquet("data_out/i94citres_l.parquet",mode='overwrite')
i94port_t.write.parquet("data_out/i94port_l.parquet",mode='overwrite')
i94mode_t.write.parquet("data_out/i94mode_l.parquet",mode='overwrite')
i94visa_t.write.parquet("data_out/i94visa_l.parquet",mode='overwrite')

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
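The checks implemented below cover nulls, arrival dates present in time_d and primary-key uniqueness. Referential integrity and source/count completeness from the list above can be sketched in plain Python; both helpers are hypothetical and the values are made up. In Spark, the orphan check maps onto a left anti join, e.g. `fact.join(dim, fact.Port_of_entry == dim.port_of_entry_code, "left_anti").count()`, and the counts onto `DataFrame.count()` on the source and loaded tables.

```python
def orphan_keys(fact_keys, dim_keys):
    """Foreign-key values used in the fact table but absent from the dimension (nulls ignored)."""
    return sorted({k for k in fact_keys if k is not None} - set(dim_keys))

def count_check(source_rows, loaded_rows, dropped_rows):
    """Completeness: every source row is either loaded or removed by a documented filter."""
    return source_rows == loaded_rows + dropped_rows

# Hypothetical values: port codes used by the fact table vs. the port lookup.
print(orphan_keys(["NYC", "LOS", None, "ZZZ"], ["NYC", "LOS", "CHI"]))
print(count_check(source_rows=10, loaded_rows=7, dropped_rows=3))
```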
 
Run Quality Checks

In [83]:
# Perform quality checks here

def data_quality_check():
    
        immigration_q=spark.read.parquet("data_out/immigration_f.parquet")
        airports_q=spark.read.parquet("data_out/airports_d.parquet")
        us_demography_q=spark.read.parquet("data_out/us_demography_d.parquet")
        date_time_q=spark.read.parquet("data_out/date_time_d.parquet")
        i94addr_q=spark.read.parquet("data_out/i94addr_l.parquet")
        i94citres_q=spark.read.parquet("data_out/i94citres_l.parquet")
        i94port_q=spark.read.parquet("data_out/i94port_l.parquet")
        i94mode_q=spark.read.parquet("data_out/i94mode_l.parquet")
        i94visa_q=spark.read.parquet("data_out/i94visa_l.parquet")

        # Check for nulls in key columns and for closed airports that should have been filtered out
        v_immi_null=immigration_q.filter(immigration_q.gender.isNull()).count()
        v_airport_null=airports_q.filter(airports_q['airport_type']=='closed').count()
        v_demography_null=us_demography_q.filter(us_demography_q.city.isNull()).count()

        if v_immi_null==0 and v_airport_null==0 and v_demography_null==0:
            v_all_null_check=0
            print('Null Value Check.....................Pass')
        else:
            v_all_null_check=1
            print('Null Value Check.....................Fail')

        # Check that every immigration arrival date is present in the time_d table
        v_immi_arrival_date = spark.sql("""select distinct arrival_date from immigration_f
                     MINUS
                     select arr_dep_date from time_d""").count()
        if v_immi_arrival_date==0:
            print("Arrival date check ..................Pass")
        else:
            print("Arrival date check ..................Fail")

        # Check that primary key values are distinct
        v_imm_pk=spark.sql("""select A-B FROM (select count(cicid) A,count(distinct cicid) B  from immigration_f )""").collect()[0][0]
        v_airport_pk=spark.sql("""select A-B FROM (select count(id) A,count(distinct id) B  from airports_d )""").collect()[0][0]
        v_dem_pk=spark.sql("""select A-B FROM (select count(city,race,state_code) A,count(distinct city,race,state_code) B  from us_demography_d) """).collect()[0][0]
        v_time_pk=spark.sql( """select A-B FROM (select count(arr_dep_date) A, count(distinct arr_dep_date) B from time_d)""").collect()[0][0]
        v_i94cit_pk=spark.sql( """select A-B FROM (select count(country_code) A, count(distinct country_code) B from i94citres_l)""").collect()[0][0]
        v_i94mode_pk=spark.sql( """select A-B FROM (select count(mode_code) A, count(distinct mode_code) B from i94Mode_L)""").collect()[0][0]
        v_i94visa_pk=spark.sql( """select A-B FROM (select count(visa_code) A, count(distinct visa_code) B from i94visa_L) """).collect()[0][0]
        v_i94port_pk=spark.sql( """select A-B FROM (select count( port_of_entry_code) A, count(distinct  port_of_entry_code) B from i94port_l) """).collect()[0][0]
        i94addr_pk=spark.sql( """select A-B FROM (select count(state_code) A, count(distinct state_code) B from i94Addr_L) """).collect()[0][0]

        if (v_imm_pk==0 and v_airport_pk==0 and v_dem_pk==0 and v_time_pk==0 and v_i94cit_pk==0 and v_i94mode_pk==0 and v_i94visa_pk==0 and v_i94port_pk==0 and i94addr_pk==0) :
            v_all_pk_check=0
            print("Distinct primary key values check ...Pass")
        else:
            v_all_pk_check=1
            print("Distinct primary key values check ...Fail")
        
        if (v_all_pk_check==0 and v_immi_arrival_date==0 and v_all_null_check==0):
            return True
        else:
            return False
        
                
data_check=data_quality_check()

if data_check:
    print("===============================================")
    print("      All Data Quality Checks  :     PASSED    ")
    print("===============================================")
else:
    print("===============================================")
    print("      All Data Quality Checks  :     FAILED    ")
    print("===============================================")



Null Value Check.....................Pass
Arrival date check ..................Pass
Distinct primary key values check ...Pass
      All Data Quality Checks  :     PASSED    


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.
Please refer to the file "DataDictionary_Fig.png" for a pictorial representation of the data dictionary.


#### Step 5: Complete Project Write Up

* Spark (PySpark) is the data processing tool because the main dataset, the immigration dataset, has about 2.5 million records, and Spark is well suited to processing large data.

* The same program can handle larger data volumes without significant changes. I have also used Spark to process the smaller datasets, for consistency.

* The immigration data, which is the source of our main fact table, is posted monthly to trade.gov, so this process should run monthly. It can be scheduled with the cron utility.
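A monthly run can be sketched as a cron entry that starts the job once a month, plus a small helper that works out which month's immigration file that run should process. The crontab line, the script path `/path/to/etl.py`, and the file-name pattern `i94_<mon><yy>_sub.sas7bdat` are assumptions for illustration, not part of this project's code.

```python
from datetime import date
from typing import Tuple

# Hypothetical crontab entry (02:00 on the 1st day of every month):
#   0 2 1 * * spark-submit /path/to/etl.py
# Each run processes the previous calendar month's immigration file.

MONTH_ABBR = ["jan", "feb", "mar", "apr", "may", "jun",
              "jul", "aug", "sep", "oct", "nov", "dec"]


def previous_month(run_date: date) -> Tuple[int, int]:
    """Return (year, month) for the calendar month before run_date."""
    if run_date.month == 1:
        return run_date.year - 1, 12
    return run_date.year, run_date.month - 1


def immigration_file_for(run_date: date) -> str:
    """Build the assumed I94 file name for the month before run_date."""
    year, month = previous_month(run_date)
    return f"i94_{MONTH_ABBR[month - 1]}{year % 100:02d}_sub.sas7bdat"


print(immigration_file_for(date(2016, 5, 1)))  # i94_apr16_sub.sas7bdat
print(immigration_file_for(date(2017, 1, 1)))  # i94_dec16_sub.sas7bdat
```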

Different scenarios:
   
   * The data is increased by 100x: The source data files would be stored in AWS S3 buckets, and Amazon Redshift could be used as the target database. Redshift is the AWS data warehouse service; it supports fast bulk loads from S3 and fast analytical queries. The current PySpark program can handle 100x the data when it is run on a Spark cluster on AWS EMR.
   
   * The data populates a dashboard that must be updated daily by 7am: We can use Airflow to schedule and execute this data pipeline. The pipeline first extracts the data from AWS S3 buckets, transforms it, and loads it into the Amazon Redshift database. It also performs data quality checks, pre-aggregates the data into OLAP cubes stored in an Amazon RDS database, and sends an email to the relevant teams.

   * The database needs to be accessed by 100+ people: Once the data is ready in the Amazon Redshift data warehouse (DWH), it is pre-aggregated into OLAP cubes and stored in an Amazon RDS database, which is then consumed by the BI visualization dashboards. Amazon RDS can easily serve 100+ simultaneous users and can be scaled up further.
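For the 100x scenario, the parquet output written by the Spark job on EMR could be bulk-loaded from S3 into Redshift with its `COPY ... FORMAT AS PARQUET` command rather than with row-by-row inserts. The helper below is a sketch that only builds the SQL text; the bucket, prefix and IAM role ARN are hypothetical placeholders, and executing the statement through a Redshift client is left out.

```python
def redshift_copy_parquet(table: str, s3_uri: str, iam_role_arn: str) -> str:
    """Build a Redshift COPY statement that loads parquet files from an S3 prefix."""
    if not s3_uri.startswith("s3://"):
        raise ValueError("s3_uri must start with s3://")
    return (
        f"COPY {table}\n"
        f"FROM '{s3_uri}'\n"
        f"IAM_ROLE '{iam_role_arn}'\n"
        f"FORMAT AS PARQUET;"
    )


# Hypothetical bucket and role; one COPY per output table.
ROLE = "arn:aws:iam::123456789012:role/ExampleRedshiftRole"
for name in ["immigration_f", "airports_d", "us_demography_d", "date_time_d"]:
    uri = f"s3://example-capstone-bucket/data_out/{name}.parquet/"
    print(redshift_copy_parquet(name, uri, ROLE))
```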
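The daily 7am dashboard scenario describes a chain of dependent tasks, which is what an Airflow DAG expresses. Without depending on Airflow itself, the ordering can be sketched with Python's standard `graphlib.TopologicalSorter` (Python 3.9+). The task names, the position of the quality-check step, and the 05:00 cron schedule are assumptions; in Airflow the same chain would be declared as task dependencies with a daily schedule that starts early enough to finish before 7am.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks that must finish before it starts.
# Task names are hypothetical and mirror the scenario description.
PIPELINE = {
    "extract_from_s3": set(),
    "transform": {"extract_from_s3"},
    "load_to_redshift": {"transform"},
    "data_quality_checks": {"load_to_redshift"},
    "build_olap_cubes_in_rds": {"data_quality_checks"},
    "email_teams": {"build_olap_cubes_in_rds"},
}

# Hypothetical cron schedule: 05:00 daily, leaving a buffer before 7am.
SCHEDULE = "0 5 * * *"


def run_order(graph):
    """Return one valid execution order; raises graphlib.CycleError on a cycle."""
    return list(TopologicalSorter(graph).static_order())


print(run_order(PIPELINE))
```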
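Pre-aggregating for the 100+ users scenario means dashboards read a small summary table instead of scanning the fact table. The sketch below builds one such aggregate, arrivals per month and port of entry, keyed on the `arrival_date` and `Port_of_entry` columns of `immigration_f`. It is plain Python, the sample rows are made up for illustration, dates are assumed to be `YYYY-MM-DD` strings, and the "cube" is just a dictionary rather than an RDS table.

```python
from collections import Counter
from typing import Dict, Iterable, Mapping, Optional, Tuple

Cube = Dict[Tuple[str, str], int]


def arrivals_by_month_and_port(rows: Iterable[Mapping[str, Optional[str]]]) -> Cube:
    """Count arrivals per (YYYY-MM, port of entry); rows missing either field are skipped."""
    counts = Counter(
        (row["arrival_date"][:7], row["Port_of_entry"])
        for row in rows
        if row.get("arrival_date") and row.get("Port_of_entry")
    )
    return dict(counts)


def busiest_port(cube: Cube, month: str) -> Optional[str]:
    """Return the port with the most arrivals in the given month, or None if no data."""
    candidates = {port: n for (m, port), n in cube.items() if m == month}
    return max(candidates, key=candidates.get) if candidates else None


# Made-up sample rows for illustration only.
rows = [
    {"arrival_date": "2016-04-30", "Port_of_entry": "LOS"},
    {"arrival_date": "2016-04-30", "Port_of_entry": "LOS"},
    {"arrival_date": "2016-04-12", "Port_of_entry": "NYC"},
    {"arrival_date": "2016-05-02", "Port_of_entry": "NYC"},
]
cube = arrivals_by_month_and_port(rows)
print(cube)
print(busiest_port(cube, "2016-04"))  # LOS
```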



#### Sample output from each of the saved output tables

In [84]:
# Read each saved output table back from its parquet directory under data_out/
immigration_f=spark.read.parquet("data_out/immigration_f.parquet")
airports_d=spark.read.parquet("data_out/airports_d.parquet")
us_demography_d=spark.read.parquet("data_out/us_demography_d.parquet")
date_time_d=spark.read.parquet("data_out/date_time_d.parquet")
i94addr_l=spark.read.parquet("data_out/i94addr_l.parquet")
i94citres_l=spark.read.parquet("data_out/i94citres_l.parquet")
i94port_l=spark.read.parquet("data_out/i94port_l.parquet")
i94mode_l=spark.read.parquet("data_out/i94mode_l.parquet")
i94visa_l=spark.read.parquet("data_out/i94visa_l.parquet")

In [85]:
#Immigration Fact table

immigration_f.show(2)

+-------+------------------------+----------------------+-------------+----------+----------------+------------+--------------+---+---------+----------+----------+------+--------------+
|  cicid|Citizenship_country_code|Residence_country_code|Port_of_entry|Entry_mode|final_dest_state|arrival_date|departure_date|age|Visa_type|occupation|birth_year|gender|admission_type|
+-------+------------------------+----------------------+-------------+----------+----------------+------------+--------------+---+---------+----------+----------+------+--------------+
|5748517|                     245|                   438|          LOS|       1.0|              CA|  2016-04-30|          null| 40|        1|      null|      1976|     F|            B1|
|5748518|                     245|                   438|          LOS|       1.0|              NV|  2016-04-30|          null| 32|        1|      null|      1984|     F|            B1|
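+-------+------------------------+----------------------+-------------+----------+----------------+------------+--------------+---+---------+----------+----------+------+--------------+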

In [86]:
airports_d.show(2)
us_demography_d.show(2)
date_time_d.show(2)
i94addr_l.show(2)
i94citres_l.show(2)
i94port_l.show(2)
i94mode_l.show(2)
i94visa_l.show(2)

+----+--------------+--------------------+-------+-----+------------+
|  id|  airport_type|        airport_name|country|state|municipality|
+----+--------------+--------------------+-------+-----+------------+
| 5A8|medium_airport|Aleknagik / New A...|     US|   AK|   ALEKNAGIK|
|AGGH|medium_airport|Honiara Internati...|     SB|   CT|     HONIARA|
+----+--------------+--------------------+-------+-----+------------+
only showing top 2 rows

+-------------+------------------+----------+-------------+----------+---------------+-----------------+----------------+------------------+----------------------+------------------+
|         city|              race|state_code|        state|median_age|male_population|female_population|total_population|number_of_veterans|number_of_foreign_born|avg_household_size|
+-------------+------------------+----------+-------------+----------+---------------+-----------------+----------------+------------------+----------------------+------------------+
|SILVE