# ETL pipeline for analysing immigration behaviours and effects of temperature in selecting destination 

## Data Engineering Capstone Project

This project uses Spark to build a data warehouse, stored as parquet files, that combines immigration and temperature data for the US. The warehouse follows a star schema with one fact table and several dimension tables.
It can then be used to answer questions about how immigration behaviour relates to the temperature of the destination.

The project follows these steps:
- Step 1: Scope the Project and Gather Data
- Step 2: Explore and Assess the Data
- Step 3: Define the Data Model
- Step 4: Run ETL to Model the Data
- Step 5: Complete Project Write Up


In [2]:
#import libraries
from datetime import datetime
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession, SQLContext 
from pyspark.sql.types import DateType
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, rand
from pyspark.sql.functions import isnan, when, count, col
import configparser
import psycopg2
from convert_sas_to_df import convert
from pyspark import SparkContext
from pyspark.sql.functions import date_add as d_add
from pyspark.sql.functions import sum as _sum
import re

In [3]:
#Build spark session
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

# Step 1: Scope the Project and Gather Data

## Scope

This project will pull data from all sources and create fact and dimension tables to show movement of immigration and to determine if temperature affects the selection of destination cities for immigration.

## Describe and Gather Data

- **U.S. City Demographic Data (city_demo):** comes from OpenDataSoft and includes data by city, state, age, population, veteran status and race.

- **I94 Immigration Data (immigration_data):** comes from the US National Tourism and Trade Office and includes details on incoming immigrants and their ports of entry.

- **Airport Code Table (airport):** comes from datahub.io and includes airport codes and corresponding cities.

- **Countries (country):** comes from I94_SAS_Labels_Descriptions.SAS

- **Visas (visa):** comes from I94_SAS_Labels_Descriptions.SAS

- **Immigrant Entry Mode (mode):** comes from I94_SAS_Labels_Descriptions.SAS

- **Address (address):** comes from I94_SAS_Labels_Descriptions.SAS

- **Temperature Data (temp_data):** comes from Kaggle

## View Dataset schema and rows in raw format

In [22]:
# I94 immigration_data
immigration_data = spark.read.parquet("sas_data")
print(immigration_data.printSchema())
immigration_data.show(5)

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [26]:
#City Demographics data
city_demo = spark.read.format("csv").option("header", "true").option("delimiter", ";").load("us-cities-demographics.csv")
print(city_demo.printSchema())
city_demo.show(5)

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)

None
+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|            City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+----------------+-------------+----------+---------------+-----------------+--------------

In [6]:
# Airport data
airport = spark.read.format("csv").option("header", "true").load("airport-codes_csv.csv")
print(airport.printSchema())
airport.show(5)

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)

None
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport

In [7]:
#City Temperature data
temp_data = spark.read.format("csv").option("header", "true").load("GlobalLandTemperaturesByCity.csv")
print(temp_data.printSchema())
temp_data.show(5)

root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- AverageTemperatureUncertainty: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)

None
+----------+------------------+-----------------------------+-----+-------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01|           

In [8]:
#Extracting data from the label.sas file
convert('I94_SAS_Labels_Descriptions.SAS', 'i94cntyl', ['country_code', 'country'], 'Country')
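
The `convert` helper is imported from `convert_sas_to_df`, and its source is not shown in this notebook. Purely as an illustration, the sketch below shows one way such a helper could extract `code = 'label'` pairs from a SAS label block. The function name `parse_sas_labels` and the sample text are assumptions, not the project's actual implementation.

```python
import re

# Hypothetical sample that mimics the layout of a SAS "value" block
SAMPLE = """
value i94model
	1 = 'Air'
	2 = 'Sea'
	3 = 'Land'
	9 = 'Not reported' ;
"""

def parse_sas_labels(text):
    """Return (code, label) pairs for lines shaped like: code = 'label'."""
    pattern = re.compile(r"^\s*'?([^'=\s]+)'?\s*=\s*'([^']*)'", re.MULTILINE)
    return [(code, label.strip()) for code, label in pattern.findall(text)]

pairs = parse_sas_labels(SAMPLE)
print(pairs)
```

The codes come back as strings, which is consistent with the string-typed code columns in the CSV files loaded below.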

In [9]:
#Country data
country = spark.read.format("csv").option("header", "true").load("Country.csv")
print(country.drop('_c0').printSchema())
country.show(5)

root
 |-- country_code: string (nullable = true)
 |-- country: string (nullable = true)

None
+---+------------+--------------------+
|_c0|country_code|             country|
+---+------------+--------------------+
|  0|         582|MEXICO Air Sea, a...|
|  1|         236|         AFGHANISTAN|
|  2|         101|             ALBANIA|
|  3|         316|             ALGERIA|
|  4|         102|             ANDORRA|
+---+------------+--------------------+
only showing top 5 rows



In [10]:
convert('I94_SAS_Labels_Descriptions.SAS', 'I94VISA', ['visa_code', 'visa'], 'Visa')

In [10]:
#Visa Data
visa = spark.read.format("csv").option("header", "true").load("Visa.csv")
print(visa.drop('_c0').printSchema())
visa.show(5)

root
 |-- visa_code: string (nullable = true)
 |-- visa: string (nullable = true)

None
+---+---------+--------+
|_c0|visa_code|    visa|
+---+---------+--------+
|  0|        1|Business|
|  1|        2|Pleasure|
|  2|        3| Student|
+---+---------+--------+



In [11]:
convert('I94_SAS_Labels_Descriptions.SAS', 'I94MODE', ['mode_code', 'mode'], 'Mode')

In [12]:
#Mode data
mode = spark.read.format("csv").option("header", "true").load("Mode.csv")
print(mode.drop('_c0').printSchema())
mode.show(5)

root
 |-- mode_code: string (nullable = true)
 |-- mode: string (nullable = true)

None
+---+---------+------------+
|_c0|mode_code|        mode|
+---+---------+------------+
|  0|        1|         Air|
|  1|        2|         Sea|
|  2|        3|        Land|
|  3|        9|Not reported|
+---+---------+------------+



In [13]:
convert('I94_SAS_Labels_Descriptions.SAS', 'I94ADDR', ['addr_code', 'address'],'address')

In [14]:
#Address data
address = spark.read.format("csv").option("header", "true").load("address.csv")
print(address.drop('_c0').printSchema())
address.show(5)

root
 |-- addr_code: string (nullable = true)
 |-- address: string (nullable = true)

None
+---+---------+----------+
|_c0|addr_code|   address|
+---+---------+----------+
|  0|       AL|   ALABAMA|
|  1|       AK|    ALASKA|
|  2|       AZ|   ARIZONA|
|  3|       AR|  ARKANSAS|
|  4|       CA|CALIFORNIA|
+---+---------+----------+
only showing top 5 rows



# Step 2: Explore and Assess the Data

This step preprocesses the datasets:
- Renaming columns
- Changing column data types
- Dropping duplicates
- Dropping or filling null values
- Converting dates to a proper date format and selecting only the relevant columns
- Generating new columns

### **Immigration dataset:** 
Rename columns to readable names, cast the numeric codes to integers, convert the SAS date offsets to dates, and keep only the relevant columns.


In [23]:
immigration_data = immigration_data \
            .withColumn("cic_id", col("cicid").cast("integer")) \
            .drop("cicid") \
            .withColumnRenamed("i94addr", "code_state") \
            .withColumnRenamed("i94port", "code_port") \
            .withColumn("code_visa", col("i94visa").cast("integer")) \
            .drop("i94visa") \
            .withColumn("code_mode", col("i94mode").cast("integer")) \
            .drop("i94mode") \
            .withColumn("code_country_origin", col("i94res").cast("integer")) \
            .drop("i94res") \
            .withColumn("code_country_cit", col("i94cit").cast("integer")) \
            .drop("i94cit") \
            .withColumn("year", col("i94yr").cast("integer")) \
            .drop("i94yr") \
            .withColumn("month", col("i94mon").cast("integer")) \
            .drop("i94mon") \
            .withColumn("bird_year", col("biryear").cast("integer")) \
            .drop("biryear") \
            .withColumn("age", col("i94bir").cast("integer")) \
            .drop("i94bir") \
            .withColumn("counter", col("count").cast("integer")) \
            .drop("count") \
            .withColumn("data_base_sas", F.to_date(F.lit("01/01/1960"), "MM/dd/yyyy")) \
            .withColumn("arrival_date", F.expr("date_add(data_base_sas, arrdate)")) \
            .withColumn("departure_date", F.expr("date_add(data_base_sas, depdate)")) \
            .drop("data_base_sas", "arrdate", "depdate")
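
The `arrdate` and `depdate` columns hold SAS dates, which count days from 1 January 1960; that is why the cell above adds them to a `01/01/1960` base date. A minimal plain-Python sketch of the same conversion follows. The helper name `sas_days_to_date` is illustrative only.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS stores dates as days since this day

def sas_days_to_date(days):
    """Convert a SAS day count to a calendar date; None stays None."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))

print(sas_days_to_date(20574.0))  # 2016-04-30
```

Rows without a departure record have no `depdate`, so the conversion has to let missing values pass through rather than fail.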


In [24]:
immigration_data = immigration_data.select(col("cic_id"), col("code_port"), col("code_state"), col("visapost"), col("matflag"),
                                  col("dtaddto") \
                                  , col("gender"), col("airline"), col("admnum"), col("fltno"), col("visatype"),
                                  col("code_visa"), col("code_mode") \
                                  , col("code_country_origin"), col("code_country_cit"), col("year"), col("month"),
                                  col("bird_year") \
                                  , col("age"), col("counter"), col("arrival_date"), col("departure_date"))
immigration_data.show(10)

+-------+---------+----------+--------+-------+--------+------+-------+--------------+-----+--------+---------+---------+-------------------+----------------+----+-----+---------+---+-------+------------+--------------+
| cic_id|code_port|code_state|visapost|matflag| dtaddto|gender|airline|        admnum|fltno|visatype|code_visa|code_mode|code_country_origin|code_country_cit|year|month|bird_year|age|counter|arrival_date|departure_date|
+-------+---------+----------+--------+-------+--------+------+-------+--------------+-----+--------+---------+---------+-------------------+----------------+----+-----+---------+---+-------+------------+--------------+
|5748517|      LOS|        CA|     SYD|      M|10292016|     F|     QF|9.495387003E10|00011|      B1|        1|        1|                438|             245|2016|    4|     1976| 40|      1|  2016-04-30|    2016-05-08|
|5748518|      LOS|        NV|     SYD|      M|10292016|     F|     VA|9.495562283E10|00007|      B1|        1|        1

### **Demographics dataset:** 
Group by city and state, pivot the Race values into separate columns, and fill the resulting null counts with 0.


In [27]:
city_demo = city_demo.groupBy(col("City"), col("State"), col("Median Age"), col("Male Population"),
                                     col("Female Population") \
                                     , col("Total Population"), col("Number of Veterans"), col("Foreign-born"),
                                     col("Average Household Size") \
                                     , col("State Code")).pivot("Race").agg(_sum("count").cast("integer")) \
            .fillna({"American Indian and Alaska Native": 0,
                     "Asian": 0,
                     "Black or African-American": 0,
                     "Hispanic or Latino": 0,
                     "White": 0})

In [28]:
city_demo = city_demo.withColumnRenamed('Median Age', 'Median_age').withColumnRenamed('Male Population' , 'Male_population')\
.withColumnRenamed('Female Population', 'Female_population').withColumnRenamed('Total Population', 'Total_population')\
.withColumnRenamed('Number of Veterans', 'Number_of_veterans').withColumnRenamed('Average Household Size', 'Average_Household_Size')\
.withColumnRenamed('State Code', 'State_Code').withColumnRenamed('American Indian and Alaska Native', 'American_Indian_and_Alaska_Native')\
.withColumnRenamed('Black or African-American', 'Black_or_African-American').withColumnRenamed('Hispanic or Latino', 'Hispanic_or_Latino')
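
To make the pivot on `Race` concrete, here is a small plain-Python sketch of the same reshaping: one output row per city and state, one column per race, and 0 where a city has no row for a race. The rows and numbers are invented for illustration.

```python
from collections import defaultdict

# Hypothetical long-format rows: (city, state_code, race, count)
rows = [
    ("A", "IL", "Asian", 10),
    ("A", "IL", "White", 20),
    ("B", "NJ", "White", 5),
]
RACES = ["Asian", "White"]

def pivot_race(rows, races):
    """One output row per (city, state); one column per race, 0 if absent."""
    totals = defaultdict(lambda: dict.fromkeys(races, 0))
    for city, state, race, count in rows:
        totals[(city, state)][race] += count
    return dict(totals)

wide = pivot_race(rows, RACES)
print(wide[("B", "NJ")])  # {'Asian': 0, 'White': 5}
```

Starting every race at 0 plays the role of the `fillna` call in the Spark version.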

In [29]:
city_demo.show(5)

+-------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+---------------------------------+-----+-------------------------+------------------+------+
|         City|         State|Median_age|Male_population|Female_population|Total_population|Number_of_veterans|Foreign-born|Average_Household_Size|State_Code|American_Indian_and_Alaska_Native|Asian|Black_or_African-American|Hispanic_or_Latino| White|
+-------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+---------------------------------+-----+-------------------------+------------------+------+
|       Skokie|      Illinois|      43.4|          31382|            33437|           64819|              1066|       27424|                  2.78|        IL|                                0|20272|                     4937|              6590| 406

### **Airport dataset:** 
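Keep only small, medium and large airports, extract the region code from iso_region, cast elevation_ft to float, and drop duplicates and the continent column.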

In [30]:
airport = airport \
            .where(col("type").isin("large_airport", "medium_airport", "small_airport")) \
            .withColumn("iso_region", F.substring(col("iso_region"), 4, 2)) \
            .withColumn("elevation_ft", col("elevation_ft").cast("float")).dropDuplicates().drop('continent')
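
Spark's `substring` counts positions from 1, so `substring(col, 4, 2)` skips a three-character prefix such as `US-`. The sketch below mimics that behaviour in plain Python for positive positions only. The helper name `spark_substring` is an assumption made for this example.

```python
def spark_substring(value, pos, length):
    """Mimic Spark's substring(str, pos, len) for a positive, 1-based pos."""
    start = pos - 1
    return value[start:start + length]

print(spark_substring("US-IA", 4, 2))  # IA
```

Because the cell above does not filter on `iso_country`, a longer region code from another country would be cut to its first two characters; for example `GB-ENG` becomes `EN`.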

In [31]:
airport.show(5)

+-----+-------------+--------------------+------------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+-----------+----------+------------+--------+---------+----------+--------------------+
| 05IA|small_airport|        Spotts Field|      1155.0|         US|        IA|Nora Springs|    05IA|     null|      05IA|-93.0682983398437...|
| 0LS9|small_airport|   Huenefeld Airport|        72.0|         US|        LA|      Monroe|    0LS9|     null|      0LS9|-91.9821014404296...|
| 0NY7|small_airport|Murphys Landing S...|       940.0|         US|        NY|       Perth|    0NY7|     null|      0NY7|-74.1843032836914...|
| 0TX8|small_airport|       Jacobia Field|       570.0|         US|        TX|  Greenville|    0TX8|     null|      0TX8|-96.0432968139648...|

### **Temperature dataset:** 
Drop duplicates and rows without a temperature, then generate a city_code column from the City column.

In [32]:
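# Keep one row per (City, Country); dropDuplicates keeps an arbitrary row of each group
temp_data = temp_data.dropDuplicates(['City', 'Country'])
# Missing temperatures are read as null; comparing null with 'NaN' yields null,
# so this filter also drops the rows that have no temperature
temp_data = temp_data.filter(temp_data.AverageTemperature != 'NaN')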

In [33]:
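# Each line of i94port.txt is expected to look like: 'CODE' = 'PORT NAME'
re_obj = re.compile(r'\'(.*)\'.*\'(.*)\'')
valid_code = {}
with open('i94port.txt') as f:
    for line in f:
        match = re_obj.search(line)
        if match:  # skip blank or malformed lines
            valid_code[match[1]] = [match[2]]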
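
The pattern above captures the two quoted fields on a line. As a quick check, this sketch applies it to a single line; the sample line is an assumption about how `i94port.txt` is laid out, not a copy of the file.

```python
import re

re_obj = re.compile(r"'(.*)'.*'(.*)'")

# Hypothetical line in the style the pattern expects: 'CODE' = 'CITY, STATE'
sample = "   'SEA'\t=\t'SEATTLE, WA            '"

match = re_obj.search(sample)
code, name = match[1], match[2].strip()
print(code, "->", name)  # SEA -> SEATTLE, WA
```

The second group keeps any padding inside the quotes, so stripping it is worthwhile if the port name is ever compared for equality.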

In [34]:
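@udf()
def get_code_city(city):
    '''
    Input: City name
    
    Output: Corresponding i94port code, or None if no port name contains the city
    
    '''
    if city is None:
        return None
    # Substring match: returns the first port whose name contains the city name
    for key in valid_code:
        if city.lower() in valid_code[key][0].lower():
            return key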
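
The lookup in `get_code_city` is a case-insensitive substring match, which is worth seeing outside Spark. The sketch below reproduces the logic as a plain function with a tiny lookup table whose entries are assumed for illustration; `find_port_code` is a hypothetical name.

```python
# Hypothetical subset of the port lookup built from i94port.txt
valid_code = {
    "PER": ["PERTH AMBOY, NJ"],
    "SEA": ["SEATTLE, WA"],
}

def find_port_code(city, lookup):
    """Return the first port code whose description contains the city name."""
    if city is None:
        return None
    needle = city.lower()
    for code, (description,) in lookup.items():
        if needle in description.lower():
            return code
    return None

print(find_port_code("Seattle", valid_code))  # SEA
print(find_port_code("Perth", valid_code))    # PER
print(find_port_code("Aarhus", valid_code))   # None
```

With these sample entries, "Perth" resolves to a port whose description is in New Jersey. A substring match can therefore link a city to a port in a different place, so a stricter comparison (for example, on the text before the comma) may be preferable.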

In [35]:
temp_data = temp_data.withColumn("city_code", get_code_city(temp_data.City))
temp_data = temp_data.filter(temp_data.city_code.isNotNull())
temp_data.show(5)

+----------+------------------+-----------------------------+--------+-------------+--------+---------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|    City|      Country|Latitude|Longitude|city_code|
+----------+------------------+-----------------------------+--------+-------------+--------+---------+---------+
|1852-07-01|            15.488|                        1.395|   Perth|    Australia|  31.35S|  114.97E|      PER|
|1828-01-01|            -1.977|                        2.551| Seattle|United States|  47.42N|  121.97W|      SEA|
|1743-11-01|             2.767|                        1.905|Hamilton|       Canada|  42.59N|   80.73W|      HAM|
|1849-01-01| 7.399999999999999|                        2.699| Ontario|United States|  34.56N|  116.76W|      ONT|
|1821-11-01|             2.322|                        2.375| Spokane|United States|  47.42N|  117.24W|      SPO|
+----------+------------------+-----------------------------+--------+-------------+----

# Step 3: Define the Data Model

#### 3.1 Conceptual Model

##### Star Schema

**Fact Table** - This contains the I94 immigration records, restricted to rows whose port, state, country, visa and mode codes have a match in the dimension tables (for example, `code_port` matches `city_code` in the temperature data).

fact_immigration:
- cic_id: integer 
- code_port: string 
- code_state: string 
- visapost: string 
- matflag: string
- dtaddto: string
- gender: string
- airline: string
- admnum: double
- fltno: string 
- visatype: string
- code_visa: integer
- code_mode: integer
- code_country_origin: integer
- code_country_cit: integer
- year: integer
- month: integer
- bird_year: integer
- age: integer
- counter: integer
- arrival_date: date
- departure_date: date


**Dimension Tables** - These contain the descriptive attributes referenced by the codes in the fact table.

city_demo:
- City: string 
- State: string 
- Median_age: string 
- Male_population: string 
- Female_population: string 
- Total_population: string 
- Number_of_veterans: string 
- Foreign-born: string 
- Average_Household_Size: string 
- State_Code: string 
- American_Indian_and_Alaska_Native: integer 
- Asian: integer 
- Black_or_African-American: integer 
- Hispanic_or_Latino: integer 
- White: integer 


airport:
- ident: string 
- type: string 
- name: string 
- elevation_ft: float 
- iso_country: string 
- iso_region: string 
- municipality: string 
- gps_code: string 
- iata_code: string 
- local_code: string 
- coordinates: string


country:
- country_code: string 
- country: string 


visa:
- visa_code: string
- visa: string


mode:
- mode_code: string
- mode: string


address:
- addr_code: string
- address: string


temp_data:
- dt: string 
- AverageTemperature: string 
- AverageTemperatureUncertainty: string 
- City: string 
- Country: string 
- Latitude: string 
- Longitude: string 
- city_code: string 


#### 3.2 Mapping Out Data Pipelines
Pipeline Steps:
- Model data
  - Create a star schema with one fact table and seven dimension tables
  - Write the fact table to parquet files partitioned by arrival_date
  - Write each dimension table to parquet files partitioned by its key column
  - Keep only fact rows whose dimension keys exist in the dimension tables, for integrity and consistency

# Step 4: Run Pipelines to Model the Data

#### 4.1 Create the data model

Build the data pipelines to create the data model.


##### Creating Facts table

In [36]:
fact_immigration = immigration_data\
.join(city_demo, immigration_data['code_state'] == city_demo['State_Code'], 'left_semi')\
.join(airport, immigration_data['code_port'] == airport['local_code'], 'left_semi')\
.join(country, immigration_data['code_country_origin'] == country['country_code'], 'left_semi')\
.join(visa, immigration_data['code_visa'] == visa['visa_code'], 'left_semi')\
.join(mode, immigration_data['code_mode'] == mode['mode_code'], 'left_semi')\
.join(address, immigration_data['code_state'] == address['addr_code'], 'left_semi')\
.join(temp_data, immigration_data['code_port'] == temp_data['city_code'], 'left_semi')
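
A `left_semi` join keeps the rows of the left table that have at least one match on the right and adds no columns from the right table. That is why `fact_immigration` has the same columns as `immigration_data`. The following plain-Python sketch shows the idea on invented miniature tables.

```python
# Hypothetical miniature tables
immigration = [
    {"cic_id": 1, "code_port": "SEA", "code_visa": 2},
    {"cic_id": 2, "code_port": "XXX", "code_visa": 2},
    {"cic_id": 3, "code_port": "SEA", "code_visa": 7},
]
ports = [{"city_code": "SEA"}]
visas = [{"visa_code": 1}, {"visa_code": 2}, {"visa_code": 3}]

def left_semi(left, right, left_key, right_key):
    """Keep left rows whose key appears in right; add no right-hand columns."""
    keys = {row[right_key] for row in right}
    return [row for row in left if row[left_key] in keys]

fact = left_semi(immigration, ports, "code_port", "city_code")
fact = left_semi(fact, visas, "code_visa", "visa_code")
print(fact)  # [{'cic_id': 1, 'code_port': 'SEA', 'code_visa': 2}]
```

Chaining the joins acts as a series of filters, so a row survives only if every one of its codes is present in the corresponding dimension table.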

##### Writing to parquet files

In [43]:
fact_immigration.write.partitionBy('arrival_date').mode('overwrite').parquet('./parquet/fact_immigration.parquet')

In [96]:
city_demo.write.partitionBy("State_Code").mode('overwrite').parquet('./parquet/dimension_city_demo.parquet')

In [97]:
airport.write.partitionBy("local_code").mode('overwrite').parquet('./parquet/dimension_airport.parquet')

In [98]:
country.write.partitionBy('country_code').mode('overwrite').parquet('./parquet/dimension_country.parquet')

In [99]:
visa.write.partitionBy('visa_code').mode('overwrite').parquet('./parquet/dimension_visa.parquet')

In [100]:
mode.write.partitionBy('mode_code').mode('overwrite').parquet('./parquet/dimension_mode.parquet')

In [101]:
address.write.partitionBy('addr_code').mode('overwrite').parquet('./parquet/dimension_address.parquet')

In [102]:
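# temp_data has no 'year' column, so derive it from dt (yyyy-MM-dd) before partitioning
temp_data.withColumn('year', F.year(F.to_date('dt'))).write.partitionBy('year').mode('overwrite').parquet('./parquet/dimension_temperature.parquet')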
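
`partitionBy` writes one subdirectory per distinct value of the partition column, named `column=value`. The sketch below is a plain-Python approximation on invented rows that lists the directories a write would create and how many rows land in each. The helper name `partition_dirs` is hypothetical.

```python
from collections import Counter

def partition_dirs(rows, column):
    """Map each Hive-style partition directory name to its row count."""
    return Counter("{}={}".format(column, row[column]) for row in rows)

# Hypothetical fact rows
rows = [
    {"cic_id": 1, "arrival_date": "2016-04-30"},
    {"cic_id": 2, "arrival_date": "2016-04-30"},
    {"cic_id": 3, "arrival_date": "2016-04-29"},
]
dirs = partition_dirs(rows, "arrival_date")
print(dict(dirs))
```

A column with nearly unique values, such as `local_code` in the airport table, produces roughly one directory per row. That usually means many tiny files, so a lower-cardinality column may be a better partition key.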

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
- Integrity constraints on the relational database (e.g., unique key, data type, etc.)
- Unit tests for the scripts to ensure they are doing the right thing
- Source/Count checks to ensure completeness


### Quality checks:

In [44]:
def qCheck(df, tabname):
    '''
    Input: Spark dataframe, name of Spark dataframe
    Output: Print outcome of data quality check
    '''
    res = df.count()
    if res == 0:
        print("Data quality check failed for {} with zero records".format(tabname))
    else:
        print("Data quality check passed for {} with {} records".format(tabname, res))

In [45]:
qCheck(fact_immigration, 'Immigration_table')
qCheck(city_demo, 'City_demo_table')
qCheck(airport, 'Airport_table')
qCheck(country, 'Country_table')
qCheck(visa, 'Visa_table')
qCheck(mode, 'Mode_table')
qCheck(address, 'Address_table')
qCheck(temp_data, 'Temperature_table')

Data quality check passed for Immigration_table with 1637289 records
Data quality check passed for City_demo_table with 596 records
Data quality check passed for Airport_table with 39142 records
Data quality check passed for Country_table with 289 records
Data quality check passed for Visa_table with 3 records
Data quality check passed for Mode_table with 4 records
Data quality check passed for Address_table with 55 records
Data quality check passed for Temperature_table with 207 records
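
`qCheck` only verifies that a table is not empty. The integrity constraints mentioned above could be covered by two further checks: key uniqueness and missing values. The sketch below expresses them over plain Python rows; the function names and sample rows are assumptions, not part of the pipeline.

```python
from collections import Counter

def duplicate_keys(rows, key):
    """Return key values that occur more than once (empty list = key is unique)."""
    counts = Counter(row[key] for row in rows)
    return [value for value, n in counts.items() if n > 1]

def null_count(rows, column):
    """Count rows where the column is missing or None."""
    return sum(1 for row in rows if row.get(column) is None)

# Hypothetical rows
rows = [
    {"cic_id": 1, "code_port": "SEA"},
    {"cic_id": 2, "code_port": None},
    {"cic_id": 2, "code_port": "LOS"},
]
print(duplicate_keys(rows, "cic_id"))  # [2]
print(null_count(rows, "code_port"))   # 1
```

On a Spark dataframe, the uniqueness check corresponds to comparing `df.count()` with `df.select(key).distinct().count()`.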


#### 4.3 Data dictionary
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

fact_immigration:
- cic_id: integer - CIC id
- code_port: string - Airport code 
- code_state: string - State code
- visapost: string - Department of State post where the visa was issued
- matflag: string - Match of arrival and departure records
- dtaddto: string - Date to which admitted to U.S. 
- gender: string - Gender
- airline: string - Airline code
- admnum: double - Admission Number
- fltno: string - Flight number of Airline used to arrive in U.S.
- visatype: string -  Class of admission legally admitting the non-immigrant to temporarily stay in U.S
- code_visa: integer - Visa code
- code_mode: integer - Mode code
- code_country_origin: integer - Country of origin code
- code_country_cit: integer - Country of citizenship code
- year: integer - Year
- month: integer - Month 
- bird_year: integer - Year of Birth
- age: integer - Age
- counter: integer  - Used for summary statistics
- arrival_date: date - Arrival date
- departure_date: date - Departure date


**Dimension Tables** - These contain the descriptive attributes referenced by the codes in the fact table.

city_demo:
- City: string - City name
- State: string - State name
- Median_age: string - Median age
- Male_population: string - Male population of the city
- Female_population: string - Female population of the city
- Total_population: string - Total population of the city
- Number_of_veterans: string - Number of veterans
- Foreign-born: string - Foreign born
- Average_Household_Size: string - Average Household Size
- State_Code: string - State code
- American_Indian_and_Alaska_Native: integer - Number of residents in this category 
- Asian: integer - Number of residents in this category 
- Black_or_African-American: integer - Number of residents in this category 
- Hispanic_or_Latino: integer - Number of residents in this category 
- White: integer - Number of residents in this category 


airport:
- ident: string - Airport id
- type: string - size of airport
- name: string - Airport name
- elevation_ft: float - elevation in feet
- iso_country: string -  country 
- iso_region: string - region
- municipality: string - municipality
- gps_code: string - gps
- iata_code: string - iata code
- local_code: string - local code
- coordinates: string - coordinates


country:
- country_code: string - country code 
- country: string - country


visa:
- visa_code: string - visa code
- visa: string - type of visa


mode:
- mode_code: string - mode code
- mode: string - type of mode


address:
- addr_code: string - address code
- address: string - address


temp_data:
- dt: string - date
- AverageTemperature: string  - average temperature  
- AverageTemperatureUncertainty: string - average temperature uncertainty 
- City: string - City
- Country: string - Country
- Latitude: string - Latitude
- Longitude: string - Longitude
- city_code: string - city code


# Step 5: Complete Project Write Up

- Clearly state the rationale for the choice of tools and technologies for the project.
    - I used Spark because it reads multiple file formats (SAS, CSV, etc.) and handles large amounts of data. The data is persisted as parquet files, which can scale to many terabytes.


- Propose how often the data should be updated and why.
    - The data should be updated daily, because the fact table is partitioned by arrival date and each day's arrivals form a new partition.


- Write a description of how you would approach the problem differently under the following scenarios:
    - The data was increased by 100x.
      - Spark can still be used; the job would run on a larger cluster with more worker nodes.
    - The data populates a dashboard that must be updated on a daily basis by 7am every day.
      - Apache Airflow can be used to schedule the pipeline so that it completes before 7am each day.
    - The database needed to be accessed by 100+ people.
      - The data can be loaded into Amazon Redshift, which can scale to serve many concurrent users and offers good read performance.
