In [None]:
# Set up the Spark entry points used throughout the notebook.
from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *

sc = SparkContext.getOrCreate()
# The data-loading cell reads through `spark`, so create (or reuse) the session
# here instead of relying on the kernel having predefined it.
spark = SparkSession.builder.getOrCreate()
# SQLContext lives in pyspark.sql; the wildcard import from pyspark.sql.types
# does not bring it into scope.
sqlContext = SQLContext(sc)

In [None]:
from pyspark.ml.feature import NGram
from pyspark.ml.feature import StopWordsRemover
from pyspark.sql.functions import udf
from pyspark.ml.feature import HashingTF, IDF, Tokenizer


from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

from pyspark.ml import Pipeline, PipelineModel

In [None]:
# Load the review data from MongoDB via the Mongo Spark connector data source.
reviews = (
    spark.read
    .format("com.mongodb.spark.sql.DefaultSource")
    .option("uri", "mongodb://ec2-34-212-28-18.us-west-2.compute.amazonaws.com/msan697.review")
    .load()
)

In [None]:
# Preview the first 10 review records.
reviews.show(n=10)

In [None]:
# Distribution of ratings across businesses 

In [None]:
# Count reviews per star rating. The frame loaded from MongoDB is `reviews`;
# `review_data` is not defined anywhere in this notebook.
reviews.groupBy("stars").count().show()

In [None]:

def positiveNegative(stars):
    """Map a star rating to a sentiment class.

    Args:
        stars: Numeric star rating of a review.

    Returns:
        0 for a negative review (fewer than 3 stars), 1 for a positive review
        (more than 3 stars) and 2 for a neutral review (anything else).
    """
    if stars > 3:
        return 1  # positive review
    if stars < 3:
        return 0  # negative review
    return 2  # neutral review
    
# Wrap the labelling function as a Spark UDF. No returnType is passed, so the
# UDF yields a string column; a later cell casts `label` to double.
starsToSentiment = udf(lambda x: positiveNegative(x))

# Keep the review text plus its sentiment label and drop neutral (label 2) reviews.
# The frame loaded from MongoDB is `reviews`; `review_data` is not defined
# anywhere in this notebook.
trainTestRaw = (
    reviews
    .select('text', starsToSentiment('stars').alias('label'))
    .filter("label != 2")
)


In [None]:
from pyspark.sql.types import *

# Cast the label column to double. The column is taken from `trainTestRaw`
# itself; `train_test_DF_raw` is not defined anywhere in this notebook.
trainTestRaw = trainTestRaw.withColumn("label", trainTestRaw["label"].cast(DoubleType()))

# Confirm the resulting column types.
trainTestRaw.printSchema()

In [None]:
# Count rows per sentiment label. Grouping is done on `trainTestRaw`'s own
# column; `train_test_DF` is not defined anywhere in this notebook.
trainTestRaw.groupBy("label").count().show()

In [None]:
import re
import string

def removePunctuation(text):
    """Strip punctuation, digits and control whitespace, then normalise spacing.

    Every character in ``string.punctuation`` (which includes the hyphen), the
    digits 0-9 and CR/TAB/LF are each replaced by a space. The remaining words
    are then re-joined with exactly one space between them and no leading or
    trailing whitespace. Letter case is left untouched.

    Args:
        text: Raw review text, or None when the review has no text.

    Returns:
        The cleaned text. None input yields "" instead of raising
        AttributeError, so a single null row does not fail the UDF.
    """
    if text is None:
        return ""

    regex = re.compile('[' + re.escape(string.punctuation) + '0-9\\r\\t\\n]')
    # split() with no arguments already discards empty tokens and surrounding
    # whitespace, so no per-word strip() is needed before joining.
    return ' '.join(regex.sub(" ", text).split())

# Expose the text-cleaning function as a Spark UDF.
udfNumPunct = udf(lambda x: removePunctuation(x))

# Clean the text of the labelled frame. `trainTestRaw` is the frame built by
# the labelling cells; `train_test_DF` is not defined anywhere in this notebook.
review_rmsw = trainTestRaw.select(udfNumPunct('text').alias('text'), 'label')

# Show one cleaned row in full (no column truncation).
review_rmsw.show(1, truncate=False)

