# Project Title
### Data Engineering Capstone Project

#### Project Summary
This project provides an analytical model of immigration to the United States. It supplies the insights and figures needed to support decision making.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
#all imports and installs here
import pandas as pd
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType, DateType, TimestampType
from pyspark.sql import SparkSession
import datetime

Initiating spark session

In [2]:
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

Define the Functions that will be used

In [3]:
#Check whether the table was loaded before (incremental run) or this is the first load
def check_table(table_path):
    try:
        #Reading fails if no parquet data exists at the path yet
        spark.read.parquet(table_path)
        return 'TRUE'
    except Exception:
        return 'FALSE'
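
The existence check can also be sketched without Spark and with a boolean result. The helper name `parquet_table_exists` is hypothetical, and the sketch assumes the warehouse lives on the local filesystem (as the `./DWH Model/` path suggests); it is not what the notebook runs.

```python
import os
import tempfile


def parquet_table_exists(table_path):
    """Return True if table_path is a directory holding at least one parquet part file."""
    if not os.path.isdir(table_path):
        return False
    return any(name.endswith(".parquet") for name in os.listdir(table_path))


# Usage: a missing or empty directory is not treated as a loaded table.
with tempfile.TemporaryDirectory() as tmp:
    empty_result = parquet_table_exists(tmp)
    open(os.path.join(tmp, "part-00000.snappy.parquet"), "w").close()
    loaded_result = parquet_table_exists(tmp)
    missing_result = parquet_table_exists(os.path.join(tmp, "no_such_table"))
```

Returning a boolean lets the caller write `if parquet_table_exists(path):` instead of comparing against the strings 'TRUE' and 'FALSE'.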

Set the config parameters that will be used

In [4]:
landing_zone_path="./Landing Zone/"
dwh_model_path="./DWH Model/"

### Step 1: Scope the Project and Gather Data

#### Scope 
The solution provides an analytical model in dimensional star schema form. This form suits data warehouse solutions because it gives good performance for the analytical queries that run on top of the model.

The solution is based on:
1. Landing the source files in the landing zone.
2. Reading the data into Spark tables (schema on read).
3. Transforming the data into the analytical model in dimensional star schema form.

Data extraction, transformation, and loading are done with PySpark SQL. The transformed data is written to Parquet files.

#### Describe and Gather Data 
The data set consists of the source files below:

1. "i94_apr16_sub.sas7bdat":
Holds the immigration transactions along with the facts and measures related to them (city immigrated from, US city immigrated to, arrival date, arrival port, immigration mode, departure date, birth date, visa type, gender, airline used).
The files are extracted on a monthly basis.

2. "us-cities-demographics.csv":
Holds US city details (city, state, median age of citizens, male population, female population, total population, foreign-born citizens).

3. "city.txt":
Holds the country names and corresponding codes, extracted from the "I94_SAS_Labels_Description.SAS" file.

4. "airport-codes_csv.csv":
Holds all airport details.

5. "transport_mode.txt":
Holds the transport modes used by immigrants ('Air', 'Sea', etc.), extracted from the "I94_SAS_Labels_Description.SAS" file.

6. "visa_type.txt":
Holds the visa types and corresponding codes, extracted from the "I94_SAS_Labels_Description.SAS" file.
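
Extracting the code/label lookup files from the labels description can be sketched roughly as below. The line shape (`code = 'label'`) is an assumption about the SAS labels file, and `parse_label_lines` is a hypothetical helper; the notebook does not show how the text files were actually produced.

```python
import re

# Assumed line shape: optional whitespace, numeric code, '=', quoted label.
LABEL_LINE = re.compile(r"^\s*(\d+)\s*=\s*'(.*?)'\s*;?\s*$")


def parse_label_lines(lines):
    """Return (code, label) pairs for every line matching the assumed shape."""
    pairs = []
    for line in lines:
        match = LABEL_LINE.match(line)
        if match:
            pairs.append((int(match.group(1)), match.group(2).strip()))
    return pairs


# Illustrative sample lines; lines that do not match are skipped.
sample = [
    "   236 =  'AFGHANISTAN'",
    "   101 =  'ALBANIA'",
    "/* a comment line that is skipped */",
    "   1 = 'Air'",
]
parsed = parse_label_lines(sample)
```

Writing the pairs with a delimiter that cannot occur inside a label avoids truncating labels that themselves contain commas.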

### Step 2: Explore and Assess the Data
#### Explore the Data 
Read the data and identify data quality issues such as missing values in keys and mandatory columns, duplicate data, and unsuitable granularity; also check the distinct values.

#### Cleaning Steps
1. Filter the required set of data only.

2. Remove records having null values in mandatory columns.

3. Set the required granularity.

Read the immigration data and load it into a schema-on-read staging table

In [5]:
stg_immigration_df =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

Check Sample data

In [6]:
stg_immigration_df.limit(10).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2
5,18.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MI,20555.0,...,,M,1959.0,09302016,,,AZ,92471040000.0,602.0,B1
6,19.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NJ,20558.0,...,,M,1953.0,09302016,,,AZ,92471400000.0,602.0,B2
7,20.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NJ,20558.0,...,,M,1959.0,09302016,,,AZ,92471610000.0,602.0,B2
8,21.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NY,20553.0,...,,M,1970.0,09302016,,,AZ,92470800000.0,602.0,B2
9,22.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NY,20562.0,...,,M,1968.0,09302016,,,AZ,92478490000.0,608.0,B1


Check Schema and data types as well as nullability

In [7]:
stg_immigration_df.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

Read the data into a Spark table (schema on read)

(Note: we will work on 1M records.)

In [8]:
stg_immigration_df.limit(1000000).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1.897628e+09,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3.736796e+09,00296,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,6.666432e+08,93,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,9.246846e+10,00199,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,9.246846e+10,00199,B2
5,18.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MI,20555.0,...,,M,1959.0,09302016,,,AZ,9.247104e+10,00602,B1
6,19.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NJ,20558.0,...,,M,1953.0,09302016,,,AZ,9.247140e+10,00602,B2
7,20.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NJ,20558.0,...,,M,1959.0,09302016,,,AZ,9.247161e+10,00602,B2
8,21.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NY,20553.0,...,,M,1970.0,09302016,,,AZ,9.247080e+10,00602,B2
9,22.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NY,20562.0,...,,M,1968.0,09302016,,,AZ,9.247849e+10,00608,B1


In [9]:
stg_immigration_df.limit(1000000).createOrReplaceTempView("stg_immigration_transactions")

In [10]:
spark.sql('select  count(*) from stg_immigration_transactions ').show()

+--------+
|count(1)|
+--------+
| 1000000|
+--------+



Check the values and their frequency as well as null values

In [11]:
spark.sql('select distinct i94yr,i94mon from stg_immigration_transactions').show()

+------+------+
| i94yr|i94mon|
+------+------+
|2016.0|   4.0|
+------+------+



In [12]:
spark.sql('select  count(*), i94res from stg_immigration_transactions group by i94res').show()

+--------+------+
|count(1)|i94res|
+--------+------+
|   14531| 692.0|
|   28101| 276.0|
|     295| 101.0|
|    4554| 110.0|
|   22784| 117.0|
|   56760| 112.0|
|   12722| 251.0|
|      44| 102.0|
|    5533| 103.0|
|    8141| 104.0|
|   70820| 111.0|
|      79| 119.0|
|   16351| 123.0|
|    7209| 124.0|
|   16460| 131.0|
|  150330| 135.0|
|    7854| 260.0|
|    3031| 296.0|
|     964| 297.0|
|    4543| 343.0|
+--------+------+
only showing top 20 rows



In [13]:
spark.sql('select   count(*) from stg_immigration_transactions where i94res is null').show()

+--------+
|count(1)|
+--------+
|       0|
+--------+



In [14]:
spark.sql('select   count(*) from stg_immigration_transactions where i94port is null').show()

+--------+
|count(1)|
+--------+
|       0|
+--------+



Read Airport Data

In [15]:
stg_airport_schema = StructType([
        StructField('ident', StringType()),
        StructField('type', StringType()),
        StructField('name', StringType()),
        StructField('elevation_ft', IntegerType()),
        StructField('continent', StringType()),
        StructField('iso_country', StringType()),
        StructField('iso_region', StringType()),
        StructField('municipality', StringType()),
        StructField('gps_code', StringType()),
        StructField('iata_code', StringType()),
        StructField('local_code', StringType()),
        StructField('coordinates', StringType())
    ])

In [16]:
stg_airport_df =spark.read.csv('{}/airport-codes_csv.csv'.format(landing_zone_path),header=True,schema=stg_airport_schema)

In [17]:
stg_airport_df.limit(10).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237,,US,US-AR,Newport,,,,"-91.254898, 35.6087"
5,00AS,small_airport,Fulton Airport,1100,,US,US-OK,Alex,00AS,,00AS,"-97.8180194, 34.9428028"
6,00AZ,small_airport,Cordes Airport,3810,,US,US-AZ,Cordes,00AZ,,00AZ,"-112.16500091552734, 34.305599212646484"
7,00CA,small_airport,Goldstone /Gts/ Airport,3038,,US,US-CA,Barstow,00CA,,00CA,"-116.888000488, 35.350498199499995"
8,00CL,small_airport,Williams Ag Airport,87,,US,US-CA,Biggs,00CL,,00CL,"-121.763427, 39.427188"
9,00CN,heliport,Kitchen Creek Helibase Heliport,3350,,US,US-CA,Pine Valley,00CN,,00CN,"-116.4597417, 32.7273736"


In [18]:
stg_airport_df.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



In [19]:
stg_airport_df.createOrReplaceTempView("stg_airport_prep")

In [20]:
spark.sql('Drop table if exists stg_airport')

DataFrame[]

Prepare and cleanse the airport table

In [21]:
spark.sql('create table stg_airport as SELECT name,type,elevation_ft,iso_region,municipality,gps_code,iata_code,local_code ,coordinates  \
          FROM stg_airport_prep where iso_country="US" and local_code is not null and iata_code is not null \
           ').toPandas()

In [22]:
spark.sql(' SELECT name,type,elevation_ft,iso_region,municipality,gps_code,iata_code,local_code,coordinates  \
          FROM stg_airport').toPandas()

Unnamed: 0,name,type,elevation_ft,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,Ocean Reef Club Airport,small_airport,8.0,US-FL,Key Largo,07FA,OCA,07FA,"-80.274803161621, 25.325399398804"
1,Pilot Station Airport,small_airport,305.0,US-AK,Pilot Station,,PQS,0AK,"-162.899994, 61.934601"
2,Crested Butte Airpark,small_airport,8980.0,US-CO,Crested Butte,0CO2,CSE,0CO2,"-106.928341, 38.851918"
3,LBJ Ranch Airport,small_airport,1515.0,US-TX,Johnson City,0TE7,JCY,0TE7,"-98.62249755859999, 30.251800537100003"
4,Metropolitan Airport,small_airport,418.0,US-MA,Palmer,13MA,PMX,13MA,"-72.31140136719999, 42.223300933800004"
5,Loring Seaplane Base,seaplane_base,0.0,US-AK,Loring,13Z,WLR,13Z,"-131.636993408, 55.6012992859"
6,Nunapitchuk Airport,small_airport,12.0,US-AK,Nunapitchuk,PPIT,NUP,16A,"-162.440454, 60.905591"
7,Port Alice Seaplane Base,seaplane_base,0.0,US-AK,Port Alice,16K,PTC,16K,"-133.597, 55.803"
8,Icy Bay Airport,small_airport,50.0,US-AK,Icy Bay,19AK,ICY,19AK,"-141.662002563, 59.96900177"
9,Port Protection Seaplane Base,seaplane_base,0.0,US-AK,Port Protection,19P,PPV,19P,"-133.61000061035, 56.328800201416"


In [23]:
spark.sql(' SELECT count(*)  \
          FROM stg_airport').toPandas()

Unnamed: 0,count(1)
0,1969


In [24]:
spark.sql('select count(*), local_code from stg_airport group by local_code having count(*) > 1').show()

+--------+----------+
|count(1)|local_code|
+--------+----------+
+--------+----------+



Read US City Data

In [25]:
stg_us_city_schema = StructType([
        StructField('City', StringType()),
        StructField('State', StringType()),
        StructField('Median Age', DoubleType()),
        StructField('Male Population', IntegerType()),
        StructField('Female Population', IntegerType()),
        StructField('Total Population', IntegerType()),
        StructField('Number of Veterans', IntegerType()),
        StructField('Foreign-born', IntegerType()),
        StructField('Average Household Size', DoubleType()),
        StructField('State Code', StringType()),
        StructField('Race', StringType()),
        StructField('Count', IntegerType())
    ])

In [26]:
stg_us_city_df=spark.read.option("delimiter", ";").csv('{}/us-cities-demographics.csv'.format(landing_zone_path),header=True,schema=stg_us_city_schema)

In [27]:
stg_us_city_df.limit(10).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402
5,Peoria,Illinois,33.1,56229,62432,118661,6634,7517,2.4,IL,American Indian and Alaska Native,1343
6,Avondale,Arizona,29.1,38712,41971,80683,4815,8355,3.18,AZ,Black or African-American,11592
7,West Covina,California,39.8,51629,56860,108489,3800,37038,3.56,CA,Asian,32716
8,O'Fallon,Missouri,36.0,41762,43270,85032,5783,3269,2.77,MO,Hispanic or Latino,2583
9,High Point,North Carolina,35.5,51751,58077,109828,5204,16315,2.65,NC,Asian,11060


In [28]:
stg_us_city_df.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)



In [29]:
stg_us_city_df.createOrReplaceTempView("stg_us_city_prep")

In [30]:
spark.sql('SELECT * from stg_us_city_prep where `State Code` is null').toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count


Prepare and cleanse the city data to form a US state dimension at state granularity

In [31]:
spark.sql('drop table IF EXISTS stg_us_state')

DataFrame[]

In [32]:
spark.sql('create table stg_us_state as SELECT state ,state_code , SUM(male_population) as male_population ,sum(female_population) as female_population,\
          sum(total_population) as total_population,sum(number_of_veterans) as number_of_veterans , \
         sum(foreign_born) as foreign_born, sum(Count)  as count from(\
          SELECT  state ,`State Code` as state_code  ,`Male Population` as male_population ,`Female Population` as female_population,\
          `Total Population` as total_population  ,`Number of Veterans` as number_of_veterans , \
         `Foreign-born` as foreign_born, `Average Household Size` as average_household_size, sum(Count)  as count\
           FROM stg_us_city_prep   group by \
           state ,`State Code`  ,`Median Age` ,`Male Population` ,`Female Population` , `Total Population`  ,`Number of Veterans` ,\
            `Foreign-born` , `Average Household Size`)A \
          group by state ,state_code').toPandas()

In [33]:
spark.sql('SELECT  state,state_code,male_population,female_population,total_population,number_of_veterans,foreign_born,count from stg_us_state').toPandas()

Unnamed: 0,state,state_code,male_population,female_population,total_population,number_of_veterans,foreign_born,count
0,Missouri,MO,733262,785932,1519194,91453.0,99095.0,1619575
1,Idaho,ID,199103,199780,398883,26380.0,28126.0,446968
2,Kansas,KS,564145,584129,1148274,64789.0,118645.0,1326815
3,Maine,ME,31480,35392,66872,3666.0,9229.0,71861
4,Ohio,OH,1177546,1256143,2433689,127372.0,175219.0,2651621
5,Illinois,IL,2218541,2343771,4562312,146701.0,941735.0,5375987
6,Pennsylvania,PA,1117708,1213148,2330856,104850.0,290860.0,2675056
7,Indiana,IN,910346,972407,1882753,92165.0,147233.0,2062597
8,Delaware,DE,32680,39277,71957,3063.0,3336.0,75048
9,Hawaii,HI,176807,175959,352766,23213.0,101312.0,393445
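
The two-level aggregation is needed because the demographics file holds one row per city and race, so the city-level figures repeat on every race row. A minimal sketch with hypothetical rows follows. Keying on the city name is an assumption of this sketch; the query above groups on the metric columns instead, which would merge two cities of one state only if all their figures were identical.

```python
from collections import defaultdict

# Hypothetical rows: (city, state_code, total_population, race, race_count)
rows = [
    ("Alpha", "XX", 100, "White", 60),
    ("Alpha", "XX", 100, "Asian", 40),
    ("Beta", "XX", 100, "White", 70),
    ("Beta", "XX", 100, "Asian", 30),
]

# Step 1: collapse the per-race rows to one record per city.
city_population = {}
for city, state_code, total_population, _race, _race_count in rows:
    city_population[(city, state_code)] = total_population

# Step 2: roll the city records up to state level.
state_population = defaultdict(int)
for (_city, state_code), total_population in city_population.items():
    state_population[state_code] += total_population

# Summing the raw rows counts every city once per race.
naive_total = sum(row[2] for row in rows)
```

Here the deduplicated state total is 200, while the naive sum over raw rows is 400.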


In [34]:
spark.sql('SELECT count(*) from stg_us_state').toPandas()

Unnamed: 0,count(1)
0,49


In [35]:
spark.sql('SELECT count(*),state_code from stg_us_state group by state_code having count(*) > 1').toPandas()

Unnamed: 0,count(1),state_code
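
A duplicate-key check groups by the key and keeps only the groups whose row count exceeds one. The sketch below uses Python's built-in `sqlite3` as a stand-in for Spark SQL, with a hypothetical demo table, purely to show the shape of the query.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE stg_us_state_demo (state_code TEXT, state TEXT)")
conn.executemany(
    "INSERT INTO stg_us_state_demo VALUES (?, ?)",
    [("MO", "Missouri"), ("ID", "Idaho"), ("MO", "Missouri")],
)

# Keys that occur more than once violate the one-row-per-state granularity.
duplicates = conn.execute(
    "SELECT state_code, COUNT(*) AS n FROM stg_us_state_demo "
    "GROUP BY state_code HAVING COUNT(*) > 1"
).fetchall()
conn.close()
```

An empty result means the key is unique, which is what the dimension load expects.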


In [36]:
spark.sql('SELECT * from stg_us_state where state_code is null').toPandas()

Unnamed: 0,state,state_code,male_population,female_population,total_population,number_of_veterans,foreign_born,count


Read Cities file

In [37]:
stg_all_city_schema = StructType([
        StructField('city_cd', IntegerType()),
        StructField('city', StringType())
    ])

In [38]:
stg_all_city_df =spark.read.csv('{}/city.txt'.format(landing_zone_path),header=True,schema=stg_all_city_schema)

In [39]:
stg_all_city_df.limit(10).toPandas()

Unnamed: 0,city_cd,city
0,582,MEXICO Air Sea
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA
5,324,ANGOLA
6,529,ANGUILLA
7,518,ANTIGUA-BARBUDA
8,687,ARGENTINA
9,151,ARMENIA


In [40]:
stg_all_city_df.printSchema()

root
 |-- city_cd: integer (nullable = true)
 |-- city: string (nullable = true)



In [41]:
stg_all_city_df.createOrReplaceTempView("stg_all_city")

In [42]:
spark.sql('SELECT count(*) from stg_all_city ').toPandas()

Unnamed: 0,count(1)
0,289


Read Visa type file

In [43]:
stg_visa_type_schema = StructType([
        StructField('visa_cd', IntegerType()),
        StructField('visa', StringType())
    ])

In [44]:
stg_visa_type_df =spark.read.csv('{}/visa_type.txt'.format(landing_zone_path),header=True,schema=stg_visa_type_schema)

In [45]:
stg_visa_type_df.limit(10).toPandas()

Unnamed: 0,visa_cd,visa
0,1,Business
1,2,Pleasure
2,3,Student


In [46]:
stg_visa_type_df.printSchema()

root
 |-- visa_cd: integer (nullable = true)
 |-- visa: string (nullable = true)



In [47]:
stg_visa_type_df.createOrReplaceTempView("stg_visa_type")

Read transportation Mode data

In [48]:
stg_trans_mode_schema = StructType([
        StructField('trans_mode_cd', IntegerType()),
        StructField('trans_mode', StringType())
    ])

In [49]:
stg_trans_mode_df =spark.read.csv('{}/transport_mode.txt'.format(landing_zone_path),header=True,schema=stg_trans_mode_schema)

In [50]:
stg_trans_mode_df.limit(10).toPandas()

Unnamed: 0,trans_mode_cd,trans_mode
0,1,Air
1,2,Sea
2,3,Land
3,9,Not Reported


In [51]:
stg_trans_mode_df.printSchema()

root
 |-- trans_mode_cd: integer (nullable = true)
 |-- trans_mode: string (nullable = true)



In [52]:
stg_trans_mode_df.createOrReplaceTempView("stg_trans_mode")

Build the date staging table from the arrival and departure dates

In [53]:
spark.sql("Drop table if exists stg_date")

DataFrame[]

In [54]:
#SAS dates count days since 1960-01-01; 'MM' is the month pattern ('mm' means minutes)
spark.sql(" create table stg_date as select date_add(cast(to_timestamp('01-01-1960', 'MM-dd-yyyy') as date),cast(arrdate as int)) as dt  from stg_immigration_transactions where arrdate is not null \
 union     select date_add(cast(to_timestamp('01-01-1960', 'MM-dd-yyyy') as date),cast(depdate as int)) as dt from stg_immigration_transactions where depdate is not null")

DataFrame[]
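
The `arrdate` and `depdate` columns hold SAS date values, which count days from 1 January 1960. The conversion can be sketched in plain Python as follows; `sas_days_to_date` is a hypothetical helper name used only for illustration.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)


def sas_days_to_date(sas_days):
    """Convert a SAS day count (possibly a float, possibly None) to a date."""
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


# 20545.0 is the arrdate value seen in the sample rows above.
first_of_april = sas_days_to_date(20545.0)
```

The sample value 20545.0 maps to 2016-04-01, which matches the April 2016 extract.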

In [55]:
spark.sql("select count(*) from stg_date where dt is null").toPandas()

Unnamed: 0,count(1)
0,0


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The conceptual data model is as follows:

1. City: entity representing the city that the immigrant is immigrating from.

2. US_State: entity representing the US state that the immigrant is immigrating to.

3. Airport: entity representing the airport that the immigrant arrived at.

4. Visa Type: entity representing the type of the immigrant's visa.

5. Transportation mode: entity representing the transportation mode that the immigrant used.

6. Date: entity representing the date with all its details.

7. Immigration stream: transactions for the immigrations to the US states.

#### 3.2 Mapping Out Data Pipelines
The steps needed to pipeline the data into the chosen data model:

1. Stage the data into staging tables based on schema-on-read tables.

2. Check whether the dimension table already exists in the DWH (i.e. this is an incremental run) or not (i.e. this is the initial load).

3. Load the incremental data into the dimension tables.

4. Load the transactional data into the fact table (immigration_fact).

Load the data model, a star schema dimensional model consisting of Fact_Immigration surrounded by denormalized dimensions:
All_City_Dim ,
US_state_Dim ,
Airport_Dim ,
Visa_type_Dim ,
Transport_mode_Dim,
Date_Dim

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

Load All_City_Dim

In [240]:
print('Load Dim_All_City')

Load Dim_All_City


In [241]:
dim_all_city_path='{}/all_city_dim.parquet'

In [242]:
#check if the parquet file is loaded before or not
tab_exist_flg=check_table(dim_all_city_path.format(dwh_model_path))

In [243]:
tab_exist_flg

'TRUE'

In [244]:
#if loaded before, append only the rows whose key is not yet in the dimension (NOT IN filter); else load the full staging set
if tab_exist_flg=='TRUE':
    dim_all_city_df=spark.read.parquet(dim_all_city_path.format(dwh_model_path))
    dim_all_city_df.createOrReplaceTempView("dim_all_city")
    all_city_insrt_df=spark.sql('SELECT city_cd ,city from stg_all_city where city_cd not in (select city_cd from dim_all_city) and city_cd is not null')
    all_city_insrt_df.write.parquet(dim_all_city_path.format(dwh_model_path), 'append')
else:
    all_city_insrt_df=spark.sql('SELECT city_cd ,city from stg_all_city where city_cd is not null')
    all_city_insrt_df.write.parquet(dim_all_city_path.format(dwh_model_path), 'append')
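
One caveat of the `NOT IN` pattern is its behaviour when the subquery returns a NULL key: the predicate then evaluates to unknown and no row passes. The sketch below shows this with Python's built-in `sqlite3` as a stand-in for Spark SQL and hypothetical demo tables; a `NOT EXISTS` filter (or, in Spark SQL, a `LEFT ANTI JOIN`) is one alternative that is not affected.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE stg_demo (city_cd INTEGER, city TEXT);
    CREATE TABLE dim_demo (city_cd INTEGER, city TEXT);
    INSERT INTO stg_demo VALUES (101, 'ALBANIA'), (102, 'ANDORRA');
    INSERT INTO dim_demo VALUES (101, 'ALBANIA'), (NULL, 'UNKNOWN');
    """
)

# NOT IN: the NULL key in dim_demo makes the comparison unknown for every row.
not_in_rows = conn.execute(
    "SELECT city_cd FROM stg_demo "
    "WHERE city_cd NOT IN (SELECT city_cd FROM dim_demo)"
).fetchall()

# NOT EXISTS: only an actual key match excludes a staging row.
not_exists_rows = conn.execute(
    "SELECT s.city_cd FROM stg_demo s "
    "WHERE NOT EXISTS (SELECT 1 FROM dim_demo d WHERE d.city_cd = s.city_cd)"
).fetchall()
conn.close()
```

The city load above filters `city_cd is not null` before writing, so its dimension should not contain NULL keys; the caveat matters for dimensions loaded without such a filter.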

In [245]:
dim_all_city_df=spark.read.parquet(dim_all_city_path.format(dwh_model_path))
dim_all_city_df.createOrReplaceTempView("dim_all_city")

In [246]:
dim_all_city_df.limit(10).toPandas()

Unnamed: 0,city_cd,city
0,582,MEXICO Air Sea
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA
5,324,ANGOLA
6,529,ANGUILLA
7,518,ANTIGUA-BARBUDA
8,687,ARGENTINA
9,151,ARMENIA


In [247]:
spark.sql('SELECT count(*)  from  dim_all_city').toPandas()

Unnamed: 0,count(1)
0,287


Load Dim_US_State data

In [107]:
print('Load Dim_US_State')

Load Dim_US_State


In [108]:
dim_us_state_path='{}/us_state_dim.parquet'

In [109]:
#check if the parquet file is loaded before or not
tab_exist_flg=check_table(dim_us_state_path.format(dwh_model_path))

In [110]:
tab_exist_flg

'TRUE'

In [111]:
#if loaded before, append only the rows whose key is not yet in the dimension (NOT IN filter); else load the full staging set
if tab_exist_flg=='TRUE':
    dim_us_state_df=spark.read.parquet(dim_us_state_path.format(dwh_model_path))
    dim_us_state_df.createOrReplaceTempView("dim_us_state")
    us_state_insrt_df=spark.sql('SELECT  state,state_code,male_population,female_population,total_population,number_of_veterans,foreign_born,count from stg_us_state \
    where state_code not in (select state_code from dim_us_state) ')
    us_state_insrt_df.write.parquet(dim_us_state_path.format(dwh_model_path), 'append')
else:
    us_state_insrt_df=spark.sql('SELECT  state,state_code,male_population,female_population,total_population,number_of_veterans,foreign_born,count from stg_us_state')
    us_state_insrt_df.write.parquet(dim_us_state_path.format(dwh_model_path), 'append')

In [112]:
dim_us_state_df=spark.read.parquet(dim_us_state_path.format(dwh_model_path))
dim_us_state_df.createOrReplaceTempView("dim_us_state")

In [113]:
dim_us_state_df.count()

49

Load Dim_airport data

In [114]:
print('Load Dim_airport')

Load Dim_airport


In [115]:
dim_airport_path='{}/airport_dim.parquet'

In [116]:
#check if the parquet file is loaded before or not
tab_exist_flg=check_table(dim_airport_path.format(dwh_model_path))

In [117]:
tab_exist_flg

'TRUE'

In [118]:
#if loaded before, append only the rows whose key is not yet in the dimension (NOT IN filter); else load the full staging set
if tab_exist_flg=='TRUE':
    dim_airport_df=spark.read.parquet(dim_airport_path.format(dwh_model_path))
    dim_airport_df.createOrReplaceTempView("dim_airport")
    airport_insrt_df=spark.sql('SELECT name,type,elevation_ft,iso_region,municipality,gps_code,iata_code,local_code,coordinates  \
          FROM stg_airport \
          where local_code not in (select local_code from dim_airport) ')
    airport_insrt_df.write.parquet(dim_airport_path.format(dwh_model_path), 'append')
else:
    airport_insrt_df=spark.sql('SELECT name,type,elevation_ft,iso_region,municipality,gps_code,iata_code,local_code,coordinates  \
          FROM stg_airport')
    airport_insrt_df.write.parquet(dim_airport_path.format(dwh_model_path), 'append')

In [119]:
dim_airport_df=spark.read.parquet(dim_airport_path.format(dwh_model_path))
dim_airport_df.createOrReplaceTempView("dim_airport")

In [120]:
dim_airport_df.count()

1969

Load Dim_visa_type data

In [121]:
print('Load Dim_visa_type')

Load Dim_visa_type


In [122]:
dim_visa_type_path='{}/visa_type_dim.parquet'

In [123]:
#check if the parquet file is loaded before or not
tab_exist_flg=check_table(dim_visa_type_path.format(dwh_model_path))

In [124]:
tab_exist_flg

'TRUE'

In [125]:
#if loaded before, append only the rows whose key is not yet in the dimension (NOT IN filter); else load the full staging set
if tab_exist_flg=='TRUE':
    dim_visa_type_df=spark.read.parquet(dim_visa_type_path.format(dwh_model_path))
    dim_visa_type_df.createOrReplaceTempView("dim_visa_type")
    visa_type_insrt_df=spark.sql('SELECT visa_cd,visa  \
          FROM stg_visa_type \
          where visa_cd not in (select visa_cd from dim_visa_type) ')
    visa_type_insrt_df.write.parquet(dim_visa_type_path.format(dwh_model_path), 'append')
else:
    visa_type_insrt_df=spark.sql('SELECT visa_cd, visa  \
          FROM stg_visa_type')
    visa_type_insrt_df.write.parquet(dim_visa_type_path.format(dwh_model_path), 'append')

In [126]:
dim_visa_type_df=spark.read.parquet(dim_visa_type_path.format(dwh_model_path))
dim_visa_type_df.createOrReplaceTempView("dim_visa_type")

In [127]:
dim_visa_type_df.count()

3

Load Dim_trans_mode

In [128]:
print('Load Dim_trans_mode')

Load Dim_trans_mode


In [129]:
dim_trans_mode_path='{}/trans_mode_dim.parquet'

In [130]:
#check if the parquet file is loaded before or not
tab_exist_flg=check_table(dim_trans_mode_path.format(dwh_model_path))

In [131]:
tab_exist_flg

'TRUE'

In [132]:
#if loaded before, append only the rows whose key is not yet in the dimension (NOT IN filter); else load the full staging set
if tab_exist_flg=='TRUE':
    dim_trans_mode_df=spark.read.parquet(dim_trans_mode_path.format(dwh_model_path))
    dim_trans_mode_df.createOrReplaceTempView("dim_trans_mode")
    trans_mode_insrt_df=spark.sql('SELECT trans_mode_cd,trans_mode  \
          FROM stg_trans_mode \
          where trans_mode_cd not in (select trans_mode_cd from dim_trans_mode) ')
    trans_mode_insrt_df.write.parquet(dim_trans_mode_path.format(dwh_model_path), 'append')
else:
    trans_mode_insrt_df=spark.sql('SELECT trans_mode_cd, trans_mode  \
          FROM stg_trans_mode')
    trans_mode_insrt_df.write.parquet(dim_trans_mode_path.format(dwh_model_path), 'append')

In [133]:
dim_trans_mode_df=spark.read.parquet(dim_trans_mode_path.format(dwh_model_path))
dim_trans_mode_df.createOrReplaceTempView("dim_trans_mode")

In [134]:
dim_trans_mode_df.count()

4

Load dim_date Table

In [135]:
print('Load Dim_date')

Load Dim_date


In [136]:
dim_date_path='{}/date_dim.parquet'

In [137]:
#check if the parquet file is loaded before or not
tab_exist_flg=check_table(dim_date_path.format(dwh_model_path))

In [138]:
tab_exist_flg

'TRUE'

In [139]:
spark.sql('SELECT *\
          FROM stg_date').show()

+----------+
|        dt|
+----------+
|2016-05-08|
|2016-09-05|
|2016-05-12|
|2016-04-05|
|2016-08-17|
|2016-01-31|
|2016-06-15|
|2016-04-30|
|2016-08-13|
|2016-06-27|
|2016-04-23|
|2016-06-09|
|2016-06-21|
|2016-08-24|
|2016-07-03|
|2016-08-23|
|2016-07-09|
|2016-09-03|
|2016-04-04|
|2016-06-13|
+----------+
only showing top 20 rows



In [140]:
#if loaded before, append only the rows whose key is not yet in the dimension (NOT IN filter); else load the full staging set
if tab_exist_flg=='TRUE':
    dim_date_df=spark.read.parquet(dim_date_path.format(dwh_model_path))
    dim_date_df.createOrReplaceTempView("dim_date")
    date_insrt_df=spark.sql('SELECT dt, dayofmonth(dt) as day, weekofyear(dt) as week, \
                                                month(dt) as month, year(dt) as year, dayofweek(dt) as weekday  \
          FROM stg_date \
          where dt not in (select dt from dim_date) ')
    date_insrt_df.write.parquet(dim_date_path.format(dwh_model_path), 'append')
else:
    date_insrt_df=spark.sql('SELECT dt, dayofmonth(dt) as day, weekofyear(dt) as week, \
                                                month(dt) as month, year(dt) as year, dayofweek(dt) as weekday  \
          FROM stg_date')
    date_insrt_df.write.parquet(dim_date_path.format(dwh_model_path), 'append')
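
The date attributes derived above can be reproduced in plain Python to sanity-check individual rows. The sketch assumes Spark's conventions: `weekofyear` follows ISO 8601 week numbering and `dayofweek` numbers Sunday as 1 through Saturday as 7. The helper name `date_attributes` is hypothetical.

```python
from datetime import date


def date_attributes(dt):
    """Return the date dimension columns for one date."""
    return {
        "dt": dt,
        "day": dt.day,
        "week": dt.isocalendar()[1],         # ISO 8601 week number
        "month": dt.month,
        "year": dt.year,
        "weekday": dt.isoweekday() % 7 + 1,  # Sunday=1 ... Saturday=7
    }


row = date_attributes(date(2016, 8, 5))
```

For 2016-08-05 this gives day 5, week 31, month 8, year 2016, and weekday 6, in line with the first row of the dimension sample shown in this section.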

In [141]:
dim_date_df=spark.read.parquet(dim_date_path.format(dwh_model_path))
dim_date_df.createOrReplaceTempView("dim_date")

In [142]:
dim_date_df.limit(10).toPandas()

Unnamed: 0,dt,day,week,month,year,weekday
0,2016-08-05,5,31,8,2016,6
1,2016-04-26,26,17,4,2016,3
2,2016-05-13,13,19,5,2016,6
3,2016-05-31,31,22,5,2016,3
4,2016-06-30,30,26,6,2016,5
5,2016-06-03,3,22,6,2016,6
6,2016-07-20,20,29,7,2016,4
7,2016-07-06,6,27,7,2016,4
8,2016-05-03,3,18,5,2016,3
9,2016-08-31,31,35,8,2016,4


In [143]:
dim_date_df.count()

186

Load The Fact_Immigration Table

In [144]:
print('Load Fact_Immigration Table')

Load Fact_Immigration Table


In [145]:
fact_immig_path='{}/immig_fact.parquet'

In [146]:
immig_insrt_df=spark.sql("select \
cast(cast(i94yr as integer) as string)||''||case when length(cast(cast(i94mon as integer) as string)) <2 then '0'||cast(cast(i94mon as integer) as string)\
else cast(cast(i94mon as integer) as string) end as  extract_month, \
cast(i94res as integer) as immig_from_city_cd, \
i94port as immig_port_cd ,\
date_add(cast(to_timestamp('01-01-1960', 'MM-dd-yyyy') as date),cast(arrdate as int)) as arrival_date,\
cast(coalesce(i94mode,9) as integer) as immig_transport_mode ,\
i94addr as immig_to_us_state_cd ,\
date_add(cast(to_timestamp('01-01-1960', 'MM-dd-yyyy') as date),cast(depdate as int)) as depart_date ,\
cast(i94visa as integer) as visa_type_cd ,\
matflag as match_flg ,\
cast(biryear as integer) as immgrnt_birth_year ,\
dtaddto as US_dt_addmt_to ,\
gender as immgrnt_gender ,\
airline as airline_used ,\
admnum as admission_no , \
fltno as flight_no_used , \
visatype as visa_type \
from stg_immigration_transactions")
immig_insrt_df.write.parquet(fact_immig_path.format(dwh_model_path), 'append')

In [147]:
fact_immigration_df=spark.read.parquet(fact_immig_path.format(dwh_model_path))
fact_immigration_df.createOrReplaceTempView("fact_immigration")

In [148]:
fact_immigration_df.count()

1000000
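
Two derivations in the fact load may be easier to follow in isolation. The query treats `arrdate` and `depdate` as day counts from the SAS epoch of 1960-01-01, and it builds `extract_month` as the year followed by a zero-padded month. The plain-Python sketch below mirrors that logic on single values; the helper names `sas_days_to_date` and `extract_month` are hypothetical.

```python
import datetime

SAS_EPOCH = datetime.date(1960, 1, 1)  # day 0 of a SAS numeric date


def sas_days_to_date(days):
    """Convert a SAS day count to a date; missing values stay None."""
    if days is None:
        return None
    return SAS_EPOCH + datetime.timedelta(days=int(days))


def extract_month(i94yr, i94mon):
    """Build the YYYYMM string, zero-padding single-digit months."""
    return f"{int(i94yr)}{int(i94mon):02d}"


print(sas_days_to_date(20545.0))   # 2016-04-01
print(extract_month(2016.0, 4.0))  # 201604
```

In Spark SQL the same padding could probably be written more compactly with `lpad(..., 2, '0')` instead of the `case when length(...) < 2` expression.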

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

Source/Count checks to ensure completeness

In [149]:
# Perform quality checks here
spark.sql("select count(*),'STG Model Table' as table_layer from stg_airport where   iata_code is not null and local_code is not null \
union all  select count(*),'DWH Model Table' as table_layer from dim_airport").toPandas()

Unnamed: 0,count(1),table_layer
0,1969,STG Model Table
1,1969,DWH Model Table


In [150]:
spark.sql("select count(*),'STG Model Table' as table_layer from stg_us_state  \
union all  select count(*),'DWH Model Table' as table_layer from dim_us_state").toPandas()

Unnamed: 0,count(1),table_layer
0,49,STG Model Table
1,49,DWH Model Table


In [248]:
spark.sql("select count(*),'STG Model Table' as table_layer from stg_all_city where city_cd is not null \
union all  select count(*),'DWH Model Table' as table_layer from dim_all_city").toPandas()

Unnamed: 0,count(1),table_layer
0,287,STG Model Table
1,287,DWH Model Table


In [152]:
spark.sql("select count(*),'STG Model Table' as table_layer from stg_visa_type  \
union all  select count(*),'DWH Model Table' as table_layer from dim_visa_type").toPandas()

Unnamed: 0,count(1),table_layer
0,3,STG Model Table
1,3,DWH Model Table


In [153]:
spark.sql("select count(*),'STG Model Table' as table_layer from stg_trans_mode  \
union all  select count(*),'DWH Model Table' as table_layer from dim_trans_mode").toPandas()

Unnamed: 0,count(1),table_layer
0,4,STG Model Table
1,4,DWH Model Table


In [154]:
spark.sql("select count(*),'STG Model Table' as table_layer from stg_date  \
union all  select count(*),'DWH Model Table' as table_layer from dim_date").toPandas()

Unnamed: 0,count(1),table_layer
0,186,STG Model Table
1,186,DWH Model Table


In [155]:
spark.sql("select count(*),'STG Model Table' as table_layer from stg_immigration_transactions  \
union all  select count(*),'DWH Model Table' as table_layer from fact_immigration ").toPandas()

Unnamed: 0,count(1),table_layer
0,1000000,STG Model Table
1,1000000,DWH Model Table
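
The count comparisons above are inspected by eye. As a minimal sketch, the same check could be made to fail loudly by comparing the two counts in code. The helper `assert_counts_match` is hypothetical; it only assumes a `spark` object whose `sql(...).collect()` returns rows that can be indexed by position, as the notebook's `SparkSession` does.

```python
def assert_counts_match(spark, stg_query, dwh_table):
    """Raise ValueError if the staging query and the DWH table differ in row count."""
    # Wrap the staging query so that any filter it carries is applied before counting.
    stg_count = spark.sql(f"select count(*) from ({stg_query}) s").collect()[0][0]
    dwh_count = spark.sql(f"select count(*) from {dwh_table}").collect()[0][0]
    if stg_count != dwh_count:
        raise ValueError(
            f"{dwh_table}: expected {stg_count} rows from staging, found {dwh_count}"
        )
    return dwh_count


# Example call inside the notebook, where `spark` is the active session:
# assert_counts_match(spark, "select * from stg_date", "dim_date")
```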


Integrity Constraints Checks

In [156]:
spark.sql("select count(*),local_code from dim_airport group by local_code having count(*)> 1").toPandas()

Unnamed: 0,count(1),local_code


In [157]:
spark.sql("select count(*),state_code from dim_us_state group by state_code having count(*) > 1").toPandas()

Unnamed: 0,count(1),state_code


In [158]:
spark.sql("select count(*),city_cd from dim_all_city group by city_cd having count(*) > 1").toPandas()

Unnamed: 0,count(1),city_cd
0,2,471
1,2,243
2,2,392
3,2,737
4,2,516
5,2,251
6,2,255
7,2,472
8,2,296
9,2,513


In [159]:
spark.sql("select count(*),visa_cd from dim_visa_type group by visa_cd having count(*) > 1").toPandas()

Unnamed: 0,count(1),visa_cd


In [160]:
spark.sql("select count(*),trans_mode_cd from dim_trans_mode group by trans_mode_cd having count(*) > 1").toPandas()

Unnamed: 0,count(1),trans_mode_cd


In [161]:
spark.sql("select count(*),dt from dim_date group by dt having count(*) > 1").toPandas()

Unnamed: 0,count(1),dt
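
The uniqueness queries above return rows only when a key value is duplicated, so an empty result means the check passed; the `dim_all_city` query is the one that returned rows. The sketch below applies the same rule to a small in-memory sample, which can help when unit-testing the logic outside Spark. The function name `duplicate_keys` is hypothetical, and the sample rows are made up for illustration.

```python
from collections import Counter


def duplicate_keys(rows, key):
    """Return {key_value: occurrences} for values of `key` that appear more than once."""
    counts = Counter(row[key] for row in rows)
    return {value: n for value, n in counts.items() if n > 1}


# Illustrative rows only; the city names are placeholders, not real data.
sample = [
    {"city_cd": 471, "city": "A"},
    {"city_cd": 471, "city": "B"},
    {"city_cd": 243, "city": "C"},
]
print(duplicate_keys(sample, "city_cd"))  # {471: 2}
```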


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

# Table : dim_airport
name: Airport name

type: Airport type

elevation_ft: Elevation of the airport, measured in feet

iso_region: Airport region

municipality: Municipality to which the airport belongs

gps_code: GPS code of the airport

iata_code: Airport IATA code

local_code: Airport local code; this is the primary key of the airport table

coordinates: Airport coordinates

# Table : dim_us_state
state: State name

state_code: Code of the state

male_population: male population for the state

female_population: female population for the state

total_population: The state total population

number_of_veterans: the state number of veterans

foreign_born: number of foreign born in the state

# Table : dim_all_city
city_cd: the code of the city (expected to be unique, although the integrity check above found ten codes that occur twice)

city: City name

# Table : dim_visa_type
visa_cd: code of the visa type (unique)

visa: Visa Type

# Table : dim_trans_mode
trans_mode_cd: Transport mode code (unique)

trans_mode: Transportation mode

# Table : dim_date
dt: date (unique)

day: day in month

week: week in year

month: month number in year

year: year (YYYY)

weekday: weekday number in week

# Table : fact_immigration
extract_month: year and month of the extracted data (YYYYMM)

immig_from_city_cd: code of the city the immigrant came from (foreign key to the table "dim_all_city")

immig_port_cd: code of the port through which the immigrant arrived (foreign key to the table "dim_airport")

arrival_date: date on which the immigrant arrived

immig_transport_mode: mode of transportation used by the immigrant (foreign key to the table "dim_trans_mode")

immig_to_us_state_cd: code of the US state the immigrant travelled to (foreign key to the table "dim_us_state")

depart_date: date on which the immigrant departed

visa_type_cd: code of the visa type held by the immigrant (foreign key to the table "dim_visa_type")

match_flg: flag indicating that the arrival and departure records are matched

immgrnt_birth_year: birth year of the immigrant

US_dt_addmt_to: date to which the immigrant is admitted to the US (allowed to stay until), taken from the source column dtaddto

immgrnt_gender: gender of the immigrant

airline_used: airline used by the immigrant

admission_no: admission number assigned to the immigrant

flight_no_used: flight number used by the immigrant

visa_type: detailed visa type (B2, F1, etc.)

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

# The Tools and Technologies Used for the Project
The model is a dimensional star schema (a fact table and denormalized dimensions). This enables fast and easy extraction of the data needed by reports that run on top of the model.

The data extraction is done with PySpark SQL. Spark processes data quickly and can read from and write to many file formats and databases. The extraction uses "schema on read", so there is no need to maintain physical staging tables.

The landing zone holds the files received for processing. The files come in various formats: SAS, CSV and TXT.

The data is loaded back into the dimensional model hosted under the DWH folder, with each table written to Parquet files. Parquet is a compressed columnar format that saves space, can be partitioned, and gives high read and write performance.

# How often the data should be updated and why
The data will be updated monthly: when the month's transaction file arrives, it will be extracted and processed. This is because the transactional SAS file has monthly granularity, so the extraction is expected to run once a month.
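
As a small illustration of a monthly run, the sketch below derives the landing-zone file name expected for a given month. It assumes every monthly file follows the naming of `i94_apr16_sub.sas7bdat`; that pattern and the helper name `monthly_source_file` are assumptions, not something the notebook defines.

```python
MONTH_ABBR = ("jan", "feb", "mar", "apr", "may", "jun",
              "jul", "aug", "sep", "oct", "nov", "dec")


def monthly_source_file(year, month):
    """Build the assumed file name i94_<mon><yy>_sub.sas7bdat for one month."""
    if not 1 <= month <= 12:
        raise ValueError(f"month must be 1-12, got {month}")
    return f"i94_{MONTH_ABBR[month - 1]}{year % 100:02d}_sub.sas7bdat"


print(monthly_source_file(2016, 4))  # i94_apr16_sub.sas7bdat
```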

# Description of how the problem would be approached differently under the following scenarios:

### The data was increased by 100x:
In that case, we need to boost performance to cope with the increased data volume. We can host the solution in the cloud (AWS), using S3 for storage and an EMR cluster that processes the data across multiple nodes, and we can scale up or out based on our needs.

### The data populates a dashboard that must be updated on a daily basis by 7am every day:
In that case, the data extraction must run daily. The immigration transaction file should be received every day by 4 am at the latest, and the ETL process will start daily at 4 am. We can use Apache Airflow, a data pipeline tool, to trigger the processing (DAGs) daily at 4 am.

### The database needed to be accessed by 100+ people:
In that case, we need to increase the power of the database in terms of memory and processing. We can load the model into a Redshift cluster, which processes the data across multiple nodes in parallel, while the leader node handles communication with the ODBC and JDBC applications used by the different users.