In [1]:
# import libraries and initiate spark context
from pyspark import SparkContext 
from pyspark import SparkContext
from pyspark import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pandas as pd
import re

sc = spark.sparkContext 
sqlContext = SQLContext(sc)

## Preprocessing Steps

In [2]:
# construct schema
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("Id", IntegerType(), True),
    StructField("ProductId", StringType(), True),
    StructField("UserId", StringType(), True),
    StructField("ProfileName", StringType(), True),
    StructField("Numerator", IntegerType(), True),
    StructField("Denominator", IntegerType(), True),
    StructField("Score", IntegerType(), True),
    StructField("Time", StringType(), True),
    StructField("Summary", StringType(), True),
    StructField("Text", StringType(), True)])

# load data into a pyspark DataFrame, observing schema
df = spark.read.csv("hdfs://saltdean/data/reviews/Reviews.csv",header= True,schema=schema)

In [3]:
# remove punctuation 
import re
def remove_punct(text):
    no_punct = re.sub(r"[^\w\s]","",text)
    return no_punct

# construct User Defined Function (UDF) to apply remove_punct function
from pyspark.sql.functions import udf
punct_remover = udf(lambda x: remove_punct(x))
df = df.select(punct_remover('text'), 'Score')
df = df.withColumnRenamed('<lambda>(text)', 'Text')
df = df.withColumnRenamed('Score', 'label') # <<< this is to prevent 'Error: "label" does not exist' 
df.show(3)

+--------------------+-----+
|                Text|label|
+--------------------+-----+
|I have bought sev...|    5|
|Product arrived l...|    1|
|This is a confect...|    4|
+--------------------+-----+
only showing top 3 rows



In [4]:
# remove instances with Scores that are not integers, and are displayed as 'null'
validScore = df.where(df["label"].isNotNull())
invalidScore = df.where(df["label"].isNull()) 

# produce a new valid DataFrame using only the Text and Score columns
valid = validScore.select("Text","label") # you can also switch to 'invalid' to see errors
valid.describe().show() 

+-------+--------------------+------------------+
|summary|                Text|             label|
+-------+--------------------+------------------+
|  count|              568162|            568162|
|   mean|1.2798334743042016E9| 4.176305349530591|
| stddev| 7.996328434963414E7|1.3838779707236695|
|    min|    Item arrived ...|                 0|
|    max|zzzzzzz I had hig...|                69|
+-------+--------------------+------------------+



In [18]:
# note that the max label is 69, much higher than the specified max (5) 
# as well the min is equal to 0, lower than the specified min (1)
# if these are not too many we want to remove them

# count the number of labels greater than 5
over5 = valid.filter("label >= 6").count()
print("Number of scores over 5:", over5)

# count the number of labels greater than 5
less1 = valid.filter("label == 0").count()
print("Number of scores less than 1:", less1)

less1_df = valid.filter("label == 0")

# count the total number of labels
total = valid.filter("label >= 0").count()
print("Total number of instances:", total)

# compute the proportion of over5 labels to total labels
over5prop = (over5/total)*100
print("Proportion of reviews with scores over 5:", over5prop)

less1prop = (less1/total)*100
print("Proportion of reviews with scores less than 1:", less1prop)

display(less1_df)

Number of scores over 5: 248
Number of scores less than 1: 1111
Total number of instances: 568162
Proportion of reviews with scores over 5: 0.04364952249534464
Proportion of reviews with scores less than 1: 0.1955428205335802


DataFrame[Text: string, label: int]

In [17]:
# since the proportion of labels less than 1 and over 5 is only 0.19 and 0.43%, respectively. we'll exclude them by
# filtering only for labels 1-5.
valid5 = valid.filter(valid["label"].between(1,5))
valid5.describe().show()

+-------+--------------------+-----------------+
|summary|                Text|            label|
+-------+--------------------+-----------------+
|  count|              566803|           566803|
|   mean|1.2641655634102564E9|4.179051628167105|
| stddev| 9.690087336154778E7|1.314378260670736|
|    min|    Item arrived ...|                1|
|    max|zzzzzzz I had hig...|                5|
+-------+--------------------+-----------------+



