# Immigration Data Modelling

### Data Engineering Capstone Project

#### Project Summary

The aim of this project is to build a data processing pipeline, that import i94 immigration related data from various data source, transform and clean the data before storing to a data warehouse designed for immigration data.

The data warehouse will support various queries to allow user to gain insight to immigration related problems, such as travel purpose, vsia type and other more complex queries.

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import configparser
import datetime
import os
import pandas as pd
from pyspark.sql import SparkSession
from sqlalchemy import create_engine
from pyspark.sql.functions import udf, col, monotonically_increasing_id, split, trim, asc, first, upper, desc, initcap, to_date, mean, lit
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek
from pyspark.sql.types import TimestampType, StringType, DoubleType, DecimalType, IntegerType
import psycopg2

In [2]:
config = configparser.ConfigParser()
config.read("dl.cfg")

AWS_ACCESS_KEY_ID = config['AWS']['AWS_ACCESS_KEY_ID']
AWS_SECRET_ACCESS_KEY = config['AWS']['AWS_SECRET_ACCESS_KEY']
HOST = config['CLUSTER']['HOST']
DB_NAME = config['CLUSTER']['DB_NAME']
DB_USER = config['CLUSTER']['DB_USER']
DB_PASSWORD = config['CLUSTER']['DB_PASSWORD']
DB_PORT = config['CLUSTER']['DB_PORT']

In [3]:
#create spark session
spark = SparkSession.builder\
                    .config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
                    .enableHiveSupport().getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 
The aim of this project is to create a data warehouse for the immigration related data. The data will be loaded to the fact and dimension tables. The following dataset will be used.

#### Describe and Gather Data 
1. I94 immigration data, https://travel.trade.gov/research/reports/i94/historical/2016.html . The data is provied as sas7bat format with the following schema

| Field name | Description |
| ---------- | ----------- |
| CICID | Primary Key |
| I94YR | Year |
| I94MON | Month |
| I94CIT | Country of CitizenShip |
| I94RES | Country of Residency |
| I94PORT | Port of Arrival |
| ARRDATE | Arrivate Date |
| I94MODE | Transportation Mode |
| I94ADDR | State of Arrival |
| DEPDATE | Departure Date |
| I94BIR | Age |
| I94VISA | Visa Category |
| COUNT | Number of people |
| DTADFILE | Character Date |
| VISAPOST | Department issuing Visa |
| OCCUP | Occupation |
| ENTDEPA | Arrival Flag |
| ENTDEPD | Departure Flag |
| ENTDEPU | Update Flag |
| MATFLAG | Match Flag |
| BIRYEAR | Year of Birth |
| DTADDTO | Character Date |
| GENDER | Gender |
| INSNUM | INS Number |
| AIRLINE | Airline |
| ADMNUM | Admission Number |
| FLTNO | Flight Number |
| VISATYPE | Visa Type |

    

In [4]:
#go thru all the sas7bbat file
i94_files = []
for root, directory, files in os.walk('../../data'):
    for file in files:
        if file.endswith('.sas7bdat'):
            i94_files.append(os.path.join(root, file))
print (f'number of sas7bdat files: {len(i94_files)}')

number of sas7bdat files: 12


In [5]:
i94Df = spark.read.format('com.github.saurfang.sas.spark').load(i94_files[0])
i94Df.printSchema()
print(f"number of records: {i94Df.count()}")
i94Df.show(5)

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 


2. World temperature data, https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data . The data is provided as csv file with the following schema

|                      Field Name |                             Description | Sample Data |
| ------------------------------- | --------------------------------------- | ----------- |
| dt                              |                                    Date |  1743-11-01 |
| Average Temperature             |             Monthly Average Temperature |       6.068 |
| Average Temperature Uncertainty | Monthly Average Temperature Uncertainty |       1.737 |
| City                            |                                    City |       Århus |
| Country                         |                                 Country |     Denmark |
| Latitude                        |                                Latitude |      57.05N |
| Longitude                       |                               Longitude |      10.33E | 
    



In [6]:
weatherDf = spark.read.format('csv').option('sep', ",").option("inferSchema", "true").option("header", "true").load('GlobalLandTemperaturesByCity.csv')
weatherDf.printSchema()
print (f'number of records: {weatherDf.count()}')
weatherDf.show(5)

root
 |-- dt: timestamp (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)

number of records: 8599212
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|                 dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01 00:00:00|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01 00:00:00|              null

3. U.S. city demographics data, https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/ . The data is provided as csv file with the following schema

|             Field Name |                     Description | Sample Data | 
| ---------------------- | ------------------------------- | ----------- |
|                   City |                            City |      Newark |
|                  State |                           State |  New Jersey |
|             Median Age |    Median Age of the population |        34.6 |
|        Male Population |         Population that is Male |      138040 |
|      Female Population |       Population that is Female |      143873 |
|       Total Popultaion |          Population of the city |      281913 |
|     Number of Veterans |          Population of Veterans |        5829 |
|           Foreign-born | Population that is foreign-born |       86253 |
| Average HouseHold Size |       Average size of household |        2.73 |
|             State Code |                   US State Code |          NJ |
|                   Race |                            Race |       White |
|                  Count |          Population of the Race |       76402 |
    



