In [1]:
import ast
import gzip

import pandas as pd

In [2]:
# Dataset file names (directory not included) used in this analysis
data = [
    "reviews_Amazon_Instant_Video_5.json.gz",
    "reviews_Digital_Music_5.json.gz",
]

# functions to read Amazon data into a pandas data frame
def parse(path):
    g = gzip.open(path, 'rb')
    for l in g:
        yield eval(l)

def getDF(path):
    """Load a gzipped Amazon reviews file into a pandas DataFrame.

    Each record produced by ``parse`` becomes one row. Rows are labelled
    0, 1, 2, ... in the order they appear in the file.
    """
    rows_by_position = dict(enumerate(parse(path)))
    return pd.DataFrame.from_dict(rows_by_position, orient='index')

# function to concatenate multiple Amazon datasets
def concatDF(data):
    df = pd.DataFrame()
    for dataset in data:
        dftemp = getDF(dataset)
        df = pd.concat([df, dftemp], axis=0)
    # drop unneeded columns
    df.drop(columns = ['reviewerID', 'asin', 'reviewerName', 'helpful', \
                       'summary', 'unixReviewTime', 'reviewTime'], inplace= True)
    return df

In [3]:
# Full paths to the gzipped datasets. They are built from the `data` file-name
# list, so each dataset name is written only once in the notebook.
DATA_DIR = 'Resources/data/'
zipped_data = [DATA_DIR + file_name for file_name in data]

In [4]:
# Load every dataset into a single pandas frame with the unneeded columns dropped
clean_reviews_df = concatDF(data=zipped_data)

In [5]:
# Preview the first five rows of the combined reviews
clean_reviews_df.head(n=5)

Unnamed: 0,reviewText,overall
0,I had big expectations because I love English ...,2.0
1,I highly recommend this series. It is a must f...,5.0
2,This one is a real snoozer. Don't believe anyt...,1.0
3,Mysteries are interesting. The tension betwee...,4.0
4,"This show always is excellent, as far as briti...",5.0


In [6]:
from pyspark.sql import SparkSession

# Create (or reuse, if one is already running) the Spark session for this notebook
spark = (
    SparkSession.builder
    .appName("Sentiment_Analysis")
    .getOrCreate()
)

# Arrow-based columnar transfer for pandas <-> Spark conversion. It needs
# PyArrow on the driver; when PyArrow is missing, Spark prints a warning and
# falls back to the non-Arrow path (as seen in the recorded output of the next cell).
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

In [7]:
# Hand the pandas frame to Spark, then preview the resulting Spark DataFrame
reviews_spark_df = spark.createDataFrame(data=clean_reviews_df)
reviews_spark_df.show(n=20)

  PyArrow >= 1.0.0 must be installed; however, it was not found.
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.


+--------------------+-------+
|          reviewText|overall|
+--------------------+-------+
|I had big expecta...|    2.0|
|I highly recommen...|    5.0|
|This one is a rea...|    1.0|
|Mysteries are int...|    4.0|
|This show always ...|    5.0|
|I discovered this...|    5.0|
|It beats watching...|    3.0|
|There are many ep...|    3.0|
|This is the best ...|    5.0|
|Not bad.  Didn't ...|    3.0|
|Funny, interestin...|    4.0|
|I love the variet...|    4.0|
|comedy is a matte...|    3.0|
|if this had to do...|    3.0|
|Watched it for Ke...|    5.0|
|he's OK. His humo...|    2.0|
|some comedians ar...|    3.0|
|I only watched th...|    3.0|
|Enjoyed some of t...|    5.0|
|All the comedians...|    5.0|
+--------------------+-------+
only showing top 20 rows



In [8]:
from pyspark.sql.functions import length

# Character count of each review text, assembled into the feature vector later on
reviews_spark_df = reviews_spark_df.withColumn(
    'length',
    length('reviewText'),
)
reviews_spark_df.show()

