# Awesome Museum
### Data Engineering Capstone Project

#### Project Summary
This project collects and presents information about museums (US only at the current stage; it may be extended globally in the next phase), including museum categories, locations, ratings, and more.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# import libraries

import boto3
import configparser
import json
import os
import pandas as pd
import psycopg2
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *

from helper import *

In [14]:
boto3.__version__

'1.9.7'

In [2]:
# Read configuration files
config = configparser.ConfigParser()
config.read('aws.cfg')

# AWS access key
os.environ['AWS_ACCESS_KEY_ID']=config['KEYS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['KEYS']['AWS_SECRET_ACCESS_KEY']

# AWS role
IAM = config['IAM_ROLE']['ARN']

# Redshift cluster
rs_host=config['CLUSTER']['HOST']
rs_dbname=config['CLUSTER']['DB_NAME']
rs_user=config['CLUSTER']['DB_USER']
rs_password=config['CLUSTER']['DB_PASSWORD']
rs_port=config['CLUSTER']['DB_PORT']

# S3
MUSEUM_DATA_RAW = config['S3']['MUSEUM_DATA']             # csv file
MUSEUM_DATA = config['S3']['MUSEUM_DATA_OUTPUT']          # parquet file output by Spark
MUSEUM_DATA_S3 = config['S3']['MUSEUM_DATA_S3']  

WEATHER_DATA_RAW = config['S3']['WEATHER_DATA']           # csv file
WEATHER_DATA = config['S3']['WEATHER_DATA_OUTPUT']        # parquet file output by Spark
WEATHER_DATA_S3 = config['S3']['WEATHER_DATA_S3']

CATEGORY_BUCKET = config['S3']['CATEGORY_BUCKET'] 
CATEGORY_KEY = config['S3']['CATEGORY_KEY']
CATEGORY_OUTPUT_KEY = config['S3']['CATEGORY_OUTPUT_KEY']
CATEGORY_DATA = config['S3']['CATEGORY_DATA']             # json file
CATEGORY_DATA_S3 = config['S3']['CATEGORY_DATA_S3']

TRAVELER_BUCKET = config['S3']['TRAVELER_BUCKET'] 
TRAVELER_KEY = config['S3']['TRAVELER_KEY']
TRAVELER_OUTPUT_KEY = config['S3']['TRAVELER_OUTPUT_KEY']
TRAVELER_DATA = config['S3']['TRAVELER_DATA']             # json file
TRAVELER_DATA_S3 = config['S3']['TRAVELER_DATA_S3']

S3_REGION = config['S3']['REGION']

# Country and weather date
COUNTRY = config['FILTER']['COUNTRY']
WEATHER_DATE = config['FILTER']['DATE']
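
The cell above expects an `aws.cfg` file with five sections. As a rough sketch of its layout: the section and key names below are taken from the cell, while every value is a placeholder and the S3 key list is shortened. The `missing_keys` helper is hypothetical (it is not part of `helper.py`) and only reports keys that are absent.

```python
import configparser

# Hypothetical aws.cfg contents. Section and key names mirror the cell above;
# every value is a placeholder to replace with your own settings.
SAMPLE_CFG = """
[KEYS]
AWS_ACCESS_KEY_ID = <access-key-id>
AWS_SECRET_ACCESS_KEY = <secret-access-key>

[IAM_ROLE]
ARN = <role-arn>

[CLUSTER]
HOST = <cluster-endpoint>
DB_NAME = <database-name>
DB_USER = <user>
DB_PASSWORD = <password>
DB_PORT = 5439

[S3]
MUSEUM_DATA = <s3-path-to-museum-csv>
MUSEUM_DATA_OUTPUT = <s3-path-for-museum-parquet>
REGION = <aws-region>
# the remaining S3 keys read in the cell above follow the same pattern

[FILTER]
COUNTRY = United States
DATE = <weather-date>
"""

# Keys the notebook reads, grouped by section (S3 list shortened for brevity).
REQUIRED = {
    "KEYS": ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"],
    "IAM_ROLE": ["ARN"],
    "CLUSTER": ["HOST", "DB_NAME", "DB_USER", "DB_PASSWORD", "DB_PORT"],
    "S3": ["MUSEUM_DATA", "MUSEUM_DATA_OUTPUT", "REGION"],
    "FILTER": ["COUNTRY", "DATE"],
}


def missing_keys(cfg_text, required=REQUIRED):
    """Return (section, key) pairs that are absent from the config text."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    return [
        (section, key)
        for section, keys in required.items()
        for key in keys
        if not parser.has_option(section, key)
    ]


print(missing_keys(SAMPLE_CFG))  # [] when every required key is present
```

The port 5439 is Redshift's default, and `United States` is assumed from the country values that appear in the weather output later in the notebook. Running such a check before the ETL starts surfaces a misspelled key early, instead of as a `KeyError` halfway through the configuration cell.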

### Step 1: Scope the Project and Gather Data

#### Scope 
Gather information about museums. Spark converts the raw museum and weather CSV files to Parquet, and Redshift hosts the staging and final tables.

#### Describe and Gather Data 
Data are collected from Kaggle and Tripadvisor and uploaded to AWS S3 at s3://udacity-dend-shell845/museum-data/

museum: overall summary of museums, extracted from Tripadvisor. In csv format.

Address, Description, FeatureCount, Fee, Langtitude, Latitude, LengthOfVisit, MuseumName, PhoneNum, Rank, Rating, ReviewCount,TotalThingsToDo

category: categories of museums, e.g. art museum, history museum, science museum, etc. In json format.

{'museum': ['museum type 1','museum type 2', …]}

rating: the number of ratings each museum received at each rating level. In json format.

{'museum': ['Excellent','Very good','Average','Poor','Terrible']}

traveler: the number of visitors of each traveler type (families, couples, solo, business, friends). In json format.

{'museum': ['Families','Couples','Solo','Business','Friends']}

weather: average temperature of the cities where the museums are located. In csv format.

dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
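
The category and traveler files described above are keyed by museum name, while the staging tables hold one row per museum/category pair and one row per museum/traveler type. A minimal sketch of that flattening follows. It assumes the traveler list holds counts in the order given above; the function names and the output field names (`category`, `type`, `number`) are hypothetical and may differ from what `process_category_data` and `process_traveler_data` in `helper.py` actually do. The sample values are the Gettysburg Heritage Center rows shown in the staging output later in the notebook.

```python
import json

# Order of the counts in each traveler list, as described above.
TRAVELER_TYPES = ["families", "couples", "solo", "business", "friends"]


def flatten_category(raw):
    """Turn {'museum': ['type 1', 'type 2']} into one row per category."""
    return [
        {"museum": museum, "category": category}
        for museum, categories in raw.items()
        for category in categories
    ]


def flatten_traveler(raw):
    """Turn {'museum': [n_families, ...]} into one row per traveler type."""
    rows = []
    for museum, counts in raw.items():
        for traveler_type, count in zip(TRAVELER_TYPES, counts):
            # int() also accepts counts stored as plain digit strings
            rows.append({"museum": museum, "type": traveler_type, "number": int(count)})
    return rows


def to_json_lines(rows):
    """Serialize rows as newline-delimited JSON, one object per line."""
    return "\n".join(json.dumps(row) for row in rows)


category_raw = {"Gettysburg Heritage Center": ["History Museums"]}
traveler_raw = {"Gettysburg Heritage Center": [88, 86, 17, 2, 33]}

print(to_json_lines(flatten_category(category_raw)))
print(to_json_lines(flatten_traveler(traveler_raw)))
```

Newline-delimited JSON is used here because Redshift's `COPY ... JSON 'auto'` loads one object per row and matches object keys to column names.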

In [3]:
# Create spark session
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .config("spark.hadoop.fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.hadoop.fs.s3a.access.key", os.environ['AWS_ACCESS_KEY_ID']) \
        .config("spark.hadoop.fs.s3a.secret.key", os.environ['AWS_SECRET_ACCESS_KEY']) \
        .getOrCreate()

In [20]:
# process museum raw data  
process_museum_data(spark, MUSEUM_DATA_RAW, MUSEUM_DATA)

Process museum data start...
Process museum data complete


In [21]:
# process weather raw data
process_weather_data(spark, WEATHER_DATA_RAW, WEATHER_DATA, COUNTRY, WEATHER_DATE)

Process weather data start...
Process weather data complete


In [4]:
# process category raw data
process_category_data(CATEGORY_BUCKET, CATEGORY_KEY, CATEGORY_OUTPUT_KEY, S3_REGION, os.environ['AWS_ACCESS_KEY_ID'], os.environ['AWS_SECRET_ACCESS_KEY'])

Process category data start
Process category data complete


In [5]:
# process traveler raw data
process_traveler_data(TRAVELER_BUCKET, TRAVELER_KEY, TRAVELER_OUTPUT_KEY, S3_REGION, os.environ['AWS_ACCESS_KEY_ID'], os.environ['AWS_SECRET_ACCESS_KEY'])

Process traveler data start
Process traveler data complete


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.
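
One way to look for missing values and duplicate rows is sketched below with the standard `csv` module. The `profile_rows` helper is hypothetical, and the sample is a toy: the museum names and ratings come from the output later in this notebook, but the blank `Fee` values and the repeated row are made up purely to trigger the checks.

```python
import csv
import io
from collections import Counter


def profile_rows(rows):
    """Count rows, empty values per column, and exact duplicate rows."""
    missing = Counter()
    seen = set()
    duplicates = 0
    total = 0
    for row in rows:
        total += 1
        for column, value in row.items():
            if value is None or not str(value).strip():
                missing[column] += 1
        # A hashable fingerprint of the whole row, used to spot exact repeats
        key = tuple((column, str(value)) for column, value in row.items())
        if key in seen:
            duplicates += 1
        else:
            seen.add(key)
    return {"rows": total, "missing": dict(missing), "duplicates": duplicates}


# Toy sample using three of the museum csv columns.
sample_csv = "\n".join([
    "MuseumName,Rating,Fee",
    "The Metropolitan Museum of Art,5.0,",
    "American Museum of Natural History,4.5,",
    "The Metropolitan Museum of Art,5.0,",
])

report = profile_rows(csv.DictReader(io.StringIO(sample_csv)))
print(report)  # {'rows': 3, 'missing': {'Fee': 3}, 'duplicates': 1}
```

On the full dataset the same questions could be answered in Spark or pandas; the point of the sketch is only which counts to look at before deciding on cleaning steps.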

#### Cleaning Steps
Document steps necessary to clean the data

In [None]:
# completed in step 1

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [3]:
# connect to redshift
def create_redshift_connection():
    """
        Create Redshift connection
        and return the connection
    """
    print("Connect Redshift start")

    # keyword arguments avoid DSN quoting problems, e.g. a password containing spaces
    conn = psycopg2.connect(host=rs_host, dbname=rs_dbname, user=rs_user, password=rs_password, port=rs_port)
    cur = conn.cursor()

    print("Connect Redshift complete")
    return conn, cur

conn, cur = create_redshift_connection()

Connect Redshift start
Connect Redshift complete


In [4]:
print(type(conn))
print(type(cur))

<class 'psycopg2.extensions.connection'>
<class 'psycopg2.extensions.cursor'>


In [None]:
drop_tables(conn, cur, drop_table_queries)

In [5]:
create_tables(conn, cur, create_table_queries)

Complete create tables


In [5]:
arn = "'" + IAM + "'"

In [7]:
staging_museum_sql = staging_parquet_copy.format(table_name='staging_museum', s3_bucket=MUSEUM_DATA_S3, arn_role=arn)
staging_parquet_data(cur, conn, [staging_museum_sql])

Copy parquet data from S3 to Redshift staging complete


In [8]:
staging_weather_sql = staging_parquet_copy.format(table_name='staging_weather', s3_bucket=WEATHER_DATA_S3, arn_role=arn)
staging_parquet_data(cur, conn, [staging_weather_sql])

Copy parquet data from S3 to Redshift staging complete
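
The templates `staging_parquet_copy` and `staging_json_copy` are defined in `helper.py` and are not shown in this notebook. As a sketch of what such templates might look like, the strings below use the placeholder names passed to `.format()` in the cells and standard Redshift `COPY` options. The constant names, the `sql_literal` and `build_copy_sql` helpers, and the S3 path and role ARN values are all hypothetical.

```python
# Hypothetical stand-ins for the templates in helper.py.
COPY_PARQUET_SKETCH = """
    COPY {table_name}
    FROM {s3_bucket}
    IAM_ROLE {arn_role}
    FORMAT AS PARQUET;
"""

COPY_JSON_SKETCH = """
    COPY {table_name}
    FROM {s3_bucket}
    IAM_ROLE {arn_role}
    FORMAT AS JSON 'auto';
"""


def sql_literal(value):
    """Wrap a value in single quotes, doubling any embedded quote."""
    return "'" + value.replace("'", "''") + "'"


def build_copy_sql(template, table_name, s3_path, role_arn):
    """Fill a COPY template, quoting the S3 path and the role ARN."""
    return template.format(
        table_name=table_name,
        s3_bucket=sql_literal(s3_path),
        arn_role=sql_literal(role_arn),
    )


# Placeholder path and ARN; substitute the values from aws.cfg.
sql = build_copy_sql(
    COPY_PARQUET_SKETCH,
    "staging_museum",
    "s3://<bucket>/<museum-parquet-prefix>/",
    "arn:aws:iam::<account-id>:role/<role-name>",
)
print(sql)
```

Quoting inside one helper replaces the manual `"'" + IAM + "'"` step, so a value cannot be passed unquoted by accident. Whether the real templates expect pre-quoted values is not visible here, so check `helper.py` before swapping anything in.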


In [11]:
cur.execute("SELECT * FROM staging_weather LIMIT 10;")
cur.fetchall()

[(datetime.date(2013, 3, 1), 1.323, 'Chicago', 'United States'),
 (datetime.date(2012, 12, 1), 2.791, 'Roseville', 'United States'),
 (datetime.date(2013, 5, 1), 14.543, 'Roseville', 'United States'),
 (datetime.date(2013, 6, 1), 19.446, 'Roseville', 'United States'),
 (datetime.date(2013, 2, 1), 4.662, 'Roseville', 'United States'),
 (datetime.date(2013, 7, 1), 23.589, 'Roseville', 'United States'),
 (datetime.date(2013, 8, 1), 22.23, 'Chicago', 'United States'),
 (datetime.date(2013, 1, 1), 1.056, 'Chicago', 'United States'),
 (datetime.date(2013, 7, 1), 21.914, 'Chicago', 'United States'),
 (datetime.date(2013, 5, 1), 13.734, 'Chicago', 'United States')]

In [9]:
staging_category_sql = staging_json_copy.format(table_name='staging_category', s3_bucket=CATEGORY_DATA_S3, arn_role=arn)
staging_json_data(cur, conn, [staging_category_sql])   

Copy json data from S3 to Redshift staging complete


In [6]:
# cur.execute("DELETE FROM staging_traveler;")
# cur.execute("DELETE FROM city;")

In [10]:
staging_traveler_sql = staging_json_copy.format(table_name='staging_traveler', s3_bucket=TRAVELER_DATA_S3, arn_role=arn)
staging_json_data(cur, conn, [staging_traveler_sql]) 

Copy json data from S3 to Redshift staging complete


In [11]:
cur.execute("select * from staging_category limit 5;")
cur.fetchall()

[('Gettysburg Heritage Center', 'History Museums'),
 ('Hudson River Museum', 'Specialty Museums'),
 ('The Aurora Ice Museum', 'Specialty Museums'),
 ('Chrysler Museum of Art', 'Art Museums'),
 ('Pink Palace Museum', 'History Museums')]

In [12]:
cur.execute("select * from staging_traveler limit 10;")
cur.fetchall()

[('Gettysburg Heritage Center', 'families', 88),
 ('Gettysburg Heritage Center', 'couples', 86),
 ('Gettysburg Heritage Center', 'solo', 17),
 ('Gettysburg Heritage Center', 'business', 2),
 ('Gettysburg Heritage Center', 'friends', 33),
 ('Hudson River Museum', 'families', 25),
 ('Hudson River Museum', 'couples', 22),
 ('Hudson River Museum', 'solo', 2),
 ('Hudson River Museum', 'business', 4),
 ('Hudson River Museum', 'friends', 15)]

In [13]:
# data quality check on staging tables
data_quality_check(cur, conn, select_count_staging_queries)

Data quality check - ['\n    SELECT COUNT(*) FROM staging_category\n', '\n    SELECT COUNT(*) FROM staging_traveler\n', '\n    SELECT COUNT(*) FROM staging_weather\n', '\n    SELECT COUNT(*) FROM staging_museum\n']
Running 
    SELECT COUNT(*) FROM staging_category

    1007
Running 
    SELECT COUNT(*) FROM staging_traveler

    5035
Running 
    SELECT COUNT(*) FROM staging_weather

    24
Running 
    SELECT COUNT(*) FROM staging_museum

    11


In [14]:
# Transform traveler table
transform_traveler(cur, conn)

Complete traveler table


In [15]:
cur.execute("select * from traveler limit 5;")
cur.fetchall()

[(1, 'families'), (2, 'couples'), (3, 'solo'), (4, 'business'), (5, 'friends')]

In [8]:
# transform city table
transform_city(cur, conn, city_table_insert, COUNTRY)

Complete city table


In [9]:
cur.execute("select * from city limit 5;")
cur.fetchall()

[(9, 'Denver', 'United States'),
 (13, 'New York City', 'United States'),
 (6, 'Boston', 'United States'),
 (8, 'Chicago', 'United States'),
 (15, 'New Orleans', 'United States')]

In [18]:
# transform weather table
transform_weather(cur, conn, weather_table_insert, WEATHER_DATE)

Complete weather table


In [10]:
cur.execute("select * from weather limit 5;")
cur.fetchall()

[(2, datetime.date(2012, 10, 1), 12.229)]

In [20]:
# transform category table

transform_category(cur, conn, category_table_insert)

Complete category table


In [21]:
cur.execute("select * from category limit 5;")
cur.fetchall()

[(3, 'Specialty Museums'),
 (7, 'Historic Walking Areas'),
 (11, 'Natural History Museums'),
 (15, 'Points of Interest & Landmarks'),
 (19, 'Military Museums')]

In [22]:
# transform museum table
transform_museum(cur, conn, museum_table_insert)

Complete museum table


In [23]:
cur.execute("select * from museum limit 3;")
cur.fetchall()

[(10,
  'American Museum of Natural History',
  11,
  '79th Street and Central Park West, New York City, NY 10024',
  1,
  4.5,
  'solo'),
 (12,
  'The Metropolitan Museum of Art',
  15,
  '1000 5th Ave, New York City, NY 10028-0198',
  1,
  5.0,
  'solo'),
 (2,
  'The National 9/11 Memorial & Museum',
  3,
  '180 Greenwich St, World Trade Center, New York City, NY 10007-0089',
  1,
  4.5,
  'solo')]

In [24]:
# transform museum fact table
transform_museum_fact(cur, conn, museum_fact_table_insert, WEATHER_DATE)

Complete museum_fact table


In [26]:
cur.execute("select * from museum_fact limit 5;")
cur.fetchall()

[(6, 2, 3, 1, 4.5, None, 3, datetime.date(2012, 10, 1)),
 (2, 10, 11, 1, 4.5, None, 3, datetime.date(2012, 10, 1)),
 (4, 12, 15, 1, 5.0, None, 3, datetime.date(2012, 10, 1)),
 (8, 14, 19, 3, 5.0, None, 3, datetime.date(2012, 10, 1)),
 (7, 1, 1, 7, 4.5, None, 3, datetime.date(2012, 10, 1))]

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
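
A count check of the kind listed above can be sketched as follows. The `check_row_counts` function is hypothetical and is not the `data_quality_check` from `helper.py`; an in-memory SQLite database stands in for Redshift so that the example runs anywhere, and the `traveler` column names are assumptions.

```python
import sqlite3


def check_row_counts(cur, tables, min_rows=1):
    """Return {table: row count}; raise if any table has too few rows."""
    counts = {}
    for table in tables:
        # Table names come from a trusted, hard-coded list, not from user input
        cur.execute(f"SELECT COUNT(*) FROM {table}")
        count = cur.fetchone()[0]
        if count < min_rows:
            raise ValueError(f"Data quality check failed: {table} has {count} rows")
        counts[table] = count
    return counts


# In-memory stand-in for the Redshift connection used in the notebook.
demo_conn = sqlite3.connect(":memory:")
demo_cur = demo_conn.cursor()
# The UNIQUE constraint illustrates an integrity check on the dimension table
demo_cur.execute("CREATE TABLE traveler (traveler_id INTEGER PRIMARY KEY, type TEXT UNIQUE)")
demo_cur.executemany(
    "INSERT INTO traveler (type) VALUES (?)",
    [("families",), ("couples",), ("solo",), ("business",), ("friends",)],
)
demo_conn.commit()

print(check_row_counts(demo_cur, ["traveler"]))  # {'traveler': 5}
```

Raising on an empty table, rather than only printing the count, makes a failed load stop the pipeline instead of passing silently.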
 
Run Quality Checks

In [None]:
# Perform quality checks here

In [10]:
data_quality_check(cur, conn, select_count_staging_queries)

Data quality check - ['\n    SELECT COUNT(*) FROM staging_category\n', '\n    SELECT COUNT(*) FROM staging_traveler\n', '\n    SELECT COUNT(*) FROM staging_weather\n', '\n    SELECT COUNT(*) FROM staging_museum\n']
Running 
    SELECT COUNT(*) FROM staging_category

    2014
Running 
    SELECT COUNT(*) FROM staging_traveler

    1007
Running 
    SELECT COUNT(*) FROM staging_weather

    24
Running 
    SELECT COUNT(*) FROM staging_museum

    11


In [27]:
data_quality_check(cur, conn, select_count_queries)

Data quality check - ['\n    SELECT COUNT(*) FROM city\n', '\n    SELECT COUNT(*) FROM category\n', '\n    SELECT COUNT(*) FROM traveler\n', '\n    SELECT COUNT(*) FROM weather\n', '\n    SELECT COUNT(*) FROM museum\n', '\n    SELECT COUNT(*) FROM museum_fact\n']
Running 
    SELECT COUNT(*) FROM city

    6
Running 
    SELECT COUNT(*) FROM category

    42
Running 
    SELECT COUNT(*) FROM traveler

    5
Running 
    SELECT COUNT(*) FROM weather

    1
Running 
    SELECT COUNT(*) FROM museum

    10
Running 
    SELECT COUNT(*) FROM museum_fact

    10


In [28]:
cur.close()
conn.close()

In [31]:
spark.stop()

In [30]:
print("awesome-museum ETL complete")

awesome-museum ETL complete


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
  * The data was increased by 100x.
  * The data populates a dashboard that must be updated on a daily basis by 7am every day.
  * The database needed to be accessed by 100+ people.