In [7]:
demographicsDf = spark.read.format('csv').option('sep', ";").option('inferSchema', "true").option("header", "true").load("us-cities-demographics.csv")
demographicsDf.printSchema()
print (f'number of records: {demographicsDf.count()}')
demographicsDf.show(5)

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)

number of records: 2891
+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|            City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+----------------+-------------+----------+---------------+-------

4. Airport Code Table, https://datahub.io/core/airport-codes#data . The data is provided as csv file with the following schema

|   Field Name |                           Description |                        Sample Data |
| ------------ | ------------------------------------- | ---------------------------------- |
|        ident |                    airport identifier |                                00A |
|         type |                       type of airport |                           heliport |
|         name |                       name of airport |                  Total Rf Heliport |
| elevation_ft |                             Elevation |                                 11 |
|    continent |                             Continent |                                 NA |
|  iso_country |                      iso country code |                                 US |
|   iso_region |      country and state of the airport |                              US-PA |
| municipality |                       city of airport |                           Bensalem |
|     gps_code |                              GPS Code |                                00A |
|    iata_code |                        IATA code      |                                    |
|   local_code |                       local code      |                                00A |
|  coordinates | latitude and longitude of the airport | -74.93360137939453, 40.07080078125 |


In [8]:
airportDf = spark.read.format('csv').option("sep", ",").option("inferSchema", "true").option("header", "true").load("airport-codes_csv.csv")
airportDf.printSchema()
print (f"number of records: {airportDf.count()}")
airportDf.show(5)

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)

number of records: 55075
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

In [15]:
#extract detail information from imigration label file

country = { 'code' : [], 'country' : []}
port = { 'code': [], 'city' : [], 'state' : []}
mode = { 'code' : [], 'mode' : []}
address = { 'code' : [], 'state' : []}
visa = { 'code': [], 'type' : []}

#united states seems to be missing in the i94 country mapping
country['code'].append(888)
country['country'].append('United States')
with open('I94_SAS_Labels_Descriptions.SAS') as f:
    lines = f.readlines()

    isCountry = isVisa = isAddr = isPort = isMode = False
    
    for line in lines:
        line = line.strip()
        
        if line.startswith('value i94cntyl'):
            isCountry = True
            isVisa = isAddr = isPort = isMode = False
        elif line.startswith('value $i94prtl'):
            isPort = True
            isCountry = isVisa = isAddr = isMode = False
        elif line.startswith('value i94model'):
            isMode = True
            isCountry = isVisa = isAddr = isPort = False
        elif line.startswith('value i94addrl'):
            isAddr = True
            isCountry = isVisa = isPort = isMode = False
        elif line.startswith('/* I94VISA'):
            isVisa = True
            isCountry = isAddr = isPort = isMode = False
            
        try:
            if isCountry and '=' in line:
                data = line.split('=')
                if len(data) >= 2:
                    code = int(data[0].strip())
                    c = data[1].replace(';', '').replace("'", '').strip()
                    if c:
                        country['country'].append(c.title())
                        country['code'].append(code)
            elif isPort and '=' in line:
                data = line.split('=')
                if len(data) >= 2:
                    code = data[0].replace("'", "").strip()
                    pair = data[1].strip().replace("'", "").split(',')
                    if len(pair) >= 2:
                        city = pair[0].strip()
                        state = pair[1].strip()
                    elif len(pair) == 1:
                        city = pair[0].strip()
                        state = 'NA'
                    if code and city and state:
                        port['code'].append(code)
                        port['city'].append(city.title()) #convert each word to lower case, except first character
                        port['state'].append(state.title())
            elif isMode and '=' in line:
                data = line.split('=')
                if len(data) >= 2:
                    code = int(data[0].strip())
                    m = data[1].replace(';', '').replace("'", "").strip()
                    if m:
                        mode['code'].append(code)
                        mode['mode'].append(m)
            elif isAddr and '=' in line:
                data = line.split('=')
                if len(data) >= 2:
                    code = data[0].replace("'", '').strip()
                    state = data[1].replace("'", '').replace(";", '').strip()
                    if code and state:
                        address['code'].append(code)
                        address['state'].append(state.title())
            elif isVisa and '=' in line:
                data = line.split('=')
                if len(data) >= 2:
                    code = int(data[0].strip())
                    t = data[1].replace(';', '').replace("'", "").strip()
                    if t:
                        visa['code'].append(code)
                        visa['type'].append(t)
        except Exception as e:
            print (f'fail, catch exception: {e}, processing line: {line}')
        

In [16]:
#create Country dataframe
countryDf = spark.createDataFrame(pd.DataFrame(country))
countryDf.printSchema()
countryDf.count()
countryDf.show(5)

root
 |-- code: long (nullable = true)
 |-- country: string (nullable = true)

+----+--------------------+
|code|             country|
+----+--------------------+
| 888|       United States|
| 582|Mexico Air Sea, A...|
| 236|         Afghanistan|
| 101|             Albania|
| 316|             Algeria|
+----+--------------------+
only showing top 5 rows



In [17]:
#create Port dataframe
portDf = spark.createDataFrame(pd.DataFrame(port))
portDf.printSchema()
portDf.count()
portDf.show(5)

root
 |-- code: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)

