In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import plotly as pl
%matplotlib inline

In [2]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Yelp Rating Analysis").config("spark.some.config.option", "some-value").getOrCreate()

In [3]:
spark

In [4]:
yelp_df = pd.read_csv('yelp_ratings.csv')
yelp_df.head()

Unnamed: 0,text,stars,sentiment
0,Total bill for this horrible service? Over $8G...,1.0,0
1,I *adore* Travis at the Hard Rock's new Kelly ...,5.0,1
2,I have to say that this office really has it t...,5.0,1
3,Went in for a lunch. Steak sandwich was delici...,5.0,1
4,Today was my second out of three sessions I ha...,1.0,0


In [5]:
yelp_spark_df = spark.createDataFrame(yelp_df)
yelp_spark_df.printSchema()
yelp_spark_df.show(10)

root
 |-- text: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- sentiment: long (nullable = true)

+--------------------+-----+---------+
|                text|stars|sentiment|
+--------------------+-----+---------+
|Total bill for th...|  1.0|        0|
|I *adore* Travis ...|  5.0|        1|
|I have to say tha...|  5.0|        1|
|Went in for a lun...|  5.0|        1|
|Today was my seco...|  1.0|        0|
|I'll be the first...|  4.0|        1|
|This place has go...|  1.0|        0|
|I was really look...|  2.0|        0|
|Like walking back...|  4.0|        1|
|Walked in around ...|  1.0|        0|
+--------------------+-----+---------+
only showing top 10 rows



In [6]:
# In out text, we can have a lot of punctuations, hashtags, hyperlinks etc
# These are irrelevant to our model and won't give us any information about the sentiment
# So, we need to remove all those things form the text and get only relevant information
# There are 2 ways to perform pre-processing
# 1. By creating a custom function
# 2. By using PySpark functions

# We will use PySpark inbuilt function to perform the analysis and create our custom methods as well
# Like this, we can reduce the number of errors and line of code

In [7]:
!pip install langdetect

