# Project Title: Pollution Problems Related to Immigration in the U.S.
### Data Engineering Capstone Project

#### Project Summary
Our world faces a self-inflicted poison: the destruction of ecosystems, natural resources, and the environment. Waste, water and air pollution, and deforestation are all damage that humans inflict on the earth. Wherever people go, pollution follows, whether on a large or small scale. Air pollution in particular grows more widespread every year, driven mainly by vehicle and industrial emissions, and the smoke directly affects human health.

The question arises: to what extent do changes in local populations directly affect the amount of polluting gases?

This project considers only the U.S. and uses Immigration Data, World Temperature Data, Demographic Data, Airline Data, and Pollution Data.
These datasets are collected and used to build data models in a data lake that expose the data and relationships needed to answer the questions posed.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

### Step 1: Scope the Project and Gather Data

#### Scope 
The plan for this project starts with a review of the datasets used to create the data model: Demographic Data, Immigration Data, U.S. Pollution Data, World Temperature Data, and Airline Data. Once the relationships between the datasets are understood, the next steps are to design the data models, build the data pipelines that create them in the data lake, check data quality, write SQL queries to answer the questions, and write the summary documentation.

This project addresses the environmental problem of human-caused pollution by combining population and immigration data for each U.S. city with pollution data, in order to analyze how strongly migration into the U.S. relates to pollution levels.

A **Data Lake** is used in this project, composed of the following components:
1. **AWS S3** as object storage for both the data source and the target.
2. **Python** and **Apache Spark** to build the ETL pipeline, running on an **AWS EMR** cluster.



In [1]:
# Do all imports and installs here
import pandas as pd
import datetime as dt
import configparser
import os
from datetime import datetime
from pyspark.sql.functions import udf
from pyspark.sql.functions import col, sum, avg, max
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek, to_date
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql import functions as F
from workspace_utils import active_session
from IPython.display import display

In [2]:
# Prepare config and create spark session
spark = SparkSession.builder.\
    config("spark.jars.repositories", "https://repos.spark-packages.org/").\
    config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
    enableHiveSupport().getOrCreate()

#### Describe and Gather Data 
The data sets that have been used in the project are listed below:

1. **I94 Immigration Data:** This data comes from the US National Tourism and Trade Office. You can read more about it [here](https://travel.trade.gov/research/reports/i94/historical/2016.html). It records individual travel to the U.S. for the year 2016 and is stored in separate 'sas7bdat' files, one per month.

In [3]:
df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
#df_spark = spark.read.format('com.github.saurfang.sas.spark').load('s3a://cnp66-bucket/Capstone_Project/DataSource/I94_Immigration Data-2016/i94_apr16_sub.sas7bdat')

In [4]:
get_date = udf(lambda x: (dt.datetime(1960, 1, 1).date() + dt.timedelta(float(x))).isoformat() if x else None)
df_Immigration = df_spark.withColumn("arrdate_todate", get_date(df_spark.arrdate)).\
                        withColumn("depdate_todate", get_date(df_spark.depdate))
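
The `arrdate` and `depdate` columns hold SAS date values, that is, day counts starting from 1960-01-01, which is what the UDF above relies on. A minimal plain-Python sketch of the same conversion follows; the helper name `sas_days_to_iso` is illustrative and not part of the project code. Unlike the UDF, it treats only `None` as missing, so a day count of 0 still maps to the epoch itself.

```python
import datetime as dt

SAS_EPOCH = dt.date(1960, 1, 1)  # SAS dates count days from this day


def sas_days_to_iso(days):
    """Convert a SAS day count to an ISO date string, or None if missing."""
    if days is None:
        return None
    return (SAS_EPOCH + dt.timedelta(days=float(days))).isoformat()


print(sas_days_to_iso(20545.0))  # 2016-04-01
print(sas_days_to_iso(None))     # None
```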

In [5]:
# Mapping Immigration Data : I94ADDR_Mapping
df_I94ADDR = spark.read.option("header",True).options(delimiter=',') \
    .csv("./I94 Immigration Data-Data-2016/mapping/I94ADDR_Mapping.csv")

# Mapping Immigration Data : I94CIT_I94RES_Mapping
df_I94CIT_I94RES = spark.read.option("header",True).options(delimiter=',') \
    .csv("./I94 Immigration Data-Data-2016/mapping/I94CIT_I94RES_Mapping.csv")

# Mapping Immigration Data : I94MODE_Mapping
df_I94MODE = spark.read.option("header",True).options(delimiter=',') \
    .csv("./I94 Immigration Data-Data-2016/mapping/I94MODE_Mapping.csv")

# Mapping Immigration Data : I94PORT_Mapping
df_I94PORT = spark.read.option("header",True).options(delimiter=',') \
    .csv("./I94 Immigration Data-Data-2016/mapping/I94PORT_Mapping.csv")

# Mapping Immigration Data : I94VISA_Mapping
df_I94VISA = spark.read.option("header",True).options(delimiter=',') \
    .csv("./I94 Immigration Data-Data-2016/mapping/I94VISA_Mapping.csv")

2. **World Temperature Data:** This dataset comes from Kaggle. You can read more about it [here](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data). It contains monthly average temperature records by city and country, stored in a 'csv' file.

In [6]:
#df_worldTemp = pd.read_csv("GlobalLandTemperaturesByCity.csv", sep=',', engine='python',nrows=10)
df_worldTemp = spark.read.option("header",True) \
    .csv("./WorldTemperature_Data/GlobalLandTemperaturesByCity.csv")

3. **U.S. City Demographic Data:** This data comes from OpenDataSoft. You can read more about it [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/). This dataset contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000. This dataset is stored as a 'csv' file.

In [7]:
#df_demog_us = pd.read_csv("us-cities-demographics.csv", sep=';', engine='python',nrows=10)
df_demog_us = spark.read.option("header",True).options(delimiter=';') \
    .csv("./U.S.City_Demographic_Data/us-cities-demographics.csv")

4. **U.S. Pollution Data:** This dataset comes from Kaggle and deals with pollution in the U.S. Pollution has been well documented by the U.S. EPA, but downloading all the data and arranging it in a format useful to data scientists is tedious. The dataset author therefore gathered four major pollutants (Nitrogen Dioxide, Sulphur Dioxide, Carbon Monoxide and Ozone) for every day from 2000 to 2016 and placed them in a single CSV file. You can read more about it [here](https://www.kaggle.com/sogun3/uspollution).

In [8]:
df_pollution = spark.read.option("header",True).options(delimiter=',') \
    .csv("./U.S.Pollution_Data/pollution_us_2000_2016.csv")

5. **Airline Database:** This dataset came from OpenFlights. As of January 2012, the OpenFlights Airlines Database contains 5888 airlines. Some of the information is public data and some is contributed by users. You can read more about it [here](https://openflights.org/data.html#airline). 

> Remark: The **Airline Database** differs from the **Airport Code Table** dataset that Udacity provided for this project; it is used instead because the Airport Code Table cannot be mapped to the I94 Immigration Data.

In [9]:
#df_airlines = pd.read_csv("./OpenFlights/airlines.dat", sep=',', engine='python',nrows=10)
df_airlines = spark.read.option("header",True).options(delimiter=',') \
    .csv("./OpenFlights/airlines.dat")
df_airlines=df_airlines.na.replace("\\N",None)

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

* Create udf_isdate function for validating date field

In [10]:
import datetime
def isdate(date_text):
    if date_text is not None:
        try:
            datetime.datetime.strptime(date_text, '%Y-%m-%d')
            return True
        except ValueError:
            return False
    else:
        return False
    
spark.udf.register('udf_isdate', isdate)

<function __main__.isdate(date_text)>

* Find missing values of Immigration_data

In [11]:
df_Immigration.createOrReplaceTempView("Immigration_data")
df_Immigration_missingvalue = spark.sql("""
                SELECT arrdate_todate, udf_isdate(arrdate_todate) as chkdate, count(*) as cnt
                FROM Immigration_data
                WHERE udf_isdate(arrdate_todate) = 'false'
                GROUP BY arrdate_todate 
                ORDER BY chkdate desc
            """)
df_Immigration_missingvalue.limit(10).toPandas()
#df_Immigration_missingvalue.printSchema()

Unnamed: 0,arrdate_todate,chkdate,cnt


* Find missing values of World Temperature Data

In [12]:
df_worldTemp.createOrReplaceTempView("WorldTemp")
df_worldTemp_missingvalue1 = spark.sql("""
                SELECT AverageTemperature, AverageTemperatureUncertainty, count(*) as cnt 
                FROM WorldTemp
                WHERE AverageTemperature IS NULL OR AverageTemperatureUncertainty IS NULL
                GROUP BY AverageTemperature, AverageTemperatureUncertainty   
                ORDER BY cnt DESC
            """)
df_worldTemp_missingvalue1.limit(10).toPandas()

Unnamed: 0,AverageTemperature,AverageTemperatureUncertainty,cnt
0,,,364130


In [13]:
df_worldTemp_missingvalue2 = spark.sql("""
                SELECT Country, count(*) as cnt 
                FROM WorldTemp
                WHERE Country IS NULL OR Country = ''
                GROUP BY Country  
                ORDER BY cnt DESC
            """)
df_worldTemp_missingvalue2.limit(10).toPandas()

Unnamed: 0,Country,cnt


* Find missing values of U.S. City Demographic Data

In [14]:
df_demog_us.createOrReplaceTempView("Demographic")
df_demog_us_missingvalue = spark.sql("""
                SELECT City , count(*) as cnt 
                FROM Demographic 
                WHERE City IS NULL OR City = '' 
                GROUP BY City 
                ORDER BY City 
            """)
df_demog_us_missingvalue.limit(10).toPandas()

Unnamed: 0,City,cnt


* Find missing values of U.S. Pollution Data

In [15]:
df_pollution.createOrReplaceTempView("Pollution")
df_pollution_missingvalue1 = spark.sql("""
                SELECT City , count(*) as cnt 
                FROM Pollution 
                WHERE City IS NULL OR City = '' 
                GROUP BY City 
                ORDER BY City 
            """)
df_pollution_missingvalue1.limit(10).toPandas()

Unnamed: 0,City,cnt


In [16]:
df_pollution_missingvalue2 = spark.sql("""
                SELECT `Date Local` AS Date_Local , udf_isdate(`Date Local`) as chkdate, count(*) as cnt 
                FROM Pollution 
                WHERE udf_isdate(`Date Local`) = 'false'
                GROUP BY Date_Local 
                ORDER BY cnt DESC 
            """)
df_pollution_missingvalue2.limit(10).toPandas()

Unnamed: 0,Date_Local,chkdate,cnt


* Find missing values of OpenFlights (airlines) Data

In [17]:
df_airlines.createOrReplaceTempView("Airline")
df_airlines_missingvalue = spark.sql("""
                SELECT IATA, count(*) as cnt   
                FROM Airline
                WHERE IATA IS NULL OR IATA = '-'
                GROUP BY IATA 
                ORDER BY cnt DESC 
            """)
df_airlines_missingvalue.limit(10).toPandas()

Unnamed: 0,IATA,cnt
0,,4626
1,-,2


* Find missing values of TIME Data

In [18]:
df_Immigration.createOrReplaceTempView("Immigration_data")
df_worldTemp.createOrReplaceTempView("WorldTemp")
df_pollution.createOrReplaceTempView("Pollution")

In [19]:
df_time_temp1 = spark.sql("""
                SELECT DISTINCT arrdate_todate AS date_ref FROM Immigration_data WHERE arrdate_todate IS NULL
                """)
df_time_temp1.limit(10).toPandas()

Unnamed: 0,date_ref


In [20]:
df_time_temp2 = spark.sql("""
                SELECT DISTINCT depdate_todate AS date_ref FROM Immigration_data WHERE depdate_todate IS NULL 
                """)
df_time_temp2.limit(10).toPandas()

Unnamed: 0,date_ref
0,


In [21]:
df_time_temp3 = spark.sql("""
                SELECT DISTINCT dt AS date_ref FROM WorldTemp WHERE dt IS NULL 
                """)
df_time_temp3.limit(10).toPandas()

Unnamed: 0,date_ref


In [22]:
df_time_temp4 = spark.sql("""
                SELECT DISTINCT `Date Local` AS date_ref FROM Pollution WHERE `Date Local` IS NULL 
                """)
df_time_temp4.limit(10).toPandas()

Unnamed: 0,date_ref


In [23]:
df_time = spark.sql("""
                SELECT DISTINCT arrdate_todate AS date_ref FROM Immigration_data WHERE arrdate_todate IS NOT NULL 
                UNION 
                SELECT DISTINCT depdate_todate AS date_ref FROM Immigration_data WHERE depdate_todate IS NOT NULL 
                UNION 
                SELECT DISTINCT dt AS date_ref FROM WorldTemp WHERE dt IS NOT NULL 
                UNION 
                SELECT DISTINCT `Date Local` AS date_ref FROM Pollution WHERE `Date Local` IS NOT NULL 

            """)

In [24]:
df_time.createOrReplaceTempView("Time")
df_time_final = spark.sql("""
            SELECT date_ref, udf_isdate(date_ref) as chkdate, count(*) AS cnt 
            FROM Time 
            WHERE udf_isdate(date_ref) = 'false'
            GROUP BY date_ref 
            ORDER BY cnt DESC 
            """)
df_time_final.limit(10).toPandas()

Unnamed: 0,date_ref,chkdate,cnt


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model:

The main table of the data model is the 'Immigration' table. It holds individual traveller records for the U.S. and will be aggregated by date and city so that the summary values can be related to the other tables. The 'Demographic' table provides population summaries by city and state for 2015. The 'Pollution' table provides air pollution measurements (NO2, O3, SO2, and CO) by city, county, state, and date from 2000 to 2016. The 'WorldTemp' table provides temperature records by city, country, and date from 1743 to 2013. The 'Airline' table provides airline information, including Name, IATA, ICAO, etc. Finally, the 'Time' table collects the date information. This model exposes the relationship between Immigration data and Pollution data, which may be one of the real factors behind environmental problems; reaching such a conclusion, however, requires more information and experimentation.

The conceptual data model is designed as follows:

![DataModels](images/CapstoneProject-DataModels.drawio.png)
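
To illustrate how the model is meant to be used, the sketch below aggregates individual arrivals to one count per date and city and then joins that summary to pollution readings on the same key. It is plain Python with made-up sample values, not the project's Spark code, and the field names are chosen for illustration only.

```python
from collections import Counter

# Made-up sample rows: (arrival_date, port_city) per traveller
arrivals = [
    ("2016-04-01", "MIAMI"),
    ("2016-04-01", "MIAMI"),
    ("2016-04-01", "CHICAGO"),
]

# Made-up daily NO2 means keyed by (date, city)
pollution = {
    ("2016-04-01", "MIAMI"): 12.5,
    ("2016-04-01", "CHICAGO"): 20.1,
}

# Aggregate individual arrivals to one count per (date, city)
arrival_counts = Counter(arrivals)

# Join the two sides on the shared (date, city) key
joined = [
    {"date": date, "city": city, "arrivals": count, "no2_mean": pollution[(date, city)]}
    for (date, city), count in sorted(arrival_counts.items())
    if (date, city) in pollution
]
for row in joined:
    print(row)
```

In the data lake the same shape would come from a `GROUP BY` on the Immigration table followed by a join to the Pollution table.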





#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data (Data Lakes) into the chosen data model:

1. Data Source is exported and placed in AWS S3
2. Read Data files from AWS S3
3. Run ETL process with Spark
4. Write output data from ETL process to AWS S3

The pipeline looks like the diagram below:

![Pipeline](images/CapstoneProject-Pipeline.drawio.png)

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

* Create the data model of Immigration

In [35]:
df_Immigration.createOrReplaceTempView("Immigration_data")
df_I94ADDR.createOrReplaceTempView("I94ADDR_Mapping")
df_I94CIT_I94RES.createOrReplaceTempView("I94CIT_I94RES_Mapping")
df_I94MODE.createOrReplaceTempView("I94MODE_Mapping")
df_I94PORT.createOrReplaceTempView("I94PORT_Mapping")
df_I94VISA.createOrReplaceTempView("I94VISA_Mapping")
df_Immigration_final = spark.sql("""

            SELECT DISTINCT cicid, i94yr, i94mon, I94CIT_I94RES1.i94cntyl as i94cit, I94CIT_I94RES2.i94cntyl as i94res, 
                trim(i94prtl) as i94port, trim(SPLIT(i94prtl,',')[0]) as i94port_city, trim(SPLIT(i94prtl,',')[1]) as i94port_statecode,
                arrdate_todate, i94model as i94mode, i94addrl as i94addr, depdate_todate, i94bir, 
                I94VISA.I94VISA as i94visa, biryear, gender, airline, fltno   
            FROM Immigration_data AS IMM 
            LEFT JOIN I94ADDR_Mapping AS I94ADDR on IMM.i94addr = I94ADDR.value 
            LEFT JOIN I94CIT_I94RES_Mapping AS I94CIT_I94RES1 on IMM.i94cit = I94CIT_I94RES1.value 
            LEFT JOIN I94CIT_I94RES_Mapping AS I94CIT_I94RES2 on IMM.i94res = I94CIT_I94RES2.value 
            LEFT JOIN I94MODE_Mapping AS I94MODE on IMM.i94mode = I94MODE.value 
            LEFT JOIN I94PORT_Mapping AS I94PORT on IMM.i94port = I94PORT.value 
            LEFT JOIN I94VISA_Mapping AS I94VISA on IMM.i94visa = I94VISA.value 
                
            """)
df_Immigration_final.limit(10).toPandas()
#df_Immigration_final.printSchema()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,i94port_city,i94port_statecode,arrdate_todate,i94mode,i94addr,depdate_todate,i94bir,i94visa,biryear,gender,airline,fltno
0,161.0,2016.0,4.0,AUSTRIA,AUSTRIA,"NEWARK/TETERBORO, NJ",NEWARK/TETERBORO,NJ,2016-04-01,Air,NEW YORK,2016-04-05,24.0,Pleasure,1992.0,M,OS,89
1,166.0,2016.0,4.0,AUSTRIA,AUSTRIA,"FORT LAUDERDALE, FL",FORT LAUDERDALE,FL,2016-04-01,Air,FLORIDA,2016-04-09,29.0,Pleasure,1987.0,M,DY,7031
2,345.0,2016.0,4.0,AUSTRIA,AUSTRIA,"LOS ANGELES, CA",LOS ANGELES,CA,2016-04-01,Air,CALIFORNIA,2016-04-23,35.0,Pleasure,1981.0,F,AA,109
3,455.0,2016.0,4.0,AUSTRIA,AUSTRIA,"MIAMI, FL",MIAMI,FL,2016-04-01,Air,FLORIDA,2016-04-17,67.0,Pleasure,1949.0,,AA,113
4,766.0,2016.0,4.0,AUSTRIA,AUSTRIA,"CHICAGO, IL",CHICAGO,IL,2016-04-01,Air,ILLINOIS,2016-04-05,31.0,Pleasure,1985.0,,OS,65
5,1457.0,2016.0,4.0,BELGIUM,BELGIUM,"NEW YORK, NY",NEW YORK,NY,2016-04-01,Air,NEW YORK,2016-04-07,14.0,Pleasure,2002.0,F,DL,47
6,1535.0,2016.0,4.0,BELGIUM,BELGIUM,"NEW YORK, NY",NEW YORK,NY,2016-04-01,Air,NEW YORK,2016-04-02,45.0,Pleasure,1971.0,F,SN,1401
7,1643.0,2016.0,4.0,BELGIUM,BELGIUM,"MIAMI, FL",MIAMI,FL,2016-04-01,Air,FLORIDA,2016-04-10,28.0,Pleasure,1988.0,F,AB,7000
8,1654.0,2016.0,4.0,BELGIUM,BELGIUM,"MIAMI, FL",MIAMI,FL,2016-04-01,Air,FLORIDA,2016-04-17,65.0,Pleasure,1951.0,M,AA,39
9,1724.0,2016.0,4.0,BELGIUM,BELGIUM,"MIAMI, FL",MIAMI,FL,2016-04-01,Sea,,,30.0,Pleasure,1986.0,F,VES,91285
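
The `i94port_city` and `i94port_statecode` columns above come from splitting the port label on its comma and trimming each part. A plain-Python sketch of that split, with a hypothetical helper name, shows the intended result; labels without a comma get `None` for the state code here, which is an assumption about how such rows should be treated.

```python
def split_port(label):
    """Split an I94 port label such as 'MIAMI, FL' into (city, state_code)."""
    parts = [part.strip() for part in label.split(",")]
    city = parts[0]
    # Labels without a comma have no state part; represent that as None
    state_code = parts[1] if len(parts) > 1 else None
    return city, state_code


print(split_port("NEWARK/TETERBORO, NJ"))  # ('NEWARK/TETERBORO', 'NJ')
print(split_port("MIAMI, FL"))             # ('MIAMI', 'FL')
```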


* Create the data model of World Temperature Data

In [36]:
df_worldTemp.createOrReplaceTempView("WorldTemp")
df_worldTemp_final = spark.sql("""
            SELECT DISTINCT dt as Record_Date, AverageTemperature, AverageTemperatureUncertainty, UPPER(City) as City, UPPER(Country) as Country  
            FROM WorldTemp 
            WHERE AverageTemperature IS NOT NULL OR AverageTemperatureUncertainty IS NOT NULL 
            """)
df_worldTemp_final.limit(10).toPandas()
#df_worldTemp_final.printSchema()

Unnamed: 0,Record_Date,AverageTemperature,AverageTemperatureUncertainty,City,Country
0,1765-11-01,3.927,4.388,ÅRHUS,DENMARK
1,1783-09-01,14.339,4.867,ÅRHUS,DENMARK
2,1824-05-01,10.503,1.991,ÅRHUS,DENMARK
3,1831-12-01,2.9210000000000003,4.582,ÅRHUS,DENMARK
4,1864-07-01,16.343,0.7609999999999999,ÅRHUS,DENMARK
5,1889-11-01,4.981,0.953,ÅRHUS,DENMARK
6,1901-04-01,6.261,0.505,ÅRHUS,DENMARK
7,1901-06-01,14.654000000000002,0.516,ÅRHUS,DENMARK
8,1905-02-01,1.7880000000000005,0.461,ÅRHUS,DENMARK
9,1940-01-01,-4.0,0.3329999999999999,ÅRHUS,DENMARK


* Create the data model of U.S. City Demographic Data

In [37]:
df_demog_us.createOrReplaceTempView("Demographic")
df_demog_us_final = spark.sql("""
            SELECT DISTINCT UPPER(City) as City, 
                UPPER(State) as State, 
                `Median Age` AS Median_Age, 
                `Male Population` AS Male_Population, 
                `Female Population` AS Female_Population, 
                `Total Population` AS Total_Population, 
                `Number of Veterans` AS NumberofVeterans, 
                `Foreign-born` AS Foreign_born, 
                `Average Household Size` AS Average_Household_Size, 
                `State Code` AS State_Code, 
                Race, 
                Count
            FROM Demographic  
                
            """)
df_demog_us_final.limit(10).toPandas()
#df_demog_us_final.printSchema()

Unnamed: 0,City,State,Median_Age,Male_Population,Female_Population,Total_Population,NumberofVeterans,Foreign_born,Average_Household_Size,State_Code,Race,Count
0,HAMPTON,VIRGINIA,35.5,66214,70240,136454,19638,6204,2.48,VA,Hispanic or Latino,7513
1,BISMARCK,NORTH DAKOTA,38.0,34675,35565,70240,4145,2064,2.11,ND,American Indian and Alaska Native,4040
2,SAINT PAUL,MINNESOTA,31.5,149547,151293,300840,10548,56514,2.58,MN,Hispanic or Latino,27307
3,EL MONTE,CALIFORNIA,37.3,57961,58784,116745,1686,59655,3.67,CA,White,64138
4,MURFREESBORO,TENNESSEE,30.2,60704,65417,126121,5199,8948,2.6,TN,Hispanic or Latino,8840
5,CARSON,CALIFORNIA,40.4,43790,49506,93296,4273,33860,3.7,CA,Black or African-American,19947
6,PORT SAINT LUCIE,FLORIDA,42.1,84069,95341,179410,12416,34003,2.91,FL,American Indian and Alaska Native,1536
7,SANTA ANA,CALIFORNIA,30.8,167503,167920,335423,4735,152999,4.58,CA,Hispanic or Latino,262436
8,CORONA,CALIFORNIA,37.1,79749,84493,164242,7709,42131,2.97,CA,Asian,21044
9,JERSEY CITY,NEW JERSEY,34.3,131765,132512,264277,4374,109186,2.57,NJ,American Indian and Alaska Native,3356


* Create the data model of U.S. Pollution Data

In [38]:
df_pollution.createOrReplaceTempView("Pollution")
df_pollution_final = spark.sql("""
            SELECT DISTINCT _c0 AS No,
                `State Code` AS State_Code,
                `County Code` AS County_Code,
                `Site Num` AS Site_Num,
                `Address` AS Address,
                UPPER(State) AS State,
                UPPER(County) AS County,
                UPPER(City) AS City,
                `Date Local` AS Date_Local,
                `NO2 Units` AS NO2_Units,
                DOUBLE(`NO2 Mean`) AS NO2_Mean,
                DOUBLE(`NO2 1st Max Value`) AS NO2_1st_Max_Value,
                DOUBLE(`NO2 1st Max Hour`) AS NO2_1st_Max_Hour,
                DOUBLE(`NO2 AQI`) AS NO2_AQI,
                `O3 Units` AS O3_Units,
                DOUBLE(`O3 Mean`) AS O3_Mean,
                DOUBLE(`O3 1st Max Value`) AS O3_1st_Max_Value,
                DOUBLE(`O3 1st Max Hour`) AS O3_1st_Max_Hour,
                DOUBLE(`O3 AQI`) AS O3_AQI,
                `SO2 Units` AS SO2_Units,
                DOUBLE(`SO2 Mean`) AS SO2_Mean,
                DOUBLE(`SO2 1st Max Value`) AS SO2_1st_Max_Value,
                DOUBLE(`SO2 1st Max Hour`) AS SO2_1st_Max_Hour,
                DOUBLE(`SO2 AQI`) AS SO2_AQI,
                `CO Units` AS CO_Units,
                DOUBLE(`CO Mean`) AS CO_Mean,
                DOUBLE(`CO 1st Max Value`) AS CO_1st_Max_Value,
                DOUBLE(`CO 1st Max Hour`) AS CO_1st_Max_Hour,
                DOUBLE(`CO AQI`) AS CO_AQI
            FROM Pollution  
                
            """)

df_pollution_final = df_pollution_final.na.drop()
df_pollution_final.limit(10).toPandas()
#df_pollution_final.printSchema()

Unnamed: 0,No,State_Code,County_Code,Site_Num,Address,State,County,City,Date_Local,NO2_Units,...,SO2_Units,SO2_Mean,SO2_1st_Max_Value,SO2_1st_Max_Hour,SO2_AQI,CO_Units,CO_Mean,CO_1st_Max_Value,CO_1st_Max_Hour,CO_AQI
0,101,4,13,3002,1645 E ROOSEVELT ST-CENTRAL PHOENIX STN,ARIZONA,MARICOPA,PHOENIX,2000-01-26,Parts per billion,...,Parts per billion,3.363636,7.0,1.0,10.0,Parts per million,0.866667,1.4,0.0,16.0
1,105,4,13,3002,1645 E ROOSEVELT ST-CENTRAL PHOENIX STN,ARIZONA,MARICOPA,PHOENIX,2000-01-27,Parts per billion,...,Parts per billion,3.684211,11.0,8.0,16.0,Parts per million,0.925,1.6,23.0,18.0
2,349,4,13,3002,1645 E ROOSEVELT ST-CENTRAL PHOENIX STN,ARIZONA,MARICOPA,PHOENIX,2000-03-28,Parts per billion,...,Parts per billion,0.226087,0.8,21.0,0.0,Parts per million,0.508333,0.7,12.0,8.0
3,705,4,13,3002,1645 E ROOSEVELT ST-CENTRAL PHOENIX STN,ARIZONA,MARICOPA,PHOENIX,2000-07-03,Parts per billion,...,Parts per billion,0.272727,1.0,4.0,1.0,Parts per million,0.495833,0.7,4.0,8.0
4,849,4,13,3002,1645 E ROOSEVELT ST-CENTRAL PHOENIX STN,ARIZONA,MARICOPA,PHOENIX,2000-08-10,Parts per billion,...,Parts per billion,3.434783,9.0,22.0,13.0,Parts per million,0.3125,0.6,23.0,7.0
5,917,4,13,3002,1645 E ROOSEVELT ST-CENTRAL PHOENIX STN,ARIZONA,MARICOPA,PHOENIX,2000-08-27,Parts per billion,...,Parts per billion,0.25,2.0,1.0,3.0,Parts per million,0.408333,0.5,0.0,6.0
6,2969,4,19,1011,"1237 S. BEVERLY , TUCSON",ARIZONA,PIMA,TUCSON,2000-08-14,Parts per billion,...,Parts per billion,2.0,2.0,0.0,3.0,Parts per million,0.266667,0.3,0.0,3.0
7,5741,6,13,1002,5551 BETHEL ISLAND RD,CALIFORNIA,CONTRA COSTA,BETHEL ISLAND,2000-07-24,Parts per billion,...,Parts per billion,3.136364,5.0,11.0,7.0,Parts per million,0.333333,0.4,1.0,5.0
8,8061,6,13,3001,"583 W. 10TH ST., PITTSBURG",CALIFORNIA,CONTRA COSTA,PITTSBURG,2000-02-23,Parts per billion,...,Parts per billion,1.428571,9.0,9.0,13.0,Parts per million,0.8,0.8,19.0,9.0
9,8485,6,13,3001,"583 W. 10TH ST., PITTSBURG",CALIFORNIA,CONTRA COSTA,PITTSBURG,2000-06-10,Parts per billion,...,Parts per billion,0.681818,2.0,11.0,3.0,Parts per million,0.3,0.3,0.0,3.0


* Create the data model of OpenFlights (airlines) Data

In [39]:
df_airlines.createOrReplaceTempView("Airline")
df_airlines_final = spark.sql("""
            SELECT DISTINCT AirlineID AS Airline_ID, 
                Name, 
                Alias, 
                IATA,
                ICAO,
                Callsign,
                UPPER(Country) AS Country,
                Active  
            FROM Airline
            WHERE IATA IS NOT NULL AND IATA <> '-'

            """)
df_airlines_final.limit(10).toPandas()
#df_airlines_final.printSchema()

Unnamed: 0,Airline_ID,Name,Alias,IATA,ICAO,Callsign,Country,Active
0,1516,BAX Global,,8W,,,,N
1,3186,Kyrgyzstan Airlines,,R8,KGA,KYRGYZ,KYRGYZSTAN,N
2,3274,Linea Turistica Aerotuy,,LD,TUY,AEREOTUY,VENEZUELA,N
3,3287,Linhas A,,LM,LAM,MOZAMBIQUE,MOZAMBIQUE,Y
4,3835,PB Air,,9Q,PBA,PEEBEE AIR,THAILAND,Y
5,4356,Sun Country Airlines,,SY,SCX,SUN COUNTRY,UNITED STATES,Y
6,9082,Uni Air,,B7,UIA,Glory,TAIWAN,Y
7,11948,Viking Hellas,,VQ,VKH,DELPHI,GREECE,Y
8,15930,Airlink (SAA),,4Z,,,SOUTH AFRICA,Y
9,15970,Zuliana de Aviacion,Zuliana,OD,ULA,,VENEZUELA,N
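
A filter meant to drop both missing (NULL) and placeholder (`-`) IATA codes needs its two conditions joined with `AND`; with `OR`, the `-` rows still pass because they satisfy `IS NOT NULL`. The sketch below shows the difference with Python's built-in `sqlite3` and made-up rows. SQLite is only a stand-in here; for these predicates it follows the same three-valued NULL logic as Spark SQL.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE Airline (Name TEXT, IATA TEXT)")
conn.executemany(
    "INSERT INTO Airline VALUES (?, ?)",
    [("Has code", "AA"), ("Dash placeholder", "-"), ("Missing code", None)],
)

# OR: the '-' row passes because it satisfies IATA IS NOT NULL
with_or = conn.execute(
    "SELECT Name FROM Airline WHERE IATA IS NOT NULL OR IATA <> '-' ORDER BY Name"
).fetchall()

# AND: both the NULL row and the '-' row are filtered out
with_and = conn.execute(
    "SELECT Name FROM Airline WHERE IATA IS NOT NULL AND IATA <> '-' ORDER BY Name"
).fetchall()

print(with_or)   # [('Dash placeholder',), ('Has code',)]
print(with_and)  # [('Has code',)]
```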


* Create the data model of TIME Data

In [47]:
df_Immigration.createOrReplaceTempView("Immigration_data")
df_worldTemp.createOrReplaceTempView("WorldTemp")
df_pollution.createOrReplaceTempView("Pollution")

df_time_final = spark.sql("""
                SELECT DISTINCT arrdate_todate AS date_ref FROM Immigration_data WHERE arrdate_todate IS NOT NULL 
                UNION 
                SELECT DISTINCT depdate_todate AS date_ref FROM Immigration_data WHERE depdate_todate IS NOT NULL 
                UNION 
                SELECT DISTINCT dt AS date_ref FROM WorldTemp WHERE dt IS NOT NULL 
                UNION 
                SELECT DISTINCT `Date Local` AS date_ref FROM Pollution WHERE `Date Local` IS NOT NULL 
            """)
df_time_final.limit(10).toPandas()

Unnamed: 0,date_ref
0,2016-08-17
1,1820-12-01
2,1839-05-01
3,1853-08-01
4,1892-02-01
5,1893-09-01
6,1924-07-01
7,1955-03-01
8,1992-05-01
9,1994-03-01


* Extract columns to create time table

In [48]:
df_time_final = df_time_final.withColumn('day',dayofmonth(df_time_final.date_ref)) \
    .withColumn('week',weekofyear(df_time_final.date_ref)) \
    .withColumn('month',month(df_time_final.date_ref)) \
    .withColumn('year',year(df_time_final.date_ref)) \
    .withColumn('weekday',dayofweek(df_time_final.date_ref)) 
df_time_final.limit(10).toPandas()
#df_time_final.printSchema()

Unnamed: 0,date_ref,day,week,month,year,weekday
0,2016-08-17,17,33,8,2016,4
1,1820-12-01,1,48,12,1820,6
2,1839-05-01,1,18,5,1839,4
3,1853-08-01,1,31,8,1853,2
4,1892-02-01,1,5,2,1892,2
5,1893-09-01,1,35,9,1893,6
6,1924-07-01,1,27,7,1924,3
7,1955-03-01,1,9,3,1955,3
8,1992-05-01,1,18,5,1992,6
9,1994-03-01,1,9,3,1994,3
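
For reference, the derived columns can be reproduced in plain Python. In the output above, `week` matches the ISO 8601 week number and `weekday` counts from Sunday = 1 to Saturday = 7. The helper name `time_parts` is illustrative only.

```python
import datetime as dt


def time_parts(date_ref):
    """Derive calendar fields from an ISO date string (YYYY-MM-DD)."""
    day = dt.date.fromisoformat(date_ref)
    return {
        "day": day.day,
        "week": day.isocalendar()[1],         # ISO 8601 week number
        "month": day.month,
        "year": day.year,
        "weekday": day.isoweekday() % 7 + 1,  # 1 = Sunday ... 7 = Saturday
    }


print(time_parts("2016-08-17"))
# {'day': 17, 'week': 33, 'month': 8, 'year': 2016, 'weekday': 4}
```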


#### 4.2 How to run the Python Scripts

**Run on AWS EMR**

1. Create an AWS EMR cluster by following these links:
    - [Udacity: AWS CLI - Create EMR Cluster](https://classroom.udacity.com/nanodegrees/nd027/parts/67bd4916-3fc3-4474-b0fd-197dc014e709/modules/3671171a-8bf9-4f75-a913-37439ad281b3/lessons/f08d8ac9-b44a-4717-9da1-9714829da6c9/concepts/9c346cbc-5cf9-4e76-bdf5-a81ce94b55fe)
    - [Udacity: How to execute ETL of Data Lake project on Spark Cluster in AWS?](https://knowledge.udacity.com/questions/46619#552992)

    Or create the AWS EMR cluster with the AWS CLI, as in the example below:

    ``` shell
    aws emr create-cluster --name <INPUT_NAME> --use-default-roles --release-label emr-5.34.0 --instance-count 3 --applications Name=Spark Name=JupyterEnterpriseGateway --ec2-attributes KeyName=<INPUT_KEYNAME>,SubnetId=<INPUT_SUBNET> --instance-type m5.xlarge --region us-west-2 --profile <INPUT_PROFILE>
    
    # Example:
    aws emr create-cluster --name emr-cluster-capstone-project-13 --use-default-roles --release-label emr-5.34.0 --instance-count 3 --applications Name=Spark Name=JupyterEnterpriseGateway --ec2-attributes KeyName=emr-cluster-demo_4,SubnetId=subnet-0f50ed7eb6774646a --instance-type m5.xlarge --region us-west-2 --profile cnpawsadmin
    
    # And, you will receive result messages as below:
    
    {
    "ClusterId": "j-2GMHZ63X7KGYA",
    "ClusterArn": "arn:aws:elasticmapreduce:us-west-2:453256782086:cluster/j-2GMHZ63X7KGYA"
    }
    
    ```
    

2. Connect to the AWS EMR cluster with SSH by following this guide: [Connect to the master node using SSH](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html)

3. Run the following commands to set the AWS credentials and install pandas on the EMR cluster.

    ``` bash
    export AWS_ACCESS_KEY_ID=<input_your_key_id>
    export AWS_SECRET_ACCESS_KEY=<input_your_access_key>
    pip3 install pandas
    ```


4. Open **Capstone Project_ETL.py**, set the `output_data` value in the `main()` function to **your S3 URI** so that the process output is collected there, and save the file.


5. Upload or create the **config.cfg** and **Capstone Project_ETL.py** files on the EMR cluster.


6. Open **config.cfg** and set the `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` values in the `[AWS]` section to your **AWS Access Key ID** and **AWS Secret Access Key**, respectively.


7. Run the **Capstone Project_ETL.py** script on the EMR cluster; it reads data from S3, processes it with Spark, and writes the results back to S3.

    ``` bash
    /home/workspace# which spark-submit
    /usr/bin/spark-submit
    /home/workspace# /usr/bin/spark-submit --packages saurfang:spark-sas7bdat:2.0.0-s_2.11 --master yarn "CapstoneProject_ETL.py"
    ```
    
8. After the script succeeds, you may want to terminate the AWS EMR cluster. Use the AWS CLI command below to terminate it:

    ``` bash
    aws emr terminate-clusters --cluster-ids <INPUT_CLUSTERID> --region us-west-2
    
    ```
    
> _INPUT_CLUSTERID_: the ClusterId from the output message returned when the AWS EMR cluster was created (Step 1).
    

#### 4.3 Data Quality Checks
The following data quality checks are performed to ensure the pipeline ran as expected:

_Run Quality Checks_

In [2]:
# Prepare config and create spark session
config = configparser.ConfigParser()
config.read('config.cfg')
config.sections()

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

spark2 = SparkSession \
    .builder \
    .appName("Udacity_DataEngineering-CapstoneProject_Phakphoom") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.3") \
    .config("spark.hadoop.fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.access.key", os.environ['AWS_ACCESS_KEY_ID']) \
    .config("spark.hadoop.fs.s3a.secret.key", os.environ['AWS_SECRET_ACCESS_KEY']) \
    .config("spark.hadoop.fs.s3a.multiobjectdelete.enable","false") \
    .getOrCreate()
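
The cell above expects **config.cfg** to contain an `[AWS]` section with the two credential keys. A minimal sketch of that layout, parsed from an inline string with placeholder values instead of a real file:

```python
import configparser

# Placeholder values only; real keys should never be committed to a repository
example_cfg = """
[AWS]
AWS_ACCESS_KEY_ID = <your_key_id>
AWS_SECRET_ACCESS_KEY = <your_secret_key>
"""

config = configparser.ConfigParser()
config.read_string(example_cfg)

print(config.sections())                   # ['AWS']
print(config["AWS"]["AWS_ACCESS_KEY_ID"])  # <your_key_id>
```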

* Integrity constraints

```python
immigration_df = spark2.read.parquet('s3a://capstoneproject-phakphoom/Capstone_Project/Datalake_Target/Immigration/*/*/*.parquet')
immigration_df.createOrReplaceTempView("Immigration")
df_immigration_check = spark2.sql("""
    SELECT cicid
    FROM Immigration WHERE cicid IS NULL
""")
df_immigration_check.limit(10).toPandas()
```

Result: 0 rows (no NULL values in `cicid`).


```python
demographic_df = spark2.read.parquet('s3a://capstoneproject-phakphoom/Capstone_Project/Datalake_Target/Demographic/*.parquet')
demographic_df.createOrReplaceTempView("Demographic")
df_demographic_check = spark2.sql("""
    SELECT City
    FROM Demographic WHERE City IS NULL
""")
df_demographic_check.limit(10).toPandas()
```

Result: 0 rows (no NULL values in `City`).


```python
with active_session():
    pollution_df = spark2.read.parquet('s3a://capstoneproject-phakphoom/Capstone_Project/Datalake_Target/Pollution/*/*/*.parquet')
    pollution_df.createOrReplaceTempView("Pollution")
    df_pollution_check = spark2.sql("""
        SELECT No, City, Date_Local
        FROM Pollution WHERE No IS NULL OR City IS NULL OR Date_Local IS NULL
    """)
    display(df_pollution_check.limit(10).toPandas())
```

Result: 0 rows (no NULL values in `No`, `City`, or `Date_Local`).


```python
worldtemp_df = spark2.read.parquet('s3a://capstoneproject-phakphoom/Capstone_Project/Datalake_Target/WorldTemp/*.parquet')
worldtemp_df.createOrReplaceTempView("WorldTemp")
df_worldtemp_check = spark2.sql("""
    SELECT Record_Date, City
    FROM WorldTemp WHERE Record_Date IS NULL OR City IS NULL
""")
df_worldtemp_check.limit(10).toPandas()
```

Result: 0 rows (no NULL values in `Record_Date` or `City`).


```python
airline_df = spark2.read.parquet('s3a://capstoneproject-phakphoom/Capstone_Project/Datalake_Target/Airline/*.parquet')
airline_df.createOrReplaceTempView("Airline")
df_airline_check = spark2.sql("""
    SELECT Airline_ID, IATA
    FROM Airline WHERE Airline_ID IS NULL OR IATA IS NULL
""")
df_airline_check.limit(10).toPandas()
```

Result: 0 rows (no NULL values in `Airline_ID` or `IATA`).


```python
with active_session():
    time_df = spark2.read.parquet('s3a://capstoneproject-phakphoom/Capstone_Project/Datalake_Target/Time/*/*/*.parquet')
    time_df.createOrReplaceTempView("Time")
    df_time_check = spark2.sql("""
        SELECT date_ref
        FROM Time WHERE date_ref IS NULL
    """)
    display(df_time_check.limit(10).toPandas())
```

Result: 0 rows (no NULL values in `date_ref`).
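
The six integrity checks above share one pattern: select the key columns of a table and keep only rows where any of them is NULL. A minimal sketch of that pattern as a reusable helper follows. The function name `build_null_check_sql` and the `KEY_COLUMNS` mapping are introduced here for illustration and are not part of the project code; the table and column names are taken from the checks above.

```python
def build_null_check_sql(table, key_columns):
    """Return a query that selects rows where any key column is NULL."""
    if not key_columns:
        raise ValueError("key_columns must not be empty")
    columns = ", ".join(key_columns)
    condition = " OR ".join("{} IS NULL".format(c) for c in key_columns)
    return "SELECT {} FROM {} WHERE {}".format(columns, table, condition)


# Key columns per table, as used in the checks above.
KEY_COLUMNS = {
    "Immigration": ["cicid"],
    "Demographic": ["City"],
    "Pollution": ["No", "City", "Date_Local"],
    "WorldTemp": ["Record_Date", "City"],
    "Airline": ["Airline_ID", "IATA"],
    "Time": ["date_ref"],
}

for table, columns in KEY_COLUMNS.items():
    query = build_null_check_sql(table, columns)
    print(query)
    # With a live session and registered temp views, the check could be:
    # assert spark2.sql(query).count() == 0, "NULL keys found in " + table
```

Generating the queries from one mapping keeps the list of key columns in a single place, so a new table only needs one new entry.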


* Data count records

The data in "Immigration" table have 40790529 record(s)

The data in "Demographic" table have 2891 record(s)

The data in "Pollution" table have 436876 record(s)

The data in "WorldTemp" table have 8235082 record(s)

The data in "Airline" table have 1536 record(s)

The data in "Time" table have 9112 record(s)

```python
with active_session():
    print('\n***** Summary records each tables *****')
    datalake_path = 's3a://capstoneproject-phakphoom/Capstone_Project/Datalake_Target'
    # (table name, Parquet glob); partitioned tables need two extra directory levels
    tables = [
        ('Immigration', 'Immigration/*/*/*.parquet'),
        ('Demographic', 'Demographic/*.parquet'),
        ('Pollution', 'Pollution/*/*/*.parquet'),
        ('WorldTemp', 'WorldTemp/*.parquet'),
        ('Airline', 'Airline/*.parquet'),
        ('Time', 'Time/*/*/*.parquet'),
    ]
    for table_name, parquet_glob in tables:
        record_count = spark2.read.parquet('{}/{}'.format(datalake_path, parquet_glob)).count()
        print('The data in "{}" table have {} record(s)'.format(table_name, record_count))
```


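
A record count is only a quality check once it is compared against an expectation. The sketch below shows one simple way to do that, assuming the only requirement is that every table is non-empty. The helper `find_empty_tables` is hypothetical and not part of the project code; the counts are the ones reported above.

```python
def find_empty_tables(record_counts):
    """Return the names of tables whose record count is zero or negative."""
    return [name for name, count in record_counts.items() if count <= 0]


# Record counts reported by the data lake after the ETL run.
record_counts = {
    "Immigration": 40790529,
    "Demographic": 2891,
    "Pollution": 436876,
    "WorldTemp": 8235082,
    "Airline": 1536,
    "Time": 9112,
}

empty_tables = find_empty_tables(record_counts)
if empty_tables:
    raise ValueError("Data quality check failed, empty tables: {}".format(empty_tables))
print("Data quality check passed: all tables contain records")
```

A stricter variant could compare each count with the number of rows read from the source files, if those figures are recorded during the ETL run.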


#### 4.4 Query Example for Analytics

```python
# Imports needed by the window calculations below
from pyspark.sql import Window
from pyspark.sql import functions as F

with active_session():
    immigration_df = spark2.read.parquet('s3a://capstoneproject-phakphoom/Capstone_Project/Datalake_Target/Immigration/year=2016/month=1/*.parquet')
    immigration_df.createOrReplaceTempView("Immigration")

    demographic_df = spark2.read.parquet('s3a://capstoneproject-phakphoom/Capstone_Project/Datalake_Target/Demographic/*.parquet')
    demographic_df.createOrReplaceTempView("Demographic")

    pollution_df = spark2.read.parquet('s3a://capstoneproject-phakphoom/Capstone_Project/Datalake_Target/Pollution/year=2016/month=1/*.parquet')
    pollution_df.createOrReplaceTempView("Pollution")

    # Daily arrivals and departures per port, joined with city population and average AQI
    df_query1 = spark2.sql("""
        SELECT *,
            cntarr - cntdep AS SUMM_IO
        FROM
        (
            SELECT Demog.city, Demog.state, INT(Demog.total_population) AS total_population,
                    Immig.date_arr, Immig.date_dep,
                        CASE WHEN Immig.cntarr IS NULL THEN INT(0) ELSE INT(Immig.cntarr) END AS cntarr,
                        CASE WHEN Immig.cntdep IS NULL THEN INT(0) ELSE INT(Immig.cntdep) END AS cntdep,
                    Pollu.NO2_AQI, Pollu.O3_AQI, Pollu.SO2_AQI, Pollu.CO_AQI
            FROM
                (SELECT city, state, total_population FROM Demographic GROUP BY city, state, total_population) AS Demog
                JOIN
                (
                    SELECT date_arr, date_dep, immigration_arr.i94port_city, immigration_arr.i94port_statecode, cntarr, cntdep
                    FROM
                        (SELECT arrdate_todate as date_arr,
                            i94port, i94port_city, i94port_statecode,
                            COUNT(*) as cntarr
                        FROM Immigration
                        GROUP BY arrdate_todate, i94port, i94port_city, i94port_statecode) as immigration_arr
                    FULL OUTER JOIN
                        (SELECT depdate_todate as date_dep,
                            i94port, i94port_city, i94port_statecode,
                            COUNT(*) as cntdep
                        FROM Immigration
                        GROUP BY depdate_todate, i94port, i94port_city, i94port_statecode) as immigration_dep
                    ON date_arr = date_dep AND immigration_arr.i94port = immigration_dep.i94port
                ) AS Immig
                ON Demog.city = Immig.i94port_city
                JOIN
                (
                    SELECT City, Date_Local, AVG(NO2_AQI) AS NO2_AQI, AVG(O3_AQI) AS O3_AQI, AVG(SO2_AQI) AS SO2_AQI, AVG(CO_AQI) AS CO_AQI
                    FROM Pollution GROUP BY City, Date_Local
                ) as Pollu
                ON Demog.city = Pollu.City AND Immig.i94port_city = Pollu.City AND Immig.date_arr = Pollu.Date_Local
        ) AS MAIN
        WHERE MAIN.city = 'TUCSON'
        ORDER BY MAIN.date_arr
        """)

    # Cumulative net arrivals per city, ordered by arrival date
    windowval = (Window.partitionBy('city').orderBy('date_arr')
                 .rangeBetween(Window.unboundedPreceding, 0))
    df_query1_cumsum = df_query1.withColumn('CUM_IO', F.sum('SUMM_IO').over(windowval))
    df_query1_cumsum_totalpopu = df_query1_cumsum.withColumn(
        'Current_population', F.col('total_population') + F.col('CUM_IO'))

    # Day-over-day change and rate of change (%) of NO2 AQI; NULL results become 0
    pollution_window = Window.partitionBy('city').orderBy('date_arr')
    diff_no2 = F.col('NO2_AQI') - F.col('prev_NO2_AQI')
    roc_no2 = F.col('diff_NO2_AQI') / F.col('prev_NO2_AQI') * 100
    df_query1_cumsum_totalpopu_pollu = (
        df_query1_cumsum_totalpopu
        .withColumn('prev_NO2_AQI', F.lag('NO2_AQI').over(pollution_window))
        .withColumn('diff_NO2_AQI', F.when(F.isnull(diff_no2), 0).otherwise(diff_no2))
        .withColumn('ROC_NO2_AQI', F.when(F.isnull(roc_no2), 0).otherwise(roc_no2))
    )

    display(df_query1_cumsum_totalpopu_pollu.toPandas())
```

| | city | state | total_population | date_arr | date_dep | cntarr | cntdep | NO2_AQI | O3_AQI | SO2_AQI | CO_AQI | SUMM_IO | CUM_IO | Current_population | prev_NO2_AQI | diff_NO2_AQI | ROC_NO2_AQI |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | TUCSON | ARIZONA | 531674 | 2016-01-01 | | 63 | 0 | 29.0 | 27.0 | 0.0 | 7.0 | 63 | 63 | 531737 | | 0.0 | 0.0 |
| 1 | TUCSON | ARIZONA | 531674 | 2016-01-02 | | 84 | 0 | 20.0 | 28.0 | 0.0 | 6.0 | 84 | 147 | 531821 | 29.0 | -9.0 | -31.034483 |
| 2 | TUCSON | ARIZONA | 531674 | 2016-01-03 | 2016-01-03 | 116 | 3 | 16.0 | 28.0 | 0.0 | 5.0 | 113 | 260 | 531934 | 20.0 | -4.0 | -20.0 |
| 3 | TUCSON | ARIZONA | 531674 | 2016-01-04 | | 107 | 0 | 13.0 | 22.0 | 0.0 | 5.0 | 107 | 367 | 532041 | 16.0 | -3.0 | -18.75 |
| 4 | TUCSON | ARIZONA | 531674 | 2016-01-05 | 2016-01-05 | 80 | 5 | 21.0 | 30.0 | 0.0 | 5.0 | 75 | 442 | 532116 | 13.0 | 8.0 | 61.538462 |
| 5 | TUCSON | ARIZONA | 531674 | 2016-01-06 | 2016-01-06 | 91 | 2 | 25.0 | 36.0 | 0.0 | 5.0 | 89 | 531 | 532205 | 21.0 | 4.0 | 19.047619 |
| 6 | TUCSON | ARIZONA | 531674 | 2016-01-07 | 2016-01-07 | 91 | 3 | 29.0 | 30.0 | 0.0 | 5.0 | 88 | 619 | 532293 | 25.0 | 4.0 | 16.0 |
| 7 | TUCSON | ARIZONA | 531674 | 2016-01-08 | 2016-01-08 | 78 | 11 | 29.0 | 31.0 | 0.0 | 6.0 | 67 | 686 | 532360 | 29.0 | 0.0 | 0.0 |
| 8 | TUCSON | ARIZONA | 531674 | 2016-01-09 | 2016-01-09 | 92 | 16 | 21.0 | 31.0 | 0.0 | 7.0 | 76 | 762 | 532436 | 29.0 | -8.0 | -27.586207 |
| 9 | TUCSON | ARIZONA | 531674 | 2016-01-10 | 2016-01-10 | 93 | 15 | 18.0 | 26.0 | 0.0 | 6.0 | 78 | 840 | 532514 | 21.0 | -3.0 | -14.285714 |
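
The derived columns `CUM_IO`, `Current_population`, and `ROC_NO2_AQI` can be checked by hand. The sketch below redoes the arithmetic in plain Python for the first five Tucson rows of the output above. The helper name `rate_of_change` is introduced here for illustration and is not part of the project code; the input values are copied from the output.

```python
from itertools import accumulate

# First five Tucson rows from the query output (2016-01-01 to 2016-01-05).
base_population = 531674
net_arrivals = [63, 84, 113, 107, 75]        # SUMM_IO = cntarr - cntdep
no2_aqi = [29.0, 20.0, 16.0, 13.0, 21.0]     # daily average NO2 AQI

# CUM_IO: running total of net arrivals, like F.sum(...).over(window)
cum_io = list(accumulate(net_arrivals))

# Current_population: base population plus cumulative net arrivals
current_population = [base_population + total for total in cum_io]


def rate_of_change(values):
    """Percentage change versus the previous value; 0 when there is no previous value."""
    rates = [0.0]
    for previous, current in zip(values, values[1:]):
        # A zero previous value would give NULL in Spark, which the query maps to 0.
        rates.append((current - previous) / previous * 100 if previous else 0.0)
    return rates


roc_no2_aqi = rate_of_change(no2_aqi)

for row in zip(cum_io, current_population, roc_no2_aqi):
    print(row)
```

The first row has no previous day, so its rate of change is reported as 0, which matches the way the Spark code replaces NULL results with 0.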


#### 4.5 Data dictionary 
The data dictionary below lists each field in the data model with its data type and a brief description.

**Immigration table**
	
* cicid: double (nullable = true) : unique number for the immigrants
* i94yr: double (nullable = true) : 4 digit year
* i94mon: double (nullable = true) : Numeric month
* i94cit: string (nullable = true) : Citizenship
* i94res: string (nullable = true) : Residence
* i94port: string (nullable = true) : Port of entry (city, state code)
* i94port_city: string (nullable = true) : City
* i94port_statecode: string (nullable = true) : State code
* arrdate_todate: string (nullable = true) : Arrival Date in the U.S.
* i94mode: string (nullable = true) : Mode of travel (land, air, or sea)
* i94addr: string (nullable = true) : Where the immigrant resides in the U.S.
* depdate_todate: string (nullable = true) : Departure Date from the U.S.
* i94bir: double (nullable = true) : Age of Respondent in Years
* i94visa: string (nullable = true) : Visa categories (Business, Pleasure, and Student)
* biryear: double (nullable = true) : 4 digit year of birth
* gender: string (nullable = true) : Non-immigrant sex
* airline: string (nullable = true) : Airline used to arrive in U.S.
* fltno: string (nullable = true) : Flight number of Airline used to arrive in U.S.
 
**WorldTemp table**

* Record_Date: string (nullable = true) : Date of records
* AverageTemperature: string (nullable = true) : Average Temperature
* AverageTemperatureUncertainty: string (nullable = true) : Average Temperature Uncertainty
* City: string (nullable = true) : City
* Country: string (nullable = true) : Country

**Demographic table**

* City: string (nullable = true) : City
* State: string (nullable = true) : State
* Median_Age: string (nullable = true) : Median Age of Population
* Male_Population: string (nullable = true) : Male Population
* Female_Population: string (nullable = true) : Female Population
* Total_Population: string (nullable = true) : Total Population
* NumberofVeterans: string (nullable = true) : Number of Veterans
* Foreign_born: string (nullable = true) : Foreign-born
* Average_Household_Size: string (nullable = true) : Average Household Size
* State_Code: string (nullable = true) : State code
* Race: string (nullable = true) : Race
* Count: string (nullable = true) : Count by Race

**Pollution table**

* No: string (nullable = true) : Record number
* State_Code: string (nullable = true) : State Code
* County_Code: string (nullable = true) : County Code
* Site_Num: string (nullable = true) : Site Num
* Address: string (nullable = true) : Address
* State: string (nullable = true) : State
* County: string (nullable = true) : County
* City: string (nullable = true) : City
* Date_Local: string (nullable = true) : Date of records
* NO2_Units: string (nullable = true) : Units of NO2
* NO2_Mean: double (nullable = true) : Mean of NO2
* NO2_1st_Max_Value: double (nullable = true) : 1st Max Value of NO2
* NO2_1st_Max_Hour: double (nullable = true) : 1st Max Hour of NO2
* NO2_AQI: double (nullable = true) : AQI of NO2
* O3_Units: string (nullable = true) : Units of O3
* O3_Mean: double (nullable = true) : Mean of O3
* O3_1st_Max_Value: double (nullable = true) : 1st Max Value of O3
* O3_1st_Max_Hour: double (nullable = true) : 1st Max Hour of O3
* O3_AQI: double (nullable = true) : AQI of O3
* SO2_Units: string (nullable = true) : Units of SO2
* SO2_Mean: double (nullable = true) : Mean of SO2
* SO2_1st_Max_Value: double (nullable = true) : 1st Max Value of SO2
* SO2_1st_Max_Hour: double (nullable = true) : 1st Max Hour of SO2
* SO2_AQI: double (nullable = true) : AQI of SO2
* CO_Units: string (nullable = true) : Units of CO
* CO_Mean: double (nullable = true) : Mean of CO
* CO_1st_Max_Value: double (nullable = true) : 1st Max Value of CO
* CO_1st_Max_Hour: double (nullable = true) : 1st Max Hour of CO
* CO_AQI: double (nullable = true) : AQI of CO

**Airline table**

* Airline_ID: string (nullable = true) : Unique OpenFlights identifier for this airline
* Name: string (nullable = true) : Name of the airline.
* Alias: string (nullable = true) : Alias of the airline. For example, All Nippon Airways is commonly known as "ANA"
* IATA: string (nullable = true) : 2-letter IATA code, if available
* ICAO: string (nullable = true) : 3-letter ICAO code, if available
* Callsign: string (nullable = true) : Airline callsign
* Country: string (nullable = true) : Country or territory where the airline is incorporated
* Active: string (nullable = true) : "Y" if the airline is or has until recently been operational, "N" if it is defunct. This field is not reliable: in particular, major airlines that stopped flying long ago but have not had their IATA code reassigned (e.g. Ansett/AN) will incorrectly show as "Y".

**Time table**

* date_ref: string (nullable = true) : Date referenced in the data model
* day: integer (nullable = true) : Day of month
* week: integer (nullable = true) : Week of year
* month: integer (nullable = true) : Month of year
* year: integer (nullable = true) : 4 digit year
* weekday: integer (nullable = true) : Day of week
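
To illustrate how the Time table fields relate to `date_ref`, the sketch below derives them from a date string with Python's standard library. It rests on assumptions that the data dictionary does not state: `date_ref` is formatted as `YYYY-MM-DD`, `week` follows ISO 8601 week numbering, and `weekday` runs from 1 (Monday) to 7 (Sunday). The ETL script may use different conventions, and `time_fields` is a hypothetical helper.

```python
from datetime import date


def time_fields(date_ref):
    """Derive the Time table fields from a 'YYYY-MM-DD' string (assumed format)."""
    d = date.fromisoformat(date_ref)  # requires Python 3.7+
    return {
        "date_ref": date_ref,
        "day": d.day,                    # day of month
        "week": d.isocalendar()[1],      # ISO week of year (assumption)
        "month": d.month,                # month of year
        "year": d.year,                  # 4 digit year
        "weekday": d.isoweekday(),       # 1 = Monday ... 7 = Sunday (assumption)
    }


print(time_fields("2016-01-10"))
```

Dates in the first days of January can belong to the last ISO week of the previous year, so `week` and `year` should be read together with that convention in mind.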

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.

> This project uses Python, Apache Spark, and a data lake on AWS built with S3 and EMR.
The main reason is that a data lake supports unstructured data, and this project works with two types of data files: sas7bdat and CSV.
A data lake also allows analysis without first loading the data into a predefined schema or table, an approach known as "schema-on-read".
This can considerably reduce the time needed to analyze the data, and the results can later be used for predictive analytics or machine learning.


* Propose how often the data should be updated and why.

> The data models should be updated daily or, at a minimum, monthly. Because the analysis relies on historical data, the more data the model accumulates, the better it supports the analysis, and predictive analytics or machine learning built on it should produce more accurate results.

* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 
 > On the infrastructure side, the AWS EMR cluster would need to scale up (larger nodes) and scale out (more nodes) to handle the additional data and sources.
On the data side, data quality and cleansing steps must be in place so that only reliable data is processed.
 
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 
 > The processes would need to be adapted to work with daily data: a correct ETL process, scheduled jobs, data quality checks, and a verification step that compares the daily data received with the data after processing, to ensure the results are correct and reliable.
 
 * The database needed to be accessed by 100+ people.
 
 > A data lake may not be suitable for access by many concurrent users, so a pipeline step would be added to load the processed data into relational tables. A data warehouse such as Amazon Redshift is worth considering, because it supports heavy concurrent usage and is designed for OLAP workloads.
It can also scale to support a large number of users.
 
 