+--------------------+-------+------+
|          reviewText|overall|length|
+--------------------+-------+------+
|I had big expecta...|    2.0|   159|
|I highly recommen...|    5.0|   186|
|This one is a rea...|    1.0|   134|
|Mysteries are int...|    4.0|   141|
|This show always ...|    5.0|  1300|
|I discovered this...|    5.0|  3039|
|It beats watching...|    3.0|    99|
|There are many ep...|    3.0|   348|
|This is the best ...|    5.0|   186|
|Not bad.  Didn't ...|    3.0|   127|
|Funny, interestin...|    4.0|   111|
|I love the variet...|    4.0|   221|
|comedy is a matte...|    3.0|   112|
|if this had to do...|    3.0|   112|
|Watched it for Ke...|    5.0|   144|
|he's OK. His humo...|    2.0|   132|
|some comedians ar...|    3.0|   250|
|I only watched th...|    3.0|   740|
|Enjoyed some of t...|    5.0|   102|
|All the comedians...|    5.0|   301|
+--------------------+-------+------+
only showing top 20 rows



In [9]:
from pyspark.ml.feature import HashingTF, IDF, StopWordsRemover, Tokenizer

# Text-processing stages, applied in this order by the pipeline below:
#   reviewText -> words (token_text) -> words minus stop words (stop_tokens)
#   -> hashed term-frequency vector (hash_token) -> IDF-weighted vector (idf_token)
# NOTE(review): IDF is fit on the full dataset before the train/test split,
# so test rows influence the document frequencies (mild leakage).
tokenizer = Tokenizer(inputCol='reviewText', outputCol='token_text')
stopremove = StopWordsRemover(inputCol='token_text', outputCol='stop_tokens')
hashingTF = HashingTF(inputCol='stop_tokens', outputCol='hash_token')
idf = IDF(inputCol='hash_token', outputCol='idf_token')

In [10]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# Append the review length to the TF-IDF vector, producing a single 'features' column
vector_clean_up = VectorAssembler(
    inputCols=['idf_token', 'length'],
    outputCol='features',
)

In [11]:
from pyspark.ml import Pipeline

# Chain all preprocessing stages so they are fit and applied together
df_pipeline = Pipeline(stages=[
    tokenizer,
    stopremove,
    hashingTF,
    idf,
    vector_clean_up,
])

In [12]:
# Fit the preprocessing pipeline on all reviews, apply it, and drop the raw
# text and raw term-frequency columns.
# NOTE(review): fitting here, before randomSplit, lets IDF see the test rows.
data_cleaner = df_pipeline.fit(reviews_spark_df)
final_tokenized_reviews_df = (
    data_cleaner
    .transform(reviews_spark_df)
    .select("token_text", "length", "stop_tokens", "idf_token", "features", "overall")
)

In [13]:
# Inspect each intermediate text-processing column next to the rating
final_tokenized_reviews_df.show(n=20, truncate=True)

