In [1]:
# NOTE(review): findspark.init() presumably locates the local Spark installation
# and makes `pyspark` importable for the cells below — confirm against the
# findspark documentation.
import findspark
findspark.init()

In [2]:
import pyspark
from pyspark import SQLContext
# Build a default Spark configuration and obtain the SparkContext / SQLContext
# that every later cell uses as `sc` and `sqlContext`.
conf = pyspark.SparkConf()
sc = pyspark.SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sc)
# from pyspark import SparkContext as sc, SparkConf, SQLContext as sqlContext
import sys
# Add the local 'Classifier' directory to the module search path (at index 1)
# so that the two imports below can be resolved.
sys.path.insert(1, 'Classifier')
import Classifier
# NOTE: a class named YelpClassifier is also defined further down in this
# notebook; once that cell runs it replaces the name imported here.
from Classifier import YelpClassifier

In [3]:
def input_line_to_review(input_line):
    """Convert one input record into a labelled review, for use with ``flatMap``.

    Args:
        input_line: Indexable record whose positions 0, 1 and 2 hold the
            review id, the star rating and the review text.

    Returns:
        ``[(review_id, num_stars, review_text)]`` for a well-formed record, or
        ``[]`` when the record is ``None`` or has fewer than three fields, so
        that ``flatMap`` drops it instead of failing the whole job.
    """
    # some reviews may be malformed/not contain all of the fields
    if input_line is None or len(input_line) < 3:
        return []
    review_id, num_stars, review_text = input_line[0], input_line[1], input_line[2]
    return [(review_id, num_stars, review_text)]

def input_line_to_review_no_label(input_line):
    """Convert one input record into an unlabelled review, for use with ``flatMap``.

    The star rating at position 1 is dropped on purpose so the result can be
    fed to the classifier without leaking the true label.

    Args:
        input_line: Indexable record whose positions 0 and 2 hold the review
            id and the review text (position 1 holds the star rating).

    Returns:
        ``[(review_id, review_text)]`` for a well-formed record, or ``[]``
        when the record is ``None`` or has fewer than three fields, so that
        ``flatMap`` drops it instead of failing the whole job.
    """
    # some reviews may be malformed/not contain all of the fields
    if input_line is None or len(input_line) < 3:
        return []
    review_id, review_text = input_line[0], input_line[2]
    return [(review_id, review_text)]

def get_review_stars(actual_rdd):
    """Reduce labelled reviews to (review_id, num_stars) pairs ordered by key.

    Args:
        actual_rdd: RDD of (review_id, num_stars, review_text) tuples.

    Returns:
        RDD of (review_id, true_num_stars) pairs, sorted by review_id.
    """
    # (review_id, num_stars, review_text) --> (review_id, true_num_stars)
    id_and_stars = actual_rdd.flatMap(filter_only_review_id_and_stars)
    # Order by review_id so the result lines up with other id-sorted output.
    return id_and_stars.sortByKey()

def filter_only_review_id_and_stars(review):
    """Strip the text from a review, keeping only its id and star rating.

    Args:
        review: A (review_id, num_stars, review_text) triple.

    Returns:
        A one-element list ``[(review_id, num_stars)]`` (list-wrapped so the
        function can be used with ``flatMap``).
    """
    rid, stars, _text = review
    id_star_pair = (rid, stars)
    return [id_star_pair]


In [4]:
def initRDDs(training_data_file, test_data_file):
    """Derive the train, test and ground-truth RDDs from two review RDDs.

    Args:
        training_data_file: RDD of (review_id, num_stars, review_text) records
            used for training.
        test_data_file: RDD of (review_id, num_stars, review_text) records
            used for evaluation.

    Returns:
        ``(train_rdd, test_rdd, actual_rdd)`` where ``test_rdd`` has the star
        rating removed and ``actual_rdd`` keeps it as ground truth. Returns
        ``None`` (after writing a message to stderr) if the RDDs cannot be
        set up.
    """
    try:
        train_rdd = training_data_file.flatMap(input_line_to_review)
        test_rdd = test_data_file.flatMap(input_line_to_review_no_label)
        actual_rdd = test_data_file.flatMap(input_line_to_review)
    except Exception:
        # sys.stderr.write works on both Python 2 and 3; the old
        # `print >> sys.stderr` form raises TypeError on Python 3 and would
        # hide the original failure. `except Exception` (instead of a bare
        # except) lets KeyboardInterrupt / SystemExit propagate.
        sys.stderr.write("Unable to load train and test data files\n")
        return None
    return train_rdd, test_rdd, actual_rdd


