<img src="yelp_logo.png" width="400">

# Analyzing Yelp Reviews

### Import libraries

In [1]:
from pyspark.sql import SparkSession
import pyspark
from pyspark import sql
import re
import pandas as pd
import numpy as np

### Create Spark session

In [2]:
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

### Load Data

#### Dataset available at: https://www.kaggle.com/yelp-dataset/yelp-dataset

In [3]:
path = '/Users/ryanfrench/Dropbox/cloud_documents/college/9_fall_2019/IST_718/yelp_dataset/yelp_academic_dataset_review.json'
reviews_df = spark.read.json(path)
reviews_df.count()

6685900

# REMEMBER TO REMOVE THIS, THIS SAMPLES ONLY 1/1000 OF THE DATA!

In [4]:
reviews_df = reviews_df.sample(False, 0.001, seed = 123)

### Check schema

In [87]:
# Check schema
reviews_df.printSchema()

root
 |-- label: integer (nullable = false)
 |-- text: string (nullable = true)
 |-- f_text: string (nullable = true)



### Check number of NULL values per column

In [88]:
for col in reviews_df.columns:
    print(col, ':', reviews_df.filter(reviews_df[col].isNull()).count())

label : 0
text : 0
f_text : 0


### View data

In [89]:
reviews_df.show(5)
reviews_df.count()

+-----+--------------------+--------------------+
|label|                text|              f_text|
+-----+--------------------+--------------------+
|    1|I absolutely love...|i absolutely love...|
|    1|This is so much f...|this is so much f...|
|    1|The pollo mole is...|the pollo mole is...|
|    1|We recently hit u...|we recently hit u...|
|    1|Love this locatio...|love this locatio...|
+-----+--------------------+--------------------+
only showing top 5 rows



4531

## Let's Visualize our Data

In [90]:
plt_reviews_df = reviews_df.toPandas()

In [91]:
# Check schema
reviews_df.printSchema()

root
 |-- label: integer (nullable = false)
 |-- text: string (nullable = true)
 |-- f_text: string (nullable = true)



### Log Distribution of Users who Found a Review Funny

In [92]:
import plotly.express as px
import pyspark.sql.functions as F

fig = px.histogram(x=np.log(plt_reviews_df['funny']),
                   nbins=15)
fig = fig.update_layout(
        title_text='Log Distribution of Users who Found a Review Funny',
        xaxis_title_text='Number of Users',
        yaxis_title_text='Frequency',
        bargap=0.2,
        bargroupgap=0.1
        )

fig.show()

KeyError: 'funny'

#### The vast majority of reviews have not been marked as funny by any other users.

### Pie chart of stars per review

In [None]:
import plotly.graph_objects as go

labels = ['1 Stars', '2 Stars', '3 Stars', '4 Stars', '5 Stars']

fig = go.Figure(data=[go.Pie(labels=labels, values=plt_reviews_df.groupby(['stars']).size())],
                layout=go.Layout(title=go.layout.Title(text='Distribution of Reviews by Star Count')))
fig.show()

#### As evidenced above, we have quite a skew towards 5 star reviews in our data which we will need to keep in mind later.

### Distribution of Review Length

In [None]:
import plotly.express as px

fig = px.histogram(x=plt_reviews_df['text'].astype(str).map(len),
                   nbins=75)
fig = fig.update_layout(
        title_text='Distribution of Reviews by Text Length',
        xaxis_title_text='Length of Review',
        yaxis_title_text='Frequency',
        bargap=0.2,
        bargroupgap=0.1
        )

fig.show()

#### It appears that review length is unsurprisingly heavily right skewed with the majority of reviews coming in at under 500 characters.

### Log Distribution of Users who Found a Review Useful

In [None]:
import plotly.express as px

fig = px.histogram(x=np.log(plt_reviews_df['useful']),
                   nbins=15)
fig = fig.update_layout(
        title_text='Log Distribution of Users who Found a Review Useful',
        xaxis_title_text='Number of Users',
        yaxis_title_text='Frequency',
        bargap=0.2,
        bargroupgap=0.1
        )

fig.show()

#### The vast majority of reviews have not been marked as useful by any other users.

## Can we predict high star Yelp reviews based off of their text content?

### By creating a text transformation and prediction pipeline can we predict which Yelp reviews will have a high number of stars (4/5 or above)?

