### 0. Setup imports

In [3]:
from pyspark import SparkConf, SparkContext
from pyspark.mllib.feature import HashingTF, IDF
from pyspark.ml.feature import Tokenizer
# import matplotlib.pyplot as plt
import re
import csv
import StringIO
import sys
from math import sqrt

from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import NaiveBayes

### 1. Data Cleanup and processing stage
#### Create some utility methods for data parsing and processing
##### loadRecord(line) parses one record line of the input dataset, runs basic sanity checks on it, and returns the record together with a flag saying whether it is usable
##### isANumber(str) checks whether a given string can be parsed as a number; it is used to validate the score field of each record
##### tokenizer(string) is a simple tokenizer that lower-cases the text, splits it on any run of non-word characters, and removes common stop words (via removeStopWords)

    

In [63]:
# Separator pattern for tokenizer(): any run of one or more non-word characters
# (in the `re` module's sense of \W), so punctuation and whitespace both split tokens.
split_regex = r'\W+'

def isANumber(str):
    """
    Return True if the given value can be parsed as a float, False otherwise.

    Used to validate the raw "score" field of a review record.

    :param str: value to test, normally a string read from the CSV. A value of
        None (e.g. a missing CSV column) is reported as not-a-number instead of
        letting float() raise TypeError.
    :return: bool
    """
    try:
        float(str)
    except (TypeError, ValueError):
        # ValueError: unparsable text such as "abc" or "";
        # TypeError: non-string/non-numeric input such as None.
        return False
    return True

def loadRecord(line):
    """
    Parse one CSV line of the reviews dataset and flag whether it is usable.

    The line is read with csv.DictReader using the fixed column order
    productId, userId, summary, text, score, so quoted fields that contain
    commas are handled correctly.

    A record is flagged good (1) only when:
      * the line yields a row at all (a blank line yields none),
      * none of the five columns is missing (DictReader fills missing
        trailing columns with None),
      * there are no extra, unnamed columns (DictReader collects those
        under the key None), and
      * the score column parses as a number (see isANumber).

    :param line: a single line of CSV text
    :return: tuple (rec, flag) where rec is the parsed dict (None for a blank
        line) and flag is 1 for a good record, 0 otherwise
    """
    fieldnames = ["productId", "userId", "summary", "text", "score"]
    reader = csv.DictReader([line], fieldnames=fieldnames)

    # next() with a default instead of reader.next(): a blank line produces no
    # row, and a StopIteration escaping this function could be mistaken for
    # end-of-data by the iterator calling it (e.g. inside a Spark map).
    rec = next(reader, None)
    if rec is None:
        return (rec, 0)

    # Too few columns: DictReader sets the missing ones to None.
    if any(rec.get(name) is None for name in fieldnames):
        return (rec, 0)

    # Too many columns: DictReader stores the surplus under the key None.
    if None in rec:
        return (rec, 0)

    return (rec, 1 if isANumber(rec["score"]) else 0)

# Default stop words dropped by removeStopWords(). Kept as a module-level
# frozenset so it is built once and each membership test is O(1).
STOP_WORDS = frozenset(['a', 'the', 'of', 'and', 's', 'this', 'is', 'it', 'i', 'to', 'my', 'on', 'you', 'for'])

def removeStopWords(tokenList, stopWords=None):
    """
    Remove stop words from a list of tokens, preserving the order of the rest.

    Matching is exact and case-sensitive; tokenizer() lower-cases text before
    calling this.

    :param tokenList: iterable of tokens
    :param stopWords: optional collection of words to drop; defaults to STOP_WORDS
    :return: list of the tokens that are not stop words
    """
    if stopWords is None:
        stopWords = STOP_WORDS
    else:
        stopWords = frozenset(stopWords)
    return [w for w in tokenList if w not in stopWords]
    