+----+--------------------+-----+
|code|                city|state|
+----+--------------------+-----+
| ALC|               Alcan|   Ak|
| ANC|           Anchorage|   Ak|
| BAR|Baker Aaf - Baker...|   Ak|
| DAC|       Daltons Cache|   Ak|
| PIZ|Dew Station Pt La...|   Ak|
+----+--------------------+-----+
only showing top 5 rows



In [18]:
#create transportation mode dataframe
modeDf = spark.createDataFrame(pd.DataFrame(mode))
modeDf.printSchema()
modeDf.count()
modeDf.show(5)

root
 |-- code: long (nullable = true)
 |-- mode: string (nullable = true)

+----+------------+
|code|        mode|
+----+------------+
|   1|         Air|
|   2|         Sea|
|   3|        Land|
|   9|Not reported|
+----+------------+



In [19]:
#create address dataframe
addressDf = spark.createDataFrame(pd.DataFrame(address))
addressDf.printSchema()
addressDf.count()
addressDf.show(5)

root
 |-- code: string (nullable = true)
 |-- state: string (nullable = true)

+----+----------+
|code|     state|
+----+----------+
|  AL|   Alabama|
|  AK|    Alaska|
|  AZ|   Arizona|
|  AR|  Arkansas|
|  CA|California|
+----+----------+
only showing top 5 rows



In [20]:
#create visa dataframe
visaDf = spark.createDataFrame(pd.DataFrame(visa))
visaDf.printSchema()
visaDf.count()
visaDf.show(5)

root
 |-- code: long (nullable = true)
 |-- type: string (nullable = true)

+----+--------+
|code|    type|
+----+--------+
|   1|Business|
|   2|Pleasure|
|   3| Student|
+----+--------+



#### Cleaning Steps
Document steps necessary to clean the data

In [21]:
# clean i94 dataframe
#join with countryDf
spark_i94_df = i94Df

spark_i94_df = spark_i94_df.select(col('cicid').cast(IntegerType()).alias('id'), 
                                   col('i94yr').cast(IntegerType()).alias('year'), \
                                   col('i94mon').cast(IntegerType()).alias('month'), \
                                   col('i94cit').cast(IntegerType()).alias('citizen'), \
                                   col('i94res').cast(IntegerType()).alias('resident'), \
                                   col('i94port').alias('port'), \
                                   col('arrdate').cast(IntegerType()), \
                                   col('i94mode').cast(IntegerType()).alias('transport_mode'), \
                                   col('i94addr').alias('arrival_state'), \
                                   col('depdate').cast(IntegerType()), \
                                   col('i94visa').cast(IntegerType()).alias('visa_category'), \
                                   col('occup').alias('occupation'), \
                                   col('biryear').cast(IntegerType()).alias('birth_year'), \
                                   col('gender'), \
                                   col('i94bir').cast(IntegerType()).alias('age'), \
                                   col('visatype').alias('visa_type')
                                  )
#convert transportation mode
#spark_i94_df = spark_i94_df.join(modeDf, col('transport_mode') == modeDf.code, how='left').drop('code', 'transport_mode') \
#                           .withColumnRenamed("mode", "transport_mode")
#convert visa category
#spark_i94_df = spark_i94_df.join(visaDf, col('visa_category') == visaDf.code, how = 'left').drop('code', 'visa_category') \
#                           .withColumnRenamed('type', 'visa_type')
#translate state code
#spark_i94_df = spark_i94_df.join(addressDf, col('arrival_state') == addressDf.code, how='left').drop('code', 'arrival_state') \
#                           .withColumnRenamed('state', 'arrival_state')
#translate port city/state
spark_i94_df = spark_i94_df.join(portDf, col('port') == portDf.code).drop('port', 'code') \
                           .withColumnRenamed('city', 'port_city').withColumnRenamed('state', 'port_state')
#convert arrival date
get_date = udf(lambda x: (datetime.datetime(1960, 1, 1).date() + datetime.timedelta(x)).isoformat() if x else None)
spark_i94_df = spark_i94_df.withColumn('arrival_date', get_date(spark_i94_df.arrdate)) \
                           .withColumn('departure_date', get_date(spark_i94_df.depdate)) \
                           .drop('arrdate', 'depdate')

In [22]:
spark_i94_df.printSchema()
spark_i94_df.show(10)
spark_i94_df.count()

root
 |-- id: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- citizen: integer (nullable = true)
 |-- resident: integer (nullable = true)
 |-- transport_mode: integer (nullable = true)
 |-- arrival_state: string (nullable = true)
 |-- visa_category: integer (nullable = true)
 |-- occupation: string (nullable = true)
 |-- birth_year: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- visa_type: string (nullable = true)
 |-- port_city: string (nullable = true)
 |-- port_state: string (nullable = true)
 |-- arrival_date: string (nullable = true)
 |-- departure_date: string (nullable = true)

+------+----+-----+-------+--------+--------------+-------------+-------------+----------+----------+------+---+---------+---------+----------+------------+--------------+
|    id|year|month|citizen|resident|transport_mode|arrival_state|visa_category|occupation|birth_year|gender|age|visa_typ

3096313

In [23]:
# clean weather dataframe
spark_weather_df = weatherDf.filter(col('Country') == 'United States') \
                            .select(col('dt'), col("AverageTemperature").alias('average_temperature'), \
                                    col('City').alias('city'), col('Country').alias('country')
                                    ).orderBy(asc('dt'))

In [24]:
spark_weather_df.printSchema()
spark_weather_df.show(10)
spark_weather_df.count()