### Create dummy variable for high reviews (defined as a 4/5 or above)

#### By creating a dummy variable we can easily predict the binary outcomes of low vs high reviews.

In [None]:
from pyspark.sql import Row
from pyspark.sql.functions import col, when

In [None]:
reviews_df = reviews_df.withColumn('label',when(col('stars') >= 4, 1).otherwise(0))
reviews_df = reviews_df.select('label', 'text')
reviews_df.groupBy('label').count().show()

#### As we can see from the output, we have an imbalance in regard to our data's proportions. In order to rectify this, we will be downsample our prediction data.

In [16]:
from sklearn.utils import resample

# Separate labels into dataframes
high_reviews_df = reviews_df.where('label=1')
low_reviews_df = reviews_df.where('label=0')

# Get ratio of low reviews to high reviews
ratio = low_reviews_df.count() / high_reviews_df.count()

# Downsample
ds_high_reviews_df = high_reviews_df.sample(False, ratio, seed=0)
 
# Recombine
reviews_df = ds_high_reviews_df.union(low_reviews_df)

### Check new score distributions

In [17]:
reviews_df.groupBy('label').count().show()

+-----+-----+
|label|count|
+-----+-----+
|    1| 2287|
|    0| 2244|
+-----+-----+



## Prepare the text data for analysis

### Remove special characters from reviews and create new column containing filtered text

In [18]:
from pyspark.sql.functions import regexp_replace

reviews_df = reviews_df.withColumn('f_text', regexp_replace(col('text'), '[.,/#!$%^&*;:{}=_`~()-]', ''))