In [5]:
import os

# Location of the Yelp review dump. Set the YELP_REVIEW_JSON environment
# variable to point at your own copy; the fallback is the path this notebook
# was originally run with.
REVIEW_JSON_PATH = os.environ.get(
    "YELP_REVIEW_JSON",
    "/Users/rileycampbell/Desktop/GitHub/yelp-star-prediction/yelp_dataset/review.json"
)

reviews = sqlContext.read.json(REVIEW_JSON_PATH)
# Keep only the columns the classifier needs, in (id, label, text) order;
# later cells index rows positionally and rely on this order.
review_rows = reviews.select("review_id", "stars", "text")

In [79]:

# Fixed seed so that the same reviews land in the train and test samples on
# every run; without it each execution draws a different split and the
# reported accuracy is not reproducible.
SPLIT_SEED = 42

# Sample roughly 1% of the reviews for training and 0.1% for testing.
# The remaining ~98.9% is deliberately left unused.
split = review_rows.randomSplit([0.01, 0.001, 0.989], seed=SPLIT_SEED)
split[2] = 0  # discard the handle to the unused remainder

train_split, test_split = split[0], split[1]
# print(train_split.count())
length1 = train_split.count()
length2 = test_split.count()
print(length1, length2)

(66835, 6670)


In [80]:
def _row_to_review(row):
    """Turn a (review_id, stars, text) row into a plain tuple with native-str text.

    On Python 2 the text arrives as ``unicode`` and is encoded to a UTF-8 byte
    string, as before. On Python 3 the text is already ``str`` and is left
    untouched (calling ``str()`` on the encoded bytes there would produce the
    literal ``"b'...'"`` representation and corrupt the tokens). A missing
    text is treated as an empty review instead of raising AttributeError.
    """
    text = row[2]
    if text is None:
        text = ""
    elif str is bytes and not isinstance(text, str):
        # Python 2 only: unicode -> UTF-8 encoded str
        text = text.encode("utf-8")
    return (row[0], row[1], text)


# Pull the samples onto the driver, then redistribute them as plain-tuple RDDs.
# Only the first 1000 test rows are used.
train_lst = [_row_to_review(row) for row in train_split.collect()]
test_lst = [_row_to_review(row) for row in test_split.head(1000)]

training_file = sc.parallelize(train_lst)
test_file = sc.parallelize(test_lst)


train_rdd, test_rdd, actual_rdd = initRDDs(training_file, test_file)

In [81]:
import sys
import math

# Candidate star ratings (as floats) that every review is scored against
STARS = list(map(float, range(1, 6)))

