# Yelp Data Analysis
### Data Engineering Capstone Project

#### Project Summary
In this project, We will explore Yelp dataset and find various data points that will help local businesses.

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
%%bash
pip install -r requirement.txt



In [2]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
import psycopg2
import uuid
import re
import os
import configparser
import boto3

# Configuration
config = configparser.ConfigParser()
config.read('config.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config.get('AWS', 'AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY']=config.get('AWS', 'AWS_SECRET_ACCESS_KEY')

# NLTK data setup
import nltk
nltk.download('vader_lexicon')
nltk.download('stopwords')
nltk.download('punkt')
nltk.download('wordnet')

# Redshift connection setup
conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['CLUSTER'].values()))

# Get the Spark session
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

spark

[nltk_data] Downloading package vader_lexicon to /root/nltk_data...
[nltk_data]   Package vader_lexicon is already up-to-date!
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What is your end solution look like? What tools did you use? etc>

-- From Yelp dataset, We will extract various data points related to an individual local business such as the trend of ratings, how many positive/negative reviews are, words their user highlighted and so on. In the end, We will build a dashboard to show those found data points to understand the current state of each local business.

-- We will use Pyspark to read/transform data, Natural Language Toolkit to analyze reviews, Redshift to save analyzed data and Jupyter Notebook to build a dashboard.

-- The data process pipeline in this notebook will only process a partial data set. This data process pipeline will be automated by Airflow Pipeline to process the whole data set.

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

In [3]:
main_bucket = config.get("DATA_S3", "ETL_TEMP_S3_BUCKET")
data_folder = "s3a://yura.udacity.dend.capstone.sample/"
temp_bucket = main_bucket + "/temp/"

##### Business data
Contains business data including location data, attributes, and categories.

```json
{
    // string, 22 character unique string business id
    "business_id": "tnhfDv5Il8EaGSXZGiuQGg",
    // string, the business's name
    "name": "Garaje",
    // string, the full address of the business
    "address": "475 3rd St",
    // string, the city
    "city": "San Francisco",
    // string, 2 character state code, if applicable
    "state": "CA",
    // string, the postal code
    "postal code": "94107",
    // float, latitude
    "latitude": 37.7817529521,
    // float, longitude
    "longitude": -122.39612197,
    // float, star rating, rounded to half-stars
    "stars": 4.5,
    // integer, number of reviews
    "review_count": 1198,
    // integer, 0 or 1 for closed or open, respectively
    "is_open": 1,
    // an array of strings of business categories
    "categories": [
        "Mexican",
        "Burgers",
        "Gastropubs"
    ]
}

```

In [4]:
business_df = spark.read.json(data_folder + 'business.json')
business_df.limit(2).toPandas()

Unnamed: 0,address,attributes,business_id,categories,city,hours,is_open,latitude,longitude,name,postal_code,review_count,stars,state
0,2818 E Camino Acequia Drive,"(None, None, None, None, None, None, None, Non...",1SWheh84yJXfytovILXOAQ,"Golf, Active Life",Phoenix,,0,33.522143,-112.018481,Arizona Biltmore Golf Club,85016,5,3.0,AZ
1,30 Eglinton Avenue W,"(None, None, u'full_bar', {'romantic': False, ...",QXAEGFB4oINsVuTFxEYKFQ,"Specialty Food, Restaurants, Dim Sum, Imported...",Mississauga,"(9:0-1:0, 9:0-0:0, 9:0-1:0, 9:0-0:0, 9:0-0:0, ...",1,43.605499,-79.652289,Emerald Chinese Restaurant,L5R 3E7,128,2.5,ON


##### Checkin data
Checkins on a business.

| business_id            | date                                                          |
|------------------------|---------------------------------------------------------------|
| --1UhMGODdWsrMastO9DZw | 2011-06-04 18:22:23, 2011-07-23 23:51:33, 2012-04-15 01:07:50 |
| --6MefnULPED_I942VcFNA |  2014-12-29 19:25:50, 2015-01-17 01:49:14                     |

In [5]:
checkin_df = spark.read.format('csv').option('header', 'true').load(data_folder + 'checkin.csv')
checkin_df.limit(2).toPandas()