root
 |-- dt: timestamp (nullable = true)
 |-- average_temperature: double (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)

+-------------------+-------------------+----------+-------------+
|                 dt|average_temperature|      city|      country|
+-------------------+-------------------+----------+-------------+
|1743-11-01 00:00:00|  8.129999999999999|   Atlanta|United States|
|1743-11-01 00:00:00|              5.371|Bridgeport|United States|
|1743-11-01 00:00:00|              3.209|     Akron|United States|
|1743-11-01 00:00:00|              2.208| Ann Arbor|United States|
|1743-11-01 00:00:00|              5.339| Arlington|United States|
|1743-11-01 00:00:00|              5.339|Alexandria|United States|
|1743-11-01 00:00:00|              3.264| Allentown|United States|
|1743-11-01 00:00:00|              3.015|    Aurora|United States|
|1743-11-01 00:00:00|              5.339| Baltimore|United States|
|1743-11-01 00:00:00|      

687289

In [25]:
# clean demographics dataframe
spark_demographics_df = demographicsDf.select(col('City').alias('city'), col('State').alias('state'), \
                                              col("Median Age").alias('median_age'), col('Male Population').alias('male_population'), \
                                              col('Female Population').alias('female_population'), \
                                              col('Total Population').alias('total_population'), \
                                              col('Number of Veterans').alias('verterans_population'), \
                                              col('Foreign-born').alias('foreign-born'), \
                                              col('Average Household Size').alias('average_household_size'), \
                                              col('State Code').alias('state_code'), \
                                              col('Race').alias('race'), \
                                              col('Count').alias('count') \
                                             ).orderBy(asc('state_code'), asc('city'))

In [26]:
# explode the Race column
race_df = spark_demographics_df.select("city", "state_code", "race", "count") \
                               .groupby("city", "state_code") \
                               .pivot("race") \
                               .agg(first("count")).orderBy(asc('state_code'), asc('city'))

In [27]:
race_df.show(10)

+------------+----------+---------------------------------+-----+-------------------------+------------------+------+
|        city|state_code|American Indian and Alaska Native|Asian|Black or African-American|Hispanic or Latino| White|
+------------+----------+---------------------------------+-----+-------------------------+------------------+------+
|   Anchorage|        AK|                            36339|36825|                    23107|             27261|212696|
|  Birmingham|        AL|                             1319| 1500|                   157985|              8940| 51728|
|      Dothan|        AL|                              656| 1175|                    23243|              1704| 43516|
|      Hoover|        AL|                             null| 4759|                    18191|              3430| 61869|
|  Huntsville|        AL|                             1755| 6566|                    61561|             10887|121904|
|      Mobile|        AL|                             28

In [28]:
#join the exploded table, and remove duplicates
spark_demographics_df = spark_demographics_df.join(race_df, ['city', 'state_code'], how='left')
spark_demographics_df = spark_demographics_df.select(col('city'), col('state'), col('median_age'), col('male_population'), \
                             col('female_population'), col('total_population'), col('verterans_population'), \
                             col('foreign-born').alias('foreign_born'), col('average_household_size'), col('state_code'), \
                             col('American Indian and Alaska Native').alias('native_population'), \
                             col('Asian').alias('asian_population'), col('Black or African-American').alias('black_population'), \
                             col('Hispanic or Latino').alias('latino_population'), col('White').alias('white_population')) \
                             .dropDuplicates()

In [29]:
spark_demographics_df.printSchema()
spark_demographics_df.show(5)
spark_demographics_df.count()

root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- median_age: double (nullable = true)
 |-- male_population: integer (nullable = true)
 |-- female_population: integer (nullable = true)
 |-- total_population: integer (nullable = true)
 |-- verterans_population: integer (nullable = true)
 |-- foreign_born: integer (nullable = true)
 |-- average_household_size: double (nullable = true)
 |-- state_code: string (nullable = true)
 |-- native_population: integer (nullable = true)
 |-- asian_population: integer (nullable = true)
 |-- black_population: integer (nullable = true)
 |-- latino_population: integer (nullable = true)
 |-- white_population: integer (nullable = true)

+----------+-------+----------+---------------+-----------------+----------------+--------------------+------------+----------------------+----------+-----------------+----------------+----------------+-----------------+----------------+
|      city|  state|median_age|male_population|female

596

In [30]:
# clean airport dataframe
# only us airport
# split coordinate column into latitude and longitude columns
spark_airport_df = airportDf.filter(col('iso_country') == 'US') \
                            .withColumn("latitude", trim(split(col("coordinates"), ",").getItem(0))) \
                            .withColumn("longitude", trim(split(col("coordinates"), ",").getItem(1))) \
                            .withColumn("state", trim(split(col("iso_region"), "-").getItem(1))) \
                            .select(col('ident').alias('id'), col('type'), col('name'), col('elevation_ft').alias('elevation'), \
                                    col('iso_country').alias('country'), col('state'), col('municipality'),
                                    col('gps_code'), col('iata_code'), col('local_code'), col('latitude').cast(DecimalType(9,6)),
                                    col('longitude').cast(DecimalType(9,6)))
                                    


In [31]:
spark_airport_df.printSchema()
spark_airport_df.show(5)
spark_airport_df.count()

