In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, explode
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from functools import reduce
from pyspark.sql.functions import sum as _sum
from math import log

In [2]:
spark = SparkSession.builder.appName("SentimentAnalysis").getOrCreate()

In [15]:
class IMDBMovieReviewData:
    def __init__(self, spark_session, data_path):
        self.spark_session = spark_session
        self.data_path = data_path
    
    def loadData(self):
        self.movie_reviews_df = self.spark_session.read.csv(self.data_path, header=True, inferSchema=True, quote='"', escape='"')
        
    def preprocessData(self):
#         ToDO
#         1. Convert words to lowercase 
#         2. Remove special characters 
#         3. Remove stop words 
#         3. Stemming
        pass
        
    def splitData(self, training_fraction = 0.7, testing_fraction = 0.3):
        self.training_data, self.testing_data = self.movie_reviews_df.randomSplit([training_fraction, testing_fraction])
        self.movie_reviews_df.unpersist()
        del self.movie_reviews_df

In [16]:
class MultiNomialNaiveBayseSentimentalAnalysis:
    def __init__(self, spark, data):
        self.spark = spark
        self.data = data
       
    def calPriorProbs(self):
        counts = self.data.training_data.groupBy("sentiment").count().rdd.collectAsMap()
        total = counts["positive"] + counts["negative"]
        return counts["positive"]/total, counts["negative"]/total
    
    @staticmethod
    def countWordsInAClass(reviews_data_frame, class_label):
        class_reviews = reviews_data_frame.filter(reviews_data_frame.sentiment == class_label)
        words_column = explode(split(class_reviews.review, "\s+")).alias("word")
        words_counts = class_reviews.select(words_column).groupBy("word").count()
        total_count = words_counts.agg(_sum("count")).collect()[0][0]
        words_counts = words_counts.rdd.collectAsMap()
        return {"total-count":total_count, "words-counts":words_counts}

    def train(self):
        self.pos_prior_prob, self.neg_prior_prob = self.calPriorProbs()
        pos_counts = MultiNomialNaiveBayseSentimentalAnalysis.countWordsInAClass(self.data.training_data, "positive")
        neg_counts = MultiNomialNaiveBayseSentimentalAnalysis.countWordsInAClass(self.data.training_data, "negative")
        self.pos_counts_bc = spark.sparkContext.broadcast(pos_counts)
        self.neg_counts_bc = spark.sparkContext.broadcast(neg_counts)
    
    @staticmethod
    def calLogProb(review, class_counts, class_prior_prob):
        words = review.split()
        probs_list = [class_counts["words-counts"].get(word, 1)/class_counts["total-count"] for word in words]
        return log(class_prior_prob) + reduce(lambda a, b: a + log(b), probs_list, 0)
    
    def calPosLogPro(self, review):
        return MultiNomialNaiveBayseSentimentalAnalysis.calLogProb(review, self.pos_counts_bc.value, self.pos_prior_prob)
    
    def calNegLogPro(self, review):
        return MultiNomialNaiveBayseSentimentalAnalysis.calLogProb(review, self.neg_counts_bc.value, self.neg_prior_prob)

    def predict(self, review):
        pos_log_prob = self.calPosLogPro(review)
        neg_log_prob = self.calNegLogPro(review)
        return "positive" if pos_log_prob > neg_log_prob else "negative"

In [44]:
data = IMDBMovieReviewData(spark, "IMDB Dataset.csv")
data.loadData()
data.preprocessData()

In [47]:
data.splitData()

In [48]:
naive_bayse = Wrapper(spark, data)

In [49]:
naive_bayse.train()

In [50]:
naive_bayse.calTrainAccuracy()

Traceback (most recent call last):
  File "C:\Users\CJ\anaconda3\lib\site-packages\pyspark\serializers.py", line 459, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
  File "C:\Users\CJ\anaconda3\lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "C:\Users\CJ\anaconda3\lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py", line 632, in dump
    return Pickler.dump(self, obj)
  File "C:\Users\CJ\anaconda3\lib\site-packages\pyspark\context.py", line 462, in __getnewargs__
    raise RuntimeError(
RuntimeError: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.


PicklingError: Could not serialize object: RuntimeError: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.