In [8]:
# check for class imbalance

# we can group the reviews by score by counting them within the DataFrame
# it appears that there are many more '5' scores than any other score
print("Number of less than 1:",valid5.filter("label ==0").count())
print("Number of 1 scores:",valid5.filter("label ==1").count())
print("Number of 2 scores:",valid5.filter("label ==2").count())
print("Number of 3 scores:",valid5.filter("label ==3").count())
print("Number of 4 scores:",valid5.filter("label ==4").count())
print("Number of 5 scores:",valid5.filter("label ==5").count())
print("Number of over 5 :",valid5.filter("label > 5").count())

Number of less than 1: 0
Number of 1 scores: 52635
Number of 2 scores: 29877
Number of 3 scores: 42502
Number of 4 scores: 80141
Number of 5 scores: 361648
Number of over 5 : 0


In [9]:
# it's better to visualize this using Pixiedust
import pixiedust
display(valid5)

## EXPERIMENT 1: Naive Bayes

In [19]:
# EXPERIMENT 1: Naive Bayes

# Step 1: construct pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, NGram, HashingTF, IDF
from pyspark.ml.classification import NaiveBayes
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# construct estimators
tokenizer = Tokenizer(inputCol="Text", outputCol="tokens")
remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
ngrammer = NGram(n=2, inputCol="filtered", outputCol="ngrams") # << change n to {1,2,3}
hashingTF = HashingTF(inputCol = "ngrams", outputCol = "rawFeatures")
idf = IDF(inputCol="rawFeatures", outputCol="features")
nb = NaiveBayes()

# build the pipeline
pipeline = Pipeline(stages=[tokenizer,remover, ngrammer, hashingTF, idf, nb])

In [None]:
# In Step 1, we tokenize the text and remove stop words, to further reduce 
# the noise. We convert the tokens ngrams and experiment with
# different lengths of n. We believe that a larger n will improve the performance,
# since sentiment is captured in conditionally dependent sequences of text.

In [20]:
# Step 2: split the data into 80% training and 20% test set
train,test = valid5.randomSplit([0.8,0.2], seed=100)
# cache the training and test sets so that they don't have to be re-split each time
train.cache()
test.cache()

DataFrame[Text: string, label: int]

In [21]:
# Step 3: construct evaluator, selecting accuracy as the performance metric
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")

# at this initial stage, we select accuracy, although we may look at precision and recall later

In [22]:
# Step 4: introduce a Parameter Grid to iterate through different parameter values
# including features for preprocessing (estimators) and Machine Learning algorithm

paramGrid = ParamGridBuilder() \
    .addGrid(idf.minDocFreq,[10,100,1000]) \
    .addGrid(hashingTF.numFeatures, [1000,10000,100000]) \
    .addGrid(nb.smoothing,[0.1,0.5,1.0])\
    .build()

In [None]:
# Step 4: Parameter selection
# Parameter 1 [minDocFreq]: The fewer documents in which the term is found, the more relevant it is to 
# that document. We initialize with a range starting from 10, increasing 
# exponentially to 1000. 
# Parameter 2 [hashingTF]: this establishes a fixed feature vector spaced from which the term
# frequency count can be computed and standardized. Each ngram is a feature so
# the data runs into high dimensionality quickly. We initilize with a range
# starting from 1000, increasing exponentially to 100000. 
# Parameter 3 [nb smoothing]: this prevents the algorithm from having a prior of zero when it encounters a new label. 
# The parameter is set by default to 1.0, but we will use employ a range from 0.1, 0.5 to 1.0.

In [23]:
# Step 5: construct  model to fit to data
from pyspark.ml.tuning import TrainValidationSplit
# We split the Training data by 80:20 to produce a Validation set
tvs = TrainValidationSplit(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=evaluator,trainRatio=0.8)

In [24]:
# Step 6: fit the model
%time tvsModel = tvs.fit(train)

CPU times: user 5.55 s, sys: 1.57 s, total: 7.12 s
Wall time: 18min 11s