root
 |-- id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- state: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- latitude: decimal(9,6) (nullable = true)
 |-- longitude: decimal(9,6) (nullable = true)

+----+-------------+--------------------+---------+-------+-----+------------+--------+---------+----------+-----------+---------+
|  id|         type|                name|elevation|country|state|municipality|gps_code|iata_code|local_code|   latitude|longitude|
+----+-------------+--------------------+---------+-------+-----+------------+--------+---------+----------+-----------+---------+
| 00A|     heliport|   Total Rf Heliport|       11|     US|   PA|    Bensalem|     00A|     null|       00A| -74.9336

22757

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

1. The design is to support analyse of immigration data, such as traveller's citizenship, date of arrival, which city is more popular
2. The data is organized in a star schema, with fact and dimension table
3. There is a factImmigration table, containing immigration related information
4. There are 7 dimension tables, dimWeather, dimVisaCategory, dimAirport, dimCountry, dimCity, dimState, dimTransportMode

#### 3.2 Schema
the schema of the table is as follows

##### 3.2.1, Fact Table: factImmigration

| Column |  Data Type | Constraint |
| ------ | ---------- | ---------- |
| cicid | integer | PK |
| year| integer| |
| month | integer | |
| citizen | integer | FK (dimCountry) |
| resident | integer | FK (dimCountry) |
| transport_mode | integer | FK (dimTransportMode) |
| arrival_state | string | FK (dimState) |
| visa_category | integer | FK (dimVisaCategory) |
| occupation | string | |
| birth_year | integer | |
| gender | string | |
| age| integer | |
| visa_type | string | |
| port_city | integer | FK (dimCity) |
| arrival_date | date | |
| departure_date | date | |

##### 3.2.2, Dimension Table: DimWeather

| Column | Data Type | Constraint |
| ------ | --------- | ---------- |
| city | integer | PK, FK(DimCity) |
| date | date | PK |
| average_temperature | double | |

##### 3.2.3, Dimension Table: dimVisaCategory

| Column | Data Type | Constraint |
| ------ | --------- | ---------- |
| code | integer | PK |
| type | string | |


##### 3.2.4, Dimension Table: DimAirPort

| column | Data Type | Constraint |
| ------ | --------- | ---------- |
| id | string | PK |
| type | string | |
| name | string | |
| elevation | integer | |
| country | integer | FK(DimCountry) |
| state | string | FK(DimState) |
| municipality | string | |
| gps_code | string | |
| iata_code | string | |
| local_code | string | |
| latitude | decimal(9,6) | |
| longitude | decimal(9,6) |  |

##### 3.2.5, Dimension Table: DimCountry

| Column | Data Type | Contraint |
| ------ | --------- | --------- |
| code   | integer   | PK |
| name   | string    |           |


##### 3.2.6, Dimension Table: DimCity

| column | Data Type | Constraint |
| ------ | --------- | ---------- |
| city | integer | PK |
| name | string | |
| state | string | FK(DimState) |
| median_age | double | |
| male_population | integer | |
| female_population | integer | |
| total_population | integer | |
| verterans_population | integer | |
| foreign-born | integer | |
| average_household_size | double | |
| native_population | integer | |
| asian_population | integer | |
| black_population | integer | |
| latino_population | integer | |
| white_population | integer | |
 
##### 3.2.7, Dimension Table: DimState

| column | Data Type | Constraint |
|------- | --------- | ---------- |
| code   | string    | PK |
| name   | string    |            |

##### 3.2.8, Dimension Table: DimTransportMode

| column | Data Type | Constraint |
| ------ | --------- | ---------- |
| code | integer | PK |
| mode | string |  |
 
#### 3.3 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

The Fact and Dimension Table will be created in Amazon RedShift cluster. 

##### 4.1.1 Create Statement

In [32]:
#create db connection
conn = psycopg2.connect(f"host={HOST} dbname={DB_NAME} user={DB_USER} password={DB_PASSWORD} port={DB_PORT}")
cur = conn.cursor()

In [66]:
#drop fact and dimension table
dimState_drop = "DROP TABLE IF EXISTS dimState"
dimTransportMode_drop = "DROP TABLE IF EXISTS dimTransportMode"
dimCountry_drop = "DROP TABLE IF EXISTS dimCountry"
dimVisaCategory_drop = "DROP TABLE IF EXISTS dimVisaCategory"
dimCity_drop = "DROP TABLE IF EXISTS dimCity"
dimWeather_drop = "DROP TABLE IF EXISTS dimWeather"
dimAirPort_drop = "DROP TABLE IF EXISTS dimAirPort"
factImmigration_drop = "DROP TABLE IF EXISTS factImmigration"

# create fact and dimension table
dimState_create = """
CREATE TABLE IF NOT EXISTS dimState (
code varchar PRIMARY KEY,
name varchar(32)
);
"""

dimTransportMode_create = """
CREATE TABLE IF NOT EXISTS dimTransportMode (
code int PRIMARY KEY,
mode varchar(64)
);
"""

dimCountry_create = """
CREATE TABLE IF NOT EXISTS dimCountry (
code int PRIMARY KEY,
name varchar(64)
);
"""

dimVisaCategory_create = """
CREATE TABLE IF NOT EXISTS dimVisaCategory (
code int PRIMARY KEY,
type varchar(64)
);
"""

