# Project Title
### Data Engineering Capstone Project

#### Project Summary
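This project builds a Spark ETL pipeline that combines I94 immigration data with city-level temperature data and stores the results as Parquet fact and dimension tables. The goal is to analyze whether the temperature of a US city plays a role in people immigrating to it.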

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [4]:
# Do all imports and installs here
import pandas as pd
import re 
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from datetime import datetime, timedelta

### Step 1: Scope the Project and Gather Data
#### Scope
In this project, we collect the I94 immigration data and create a dimension table from it. Next, we gather city-level temperature data and load it into a second dimension table.

Finally, we join these two datasets into a fact table to analyze how the temperature of a city may play a role in people immigrating to it.

We use Apache Spark to process and analyze the data.

#### About the Data and Where to Find It
The I94 data can be obtained from [here](https://travel.trade.gov/research/reports/i94/historical/2016.html). A sample of the data is provided in the course workspace for our analysis.
It is provided in SAS7BDAT format.

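**Data Attributes (I94)**:

* i94yr = 4-digit year.
* i94mon = Numeric month of arrival.
* i94cit = 3-digit code of the country of citizenship.
* i94port = 3-character code of the US port of entry (destination city).
* arrdate = Arrival date in the US (SAS date: days since 1960-01-01).
* i94mode = 1-digit travel mode code (1 = Air, 2 = Sea, 3 = Land, 9 = Not reported).
* depdate = Departure date from the US (SAS date).
* i94visa = Visa category code (1 = Business, 2 = Pleasure, 3 = Student).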
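The `arrdate` and `depdate` values in the sample output (for example `20545.0`) are SAS date numbers, which count days from 1960-01-01. A minimal Python sketch of the conversion, using the same `datetime`/`timedelta` types imported in the first cell; the helper name `sas_to_date` is hypothetical:

```python
from datetime import datetime, timedelta
import math
from typing import Optional

# SAS stores dates as the number of days since 1960-01-01
SAS_EPOCH = datetime(1960, 1, 1)


def sas_to_date(sas_days: Optional[float]) -> Optional[str]:
    """Convert a SAS date number (e.g. arrdate) to an ISO 'YYYY-MM-DD' string.

    Returns None for missing values (None or NaN), such as an empty depdate.
    """
    if sas_days is None or math.isnan(sas_days):
        return None
    return (SAS_EPOCH + timedelta(days=int(sas_days))).strftime("%Y-%m-%d")


print(sas_to_date(20545.0))  # 2016-04-01
print(sas_to_date(None))     # None
```

In Spark, the same conversion could be expressed without a Python UDF, for example with `F.expr("date_add(to_date('1960-01-01'), cast(arrdate as int))")` after importing `pyspark.sql.functions as F`.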

Temperature data can be downloaded from [Kaggle](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data). The data is in CSV format.

**Data Attributes (Temperature)**:

* dt = Date of the monthly measurement
* AverageTemperature = Average temperature
* City = City name
* Country = Country name
* Latitude = Latitude
* Longitude = Longitude
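The `Latitude` and `Longitude` values are strings with a hemisphere suffix (for example `57.05N`, `31.35S`, `114.97E` in the outputs of this notebook). If numeric coordinates are needed, a small parser such as the hypothetical `parse_coordinate` below could convert them, treating South and West as negative. This is a sketch under that assumption, not part of the original pipeline:

```python
def parse_coordinate(value: str) -> float:
    """Convert a coordinate such as '57.05N' or '31.35S' to a signed float.

    North and East are positive; South and West are negative.
    """
    value = value.strip().upper()
    if len(value) < 2 or value[-1] not in ("N", "S", "E", "W"):
        raise ValueError(f"Unexpected coordinate format: {value!r}")
    number, hemisphere = value[:-1], value[-1]
    sign = -1.0 if hemisphere in ("S", "W") else 1.0
    return sign * float(number)


print(parse_coordinate("57.05N"))   # 57.05
print(parse_coordinate("114.97E"))  # 114.97
print(parse_coordinate("31.35S"))   # -31.35
```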


In [7]:
# Read in the data here
file = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df = pd.read_sas(file, 'sas7bdat', encoding="ISO-8859-1")

In [8]:
df.head()

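| | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6.0 | 2016.0 | 4.0 | 692.0 | 692.0 | XXX | 20573.0 | | | | ... | U | | 1979.0 | 10282016 | | | | 1897628000.0 | | B2 |
| 1 | 7.0 | 2016.0 | 4.0 | 254.0 | 276.0 | ATL | 20551.0 | 1.0 | AL | | ... | Y | | 1991.0 | D/S | M | | | 3736796000.0 | 296.0 | F1 |
| 2 | 15.0 | 2016.0 | 4.0 | 101.0 | 101.0 | WAS | 20545.0 | 1.0 | MI | 20691.0 | ... | | M | 1961.0 | 09302016 | M | | OS | 666643200.0 | 93.0 | B2 |
| 3 | 16.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... | | M | 1988.0 | 09302016 | | | AA | 92468460000.0 | 199.0 | B2 |
| 4 | 17.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... | | M | 2012.0 | 09302016 | | | AA | 92468460000.0 | 199.0 | B2 |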


In [9]:

# Read in the temperature data into Pandas for exploration
file_temperature = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temperature = pd.read_csv(file_temperature, sep=',')
df_temperature.head()

| | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 1 | 1743-12-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 2 | 1744-01-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 3 | 1744-02-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 4 | 1744-03-01 | | | Århus | Denmark | 57.05N | 10.33E |


In [5]:
# Create a SparkSession that includes the spark-sas7bdat package for reading SAS files
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
    .enableHiveSupport()
    .getOrCreate()
)
# Read the April 2016 I94 sample with Spark
df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


### Step 2: Explore and Assess the Data
#### Explore the Data 
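The exploration above shows these data quality issues:
* Some I94 records have an invalid i94port code (for example `XXX` in the first row of the sample).
* Some temperature records have a missing AverageTemperature (for example, the early Århus readings).
* The temperature data has one record per city per month, so each city appears many times.
* The temperature data is keyed by city name and has no i94port code to join on.

#### Cleaning Steps
1. Keep only I94 records whose i94port appears in the list of valid port codes (port_valid.txt).
2. Remove temperature records with a missing AverageTemperature.
3. Keep a single temperature record per (City, Country) pair.
4. Map each city name to an i94port code and remove cities without a matching code.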
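These issues can be surfaced during exploration with a quick pandas profile of each dataframe. A minimal sketch; the helper name `profile_quality` and its `key_cols` parameter are hypothetical, and the tiny sample below is made up in the shape of the temperature data:

```python
import pandas as pd


def profile_quality(df: pd.DataFrame, key_cols=None) -> dict:
    """Summarize missing values and duplicate rows in a pandas DataFrame.

    key_cols: columns that identify a record (e.g. ['City', 'Country']);
    when None, a row counts as a duplicate only if every column matches.
    """
    # Percentage of missing values per column, worst columns first
    missing_pct = (df.isna().mean() * 100).round(2).sort_values(ascending=False)
    # Number of rows that repeat an earlier row on key_cols
    duplicates = int(df.duplicated(subset=key_cols).sum())
    return {"missing_pct": missing_pct, "duplicates": duplicates}


# Made-up rows shaped like the temperature data
sample = pd.DataFrame({
    "City": ["Århus", "Århus", "Perth"],
    "Country": ["Denmark", "Denmark", "Australia"],
    "AverageTemperature": [6.068, None, 15.488],
})
report = profile_quality(sample, key_cols=["City", "Country"])
print(report["duplicates"])  # 1
print(report["missing_pct"])
```

On the full data, the same helper could be called as `profile_quality(df_temperature, ['City', 'Country'])` on the pandas dataframe loaded earlier.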

In [6]:
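# Create a dictionary of valid i94port codes from port_valid.txt.
# Each line is expected to contain a quoted port code followed by a quoted location.
re_obj = re.compile(r"'(.*)'.*'(.*)'")
valid_i94_port = {}
with open('port_valid.txt') as port_file:
    for line in port_file:
        match = re_obj.search(line)
        if match:  # skip blank or malformed lines
            valid_i94_port[match[1]] = [match[2]]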
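To check what the regular expression extracts, the same parsing logic can be run on a few in-memory lines. The helper `parse_port_lines` and the sample lines are hypothetical; they assume port_valid.txt holds entries of the form `'CODE' = 'LOCATION'`:

```python
import re

# Same pattern as above: the first quoted string is the code, the last is the location
PORT_PATTERN = re.compile(r"'(.*)'.*'(.*)'")


def parse_port_lines(lines):
    """Return {port_code: [location]} for every line that matches PORT_PATTERN."""
    ports = {}
    for line in lines:
        match = PORT_PATTERN.search(line)
        if match:  # lines without two quoted strings are skipped
            ports[match[1]] = [match[2]]
    return ports


# Hypothetical lines in the assumed port_valid.txt format
sample_lines = [
    "'ATL'\t=\t'ATLANTA, GA'",
    "",
    "'NYC'\t=\t'NEW YORK, NY'",
]
print(parse_port_lines(sample_lines))
# {'ATL': ['ATLANTA, GA'], 'NYC': ['NEW YORK, NY']}
```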

In [7]:
# Clean I94 immigration data
def clean_data(file):
    '''    
    Input: Path to I94 immigration data file
    Output: Returns a spark dataframe of I94 immigration data with valid i94port
    '''    
    # Read I94 data into Spark
    df_immigration = spark.read.format('com.github.saurfang.sas.spark').load(file)

    # Filter out entries where i94port is invalid
    df_immigration = df_immigration.filter(df_immigration.i94port.isin(list(valid_i94_port.keys())))

    return df_immigration

In [8]:
#Function smoke test
immigration_test_file = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat' 
df_test = clean_data(immigration_test_file)
df_test.head()

Row(cicid=7.0, i94yr=2016.0, i94mon=4.0, i94cit=254.0, i94res=276.0, i94port='ATL', arrdate=20551.0, i94mode=1.0, i94addr='AL', depdate=None, i94bir=25.0, i94visa=3.0, count=1.0, dtadfile='20130811', visapost='SEO', occup=None, entdepa='G', entdepd=None, entdepu='Y', matflag=None, biryear=1991.0, dtaddto='D/S', gender='M', insnum=None, airline=None, admnum=3736796330.0, fltno='00296', visatype='F1')

In [9]:
# Read the temperature data into Spark (without a schema, every column is read as a string)
df_temperature = spark.read.format("csv").option("header", "true").load("../../data2/GlobalLandTemperaturesByCity.csv")

In [10]:
df_temperature.head()

Row(dt='1743-11-01', AverageTemperature='6.068', AverageTemperatureUncertainty='1.7369999999999999', City='Århus', Country='Denmark', Latitude='57.05N', Longitude='10.33E')

In [11]:
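# Filter out data points with a missing average temperature.
# Empty CSV fields are read as null, so check for null as well as the string 'NaN'.
df_temperature = df_temperature.filter(df_temperature.AverageTemperature.isNotNull() & (df_temperature.AverageTemperature != 'NaN'))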

In [12]:
# Keep one row per (City, Country); note that this keeps an arbitrary monthly reading
df_temperature = df_temperature.dropDuplicates(['City', 'Country'])
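`dropDuplicates` keeps one arbitrary monthly reading per city (the Perth row shown at the end of this section is dated 1852-07-01), so `AverageTemperature` in the temperature table is a single month's value rather than a long-run average. If a per-city average is wanted instead, one option is to aggregate rather than deduplicate. This is a swapped-in technique, sketched in pandas on made-up readings; in Spark, a comparable call would be `df_temperature.groupBy('City', 'Country').agg(F.avg(F.col('AverageTemperature').cast('double')))` with `pyspark.sql.functions` imported as `F`:

```python
import pandas as pd

# Made-up monthly readings in the shape of GlobalLandTemperaturesByCity.csv
readings = pd.DataFrame({
    "dt": ["2000-01-01", "2000-07-01", "2000-01-01", "2000-07-01"],
    "AverageTemperature": [0.5, 16.5, 24.0, 13.0],
    "City": ["Århus", "Århus", "Perth", "Perth"],
    "Country": ["Denmark", "Denmark", "Australia", "Australia"],
    "Latitude": ["57.05N", "57.05N", "31.35S", "31.35S"],
    "Longitude": ["10.33E", "10.33E", "114.97E", "114.97E"],
})

# One row per (City, Country): mean temperature across all months,
# with the per-city coordinates carried along
per_city = (
    readings.dropna(subset=["AverageTemperature"])
    .groupby(["City", "Country"], as_index=False)
    .agg(
        AverageTemperature=("AverageTemperature", "mean"),
        Latitude=("Latitude", "first"),
        Longitude=("Longitude", "first"),
    )
)
print(per_city)
```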

In [13]:

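@udf()
def get_i94port(city):
    '''
    Input: City name

    Output: Corresponding i94port code, or None if no valid port matches
    '''
    if city is None:
        return None
    # Return the first port whose location contains the city name (case-insensitive)
    for key in valid_i94_port:
        if city.lower() in valid_i94_port[key][0].lower():
            return key
    return None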
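The UDF above matches by substring and scans every port for every row, so a short city name can match a longer location (a hypothetical city named `York` would match a port located in `NEW YORK, NY`). One alternative, sketched in plain Python, is to index ports by the city part before the comma and match exactly. The helper names are hypothetical, and the sketch assumes locations are formatted as `CITY, STATE`:

```python
def build_city_index(valid_ports):
    """Map a lower-cased city name to its port code.

    valid_ports has the same shape as valid_i94_port: {code: [location]},
    where location is assumed to look like 'CITY, STATE'.
    """
    index = {}
    for code, locations in valid_ports.items():
        city = locations[0].split(",")[0].strip().lower()
        index.setdefault(city, code)  # keep the first code seen for a city
    return index


def lookup_port(city, city_index):
    """Return the port code for an exact (case-insensitive) city match, else None."""
    if city is None:
        return None
    return city_index.get(city.strip().lower())


ports = {"ATL": ["ATLANTA, GA"], "NYC": ["NEW YORK, NY"]}
city_index = build_city_index(ports)
print(lookup_port("Atlanta", city_index))  # ATL
print(lookup_port("York", city_index))     # None (substring matching would return NYC)
```

The dictionary lookup is a constant-time replacement for the loop inside `get_i94port`, at the cost of missing cities whose names differ slightly from the port description.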

In [14]:
# Add the i94port code based on the city name
df_temperature = df_temperature.withColumn("i94port", get_i94port(df_temperature.City))

In [15]:
# Remove data points with no i94port code (the UDF returns null when nothing matches)
df_temperature = df_temperature.filter(df_temperature.i94port.isNotNull())

In [16]:
# Show results
df_temperature.head()

Row(dt='1852-07-01', AverageTemperature='15.488', AverageTemperatureUncertainty='1.395', City='Perth', Country='Australia', Latitude='31.35S', Longitude='114.97E', i94port='PER')

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
**Fact Table** - This table combines the immigration and temperature data, joined on the *i94port* column.

*columns*:
* i94yr   =  4-digit year
* i94mon  =  Numeric month
* i94cit  =  3-digit code of the country of citizenship
* i94port =  3-character code of the destination city
* arrdate =  Arrival date into the US
* i94mode =  1-digit travel mode code
* depdate =  Departure date
* i94visa =  Reason for immigration
* AverageTemperature = Average temperature of the destination city
* Latitude = Latitude of the destination city
* Longitude = Longitude of the destination city

*Dimension Tables*:

1. Immigration Table:
* i94yr   =  4-digit year
* i94mon  =  Numeric month
* i94cit  =  3-digit code of the country of citizenship
* i94port =  3-character code of the destination city
* arrdate =  Arrival date into the US
* i94mode =  1-digit travel mode code
* depdate =  Departure date
* i94visa =  Reason for immigration

2. Temperature Table:
* i94port = 3-character code of the destination city (derived during cleanup)
* City    = City name
* Country = Country name
* AverageTemperature = Average temperature in the city
* Latitude = Latitude
* Longitude = Longitude

These tables will be stored as Parquet files partitioned by the destination port (i94port).

#### 3.2 Mapping Out Data Pipelines
1. Create a Spark dataframe for the immigration data and clean it as described above.
2. Create a Spark dataframe for the temperature data and clean it.
3. From the immigration data, select the necessary columns and write them to Parquet files partitioned on i94port. This is the first dimension table.
4. From the temperature data, select the necessary columns and write them to Parquet files partitioned on i94port. This is the second dimension table.
5. Create the fact table by joining the immigration and temperature data on i94port, and write it to Parquet files partitioned on i94port.
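Step 5 is an inner join, so immigration records whose port has no temperature row are dropped, and a port that several cities map to multiplies the matching immigration rows. A small pandas sketch of that behaviour on made-up rows (the Spark SQL `JOIN` in Step 4 is also an inner join and behaves the same way):

```python
import pandas as pd

# Made-up immigration rows (a trimmed version of the dimension table)
immigration = pd.DataFrame({
    "i94port": ["ATL", "ATL", "NYC", "WAS"],
    "i94visa": [3.0, 2.0, 2.0, 1.0],
})

# Made-up temperature rows: no WAS row, and two cities mapped to NYC
temperature = pd.DataFrame({
    "i94port": ["ATL", "NYC", "NYC"],
    "City": ["Atlanta", "New York", "York"],
    "AverageTemperature": [17.2, 12.9, 9.4],
})

# Inner join on i94port, as in the fact-table SQL
fact = immigration.merge(temperature, on="i94port", how="inner")
print(len(fact))  # 4: two ATL rows, the NYC row twice (one per city), WAS dropped
```

Keeping a single temperature row per i94port before joining would avoid the fan-out.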

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [None]:
# Path to I94 data 
#immigration_data_path = '/data/18-83510-I94-Data-2016/*.sas7bdat'
immigration_data_path = '/data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'

# Load the I94 data into a Spark dataframe and clean it
data_frame_immigration = clean_data(immigration_data_path)

# Extracting columns 
immigration_data = data_frame_immigration.select(["i94yr", "i94mon", "i94cit", "i94port", "arrdate", "i94mode", "depdate", "i94visa"])

# Writing immigration dataframe to parquet files partitioned on i94port
immigration_data.write.mode("append").partitionBy("i94port").parquet("/results/immigration.parquet")

In [None]:
# Extracting columns for temperature table
temperature_data = df_temperature.select(["AverageTemperature", "City", "Country", "Latitude", "Longitude", "i94port"])

# Write the temperature dataframe to Parquet files partitioned on i94port
temperature_data.write.mode("append").partitionBy("i94port").parquet("/results/temperature.parquet")

In [None]:
# Creating Temporary views of the immigration and temperature data
data_frame_immigration.createOrReplaceTempView("v_immigration")
temperature_data.createOrReplaceTempView("v_temperature")

# Creating the fact table by joining the immigration and temperature view on i94port
fact_table = spark.sql('''
SELECT v_immigration.i94yr as year,
       v_immigration.i94mon as month,
       v_immigration.i94cit as origin_country,
       v_immigration.i94mode as travel_mode,
       v_immigration.i94port as i94port,
       v_immigration.arrdate as arrival_date,
       v_immigration.depdate as departure_date,
       v_immigration.i94visa as reason,
       v_temperature.AverageTemperature as temperature,
       v_temperature.Latitude as latitude,
       v_temperature.Longitude as longitude
FROM v_immigration
JOIN v_temperature ON (v_immigration.i94port = v_temperature.i94port)
''')

# Writing the fact table to parquet file partitioned on i94port
fact_table.write.mode("append").partitionBy("i94port").parquet("/results/fact.parquet")

#### 4.2 Data Quality Checks
The data quality check verifies that each table contains records after the pipeline runs.

In [None]:
def quality_check(df, description):
    '''
    Input: Spark dataframe, description of the dataframe (string)

    Output: Prints the result of the record-count check; returns True if the table is non-empty
    '''
    result = df.count()

    if result == 0:
        print("There are zero records in {}. Data has NOT been processed successfully".format(description))
        return False
    print("{} has {} records".format(description.capitalize(), result))
    return True

# Perform data quality checks
quality_check(data_frame_immigration, "immigration table")
quality_check(temperature_data, "temperature table")
quality_check(fact_table, "fact table")
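The count check confirms that each table is non-empty, but not that the join key is populated. A complementary check could count null `i94port` values. It is sketched here in pandas on made-up rows; in Spark the same count would be `df.filter(df.i94port.isNull()).count()`. The helper name `null_key_check` is hypothetical:

```python
import pandas as pd


def null_key_check(df: pd.DataFrame, key: str, description: str) -> bool:
    """Print a message and return True if no rows have a missing value in `key`."""
    nulls = int(df[key].isna().sum())
    if nulls:
        print(f"{description.capitalize()} has {nulls} records with a null {key}")
        return False
    print(f"{description.capitalize()} has no null {key} values")
    return True


# Made-up rows: one record is missing its port code
sample = pd.DataFrame({"i94port": ["ATL", None, "NYC"]})
print(null_key_check(sample, "i94port", "sample table"))
```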

#### 4.3 Data dictionary 
**Fact Table** - This table combines the immigration and temperature data, joined on the *i94port* column.

*columns*:
* i94yr   =  4-digit year
* i94mon  =  Numeric month
* i94cit  =  3-digit code of the country of citizenship
* i94port =  3-character code of the destination city
* arrdate =  Arrival date into the US
* i94mode =  1-digit travel mode code
* depdate =  Departure date
* i94visa =  Reason for immigration
* AverageTemperature = Average temperature of the destination city
* Latitude = Latitude of the destination city
* Longitude = Longitude of the destination city

*Dimension Tables*:

1. Immigration Table:
* i94yr   =  4-digit year
* i94mon  =  Numeric month
* i94cit  =  3-digit code of the country of citizenship
* i94port =  3-character code of the destination city
* arrdate =  Arrival date into the US
* i94mode =  1-digit travel mode code
* depdate =  Departure date
* i94visa =  Reason for immigration

2. Temperature Table:
* i94port = 3-character code of the destination city (derived during cleanup)
* City    = City name
* Country = Country name
* AverageTemperature = Average temperature in the city
* Latitude = Latitude
* Longitude = Longitude

### Project Write Up

**Technology**

* Spark was chosen because it can process data at large scale. A second reason for choosing Spark is that it can read multiple data formats (here, SAS as well as CSV).
* Spark SQL was used to process the data as dataframes and temporary views: cleaning, column selection, and the join that builds the fact table.

**Frequency of Update**

* The data should be updated monthly, since the source data is delivered as a monthly load.
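For a monthly load, the input file for each month has to be located. Assuming the other monthly files follow the same naming pattern as the sample `i94_apr16_sub.sas7bdat` (an assumption; only the April file is used in this notebook), a hypothetical helper could build the path to pass to `clean_data`:

```python
# Directory used for the I94 data earlier in this notebook
I94_DIR = "../../data/18-83510-I94-Data-2016"

# Lower-case month abbreviations, spelled out to avoid locale-dependent names
MONTH_ABBR = ("jan", "feb", "mar", "apr", "may", "jun",
              "jul", "aug", "sep", "oct", "nov", "dec")


def i94_file_path(year: int, month: int) -> str:
    """Build the assumed path of a monthly I94 file, e.g. .../i94_apr16_sub.sas7bdat."""
    if not 1 <= month <= 12:
        raise ValueError(f"month must be 1-12, got {month}")
    return f"{I94_DIR}/i94_{MONTH_ABBR[month - 1]}{year % 100:02d}_sub.sas7bdat"


print(i94_file_path(2016, 4))
# ../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat
```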

#### Edge Case Conditions

1. If the data were increased by 100x.
    A cloud-based cluster would let us scale up to more (or more powerful) machines on demand and scale back down when they are no longer needed.
    
    
2. The data populates a dashboard that must be updated on a daily basis by 7am every day.
    We can use an orchestrator (Apache Oozie or Apache Airflow) to schedule the pipeline to run daily, early enough to finish before 7am. The dashboard reads the output of the pipeline, and we use the orchestrator's alerting functionality to notify us of errors.
    
    
3. The database needs to be accessed by 100+ people.
    * If we have a Hadoop cluster, we can push the data to HDFS and let users query it through Hive or Presto.
    * If we are using a cloud platform (AWS or GCP), we can write the data to Redshift or BigQuery and use their managed query services to serve our users.