class YelpClassifier(object):
    """Naive Bayes star-rating classifier for Yelp reviews, built on RDD operations.

    Training collects, per star rating, the number of reviews, the number of
    words, the class prior and a smoothed word-likelihood table. Classification
    scores every test review against every rating in the module-level ``STARS``
    list and keeps the rating with the highest log-posterior.

    A rating that never appeared in the training data has no prior and no
    likelihood table. Such a rating is scored as ``-inf`` (instead of raising
    ``KeyError``), so it can never beat a rating that was actually trained on.
    """

    def __init__(self):
        self.NUM_REVIEWS = {} # num_stars -> # of reviews with num_stars
        self.NUM_WORDS = {} # num_stars -> # of words total across all reviews with num_stars
        self.LIKELIHOODS = {} # num_stars -> {word -> P(word | num_stars)}
        self.PRIORS = {} # num_stars -> P(num_stars)

    ######################################################
    ################# DRIVER FUNCTIONS ##################
    ######################################################

    def train(self, train_rdd):
        """Fit the model on an RDD of (review_id, num_stars, review_text) tuples."""
        self.NUM_REVIEWS, self.NUM_WORDS = self.calculate_num_reviews_and_words_per_num_stars(train_rdd)
        self.PRIORS = self.calculate_priors()
        # Must run after NUM_WORDS is populated: compute_likelihood() reads it.
        self.LIKELIHOODS = self.calculate_likelihoods(train_rdd)

    def classify(self, test_rdd):
        """Predict a rating for every (review_id, review_text) tuple in test_rdd.

        Returns a list of (review_id, (num_stars, log_posterior)) pairs,
        sorted by review_id, collected onto the driver.
        """
        predictions = self.classify_reviews(test_rdd)
        return predictions.collect()

    ######################################################
    ################# PYSPARK FUNCTIONS ##################
    ######################################################

    def calculate_likelihoods(self, train_rdd):
        """Compute P(word | num_stars) for every word seen in the training reviews.

        The likelihood is, across all reviews with NUM_STARS:
        P(WORD | NUM_STARS) = (# occurrences of WORD + 1) / (# total words + 1)
        (see compute_likelihood).

        Returns a dict: int num_stars -> {word: probability}.
        """
        # Transformations:
        # 1. (review_id, num_stars, review_text_as_string) --> [((num_stars, word_in_review), 1)]
        # 2. [((num_stars, word_in_review), 1)] --> [((num_stars, word_in_review), num_occurrences_of_word_in_reviews_of_num_stars)]
        # 3. [((num_stars, word_in_review), num_occurrences)] --> [(num_stars, {word : probability_in_review_of_num_stars})]
        # 4. [(num_stars, {word1 : prob1, word2 : prob2, word3 : prob3...})]
        class_likelihoods = train_rdd \
                            .flatMap(self.review_to_word_counts) \
                            .reduceByKey(self.add_review_counts) \
                            .map(self.counts_to_probabilities) \
                            .aggregateByKey({}, self.combine_probability_tables, self.combine_probability_tables)

        LIKELIHOODS = {}
        for num_stars, likelihood in class_likelihoods.collect():
            LIKELIHOODS[int(num_stars)] = likelihood

        return LIKELIHOODS

    def calculate_num_reviews_and_words_per_num_stars(self, train_rdd):
        """Count reviews and words per star rating.

        Returns a pair of dicts (NUM_REVIEWS, NUM_WORDS), both keyed by int
        num_stars.
        """
        # Transformations:
        # 1. (review_id, num_stars, review_text_as_string) --> [(num_stars, (1, num_words))]
        # 2. [(num_stars, (1, num_words))] --> [(num_stars, (num_reviews_of_num_stars, num_words_total_of_num_stars))]
        num_reviews_words_per_num_stars = train_rdd \
                        .map(self.review_to_num_stars_num_words)\
                        .reduceByKey(self.add_review_and_word_counts)

        NUM_REVIEWS = {}
        NUM_WORDS = {}
        for num_stars, counts in num_reviews_words_per_num_stars.collect():
            num_reviews, num_words = counts
            NUM_REVIEWS[int(num_stars)] = int(num_reviews)
            NUM_WORDS[int(num_stars)] = int(num_words)

        return NUM_REVIEWS, NUM_WORDS

    def calculate_priors(self):
        """Compute P(num_stars) = # of reviews with num_stars / # of total reviews.

        Reads self.NUM_REVIEWS; returns a new dict and leaves NUM_REVIEWS
        untouched. With no reviews at all the result is an empty dict.
        """
        total_num_reviews = sum(self.NUM_REVIEWS.values())

        PRIORS = {}
        for num_stars, num_reviews in self.NUM_REVIEWS.items():
            PRIORS[num_stars] = float(num_reviews) / float(total_num_reviews)

        return PRIORS

    def classify_reviews(self, test_rdd):
        """Score every review against every rating in STARS and keep the best one.

        Returns an RDD of (review_id, (num_stars, log_posterior)) sorted by
        review_id.
        """
        # Transformations:
        # 1. (review_id, review_text_as_string) --> [((num_stars1, review_id), word1), ((num_stars1, review_id), word2), ((num_stars2, review_id), word1)...]
        # 2. [((num_stars1, review_id), word1), ...] --> [((num_stars1, review_id), log_p_word1), ...]
        # 3. [((num_stars1, review_id), log_p_word1), ((num_stars1, review_id), log_p_word2)] --> [((num_stars1, review_id), log_likelihood)]
        # 4. [((num_stars1, review_id), log_likelihood)] --> [((num_stars1, review_id), log_posterior)]
        # 5. [((num_stars1, review_id), log_posterior)] --> [(review_id, (num_stars1, log_posterior1))]
        # 6. [(review_id, (num_stars1, log_posterior1))] --> [(review_id, (num_stars, max_posterior))]
        # 7. sort by review_id
        predictions = test_rdd \
                            .flatMap(self.review_to_num_stars_and_word_pairs) \
                            .map(self.words_to_log_likelihoods) \
                            .reduceByKey(self.add_log_likelihoods) \
                            .map(self.likelihood_to_posterior) \
                            .map(self.review_id_only_as_key) \
                            .reduceByKey(self.find_max_posterior) \
                            .sortByKey()

        return predictions

    # ___________________________________________________ #
    # ____________TRAINING HELPERS_______________________ #
    # ___________________________________________________ #

    # ____________calculate_likelihoods() helpers________ #

    @staticmethod
    def review_to_word_counts(review):
        """Emit ((num_stars, word), 1.0) for every space-separated word in a review.

        Review is of the format (review_id, num_stars, review_text_as_string).
        """
        num_stars = review[1]
        return [((num_stars, word), 1.0) for word in review[2].split(" ")]

    @staticmethod
    def add_review_counts(count1, count2):
        """Add two word counts together."""
        return count1 + count2

    def compute_likelihood(self, count_of_word, num_stars):
        """Smoothed likelihood: (count_of_word + 1) / (NUM_WORDS[num_stars] + 1)."""
        probability = float((count_of_word + 1)) / float((self.NUM_WORDS[int(num_stars)] + 1))
        return probability

    def counts_to_probabilities(self, num_stars_and_word_counts):
        """Map ((num_stars, word), count) to (num_stars, {word: P(word | num_stars)})."""
        (num_stars, word), word_count = num_stars_and_word_counts
        return (num_stars, {word : self.compute_likelihood(word_count, num_stars)})

    def combine_probability_tables(self, word1_and_probability, word2_and_probability):
        """Merge the second {word: prob} table into the first and return the first.

        The first table is updated in place.
        """
        word1_and_probability.update(word2_and_probability)
        return word1_and_probability

    # ____________calculate_num_reviews_per_num_stars() helpers________ #

    @staticmethod
    def review_to_num_stars_num_words(review):
        """Map a review to (num_stars, (1, number_of_space_separated_words))."""
        return (review[1], (1, len(review[2].split(" "))))

    @staticmethod
    def add_review_and_word_counts(count1, count2):
        """Element-wise sum of two (num_reviews, num_words) pairs."""
        return (count1[0] + count2[0], count1[1] + count2[1])

    # _________________________________________________________ #
    # ____________CLASSIFICATION HELPERS_______________________ #
    # _________________________________________________________ #

    # ____________classify_reviews() helpers____________________ #

    @staticmethod
    def review_to_num_stars_and_word_pairs(review):
        """Pair every word of a review with every possible rating in STARS.

        Takes (review_id, review_text) and returns a list of
        ((num_stars, review_id), word) entries.
        """
        review_id, review_text = review
        words = review_text.split(" ")

        possible_num_stars_word_pairs = []
        for word in words:
            possible_num_stars_and_word = [((num_stars, review_id), word) for num_stars in STARS]
            possible_num_stars_word_pairs.extend(possible_num_stars_and_word)

        return possible_num_stars_word_pairs

    def words_to_log_likelihoods(self, stars_id_word):
        """Map ((num_stars, review_id), word) to ((num_stars, review_id), log P(word | num_stars)).

        A word missing from the likelihood table of a trained rating gets the
        smoothed likelihood 1 / (NUM_WORDS[num_stars] + 1). A rating with no
        likelihood table at all (absent from the training data) yields -inf.
        """
        num_stars, review_id = int(stars_id_word[0][0]), str(stars_id_word[0][1])
        word = str(stars_id_word[1])
        key = (num_stars, review_id)

        likelihoods = self.LIKELIHOODS.get(num_stars)
        if likelihoods is None:
            # Rating never seen in training: impossible, rather than a KeyError.
            return (key, float("-inf"))

        if word in likelihoods:
            word_likelihood = likelihoods[word]
        else:
            num_words_with_num_stars = self.NUM_WORDS[num_stars]
            word_likelihood = 1.0 / float(num_words_with_num_stars + 1)
        log_likelihood = math.log(word_likelihood)

        return (key, log_likelihood)

    @staticmethod
    def add_log_likelihoods(likelihood1, likelihood2):
        """Add two log-likelihoods together."""
        return float(likelihood1) + float(likelihood2)

    def likelihood_to_posterior(self, stars_id_lhood):
        """Add the log-prior of the rating to a review's summed log-likelihood.

        log P(review, NUM_STARS) = log P(review | NUM_STARS) + log P(NUM_STARS),
        with P(NUM_STARS) taken from self.PRIORS. A rating with no (or zero)
        prior yields -inf.
        """
        num_stars, review_id = int(stars_id_lhood[0][0]), stars_id_lhood[0][1]
        log_likelihood = float(stars_id_lhood[1])

        prior = self.PRIORS.get(num_stars, 0.0)
        if prior > 0.0:
            posterior = log_likelihood + math.log(prior)
        else:
            # log(0) is undefined; treat an untrained rating as impossible.
            posterior = float("-inf")

        return ((num_stars, review_id), posterior)

    @staticmethod
    def review_id_only_as_key(stars_id_posterior):
        """Re-key ((num_stars, review_id), posterior) as (review_id, (num_stars, posterior))."""
        num_stars, review_id = stars_id_posterior[0][0], stars_id_posterior[0][1]
        posterior = stars_id_posterior[1]

        return (review_id, (num_stars, posterior))

    @staticmethod
    def find_max_posterior(num_stars_posterior1, num_stars_posterior2):
        """Return whichever (num_stars, posterior) pair has the larger posterior.

        On a tie the first pair is returned.
        """
        if (num_stars_posterior1[1] < num_stars_posterior2[1]):
            return num_stars_posterior2
        return num_stars_posterior1


