# Project Title
### Data Engineering Capstone Project

#### Project Summary
This project creates an ETL pipeline that combines the provided US I94 immigration data with US city demographic data, so that the result can be analysed for trends and patterns across different US cities.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [5]:
# Do all imports and installs here
import pandas as pd
from pyspark.sql.functions import first, col, upper, udf, date_format
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType
from pyspark.sql.functions import regexp_replace, trim
from pyspark.sql.functions import year, month, dayofmonth, weekofyear, dayofweek
from datetime import datetime, timedelta, date



In [6]:
	

from pyspark.sql import SparkSession
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()
df_spark =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

### Step 1: Scope the Project and Gather Data

#### Scope 
Create an ETL pipeline with the I94 immigration data and cities demographic data.

#### Describe and Gather Data 
US Cities Demographic Data - A breakdown of the cities' demographics

I94 Immigration Data - Fields as described in I94 SAS Label Description.txt, excluding the fields that CIC does not use

cit_res.txt - Id used in I94 to denote countries and the corresponding country names (Extracted from I94 SAS Label Description.txt)

mode.txt - Id used in I94 to denote mode of transport and the transportation used to enter the US (Extracted from I94 SAS Label Description.txt) 

port.txt - Id used in I94 to denote port of entry, port name and state (Extracted from I94 SAS Label Description.txt)

visa.txt - Id used in I94 to denote the visa and the reason of entry into the US (Extracted from I94 SAS Label Description.txt) 

![Schema](./Capstone_Diagram.png)




In [7]:
# Read in the data here
us_city_demographics=spark.read.csv("./us-cities-demographics.csv", sep=';', header=True)

# Display dataset columns
us_city_demographics.columns


['City',
 'State',
 'Median Age',
 'Male Population',
 'Female Population',
 'Total Population',
 'Number of Veterans',
 'Foreign-born',
 'Average Household Size',
 'State Code',
 'Race',
 'Count']

In [8]:
# From the above data, only Race and Count differ between rows of the same city.
# Normalise us_city_demographics with Spark's pivot on Race so that each Race becomes a column and Count becomes its row value
us_city_demographics.show(20)
us_race_count = (us_city_demographics.select("City","State Code","Race","Count")
    .groupby("City", "State Code")
    .pivot("Race")
    .agg(first("Count")))

us_race_count.show(10)

+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+------+
|            City|         State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race| Count|
+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+------+
|   Silver Spring|      Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino| 25924|
|          Quincy| Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White| 58723|
|          Hoover|       Alabama|      38.5|      

In [9]:
# Remove extracted Race and Count from us_city_demographics columns and check for duplicates
columns_to_drop = ['Race', 'Count']
us_city_demographics = us_city_demographics.drop(*columns_to_drop)
us_city_demographics.count(), us_city_demographics.dropDuplicates().count()

(2891, 596)
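
The pivot-then-deduplicate idea in the cells above can be pictured on a handful of rows. The following is a minimal pure-Python sketch, not the Spark implementation: `pivot_race_counts` is a hypothetical helper, and the long-format rows are reconstructed from the pivoted Johnson City record that this notebook prints further down.

```python
# Long-format rows: (city, state_code, race, count).
# Reconstructed for illustration from the pivoted Johnson City record in this
# notebook; counts are strings because the CSV was read without a schema.
rows = [
    ("Johnson City", "TN", "American Indian and Alaska Native", "400"),
    ("Johnson City", "TN", "Asian", "1877"),
    ("Johnson City", "TN", "Black or African-American", "6016"),
    ("Johnson City", "TN", "Hispanic or Latino", "1114"),
    ("Johnson City", "TN", "White", "59147"),
]


def pivot_race_counts(long_rows):
    """Group by (city, state_code) and turn each race into its own column."""
    wide = {}
    for city, state_code, race, count in long_rows:
        record = wide.setdefault((city, state_code), {})
        # Mirrors agg(first("Count")): keep the first value seen per race.
        record.setdefault(race, count)
    return wide


pivoted = pivot_race_counts(rows)
# Five long rows collapse into one wide record per (city, state_code).
print(len(rows), "->", len(pivoted))
```

This is also why the remaining columns are duplicated once `Race` and `Count` are dropped: each city appears once per race in the source file, so `dropDuplicates()` brings it back to one row per city.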

In [10]:
#remove duplicates
us_city_demographics = us_city_demographics.dropDuplicates()
us_city_demographics.show(10)

+---------------+-----------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+
|           City|      State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|
+---------------+-----------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+
|   Johnson City|  Tennessee|      38.2|          31019|            34350|           65369|              5038|        2878|                  2.18|        TN|
|   San Clemente| California|      45.2|          34076|            31456|           65532|              3970|        8109|                  2.64|        CA|
|   Redwood City| California|      37.1|          42676|            42624|           85300|              2430|       27652|                  2.64|        CA|
|     Fort Myers|    Florida|      37.3|          36

In [11]:
# Combine dataframes on City, StateCode 
us_df = us_city_demographics.join(us_race_count, ["City", "State Code"])

us_df.show()

+---------------+----------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+---------------------------------+------+-------------------------+------------------+-------+
|           City|State Code|         State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|American Indian and Alaska Native| Asian|Black or African-American|Hispanic or Latino|  White|
+---------------+----------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+---------------------------------+------+-------------------------+------------------+-------+
|   Johnson City|        TN|     Tennessee|      38.2|          31019|            34350|           65369|              5038|        2878|                  2.18|                              400|  1877|                     6016|        

In [12]:
# Display data
us_city_demographics.orderBy("city").show(20)

+------------+------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+
|        City|       State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|
+------------+------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+
|     Abilene|       Texas|      31.3|          65212|            60664|          125876|              9367|        8129|                  2.64|        TX|
|       Akron|        Ohio|      38.1|          96886|           100667|          197553|             12878|       10024|                  2.24|        OH|
|     Alafaya|     Florida|      33.5|          39504|            45760|           85264|              4176|       15842|                  2.94|        FL|
|     Alameda|  California|      41.4|          37747|          

In [14]:
# Remove column spaces and "-" and change to lower case before writing to parquet for compatibility
us_df = us_df.select(
    col("City").alias("city"),
    col("State Code").alias("state_code"),
    col("State").alias("state"),
    col("Median Age").alias("median_age"),
    col("Male Population").alias("male_population"),
    col("Female Population").alias("female_population"),
    col("Total Population").alias("total_population"),
    col("Number of Veterans").alias("veterans"),
    col("Foreign-born").alias("foreign_born"),
    col("Average Household Size").alias("avg_household_size"),
    col("American Indian and Alaska Native").alias("natives"),
    col("Asian").alias("asian"),
    col("Black or African-American").alias("black"),
    col("Hispanic or Latino").alias("hispanic"),
    col("White").alias("white"),
)

us_df.columns



['city',
 'state_code',
 'state',
 'median_age',
 'male_population',
 'female_population',
 'total_population',
 'veterans',
 'foreign_born',
 'avg_household_size',
 'natives',
 'asian',
 'black',
 'hispanic',
 'white']
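
The renaming above uses hand-picked aliases. As an alternative, a generic rule can derive parquet-friendly names automatically. The sketch below assumes a hypothetical helper `to_snake_case`; note that it would produce names such as `number_of_veterans` rather than the shorter aliases (`veterans`, `natives`) chosen in this notebook.

```python
import re


def to_snake_case(name):
    """Lower-case a column name and replace runs of non-alphanumerics with '_'."""
    return re.sub(r"[^0-9a-zA-Z]+", "_", name.strip()).strip("_").lower()


# A few of the original column names from the demographics file.
source_columns = [
    "City",
    "State Code",
    "Foreign-born",
    "Average Household Size",
    "Black or African-American",
]
renamed = {c: to_snake_case(c) for c in source_columns}
for old, new in renamed.items():
    print(f"{old!r} -> {new!r}")
```

With a Spark DataFrame, the same rule could be applied in one step with `us_df.toDF(*[to_snake_case(c) for c in us_df.columns])`.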

In [15]:
# write to parquet
us_df.write.mode('overwrite').parquet("./data/us_city_demographics.parquet")


In [16]:
# Mode file

# define schema
mode_schema = StructType([
    StructField("mode_id", StringType(), True),
    StructField("mode_type", StringType(), True)])
mode_df=spark.read.csv("./mode.txt", sep='=', header=False, schema=mode_schema)
mode_df.show()

+-------+---------------+
|mode_id|      mode_type|
+-------+---------------+
|     1 |          'Air'|
|     2 |          'Sea'|
|     3 |         'Land'|
|     9 | 'Not reported'|
+-------+---------------+



In [17]:
mode_df = mode_df.withColumn("mode_id", trim(regexp_replace("mode_id","[']",'')))\
                    .withColumn("mode_type", trim(regexp_replace("mode_type","[']",'')))
mode_df.show()

+-------+------------+
|mode_id|   mode_type|
+-------+------------+
|      1|         Air|
|      2|         Sea|
|      3|        Land|
|      9|Not reported|
+-------+------------+
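
The cleaning applied to the label files (split on `=`, remove single quotes, trim whitespace) can be checked without Spark. This is a small sketch under the assumption that each raw line looks like `1 = 'Air'`; `parse_label_line` is a hypothetical helper, not part of the notebook.

```python
def parse_label_line(line, sep="="):
    """Split "code = 'label'" into (code, label), dropping quotes and padding."""
    code, _, label = line.partition(sep)

    def clean(text):
        # Same effect as trim(regexp_replace(col, "[']", "")) in the Spark cell.
        return text.replace("'", "").strip()

    return clean(code), clean(label)


# Assumed raw lines, based on the values displayed for mode.txt above.
sample_lines = ["1 = 'Air'", "2 = 'Sea'", "3 = 'Land'", "9 = 'Not reported'"]
parsed = [parse_label_line(line) for line in sample_lines]
print(parsed)
```

One thing to keep in mind is that this rule removes every apostrophe, including any that occur inside a label, so it only suits files where the quotes are pure delimiters.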



In [18]:
# write to parquet
mode_df.write.mode("overwrite").parquet("./data/mode.parquet")

In [19]:
# Cit_Res file
# define schema
cit_res_schema = StructType([
    StructField("cit_res_id", StringType(), True),
    StructField("country", StringType(), True)])
cit_res_df=spark.read.csv("./cit_res.txt", sep='=', header=False, schema=cit_res_schema)
cit_res_df.show(10)

+----------+--------------------+
|cit_res_id|             country|
+----------+--------------------+
|      582 |  'MEXICO Air Sea...|
|      236 |       'AFGHANISTAN'|
|      101 |           'ALBANIA'|
|      316 |           'ALGERIA'|
|      102 |           'ANDORRA'|
|      324 |            'ANGOLA'|
|      529 |          'ANGUILLA'|
|      518 |   'ANTIGUA-BARBUDA'|
|      687 |        'ARGENTINA '|
|      151 |           'ARMENIA'|
+----------+--------------------+
only showing top 10 rows



In [20]:
cit_res_df = cit_res_df.withColumn("cit_res_id", trim(regexp_replace("cit_res_id","[']",'')))\
                        .withColumn("country", trim(regexp_replace("country","[']",'')))
cit_res_df.show(10)

+----------+--------------------+
|cit_res_id|             country|
+----------+--------------------+
|       582|MEXICO Air Sea, a...|
|       236|         AFGHANISTAN|
|       101|             ALBANIA|
|       316|             ALGERIA|
|       102|             ANDORRA|
|       324|              ANGOLA|
|       529|            ANGUILLA|
|       518|     ANTIGUA-BARBUDA|
|       687|           ARGENTINA|
|       151|             ARMENIA|
+----------+--------------------+
only showing top 10 rows



In [21]:
# write to parquet
cit_res_df.write.mode("overwrite").parquet("./data/cit_res.parquet")

In [22]:
# Port file, manual cleaning of text file to separate city, state, and "'"
# define schema
port_schema = StructType([
    StructField("port_id", StringType(), True),
    StructField("port_city", StringType(), True),
    StructField("port_state", StringType(), True)])
port_df=spark.read.csv("./port.txt", sep='=', header=False, schema=port_schema)
port_df.show(10)

+-------+--------------------+--------------------+
|port_id|           port_city|          port_state|
+-------+--------------------+--------------------+
|    ALC|               ALCAN|     AK             |
|    ANC|           ANCHORAGE|         AK         |
|    BAR|BAKER AAF - BAKER...|                  AK|
|    DAC|       DALTONS CACHE|             AK     |
|    PIZ|DEW STATION PT LA...|                  AK|
|    DTH|        DUTCH HARBOR|            AK      |
|    EGL|               EAGLE|     AK             |
|    FRB|           FAIRBANKS|         AK         |
|    HOM|               HOMER| AK              ...|
|    HYD|               HYDER|     AK             |
+-------+--------------------+--------------------+
only showing top 10 rows



In [23]:
port_df = port_df.withColumn("port_id", trim(regexp_replace("port_id","[']",''))) \
                 .withColumn("port_city", trim(regexp_replace("port_city","[']",'')))\
                 .withColumn("port_state", trim(regexp_replace("port_state","[']",'')))
port_df.show(10)

+-------+--------------------+----------+
|port_id|           port_city|port_state|
+-------+--------------------+----------+
|    ALC|               ALCAN|        AK|
|    ANC|           ANCHORAGE|        AK|
|    BAR|BAKER AAF - BAKER...|        AK|
|    DAC|       DALTONS CACHE|        AK|
|    PIZ|DEW STATION PT LA...|        AK|
|    DTH|        DUTCH HARBOR|        AK|
|    EGL|               EAGLE|        AK|
|    FRB|           FAIRBANKS|        AK|
|    HOM|               HOMER|        AK|
|    HYD|               HYDER|        AK|
+-------+--------------------+----------+
only showing top 10 rows



In [24]:
# write to parquet
port_df.write.mode("overwrite").parquet("./data/port.parquet")

In [25]:
# Visa file
# define schema
visa_schema = StructType([
    StructField("visa_id", StringType(), True),
    StructField("visa_type", StringType(), True)])
visa_df=spark.read.csv("./visa.txt", sep='=', header=False, schema=visa_schema)
visa_df.show()

+-------+---------+
|visa_id|visa_type|
+-------+---------+
|     1 | Business|
|     2 | Pleasure|
|     3 |  Student|
+-------+---------+



In [26]:
visa_df = visa_df.withColumn("visa_id", trim(regexp_replace("visa_id","[']",''))) \
                 .withColumn("visa_type", trim(regexp_replace("visa_type","[']",'')))
visa_df.show()

+-------+---------+
|visa_id|visa_type|
+-------+---------+
|      1| Business|
|      2| Pleasure|
|      3|  Student|
+-------+---------+



In [27]:
# write to parquet
visa_df.write.mode("overwrite").parquet("./data/visa.parquet")

In [28]:
# Read in the immigration data here
immigration_data_sample=spark.read.parquet("sas_data")

# Display dataset columns
immigration_data_sample.columns



['cicid',
 'i94yr',
 'i94mon',
 'i94cit',
 'i94res',
 'i94port',
 'arrdate',
 'i94mode',
 'i94addr',
 'depdate',
 'i94bir',
 'i94visa',
 'count',
 'dtadfile',
 'visapost',
 'occup',
 'entdepa',
 'entdepd',
 'entdepu',
 'matflag',
 'biryear',
 'dtaddto',
 'gender',
 'insnum',
 'airline',
 'admnum',
 'fltno',
 'visatype']

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

In [29]:
immigration_data_sample.show(10)

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     NV|20591.0|  32.0|    1.0|  

In [30]:
# Preliminary removal of the empty first field and of fields that CIC does not use
immigration_data_sample.select("cicid","i94yr","i94mon","i94cit","i94res","i94port","arrdate","i94mode","i94addr","depdate" \
                               ,"i94bir","i94visa","count","matflag","biryear","gender","insnum","airline","admnum" \
                               , "fltno", "visatype").show()


+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+-------+-------+------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|matflag|biryear|gender|insnum|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+-------+-------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|      M| 1976.0|     F|  null|     QF|9.495387003E10|00011|      B1|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     NV|20591.0|  32.0|    1.0|  1.0|      M| 1984.0|     F|  null|     VA|9.495562283E10|00007|      B1|
|5748519.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     WA|20582.0|  29.0|    1.0|  1.0|      M| 1987.0|     M|  null|     DL|9.495640653E10|00

In [31]:
immigration_data_sample.select("i94mon").distinct().show()
immigration_data_sample.select("i94yr").distinct().show()

+------+
|i94mon|
+------+
|   4.0|
+------+

+------+
| i94yr|
+------+
|2016.0|
+------+



In [32]:
# Remove year and month as they hold a single value throughout, since this is a snapshot of one month (April 2016)
immigration_data_sample = immigration_data_sample.select(col("cicid").cast(IntegerType()),col("i94cit").cast(IntegerType()),col("i94res").cast(IntegerType()),"i94port"\
                               ,col("arrdate").cast(IntegerType()),col("i94mode").cast(IntegerType()),"i94addr",col("depdate").cast(IntegerType()) \
                               ,"i94bir",col("i94visa").cast(IntegerType()),"count","matflag","biryear","gender","insnum","airline","admnum" \
                               , "fltno", "visatype")

immigration_data_sample.show()

+-------+------+------+-------+-------+-------+-------+-------+------+-------+-----+-------+-------+------+------+-------+--------------+-----+--------+
|  cicid|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|matflag|biryear|gender|insnum|airline|        admnum|fltno|visatype|
+-------+------+------+-------+-------+-------+-------+-------+------+-------+-----+-------+-------+------+------+-------+--------------+-----+--------+
|5748517|   245|   438|    LOS|  20574|      1|     CA|  20582|  40.0|      1|  1.0|      M| 1976.0|     F|  null|     QF|9.495387003E10|00011|      B1|
|5748518|   245|   438|    LOS|  20574|      1|     NV|  20591|  32.0|      1|  1.0|      M| 1984.0|     F|  null|     VA|9.495562283E10|00007|      B1|
|5748519|   245|   438|    LOS|  20574|      1|     WA|  20582|  29.0|      1|  1.0|      M| 1987.0|     M|  null|     DL|9.495640653E10|00040|      B1|
|5748520|   245|   438|    LOS|  20574|      1|     WA|  20588|  29.0|      1|  1.

In [33]:
# Check for duplicates after removing columns
immigration_data_sample.count(), immigration_data_sample.dropDuplicates().count()

(3096313, 3096313)

In [34]:
immigration_data_sample.show()


+-------+------+------+-------+-------+-------+-------+-------+------+-------+-----+-------+-------+------+------+-------+--------------+-----+--------+
|  cicid|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|matflag|biryear|gender|insnum|airline|        admnum|fltno|visatype|
+-------+------+------+-------+-------+-------+-------+-------+------+-------+-----+-------+-------+------+------+-------+--------------+-----+--------+
|5748517|   245|   438|    LOS|  20574|      1|     CA|  20582|  40.0|      1|  1.0|      M| 1976.0|     F|  null|     QF|9.495387003E10|00011|      B1|
|5748518|   245|   438|    LOS|  20574|      1|     NV|  20591|  32.0|      1|  1.0|      M| 1984.0|     F|  null|     VA|9.495562283E10|00007|      B1|
|5748519|   245|   438|    LOS|  20574|      1|     WA|  20582|  29.0|      1|  1.0|      M| 1987.0|     M|  null|     DL|9.495640653E10|00040|      B1|
|5748520|   245|   438|    LOS|  20574|      1|     WA|  20588|  29.0|      1|  1.

In [35]:
# join immigration_data_sample && port_df
i94_imm_df = immigration_data_sample.join(port_df, immigration_data_sample.i94port == port_df.port_id,how='left').drop(col("port_id"))

us_df.show()

## join immigration_data_sample && us_df (set us_df city to upper to match immigration df)
i94_imm_df=i94_imm_df.join(us_df, (i94_imm_df.port_city == upper(us_df.city)) & (i94_imm_df.port_state == us_df.state_code), how='left')
i94_imm_df.where(col("city").isNotNull()).show(10)

+---------------+----------+--------------+----------+---------------+-----------------+----------------+--------+------------+------------------+-------+------+------+--------+-------+
|           city|state_code|         state|median_age|male_population|female_population|total_population|veterans|foreign_born|avg_household_size|natives| asian| black|hispanic|  white|
+---------------+----------+--------------+----------+---------------+-----------------+----------------+--------+------------+------------------+-------+------+------+--------+-------+
|   Johnson City|        TN|     Tennessee|      38.2|          31019|            34350|           65369|    5038|        2878|              2.18|    400|  1877|  6016|    1114|  59147|
|   San Clemente|        CA|    California|      45.2|          34076|            31456|           65532|    3970|        8109|              2.64|   null|  4296|   251|   10421|  58849|
|   Redwood City|        CA|    California|      37.1|          42676|

In [36]:
# Note that from above, some demographic data may be missing. This is due to the values not existing in the I94 Labels Description file
# Drop city and state code for consistency as we are using the values from the dimension table port_df
i94_imm_df=i94_imm_df.drop("city", "state_code")

In [37]:
# convert SAS date numeric field to generate date table
to_date = udf(lambda movement_date: (datetime(1960, 1, 1).date() + timedelta(movement_date)).isoformat() if movement_date else None)
i94_imm_df = i94_imm_df.withColumn("us_arr_date", to_date(i94_imm_df.arrdate)) \
                       .withColumn("us_dep_date", to_date(i94_imm_df.depdate))
i94_imm_df.where(col("state").isNotNull()).show(10)

+-------+------+------+-------+-------+-------+-------+-------+------+-------+-----+-------+-------+------+------+-------+--------------+-----+--------+---------+----------+--------+----------+---------------+-----------------+----------------+--------+------------+------------------+-------+-----+-----+--------+------+-----------+-----------+
|  cicid|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|matflag|biryear|gender|insnum|airline|        admnum|fltno|visatype|port_city|port_state|   state|median_age|male_population|female_population|total_population|veterans|foreign_born|avg_household_size|natives|asian|black|hispanic| white|us_arr_date|us_dep_date|
+-------+------+------+-------+-------+-------+-------+-------+------+-------+-----+-------+-------+------+------+-------+--------------+-----+--------+---------+----------+--------+----------+---------------+-----------------+----------------+--------+------------+------------------+-------+-----+-----+---
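
The SAS date conversion used in the UDF above can be verified in plain Python. SAS stores dates as the number of days since 1960-01-01. The sketch below uses a hypothetical helper `sas_to_iso`; unlike the UDF's `if movement_date else None` test, it checks explicitly for `None`, so a value of 0 (1960-01-01) is still converted.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # day 0 in SAS date values


def sas_to_iso(sas_days):
    """Convert a SAS day count to an ISO 'YYYY-MM-DD' string; None stays None."""
    if sas_days is None:
        return None
    return (SAS_EPOCH + timedelta(days=int(sas_days))).isoformat()


# Values taken from the arrdate column of the immigration sample.
for value in (20545, 20574, None):
    print(value, "->", sas_to_iso(value))
```

The function can be wrapped with `udf(sas_to_iso)` in the same way as the lambda above. It may also be possible to avoid a Python UDF altogether with Spark's built-in `date_add` on a `1960-01-01` literal, although that variant is not tested here.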

In [38]:
# extract columns to create time table
# day (dayofmonth), week (weekofyear), month, year, weekday (dayofweek)
i94_arr_date = i94_imm_df.select(col("arrdate").alias("date_sas"), col("us_arr_date").alias("date_iso")) \
                                    .withColumn('day', dayofmonth('date_iso')) \
                                    .withColumn('week', weekofyear('date_iso')) \
                                    .withColumn('month', month('date_iso')) \
                                    .withColumn('year', year('date_iso')) \
                                    .withColumn('weekday', dayofweek('date_iso')) \
                                    .dropDuplicates()
i94_arr_date.show()                          

+--------+----------+---+----+-----+----+-------+
|date_sas|  date_iso|day|week|month|year|weekday|
+--------+----------+---+----+-----+----+-------+
|   20555|2016-04-11| 11|  15|    4|2016|      2|
|   20545|2016-04-01|  1|  13|    4|2016|      6|
|   20574|2016-04-30| 30|  17|    4|2016|      7|
|   20550|2016-04-06|  6|  14|    4|2016|      4|
|   20570|2016-04-26| 26|  17|    4|2016|      3|
|   20556|2016-04-12| 12|  15|    4|2016|      3|
|   20561|2016-04-17| 17|  15|    4|2016|      1|
|   20549|2016-04-05|  5|  14|    4|2016|      3|
|   20571|2016-04-27| 27|  17|    4|2016|      4|
|   20547|2016-04-03|  3|  13|    4|2016|      1|
|   20548|2016-04-04|  4|  14|    4|2016|      2|
|   20553|2016-04-09|  9|  14|    4|2016|      7|
|   20560|2016-04-16| 16|  15|    4|2016|      7|
|   20569|2016-04-25| 25|  17|    4|2016|      2|
|   20546|2016-04-02|  2|  13|    4|2016|      7|
|   20573|2016-04-29| 29|  17|    4|2016|      6|
|   20567|2016-04-23| 23|  16|    4|2016|      7|
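
The `weekday` and `week` columns follow Spark's conventions: `dayofweek` returns 1 for Sunday through 7 for Saturday, and `weekofyear` returns the ISO 8601 week number. The following sketch reproduces both in plain Python so that values in the table above can be cross-checked; `spark_dayofweek` and `iso_week` are hypothetical helper names.

```python
from datetime import date


def spark_dayofweek(d):
    """Return 1 for Sunday through 7 for Saturday, matching Spark's dayofweek."""
    # isoweekday() gives Monday=1 .. Sunday=7; shift so that Sunday becomes 1.
    return d.isoweekday() % 7 + 1


def iso_week(d):
    """Return the ISO 8601 week number, matching Spark's weekofyear."""
    return d.isocalendar()[1]


# Dates taken from the arrival date table above.
for d in (date(2016, 4, 11), date(2016, 4, 17), date(2016, 4, 30)):
    print(d.isoformat(), spark_dayofweek(d), iso_week(d))
```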


In [39]:
i94_dep_date = i94_imm_df.select(col("depdate").alias("date_sas"), col("us_dep_date").alias("date_iso")) \
                                    .withColumn('day', dayofmonth('date_iso')) \
                                    .withColumn('week', weekofyear('date_iso')) \
                                    .withColumn('month', month('date_iso')) \
                                    .withColumn('year', year('date_iso')) \
                                    .withColumn('weekday', dayofweek('date_iso')) \
                                    .dropDuplicates()
i94_dep_date.show()          

+--------+----------+---+----+-----+----+-------+
|date_sas|  date_iso|day|week|month|year|weekday|
+--------+----------+---+----+-----+----+-------+
|   15176|2001-07-20| 20|  29|    7|2001|      6|
|   19095|2012-04-12| 12|  15|    4|2012|      5|
|   45427|2084-05-16| 16|  20|    5|2084|      3|
|   20701|2016-09-04|  4|  35|    9|2016|      1|
|   20443|2015-12-21| 21|  52|   12|2015|      2|
|   20201|2015-04-23| 23|  17|    4|2015|      5|
|   20673|2016-08-07|  7|  31|    8|2016|      1|
|   20692|2016-08-26| 26|  34|    8|2016|      6|
|   20670|2016-08-04|  4|  31|    8|2016|      5|
|   20524|2016-03-11| 11|  10|    3|2016|      6|
|   20651|2016-07-16| 16|  28|    7|2016|      7|
|   20555|2016-04-11| 11|  15|    4|2016|      2|
|   20588|2016-05-14| 14|  19|    5|2016|      7|
|   20650|2016-07-15| 15|  28|    7|2016|      6|
|   20706|2016-09-09|  9|  36|    9|2016|      6|
|   20517|2016-03-04|  4|   9|    3|2016|      6|
|   20545|2016-04-01|  1|  13|    4|2016|      6|


In [40]:
i94_dates = i94_arr_date.union(i94_dep_date).dropDuplicates()

i94_dates.columns
i94_dates.show(10)

+--------+----------+---+----+-----+----+-------+
|date_sas|  date_iso|day|week|month|year|weekday|
+--------+----------+---+----+-----+----+-------+
|   15176|2001-07-20| 20|  29|    7|2001|      6|
|   19095|2012-04-12| 12|  15|    4|2012|      5|
|   45427|2084-05-16| 16|  20|    5|2084|      3|
|   20701|2016-09-04|  4|  35|    9|2016|      1|
|   20443|2015-12-21| 21|  52|   12|2015|      2|
|   20201|2015-04-23| 23|  17|    4|2015|      5|
|   20673|2016-08-07|  7|  31|    8|2016|      1|
|   20692|2016-08-26| 26|  34|    8|2016|      6|
|   20670|2016-08-04|  4|  31|    8|2016|      5|
|   20524|2016-03-11| 11|  10|    3|2016|      6|
+--------+----------+---+----+-----+----+-------+
only showing top 10 rows



In [41]:
i94_dates.write.mode("overwrite").partitionBy("month", "year").parquet("./data/dates.parquet")

In [42]:
date_columns_to_drop = ['us_arr_date', 'us_dep_date']

i94_imm_df = i94_imm_df.drop(*date_columns_to_drop)

# withColumn("yearTmp", df.year.cast(IntegerType)) recast integer columns
i94_imm_df.columns

['cicid',
 'i94cit',
 'i94res',
 'i94port',
 'arrdate',
 'i94mode',
 'i94addr',
 'depdate',
 'i94bir',
 'i94visa',
 'count',
 'matflag',
 'biryear',
 'gender',
 'insnum',
 'airline',
 'admnum',
 'fltno',
 'visatype',
 'port_city',
 'port_state',
 'state',
 'median_age',
 'male_population',
 'female_population',
 'total_population',
 'veterans',
 'foreign_born',
 'avg_household_size',
 'natives',
 'asian',
 'black',
 'hispanic',
 'white']

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

![Schema](./Capstone_Diagram.png)

A star schema was implemented, as it is the most widely used approach for developing dimensional data marts. The fact table i94_imm is linked to 5 dimension tables: <br>
1) dates <br>
2) visa <br>
3) cit_res <br>
4) port <br>
5) mode <br>


#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model <br>
1) Extract data <br>
2) Clean data (duplicates) <br>
3) Transform columns where applicable <br>
4) Create dimension tables with cleaned transformed data <br>
5) Create fact table <i>i94_imm</i> from cleaned demographic dataframe and cleaned i94 immigration dataframe

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [43]:
# Write the new i94_imm_df fact table to parquet
i94_imm_df.write.mode('overwrite').parquet("./data/i94_imm.parquet")

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [51]:
# Perform quality checks here
# Table not empty check
def table_not_empty(dataframe):
    return dataframe.count() > 0

# Fact table
#print("i94_imm")
table_not_empty(i94_imm_df)
# Dim tables
#print("dates")
table_not_empty(i94_dates)
#print("visa")
table_not_empty(visa_df)
#print("cit_res")
table_not_empty(cit_res_df)
#print("port")
table_not_empty(port_df)
#print("mode")
table_not_empty(mode_df)

True

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.
<br>
<br><b>dates</b>
<li>date_sas : identity key. Arrival or departure date from the immigration file, as a SAS numeric date (days since 1960-01-01)</li>
<li>date_iso : Date in ISO format yyyy-MM-dd</li>
<li>day : day of the month</li>
<li>week : week number of the year</li>
<li>month : month</li>
<li>year : year</li>
<li>weekday : day of the week, 1 = Sunday, 7 = Saturday</li>

<br>
<br><b>visa</b>
<li>visa_id : identity key. Visa number indicating reason of entry</li>
<li>visa_type : Reason for entry to US</li>

<br>
<br><b>cit_res</b>
<li>cit_res_id : identity key. Country code used in I94</li>
<li>country : Country name</li>

<br>
<br><b>port</b>
<li>port_id : identity key. Port code indicating place of disembarkation</li>
<li>port_city : US City</li>
<li>port_state : US State</li>

<br>
<br><b>mode</b>
<li>mode_id : identity key. Type of transportation code</li>
<li>mode_type : Transportation type</li>



### Step 5: Complete Project Write Up
* <b>Clearly state the rationale for the choice of tools and technologies for the project.</b>
<br>Apache Spark was used to read, transform, clean and load the data to parquet because of its capabilities for processing big datasets. The data is stored as parquet because columnar storage is more efficient to store and process, and the files can be saved on AWS S3. As both Spark and the parquet format are supported by AWS, which allows us to shift our solution to the cloud, these were the optimal choice for the project.
* <b>Propose how often the data should be updated and why.</b>
<br>Other than the date dimension table, the code values provided by SAS need not be updated unless new code values or new column definitions are created.
<br>We identified earlier that the i94 immigration files are monthly. As such, the i94 immigration data and the date dimension table would require monthly updates for data accuracy.
* <b>Write a description of how you would approach the problem differently under the following scenarios: </b>
 * <b>The data was increased by 100x. </b>
 <br>AWS allows resources to be scaled easily. When using Spark, we can increase the number of worker nodes to cater to the increase in data.
 * <b>The data populates a dashboard that must be updated on a daily basis by 7am every day. </b>
 <br>Using Apache Airflow, we can schedule DAGs to perform tasks at specific timings, set cut off time and frequencies. After the DAG run is successful, a notification can then be configured to be sent to the dashboard.
 * <b>The database needed to be accessed by 100+ people. </b>
 <br>Utilising AWS RDS, we can scale our database needs accordingly. Similarly, having a read replica with R