# **Amazon Recommendation system with User Sentiment understanding** # 
by

*Dixitha Kasturi*

*Bhargav Konakanchi*


* The aim of the project is to build a recommendation system (using the user ratings) and to perform sentiment analysis to understand the overall user sentiment (negative/positive). A total of 278,677 ***Clothing, Shoes and Jewelry*** reviews were analyzed from the 'Amazon Reviews' dataset, which also contains other categories.

* Platforms/Sources:
  - Google Colab for code execution
  - <a href="http://jmcauley.ucsd.edu/data/amazon/" target="_blank">Dataset Link</a>

* Algorithms:
  - Recommendation System - ALS (Alternating Least Squares)
  - Sentiment Analysis - Logistic Regression



## **Mounting G-drive, installing PySpark** ##

In [None]:
!pip install PyDrive
import os
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

In [None]:
from google.colab import drive 
drive.mount('/content/drive')

In [None]:

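!apt install openjdk-8-jdk-headless -qq
# Spark 3.2.0 is no longer served by the dlcdn.apache.org mirror, which keeps only current releases;
# older releases remain available from archive.apache.org.
!wget -q https://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz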

In [None]:
!tar -xvzf spark-3.2.0-bin-hadoop3.2.tgz &> /dev/null


In [None]:
!pip install -q findspark
!pip install pyspark


In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.0-bin-hadoop3.2"

In [None]:
import findspark
findspark.init()

## **Sections** ##

Loading some basic libraries

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pandas import DataFrame 
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql import functions as fn
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

## **1. Amazon JSON data description and loading** ##



* The Amazon reviews dataset contains product reviews and metadata from Amazon, including 142.8 million reviews spanning May 1996 - July 2014.
This dataset includes reviews (ratings, text, helpfulness votes, review time and so on). The file is in JSON format.

* A subcategory, 'Clothing, Shoes and Jewelry', is chosen: in it, 39,387 unique users reviewed 23,033 distinct products.

* Overall there are 278,677 reviews and 9 attributes.

* Throughout the analysis, additional columns were generated and added as required.

Format of the reviews: 

{
  
  "reviewerID": "A2SUAM1J3GNN3B",

  "asin": "0000013714",

  "reviewerName": "J. McDonald",

  "helpful": [2, 3],

  "reviewText": "I bought this for my husband who plays the piano.  He is having a wonderful time playing these old hymns.  The music  is at times hard to read because we think the book was published for singing from more than playing from.  Great purchase though!",

  "overall": 5.0,

  "summary": "Heavenly Highway Hymns",

  "unixReviewTime": 1252800000,

  "reviewTime": "09 13, 2009"

}

where:

* reviewerID - ID of the reviewer, e.g. A2SUAM1J3GNN3B
* asin - ID of the product, e.g. 0000013714
* reviewerName - name of the reviewer
* helpful - helpfulness rating of the review, e.g. 2/3
* reviewText - text of the review
* overall - rating of the product
* summary - summary of the review
* unixReviewTime - time of the review (unix time)
* reviewTime - time of the review (raw)



In [None]:

amazon_spark_df = spark.read.json("/content/drive/MyDrive/Clothing_Shoes_and_Jewelry_5.json")
amazon_spark_df.show()

## **2. Exploratory Data Analysis** ##

In [None]:
amazon_spark_df.describe('overall').show()

In [None]:
from pyspark.sql.functions import col,isnan, when, count
df1 = amazon_spark_df.select('asin', 'overall', 'reviewText', 'reviewTime',   'reviewerID', 'reviewerName', 'summary', 'unixReviewTime')
df1.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df1.columns]).show()


*   The data is clean overall. Reviewer name has 464 missing values, but since we use the reviewer name only for reference, we keep these rows.



In [None]:
from pyspark.sql.functions import countDistinct
amazon_spark_df.select(countDistinct('reviewerID')).show()



*   39,387 unique users reviewed products in the Clothing, Shoes and Jewelry category.



In [None]:
amazon_spark_df.select(countDistinct('asin')).show()



*   There are 23,033 distinct products in the Clothing, Shoes and Jewelry category.



In [None]:
# Getting count of reviews for products
group_by_product = amazon_spark_df.groupBy('asin').count().orderBy('count', ascending=False)
group_by_product.show()

In [None]:
#  Getting count of reviews given by user
group_by_user = amazon_spark_df.groupBy('reviewerID','reviewerName').count().orderBy('count', ascending=False)
group_by_user.show()

We now check the overall distribution of the ratings, which range from 1 to 5.

In [None]:
ratings_row = amazon_spark_df.select('overall').collect()
ratings = [ratings_row[i][0] for i in range(len(ratings_row))]
plt.hist(ratings,alpha = 1, edgecolor = 'black',histtype='stepfilled', bins = [x * 0.1 for x in range(0,55,5)])
plt.ylabel('Frequency')
plt.xlabel('Rating')




*   There are no 0 ratings.
*   Low ratings are relatively rare; most ratings are between 4 and 5.



### Creating a proper date column from the Unix timestamp ###
##### ('Time')  ######


In [None]:

# DataFrames are immutable: withColumn returns a new DataFrame, so the result must be reassigned.
# trim() strips both leading and trailing spaces, so separate ltrim()/rtrim() calls are not needed.
amazon_spark_df = amazon_spark_df.withColumn('reviewTime', fn.trim(fn.col('reviewTime')))

# Convert the Unix timestamp (seconds) into a MM-dd-yyyy string column named 'Time'
amazon_spark_df = amazon_spark_df.withColumn('Time', fn.from_unixtime(fn.col("unixReviewTime"), "MM-dd-yyyy"))
amazon_spark_df.show()
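As a quick check of the conversion, the same formatting can be reproduced with Python's standard datetime module on the timestamp of the sample record shown in Section 1. This is a sketch that assumes the Spark session time zone is UTC; from_unixtime formats in the session time zone, so another zone could shift the date by a day. The helper name is hypothetical.

```python
from datetime import datetime, timezone


def unix_to_mm_dd_yyyy(ts: int) -> str:
    """Format a Unix timestamp (seconds) as MM-dd-yyyy.

    Assumption: the Spark session time zone is UTC, so this matches from_unixtime.
    """
    return datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%m-%d-%Y")


# unixReviewTime of the sample record; its raw reviewTime is "09 13, 2009"
print(unix_to_mm_dd_yyyy(1252800000))  # 09-13-2009
```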


### Creating 2 columns : Average rating for each product and by each user ###
#####('Average_rating_by_product', 'Average_rating_by_user')#####

In [None]:
group_by_product_avg = amazon_spark_df.groupBy('asin').agg(fn.mean('overall').alias("Average_rating_by_product"))
group_by_user_avg = amazon_spark_df.groupBy('reviewerID').agg(fn.mean('overall').alias("Average_rating_by_user"))

In [None]:
merged_df = amazon_spark_df.join(group_by_product_avg,['asin'],how='full')
amazon_spark_df = merged_df.join(group_by_user_avg,['reviewerID'],how='full')
amazon_spark_df.count()
amazon_spark_df.show()

In [None]:
amazon_spark_df.select('overall','asin','Average_rating_by_product','reviewerID','Average_rating_by_user','unixReviewTime','Time').show()

## **3. Collaborative filtering Recommendation System Model : ALS** ##

Overview of Recommendation System : 

From all of the available methods/techniques, collaborative filtering was used. It is called collaborative because it makes recommendations based on other people's behavior: in effect, users collaborate (through the algorithm) to produce recommendations. The method aims to fill in the missing entries of a user-item association matrix. We only consider users and the items each user has interacted with (here, an interaction means the user has given the product a review/rating). Real-world systems also use clicks and views, in addition to past purchases and ratings.

We are dealing with explicit data (ratings) rather than implicit data (views). For instance, in our data a rating of 1 means the user did not like the item and a 5 means they really liked it. Using these interactions (ratings) from other users and from the user in question, we generate recommendations of products the user might like.

Approach used:
1. Check the sparsity of the user-item matrix
2. Convert the user and product IDs to numeric indices for ALS
3. Split the data 70:30 into training and testing sets
4. Build the ALS model with parameter tuning
5. Evaluate the RMSE
6. Generate recommendations

Among collaborative filtering methods, Spark supports the matrix factorization method known as ALS (Alternating Least Squares).

> Alternating least Squares 



![](https://drive.google.com/uc?export=view&id=1VaKRCiDf9fwfDOLmqfJujV6C-nbLTJzV)


R is the user-item utility matrix, where entry (u, i) is the rating that user u gave item i on a scale of 1–5. It is a sparse matrix. The goal is to predict the missing values; the items with the highest predicted values become the recommendations for that user (marked in green).



* Latent factor collaborative filtering learns user and item profiles (of dimension K) through matrix factorization, by minimizing the squared error (reported as Root Mean Squared Error, RMSE) between the available ratings $y$ and their predicted values $\hat{y}$. Each item i is associated with a latent (feature) vector $P_i$, each user u with a latent (profile) vector $U_u$, and the predicted rating is their inner product, $\hat{y}_{ui} = U_u^\top P_i$.

* ALS uses L2 regularization to avoid overfitting: it penalizes large factor values, which reduces model complexity so that the model does not fit the training ratings too closely. L2 regularization shrinks the weights toward zero but, unlike L1, does not make them exactly zero. The parameter to tune is λ (regParam in Spark).
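For reference, the regularized objective described in the two bullets above can be written as follows, where $\mathcal{K}$ is the set of observed (user, item) pairs. This is the textbook form; the Spark MLlib documentation states that Spark's ALS additionally scales λ by the number of ratings of each user or item (the "ALS-WR" approach).

\begin{equation}
\min_{U,P} \sum_{(u,i)\in\mathcal{K}} \left(y_{ui} - U_u^\top P_i\right)^2 + \lambda \left( \sum_u \lVert U_u \rVert^2 + \sum_i \lVert P_i \rVert^2 \right)
\end{equation}

With U held fixed, this is a ridge regression for each item, so every $P_i$ has a closed-form solution (and symmetrically for $U_u$ with P fixed):

\begin{equation}
P_i = \left( \sum_{u:(u,i)\in\mathcal{K}} U_u U_u^\top + \lambda I_K \right)^{-1} \sum_{u:(u,i)\in\mathcal{K}} y_{ui}\, U_u
\end{equation}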


![](https://drive.google.com/uc?export=view&id=1oWkNZj_Qtt4uw2vgDofJw9DV76JW6P7i)


* Finally the way ALS works is shown in the image below:

![](https://drive.google.com/uc?export=view&id=1oH6sYxD-RX5NEt2o-eRknjwGBT49R74e)




* ALS learns U and P by alternating between two least-squares problems. U and P start with random values; while solving for one, the other is held fixed at its current value:
  - Fix U and solve for P
  - Fix P and solve for U

* Advantage of ALS: no domain knowledge is needed; the embeddings are learnt automatically.
* Disadvantage of ALS: if an item (or user) is not seen during training, the system cannot create an embedding for it and cannot query the model with it. This issue is often called the ***cold-start problem***.
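The alternating updates can be sketched in NumPy on a tiny dense matrix. This is an illustrative sketch, not Spark's implementation: the function names, the toy ratings and the plain (unscaled) λ are assumptions, and Spark works on distributed, blocked data. Each half-step solves one ridge regression per user (or item), so the objective never increases from one iteration to the next.

```python
import numpy as np


def als_objective(R, mask, U, P, reg):
    """Squared error on the observed ratings plus the L2 penalty on U and P."""
    err = (R - U @ P.T)[mask]
    return float(np.sum(err ** 2) + reg * (np.sum(U ** 2) + np.sum(P ** 2)))


def als(R, mask, k=2, reg=0.1, iters=20, seed=0):
    """Toy ALS: R is a (users x items) array, mask is True where a rating exists."""
    rng = np.random.default_rng(seed)
    n_users, n_items = R.shape
    U = rng.normal(scale=0.1, size=(n_users, k))  # random initial user factors
    P = rng.normal(scale=0.1, size=(n_items, k))  # random initial item factors
    I = np.eye(k)
    for _ in range(iters):
        # Fix P and solve a ridge regression for every user vector U_u
        for u in range(n_users):
            Pu = P[mask[u]]
            U[u] = np.linalg.solve(Pu.T @ Pu + reg * I, Pu.T @ R[u, mask[u]])
        # Fix U and solve a ridge regression for every item vector P_i
        for i in range(n_items):
            Ui = U[mask[:, i]]
            P[i] = np.linalg.solve(Ui.T @ Ui + reg * I, Ui.T @ R[mask[:, i], i])
    return U, P


# Toy ratings (0 = missing); a real rating matrix is far sparser
R = np.array([[5, 4, 0, 1],
              [4, 0, 0, 1],
              [1, 1, 0, 5],
              [0, 1, 5, 4]], dtype=float)
mask = R > 0
U, P = als(R, mask)
predicted = U @ P.T  # filled-in matrix; high values in empty cells are candidate recommendations
```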
 






### Calculating the Sparsity of data ###

The user-item matrix is over 99.96% sparse (empty).


In [None]:
# Getting Sparsity
num = amazon_spark_df.select('overall').count()

#distinct user ids and items
user = amazon_spark_df.select('reviewerID').distinct().count()
items = amazon_spark_df.select('asin').distinct().count()

den = user * items

spars = (1-(num*1)/den) * 100
print('sparsity is ', "%.3f"%spars+"%")
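The same arithmetic can be checked in plain Python with the counts reported in Section 2 (278,677 ratings, 39,387 users, 23,033 products). Like the cell above, this sketch assumes each row is a distinct (user, product) pair; the helper name is hypothetical.

```python
def sparsity_percent(num_ratings: int, num_users: int, num_items: int) -> float:
    """Share of empty cells in a num_users x num_items rating matrix, in percent."""
    return (1 - num_ratings / (num_users * num_items)) * 100


s = sparsity_percent(278_677, 39_387, 23_033)
print(f"sparsity is {s:.3f}%")  # sparsity is 99.969%
```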

### Selecting only required columns for model generation ###


In [None]:
# Keep the product and user IDs, the rating and the two average-rating columns
new_df = amazon_spark_df.select('asin','overall','reviewerID','Average_rating_by_product','Average_rating_by_user')
new_df.count()

### Converting the ID columns into numeric indices for ALS ###

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

# ALS needs numeric user and item IDs, so only the two string ID columns are indexed.
# (The rating and average-rating columns are already numeric.)
indexers = [StringIndexer(inputCol=column, outputCol=column + "_index")
            for column in ['asin', 'reviewerID']]

# Chain the indexers in a pipeline, fit it and add the *_index columns
pipeline = Pipeline(stages=indexers)
t2 = pipeline.fit(new_df).transform(new_df)
t2.show()
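StringIndexer's default ordering (stringOrderType="frequencyDesc") gives index 0.0 to the most frequent value, so low reviewerID_index values belong to the most active reviewers. A plain-Python sketch of that mapping follows; the helper name is hypothetical, and the alphabetical tie-break reflects our reading of the Spark 3.x documentation rather than a guarantee.

```python
from collections import Counter


def frequency_desc_index(values):
    """Map each distinct value to a float index, most frequent first.

    Ties are broken alphabetically (assumed to match Spark 3.x frequencyDesc).
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {label: float(i) for i, label in enumerate(ordered)}


reviewer_ids = ["A2", "A1", "A2", "A3", "A1", "A2"]  # toy IDs
mapping = frequency_desc_index(reviewer_ids)
indexed = [mapping[v] for v in reviewer_ids]
print(mapping)  # {'A2': 0.0, 'A1': 1.0, 'A3': 2.0}
```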

In [None]:
#t = t2.filter(t2.reviewerID_index == 1127)
#t.count()

### Selecting only required columns from indexed(converted) dataframe ###

In [None]:
t3 = t2.select(['asin_index', 'reviewerID_index','asin','reviewerID','overall'])
t3.show(10)

### Splitting Dataframe into 70% training, 30% testing ###


In [None]:
# Splitting data from transformed :
train, test = t3.randomSplit([0.7, 0.3], seed=0)

In [None]:
train.count()

In [None]:
test.count()

### Generating ALS Model and fitting with parameter tuning ###

Used hyperparameter tuning with cross-validation to select the best model.
Values chosen for rank = [10, 25]
Values chosen for lambda (regularization parameter) = [0.05, 0.1]

We ran 3-fold cross-validation on the training split. Because the dataset is large and comparing more models is time-consuming, we compared only a small number of parameter combinations (4 models).


In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

als = ALS(maxIter=5, userCol="reviewerID_index",itemCol="asin_index",
          ratingCol="overall",coldStartStrategy="drop",nonnegative=True)

# Testing 4 models:
param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [10,25]) \
            .addGrid(als.regParam, [0.05,0.1]) \
            .build()

# Use the RMSE between the 'overall' rating and the prediction as the evaluation metric
evaluator = RegressionEvaluator(
           metricName="rmse", 
           labelCol="overall", 
           predictionCol="prediction") 

# Build cross validation using CrossValidator
cv = CrossValidator(estimator=als, 
                    estimatorParamMaps=param_grid, 
                    evaluator=evaluator, numFolds=3)
#Fit cross validator to the 'train' dataset
models = cv.fit(train)
#Extract best model from the cv model above
model = models.bestModel

#als = ALS(maxIter=5,regParam=0.09,rank=30,userCol="reviewerID_index",itemCol="asin_index",ratingCol="overall",coldStartStrategy="drop",nonnegative=True)
#model=als.fit(train)
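To see how much work the cross-validation above does, the parameter grid can be enumerated in plain Python. The sketch mirrors the values in the code (rank in [10, 25], regParam in [0.05, 0.1], numFolds=3) and assumes CrossValidator's usual behaviour of fitting every parameter combination on every fold and then refitting the best combination on the full training split.

```python
from itertools import product

ranks = [10, 25]
reg_params = [0.05, 0.1]
num_folds = 3

# One dict per parameter combination, like the list built by ParamGridBuilder
param_maps = [{"rank": r, "regParam": lam} for r, lam in product(ranks, reg_params)]

# Every combination is fitted once per fold, plus one final refit of the best combination
total_fits = len(param_maps) * num_folds + 1
print(len(param_maps), total_fits)  # 4 13
```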

In [None]:
best_model = models.bestModel
print("For Best Model : ")
print("Rank:", best_model._java_obj.parent().getRank())
print("RegParam:", best_model._java_obj.parent().getRegParam())

### Model Evaluation (RMSE) ###

Getting RMSE for the best model that was generated

In [None]:
# Evaluate the best cross-validated model on the held-out test split.
# With coldStartStrategy="drop", test rows whose user or item was not seen in training are dropped,
# so the RMSE is computed only on rows the model can score.
predictions = best_model.transform(test)
RMSE = evaluator.evaluate(predictions)
print("RMSE=" + str(RMSE))
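RMSE is the square root of the mean squared difference between the actual and predicted ratings. A minimal plain-Python version follows; the rating pairs are hypothetical and only illustrate the arithmetic.

```python
import math


def rmse(pairs):
    """Root mean squared error over (actual_rating, predicted_rating) pairs."""
    squared_errors = [(y - y_hat) ** 2 for y, y_hat in pairs]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Hypothetical test rows that survived coldStartStrategy="drop"
print(rmse([(5.0, 4.5), (3.0, 3.5), (4.0, 4.0)]))  # about 0.408
```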

In [None]:
print(model)

### Recommendations for Users ###



* ALS scores every item for a user by the inner product of the learned user and item factors; the highest-scoring items become the recommendations. We used the recommendForAllUsers function available in Spark.

* We generated recommendations for the user with index 11276.
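Conceptually, recommending items for one user amounts to scoring every item by the inner product of the user's factor vector with each item's factor vector and keeping the top N. The NumPy sketch below illustrates only that scoring step, with hypothetical toy factors and a hypothetical helper name; Spark's recommendForAllUsers computes these scores for all users in a distributed way. This sketch does not remove items the user has already rated.

```python
import numpy as np


def recommend_for_user(user_factor, item_factors, num_items):
    """Return (item_index, score) pairs for the num_items highest inner products."""
    scores = item_factors @ user_factor     # predicted rating for every item
    top = np.argsort(-scores)[:num_items]   # highest scores first
    return [(int(i), float(scores[i])) for i in top]


user_factor = np.array([1.0, 0.5])          # toy U_u
item_factors = np.array([[1.0, 0.0],        # toy P_i, one row per item
                         [0.0, 1.0],
                         [2.0, 1.0],
                         [0.5, 0.5]])
recs = recommend_for_user(user_factor, item_factors, 2)
print(recs)  # [(2, 2.5), (0, 1.0)]
```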


In [None]:
#Recommendations for users
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType
from pyspark.sql.types import IntegerType

# Top-25 recommendations for the chosen user. The result is stored in user_recs
# (not test) so that the test split defined earlier is not overwritten.
user_recs = best_model.recommendForAllUsers(25).filter(fn.col('reviewerID_index') == 11276).select("recommendations").collect()

# Each recommendation is a Row(asin_index, rating); keep only the item index
item_recommends = [item.asin_index for item in user_recs[0][0]]

items_fin = spark.createDataFrame(item_recommends, IntegerType()).toDF("asin_index")
items_fin.sort(fn.col('asin_index'), ascending=False).show(25)

In [None]:
# Items the chosen user (index 11276) has already rated
t2\
.select(['asin','asin_index', 'reviewerID','reviewerID_index', 'overall' ,'Average_rating_by_user'])\
.filter(fn.col('reviewerID_index')== 11276)\
.sort(fn.col('asin_index'),ascending = False)\
.show()

# Attach product details to each recommended item, one row per product
# (the reviewer columns come from one arbitrary reviewer of that product)
tv = items_fin\
.join(t2, on = 'asin_index', how = 'inner')\
.select(['asin','asin_index','reviewerID','reviewerID_index','overall','Average_rating_by_product','Average_rating_by_user'])\
.drop_duplicates(subset=['asin'])\
.sort(fn.col('asin_index'),ascending = False)

tv.show()



In [None]:
user_df = merged_df.select('reviewerID','reviewerName')
user_df.count()

In [None]:
#Getting recommendations for the above chosen user based on other users.
user_df = merged_df.select('reviewerID','reviewerName')
user_df\
.join(tv,on = 'reviewerID',how = 'inner')\
.distinct()\
.sort(fn.col('asin_index'),ascending =False)\
.show()

In [None]:
# For user :  25492.0 example

# Store the recommendations in user_recs so that the test split is not overwritten
user_recs = model.recommendForAllUsers(20).filter(fn.col('reviewerID_index')== 25492).select("recommendations").collect()
item_recommends = [item.asin_index for item in user_recs[0][0]]

items_fin = spark.createDataFrame(item_recommends,IntegerType()).toDF("asin_index")
items_fin.sort(fn.col('asin_index'),ascending=False).show()

t2\
.select(['asin','asin_index', 'reviewerID','reviewerID_index', 'overall' ,'Average_rating_by_user'])\
.filter(fn.col('reviewerID_index')== 25492)\
.sort(fn.col('asin_index'),ascending = False)\
.show()

tab = items_fin\
.join(t2, on = 'asin_index', how = 'inner')\
.select(['asin','asin_index','reviewerID','reviewerID_index','overall','Average_rating_by_product'])\
.drop_duplicates(subset=['asin'])\
.sort(fn.col('asin_index'),ascending = False)\
.collect()


tv = spark.createDataFrame(tab)
tv.show()

#Getting recommendations for the above chosen user based on other users.
user_df = merged_df.select('reviewerID','reviewerName')

user_df\
.join(tv,on = 'reviewerID',how = 'inner')\
.distinct()\
.sort(fn.col('asin_index'),ascending =False)\
.show()


## **4. Sentiment Analysis** ## 

Overview of customer sentiment analysis:

Using the text review that was given by the user, sentiment analysis was performed using Logistic regression to understand the overall sentiment of the user.

We used the reviews given by each user to compute sentiment scores. If a user's sentiment score is negative and the average rating they give is below 2 (a threshold we chose), or if the sentiment score is positive and their average rating is 5, further analysis should be performed: look into the products they reviewed to see whether they are targeting a particular company's products.

* The steps that were followed are:

  1. For the input data:

     a) Tokenizing words (breaking sentences down into words)

     b) Removing stop words (commonly occurring filler words such as articles, pronouns etc.)

     c) Converting the words into features (the features column is a sparse vector of the counts of the words that appear in the text)
  2. Get the positive and negative words from a Parquet file
  3. Generate a sentiment score for each review (1 for positive, 0 for negative); a review is positive if the average sentiment of its words is > 0.
  4. Use TF-IDF, a numerical statistic intended to reflect how important a word is to a review, to re-weight the word counts. Here $f_{ij}$ is the number of times word i occurs in review j, $f_i$ is the number of reviews containing word i, and $|D|$ is the number of reviews:

    \begin{equation}
    \text{tf-idf}_{ij} = f_{ij} \log \frac{|D|+1}{f_i+1}
    \end{equation}

  5. Use logistic regression to classify each review, based on its TF-IDF features, as positive (1) or negative (0).
  6. Flag users based on their sentiment score and average rating:
    - Negative/flagged if sentiment score = 0 and average rating < 2
    - Overly positive if sentiment score = 1 and average rating = 5
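Steps 1 to 3 can be sketched in plain Python for a single review. The stop-word set and the word lexicon below are toy stand-ins (assumptions) for the Glasgow stop-word list and sentiments.parquet; the helper name is hypothetical. As in the notebook, the lexicon is matched against the tokenized words, and a review with no lexicon word gets no label because the inner join drops it.

```python
import re
from collections import Counter


def review_features_and_label(text, stop_words, lexicon):
    # 1a) lowercase and split on non-word characters, dropping empty tokens
    words = [w for w in re.split(r"\W+", text.lower()) if w]
    # 1b) remove stop words
    filtered = [w for w in words if w not in stop_words]
    # 1c) term counts, the information a CountVectorizer row holds
    features = Counter(filtered)
    # 2-3) average the +1/-1 lexicon values of the words found in the lexicon
    hits = [lexicon[w] for w in words if w in lexicon]
    if not hits:
        return features, None
    label = 1.0 if sum(hits) / len(hits) > 0 else 0.0
    return features, label


stop_words = {"the", "is", "a", "but"}
lexicon = {"great": 1, "comfortable": 1, "small": -1}
features, label = review_features_and_label(
    "The shoe is great, but a bit small. Comfortable!", stop_words, lexicon)
print(label)  # 1.0
```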


> Logistic Regression:


  - The target variable is positive/negative sentiment, which is categorical, so binary logistic regression is used.

  - It applies the sigmoid function to a linear combination of the features to get the probability of the positive class, so its decision boundary is linear in the features:

    \begin{equation}
    p_\theta(X) = \frac{1}{1+\exp(-\theta^\top X)}
    \end{equation}


  - To avoid overfitting Elastic net Regularization is used, which is a combination of both L1 and L2 regularization.
  \begin{equation}
L_\theta^{\lambda,\alpha}(p(X),Y) = -\left( \sum_i Y_i \log p_\theta(X_i) + (1-Y_i)\log (1-p_\theta(X_i)) \right) + \lambda \left[(1-\alpha) \sum_{j>0} \theta_j^2 + \alpha \sum_{j>0} \left| \theta_j \right| \right]
\end{equation}

  - Hyperparameter tuning was performed to select the model with the highest accuracy on the validation set. A 60:30:10 training:validation:testing split was used; the model with the best validation accuracy was then evaluated on the testing set.

* We have 3 phases: plain logistic regression (without regularization, some positive and negative words are overfit and their weights are wrongly assigned), logistic regression with Elastic Net regularization (which corrects the weights), and finally hyperparameter tuning over different lambda and alpha values to get the highest accuracy.
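The sigmoid and the elastic-net loss from the equations above can be written out in plain Python to make each term concrete. This is a sketch of the formula as stated here, with hypothetical helper names and toy data; Spark's LogisticRegression, as we understand its documentation, averages the log-loss over examples and uses (1 - α)/2 on the L2 term, so its loss values are scaled differently even though λ (regParam) and α (elasticNetParam) play the same roles.

```python
import math


def sigmoid(z):
    """Logistic function: maps a real score to a probability in (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))


def elastic_net_loss(theta, X, Y, lam, alpha):
    """Log-loss plus the elastic-net penalty.

    theta[0] is the intercept and is left out of the penalty (the j > 0 sums);
    theta[1:] are the feature weights, X is a list of feature lists, Y holds 0/1 labels.
    """
    nll = 0.0
    for x, y in zip(X, Y):
        p = sigmoid(theta[0] + sum(t * xi for t, xi in zip(theta[1:], x)))
        nll -= y * math.log(p) + (1 - y) * math.log(1 - p)
    l2 = sum(t * t for t in theta[1:])
    l1 = sum(abs(t) for t in theta[1:])
    return nll + lam * ((1 - alpha) * l2 + alpha * l1)


X = [[1.0], [2.0]]  # toy single-feature examples
Y = [1, 0]
print(elastic_net_loss([0.0, 0.0], X, Y, lam=0.02, alpha=0.3))  # 2 * ln 2, about 1.386
```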

In [None]:
from __future__ import division
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import CountVectorizer
from pyspark.sql import SparkSession
from pyspark.ml import feature, regression, evaluation, Pipeline
from pyspark.sql import functions as fn, Row
import matplotlib.pyplot as plt
from pyspark.ml.feature import *
from pyspark.ml.param import *
from pyspark.ml.tuning import *
from pyspark.ml.evaluation import *
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import rand 
from sklearn.metrics import classification_report

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder

### Tokenizing, stop word removal, feature generation ###

In [None]:
#String Indexer
sentiment_df = amazon_spark_df.select('asin','reviewerID','reviewerName','reviewText','summary','Average_rating_by_user')
sentiment_df = sentiment_df.dropna()
indexers2 = StringIndexer(inputCol='reviewerID', outputCol = "reviewerID_index").fit(sentiment_df)
indexers1 = StringIndexer(inputCol='asin', outputCol = "asin_index").fit(sentiment_df)

#Tokenizer: lowercase the text and split it on non-word characters
tokenizer = RegexTokenizer(inputCol="reviewText", outputCol="words", pattern=r"\W")
df_tokenized = tokenizer.transform(sentiment_df)

import requests
stop_words = requests.get('http://ir.dcs.gla.ac.uk/resources/linguistic_utils/stop_words').text.split()
from pyspark.ml.feature import StopWordsRemover
sw_filter = StopWordsRemover()\
  .setStopWords(stop_words)\
  .setCaseSensitive(False)\
  .setInputCol("words")\
  .setOutputCol("filtered")

#count vectorizer: term counts of the stop-word-filtered tokens
# (the input is 'filtered' so that the stop-word removal above actually takes effect;
# the estimator is fitted inside the pipeline below)
count_vectorizer_estimator = CountVectorizer().setInputCol('filtered').setOutputCol('features')

pipeline_sent = Pipeline(stages=[indexers1,indexers2,tokenizer,sw_filter,count_vectorizer_estimator]).fit(sentiment_df)
t2_sent = pipeline_sent.transform(sentiment_df)
t2_sent.show()
t2_sent = t2_sent.dropna()

In [None]:
t2_sent.count()

### sentiments parquet file ###

In [None]:
sentiments_df = spark.read.parquet('/content/drive/MyDrive/sentiments.parquet')
sentiments_df.printSchema()
sentiments_df.where(fn.col('sentiment') == 1).show(5)
sentiments_df.where(fn.col('sentiment') == -1).show(5)
sentiments_df.groupBy('sentiment').agg(fn.count('*')).show()

Generating sentiment value for each word


In [None]:
t2_split = t2_sent.select('*',fn.explode('words').alias('word')).join(sentiments_df,'word')
t2_split.show()

### Calculating average sentiment based on sentiment score ###

In [None]:
t2_split_sentiment_prediction = t2_split.\
    groupBy('asin', 'reviewerID').\
    agg(fn.avg('sentiment').alias('avg_sentiment')).\
    withColumn('score', fn.when(fn.col('avg_sentiment') > 0, 1.0).otherwise(0.))
t2_split_sentiment_prediction.show(5)

In [None]:
sentiment_score_df = t2_split.select('*')\
.join(t2_split_sentiment_prediction,on = ['asin','reviewerID'],how = 'inner')

sentiment_score_df.show()

### Creating Flag column: ###
##### rating < 2,sentiment = 0, "F" #####
#####  rating = 5 , sentiment = 1,"P" #####
##### other cases, NA #####

In [None]:
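# One row per review: the per-word 'sentiment' column is left out, otherwise distinct()
# keeps a separate row for every distinct word-sentiment value in a review.
sentiment_score = sentiment_score_df.select(['asin', 'reviewerID', 'reviewerName', 'reviewText', 'summary', 'Average_rating_by_user', 'avg_sentiment', 'score']).\
                    distinct()

# F: average rating below 2 and negative review; P: average rating of 5 and positive review
sentiment_score = sentiment_score.withColumn('flag',
                                             fn.when((fn.col('Average_rating_by_user') < 2) & (fn.col('score') == 0), 'F')
                                             .when((fn.col('Average_rating_by_user') == 5) & (fn.col('score') == 1), 'P')
                                             .otherwise('NA'))
sentiment_score.show(10)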

### TF-IDF ###

In [None]:
from pyspark.ml.feature import IDF
idf = IDF().\
    setInputCol('features').\
    setOutputCol('tfidf')

In [None]:
# Append IDF to the already-fitted text pipeline, fit it and add the 'tfidf' column
idf_pipeline = Pipeline(stages=[pipeline_sent, idf]).fit(sentiment_score)
tfidf_df = idf_pipeline.transform(sentiment_score)
tfidf_df.show(20)
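The tf-idf formula in step 4 of the overview can be computed by hand for a few tokenized reviews. This plain-Python sketch uses raw counts as tf and log((|D| + 1) / (f_i + 1)) as idf, which is the smoothed formula Spark's IDF documents; the helper name and toy reviews are hypothetical. A word that appears in every review gets weight 0.

```python
import math
from collections import Counter


def tfidf(docs):
    """docs: list of token lists. Returns one {term: weight} dict per document."""
    m = len(docs)                                            # |D|, number of reviews
    df = Counter(term for doc in docs for term in set(doc))  # f_i, reviews containing term
    out = []
    for doc in docs:
        tf = Counter(doc)                                    # f_ij, count of term in review
        out.append({t: c * math.log((m + 1) / (df[t] + 1)) for t, c in tf.items()})
    return out


weights = tfidf([["great", "fit"], ["great", "small"], ["small", "small", "cheap"]])
print(weights[2])
```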

### Train, Validation, Test split ###

In [None]:
training_df, validation_df, testing_df = sentiment_score.randomSplit([0.6, 0.3, 0.1], seed=0)
[training_df.count(), validation_df.count(), testing_df.count()]

### Logistic Regression ( simple) ###

In [None]:
lr = LogisticRegression().\
    setLabelCol('score').\
    setFeaturesCol('tfidf').\
    setRegParam(0.0).\
    setMaxIter(100).\
    setElasticNetParam(0.)

In [None]:
lr_pipeline = Pipeline(stages=[idf_pipeline, lr]).fit(training_df)

In [None]:
lr_pipeline.transform(validation_df).\
    select(fn.expr('float(prediction = score)').alias('correct')).\
    select(fn.avg('correct')).show()

In [None]:
vocabulary = idf_pipeline.stages[0].stages[-1].vocabulary
weights = lr_pipeline.stages[-1].coefficients.toArray()
coeffs_df = pd.DataFrame({'word': vocabulary, 'weight': weights})


In [None]:
coeffs_df.sort_values('weight').head(5)

In [None]:
coeffs_df.sort_values('weight', ascending=False).head(5)

### Logistic Regression( with Elastic net Regularization) ###

In [None]:
lambda_par = 0.02
alpha_par = 0.3
en_lr = LogisticRegression().\
        setLabelCol('score').\
        setFeaturesCol('tfidf').\
        setRegParam(lambda_par).\
        setMaxIter(100).\
        setElasticNetParam(alpha_par)

In [None]:
en_lr_estimator = Pipeline(
    stages=[tokenizer, sw_filter, count_vectorizer_estimator, idf, en_lr])

en_lr_pipeline = en_lr_estimator.fit(training_df)
en_lr_pipeline.transform(validation_df).select(fn.avg(fn.expr('float(prediction = score)'))).show()

In [None]:
en_weights = en_lr_pipeline.stages[-1].coefficients.toArray()
en_coeffs_df = pd.DataFrame({'word': en_lr_pipeline.stages[2].vocabulary, 'weight': en_weights})


In [None]:
# 15 words with the largest positive weights (the most negative ones are shown in the next cell)
en_coeffs_df.sort_values('weight', ascending=False).head(15)

In [None]:
en_coeffs_df.sort_values('weight').head(15)

In [None]:
en_coeffs_df.query('weight == 0.0') # There are words that have 0 weight.

### Logistic Regression with Parameter tuning ###

In [None]:
from pyspark.ml.tuning import ParamGridBuilder
grid = ParamGridBuilder().\
    addGrid(en_lr.regParam, [0., 0.01, 0.02]).\
    addGrid(en_lr.elasticNetParam, [0., 0.2, 0.4]).\
    build()

In [None]:
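all_models = []
for j in range(len(grid)):
    print("Fitting model {}".format(j+1))
    # fit(dataset, params) overrides the estimator's params with the j-th grid point;
    # lr_model avoids overwriting the ALS 'model' variable defined earlier
    lr_model = en_lr_estimator.fit(training_df, grid[j])
    all_models.append(lr_model)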


In [None]:
# estimate the accuracy of each of the models:
accuracies = [m.\
    transform(validation_df).\
    select(fn.avg(fn.expr('float(score = prediction)')).alias('accuracy')).\
    first().\
    accuracy for m in all_models]

In [None]:
accuracies

In [None]:
best_model_idx = np.argmax(accuracies)
best_model_sent = all_models[best_model_idx]
accuracies[best_model_idx]


In [None]:
print("For Best Model in sentiment: ")
# The best model is a PipelineModel (logistic regression has no rank), so print the
# grid point (regParam, elasticNetParam) it was trained with.
for param, value in grid[best_model_idx].items():
    print(param.name + ":", value)

### Testing using best model ###

In [None]:
# estimate performance
best_model_sent.\
    transform(testing_df).\
    select(fn.avg(fn.expr('float(score = prediction)')).alias('accuracy')).\
    show()