dimCity_create = """
CREATE TABLE IF NOT EXISTS dimCity (
city bigint PRIMARY KEY,
name varchar(128),
state varchar,
median_age float,
male_population bigint,
female_population bigint,
total_population bigint,
verterans_population bigint,
foreign_born bigint,
average_household_size float,
native_population bigint,
asian_population bigint,
black_population bigint,
latino_population bigint,
white_population bigint,
FOREIGN KEY(state) REFERENCES dimState(code)
);
"""

dimWeather_create = """
CREATE TABLE IF NOT EXISTS dimWeather (
city bigint NOT NULL REFERENCES dimCity(city),
"date" date NOT NULL,
average_temperature float,
PRIMARY KEY(city, date)
);
"""

dimAirPort_create = """
CREATE TABLE IF NOT EXISTS dimAirport (
id varchar PRIMARY KEY,
type varchar(128),
name varchar(128) NOT NULL,
elevation int,
country int REFERENCES dimCountry(code),
state varchar REFERENCES dimState(code),
municipality varchar(128),
gps_code varchar(64),
iata_code varchar(64),
local_code varchar(64),
latitude numeric(9,6),
longitude numeric(9,6)
);
"""

factImmigration_create = """
CREATE TABLE IF NOT EXISTS factImmigration (
cicid bigint PRIMARY KEY,
year int,
month int,
citizen int REFERENCES dimCountry(code),
resident int REFERENCES dimCountry(code),
transport_mode int REFERENCES dimTransportMode(code),
arrival_state varchar REFERENCES dimState(code),
visa_category int REFERENCES dimVisaCategory(code),
occupation varchar(128),
birth_year int,
gender char(1),
age int,
visa_type varchar(32),
port_city bigint REFERENCES dimCity(city),
arrival_date date,
departure_date date
);
"""



In [34]:
#drop existing tables in redshift db
table_drop_list = [ factImmigration_drop, dimAirPort_drop, dimWeather_drop, dimCity_drop, dimVisaCategory_drop, dimTransportMode_drop, \
                    dimCountry_drop, dimState_drop]

for query in table_drop_list:
    cur.execute(query)
    conn.commit()

In [35]:
#create tables in redshift db
table_create_list = [ dimState_create, dimCountry_create, dimTransportMode_create, dimVisaCategory_create, dimCity_create, \
                      dimWeather_create, dimAirPort_create, factImmigration_create]

for query in table_create_list:
    cur.execute(query)
    conn.commit()

In [36]:
conn.close()

In [37]:
# setup parameter for spark dataframe to write to redshift
mode = "append"
url = f"postgresql://{DB_USER}:{DB_PASSWORD}@{HOST}:{DB_PORT}/{DB_NAME}"

conn = create_engine(url)

#### 4.1.2, populate fact and dimension table

In [38]:
#populate dimState table
addressDf.select(col('code'), col('state').alias('name')).toPandas().to_sql('dimstate', con = conn, index=False, if_exists=mode)

In [39]:
#populate dimCountry table
countryDf.select(col('code'), col('country').alias('name')).toPandas().to_sql('dimcountry', con=conn, index=False, if_exists=mode)

In [40]:
#populate dimTransportMode
modeDf.toPandas().to_sql('dimtransportmode', con=conn, index=False, if_exists=mode)

In [41]:
#populate dimVisaCategory
visaDf.toPandas().to_sql('dimvisacategory', con = conn, index = False, if_exists = mode)

In [42]:
#populate dimCity table
city_df = spark_demographics_df.drop('state') \
                          .withColumnRenamed('state_code', 'state') \
                          .withColumnRenamed('city', 'name') \
                          .withColumn('city', monotonically_increasing_id())
city_df.toPandas().to_sql('dimcity', con = conn, index=False, if_exists=mode)

In [43]:
#to limit row count, otherwise query taking long time
city_df = city_df.filter(col('name') == 'New York')

#populate dimWeather table
#calculates the average daily temperature for us cities
spark_daily_weather_df = spark_weather_df.filter(col('Country') == 'United States').withColumnRenamed('city', 'cityname') \
                                          .select(col('cityname'), to_date(col('dt')).alias('date'), col('average_temperature')) \
                                          .dropna() \
                                          .groupBy('cityname', 'date').agg(mean('average_temperature').alias('average_temperature'))
weather_df = spark_daily_weather_df.join(city_df, spark_daily_weather_df.cityname == city_df.name) \
                             .drop_duplicates().select(col('city'),col('date'), col('average_temperature'))


weather_df.toPandas().to_sql('dimweather', con = conn, index = False, if_exists = mode)

In [44]:
#popualte dimAirport table
uscode = countryDf.filter(col('country') == 'United States').collect()
airport_df = spark_airport_df.filter(col('country') == 'US').drop('country').withColumn('country', lit(uscode[0][0]))

#to limit row count, otherwise query taking too long
#airport_df = airport_df.filter(col('state') == 'NY')

airport_df.toPandas().to_sql('dimairport', con=conn, index=False, if_exists=mode)

In [45]:
#populate factImmigration table
i94_df = spark_i94_df
#to limit rows, otherwise it is taking a long time
#i94_df = i94_df.filter(col('visa_type') == 'F1')

