# Multilevel Sentiment Analysis using Spark 
# < Amazon consumer review dataset >

- Multilevel Sentiment Analysis from bag-of-words<br>
- Libraries: Spark ml libraries
- Dataset from https://www.kaggle.com/datafiniti/consumer-reviews-of-amazon-products

<font color='navy'><font size=3.5>First, load the Amazon consumer review dataset (CSV files) into a DataFrame

In [128]:
// Glob pattern for the input: every .csv file directly under ./data.
val file = "./data/*.csv"

file: String =./data/*.csv


In [129]:
// Load every CSV matched by `file` into a single DataFrame.
//   header      -> the first line of each file supplies the column names
//   inferSchema -> column types are inferred from the data instead of defaulting to string
// NOTE(review): the out-of-range ratings that show up later (e.g. 44, 16) may come from
// quoted or multi-line review text being mis-parsed; the CSV reader's multiLine/escape
// options might be worth trying - TODO confirm.
val df = spark.read
  .options(Map("header" -> "true", "inferSchema" -> "true"))
  .csv(file)

df: org.apache.spark.sql.DataFrame = [id: string, dateAdded: timestamp ... 22 more fields]


In [130]:
// Print the column names and inferred types of the raw dataset.
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- dateAdded: timestamp (nullable = true)
 |-- dateUpdated: timestamp (nullable = true)
 |-- name: string (nullable = true)
 |-- asins: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- primaryCategories: string (nullable = true)
 |-- imageURLs: string (nullable = true)
 |-- keys: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- manufacturerNumber: string (nullable = true)
 |-- reviews.date: string (nullable = true)
 |-- reviews.dateSeen: string (nullable = true)
 |-- reviews.didPurchase: string (nullable = true)
 |-- reviews.doRecommend: boolean (nullable = true)
 |-- reviews.id: string (nullable = true)
 |-- reviews.numHelpful: integer (nullable = true)
 |-- reviews.rating: integer (nullable = true)
 |-- reviews.sourceURLs: string (nullable = true)
 |-- reviews.text: string (nullable = true)
 |-- reviews.title: string (nullable = true)
 |-- reviews.username: st

<font color='navy'><font size=3.5> Among the available columns, I am going to use: <br>
<font color='navy'><font size=3.5> - reviews.rating: Label <br>
<font color='navy'><font size=3.5> - reviews.text : Feature <br>

In [214]:
// Keep only the two columns used for modelling and give them short names.
// The backticks are required because the source column names contain a dot.
// `data` is a var because it is reassigned after the rating filter below.
var data = df.select(
  col("`reviews.text`").alias("text"),
  col("`reviews.rating`").alias("label")
)

data: org.apache.spark.sql.DataFrame = [text: string, label: int]


In [215]:
// Preview the first 5 (text, label) rows.
data.show(5)

+--------------------+-----+
|                text|label|
+--------------------+-----+
|I order 3 of them...|    3|
|Bulk is always th...|    4|
|Well they are not...|    5|
|Seem to work as w...|    5|
|These batteries a...|    5|
+--------------------+-----+
only showing top 5 rows



In [216]:
// Confirm the types of the selected columns (text: string, label: integer).
data.printSchema()

root
 |-- text: string (nullable = true)
 |-- label: integer (nullable = true)



<font color='navy'><font size=3.5> Counts of each rating

In [217]:
// Number of reviews per rating value; also exposes any unexpected label values.
data.groupBy("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
|   44|    1|
|    1|  965|
|   16|    1|
|    3| 1205|
|    5|19831|
|    4| 5621|
|    2|  617|
|    0|   91|
+-----+-----+



> --> <font color='navy'><font size=3.5> Ratings are assumed to range from 0 to 5; therefore, the rows with labels 44 and 16 are removed. 

In [218]:
// Row count of the DataFrame before filtering; cache() keeps the data around
// for the later cells that reuse it.
data.cache().count()

res79: Long = 28332


In [220]:
// Keep only ratings inside the assumed 0-5 range. `between` is inclusive on both
// ends, so negative values are rejected as well as values above 5. Rows whose
// label is null are dropped too, because the predicate evaluates to null for them.
data = data.filter(data("label").between(0, 5))

data: org.apache.spark.sql.DataFrame = [text: string, label: int]


In [222]:
// Row count of the DataFrame after the out-of-range ratings have been filtered out.
data.cache().count()

res81: Long = 28330


<font color='navy'><font size=3.5> Extracting and transforming features for text classification

<font color='navy'><font size=3.5>   1) Tokenizer 

In [134]:
import org.apache.spark.ml.feature.RegexTokenizer

// Tokenizer with default settings: reads the raw review from "text" and writes
// the resulting array of tokens to "words".
val regexTokenizer = new RegexTokenizer().setOutputCol("words").setInputCol("text")

// Tokenize the whole dataset and preview the first 10 rows.
val words = regexTokenizer.transform(data)
words.show(10)

+--------------------+-----+--------------------+
|                text|label|               words|
+--------------------+-----+--------------------+
|I order 3 of them...|    3|[i, order, 3, of,...|
|Bulk is always th...|    4|[bulk, is, always...|
|Well they are not...|    5|[well, they, are,...|
|Seem to work as w...|    5|[seem, to, work, ...|
|These batteries a...|    5|[these, batteries...|
|Bought a lot of b...|    5|[bought, a, lot, ...|
|ive not had any p...|    5|[ive, not, had, a...|
|Well if you are l...|    5|[well, if, you, a...|
|These do not hold...|    3|[these, do, not, ...|
|AmazonBasics AA A...|    4|[amazonbasics, aa...|
+--------------------+-----+--------------------+
only showing top 10 rows



import org.apache.spark.ml.feature.RegexTokenizer
regexTokenizer: org.apache.spark.ml.feature.RegexTokenizer = RegexTokenizer: uid=regexTok_85fe7adf7907, minTokenLength=1, gaps=true, pattern=\s+, toLowercase=true
words: org.apache.spark.sql.DataFrame = [text: string, label: int ... 1 more field]


<font color='navy'><font size=3.5>  2) Stopwords remover

In [135]:
import org.apache.spark.ml.feature.StopWordsRemover

// Stop-word filter with its default word list: reads tokens from "words" and
// writes the remaining tokens to "filtered".
val remover = new StopWordsRemover().setOutputCol("filtered").setInputCol("words")

// Apply the filter to the tokenized data and preview the first 10 rows.
val words_filtered = remover.transform(words)
words_filtered.show(10)

+--------------------+-----+--------------------+--------------------+
|                text|label|               words|            filtered|
+--------------------+-----+--------------------+--------------------+
|I order 3 of them...|    3|[i, order, 3, of,...|[order, 3, one, i...|
|Bulk is always th...|    4|[bulk, is, always...|[bulk, always, le...|
|Well they are not...|    5|[well, they, are,...|[well, duracell, ...|
|Seem to work as w...|    5|[seem, to, work, ...|[seem, work, well...|
|These batteries a...|    5|[these, batteries...|[batteries, long,...|
|Bought a lot of b...|    5|[bought, a, lot, ...|[bought, lot, bat...|
|ive not had any p...|    5|[ive, not, had, a...|[ive, problame, b...|
|Well if you are l...|    5|[well, if, you, a...|[well, looking, c...|
|These do not hold...|    3|[these, do, not, ...|[hold, amount, hi...|
|AmazonBasics AA A...|    4|[amazonbasics, aa...|[amazonbasics, aa...|
+--------------------+-----+--------------------+--------------------+
only showing top 10 rows

import org.apache.spark.ml.feature.StopWordsRemover
remover: org.apache.spark.ml.feature.StopWordsRemover = StopWordsRemover: uid=stopWords_d593ab0e5208, numStopWords=181, locale=en_US, caseSensitive=false
words_filtered: org.apache.spark.sql.DataFrame = [text: string, label: int ... 2 more fields]


<font color='navy'><font size=3.5>  3) CountVectorizer

In [136]:
import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel}

// Learn a bag-of-words vocabulary (capped at 5000 terms) from the stop-word-filtered
// tokens. The estimator is configured inside the block so only the fitted model
// is exposed to later cells.
val cvModel: CountVectorizerModel = {
  val vectorizer = new CountVectorizer()
  vectorizer.setVocabSize(5000)
  vectorizer.setInputCol("filtered")
  vectorizer.setOutputCol("features")
  vectorizer.fit(words_filtered)
}

// Turn each review into a term-count vector and preview the first 10 rows.
val features = cvModel.transform(words_filtered)
features.show(10)

+--------------------+-----+--------------------+--------------------+--------------------+
|                text|label|               words|            filtered|            features|
+--------------------+-----+--------------------+--------------------+--------------------+
|I order 3 of them...|    3|[i, order, 3, of,...|[order, 3, one, i...|(5000,[12,22,83,9...|
|Bulk is always th...|    4|[bulk, is, always...|[bulk, always, le...|(5000,[11,66,94,1...|
|Well they are not...|    5|[well, they, are,...|[well, duracell, ...|(5000,[9,25,114,1...|
|Seem to work as w...|    5|[seem, to, work, ...|[seem, work, well...|(5000,[1,9,17,25,...|
|These batteries a...|    5|[these, batteries...|[batteries, long,...|(5000,[1,9,16,74,...|
|Bought a lot of b...|    5|[bought, a, lot, ...|[bought, lot, bat...|(5000,[1,5,7,18,4...|
|ive not had any p...|    5|[ive, not, had, a...|[ive, problame, b...|(5000,[1,354,768,...|
|Well if you are l...|    5|[well, if, you, a...|[well, looking, c...|(5000,[1,1

import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel}
cvModel: org.apache.spark.ml.feature.CountVectorizerModel = CountVectorizerModel: uid=cntVec_e7c62d8fe82e, vocabularySize=5000
features: org.apache.spark.sql.DataFrame = [text: string, label: int ... 3 more fields]


<font color='navy'><font size=3.5>  Splitting data into train and test dataset

In [137]:
// 80/20 train/test split. A fixed seed is supplied so that, for the same input
// data, the same rows land in each split on every run; without it the split (and
// therefore the reported accuracy) changes from run to run.
val Array(train, test) = data.randomSplit(Array(0.8, 0.2), seed = 42L)

train: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [text: string, label: int]
test: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [text: string, label: int]


<font color='navy'><font size=3.5>  Combining the above transformation steps into a pipeline

In [138]:
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{RegexTokenizer, StopWordsRemover}

// Stage 1: split the raw "text" column into tokens, written to "words".
val regexTokenizer = new RegexTokenizer().setOutputCol("words").setInputCol("text")

// Stage 2: drop stop words from "words", writing the result to "filtered".
val remover = new StopWordsRemover().setOutputCol("filtered").setInputCol("words")

// Chain both preprocessing stages so they can be applied in a single step.
val pipeline = new Pipeline().setStages(Array(regexTokenizer, remover))


import org.apache.spark.ml.feature.RegexTokenizer
import org.apache.spark.ml.feature.StopWordsRemover
import org.apache.spark.ml.feature.CountVectorizer
import org.apache.spark.ml.Pipeline
regexTokenizer: org.apache.spark.ml.feature.RegexTokenizer = RegexTokenizer: uid=regexTok_5095a85a2076, minTokenLength=1, gaps=true, pattern=\s+, toLowercase=true
remover: org.apache.spark.ml.feature.StopWordsRemover = StopWordsRemover: uid=stopWords_216d0252aea2, numStopWords=181, locale=en_US, caseSensitive=false
pipeline: org.apache.spark.ml.Pipeline = pipeline_5b8921938305


In [151]:
// Fit the preprocessing pipeline on the training split only.
val train_pipeline_model = pipeline.fit(train)
// Reuse the train-fitted model for the test split instead of fitting a second
// model on test data. Anything a pipeline learns must come from the training
// split alone, otherwise information from the test set can leak into the
// features. The name is kept so that later cells keep working unchanged.
val test_pipeline_model = train_pipeline_model

train_pipeline_model: org.apache.spark.ml.PipelineModel = pipeline_5b8921938305
test_pipeline_model: org.apache.spark.ml.PipelineModel = pipeline_5b8921938305


In [153]:
// Run the preprocessing pipeline over each split, adding the "words" and
// "filtered" columns to both.
val train_filtered = train_pipeline_model.transform(train)        
val test_filtered = test_pipeline_model.transform(test) 

train_filtered: org.apache.spark.sql.DataFrame = [text: string, label: int ... 2 more fields]
test_filtered: org.apache.spark.sql.DataFrame = [text: string, label: int ... 2 more fields]


In [166]:
// Import the model type as well as the estimator: the annotation on `cvModel`
// needs CountVectorizerModel, and this cell should compile on its own rather than
// depend on an import made in an earlier exploratory cell.
import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel}

// Learn the vocabulary (capped at 10000 terms) from the training split only, so
// that no term statistics from the test split influence the feature space.
val cvModel: CountVectorizerModel = new CountVectorizer()
  .setInputCol("filtered")
  .setOutputCol("features")
  .setVocabSize(10000)
  .fit(train_filtered)

import org.apache.spark.ml.feature.CountVectorizer
cvModel: org.apache.spark.ml.feature.CountVectorizerModel = CountVectorizerModel: uid=cntVec_a72fd53ee83d, vocabularySize=10000


In [167]:
// Vectorize both splits with the same fitted vocabulary so their "features"
// columns share one index space.
val features_train = cvModel.transform(train_filtered)
val features_test = cvModel.transform(test_filtered)

features_train: org.apache.spark.sql.DataFrame = [text: string, label: int ... 3 more fields]
features_test: org.apache.spark.sql.DataFrame = [text: string, label: int ... 3 more fields]


<font color='navy'><font size=3.5>  Apply text classification model: logistic regression

In [168]:
import org.apache.spark.ml.classification.LogisticRegression

// Logistic regression with a small regularization strength (0.001) and at most
// 10 optimizer iterations.
val lr = new LogisticRegression().setRegParam(0.001).setMaxIter(10)

// Train on the vectorized training split.
val model = lr.fit(features_train)

import org.apache.spark.ml.classification.LogisticRegression
lr: org.apache.spark.ml.classification.LogisticRegression = logreg_0ad8c9e8e1a8
model: org.apache.spark.ml.classification.LogisticRegressionModel = LogisticRegressionModel: uid=logreg_0ad8c9e8e1a8, numClasses=45, numFeatures=10000


In [169]:
// Score the held-out test split; the fitted model appends its prediction columns.
val pred = model.transform(features_test)

pred: org.apache.spark.sql.DataFrame = [text: string, label: int ... 6 more fields]


In [176]:
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

// Evaluator that compares the "prediction" column against the "label" column and
// reports plain accuracy.
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")

import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
evaluator: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = MulticlassClassificationEvaluator: uid=mcEval_0d66e226ed9c, metricName=accuracy, metricLabel=0.0, beta=1.0, eps=1.0E-15


In [177]:
// Compute the configured metric over the test-set predictions.
evaluator.evaluate(pred)

res54: Double = 0.790589912671538