Unnamed: 0,date,business_id
0,"2016-04-26 19:49:16, 2016-08-30 18:36:57, 2016...",--1UhMGODdWsrMastO9DZw
1,"2011-06-04 18:22:23, 2011-07-23 23:51:33, 2012...",--6MefnULPED_I942VcFNA


##### Review Data
Contains full review text data including the user_id that wrote the review and the business_id the review is written for.

```json
{
    // string, 22 character unique review id
    "review_id": "zdSx_SD6obEhz9VrW9uAWA",
    // string, 22 character unique user id, maps to the user in user.json
    "user_id": "Ha3iJu77CxlrFm-vQRs_8g",
    // string, 22 character business id, maps to business in business.json
    "business_id": "tnhfDv5Il8EaGSXZGiuQGg",
    // integer, star rating
    "stars": 4,
    // string, date formatted YYYY-MM-DD
    "date": "2016-03-09",
    // string, the review itself
    "text": "Great place to hang out after work: the prices are decent, and the ambience is fun. It's a bit loud, but very lively. The staff is friendly, and the food is good. They have a good selection of drinks."
}
```

In [6]:
review_df = spark.read.json(data_folder + 'review.json')
review_df.limit(2).toPandas()

Unnamed: 0,business_id,cool,date,funny,review_id,stars,text,useful,user_id
0,ujmEBvifdJM6h6RLv4wQIg,0,2013-05-07 04:34:36,1,Q1sbwvVQXV2734tPgoKj4Q,1.0,Total bill for this horrible service? Over $8G...,6,hG7b0MtEbXx5QzbzE6C_VA
1,NZnhc2sEQy3RmzKTZnqtwQ,0,2017-01-14 21:30:33,0,GJXCdrto3ASJOqKeVWPi6Q,5.0,I *adore* Travis at the Hard Rock's new Kelly ...,0,yXQM5uF2jS6es16SJzNHfg


##### Tip data
Tips written by a user on a business. Tips are shorter than reviews and tend to convey quick suggestions.

```json
{
    // string, text of the tip
    "text": "Secret menu - fried chicken sando is da bombbbbbb Their zapatos are good too.",
    // string, when the tip was written, formatted like YYYY-MM-DD
    "date": "2013-09-20",
    // integer, how many compliments it has
    "compliment_count": 172,
    // string, 22 character business id, maps to business in business.json
    "business_id": "tnhfDv5Il8EaGSXZGiuQGg",
    // string, 22 character unique user id, maps to the user in user.json
    "user_id": "49JhAJh8vSQ-vM4Aourl0g"
}

```

In [7]:
uuid_udf = F.udf(lambda: str(uuid.uuid4()), StringType())

tip_df = spark.read.json(data_folder + 'tip.json')
tip_df = tip_df.withColumn("tip_id", uuid_udf())
tip_df.limit(2).toPandas()

Unnamed: 0,business_id,compliment_count,date,text,user_id,tip_id
0,VaKXUpmWTTWDKbpJ3aQdMw,0,2014-03-27 03:51:24,"Great for watching games, ufc, and whatever el...",UPw5DWs_b-e2JRBS-t37Ag,9b9632e4-ab29-48fe-9af9-661b2637f54c
1,OPiPeoJiv92rENwbq76orA,0,2013-05-25 06:00:56,Happy Hour 2-4 daily with 1/2 price drinks and...,Ocha4kZBHb4JK0lOWvE0sg,759056ea-2714-4d25-b1ea-07f8db2088c3


##### User data
User data including the user's friend mapping and all the metadata associated with the user.

```json
{
    // string, 22 character unique user id, maps to the user in user.json
    "user_id": "Ha3iJu77CxlrFm-vQRs_8g",
    // string, the user's first name
    "name": "Sebastien",
    // integer, the number of reviews they've written
    "review_count": 56,
    // string, when the user joined Yelp, formatted like YYYY-MM-DD
    "yelping_since": "2011-01-01",
    // array of strings, an array of the user's friend as user_ids
    "friends": [
        "wqoXYLWmpkEH0YvTmHBsJQ",
        "KUXLLiJGrjtSsapmxmpvTA",
        "6e9rJKQC3n0RSKyHLViL-Q"
    ],
    // array of integers, the years the user was elite
    "elite": [
        2012,
        2013
    ]
}

```

