# Project Title
### Data Engineering Capstone Project

#### Project Summary
This pipeline creates a data lake for CIC immigration data. It extracts data from S3, processes it using Spark, and loads the data back to S3 as a set of Parquet files.

In [1]:
# Import libraries
import pandas as pd
import os
import configparser
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from datetime import datetime, timedelta, date
from pyspark.sql import types as T
from pyspark.sql.functions import udf, col, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear

In [2]:
# Read credentials from the config file
config = configparser.ConfigParser()
config.read('conf.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config.get('AWS','AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY']=config.get('AWS','AWS_SECRET_ACCESS_KEY')

# define paths to data
airport_datapath = config.get('S3','S3_INPUT_AIRPORTS')
demograf_datapath = config.get('S3','S3_INPUT_DEMOGRAF')
immigr_datapath = config.get('S3','S3_INPUT_IMMGR')
iata_datapath = config.get('S3','S3_INPUT_IATA')
s3_output = config.get('S3','S3_OUTPUT')
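
The cell above expects a `conf.cfg` file with an `AWS` and an `S3` section. The file itself is not shown in this notebook, so the following is only a sketch of a layout that would satisfy those `config.get` calls. The section and option names come from the cell above; every value (bucket name, paths, credential placeholders) is a hypothetical placeholder.

```python
import configparser

# Hypothetical contents of conf.cfg; every value below is a placeholder.
SAMPLE_CFG = """
[AWS]
AWS_ACCESS_KEY_ID = <your-access-key-id>
AWS_SECRET_ACCESS_KEY = <your-secret-access-key>

[S3]
S3_INPUT_AIRPORTS = s3a://example-bucket/inbound/airport-codes.csv
S3_INPUT_DEMOGRAF = s3a://example-bucket/inbound/us-cities-demographics.csv
S3_INPUT_IMMGR = s3a://example-bucket/inbound/immigration/
S3_INPUT_IATA = s3a://example-bucket/inbound/iata-codes.csv
S3_OUTPUT = s3a://example-bucket/output/
"""

sample = configparser.ConfigParser()
sample.read_string(SAMPLE_CFG)

# Option names are matched case-insensitively; section names are not.
sample_output = sample.get('S3', 'S3_OUTPUT')

# The write step concatenates the output prefix with a file name,
# so the prefix should end with a slash.
demografy_target = sample_output + 'demografy.parquet'
print(demografy_target)
```

Keeping the trailing slash on `S3_OUTPUT` matters because the output paths are built by plain string concatenation.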

In [3]:
# Spark session
# Note: "spark.jars.packages" takes one comma-separated list; setting the key
# twice would keep only the last value.
spark = SparkSession.builder\
.config("spark.jars.repositories", "https://repos.spark-packages.org/")\
.config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11,org.apache.hadoop:hadoop-aws:2.7.0")\
.config("spark.hadoop.fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")\
.enableHiveSupport().getOrCreate()

In [4]:
# Helper functions

# Convert a SAS date (days since 1960-01-01) into a date.
# Non-numeric input (e.g. None) falls back to 0, i.e. 1960-01-01.
def convertSasDate(d):
    if not isinstance(d, (int, float)):
        d = 0
    return pd.to_timedelta(d, unit='D') + pd.Timestamp('1960-1-1')
convertSasDateUDF = udf(convertSasDate, T.DateType())


# Check that a table has data
def tableNotEmptyCheck(spark, table):
    tableCheck = spark.sql("""
            Select count(*)
            FROM {}
        """.format(table))
    # Collect once so the count query runs a single time
    rowCount = tableCheck.collect()[0][0]
    if rowCount>0:
        print("QC passed. Table {} is not empty and has {} rows".format(table,rowCount))
    else:
        print("WARNING, Table {} has {} rows. ! Check your code".format(table,rowCount))

### Step 1: Scope the Project and Gather Data

#### Scope 
This project uses the 4 data sources described below. The pipeline processes immigration data and loads it into a data lake on S3 as a set of Parquet files. 
The end solution includes 3 Parquet data sets:
- Immigration data: each row is an event of a person entering the US.
- Demographic data: contains info about each city in the US (median age, population, average household size, etc.)
- Airport data: airport description containing IATA code, location and other details  

All three data sets can be joined via the IATA code.

#### Input Data
* I94 Immigration Data: SAS file for each month of 2016. 
    This data comes from the US National Tourism and Trade Office. 
    Source: https://travel.trade.gov/research/reports/i94/historical/2016.html 
* U.S. City Demographic Data: CSV file with information on US city demographics:
    male and female population, median age, etc. 
    Source: https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/
* Airport Code Table: CSV file with a table of airport codes and corresponding cities. 
    Source: https://datahub.io/core/airport-codes#data
* IATA data: CSV file, a lookup from IATA code to airport name, city and state. 
    Source: https://www.airportcodes.us/us-airports.htm


In [5]:
# reading data
df_airports = spark.read.format('csv').options(header='true').load(airport_datapath)
df_immg = spark.read.parquet(immigr_datapath)
df_demograf = spark.read.format('csv').options(header='true', sep=';',inferSchema=True).load(demograf_datapath)
df_iateCodes = spark.read.format('csv').options(header='true').load(iata_datapath)

### Step 2: Explore and Assess the Data

#### Cleaning Steps
* Select needed fields
* Convert the SAS date format into a date
* Remove rows with missing IATA codes, since the code is used for mapping
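
SAS stores a date as the number of days since 1960-01-01, which is why the conversion step adds the day count to that epoch. The snippet below is a minimal standalone sketch of the same arithmetic using only the standard library. The name `sas_days_to_date` is hypothetical, and unlike the notebook's helper it returns `None` for missing input; that is a design choice made for this illustration, not the behaviour of the pipeline.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS date values count days from this date

def sas_days_to_date(days):
    """Convert a SAS day count to a date; return None for missing input."""
    # NaN is the only value that is not equal to itself
    if days is None or days != days:
        return None
    return SAS_EPOCH + timedelta(days=int(days))

print(sas_days_to_date(20574.0))  # 2016-04-30
```

Returning `None` keeps missing arrival dates visible as nulls instead of mapping them to the epoch date.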

In [6]:
# Cleaning immigration data
df_immg=df_immg.na.drop(subset=['i94port'])
df_immg = df_immg.withColumn('arrdate',convertSasDateUDF(df_immg.arrdate))
df_immg.printSchema()
df_immg.show(5)

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: date (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = true)
 |-- fltno: string (nullable = tru

In [7]:
#cleaning airport data
df_airports=df_airports.na.drop(subset=['iata_code'])
df_airports.printSchema()
df_airports.show(5)

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)

+-----+-------------+--------------------+------------+---------+-----------+----------+-------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region| municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+-------------+--------+---------+----------+--------------------+
|  03N|small_airport|      Utirik Airport| 

In [8]:
# Cleaning demographic data
df_demograf=df_demograf.na.drop(subset=['City','State'])
# Drop the per-race columns so that each City-State pair collapses to one row
df_demograf = df_demograf.drop('Race','Count')
df_demograf=df_demograf.distinct()
df_demograf.printSchema()
df_demograf.show(5)

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)

+------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+
|        City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|
+------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+
|  Somerville|Massachusetts|      31.0|          41028|            39

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
This project is designed as a star schema with 1 fact table and multiple dimension tables. This layout keeps analytical queries simple: each dimension joins directly to the fact table on a single key. 

#### 3.2 Mapping Out Data Pipelines
Input data is stored in an S3 location. The pipeline loads each data file into a staging table, from which the final tables are created and loaded into a data lake on S3 as a set of Parquet files. Fact and dimension tables can be mapped through the IATA code.

#### Data Model
* Fact Table: Immigration. <br>
Contains transactional data where each transaction is an event of one person arriving in the US. Maps to other tables via IATA codes (i94port field).
* Dimension Table #1: Airports. <br> 
Airport data including exact airport coordinates, description, IATA code, city, name, etc. Can be mapped to the fact table via iata_code.
* Dimension Table #2: Demographic. <br>
Info about US cities. This table joins demographic data with IATA codes (IATA data source). It contains the city name, population, median age, etc., and helps show which locations are more popular among immigrants. 

<img src="diagram.png"
     alt="Schema"
 />
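
To make the join keys concrete, here is a small sketch of the star schema with one row per table. It uses Python's built-in `sqlite3` purely as a stand-in for Spark SQL so that it runs without a cluster; that substitution is an assumption made for illustration. Table and column names follow this notebook, only a few columns are kept, and the sample row is the one reported by the join check in the quality-check section.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE immigration (cicid REAL, i94port TEXT, visa TEXT);
    CREATE TABLE airports (iata_code TEXT, municipality TEXT);
    CREATE TABLE demografy (code TEXT, city TEXT, state TEXT);

    INSERT INTO immigration VALUES (3548566.0, 'PVD', 'Pleasure');
    INSERT INTO airports VALUES ('PVD', 'Providence');
    INSERT INTO demografy VALUES ('PVD', 'Providence', 'Rhode Island');
""")

# The fact table joins to each dimension on the IATA code only
star_rows = conn.execute("""
    SELECT i.cicid, i.visa, a.municipality, d.city, d.state
    FROM immigration AS i
    JOIN airports AS a ON i.i94port = a.iata_code
    JOIN demografy AS d ON i.i94port = d.code
""").fetchall()
print(star_rows)
conn.close()
```

Both dimensions hang off the same fact-table column (`i94port`), so adding another dimension keyed by IATA code would only need one more join.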

### Step 4: Run Pipelines to Model the Data 

In [9]:
# Extract Immigration data
df_immg.createOrReplaceTempView("immigration_stage")
immigration = spark.sql("""
    SELECT 
        cicid,
        I94mon,
        i94port,
        arrdate,
        i94addr,
        CASE
            WHEN i94visa = 1 THEN 'Business'
            WHEN i94visa = 2 THEN 'Pleasure'
            WHEN i94visa = 3 THEN 'Student'
            ELSE 'Not Defined'
        END as visa,
        biryear,
        gender,
        visatype
    FROM immigration_stage 
    LIMIT 1000000
""")
immigration.show()
immigration.printSchema()

+---------+------+-------+----------+-------+--------+-------+------+--------+
|    cicid|I94mon|i94port|   arrdate|i94addr|    visa|biryear|gender|visatype|
+---------+------+-------+----------+-------+--------+-------+------+--------+
|5748517.0|   4.0|    LOS|2016-04-30|     CA|Business| 1976.0|     F|      B1|
|2483726.0|   4.0|    NEW|2016-04-14|     NY|Pleasure| 2001.0|     M|      WT|
|5748518.0|   4.0|    LOS|2016-04-30|     NV|Business| 1984.0|     F|      B1|
|2483727.0|   4.0|    NEW|2016-04-14|     NY|Pleasure| 2002.0|     M|      WT|
|5748519.0|   4.0|    LOS|2016-04-30|     WA|Business| 1987.0|     M|      B1|
|2483728.0|   4.0|    NEW|2016-04-14|     NY|Pleasure| 2003.0|     M|      WT|
|5748520.0|   4.0|    LOS|2016-04-30|     WA|Business| 1987.0|     F|      B1|
|2483729.0|   4.0|    NEW|2016-04-14|     NY|Pleasure| 2005.0|     F|      WT|
|5748521.0|   4.0|    LOS|2016-04-30|     WA|Business| 1988.0|     M|      B1|
|2483730.0|   4.0|    NEW|2016-04-14|     NY|Busines

In [10]:
# Extract airport data
df_airports.createOrReplaceTempView("airports_stage")
airports = spark.sql("""
    SELECT 
        *
    FROM airports_stage 
    where iata_code is not null
""")
airports.show(5)
airports.printSchema()

+-----+-------------+--------------------+------------+---------+-----------+----------+-------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region| municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+-------------+--------+---------+----------+--------------------+
|  03N|small_airport|      Utirik Airport|           4|       OC|         MH|    MH-UTI|Utirik Island|    K03N|      UTK|       03N|  169.852005, 11.222|
| 07FA|small_airport|Ocean Reef Club A...|           8|       NA|         US|     US-FL|    Key Largo|    07FA|      OCA|      07FA|-80.274803161621,...|
|  0AK|small_airport|Pilot Station Air...|         305|       NA|         US|     US-AK|Pilot Station|    null|      PQS|       0AK|-162.899994, 61.9...|
| 0CO2|small_airport|Crested Butte Air...|        8980|       NA|         US

In [11]:
# Extract Demografic Data
df_iateCodes.createOrReplaceTempView("iata_codes")
df_demograf.createOrReplaceTempView("demograf_stage")
demografy = spark.sql("""
    SELECT 
        demograf.City as city,
        demograf.State as state,
        demograf.`Median Age` as median_age,
        demograf.`Male Population` as male_population,
        demograf.`Female Population` as female_population,
        demograf.`Total Population` as total_population,
        demograf.`Number of Veterans` as num_of_veterans,
        demograf.`Foreign-born` as foreign_born,
        demograf.`Average Household Size` as avg_household_size,
        demograf.`State Code` as state_code,     
        iata.Code as code
    FROM demograf_stage AS demograf
    LEFT JOIN iata_codes AS iata
    ON demograf.city=iata.City and demograf.`State Code`=iata.State
    where code is not null
""")
demografy.printSchema()
demografy.filter(demografy.city=='Albany').show()
demografy.filter(demografy.code!='').show()

root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- median_age: double (nullable = true)
 |-- male_population: integer (nullable = true)
 |-- female_population: integer (nullable = true)
 |-- total_population: integer (nullable = true)
 |-- num_of_veterans: integer (nullable = true)
 |-- foreign_born: integer (nullable = true)
 |-- avg_household_size: double (nullable = true)
 |-- state_code: string (nullable = true)
 |-- code: string (nullable = true)

+------+--------+----------+---------------+-----------------+----------------+---------------+------------+------------------+----------+----+
|  city|   state|median_age|male_population|female_population|total_population|num_of_veterans|foreign_born|avg_household_size|state_code|code|
+------+--------+----------+---------------+-----------------+----------------+---------------+------------+------------------+----------+----+
|Albany| Georgia|      33.3|          31695|            39414|           71109|

In [12]:
# Writing data to the Data Lake (S3) as Parquet files

demografy.write.mode('overwrite').partitionBy('State').parquet(s3_output+'demografy.parquet')
airports.write.mode('overwrite').parquet(s3_output+'airports.parquet')
immigration.write.mode('overwrite').partitionBy('I94mon').parquet(s3_output+'immigration.parquet')

#### 4.2 Data Quality Checks
Basic quality checks to ensure data can be properly mapped and tables are not empty. 
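
One possible way to make the emptiness check reusable is to separate the decision from the Spark call. The sketch below is an assumption about how that could look, not the notebook's implementation: `find_empty_tables` is a hypothetical helper that takes plain row counts, so it can run and be tested without a cluster. The three counts are the ones reported in this notebook's run.

```python
def find_empty_tables(row_counts):
    """Return the sorted names of tables whose row count is zero or None."""
    return sorted(name for name, count in row_counts.items() if not count)

# In the pipeline the counts could come from spark.table(name).count();
# plain integers are used here so the sketch runs without a cluster.
counts = {"airports": 9189, "immigration": 1000000, "demografy": 243}
print(find_empty_tables(counts))  # []

# 'empty_example' is a hypothetical table added only to show a failure.
counts["empty_example"] = 0
print(find_empty_tables(counts))  # ['empty_example']
```

Returning the list of failing tables, rather than printing inside the check, would let a scheduled run raise an error when the list is not empty.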

In [13]:
demografy.createOrReplaceTempView("demografy")
airports.createOrReplaceTempView("airports")
immigration.createOrReplaceTempView("immigration")

In [15]:
# Check if tables have data
tableNotEmptyCheck(spark,'airports')
tableNotEmptyCheck(spark,'immigration')
tableNotEmptyCheck(spark,'demografy')

QC passed. Table airports is not empty and has 9189 rows
QC passed. Table immigration is not empty and has 1000000 rows
QC passed. Table demografy is not empty and has 243 rows


In [16]:
# Check if tables can join and return results.
joinTables = spark.sql("""
    Select 
        i.cicid,
        i.i94port,
        i.visa,
        a.iata_code,
        a.municipality,
        d.code,
        d.city,
        d.state
    FROM
        immigration i,
        airports a,
        demografy d
    where i.i94port=a.iata_code and i.i94port=d.code 
    and i94port is not null
    LIMIT 1
    """)
joinTablesResults = joinTables.collect()

if len(joinTablesResults)>0:
    print("tables can properly join and return result. Example: {}".format(joinTablesResults[0]))
else:
    # No rows came back, so there is no first row to inspect
    print("Warning, tables cannot join. Join returns {} rows".format(len(joinTablesResults)))

tables can properly join and return result. Example: Row(cicid=3548566.0, i94port='PVD', visa='Pleasure', iata_code='PVD', municipality='Providence', code='PVD', city='Providence', state='Rhode Island')


#### 4.3 Output Data and Query Examples
- The "Output data samples" folder contains samples of the data this pipeline is expected to generate.

A typical query example is shown below. For each visa type, it lists the arrival cities together with their total population.


In [17]:
print("Visa type and populations")
spark.sql("""
    Select 
        i.visa,
        d.city,
        d.state,
        avg(d.total_population)
    FROM
        immigration as i,
        airports as  a,
        demografy as d
    where i.i94port=a.iata_code and i.i94port=d.code 
    group by 1,2,3
    order by 1,4
""").show()

Visa type and populations
+--------+----------------+--------------+---------------------+
|    visa|            city|         state|avg(total_population)|
+--------+----------------+--------------+---------------------+
|Business|       Melbourne|       Florida|              80136.0|
|Business|           Ogden|          Utah|              85450.0|
|Business|        Carlsbad|    California|             113466.0|
|Business|           Fargo|  North Dakota|             119250.0|
|Business|     Victorville|    California|             122236.0|
|Business|         Midland|         Texas|             132950.0|
|Business|        Columbia|South Carolina|             133393.0|
|Business|      Charleston|South Carolina|             135524.0|
|Business|        Syracuse|      New York|             144152.0|
|Business|        Savannah|       Georgia|             145684.0|
|Business|         Ontario|    California|             171200.0|
|Business|      Providence|  Rhode Island|             179204.0|

#### 4.4 Data dictionary 
(1) Immigration Table - Created from the I94 SAS files.
    Fields:

     cicid: double - unique ID for each arrival record, used as PK for this table
     I94mon: double - month of arrival, e.g. 4.0; used as the partition column
     i94port: string - port-of-entry code matched against IATA codes, e.g. LOS, NEW
     arrdate: date - date of arrival, YYYY-MM-DD
     i94addr: string - US state code, e.g. CA -> California, NY -> New York
     visa: string - visa category decoded from i94visa: Business, Pleasure, Student or Not Defined
     biryear: double - 4-digit year of birth
     gender: string - non-immigrant sex, e.g. M, F
     visatype: string - visa class, e.g. B1, WT

(2) Airport Table - Created from the airport code CSV file
    Fields:

    name: string - airport name, e.g. Abu Dhabi International Airport
    iso_region: string - region of the airport, e.g. AE-AZ
    municipality: string - e.g. Abu Dhabi
    iata_code: string - 3-letter airport code, e.g. ALV
    coordinates: string - comma-separated coordinate pair, can be used to locate the airport
    Other source columns (ident, type, elevation_ft, continent, iso_country, gps_code, local_code) are passed through unchanged.

(3) Demographic Table - created from the US city demographics CSV file, joined with the IATA lookup
    Fields:

     city: string - city name, e.g. San Francisco 
     state: string - state name, e.g. California
     median_age: double - median age of the population in that city
     male_population: integer - male population of the city
     female_population: integer - female population of the city
     total_population: integer - total population of the city
     num_of_veterans: integer - number of veterans living in the city
     foreign_born: integer - number of foreign-born residents in the city
     avg_household_size: double - average household size in the city
     state_code: string - state abbreviation, e.g. CA
     code: string - 3-letter IATA airport code, e.g. SFO





### Step 5: Complete Project Write Up
The combined number of records in all data sets used in this project exceeds 1M rows, so it is reasonable to use distributed data processing with Spark on an AWS cluster. To ease the data loading process, all input files were placed in S3 in the "inbound data" folder. The raw immigration data underwent additional processing and was transformed from SAS to Parquet format, which allows efficient data extraction.  
* Propose how often the data should be updated and why.
    - The pipeline is supposed to run on a monthly basis, since the fact table is based on immigration data partitioned by month.
* Write a description of how you would approach the problem differently under the following scenarios:
    * The data was increased by 100x.
        - A 100x increase would require additional computational capacity. It can be resolved by adding more worker nodes to the AWS cluster.
    * The data populates a dashboard that must be updated on a daily basis by 7am every day.
        - Immigration data should be re-partitioned, and the ETL can be refactored to process only data updated since the last run (a day before).
    * The database needed to be accessed by 100+ people.
        - First, we would need to understand their needs and the purpose of usage. Based on this, we could slice the data into separate tables and load them into a BI tool for easy access.
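
Processing only what changed since the last run can be reduced to working out which partitions are still pending. The sketch below shows this for the monthly partitioning used here; `months_to_process` is a hypothetical helper and the dates are illustrative. A daily variant would follow the same pattern with day-level partitions.

```python
from datetime import date

def months_to_process(last_loaded, today):
    """List (year, month) pairs after last_loaded, up to and including today's month."""
    year, month = last_loaded.year, last_loaded.month
    pending = []
    while (year, month) < (today.year, today.month):
        month += 1
        if month > 12:
            # Roll over into January of the next year
            year, month = year + 1, 1
        pending.append((year, month))
    return pending

print(months_to_process(date(2016, 4, 30), date(2016, 7, 2)))
# [(2016, 5), (2016, 6), (2016, 7)]
```

Each returned pair could then be used to filter the input to a single month before writing, so partitions that were already loaded are not rewritten.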