In [None]:
# Creating Spark Session object
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("yelp_analysis").getOrCreate()

In [2]:
# Loading the data from amazon s3 bucket
yelp_review = spark.read.csv('s3://yelpdatanalysis/yelp_reviews.csv',header=True,inferSchema=True)

### Data Cleaning

In [4]:
# Clean and Prepare the Data

df_reviews = yelp_review.select('stars','text')

filterd_df_1 = df_reviews.na.drop(subset=['stars','text'])

filter_df_2 = filterd_df_1.select(filterd_df_1.stars.cast('float'),filterd_df_1.text.cast('string')).na.drop(subset=['stars','text'])
filter_df_2.show()

+--------------------+--------------------+
|               stars|                text|
+--------------------+--------------------+
|                   1|Total bill for th...|
|                   5|I *adore* Travis ...|
|                null|                null|
|                null|                null|
|                null|                null|
|                   5|I have to say tha...|
|                   5|Went in for a lun...|
|                null|                null|
|                null|                null|
|                null|                null|
|                   1|"Today was my sec...|
|                   4|"I'll be the firs...|
|                null|                null|
|                null|                null|
|                null|                null|
|                null|                null|
|                null|                null|
|                   3|Tracy dessert had...|
|                null|                null|
| more than $5 is ...| it is Tra

In [8]:
# Removing the stop words and cleaning reviews
import string
import re

def remove_punct(text):
    regex = re.compile('[' + re.escape(string.punctuation) + '0-9\\r\\t\\n]')
    nopunct = regex.sub(" ", text)  
    return nopunct

# giving a class label to the rating

def convert_rating(rating):
    if rating >=4:
        return 1
    else:
        return 0

In [12]:
from pyspark.sql.functions import udf
punct_remover = udf(lambda x: remove_punct(x))
rating_convert = udf(lambda x: convert_rating(x))

resultDF = filter_df_2.select(punct_remover('text'), rating_convert('stars'))

#user defined functions change column names so we rename the columns back to its original names
resultDF = resultDF.withColumnRenamed('<lambda>(text)', 'text')
resultDF = resultDF.withColumnRenamed('<lambda>(stars)', 'stars')

resultDF.show()

+--------------------+-----+
|                text|stars|
+--------------------+-----+
|Total bill for th...|    0|
|I  adore  Travis ...|    1|
|I have to say tha...|    1|
|Went in for a lun...|    1|
| Today was my sec...|    0|
| I ll be the firs...|    1|
|Tracy dessert had...|    0|
|This place has go...|    0|
| I was really loo...|    0|
|It s a giant Best...|    0|
|Like walking back...|    1|
|Walked in around ...|    0|
|Wow  So surprised...|    1|
|Michael from Red ...|    1|
|I cannot believe ...|    0|
|You can t really ...|    1|
|Great lunch today...|    1|
| I love chinese f...|    0|
|We ve been a huge...|    1|
|Good selection of...|    0|
+--------------------+-----+
only showing top 20 rows



### Feature Engineering

In [13]:
# ** Create a new length feature: **
from pyspark.sql.functions import length
resultDF.withColumn('length',length(resultDF['text'])).groupBy('stars').mean().show()
resultDF = resultDF.withColumnRenamed('stars','label')

# There isn't much Difference, hence it cannot be is used as attribute


+-----+------------------+
|stars|       avg(length)|
+-----+------------------+
|    0| 358.4106661671254|
|    1|295.06875408579606|
+-----+------------------+



### Feature Transformation

In [14]:
# Feature Transformations

from pyspark.ml.feature import Tokenizer,StopWordsRemover, CountVectorizer,IDF

tokenizer = Tokenizer(inputCol="text", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
count_vec = CountVectorizer(inputCol='stop_tokens',outputCol='c_vec')
idf = IDF(inputCol="c_vec", outputCol="tf_idf")


In [15]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

clean_up = VectorAssembler(inputCols=['tf_idf'],outputCol='features')

In [17]:
from pyspark.ml.classification import NaiveBayes
# We will be using simple naive bayes model
nb = NaiveBayes()

In [18]:
from pyspark.ml import Pipeline

data_prep_pipe = Pipeline(stages=[tokenizer,stopremove,count_vec,idf,clean_up])

cleaner = data_prep_pipe.fit(resultDF)

clean_data = cleaner.transform(resultDF)

clean_data = clean_data.select(['label','features'])

clean_data.show()

### Model Validation

In [34]:
#Splitting the data

(training,testing) = clean_data.randomSplit([0.7,0.3])

training.printSchema()

training = training.select('features',training.label.cast('integer'))
testing = testing.select('features',testing.label.cast('integer'))

### Training the Model

In [37]:
# training the model
spam_predictor = nb.fit(training)

test_results = spam_predictor.transform(testing)

test_results.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|       (80152,[],[])|    0|[-1.0715920695799...|[0.34246285847122...|       1.0|
|(80152,[0,1,2,3,4...|    0|[-4374.7909341232...|[0.99999999999999...|       0.0|
|(80152,[0,1,2,3,4...|    0|[-4274.0765000638...|[4.80832995934765...|       1.0|
|(80152,[0,1,2,3,4...|    0|[-1797.2194013785...|[0.99999999999999...|       0.0|
|(80152,[0,1,2,3,4...|    0|[-1640.9987822162...|[1.0,5.0376493933...|       0.0|
|(80152,[0,1,2,3,4...|    0|[-2288.2264636044...|[1.87425164908585...|       1.0|
|(80152,[0,1,2,3,4...|    0|[-761.51374012714...|[1.0,7.6121040044...|       0.0|
|(80152,[0,1,2,3,4...|    0|[-687.69770395469...|[0.00302243577364...|       1.0|
|(80152,[0,1,2,3,4...|    0|[-5496.4596506964...|[1.0,7.5411797821...|       0.0|
|(80152,[0,1,2,3

In [38]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

### Model Evaluation 

In [39]:
acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting positive or negative  was: {}".format(acc))
#Not bad considering we're using straight math on text data! 
# We can Try switching out with multiple classification models! 
# Or even try to come up with other engineered features!

Accuracy of model at predicting spam was: 0.8169506197443153