def tokenizer(string):
    """ Split text into lower-case word tokens and drop stop words.

    The text is lower-cased and split on every run of non-word characters
    (split_regex). Empty fragments left by leading or trailing separators
    are discarded, and the remaining tokens are passed through removeStopWords().

    Args:
        string (str): input text
    Returns:
        list: the remaining tokens, in their original order
    """
    pieces = re.split(split_regex, string.lower())
    nonEmpty = [piece for piece in pieces if piece]
    return removeStopWords(nonEmpty)



In [64]:
# Quick sanity check: punctuation splits tokens and the stop words "this", "is", "a" are dropped.
tokenizer("this is a simple-string") 

['simple', 'string']

#### Next we load the data from the external file and take a random sample of it (1% here; e.g. 10% for a larger run) to keep experiments fast.
#### We then parse the data with the utility functions defined above and filter out bad records.

In [65]:
# read the dataset and create an RDD excluding the header line
input = sc.textFile("amazonreviews/movies-reviews.csv")
header = input.first()
lineslarge = input.filter(lambda line: line != header)

# Create a smaller dataset from the large one for test and experimentation purposes.
# A fixed seed makes the sample -- and therefore every count, split and accuracy
# reported below -- reproducible on Restart & Run All.
SAMPLE_FRACTION = 0.01   # e.g. 0.1 for a 10% sample
SAMPLE_SEED = 0
lines = lineslarge.sample(False, SAMPLE_FRACTION, seed=SAMPLE_SEED)

# parse and load the records, separating good records from bad ones
#   (loadRecord returns (record, 1) for a good record and (record, 0) for a bad one)
parsedRecords = lines.map(lambda line: loadRecord(line.encode('utf-8'))).cache()
reviews = parsedRecords.filter(lambda s: s[1] == 1).map(lambda s: s[0]).cache()
badrecs = parsedRecords.filter(lambda s: s[1] == 0).map(lambda s: s[0])

### 2. Explore the input data

In [43]:
# Count good vs. bad records in the (sampled) dataset.
totalParsed = parsedRecords.count()
goodRecs = reviews.count()
badRecs = totalParsed - goodRecs
# Print each line as one formatted string: under Python 2, print ("label", value)
# prints a tuple repr such as ('label', 42) rather than "label 42".
print("Total records parsed (sampled dataset): %d" % totalParsed)
print("# of good records: %d" % goodRecs)
print("# of bad records: %d" % badRecs)


('Total Records in the dataset:', 78904)
('# of good records:', 78904)
('# of bad records:', 0)


### 3. Create the training and test datasets, and transform the data into input features using the bag-of-words TF-IDF model

#### We will now split the dataset into a training set and a test set. For each set we create an RDD of labels and an RDD of tokenized review texts (taken from the review summary).

In [59]:
# Scores at or above this value are labelled positive (1), lower scores negative (0).
# NOTE: 3-star (neutral) reviews are therefore counted as positive.
POSITIVE_SCORE_THRESHOLD = 3

def sentimentLabel(fields):
    """Map a parsed review record to a binary sentiment label (1 positive, 0 negative)."""
    return 1 if float(fields.get("score")) >= POSITIVE_SCORE_THRESHOLD else 0

def summaryTokens(fields):
    """Tokenize the review summary, which is the text used as model input."""
    return tokenizer(fields.get("summary"))

# split the dataset into training and test data (fixed seed for reproducibility)
trainingRDD, testRDD = reviews.randomSplit([8, 2], seed=0)

# Labels and token lists are each derived from the same parent RDD with map(),
# so they can later be zip()-ed element by element.
labelsTrainingRDD = trainingRDD.map(sentimentLabel)
reviewTextTokensTrainingRDD = trainingRDD.map(summaryTokens)

labelsTestRDD = testRDD.map(sentimentLabel)
reviewTextTokensTestRDD = testRDD.map(summaryTokens)

#### 3.a. Explore the parsed dataset

In [60]:
# Let's look at the tokens of the first few training review summaries
reviewTextTokensTrainingRDD.take(5)