In [25]:
# Step 7: compute predictions on the training and test sets
TvsTrain_predictions = tvsModel.transform(train)
Tvstest_predictions = tvsModel.transform(test)

# Step 8: visually inspect a sample of predicted labels along with true labels
Tvstest_predictions.select("prediction","label").show(5)

# Step 9: compute the accuracy for the training and test sets as well as time taken
%time print ("Test accuracy for Naive Bayes model:",evaluator.evaluate(Tvstest_predictions))

+----------+-----+
|prediction|label|
+----------+-----+
|       4.0|    1|
|       3.0|    5|
|       4.0|    4|
|       4.0|    5|
|       4.0|    5|
+----------+-----+
only showing top 5 rows

Test accuracy for Naive Bayes model: 0.10868035396984571
CPU times: user 48 ms, sys: 16 ms, total: 64 ms
Wall time: 18.5 s


In [26]:
# we can also display the predictions vs the ground truth labels at this point
display(Tvstest_predictions.select("label", "prediction", "probability"))

DataFrame[label: int, prediction: double, probability: vector]

In [27]:
# Step 10: create (prediction, label) pairs to compute additional metrics
Pre_predict = Tvstest_predictions.select("label", "prediction")
Pre_predict = Pre_predict.selectExpr("cast(label as float) as label", "cast(prediction as float) as prediction")
Pre_predict.printSchema
predictionRDD = Pre_predict.select("prediction", "label").rdd
type(predictionRDD)

pyspark.rdd.RDD

In [28]:
# Step 11: compute additional metrics, Precision and Recall, to compare with Accuracy
from pyspark.mllib.evaluation import MulticlassMetrics

metrics = MulticlassMetrics(predictionRDD)

# Summary stats
print("Recall = %s" % metrics.recall())
print("Precision = %s" % metrics.precision())
print("Accuracy = %s" % metrics.accuracy)


Recall = 0.10868035396984571
Precision = 0.10868035396984571
Accuracy = 0.10868035396984571


In [65]:
# Step 12: to improve the model, we can search for the parameter map that produced
# the best results in our validation set.

def bestValidationParamters(vaidatedModel,parameterGrid):
    """ Find the paramter map that produced the best outcome in our validation set 
        Positional arguments:
        ~ validatedModel: the model returned by tvs.fit()
        ~ parameterGrid: the parameterGrid used in the fitting
    """
    # link the results to the parameter maps in the grid
    metricParamPairs = zip(vaidatedModel.validationMetrics,parameterGrid)
    # higher values are better in this case
    bestMetric = 0 # we initialize with 0 as the minimum
    # iterate through all tested parameter maps
    for metric,params in metricParamPairs:
        if metric > bestMetric: # if the metric is better than current best
            bestParams = params # then keep the corresponding parameter map 
    return bestParams # and return the final best paramters

bestValidationParamters(tvsModel,paramGrid)

{Param(parent='IDF_45838e19394bac471435', name='minDocFreq', doc='minimum number of documents in which a term should appear for filtering'): 1000,
 Param(parent='HashingTF_464f9812a7a87b010804', name='numFeatures', doc='number of features.'): 10000}

##  EXPERIMENT 2: logistic regression

In [29]:
# EXPERIMENT 2: logistic regression
from pyspark.ml.classification import LogisticRegression
# Step 1: construct pipeline
tokenizer = Tokenizer(inputCol="Text", outputCol="tokens")
remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
ngrammer = NGram(n=2, inputCol="filtered", outputCol="ngrams") # use N=2 first
hashingTF = HashingTF(inputCol = "ngrams", outputCol = "rawFeatures")
idf = IDF(inputCol="rawFeatures", outputCol="features")
lr = LogisticRegression(maxIter=10)

lrPipeline = Pipeline(stages=[tokenizer,remover, ngrammer, hashingTF, idf, lr])

# Step 2: we have already split the data 
# Step 3: we have already constructed out evaluator

# Step 4: introduce a Parameter Grid to iterate through
LRGrid = ParamGridBuilder() \
        .addGrid(lr.regParam, [0.1, 0.3, 0.5]) \
             .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2]) \
             .build()
            