In [None]:
# Size of the hashed feature space. It was referenced but never assigned in
# this notebook; 262144 (2**18) is HashingTF's own default. Tune as needed.
n_features = 262144

# Unigram feature stages: text -> words -> filtered -> rawFeatures -> features.
tokenizer = Tokenizer().setInputCol("text").setOutputCol("words")
remover = StopWordsRemover().setInputCol("words").setOutputCol("filtered").setCaseSensitive(False)
hashingTF = HashingTF().setNumFeatures(n_features).setInputCol("filtered").setOutputCol("rawFeatures")
idf = IDF().setInputCol("rawFeatures").setOutputCol("features").setMinDocFreq(0)

In [None]:
# 80/20 train/test split. The fixed seed makes the split repeatable for the
# same input data and partitioning, so reported metrics can be compared
# between runs.
train_set, test_set = review_rmsw.randomSplit([0.8, 0.2], seed=42)
# Both splits are reused by several pipelines, so keep them cached.
train_set = train_set.cache()
test_set = test_set.cache()

In [None]:
# compute accuracy on the test set 
def evaluateMetric(predictions):
    """Print area under ROC, F1 score and accuracy for a predictions DataFrame.

    Args:
        predictions: DataFrame to score. The F1 and accuracy evaluators read
            its ``label`` and ``prediction`` columns; the ROC evaluator is
            used with its default column names.

    Returns:
        None. The three metrics are printed.
    """
    roc_evaluator = BinaryClassificationEvaluator().setMetricName("areaUnderROC")
    # A single %-formatted argument prints identically under Python 2 and 3;
    # the former `print "...", value` statement is a syntax error on Python 3.
    print("Area under ROC curve: %s" % roc_evaluator.evaluate(predictions))

    f1_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                                     metricName="f1")
    f1 = f1_evaluator.evaluate(predictions)
    print("F1_score = %0.4f" % f1)

    accuracy_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                                           metricName="accuracy")
    accuracy = accuracy_evaluator.evaluate(predictions)
    print("Accuracy = %0.4f" % accuracy)

# Logistic Regression

In [None]:
%%time

# Fit the TF-IDF + logistic regression pipeline on the training split.
lr = LogisticRegression(
    maxIter=100,
    regParam=0.01,
    elasticNetParam=0.8,
)
pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf, lr])
logreg_model = pipeline.fit(train_set)

# Score the held-out split and print its evaluation metrics.
lr_predictions = logreg_model.transform(test_set)
evaluateMetric(lr_predictions)

In [None]:
# Inspect the logistic-regression predictions. `predictions` is not defined
# anywhere in this notebook; the frame produced by the cell above is
# `lr_predictions`.
lr_predictions.show(10)

# Unigram Naive Bayes

In [None]:
%%time

# Fit the TF-IDF + multinomial Naive Bayes pipeline on the training split.
nb = NaiveBayes(
    smoothing=1.0,
    modelType="multinomial",
)
pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf, nb])
nb_model = pipeline.fit(train_set)

# Score the held-out split and print its evaluation metrics.
nb_predictions = nb_model.transform(test_set)
evaluateMetric(nb_predictions)

# Bigram Naive Bayes 

In [None]:
# Bigram feature stages. The `tokenizer` and `remover` stages defined earlier
# are reused by the bigram pipeline, so they are not rebuilt here.
bigram = NGram().setN(2).setInputCol("filtered").setOutputCol("bigrams")
hashingTF_bigram = HashingTF(numFeatures=n_features, inputCol="bigrams", outputCol="rawFeatures")
idf_bigram = IDF(minDocFreq=0, inputCol="rawFeatures", outputCol="features")

In [None]:
%%time

# Fit the bigram TF-IDF + multinomial Naive Bayes pipeline on the training split.
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
pipeline = Pipeline(stages=[tokenizer, remover, bigram, hashingTF_bigram, idf_bigram, nb])
nb_model_bigram = pipeline.fit(train_set)

nb_bigram_predictions = nb_model_bigram.transform(test_set)

# Print evaluation metrics. The helper defined in this notebook is
# `evaluateMetric`; `evaluate_metric` does not exist and raised NameError.
evaluateMetric(nb_bigram_predictions)

In [None]:
# Inspect the first 10 bigram Naive Bayes predictions.
nb_bigram_predictions.show(n=10)