i94_df = i94_df.join(city_df.select(col('city'),col('name')) , i94_df.port_city == city_df.name) \
               .drop('name', 'port_city', 'port_state') \
               .select(col('id').alias('cicid'), col('year'), col('month'), col('citizen'), col('resident'), \
                       col('transport_mode'), col('arrival_state'), col('visa_category'), \
                       col('occupation'), col('birth_year'), col('gender'), col('age'), \
                       col('visa_type'), to_date(col('arrival_date')).alias('arrival_date'), \
                       to_date(col('departure_date')).alias('departure_date'), \
                       col('city').alias('port_city'))

i94_df.toPandas().to_sql('factimmigration', con = conn, index = False, if_exists=mode)

In [46]:
conn.dispose()

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [64]:
#create db connection
conn = psycopg2.connect(f"host={HOST} dbname={DB_NAME} user={DB_USER} password={DB_PASSWORD} port={DB_PORT}")
cur = conn.cursor()

In [49]:
# Perform quality checks here
# notice the original data set is too big, when storing data to the database, only a subset was stored otherwise it will take
# too long to run


#row count check
for name in ['dimState', 'dimCountry', 'dimVisaCategory', 'dimTransportMode', 'dimWeather', 'dimAirport', 'dimCity', 'factImmigration']:
    query = f"select count(*) from {name}"
    cur.execute(query)
    result = cur.fetchone()
    print (f"Table {name} has {result[0]} rows")

Table dimState has 55 rows
Table dimCountry has 290 rows
Table dimVisaCategory has 3 rows
Table dimTransportMode has 4 rows
Table dimWeather has 3119 rows
Table dimAirport has 668 rows
Table dimCity has 596 rows
Table factImmigration has 5659 rows


In [58]:
# verify we can retrieve the name of ths state from fact table and state table
query = "select distinct s.code, s.name as state from factImmigration f join dimState s on s.code = f.arrival_state order by f.arrival_state"
cur.execute(query)
result = cur.fetchall()
for row in result:
        print (f"state code: {row[0]}, name: {row[1]}")

state code: AL, name: Alabama
state code: AR, name: Arkansas
state code: AZ, name: Arizona
state code: CA, name: California
state code: CO, name: Colorado
state code: CT, name: Connecticut
state code: DC, name: Dist. Of Columbia
state code: DE, name: Delaware
state code: FL, name: Florida
state code: GA, name: Georgia
state code: HI, name: Hawaii
state code: IA, name: Iowa
state code: ID, name: Idaho
state code: IL, name: Illinois
state code: IN, name: Indiana
state code: KS, name: Kansas
state code: KY, name: Kentucky
state code: LA, name: Louisiana
state code: MA, name: Massachusetts
state code: MD, name: Maryland
state code: ME, name: Maine
state code: MI, name: Michigan
state code: MN, name: Minnesota
state code: MO, name: Missouri
state code: MS, name: Mississippi
state code: MT, name: Montana
state code: NC, name: N. Carolina
state code: ND, name: N. Dakota
state code: NE, name: Nebraska
state code: NH, name: New Hampshire
state code: NJ, name: New Jersey
state code: NM, name: Ne

In [65]:
# verify we can retrieve the name of the country from fact table and country table
query = "select distinct c.code, c.name as country from factImmigration f join dimCountry c on f.citizen = c.code order by c.name"
cur.execute(query)
result = cur.fetchall()
for row in result:
    print (f"country, code: {row[0]}, name: {row[1]}")

country, code: 101, name: Albania
country, code: 316, name: Algeria
country, code: 102, name: Andorra
country, code: 324, name: Angola
country, code: 518, name: Antigua-Barbuda
country, code: 687, name: Argentina
country, code: 438, name: Australia
country, code: 103, name: Austria
country, code: 152, name: Azerbaijan
country, code: 512, name: Bahamas
country, code: 298, name: Bahrain
country, code: 274, name: Bangladesh
country, code: 513, name: Barbados
country, code: 153, name: Belarus
country, code: 104, name: Belgium
country, code: 386, name: Benin
country, code: 509, name: Bermuda
country, code: 242, name: Bhutan
country, code: 688, name: Bolivia
country, code: 164, name: Bosnia-Herzegovina
country, code: 336, name: Botswana
country, code: 689, name: Brazil
country, code: 105, name: Bulgaria
country, code: 310, name: Cameroon
country, code: 690, name: Chile
country, code: 245, name: China, Prc
country, code: 311, name: Collapsed Tanzania (Should Not Show)
country, code: 691, name

#### additional queries

In [54]:
# top 5 popular state that has the highest immigration count
query = "select count(*) as count, arrival_state from factImmigration group by arrival_state order by count desc limit 5"
cur.execute(query)
result = cur.fetchall()
for row in result:
    print (f"state: {row[1]}, count: {row[0]}")
    

state: NY, count: 3441
state: CA, count: 295
state: NJ, count: 251
state: MA, count: 240
state: PA, count: 212


In [63]:
cur.close()
conn.close()

#### 4.3 Data dictionary 

The data dictionary for the Fact and Dimension tables are as below

##### Fact Table: factImmigration