In [8]:
user_df = spark.read.json(data_folder + 'user.json')
user_df.limit(2).toPandas()

Unnamed: 0,average_stars,compliment_cool,compliment_cute,compliment_funny,compliment_hot,compliment_list,compliment_more,compliment_note,compliment_photos,compliment_plain,...,cool,elite,fans,friends,funny,name,review_count,useful,user_id,yelping_since
0,4.03,1,0,1,2,0,0,1,0,1,...,25,201520162017.0,5,"c78V-rj8NQcQjOI8KP3UEA, alRMgPcngYSCJ5naFRBz5g...",17,Rashmi,95,84,l6BmjZMeQD3rDxWUbiAiow,2013-10-08 23:11:33
1,3.63,1,0,1,1,0,0,0,0,0,...,16,,4,"kEBTgDvFX754S68FllfCaA, aB2DynOxNOJK9st2ZeGTPg...",22,Jenna,33,48,4XChL029mKr5hydo79Ljxg,2013-02-21 22:29:06


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

##### Business data
We'd like to only analyze data for active businesses. Inactive(is_open = 0) business should be excluded.

In [9]:
business_df.createOrReplaceTempView("business")

spark.sql("""
select count(*) from business
""").show()
spark.sql("""
select count(*) from business where is_open = 0
""").show()

# Remove inactive businesses
business_df = spark.sql("""
select * from business where is_open = 1
""")
business_df.count()

+--------+
|count(1)|
+--------+
|     100|
+--------+

+--------+
|count(1)|
+--------+
|      20|
+--------+



80

##### Review/Tip data
We will tokenize and run the sentimental analysis for reviews and tips. If reviews or tips have en empty or null text, then they should be excluded.

In [10]:
review_df.createOrReplaceTempView("review")

spark.sql("""
select count(*) from review where text is null or length(text) == 0
""").show()

# Remove review is null or emtpy review
review_df = spark.sql("""
select * from review where text is not null and length(text) > 0
""")
review_df.count()

+--------+
|count(1)|
+--------+
|       0|
+--------+



100

In [11]:
tip_df.createOrReplaceTempView("tip")

spark.sql("""
select count(*) from tip where text is null or length(text) == 0
""").show()

# Remove tip is null or emtpy tip
tip_df = spark.sql("""
select * from tip where text is not null and length(text) > 0
""")
tip_df.count()

+--------+
|count(1)|
+--------+
|       0|
+--------+



100

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

##### Fact tables

1. review: Reviews in the review data and the result of the sentiment analysis
    - <em>review_id, user_id, business_id, stars, date, text, sentiment</em>
2. tip: Tips in the tip data and the result of the sentiment analysis
    - <em>tip_id, user_id, date, business_id, text, sentiment</em>

##### Demention tables

1. business: Businesses in the business data
    - <em>business_id, address, city, latitude, longitude, name, postal_code, state</em>
2. business_category: Categories in the business data
    - <em>business_id, category</em>
3. review_text: Tokenized review texts
    - <em>review_id, word</em>
4. tip_text: Tokenized tip texts
    - <em>tip_id, word</em>
5. checkin: Checkin info in checkin data
    - <em>business_id, date</em>
6. user: User in the user data
    - <em>user_id, name</em>
7. user_elite: Elite users and years the user was elite
    - <em>user_id, year</em>
8. user_friend: Friends of elite users
    - <em>user_id, friend_id</em>

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

1. Read the business data and load business and business categories information.
    1. If a business have multiple business categories, they should be separate rows.
2. Read the user data nad load user, elite user, user friends information.
    1. If a user was the elite in multiple years, they should be separate rows.
    2. We are only interested in Elite user's friends information. Only elite users' friend information should be saved to the user_friend.
    3. If a user has multiple friends, they should be seperate rows.
3. Read the checkin data and load checkin information.
    1. The date field in the checkin data is a comma-separated list of timestamps for each checkin. Each individual checkin should be separate rows.
4. Read the review data, and run the sentiment analysis and tokenization. Save them to each corresponding tables.
    1. If a review has more the 'positive' element than the 'negative' from the result of sentiment analysis, the review should be considered a positive review, and vice versa.
    2. The tokenization should be executed after the spelling check and the lemmatization, and known english stopwords should be excluded from tokens.
