Study of Immigration and Temperature Data
### Data Engineering Capstone Project


Scope the Project and Gather Data

## Scope

The goal is to create an ETL pipeline using the Udacity-provided I94 immigration dataset and the city temperature data from Kaggle, so that users can query the result and assess whether destination temperature correlates with immigration statistics.

## Potential Consumers of this Data Model

Data analysts and data scientists are the primary consumers of this data model. They can apply exploration techniques (data mining) to understand the potential correlation between destination temperature and immigration statistics.

This data model could also be integrated into a data visualization tool (e.g. Power BI, Tableau), where BI analysts could apply different exploration techniques.



#### Project Summary
In this project, we look at the immigration data, focusing on the following hypotheses:

-> The effect of temperature on the volume of travellers, and the correlation between the two;

-> Travel seasonality


The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [31]:
# Do all imports and installs here
import pandas as pd, re
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf



### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use? etc.

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

In [3]:
# Read April 2016 I94 immigration data into Pandas for exploration
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")

In [4]:
df.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


In [5]:
# Read temperature data into Pandas for exploration
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df = pd.read_csv(fname, sep=',')

In [6]:
# Display first few entries of temperature data
df.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [7]:
# Create Spark session 
from pyspark.sql import SparkSession
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()
df_spark =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [8]:
# Read Airports location into Pandas for exploration
fname = 'airport_location.csv'
df = pd.read_csv(fname, sep=',')

In [9]:
# Display first few entries of Airports Location Info
df.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

In [10]:
# I94 immigration data quality assessment
# City airport codes are cleaned using the txt file with airport_codes.txt (source:https://en.wikipedia.org/wiki/IATA_airport_code)


# Create dictionary of valid i94port codes
re_obj = re.compile(r'\'(.*)\'.*\'(.*)\'')
port_valid = {}
with open('airport_codes.txt') as f:
    for line in f:
        match = re_obj.search(line)
        # Skip lines that do not contain a quoted code/name pair
        if match:
            port_valid[match[1]] = [match[2]]

def clean_i94_data(file):
    '''
    Input: Path to I94 immigration data file
    
    Output: Spark dataframe of I94 immigration data with valid airport codes
    
    '''
    
    # Read I94 data into Spark
    df_immigration = spark.read.format('com.github.saurfang.sas.spark').load(file)

    # Filter out entries where i94port is invalid
    df_immigration = df_immigration.filter(df_immigration.i94port.isin(list(port_valid.keys())))

    return df_immigration
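
The regular expression in the cell above expects each line of `airport_codes.txt` to contain two single-quoted strings: the port code, then the port name. The file's exact layout is not shown in this notebook, so the sample lines below are an assumption, and `parse_port_line` is a hypothetical helper name. A minimal sketch of how the pattern behaves, including on a line that does not match:

```python
import re

# Same pattern as in the cell above: two single-quoted groups on one line.
PORT_LINE = re.compile(r'\'(.*)\'.*\'(.*)\'')

def parse_port_line(line):
    """Return (code, name) for a matching line, or None otherwise."""
    match = PORT_LINE.search(line)
    if match is None:
        return None
    # Strip padding that may surround the values inside the quotes
    # (the stripping is an addition, not part of the original cell).
    return match.group(1).strip(), match.group(2).strip()

# Hypothetical sample lines; the real file layout is an assumption.
sample_lines = [
    "   'ATL'\t=\t'ATLANTA, GA           '",
    "   'NYC'\t=\t'NEW YORK, NY          '",
    "",  # blank line: no match, so it is skipped
]

port_lookup = {}
for line in sample_lines:
    parsed = parse_port_line(line)
    if parsed is not None:
        code, name = parsed
        port_lookup[code] = name

print(port_lookup)
```

Returning `None` for non-matching lines keeps a stray header or blank line from raising an error while the dictionary is built.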


In [11]:
#Test function
immigration_test_file = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat' 
df_immigration_test = clean_i94_data(immigration_test_file)
df_immigration_test.show()

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SEO| null|      G|   null|      Y|   null| 1991.0|     D/S|     M|  null|   null|  3.73679633E9|00296|      F1|
| 15.0|2016.0|   4.0| 101.0| 101.0|    WAS|20545.0|    1.0|     MI|20691.0|  55.0|    2.0|  1.0|20160401|    nul

In [12]:
# Create a view of the immigration dataset in order to assess key attributes from this source
df_immigration_test.createOrReplaceTempView("immig_table")

In [13]:
df_immigration_test.count()

3088544

In [14]:
# Check that every airport code in the dataset is 3 characters long
spark.sql("""
SELECT LENGTH (i94port) AS len
FROM immig_table
GROUP BY len
""").show()

+---+
|len|
+---+
|  3|
+---+



In [15]:
#Temperature data quality assessment
df_temp=spark.read.format("csv").option("header", "true").load("../../data2/GlobalLandTemperaturesByCity.csv")

# Isolate and Filter out observations with NaN average temperature
df_temp=df_temp.filter(df_temp.AverageTemperature != 'NaN')

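# Keep one row per (City, Country); dropDuplicates retains an arbitrary row,
# so the temperature kept for each city comes from a single month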
df_temp=df_temp.dropDuplicates(['City', 'Country'])

@udf()
def get_i94port(city):
    '''
    Input: City name
    
    Output: Mapping with i94port attribute
    
    '''
    
    for key in port_valid:
        if city.lower() in port_valid[key][0].lower():
            return key

# Add the i94port code based on the city name
df_temp=df_temp.withColumn("i94port", get_i94port(df_temp.City))

# Remove entries with no i94port code
df_temp=df_temp.filter(df_temp.i94port != 'null')
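
The UDF above returns the first port whose name contains the city name as a substring, so a short city name can match an unrelated port. The sketch below mirrors that logic in plain Python and contrasts it with a stricter variant that compares against the text before the first comma of the port name. The dictionary contents, the input `"Kane"`, and the function names are hypothetical and only illustrate the difference; the `'CITY, STATE'` name layout is an assumption.

```python
# Hypothetical excerpt in the same shape as port_valid: code -> [name].
sample_ports = {
    "ONT": ["ONTARIO, CA"],
    "SEA": ["SEATTLE, WA"],
    "SPO": ["SPOKANE, WA"],
}

def match_port_substring(city, ports):
    """Mirror of the UDF logic: first port whose name contains the city."""
    for code, (name,) in ports.items():
        if city.lower() in name.lower():
            return code
    return None

def match_port_exact(city, ports):
    """Stricter variant: compare with the text before the first comma."""
    if city is None:
        return None
    wanted = city.strip().lower()
    for code, (name,) in ports.items():
        if name.split(",")[0].strip().lower() == wanted:
            return code
    return None

# "Kane" is a made-up input that shows a substring false positive.
print(match_port_substring("Kane", sample_ports))  # SPO ("kane" is inside "spokane")
print(match_port_exact("Kane", sample_ports))      # None
print(match_port_exact("Seattle", sample_ports))   # SEA
```

Whether the stricter rule is appropriate depends on how port names are written in `airport_codes.txt`, so it is worth comparing the row counts produced by both rules before switching.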


In [16]:
# Test Quality assessment function
df_temp.take(20)

[Row(dt='1852-07-01', AverageTemperature='15.488', AverageTemperatureUncertainty='1.395', City='Perth', Country='Australia', Latitude='31.35S', Longitude='114.97E', i94port='PER'),
 Row(dt='1828-01-01', AverageTemperature='-1.977', AverageTemperatureUncertainty='2.551', City='Seattle', Country='United States', Latitude='47.42N', Longitude='121.97W', i94port='SEA'),
 Row(dt='1743-11-01', AverageTemperature='2.767', AverageTemperatureUncertainty='1.905', City='Hamilton', Country='Canada', Latitude='42.59N', Longitude='80.73W', i94port='HAM'),
 Row(dt='1849-01-01', AverageTemperature='7.399999999999999', AverageTemperatureUncertainty='2.699', City='Ontario', Country='United States', Latitude='34.56N', Longitude='116.76W', i94port='ONT'),
 Row(dt='1821-11-01', AverageTemperature='2.322', AverageTemperatureUncertainty='2.375', City='Spokane', Country='United States', Latitude='47.42N', Longitude='117.24W', i94port='SPO'),
 Row(dt='1843-01-01', AverageTemperature='18.874000000000002', Averag

In [17]:
# Count the distinct countries in the temperature data
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temperature = pd.read_csv(fname)
df_temperature['Country'].nunique()

159

In [18]:
# Check for null values.
df_temperature.isnull().sum()

dt                                    0
AverageTemperature               364130
AverageTemperatureUncertainty    364130
City                                  0
Country                               0
Latitude                              0
Longitude                             0
dtype: int64

In [19]:
df_temperature[df_temperature.AverageTemperature.isnull()]

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E
9,1744-08-01,,,Århus,Denmark,57.05N,10.33E
18,1745-05-01,,,Århus,Denmark,57.05N,10.33E
19,1745-06-01,,,Århus,Denmark,57.05N,10.33E
20,1745-07-01,,,Århus,Denmark,57.05N,10.33E
21,1745-08-01,,,Århus,Denmark,57.05N,10.33E
22,1745-09-01,,,Århus,Denmark,57.05N,10.33E


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

Map out the conceptual data model and explain why you chose that model

3.1 Data Model - concept

3.1.1 Dimension tables

The first dimension table will contain events from the I94 immigration data. The attributes will be extracted from the immigration dataframe:

Attribute Name       Description

i94yr             =  Year (4 digits)
i94mon            =  Numeric month
i94cit            =  Origin: country of citizenship (3-digit code)
i94port           =  Destination city (3-character port code)
arrdate           =  Date of arrival
i94mode           =  Travel mode code (1 digit)
depdate           =  Date of departure
i94visa           =  Immigration reason
visatype          =  Type of visa
gender            =  Gender

The second dimension table will contain city temperature data. The attributes will be extracted from the temperature dataframe:

Attribute Name       Description

i94port             = Destination city (mapped from immigration data during cleanup step) - 3 char code
AverageTemperature  = average temperature
City                = Name of the city
Country             = Name of the country
Latitude            = Latitude
Longitude           = Longitude


dim_time_table

Collect all distinct arrival and departure dates from the immigration dataset, extract year, month, day, week, weekday and day of year from each date, and insert the values into dim_time_table.

The table is written to Parquet.

3.1.2 Fact table

The fact table will contain information from the I94 immigration data joined with the city temperature data on i94port:

Attribute Name       Description

i94yr              = Year (4 digits)
i94mon             = Numeric month
i94cit             = Origin: country of citizenship (3-digit code)
i94port            = Destination city (3-character port code)
arrdate            = Date of arrival
i94mode            = Travel mode code (1 digit)
depdate            = Date of departure
i94visa            = Immigration reason
AverageTemperature = Average temperature of destination city
visatype           = Type of visa

The tables will be saved to Parquet files partitioned by city (i94port attribute) except dim_time_table.


#### 3.2 Mapping Out Data Pipelines

List the steps necessary to pipeline the data into the chosen data model

The pipeline steps are described below:

1. Initiate quality assessment for I94 attribute data as described in step 2 to create Spark dataframe df_immigration for each month;

2. Initiate quality assessment for temperature data as described in step 2 to create Spark dataframe df_temp (already performed);

3. Create immigration dimension table by selecting relevant columns from df_immigration and write to parquet file partitioned by i94port;

4. Create temperature dimension table by selecting relevant columns from df_temp and write to parquet file partitioned by i94port;

5. Create time dimension table by extracting year, month, day, week from the dates (arrival and departure dates in the immigration data) and write to parquet file;

6. Create fact table by joining immigration and temperature dimension tables on i94port and write to parquet file partitioned by i94port;

### Step 4: Run ETL to Model the Data
#### 4.1 Create the data model

In [20]:

# Path to I94 immigration data 
immigration_data = '/data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'

# Initiate quality assessment for I94 attribute data and store as Spark dataframe
df_immigration = clean_i94_data(immigration_data)

# Extract columns for immigration dimension table
dim_immigration_table = df_immigration.select(["i94yr", "i94mon", "i94cit", "i94port", "arrdate", "i94mode", "depdate", "i94visa", "visatype", "Gender"])

# Write immigration dimension table to parquet files partitioned by i94port
dim_immigration_table.write.mode("append").partitionBy("i94port").parquet("/results/immigration.parquet")

In [21]:
# Extract columns for temperature dimension table
dim_temp_table = df_temp.select(["AverageTemperature", "City", "Country", "Latitude", "Longitude", "i94port"])

# Write temperature dimension table to parquet files partitioned by i94port
dim_temp_table.write.mode("append").partitionBy("i94port").parquet("/results/temperature.parquet")

In [22]:
# Create temporary views of the immigration and temperature data
df_immigration.createOrReplaceTempView("immig_view")
df_temp.createOrReplaceTempView("temp_view")

In [23]:
# Drop rows where the gender value is missing or undefined
spark.sql("""SELECT * FROM immig_view WHERE gender IN ('F', 'M')""").createOrReplaceTempView("immig_view")

In [24]:
# Convert the arrival dates (SAS day counts since 1960-01-01) into usable dates
spark.sql("""SELECT *, CASE 
                        WHEN arrdate >= 1.0 THEN date_add(to_date('1960-01-01'), arrdate)
                        WHEN arrdate IS NULL THEN NULL
                        ELSE 'N/A' END AS arrival_date 
                        
                FROM immig_view""").createOrReplaceTempView("immig_view")

In [25]:
# convert the departure dates into a useable value
spark.sql("""SELECT *, CASE 
                        WHEN depdate >= 1.0 THEN date_add(to_date('1960-01-01'), depdate)
                        WHEN depdate IS NULL THEN NULL
                        ELSE 'N/A' END AS departure_date 
                        
                FROM immig_view""").createOrReplaceTempView("immig_view")
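
`arrdate` and `depdate` are stored as SAS date values, that is, day counts from 1960-01-01, which is why the two queries above add them to that date. A plain-Python equivalent can be handy for spot-checking individual values from the sample output; `sas_days_to_date` is a hypothetical helper name, not part of the pipeline.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS dates count days from this day

def sas_days_to_date(days):
    """Convert a SAS day count (possibly a float) to a date; None stays None."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))

# Values taken from the arrdate/depdate columns shown earlier.
print(sas_days_to_date(20545.0))  # 2016-04-01
print(sas_days_to_date(20691.0))  # 2016-08-25
print(sas_days_to_date(None))     # None
```

The first value falls in April 2016, which agrees with the `i94_apr16_sub` file being the April 2016 extract.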

In [26]:

# extract all distinct dates from arrival and departure dates to create dimension table
dim_time = spark.sql("""
SELECT DISTINCT arrival_date AS date
FROM immig_view
UNION
SELECT DISTINCT departure_date AS date
FROM immig_view
WHERE departure_date IS NOT NULL
""")
dim_time.createOrReplaceTempView("dim_time_table")


In [27]:
# Extract year, month, day, week of year, day of week and day of year from each date for the dim_time table
dim_time = spark.sql("""
SELECT date, YEAR(date) AS year, MONTH(date) AS month, DAY(date) AS day, WEEKOFYEAR(date) AS week, DAYOFWEEK(date) as weekday, DAYOFYEAR(date) year_day
FROM dim_time_table
ORDER BY date ASC
""")

In [29]:
#Saving the data in parquet format
dim_time.write.parquet("/results/dim_time.parquet")
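
The date parts stored in `dim_time` can be reproduced with Python's standard library, which is useful when unit testing a few rows of the dimension. The sketch assumes Spark's documented conventions: `WEEKOFYEAR` follows ISO 8601 weeks and `DAYOFWEEK` numbers Sunday as 1 through Saturday as 7. `time_dimension_row` is a hypothetical helper name.

```python
from datetime import date

def time_dimension_row(d):
    """Build one dim_time row using the same column names as the query."""
    _, iso_week, iso_weekday = d.isocalendar()  # iso_weekday: Monday=1 ... Sunday=7
    return {
        "date": d,
        "year": d.year,
        "month": d.month,
        "day": d.day,
        "week": iso_week,                # ISO 8601 week number
        "weekday": iso_weekday % 7 + 1,  # Sunday=1 ... Saturday=7
        "year_day": d.timetuple().tm_yday,
    }

row = time_dimension_row(date(2016, 4, 1))
print(row)
```

For 2016-04-01 (a Friday) this gives week 13, weekday 6 and day of year 92, which can be compared against the corresponding row in the Parquet output.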

In [30]:

# Create the fact table by joining the immigration and temperature views
fact_table = spark.sql('''
SELECT immig_view.i94yr as year,
       immig_view.i94mon as month,
       immig_view.i94cit as city,
       immig_view.i94port as i94port,
       immig_view.arrival_date as arrival_date,
       immig_view.departure_date as departure_date,
       immig_view.i94visa as reason,
       immig_view.Gender as Gender,
       temp_view.AverageTemperature as temperature,
       temp_view.Latitude as latitude,
       temp_view.Longitude as longitude
FROM immig_view
JOIN temp_view ON (immig_view.i94port = temp_view.i94port)
''')

# Write fact table to parquet files partitioned by i94port
fact_table.write.mode("append").partitionBy("i94port").parquet("/results/fact.parquet")
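
The `JOIN` in the query above is an inner join, so immigration rows whose `i94port` has no matching temperature row do not reach the fact table. The plain-Python sketch below illustrates that effect on toy rows. The immigration rows and `inner_join_on_port` are hypothetical; only the Seattle temperature value is copied from the sample output shown earlier.

```python
# Toy rows; the immigration rows are illustrative, not pipeline output.
immigration_rows = [
    {"i94port": "SEA", "i94mon": 4},
    {"i94port": "SEA", "i94mon": 4},
    {"i94port": "ATL", "i94mon": 4},  # no temperature entry for ATL below
]
temperature_by_port = {"SEA": -1.977}

def inner_join_on_port(immigration, temperature):
    """Keep only immigration rows whose i94port has a temperature value."""
    joined = []
    for row in immigration:
        port = row["i94port"]
        if port in temperature:
            joined.append({**row, "temperature": temperature[port]})
    return joined

fact_rows = inner_join_on_port(immigration_rows, temperature_by_port)
dropped = len(immigration_rows) - len(fact_rows)
print(len(fact_rows), "rows kept,", dropped, "dropped")
```

Comparing the fact table's row count with the immigration table's row count shows how many arrivals are lost this way, which matters when interpreting traveller volumes.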

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [32]:
# Perform quality checks here

def quality_check(df, description):
    '''
    Input:  Spark dataframe and a description of it
    
    Output: Prints the data quality check result and returns 0
    
    '''
    
    result = df.count()
    if result == 0:
        print("Data quality check failed for {} with zero records".format(description))
    else:
        print("Data quality check passed for {} with {} records".format(description, result))
    return 0

# Perform data quality check
quality_check(df_immigration, "immigration table")
quality_check(df_temp, "temperature table")

Data quality check passed for immigration table with 3088544 records
Data quality check passed for temperature table with 207 records


0
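
The check above prints its verdict and always returns 0, so a scheduled run would continue even when a table is empty. One possible variant raises an exception instead and adds a uniqueness check on a key column. The sketch is written against plain lists of dictionaries so that it runs without Spark; the function names and toy rows are hypothetical. With Spark dataframes the same idea could be expressed by comparing `df.count()` with `df.select(key).distinct().count()`.

```python
def check_not_empty(rows, description):
    """Raise if a table has no records; return the record count otherwise."""
    count = len(rows)
    if count == 0:
        raise ValueError(
            "Data quality check failed for {}: zero records".format(description)
        )
    return count

def check_unique_key(rows, key, description):
    """Raise if any value of `key` is missing or appears more than once."""
    values = [row.get(key) for row in rows]
    if any(value is None for value in values):
        raise ValueError("{}: missing values in key column {}".format(description, key))
    if len(set(values)) != len(values):
        raise ValueError("{}: duplicate values in key column {}".format(description, key))
    return len(values)

# Toy stand-in for the temperature dimension (one row per i94port).
temperature_rows = [
    {"i94port": "PER", "City": "Perth"},
    {"i94port": "SEA", "City": "Seattle"},
]

print(check_not_empty(temperature_rows, "temperature table"))
print(check_unique_key(temperature_rows, "i94port", "temperature table"))
```

Raising on failure lets an orchestrator mark the run as failed rather than relying on someone reading the printed message.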

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

### Step 5: Complete Project Write Up

Purpose of the data model:

The purpose of this data model is to provide a consistent source of information from which different analytics teams could carry out the following:

-> Data exploration (clusters, correlations, retrospective analysis);
-> Finding potential correlations between destination temperature and immigration statistics;
-> Travel seasonality analysis;
-> Understanding traveller behaviour and building classifications;

The primary consumers of this data model are:

Data scientists;
Data analysts;
BI analysts;

Data is saved in Parquet, which has the following advantages:
-> Size efficiency: the columnar, compressed format keeps storage manageable as the data grows;
-> Reduced I/O: queries read only the columns they need;

The data model could be connected to a data visualization tool such as Power BI or Tableau.




* Clearly state the rationale for the choice of tools and technologies for the project.

The main purpose of this data model is to be a consistent and efficient source of data for consumers such as data scientists, data analysts or BI analysts, for the following use cases:

1. Data exploration (correlations using attributes from the fact table, e.g. temperature and arrival date or destination city)

    1.1 Applying clustering techniques to assess geographical clusters across different continuous/discrete variables (temperature, reason etc.) - the consumer of the data model needs to run a query that aggregates data at destination city level;
    
2. Travel seasonality - another potential use case that an analytics team could cover with this data model is assessing travel seasonality by looking at the frequency of flights over the year;

    -> The analytics team could take a long period of time (3-5 years) in order to draw consistent conclusions;
    -> The analytics team needs to query complete years and use month or day (depending on the granularity of the seasonality analysis);

3. Traveller behaviour - the analytics team could run queries to characterize traveller behaviour;

    -> In this regard they need to take into consideration explanatory variables such as gender, immigration reason, origin (i94cit) etc.
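
For the correlation use case, a consumer would typically aggregate the fact table to one row per destination (for example arrivals and temperature per `i94port`) and then compute a correlation coefficient. The sketch below is a plain-Python Pearson correlation, shown as one possible measure rather than the method this project prescribes. The input numbers are synthetic values chosen only to verify the function, not results from the data, and `pearson_r` is a hypothetical helper name.

```python
from math import sqrt

def pearson_r(xs, ys):
    """Pearson correlation coefficient of two equally long sequences."""
    n = len(xs)
    if n != len(ys) or n < 2:
        raise ValueError("need two sequences of equal length >= 2")
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    if var_x == 0 or var_y == 0:
        raise ValueError("correlation is undefined for a constant sequence")
    return cov / sqrt(var_x * var_y)

# Synthetic, perfectly linear values used only to check the function.
temperatures = [5.0, 10.0, 15.0, 20.0]
arrivals = [100, 200, 300, 400]

print(pearson_r(temperatures, arrivals))
print(pearson_r(temperatures, list(reversed(arrivals))))
```

On real aggregates the coefficient only indicates linear association; it does not by itself show that temperature drives traveller volume.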
    
Technologies used:

Spark was used as the main tool for creating the data model for the reasons below:

-> The immigration dataset holds a considerable amount of data (~3 million rows for a single month), which is combined with the temperature data;

-> We need to be able to keep processing data over a longer period of time as volumes grow;

-> We want resilience in case we integrate new external sources;

-> It eases the work of creating new tables derived from existing tables (dim_time_table);

-> It can process multiple queries quickly, which makes exploring, quality-assessing and creating the data model time-efficient;

-> Spark eases our work in handling multiple formats (such as the SAS format)

* Propose how often the data should be updated and why.

We stated at the beginning of this project that we were interested in:

-> The effect of temperature on the volume of travellers, and the correlation between the two;

-> Travel seasonality

Neither of these phenomena requires rapid updates to our data. To cover the needs of this study, the data could be updated monthly or quarterly.


* Write a description of how you would approach the problem differently under the following scenarios:

 * The data was increased by 100x.
 
Our data would be stored in an Amazon S3 bucket (instead of storing it in the EMR cluster along with the staging tables) and loaded to our staging tables. We would still use Spark as our data processing platform, since it is well suited to very large datasets.

 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 
 In this case we can use a batch job orchestration tool such as Apache Airflow to schedule the ETL and the data quality validation.
 
 * The database needed to be accessed by 100+ people.
 
 Once the data is ready to be consumed, it would be stored in Amazon Redshift, a PostgreSQL-compatible data warehouse that supports multi-user access.