# Project Title
### Data Engineering Capstone Project

#### Project Summary

In our research company, Data Scientists are tasked with studying tourism behavior and have asked the Data Engineers to clean and process the data and to develop a data model (star schema). This model is the starting point of a long-term project (with more data collection and experimentation) that will let them form hypotheses about relationships or patterns between the cities that non-immigrants visit and those cities' demographics. 
We will create dimension and fact tables, saved as Parquet files, in a star schema built from the I94 non-immigrant port-of-entry data and the US port city demographics data. The resulting model supports queries on non-immigrants entering the US, so that we can observe relationships between non-immigrant profiles (age, country of origin, season or holiday of the visit, reason for visiting, etc.) and city demographics. 

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import datetime as dt

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, date_format, first, udf, upper
from pyspark.sql.types import IntegerType, LongType, StringType, StructField, StructType

# The hadoop-aws package lets Spark read from and write to S3
spark = (SparkSession.builder
         .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
         .getOrCreate())



### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use? etc.
#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included?

##### The I94 immigration data comes from the US National Tourism and Trade Office. It was provided in the `sas_data` folder as Parquet files. We will use the following fields:

1) i94bir - Age of the non-immigrant in years

2) admnum - Admission number

3) i94res - 3-digit code of the country of residence

4) i94port - 3-character code of the US port of entry (city)

5) arrdate - Arrival date in the USA (SAS date numeric field)

6) i94mode - 1-digit code for the mode of travel (air, sea, land)

7) i94visa - Visa category, i.e. the reason for the visit (business, pleasure, student)

8) gender - Sex of the non-immigrant

9) depdate - Departure date from the USA (SAS date numeric field)

10) count - Used for summary statistics 

##### The U.S. City Demographic Data comes from OpenSoft in CSV format and will be written to Parquet. We will use the following fields:

1) City (USA)

2) Male Population

3) Female Population

4) Median Age (overall median age within the city population)

5) Total Population

6) Foreign-Born (number of foreign-born residents)

7) State Code (US state abbreviation for the city)

8) Race (race category, such as American Indian and Alaska Native, Asian, Black or African-American, Hispanic or Latino, White)

9) Count (number of people in the race category given by the Race column)

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data:

##### US CITIES DEMOGRAPHICS DATA SET (CLEANING AND TRANSFORMATION)
We need to transform the US demographics dataset because it contains duplicate rows: each city appears once per race category, and only the Race and Count columns differ between those rows.
We will pivot the Race column so that each race category becomes its own column.
We first split the dataset into two datasets, `US` and `US RACE COUNT`. `US RACE COUNT` holds the Race and Count columns plus
City and State Code, so that we can use those two columns to join both datasets back together after we
remove duplicate rows from `US` and pivot `US RACE COUNT`. Both datasets should then have
the same number of rows (one per city), so the join restores the original data in its transformed shape.

In [2]:
# Read US Cities Demo dataset file
us_spark=spark.read.csv("./data/us-cities-demographics.csv", sep=';', header=True)

In [3]:
# Check columns of dataset
us_spark.columns

['City',
 'State',
 'Median Age',
 'Male Population',
 'Female Population',
 'Total Population',
 'Number of Veterans',
 'Foreign-born',
 'Average Household Size',
 'State Code',
 'Race',
 'Count']

In [4]:
# Check us_spark dataset for duplicate rows and which columns cause the duplicates
us_spark.select("City","state","Median Age","male population","female population","total population", \
                  "foreign-born","Average Household Size").orderBy("city").show()


+-------+----------+----------+---------------+-----------------+----------------+------------+----------------------+
|   City|     state|Median Age|male population|female population|total population|foreign-born|Average Household Size|
+-------+----------+----------+---------------+-----------------+----------------+------------+----------------------+
|Abilene|     Texas|      31.3|          65212|            60664|          125876|        8129|                  2.64|
|Abilene|     Texas|      31.3|          65212|            60664|          125876|        8129|                  2.64|
|Abilene|     Texas|      31.3|          65212|            60664|          125876|        8129|                  2.64|
|Abilene|     Texas|      31.3|          65212|            60664|          125876|        8129|                  2.64|
|Abilene|     Texas|      31.3|          65212|            60664|          125876|        8129|                  2.64|
|  Akron|      Ohio|      38.1|          96886| 

In [5]:
# Check the subset of `US` columns that may be causing duplicate rows
us_spark.select("city","state code","Race","count").orderBy("city").show()

+-------+----------+--------------------+------+
|   city|state code|                Race| count|
+-------+----------+--------------------+------+
|Abilene|        TX|American Indian a...|  1813|
|Abilene|        TX|  Hispanic or Latino| 33222|
|Abilene|        TX|               White| 95487|
|Abilene|        TX|               Asian|  2929|
|Abilene|        TX|Black or African-...| 14449|
|  Akron|        OH|               White|129192|
|  Akron|        OH|  Hispanic or Latino|  3684|
|  Akron|        OH|Black or African-...| 66551|
|  Akron|        OH|               Asian|  9033|
|  Akron|        OH|American Indian a...|  1845|
|Alafaya|        FL|  Hispanic or Latino| 34897|
|Alafaya|        FL|               Asian| 10336|
|Alafaya|        FL|               White| 63666|
|Alafaya|        FL|Black or African-...|  6577|
|Alameda|        CA|               White| 44232|
|Alameda|        CA|American Indian a...|  1329|
|Alameda|        CA|Black or African-...|  7364|
|Alameda|        CA|

#### Start transformation for US CITIES DEMOGRAPHICS DATA SET:
*CLEANING DATA SET SUMMARY NOTED BY STEPS:*
1. The 'Race' and 'Count' columns are causing the duplicate rows.
    We will separate them from the US demographics dataset, together with 'City' and 'State Code'.
2. 'Race' will be pivoted into column headers and saved to the `us_race_cnt` dataset.
3. The US dataset will be cleaned of duplicate rows (shown above).
4. The cleaned US dataset will be joined back with the `us_race_cnt` dataset
    so that each city ends up with a single, unique row.
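The four steps above can be sketched in plain Python on a few long-format rows, to show why splitting, deduplicating, pivoting and re-joining collapses one row per (city, race) into one row per city. This is a minimal, Spark-free illustration, not the notebook's PySpark code; the rows reuse values from the outputs in this notebook, and `reshape` is a hypothetical helper name.

```python
from collections import defaultdict

# Long-format rows: (City, State Code, Median Age, Race, Count), one row per race
rows = [
    ("Abilene", "TX", "31.3", "Asian", "2929"),
    ("Abilene", "TX", "31.3", "White", "95487"),
    ("Akron", "OH", "38.1", "White", "129192"),
    ("Akron", "OH", "38.1", "Asian", "9033"),
]

def reshape(rows):
    """Split, deduplicate, pivot and re-join long-format demographic rows (hypothetical helper)."""
    city_attrs = {}                   # steps 1 and 3: city-level columns, one entry per city
    race_counts = defaultdict(dict)   # step 2: race counts pivoted per city
    for city, state, median_age, race, count in rows:
        key = (city, state)
        # setdefault keeps only the first copy of the repeated city attributes
        city_attrs.setdefault(key, {"Median Age": median_age})
        # keep the first count per race, like agg(first("Count")) after pivot("Race")
        race_counts[key].setdefault(race, int(count))
    # step 4: join both halves back on (City, State Code)
    return {key: {**attrs, **race_counts[key]} for key, attrs in city_attrs.items()}

wide = reshape(rows)
print(len(rows), len(wide))  # 4 2
```

Unlike Spark's pivot, which fills a missing race category with null (as for Alafaya above), this sketch simply omits the key.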

In [6]:
# Creating 'us_race_cnt' dataset
us_race_cnt=(us_spark.select("city","state code","Race","count")
    .groupby(us_spark.City, "state code")
    .pivot("Race")
    .agg(first("Count")))

In [7]:
# Checking dataset
us_race_cnt.orderBy("city").show()

+------------+----------+---------------------------------+-----+-------------------------+------------------+------+
|        City|state code|American Indian and Alaska Native|Asian|Black or African-American|Hispanic or Latino| White|
+------------+----------+---------------------------------+-----+-------------------------+------------------+------+
|     Abilene|        TX|                             1813| 2929|                    14449|             33222| 95487|
|       Akron|        OH|                             1845| 9033|                    66551|              3684|129192|
|     Alafaya|        FL|                             null|10336|                     6577|             34897| 63666|
|     Alameda|        CA|                             1329|27984|                     7364|              8265| 44232|
|      Albany|        NY|                             1611| 8090|                    31303|              9368| 58368|
|      Albany|        GA|                              4

In [8]:
# Compare the row count before and after dropping duplicate rows
(us_race_cnt.count(), us_race_cnt.dropDuplicates().count())

(596, 596)

In [9]:
uscols=["Number of Veterans","Race","Count"]

In [10]:
# Drop columns we don't need and drop duplicate rows
us=us_spark.drop(*uscols).dropDuplicates()

In [11]:
# Comparing row count between original and new dataset with dropped duplicate rows
(us_spark.count(), us.count())
# The cleaned `us` dataset now has the same number of rows as `us_race_cnt`,
# so the two can be joined into one `us` dataset

(2891, 596)

In [12]:
# Checking number of records after joining both data sets
us.join(us_race_cnt, ["city","state code"]).count()
# The joined row count matches both datasets

596

In [13]:
# Checking data sample
us.join(us_race_cnt, ["city","state code"]).orderBy("city").show()

+------------+----------+------------+----------+---------------+-----------------+----------------+------------+----------------------+---------------------------------+-----+-------------------------+------------------+------+
|        City|State Code|       State|Median Age|Male Population|Female Population|Total Population|Foreign-born|Average Household Size|American Indian and Alaska Native|Asian|Black or African-American|Hispanic or Latino| White|
+------------+----------+------------+----------+---------------+-----------------+----------------+------------+----------------------+---------------------------------+-----+-------------------------+------------------+------+
|     Abilene|        TX|       Texas|      31.3|          65212|            60664|          125876|        8129|                  2.64|                             1813| 2929|                    14449|             33222| 95487|
|       Akron|        OH|        Ohio|      38.1|          96886|           100667| 

In [14]:
# Finally saving (committing) joined US dataset
us=us.join(us_race_cnt, ["city","state code"])

In [15]:
# Another check
(us.count(), us.show(5))

+---------------+----------+-----------+----------+---------------+-----------------+----------------+------------+----------------------+---------------------------------+-----+-------------------------+------------------+------+
|           City|State Code|      State|Median Age|Male Population|Female Population|Total Population|Foreign-born|Average Household Size|American Indian and Alaska Native|Asian|Black or African-American|Hispanic or Latino| White|
+---------------+----------+-----------+----------+---------------+-----------------+----------------+------------+----------------------+---------------------------------+-----+-------------------------+------------------+------+
|Highlands Ranch|        CO|   Colorado|      39.6|          49186|            53281|          102467|        8827|                  2.72|                             1480| 5650|                     1779|              8393| 94499|
|           Kent|        WA| Washington|      33.4|          61825|         

(596, None)

In [16]:
# Rename `State Code` to `State_Code` and similar columns, because Spark's Parquet writer
# rejects column names that contain spaces
us = us.select('City',
               col('State Code').alias('State_Code'),
               'State',
               col('Median Age').alias('Median_age'),
               col('Male Population').alias('Male_Pop'),
               col('Female Population').alias('Fem_Pop'),
               col('Total Population').alias('Ttl_Pop'),
               'Foreign-born',
               col('Average Household Size').alias('Avg_Household_Size'),
               col('American Indian and Alaska Native').alias('Native_Pop'),
               col('Asian').alias('Asian_Pop'),
               col('Black or African-American').alias('Black_Pop'),
               col('Hispanic or Latino').alias('Latino_Pop'),
               col('White').alias('White_Pop'))
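The renaming above is done by hand with short aliases. A generic alternative is to sanitize every column name automatically. The sketch below assumes that the characters Spark's Parquet writer rejects are exactly ` ,;{}()\n\t=` (the set listed in its "invalid character(s)" error message, as far as we know); `parquet_safe` is a hypothetical helper, not part of PySpark.

```python
import re

# Assumed set of characters that Spark's Parquet writer rejects in column names
_INVALID = re.compile(r"[ ,;{}()\n\t=]+")

def parquet_safe(name):
    """Replace each run of Parquet-invalid characters in a column name with '_'."""
    return _INVALID.sub("_", name.strip())

print([parquet_safe(c) for c in ["State Code", "Median Age", "Foreign-born"]])
# ['State_Code', 'Median_Age', 'Foreign-born']
```

On a Spark DataFrame it could be applied with `df.toDF(*[parquet_safe(c) for c in df.columns])`. The hand-written aliases in the notebook have the advantage of shorter, more readable names.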

In [17]:
us.show(2)

+---------------+----------+----------+----------+--------+-------+-------+------------+------------------+----------+---------+---------+----------+---------+
|           City|State_Code|     State|Median_age|Male_Pop|Fem_Pop|Ttl_Pop|Foreign-born|Avg_Household_Size|Native_Pop|Asian_Pop|Black_Pop|Latino_Pop|White_Pop|
+---------------+----------+----------+----------+--------+-------+-------+------------+------------------+----------+---------+---------+----------+---------+
|Highlands Ranch|        CO|  Colorado|      39.6|   49186|  53281| 102467|        8827|              2.72|      1480|     5650|     1779|      8393|    94499|
|           Kent|        WA|Washington|      33.4|   61825|  65137| 126962|       38175|              3.06|      3651|    26168|    20450|     21928|    67918|
+---------------+----------+----------+----------+--------+-------+-------+------------+------------------+----------+---------+---------+----------+---------+
only showing top 2 rows



In [18]:
# Drop the `state` column
us=us.drop("state")

In [19]:
us.columns

['City',
 'State_Code',
 'Median_age',
 'Male_Pop',
 'Fem_Pop',
 'Ttl_Pop',
 'Foreign-born',
 'Avg_Household_Size',
 'Native_Pop',
 'Asian_Pop',
 'Black_Pop',
 'Latino_Pop',
 'White_Pop']

In [20]:
# Now write (and overwrite) transformed `US` dataset onto parquet file
us.write.mode('overwrite').parquet("./data/us_cities_demographics.parquet")

#### I94 NON-IMMIGRATION DATA SET (CLEANING AND TRANSFORMATION)

In [21]:
# Read i94 non-immigration dataset
i94_spark=spark.read.parquet("sas_data")

In [22]:
i94_spark.columns

['cicid',
 'i94yr',
 'i94mon',
 'i94cit',
 'i94res',
 'i94port',
 'arrdate',
 'i94mode',
 'i94addr',
 'depdate',
 'i94bir',
 'i94visa',
 'count',
 'dtadfile',
 'visapost',
 'occup',
 'entdepa',
 'entdepd',
 'entdepu',
 'matflag',
 'biryear',
 'dtaddto',
 'gender',
 'insnum',
 'airline',
 'admnum',
 'fltno',
 'visatype']

In [23]:
i94_spark.select("i94res","i94port","arrdate","i94mode","depdate","i94bir","i94visa","count" \
                  ,"gender",col("admnum").cast(LongType())).show(3)

+------+-------+-------+-------+-------+------+-------+-----+------+-----------+
|i94res|i94port|arrdate|i94mode|depdate|i94bir|i94visa|count|gender|     admnum|
+------+-------+-------+-------+-------+------+-------+-----+------+-----------+
| 438.0|    LOS|20574.0|    1.0|20582.0|  40.0|    1.0|  1.0|     F|94953870030|
| 438.0|    LOS|20574.0|    1.0|20591.0|  32.0|    1.0|  1.0|     F|94955622830|
| 438.0|    LOS|20574.0|    1.0|20582.0|  29.0|    1.0|  1.0|     M|94956406530|
+------+-------+-------+-------+-------+------+-------+-----+------+-----------+
only showing top 3 rows



Cast the numeric columns to LongType and IntegerType:

In [24]:
# Cast the float-typed codes and SAS dates to integers; admnum needs LongType
# because its values (e.g. 94953870030) exceed the 32-bit integer range
i94_spark = i94_spark.select(col("i94res").cast(IntegerType()),
                             col("i94port"),
                             col("arrdate").cast(IntegerType()),
                             col("i94mode").cast(IntegerType()),
                             col("depdate").cast(IntegerType()),
                             col("i94bir").cast(IntegerType()),
                             col("i94visa").cast(IntegerType()),
                             col("count").cast(IntegerType()),
                             "gender",
                             col("admnum").cast(LongType()))

In [25]:
i94_spark.show(3)

+------+-------+-------+-------+-------+------+-------+-----+------+-----------+
|i94res|i94port|arrdate|i94mode|depdate|i94bir|i94visa|count|gender|     admnum|
+------+-------+-------+-------+-------+------+-------+-----+------+-----------+
|   438|    LOS|  20574|      1|  20582|    40|      1|    1|     F|94953870030|
|   438|    LOS|  20574|      1|  20591|    32|      1|    1|     F|94955622830|
|   438|    LOS|  20574|      1|  20582|    29|      1|    1|     M|94956406530|
+------+-------+-------+-------+-------+------+-------+-----+------+-----------+
only showing top 3 rows



Check for duplicate rows by comparing the total row count 
with the row count after `.dropDuplicates()`:

In [26]:
i94_spark.count(), i94_spark.dropDuplicates().count()

(3096313, 3096302)

In [27]:
i94_spark.dropDuplicates(['admnum']).count()

3075579

The counts above show that `admnum` is not a unique key: there are 3,075,579 distinct `admnum` values but 3,096,302 distinct rows.
We will therefore drop only exact duplicate rows, rather than dropping duplicates by `admnum` alone, so that every remaining row is unique.
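The difference between the two deduplication strategies can be shown on a handful of hypothetical records (the admission numbers and dates below are made up). Dropping whole-row duplicates removes only exact copies, while dropping by key also discards distinct rows that happen to share an `admnum`. Both helpers are illustrative sketches; note that Spark's `dropDuplicates(['admnum'])` keeps an arbitrary row per key, not necessarily the first one as here.

```python
# Hypothetical records: (admnum, i94port, arrdate). Rows 1 and 2 are exact copies;
# row 3 shares admnum 101 but has a different arrival date.
records = [
    (101, "LOS", 20574),
    (101, "LOS", 20574),
    (101, "LOS", 20580),
    (102, "LOS", 20574),
]

def drop_duplicates(rows):
    """Keep the first occurrence of each whole row (like DataFrame.dropDuplicates())."""
    seen, out = set(), []
    for row in rows:
        if row not in seen:
            seen.add(row)
            out.append(row)
    return out

def drop_duplicates_by_key(rows, key_index):
    """Keep the first row for each key value (like dropDuplicates(['admnum']))."""
    seen, out = set(), []
    for row in rows:
        if row[key_index] not in seen:
            seen.add(row[key_index])
            out.append(row)
    return out

print(len(records), len(drop_duplicates(records)), len(drop_duplicates_by_key(records, 0)))
# 4 3 2
```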

In [28]:
# We will drop duplicate rows and save it as final dataset for i94
i94_spark=i94_spark.dropDuplicates()

In [29]:
i94_spark.show(3)

+------+-------+-------+-------+-------+------+-------+-----+------+-----------+
|i94res|i94port|arrdate|i94mode|depdate|i94bir|i94visa|count|gender|     admnum|
+------+-------+-------+-------+-------+------+-------+-----+------+-----------+
|   582|    XXX|  20557|   null|  20558|    34|      2|    1|  null|91904214530|
|   209|    AGA|  20552|      1|   null|  null|      2|    1|     M|47842155333|
|   209|    ATL|  20571|      1|   null|  null|      2|    1|     M|44537883633|
+------+-------+-------+-------+-------+------+-------+-----+------+-----------+
only showing top 3 rows



We will save the `i94_spark` DataFrame to a Parquet file after joining it with the `i94port` and `us` DataFrames, as shown below.

#### Performing cleaning tasks for i94 code lists and creating dimension tables

The I94_SAS_Labels_Descriptions.SAS file was used to create the master i94 code lists: the code blocks were copied into text files, whitespace and quotation marks were removed, and the results were converted to Python lists and finally saved as Parquet files. These files serve as master record attribute files because they rarely change.
>Note: Some data cleaning was done by hand inside the text files before processing them. 
Any code whose description contained the words No/Collapsed/INVALID/ALL was manually removed from the text files. 
Smaller code lists, such as i94mode and i94visa, were also created by hand.
Finally, all master i94 code lists were written to Parquet files. 
These Parquet files will be used to join with `i94_spark`.
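The manual steps described above could also be scripted. The sketch below parses label lines of the form `'CODE' = 'CITY, ST'` and skips descriptions containing the words No/Collapsed/INVALID/ALL, which is our assumption of how the manual filter could be automated. The sample lines are illustrative, not copied from the labels file, and `parse_port_line`/`clean_port_lines` are hypothetical helpers.

```python
import re

# Whole-word, case-sensitive match on the words the manual cleaning removed
EXCLUDE = re.compile(r"\b(No|Collapsed|INVALID|ALL)\b")

def parse_port_line(line):
    """Split one "'CODE' = 'CITY, ST'" label line into (code, city, state)."""
    code, _, desc = line.partition("=")
    code = code.strip().strip("'")
    desc = desc.strip().strip("'").strip()
    # Split on the first comma only, like str.split(',', n=1)
    city, _, state = desc.partition(",")
    return code, city.strip(), state.strip() or None

def clean_port_lines(lines):
    """Parse label lines, skipping lines without '=' and excluded descriptions."""
    parsed = []
    for line in lines:
        if "=" not in line:
            continue
        code, city, state = parse_port_line(line)
        if EXCLUDE.search(city) or (state and EXCLUDE.search(state)):
            continue
        parsed.append((code, city, state))
    return parsed

# Illustrative lines in the tab-separated style of the labels file
sample = [
    "\t'ALC'\t=\t'ALCAN, AK'",
    "\t'LOS'\t=\t'LOS ANGELES, CA'",
    "\t'888'\t=\t'No PORT Code (888)'",
]
print(clean_port_lines(sample))
```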

In [30]:
# Start processing I94_SAS_Labels_Descriptions.SAS to create the master i94 code dimensions:

'''
/* I94MODE - There are missing values as well as not reported (9) */
	1 = 'Air'
	2 = 'Sea'
	3 = 'Land'
	9 = 'Not reported' ;
'''
# Create i94mode list
i94mode_data =[[1,'Air'],[2,'Sea'],[3,'Land'],[9,'Not reported']]

# Convert to a Spark DataFrame with named columns (without names, Spark calls them _1 and _2)
i94mode=spark.createDataFrame(i94mode_data, ['id', 'mode'])

# Create i94mode parquet file
i94mode.write.mode("overwrite").parquet('./data/i94mode.parquet')

In [31]:
# Read i94port text file
i94port_df = pd.read_csv('./data/i94port.txt',sep='=',names=['id','port'])

# Remove whitespace and single quotes from the port code
i94port_df['id']=i94port_df['id'].str.strip().str.replace("'",'')

# Split the port description on the first comma into two columns, port_city and port_state,
# after removing whitespace and single quotes (expand=True returns one column per part)
i94port_df[['port_city', 'port_state']] = (i94port_df['port'].str.strip().str.replace("'", '')
                                           .str.strip().str.split(',', n=1, expand=True))

# Remove remaining whitespace from port_state
i94port_df['port_state']=i94port_df['port_state'].str.strip()

# Drop the port column and keep the two new columns: port_city and port_state
i94port_df.drop(columns =['port'], inplace = True)

# Convert the pandas DataFrame to a list of rows for spark.createDataFrame
i94port_data=i94port_df.values.tolist()

In [32]:
# Now convert list to spark dataframe
# Create a schema for the dataframe
i94port_schema = StructType([
    StructField('id', StringType(), True),
    StructField('port_city', StringType(), True),
    StructField('port_state', StringType(), True)
])
i94port=spark.createDataFrame(i94port_data, i94port_schema)

In [33]:
# Create parquet file
i94port.write.mode('overwrite').parquet('./data/i94port.parquet')

In [34]:
# Read i94res text file
i94res_df = pd.read_csv('./data/i94res_cit.txt',sep='=',names=['id','country'])
# Remove whitespaces and single quotes
i94res_df['country']=i94res_df['country'].str.replace("'",'').str.strip()
# Convert the pandas DataFrame to a list of rows for spark.createDataFrame
i94res_data=i94res_df.values.tolist()

In [35]:
# Now convert list to spark dataframe
# Create a schema for the dataframe
i94res_schema = StructType([
    StructField('id', StringType(), True),
    StructField('country', StringType(), True)
])
i94res=spark.createDataFrame(i94res_data, i94res_schema)

In [36]:
# Create parquet file
i94res.write.mode('overwrite').parquet('./data/i94res.parquet')

In [37]:
'''/* I94VISA - Visa codes collapsed into three categories:
   1 = Business
   2 = Pleasure
   3 = Student
*/'''
i94visa_data = [[1, 'Business'], [2, 'Pleasure'], [3, 'Student']]

In [38]:
# Convert to a Spark DataFrame with named columns (without names, Spark calls them _1 and _2)
i94visa=spark.createDataFrame(i94visa_data, ['id', 'visa'])

In [39]:
# Create parquet file
i94visa.write.mode('overwrite').parquet('./data/i94visa.parquet')

In [40]:
us.show(3)

+---------------+----------+----------+--------+-------+-------+------------+------------------+----------+---------+---------+----------+---------+
|           City|State_Code|Median_age|Male_Pop|Fem_Pop|Ttl_Pop|Foreign-born|Avg_Household_Size|Native_Pop|Asian_Pop|Black_Pop|Latino_Pop|White_Pop|
+---------------+----------+----------+--------+-------+-------+------------+------------------+----------+---------+---------+----------+---------+
|Highlands Ranch|        CO|      39.6|   49186|  53281| 102467|        8827|              2.72|      1480|     5650|     1779|      8393|    94499|
|           Kent|        WA|      33.4|   61825|  65137| 126962|       38175|              3.06|      3651|    26168|    20450|     21928|    67918|
|        Madison|        WI|      30.7|  122596| 126360| 248956|       30090|              2.23|      2296|    23937|    20424|     19697|   204302|
+---------------+----------+----------+--------+-------+-------+------------+------------------+----------

In [41]:
i94_spark.columns

['i94res',
 'i94port',
 'arrdate',
 'i94mode',
 'depdate',
 'i94bir',
 'i94visa',
 'count',
 'gender',
 'admnum']

In [42]:
# Add i94port city and state columns to i94 dataframe
i94_spark.join(i94port, i94_spark.i94port==i94port.id, how='left').count()

3096302

In [43]:
# The count above matches the cleaned `i94_spark` dataset (3,096,302 rows), so the left join did not duplicate any rows

In [44]:
# Commit i94_spark
i94_spark=i94_spark.join(i94port, i94_spark.i94port==i94port.id, how='left')

In [45]:
i94_spark.show()

+------+-------+-------+-------+-------+------+-------+-----+------+-----------+---+---------+----------+
|i94res|i94port|arrdate|i94mode|depdate|i94bir|i94visa|count|gender|     admnum| id|port_city|port_state|
+------+-------+-------+-------+-------+------+-------+-----+------+-----------+---+---------+----------+
|   110|    BGM|  20550|      1|  20556|    36|      1|    1|     F|92847893530|BGM|   BANGOR|        ME|
|   108|    BGM|  20569|      1|  20571|    43|      1|    1|     M|59234302533|BGM|   BANGOR|        ME|
|   129|    BGM|  20559|      1|  20584|    67|      1|    1|     M|93553405030|BGM|   BANGOR|        ME|
|   261|    BGM|  20568|      1|   null|     9|      1|    1|     M|94455571030|BGM|   BANGOR|        ME|
|   135|    BGM|  20564|      1|  20569|    30|      2|    1|     M|94033156330|BGM|   BANGOR|        ME|
|   111|    BGM|  20563|      1|  20567|    56|      1|    1|     M|93951299930|BGM|   BANGOR|        ME|
|   582|    BGM|  20563|      1|  20575|    57

In [46]:
# Drop `id` column
i94_spark=i94_spark.drop("id")

In [47]:
i94_spark.columns

['i94res',
 'i94port',
 'arrdate',
 'i94mode',
 'depdate',
 'i94bir',
 'i94visa',
 'count',
 'gender',
 'admnum',
 'port_city',
 'port_state']

In [48]:
us.columns

['City',
 'State_Code',
 'Median_age',
 'Male_Pop',
 'Fem_Pop',
 'Ttl_Pop',
 'Foreign-born',
 'Avg_Household_Size',
 'Native_Pop',
 'Asian_Pop',
 'Black_Pop',
 'Latino_Pop',
 'White_Pop']

In [49]:
# Join `us` with i94_spark to get the fact table `i94non_immigrant_port_entry`
# NOTE: We use a left join on city and state, which may produce null values because
# we may not have demographic statistics for every U.S. port of entry
i94non_immigrant_port_entry = i94_spark.join(us, (upper(i94_spark.port_city) == upper(us.City)) &
                                             (upper(i94_spark.port_state) == upper(us.State_Code)), how='left')
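The join above matches city and state case-insensitively and keeps unmatched fact rows with null demographics. The plain-Python sketch below mirrors that logic on two hypothetical fact rows, and also checks that the demographics key is unique after upper-casing, since a duplicated key would multiply fact rows in a left join. The Madison and NECHE values come from outputs in this notebook; `left_join` is a hypothetical helper.

```python
# Hypothetical fact rows: (admnum, port_city, port_state)
fact_rows = [
    (1, "MADISON", "WI"),
    (2, "NECHE", "ND"),
]
# Demographics keyed by (City, State_Code), as in the `us` table
demographics = {
    ("Madison", "WI"): {"Median_age": 30.7},
}

def left_join(fact_rows, demographics):
    """Attach demographics to each fact row, matching city/state case-insensitively."""
    index = {}
    for (city, state), attrs in demographics.items():
        key = (city.upper(), state.upper())
        # A duplicate key would silently duplicate fact rows, so fail loudly instead
        if key in index:
            raise ValueError(f"duplicate demographics key: {key}")
        index[key] = attrs
    # Unmatched ports keep their fact row and get None, like Spark's null-filled left join
    return [(admnum, city, state, index.get((city.upper(), state.upper())))
            for admnum, city, state in fact_rows]

for row in left_join(fact_rows, demographics):
    print(row)
```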

In [50]:
i94non_immigrant_port_entry.count()

3096302

In [51]:
i94non_immigrant_port_entry.columns

['i94res',
 'i94port',
 'arrdate',
 'i94mode',
 'depdate',
 'i94bir',
 'i94visa',
 'count',
 'gender',
 'admnum',
 'port_city',
 'port_state',
 'City',
 'State_Code',
 'Median_age',
 'Male_Pop',
 'Fem_Pop',
 'Ttl_Pop',
 'Foreign-born',
 'Avg_Household_Size',
 'Native_Pop',
 'Asian_Pop',
 'Black_Pop',
 'Latino_Pop',
 'White_Pop']

In [52]:
# Drop City and State_Code
i94non_immigrant_port_entry=i94non_immigrant_port_entry.drop("City","State_Code")

In [53]:
i94non_immigrant_port_entry.show()

+------+-------+-------+-------+-------+------+-------+-----+------+-----------+---------+----------+----------+--------+-------+-------+------------+------------------+----------+---------+---------+----------+---------+
|i94res|i94port|arrdate|i94mode|depdate|i94bir|i94visa|count|gender|     admnum|port_city|port_state|Median_age|Male_Pop|Fem_Pop|Ttl_Pop|Foreign-born|Avg_Household_Size|Native_Pop|Asian_Pop|Black_Pop|Latino_Pop|White_Pop|
+------+-------+-------+-------+-------+------+-------+-----+------+-----------+---------+----------+----------+--------+-------+-------+------------+------------------+----------+---------+---------+----------+---------+
|   103|    NEC|  20556|      3|  20557|    51|      2|    1|     F|  788711085|    NECHE|        ND|      null|    null|   null|   null|        null|              null|      null|     null|     null|      null|     null|
|   112|    NEC|  20573|      3|  20575|    32|      2|    1|     F|59477349333|    NECHE|        ND|      null|

Create the i94date dimension from the i94non_immigrant_port_entry dataframe:

In [55]:
# Convert the SAS arrival date (days since 1960-01-01) to an ISO date string
get_date = udf(lambda x: (dt.datetime(1960, 1, 1).date() + dt.timedelta(x)).isoformat() if x else None)
i94non_immigrant_port_entry = i94non_immigrant_port_entry.withColumn("arrival_date", get_date(i94non_immigrant_port_entry.arrdate))
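The UDF above relies on SAS dates counting days from 1 January 1960. The plain-Python function below isolates that conversion so it can be checked against the output shown next (20556 becomes 2016-04-12). One small difference: the UDF's `if x else None` also maps 0 to None, whereas this sketch treats 0 as 1960-01-01. Spark SQL's built-in `date_add` function might be able to replace the Python UDF and avoid its serialization overhead, but we have not tested that here.

```python
import datetime as dt

SAS_EPOCH = dt.date(1960, 1, 1)  # SAS dates count days from 1 January 1960

def sas_to_iso(sas_days):
    """Convert a SAS date (days since 1960-01-01) to an ISO 'YYYY-MM-DD' string."""
    if sas_days is None:
        return None
    # int() also accepts the float values (e.g. 20556.0) found in the raw data
    return (SAS_EPOCH + dt.timedelta(days=int(sas_days))).isoformat()

print(sas_to_iso(20556))  # 2016-04-12
```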

In [56]:
i94non_immigrant_port_entry.show(3)

+------+-------+-------+-------+-------+------+-------+-----+------+-----------+---------+----------+----------+--------+-------+-------+------------+------------------+----------+---------+---------+----------+---------+------------+
|i94res|i94port|arrdate|i94mode|depdate|i94bir|i94visa|count|gender|     admnum|port_city|port_state|Median_age|Male_Pop|Fem_Pop|Ttl_Pop|Foreign-born|Avg_Household_Size|Native_Pop|Asian_Pop|Black_Pop|Latino_Pop|White_Pop|arrival_date|
+------+-------+-------+-------+-------+------+-------+-----+------+-----------+---------+----------+----------+--------+-------+-------+------------+------------------+----------+---------+---------+----------+---------+------------+
|   103|    NEC|  20556|      3|  20557|    51|      2|    1|     F|  788711085|    NECHE|        ND|      null|    null|   null|   null|        null|              null|      null|     null|     null|      null|     null|  2016-04-12|
|   112|    NEC|  20573|      3|  20575|    32|      2|    1

In [57]:
# NOTE: the week-based pattern 'w' works on Spark 2.x; Spark 3.x's default datetime parser
# rejects week-based patterns, and weekofyear() (ISO weeks) would number the weeks differently
i94date=i94non_immigrant_port_entry.select(col('arrdate').alias('arrival_sasdate'),
                                   col('arrival_date').alias('arrival_iso_date'),
                                   date_format('arrival_date','M').alias('arrival_month'),
                                   date_format('arrival_date','E').alias('arrival_dayofweek'),
                                   date_format('arrival_date', 'y').alias('arrival_year'),
                                   date_format('arrival_date', 'd').alias('arrival_day'),
                                   date_format('arrival_date','w').alias('arrival_weekofyear')).dropDuplicates()

In [58]:
# Drop arrival_date column from the i94non_immigrant_port_entry dataframe and finally save it to parquet file 
i94non_immigrant_port_entry.drop('arrival_date').write.mode("overwrite").parquet('./data/i94non_immigrant_port_entry.parquet')

In [59]:
i94date.count()

30

In [60]:
i94date.show(5)

+---------------+----------------+-------------+-----------------+------------+-----------+------------------+
|arrival_sasdate|arrival_iso_date|arrival_month|arrival_dayofweek|arrival_year|arrival_day|arrival_weekofyear|
+---------------+----------------+-------------+-----------------+------------+-----------+------------------+
|          20562|      2016-04-18|            4|              Mon|        2016|         18|                17|
|          20554|      2016-04-10|            4|              Sun|        2016|         10|                16|
|          20556|      2016-04-12|            4|              Tue|        2016|         12|                16|
|          20548|      2016-04-04|            4|              Mon|        2016|          4|                15|
|          20553|      2016-04-09|            4|              Sat|        2016|          9|                15|
+---------------+----------------+-------------+-----------------+------------+-----------+------------------+
only showing top 5 rows

In [61]:
# Create temporary sql table
i94date.createOrReplaceTempView("i94date_table")

In [62]:
# Add seasons to i94 date dimension table
i94date_season=spark.sql('''select arrival_sasdate,
                         arrival_iso_date,
                         arrival_month,
                         arrival_dayofweek,
                         arrival_year,
                         arrival_day,
                         arrival_weekofyear,
                         CASE WHEN arrival_month IN (12, 1, 2) THEN 'winter' 
                                WHEN arrival_month IN (3, 4, 5) THEN 'spring' 
                                WHEN arrival_month IN (6, 7, 8) THEN 'summer' 
                                ELSE 'autumn' 
                         END AS date_season from i94date_table''')
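The CASE expression above uses meteorological Northern Hemisphere seasons. The same mapping in plain Python makes the rule easy to test; `season_for_month` is a hypothetical helper. One behavioral difference is worth noting: in the SQL version a null `arrival_month` falls through to `ELSE 'autumn'`, whereas this sketch raises an error for anything that is not a month.

```python
def season_for_month(month):
    """Map a month number (1-12) to a meteorological Northern Hemisphere season."""
    month = int(month)  # date_format(..., 'M') yields strings such as '4'
    if month in (12, 1, 2):
        return "winter"
    if month in (3, 4, 5):
        return "spring"
    if month in (6, 7, 8):
        return "summer"
    if month in (9, 10, 11):
        return "autumn"
    raise ValueError(f"not a month: {month}")

print(season_for_month("4"))  # spring
```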

In [63]:
i94date_season.show(3)

+---------------+----------------+-------------+-----------------+------------+-----------+------------------+-----------+
|arrival_sasdate|arrival_iso_date|arrival_month|arrival_dayofweek|arrival_year|arrival_day|arrival_weekofyear|date_season|
+---------------+----------------+-------------+-----------------+------------+-----------+------------------+-----------+
|          20562|      2016-04-18|            4|              Mon|        2016|         18|                17|     spring|
|          20554|      2016-04-10|            4|              Sun|        2016|         10|                16|     spring|
|          20556|      2016-04-12|            4|              Tue|        2016|         12|                16|     spring|
+---------------+----------------+-------------+-----------------+------------+-----------+------------------+-----------+
only showing top 3 rows



In [64]:
# Save i94date dimension to parquet file partitioned by year and month:
i94date_season.write.mode("overwrite").partitionBy("arrival_year", "arrival_month").parquet('./data/i94date.parquet')

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

Because the dimension and fact tables are saved as Parquet files, they can be loaded into any columnar database as a Star Schema model.
We chose the Star Schema model because it is easy for Data Analysts and Data Scientists
to understand and query: every dimension is one join away from the fact table, which keeps queries simple, flexible and typically fast.
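To illustrate the kind of query the star schema is meant to support, the sketch below loads trimmed-down versions of the fact table and two dimensions into an in-memory SQLite database and aggregates arrivals by visa category and season. SQLite is only a stand-in for whichever columnar database is eventually used, the `visa` column name is our assumption, and the few rows are illustrative values taken from earlier outputs.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
# Trimmed-down fact table and two dimensions (only the columns this query needs)
cur.executescript("""
CREATE TABLE i94visa (id INTEGER PRIMARY KEY, visa TEXT);
CREATE TABLE i94date (arrival_sasdate INTEGER PRIMARY KEY, date_season TEXT);
CREATE TABLE i94non_immigrant_port_entry (
    admnum INTEGER, i94port TEXT, arrdate INTEGER, i94visa INTEGER, i94bir INTEGER
);
INSERT INTO i94visa VALUES (1, 'Business'), (2, 'Pleasure'), (3, 'Student');
INSERT INTO i94date VALUES (20556, 'spring'), (20574, 'spring');
INSERT INTO i94non_immigrant_port_entry VALUES
    (1, 'LOS', 20574, 1, 40),
    (2, 'LOS', 20574, 2, 32),
    (3, 'NEC', 20556, 2, 51);
""")

# A typical star-schema query: one join per dimension, then aggregate the fact rows
query = """
SELECT v.visa, d.date_season, COUNT(*) AS arrivals, AVG(f.i94bir) AS avg_age
FROM i94non_immigrant_port_entry AS f
JOIN i94visa AS v ON f.i94visa = v.id
JOIN i94date AS d ON f.arrdate = d.arrival_sasdate
GROUP BY v.visa, d.date_season
ORDER BY v.visa
"""
results = cur.execute(query).fetchall()
for row in results:
    print(row)
conn.close()
```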


#### U.S. Immigration and U.S. Port Cities Demographics Data Model
<img src="./i94star_schema2.png">

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

1. Import the packages:
    - pandas
    - datetime
    - pyspark.sql -> SparkSession, configured with the "org.apache.hadoop:hadoop-aws:2.7.0" package
    - pyspark.sql.functions -> first, upper, col, udf, date_format, expr
    - pyspark.sql.types -> StructField, StructType, StringType, LongType, IntegerType


2. Create the dimensions (`i94port`, `i94visa`, `i94res`, `i94mode`) from the `i94_SAS_Labels_Descriptions.SAS` file. *Note:* once created, these dimensions do not need to be part of future scheduled pipeline runs, because they are essentially master records that are rarely added to or changed.

3. Read the US Cities Demographics dataset into the `us_spark` DataFrame.

4. Create the `us_race_cnt` DataFrame from `us_spark`.

5. Drop unneeded columns and duplicate rows from `us_spark`.

6. Join `us_spark` with `us_race_cnt` to form the `US` dataset.

7. Rename the `state code` column to `state_code`, and likewise rename other columns whose names contain spaces or other characters that the parquet writer rejects.

8. Drop the `state` column.

9. Write the transformed `US` dataset to a parquet file, overwriting any previous output.

10. Read the I94 non-immigrant dataset into the `i94_spark` DataFrame.

11. Cast numeric columns to `LongType` and `IntegerType`.

12. Drop duplicate rows.

13. Read the `i94port` dimension parquet file and join it with `i94_spark`. This adds the port city and state columns to the `i94_spark` DataFrame.

14. Drop the `id` column from the `i94_spark` DataFrame.

15. Join `US` with `i94_spark` to get the fact table `i94non_immigrant_port_entry`.

16. Add an ISO-format date column, `arrival_date`, to the `i94non_immigrant_port_entry` DataFrame using a custom function (UDF).

17. Create the time dimension (`i94date`) from `i94non_immigrant_port_entry` and save it to a parquet file.

18. Drop the `arrival_date` column from `i94non_immigrant_port_entry` and save the DataFrame to a parquet file.

19. Add seasons to the `i94date_season` DataFrame.

20. Save `i94date_season` to a parquet file partitioned by year and month.
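Three of the steps above involve logic that is easy to get subtly wrong: parsing the SAS label file (step 2), pivoting race counts (step 4) and making column names parquet-safe (step 7). The plain-Python sketch below illustrates each one under stated assumptions. It is not the project's `ETL.py` code. The label-line shapes, the demographics column names (`City`, `State Code`, `Race`, `Count`) and all helper names are assumptions. In PySpark, step 4 would typically be written as `groupBy(...).pivot("Race").agg(first("Count"))`. That would fit the `first` import at the top of the notebook, but it is an inference, not the confirmed implementation.

```python
import re
from collections import defaultdict

# --- Step 2: parse `code = 'label'` lines from one value block of the SAS labels file.
# Assumed line shapes:  "\t'ALC'\t=\t'ALCAN, AK   '"   or   "   236 =  'AFGHANISTAN'"
LABEL_LINE = re.compile(r"^\s*'?([^'=]+?)'?\s*=\s*'([^']*)'")


def parse_label_lines(lines):
    """Return {code: label} for every line that looks like a code/label pair.
    Pass in the lines of a single value block, because codes repeat across blocks."""
    labels = {}
    for line in lines:
        match = LABEL_LINE.match(line)
        if match:
            labels[match.group(1).strip()] = match.group(2).strip()
    return labels


def split_port(label):
    """Split an i94port label such as 'ALCAN, AK' into (city, state) at the last comma."""
    city, sep, state = label.rpartition(",")
    return (city.strip(), state.strip()) if sep else (label.strip(), None)


# --- Step 4: pivot one-row-per-race demographics into one entry per city and state.
def pivot_race_counts(rows):
    """Return {(city, state_code): {race: count}} from rows keyed by City/State Code/Race/Count."""
    wide = defaultdict(dict)
    for row in rows:
        wide[(row["City"], row["State Code"])][row["Race"]] = row["Count"]
    return dict(wide)


# --- Step 7: make a column name acceptable to Spark's parquet writer, which
# rejects names containing any of: space , ; { } ( ) newline tab =
INVALID_CHARS = re.compile(r"[ ,;{}()\n\t=]+")


def parquet_safe(name):
    """Replace runs of invalid characters with '_' and lower-case the result."""
    return INVALID_CHARS.sub("_", name.strip()).strip("_").lower()


# Usage with made-up inputs in the assumed formats.
print(parse_label_lines(["\t'ALC'\t=\t'ALCAN, AK             '", "   236 =  'AFGHANISTAN'"]))
print(split_port("ALCAN, AK"))
print(pivot_race_counts([
    {"City": "CITY A", "State Code": "ST", "Race": "Asian", "Count": 10},
    {"City": "CITY A", "State Code": "ST", "Race": "White", "Count": 20},
]))
print(parquet_safe("State Code"))
```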

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

##### The data pipeline is built inside the `ETL.py` file included with this Capstone Project.

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [65]:
# Perform quality checks here

if us_spark.count() > 0:
    print('Passed reading data file.')
else:
    print('Seems to be nothing in file!')

if us.count() == us_race_cnt.count():
    print('Transformation went perfect.')
else:
    print('Inconsistent data between the two DataFrames!')

if i94_spark.count() > 0:
    print('Passed reading data file.')
else:
    print('Seems to be nothing in file!')
    
if i94_spark.count() == i94non_immigrant_port_entry.count():
    print('Transformation went perfect.')
else:
    print('Inconsistent data between the two DataFrames!')


Passed reading data file.
Transformation went perfect.
Passed reading data file.
Transformation went perfect.
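The checks above only print a message, so a failing check would not stop a scheduled run. The sketch below shows a stricter alternative. It assumes the counts have already been collected, for example with `DataFrame.count()`. Each check raises on failure. A primary-key uniqueness check covers the integrity-constraint item listed above. The function and exception names are hypothetical. In Spark, a uniqueness check could also be written as `df.select("admnum").distinct().count() == df.count()`.

```python
from collections import Counter


class DataQualityError(Exception):
    """Raised when a data quality check fails."""


def check_not_empty(name, row_count):
    """Fail if a dataset loaded no rows."""
    if row_count <= 0:
        raise DataQualityError(f"{name}: no rows loaded")


def check_same_count(left_name, left_count, right_name, right_count):
    """Fail if a transformation changed the number of rows unexpectedly."""
    if left_count != right_count:
        raise DataQualityError(
            f"{left_name} has {left_count} rows but {right_name} has {right_count}")


def check_unique(name, keys):
    """Fail if any primary-key value occurs more than once (reports up to five)."""
    dupes = [key for key, n in Counter(keys).items() if n > 1]
    if dupes:
        raise DataQualityError(f"{name}: duplicate keys {dupes[:5]}")


# Illustrative numbers; in the notebook these would come from us_spark.count() etc.
check_not_empty("i94_spark", 1000)
check_same_count("i94_spark", 1000, "i94non_immigrant_port_entry", 1000)
check_unique("i94non_immigrant_port_entry.admnum", [101, 102, 103])
try:
    check_unique("i94non_immigrant_port_entry.admnum", [101, 101, 102])
except DataQualityError as err:
    print("check failed:", err)
```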


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

'''
i94date
---------------
arrival_sasdate int PK
    > Non-immigrant arrival date in the USA (SAS date numeric field) from `arrdate` in the i94 non-immigration data
arrival_iso_date date
    > arrival_sasdate converted to ISO date format (YYYY-MM-DD)
arrival_month int
arrival_dayofweek string
    > abbreviated day of week (e.g. Mon)
arrival_year int
arrival_day int
arrival_weekofyear int
    > week of year
date_season string
    > winter, spring, summer or autumn

i94port
--------------
id string PK
    > i94 port 3-character code of destination USA city
city string
state string

i94visa
--------------
id int PK
    > i94 visa code number stating reason for immigration from i94 non-immigration data
reason string

i94res
--------------
id int PK
    > i94 3-digit code of nationality from i94 non-immigration data
country string

i94mode
--------------
id int PK
    > i94 1-digit mode of travel code (plane, boat, etc.) from i94 non-immigration data
transport string

i94non_immigrant_port_entry
-----------------------------------
admnum long PK
    > Admission Number from i94 non-immigration data
arrdate int FK >- i94date.arrival_sasdate
    > arrival date in the USA (SAS date numeric field) from i94 non-immigration data
depdate int
    > Departure date from the USA (SAS date numeric field) from i94 non-immigration data
port_id string FK >- i94port.id
    > 3-character code of destination USA city from i94 non-immigration data
visa_id int FK >- i94visa.id
    > Reason for immigration (visa category code) from i94 non-immigration data
res_id int FK >- i94res.id
    > 3-digit code of nationality from i94 non-immigration data
mode_id int FK >- i94mode.id
    > 1-digit mode of travel code (plane, boat, etc.) from i94 non-immigration data
age int
    > Age of non-immigrant in years from i94 non-immigration data
gender string
    > Non-immigrant sex from i94 non-immigration data
cnt_of_one int
    > count of one per row used for statistical metrics from i94 non-immigration data
median_age float
    > Median age of population in city and state from US cities demographics data
male_pop int
    > Male population of city and state from US cities demographics data
female_pop int
    > Female population of city and state from US cities demographics data
ttl_pop int
    > Total population of city and state from US cities demographics data
foreign_born_pop int
    > Foreign born population of city and state from US cities demographics data
avg_household_size float
    > Average household size of city and state from US cities demographics data
american_indian_alaskan_native_pop int
    > American Indian and Alaska Native population of city and state from US cities demographics data
asian_pop int
    > Asian population of city and state from US cities demographics data
black_african_american_pop int
    > Black population of city and state from US cities demographics data
hispanic_pop int
    > Hispanic population of city and state from US cities demographics data
white_pop int
    > White population of city and state from US cities demographics data
'''
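The `FK >-` entries in the dictionary can also serve as a referential-integrity test: every foreign-key value in the fact table should appear among the referenced dimension's keys. The sketch below runs this check over plain Python collections. `orphan_keys` and the sample rows are hypothetical. In Spark, the same check could be expressed as a `left_anti` join, for example `fact.join(dim, fact.port_id == dim.id, "left_anti")`, which returns the fact rows with no matching dimension key.

```python
# Foreign keys declared in the data dictionary: fact column -> (dimension table, key column).
FOREIGN_KEYS = {
    "arrdate": ("i94date", "arrival_sasdate"),
    "port_id": ("i94port", "id"),
    "visa_id": ("i94visa", "id"),
    "res_id": ("i94res", "id"),
    "mode_id": ("i94mode", "id"),
}


def orphan_keys(fact_rows, dimension_keys):
    """Return {fact_column: sorted values missing from the referenced dimension}.

    `dimension_keys` maps a dimension table name to the set of its key values.
    None (null) is not reported, since a missing value is not a broken reference.
    Only columns with at least one orphan appear in the result.
    """
    orphans = {}
    for fact_col, (dim_table, _key_col) in FOREIGN_KEYS.items():
        known = dimension_keys.get(dim_table, set())
        missing = {row.get(fact_col) for row in fact_rows} - known - {None}
        if missing:
            orphans[fact_col] = sorted(missing)
    return orphans


# Made-up example: the second fact row points at an unknown port and travel mode.
dimension_keys = {"i94date": {20562}, "i94port": {"AAA"}, "i94visa": {1, 2, 3},
                  "i94res": {236}, "i94mode": {1}}
fact_rows = [
    {"arrdate": 20562, "port_id": "AAA", "visa_id": 2, "res_id": 236, "mode_id": 1},
    {"arrdate": 20562, "port_id": "ZZZ", "visa_id": 2, "res_id": 236, "mode_id": 9},
]
print(orphan_keys(fact_rows, dimension_keys))
```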

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
>Because we are dealing with big data and targeting cloud solutions, it made economic sense to use open-source tools (Apache Spark via PySpark, and Python) that can easily be ported to a cloud platform such as AWS.
* Propose how often the data should be updated and why.
> Dimension tables only need to be updated when I94 introduces a new category. The I94 non-immigrant port-of-entry data, along with the time dimension table (i94date), can be updated every month. According to https://www.usa.gov/statistics, the US Cities Demographics data is updated every ten years, so a new version of the data set may become available after 2020. As of 2019, it may therefore need updating within one or two years.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
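 >Deploy this Spark solution on an AWS EMR cluster and use S3 to store the source data and parquet files. An EMR cluster can be scaled out by adding worker nodes, and S3 storage grows with the data, so a 100x increase can be handled with little change to the pipeline code.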
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
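 >Use Apache Airflow to schedule the pipeline and the dashboard queries as a daily DAG. The DAG should start early enough to finish before 7am and use retries and failure alerts, so that problems surface before the deadline.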
 * The database needed to be accessed by 100+ people.
 >The saved parquet files can be bulk-loaded into an AWS Redshift cluster with the `COPY` command, which supports the parquet format. Redshift scales to big data workloads and offers 'massively parallel' processing and 'limitless concurrency' for thousands of concurrent user queries.