5. Red the tip data and run the sentiment analysis and tokenization. Save them to each corresponding tables.
    1. If a tip has more the 'positive' element than the 'negative' from the result of sentiment analysis, the review should be considered as a positive tip and vice versa.
    2. The tokenization should be executed after the spelling check and the lemmatization, and known english stopwords should be excluded from tokens.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

Optional: Create tables(This is only required one time)

In [12]:
with conn.cursor() as cur:
    cur.execute(open("create_tables.sql", "r").read())
conn.commit()

In [13]:
copy_sql = """
COPY {}
FROM 's3://{}/'
IAM_ROLE '{}'
FORMAT AS PARQUET
"""

##### 4.1.1.1 Load the business table

In [14]:
business_table_df = spark.sql("""
select
    business_id,
    address,
    city,
    latitude,
    longitude,
    name,
    postal_code,
    state
from business
""")
business_table_df.limit(2).toPandas()

Unnamed: 0,business_id,address,city,latitude,longitude,name,postal_code,state
0,1SWheh84yJXfytovILXOAQ,2818 E Camino Acequia Drive,Phoenix,33.522143,-112.018481,Arizona Biltmore Golf Club,85016,AZ
1,QXAEGFB4oINsVuTFxEYKFQ,30 Eglinton Avenue W,Mississauga,43.605499,-79.652289,Emerald Chinese Restaurant,L5R 3E7,ON


In [49]:
table_name = "business"
parquet_file_name = temp_bucket + table_name + ".parquet"
business_table_df.write.parquet("s3a://" + parquet_file_name)

In [50]:
try:
    with conn.cursor() as cur:
        cur.execute(copy_sql.format(
            table_name,
            parquet_file_name,
            config.get("AWS", "AWS_IAM_ROLE")))
    conn.commit()
except Exception as err:
    print("Error: ", err)
    conn.rollback()

##### 4.1.1.2 Load the business_category table

In [17]:
business_category_table_df = business_df \
    .select("business_id", "categories") \
    .withColumn("categories", F.explode(F.split("categories", ", ")))
business_category_table_df.limit(2).toPandas()

Unnamed: 0,business_id,categories
0,QXAEGFB4oINsVuTFxEYKFQ,Specialty Food
1,QXAEGFB4oINsVuTFxEYKFQ,Restaurants


In [18]:
table_name = "business_category"
parquet_file_name = temp_bucket + table_name + ".parquet"
business_category_table_df.write.parquet("s3a://" + parquet_file_name)

In [19]:
try:
    with conn.cursor() as cur:
        cur.execute(copy_sql.format(
            table_name,
            parquet_file_name,
            config.get("AWS", "AWS_IAM_ROLE")))
    conn.commit()
except Exception as err:
    print("Error: ", err)
    conn.rollback()

##### 4.1.2.1 Load user table

In [20]:
user_df.createOrReplaceTempView("yelp_user")
user_table_df = spark.sql("""
select
    user_id,
    name
from yelp_user
""")
user_table_df.limit(2).toPandas()

Unnamed: 0,user_id,name
0,l6BmjZMeQD3rDxWUbiAiow,Rashmi
1,4XChL029mKr5hydo79Ljxg,Jenna


In [21]:
table_name = "yelp_user"
parquet_file_name = temp_bucket + table_name + ".parquet"
user_table_df.write.parquet("s3a://" + parquet_file_name)

In [22]:
try:
    with conn.cursor() as cur:
        cur.execute(copy_sql.format(
            table_name,
            parquet_file_name,
            config.get("AWS", "AWS_IAM_ROLE")))
    conn.commit()
except Exception as err:
    print("Error: ", err)
    conn.rollback()

##### 4.1.2.2 Load user_elite table

In [23]:
user_elite_table_df = spark.sql("""
select
    user_id,
    elite
from yelp_user
where elite is not null
""")

user_elite_table_df = user_elite_table_df \
    .withColumn("year", F.explode(F.split("elite", ","))) \
    .select("user_id", "year")
user_elite_table_df.limit(2).toPandas()

Unnamed: 0,user_id,year
0,l6BmjZMeQD3rDxWUbiAiow,2015
1,l6BmjZMeQD3rDxWUbiAiow,2016


