# Amazon Review Sentiment Analysis

In [1]:
from textblob import TextBlob
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.ml import *
from pyspark.ml.classification import *
from pyspark.ml.feature import *
from pyspark.ml.param import *
from pyspark.ml.tuning import *
from pyspark.ml.evaluation import *
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import * 
from sklearn.metrics import classification_report
from time import time
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row

## Start Spark Session

In [2]:
# Reuse the active SparkSession if there is one; otherwise start a new one
# with default settings.
spark = SparkSession.builder.getOrCreate()

## Create schema

In [3]:
# Column names in the order they appear in the CSV file.
_COLUMN_NAMES = [
    "id", "dateAdded", "dateUpdated", "name", "asins", "brand",
    "categories", "primaryCategories", "imageURLs", "keys",
    "manufacturer", "manufacturerNumber",
    "reviews.date", "reviews.dateAdded", "reviews.dateSeen",
    "reviews.doRecommend", "reviews.id", "reviews.numHelpful",
    "reviews.rating", "reviews.sourceURLs", "reviews.text",
    "reviews.title", "reviews.username", "sourceURLs",
]

# Every column is read as a nullable string except the star rating, which is
# read as a nullable integer.
_INTEGER_COLUMNS = {"reviews.rating"}

schema = StructType([
    StructField(
        column,
        IntegerType() if column in _INTEGER_COLUMNS else StringType(),
        True,
    )
    for column in _COLUMN_NAMES
])

## Import data

In [4]:
# Load the reviews CSV (first row is a header) using the explicit schema.
# NOTE(review): the path is an absolute location on one machine; adjust it
# before running the notebook anywhere else.
csv_path = "/Users/joanne/Documents/School/nlp_data/Datafiniti_Amazon_Consumer_Reviews_of_Amazon_Products.csv"
raw_data = (
    spark.read
    .options(header="true", delimiter=",")
    .schema(schema)
    .csv(csv_path)
)

# Replace the dots in the column names with underscores, all columns at once.
raw_data = raw_data.toDF(*[field.replace('.', '_') for field in raw_data.schema.names])

In [5]:
# cache() marks raw_data for reuse by the cells below and returns the
# DataFrame, so the cell output is simply its repr.
raw_data.cache()

DataFrame[id: string, dateAdded: string, dateUpdated: string, name: string, asins: string, brand: string, categories: string, primaryCategories: string, imageURLs: string, keys: string, manufacturer: string, manufacturerNumber: string, reviews_date: string, reviews_dateAdded: string, reviews_dateSeen: string, reviews_doRecommend: string, reviews_id: string, reviews_numHelpful: string, reviews_rating: int, reviews_sourceURLs: string, reviews_text: string, reviews_title: string, reviews_username: string, sourceURLs: string]

## Sentiment Analysis

In [6]:
def textblob_udf(text):
    """Label a review as positive (1.0) or negative (0.0) using TextBlob.

    Args:
        text: Review text, or None when the row has no review text.

    Returns:
        1.0 when the TextBlob polarity is >= 0.0 (a neutral score counts as
        positive), 0.0 when the polarity is negative, and None when ``text``
        is None so that a missing review produces a null label instead of
        raising inside the Spark job.
    """
    if text is None:
        return None
    polarity = TextBlob(text).sentiment[0]  # first element of sentiment is the polarity
    return 1.0 if polarity >= 0.0 else 0.0
    
# Wrap the function as a Spark UDF. StringType() is the return type udf()
# uses when none is given, so the label column still arrives as a string.
sentiment = udf(textblob_udf, StringType())

In [7]:
# Add a binary sentiment label (produced by the UDF) to every review and keep
# only the columns used in the rest of the notebook.
reviews = (
    raw_data
    .withColumn("label", sentiment(col("reviews_text")))
    .select("name", "reviews_text", "reviews_rating", "label")
)

In [8]:
# The UDF output arrives as a string; cast it so that label is an integer.
reviews = reviews.withColumn("label", col("label").cast("int"))

In [9]:
# Print the schema, then cache the DataFrame for the cells below.
# printSchema is a method: without the call parentheses the line only
# references it and nothing is printed.
reviews.printSchema()
reviews.cache()

DataFrame[name: string, reviews_text: string, reviews_rating: int, label: int]

## EDA

In [13]:
# Summary statistics (count, mean, stddev, min, max) for the star rating.
reviews.describe("reviews_rating").show()

+-------+------------------+
|summary|    reviews_rating|
+-------+------------------+
|  count|              5000|
|   mean|            4.5968|
| stddev|0.7318038448747551|
|    min|                 1|
|    max|                 5|
+-------+------------------+



In [14]:
# Number of reviews per star rating, lowest rating first.
rating_counts = reviews.groupBy("reviews_rating").count()
rating_counts.orderBy("reviews_rating").show()

