In [2]:
from threading import Thread

class StreamingThread(Thread):
    """Run a streaming context on a background thread.

    Parameters
    ----------
    ssc : object
        The streaming context to drive. It must provide ``start()``,
        ``awaitTermination()`` and ``stop(stopSparkContext=..., stopGraceFully=...)``
        (presumably a ``pyspark.streaming.StreamingContext`` -- confirm against the caller).
    """
    def __init__(self, ssc):
        Thread.__init__(self)
        self.ssc = ssc
    def run(self):
        """Start the wrapped context and block this thread until it terminates."""
        # Use the context handed to this thread; the bare name ``ssc`` would
        # resolve to a notebook-level global (or raise NameError if none exists).
        self.ssc.start()
        self.ssc.awaitTermination()
    def stop(self):
        """Ask the wrapped context to stop gracefully, leaving the Spark context running."""
        print('----- Stopping... this may take a few seconds -----')
        self.ssc.stop(stopSparkContext=False, stopGraceFully=True)

In [1]:


from pyspark.sql import SparkSession

# Build (or reuse) the SparkSession for this notebook. The spark-nlp package is
# requested through spark.jars.packages.
spark = (
    SparkSession.builder
    .appName("Demo")
    .config("spark.driver.memory", "4G")
    .config("spark.driver.maxResultSize", "3G")
    .config("spark.jars.packages", "JohnSnowLabs:spark-nlp:2.0.3")
    .config("spark.kryoserializer.buffer.max", "500m")
    .getOrCreate()
)

In [2]:
import sparknlp
# NOTE(review): a SparkSession (`spark`) is already built in the cell above and the
# return value of start() is discarded here; presumably this call reuses that
# session rather than creating a second one -- confirm.
sparknlp.start()

# from sparknlp.pretrained import PretrainedPipeline
# pipeline = PretrainedPipeline('explain_document_ml', 'en')
# result = pipeline.annotate('Harry Potter are a greatest movies. I am the best')
# result
# pipeline = PretrainedPipeline("movies_sentiment_analysis")
# testDocs = [
#     "I felt so disapointed to see this very uninspired film. I recommend others to awoid this movie is not good.",
#     "This was movie was amesome, everything was nice."]
# result = pipeline.annotate(testDocs)

pipeline_d28333e025c9


LEMMATIZER_c62ad8f355f9

In [3]:
# Remarks / TO DO
# 1. unbalance class distribution ==> try up / down-sampling (but I heard from another student it did not improve)
# try SMOTE
# 2. add to pre-processing stemming
# 3. also try Naive Bayes and SVM (on top of log regr). See https://towardsdatascience.com/multi-class-text-classification-with-pyspark-7d78d022ed35
# see also: http://classes.ischool.syr.edu/ist718/content/unit09/lab-sentiment_analysis/
# 4. Try an ensemble of different methods
# 5. Try an external library for sentiment analysis (VADER sentiment / but I heard from another student it did not help)
# 6. VERY IMPORTANT: I think we should rather reduce the number of categories from 5 to let's say 3
# the 3 categories would be bad (1 and 2 stars), middle (3 stars), good (4 and 5 stars)
# this would allow to have more training instance per categories and anyway how can even a human differentiate a 1 from a 2 stars or a 4 from a 5 stars

In [3]:
# start with an easy implementation: only consider the content of the 2 fields review_title and review_text
# and concatenate them into one new field "review_concat"
from pyspark.sql import SQLContext
from pyspark.sql import functions as fn
filepath = 'data_processed/ExctractedData.json'
# load JSON file
s_df = spark.read.json(filepath)
# concatenate review title and text in one field.
# concat_ws skips NULL inputs, whereas concat returns NULL as soon as either the
# title or the text is missing (which would silently blank the whole review).
s_df = s_df.withColumn('review_concat', fn.concat_ws(' ', fn.col('review_title'), fn.col('review_text')))
# keep a single row per review_id
s_df = s_df.dropDuplicates(['review_id'])
s_df.printSchema()

