# Customers Demographic Analysis for Auto Rental Company
### Data Engineering Capstone Project

#### Project Summary
We build a data model that lets an auto rental company examine the demographics of its potential customers: visitors to the US from other countries. The company can query the database for traveler demographics, including the share of each travel purpose, the airline chosen, arrival time, length of stay, age, nationality, occupation, and the temperature at the destination. With this information, the company can better target its marketing and advertising at the visitors most likely to need a rental car, and tailor its service and vehicle inventory to them.


The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
from pyspark.sql import functions as f

### Step 1: Scope the Project and Gather Data

#### Scope 
We pull together data from 4 different sources and create a total of 8 fact and dimension tables in a star schema. The schema allows fast queries on potential-customer demographics such as the number of visitors to each city, their nationality, and their age. We use Spark to perform the ETL and create these tables.

#### Describe and Gather Data 
The data sources include the following:
1. I94 Foreign Visitor/Immigration Data: This data is in SAS file format and contains information about each foreign visitor, such as age, travel mode, gender, and destination airport. The data comes from the US National Tourism and Trade Office.

2. US Cities Demographics: This data describes each US city's demographics, including median age, population count by gender and race, and average household size. The data is stored in CSV format and comes from OpenSoft.

3. Airport Code Table: This data contains airport codes and corresponding cities. It is downloaded from [here](https://datahub.io/core/airport-codes#data).

4. World Temperature Data: This data contains average temperature readings over time for each city in the world (the sample rows loaded in this notebook are dated at monthly intervals). The dataset is from Kaggle [here](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data).

In [43]:

from pyspark.sql import SparkSession
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()


In [44]:
# Display all columns of a pandas DataFrame
pd.set_option('display.max_columns', None)

In [4]:
#### Import immigration data
df_spark =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
df_spark.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,37.0,2.0,1.0,,,,T,,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,25.0,3.0,1.0,20130811.0,SEO,,G,,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,55.0,2.0,1.0,20160401.0,,,T,O,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,28.0,2.0,1.0,20160401.0,,,O,O,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,4.0,2.0,1.0,20160401.0,,,O,O,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2
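
The `arrdate` and `depdate` values in the sample above are SAS numeric dates rather than calendar strings. SAS date values count days from 1960-01-01, so they can be decoded as in the minimal sketch below. The helper name `sas_days_to_date` is hypothetical and is not part of the original notebook.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS date values count days from this day


def sas_days_to_date(days):
    """Convert a SAS numeric date (days since 1960-01-01) to a date.

    Returns None for missing values. Hypothetical helper for illustration.
    """
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))


# arrdate and depdate taken from the third sample row above
arrival = sas_days_to_date(20545.0)
departure = sas_days_to_date(20691.0)
stay_length = (departure - arrival).days
print(arrival, departure, stay_length)
```

The difference between the two decoded dates gives a length of stay in days, which is one of the measures named in the project summary.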


In [5]:
#write to parquet
#df_spark.write.parquet("sas_data")
#df_spark=spark.read.parquet("sas_data")

In [6]:
print('Row Count', df_spark.count())
df_spark.printSchema()

Row Count 3096313
root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: d

In [7]:
# Import mapping table for i94res to nationality
df_nation =spark.read.format('csv').option('header', 'true').load('immigration_nationality.csv')
df_nation.limit(5).toPandas()

Unnamed: 0,Code,Country
0,582,"MEXICO Air Sea, and Not Reported (I-94, no lan..."
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA


In [8]:
print('Row Count', df_nation.count())
df_nation.printSchema()

Row Count 289
root
 |-- Code: string (nullable = true)
 |-- Country: string (nullable = true)



In [9]:
# Import mapping table for state abbreviation to state name
df_state =spark.read.format('csv').option('header', 'true').load('state.csv')
df_state.limit(5).toPandas()

Unnamed: 0,State_abbr,State_name
0,AL,ALABAMA
1,AK,ALASKA
2,AZ,ARIZONA
3,AR,ARKANSAS
4,CA,CALIFORNIA


In [10]:
print('Row Count', df_state.count())
df_state.printSchema()

Row Count 55
root
 |-- State_abbr: string (nullable = true)
 |-- State_name: string (nullable = true)



In [11]:
# Capitalize the first letter of the state name for consistency

df_state = df_state.withColumn("State_name_2",  f.initcap(df_state.State_name))

In [12]:
df_state.limit(5).toPandas()

Unnamed: 0,State_abbr,State_name,State_name_2
0,AL,ALABAMA,Alabama
1,AK,ALASKA,Alaska
2,AZ,ARIZONA,Arizona
3,AR,ARKANSAS,Arkansas
4,CA,CALIFORNIA,California


In [13]:
# Import mapping table for immigration mode
df_mode =spark.read.format('csv').option('header', 'true').load('mode.csv')
df_mode.limit(5).toPandas()

Unnamed: 0,mode_code,mode_desc
0,1,Air
1,2,Sea
2,3,Land
3,9,Not reported


In [14]:
# Note: this cell inspects df_state again (now including State_name_2), not df_mode
print('Row Count', df_state.count())
df_state.printSchema()

Row Count 55
root
 |-- State_abbr: string (nullable = true)
 |-- State_name: string (nullable = true)
 |-- State_name_2: string (nullable = true)



In [15]:
# Import table for airport
df_airport =spark.read.format('csv').option('header', 'true').load('airport-codes_csv.csv')
df_airport.limit(5).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [16]:
print('Row Count', df_airport.count())
df_airport.printSchema()

Row Count 55075
root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



In [45]:
# Import table for city demographics

df_demo =spark.read.format('csv').option('header', 'true').option('delimiter', ';').load('us-cities-demographics.csv')
df_demo.limit(5).toPandas()
# Questions to explore: which city or state is most popular with immigrants? What is the ratio of immigrants to the total population? How close are the ages?

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


In [46]:
print('Row Count', df_demo.count())
df_demo.printSchema()

Row Count 2891
root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



In [19]:
# Import temperature table for each city

path_temp = "../../data2/GlobalLandTemperaturesByCity.csv"
df_temp =spark.read.format('csv').option('header', 'true').option('delimiter', ',').load(path_temp)
df_temp.limit(5).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [20]:
print('Row Count', df_temp.count())
df_temp.printSchema()

Row Count 8599212
root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- AverageTemperatureUncertainty: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Check the missing value

In [23]:
for col in df_spark.columns:
  print(col, "with null values", "%15s" % df_spark.filter(df_spark[col].isNull()).count() )

cicid with null values               0
i94yr with null values               0
i94mon with null values               0
i94cit with null values               0
i94res with null values               0
i94port with null values               0
arrdate with null values               0
i94mode with null values             239
i94addr with null values          152592
depdate with null values          142457
i94bir with null values             802
i94visa with null values               0
count with null values               0
dtadfile with null values               1
visapost with null values         1881250
occup with null values         3088187
entdepa with null values             238
entdepd with null values          138429
entdepu with null values         3095921
matflag with null values          138429
biryear with null values             802
dtaddto with null values             477
gender with null values          414269
insnum with null values         2982605
airline with null values    

In [21]:
for col in df_airport.columns:
  print(col, "with null values", "%15s" % df_airport.filter(df_airport[col].isNull()).count() )

ident with null values               0
type with null values               0
name with null values               0
elevation_ft with null values            7006
continent with null values               0
iso_country with null values               0
iso_region with null values               0
municipality with null values            5676
gps_code with null values           14045
iata_code with null values           45886
local_code with null values           26389
coordinates with null values               0


In [20]:
for col in df_demo.columns:
  print(col, "with null values", "%15s" % df_demo.filter(df_demo[col].isNull()).count() )

City with null values               0
State with null values               0
Median Age with null values               0
Male Population with null values               3
Female Population with null values               3
Total Population with null values               0
Number of Veterans with null values              13
Foreign-born with null values              13
Average Household Size with null values              16
State Code with null values               0
Race with null values               0
Count with null values               0


In [19]:
for col in df_temp.columns:
  print(col, "with null values", "%15s" % df_temp.filter(df_temp[col].isNull()).count() )

dt with null values               0
AverageTemperature with null values          364130
AverageTemperatureUncertainty with null values          364130
City with null values               0
Country with null values               0
Latitude with null values               0
Longitude with null values               0


##### There are quite a few missing values. Before handling them, let's narrow each table down to the data relevant to our project.

#### Airport Table Data Processing

In [24]:
# Keep only the US airports
df_airport_us = df_airport.filter(df_airport['iso_country'] == 'US')
print('Row Count', df_airport_us.count())

Row Count 22757


In [25]:
# Write a function to see the unique values in each field (showing only part of them)
def check_distinct(df, cols):
  """
  Print the number of distinct values in each column, along with the first 300 distinct values for a quick look.

  df (object): the Spark dataframe to check for distinct values
  cols (list of string): the names of the columns to check
  
  """  
  
  for col in cols:
    distinct = sorted(list(df.select(col).distinct().toPandas()[col]))
    print(col, "distinct count:", len(distinct), "distinct value:", distinct[:300],"\n")
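
One detail of `check_distinct` is worth spelling out: Python's `sorted()` cannot compare `None` with strings, so a column containing nulls would raise a `TypeError` once collected. That is presumably why the next cell drops rows with a null `iata_code` before calling it. The plain-Python sketch below shows the same preview logic with nulls filtered out; `distinct_preview` is a hypothetical name and the sample values are made up.

```python
def distinct_preview(values, limit=300):
    """Return (number of distinct non-null values, first `limit` of them, sorted)."""
    # None is removed first because sorted() cannot compare None with str
    distinct = sorted({v for v in values if v is not None})
    return len(distinct), distinct[:limit]


# Hypothetical sample values, not read from the airport table
count, preview = distinct_preview(["PHL", "AUS", None, "AUS", "CLG"], limit=2)
print(count, preview)
```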

In [28]:
# Check some key columns, using only rows that have an iata_code
check_distinct(df_airport_us.dropna(how = 'any', subset = ['iata_code']), ['ident','iso_region','iata_code' ])

ident distinct count: 2019 distinct value: ['07FA', '0AK', '0CO2', '0TE7', '13MA', '13Z', '16A', '16K', '19AK', '19P', '1KC', '1O6', '1Z1', '1Z9', '2AK', '2AK3', '2AK6', '2IG4', '2K5', '2TE0', '2Z1', '2Z6', '38WA', '3AK5', '3IS8', '3VS', '3Z8', '4A2', '4AK', '4K0', '4K5', '4KA', '4TE8', '4Z7', '57A', '5A8', '5KE', '5NK', '5Z9', '65LA', '6CA3', '6N5', '6N7', '6OK6', '78K', '78WA', '7KA', '7NC2', '7WA5', '82CL', '83Q', '84CL', '84K', '89NY', '8XS8', '90WA', '96Z', '9A3', '9A8', '9N2', '9Z8', 'A23', 'A61', 'A63', 'A79', 'AHT', 'AK13', 'AK26', 'AK33', 'AK49', 'AK56', 'AK62', 'AK71', 'AK75', 'AK81', 'AK96', 'AK97', 'AL73', 'ALZ', 'AQY', 'ARX', 'AUS', 'AYZ', 'AZ15', 'BBG', 'BCJ', 'BDH', 'BNF', 'BOF', 'BOK', 'BQV', 'BRG', 'BYA', 'BZS', 'CGA', 'CHP', 'CIV', 'CJX', 'CKR', 'CKU', 'CKX', 'CLC', 'CLG', 'CT88', 'CVH', 'CVR', 'CXC', 'CYM', 'CZK', 'CZN', 'CZO', 'D38', 'D66', 'DCK', 'DCR', 'DHB', 'DPK', 'DWS', 'EDA', 'ESP', 'EXI', 'F23', 'FAL', 'FEW', 'FLE', 'FLT', 'FSN', 'GAB', 'GFD', 'GNU', 'GSZ', '

In [26]:
# split iso_region column to single out State field
split_region = f.split(df_airport_us['iso_region'], '-')
df_airport_us = df_airport_us.withColumn('state', split_region.getItem(1)) \
                .drop('iso_region') \
                .drop('ident') \
                .withColumnRenamed('municipality', 'city') \
                .withColumnRenamed('iso_country', 'country')

In [27]:
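# split coordinates column
# NOTE: the sample rows loaded earlier (e.g. "-74.93..., 40.07..." for a Pennsylvania heliport) suggest
# the source orders this field as longitude, latitude, so the two column names below appear swapped.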
split_coordinates = f.split(df_airport_us['coordinates'], ',')
df_airport_us = df_airport_us.withColumn('latitude', split_coordinates.getItem(0)) \
                .withColumn('longitude', split_coordinates.getItem(1)) \
                .drop('coordinates') \
                .drop_duplicates()
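
To make the two splits concrete, the sketch below applies the same string operations in plain Python to the first airport row shown earlier. The function name `parse_airport_fields` is hypothetical. It rests on one assumption, stated in the code: that the source `coordinates` field is ordered longitude first, which is what the sample value for a Pennsylvania heliport suggests. It also strips the space that follows the comma, which a bare split would leave on the second piece.

```python
def parse_airport_fields(iso_region, coordinates):
    """Split 'US-PA' into a state code and 'lon, lat' into two floats."""
    state = iso_region.split("-")[1]
    # strip() removes the space that follows the comma in the source value
    first, second = (part.strip() for part in coordinates.split(","))
    # Assumption: the source orders the pair as longitude, latitude
    return {"state": state, "longitude": float(first), "latitude": float(second)}


# Values copied from the first row of the airport sample
row = parse_airport_fields("US-PA", "-74.93360137939453, 40.07080078125")
print(row)
```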

In [28]:
# Remove rows with a null iata_code, which will be our primary key
df_airport_us = df_airport_us.dropna(how = 'any', subset = ['iata_code'])

In [29]:
print('Row Count', df_airport_us.count())
df_airport_us.limit(5).toPandas()

Row Count 2019


Unnamed: 0,type,name,elevation_ft,continent,country,city,gps_code,iata_code,local_code,state,latitude,longitude
0,closed,Ben Bruce Memorial Airpark,44.0,,US,Evadale,4TE8,EVA,4TE8,TX,-94.0735015869,30.3209991455
1,seaplane_base,Yes Bay Lodge Seaplane Base,,,US,Yes Bay,78K,WYB,78K,AK,-131.800994873,55.9163017273
2,small_airport,Goodnews Airport,15.0,,US,Goodnews,GNU,GNU,GNU,AK,-161.57699585,59.117401123
3,medium_airport,Childress Municipal Airport,1954.0,,US,Childress,KCDS,CDS,CDS,TX,-100.288002014,34.4337997437
4,medium_airport,Columbia Regional Airport,889.0,,US,Columbia,KCOU,COU,COU,MO,-92.21959686279295,38.81809997558594


In [53]:
# Check which iata_code values are duplicated
df_airport_us.groupBy(['iata_code']) \
    .count() \
    .where(f.col('count') >1) \
    .show()

+---------+-----+
|iata_code|count|
+---------+-----+
|      CLG|    2|
|      AUS|    2|
|      AHT|    2|
|      ESP|    2|
|      PHL|    2|
+---------+-----+



In [31]:
# Let's look into what is causing the duplicate in iata_code
in_list = ['CLG', 'AUS', 'AHT', 'ESP', 'PHL']
df_airport_us.filter(f.col('iata_code').isin(in_list)) \
            .orderBy(['iata_code']) \
            .show()

+-------------+--------------------+------------+---------+-------+----------------+--------+---------+----------+-----+-------------------+-------------------+
|         type|                name|elevation_ft|continent|country|            city|gps_code|iata_code|local_code|state|           latitude|          longitude|
+-------------+--------------------+------------+---------+-------+----------------+--------+---------+----------+-----+-------------------+-------------------+
|       closed|Amchitka Army Air...|         215|       NA|     US| Amchitka Island|    null|      AHT|      null|   AK|      179.259166667|      51.3777777778|
|       closed|Amchitka Army Air...|         215|       NA|     US| Amchitka Island|    PAHT|      AHT|      null|   AK|      179.259166667|      51.3777777778|
|       closed|Austin Robert Mue...|        null|       NA|     US|            null|    KAUS|      AUS|      null|   TX|     -97.6997852325|      30.2987223546|
|large_airport|Austin Bergstrom ..

In [55]:
# Show the records with a null city field
df_airport_us.filter(f.col('city').isNull()).show()

+------+--------------------+------------+---------+-------+----+--------+---------+----------+-----+------------------+------------------+
|  type|                name|elevation_ft|continent|country|city|gps_code|iata_code|local_code|state|          latitude|         longitude|
+------+--------------------+------------+---------+-------+----+--------+---------+----------+-----+------------------+------------------+
|closed|       Carmel Valley|        null|       NA|     US|null|    null|      O62|      null|   CA|     -121.72911644|     36.4814843441|
|closed|Ft Devens Moore A...|         269|       NA|     US|null|    KAYE|      AYE|      null|   MA|-71.60279846191406| 42.56999969482422|
|closed|    Coalinga Airport|        null|       NA|     US|null|    null|      CLG|      null|   CA|    -120.360116959|     36.1580433385|
|closed|Austin Robert Mue...|        null|       NA|     US|null|    KAUS|      AUS|      null|   TX|    -97.6997852325|     30.2987223546|
|closed|         Era

In [32]:
# Drop rows with a null city if their iata_code is one of the duplicated codes
df_airport_us = df_airport_us.filter(~(f.col('iata_code').isin(in_list) & f.col('city').isNull()) )

In [33]:
# drop duplicate in iata_code + city
df_airport_us = df_airport_us.drop_duplicates(subset=['iata_code','city'])


In [34]:
print('Row Count', df_airport_us.count())
df_airport_us.limit(5).toPandas()

Row Count 2014


Unnamed: 0,type,name,elevation_ft,continent,country,city,gps_code,iata_code,local_code,state,latitude,longitude
0,small_airport,Artesia Municipal Airport,3541,,US,Artesia,KATS,ATS,ATS,NM,-104.468002319,32.8525009155
1,small_airport,Buchanan Field,26,,US,Concord,KCCR,CCR,CCR,CA,-122.056999207,37.9897003174
2,small_airport,Calexico International Airport,4,,US,Calexico,KCXL,CXL,CXL,CA,-115.513000488,32.6694984436
3,medium_airport,Bisbee Douglas International Airport,4154,,US,Douglas Bisbee,KDUG,DUG,DUG,AZ,-109.603996277,31.4689998627
4,small_airport,Danbury Municipal Airport,458,,US,Danbury,KDXR,DXR,DXR,CT,-73.48220062259999,41.3714981079


In [35]:
# Final check for duplicates in iata_code => none remain
df_airport_us.groupBy(['iata_code']) \
    .count() \
    .where(f.col('count') >1) \
    .show()

+---------+-----+
|iata_code|count|
+---------+-----+
+---------+-----+
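
The two-step cleanup above can be summarized in plain Python: first drop the rows with no city among the duplicated codes, then keep one row per `iata_code` and `city` pair. This is only a sketch on made-up rows; `dedupe_by_code` is a hypothetical name, and unlike Spark's `drop_duplicates`, which gives no guarantee about which duplicate survives, it keeps the first row it sees.

```python
from collections import Counter


def dedupe_by_code(rows):
    """Keep one row per (iata_code, city), preferring rows that have a city."""
    counts = Counter(r["iata_code"] for r in rows)
    dup_codes = {code for code, n in counts.items() if n > 1}
    # Step 1: among duplicated codes, drop the rows with no city
    kept = [r for r in rows
            if not (r["iata_code"] in dup_codes and r["city"] is None)]
    # Step 2: keep the first row for each (iata_code, city) pair
    seen, result = set(), []
    for r in kept:
        key = (r["iata_code"], r["city"])
        if key not in seen:
            seen.add(key)
            result.append(r)
    return result


# Hypothetical rows, not read from the airport table
rows = [
    {"iata_code": "AAA", "city": None, "type": "closed"},
    {"iata_code": "AAA", "city": "Alpha", "type": "large_airport"},
    {"iata_code": "BBB", "city": "Beta", "type": "closed"},
    {"iata_code": "BBB", "city": "Beta", "type": "closed"},
    {"iata_code": "CCC", "city": None, "type": "small_airport"},
]
cleaned = dedupe_by_code(rows)
print([(r["iata_code"], r["city"]) for r in cleaned])
```

Note the limits of this approach: a code that appears with two different non-null cities would still be duplicated afterwards, which is why the final `groupBy` check is still needed.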



#### Temperature Table Data Processing

In [16]:
check_distinct(df_temp, ['City', 'Country'])

City distinct count: 3448 distinct value: ['A Coruña', 'Aachen', 'Aalborg', 'Aba', 'Abadan', 'Abakaliki', 'Abakan', 'Abbotsford', 'Abengourou', 'Abeokuta', 'Aberdeen', 'Abha', 'Abidjan', 'Abiko', 'Abilene', 'Abohar', 'Abomey Calavi', 'Abu Dhabi', 'Abuja', 'Acapulco', 'Acarigua', 'Accra', 'Achalpur', 'Acheng', 'Achinsk', 'Acuña', 'Adana', 'Addis Abeba', 'Adelaide', 'Aden', 'Adilabad', 'Adiwerna', 'Adoni', 'Afyonkarahisar', 'Agadir', 'Agartala', 'Agboville', 'Ageo', 'Agra', 'Aguascalientes', 'Ahmadabad', 'Ahmadnagar', 'Ahmadpur East', 'Ahvaz', 'Aix En Provence', 'Aizawl', 'Ajdabiya', 'Ajmer', 'Akashi', 'Akishima', 'Akita', 'Akola', 'Akron', 'Aksaray', 'Aksu', 'Aktau', 'Akure', 'Akyab', 'Alagoinhas', 'Alandur', 'Alanya', 'Alappuzha', 'Albacete', 'Alberton', 'Albuquerque', 'Albury', 'Alcalá De Henares', 'Alcobendas', 'Alcorcón', 'Aleppo', 'Alexandria', 'Algeciras', 'Algiers', 'Aligarh', 'Allahabad', 'Allentown', 'Almaty', 'Almere', 'Almería', 'Almetyevsk', 'Alor Setar', 'Altay', 'Alwar', '

In [36]:
# Limit the temperature data to "United States" records only

df_temp_us = df_temp.filter(df_temp['Country'] == 'United States')
print('Row Count', df_temp_us.count())

Row Count 687289


In [37]:
# Split datetime column into year and month

df_temp_us = df_temp_us.withColumn('year', f.year(f.col('dt'))) \
            .withColumn('month', f.month(f.col('dt')))

In [37]:
check_distinct(df_temp_us, ['year', 'month'])

year distinct count: 271 distinct value: [1743, 1744, 1745, 1746, 1747, 1748, 1749, 1750, 1751, 1752, 1753, 1754, 1755, 1756, 1757, 1758, 1759, 1760, 1761, 1762, 1763, 1764, 1765, 1766, 1767, 1768, 1769, 1770, 1771, 1772, 1773, 1774, 1775, 1776, 1777, 1778, 1779, 1780, 1781, 1782, 1783, 1784, 1785, 1786, 1787, 1788, 1789, 1790, 1791, 1792, 1793, 1794, 1795, 1796, 1797, 1798, 1799, 1800, 1801, 1802, 1803, 1804, 1805, 1806, 1807, 1808, 1809, 1810, 1811, 1812, 1813, 1814, 1815, 1816, 1817, 1818, 1819, 1820, 1821, 1822, 1823, 1824, 1825, 1826, 1827, 1828, 1829, 1830, 1831, 1832, 1833, 1834, 1835, 1836, 1837, 1838, 1839, 1840, 1841, 1842, 1843, 1844, 1845, 1846, 1847, 1848, 1849, 1850, 1851, 1852, 1853, 1854, 1855, 1856, 1857, 1858, 1859, 1860, 1861, 1862, 1863, 1864, 1865, 1866, 1867, 1868, 1869, 1870, 1871, 1872, 1873, 1874, 1875, 1876, 1877, 1878, 1879, 1880, 1881, 1882, 1883, 1884, 1885, 1886, 1887, 1888, 1889, 1890, 1891, 1892, 1893, 1894, 1895, 1896, 1897, 1898, 1899, 1900, 1901, 1902

##### The temperature data only runs until 2013, while the immigration data is from 2016. Let's keep only the 2013 temperature data, as it is the closest to the immigration data's time range.

In [38]:
df_temp_us = df_temp_us.filter(f.col('year') == 2013)
print('Row Count', df_temp_us.count())

Row Count 2313


In [39]:
# Write a function that shows the number of null records per column and their % of total rows.
def check_missing_all(df, cols):
  """
  Count the null, NaN, and empty-string values in the given columns, and print each count with its percentage of the total row count.

  Args:
      df (object): the spark dataframe to check missing value
      cols (list of string): the column name to check missing value
  
  """  
      
  from pyspark.sql import functions as f
  from pyspark.sql.types import StringType  # needed for the timestamp cast below
  df_copy = df
  total_cnt = df_copy.count()
  print('Total Row Count', total_cnt)  
  type_map = dict(df_copy[cols].dtypes)
  for key in type_map:
    if type_map[key] == "timestamp":
      df_copy = df_copy.withColumn(key, f.col(key).cast(StringType()))
    col_cnt = df_copy.filter(df_copy[key].isNull() | f.isnan(df_copy[key]) | (df_copy[key] == "") ).count()
    print(key, "with null values", "%15s" % col_cnt,  "%15s" % round(col_cnt / total_cnt * 100,2),"%" )

In [70]:
check_missing_all(df_temp_us, df_temp_us.columns)

Total Row Count 2313
dt with null values               0             0.0 %
AverageTemperature with null values               1            0.04 %
AverageTemperatureUncertainty with null values               1            0.04 %
City with null values               0             0.0 %
Country with null values               0             0.0 %
Latitude with null values               0             0.0 %
Longitude with null values               0             0.0 %
year with null values               0             0.0 %
month with null values               0             0.0 %
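
The report above treats a value as missing when it is null, NaN, or an empty string. As a rough illustration of that rule outside Spark, the plain-Python sketch below computes the same counts and percentages for a list of row dictionaries. The names `is_missing` and `missing_report` are hypothetical, and the sample rows are made up.

```python
import math


def is_missing(value):
    """True for None, NaN, or an empty string."""
    if value is None:
        return True
    if isinstance(value, float) and math.isnan(value):
        return True
    return value == ""


def missing_report(rows, cols):
    """Map each column to (missing count, percent of total rows)."""
    total = len(rows)
    report = {}
    for col in cols:
        n = sum(1 for r in rows if is_missing(r.get(col)))
        pct = round(n / total * 100, 2) if total else 0.0
        report[col] = (n, pct)
    return report


# Hypothetical rows, not read from the temperature table
sample = [
    {"City": "Topeka", "AverageTemperature": "-0.162"},
    {"City": "Topeka", "AverageTemperature": None},
    {"City": "", "AverageTemperature": float("nan")},
    {"City": "Worcester", "AverageTemperature": "0.699"},
]
report = missing_report(sample, ["City", "AverageTemperature"])
print(report)
```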


In [40]:
# Drop rows with null values, then drop exact duplicate rows

df_temp_us = df_temp_us.dropna(how = 'any') \
                .drop_duplicates()

In [41]:
# Select only the key fields and aggregate the temperature fields by taking the mean.
cols_to_avg = ['AverageTemperature', 'AverageTemperatureUncertainty']
exprs = {x: "mean" for x in cols_to_avg}
df_temp_us_avg = df_temp_us.groupby(['City', 'year', 'month']).agg(exprs) \
                .withColumnRenamed('avg(AverageTemperature)', 'Avg_Temperature') \
                .withColumnRenamed('avg(AverageTemperatureUncertainty)', 'Avg_Temperature_Uncertainty')

In [44]:
print('Row Count', df_temp_us_avg.count())
df_temp_us_avg.limit(5).toPandas()

Row Count 2231


Unnamed: 0,City,year,month,Avg_Temperature,Avg_Temperature_Uncertainty
0,Bakersfield,2013,1,6.501,0.425
1,Westminster,2013,5,13.409,0.368
2,East Los Angeles,2013,5,19.028,0.531
3,Topeka,2013,1,-0.162,0.41
4,Worcester,2013,3,0.699,0.367
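
The row count falls from 2313 to 2231 even though only one row had a null, which means some `City`, `year`, `month` keys held more than one reading. Grouping on the city name alone merges those readings into a single mean. The plain-Python sketch below shows that grouped-mean step on made-up rows; `average_by_key` is a hypothetical name, and the values are strings on purpose because the CSV was loaded without schema inference.

```python
from collections import defaultdict
from statistics import mean


def average_by_key(rows, key_cols, value_col):
    """Group rows by key_cols and return the mean of value_col per group."""
    groups = defaultdict(list)
    for r in rows:
        key = tuple(r[c] for c in key_cols)
        groups[key].append(float(r[value_col]))  # CSV values arrive as strings
    return {key: mean(values) for key, values in groups.items()}


# Hypothetical rows: two readings share the same city name, year, and month
sample = [
    {"City": "Springfield", "year": 2013, "month": 1, "AverageTemperature": "1.0"},
    {"City": "Springfield", "year": 2013, "month": 1, "AverageTemperature": "3.0"},
    {"City": "Topeka", "year": 2013, "month": 1, "AverageTemperature": "-0.162"},
]
averages = average_by_key(sample, ["City", "year", "month"], "AverageTemperature")
print(averages)
```

If same-named cities in different states need to stay separate, a further key (such as latitude and longitude) would have to be added to the grouping; the notebook as written does not do this.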


#### Demographic Table Data Processing

In [47]:
df_demo.limit(5).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


In [48]:
# drop State field and use State Code instead since we will have one dimension table for state name
df_demo = df_demo.drop('State')


In [89]:
df_demo.limit(5).toPandas()

Unnamed: 0,City,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


In [39]:
# Check whether there are duplicate city and state combinations
df_demo.groupBy(['City', 'State Code']) \
    .count() \
    .where(f.col('count') >1) \
    .select(f.sum('count')) \
    .show()

+----------+
|sum(count)|
+----------+
|      2889|
+----------+



In [41]:
# Sort to visually inspect what is causing the duplicates => the Race field
cols_sort = ['City','State Code']
df_demo.orderBy(cols_sort, ascending = True).limit(10).show()

+-------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+------+
|   City|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race| Count|
+-------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+------+
|Abilene|      31.3|          65212|            60664|          125876|              9367|        8129|                  2.64|        TX|               White| 95487|
|Abilene|      31.3|          65212|            60664|          125876|              9367|        8129|                  2.64|        TX|American Indian a...|  1813|
|Abilene|      31.3|          65212|            60664|          125876|              9367|        8129|                  2.64|        TX|               Asian|  2929|
|Abi

In [49]:
# Pivot the "Race" field into multiple columns so that city and state code together form a unique identifier for this table.
df_demo = df_demo.groupBy(['City', 'Median Age', 'Male Population', 'Female Population', 'Total Population', 'Number of Veterans', 'Foreign-born', 'Average Household Size', 'State Code']) \
        .pivot('Race') \
        .agg({'Count':'sum'})

df_demo.limit(5).toPandas()

Unnamed: 0,City,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,American Indian and Alaska Native,Asian,Black or African-American,Hispanic or Latino,White
0,Kalamazoo,26.4,37175,38865,76040,3048,3482,2.36,MI,954.0,2429.0,19194.0,4114.0,56866.0
1,Augusta-Richmond County consolidated government,33.7,94662,101917,196579,19085,7915,2.67,GA,1667.0,4429.0,112271.0,9068.0,77940.0
2,Weston,38.6,32956,36991,69947,1507,30876,3.34,FL,128.0,4130.0,3697.0,36687.0,61021.0
3,Jacksonville,24.2,40015,27348,67363,8252,3732,2.51,NC,1741.0,4204.0,13253.0,11947.0,51245.0
4,New Rochelle,40.6,38871,40967,79838,2780,26960,2.85,NY,645.0,4218.0,17723.0,23548.0,44435.0


In [52]:
df_demo.columns

['City',
 'Median Age',
 'Male Population',
 'Female Population',
 'Total Population',
 'Number of Veterans',
 'Foreign-born',
 'Average Household Size',
 'State Code',
 'American Indian and Alaska Native',
 'Asian',
 'Black or African-American',
 'Hispanic or Latino',
 'White']

In [50]:
# Nice, now we can uniquely identify the demographic table by city & state code
df_demo.groupBy(['City', 'State Code']) \
    .count() \
    .where(f.col('count') >1) \
    .show()

+----+----------+-----+
|City|State Code|count|
+----+----------+-----+
+----+----------+-----+
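
The pivot turns one row per city and race into one row per city with a column per race. A minimal plain-Python sketch of that reshaping follows, using the three Abilene rows shown in the sorted output above. The name `pivot_race` is hypothetical, and for brevity it keys on `City` and `State Code` only, whereas the notebook groups by every non-race column.

```python
from collections import defaultdict


def pivot_race(rows, key_cols=("City", "State Code")):
    """Turn one row per (city, race) into one dict of race counts per city."""
    pivoted = defaultdict(dict)
    for r in rows:
        key = tuple(r[c] for c in key_cols)
        race = r["Race"]
        # Counts are strings in the CSV, so convert before summing
        pivoted[key][race] = pivoted[key].get(race, 0.0) + float(r["Count"])
    return dict(pivoted)


# Abilene rows copied from the sorted output above
rows = [
    {"City": "Abilene", "State Code": "TX", "Race": "White", "Count": "95487"},
    {"City": "Abilene", "State Code": "TX",
     "Race": "American Indian and Alaska Native", "Count": "1813"},
    {"City": "Abilene", "State Code": "TX", "Race": "Asian", "Count": "2929"},
]
wide = pivot_race(rows)
print(wide)
```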



#### Immigration Table Data Processing

In [32]:
check_distinct(df_spark, ['i94yr', 'i94mon','i94port'])

i94yr distinct count: 1 distinct value: [2016.0] 

i94mon distinct count: 1 distinct value: [4.0] 

i94port distinct count: 299 distinct value: ['5KE', '5T6', 'ABG', 'ABQ', 'ABS', 'ADS', 'ADT', 'ADW', 'AGA', 'AGN', 'ALC', 'ANA', 'ANC', 'AND', 'ANZ', 'APF', 'ATL', 'ATW', 'AUS', 'AXB', 'BAL', 'BAU', 'BDL', 'BEB', 'BED', 'BEE', 'BEL', 'BGM', 'BHX', 'BLA', 'BOA', 'BOS', 'BQN', 'BRG', 'BRO', 'BTN', 'BUF', 'BWA', 'BWM', 'CAE', 'CAL', 'CHA', 'CHI', 'CHL', 'CHM', 'CHR', 'CHS', 'CHT', 'CIN', 'CLE', 'CLG', 'CLM', 'CLS', 'CLT', 'CNA', 'CNC', 'COB', 'COL', 'COO', 'CPX', 'CRP', 'CRQ', 'CRY', 'DAB', 'DAC', 'DAL', 'DEN', 'DER', 'DET', 'DLB', 'DLR', 'DNA', 'DNS', 'DOU', 'DPA', 'DUB', 'DVL', 'EDA', 'EGP', 'ELP', 'EPI', 'ERC', 'FAL', 'FAR', 'FCA', 'FER', 'FMY', 'FOK', 'FPR', 'FPT', 'FRB', 'FRE', 'FRI', 'FRT', 'FTC', 'FTF', 'FTK', 'FTL', 'FWA', 'GAL', 'GPM', 'GSP', 'HAL', 'HAM', 'HAR', 'HEF', 'HEL', 'HHW', 'HID', 'HIG', 'HNN', 'HNS', 'HOU', 'HPN', 'HSV', 'HTM', 'HVR', 'ICT', 'INP', 'INT', 'JAC', 'JFA', '

In [43]:
# Select only necessary fields
immigration_select = ['cicid',
 'i94yr',
 'i94mon',
 'i94cit',
 'i94res',
 'i94port',
 'arrdate',
 'i94mode',
 'i94addr',
 'depdate',
 'i94bir',
 'i94visa',
 'occup',
 'biryear',
 'gender',
 'airline',
 'visatype']

In [53]:
df_spark_select = df_spark.select(immigration_select)

In [74]:
# Check for duplicates in cicid, the primary key field
df_spark_select.groupBy(['cicid']) \
    .count() \
    .where(f.col('count') >1) \
    .show()

+-----+-----+
|cicid|count|
+-----+-----+
+-----+-----+



In [45]:
spark.conf.set( "spark.sql.crossJoin.enabled" , "true" )

In [54]:
# Join in the city field from the airport table. City serves as a foreign key to the temperature and demographic tables.

df_spark_select = df_spark_select.join(df_airport_us.select(['city', 'iata_code']), [df_spark_select.i94port == df_airport_us.iata_code], how='left') \
                    .drop('iata_code')

In [55]:
# write a function to rename multiple columns (reference: https://stackoverflow.com/questions/38798567/pyspark-rename-more-than-one-column-using-withcolumnrenamed)

def rename_columns(df, columns):
    """
    Rename multiple columns of a Spark dataframe in one call.
    
    Args:
        df (object): the Spark dataframe to rename
        columns (dict): a dictionary mapping each old column name to its new name
    
    Return:
        df (object): the renamed Spark dataframe
    """
       
    if isinstance(columns, dict):
        for old_name, new_name in columns.items():
            df = df.withColumnRenamed(old_name, new_name)
        return df
    else:
        raise ValueError("'columns' should be a dict, like {'old_name_1':'new_name_1', 'old_name_2':'new_name_2'}")

In [56]:
renamed_dict = {'cicid': 'id', 'i94yr':'year', 'i94mon':'month', 'i94cit':'citizenship', 'i94res':'residence', 'i94port':'airport_code', 'arrdate':'arrival_date', 'i94mode':'travel_mode', 'i94addr':'state_abbr', 'depdate':'departure_date', 'i94bir':'age','i94visa': 'visa_category', 'occup':'occupation', 'biryear':'birth_year' }
df_spark_select = rename_columns(df_spark_select, renamed_dict)

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

We design the data model as a star schema so that tables are quick to join and analyze. The table schemas are as follows.

**Fact Table:**
1. immigration_record:
          id (PK)
          year (FK)
          month (FK)
          citizenship
          residence (FK)
          airport_code (FK)
          arrival_date
          travel_mode (FK)
          state_abbr (FK)
          city (FK)
          departure_date
          age
          visa_category (FK)
          occupation
          birth_year
          gender
          airline
          visatype
          
 
**Dimension Tables:**

2. temperature:
        City (Composite PK)
        year (Composite PK)
        month (Composite PK)
        Avg_Temperature
        Avg_Temperature_Uncertainty

3. demographic:
        state_code (Composite PK)
        City (Composite PK)
        median_age
        male_population
        female_population
        total_population
        number_of_veterans
        Foreign-born
        average_household_size
        American_Indian_and_Alaska_Native
        Asian
        Black_or_African-American
        Hispanic_or_Latino
        White

4. airport:
        iata_code (PK)
        type
        name
        elevation_ft
        continent
        country
        state
        city
        local_code
        gps_code
        latitude
        longitude

5. immigrant_nationality:
        code (PK)
        country

6. visa_table:
        visa_code (PK)
        visa_type     

7. mode_table:
        mode_code (PK)
        mode_desc

8. state_table:
        state_abbr (PK)
        state_name
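
To illustrate how the star schema supports analysis, here is a minimal sketch of a fact-to-dimension join that counts visitors by visa type. It uses an in-memory SQLite database purely as a lightweight stand-in for Spark SQL, keeps only a few of the columns listed above, and inserts made-up sample rows; none of this is the project's actual data or query.

```python
import sqlite3

# In-memory SQLite database, used only as a stand-in for Spark SQL (assumption).
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE visa_table (visa_code INTEGER PRIMARY KEY, visa_type TEXT);
CREATE TABLE immigration_record (
    id INTEGER PRIMARY KEY,
    visa_category INTEGER REFERENCES visa_table(visa_code),
    age INTEGER
);
""")

# Made-up sample rows for illustration only.
conn.executemany("INSERT INTO visa_table VALUES (?, ?)",
                 [(1, "Business"), (2, "Pleasure"), (3, "Student")])
conn.executemany("INSERT INTO immigration_record VALUES (?, ?, ?)",
                 [(1, 2, 34), (2, 2, 51), (3, 1, 45), (4, 3, 21)])

# One join from the fact table to a dimension table answers the question.
query = """
SELECT v.visa_type, COUNT(*) AS visitors
FROM immigration_record AS r
JOIN visa_table AS v ON r.visa_category = v.visa_code
GROUP BY v.visa_type
ORDER BY visitors DESC, v.visa_type
"""
visitors_by_purpose = conn.execute(query).fetchall()
print(visitors_by_purpose)
```

Because every dimension table joins directly to the fact table, most questions of this kind need a single join rather than a chain of them.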


#### 3.2 Mapping Out Data Pipelines
The following steps pipeline the data into the chosen data model:

1. Retrieve the airport data from the csv file and perform the necessary data cleaning to get this dimension table.
2. Retrieve the immigration and travel data from the sas file, join the immigration table with the airport table, and perform the necessary data cleaning to get our fact table.
3. Retrieve the temperature data from the csv file and perform the necessary data cleaning to get this dimension table.
4. Retrieve the demographics data from the csv file and perform the necessary data cleaning to get this dimension table.
5. Retrieve the travel nationality, mode, state, and visa data from the csv files to get these dimension tables.
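
The steps above carry one ordering constraint: the fact table joins against the airport table, so the airport step has to run first. A minimal sketch of deriving a valid build order from that dependency follows, using the standard library's `graphlib` (Python 3.9+). The table names in the dictionary, including the hypothetical `lookup_tables` label for the nationality, mode, state, and visa tables, are illustrative and not part of the pipeline code.

```python
from graphlib import TopologicalSorter

# Each table maps to the set of tables that must be built before it.
# Only immigration_record has a dependency: it joins against the airport table.
dependencies = {
    "airport": set(),
    "immigration_record": {"airport"},
    "temperature": set(),
    "demographic": set(),
    "lookup_tables": set(),  # hypothetical label: nationality, mode, state, visa
}

# static_order() yields the tables so that every dependency comes first.
build_order = list(TopologicalSorter(dependencies).static_order())
print(build_order)
```

The tables without dependencies may appear in any relative order, which also means they could be built in parallel.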


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [4]:
def process_airport(file_name):
    """
    Perform ETL on the airport data and return the processed dimension table.
    
    Args:
        file_name (string): the filename of the airport source file.
    
    Return:
        df_airport_us: the processed Spark dataframe.
    """
    
    # load the airport file
    df_airport = spark.read.format('csv').option('header', 'true').load(file_name)
    
    # keep only US airports
    df_airport_us = df_airport.filter(df_airport['iso_country'] == 'US')
    
    # split iso_region column to single out State field
    split_region = f.split(df_airport_us['iso_region'], '-')
    df_airport_us = df_airport_us.withColumn('state', split_region.getItem(1)) \
                    .drop('iso_region') \
                    .drop('ident') \
                    .withColumnRenamed('municipality', 'city') \
                    .withColumnRenamed('iso_country', 'country')
    
    # split coordinates column
    split_coordinates = f.split(df_airport_us['coordinates'], ',')
    df_airport_us = df_airport_us.withColumn('latitude', split_coordinates.getItem(0)) \
                    .withColumn('longitude', split_coordinates.getItem(1)) \
                    .drop('coordinates') \
                    .drop_duplicates()
    
    # drop rows with a null iata_code, which is our primary key
    df_airport_us = df_airport_us.dropna(how = 'any', subset = ['iata_code'])
    
    # for the iata_codes that appear more than once, drop the rows whose city is null
    in_list = ['CLG', 'AUS', 'AHT', 'ESP', 'PHL']
    df_airport_us = df_airport_us.filter(~(f.col('iata_code').isin(in_list) & f.col('city').isNull()) )
    
    # drop duplicates on iata_code + city
    df_airport_us = df_airport_us.drop_duplicates(subset=['iata_code','city'])
    
    return df_airport_us

In [5]:
def process_immigration(file_name, df_airport_us, package = 'com.github.saurfang.sas.spark'):
    """
    Perform ETL on the immigration/travel data and return the processed fact table.
    
    Args:
        file_name (string): the filename of the immigration/travel source file.
        df_airport_us: the processed airport Spark dataframe, used to look up the city of each port.
        package (string): the Spark data source package used to read the sas file.
    
    Return:
        df_spark_select: the processed Spark dataframe.
    """
        
    from pyspark.sql import SparkSession
    
    # load necessary package to process sas
    spark = SparkSession.builder.\
    config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
    .enableHiveSupport().getOrCreate()
    
    # import immigration data
    df_spark = spark.read.format(package).load(file_name)
    
    # Select only necessary fields
    immigration_select = ['cicid',
     'i94yr',
     'i94mon',
     'i94cit',
     'i94res',
     'i94port',
     'arrdate',
     'i94mode',
     'i94addr',
     'depdate',
     'i94bir',
     'i94visa',
     'occup',
     'biryear',
     'gender',
     'airline',
     'visatype']
    
    df_spark_select = df_spark.select(immigration_select)
    
    # Join in the city field from the airport table. City serves as a foreign key to the temperature and demographic tables.
    spark.conf.set( "spark.sql.crossJoin.enabled" , "true" )
    df_spark_select = df_spark_select.join(df_airport_us.select(['city', 'iata_code']), [df_spark_select.i94port == df_airport_us.iata_code], how='left') \
                    .drop('iata_code')
    
    # rename the column
    renamed_dict = {'cicid': 'id', 'i94yr':'year', 'i94mon':'month', 'i94cit':'citizenship', 'i94res':'residence', 'i94port':'airport_code', 'arrdate':'arrival_date', 'i94mode':'travel_mode', 'i94addr':'state_abbr', 'depdate':'departure_date', 'i94bir':'age','i94visa': 'visa_category', 'occup':'occupation', 'biryear':'birth_year' }
    for old_name, new_name in renamed_dict.items():
        df_spark_select = df_spark_select.withColumnRenamed(old_name, new_name)
            
    return df_spark_select

In [6]:
def process_temperature(path_temp):
    """
    Perform ETL on the temperature data and return the processed dimension table.
    
    Args:
        path_temp (string): the filename of the temperature source file.
    
    Return:
        df_temp_us_avg (object): the processed Spark dataframe.
    """
    
    # load the temperature file from the path passed in by the caller
    df_temp = spark.read.format('csv').option('header', 'true').option('delimiter', ',').load(path_temp)
    
    # limit to "United States" records only
    df_temp_us = df_temp.filter(df_temp['Country'] == 'United States')
    
    # Split datetime column into year and month
    df_temp_us = df_temp_us.withColumn('year', f.year(f.col('dt'))) \
                .withColumn('month', f.month(f.col('dt')))
    
    # filter to year 2013
    df_temp_us = df_temp_us.filter(f.col('year') == 2013)
    
    # Let's drop the null and duplicates
    df_temp_us = df_temp_us.dropna(how = 'any') \
                    .drop_duplicates()
    
    # Select only the key fields and aggregate the temperature fields by taking the mean.
    cols_to_avg = ['AverageTemperature', 'AverageTemperatureUncertainty']
    exprs = {x: "mean" for x in cols_to_avg}
    df_temp_us_avg = df_temp_us.groupby(['City', 'year', 'month']).agg(exprs) \
                    .withColumnRenamed('avg(AverageTemperature)', 'Avg_Temperature') \
                    .withColumnRenamed('avg(AverageTemperatureUncertainty)', 'Avg_Temperature_Uncertainty')
    
    return df_temp_us_avg

In [53]:
def process_demo(filename):
    """
    Perform ETL on the demographic data and return the processed dimension table.
    
    Args:
        filename (string): the filename of the demographic source file.
    
    Return:
        df_demo (object): the processed Spark dataframe.
    """
    # load file
    df_demo = spark.read.format('csv').option('header', 'true').option('delimiter', ';').load(filename)
    
    # drop State field
    df_demo = df_demo.drop('State')
    
    # Pivot the Race column into multiple columns so that State Code and City together form a unique identifier
    df_demo = df_demo.groupBy(['City', 'Median Age', 'Male Population', 'Female Population', 'Total Population', 'Number of Veterans', 'Foreign-born', 'Average Household Size', 'State Code']) \
        .pivot('Race') \
        .agg({'Count':'sum'})
    
    # rename the columns to remove spaces, which would cause problems when saving to parquet later on
    renamed_dict = {'Median Age':'median_age', 'Male Population':'male_population', 'Female Population':'female_population', 'Total Population':'total_population', 'Number of Veterans':'number_of_veterans', 'Average Household Size':'average_household_size','State Code':'state_code', 'American Indian and Alaska Native':'American_Indian_and_Alaska_Native','Black or African-American':'Black_or_African-American','Hispanic or Latino':'Hispanic_or_Latino'}
    for old_name, new_name in renamed_dict.items():
        df_demo = df_demo.withColumnRenamed(old_name, new_name)    
    
    return df_demo
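
To make the pivot step in `process_demo` concrete, here is a minimal pure-Python sketch of the same idea: the source has one row per city and race, and pivoting collapses those into one record per (City, State Code). The helper name `pivot_race_counts` and the sample rows are made up for illustration; the pipeline itself uses Spark's `groupBy(...).pivot('Race')`.

```python
from collections import defaultdict

def pivot_race_counts(rows):
    """Collapse one-row-per-race records into one record per (City, State Code)."""
    pivoted = defaultdict(dict)
    for row in rows:
        key = (row["City"], row["State Code"])
        race_counts = pivoted[key]
        # Sum counts in case the same race appears more than once for a city.
        race_counts[row["Race"]] = race_counts.get(row["Race"], 0) + row["Count"]
    return dict(pivoted)

# Made-up sample rows for illustration only.
rows = [
    {"City": "Springfield", "State Code": "IL", "Race": "Asian", "Count": 10},
    {"City": "Springfield", "State Code": "IL", "Race": "White", "Count": 90},
    {"City": "Springfield", "State Code": "MA", "Race": "White", "Count": 70},
]
pivoted = pivot_race_counts(rows)
print(pivoted)
```

Three input rows become two records, one per city and state pair, which is why the composite key is unique after the pivot.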

In [8]:
# Now run these functions to create the tables
airport = process_airport('airport-codes_csv.csv')
immigration_record = process_immigration(df_airport_us = airport, file_name = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
temperature = process_temperature('../../data2/GlobalLandTemperaturesByCity.csv')
demographic = process_demo('us-cities-demographics.csv')

In [12]:
# Load the remaining tables from csv files that were derived from I94_SAS_Labels_Descriptions
immigrant_nationality = spark.read.format('csv').option('header', 'true').load('immigration_nationality.csv')
state_table = spark.read.format('csv').option('header', 'true').load('state.csv')
mode_table = spark.read.format('csv').option('header', 'true').load('mode.csv')
visa_table = spark.read.format('csv').option('header', 'true').load('visa.csv')

In [18]:
# write to parquet files for easier querying later
immigration_record.write.mode("overwrite").partitionBy("state_abbr", "gender").parquet("./tables/immigration_record")
airport.write.mode("overwrite").partitionBy("iata_code").parquet("./tables/airport")
temperature.write.mode("overwrite").partitionBy("City", "year", "month").parquet("./tables/temperature")
demographic.write.mode("overwrite").partitionBy("state_code", "City").parquet("./tables/demographic")
immigrant_nationality.write.mode("overwrite").parquet("./tables/immigrant_nationality")
visa_table.write.mode("overwrite").parquet("./tables/visa_table")
mode_table.write.mode("overwrite").parquet("./tables/mode_table")
state_table.write.mode("overwrite").parquet("./tables/state_table")


#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [None]:
# Perform quality checks here
# 1. Check that all tables exist and are not empty.
# 2. Check that the primary key of each table is unique and contains no nulls.

In [2]:
# 1. Check that all tables exist and are not empty

def check_table(table_directory = './tables'):
    
    """
    Perform a data quality check that verifies all required tables are present in the directory
    and that each of them contains data.
    
    Args:
        table_directory (str): the directory that contains all the tables in parquet format
    
    Output:
        Prints messages describing the quality check result.
    """
    
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.\
    config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
    .enableHiveSupport().getOrCreate()
    
    # placeholder for table name and its row counts
    dict_cnt = {'temperature':0, 'immigrant_nationality':0,'demographic':0,'immigration_record':0, 'state_table':0,'airport':0,'mode_table':0,'visa_table':0}

    for table, cnt in dict_cnt.items():
        try:
            df_test = spark.read.parquet(table_directory+'/'+table)

            cnt_test = df_test.count()

            # store the row count result
            dict_cnt[table] = cnt_test

        except Exception:
            print("Data Quality Check Failed: The table", table, "cannot be read with parquet format by Spark, please check if the table is missing.")

    if all(x != 0 for x in dict_cnt.values()):
        print("Data Quality Check Passed: All table contain value. Row counts for all tables in {table: count} format =>", str(dict_cnt))
    else:
        print("Data Quality Check Failed: Not all table contain data. Row counts for all tables in {table: count} format =>", str(dict_cnt))


In [3]:
# run the 1st data quality check
check_table('./tables')

Data Quality Check Passed: All table contain value. Row counts for all tables in {table: count} format => {'temperature': 2231, 'immigrant_nationality': 289, 'demographic': 2891, 'immigration_record': 3096313, 'state_table': 55, 'airport': 2014, 'mode_table': 4, 'visa_table': 3}


In [57]:
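# 2. Check that the primary key of each table is unique and contains no nulls.

def check_primary_key(table_directory = './tables'):
    """
    Perform a data quality check that verifies the primary key fields of each table
    contain no duplicate values and no null values.
    
    Args:
        table_directory (str): the directory that contains all the tables in parquet format
    
    Output:
        Prints messages describing the quality check result.
    """
    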
    dict_table_primary = {'temperature':['City','year','month'], 'immigrant_nationality':['code'],'demographic':['City', 'state_code'],'immigration_record':['id'], 'state_table':['State_abbr'],'airport':['iata_code'],'mode_table':['mode_code'],'visa_table':['visa_code']}
    
    # placeholder for any data quality issue
    all_cnt = 0
    
    for table, primary in dict_table_primary.items():
        try:
            df_test = spark.read.parquet(table_directory+'/'+table)
        except Exception:
            print("Data Quality Check Failed: The table", table, "cannot be read with parquet format by Spark, please check if the table is missing.")
            all_cnt += 1
            continue
        
        # check duplicate
        duplicate_cnt = df_test.groupBy(primary) \
                        .count() \
                        .where(f.col('count') >1) \
                        .select(f.sum('count')) \
                        .collect()[0]['sum(count)']
        
        if duplicate_cnt != 0 and duplicate_cnt is not None:
            print("Data Quality Check Failed: The table", table, "has", duplicate_cnt, "duplicate records in its primary key fields", primary)
            all_cnt += 1
            
        # check null
        null_cnt = 0
        
        for field in primary:
            null_cnt = null_cnt + df_test.filter(df_test[field].isNull()).count()
    
        if null_cnt != 0:
            print("Data Quality Check Failed: The table", table, "has", null_cnt, "null records in its primary key fields", primary)
            all_cnt += 1
    
    if all_cnt == 0:
        print("Data Quality Check Passed!")


In [58]:
# run the 2nd data quality check
check_primary_key('./tables')

Data Quality Check Passed!
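
The two checks above cover existence and primary keys. The integrity constraints mentioned earlier could also include a foreign key check, which is not part of this notebook. The sketch below shows the idea in plain Python with a hypothetical helper, `find_orphan_keys`, and made-up sample values. It assumes null foreign keys are acceptable (for example, a port with no matching airport) and skips them.

```python
def find_orphan_keys(fact_keys, dimension_keys):
    """Return the sorted foreign key values that have no match in the dimension table."""
    known = set(dimension_keys)
    # Nulls are skipped: by assumption, a missing foreign key is allowed.
    return sorted({key for key in fact_keys if key is not None and key not in known})

# Made-up sample values for illustration only.
fact_visa_categories = [1, 2, 2, 3, 9, None]
visa_codes = [1, 2, 3]

orphans = find_orphan_keys(fact_visa_categories, visa_codes)
if orphans:
    print("Data Quality Check Failed: unmatched visa_category values", orphans)
else:
    print("Data Quality Check Passed!")
```

On the real tables, a comparable check could likely be written in Spark as a `left_anti` join from the fact table to each dimension table, followed by a count.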


#### 4.3 Data dictionary 
Below is the data dictionary for this data model.

1. immigration_record:
          id (PK): identification number
          year (FK): year of i94
          month (FK): month of i94
          citizenship: citizenship of the visitor
          residence (FK): visitor's residence
          airport_code (FK): airport code
          arrival_date: arrival date
          travel_mode (FK): code for mode of travel
          state_abbr (FK): state abbreviation
          city (FK): city of the airport
          departure_date: departure date
          age: age of the visitor
          visa_category (FK): visa category
          occupation: occupation of the visitor
          birth_year: visitor's birth year
          gender: visitor's gender
          airline: airline the visitor chose
          visatype: visitor's visa type
          
 
2. temperature:
        City (Composite PK): city
        year (Composite PK): year
        month (Composite PK): month
        Avg_Temperature: average temperature
        Avg_Temperature_Uncertainty: average temperature uncertainty range

3. demographic:
        state_code (Composite PK): state abbreviation
        City (Composite PK): city
        median_age: median age of the city
        male_population: male population count
        female_population: female population count
        total_population: total population count
        number_of_veterans: number of veterans
        Foreign-born: number of foreign born
        average_household_size: average household size
        American_Indian_and_Alaska_Native: count of population of this race
        Asian: count of population of this race
        Black_or_African-American: count of population of this race
        Hispanic_or_Latino: count of population of this race
        White: count of population of this race

4. airport:
        iata_code (PK): iata code of this airport
        type: type of the airport
        name: name of the airport
        elevation_ft: elevation of the airport
        continent: continent of the airport
        country: country of the airport
        state: state of the airport
        city: city of the airport
        local_code: local code of the airport
        gps_code: gps code of the airport
        latitude: latitude of the airport
        longitude: longitude of the airport

5. immigrant_nationality:
        code (PK): citizenship and residence code on the immigration_record table
        country: corresponding country name 

6. visa_table:
        visa_code (PK): visa_category on the immigration_record table 
        visa_type: corresponding visa type

7. mode_table:
        mode_code (PK): travel_mode on the immigration_record table
        mode_desc: corresponding mode of travel description

8. state_table:
        state_abbr (PK): state abbreviation
        state_name: state name
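
A note on the date fields: `arrival_date` and `departure_date` are carried over unchanged from the SAS file's `arrdate` and `depdate` columns. Assuming they are stored as SAS date values (the number of days since 1960-01-01), which the notebook does not verify, a minimal sketch of converting them and deriving the stay length looks like this. The helper names `sas_to_date` and `stay_length_days` are hypothetical.

```python
from datetime import date, timedelta

# SAS date values count days from this epoch (assumption about the source columns).
SAS_EPOCH = date(1960, 1, 1)

def sas_to_date(sas_days):
    """Convert a SAS numeric date to a calendar date; pass nulls through."""
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))

def stay_length_days(arrival_date, departure_date):
    """Return the stay length in days, or None if either date is missing."""
    if arrival_date is None or departure_date is None:
        return None
    return int(departure_date) - int(arrival_date)

print(sas_to_date(20545.0))
print(stay_length_days(20545.0, 20552.0))
```

Records with no departure date yield `None` rather than a misleading stay length.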


### Step 5: Complete Project Write Up


In this project, we use Spark to perform ETL and create the tables for our data model because Spark processes large amounts of data efficiently with in-memory computing and scales easily to even larger datasets. Spark can also load data from various formats and can be integrated with cloud services such as AWS.

The update schedule should be based first on the reporting need and second on the availability of new data. In our case, a monthly update makes sense: the company may want to adjust the following month's marketing strategy and messaging based on at least one month of new data, assuming new data is released every month.

For future development, we can tackle this project with different approaches under the following scenarios:
 * **If the data was increased by 100x:** We can host the ETL in the cloud on Amazon Web Services using Amazon EMR, so that we can increase the number of worker nodes to match the data load. This approach lets us scale up or scale out easily and thereby maintain the desired efficiency.
 * **If the data populates a dashboard that must be updated on a daily basis by 7am every day:** We can use Apache Airflow to schedule runs that follow the desired workflow, monitor the workflow, and notify us when issues arise, so that we can fix them in time to meet the requirement.
 * **If the database needed to be accessed by 100+ people:** We can move the data model to Amazon Redshift as our cloud data warehouse. Its massively parallel processing architecture allows multiple users to run fast queries on large volumes of data, and nodes can be added to maintain fast query performance as the data warehouse grows.