+--------------------+------+--------------------+--------------------+--------------------+-------+
|          token_text|length|         stop_tokens|           idf_token|            features|overall|
+--------------------+------+--------------------+--------------------+--------------------+-------+
|[i, had, big, exp...|   159|[big, expectation...|(262144,[8287,310...|(262145,[8287,310...|    2.0|
|[i, highly, recom...|   186|[highly, recommen...|(262144,[11472,19...|(262145,[11472,19...|    5.0|
|[this, one, is, a...|   134|[one, real, snooz...|(262144,[21823,33...|(262145,[21823,33...|    1.0|
|[mysteries, are, ...|   141|[mysteries, inter...|(262144,[23340,51...|(262145,[23340,51...|    4.0|
|[this, show, alwa...|  1300|[show, always, ex...|(262144,[2325,410...|(262145,[2325,410...|    5.0|
|[i, discovered, t...|  3039|[discovered, seri...|(262144,[1354,297...|(262145,[1354,297...|    5.0|
|[it, beats, watch...|    99|[beats, watching,...|(262144,[63139,73...|(262145,[63139,73...

In [14]:
# Keep only the model inputs. The rating is renamed to 'label' because the
# classifier and evaluator cells below pass no column names.
input_data = (
    final_tokenized_reviews_df
    .select('features', 'overall')
    .withColumnRenamed('overall', 'label')
)
input_data.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(262145,[8287,310...|  2.0|
|(262145,[11472,19...|  5.0|
|(262145,[21823,33...|  1.0|
|(262145,[23340,51...|  4.0|
|(262145,[2325,410...|  5.0|
|(262145,[1354,297...|  5.0|
|(262145,[63139,73...|  3.0|
|(262145,[33123,51...|  3.0|
|(262145,[21823,31...|  5.0|
|(262145,[54961,55...|  3.0|
|(262145,[2701,171...|  4.0|
|(262145,[8538,271...|  4.0|
|(262145,[63449,73...|  3.0|
|(262145,[9747,273...|  3.0|
|(262145,[24657,45...|  5.0|
|(262145,[40081,55...|  2.0|
|(262145,[19736,43...|  3.0|
|(262145,[8538,197...|  3.0|
|(262145,[77365,97...|  5.0|
|(262145,[10723,12...|  5.0|
+--------------------+-----+
only showing top 20 rows



In [15]:
from pyspark.ml.classification import NaiveBayes

# Fixed seed so the 70/30 split, and therefore the reported metric, is
# repeatable across kernel restarts (given the same input partitioning).
RANDOM_SEED = 42
# Break data down into a training set and a testing set
train, test = input_data.randomSplit([0.7, 0.3], seed=RANDOM_SEED)

In [16]:
# Train a Naive Bayes classifier on the 'features' / 'label' columns of the training split.
# NOTE(review): the saved run failed here with "SparkContext was shut down";
# possibly a driver/worker resource problem. TODO investigate before trusting results.
nb = NaiveBayes(featuresCol='features', labelCol='label')
predictor = nb.fit(train)

Py4JJavaError: An error occurred while calling o217.fit.
: org.apache.spark.SparkException: Job 5 cancelled because SparkContext was shut down
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1(DAGScheduler.scala:1085)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1$adapted(DAGScheduler.scala:1083)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
	at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:1083)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:2463)
	at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
	at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2369)
	at org.apache.spark.SparkContext.$anonfun$stop$12(SparkContext.scala:2069)
	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1419)
	at org.apache.spark.SparkContext.stop(SparkContext.scala:2069)
	at org.apache.spark.SparkContext$$anon$3.run(SparkContext.scala:2018)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2261)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:390)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3696)
	at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2965)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
	at org.apache.spark.sql.Dataset.collect(Dataset.scala:2965)
	at org.apache.spark.ml.classification.NaiveBayes.trainDiscreteImpl(NaiveBayes.scala:193)
	at org.apache.spark.ml.classification.NaiveBayes.$anonfun$trainWithLabelCheck$1(NaiveBayes.scala:160)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.classification.NaiveBayes.trainWithLabelCheck(NaiveBayes.scala:144)
	at org.apache.spark.ml.classification.NaiveBayes.train(NaiveBayes.scala:133)
	at org.apache.spark.ml.classification.NaiveBayes.train(NaiveBayes.scala:95)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:151)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:115)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 53797)
Traceback (most recent call last):
  File "C:\Users\skim3\anaconda3\lib\socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "C:\Users\skim3\anaconda3\lib\socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "C:\Users\skim3\anaconda3\lib\socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "C:\Users\skim3\anaconda3\lib\socketserver.py", line 720, in __init__
    self.handle()
  File "C:\Users\skim3\anaconda3\lib\site-packages\pyspark\accumulators.py", line 262, in handle
    poll(accum_updates)
  File "C:\Users\skim3\anaconda3\lib\site-packages\pyspark\accumulators.py", line 235, in poll
    if func():
  File "C:\Users\skim3\anaconda3\lib\site-packages\pyspark\accumulators.py", line 239, in accum_updates
  

In [None]:
# Score the held-out rows with the trained model and preview the output
test_results = predictor.transform(test)
test_results.show(n=20)

In [None]:
# Evaluate test-set accuracy with Spark's multiclass evaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# The evaluator defaults to the F1 metric. Request accuracy explicitly so the
# value printed below really is accuracy.
acc_eval = MulticlassClassificationEvaluator(metricName='accuracy')
acc = acc_eval.evaluate(test_results)
print("Accuracy of model: %f" % acc)