<a href="https://colab.research.google.com/github/mahanabba/BigDataSentimentAnalysisSpark/blob/master/amazonreviews_fourstar.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
# Install Java, Spark, and Findspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("truerev").getOrCreate()

In [0]:
from pyspark import SparkFiles
url ="https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Video_Games_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("amazon_reviews_us_Video_Games_v1_00.tsv.gz"), sep="\t", header=True)
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   12039526| RTIS3L2M1F5SM|B001CXYMFS|     737716809|Thrustmaster T-Fl...|     Video Games|          5|            0|          0|   N|                Y|an amazing joysti...|Used this for Eli...| 2015-08-31|
|         US|    9636577| R1ZV7R40OLHKD|B00M920ND6|     569686175|Tonsee 6 buttons ...|     Video Games|          5|    

In [0]:
productgroup = df.groupby("product_title").count()

In [0]:
reviews = productgroup.select("product_title", "count")
reviews.show()

+--------------------+-----+
|       product_title|count|
+--------------------+-----+
|Green Yarn Yoshi ...|   27|
|       GameShark Pro|   86|
|Shin Megami Tense...|   56|
|3DS/DSi/DSi XL Ca...|  140|
|Razer Carcharias ...|    3|
|PlayStation 3 - 3...|  491|
|               Joust|    9|
|Unreal Anthology ...|   74|
|RollerCoaster Tyc...|   95|
|3D Ultra Pinball:...|   12|
|Retro-Bit Genesis...|   51|
|Major League Base...|   17|
|Sega Dreamcast Sy...|    9|
|SEGA SATURN VIRTU...|    1|
|   AMAZING SPIDERMAN|    2|
|Neewer 2x Battery...|   58|
|Female USB to Ori...|    2|
|Full Parts Replac...|   16|
|Black and White (...|    6|
|Pearl Harbor-Zero...|    2|
+--------------------+-----+
only showing top 20 rows



In [0]:
average_rating_df = df.groupby("product_title").agg({"star_rating":"avg", "product_title":"count"}).orderBy('count(product_title)', ascending=False)
average_rating_df.show()

+--------------------+------------------+--------------------+
|       product_title|  avg(star_rating)|count(product_title)|
+--------------------+------------------+--------------------+
|PlayStation 4 500...| 4.163787279220153|               10361|
|  Grand Theft Auto V| 4.544004589787722|                8715|
|Call of Duty: Ghosts|3.7910371318822023|                7810|
|       Battlefield 4|3.6683302141817427|                4809|
|  Assassin's Creed 4|4.5639559508682765|                4722|
|      The Last of Us| 4.733797303175294|                4598|
|Elder Scrolls V: ...| 4.307251487767247|                4537|
|             Destiny|3.7777777777777777|                4410|
| Diablo III - PC/Mac|2.7177676537585422|                4390|
|Call of Duty: Bla...| 4.065401326320604|                4373|
|SimCity - Limited...| 1.452920443101712|                3972|
|       Battlefield 3|3.8350619782443713|                3953|
|      Rocksmith 2014| 4.255569782330346|              

In [0]:
vine_rating = df.groupby("vine").agg({"star_rating":"avg"})
vine_rating.show()

+----+-----------------+
|vine| avg(star_rating)|
+----+-----------------+
|   Y|4.074807737124213|
|   N|4.059856676690767|
+----+-----------------+



In [0]:
numrows = df.count()
numrows

1785997

In [0]:
df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: string (nullable = true)
 |-- helpful_votes: string (nullable = true)
 |-- total_votes: string (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)



In [0]:
nlp_df = df.select("star_rating", "review_headline", "review_body")
nlp_df.show()

+-----------+--------------------+--------------------+
|star_rating|     review_headline|         review_body|
+-----------+--------------------+--------------------+
|          5|an amazing joysti...|Used this for Eli...|
|          5|Definitely a sile...|Loved it,  I didn...|
|          1|            One Star|poor quality work...|
|          3|good, but could b...|nice, but tend to...|
|          4|   Great but flawed.|Great amiibo, gre...|
|          1|            One Star|The remote consta...|
|          5|              A Must|I have a 2012-201...|
|          5|          Five Stars|Perfect, kids lov...|
|          5|          Five Stars|            Excelent|
|          4|          Four Stars|Slippery but expe...|
|          5|          Five Stars|Love the game. Se...|
|          1| Game will get stuck|Does not fit prop...|
|          2|We have tried it ...|This was way too ...|
|          4|          Four Stars|Works great good ...|
|          1|Now i have to buy...|It did not fit

In [0]:
from pyspark.sql.functions import concat, lit, col
 
nlp_df=df.select("star_rating", concat(col("review_headline"),lit(" "),col("review_body")).alias("review"))
nlp_df.show()

+-----------+--------------------+
|star_rating|              review|
+-----------+--------------------+
|          5|an amazing joysti...|
|          5|Definitely a sile...|
|          1|One Star poor qua...|
|          3|good, but could b...|
|          4|Great but flawed....|
|          1|One Star The remo...|
|          5|A Must I have a 2...|
|          5|Five Stars Perfec...|
|          5| Five Stars Excelent|
|          4|Four Stars Slippe...|
|          5|Five Stars Love t...|
|          1|Game will get stu...|
|          2|We have tried it ...|
|          4|Four Stars Works ...|
|          1|Now i have to buy...|
|          5|Five Stars perfec...|
|          5|    Five Stars great|
|          5|Five Stars Works ...|
|          5|Five Stars Kids l...|
|          4|Four Stars Goodngame|
+-----------+--------------------+
only showing top 20 rows



In [0]:
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType

In [0]:
udf = UserDefinedFunction(lambda x: 'good' if x >= '3' else 'bad', StringType())


In [0]:
nlpclass = nlp_df.withColumn('star_rating', udf('star_rating'))

nlpclass2 = nlp_df.withColumn('star_rating', udf('star_rating'))
nlpclass2.show()

+-----------+--------------------+
|star_rating|              review|
+-----------+--------------------+
|       good|an amazing joysti...|
|       good|Definitely a sile...|
|        bad|One Star poor qua...|
|       good|good, but could b...|
|       good|Great but flawed....|
|        bad|One Star The remo...|
|       good|A Must I have a 2...|
|       good|Five Stars Perfec...|
|       good| Five Stars Excelent|
|       good|Four Stars Slippe...|
|       good|Five Stars Love t...|
|        bad|Game will get stu...|
|        bad|We have tried it ...|
|       good|Four Stars Works ...|
|        bad|Now i have to buy...|
|       good|Five Stars perfec...|
|       good|    Five Stars great|
|       good|Five Stars Works ...|
|       good|Five Stars Kids l...|
|       good|Four Stars Goodngame|
+-----------+--------------------+
only showing top 20 rows



In [0]:
from pyspark.sql.functions import length
nlpclass = nlpclass.withColumn('length', length(nlpclass['review']))
testdf = nlpclass.selectExpr("star_rating as class", "review as text", 'length as length')
testdf2 = nlpclass2.selectExpr("star_rating as class", "review as text", 'length as length')
testdf.count()

1785997

In [0]:
testdfna = testdf.dropna(how='any')
testdfna2 = testdf2.dropna(how='any')
testdfna.count()

1785892

In [0]:
testdfna.show()

+-----+--------------------+------+
|class|                text|length|
+-----+--------------------+------+
| good|an amazing joysti...|   240|
| good|Definitely a sile...|   320|
|  bad|One Star poor qua...|    55|
| good|good, but could b...|   107|
| good|Great but flawed....|   108|
|  bad|One Star The remo...|    70|
| good|A Must I have a 2...|    92|
| good|Five Stars Perfec...|    75|
| good| Five Stars Excelent|    19|
| good|Four Stars Slippe...|    32|
| good|Five Stars Love t...|    82|
|  bad|Game will get stu...|   352|
|  bad|We have tried it ...|   414|
| good|Four Stars Works ...|    48|
|  bad|Now i have to buy...|   156|
| good|Five Stars perfec...|    63|
| good|    Five Stars great|    16|
| good|Five Stars Works ...|    91|
| good|Five Stars Kids l...|    51|
| good|Four Stars Goodngame|    20|
+-----+--------------------+------+
only showing top 20 rows



In [0]:
 from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
# Create all the features to the data set
pos_neg_to_num = StringIndexer(inputCol='class',outputCol='label')
tokenizer = Tokenizer(inputCol="text", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
hashingTF = HashingTF(inputCol="token_text", outputCol='hash_token')
idf = IDF(inputCol='hash_token', outputCol='idf_token')

In [0]:

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# Create feature vectors
clean_up = VectorAssembler(inputCols=['idf_token', 'length'], outputCol='features')

In [0]:
#build pipeline to transform data
from pyspark.ml import Pipeline
data_prep_pipeline = Pipeline(stages=[pos_neg_to_num, tokenizer, stopremove, hashingTF, idf, clean_up])

In [0]:
cleaner = data_prep_pipeline.fit(testdfna)
cleaner2 = data_prep_pipeline.fit(testdfna2)
cleaned = cleaner.transform(testdfna)
cleaned2 = cleaner.transform(testdfna)

In [0]:
cleaned.select(['label', 'features']).show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(262145,[15775,16...|
|  0.0|(262145,[1536,120...|
|  1.0|(262145,[15889,34...|
|  0.0|(262145,[9129,410...|
|  0.0|(262145,[16332,29...|
|  1.0|(262145,[12142,78...|
|  0.0|(262145,[14,15889...|
|  0.0|(262145,[11382,12...|
|  0.0|(262145,[12710,74...|
|  0.0|(262145,[74975,93...|
|  0.0|(262145,[12710,16...|
|  1.0|(262145,[2437,420...|
|  1.0|(262145,[2437,844...|
|  0.0|(262145,[12888,74...|
|  1.0|(262145,[24417,28...|
|  0.0|(262145,[12710,12...|
|  0.0|(262145,[12710,74...|
|  0.0|(262145,[9639,127...|
|  0.0|(262145,[12710,26...|
|  0.0|(262145,[74975,93...|
+-----+--------------------+
only showing top 20 rows



In [0]:
from pyspark.ml.classification import NaiveBayes
# split
training, testing = cleaned.randomSplit([0.7, 0.3])

training2, testing2 = cleaned2.randomSplit([0.7, 0.3])

#fit
nb = NaiveBayes()
predictor = nb.fit(training)

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

test_results = predictor.transform(testing)

acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting reviews was: %f" % acc)

Accuracy of model at predicting reviews was: 0.874746


In [0]:
test_results.show(5)

+-----+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|class|                text|length|label|          token_text|         stop_tokens|          hash_token|           idf_token|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|  bad|...|   136|  1.0|[...|[...|(262144,[24698,28...|(262144,[24698,28...|(262145,[24698,28...|[-673.98598243308...|[3.76364080306958...|       1.0|
|  bad|...|    62|  1.0|[...|[...|(262144,[24417,34...|(262144,[24417,34...|(262145,[24417,34...|[-513.66999336772...|[4.29475814441052...|       1.0|


In [0]:
testpred = test_results.select("label", "prediction").toPandas()

In [0]:
from sklearn.metrics import confusion_matrix
matrix = confusion_matrix(testpred["label"], testpred["prediction"])
matrix

array([[395551,  54505],
       [ 17476,  68623]])

In [0]:
from google.colab import drive
drive.mount('/content/gdrive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/gdrive


In [0]:
model_save_name = 'classifier.json'
path = f"/content/gdrive/My Drive/{model_save_name}"
predictor.write().overwrite().save(path)

In [0]:
from pyspark.ml.classification import NaiveBayesModel

loaded = NaiveBayesModel.load(path)

In [0]:
training2, testing2 = cleaned.randomSplit([0.999, 0.001])
loadedres = loaded.transform(testing2)
acc_eval2 = MulticlassClassificationEvaluator()
acc2 = acc_eval.evaluate(loadedres)
print(f'Accuracy of loaded model is {acc2}')

In [0]:
#TODO = hyperparameter tuning with CV
spark.stop()