# City Immigration and Temperatures
### Data Engineering Capstone Project

#### Project Summary
This project performs extraction of data from two different sources - the I94 immigration data set and city temperature data from Kaggle. It allows users to make queries on the data, facilitating the analysis of the relationship between destination temperature, immigration counts and demographic features of immigrants. The data is modeled in a star schema and the fact table is written to parquet files.

In [27]:
# Do all imports and installs here
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import re
from pprint import pprint
from itertools import islice

In [3]:
# Create Spark session
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

In [4]:
def spark_results_to_pandas(results):
    import pandas as pd
    df = pd.DataFrame(results, columns=results[0].__fields__)
    return df

### #1: Scope the Project and Gather Data

#### Scope 
The goal of this project is to pull data from two sources, and create fact and dimension tables to show the relations between immigration and temperature.

This project uses Python and Apache Spark to process this data. 

#### Describe and Gather Data 
1. I94 Immigration Data: comes from the U.S. National Tourism and Trade Office and includes details on incoming international arrivals. The data includes information on country of origin, visa type, age, and port of entry [link](https://travel.trade.gov/research/reports/i94/historical/2016.html)
2. World Temperature Data: comes from Kaggle and includes temperature data in the U.S. since from 1850 to 2013 [link](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data)

In [5]:
# Read in the data here
demo=spark.read.format("csv").option("delimiter", ";").option("header", "true").load("us-cities-demographics.csv")
airports=spark.read.format("csv").option("delimiter", ",").option("header", "true").load("airport-codes_csv.csv")
temperature=spark.read.format("csv").option("delimiter", ",").option("header", "true").load("GlobalLandTemperaturesByCity.csv")

In [7]:
i94 =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

### #2: Explore and Assess the Data
#### Data Explorationa & Cleaning

# <i> i94 Data <i>

In [8]:
spark_results_to_pandas(i94.take(5))

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


In [6]:
i94_sas_labels = 'I94_SAS_Labels_Descriptions.SAS'

In [50]:
with open(i94_sas_labels) as f:
    lines = f.readlines()    

# use regex to parse I94_SAS_Labels_Descriptions.SAS file
prog1 = re.compile(r'^/\*\s+(?P<label>.+?)\s+$')
result1 = [prog1.match(c) for c in lines]

for parsed_line in result1:
    if parsed_line != None:
        print(parsed_line.group("label"))

I94YR - 4 digit year */
I94MON - Numeric month */
I94CIT & I94RES - This format shows all the valid and invalid codes for processing */
I94PORT - This format shows all the valid and invalid codes for processing */
ARRDATE is the Arrival Date in the USA. It is a SAS date numeric field that a
I94MODE - There are missing values as well as not reported (9) */
I94ADDR - There is lots of invalid codes in this variable and the list below
DEPDATE is the Departure Date from the USA. It is a SAS date numeric field that
I94BIR - Age of Respondent in Years */
I94VISA - Visa codes collapsed into three categories:
COUNT - Used for summary statistics */
DTADFILE - Character Date Field - Date added to I-94 Files - CIC does not use */
VISAPOST - Department of State where where Visa was issued - CIC does not use */
OCCUP - Occupation that will be performed in U.S. - CIC does not use */
ENTDEPA - Arrival Flag - admitted or paroled into the U.S. - CIC does not use */
ENTDEPD - Departure Flag - Departed, l

In [67]:
prog2 = re.compile(r'\'(.*)\'.*\'(.*)\'')
valid_ports = {}
for line in lines[302:961]:
    res = prog2.search(line)
    valid_ports[res.group(1)]=[res.group(2)]
#pprint(valid_ports)
#list(islice(valid_ports, 50))

In [89]:
# 1. remove nulls
# 2. filter to include only valid ports
# 3. cast relevant columns into correct data types
# 4. select relevant columns into final dataframe

df_i94 = i94.filter(i94.i94addr.isNotNull())\
.filter(i94.i94res.isNotNull())\
.filter(i94.i94port.isin(list(valid_ports.keys())))\
.withColumn("i94yr",col("i94yr").cast("integer"))\
.withColumn("i94mon",col("i94mon").cast("integer"))\
.withColumn("i94res",col("i94res").cast("integer"))\
.withColumn("i94bir",col("i94bir").cast("integer"))\
.withColumn("count",col("count").cast("integer"))\
.withColumn("I94VISA",col("I94VISA").cast("integer"))\
.withColumn("i94cit",col("i94cit").cast("integer"))\
.withColumn("i94mode",col("i94mode").cast("integer"))

df_i94 = df_i94.select(
    col("I94YR").alias("year"),
    col("I94MON").alias("month"),
    col("I94RES").alias("origin_country_code"),
    col("i94cit").alias("origin_city_code"),
    col("i94port").alias("dest_city_code"),
    col("I94ADDR").alias("dest_state"),
    col("I94VISA").alias("visa_type"),
    col("I94BIR").alias("age"),
    col("GENDER").alias("gender"),
    col("i94mode").alias("mode_of_transport"),
    "count"
).drop_duplicates()

In [90]:
spark_results_to_pandas(df_i94.take(5))

Unnamed: 0,year,month,origin_country_code,origin_city_code,dest_city_code,dest_state,visa_type,age,gender,mode_of_transport,count
0,2016,4,103,103,NEW,FL,2,5,F,1,1
1,2016,4,124,103,NEW,TX,2,37,M,1,1
2,2016,4,689,103,NEW,NY,2,36,F,1,1
3,2016,4,104,104,NEW,FL,2,7,M,1,1
4,2016,4,104,104,NEW,NY,2,62,F,1,1


In [91]:
df_i94.dtypes

[('year', 'int'),
 ('month', 'int'),
 ('origin_country_code', 'int'),
 ('origin_city_code', 'int'),
 ('dest_city_code', 'string'),
 ('dest_state', 'string'),
 ('visa_type', 'int'),
 ('age', 'int'),
 ('gender', 'string'),
 ('mode_of_transport', 'int'),
 ('count', 'int')]

# <i> Temperature Data </i>

In [44]:
spark_results_to_pandas(temperature.take(5))

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [76]:
@udf
def city_to_port(city):
    for key in valid_ports:
        if city.lower() in valid_ports[key][0].lower():
            return key

In [86]:
df_temp = temperature.filter(temperature["Country"]=="United States")\
                     .withColumn("month",month(temperature["dt"]))\
                     .withColumn("i94port", city_to_port(temperature.City))\
                     .withColumn("AverageTemperature",col("AverageTemperature").cast("float"))\
                     .withColumn("AverageTemperatureUncertainty",col("AverageTemperatureUncertainty").cast("float"))\
                     .na.drop(subset=["AverageTemperature"])

# Avg temperature for each city by month
df_temp_avg=df_temp.groupBy('i94port','month', 'city').agg({'AverageTemperature':'avg',
                                                 'AverageTemperatureUncertainty':'avg'})

df_temp_final=df_temp_avg.select(col("i94port").alias("city_code"),
                                 "city",
                               "month",
                               col("avg(AverageTemperature)").alias("AverageTemperature"),
                               col("avg(AverageTemperatureUncertainty)").alias("AverageTemperatureUncertainty")).drop_duplicates()

In [88]:
spark_results_to_pandas(df_temp_final.take(5))

Unnamed: 0,city_code,city,month,AverageTemperature,AverageTemperatureUncertainty
0,SBN,South Bend,7,23.103946,1.182562
1,RDU,Durham,8,24.555035,1.161132
2,,Overland Park,11,5.551698,1.275236
3,SYR,Syracuse,10,8.902348,1.192456
4,SDP,San Diego,10,17.961799,0.675427


In [87]:
df_temp_final.dtypes

[('city_code', 'string'),
 ('city', 'string'),
 ('month', 'int'),
 ('AverageTemperature', 'double'),
 ('AverageTemperatureUncertainty', 'double')]

### #3: Define the Data Model
#### 3.1 Conceptual Data Model

##### Dimension Tables
    1. temp_table
        A. city_code
        B. city
        C. month
        D. AverageTemperature
        E. AverageTemperatureUncertainty

    2. demographics_table
        A. year
        B. month
        C. origin_country_code
        D. origin_city_code
        E. dest_city_code
        F. dest_state
        G. visa_type
        H. age
        I. gender
        J. mode_of_transport
        K. count


##### Fact Table
    immigration_table
        A. year
        B. month
        C. origin_country_code
        D. origin_city_code
        E. dest_city_code
        F. dest_state
        G. visa_type
        H. age
        I. gender
        J. mode_of_transport
        K. count
        L. AverageTemperature

#### 3.2 Mapping Out Data Pipelines
<b> Steps necessary to pipeline the data into the chosen data model </b>

1. Clean the data
2. Create dimension tables for demographics, I94 and temperature datasets
2. Create the fact table by joining both dimension tables on city_code and dest_city_code, and write to parquet file partitioned by city_code

### #4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [94]:
df_immigration = i94.filter(i94.i94addr.isNotNull())\
.filter(i94.i94res.isNotNull())\
.filter(i94.i94port.isin(list(valid_ports.keys())))\
.withColumn("i94yr",col("i94yr").cast("integer"))\
.withColumn("i94mon",col("i94mon").cast("integer"))\
.withColumn("i94res",col("i94res").cast("integer"))\
.withColumn("i94bir",col("i94bir").cast("integer"))\
.withColumn("count",col("count").cast("integer"))\
.withColumn("I94VISA",col("I94VISA").cast("integer"))\
.withColumn("i94cit",col("i94cit").cast("integer"))\
.withColumn("i94mode",col("i94mode").cast("integer"))

df_immigration = df_immigration.select(
    col("I94YR").alias("year"),
    col("I94MON").alias("month"),
    col("I94RES").alias("origin_country_code"),
    col("i94cit").alias("origin_city_code"),
    col("i94port").alias("dest_city_code"),
    col("I94ADDR").alias("dest_state"),
    col("I94VISA").alias("visa_type"),
    col("I94BIR").alias("age"),
    col("GENDER").alias("gender"),
    col("i94mode").alias("mode_of_transport"),
    "count"
).drop_duplicates()

df_immigration.write \
    .mode("append") \
    .partitionBy("dest_city_code") \
    .parquet("/tables/immigration.parquet")

df_temperature = temperature.filter(temperature["Country"]=="United States")\
                     .withColumn("month",month(temperature["dt"]))\
                     .withColumn("i94port", city_to_port(temperature.City))\
                     .withColumn("AverageTemperature",col("AverageTemperature").cast("float"))\
                     .withColumn("AverageTemperatureUncertainty",col("AverageTemperatureUncertainty").cast("float"))\
                     .na.drop(subset=["AverageTemperature"])

# Avg temperature for each city by month
df_temp_avg=df_temperature.groupBy('i94port','month', 'city').agg({'AverageTemperature':'avg',
                                                 'AverageTemperatureUncertainty':'avg'})

df_temp_final=df_temp_avg.select(col("i94port").alias("city_code"),
                                 "city",
                               "month",
                               col("avg(AverageTemperature)").alias("AverageTemperature"),
                               col("avg(AverageTemperatureUncertainty)").alias("AverageTemperatureUncertainty")).drop_duplicates()
df_temp_final.write \
    .mode("append") \
    .partitionBy("city_code") \
    .parquet("/tables/temperature.parquet")

In [95]:
# Create temporary views of the immigration and temperature data
df_immigration.createOrReplaceTempView("immigration_view")
df_temp_final.createOrReplaceTempView("temp_view")

In [99]:
# Create the fact table by joining the immigration and temperature views
fact_table = spark.sql("""
SELECT immigration_view.year as year,
       immigration_view.month as month,
       immigration_view.origin_country_code as origin_country_code,
       immigration_view.origin_city_code as origin_city_code,
       immigration_view.dest_city_code as dest_city_code,
       immigration_view.dest_state as dest_state,
       immigration_view.visa_type as visa_type,
       immigration_view.age as age,
       immigration_view.gender as gender,
       immigration_view.mode_of_transport as mode_of_transport,
       immigration_view.count as immigration_count,
       temp_view.AverageTemperature as avg_temp
FROM immigration_view
JOIN temp_view ON immigration_view.dest_city_code = temp_view.city_code
ORDER BY 
    immigration_view.dest_state
""")

In [100]:
spark_results_to_pandas(fact_table.take(5))

Unnamed: 0,year,month,origin_country_code,origin_city_code,dest_city_code,dest_state,visa_type,age,gender,mode_of_transport,immigration_count,avg_temp
0,2016,4,104,111,SFR,..,1,55,,1,1,10.251915
1,2016,4,104,111,SFR,..,1,55,,1,1,19.556291
2,2016,4,104,111,SFR,..,1,55,,1,1,18.391352
3,2016,4,104,111,SFR,..,1,55,,1,1,16.024335
4,2016,4,104,111,SFR,..,1,55,,1,1,8.275661


In [101]:
# Write fact table to parquet files partitioned by dest_city_code
fact_table.write \
    .mode("append") \
    .partitionBy("dest_city_code") \
    .parquet("/results/fact.parquet")

#### 4.2 Data Quality Checks

Run Quality Checks

In [104]:
# Perform quality checks here

def count_check(df):
    return df.count() == 0 

def integrity_check(df_immigration, df_temp):
    return df_immigration.select(col("dest_city_code")).distinct() \
         .join(df_temp, df_immigration["dest_city_code"] == df_temp["city_code"], "left_anti") \
         .count() == 0

def quality_check(df_immigration, df_temp):
    return count_check(df_immigration) and count_check(df_temp) \
        and integrity_check(df_immigration, df_temp)

# Perform data quality check - should return False to pass checks
print(quality_check(df_immigration, df_temp_final))

False


#### 4.3 Data dictionary 
Brief description of what the data is and where it came from.

##### Dimension Tables
    1. temp_table
        A. city_code: Code of US city
        B. city: Name of US city
        C. month: Calendar month
        D. AverageTemperature: average temperature in city, in degrees celsius
        E. AverageTemperatureUncertainty: variation in average temperature in city, in degrees celsius

    2. demographics_table
        A. year: Calendar year YYYY
        B. month: Calendar month
        C. origin_country_code: Code of country of departure
        D. origin_city_code: Code of city of departure
        E. dest_city_code: Code of US city of arrival
        F. dest_state: US state of arrival (two-letter abbreviation)
        G. visa_type: Visa type listed on I94 (1: Business, 2: Pleasure, 3: Student)
        H. age: person's age
        I. gender: Gender of person listed on I94 (M: Male, F:Female)
        J. mode_of_transport: mode of transport taken to US (eg. air arrival)
        K. count: no. of arrivals


##### Fact Table
    immigration_table
        A. year: Calendar year YYYY
        B. month: Calendar month
        C. origin_country_code: Code of country of departure
        D. origin_city_code: Code of city of departure
        E. dest_city_code: Code of US city of arrival
        F. dest_state: US state of arrival (two-letter abbreviation)
        G. visa_type: Visa type listed on I94 (1: Business, 2: Pleasure, 3: Student)
        H. age: person's age
        I. gender: Gender of person listed on I94 (M: Male, F:Female)
        J. mode_of_transport: mode of transport taken to US (eg. air arrival)
        K. count: no. of arrivals
        L. AverageTemperature: average temperature in city

#### #5: Final Project Write Up
Given the size of the data and the frequency of updates, Apache Spark was chosen as the logical tool to processing the data since it can handle large amounts of data simply by scaling up the hardware. Spark can be unit tested locally with Hive support and in production to save to S3. 

The data is best updated monthly, to match the level of aggregation used. 

Approaching the problem differently under different scenarios:
 * The data was increased by 100x: <br>
 Scale up the number of spark workers used in processing the data. Spark workers can be provisioned only as and when needed since the data is processed as a batch job every month, with an autoscaling service or serverless infrastructure. <br><br>
 * The data populates a dashboard that must be updated on a daily basis by 7am every day: <br>
    Airflow should be used to schedule and run the data pipeline. Airflow is able to schedule data processing and ensure that data is available at a pre determined timing every day. <br><br>
 * The database needed to be accessed by 100+ people: <br>
    It might be more appropriate to migrate the data warehouse onto a cloud hosted database.