[['completely', 'misses', 'point'],
 ['classic', 'with', 'some', 'cool', 'extras'],
 ['enjoyable', 'journey'],
 ['great', 'movie'],
 ['last', 'samurai']]

In [61]:
# How many distinct tokens (vocabulary size) occur in the training summaries?
# tokenCounts holds (token, occurrences) pairs and is reused below.
allTokens = reviewTextTokensTrainingRDD.flatMap(lambda tokens: tokens)
tokenCounts = allTokens.map(lambda tok: (tok, 1)).reduceByKey(lambda a, b: a + b)
print(tokenCounts.count())

20124


In [62]:
# What are the 30 most frequent tokens? takeOrdered keeps only the top N per
# partition instead of sorting the whole vocabulary first.
tokenCounts.takeOrdered(30, key=lambda kv: -kv[1])

[('movie', 7349),
 ('great', 5262),
 ('it', 3980),
 ('to', 3419),
 ('good', 3312),
 ('for', 3146),
 ('i', 3093),
 ('not', 2835),
 ('best', 2766),
 ('but', 2582),
 ('film', 2550),
 ('in', 2537),
 ('one', 2169),
 ('you', 2105),
 ('dvd', 2064),
 ('classic', 1606),
 ('t', 1563),
 ('on', 1503),
 ('love', 1471),
 ('with', 1428),
 ('all', 1352),
 ('my', 1334),
 ('very', 1294),
 ('as', 1259),
 ('ever', 1249),
 ('an', 1243),
 ('what', 1163),
 ('excellent', 1111),
 ('time', 1099),
 ('fun', 1090)]

In [74]:
# Let's look at the words appearing most in positive and in negative reviews.
# Pair every token with its review's label once, cache it, then split by label,
# instead of recomputing the zip/flatMapValues lineage for each sentiment.
labeledTokens = labelsTrainingRDD.zip(reviewTextTokensTrainingRDD).flatMapValues(lambda tokens: tokens).cache()
positiveTokens = labeledTokens.filter(lambda kv: kv[0] == 1)
negativeTokens = labeledTokens.filter(lambda kv: kv[0] == 0)
positiveTokenCounts = positiveTokens.map(lambda kv: (kv[1], 1)).reduceByKey(lambda x, y: x + y)
negativeTokenCounts = negativeTokens.map(lambda kv: (kv[1], 1)).reduceByKey(lambda x, y: x + y)

In [79]:
# Top 10 tokens per sentiment; takeOrdered avoids fully sorting each token-count RDD.
print("Top Positive tokens: %s" % positiveTokenCounts.takeOrdered(10, key=lambda kv: -kv[1]))
print("Top Negative tokens: %s" % negativeTokenCounts.takeOrdered(10, key=lambda kv: -kv[1]))

Positive tokens: [('movie', 6466), ('great', 5060), ('good', 3017), ('best', 2708), ('film', 2300), ('but', 2245), ('in', 2234), ('not', 1995), ('one', 1952), ('dvd', 1757)]
Negative tokens: [('movie', 883), ('not', 840), ('t', 439), ('bad', 402), ('but', 337), ('dvd', 307), ('in', 303), ('good', 295), ('what', 278), ('worst', 268)]


### 4. Create the input features, a weighted bag-of-words model using TF-IDF:
#### TF is computed per document (i.e. per input text) and gives greater weight to tokens that appear more often within that document. IDF is computed over the whole corpus and gives greater weight to tokens that appear in fewer documents. IDF therefore down-weights words that occur in many documents, so very common words (such as stop words) receive lower weights.

In [66]:
# Create TF-IDF model features on the training review text data.
# The term-frequency vectors are traversed twice (IDF fit, then IDF transform),
# so cache them to avoid re-tokenizing and re-hashing the summaries.
tfTrain = HashingTF().transform(reviewTextTokensTrainingRDD).cache()
idfTrain = IDF().fit(tfTrain)
tfidfTrain = idfTrain.transform(tfTrain)

