# Project Title
### Data Engineering Capstone Project

#### Project Summary
--describe your project at a high level--

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

Please refer to https://github.com/stelioshalk/ETL-pipeline-for-I94-immigration-data


In [21]:
# Do all imports and installs here
#!pip install pyspark --upgrade
import pandas as pd
import os
from pyspark.sql import SparkSession
import configparser
from subprocess import call
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = "/opt/conda/bin:/opt/spark-2.4.3-bin-hadoop2.7/bin:/opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin"
os.environ["SPARK_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
os.environ["HADOOP_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
config = configparser.ConfigParser()
config.read('dl.cfg')
os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']


#create the csv files from the txt files.
#call(["python", "generate_csv.py"])

spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()



### Step 1: Scope the Project and Gather Data

#### Scope 
This project designs a data lake ETL solution that automates the cleansing and processing of the I-94 Immigration Dataset. 
Additionally, the Global Temperature Dataset and the U.S. City Demographic Dataset are used.

#### Describe and Gather Data 
##### I-94 Dataset
For decades, U.S. immigration officers issued the I-94 Form (Arrival/Departure Record) to foreign visitors (e.g., business visitors, tourists and foreign students) who lawfully entered the United States. The I-94 was a small white paper form that a foreign visitor received from cabin crews on arrival flights and from U.S. Customs and Border Protection at the time of entry into the United States. It listed the traveler's immigration category, port of entry, date of entry into the United States and status expiration date, and had a unique 11-digit identifying number assigned to it. Its purpose was to record the traveler's lawful admission to the United States.

This data is stored as a set of SAS7BDAT files. SAS7BDAT is a database storage file created by Statistical Analysis System (SAS) software to store data. It contains binary encoded datasets used for advanced analytics, business intelligence, data management, predictive analytics, and more. The SAS7BDAT file format is the main format used to store SAS datasets. The immigration data is partitioned into monthly SAS files. The data provided represents 12 months of data for the year 2016. This is the main dataset used in the project.



###### The I-94 Dataset is stored as parquet files in the sas_data folder.

In [3]:
# Read in the data here
print("Sample data of the I-94 Dataset. The dataset was read using Spark:")
spark = SparkSession.builder.getOrCreate()
df_spark =spark.read.load('./sas_data')

df_spark.head(5)

Sample data of the I-94 Dataset. The dataset was read using Spark:


[Row(cicid=5748517.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='CA', depdate=20582.0, i94bir=40.0, i94visa=1.0, count=1.0, dtadfile='20160430', visapost='SYD', occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear=1976.0, dtaddto='10292016', gender='F', insnum=None, airline='QF', admnum=94953870030.0, fltno='00011', visatype='B1'),
 Row(cicid=5748518.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='NV', depdate=20591.0, i94bir=32.0, i94visa=1.0, count=1.0, dtadfile='20160430', visapost='SYD', occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear=1984.0, dtaddto='10292016', gender='F', insnum=None, airline='VA', admnum=94955622830.0, fltno='00007', visatype='B1'),
 Row(cicid=5748519.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='WA', depdate=20582.0, i94bir=29.
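
The `arrdate` and `depdate` values in the sample rows are SAS date numbers, which count days from 1 January 1960. A minimal sketch of converting them to calendar dates follows; the helper name `sas_to_date` is illustrative and not part of the project code.

```python
from datetime import date, timedelta

# SAS stores dates as the number of days since 1960-01-01.
SAS_EPOCH = date(1960, 1, 1)

def sas_to_date(sas_days):
    """Convert a SAS date number (e.g. 20574.0) to a datetime.date; None stays None."""
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))

# arrdate and depdate of the first sample row
print(sas_to_date(20574.0), sas_to_date(20582.0))
```

Keeping `None` as `None` matters here because fields such as `depdate` can be missing for visitors who have not yet departed.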

##### Global Temperature Dataset
The Berkeley Earth Surface Temperature Study has created a preliminary merged data set by combining 1.6 billion temperature reports from 16 preexisting data archives. 
The dataset is available here: https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data

In the original dataset from Kaggle, several files are available, but in this capstone project we use only GlobalLandTemperaturesByState.csv.

The dataset does not provide temperatures for the year 2016. We use it as a dimension table linked to the immigration fact table for extracting average temperatures per country (I94CIT and I94RES fields).

In [4]:
#run this if you want to view sample data
temperature_fname = 'csvfiles/GlobalLandTemperaturesByState.csv'
spark = SparkSession.builder.getOrCreate()
country_temperature_df=spark.read.csv(temperature_fname)
country_temperature_df.head(5)





[Row(_c0='dt', _c1='AverageTemperature', _c2='AverageTemperatureUncertainty', _c3='State', _c4='Country'),
 Row(_c0='1855-05-01', _c1='25.544', _c2='1.171', _c3='Acre', _c4='Brazil'),
 Row(_c0='1855-06-01', _c1='24.228', _c2='1.103', _c3='Acre', _c4='Brazil'),
 Row(_c0='1855-07-01', _c1='24.371', _c2='1.044', _c3='Acre', _c4='Brazil'),
 Row(_c0='1855-08-01', _c1='25.427', _c2='1.073', _c3='Acre', _c4='Brazil')]
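
Because the file was read without the `header` option, the column names appear as the first data row and every value is a string. As a small illustration of the kind of aggregation this dataset feeds, the sketch below averages the four sample rows shown above using only the standard library. The function name `average_by_state` is an assumption made for this example, and missing readings are assumed to be empty fields in the CSV.

```python
import csv
import io
from collections import defaultdict

# Header plus the four sample rows printed above.
SAMPLE = """dt,AverageTemperature,AverageTemperatureUncertainty,State,Country
1855-05-01,25.544,1.171,Acre,Brazil
1855-06-01,24.228,1.103,Acre,Brazil
1855-07-01,24.371,1.044,Acre,Brazil
1855-08-01,25.427,1.073,Acre,Brazil
"""

def average_by_state(text):
    """Return {(country, state): mean temperature}, skipping rows with no reading."""
    sums = defaultdict(lambda: [0.0, 0])
    for row in csv.DictReader(io.StringIO(text)):
        if not row["AverageTemperature"]:
            continue  # assumed: a missing measurement is an empty field
        entry = sums[(row["Country"], row["State"])]
        entry[0] += float(row["AverageTemperature"])
        entry[1] += 1
    return {key: total / count for key, (total, count) in sums.items()}

print(average_by_state(SAMPLE))
```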

##### The US Cities: Demographics Dataset
This dataset contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000. This data comes from the US Census Bureau's 2015 American Community Survey. This product uses the Census Bureau Data API but is not endorsed or certified by the Census Bureau. The dataset is available here: https://public.opendatasoft.com/explore/dataset/us-cities-demographics/information/

This dataset will be combined with port city data to provide ancillary demographic info for port cities.

In [6]:
#run this if you want to view sample data
demographics_fname='csvfiles/us-cities-demographics.csv'
spark = SparkSession.builder.getOrCreate()
demo_df=spark.read.csv(demographics_fname)
demo_df.head(5)

[Row(_c0='City;State;Median Age;Male Population;Female Population;Total Population;Number of Veterans;Foreign-born;Average Household Size;State Code;Race;Count'),
 Row(_c0='Silver Spring;Maryland;33.8;40601;41862;82463;1562;30908;2.6;MD;Hispanic or Latino;25924'),
 Row(_c0='Quincy;Massachusetts;41.0;44129;49500;93629;4147;32935;2.39;MA;White;58723'),
 Row(_c0='Hoover;Alabama;38.5;38040;46799;84839;4819;8229;2.58;AL;Asian;4759'),
 Row(_c0='Rancho Cucamonga;California;34.5;88127;87105;175232;5821;33878;3.18;CA;Black or African-American;24437')]
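
Each row above lands in a single `_c0` column because the file is semicolon-delimited and the reader defaulted to commas. The sketch below parses two of the sample lines with the standard `csv` module to show the effect of declaring the delimiter. It is only an illustration and uses no Spark.

```python
import csv
import io

# Header and first data line, copied from the sample output above.
SAMPLE = (
    "City;State;Median Age;Male Population;Female Population;Total Population;"
    "Number of Veterans;Foreign-born;Average Household Size;State Code;Race;Count\n"
    "Silver Spring;Maryland;33.8;40601;41862;82463;1562;30908;2.6;MD;Hispanic or Latino;25924\n"
)

# With the default comma delimiter the whole header line is a single field.
default_row = next(csv.reader(io.StringIO(SAMPLE)))

# Declaring the semicolon delimiter splits each line into its twelve columns.
rows = list(csv.DictReader(io.StringIO(SAMPLE), delimiter=";"))

print(len(default_row), len(rows[0]), rows[0]["City"], rows[0]["State Code"])
```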

##### Airports Data
The airport codes may refer to either the IATA airport code, a three-letter code used in passenger reservation, ticketing and baggage-handling systems, or the ICAO airport code, a four-letter code used by ATC systems and for airports that do not have an IATA airport code (from Wikipedia).

Airport codes from around the world, downloaded from the public domain source http://ourairports.com/data/, which compiled this data from multiple sources. The data is updated nightly. The definition of the dataset is described here: https://ourairports.com/help/data-dictionary.html
    

This dataset is in the airport-codes_csv.csv file. It is used for the dimension table "airports".

In [7]:
#run this if you want to view sample data
airports_fname='csvfiles/airport-codes_csv.csv'
spark = SparkSession.builder.getOrCreate()
airport_df=spark.read.csv(airports_fname)
airport_df.head(5)

[Row(_c0='ident', _c1='type', _c2='name', _c3='elevation_ft', _c4='continent', _c5='iso_country', _c6='iso_region', _c7='municipality', _c8='gps_code', _c9='iata_code', _c10='local_code', _c11='coordinates'),
 Row(_c0='00A', _c1='heliport', _c2='Total Rf Heliport', _c3='11', _c4='NA', _c5='US', _c6='US-PA', _c7='Bensalem', _c8='00A', _c9=None, _c10='00A', _c11='-74.93360137939453, 40.07080078125'),
 Row(_c0='00AA', _c1='small_airport', _c2='Aero B Ranch Airport', _c3='3435', _c4='NA', _c5='US', _c6='US-KS', _c7='Leoti', _c8='00AA', _c9=None, _c10='00AA', _c11='-101.473911, 38.704022'),
 Row(_c0='00AK', _c1='small_airport', _c2='Lowell Field', _c3='450', _c4='NA', _c5='US', _c6='US-AK', _c7='Anchor Point', _c8='00AK', _c9=None, _c10='00AK', _c11='-151.695999146, 59.94919968'),
 Row(_c0='00AL', _c1='small_airport', _c2='Epps Airpark', _c3='820', _c4='NA', _c5='US', _c6='US-AL', _c7='Harvest', _c8='00AL', _c9=None, _c10='00AL', _c11='-86.77030181884766, 34.86479949951172')]

#### Additional data
A data dictionary file for immigration was provided, which was split into the following files:

- i94cntyl.txt : I94CIT & I94RES - This format shows all the valid and invalid codes for processing
- i94prtl.txt  : I94PORT - This format shows all the valid and invalid codes for processing
- i94addrl.txt : I94ADDR - There are many invalid codes in this variable; the list contains the codes found to be valid, and everything else goes into 'other'
- i94model.txt : I94MODE - There are missing values as well as values marked as not reported
- I94VISA.txt  : I94VISA - Visa codes collapsed into three categories

### Step 2: Explore and Assess the Data
#### Explore the Data


#### Cleaning Steps
Document steps necessary to clean the data.

Initially we convert the additional data extracted as txt files from the I94_SAS_Labels_Descriptions.SAS file to csv files. 

Some invalid characters get removed during this process.

The script below generates three csv files in the /csvfiles/ folder by reading the source files from the /txtfiles/ folder.


In [9]:
#converts txt files to csvs
#run once to generate the csv files.

#process i94cntyl.txt
with open('txtfiles/i94cntyl.txt', 'r') as src, open('csvfiles/i94cntyl.csv', 'w') as dst:
    for line in src:
        parts = line.split("=")
        if len(parts) < 2:
            #report lines without a "code = label" pair and carry on with the next line
            print("empty line->" + line)
            continue
        #parts[1] keeps the trailing newline of the source line, which ends the csv row
        dst.write(parts[0].strip() + "," + parts[1].replace("'", "").replace(",", " "))

#process i94addrl.txt
with open('txtfiles/i94addrl.txt', 'r') as src, open('csvfiles/i94addrl.csv', 'w') as dst:
    for line in src:
        parts = line.split("=")
        if len(parts) < 2:
            print("empty line->" + line)
            continue
        dst.write(parts[0].strip().replace("'", "") + "," + parts[1].replace("'", "").replace(",", " "))

#process i94prtl.txt
with open('txtfiles/i94prtl.txt', 'r') as src, open('csvfiles/i94prtl.csv', 'w') as dst:
    for line in src:
        parts = line.split("=")
        if len(parts) < 2:
            print("empty line->" + line)
            continue
        airport_code = parts[0].strip().replace("'", "")
        secondpart = parts[1].split(",")
        city = secondpart[0].strip().replace("'", "")
        if len(secondpart) == 2:
            #strip() also removes the trailing newline carried over from the source line
            state_code = secondpart[1].replace("'", "").replace(" ", "").strip()
        else:
            state_code = ""
        #always end the row with a newline, including rows without a state code
        dst.write(airport_code + "," + city + "," + state_code + "\n")

The remaining cleaning steps are:

- Filter the temperature data to only use data for the United States.
- Remove irregular ports from the I94 data.
- Drop rows with missing IATA codes from the I94 data.
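
The port-filtering step listed above amounts to keeping only the rows whose `i94port` value appears in the list of valid port codes. A minimal sketch in plain Python, assuming the rows are dictionaries; the port set and the second and third rows are made up for illustration and are not real data.

```python
def keep_valid_ports(rows, valid_ports):
    """Return only the rows whose i94port is one of the valid port codes."""
    return [row for row in rows if row.get("i94port") in valid_ports]

# Hypothetical inputs for illustration only.
valid_ports = {"LOS", "NYC"}
rows = [
    {"cicid": 5748517.0, "i94port": "LOS"},
    {"cicid": 1.0, "i94port": "XXX"},   # unknown port code, dropped
    {"cicid": 2.0, "i94port": None},    # missing port code, dropped
]

filtered = keep_valid_ports(rows, valid_ports)
print(filtered)
```

In the notebook the same idea is expressed in Spark SQL as an `IN (SELECT ...)` subquery against the staged port list.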


In [None]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [11]:
# Performing cleaning tasks here

#Filter GlobalLandTemperaturesByState temperature data to only use data for United States.
temperature_fname = 'csvfiles/GlobalLandTemperaturesByState.csv'

country_temp_df=spark.read.option("header", "true").csv(temperature_fname)
filtered_df=country_temp_df.where("Country == 'United States'")

#write the filtered dataset as a staging parquet file in S3 bucket
filtered_df.write.parquet(path="s3a://shalbucket/staging_GlobalLandTemperaturesByState.parquet", mode = "overwrite")
filtered_df.head(5)


[Row(dt='1743-11-01', AverageTemperature='10.722000000000001', AverageTemperatureUncertainty='2.898', State='Alabama', Country='United States'),
 Row(dt='1743-12-01', AverageTemperature=None, AverageTemperatureUncertainty=None, State='Alabama', Country='United States'),
 Row(dt='1744-01-01', AverageTemperature=None, AverageTemperatureUncertainty=None, State='Alabama', Country='United States'),
 Row(dt='1744-02-01', AverageTemperature=None, AverageTemperatureUncertainty=None, State='Alabama', Country='United States'),
 Row(dt='1744-03-01', AverageTemperature=None, AverageTemperatureUncertainty=None, State='Alabama', Country='United States')]

In [13]:
#create the other staging parquet files in S3

#create the staging_i94prtl
staging_i94prtl=spark.read.option("header", "true").csv('csvfiles/i94prtl.csv')
staging_i94prtl.write.parquet(path="s3a://shalbucket/staging_i94prtl.parquet", mode = "overwrite")

#create the staging_i94cntyl
staging_i94cntyl=spark.read.option("header", "true").csv('csvfiles/i94cntyl.csv')
staging_i94cntyl.write.parquet(path="s3a://shalbucket/staging_i94cntyl.parquet", mode = "overwrite")

#create the staging_i94addrl
staging_i94addrl=spark.read.option("header", "true").csv('csvfiles/i94addrl.csv')
staging_i94addrl.write.parquet(path="s3a://shalbucket/staging_i94addrl.parquet", mode = "overwrite")

#create the staging_i94model
staging_i94model=spark.read.option("header", "true").csv('csvfiles/i94model.csv')
staging_i94model.write.parquet(path="s3a://shalbucket/staging_i94model.parquet", mode = "overwrite")

#create the staging_airport_codes
staging_airport_codes=spark.read.option("header", "true").csv('csvfiles/airport-codes_csv.csv')
staging_airport_codes.write.parquet(path="s3a://shalbucket/staging_airport_codes.parquet", mode = "overwrite")

#create the staging_i94visa
staging_i94visa=spark.read.option("header", "true").csv('csvfiles/i94visa.csv')
staging_i94visa.write.parquet(path="s3a://shalbucket/staging_i94visa.parquet", mode = "overwrite")

staging_airport_codes.head(5)


[Row(ident='00A', type='heliport', name='Total Rf Heliport', elevation_ft='11', continent='NA', iso_country='US', iso_region='US-PA', municipality='Bensalem', gps_code='00A', iata_code=None, local_code='00A', coordinates='-74.93360137939453, 40.07080078125'),
 Row(ident='00AA', type='small_airport', name='Aero B Ranch Airport', elevation_ft='3435', continent='NA', iso_country='US', iso_region='US-KS', municipality='Leoti', gps_code='00AA', iata_code=None, local_code='00AA', coordinates='-101.473911, 38.704022'),
 Row(ident='00AK', type='small_airport', name='Lowell Field', elevation_ft='450', continent='NA', iso_country='US', iso_region='US-AK', municipality='Anchor Point', gps_code='00AK', iata_code=None, local_code='00AK', coordinates='-151.695999146, 59.94919968'),
 Row(ident='00AL', type='small_airport', name='Epps Airpark', elevation_ft='820', continent='NA', iso_country='US', iso_region='US-AL', municipality='Harvest', gps_code='00AL', iata_code=None, local_code='00AL', coordinat

In [14]:
#filter the i94 dataset 
#Remove irregular ports from I94 data and write the dataset as a staging parquet file in S3 
df_spark =spark.read.load('./sas_data')
df_spark.createOrReplaceTempView('raw_immigrations')
allowed_ports=spark.read.option("header", "true").csv('csvfiles/i94prtl.csv')
allowed_ports.createOrReplaceTempView('staging_i94prtl')

staging_immigrations_table=spark.sql("""
SELECT * from raw_immigrations where i94port in (SELECT airport_code from staging_i94prtl)
""")


#overwrite so the cell can be re-run, matching the other staging writes
staging_immigrations_table.write.parquet(path="s3a://shalbucket/staging_immigrations.parquet", mode = "overwrite")
staging_immigrations_table.head(5)

[Row(cicid=5748517.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='CA', depdate=20582.0, i94bir=40.0, i94visa=1.0, count=1.0, dtadfile='20160430', visapost='SYD', occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear=1976.0, dtaddto='10292016', gender='F', insnum=None, airline='QF', admnum=94953870030.0, fltno='00011', visatype='B1'),
 Row(cicid=5748518.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='NV', depdate=20591.0, i94bir=32.0, i94visa=1.0, count=1.0, dtadfile='20160430', visapost='SYD', occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear=1984.0, dtaddto='10292016', gender='F', insnum=None, airline='VA', admnum=94955622830.0, fltno='00007', visatype='B1'),
 Row(cicid=5748519.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='WA', depdate=20582.0, i94bir=29.

In [15]:
#create the staging_demographics dataset by processing the us-cities-demographics.csv
staging_demographics=spark.read.option("header", "true").option("delimiter", ";").csv('csvfiles/us-cities-demographics.csv')

#When saving the file to Parquet format, you cannot use spaces and some specific characters.
newColumns = []
problematic_chars = ',;{}()='
for c in staging_demographics.columns:
    c = c.lower()
    c = c.replace(' ', '_')
    for i in problematic_chars:
        c = c.replace(i, '')
    newColumns.append(c) 
staging_demographics = staging_demographics.toDF(*newColumns)
staging_demographics.write.parquet(path="s3a://shalbucket/staging_dmographics.parquet",mode = "overwrite")   

staging_demographics.head(5)


[Row(city='Silver Spring', state='Maryland', median_age='33.8', male_population='40601', female_population='41862', total_population='82463', number_of_veterans='1562', foreign-born='30908', average_household_size='2.6', state_code='MD', race='Hispanic or Latino', count='25924'),
 Row(city='Quincy', state='Massachusetts', median_age='41.0', male_population='44129', female_population='49500', total_population='93629', number_of_veterans='4147', foreign-born='32935', average_household_size='2.39', state_code='MA', race='White', count='58723'),
 Row(city='Hoover', state='Alabama', median_age='38.5', male_population='38040', female_population='46799', total_population='84839', number_of_veterans='4819', foreign-born='8229', average_household_size='2.58', state_code='AL', race='Asian', count='4759'),
 Row(city='Rancho Cucamonga', state='California', median_age='34.5', male_population='88127', female_population='87105', total_population='175232', number_of_veterans='5821', foreign-born='3387
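
The renaming loop in the cell above can also be written as a reusable function. The sketch below mirrors it; `clean_column_name` is an illustrative name, not part of the project code. A hyphen is not in the removed set, which is why `foreign-born` keeps its hyphen in the output above.

```python
# Same character set as the notebook's problematic_chars.
PROBLEMATIC_CHARS = ",;{}()="

def clean_column_name(name):
    """Lower-case a column name, turn spaces into underscores, drop the problematic characters."""
    name = name.lower().replace(" ", "_")
    return "".join(ch for ch in name if ch not in PROBLEMATIC_CHARS)

cleaned = [clean_column_name(c) for c in ["Median Age", "Foreign-born", "State Code"]]
print(cleaned)
```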

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The fact table of the proposed model is based on the I-94 immigration dataset, since this is the main data to be analysed. 
All other datasets are used for populating the dimension tables. 

The selected data model is composed of the following tables:
- immigrations. This table is populated using data from the I-94 immigration dataset.
- country. This table is created using data from the staging_i94cntyl table.
- state. This table is populated by combining data from the GlobalLandTemperaturesByState and the us-cities-demographics.csv datasets.
- airport. This dimension table is created by processing the airport-codes dataset and the i94prtl.csv.
- transportation_mode. This table is populated using data from the i94model.csv.
- visa_status. This table is populated using data from the i94visa.csv.
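
To make the shape of the model concrete, the sketch below builds a tiny version of the fact table and one dimension in an in-memory SQLite database and joins them. SQLite stands in for Redshift here, the column names `mode_id` and `mode_name` are assumptions for illustration, and the third fact row is invented. The actual table definitions live in sql_queries.py.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    -- dimension table: one row per transportation mode (assumed columns)
    CREATE TABLE transportation_mode (mode_id INTEGER PRIMARY KEY, mode_name TEXT);
    -- fact table: one row per I-94 record, pointing at the dimension
    CREATE TABLE immigrations (
        cicid INTEGER PRIMARY KEY,
        i94port TEXT,
        mode_id INTEGER REFERENCES transportation_mode(mode_id)
    );
    INSERT INTO transportation_mode VALUES (1, 'Air'), (2, 'Sea');
    INSERT INTO immigrations VALUES (5748517, 'LOS', 1), (5748518, 'LOS', 1), (99, 'XXX', 2);
""")

# Typical analytical query: count arrivals per transportation mode.
arrivals_per_mode = conn.execute("""
    SELECT m.mode_name, COUNT(*) AS arrivals
    FROM immigrations AS i
    JOIN transportation_mode AS m ON m.mode_id = i.mode_id
    GROUP BY m.mode_name
    ORDER BY m.mode_name
""").fetchall()
print(arrivals_per_mode)
conn.close()
```

Keeping descriptive attributes in the dimension tables means the fact table only stores compact codes, and analytical queries recover the labels with a join.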

The data model is displayed in the diagram below:



#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

All datasets are processed and cleaned using Spark and stored as staging parquet files in an S3 bucket.
The parquet files are then processed further and loaded into a Redshift database.


The pipeline follows the steps described below. The whole data pipeline can be executed by calling the new_etl.py python script. 


1. First, the txt files extracted from the I94_SAS_Labels_Descriptions.SAS file are cleaned of invalid characters and converted to csv files. Together with the SAS files, these csv files are the sources of the pipeline.

2. All csv files and the SAS files get filtered and stored as staging files in Amazon S3. The staging files are stored as parquet files.
3. All existing tables and data get dropped.
4. External tables get created which are linked with the parquet files that are stored in S3. Using external tables provides the flexibility to access the data using common sql statements, while at the same time we use the storage capacity of AWS S3 servers.

5. The model tables get created.

6. The model tables get loaded with data from the external staging tables.

Steps 3, 4, 5 and 6 are executed by calling create_tables.py. 
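
Step 4 relies on Redshift external tables that point at the parquet files in S3. As a rough sketch of what such a statement looks like, the helper below assembles a `CREATE EXTERNAL TABLE ... STORED AS PARQUET LOCATION ...` string. The schema name `spectrum`, the column types, the `city` and `state_code` column names and the helper itself are assumptions for illustration. The statements actually used are in sql_queries.py.

```python
def external_table_ddl(schema, table, columns, s3_path):
    """Build a Redshift CREATE EXTERNAL TABLE statement for Parquet data."""
    column_sql = ",\n    ".join(f"{name} {sql_type}" for name, sql_type in columns)
    return (
        f"CREATE EXTERNAL TABLE {schema}.{table} (\n    {column_sql}\n)\n"
        f"STORED AS PARQUET\n"
        f"LOCATION '{s3_path}';"
    )

ddl = external_table_ddl(
    schema="spectrum",  # hypothetical external schema name
    table="staging_i94prtl",
    columns=[  # assumed column names and types
        ("airport_code", "VARCHAR(10)"),
        ("city", "VARCHAR(100)"),
        ("state_code", "VARCHAR(10)"),
    ],
    s3_path="s3://shalbucket/staging_i94prtl.parquet/",
)
print(ddl)
```

An external schema must already exist in the cluster before a statement like this can run, which is one reason the cluster details in dl.cfg need to be filled in first.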




### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model. All processes described in this Jupyter notebook are included in the new_etl.py file.

In [23]:
#create the tables in redshift #all sql queries for creating the redshift tables are in sql_queries.py
#All existing tables and data get dropped.
#External tables get created which are linked with the parquet files that are stored in S3. Using external tables provides the flexibility to access the data using common sql statements, while at the same time we use the storage capacity of AWS S3 servers.
#The model tables get created.
#The model tables get loaded with data from the external staging tables.
#Make sure to update the dl.cfg with the cluster details
#Execute the script create_tables.py
#Read here for more details about external tables: https://docs.aws.amazon.com/redshift/latest/dg/r_CREATE_EXTERNAL_TABLE.html

#create the external staging tables and load the model with data from the staging tables:
call(["python", "create_tables.py"])



0

In [24]:
# Perform quality checks here
import psycopg2
from sql_queries import data_quality_checks_queries

conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['CLUSTER'].values()))
conn.set_isolation_level(0) #level 0 is autocommit; DDL commands such as CREATE EXTERNAL TABLE fail otherwise
cur = conn.cursor()
for query in data_quality_checks_queries:
    cur.execute(query)
    result = cur.fetchone()
    #each query is expected to return a row count in its first column
    if result[0] == 0:
        print("Number of rows:0. Quality check failed for query: {}".format(query))
    else:
        print("Quality check succeeded.")
cur.close()
conn.close()


Quality check succeeded.
Quality check succeeded.
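
The check above flags any query that returns a zero count. A minimal sketch of the same idea as a reusable function follows, run against an in-memory SQLite database as a stand-in for Redshift. The table columns and the function name `run_row_count_checks` are illustrative assumptions.

```python
import sqlite3

def run_row_count_checks(conn, queries):
    """Run each COUNT(*) query and return the queries whose count was zero."""
    failed = []
    cur = conn.cursor()
    for query in queries:
        cur.execute(query)
        (count,) = cur.fetchone()
        if count == 0:
            failed.append(query)
    return failed

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE immigrations (cicid INTEGER);
    CREATE TABLE country (code INTEGER);
    INSERT INTO immigrations VALUES (5748517);
""")
failed = run_row_count_checks(
    conn,
    ["SELECT COUNT(*) FROM immigrations", "SELECT COUNT(*) FROM country"],
)
print(failed)
conn.close()
```

Returning the failed queries instead of only printing them lets a calling script stop the pipeline when the list is not empty.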


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.