This notebook demonstrates how to use Spark DataFrames to process large amounts of data. I will be using a compressed version of the Yelp Academic Dataset. The dataset is not included in the GitHub repo, but you should be able to download it easily from Yelp's website.

In [1]:
# Set up a Spark Session
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName('My First Spark application') \
    .getOrCreate() 

sc = spark.sparkContext

There are 5 datasets in total, covering business attributes, user attributes, checkins, reviews and tips. You can get details about each dataframe by calling .printSchema().

In [2]:
# Import datasets
business = spark.read.json('../non_auto_assignments/data/yelp_academic/yelp_academic_dataset_business.json.gz')
checkin = spark.read.json('../non_auto_assignments/data/yelp_academic/yelp_academic_dataset_checkin.json.gz')
review = spark.read.json('../non_auto_assignments/data/yelp_academic/yelp_academic_dataset_review.json.gz')
tip = spark.read.json('../non_auto_assignments/data/yelp_academic/yelp_academic_dataset_tip.json.gz')
user = spark.read.json('../non_auto_assignments/data/yelp_academic/yelp_academic_dataset_user.json.gz')

In [3]:
# Inspect the business dataset
business.printSchema()

root
 |-- address: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- AcceptsInsurance: string (nullable = true)
 |    |-- AgesAllowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: string (nullable = true)
 |    |-- BYOB: string (nullable = true)
 |    |-- BYOBCorkage: string (nullable = true)
 |    |-- BestNights: string (nullable = true)
 |    |-- BikeParking: string (nullable = true)
 |    |-- BusinessAcceptsBitcoin: string (nullable = true)
 |    |-- BusinessAcceptsCreditCards: string (nullable = true)
 |    |-- BusinessParking: string (nullable = true)
 |    |-- ByAppointmentOnly: string (nullable = true)
 |    |-- Caters: string (nullable = true)
 |    |-- CoatCheck: string (nullable = true)
 |    |-- Corkage: string (nullable = true)
 |    |-- DietaryRestrictions: string (nullable = true)
 |    |-- DogsAllowed: string (nullable = true)
 |    |-- DriveThru: string (nullable = true)
 |    |-- GoodForDancing: str

In [4]:
# Inspect the user dataset
user.printSchema()

root
 |-- average_stars: double (nullable = true)
 |-- compliment_cool: long (nullable = true)
 |-- compliment_cute: long (nullable = true)
 |-- compliment_funny: long (nullable = true)
 |-- compliment_hot: long (nullable = true)
 |-- compliment_list: long (nullable = true)
 |-- compliment_more: long (nullable = true)
 |-- compliment_note: long (nullable = true)
 |-- compliment_photos: long (nullable = true)
 |-- compliment_plain: long (nullable = true)
 |-- compliment_profile: long (nullable = true)
 |-- compliment_writer: long (nullable = true)
 |-- cool: long (nullable = true)
 |-- elite: string (nullable = true)
 |-- fans: long (nullable = true)
 |-- friends: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- name: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- yelping_since: string (nullable = true)



In [7]:
# Inspect the tip dataset
tip.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- compliment_count: long (nullable = true)
 |-- date: string (nullable = true)
 |-- text: string (nullable = true)
 |-- user_id: string (nullable = true)



# Sentiment Analysis With Restaurant Reviews


We all know that reading user reviews is a reliable way to decide whether to dine at a particular restaurant. But what exactly makes a review positive or negative? Yelp has done a great job embedding sentiment into reviews by allowing users to 'react' to other users' reviews. Which brings me to the next question: how do we know when a review is 'funny' or 'cool'? Let's find out.

First, let's get familiar with Spark by answering a few basic questions. How many users have received a whole lot of cool compliments (> 5000)? By reading the schemas of the datasets, we know that the answer lies in the user dataframe.

In [5]:
# Let's print the first line of the user df out
user.head()

Row(average_stars=4.03, compliment_cool=1, compliment_cute=0, compliment_funny=1, compliment_hot=2, compliment_list=0, compliment_more=0, compliment_note=1, compliment_photos=0, compliment_plain=1, compliment_profile=0, compliment_writer=2, cool=25, elite='2015,2016,2017', fans=5, friends='c78V-rj8NQcQjOI8KP3UEA, alRMgPcngYSCJ5naFRBz5g, ajcnq75Z5xxkvUSmmJ1bCg, BSMAmp2-wMzCkhTfq9ToNg, jka10dk9ygX76hJG0gfPZQ, dut0e4xvme7QSlesOycHQA, l4l5lBnK356zBua7B-UJ6Q, 0HicMOOs-M_gl2eO-zES4Q, _uI57wL2fLyftrcSFpfSGQ, T4_Qd0YWbC3co6WSMw4vxg, iBRoLWPtWmsI1kdbE9ORSA, xjrUcid6Ymq0DoTJELkYyw, GqadWVzJ6At-vgLzK_SKgA, DvB13VJBmSnbFXBVBsKmDA, vRP9nQkYTeNioDjtxZlVhg, gT0A1iN3eeQ8EMAjJhwQtw, 6yCWjFPtp_AD4x93WAwmnw, 1dKzpNnib-JlViKv8_Gt5g, 3Bv4_JxHXq-gVLOxYMQX0Q, ikQyfu1iViYh8T0us7wiFQ, f1GGltNaB7K5DR1jf3dOmg, tgeFUChlh7v8bZFVl2-hjQ, -9-9oyXlqsMG2he5xIWdLQ, Adj9fBPVJad8vSs-mIP7gw, Ce49RY8CKXVsTifxRYFTsw, M1_7TLi8CbdA89nFLlH4iw, wFsNv-hqbW_F5-IRqfBN6g, 0Q1L7zXHocaUZ2gsG2XJeg, cBFgmOCBdhYa0xoFEAzp_g, VrD_AgiFvzqtl

To get the number of users with more than 5000 cool compliments, the process is:
- Filter the dataframe to only users whose compliment_cool is greater than 5000.
- Use count() to count the matching rows inside Spark. This avoids pulling every matching row (including the long friends string) back to the driver with collect() just to call len() on the list.

In [6]:
# Users with more than 5000 cool compliments

user.filter(user['compliment_cool'] > 5000).count()

79

Next, let's have a look at the businesses. Which businesses are the most complimented? To find out, we will need to look at the tip dataframe.

In [8]:
# Inspect the datatypes in the tip df
tip.dtypes

[('business_id', 'string'),
 ('compliment_count', 'bigint'),
 ('date', 'string'),
 ('text', 'string'),
 ('user_id', 'string')]

The tip df does contain compliment counts, but it does not contain the business names. To get them, we'll first need to join with the business df on business_id. The next step is to group by name and sum the number of compliments. Finally, sort the df in descending order so that the top 10 business names are displayed.
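To make the join, group, sum and sort steps concrete, here is a minimal plain-Python sketch on a handful of made-up rows. The business ids, names and counts are hypothetical and only stand in for the tip and business dataframes; it needs no Spark session.

```python
from collections import defaultdict

# Hypothetical toy rows standing in for the tip and business dataframes.
tips = [
    {"business_id": "b1", "compliment_count": 2},
    {"business_id": "b2", "compliment_count": 1},
    {"business_id": "b1", "compliment_count": 3},
    {"business_id": "b3", "compliment_count": 4},
]
businesses = [
    {"business_id": "b1", "name": "Cafe A"},
    {"business_id": "b2", "name": "Diner B"},
    {"business_id": "b3", "name": "Cafe A"},  # same name, different location
]

# Join: look up each tip's business name by business_id.
name_by_id = {b["business_id"]: b["name"] for b in businesses}

# Group by name and sum the compliment counts.
totals = defaultdict(int)
for t in tips:
    totals[name_by_id[t["business_id"]]] += t["compliment_count"]

# Sort in descending order and keep the top entries.
top = sorted(totals.items(), key=lambda kv: kv[1], reverse=True)[:10]
print(top)
```

Because the grouping key is the name and not the business_id, every location that shares a name is pooled into one total. That is worth keeping in mind when reading a ranking built this way.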

In [9]:
import pyspark.sql.functions as func

In [10]:
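# Build the aggregated dataframe first. show() only prints and returns None,
# so it is called separately to keep result usable afterwards.
result = tip \
.join(business, tip.business_id == business.business_id) \
.groupby('name') \
.agg(func.sum("compliment_count") \
     .alias('total_compliment')) \
.orderBy('total_compliment', ascending=False)

result.show(10)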

+--------------------+----------------+
|                name|total_compliment|
+--------------------+----------------+
|           Starbucks|             213|
|Il Chianti Italia...|              68|
|          McDonald's|              62|
|McCarran Internat...|              54|
|    Bacchanal Buffet|              50|
|     Costco Gasoline|              45|
| Walmart Supercenter|              43|
|     In-N-Out Burger|              43|
|        Trader Joe's|              41|
|Chipotle Mexican ...|              40|
+--------------------+----------------+
only showing top 10 rows



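Unsurprisingly, a lot of the big names show up, like Starbucks and McDonald's. It's also interesting that most of the top 10 are places to eat or drink. The clear exceptions are Costco Gasoline, Walmart Supercenter and Trader Joe's, and the truncated "McCarran Internat..." entry does not look like a restaurant either. I wonder if I would ever leave a review for a supermarket!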

Next, let's take a look at the reviews. What are the top 10 most useful positive reviews? For this question, we'll use the review df. Note that the reviews should be **positive** reviews: only reviews with 3 stars or more are considered positive.

In [11]:
# Inspect the review dataset
review.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)



In [12]:
# Print out the first row
review.head()

Row(business_id='ujmEBvifdJM6h6RLv4wQIg', cool=0, date='2013-05-07 04:34:36', funny=1, review_id='Q1sbwvVQXV2734tPgoKj4Q', stars=1.0, text='Total bill for this horrible service? Over $8Gs. These crooks actually had the nerve to charge us $69 for 3 pills. I checked online the pills can be had for 19 cents EACH! Avoid Hospital ERs at all costs.', useful=6, user_id='hG7b0MtEbXx5QzbzE6C_VA')

The result can be obtained by first keeping only the positive reviews (3 stars and above), then grouping by review id and summing the useful counts. Sort the result in descending order and output the top 10.
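As a sanity check on the logic, here is a plain-Python sketch on hypothetical rows. It assumes each review_id appears only once; under that assumption, grouping by review_id and summing leaves every useful count unchanged, so filtering and sorting are enough to get the same ranking.

```python
# Hypothetical rows standing in for the review dataframe.
reviews = [
    {"review_id": "r1", "stars": 5.0, "useful": 12},
    {"review_id": "r2", "stars": 1.0, "useful": 40},
    {"review_id": "r3", "stars": 3.0, "useful": 7},
    {"review_id": "r4", "stars": 4.0, "useful": 30},
]

# Keep only positive reviews (3 stars and above).
positive = [r for r in reviews if r["stars"] >= 3.0]

# Sort by usefulness, highest first, and keep the top 10.
top = sorted(positive, key=lambda r: r["useful"], reverse=True)[:10]
ids = [r["review_id"] for r in top]
print(ids)
```

Note that r2 has the highest useful count but is dropped because it is a 1-star review.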

In [13]:
review \
.filter(review['stars'] >= 3.0) \
.groupby('review_id') \
.agg(func.sum("useful") \
     .alias('most_useful')) \
.orderBy('most_useful', ascending=False) \
.show(10)

+--------------------+-----------+
|           review_id|most_useful|
+--------------------+-----------+
|O1YX1g7Wbf0rmcoud...|        808|
|1lGXlyq4MALOMx17v...|        358|
|7BNr0xFRpOO4PKvbv...|        333|
|RfqGVszoXCw5YhDxj...|        303|
|gAUkgn4dTO-R2n5LB...|        278|
|5S985RjfmDJYsJvUt...|        244|
|0nr6SQFKpR6JCYl1z...|        241|
|-hRpmcavsC0UDI_Qs...|        235|
|ScbDdrWZLmPdHDD1-...|        229|
|UlAVHI58m5XLWvzzh...|        228|
+--------------------+-----------+
only showing top 10 rows



That isn't particularly useful (ironically), but bear with me here. We will be using this for the sentiment analysis in just a bit. Before we get there, let's look at one more thing: during what hour of the day do most checkins occur? As the question suggests, we shall be inspecting the checkin dataframe.

In [14]:
checkin.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- date: string (nullable = true)



In [15]:
checkin.head()

Row(business_id='--1UhMGODdWsrMastO9DZw', date='2016-04-26 19:49:16, 2016-08-30 18:36:57, 2016-10-15 02:45:18, 2016-11-18 01:54:50, 2017-04-20 18:39:06, 2017-05-03 17:58:02')

First, we'll split the string of dates into individual dates, similar to what was done in the lecture.

In [16]:
from pyspark.sql.functions import udf, explode, hour, to_timestamp
from pyspark.sql.types import ArrayType,StringType
datesplit = udf(lambda x: x.split(','), ArrayType(StringType()))

In [17]:
checkin_time = checkin \
.select('business_id', datesplit('date').alias('dates')) \
.withColumn('checkin_date', explode('dates'))

The next step is to convert each date string into a timestamp and extract the hour value from it.

In [18]:
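# Trim each entry before parsing: splitting on ',' leaves a leading space
# on every date after the first, which the timestamp pattern does not expect.
checkin_time_converted = checkin_time \
.select('business_id', 'dates', 'checkin_date') \
.withColumn('hour', hour(to_timestamp(func.trim('checkin_date'), 'yyyy-MM-dd HH:mm:ss')))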

In [19]:
checkin_time_converted.first()

Row(business_id='--1UhMGODdWsrMastO9DZw', dates=['2016-04-26 19:49:16', ' 2016-08-30 18:36:57', ' 2016-10-15 02:45:18', ' 2016-11-18 01:54:50', ' 2017-04-20 18:39:06', ' 2017-05-03 17:58:02'], checkin_date='2016-04-26 19:49:16', hour=19)
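The split, parse and hour-extraction steps can also be traced by hand on this one row. The sketch below uses only the standard library and the date string printed for the first checkin row; it mirrors the logic and is not the Spark code itself.

```python
from datetime import datetime

# The date string of the first checkin row shown above.
raw = ("2016-04-26 19:49:16, 2016-08-30 18:36:57, 2016-10-15 02:45:18, "
       "2016-11-18 01:54:50, 2017-04-20 18:39:06, 2017-05-03 17:58:02")

# Split on commas, then strip the space that follows each comma.
stamps = [s.strip() for s in raw.split(",")]

# Parse each timestamp and keep only the hour of day.
hours = [datetime.strptime(s, "%Y-%m-%d %H:%M:%S").hour for s in stamps]
print(hours)
```

The strip matters: every entry after the first starts with a space, as the dates list in the row above shows, and a strict parser may reject such strings.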

The last step is to group by the hour.

In [20]:
checkin_time_converted \
.groupby('hour') \
.agg(func.count("business_id") \
     .alias('checkin_count')) \
.orderBy('checkin_count', ascending=False) \
.show(5)

+----+-------------+
|hour|checkin_count|
+----+-------------+
|   1|      1561788|
|  19|      1502271|
|   0|      1491176|
|   2|      1411255|
|  20|      1350195|
+----+-------------+
only showing top 5 rows



Most check-ins occur at 1am, followed by 7pm and midnight. It is rather surprising to see how many checkins happen at or after midnight. However, the dataset does not tell us whether all checkins are recorded in the same timezone, and any timezone difference could affect the number of checkins we attribute to each hour.
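To see how much a timezone offset could move the hour, here is a small illustration. Both the UTC storage and the 7-hour offset are assumptions made up for this example; the dataset confirms neither.

```python
from datetime import datetime, timedelta, timezone

# Assumption for illustration only: the stored timestamp is in UTC and the
# business sits in a zone 7 hours behind UTC. The dataset states neither.
local_zone = timezone(timedelta(hours=-7))

stored = datetime(2016, 10, 15, 2, 45, 18, tzinfo=timezone.utc)
local = stored.astimezone(local_zone)

print(stored.hour, local.hour)
```

Under these assumptions a checkin stored at 2am would really have happened at 7pm the previous evening, which would shift the late-night peak into dinner time.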

**Now to the star of the show: which non-stopwords appear most often in positive reviews but never in negative ones, and vice versa?** Having this insight would be extremely useful for predicting the sentiment of a review later on, for example when reviews do not come with any sentiment attributes the way the reviews in this dataset do.

As an example, consider the following two reviews:

* Positive: The meal was great, and the service was the best we ever experienced.
* Negative: The meal was awful.  It was the worst thing we ever experienced.

Assume our stopwords are {'the', 'was', 'and', 'we', 'it'}

* Positive unique: {'great', 'service', 'best'}

* Negative unique: {'awful', 'worst', 'thing'}

In this example, each unique word occurs just once, so the concept of "top 50" doesn't make sense. For the real data, we'll need to count the number of times each unique word occurs. Using the same definition as before, a review is considered positive if it has 3 stars or more, and negative if it has fewer than 3 stars.
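The toy example can be reproduced in a few lines of plain Python. This is only a sketch of the idea using sets and the two sample sentences; the tokens helper is a name made up for this illustration.

```python
import re

stopwords = {"the", "was", "and", "we", "it"}

def tokens(text):
    """Lowercase, split on non-alphanumerics, drop empty strings and stopwords."""
    parts = re.split("[^A-Za-z0-9]", text.strip().lower())
    return {w for w in parts if w and w not in stopwords}

pos_words = tokens("The meal was great, and the service was the best we ever experienced.")
neg_words = tokens("The meal was awful.  It was the worst thing we ever experienced.")

# Words that appear on one side only.
pos_unique = pos_words - neg_words
neg_unique = neg_words - pos_words
print(sorted(pos_unique), sorted(neg_unique))
```

Words such as 'meal', 'ever' and 'experienced' survive the stopword filter but appear in both reviews, so they drop out of both unique sets.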

In [21]:
# list of stopwords
STOPWORDS_AND_SPACE = ['', 'i', 'we', 'ourselves', 'hers', 'between', 'yourself', 'but', 'again', 'there', 'about', 'once', 'during', 'out', 'very', 'having', 'with', 'they', 'own', 'an', 'be', 'some', 'for', 'do', 'its', 'yours', 'such', 'into', 'of', 'most', 'itself', 'other', 'off', 'is', 's', 'am', 'or', 'who', 'as', 'from', 'him', 'each', 'the', 'themselves', 'until', 'below', 'are', 'we', 'these', 'your', 'his', 'through', 'don', 'nor', 'me', 'were', 'her', 'more', 'himself', 'this', 'down', 'should', 'our', 'their', 'while', 'above', 'both', 'up', 'to', 'ours', 'had', 'she', 'all', 'no', 'when', 'at', 'any', 'before', 'them', 'same', 'and', 'been', 'have', 'in', 'will', 'on', 'does', 'yourselves', 'then', 'that', 'because', 'what', 'over', 'why', 'so', 'can', 'did', 'not', 'now', 'under', 'he', 'you', 'herself', 'has', 'just', 'where', 'too', 'only', 'myself', 'which', 'those', 'i', 'after', 'few', 'whom', 't', 'being', 'if', 'theirs', 'my', 'against', 'a', 'by', 'doing', 'it', 'how', 'further', 'was', 'here', 'than']



In [22]:
# create 2 separate df for positive and negative
pos = review.filter(review['stars'] >= 3.0)
neg = review.filter(review['stars'] < 3.0)

The first step is, for each positive/negative dataframe, to get all the unique words within that dataframe and their corresponding counts. This can be done with the following steps:
- Create a list of stopwords. Note that I also included the empty string, because the regex split used later produces empty tokens wherever two separators are adjacent.
- Split the review text into a list of words, and use explode() to create a separate row for each word.
- Group the dataframe by word and take the count.
- Sort the counts in descending order.

In [23]:
# Split the review string and strip out all the stopwords
import re
# A set makes the membership test fast; the list also contains a few duplicates
STOPWORDS = set(STOPWORDS_AND_SPACE)
stringsplit = udf(lambda line: [w for w in re.split("[^A-Za-z0-9]", line.strip().lower()) if w not in STOPWORDS], ArrayType(StringType()))

In [24]:
pos_split = pos \
.select('review_id', stringsplit('text').alias('words')) \
.withColumn('word', explode('words'))

In [25]:
pos_split.show(20)

+--------------------+--------------------+--------+
|           review_id|               words|    word|
+--------------------+--------------------+--------+
|GJXCdrto3ASJOqKeV...|[adore, travis, h...|   adore|
|GJXCdrto3ASJOqKeV...|[adore, travis, h...|  travis|
|GJXCdrto3ASJOqKeV...|[adore, travis, h...|    hard|
|GJXCdrto3ASJOqKeV...|[adore, travis, h...|    rock|
|GJXCdrto3ASJOqKeV...|[adore, travis, h...|     new|
|GJXCdrto3ASJOqKeV...|[adore, travis, h...|   kelly|
|GJXCdrto3ASJOqKeV...|[adore, travis, h...|cardenas|
|GJXCdrto3ASJOqKeV...|[adore, travis, h...|   salon|
|GJXCdrto3ASJOqKeV...|[adore, travis, h...|       m|
|GJXCdrto3ASJOqKeV...|[adore, travis, h...|  always|
|GJXCdrto3ASJOqKeV...|[adore, travis, h...|     fan|
|GJXCdrto3ASJOqKeV...|[adore, travis, h...|   great|
|GJXCdrto3ASJOqKeV...|[adore, travis, h...| blowout|
|GJXCdrto3ASJOqKeV...|[adore, travis, h...|stranger|
|GJXCdrto3ASJOqKeV...|[adore, travis, h...|  chains|
|GJXCdrto3ASJOqKeV...|[adore, travis, h...|   

In [32]:
pos_words_count = pos_split \
.groupby('word') \
.agg(func.count('review_id') \
     .alias('word_count')) \
.orderBy('word_count', ascending=False)
     

We'll do the same for the negative dataframe:

In [27]:
neg_split = neg \
.select('review_id', stringsplit('text').alias('words')) \
.withColumn('word', explode('words'))

In [28]:
neg_split.show(20)

+--------------------+--------------------+--------+
|           review_id|               words|    word|
+--------------------+--------------------+--------+
|Q1sbwvVQXV2734tPg...|[total, bill, hor...|   total|
|Q1sbwvVQXV2734tPg...|[total, bill, hor...|    bill|
|Q1sbwvVQXV2734tPg...|[total, bill, hor...|horrible|
|Q1sbwvVQXV2734tPg...|[total, bill, hor...| service|
|Q1sbwvVQXV2734tPg...|[total, bill, hor...|     8gs|
|Q1sbwvVQXV2734tPg...|[total, bill, hor...|  crooks|
|Q1sbwvVQXV2734tPg...|[total, bill, hor...|actually|
|Q1sbwvVQXV2734tPg...|[total, bill, hor...|   nerve|
|Q1sbwvVQXV2734tPg...|[total, bill, hor...|  charge|
|Q1sbwvVQXV2734tPg...|[total, bill, hor...|      us|
|Q1sbwvVQXV2734tPg...|[total, bill, hor...|      69|
|Q1sbwvVQXV2734tPg...|[total, bill, hor...|       3|
|Q1sbwvVQXV2734tPg...|[total, bill, hor...|   pills|
|Q1sbwvVQXV2734tPg...|[total, bill, hor...| checked|
|Q1sbwvVQXV2734tPg...|[total, bill, hor...|  online|
|Q1sbwvVQXV2734tPg...|[total, bill, hor...|   

In [33]:
neg_words_count = neg_split \
.groupby('word') \
.agg(func.count('review_id') \
     .alias('word_count')) \
.orderBy('word_count', ascending=False)

The code above gives all the unique words and their counts, but it does not yet tell us whether a word is unique to positive reviews, i.e. whether it appears in the positive df but not in the negative df, and vice versa. To do this, we can leverage **Spark's left anti-join.** A left anti-join returns only the rows of the first dataframe that have no match in the second dataframe.
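The effect of a left anti-join on word counts can be sketched in plain Python with two Counter objects. The word lists below are hypothetical and only stand in for the exploded word columns.

```python
from collections import Counter

# Hypothetical word lists, standing in for the exploded 'word' columns.
pos_words = ["great", "food", "ahhmazing", "great", "service", "ahhmazing"]
neg_words = ["food", "horable", "service", "horable", "rude"]

pos_counts = Counter(pos_words)
neg_counts = Counter(neg_words)

# Left anti-join on the word: keep left-side rows with no match on the right.
pos_only = {w: c for w, c in pos_counts.items() if w not in neg_counts}
neg_only = {w: c for w, c in neg_counts.items() if w not in pos_counts}

print(sorted(pos_only.items(), key=lambda kv: -kv[1]))
print(sorted(neg_only.items(), key=lambda kv: -kv[1]))
```

A single occurrence on the other side is enough to remove a word, no matter how frequent it is on its own side. Common sentiment words therefore tend to drop out, and what survives is mostly rare names and unusual spellings.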

In [34]:
pos_unique = pos_words_count \
.join(neg_words_count, on=['word'], how='left_anti') \
.orderBy('word_count', ascending=False)

# show() returns None, so call it separately to keep pos_unique as a dataframe
pos_unique.show(50)


+--------------------+----------+
|                word|word_count|
+--------------------+----------+
|               eloff|       302|
|                jabz|       202|
|              fixler|       168|
|         ahhhhmazing|       167|
|              popbar|       146|
|           heartwood|       143|
|                emme|       137|
|              boothe|       134|
|               artur|       126|
|           delizioso|       124|
|                homa|       123|
|               safak|       119|
|                 f45|       117|
|              sidell|       114|
|        shutterbooth|       110|
|                meux|       107|
|y3fcl4bly0ellkb0s...|       106|
|               hubba|       105|
|            perfects|       104|
|              exquis|       101|
|                 imr|       100|
|           merveille|        98|
|             hobgood|        98|
|             koshari|        97|
|           ahhmazing|        95|
|               trego|        89|
|             

In [35]:
# Same with negative
neg_unique = neg_words_count \
.join(pos_words_count, on=['word'], how='left_anti') \
.orderBy('word_count', ascending=False)

# show() returns None, so call it separately to keep neg_unique as a dataframe
neg_unique.show(50)

+---------------+----------+
|           word|word_count|
+---------------+----------+
|   ripoffreport|        72|
|consumeraffairs|        41|
|   unempathetic|        38|
|      discusted|        30|
|           fahw|        30|
|         voiers|        30|
|     unlawfully|        28|
|     nonpayment|        27|
|            ybt|        26|
|            amj|        26|
|         theifs|        26|
|            hra|        23|
|           azag|        23|
|           tkps|        23|
|        horable|        22|
|       comenity|        22|
|        frauded|        21|
|       cobraron|        20|
|        suppost|        20|
|          nedia|        20|
|       belitsky|        19|
|      fraudster|        19|
|   incompetance|        19|
|            1ns|        19|
|        searshc|        19|
|          fmcsa|        19|
|      repossess|        19|
|         rebill|        19|
|          emele|        18|
|       thummala|        18|
|          azdhs|        17|
|          usa

The result is astonishing. For example, check out the most frequent word in the positive review bucket: **"eloff"**. Does it ring a bell? Not so much. And yet for some reason, this word reigns as champion. Because these reviews are written by humans, it is not so surprising to see words like **"ahhhhmazing"** (heck, I do that a lot myself). The same goes for the negative reviews' results, where words like **"discusted"** show up near the top. I guess typos aren't so important when you are writing an angry review :)

This is an important consideration when constructing a library of words and their associated sentiments. Not all words are spelled correctly, and yet they contain important information about how a user feels about a business!