In [24]:
table_name = "yelp_user_elite"
parquet_file_name = temp_bucket + table_name + ".parquet"
user_elite_table_df.write.parquet("s3a://" + parquet_file_name)

In [25]:
try:
    with conn.cursor() as cur:
        cur.execute(copy_sql.format(
            table_name,
            parquet_file_name,
            config.get("AWS", "AWS_IAM_ROLE")))
    conn.commit()
except Exception as err:
    print("Error: ", err)
    conn.rollback()

##### 4.1.2.3 Load user_friend table

In [26]:
user_friend_table_df = spark.sql("""
select
    user_id,
    friends
from yelp_user
where elite is not null and friends is not null
""")

user_friend_table_df = user_friend_table_df \
    .withColumn("friend_id", F.explode(F.split("friends", ","))) \
    .select("user_id", "friend_id")
user_friend_table_df.limit(2).toPandas()

Unnamed: 0,user_id,friend_id
0,l6BmjZMeQD3rDxWUbiAiow,c78V-rj8NQcQjOI8KP3UEA
1,l6BmjZMeQD3rDxWUbiAiow,alRMgPcngYSCJ5naFRBz5g


In [27]:
table_name = "yelp_user_friend"
parquet_file_name = temp_bucket + table_name + ".parquet"
user_friend_table_df.write.parquet("s3a://" + parquet_file_name)

In [28]:
try:
    with conn.cursor() as cur:
        cur.execute(copy_sql.format(
            table_name,
            parquet_file_name,
            config.get("AWS", "AWS_IAM_ROLE")))
    conn.commit()
except Exception as err:
    print("Error: ", err)
    conn.rollback()

##### 4.1.3 Load checkin table

In [29]:
checkin_table_df = checkin_df \
    .withColumn("splited_date", F.explode(F.split("date", ", "))) \
    .withColumn("casted_date", F.to_timestamp("splited_date", "yyyy-MM-dd HH:mm:ss")) \
    .select(
        "business_id",
        F.col("casted_date").alias("date")
    )
checkin_table_df.limit(2).toPandas()

Unnamed: 0,business_id,date
0,--1UhMGODdWsrMastO9DZw,2016-04-26 19:49:16
1,--1UhMGODdWsrMastO9DZw,2016-08-30 18:36:57


In [30]:
table_name = "checkin"
parquet_file_name = temp_bucket + table_name + ".parquet"
checkin_table_df.write.parquet("s3a://" + parquet_file_name)

In [31]:
try:
    with conn.cursor() as cur:
        cur.execute(copy_sql.format(
            table_name,
            parquet_file_name,
            config.get("AWS", "AWS_IAM_ROLE")))
    conn.commit()
except Exception as err:
    print("Error: ", err)
    conn.rollback()

##### 4.1.4.1 Build the sentiment analyser

In [32]:
from nltk.sentiment.vader import SentimentIntensityAnalyzer
analyser = SentimentIntensityAnalyzer()

def get_sentiment_analysis_score(sentence):
    score = analyser.polarity_scores(sentence)
    return score['compound']

def get_sentiment_analysis_result(score):
    if score >= 0.05:
        return "POSITIVE"
    elif score <= -0.05:
        return "NEGATIVE"
    else:
        return "NEUTRAL"
    
get_sentiment_analysis_score_udf = F.udf(lambda x: get_sentiment_analysis_score(x), DoubleType())
get_sentiment_analysis_result_udf = F.udf(lambda x: get_sentiment_analysis_result(x), StringType())

##### 4.1.4.2 Build the tokenizer

In [33]:
from nltk.tokenize import RegexpTokenizer
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer

tokenizer = RegexpTokenizer('\w+|\$[\d\.]+|\S+')
def tokenize(sentence):
    tokens = tokenizer.tokenize(sentence)
    return tokens

stop_words = set(stopwords.words('english'))
def is_not_stopword(word):
    return not word in stop_words

lemmatizer = WordNetLemmatizer()
def lemmatize(word):
    return lemmatizer.lemmatize(word)

def is_valid_word(word):
    return len(word) > 1 and not re.match("^([^a-z|0-9]|\'[a-z|0-9])$", word) and not word.isdecimal()
    
