# Movie Review Analysis Outline
## import necessary python libraries


In [1]:


# "transformData.py" is this script which is intended to be run once on our data
# transform the raw data into a rdd readable format first


# import all necessary libraries
import re
import string
from operator import add
import os
import sys
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.regression import LabeledPoint

import pyspark.mllib.regression as mllib_reg
import pyspark.mllib.linalg as mllib_lalg
import pyspark.mllib.classification as mllib_class
import pyspark.mllib.tree as mllib_tree



print (pyspark) # test to see that pyspark is up and running okay



<module 'pyspark' from '/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/__init__.py'>


# CLEAN DATA

In [2]:
# get rid of all html tags in the data
def strip_html_tags(data):
    p = re.compile(r'<.*?>')
    return p.sub('', data.lower())

#parse and take care of funky symbols in summary and text <- normalize to root words maybe?
def cleanData (data):
    data = strip_html_tags(data)
    data =  re.sub("[\t\,\:;\(\)\"\'\~\-\!\?\`]", "",data, 0, 0)
    data =  re.sub("[\.0`]", "",data, 0, 0) # special case to get rid of ".0" of scores -- 
    return data

#re.sub("[\.\t\,\:;\(\)\.]", " ", strip_html_tags(data.lower()), 0, 0)

# clean the data for each element of each sub list
def prepData (list_str):
    L= []
    for x in list_str:
        #print x
        L.append(cleanData(x).strip())
    return L
#####
# Notes 
# headers for the project
# [u'productId', u'userId', u'profileName', u'helpfulness', u'score', u'time', u'summary', u'text']



# PARSE THE FILE WE WANT TO USE AS OUR DATA

In [3]:
# transform Data uses /// as a separator for each eleent of data for each movie review
# the original file for the current output.txt is a truncated version of all our data, so the end elements will be funky

movies_txt2 = sc.textFile("100k_parsed.txt").map(lambda x: (cleanData(x).split('///')))
#movies_txt2 = sc.textFile("smaller_parsed.txt").map(lambda x: (cleanData(x).split('///')))


## Parsing out relevant fields - score and text of review

In [4]:
# prep our data

#####
# Notes 
# headers for the project
# [u'productId', u'userId', u'profileName', u'helpfulness', u'score', u'time', u'summary', u'text']
# [u'productId', _ , _ , _ , u'score', _  u'summary', u'text']

# take the data that we care most about
movies_new = movies_txt2.map(lambda L: (L[0], L[4], L[6], L[7]) if len(L) == 8 else L )
movies_new1 = movies_new.filter(lambda L: len(L) == 4) # lets just take all the data that has been parsed correctly

#print (movies_new.top(20))
print (movies_new1.count(), movies_new.count())
print (movies_new.count() == movies_new1.count())

removeHTMLTags = movies_new1.map(prepData) # remove html tags

(99991, 100000)
False


## REVISED STOP WORDS REMOVAL 

In [6]:
#REVISED STOP WORDS REMOVAL Approach

#used the stopwords file in the virtual box instead... taking too long
baseDir = os.path.join('../data')
inputPath = os.path.join('cs100', 'lab3')
STOPWORDS_PATH = 'stopwords.txt'
split_regex = r'\W+'

stopfile = os.path.join(baseDir, inputPath, STOPWORDS_PATH)
stopwords = set(sc.textFile(stopfile).collect())


def tokenize(string):
    #reusing assignment code
    """ An implementation of input string tokenization to exclude stopwords
    Args:
        string (str): input string
    Returns:
        list: a list of tokens without stopwords
    """
    splitList = re.split(split_regex, string.lower().strip())
    
    return [x for x in splitList if x!="" and x not in stopwords]  



removeSW_output= removeHTMLTags.map(lambda x: (  int((str(x[1])).replace("/","")), tokenize(x[3]))   )

## Vectorizing - making the vector of features...

