# Amazon Reviews - Prediction of Rating and Helpfulness (an NLP Use Case)
## Machine Learning Prototyping

In this notebook, I prototype machine learning models that predict the star rating and helpfulness of Amazon reviews. For prototyping, I used 0.5% of the Amazon review dataset (~600,000 reviews). To see how the data was gathered from the Amazon S3 bucket and sampled, see the notebook `amazon_reviews_data_gathering.ipynb`.

## Table of Contents

* [Text Preprocessing](#tp)
* [Sentiment Analysis](#sa)
* [Doc2Vec](#dv)
* [ML Prototyping: Rating Prediction](#rp)
  * [Logistic Regression](#lr)
  * [Random Forest](#rf)
  * [Multilayer Perceptron](#mlp)
* [Dealing with Class Imbalance](#ci)
  * [Logistic Regression with Class Weight Balanced](#ilr)
  * [Random Forest with Balanced Dataset](#irf)
  * [Multilayer Perceptron with Balanced Dataset](#imlp)
* [ML Prototyping: Helpfulness Prediction](#hp)
  * [Linear Regression](#lrg)
  * [Gradient Boosting Trees](#gbt)  
* [Conclusion](#cl)

In [4]:
# Create a dataframe by reading the file containing subset of amazon review dataset sampled in the notebook "amazon_reviews_data_gathering.ipynb"
# Define the schema of the dataframe to be created
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType, DateType

schema = StructType([
      StructField('marketplace', StringType()),
      StructField('customer_id', StringType()),
      StructField('review_id', StringType()),
      StructField('product_id', StringType()),
      StructField('product_parent', StringType()),
      StructField('product_title', StringType()),
      StructField('product_category', StringType()),
      StructField('star_rating', IntegerType()),
      StructField('helpful_votes', IntegerType()),
      StructField('total_votes', IntegerType()),
      StructField('vine', StringType()),
      StructField('verified_purchase', StringType()),
      StructField('review_headline', StringType()),
      StructField('review_body', StringType()),
      StructField('review_date', DateType())
])

review_subset_df = (sqlContext.read.format('com.databricks.spark.csv')
             .schema(schema)
             .option("inferSchema", False)
             .option("header", True)
             .load("dbfs:/FileStore/review_df_sample/review_df_sample.csv" ))

In [5]:
review_subset_df.cache()

<a id='tp'></a>
## Text Preprocessing

In [7]:
review_rating_helpful_df = review_subset_df.select('review_body', 'star_rating', 'helpful_votes', 'total_votes')

In [8]:
# Number of words in review_body corpus
review_body_list = review_rating_helpful_df.select('review_body').rdd.map(lambda row : row[0]).collect()
word_corpus = []
for i in review_body_list:
  word_corpus.extend(i.split())
print ("Total words in the corpus: {}".format(len(word_corpus)))
print ("Unique words in the corpus: {}".format(len(set(word_corpus))))

In [9]:
# set of characters in review body
chr_set = set()
for review in review_body_list:
  chr_set.update(list(review))
print(chr_set)

As the output above shows, the review text contains many unwanted characters that need to be removed. The following cleaning steps are applied to each review:
- lowercasing the text
- stripping HTML tags
- stripping punctuation
- collapsing multiple whitespace characters
- stripping numbers
- removing any remaining non-alphabetic characters

In [11]:
import gensim.parsing.preprocessing as gsp
from pyspark.sql.functions import udf
from gensim import utils
import re

# Perform following cleaning tasks on each review
cleaning_tasks = [
           gsp.strip_tags, 
           gsp.strip_punctuation,
           gsp.strip_multiple_whitespaces,
           gsp.strip_numeric
          ]

def text_preprocessing(df_row):
  '''Takes a row (review_body, star_rating, helpful_votes, total_votes) and returns it with the review text cleaned for NLP'''
  review_txt = df_row[0]
  review_txt = review_txt.lower()
  review_txt = utils.to_unicode(review_txt)
  for task in cleaning_tasks:
      review_txt = task(review_txt)
  review_txt = re.sub(r'[^a-zA-Z\s]', "", review_txt)
  return (review_txt, df_row[1], df_row[2], df_row[3])
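To make the order of the cleaning steps concrete, here is a dependency-free sketch of what `text_preprocessing` does to a single string. It assumes that gensim's `strip_tags`, `strip_punctuation`, `strip_multiple_whitespaces` and `strip_numeric` behave like the regular expressions below, so treat it as an approximation of the cell above rather than gensim's exact implementation; `clean_review` is a hypothetical helper name.

```python
import re
import string

# Assumed equivalents of the gensim helpers used above (approximation, not gensim itself)
RE_TAGS = re.compile(r"<([^>]+)>")                                   # strip_tags
RE_PUNCT = re.compile(r"([%s])+" % re.escape(string.punctuation))   # strip_punctuation
RE_WHITESPACE = re.compile(r"(\s)+")                                 # strip_multiple_whitespaces
RE_NUMERIC = re.compile(r"[0-9]+")                                   # strip_numeric
RE_NON_ALPHA = re.compile(r"[^a-zA-Z\s]")                            # final re.sub in text_preprocessing

def clean_review(text):
    """Clean one review in the same order as text_preprocessing."""
    text = text.lower()
    text = RE_TAGS.sub("", text)          # remove HTML tags such as <br />
    text = RE_PUNCT.sub(" ", text)        # replace each run of punctuation with one space
    text = RE_WHITESPACE.sub(" ", text)   # collapse repeated whitespace
    text = RE_NUMERIC.sub("", text)       # drop digits (runs after whitespace collapsing)
    return RE_NON_ALPHA.sub("", text)     # drop anything that is still not a letter or space

raw = ("I bought this controller, and it worked perfectly...for about a week. "
       "Then it just wouldn't turn on, wouldn't respond to being plugged in, it was done. "
       "I had to go to Gamestop and buy a used one for $50. DO NOT BUY THIS.")
print(clean_review(raw))
```

On the controller review discussed later, this sketch produces the same cleaned string that `In [31]` searches for, including the double space left behind where `$50` was removed, because digits are stripped only after whitespace has been collapsed.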

In [12]:
clean_review_rating_helpful_df = review_rating_helpful_df.rdd.map(lambda x : text_preprocessing(x)).toDF()

In [13]:
# Rename the columns, as the default names _1 to _4 are not descriptive
clean_review_rating_helpful_df = (clean_review_rating_helpful_df.withColumnRenamed("_1", "review_body")
                                  .withColumnRenamed("_2", "star_rating")
                                  .withColumnRenamed("_3", "helpful_votes")
                                  .withColumnRenamed("_4", "total_votes"))

In [14]:
# Check character set in review body after cleaning
review_body_list_2 = clean_review_rating_helpful_df.select('review_body').rdd.map(lambda row : row[0]).collect()
chr_set = set()
for review in review_body_list_2:
  chr_set.update(list(review))
print(chr_set)
print(len(chr_set))

In [15]:
# Number of words in review_body corpus after cleaning
word_corpus = []
for i in review_body_list_2:
  word_corpus.extend(i.split())
print ("Total words in the corpus after cleaning: {}".format(len(word_corpus)))
print ("Unique words in the corpus after cleaning: {}".format(len(set(word_corpus))))

#### Positive, Negative and Neutral Class Based on Star Rating
I will train a model that predicts whether a review is positive, negative or neutral. Star ratings of 4 and 5 are treated as positive, 3 as neutral, and 1 and 2 as negative.

In [17]:
from pyspark.sql.functions import when
clean_review_rating_helpful_df = (clean_review_rating_helpful_df.withColumn("review_category", 
                                                when(clean_review_rating_helpful_df.star_rating.isin(5, 4), 'positive')
                                                .when(clean_review_rating_helpful_df.star_rating.isin(1, 2), 'negative')
                                                .otherwise('neutral')))

<a id='sa'></a>
## Sentiment Analysis with TextBlob
In the following section, the sentiment of each review is determined with TextBlob and compared with its star rating. TextBlob is an open-source Python library for text processing that includes sentiment analysis; its polarity score is a float in the range [-1.0, 1.0].

In [19]:
# Get polarity of each review
from textblob import TextBlob
def polarity(df_row):
    review_txt = df_row[0]
    return (review_txt, df_row[1], df_row[4], round(TextBlob(review_txt).polarity, 3))
polarity_df = clean_review_rating_helpful_df.rdd.map(lambda x : polarity(x)).toDF()
polarity_df = (polarity_df
               .withColumnRenamed("_1", "review_body")
               .withColumnRenamed("_2", "star_rating")
               .withColumnRenamed("_3", "review_category")
               .withColumnRenamed("_4", "polarity"))

#### Positive, Negative and Neutral Class Based on Sentiment

In [21]:
# Assign a sentiment to each review based on polarity value
polarity_df = (polarity_df.withColumn("sentiment", 
                                                when(polarity_df.polarity > 0, 'positive')
                                                .when(polarity_df.polarity < 0, 'negative')
                                                .otherwise('neutral')))

#### Discrepancy between review sentiment and review rating
In some cases, the sentiment expressed in a review may not match its star rating. For instance, a review may express positive sentiment but be rated 1 or 2 stars (negative). Such cases need further investigation.

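The two labelling rules and the mismatch count can be sketched in plain Python on a handful of (star_rating, polarity) pairs. The helper names and toy pairs are hypothetical; the thresholds follow the `when(...)` expressions in the cells above.

```python
def rating_category(star_rating):
    """Star rating -> class label: 4-5 positive, 1-2 negative, 3 neutral."""
    if star_rating in (4, 5):
        return "positive"
    if star_rating in (1, 2):
        return "negative"
    return "neutral"

def polarity_sentiment(polarity):
    """TextBlob-style polarity in [-1.0, 1.0] -> sentiment label."""
    if polarity > 0:
        return "positive"
    if polarity < 0:
        return "negative"
    return "neutral"

def mismatch_rate(pairs):
    """Fraction of (star_rating, polarity) pairs whose two labels disagree."""
    mismatches = sum(rating_category(r) != polarity_sentiment(p) for r, p in pairs)
    return mismatches / len(pairs)

# Hypothetical pairs; the second mimics a review like "excellent" rated 1 star
toy_pairs = [(5, 0.8), (1, 1.0), (3, 0.0), (2, -0.4)]
print(mismatch_rate(toy_pairs))
```

Applied to the whole sample, the same comparison gives the 135490 / 588395 (about 0.23) mismatch rate computed in the following cells.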
In [23]:
# Count of reviews where sentiment and rating do not match
polarity_df.filter(polarity_df["sentiment"] != polarity_df["review_category"]).count()

In [24]:
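# Fraction of reviews where sentiment and rating do not match (mismatch count / total number of reviews)
135490/588395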

In [25]:
# Count of reviews where sentiment is positive but rating is negative
polarity_df.filter("sentiment = 'positive'").filter("review_category = 'negative'").count()

In [26]:
# Count of reviews where sentiment is negative but rating is positive
polarity_df.filter("sentiment = 'negative'").filter("review_category = 'positive'").count()

In almost 23% of cases, the sentiment expressed in the review does not match the star rating. Some of these are cases where the rating is positive or negative, but the review is so short or subjective that its sentiment is classified as neutral. There are also as many as 36,310 cases where the sentiment expressed is positive but the rating is negative. We need to investigate further whether these are real discrepancies or artifacts of text cleaning. It is also possible that TextBlob fails to identify the sentiment correctly in some cases.

In [28]:
# Reviews where sentiment is highly positive but rating is highly negative
import pandas as pd
pd.set_option('display.max_colwidth', None)  # show the full review text; -1 is deprecated in newer pandas
discrepancy_df = polarity_df.filter("polarity > 0.9").filter("star_rating = 1")
discrepancy_df.limit(47).toPandas()

|   | review_body | star_rating | review_category | polarity | sentiment |
|---|---|---|---|---|---|
| 0 | excellent stars | 1 | negative | 1.0 | positive |
| 1 | there is nobody in this story that is likeable the story does not have a very good or beleiveable plot | 1 | negative | 0.91 | positive |
| 2 | you don t have to read this book it s a sort of corporation promotion of self help books don t confuse it with any of the excellent books about books that you might be reading instead ugh | 1 | negative | 1.0 | positive |
| 3 | stars grisham is the best | 1 | negative | 1.0 | positive |
| 4 | not her best work | 1 | negative | 1.0 | positive |
| 5 | the book is a very good book it is something that every one can read take some time and read the book | 1 | negative | 0.91 | positive |
| 6 | i didn t like it and don t recommend anyone to buy i just bought a refurbished version and according to its description it should be in very good condition but it s in very fair condition unacceptable i will attach some pics of it | 1 | negative | 0.91 | positive |
| 7 | not the best quality already peeling off | 1 | negative | 1.0 | positive |
| 8 | not the best material go for the iblason prime | 1 | negative | 1.0 | positive |
| 9 | excellent | 1 | negative | 1.0 | positive |


In [29]:
# Investigate one of the reviews further
review_body_list_2.index("all these items i purchased are excellent ")

In [30]:
# Get the pre-cleaned version of the above review.
review_body_list[586154]

In [31]:
# Investigate one more review further
review_body_list_2.index("i bought this controller and it worked perfectly for about a week then it just wouldn t turn on wouldn t respond to being plugged in it was done i had to go to gamestop and buy a used one for  do not buy this ")

In [32]:
# Get the pre-cleaned version of the above review.
review_body_list[509879]

In [33]:
# The following review is quite negative, but TextBlob assigns it a positive polarity (0.35).
TextBlob('The book was in perfect condition, but the access code, which is the most important part, did not work. Since I purchased this from Amazon, Cengage was unable to assist me. I do not recommend buying this.').polarity

As the examples above show, there are possibly many mislabelled reviews (e.g., the review is positive, but the rating is negative). Take the following review, for example:

**"All these items I purchased are excellent."**

The review is clearly positive, but the reviewer gave it one star.

Initially, I considered removing all reviews whose rating does not match the sentiment expressed in the review (as determined by TextBlob). However, TextBlob's sentiment assignment is not accurate either. Consider the following review:

**"I bought this controller, and it worked perfectly...for about a week. Then it just wouldn't turn on, wouldn't respond to being plugged in, it was done. I had to go to Gamestop and buy a used one for $50. DO NOT BUY THIS."**

This is clearly a negative review, but TextBlob assigned it a polarity of 1 (highly positive sentiment).

<a id='dv'></a>
## Doc2Vec
Apache Spark does not provide a Doc2Vec API. However, its `Word2Vec` estimator, which is based on the skip-gram model, can be used to produce document vectors: "The Word2VecModel transforms each document into a vector using the average of all words in the document" ([Apache Spark Documentation](https://spark.apache.org/docs/latest/ml-features.html#word2vec)).

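As a rough illustration of the averaging described in the quoted documentation, the sketch below builds a document vector from toy 2-dimensional word vectors. The embeddings and the helper name are hypothetical. As an assumption about Spark's implementation, words without a vector add zeros but still count in the denominator; in this notebook the model is fit and applied to the same DataFrame with `minCount=0`, so every token has a vector anyway.

```python
def average_word_vectors(tokens, word_vectors, dim):
    """Mean of the word vectors of a tokenized document (zero vector if empty).

    Tokens missing from word_vectors contribute zeros but still count in the
    denominator (assumed to match Spark's Word2VecModel.transform).
    """
    if not tokens:
        return [0.0] * dim
    total = [0.0] * dim
    for token in tokens:
        for i, value in enumerate(word_vectors.get(token, [0.0] * dim)):
            total[i] += value
    return [value / len(tokens) for value in total]

# Hypothetical 2-dimensional embeddings, for illustration only
word_vectors = {"great": [1.0, 0.0], "product": [0.0, 1.0]}
print(average_word_vectors(["great", "product"], word_vectors, dim=2))
```

The real pipeline does the same with 300-dimensional vectors (`vectorSize=300`), so every review becomes one fixed-length feature vector regardless of its length.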
In [36]:
from pyspark.ml.feature import Word2Vec
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer

tokenizer = Tokenizer(inputCol="review_body", outputCol="tokens")
word2vec = Word2Vec(vectorSize=300, minCount=0, inputCol="tokens", outputCol="features")
doc2vec_pipeline = Pipeline(stages=[tokenizer, word2vec])
doc2vec_model = doc2vec_pipeline.fit(clean_review_rating_helpful_df)
doc2vecs_df = doc2vec_model.transform(clean_review_rating_helpful_df)

<a id='rp'></a>
## ML Prototyping: Rating Prediction

In [38]:
from pyspark.ml.feature import StringIndexer

# Encode the target label
string_indexer = StringIndexer(inputCol="review_category", outputCol="label")
doc2vecs_df_encoded = string_indexer.fit(doc2vecs_df).transform(doc2vecs_df)

In [39]:
# Split the data into train and test set
train_set, test_set = doc2vecs_df_encoded.randomSplit([0.7, 0.3], seed=100)

In [40]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
import pandas as pd

# A function to get performance metrics
def print_performance_metrics(predictions):
  # Get accuracy of the model
  model_evaluator = MulticlassClassificationEvaluator(
      labelCol="label", predictionCol="prediction", metricName="accuracy")
  accuracy = model_evaluator.evaluate(predictions)
  print("Accuracy: {:.3f}\n".format(accuracy))

  # get rdd of predictions and labels for eval metrics
  predictionAndLabels = predictions.select("prediction","label").rdd

  # Instantiate metrics objects
  multi_metrics = MulticlassMetrics(predictionAndLabels)
  # Get confusion matrix
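  cm = multi_metrics.confusionMatrix()
  # Rows are actual labels and columns are predicted labels, both ordered by label index.
  # StringIndexer indexes labels by frequency, so this assumes 0 = positive, 1 = negative, 2 = neutral.
  print ("Confusion Matrix:")
  print(cm)
  print ("\nConfusion Matrix as a Pandas Dataframe:")
  print(pd.DataFrame(cm.toArray().tolist(), columns=['predicted_pos', 'predicted_neg', 'predicted_neu'], index=['actual_pos', 'actual_neg', 'actual_neu']))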
  print("\nFraction of positive reviews correctly predicted as positive (recall): {:.3f}".format(cm[0,0]/(cm[0,0] + cm[0,1] + cm[0,2])))
  print("\nFraction of negative reviews correctly predicted as negative (recall): {:.3f}".format(cm[1,1]/(cm[1,0] + cm[1,1] + cm[1,2])))
  print("\nFraction of neutral reviews correctly predicted as neutral (recall): {:.3f}".format(cm[2,2]/(cm[2,0] + cm[2,1] + cm[2,2])))
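The accuracy and per-class recall printed by `print_performance_metrics` can be reproduced from any confusion matrix laid out with actual classes as rows and predicted classes as columns. The sketch below uses a hypothetical helper and made-up counts for an imbalanced three-class problem.

```python
def summarize_confusion_matrix(cm, class_names):
    """Return accuracy and per-class recall from a square confusion matrix.

    cm[i][j] counts samples whose actual class is i and predicted class is j,
    the same layout as MulticlassMetrics.confusionMatrix().
    """
    total = sum(sum(row) for row in cm)
    correct = sum(cm[i][i] for i in range(len(cm)))
    recall = {name: cm[i][i] / sum(cm[i]) for i, name in enumerate(class_names)}
    return correct / total, recall

# Hypothetical counts: rows = actual, columns = predicted
toy_cm = [[80, 10, 10],
          [5, 15, 0],
          [4, 2, 4]]
accuracy, recall = summarize_confusion_matrix(toy_cm, ["positive", "negative", "neutral"])
print(round(accuracy, 3), recall)
```

Here accuracy (about 0.76) is dominated by the large positive class, while neutral recall is only 0.4, which is why per-class recall is reported alongside accuracy.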

<a id='lr'></a>
### Logistic Regression

In [42]:
# Fit the model and get the predictions for test set
from pyspark.ml.classification import LogisticRegression
# Instantiate a logistic regression classifier
lr = LogisticRegression(labelCol="label", featuresCol="features")
# Fit train set
lr_model = lr.fit(train_set)
# Predict test set
lr_predictions = lr_model.transform(test_set)
# Print performance metrics of logistic regression
print_performance_metrics(lr_predictions)

### Logistic Regression with Cross Validation

In [44]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Instantiate a classifier
lr = LogisticRegression(labelCol="label", featuresCol="features")

# Perform gridsearch cv
lrParamGrid = (ParamGridBuilder()
               .addGrid(lr.regParam, [0.01, 0.5, 2.0])
               .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
               .addGrid(lr.maxIter, [10, 100])
               .build())

# set up an evaluator
evaluator = MulticlassClassificationEvaluator(
      labelCol="label", predictionCol="prediction", metricName="f1")

# Create CrossValidator
lrCv = CrossValidator(estimator=lr, estimatorParamMaps=lrParamGrid, evaluator=evaluator, numFolds=3)

#Run cross validations
lrCvModel = lrCv.fit(train_set)

# # Look at best params from the CV
print(lrCvModel.bestModel._java_obj.getRegParam())
print(lrCvModel.bestModel._java_obj.getElasticNetParam())
print(lrCvModel.bestModel._java_obj.getMaxIter())

# Get prediction
lrCvPredictions = lrCvModel.transform(test_set)

# Print performance metrics of logistic regression with cross validation
print_performance_metrics(lrCvPredictions)

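To give a sense of the cost of the grid search above, the sketch below expands the same logistic-regression grid in plain Python. `expand_grid` is a hypothetical helper that yields the same set of combinations as `ParamGridBuilder.build()` (the order may differ). It assumes, as `CrossValidator` does, one fit per combination per fold plus a final refit of the best combination on the full training set.

```python
from itertools import product

# The logistic-regression grid from the cell above, as plain lists
lr_grid = {
    "regParam": [0.01, 0.5, 2.0],
    "elasticNetParam": [0.0, 0.5, 1.0],
    "maxIter": [10, 100],
}

def expand_grid(grid):
    """Return every parameter combination as a dict."""
    names = list(grid)
    return [dict(zip(names, values)) for values in product(*grid.values())]

combos = expand_grid(lr_grid)
num_folds = 3
fits = len(combos) * num_folds + 1  # +1 refit of the best combination on all training data
print(len(combos), fits)
```

The same arithmetic gives 4 combinations (13 fits) for the random forest grid below, which helps explain why its grid was kept small.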
<a id='rf'></a>
### Random Forest

In [46]:
doc2vecs_df.cache()

In [47]:
from pyspark.ml.classification import RandomForestClassifier

# Instantiate a random forest classifier
rf = RandomForestClassifier(labelCol="label", featuresCol="features", subsamplingRate=0.5, numTrees=500)
# Fit train set
rf_model = rf.fit(train_set)
# Predict test set
rf_predictions = rf_model.transform(test_set)
# Print performance metrics of random forest
print_performance_metrics(rf_predictions)

### Random Forest with Cross Validation

In [49]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Instantiate a classifier
rf = RandomForestClassifier(labelCol="label", featuresCol="features", subsamplingRate=0.5)

# Perform gridsearch cv
rfParamGrid = (ParamGridBuilder()
               .addGrid(rf.maxDepth, [2, 5])
               .addGrid(rf.numTrees, [20, 40])
               .build())

# set up an evaluator
evaluator = MulticlassClassificationEvaluator(
      labelCol="label", predictionCol="prediction", metricName="f1")

# Create CrossValidator
rfCv = CrossValidator(estimator=rf, estimatorParamMaps=rfParamGrid, evaluator=evaluator, numFolds=3)

# Run cross validations
rfCvModel = rfCv.fit(train_set)

# # Look at best params from the CV
print(rfCvModel.bestModel._java_obj.getMaxDepth())
print(rfCvModel.bestModel._java_obj.getNumTrees())

# Get prediction
rfCvPredictions = rfCvModel.transform(test_set)

# Print performance of Random Forest classifier
print_performance_metrics(rfCvPredictions)

<a id='mlp'></a>
### Multilayer Perceptron

In [51]:
# from pyspark.ml.classification import MultilayerPerceptronClassifier

# # Instantiate a multilayer perceptron classifier
# layers = [300, 150, 75, 3]
# mlp = MultilayerPerceptronClassifier(labelCol="label", featuresCol="features", layers=layers)
# # Fit train set
# mlp_model = mlp.fit(train_set)
# # Predict test set
# mlp_predictions = mlp_model.transform(test_set)
# # Print performance metrics of the multilayer perceptron
# print_performance_metrics(mlp_predictions)

<a id='ci'></a>
## Dealing with Class Imbalance

One of the problems with this dataset is class imbalance. The class distribution is as follows:

- Positive - 80%
- Negative - 12%
- Neutral - 8%

The positive class far outnumbers the negative and neutral classes. This hurts the model's ability to predict the rare classes, to the extent that a classifier like the random forest above simply predicted every sample to be positive. To overcome this in PySpark, logistic regression supports class weighting through its `weightCol` parameter, where each class weight is set to be inversely proportional to the class frequency.

However, the PySpark random forest in the Spark version used here has no such weighting parameter (`weightCol` was added to the tree ensembles only in Spark 3.0). So for the random forest, I downsampled the positive class (only 16% of the positive-class data is used). I opted for downsampling because the full dataset is huge (more than 100 million rows), so downsampling should not affect training much.

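To see how much downsampling rebalances the classes, the arithmetic below applies the 16% sampling fraction to the rounded class shares listed above. It is a back-of-the-envelope sketch: `class_shares` is a hypothetical helper, and the real sampled counts will vary slightly because `sample` is random.

```python
def class_shares(shares, keep_fraction):
    """Class shares after keeping only keep_fraction of a class's rows (default: keep all)."""
    kept = {c: share * keep_fraction.get(c, 1.0) for c, share in shares.items()}
    total = sum(kept.values())
    return {c: value / total for c, value in kept.items()}

original = {"positive": 0.80, "negative": 0.12, "neutral": 0.08}
balanced = class_shares(original, {"positive": 0.16})
print({c: round(share, 3) for c, share in balanced.items()})
```

The result is roughly 39% positive, 37% negative and 24% neutral. With these rounded shares, positive remains marginally the most frequent class, which matters because `StringIndexer` assigns label indices by frequency.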
<a id='ilr'></a>
### Logistic Regression with Class Weight Balanced

In [55]:
# Get the count of review categories
train_count = train_set.groupby("review_category").count().toPandas()

In [56]:
# Get the class weight based on its frequency
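# Look up counts by category name, as the row order of the groupby output is not guaranteed
counts = dict(zip(train_count['review_category'], train_count['count']))
pos_count = counts['positive']
neu_count = counts['neutral']
neg_count = counts['negative']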

inv_pos = 1/pos_count
inv_neg = 1/neg_count
inv_neu = 1/neu_count

pos_weight = 1/(pos_count * (inv_pos +  inv_neg + inv_neu))
neg_weight = 1/(neg_count * (inv_pos +  inv_neg + inv_neu))
neu_weight = 1/(neu_count * (inv_pos +  inv_neg + inv_neu))
(pos_weight, neg_weight, neu_weight)
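The formula above normalises the inverse class frequencies so that they sum to 1: w_c = (1 / n_c) / (1 / n_pos + 1 / n_neg + 1 / n_neu). A useful consequence is that n_c * w_c is the same for every class, so each class contributes the same total weight to the training loss. The sketch below checks this on hypothetical counts in an 80/12/8 ratio; `balanced_class_weights` is an illustrative helper, not part of PySpark.

```python
def balanced_class_weights(counts):
    """Inverse-frequency class weights normalised to sum to 1."""
    inv_total = sum(1 / n for n in counts.values())
    return {c: 1 / (n * inv_total) for c, n in counts.items()}

# Hypothetical training-set counts in an 80/12/8 ratio
toy_counts = {"positive": 800, "negative": 120, "neutral": 80}
weights = balanced_class_weights(toy_counts)
print({c: round(w, 4) for c, w in weights.items()})
print({c: round(toy_counts[c] * weights[c], 4) for c in toy_counts})  # equal total weight per class
```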

In [57]:
# create a classWeight column in trainset
train_set_2=(train_set.withColumn("classWeight", when(train_set.review_category == 'positive', pos_weight)
                          .when(train_set.review_category == 'negative', neg_weight)
                          .otherwise(neu_weight)))

In [58]:
# Fit the model and get the predictions for test set
from pyspark.ml.classification import LogisticRegression
# Instantiate a logistic regression classifier. Default parameters are used, as parameter tuning did not improve performance.
lrb = LogisticRegression(labelCol="label", featuresCol="features", weightCol='classWeight')
# Fit train set
lrb_model = lrb.fit(train_set_2)
# Predict test set
lrb_predictions = lrb_model.transform(test_set)
# Print performance metrics of logistic regression
print_performance_metrics(lrb_predictions)

<a id='irf'></a>
### Random Forest with Balanced Dataset
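Since the PySpark random forest in the Spark version used here has no built-in parameter to handle imbalanced data, I downsampled the positive class. Note that the downsampling is applied before the train/test split, so the test set is balanced as well; its metrics are therefore not directly comparable with those of the models evaluated on the original, imbalanced test set.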

In [60]:
# Subsample only 16% of the rows with a positive label, and keep all negative and neutral rows
sample_frac = 0.16
# Filter rows with positive class
doc2vecs_df_pos = doc2vecs_df.filter("review_category = 'positive'")
# Sample 16% of positive class (without replacement, seed 42)
doc2vecs_df_pos_sample = doc2vecs_df_pos.sample(False, sample_frac, 42)
# Filter negative and neutral rows
doc2vecs_df_neg_neu = doc2vecs_df.filter("review_category != 'positive'")
# Combine sampled positive rows with negative and neutral rows
doc2vecs_df_balanced = doc2vecs_df_pos_sample.union(doc2vecs_df_neg_neu)

In [61]:
from pyspark.sql.functions import rand
# Shuffle the data
doc2vecs_df_balanced = doc2vecs_df_balanced.orderBy(rand())

In [62]:
from pyspark.ml.feature import StringIndexer

# Encode the target label
string_indexer = StringIndexer(inputCol="review_category", outputCol="label")
doc2vecs_df_balanced_encoded = string_indexer.fit(doc2vecs_df_balanced).transform(doc2vecs_df_balanced)

In [63]:
# Split the data into train and test set
train_set_b, test_set_b = doc2vecs_df_balanced_encoded.randomSplit([0.7, 0.3], seed=100)

In [64]:
from pyspark.ml.classification import RandomForestClassifier

# Instantiate a random forest classifier
rfb = RandomForestClassifier(labelCol="label", featuresCol="features", subsamplingRate=0.5, numTrees=500)
# Fit train set
rfb_model = rfb.fit(train_set_b)
# Predict test set
rfb_predictions = rfb_model.transform(test_set_b)
# Print performance metrics of random forest
print_performance_metrics(rfb_predictions)

<a id='imlp'></a>
### Multilayer Perceptron with Balanced Data

In [66]:
from pyspark.ml.classification import MultilayerPerceptronClassifier

# Instantiate a multilayer perceptron classifier (300 inputs = Doc2Vec vector size, 3 outputs = classes)
layers = [300, 150, 75, 3]
mlpb = MultilayerPerceptronClassifier(labelCol="label", featuresCol="features", layers=layers)
# Fit train set
mlpb_model = mlpb.fit(train_set_b)
# Predict test set
mlpb_predictions = mlpb_model.transform(test_set_b)
# Print performance metrics of the multilayer perceptron
print_performance_metrics(mlpb_predictions)
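In `layers`, the first entry must equal the input feature size (300, the Word2Vec `vectorSize`) and the last must equal the number of classes (3). The sketch below counts the trainable parameters of this architecture, assuming fully connected layers with one bias per neuron. For comparison, a multinomial logistic regression with an intercept, which is what Spark fits by default for three classes, corresponds to the single layer [300, 3].

```python
def mlp_parameter_count(layers):
    """Weights plus biases of a fully connected feed-forward network."""
    return sum((n_in + 1) * n_out for n_in, n_out in zip(layers, layers[1:]))

mlp_layers = [300, 150, 75, 3]
print(mlp_parameter_count(mlp_layers))   # hidden layers: 45150 + 11325 + 228
print(mlp_parameter_count([300, 3]))     # logistic-regression-sized model
```

The MLP has roughly 60 times as many parameters as the logistic regression, which gives a rough sense of the size difference behind the training times reported in the conclusion.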

<a id='hp'></a>
## ML Prototyping: Helpfulness Prediction

#### Helpfulness index
The absolute number of helpful votes is not suitable for comparison because the total number of votes varies between reviews. For instance, a review with 15 helpful votes out of 10,000 total votes is arguably less helpful than a review with 10 helpful votes out of 11 total votes. To deal with this, a new `helpful_index` is created by dividing `helpful_votes` by `total_votes`.

#### Subset Review with More Than Five Total Votes
Analyzing total and helpful votes is tricky, as most reviews have 0 total votes. Furthermore, very low vote totals may bias the analysis. For instance, if a review has only one vote and that vote is helpful, its helpful_index is 1, which may or may not be reliable. To circumvent this problem, only reviews with more than 5 total votes are considered in this analysis.

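The helpful_index and the vote threshold can be illustrated with the examples from the text. `helpful_indices` is a hypothetical helper that applies the same `total_votes > 5` filter and `helpful_votes / total_votes` ratio as the next two cells.

```python
def helpful_indices(vote_pairs, min_total_votes=5):
    """helpful_votes / total_votes for reviews with more than min_total_votes total votes."""
    return [helpful / total for helpful, total in vote_pairs if total > min_total_votes]

# (helpful_votes, total_votes): the two examples from the text plus a single-vote review
example_votes = [(15, 10000), (10, 11), (1, 1)]
print(helpful_indices(example_votes))
```

The single-vote review is dropped by the threshold, while the two remaining reviews get helpful_index values of 0.0015 and about 0.91, matching the intuition that the second review is far more helpful.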
In [70]:
doc2vecs_helpful_df = doc2vecs_df.filter(doc2vecs_df["total_votes"] > 5)

In [71]:
doc2vecs_helpful_df = (doc2vecs_helpful_df.
                    withColumn('helpful_index', doc2vecs_helpful_df.helpful_votes/doc2vecs_helpful_df.total_votes))
(doc2vecs_helpful_df.count())

<a id='lrg'></a>
### Linear Regression with Cross Validation

In [73]:
# Split the data into train and test set
train_set_h, test_set_h = doc2vecs_helpful_df.randomSplit([0.7, 0.3], seed=100)

In [74]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Instantiate a regressor
lrg = LinearRegression(labelCol="helpful_index", featuresCol="features")

# Perform gridsearch cv
lrgParamGrid = (ParamGridBuilder()
               .addGrid(lrg.regParam, [0, 0.01, 0.5, 2.0])
               .addGrid(lrg.elasticNetParam, [0.0, 0.5, 1.0])
               .addGrid(lrg.maxIter, [100,200])
               .build())

# set up an evaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol=lrg.getLabelCol(), predictionCol=lrg.getPredictionCol())

# Create CrossValidator
lrgCv = CrossValidator(estimator=lrg, estimatorParamMaps=lrgParamGrid, evaluator=evaluator)

#Run cross validations
lrgCvModel = lrgCv.fit(train_set_h)

# # Look at best params from the CV
print(lrgCvModel.bestModel._java_obj.getRegParam())
print(lrgCvModel.bestModel._java_obj.getElasticNetParam())
print(lrgCvModel.bestModel._java_obj.getMaxIter())

# Get prediction
lrgCvPredictions = lrgCvModel.transform(test_set_h)

# Print evaluation metrics
# Print rmse
rmse = evaluator.evaluate(lrgCvPredictions)
print ("RMSE on the test set: {:.3f}".format(rmse))

# Print R2
r2 = RegressionEvaluator(metricName="r2", labelCol=lrg.getLabelCol(), predictionCol=lrg.getPredictionCol()).evaluate(lrgCvPredictions)
print ("R2 on the test set: {:.3f}".format(r2))

### Linear Regression

In [76]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Instantiate a regressor
lrg = LinearRegression(labelCol="helpful_index", featuresCol="features", maxIter=1000, regParam=0.05)

# Fit training Data
lrgModel = lrg.fit(train_set_h)

# Get prediction
lrgPredictions = lrgModel.transform(test_set_h)

# Print evaluation metrics
# Print rmse
rmse = RegressionEvaluator(metricName="rmse", labelCol=lrg.getLabelCol(), predictionCol=lrg.getPredictionCol()).evaluate(lrgPredictions)
print ("RMSE on the test set: {:.3f}".format(rmse))

# Print R2
r2 = RegressionEvaluator(metricName="r2", labelCol=lrg.getLabelCol(), predictionCol=lrg.getPredictionCol()).evaluate(lrgPredictions)
print ("R2 on the test set: {:.3f}".format(r2))

In [77]:
# What would be the R2, if we just predict mean of the helpful_index for all reviews.
mean_help_df = lrgPredictions.select('helpful_index')
from pyspark.sql.functions import lit
mean_ = mean_help_df.groupBy().avg("helpful_index").take(1)[0][0]
mean_help_df = mean_help_df.withColumn("mean_helpful_index", lit(mean_))
r2_mean = RegressionEvaluator(metricName="r2", labelCol='helpful_index', predictionCol='mean_helpful_index').evaluate(mean_help_df)
print("r2 if we just predict mean of the helpful_index for all reviews: {}".format(r2_mean))
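R² compares a model's squared error with the squared error of always predicting the mean of the labels being evaluated. Predicting the test-set mean therefore gives R² = 0 by construction, so the cell above acts as a sanity check of the baseline: any positive R² beats the constant-mean prediction. A minimal sketch with hypothetical helpful_index values:

```python
def r2_score(y_true, y_pred):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean = sum(y_true) / len(y_true)
    ss_res = sum((y - p) ** 2 for y, p in zip(y_true, y_pred))
    ss_tot = sum((y - mean) ** 2 for y in y_true)
    return 1 - ss_res / ss_tot

# Hypothetical helpful_index values
y_true = [0.9, 0.5, 0.75, 0.2]
mean_prediction = [sum(y_true) / len(y_true)] * len(y_true)
print(r2_score(y_true, mean_prediction))  # zero up to floating-point error
```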

<a id='gbt'></a>
### Gradient Boosting Tree with Cross Validation

In [79]:
# from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# from pyspark.ml.regression import GBTRegressor
# from pyspark.ml.evaluation import RegressionEvaluator

# # Instantiate a regressor
# gbt = GBTRegressor(labelCol="helpful_index", featuresCol="features")

# # Perform gridsearch cv
# gbtParamGrid = (ParamGridBuilder()
#                .addGrid(gbt.maxDepth, [2, 5])
#                .addGrid(gbt.maxIter, [20,100])
#                .build())

# # set up an evaluator
# evaluator = RegressionEvaluator(metricName="rmse", labelCol=gbt.getLabelCol(), predictionCol=gbt.getPredictionCol())

# # Create CrossValidator
# gbtCv = CrossValidator(estimator=gbt, estimatorParamMaps=gbtParamGrid, evaluator=evaluator)

# #Run cross validations
# gbtCvModel = gbtCv.fit(train_set_h)

# # # Look at best params from the CV
# print(gbtCvModel.bestModel._java_obj.getMaxDepth())
# print(gbtCvModel.bestModel._java_obj.getMaxIter())

# # Get prediction
# gbtCvPredictions = gbtCvModel.transform(test_set_h)

# # Print evaluation metrics
# # Print rmse
# rmse = evaluator.evaluate(gbtCvPredictions)
# print ("RMSE on the test set: {:.3f}".format(rmse))

# # Print R2
# r2 = RegressionEvaluator(metricName="r2", labelCol=gbt.getLabelCol(), predictionCol=gbt.getPredictionCol()).evaluate(gbtCvPredictions)
# print ("R2 on the test set: {:.3f}".format(r2))

### Gradient Boosting Tree

In [81]:
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Instantiate a regressor
gbt = GBTRegressor(labelCol="helpful_index", featuresCol="features", maxIter=100, maxDepth=5)

# Fit training Data
gbtModel = gbt.fit(train_set_h)

# Get prediction
gbtPredictions = gbtModel.transform(test_set_h)

# Print evaluation metrics
# Print rmse
rmse = RegressionEvaluator(metricName="rmse", labelCol=gbt.getLabelCol(), predictionCol=gbt.getPredictionCol()).evaluate(gbtPredictions)
print ("RMSE on the test set: {:.3f}".format(rmse))

# Print R2
r2 = RegressionEvaluator(metricName="r2", labelCol=gbt.getLabelCol(), predictionCol=gbt.getPredictionCol()).evaluate(gbtPredictions)
print ("R2 on the test set: {:.3f}".format(r2))

<a id='cl'></a>
## Conclusion

I used three algorithms to predict the star rating: Logistic Regression, Random Forest and Multilayer Perceptron (MLP). Considering accuracy, the recall of the positive, negative and neutral classes, and training time, I plan to use Logistic Regression when scaling up the ML model. The performance metrics of this model are:

- Accuracy: 73%
- Positive class recall: 75%
- Negative class recall: 70%
- Neutral class recall: 58%


Although MLP produced better recall for the positive (82%) and negative (80%) classes, its overall accuracy (69%) is lower than that of Logistic Regression. Training Logistic Regression took only ~3 minutes, while MLP took ~49 minutes.

I used two algorithms, Linear Regression and Gradient Boosting Trees, to predict helpfulness. They achieved an R2 of ~0.21 and ~0.22 (about 21% and 22% of variance explained), respectively.