# Udacity Capstone: US Tourism Data Lake
## Data Engineering Capstone Project
This project is an exercise to practise what I have learnt in the Udacity Data Engineering program.
I chose to build a data lake from the datasets provided.

## Structure of the Project

Following the Udacity guide for this project, the structure is as shown below:

- Step 1: Scope the Project and Gather Data
- Step 2: Explore and Assess the Data
- Step 3: Define the Data Model
- Step 4: Run ETL to Model the Data
- Step 5: Complete Project Write Up


In [1]:
# Do all imports and installs here
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, regexp_replace, countDistinct, \
                                  year, month, col, mean, desc, udf, first

### Step 1: Scope the Project and Gather Data

#### Scope 
Scenario: The sales team of a tourism agency would like to plan its advertising budget based on recommendations from the analytics department. The relationship between US cities and visitor profiles is the key input for the sales team. The data analysts and data scientists set out to find:
   - The correlation between the temperature of US cities and the temperature of the visitors' countries of residence
   - The correlation between the demographics of US cities and the visitors' nationalities
   - How to categorise visitor profiles by US city
   - Whether there is a trend (weekly, monthly, quarterly)

The data engineer builds a data lake with a data model that is easy to understand and access.

#### Describe and Gather Data 

To meet these goals, the project needs information about US visitors, the demographics of US cities, and the temperatures of US cities and other countries.

#### Data Source:
   - I94 Immigration: Arrivals of non-U.S. resident visitors to the United States are registered, and anonymised records are available on the National Travel and Tourism Office website (https://www.trade.gov/i-94-arrivals-program). The immigration data contains millions of visitor records per month. This dataset is the main source for extracting visitor profiles.
   - I94 SAS LABELS DESCRIPTIONS: This file is provided with the project. It is a SAS label description file that describes the I94 Immigration dataset and gives supplementary information about the codes used in it.
   - World Temperature Data: This dataset comes from Kaggle. The data source is (https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data). The dataset includes the average temperature by country and city on a monthly basis. It is used to extract the average temperature for US cities and for the visitors' countries.
   - U.S. City Demographic Data: This data comes from OpenDataSoft. The data source is (https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/). The dataset includes population details and a count per race for each US city. It is used to extract the demographics of the cities.
   
#### Tools
The project is written in Python with the PySpark library to take advantage of Apache Spark. The solution is designed to run on Amazon Web Services and to store Apache Parquet files in AWS S3 buckets.

In [2]:
# Build a spark session
spark = SparkSession \
    .builder \
    .config("spark.jars.repositories",
            "https://repos.spark-packages.org/") \
    .config("spark.jars.packages",
            "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport() \
    .getOrCreate()

In [3]:
# Read immigration file
immigration_file = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df_immigrations = spark.read.format('com.github.saurfang.sas.spark') \
        .load(immigration_file)

In [4]:
# Read temperatures data file
temperature_file = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temperatures = spark.read.format('csv') \
                            .option("header", True) \
                            .load(temperature_file)

In [5]:
# Read cities demographic data file
cities_demographic_file = './us-cities-demographics.csv'
df_cities_demographic = spark.read.format('csv') \
    .options(header='True', delimiter=';').load(cities_demographic_file)

In [6]:
# Read i94 sas labels descriptions file
i94_sas_labels_descriptions_file = './I94_SAS_Labels_Descriptions.SAS'
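# Use a context manager so the file handle is closed after reading
with open(i94_sas_labels_descriptions_file, 'r') as labels_file:
    text_i94_sas_labels_descriptions = labels_file.read()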

### Step 2: Explore and Assess the Data
#### Explore the Data 
To identify data quality issues, these Spark functions are used:
- show(10) to review the first 10 rows.
- printSchema() to inspect the data types.
- summary().show() or describe().show() to get basic statistics for a dataset.
- countDistinct() to count unique values in specific columns.

#### Cleaning Steps
To clean the data, these Spark functions are used:
- na.drop() to drop rows that have null values in the specified columns.
- filter() to narrow the data down to the target information.
- dropDuplicates() to drop duplicate rows based on the specified columns.
- isNotNull() to filter out rows with nulls in the targeted columns.

#### TEMPERATURES DATA

In [7]:
# Exploration and cleaning
df_temperatures.show(10)
df_temperatures.printSchema()
df_temperatures.describe().show()

+----------+------------------+-----------------------------+-----+-------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-04-01|5.7879999999999985|           3.6239999999999997|Århus|Denmark|  57.05N|   10.33E|
|1744-05-01|            10.644|           1.2830000000000001|Århus|Denmark|  57.05N|   10.33E|
|1744-06-01|14.050999999999998|                   

In [8]:
# Drop rows for NA value in specific columns
df_temperatures = df_temperatures.na.drop(subset=["dt",
                                                      "AverageTemperature",
                                                      "City",
                                                      "Country"])

In [9]:
# generate year and month columns from dt column
df_temperatures = df_temperatures.withColumn("year", year(col("dt"))) \
                                 .withColumn("month", month(col("dt")))

# process to generate temperatures_us_cities
# filter 'United States' from Country Column
df_temperatures_us_cities = df_temperatures.filter(
                                            "Country == 'United States'")

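# aggregate temperature dataframe by year, month and city
# note: cast('decimal') defaults to decimal(10,0), so each reading is
# rounded to a whole number before it is averaged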
df_temperatures_us_cities = df_temperatures_us_cities \
    .groupby("year", "month", "City") \
    .agg(mean(df_temperatures_us_cities.AverageTemperature
              .cast('decimal')).alias('average_temperature')) \
    .orderBy(desc('year'), desc('month'), desc('City'))

# Show first 10 rows
df_temperatures_us_cities.show(10)

+----+-----+----------------+-------------------+
|year|month|            City|average_temperature|
+----+-----+----------------+-------------------+
|2013|    9|         Yonkers|            17.0000|
|2013|    9|       Worcester|            16.0000|
|2013|    9|   Winston Salem|            22.0000|
|2013|    9|         Windsor|            18.0000|
|2013|    9|   Wichita Falls|            26.0000|
|2013|    9|         Wichita|            24.0000|
|2013|    9|     Westminster|            19.0000|
|2013|    9|West Valley City|            20.0000|
|2013|    9|     West Jordan|            20.0000|
|2013|    9|     West Covina|            23.0000|
+----+-----+----------------+-------------------+
only showing top 10 rows
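
The whole-number averages in the output above are consistent with `cast('decimal')` using Spark's default decimal type, which has scale 0, so each reading is rounded before `mean` is applied. Below is a minimal pure-Python sketch of that effect. It uses the standard `decimal` module instead of Spark, assumes half-up rounding, and works on two made-up readings; the function names are hypothetical and not part of the project.

```python
from decimal import Decimal, ROUND_HALF_UP


def mean_after_scale0_cast(readings):
    """Round each reading to a whole number first, then average."""
    rounded = [
        Decimal(r).quantize(Decimal("1"), rounding=ROUND_HALF_UP)
        for r in readings
    ]
    return sum(rounded) / len(rounded)


def mean_at_full_precision(readings):
    """Average the readings without rounding them first."""
    values = [Decimal(r) for r in readings]
    return sum(values) / len(values)


# Made-up readings for one city and month (not taken from the dataset)
readings = ["16.4", "17.4"]
print(mean_after_scale0_cast(readings))  # 16.5
print(mean_at_full_precision(readings))  # 16.9
```

If the fractional part matters for the analysis, casting to a type that keeps it (for example `double` or `decimal(6,3)`) before aggregating would avoid this loss.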



In [10]:
# process to generate temperatures_countries
# filter out 'United States' from Country column
df_temperatures_countries = df_temperatures \
    .filter("Country != 'United States'")

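# aggregate temperature dataframe by year, month and country
# note: cast('decimal') defaults to decimal(10,0), so each reading is
# rounded to a whole number before it is averaged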
df_temperatures_countries = df_temperatures_countries \
    .groupby('year', 'month', 'Country') \
    .agg(mean(df_temperatures_countries.AverageTemperature.cast('decimal'))
         .alias('average_temperature')) \
    .orderBy(desc('year'), desc('month'), desc('Country'))

# Show first 10 rows
df_temperatures_countries.show(10)

+----+-----+------------------+-------------------+
|year|month|           Country|average_temperature|
+----+-----+------------------+-------------------+
|2013|    9|       Puerto Rico|            28.0000|
|2013|    9|         Nicaragua|            28.5000|
|2013|    9|            Mexico|            23.7526|
|2013|    9|           Jamaica|            28.6667|
|2013|    9|          Honduras|            26.8000|
|2013|    9|             Haiti|            28.2000|
|2013|    9|         Guatemala|            22.0000|
|2013|    9|       El Salvador|            26.8333|
|2013|    9|Dominican Republic|            28.2222|
|2013|    9|              Cuba|            28.5000|
+----+-----+------------------+-------------------+
only showing top 10 rows



#### CITIES DEMOGRAPHIC DATA

In [11]:
# Exploration and cleaning
df_cities_demographic.show(10)
df_cities_demographic.printSchema()
df_cities_demographic.summary().show()

+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|            City|         State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|   Silver Spring|      Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino|25924|
|          Quincy| Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White|58723|
|          Hoover|       Alabama|      38.5|          3

In [12]:
# Drop rows for NA value in columns
df_cities_demographic = df_cities_demographic.na.drop()

In [13]:
# Aggregate cities demographic dataframe by Race with Count
df_cities_demographic = df_cities_demographic \
    .groupby('State', 'City', 'Median Age', 'Male Population',
             'Female Population', 'Total Population', 'Number of Veterans',
             'Foreign-born', 'Average Household Size', 'State Code') \
    .pivot('Race').agg(first('Count')).orderBy(desc('State'), desc('City'))

df_cities_demographic.show(10)
df_cities_demographic.summary().show()

+----------+----------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+---------------------------------+-----+-------------------------+------------------+------+
|     State|      City|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|American Indian and Alaska Native|Asian|Black or African-American|Hispanic or Latino| White|
+----------+----------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+---------------------------------+-----+-------------------------+------------------+------+
| Wisconsin|  Waukesha|      35.8|          36263|            35713|           71976|              3116|        4982|                  2.35|        WI|                              481| 2904|                     1923|              9713| 66757|
| Wisconsin|    Racine| 
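
The pivot above turns one row per city and race into one row per city with a column per race. A minimal pure-Python sketch of the same reshaping is shown here, without Spark. The function name `pivot_race_counts` is hypothetical, the first input row reuses values from the sample output earlier in this section, and the second row is made up for illustration.

```python
def pivot_race_counts(rows):
    """Group rows by (State, City) and spread Race values into columns.

    Like first('Count') in the Spark version, only the first count seen
    for a given city and race is kept.
    """
    pivoted = {}
    for row in rows:
        key = (row["State"], row["City"])
        race_counts = pivoted.setdefault(key, {})
        race_counts.setdefault(row["Race"], row["Count"])
    return pivoted


rows = [
    # taken from the sample output shown earlier
    {"State": "Maryland", "City": "Silver Spring",
     "Race": "Hispanic or Latino", "Count": 25924},
    # made-up row, for illustration only
    {"State": "Maryland", "City": "Silver Spring",
     "Race": "Asian", "Count": 1000},
]

print(pivot_race_counts(rows))
```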

#### i94 SAS LABELS DESCRIPTIONS DATA

In [14]:
# i94 SAS LABELS DESCRIPTIONS exploration and cleaning
dictionary_i94_sas_labels_descriptions = {}
list_i94_sas_labels_descriptions = text_i94_sas_labels_descriptions \
    .replace('\t', '').replace('    ', '').replace('   ', '') \
    .replace('  ', '').replace(' \'', '\'').replace('\'', '') \
    .replace('$', '').replace('\n', '|').split('value ')

for line in list_i94_sas_labels_descriptions:
    line = line.split(';')[0].split("|")
    dictionary_i94_sas_labels_descriptions[
        line[0]] = [x.split("=") for x in line[0:] if "=" in x]
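
The chained `replace` calls above are compact but hard to follow. As an illustration only, here is a line-by-line sketch of the same idea: split the text on `value `, keep everything up to the first `;`, and turn each `code = 'label'` line into a pair. The function name `parse_sas_labels` is hypothetical, and the sample text is an assumed excerpt modelled on the travel mode values shown later in this section, not a verbatim copy of the file.

```python
def parse_sas_labels(text):
    """Return {value_name: [(code, label), ...]} from SAS 'value' blocks."""
    tables = {}
    for block in text.split("value ")[1:]:
        body = block.split(";")[0]          # a block ends at the first ';'
        lines = body.splitlines()
        name = lines[0].strip()             # e.g. 'i94model'
        pairs = []
        for line in lines[1:]:
            if "=" not in line:
                continue
            code, label = line.split("=", 1)
            # strip whitespace first, then the surrounding single quotes
            pairs.append((code.strip().strip("'"),
                          label.strip().strip("'").strip()))
        tables[name] = pairs
    return tables


# Assumed excerpt, for illustration only
sample = """
value i94model
\t1 = 'Air'
\t2 = 'Sea'
\t3 = 'Land'
\t9 = 'Not reported' ;
"""

print(parse_sas_labels(sample)["i94model"])
```

Unlike the cell above, this sketch strips whitespace around codes and labels while parsing, so no later `regexp_replace` would be needed for trailing spaces. It would still break on a label that itself contains `;`.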

In [15]:
# Create I94 COUNTRY dataframe with spark
df_i94_country = spark \
    .createDataFrame(dictionary_i94_sas_labels_descriptions['i94cntyl'],
                     ('country_code', 'country'))

df_i94_country.show(10)

+------------+--------------------+
|country_code|             country|
+------------+--------------------+
|        582 |MEXICO Air Sea, a...|
|        236 |         AFGHANISTAN|
|        101 |             ALBANIA|
|        316 |             ALGERIA|
|        102 |             ANDORRA|
|        324 |              ANGOLA|
|        529 |            ANGUILLA|
|        518 |     ANTIGUA-BARBUDA|
|        687 |           ARGENTINA|
|        151 |             ARMENIA|
+------------+--------------------+
only showing top 10 rows



In [16]:
# Create I94 STATE dataframe
df_i94_state = spark \
    .createDataFrame(dictionary_i94_sas_labels_descriptions['i94addrl'],
                     ('state_code', 'state'))

df_i94_state.show(10)

+----------+-----------------+
|state_code|            state|
+----------+-----------------+
|        AL|          ALABAMA|
|        AK|           ALASKA|
|        AZ|          ARIZONA|
|        AR|         ARKANSAS|
|        CA|       CALIFORNIA|
|        CO|         COLORADO|
|        CT|      CONNECTICUT|
|        DE|         DELAWARE|
|        DC|DIST. OF COLUMBIA|
|        FL|          FLORIDA|
+----------+-----------------+
only showing top 10 rows



In [17]:
# Create I94 PORT dataframe
df_i94_port = spark \
    .createDataFrame(dictionary_i94_sas_labels_descriptions['i94prtl'],
                     ('port_code', 'port'))
df_i94_port = df_i94_port \
    .withColumn('city', split(df_i94_port['port'], ',')[0]) \
    .withColumn('state_code',
                regexp_replace(
                    split(df_i94_port['port'], ',')[1], ' ', '')[0:2])

df_i94_port.show(10)

+---------+--------------------+--------------------+----------+
|port_code|                port|                city|state_code|
+---------+--------------------+--------------------+----------+
|      ALC|           ALCAN, AK|               ALCAN|        AK|
|      ANC|       ANCHORAGE, AK|           ANCHORAGE|        AK|
|      BAR|BAKER AAF - BAKER...|BAKER AAF - BAKER...|        AK|
|      DAC|   DALTONS CACHE, AK|       DALTONS CACHE|        AK|
|      PIZ|DEW STATION PT LA...|DEW STATION PT LA...|        AK|
|      DTH|    DUTCH HARBOR, AK|        DUTCH HARBOR|        AK|
|      EGL|           EAGLE, AK|               EAGLE|        AK|
|      FRB|       FAIRBANKS, AK|           FAIRBANKS|        AK|
|      HOM|           HOMER, AK|               HOMER|        AK|
|      HYD|           HYDER, AK|               HYDER|        AK|
+---------+--------------------+--------------------+----------+
only showing top 10 rows
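
The `city` and `state_code` columns above come from splitting the port label on the comma. A minimal pure-Python sketch of that rule follows. The function name `split_port_label` is hypothetical; the first two labels appear in the output above, and the label without a comma is made up to show the fallback.

```python
def split_port_label(port):
    """Split 'CITY, ST' into (city, state_code).

    The state code is the second comma-separated piece with spaces
    removed, cut to two characters. Labels without a comma get None,
    where the Spark expression would typically yield null.
    """
    parts = port.split(",")
    city = parts[0]
    state_code = parts[1].replace(" ", "")[:2] if len(parts) > 1 else None
    return city, state_code


print(split_port_label("ALCAN, AK"))          # ('ALCAN', 'AK')
print(split_port_label("DALTONS CACHE, AK"))  # ('DALTONS CACHE', 'AK')
print(split_port_label("UNKNOWN PORT"))       # made-up label: ('UNKNOWN PORT', None)
```

Ports whose label has no valid state code are later dropped by the inner join with the state table when `dim_i94_port_table` is built.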



In [18]:
# Create I94 MODE dataframe
df_i94_mode = spark \
    .createDataFrame(dictionary_i94_sas_labels_descriptions['i94model'],
                     ('travel_mode_code', 'travel_mode'))

df_i94_mode.show(10)

+----------------+-------------+
|travel_mode_code|  travel_mode|
+----------------+-------------+
|              1 |          Air|
|              2 |          Sea|
|              3 |         Land|
|              9 |Not reported |
+----------------+-------------+



#### i94 IMMIGRATIONS DATA

In [19]:
# Exploration and cleaning
df_immigrations.show(10)
df_immigrations.printSchema()
df_immigrations.summary().show()
df_immigrations.agg(countDistinct('cicid')).show()

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null| 1.897628485E9| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SE

In [20]:
# Show statistic by visa categories, 1.0 = Business, 2.0 = Pleasure, 3.0 = Student
df_immigrations.groupBy('I94VISA').count().show()

# Filter i94visa by 2.0 for pleasure visiting 
# due to the fact that our target profile is tourist
df_immigrations = df_immigrations.filter("I94VISA==2.0")

+-------+-------+
|I94VISA|  count|
+-------+-------+
|    1.0| 522079|
|    3.0|  43366|
|    2.0|2530868|
+-------+-------+



In [21]:
# Drop rows for na value in columns
df_immigrations = df_immigrations.na \
    .drop(how='any',
          subset=["cicid", "i94res", "i94port", "arrdate", "i94mode"])

df_immigrations.show(10)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
| 15.0|2016.0|   4.0| 101.0| 101.0|    WAS|20545.0|    1.0|     MI|20691.0|  55.0|    2.0|  1.0|20160401|    null| null|      T|      O|   null|      M| 1961.0|09302016|     M|  null|     OS|  6.66643185E8|   93|      B2|
| 16.0|2016.0|   4.0| 101.0| 101.0|    NYC|20545.0|    1.0|     MA|20567.0|  28.0|    2.0|  1.0|20160401|    nul

In [22]:
# Convert sasdate to datetime 
SASdate_to_datetime = udf(lambda z: (timedelta(days=z) +
                          datetime(1960, 1, 1)).strftime('%Y-%m-%d'))
df_immigrations = df_immigrations \
    .withColumn("arrival_date", SASdate_to_datetime(col("arrdate")))

df_immigrations.show(10)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+------------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|arrival_date|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+------------+
| 15.0|2016.0|   4.0| 101.0| 101.0|    WAS|20545.0|    1.0|     MI|20691.0|  55.0|    2.0|  1.0|20160401|    null| null|      T|      O|   null|      M| 1961.0|09302016|     M|  null|     OS|  6.66643185E8|   93|      B2|  2016-04-01|
| 16.0|2016.0|   4.0| 101.0| 101.0|    NYC|20545.0|    1.0| 
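
SAS stores dates as the number of days since 1960-01-01, which is what the UDF above relies on. The same conversion can be sketched as a plain Python function that also tolerates missing values. The name `sas_date_to_iso` is hypothetical; the input 20545.0 is the `arrdate` of the first row shown above.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # day 0 in SAS date arithmetic


def sas_date_to_iso(sas_days):
    """Convert a SAS day count to a 'YYYY-MM-DD' string, or None if missing."""
    if sas_days is None:
        return None
    return (SAS_EPOCH + timedelta(days=int(sas_days))).isoformat()


print(sas_date_to_iso(20545.0))  # 2016-04-01
print(sas_date_to_iso(None))     # None
```

The `None` guard is not needed in the notebook because rows with a null `arrdate` are dropped first, but it keeps the function safe if it is reused on unfiltered data.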

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

A star schema has been applied because it makes analysis easier for our data analytics team.

There is one fact table at the centre, with information extracted from the immigration data.
Dimension tables connect to the fact table through unique keys. Dimension tables can be used to enrich the fact table through join queries.

<img src="./Udacity_Capstone_US_Tourism_Data_Lake_data_model.png">
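
To illustrate how a dimension table enriches the fact table, here is a small sketch of a join query. It uses Python's built-in `sqlite3` as a stand-in for Spark SQL so that it runs without a cluster. The table and column names follow the model in this section; the port rows reuse values from the port and state listings in Step 2, and the fact rows are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE dim_i94_port_table (
    port_code TEXT PRIMARY KEY, city TEXT, state_code TEXT, state TEXT);
CREATE TABLE fact_i94_us_visitor_arrivals_table (
    cicid INTEGER PRIMARY KEY, arrival_port_code TEXT, visitor_age INTEGER);
""")

conn.executemany(
    "INSERT INTO dim_i94_port_table VALUES (?, ?, ?, ?)",
    [("ANC", "ANCHORAGE", "AK", "ALASKA"),
     ("FRB", "FAIRBANKS", "AK", "ALASKA")])

# Made-up fact rows, for illustration only
conn.executemany(
    "INSERT INTO fact_i94_us_visitor_arrivals_table VALUES (?, ?, ?)",
    [(1, "ANC", 34), (2, "ANC", 51), (3, "FRB", 27)])

# Enrich each arrival with its port city, then count arrivals per city
arrivals_by_city = conn.execute("""
    SELECT p.city, COUNT(*) AS arrivals
    FROM fact_i94_us_visitor_arrivals_table AS f
    JOIN dim_i94_port_table AS p ON f.arrival_port_code = p.port_code
    GROUP BY p.city
    ORDER BY p.city
""").fetchall()

print(arrivals_by_city)  # [('ANCHORAGE', 2), ('FAIRBANKS', 1)]
```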

In [23]:
# Generate fact_i94_us_visitor_arrivals_table from df_immigrations
fact_i94_us_visitor_arrivals_table = df_immigrations \
    .filter(df_immigrations.cicid.isNotNull() &
            df_immigrations.i94res.isNotNull() &
            df_immigrations.i94port.isNotNull() &
            df_immigrations.arrival_date.isNotNull() &
            df_immigrations.i94mode.isNotNull()) \
    .selectExpr([
        "cast(cicid as long)",
        "cast(i94yr as int) arrival_year",
        "cast(i94mon as int) arrival_month",
        "cast(arrival_date as date) arrival_date",
        "cast(i94port as string) arrival_port_code",
        "cast(i94cit as int) visitor_birth_country",
        "cast(i94res as int) visitor_residence_country",
        "cast(i94bir as int) visitor_age",
        "cast(gender as string) visitor_gender",
        "cast(i94mode as int) travel_mode_code",
        "cast(airline as string) travel_airline",
        "cast(visatype as string) travel_visa_type"
    ]).dropDuplicates(["cicid"])

# Print Schema
fact_i94_us_visitor_arrivals_table.printSchema()

# write fact_i94_us_visitor_arrivals_table to parquet files
# partitioned by arrival_year and arrival_month
fact_i94_us_visitor_arrivals_table.write.mode("overwrite") \
    .partitionBy("arrival_year", "arrival_month") \
    .parquet("./tables/fact_i94_us_visitor_arrivals_table.parquet")

root
 |-- cicid: long (nullable = true)
 |-- arrival_year: integer (nullable = true)
 |-- arrival_month: integer (nullable = true)
 |-- arrival_date: date (nullable = true)
 |-- arrival_port_code: string (nullable = true)
 |-- visitor_birth_country: integer (nullable = true)
 |-- visitor_residence_country: integer (nullable = true)
 |-- visitor_age: integer (nullable = true)
 |-- visitor_gender: string (nullable = true)
 |-- travel_mode_code: integer (nullable = true)
 |-- travel_airline: string (nullable = true)
 |-- travel_visa_type: string (nullable = true)



In [24]:
# Generate dim_date_table from df_immigrations dataframe
dim_date_table = df_immigrations \
    .filter(df_immigrations.arrival_date.isNotNull()) \
    .selectExpr([
        "cast(arrival_date as date) arrival_date",
        "cast(dayofmonth(arrival_date) as int) day",
        "cast(weekofyear(arrival_date) as int) week",
        "cast(month(arrival_date) as int) month",
        "cast(year(arrival_date) as int) year",
        "cast(quarter(arrival_date) as int) quarter"
    ]).dropDuplicates(["arrival_date"])

# Print Schema
dim_date_table.printSchema()

# write dim_date_table to parquet files partitioned by year and month
dim_date_table.write.mode("overwrite").partitionBy("year", "month") \
    .parquet("./tables/dim_date_table.parquet")

root
 |-- arrival_date: date (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- quarter: integer (nullable = true)
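
For reference, the date parts stored in `dim_date_table` can be reproduced for a single date in plain Python. This is only a sketch: the function name `date_dimension_row` is hypothetical, and it assumes that Spark's `weekofyear` follows the ISO 8601 week definition, which is what `isocalendar()` returns.

```python
from datetime import date


def date_dimension_row(arrival_date):
    """Build the date-dimension fields for one arrival date."""
    return {
        "arrival_date": arrival_date.isoformat(),
        "day": arrival_date.day,
        "week": arrival_date.isocalendar()[1],   # ISO 8601 week number
        "month": arrival_date.month,
        "year": arrival_date.year,
        "quarter": (arrival_date.month - 1) // 3 + 1,
    }


row = date_dimension_row(date(2016, 4, 1))
print(row)
```

For 2016-04-01 this gives day 1, week 13, month 4, year 2016 and quarter 2.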



In [25]:
# generate dim_temperatures_us_cities_table from
# df_temperatures_us_cities dataframe
dim_temperatures_us_cities_table = df_temperatures_us_cities \
    .filter(df_temperatures_us_cities.year.isNotNull() &
            df_temperatures_us_cities.month.isNotNull() &
            df_temperatures_us_cities.City.isNotNull()) \
    .selectExpr([
        "cast(year as int)",
        "cast(month as int)",
        "cast(city as string)",
        "cast(average_temperature as decimal(4,2))"
        ]).dropDuplicates(["year", 'month', 'city'])

# write dim_temperatures_us_cities_table to parquet files
# partitioned by year and month
dim_temperatures_us_cities_table.write.mode("overwrite") \
    .partitionBy("year", "month") \
    .parquet("./tables/dim_temperatures_us_cities_table.parquet")

# Print Schema
dim_temperatures_us_cities_table.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- average_temperature: decimal(4,2) (nullable = true)



In [26]:
# generate dim_temperatures_countries_table from
# df_temperatures_countries dataframe
dim_temperatures_countries_table = df_temperatures_countries \
    .filter(df_temperatures_countries.year.isNotNull() &
            df_temperatures_countries.month.isNotNull() &
            df_temperatures_countries.Country.isNotNull()) \
    .selectExpr([
        "cast(year as int)",
        "cast(month as int)",
        "cast(country as string)",
        "cast(average_temperature as decimal(4,2))"
        ]).dropDuplicates(["year", 'month', 'country'])

# write dim_temperatures_countries_table to parquet files
# partitioned by year and month
dim_temperatures_countries_table.write.mode("overwrite") \
    .partitionBy("year", "month") \
    .parquet("./tables/dim_temperatures_countries_table.parquet")

# Print Schema
dim_temperatures_countries_table.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- average_temperature: decimal(4,2) (nullable = true)



In [27]:
# generate dim_cities_demographic_table from cities_demographic dataframe
dim_cities_demographic_table = df_cities_demographic \
    .filter(df_cities_demographic.State.isNotNull() &
            df_cities_demographic.City.isNotNull()) \
    .selectExpr([
        "cast(state as string)",
        "cast(`State Code` as string) state_code",
        "cast(city as string)",
        "cast(`Median Age` as decimal(4,1)) median_age",
        "cast(`Male Population` as int) male_population",
        "cast(`Female Population` as int) female_population",
        "cast(`Total Population` as int) total_population",
        "cast(`Number of Veterans` as int) number_of_veterans",
        "cast(`Foreign-born` as int) number_of_foregn_born",
        "cast(`Average Household Size` as decimal(4,2)) \
            avarage_household_size",
        "cast(`American Indian and Alaska Native` as int) \
            american_indian_alaska_native",
        "cast(`Asian` as int) asian",
        "cast(`Black or African-American` as int) \
            black_or_africanamerican",
        "cast(`Hispanic or Latino` as int) hispanic_or_latino",
        "cast(`White` as int) white"
        ]).dropDuplicates(["state", 'city'])

# write dim_cities_demographic_table to parquet files
dim_cities_demographic_table.write.mode("overwrite") \
    .parquet("./tables/dim_cities_demographic_table.parquet")

# Print Schema
dim_cities_demographic_table.printSchema()

root
 |-- state: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- city: string (nullable = true)
 |-- median_age: decimal(4,1) (nullable = true)
 |-- male_population: integer (nullable = true)
 |-- female_population: integer (nullable = true)
 |-- total_population: integer (nullable = true)
 |-- number_of_veterans: integer (nullable = true)
 |-- number_of_foregn_born: integer (nullable = true)
 |-- avarage_household_size: decimal(4,2) (nullable = true)
 |-- american_indian_alaska_native: integer (nullable = true)
 |-- asian: integer (nullable = true)
 |-- black_or_africanamerican: integer (nullable = true)
 |-- hispanic_or_latino: integer (nullable = true)
 |-- white: integer (nullable = true)



In [28]:
# generate dim_i94_country_table from df_i94_country dataframe
dim_i94_country_table = df_i94_country \
    .withColumn('country_code', regexp_replace('country_code', ' ', '')) \
    .filter(df_i94_country.country_code.isNotNull()) \
    .selectExpr([
        "cast(country_code as int)",
        "cast(country as string)"
    ]).dropDuplicates(["country_code"])

# write dim_i94_country_table to parquet files
dim_i94_country_table.write.mode("overwrite") \
    .parquet("./tables/dim_i94_country_table.parquet")

# Print Schema
dim_i94_country_table.printSchema()

root
 |-- country_code: integer (nullable = true)
 |-- country: string (nullable = true)



In [29]:
# generate dim_i94_port_table from df_i94_port dataframe and df_i94_state
dim_i94_port_table = df_i94_port \
    .join(df_i94_state, 'state_code') \
    .filter(df_i94_port.port_code.isNotNull()) \
    .selectExpr([
        "cast(port_code as string)",
        "cast(city as string)",
        "cast(state_code as string) state_code",
        "cast(state as string)"
    ]).dropDuplicates(["port_code"])

# write dim_i94_port_table to parquet files
dim_i94_port_table.write.mode("overwrite") \
    .parquet("./tables/dim_i94_port_table.parquet")

# Print Schema
dim_i94_port_table.printSchema()

root
 |-- port_code: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- state: string (nullable = true)



In [30]:
# generate dim_travel_mode_table from df_i94_mode dataframe
dim_i94_travel_mode_table = df_i94_mode \
    .withColumn('travel_mode_code',
                regexp_replace('travel_mode_code', ' ', '')) \
    .selectExpr([
        "cast(travel_mode_code as int)",
        "cast(travel_mode as string)"
    ]).dropDuplicates(["travel_mode_code"])

# write dim_i94_travel_mode_table to parquet files
dim_i94_travel_mode_table.write.mode("overwrite") \
    .parquet("./tables/dim_i94_travel_mode_table.parquet")

# Print Schema
dim_i94_travel_mode_table.printSchema()

root
 |-- travel_mode_code: integer (nullable = true)
 |-- travel_mode: string (nullable = true)



#### 3.2 Mapping Out Data Pipelines
The steps necessary to pipeline the data into the chosen data model are:
1. Build a spark session
2. Process Temperature Data
    - Load and clean Temperature data 
    - Filter "United States" temperature data and aggregate into dim_temperatures_us_cities_table
    - Write dim_temperatures_us_cities_table to parquet files partitioned by year and month
    - Filter out "United States" temperature data and aggregate into dim_temperatures_countries_table
    - Write dim_temperatures_countries_table to parquet files partitioned by year and month
3. Process Cities Demographic Data
    - Load and clean Cities Demographic data 
    - Aggregate Cities Demographic dataframe by Race with Count
    - Write dim_cities_demographic_table to parquet files
4. Process I94 SAS Labels Data
    - Load, clean and parse i94_sas_labels data
    - Generate dim_i94_country_table and write to parquet files
    - Generate dim_i94_port_table and write to parquet files
    - Generate dim_i94_travel_mode_table and write to parquet files
5. Process Immigration I94 Data
    - Load and clean Immigration I94 data
    - Filter visitor data that visited for pleasure
    - Convert sasdate to datetime
    - Generate dim_date_table and write to parquet files partitioned by year and month
    - Generate fact_i94_us_visitor_arrivals_table and write to parquet files partitioned by arrival_year and arrival_month
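
One possible way to organise these steps in a script is a small runner that executes named step functions in order and reports which step failed. This is a hypothetical sketch and not the contents of 'etl.py'; the step functions below are placeholders that only record what they would produce.

```python
def run_pipeline(steps, context=None):
    """Run (name, function) steps in order, sharing one context dict."""
    context = {} if context is None else context
    for name, step in steps:
        try:
            step(context)
        except Exception as error:
            raise RuntimeError(f"pipeline step '{name}' failed") from error
        context.setdefault("completed", []).append(name)
    return context


# Placeholder steps: a real step would read, clean and write a dataset
def build_session(context):
    context["spark"] = "spark-session-placeholder"


def process_temperatures(context):
    context["tables"] = ["dim_temperatures_us_cities_table",
                         "dim_temperatures_countries_table"]


result = run_pipeline([
    ("build_session", build_session),
    ("process_temperatures", process_temperatures),
])
print(result["completed"])  # ['build_session', 'process_temperatures']
```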

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

'etl.py' parses the data and creates or updates the data model as a data pipeline.

In [31]:
# %run ./etl.py

#### 4.2 Data Quality Checks
Data quality checks ensure that the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
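
As an example of the unique-key constraint mentioned above, a check could verify that a key column has no missing and no repeated values. The sketch below works on plain Python dictionaries rather than Spark dataframes; the function name `check_unique_key` and the sample rows are hypothetical.

```python
from collections import Counter


def check_unique_key(rows, key):
    """Raise ValueError if `key` is missing or duplicated; return row count."""
    values = [row.get(key) for row in rows]
    if any(value is None for value in values):
        raise ValueError(f"{key} contains missing values")
    duplicates = sorted(v for v, n in Counter(values).items() if n > 1)
    if duplicates:
        raise ValueError(f"{key} has duplicate values: {duplicates}")
    return len(values)


# Made-up rows shaped like dim_i94_travel_mode_table
travel_modes = [
    {"travel_mode_code": 1, "travel_mode": "Air"},
    {"travel_mode_code": 2, "travel_mode": "Sea"},
    {"travel_mode_code": 3, "travel_mode": "Land"},
]

print(check_unique_key(travel_modes, "travel_mode_code"))  # 3
```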
 
Run Quality Checks

In [33]:
def quality_check(table, tablename):
    """
    - Check whether the table was created.
    - Count the table's rows and raise an error if it has none.
    """
    if table is None:
        print(f"{tablename} does not exist")
        raise ValueError(f"Table {tablename} does not exist")

    # Count the number of rows in the given table
    count = table.count()
    print(f"{tablename} is processing now, it has {count} rows")
    if count == 0:
        raise ValueError(f"Data quality check failed: {tablename} has no rows")
    table.show(5)

table_list = [
    (fact_i94_us_visitor_arrivals_table, "fact_i94_us_visitor_arrivals_table"),
    (dim_date_table, "dim_date_table"),
    (dim_temperatures_us_cities_table, "dim_temperatures_us_cities_table"),
    (dim_temperatures_countries_table, "dim_temperatures_countries_table"),
    (dim_cities_demographic_table, "dim_cities_demographic_table"),
    (dim_i94_country_table, "dim_i94_country_table"),
    (dim_i94_port_table, "dim_i94_port_table"),
    (dim_i94_travel_mode_table, "dim_i94_travel_mode_table"),
]

for table, tablename in table_list:
    quality_check(table, tablename)

fact_i94_us_visitor_arrivals_table is processing now, it has 2530642 rows
+-----+------------+-------------+------------+-----------------+---------------------+-------------------------+-----------+--------------+----------------+--------------+----------------+
|cicid|arrival_year|arrival_month|arrival_date|arrival_port_code|visitor_birth_country|visitor_residence_country|visitor_age|visitor_gender|travel_mode_code|travel_airline|travel_visa_type|
+-----+------------+-------------+------------+-----------------+---------------------+-------------------------+-----------+--------------+----------------+--------------+----------------+
|   29|        2016|            4|  2016-04-01|              ATL|                  101|                      101|         62|             M|               1|            AZ|              B2|
|  474|        2016|            4|  2016-04-01|              NEW|                  103|                      103|         25|             F|               2|         
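
The unique-key integrity constraint listed above could be checked in the same style as `quality_check`. The sketch below is an assumption, not part of etl.py: `check_unique_key` is a hypothetical helper that compares the total row count with the number of distinct key values, using only the `count`, `select` and `distinct` DataFrame methods. Treating `cicid` as the key of the fact table is also an assumption.

```python
def check_unique_key(table, tablename, key_columns):
    """
    - Compare the row count with the count of distinct key values.
    - Raise an error if any key value appears more than once.
    """
    total = table.count()
    distinct = table.select(*key_columns).distinct().count()
    if total != distinct:
        raise ValueError(
            f"{tablename}: {total - distinct} duplicate rows for key {key_columns}"
        )
    print(f"{tablename} has a unique key on {key_columns}")
    return total
```

It could be called as `check_unique_key(fact_i94_us_visitor_arrivals_table, "fact_i94_us_visitor_arrivals_table", ["cicid"])` after the row-count checks have passed.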

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

#### DICTIONARY - fact_i94_us_visitor_arrivals_table
| Column Name               | Descriptions                                                                         |
|---------------------------|--------------------------------------------------------------------------------------|
| cicid                     | Unique ID of the non-US visitor's I94 entry form                                     |
| arrival_year              | Arrival year as YYYY                                                                 |
| arrival_month             | Arrival month as MM                                                                  |
| arrival_date              | Arrival date in the US in YYYY-MM-DD format                                          |
| arrival_port_code         | Arrival port code as a 3-character string                                            |
| visitor_birth_country     | Visitor's nationality country as a 3-digit integer                                   |
| visitor_residence_country | Visitor's residence country as a 3-digit integer                                     |
| visitor_age               | Age of the visitor in years                                                          |
| visitor_gender            | Visitor's sex                                                                        |
| travel_mode_code          | Transportation mode; some values are missing and some are not reported (9)           |
| travel_airline            | Airline used to arrive in the U.S.                                                   |
| travel_visa_type          | Class of admission legally admitting the non-immigrant to temporarily stay in the U.S. |

#### DICTIONARY - dim_date_table
| Column Name                      | Descriptions                                                      |
|----------------------------------|-------------------------------------------------------------------|
| arriving_date                    | Arrival date in the US in YYYY-MM-DD format                       |
| day                              | Day of arrival in the US                                          |
| week                             | Week number of arrival in the US                                  |
| month                            | Month number of arrival in the US                                 |
| year                             | Year of arrival in the US                                         |
| quarter                          | Quarter number of arrival in the US                               |

#### DICTIONARY - dim_temperatures_us_cities_table
| Column Name                      | Descriptions                                                      |
|----------------------------------|-------------------------------------------------------------------|
| year                             | Year of temperature record                                        |
| month                            | Month number of temperature record                                |
| city                             | US city name of temperature record                                |
| average_temperature              | Monthly Average temperature for given US city                     |


#### DICTIONARY - dim_temperatures_countries_table
| Column Name                      | Descriptions                                                      |
|----------------------------------|-------------------------------------------------------------------|
| year                             | Year of temperature record                                        |
| month                            | Month number of temperature record                                |
| country                          | Country name of temperature record                                |
| average_temperature              | Monthly Average temperature for given Country                     |

#### DICTIONARY - dim_cities_demographic_table
| Column Name                      | Descriptions                                                      |
|----------------------------------|-------------------------------------------------------------------|
| state                            | US state name                                                     |
| state_code                       | US state code as 2 letters                                        |
| city                             | US city name                                                      |
| median_age                       | Median age of the population                                      |
| male_population                  | Male population count                                             |
| female_population                | Female population count                                           |
| total_population                 | Total population count                                            |
| number_of_veterans               | Number of veteran residents in the given city                     |
| number_of_foregn_born            | Number of residents born in a foreign country                     |
| avarage_household_size           | Average household size in the given city                          |
| american_indian_alaska_native    | Number of American Indian and Alaska Native residents in the given city |
| asian                            | Number of Asian residents in the given city                       |
| black_or_africanamerican         | Number of Black or African-American residents in the given city   |
| hispanic_or_latino               | Number of Hispanic or Latino residents in the given city          |
| white                            | Number of White residents in the given city                       |

#### DICTIONARY - dim_i94_country_table
| Column Name                      | Descriptions                                                      |
|----------------------------------|-------------------------------------------------------------------|
| country_code                     | 3-digit integer code that stands for the country                  |
| country                          | Country name                                                      |

#### DICTIONARY - dim_i94_port_table
| Column Name                      | Descriptions                                                      |
|----------------------------------|-------------------------------------------------------------------|
| port_code                        | 3-letter code that stands for the port                            |
| city                             | City name                                                         |
| state_code                       | 2-letter code that stands for the US state                        |
| state                            | US state name                                                     |

#### DICTIONARY - dim_i94_travel_mode_table
| Column Name                      | Descriptions                                                      |
|----------------------------------|-------------------------------------------------------------------|
| travel_mode_code                 | 1-digit code for the mode of transportation                       |
| travel_mode                      | Mode of transportation                                            |

### Step 5: Complete Project Write Up

* **Clearly state the rationale for the choice of tools and technologies for the project:**

  The pipeline is built to run on the AWS cloud because the source data is large and needs more processing power and storage than a local PC can provide. An AWS S3 bucket scales well and is quick to access from the processing units. Because the pipeline runs only periodically, resources are needed only occasionally, so the AWS "pay as you go" pricing model makes this a cost-effective design. Apache Spark processes data quickly in a parallel, fault-tolerant way, and its built-in libraries offer familiar methods that make the work easier.


* **Propose how often the data should be updated and why:**

  The data pipeline can be run monthly, because the output parquet files are partitioned by year and month. It makes sense to parse the I94 Immigration data and the temperature data monthly. The US City Demographics data is updated far less often (roughly once a decade), so that step only needs to run when the source data is updated.
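
Because Spark's `partitionBy` writes one `column=value` directory per partition value, a monthly run only has to touch one directory of the fact table. A minimal sketch of that layout follows; `monthly_partition_path` and the bucket path are hypothetical names used only for illustration, not values taken from etl.py.

```python
from datetime import date


def monthly_partition_path(base_path, run_date):
    """Return the partition directory a monthly run for run_date would write."""
    return (
        f"{base_path.rstrip('/')}"
        f"/arrival_year={run_date.year}"
        f"/arrival_month={run_date.month}"
    )


# The bucket name below is a placeholder, not the project's real bucket
print(monthly_partition_path("s3a://my-bucket/fact_i94_us_visitor_arrivals/", date(2016, 4, 1)))
# s3a://my-bucket/fact_i94_us_visitor_arrivals/arrival_year=2016/arrival_month=4
```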


* Write a description of how you would approach the problem differently under the following scenarios:
 * **The data was increased by 100x:**
   From a storage perspective, an Amazon S3 bucket is already the right solution and can be scaled. For data processing, Apache Spark can be deployed as a cluster on AWS EMR to increase processing capacity.
   
 * **The data populates a dashboard that must be updated on a daily basis by 7am every day:**
   Apache Airflow is the solution for scheduling the pipeline and monitoring its runs on a simple dashboard. The pipeline can be scheduled as a daily run that finishes before 7am.
   
 * **The database needed to be accessed by 100+ people:**
   Amazon Redshift could be a smart solution for making the output data accessible to 100+ people, since the Apache parquet output files can be loaded into a Redshift cluster that many people can query concurrently.
