# Project Title
### Data Engineering Capstone Project

#### Project Summary

Using the provided immigration data for the USA, this project prepares data for analysing the following metrics:
* Age group of travellers
* Count of travellers during the year
* Effect of temperature on immigration
* Effect on race, population and demographics of port-of-entry cities
* Count of travellers by different visa type
* Average footfall by airports
* Travellers yearly and monthly trends

The following data dictionaries are used to enrich the data:

* countries.csv : table containing country codes based on I94_SAS_Labels_Descriptions.SAS file 
* i94portCodes.csv: table containing city codes based on I94_SAS_Labels_Descriptions.SAS file 
* visa_type.csv : table containing visa code and visa type desc based on I94_SAS_Labels_Descriptions.SAS file 

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, when, count, col, udf
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Date, LongType as Long, TimestampType as Ts
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window
import pandas as pd

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use?

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

#### First read immigration sample data to see all the columns and get a high level overview of the data

In [2]:
fname = 'immigration_data_sample.csv'
df_immig = pd.read_csv(fname)

In [3]:
df_immig.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


#### Check temperature data

In [4]:
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temp = pd.read_csv(fname)

In [5]:
df_temp.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


#### Check i94 Port data

In [6]:
fname = 'i94portCodes.csv'
df_i94port = pd.read_csv(fname)

In [7]:
df_i94port.head()

Unnamed: 0,code,location,state
0,ALC,ALCAN,AK
1,ANC,ANCHORAGE,AK
2,BAR,BAKER AAF - BAKER ISLAND,AK
3,DAC,DALTONS CACHE,AK
4,PIZ,DEW STATION PT LAY DEW,AK


#### Check demographics data

In [8]:
fname = 'us-cities-demographics.csv'
df_demographics = pd.read_csv(fname,sep=';')

In [9]:
df_demographics.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


#### Check countries dictionary data

In [10]:
fname = 'countries.csv'
df_countries = pd.read_csv(fname)

In [11]:
df_countries.head()

Unnamed: 0,code,country
0,582,MEXICO
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA


#### Check airport data

In [12]:
fname = 'airport-codes_csv.csv'
df_airports = pd.read_csv(fname)

In [13]:
df_airports.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


#### Read Full immigration data

In [14]:
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

df_immigration_stats = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [15]:
df_immigration_stats.show(5)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null| 1.897628485E9| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SE

In [16]:
df_immigration_stats.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [17]:
#write to parquet
df_immigration_stats.write.mode('overwrite').parquet("sas_data")
df_immigration_stats=spark.read.parquet("sas_data")

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

### Temperature dataset
Let's check the temperature data first.

In [18]:
#perform cleaning task here

In [19]:
df_temp.count()

dt                               8599212
AverageTemperature               8235082
AverageTemperatureUncertainty    8235082
City                             8599212
Country                          8599212
Latitude                         8599212
Longitude                        8599212
dtype: int64

In [20]:
df_temp.dtypes

dt                                object
AverageTemperature               float64
AverageTemperatureUncertainty    float64
City                              object
Country                           object
Latitude                          object
Longitude                         object
dtype: object

First check latest available date in temperature data set

In [21]:
df_temp['dt'].max()

'2013-09-01'

The temperature data only runs until 2013-09-01, so its date field cannot be used to join with the 2016 immigration data.

Keep only United States data, because the I94 data covers arrivals in the United States only.

In [22]:
df_temp = df_temp[df_temp['Country']=='United States']

In [23]:
df_temp.count()

dt                               687289
AverageTemperature               661524
AverageTemperatureUncertainty    661524
City                             687289
Country                          687289
Latitude                         687289
Longitude                        687289
dtype: int64

Keeping only United States data reduces the dataset drastically, from 8,599,212 to 687,289 rows.

Now check for null values

In [24]:
df_temp.isnull().sum()

dt                                   0
AverageTemperature               25765
AverageTemperatureUncertainty    25765
City                                 0
Country                              0
Latitude                             0
Longitude                            0
dtype: int64

Remove rows with a null average temperature.

In [25]:
df_temp=df_temp[df_temp.AverageTemperature.notnull()]

##### City and country can be the natural key for the temperature data, with the average temperature computed per key

In [26]:
df_temp.groupby(['City','Country'])['AverageTemperature'].mean()

City              Country      
Abilene           United States    16.892500
Akron             United States     9.605076
Albuquerque       United States    11.135264
Alexandria        United States    11.918475
Allentown         United States     9.523296
Amarillo          United States    15.023386
Anaheim           United States    16.124837
Anchorage         United States    -2.301646
Ann Arbor         United States     8.520341
Antioch           United States    14.447987
Arlington         United States    14.542532
Arvada            United States     2.419405
Atlanta           United States    14.436726
Aurora            United States     9.423826
Austin            United States    19.980095
Bakersfield       United States    15.817692
Baltimore         United States    11.918475
Baton Rouge       United States    20.303142
Beaumont          United States    20.018035
Bellevue          United States     7.503998
Berkeley          United States    14.447987
Birmingham        Unite

### Demographics dataset
Check the demographics data.

In [27]:
df_demographics.count()

City                      2891
State                     2891
Median Age                2891
Male Population           2888
Female Population         2888
Total Population          2891
Number of Veterans        2878
Foreign-born              2878
Average Household Size    2875
State Code                2891
Race                      2891
Count                     2891
dtype: int64

In [28]:
df_demographics.dtypes

City                       object
State                      object
Median Age                float64
Male Population           float64
Female Population         float64
Total Population            int64
Number of Veterans        float64
Foreign-born              float64
Average Household Size    float64
State Code                 object
Race                       object
Count                       int64
dtype: object

Now check for null values

In [29]:
df_demographics.isnull().sum()

City                       0
State                      0
Median Age                 0
Male Population            3
Female Population          3
Total Population           0
Number of Veterans        13
Foreign-born              13
Average Household Size    16
State Code                 0
Race                       0
Count                      0
dtype: int64

Removing nulls would not improve the quality of the data, so this step is skipped.

##### City, State and Race should be the natural key for demographics, as the combination is unique

In [30]:
df_demographics[df_demographics[['City', 'State','Race']].duplicated()]

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count


### Airport dataset
Check the airport data.

In [31]:
df_airports.count()

ident           55075
type            55075
name            55075
elevation_ft    48069
continent       27356
iso_country     54828
iso_region      55075
municipality    49399
gps_code        41030
iata_code        9189
local_code      28686
coordinates     55075
dtype: int64

In [32]:
df_airports.dtypes

ident            object
type             object
name             object
elevation_ft    float64
continent        object
iso_country      object
iso_region       object
municipality     object
gps_code         object
iata_code        object
local_code       object
coordinates      object
dtype: object

Keep only United States data, because the I94 data covers arrivals in the United States only.

In [33]:
df_airports = df_airports[df_airports['iso_country']=='US']

In [34]:
df_airports.count()

ident           22757
type            22757
name            22757
elevation_ft    22518
continent           1
iso_country     22757
iso_region      22757
municipality    22655
gps_code        20984
iata_code        2019
local_code      21236
coordinates     22757
dtype: int64

Keeping only US data reduces the dataset from 55,075 to 22,757 rows, a drop of more than half.

Now check for null values

In [35]:
df_airports.isnull().sum()

ident               0
type                0
name                0
elevation_ft      239
continent       22756
iso_country         0
iso_region          0
municipality      102
gps_code         1773
iata_code       20738
local_code       1521
coordinates         0
dtype: int64

ident can't be used for joining with the immigration data, so municipality and state are needed as the join key instead.
Rows with a null municipality should therefore be removed.

Check types of airport

In [36]:
df_airports.groupby('type')['type'].count()


type
balloonport          18
closed             1326
heliport           6265
large_airport       170
medium_airport      692
seaplane_base       566
small_airport     13720
Name: type, dtype: int64

I94 entry by air can happen at a large_airport, medium_airport or small_airport,
so only these types should be kept in the dataset.

In [37]:
airportType = ['large_airport', 'medium_airport', 'small_airport']
df_airports = df_airports[df_airports['type'].isin(airportType)]

In [38]:
df_airports.count()

ident           14582
type            14582
name            14582
elevation_ft    14519
continent           0
iso_country     14582
iso_region      14582
municipality    14532
gps_code        14183
iata_code        1865
local_code      14383
coordinates     14582
dtype: int64

Filtering by airport type reduces the dataset from 22,757 to 14,582 rows, removing roughly a third of the records.

##### ident (airport code) should be the natural key for airports, as it is unique

In [39]:
df_airports[df_airports[['ident']].duplicated()]

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates


### Immigration stats dataset
Check the immigration stats data.

In [40]:
df_immigration_stats.count()

3096313

In [41]:
df_immig.dtypes

Unnamed: 0      int64
cicid         float64
i94yr         float64
i94mon        float64
i94cit        float64
i94res        float64
i94port        object
arrdate       float64
i94mode       float64
i94addr        object
depdate       float64
i94bir        float64
i94visa       float64
count         float64
dtadfile        int64
visapost       object
occup          object
entdepa        object
entdepd        object
entdepu       float64
matflag        object
biryear       float64
dtaddto        object
gender         object
insnum        float64
airline        object
admnum        float64
fltno          object
visatype       object
dtype: object

Register the immigration data as a temporary view for SQL query analysis.

In [42]:
df_immigration_stats.createOrReplaceTempView("immig_stats_table")

Check whether cicid can be the natural key.

In [43]:
#check for duplicates
spark.sql("""
SELECT cicid,count(1)
FROM immig_stats_table
group by cicid
having count(1)>1
""").show()

+-----+--------+
|cicid|count(1)|
+-----+--------+
+-----+--------+



In [44]:
#check for null values
spark.sql("""
SELECT *
FROM immig_stats_table
where cicid is null
""").show()

+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------+-----+--------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender|insnum|airline|admnum|fltno|visatype|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------+-----+--------+
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------+-----+--------+



Arrival and departure dates in the immigration dataset are stored as the number of days since 1960-01-01 (the SAS date epoch), so they need to be converted into proper dates.

In [45]:
df_immigration_stats = spark.sql("""SELECT *, 
                                 date_add(to_date('1960-01-01'), arrdate) AS arrival_date,
                                 date_add(to_date('1960-01-01'), depdate) AS departure_date
                                 FROM immig_stats_table""")
df_immigration_stats.createOrReplaceTempView("immig_stats_table")
spark.sql("""
SELECT *
FROM immig_stats_table
""").show()

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+------------+--------------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|arrival_date|departure_date|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+------------+--------------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|  2016-04-30|
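
To make the date conversion concrete, here is a minimal plain-Python sketch of the same arithmetic that `date_add(to_date('1960-01-01'), arrdate)` performs in Spark SQL. The helper name `sas_to_date` is hypothetical and is not part of the notebook; it only illustrates the rule, including how a missing `depdate` could be handled.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS stores dates as days since this day


def sas_to_date(sas_days):
    """Convert a SAS day count (float or None) to a calendar date.

    Hypothetical helper for illustration only.
    """
    if sas_days is None:
        return None  # e.g. a traveller without a recorded departure
    return SAS_EPOCH + timedelta(days=int(sas_days))


# arrdate 20574.0 from the row shown above
print(sas_to_date(20574.0))  # 2016-04-30
print(sas_to_date(None))     # None
```

The first result matches the `arrival_date` of `2016-04-30` that Spark produced for `arrdate` 20574.0 in the output above.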

Check the distinct i94 ports and confirm whether their format matches the airports dataset.

In [46]:
spark.sql("""
SELECT distinct i94port
FROM immig_stats_table
""").show()

+-------+
|i94port|
+-------+
|    FMY|
|    BGM|
|    HEL|
|    DNS|
|    MOR|
|    FOK|
|    HVR|
|    SNA|
|    PTK|
|    CLG|
|    SPM|
|    OPF|
|    DLB|
|    ABS|
|    NAS|
|    MYR|
|    PVD|
|    OAK|
|    FAR|
|    OTT|
+-------+
only showing top 20 rows



Check the distinct visa types and see whether visa_type.csv can be used as a dictionary.

In [47]:
spark.sql("""
SELECT distinct i94visa,visatype
FROM immig_stats_table
""").show()

+-------+--------+
|i94visa|visatype|
+-------+--------+
|    1.0|      B1|
|    2.0|      WT|
|    1.0|      E2|
|    1.0|      I1|
|    3.0|      M2|
|    2.0|     CPL|
|    1.0|      E1|
|    3.0|      M1|
|    3.0|      F1|
|    1.0|     GMB|
|    2.0|     GMT|
|    2.0|      CP|
|    3.0|      F2|
|    1.0|      WB|
|    1.0|       I|
|    2.0|     SBP|
|    2.0|      B2|
+-------+--------+



Check the age of travellers (age at the time of entry) and count travellers by age.

In [48]:
spark.sql("""
SELECT EXTRACT(year from arrival_date)-biryear as age,count(1)
FROM immig_stats_table
group by EXTRACT(year from arrival_date)-biryear
""").show()

+-----+--------+
|  age|count(1)|
+-----+--------+
|  8.0|   14607|
| 67.0|   32063|
| 70.0|   24891|
| 69.0|   28451|
|  0.0|     765|
|  7.0|   14233|
|108.0|       2|
| 88.0|     884|
| 49.0|   56041|
|101.0|       2|
| 98.0|      26|
| 29.0|   67762|
|107.0|       1|
| 64.0|   37002|
| 75.0|   12305|
| 47.0|   58127|
| 42.0|   62150|
| 44.0|   62001|
| 35.0|   69626|
| 62.0|   41352|
+-----+--------+
only showing top 20 rows
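
Since one of the target metrics is the age group of travellers, the per-year ages above could later be bucketed. The following is only a sketch: the function names, the 10-year bucket width and the `100+` cut-off are assumptions made for illustration, not choices taken from the notebook.

```python
def age_at_entry(arrival_year, birth_year):
    """Age at the time of entry, as computed in the query above."""
    return arrival_year - birth_year


def age_group(age, width=10):
    """Map an age to a label such as '30-39'.

    The bucket width and the '100+' bucket are illustrative assumptions.
    """
    if age is None or age < 0:
        return "unknown"
    if age >= 100:
        return "100+"
    low = (int(age) // width) * width
    return f"{low}-{low + width - 1}"


print(age_group(age_at_entry(2016, 1979)))  # 30-39
print(age_group(8.0))                       # 0-9
print(age_group(108.0))                     # 100+
```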



Check for null values in the immigration_stats dataset.

In [49]:
df_immigration_stats.select([count(when(col(c).isNull(), c)).alias(c) for c in df_immigration_stats.columns]).show()

+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+------+-----+--------+------------+--------------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|  occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender| insnum|airline|admnum|fltno|visatype|arrival_date|departure_date|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+------+-----+--------+------------+--------------+
|    0|    0|     0|     0|     0|      0|      0|    239| 152592| 142457|   802|      0|    0|       1| 1881250|3088187|    238| 138429|3095921| 138429|    802|    477|414269|2982605|  83627|     0|19549|       0|           0|        142457|
+-----+-----+------+------+-

gender has many null values (414,269 of 3,096,313 rows), so rows with a null gender are removed from the dataset.

In [50]:
spark.sql("""SELECT * FROM immig_stats_table WHERE gender is not null""").createOrReplaceTempView("immig_stats_table")

In [51]:
spark.sql("""
SELECT gender,count(1)
FROM immig_stats_table
group by gender
""").show()

+------+--------+
|gender|count(1)|
+------+--------+
|     F| 1302743|
|     M| 1377224|
|     U|     467|
|     X|    1610|
+------+--------+
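
As a quick sanity check on the gender filter, the counts printed above can be reconciled with plain arithmetic. The numbers below are copied from the notebook outputs (total row count, null counts per column and the gender breakdown); the variable names are just for this sketch.

```python
total_rows = 3096313  # df_immigration_stats.count()

# Null counts copied from the null-check output for a few columns
null_counts = {
    "gender": 414269,
    "visapost": 1881250,
    "occup": 3088187,
    "entdepu": 3095921,
    "insnum": 2982605,
}

# Share of rows that are null, per column
null_share = {column: n / total_rows for column, n in null_counts.items()}
for column, share in null_share.items():
    print(f"{column}: {share:.1%}")

# Rows left after dropping null gender should equal the gender breakdown
gender_counts = {"F": 1302743, "M": 1377224, "U": 467, "X": 1610}
remaining_rows = sum(gender_counts.values())
print(remaining_rows == total_rows - null_counts["gender"])  # True
```

Dropping null-gender rows therefore removes roughly 13% of the records, and the remaining 2,682,044 rows match the gender breakdown exactly.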



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The data is analysed to track the movement of foreign travellers into the United States. The i94 data serves as the fact table.
The **fact_immigration_stats** table contains:
* cicid,
* citizenship_country,
* residence_country,
* city, -> joined with the demographics and temperature tables
* state,
* arrival_date, -> joined with the immig_date table
* departure_date, -> joined with the immig_date table
* age,
* visa_type -> joined with the visa_type table
* airport_code -> joined with the airports table



**dim_immig_date**: only the date, month and year are relevant for the analysis
* date, 
* month,
* year

**dim_airports**: referential data for airports
* ident,
* airport_code,
* type, 
* name, 
* elevation_ft, 
* state,
* municipality, 
* iata_code

**dim_demographics**: Data to check the demography of arrival city
* City, 
* state, 
* median_age, 
* male_population, 
* female_population, 
* total_population,
* Average_Household_Size, 
* Race, 
* Count

**dim_temperatures**: Analyse average temperature of cities with traveller arrivals 

* City,
* average_temperature, 
* average_temperature_uncertainty 

**dim_visa_type**: Referential data for visa

* visa_code,
* visa_type, 
* visa_type_desc
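
To illustrate how this star schema could answer one of the target metrics (count of travellers by visa type), here is a small sketch that uses Python's built-in `sqlite3` instead of Spark so that it runs anywhere. The rows are made-up sample data, only a subset of the columns is modelled, and the `visa_type_desc` values are an assumption about what visa_type.csv contains.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE fact_immigration_stats (
    cicid INTEGER PRIMARY KEY,
    visa_type TEXT,
    arrival_date TEXT
);
CREATE TABLE dim_visa_type (
    visa_code INTEGER,
    visa_type TEXT,
    visa_type_desc TEXT
);
""")

# Made-up sample rows, for illustration only
conn.executemany(
    "INSERT INTO fact_immigration_stats VALUES (?, ?, ?)",
    [(1, "B2", "2016-04-01"), (2, "B2", "2016-04-02"),
     (3, "F1", "2016-04-02"), (4, "WT", "2016-04-03")],
)
conn.executemany(
    "INSERT INTO dim_visa_type VALUES (?, ?, ?)",
    [(2, "B2", "Pleasure"), (3, "F1", "Student"), (2, "WT", "Pleasure")],
)

# Fact rows joined to the visa dimension, counted per description
rows = conn.execute("""
SELECT d.visa_type_desc, COUNT(*) AS travellers
FROM fact_immigration_stats f
JOIN dim_visa_type d ON f.visa_type = d.visa_type
GROUP BY d.visa_type_desc
ORDER BY travellers DESC, d.visa_type_desc
""").fetchall()
print(rows)  # [('Pleasure', 3), ('Student', 1)]
```

The same join and aggregation should carry over to Spark SQL once the parquet outputs are registered as views.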


#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

##### fact_immigration_stats:
* Filter null gender data
* Convert arrival and departure dates
* Drop rows where the mode of arrival is not air travel
* Derive country of citizenship and country of residence from the countries dictionary csv
* Compute age using the birth year and the year of the arrival date
* Replace port of entry with city and state using the i94 port dictionary csv
* Keep visatype, rename it to visa_type and remove excess columns
* Insert data into the immigration_stats fact table
* Write to parquet

##### dim_temperature:
* For the temperature table keep only United States data
* Filter null temperature
* Convert city to upper case
* Remove date as it is not relevant for joining with fact data
* Get average temperature by city
* Insert into the temperature table
* Write to parquet

##### dim_immig_date:
* Get all the arrival dates from the immigration dataset
* Extract year and month
* Insert into the immig_date table
* Write to parquet

##### dim_airports:
* Keep only USA data
* Keep only data for airport based location like large_airport, medium_airport & small_airport
* Filter null municipality data required for airport_codes
* Insert to dim_airports table
* Write to parquet

##### dim_demographics:
* Convert city names to upper case
* Insert to dim_demographics table
* Write to parquet

##### dim_visa_type:
* Get visa code and visa type desc from the visa dictionary file
* Extract visa_type and i94visa from the i94 data
* Merge data with the dictionary table
* Insert to dim_visa_type table
* Write to parquet


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

### Immigration Fact Load

In [52]:
# Read i94 data
df_immigration_stats=spark.read.parquet("sas_data")
df_countries = spark.read.option("header",True).csv('countries.csv')
df_i94port = spark.read.option("header",True).csv('i94portCodes.csv')

In [53]:
# Create staging table
df_immigration_stats.createOrReplaceTempView("immig_stats_table")
df_countries.createOrReplaceTempView("countries")
df_i94port.createOrReplaceTempView("port")

In [54]:
#Filter null gender data
spark.sql("""
SELECT *
FROM immig_stats_table
WHERE gender is not null
""").createOrReplaceTempView("immig_stats_table")

#Filter null state port data
spark.sql("""
SELECT *
FROM port
WHERE state is not null
""").createOrReplaceTempView("port")

In [55]:
# convert arrival and departure dates
spark.sql("""SELECT *, 
                                 date_add(to_date('1960-01-01'), arrdate) AS arrival_date,
                                 date_add(to_date('1960-01-01'), depdate) AS departure_date
                                 FROM immig_stats_table""").createOrReplaceTempView("immig_stats_table")


In [56]:
# Drop rows where the mode of arrival is not air travel
spark.sql("""
SELECT *
FROM immig_stats_table
WHERE i94mode = 1
""").createOrReplaceTempView("immig_stats_table")

In [57]:
# Derive country of citizenship and country of residence from countries dictionary csv
spark.sql("""
SELECT imm.*, 
citz.country as citizenship_country,
res.country as residence_country
FROM immig_stats_table imm, countries citz, countries res
WHERE imm.i94cit=citz.code
and imm.i94res=res.code
""").createOrReplaceTempView("immig_stats_table")

In [58]:
# compute age using birth year and year of our arrival date.
spark.sql("""
SELECT *,EXTRACT(year from arrival_date)-biryear as age
FROM immig_stats_table
""").createOrReplaceTempView("immig_stats_table")

In [59]:
# replace port of entry with city and state
spark.sql("""
SELECT imm.*, 
loc.location AS entry_port_city,
loc.state AS entry_port_state,
loc.location || ', ' || loc.state as airport_code
FROM immig_stats_table imm,port loc
where imm.i94port = loc.code
""").createOrReplaceTempView("immig_stats_table")

In [60]:
# keep visatype, rename it to visa_type and remove excess columns
# Remove duplicates
# insert data into immmigration_stats fact table
df_immigration_stats_fact=spark.sql("""
SELECT DISTINCT
cicid,
citizenship_country,
residence_country,
entry_port_city,
entry_port_state,
arrival_date,
departure_date,
age,
visatype as visa_type,
airport_code as airport_code
FROM immig_stats_table
""")

In [61]:
# Write to parquet
df_immigration_stats_fact.write.partitionBy("entry_port_state","entry_port_city","airport_code").mode('overwrite').parquet("output_data/fact_immigration_stats")


### Temperature dimension load

In [62]:
# Read temperature data
df_temp = spark.read.option("header",True).csv('../../data2/GlobalLandTemperaturesByCity.csv')

In [63]:
# Create staging table
df_temp.createOrReplaceTempView("temperature")

In [64]:
# For the temperature table keep only united states data;
spark.sql("""
SELECT *
FROM temperature
WHERE country='United States'
""").createOrReplaceTempView("temperature")

In [65]:
# Filter null temperature 
spark.sql("""
SELECT *
FROM temperature
WHERE averagetemperature is not null
""").createOrReplaceTempView("temperature")

In [66]:
# Convert city,country to upper case
spark.sql("""
SELECT *, upper(city) as upper_city,upper(country) as upper_country
FROM temperature
""").createOrReplaceTempView("temperature")

In [67]:
# Remove date as it is not relevant for joining with fact data
# get average temperature by city,country
spark.sql("""
select upper_city as city,
upper_country as country,
avg(averagetemperature) as average_temperature,
avg(averagetemperatureuncertainty) as average_temperature_uncertainty
FROM temperature
group by upper_city,upper_country
""").createOrReplaceTempView("temperature")

In [68]:
# Insert into the temperature table
# Write to parquet

df_temp_dim=spark.sql("""
SELECT 
*
FROM temperature
""")
df_temp_dim.write.partitionBy("city","country").mode('overwrite').parquet("output_data/dim_temperature")
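
The temperature steps above (drop null readings, upper-case the key, average per city and country) can be sketched in plain Python as follows. The function name `average_temperature_by_city` and the sample readings are made up for illustration; the real pipeline does this in Spark SQL.

```python
from collections import defaultdict
from statistics import mean


def average_temperature_by_city(records):
    """Average temperature per (CITY, COUNTRY) key, skipping null readings.

    records: iterable of (city, country, temperature or None) tuples.
    Hypothetical helper for illustration only.
    """
    grouped = defaultdict(list)
    for city, country, temperature in records:
        if temperature is None:
            continue  # mirrors "WHERE averagetemperature is not null"
        grouped[(city.upper(), country.upper())].append(temperature)
    return {key: mean(values) for key, values in grouped.items()}


# Made-up sample readings
sample = [
    ("Austin", "United States", 10.0),
    ("Austin", "United States", 20.0),
    ("Austin", "United States", None),
    ("Akron", "United States", 9.0),
]
print(average_temperature_by_city(sample))
```

Upper-casing the city here serves the same purpose as in the pipeline: the port dictionary stores locations in upper case, so the join key has to be normalised the same way.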

### Date Dimension Data Load

In [69]:
# Get all the arrival dates from the immigration data_set
# extract year, month
# insert into immig_date table
df_immig_date_dim=spark.sql("""
SELECT DISTINCT
arrival_date as date,
extract(month from arrival_date) as month,
extract(year from arrival_date) as year
FROM immig_stats_table
""")

In [70]:
# write partition

df_immig_date_dim.write.partitionBy("year","month").mode('overwrite').parquet("output_data/dim_immig_date")

### Airport dimension data load

In [71]:
# Read Data
df_airports = spark.read.option("header",True).csv('airport-codes_csv.csv')

In [72]:
# Create staging table
df_airports.createOrReplaceTempView("airports")

In [73]:
# Keep only USA data
spark.sql("""
SELECT *
FROM airports
WHERE iso_country='US'
""").createOrReplaceTempView("airports")

In [74]:
# Filter null municipality data required for airport_codes
spark.sql("""
SELECT *
FROM airports
WHERE municipality is not null
""").createOrReplaceTempView("airports")

In [75]:
# Keep only data for airport based location like large_airport, medium_airport & small_airport
spark.sql("""
SELECT *
FROM airports
WHERE type in ('large_airport', 'medium_airport', 'small_airport')
""").createOrReplaceTempView("airports")

In [76]:
# Insert to dim_airports table
# Write to parquet
df_airports_dim=spark.sql("""
SELECT DISTINCT
ident,
upper(municipality) || ', ' ||split(iso_region,'-')[1] as airport_code,
type,
name,
iso_country as country
FROM airports
""")

df_airports_dim.write.mode('overwrite').parquet("output_data/dim_airports")
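
The `airport_code` join key is built in two places: from the port dictionary in the fact load (`location || ', ' || state`) and from the airports data here (`upper(municipality) || ', ' || split(iso_region,'-')[1]`). The sketch below mirrors both expressions in plain Python to show when the two keys line up. The function names are hypothetical, and the port row `ANCHOR POINT` / `AK` is an assumed example rather than a value taken from i94portCodes.csv.

```python
def airport_code_from_airport(municipality, iso_region):
    """Mirror upper(municipality) || ', ' || split(iso_region, '-')[1]."""
    state = iso_region.split("-")[1]  # 'US-AK' -> 'AK'
    return f"{municipality.upper()}, {state}"


def airport_code_from_port(location, state):
    """Mirror location || ', ' || state from the port dictionary."""
    return f"{location}, {state}"


# Airport row taken from the airports sample output
print(airport_code_from_airport("Anchor Point", "US-AK"))  # ANCHOR POINT, AK

# Assumed port row; the keys only join if the text matches exactly
print(airport_code_from_port("ANCHOR POINT", "AK"))        # ANCHOR POINT, AK
```

Because the key is free text, a fact row will only join to an airport when the port location and the municipality are spelled identically, which may be worth checking before relying on this join.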

### Demographics dimension data load

In [77]:
# Read Data
df_demographics = spark.read.option("header",True).option("delimiter", ';').csv('us-cities-demographics.csv')

In [78]:
# Create staging table
df_demographics.createOrReplaceTempView("demographics")

In [79]:
# change city to upper case
spark.sql("""
SELECT *, upper(city) as upper_city
FROM demographics
""").createOrReplaceTempView("demographics")

In [80]:
# Insert to dim_demographics table
# Write to parquet
df_demographics_dim=spark.sql("""
SELECT DISTINCT
upper_city as city,
state,
`Median Age` as median_age,
`Male Population` male_population,
`Female Population` female_population,
`Total Population` total_population,
`Average Household Size` avg_household_size,
Race,
Count
FROM demographics
""")
df_demographics_dim.write.mode('overwrite').parquet("output_data/dim_demographics")

### Visa type dimension data load

In [81]:
# Read Data
df_visa_dict = spark.read.option("header",True).csv('visa_type.csv')

In [82]:
# Create staging table
df_visa_dict.createOrReplaceTempView("visa_type")

In [83]:
# Get visa code and visa category from the visa dictionary file
# Get visatype and i94visa from the i94 immigration data
# Merge the immigration data with the dictionary table
spark.sql("""
SELECT vt.*,imm.visatype
FROM visa_type vt, immig_stats_table imm
where vt.visacode=imm.i94visa
""").createOrReplaceTempView("visa_type")

In [84]:
# Insert to dim_visa_type table
# Write to parquet
df_visa_type_dim=spark.sql("""
SELECT DISTINCT
visacode as visa_code,
visatype as visa_type,
visacategory as visa_type_desc
FROM visa_type
""")
df_visa_type_dim.write.mode('overwrite').parquet("output_data/dim_visa_type")

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [85]:
# Perform quality checks here

### Read data from final tables

In [86]:
# Read immigration fact data
df_immigration_stats_fact=spark.read.parquet("output_data/fact_immigration_stats")
df_immigration_stats_fact.show()

+-------+-------------------+-----------------+------------+--------------+----+---------+----------------+---------------+------------+
|  cicid|citizenship_country|residence_country|arrival_date|departure_date| age|visa_type|entry_port_state|entry_port_city|airport_code|
+-------+-------------------+-----------------+------------+--------------+----+---------+----------------+---------------+------------+
| 2625.0|             POLAND|           POLAND|  2016-04-01|    2016-04-30|76.0|       B2|              NY|       NEW YORK|NEW YORK, NY|
| 5859.0|             FRANCE|           FRANCE|  2016-04-01|    2016-04-15|12.0|       WT|              NY|       NEW YORK|NEW YORK, NY|
| 5929.0|             FRANCE|           FRANCE|  2016-04-01|    2016-04-18|35.0|       WT|              NY|       NEW YORK|NEW YORK, NY|
| 6030.0|             FRANCE|           FRANCE|  2016-04-01|    2016-04-04|20.0|       WT|              NY|       NEW YORK|NEW YORK, NY|
| 6241.0|             FRANCE|           F

In [87]:
# Read immigration date data
df_immig_date_dim=spark.read.parquet("output_data/dim_immig_date")
df_immig_date_dim.show()

+----------+----+-----+
|      date|year|month|
+----------+----+-----+
|2016-04-29|2016|    4|
|2016-04-24|2016|    4|
|2016-04-02|2016|    4|
|2016-04-21|2016|    4|
|2016-04-01|2016|    4|
|2016-04-06|2016|    4|
|2016-04-16|2016|    4|
|2016-04-23|2016|    4|
|2016-04-13|2016|    4|
|2016-04-12|2016|    4|
|2016-04-30|2016|    4|
|2016-04-07|2016|    4|
|2016-04-25|2016|    4|
|2016-04-17|2016|    4|
|2016-04-11|2016|    4|
|2016-04-18|2016|    4|
|2016-04-04|2016|    4|
|2016-04-20|2016|    4|
|2016-04-27|2016|    4|
|2016-04-26|2016|    4|
+----------+----+-----+
only showing top 20 rows



In [88]:
# Read temperature data
df_temperature_dim=spark.read.parquet("output_data/dim_temperature")
df_temperature_dim.show()

+-------------------+-------------------------------+----------------+-------------+
|average_temperature|average_temperature_uncertainty|            city|      country|
+-------------------+-------------------------------+----------------+-------------+
|  23.06892444289695|             1.1867802924791078|         HIALEAH|UNITED STATES|
| 10.051168201978832|             1.2700310474240897|           OMAHA|UNITED STATES|
| 18.062719999999995|             0.7863341935483864|        MESQUITE|UNITED STATES|
|  14.43672555306185|             1.3709018916319313|         ATLANTA|UNITED STATES|
| 10.177262800181238|             0.9249913910285453|WEST VALLEY CITY|UNITED STATES|
|  9.777756186984398|             0.7787758936755294|          EUGENE|UNITED STATES|
|  8.435723654886985|             1.3091483603947778|    GRAND RAPIDS|UNITED STATES|
| 18.062719999999995|             0.7863341935483864|          DENTON|UNITED STATES|
|  16.12483712696008|             0.7674734446130481|       ESCON

In [89]:
# Read demographics data
df_demographics_dim=spark.read.parquet("output_data/dim_demographics")
df_demographics_dim.show()

+----------------+------------+----------+---------------+-----------------+----------------+------------------+--------------------+------+
|            city|       state|median_age|male_population|female_population|total_population|avg_household_size|                Race| Count|
+----------------+------------+----------+---------------+-----------------+----------------+------------------+--------------------+------+
|          FRESNO|  California|      30.0|         256130|           263942|          520072|              3.12|American Indian a...| 11380|
|STERLING HEIGHTS|    Michigan|      39.6|          64985|            67077|          132062|              2.66|Black or African-...|  8054|
|         TRENTON|  New Jersey|      33.3|          42581|            41650|           84231|              3.04|               Asian|  1437|
|           CHICO|  California|      29.9|          46168|            44168|           90336|               2.5|  Hispanic or Latino| 15578|
|       ELK G

In [90]:
# Read airports data
df_airports_dim=spark.read.parquet("output_data/dim_airports")
df_airports_dim.show()

+-----+----------------+-------------+--------------------+-------+
|ident|    airport_code|         type|                name|country|
+-----+----------------+-------------+--------------------+-------+
| 01AL|     CLANTON, AL|small_airport| Ware Island Airport|     US|
|  01U|   DUCKWATER, NV|small_airport|   Duckwater Airport|     US|
| 0KY4|   OWENSBORO, KY|small_airport|       Cambron Field|     US|
| 0LS5|     CANKTON, LA|small_airport|Trahan Ultralight...|     US|
| 0MS8|      BENOIT, MS|small_airport|Catfish Point Air...|     US|
| 0NK0| CATTARAUGUS, NY|small_airport|       Berdick Field|     US|
| 19NY|     READING, NY|small_airport|Four Seasons Airport|     US|
| 23NE|  STROMSBURG, NE|small_airport|Stromsburg Munici...|     US|
| 28MO|      ROSCOE, MO|small_airport|      Pasley Airport|     US|
| 2CA8|YUCCA VALLEY, CA|small_airport| B & E Ranch Airport|     US|
| 2ND9|       NORMA, ND|small_airport|       Brekhus Field|     US|
| 2NY0| SOUTH CAIRO, NY|small_airport|Catskill V

In [91]:
# Read visa type data
df_visa_type_dim=spark.read.parquet("output_data/dim_visa_type")
df_visa_type_dim.show()

+---------+---------+--------------+
|visa_code|visa_type|visa_type_desc|
+---------+---------+--------------+
|        2|      SBP|      Pleasure|
|        2|      CPL|      Pleasure|
|        1|      GMB|      Business|
|        1|       E1|      Business|
|        2|       B2|      Pleasure|
|        1|       I1|      Business|
|        2|       WT|      Pleasure|
|        1|       E2|      Business|
|        1|       B1|      Business|
|        3|       F1|       Student|
|        3|       M1|       Student|
|        3|       M2|       Student|
|        2|      GMT|      Pleasure|
|        3|       F2|       Student|
|        1|       WB|      Business|
|        1|        I|      Business|
|        2|       CP|      Pleasure|
+---------+---------+--------------+



### Check for null and duplicated values

In [240]:
# Function to check null values
def nullCheck(spark, table_list):
    """
    Check key columns for null values
    spark: SparkSession
    table_list: dict mapping each table name to the columns to check for nulls
    """
    for table_name in table_list:
        for column in table_list[table_name]:
            records = spark.sql(f"""SELECT COUNT(1) as col_count FROM {table_name} WHERE {column} IS NULL""")
            # Collect the count once instead of re-running the query for every use
            null_count = records.head()[0]
            if null_count > 0:
                raise ValueError(f"!!!Data quality test FAILED and {table_name} contains {null_count} null values for {column} column!!!")
            else:
                print(f"Data quality test PASSED and {table_name} contains {null_count} null values for {column} column")
        

In [241]:
# Function to check duplicate records
def duplicateCheck(spark, table_list):
    """
    Check for duplicate records on the natural key
    spark: SparkSession
    table_list: dict mapping each table name to its natural-key columns
    """
    for table_name in table_list:
        # Build the comma-separated key list, e.g. "city, state, race"
        column_list = ", ".join(table_list[table_name])
        records = spark.sql(f"""SELECT COUNT(1) as col_count FROM {table_name} 
                                         GROUP BY {column_list}
                                         having count(1)>1""")
        len_records = records.count()

        if len_records == 0:
            print(f"Data quality test PASSED and {table_name} doesn't contain duplicate records")
        else:
            raise ValueError(f"!!!Data quality test FAILED and {table_name} contains duplicate records!!!")

In [242]:
# Create tables from dataframes
df_immigration_stats_fact.createOrReplaceTempView("immig_stats_table")
df_immig_date_dim.createOrReplaceTempView("immig_date_table")
df_temperature_dim.createOrReplaceTempView("temperature")
df_demographics_dim.createOrReplaceTempView("demographics")
df_airports_dim.createOrReplaceTempView("airports")
df_visa_type_dim.createOrReplaceTempView("visa_type")

In [243]:
# List of tables along with the natural key
table_list = { 'immig_stats_table' : ['cicid'],
                   'immig_date_table':['date'],
                   'temperature':['city','country'],
                   'demographics': ['city','state','race'],
                   'airports':['ident'],
                   'visa_type':['visa_type']
                  }

In [244]:
# Call function to check null value
nullCheck(spark, table_list)

Data quality test PASSED and immig_stats_table contains 0 null values for cicid column
Data quality test PASSED and immig_date_table contains 0 null values for date column
Data quality test PASSED and temperature contains 0 null values for city column
Data quality test PASSED and temperature contains 0 null values for country column
Data quality test PASSED and demographics contains 0 null values for city column
Data quality test PASSED and demographics contains 0 null values for state column
Data quality test PASSED and demographics contains 0 null values for race column
Data quality test PASSED and airports contains 0 null values for ident column
Data quality test PASSED and visa_type contains 0 null values for visa_type column


In [245]:
# Call function to check duplicate value
duplicateCheck(spark, table_list)

Data quality test PASSED and immig_stats_table doesn't contain duplicate records
Data quality test PASSED and immig_date_table doesn't contain duplicate records
Data quality test PASSED and temperature doesn't contain duplicate records
Data quality test PASSED and demographics doesn't contain duplicate records
Data quality test PASSED and airports doesn't contain duplicate records
Data quality test PASSED and visa_type doesn't contain duplicate records
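
Section 4.2 also lists source/count checks, which the null and duplicate checks above do not cover. A minimal sketch of such a check follows, assuming the same temp views are registered; the function name `countCheck` and the rule that a table must simply be non-empty are assumptions, not part of the original notebook.

```python
def countCheck(spark, table_names):
    """
    Check that every table contains at least one row
    spark: SparkSession (anything exposing .sql(query).head() works)
    table_names: iterable of temp view names to check
    """
    counts = {}
    for table_name in table_names:
        # head() returns the single result row; [0] is the COUNT value
        row_count = spark.sql(f"SELECT COUNT(1) AS row_count FROM {table_name}").head()[0]
        if row_count == 0:
            raise ValueError(f"!!!Data quality test FAILED and {table_name} contains no records!!!")
        print(f"Data quality test PASSED and {table_name} contains {row_count} records")
        counts[table_name] = row_count
    return counts
```

It could be called as `countCheck(spark, table_list)`, since iterating over the dictionary yields the table names. The returned counts could then be compared against the source row counts if those are recorded during staging.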


### Check null in other columns

In [104]:
# Immigration stats table
df_immigration_stats_fact.select([count(when(col(c).isNull(), c)).alias(c) for c in df_immigration_stats_fact.columns]).show()

+-----+-------------------+-----------------+------------+--------------+---+---------+----------------+---------------+------------+
|cicid|citizenship_country|residence_country|arrival_date|departure_date|age|visa_type|entry_port_state|entry_port_city|airport_code|
+-----+-------------------+-----------------+------------+--------------+---+---------+----------------+---------------+------------+
|    0|                  0|                0|           0|         97893|285|        0|               0|              0|           0|
+-----+-------------------+-----------------+------------+--------------+---+---------+----------------+---------------+------------+



In [105]:
# Immigration date table
df_immig_date_dim.select([count(when(col(c).isNull(), c)).alias(c) for c in df_immig_date_dim.columns]).show()

+----+----+-----+
|date|year|month|
+----+----+-----+
|   0|   0|    0|
+----+----+-----+



In [106]:
# temperature table
df_temperature_dim.select([count(when(col(c).isNull(), c)).alias(c) for c in df_temperature_dim.columns]).show()

+-------------------+-------------------------------+----+-------+
|average_temperature|average_temperature_uncertainty|city|country|
+-------------------+-------------------------------+----+-------+
|                  0|                              0|   0|      0|
+-------------------+-------------------------------+----+-------+



In [107]:
# demographics table
df_demographics_dim.select([count(when(col(c).isNull(), c)).alias(c) for c in df_demographics_dim.columns]).show()

+----+-----+----------+---------------+-----------------+----------------+------------------+----+-----+
|city|state|median_age|male_population|female_population|total_population|avg_household_size|Race|Count|
+----+-----+----------+---------------+-----------------+----------------+------------------+----+-----+
|   0|    0|         0|              3|                3|               0|                16|   0|    0|
+----+-----+----------+---------------+-----------------+----------------+------------------+----+-----+



In [108]:
# airports table
df_airports_dim.select([count(when(col(c).isNull(), c)).alias(c) for c in df_airports_dim.columns]).show()

+-----+------------+----+----+-------+
|ident|airport_code|type|name|country|
+-----+------------+----+----+-------+
|    0|           0|   0|   0|      0|
+-----+------------+----+----+-------+



In [109]:
# visa type table
df_visa_type_dim.select([count(when(col(c).isNull(), c)).alias(c) for c in df_visa_type_dim.columns]).show()

+---------+---------+--------------+
|visa_code|visa_type|visa_type_desc|
+---------+---------+--------------+
|        0|        0|             0|
+---------+---------+--------------+



### Join fact and dimension tables to verify data and join values

In [110]:
spark.sql("""select distinct
imm.cicid,
imm.citizenship_country,
imm.age,
imm.entry_port_city,
dt.year as arrival_year,
temp.average_temperature,
dem.total_population,
air.name as airport_name,
vt.visa_type_desc
from immig_stats_table imm,
immig_date_table dt,
temperature temp,
demographics dem,
airports air,
visa_type vt
where 1=1
and imm.arrival_date=dt.date
and imm.entry_port_city=temp.city
and imm.entry_port_city=dem.city
and imm.airport_code=air.airport_code
and imm.visa_type=vt.visa_type
""").show()

+---------+-------------------+----+---------------+------------+-------------------+----------------+--------------------+--------------+
|    cicid|citizenship_country| age|entry_port_city|arrival_year|average_temperature|total_population|        airport_name|visa_type_desc|
+---------+-------------------+----+---------------+------------+-------------------+----------------+--------------------+--------------+
| 435217.0|             POLAND|32.0|       NEW YORK|        2016|  9.523295607566514|         8550405|  La Guardia Airport|      Business|
| 968273.0|         ARGENTINA |66.0|       NEW YORK|        2016|  9.523295607566514|         8550405|John F Kennedy In...|      Pleasure|
|1159872.0|             FRANCE|12.0|       NEW YORK|        2016|  9.523295607566514|         8550405|  La Guardia Airport|      Pleasure|
|1171896.0|              SPAIN|54.0|       NEW YORK|        2016|  9.523295607566514|         8550405|John F Kennedy In...|      Pleasure|
|1201784.0|              JA

#### 4.3 Sample Queries to answer reporting questions in scope

In [111]:
#Age group of travellers
spark.sql("""select 
case when imm.age >0 and imm.age <=20
then '0-20 years'
when imm.age >20 and imm.age <=50
then '20-50 years'
when imm.age >50 and imm.age <=70
then '50-70 years'
when imm.age >70
then '> 70 years'
else 'Age Unknown'
end as age_group,
count(distinct imm.cicid) as traveller_count
from immig_stats_table imm
group by case when imm.age >0 and imm.age <=20
then '0-20 years'
when imm.age >20 and imm.age <=50
then '20-50 years'
when imm.age >50 and imm.age <=70
then '50-70 years'
when imm.age >70
then '> 70 years'
else 'Age Unknown'
end
""").show()

+-----------+---------------+
|  age_group|traveller_count|
+-----------+---------------+
|20-50 years|        1276098|
| > 70 years|         107803|
| 0-20 years|         243213|
|50-70 years|         616956|
|Age Unknown|            790|
+-----------+---------------+
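
The `CASE` expression above assigns each traveller to an age bucket. The same boundaries written as a small Python function (a hypothetical `age_group` helper, not part of the pipeline) make the edge cases explicit: because the first bucket requires `age > 0`, an age of 0 falls through to 'Age Unknown' together with null ages.

```python
from typing import Optional


def age_group(age: Optional[float]) -> str:
    """Replicate the SQL CASE buckets; None stands in for a SQL NULL age."""
    if age is None:
        return "Age Unknown"
    if 0 < age <= 20:
        return "0-20 years"
    if 20 < age <= 50:
        return "20-50 years"
    if 50 < age <= 70:
        return "50-70 years"
    if age > 70:
        return "> 70 years"
    # Reached for age <= 0, matching the ELSE branch of the SQL
    return "Age Unknown"


for sample in (None, 0, 12, 20, 35, 76):
    print(sample, "->", age_group(sample))
```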



In [112]:
#Count of traveller during the year
spark.sql("""select 
dt.year as arrival_year,count(distinct imm.cicid) as traveller_count
from immig_stats_table imm,
immig_date_table dt
where  1=1
and imm.arrival_date=dt.date
group by dt.year
""").show()

+------------+---------------+
|arrival_year|traveller_count|
+------------+---------------+
|        2016|        2244860|
+------------+---------------+



In [246]:
#Effect on temperature by immigration
spark.sql("""select 
imm.entry_port_city,count(distinct imm.cicid) as traveller_count,
avg(temp.average_temperature) as average_temperature
from immig_stats_table imm,
temperature temp
where 1=1
and imm.entry_port_city=temp.city
group by imm.entry_port_city
""").show()


+---------------+---------------+-------------------+
|entry_port_city|traveller_count|average_temperature|
+---------------+---------------+-------------------+
|        ORLANDO|         118089|  22.30260243667857|
|       SAVANNAH|              1| 19.406439563962774|
|   INDIANAPOLIS|            342| 11.228673933953177|
|        ATLANTA|          56646| 14.436725553061898|
|          MIAMI|         282233| 23.068924442897472|
|        MEMPHIS|             37| 16.075406540557875|
|        OAKLAND|           3059| 14.447987354577648|
|       PORTLAND|           3834|  9.762706232813944|
|      VANCOUVER|           7939|   9.76270623281393|
|      ANCHORAGE|             81|-2.3016456107756658|
|         DENVER|          12198|  8.777836262323202|
|         LAREDO|            201|  21.85121503496502|
|        PHOENIX|          29886| 21.048769050958402|
|      BALTIMORE|           3125| 11.918474511061222|
|        HOUSTON|          85620| 20.228964301075347|
|   JACKSONVILLE|           

In [247]:
#Effect on race,population and demographics of port of entry cities
spark.sql("""select
dt.year as arrival_year,
imm.entry_port_city,
dem.race,
avg(dem.total_population) as total_population
from immig_stats_table imm,
immig_date_table dt,
demographics dem
where 1=1
and imm.arrival_date=dt.date
and imm.entry_port_city=dem.city
group by 
dt.year,
imm.entry_port_city,
dem.race
""").show()

+------------+---------------+--------------------+----------------+
|arrival_year|entry_port_city|                race|total_population|
+------------+---------------+--------------------+----------------+
|        2016|WEST PALM BEACH|               White|        106782.0|
|        2016|          OMAHA|American Indian a...|        443887.0|
|        2016|    ALBUQUERQUE|American Indian a...|        559131.0|
|        2016|     SACRAMENTO|Black or African-...|        490715.0|
|        2016|        NORFOLK|               Asian|        246393.0|
|        2016|     FORT MYERS|               White|         74015.0|
|        2016|      SANTA ANA|               Asian|        335423.0|
|        2016|       NEW YORK|               Asian|       8550405.0|
|        2016|    LOS ANGELES|               Asian|       3971896.0|
|        2016|      BALTIMORE|               White|        621849.0|
|        2016|      PITTSBURG|               White|         69427.0|
|        2016|  SAN FRANCISCO|  Hi

In [248]:
#Count of travellers by different visa type
spark.sql("""select 
dt.year as arrival_year,
vt.visa_type_desc,
count(distinct imm.cicid) as traveller_count
from immig_stats_table imm,
immig_date_table dt,
visa_type vt
where 1=1
and imm.arrival_date=dt.date
and imm.visa_type=vt.visa_type
group by dt.year,vt.visa_type_desc
""").show()

+------------+--------------+---------------+
|arrival_year|visa_type_desc|traveller_count|
+------------+--------------+---------------+
|        2016|       Student|          34445|
|        2016|      Business|         345716|
|        2016|      Pleasure|        1864699|
+------------+--------------+---------------+



In [249]:
#Average footfall by airports
spark.sql("""select 
dt.year as arrival_year,
dt.month as arrival_month,
imm.entry_port_city,
air.name as airport_name,
count(distinct imm.cicid) as traveller_count
from immig_stats_table imm,
immig_date_table dt,
airports air
where 1=1
and imm.arrival_date=dt.date
and imm.airport_code=air.airport_code
group by dt.year,
dt.month,
imm.entry_port_city,
air.name
""").show()

+------------+-------------+---------------+--------------------+---------------+
|arrival_year|arrival_month|entry_port_city|        airport_name|traveller_count|
+------------+-------------+---------------+--------------------+---------------+
|        2016|            4|        DETROIT|Detroit Metropoli...|          21047|
|        2016|            4|        SPOKANE|  Ox Meadows Airport|              1|
|        2016|            4|       PORTLAND|Portland Hillsbor...|           3834|
|        2016|            4|     WILMINGTON|Pettigrew Moore A...|             14|
|        2016|            4|        CHICAGO|Chicago Midway In...|          95104|
|        2016|            4|     SACRAMENTO| Mc Clellan Airfield|           2173|
|        2016|            4|          MIAMI|Kendall-Tamiami E...|         282233|
|        2016|            4|      NASHVILLE|     Triune Airfield|            686|
|        2016|            4|    BROWNSVILLE|     Resaca Airstrip|            283|
|        2016|  

In [250]:
#Travellers yearly and monthly trends
spark.sql("""select 
dt.year as arrival_year,
dt.month as arrival_month,
count(distinct imm.cicid) as traveller_count
from immig_stats_table imm,
immig_date_table dt
where 1=1
and imm.arrival_date=dt.date
group by dt.year,
dt.month
""").show()

+------------+-------------+---------------+
|arrival_year|arrival_month|traveller_count|
+------------+-------------+---------------+
|        2016|            4|        2244860|
+------------+-------------+---------------+



#### 4.4 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

Following data dictionaries are used to enrich data

* countries.csv : table containing country codes based on I94_SAS_Labels_Descriptions.SAS file 
 * Data Description
    * Code -> Integer : Country code for i94 data
    * Country -> String : Country name for corresponding code
* i94portCodes.csv: table containing city codes based on I94_SAS_Labels_Descriptions.SAS file 
 * Data Description
    * Code -> String : 3-character i94 port code
    * Location -> String : City for i94 port code
    * State -> String : State for i94 port code
* visa_type.csv : table containing visa code and visa type desc based on I94_SAS_Labels_Descriptions.SAS file 
 * Data Description
    * visacode -> Integer : i94 visa code
    * visacategory -> String : Visa category for i94 visa code

#### Step 5: Complete Project Write Up
* Given the sheer size of the immigration data, Spark is a good choice for processing it efficiently and quickly
* Data can be loaded daily to gauge the daily footfall of airports and plan accordingly
* The sections below describe how the approach would change under the following scenarios:

##### Data model selection criteria
* A star schema is chosen for this data model, with one fact table and the rest as dimensions
* The advantage is that the dimensions can be reused in other models as conformed dimensions
* Master data can also be implemented for the dimension tables
* Fact and dimension data can be loaded separately with this data model
* The trade-off can be query performance compared with a denormalized model such as Cassandra, where the data is preprocessed for each query
* However, Cassandra lacks the other benefits above, which can be achieved through the star schema

##### Data increased by 100x:

* The data will need to be partitioned by year and month as it grows. Partitioning improves query performance because queries that filter on these columns read only the matching partitions
* Partitioning in Parquet format is a good solution, as Parquet also compresses the data, which gives a further gain
* S3 should be a good choice for storing the Parquet files as it is cheap, and external tables can be created over it with Redshift Spectrum or queried using Athena
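
When Spark writes with `partitionBy`, it lays the files out in Hive-style `column=value` directories, which is what lets a query that filters on year and month skip unrelated files. A sketch of that layout follows, using a hypothetical `partition_path` helper and the `output_data/dim_immig_date` path from this notebook.

```python
from datetime import date


def partition_path(base_path: str, arrival_date: date) -> str:
    """Return the Hive-style directory a row would land in when
    partitioned by year and month (illustration only; Spark builds
    these directories itself)."""
    return f"{base_path}/year={arrival_date.year}/month={arrival_date.month}"


print(partition_path("output_data/dim_immig_date", date(2016, 4, 29)))
```

For the sample date this prints `output_data/dim_immig_date/year=2016/month=4`. The same base path on S3 would follow the same pattern.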

##### The pipelines would be run on a daily basis by 7 am every day:

* Airflow should be a good choice for scheduling the pipeline
* Before scheduling, any dependencies should be identified and the tasks sequenced in the correct order
* A notification email will be sent after the Airflow tasks complete, after which the business can start using the report
* Airflow is open source and free to use, so it brings the overall cost down
* Airflow can run on the same EC2 instance as PySpark, so it won't increase the cost
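
The dependency ordering mentioned above can be illustrated without Airflow itself. The sketch below uses Python's standard-library `graphlib` (Python 3.9+) to derive a valid run order; the task names are hypothetical labels for the steps in this notebook, and in Airflow the same edges would be declared between tasks in a DAG.

```python
from graphlib import TopologicalSorter

# Map each task to the set of tasks it depends on (hypothetical task names).
# The date and visa dimensions are built from the staged immigration data,
# and the quality checks run only after every table has been written.
dependencies = {
    "stage_immigration": set(),
    "load_fact_immigration": {"stage_immigration"},
    "load_dim_immig_date": {"stage_immigration"},
    "load_dim_visa_type": {"stage_immigration"},
    "load_dim_temperature": set(),
    "load_dim_airports": set(),
    "load_dim_demographics": set(),
    "run_quality_checks": {
        "load_fact_immigration",
        "load_dim_immig_date",
        "load_dim_visa_type",
        "load_dim_temperature",
        "load_dim_airports",
        "load_dim_demographics",
    },
}

# static_order() yields every task after all of its dependencies
run_order = list(TopologicalSorter(dependencies).static_order())
print(run_order)
```

Several orders are valid, so the exact list may vary, but the staging task always precedes the tables built from it and the quality checks always come last.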

##### The database needed to be accessed by 100+ people:

* Creating the tables in Redshift is a good choice, so that reporting tools like Tableau can ingest the data and cater to multiple audiences
* Dimensions should be distributed to all nodes, so that they are available to the fact data on each node
* Fact data should be distributed with city as the distribution key to improve performance
* Users should be grouped into the categories below and assigned the corresponding roles and permissions
 * Power user : read-only access to tables directly and through other tools like Alteryx
 * Developer : read and write access to tables
 * Admin : admin access to tables
 * General user : regular business user with no access to tables but access to reports and dashboards
* Moving the data into Redshift is cost effective, as the charge is not based on read or write operations, whereas Spectrum tables are charged based on the amount of data scanned by queries
* Assigning security roles and groups gives extra control over resource usage, and can also be used to track usage and deny unwanted report access
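
The user categories above could be mapped to Redshift groups and `GRANT` statements. The sketch below only generates the SQL text and does not connect to a database; the group names, the `public` schema, and the exact privileges per category are assumptions for illustration.

```python
# Hypothetical mapping of user category -> privileges on the reporting schema.
# General users get no table privileges; they only see reports and dashboards.
ROLE_PRIVILEGES = {
    "power_users": ["SELECT"],
    "developers": ["SELECT", "INSERT", "UPDATE", "DELETE"],
    "admins": ["ALL"],
    "general_users": [],
}


def build_grants(schema: str, role_privileges: dict) -> list:
    """Build one GRANT statement per group that has table privileges."""
    statements = []
    for group, privileges in role_privileges.items():
        if not privileges:
            continue  # no table access for this group
        statements.append(
            f"GRANT {', '.join(privileges)} ON ALL TABLES IN SCHEMA {schema} TO GROUP {group};"
        )
    return statements


for statement in build_grants("public", ROLE_PRIVILEGES):
    print(statement)
```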
