# Project Title
### Data Engineering Capstone Project

#### Project Summary
In this project, I look at US immigration data, and more specifically at:

- the seasonality of travel
- the connection between the volume of travel and the number of entry ports (i.e. airports)
- the connection between the volume of travel and the demographics of various cities

To accomplish this, the following datasets will be used:

1. I94 Immigration Data: This data comes from the US National Tourism and Trade Office and includes the contents of the I-94 form filled in on entry to the United States. A data dictionary is included in the workspace.

2. countries.csv: table of the country codes used in the immigration data, extracted from the data dictionary
3. i94portCodes.csv: table of the port-of-entry (city) codes used in the immigration data, extracted from the data dictionary


4. World Temperature Data: This dataset comes from Kaggle and includes the temperatures of various cities in the world from 1743 to 2013.

5. U.S. City Demographic Data: This data comes from OpenSoft. It contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000, and comes from the US Census Bureau's 2015 American Community Survey.

6. Airport Code Table: This is a simple table of airport codes and corresponding cities.

The data is:

1. aggregated by time (year, month, day, etc.);
2. aggregated by city and airport;
3. examined for the impact of temperature on the inflow and outflow of travellers;
4. examined for its relationship to regional demographics.


The project follows these steps:

- Step 1: Scope the Project and Gather Data
- Step 2: Explore and Assess the Data
- Step 3: Define the Data Model
- Step 4: Run ETL to Model the Data
- Step 5: Complete Project Write Up

In [1]:

# Do all imports and installs here
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, date_add
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import desc
from pyspark.sql.functions import asc
from pyspark.sql.functions import sum as Fsum

import datetime

import numpy as np
import pandas as pd

### Step 1: Scope the Project and Gather Data

#### Scope 
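The project builds a data model for analysing travel into the United States: the I-94 immigration records form a fact table, surrounded by time, airport, city demographic and temperature dimension tables. The data is explored and cleaned with pandas and Spark (PySpark and Spark SQL), and the resulting tables are written to Parquet files.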

#### Describe and Gather Data 
The datasets are described in the Project Summary above. Each one is loaded below for a first look at its size, columns and missing values.

In [2]:
spark = SparkSession \
.builder \
.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
.enableHiveSupport().getOrCreate()

## I94 Immigration Data

In [3]:
imm_data = spark.read.parquet("sas_data")
print(imm_data.count())
imm_data.limit(10).toPandas()

3096313


|  | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 5748517.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | CA | 20582.0 | ... |  | M | 1976.0 | 10292016 | F |  | QF | 94953870000.0 | 11 | B1 |
| 1 | 5748518.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | NV | 20591.0 | ... |  | M | 1984.0 | 10292016 | F |  | VA | 94955620000.0 | 7 | B1 |
| 2 | 5748519.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | WA | 20582.0 | ... |  | M | 1987.0 | 10292016 | M |  | DL | 94956410000.0 | 40 | B1 |
| 3 | 5748520.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | WA | 20588.0 | ... |  | M | 1987.0 | 10292016 | F |  | DL | 94956450000.0 | 40 | B1 |
| 4 | 5748521.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | WA | 20588.0 | ... |  | M | 1988.0 | 10292016 | M |  | DL | 94956390000.0 | 40 | B1 |
| 5 | 5748522.0 | 2016.0 | 4.0 | 245.0 | 464.0 | HHW | 20574.0 | 1.0 | HI | 20579.0 | ... |  | M | 1959.0 | 10292016 | M |  | NZ | 94981800000.0 | 10 | B2 |
| 6 | 5748523.0 | 2016.0 | 4.0 | 245.0 | 464.0 | HHW | 20574.0 | 1.0 | HI | 20586.0 | ... |  | M | 1950.0 | 10292016 | F |  | NZ | 94979690000.0 | 10 | B2 |
| 7 | 5748524.0 | 2016.0 | 4.0 | 245.0 | 464.0 | HHW | 20574.0 | 1.0 | HI | 20586.0 | ... |  | M | 1975.0 | 10292016 | F |  | NZ | 94979750000.0 | 10 | B2 |
| 8 | 5748525.0 | 2016.0 | 4.0 | 245.0 | 464.0 | HOU | 20574.0 | 1.0 | FL | 20581.0 | ... |  | M | 1989.0 | 10292016 | M |  | NZ | 94973250000.0 | 28 | B2 |
| 9 | 5748526.0 | 2016.0 | 4.0 | 245.0 | 464.0 | LOS | 20574.0 | 1.0 | CA | 20581.0 | ... |  | M | 1990.0 | 10292016 | F |  | NZ | 95013550000.0 | 2 | B2 |


## U.S. City Demographic Data

In [4]:
city_dem_data = pd.read_csv('us-cities-demographics.csv', sep=';')
print(city_dem_data.info())
city_dem_data.head()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2891 entries, 0 to 2890
Data columns (total 12 columns):
City                      2891 non-null object
State                     2891 non-null object
Median Age                2891 non-null float64
Male Population           2888 non-null float64
Female Population         2888 non-null float64
Total Population          2891 non-null int64
Number of Veterans        2878 non-null float64
Foreign-born              2878 non-null float64
Average Household Size    2875 non-null float64
State Code                2891 non-null object
Race                      2891 non-null object
Count                     2891 non-null int64
dtypes: float64(6), int64(2), object(4)
memory usage: 271.1+ KB
None


|  | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127.0 | 87105.0 | 175232 | 5821.0 | 33878.0 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040.0 | 143873.0 | 281913 | 5829.0 | 86253.0 | 2.73 | NJ | White | 76402 |


## Airport Code Data

In [5]:
airport_code_data = pd.read_csv('airport-codes_csv.csv')
print(airport_code_data.info())
airport_code_data.head()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 55075 entries, 0 to 55074
Data columns (total 12 columns):
ident           55075 non-null object
type            55075 non-null object
name            55075 non-null object
elevation_ft    48069 non-null float64
continent       27356 non-null object
iso_country     54828 non-null object
iso_region      55075 non-null object
municipality    49399 non-null object
gps_code        41030 non-null object
iata_code       9189 non-null object
local_code      28686 non-null object
coordinates     55075 non-null object
dtypes: float64(1), object(11)
memory usage: 5.0+ MB
None


|  | ident | type | name | elevation_ft | continent | iso_country | iso_region | municipality | gps_code | iata_code | local_code | coordinates |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 00A | heliport | Total Rf Heliport | 11.0 |  | US | US-PA | Bensalem | 00A |  | 00A | -74.93360137939453, 40.07080078125 |
| 1 | 00AA | small_airport | Aero B Ranch Airport | 3435.0 |  | US | US-KS | Leoti | 00AA |  | 00AA | -101.473911, 38.704022 |
| 2 | 00AK | small_airport | Lowell Field | 450.0 |  | US | US-AK | Anchor Point | 00AK |  | 00AK | -151.695999146, 59.94919968 |
| 3 | 00AL | small_airport | Epps Airpark | 820.0 |  | US | US-AL | Harvest | 00AL |  | 00AL | -86.77030181884766, 34.86479949951172 |
| 4 | 00AR | closed | Newport Hospital & Clinic Heliport | 237.0 |  | US | US-AR | Newport |  |  |  | -91.254898, 35.6087 |


## World Temperature Data

In [6]:
temp_data = pd.read_csv('GlobalLandTemperaturesByCity.csv')
print(temp_data.info())
temp_data.head()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8599212 entries, 0 to 8599211
Data columns (total 7 columns):
dt                               object
AverageTemperature               float64
AverageTemperatureUncertainty    float64
City                             object
Country                          object
Latitude                         object
Longitude                        object
dtypes: float64(2), object(5)
memory usage: 459.2+ MB
None


|  | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 1 | 1743-12-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 2 | 1744-01-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 3 | 1744-02-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 4 | 1744-03-01 |  |  | Århus | Denmark | 57.05N | 10.33E |


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

### Airport data

In [7]:
airport_code_data.shape

(55075, 12)

In [8]:
airport_code_data.head()

|  | ident | type | name | elevation_ft | continent | iso_country | iso_region | municipality | gps_code | iata_code | local_code | coordinates |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 00A | heliport | Total Rf Heliport | 11.0 |  | US | US-PA | Bensalem | 00A |  | 00A | -74.93360137939453, 40.07080078125 |
| 1 | 00AA | small_airport | Aero B Ranch Airport | 3435.0 |  | US | US-KS | Leoti | 00AA |  | 00AA | -101.473911, 38.704022 |
| 2 | 00AK | small_airport | Lowell Field | 450.0 |  | US | US-AK | Anchor Point | 00AK |  | 00AK | -151.695999146, 59.94919968 |
| 3 | 00AL | small_airport | Epps Airpark | 820.0 |  | US | US-AL | Harvest | 00AL |  | 00AL | -86.77030181884766, 34.86479949951172 |
| 4 | 00AR | closed | Newport Hospital & Clinic Heliport | 237.0 |  | US | US-AR | Newport |  |  |  | -91.254898, 35.6087 |


In [9]:
# to check the countries where the airports are located
airport_code_data.groupby('iso_country')['iso_country'].count()

iso_country
AD        2
AE       57
AF       64
AG        3
AI        1
AL       13
AM       13
AO      104
AQ       27
AR      848
AS        4
AT      145
AU     1963
AW        1
AZ       35
BA       15
BB        6
BD       16
BE      146
BF       51
BG      134
BH        4
BI        7
BJ       10
BL        1
BM        3
BN        2
BO      197
BQ        3
BR     4334
      ...  
TM       21
TN       15
TO        6
TR      124
TT        3
TV        3
TW       65
TZ      207
UA      191
UG       38
UM        6
US    22757
UY       54
UZ      176
VA        1
VC        6
VE      592
VG        3
VI        9
VN       50
VU       32
WF        2
WS        4
XK        6
YE       25
YT        1
ZA      489
ZM      103
ZW      138
ZZ        7
Name: iso_country, Length: 243, dtype: int64

In [10]:
# keep only airports in the US, since the immigration dataset only records entries into the US
airport_code_data = airport_code_data[airport_code_data['iso_country'] == 'US'].copy()

In [11]:
#looking at the types column
airport_code_data.groupby('type')['type'].count()

type
balloonport          18
closed             1326
heliport           6265
large_airport       170
medium_airport      692
seaplane_base       566
small_airport     13720
Name: type, dtype: int64

Since we are looking at immigration data, closed airports (assumed to be out of service) are excluded, as are heliports, seaplane bases and balloonports, which are assumed to serve mainly recreational purposes.

In [12]:
airport_code_data = airport_code_data[~airport_code_data['type'].str.strip().isin(['closed','seaplane_base', 'heliport', 'balloonport'])].copy()

In [13]:
# checking for other null values
airport_code_data.isnull().sum()

ident               0
type                0
name                0
elevation_ft       63
continent       14582
iso_country         0
iso_region          0
municipality       50
gps_code          399
iata_code       12717
local_code        199
coordinates         0
dtype: int64

In [14]:
# checking if we can use municipality to merge our dataset
airport_code_data[airport_code_data.municipality.isna()].head()

|  | ident | type | name | elevation_ft | continent | iso_country | iso_region | municipality | gps_code | iata_code | local_code | coordinates |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 7653 | 6XA4 | small_airport | Zadow Airstrip |  |  | US | US-TX |  | 6XA4 |  |  | -95.954353809, 29.991738550900003 |
| 7887 | 74xa | small_airport | Gun Barrel City Airpark | 385.0 |  | US | US-TX |  | 74XA |  |  | -96.1456650496, 32.3551499558 |
| 8082 | 79ID | small_airport | Kooskia (Clear Creek Int) Airport | 1800.0 |  | US | US-ID |  | 79ID |  |  | -115.869691372, 46.0488642914 |
| 8114 | 79WT | small_airport | Ellensburg (Rotor Ranch) Airport | 1962.0 |  | US | US-WA |  | 79WT |  |  | -120.589778423, 47.091426059499994 |
| 9055 | 8FA4 | small_airport | Samsula / Coe Field | 40.0 |  | US | US-FL |  | 8FA4 |  |  | -81.1328315735, 29.0102045831 |


In [15]:
# removing rows with missing information (mostly a null municipality) from the dataset
airport_code_data = airport_code_data[~airport_code_data['municipality'].isna()].copy()

In [16]:
# municipality must be upper case to join with the other datasets
airport_code_data.municipality = airport_code_data.municipality.str.upper()

In [17]:
# taking an overall look at iso_region
airport_code_data.groupby('iso_region')['iso_region'].count()

iso_region
US-AK      586
US-AL      197
US-AR      291
US-AZ      214
US-CA      551
US-CO      288
US-CT       56
US-DC        2
US-DE       36
US-FL      522
US-GA      365
US-HI       35
US-IA      232
US-ID      238
US-IL      579
US-IN      486
US-KS      372
US-KY      164
US-LA      281
US-MA       79
US-MD      157
US-ME      122
US-MI      379
US-MN      361
US-MO      411
US-MS      211
US-MT      255
US-NC      349
US-ND      297
US-NE      259
US-NH       54
US-NJ      116
US-NM      149
US-NV      113
US-NY      402
US-OH      492
US-OK      372
US-OR      357
US-PA      486
US-RI       10
US-SC      173
US-SD      162
US-TN      228
US-TX     1546
US-U-A       3
US-UT      103
US-VA      311
US-VT       66
US-WA      379
US-WI      457
US-WV       83
US-WY       95
Name: iso_region, dtype: int64

In [18]:
# removing US-U-A, which looks like an invalid region code (strip() so stray whitespace does not defeat the filter)
airport_code_data = airport_code_data[airport_code_data['iso_region'].str.strip() != 'US-U-A'].copy()

In [19]:
# extracting the state code
airport_code_data['state'] = airport_code_data['iso_region'].str.strip().str.split("-", n = 1, expand = True)[1]
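The airport cleaning cells above can be condensed into a single pass over the rows. The sketch below is a plain-Python illustration of those filters rather than the notebook's pandas code; it assumes each airport is a dict keyed by the column names shown earlier, with missing values as None, and clean_airports is a hypothetical helper name. The sample rows are taken from the head() and municipality outputs above.

```python
# Hypothetical sketch of the airport cleaning steps, applied to plain dicts.
EXCLUDED_TYPES = {"closed", "seaplane_base", "heliport", "balloonport"}

def clean_airports(rows):
    """Return cleaned copies of US airport rows that could act as ports of entry."""
    cleaned = []
    for row in rows:
        if row.get("iso_country") != "US":
            continue
        if (row.get("type") or "").strip() in EXCLUDED_TYPES:
            continue
        municipality = row.get("municipality")
        if not municipality:  # missing municipality: cannot be matched to a city
            continue
        region = (row.get("iso_region") or "").strip()
        if region == "US-U-A":  # invalid region code
            continue
        out = dict(row)
        out["municipality"] = municipality.upper()
        # "US-TX" -> "TX": keep the part after the first hyphen
        out["state"] = region.split("-", 1)[1] if "-" in region else None
        cleaned.append(out)
    return cleaned

sample = [
    {"ident": "00A", "type": "heliport", "iso_country": "US", "iso_region": "US-PA", "municipality": "Bensalem"},
    {"ident": "00AA", "type": "small_airport", "iso_country": "US", "iso_region": "US-KS", "municipality": "Leoti"},
    {"ident": "6XA4", "type": "small_airport", "iso_country": "US", "iso_region": "US-TX", "municipality": None},
]
print([(a["ident"], a["municipality"], a["state"]) for a in clean_airports(sample)])
# [('00AA', 'LEOTI', 'KS')]
```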

### Temperature data

In [20]:
temp_data.shape

(8599212, 7)

In [21]:
temp_data.head()

|  | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 1 | 1743-12-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 2 | 1744-01-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 3 | 1744-02-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 4 | 1744-03-01 |  |  | Århus | Denmark | 57.05N | 10.33E |


In [22]:
temp_data['Country'].nunique()

159

In [23]:
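# Reducing the size of the dataset for exploration by keeping a single country (Canada); the Step 4 pipeline keeps the United States instead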
temp_data = temp_data[temp_data['Country']=='Canada']

In [24]:
# converting date into datetime
temp_data['convertedDate'] = pd.to_datetime(temp_data.dt)

In [25]:
# keeping only dates after 1950-01-01; earlier records are assumed to be irrelevant to modern commercial air travel
temp_data=temp_data[temp_data['convertedDate']>"1950-01-01"].copy()

In [26]:
#checking for null values
temp_data.isnull().sum()

dt                               0
AverageTemperature               0
AverageTemperatureUncertainty    0
City                             0
Country                          0
Latitude                         0
Longitude                        0
convertedDate                    0
dtype: int64

In [27]:
temp_data.shape

(19100, 8)

Checking whether City and date together can serve as the primary key:

In [28]:
temp_data[['City', 'convertedDate']].drop_duplicates().shape

(19100, 2)

### Demographic data

In [29]:
city_dem_data.shape

(2891, 12)

In [30]:
# looking at missing values
city_dem_data.isnull().sum()

City                       0
State                      0
Median Age                 0
Male Population            3
Female Population          3
Total Population           0
Number of Veterans        13
Foreign-born              13
Average Household Size    16
State Code                 0
Race                       0
Count                      0
dtype: int64

Only a handful of values are missing (at most 16 of 2,891 rows in any column), so no rows are dropped.

In [31]:
# could we use city name and race as primary key?
city_dem_data[city_dem_data[['City','Race']].duplicated()].head()


|  | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 177 | Wilmington | Delaware | 36.4 | 32680.0 | 39277.0 | 71957 | 3063.0 | 3336.0 | 2.45 | DE | Asian | 1193 |
| 210 | Lakewood | California | 39.9 | 41523.0 | 40069.0 | 81592 | 4094.0 | 18274.0 | 3.13 | CA | Hispanic or Latino | 24987 |
| 238 | Glendale | California | 42.1 | 98181.0 | 102844.0 | 201025 | 4448.0 | 111510.0 | 2.69 | CA | White | 146718 |
| 300 | Springfield | Massachusetts | 31.8 | 74744.0 | 79592.0 | 154336 | 5723.0 | 16226.0 | 2.81 | MA | Asian | 5606 |
| 549 | Bloomington | Indiana | 23.5 | 40588.0 | 43227.0 | 83815 | 2368.0 | 10033.0 | 2.33 | IN | Asian | 9801 |


City and Race together are not unique: city names such as Glendale and Springfield occur in more than one state. Adding State to the key should make it unique:
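The uniqueness checks in this section (City plus date, City plus Race, City plus Race plus State) all test whether a set of columns is a candidate key. A minimal plain-Python sketch of that test, similar in spirit to pandas' duplicated on a column subset, follows; is_candidate_key is a hypothetical helper, and the Arizona row is made up to illustrate two cities sharing a name.

```python
def is_candidate_key(rows, columns):
    """Return True if no two rows share the same values in the given columns."""
    seen = set()
    for row in rows:
        key = tuple(row[c] for c in columns)
        if key in seen:
            return False  # a repeated key, like a True in DataFrame.duplicated()
        seen.add(key)
    return True

rows = [
    {"City": "Glendale", "State": "California", "Race": "White"},
    {"City": "Glendale", "State": "Arizona", "Race": "White"},  # hypothetical row
]
print(is_candidate_key(rows, ["City", "Race"]))           # False
print(is_candidate_key(rows, ["City", "Race", "State"]))  # True
```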

In [32]:
#make sure there are no duplicate rows
city_dem_data[city_dem_data[['City','Race', 'State']].duplicated()].head()

|  | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|


### Immigration data

In [33]:
imm_data.show(5)

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     NV|20591.0|  32.0|    1.0|  

In [34]:
imm_data.count()

3096313

In [35]:
imm_data.createOrReplaceTempView("immigration_table")


In [36]:
# can cicid be used as a primary key?
spark.sql("""
SELECT COUNT (DISTINCT cicid)
FROM immigration_table
""").show()

+---------------------+
|count(DISTINCT cicid)|
+---------------------+
|              3096313|
+---------------------+



cicid is unique across all 3,096,313 rows, so it can serve as the primary key.

SAS stores dates as the number of days since 1960-01-01, so the arrival and departure dates are computed by adding arrdate and depdate days to 1960-01-01.
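As a quick check of the conversion, here is the same arithmetic in plain Python (a sketch; sas_to_date is a hypothetical helper). Day 20574, the arrdate of the first record shown above, lands on 2016-04-30, which matches that record's dtadfile value of 20160430.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS day 0

def sas_to_date(days):
    """Convert a SAS day count (a float such as 20574.0, or None) to a date."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))

print(sas_to_date(20574.0))  # 2016-04-30
```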

In [37]:
imm_data = spark.sql("SELECT *, date_add(to_date('1960-01-01'), arrdate) AS arrival_date FROM immigration_table")
imm_data.createOrReplaceTempView("immigration_table")

In [38]:
spark.sql("""SELECT *, CASE 
                        WHEN depdate >= 1.0 THEN date_add(to_date('1960-01-01'), depdate)
                        WHEN depdate IS NULL THEN NULL
                        ELSE 'N/A' END AS departure_date 
                        
                FROM immigration_table""").createOrReplaceTempView("immigration_table")

In [39]:
# check that no departure date fell into the 'N/A' branch (depdate below 1)
spark.sql("SELECT count(*) FROM immigration_table WHERE departure_date = 'N/A'").show()

+--------+
|count(1)|
+--------+
|       0|
+--------+



Mapping the i94visa codes to their three categories: 1 = Business, 2 = Pleasure and 3 = Student.
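The CASE expression below is a lookup with a fallback. A plain-Python sketch of the same mapping, using a hypothetical VISA_CATEGORIES dict: because Python treats 1 and 1.0 as the same dict key, the float codes stored in the data can be looked up directly, and anything else (including None) falls back to 'N/A'.

```python
VISA_CATEGORIES = {1: "Business", 2: "Pleasure", 3: "Student"}

def visa_category(code):
    """Map an i94visa code to its category, with 'N/A' as the fallback."""
    # 2.0 == 2 and hash(2.0) == hash(2), so float codes find the int keys
    return VISA_CATEGORIES.get(code, "N/A")

print([visa_category(c) for c in (1.0, 2.0, 3.0, None)])
# ['Business', 'Pleasure', 'Student', 'N/A']
```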

In [40]:
spark.sql("""SELECT *, CASE 
                        WHEN i94visa = 1.0 THEN 'Business' 
                        WHEN i94visa = 2.0 THEN 'Pleasure'
                        WHEN i94visa = 3.0 THEN 'Student'
                        ELSE 'N/A' END AS visa_type 
                        
                FROM immigration_table""").createOrReplaceTempView("immigration_table")

In [41]:
# count the distinct departure dates that also occur as arrival dates
spark.sql("""   SELECT COUNT(DISTINCT departure_date) 
                FROM immigration_table
                WHERE departure_date IN (
                    SELECT DISTINCT arrival_date FROM immigration_table
                ) 
                """).show()

+------------------------------+
|count(DISTINCT departure_date)|
+------------------------------+
|                            30|
+------------------------------+



Looking at the different arrival modes (per the I94 data dictionary: 1 = Air, 2 = Sea, 3 = Land, 9 = Not reported):

In [42]:
spark.sql("""
SELECT i94mode, count(*)
FROM immigration_table
GROUP BY i94mode
""").show()

+-------+--------+
|i94mode|count(1)|
+-------+--------+
|   null|     239|
|    1.0| 2994505|
|    3.0|   66660|
|    2.0|   26349|
|    9.0|    8560|
+-------+--------+



In [43]:
#Looking at birth year min and max
spark.sql("SELECT MAX(biryear), MIN(biryear) FROM immigration_table WHERE biryear IS NOT NULL").show()

+------------+------------+
|max(biryear)|min(biryear)|
+------------+------------+
|      2019.0|      1902.0|
+------------+------------+



In [44]:
# let's look at travellers born in 1951 or earlier (65 or older in 2016)
spark.sql("""
SELECT COUNT(*)
FROM immigration_table
WHERE biryear IS NOT NULL
AND biryear <= 1951
""").show()

#freq per birth year
spark.sql("""
SELECT biryear, COUNT(*)
FROM immigration_table 
WHERE biryear IS NOT NULL
AND biryear <= 1951
GROUP BY biryear
ORDER BY biryear ASC
""").head(60)

+--------+
|count(1)|
+--------+
|  326562|
+--------+



[Row(biryear=1902.0, count(1)=1),
 Row(biryear=1905.0, count(1)=1),
 Row(biryear=1906.0, count(1)=1),
 Row(biryear=1907.0, count(1)=2),
 Row(biryear=1908.0, count(1)=2),
 Row(biryear=1909.0, count(1)=1),
 Row(biryear=1911.0, count(1)=2),
 Row(biryear=1913.0, count(1)=1),
 Row(biryear=1914.0, count(1)=4),
 Row(biryear=1915.0, count(1)=2),
 Row(biryear=1916.0, count(1)=24),
 Row(biryear=1917.0, count(1)=19),
 Row(biryear=1918.0, count(1)=26),
 Row(biryear=1919.0, count(1)=52),
 Row(biryear=1920.0, count(1)=46),
 Row(biryear=1921.0, count(1)=88),
 Row(biryear=1922.0, count(1)=104),
 Row(biryear=1923.0, count(1)=185),
 Row(biryear=1924.0, count(1)=241),
 Row(biryear=1925.0, count(1)=319),
 Row(biryear=1926.0, count(1)=463),
 Row(biryear=1927.0, count(1)=638),
 Row(biryear=1928.0, count(1)=884),
 Row(biryear=1929.0, count(1)=1204),
 Row(biryear=1930.0, count(1)=1594),
 Row(biryear=1931.0, count(1)=1999),
 Row(biryear=1932.0, count(1)=2500),
 Row(biryear=1933.0, count(1)=2965),
 Row(biryear=

In [45]:
# looking at gender
spark.sql("""
SELECT gender, count(*) 
FROM immigration_table
GROUP BY gender
""").show()

+------+--------+
|gender|count(1)|
+------+--------+
|     F| 1302743|
|  null|  414269|
|     M| 1377224|
|     U|     467|
|     X|    1610|
+------+--------+



In [46]:
# removing missing or incorrect gender
spark.sql("""SELECT * FROM immigration_table WHERE gender IN ('F', 'M')""").createOrReplaceTempView("immigration_table")

In [47]:
# looking at citizenship and residence data for missing data

#citizenship countries
spark.sql("""
SELECT count(*) 
FROM immigration_table
WHERE i94cit IS NULL
""").show()

#residence countries
spark.sql("""
SELECT count(*) 
FROM immigration_table
WHERE i94res IS NULL
""").show()

#reported address
spark.sql("""
SELECT count(*) 
FROM immigration_table
WHERE i94addr IS NULL
""").show()

+--------+
|count(1)|
+--------+
|       0|
+--------+

+--------+
|count(1)|
+--------+
|       0|
+--------+

+--------+
|count(1)|
+--------+
|  129668|
+--------+



The reported address (i94addr) is missing for 129,668 rows, so location is taken from the port of entry (i94port) instead.

In [48]:
# looking at occupation

spark.sql("""
SELECT occup, COUNT(*) AS n
FROM immigration_table
GROUP BY occup
ORDER BY n DESC, occup
""").show()

+-----+-------+
|occup|      n|
+-----+-------+
| null|2671843|
|  STU|   4719|
|  OTH|    661|
|  NRR|    345|
|  MKT|    280|
|  EXA|    196|
|  GLS|    189|
|  ULS|    175|
|  ADM|    125|
|  TIE|    124|
|  MVC|    110|
|  ENO|     60|
|  CEO|     56|
|  TIP|     52|
|  RET|     50|
|  CMP|     47|
|  LLJ|     46|
|  PHS|     45|
|  UNP|     45|
|  HMK|     40|
+-----+-------+
only showing top 20 rows



Occupation (occup) is null for 2,671,843 rows, so the column is not usable and is dropped from the model.

In [49]:
#building our immigration table
df_immigration = spark.sql("""SELECT * FROM immigration_table""")

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Since I'm interested in the flow of travellers through the United States, the I-94 data serves as the fact table. The fact_immigration table contains:

* cicid,
* citizenship_country,
* residence_country,
* city,
* state,
* arrival_date,
* departure_date,
* age,
* visa_type,
* detailed_visa_type,
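To make the fact table's shape concrete, here is one row expressed as a Python dataclass. This is an illustrative sketch only: the field types are assumptions (cicid and age are read as doubles in the source data, and dates are modelled as datetime.date, although the Spark query that builds departure_date yields a string column because of its 'N/A' branch).

```python
from dataclasses import dataclass
from datetime import date
from typing import Optional

@dataclass(frozen=True)
class FactImmigration:
    """One fact_immigration row; field types are assumptions for illustration."""
    cicid: float                    # primary key (stored as a double in the source)
    citizenship_country: str
    residence_country: str
    city: str                       # port-of-entry city, upper case
    state: str                      # port-of-entry state, upper case
    arrival_date: date
    departure_date: Optional[date]  # None when no departure date was recorded
    age: Optional[float]            # 2016 minus birth year
    visa_type: str                  # Business, Pleasure, Student or N/A
    detailed_visa_type: str         # raw visatype code such as B1 or B2
```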

For the dimension tables: since our dataset only contains one month of data, we keep a record of the daily entries and give users four dimensions along which to aggregate the data:

dim_time: to aggregate the data using various time units. The fields available will be:

* date,
* year,
* month,
* day,
* week,
* weekday,
* dayofyear
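Each dim_time row can be derived from a single date. The plain-Python sketch below follows Spark SQL's conventions as an assumption worth checking: WEEKOFYEAR returns the ISO 8601 week number, and DAYOFWEEK counts 1 = Sunday through 7 = Saturday, which is why the ISO weekday (1 = Monday through 7 = Sunday) is shifted. The Step 4 query names the day-of-year column year_day; the sketch uses dayofyear as listed here, and time_dimension_row is a hypothetical name.

```python
from datetime import date

def time_dimension_row(d):
    """Derive the dim_time attributes of one date, following Spark SQL conventions."""
    _, iso_week, iso_weekday = d.isocalendar()
    return {
        "date": d,
        "year": d.year,
        "month": d.month,
        "day": d.day,
        "week": iso_week,                # WEEKOFYEAR: ISO 8601 week number
        "weekday": iso_weekday % 7 + 1,  # DAYOFWEEK: 1 = Sunday ... 7 = Saturday
        "dayofyear": d.timetuple().tm_yday,
    }
```

For 2016-04-30, a Saturday, this gives week 17, weekday 7 and dayofyear 121.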

dim_airports: used to determine the areas with the largest flow of travellers. Fields included will be:

* ident,
* type,
* name,
* elevation_ft,
* state,
* municipality,
* iata_code


dim_city_demographics: to look at the demographic data of the areas with the most travellers and, if the demographic data were updated on a regular basis, at the impact of the flow of travellers on it. The fields available will be:

* City,
* state,
* median_age,
* male_population,
* female_population,
* total_population,
* Foreign_born,
* Average_Household_Size,
* Race,
* Count,

dim_temperatures: to look at the temperature data of the cities where traveller entry and departure is being reported. The fields included will be:

* date,
* City,
* average temperature,
* average temperature uncertainty
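The temperature dimension reduces to a filter followed by an average per (date, city) pair, which is what the windowed AVG in Step 4 computes when a date and city name occur more than once. A plain-Python sketch with hypothetical names: it assumes rows are dicts with the CSV's column names and dt already parsed to a datetime.date, and it simply skips rows with a missing AverageTemperature (Spark's AVG instead ignores nulls column by column).

```python
from collections import defaultdict
from datetime import date

CUTOFF = date(1950, 1, 1)  # the notebook keeps dates strictly after this

def build_temperature_dim(rows):
    """Return {(date, CITY): (avg temperature, avg uncertainty)} for US rows."""
    totals = defaultdict(lambda: [0.0, 0.0, 0])
    for r in rows:
        if r["Country"] != "United States" or r["dt"] <= CUTOFF:
            continue
        if r["AverageTemperature"] is None:
            continue  # simplification: assumes uncertainty is present whenever temperature is
        acc = totals[(r["dt"], r["City"].strip().upper())]
        acc[0] += r["AverageTemperature"]
        acc[1] += r["AverageTemperatureUncertainty"]
        acc[2] += 1
    return {key: (t / n, u / n) for key, (t, u, n) in totals.items()}
```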


#### 3.2 Mapping Out Data Pipelines
The steps necessary to pipeline the data into the chosen data model:

1. Data Extraction: Load the CSV datasets and the SAS immigration data (stored as Parquet);
2. Data Transformation and Loading:

    * fact_immigration:
        * Drop rows where the mode of arrival is not air travel;
        * Drop rows with incorrect gender data;
        * Convert arrival and departure dates;
        * Replace country codes with the character string equivalents;
        * Replace port of entry with city and state;
        * Filter out any row where the port of entry is not in the US;
        * Compute age in a new column as 2016 (the year of the data) minus birth year;
        * Insert data into the fact table;
        * Write to parquet.
        
    * dim_temperature:
        * For the temperature table, drop all data for cities outside the United States;
        * For the temperature table, drop all data for dates before 1950, which are assumed to be irrelevant to present-day air travel;
        * Convert city to upper case;
        * Write to parquet.
        
    * dim_time:
        * Get all the arrival and departure dates from the immigration dataset;
        * Extract year, month, day, week, weekday and day of year from each date and insert all the values in the dim_time table;
        * Write to parquet.
        
    * dim_airports:
        * Remove all non-US airports;
        * Remove airport types that are not valid ports of entry, i.e. ['closed', 'heliport', 'seaplane_base', 'balloonport'];
        * Remove all rows where municipalities are missing;
        * Convert municipality to upper case;
        * Insert to table;
        * Write to parquet.
        
    * dim_city_demographics:
        * Convert city names to upper case;
        * Insert to table;
        * Write to parquet.
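Several fact_immigration steps (replacing country codes, replacing the port of entry with city and state, filtering out non-US ports) rely on inner joins against lookup tables: a row whose code has no match is dropped. A plain-Python sketch of that logic with dict lookups, under stated assumptions: build_fact_rows and its lookup dicts are hypothetical, the port table is assumed to be already restricted to US ports, and ages use 2016 as the reference year as in Step 4.

```python
REFERENCE_YEAR = 2016  # year of the I-94 data, as used in the age calculation

def build_fact_rows(records, country_codes, port_codes):
    """Filter and enrich raw I-94 records, mimicking the chain of inner joins."""
    facts = []
    for rec in records:
        # keep air arrivals (i94mode 1) with a defined gender only
        if rec.get("i94mode") != 1 or rec.get("gender") not in ("F", "M"):
            continue
        citizenship = country_codes.get(rec.get("i94cit"))
        residence = country_codes.get(rec.get("i94res"))
        port = port_codes.get(rec.get("i94port"))
        # an INNER JOIN drops rows whose code has no match in the lookup table
        if citizenship is None or residence is None or port is None:
            continue
        city, state = port
        biryear = rec.get("biryear")
        facts.append({
            "cicid": rec["cicid"],
            "citizenship_country": citizenship,
            "residence_country": residence,
            "city": city.strip().upper(),
            "state": state.strip().upper(),
            "age": None if biryear is None else REFERENCE_YEAR - biryear,
        })
    return facts
```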
        

        





### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

Start off by reading the CSV files with pandas, then convert them to Spark DataFrames. This is done because Spark's CSV reader reads every field as a string by default (unless the inferSchema option is set), whereas pandas infers the data types automatically.
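The difference is easy to see with Python's csv module, which, like Spark's reader without inferSchema, returns every field as text. The infer_value helper below is a hypothetical and much simplified stand-in for pandas' inference: it converts each value individually, whereas pandas infers one dtype per column and turns empty fields into NaN. The sample line is the first row of the demographics output shown earlier.

```python
import csv
import io

def infer_value(text):
    """Return text as int, else float, else unchanged (a per-value simplification)."""
    for cast in (int, float):
        try:
            return cast(text)
        except ValueError:
            pass
    return text

def read_csv_text(text, delimiter=";", infer=False):
    """Parse CSV text; every field stays a string unless infer=True."""
    reader = csv.reader(io.StringIO(text), delimiter=delimiter)
    header = next(reader)
    rows = [[infer_value(v) for v in row] if infer else row for row in reader]
    return header, rows

sample = "City;Median Age;Total Population\nSilver Spring;33.8;82463\n"
print(read_csv_text(sample)[1])              # [['Silver Spring', '33.8', '82463']]
print(read_csv_text(sample, infer=True)[1])  # [['Silver Spring', 33.8, 82463]]
```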

In [50]:
spark.createDataFrame(city_dem_data).printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- Male Population: double (nullable = true)
 |-- Female Population: double (nullable = true)
 |-- Total Population: long (nullable = true)
 |-- Number of Veterans: double (nullable = true)
 |-- Foreign-born: double (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: long (nullable = true)



### 1. Data Extraction

In [51]:
# load dictionary data
df_countryCodes = pd.read_csv('countries.csv')
df_i94portCodes = pd.read_csv('I94-airport.csv')

# load the various csv files into pandas dataframes
df_demographics = pd.read_csv('us-cities-demographics.csv', sep=';')
df_temperature = pd.read_csv('../../data2/GlobalLandTemperaturesByCity.csv')

# load the SAS data
df_immigration=spark.read.parquet("sas_data")

### 2. Data Transformation and Loading

In [52]:
# convert data dictionary to spark df to perform SQL queries
spark_df_countryCodes = spark.createDataFrame(df_countryCodes)
spark_df_countryCodes.createOrReplaceTempView("countryCodes")

In [53]:
# remove all entries with a null state, as they are either unreported or outside the US
df_i94portCodes = df_i94portCodes[~df_i94portCodes.state.isna()].copy()

In [54]:
# exclude values for airports outside of the US. 
nonUSstates = ['CANADA', 'Canada', 'NETHERLANDS', 'NETH ANTILLES', 'THAILAND', 'ETHIOPIA', 'PRC', 'BERMUDA', 'COLOMBIA', 'ARGENTINA', 'MEXICO', 
               'BRAZIL', 'URUGUAY', 'IRELAND', 'GABON', 'BAHAMAS', 'MX', 'CAYMAN ISLAND', 'SEOUL KOREA', 'JAPAN', 'ROMANIA', 'INDONESIA',
               'SOUTH AFRICA', 'ENGLAND', 'KENYA', 'TURK & CAIMAN', 'PANAMA', 'NEW GUINEA', 'ECUADOR', 'ITALY', 'EL SALVADOR']

In [55]:
df_i94portCodes = df_i94portCodes[~df_i94portCodes.state.isin(nonUSstates)].copy()

In [56]:
# airport code spark df
spark_df_i94portCodes = spark.createDataFrame(df_i94portCodes)
spark_df_i94portCodes.createOrReplaceTempView("i94portCodes")

In [57]:
# immigration spark df
df_immigration.createOrReplaceTempView("immigration_table")

In [58]:
# remove all entries into the united states that weren't via air travel
spark.sql("""
SELECT *
FROM immigration_table
WHERE i94mode = 1
""").createOrReplaceTempView("immigration_table")

In [59]:
# drop rows where the gender value is undefined
spark.sql("""SELECT * FROM immigration_table WHERE gender IN ('F', 'M')""").createOrReplaceTempView("immigration_table")

In [60]:
# convert the arrival dates into a usable value
spark.sql("SELECT *, date_add(to_date('1960-01-01'), arrdate) AS arrival_date FROM immigration_table").createOrReplaceTempView("immigration_table")

In [61]:
# convert the departure dates into a usable value
spark.sql("""SELECT *, CASE 
                        WHEN depdate >= 1.0 THEN date_add(to_date('1960-01-01'), depdate)
                        WHEN depdate IS NULL THEN NULL
                        ELSE 'N/A' END AS departure_date 
                        
                FROM immigration_table""").createOrReplaceTempView("immigration_table")

In [62]:
# use an inner join to drop invalid codes
#country of citizenship
spark.sql("""
SELECT im.*, cc.country AS citizenship_country
FROM immigration_table im
INNER JOIN countryCodes cc
ON im.i94cit = cc.code
""").createOrReplaceTempView("immigration_table")

In [63]:
# country of residence
spark.sql("""
SELECT im.*, cc.country AS residence_country
FROM immigration_table im
INNER JOIN countryCodes cc
ON im.i94res = cc.code
""").createOrReplaceTempView("immigration_table")

In [64]:
# map the i94visa codes to their visa category names
spark.sql("""SELECT *, CASE 
                        WHEN i94visa = 1.0 THEN 'Business' 
                        WHEN i94visa = 2.0 THEN 'Pleasure'
                        WHEN i94visa = 3.0 THEN 'Student'
                        ELSE 'N/A' END AS visa_type 
                        
                FROM immigration_table""").createOrReplaceTempView("immigration_table")

In [65]:
# Add entry_port names and entry port states to the view
spark.sql("""
SELECT im.*, pc.location AS entry_port, pc.state AS entry_port_state
FROM immigration_table im 
INNER JOIN i94portCodes pc
ON im.i94port = pc.code
""").createOrReplaceTempView("immigration_table")

In [66]:
# Compute the age of each individual and add it to the view
spark.sql("""
SELECT *, (2016-biryear) AS age 
FROM immigration_table
""").createOrReplaceTempView("immigration_table")

In [67]:
# Insert the immigration fact data into a spark dataframe
fact_immigration = spark.sql("""
                        SELECT 
                            cicid, 
                            citizenship_country,
                            residence_country,
                            TRIM(UPPER(entry_port)) AS city,
                            TRIM(UPPER(entry_port_state)) AS state,
                            arrival_date,
                            departure_date,
                            age,
                            visa_type,
                            visatype AS detailed_visa_type

                        FROM immigration_table
""")

In [68]:
# extract all distinct dates from arrival and departure dates to create dimension table
dim_time = spark.sql("""
SELECT DISTINCT arrival_date AS date
FROM immigration_table
UNION
SELECT DISTINCT departure_date AS date
FROM immigration_table
WHERE departure_date IS NOT NULL
""")
dim_time.createOrReplaceTempView("dim_time_table")

In [69]:
# extract year, month, day, week of year, day of week and day of year from each date to build the dim_time table
dim_time = spark.sql("""
SELECT date, YEAR(date) AS year, MONTH(date) AS month, DAY(date) AS day,
       WEEKOFYEAR(date) AS week, DAYOFWEEK(date) AS weekday, DAYOFYEAR(date) AS year_day
FROM dim_time_table
ORDER BY date ASC
""")
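Spark's `WEEKOFYEAR` uses ISO 8601 weeks (weeks start on Monday, and week 1 is the first week with more than three days), while `DAYOFWEEK` numbers days from 1 = Sunday to 7 = Saturday. The following plain-Python sketch reproduces those attributes for a single date, which can be handy for spot-checking a few `dim_time` rows. The function name `time_attributes` is hypothetical.

```python
from datetime import date


def time_attributes(d: date) -> dict:
    """Mirror the dim_time columns for one date, following Spark SQL semantics."""
    return {
        "date": d,
        "year": d.year,
        "month": d.month,
        "day": d.day,
        # ISO 8601 week number, like Spark's WEEKOFYEAR
        "week": d.isocalendar()[1],
        # Spark's DAYOFWEEK is 1 = Sunday ... 7 = Saturday; isoweekday() is 1 = Monday ... 7 = Sunday
        "weekday": d.isoweekday() % 7 + 1,
        "year_day": d.timetuple().tm_yday,
    }


print(time_attributes(date(2016, 4, 1)))
# {'date': datetime.date(2016, 4, 1), 'year': 2016, 'month': 4, 'day': 1, 'week': 13, 'weekday': 6, 'year_day': 92}
```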

In [70]:
# Keep only data for the United States
df_temperature = df_temperature[df_temperature['Country']=='United States'].copy()

# Convert the date to datetime objects
df_temperature['date'] = pd.to_datetime(df_temperature.dt)

# Keep only dates after 1950-01-01 (the comparison is strict, so January 1950 itself is dropped)
df_temperature=df_temperature[df_temperature['date']>"1950-01-01"].copy()

In [71]:
# convert the city names to upper case
df_temperature.City = df_temperature.City.str.strip().str.upper()

In [72]:
# convert the dataframe from pandas to spark
spark_df_temperature = spark.createDataFrame(df_temperature)
spark_df_temperature.createOrReplaceTempView("temperature")

In [73]:
# average the temperature readings for each (date, city) pair to build the temperature dimension
dim_temperature = spark.sql("""
SELECT
    date, City AS city,
    AVG(AverageTemperature) AS average_temperature,
    AVG(AverageTemperatureUncertainty) AS average_termperature_uncertainty
FROM temperature
GROUP BY date, City
""")

In [74]:
# normalize the city, state code and race values (trim whitespace, upper case) so they match the other tables
df_demographics.City = df_demographics.City.str.strip().str.upper()
df_demographics['State Code'] = df_demographics['State Code'].str.strip().str.upper()
df_demographics.Race = df_demographics.Race.str.strip().str.upper()

In [75]:
# convert the dataframe from pandas to spark
spark_df_demographics = spark.createDataFrame(df_demographics)
spark_df_demographics.createOrReplaceTempView("demographics")

In [76]:
# insert data into the demographics dim table
dim_demographics = spark.sql("""
                                SELECT  City, 
                                        State, 
                                        `Median Age` AS median_age, 
                                        `Male Population` AS male_population, 
                                        `Female Population` AS female_population, 
                                        `Total Population` AS total_population, 
                                        `Foreign-born` AS foreign_born, 
                                        `Average Household Size` AS average_household_size, 
                                        `State Code` AS state_code, 
                                        Race, 
                                        Count
                                FROM demographics
""")

In [77]:
# airport dataset contains a lot of nulls. We'll load the csv directly into a spark dataframe to avoid having to deal with converting pandas NaN into nulls
spark_df_airports = spark.read.format("csv").option("header", "true").load('airport-codes_csv.csv')
spark_df_airports.createOrReplaceTempView("airports")

In [78]:
# keep only airports whose country is the US (rows with a NULL country are dropped)
spark.sql("""
SELECT *
FROM airports
WHERE iso_country IS NOT NULL
AND UPPER(TRIM(iso_country)) = 'US'
""").createOrReplaceTempView("airports")

In [79]:
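# drop closed airports, heliports, seaplane bases and balloonports, rows without a municipality, and rows whose region is not in the 'US-XX' form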
spark.sql("""
SELECT *
FROM airports
WHERE LOWER(TRIM(type)) NOT IN ('closed', 'heliport', 'seaplane_base', 'balloonport')
AND municipality IS NOT NULL
AND LENGTH(iso_region) = 5
""").createOrReplaceTempView("airports")

In [80]:
dim_airports = spark.sql("""
SELECT TRIM(ident) AS ident, type, name, elevation_ft, SUBSTR(iso_region, 4) AS state, TRIM(UPPER(municipality)) AS municipality, iata_code
FROM airports
""")
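The two airport-cleaning cells and the `dim_airports` projection reduce to one row-level rule. The plain-Python sketch below states that rule, assuming each airport row is a dict keyed by the CSV's column names. `keep_airport` and `airport_state` are hypothetical helpers that are not part of the notebook. Note that Spark's `SUBSTR` is 1-based, so `SUBSTR(iso_region, 4)` turns `US-CA` into `CA`; that matches the 0-based Python slice `[3:]`.

```python
EXCLUDED_TYPES = {"closed", "heliport", "seaplane_base", "balloonport"}


def keep_airport(row: dict) -> bool:
    """Apply the same filters as the SQL cleaning cells to one airport row."""
    country = row.get("iso_country")
    kind = row.get("type")
    region = row.get("iso_region")
    return (
        country is not None
        and country.strip().upper() == "US"
        # in SQL, NULL NOT IN (...) is not true, so rows with a NULL type are dropped too
        and kind is not None
        and kind.strip().lower() not in EXCLUDED_TYPES
        and row.get("municipality") is not None
        and region is not None
        and len(region) == 5  # e.g. "US-CA"
    )


def airport_state(iso_region: str) -> str:
    # Spark SUBSTR(iso_region, 4) is 1-based; Python slicing is 0-based
    return iso_region[3:]


row = {"iso_country": "US", "type": "small_airport", "municipality": "Boston", "iso_region": "US-MA"}
print(keep_airport(row), airport_state(row["iso_region"]))  # True MA
```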

In [None]:
# Saving the data in parquet format
dim_demographics.write.parquet("dim_demographics")
dim_time.write.parquet("dim_time")
dim_airports.write.parquet("dim_airports")
dim_temperature.write.parquet("dim_temperature")
fact_immigration.write.parquet("fact_immigration")

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [82]:
# register the final tables as temporary views so they can be queried with SQL
dim_demographics.createOrReplaceTempView("dim_demographics")
dim_time.createOrReplaceTempView("dim_time")
dim_airports.createOrReplaceTempView("dim_airports")
dim_temperature.createOrReplaceTempView("dim_temperature")
fact_immigration.createOrReplaceTempView("fact_immigration")

In [83]:
# null-value check helper

def nullValueCheck(spark_ctxt, tables_to_check):
    """
    Perform NULL-value checks on specific columns of the given tables and raise a ValueError when NULL values are found.
    Parameters:
    spark_ctxt: the SparkSession on which the data quality check is performed
    tables_to_check: a dictionary mapping each table name to the list of columns to check for NULL values
    """
    for table, columns in tables_to_check.items():
        print(f"Performing data quality check on table {table}...")
        for column in columns:
            returnedVal = spark_ctxt.sql(f"SELECT COUNT(*) AS nbr FROM {table} WHERE {column} IS NULL")
            if returnedVal.head()[0] > 0:
                raise ValueError(f"Data quality check failed! Found NULL values in column {column} of table {table}!")
        print(f"Table {table} passed.")
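The helper's logic does not depend on Spark. It runs one `COUNT(*) ... IS NULL` query per column. The sketch below runs the same query pattern against an in-memory `sqlite3` database, which stands in for Spark here purely for illustration. Unlike the helper, it collects every failing column instead of stopping at the first one. The function name `find_null_columns` and the toy rows are made up. With Spark, each count would come from `spark.sql(query).head()[0]` instead of `conn.execute(query).fetchone()`.

```python
import sqlite3


def find_null_columns(conn, tables_to_check):
    """Return (table, column, null_count) for every checked column that contains NULLs."""
    failures = []
    for table, columns in tables_to_check.items():
        for column in columns:
            (count,) = conn.execute(
                f"SELECT COUNT(*) FROM {table} WHERE {column} IS NULL"
            ).fetchone()
            if count > 0:
                failures.append((table, column, count))
    return failures


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE fact_immigration (cicid INTEGER, arrival_date TEXT)")
conn.executemany(
    "INSERT INTO fact_immigration VALUES (?, ?)",
    [(1, "2016-04-01"), (2, None)],
)
print(find_null_columns(conn, {"fact_immigration": ["cicid", "arrival_date"]}))
# [('fact_immigration', 'arrival_date', 1)]
```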

In [86]:
#time dimension verification

# check the number of rows in the time table: 192 expected, so the query should return 0
spark.sql("""
SELECT COUNT(*) - 192
FROM dim_time
""").show()

# make sure each row has a distinct date key: 192 distinct dates expected, so the query should return 0
spark.sql("""
SELECT COUNT(DISTINCT date) - 192
FROM dim_time
""").show()

# we could also subtract the result of one query from the other


# and make sure all dates from the fact table are included in the time dimension (an empty result is expected)
spark.sql("""
(SELECT DISTINCT arrival_date AS date
FROM fact_immigration
UNION
SELECT DISTINCT departure_date AS date
FROM fact_immigration
WHERE departure_date IS NOT NULL)

MINUS

SELECT DISTINCT date
FROM dim_time

""").show()

+--------------------------------+
|(count(1) - CAST(192 AS BIGINT))|
+--------------------------------+
|                               0|
+--------------------------------+

+--------------------------------------------+
|(count(DISTINCT date) - CAST(192 AS BIGINT))|
+--------------------------------------------+
|                                           0|
+--------------------------------------------+

+----+
|date|
+----+
+----+



In [87]:
#immigration verification

# number of distinct primary keys in the staging table (2165257 expected, so the query should return 0)
spark.sql("""
SELECT count(distinct cicid) - 2165257
FROM immigration_table
""").show()

# should match the number of distinct primary keys in the fact table (2165257 expected)
spark.sql("""
SELECT count(distinct cicid) - 2165257
FROM fact_immigration
""").show()

# and should match the fact table's row count, since cicid is its primary key (2165257 expected)
spark.sql("""
SELECT count(*) - 2165257
FROM fact_immigration
""").show()

+-------------------------------------------------+
|(count(DISTINCT cicid) - CAST(2165257 AS BIGINT))|
+-------------------------------------------------+
|                                                0|
+-------------------------------------------------+

+-------------------------------------------------+
|(count(DISTINCT cicid) - CAST(2165257 AS BIGINT))|
+-------------------------------------------------+
|                                                0|
+-------------------------------------------------+

+------------------------------------+
|(count(1) - CAST(2165257 AS BIGINT))|
+------------------------------------+
|                                   0|
+------------------------------------+



In [88]:
# demographics dimension table: 2891 rows expected, each uniquely identified by (city, state, race)
spark.sql("""
SELECT count(*) - 2891
FROM dim_demographics
""").show()

spark.sql("""
SELECT COUNT(DISTINCT city, state, race) - 2891
FROM dim_demographics
""").show()

+---------------------------------+
|(count(1) - CAST(2891 AS BIGINT))|
+---------------------------------+
|                                0|
+---------------------------------+

+----------------------------------------------------------+
|(count(DISTINCT city, state, race) - CAST(2891 AS BIGINT))|
+----------------------------------------------------------+
|                                                         0|
+----------------------------------------------------------+



In [89]:
# airports dimension table: 14529 rows expected, each uniquely identified by ident
spark.sql("""
SELECT count(*) - 14529
FROM dim_airports
""").show()

spark.sql("""
SELECT COUNT(DISTINCT ident) - 14529
FROM dim_airports
""").show()

+----------------------------------+
|(count(1) - CAST(14529 AS BIGINT))|
+----------------------------------+
|                                 0|
+----------------------------------+

+-----------------------------------------------+
|(count(DISTINCT ident) - CAST(14529 AS BIGINT))|
+-----------------------------------------------+
|                                              0|
+-----------------------------------------------+



In [90]:
# (date, city) is the primary key of the temperature dimension table: 189472 rows expected

spark.sql("""
SELECT count(*) - 189472
FROM dim_temperature
""").show()

spark.sql("""
SELECT COUNT(DISTINCT date, city) - 189472
FROM dim_temperature
""").show()

+-----------------------------------+
|(count(1) - CAST(189472 AS BIGINT))|
+-----------------------------------+
|                                  0|
+-----------------------------------+

+-----------------------------------------------------+
|(count(DISTINCT date, city) - CAST(189472 AS BIGINT))|
+-----------------------------------------------------+
|                                                    0|
+-----------------------------------------------------+



#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

Please see the beginning of the notebook.

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

Considering the size of the immigration dataset (about 3 million rows for a single month), combined with the temperature, airport and demographics datasets, Spark is the most sensible choice of technology, especially if the data is to be processed over a longer period of time.

This project was intended to look for:

* the effects of temperature on the volume of travellers
* the seasonality of travel
* the connection between the volume of travel and the number of entry ports (i.e. airports)
* the connection between the volume of travel and the demographics of various cities

None of these phenomena require a rapid update of the data; a monthly or quarterly update is sufficient.

### Under the following scenarios:
* Data is increased by 100x: store the data in S3 buckets and keep using Spark as the processing platform, since it is well suited to large datasets.
* Dashboard must be updated daily by 7am: use Apache Airflow to schedule the ETL and data quality validation so that the pipeline completes before 7am.
* Database needs to be accessed by more than 100 people: once the data has been processed, it can be loaded into Amazon Redshift (which is based on PostgreSQL) to support concurrent multi-user access.