# Immigration and Temperature Data Modeling Project
### Data Engineering Capstone Project

#### Project Summary
This project combines I94 immigration data with city temperature data in an OLAP data model, so that correlations between immigration behavior and temperature can be analyzed.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import re
from pyspark.sql.functions import udf, col

### Step 1: Scope the Project and Gather Data

#### Scope 
We use Spark for all the data transformations needed to build our immigration and temperature data warehouse. The data warehouse lives in Spark SQL temporary views. We create dimension tables from the I94 immigration and temperature datasets, then join them on valid i94port codes to create the fact table for analysis.

#### Describe and Gather Data 

* I94 immigration data comes from the US National Travel and Tourism Office. The data is partitioned by month and year, and can be found here: https://travel.trade.gov/research/reports/i94/historical/2016.html

* The temperature data is a Kaggle data set that includes temperatures in cities around the world. It can be found here: https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data


In [2]:
# Read in the data here
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df = pd.read_sas(fname, format='sas7bdat', encoding="ISO-8859-1")


The code below builds a dictionary that maps each valid i94port code to its city label. We determine which i94ports are valid from the I_94_SAS_Labels_Description.SAS file provided in our directory; the entries are read from valid.txt.


In [3]:
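valid_airport = {}
# Each matching line of valid.txt yields a (port code, city label) pair
port_pattern = re.compile(r"'(.*)'.*'(.*)'")
with open('valid.txt') as f:
    for line in f:
        match = port_pattern.search(line)
        if match is None:  # skip lines without a quoted code/label pair
            continue
        valid_airport[match[1]] = [match[2]]
valid_airpot_list = list(valid_airport.keys())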

In [4]:
print(valid_airpot_list)

['ALC', 'ANC', 'BAR', 'DAC', 'PIZ', 'DTH', 'EGL', 'FRB', 'HOM', 'HYD', 'JUN', '5KE', 'KET', 'MOS', 'NIK', 'NOM', 'PKC', 'ORI', 'SKA', 'SNP', 'TKI', 'WRA', 'HSV', 'MOB', 'LIA', 'ROG', 'DOU', 'LUK', 'MAP', 'NAC', 'NOG', 'PHO', 'POR', 'SLU', 'SAS', 'TUC', 'YUI', 'AND', 'BUR', 'CAL', 'CAO', 'FRE', 'ICP', 'LNB', 'LOS', 'BFL', 'OAK', 'ONT', 'OTM', 'BLT', 'PSP', 'SAC', 'SLS', 'SDP', 'SFR', 'SNJ', 'SLO', 'SLI', 'SPC', 'SYS', 'SAA', 'STO', 'TEC', 'TRV', 'APA', 'ASE', 'COS', 'DEN', 'DRO', 'BDL', 'BGC', 'GRT', 'HAR', 'NWH', 'NWL', 'TST', 'WAS', 'DOV', 'DVD', 'WLL', 'BOC', 'SRQ', 'CAN', 'DAB', 'FRN', 'FTL', 'FMY', 'FPF', 'HUR', 'GNV', 'JAC', 'KEY', 'LEE', 'MLB', 'MIA', 'APF', 'OPF', 'ORL', 'PAN', 'PEN', 'PCF', 'PEV', 'PSJ', 'SFB', 'SGJ', 'SAU', 'FPR', 'SPE', 'TAM', 'WPB', 'ATL', 'BRU', 'AGS', 'SAV', 'AGA', 'HHW', 'OGG', 'KOA', 'LIH', 'CID', 'DSM', 'BOI', 'EPI', 'IDA', 'PTL', 'SPI', 'CHI', 'DPA', 'PIA', 'RFD', 'UGN', 'GAR', 'HMM', 'INP', 'MRL', 'SBN', 'ICT', 'LEX', 'LOU', 'BTN', 'LKC', 'LAK', 'MLU'
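The regex relies on each useful line of valid.txt holding a quoted port code followed by a quoted label. The sketch below assumes lines shaped like `'ANC' = 'ANCHORAGE, AK'` (our assumption about the file's layout) and uses `[^']*` so each group stops at the next quote. `parse_valid_ports` is a hypothetical helper that builds the same code-to-[label] shape as `valid_airport`.

```python
import re

# Assumed layout of a valid.txt line: 'CODE' = 'CITY, ST   '
PORT_LINE = re.compile(r"'([^']*)'\s*=\s*'([^']*)'")

def parse_valid_ports(lines):
    """Map each port code to a one-element list holding its label,
    matching the shape of the notebook's valid_airport dict."""
    ports = {}
    for line in lines:
        match = PORT_LINE.search(line)
        if match is None:  # skip blank or malformed lines instead of crashing
            continue
        ports[match.group(1)] = [match.group(2)]
    return ports

# Sample lines in the assumed layout (illustrative, not copied from the file)
sample = [
    "\t'ALC'\t=\t'ALCAN, AK             '",
    "\t'ANC'\t=\t'ANCHORAGE, AK         '",
    "",
]
ports = parse_valid_ports(sample)
print(list(ports))  # ['ALC', 'ANC']
```

Skipping non-matching lines means a stray comment or blank line in valid.txt no longer raises a `TypeError` on `None`.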

In [5]:
df.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2
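The `arrdate` and `depdate` columns hold SAS date values, i.e. the number of days since 1960-01-01. That reading is consistent with this April 2016 file: 20545 corresponds to 2016-04-01. A minimal conversion helper (the name `sas_to_date` is hypothetical) could look like this:

```python
from datetime import date, timedelta

# SAS stores dates as a count of days since 1960-01-01 (the SAS epoch)
SAS_EPOCH = date(1960, 1, 1)

def sas_to_date(days):
    """Convert a SAS day count (read as a float, e.g. 20545.0) to a date.
    Returns None for missing values (None or NaN)."""
    if days is None or days != days:  # days != days is True only for NaN
        return None
    return SAS_EPOCH + timedelta(days=int(days))

print(sas_to_date(20545.0))  # 2016-04-01
print(sas_to_date(20567.0))  # 2016-04-23
```

In Spark, the same conversion can be written in SQL as `date_add('1960-01-01', CAST(arrdate AS INT))`, which avoids a Python UDF.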


In [5]:
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temp_pd = pd.read_csv(fname, sep=',')

In [6]:
df_temp_pd.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [7]:
	
from pyspark.sql import SparkSession
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()


### Step 2: Explore and Assess the Data
#### Explore the Data 
As in Step 1, the code below builds the dictionary of valid i94ports from the I_94_SAS_Labels_Description.SAS entries in valid.txt; we then use it to keep only the I94 records with a valid port of entry.

For the temperature dataset we create a new column, i94port, which we use to join back to the I94 dataset. We also drop rows with a missing AverageTemperature and keep a single row per (Country, City) pair.
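Note that `dropDuplicates(['Country', 'City'])` keeps a single, unspecified monthly reading per city, so `AverageTemperature` ends up being one month's value rather than a long-run average. The pandas sketch below uses made-up readings (not values from the dataset) to contrast that with averaging all readings per city. This averaging step is an alternative, not what the notebook does; in Spark it would be a `groupBy('Country', 'City')` followed by `agg` with `avg` on the temperature column cast to double.

```python
import pandas as pd

# Hypothetical monthly readings for one city (not real measurements)
readings = pd.DataFrame({
    "City": ["Århus", "Århus", "Århus"],
    "Country": ["Denmark", "Denmark", "Denmark"],
    "AverageTemperature": [6.0, None, 10.0],
})

# Mirrors the notebook: drop missing readings, then keep one row per city.
# pandas keeps the first row; Spark's dropDuplicates keeps an arbitrary one.
one_row = (readings.dropna(subset=["AverageTemperature"])
                   .drop_duplicates(subset=["Country", "City"]))

# Alternative: average all non-missing readings per city
per_city = (readings.dropna(subset=["AverageTemperature"])
                    .groupby(["Country", "City"], as_index=False)["AverageTemperature"]
                    .mean())

print(one_row["AverageTemperature"].tolist())   # [6.0]
print(per_city["AverageTemperature"].tolist())  # [8.0]
```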


In [8]:
# Performing cleaning tasks here
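valid_airport = {}
# Each matching line of valid.txt yields a (port code, city label) pair
port_pattern = re.compile(r"'(.*)'.*'(.*)'")
with open('valid.txt') as file:
    for line in file:
        match = port_pattern.search(line)
        if match is None:  # skip lines without a quoted code/label pair
            continue
        valid_airport[match[1]] = [match[2]]
valid_airpot_list = list(valid_airport.keys())
print(valid_airpot_list)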



['ALC', 'ANC', 'BAR', 'DAC', 'PIZ', 'DTH', 'EGL', 'FRB', 'HOM', 'HYD', 'JUN', '5KE', 'KET', 'MOS', 'NIK', 'NOM', 'PKC', 'ORI', 'SKA', 'SNP', 'TKI', 'WRA', 'HSV', 'MOB', 'LIA', 'ROG', 'DOU', 'LUK', 'MAP', 'NAC', 'NOG', 'PHO', 'POR', 'SLU', 'SAS', 'TUC', 'YUI', 'AND', 'BUR', 'CAL', 'CAO', 'FRE', 'ICP', 'LNB', 'LOS', 'BFL', 'OAK', 'ONT', 'OTM', 'BLT', 'PSP', 'SAC', 'SLS', 'SDP', 'SFR', 'SNJ', 'SLO', 'SLI', 'SPC', 'SYS', 'SAA', 'STO', 'TEC', 'TRV', 'APA', 'ASE', 'COS', 'DEN', 'DRO', 'BDL', 'BGC', 'GRT', 'HAR', 'NWH', 'NWL', 'TST', 'WAS', 'DOV', 'DVD', 'WLL', 'BOC', 'SRQ', 'CAN', 'DAB', 'FRN', 'FTL', 'FMY', 'FPF', 'HUR', 'GNV', 'JAC', 'KEY', 'LEE', 'MLB', 'MIA', 'APF', 'OPF', 'ORL', 'PAN', 'PEN', 'PCF', 'PEV', 'PSJ', 'SFB', 'SGJ', 'SAU', 'FPR', 'SPE', 'TAM', 'WPB', 'ATL', 'BRU', 'AGS', 'SAV', 'AGA', 'HHW', 'OGG', 'KOA', 'LIH', 'CID', 'DSM', 'BOI', 'EPI', 'IDA', 'PTL', 'SPI', 'CHI', 'DPA', 'PIA', 'RFD', 'UGN', 'GAR', 'HMM', 'INP', 'MRL', 'SBN', 'ICT', 'LEX', 'LOU', 'BTN', 'LKC', 'LAK', 'MLU'

In [9]:
#clean immigration data
fpath = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df_im = spark.read.format('com.github.saurfang.sas.spark').load(fpath)
df_im_clean = df_im.filter(col('i94port').isin(valid_airpot_list))
df_im_clean.select('i94port').show(10)

+-------+
|i94port|
+-------+
|    ATL|
|    WAS|
|    NYC|
|    NYC|
|    NYC|
|    NYC|
|    NYC|
|    NYC|
|    NYC|
|    NYC|
+-------+
only showing top 10 rows



In [10]:

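@udf()
def convert_i94port_code(city):
    """Return the first valid i94port code whose label contains `city`, else None."""
    if city is None:  # null City values would otherwise raise AttributeError
        return None
    for key in valid_airport:
        if city.lower() in valid_airport[key][0].lower():
            return key
    return None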

In [11]:
print(valid_airport['ANC'][0])

ANCHORAGE, AK         
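The UDF matches temperature cities to ports by substring: it returns the first code whose label contains the city name. The sketch below reproduces that logic in plain Python on a hypothetical two-entry dictionary (the NYC label is assumed), and contrasts it with a stricter variant, not used in the notebook, that compares only against the city part of the `CITY, ST` label.

```python
# Hypothetical subset of valid_airport: code -> [label]
ports = {
    "ANC": ["ANCHORAGE, AK         "],
    "NYC": ["NEW YORK, NY          "],
}

def port_by_substring(city, ports):
    """Mirrors the notebook UDF: first code whose label contains the city name."""
    if city is None:
        return None
    for code, (label,) in ports.items():
        if city.lower() in label.lower():
            return code
    return None

def port_by_exact_city(city, ports):
    """Stricter variant: compare against the city part of 'CITY, ST'."""
    if city is None:
        return None
    for code, (label,) in ports.items():
        if label.split(",")[0].strip().lower() == city.strip().lower():
            return code
    return None

print(port_by_substring("Anchorage", ports))  # ANC
print(port_by_substring("York", ports))       # NYC (a false positive)
print(port_by_exact_city("York", ports))      # None
print(port_by_exact_city("New York", ports))  # NYC
```

With substring matching, any temperature city whose name appears inside a US port label (for example a city called York) is assigned that port, which can attach foreign temperatures to US ports.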


In [12]:
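#clean temp data
ftemp_path = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temp = spark.read.format("csv").option("header", "true").load(ftemp_path)
# drop missing readings: empty CSV fields load as null, and literal 'NaN' strings are excluded too
df_temp_clean = df_temp.where(col('AverageTemperature').isNotNull() & (col('AverageTemperature') != 'NaN'))
# keep one (arbitrary) row per (Country, City) pair
df_temp_clean = df_temp_clean.dropDuplicates(['Country', 'City'])
df_temp_clean = df_temp_clean.withColumn('i94port', convert_i94port_code(df_temp_clean.City))
# the UDF returns null when no valid port matches the city
df_temp_clean = df_temp_clean.filter(col('i94port').isNotNull())
# df_temp_clean.select('i94port','City').show(10)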


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

#### I94 Immigration Dimension Table
* i94_year = year
* i94_month = month
* i94_city = origin code (country of citizenship, from i94cit)
* i94port = destination city code (port of entry)
* arrival_date = arrival date
* depart_date = departure date
* i94_visa = purpose of visit (visa category)

#### Temperature Dimension Table
* i94port = destination city code
* AverageTemperature = average temperature
* City = city name
* Country = country name

#### Fact Table (I94 Immigration and Temperature Dimension Tables joined on i94port column)
* i94_year = year
* i94_month = month
* i94_city = origin code (country of citizenship, from i94cit)
* i94port = destination city code (port of entry)
* arrival_date = arrival date
* depart_date = departure date
* i94_visa = purpose of visit (visa category)
* AverageTemperature = average temperature
* City = city name
* Country = country name
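Because the fact table is built with an INNER JOIN on i94port, immigration rows whose port has no matched temperature city are dropped, and a port matched by several temperature cities repeats each immigration row once per city. The pandas sketch below uses hypothetical rows to show both effects; `merge(how='inner')` behaves like the SQL INNER JOIN used in Step 4.

```python
import pandas as pd

# Hypothetical rows shaped like the two dimension tables
im_dim = pd.DataFrame({"i94port": ["ATL", "NYC", "XXX"],
                       "i94visa": [2.0, 1.0, 2.0]})
temp_dim = pd.DataFrame({"i94port": ["ATL", "NYC", "NYC"],
                         "City": ["Atlanta", "New York", "York"],
                         "AverageTemperature": [15.0, 10.0, 9.0]})

# Inner join on the shared key: unmatched ports vanish, duplicate keys fan out
fact = im_dim.merge(temp_dim, on="i94port", how="inner")
print(len(fact))                  # 3
print(fact["i94port"].tolist())   # ['ATL', 'NYC', 'NYC']
```

Here the single NYC arrival appears twice in the fact table, once per matched city, which would inflate any arrival counts per port.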

#### 3.2 Mapping Out Data Pipelines
1. Read the I94 immigration and temperature data into Spark DataFrames
2. Clean the I94 immigration and temperature data
3. Create the I94 immigration and temperature dimension tables
4. Create the immigration/temperature fact table by joining the two dimension tables
5. Perform data quality checks on each table

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

1. Read the I94 immigration and temperature data into Spark DataFrames


In [None]:
# read I94 into spark dataframe
fpath_im = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df_im_spark = spark.read.format('com.github.saurfang.sas.spark').load(fpath_im)

# read Temperature into spark dataframe
fpath_temp = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temp_spark = spark.read.format("csv").option("header", "true").load(fpath_temp)

2. Clean I94 immigration and Temperature Data

In [None]:
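# clean I94 dataframe
valid_airport = {}
# Each matching line of valid.txt yields a (port code, city label) pair
port_pattern = re.compile(r"'(.*)'.*'(.*)'")
with open('valid.txt') as file:
    for line in file:
        match = port_pattern.search(line)
        if match is None:  # skip lines without a quoted code/label pair
            continue
        valid_airport[match[1]] = [match[2]]

valid_airpot_list = list(valid_airport.keys())
# keep only records whose port of entry is valid
df_im_clean = df_im_spark.filter(col('i94port').isin(valid_airpot_list))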


In [None]:
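#UDF to convert city name into i94port code
@udf()
def convert_i94port_code(city):
    """Return the first valid i94port code whose label contains `city`, else None."""
    if city is None:  # null City values would otherwise raise AttributeError
        return None
    for key in valid_airport:
        if city.lower() in valid_airport[key][0].lower():
            return key
    return None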

In [None]:
# clean Temperature dataframe
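# drop missing readings: empty CSV fields load as null, and literal 'NaN' strings are excluded too
df_temp_clean = df_temp_spark.where(col('AverageTemperature').isNotNull() & (col('AverageTemperature') != 'NaN'))
# keep one (arbitrary) row per (Country, City) pair
df_temp_clean = df_temp_clean.dropDuplicates(['Country', 'City'])
df_temp_clean = df_temp_clean.withColumn('i94port', convert_i94port_code(df_temp_clean.City))
# the UDF returns null when no valid port matches the city
df_temp_clean = df_temp_clean.filter(col('i94port').isNotNull())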


3. Create the I94 immigration and temperature dimension tables


In [None]:
im_dim = df_im_clean.select(["i94yr", "i94mon", "i94cit", "i94port", "arrdate", "depdate", "i94visa"])
temp_dim = df_temp_clean.select(["i94port", "AverageTemperature", "City", "Country"])

im_dim.createOrReplaceTempView("i94_immigration")
temp_dim.createOrReplaceTempView("temperature")

4. Create Immigration/Temperature Fact table by joining the two dimension tables created

In [None]:
# The view exposes the original column names (i94cit, i94port), so select those
i94_temp_fact = spark.sql("""
    SELECT i94_immigration.i94yr    AS i94_year,
           i94_immigration.i94mon   AS i94_month,
           i94_immigration.i94cit   AS i94_city,
           i94_immigration.i94port  AS i94port,
           i94_immigration.arrdate  AS arrival_date,
           i94_immigration.depdate  AS depart_date,
           i94_immigration.i94visa  AS i94_visa,
           temperature.AverageTemperature AS AverageTemperature,
           temperature.City         AS city,
           temperature.Country      AS country
    FROM i94_immigration
    INNER JOIN temperature
        ON i94_immigration.i94port = temperature.i94port
""")

In [None]:
i94_temp_fact.write.mode('append').partitionBy('i94port').parquet('/res/i94_temp_fact.parquet')

#### 4.2 Data Quality Checks
To make sure the pipeline ran as expected, we run a source/count check: each dimension table and the fact table must contain at least one record. Further checks that would fit this pipeline include:
 * Integrity constraints (e.g., non-null join keys, unique keys, expected data types)
 * Unit tests for the cleaning functions, such as the i94port matching UDF
Run Quality Checks

In [None]:
def data_quality(df):
    # cache() is a method and must be called before count()
    df_count = df.cache().count()
    if df_count > 0:
        print(f'Data quality passed! {df_count} records')
    else:
        print('Data quality failed! 0 records')

In [None]:
data_quality(im_dim)
data_quality(temp_dim)
data_quality(i94_temp_fact)
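The row-count check only catches empty tables. Below is a sketch of integrity checks such as non-null and unique keys, written against small pandas DataFrames so it can run without Spark; the helper names are hypothetical. In Spark the same tests map to `df.filter(col(c).isNull()).count()`, `df.filter(~col(c).isin(allowed)).count()`, and comparing `df.count()` with `df.dropDuplicates(cols).count()`.

```python
import pandas as pd

def check_not_null(df, column):
    """Raise if any value in `column` is missing."""
    missing = int(df[column].isna().sum())
    if missing:
        raise ValueError(f"{column}: {missing} null value(s)")

def check_values_in(df, column, allowed):
    """Raise if `column` holds a non-null value outside `allowed`."""
    bad = sorted(set(df[column].dropna()) - set(allowed))
    if bad:
        raise ValueError(f"{column}: unexpected values {bad}")

def check_unique(df, columns):
    """Raise if the combination of `columns` is not unique."""
    dupes = int(df.duplicated(subset=columns).sum())
    if dupes:
        raise ValueError(f"{columns}: {dupes} duplicate row(s)")

# Hypothetical sample shaped like temp_dim
sample = pd.DataFrame({"i94port": ["ATL", "NYC"],
                       "City": ["Atlanta", "New York"],
                       "Country": ["United States", "United States"]})
check_not_null(sample, "i94port")
check_values_in(sample, "i94port", ["ATL", "NYC", "WAS"])
check_unique(sample, ["Country", "City"])
print("all checks passed")
```

Raising `ValueError` rather than using `assert` keeps the checks active even when Python runs with assertions disabled.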

#### 4.3 Data dictionary 
#### I94 Immigration Dimension Table
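* i94_year = year (i94yr)
* i94_month = month (i94mon)
* i94_city = origin code: country of citizenship (i94cit)
* i94port = destination port-of-entry code (i94port)
* arrival_date = arrival date as a SAS date value, i.e. days since 1960-01-01 (arrdate)
* depart_date = departure date as a SAS date value (depdate)
* i94_visa = purpose of visit: 1 = Business, 2 = Pleasure, 3 = Student (i94visa)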

#### Temperature Dimension Table
* i94port = destination port-of-entry code, derived from the city name
* AverageTemperature = average temperature
* City = city name
* Country = country name

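#### Fact Table (I94 Immigration and Temperature Dimension Tables joined on i94port column)
* i94_year = year (i94yr)
* i94_month = month (i94mon)
* i94_city = origin code: country of citizenship (i94cit)
* i94port = destination port-of-entry code (join key)
* arrival_date = arrival date as a SAS date value (arrdate)
* depart_date = departure date as a SAS date value (depdate)
* i94_visa = purpose of visit (i94visa)
* AverageTemperature = a monthly average temperature reading, in °C, for the matched city
* City = city name from the temperature dataset
* Country = country name from the temperature dataset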

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.

We used Spark for our transformations because it can handle many types of data files. In this project we work with SAS, CSV, and Parquet files, plus a text file of valid port codes. We used Spark SQL, powered by the Spark engine, as the SQL interface to create our data warehouse.

* Propose how often the data should be updated and why.
The I94 data is published as one file per month (partitioned by month-year), so the data should be updated in a monthly batch.
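A monthly batch needs the path of each month's file. Assuming every month follows the naming of the April file read in Step 1 (`i94_apr16_sub.sas7bdat`), a hypothetical helper could build the path for any month; the pattern is an assumption for months other than April.

```python
# Assumed naming pattern, taken from the April file: i94_<mon><yy>_sub.sas7bdat
MONTHS = ["jan", "feb", "mar", "apr", "may", "jun",
          "jul", "aug", "sep", "oct", "nov", "dec"]
BASE_DIR = "../../data/18-83510-I94-Data-2016"

def monthly_i94_path(month, year=2016, base_dir=BASE_DIR):
    """Build the path of the I94 file for a 1-based month number."""
    if not 1 <= month <= 12:
        raise ValueError(f"month must be 1-12, got {month}")
    return f"{base_dir}/i94_{MONTHS[month - 1]}{year % 100:02d}_sub.sas7bdat"

print(monthly_i94_path(4))
# ../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat
```

Each monthly run would then read only `monthly_i94_path(m)` and append its output, which fits the `mode('append')` write used for the fact table.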

* Write a description of how you would approach the problem differently under the following scenarios:
 1. The data was increased by 100x.
     * Use a cloud data warehouse such as Amazon Redshift or Snowflake. A benefit of Snowflake is that compute and storage are decoupled, so performance and cost can be optimized independently. A data warehouse also lets us run heavier workloads and transformations.

 2. The data populates a dashboard that must be updated on a daily basis by 7am every day.
     * Use Airflow as an orchestrator to schedule the daily batch job early enough that it finishes before 7am, and alert if it has not completed by then.
 3. The database needed to be accessed by 100+ people.
     * Use a data warehouse such as Amazon Redshift or Snowflake. Because Snowflake decouples compute from storage, separate compute clusters can serve many concurrent users over the same data.