# Project Title
### Data Engineering Capstone Project

#### Project Summary
--describe your project at a high level--

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd, re
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use? etc.

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

In [2]:
# Read in the data here
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")


In [3]:
df.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


In [4]:
# Read in the temperature data into Pandas for exploration
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df = pd.read_csv(fname, sep=',')

In [5]:
# Display first few entries of temperature data
df.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [6]:
	
# Create a Spark session that loads the spark-sas7bdat package, then read the SAS file
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
    .enableHiveSupport()
    .getOrCreate()
)
df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [7]:
#write to parquet
df_spark.write.parquet("sas_data")
df_spark=spark.read.parquet("sas_data")


In [8]:
df_spark.show()


+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     NV|20591.0|  32.0|    1.0|  

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.
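
As a minimal sketch of this exploration step, the snippet below counts missing values per column and fully duplicated rows using only the standard library. The inline sample reuses rows from the temperature preview shown earlier, with one row repeated on purpose for illustration; `count_missing` and `count_duplicates` are hypothetical helper names, not part of the project code.

```python
import csv
import io
from collections import Counter

# Rows copied from the temperature preview; the last row is repeated on
# purpose so that the duplicate check has something to find.
SAMPLE = """dt,AverageTemperature,City,Country
1743-11-01,6.068,Århus,Denmark
1743-12-01,,Århus,Denmark
1744-01-01,,Århus,Denmark
1744-01-01,,Århus,Denmark
"""

def count_missing(rows):
    """Return {column: number of empty values} for a list of dict rows."""
    missing = Counter()
    for row in rows:
        for column, value in row.items():
            if value is None or value.strip() == "":
                missing[column] += 1
    return dict(missing)

def count_duplicates(rows):
    """Return how many rows are exact repeats of an earlier row."""
    seen = Counter(tuple(row.items()) for row in rows)
    return sum(n - 1 for n in seen.values())

rows = list(csv.DictReader(io.StringIO(SAMPLE)))
print(count_missing(rows))     # {'AverageTemperature': 3}
print(count_duplicates(rows))  # 1
```

On the full dataset the same questions would be asked of the Spark dataframes; `dropDuplicates()`, which the notebook already uses in Step 4, removes such repeats.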

#### Cleaning Steps
Document steps necessary to clean the data
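
One cleaning step that the port lookup suggests is trimming stray whitespace: when Spark prints `valid_i94_port.csv`, the `state` values appear with inconsistent padding around `AK`. The sketch below assumes the lookup has the columns `i94port`, `valid_city` and `state` as displayed by Spark; the sample lines and the helper name `load_port_lookup` are illustrative only.

```python
import csv

# Illustrative lines shaped like the lookup file; the padding around the
# state code mimics what the Spark output shows.
SAMPLE_LINES = [
    "i94port,valid_city,state",
    "ALC,ALCAN,AK ",
    "ANC,ANCHORAGE, AK",
    "DAC,DALTONS CACHE, AK  ",
]

def load_port_lookup(lines):
    """Map each port code to a (city, state) pair with whitespace removed."""
    lookup = {}
    for row in csv.DictReader(lines):
        code = row["i94port"].strip()
        lookup[code] = (row["valid_city"].strip(), row["state"].strip())
    return lookup

ports = load_port_lookup(SAMPLE_LINES)
print(ports["DAC"])  # ('DALTONS CACHE', 'AK')
```

Because `csv.DictReader` consumes the first line as the header, the header row does not end up as an entry in the dictionary.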

In [9]:
# Clean I94 immigration data
import csv
# Create dictionary of valid i94port codes
i94port_valid_dict = {}

with open('valid_i94_port.csv', mode='r') as infile:
    reader = csv.reader(infile)
    i94port_valid_dict = {rows[0]:rows[1] for rows in reader}

    
i94city_valid_dict = {}

with open('i94_vaid_cities.csv', mode='r') as infile:
    reader = csv.reader(infile)
    i94city_valid_dict = {rows[0]:rows[1] for rows in reader}

# Filter out entries where i94port is invalid
df_clean_spark_i94 = df_spark.filter(df_spark.i94port.isin(list(i94port_valid_dict.keys())))


# Filter out entries where i94res is invalid
from pyspark.sql.types import IntegerType
df_clean_spark_i94 = df_clean_spark_i94.withColumn("i94res", df_clean_spark_i94["i94res"].cast(IntegerType()))
df_clean_spark_i94 = df_clean_spark_i94.filter(df_clean_spark_i94.i94res.isin(list(i94city_valid_dict.keys())))


# Filter out entries where i94addr is not clearly specified, as it would create anomalies in state-wise analytics
df_clean_spark_i94=df_clean_spark_i94[df_clean_spark_i94['i94addr']!=99]


# Filter out entries where I94MODE is not reported
df_clean_spark_i94=df_clean_spark_i94[df_clean_spark_i94['i94mode']!=9]


# Filter out entries where i94visa, visatype and fltno are null
df_clean_spark_i94=df_clean_spark_i94.filter((df_clean_spark_i94.i94visa != 'NaN') &(df_clean_spark_i94.fltno != 'NaN')& (df_clean_spark_i94.visatype != 'NaN'))

# Show dataframe
df_clean_spark_i94.show()


+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|         admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
|5817117.0|2016.0|   4.0| 582.0|   582|    HOU|20574.0|    1.0|      0|20575.0|  52.0|    2.0|  1.0|20160430|    null| null|      K|      O|   null|      M| 1964.0|10302016|     M|  null|     UA| 9.496104673E10|00428|      B2|
| 460015.0|2016.0|   4.0| 135.0|   135|    ATL|20547.0|    1.0|     71|20568.0|  34.0|    2.

In [10]:
# Clean temperature data
df_clean_spark_temp=spark.read.format("csv").option("header", "true").load("../../data2/GlobalLandTemperaturesByCity.csv")
valid_i94_port_df=spark.read.format("csv").option("header", "true").load("valid_i94_port.csv")
valid_i94_port_df.show()

# Filter out entries with NaN average temperature
df_clean_spark_temp=df_clean_spark_temp.filter(df_clean_spark_temp.AverageTemperature != 'NaN')



#df_clean_spark_temp =df_clean_spark_temp[df_clean_spark_temp['City']=='Perth']
df_clean_spark_temp.show()


+-------+--------------------+-----+----+
|i94port|          valid_city|state| _c3|
+-------+--------------------+-----+----+
|    ALC|               ALCAN|  AK |null|
|    ANC|           ANCHORAGE|   AK|null|
|    BAR|BAKER AAF - BAKER...|   AK|null|
|    DAC|       DALTONS CACHE| AK  |null|
|    PIZ|DEW STATION PT LA...|   AK|null|
|    DTH|        DUTCH HARBOR|   AK|null|
|    EGL|               EAGLE|  AK |null|
|    FRB|           FAIRBANKS|   AK|null|
|    HOM|               HOMER|   AK|null|
|    HYD|               HYDER|  AK |null|
|    JUN|              JUNEAU|   AK|null|
|    5KE|           KETCHIKAN|   AK|null|
|    KET|           KETCHIKAN|   AK|null|
|    MOS|MOSES POINT INTER...|   AK|null|
|    NIK|             NIKISKI| AK  |null|
|    NOM|                 NOM|   AK|null|
|    PKC|         POKER CREEK|  AK |null|
|    ORI|      PORT LIONS SPB|   AK|null|
|    SKA|             SKAGWAY| AK  |null|
|    SNP|     ST. PAUL ISLAND|   AK|null|
+-------+--------------------+----

In [11]:
# Add i94port code based on city name

import pyspark.sql.functions as f
df_clean_spark_temp = df_clean_spark_temp.withColumn("City",f.lower(f.col("City")))
valid_i94_port_df = valid_i94_port_df.withColumn("valid_city",f.lower(f.col("valid_city")))

df_clean_spark_temp=df_clean_spark_temp.join(valid_i94_port_df,valid_i94_port_df.valid_city.contains(df_clean_spark_temp.City),how="inner")

# Show results
df_clean_spark_temp.show()

+----------+------------------+-----------------------------+--------+--------------+--------+---------+-------+----------+-----+----+
|        dt|AverageTemperature|AverageTemperatureUncertainty|    City|       Country|Latitude|Longitude|i94port|valid_city|state| _c3|
+----------+------------------+-----------------------------+--------+--------------+--------+---------+-------+----------+-----+----+
|1743-11-01|             8.758|                        1.886|aberdeen|United Kingdom|  57.05N|    1.48W|    ABE|  aberdeen|  WA |null|
|1744-04-01|6.0699999999999985|           2.9339999999999997|aberdeen|United Kingdom|  57.05N|    1.48W|    ABE|  aberdeen|  WA |null|
|1744-05-01|             7.751|                        1.494|aberdeen|United Kingdom|  57.05N|    1.48W|    ABE|  aberdeen|  WA |null|
|1744-06-01|             10.62|                        1.574|aberdeen|United Kingdom|  57.05N|    1.48W|    ABE|  aberdeen|  WA |null|
|1744-07-01|             12.35|                        
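
The join above matches on substring containment: a port is paired with a temperature city whenever the port's city name contains the temperature city's name. The output shows one consequence, with readings for Aberdeen in the United Kingdom attached to port `ABE` in `WA`. The sketch below, in plain Python with a made-up three-entry lookup, contrasts substring matching with exact matching restricted to US cities. `match_substring` and `match_exact` are hypothetical helpers, and the restriction to `United States` assumes that all ports in the lookup are US locations.

```python
# Made-up lookup of port code -> lower-cased port city, for illustration only.
PORT_CITIES = {"NYC": "new york", "ABE": "aberdeen", "LOS": "los angeles"}

def match_substring(city):
    """Ports whose city name merely contains the given (lower-cased) name."""
    return sorted(code for code, name in PORT_CITIES.items() if city in name)

def match_exact(city, country):
    """Ports whose city name equals the given name, for US cities only."""
    if country != "United States":
        return []
    wanted = city.strip().lower()
    return sorted(code for code, name in PORT_CITIES.items() if name == wanted)

print(match_substring("york"))                      # ['NYC'], a false match
print(match_exact("york", "United Kingdom"))        # []
print(match_exact("aberdeen", "United Kingdom"))    # []
print(match_exact("Los Angeles", "United States"))  # ['LOS']
```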

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [12]:
# Extract columns for immigration fact table
df_clean_spark_i94 = df_clean_spark_i94.select(["i94yr", "i94mon", "i94cit", "i94port", "arrdate", "i94mode", "depdate", "i94visa"])

# Write immigration fact table to parquet files partitioned by i94port
df_clean_spark_i94.write.mode("append").partitionBy("i94port").parquet("/home/workspace//results/immigration.parquet")

# Show dataframe
df_clean_spark_i94.show()


+------+------+------+-------+-------+-------+-------+-------+
| i94yr|i94mon|i94cit|i94port|arrdate|i94mode|depdate|i94visa|
+------+------+------+-------+-------+-------+-------+-------+
|2016.0|   4.0| 582.0|    HOU|20574.0|    1.0|20575.0|    2.0|
|2016.0|   4.0| 135.0|    ATL|20547.0|    1.0|20568.0|    2.0|
|2016.0|   4.0| 135.0|    SFR|20547.0|    1.0|20553.0|    1.0|
|2016.0|   4.0| 111.0|    CHI|20548.0|    1.0|20550.0|    1.0|
|2016.0|   4.0| 135.0|    WAS|20548.0|    1.0|20552.0|    1.0|
|2016.0|   4.0| 148.0|    ATL|20548.0|    1.0|20551.0|    1.0|
|2016.0|   4.0| 148.0|    CHI|20548.0|    1.0|20552.0|    1.0|
|2016.0|   4.0| 213.0|    NYC|20548.0|    1.0|20549.0|    2.0|
|2016.0|   4.0| 582.0|    HOU|20548.0|    1.0|20551.0|    1.0|
|2016.0|   4.0| 111.0|    NYC|20549.0|    1.0|20559.0|    2.0|
|2016.0|   4.0| 111.0|    NYC|20549.0|    1.0|20559.0|    2.0|
|2016.0|   4.0| 124.0|    TAM|20549.0|    1.0|20551.0|    1.0|
|2016.0|   4.0| 209.0|    DEN|20554.0|    1.0|20560.0| 

In [13]:
# Create the dimension table 
df_clean_spark_temp = df_clean_spark_temp.select(["dt","AverageTemperature","City", "Country", "Latitude", "Longitude", "i94port"]).dropDuplicates()
#df_clean_spark_temp = df_clean_spark_temp.select(["AverageTemperature", "City", "Country", "Latitude", "Longitude","i94port"]).distinct()

# Write dimension table to parquet files
df_clean_spark_temp.write.parquet("/home/workspace//results/temperature.parquet")
# Dimension dataframe show
df_clean_spark_temp.show()

+----------+------------------+---------+--------------------+--------+---------+-------+
|        dt|AverageTemperature|     City|             Country|Latitude|Longitude|i94port|
+----------+------------------+---------+--------------------+--------+---------+-------+
|1758-08-01|            13.167| aberdeen|      United Kingdom|  57.05N|    1.48W|    ABE|
|1819-03-01|             5.155| aberdeen|      United Kingdom|  57.05N|    1.48W|    ABE|
|1823-12-01|             5.776| aberdeen|      United Kingdom|  57.05N|    1.48W|    ABE|
|1854-05-01| 8.229000000000001| aberdeen|      United Kingdom|  57.05N|    1.48W|    ABE|
|1862-07-01|            10.848| aberdeen|      United Kingdom|  57.05N|    1.48W|    ABE|
|1875-12-01|             5.728| aberdeen|      United Kingdom|  57.05N|    1.48W|    ABE|
|1886-04-01|             5.282| aberdeen|      United Kingdom|  57.05N|    1.48W|    ABE|
|1918-12-01|6.9849999999999985| aberdeen|      United Kingdom|  57.05N|    1.48W|    ABE|
|1931-07-0

In [14]:
#Create us-cities-demographics dimension table 
from pyspark.sql.functions import col
df_clean_us_cities_demo=spark.read.format("csv").option("header", "true").option("delimiter", ";").load("us-cities-demographics.csv")
df_clean_us_cities_demo = df_clean_us_cities_demo.select(col("City"),col("State"),col("Median Age").alias("Median_Age"), col("Male Population").alias("Male_Population"),
col("Female Population").alias("Female_Population"),
col("Total Population").alias("Total_Population"),
col("Number of Veterans").alias("Number_of_Veterans"),
col("Foreign-born"),
col("Average Household Size").alias("Average_Household_Size"),
col("State Code").alias("State_Code"),col("Race"),col("Count"))
df_clean_us_cities_demo.write.parquet("/home/workspace//results/city_demographics.parquet")

df_clean_us_cities_demo.show()

+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+------+
|            City|         State|Median_Age|Male_Population|Female_Population|Total_Population|Number_of_Veterans|Foreign-born|Average_Household_Size|State_Code|                Race| Count|
+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+------+
|   Silver Spring|      Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino| 25924|
|          Quincy| Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White| 58723|
|          Hoover|       Alabama|      38.5|      

In [15]:
df_spark_fact=df_clean_spark_i94.join(df_clean_spark_temp,['i94port'],how='inner').distinct()

# Write immigration fact table to parquet files partitioned by i94port
df_spark_fact.write.mode("append").partitionBy("i94port").parquet("/home/workspace/results/result_fact.parquet")

# Show dataframe
df_spark_fact.show()



+-------+------+------+------+-------+-------+-------+-------+----------+-------------------+-------------+--------------+--------+---------+
|i94port| i94yr|i94mon|i94cit|arrdate|i94mode|depdate|i94visa|        dt| AverageTemperature|         City|       Country|Latitude|Longitude|
+-------+------+------+------+-------+-------+-------+-------+----------+-------------------+-------------+--------------+--------+---------+
|    HOU|2016.0|   4.0| 148.0|20552.0|    1.0|20558.0|    1.0|2007-09-01| 27.048000000000002|      houston| United States|  29.74N|   96.00W|
|    LOS|2016.0|   4.0| 148.0|20560.0|    1.0|20569.0|    2.0|1937-10-01|             12.348|  los angeles|         Chile|  37.78S|   73.22W|
|    LOS|2016.0|   4.0| 135.0|20558.0|    1.0|20566.0|    2.0|1870-09-01|             20.477|  los angeles| United States|  34.56N|  118.70W|
|    MIA|2016.0|   4.0| 131.0|20550.0|    1.0|20556.0|    2.0|1831-04-01| 23.028000000000002|        miami| United States|  26.52N|   80.60W|
|    M

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
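
The checks listed above can be sketched as small reusable functions. This is an illustration in plain Python over lists of dictionaries, not the project's Spark code; the helper names `check_not_empty` and `check_unique_key` are assumptions made for the example, and the two sample rows are taken from the temperature table preview.

```python
def check_not_empty(rows, table_name):
    """Count check: fail loudly when a table has no records."""
    if len(rows) == 0:
        raise ValueError("{} is empty".format(table_name))
    return len(rows)

def check_unique_key(rows, key_columns, table_name):
    """Integrity check: key columns must be present and unique."""
    seen = set()
    for row in rows:
        key = tuple(row.get(column) for column in key_columns)
        if any(value is None for value in key):
            raise ValueError("{} has a missing key value: {}".format(table_name, key))
        if key in seen:
            raise ValueError("{} has a duplicate key: {}".format(table_name, key))
        seen.add(key)
    return len(seen)

# Two rows from the temperature preview, keyed by port and date.
temperature_rows = [
    {"i94port": "ABE", "dt": "1758-08-01", "AverageTemperature": 13.167},
    {"i94port": "ABE", "dt": "1819-03-01", "AverageTemperature": 5.155},
]
print(check_not_empty(temperature_rows, "temperature"))                    # 2
print(check_unique_key(temperature_rows, ["i94port", "dt"], "temperature"))  # 2
```

Raising an exception instead of printing a message means a failed check stops the run, which makes failures harder to overlook when the pipeline is scheduled.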
 
Run Quality Checks

In [16]:
# Perform quality checks here: every table must contain at least one record
tables_to_check = [
    ("fact table", df_spark_fact),
    ("i94 dimension table", df_clean_spark_i94),
    ("temperature dimension table", df_clean_spark_temp),
    ("city demographic dimension table", df_clean_us_cities_demo),
]

for table_name, table_df in tables_to_check:
    record_count = table_df.count()
    if record_count != 0:
        print("Data validation passed with {} records for {}".format(record_count, table_name))
    else:
        print("Data validation failed for {}".format(table_name))



Data validation passed with 349057 records for fact table
Data validation passed with 89 records for i94 dimension table
Data validation passed with 674065 records for temperature dimension table
Data validation passed with 2891 records for city demographic dimension table


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

A document-store style data model is used: the data is stored as partitioned Parquet files. This simplifies storage, maintenance, and extraction of the data in Spark by different users.

The first dimension table will contain events from the I94 immigration data. The columns below will be extracted from the immigration dataframe:

* i94yr = 4 digit year
* i94mon = numeric month
* i94cit = 3 digit code of origin city
* i94port = 3 character code of destination city
* arrdate = arrival date
* i94mode = 1 digit travel code
* depdate = departure date
* i94visa = reason for immigration
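
The `arrdate` and `depdate` values appear in the data as numbers such as `20574.0`. A minimal sketch of decoding them follows, assuming they are SAS date values (days counted from 1960-01-01). That assumption is consistent with the Spark preview, where a row with `arrdate` 20574 has `dtadfile` 20160430. The helper name `sas_to_date` is hypothetical.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # assumption: SAS dates count days from this day

def sas_to_date(sas_days):
    """Convert a SAS numeric date (int or float) to a date; None stays None."""
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))

print(sas_to_date(20574.0))  # 2016-04-30
print(sas_to_date(20545.0))  # 2016-04-01
```

Only `None` is handled here; NaN values, as produced by pandas for missing dates, would need to be filtered out before calling the helper.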

The second dimension table will contain city temperature data. The columns below will be extracted from the temperature dataframe:
* i94port = 3 character code of destination city (mapped from immigration data during cleanup step)
* dt = date of the temperature reading
* AverageTemperature = average temperature
* City = city name
* Country = country name
* Latitude= latitude
* Longitude = longitude
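
`Latitude` and `Longitude` are stored as text with a hemisphere suffix, for example `57.05N` and `1.48W`. If numeric coordinates are needed, they could be converted along these lines; `parse_coordinate` is a hypothetical helper, and the sign convention (south and west negative) is an assumption of this sketch.

```python
def parse_coordinate(text):
    """Convert '57.05N' or '1.48W' style strings to signed decimal degrees."""
    text = text.strip().upper()
    if len(text) < 2 or text[-1] not in ("N", "S", "E", "W"):
        raise ValueError("unrecognised coordinate: {!r}".format(text))
    value = float(text[:-1])
    # South and west are represented as negative numbers.
    return -value if text[-1] in ("S", "W") else value

print(parse_coordinate("57.05N"))  # 57.05
print(parse_coordinate("1.48W"))   # -1.48
```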

The third dimension table will contain city demographic data. The columns below will be extracted from the us-city-demo dataframe:
* City = city name
* State = State in which the city is located
* Median_Age = Median age
* Male_Population = Count of Male population
* Female_Population = Count of Female population
* Total_Population = Count of Total population
* Number_of_Veterans = Number of Veterans
* Foreign-born = Count of Foreign born
* Average_Household_Size = Average household size
* State_Code = State Code
* Race = Race
* Count = Count of race in city


The fact table will contain information from the I94 immigration data joined with the city temperature data on i94port:

* i94port = 3 character code of destination city
* i94yr = 4 digit year
* i94mon = numeric month
* i94cit = 3 digit code of origin city
* arrdate = arrival date
* i94mode = 1 digit travel code
* depdate = departure date
* i94visa = reason for immigration
* dt = date of the temperature reading
* AverageTemperature = average temperature of destination city
* City = city name
* Country = country name
* Latitude= latitude
* Longitude = longitude
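
Joining on `i94port` alone pairs each arrival with every historical temperature reading for that port, which is why the fact table preview shows April 2016 arrivals next to readings from 1831 or 1937. One possible refinement is to collapse the temperature rows before joining. The sketch below does this in plain Python, under the assumption that a long-run average per port and calendar month is what the analysis needs. `monthly_average` is a hypothetical helper and the sample readings are invented.

```python
from collections import defaultdict

def monthly_average(temperature_rows):
    """Average temperature per (i94port, calendar month)."""
    totals = defaultdict(lambda: [0.0, 0])
    for row in temperature_rows:
        month = int(row["dt"][5:7])  # dt is formatted as YYYY-MM-DD
        bucket = totals[(row["i94port"], month)]
        bucket[0] += float(row["AverageTemperature"])
        bucket[1] += 1
    return {key: total / count for key, (total, count) in totals.items()}

# Invented readings for one port.
rows = [
    {"i94port": "HOU", "dt": "2006-04-01", "AverageTemperature": "20.0"},
    {"i94port": "HOU", "dt": "2007-04-01", "AverageTemperature": "22.0"},
    {"i94port": "HOU", "dt": "2007-09-01", "AverageTemperature": "27.0"},
]
averages = monthly_average(rows)
print(averages[("HOU", 4)])  # 21.0
```

Each immigration row could then look up a single value with the key `(i94port, i94mon)` instead of being repeated once per historical reading.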

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
Spark was chosen because it can handle multiple file formats (including SAS and Parquet) containing large amounts of data. PySpark and Spark dataframes, including join operations, were used to process the large input files and to slice and dice them into additional tables.
The final fact and dimension tables will be used by data analytics teams to generate reports, to understand the flow of immigrants into various cities,
and to keep track of the immigrant population. They help in understanding factors such as the mode of transport and which visa types are most common.
They also help the analytics team understand whether temperature plays any
significant role in the cities people are migrating to.
Because the weekly data volumes across all ports are very large, loading the data with pandas takes a long time,
and loading it into local memory for processing would be a major challenge. Spark plays a vital role in processing such large datasets.
In this project, loading the sample file took less time with pandas than with Spark;
however, Spark's distributed architecture becomes increasingly important as the data grows.
PySpark was used instead of Scala because of its shorter development time and strong community support.



* Propose how often the data should be updated and why.
The data should be updated daily, as passengers arrive, by collecting data from all valid airports.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 Clean the files and upload them to S3 buckets in the cloud (partitioned appropriately across clusters), then perform incremental updates using a multi-cluster setup on AWS EMR. Alternatively, a Hadoop MapReduce design can handle very large datasets.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 If the data must populate a dashboard by 7 AM every day, a scheduling tool such as Airflow, cron, or Luigi can be used to run the ETL pipeline.
 * The database needed to be accessed by 100+ people.
 Access can be provided through Hue (a web-based query execution tool), or through an Impala engine connection so that users can run queries from Python.