# Step 5: construct model to fit to the data            
lrtvs = TrainValidationSplit(estimator = lrPipeline,
                           estimatorParamMaps = LRGrid,
                           evaluator = evaluator,
                           # 80% of the data will be used for training, 20% for validation.
                           trainRatio=0.8)

In [30]:
# Step 6: fit the model
%time LrModel = lrtvs.fit(train)

CPU times: user 3.34 s, sys: 1.06 s, total: 4.4 s
Wall time: 11min 49s


In [31]:
# Step 7: compute predictions on the training and test sets
LrTrain_predictions = LrModel.transform(train)
LrTest_predictions = LrModel.transform(test)

# Step 8: visually inspect a sample of predicted labels along with true labels
LrTest_predictions.select("prediction","label").show(5)

# Step 9: compute the accuracy for the training and test sets as well as time
%time print ("LR: Accuracy - testing:",evaluator.evaluate(LrTest_predictions))

+----------+-----+
|prediction|label|
+----------+-----+
|       1.0|    1|
|       5.0|    5|
|       5.0|    4|
|       5.0|    5|
|       5.0|    5|
+----------+-----+
only showing top 5 rows

LR: Accuracy - testing: 0.783712461075632
CPU times: user 44 ms, sys: 12 ms, total: 56 ms
Wall time: 11.6 s


In [35]:
# Step 10: create (prediction, label) pairs to compute additional metrics
Lr_predict = LrTest_predictions.select("label", "prediction")
Lr_predict = Pre_predict.selectExpr("cast(label as float) as label", "cast(prediction as float) as prediction")
Lr_predict.printSchema
lrPredictionRDD = Lr_predict.select("prediction", "label").rdd
type(lrPredictionRDD)

# Step 11: compute Precision and Recall, to compare with Accuracy
lrmetrics = MulticlassMetrics(lrPredictionRDD)

# Summary stats
print("Recall = %s" % lrmetrics.recall())
print("Precision = %s" % lrmetrics.precision())
print("Accuracy = %s" % lrmetrics.accuracy)

Recall = 0.10868035396984571
Precision = 0.10868035396984571
Accuracy = 0.10868035396984571


In [81]:
# Step 12: to improve the model, we can search for the parameter map that produced
# the best results in our validation set.

bestValidationParamters(LrModel,LRGrid)

