# Capstone - 2016 Immigration and Temperature Data 

### Data Engineering Capstone Project

#### Project Summary
The goal of this project is to build an ETL pipeline that combines I94 immigration data with city temperature data into a database optimized for queries on immigration events. The database can be used to answer questions that relate immigration behavior to destination temperature, for example: do people tend to immigrate to warmer places?

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [2]:
# Do all imports and installs here
import os
import pandas as pd, re
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
import json

### Step 1: Scope the Project and Gather Data

#### Scope 
In this project, we will aggregate I94 immigration data by destination city to form our first dimension table. Next we will aggregate city temperature data by city to form the second dimension table. The two datasets will be joined on destination city to form the fact table. The final database is optimized to query on immigration events to determine if temperature affects the selection of destination cities. Spark will be used to process the data.
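
As a rough illustration of the plan above, the sketch below uses pandas on a few made-up rows rather than Spark on the real data. The frame names (`immigration`, `temperature`), the `city` column on the immigration side, the output column names, and all values are assumptions for illustration only.

```python
import pandas as pd

# Toy stand-ins for the two sources; every value here is invented for illustration.
immigration = pd.DataFrame({
    "i94port": ["LOS", "LOS", "NEW"],
    "city": ["Los Angeles", "Los Angeles", "Newark"],
})
temperature = pd.DataFrame({
    "City": ["Los Angeles", "Los Angeles", "Newark"],
    "AverageTemperature": [14.0, 16.0, 10.0],
})

# Dimension 1: number of immigration events per destination city.
dim_immigration = (
    immigration.groupby("city").size().reset_index(name="immigration_count")
)

# Dimension 2: mean temperature per city.
dim_temperature = (
    temperature.groupby("City", as_index=False)["AverageTemperature"].mean()
    .rename(columns={"City": "city", "AverageTemperature": "avg_temperature"})
)

# Fact table: join the two aggregates on destination city.
fact = dim_immigration.merge(dim_temperature, on="city", how="inner")
print(fact)
```

An inner join keeps only cities present in both sources; whether unmatched cities should be kept instead is a modeling choice this sketch does not settle.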

#### Describe and Gather Data 
Describe the data sets you're using. Where did they come from? What type of information is included?

The [I94 immigration data](https://travel.trade.gov/research/reports/i94/historical/2016.html) comes from the US National Travel and Tourism Office. It is provided in SAS7BDAT format, a binary database storage format. Some relevant attributes include:

* i94yr = 4 digit year
* i94mon = numeric month
* i94cit = 3 digit code of origin country
* i94port = 3 character code of destination USA city
* arrdate = arrival date in the USA
* i94mode = 1 digit travel code
* depdate = departure date from the USA
* i94visa = reason for immigration
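
The sample rows in this notebook show `arrdate` and `depdate` as numbers such as 20566.0. SAS stores dates as a count of days since 1960-01-01, so a small helper can turn them into calendar dates. The function name `sas_days_to_date` is hypothetical, and the sketch assumes these columns are plain SAS date values.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS date values count days from this day

def sas_days_to_date(days):
    """Convert a SAS date value (days since 1960-01-01) to a date, or None if missing."""
    if days is None or days != days:  # NaN is the only value not equal to itself
        return None
    return SAS_EPOCH + timedelta(days=int(days))

# arrdate of the first sample row; its dtadfile value is 20160422
print(sas_days_to_date(20566.0))  # 2016-04-22
```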

The [temperature data](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data) comes from Kaggle. It is provided in csv format. Some relevant attributes include:

* dt = date
* AverageTemperature = average temperature
* City = city name
* Country = country name
* Latitude = latitude
* Longitude = longitude
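
The temperature file stores coordinates as strings with a hemisphere suffix (the sample output in this notebook shows `34.56N` and `118.70W`). A minimal sketch of converting them to signed decimal degrees follows; the helper name `parse_coordinate` is hypothetical, and the sketch assumes every value ends in exactly one of N, S, E, or W.

```python
def parse_coordinate(text):
    """Turn a string such as '34.56N' or '118.70W' into signed decimal degrees."""
    value, hemisphere = float(text[:-1]), text[-1].upper()
    if hemisphere not in "NSEW":
        raise ValueError(f"unexpected hemisphere in {text!r}")
    # South and west are negative by the usual convention
    return -value if hemisphere in "SW" else value

print(parse_coordinate("34.56N"), parse_coordinate("118.70W"))
```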

In [101]:
# Read in the data here
df = pd.read_csv('immigration_data_sample.csv',header=0)
df = df.iloc[:,1:]

In [93]:
# Because the immigration data has 28 columns
pd.set_option('display.max_columns', 28)

In [107]:
df.astype({'admnum':'int64'}).dtypes

cicid       float64
i94yr       float64
i94mon      float64
i94cit      float64
i94res      float64
i94port      object
arrdate     float64
i94mode     float64
i94addr      object
depdate     float64
i94bir      float64
i94visa     float64
count       float64
dtadfile      int64
visapost     object
occup        object
entdepa      object
entdepd      object
entdepu     float64
matflag      object
biryear     float64
dtaddto      object
gender       object
insnum      float64
airline      object
admnum        int64
fltno        object
visatype     object
dtype: object

In [108]:
df.groupby('airline').count().head(3)

airline,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,admnum,fltno,visatype
*GA,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,0,4,4,0,4,4,4,4,0,4,4,4
2D,3,3,3,3,3,3,3,3,3,2,3,3,3,3,3,0,3,2,0,2,3,3,3,0,3,3,3
3U,3,3,3,3,3,3,3,3,1,3,3,3,3,3,0,0,3,3,0,3,3,3,3,3,3,3,3


In [109]:
df.query('i94cit==209.0').head(5)

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,61.0,2.0,1.0,20160422,,,G,O,,M,1955.0,7202016,F,,JL,56582670000.0,782,WT
11,5056736.0,2016.0,4.0,209.0,209.0,PHI,20571.0,1.0,HI,20575.0,72.0,2.0,1.0,20160427,,,G,O,,M,1944.0,7252016,M,,DL,59336620000.0,598,WT
24,2721962.0,2016.0,4.0,209.0,209.0,NEW,20559.0,1.0,HI,20562.0,41.0,2.0,1.0,20160415,,,O,O,,M,1975.0,7132016,,,HA,56217030000.0,458,WT
42,5472659.0,2016.0,4.0,209.0,209.0,NEW,20573.0,1.0,NY,20579.0,8.0,2.0,1.0,20160429,,,G,O,,M,2008.0,7272016,M,,UA,59478730000.0,78,WT
46,861557.0,2016.0,4.0,209.0,209.0,SDP,20549.0,1.0,,20552.0,46.0,2.0,1.0,20160405,,,G,I,,M,1970.0,7032016,M,,JL,55663230000.0,66,WT


In [110]:
df.dtypes

cicid       float64
i94yr       float64
i94mon      float64
i94cit      float64
i94res      float64
i94port      object
arrdate     float64
i94mode     float64
i94addr      object
depdate     float64
i94bir      float64
i94visa     float64
count       float64
dtadfile      int64
visapost     object
occup        object
entdepa      object
entdepd      object
entdepu     float64
matflag      object
biryear     float64
dtaddto      object
gender       object
insnum      float64
airline      object
admnum      float64
fltno        object
visatype     object
dtype: object

In [11]:
# Create dictionary of valid i94port codes
i94port_valid = {}
with open('immigration_i94port_valid.txt') as f:
    for line in f:
        line = line.strip()
        if not line:
            continue
        parts = line.split('=')
        # Strip surrounding whitespace, then the enclosing quote characters
        key = parts[0].strip()[1:-1]
        val = parts[1].strip()[1:-1].split(',')
        if len(val) > 1:
            # Value has a comma: store [city, state]
            i94port_valid[key] = [val[0], val[1].strip()]
        else:
            # No comma: store the whole value as a single-item list
            i94port_valid[key] = [val[0].rstrip()]

In [77]:
i94port_valid["NEW"]

['NEWARK/TETERBORO', 'NJ']
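
The lookup above is built by slicing off quote characters. As an alternative sketch, a regular expression can validate each line before extracting the fields. The line shape used here (`'CODE' = 'CITY, ST'`, quotes included) is inferred from the parsing code and is an assumption, as the lookup file itself is not shown; `parse_port_line` is a hypothetical name.

```python
import re

# Assumed line shape: 'CODE' = 'CITY, ST' (quotes included); not confirmed by the source file.
LINE_RE = re.compile(r"^\s*'([^']*)'\s*=\s*'([^']*)'\s*$")

def parse_port_line(line):
    """Return (code, [city, state]) or (code, [name]) for one lookup line, else None."""
    match = LINE_RE.match(line)
    if match is None:
        return None
    code, value = match.group(1).strip(), match.group(2).strip()
    # Split on the last comma only, so names that contain commas stay intact
    parts = [p.strip() for p in value.rsplit(",", 1)]
    return code, parts

print(parse_port_line("'NEW' = 'NEWARK/TETERBORO, NJ'"))
```

Unlike the slicing approach, this version splits on the last comma rather than the first, and returns `None` for lines that do not match instead of raising.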

In [12]:
# Create dictionary of valid i94cit codes
i94cit_valid = {}
with open('immigration_i94cit_valid.txt') as f:
    for line in f:
        line = line.strip()
        if line:
            try:
                # Key is used as-is; the value has its enclosing quotes removed
                key = line.split('=')[0].strip()
                val = line.split('=')[1].strip()[1:-1]
                i94cit_valid[key] = val
            except IndexError:
                print("Error:", line)
                raise

In [73]:
i94cit_valid["209"]

'JAPAN'

In [13]:
# Create dictionary of valid i94addr codes
i94addr_valid = {}
with open('immigration_i94addr_valid.txt') as f:
    for line in f:
        line = line.strip()
        if line:
            try:
                # Both key and value have their enclosing quotes removed
                key = line.split('=')[0].strip()[1:-1]
                val = line.split('=')[1].strip()[1:-1]
                i94addr_valid[key] = val
            except IndexError:
                print("Error:", line)
                raise

In [79]:
i94addr_valid["HI"]

'HAWAII'

In [14]:
df_global_temp = pd.read_csv('../../data2/GlobalLandTemperaturesByCity.csv',header=0)

In [27]:
distinct_city = pd.unique(df_global_temp.City)

In [30]:
for city in distinct_city:
    if city.lower().find('los angeles')!=-1:
        print(city)

East Los Angeles
Los Angeles


In [72]:
city = 'Los Angeles'
country = 'United States'

In [73]:
df_global_temp.query('City==@city and Country==@country')

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
4356748,1849-01-01,8.819,2.558,Los Angeles,United States,34.56N,118.70W
4356749,1849-02-01,9.577,1.970,Los Angeles,United States,34.56N,118.70W
4356750,1849-03-01,11.814,2.173,Los Angeles,United States,34.56N,118.70W
4356751,1849-04-01,13.704,2.902,Los Angeles,United States,34.56N,118.70W
4356752,1849-05-01,14.834,2.017,Los Angeles,United States,34.56N,118.70W
4356753,1849-06-01,21.173,2.586,Los Angeles,United States,34.56N,118.70W
4356754,1849-07-01,26.159,4.767,Los Angeles,United States,34.56N,118.70W
4356755,1849-08-01,26.099,4.547,Los Angeles,United States,34.56N,118.70W
4356756,1849-09-01,21.848,1.977,Los Angeles,United States,34.56N,118.70W
4356757,1849-10-01,16.549,2.177,Los Angeles,United States,34.56N,118.70W


In [64]:
len(pd.unique(df_global_temp.Country))

159

In [65]:
df_global_temp.columns

Index(['dt', 'AverageTemperature', 'AverageTemperatureUncertainty', 'City',
       'Country', 'Latitude', 'Longitude'],
      dtype='object')

In [15]:
with open('us-cities-demographics.json','r') as f:
    data = json.load(f)
df_us_info = pd.io.json.json_normalize(data)

In [16]:
df_us_info.head(3)

Unnamed: 0,datasetid,fields.average_household_size,fields.city,fields.count,fields.female_population,fields.foreign_born,fields.male_population,fields.median_age,fields.number_of_veterans,fields.race,fields.state,fields.state_code,fields.total_population,record_timestamp,recordid
0,us-cities-demographics,2.73,Newark,76402,143873.0,86253.0,138040.0,34.6,5829.0,White,New Jersey,NJ,281913,1969-12-31T19:00:00-05:00,85458783ecf5da6572ee00e7120f68eff4fd0d61
1,us-cities-demographics,2.4,Peoria,1343,62432.0,7517.0,56229.0,33.1,6634.0,American Indian and Alaska Native,Illinois,IL,118661,1969-12-31T19:00:00-05:00,a5ad84bdb4d72688fb6ae19a8bee43bcb01f9fea
2,us-cities-demographics,2.77,O'Fallon,2583,43270.0,3269.0,41762.0,36.0,5783.0,Hispanic or Latino,Missouri,MO,85032,1969-12-31T19:00:00-05:00,c54cd5021a16eb5f7b83987742bd495229b2155e


In [17]:
df_us_info.set_axis(['datasetid','average_household_size','city','count','female_population',
                     'foreign_born','male_population','median_age','number_of_veterans','race',
                     'state','state_code','total_population','record_timestamp','recordid'], 
                    axis=1, inplace=True)

In [70]:
df_us_info.query('state_code=="CA" and city == "Los Angeles"')

Unnamed: 0,datasetid,average_household_size,city,count,female_population,foreign_born,male_population,median_age,number_of_veterans,race,state,state_code,total_population,record_timestamp,recordid
97,us-cities-demographics,2.86,Los Angeles,2177650,2012898.0,1485425.0,1958998.0,35.0,85417.0,White,California,CA,3971896,1969-12-31T19:00:00-05:00,7da42fda61238faccac3d43954a8f621a3a51194
554,us-cities-demographics,2.86,Los Angeles,512999,2012898.0,1485425.0,1958998.0,35.0,85417.0,Asian,California,CA,3971896,1969-12-31T19:00:00-05:00,e23be85ef2bf6caecf2309ba6dedc868929d1377
729,us-cities-demographics,2.86,Los Angeles,404868,2012898.0,1485425.0,1958998.0,35.0,85417.0,Black or African-American,California,CA,3971896,1969-12-31T19:00:00-05:00,cda8c0b63e4c14d174940e1df5d50d2d2491ccfa
1225,us-cities-demographics,2.86,Los Angeles,63758,2012898.0,1485425.0,1958998.0,35.0,85417.0,American Indian and Alaska Native,California,CA,3971896,1969-12-31T19:00:00-05:00,f45da4b5c979eb53a8e10a5d4e6ef2a7bb480fbc
1899,us-cities-demographics,2.86,Los Angeles,1936732,2012898.0,1485425.0,1958998.0,35.0,85417.0,Hispanic or Latino,California,CA,3971896,1969-12-31T19:00:00-05:00,5212831e25cadd6f383d1bf93274aa17e346adb6


In [188]:
for i in df_us_info.columns:
    print(i)

datasetid
average_household_size
city
count
female_population
foreign_born
male_population
median_age
number_of_veterans
race
state
state_code
total_population
record_timestamp
recordid


In [19]:
df_airport_cd = pd.read_csv('airport-codes_csv.csv',header=0)

In [55]:
df_airport_cd[df_airport_cd.iso_region.str.len()>5].head(2)

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates,city
174,02PR,small_airport,Cuylers Airport,15.0,,PR,PR-U-A,Vega Baja,02PR,,02PR,"-66.36689758300781, 18.45330047607422","[PR, U, A]"
223,03N,small_airport,Utirik Airport,4.0,OC,MH,MH-UTI,Utirik Island,K03N,UTK,03N,"169.852005, 11.222","[MH, UTI]"


In [64]:
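# Strip the 'XX-' country prefix from iso_region. Despite its name, the resulting
# 'city' column holds a region/state code (e.g. 'PA'), not a city name.
df_airport_cd['city'] = df_airport_cd['iso_region'].apply(lambda x: x[3:])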

In [65]:
df_airport_cd.head(3)

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates,city
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125",PA
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022",KS
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968",AK


In [66]:
df_airport_cd.query('iso_country == "US" and municipality=="Los Angeles"').head(2)

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates,city
71,01CN,heliport,Los Angeles County Sheriff's Department Heliport,300.0,,US,US-CA,Los Angeles,01CN,,01CN,"-118.15399932861328, 34.03779983520508",CA
639,0CA0,closed,Drew Medical Center Heliport,180.0,,US,US-CA,Los Angeles,,,,"-118.241997, 33.923302",CA


In [75]:
df_airport_cd.count()

ident           55075
type            55075
name            55075
elevation_ft    48069
continent       27356
iso_country     54828
iso_region      55075
municipality    49399
gps_code        41030
iata_code        9189
local_code      28686
coordinates     55075
city            55075
dtype: int64

In [77]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

In [78]:
%%time
df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

CPU times: user 0 ns, sys: 1.98 ms, total: 1.98 ms
Wall time: 2.51 s


In [112]:
%%time
df_spark.groupBy('admnum').count().show(2)

+---------------+-----+
|         admnum|count|
+---------------+-----+
|5.5412292033E10|    1|
|5.5457659733E10|    1|
+---------------+-----+
only showing top 2 rows

CPU times: user 8.86 ms, sys: 335 µs, total: 9.2 ms
Wall time: 44.9 s


In [None]:
%%time
df_spark.count()

In [3]:
SAS_files = [
    '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat',
    '../../data/18-83510-I94-Data-2016/i94_sep16_sub.sas7bdat',
    '../../data/18-83510-I94-Data-2016/i94_nov16_sub.sas7bdat',
    '../../data/18-83510-I94-Data-2016/i94_mar16_sub.sas7bdat'
]

In [208]:
for i in os.listdir('../../data/18-83510-I94-Data-2016'):
    print(os.path.join('../../data/18-83510-I94-Data-2016/',i))

../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat
../../data/18-83510-I94-Data-2016/i94_sep16_sub.sas7bdat
../../data/18-83510-I94-Data-2016/i94_nov16_sub.sas7bdat
../../data/18-83510-I94-Data-2016/i94_mar16_sub.sas7bdat
../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat
../../data/18-83510-I94-Data-2016/i94_aug16_sub.sas7bdat
../../data/18-83510-I94-Data-2016/i94_may16_sub.sas7bdat
../../data/18-83510-I94-Data-2016/i94_jan16_sub.sas7bdat
../../data/18-83510-I94-Data-2016/i94_oct16_sub.sas7bdat
../../data/18-83510-I94-Data-2016/i94_jul16_sub.sas7bdat
../../data/18-83510-I94-Data-2016/i94_feb16_sub.sas7bdat
../../data/18-83510-I94-Data-2016/i94_dec16_sub.sas7bdat


In [194]:
for i in SAS_files:
    print(i)

../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat
../../data/18-83510-I94-Data-2016/i94_feb16_sub.sas7bdat
../../data/18-83510-I94-Data-2016/i94_mar16_sub.sas7bdat


In [4]:
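%%time
import concurrent.futures
# Assumption: `tp` is an alias for ThreadPoolExecutor; the original import is not shown
from concurrent.futures import ThreadPoolExecutor as tp

def read_files(file_path):
    # Load one SAS file and return its path together with its row count
    df_spark = spark.read.format('com.github.saurfang.sas.spark').load(file_path)
    return [file_path,df_spark.count()]

with tp() as executor:
    # Submit one counting job per file and print each result as it completes
    future_count = {executor.submit(read_files,file_path) : file_path for file_path in SAS_files}
    for count in concurrent.futures.as_completed(future_count):
        print(count.result()[0])
        print(count.result()[1])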

../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat
3096313
../../data/18-83510-I94-Data-2016/i94_sep16_sub.sas7bdat
3733786
../../data/18-83510-I94-Data-2016/i94_nov16_sub.sas7bdat
2914926
../../data/18-83510-I94-Data-2016/i94_mar16_sub.sas7bdat
3157072
CPU times: user 27.8 ms, sys: 20 ms, total: 47.8 ms
Wall time: 2min 43s


In [179]:
%%time
df_spark.count()

CPU times: user 10.4 ms, sys: 210 µs, total: 10.6 ms
Wall time: 57 s


3096313

In [183]:
# Group by year and port, matching the columns shown in the output below
sd = df_spark.groupBy('i94yr', 'i94port').count()

In [184]:
%%time
sd.show(10)

+------+-------+-----+
| i94yr|i94port|count|
+------+-------+-----+
|2016.0|    FAR|    5|
|2016.0|    WBE|   28|
|2016.0|    ALC|   39|
|2016.0|    SGJ|    2|
|2016.0|    THO|  984|
|2016.0|    DEN|18260|
|2016.0|    TKI|    6|
|2016.0|    ROM|   12|
|2016.0|    PSP|18117|
|2016.0|    BRO|  395|
+------+-------+-----+
only showing top 10 rows

CPU times: user 5.99 ms, sys: 4.2 ms, total: 10.2 ms
Wall time: 51 s


In [11]:
# Write to Parquet, then reload from the Parquet copy
df_spark.write.parquet("sas_data")
df_spark = spark.read.parquet("sas_data")

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.
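
A minimal sketch of two such checks in pandas: counting missing values per column and counting fully duplicated rows. The frame `sample` and its values are invented; only the column names are borrowed from the immigration data.

```python
import pandas as pd

# Made-up rows that mimic a few immigration columns; not real data.
sample = pd.DataFrame({
    "cicid": [1.0, 2.0, 2.0, 3.0],
    "i94port": ["NEW", "LOS", "LOS", None],
    "depdate": [None, 20575.0, 20575.0, 20552.0],
})

# Missing values per column
missing_per_column = sample.isna().sum()

# Fully duplicated rows (every column equal to an earlier row)
duplicate_rows = int(sample.duplicated().sum())

print(missing_per_column)
print("duplicate rows:", duplicate_rows)
```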

#### Cleaning Steps
Document steps necessary to clean the data
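
One possible pair of cleaning steps, sketched in pandas under stated assumptions: keep only immigration rows whose `i94port` appears in a lookup shaped like `i94port_valid`, and drop temperature rows with no `AverageTemperature`. The lookup contents, the code `ZZZ`, and all rows below are invented for illustration.

```python
import pandas as pd

# Hypothetical lookup in the same shape as i94port_valid.
# The entries, the invalid code "ZZZ", and the rows below are invented.
valid_ports = {"NEW": ["NEWARK/TETERBORO", "NJ"], "HHW": ["HONOLULU", "HI"]}

immigration = pd.DataFrame({"i94port": ["NEW", "ZZZ", "HHW", None]})
temperature = pd.DataFrame({
    "City": ["Newark", "Honolulu", "Honolulu"],
    "AverageTemperature": [10.5, None, 24.1],
})

# Keep only immigration rows whose port code appears in the lookup
clean_immigration = immigration[immigration["i94port"].isin(list(valid_ports))]

# Drop temperature readings with no measurement
clean_temperature = temperature.dropna(subset=["AverageTemperature"])

print(len(clean_immigration), len(clean_temperature))
```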

In [None]:
# Performing cleaning tasks here





### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [None]:
# Write code here

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
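
The count and unique-key checks listed above could look roughly like the following pandas sketch. The function `check_table`, the table `fact`, and its columns are hypothetical names, not part of the pipeline as written.

```python
import pandas as pd

def check_table(df, key, name):
    """Raise ValueError if the table is empty, or if its key has nulls or duplicates."""
    if len(df) == 0:
        raise ValueError(f"{name}: table is empty")
    if df[key].isna().any():
        raise ValueError(f"{name}: null values in key column {key!r}")
    if df[key].duplicated().any():
        raise ValueError(f"{name}: duplicate values in key column {key!r}")
    return len(df)

# Invented example table
fact = pd.DataFrame({"city": ["Newark", "Honolulu"], "immigration_count": [3, 5]})
print(check_table(fact, "city", "fact"))
```

Raising on failure, rather than printing a warning, lets a scheduler or test runner stop the pipeline as soon as a check fails.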
 
Run Quality Checks

In [None]:
# Perform quality checks here

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
    * The data was increased by 100x.
    * The data populates a dashboard that must be updated on a daily basis by 7am every day.
    * The database needed to be accessed by 100+ people.