[0m

In [8]:
# To check for whitespaces
def check_blanks(data_str):
    is_blank = str(data_str.isspace()) 
    return is_blank

In [9]:
from langdetect import detect

# To determine whether the laguage of the text is english or not
def detect_language(text):
    try:
        return detect(text)
    except:
        return None

In [10]:
import re
import string

# Removing hyperlinks, punctuations, number, mentions etc
def remove_features(data_str): # compile regex
    url_re = re.compile('https?://(www.)?\w+\.\w+(/\w+)*/?') 
    punc_re = re.compile('[%s]' % re.escape(string.punctuation)) 
    num_re = re.compile('(\\d+)')
    mention_re = re.compile('@(\w+)')
    alpha_num_re = re.compile("^[a-z0-9_.]+$")
    # convert to lowercase
    data_str = data_str.lower()
    # remove hyperlinks
    data_str = url_re.sub(' ', data_str)
    # remove @mentions
    data_str = mention_re.sub(' ', data_str)

    # remove puncuation
    data_str = punc_re.sub(' ', data_str)
    # remove numeric 'words'
    data_str = num_re.sub(' ', data_str)
    # remove non a-z 0-9 characters and words shorter than 3 characters 
    list_pos = 0
    cleaned_str = ''
    for word in data_str.split():
        if list_pos == 0:
            if alpha_num_re.match(word) and len(word) > 2:
                cleaned_str = word 
            else:
                cleaned_str = ' '
        else:
            if alpha_num_re.match(word) and len(word) > 2:
                cleaned_str = cleaned_str + ' ' + word 
            else:
                cleaned_str += ' '
        list_pos += 1
    return cleaned_str

In [11]:
import nltk
nltk.download('stopwords')

from nltk.corpus import stopwords

[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/sahilchhillar/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [12]:
# Removing all the stop words
stops = set(stopwords.words("english")) 

def remove_stops(data_str):
    # expects a string
    list_pos = 0
    cleaned_str = ''
    text = data_str.split()
    for word in text:
        if word not in stops:
            # rebuild cleaned_str 
            if list_pos == 0:
                cleaned_str = word 
            else:
                cleaned_str = cleaned_str + ' ' + word
            list_pos += 1
    return cleaned_str  

In [13]:
nltk.download('averaged_perceptron_tagger')
from nltk import pos_tag

# Tagging the text
def tag_and_remove(data_str):
    cleaned_str = ' '
    # noun tags
    nn_tags = ['NN', 'NNP', 'NNP', 'NNPS', 'NNS'] 
    # adjectives
    jj_tags = ['JJ', 'JJR', 'JJS']
    # verbs
    vb_tags = ['VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ'] 
    nltk_tags = nn_tags + jj_tags + vb_tags

    # break string into 'words'
    text = data_str.split()
    # tag the text and keep only those with the right tags
    tagged_text = pos_tag(text)
    for tagged_word in tagged_text:
        if tagged_word[1] in nltk_tags: 
            cleaned_str += tagged_word[0] + ' '
    return cleaned_str

[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /Users/sahilchhillar/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!


In [14]:
nltk.download('wordnet')

[nltk_data] Downloading package wordnet to
[nltk_data]     /Users/sahilchhillar/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


True

In [15]:
from nltk import WordNetLemmatizer

# Lemmatization of text
def lemmatize(data_str):
    # expects a string
    list_pos = 0
    cleaned_str = ''
    lmtzr = WordNetLemmatizer() 
    text = data_str.split() 
    tagged_words = pos_tag(text) 
    for word in tagged_words:
        if 'v' in word[1].lower():
            lemma = lmtzr.lemmatize(word[0], pos='v')
        else:
            lemma = lmtzr.lemmatize(word[0], pos='n')
        if list_pos == 0: 
            cleaned_str = lemma
        else:
                cleaned_str = cleaned_str + ' ' + lemma
        list_pos += 1 
    return cleaned_str

In [16]:
# We will create UDF functions which stands for User Defined Functions
# These are used to create reusable functions in Spark
# Once created these functions can be later used in multiple DataFrames and SQL
from pyspark.sql.functions import udf 
from pyspark.sql.types import StringType

# We will register all our custom functions to these User Defined Functions for reusability
check_lang_udf = udf(detect_language, StringType())
remove_stops_udf = udf(remove_stops, StringType())
remove_features_udf = udf(remove_features, StringType())
tag_and_remove_udf = udf(tag_and_remove, StringType())
lemmatize_udf = udf(lemmatize, StringType())
check_blanks_udf = udf(check_blanks, StringType())

In [17]:
raw_cols = yelp_spark_df.columns

In [18]:
# Language Identification
lang_df = yelp_spark_df.withColumn("lang", check_lang_udf(yelp_spark_df["text"]))
lang_df = lang_df.filter(lang_df["lang"] == "en")
lang_df.show(5)

+--------------------+-----+---------+----+
|                text|stars|sentiment|lang|
+--------------------+-----+---------+----+
|Total bill for th...|  1.0|        0|  en|
|I *adore* Travis ...|  5.0|        1|  en|
|I have to say tha...|  5.0|        1|  en|
|Went in for a lun...|  5.0|        1|  en|
|Today was my seco...|  1.0|        0|  en|
+--------------------+-----+---------+----+
only showing top 5 rows



In [19]:
lang_df.count()

44310

In [20]:
# Removing all the stop words
rm_stops_df = lang_df.select(raw_cols).withColumn("stop_text", remove_stops_udf(lang_df["text"]))
rm_stops_df.show(5)

+--------------------+-----+---------+--------------------+
|                text|stars|sentiment|           stop_text|
+--------------------+-----+---------+--------------------+
|Total bill for th...|  1.0|        0|Total bill horrib...|
|I *adore* Travis ...|  5.0|        1|I *adore* Travis ...|
|I have to say tha...|  5.0|        1|I say office real...|
|Went in for a lun...|  5.0|        1|Went lunch. Steak...|
|Today was my seco...|  1.0|        0|Today second thre...|
+--------------------+-----+---------+--------------------+
only showing top 5 rows



In [21]:
# Removing irrelevant features
rm_features_df = rm_stops_df.select(raw_cols+["stop_text"])\
                            .withColumn("feat_text", \
                                remove_features_udf(rm_stops_df["stop_text"]))
rm_features_df.show(5)

+--------------------+-----+---------+--------------------+--------------------+
|                text|stars|sentiment|           stop_text|           feat_text|
+--------------------+-----+---------+--------------------+--------------------+
|Total bill for th...|  1.0|        0|Total bill horrib...|total bill horrib...|
|I *adore* Travis ...|  5.0|        1|I *adore* Travis ...|  adore travis ha...|
|I have to say tha...|  5.0|        1|I say office real...|  say office real...|
|Went in for a lun...|  5.0|        1|Went lunch. Steak...|went lunch steak ...|
|Today was my seco...|  1.0|        0|Today second thre...|today second thre...|
+--------------------+-----+---------+--------------------+--------------------+
only showing top 5 rows



In [22]:
# Tagging the words
tagged_df = rm_features_df.select(raw_cols+["feat_text"]) \
                          .withColumn("tagged_text", \
                            tag_and_remove_udf(rm_features_df.feat_text))
tagged_df.show(5)

+--------------------+-----+---------+--------------------+--------------------+
|                text|stars|sentiment|           feat_text|         tagged_text|
+--------------------+-----+---------+--------------------+--------------------+
|Total bill for th...|  1.0|        0|total bill horrib...| total bill horri...|
|I *adore* Travis ...|  5.0|        1|  adore travis ha...| travis hard rock...|
|I have to say tha...|  5.0|        1|  say office real...| say office organ...|
|Went in for a lun...|  5.0|        1|went lunch steak ...| went lunch steak...|
|Today was my seco...|  1.0|        0|today second thre...| today second ses...|
+--------------------+-----+---------+--------------------+--------------------+
only showing top 5 rows



In [23]:
# Lemmatizing words
lemm_df = tagged_df.select(raw_cols+["tagged_text"]) \
                   .withColumn("lemm_text", lemmatize_udf(tagged_df["tagged_text"])) 
lemm_df.show(5)

+--------------------+-----+---------+--------------------+--------------------+
|                text|stars|sentiment|         tagged_text|           lemm_text|
+--------------------+-----+---------+--------------------+--------------------+
|Total bill for th...|  1.0|        0| total bill horri...|total bill horrib...|
|I *adore* Travis ...|  5.0|        1| travis hard rock...|travis hard rock ...|
|I have to say tha...|  5.0|        1| say office organ...|say office organi...|
|Went in for a lun...|  5.0|        1| went lunch steak...|go lunch steak sa...|
|Today was my seco...|  1.0|        0| today second ses...|today second sess...|
+--------------------+-----+---------+--------------------+--------------------+
only showing top 5 rows



In [26]:
data = lemm_df.select('lemm_text', 'sentiment')
training, testing = data.randomSplit([0.6, 0.4])

In [27]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes, RandomForestClassifier, DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer 
from pyspark.ml.feature import CountVectorizer

# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF and ML model
tokenizer = Tokenizer(inputCol="lemm_text", outputCol="words")

In [28]:
# Using CountVectorizer first
vectorizer = CountVectorizer(inputCol=tokenizer.getOutputCol(), outputCol="cv_features") 
# idf = IDF(minDocFreq=3, inputCol="words", outputCol="features")

In [32]:
# Creating a Machine Learning model
# As this is a Binary Classification problem, we will use NaiveBayes model first
nb = NaiveBayes(featuresCol="cv_features", labelCol="sentiment")

In [33]:
# Creating pipeline
nb_pipe = Pipeline(stages=[tokenizer, vectorizer, nb])

In [34]:
nb_model = nb_pipe.fit(training)

In [35]:
prediction = nb_model.transform(testing)

In [37]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(labelCol="sentiment", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
accuracy = evaluator.evaluate(prediction)
print("Accuracy: ", accuracy)

Accuracy:  0.6222714256346977


In [38]:
# Using Logistic regression
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol="cv_features", labelCol="sentiment")
lr_pipe = Pipeline(stages=[tokenizer, vectorizer, lr])

In [39]:
lr_model = lr_pipe.fit(training)
lr_pred = lr_model.transform(testing)

In [40]:
evaluator = BinaryClassificationEvaluator(labelCol="sentiment", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
accuracy = evaluator.evaluate(lr_pred)
print("Accuracy: ", accuracy)

Accuracy:  0.9402538281394938