In [19]:
reviews_df.select('f_text').show(1, truncate = False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|f_text                                                                                                                                                                                                                                                                              

### Remove numbers

In [20]:
from pyspark.sql.functions import regexp_replace

reviews_df = reviews_df.withColumn('f_text', regexp_replace(col('f_text'), '[\d-]', ''))

In [21]:
reviews_df.select('f_text').show(1, truncate = False)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|f_text                                                                                                                                                                                                                                                                               

### Reduce instances of multiple whitespaces to just one

In [22]:
reviews_df = reviews_df.withColumn('f_text', regexp_replace(col('f_text'),"\\s+"," "))

In [23]:
reviews_df.select('f_text').show(1, truncate = False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|f_text                                                                                                                                                                                                                                                                                 

### Convert all words to lowercase for consistency

In [24]:
from pyspark.sql.functions import lower

reviews_df = reviews_df.withColumn('f_text', lower(col('f_text')))

In [25]:
reviews_df.select('f_text').show(1, truncate = False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|f_text                                                                                                                                                                                                                                                                                 

### Tokenize 'f_text' column

#### By tokenizing the 'f_text' column we can break out the long strings of our formatted review text into their individual words.

In [26]:
from pyspark.ml.feature import Tokenizer

tokenizer = Tokenizer().setInputCol('f_text').setOutputCol('tokens')

#### Sample output of Tokenizer

In [27]:
tokenizer.transform(reviews_df).show()

+-----+--------------------+--------------------+--------------------+
|label|                text|              f_text|              tokens|
+-----+--------------------+--------------------+--------------------+
|    1|I absolutely love...|i absolutely love...|[i, absolutely, l...|
|    1|This is so much f...|this is so much f...|[this, is, so, mu...|
|    1|The pollo mole is...|the pollo mole is...|[the, pollo, mole...|
|    1|We recently hit u...|we recently hit u...|[we, recently, hi...|
|    1|Love this locatio...|love this locatio...|[love, this, loca...|
|    1|I've been coming ...|i've been coming ...|[i've, been, comi...|
|    1|The wife and I ha...|the wife and i ha...|[the, wife, and, ...|
|    1|Aly did a wonderf...|aly did a wonderf...|[aly, did, a, won...|
|    1|Not sure why this...|not sure why this...|[not, sure, why, ...|
|    1|Our Louie looks s...|our louie looks s...|[our, louie, look...|
|    1|Where to start, n...|where to start no...|[where, to, start...|
|    1

### Get NTLK stop words from GitHub

#### Stop words are words which are generally considered to offer little insight in linguistic processing due to the fact that they are utilized so frequently. We will be utilizing the list contained in the Natural Language Tool Kit (NLTK) to remove these words in this case.

In [28]:
import requests

try:
    stop_words = requests.get('https://gist.github.com/sebleier/554280').text.split()
except Exception as e:
    print(e)

### Remove stop words

In [29]:
from pyspark.ml.feature import StopWordsRemover

sw_filter = StopWordsRemover(). \
            setStopWords(stop_words). \
            setCaseSensitive(False). \
            setInputCol("tokens"). \
            setOutputCol("filtered_tokens")

# ADJUST THE MINDF PARAM WITH FULL REVIEWS TAKEN INTO ACCOUNT

### Create CountVectorizer Estimator

#### The CountVectorizer counts the number of times that a token appears in each review and then saves this list of numbers as a vector for each review.

In [30]:
from pyspark.ml.feature import CountVectorizer

# we will remove words that appear in 50 docs or less
cv = CountVectorizer(minTF=1., minDF=6, vocabSize=2**17). \
     setInputCol("filtered_tokens"). \
     setOutputCol("tf")

### Create initial text processing pipeline

In [31]:
from pyspark.ml import Pipeline

cv_pipeline = Pipeline(stages=[tokenizer, sw_filter, cv]).fit(reviews_df)

#### Sample output of initial text processing pipeline

In [32]:
cv_pipeline.transform(reviews_df).show(5)

+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|label|                text|              f_text|              tokens|     filtered_tokens|                  tf|
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|    1|I absolutely love...|i absolutely love...|[i, absolutely, l...|[absolutely, chat...|(3991,[0,4,5,6,8,...|
|    1|This is so much f...|this is so much f...|[this, is, so, mu...|[fun, drove, scoo...|(3991,[6,7,16,49,...|
|    1|The pollo mole is...|the pollo mole is...|[the, pollo, mole...|[pollo, mole, wor...|(3991,[3,15,19,67...|
|    1|We recently hit u...|we recently hit u...|[we, recently, hi...|[recently, hit, c...|(3991,[0,1,3,8,10...|
|    1|Love this locatio...|love this locatio...|[love, this, loca...|[location, great,...|(3991,[4,9,17,31,...|
+-----+--------------------+--------------------+--------------------+--------------------+-----

### Create Inverse Document Frequency (IDF) stage for the pipeline

#### This will lower the realtive value of tokens which appear frequently in documents allowing those that are more specialized to stand out more.

In [33]:
from pyspark.ml.feature import IDF

idf = IDF(). \
      setInputCol('tf'). \
      setOutputCol('tfidf')

### Create IDF text processing pipeline

In [34]:
idf_pipeline = Pipeline(stages=[cv_pipeline, idf]).fit(reviews_df)

#### Sample output of IDF text processing pipeline

In [35]:
idf_pipeline.transform(reviews_df).show(5)

+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|label|                text|              f_text|              tokens|     filtered_tokens|                  tf|               tfidf|
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|    1|I absolutely love...|i absolutely love...|[i, absolutely, l...|[absolutely, chat...|(3991,[0,4,5,6,8,...|(3991,[0,4,5,6,8,...|
|    1|This is so much f...|this is so much f...|[this, is, so, mu...|[fun, drove, scoo...|(3991,[6,7,16,49,...|(3991,[6,7,16,49,...|
|    1|The pollo mole is...|the pollo mole is...|[the, pollo, mole...|[pollo, mole, wor...|(3991,[3,15,19,67...|(3991,[3,15,19,67...|
|    1|We recently hit u...|we recently hit u...|[we, recently, hi...|[recently, hit, c...|(3991,[0,1,3,8,10...|(3991,[0,1,3,8,10...|
|    1|Love this locatio...|love this locatio...|[love, this, 

### Create Text Frequency Inverse Document Frequency (TFIDF) Reviews Dataframe

#### By passing the reviews data frame to the IDF pipeline we can create a TFIDF dataframe which we will then use to predict the review scores.

In [36]:
tfidf_df = idf_pipeline.transform(reviews_df)
pd_tfidf_df = tfidf_df.toPandas()

In [37]:
tfidf_df.columns

['label', 'text', 'f_text', 'tokens', 'filtered_tokens', 'tf', 'tfidf']

In [38]:
lr_df = pd_tfidf_df[['label', 'tfidf']]
lr_df.head()

Unnamed: 0,label,tfidf
0,1,"(1.1204735206397498, 0.0, 0.0, 0.0, 1.31477452..."
1,1,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.3627433380474..."
2,1,"(0.0, 0.0, 0.0, 1.182579279393553, 0.0, 0.0, 0..."
3,1,"(2.2409470412794996, 1.1090371363231102, 0.0, ..."
4,1,"(0.0, 0.0, 0.0, 0.0, 1.3147745291603699, 0.0, ..."


### Create prediction pipeline for estimating high star reviews

### Split data into training, validation, and testing groups

In [39]:
train_df, validation_df, test_df = reviews_df.randomSplit([0.6, 0.3, 0.1], seed=0)

#### Number of training records

In [40]:
train_df.count()

2727

#### Number of validation records

In [41]:
validation_df.count()

1362

#### Number of testing records

In [42]:
test_df.count()

442

### Build logistic regression model for later predictions

In [43]:
from pyspark.ml.classification import LogisticRegression

In [44]:
en_lr = LogisticRegression(). \
        setLabelCol('label'). \
        setFeaturesCol('tfidf'). \
        setRegParam(0.0). \
        setMaxIter(100). \
        setElasticNetParam(0.)

### Create new pipeline for prediction

In [45]:
en_lr_estimator = Pipeline(stages=[tokenizer, 
                                   sw_filter, 
                                   cv, 
                                   idf, 
                                   en_lr])

### Build grid search estimator pipeline

#### This will allow us to iteratively search for the optimal $\lambda$ and $\alpha$ parameters to achieve the highest accuracy.

In [46]:
from pyspark.ml.tuning import ParamGridBuilder

In [47]:
en_lr_estimator.getStages()

[Tokenizer_f6577143fa56,
 StopWordsRemover_182ef4d4ec1f,
 CountVectorizer_4ded9736a8aa,
 IDF_23200a16d74c,
 LogisticRegression_dafa80c14f20]

Lets create a pipeline transformation by chaining the `idf_pipeline` with the logistic regression step (`lr`)

In [48]:
grid = ParamGridBuilder(). \
       addGrid(en_lr.regParam, [0., 0.01, 0.02]). \
       addGrid(en_lr.elasticNetParam, [0., 0.2, 0.4]). \
       build()

#### View grid parameters

In [49]:
grid

[{Param(parent='LogisticRegression_dafa80c14f20', name='regParam', doc='regularization parameter (>= 0).'): 0.0,
  Param(parent='LogisticRegression_dafa80c14f20', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0},
 {Param(parent='LogisticRegression_dafa80c14f20', name='regParam', doc='regularization parameter (>= 0).'): 0.0,
  Param(parent='LogisticRegression_dafa80c14f20', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.2},
 {Param(parent='LogisticRegression_dafa80c14f20', name='regParam', doc='regularization parameter (>= 0).'): 0.0,
  Param(parent='LogisticRegression_dafa80c14f20', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.4},
 {Pa

In [50]:
all_models = []

for j in range(len(grid)):
    print("Fitting model {}".format(j+1))
    model = en_lr_estimator.fit(train_df, grid[j])
    all_models.append(model)

Fitting model 1
Fitting model 2
Fitting model 3
Fitting model 4
Fitting model 5
Fitting model 6
Fitting model 7
Fitting model 8
Fitting model 9


In [51]:
import pyspark.sql.functions as fn

accuracies = [m. \
              transform(validation_df). \
              select(fn.avg(fn.expr('float(label = prediction)')).alias('accuracy')). \
              first(). \
              accuracy for m in all_models]

In [52]:
accuracies

[0.7929515418502202,
 0.7929515418502202,
 0.7929515418502202,
 0.8179148311306902,
 0.8252569750367107,
 0.8142437591776799,
 0.8274596182085169,
 0.8274596182085169,
 0.8069016152716594]

In [53]:
import numpy as np

In [54]:
best_model_idx = np.argmax(accuracies)
print("best model index =", best_model_idx)

best model index = 6


#### View the grid parameters of the best model

In [55]:
grid[best_model_idx]

{Param(parent='LogisticRegression_dafa80c14f20', name='regParam', doc='regularization parameter (>= 0).'): 0.02,
 Param(parent='LogisticRegression_dafa80c14f20', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0}

In [56]:
best_model = all_models[best_model_idx]

In [57]:
accuracies[best_model_idx]

0.8274596182085169

In [58]:
best_model. \
    transform(test_df). \
    select(fn.avg(fn.expr('float(label = prediction)')).alias('accuracy')). \
    show()

+------------------+
|          accuracy|
+------------------+
|0.8212669683257918|
+------------------+



### Create pipeline utilizing the best model

In [59]:
en_lr_pipeline = Pipeline(stages=[idf_pipeline, en_lr]).fit(train_df)

### Estimate model accuracy

In [60]:
en_lr_pipeline.transform(validation_df). \
            select(fn.expr('float(prediction = label)').alias('correct')). \
            select(fn.avg('correct')).show()

+------------------+
|      avg(correct)|
+------------------+
|0.7790014684287812|
+------------------+



### Get most significant words in the prediction of both high and low reviews

In [61]:
en_weights = en_lr_pipeline.stages[-1].coefficients.toArray()
en_coeffs_df = pd.DataFrame({'word': en_lr_pipeline.stages[0].stages[0].stages[2].vocabulary,
                             'weight': en_weights})

### View most significant words indicating high reviews

In [62]:
en_coeffs_df.sort_values('weight', ascending=False).head(15)

Unnamed: 0,word,weight
3915,ça,15.486939
3565,yonge,11.661323
2504,unbelievable,10.518981
3814,land,9.979885
4,great,9.439205
1619,josh,9.298235
3487,mocha,9.174781
3919,mon,8.65118
1481,snack,8.649628
2011,relax,8.574402


### View most significant words indicating low reviews

In [63]:
en_coeffs_df.sort_values('weight').head(15)

Unnamed: 0,word,weight
3309,scoop,-13.166549
3205,attended,-10.874645
2405,dans,-10.353636
2866,tv's,-9.394144
426,overpriced,-9.122672
3696,passable,-9.056737
3546,gourmet,-8.966345
3778,quinoa,-8.867422
2758,subs,-8.455331
3774,aok,-8.220361


## Can we improve this accuracy utilizing a Random Forest model?

#### Build Random Forest object

In [64]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(featuresCol = 'tfidf')

### Create estimator utilizing the same preprocessing methods as the Logistic Regression model

In [65]:
rf_estimator = Pipeline(stages=[tokenizer, 
                                sw_filter, 
                                cv, 
                                idf,
                                rf])

### Create pipeline with the estimator

In [66]:
rf_pipeline = rf_estimator.fit(train_df)

### Calculate estimated accuracy 

In [67]:
rf_pipeline.transform(validation_df). \
            select(fn.expr('float(prediction = label)').alias('correct')). \
            select(fn.avg('correct')).show()

+------------------+
|      avg(correct)|
+------------------+
|0.7195301027900147|
+------------------+



### Get most significant words in the prediction of both high and low reviews

In [68]:
rf_model = rf_pipeline.stages[-1]

In [69]:
pd.DataFrame(list(zip(reviews_df.columns[2:], rf_model.featureImportances.toArray())),
            columns = ['column', 'weight']).sort_values('weight')

Unnamed: 0,column,weight
0,f_text,0.001373


### View most significant words indicating high reviews

In [70]:
en_coeffs_df.sort_values('weight', ascending=False).head(15)

Unnamed: 0,word,weight
3915,ça,15.486939
3565,yonge,11.661323
2504,unbelievable,10.518981
3814,land,9.979885
4,great,9.439205
1619,josh,9.298235
3487,mocha,9.174781
3919,mon,8.65118
1481,snack,8.649628
2011,relax,8.574402


### View most significant words indicating low reviews

In [71]:
en_coeffs_df.sort_values('weight').head(15)

Unnamed: 0,word,weight
3309,scoop,-13.166549
3205,attended,-10.874645
2405,dans,-10.353636
2866,tv's,-9.394144
426,overpriced,-9.122672
3696,passable,-9.056737
3546,gourmet,-8.966345
3778,quinoa,-8.867422
2758,subs,-8.455331
3774,aok,-8.220361


### Evaluate performance utilizing a binary classification evaluator

#### Create binary classification evaluator object

In [72]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

bce = BinaryClassificationEvaluator()

### Evaluate Logistic Regression model with Remote Operating Characteristic

In [73]:
bce.evaluate(en_lr_pipeline.transform(validation_df))

0.849293705795948

### Evaluate Random Forest model with Remote Operating Characteristic

In [74]:
bce.evaluate(rf_pipeline.transform(validation_df))

0.81806534272902