In [7]:
# We will use the spark ml library for this (took a bit long to figure out)
## key notes, use mllib if you are using rdd, ditch ml if you are not using data frams

# HashingTF calulates the tfid feature for us
htf = HashingTF(numFeatures=1) # features need to match dimensions of output

print "just testing the hashing TF transformation on the array: ['hi', 'boo', 'hi']"
print (htf.transform(["hi", "boo", "hi"]));
feature_vector = removeSW_output.map(lambda elements: LabeledPoint(1.0, [htf.transform(elements[1])]) if float(elements[0]) >= 2.5 else LabeledPoint(0.0, [htf.transform(elements[1])]))

#feature_vector = removeSW3.map(lambda elements: LabeledPoint(1, [2]))
print feature_vector.filter(lambda x: x.label == 1.0).count()

just testing the hashing TF transformation on the array: ['hi', 'boo', 'hi']
(1,[0],[3.0])
85876


In [None]:
# EXPLORE DIFFERENT ML APPROACHS
# https://spark.apache.org/docs/1.1.0/api/python/pyspark.mllib.classification-module.html

##  Trying SVM approach

In [None]:
from pyspark.mllib.classification import SVMWithSGD, SVMModel, LogisticRegressionWithLBFGS 
from pyspark.mllib.regression import LabeledPoint


'''  REFRESH BLOCK
REFRESHING DATA
Re Run data -> refresh in case we are using another approach above this as well
Things to do - update number of features
'''
## Took forever to figure out
## key notes, use mllib if you are using rdd, ditch ml if you are not using data frams
htf = HashingTF(numFeatures=1) # features need to match dimensions of output
print ("SVM: REFRESHING HTF, and FEATURE VECTOR")
feature_vector = removeSW_output.map(lambda elements: LabeledPoint(1.0, [htf.transform(elements[1])]) if float(elements[0]) >= 2.5 else LabeledPoint(0.0, [htf.transform(elements[1])]))
'''
REFRESHED DATA
'''


svm_training_data, svm_testing_data= feature_vector.randomSplit([0.7, 0.3])


#LogisticRegressionWithLBFGS 
# Build the Support Vector Machine model
svm_model = SVMWithSGD.train(svm_training_data, iterations=500) # lower iterations so we arent here forever

# Evaluate the model on training data
svm_labelsAndPreds = svm_training_data.map(lambda p: (p.label, svm_model.predict(p.features)))
svm_trainErr = 100* svm_labelsAndPreds.filter(lambda (v, p): v != p).count() / float(svm_training_data.count())
print("training error of the SVM model: " + "%.3f" %(svm_trainErr)) + "%" 
print("training accuracy of the SVM model: " + "%.3f" %(100-svm_trainErr)) + "%" 


#Evaluate on testing data.. the correct way lolz

svm_labelsAndPredsTest = svm_testing_data.map(lambda p: (p.label, svm_model.predict(p.features)))
svm_testErr = 100* svm_labelsAndPredsTest.filter(lambda (v, p): v != p).count() / float(svm_testing_data.count())
print("testing error of the SVM model: " + "%.3f" %(svm_testErr)) + "%" 
print("training accuracy of the SVM model: " + "%.3f" %(100-svm_testErr)) + "%" 



In [None]:
from pyspark.mllib.classification import SVMWithSGD, SVMModel, LogisticRegressionWithLBFGS 
from pyspark.mllib.regression import LabeledPoint


'''  REFRESH BLOCK
REFRESHING DATA
Re Run data -> refresh in case we are using another approach above this as well
Things to do - test/update number of features
'''
htf = HashingTF(numFeatures=1) # features need to match dimensions of output
print ("LogisticRegressionWithLBFGS: REFRESHING HTF, and FEATURE VECTOR")
feature_vector = removeSW_output.map(lambda elements: LabeledPoint(1.0, [htf.transform(elements[1])]) if float(elements[0]) >= 2.5 else LabeledPoint(0.0, [htf.transform(elements[1])]))
'''
REFRESHED DATA
'''

