# Project Title
### Data Engineering Capstone Project

#### Project Summary
    This project processes the I94 Immigration Data, World Temperature Data, U.S. City Demographic Data and Airport Code datasets to understand the following:

        1. The number of immigrant arrivals at a given airport in a given month.
        2. The average temperature of immigrants' countries of residence.
        3. The average population of the state given as the immigrant's U.S. address on arrival, etc.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

```python
# Do all imports and installs here
import pandas as pd
```

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use? etc.

  ##### Project Plan/Data Sets:
    The plan is to import the immigration, temperature, demographics and airport code datasets provided by Udacity in order to find the number of immigrant arrivals at a given airport in a given month, to use the temperature dataset to find the average temperature of each immigrant's country of citizenship, and to use the demographics dataset to find the average population of the state where each immigrant is going to reside.
    
   
   ##### End Solution:
       In this project, Airflow is used to build the data pipeline. Spark processes the different datasets in memory and writes the processed output to S3 in Parquet format. The data is then loaded into Redshift for validation, and any further analysis can be performed in the Redshift database itself.
   
   ##### Tools:
           1. Airflow 
           2. S3
           3. Spark/EMR
           4. Redshift
    

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

   ##### Data sets
        1. I94 Immigration Data: This data comes from the U.S. National Tourism and Trade Office. It includes U.S. immigration details for the year 2016.
        
        2. World Temperature Data: This dataset comes from Kaggle. It includes temperature data for cities in different countries.
        
        3. U.S. City Demographic Data: This data comes from OpenSoft. It contains information about the demographics of all U.S. cities and census-designated places with a population greater than or equal to 65,000.
        
        4. Airport Code Table: This is a simple table of airport codes and their corresponding cities.

```python
from pyspark.sql import SparkSession

# Load the saurfang spark-sas7bdat package so Spark can read SAS (.sas7bdat) files
spark = SparkSession.builder \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport() \
    .getOrCreate()

df_spark = spark.read.format('com.github.saurfang.sas.spark') \
    .load('data/input/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
df_spark.show(5)
```

```text
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null| 1.897628485E9| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SE
```

### Step 2: Explore and Assess the Data
#### Explore/Cleaning the Data 
Identify data quality issues, like missing values, duplicate data, etc.

##### Immigration Data:
The following fields are stored as floats (e.g. 6.0) and are converted to integers:
1. cicid 
2. i94mon
3. i94yr
4. i94cit
5. i94res
6. arrdate
7. i94mode
8. depdate
9. i94bir
10. i94visa
11. biryear

The following fields are also converted to appropriate data types:

1. i94addr is converted to String (it becomes state_code).
2. admnum is converted to Double.
3. fltno is converted to String.
4. visatype is converted to String.
5. dtadfile and dtaddto are parsed as dates using the yyyyMMdd pattern.
    
Null values in the i94mode field are replaced with 0.
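
The casting rules above can be sketched in plain Python on a single record, which may help when unit-testing the clean-up logic outside Spark. This is a minimal illustration rather than the project's code: the dict-per-row representation and the helper name clean_immigration_record are assumptions, and the sample values are copied from the first row of the df_spark.show(5) output in Step 1.

```python
from typing import Any, Dict, Optional

# Fields that arrive from the SAS file as floats such as 6.0 (per the list above)
INT_FIELDS = ["cicid", "i94mon", "i94yr", "i94cit", "i94res", "arrdate",
              "i94mode", "depdate", "i94bir", "i94visa", "biryear"]
# Fields cast to strings; i94addr becomes state_code in immig_fact
STR_FIELDS = ["i94addr", "fltno", "visatype"]


def _to_int(value: Optional[float]) -> Optional[int]:
    # Nulls stay null, matching CAST(NULL AS INT) in Spark SQL
    return None if value is None else int(value)


def clean_immigration_record(raw: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: apply the type conversions to one raw I94 record."""
    cleaned = dict(raw)
    for field in INT_FIELDS:
        cleaned[field] = _to_int(raw.get(field))
    for field in STR_FIELDS:
        value = raw.get(field)
        cleaned[field] = None if value is None else str(value)
    admnum = raw.get("admnum")
    cleaned["admnum"] = None if admnum is None else float(admnum)
    # A missing travel mode is filled with 0, like na.fill(0, subset=["travel_mode_id"])
    if cleaned["i94mode"] is None:
        cleaned["i94mode"] = 0
    return cleaned


first_row = {"cicid": 6.0, "i94yr": 2016.0, "i94mon": 4.0, "i94cit": 692.0,
             "i94res": 692.0, "arrdate": 20573.0, "i94mode": None, "i94addr": None,
             "depdate": None, "i94bir": 37.0, "i94visa": 2.0, "biryear": 1979.0,
             "admnum": 1.897628485E9, "fltno": None, "visatype": "B2"}
print(clean_immigration_record(first_row))
```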

##### Airport Data:

Rows with a null local_code (airport code) are removed from the dataset, and only distinct records are selected to obtain a de-duplicated set of airports.

Leading and trailing whitespace is trimmed from the following fields:

1. local_code
2. iata_code
3. name
4. type
5. iso_region
6. iso_country
7. municipality
8. gps_code

The coordinates field is converted to String.
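
A minimal sketch of the same airport clean-up on plain Python dicts is shown below. The helper name clean_airports and all sample rows are hypothetical placeholders (not real airports), and Python's str.strip() is used as an approximation of Spark's trim(); the filter and de-duplication follow the airport_dim query in Step 4.

```python
from typing import Dict, List, Optional

TRIMMED_FIELDS = ["local_code", "iata_code", "name", "type", "iso_region",
                  "iso_country", "municipality", "gps_code"]


def _trim(value: Optional[str]) -> Optional[str]:
    # Approximates Spark's trim(): strips surrounding whitespace, keeps nulls as nulls
    return None if value is None else value.strip()


def clean_airports(rows: List[Dict[str, Optional[str]]]) -> List[Dict[str, Optional[str]]]:
    """Hypothetical helper mirroring the airport_dim query on plain dicts."""
    seen = set()
    result = []
    for row in rows:
        if row.get("local_code") is None:
            continue  # WHERE local_code IS NOT NULL
        cleaned = {field: _trim(row.get(field)) for field in TRIMMED_FIELDS}
        coords = row.get("coordinates")
        cleaned["coordinates"] = None if coords is None else str(coords)
        key = tuple(cleaned[f] for f in TRIMMED_FIELDS + ["coordinates"])
        if key not in seen:  # SELECT DISTINCT, applied after trimming
            seen.add(key)
            result.append(cleaned)
    return result


raw_rows = [
    {"local_code": " AAA ", "iata_code": "AAA", "name": "Example Airport ",
     "type": "small_airport", "iso_region": "US-XX", "iso_country": "US",
     "municipality": "Example City", "gps_code": "KAAA", "coordinates": "-80.0, 40.0"},
    {"local_code": "AAA", "iata_code": " AAA", "name": "Example Airport",
     "type": "small_airport", "iso_region": "US-XX", "iso_country": "US",
     "municipality": "Example City", "gps_code": "KAAA", "coordinates": "-80.0, 40.0"},
    {"local_code": None, "iata_code": None, "name": "No Local Code",
     "type": "heliport", "iso_region": "US-XX", "iso_country": "US",
     "municipality": None, "gps_code": None, "coordinates": "0.0, 0.0"},
]
airports = clean_airports(raw_rows)
print(len(airports), airports[0]["local_code"])
```

The two "AAA" rows differ only in surrounding spaces, so they collapse into one record once trimmed; the row without a local_code is dropped.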

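##### Temperature Data:

The temperature data is grouped by country to compute each country's average temperature and average temperature uncertainty. The result is then joined with the country code table (country_code.csv) so that it can be linked to the country codes used in the immigration data.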
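
The aggregation and join can be sketched in plain Python as follows. This is an illustration under assumptions: the helper name country_temperature_dim and the temperature readings are hypothetical, the codes 130 (Sweden) and 384 (Chad) are taken from the country_dim output shown in Step 4, and "Atlantis" is a deliberately unmatched placeholder. As with SQL avg(), missing readings are ignored.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Optional, Tuple

TemperatureRow = Tuple[str, Optional[float], Optional[float]]


def country_temperature_dim(rows: Iterable[TemperatureRow],
                            country_codes: Dict[str, int]) -> List[dict]:
    """Average temperature and uncertainty per country, keyed by the I94 country code."""
    # country -> [temp_sum, temp_count, uncertainty_sum, uncertainty_count]
    totals = defaultdict(lambda: [0.0, 0, 0.0, 0])
    for country, temperature, uncertainty in rows:
        acc = totals[country]
        # SQL avg() ignores NULLs, so missing readings are skipped
        if temperature is not None:
            acc[0] += temperature
            acc[1] += 1
        if uncertainty is not None:
            acc[2] += uncertainty
            acc[3] += 1

    dim = []
    for country, code in country_codes.items():
        if country not in totals:
            continue  # no temperature data: dropped by WHERE t.country IS NOT NULL
        t_sum, t_n, u_sum, u_n = totals[country]
        dim.append({
            "country_code": code,
            "country": country,
            "avg_country_temperature": t_sum / t_n if t_n else None,
            "avg_country_temperature_uncertainty": u_sum / u_n if u_n else None,
        })
    return dim


readings = [("Sweden", 5.0, 1.5), ("Sweden", 7.0, None), ("Chad", 27.0, 0.8)]
codes = {"Sweden": 130, "Chad": 384, "Atlantis": 999}
print(country_temperature_dim(readings, codes))
```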

##### U.S. Demographics Data:

The race and count fields are dropped and duplicate rows are removed, leaving one record per city.

The demographics data is then grouped by state code to compute the average male, female and total population of the cities in each state.
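
Dropping race and count before de-duplicating matters because the source file repeats each city once per race. The sketch below uses hypothetical cities and a hypothetical state code "XX" to show the effect; the helper names are assumptions, and _avg ignores nulls the way SQL avg() does.

```python
from collections import defaultdict
from typing import Dict, List, Optional


def _avg(values: List[Optional[float]]) -> Optional[float]:
    # Like SQL avg(): ignore nulls, return null when nothing is left
    present = [v for v in values if v is not None]
    return sum(present) / len(present) if present else None


def state_demographics(rows: List[dict]) -> Dict[str, dict]:
    """Drop race/count, de-duplicate city rows, then average per stateCode."""
    distinct = {}
    for row in rows:
        city_row = {k: v for k, v in row.items() if k not in ("race", "count")}
        distinct[tuple(sorted(city_row.items()))] = city_row

    by_state = defaultdict(list)
    for city_row in distinct.values():
        by_state[city_row["stateCode"]].append(city_row)

    return {
        state: {
            "average_male_population": _avg([c["malePopulation"] for c in cities]),
            "average_female_population": _avg([c["femalePopulation"] for c in cities]),
            "average_total_population": _avg([c["totalPopulation"] for c in cities]),
        }
        for state, cities in by_state.items()
    }


rows = [
    {"city": "City A", "stateCode": "XX", "malePopulation": 100, "femalePopulation": 110,
     "totalPopulation": 210, "race": "Race 1", "count": 40},
    {"city": "City A", "stateCode": "XX", "malePopulation": 100, "femalePopulation": 110,
     "totalPopulation": 210, "race": "Race 2", "count": 170},
    {"city": "City B", "stateCode": "XX", "malePopulation": 300, "femalePopulation": 290,
     "totalPopulation": 590, "race": "Race 1", "count": 500},
]
print(state_demographics(rows))
```

Without the de-duplication step, City A would be counted twice and the male average would drop from 200 to about 166.7.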

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

    Data Model:

   <img src="star_schema_v1.png">

    The following data model was chosen to answer the questions in the project scope.

    1. To get the number of immigrant arrivals at a given airport in a given month, combine the immig_fact, airport_dim and time_dim tables; this gives the total arrivals for a given airport name and month.

    2. To get the average temperature of immigrants' countries of residence, combine immig_fact and country_temperature_dim.

    3. To get the average population of the state in each immigrant's arrival address, combine immig_fact and demographics_dim by stateCode.
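
Question 1 can be sketched as a single star-schema query. The example below uses an in-memory SQLite database as a stand-in for Redshift, with only the columns the query needs. The join keys (i94_port to airport_code, i94_arrival_date to arrival_sas) are an assumption based on the column names in the Step 4 code, and the airports and facts are hypothetical; the SAS day numbers 20551 and 20573 come from the Step 1 sample output.

```python
import sqlite3
from typing import List, Tuple

ARRIVALS_BY_AIRPORT_MONTH = """
SELECT a.airport_name, t.year, t.month, COUNT(*) AS arrivals
FROM immig_fact f
JOIN airport_dim a ON f.i94_port = a.airport_code
JOIN time_dim t ON f.i94_arrival_date = t.arrival_sas
GROUP BY a.airport_name, t.year, t.month
ORDER BY a.airport_name, t.year, t.month
"""


def build_demo_db() -> sqlite3.Connection:
    """In-memory stand-in for the warehouse tables, holding only the columns used below."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE immig_fact (cicid INTEGER, i94_port TEXT, i94_arrival_date INTEGER);
        CREATE TABLE airport_dim (airport_code TEXT, airport_name TEXT);
        CREATE TABLE time_dim (arrival_sas INTEGER, day INTEGER, month INTEGER, year INTEGER);
    """)
    conn.executemany("INSERT INTO immig_fact VALUES (?, ?, ?)",
                     [(1, "AAA", 20573), (2, "AAA", 20551), (3, "BBB", 20573)])
    conn.executemany("INSERT INTO airport_dim VALUES (?, ?)",
                     [("AAA", "Airport A"), ("BBB", "Airport B")])
    # SAS day numbers 20551 and 20573 fall on 2016-04-07 and 2016-04-29
    conn.executemany("INSERT INTO time_dim VALUES (?, ?, ?, ?)",
                     [(20551, 7, 4, 2016), (20573, 29, 4, 2016)])
    return conn


def arrivals_by_airport_month(conn: sqlite3.Connection) -> List[Tuple[str, int, int, int]]:
    return conn.execute(ARRIVALS_BY_AIRPORT_MONTH).fetchall()


print(arrivals_by_airport_month(build_demo_db()))
# [('Airport A', 2016, 4, 2), ('Airport B', 2016, 4, 1)]
```

Questions 2 and 3 follow the same pattern, joining immig_fact to the country and demographics dimensions instead.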

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

    1. Create a separate ETL job for each dataset.
    2. The immigration ETL splits the data into the following fact and dimension tables:
        1. immig_fact
        2. time_dim (for the arrival date)
        3. visa_type_dim
        4. travel_mode_dim
    3. The airport ETL retrieves all distinct airport codes along with the airport names and related attributes.
    4. The temperature ETL retrieves the average temperature and average temperature uncertainty for each country.
    5. The demographics ETL retrieves the average male, female and total population for each state.
    6. Each ETL job writes its output in Parquet format to an S3 bucket.
    7. The data pipeline loads the Parquet files from S3 into the Redshift database, where all data validations are performed.
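
The ordering of these steps forms a small dependency graph: the four ETL jobs are independent of each other, the Redshift load waits for all of them, and validation runs last. The sketch below expresses that graph with Python's standard graphlib module (Python 3.9+); the task names are hypothetical, and in Airflow the same ordering would normally be declared as task dependencies (for example with the >> operator).

```python
from graphlib import TopologicalSorter  # Python 3.9+
from typing import Dict, List, Set

# Hypothetical task names; each task maps to the set of tasks it depends on
PIPELINE: Dict[str, Set[str]] = {
    "immigration_etl": set(),
    "airport_etl": set(),
    "temperature_etl": set(),
    "demographics_etl": set(),
    "load_s3_to_redshift": {"immigration_etl", "airport_etl",
                            "temperature_etl", "demographics_etl"},
    "data_quality_checks": {"load_s3_to_redshift"},
}


def pipeline_stages(graph: Dict[str, Set[str]]) -> List[List[str]]:
    """Group tasks into stages; every task in a stage only depends on earlier stages."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    stages = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        stages.append(ready)
        sorter.done(*ready)
    return stages


for number, stage in enumerate(pipeline_stages(PIPELINE), start=1):
    print(number, stage)
```

The first stage contains all four ETL jobs, which is why they can run in parallel on the Spark cluster.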

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

#### Immigration data model

```python
from pyspark.sql import SparkSession

# Reuse the running session if there is one, otherwise create one with the SAS reader package
spark = SparkSession.builder \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:3.0.0-s_2.12") \
    .enableHiveSupport() \
    .getOrCreate()

df_immig = spark.read.format('com.github.saurfang.sas.spark') \
    .load('data/input/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

# Expose the raw data to Spark SQL
df_immig.createOrReplaceTempView("immigration_data")

output_data = "data/output/"

# Fact table: cast float-encoded columns to INT and parse the yyyyMMdd date strings
immig_fact = spark.sql(""" select 
                    CAST(cicid as INT) as cicid,
                    CAST(i94mon as INT) as i94_month,
                    trim(CAST(i94port as String)) as partition_port,
                    trim(CAST(i94port as String)) as i94_port,
                    CAST(i94cit as INT) as i94_citizen,
                    CAST(i94res as INT) as i94_resident, 
                    CAST(arrdate as INT) as i94_arrival_date,
                    CAST(i94mode as INT) as travel_mode_id,
                    CAST(i94addr as String) as state_code,
                    CAST(depdate as INT) as i94_departure_date,
                    CAST(i94bir as INT) as i94_age,
                    CAST(i94visa as INT) as visa_category_id,
                    to_date(dtadfile,'yyyyMMdd') as i94_date_added,
                    to_date(dtaddto, 'yyyyMMdd') as i94_date_admitted,
                    occup as i94_occupation,
                    entdepa as i94_arrival_flag,
                    entdepd as i94_departure_flag,
                    entdepu as i94_update_flag,
                    matflag as i94_match_flag,
                    CAST(biryear as INT) as i94_birth_year,
                    gender as i94_gender,
                    insnum as i94_insnum,
                    airline as i94_airline,
                    CAST(admnum as Double) as i94_admission_number,
                    CAST(fltno as String) as i94_flight_number,
                    CAST(visatype as String) as i94_visa_type
                    from 
                    immigration_data""")

# Replace null travel modes with 0 (labelled 'Undefined' in travel_mode_dim)
immig_fact_updated = immig_fact.na.fill(0, subset=["travel_mode_id"])

immig_fact_out = output_data + "immig_fact.parquet"

# Partition the fact table by port of entry and month
immig_fact_updated.write \
        .mode("append") \
        .partitionBy("partition_port", "i94_month") \
        .parquet(immig_fact_out)
```



```python
# Visa category dimension: 1 = Business, 2 = Pleasure, anything else (including null) = Student
visa_type_dim = spark.sql(""" select 
                        distinct CAST(i94visa as INT) as visa_category_id,
                        CASE(CAST(i94visa as INT)) WHEN 1 then 'Business' WHEN 2 THEN 'Pleasure' ELSE 'Student' END as visa_type 
                        from 
                        immigration_data""")

visa_type_dim_out = output_data + "visa_type_dim.parquet"

visa_type_dim.write \
        .mode("append") \
        .parquet(visa_type_dim_out)
```



```python
# Travel mode dimension: 1 = Air, 2 = Sea, 3 = Land, 9 = Not reported, anything else = Undefined
travel_mode_dim = spark.sql(""" select 
                            distinct CAST(i94mode as INT) as travel_mode_id,
                            CASE(CAST(i94mode as INT)) WHEN 1 then 'Air' WHEN 3 THEN 'Land' WHEN 2 THEN 'Sea' WHEN 9 THEN 'Not reported' ELSE 'Undefined' END as travel_mode 
                            from 
                            immigration_data""")

# Null ids become 0 so they match the filled travel_mode_id in immig_fact
travel_mode_dim_updated = travel_mode_dim.na.fill(0, subset=["travel_mode_id"])

travel_mode_dim_out = output_data + "travel_mode_dim.parquet"

travel_mode_dim_updated.write \
        .mode("append") \
        .parquet(travel_mode_dim_out)
```
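
The two CASE expressions above behave like dictionary lookups with a default. The sketch below reproduces them in plain Python (the helper names are hypothetical) and makes one subtlety explicit: because a NULL code never matches a WHEN branch, the ELSE branch also catches nulls, so a missing i94visa would be labelled 'Student' and a missing i94mode 'Undefined'. It may be worth checking the source data for null visa codes before relying on that label.

```python
from typing import Optional, Tuple

VISA_CATEGORIES = {1: "Business", 2: "Pleasure"}
TRAVEL_MODES = {1: "Air", 2: "Sea", 3: "Land", 9: "Not reported"}


def _to_int(value: Optional[float]) -> Optional[int]:
    return None if value is None else int(value)


def visa_type_row(i94visa: Optional[float]) -> Tuple[Optional[int], str]:
    """One visa_type_dim row; like the SQL ELSE branch, a NULL code maps to 'Student'."""
    code = _to_int(i94visa)
    return code, VISA_CATEGORIES.get(code, "Student")


def travel_mode_row(i94mode: Optional[float]) -> Tuple[int, str]:
    """One travel_mode_dim row: label first, then fill a NULL id with 0 (as na.fill does)."""
    code = _to_int(i94mode)
    label = TRAVEL_MODES.get(code, "Undefined")
    return (0 if code is None else code), label


print(visa_type_row(2.0), visa_type_row(None))
print(travel_mode_row(1.0), travel_mode_row(None))
```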
    

```python
# Time dimension: arrdate is a SAS date (days since 1960-01-01), so it is added to that epoch
time_dim = spark.sql(""" select 
                            distinct CAST(arrdate as INT) as arrival_sas,
                            date_add(to_date('1960-01-01', "yyyy-MM-dd"), CAST(arrdate as INT)) as arrival_date,
                            dayofmonth(date_add(to_date('1960-01-01', "yyyy-MM-dd"), CAST(arrdate as INT))) as day,
                            month(date_add(to_date('1960-01-01', "yyyy-MM-dd"), CAST(arrdate as INT))) as month,
                            month(date_add(to_date('1960-01-01', "yyyy-MM-dd"), CAST(arrdate as INT))) as parition_month,
                            year(date_add(to_date('1960-01-01', "yyyy-MM-dd"), CAST(arrdate as INT))) as year,
                            date_format(date_add(to_date('1960-01-01', "yyyy-MM-dd"), CAST(arrdate as INT)),"E") as weekday
                            from 
                            immigration_data""")

time_dim_out = output_data + "time_dim.parquet"

time_dim.write \
    .mode("append") \
    .partitionBy("parition_month") \
    .parquet(time_dim_out)
```
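
The time_dim columns all derive from one piece of arithmetic: a SAS date is a count of days since 1960-01-01. The sketch below repeats that arithmetic with Python's datetime so individual values can be checked by hand; the helper name time_dim_row is hypothetical, and the sample value 20573 is the arrdate of the first row in the Step 1 output. Python's "%a" weekday name should correspond to Spark's "E" pattern in an English locale.

```python
from datetime import date, timedelta
from typing import Dict, Union

SAS_EPOCH = date(1960, 1, 1)  # SAS dates count days from this day


def time_dim_row(arrdate: float) -> Dict[str, Union[int, str, date]]:
    """Rebuild one time_dim row, like date_add('1960-01-01', CAST(arrdate AS INT))."""
    sas_days = int(arrdate)
    arrival = SAS_EPOCH + timedelta(days=sas_days)
    return {
        "arrival_sas": sas_days,
        "arrival_date": arrival,
        "day": arrival.day,
        "month": arrival.month,
        "year": arrival.year,
        "weekday": arrival.strftime("%a"),  # abbreviated day name, e.g. "Fri" in the C locale
    }


print(time_dim_row(20573.0))
```

Since the sample arrivals fall in April 2016, this also serves as a quick sanity check that the i94_apr16 file decodes to the expected month.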

#### Airport Data Model

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.0") \
    .enableHiveSupport() \
    .getOrCreate()

input_file_path = "data/input/airport-codes_csv.csv"

# read airport data file
df_airport_data = spark.read.csv(input_file_path, inferSchema=True, header=True)

# create an airport_data view from the Spark DataFrame
df_airport_data.createOrReplaceTempView("airport_data")

# extract columns for the airport dimension table
airport_dim = spark.sql("""select distinct trim(local_code) as airport_code,
                        trim(local_code) as partition_airport_code,
                        trim(iata_code) as iata_code,
                        trim(name) as airport_name,
                        trim(type) as airport_type,
                        trim(iso_region) as region,
                        trim(iso_country) as country_code,
                        trim(municipality) as municipality,
                        trim(gps_code) as gps_code,
                        CAST(coordinates as String) as coordinates
                        from 
                        airport_data 
                        where
                        local_code is not null
                        """)

output_data = "data/output/"

# write the airport dimension table to Parquet files
output_file_path = output_data + "airport_dim.parquet"

airport_dim.write \
    .mode("append") \
    .partitionBy("partition_airport_code") \
    .parquet(output_file_path)
```

#### Demographics Model

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.0") \
        .enableHiveSupport() \
        .getOrCreate()

input_file_path = "data/input/us-cities-demographics.csv"

demographics_schema = StructType([
        StructField("city", StringType()),
        StructField("state", StringType()),
        StructField("medianAge", DoubleType()),
        StructField("malePopulation", IntegerType()),
        StructField("femalePopulation", IntegerType()),
        StructField("totalPopulation", IntegerType()),
        StructField("noOfVeterans", IntegerType()),
        StructField("foreignBorn", IntegerType()),
        StructField("averageHouseHoldSize", DoubleType()),
        StructField("stateCode", StringType()),
        StructField("race", StringType()),
        StructField("count", IntegerType())
    ])

# read the semicolon-delimited demographics data file with an explicit schema
df_demographics_data = spark.read.csv(input_file_path, sep=';', schema=demographics_schema, header=True)

# remove the race and count columns, then call distinct() to drop the duplicate rows per city
df_demographics_drop = df_demographics_data.drop("race", "count")
df_demographics_distinct = df_demographics_drop.distinct()

# create a demography_data view from the Spark DataFrame
df_demographics_distinct.createOrReplaceTempView("demography_data")

# extract columns for the demographics dimension table
demographics_dim = spark.sql("""select 
                            distinct stateCode,
                            avg(medianAge) as average_median_age,
                            avg(malePopulation) as average_male_population,
                            avg(femalePopulation) as average_female_population,
                            avg(totalPopulation) as average_total_population,
                            avg(noOfVeterans) as average_no_of_veterans,
                            avg(foreignBorn) as average_foreign_born,
                            avg(averageHouseHoldSize) as average_house_hold_size
                            from demography_data
                            group by stateCode""")

output_data = "data/output/"

# write the demographics dimension table to Parquet files
output_file_path = output_data + "demographics_dim.parquet"

demographics_dim.write \
    .mode("append") \
    .parquet(output_file_path)
```

```python
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.0") \
        .enableHiveSupport() \
        .getOrCreate()

temp_data = "data/input/GlobalLandTemperaturesByCity_Kaggle.csv"

# read temperature data file
df_temp_data = spark.read.csv(temp_data, inferSchema=True, header=True)

# create a temperature_data view from the Spark DataFrame
df_temp_data.createOrReplaceTempView("temperature_data")

# Retrieving country data to join with immigration data
country_data = "data/input/country_code.csv"

df_country = spark.read.csv(country_data, inferSchema=True, header=True)

df_country.createOrReplaceTempView("country_table")

# extract columns for the country dimension table
country_dim = spark.sql("""select c.country_code as country_code,
                    t.Country as country,
                    t.avg_country_temperature as avg_country_temperature,
                    t.avg_country_temperature_uncertainty as avg_country_temperature_uncertainty
                    from 
                    country_table c
                    left join
                    (select 
                        avg(AverageTemperature) as avg_country_temperature,
                        avg(AverageTemperatureUncertainty) as avg_country_temperature_uncertainty,
                        Country 
                        from 
                        temperature_data 
                        group by Country ) t  
                        on t.Country = c.country
                    where t.country is not null
                    """)

country_dim.show(truncate=False)
```

```text
+------------+-----------+-----------------------+-----------------------------------+
|country_code|country    |avg_country_temperature|avg_country_temperature_uncertainty|
+------------+-----------+-----------------------+-----------------------------------+
|384         |Chad       |27.189829394812683     |0.8463801152737748                 |
|158         |Russia     |3.3472679828735536     |1.3651891063002695                 |
|693         |Paraguay   |22.784014312977117     |0.828861164122139                  |
|216         |Yemen      |25.76840766445382      |0.9586333735666872                 |
|391         |Senegal    |25.984176694490824     |0.7867987312186983                 |
|130         |Sweden     |5.665518003790269      |1.7004475047378378                 |
|603         |Guyana     |26.54984937439856      |0.648079403272378                  |
|243         |Burma      |26.016839989290098     |0.8800770498218452                 |
|260         |Philippines|26.51646246746498
```

#### Temperature Data Model

```python
from pyspark.sql import SparkSession

spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.0") \
        .enableHiveSupport() \
        .getOrCreate()

temp_data = "data/input/GlobalLandTemperaturesByCity_Kaggle.csv"

# read temperature data file
df_temp_data = spark.read.csv(temp_data, inferSchema=True, header=True)

# create a temperature_data view from the Spark DataFrame
df_temp_data.createOrReplaceTempView("temperature_data")

# Retrieving country data to join with immigration data
country_data = "data/input/country_code.csv"

df_country = spark.read.csv(country_data, inferSchema=True, header=True)

df_country.createOrReplaceTempView("country_table")

# extract columns for the country dimension table
country_dim = spark.sql("""select c.country_code as country_code,
                    t.Country as country,
                    t.avg_country_temperature as avg_country_temperature,
                    t.avg_country_temperature_uncertainty as avg_country_temperature_uncertainty
                    from 
                    country_table c
                    left join
                    (select 
                        avg(AverageTemperature) as avg_country_temperature,
                        avg(AverageTemperatureUncertainty) as avg_country_temperature_uncertainty,
                        Country 
                        from 
                        temperature_data 
                        group by Country ) t  
                        on t.Country = c.country
                    where t.country is not null
                    """)

output_data = "data/output/"

# write the country dimension table to Parquet files
country_dim_out = output_data + "country_dim.parquet"

country_dim.write \
    .mode("append") \
    .parquet(country_dim_out)
```

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

    Data quality checks are implemented in the DataQualityOperator (data_quality.py), which Airflow invokes after the data has been loaded into Redshift.
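
The kinds of checks listed above (row counts, null keys, unique keys) can each be written as a query returning one number plus a predicate that decides whether that number is a failure. The sketch below is a hypothetical stand-in, not the contents of data_quality.py: the QualityCheck structure and the specific checks are assumptions, and SQLite replaces Redshift so the example runs locally. It only uses the DB-API cursor methods, so the same loop should work with a Redshift connection.

```python
import sqlite3
from typing import Callable, List, NamedTuple


class QualityCheck(NamedTuple):
    sql: str                          # query returning a single number
    fails_if: Callable[[int], bool]   # predicate that marks the result as a failure
    message: str


# Hypothetical checks; the project's real checks live in DataQualityOperator (data_quality.py)
CHECKS = [
    QualityCheck("SELECT COUNT(*) FROM immig_fact", lambda n: n == 0,
                 "immig_fact is empty"),
    QualityCheck("SELECT COUNT(*) FROM immig_fact WHERE cicid IS NULL", lambda n: n > 0,
                 "immig_fact has NULL cicid values"),
    QualityCheck("SELECT COUNT(cicid) - COUNT(DISTINCT cicid) FROM immig_fact",
                 lambda n: n > 0, "immig_fact has duplicate cicid values"),
]


def run_quality_checks(conn, checks: List[QualityCheck]) -> List[str]:
    """Run each check through a DB-API cursor and return the messages of failed checks."""
    failures = []
    cursor = conn.cursor()
    for check in checks:
        cursor.execute(check.sql)
        (value,) = cursor.fetchone()
        if check.fails_if(value):
            failures.append(check.message)
    return failures


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE immig_fact (cicid INTEGER)")
conn.executemany("INSERT INTO immig_fact VALUES (?)", [(1,), (2,), (2,), (None,)])
print(run_quality_checks(conn, CHECKS))
# ['immig_fact has NULL cicid values', 'immig_fact has duplicate cicid values']
```

In an Airflow operator, a non-empty failure list would typically be turned into a raised exception so that the task is marked as failed.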

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

    The data dictionary is available in the data_dictionary.xlsx file.

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
    
        1. Airflow is chosen to run the data pipeline because its built-in error handling (such as task retries) makes it well suited to executing the pipeline.
        2. Spark/EMR is chosen to run all of the ETL jobs described above. Spark processes the data in memory and writes the results to Parquet files.
        3. Redshift is chosen to load the data and perform validation, and all the required analysis can be done in Redshift itself.

* Propose how often the data should be updated and why.
      
      The job can run once a day to load new immigration data along with any changes to the temperature, demographics and airport data.

* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
        
       If the data increased by 100x, we would need to spin up a larger Spark cluster (for example, more or bigger EMR nodes) to process the data in memory and write it to Parquet files. S3 and the other technology choices would remain unchanged for the larger data volume.
          
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
          
        By monitoring the job's execution time and scheduling it to start early enough to finish before 7 am, the data is loaded into Redshift in time, so a dashboard connected to Redshift has the latest data by 7 am every day.
          
 * The database needed to be accessed by 100+ people.
         
        Each of the 100+ users can be given access in Redshift through an individual user account with the appropriate permissions, so that each user's activity can be tracked in Redshift. Giving everyone a single shared login would make it impossible to track who did what.
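
One way to set this up is to put every analyst in a shared read-only group while still giving each person an individual login, so that their queries are attributable in Redshift system tables such as STL_QUERY. The sketch below only generates the SQL statements; the group name, user names and the public schema are hypothetical, PASSWORD DISABLE assumes users sign in with IAM-based temporary credentials, and names are interpolated without quoting, so they must be trusted, valid identifiers.

```python
from typing import List


def redshift_access_statements(group: str, users: List[str], schema: str = "public") -> List[str]:
    """Build Redshift SQL giving each analyst an individual login inside a shared read-only group."""
    statements = [f"CREATE GROUP {group};"]
    for user in users:
        # PASSWORD DISABLE assumes users sign in with IAM-based temporary credentials
        statements.append(f"CREATE USER {user} PASSWORD DISABLE IN GROUP {group};")
    statements.append(f"GRANT USAGE ON SCHEMA {schema} TO GROUP {group};")
    statements.append(f"GRANT SELECT ON ALL TABLES IN SCHEMA {schema} TO GROUP {group};")
    return statements


for statement in redshift_access_statements("analysts", ["analyst_01", "analyst_02"]):
    print(statement)
```

Granting privileges to the group rather than to each user keeps permissions consistent as people join or leave; only group membership changes.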