# Data Engineering Capstone Project

### Project Scope

In this project, a data pipeline is developed to build an analytics database centered on US immigration data. The tables are stored in AWS Redshift and the pipeline is orchestrated with Apache Airflow.

### Data overview
- I94 Immigration Data: This data comes from the US National Tourism and Trade Office ([source](https://travel.trade.gov/research/reports/i94/historical/2016.html)). A data dictionary is included in the workspace.
- World Temperature Data: This dataset came from Kaggle. You can read more about it [here](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data).
- U.S. City Demographic Data: This data comes from OpenDataSoft. You can read more about it [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).
- Airport Code Table: This is a simple table of airport codes and corresponding cities. It comes from [here](https://datahub.io/core/airport-codes#data).

In [1]:
import pandas as pd
import numpy as np
import configparser
import psycopg2

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import DateType
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, rand
from pyspark.sql.functions import isnan, when, count, col, isnull

### Exploration phase
I explore the dimensions of each dataset with Spark and run a preliminary data quality check: does the dataset contain duplicate rows, and do its id columns contain null or NaN values? The same checks are applied consistently across all datasets.
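The checks boil down to simple counting. The sketch below reproduces them in plain Python on a list of dict rows, a hypothetical stand-in for a Spark DataFrame, so the logic can be tried without a cluster; `duplicate_keys` and `null_nan_counts` are illustrative helper names, not part of the project. It also shows why null and NaN need separate checks: a missing value (`None`, Spark's null) is not a float at all, while NaN is a float that does not even equal itself, so it has to be detected with `math.isnan`.

```python
import math
from collections import Counter
from typing import Any, Dict, List, Sequence, Tuple

Row = Dict[str, Any]


def duplicate_keys(rows: List[Row], key_cols: Sequence[str]) -> Dict[Tuple, int]:
    """Return every key tuple that occurs more than once, with its count.

    Mirrors df.groupBy(*key_cols).count().where('count > 1') for ordinary keys.
    (Unlike Spark, separate float('nan') objects are not grouped together here.)
    """
    counts = Counter(tuple(row.get(c) for c in key_cols) for row in rows)
    return {key: n for key, n in counts.items() if n > 1}


def null_nan_counts(rows: List[Row], columns: Sequence[str]) -> Dict[str, Tuple[int, int]]:
    """Return (null_count, nan_count) per column.

    None plays the role of a Spark null; float('nan') plays the role of NaN.
    """
    result = {}
    for c in columns:
        values = [row.get(c) for row in rows]
        nulls = sum(1 for v in values if v is None)
        nans = sum(1 for v in values if isinstance(v, float) and math.isnan(v))
        result[c] = (nulls, nans)
    return result


# Hypothetical rows, for illustration only
sample = [
    {"cicid": 1.0, "admnum": 10.0, "gender": "F"},
    {"cicid": 2.0, "admnum": 20.0, "gender": None},
    {"cicid": 2.0, "admnum": 20.0, "gender": "M"},
    {"cicid": float("nan"), "admnum": 30.0, "gender": "M"},
]

print(duplicate_keys(sample, ["cicid", "admnum"]))  # {(2.0, 20.0): 2}
print(null_nan_counts(sample, ["cicid", "gender"]))  # {'cicid': (0, 1), 'gender': (1, 0)}
```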

#### Exploration: Immigration Data I94

In [3]:
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

In [4]:
df_immig = spark.read.parquet('sas_data')

# Count the records
df_immig.count()


3096313

In [5]:
# Print schema
df_immig.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

All columns are nullable

In [6]:
# table head
df_immig.show()

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     NV|20591.0|  32.0|    1.0|  

In [7]:
# Check for duplicate rows
df_immig_dups = df_immig.groupBy('cicid', 'admnum').count().where('count > 1') \
    .sort('count', ascending=False)
# show() prints the result and returns None, so keep the DataFrame and call show() separately
df_immig_dups.show()

+-----+------+-----+
|cicid|admnum|count|
+-----+------+-----+
+-----+------+-----+



No combination of cicid and admnum occurs more than once, so there are no duplicate rows in this dataset.

In [8]:
df_immig.where(isnan('cicid')).count()

0

In [9]:
df_immig.where(isnull('cicid')).count()

0

The id column cicid contains no NaN or null values.

In [10]:
# Check for nan and nulls in all columns
df_immig.select([count(when(isnan(c), c)).alias(c) for c in df_immig.columns]).show()

+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------+-----+--------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender|insnum|airline|admnum|fltno|visatype|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------+-----+--------+
|    0|    0|     0|     0|     0|      0|      0|      0|      0|      0|     0|      0|    0|       0|       0|    0|      0|      0|      0|      0|      0|      0|     0|     0|      0|     0|    0|       0|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------+-----+--------+

In [11]:
df_immig.select([count(when(isnull(c), c)).alias(c) for c in df_immig.columns]).show()

+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+------+-----+--------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|  occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender| insnum|airline|admnum|fltno|visatype|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+------+-----+--------+
|    0|    0|     0|     0|     0|      0|      0|    239| 152592| 142457|   802|      0|    0|       1| 1881250|3088187|    238| 138429|3095921| 138429|    802|    477|414269|2982605|  83627|     0|19549|       0|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+------+-----+--------+
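To judge which columns are usable, it helps to express these null counts as a share of all 3,096,313 records. The sketch below does that arithmetic in plain Python with the non-zero counts copied from the output above. The 90% cut-off used to flag "mostly empty" columns is an assumed, arbitrary threshold for illustration, not a rule from the project.

```python
TOTAL_ROWS = 3_096_313  # result of df_immig.count()

# Non-zero null counts copied from the isnull() output above
null_counts = {
    "i94mode": 239,
    "i94addr": 152_592,
    "depdate": 142_457,
    "i94bir": 802,
    "dtadfile": 1,
    "visapost": 1_881_250,
    "occup": 3_088_187,
    "entdepa": 238,
    "entdepd": 138_429,
    "entdepu": 3_095_921,
    "matflag": 138_429,
    "biryear": 802,
    "dtaddto": 477,
    "gender": 414_269,
    "insnum": 2_982_605,
    "airline": 83_627,
    "fltno": 19_549,
}

THRESHOLD = 90.0  # assumed cut-off, in percent, for "mostly empty" columns

# Share of null values per column, in percent
null_pct = {col: 100 * n / TOTAL_ROWS for col, n in null_counts.items()}
mostly_empty = sorted(col for col, pct in null_pct.items() if pct > THRESHOLD)

# Print columns from most to least empty
for col, pct in sorted(null_pct.items(), key=lambda kv: -kv[1]):
    print(f"{col:10s} {pct:6.2f}%")
print(mostly_empty)  # ['entdepu', 'insnum', 'occup']
```

Under this assumed threshold, occup, entdepu and insnum (each more than 96% null) are candidates to drop, while visapost (roughly 61% null) remains a judgment call.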

#### Exploration: Demographics data

In [12]:
# read in data
demo = spark.read.option('delimiter', ';').csv('data/us-cities-demographics.csv', header=True)
demo.count()

2891

In [13]:
demo.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



In [14]:
demo.show()

+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+------+
|            City|         State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race| Count|
+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+------+
|   Silver Spring|      Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino| 25924|
|          Quincy| Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White| 58723|
|          Hoover|       Alabama|      38.5|      

In [15]:
demo_dups = demo.groupBy('City', 'State', 'Race').count().where('count > 1') \
    .sort('count', ascending=False)
# show() returns None, so keep the DataFrame and call show() separately
demo_dups.show()

+----+-----+----+-----+
|City|State|Race|count|
+----+-----+----+-----+
+----+-----+----+-----+



There are no duplicate rows in the demographics dataset: no (City, State, Race) combination appears more than once, so no race is counted twice for the same city.

In [16]:
demo.select([count(when(isnan(c), c)).alias(c) for c in demo.columns]).show()


+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+
|City|State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|Race|Count|
+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+
|   0|    0|         0|              0|                0|               0|                 0|           0|                     0|         0|   0|    0|
+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+



No NaN values

In [17]:
demo.select([count(when(isnull(c), c)).alias(c) for c in demo.columns]).show()

+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+
|City|State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|Race|Count|
+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+
|   0|    0|         0|              3|                3|               0|                13|          13|                    16|         0|   0|    0|
+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+



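Several columns contain null values: Male Population and Female Population (3 each), Number of Veterans and Foreign-born (13 each) and Average Household Size (16).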

In [18]:
# Find null male population values
demo.select('City', 'State', 'Male Population', 'Race', 'Count').where(isnull('Male Population')).show()

+------------+-------+---------------+--------------------+-----+
|        City|  State|Male Population|                Race|Count|
+------------+-------+---------------+--------------------+-----+
|The Villages|Florida|           null|  Hispanic or Latino| 1066|
|The Villages|Florida|           null|Black or African-...|  331|
|The Villages|Florida|           null|               White|72211|
+------------+-------+---------------+--------------------+-----+



In [19]:
# Find null female population values
demo.select('City', 'State', 'Female Population', 'Race', 'Count').where(isnull('Female Population')).show()

+------------+-------+-----------------+--------------------+-----+
|        City|  State|Female Population|                Race|Count|
+------------+-------+-----------------+--------------------+-----+
|The Villages|Florida|             null|  Hispanic or Latino| 1066|
|The Villages|Florida|             null|Black or African-...|  331|
|The Villages|Florida|             null|               White|72211|
+------------+-------+-----------------+--------------------+-----+



The Villages (Florida) has no entries for Male Population and Female Population.

In [20]:
# Find null values for Veterans
demo.select('City', 'State', 'Number of Veterans', 'Race', 'Count').where(isnull('Number of Veterans')).orderBy('City').show()

+--------+-----------+------------------+--------------------+------+
|    City|      State|Number of Veterans|                Race| Count|
+--------+-----------+------------------+--------------------+------+
| Bayamón|Puerto Rico|              null|  Hispanic or Latino|169155|
|  Caguas|Puerto Rico|              null|  Hispanic or Latino| 76349|
|  Caguas|Puerto Rico|              null|American Indian a...|   624|
|Carolina|Puerto Rico|              null|American Indian a...| 12143|
|Carolina|Puerto Rico|              null|  Hispanic or Latino|139967|
|Guaynabo|Puerto Rico|              null|  Hispanic or Latino| 69936|
|Guaynabo|Puerto Rico|              null|American Indian a...|   589|
|Mayagüez|Puerto Rico|              null|               Asian|   235|
|Mayagüez|Puerto Rico|              null|  Hispanic or Latino| 65521|
|   Ponce|Puerto Rico|              null|  Hispanic or Latino|120705|
|San Juan|Puerto Rico|              null|  Hispanic or Latino|335559|
|San Juan|Puerto Ric

Puerto Rican cities do not have veteran counts

In [21]:
# Find null values for Foreign born residents
demo.select('City', 'State', 'Foreign-born', 'Race', 'Count').where(isnull('Foreign-born')).orderBy('City').show()

+--------+-----------+------------+--------------------+------+
|    City|      State|Foreign-born|                Race| Count|
+--------+-----------+------------+--------------------+------+
| Bayamón|Puerto Rico|        null|  Hispanic or Latino|169155|
|  Caguas|Puerto Rico|        null|  Hispanic or Latino| 76349|
|  Caguas|Puerto Rico|        null|American Indian a...|   624|
|Carolina|Puerto Rico|        null|American Indian a...| 12143|
|Carolina|Puerto Rico|        null|  Hispanic or Latino|139967|
|Guaynabo|Puerto Rico|        null|  Hispanic or Latino| 69936|
|Guaynabo|Puerto Rico|        null|American Indian a...|   589|
|Mayagüez|Puerto Rico|        null|               Asian|   235|
|Mayagüez|Puerto Rico|        null|  Hispanic or Latino| 65521|
|   Ponce|Puerto Rico|        null|  Hispanic or Latino|120705|
|San Juan|Puerto Rico|        null|  Hispanic or Latino|335559|
|San Juan|Puerto Rico|        null|American Indian a...|  4031|
|San Juan|Puerto Rico|        null|     

Puerto Rican cities do not have foreign-born counts

In [22]:
demo.select('City', 'State', 'Average Household Size', 'Race', 'Count').where(isnull('Average Household Size')).orderBy('City').show()

+------------+-----------+----------------------+--------------------+------+
|        City|      State|Average Household Size|                Race| Count|
+------------+-----------+----------------------+--------------------+------+
|     Bayamón|Puerto Rico|                  null|  Hispanic or Latino|169155|
|      Caguas|Puerto Rico|                  null|  Hispanic or Latino| 76349|
|      Caguas|Puerto Rico|                  null|American Indian a...|   624|
|    Carolina|Puerto Rico|                  null|  Hispanic or Latino|139967|
|    Carolina|Puerto Rico|                  null|American Indian a...| 12143|
|    Guaynabo|Puerto Rico|                  null|  Hispanic or Latino| 69936|
|    Guaynabo|Puerto Rico|                  null|American Indian a...|   589|
|    Mayagüez|Puerto Rico|                  null|               Asian|   235|
|    Mayagüez|Puerto Rico|                  null|  Hispanic or Latino| 65521|
|       Ponce|Puerto Rico|                  null|  Hispanic or L

Puerto Rican cities and The Villages in Florida have not reported their average household size; the 16 null values are consistent with the 13 Puerto Rican rows plus the 3 rows for The Villages. This coincides with the findings for the other columns: all null values in the demographics data belong to The Villages or to cities in Puerto Rico.
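Instead of one query per column, the rows that carry a null in any column can be collected in a single pass and reduced to their distinct City and State. The plain-Python sketch below shows the idea on three rows copied from the outputs above (a subset of columns), with a dict per row standing in for a Spark Row; `entities_with_nulls` is a hypothetical helper name. In Spark, the equivalent would be to OR together `isnull(c)` for every column in one `where` and then call `select('City', 'State').distinct()`.

```python
from typing import Any, Dict, List, Set, Tuple

Row = Dict[str, Any]


def entities_with_nulls(rows: List[Row], entity_cols: Tuple[str, ...]) -> Set[Tuple[Any, ...]]:
    """Return the distinct entity keys (e.g. City, State) of rows with at least one null."""
    return {
        tuple(row[c] for c in entity_cols)
        for row in rows
        if any(value is None for value in row.values())
    }


# Demographics rows copied from the outputs above (subset of columns, values kept as strings)
rows = [
    {"City": "Silver Spring", "State": "Maryland", "Male Population": "40601",
     "Race": "Hispanic or Latino", "Count": "25924"},
    {"City": "Quincy", "State": "Massachusetts", "Male Population": "44129",
     "Race": "White", "Count": "58723"},
    {"City": "The Villages", "State": "Florida", "Male Population": None,
     "Race": "White", "Count": "72211"},
]

print(entities_with_nulls(rows, ("City", "State")))  # {('The Villages', 'Florida')}
```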

#### Exploration: Airport codes data

In [23]:
airport = spark.read.option('delimiter', ',').csv('data/airport-codes_csv.csv', header=True)
airport.count()

55075

In [24]:
airport.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



In [25]:
airport.show()

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport|          11|       NA|         US|     US-PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
| 00AK|small_airport|        Lowell Field|         450|       NA|         US|     US-AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|
| 00AL|small_airport|        Epps Airpark|         820|       NA|         US|     

In [26]:
# Check for duplicate rows
airport_dups = airport.groupBy('ident', 'name').count().where('count > 1') \
    .sort('count', ascending=False)
# show() returns None, so keep the DataFrame and call show() separately
airport_dups.show()

+-----+----+-----+
|ident|name|count|
+-----+----+-----+
+-----+----+-----+



No duplicate rows in airport data

In [27]:
airport.select([count(when(isnan(c), c)).alias(c) for c in airport.columns]).show()

+-----+----+----+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+
|ident|type|name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|coordinates|
+-----+----+----+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+
|    0|   0|   0|           0|        0|          0|         0|           0|       0|        0|         0|          0|
+-----+----+----+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+



In [28]:
airport.select([count(when(isnull(c), c)).alias(c) for c in airport.columns]).show()

+-----+----+----+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+
|ident|type|name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|coordinates|
+-----+----+----+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+
|    0|   0|   0|        7006|        0|          0|         0|        5676|   14045|    45886|     26389|          0|
+-----+----+----+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+



No NaN values in the airport data. Numerous null values in elevation_ft, municipality, gps_code, iata_code and local_code; iata_code alone is null in 45,886 of 55,075 rows (about 83%).

### Immigration data parsing
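The I94_SAS_Labels_Descriptions.SAS file contains the labels for the country codes, ports of entry, modes of transport, address codes (US states and territories) and visa categories used in the immigration data. These labels need to be parsed, as they are valuable additions to the data model and schema design of the immigration data.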

In [29]:
def parse_sas(sas_file: str, parsing_value: str, colnames: list):
    """Parse one value block of the SAS labels file into a two-column Spark DataFrame.

    Relies on the global `spark` session created above.
    """
    with open(sas_file) as f:
        sub_file = f.read()
    # Jump to the block that starts with the parsing value
    sub_file = sub_file[sub_file.index(parsing_value):]
    # A SAS value block ends at the first semicolon, so keep everything before it
    sub_file = sub_file[:sub_file.index(';')]
    # Each code/label pair sits on its own line; the first line holds the block name
    rows = sub_file.split('\n')[1:]
    codes = []
    names = []
    for row in rows:
        if '=' not in row:
            continue
        # Split on the first '=' only, in case a label itself contains '='
        c_code, c_name = row.split('=', 1)
        # Drop surrounding whitespace, then the quotes, then any padding inside the quotes
        c_code = c_code.strip().strip("'").strip()
        c_name = c_name.strip().strip("'").strip()
        codes.append(c_code)
        names.append(c_name)
    return spark.createDataFrame(list(zip(codes, names)), colnames)
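Because `parse_sas` needs both the full labels file and a Spark session, its text handling is easiest to check in isolation. The sketch below applies the same steps (jump to the parsing value, cut at the first semicolon, split each line on the first `=`, strip quotes and padding) to a short inline string. That string is a hypothetical excerpt written to match the `i94model` and `i94addrl` outputs shown below, not a verbatim copy of the SAS file, and `parse_sas_text` is an illustrative name.

```python
from typing import List, Tuple

# Hypothetical excerpt in the layout of the SAS labels file (not copied verbatim)
SAMPLE_SAS = """
value i94model
\t1 = 'Air'
\t2 = 'Sea'
\t3 = 'Land'
\t9 = 'Not reported' ;

value i94addrl
\t'AL'='ALABAMA'
\t'AK'='ALASKA'
;
"""


def parse_sas_text(text: str, parsing_value: str) -> List[Tuple[str, str]]:
    """Return (code, label) pairs from the block that starts at parsing_value."""
    block = text[text.index(parsing_value):]
    # A SAS value block ends at the first semicolon
    block = block[:block.index(';')]
    pairs = []
    # Skip the first line, which holds the block name itself
    for row in block.split('\n')[1:]:
        if '=' not in row:
            continue
        code, label = row.split('=', 1)
        # Drop whitespace, then the quotes, then any padding inside the quotes
        pairs.append((code.strip().strip("'").strip(), label.strip().strip("'").strip()))
    return pairs


print(parse_sas_text(SAMPLE_SAS, 'i94model'))
print(parse_sas_text(SAMPLE_SAS, 'i94addrl'))
```

Returning plain tuples keeps the parsing testable on its own; turning the pairs into a DataFrame is then a single `spark.createDataFrame` call.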

In [31]:
# parse country code
country_code = parse_sas('data/I94_SAS_Labels_Descriptions.SAS', 'i94cntyl', ['code', 'country'])
country_code.show()

+----+--------------------+
|code|             country|
+----+--------------------+
| 582|MEXICO Air Sea, a...|
| 236|         AFGHANISTAN|
| 101|             ALBANIA|
| 316|             ALGERIA|
| 102|             ANDORRA|
| 324|              ANGOLA|
| 529|            ANGUILLA|
| 518|     ANTIGUA-BARBUDA|
| 687|           ARGENTINA|
| 151|             ARMENIA|
| 532|               ARUBA|
| 438|           AUSTRALIA|
| 103|             AUSTRIA|
| 152|          AZERBAIJAN|
| 512|             BAHAMAS|
| 298|             BAHRAIN|
| 274|          BANGLADESH|
| 513|            BARBADOS|
| 104|             BELGIUM|
| 581|              BELIZE|
+----+--------------------+
only showing top 20 rows



In [32]:
# parse port of entry
port_of_entry = parse_sas('data/I94_SAS_Labels_Descriptions.SAS', 'i94prtl', ['code', 'port_of_entry'])
port_of_entry.show()

+----+--------------------+
|code|       port_of_entry|
+----+--------------------+
| ALC|           ALCAN, AK|
| ANC|       ANCHORAGE, AK|
| BAR|BAKER AAF - BAKER...|
| DAC|   DALTONS CACHE, AK|
| PIZ|DEW STATION PT LA...|
| DTH|    DUTCH HARBOR, AK|
| EGL|           EAGLE, AK|
| FRB|       FAIRBANKS, AK|
| HOM|           HOMER, AK|
| HYD|           HYDER, AK|
| JUN|          JUNEAU, AK|
| 5KE|       KETCHIKAN, AK|
| KET|       KETCHIKAN, AK|
| MOS|MOSES POINT INTER...|
| NIK|         NIKISKI, AK|
| NOM|             NOM, AK|
| PKC|     POKER CREEK, AK|
| ORI|  PORT LIONS SPB, AK|
| SKA|         SKAGWAY, AK|
| SNP| ST. PAUL ISLAND, AK|
+----+--------------------+
only showing top 20 rows



In [33]:
# parse mode of transport
mode_of_transport = parse_sas('data/I94_SAS_Labels_Descriptions.SAS', 'i94model', ['code', 'mode_of_transport'])
mode_of_transport.show()

+----+-----------------+
|code|mode_of_transport|
+----+-----------------+
|   1|              Air|
|   2|              Sea|
|   3|             Land|
|   9|     Not reported|
+----+-----------------+



In [35]:
# parse address
address = parse_sas('data/I94_SAS_Labels_Descriptions.SAS', ' i94addrl', ['code', 'address'])
address.show()

+----+-----------------+
|code|          address|
+----+-----------------+
|  AL|          ALABAMA|
|  AK|           ALASKA|
|  AZ|          ARIZONA|
|  AR|         ARKANSAS|
|  CA|       CALIFORNIA|
|  CO|         COLORADO|
|  CT|      CONNECTICUT|
|  DE|         DELAWARE|
|  DC|DIST. OF COLUMBIA|
|  FL|          FLORIDA|
|  GA|          GEORGIA|
|  GU|             GUAM|
|  HI|           HAWAII|
|  ID|            IDAHO|
|  IL|         ILLINOIS|
|  IN|          INDIANA|
|  IA|             IOWA|
|  KS|           KANSAS|
|  KY|         KENTUCKY|
|  LA|        LOUISIANA|
+----+-----------------+
only showing top 20 rows



In [37]:
# parse visa
visa = parse_sas('data/I94_SAS_Labels_Descriptions.SAS', ' i94visa', ['code', 'visa'])
visa.show()

+----+--------+
|code|    visa|
+----+--------+
|   1|Business|
|   2|Pleasure|
|   3| Student|
+----+--------+
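The parsed codes are strings (for example '438'), while the matching immigration columns i94res, i94cit, i94mode and i94visa are doubles (438.0), so a naive string comparison would look for '438.0' and find nothing. Before joining, the doubles need to be normalized; in Spark something like `col('i94res').cast('int').cast('string')` would do it. The plain-Python sketch below shows the same normalization on the first immigration record printed earlier (i94res 438.0, i94mode 1.0, i94visa 1.0), using lookup dicts that hold only entries visible in the parsed outputs above; `normalize_code` and `decode` are hypothetical helper names.

```python
import math
from typing import Dict, Optional

# Small subsets of the parsed lookup tables above
COUNTRY = {"438": "AUSTRALIA", "103": "AUSTRIA"}
MODE = {"1": "Air", "2": "Sea", "3": "Land", "9": "Not reported"}
VISA = {"1": "Business", "2": "Pleasure", "3": "Student"}


def normalize_code(value: Optional[float]) -> Optional[str]:
    """Turn a numeric I94 code such as 438.0 into the string key '438'."""
    if value is None or math.isnan(value):
        return None
    return str(int(value))


def decode(value: Optional[float], lookup: Dict[str, str]) -> Optional[str]:
    """Return the label for a numeric code, or None if it is missing or unknown."""
    key = normalize_code(value)
    return lookup.get(key) if key is not None else None


# First record shown by df_immig.show(): i94res=438.0, i94mode=1.0, i94visa=1.0
record = {"i94res": 438.0, "i94mode": 1.0, "i94visa": 1.0}
print(decode(record["i94res"], COUNTRY), decode(record["i94mode"], MODE), decode(record["i94visa"], VISA))
# AUSTRALIA Air Business
```

The address codes need no such step, since i94addr is already a string column ('CA', 'NV') just like the parsed address codes.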