tokenize_udf = F.udf(lambda x: tokenize(x), ArrayType(StringType()))
is_not_stopword_udf = F.udf(lambda x: is_not_stopword(x), BooleanType())
lemmatize_udf = F.udf(lambda x: lemmatize(x), StringType())
is_valid_word_udf = F.udf(lambda x: is_valid_word(x), BooleanType())

##### 4.1.4.3 Load review table

In [34]:
review_staging_df = spark.sql("""
select
    review_id,
    user_id,
    business_id,
    stars,
    to_timestamp(date, 'yyyy-MM-dd HH:mm:ss') as date,
    lower(text) as text
from review
""")

In [35]:
review_table_df = review_staging_df \
    .withColumn("sa_score", get_sentiment_analysis_score_udf("text")) \
    .withColumn("sentiment", get_sentiment_analysis_result_udf("sa_score")) \
    .select(
        "review_id",
        "user_id",
        "business_id",
        "stars",
        "date",
        "text",
        "sentiment")
review_table_df.limit(2).toPandas()

Unnamed: 0,review_id,user_id,business_id,stars,date,text,sentiment
0,Q1sbwvVQXV2734tPgoKj4Q,hG7b0MtEbXx5QzbzE6C_VA,ujmEBvifdJM6h6RLv4wQIg,1.0,2013-05-07 04:34:36,total bill for this horrible service? over $8g...,NEGATIVE
1,GJXCdrto3ASJOqKeVWPi6Q,yXQM5uF2jS6es16SJzNHfg,NZnhc2sEQy3RmzKTZnqtwQ,5.0,2017-01-14 21:30:33,i *adore* travis at the hard rock's new kelly ...,POSITIVE


In [36]:
table_name = "review"
parquet_file_name = temp_bucket + table_name + ".parquet"
review_table_df.write.parquet("s3a://" + parquet_file_name)

In [37]:
try:
    with conn.cursor() as cur:
        cur.execute(copy_sql.format(
            table_name,
            parquet_file_name,
            config.get("AWS", "AWS_IAM_ROLE")))
    conn.commit()
except Exception as err:
    print("Error: ", err)
    conn.rollback()

##### 4.1.4.4 Load review_text table

In [38]:
review_text_table_df = review_staging_df \
    .withColumn("word", F.explode(tokenize_udf("text"))) \
    .withColumn("word", lemmatize_udf("word")) \
    .filter(is_not_stopword_udf("word")) \
    .filter(is_valid_word_udf("word")) \
    .select(
        "review_id",
        "word")

review_text_table_df.limit(2).toPandas()

Unnamed: 0,review_id,word
0,Q1sbwvVQXV2734tPgoKj4Q,total
1,Q1sbwvVQXV2734tPgoKj4Q,bill


In [39]:
table_name = "review_text"
parquet_file_name = temp_bucket + table_name + ".parquet"
review_text_table_df.write.parquet("s3a://" + parquet_file_name)

In [40]:
try:
    with conn.cursor() as cur:
        cur.execute(copy_sql.format(
            table_name,
            parquet_file_name,
            config.get("AWS", "AWS_IAM_ROLE")))
    conn.commit()
except Exception as err:
    print("Error: ", err)
    conn.rollback()

##### 4.1.5.1 Load tip table

In [41]:
tip_staging_df = spark.sql("""
select
    tip_id,
    user_id,
    to_timestamp(date, 'yyyy-MM-dd HH:mm:ss') as date,
    business_id,
    lower(text) as text
from tip
""")

In [42]:
tip_table_df = tip_staging_df \
    .withColumn("sa_score", get_sentiment_analysis_score_udf("text")) \
    .withColumn("sentiment", get_sentiment_analysis_result_udf("sa_score")) \
    .select(
        "tip_id",
        "user_id",
        "date",
        "business_id",
        "text",
        "sentiment")
tip_table_df.limit(2).toPandas()

Unnamed: 0,tip_id,user_id,date,business_id,text,sentiment
0,9510aac3-8e37-4f8d-ae15-7bc411ecab5e,UPw5DWs_b-e2JRBS-t37Ag,2014-03-27 03:51:24,VaKXUpmWTTWDKbpJ3aQdMw,"great for watching games, ufc, and whatever el...",POSITIVE
1,b1ade63a-744b-4905-b80f-b743e4306d8f,Ocha4kZBHb4JK0lOWvE0sg,2013-05-25 06:00:56,OPiPeoJiv92rENwbq76orA,happy hour 2-4 daily with 1/2 price drinks and...,POSITIVE