log_training_data, log_testing_data= feature_vector.randomSplit([0.7, 0.3])

#LogisticRegressionWithLBFGS 
# Build the Support Vector Machine model
log_model = LogisticRegressionWithLBFGS.train(log_training_data, iterations=500) # lower iterations so we arent here forever

# Evaluate the model on training data
log_labelsAndPreds = log_training_data.map(lambda p: (p.label, log_model.predict(p.features)))
log_trainErr = 100* log_labelsAndPreds.filter(lambda (v, p): v != p).count() / float(log_training_data.count())
print "Log Training Accuracy: " + "%.3f" %(100-log_trainErr) + "%" 
print("training error of the LogisticRegressionWithLBFGS model= " + "%.3f" % (log_trainErr)) + "%" 

#Evaluate on testing data.. the correct way lolz

log_labelsAndPredsTest = log_testing_data.map(lambda p: (p.label, log_model.predict(p.features)))
log_testErr = 100* log_labelsAndPredsTest.filter(lambda (v, p): v != p).count() / float(log_testing_data.count())

print "Log Testing Accuracy: " + "%.3f" %(100-log_testErr) + "%" 
print("testing error of the LogisticRegressionWithLBFGS model: " + "%.3f" %(log_testErr)) + "%" 


## Do some Predictions - Bayes

In [None]:
### THIS USES NAIVE BAYES CLASSIFIER
### with 116 elements with the first output.txt 86KB , gets 80-93%
# tested with lots of data.... - 111MB

'''  REFRESH BLOCK
REFRESHING DATA
Re Run data -> refresh in case we are using another approach above this as well
Things to do - update number of features
'''
## Took forever to figure out
## key notes, use mllib if you are using rdd, ditch ml if you are not using data frams
htf = HashingTF(numFeatures=1) # features need to match dimensions of output
print ("BAYES: REFRESHING HTF, and FEATURE VECTOR")
feature_vector = removeSW_output.map(lambda elements: LabeledPoint(1.0, [htf.transform(elements[1])]) if float(elements[0]) >= 2.5 else LabeledPoint(0.0, [htf.transform(elements[1])]))
'''
REFRESHED DATA
'''

# would be nice to figure out which division is the best
#training_data, testing_data= feature_vector.randomSplit([0.6, 0.4])
bayes_training_data, bayes_testing_data= feature_vector.randomSplit([0.7, 0.3])

# parameters:
lamda = 1.0

# build Naive Bayes Classifier
nbay = mllib_class.NaiveBayes.train(bayes_training_data, lamda)

# Make prediction and test accuracy.




bayes_labelsAndPreds = bayes_training_data.map(lambda p : (nbay.predict(p.features), p.label))
#bayes_trainErr = bayes_labelsAndPreds.filter(lambda (v, p): v != p).count() / float(bayes_training_data.count())
bayes_trainErr = 100.0 * bayes_labelsAndPreds.filter(lambda (x, v): x == v).count() / bayes_training_data.count()

print "Naive Bayes Training Accuracy: " + "%.3f" %(bayes_trainErr) + "%" 
print "Bayes Training error: " + "%.3f" % (100-bayes_trainErr) + "%" 


# Testing Data Accuracy

bayes_labelsAndPreds = bayes_testing_data.map(lambda p : (nbay.predict(p.features), p.label))
#bayes_testErr = bayes_labelsAndPreds.filter(lambda (v, p): v != p).count() / float(bayes_testing_data.count())
bayes_testErr = 100.0 * bayes_labelsAndPreds.filter(lambda (x, v): x == v).count() / bayes_testing_data.count()


print "Naive Bayes Testing Accuracy: " + "%.3f" %(bayes_testErr) + "%" 
print "Bayes Testing error: " + "%.3f" % (100-bayes_testErr) + "%" 