In [82]:
classifier = YelpClassifier()
classifier.train(train_rdd)
predictedReviews = classifier.classify(test_rdd)

In [83]:
actualReviews = get_review_stars(actual_rdd).collect()

# Look predictions up by review id instead of pairing the two lists by
# position: a positional zip silently compares the wrong reviews as soon as
# one list has a missing or duplicated id.
# predictedReviews entries are (review_id, (num_stars, log_posterior)).
predicted_by_id = dict(predictedReviews)

hits = 0
star_hits_dict = {1: 0, 2:0, 3: 0, 4:0, 5: 0}
total = 0
star_total_dict = {1: 0, 2:0, 3: 0, 4:0, 5: 0}

for review_id, actual_stars in actualReviews:
    # Prediction keys were passed through str() by the classifier.
    prediction = predicted_by_id.get(str(review_id))
    if prediction is None:
        # No prediction for this review: leave it out of the tallies.
        continue
    actualLabel = int(actual_stars)
    predictedLabel = int(prediction[0])
    if actualLabel == predictedLabel:
        star_hits_dict[actualLabel] += 1
        hits += 1
    star_total_dict[actualLabel] += 1
    total += 1


def _safe_ratio(numerator, denominator):
    """Return numerator / denominator as a float, or 0.0 when denominator is 0."""
    if not denominator:
        return 0.0
    return float(numerator) / float(denominator)


