In [1]:
import sys
sys.path.append("./git")

# Naive Bayes

## A spam filter: principles

A simple spam filter might approach classifying spam emails in this way:

"what is the probabiility that an email is spam given that it contains the word Bitcoin?"

Bayes Theorem holds that the probability that the message is sam conditional on containing the word bitcoin is:

$$P(S|B)=P(B|S)P(S) / (P(B|S)P(S) + P(B|S')P(S'))$$

If you have a big collection of emails that you know or are sure are spam, you can easily estimate the probabilities $P(B|S)$, $P(S|S')$.

A more sophisticated one might scale this up and instead say:

"Given: 
- this list of words $w_1, ... ,w_n$;
- the event $X_i$ that an email contains word $w_i$; 
- some estimate $P(X_i|S)$ for the probability that a spam message contains the *i*th word; and
- some estimate $P(X_i|~S)$ for the probability that a non-spam message contains the *i*th word,

what is the probability that a message is spam?"

The assumption behind this is that the presence or absence of words $w_i$ to $w_n$ in a given message are independent of one another save for whether or not a message is spam.

This is a **huge** assumption, and hence this technique is called _Naive_ Bayes.

Imagine that our 'vocabulary' of spam words was only 'Bitcoin' and 'Rolex', and that half of all spam messages are abouting earning Bitcoin while the other half is about cheap Rolexes.

In this case the Naive Bayes estimate that a spam message contains both bitcoin and rolex is:

$$P(X_1 = 1, X_2 = 1|S) = P(X_1 = 1|S) P(X_2 = 1|S) = 0.5 * 0.5 = 0.25$$

Since we assumed away the knowledge that bitcoin and rolex actually never occur together.

Despite this, however, this technique often performs pretty well and is actually used in spam filters.

Applying Bayes Theorem again, we can calculate the probability that a message is spam using the equation 

$$P(S|X = x) = P(X = x|S) / [P(X = x|S + P(X = x|S')]$$

The NB assumption allows us to compute each of the probabilities on the right simply by multiplying together the individual probability estimates for each vocabulary word.

In practice, however, you don't want to multiply together to many probabilities because computers can run into difficulties multiplying many floating point values <1 together (called *underflow*).

Therefore, easy workaround, we use logarithms! Remember that $log(ab) = log(a) + log(b)$ and that $exp(log(x)) = x$, we usually compute $p_q * ... * p_n$ as the equivalent but floating-point-friendlier:

$$exp(log(p_1) + ... + log(p_n))$$

We are then left with figuring out a way to come up with estimates for $P(X_i|S)$ and $P(X_i|S')$ the probabilities that a spam message or nonspam message contains the word $w_i$.

This causes problems though - imagine that our training set contains the word 'data' only in nonspam messages. Our NB classifier would always assign spam probability 0 to any message that contains 'data on free bitcoin and authentic rolex watches'. 

To get around this we use some kind of smoothing.

We choose a *pseudocount* $k$ and estimate the probability of seeing the *i*th word in a spam message as:

$$P(X_i|S) = (k + number of spams containing  w_i) / (2k + number of spams)$$

And similarly with $P(X_i|S')$. That is, when computing the spam probabilities for the *i*th word, we assume we also saw k additional nonspams containing the word and k additional nonspams not containing the word.

For example, if 'data' occurs in 0/98 spam messages, and if k = 1, we estimate $P(data|S)$ as 1/100 = 0.01, which allows our classifier to still assign some nonzero spam probability to messages that contain the word data.

## A spam filter: implementation

With all the pieces in play, let's build a classifier up.

First up, lets create a function to *tokenise* messages into distinct words:

In [2]:
from typing import Set
import re

def tokenize(text: str) -> Set[str]:
    text = text.lower() # make lower case
    all_words = re.findall("[a-z0-9']+",text) # extract the words
    return set(all_words) # remove the duplicates

assert tokenize("Data science is science") == {'data','science','is'}

We'll also define a type for our training data:

In [3]:
from typing import NamedTuple

class Message(NamedTuple):
    text: str
    is_spam: bool

As our classifier needs to keep track of tokens, counts, and labels from the training data, we'll make it a class. Per convention, we'll call nonspam emails 'ham' emails.

The constructor will take one parameter, the pseudocount to use when computing probabilities. It also initialises an empty set of tokens, counters to track how often each token is seen in spam messages and ham messages, and counts of how many spam and ham messages it was trained on:

In [4]:
from typing import List, Tuple, Dict, Iterable
import math
from collections import defaultdict

class NaiveBayesClassifier:
    def __init__(self,k:float = 0.5) -> None:
        self.k = k # smoothing factor
        self.tokens: Set[str] = set()
        self.token_spam_counts: Dict[str,int] = defaultdict(int)
        self.token_ham_counts: Dict[str,int] = defaultdict(int)
        self.spam_messages = self.ham_messages = 0
    
    def train(self, messages: Iterable[Message]) -> None:
        for message in messages:
            # Increment message counts
            if message.is_spam:
                self.spam_messages += 1
            else:
                self.ham_messages += 1
            
            # Increment word counts
            for token in tokenize(message.text):
                self.tokens.add(token)
                if message.is_spam:
                    self.token_spam_counts[token] += 1
                else:
                    self.token_ham_counts[token] += 1
    
    def _probabilities(self,token:str) -> Tuple[float,float]:
        """returns P(token | spam) and P(token | ham)"""
        spam = self.token_spam_counts[token]
        ham = self.token_ham_counts[token]
        
        p_token_spam = (spam + self.k) / (self.spam_messages + 2*self.k)
        p_token_ham = (ham + self.k) / (self.ham_messages + 2*self.k)
        
        return p_token_spam, p_token_ham
    
    def predict(self,text:str) -> float:
        text_tokens = tokenize(text)
        log_prob_if_spam = log_prob_if_ham = 0.0
        
        # Iterate through each word in our vocab
        for token in self.tokens:
            prob_if_spam, prob_if_ham = self._probabilities(token)
            
            # If token in message, 
            # add the log probability of seeing it
            if token in text_tokens:
                log_prob_if_spam += math.log(prob_if_spam)
                log_prob_if_ham += math.log(prob_if_ham)
                
            # Otherwise add the log prob of _not_ seeing it
            # which is (1 - prob of seeing it)
            else:
                log_prob_if_spam += math.log(1.0 - prob_if_spam)
                log_prob_if_ham += math.log(1.0 - prob_if_ham)
        prob_if_spam = math.exp(log_prob_if_spam)
        prob_if_ham = math.exp(log_prob_if_ham)
        return prob_if_spam / (prob_if_spam + prob_if_ham)

## A spam filter: testing

Let's make sure it works by writing some unit tests for it:

In [5]:
messages = [Message("spam rules", is_spam = True),
            Message("ham rules", is_spam = False),
            Message("Hello ham", is_spam = False)]

model = NaiveBayesClassifier(k=0.5)
model.train(messages)

assert model.tokens == {'spam','ham','rules','hello'}
assert model.spam_messages == 1
assert model.ham_messages == 2
assert model.token_spam_counts == {'spam':1,'rules':1}
assert model.token_ham_counts == {'ham':2,'rules':1,'hello':1}

Now let's make a prediction and compare it to a hand-calculation.

In [6]:
text = 'hello spam'

probs_if_spam = [
    (1 + 0.5) / (1 + 2 * 0.5),      # "spam"  (present)
    1 - (0 + 0.5) / (1 + 2 * 0.5),  # "ham"   (not present)
    1 - (1 + 0.5) / (1 + 2 * 0.5),  # "rules" (not present)
    (0 + 0.5) / (1 + 2 * 0.5)       # "hello" (present)
]

probs_if_ham = [
    (0 + 0.5) / (2 + 2 * 0.5),      # "spam"  (present)
    1 - (2 + 0.5) / (2 + 2 * 0.5),  # "ham"   (not present)
    1 - (1 + 0.5) / (2 + 2 * 0.5),  # "rules" (not present)
    (1 + 0.5) / (2 + 2 * 0.5),      # "hello" (present)
]

p_if_spam = math.exp(sum(math.log(p) for p in probs_if_spam))
p_if_ham = math.exp(sum(math.log(p) for p in probs_if_ham))

assert model.predict(text) == p_if_spam / (p_if_ham + p_if_spam)

So it works with training wheels on. Let's do it on real data.

## A spam filter: using our model

In [8]:
# from io import BytesIO
# import requests
# import tarfile

# base_url = "https://spamassassin.apache.org/old/publiccorpus"
# files = ["20021010_easy_ham.tar.bz2",
#          "20021010_hard_ham.tar.bz2",
#          "20021010_spam.tar.bz2"]

# output_dir = 'spam_data'

# for file in files:
#     content = requests.get(f"{base_url}/{file}").content
#     fin = BytesIO(content)
#     with tarfile.open(fileobj=fin,mode='r:bz2') as tf:
#         tf.extractall(output_dir)

To keep things simple we'll only look at the subject lines of the emails.

In [9]:
import glob, re

path = 'spam_data/*/*'

data: List[Message] = []
    
for file in glob.glob(path):
    is_spam = "ham" not in file
    
    with open(file,errors='ignore') as email_file:
        for line in email_file:
            if line.startswith("Subject:"):
                subject = line.lstrip("Subject: ")
                data.append(Message(subject,is_spam))
                break

Now we can split into train-test.

In [11]:
import random
from scratch.machine_learning import split_data
random.seed(0)

train_messages, test_messages = split_data(data,0.75)

model = NaiveBayesClassifier()
model.train(train_messages)

Now let's generate some predictions and see how the model does:

In [12]:
from collections import Counter

predictions = [(message, model.predict(message.text))
               for message in test_messages]

confusion_matrix = Counter((message.is_spam, spam_probability > 0.5) 
                           for message, spam_probability 
                           in predictions)

print(confusion_matrix)

Counter({(False, False): 670, (True, True): 86, (True, False): 40, (False, True): 29})


This gives 84 true positives (spam classified as “spam”), 25 false positives (ham classified as “spam”), 703 true negatives (ham classified as “ham”), and 44 false negatives (spam classified as “ham”). This means our precision is 84 / (84 + 25) = 77%, and our recall is 84 / (84 + 44) = 65%, which are not bad numbers for such a simple model. (Presumably we’d do better if we looked at more than the subject lines.)

In [13]:
def p_spam_given_token(token:str, model: NaiveBayesClassifier) -> float:
    prob_if_spam, prob_if_ham = model._probabilities(token)
    return prob_if_spam / (prob_if_spam + prob_if_ham)

words = sorted(model.tokens,key=lambda t: p_spam_given_token(t,model))

In [15]:
print("spammiest words", words[-10:])

spammiest words ['zzzz', 'attn', '95', 'clearance', 'per', 'money', 'sale', 'rates', 'systemworks', 'adv']


In [17]:
print("hammiest words", words[:10])

hammiest words ['spambayes', 'users', 'razor', 'zzzzteana', 'sadev', 'apt', 'perl', 'ouch', 'spamassassin', 'selling']