In [43]:
table_name = "tip"
parquet_file_name = temp_bucket + table_name + ".parquet"
tip_table_df.write.parquet("s3a://" + parquet_file_name)

In [44]:
try:
    with conn.cursor() as cur:
        cur.execute(copy_sql.format(
            table_name,
            parquet_file_name,
            config.get("AWS", "AWS_IAM_ROLE")))
    conn.commit()
except Exception as err:
    print("Error: ", err)
    conn.rollback()

##### 4.1.5.2 Load tip_text table

In [45]:
tip_text_table_df = tip_staging_df \
    .withColumn("word", F.explode(tokenize_udf("text"))) \
    .withColumn("word", lemmatize_udf("word")) \
    .filter(is_not_stopword_udf("word")) \
    .filter(is_valid_word_udf("word")) \
    .select(
        "tip_id",
        "word")

tip_text_table_df.limit(2).toPandas()

Unnamed: 0,tip_id,word
0,37553bf8-5ea7-4251-8637-5a5c2d2a0968,great
1,37553bf8-5ea7-4251-8637-5a5c2d2a0968,watching


In [46]:
table_name = "tip_text"
parquet_file_name = temp_bucket + table_name + ".parquet"
tip_text_table_df.write.parquet("s3a://" + parquet_file_name)

In [47]:
try:
    with conn.cursor() as cur:
        cur.execute(copy_sql.format(
            table_name,
            parquet_file_name,
            config.get("AWS", "AWS_IAM_ROLE")))
    conn.commit()
except Exception as err:
    print("Error: ", err)
    conn.rollback()

##### 4.1.6 Clean up the temp data in S3

In [None]:
s3 = boto3.resource('s3')
bucket = s3.Bucket(main_bucket)
bucket.objects.filter(Prefix="temp").delete()

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

##### 4.2.1.1 Script unit test : Non nullable field check

In [55]:
non_nullable_fields = [
    ("business_category", "category"),
    ("review_text", "word"),
    ("tip_text", "word"),
    ("yelp_user_elite", "year"),
    ("yelp_user_friend", "friend_id")
]

with conn.cursor() as cur:
    for non_nullable_field in non_nullable_fields:
        cur.execute("select count(*) from {} where {} is null".format(non_nullable_field[0], non_nullable_field[1]))
        print("Null count of {}.{}: {}".format(non_nullable_field[0], non_nullable_field[1], str(cur.fetchone()[0])))

Count of null business_category.category: 0
Count of null review_text.word: 0
Count of null tip_text.word: 0
Count of null yelp_user_elite.year: 0
Count of null yelp_user_friend.friend_id: 0


##### 4.2.1.2 Script unit test : Sentiment analysis result check

In [59]:
sentiment_analysis_tables = [
    "review",
    "tip"
]

with conn.cursor() as cur:
    for table_name in sentiment_analysis_tables:
        cur.execute("select count(*) from {} where sentiment not in ('POSITIVE', 'NEGATIVE', 'NEUTRAL')".format(table_name))
        print("Invalid value count of {} table: {}".format(table_name, str(cur.fetchone()[0])))

Invalid value count of review table: 0
Invalid value count of tip table: 0


##### 4.2.2 Count check

In [53]:
table_names = [
    "business",
    "business_category",
    "review",
    "review_text",
    "tip",
    "tip_text",
    "checkin",
    "yelp_user",
    "yelp_user_elite",
    "yelp_user_friend"
]

with conn.cursor() as cur:
    for table_name in table_names:
        cur.execute("select count(*) from {}".format(table_name))
        print("Count of {} table: {}".format(table_name, str(cur.fetchone()[0])))

Count of business table: 0
Count of business_category table: 341
Count of review table: 100
Count of review_text table: 6960
Count of tip table: 100
Count of tip_text table: 706
Count of checkin table: 13961
Count of yelp_user table: 100
Count of yelp_user_elite table: 263
Count of yelp_user_friend table: 63777


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.