# Project Title
### Data Engineering Capstone Project

#### Project Summary
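This project builds a data lake for US immigration analytics. The I94 immigration data is cleaned with Spark, combined with lookup data for ports, countries, airlines and US cities, and stored as parquet files.

The project follows these steps: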
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [3]:
# Do all imports and installs here
import pandas as pd
import glob
import os
import sys
import shutil
from pyspark.sql import SparkSession
import re
from pyspark.sql.types import DoubleType, IntegerType, TimestampType, StringType
import time

# Create a new spark session
spark = SparkSession.builder \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport() \
    .getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use? etc.

In this project, a data lake is prepared for immigration analysis. Immigration is a continuous, daily activity, so the data keeps growing. A data lake was chosen because it can be accessed and updated easily.
<br>
The project prepares the data model and data lake for immigration analytics. The notebook is organized into 5 steps:
1. Import the data sets (stored as CSV/text and SAS files), explore them, and define the cleaning steps they require.
2. Perform the cleaning steps and verify the data.
3. Define the data model of the immigration data lake.
4. Define and code the data pipeline stages, including a full ETL cycle that reads the data, cleans it and writes it back as parquet files.
5. Final write-up.

In Step 4 of the project, we will provide some examples of analytic reports, including:
  - Which were the top 5 busiest ports in April 2016 (month 04/2016)
  - Which months have the highest immigration inflow
  - Which US ports/states have the highest number of immigrants, and what their visa types are (i.e. Student, Business, Pleasure)
  - Which airline carries the most passengers
  - Which country of residence has the highest number of students
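To make the first report concrete: it amounts to filtering one month and counting arrivals per port. The sketch below shows that logic in plain Python on a few hypothetical records shaped like the cleaned immigration rows `(year, month, portCode)`; it is an illustration, not the notebook's report code.

```python
from collections import Counter
from typing import Iterable, List, Tuple

# Each record mirrors three cleaned immigration columns: (year, month, portCode).
Record = Tuple[int, int, str]

def top_ports(records: Iterable[Record], year: int, month: int, n: int = 5) -> List[Tuple[str, int]]:
    """Return the n ports with the most arrivals in the given year/month."""
    counts = Counter(port for (y, m, port) in records if y == year and m == month)
    return counts.most_common(n)

# Hypothetical sample rows, not real I94 data.
sample = [
    (2016, 4, "NYC"), (2016, 4, "MIA"), (2016, 4, "NYC"),
    (2016, 4, "LOS"), (2016, 9, "NYC"), (2016, 4, "MIA"), (2016, 4, "NYC"),
]
print(top_ports(sample, 2016, 4, n=2))  # [('NYC', 3), ('MIA', 2)]
```

On the parquet data lake, the same idea would likely be expressed in PySpark along the lines of `immigration.filter((col("year") == 2016) & (col("month") == 4)).groupBy("portCode").count().orderBy(col("count").desc()).limit(5)`, assuming `immigration` was loaded with `spark.read.parquet` and `col` was imported from `pyspark.sql.functions`.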

### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included?  

Five data sets are used to prepare the final data model:
  1. US Cities Demographics - Population and race distribution for each US city.
  2. Airline-Codes - A data set with all IATA airline codes (Source : https://www.kaggle.com/open-flights/airline-database#airlines.csv) 
  3. I94 Immigration Data - A detailed data set on immigration flow into the US.<br>Alongside the I94 Immigration Data, two additional lookup files are used to decode its codes:
  4. PortCodes.txt : Maps the alphabetic arrival port codes used in the I94 Immigration data to their city and state.
  5. CountryCodes.txt : Maps the numeric country codes used in the I94 Immigration data to country names.


### Reading Data 


In [26]:
# Read in the data here
us_ct_demogr = pd.read_csv("us-cities-demographics.csv",sep=";")
airline_codes = pd.read_csv("airline_codes.csv")

# We will also read in the I94 correlation codes here
i94_portcodes = pd.read_csv('PortCodes.txt', sep="=", header=None)
i94_countrycodes = pd.read_csv('CountryCodes.txt', sep="=", header=None)


We will now explore the amount of data within the I94 Immigration set

In [27]:
# check amount of data files in i94 immigration data
path = '../../data/18-83510-I94-Data-2016/'
files = [f for f in glob.glob(path + "**/*", recursive=True)]
for f in files:
    statinfo = os.stat(f)
    print("file = %s, size in MB = %s " % (f,str(statinfo.st_size/1024/1024)) )

file = ../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat, size in MB = 450.125 
file = ../../data/18-83510-I94-Data-2016/i94_sep16_sub.sas7bdat, size in MB = 542.8125 
file = ../../data/18-83510-I94-Data-2016/i94_nov16_sub.sas7bdat, size in MB = 423.75 
file = ../../data/18-83510-I94-Data-2016/i94_mar16_sub.sas7bdat, size in MB = 459.0 
file = ../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat, size in MB = 683.375 
file = ../../data/18-83510-I94-Data-2016/i94_aug16_sub.sas7bdat, size in MB = 596.5625 
file = ../../data/18-83510-I94-Data-2016/i94_may16_sub.sas7bdat, size in MB = 500.6875 
file = ../../data/18-83510-I94-Data-2016/i94_jan16_sub.sas7bdat, size in MB = 414.0625 
file = ../../data/18-83510-I94-Data-2016/i94_oct16_sub.sas7bdat, size in MB = 530.5 
file = ../../data/18-83510-I94-Data-2016/i94_jul16_sub.sas7bdat, size in MB = 620.0 
file = ../../data/18-83510-I94-Data-2016/i94_feb16_sub.sas7bdat, size in MB = 373.75 
file = ../../data/18-83510-I94-Data-2016/i94_de

<br>There are many files, each several hundred MB in size, so for now we will use Spark to read in only one of them.
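The listing cell above uses `glob("**/*", recursive=True)`, which matches directories as well as files, so any subfolder would also be passed to `os.stat`. A minimal standard-library sketch that visits only regular files and lets you total their size (the helper name `file_sizes_mb` is hypothetical):

```python
import os
from typing import Dict

def file_sizes_mb(root: str) -> Dict[str, float]:
    """Map each regular file under root (recursively) to its size in MB."""
    sizes = {}
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            full = os.path.join(dirpath, name)
            sizes[full] = os.path.getsize(full) / 1024 / 1024
    return sizes

# Usage against the notebook's data folder (path taken from the cell above):
# sizes = file_sizes_mb('../../data/18-83510-I94-Data-2016/')
# print(len(sizes), "files,", round(sum(sizes.values()), 1), "MB in total")
```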

In [28]:
# using spark session, read in one file
immigration_data =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

<br> 
#### Now let us see how many rows each data set has

In [29]:
print("US city demographics = %s " % str(us_ct_demogr.shape))
print("airline codes = %s " % str(airline_codes.shape))
print("immigration data = %s " % str(immigration_data.count()))

US city demographics = (2891, 12) 
airline codes = (6162, 8) 
immigration data = 3096313 


<br><br>
### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

<br>
We will first look at  **US City demographics**

In [30]:
us_ct_demogr.head(3)

| | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 | AL | Asian | 4759 |


In [31]:
us_ct_demogr.isnull().any()

City                      False
State                     False
Median Age                False
Male Population            True
Female Population          True
Total Population          False
Number of Veterans         True
Foreign-born               True
Average Household Size     True
State Code                False
Race                      False
Count                     False
dtype: bool

Overall, the data looks clean and is in the desired format. 
<br> The following steps are required to clean the data and bring it into the desired format:
1. For our purposes, we mainly require the city and state information. The columns **[Median Age, Male Population, Female Population, Total Population, Number of Veterans, Foreign-born, Average Household Size, Race, Count]** are not needed for the project and will be removed. 
2. The null check shows that the columns we need (City, State, State Code) contain no nulls. 
3. Normalize the column names
4. Drop any duplicates
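Duplicates appear because the demographics file holds one row per city and race (the `Race`/`Count` columns), so once those columns are dropped the remaining (city, state, state code) values repeat. A small standard-library sketch of the same drop, lowercase and de-duplicate sequence on hypothetical rows (the counts are illustrative, and `dedupe_cities` is a hypothetical helper):

```python
from typing import List, Tuple

def dedupe_cities(rows: List[Tuple[str, str, str, str, int]]) -> List[Tuple[str, str, str]]:
    """Drop race/count, lowercase the city, and remove duplicates (first occurrence kept)."""
    trimmed = [(city.lower(), state, code) for (city, state, code, _race, _count) in rows]
    # dict.fromkeys keeps the first occurrence of each key, in insertion order
    return list(dict.fromkeys(trimmed))

# Hypothetical rows in the shape City, State, State Code, Race, Count
rows = [
    ("Silver Spring", "Maryland", "MD", "Hispanic or Latino", 100),
    ("Silver Spring", "Maryland", "MD", "White", 200),
    ("Quincy", "Massachusetts", "MA", "White", 300),
    ("Quincy", "Massachusetts", "MA", "Asian", 400),
]
print(dedupe_cities(rows))
# [('silver spring', 'Maryland', 'MD'), ('quincy', 'Massachusetts', 'MA')]
```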

<br>

Now let us explore **Airline codes**

In [32]:
airline_codes.head(3)

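| | Airline ID | Name | Alias | IATA | ICAO | Callsign | Country | Active |
|---|---|---|---|---|---|---|---|---|
| 0 | -1 | Unknown | \N | - | | \N | \N | Y |
| 1 | 1 | Private flight | \N | - | | | | Y |
| 2 | 2 | 135 Airways | \N | | GNL | GENERAL | United States | N |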


For our analytic requirements, the following changes will be required
1. Select only rows where iata_code and Country are present
2. Remove columns **Alias,ICAO,Callsign,Active**
3. Remove any rows where airline ID is negative
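The sample rows show that this file marks missing values with `\N` and `-` rather than leaving them blank (OpenFlights uses `\N` for NULL), so `pd.notnull` alone may not catch every missing IATA code or country. The plain-Python sketch below applies the three rules above while treating those markers as missing; the marker set is an assumption based on the rows shown. In pandas, passing `na_values=["\\N", "-"]` to `read_csv` should have a similar effect.

```python
from typing import Dict, Optional

# Placeholder strings assumed to mean "missing" in the OpenFlights airline file
MISSING_MARKERS = {"", "\\N", "-"}

def is_missing(value: Optional[str]) -> bool:
    return value is None or value.strip() in MISSING_MARKERS

def keep_airline(row: Dict[str, str]) -> bool:
    """IATA and Country present, and a positive Airline ID."""
    return (
        not is_missing(row.get("IATA"))
        and not is_missing(row.get("Country"))
        and int(row["Airline ID"]) > 0
    )

rows = [
    {"Airline ID": "-1", "Name": "Unknown", "IATA": "-", "Country": "\\N"},
    {"Airline ID": "1", "Name": "Private flight", "IATA": "-", "Country": ""},
    {"Airline ID": "2", "Name": "135 Airways", "IATA": "", "Country": "United States"},
    {"Airline ID": "10", "Name": "40-Mile Air", "IATA": "Q5", "Country": "United States"},
]
print([r["Name"] for r in rows if keep_airline(r)])  # ['40-Mile Air']
```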

<br><br>
Finally, let's look at the **Immigration data set**.

Let us first look at the lookup codes we imported.

In [33]:
i94_portcodes.head(3)

| | 0 | 1 |
|---|---|---|
| 0 | ALC | ALCAN, AK |
| 1 | ANC | ANCHORAGE, AK |
| 2 | BAR | BAKER AAF - BAKER ISLAND, AK |


In [34]:
# Verify Null values
i94_portcodes[i94_portcodes.isnull().any(axis=1)]


| | 0 | 1 |
|---|---|---|

(No rows returned: no port code entry contains a null value.)


In the **Port Codes**:
 - The second column will be split into portName and stateCode, using "," as the separator
 - Entries without a state part (non-US ports) end up with a NaN stateCode after the split; these will be filled with "Non-US"
 - Normalize column names
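The split can be checked on single entries with a plain-Python sketch that mirrors the pandas logic used later (`str.split(',')`, then `.str[0]` and `.str[1]`, stripped): the text before the first comma becomes the port name, the text after it becomes the state code, and entries without a comma fall back to "Non-US". The helper name and the no-comma example are hypothetical.

```python
from typing import Tuple

def split_port(description: str) -> Tuple[str, str]:
    """Split 'PORT NAME, ST' into (portName, stateCode); no comma -> 'Non-US'."""
    parts = description.split(",")
    name = parts[0].strip()
    # parts[1] matches pandas .str[1]; a missing element becomes NaN there, 'Non-US' here
    state = parts[1].strip() if len(parts) > 1 else "Non-US"
    return name, state

print(split_port("ANCHORAGE, AK"))                 # ('ANCHORAGE', 'AK')
print(split_port("BAKER AAF - BAKER ISLAND, AK"))  # ('BAKER AAF - BAKER ISLAND', 'AK')
print(split_port("SOME FOREIGN PORT"))             # ('SOME FOREIGN PORT', 'Non-US')
```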

In [35]:
i94_countrycodes.head(3)

| | 0 | 1 |
|---|---|---|
| 0 | 582 | MEXICO Air Sea, and Not Reported (I-94, no l... |
| 1 | 236 | AFGHANISTAN |
| 2 | 101 | ALBANIA |


In **Country Codes**, the format is already clean. We will simply normalize the column names to [countryCode, countryName].

In [36]:
immigration_data.head(3)

[Row(cicid=6.0, i94yr=2016.0, i94mon=4.0, i94cit=692.0, i94res=692.0, i94port='XXX', arrdate=20573.0, i94mode=None, i94addr=None, depdate=None, i94bir=37.0, i94visa=2.0, count=1.0, dtadfile=None, visapost=None, occup=None, entdepa='T', entdepd=None, entdepu='U', matflag=None, biryear=1979.0, dtaddto='10282016', gender=None, insnum=None, airline=None, admnum=1897628485.0, fltno=None, visatype='B2'),
 Row(cicid=7.0, i94yr=2016.0, i94mon=4.0, i94cit=254.0, i94res=276.0, i94port='ATL', arrdate=20551.0, i94mode=1.0, i94addr='AL', depdate=None, i94bir=25.0, i94visa=3.0, count=1.0, dtadfile='20130811', visapost='SEO', occup=None, entdepa='G', entdepd=None, entdepu='Y', matflag=None, biryear=1991.0, dtaddto='D/S', gender='M', insnum=None, airline=None, admnum=3736796330.0, fltno='00296', visatype='F1'),
 Row(cicid=15.0, i94yr=2016.0, i94mon=4.0, i94cit=101.0, i94res=101.0, i94port='WAS', arrdate=20545.0, i94mode=1.0, i94addr='MI', depdate=20691.0, i94bir=55.0, i94visa=2.0, count=1.0, dtadfile=

In [37]:
immigration_data.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

**Dropping Columns:** 
Using the label descriptions provided with the data source, we will **keep** the following columns:
- I94YR : 4-digit year of arrival
- I94MON : Numeric month of arrival
- I94CIT & I94RES : Country of citizenship and country of residence, as numeric codes (see CountryCodes.txt)
- I94PORT : Port/airport of arrival, as an alphabetic code (see PortCodes.txt)
- ARRDATE : Arrival date in the USA
- I94MODE : Mode of arrival. There are 4 options: 1 = 'Air', 2 = 'Sea', 3 = 'Land', 9 = 'Not reported'
- DEPDATE : Departure date from the USA
- I94BIR : Age of respondent in years (the birth year is also available in BIRYEAR, which is dropped)
- I94VISA : Visa codes collapsed into three categories: 1 = Business, 2 = Pleasure, 3 = Student
- MATFLAG : Match flag, i.e. match of arrival and departure records
- GENDER : Non-immigrant sex
- AIRLINE : Airline used to arrive in the US
- FLTNO : Flight number of the airline used to arrive in the US
- VISATYPE : Class of admission legally admitting the non-immigrant to temporarily stay in the US

All other columns will be dropped.

**Adjusting Format**
- The following columns will be changed to integer types. [i94yr, i94mon, i94cit, i94res, arrdate, depdate, i94bir, i94visa]
- **i94MODE** will be changed to string and replaced with the corresponding mode labels (i.e. Air, Sea, Land, Not reported)
- **i94VISA** will be changed to string and will be replaced with relevant visa types (i.e. Business, Pleasure, Student)
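Note that casting `arrdate` and `depdate` to integers keeps them as day counts rather than calendar dates: they are SAS numeric dates, i.e. the number of days since 1 January 1960. The samples agree with this reading: `arrdate=20545` appears in the April 2016 file and corresponds to 2016-04-01. A minimal plain-Python sketch of the conversion (the helper name `sas_to_date` is hypothetical):

```python
from datetime import date, timedelta
from typing import Optional

SAS_EPOCH = date(1960, 1, 1)  # day 0 of SAS date values

def sas_to_date(days: Optional[int]) -> Optional[date]:
    """Convert a SAS date (days since 1960-01-01) to a datetime.date; None stays None."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))

print(sas_to_date(20545))  # 2016-04-01
print(sas_to_date(20698))  # 2016-09-01
print(sas_to_date(None))   # None (e.g. a missing depdate)
```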


<br><br>
#### Cleaning Steps
Document steps necessary to clean the data

### US Cities Demographics

In [38]:
# Drop the columns
us_ct_demogr.drop(columns=['Median Age','Male Population','Female Population','Total Population','Number of Veterans','Foreign-born','Average Household Size','Race','Count'],inplace=True)

# For simplicity, rename the columns: remove whitespace and bring them to a normalized format
us_ct_demogr.columns = ['cityName', 'stateName', 'stateCode']

# Bring the city column to lower case
us_ct_demogr['cityName'] = us_ct_demogr['cityName'].str.lower()

# Let us drop duplicates
us_ct_demogr.drop_duplicates(inplace=True)

# Verify format
us_ct_demogr.head(2)

| | cityName | stateName | stateCode |
|---|---|---|---|
| 0 | silver spring | Maryland | MD |
| 1 | quincy | Massachusetts | MA |


### Airline Codes

In [39]:
# Select only rows with iata_code and country
airline_codes = airline_codes[pd.notnull(airline_codes['IATA'])]
airline_codes = airline_codes[pd.notnull(airline_codes['Country'])]

# Select only rows where airline ID is > 0
airline_codes = airline_codes[airline_codes['Airline ID'] > 0]

# Remove columns (reassign instead of inplace=True, since airline_codes is a filtered slice)
airline_codes = airline_codes.drop(columns=['Alias','ICAO','Callsign','Active'])

# Rename Columns
airline_codes.columns=['airlineID','airlineName','airline_iata_code','airlineCountry']

airline_codes.head(2)

| | airlineID | airlineName | airline_iata_code | airlineCountry |
|---|---|---|---|---|
| 3 | 3 | 1Time Airline | 1T | South Africa |
| 10 | 10 | 40-Mile Air | Q5 | United States |


<br>

### I94 Immigration Data


In [40]:
## PORT CODES
# Split and strip columns 
i94_portcodes['portName'] = i94_portcodes[1].str.split(',').str[0].str.strip()
i94_portcodes['stateCode'] = i94_portcodes[1].str.split(',').str[1].str.strip()

# Remove white-spaces
i94_portcodes[0] = i94_portcodes[0].str.strip()

#  Drop and rename columns
i94_portcodes.drop(columns=[1],inplace=True)
i94_portcodes.columns=['portCode','portName','stateCode']

# Fill any NaN stateCodes with 'Non-US' (assign back: chained inplace fillna is deprecated in recent pandas)
i94_portcodes['stateCode'] = i94_portcodes['stateCode'].fillna('Non-US')

i94_portcodes.head(2)

| | portCode | portName | stateCode |
|---|---|---|---|
| 0 | ALC | ALCAN | AK |
| 1 | ANC | ANCHORAGE | AK |


In [41]:
## COUNTRY CODES
# Rename columns in country codes
i94_countrycodes.columns = ['countryCode','countryName']
i94_countrycodes.head(2)

| | countryCode | countryName |
|---|---|---|
| 0 | 582 | MEXICO Air Sea, and Not Reported (I-94, no l... |
| 1 | 236 | AFGHANISTAN |


In [42]:
## IMMIGRATION DATA
# First we will fix the column types, below function does that
def convert_to_int(df, input_column_name):
    df = df.withColumn(input_column_name, df[input_column_name].cast(IntegerType()) )
    return df

# create a list of columns to convert to integer
integer_cols = ['i94yr','i94mon','i94cit','i94res', 'arrdate', 'i94mode','depdate', 'i94bir', 'i94visa']
# execute the convert function for each column above.
for column in integer_cols:
    immigration_data = convert_to_int(immigration_data,column)

# convert the i94Mode and i94Visa to string
immigration_data = immigration_data.withColumn("i94mode", immigration_data["i94mode"].cast( StringType() ) )
immigration_data = immigration_data.withColumn("i94visa", immigration_data["i94visa"].cast( StringType() ) )

# Replace the mode and visa with its true values
i94modes = { '1':'Air', '2':'Sea', '3':'Land', '9':'Not reported' }
i94visas = { '1':'Business', '2':'Pleasure', '3':'Student' }

immigration_data = immigration_data.replace(to_replace=i94modes, subset=['i94mode'])
immigration_data = immigration_data.replace(to_replace=i94visas, subset=['i94visa'])

# now we drop all columns that are not required. 
# create a list of columns to keep
select_list = ['i94yr','i94mon','i94cit','i94res','i94port','arrdate','i94mode','depdate','i94bir','i94visa','matflag','gender','airline','fltno','visatype']
for column in immigration_data.columns:
    if column not in select_list:
        immigration_data = immigration_data.drop(column)

# Normalize the column names
new_names = ['year', 'month','cit','res','portCode','arrDate','mode','depDate','age','visaType','matchFlag','gender','airlineCode','flightNumber','visaCat']
immigration_data = immigration_data.toDF(*new_names)

# Print the schema to make sure everything is as required
immigration_data.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- cit: integer (nullable = true)
 |-- res: integer (nullable = true)
 |-- portCode: string (nullable = true)
 |-- arrDate: integer (nullable = true)
 |-- mode: string (nullable = true)
 |-- depDate: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- visaType: string (nullable = true)
 |-- matchFlag: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- airlineCode: string (nullable = true)
 |-- flightNumber: string (nullable = true)
 |-- visaCat: string (nullable = true)
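The cell above casts `i94mode` and `i94visa` to integer before casting them to string on purpose: the source columns are doubles, while the replacement dictionaries use keys such as `'1'`. In Spark, casting a double directly to string produces text like `1.0`, which would not match those keys. The same pitfall, sketched in plain Python (the `decode` helper is hypothetical; like `DataFrame.replace`, it leaves unmapped codes unchanged):

```python
from typing import Dict, Optional

I94MODES = {'1': 'Air', '2': 'Sea', '3': 'Land', '9': 'Not reported'}

def decode(code: Optional[float], mapping: Dict[str, str]) -> Optional[str]:
    """Decode a numeric I94 code stored as a float (e.g. 1.0) via its integer text."""
    if code is None:
        return None
    key = str(int(code))  # 1.0 -> '1', matching the dictionary keys
    return mapping.get(key, key)

print(str(1.0), str(int(1.0)))  # 1.0 1
print(decode(1.0, I94MODES))    # Air
print(decode(5.0, I94MODES))    # 5 (unmapped codes pass through)
```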



In [50]:
immigration_data.head(15)

[Row(year=2016, month=9, cit=213, res=213, portCode='HOU', arrDate=20698, mode='Air', depDate=20725, age=27, visaType='Business', matchFlag='M', gender='M', airlineCode='QK', flightNumber='8111', visaCat='B1'),
 Row(year=2016, month=9, cit=369, res=369, portCode='WAS', arrDate=20698, mode='Air', depDate=20725, age=71, visaType='Pleasure', matchFlag='M', gender='M', airlineCode='KL', flightNumber='651', visaCat='B2'),
 Row(year=2016, month=9, cit=582, res=582, portCode='LVG', arrDate=20698, mode='Air', depDate=20702, age=25, visaType='Pleasure', matchFlag='M', gender='M', airlineCode='4O', flightNumber='970', visaCat='B2'),
 Row(year=2016, month=9, cit=691, res=582, portCode='MIA', arrDate=20698, mode='Air', depDate=20702, age=50, visaType='Business', matchFlag='M', gender='M', airlineCode='AM', flightNumber='428', visaCat='B1'),
 Row(year=2016, month=9, cit=266, res=266, portCode='CHI', arrDate=20698, mode='Air', depDate=20714, age=34, visaType='Student', matchFlag='M', gender='F', air

<br><br>

## Step 3: Define the Data Model
### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model.
Our source data is now in the required format; below is a summary of the columns.<br>
**Data Sources**
1. City-Demographics : cityName, stateName, stateCode
2. i94_portCodes     : portCode, portName, stateCode
3. airline_codes     : airlineID, airlineName,airline_iata_code,airlineCountry
4. i94_countryCodes  : countryCode, countryName
5. immigration_data  : year, month, cit, res, portCode, arrDate, mode, depDate, age, visaType, matchFlag, gender, airlineCode, flightNumber, visaCat

**FINAL DATA LAKE MODEL**
From the cleaned data sets above, we will map the data into our final data lake model as shown in the figure below


<img src="files/DataModel.png">

Data Tables: 

1. immigration_data : year, month, cit, res, portCode, arrDate, mode, depDate, age, visaType, matchFlag, gender, airlineCode, flightNumber, visaCat
2. countryCodes  : countryCode, countryName
3. us_state_city : stateCode, stateName, cityName
4. us_ports      : portCode, stateCode, portName
5. airlineCodes  : airlineID, airlineName, airline_iata_code, airlineCountry
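The tables are meant to be linked through shared codes. Judging from the column names (the join keys are inferred here, not stated in the model figure): `immigration_data.portCode` to `us_ports.portCode`, `cit`/`res` to `countryCodes.countryCode`, `airlineCode` to `airlineCodes.airline_iata_code`, and `us_ports.stateCode` to `us_state_city.stateCode`. Because `us_state_city` has one row per city, a state-level lookup needs distinct `stateCode`/`stateName` pairs, and airline IATA codes may not be unique. The plain-Python sketch below performs these lookups left-join style with hypothetical dimension entries; on the parquet files the same would be Spark joins.

```python
from typing import Dict, Optional

# Hypothetical dimension lookups keyed by the join columns
us_ports = {"ANC": {"portName": "ANCHORAGE", "stateCode": "AK"}}
state_names = {"AK": "Alaska"}            # distinct stateCode -> stateName from us_state_city
country_codes = {236: "AFGHANISTAN", 101: "ALBANIA"}
airlines = {"Q5": "40-Mile Air"}          # airline_iata_code -> airlineName

def enrich(fact: Dict) -> Dict[str, Optional[str]]:
    """Resolve one immigration row's codes; unknown codes give None (left-join style)."""
    port = us_ports.get(fact["portCode"], {})
    return {
        "portName": port.get("portName"),
        "stateName": state_names.get(port.get("stateCode")),
        "citCountry": country_codes.get(fact["cit"]),
        "resCountry": country_codes.get(fact["res"]),
        "airlineName": airlines.get(fact["airlineCode"]),
    }

row = {"portCode": "ANC", "cit": 101, "res": 236, "airlineCode": "Q5"}
print(enrich(row))
```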

### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

There will be two separate data pipelines for our project:

- **1st Pipeline : Creating the data model.**
    - Read in data from sources
    - Clean data 
    - Create spark data frames from cleaned data
    - Write out data frames as parquet files
- **2nd Pipeline : Updating immigration data.**
    - Read in immigration data
    - Clean immigration data 
    - Read in the immigration parquet file into a Spark data frame
    - Update the Spark data frame with the new cleaned data
    - Write out the data frame as parquet files
    

## Step 4: Run Pipelines to Model the Data 
### 4.1 Pipeline common workflows
From the pipelines defined above, we can combine and reuse the following code in both pipelines:
1. Clean data source
2. Write out parquet file

Making a common function for each will be useful, as any changes to cleaning / partitioning can be applied directly in these functions without altering the pipelines themselves. 
<br>
We will define this code below
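One way to keep such shared cleaning code easy to extend is a dispatch table that maps each source name to its own cleaning function, instead of a long `if/elif` chain. The sketch below is hypothetical: the cleaner names and the `ValueError` for unknown sources are illustration choices rather than the notebook's code, and lists of dicts stand in for DataFrames so it runs with the standard library alone.

```python
from typing import Callable, Dict, List

Rows = List[dict]

def clean_country_codes(rows: Rows) -> Rows:
    # Rename positional columns 0/1 to countryCode/countryName
    return [{"countryCode": r[0], "countryName": r[1]} for r in rows]

def clean_us_state_cities(rows: Rows) -> Rows:
    # Keep city/state columns, lowercase the city, drop duplicates (order kept)
    seen, out = set(), []
    for r in rows:
        key = (r["City"].lower(), r["State"], r["State Code"])
        if key not in seen:
            seen.add(key)
            out.append({"cityName": key[0], "stateName": key[1], "stateCode": key[2]})
    return out

CLEANERS: Dict[str, Callable[[Rows], Rows]] = {
    "countryCodes": clean_country_codes,
    "us_state_cities": clean_us_state_cities,
}

def clean(rows: Rows, source_name: str) -> Rows:
    """Look up the cleaner for source_name; unknown sources raise ValueError."""
    try:
        cleaner = CLEANERS[source_name]
    except KeyError:
        raise ValueError(f"pipeline not programmed to clean {source_name}") from None
    return cleaner(rows)
```

Adding a source then means registering one more function, and raising a specific exception lets the calling pipeline decide whether to stop, rather than exiting from inside the helper.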

In [12]:
# Function we used to convert column types.
def convert_to_int(df, input_column_name):
    df = df.withColumn(input_column_name, df[input_column_name].cast(IntegerType()) )
    return df

# Function to clean data sources, code reused from the cleaning steps in Step 2
def clean_data_source(df,sourceName):
    """
        function clean_data_source
            input = df :: Type = Spark data frame --> the data frame that requires cleaning
                    sourcename :: Type = String --> variable that defines which cleaning steps to take in data. 
            returns df
    """
    
    try:
        print("cleaning {} ".format(sourceName) )
        # We separate cleaning based on sourceName and reuse the code defined above in the cleaning stage
        if sourceName == "immigration_data":

            # create a list of columns to convert to integer
            integer_cols = ['i94yr','i94mon','i94cit','i94res', 'arrdate', 'i94mode','depdate', 'i94bir', 'i94visa']
            # execute the convert function for each column above.
            for column in integer_cols:
                df = convert_to_int(df,column)

            # convert the i94Mode and i94Visa to string
            df = df.withColumn("i94mode", df["i94mode"].cast( StringType() ) )
            df = df.withColumn("i94visa", df["i94visa"].cast( StringType() ) )

            # Replace the mode and visa with its true values
            i94modes = { '1':'Air', '2':'Sea', '3':'Land', '9':'Not reported' }
            i94visas = { '1':'Business', '2':'Pleasure', '3':'Student' }

            df = df.replace(to_replace=i94modes, subset=['i94mode'])
            df = df.replace(to_replace=i94visas, subset=['i94visa'])

            # now we drop all columns that are not required. 
            # create a list of columns to keep
            select_list = ['i94yr','i94mon','i94cit','i94res','i94port','arrdate','i94mode','depdate','i94bir','i94visa','matflag','gender','airline','fltno','visatype']
            for column in df.columns:
                if column not in select_list:
                    df = df.drop(column)

            # Normalize the column names
            new_names = ['year', 'month','cit','res','portCode','arrDate','mode','depDate','age','visaType','matchFlag','gender','airlineCode','flightNumber','visaCat']
            df = df.toDF(*new_names)

        elif sourceName == "airlineCodes":

            # Select only rows with iata_code and country
            df = df[pd.notnull(df['IATA'])]
            df = df[pd.notnull(df['Country'])]

            # Select only rows where airline ID is > 0
            df = df[df['Airline ID'] > 0]

            # Remove columns (reassign instead of inplace=True, since df is a filtered slice)
            df = df.drop(columns=['Alias','ICAO','Callsign','Active'])

            # Rename Columns
            df.columns=['airlineID','airlineName','airline_iata_code','airlineCountry']


        elif sourceName == "countryCodes":

            # Rename columns in country codes
            df.columns = ['countryCode','countryName']


        elif sourceName == "us_ports":

            # Split and strip columns 
            df['portName'] = df[1].str.split(',').str[0].str.strip()
            df['stateCode'] = df[1].str.split(',').str[1].str.strip()
            df[0] = df[0].str.strip()

            #  Drop and rename columns
            df.drop(columns=[1],inplace=True)
            df.columns=['portCode','portName','stateCode']

            # Fill any NaN stateCodes with 'Non-US' (assign back instead of chained inplace fillna)
            df['stateCode'] = df['stateCode'].fillna('Non-US')


        elif sourceName == "us_state_cities":
            # Drop the columns
            df.drop(columns=['Median Age','Male Population','Female Population','Total Population','Number of Veterans','Foreign-born','Average Household Size','Race','Count'],inplace=True)

            # For simplicity, rename the columns: remove whitespace and bring them to a normalized format
            df.columns = ['cityName', 'stateName', 'stateCode']

            # Bring the city column to lower case
            df['cityName'] = df['cityName'].str.lower()

            # Let us drop duplicates
            df.drop_duplicates(inplace=True)


        else:
            print("Source Unknown, pipleline not programmed to clean {} ".format(sourceName) )   
            sys.exit(1)


    except BaseException as e:
        print("An exception occurred, invalid data format given for source {} -- ERROR = {} ".format(sourceName,e))
        print("")

    
    return df

def writeParquet(df, sourceName, output_dest, remove_first):
    """
        function writeParquet
            input = df :: Type= Spark Data Frame --> the data frame that requires to be written out to parquet
                    sourcename :: Type = String --> variable that defines name of parquet file and also process to write out (e.g. partition by etc). 
                    output_dest :: destination string to write the parquet file
                    remove_first :: Type = Int --> flag indicating whether or not to remove the existing parquet
    """
    try:
        outputfile = output_dest + sourceName + ".parquet"

        # time flag to keep track of how long it takes to write the file. 
        start = time.time()

        # Check the remove flag; 'and' is required here, because '&' binds tighter than '=='
        if os.path.exists(outputfile) and remove_first == 1: 
            print("parquet file {} exists, Remove flag is set to 1".format(outputfile))
            shutil.rmtree(outputfile)


        if sourceName == "immigration_data":
            if remove_first==0:
                print("appending ... ")
                df.write.mode("append").partitionBy("year","month").format("parquet").save(outputfile)
            else:
                df.write.partitionBy("year","month").format("parquet").save(outputfile)

        elif sourceName in ["airlineCodes","countryCodes","us_ports","us_state_cities"]:
            df.write.format("parquet").save(outputfile)

        else:
            print("Source Unknown, pipleline not programmed to write parquet for {} ".format(sourceName) )
            sys.exit(1)

        end = time.time()
        print("written out parquet file {} , time taken in seconds = {:.2f} ".format(outputfile,(end - start) ) )
        
    except BaseException as e:
        print("Exception occured during writing parquet file -- ERROR = {}".format(e))
        print("")
        
    


<br> 

### 4.2 1st Pipeline : Create data model
Build the data pipelines to create the data model.


In [11]:
# Step 1:  Read in the data here
us_ct_demogr = pd.read_csv("us-cities-demographics.csv",sep=";")
airline_codes = pd.read_csv("airline_codes.csv")
i94_portcodes = pd.read_csv('PortCodes.txt', sep="=", header=None)
i94_countrycodes = pd.read_csv('CountryCodes.txt', sep="=", header=None)
immigration_data =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


# Step 2: Clean Data 
us_ct_demogr = clean_data_source(us_ct_demogr,"us_state_cities")
airline_codes = clean_data_source(airline_codes,"airlineCodes")
i94_portcodes = clean_data_source(i94_portcodes,"us_ports")
i94_countrycodes = clean_data_source(i94_countrycodes,"countryCodes")
immigration_data = clean_data_source(immigration_data,"immigration_data")


# Step 3: Create Spark Data frames from clean data 
# We do not convert immigration data as it is already a spark data frame
us_ct_demogr = spark.createDataFrame(us_ct_demogr)
airline_codes = spark.createDataFrame(airline_codes)
i94_portcodes = spark.createDataFrame(i94_portcodes)
i94_countrycodes = spark.createDataFrame(i94_countrycodes)


# Step 4 : Write out data to parquet files
writeParquet(us_ct_demogr,"us_state_cities","parquet_files/",1)
writeParquet(airline_codes,"airlineCodes","parquet_files/",1)
writeParquet(i94_portcodes,"us_ports","parquet_files/",1)
writeParquet(i94_countrycodes,"countryCodes","parquet_files/",1)
writeParquet(immigration_data,"immigration_data","parquet_files/",1)


cleaning us_state_cities 
cleaning airlineCodes 
cleaning us_ports 
cleaning countryCodes 
cleaning immigration_data 
parquet file parquet_files/us_state_cities.parquet exists, Remove flag is set to 1
written out parquet file parquet_files/us_state_cities.parquet , time taken in seconds = 0.21 
parquet file parquet_files/airlineCodes.parquet exists, Remove flag is set to 1
written out parquet file parquet_files/airlineCodes.parquet , time taken in seconds = 0.26 
parquet file parquet_files/us_ports.parquet exists, Remove flag is set to 1
written out parquet file parquet_files/us_ports.parquet , time taken in seconds = 0.21 
parquet file parquet_files/countryCodes.parquet exists, Remove flag is set to 1
written out parquet file parquet_files/countryCodes.parquet , time taken in seconds = 0.22 
parquet file parquet_files/immigration_data.parquet exists, Remove flag is set to 1
written out parquet file parquet_files/immigration_data.parquet , time taken in seconds = 53.83 


<br>

### 4.2 2nd Pipeline : Updating Immigration Data

In [13]:
# Step 1:  Read in new immigration the data here
immigration_data =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_sep16_sub.sas7bdat')

# Step 2: Clean Data 
immigration_data = clean_data_source(immigration_data,"immigration_data")

# Step 3: update  / append parquet file
writeParquet(immigration_data,"immigration_data","parquet_files/",0)

print("immigration data appended ")


cleaning immigration_data 
appending ... 
written out parquet file parquet_files/immigration_data.parquet , time taken in seconds = 67.22 
immigration data appended 
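Because the second pipeline appends, running it twice for the same SAS file would duplicate that month's rows. `partitionBy("year","month")` writes Hive-style folders such as `immigration_data.parquet/year=2016/month=4/`, so one possible safeguard is to check which partitions already exist before appending. A standard-library sketch (the helper name `existing_partitions` is hypothetical):

```python
import os
from typing import Set, Tuple

def existing_partitions(parquet_path: str) -> Set[Tuple[int, int]]:
    """Return the (year, month) pairs already present under a year=/month= folder layout."""
    found = set()
    if not os.path.isdir(parquet_path):
        return found
    for year_dir in os.listdir(parquet_path):
        year_path = os.path.join(parquet_path, year_dir)
        year = year_dir[len("year="):]
        if not (year_dir.startswith("year=") and year.isdigit() and os.path.isdir(year_path)):
            continue  # skips files such as _SUCCESS
        for month_dir in os.listdir(year_path):
            month = month_dir[len("month="):]
            if month_dir.startswith("month=") and month.isdigit():
                found.add((int(year), int(month)))
    return found

# Usage sketch: skip the append if September 2016 is already loaded
# if (2016, 9) in existing_partitions("parquet_files/immigration_data.parquet"):
#     print("partition already present, skipping append")
```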


<br>

### 4.3 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [17]:
# Perform quality checks here
# The first check we will do is read in the parquet files and make sure all have data inside
# We will define a function that can be applied to all data sources. 
def check_data_rows(sourceName,parquet_storageSource):
    """
        function check_data_rows
            input = sourceName :: Type = String --> name of the parquet file, also used as the temporary view name
                    parquet_storageSource :: Type = String --> folder that holds the parquet files
    """
    try:
        filename = parquet_storageSource + sourceName + ".parquet"

        # time flag to keep track of how long it takes to read and count the file. 
        start = time.time()

        # Check if parquet file exists
        if not os.path.exists(filename):
            print("Parquet file {} not found".format(filename))
            sys.exit(1)

        # read in the parquet file if the first test is passed. 
        parquetFile = spark.read.parquet(filename)

        # Parquet files can also be used to create a temporary view and then used in SQL statements.       
        parquetFile.createOrReplaceTempView(sourceName)
        # create sql command
        sql_command = " select count(1) as n_rows from {} ".format(sourceName)
        # execute sql
        result = spark.sql(sql_command)
        # get result for n_rows
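        rows = result.collect()[0]['n_rows']
        # Flag empty tables, since every table is expected to contain data
        if rows == 0:
            print("WARNING: parquet file {} contains no rows".format(filename))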
        end = time.time()
        print("{} \t rows={} \t calculation time ={:.2f} s \t Parquet file={} ".format(sourceName,rows,(end-start),filename ) )
    except BaseException as e:
        print("Exception occured during reading parquet file -- ERROR = {}".format(e))
        
# Execute the procedure for all data sources
parquet_storageSource = "parquet_files/"
check_data_rows("immigration_data",parquet_storageSource)
check_data_rows("countryCodes",parquet_storageSource)
check_data_rows("us_ports",parquet_storageSource)
check_data_rows("airlineCodes",parquet_storageSource)
check_data_rows("us_state_cities",parquet_storageSource)

immigration_data 	 rows=6830099 	 calculation time =0.56 s 	 Parquet file=parquet_files/immigration_data.parquet 
countryCodes 	 rows=289 	 calculation time =0.31 s 	 Parquet file=parquet_files/countryCodes.parquet 
us_ports 	 rows=591 	 calculation time =0.30 s 	 Parquet file=parquet_files/us_ports.parquet 
airlineCodes 	 rows=1525 	 calculation time =0.36 s 	 Parquet file=parquet_files/airlineCodes.parquet 
us_state_cities 	 rows=596 	 calculation time =0.40 s 	 Parquet file=parquet_files/us_state_cities.parquet 


In [48]:
### FUNCTIONALITY TESTS
# The second test runs the individual functions with wrong input data (e.g. wrong source name, wrong data format)
# First we try to clean the immigration data under the wrong source name.
# The result goes into a separate variable so that immigration_data is not replaced by whatever a failed call returns.
test_result = clean_data_source(immigration_data,"us_state_cities")


cleaning us_state_cities 
An exception occurred, invalid data format given for source us_state_cities -- ERROR = drop() got an unexpected keyword argument 'columns' 



In [49]:
# Next we try to execute both the cleaning and the writing functions for a source that isn't programmed / defined
test_result = writeParquet(immigration_data,"us_states",1)
test_result = clean_data_source(immigration_data,"us_states")



Source Unknown, pipleline not programmed to write parquet for us_states 
Exception occured during writing parquet file -- ERROR = 1

cleaning us_states 
Source Unknown, pipleline not programmed to clean us_states 
An exception occurred, invalid data format given for source us_states -- ERROR = 1 
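The functionality tests above rely on reading printed messages. One way to make them repeatable unit tests is to have the cleaning step raise an exception for unknown sources or wrong formats, and then assert on that with Python's `unittest`. The sketch below uses a hypothetical stand-in, `clean_source`, which works on lists of dictionaries rather than Spark DataFrames. It is not the notebook's `clean_data_source`.

```python
import unittest

# Hypothetical set of sources the pipeline knows how to clean
KNOWN_SOURCES = {"immigration_data", "countryCodes", "us_ports", "airlineCodes", "us_state_cities"}


def clean_source(rows, source_name):
    """Hypothetical stand-in for clean_data_source: raises for unknown sources
    and for rows that are not dictionaries, instead of only printing a message."""
    if source_name not in KNOWN_SOURCES:
        raise ValueError("pipeline not programmed to clean {}".format(source_name))
    if not all(isinstance(row, dict) for row in rows):
        raise TypeError("invalid data format given for source {}".format(source_name))
    # Placeholder cleaning step: drop rows where every value is None
    return [row for row in rows if any(v is not None for v in row.values())]


class CleanSourceTests(unittest.TestCase):
    def test_unknown_source_is_rejected(self):
        with self.assertRaises(ValueError):
            clean_source([], "us_states")

    def test_wrong_format_is_rejected(self):
        with self.assertRaises(TypeError):
            clean_source(["not a dict"], "us_state_cities")

    def test_empty_rows_are_dropped(self):
        rows = [{"a": 1}, {"a": None}]
        self.assertEqual(clean_source(rows, "countryCodes"), [{"a": 1}])


if __name__ == "__main__":
    # argv and exit=False let this run inside a notebook without exiting the kernel
    unittest.main(argv=["ignored"], exit=False)
```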



### 4.4 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.
<br> The data dictionary is stored in files/data_dictionary.html and displayed below.

In [2]:
from IPython.display import HTML
HTML(filename='files/data_dictionary.html')

**countryCodes**

| Name | Data Type | Nullable | PK | FK | Default | Comment |
|---|---|---|---|---|---|---|
| countryCode | INT | Yes | Yes | No | | Unique numeric country code for the country |
| countryName | VARCHAR(45) | No | No | No | | Name of the country |

**us_state_cities**

| Name | Data Type | Nullable | PK | FK | Default | Comment |
|---|---|---|---|---|---|---|
| stateCode | VARCHAR(5) | Yes | No | No | | State code for the State (e.g. CA = California) |
| stateName | VARCHAR(45) | Yes | No | No | | Name of State |
| cityName | VARCHAR(45) | Yes | No | No | | Name of City |

**us_ports**

| Name | Data Type | Nullable | PK | FK | Default | Comment |
|---|---|---|---|---|---|---|
| portCode | VARCHAR(5) | Yes | No | No | | Unique port code which identifies the port of immigration |
| stateCode | VARCHAR(5) | Yes | No | Yes | | State code where the port resides, extracted from the source |
| portName | VARCHAR(80) | No | No | No | | Name of port |

**airlineCodes**

| Name | Data Type | Nullable | PK | FK | Default | Comment |
|---|---|---|---|---|---|---|
| airlineID | INT | Yes | Yes | No | | Unique ID for each airline, extracted from the airline database |
| airlineName | VARCHAR(100) | No | No | No | | Name of the airline |
| airline_iata_code | VARCHAR(10) | Yes | No | No | | International IATA code of the airline |
| airlineCountry | VARCHAR(50) | No | No | No | | Airline's home country |

**immigration_data**

| Name | Data Type | Nullable | PK | FK | Default | Comment |
|---|---|---|---|---|---|---|
| year | INT | No | No | No | | Year to which the immigration record refers |
| month | INT | No | No | No | | Month to which the immigration record refers |
| cit | INT | No | No | Yes | | Country code identifying the immigrant's country of citizenship |
| res | INT | No | No | Yes | | Country code identifying the immigrant's country of residence |
| portCode | VARCHAR(5) | No | No | Yes | | Port code of the port of arrival |
| arrivalDate | DATE | No | No | No | | Arrival date of the immigrant |
| mode | VARCHAR(50) | No | No | No | | Mode of transport. Possible values: 1 = 'Air', 2 = 'Sea', 3 = 'Land', 9 = 'Not reported' |
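The dictionary documents `mode` as a text column whose source values are numeric codes. The decoding step can be sketched as a plain Python lookup. `MODE_LABELS` and `decode_mode` are hypothetical names. Mapping unknown or missing codes (including NaN, which SAS extracts often contain) to 'Not reported' is an assumption, not something the dictionary specifies. In the Spark pipeline, the same mapping could be expressed as a join against a small lookup table.

```python
# Mapping taken from the data dictionary entry for the `mode` column
MODE_LABELS = {1: "Air", 2: "Sea", 3: "Land", 9: "Not reported"}


def decode_mode(code):
    """Return the label for a transport-mode code (hypothetical helper).

    Codes outside the documented set, and missing values such as None or NaN,
    fall back to 'Not reported' so that every row gets a label.
    """
    try:
        return MODE_LABELS.get(int(code), "Not reported")
    except (TypeError, ValueError):
        # int(None) raises TypeError; int(float("nan")) raises ValueError
        return "Not reported"


print([decode_mode(c) for c in [1, 2.0, "3", None, 7]])
# ['Air', 'Sea', 'Land', 'Not reported', 'Not reported']
```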


<br>

### 4.5 Analytic Query examples 
Below is a demonstration of how the parquet files can be read in and the data model queried.
<br>
A few example queries are shown below

In [62]:
# Query 1: Which were the top 5 busiest ports in month 04 / 2016
# We will read in two data sets, 1- Immigration data, 2- us_ports

immigration_data = spark.read.parquet("parquet_files/immigration_data.parquet")
us_ports = spark.read.parquet("parquet_files/us_ports.parquet")

# time flag to keep track of how long the query takes
start = time.time()

# Parquet files can also be used to create a temporary view and then used in SQL statements.       
immigration_data.createOrReplaceTempView("immigration_data")
us_ports.createOrReplaceTempView("us_ports")

# create sql command
sql_command = (""" 
        select p.portName,count(1) as n_of_immigrants  
        from immigration_data i join us_ports p on i.portCode = p.portCode
        where i.year=2016
        and i.month = 4
        group by i.year,i.month,p.portCode,p.portName
        order by count(1) desc
""")


# execute sql
result = spark.sql(sql_command)

# show the top 5 results
result.show(n=5)

end = time.time()

print("Query took {} s".format(end-start))

+-------------+---------------+
|     portName|n_of_immigrants|
+-------------+---------------+
|     NEW YORK|         485916|
|        MIAMI|         343941|
|  LOS ANGELES|         310163|
|SAN FRANCISCO|         152586|
|      ORLANDO|         149195|
+-------------+---------------+
only showing top 5 rows

Query took 3.221921682357788 s


In [111]:
# Query 2: Which months have the highest immigration inflow
# Since only 2 months of data have been loaded, the query lists the months and their counts in descending order

# We will read in just Immigration data
immigration_data = spark.read.parquet("parquet_files/immigration_data.parquet")

# time flag to keep track of how long the query takes
start = time.time()

# Parquet files can also be used to create a temporary view and then used in SQL statements.       
immigration_data.createOrReplaceTempView("immigration_data")

# create sql command
sql_command = (""" 
        select i.year,i.month,count(1) as n_of_immigrants  
        from immigration_data i 
        group by i.year,i.month
        order by count(1) desc
""")

# execute sql
result = spark.sql(sql_command)

# show the top 5 results
result.show(n=5)

end = time.time()

print("Query took {} s".format(end-start))

+----+-----+---------------+
|year|month|n_of_immigrants|
+----+-----+---------------+
|2016|    9|        3733786|
|2016|    4|        3096313|
+----+-----+---------------+

Query took 1.2313463687896729 s


In [105]:
# Query 3 : Which US Port/States have the highest number of immigrants, what is their visa type (i.e. Student, Business, Pleasure)

# We will read in three data sets, 1- Immigration data, 2- us_ports, 3- us_state_cities

immigration_data = spark.read.parquet("parquet_files/immigration_data.parquet")
us_ports = spark.read.parquet("parquet_files/us_ports.parquet")
us_state_cities = spark.read.parquet("parquet_files/us_state_cities.parquet")

# time flag to keep track of how long the query takes
start = time.time()

# Parquet files can also be used to create a temporary view and then used in SQL statements.       
immigration_data.createOrReplaceTempView("immigration_data")
us_ports.createOrReplaceTempView("us_ports")
us_state_cities.createOrReplaceTempView("us_state_cities")

# create sql command
sql_command = (""" 
        select s.stateName,p.portName,i.visaType, count(1) as n_of_immigrants  
        from immigration_data i join us_ports p on i.portCode = p.portCode
                                join us_state_cities s on p.stateCode = s.stateCode
        group by s.stateName,p.portName,i.visaType
        order by count(1) desc
""")


# execute sql
result = spark.sql(sql_command)

# show the top 15 results
result.show(n=15)

end = time.time()

print("Query took {} s".format(end-start))


+----------+----------------+--------+---------------+
| stateName|        portName|visaType|n_of_immigrants|
+----------+----------------+--------+---------------+
|California|     LOS ANGELES|Pleasure|       87425728|
|California|   SAN FRANCISCO|Pleasure|       40026331|
|   Florida|           MIAMI|Pleasure|       31660752|
|California|     LOS ANGELES|Business|       14349928|
|   Florida|         ORLANDO|Pleasure|       12747408|
|California|   SAN FRANCISCO|Business|       12345070|
|  New York|        NEW YORK|Pleasure|       10523700|
|     Texas|         HOUSTON|Pleasure|        8995170|
|   Florida| FORT LAUDERDALE|Pleasure|        7545744|
|     Texas|          DALLAS|Pleasure|        6371631|
|California|     LOS ANGELES| Student|        4056159|
|  Illinois|         CHICAGO|Pleasure|        3800684|
|     Texas|         HOUSTON|Business|        3019917|
|   Florida|           MIAMI|Business|        2978256|
|New Jersey|NEWARK/TETERBORO|Pleasure|        2889780|
+----------+----------------+--------+---------------+
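The counts above are larger than the 6,830,099 rows in immigration_data (LOS ANGELES / Pleasure alone shows 87,425,728). A likely cause is that us_state_cities holds one row per city. Joining on stateCode alone therefore repeats each immigration record once for every city in that state. The sketch below reproduces the effect on made-up toy data with Python's built-in `sqlite3` (not Spark). It also shows one fix: join against the distinct (stateCode, stateName) pairs. The same subquery can be used in the Spark SQL statement.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.executescript("""
    create table immigration_data (portCode text, visaType text);
    create table us_ports (portCode text, portName text, stateCode text);
    create table us_state_cities (stateCode text, stateName text, cityName text);

    -- toy data: 2 arrivals at one port, in a state with 3 cities
    insert into immigration_data values ('LOS', 'Pleasure'), ('LOS', 'Pleasure');
    insert into us_ports values ('LOS', 'LOS ANGELES', 'CA');
    insert into us_state_cities values
        ('CA', 'California', 'Los Angeles'),
        ('CA', 'California', 'San Diego'),
        ('CA', 'California', 'San Jose');
""")

# Same shape as Query 3: every arrival is repeated once per city in the state
FANOUT_QUERY = """
    select s.stateName, p.portName, i.visaType, count(1) as n_of_immigrants
    from immigration_data i join us_ports p on i.portCode = p.portCode
                            join us_state_cities s on p.stateCode = s.stateCode
    group by s.stateName, p.portName, i.visaType
"""

# Fix: reduce us_state_cities to one row per state before joining
DEDUP_QUERY = """
    select s.stateName, p.portName, i.visaType, count(1) as n_of_immigrants
    from immigration_data i join us_ports p on i.portCode = p.portCode
                            join (select distinct stateCode, stateName
                                  from us_state_cities) s on p.stateCode = s.stateCode
    group by s.stateName, p.portName, i.visaType
"""

print(cur.execute(FANOUT_QUERY).fetchall())  # [('California', 'LOS ANGELES', 'Pleasure', 6)]
print(cur.execute(DEDUP_QUERY).fetchall())   # [('California', 'LOS ANGELES', 'Pleasure', 2)]
```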

In [100]:
# Query 4 : Which airlines carried the most immigrants
# We will read in two data sets, 1- Immigration data, 2- airlineCodes

immigration_data = spark.read.parquet("parquet_files/immigration_data.parquet")
airlineCodes = spark.read.parquet("parquet_files/airlineCodes.parquet")

# time flag to keep track of how long the query takes
start = time.time()

# Parquet files can also be used to create a temporary view and then used in SQL statements.       
immigration_data.createOrReplaceTempView("immigration_data")
airlineCodes.createOrReplaceTempView("airlineCodes")

# create sql command
sql_command = (""" 
        select a.airlineName,a.airlineCountry,count(1) as n_of_immigrants  
        from immigration_data i join airlineCodes a on i.airlineCode = a.airline_iata_code
        group by a.airlineName,a.airlineCountry
        order by count(1) desc
""")


# execute sql
result = spark.sql(sql_command)

result.show(n=10)

end = time.time()

print("Query took {} s".format(end-start))

+--------------------+-----------------+---------------+
|         airlineName|   airlineCountry|n_of_immigrants|
+--------------------+-----------------+---------------+
|   American Airlines|    United States|         656844|
|     United Airlines|    United States|         601940|
|     Delta Air Lines|    United States|         554346|
|     British Airways|   United Kingdom|         386662|
|     Lufthansa Cargo|          Germany|         253547|
|           Lufthansa|          Germany|         253547|
|Virgin Atlantic A...|   United Kingdom|         233030|
|          Korean Air|Republic of Korea|         160031|
|          Air France|           France|         152751|
|      Japan Airlines|            Japan|         150572|
+--------------------+-----------------+---------------+
only showing top 10 rows

Query took 5.1944944858551025 s
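Lufthansa and Lufthansa Cargo show exactly the same count (253,547). This suggests both rows in airlineCodes share one IATA code, so each matching immigration record is counted once per airline row. Before joining, codes that map to more than one airline can be listed with a `group by ... having` query. The sketch below runs that query with `sqlite3` on made-up rows (the airline names and codes are hypothetical). It also shows one possible fix, keeping a single name per code; picking the alphabetically first name is an arbitrary assumption.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("create table airlineCodes (airlineName text, airline_iata_code text)")
conn.executemany(
    "insert into airlineCodes values (?, ?)",
    [("Airline A", "XA"), ("Airline A Cargo", "XA"), ("Airline B", "XB")],
)

# IATA codes shared by more than one airline row would multiply join results
DUPLICATE_CODES = """
    select airline_iata_code, count(*) as n_airlines
    from airlineCodes
    group by airline_iata_code
    having count(*) > 1
"""
print(conn.execute(DUPLICATE_CODES).fetchall())  # [('XA', 2)]

# One possible fix: keep a single (arbitrary but deterministic) name per code
ONE_PER_CODE = """
    select airline_iata_code, min(airlineName) as airlineName
    from airlineCodes
    group by airline_iata_code
    order by airline_iata_code
"""
print(conn.execute(ONE_PER_CODE).fetchall())  # [('XA', 'Airline A'), ('XB', 'Airline B')]
```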


In [117]:
# Query 5 : Which residence country has the highest number of students
# We will read in two data sets, 1- Immigration data, 2- country Codes

immigration_data = spark.read.parquet("parquet_files/immigration_data.parquet")
countryCodes = spark.read.parquet("parquet_files/countryCodes.parquet")

# time flag to keep track of how long the query takes
start = time.time()

# Parquet files can also be used to create a temporary view and then used in SQL statements.       
immigration_data.createOrReplaceTempView("immigration_data")
countryCodes.createOrReplaceTempView("countryCodes")

# create sql command
sql_command = (""" 
        select c.countryName,count(1) as n_of_immigrants  
        from immigration_data i join countryCodes c on i.res = c.countryCode
        where visaType='Student'
        group by c.countryName
        order by count(1) desc
""")


# execute sql
result = spark.sql(sql_command)

result.show(n=10)

end = time.time()

print("Query took {} s".format(end-start))

+--------------------+---------------+
|         countryName|n_of_immigrants|
+--------------------+---------------+
|          CHINA, PRC|          61819|
|               INDIA|          13984|
|         SOUTH KOREA|          11726|
|        SAUDI ARABIA|           7708|
|               JAPAN|           7558|
|              TAIWAN|           5101|
|              BRAZIL|           4317|
|  MEXICO Air Sea,...|           4105|
|             VIETNAM|           2915|
|              FRANCE|           2779|
+--------------------+---------------+
only showing top 10 rows

Query took 2.113755941390991 s


<br>

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

Because of the volume of the data and the requirement for continuous updates, the Spark framework was chosen for processing, with Parquet files as storage.
<br> The core of our model is the immigration data set, which is itself huge: millions of rows every month (6,830,099 rows for the two months loaded here).
<br> To process this **big** data efficiently, we use Spark as our framework. 
<br> Spark has various built-in libraries and features, e.g. Spark SQL, which offers a concise way to query and extract information from Spark DataFrames.
Spark DataFrames work well with Parquet files, so Parquet is used as the storage format of our data lake. 
Parquet files have several advantages:
   1. They are much more storage-efficient than plain CSV / delimited files.
   2. Storage is columnar, so IO is limited to what the query needs (only the columns involved in the query are read).
   3. They can be read directly into a Spark DataFrame, and the schema is preserved.
   4. Data can easily be written out incrementally, partition by partition.
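Point 4 relies on partitioned output. When Spark writes a DataFrame with `partitionBy("year", "month")`, it creates Hive-style directories such as `year=2016/month=4`. A query that filters on those columns then only needs to read the matching directories. The toy sketch below mimics that layout and the pruning step in plain Python to show why it limits IO. It uses hypothetical helper names and does not involve Spark. It also assumes the immigration data is partitioned by year and month with integer values.

```python
def partition_dir(base, year, month):
    """Build a Hive-style partition directory like the ones Spark's partitionBy writes."""
    return "{}/year={}/month={}".format(base.rstrip("/"), year, month)


def parse_partition(path):
    """Extract integer partition values from the key=value parts of a directory path."""
    values = {}
    for part in path.split("/"):
        if "=" in part:
            key, value = part.split("=", 1)
            values[key] = int(value)
    return values


def prune(paths, **filters):
    """Keep only the partition directories whose values match every filter."""
    return [p for p in paths
            if all(parse_partition(p).get(k) == v for k, v in filters.items())]


base = "parquet_files/immigration_data.parquet"
dirs = [partition_dir(base, 2016, m) for m in (4, 9)]
print(prune(dirs, year=2016, month=4))
# ['parquet_files/immigration_data.parquet/year=2016/month=4']
```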

Immigration data changes continuously. Since we are creating a data lake for historical analysis, it is recommended to update the immigration data every month.
<br> Country codes and port codes will rarely be updated, as it is not every day that a new country or state is created. This part of the data will remain fairly static.
<br> Airline data changes comparatively often (e.g. a new airline is introduced, an old airline closes, an airline's IATA code is updated, and so on). It is recommended to update this data together with immigration_data.

Currently the data is imported and the analysis is run as a simulation of what can be achieved. 
<br> **If the data increased by 100x**, a couple of options could be tested to process it more efficiently:
1. Build a pipeline that can run in parallel. Since the ETL steps are already broken down per source, it would be simple to integrate them into Airflow and run them in parallel to speed up loading, cleaning and writing the data. 
2. Increase the number of Spark cluster nodes.
3. Define a different partitioning pattern.
4. Work with a hybrid model (i.e. data distributed across a database and data lakes).
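The parallel-pipeline option (the first point above) can be illustrated without Airflow. Once each source has its own load/clean/write step, independent sources can run concurrently. The sketch uses Python's `concurrent.futures`. `process_source` is a hypothetical placeholder for the notebook's read, `clean_data_source` and `writeParquet` calls. Spark accepts jobs submitted from several threads of one application, so a thread pool is a reasonable fit. In Airflow, the same structure would be one task per source with no dependencies between them.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

SOURCES = ["immigration_data", "countryCodes", "us_ports", "airlineCodes", "us_state_cities"]


def process_source(source_name):
    """Hypothetical placeholder for the per-source ETL (read -> clean -> write).
    Returns the source name and a status string."""
    # ... read, clean and write the source here ...
    return source_name, "ok"


def run_pipeline(sources, max_workers=4):
    """Run independent per-source pipelines concurrently and collect their status.
    An exception in any source is re-raised by future.result()."""
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(process_source, s): s for s in sources}
        for future in as_completed(futures):
            name, status = future.result()
            results[name] = status
    return results


print(run_pipeline(SOURCES))
```

Results arrive in completion order, so they are collected into a dictionary keyed by source rather than a list.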

**If the data pipeline needs to run every day (e.g. to refresh the dashboard by 7am)**
<br> Airflow would again be a good choice to load the data regularly and in parallel without manual intervention. 
<br> Depending on the dashboard's needs, static cached data views (similar to database materialized views) can be created for every 24 hours. Each day before 7am, a scheduled procedure creates and caches a view covering only the most recent 24 hours. 
<br> The benefit of caching is that there is no need to re-read or reprocess all the data; only the *incremental* processing is done, which is much faster. 
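The incremental idea can be sketched in plain Python. Keep the already-computed daily counts (the "cached view"), and before 7am aggregate only the records of the newest day instead of recounting the whole history. `update_daily_counts`, the record layout and the port codes are hypothetical. The function replaces any earlier result for the processed day, so re-running a failed morning job does not double-count.

```python
from collections import Counter
from datetime import date


def update_daily_counts(cached, new_records, day):
    """Return a new cache with one day's arrivals per port (re)computed.

    cached      -- dict mapping (day, portCode) to a count; left unchanged
    new_records -- iterable of dicts with at least 'arrivalDate' and 'portCode'
    day         -- the day to process; records from other days are ignored
    """
    day_counts = Counter(r["portCode"] for r in new_records if r["arrivalDate"] == day)
    # Drop any earlier result for this day so re-running the job is idempotent
    updated = {k: v for k, v in cached.items() if k[0] != day}
    for port, n in day_counts.items():
        updated[(day, port)] = n
    return updated


cache = {(date(2016, 4, 1), "NYC"): 10}
todays_records = [
    {"arrivalDate": date(2016, 4, 2), "portCode": "NYC"},
    {"arrivalDate": date(2016, 4, 2), "portCode": "MIA"},
    {"arrivalDate": date(2016, 4, 2), "portCode": "NYC"},
]
cache = update_daily_counts(cache, todays_records, date(2016, 4, 2))
print(cache[(date(2016, 4, 2), "NYC")])  # 2
```

Returning a new dictionary instead of mutating the old one keeps the previous cache usable by the dashboard until the refresh has finished.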

**If the data needs to be accessed by 100+ people**
<br> Options to explore are data replication and load balancing across the Spark cluster:
1. Data will be partitioned and distributed so that each server only has to process its own data.
2. Data will be replicated so that Person X's query can be answered by Server A, whereas the same query from Person Y can be answered by Server B. This evens out the load across the servers.
3. If the people are geographically distributed, the data can also be distributed geographically so that each region's data stays local.
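Points 2 and 3 above (spreading identical queries over replicas, keeping users close to their region's copy) can be illustrated with a small routing sketch. It is a toy example with hypothetical region and server names, not a Spark feature. In practice this role is usually played by a load balancer or a query service in front of the replicated data. Queries go round-robin over the replicas in the user's region. Users from a region without replicas fall back to all replicas in turn.

```python
from itertools import cycle


class ReplicaRouter:
    """Toy router: prefer replicas in the user's region, round-robin among them.

    replicas_by_region maps a region name to a list of server names.
    Users from a region without replicas are served by all replicas in turn.
    """

    def __init__(self, replicas_by_region):
        if not any(replicas_by_region.values()):
            raise ValueError("at least one replica is required")
        self._by_region = {
            region: cycle(servers)
            for region, servers in replicas_by_region.items() if servers
        }
        all_servers = [s for servers in replicas_by_region.values() for s in servers]
        self._fallback = cycle(all_servers)

    def route(self, region):
        """Return the server that should answer the next query from `region`."""
        return next(self._by_region.get(region, self._fallback))


router = ReplicaRouter({"us-east": ["server-a", "server-b"], "eu": ["server-c"]})
print([router.route("us-east") for _ in range(3)])  # ['server-a', 'server-b', 'server-a']
print(router.route("eu"))                           # server-c
```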
