# International Tourist Arrivals at USA Airports

__Data Engineer:__ Lysmar Freitas

#### Project Summary
A car rental company that operates at USA airports hired me to develop a data model for fast analysis of international tourist arrivals at the country's airport cities in recent years. The main questions for the analysis are the tourists' countries of origin and how many arrive from each, their reasons for travel, and the average length of their stay.


The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
from datetime import datetime

import pandas as pd
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession, SQLContext
# import col explicitly instead of a wildcard import, which shadows Python builtins
from pyspark.sql.functions import col

# UDFs built from the I94 SAS labels descriptions file
from origin_codes import country_udf, visa_code_udf

In [2]:
#Build spark session
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

In [46]:
#Build SQL context object
sqlContext = SQLContext(spark)

In [3]:
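# Create helper to convert a list of Spark rows (e.g. from df.take(n)) to a pandas dataframe
def spark_to_pandas(results):
    # an empty result has no row to read the column names from
    if not results:
        return pd.DataFrame()
    return pd.DataFrame(results, columns=results[0].__fields__)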

### Step 1: Scope the Project and Gather Data

#### Scope 

The goal of this project is to pull data from two sources and build a fact table and dimension tables that show the patterns of international travelers arriving at airports in USA cities.

Spark was chosen for this project because it processes large amounts of data quickly, scales easily with additional worker nodes, and reads different data formats (e.g. SAS, Parquet, CSV).

#### Describe and Gather Data 
The project uses the following datasets:

- __I94 Immigration Data:__ this dataset comes from the US National Tourism and Trade Office and includes details on incoming travelers and their ports of entry. It contains passenger data collected at immigration, such as the port of entry, arrival and departure dates, birth year, gender, airline, etc.
https://travel.trade.gov/research/reports/i94/historical/2016.html
 

- __Airport Code Table:__ this dataset comes from datahub.io and includes airport codes and corresponding cities.
https://datahub.io/core/airport-codes#data

In [4]:
# Read in the data 

df_immi=spark.read.format('com.github.saurfang.sas.spark').load("../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat")


df_airport=spark.read.format("csv").option("header", "true").load("airport-codes_csv.csv")

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identifying data quality issues, like missing values, duplicate data, etc.

##### I94 Immigration Data

In [5]:
df_immi.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- validres: double (nullable = true)
 |-- delete_days: double (nullable = true)
 |-- delete_mexl: double (nullable = true)
 |-- delete_dup: double (nullable = true)
 |-- delete_visa: double (nullable = true)
 |-- delete_recdup: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- mat

In [6]:
# view data into a dataframe
spark_to_pandas(df_immi.take(5))

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,4.0,2016.0,6.0,135.0,135.0,XXX,20612.0,,,,...,U,,1957.0,10032016,,,,14938460000.0,,WT
1,5.0,2016.0,6.0,135.0,135.0,XXX,20612.0,,,,...,U,,1966.0,10032016,,,,17460060000.0,,WT
2,6.0,2016.0,6.0,213.0,213.0,XXX,20609.0,,,,...,U,,1989.0,D/S,,,,1679298000.0,,F1
3,7.0,2016.0,6.0,213.0,213.0,XXX,20611.0,,,,...,U,,1993.0,D/S,,,,1140963000.0,,F1
4,16.0,2016.0,6.0,245.0,245.0,XXX,20632.0,,,,...,U,,1992.0,D/S,,,,1934535000.0,,F1


In [7]:
# check if there is any arrival besides airport (i94mode = 1)
df_immi.select("i94mode").dropDuplicates().sort("i94mode").show()

+-------+
|i94mode|
+-------+
|   null|
|    1.0|
|    2.0|
|    3.0|
|    9.0|
+-------+



In [8]:
# Check if there is any duplicated row in the dataset, by counting unique row values
print(df_immi.count())
print(df_immi.dropDuplicates().count())

3574989
3574989


###### Findings
- There are no duplicated rows
- Several columns have null values, such as i94mode, i94addr, depdate and gender
- Column i94mode has values other than 1 (air arrival)
- Columns such as cicid, i94cit, visapost, dtadfile, entdepa and entdepd are not relevant to the project
- Columns such as i94yr, i94mon, i94port, arrdate, i94mode, i94addr, depdate, biryear, gender and visatype could have more descriptive names
- Numerical columns are stored as 'double', which does not suit values such as year, month and birth year
- Columns such as i94res and i94visa hold numeric codes that could be replaced with descriptive names

##### Airport Code Table

In [9]:
df_airport.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



In [10]:
# view data into a dataframe
spark_to_pandas(df_airport.take(5))

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [11]:
# check if there is iso_country besides US
df_airport.select("iso_country").dropDuplicates().sort("iso_country").show()

+-----------+
|iso_country|
+-----------+
|         AD|
|         AE|
|         AF|
|         AG|
|         AI|
|         AL|
|         AM|
|         AO|
|         AQ|
|         AR|
|         AS|
|         AT|
|         AU|
|         AW|
|         AZ|
|         BA|
|         BB|
|         BD|
|         BE|
|         BF|
+-----------+
only showing top 20 rows



In [12]:
# check if there are nulls in the municipality 
df_airport.select("municipality").dropDuplicates().sort("municipality").show()

+-----------------+
|     municipality|
+-----------------+
|             null|
|  'S Gravenvoeren|
|         108 Mile|
|         3 Marias|
|         ;ifi,bwe|
|           <Olmos|
|        A. Ibañez|
|           Aachen|
|          Aalborg|
|            Aalen|
|            Aalst|
|Aamby Valley City|
|        Aarbergen|
|           Aarhus|
|             Aars|
|          Aasiaat|
|              Aba|
|           Abadan|
|           Abadeh|
|           Abaeté|
+-----------------+
only showing top 20 rows



In [13]:
# check if there are airports not relevant to car rental company
df_airport.select("type").dropDuplicates().sort("type").show()

+--------------+
|          type|
+--------------+
|   balloonport|
|        closed|
|      heliport|
| large_airport|
|medium_airport|
| seaplane_base|
| small_airport|
+--------------+



In [14]:
# Check if there is any duplicated row in the dataset, by counting unique row values
print(df_airport.count())
print(df_airport.dropDuplicates().count())

55075
55075


###### Findings
- There are no duplicated rows
- Some columns have null values, such as iata_code
- Column iso_country has values other than "US"
- Numerical columns such as elevation_ft are stored as 'string'
- Column type includes values such as closed, heliport and seaplane_base, where a car rental company can't operate
- Column municipality has null values

#### Cleaning Steps
- Keep only rows for air arrivals (i94mode = 1)
- Drop columns that are not relevant to the project
- Rename columns to make them more descriptive and standardized
- Fix column data types as noted above
- Convert i94res column numbers into country of origin names
- Convert i94visa column numbers into travel reason names
- Remove nulls and drop any duplicates

__I94 Immigration Data__

In [15]:
# select only people who came by plane -> i94mode==1
df_immi = df_immi[df_immi['i94mode']==1]

In [16]:
# select relevant columns to the project
df_immi = df_immi['i94yr', 'i94mon', 'i94res','i94port', 'i94addr', 'i94visa', 'arrdate', 'depdate', 'biryear', 'gender','admnum']

In [17]:
df_immi.show(5)

+------+------+------+-------+-------+-------+-------+-------+-------+------+--------------+
| i94yr|i94mon|i94res|i94port|i94addr|i94visa|arrdate|depdate|biryear|gender|        admnum|
+------+------+------+-------+-------+-------+-------+-------+-------+------+--------------+
|2016.0|   6.0| 276.0|    SFR|     CA|    3.0|20623.0|   null| 1994.0|     F|8.432352323E10|
|2016.0|   6.0| 276.0|    SFR|     CA|    3.0|20623.0|   null| 1989.0|     M|8.694424303E10|
|2016.0|   6.0| 127.0|    HOU|   null|    2.0|20617.0|20679.0| 1973.0|     M|9.238008823E10|
|2016.0|   6.0| 127.0|    HOU|     TX|    2.0|20617.0|20679.0| 1947.0|     F|9.237990593E10|
|2016.0|   6.0| 101.0|    BOS|     MA|    1.0|20606.0|20608.0| 1985.0|  null|9.790576503E10|
+------+------+------+-------+-------+-------+-------+-------+-------+------+--------------+
only showing top 5 rows



In [18]:
# drop missing values from columns
df_immi = df_immi.dropna(subset=['i94yr', 'i94mon', 'i94res','i94port', 'i94addr', 'i94visa', 'arrdate', 'depdate', 'biryear', 'gender', 'admnum'])

In [19]:
# rename columns
df_immi = df_immi.withColumnRenamed("i94yr", "year") \
                 .withColumnRenamed("i94mon", "month") \
                 .withColumnRenamed("i94res", "origin_country") \
                 .withColumnRenamed("i94port", "airport_code") \
                 .withColumnRenamed("i94addr", "state_code") \
                 .withColumnRenamed("i94visa", "travel_reason") \
                 .withColumnRenamed("biryear", "birth_year") \
                 .withColumnRenamed("admnum", "admission_number") 

In [20]:
# convert column values using country_udf and visa_code_udf, created with data from the I94 SAS labels descriptions file
df_immi=df_immi\
.withColumn("origin_country",country_udf(df_immi["origin_country"]))\
.withColumn("travel_reason",visa_code_udf(df_immi["travel_reason"]))
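The `origin_codes` module is not shown in this notebook. As a rough sketch of the kind of lookup that `visa_code_udf` and `country_udf` could wrap, the snippet below maps numeric codes to labels with plain dictionaries. The function names `visa_label` and `country_label`, the `"Unknown"` fallback and the dictionary contents are assumptions for illustration: the visa labels follow the I94 SAS labels file (1 = Business, 2 = Pleasure, 3 = Student), and the single country entry is taken from the sample rows in this notebook, where code 127 becomes ROMANIA.

```python
# Illustrative lookup tables; the real ones live in origin_codes (not shown here).
VISA_LABELS = {1: "Business", 2: "Pleasure", 3: "Student"}
COUNTRY_LABELS = {127: "ROMANIA"}  # one entry only, taken from the sample rows


def _lookup(code, labels, default="Unknown"):
    """Map a numeric code (possibly a float such as 2.0, or None) to its label."""
    if code is None:
        return default
    return labels.get(int(code), default)


def visa_label(code):
    """Hypothetical helper: i94visa code -> travel reason."""
    return _lookup(code, VISA_LABELS)


def country_label(code):
    """Hypothetical helper: i94res code -> country name."""
    return _lookup(code, COUNTRY_LABELS)


print(visa_label(2.0))       # Pleasure
print(country_label(127.0))  # ROMANIA
print(visa_label(None))      # Unknown
```

In Spark, a function like this could be wrapped with `F.udf(visa_label, T.StringType())` before being passed to `withColumn`.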

In [21]:
# replace arrdate and depdate columns with stay_duration column (in days)
# both columns are SAS day counts, so their difference is the length of stay
df_immi = df_immi.withColumn("stay_duration", F.col("depdate") - F.col("arrdate"))
df_immi = df_immi.drop("depdate").drop("arrdate")
df_immi.show(5)

+------+-----+--------------+------------+----------+-------------+----------+------+----------------+-------------+
|  year|month|origin_country|airport_code|state_code|travel_reason|birth_year|gender|admission_number|stay_duration|
+------+-----+--------------+------------+----------+-------------+----------+------+----------------+-------------+
|2016.0|  6.0|       ROMANIA|         HOU|        TX|     Pleasure|    1947.0|     F|  9.237990593E10|         62.0|
|2016.0|  6.0|       ALBANIA|         NEW|        PA|     Pleasure|    1964.0|     F|   1.117456185E9|         33.0|
|2016.0|  6.0|       ALBANIA|         BOS|        MA|     Pleasure|    1989.0|     F|  9.794448313E10|         26.0|
|2016.0|  6.0|       ALBANIA|         BOS|        MA|     Pleasure|    1955.0|     F|  9.794688673E10|         91.0|
|2016.0|  6.0|       ALBANIA|         WAS|        VA|     Pleasure|    1962.0|     F|  9.794752923E10|         85.0|
+------+-----+--------------+------------+----------+-------------+----------+------+----------------+-------------+
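The `arrdate` and `depdate` values are SAS date numbers, which count days from 1960-01-01. The sketch below, in plain Python, shows how such a value maps to a calendar date and why subtracting two of them gives the stay in days. The helper name `sas_to_date` is an assumption for illustration; the notebook itself only needs the subtraction.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # day 0 of the SAS date scale


def sas_to_date(sas_days):
    """Convert a SAS date number (days since 1960-01-01) to a datetime.date."""
    return SAS_EPOCH + timedelta(days=int(sas_days))


arrival = sas_to_date(20617.0)    # arrdate of a sample row in this notebook
departure = sas_to_date(20679.0)  # depdate of the same row

print(arrival)                     # 2016-06-12
print(departure)                   # 2016-08-13
print((departure - arrival).days)  # 62
```

The 62-day difference matches the `stay_duration` that the notebook computes for that row.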

In [22]:
# check datatypes
df_immi.printSchema()

root
 |-- year: double (nullable = true)
 |-- month: double (nullable = true)
 |-- origin_country: string (nullable = true)
 |-- airport_code: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- travel_reason: string (nullable = true)
 |-- birth_year: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- admission_number: double (nullable = true)
 |-- stay_duration: double (nullable = true)



In [23]:
# Fix data types
df_immi=df_immi.withColumn("year", col("year").cast("integer"))\
               .withColumn("month", col("month").cast("integer"))\
               .withColumn("birth_year", col("birth_year").cast("integer"))\
               .withColumn("admission_number", col("admission_number").cast("bigint")) \
               .withColumn("stay_duration", col("stay_duration").cast("integer"))

In [24]:
#drop duplicates
df_immi = df_immi.dropDuplicates()

In [25]:
# check new dataframe
print(df_immi.count())
df_immi.show(5)

2536793
+----+-----+--------------+------------+----------+-------------+----------+------+----------------+-------------+
|year|month|origin_country|airport_code|state_code|travel_reason|birth_year|gender|admission_number|stay_duration|
+----+-----+--------------+------------+----------+-------------+----------+------+----------------+-------------+
|2016|    6|       BELGIUM|         NYC|        NY|     Business|      1968|     M|     61300969733|            8|
|2016|    6|       BELGIUM|         NYC|        NY|     Business|      1964|     F|     61337302733|           14|
|2016|    6|       BELGIUM|         SFR|        CA|     Business|      1979|     M|     97925165330|            7|
|2016|    6|      BULGARIA|         CIN|        KY|     Pleasure|      1961|     F|     97931772730|           85|
|2016|    6|        POLAND|         CHI|        IL|     Pleasure|      1951|     M|     97954536230|            6|
+----+-----+--------------+------------+----------+-------------+----------+------+----------------+-------------+

__Airport Code Table__ 

Since the immigration data only lists airports that have an international airport code, rows without an IATA code are removed from the airport table.

In [26]:
# drop all rows without an IATA code
df_airport = df_airport.dropna(subset=["iata_code"])

In [27]:
# select only data from US 
df_airport = df_airport[df_airport['iso_country']=="US"]

In [28]:
# select only data from "small_airport", "medium_airport", "large_airport"
df_airport = df_airport[df_airport['type'].isin(["small_airport", "medium_airport", "large_airport"])]

In [29]:
# Check if the airport types were updated
df_airport.select("type").dropDuplicates().sort("type").show()

+--------------+
|          type|
+--------------+
| large_airport|
|medium_airport|
| small_airport|
+--------------+



In [30]:
# view dataframe
spark_to_pandas(df_airport.take(5))

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,07FA,small_airport,Ocean Reef Club Airport,8,,US,US-FL,Key Largo,07FA,OCA,07FA,"-80.274803161621, 25.325399398804"
1,0AK,small_airport,Pilot Station Airport,305,,US,US-AK,Pilot Station,,PQS,0AK,"-162.899994, 61.934601"
2,0CO2,small_airport,Crested Butte Airpark,8980,,US,US-CO,Crested Butte,0CO2,CSE,0CO2,"-106.928341, 38.851918"
3,0TE7,small_airport,LBJ Ranch Airport,1515,,US,US-TX,Johnson City,0TE7,JCY,0TE7,"-98.62249755859999, 30.251800537100003"
4,13MA,small_airport,Metropolitan Airport,418,,US,US-MA,Palmer,13MA,PMX,13MA,"-72.31140136719999, 42.223300933800004"


In [31]:
# select relevant columns to the project
df_airport = df_airport['type', 'name', 'elevation_ft', 'municipality', 'iata_code']

In [32]:
# fix data types
df_airport=df_airport\
.withColumn("elevation_ft", col("elevation_ft").cast("float"))
df_airport.printSchema()

root
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: float (nullable = true)
 |-- municipality: string (nullable = true)
 |-- iata_code: string (nullable = true)



In [33]:
# rename columns
df_airport = df_airport.withColumnRenamed("name", "airport_name") \
                       .withColumnRenamed("municipality", "city") \
                       .withColumnRenamed("iata_code", "airport_code") 

In [34]:
# drop null values (assign back to df_airport so the drop takes effect)
df_airport = df_airport.dropna(subset=['airport_name', 'elevation_ft', 'city','airport_code'])

In [35]:
# drop duplicated rows
df_airport = df_airport.dropDuplicates()

In [36]:
#  View new dataframe
print(df_airport.count())
df_airport.show(5)

1865
+--------------+--------------------+------------+-------------+------------+
|          type|        airport_name|elevation_ft|         city|airport_code|
+--------------+--------------------+------------+-------------+------------+
| small_airport|Pilot Station Air...|       305.0|Pilot Station|         PQS|
| small_airport|       Lampson Field|      1379.0|     Lakeport|         CKE|
|medium_airport|Alamogordo White ...|      4200.0|   Alamogordo|         ALM|
| large_airport|Baltimore/Washing...|       146.0|    Baltimore|         BWI|
| small_airport|Clinton Sherman A...|      1922.0|      Clinton|         CSM|
+--------------+--------------------+------------+-------------+------------+
only showing top 5 rows



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Mapping out the conceptual data model and explaining it.

The data model is a __star schema__ with a fact table (tourist_arrivals) and two dimension tables (immigration and airports) that can be joined onto it. Possible JOINs are indicated with >> other_table.column_name.

This schema was chosen because it is simple to build, visualize and query for the data analytics the car rental company needs.


__Fact Table__  

tourist_arrivals:
- year
- month
- airport_code
- admission_number
- travel_reason
- stay_duration

__Dimension Tables__  

immigration:
- year
- month
- admission_number >> tourist_arrivals.admission_number
- origin_country
- airport_code
- state_code
- travel_reason
- birth_year
- gender
- stay_duration


airports:
- airport_name
- airport_code >> tourist_arrivals.airport_code
- type
- city
- elevation_ft
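To make the join path concrete, here is a toy version of the fact table and the airports dimension using Python's built-in `sqlite3` module instead of Spark. All rows are invented for illustration (including the airport name and the code `ZZZ`); only the table and column names come from the model described here.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE tourist_arrivals (
        year INTEGER, month INTEGER, admission_number INTEGER,
        airport_code TEXT, travel_reason TEXT, stay_duration INTEGER
    );
    CREATE TABLE airports (
        airport_name TEXT, airport_code TEXT, type TEXT,
        city TEXT, elevation_ft REAL
    );
""")

# Invented toy rows, not real I94 records.
conn.executemany(
    "INSERT INTO tourist_arrivals VALUES (?, ?, ?, ?, ?, ?)",
    [
        (2016, 6, 1, "BWI", "Business", 4),
        (2016, 6, 2, "BWI", "Business", 8),
        (2016, 6, 3, "BWI", "Pleasure", 10),
        (2016, 6, 4, "ZZZ", "Pleasure", 7),  # code with no airport row
    ],
)
conn.execute(
    "INSERT INTO airports VALUES (?, ?, ?, ?, ?)",
    ("Toy International", "BWI", "large_airport", "Baltimore", 146.0),
)

# Join the fact table to the airports dimension on airport_code.
rows = conn.execute("""
    SELECT a.city,
           t.travel_reason,
           COUNT(t.admission_number) AS number_arrivals,
           AVG(t.stay_duration) AS avg_day_stay
    FROM tourist_arrivals t
    JOIN airports a ON t.airport_code = a.airport_code
    GROUP BY a.city, t.travel_reason
    ORDER BY a.city, t.travel_reason
""").fetchall()

for row in rows:
    print(row)
# ('Baltimore', 'Business', 2, 6.0)
# ('Baltimore', 'Pleasure', 1, 10.0)
```

Note that the inner join silently drops the arrival with code `ZZZ`, because no airport row matches it. The same applies to any arrival whose code is missing from the airports table.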

#### 3.2 Mapping Out Data Pipelines
Steps necessary to pipeline the data into the chosen data model:
- Clean the data: nulls, data types, duplicates, irrelevant columns, etc.
- Create the immigration and airports dimension tables from df_immi and df_airport, respectively
- Create the tourist_arrivals fact table from df_immi
- Save the processed dimension and fact tables as Parquet files for querying

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [37]:
# Create immigration dimension table 
immigration = df_immi.select("year",
                             "month",
                             "admission_number",
                             "origin_country",
                             "airport_code",
                             "state_code",
                             "travel_reason",
                             "birth_year",
                             "gender",
                             "stay_duration"
                             ).drop_duplicates()

In [38]:
# View created table
spark_to_pandas(immigration.take(5))

Unnamed: 0,year,month,admission_number,origin_country,airport_code,state_code,travel_reason,birth_year,gender,stay_duration
0,2016,6,61300969733,BELGIUM,NYC,NY,Business,1968,M,8
1,2016,6,61337302733,BELGIUM,NYC,NY,Business,1964,F,14
2,2016,6,97925165330,BELGIUM,SFR,CA,Business,1979,M,7
3,2016,6,97931772730,BULGARIA,CIN,KY,Pleasure,1961,F,85
4,2016,6,97954536230,POLAND,CHI,IL,Pleasure,1951,M,6


In [40]:
# Create airport dimension table 
airports = df_airport.select("airport_name",
                             "airport_code",
                             "type",
                             "city",
                             "elevation_ft",
                             ).drop_duplicates()

In [41]:
# View created table
spark_to_pandas(airports.take(5))

Unnamed: 0,airport_name,airport_code,type,city,elevation_ft
0,Pilot Station Airport,PQS,small_airport,Pilot Station,305.0
1,Lampson Field,CKE,small_airport,Lakeport,1379.0
2,Alamogordo White Sands Regional Airport,ALM,medium_airport,Alamogordo,4200.0
3,Baltimore/Washington International Thurgood Ma...,BWI,large_airport,Baltimore,146.0
4,Clinton Sherman Airport,CSM,small_airport,Clinton,1922.0


In [42]:
# create tourist_arrivals fact table
tourist_arrivals = df_immi.select("year", 
                                  "month",
                                  "admission_number",
                                  "airport_code", 
                                  "travel_reason",
                                  "stay_duration").drop_duplicates()

In [44]:
# View fact table created
spark_to_pandas(tourist_arrivals.take(5))

Unnamed: 0,year,month,admission_number,airport_code,travel_reason,stay_duration
0,2016,6,61336552933,NYC,Pleasure,6
1,2016,6,97961067530,ORL,Pleasure,30
2,2016,6,98013697030,BOS,Business,8
3,2016,6,86913554930,SFR,Pleasure,59
4,2016,6,61439980333,NYC,Pleasure,4


In [47]:
# Create temporary views in the Spark Session in order to query them using simple SQL
immigration.createOrReplaceTempView("immigration")
airports.createOrReplaceTempView("airports")
tourist_arrivals.createOrReplaceTempView("tourist_arrivals")

# Disable automatic broadcast joins (-1 is the documented value for turning them off).
# This setting controls the join strategy, not a time limit.
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")

In [53]:
# Write fact and dimension tables to parquet
immigration.write.parquet("immigration")
airports.write.parquet("airports_parquet")
tourist_arrivals.write.parquet("tourist_arrivals")

#### 4.2 Data Quality Checks
 
Run Quality Checks

In [48]:
# Perform quality checks to see if tables were created

def check_table(df):
    return df is not None

if check_table(immigration) and check_table(airports) and check_table(tourist_arrivals):
    print("data quality check passed")
    print("dimension tables and fact table exist")
    print()
else:
    print("data quality check failed")
    print("table missing...")

data quality check passed
dimension tables and fact table exist



In [49]:
# Perform quality checks to see if tables contain records
def table_records(df):
    return df.count() != 0 

if table_records(immigration) and table_records(airports) and table_records(tourist_arrivals):
    print("data quality check passed!")
    print("dimension tables and fact table contain records")
    print()
else:
    print("data quality check failed!")
    print("null records...")

data quality check passed!
dimension tables and fact table contain records
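The two checks above report a single pass or fail for all tables together. A possible variation, sketched below, names the tables that fail. `find_empty_tables` and the `FakeTable` stand-in are hypothetical; the function relies only on a `.count()` method, so Spark DataFrames should work in place of the stand-in.

```python
def find_empty_tables(tables):
    """Return the names of tables that are missing (None) or have no rows.

    `tables` maps a table name to an object with a .count() method,
    such as a Spark DataFrame.
    """
    failed = []
    for name, table in tables.items():
        if table is None or table.count() == 0:
            failed.append(name)
    return failed


class FakeTable:
    """Stand-in for a DataFrame so the sketch runs without Spark."""

    def __init__(self, n_rows):
        self.n_rows = n_rows

    def count(self):
        return self.n_rows


result = find_empty_tables({
    "immigration": FakeTable(3),
    "airports": FakeTable(0),
    "tourist_arrivals": None,
})
print(result)  # ['airports', 'tourist_arrivals']
```

With the notebook's tables, the call would look like `find_empty_tables({"immigration": immigration, "airports": airports, "tourist_arrivals": tourist_arrivals})`, and an empty list would mean every table has records.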



#### 4.3 Data dictionary 


Immigration table

- __year:__ integer (nullable = true) > Year of arrival
- __month:__ integer (nullable = true) > Month of arrival
- __admission_number:__ long (nullable = true) > Admission number assigned to the tourist at the airport
- __origin_country:__ string (nullable = true) > Tourist's country of origin
- __airport_code:__ string (nullable = true) > Code of the port where the tourist entered
- __state_code:__ string (nullable = true) > Abbreviated state code
- __travel_reason:__ string (nullable = true) > Reason for travel given by the tourist
- __birth_year:__ integer (nullable = true) > Tourist's year of birth
- __gender:__ string (nullable = true) > Tourist's gender
- __stay_duration:__ integer (nullable = true) > Number of days the tourist stayed

Airport table

- __airport_name:__ string (nullable = true) > Name of the airport 
- __airport_code:__ string (nullable = true) > City port code
- __type:__ string (nullable = true) > Type of airport (small, medium or large)
- __city:__ string (nullable = true) > City name where the airport is located
- __elevation_ft:__ float (nullable = true) > Elevation in feet of the airport

Tourist_Arrivals table

- __year:__ integer (nullable = true) > Year of arrival
- __month:__ integer (nullable = true) > Month of arrival
- __admission_number:__ long (nullable = true) > Admission number assigned to the tourist at the airport
- __airport_code:__ string (nullable = true) > Code of the port where the tourist entered
- __travel_reason:__ string (nullable = true) > Reason for travel given by the tourist
- __stay_duration:__ integer (nullable = true) > Number of days the tourist stayed

#### 4.4 Query examples from Data Model

In [51]:
# This query counts arrivals and calculates their average stay duration,
# grouped by airport and travel reason, using only the fact table
count_air_arrivals = spark.sql("""
    SELECT airport_code,
           travel_reason,
           COUNT(admission_number) AS number_arrivals,
           CAST(AVG(stay_duration) AS DECIMAL(10,2)) AS avg_day_stay
    FROM tourist_arrivals
    GROUP BY airport_code, travel_reason
    ORDER BY airport_code
    LIMIT 20
""")

In [52]:
# View query results as a dataframe
spark_to_pandas(count_air_arrivals.take(5))

Unnamed: 0,airport_code,travel_reason,number_arrivals,avg_day_stay
0,5T6,Pleasure,10,19.8
1,5T6,Business,2,2.0
2,ABQ,Business,1,72.0
3,ACY,Business,4,3.25
4,ACY,Pleasure,1,1.0


In [53]:
# This query counts arrivals and calculates their average stay duration, grouped by city,
# origin country and travel reason, using the fact and dimension tables
count_air_arrivals_origin_country = spark.sql("""
    SELECT a.city,
           i.origin_country,
           t.travel_reason,
           COUNT(t.admission_number) AS number_arrivals,
           CAST(AVG(t.stay_duration) AS DECIMAL(10,2)) AS avg_day_stay
    FROM tourist_arrivals t
    JOIN airports a ON t.airport_code = a.airport_code
    JOIN immigration i ON t.admission_number = i.admission_number
    GROUP BY a.city, i.origin_country, t.travel_reason
    ORDER BY a.city
    LIMIT 10
""")

In [54]:
spark_to_pandas(count_air_arrivals_origin_country.take(5))

Unnamed: 0,city,origin_country,travel_reason,number_arrivals,avg_day_stay
0,Akron,BAHAMAS,Pleasure,1,6.0
1,Albany,AUSTRIA,Pleasure,2,4.0
2,Albany,IRELAND,Pleasure,1,10.0
3,Albuquerque,FRANCE,Business,1,72.0
4,Alexandria Bay,FRANCE,Pleasure,2,4.0


### Step 5: Complete Project Write Up

1. For this project I used Apache Spark to read, transform and load the data so that the car rental company can analyze it. The reasons were the small amount of data, the size of the data model (a star schema with 3 tables) and the speed of Spark.
2. The data should be updated annually, as soon as the source organizations publish it, since tourist behavior can change differently from one destination (city) to another.
3. Under the following scenarios, I would approach the problem differently:
- __If the data were increased by 100x__, Apache Hadoop could be used to build a distributed processing system for faster processing.
- __If the data had to populate a dashboard that must be updated daily by 7am__, Apache Airflow could be used to schedule a distributed update of all tables with data streamed from the source.
- __If the database needed to be accessed by 100+ people__, the project could be hosted on a production-scale data warehouse in the cloud, for example on Amazon Web Services (AWS), with greater capacity to serve many users and workload management to ensure equitable use of resources across users.