# Analysis of Immigration Records
### Udacity Data Engineering Capstone Project

#### Project Summary
In this project, immigration records from the US National Tourism and Trade Office, together with US city demographic data and airport code data from other sources, are pulled in for analysis. The original data is first gathered and explored to locate the key information. A data model is then defined to fit the analysis goals, and the original data is processed by ETL (extract, transform and load) and loaded into that model. Lastly, the analysis questions are answered from the data model using simple joins.  

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
pd.options.display.float_format = '{:.4f}'.format

import glob

# Connect to Spark, loading the spark-sas7bdat package used to read the SAS files
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport() \
    .getOrCreate()

from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import monotonically_increasing_id


### Step 1: Scope the Project and Gather Data

#### Scope 
The data used in this project includes I94 immigration data, US city demographic data and airport code data. These sources are gathered and processed to give insights and answer questions such as:
1. Which airport has the most immigrants coming in?
2. Which state has the highest immigration rate?
3. Which state has the most immigrants coming in as student? What's the immigration rate of that state?
4. Which state has the most immigrants coming in for pleasure? What's the immigration rate of that state?   
...

The main tools used are Jupyter Notebook with the pandas and PySpark libraries. So that it can be revisited later, the data model produced by the ETL can be saved to S3. As a future improvement, for example automating the pipeline so that the data model is refreshed every month when new immigration data arrives, Apache Airflow with scheduled runs would be a good fit for this use case. 


#### Describe and Gather Data 

The data used in this project comes from three different sources.     
1. I94 Immigration data:      
This data comes from the US National Tourism and Trade Office. A sample data file is attached and used for exploration. The real data used for analysis covers the whole year (12 months) of 2016. The data comes from [here](https://travel.trade.gov/research/reports/i94/historical/2016.html). It provides detailed information about each collected immigration record, such as country of residence, port of entry, arrival date, US state of address, visa type and so on. This is the largest of the three data sources, so to better answer the analysis questions, the original data is aggregated to an appropriate level of granularity. 
2. The US City demographic data:     
This data comes from [OpenDataSoft](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/). It provides information such as the population and the population by race of US cities. 
3. Airport code data:     
The data provides airport codes, types of the airports and their corresponding cities. It comes from [here](https://datahub.io/core/airport-codes#data).




### Step 2: Explore and Assess the Data
#### Explore the Data & Cleaning Steps
In this section, data quality issues such as missing values and duplicate records are identified, and the steps needed to clean the data are documented. 

#### 2.1 Demographic Data
The original demographic data "us-cities-demographics.csv" is first loaded into Python. 

In [2]:
# read in demographic data
fname ='us-cities-demographics.csv'
demo_df =  pd.read_csv(fname,sep=';')
demo_df.head()

| | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127.0 | 87105.0 | 175232 | 5821.0 | 33878.0 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040.0 | 143873.0 | 281913 | 5829.0 | 86253.0 | 2.73 | NJ | White | 76402 |


An initial look at the demographic data shows that multiple records exist for a single city, because each record holds the population of one race within that city. 
For example, Silver Spring, Maryland has 5 records in total, and the first several columns are identical across them. The only differences are in the last two columns, Race and Count: each record gives one race and the population of that race. 

In [3]:
demo_df[(demo_df.City == 'Silver Spring' ) & (demo_df.State == 'Maryland')]

| | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Hispanic or Latino | 25924 |
| 592 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | White | 37756 |
| 1678 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Black or African-American | 21330 |
| 2123 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | American Indian and Alaska Native | 1084 |
| 2162 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Asian | 8841 |


In other words, the original demographic data is in long format. To make it easier to use, the pivot_table function is used to reshape it into wide format. The transformed dataset has one record per city, with the population of each of the 5 race categories in a single entry. 

In [4]:
# reshape from long to wide so that each city has one record with a column per race
demo_df = demo_df.pivot_table(index=['City', 'State', 'Median Age', 'Male Population', 'Female Population', 'Total Population', 'Number of Veterans', 'Foreign-born', 'Average Household Size', 'State Code'],columns='Race', values='Count').reset_index()
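To see what `pivot_table` does in isolation, here is a minimal sketch on a toy long-format frame (the city names and counts are made up, and only a few of the real columns are kept). Two defaults are worth knowing: `pivot_table` averages values (`aggfunc='mean'`), which is harmless here because each (city, race) pair appears once, and it groups on the index columns, so a city whose index columns contain a missing value would be dropped by default. Comparing the number of unique cities before and after the reshape is a cheap guard against that.

```python
import pandas as pd

# Toy long-format data shaped like us-cities-demographics.csv (values are made up):
# one row per (city, race), with the city-level columns repeated on every row.
long_df = pd.DataFrame({
    'City': ['City A', 'City A', 'City B', 'City B'],
    'State Code': ['MD', 'MD', 'MA', 'MA'],
    'Total Population': [1000, 1000, 1200, 1200],
    'Race': ['White', 'Asian', 'White', 'Asian'],
    'Count': [500, 100, 800, 50],
})

# The city-level columns form the index so they survive the reshape;
# each distinct Race value becomes its own column holding that race's Count.
wide_df = long_df.pivot_table(
    index=['City', 'State Code', 'Total Population'],
    columns='Race',
    values='Count',
).reset_index()

print(wide_df)
```

If the unique-city counts differ after the reshape, the missing rows can be inspected before moving on.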

Let's look at the records for Silver Spring, Maryland again after the transformation. 

In [5]:
demo_df[(demo_df.City == 'Silver Spring' ) & (demo_df.State == 'Maryland')]

| | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | American Indian and Alaska Native | Asian | Black or African-American | Hispanic or Latino | White |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 489 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | 1084.0 | 8841.0 | 21330.0 | 25924.0 | 37756.0 |


Now there is only one record for Silver Spring, Maryland, holding the populations of all 5 races. Next, check whether any city has duplicated records. 

In [6]:
demo_df[demo_df.duplicated(subset=['City', 'State'])]

| | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | American Indian and Alaska Native | Asian | Black or African-American | Hispanic or Latino | White |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|


No city has duplicated records. This concludes the cleaning and pre-processing of the demographic data.     
Note that the pre-processed demographic data is at city level. State-level information would make it easier to link with the other data sources in the following steps, but the city-level data is kept for now because it may provide more insight later. 
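If state-level figures are needed later, the city-level table can be rolled up with a `groupby`. Below is a minimal sketch with made-up values: it sums the counts per state before dividing, which is the same ratio the SQL queries in section 4.5 compute with `sum(fborn)/sum(tpopulation)`. The column name `irate` follows those queries; everything else is illustrative.

```python
import pandas as pd

# Toy city-level rows in the wide layout (values are made up).
city_df = pd.DataFrame({
    'City': ['City A', 'City B', 'City C'],
    'State Code': ['CA', 'CA', 'FL'],
    'Total Population': [100000, 300000, 200000],
    'Foreign-born': [50000, 60000, 50000],
})

# Sum the counts per state first, then divide once per state.
state_df = (
    city_df.groupby('State Code')[['Foreign-born', 'Total Population']]
    .sum()
    .reset_index()
)
state_df['irate'] = state_df['Foreign-born'] / state_df['Total Population']
print(state_df)
```

Keeping the city-level table and deriving the state view on demand, as above, avoids losing the city detail mentioned in the text.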

#### 2.2 Airport Code data
The original airport code data "airport-codes_csv.csv" is first loaded into Python. 

From the airport code dataset, airports that are not in the U.S. or that are already closed are not of interest and are filtered out. In addition, a new column 'state_id' is derived from 'iso_region' to make joining with other tables easier.

In [7]:
# read in airport code data 
fname ='airport-codes_csv.csv'
air_df =  pd.read_csv(fname)
air_df.head()

| | ident | type | name | elevation_ft | continent | iso_country | iso_region | municipality | gps_code | iata_code | local_code | coordinates |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 00A | heliport | Total Rf Heliport | 11.0 | | US | US-PA | Bensalem | 00A | | 00A | -74.93360137939453, 40.07080078125 |
| 1 | 00AA | small_airport | Aero B Ranch Airport | 3435.0 | | US | US-KS | Leoti | 00AA | | 00AA | -101.473911, 38.704022 |
| 2 | 00AK | small_airport | Lowell Field | 450.0 | | US | US-AK | Anchor Point | 00AK | | 00AK | -151.695999146, 59.94919968 |
| 3 | 00AL | small_airport | Epps Airpark | 820.0 | | US | US-AL | Harvest | 00AL | | 00AL | -86.77030181884766, 34.86479949951172 |
| 4 | 00AR | closed | Newport Hospital & Clinic Heliport | 237.0 | | US | US-AR | Newport | | | | -91.254898, 35.6087 |


In [8]:
air_df.iso_country.unique()

array(['US', 'PR', 'MH', 'MP', 'GU', 'SO', 'AQ', 'GB', 'PG', 'AD', 'SD',
       'SA', 'AE', 'SS', 'ES', 'CN', 'AF', 'LK', 'SB', 'CO', 'AU', 'MG',
       'TD', 'AL', 'AM', 'MX', 'MZ', 'PW', 'NR', 'AO', 'AR', 'AS', 'AT',
       'ZZ', 'GA', 'AZ', 'BA', 'BB', 'BE', 'DE', 'BF', 'BG', 'GL', 'BH',
       'BI', 'IS', 'BJ', 'OM', 'XK', 'BM', 'KE', 'PH', 'BO', 'BR', 'BS',
       'CV', 'BW', 'FJ', 'BY', 'UA', 'LR', 'BZ', 'CA', 'CD', 'CF', 'CG',
       'MR', 'CH', 'CL', 'CM', 'MA', 'CR', 'CU', 'CY', 'CZ', 'SK', 'PA',
       'DZ', 'ID', 'GH', 'RU', 'CI', 'DK', 'NG', 'DO', 'NE', 'HR', 'TN',
       'TG', 'EC', 'EE', 'FI', 'EG', 'GG', 'JE', 'IM', 'FK', 'EH', 'NL',
       'IE', 'FO', 'LU', 'NO', 'PL', 'ER', 'MN', 'PT', 'SE', 'ET', 'LV',
       'LT', 'ZA', 'SZ', 'GQ', 'SH', 'MU', 'IO', 'ZM', 'FM', 'KM', 'YT',
       'RE', 'TF', 'ST', 'FR', 'SC', 'ZW', 'MW', 'LS', nan, 'ML', 'GM',
       'GE', 'GF', 'SL', 'GW', 'GN', 'SN', 'GR', 'GT', 'TZ', 'GY', 'SR',
       'DJ', 'HK', 'LY', 'HN', 'VN', 'KZ', 'RW', 'HT

The airport code data covers airports all over the world, but only those in the U.S. are of interest. 

In [9]:
air_df.type.unique()

array(['heliport', 'small_airport', 'closed', 'seaplane_base',
       'balloonport', 'medium_airport', 'large_airport'], dtype=object)

The data covers several types of airports. Closed airports are not of interest either and need to be removed. 

On inspection, iata_code in this data can be linked to i94port in the immigration data, but the records where iata_code is NaN need to be removed first. 

In [10]:
air_df.iata_code.unique()

array([nan, 'UTK', 'OCA', ..., 'SHE', 'YNJ', 'YKH'], dtype=object)
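Before relying on the i94port to iata_code link, it may be worth checking how many ports actually find a match. The i94port values come from the I94 label file, and not every one is necessarily an IATA airport code. Here is a minimal pandas sketch using a left merge with `indicator=True`. The toy frames stand in for the real data, and the port 'XXX' is hypothetical.

```python
import pandas as pd

# Toy stand-ins: ports seen in immigration records and the cleaned airport codes.
imm_ports = pd.DataFrame({'i94port': ['MIA', 'ATL', 'XXX', 'MIA']})
airports = pd.DataFrame({'iata_code': ['MIA', 'ATL', 'OCA'],
                         'state_id': ['FL', 'GA', 'FL']})

# indicator=True adds a _merge column: 'both' when the port matched an airport,
# 'left_only' when it did not.
linked = imm_ports.drop_duplicates().merge(
    airports, how='left', left_on='i94port', right_on='iata_code', indicator=True)
unmatched = linked.loc[linked['_merge'] == 'left_only', 'i94port'].tolist()
print(unmatched)  # prints ['XXX']
```

On the real data, the share of unmatched ports indicates how much of the immigration volume the airport join would silently drop.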

Overall, the steps to clean and pre-process the airport code data are as follows:
1. Only select records where the airport is located in the U.S. and the type of the airport is not closed.
2. Remove the records where iata_code is NaN. 
3. Derive state information based on column "iso_region". 

In [11]:
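# only select airports that are in the U.S. and are not closed
# (.copy() avoids pandas' SettingWithCopyWarning when a column is added below)
air_df = air_df[(air_df.iso_country == 'US') & (air_df.type != 'closed')].copy()

# drop records where iata_code is null (axis passed by keyword, as pandas 2.x requires)
air_df = air_df.dropna(axis=0, subset=['iata_code'])

# make a new column state_id from iso_region, e.g. 'US-FL' -> 'FL'
air_df['state_id'] = air_df.iso_region.str[3:]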
air_df.head()

| | ident | type | name | elevation_ft | continent | iso_country | iso_region | municipality | gps_code | iata_code | local_code | coordinates | state_id |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 440 | 07FA | small_airport | Ocean Reef Club Airport | 8.0 | | US | US-FL | Key Largo | 07FA | OCA | 07FA | -80.274803161621, 25.325399398804 | FL |
| 594 | 0AK | small_airport | Pilot Station Airport | 305.0 | | US | US-AK | Pilot Station | | PQS | 0AK | -162.899994, 61.934601 | AK |
| 673 | 0CO2 | small_airport | Crested Butte Airpark | 8980.0 | | US | US-CO | Crested Butte | 0CO2 | CSE | 0CO2 | -106.928341, 38.851918 | CO |
| 1088 | 0TE7 | small_airport | LBJ Ranch Airport | 1515.0 | | US | US-TX | Johnson City | 0TE7 | JCY | 0TE7 | -98.62249755859999, 30.251800537100003 | TX |
| 1402 | 13MA | small_airport | Metropolitan Airport | 418.0 | | US | US-MA | Palmer | 13MA | PMX | 13MA | -72.31140136719999, 42.223300933800004 | MA |


In [12]:
# check that iata_code is unique so it can serve as the primary key of this table
air_df[air_df.duplicated(subset=['iata_code'])]

| | ident | type | name | elevation_ft | continent | iso_country | iso_region | municipality | gps_code | iata_code | local_code | coordinates | state_id |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|


After processing, the column iata_code is unique and non-null, so it can be used as the primary key. 
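The two checks used so far (no nulls, no duplicates) can be bundled into a small helper and reused for every table. The function name `check_primary_key` is hypothetical. It is offered as a sketch, not as part of the project's code.

```python
import pandas as pd


def check_primary_key(df, cols):
    """Return (null_rows, duplicate_rows) for the candidate key columns `cols`.

    A valid primary key should give (0, 0).
    """
    null_rows = int(df[cols].isna().any(axis=1).sum())
    duplicate_rows = int(df.duplicated(subset=cols).sum())
    return null_rows, duplicate_rows


# Toy frame: 'BBB' appears twice and one code is missing.
toy = pd.DataFrame({'iata_code': ['AAA', 'BBB', 'BBB', None]})
print(check_primary_key(toy, ['iata_code']))  # prints (1, 1)
```

Applied to the cleaned airport table, `check_primary_key(air_df, ['iata_code'])` would be expected to return `(0, 0)`, given the checks above.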

#### 2.3 Immigration data
First, the sample immigration data is loaded into Python for exploration. 

In [13]:
fname = 'immigration_data_sample.csv'
imm_df =  pd.read_csv(fname, index_col =0)
imm_df.head()

| | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 2027561 | 4084316.0 | 2016.0 | 4.0 | 209.0 | 209.0 | HHW | 20566.0 | 1.0 | HI | 20573.0 | ... | | M | 1955.0 | 7202016 | F | | JL | 56582674633.0 | 00782 | WT |
| 2171295 | 4422636.0 | 2016.0 | 4.0 | 582.0 | 582.0 | MCA | 20567.0 | 1.0 | TX | 20568.0 | ... | | M | 1990.0 | 10222016 | M | | \*GA | 94361995930.0 | XBLNG | B2 |
| 589494 | 1195600.0 | 2016.0 | 4.0 | 148.0 | 112.0 | OGG | 20551.0 | 1.0 | FL | 20571.0 | ... | | M | 1940.0 | 7052016 | M | | LH | 55780468433.0 | 00464 | WT |
| 2631158 | 5291768.0 | 2016.0 | 4.0 | 297.0 | 297.0 | LOS | 20572.0 | 1.0 | CA | 20581.0 | ... | | M | 1991.0 | 10272016 | M | | QR | 94789696030.0 | 00739 | B2 |
| 3032257 | 985523.0 | 2016.0 | 4.0 | 111.0 | 111.0 | CHM | 20550.0 | 3.0 | NY | 20553.0 | ... | | M | 1997.0 | 7042016 | F | | | 42322572633.0 | LAND | WT |


In [14]:
imm_df.columns

Index(['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port', 'arrdate',
       'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa', 'count',
       'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd', 'entdepu',
       'matflag', 'biryear', 'dtaddto', 'gender', 'insnum', 'airline',
       'admnum', 'fltno', 'visatype'],
      dtype='object')

For the purpose of the project, only entries by air are of interest, so the data is first filtered to i94mode = 1 (air).     

The full immigration data covers all 12 months of 2016 and is very large to process. The record-level granularity of the original data is unnecessary for this project, so the records are grouped to keep only the level of detail the analysis needs. 

The sample immigration data is grouped by visa category, airport, state, month and year. 
The same processing is also applied to the full immigration data. 

In [15]:
imm_df = imm_df[(imm_df.i94mode == 1)] 
imm_df1 = imm_df.groupby(['i94visa','i94port', 'i94addr', 'i94mon', 'i94yr'])['count'].sum().reset_index()
imm_df1.head()

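| | i94visa | i94port | i94addr | i94mon | i94yr | count |
|---|---|---|---|---|---|---|
| 0 | 1.0 | ATL | AL | 4.0 | 2016.0 | 1.0 |
| 1 | 1.0 | ATL | CA | 4.0 | 2016.0 | 1.0 |
| 2 | 1.0 | ATL | GA | 4.0 | 2016.0 | 3.0 |
| 3 | 1.0 | ATL | IL | 4.0 | 2016.0 | 1.0 |
| 4 | 1.0 | ATL | IN | 4.0 | 2016.0 | 1.0 |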
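One detail is worth checking when this step moves from pandas to Spark. By default, pandas `groupby` drops rows whose grouping keys contain NaN (for example, a record with no `i94addr`), while Spark's `groupBy` keeps nulls as their own group. Counts from the pandas sample and from the Spark pipeline may therefore differ for records without a state. The toy records below are made up, and the mode codes follow the I94 label file (1 = air, 3 = land).

```python
import numpy as np
import pandas as pd

# Toy I94-like records (values are made up).
toy = pd.DataFrame({
    'i94visa': [2.0, 2.0, 2.0, 1.0, 2.0],
    'i94port': ['MIA', 'MIA', 'MIA', 'ATL', 'BUF'],
    'i94addr': ['FL', 'FL', np.nan, 'GA', 'NY'],
    'i94mon': [4.0] * 5,
    'i94yr': [2016.0] * 5,
    'i94mode': [1.0, 1.0, 1.0, 1.0, 3.0],
    'count': [1.0] * 5,
})
keys = ['i94visa', 'i94port', 'i94addr', 'i94mon', 'i94yr']

# Keep air arrivals only (drops the BUF land crossing).
air = toy[toy.i94mode == 1]

# Default groupby silently drops the MIA record that has no state.
dropped = air.groupby(keys)['count'].sum().reset_index()
# dropna=False keeps it as its own group, matching how Spark treats null keys.
kept = air.groupby(keys, dropna=False)['count'].sum().reset_index()
print(len(dropped), len(kept))  # prints 2 3
```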


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

Three different sources of data are used in this project: immigration records, airport codes and demographic data. Accordingly, three individual tables are created. The granularity of the data from each original source is not necessarily consistent, so some processing and aggregation are required. 

To be specific, 
- From the immigration table, we want to know how many people under each visa type entered the U.S. via each airport per month. 
- From the airport table, we want to know the airport code, the state in which the airport is located and the type of airport. 
- From the demographic table, the granularity is a bit different: it provides information at city level, such as population, population by race and so on. 


An illustration of the data model is shown below. 
![image](./model.png)

The immigration and airport tables can be joined on airport code.   
The immigration table can also be joined with the demographic table on state.
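The two joins in the model can be sketched on toy versions of the tables. All values are made up, and pandas is used here only so the example runs without Spark. Rolling the demographic rows up to one row per state before the state join matters: joining the city-level rows directly would repeat each immigration row once per listed city in that state.

```python
import pandas as pd

# Toy versions of the three model tables (values are made up).
immigration = pd.DataFrame({
    'visa_cat': [3.0, 3.0, 2.0],
    'airport_code': ['LAX', 'SFO', 'MIA'],
    'state_code': ['CA', 'CA', 'FL'],
    'count': [10, 5, 20],
})
airport = pd.DataFrame({'airport_code': ['LAX', 'SFO', 'MIA'],
                        'type': ['large_airport'] * 3,
                        'state_code': ['CA', 'CA', 'FL']})
demographic = pd.DataFrame({'state_code': ['CA', 'CA', 'FL'],
                            'fborn': [50, 10, 25],
                            'tpopulation': [100, 100, 100]})

# Join 1: immigration -> airport on airport_code; suffixes keep both state columns.
by_airport = immigration.merge(airport, on='airport_code', suffixes=('', '_airport'))

# Joining city-level rows directly fans out: 3 immigration rows become 5.
fanned_out = immigration.merge(demographic, on='state_code')

# Join 2: roll both sides up to one row per state, then join on state_code.
state_demo = demographic.groupby('state_code', as_index=False)[['fborn', 'tpopulation']].sum()
arrivals = immigration.groupby('state_code', as_index=False)['count'].sum()
by_state = arrivals.merge(state_demo, on='state_code', how='left')
print(by_state)
```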


#### 3.2 Mapping Out Data Pipelines
The steps necessary to pipeline the data into the chosen data model are listed below:  

1. Extract: First, the original data is loaded into pandas or Spark dataframes, depending on the size of the original data. 
2. Transform: Examining the original data makes it clear that some cleaning is required before it can be fed into the data model. In addition, the granularity of some data needs to be redefined, for which aggregating records is helpful. Data cleaning and wrangling happen at this step. 
3. Load: After data cleaning and the necessary aggregation are completed, the processed data is loaded into the data model. 


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
The data pipelines are built to create the data model.

#### 4.1.1 Immigration data 
All 12 months of immigration records are read into Spark dataframes. The source data comes in monthly files, so each month is processed and aggregated first, and the results are then combined into one large Spark dataframe. 

In [16]:
from functools import reduce

paths = glob.glob("../../data/18-83510-I94-Data-2016/*.sas7bdat")

def load_month(path):
    # read one month of data, keep air arrivals (i94mode == 1) and count the
    # records per visa category, port, state, month and year
    single = spark.read.format('com.github.saurfang.sas.spark').load(path) \
        .select('i94visa', 'i94port', 'i94addr', 'i94mon', 'i94yr', 'i94mode')
    single = single.filter(single['i94mode'] == 1)
    return single.groupby(['i94visa', 'i94port', 'i94addr', 'i94mon', 'i94yr']).count()

# union the monthly aggregates into one dataframe
immigration = reduce(lambda left, right: left.union(right), [load_month(p) for p in paths])

# assign imm_id once, after the union: monotonically_increasing_id() is only
# unique within a single dataframe, so per-month ids would repeat across months
immigration = immigration.withColumn("imm_id", monotonically_increasing_id()) \
    .withColumnRenamed('i94visa', 'visa_cat') \
    .withColumnRenamed('i94port', 'airport_code') \
    .withColumnRenamed('i94addr', 'state_code') \
    .withColumnRenamed('i94mon', 'month') \
    .withColumnRenamed('i94yr', 'year')


In [17]:
immigration.take(5)

[Row(visa_cat=1.0, airport_code='HOU', state_code='TN', month=4.0, year=2016.0, count=316, imm_id=0),
 Row(visa_cat=2.0, airport_code='EDA', state_code='CA', month=4.0, year=2016.0, count=164, imm_id=1),
 Row(visa_cat=1.0, airport_code='LVG', state_code='IL', month=4.0, year=2016.0, count=219, imm_id=2),
 Row(visa_cat=2.0, airport_code='SNA', state_code='CA', month=4.0, year=2016.0, count=491, imm_id=3),
 Row(visa_cat=2.0, airport_code='MIA', state_code='MS', month=4.0, year=2016.0, count=84, imm_id=4)]

#### 4.1.2 Airport code data
Since the airport code data is small, it'll be processed as Pandas dataframe first and then passed into a Spark dataframe. 

In [18]:
# read in airport code data 
fname ='airport-codes_csv.csv'
air_df =  pd.read_csv(fname)

# only select airports that are in the U.S. and are not closed
air_df = air_df[(air_df.iso_country == 'US') & (air_df.type != 'closed')].copy()

# drop records where iata_code is null (axis passed by keyword, as pandas 2.x requires)
air_df = air_df.dropna(axis=0, subset=['iata_code'])

# make a new column state_code from iso_region, e.g. 'US-FL' -> 'FL'
air_df['state_code'] = air_df.iso_region.str[3:]

# the columns that are of interest are type, name, iata_code and state_code
air_df = air_df[['type', 'name', 'iata_code', 'state_code']]

In [19]:
# convert the pandas dataframe to a spark dataframe with an explicit schema
air_schema = StructType([
    StructField("type", StringType(), True),
    StructField("name", StringType(), True),
    StructField("iata_code", StringType(), True),
    StructField("state_code", StringType(), True),
])

air_df2 = spark.createDataFrame(air_df, schema=air_schema)

airport = air_df2.withColumnRenamed('iata_code', 'airport_code')

In [20]:
airport.take(5)

[Row(type='small_airport', name='Ocean Reef Club Airport', airport_code='OCA', state_code='FL'),
 Row(type='small_airport', name='Pilot Station Airport', airport_code='PQS', state_code='AK'),
 Row(type='small_airport', name='Crested Butte Airpark', airport_code='CSE', state_code='CO'),
 Row(type='small_airport', name='LBJ Ranch Airport', airport_code='JCY', state_code='TX'),
 Row(type='small_airport', name='Metropolitan Airport', airport_code='PMX', state_code='MA')]

#### 4.1.3 Demographic data
Since the US demographic data is small, it'll be processed as Pandas dataframe first and then passed into a Spark dataframe. 

In [21]:
# read in demographic data
fname ='us-cities-demographics.csv'
demo_df =  pd.read_csv(fname,sep=';')
# reshape from long to wide so that each city has one record with a column per race
demo_df = demo_df.pivot_table(index=['City', 'State', 'Median Age', 'Male Population', 'Female Population', 'Total Population', 'Number of Veterans', 'Foreign-born', 'Average Household Size', 'State Code'],columns='Race', values='Count').reset_index()

In [22]:
# pandas dataframe to spark dataframe 
demo_df2 =  spark.createDataFrame(demo_df)

demographic = demo_df2.withColumn("demo_id", monotonically_increasing_id()) \
            .withColumnRenamed('City', 'city') \
            .withColumnRenamed('State', 'state') \
            .withColumnRenamed('Median Age', 'mage') \
            .withColumnRenamed('Male Population', 'mpopulation') \
            .withColumnRenamed('Female Population', 'fpopulation') \
            .withColumnRenamed('Total Population', 'tpopulation') \
            .withColumnRenamed('Number of Veterans', 'veterans') \
            .withColumnRenamed('Foreign-born', 'fborn') \
            .withColumnRenamed('Average Household Size', 'household') \
            .withColumnRenamed('State Code', 'state_code') 

In [23]:
demographic.take(5)

[Row(city='Abilene', state='Texas', mage=31.3, mpopulation=65212.0, fpopulation=60664.0, tpopulation=125876, veterans=9367.0, fborn=8129.0, household=2.64, state_code='TX', American Indian and Alaska Native=1813.0, Asian=2929.0, Black or African-American=14449.0, Hispanic or Latino=33222.0, White=95487.0, demo_id=0),
 Row(city='Akron', state='Ohio', mage=38.1, mpopulation=96886.0, fpopulation=100667.0, tpopulation=197553, veterans=12878.0, fborn=10024.0, household=2.24, state_code='OH', American Indian and Alaska Native=1845.0, Asian=9033.0, Black or African-American=66551.0, Hispanic or Latino=3684.0, White=129192.0, demo_id=1),
 Row(city='Alafaya', state='Florida', mage=33.5, mpopulation=39504.0, fpopulation=45760.0, tpopulation=85264, veterans=4176.0, fborn=15842.0, household=2.94, state_code='FL', American Indian and Alaska Native=nan, Asian=10336.0, Black or African-American=6577.0, Hispanic or Latino=34897.0, White=63666.0, demo_id=2),
 Row(city='Alameda', state='California', mag

#### 4.2 Data Quality Checks

#### 4.2.1 Immigration data checks
The key imm_id in the immigration table is generated by monotonically_increasing_id(), which produces unique, non-null values within a single dataframe. 
The column data types are checked by inspecting the schema to ensure that all columns are in the right format. 
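Why the id should be generated only once: the PySpark documentation for `monotonically_increasing_id()` describes the current implementation as putting the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The pure-Python sketch below models that documented layout and does not call Spark. The helper `simulated_ids` is hypothetical. The sketch shows that two dataframes whose ids are generated separately start from the same values, so their union can contain repeated ids. Generating ids once over the combined partitions does not have this problem.

```python
def simulated_ids(partition_sizes):
    """Model of the documented id layout of Spark's monotonically_increasing_id():
    partition index in the upper 31 bits, row number within the partition in the
    lower 33 bits. Pure Python; Spark is not involved."""
    ids = []
    for partition_id, size in enumerate(partition_sizes):
        ids.extend((partition_id << 33) + row for row in range(size))
    return ids


# Two monthly dataframes, each with 2 partitions of 3 rows, get their ids
# separately, so both start again at partition 0.
april_ids = simulated_ids([3, 3])
may_ids = simulated_ids([3, 3])
collisions = set(april_ids) & set(may_ids)

# Generating ids once on the union sees 4 distinct partition indexes instead.
combined_ids = simulated_ids([3, 3, 3, 3])

print(len(collisions), len(set(combined_ids)) == len(combined_ids))  # prints 6 True
```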

In [24]:
immigration.schema

StructType(List(StructField(visa_cat,DoubleType,true),StructField(airport_code,StringType,true),StructField(state_code,StringType,true),StructField(month,DoubleType,true),StructField(year,DoubleType,true),StructField(count,LongType,false),StructField(imm_id,LongType,false)))

#### 4.2.2 Airport data checks 
airport_code is the primary key of this table, so it must be non-null and unique. 
The column data types are checked by inspecting the schema to ensure that all columns are in the right format.   
The following checks confirm that airport_code is non-null and unique; the duplicate check returns null because no airport_code appears more than once. 

In [25]:
# check if any duplicated records in airport data by airport_code
import pyspark.sql.functions as f
airport.groupBy(airport.airport_code)\
    .count()\
    .where(f.col('count') > 1)\
    .select(f.sum('count'))\
    .show()

+----------+
|sum(count)|
+----------+
|      null|
+----------+



In [26]:
# count rows where airport_code is null; 0 means the column is fully populated
airport.filter(airport.airport_code.isNull()).count()

In [27]:
airport.schema

StructType(List(StructField(type,StringType,true),StructField(name,StringType,true),StructField(airport_code,StringType,true),StructField(state_code,StringType,true)))

#### 4.2.3 Demographic data checks
The key demo_id in the demographic table is generated by monotonically_increasing_id() on a single dataframe, so it is unique and non-null. 
The column data types are checked by inspecting the schema to ensure that all columns are in the right format. 

In [28]:
demographic.schema

StructType(List(StructField(city,StringType,true),StructField(state,StringType,true),StructField(mage,DoubleType,true),StructField(mpopulation,DoubleType,true),StructField(fpopulation,DoubleType,true),StructField(tpopulation,LongType,true),StructField(veterans,DoubleType,true),StructField(fborn,DoubleType,true),StructField(household,DoubleType,true),StructField(state_code,StringType,true),StructField(American Indian and Alaska Native,DoubleType,true),StructField(Asian,DoubleType,true),StructField(Black or African-American,DoubleType,true),StructField(Hispanic or Latino,DoubleType,true),StructField(White,DoubleType,true),StructField(demo_id,LongType,false)))

#### 4.3 Data dictionary 
Data dictionary is attached as a separate file. 

#### 4.4 Write tables to parquet (onto AWS S3)
The three tables are to be written to AWS S3 as parquet files so they can be revisited later. 

In [None]:
#write to parquet
# immigration.write.partitionBy("month","year").mode("overwrite").parquet("AWS S3 path")
# airport.write.parquet("AWS S3 path")
# demographic.write.parquet("AWS S3 path")

#### 4.5 Analysis questions
Once the data is loaded into the data model, the model can be used to answer the analysis questions posed at the beginning of the project. 

1. Which airport has the most immigrants coming in?
2. Which state has the highest immigration rate?
3. Which state has the most immigrants coming in as student? What's the immigration rate of that state?
4. Which state has the most immigrants coming in for pleasure? What's the immigration rate of that state?

First, temporary views are created for the three tables. 

In [29]:
airport.createOrReplaceTempView("airport")
demographic.createOrReplaceTempView("demographic")
immigration.createOrReplaceTempView("immigration")

In [30]:
# 1. Which airport has the most immigrants coming in in year 2016?
analysis1 = spark.sql("""
                      SELECT immigration.airport_code, sum(count) as records
                      FROM immigration 
                      JOIN airport 
                      ON immigration.airport_code = airport.airport_code
                      GROUP BY immigration.airport_code
                      ORDER BY records DESC
                      LIMIT 1
                       """).show()

+------------+-------+
|airport_code|records|
+------------+-------+
|         MIA|5080114|
+------------+-------+



Looking up MIA in the provided I94 SAS label file shows that it refers to Miami, FL. Therefore, the airport with the most immigrants coming into the U.S. in 2016 is the one in Miami, FL. 

In [31]:
# 2. Which city has the highest immigration rate?
analysis2 = spark.sql("""
                      SELECT  city, state, (fborn/tpopulation) as irate
                      FROM demographic
                      ORDER BY irate DESC
                      LIMIT 1
                       """).show()

+-------+-------+------------------+
|   city|  state|             irate|
+-------+-------+------------------+
|Hialeah|Florida|0.7176757408829013|
+-------+-------+------------------+



Among the cities in the demographic data, the one with the highest immigration rate is Hialeah, which is also in Florida. The immigration rate here is calculated as foreign-born population divided by total population. 
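The rates used in this section can be written out as follows. For a city $c$ the rate is the foreign-born share. The state-level figure in the queries below (`sum(fborn)/sum(tpopulation)`) is a ratio of sums over the cities listed for state $s$. Equivalently, it is an average of city rates weighted by population, so larger cities count more than they would in a plain average. It also covers only the cities present in the demographic data, not the state's full population.

```latex
\text{irate}_c = \frac{\text{fborn}_c}{\text{tpopulation}_c},
\qquad
\text{irate}_s = \frac{\sum_{c \in s} \text{fborn}_c}{\sum_{c \in s} \text{tpopulation}_c}
             = \sum_{c \in s} w_c \, \text{irate}_c,
\qquad
w_c = \frac{\text{tpopulation}_c}{\sum_{c' \in s} \text{tpopulation}_{c'}}
```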

In [32]:
# 3. which state has the most immigrants coming in as student? What's the immigration rate of that state?
analysis3_1 = spark.sql("""
                      SELECT immigration.state_code, sum(immigration.count) as records
                      FROM immigration 
                      WHERE visa_cat = 3 
                      GROUP BY immigration.state_code
                      ORDER BY records DESC
                      LIMIT 1
                       """).show()

+----------+-------+
|state_code|records|
+----------+-------+
|        CA| 282339|
+----------+-------+



In [33]:
analysis3_2 = spark.sql("""
                      SELECT state_code, (sum(fborn)/sum(tpopulation)) as irate
                      FROM demographic
                      WHERE state_code = 'CA'
                      GROUP BY state_code
                       """).show()

+----------+-------------------+
|state_code|              irate|
+----------+-------------------+
|        CA|0.30006119457942526|
+----------+-------------------+



In 2016, California received the most arrivals on student visas (visa category 3), and its immigration rate of about 30.0% is moderately high. 

In [34]:
# 4. Which state has the most immigrants coming in for pleasure? What's the immigration rate of that state?
analysis4_1 = spark.sql("""
                      SELECT immigration.state_code, sum(immigration.count) as records
                      FROM immigration 
                      WHERE visa_cat = 2
                      GROUP BY immigration.state_code
                      ORDER BY records DESC
                      LIMIT 1
                       """).show()

+----------+-------+
|state_code|records|
+----------+-------+
|        FL|7486788|
+----------+-------+



In [35]:
analysis4_2 = spark.sql("""
                      SELECT state_code, (sum(fborn)/sum(tpopulation)) as irate
                      FROM demographic
                      WHERE state_code = 'FL'
                      GROUP BY state_code
                       """).show()

+----------+-------------------+
|state_code|              irate|
+----------+-------------------+
|        FL|0.25057405042244757|
+----------+-------------------+



In 2016, Florida received the most arrivals on pleasure visas (visa category 2); its immigration rate of about 25.1% is somewhat lower than California's (about 30.0%). 

### Step 5: Complete Project Write Up
* The main tool used in this project is Spark, which handles the large-scale data processing. AWS S3 is intended for storing the processed tables. The pandas library is also heavily used for data wrangling and exploration.   

* Assuming incoming immigration data keeps the same structure as the existing files, which are saved by month and year, the data model should naturally be updated monthly.    

* If the data increased 100x, the processing steps carried out with pandas would likely fail to complete or take much longer. In that scenario, one solution is to pre-process the original data directly in Spark instead of pandas. Running Spark on a cloud cluster is another option to consider.   

* If the dashboard must be refreshed on a schedule, for example by 7am every day, this is a good use case for Apache Airflow. Airflow can run the pipeline on that schedule so the data is refreshed daily, and the dashboard built on the data is updated accordingly.     

* Pre-computed OLAP cubes could help when the data needs to be accessed by a large group of people.