| Column |  Description | Source |
| ------ | ------------ | ------ |
| cicid | unique identifier in I94 immigration data set | I94 immigration sas7dat files |
| year| year of arrival| I94 immigration sas7dat files |
| month | month of arrival | I94 immigration sas7dat files |
| citizen | country of citizenship, id which is used to join with dimCountry Table | dimCountry table |
| resident | country of residence, id which is used to join with dimCountry table | dimCountry table |
| transport_mode | transport mode id which is used to join with dimTransportMode table | dimTransportMode table |
| arrival_state | arrival state code used to join with dimState table | dimState table |
| visa_category | visa category, whcih is used to join with dimVisaCategory table | dimVisaCategory table |
| occupation | occupation | I94 immigration sas7dat files |
| birth_year | year of birth | I94 immigration sas7dat files |
| gender | gender, M or F | I94 immigration sas7dat files |
| age| age | I94 immigration sas7dat files |
| visa_type | visa type | I94 immigration sas7dat files |
| port_city | port of arrival, city id which is used to join with dimCity table | dimCity table |
| arrival_date | date of arrival | I94 immigration sas7dat files |
| departure_date | date of departure | I94 immigration sas7dat files |

##### Dimension Table: DimWeather

| Column | Description | Source |
| ------ | ----------- | ------ |
| city | city id used to join with dimCity | dimCity table |
| date | date of the temperature recording | GlobalLandTemperatureByCities.csv |
| average_temperature | average temperature for a given date | GlobalLandTemperatureByCities.csv |

##### Dimension Table: dimVisaCategory

| Column | Description | Source |
| ------ | ----------- | ------ |
| code | visa category code | I94 immigration sas7dat files |
| type | visa category type | I94 immigration sas7dat files |


##### Dimension Table: DimAirPort

| column | Description | Source |
| ------ | ----------- | ------ |
| id | airport unique id | airport-codes_csv.csv |
| type | string | airport-codes_csv.csv |
| name | string | airport-codes_csv.csv |
| elevation | integer | airport-codes_csv.csv |
| country | country code which is used to join with dimCountry table | dimCountry table |
| state | state code of the airport, which is used to join with dimState table | dimState table |
| municipality | municpal of the airport | airport-codes_csv.csv |
| gps_code | gps identifier of the airport | airport-codes_csv.csv |
| iata_code |  IATA code of the airport | airport-codes_csv.csv |
| local_code | local identifier of the airport | airport-codes_csv.csv |
| latitude | latitude of the airport | airport-codes_csv.csv |
| longitude | longitude of the airport |  airport-codes_csv.csv |

##### Dimension Table: DimCountry

| Column | Description | Source |
| ------ | ----------- | ------ |
| code   | country code | I94_SAS_Labels_Descriptions.SAS |
| name   | country name | I94_SAS_Labels_Descriptions.SAS |


##### Dimension Table: DimCity

| column | Description | Source |
| ------ | ----------- | ------ |
| city | city id, which is generated automatically and serve as primary key | auto generated |
| name | name of the city | us-cities-demographics.csv |
| state | state code, which is used to join with dimState table | dimState table |
| median_age | median age of the given city | us-cities-demographics.csv |
| male_population | male population of the given city | us-cities-demographics.csv |
| female_population | female population of the given city | us-cities-demographics.csv |
| total_population | total population of the given city | us-cities-demographics.csv |
| verterans_population | population of verterans in the given city | us-cities-demographics.csv |
| foreign-born | population of foreign born in the given city | us-cities-demographics.csv |
| average_household_size | average household size in the given city | us-cities-demographics.csv |
| native_population | native population in the given city | us-cities-demographics.csv |
| asian_population | asian population in the given city | us-cities-demographics.csv |
| black_population | black population in the given city | us-cities-demographics.csv |
| latino_population | latino population in the given city | us-cities-demographics.csv |
| white_population | white population in the given city | us-cities-demographics.csv |
 
##### Dimension Table: DimState

| column | Description | Source |
|------- | ----------- | ------ |
| code   | state code    | I94 immigration sas7dat files |
| name   | state name    | I94 immigration sas7dat files |

##### Dimension Table: DimTransportMode

| column | Description | Source |
| ------ | ----------- | ------ |
| code | transportation mode id | I94 immigration sas7dat files |
| mode | transporttation mode  | I94 immigration sas7dat files |

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.

  The aim of the project is to support queries of I94 immigration data. Apache Spark was used to extract data from various data source, clean up the data and load the data into redshift database.
  
  Spark was used, because it can be run in a cluster, and as data volume increase we can add more machines to handle the volume.
  The data was stored in redshift becasue of its easy of use, setup and performance. Also it support sql queries that most users is already familar with.
  
  
* Propose how often the data should be updated and why.

  Since the I94 data is monthly, it will be appropriate to incrementally udpate the data monthly.
  
  
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 
   If the data was to be increased by 100x fold, we can add more nodes to the spark cluster(EMR) to handle the etl portion. On top of that we should also increase the cluster size of the redshift database backend, employ more powerful machines with more memory and disk space, so it can handle the anticipated increase in data volume.
   
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 
   If the data needs to be updated daily by 7am every day, then we should use automated workflow solution like airflow. which will send out notification (via slack channel, email or other means) to notify if the job has been successful completed, or need attention by certain time so manual intervention can kick in to ensure the updated will meet the 7am deadline each day.
   
   
 * The database needed to be accessed by 100+ people.
 
   If the number of users accessing the data has to be accessed by 100+ people, we can upgrade the tier of the redshift database server. Using more powerful machines, more cpu's and memory to handle the increase in user population.