# Project Title
### Data Engineering Capstone Project

#### Project Summary
A core responsibility of The National Travel and Tourism Office (NTTO) is to collect, analyze, and disseminate international travel and tourism statistics.

NTTO's Board of Managers is charged with managing, improving, and expanding the system to fully account for and report the impact of travel and tourism in the United States. The analysis results support forecasting, operations, and decision-making. NTTO also creates a positive climate for growth in travel and tourism by reducing institutional barriers to tourism, administers joint marketing efforts, provides official travel and tourism statistics, and coordinates efforts across federal agencies.

The goal of this project is to analyze the relationship between the volume of immigration arrivals and the monthly temperature of the arrival city.

The following source data is used for data modeling:

- I94 Immigration: The I94 immigration data comes from the US National Travel and Tourism Office. The original source is https://travel.trade.gov/research/reports/i94/historical/2016.html.

- World Temperature Data: This dataset comes from Kaggle.

- I94_SAS_Labels_Descriptions.SAS: Used to build the validation datasets. We will use I94Port.txt as the list of airports, cities, and states.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

### Step 1: Scope the Project and Gather Data

#### Scope 
I will be using the following pieces of data provided in the workspace
1. I94 Immigration Data 
2. I94 SAS Label Descriptions
3. Global Land Temperatures
4. Airport Codes
5. US City Demographics

#### Technologies
- Python
- Spark
- Pandas
- AWS

#### Describe and Gather Data 
##### I94 Immigration Data 
- cicid: Unique code assigned to each foreigner entering the US
- i94yr, i94mon: Year and month of entry into the US
- i94cit, i94res: Country codes for the person's citizenship and residence. Specific codes are available in the SAS Label Descriptions file
- i94port: US port of entry. Specific codes are available in the SAS Label Descriptions file, which lists the ports where immigration is allowed
- arrdate: Date of arrival in the US
- i94mode: Mode of entry, e.g. Air, Sea, Land. Specific codes are available in the SAS Label Descriptions file
- i94addr: State where the foreigner is going. State codes are available in the SAS Label Descriptions file
- depdate: Date of departure from the US
- i94bir: Age of the foreigner
- i94visa: Visa category, e.g. Business, Pleasure, Student. Specific codes are available in the SAS Label Descriptions file
- gender: Gender of the foreigner
- airline: Airline used to enter the US
- admnum, fltno, visatype: Kept during cleanup; fltno and visatype are carried into the fact and dimension tables

- count, dtadfile, visapost, occup, entdepa, entdepd, entdepu, matflag, biryear, dtaddto, insnum - Not used in this project

##### I94 SAS Label Descriptions
- I94PORT: Specifies airport code, city name, state code for port of entry
- I94Mode: Specifies code and description for mode of travel - Air, Sea, Land
- I94ADDR: Specifies immigration state code and state name
- I94VISA: Specifies code and description for type of visa - Business, Pleasure, Student

##### Global Land Temperatures
- dt: Date the temperature was measured
- AverageTemperature: Avg temperature captured
- City: City where the temperature was measured
- Country: Country where the temperature was measured
- Latitude & Longitude: Latitude & Longitude of the location where the temperature was measured

##### Airport Codes
- ident: Unique identifier in the table
- iso_country, iso_region: Country and region where the airport is located
- type, name, elevation_ft, continent, municipality, gps_code, iata_code, coordinates - These fields are not used

##### US City Demographics
- City: City Name
- State, State Code: State where the city is located
- Foreign-born: May use this field
- Median Age, Male Population, Female Population, Total Population, Number of Veterans, Average Household Size, Race, Count - These fields are not used

In [1]:
# Do all imports and installs here
import pandas as pd
from pyspark.sql import SparkSession
import os
import configparser
import shutil
from pyspark.sql.types import StructType as R, StructField as Fld,\
    DoubleType as Dbl, StringType as Str, IntegerType as Int,\
    TimestampType as Timestamp, DateType as Date, LongType as Long

In [2]:
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = "/opt/conda/bin:/opt/spark-2.4.3-bin-hadoop2.7/bin:/opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin"
os.environ["SPARK_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
os.environ["HADOOP_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
config = configparser.ConfigParser()
config.read('dwh.cfg')

os.environ["AWS_ACCESS_KEY_ID"] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"] = config['AWS']['AWS_SECRET_ACCESS_KEY']
AWS_ACCESS_KEY_ID = config['AWS']['AWS_ACCESS_KEY_ID']
AWS_SECRET_ACCESS_KEY = config['AWS']['AWS_SECRET_ACCESS_KEY']
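
The cell above expects a `dwh.cfg` file with an `[AWS]` section. Below is a minimal sketch of that layout, parsed from an inline string. The values are placeholders, and `SAMPLE_CFG` and `load_aws_credentials` are hypothetical names used only for illustration.

```python
import configparser

# Assumed layout of dwh.cfg: the key names match the lookups in the cell above,
# the values are placeholders and not real credentials.
SAMPLE_CFG = """
[AWS]
AWS_ACCESS_KEY_ID = EXAMPLE_KEY_ID
AWS_SECRET_ACCESS_KEY = EXAMPLE_SECRET
"""


def load_aws_credentials(cfg_text):
    """Return (key_id, secret) from config text, failing early if the section is missing."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    if not parser.has_section("AWS"):
        raise KeyError("config has no [AWS] section")
    section = parser["AWS"]
    return section["AWS_ACCESS_KEY_ID"], section["AWS_SECRET_ACCESS_KEY"]


key_id, secret = load_aws_credentials(SAMPLE_CFG)
print(key_id)
```

An explicit section check is useful because `ConfigParser.read` does not raise when the file is missing; it simply returns an empty list, and the failure only surfaces later at the first key lookup.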

In [3]:
#Get the spark session

spark = SparkSession.builder\
        .config("spark.jars.repositories", "https://repos.spark-packages.org/")\
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0,saurfang:spark-sas7bdat:2.0.0-s_2.11")\
        .config("spark.hadoop.fs.s3a.access.key",AWS_ACCESS_KEY_ID)\
        .config("spark.hadoop.fs.s3a.secret.key",AWS_SECRET_ACCESS_KEY)\
        .enableHiveSupport().getOrCreate()
print("Created spark session")

Created spark session


### Step 2: Explore and Assess the Data
#### Explore the Data
##### Read the Immigration data from the folder "sas_data"
- Immigration Data has 3096313 rows of data


##### Copy "GlobalLandTemperaturesByCity.csv" into the workspace and read the Global Temp data
- Global Temp Data csv has 8599212 rows of data


##### Read the SAS Labels file - I94_SAS_Labels_Descriptions.SAS 
- Extract the port Data into a temp csv file. Port data has 660 rows
- Extract the state Data into a temp csv file. State data has 55 rows
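
The extraction step can be illustrated on a small inline sample. This is a sketch only: `SAMPLE_LABELS` is a made-up excerpt in the style of the labels file, `parse_sas_label_block` and `_unquote` are hypothetical helpers, and the sketch uses `str.partition` where the notebook functions use `split('=')`.

```python
def _unquote(token):
    """Trim whitespace and surrounding single or double quotes."""
    return token.strip().strip("'\"").strip()


def parse_sas_label_block(text, label):
    """Return (code, description) pairs from the block that starts at `label` and ends at ';'."""
    block = text[text.index(label):]
    block = block[:block.index(";")]
    pairs = []
    for line in block.splitlines():
        if "=" not in line:
            continue  # header and comment lines carry no code/value pair
        code, _, value = line.partition("=")
        pairs.append((_unquote(code), _unquote(value)))
    return pairs


# Hypothetical excerpt, for illustration only.
SAMPLE_LABELS = """
/* I94PORT - sample excerpt */
  value $i94prtl
    'ALC' = 'ALCAN, AK             '
    'ANC' = 'ANCHORAGE, AK         '
    'XXX' = 'NOT REPORTED/UNKNOWN  '
;
"""

pairs = parse_sas_label_block(SAMPLE_LABELS, "I94PORT")
for code, description in pairs:
    print(code, "->", description)
```

Padding inside the quoted description is stripped after the quotes are removed, so `'ALCAN, AK             '` becomes `ALCAN, AK`.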

In [4]:
def readPortDataFromSASLabels(I94_SAS_Labels_Descriptions_String):
    #I94Port Data - Extract the i94PortData and write them to csv files

    #Extract I94 Port string by getting the substring from "I94PORT" to ";"
    I94Port_String = I94_SAS_Labels_Descriptions_String[I94_SAS_Labels_Descriptions_String.index('I94PORT'):]
    I94Port_String = I94Port_String[:I94Port_String.index(';')]

    I94Port_String_lines = I94Port_String.splitlines()
    I94Port_Codes = []
    for I94Port_String_line in I94Port_String_lines:
        try:
            I94Port_Code, I94Port_Value = I94Port_String_line.split('=')
        except ValueError:
            # Skip lines that are not a single "code = value" pair
            continue
        I94Port_Code = I94Port_Code.strip().strip("'").strip('"')
        I94Port_Value = I94Port_Value.strip().strip("'").strip('"').strip()
        I94Port_Codes.append((I94Port_Code, I94Port_Value))

    # Write to CSV files
    parent_dir = "./Temp/"
    I94Port_csv = 'I94Port_csv'
    path = os.path.join(parent_dir, I94Port_csv)
    try:
        os.makedirs(path, exist_ok = True)
    except OSError as error:
        print("Directory could not be created")

    schema = R([
            Fld("I94Port_Code", Str()),
            Fld("I94Port_Value", Str())
        ])

    df_i94Port = spark.createDataFrame(
            data=I94Port_Codes,
            schema=schema
        )

    shutil.rmtree(path, ignore_errors=False, onerror=None)
    df_i94Port.write.options(header='True', delimiter=',').csv(path)
    df_i94Port = spark.read.options(inferSchema="true", delimiter=",", header = "true").csv(path)
    df_i94Port.select("I94Port_Code").distinct().count()
    return df_i94Port

In [5]:
def readAddrDataFromSASLabels(I94_SAS_Labels_Descriptions_String):
    #I94Addr Data - Extract the i94AddrData and write them to csv files

    #Extract I94 Addr string by getting the substring from "I94Addr" to ";"
    I94Addr_String = I94_SAS_Labels_Descriptions_String[I94_SAS_Labels_Descriptions_String.index('I94ADDR'):]
    I94Addr_String = I94Addr_String[:I94Addr_String.index(';')]

    I94Addr_String_lines = I94Addr_String.splitlines()
    I94Addr_Codes = []
    for I94Addr_String_line in I94Addr_String_lines:
        try:
            I94Addr_Code, I94Addr_Value = I94Addr_String_line.split('=')
        except ValueError:
            # Skip lines that are not a single "code = value" pair
            continue
        I94Addr_Code = I94Addr_Code.strip().strip("'").strip('"')
        I94Addr_Value = I94Addr_Value.strip().strip("'").strip('"').strip()
        I94Addr_Codes.append((I94Addr_Code, I94Addr_Value))

    # Write to CSV files
    parent_dir = "./Temp/"
    I94Addr_csv = 'I94Addr_csv'
    addr_path = os.path.join(parent_dir, I94Addr_csv)
    try:
        os.makedirs(addr_path, exist_ok = True)
    except OSError as error:
        print("Directory could not be created")

    schema = R([
            Fld("I94Addr_Code", Str()),
            Fld("I94Addr_Value", Str())
        ])

    df_i94Addr = spark.createDataFrame(
            data=I94Addr_Codes,
            schema=schema
        )

    shutil.rmtree(addr_path, ignore_errors=False, onerror=None)
    df_i94Addr.write.options(header='True', delimiter=',').csv(addr_path)

    df_i94Addr = spark.read.options(inferSchema="true", delimiter=",", header = "true").csv(addr_path)
    df_i94Addr.select("I94Addr_Code").distinct().count()
    return df_i94Addr

In [6]:
def writeDataFrameToCSV(dataframe, parent_dir, csvName):
    immi_clean_path = os.path.join(parent_dir, csvName)
    try:
        os.makedirs(immi_clean_path, exist_ok = True)
        print("Directory created")
    except OSError as error:
        print("Directory could not be created")
    
    shutil.rmtree(immi_clean_path, ignore_errors=False, onerror=None)
    dataframe.write.options(header='True', delimiter=',').csv(immi_clean_path)

In [7]:
# Define the SQL queries

# Gather Data and Clean up Queries
query_GetImmigrationCount = \
    "SELECT count(*) \
        FROM i94immi_raw_dataset_table"
    
query_GetImmigrationForArrivalByAir = \
    "SELECT * \
        FROM i94immi_raw_dataset_table \
        WHERE i94mode == 1.0"
    
query_GetImmigrationForSpecificGender = \
    "SELECT * \
        FROM i94immi_raw_dataset_table \
        WHERE gender IN ('F', 'M')"

query_GetImmigrationWithoutNullAddr = \
    "SELECT * \
        FROM i94immi_raw_dataset_table \
        WHERE i94addr IS NOT NULL"

query_GetImmigrationWithVisaName = \
    "SELECT *, CASE \
        WHEN i94visa = 1 THEN 'Business' \
        WHEN i94visa = 2 THEN 'Pleasure' \
        WHEN i94visa = 3 THEN 'Student' \
        END AS i94immi_visatype \
        FROM i94immi_raw_dataset_table"

query_GetImmigrationWithDatTimeArrivalDate = \
    "SELECT *, date_add(to_date('1960-01-01'), arrdate) AS i94immi_arrival_date \
        FROM i94immi_raw_dataset_table"

query_GetImmigrationWithDatTimeDepartureDate = \
    "SELECT *, date_add(to_date('1960-01-01'), depdate) AS i94immi_dep_date \
        FROM i94immi_raw_dataset_table"

query_GetImmigrationCleanedUp = \
    "SELECT \
            cicid, \
            i94cit, \
            i94res, \
            i94port, \
            i94immi_arrival_date, \
            i94yr, \
            i94mon, \
            i94mode, \
            i94addr, \
            i94immi_dep_date, \
            i94bir, \
            i94visa, \
            gender, \
            airline, \
            admnum, \
            fltno, \
            i94immi_visatype, \
            visatype \
        FROM i94immi_raw_dataset_table"

query_GetImmigrationAll = \
    "SELECT * \
        FROM i94immi_raw_dataset_table"


# Queries to create the fact and dimension tables
query_CreateFactImmigrationTable = \
    "SELECT \
        cicid AS i94immi_cicid, \
        i94cit AS i94immi_citizenship_country_code, \
        i94res AS i94immi_residence_country_code, \
        i94port AS i94immi_arrival_port_code, \
        i94immi_arrival_date, \
        i94yr AS i94immi_arrival_year, \
        i94mon AS i94immi_arrival_month, \
        i94mode AS i94immi_airline_mode_code, \
        i94addr AS i94immi_state_code, \
        i94immi_dep_date, \
        i94bir AS i94immi_foreigner_age, \
        i94visa AS i94immi_visatype_number, \
        gender AS i94immi_foreigner_sex, \
        fltno AS i94immi_flight_code, \
        visatype AS i94immi_visatype_code, \
        i94immi_visatype \
    FROM fact_i94ImmigrationData"

query_CreateDimensionFlightTable = \
    "SELECT \
            fltno as flight_number, \
            airline as flight_brand, \
            i94port as flight_airport_city \
        FROM dim_Flight"

query_CreateDimensionVisaTable = \
        "SELECT \
            visatype as visa_visatype_code, \
            i94visa as visa_visatype_number, \
            i94immi_visatype as visa_type_name \
        FROM dim_Visa"

query_CreateDimensionAddressTable = \
        "SELECT \
            I94Addr_Code as address_state_code, \
            I94Addr_Value as address_state_name \
        FROM dim_Address"

query_CreateDimensionPortTable = \
        "SELECT \
            I94Port_Code as port_code, \
            I94Port_City as port_city_name, \
            I94Port_State as port_state_code \
        FROM dim_Port"

query_CreateDimensionForeignerTable = \
        "SELECT \
            cicid AS foreigner_cicid, \
            i94cit AS foreigner_cit_country_code, \
            i94res AS foreigner_res_country_code, \
            i94port AS foreigner_arrival_port_code, \
            i94immi_arrival_date AS foreigner_arrival_date, \
            i94addr AS foreigner_state_code, \
            i94bir AS foreigner_age, \
            gender AS foreigner_sex, \
            i94visa AS foreigner_visatype_code \
        FROM dim_Foreigner"

query_CreateFactTempTemperatureTable = \
        "SELECT  \
            dt, \
            MONTH(Temperature_Table.dt) as month, \
            YEAR(Temperature_Table.dt) as year, \
            city, \
            averagetemperature, \
            averagetemperatureuncertainty \
        FROM Temperature_Table"

query_UpdateFactTempTemperatureTableByCity = \
    "SELECT  \
        city, \
        month, \
        averagetemperature, \
        averagetemperatureuncertainty, \
        year, \
        dt \
    FROM Temperature_Table \
    GROUP BY city, month, year, dt, averagetemperature, averagetemperatureuncertainty \
    ORDER BY year"

query_GetAllFromFactTempTemperatureTable = \
    "SELECT * FROM Temperature_Table"

query_CreateFactCitiesTemperatureTable = \
        "SELECT \
            averagetemperature AS worldtemp_average, \
            city AS worldtemp_city, \
            month AS worldtemp_month, \
            dt AS worldtemp_measuredate \
        FROM fact_Cities_Temperature"

# Queries for statistics
query_GetImmigrationByGender = \
    "SELECT \
        i94immi_foreigner_sex AS gender, \
        count(*) AS foreigner_count_bygender \
        FROM Fact_i94ImmigrationData_View as Fact_i94ImmigrationData \
        GROUP BY gender \
        LIMIT 5"

query_GetImmigrationByAge = \
    "SELECT \
        i94immi_foreigner_age AS age, \
        count(*) AS foreigner_count_byage \
        FROM Fact_i94ImmigrationData_View as Fact_i94ImmigrationData \
        GROUP BY age \
        ORDER BY foreigner_count_byage desc \
        LIMIT 5"

query_GetImmigrationByCity = \
    "SELECT \
            Fact_i94ImmigrationData.i94immi_arrival_port_code as airport_code, \
            Dim_portData.port_city_name as city_name, \
            Fact_i94ImmigrationData.i94immi_flight_code as flight_traffic \
        FROM Fact_i94ImmigrationData_View as Fact_i94ImmigrationData \
        JOIN Dim_portData_View as Dim_portData \
            ON Dim_portData.port_code = Fact_i94ImmigrationData.i94immi_arrival_port_code \
        GROUP BY city_name, airport_code, flight_traffic"

query_GetCityCountByImmigration = \
    "SELECT count(*) FROM ( \
        SELECT DISTINCT city_name FROM \
        ImmigrationStatisticsByCity \
    )"

query_GetForeignerCountByVisa = \
    "SELECT \
        Dim_visaData.visa_type_name AS visa_name, \
        count(*) \
    FROM Fact_i94ImmigrationData_View as Fact_i94ImmigrationData \
    JOIN Dim_visaData_View as Dim_visaData \
        ON Dim_visaData.visa_visatype_code = Fact_i94ImmigrationData.i94immi_visatype_code \
    GROUP BY visa_name"

query_GetImmigrationNumbersByCityPrep = \
    "SELECT \
        Fact_i94ImmigrationData.i94immi_cicid as travel_cicid, \
        Fact_i94ImmigrationData.i94immi_arrival_port_code as airport_code, \
        LOWER(Dim_portData.port_city_name) as city_name, \
        Fact_i94ImmigrationData.i94immi_arrival_month as travel_month, \
        Fact_i94ImmigrationData.i94immi_arrival_date as travel_date \
    FROM Fact_i94ImmigrationData_View as Fact_i94ImmigrationData \
    JOIN Dim_portData_View as Dim_portData \
    ON Dim_portData.port_code = Fact_i94ImmigrationData.i94immi_arrival_port_code"

query_GetCityAirportForeignerCount = \
    "SELECT \
        city_name, \
        airport_code, \
        COUNT(travel_cicid) as total_foreigners \
    FROM Immigration_City \
    GROUP BY city_name, airport_code \
    ORDER BY total_foreigners DESC"

query_GetTop5CitiesByForeignerCount = \
    "SELECT * from Immigration_City \
    LIMIT 5"

query_GetCityTemperature = \
            "SELECT \
                worldtemp_average, \
                LOWER(worldtemp_city) AS worldtemp_city, \
                worldtemp_month, \
                worldtemp_measuredate \
            FROM Fact_temperatureData_View \
            WHERE worldtemp_month = 4"

query_GetCityAverageTemperature = \
                "SELECT \
                    worldtemp_city, \
                    AVG(worldtemp_average) as worldtemp_Average_For_City \
                FROM City_Temperature \
                GROUP BY worldtemp_city \
                ORDER BY worldtemp_Average_For_City DESC"

query_GetCityImmigrationByTemperature = \
        "SELECT \
            Immigration_City.city_name as city_name, \
                City_Temperature.worldtemp_Average_For_City as temperature, \
                Immigration_City.total_foreigners as total_foreigners \
        FROM Immigration_City \
        LEFT JOIN City_Temperature \
        ON City_Temperature.worldtemp_city = Immigration_City.city_name"

query_GetTop5CityImmigrationByTemperature = \
    "SELECT * FROM Immigration_City_Temperature \
    LIMIT 5"

In [8]:
#Read the Immigration data
i94_immi_df=spark.read.parquet("sas_data")
i94_immi_df.count()

3096313

In [9]:
#Read the Global Temp data
world_temp_file = 'GlobalLandTemperaturesByCity.csv'
world_temp_df = pd.read_csv(world_temp_file,sep=",")
len(world_temp_df)

8599212

In [10]:
# Read SAS Labels file
with open('I94_SAS_Labels_Descriptions.SAS') as I94_SAS_Labels_Descriptions:
    I94_SAS_Labels_Descriptions_String = I94_SAS_Labels_Descriptions.read()

In [11]:
df_i94Port = readPortDataFromSASLabels(I94_SAS_Labels_Descriptions_String)
df_i94Port = df_i94Port.toPandas()

In [12]:
df_i94Addr = readAddrDataFromSASLabels(I94_SAS_Labels_Descriptions_String)
df_i94Addr = df_i94Addr.toPandas()

#### Cleaning Steps
Identify data quality issues, like missing values, duplicate data, etc.
Document steps necessary to clean the data

##### I94 Immigration Data
- Data used - ./sas_data/i94immi_raw_dataset
- Create Spark SQL table
- Keep only immigration records for arrival by air (i94mode = 1)
- Keep only records whose gender is 'M' or 'F'
- Drop records with a null i94addr
- Map the i94visa number to a visa category name (Business, Pleasure, Student)
- Convert arrival and departure dates from day counts since 1960-01-01 to date values
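
The date conversion treats `arrdate` and `depdate` as day counts from 1960-01-01, which is what `date_add(to_date('1960-01-01'), arrdate)` does in the queries above. A plain-Python sketch of the same arithmetic follows; `sas_days_to_date` is a hypothetical helper name, and the sample value is illustrative.

```python
from datetime import date, timedelta

# Day zero for the numeric dates, matching the literal used in the SQL queries.
SAS_EPOCH = date(1960, 1, 1)


def sas_days_to_date(sas_days):
    """Convert a day count since 1960-01-01 to a date; pass None through unchanged."""
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


print(sas_days_to_date(20545.0))  # 2016-04-01
```

Passing `None` through mirrors the fact that some records have no departure date yet.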

In [13]:
# Performing cleaning tasks here

# Read & Clean up the immigration raw dataset
i94immi_raw_dataset = './sas_data'
i94immi_raw_dataset_df = spark.read.parquet(i94immi_raw_dataset)

# Create Spark SQL table for clean up
i94immi_raw_dataset_df.createOrReplaceTempView('i94immi_raw_dataset_table')
spark.sql(query_GetImmigrationCount).show()

+--------+
|count(1)|
+--------+
| 3096313|
+--------+



In [14]:
# Keep only immigration records for arrival by air
spark.sql(query_GetImmigrationForArrivalByAir).createOrReplaceTempView("i94immi_raw_dataset_table")
spark.sql(query_GetImmigrationCount).show()

+--------+
|count(1)|
+--------+
| 2994505|
+--------+



In [15]:
# Keep only records whose gender is 'M' or 'F'
spark.sql(query_GetImmigrationForSpecificGender).createOrReplaceTempView("i94immi_raw_dataset_table")
spark.sql(query_GetImmigrationCount).show()

+--------+
|count(1)|
+--------+
| 2581480|
+--------+



In [16]:
# Clean up null values for I94Addr
# Drop NULL value on arrival state
spark.sql(query_GetImmigrationWithoutNullAddr).createOrReplaceTempView("i94immi_raw_dataset_table")
spark.sql(query_GetImmigrationCount).show()

+--------+
|count(1)|
+--------+
| 2485241|
+--------+



In [17]:
# Replace visatype number by visatype category
spark.sql(query_GetImmigrationWithVisaName).createOrReplaceTempView("i94immi_raw_dataset_table")

In [18]:
# Convert arrival and departure dates to DateTime
spark.sql(query_GetImmigrationWithDatTimeArrivalDate).createOrReplaceTempView("i94immi_raw_dataset_table")
spark.sql(query_GetImmigrationWithDatTimeDepartureDate).createOrReplaceTempView("i94immi_raw_dataset_table")
spark.sql(query_GetImmigrationCount).show(5)

+--------+
|count(1)|
+--------+
| 2485241|
+--------+



In [19]:
spark.sql(query_GetImmigrationCleanedUp).createOrReplaceTempView('i94immi_raw_dataset_table')
df_i94immi_cleaned = spark.sql(query_GetImmigrationAll)
df_i94immi_cleaned.count()

2485241

In [20]:
# Write cleaned up immigration data to csv 
writeDataFrameToCSV(df_i94immi_cleaned, './Clean/', 'I94Immi_Data_Clean_csv')
immi_clean_path = os.path.join('./Clean/', 'I94Immi_Data_Clean_csv')

df_i94immi_cleaned = spark.read.options(inferSchema="true", delimiter=",", header = "true").csv(immi_clean_path)
df_i94immi_cleaned.count()

Directory created


2485241

##### Flight Data
- Data used - ./Clean/I94Immi_Data_Clean_csv
- Select the flight columns fltno, airline, and i94port
- Remove duplicates on fltno
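
Deduplicating on `fltno` alone collapses rows that share a flight number across different airlines. The sketch below shows the effect on made-up rows. `drop_duplicates` is a hypothetical stand-in that keeps the first row per key, whereas Spark's `dropDuplicates` does not promise which row survives.

```python
def drop_duplicates(rows, key_fields):
    """Keep the first row seen for each combination of key_fields."""
    seen = set()
    kept = []
    for row in rows:
        key = tuple(row[field] for field in key_fields)
        if key not in seen:
            seen.add(key)
            kept.append(row)
    return kept


# Made-up sample rows: flight number 100 is used by two different airlines.
flights = [
    {"fltno": "100", "airline": "AA", "i94port": "NYC"},
    {"fltno": "100", "airline": "BA", "i94port": "BOS"},
    {"fltno": "100", "airline": "AA", "i94port": "NYC"},
    {"fltno": "200", "airline": "AA", "i94port": "MIA"},
]

by_fltno = drop_duplicates(flights, ["fltno"])
by_fltno_airline = drop_duplicates(flights, ["fltno", "airline"])
print(len(by_fltno), len(by_fltno_airline))  # 2 3
```

If the flight dimension should distinguish carriers, a composite key such as `(fltno, airline)` may be the safer choice.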

In [21]:
# Gather Flight Data from the Immigration file
df_flightinfo = spark.read.options(inferSchema="true", delimiter=",", header = "true").csv(immi_clean_path)
df_flightinfo.count()

2485241

In [22]:
# Clean up Flight Data
df_flightinfo = df_flightinfo.select(['fltno', 'airline', 'i94port'])
df_flightinfo = df_flightinfo.dropDuplicates(['fltno'])
df_flightinfo.count()

6314

In [23]:
# Write cleaned up flight data to csv 
writeDataFrameToCSV(df_flightinfo, './Clean/', 'Flight_Data_Clean_csv')
flight_clean_path = os.path.join('./Clean/', 'Flight_Data_Clean_csv')

df_flightinfo = spark.read.options(inferSchema="true", delimiter=",", header = "true").csv(flight_clean_path)
df_flightinfo.count()

Directory created


6314

##### Foreigner Dataset
- Data used - ./Clean/I94Immi_Data_Clean_csv

In [24]:
# Gather Foreigner Data from the Immigration file
df_foreignerinfo = spark.read.options(inferSchema="true", delimiter=",", header = "true").csv(immi_clean_path)
df_foreignerinfo.count()

2485241

In [25]:
df_foreignerinfo = df_foreignerinfo.select(
        ['cicid','i94cit','i94res','i94port','i94immi_arrival_date',
            'i94mode','i94addr','i94bir','i94visa','gender'])
df_foreignerinfo.count()

2485241

In [26]:
# Write cleaned up foreigner data to csv 
writeDataFrameToCSV(df_foreignerinfo, './Clean/', 'Foreigner_Data_Clean_csv')
foreigner_clean_path = os.path.join('./Clean/', 'Foreigner_Data_Clean_csv')

df_foreignerinfo = spark.read.options(inferSchema="true", delimiter=",", header = "true").csv(foreigner_clean_path)
df_foreignerinfo.count()

Directory created


2485241

##### Visa Dataset
- Data used - ./Clean/I94Immi_Data_Clean_csv
- Select columns i94visa, i94immi_visatype, visatype
- Drop duplicates by visatype

In [27]:
# Gather Visa Data from the Immigration file
df_visainfo = spark.read.options(inferSchema="true", delimiter=",", header = "true").csv(immi_clean_path)
df_visainfo.count()

2485241

In [28]:
df_visainfo = df_visainfo.select(['i94visa','i94immi_visatype','visatype'])
df_visainfo.count()

2485241

In [47]:
#Clean up Visa Info
df_visainfo = df_visainfo.dropDuplicates(['visatype'])
df_visainfo.count()
df_visainfo.show()

+-------+----------------+--------+
|i94visa|i94immi_visatype|visatype|
+-------+----------------+--------+
|    3.0|         Student|      F2|
|    1.0|        Business|     GMB|
|    2.0|        Pleasure|      B2|
|    3.0|         Student|      F1|
|    2.0|        Pleasure|     CPL|
|    1.0|        Business|      I1|
|    1.0|        Business|      WB|
|    3.0|         Student|      M1|
|    1.0|        Business|      B1|
|    2.0|        Pleasure|      WT|
|    3.0|         Student|      M2|
|    2.0|        Pleasure|      CP|
|    2.0|        Pleasure|     GMT|
|    1.0|        Business|      E1|
|    1.0|        Business|       I|
|    1.0|        Business|      E2|
|    2.0|        Pleasure|     SBP|
+-------+----------------+--------+



In [30]:
# Write cleaned up visa data to csv 
writeDataFrameToCSV(df_visainfo, './Clean/', 'Visa_Data_Clean_csv')
visa_clean_path = os.path.join('./Clean/', 'Visa_Data_Clean_csv')

df_visainfo = spark.read.options(inferSchema="true", delimiter=",", header = "true").csv(visa_clean_path)
df_visainfo.count()

Directory created


17

##### World Temperature dataset
- Data used - ./GlobalLandTemperaturesByCity.csv
- Filter the data to US cities only
- Filter the data to records from 1960 onward, to match the immigration data
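
The two filters can be sketched with the standard library on a few inline rows. The rows are made up and `filter_us_since` is a hypothetical helper. The sketch uses an inclusive lower bound and relies on ISO dates comparing correctly as strings.

```python
import csv
import io

# Made-up rows using a subset of the dataset's column names.
SAMPLE_CSV = """dt,AverageTemperature,City,Country
1959-12-01,4.1,Abilene,United States
1960-01-01,5.243,Abilene,United States
1960-01-01,2.0,Aachen,Germany
2013-09-01,,Abilene,United States
"""


def filter_us_since(rows, start="1960-01-01"):
    """Keep United States rows dated on or after `start` (ISO date strings)."""
    return [
        row for row in rows
        if row["Country"] == "United States" and row["dt"] >= start
    ]


rows = list(csv.DictReader(io.StringIO(SAMPLE_CSV)))
kept = filter_us_since(rows)
print([(row["dt"], row["City"]) for row in kept])
```

Rows with a missing `AverageTemperature` pass this filter unchanged, so any averaging done later has to decide how to treat them.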

In [49]:
# Gather the world temperature data and clean it up
US_Cities_Temp_CSV = 'GlobalLandTemperaturesByCity.csv'
df_test = pd.read_csv(US_Cities_Temp_CSV, sep=",")

In [50]:
df_us_cities_temperature = spark.read.options(inferSchema="true", delimiter=",", header = "true").csv(US_Cities_Temp_CSV)
df_us_cities_temperature.count()

8599212

In [51]:
# Filter the data by US Cities
df_us_cities_temperature = df_us_cities_temperature[df_us_cities_temperature['Country']=='United States']
df_us_cities_temperature.count()

687289

In [53]:
# Filter the data by date
df_us_cities_temperature = df_us_cities_temperature[df_us_cities_temperature['dt']>"1960-01-01"]
df_us_cities_temperature.count()
df_us_cities_temperature.show(1)

+-------------------+------------------+-----------------------------+-------+-------------+--------+---------+
|                 dt|AverageTemperature|AverageTemperatureUncertainty|   City|      Country|Latitude|Longitude|
+-------------------+------------------+-----------------------------+-------+-------------+--------+---------+
|1960-01-01 00:00:00| 5.242999999999999|                          0.3|Abilene|United States|  32.95N|  100.53W|
+-------------------+------------------+-----------------------------+-------+-------------+--------+---------+
only showing top 1 row



In [54]:
# Write cleaned up temperature data to csv 
writeDataFrameToCSV(df_us_cities_temperature, './Clean/', 'Temperature_Data_Clean_csv')
temperature_clean_path = os.path.join('./Clean/', 'Temperature_Data_Clean_csv')

df_us_cities_temperature = spark.read.options(inferSchema="true", delimiter=",", header = "true").csv(temperature_clean_path)
df_us_cities_temperature.count()
df_us_cities_temperature.show(1)

Directory created
+-------------------+------------------+-----------------------------+-----------+-------------+--------+---------+
|                 dt|AverageTemperature|AverageTemperatureUncertainty|       City|      Country|Latitude|Longitude|
+-------------------+------------------+-----------------------------+-----------+-------------+--------+---------+
|1960-01-01 00:00:00|7.5329999999999995|           0.7979999999999999|Los Angeles|United States|  34.56N|  118.70W|
+-------------------+------------------+-----------------------------+-----------+-------------+--------+---------+
only showing top 1 row



##### i94 SAS Label Descriptions - Port
- Data used df_i94Port
- Split into port, city, state

In [36]:
#Split to port, city, state
df_i94Port["I94Port_City"] = df_i94Port["I94Port_Value"].str.split(",").str.get(0)
df_i94Port["I94Port_State"] = df_i94Port["I94Port_Value"].str.split(",").str.get(1)
df_i94Port = df_i94Port[['I94Port_Code', 'I94Port_City', 'I94Port_State']]

In [37]:
#Remove empty values for state
df_i94Port = df_i94Port.dropna(subset = ["I94Port_State"])

In [38]:
df_i94Port = spark.createDataFrame(df_i94Port)

In [39]:
#Write cleaned up port data to csv 
writeDataFrameToCSV(df_i94Port, './Clean/', 'Port_Data_Clean_csv')
port_clean_path = os.path.join('./Clean/', 'Port_Data_Clean_csv')

df_i94Port = spark.read.options(inferSchema="true", delimiter=",", header = "true").csv(port_clean_path)
df_i94Port.count()

Directory created


583

##### i94 SAS Label Descriptions - Addr

In [40]:
df_i94Addr = spark.createDataFrame(df_i94Addr)

In [41]:
# Write cleaned up state data to csv 
writeDataFrameToCSV(df_i94Addr, './Clean/', 'Addr_Data_Clean_csv')
addr_clean_path = os.path.join('./Clean/', 'Addr_Data_Clean_csv')

df_i94Addr = spark.read.options(inferSchema="true", delimiter=",", header = "true").csv(addr_clean_path)
df_i94Addr.count()

Directory created


55

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The goal here is to use the immigration data and the world temperature data to compute immigration statistics. The statistics captured are:
1. Immigration by city
2. Immigration by gender
3. Immigration by age
4. Immigration by visa type
5. Immigration by city temperature
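
The last statistic combines per-city arrival counts with per-city average temperature, in the spirit of the `LEFT JOIN` in `query_GetCityImmigrationByTemperature`. Below is a plain-Python sketch with made-up values; `join_counts_with_temperature` and the sample data are hypothetical.

```python
from collections import Counter

# Made-up inputs: one city name per arrival, and an average temperature per city.
arrival_cities = ["New York", "Miami", "new york", "Agana"]
avg_temp_by_city = {"new york": 9.5, "miami": 24.1}


def join_counts_with_temperature(cities, temps):
    """Count arrivals per lower-cased city and attach the temperature, or None if unknown."""
    counts = Counter(city.lower() for city in cities)
    return [(city, temps.get(city), total) for city, total in counts.most_common()]


result = join_counts_with_temperature(arrival_cities, avg_temp_by_city)
for city, temperature, total in result:
    print(city, temperature, total)
```

Cities with no temperature record keep their count and get `None`, which is the behaviour a left join gives. Lower-casing both sides matters because the join key is a free-text city name.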

I am using a star schema here with the following tables
1. Fact Tables
    1. Immigration Table
    2. World Temperature Table
2. Dimension tables
    1. Flight Table
    2. Visa Table
    3. Address Table
    4. Port Table
    5. Foreigner Table
    
##### Details of the tables
###### Immigration Fact Table
- Table Name: fact_i94ImmigrationData
- Columns:
    - i94immi_cicid
    - i94immi_citizenship_country_code
    - i94immi_residence_country_code
    - i94immi_arrival_port_code
    - i94immi_arrival_date
    - i94immi_arrival_year
    - i94immi_arrival_month
    - i94immi_airline_mode_code
    - i94immi_state_code
    - i94immi_dep_date
    - i94immi_foreigner_age
    - i94immi_visatype_number
    - i94immi_foreigner_sex
    - i94immi_flight_code
    - i94immi_visatype_code
    - i94immi_visatype

###### World Temperature Fact Table
- Table Name: fact_Cities_Temperature
- Columns:
    - worldtemp_average
    - worldtemp_city
    - worldtemp_month
    - worldtemp_measuredate

###### Flight Dimension Table
- Table Name: dim_Flight
- Columns:
    - flight_number
    - flight_brand
    - flight_airport_city

###### Visa Dimension Table
- Table Name: dim_Visa
- Columns:
    - visa_visatype_code
    - visa_visatype_number
    - visa_type_name

###### Address Dimension Table
- Table Name: dim_Address
- Columns:
    - address_state_code
    - address_state_name

###### Port Dimension Table
- Table Name: dim_Port
- Columns:
    - port_code
    - port_city_name
    - port_state_code

###### Foreigner Dimension Table
- Table Name: dim_Foreigner
- Columns:
    - foreigner_cicid
    - foreigner_cit_country_code
    - foreigner_res_country_code
    - foreigner_arrival_port_code
    - foreigner_arrival_date
    - foreigner_state_code
    - foreigner_age
    - foreigner_sex
    - foreigner_visatype_code
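
To show how the fact and dimension tables are meant to be queried together, here is a small sketch that uses an in-memory SQLite database in place of Spark SQL. Only a few of the columns listed above are included, and the rows are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Reduced versions of dim_Port and fact_i94ImmigrationData with made-up rows.
conn.executescript("""
CREATE TABLE dim_Port (
    port_code TEXT PRIMARY KEY,
    port_city_name TEXT,
    port_state_code TEXT
);
CREATE TABLE fact_i94ImmigrationData (
    i94immi_cicid INTEGER PRIMARY KEY,
    i94immi_arrival_port_code TEXT REFERENCES dim_Port(port_code),
    i94immi_arrival_month INTEGER
);
INSERT INTO dim_Port VALUES ('NYC', 'NEW YORK', 'NY'), ('MIA', 'MIAMI', 'FL');
INSERT INTO fact_i94ImmigrationData VALUES (1, 'NYC', 4), (2, 'NYC', 4), (3, 'MIA', 4);
""")

# Count arrivals per city by joining the fact table to the port dimension.
rows = conn.execute("""
    SELECT p.port_city_name, COUNT(f.i94immi_cicid) AS total_foreigners
    FROM fact_i94ImmigrationData AS f
    JOIN dim_Port AS p ON p.port_code = f.i94immi_arrival_port_code
    GROUP BY p.port_city_name
    ORDER BY total_foreigners DESC
""").fetchall()
print(rows)
conn.close()
```

The fact table stores only the port code. City and state names live once in `dim_Port` and are attached at query time.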

#### 3.2 Mapping Out Data Pipelines


##### Locate the raw data sources
- I94_SAS_Labels_Descriptions.SAS  
- sas_data
- GlobalLandTemperaturesByCity.csv

##### Read the data into dataframes
- Read the port and state data into temp csv files and load them to data frames
	- df_i94Port
	- df_i94Addr
- Read the immigration data into a data frame - i94_immi_df
- Read the global temperature data into a data frame - world_temp_df

##### Clean up the data and write the cleaned up data into csv files
- Load the cleaned up immigration data into a data frame - df_i94immi_cleaned and write it into a csv file - I94Immi_Data_Clean_csv
- Load the cleaned up flight data into a data frame - df_flightinfo and write it into a csv file - Flight_Data_Clean_csv
- Load the cleaned up foreigner data into a data frame - df_foreignerinfo and write it into a csv file - Foreigner_Data_Clean_csv
- Load the cleaned up visa data into a data frame - df_visainfo and write it into a csv file - Visa_Data_Clean_csv
- Load the cleaned up world temperature data into a data frame - df_us_cities_temperature and write it into a csv file - Temperature_Data_Clean_csv
- Load the cleaned up port data into a data frame - df_i94Port and write it into a csv file - Port_Data_Clean_csv
- Load the cleaned up address data into a data frame - df_i94Addr and write it into a csv file - Addr_Data_Clean_csv

##### Apply the required transforms and write the parquet files to S3
- Immigration fact table - fact_i94ImmigrationData.parquet
- Flight dimension table - dim_flightData.parquet
- Visa dimension table - dim_visaData.parquet
- Address dimension table - dim_addressData.parquet
- Port dimension table - dim_portData.parquet
- Foreigner dimension table - dim_foreignerData.parquet
- Cities temperature fact table - fact_Cities_Temperature.parquet

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [45]:
# S3 Location for parquet files
parquet_files_directory = 's3a://spcapstone1/parquetFiles/'

# Local Location for parquet files
#parquet_files_directory = './parquetFiles/'
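
The notebook switches between S3 and the local directory by commenting lines in and out, and the write cells and the later read cell can end up pointing at different locations. One possible way to keep them in sync is to read the location from a single setting. The sketch below assumes a hypothetical environment variable named `PARQUET_DIR`; it is not part of the original notebook.

```python
import os

def get_parquet_dir(default="./parquetFiles/"):
    """Return the parquet directory, always ending with a slash.

    PARQUET_DIR is a hypothetical environment variable; when it is not set,
    the local directory used in the notebook is returned.
    """
    base = os.environ.get("PARQUET_DIR", default)
    return base if base.endswith("/") else base + "/"

# Every cell that reads or writes parquet files can share this one value.
parquet_files_directory = get_parquet_dir()
print(parquet_files_directory + "fact_i94ImmigrationData.parquet")
```

Setting `PARQUET_DIR=s3a://spcapstone1/parquetFiles/` before starting the notebook would then point both the write and the read cells at S3.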

##### Immigration Fact Table

In [46]:
df_i94immi_cleaned.createOrReplaceTempView('fact_i94ImmigrationData')
fact_i94ImmigrationData = spark.sql(query_CreateFactImmigrationTable)

immi_parquet_path = parquet_files_directory + 'fact_i94ImmigrationData.parquet'
fact_i94ImmigrationData.write.mode("overwrite").parquet(immi_parquet_path)

##### Flight dim table

In [43]:
df_flightinfo.createOrReplaceTempView('dim_Flight')
dim_Flight = spark.sql(query_CreateDimensionFlightTable)
flight_parquet_path = parquet_files_directory + 'dim_flightData.parquet'
dim_Flight.write.mode("overwrite").parquet(flight_parquet_path)

##### Visa dim table

In [44]:
df_visainfo.createOrReplaceTempView('dim_Visa')
dim_Visa = spark.sql(query_CreateDimensionVisaTable)
visa_parquet_path = parquet_files_directory + 'dim_visaData.parquet'
dim_Visa.write.mode("overwrite").parquet(visa_parquet_path)

##### Address dim Table

In [45]:
df_i94Addr.createOrReplaceTempView('dim_Address')
dim_Address = spark.sql(query_CreateDimensionAddressTable)
address_parquet_path = parquet_files_directory + 'dim_addressData.parquet'
dim_Address.write.mode("overwrite").parquet(address_parquet_path)

##### Port dim table

In [46]:
df_i94Port.createOrReplaceTempView('dim_Port')
dim_Port = spark.sql(query_CreateDimensionPortTable)
port_parquet_path = parquet_files_directory + 'dim_portData.parquet'
dim_Port.write.mode("overwrite").parquet(port_parquet_path)

##### Foreigner dim table

In [48]:
df_foreignerinfo.createOrReplaceTempView('dim_Foreigner')
dim_Foreigner = spark.sql(query_CreateDimensionForeignerTable)
foreigner_parquet_path = parquet_files_directory + 'dim_foreignerData.parquet'
dim_Foreigner.write.mode("overwrite").parquet(foreigner_parquet_path)

##### Temperature fact table

In [50]:
df_us_cities_temperature.createOrReplaceTempView('Temperature_Table')

In [51]:
spark.sql(query_CreateFactTempTemperatureTable).createOrReplaceTempView('Temperature_Table')

In [54]:
# Grouping average temperature by city 
spark.sql(query_UpdateFactTempTemperatureTableByCity).createOrReplaceTempView('Temperature_Table')

In [56]:
df_us_cities_temperature = spark.sql(query_GetAllFromFactTempTemperatureTable)

In [60]:
df_us_cities_temperature.createOrReplaceTempView('fact_Cities_Temperature')
fact_Cities_Temperature = spark.sql(query_CreateFactCitiesTemperatureTable)
cities_Temperature__parquet_path = parquet_files_directory + 'fact_Cities_Temperature.parquet'
fact_Cities_Temperature.write.mode("overwrite").parquet(cities_Temperature__parquet_path)
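
Each table above follows the same three steps: register a temp view, run the create query, and write the result as parquet. As a sketch, the pattern could be factored into one helper. The function name `build_and_write_table` is hypothetical; it only relies on the `createOrReplaceTempView`, `spark.sql`, and `write.mode(...).parquet(...)` calls already used in the cells above.

```python
def build_and_write_table(spark, source_df, view_name, query, parquet_path):
    """Register source_df as a temp view, run the query, and write the result as parquet."""
    source_df.createOrReplaceTempView(view_name)
    table_df = spark.sql(query)
    table_df.write.mode("overwrite").parquet(parquet_path)
    return table_df

# Example call, using names from the notebook (requires an active Spark session):
# dim_Port = build_and_write_table(
#     spark, df_i94Port, 'dim_Port', query_CreateDimensionPortTable,
#     parquet_files_directory + 'dim_portData.parquet')
```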

In [61]:
# Read the parquet files back and register temp views for the analysis queries

#parquet_files_directory = 's3a://spcapstone1/parquetFiles/'
parquet_files_directory = './parquetFiles/'

fact_i94ImmigrationDataPath = parquet_files_directory + 'fact_i94ImmigrationData.parquet'
df_Fact_i94ImmigrationData = spark.read.parquet(fact_i94ImmigrationDataPath)
df_Fact_i94ImmigrationData.createOrReplaceTempView('Fact_i94ImmigrationData_View')

dim_portDataPath = parquet_files_directory + 'dim_portData.parquet'
df_Dim_portData = spark.read.parquet(dim_portDataPath)
df_Dim_portData.createOrReplaceTempView('Dim_portData_View')

dim_visaDataPath = parquet_files_directory + 'dim_visaData.parquet'
df_Dim_visaData = spark.read.parquet(dim_visaDataPath)
df_Dim_visaData.createOrReplaceTempView('Dim_visaData_View')

fact_temperatureDataPath = parquet_files_directory + 'fact_Cities_Temperature.parquet'
df_fact_temperatureData = spark.read.parquet(fact_temperatureDataPath)
df_fact_temperatureData.createOrReplaceTempView('Fact_temperatureData_View')

In [68]:
print("Immigration by gender:")
spark.sql(query_GetImmigrationByGender).show(5)

Immigration by gender:
+------+------------------------+
|gender|foreigner_count_bygender|
+------+------------------------+
|     F|                 1210561|
|     M|                 1274680|
+------+------------------------+
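
The text of `query_GetImmigrationByGender` is not shown in this section. A query with the same output columns could look like the sketch below, which groups the fact view by `i94immi_foreigner_sex`. It runs on Python's built-in `sqlite3` rather than Spark SQL, the variable name `sketch_query_by_gender` is hypothetical, and the sample rows are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE Fact_i94ImmigrationData_View "
    "(i94immi_cicid INTEGER, i94immi_foreigner_sex TEXT)"
)
# Illustrative rows only.
conn.executemany(
    "INSERT INTO Fact_i94ImmigrationData_View VALUES (?, ?)",
    [(1, "F"), (2, "M"), (3, "M")],
)

# Assumed shape of the query: one row per gender with its arrival count.
sketch_query_by_gender = """
    SELECT i94immi_foreigner_sex AS gender,
           COUNT(*) AS foreigner_count_bygender
    FROM Fact_i94ImmigrationData_View
    GROUP BY i94immi_foreigner_sex
    ORDER BY gender
"""
result = conn.execute(sketch_query_by_gender).fetchall()
print(result)
```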



In [69]:
print("Immigration by age:")
spark.sql(query_GetImmigrationByAge).show(5)

Immigration by age:
+----+---------------------+
| age|foreigner_count_byage|
+----+---------------------+
|30.0|                58560|
|31.0|                57159|
|33.0|                56957|
|34.0|                56841|
|32.0|                56417|
+----+---------------------+



In [70]:
spark.sql(query_GetImmigrationByCity).createOrReplaceTempView('ImmigrationStatisticsByCity')

In [71]:
print("Total Number of Cities where foreigners arrived:")
spark.sql(query_GetCityCountByImmigration).show()

Total Number of Cities where foreigners arrived:
+--------+
|count(1)|
+--------+
|     161|
+--------+



In [75]:
print("Foreigner count by visa:")
spark.sql(query_GetForeignerCountByVisa).show(5)

Foreigner count by visa:
+---------+--------+
|visa_name|count(1)|
+---------+--------+
| Pleasure| 2057664|
|  Student|   38732|
| Business|  388845|
+---------+--------+



In [79]:
spark.sql(query_GetImmigrationNumbersByCityPrep).createOrReplaceTempView('Immigration_City')

In [80]:
spark.sql(query_GetCityAirportForeignerCount).createOrReplaceTempView('Immigration_City')

In [85]:
print("Cities with most foreigner arrival:")
spark.sql(query_GetTop5CitiesByForeignerCount).show()

Cities with most foreigner arrival:
+----------------+------------+----------------+
|       city_name|airport_code|total_foreigners|
+----------------+------------+----------------+
|        new york|         NYC|          389149|
|           miami|         MIA|          286905|
|     los angeles|         LOS|          247509|
|   san francisco|         SFR|          137126|
|newark/teterboro|         NEW|          131263|
+----------------+------------+----------------+



In [86]:
spark.sql(query_GetCityTemperature).createOrReplaceTempView('City_Temperature')

In [87]:
spark.sql(query_GetCityAverageTemperature).createOrReplaceTempView('City_Temperature')

In [89]:
spark.sql(query_GetCityImmigrationByTemperature).createOrReplaceTempView('Immigration_City_Temperature')

In [91]:
print("Foreigner travel to city vs temperature:")
spark.sql(query_GetTop5CityImmigrationByTemperature).show()

Foreigner travel to city vs temperature:
+----------------+------------------+----------------+
|       city_name|       temperature|total_foreigners|
+----------------+------------------+----------------+
|        new york| 9.409092592592593|          389149|
|           miami|22.989722222222216|          286905|
|     los angeles|14.007444444444445|          247509|
|   san francisco|13.686240740740741|          137126|
|newark/teterboro|              null|          131263|
+----------------+------------------+----------------+
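
The `null` temperature for `newark/teterboro` is what an outer join produces when a port city name has no exact match in the temperature data. The notebook's query is not shown, so the sketch below is only an assumption about its shape. It uses `sqlite3` in place of Spark SQL; the temperature for new york is rounded from the output above, and the `newark` row is made up to show a near miss on the join key.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE Immigration_City (city_name TEXT, total_foreigners INTEGER)")
conn.execute("CREATE TABLE City_Temperature (city_name TEXT, temperature REAL)")

conn.executemany(
    "INSERT INTO Immigration_City VALUES (?, ?)",
    [("new york", 389149), ("newark/teterboro", 131263)],
)
# 'newark' does not equal 'newark/teterboro', so the join finds no match for it.
conn.executemany(
    "INSERT INTO City_Temperature VALUES (?, ?)",
    [("new york", 9.4), ("newark", 10.0)],
)

# LEFT JOIN keeps every immigration city and fills missing temperatures with NULL.
rows = conn.execute("""
    SELECT i.city_name, t.temperature, i.total_foreigners
    FROM Immigration_City AS i
    LEFT JOIN City_Temperature AS t ON i.city_name = t.city_name
    ORDER BY i.total_foreigners DESC
""").fetchall()
print(rows)
```

If that is the cause, normalizing the port city names before the join (for example, splitting on `/`) would be one way to reduce unmatched rows.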



#### 4.2 Data Quality Checks
Data quality checks performed:
 * Primary key uniqueness: each table is read back from its parquet file, and the row count is compared with the count after dropping duplicate key values.
 
The analysis queries run in section 4.1 also serve as sanity checks on the loaded data:
- Immigration by gender
- Immigration by age
- Total number of cities where foreigners arrived
- Foreigner count by visa
- Cities with most foreigner arrival
- Foreigner travel to city vs temperature


##### Immigration Table Quality Check

In [98]:
# Check that the primary key i94immi_cicid is unique in the immigration fact table
fact_i94ImmigrationDataPath = parquet_files_directory + 'fact_i94ImmigrationData.parquet'
df_Fact_i94ImmigrationData = spark.read.parquet(fact_i94ImmigrationDataPath)
originalCountImmi = df_Fact_i94ImmigrationData.count()
immi_primary_key = ['i94immi_cicid']
afterDropDupliCountImmi = df_Fact_i94ImmigrationData.dropDuplicates(immi_primary_key).count()
if originalCountImmi != afterDropDupliCountImmi:
    print("Quality check failed: Immigration Table")
else:
    print("Quality check passed: Immigration Table")

Quality check passed: Immigration Table


##### Port Table Quality Check

In [99]:
# Check that the primary key port_code is unique in the port dimension table
dim_portDataPath = parquet_files_directory + 'dim_portData.parquet'
df_Dim_portData = spark.read.parquet(dim_portDataPath)
originalCountPort = df_Dim_portData.count()
port_primary_key = ['port_code']
afterDropDupliCountPort = df_Dim_portData.dropDuplicates(port_primary_key).count()
if originalCountPort != afterDropDupliCountPort:
    print("Quality check failed: Port Table")
else:
    print("Quality check passed: Port Table")

Quality check passed: Port Table


##### Visa Table Quality Check

In [100]:
# Check that the primary key visa_visatype_code is unique in the visa dimension table
dim_visaDataPath = parquet_files_directory + 'dim_visaData.parquet'
df_Dim_visaData = spark.read.parquet(dim_visaDataPath)
originalCountVisa = df_Dim_visaData.count()
visa_primary_key = ['visa_visatype_code']
afterDropDupliCountVisa = df_Dim_visaData.dropDuplicates(visa_primary_key).count()
if originalCountVisa != afterDropDupliCountVisa:
    print("Quality check failed: Visa Table")
else:
    print("Quality check passed: Visa Table")

Quality check passed: Visa Table


##### Foreigner Table Quality Check

In [107]:
# Check that the primary key foreigner_cicid is unique in the foreigner dimension table
dim_foreignerDataPath = parquet_files_directory + 'dim_foreignerData.parquet'
df_dim_foreignerData = spark.read.parquet(dim_foreignerDataPath)
originalCountForeigner = df_dim_foreignerData.count()
foreigner_primary_key = ['foreigner_cicid']
afterDropDupliCountForeigner = df_dim_foreignerData.dropDuplicates(foreigner_primary_key).count()
if originalCountForeigner != afterDropDupliCountForeigner:
    print("Quality check failed: Foreigner Table")
else:
    print("Quality check passed: Foreigner Table")

Quality check passed: Foreigner Table


##### Address Table Quality Check

In [110]:
# Check that the primary key address_state_code is unique in the address dimension table
dim_addressDataPath = parquet_files_directory + 'dim_addressData.parquet'
df_Dim_addressData = spark.read.parquet(dim_addressDataPath)
originalCountAddress = df_Dim_addressData.count()
address_primary_key = ['address_state_code']
afterDropDupliCountAddress = df_Dim_addressData.dropDuplicates(address_primary_key).count()
if originalCountAddress != afterDropDupliCountAddress:
    print("Quality check failed: Address Table")
else:
    print("Quality check passed: Address Table")

Quality check passed: Address Table
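
The five checks above repeat the same logic with different tables and keys. A minimal sketch of a shared helper follows. The name `check_primary_key` is hypothetical, and the extra requirement that the table be non-empty is an addition that the original cells do not make. It uses only the `count()` and `dropDuplicates()` DataFrame calls already seen above.

```python
def check_primary_key(df, key_columns, table_name):
    """Return True when the table is non-empty and its key columns hold no duplicates."""
    total = df.count()
    distinct = df.dropDuplicates(key_columns).count()
    # The non-empty condition is an extra check, not part of the original cells.
    passed = total > 0 and total == distinct
    status = "passed" if passed else "failed"
    print(f"Quality check {status}: {table_name} ({total} rows, {distinct} distinct keys)")
    return passed

# Example loop over the tables from this section (requires an active Spark session):
# checks = {
#     'fact_i94ImmigrationData.parquet': (['i94immi_cicid'], 'Immigration Table'),
#     'dim_portData.parquet': (['port_code'], 'Port Table'),
#     'dim_visaData.parquet': (['visa_visatype_code'], 'Visa Table'),
# }
# for file_name, (keys, name) in checks.items():
#     check_primary_key(spark.read.parquet(parquet_files_directory + file_name), keys, name)
```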


#### 4.3 Data dictionary 
The data dictionary briefly describes each field and where it came from. It is kept in a separate file rather than in the notebook.

##### Details of the tables

- Data model diagram: see Capstone_DataModel.png
- Data dictionary: see the PDF

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
    * Spark DataFrames can read and write multiple data formats, are easy to use, and have a Python API.
    * Spark SQL is easy to use and integrates directly with Spark DataFrames through temporary views.
    * Pandas is built for Python, and Pandas DataFrames can be easily converted to Spark DataFrames.
* Propose how often the data should be updated and why.
    * One use case for the data collected here is tracking tourism by city, which helps monitor tourism policies and city development and planning. For this use case, an annual update is sufficient.
    * For airlines using this information to plan their flight schedules, an annual update is also sufficient.
    * For use cases such as tracking individuals, a daily update may be useful.
* Write a description of how you would approach the problem differently under the following scenarios:
    * The data was increased by 100x.
        * We can use cloud-based technologies to scale, for example Redshift storage-optimized nodes.
    * The data populates a dashboard that must be updated on a daily basis by 7am every day.
        * We can use Airflow to schedule and monitor workflows. To update the dashboard daily at a specific time, we can create a DAG scheduled to run daily at a given start time, with an operator that performs the task of populating the dashboard.
    * The database needed to be accessed by 100+ people.
        * We can use distributed file systems and distributed databases, which scale as the number of users increases.
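
For the dashboard scenario, the start time of the daily run has to leave enough room for the pipeline to finish before 7am. The sketch below derives a cron expression from the deadline. The function name, the 60-minute runtime, and the 30-minute buffer are all made-up assumptions; Airflow accepts a cron expression like this as a DAG schedule, interpreted in whatever time zone the scheduler is configured for.

```python
from datetime import datetime, timedelta

def daily_cron_before_deadline(deadline="07:00", expected_runtime_minutes=60, buffer_minutes=30):
    """Return a daily cron expression that starts early enough to finish by the deadline."""
    deadline_time = datetime.strptime(deadline, "%H:%M")
    start = deadline_time - timedelta(minutes=expected_runtime_minutes + buffer_minutes)
    # Cron field order: minute hour day-of-month month day-of-week
    return f"{start.minute} {start.hour} * * *"

print(daily_cron_before_deadline())
```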