<h1 align="center">MDSC-302(P) Mini-Project</h1>

## Problem Statement: Sentiment Analysis of Amazon Kindle Store Reviews

- This project performs supervised sentiment analysis on review data from the Amazon Kindle Store.
- The data consists of 982,619 reviews spanning May 1996 to July 2014.
- Reviews with overall rating of 1, 2, or 3 are labelled as negative ("neg"/1), and reviews with overall rating of 4 or 5 are labelled as positive ("pos"/0).
- Dataset is available here: http://jmcauley.ucsd.edu/data/amazon/
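
The labelling rule above can be sketched in plain Python. The function name `label_review` is hypothetical. It mirrors the `CASE WHEN overall<4` expression and the `length(reviewText)>2` filter that the notebook applies in Spark SQL.

```python
from typing import Optional


def label_review(overall: float, review_text: Optional[str]) -> Optional[int]:
    """Return 1 (negative) for ratings 1-3 and 0 (positive) for ratings 4-5.

    Reviews whose text is missing or at most 2 characters long are dropped
    (returned as None), mirroring the WHERE length(reviewText) > 2 filter.
    """
    if review_text is None or len(review_text) <= 2:
        return None
    return 1 if overall < 4 else 0


print(label_review(5.0, "Nice vintage story"))     # 0 (positive)
print(label_review(3.0, "It was okay, I guess."))  # 1 (negative)
print(label_review(1.0, "ok"))                     # None (text too short)
```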

## Import Libraries

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.ml.feature import CountVectorizer, IDF
from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes, LogisticRegression, LinearSVC
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

import html
import string
import nltk
nltk.download('wordnet')

[nltk_data] Downloading package wordnet to /home/huser/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


True

In [2]:
# 'local' runs Spark with a single worker thread; 'local[*]' would use all available cores.
spark = SparkSession.builder \
    .master("local") \
    .appName("Sentiment analysis") \
    .getOrCreate()

## Import dataset

In [3]:
df = spark.read.json('Kindle_Store_5.json')

In [4]:
df.show(3)

+----------+-------+-------+--------------------+----------+--------------+------------+------------------+--------------+
|      asin|helpful|overall|          reviewText|reviewTime|    reviewerID|reviewerName|           summary|unixReviewTime|
+----------+-------+-------+--------------------+----------+--------------+------------+------------------+--------------+
|B000F83SZQ| [0, 0]|    5.0|I enjoy vintage b...|05 5, 2014|A1F6404F1VG29J|  Avidreader|Nice vintage story|    1399248000|
|B000F83SZQ| [2, 2]|    4.0|This book is a re...|01 6, 2014| AN0N05A9LIJEQ|    critters|      Different...|    1388966400|
|B000F83SZQ| [2, 2]|    4.0|This was a fairly...|04 4, 2014| A795DMNCJILA6|         dot|             Oldie|    1396569600|
+----------+-------+-------+--------------------+----------+--------------+------------+------------------+--------------+
only showing top 3 rows



## Creating Positive (0) and Negative (1) Labels

In [5]:
df.createOrReplaceTempView('kindle_json_view')

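# Ratings 1-3 -> label 1 (negative), ratings 4-5 -> label 0 (positive);
# reviews with 2 or fewer characters of text are dropped.
data_json = spark.sql('''
  SELECT CASE WHEN overall < 4 THEN 1 ELSE 0 END AS label,
         reviewText AS text
  FROM kindle_json_view
  WHERE length(reviewText) > 2''')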

data_json.groupBy('label').count().show()



+-----+------+
|label| count|
+-----+------+
|    1|153331|
|    0|829250|
+-----+------+





## Sampling the data

In [6]:
# Down-sample to roughly balance the classes: keep ~5% of positive and ~25% of negative reviews.
pos = data_json.where('label=0').sample(False, 0.05, seed=1220)
neg = data_json.where('label=1').sample(False, 0.25, seed=1220)
data = pos.union(neg)
data.show(5)

+-----+--------------------+
|label|                text|
+-----+--------------------+
|    0|I am not for sure...|
|    0|This is yet anoth...|
|    0|I almost didn't g...|
|    0|L'Among classic. ...|
|    0|The reporting is ...|
+-----+--------------------+
only showing top 5 rows



In [7]:
data.groupBy('label').count().show()

+-----+-----+
|label|count|
+-----+-----+
|    1|38219|
|    0|41634|
+-----+-----+
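
The fractions 0.05 and 0.25 roughly balance the two classes. `sample(False, fraction)` keeps each row independently with probability `fraction`, so the sample sizes are only approximately `count × fraction`: 41,462.5 and 38,332.75 are expected, versus the 41,634 and 38,219 rows observed above. The sketch below shows that arithmetic and one way to pick fractions for a target size per class. The helper names and the 40,000 target are illustrative assumptions. In Spark, a fractions dictionary like this can also be passed to `DataFrame.sampleBy('label', fractions, seed)`.

```python
# Label counts from the groupBy output above (0 = positive, 1 = negative).
counts = {0: 829_250, 1: 153_331}
fractions_used = {0: 0.05, 1: 0.25}


def expected_sizes(counts, fractions):
    """Expected rows kept by Bernoulli sampling: count * fraction per class."""
    return {label: counts[label] * fractions[label] for label in counts}


def balancing_fractions(counts, target):
    """Per-class fraction that keeps about `target` rows of each class (capped at 1.0)."""
    return {label: min(1.0, target / n) for label, n in counts.items()}


print(expected_sizes(counts, fractions_used))  # about {0: 41462.5, 1: 38332.75}
print(balancing_fractions(counts, 40_000))     # about {0: 0.0482, 1: 0.2609}
```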



## Pre-processing function

In [8]:
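# Define preprocessing function (applied later as a Spark UDF).
# Translation table that surrounds every punctuation character with spaces.
PAD_PUNCT = str.maketrans({key: " {0} ".format(key) for key in string.punctuation})
# Characters that disqualify an n-gram; '_' is allowed because it joins the tokens.
INVALID_CHARS = string.punctuation.replace("_", "")

def preprocessing(text):
    # Decode HTML entities and expand "n't" contractions ("can't" -> "can not").
    line = html.unescape(text)
    line = line.replace("can't", "can not")
    line = line.replace("n't", " not")

    # Split off punctuation, lowercase, tokenize on whitespace, then lemmatize.
    # WordNetLemmatizer defaults to nouns, so e.g. "was" -> "wa" and "has" -> "ha".
    line = line.translate(PAD_PUNCT).lower().split()
    lemmatizer = nltk.WordNetLemmatizer()
    line = [lemmatizer.lemmatize(t) for t in line]

    # Negation handling on unigrams: "not"/"no" toggles a flag, tokens are prefixed
    # with "not_" while it is set, and punctuation or non-alphabetic tokens reset it.
    # Negators, punctuation and numbers are not kept as unigrams.
    tokens = []
    negated = False
    for t in line:
        if t in ['not', 'no']:
            negated = not negated
        elif t in string.punctuation or not t.isalpha():
            negated = False
        else:
            tokens.append('not_' + t if negated else t)

    # Bigrams and trigrams are built from the lemmatized tokens (without negation
    # prefixes), dropping any n-gram that contains punctuation.
    bi_tokens = ['_'.join(g) for g in nltk.bigrams(line)]
    bi_tokens = [i for i in bi_tokens if all(j not in INVALID_CHARS for j in i)]
    tri_tokens = ['_'.join(g) for g in nltk.trigrams(line)]
    tri_tokens = [i for i in tri_tokens if all(j not in INVALID_CHARS for j in i)]

    return tokens + bi_tokens + tri_tokens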

In [9]:
review = preprocessing("I think this book is not good. It is full of typos and factual errors that I can't ignore.")
print(review)

['i', 'think', 'this', 'book', 'is', 'not_good', 'it', 'is', 'full', 'of', 'typo', 'and', 'factual', 'error', 'that', 'i', 'can', 'not_ignore', 'i_think', 'think_this', 'this_book', 'book_is', 'is_not', 'not_good', 'it_is', 'is_full', 'full_of', 'of_typo', 'typo_and', 'and_factual', 'factual_error', 'error_that', 'that_i', 'i_can', 'can_not', 'not_ignore', 'i_think_this', 'think_this_book', 'this_book_is', 'book_is_not', 'is_not_good', 'it_is_full', 'is_full_of', 'full_of_typo', 'of_typo_and', 'typo_and_factual', 'and_factual_error', 'factual_error_that', 'error_that_i', 'that_i_can', 'i_can_not', 'can_not_ignore']
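
In the output above, `not_good` appears twice: once as the negation-marked unigram and once as the plain bigram of `not good`. Because both are the same string, `CountVectorizer` counts them as a single feature. The simplified sketch below re-implements only the negation rule and the bigram filter (with `str.split` and `zip` standing in for the lemmatizer and `nltk.bigrams`). It shows the overlap and how a distinct prefix such as `NEG_`, an illustrative choice not used in the original, would keep the two features apart. Whether merging them helps or hurts accuracy here has not been tested.

```python
import string

# Characters that disqualify an n-gram; '_' is allowed because it joins the tokens.
INVALID_CHARS = string.punctuation.replace("_", "")


def negate_unigrams(tokens, prefix="not_"):
    """Prefix tokens that follow 'not'/'no'; punctuation or non-alphabetic tokens end the scope."""
    out, negated = [], False
    for t in tokens:
        if t in ("not", "no"):
            negated = not negated
        elif t in string.punctuation or not t.isalpha():
            negated = False
        else:
            out.append(prefix + t if negated else t)
    return out


def bigrams(tokens):
    """Underscore-joined bigrams, skipping any that contain punctuation."""
    joined = ("_".join(pair) for pair in zip(tokens, tokens[1:]))
    return [b for b in joined if all(ch not in INVALID_CHARS for ch in b)]


tokens = "this book is not good .".split()
print(negate_unigrams(tokens))  # ['this', 'book', 'is', 'not_good']
print(bigrams(tokens))          # ['this_book', 'book_is', 'is_not', 'not_good']
print(set(negate_unigrams(tokens)) & set(bigrams(tokens)))          # {'not_good'}
print(set(negate_unigrams(tokens, "NEG_")) & set(bigrams(tokens)))  # set()
```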


## Pre-processing data

In [10]:
preprocessing_udf = F.udf(preprocessing, ArrayType(StringType()))
data_tokens = data.withColumn('tokens', preprocessing_udf(F.col('text')))
data_tokens.show(3)

+-----+--------------------+--------------------+
|label|                text|              tokens|
+-----+--------------------+--------------------+
|    0|I am not for sure...|[i, am, not_for, ...|
|    0|This is yet anoth...|[this, is, yet, a...|
|    0|I almost didn't g...|[i, almost, did, ...|
+-----+--------------------+--------------------+
only showing top 3 rows

## Splitting Data

In [11]:
# Split the data: 70% for training and 30% for testing
df_train, df_test = data_tokens.randomSplit([0.7, 0.3], seed=18)
df_train.groupBy('label').count().show()

+-----+-----+
|label|count|
+-----+-----+
|    1|26712|
|    0|29077|
+-----+-----+



In [12]:
# Cache the training set, which every model fit below reads again.
df_train.cache()

DataFrame[label: int, text: string, tokens: array<string>]

## Naive Bayes
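
The `CountVectorizer` and `IDF` stages below turn each token list into a weighted vector before it reaches the classifier. The plain-Python sketch that follows assumes Spark's documented smoothed IDF formula, idf(t) = log((m + 1) / (df(t) + 1)) with m the number of documents, and treats `minDF` as a minimum document count, as with `minDF=5.0` here. The `tf_idf` helper and toy documents are illustrative only.

```python
import math
from collections import Counter


def tf_idf(docs, min_df=1):
    """Raw term counts times smoothed IDF, keeping terms found in >= min_df documents."""
    n_docs = len(docs)
    doc_freq = Counter(term for doc in docs for term in set(doc))
    vocab = {term for term, df in doc_freq.items() if df >= min_df}
    idf = {term: math.log((n_docs + 1) / (doc_freq[term] + 1)) for term in vocab}
    return [
        {term: count * idf[term] for term, count in Counter(doc).items() if term in vocab}
        for doc in docs
    ]


docs = [["good", "book", "good"], ["bad", "book"], ["good", "read"]]
for weights in tf_idf(docs, min_df=2):
    print(weights)  # only "good" and "book" occur in >= 2 documents
```

A term that appears in every document gets weight log(1) = 0, so very common tokens contribute nothing after this stage.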

In [13]:
c_vec = CountVectorizer(inputCol='tokens', outputCol='c_vec', minDF=5.0)
idf = IDF(inputCol="c_vec", outputCol="features")
nb = NaiveBayes()

nb_pipeline = Pipeline(stages=[c_vec, idf, nb])

nb_model = nb_pipeline.fit(df_train)
nb_test_pred = nb_model.transform(df_test)
nb_test_pred.show(3)

+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|label|                text|              tokens|               c_vec|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|    0|"A Light in the D...|[a, light, in, th...|(238355,[0,1,2,3,...|(238355,[0,1,2,3,...|[-14874.650633549...|[1.0,1.3995489911...|       0.0|
|    0|"Lokant" by Charl...|[lokant, by, char...|(238355,[0,1,2,3,...|(238355,[0,1,2,3,...|[-9401.2509876576...|[1.0,2.7927877286...|       0.0|
|    0|"Max" was written...|[max, wa, written...|(238355,[0,1,2,3,...|(238355,[0,1,2,3,...|[-42400.459294817...|[1.75287468380991...|       1.0|
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+

In [14]:
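# Note: scoring the hard 0/1 'prediction' column gives the area under a single-threshold
# ROC curve (equal to balanced accuracy); rawPredictionCol='rawPrediction' would
# evaluate the model's continuous scores instead.
nb_ROC_ = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='label')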
nb_ROC = nb_ROC_.evaluate(nb_test_pred)
print("ROC of the Naive Bayes model: {}".format(nb_ROC))

nb_Acc_ = MulticlassClassificationEvaluator(metricName='accuracy')
nb_Acc = nb_Acc_.evaluate(nb_test_pred)
print("Accuracy of the Naive Bayes model: {}".format(nb_Acc))

ROC of the Naive Bayes model: 0.8599035690204784

Accuracy of the Naive Bayes model: 0.8600814494680851
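
Both evaluators above read the hard 0/1 `prediction` column. Passing it as `rawPredictionCol` gives a ROC curve with a single operating point, so the reported area equals the balanced accuracy (the mean of the true-positive and true-negative rates) rather than a threshold-free AUC. On these nearly balanced classes that likely explains why the two figures are almost identical (0.8599 vs 0.8601). The plain-Python sketch below uses a rank-based AUC, not Spark's implementation, with made-up scores to illustrate the effect. In Spark, the default `rawPredictionCol='rawPrediction'` would score the model's continuous outputs instead.

```python
def roc_auc(labels, scores):
    """AUC as the probability that a random positive outscores a random negative (ties count 1/2)."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = sum(1.0 if p > n else 0.5 if p == n else 0.0 for p in pos for n in neg)
    return wins / (len(pos) * len(neg))


def balanced_accuracy(labels, preds):
    """Mean of the true-positive rate and the true-negative rate."""
    tpr = sum(1 for y, p in zip(labels, preds) if y == 1 and p == 1) / labels.count(1)
    tnr = sum(1 for y, p in zip(labels, preds) if y == 0 and p == 0) / labels.count(0)
    return (tpr + tnr) / 2


labels = [1, 1, 1, 0, 0, 0, 0]
scores = [0.9, 0.6, 0.3, 0.7, 0.2, 0.1, 0.05]  # illustrative continuous model outputs
preds = [1 if s >= 0.5 else 0 for s in scores]  # hard 0/1 predictions

print(roc_auc(labels, scores))  # about 0.833 (threshold-free)
print(roc_auc(labels, preds), balanced_accuracy(labels, preds))  # both about 0.708
```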




In [15]:
# 5 minDF values x 3 smoothing values = 15 combinations, each scored by accuracy with 4-fold cross-validation.
nb_paramGrid = (ParamGridBuilder()
                .addGrid(c_vec.minDF, [3.0, 5.0, 7.0, 10.0, 15.0])
                .addGrid(nb.smoothing, [0.1, 0.5, 1.0])
                .build())
nb_cv = CrossValidator(estimator=nb_pipeline, estimatorParamMaps=nb_paramGrid, evaluator=nb_Acc_, numFolds=4)
nb_cv_model = nb_cv.fit(df_train)

In [17]:
nb_cv_test = nb_cv_model.transform(df_test)
nb_cv_Acc = nb_Acc_.evaluate(nb_cv_test)
print("Accuracy of the Naive Bayes cross validation model: {}".format(nb_cv_Acc))

Accuracy of the Naive Bayes cross validation model: 0.8609541223404256
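
`ParamGridBuilder` expands the two lists into every combination, and `CrossValidator` fits the whole pipeline once per combination per fold, then refits the best combination on the full training set. The sketch below reproduces that bookkeeping with `itertools.product`; the dictionary keys are plain labels, not Spark `Param` objects. The per-combination scores and the selected pipeline can be inspected afterwards through `nb_cv_model.avgMetrics` and `nb_cv_model.bestModel`.

```python
from itertools import product

min_df_values = [3.0, 5.0, 7.0, 10.0, 15.0]
smoothing_values = [0.1, 0.5, 1.0]
num_folds = 4

# One dict per grid point, mirroring ParamGridBuilder's cross product.
param_grid = [
    {"minDF": m, "smoothing": s} for m, s in product(min_df_values, smoothing_values)
]

# Every grid point is trained once per fold; the best one is then refit on all training data.
pipeline_fits = len(param_grid) * num_folds + 1

print(len(param_grid))  # 15
print(pipeline_fits)    # 61
```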

## Logistic regression

In [19]:
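# maxIter=5 caps the optimizer at five iterations, so the model may stop before converging.
lr = LogisticRegression(maxIter=5)
# Reuse the CountVectorizer and IDF stages defined for Naive Bayes.
lr_pipeline = Pipeline(stages=[c_vec, idf, lr])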

lr_model = lr_pipeline.fit(df_train)
lr_test = lr_model.transform(df_test)

In [20]:
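# AUC from the hard 0/1 'prediction' column; rawPredictionCol='rawPrediction' would give a threshold-free AUC.
lr_ROC_ = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='label')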
lr_ROC = lr_ROC_.evaluate(lr_test)
print("ROC of the Logistic model: {}".format(lr_ROC))

lr_Acc_ = MulticlassClassificationEvaluator(metricName='accuracy')
lr_Acc = lr_Acc_.evaluate(lr_test)
print("Accuracy of the Logistic model: {}".format(lr_Acc))

ROC of the Logistic model: 0.8665037805637058

Accuracy of the Logistic model: 0.8676446143617021
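
A fitted binary logistic model scores a review from a single linear margin w·x + b. The sketch below shows how that margin can produce the `rawPrediction`, `probability`, and `prediction` columns, assuming the usual layout of [-margin, margin], a sigmoid for the probability, and Spark's default threshold of 0.5. The weights are made up for illustration and are not taken from the trained model.

```python
import math


def sigmoid(z):
    """Numerically stable logistic function."""
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    e = math.exp(z)
    return e / (1.0 + e)


def logistic_outputs(weights, intercept, features, threshold=0.5):
    """Return (rawPrediction, probability, prediction) for a sparse feature dict {index: value}."""
    margin = intercept + sum(weights.get(i, 0.0) * v for i, v in features.items())
    p1 = sigmoid(margin)
    raw_prediction = [-margin, margin]
    probability = [1.0 - p1, p1]
    prediction = 1.0 if p1 > threshold else 0.0
    return raw_prediction, probability, prediction


# Illustrative weights and features (not taken from the trained model).
weights, intercept = {0: 1.5, 3: -2.0}, -0.25
print(logistic_outputs(weights, intercept, {0: 1.0, 3: 0.5}))  # margin 0.25 -> prediction 1.0
print(logistic_outputs(weights, intercept, {3: 1.0}))          # margin -2.25 -> prediction 0.0
```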

## Linear SVC

In [22]:
# maxIter=4 caps the optimizer at four iterations, so the model may stop before converging.
lsvc = LinearSVC(maxIter=4)
lsvc_pipeline = Pipeline(stages=[c_vec, idf, lsvc])

lsvc_model = lsvc_pipeline.fit(df_train)
lsvc_test = lsvc_model.transform(df_test)

In [26]:
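# AUC from the hard 0/1 'prediction' column; rawPredictionCol='rawPrediction' would give a threshold-free AUC.
lsvc_ROC_ = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='label')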
lsvc_ROC = lsvc_ROC_.evaluate(lsvc_test)
print("ROC of the svc model: {}".format(lsvc_ROC))

lsvc_Acc_ = MulticlassClassificationEvaluator(metricName='accuracy')
lsvc_Acc = lsvc_Acc_.evaluate(lsvc_test)
print("Accuracy of the svc model: {}".format(lsvc_Acc))

ROC of the svc model: 0.8656701853902683

Accuracy of the svc model: 0.8670628324468085
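
`LinearSVC` fits a linear decision function by minimizing the hinge loss and predicts class 1 when the margin w·x + b exceeds its `threshold` (default 0.0). Unlike logistic regression, it produces no `probability` column. The plain-Python sketch below shows the loss and the decision rule with made-up weights. It ignores the optional regularization and the feature standardization Spark can apply during training.

```python
def margin(weights, intercept, features):
    """Linear score w.x + b for a sparse feature dict {index: value}."""
    return intercept + sum(weights.get(i, 0.0) * v for i, v in features.items())


def mean_hinge_loss(weights, intercept, data):
    """Average hinge loss max(0, 1 - y * margin), with 0/1 labels mapped to -1/+1."""
    total = 0.0
    for features, label in data:
        y = 1.0 if label == 1 else -1.0
        total += max(0.0, 1.0 - y * margin(weights, intercept, features))
    return total / len(data)


def svc_predict(weights, intercept, features, threshold=0.0):
    """Predict 1.0 when the margin exceeds the threshold, else 0.0."""
    return 1.0 if margin(weights, intercept, features) > threshold else 0.0


# Illustrative weights and data (not taken from the trained model).
weights, intercept = {0: 2.0, 1: -1.0}, 0.0
data = [({0: 1.0}, 1), ({1: 1.0}, 0), ({0: 0.25}, 1)]

print(mean_hinge_loss(weights, intercept, data))              # about 0.167
print([svc_predict(weights, intercept, f) for f, _ in data])  # [1.0, 0.0, 1.0]
```

Only the third example sits inside the margin (score 0.5 < 1), so it is the only one that contributes to the loss even though it is classified correctly.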

## Predicting new reviews

In [28]:
review_1 = ["I loved this book. The author made me think long and hard about our split government and the trials of these three young women in Paris during the early 1940s when they fell under Nazi rule. I know much of this history but she brought it to life and placed me on the scene. Friends of family who underwent a camp don't talk about it much but to read this, though I've seen films regarding it, she really brought it to life in a way that you can't ignore what was happening."]
review_2 = ["I am never buying books from Amazon Kindle again. They NEVER arrive anymore on either of the devices where I used to read them. I have been on the phone for at least two working business days with Amazon reps. Some don't speak good English - poor souls, I know they're desperate for jobs and they're all in India or someplace, not their fault they have to work for this horrible corporation."]
review_3 = ["I enjoyed reading a Perfect New World: A Novel Kindle Edition by Dr. Jacob Lavi. It was an ingenious and creative look at a possible future for man and the earth. Dr. Lavi described a future world that seemed possible. He highlighted the way we are degrading our planet by our excesses."]
review_4 = ["The book has too much detail about the history of the US prison system and is very repetitive about his conversations with other guards. Prisons are terrible places but highlighting one Louisiana prison doesn't seem right. Does he ever consider why these people are there or what they did to their victims ?Did they all have poor lawyers or were they all victims of racism or were they repeat offenders ?Some of their crimes mentioned deserve harsher punishment than they received. Skipped many pages which were just a bore."]
review_5 = ["She just passed away but this is the only magazine that she loved out of the 8 I ordered. I was quite amazed since she was a conservative Hindu lady. Then my wife also loved this one."]

In [29]:
schema = StructType([StructField("text", StringType(), True)])

text = [review_1, review_2, review_3, review_4, review_5]
reviews = spark.createDataFrame(text, schema=schema)

In [30]:
reviews_token = reviews.withColumn('tokens', preprocessing_udf(F.col('text')))
reviews_token.show()

+--------------------+--------------------+
|                text|              tokens|
+--------------------+--------------------+
|I loved this book...|[i, loved, this, ...|
|I am never buying...|[i, am, never, bu...|
|I enjoyed reading...|[i, enjoyed, read...|
|The book has too ...|[the, book, ha, t...|
|She just passed a...|[she, just, passe...|
+--------------------+--------------------+





In [31]:
result = nb_cv_model.transform(reviews_token)
result.select('text', 'prediction').show()

+--------------------+----------+
|                text|prediction|
+--------------------+----------+
|I loved this book...|       0.0|
|I am never buying...|       1.0|
|I enjoyed reading...|       0.0|
|The book has too ...|       1.0|
|She just passed a...|       0.0|
+--------------------+----------+
