# Data Pipeline / Data Warehouse for International Tourism
### Data Engineering Capstone Project
### Matthew Prout

#### Project Summary
The purpose of this project is to use the skills I have learned throughout this program to combine data sources into a data warehouse that can be used for further analysis.  For my project, I am using the data sets from the Udacity-provided project and adding another data source for additional insights.

### 1. Scope the Project and Gather Data

#### Scope 
The purpose of this project is to load data into a Redshift analytical database so that it can be used to analyze international tourism.  The data warehouse will support queries used to uncover factors that affect international tourism, such as weather (in the US and abroad), how the international character of cities and states (for example, their foreign-born population) relates to the tourism they draw, and whether oil prices affect international tourism.  

This will be accomplished in two steps.  The first step will be to clean and explore the data using Apache Spark.  The cleaned data sets will be stored in Parquet files in S3.  The second step will be to run an ETL script which loads the Parquet files from S3 into Redshift.

#### Describe and Gather Data 

Here is a description of the data sources that I am using in the project:

| Data Set | Source |Notes |
| ------ | ------ | ------ |
| I94 Immigration Data | [US National Tourism and Trade Office](https://travel.trade.gov/research/reports/i94/historical/2016.html) | A data dictionary, I94_SAS_Labels_Descriptions.SAS, describes the contents of this data set. The data set is in a SAS format. |
| World Temperature Data | [Kaggle](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data) | There are several data sets for different geographic areas. I am using the one for states. The data file is a time series .csv file. |
| U.S. City Demographic Data | [OpenDataSoft](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/) | Contains demographic information for cities, such as population, median age, the count for each race, and the population of each sex. The data file is a .csv file. |
| Brent Oil Prices | [Kaggle](https://www.kaggle.com/mabusalah/brent-oil-prices) | Lists the price of Brent oil over time. The file is a .csv file. |

#### Imports and Spark Setup

In [1]:
import configparser
import os
from datetime import datetime, timedelta

In [2]:
from pyspark.sql import SparkSession, types as T
from pyspark.sql.functions import col, isnan, when, count, udf
from pyspark.sql.types import IntegerType

In [3]:
from sql_queries import visit_key, demographics_key, oilprice_key, tempbystate_key

In [4]:
config = configparser.ConfigParser()
config.read('dwh.cfg')

# AWS credentials
os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

# S3 bucket
s3_bucket = config.get('S3', 'S3A_BUCKET')

In [5]:
# Create a Spark session with the hadoop-aws package so that Spark can access S3
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
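
The `dwh.cfg` file read above is not shown in this notebook. As a minimal sketch, the layout implied by the lookups would look roughly like the following. Only the section and key names come from the code above; the values, including the `s3a://` scheme of the bucket path, are placeholder assumptions.

```python
import configparser

# Hypothetical contents of dwh.cfg; only the section and key names come from
# the notebook code, the values are placeholders.
EXAMPLE_CFG = """
[AWS]
AWS_ACCESS_KEY_ID = YOUR_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY = YOUR_SECRET_ACCESS_KEY

[S3]
S3A_BUCKET = s3a://your-bucket-name/
"""

example_config = configparser.ConfigParser()
example_config.read_string(EXAMPLE_CFG)

# The same lookups that the notebook performs on the real file
example_bucket = example_config.get('S3', 'S3A_BUCKET')
example_key_id = example_config['AWS']['AWS_ACCESS_KEY_ID']
```

Keeping credentials in a separate file like this keeps them out of the notebook itself; the real file should not be committed to version control.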

### 2. Explore and Assess the Data

**1. Load the I-94 immigration data**

In [6]:
# Read the data
i94_df = spark.read.parquet("sas_data")

# Select the columns of interest
i94_df = i94_df.select(['cicid', 'i94cit', 'i94res', 'i94port', 'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94visa'])

# Rename the columns
i94_df = i94_df.withColumnRenamed("cicid","id") \
.withColumnRenamed("i94cit","citizen_code") \
.withColumnRenamed("i94res","resident_code") \
.withColumnRenamed("i94port","port") \
.withColumnRenamed("arrdate","arrival_date_days") \
.withColumnRenamed("i94mode","mode") \
.withColumnRenamed("i94addr","destination_state") \
.withColumnRenamed("depdate","departure_date_days") \
.withColumnRenamed("i94visa","visa")

In [7]:
# View the schema
i94_df.printSchema()

root
 |-- id: double (nullable = true)
 |-- citizen_code: double (nullable = true)
 |-- resident_code: double (nullable = true)
 |-- port: string (nullable = true)
 |-- arrival_date_days: double (nullable = true)
 |-- mode: double (nullable = true)
 |-- destination_state: string (nullable = true)
 |-- departure_date_days: double (nullable = true)
 |-- visa: double (nullable = true)



In [8]:
# View the data
i94_df.head()

Row(id=5748517.0, citizen_code=245.0, resident_code=438.0, port='LOS', arrival_date_days=20574.0, mode=1.0, destination_state='CA', departure_date_days=20582.0, visa=1.0)

Notice that the arrival_date_days and departure_date_days fields are doubles.
* These values represent the days since January 1, 1960 and need to be converted to dates

Notice that the port is a code.
* The port needs to be converted to a port name and state.

Notice that citizen_code and resident_code are numbers.
* These need to be converted to country names.

Notice that id, mode, and visa are doubles instead of integers.
* Convert these types to integers.
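
The date conversion noted above can be checked outside Spark. This is a small plain-Python sketch (the helper name `sas_days_to_date` is illustrative and not part of the notebook) that applies the January 1, 1960 offset to the two day counts from the sample row.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS dates count days from this day

def sas_days_to_date(days):
    """Convert a SAS day count (possibly a float or None) to a date."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))

# Values taken from the sample row above
arrival = sas_days_to_date(20574.0)    # 2016-04-30
departure = sas_days_to_date(20582.0)  # 2016-05-08
```

These results agree with the arrival and departure dates that appear for the same row after cleaning.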

**Explore the I-94 immigration data**

In [9]:
# Number of rows
i94_df.count()

3096313

Observation: Over three million rows in the sample data

In [10]:
# Check for null values
i94_df.select([count(when(col(c).isNull(), c)).alias(c) for c in i94_df.columns]).show()

+---+------------+-------------+----+-----------------+----+-----------------+-------------------+----+
| id|citizen_code|resident_code|port|arrival_date_days|mode|destination_state|departure_date_days|visa|
+---+------------+-------------+----+-----------------+----+-----------------+-------------------+----+
|  0|           0|            0|   0|                0| 239|           152592|             142457|   0|
+---+------------+-------------+----+-----------------+----+-----------------+-------------------+----+



Observation: There are missing 'mode', 'destination_state', and 'departure_date_days' fields.
* The rows where 'mode' is null should be removed, as there are so few, but leave the 'destination_state', and 'departure_date_days' fields alone.

**2. Load the demographic data**

In [5]:
import pyspark.sql.functions as F
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Date

In [12]:
demographicSchema = R([
    Fld("City",Str()),
    Fld("State",Str()),
    Fld("MedianAge",Dbl()),
    Fld("Male Population",Int()),
    Fld("Female Population",Int()),
    Fld("Total Population",Int()),
    Fld("Number of Veterans",Int()),
    Fld("Foreign-born",Int()),
    Fld("Average Household Size",Dbl()),
    Fld("State Code",Str()),
    Fld("Race",Str()),
    Fld("Count",Int())
])

In [13]:
# Load and rename the columns
demographics_df = spark.read.csv('data/us-cities-demographics.csv', sep=";", schema=demographicSchema, header=True)
demographics_df = demographics_df.select(col('City').alias('city'),col('State Code').alias('state'),col('MedianAge').alias('median_age'),col('Total Population').alias('total_population'),col('Foreign-born').alias('foreign_born'),col('Race').alias('race'))

Look at one city:

In [14]:
demographics_df.where(demographics_df.city == 'Silver Spring').show()

+-------------+-----+----------+----------------+------------+--------------------+
|         city|state|median_age|total_population|foreign_born|                race|
+-------------+-----+----------+----------------+------------+--------------------+
|Silver Spring|   MD|      33.8|           82463|       30908|  Hispanic or Latino|
|Silver Spring|   MD|      33.8|           82463|       30908|               White|
|Silver Spring|   MD|      33.8|           82463|       30908|Black or African-...|
|Silver Spring|   MD|      33.8|           82463|       30908|American Indian a...|
|Silver Spring|   MD|      33.8|           82463|       30908|               Asian|
+-------------+-----+----------+----------------+------------+--------------------+



Notice that there are 5 rows for each city (one for each race).  In this case, I only want one row, and I do not need the 'race' column.
* Therefore, filter out the rows by choosing one race.

It would be helpful to have a column for the foreign born percent of the population.
* This can be created as a calculated column.
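
As a quick worked example of that calculated column, here is the arithmetic for Silver Spring using the figures shown above. The helper name is illustrative; note that the result is a fraction between 0 and 1 rather than a value between 0 and 100.

```python
def foreign_born_fraction(foreign_born, total_population):
    """Return the foreign-born share of the population as a fraction (0 to 1)."""
    if foreign_born is None or not total_population:
        return None
    return foreign_born / total_population

# Silver Spring, MD, using the figures shown in the table above
silver_spring = foreign_born_fraction(30908, 82463)  # roughly 0.3748
```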

**Explore the demographic data**

In [15]:
# Number of rows for one race
demographics_df.where(demographics_df.race == 'Hispanic or Latino').count()

596

In [16]:
# Check for null values
demographics_df.select([count(when(col(c).isNull(), c)).alias(c) for c in demographics_df.columns]).show()

+----+-----+----------+----------------+------------+----+
|city|state|median_age|total_population|foreign_born|race|
+----+-----+----------+----------------+------------+----+
|   0|    0|         0|               0|          13|   0|
+----+-----+----------+----------------+------------+----+



Observation: There are 13 cities where 'foreign_born' is null.
* Just remove these rows.

**3. Load the temperature data**

In [17]:
temperatureSchema = R([
    Fld("dt",Date()),
    Fld("AverageTemperature",Dbl()),
    Fld("AverageTemperatureUncertainty",Dbl()),
    Fld("State",Str()),
    Fld("Country",Str())
])

In [18]:
temperatures_df = spark.read.csv('data/GlobalLandTemperaturesByState.csv', schema=temperatureSchema, header=True)

In [19]:
# Select the columns of interest
temperatures_df = temperatures_df.select(['dt', 'AverageTemperature', 'State', 'Country'])

# Rename the columns
temperatures_df = temperatures_df.withColumnRenamed("dt","date") \
.withColumnRenamed("AverageTemperature","average_temperature") \
.withColumnRenamed("State","state") \
.withColumnRenamed("Country","country")

In [20]:
# View the data
temperatures_df.head()

Row(date=datetime.date(1855, 5, 1), average_temperature=25.544, state='Acre', country='Brazil')

Note that to analyze US states, 'state' must be converted to a two-letter abbreviation in order to join with the other data sets.
* When running analytical queries in Redshift, convert US state names to their abbreviations by joining with the StateCodes table.

**Explore the temperature data**

In [21]:
temperatures_df.count()

645675

In [22]:
# Check for null values
temperatures_df.select([count(when(col(c).isNull(), c)).alias(c) for c in temperatures_df.columns]).show()

+----+-------------------+-----+-------+
|date|average_temperature|state|country|
+----+-------------------+-----+-------+
|   0|              25648|    0|      0|
+----+-------------------+-----+-------+



Many temperature measurements are null.  Many of the null values are due to missing measurements for dates in the 18th and 19th centuries.  These early measurements can be ignored in favor of more recent temperature data, which is more complete.

In [23]:
# Take a look at the date range:
temperatures_df.agg(F.min('date'), F.max('date')).show()

+----------+----------+
| min(date)| max(date)|
+----------+----------+
|1743-11-01|2013-09-01|
+----------+----------+



Note that the measurements for 2013 are not complete. Use a recent range of dates starting in the year 2000.
* Keep the measurements dated from January 2000 through the last full month in 2013.

**4. Load the oil price data**

In [24]:
oil_df = spark.read.csv('data/BrentOilPrices.csv', header=True)
oil_df = oil_df.withColumnRenamed("Date","date").withColumnRenamed("Price","price")

Note that the data has two formats for the date, so create two datasets with different date formats, and then combine.  
This will be done here instead of in the cleaning section, as we also want to explore the data here.
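
The same "try each format" idea can be sketched in plain Python, which may help when spot-checking individual values. The `strptime` patterns below are my assumed equivalents of the two Spark patterns (`dd-MMM-yy` and `MMM dd, yyyy`), and the input strings are illustrative rather than copied from the file.

```python
from datetime import datetime

# Assumed strptime equivalents of 'dd-MMM-yy' and 'MMM dd, yyyy'
DATE_FORMATS = ("%d-%b-%y", "%b %d, %Y")

def parse_oil_date(text):
    """Try each known format in turn; return None if none matches."""
    for fmt in DATE_FORMATS:
        try:
            return datetime.strptime(text, fmt).date()
        except ValueError:
            continue
    return None

# Illustrative inputs (assumed, not copied from the data file)
first = parse_oil_date("20-May-87")
second = parse_oil_date("Apr 22, 2020")
unknown = parse_oil_date("2020/04/22")
```

A value that matches neither format comes back as `None`, which mirrors the null dates that the Spark version filters out.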

In [25]:
# Part of the data has the date format as: dd-MMM-yy
oil_df1 = oil_df.select(F.to_date(col('date'), 'dd-MMM-yy').alias('date'), col('price'))

# The remaining data has the date format as: MMM dd, yyyy
oil_df2 = oil_df.select(F.to_date(col('date'), 'MMM dd, yyyy').alias('date'), col('price'))

In [26]:
# Remove rows that are null due to the wrong date format
oil_df1 = oil_df1.filter(oil_df1.date.isNotNull())
oil_df2 = oil_df2.filter(oil_df2.date.isNotNull())

In [27]:
# Combine the datasets
oil_df = oil_df1.union(oil_df2)

In [28]:
# View the data
oil_df.head()

Row(date=datetime.date(1987, 5, 20), price='18.63')

**Explore the oil data**

In [29]:
oil_df.count()

8360

In [30]:
# Check for null values
oil_df.select([count(when(col(c).isNull(), c)).alias(c) for c in oil_df.columns]).show()

+----+-----+
|date|price|
+----+-----+
|   0|    0|
+----+-----+



### Cleaning Steps

Load lookup tables

In [31]:
country_codes_df = spark.read.csv('data/CountryCodes.csv', header=True).withColumn("code",col("code").cast(IntegerType()))
state_codes_df = spark.read.csv('data/StateCodes.csv', header=True)
port_codes_df = spark.read.csv('data/PortCodes.csv', header=True)

**1. Clean the immigration data**

In [32]:
# Convert SAS day counts (days since January 1, 1960) to dates using a user-defined function

def convert_datetime(x):
    try:
        start = datetime(1960, 1, 1)
        return start + timedelta(days=int(x))
    except (TypeError, ValueError, OverflowError):
        # Null, NaN, or out-of-range day counts become null dates
        return None

udf_datetime_from_sas = udf(convert_datetime, T.DateType())

# Add the converted date columns, then drop the original day-count columns
i94_df = i94_df.withColumn('arrival_date', udf_datetime_from_sas("arrival_date_days")) \
.withColumn('departure_date', udf_datetime_from_sas("departure_date_days"))

i94_df = i94_df.drop('arrival_date_days', 'departure_date_days')

# Convert 'id', 'mode', and 'visa' from double to integer types
i94_df = i94_df.withColumn("id",col("id").cast(IntegerType())).withColumn("mode",col("mode").cast(IntegerType())).withColumn("visa",col("visa").cast(IntegerType()))

# Drop the 'mode' rows that are null
i94_df = i94_df.filter(i94_df.mode.isNotNull())

In [33]:
i94_df.head(1)

[Row(id=5748517, citizen_code=245.0, resident_code=438.0, port='LOS', mode=1, destination_state='CA', visa=1, arrival_date=datetime.date(2016, 4, 30), departure_date=datetime.date(2016, 5, 8))]

In [34]:
# The following query will perform the following steps:

# 1. Convert citizen_code and resident_code to countries
# 2. Convert port to port name and port state

# Create temporary tables
i94_df.createOrReplaceTempView('i94')
country_codes_df.createOrReplaceTempView('country_codes')
port_codes_df.createOrReplaceTempView('port_codes')

# Perform the Spark SQL query
i94_df = spark.sql("""
    SELECT id, citizen, resident, port_codes.port AS port_name, port_codes.state AS port_state, mode, destination_state, visa, arrival_date, departure_date
    FROM (
        SELECT id, citizen, country_codes.country AS resident, port, mode, destination_state, visa, arrival_date, departure_date
        FROM (
            SELECT id, country_codes.country AS citizen, resident_code, port, mode, destination_state, visa, arrival_date, departure_date
            FROM i94 LEFT JOIN country_codes
            ON i94.citizen_code = country_codes.code
        ) A
        LEFT JOIN country_codes
        ON A.resident_code = country_codes.code
    ) B
    LEFT JOIN port_codes ON B.port = port_codes.code
    """)

In [35]:
# Keep only tourist visas
i94_df = i94_df.filter(i94_df.visa == 2)

In [39]:
i94_df.head(5)

[Row(id=5748522, citizen='CHINA, PRC', resident='NEW ZEALAND', port_name='HONOLULU', port_state='HI', mode=1, destination_state='HI', visa=2, arrival_date=datetime.date(2016, 4, 30), departure_date=datetime.date(2016, 5, 5)),
 Row(id=5748523, citizen='CHINA, PRC', resident='NEW ZEALAND', port_name='HONOLULU', port_state='HI', mode=1, destination_state='HI', visa=2, arrival_date=datetime.date(2016, 4, 30), departure_date=datetime.date(2016, 5, 12)),
 Row(id=5748524, citizen='CHINA, PRC', resident='NEW ZEALAND', port_name='HONOLULU', port_state='HI', mode=1, destination_state='HI', visa=2, arrival_date=datetime.date(2016, 4, 30), departure_date=datetime.date(2016, 5, 12)),
 Row(id=5748525, citizen='CHINA, PRC', resident='NEW ZEALAND', port_name='HOUSTON', port_state='TX', mode=1, destination_state='FL', visa=2, arrival_date=datetime.date(2016, 4, 30), departure_date=datetime.date(2016, 5, 7)),
 Row(id=5748526, citizen='CHINA, PRC', resident='NEW ZEALAND', port_name='LOS ANGELES', port_st

In [44]:
i94_df.write.mode('overwrite').parquet(os.path.join(s3_bucket,'i94'))

**2. Clean the demographic information**

In [45]:
# Keep the demographic information for just one race (the information is duplicated)
demographics_df = demographics_df.where(demographics_df.race == 'Hispanic or Latino')

# Filter out rows where 'foreign_born' is null
demographics_df = demographics_df.filter(demographics_df.foreign_born.isNotNull())

# Drop the race column and create a calculated column for 'foreign_born_pct'
demographics_df = demographics_df.withColumn('foreign_born_pct', col('foreign_born')/col('total_population')).drop('race', 'foreign_born')

# Add the year column (constant as all this data comes from 2015)
demographics_df = demographics_df.withColumn('year', F.lit('2015').cast(IntegerType()))

In [46]:
demographics_df.printSchema()

root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- median_age: double (nullable = true)
 |-- total_population: integer (nullable = true)
 |-- foreign_born_pct: double (nullable = true)
 |-- year: integer (nullable = true)



In [47]:
demographics_df.head(5)

[Row(city='Silver Spring', state='MD', median_age=33.8, total_population=82463, foreign_born_pct=0.3748105210821823, year=2015),
 Row(city="O'Fallon", state='MO', median_age=36.0, total_population=85032, foreign_born_pct=0.03844435036221658, year=2015),
 Row(city='Folsom', state='CA', median_age=40.9, total_population=76368, foreign_born_pct=0.17329247852503665, year=2015),
 Row(city='Wichita', state='KS', median_age=34.6, total_population=389955, foreign_born_pct=0.10326832583246785, year=2015),
 Row(city='Sparks', state='NV', median_age=36.1, total_population=96098, foreign_born_pct=0.16327082769672627, year=2015)]

In [48]:
demographics_df.write.mode('overwrite').parquet(os.path.join(s3_bucket,'demographics'))

**3. Clean the temperature information**

In [49]:
# Get temperatures from 2000 onward, up to the last complete month
temperatures_2000_2013_df = temperatures_df.filter( (temperatures_df.date >= F.to_date(F.lit('2000-01-01')).cast(T.DateType())) &\
                                                    (temperatures_df.date < F.to_date(F.lit('2013-09-01')).cast(T.DateType())) )

In [50]:
# Check for null values
temperatures_2000_2013_df.select([count(when(col(c).isNull(), c)).alias(c) for c in temperatures_2000_2013_df.columns]).show()

+----+-------------------+-----+-------+
|date|average_temperature|state|country|
+----+-------------------+-----+-------+
|   0|                  0|    0|      0|
+----+-------------------+-----+-------+



There are no null values in the most recent complete range of years.

Get the average monthly temperature for each state. This requires aggregating by month for each state.

In [51]:
# Create year and month fields to aggregate on
temperatures_2000_2013_df = temperatures_2000_2013_df.withColumn('year', F.year('date')).withColumn('month', F.month('date'))

# Perform the aggregation
temperatures_2000_2013_df = temperatures_2000_2013_df.groupBy('year', 'month', 'country', 'state').agg(F.avg('average_temperature').alias('monthly_average_temp'))

In [52]:
temperatures_2000_2013_df.head(5)

[Row(year=2011, month=4, country='Brazil', state='Acre', monthly_average_temp=26.264),
 Row(year=2000, month=4, country='Russia', state='Adygey', monthly_average_temp=13.386),
 Row(year=2002, month=11, country='Brazil', state='Alagoas', monthly_average_temp=27.194000000000003),
 Row(year=2000, month=12, country='Brazil', state='Amazonas', monthly_average_temp=26.776999999999997),
 Row(year=2000, month=3, country='Russia', state='Amur', monthly_average_temp=-13.56)]

In [53]:
temperatures_2000_2013_df.write.mode('overwrite').parquet(os.path.join(s3_bucket,'temperatures'))

**4. Clean the oil information**

Get the average monthly oil price. This requires aggregating by month.
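
Independently of Spark, this aggregation amounts to grouping the daily prices by (year, month) and averaging each group. Here is a plain-Python sketch of that logic with made-up prices. Because the CSV was read without a schema, the price values are strings (as the earlier `head()` output shows), so the sketch converts them explicitly.

```python
from collections import defaultdict
from datetime import date

def monthly_average(rows):
    """Average daily prices per (year, month).

    `rows` is an iterable of (date, price) pairs; price may be a string,
    as it is when the CSV is read without a schema.
    """
    buckets = defaultdict(list)
    for day, price in rows:
        buckets[(day.year, day.month)].append(float(price))
    return {key: sum(values) / len(values) for key, values in buckets.items()}

# Made-up prices, for illustration only
sample = [
    (date(2001, 1, 2), "18.00"),
    (date(2001, 1, 3), "19.00"),
    (date(2001, 2, 1), "20.00"),
]
averages = monthly_average(sample)
```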

In [54]:
# Create year and month fields to aggregate on
oil_df = oil_df.withColumn('year', F.year('date')).withColumn('month', F.month('date'))

# Perform the aggregation
oil_df = oil_df.groupBy('year', 'month').agg(F.avg('price').alias('monthly_average_price'))

In [55]:
oil_df.head(5)

[Row(year=1990, month=7, monthly_average_price=17.16909090909091),
 Row(year=1997, month=11, monthly_average_price=19.174210526315786),
 Row(year=1987, month=10, monthly_average_price=18.75772727272728),
 Row(year=1998, month=2, monthly_average_price=14.0695),
 Row(year=1995, month=12, monthly_average_price=17.92526315789474)]

In [56]:
oil_df.write.mode('overwrite').parquet(os.path.join(s3_bucket,'oilprices'))

### 3. Define the Data Model

#### 3.1 Conceptual Data Model

The data model is a star schema.  Here is a diagram of the data model:

![Schema](./images/Schema.png)

The star schema was chosen as it is an efficient design for performing OLAP queries (such as joins and aggregations), which is the purpose of this particular data warehouse.

**Fact Table**

The fact table `visit` is the I-94 immigration data, which consists of a large number of travel events (in the millions) from international travelers to the United States.  Of interest are which country the visitor is arriving from, where they arrive, their mode of transportation, and when they depart.  The visa field indicates whether the travel was for business or pleasure, and this field is used to keep only travelers visiting for pleasure (tourism).

**Dimension Tables**

A `datetime` table is a dimension table for the dates used by the `visit` table.  

A `tempbystate` table is a dimension table for the average monthly temperature of states in different countries.  This can be used to explore how temperature at the source and destination locations correlates with travel.  

`demographics` is a dimension table for the demographics for cities in the United States.  In this case, I am keeping the total population, median age, and the total foreign born percent.  This can be used to explore the relationship between demographics and international tourism.  Note that this information is at the city level, and needs to be rolled up to the state level for the analysis, since visitor information is at the state level.  Also note that the data is for the year 2015.  If this table is used in the analysis, it should only be used in queries where the date is within a few years from 2015.
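
One possible way to do the roll-up from city to state is sketched below in plain Python. It assumes that populations are summed and that the foreign-born share is weighted by population; the function name, the `XX` state code, and the numbers are hypothetical. A state-level median age cannot be derived exactly from city medians, so it is left out.

```python
from collections import defaultdict

def roll_up_to_state(city_rows):
    """Aggregate city rows to one row per state.

    Each input row is (state, total_population, foreign_born_pct), where
    foreign_born_pct is a fraction. Populations are summed and the
    foreign-born share is weighted by population.
    """
    population = defaultdict(int)
    foreign_born = defaultdict(float)
    for state, total, pct in city_rows:
        population[state] += total
        foreign_born[state] += total * pct
    return {
        state: {
            "total_population": population[state],
            "foreign_born_pct": foreign_born[state] / population[state],
        }
        for state in population
    }

# Two hypothetical cities in the same state, with round numbers
states = roll_up_to_state([("XX", 100000, 0.10), ("XX", 300000, 0.20)])
```

Keep in mind that such totals only cover the cities present in the data set, not the whole state.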

`oilprice` is a dimension table for Brent oil prices in dollars.  This can be used to explore the relationship between oil prices and international travel.  For instance, when oil prices are low, this may influence the cost of travel, which may also influence international travel.

#### 3.2 Mapping Out Data Pipelines

**Simplified Pipeline for the Capstone Project**

The following steps are necessary to extract, transform, and load (ETL) the data into the data warehouse:
1. Spark is used to load and transform the data into the correct schema.  The data is stored in Parquet format in S3.
2. The tables (staging and regular tables) in the data warehouse are created. This should only have to be done once in production.
3. The Redshift script to copy the Parquet files into the staging tables is executed, followed by the script to insert the data from the staging tables into the fact and dimension tables.
4. Finally, data quality checks are run to verify the pipeline.
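
The ordering of these steps can be sketched as a small runner that executes each step in turn and stops at the first failure. The step names and the lambdas are hypothetical stand-ins; in this notebook the real work is done by the Spark cells and by the functions imported from `create_tables`, `etl`, and `quality_checks` in section 4.

```python
def run_pipeline(steps):
    """Run (name, callable) steps in order, stopping at the first failure."""
    completed = []
    for name, step in steps:
        step()  # any exception propagates and halts the pipeline
        completed.append(name)
    return completed

# Hypothetical stand-ins for the real pipeline steps
log = []
steps = [
    ("spark_to_parquet", lambda: log.append("parquet written")),
    ("create_tables", lambda: log.append("tables created")),
    ("load_and_insert", lambda: log.append("tables updated")),
    ("quality_checks", lambda: log.append("checks passed")),
]
completed = run_pipeline(steps)
```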

Here is a diagram of the pipeline for my project:

![Simplified Pipeline](images/CapstonePipelineSimple.png)

These preconditions must be true before running the pipeline:
* An S3 bucket has been configured to write the Parquet files into.
* A Redshift cluster has been created.

**Production Pipeline for the Capstone Project**

In a production environment several changes are necessary for the pipeline, as the Udacity workspace will not be available:
1. The data files will need to reside on S3 or some other server
2. Spark will need to run on its own EMR cluster
3. The workflow will not be run by a person using a Jupyter notebook, but instead a workflow tool such as Airflow will need to orchestrate the pipeline.

Below is a diagram of a pipeline which can be used in a production environment:

![Production Pipeline](images/CapstonePipelineProduction.png)

### 4. Run Pipelines to Model the Data 

#### 4.1 Create the data model

In [6]:
from create_tables import imp_create_tables

# Create the staging and fact/dimension tables in the data warehouse.  Note: This should only be done once in production.
imp_create_tables()

#### 4.2 Load the staging tables, then update the fact and dimension tables with this data.  
Note: These steps implement a work-around for the lack of upsert in Redshift.

In [7]:
from etl import imp_update_tables

# Load data into the staging tables, and then update the fact and dimension tables from them
imp_update_tables()


COPY staging_visit
FROM 's3://dataengineering-nano-dwh-bucket/i94/'
IAM_ROLE 'arn:aws:iam::829000336786:role/dwhRole'
FORMAT AS PARQUET;

COPY staging_demographics
FROM 's3://dataengineering-nano-dwh-bucket/demographics/'
IAM_ROLE 'arn:aws:iam::829000336786:role/dwhRole'
FORMAT AS PARQUET;

COPY staging_oilprice
FROM 's3://dataengineering-nano-dwh-bucket/oilprices/'
IAM_ROLE 'arn:aws:iam::829000336786:role/dwhRole'
FORMAT AS PARQUET;

COPY staging_tempbystate
FROM 's3://dataengineering-nano-dwh-bucket/temperatures/'
IAM_ROLE 'arn:aws:iam::829000336786:role/dwhRole'
FORMAT AS PARQUET;
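
The contents of `etl.py` are not shown here, so the following is only a sketch of one common staging-table pattern for emulating an upsert: delete the target rows that the staging batch will replace, then insert everything from staging. It uses an in-memory SQLite database as a stand-in for Redshift so that it can run anywhere. The column names follow the data dictionary; the column types and the toy values are assumptions.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# Toy tables with assumed column types; the real DDL lives in create_tables.py
cur.execute("CREATE TABLE oilprice (year INT, month INT, monthly_average_price REAL)")
cur.execute("CREATE TABLE staging_oilprice (year INT, month INT, monthly_average_price REAL)")

# One existing row in the target, plus a staging batch that corrects it and adds one
cur.execute("INSERT INTO oilprice VALUES (2001, 1, 10.0)")
cur.executemany(
    "INSERT INTO staging_oilprice VALUES (?, ?, ?)",
    [(2001, 1, 11.0), (2001, 2, 12.0)],
)

# Step 1: delete target rows that the staging batch will replace
cur.execute("""
    DELETE FROM oilprice
    WHERE EXISTS (
        SELECT 1 FROM staging_oilprice s
        WHERE s.year = oilprice.year AND s.month = oilprice.month
    )
""")
# Step 2: insert everything from staging
cur.execute("INSERT INTO oilprice SELECT * FROM staging_oilprice")
conn.commit()

rows = cur.execute("SELECT * FROM oilprice ORDER BY year, month").fetchall()
conn.close()
```

Running both statements inside a single transaction keeps readers from seeing the table in its intermediate state with the old rows deleted and the new ones not yet inserted.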


#### 4.3 Data Quality Checks

There are several data quality checks added to the pipeline.  
First, where possible, NULL constraints have been added to tables in the database.  Primary keys have also been added to the `visit` and `datetime` tables to ensure uniqueness.  
Second, after data is loaded into the database, quality checks are also run to verify that the tables in the database are not empty, and also that the latest values from the staging tables were copied over.

In [8]:
from quality_checks import imp_run_checks

# Run data quality checks
imp_run_checks()

Quality check on visit table passed
Quality check on datetime table passed
Quality check on demographics table passed
Quality check on oilprice table passed
Quality check on tempbystate table passed
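
The `quality_checks` module is not shown, so this is just a minimal sketch of the kind of "table is not empty" check described above. It uses an in-memory SQLite database as a stand-in for Redshift, and the helper name and toy tables are hypothetical.

```python
import sqlite3

def table_is_not_empty(conn, table):
    """Return True if `table` holds at least one row."""
    # Table names cannot be bound as parameters, so only pass trusted names
    count = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
    return count > 0

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE visit (id INT)")
conn.execute("CREATE TABLE oilprice (year INT, month INT)")
conn.execute("INSERT INTO visit VALUES (5748522)")

results = {t: table_is_not_empty(conn, t) for t in ("visit", "oilprice")}
conn.close()
```

In this toy setup the `visit` check passes and the `oilprice` check fails, since no rows were loaded into the latter.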


#### 4.4 Data dictionary

**Visit**  
Description: This table records visits from international travelers.  
Source: US National Tourism and Trade Office  

| | |
| -- | -- |
| id | The unique ID of the visit. |
| citizen | The country where the visitor is a citizen. |
| resident | The country where the visitor resides. |
| port_name | The name of the entry port for the visitor. |
| port_state | The state where the entry port is located. |
| arrival_date | When the visitor arrived. |
| mode | The mode of transportation that the visitor took. 1 = 'Air', 2 = 'Sea', 3 = 'Land', 9 = 'Not reported' |
| destination_state | The destination state of the visitor. |
| departure_date | When the visitor departed. |
| visa | The type of visa. Types: 1 = Business, 2 = Pleasure, 3 = Student.  This has been filtered for only Pleasure (2). |


**DateTime**  
Description: This table breaks the dates found in the visit table down into their components.  
Source: From the Visit table (above)  

| | |
| -- | -- |
| event | The time of the event. |
| year | The year of the event. |
| month | The month of the event. |
| day | The day of the month of the event. |
| weekday | Whether the event was a weekday (Mon-Fri). |
| weekend | Whether the event was a weekend (Sat-Sun). | 
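
As a sketch of how one row of this table could be derived from a date (the actual SQL in the ETL script is not shown, and the function name is illustrative):

```python
from datetime import date

def datetime_row(event):
    """Build one row of the date dimension from a date."""
    is_weekend = event.weekday() >= 5  # Monday is 0, so 5 and 6 are Sat/Sun
    return {
        "event": event,
        "year": event.year,
        "month": event.month,
        "day": event.day,
        "weekday": not is_weekend,
        "weekend": is_weekend,
    }

# 2016-04-30, the arrival date in the sample rows, fell on a Saturday
row = datetime_row(date(2016, 4, 30))
```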

**Demographics**  
Description: Demographics for cities in the US.  
Source: OpenDataSoft  

| | | 
|--|--|
| year | The year that the survey was taken. |
| city | The city that the statistics describe. |
| state | The two-letter code of the state where the city is located. |
| median_age | The median age of the city's population. |
| total_population | The population of the city. |
| foreign_born_pct | The fraction (0 to 1) of the population that was foreign born. |

**TempByState**  
Description: A time series of average temperatures for each state in a country. The date range is from Jan 1, 2000 to the last complete month of measurements in 2013.  
Source: Kaggle  

| | |
| -- | -- |
| country | The country of the measurement. |
| state | The state of the measurement. |
| year | The year of the measurement. |
| month | The month of the measurement. |
| monthly_average_temp | The average temperature for that month. |


**OilPrice**  
Description: A time series of Brent oil prices.  
Source: Kaggle  

| | |
| -- | -- |
| year | The year of the measurement. |
| month | The month of the measurement. |
| monthly_average_price | The average price of oil for that month. |
