Let's make a naive Bayes classifier. We will run it on a collection of tweets and Reddit comments whose sentiment is either positive, neutral, or negative.

First, let's load the data into training and test corpora.

In [1]:
def get_corpus():
    with open('twitter.csv', 'r', encoding='latin-1') as f:
        lines = f.readlines()
    corpus = []
    n = len(lines)
    i = 1
    while i < n:
        line = lines[i]
        xy = line.split(',')
        x = xy[0].rstrip().split(' ')
        if len(xy) == 1: # multiline tweet
            line = lines[i][1:].rstrip() # remove double quote
            i += 1
            if len(line) == 0 and i < n:
                # beginning of tweet so dont put space
                line += lines[i].split(',')[0].rstrip()
                i += 1
            while i < n and len(lines[i].split(',')) == 1:
                line += ' ' + lines[i].rstrip()
                i += 1
            xy = lines[i].split(',')
            endsize = len(xy[0])
            if endsize > 1: # ignore double quote i.e. endsize == 1
                # remove double quote at the end
                line += ' ' + xy[0][:endsize - 1] 
            x = line.split(' ')
        y = int(xy[1].rstrip())
        corpus.append([x, y])
        i += 1
    return corpus

corpus = get_corpus()
train = corpus[:int(len(corpus) * 0.8)]
test = corpus[int(len(corpus) * 0.8):]
classes = [-1, 0, 1] # negative, neutral, positive


Now we can define our naive Bayes classifier class. All we really need is a train function and a test function.

To train, we calculate the probabilities of each class, P(c), and of every feature (in our case individual words) given each class, P(w|c), using MLE and counting.

The prior probability of a class P(c) will be defined as the number of documents (tweets) in class c out of all available documents.

The likelihood of a feature given a class P(w|c) will be defined as the number of occurrences of w in documents in class c out of the number of occurrences of all words in documents in class c.

For a test sample, we find the probability of the sample document given each class, P(d|c), and return the argmax class as our prediction.

We'll need to import a log function since we would rather add up log probabilities than multiply (lots of near zero) probabilities.

We'll also need the Counter class to get frequency counts from our training corpus. Specifically, we're going to use a hyperparameter `vocab_size` to set our features as the `vocab_size` most common words in the corpus. These are the most common words in the entire corpus, not just for a class. 

Notice the additional `smoothing` parameter. Since we are using individual words as features, there's a chance during training we find a word that doesn't show up in any documents for a class and gets assigned zero probability. The computer will not like taking log of zero, and we will not like the model. To avoid this, we simply add the value of `smoothing` to every word count to get rid of zero probabilities. Since we add the same amount to every count in the numerator and denominator, we maintain a valid probability distribution.

In [2]:
from math import log2
from collections import Counter

class NaiveBayesClassifier:

    def __init__(self, C: list, vocab_size: int, smoothing=0.05):
        assert vocab_size > 0

        self.C = C
        self.vocab_size = vocab_size
        self.smoothing = smoothing

    def train(self, D: list):
        big = []
        for i in range(len(D)):
            big.extend(D[i][0])
        counts = Counter(big)
        self.V = [tup[0] for tup in counts.most_common(self.vocab_size)]

        N_doc = len(D)
        self.logpriors = []
        self.bigdoc = []
        self.loglikelihood = [[] for _ in range(self.vocab_size)]
        for i in range(len(self.C)): # calculate P(c) terms
            c = self.C[i]
            print('Training %d' % c)

            N_c = 0
            self.bigdoc.append([])
            for sample in D:
                if sample[1] == c:
                    N_c += 1
                    self.bigdoc[i].extend(sample[0])
            self.logpriors.append(log2(N_c / N_doc))
            print(' %d samples' % N_c)
            print(' log P(c) = %f. Counting word occurrences...' % self.logpriors[i])
            sum = 0
            counts = Counter(self.bigdoc[i])
            for w_prime in self.V:
                sum += counts[w_prime] + self.smoothing
            count = 0
            print(' Counted %d occurrences, now computing log P(w|c)s...' % sum)
            for j in range(self.vocab_size): # calculate P(w|c) terms
                count += 1
                w = self.V[j]
                self.loglikelihood[j].append(log2((counts[w] + self.smoothing) / sum))

    def test(self, testdoc):
        max_prob, max_c = -1000000, None
        for i in range(len(self.C)):
            class_prob = self.logpriors[i]
            for word in testdoc:
                if word in self.V:
                    index = self.V.index(word)
                    class_prob += self.loglikelihood[index][i]
            if class_prob > max_prob:
                max_prob = class_prob
                max_c = self.C[i]
        return max_c

We need to be able to evaluate our predictions, so for now we will just compute the recall of each class with a simple function

In [3]:
def evaluate(y, y_pred, tp, fp, tn, fn, tneu, fneu):
    if y == -1:
        if y_pred == -1:
            tn += 1
        else:
            fn += 1
    elif y == 0:
        if y_pred == 0:
            tneu += 1
        else:
            fneu += 1
    else:
        if y_pred == 1:
            tp += 1
        else:
            fp += 1
    return (tp, fp, tn, fn, tneu, fneu)

Now we can train and test.

In [15]:

vocab_size = 5000
model = NaiveBayesClassifier(classes, vocab_size)
model.train(train)
print('Testing %d samples' % len(test))
tp, fp, tn, fn, tneu, fneu, count = 0, 0, 0, 0, 0, 0, 0
for sample in test:
    count += 1
    y = sample[1]
    y_pred = model.test(sample[0])
    tp, fp, tn, fn, tneu, fneu = evaluate(y, y_pred, tp, fp, tn, fn, tneu, fneu)
    if count % 5000 == 0:
        print(' %d samples, tpr %f tneur %f tnr %f' % (count, (tp / (tp + fp)), (tneu / (tneu + fneu)), (tn / (tn + fn))))

print('positive recall %%: %f' % (100 * tp / (tp + fp)))
print('neutral recall %%: %f' % (100 * tneu / (tneu + fneu)))
print('negative recall %%: %f' % (100 * tn / (tn + fn)))

Training -1
 28151 samples
 log P(c) = -2.204365. Counting word occurrences...
 Counted 599221 occurrences, now computing log P(w|c)s...
Training 0
 43970 samples
 log P(c) = -1.561032. Counting word occurrences...
 Counted 554936 occurrences, now computing log P(w|c)s...
Training 1
 57619 samples
 log P(c) = -1.171007. Counting word occurrences...
 Counted 1168327 occurrences, now computing log P(w|c)s...
Testing 32436 samples
 5000 samples, tpr 0.830660 tneur 0.777712 tnr 0.757754
 10000 samples, tpr 0.836054 tneur 0.778696 tnr 0.766779
 15000 samples, tpr 0.841556 tneur 0.773861 tnr 0.763688
 20000 samples, tpr 0.842736 tneur 0.779944 tnr 0.756875
 25000 samples, tpr 0.846267 tneur 0.784471 tnr 0.753097
 30000 samples, tpr 0.849014 tneur 0.779122 tnr 0.752404
positive recall %: 84.784897
neutral recall %: 77.753524
negative recall %: 75.152948
