# Project Title
### Data Engineering Capstone Project

#### Project Summary
This project uses the Immigration Data and Temperature Data to generate a dataset that will be used by the analyst to answer the following questions:
- What is the correlation between the destination in the U.S. and the climate of the source country?
- How does visitor volume vary with arrival time?


__The project follows the following steps:__
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import numpy as np 
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp
import pyspark.sql.functions as F
import datetime
import os
import configparser
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.functions import isnan, when, count
from pyspark.sql.functions import to_date

### Step 1: Scope the Project and Gather Data

#### Scope  
This project works with immigration and temperature data. We run the ETL with Spark and store the results in an S3 bucket.


#### Data Source
The project uses the Immigration Data `18-83510-I94-Data-2016/` and the Temperature Data `GlobalLandTemperaturesByCity.csv`. We also use the `I94_SAS_Labels_Descriptions.SAS` file, which describes the immigration data.
- I94 Immigration Data: This data comes from the US National Tourism and Trade Office. 
- World Temperature Data: This dataset came from Kaggle.

#### Result tables:
- __The Fact Table:__ 
    - immigration table
- __Dimension Tables:__
    - date table
    - country table
    - state table
    - temperature table

#### Data Dictionary: 

Describe the __selected__ fields in Immigration dataset.

| __Column Name__ | __Description__ |
| :--- | :--- |
| CICID* | ID that uniquely identifies one record in the dataset |
| I94YR | 4 digit year |
| I94MON | Numeric month |
| I94CIT | 3 digit code of source country for immigration (country of birth) |
| I94RES | 3 digit code of source country for immigration (Residence country) |
| ARRDATE | Arrival date in the USA |
| I94ADDR | State of arrival |
| DEPDATE | Departure date |
| DTADFILE | Character Date Field |
| VISAPOST | Department of State where Visa was issued |
| OCCUP | Occupation that will be performed in U.S. |
| BIRYEAR | 4 digit year of birth |
| GENDER | Gender |
| AIRLINE | Airline used to arrive in U.S. |
| FLTNO | Flight number of Airline used to arrive in U.S. |
| VISATYPE | Class of admission legally admitting the non-immigrant to temporarily stay in U.S. |

Describe the fields in temperature dataset.

| Column Name | Description |
| :--- | :--- |
| month | the month |
| AverageTemperature | Average temperature of the city in a given month |
| Country | Country Name |
| Latitude | Latitude |
| Longitude | Longitude |

Describe the fields in date dataset.

| Column Name | Description |
| :--- | :--- |
| date | Date in format YYYY-MM-DD as a primary key|
| day | 2 digits day |
| week | 2 digits week |
| month | 2 digits month |
| year | 4 digits year|
| weekday | the day of the week |

Describe the fields in country dataset.

| Column Name | Description |
| :--- | :--- |
| code | country code as provided in `I94_SAS_Labels_Descriptions.SAS` file|
| country | country name|

Describe the fields in state dataset.

| Column Name | Description |
| :--- | :--- |
| code | country code as provided in `I94_SAS_Labels_Descriptions.SAS` file|
| state | state name|

### Step 2: Explore and Assess the Data
#### Explore and clean the Data 


In [2]:
config = configparser.ConfigParser()
config.read('helpers/dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']
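
The cell above expects `helpers/dl.cfg` to contain an `[AWS]` section with two keys. The following is a minimal sketch of that layout, inferred only from the keys the cell reads. It parses an in-memory string, so no real file or credentials are involved; `SAMPLE_CFG` and the placeholder values are made up for illustration.

```python
import configparser

# Assumed layout of helpers/dl.cfg, inferred from the keys read above.
# The values are placeholders, not real credentials.
SAMPLE_CFG = """
[AWS]
AWS_ACCESS_KEY_ID = <your-access-key-id>
AWS_SECRET_ACCESS_KEY = <your-secret-access-key>
"""

sample = configparser.ConfigParser()
sample.read_string(SAMPLE_CFG)

# Same lookups as in the notebook cell, against the sample config.
access_key = sample["AWS"]["AWS_ACCESS_KEY_ID"]
secret_key = sample["AWS"]["AWS_SECRET_ACCESS_KEY"]
```

Keeping the keys in a separate file like this keeps them out of the notebook itself; the file should not be committed to version control.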

In [3]:
spark = SparkSession.builder.config("spark.jars.packages",
                                        "saurfang:spark-sas7bdat:2.0.0-s_2.11,org.apache.hadoop:hadoop-aws:2.7.0")\
    .enableHiveSupport().getOrCreate()

#### Immigration table

In [4]:
#read immigration file
immigration_file = "../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat"
immigration_df = spark.read.format('com.github.saurfang.sas.spark').load(immigration_file)


In [5]:
immigration_df.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


In [6]:
#the selected columns
columns = ['cicid','i94yr','i94mon', 'i94cit', 'i94res', 'arrdate', 'i94addr', 
      'depdate', 'dtadfile', 'visapost', 'occup', 'biryear', 'gender',
      'airline', 'fltno', 'visatype']
immigration_df = immigration_df[columns]

# drop rows where all elements are missing
immigration_df = immigration_df.dropna(how='all')

In [7]:
#number of nulls in each column
immigration_df.toPandas().isnull().sum()

cicid             0
i94yr             0
i94mon            0
i94cit            0
i94res            0
arrdate           0
i94addr      152592
depdate      142457
dtadfile          1
visapost    1881250
occup       3088187
biryear         802
gender       414269
airline       83627
fltno         19549
visatype          0
dtype: int64

In [8]:
#number of nulls in each column
immigration_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in immigration_df.columns]).show()

+-----+-----+------+------+------+-------+-------+-------+--------+--------+-------+-------+------+-------+-----+--------+
|cicid|i94yr|i94mon|i94cit|i94res|arrdate|i94addr|depdate|dtadfile|visapost|  occup|biryear|gender|airline|fltno|visatype|
+-----+-----+------+------+------+-------+-------+-------+--------+--------+-------+-------+------+-------+-----+--------+
|    0|    0|     0|     0|     0|      0| 152592| 142457|       1| 1881250|3088187|    802|414269|  83627|19549|       0|
+-----+-----+------+------+------+-------+-------+-------+--------+--------+-------+-------+------+-------+-----+--------+



In [9]:
print((immigration_df.count(), len(immigration_df.columns)))

(3096313, 16)


> Since the `visapost` and `occup` columns have a large number of nulls, we will drop them.
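
The decision above can also be expressed as a rule on the share of nulls per column. The sketch below uses the null counts and the row count printed earlier; the 50% cutoff and the helper name `columns_to_drop` are assumptions for illustration, since the notebook does not state a threshold.

```python
# Null counts and row count copied from the outputs above.
TOTAL_ROWS = 3096313
null_counts = {
    "i94addr": 152592, "depdate": 142457, "dtadfile": 1,
    "visapost": 1881250, "occup": 3088187, "biryear": 802,
    "gender": 414269, "airline": 83627, "fltno": 19549,
}

# Assumed cutoff for illustration; the notebook does not state one.
MAX_NULL_FRACTION = 0.5

def columns_to_drop(counts, total, max_fraction):
    """Return the columns whose share of nulls exceeds max_fraction."""
    return [name for name, n in counts.items() if n / total > max_fraction]

to_drop = columns_to_drop(null_counts, TOTAL_ROWS, MAX_NULL_FRACTION)
print(to_drop)  # ['visapost', 'occup']
```

With these counts, `visapost` is roughly 61% null and `occup` is over 99% null, while every other column stays well below the cutoff.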

In [10]:
#repeat the column selection, this time without the visapost and occup columns
columns = ['cicid','i94yr','i94mon', 'i94cit', 'i94res', 'arrdate', 'i94addr', 
      'depdate', 'dtadfile', 'biryear', 'gender',
      'airline', 'fltno', 'visatype']
immigration_df = immigration_df[columns]

# drop rows where all elements are missing
immigration_df = immigration_df.dropna(how='all')

In [11]:
immigration_df.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)



In [12]:
spark.udf.register("date", lambda x: str(pd.to_timedelta(x , unit='D') + pd.Timestamp('1960-1-1')))

<function __main__.<lambda>(x)>
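
The registered `date` function relies on the fact that the SAS date columns hold the number of days since 1960-01-01. A minimal standard-library sketch of the same conversion is shown below; `sas_days_to_date` is a hypothetical helper name, and unlike the UDF above it returns a `date` object rather than a string.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS counts dates as days since this day

def sas_days_to_date(days):
    """Convert a SAS day count (possibly a float such as 20545.0) to a date."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))

print(sas_days_to_date(20545.0))  # 2016-04-01
```

The value `20545.0` is one of the `arrdate` values shown in the sample rows above.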

In [13]:
immigration_df.createOrReplaceTempView("immigration_df")

In [14]:
immigration_df.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)



In [15]:
immigration = spark.sql('''
                  SELECT DISTINCT 
                  int(cicid),
                  i94yr,
                  i94mon, 
                  int(i94cit), 
                  int(i94res), 
                  date(arrdate) AS arrdate,
                  i94addr,
                  depdate,
                  dtadfile, 
                  biryear, 
                  gender,
                  airline, 
                  fltno, 
                  visatype 
                  FROM immigration_df
                  ORDER BY i94yr, i94mon
          '''
          )

In [16]:
immigration.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,arrdate,i94addr,depdate,dtadfile,biryear,gender,airline,fltno,visatype
0,74,2016.0,4.0,103,103,2016-04-01 00:00:00,GA,20554.0,20160401,1950.0,,LH,444,WT
1,469,2016.0,4.0,103,103,2016-04-01 00:00:00,TN,20573.0,20160401,1961.0,,OS,97,WT
2,503,2016.0,4.0,103,103,2016-04-01 00:00:00,IL,20548.0,20160401,1970.0,,OS,65,WB
3,993,2016.0,4.0,104,104,2016-04-01 00:00:00,NY,20550.0,20160401,1958.0,F,UA,56,WT
4,1111,2016.0,4.0,104,104,2016-04-01 00:00:00,NY,20554.0,20160401,1961.0,F,UA,2067,WT


In [17]:
immigration.printSchema()

root
 |-- cicid: integer (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: integer (nullable = true)
 |-- i94res: integer (nullable = true)
 |-- arrdate: string (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)



In [18]:
#save the result in the output folder
immigration.write.partitionBy("i94yr", "i94mon").parquet('data/output/immigration/', mode = "overwrite")

#### Temperatures table

> Since the dates in the temperature table do not match the dates in the immigration table, we will take the average per month for each country. Also, we do not need the city column because it is not given in the immigration table.
 
 
 >As a result, we will have the month, country, average temperature, latitude,  and longitude columns only

In [5]:
#read temperature file
temperature_file = "../../data2/GlobalLandTemperaturesByCity.csv"
temperature_df = spark.read.format("csv").options(header='true').load(temperature_file)

In [6]:
temperature_df.limit(5).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [7]:
temperature_df = temperature_df.withColumn('dt',to_timestamp(temperature_df.dt))

In [8]:
temperature_df.printSchema()

root
 |-- dt: timestamp (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- AverageTemperatureUncertainty: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [9]:
temperature_df.limit(5).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [10]:
temperature_df.createOrReplaceTempView("temperature_df")

In [14]:
temperature = spark.sql("""
    SELECT DISTINCT 
    Country,
    month(dt) AS Month,
    AVG(AverageTemperature) as AverageTemperature, 
    FIRST(Latitude) AS Latitude, 
    FIRST(Longitude) AS Longitude
    FROM temperature_df
    GROUP BY Month, Country
    ORDER BY Country, Month
""")
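
To make the grouping in the query above concrete, here is a small pure-Python sketch of the same idea: average the readings per (country, month) and skip missing values, as SQL `AVG` does. The readings are made-up numbers for illustration only, and `monthly_average` is a hypothetical helper, not part of the notebook.

```python
from collections import defaultdict
from datetime import date

# Made-up readings for illustration: (date, country, temperature or None).
readings = [
    (date(2000, 1, 1), "Denmark", 1.0),
    (date(2001, 1, 1), "Denmark", 3.0),
    (date(2000, 2, 1), "Denmark", None),  # missing reading
    (date(2000, 1, 1), "Mexico", 14.0),
]

def monthly_average(rows):
    """Average temperature per (country, month), skipping missing readings."""
    totals = defaultdict(lambda: [0.0, 0])  # key -> [sum, count of non-null]
    for dt, country, temp in rows:
        acc = totals[(country, dt.month)]  # register the group even if temp is None
        if temp is not None:
            acc[0] += temp
            acc[1] += 1
    # A group without any reading gets None, like AVG over all-NULL input.
    return {key: (s / n if n else None) for key, (s, n) in totals.items()}

averages = monthly_average(readings)
```

Because the year is discarded, readings from different years fall into the same month bucket, which is what makes the result joinable with the immigration data by month.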

In [15]:
temperature.limit(5).toPandas()

Unnamed: 0,Country,Month,AverageTemperature,Latitude,Longitude
0,Afghanistan,1,0.607802,36.17N,69.61E
1,Afghanistan,2,2.842286,36.17N,69.61E
2,Afghanistan,3,8.142778,36.17N,69.61E
3,Afghanistan,4,14.090881,36.17N,69.61E
4,Afghanistan,5,19.8848,36.17N,69.61E


In [17]:
#save the result in the output folder as parquet
temperature.write.parquet('data/output/temperature', mode = "overwrite")

In [27]:
#save the result in the output folder as CSV
temperature.coalesce(1).write.format('com.databricks.spark.csv').save('data/output/temperature',header = 'true', mode = "overwrite")

#### Date table
Extract the date from the arrival date column in the immigration table.

In [19]:
date = spark.sql('''
                  SELECT DISTINCT 
                  date,
                  dayofmonth(date) AS day,
                  weekofyear(date) AS week,
                  month(date) AS month,
                  year(date) AS year,
                  dayofweek(date) AS weekday
                  FROM (SELECT date(arrdate) AS date FROM immigration_df)
          '''
          )
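
The columns of the date table can be reproduced with the standard library, which helps when checking the output by hand. This sketch assumes the Spark conventions used in the query: `weekofyear` follows ISO 8601 week numbering and `dayofweek` counts from 1 for Sunday to 7 for Saturday. `date_parts` is a hypothetical helper name.

```python
from datetime import date

def date_parts(d):
    """Split a date into the columns of the date table."""
    return {
        "date": d.isoformat(),
        "day": d.day,
        "week": d.isocalendar()[1],         # ISO 8601 week number
        "month": d.month,
        "year": d.year,
        "weekday": d.isoweekday() % 7 + 1,  # 1 = Sunday ... 7 = Saturday
    }

print(date_parts(date(2016, 4, 8)))
# {'date': '2016-04-08', 'day': 8, 'week': 14, 'month': 4, 'year': 2016, 'weekday': 6}
```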

In [20]:
date.limit(5).toPandas()

Unnamed: 0,date,day,week,month,year,weekday
0,2016-04-08 00:00:00,8,14,4,2016,6
1,2016-04-22 00:00:00,22,16,4,2016,6
2,2016-04-24 00:00:00,24,16,4,2016,1
3,2016-04-29 00:00:00,29,17,4,2016,6
4,2016-04-10 00:00:00,10,14,4,2016,1


In [21]:
#save the result in the output folder
date.coalesce(1).write.format('com.databricks.spark.csv').save('data/output/date',header = 'true', mode = "overwrite")

#### Country table
No editing needed. We are only going to copy the country table to the output directory

In [22]:
#read country file
country_file = "data/countries.csv"
country = spark.read.format("csv").options(header='true').load(country_file)

In [23]:
country.limit(5).toPandas()

Unnamed: 0,code,country
0,582,MEXICO
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA


In [24]:
#save the result in the output folder
country.coalesce(1).write.format('com.databricks.spark.csv').save('data/output/country',header = 'true', mode = "overwrite")

#### State table
No editing needed. We are only going to copy the state table to the output directory

In [25]:
#read state file
state_file = "data/states.csv"
state = spark.read.format("csv").options(header='true').load(state_file)

In [26]:
state.limit(5).toPandas()

Unnamed: 0,code,state
0,AL,ALABAMA
1,AK,ALASKA
2,AZ,ARIZONA
3,AR,ARKANSAS
4,CA,CALIFORNIA


In [27]:
#save the result in the output folder
state.coalesce(1).write.format('com.databricks.spark.csv').save('data/output/state',header = 'true', mode = "overwrite")

In [35]:
spark.stop()

-------------------------------------------------------------------------------------------------------------------------


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
<img src="images/ER_diagram.png" alt="ER_diagram"/>

The model is a star schema with the immigration dataset as the fact table. For simplicity, we work on the April data only; the same code can later be run on all the datasets. We select only the data that is relevant to the analyst's questions. The date table is created from the arrival date.

The country and state tables are created from the data dictionary.

The temperature table is grouped by month and country. Because the dates in the temperature table do not match the dates in the immigration table, we take the average per month for each country. We also drop the city column because it is not given in the immigration table. As a result, the table keeps only the month, country, average temperature, latitude, and longitude columns.

The country table is the bridge between the immigration and temperature tables, which allows the analyst to identify the correlation between arrivals and the country of residence.

#### 3.2 Mapping Out Data Pipelines
The pipeline steps are:
- Load the datasets
- Clean the data using Spark and save the result into an S3 bucket
- Run quality checks on the data




### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model

The code in this notebook saves the result locally. The final code is stored in `etl.py`; running that file saves the result in the S3 bucket.

The `etl.py` file includes the following functions:
- create_spark_session()
    - returns the Spark session
- process_immigration_data(spark, input_data, output_data)
    - reads the original immigration data and writes the final immigration and date data into the S3 bucket
- process_temperature_data(spark, input_data, output_data)
    - reads the original temperature data and writes the final temperature data into the S3 bucket
- process_country_data(spark, input_data, output_data)
    - copies the country table into the S3 bucket
- process_state_data(spark, input_data, output_data)
    - copies the state table into the S3 bucket

In [1]:
#before running this cell, run (aws configure) in the terminal
#!python etl.py

#### 4.2 Data Quality Checks
For data quality we check two things:
 * the data has no duplicates based on the selected columns
 * the number of records is greater than 0

In [12]:
#the quality check function
def quality_check(path, table, columns):
    df_spark = spark.read.parquet(path)
    # count once and reuse the result; every count() triggers a Spark job
    total = df_spark.count()
    if total < 1:
        print(f"Data quality check for {table} failed. {table} returned no results")
    else:
        print(f"Data quality check for {table} Success. {table} has {total} records")

    # rows that share the same values in the key columns count as duplicates
    if total > df_spark.dropDuplicates(columns).count():
        print(f'{table} has duplicate rows')
    else:
        print(f'{table} has no duplicate rows')


In [13]:
# immigration data check
path = "s3a://wejdan-dend/output/immigration"
table = "immigration"
columns = ['cicid']
quality_check(path, table, columns)

Data quality check for immigration Success. immigration has 3096313 records
immigration has no duplicate rows


In [14]:
# date data check
path = "s3a://wejdan-dend/output/date"
table = "date"
columns = ['date']
quality_check(path, table, columns)

Data quality check for date Success. date has 30 records
date has no duplicate rows


In [17]:
# temperature data check
path = "s3a://wejdan-dend/output/temperature"
table = "temperature"
columns = ['Country', 'Month']
quality_check(path, table, columns)

Data quality check for temperature Success. temperature has 1908 records
temperature has no duplicate rows


In [18]:
# country data check
path = "s3a://wejdan-dend/output/country"
table = "country"
columns = ['code']
quality_check(path, table, columns)

Data quality check for country Success. country has 289 records
country has no duplicate rows


In [19]:
# state data check
path = "s3a://wejdan-dend/output/state"
table = "state"
columns = ['code']
quality_check(path, table, columns)

Data quality check for state Success. state has 55 records
state has no duplicate rows
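
The five cells above repeat the same call with different arguments. One possible way to keep them in sync is to drive the checks from a single list, sketched below. The paths, table names, and key columns are copied from the cells above; `CHECKS` and `run_all_checks` are hypothetical names, and the check function is passed in as a parameter so the sketch does not depend on a running Spark session.

```python
# (path, table, key columns) for each check, copied from the cells above.
CHECKS = [
    ("s3a://wejdan-dend/output/immigration", "immigration", ["cicid"]),
    ("s3a://wejdan-dend/output/date", "date", ["date"]),
    ("s3a://wejdan-dend/output/temperature", "temperature", ["Country", "Month"]),
    ("s3a://wejdan-dend/output/country", "country", ["code"]),
    ("s3a://wejdan-dend/output/state", "state", ["code"]),
]

def run_all_checks(check_fn, checks=CHECKS):
    """Call check_fn(path, table, columns) once per table, in order."""
    for path, table, columns in checks:
        check_fn(path, table, columns)

# In the notebook this would be called as: run_all_checks(quality_check)
```

Adding a new output table then only requires one more entry in the list.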


#### 4.3 Data dictionary 
Please check the Data Dictionary section under Step 1.

### Step 5: Complete Project Write Up

I used Spark because it will be easy to move the code to an EMR cluster later. The data does not need to be updated frequently, and the analysis is done on request, so we did not use Airflow.

 A description of how we would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
     * We would run the code on an EMR cluster.
 * The data populates a dashboard that must be updated on a daily basis by 7 am every day.
     * We would use Airflow to schedule and run the pipeline, with a DAG that reads the data from S3, runs the ETL and the data quality checks, and then saves the data in an S3 bucket.
 * The database needed to be accessed by 100+ people.
     * We would use Amazon Redshift.