In [1]:
import re
from collections import Counter
import numpy as np

In [3]:
train_pos = sc.textFile('NB_files/train_pos.txt')
train_neg = sc.textFile('NB_files/train_neg.txt')
test_pos = sc.textFile('NB_files/test_pos.txt')
test_neg = sc.textFile('NB_files/test_neg.txt')

In [4]:
stop_words = ['a', 'able', 'about', 'across', 'after', 'all', 'almost', 'also',
              'am', 'among', 'an', 'and', 'any', 'are', 'as', 'at', 'be', 'because',
              'been', 'but', 'by', 'can', 'cannot', 'could', 'dear',
              'did', 'do', 'does', 'either', 'else', 'ever', 'every', 'for',
              'from', 'get', 'got', 'had', 'has', 'have', 'he', 'her', 'hers',
              'him', 'his', 'how', 'however', 'i', 'if', 'in', 'into', 'is',
              'it', 'its', 'just', 'least', 'let', 'like', 'likely', 'may',
              'me', 'might', 'most', 'must', 'my', 'neither', 'no', 'nor',
              'not', 'of', 'off', 'often', 'on', 'only', 'or', 'other', 'our',
              'own', 'rather', 'said', 'say', 'says', 'she', 'should', 'since',
              'so', 'some', 'than', 'that', 'the', 'their', 'them', 'then',
              'there', 'these', 'they', 'this', 'tis', 'to', 'too', 'twas', 'us',
              've', 'wants', 'was', 'we', 'were', 'what', 'when', 'where', 'which',
              'while', 'who', 'whom', 'why', 'will', 'with', 'would', 'yet',
              'you', 'your']

In [5]:
def parse(blob):
    blob = blob.split('\n')
    blob = [re.sub('[^A-Za-z]', ' ', i) for i in blob]
    blob = [i.split() for i in blob]
    blob = [item for sublist in blob for item in sublist]
    blob = [word.strip().lower() for word in blob if word not in stop_words if len(word) >= 3]
    return blob

In [6]:
# Flatten the list for total word count and 
train_pos_bag = train_pos.flatMap(parse)
train_neg_bag = train_neg.flatMap(parse)

In [7]:
# Count of All Words per Class
pos_word_count = train_pos_bag.count()
neg_word_count = train_neg_bag.count()

# Count per Word
pos_count_by_word = train_pos_bag.countByValue()
neg_count_by_word = train_neg_bag.countByValue()

# Unique Words in All Classes
unique_words_pos = [i for i in pos_count_by_word.keys() if type(i) is not int]
unique_words_neg = [i for i in neg_count_by_word.keys() if type(i) is not int]

num_unique = len(unique_words_pos) + len(unique_words_neg)

### Define Naive Bayes Function

In [8]:
def the_naive_bassifier(label_tuple):
    '''
    input: Spark RDD tuple of a class and a word list from document
    output: predicted class and actual class labels
    '''
    pos_list = list()
    neg_list = list()

    orig_label = label_tuple[0]
    new_text = Counter(label_tuple[1])
    test_doc_wc = len(new_text)
    
    for i in new_text.keys():
        try:   
            cnt_word_pos = pos_count_by_word[i]
        except KeyError:
            cnt_word_pos = 0

        try:
            cnt_word_neg = neg_count_by_word[i]
        except KeyError:
            cnt_word_neg = 0

        cl_pos = (cnt_word_pos + 1) / float(pos_word_count + num_unique)
        cl_neg = (cnt_word_neg + 1) / float(neg_word_count + num_unique)
        
        pos_list.append(cl_pos ** new_text[i])
        neg_list.append(cl_neg ** new_text[i])
        
    pos_score = np.log(.5) + np.sum(pos_list)
    neg_score = np.log(.5) + np.sum(neg_list)
        
    if pos_score > neg_score:
        pred_label = 1.
    elif pos_score == neg_score:
        pred_label = np.random.randint(0, 2)
    else:
        pred_label = 0

    return (pred_label, orig_label)

In [9]:
# Parse into Bags of Words
test_pos_bag = test_pos.map(parse)
test_neg_bag = test_neg.map(parse)

In [10]:
# Label
test_pos_bag = test_pos_bag.map(lambda x: (1., x))
test_neg_bag = test_neg_bag.map(lambda x: (0, x))

In [11]:
# Join
join_test = test_pos_bag.union(test_neg_bag)
join_test.cache()

UnionRDD[14] at union at NativeMethodAccessorImpl.java:-2

In [12]:
pred_rdd = join_test.map(the_naive_bassifier)
pred_rdd.cache()

PythonRDD[15] at RDD at PythonRDD.scala:43

### Model Accuracy

In [13]:
test_accuracy = 1.0 * pred_rdd.filter(lambda (x, v): x == v).count() / pred_rdd.count()
test_accuracy

0.69112