{Param(parent='LogisticRegression_4353934741d75dc232c7', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.2,
 Param(parent='LogisticRegression_4353934741d75dc232c7', name='regParam', doc='regularization parameter (>= 0).'): 0.5}

## Find the top words, 2-ngrams and  3-ngrams

In [87]:
# tokenize and remove stop words with ngram
# tokenize
tok = Tokenizer(inputCol="Text", outputCol="words")
review_tokenized = tok.transform(valid5)

# remove stop words
stopword_rm = StopWordsRemover(inputCol='words', outputCol='words_nsw')
review_tokenized = stopword_rm.transform(review_tokenized)

# add ngram column

ngram = NGram(inputCol = 'words_nsw', outputCol = 'ngram', n = 3)
add_ngram = ngram.transform(review_tokenized)

# generate the top frequent ngram / filter only > 20
ngrams = add_ngram.rdd.flatMap(lambda x: x[-1]).filter(lambda x: len(x.split())==n)
ngram_tally = ngrams.map(lambda x: (x, 1))\
                      .reduceByKey(lambda x,y: x+y)\
                      .sortBy(lambda x: x[1], ascending=False)\
                      .filter(lambda x: x[1]>=20)
ngram_list = ngram_tally.map(lambda x: x[0]).collect()

In [95]:
ngram_tally.take(5)

[('local grocery store', 2791),
 ('health food store', 1714),
 ('highly recommend product', 1709),
 ('cant go wrong', 1428),
 ('high fructose corn', 1409)]

In [89]:
# add ngram column

ngram1 = NGram(inputCol = 'words_nsw', outputCol = 'ngram', n = 2)
add_ngram1 = ngram.transform(review_tokenized)

# generate the top frequent ngram / filter only > 20
ngrams1 = add_ngram1.rdd.flatMap(lambda x: x[-1]).filter(lambda x: len(x.split())==n)
ngram_tally1 = ngrams1.map(lambda x: (x, 1))\
                      .reduceByKey(lambda x,y: x+y)\
                      .sortBy(lambda x: x[1], ascending=False)\
                      .filter(lambda x: x[1]>=20)
ngram_list1 = ngram_tally1.map(lambda x: x[0]).collect()

In [92]:
ngram_tally1.take(5)

[(' highly recommend', 4407),
 (' dont know', 1946),
 (' ive tried', 1776),
 ('subscribe  save', 1527),
 ('cup coffee ', 1331)]

In [94]:
display(ngram_list1)

[' highly recommend',
 ' dont know',
 ' ive tried',
 'subscribe  save',
 'cup coffee ',
 'grocery store ',
 ' great product',
 'give try ',
 ' im sure',
 ' highly recommended',
 ' tastes like',
 'dog food ',
 'really good ',
 'great product ',
 'peanut butter ',
 'tastes great ',
 'taste good ',
 'taste great ',
 'long time ',
 'love product ',
 ' really like',
 ' much better',
 ' dont think',
 'every day ',
 ' dont like',
 'ever tasted ',
 ' also like',
 ' im glad',
 'green tea ',
 ' tastes great',
 'great price ',
 ' taste like',
 'pretty good ',
 ' taste great',
 'years ago ',
 ' hard find',
 'much better ',
 'love tea ',
 'ive tried ',
 ' thanks amazon',
 'cat food ',
 'love stuff ',
 ' first time',
 'gluten free ',
 ' cant wait',
 ' definitely recommend',
 ' great flavor',
 'free shipping ',
 ' even though',
 ' really good',
 'ive ever ',
 ' great price',
 ' kids love',
 ' definitely buy',
 ' im going',
 'tastes good ',
 ' make sure',
 ' havent tried',
 ' taste good',
 'coffee mak

In [98]:
# top words

ngram = NGram(inputCol = 'words_nsw', outputCol = 'ngram', n = 1)
add_ngram2 = ngram.transform(review_tokenized)

# generate the top frequent ngram / filter only > 20
words = add_ngram2.rdd.flatMap(lambda x: x[-1]).filter(lambda x: len(x.split())==1)
words_tally = words.map(lambda x: (x, 1))\
                      .reduceByKey(lambda x,y: x+y)\
                      .sortBy(lambda x: x[1], ascending=False)\
                      .filter(lambda x: x[1]>=20)
words_list = ngram_tally.map(lambda x: x[0]).collect()

In [100]:
words_tally.take(50)

[('like', 220988),
 ('br', 198214),
 ('good', 174872),
 ('great', 152200),
 ('one', 151099),
 ('taste', 148361),
 ('coffee', 143019),
 ('product', 131056),
 ('flavor', 126153),
 ('love', 119202),
 ('tea', 119164),
 ('food', 106526),
 ('get', 94531),
 ('really', 88636),
 ('dont', 81749),
 ('much', 80080),
 ('also', 73272),
 ('little', 73152),
 ('use', 72956),
 ('time', 72272),
 ('amazon', 70194),
 ('tried', 70069),
 ('best', 69340),
 ('buy', 68024),
 ('price', 66546),
 ('find', 65573),
 ('ive', 64902),
 ('im', 64827),
 ('even', 63180),
 ('make', 62382),
 ('well', 61262),
 ('better', 59768),
 ('try', 59197),
 ('eat', 58933),
 ('dog', 58000),
 ('first', 54757),
 ('chocolate', 51237),
 ('water', 48616),
 ('found', 48267),
 ('bag', 48241),
 ('used', 48099),
 ('bought', 46881),
 ('sugar', 44183),
 ('sweet', 44134),
 ('drink', 43816),
 ('made', 43297),
 ('cup', 43201),
 ('box', 42506),
 ('two', 41706),
 ('think', 41388)]