# Combine the labels with the TF-IDF feature vectors (zip pairs them element by
# element, relying on both RDDs sharing the same partitioning).
training = labelsTrainingRDD.zip(tfidfTrain).map(lambda x: LabeledPoint(x[0], x[1]))

### 5. Train the Naive Bayes classifier, using the TF-IDF bag of words as the input features

In [67]:
# Train a (multinomial) Naive Bayes classifier on the TF-IDF feature vectors,
# with additive smoothing lambda_=1.0 -- the library default, written out explicitly.
model = NaiveBayes.train(training, lambda_=1.0)

### 6. Transform the test dataset into TF-IDF features, using the same hashing and the IDF model fitted on the training data, and make predictions on the test data

In [68]:
# Create TF-IDF features for the test review text. Term frequencies are hashed
# with a default-configured HashingTF, as for training, and weighted with the IDF
# model fitted on the *training* data. Fitting a new IDF on the test set would
# leak test-set statistics into the features.
tfTest = HashingTF().transform(reviewTextTokensTestRDD)
tfidfTest = idfTrain.transform(tfTest)

# Predict on the test set and pair each prediction with its actual label;
# elements are (actualLabel, predictedLabel). Cached because it is reused below.
predictedRDD = labelsTestRDD.zip(model.predict(tfidfTest)).cache()
sampleCount = predictedRDD.count()


### 7. Evaluate the model

In [70]:
# Accuracy: fraction of test examples whose predicted label equals the actual label.
# Guard against an empty test set (possible with a very small sample fraction).
correctCount = predictedRDD.filter(lambda x: x[0] == x[1]).count()
accuracy = float(correctCount) / sampleCount if sampleCount else float('nan')
print("Accuracy: %.4f, predicted on the test set of count: %d" % (accuracy, sampleCount))

('Accuracy:', 0.8752366527830368, ', Predicted on the test set of count:', 15846)


### 8. Explore the inaccuracies

In [89]:
# Attach each (actual, predicted) pair to the summary text it was predicted from,
# then keep only the mismatches. Elements are ((actual, predicted), summary).
summariesTestRDD = testRDD.map(lambda fields: fields.get("summary"))
incorrectPredictions = predictedRDD.zip(summariesTestRDD).filter(lambda pair: pair[0][0] != pair[0][1])

In [93]:
# How many test reviews were misclassified, and a sample of 30 of them.
print("Total incorrect predictions: %d" % incorrectPredictions.count())
incorrectPredictions.map(lambda pair: {"Actual sentiment": pair[0][0],
                                       "Predicted sentiment": pair[0][1],
                                       "Review text": pair[1]}).take(30)

Total incorrect predictions: 1977


[{'Actual sentiment': 1,
  'Predicted sentiment': 0.0,
  'Review text': 'Hilariously Eurocentric'},
 {'Actual sentiment': 0,
  'Predicted sentiment': 1.0,
  'Review text': 'Rent it (unless it\'s cheaper to buy it "previously viewed")'},
 {'Actual sentiment': 0,
  'Predicted sentiment': 1.0,
  'Review text': 'Hannibal the Cannibal meets Miami Vice'},
 {'Actual sentiment': 1,
  'Predicted sentiment': 0.0,
  'Review text': 'The Cardinal'},
 {'Actual sentiment': 0,
  'Predicted sentiment': 1.0,
  'Review text': 'Not a very good film in 1963, and now very dated.'},
 {'Actual sentiment': 0,
  'Predicted sentiment': 1.0,
  'Review text': 'typical Oliver Stone'},
 {'Actual sentiment': 0,
  'Predicted sentiment': 1.0,
  'Review text': "Even The Very Hot Elisha Can't Save This Lukewarm Flick"},
 {'Actual sentiment': 0,
  'Predicted sentiment': 1.0,
  'Review text': 'Soul Plane (R-Rated)'},
 {'Actual sentiment': 0,
  'Predicted sentiment': 1.0,
  'Review text': 'Over the top and over again'},
 {'