# Immigration by City Data Lake
### Data Engineering Capstone Project

#### Project Summary
This notebook extracts data from several sources and builds a data model for analyzing international arrivals to cities throughout the U.S. The data is modeled in a star schema, and the fact table is written to parquet files. This process facilitates analytics across the different sources: immigration details from the I94 records, temperature data, demographics for U.S. cities, and airports.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [2]:
# Do all imports and installs here
import pandas as pd
from tqdm import tqdm
from collections import defaultdict
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from udfs import *

In [3]:
# Define/configure Spark session
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()


### Step 1: Scope the Project and Gather Data

#### Scope 
The fact and dimension tables used in this project are derived from four data sources. The fact table is designed for analyzing international arrival data by city and month for U.S. airport arrivals. This notebook is designed to build a data lake, rather than a data warehouse. This allows greater flexibility in analysis while minimizing the transformations needed to process the data. 

This project uses Python and Apache Spark to process this data. 

#### Data sources for the project: 

 1. I94 Immigration Data: comes from the U.S. National Travel and Tourism Office and includes details on incoming international arrivals. The data includes information on country of origin, visa type, age, and port of entry. [link](https://travel.trade.gov/research/reports/i94/historical/2016.html)
 1. U.S. City Demographic Data: comes from OpenDataSoft, originally from the U.S. Census Bureau's 2015 American Community Survey, and includes data by city, state, age, gender, and foreign-born population. [link](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)
 1. Airport Code Table: comes from datahub.io and includes airport codes and corresponding cities [link](https://datahub.io/core/airport-codes#data)
 1. World Temperature Data: comes from Kaggle and includes temperature data in the U.S. from 1850 to 2013. [link](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data)



In [4]:
# Read in the data here
demographics=spark.read.format("csv").option("delimiter", ";").option("header", "true").load("us-cities-demographics.csv")
airport=spark.read.format("csv").option("delimiter", ",").option("header", "true").load("airport-codes_csv.csv")
temperature=spark.read.format("csv").option("delimiter", ",").option("header", "true").load("GlobalLandTemperaturesByCity.csv")

In [5]:
# Load the 2016 immigration data (the June file has 6 extra columns, dropped below)
june = 'i94_jun16_sub'
immigration = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/{}.sas7bdat'.format(june))
immigration = immigration.drop('validres','delete_days','delete_mexl','delete_dup','delete_recdup','delete_visa')

other_months = ['i94_jan16_sub','i94_feb16_sub','i94_mar16_sub','i94_apr16_sub','i94_may16_sub',
          'i94_jul16_sub','i94_aug16_sub','i94_sep16_sub','i94_oct16_sub','i94_nov16_sub',
          'i94_dec16_sub']
for m in tqdm(other_months):
    # Uncomment the next 2 lines to load the remaining months (left disabled here for faster testing)
    # tmp = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/{}.sas7bdat'.format(m))
    # immigration = immigration.union(tmp)
    pass

100%|██████████| 11/11 [00:00<00:00, 77803.28it/s]
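
The hard-coded list of month names above could also be generated. The sketch below is a hypothetical helper, not part of the notebook: `i94_file_stems` and `common_columns` are names introduced here for illustration, and the column lists are toy stand-ins for `DataFrame.columns`. It assumes the files keep the `i94_<mon><yy>_sub` naming seen in the cell above.

```python
MONTH_ABBR = ["jan", "feb", "mar", "apr", "may", "jun",
              "jul", "aug", "sep", "oct", "nov", "dec"]


def i94_file_stems(year=2016):
    """Return the 12 monthly file stems, e.g. 'i94_jan16_sub' (assumed naming)."""
    yy = "{:02d}".format(year % 100)
    return ["i94_{}{}_sub".format(mon, yy) for mon in MONTH_ABBR]


def common_columns(reference, other):
    """Columns of `reference` that also exist in `other`, in reference order."""
    other_set = set(other)
    return [c for c in reference if c in other_set]


stems = i94_file_stems(2016)

# Toy column lists standing in for DataFrame.columns of two monthly files
june_cols = ["cicid", "i94yr", "validres", "delete_days", "i94mon"]
jan_cols = ["cicid", "i94yr", "i94mon"]
shared = common_columns(jan_cols, june_cols)
```

With Spark, selecting the shared columns on both DataFrames (`df.select(*shared)`) before calling `union` would be an alternative to dropping the six extra June columns by name, since `union` matches columns by position rather than by name.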


### Step 2: Explore and Assess the Data
#### Explore the Data


In [6]:
immigration.show(1, vertical=True)
demographics.show(1, vertical=True)
airport.show(1, vertical=True)
temperature.show(1, vertical=True)

-RECORD 0-------------------
 cicid    | 4.0             
 i94yr    | 2016.0          
 i94mon   | 6.0             
 i94cit   | 135.0           
 i94res   | 135.0           
 i94port  | XXX             
 arrdate  | 20612.0         
 i94mode  | null            
 i94addr  | null            
 depdate  | null            
 i94bir   | 59.0            
 i94visa  | 2.0             
 count    | 1.0             
 dtadfile | null            
 visapost | null            
 occup    | null            
 entdepa  | Z               
 entdepd  | null            
 entdepu  | U               
 matflag  | null            
 biryear  | 1957.0          
 dtaddto  | 10032016        
 gender   | null            
 insnum   | null            
 airline  | null            
 admnum   | 1.4938462027E10 
 fltno    | null            
 visatype | WT              
only showing top 1 row

-RECORD 0------------------------------------
 City                   | Silver Spring      
 State                  | Maryland         

#### Cleaning Steps
Each data set will be cleaned and processed individually. 
 1. Cleaning Temperature data
     1. Parse date to add year and month columns
     1. Filter for U.S. cities 
     1. Add average temperature by month column 
     1. Add average temperature uncertainty column 
     1. Drop duplicates
 1. Cleaning Airport data
     1. Filter for U.S. locations 
     1. Transform city name to upper case for easier joins
     1. Filter for airports 
     1. Parse region code for state code value 
     1. Drop duplicates
 1. Cleaning up demographic data 
     1. Calculate percentages for gender and foreign-born populations 
     1. Format median age as float and city as upper case 
     1. Rename fields 
     1. Drop duplicates
 1. Cleaning Immigration data
     1. Filter for arrivals by air, non-null origin country code, non-null destination code
     1. Limit table to year, month, origin country, destination city, destination state, visa type, age, and gender fields
     1. Populate destination city value from destination code
     1. Populate origin country value from origin country code
     1. Aggregate table by year, month, destination state, destination city, visa type, age, gender, and country of origin 


In [7]:
# clean temp
# add year and month columns 
temperature = temperature.withColumn("year",year(temperature["dt"]))\
.withColumn("month",month(temperature["dt"]))\
# only need US temps
temperature=temperature.filter(temperature["country"]=="United States")\
.withColumn('City', upper(col('City')))

# Avg by city / month
temperature_avg=temperature.groupBy('City','month').agg({'AverageTemperature':'avg',
                                                 'AverageTemperatureUncertainty':'avg'})

# clean up table
temperature_avg=temperature_avg.select("city",
                               "month",
                               col("avg(AverageTemperature)").alias("avg_temp"),
                               col("avg(AverageTemperatureUncertainty)").alias("avg_temp_uncertainty")).drop_duplicates()

In [8]:
temperature_avg.show(3)

+----------+-----+------------------+--------------------+
|      city|month|          avg_temp|avg_temp_uncertainty|
+----------+-----+------------------+--------------------+
|  BELLEVUE|    6|12.756615384615394|   0.700005494505495|
|BRIDGEPORT|   12|1.2839808429118775|  1.5540383141762462|
|CHARLESTON|   12|11.119011494252877|  1.6366973180076625|
+----------+-----+------------------+--------------------+
only showing top 3 rows



In [9]:
# clean airports

# only need US airports
airport=airport.filter(airport["iso_country"]=="US")\
.withColumn('municipality', upper(col('municipality')))

# filter for airports 
airport=airport.where(col("type").isin({"small_airport", "medium_airport", "large_airport"})).drop_duplicates()

# parse region for state
airport = airport.withColumn('state', split(airport['iso_region'], '-').getItem(1))\
.withColumn("elevation_ft",col("elevation_ft").cast("integer"))

# clean up table
airport=airport.select(col("ident").alias("airport_id"),
                       col("municipality").alias("city"),
                       "state",
                       "name",
                       "type",
                       "elevation_ft").drop_duplicates()


In [10]:
airport.show(3)

+----------+-----------+-----+--------------------+-------------+------------+
|airport_id|       city|state|                name|         type|elevation_ft|
+----------+-----------+-----+--------------------+-------------+------------+
|   US-0495|    MILFORD|   IL|Stichnoth RLA Air...|small_airport|         700|
|      18XA|     GOLIAD|   TX|Lantana Ridge Air...|small_airport|         250|
|      CO92|LAST CHANCE|   CO|Frasier Ranch Air...|small_airport|        5000|
+----------+-----------+-----+--------------------+-------------+------------+
only showing top 3 rows



In [11]:
# clean demographics 

# calculate percentages
demographics=demographics.withColumn("median_age",col("Median Age").cast("float"))\
.withColumn("pct_male_population",demographics["Male Population"]/demographics["Total Population"]*100)\
.withColumn("pct_female_population",demographics["Female Population"]/demographics["Total Population"]*100)\
.withColumn("pct_foreign_born",demographics["Foreign-born"]/demographics["Total Population"]*100)\
.withColumn('City', upper(col('City')))\
.withColumn("Total Population",col("Total Population").cast("integer"))

# rename fields 
demographics = demographics.select(
    col("City").alias("city"),
    col("State Code").alias("state"),
    col("Total Population").alias("population"),
    "median_age",
    "pct_male_population",
    "pct_female_population",
    "pct_foreign_born"
).drop_duplicates()

In [12]:
demographics.show(3)

+-------------+-----+----------+----------+-------------------+---------------------+------------------+
|         city|state|population|median_age|pct_male_population|pct_female_population|  pct_foreign_born|
+-------------+-----+----------+----------+-------------------+---------------------+------------------+
|   FORT MYERS|   FL|     74015|      37.3|  49.78720529622374|    50.21279470377627|20.759305546173074|
|THOUSAND OAKS|   CA|    129329|      44.8|  50.34678996976702|    49.65321003023297|19.585707768559253|
|   COSTA MESA|   CA|    113186|      34.8| 52.212287738766285|    47.78771226123372| 23.54089728411641|
+-------------+-----+----------+----------+-------------------+---------------------+------------------+
only showing top 3 rows



In [13]:
# clean immigration data

# only need air arrivals (i94mode = 1)
immigration=immigration.filter(immigration["i94mode"]==1)\
.filter(immigration.i94res.isNotNull())\
.filter(immigration.i94port.isNotNull())\
.withColumn("I94MON",col("I94MON").cast("integer"))\
.withColumn("I94RES",col("I94RES").cast("integer"))\
.withColumn("I94YR",col("I94YR").cast("integer"))\
.withColumn("I94BIR",col("I94BIR").cast("integer"))\
.withColumn("count",col("count").cast("integer"))\
.withColumn("I94VISA",col("I94VISA").cast("integer"))

# only take relevant fields 
immigration = immigration.select(
    "cicid",
    col("I94YR").alias("year"),
    col("I94MON").alias("month"),
    col("I94RES").alias("org_country_code"),
    col("I94PORT").alias("dest_city_code"),
    col("I94ADDR").alias("dest_state"),
    col("I94VISA").alias("visa_type"),
    col("I94BIR").alias("age"),
    col("GENDER").alias("gender"),
    "count"
).drop_duplicates()

# add dest city name 
immigration = immigration.withColumn('dest_city', city_codes_udf(
    immigration["dest_city_code"]))

# add org country 
immigration = immigration.withColumn('org_country', country_codes_udf(
    immigration["org_country_code"]))

In [14]:
# aggregate immigration table 
immigration_ag=immigration.groupBy('year','month', 
                                  'dest_city_code',
                                   'dest_state','age',
                                   'visa_type','gender',
                                   'dest_city', 'org_country'
                                  ).agg({'count':'sum'})\
.withColumnRenamed("sum(count)", "count")

In [15]:
immigration_ag.show(3)

+----+-----+--------------+----------+---+---------+------+-----------+-----------+-----+
|year|month|dest_city_code|dest_state|age|visa_type|gender|  dest_city|org_country|count|
+----+-----+--------------+----------+---+---------+------+-----------+-----------+-----+
|2016|    6|           LOS|        CA| 32|        2|     M|LOS ANGELES|    HUNGARY|    7|
|2016|    6|           LVG|        NV| 37|        2|     M|  LAS VEGAS|    IRELAND|   12|
|2016|    6|           HHW|        HI| 62|        2|     M|   HONOLULU|  AUSTRALIA|   74|
+----+-----+--------------+----------+---+---------+------+-----------+-----------+-----+
only showing top 3 rows



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The star schema was chosen because of its simplicity and its flexibility for analysis. It also minimizes the transformations needed for the dimension tables.

**Schema**
##### Dimension Tables
1. Temperature
    1. city
    1. month
    1. avg_temp
    1. avg_temp_uncertainty
1. Airport
    1. airport_id
    1. city
    1. state
    1. name
    1. type
    1. elevation_ft
1. Demographics 
    1. city
    1. state
    1. population
    1. median_age
    1. pct_male_population
    1. pct_female_population
    1. pct_foreign_born
1. Immigration
    1. year
    1. month
    1. dest_city_code
    1. dest_state
    1. age
    1. visa_type
    1. gender
    1. dest_city
    1. org_country
    1. count
  
##### Fact Table
1. Fact
    1. year
    1. month
    1. dest_city
    1. dest_state
    1. org_country
    1. avg_temp
    1. num_airports
    1. population
    1. pct_foreign_born
    1. imigration_count


#### 3.2 Mapping Out Data Pipelines
Data pipeline steps for data model 
1. Using the cleaned and minimally transformed data, generate the dimension tables as DataFrames
1. Generate the Fact table using Spark SQL to join the dimension tables
1. Write fact table to parquet files as this is the only significant transformation
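
To make the join step concrete, here is a conceptual plain-Python sketch of how the fact rows come together. It is not the pipeline itself, which uses Spark SQL. The function name `build_fact` and the dictionary layout are assumptions for illustration. The ANCHORAGE dimension values are copied from the sample output in this notebook, while the immigration counts and the unmatched city `EXAMPLEVILLE` are made up.

```python
from collections import defaultdict

# Toy dimension lookups, keyed like the join conditions in the pipeline.
temperature = {("ANCHORAGE", 6): 9.911816666666668}    # (city, month) -> avg_temp
num_airports = {("ANCHORAGE", "AK"): 10}               # (city, state) -> airport count
demographics = {("ANCHORAGE", "AK"): (298695, 11.134434791342338)}  # -> (population, pct_foreign_born)

# Illustrative immigration rows; counts and the unmatched city are made up.
immigration_rows = [
    {"year": 2016, "month": 6, "dest_city": "ANCHORAGE", "dest_state": "AK",
     "org_country": "AUSTRALIA", "count": 5},
    {"year": 2016, "month": 6, "dest_city": "ANCHORAGE", "dest_state": "AK",
     "org_country": "AUSTRALIA", "count": 7},
    {"year": 2016, "month": 6, "dest_city": "EXAMPLEVILLE", "dest_state": "ZZ",
     "org_country": "AUSTRALIA", "count": 3},
]


def build_fact(rows):
    """Inner-join rows to the three lookups, then sum counts per fact key."""
    totals = defaultdict(int)
    for r in rows:
        city_month = (r["dest_city"], r["month"])
        city_state = (r["dest_city"], r["dest_state"])
        # Inner-join semantics: a row missing from any dimension is dropped.
        if (city_month not in temperature or city_state not in num_airports
                or city_state not in demographics):
            continue
        key = (r["year"], r["month"], r["dest_city"], r["dest_state"], r["org_country"])
        totals[key] += r["count"]

    fact = []
    for (year, month, city, state, country), count in sorted(totals.items()):
        population, pct_foreign_born = demographics[(city, state)]
        fact.append({
            "year": year, "month": month, "dest_city": city, "dest_state": state,
            "org_country": country, "avg_temp": temperature[(city, month)],
            "num_airports": num_airports[(city, state)], "population": population,
            "pct_foreign_born": pct_foreign_born, "imigration_count": count,
        })
    return fact


fact_rows = build_fact(immigration_rows)
```

The two ANCHORAGE rows collapse into one fact row, and the unmatched city disappears. This is worth keeping in mind when reading totals: with inner joins, arrivals to cities missing from any dimension table are not represented in the fact table.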


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [16]:
# create spark sql views 

temperature_avg.createOrReplaceTempView("temperature")
airport.createOrReplaceTempView("airport")
demographics.createOrReplaceTempView("demographics")
immigration_ag.createOrReplaceTempView("immigration")



In [18]:

fact=spark.sql("""
SELECT 
    i.year, 
    i.month, 
    i.dest_city, 
    i.dest_state,
    i.org_country,
    t.avg_temp,
    a.num_airports,
    d.population,
    d.pct_foreign_born,
    SUM(i.count) as imigration_count
   
FROM immigration i
JOIN temperature t ON i.dest_city=t.city AND i.month=t.month
JOIN (
    select city, state, COUNT(airport_id) as num_airports 
    FROM airport 
    GROUP BY city, state
    ) a ON i.dest_city=a.city AND i.dest_state = a.state
JOIN demographics d ON i.dest_city=d.city AND i.dest_state = d.state

GROUP BY 
    i.year, 
    i.month, 
    i.dest_city, 
    i.dest_state,
    i.org_country,
    t.avg_temp,
    d.population,
    d.pct_foreign_born ,
    a.num_airports

ORDER BY 
    i.dest_city, i.org_country

""")


In [19]:
fact.show(10)

+----+-----+-----------+----------+-----------+-----------------+------------+----------+------------------+----------------+
|year|month|  dest_city|dest_state|org_country|         avg_temp|num_airports|population|  pct_foreign_born|imigration_count|
+----+-----+-----------+----------+-----------+-----------------+------------+----------+------------------+----------------+
|2016|    6|ALBUQUERQUE|        NM|     FRANCE|20.76131088082901|           3|    559131| 10.40900969540233|               1|
|2016|    6|  ANCHORAGE|        AK| ARGENTINA |9.911816666666668|          10|    298695|11.134434791342338|               2|
|2016|    6|  ANCHORAGE|        AK|  AUSTRALIA|9.911816666666668|          10|    298695|11.134434791342338|             352|
|2016|    6|  ANCHORAGE|        AK|    AUSTRIA|9.911816666666668|          10|    298695|11.134434791342338|              79|
|2016|    6|  ANCHORAGE|        AK|    BELARUS|9.911816666666668|          10|    298695|11.134434791342338|          

In [20]:
fact.printSchema()
immigration_ag.printSchema()
temperature_avg.printSchema()
airport.printSchema()
demographics.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- dest_city: string (nullable = true)
 |-- dest_state: string (nullable = true)
 |-- org_country: string (nullable = true)
 |-- avg_temp: double (nullable = true)
 |-- num_airports: long (nullable = false)
 |-- population: integer (nullable = true)
 |-- pct_foreign_born: double (nullable = true)
 |-- imigration_count: long (nullable = true)

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- dest_city_code: string (nullable = true)
 |-- dest_state: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- visa_type: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- dest_city: string (nullable = true)
 |-- org_country: string (nullable = true)
 |-- count: long (nullable = true)

root
 |-- city: string (nullable = true)
 |-- month: integer (nullable = true)
 |-- avg_temp: double (nullable = true)
 |-- avg_temp_uncertainty: double (nullable = true

In [22]:
# write fact table to parquet
print('Writing {} rows to parquet'.format(fact.count()))
fact.write.partitionBy('dest_city','dest_state').option('compression','snappy')\
.parquet("data_tables/fact",mode='overwrite')


Writing 3258 rows to parquet


#### 4.2 Data Quality Checks
Data quality checks help confirm that the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

1. Check for null values in fact table
1. Make sure fact table sums correctly compared to raw data
1. Confirm shape of fact table 
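
As a sketch of how the first two checks could be turned into reusable assertions, the helpers below work on plain lists of dictionaries. The function names `null_columns` and `filtered_total` are hypothetical, and the rows are made-up toy data rather than notebook results. This assumes the table is small enough to collect to the driver (for example with `[r.asDict() for r in fact.collect()]`); larger tables should be checked inside Spark instead.

```python
import builtins  # the notebook's `from pyspark.sql.functions import *` shadows built-in sum


def null_columns(rows, columns):
    """Return the sorted names of columns holding at least one None."""
    return sorted({c for r in rows for c in columns if r.get(c) is None})


def filtered_total(rows, value_col, **filters):
    """Sum value_col over rows whose fields equal every given filter value."""
    return builtins.sum(
        r[value_col] for r in rows
        if all(r.get(k) == v for k, v in filters.items())
    )


# Toy rows standing in for collected Spark rows; the numbers are made up.
source_rows = [
    {"dest_city": "NEW YORK", "dest_state": "NY", "month": 6, "count": 4},
    {"dest_city": "NEW YORK", "dest_state": "NY", "month": 6, "count": 6},
]
fact_rows = [
    {"dest_city": "NEW YORK", "dest_state": "NY", "month": 6,
     "org_country": "FRANCE", "imigration_count": 10},
]

nulls = null_columns(fact_rows, ["dest_city", "dest_state", "org_country"])
source_total = filtered_total(source_rows, "count", dest_city="NEW YORK", month=6)
fact_total = filtered_total(fact_rows, "imigration_count", dest_city="NEW YORK", month=6)

assert nulls == [], "fact table contains nulls in: {}".format(nulls)
assert source_total == fact_total, "fact total does not match source total"
```

Using assertions rather than eyeballing `show()` output means a failed check stops the run, which matters if the notebook is ever executed unattended.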

In [23]:
# check for null values in the fact table 
fact.select(isnull('year').alias('year'),
            isnull('month').alias('month'),
            isnull('org_country').alias('org_country'),
            isnull('dest_city').alias('dest_city'),
            isnull('avg_temp').alias('avg_temp'),
            isnull('num_airports').alias('num_airports'),
            isnull('population').alias('population'),
            isnull('pct_foreign_born').alias('pct_foreign_born')
           ).dropDuplicates().show()

+-----+-----+-----------+---------+--------+------------+----------+----------------+
| year|month|org_country|dest_city|avg_temp|num_airports|population|pct_foreign_born|
+-----+-----+-----------+---------+--------+------------+----------+----------------+
|false|false|      false|    false|   false|       false|     false|           false|
+-----+-----+-----------+---------+--------+------------+----------+----------------+



In [24]:
test1 = immigration.select(
    'count', 'dest_city').filter(
    immigration['dest_city']=='NEW YORK').filter(
    immigration['dest_state']=='NY').filter(
    immigration['month']==6)
test1=test1.groupBy('dest_city').agg({'count':'sum'})

test1.show()

+---------+----------+
|dest_city|sum(count)|
+---------+----------+
| NEW YORK|    443474|
+---------+----------+



In [25]:
# test2 should match test1 
test2 = fact.select(
    'dest_city', 'imigration_count').filter(
    fact['dest_city']=='NEW YORK').filter(
    fact['dest_state']=='NY').filter(
    fact['month']==6)
test2=test2.groupBy('dest_city').agg({'imigration_count':'sum'})

test2.show()

+---------+---------------------+
|dest_city|sum(imigration_count)|
+---------+---------------------+
| NEW YORK|               443474|
+---------+---------------------+



In [26]:
# Sum the total number of arrivals in the fact table
fact.select(sum('imigration_count').alias('total_count')).show()

+-----------+
|total_count|
+-----------+
|    1680508|
+-----------+



In [28]:
# Check table format and rows are correct 
print("Number of records: {r}\nColumns: {c}".format(
    r=fact.count(),c=len(fact.columns)
))

Number of records: 3258
Columns: 10
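
The shape check could go one step further and compare column names and types against an expected schema. The sketch below is one possible approach, not the notebook's method: `EXPECTED_FACT_SCHEMA` and `schema_problems` are hypothetical names, and the expected types are taken from the `printSchema()` output above, written in the short names that Spark's `DataFrame.dtypes` reports (`integer` appears as `int`, `long` as `bigint`).

```python
# Expected (column, type) pairs for the fact table
EXPECTED_FACT_SCHEMA = [
    ("year", "int"), ("month", "int"),
    ("dest_city", "string"), ("dest_state", "string"), ("org_country", "string"),
    ("avg_temp", "double"), ("num_airports", "bigint"),
    ("population", "int"), ("pct_foreign_born", "double"),
    ("imigration_count", "bigint"),
]


def schema_problems(actual, expected):
    """Compare two lists of (name, type) pairs and describe any differences."""
    problems = []
    actual_map, expected_map = dict(actual), dict(expected)
    for name, dtype in expected:
        if name not in actual_map:
            problems.append("missing column: {}".format(name))
        elif actual_map[name] != dtype:
            problems.append("{}: expected {}, got {}".format(name, dtype, actual_map[name]))
    for name, _ in actual:
        if name not in expected_map:
            problems.append("unexpected column: {}".format(name))
    return problems


# Toy check: pretend the population column arrived as a string.
actual = [(n, "string" if n == "population" else t) for n, t in EXPECTED_FACT_SCHEMA]
problems = schema_problems(actual, EXPECTED_FACT_SCHEMA)
```

In the notebook, `schema_problems(fact.dtypes, EXPECTED_FACT_SCHEMA)` would be expected to return an empty list when the fact table matches.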


#### 4.3 Data dictionary 

**fact**
 - year(Integer): Calendar year YYYY
 - month(Integer): Calendar month 
 - dest_city(String): Name of city of arrival
 - dest_state(String): State of arrival abbreviation (e.g. NY)
 - org_country(String): Name of country of origin 
 - avg_temp(Double): Average monthly temperature of the city of arrival
 - num_airports(Long): Number of airports for the city of arrival
 - population(Integer): Total population of the city of arrival
 - pct_foreign_born(Double): Percentage of total population that is foreign born in the city of arrival
 - imigration_count(Long): Number of international arrivals 
  
**immigration_ag**
 - year(Integer): Calendar year YYYY
 - month(Integer): Calendar month
 - dest_city_code(String): Code for city of arrival
 - dest_state(String): State of arrival abbreviation (e.g. NY)
 - age(Integer): Age of person
 - visa_type(Integer): Visa type listed on I94 (1: Business, 2: Pleasure, 3: Student)
 - gender(String): Gender of person listed on I94 (M: Male, F: Female)
 - dest_city(String): Name of city of arrival
 - org_country(String): Name of country of origin 
 - count(Long): Number of international arrivals 
  
**temperature_avg**
 - city(String): Name of U.S. City 
 - month(Integer): Calendar month
 - avg_temp(Double): Average temperature in degrees Celsius
 - avg_temp_uncertainty(Double): Average temperature uncertainty in degrees Celsius
  
**airport**
 - airport_id(String): Airport ID code
 - city(String): Name of city the airport services 
 - state(String): State abbreviation (e.g. NY) for airport
 - name(String): Airport name
 - type(String): Type of airport (e.g. small_airport)
 - elevation_ft(Integer): Elevation of airport in feet

**demographics**
 - city(String): City name
 - state(String): State abbreviation (e.g. NY) for city
 - population(Integer): Total population of the city
 - median_age(Float): Median age of population in city
 - pct_male_population(Double): Percent male of population in city
 - pct_female_population(Double): Percent female of population in city
 - pct_foreign_born(Double): Percent foreign-born of population in city

### Step 5: Complete Project Write Up
Given the size of the data and the frequency of updates, Apache Spark was chosen as the logical tool for processing the data. The fact table generated by Spark SQL is written back to the data lake as parquet files partitioned by city and state. The data should be updated at most monthly, to match the level of aggregation used. 

Under different scenarios the project would look different. 
 - If the data were increased by 100x, the process would remain similar but should run on a distributed cluster (for example, Apache Hadoop) to spread the processing and improve performance. 
 - If the data needed to be updated and reported daily, Airflow should be used to schedule and run the data pipeline. It could be scheduled so that the data is available by the dashboard reporting time, and any issues in the process could be identified and handled appropriately. 
 - If the data needed to be accessed by 100+ people, migrating to a data warehouse design on a cloud hosted database (e.g. AWS Redshift) would be more appropriate. 