root
 |-- book_id: string (nullable = true)
 |-- book_title: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- review_score: string (nullable = true)
 |-- review_text: string (nullable = true)
 |-- review_title: string (nullable = true)
 |-- review_user: string (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- review_concat: string (nullable = true)



In [4]:
# review_score is read from the JSON as a string column; convert it to an integer column
from pyspark.sql.types import IntegerType
score_as_int = s_df.review_score.cast(IntegerType())
s_df = s_df.withColumn("review_score", score_as_int)
s_df.printSchema()

root
 |-- book_id: string (nullable = true)
 |-- book_title: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- review_score: integer (nullable = true)
 |-- review_text: string (nullable = true)
 |-- review_title: string (nullable = true)
 |-- review_user: string (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- review_concat: string (nullable = true)



In [5]:
# Number of reviews per score, most frequent first: the classes are highly unbalanced
from pyspark.sql.functions import col
(
    s_df
    .groupBy("review_score")
    .count()
    .sort(col("count").desc())
    .show()
)

+------------+-----+
|review_score|count|
+------------+-----+
|           5| 9383|
|           4| 1529|
|           3|  346|
|           2|  170|
|           1|  145|
+------------+-----+



In [20]:
# check if duplicate review (normally not the case as the python script that filters the JSON took care of that)
# dropDuplicates returns a NEW DataFrame and leaves s_df untouched, so the
# distribution must be computed on its result; if the counts match the
# distribution of s_df shown earlier, there were no duplicated review_ids.
s_df.dropDuplicates(['review_id']) \
    .groupBy("review_score") \
    .count() \
    .orderBy(col("count").desc()) \
    .show()

+------------+-----+
|review_score|count|
+------------+-----+
|           5| 9383|
|           4| 1529|
|           3|  346|
|           2|  170|
|           1|  145|
+------------+-----+



In [12]:
# display the first two reviews as Row objects
s_df.take(2)

[Row(book_id='62678426', book_title='The Woman in the Window: A Novel', review_id='RZFSIAUSHZH43', review_score=5, review_text="Someone recommended this book but wouldn't say much about it. Lots of twists and turns, trying to figure out what was reality and what wasn't.Don't start this if you have to be somewhere else. It's hard to put down until you know how it ends.", review_title='Twists and turns', review_user='LindaG', timestamp=1556699124, review_concat="Twists and turns Someone recommended this book but wouldn't say much about it. Lots of twists and turns, trying to figure out what was reality and what wasn't.Don't start this if you have to be somewhere else. It's hard to put down until you know how it ends."),
 Row(book_id='1455536156', book_title='Scraps, Wilt & Weeds: Turning Wasted Food into Plenty', review_id='RSSSXHBF4BJJY', review_score=4, review_text='A well written and inspiring cook book! As a chef my self i was impressed with some of the applications for food waste!',

In [13]:
# first review whose score is 5
s_df.filter(s_df['review_score'] == 5).first()

Row(book_id='62678426', book_title='The Woman in the Window: A Novel', review_id='RZFSIAUSHZH43', review_score=5, review_text="Someone recommended this book but wouldn't say much about it. Lots of twists and turns, trying to figure out what was reality and what wasn't.Don't start this if you have to be somewhere else. It's hard to put down until you know how it ends.", review_title='Twists and turns', review_user='LindaG', timestamp=1556699124, review_concat="Twists and turns Someone recommended this book but wouldn't say much about it. Lots of twists and turns, trying to figure out what was reality and what wasn't.Don't start this if you have to be somewhere else. It's hard to put down until you know how it ends.")

In [14]:
# first review whose score is 1 (a very bad review)
s_df.filter(s_df['review_score'] == 1).first()

Row(book_id='0143110438', book_title='A Gentleman in Moscow: A Novel', review_id='R3QEDU4XU80W5U', review_score=1, review_text="I followed the high Amazon reviews for this book and cannot believe how this book is popular.  Yes, the author is talented and his writing is very sophisticated.....but this book has NO plot, the characters are boring and the story could be told in about 10 pages.  Literally, nothing interesting happens in this book....instead, it's 460 pages of non-nonsensical rambling.", review_title='BORING RAMBLING', review_user='Amazon Customer', timestamp=1554764357, review_concat="BORING RAMBLING I followed the high Amazon reviews for this book and cannot believe how this book is popular.  Yes, the author is talented and his writing is very sophisticated.....but this book has NO plot, the characters are boring and the story could be told in about 10 pages.  Literally, nothing interesting happens in this book....instead, it's 460 pages of non-nonsensical rambling.")

In [18]:
# Show only the review_concat field of the first review whose score is 1
s_df.where(fn.col('review_score') == 1).select('review_concat').first()

Row(review_concat="BORING RAMBLING I followed the high Amazon reviews for this book and cannot believe how this book is popular.  Yes, the author is talented and his writing is very sophisticated.....but this book has NO plot, the characters are boring and the story could be told in about 10 pages.  Literally, nothing interesting happens in this book....instead, it's 460 pages of non-nonsensical rambling.")

In [6]:
# import stop words to filter them out from the reviews
import requests
# A timeout keeps the cell from hanging forever on an unresponsive server, and
# raise_for_status() prevents an HTTP error page from being split into "stop words".
response = requests.get('http://ir.dcs.gla.ac.uk/resources/linguistic_utils/stop_words', timeout=30)
response.raise_for_status()
# the payload is whitespace-separated words
stop_words = response.text.split()
stop_words[0:10]

['a',
 'about',
 'above',
 'across',
 'after',
 'afterwards',
 'again',
 'against',
 'all',
 'almost']

In [22]:
# print the current schema of s_df
s_df.printSchema()

root
 |-- book_id: string (nullable = true)
 |-- book_title: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- review_score: integer (nullable = true)
 |-- review_text: string (nullable = true)
 |-- review_title: string (nullable = true)
 |-- review_user: string (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- review_concat: string (nullable = true)



In [24]:
from sparknlp.pretrained import PretrainedPipeline
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
from pyspark.sql.functions import lower, col
from pyspark.ml.feature import StringIndexer

# Annotate review_text with the pretrained 'explain_document_ml' pipeline, then
# discard the listed annotation columns; the "lemmas" column is kept for the next step.
pipeline = PretrainedPipeline('explain_document_ml', 'en')
s_df2 = pipeline.annotate(s_df, "review_text").drop(
    "document", "sentence", "token", "spell", "stems", "pos", "text"
)

def mkString(line):
    """Join the ``result`` field of every annotation in ``line`` into one string.

    Parameters
    ----------
    line : sequence or None
        Annotations as produced in the ``lemmas`` column; element ``[3]`` of each
        annotation is its ``result`` field (annotatorType, begin, end, result, ...).

    Returns
    -------
    str
        The results separated by single spaces. A missing (None) array yields an
        empty string, and annotations whose result is None are skipped instead of
        contributing the literal text "None".
    """
    if line is None:
        return ""
    return " ".join(str(x[3]) for x in line if x[3] is not None)
# Expose mkString as a Spark UDF that returns a string.
string_udf = udf(mkString, StringType())
# lemmatizedText = lower-cased, space-joined lemma results
s_df2 = s_df2.withColumn("lemmatizedText", lower(string_udf("lemmas")))

# Run the pretrained 'analyze_sentiment_ml' pipeline on review_title and drop
# the listed annotation columns again.
pipeline2 = PretrainedPipeline("analyze_sentiment_ml")
s_df2 = pipeline2.annotate(s_df2, "review_title").drop(
    "document", "sentence", "token", "spell", "stems", "pos", "text"
)
# Keep only the result of the first sentiment annotation.
s_df2 = s_df2.withColumn("sentiment", s_df2["sentiment"][0]["result"])

# Encode the sentiment string as a numeric index column.
indexer = StringIndexer(inputCol="sentiment", outputCol="sentimentIndex")
s_df2 = indexer.fit(s_df2).transform(s_df2)

s_df2.printSchema()
s_df2.select("lemmatizedText", "sentiment", "sentimentIndex").show(truncate=False)


root
 |-- book_id: string (nullable = true)
 |-- book_title: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- review_score: integer (nullable = true)
 |-- review_user: string (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- review_concat: string (nullable = true)
 |-- lemmas: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |    |    |-- embeddings: array (nullable = true)
 |    |    |    |-- element: float (containsNull = false)
 |    |    |-- sentence_embeddings: array (nullable = true)
 |    |    |    |-- element: float (containsNull = false)
 |-- lemmatizedText: string (nullable = true)
 |--

In [7]:
# Define the 4 pre-processing steps and fit them as one transformation pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.ml import Pipeline

# 1. Tokenizer. With gaps=False the pattern describes the tokens themselves:
#    "\\p{L}+" matches runs of Unicode letters, so digits, punctuation and
#    emoticons/smileys are NOT emitted as tokens.
tokenizer = RegexTokenizer(
    gaps=False,
    pattern="\\p{L}+",
    inputCol="lemmatizedText",
    outputCol="words",
)

# 2. Filter out stop words (case-insensitive match)
sw_filter = StopWordsRemover(
    stopWords=stop_words,
    caseSensitive=False,
    inputCol="words",
    outputCol="filtered",
)

# 3. TF: term-count vectorization; minDF=20 keeps only terms that appear in
#    at least 20 documents
cv = CountVectorizer(
    minTF=1.,
    minDF=20.,
    vocabSize=2**17,
    inputCol="filtered",
    outputCol="tf",
)

# 4. TF-IDF transform
idf = IDF(inputCol='tf', outputCol='tfidf')

# Fit the four stages as a single pipeline model.
# NOTE(review): this is fitted on ALL of s_df2, i.e. including rows that are
# later put into the test splits, so vocabulary and IDF weights see test data.
tfidf_pipeline = Pipeline(stages=[tokenizer, sw_filter, cv, idf]).fit(s_df2)


# Execute transform
#tfidf_pipeline.transform(s_df2)
#s_df.select('tf').where(fn.col('review_score') == 1).first()
#s_df2.printSchema()



In [9]:
from pyspark.ml.feature import VectorAssembler
# Combine the TF-IDF vector and the sentiment index into one "features" vector column.
assembler = VectorAssembler() \
    .setInputCols(['tfidf', 'sentimentIndex']) \
    .setOutputCol("features")
#features = assembler.transform(s_df2)


In [26]:
# 3-way split (train / validation / test), kept for reference:
#training_df, validation_df, testing_df = s_df.randomSplit([0.7, 0.2, 0.1], seed=42)
#[training_df.count(), validation_df.count(), testing_df.count()]

# for now: 80/20 train/test split only, no validation set
training_df, testing_df = s_df2.randomSplit(weights=[0.8, 0.2], seed=42)
[split.count() for split in (training_df, testing_df)]

[9191, 2382]

In [27]:
# Simple logistic regression on the assembled features
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(
    labelCol='review_score',
    featuresCol='features',
    regParam=0.0,
    maxIter=100,
    elasticNetParam=0.5,
)
# Chain the fitted TF-IDF pipeline, the assembler and the classifier, then fit on the training split
lr_pipeline = Pipeline(stages=[tfidf_pipeline, assembler, lr]).fit(training_df)
# Score the test split and report the fraction of rows where prediction == review_score
predictions = lr_pipeline.transform(testing_df)
predictions \
    .select(fn.expr('float(prediction = review_score)').alias('correct')) \
    .agg(fn.avg('correct')) \
    .show()

+------------------+
|      avg(correct)|
+------------------+
|0.8064651553316541|
+------------------+



In [28]:
# The overall score above seems OK, but now check the fraction of correct predictions
# for each true score separately: it is not good for any class except 5.
for score in (1, 2, 3, 4, 5):
    print('Score = ' + str(score))
    rows_with_score = predictions.filter(predictions['review_score'] == score)
    rows_with_score \
        .select(fn.expr('float(prediction = review_score)').alias('correct')) \
        .select(fn.avg('correct')) \
        .show()

Score = 1
+------------------+
|      avg(correct)|
+------------------+
|0.2413793103448276|
+------------------+

Score = 2
+------------------+
|      avg(correct)|
+------------------+
|0.1794871794871795|
+------------------+

Score = 3
+-------------------+
|       avg(correct)|
+-------------------+
|0.35135135135135137|
+-------------------+

Score = 4
+-----------------+
|     avg(correct)|
+-----------------+
|0.573134328358209|
+-----------------+

Score = 5
+------------------+
|      avg(correct)|
+------------------+
|0.8866141732283465|
+------------------+



In [10]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
# NOTE(review): labelIndexer is not a stage of the pipeline below -- the classifier
# is trained directly on review_score. It is kept only so the name stays defined.
labelIndexer = StringIndexer(inputCol="review_score", outputCol="indexedLabel").fit(s_df2)
# Assemble the TF-IDF vector and the sentiment index into one feature vector.
# NOTE: this rebinds `assembler`, whose earlier definition wrote to a column named
# "features"; cells that expect that column must re-create their own assembler.
assembler = VectorAssembler(inputCols=['tfidf','sentimentIndex'],outputCol="indexedFeatures")


# Split the data into training and test sets (30% held out for testing).
# The seed is fixed so the split (and the reported error) is reproducible.
(trainingData, testData) = s_df2.randomSplit([0.7, 0.3], seed=42)

# Train a random forest model (seeded for reproducibility).
rf = RandomForestClassifier(labelCol="review_score", featuresCol="indexedFeatures", numTrees=10, seed=42)

# Chain the fitted TF-IDF pipeline, the assembler and the random forest in a Pipeline
pipeline = Pipeline(stages=[tfidf_pipeline, assembler, rf])

# Train model. tfidf_pipeline is already fitted, so only the forest is learned here.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "review_score", "indexedFeatures").show(5)

# Compare prediction with the true label and compute the test error (1 - accuracy)
evaluator = MulticlassClassificationEvaluator(
    labelCol="review_score", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

# stages[2] is the fitted random forest; the variable keeps its historical
# name `gbtModel` so other cells that reference it keep working.
gbtModel = model.stages[2]
print(gbtModel)  # summary only

AnalysisException: "cannot resolve '`indexedLabel`' given input columns: [probability, prediction, review_score, tf, tfidf, lemmas, lemmatizedText, words, filtered, book_title, rawPrediction, sentimentIndex, review_concat, review_user, indexedFeatures, sentiment, timestamp, book_id, review_id];;\n'Project [prediction#1006, 'indexedLabel, 'features]\n+- Project [book_id#6, book_title#7, review_id#8, review_score#32, review_user#12, timestamp#13L, review_concat#22, lemmas#116, lemmatizedText#187, sentiment#304, sentimentIndex#342, words#827, filtered#840, tf#855, tfidf#870, indexedFeatures#952, rawPrediction#969, probability#987, UDF(rawPrediction#969) AS prediction#1006]\n   +- Project [book_id#6, book_title#7, review_id#8, review_score#32, review_user#12, timestamp#13L, review_concat#22, lemmas#116, lemmatizedText#187, sentiment#304, sentimentIndex#342, words#827, filtered#840, tf#855, tfidf#870, indexedFeatures#952, rawPrediction#969, UDF(rawPrediction#969) AS probability#987]\n      +- Project [book_id#6, book_title#7, review_id#8, review_score#32, review_user#12, timestamp#13L, review_concat#22, lemmas#116, lemmatizedText#187, sentiment#304, sentimentIndex#342, words#827, filtered#840, tf#855, tfidf#870, indexedFeatures#952, UDF(indexedFeatures#952) AS rawPrediction#969]\n         +- Project [book_id#6, book_title#7, review_id#8, review_score#32, review_user#12, timestamp#13L, review_concat#22, lemmas#116, lemmatizedText#187, sentiment#304, sentimentIndex#342, words#827, filtered#840, tf#855, tfidf#870, UDF(named_struct(tfidf, tfidf#870, sentimentIndex, sentimentIndex#342)) AS indexedFeatures#952]\n            +- Project [book_id#6, book_title#7, review_id#8, review_score#32, review_user#12, timestamp#13L, review_concat#22, lemmas#116, lemmatizedText#187, sentiment#304, sentimentIndex#342, words#827, filtered#840, tf#855, UDF(tf#855) AS tfidf#870]\n               +- Project [book_id#6, book_title#7, review_id#8, review_score#32, review_user#12, timestamp#13L, 
review_concat#22, lemmas#116, lemmatizedText#187, sentiment#304, sentimentIndex#342, words#827, filtered#840, UDF(filtered#840) AS tf#855]\n                  +- Project [book_id#6, book_title#7, review_id#8, review_score#32, review_user#12, timestamp#13L, review_concat#22, lemmas#116, lemmatizedText#187, sentiment#304, sentimentIndex#342, words#827, UDF(words#827) AS filtered#840]\n                     +- Project [book_id#6, book_title#7, review_id#8, review_score#32, review_user#12, timestamp#13L, review_concat#22, lemmas#116, lemmatizedText#187, sentiment#304, sentimentIndex#342, UDF(lemmatizedText#187) AS words#827]\n                        +- Sample 0.7, 1.0, false, 5479377581549130571\n                           +- Sort [book_id#6 ASC NULLS FIRST, book_title#7 ASC NULLS FIRST, review_id#8 ASC NULLS FIRST, review_score#32 ASC NULLS FIRST, review_user#12 ASC NULLS FIRST, timestamp#13L ASC NULLS FIRST, review_concat#22 ASC NULLS FIRST, lemmatizedText#187 ASC NULLS FIRST, sentiment#304 ASC NULLS FIRST, sentimentIndex#342 ASC NULLS FIRST], false\n                              +- Project [book_id#6, book_title#7, review_id#8, review_score#32, review_user#12, timestamp#13L, review_concat#22, lemmas#116, lemmatizedText#187, sentiment#304, UDF(cast(sentiment#304 as string)) AS sentimentIndex#342]\n                                 +- Project [book_id#6, book_title#7, review_id#8, review_score#32, review_user#12, timestamp#13L, review_concat#22, lemmas#116, lemmatizedText#187, sentiment#278[0].result AS sentiment#304]\n                                    +- Project [book_id#6, book_title#7, review_id#8, review_score#32, review_user#12, timestamp#13L, review_concat#22, lemmas#116, lemmatizedText#187, sentiment#278]\n                                       +- Project [book_id#6, book_title#7, review_id#8, review_score#32, text#198, review_user#12, timestamp#13L, review_concat#22, lemmas#116, lemmatizedText#187, document#220, sentence#233, token#247, spell#262, 
UDF(array(sentence#233, spell#262)) AS sentiment#278]\n                                          +- Project [book_id#6, book_title#7, review_id#8, review_score#32, text#198, review_user#12, timestamp#13L, review_concat#22, lemmas#116, lemmatizedText#187, document#220, sentence#233, token#247, UDF(array(token#247)) AS spell#262]\n                                             +- Project [book_id#6, book_title#7, review_id#8, review_score#32, text#198, review_user#12, timestamp#13L, review_concat#22, lemmas#116, lemmatizedText#187, document#220, sentence#233, UDF(array(sentence#233)) AS token#247]\n                                                +- Project [book_id#6, book_title#7, review_id#8, review_score#32, text#198, review_user#12, timestamp#13L, review_concat#22, lemmas#116, lemmatizedText#187, document#220, UDF(array(document#220)) AS sentence#233]\n                                                   +- Project [book_id#6, book_title#7, review_id#8, review_score#32, text#198, review_user#12, timestamp#13L, review_concat#22, lemmas#116, lemmatizedText#187, UDF(text#198) AS document#220]\n                                                      +- Project [book_id#6, book_title#7, review_id#8, review_score#32, review_title#11 AS text#198, review_user#12, timestamp#13L, review_concat#22, lemmas#116, lemmatizedText#187]\n                                                         +- Project [book_id#6, book_title#7, review_id#8, review_score#32, review_title#11, review_user#12, timestamp#13L, review_concat#22, lemmas#116, lower(lemmatizedText#176) AS lemmatizedText#187]\n                                                            +- Project [book_id#6, book_title#7, review_id#8, review_score#32, review_title#11, review_user#12, timestamp#13L, review_concat#22, lemmas#116, <lambda>(lemmas#116) AS lemmatizedText#176]\n                                                               +- Project [book_id#6, book_title#7, review_id#8, review_score#32, review_title#11, 
review_user#12, timestamp#13L, review_concat#22, lemmas#116]\n                                                                  +- Project [book_id#6, book_title#7, review_id#8, review_score#32, text#42, review_title#11, review_user#12, timestamp#13L, review_concat#22, document#62, sentence#74, token#87, spell#101, lemmas#116, stems#132, UDF(array(spell#101, sentence#74)) AS pos#149]\n                                                                     +- Project [book_id#6, book_title#7, review_id#8, review_score#32, text#42, review_title#11, review_user#12, timestamp#13L, review_concat#22, document#62, sentence#74, token#87, spell#101, lemmas#116, UDF(array(spell#101)) AS stems#132]\n                                                                        +- Project [book_id#6, book_title#7, review_id#8, review_score#32, text#42, review_title#11, review_user#12, timestamp#13L, review_concat#22, document#62, sentence#74, token#87, spell#101, UDF(array(spell#101)) AS lemmas#116]\n                                                                           +- Project [book_id#6, book_title#7, review_id#8, review_score#32, text#42, review_title#11, review_user#12, timestamp#13L, review_concat#22, document#62, sentence#74, token#87, UDF(array(token#87)) AS spell#101]\n                                                                              +- Project [book_id#6, book_title#7, review_id#8, review_score#32, text#42, review_title#11, review_user#12, timestamp#13L, review_concat#22, document#62, sentence#74, UDF(array(sentence#74)) AS token#87]\n                                                                                 +- Project [book_id#6, book_title#7, review_id#8, review_score#32, text#42, review_title#11, review_user#12, timestamp#13L, review_concat#22, document#62, UDF(array(document#62)) AS sentence#74]\n                                                                                    +- Project [book_id#6, book_title#7, review_id#8, review_score#32, 
text#42, review_title#11, review_user#12, timestamp#13L, review_concat#22, UDF(text#42) AS document#62]\n                                                                                       +- Project [book_id#6, book_title#7, review_id#8, review_score#32, review_text#10 AS text#42, review_title#11, review_user#12, timestamp#13L, review_concat#22]\n                                                                                          +- Project [book_id#6, book_title#7, review_id#8, cast(review_score#9 as int) AS review_score#32, review_text#10, review_title#11, review_user#12, timestamp#13L, review_concat#22]\n                                                                                             +- Deduplicate [review_id#8]\n                                                                                                +- Project [book_id#6, book_title#7, review_id#8, review_score#9, review_text#10, review_title#11, review_user#12, timestamp#13L, concat(review_title#11,  , review_text#10) AS review_concat#22]\n                                                                                                   +- Relation[book_id#6,book_title#7,review_id#8,review_score#9,review_text#10,review_title#11,review_user#12,timestamp#13L] json\n"

In [14]:
# Score the held-out rows with the fitted pipeline model.
predictions = model.transform(testData)

# Preview a handful of scored rows.
predictions.select("prediction", "review_score", "indexedFeatures").show(n=5)

# Accuracy of prediction against review_score, reported as an error rate.
evaluator = MulticlassClassificationEvaluator() \
    .setLabelCol("review_score") \
    .setPredictionCol("prediction") \
    .setMetricName("accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

# Third stage of the fitted pipeline model; printing it gives a summary only.
gbtModel = model.stages[2]
print(gbtModel)

+----------+------------+--------------------+
|prediction|review_score|     indexedFeatures|
+----------+------------+--------------------+
|       5.0|           5|(2438,[19,22,301,...|
|       5.0|           5|(2438,[23,2437],[...|
|       5.0|           4|(2438,[6,7,9,55,5...|
|       5.0|           5|(2438,[10,13,201,...|
|       5.0|           5|(2438,[0,2,8,10,2...|
+----------+------------+--------------------+
only showing top 5 rows

Test Error = 0.194484
RandomForestClassificationModel (uid=RandomForestClassifier_a5bfc1b53c59) with 10 trees


In [23]:
# show the untruncated indexedFeatures vectors of the first 5 prediction rows
predictions.select("indexedFeatures").show(5,truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|indexedFeatures                                                                                                                                                                                                                                                                                               |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|(2438,[19,22,301,1319,1455,2437],[2.6314828399669414,2.439801461780175,4.09382629322

In [22]:
# show the sentimentIndex value of the first 5 rows of s_df2
s_df2.select("sentimentIndex").show(5,truncate=False)

+--------------+
|sentimentIndex|
+--------------+
|1.0           |
|1.0           |
|1.0           |
|0.0           |
|2.0           |
+--------------+
only showing top 5 rows



In [14]:
# display up to 100 predictions next to the true score (text columns cut at 70 characters)
predictions \
    .select("review_id", "review_concat", "review_score", "prediction") \
    .show(100, truncate=70)

+--------------+----------------------------------------------------------------------+------------+----------+
|     review_id|                                                         review_concat|review_score|prediction|
+--------------+----------------------------------------------------------------------+------------+----------+
|R1CEBJPZE4H5YE|The Next Worst Thing to Being There When you move through this book...|           5|       5.0|
|R1CNZRHE674DEW|The Next Worst Thing to Being There When you move through this book...|           5|       5.0|
|R1CNZRHE674DEW|The Next Worst Thing to Being There When you move through this book...|           5|       5.0|
|R2FO32ILFIQR3V|                                   Great true crime novel. Great book.|           5|       5.0|
|R39UXH89UAMX37|                                   Great true crime novel. Great book.|           5|       5.0|
|R2A5P47JL8BTXD|Another wonderful story Thanks to the author for giving us yet anot...|           5|    

In [27]:
# display up to 100 predictions for reviews whose true score is 1
predictions \
    .where(fn.col('review_score') == 1) \
    .select("review_id", "review_concat", "review_score", "prediction") \
    .show(100, truncate=70)

+--------------+----------------------------------------------------------------------+------------+----------+
|     review_id|                                                         review_concat|review_score|prediction|
+--------------+----------------------------------------------------------------------+------------+----------+
|R18E85EKCWU53F|A complete waste of my time Soooo tedious. The drinking, the pills,...|           1|       1.0|
|R1IHWAJIEV7VMM|Don’t bother.  Overhyped! I never write book reviews but I am moved...|           1|       2.0|
|R1IHWAJIEV7VMM|Don’t bother.  Overhyped! I never write book reviews but I am moved...|           1|       2.0|
|R1U03462BUZ8S5|Derivative, poorly written, and WAY overhyped... I couldn't even br...|           1|       2.0|
|R2FG2SAHEV8AD9|Be careful when you order! What I received was in German. Figuring ...|           1|       1.0|
|R2FNSA44HYXZTB|Deadly dull I bought this book because Amy Adams will star in the f...|           1|    

In [28]:
# first 5 rows of the training split
training_df.head(5)

 Row(book_id='006227564X', book_title='Fall and Rise: The Story of 9/11', review_id='R2YRFXZUJL3MVK', review_score=4, review_text="Received quickly but some sort of glue on cover. That's why I gave it 4 stars. It was a gift.", review_title='Received quickly.', review_user='Andrea L. Grlica', timestamp=1557296669, review_concat="Received quickly. Received quickly but some sort of glue on cover. That's why I gave it 4 stars. It was a gift."),

In [29]:
# first 5 rows of the un-annotated review DataFrame
s_df.head(5)

[Row(book_id='62678426', book_title='The Woman in the Window: A Novel', review_id='RZFSIAUSHZH43', review_score=5, review_text="Someone recommended this book but wouldn't say much about it. Lots of twists and turns, trying to figure out what was reality and what wasn't.Don't start this if you have to be somewhere else. It's hard to put down until you know how it ends.", review_title='Twists and turns', review_user='LindaG', timestamp=1556699124, review_concat="Twists and turns Someone recommended this book but wouldn't say much about it. Lots of twists and turns, trying to figure out what was reality and what wasn't.Don't start this if you have to be somewhere else. It's hard to put down until you know how it ends."),
 Row(book_id='1455536156', book_title='Scraps, Wilt & Weeds: Turning Wasted Food into Plenty', review_id='RSSSXHBF4BJJY', review_score=4, review_text='A well written and inspiring cook book! As a chef my self i was impressed with some of the applications for food waste!',

In [30]:
# Stratified split of s_df by review_score, so that every star rating is
# represented in the training set.
# sampleBy keeps each row of a stratum independently with the given probability,
# so 0.8 is the expected (not the exact) training share of every rating.
training2_df = s_df.sampleBy(
    "review_score",
    fractions={score: 0.8 for score in range(1, 6)},
    seed=42,
)
# NOTE(review): subtract() has EXCEPT DISTINCT semantics - the test split is
# de-duplicated, and every copy of a row that was sampled into training is
# removed. The two splits therefore need not add up to s_df.count().
test2_df = s_df.subtract(training2_df)

print('data set split')
# A bare expression in the middle of a cell is evaluated but never displayed,
# so the split sizes have to be printed explicitly.
print([training2_df.count(), test2_df.count()])

# Class distribution of each split, most frequent rating first.
for split_name, split_df in (('training set', training2_df), ('test set', test2_df)):
    print(split_name)
    split_df.groupBy("review_score") \
        .count() \
        .orderBy(col("count").desc()) \
        .show()

data set split
training set
+------------+-----+
|review_score|count|
+------------+-----+
|           5| 9718|
|           4| 1549|
|           3|  369|
|           2|  187|
|           1|  147|
+------------+-----+

test set
+------------+-----+
|review_score|count|
+------------+-----+
|           5| 1489|
|           4|  261|
|           3|   59|
|           2|   28|
|           1|   21|
+------------+-----+



In [31]:
# New prediction model: logistic regression on the 'tfidf' features with
# 'review_score' as the label, no regularisation (regParam = 0,
# elasticNetParam = 0) and at most 100 iterations.
lr = LogisticRegression(
    labelCol='review_score',
    featuresCol='tfidf',
    regParam=0.0,
    maxIter=100,
    elasticNetParam=0.
)

# Chain tfidf_pipeline with the classifier and fit the whole thing on the
# stratified training split.
lr_pipeline = Pipeline(stages=[tfidf_pipeline, lr]).fit(training2_df)

# Score the held-out split; overall accuracy is the mean of the 0/1 flag
# "prediction equals review_score".
predictions = lr_pipeline.transform(test2_df)
(predictions
    .select(fn.expr('float(prediction = review_score)').alias('correct'))
    .select(fn.avg('correct'))
    .show())

+------------------+
|      avg(correct)|
+------------------+
|0.8277717976318623|
+------------------+



In [32]:
# The overall score above looks fine, but it hides the class imbalance, so the
# accuracy is broken down by true rating here: for each review_score, the share
# of its test reviews whose prediction matches. It is poor for every rating
# except 5.
for score in range(1, 6):
    print('Score = {}'.format(score))
    # Keep only the test reviews with this true rating, then average the
    # 0/1 "prediction equals review_score" flag.
    predictions.filter(predictions['review_score'] == score).\
        select(fn.expr('float(prediction = review_score)').alias('correct')).\
        select(fn.avg('correct')).show()

Score = 1
+-------------------+
|       avg(correct)|
+-------------------+
|0.14285714285714285|
+-------------------+

Score = 2
+------------------+
|      avg(correct)|
+------------------+
|0.2857142857142857|
+------------------+

Score = 3
+------------------+
|      avg(correct)|
+------------------+
|0.3220338983050847|
+------------------+

Score = 4
+------------------+
|      avg(correct)|
+------------------+
|0.5440613026819924|
+------------------+

Score = 5
+------------------+
|      avg(correct)|
+------------------+
|0.9173942243116185|
+------------------+



In [16]:
# Print the first 20 rows of testing_df as a text table; long cell values are
# cut short (these are show()'s defaults, spelled out).
testing_df.show(n=20, truncate=True)

+----------+--------------------+--------------+------------+--------------+----------+--------------------+--------------------+--------------------+---------+--------------+
|   book_id|          book_title|     review_id|review_score|   review_user| timestamp|       review_concat|              lemmas|      lemmatizedText|sentiment|sentimentIndex|
+----------+--------------------+--------------+------------+--------------+----------+--------------------+--------------------+--------------------+---------+--------------+
|006227564X|Fall and Rise: Th...|R1CEBJPZE4H5YE|           5|  Alan Beumann|1556909515|The Next Worst Th...|[[token, 0, 3, Wh...|when you move thr...| negative|           1.0|
|006227564X|Fall and Rise: Th...|R1CNZRHE674DEW|           5|  Alan Beumann|1556910535|The Next Worst Th...|[[token, 0, 3, Wh...|when you move thr...| negative|           1.0|
|006227564X|Fall and Rise: Th...|R1CNZRHE674DEW|           5|  Alan Beumann|1556910535|The Next Worst Th...|[[token, 0, 

In [None]:
# Print the first 20 rows of training_df as a text table; long cell values are
# cut short (these are show()'s defaults, spelled out).
training_df.show(n=20, truncate=True)