+--------------+-----+
|reviews_rating|count|
+--------------+-----+
|             1|   63|
|             2|   54|
|             3|  197|
|             4| 1208|
|             5| 3478|
+--------------+-----+



In [15]:
# Number of reviews per sentiment label (0 = negative, 1 = positive).
label_counts = reviews.groupBy("label").count()
label_counts.orderBy("label").show()

+-----+-----+
|label|count|
+-----+-----+
|    0|  236|
|    1| 4764|
+-----+-----+



In [16]:
# Cross-tabulate star rating against the sentiment label to see how often the
# two agree; rows are sorted by rating only.
(
    reviews
    .groupBy("reviews_rating", "label")
    .count()
    .orderBy("reviews_rating")
    .show()
)

+--------------+-----+-----+
|reviews_rating|label|count|
+--------------+-----+-----+
|             1|    0|   34|
|             1|    1|   29|
|             2|    0|   20|
|             2|    1|   34|
|             3|    0|   33|
|             3|    1|  164|
|             4|    0|   58|
|             4|    1| 1150|
|             5|    1| 3387|
|             5|    0|   91|
+--------------+-----+-----+



## Create the pipeline

In [17]:
# 90/10 train/test split with a fixed seed; the ratio may need tuning.
train_set, test_set = reviews.randomSplit(weights=[0.9, 0.1], seed=5)

In [18]:
# Text features: tokenise on non-word characters, drop stop words, hash the
# remaining tokens into term-frequency vectors, then compute IDF weights.
tokenizer = RegexTokenizer(inputCol="reviews_text", outputCol="token", pattern="\\W")
remover = StopWordsRemover(inputCol="token", outputCol="stopwordsremoved")
hashtf = HashingTF(inputCol="stopwordsremoved", outputCol="hashingTF")
idf = IDF(inputCol="hashingTF", outputCol="IDF")

# Label: index the label values into a separate "StringIndexer" column.
label_stringIdx = StringIndexer(inputCol="label", outputCol="StringIndexer")

# Fit every stage on the training split only, then apply the fitted pipeline
# to both splits.
# NOTE(review): the logistic regression cell trains on "hashingTF" and
# "label", so the "IDF" and "StringIndexer" columns appear to be unused —
# confirm whether that is intended.
pipeline = Pipeline(stages=[tokenizer, remover, hashtf, idf, label_stringIdx])
model = pipeline.fit(train_set)
train_df, test_df = model.transform(train_set), model.transform(test_set)

## Logistic Regression

In [19]:
# Fit a logistic regression (10 iterations, regParam 0.01) on the training
# split.
# NOTE(review): the features are the raw "hashingTF" vectors and the target
# is the integer "label" column; the pipeline's "IDF" and "StringIndexer"
# outputs are not used here — confirm that is intended.
log_reg = LogisticRegression(
    featuresCol="hashingTF",
    labelCol="label",
    regParam=0.01,
    maxIter=10,
)
lr_model = log_reg.fit(train_df)

In [20]:
# Score the held-out split and tabulate star rating vs. prediction vs. the
# TextBlob-derived label.
lr_predict = lr_model.transform(test_df)
lr_final = lr_predict.select("reviews_rating", "prediction", "label")
# groupBy().count() already yields one row per distinct key combination, so
# no distinct() pass is needed afterwards.
(
    lr_final
    .groupBy("reviews_rating", "prediction", "label")
    .count()
    .orderBy("reviews_rating", ascending=True)
    .show()
)

+--------------+----------+-----+-----+
|reviews_rating|prediction|label|count|
+--------------+----------+-----+-----+
|             1|       1.0|    0|    1|
|             1|       1.0|    1|    2|
|             1|       0.0|    0|    1|
|             2|       0.0|    0|    1|
|             2|       1.0|    1|    5|
|             3|       0.0|    0|    1|
|             3|       1.0|    1|   10|
|             3|       1.0|    0|    3|
|             4|       0.0|    0|    4|
|             4|       1.0|    1|  114|
|             4|       0.0|    1|    1|
|             4|       1.0|    0|    1|
|             5|       0.0|    0|    2|
|             5|       1.0|    1|  344|
|             5|       1.0|    0|    8|
+--------------+----------+-----+-----+



In [21]:
# Accuracy on the test split: the share of rows whose prediction equals the
# label.
lr_total = lr_final.count()
lr_correct = lr_final.where(col("prediction") == col("label")).count()

print("Correct Prediction:", lr_correct, ", Total:", lr_total,
      ", Accuracy Rate:", lr_correct/lr_total)

Correct Prediction: 484 , Total: 498 , Accuracy Rate: 0.9718875502008032