# A star bucket with no test reviews reports 0.0 rather than raising
# ZeroDivisionError.
accuracy = _safe_ratio(hits, total)
one_star_acc = _safe_ratio(star_hits_dict[1], star_total_dict[1])
two_star_acc = _safe_ratio(star_hits_dict[2], star_total_dict[2])
three_star_acc = _safe_ratio(star_hits_dict[3], star_total_dict[3])
four_star_acc = _safe_ratio(star_hits_dict[4], star_total_dict[4])
five_star_acc = _safe_ratio(star_hits_dict[5], star_total_dict[5])

In [None]:
# Accuracy report: overall, then one row per star rating.
# The 2-star and 4-star rows were previously both labelled "3 STAR".
print("\n\n\n\t\t\t\t\t############################################################")
print("\t\t\t\t\t\t\t\t\t\t\t\t")
print("\t\t\t\t\t\tOVERALL ACCURACY: %f (%d/%d)\t\t" %(accuracy, hits, total))
print("\t\t\t\t\t\t1 STAR REVIEW ACCURACY: %f (%d/%d)\t" \
    %(one_star_acc, star_hits_dict[1], star_total_dict[1]))
print("\t\t\t\t\t\t2 STAR REVIEW ACCURACY: %f (%d/%d)\t" \
    %(two_star_acc, star_hits_dict[2], star_total_dict[2]))
print("\t\t\t\t\t\t3 STAR REVIEW ACCURACY: %f (%d/%d)\t" \
    %(three_star_acc, star_hits_dict[3], star_total_dict[3]))
print("\t\t\t\t\t\t4 STAR REVIEW ACCURACY: %f (%d/%d)\t" \
    %(four_star_acc, star_hits_dict[4], star_total_dict[4]))
print("\t\t\t\t\t\t5 STAR REVIEW ACCURACY: %f (%d/%d)\t" \
    %(five_star_acc, star_hits_dict[5], star_total_dict[5]))
print("\t\t\t\t\t\t\t\t\t\t\t\t")
print("\t\t\t\t\t############################################################\n\n\n\n")