# City Immigration and Temperatures
### Data Engineering Capstone Project

#### Project Summary
The project aims to explain US immigration in terms of immigrant demographics, destination city and its temperature, and arrival date. We extract data from two sources: the 2016 I94 immigration dataset and city temperature data from Kaggle. We design three dimension tables (immigrant demographics covering age, gender, and visa type; destination city with its 2013 average temperature; and arrival date with day of week, week of year, and month) and one fact table of immigration counts. We use Spark for the ETL jobs and store the results in Parquet for downstream analysis.


In [1]:
# Do all imports and installs here
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pyspark.sql.functions as F
import pyspark.sql.types as t
from datetime import datetime
from datetime import timedelta
import re
from pprint import pprint

In [2]:
# Create Spark session
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

In [3]:
# Create helper to convert a list of Spark Rows (e.g. from df.take(n)) to a pandas DataFrame
def spark_to_pandas(results):
    if not results:
        return pd.DataFrame()  # take() returned no rows
    return pd.DataFrame(results, columns=results[0].__fields__)

### #1: Scope the Project and Gather Data

#### Scope 
The goal of this project is to pull data from two sources and create fact and dimension tables that show immigration patterns in terms of immigrant demographics, destination city temperature, and seasonality.
 

#### Describe and Gather Data 
1. I94 Immigration Data: comes from the U.S. National Tourism and Trade Office and includes details on incoming international arrivals, such as country of origin, visa type, age, and port of entry. [link](https://travel.trade.gov/research/reports/i94/historical/2016.html)
2. World Temperature Data: comes from Kaggle and includes monthly average land temperatures for cities worldwide, with some series starting in the 1740s and running through 2013; only U.S. cities are used in this project. [link](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data)

In [4]:
# Read in data sources
demo = spark.read.format("csv").option("delimiter", ";").option("header", "true").load("us-cities-demographics.csv")
airports = spark.read.format("csv").option("delimiter", ",").option("header", "true").load("airport-codes_csv.csv")
i94 = spark.read.format('com.github.saurfang.sas.spark').load('data/i94_apr16_sub.sas7bdat')
temperature = spark.read.format("csv").option("delimiter", ",").option("header", "true").load("GlobalLandTemperaturesByCity.csv")

### #2: Explore and Assess the Data
#### Data Exploration & Cleaning

### <i> i94 Data </i>

In [5]:
i94.count()

3096313

In [6]:
spark_to_pandas(i94.take(3))

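|   | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6.0 | 2016.0 | 4.0 | 692.0 | 692.0 | XXX | 20573.0 |  |  |  | ... | U |  | 1979.0 | 10282016 |  |  |  | 1897628000.0 |  | B2 |
| 1 | 7.0 | 2016.0 | 4.0 | 254.0 | 276.0 | ATL | 20551.0 | 1.0 | AL |  | ... | Y |  | 1991.0 | D/S | M |  |  | 3736796000.0 | 296.0 | F1 |
| 2 | 15.0 | 2016.0 | 4.0 | 101.0 | 101.0 | WAS | 20545.0 | 1.0 | MI | 20691.0 | ... |  | M | 1961.0 | 09302016 | M |  | OS | 666643200.0 | 93.0 | B2 |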


In [7]:
# Create helper to parse/map variable labels in SAS data files:
# print the description that opens each "/* ..." comment block

i94_sas_labels = 'I94_SAS_Labels_Descriptions.SAS'
with open(i94_sas_labels) as f:
    lines = f.readlines()

prog1 = re.compile(r'^/\*\s+(?P<label>.+?)\s+$')
result1 = [prog1.match(c) for c in lines]

for parsed_line in result1:
    if parsed_line is not None:
        print(parsed_line.group("label"))

I94YR - 4 digit year */
I94MON - Numeric month */
I94CIT & I94RES - This format shows all the valid and invalid codes for processing */
I94PORT - This format shows all the valid and invalid codes for processing */
ARRDATE is the Arrival Date in the USA. It is a SAS date numeric field that a
I94MODE - There are missing values as well as not reported (9) */
I94ADDR - There is lots of invalid codes in this variable and the list below
DEPDATE is the Departure Date from the USA. It is a SAS date numeric field that
I94BIR - Age of Respondent in Years */
I94VISA - Visa codes collapsed into three categories:
COUNT - Used for summary statistics */
DTADFILE - Character Date Field - Date added to I-94 Files - CIC does not use */
VISAPOST - Department of State where where Visa was issued - CIC does not use */
OCCUP - Occupation that will be performed in U.S. - CIC does not use */
ENTDEPA - Arrival Flag - admitted or paroled into the U.S. - CIC does not use */
ENTDEPD - Departure Flag - Departed, l

In [8]:
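# Create helper to validate that destination (i94addr) values are valid two-letter state codes
# (collect the distinct codes once instead of converting the whole table to pandas twice)
valid_state = [row['State Code'] for row in demo.select('State Code').distinct().collect()]
#print(valid_state)

valid_state_num = len(valid_state)
#print(valid_state_num)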

In [9]:
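# Create helper to screen valid ports
# lines[302:961] is the I94PORT value list in the labels file, one entry per line
# in the form 'CODE' = 'DESCRIPTION'; the indices are tied to this version of the file
prog2 = re.compile(r'\'(.*)\'.*\'(.*)\'')
valid_ports = {}
for line in lines[302:961]:
    res = prog2.search(line)
    if res:  # skip any line in the slice that is not a code/description pair
        valid_ports[res.group(1)] = [res.group(2)]
#pprint(valid_ports)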

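The slice `lines[302:961]` ties the port parser to one exact layout of the labels file. As a sketch of a sturdier alternative, the function below locates the value block by name and stops at its closing `;`. It assumes the port list is a SAS `value` block whose header line contains `i94prtl` and whose entries look like `'CODE' = 'DESCRIPTION'`; both are assumptions to check against the actual file. `parse_value_block` and the sample text are hypothetical.

```python
import re

# Matches one  'CODE' = 'DESCRIPTION'  entry; the description may carry trailing spaces
ENTRY = re.compile(r"'(?P<code>[^']*)'\s*=\s*'(?P<desc>[^']*)'")


def parse_value_block(lines, block_name):
    """Collect code -> [description] pairs from the SAS value block named block_name.

    Assumption: the block starts on a line containing block_name and ends at the
    first following line that is just ';'. The values are one-element lists, the
    same shape as valid_ports.
    """
    mapping = {}
    inside = False
    for line in lines:
        if not inside:
            inside = block_name in line
            continue
        if line.strip() == ";":
            break
        match = ENTRY.search(line)
        if match:
            mapping[match.group("code").strip()] = [match.group("desc").strip()]
    return mapping


# Hypothetical excerpt in the assumed layout of the labels file
sample = """/* I94PORT - This format shows all the valid and invalid codes for processing */
value $i94prtl
   'AAA'\t=\t'SAMPLE CITY, AK             '
   'BBB'\t=\t'OTHER TOWN, WA'
;
value i94model
   1 = 'Air'
;""".splitlines()

ports = parse_value_block(sample, "i94prtl")
print(ports)
```

With the real file, `parse_value_block(lines, "i94prtl")` would return a dictionary in the same `code -> [description]` shape as `valid_ports`, so the downstream helpers could use it unchanged.
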
In [10]:
# Create helper to convert SAS date to PySpark date
# (SAS dates count days since 1960-01-01)

def date_add_(date, days):
    """Return `date` shifted by `days`; None when `days` is missing."""
    if days is None:
        return None
    if date is None:
        date = datetime(1960, 1, 1)
    return date + timedelta(days=days)

date_add_udf = F.udf(date_add_, t.DateType())

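A SAS date is a day count from 1960-01-01, so the conversion performed by `date_add_udf` can be spot-checked in plain Python against the `arrdate` values in the sample rows above. The helper `sas_to_date` is hypothetical. Inside Spark, the built-in SQL function `date_add` could likely replace the Python UDF and avoid shipping every row to Python, for example `F.expr("date_add(to_date('1960-01-01'), cast(arrdate as int))")`; that expression is a suggestion to verify on your Spark version.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # day 0 of the SAS date scale


def sas_to_date(sas_days):
    """Convert a SAS date (days since 1960-01-01, possibly a float) to datetime.date."""
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


# arrdate values taken from the sample i94 rows
for value in (20545.0, 20551.0, 20573.0, None):
    print(value, "->", sas_to_date(value))
```

With these inputs, 20545.0 maps to 2016-04-01, the date shown for the first rows of the cleaned `df_i94`.
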
In [11]:
# Create cleaned dataframe for i94:
# keep rows whose destination state and port are valid, convert the SAS
# arrival date, add a surrogate id, and cast numeric columns to integers

df_i94 = i94.filter(i94.i94addr.isNotNull())\
            .filter(i94.i94addr.isin(list(valid_state)))\
            .filter(i94.i94port.isNotNull())\
            .filter(i94.i94port.isin(list(valid_ports.keys())))\
            .withColumn("sas_date", lit("1960-01-01"))\
            .withColumn("newdate", date_add_udf(F.to_date("sas_date"), "arrdate"))\
            .withColumn("id", monotonically_increasing_id())\
            .withColumn("i94yr", col("i94yr").cast("integer"))\
            .withColumn("i94mon", col("i94mon").cast("integer"))\
            .withColumn("i94bir", col("i94bir").cast("integer"))\
            .withColumn("count", col("count").cast("integer"))\
            .withColumn("i94visa", col("i94visa").cast("integer"))

# Keep only the columns used by the data model, with descriptive names
df_i94 = df_i94.select(col("id").alias("uuid"),
                       col("newdate").alias("date"),
                       col("i94port").alias("city_code"),
                       col("i94addr").alias("state_code"),
                       col("i94bir").alias("age"),
                       col("gender").alias("gender"),
                       col("i94visa").alias("visa_type"),
                       "count").drop_duplicates()

In [12]:
spark_to_pandas(df_i94.take(3))

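|   | uuid | date | city_code | state_code | age | gender | visa_type | count |
|---|---|---|---|---|---|---|---|---|
| 0 | 107 | 2016-04-01 | NEW | NY | 54 | M | 2 | 1 |
| 1 | 473 | 2016-04-01 | SFR | CA | 51 | M | 1 | 1 |
| 2 | 486 | 2016-04-01 | SFR | CA | 48 | M | 2 | 1 |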


In [13]:
df_i94.dtypes

[('uuid', 'bigint'),
 ('date', 'date'),
 ('city_code', 'string'),
 ('state_code', 'string'),
 ('age', 'int'),
 ('gender', 'string'),
 ('visa_type', 'int'),
 ('count', 'int')]

### <i> Temperature Data </i>

In [14]:
temperature.count()

8599212

In [15]:
spark_to_pandas(temperature.take(3))

|   | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 1 | 1743-12-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 2 | 1744-01-01 |  |  | Århus | Denmark | 57.05N | 10.33E |


In [16]:
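# Create helper to map a city's full name to an I94 port code
# Use @udf to turn the Python function into a PySpark UDF (returns a string).
# Matching is a case-insensitive substring test: the first port whose
# description contains the city name wins; None if nothing matches.

@udf
def city_to_port(city):
    if city is None:
        return None
    for key in valid_ports:
        if city.lower() in valid_ports[key][0].lower():
            return key
    return None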

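Because `city_to_port` accepts the first port whose description merely contains the city name, a short city name can be captured by a longer one that appears earlier in `valid_ports`. The sketch below reproduces that logic in plain Python next to a stricter variant that compares only the city part of the description (the text before the first comma). The port codes and descriptions are made up for illustration, and the "city, state" description format is an assumption.

```python
def city_to_port_substring(city, ports):
    """Mirror of city_to_port: first port whose description contains the city name."""
    if city is None:
        return None
    for code, (description,) in ports.items():
        if city.lower() in description.lower():
            return code
    return None


def city_to_port_exact(city, ports):
    """Stricter variant: compare the city with the text before the first comma."""
    if city is None:
        return None
    target = city.strip().lower()
    for code, (description,) in ports.items():
        port_city = description.split(",")[0].strip().lower()
        if port_city == target:
            return code
    return None


# Hypothetical port list in the same shape as valid_ports (code -> [description])
ports = {
    "P01": ["NEW YORK, NY"],
    "P02": ["YORK, PA"],
}

for city in ("York", "New York", "Albany"):
    print(city, city_to_port_substring(city, ports), city_to_port_exact(city, ports))
```

Here "York" is captured by "NEW YORK, NY" under the substring rule but maps to `P02` under the exact rule; the trade-off is that exact matching misses descriptions that spell the city differently.
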
In [17]:
# Create cleaned dataframe for average temperature:
# keep U.S. cities in 2013 (filtering before the UDF so it runs on fewer rows),
# map each city to a port code, then average the monthly values per city

df_temp = temperature.filter(temperature["Country"] == "United States")\
                     .withColumn("year", year(temperature["dt"]))\
                     .filter(col("year") == 2013)\
                     .withColumn("i94port", city_to_port(temperature["City"]))\
                     .withColumn("AverageTemperature", col("AverageTemperature").cast("float"))\
                     .na.drop(subset=["i94port"])

df_temp = df_temp.groupBy("i94port", "City")\
                 .agg(F.avg("AverageTemperature").alias("avg_temperature"))

df_temp = df_temp.select(col("i94port").alias("city_code"),
                         col("City").alias("city_name"),
                         col("avg_temperature").alias("2013_average_temperature")).drop_duplicates()

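A 2013 average is only comparable across cities if each city contributes the same months of data for that year. As a plain-Python sketch of a coverage check, the function below averages the non-null readings per city and reports how many months were used, so a partial year can be spotted before it is treated as an annual mean. The rows and the name `yearly_average` are hypothetical; in Spark the same idea would be an extra `F.count("AverageTemperature")` inside the `agg` call.

```python
from collections import defaultdict
from statistics import mean

# Hypothetical monthly readings: (city, "YYYY-MM-DD", average temperature or None)
rows = [
    ("Alpha", "2013-01-01", -2.0),
    ("Alpha", "2013-02-01", 0.0),
    ("Alpha", "2013-03-01", None),   # missing reading, ignored as Spark's avg ignores nulls
    ("Beta", "2013-01-01", 10.0),
    ("Beta", "2012-12-01", 9.0),     # different year, filtered out
]


def yearly_average(rows, year):
    """Return {city: (mean temperature, number of months used)} for one year."""
    readings = defaultdict(list)
    for city, dt, temp in rows:
        if dt.startswith(f"{year}-") and temp is not None:
            readings[city].append(temp)
    return {city: (mean(temps), len(temps)) for city, temps in readings.items()}


summary = yearly_average(rows, 2013)
for city, (avg, months) in sorted(summary.items()):
    flag = "" if months == 12 else "  <- partial year"
    print(f"{city}: {avg:.2f} over {months} month(s){flag}")
```
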
In [18]:
spark_to_pandas(df_temp.take(3))

|   | city_code | city_name | 2013_average_temperature |
|---|---|---|---|
| 0 | SYR | Syracuse | 10.180111 |
| 1 | NYC | New York | 12.163889 |
| 2 | MIL | Milwaukee | 11.586889 |


In [19]:
df_temp.dtypes

[('city_code', 'string'),
 ('city_name', 'string'),
 ('2013_average_temperature', 'double')]

### #3: Define the Data Model
#### 3.1 Conceptual Data Model

##### Staging Tables

    df_i94
        A. uuid
        B. date
        C. city_code
        D. state_code
        E. age
        F. gender
        G. visa_type
        H. count
        
    df_temp
        A. city_code
        B. city_name
        C. 2013_average_temperature

##### Dimension Tables

    df_immigrant
        A. uuid
        B. age
        C. gender
        D. visa_type
   
    df_city
        A. city_code
        B. 2013_average_temperature
        
    df_time
        A. date
        B. day (day of week)
        C. week (week of year)
        D. month
        
##### Fact Table

    df_immigration
        A. uuid
        B. city_code
        C. date
        D. count
        
#### 3.2 Mapping Out Data Pipelines
<b> Steps necessary to pipeline the data into the chosen data model </b>

1. Clean the data: remove nulls and invalid codes, fix data types, and drop duplicates
2. Load staging tables df_i94 and df_temp
3. Create dimension tables df_immigrant, df_city, and df_time
4. Create fact table df_immigration with the immigration count, keyed by uuid (df_immigrant), city_code (df_city), and date (df_time), ensuring referential integrity
5. Save the processed dimension and fact tables in Parquet for downstream queries

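Step 4 calls for referential integrity, but nothing in the pipeline enforces it. `uuid` and `date` are drawn from df_i94 itself, so only `city_code` is at risk: df_city holds only ports that could be matched to a Kaggle city, while the fact table may contain port codes with no row in df_city. A minimal plain-Python sketch of the check, with hypothetical rows and the hypothetical helper `orphan_keys`; in Spark the equivalent would be a `left_anti` join such as `df_immigration.join(df_city, "city_code", "left_anti")`, which returns the fact rows with no matching dimension row.

```python
def orphan_keys(fact_rows, key, dimension_keys):
    """Return the key values used in fact_rows that are missing from dimension_keys."""
    return {row[key] for row in fact_rows} - set(dimension_keys)


# Hypothetical fact rows and dimension keys
fact = [
    {"uuid": 1, "city_code": "NYC", "date": "2016-04-01", "count": 1},
    {"uuid": 2, "city_code": "LOS", "date": "2016-04-01", "count": 1},
    {"uuid": 3, "city_code": "ZZZ", "date": "2016-04-02", "count": 1},
]
city_codes = {"NYC", "LOS"}
dates = {"2016-04-01", "2016-04-02"}

missing_cities = orphan_keys(fact, "city_code", city_codes)
missing_dates = orphan_keys(fact, "date", dates)
print("city_code without dimension row:", missing_cities)
print("date without dimension row:", missing_dates)
```
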
### #4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [20]:
# Create dimension table for immigrant

df_immigrant = df_i94.select("uuid",
                             "age",
                             "gender",
                             "visa_type").drop_duplicates()

In [21]:
spark_to_pandas(df_immigrant.take(5))

|   | uuid | age | gender | visa_type |
|---|---|---|---|---|
| 0 | 1811 | 73 | F | 2 |
| 1 | 2280 | 39 | F | 2 |
| 2 | 73917 | 6 | M | 2 |
| 3 | 158057 | 47 | M | 2 |
| 4 | 8589952282 | 31 | F | 1 |

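The jump from 158057 to 8589952282 in `uuid` is expected: the PySpark documentation for `monotonically_increasing_id` states that the ids are unique and increasing but not consecutive, with the partition id in the upper 31 bits and the record number within the partition in the lower 33 bits. The hypothetical helper below splits an id into those two parts; it follows the documented current layout rather than a guaranteed contract.

```python
RECORD_BITS = 33  # lower 33 bits: record number within the partition


def split_monotonic_id(value):
    """Split a monotonically_increasing_id value into (partition_id, record_number)."""
    return value >> RECORD_BITS, value & ((1 << RECORD_BITS) - 1)


# uuid values from the df_immigrant sample output
for uuid in (1811, 158057, 8589952282):
    print(uuid, "->", split_monotonic_id(uuid))
```

So 8589952282 is record 17690 of partition 1. Because the ids depend on how the data is partitioned, the same I94 record can receive a different `uuid` on another run; that is fine within one pipeline run but matters if tables from different runs are joined.
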

In [22]:
# Create dimension table for city

df_city = df_temp.select("city_code",
                         "2013_average_temperature").drop_duplicates()

In [23]:
spark_to_pandas(df_city.take(5))

|   | city_code | 2013_average_temperature |
|---|---|---|
| 0 | HAR | 12.329111 |
| 1 | LCB | 24.102222 |
| 2 | KAN | 13.989667 |
| 3 | RCM | 15.955445 |
| 4 | OTM | 23.564555 |


In [24]:
# Create dimension table for time

df_time = df_i94.withColumn("day", F.dayofweek("date"))\
                .withColumn("week", F.weekofyear("date"))\
                .withColumn("month", F.month("date"))
                        
df_time = df_time.select("date", 
                         "day", 
                         "week", 
                         "month").drop_duplicates()

In [25]:
spark_to_pandas(df_time.take(5))

|   | date | day | week | month |
|---|---|---|---|---|
| 0 | 2016-04-08 | 6 | 14 | 4 |
| 1 | 2016-04-14 | 5 | 15 | 4 |
| 2 | 2016-04-11 | 2 | 15 | 4 |
| 3 | 2016-04-17 | 1 | 15 | 4 |
| 4 | 2016-04-05 | 3 | 14 | 4 |

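Spark's `dayofweek` numbers days from 1 (Sunday) to 7 (Saturday), and `weekofyear` returns the ISO 8601 week number, so the `day` and `week` columns above can be reproduced with Python's `datetime` for spot checks. The helper name `time_attributes` is hypothetical.

```python
from datetime import date


def time_attributes(d):
    """Return (day, week, month) as Spark's dayofweek, weekofyear and month would."""
    day = d.isoweekday() % 7 + 1   # isoweekday: Mon=1..Sun=7 -> Spark: Sun=1..Sat=7
    week = d.isocalendar()[1]      # ISO 8601 week number
    return day, week, d.month


# Dates from the df_time sample output
for d in (date(2016, 4, 8), date(2016, 4, 14), date(2016, 4, 17)):
    print(d, time_attributes(d))
```
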

In [26]:
# Create fact table for immigration
        
df_immigration = df_i94.select("uuid", 
                              "city_code", 
                              "date", 
                              "count").drop_duplicates()

In [27]:
spark_to_pandas(df_immigration.take(5))

|   | uuid | city_code | date | count |
|---|---|---|---|---|
| 0 | 1365 | LOS | 2016-04-01 | 1 |
| 1 | 14519 | SFB | 2016-04-01 | 1 |
| 2 | 52056 | NYC | 2016-04-01 | 1 |
| 3 | 92333 | LOS | 2016-04-01 | 1 |
| 4 | 108719 | MIA | 2016-04-02 | 1 |


#### 4.2 Data Quality Checks

Run Quality Checks

In [28]:
# Perform quality checks here

def qc_df(df):
    """Return True if the table object exists (is not None)."""
    return df is not None

if all(qc_df(df) for df in (df_immigrant, df_city, df_time, df_immigration)):
    print("data quality check passed")
    print("dimension tables and fact table exist")
    print()
else:
    print("data quality check failed")
    print("table missing...")

data quality check passed
dimension tables and fact table exist



In [29]:
def qc_records(df):
    """Return True if the table has at least one row."""
    # head(1) fetches a single row, which is cheaper than a full count()
    return len(df.head(1)) > 0

if all(qc_records(df) for df in (df_immigrant, df_city, df_time, df_immigration)):
    print("data quality check passed!")
    print("dimension tables and fact table contain records")
    print()
else:
    print("data quality check failed!")
    print("empty table found...")

data quality check passed!
dimension tables and fact table contain records

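The two checks above confirm that the tables exist and are non-empty, but not that their keys are usable. A sketch of two further checks, written in plain Python over lists of dicts so the logic is easy to follow (the function name and sample rows are hypothetical): a primary key should be non-null and unique. In Spark the same tests could compare `df.count()` with `df.select(key).distinct().count()` and count the rows where `F.col(key).isNull()`.

```python
from collections import Counter


def check_primary_key(rows, key):
    """Return a list of problems found in column `key` (an empty list means it passed)."""
    problems = []
    values = [row.get(key) for row in rows]
    nulls = sum(value is None for value in values)
    if nulls:
        problems.append(f"{nulls} null {key} value(s)")
    duplicates = [v for v, n in Counter(values).items() if v is not None and n > 1]
    if duplicates:
        problems.append(f"duplicate {key} value(s): {sorted(duplicates)}")
    return problems


# Hypothetical df_city rows: one duplicated and one null city_code
city_rows = [
    {"city_code": "NYC", "2013_average_temperature": 12.2},
    {"city_code": "NYC", "2013_average_temperature": 12.9},
    {"city_code": None, "2013_average_temperature": 10.0},
    {"city_code": "MIL", "2013_average_temperature": 11.6},
]

issues = check_primary_key(city_rows, "city_code")
print("passed" if not issues else issues)
```

In this pipeline a duplicated `city_code` in `df_city` is possible, since `df_temp` groups by both port code and city name: two Kaggle cities mapped to the same port would yield two rows with different temperatures.
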


#### 4.3 Data dictionary 
A brief description of each table's columns and where the values come from.


##### Dimension Tables

    df_immigrant
        A. uuid: unique id of each I94 arrival record, system generated
        B. age: immigrant's age in years
        C. gender: immigrant's gender (M: Male, F: Female)
        D. visa_type: visa category as listed on the I94 (1: Business, 2: Pleasure, 3: Student)
   
    df_city
        A. city_code: I94 port code of the US city of arrival
        B. 2013_average_temperature: 2013 average temperature of city of arrival 
        
    df_time
        A. date: arrival date YYYY-MM-DD
        B. day: day of the week of arrival (1 = Sunday through 7 = Saturday)
        C. week: ISO week of the year of arrival
        D. month: month of the date for arrival
        

##### Fact Table

    df_immigration
        A. uuid: unique id of each I94 arrival record, system generated
        B. city_code: I94 port code of the US city of arrival
        C. date: arrival date YYYY-MM-DD
        D. count: number of arrivals the record represents (I94 COUNT field)

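To illustrate how the star schema is meant to be queried, the sketch below joins hypothetical fact rows to the city dimension and totals arrivals per city alongside its 2013 average temperature; it is plain Python for readability, and the function name `arrivals_by_city` is hypothetical. With the Spark tables, the same query would be a join on `city_code` followed by `groupBy("city_code").agg(F.sum("count"))`.

```python
from collections import defaultdict

# Hypothetical rows shaped like df_immigration and df_city
immigration = [
    {"uuid": 1, "city_code": "NYC", "date": "2016-04-01", "count": 1},
    {"uuid": 2, "city_code": "NYC", "date": "2016-04-02", "count": 1},
    {"uuid": 3, "city_code": "MIL", "date": "2016-04-02", "count": 1},
    {"uuid": 4, "city_code": "XXX", "date": "2016-04-03", "count": 1},  # no city row
]
city = {"NYC": 12.16, "MIL": 11.59}  # city_code -> 2013_average_temperature


def arrivals_by_city(facts, city_temps):
    """Inner-join facts to the city dimension and sum `count` per city_code."""
    totals = defaultdict(int)
    for row in facts:
        if row["city_code"] in city_temps:   # inner join drops unmatched codes
            totals[row["city_code"]] += row["count"]
    return {code: (total, city_temps[code]) for code, total in totals.items()}


for code, (arrivals, temp) in sorted(arrivals_by_city(immigration, city).items()):
    print(f"{code}: {arrivals} arrival(s), 2013 average temperature {temp}")
```

The inner join silently drops arrivals whose port has no temperature row, so totals from such a query cover only ports present in `df_city`.
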
### #5: Final Project Write Up
Spark was chosen for this project because it processes large amounts of data quickly (using in-memory computation), scales easily by adding worker nodes, reads many data formats (e.g. SAS through the spark-sas7bdat package, Parquet, and CSV), and integrates well with cloud storage such as S3 and warehouses such as Redshift.

The data update cycle is typically chosen on two criteria: the reporting cycle, and how often new data becomes available to feed into the system. For example, if a new batch of average temperatures is published monthly, a monthly refresh cycle would be a natural choice.

There are also considerations for scaling the existing solution.
 * If the data were increased by 100x: <br>
 We can consider running Spark on larger EC2 instances and/or adding Spark worker nodes. The added capacity, from either vertical or horizontal scaling, should shorten processing time.  <br><br>
 * If the data populates a dashboard that must be updated by 7am every day: <br>
    We can consider using Airflow to schedule and automate the data pipeline jobs. Its built-in retries and monitoring can help us meet that deadline. <br><br>
 * If the database needed to be accessed by 100+ people: <br>
    We can consider hosting our solution in a production-scale cloud data warehouse, with more capacity to serve users and workload management to share resources fairly across them.